In [175]:
import matplotlib.pyplot as plt
import ipyparallel as ipp
from math import sqrt
import seaborn as sns
import pandas as pd
import numpy as np
import time
import re

# Run Parallel Python

To initialize the cluster, run this command: 
`!ipcluster start --n=X` where `X` is the number of engines you want to run.

In [159]:
rc = ipp.Client()
dv = rc[:]
print(f'Running {len(dv)} nodes')

Running 50 nodes


In [6]:
df = pd.read_csv('data/fixed_data.csv')

In [197]:
data = df[:10000000]

In [198]:
data.shape

(10000000, 15)

In [199]:
baskets = data.basket_id.unique()
print(f'There are {len(baskets):,} unique baskets.')
products = data['product'].unique()
print(f'There are {len(products):,} unique products.')

There are 2,604,106 unique baskets.
There are 44 unique products.


### Single core purchase vector extraction

In [163]:
def in_basket_single(b_id):
    global output_log
    basket_ls = data.loc[data.basket_id == b_id]['product']
    return [1 if p in basket_ls.values else 0 for p in products]

### Parallel purchase vector extraction

In [164]:
@dv.parallel(block=True)
def in_basket(b_id):
    global output_log
    basket_ls = data.loc[data.basket_id == b_id]['product']
    return [1 if p in basket_ls.values else 0 for p in products]

In [165]:
dv.push(dict(data=data, products=products, output_log=output_log))

<AsyncResult: _push>

### Estimate time for all baskets in current slice

In [193]:
def estimate(num_tests):
    start = time.time()
    res_set = in_basket.map(baskets[:num_tests])
    return (((time.time() - start)/num_tests)*len(baskets))/3600

### Test the performance of single-core vs. parallel

In [183]:
def speed_test(num_tests):
    
    # Linear
    temp_set = []
    start = time.time()
    for i in range(num_tests):
        temp_set.append(in_basket_single(baskets[i]))
    timed_test = (time.time() - start)
    
    print(f'Finished linear in {round(timed_test, 2)} seconds.')
    
    # Parallel
    end_parallel = estimate(num_tests)
    print(f'Finished parallel in {round(end_parallel, 2)} seconds.')
    
    # Extrapolate for all baskets in slice
    extrapolate = lambda x: ((x/num_tests)*len(baskets))/3600
    lin_est, par_est = extrapolate(timed_test), extrapolate(end_parallel)
    
    print(f'\nEstimated Completion Time (All {len(baskets):,} Baskets):\nLinear: {round(lin_est, 1)} hours\nParallel ({len(dv)} nodes): {round(par_est, 1)} hours')
    

In [184]:
speed_test(1000) # 50 nodes / 1,000,000 slice

Finished linear in 48.88 seconds.
Finished parallel in 14.08 seconds.

Estimated Completion Time (All 473,641 Baskets):
Linear: 6.4 hours
Parallel (50 nodes): 1.9 hours


#### Find an optimal run-time based on estimates

In [201]:
# 10,000,000 slice
e = estimate(1000)

print(f'Estimated Time: {round(e, 2)} hours.\nNodes: {len(dv)}\nMatrix Size: ({len(baskets):,} x {len(products)})')

Estimated Time: 10.28 hours.
Nodes: 50
Matrix Size: (2,604,106 x 44)


## Parallel Product Feature Extraction

In [None]:
# TODO: WRITE THIS