In [5]:
import pywren
import numpy as np
import scipy.linalg as linalg
import boto3
import io
import itertools
from numba import jit
import math
import time
from sklearn.kernel_approximation import RBFSampler

In [6]:
# Shared PyWren executor; the driver loop further down submits its work through pwex.map.
pwex = pywren.default_executor()

In [None]:
# --- S3 layout -------------------------------------------------------------
# Bucket that holds both the input feature shards and the output kernel blocks.
BUCKET = "imagenet-raw"
# Key prefixes for the input shards and for the kernel blocks written back.
BLOCK_PATH_ROOT = "fc7-blocked/"
KERNEL_BLOCK_PATH_ROOT = "fc7-kernel-blocked/"
# Input shard key; {0}/{1} are the start/end row indices of the shard.
KEY_TEMPLATE = BLOCK_PATH_ROOT + "imagenet_features_alexnet_fc7_blocked_{0}_{1}.npz"
# Output block key; fields are block size, gamma, then start/end row indices
# of the first and second block of the pair.
OUT_KEY_TEMPLATE = KERNEL_BLOCK_PATH_ROOT + "fc7_kernel_block_size_{0}_gamma_{1}_block_{2}_{3}_{4}_{5}"

# --- Problem size ----------------------------------------------------------
BLOCK_SIZE = 4096
NUM_EXAMPLES = 1281167
# Derived from the two constants above (rather than a re-typed literal) so the
# block count cannot drift out of sync; the last block may be partial.
NUM_BLOCKS = int(math.ceil(NUM_EXAMPLES / float(BLOCK_SIZE)))


In [None]:
@jit
def fast_exp(K):
    """Replace every entry of the 2-D array ``K`` with its exponential.

    The array is overwritten in place, element by element, and the same
    object is returned so the call can be used inside an expression.
    The explicit double loop is what the numba ``jit`` decorator compiles.
    """
    for row in range(K.shape[0]):
        for col in range(K.shape[1]):
            K[row, col] = math.exp(K[row, col])
    return K

def chunk(l, n):
    """Lazily split the sequence ``l`` into consecutive slices of length ``n``.

    Every slice has ``n`` items except possibly the last one, which holds
    whatever is left over. Nothing is yielded for an empty sequence.
    """
    starts = range(0, len(l), n)
    for lo in starts:
        hi = lo + n
        yield l[lo:hi]


def computeDistanceMatrix(XTest, XTrain):
    """Return the squared Euclidean distance between every pair of rows.

    Both inputs are first flattened to 2-D (the leading axis is kept, all
    remaining axes are merged). The result ``K`` has shape
    ``(XTest.shape[0], XTrain.shape[0])`` with
    ``K[i, j] = ||XTest[i]||^2 - 2 * XTest[i] . XTrain[j] + ||XTrain[j]||^2``.

    Entries are clamped at zero: the expansion above subtracts nearly equal
    quantities for (almost) identical rows, and floating-point cancellation
    can otherwise produce tiny negative "squared distances".
    """
    XTrain = XTrain.reshape(XTrain.shape[0], -1)
    XTest = XTest.reshape(XTest.shape[0], -1)
    # Squared row norms as column vectors so they broadcast against K.
    XTrain_norms = (np.linalg.norm(XTrain, axis=1) ** 2)[:, np.newaxis]
    XTest_norms = (np.linalg.norm(XTest, axis=1) ** 2)[:, np.newaxis]
    # Build the result in place on top of the cross-term matrix.
    K = XTest.dot(XTrain.T)
    K *= -2
    K += XTrain_norms.T
    K += XTest_norms
    # Remove round-off negatives; a squared distance is never below zero.
    np.maximum(K, 0, out=K)
    return K

def computeRBFGramMatrix(XTest, XTrain, gamma=1):
    """Evaluate ``exp(-gamma * d)`` on the distance matrix of the two inputs.

    ``d`` is whatever ``computeDistanceMatrix(XTest, XTrain)`` returns; the
    scaled matrix is handed to ``fast_exp`` and that result is returned.
    """
    neg_gamma = -1.0 * gamma
    scaled = neg_gamma * computeDistanceMatrix(XTest, XTrain)
    return fast_exp(scaled)

def compute_kernel_blocks(block_pairs, bucket=BUCKET, block_size=4096, gamma=1e-5, num_examples=1281167):
    """Process several block pairs in sequence and report the mean timings.

    Each element of ``block_pairs`` is forwarded, together with the remaining
    arguments, to ``compute_kernel_block``. The four per-phase timings that
    call returns are summed and then divided by the number of pairs.

    Returns a length-4 array of average seconds per pair, in the order
    produced by ``compute_kernel_block``.
    """
    totals = np.zeros(4)
    for pair in block_pairs:
        totals += compute_kernel_block(pair, bucket, block_size, gamma, num_examples)

    pair_count = float(len(block_pairs))
    return totals / pair_count

def compute_kernel_block(block_nums, bucket=BUCKET, block_size=4096, gamma=1e-5, num_examples=1281167):
    ''' Build one RBF kernel block from two design-matrix shards kept on S3.

    ``block_nums`` supplies the two block numbers (its first two items). The
    pair is put in ascending order, both shards are downloaded, the Gram
    matrix between them is computed and the result is uploaded to ``bucket``.

    Returns an array ``[meta, download, kernel, upload]`` with the wall-clock
    seconds spent in each of those phases.
    '''
    # Phase 1: work out which shard keys are needed.
    tic = time.time()
    first, second = block_nums[0], block_nums[1]
    # (i, j) and (j, i) describe the same block up to transposition, so the
    # pair is normalised to smaller-number-first before anything is fetched.
    lo_block = min(first, second)
    hi_block = max(first, second)
    lo_key = block_num_to_shard_key(lo_block, block_size, num_examples)
    hi_key = block_num_to_shard_key(hi_block, block_size, num_examples)
    meta_time = time.time() - tic

    # Phase 2: pull both shards and extract their "X_train" arrays.
    tic = time.time()
    X_lo = np.load(s3_key_to_byte_io(bucket, lo_key))["X_train"]
    X_hi = np.load(s3_key_to_byte_io(bucket, hi_key))["X_train"]
    download_time = time.time() - tic

    # Phase 3: kernel evaluation (lower-numbered block is the first argument).
    tic = time.time()
    K = computeRBFGramMatrix(X_lo, X_hi, gamma=gamma)
    kernel_time = time.time() - tic

    # Phase 4: write the block back under its output key.
    tic = time.time()
    out_key = block_num_to_output_shard_key(lo_block, hi_block, block_size, gamma, num_examples)
    save_matrix_to_s3(K, bucket, out_key)
    upload_time = time.time() - tic

    return np.array([meta_time, download_time, kernel_time, upload_time])
    
def s3_key_to_byte_io(bucket, key):
    """Fetch the S3 object ``key`` from ``bucket`` into an in-memory buffer.

    A new boto3 S3 client is created for the call; the object's body is read
    in full and returned wrapped in an ``io.BytesIO``.
    """
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket, Key=key)
    payload = response['Body'].read()
    return io.BytesIO(payload)

def block_num_to_shard_key(block_num, block_size, num_examples, key_template=KEY_TEMPLATE):
    """Return the S3 key of the input shard for block ``block_num``.

    The shard covers rows ``block_num * block_size`` up to the next block
    boundary, with the end capped at ``num_examples``. Those two indices are
    substituted into ``key_template`` as fields ``{0}`` and ``{1}``.
    """
    first_row = block_num * block_size
    # The last block may be short, so never run past the dataset size.
    next_boundary = (block_num + 1) * block_size
    last_row = min(next_boundary, num_examples)
    return key_template.format(first_row, last_row)

def block_num_to_output_shard_key(block_num_1, block_num_2, block_size, gamma, num_examples, key_template=OUT_KEY_TEMPLATE):
    """Return the S3 key under which the kernel block for a block pair is stored.

    For each of the two block numbers the covered row range is
    ``[block_size * n, min(block_size * (n + 1), num_examples))``.

    ``key_template`` is filled, in order, with: ``block_size``, ``gamma``,
    start and end of the first block, start and end of the second block.
    The caller-supplied template is honoured; it defaults to
    ``OUT_KEY_TEMPLATE``.
    """
    block_start_idx_1 = block_size * block_num_1
    block_end_idx_1 = min(block_size * (block_num_1 + 1), num_examples)
    block_start_idx_2 = block_size * block_num_2
    block_end_idx_2 = min(block_size * (block_num_2 + 1), num_examples)
    # Format the template that was passed in, not the module-level constant,
    # so that overriding key_template actually takes effect.
    return key_template.format(block_size,
                               gamma,
                               block_start_idx_1,
                               block_end_idx_1,
                               block_start_idx_2,
                               block_end_idx_2)

def save_matrix_to_s3(K, bucket, out_key):
    """Serialise ``K`` with ``np.save`` and upload it to ``bucket`` as ``out_key``.

    The array is written to an in-memory buffer first and the buffer's bytes
    are sent in a single ``put_object`` call, whose response is returned.
    """
    s3 = boto3.client('s3')
    buf = io.BytesIO()
    np.save(buf, K)
    payload = buf.getvalue()
    return s3.put_object(Key=out_key, Bucket=bucket, Body=payload)
    
def generate_chunked_block_pairs(num_blocks, inner_chunk_size=10, outer_chunk_size=1000):
    """Enumerate every unordered pair of block numbers and group them twice.

    All pairs ``(i, j)`` with ``0 <= i <= j < num_blocks`` are generated once
    each, in lexicographic order. They are split into lists of
    ``inner_chunk_size`` pairs, and those lists are in turn split into lists
    of ``outer_chunk_size`` inner chunks.

    The total number of pairs is printed as a progress hint.

    Returns a list of outer chunks; each outer chunk is a list of inner
    chunks; each inner chunk is a list of ``(i, j)`` tuples.
    """
    # combinations_with_replacement yields exactly the sorted, de-duplicated
    # pairs (diagonal included), driven by the num_blocks argument.
    block_pairs = list(itertools.combinations_with_replacement(range(num_blocks), 2))
    # Parenthesised single-argument form prints the same under Python 2 and 3.
    print(len(block_pairs))
    inner_chunks = list(chunk(block_pairs, inner_chunk_size))
    return list(chunk(inner_chunks, outer_chunk_size))

In [None]:
# 4 block pairs go to each mapped call, 500 mapped calls to each submitted batch.
chunked_blocks = generate_chunked_block_pairs(
    NUM_BLOCKS,
    inner_chunk_size=4,
    outer_chunk_size=500,
)

In [None]:
# Driver: submit the kernel computation one outer chunk (batch) at a time.
# all_futures collects every future returned by pwex.map across batches;
# all_times records the wall-clock seconds spent on each batch.
all_futures = [] 
all_times = []
# Kernel width used for this run; passed explicitly below, overriding the
# 1e-5 default of compute_kernel_blocks.
gamma = 1e-3
for c in chunked_blocks:
    t = time.time()
    # Each element of c is an inner chunk of block pairs handled by one
    # compute_kernel_blocks call; %time reports how long submission takes.
    %time kernel_futures = pwex.map(lambda x: compute_kernel_blocks(x, gamma=gamma), c)
    # NOTE(review): presumably blocks until the whole batch has finished
    # before the next one is submitted - confirm pywren.wait's default.
    %time pywren.wait(kernel_futures)
    all_futures.extend(kernel_futures)
    all_times.append(time.time() - t)

     

CPU times: user 18.7 s, sys: 1.35 s, total: 20.1 s
Wall time: 13.7 s
CPU times: user 30.4 s, sys: 3.69 s, total: 34.1 s
Wall time: 6min 53s
CPU times: user 15.6 s, sys: 1.02 s, total: 16.6 s
Wall time: 11.8 s
CPU times: user 21 s, sys: 2.57 s, total: 23.6 s
Wall time: 3min 10s
CPU times: user 18.7 s, sys: 1.33 s, total: 20 s
Wall time: 13.3 s
CPU times: user 13 s, sys: 1.84 s, total: 14.8 s
Wall time: 1min 1s
CPU times: user 17.1 s, sys: 1.2 s, total: 18.3 s
Wall time: 11.9 s
CPU times: user 13.6 s, sys: 1.9 s, total: 15.5 s
Wall time: 1min 14s
CPU times: user 16.3 s, sys: 1.2 s, total: 17.5 s
Wall time: 11.2 s
CPU times: user 14.7 s, sys: 2.06 s, total: 16.8 s
Wall time: 1min 14s
CPU times: user 16.7 s, sys: 1.22 s, total: 17.9 s
Wall time: 12 s
CPU times: user 13.7 s, sys: 1.91 s, total: 15.6 s
Wall time: 1min 7s


In [267]:
# Number of futures accumulated in all_futures by the driver loop above.
len(all_futures)

12286