# Introduction to Joblib - Easy parallelization with joblib

    - What is joblib?

        joblib is a Python library that provides several useful tools, such as caching the output of functions for given inputs, logging/tracing, and easy parallelization.

    - Purpose of this Jupyter Notebook:

        Today we'll focus on the last of these, parallelization. joblib's Parallel is essentially a wrapper around process-based parallelism: by default it uses the loky backend (as the logs below show), and it can also use Python's built-in multiprocessing module.

        In practice, it's an extremely easy way to write a parallelized for loop, which I find very useful when constructing features for our extremely large training dataset.

    - Reference: https://pypi.org/project/joblib/


In [1]:
import numpy as np 
import pandas as pd 

# A first look at the joblib package

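    - The two most important tools in joblib are Parallel and delayed:

        from joblib import Parallel, delayed


    - delayed:

        wraps the function you want to execute. Calling delayed(f)(*args, **kwargs) does not run f; it returns a (function, args, kwargs) tuple describing the call.

    - Parallel:

        is a class whose instances can be called with an iterable of such (function, args, kwargs) tuples; it executes them (in parallel when n_jobs > 1) and returns the results as a list, in the same order as the inputs.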
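To make the description of delayed concrete, here is a small sketch, assuming a recent joblib version. The function add and the helper run_sequentially are hypothetical illustrations; run_sequentially is not how Parallel is implemented, only the sequential equivalent of what it computes from the task tuples.

```python
from joblib import delayed


def add(a, b=0):
    """Hypothetical toy function used only for this illustration."""
    return a + b


# Calling the wrapper returned by delayed(add) records the call instead of running it
task = delayed(add)(2, b=3)
func, args, kwargs = task
print(func is add, args, kwargs)  # True (2,) {'b': 3}


def run_sequentially(tasks):
    """Sequential stand-in for Parallel: call each recorded function, keeping input order."""
    return [f(*a, **kw) for f, a, kw in tasks]


print(run_sequentially(delayed(add)(i, b=10) for i in range(3)))  # [10, 11, 12]
```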

In [2]:
from joblib import Parallel, delayed

Now consider an ordinary single-core for loop (written as a list comprehension):

In [3]:
[np.power(i, 2) for i in range(10)]

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

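Now the same computation with Parallel and delayed, still on one core (without arguments, Parallel runs the tasks sequentially in a single worker):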

In [5]:
Parallel()(delayed(np.power)(i, 2) for i in range(10))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

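To compute in parallel, set n_jobs in Parallel() to the number of worker processes you want; n_jobs=-1 means "use all available CPU cores". The verbose argument makes Parallel print progress messages as tasks finish (higher values print more often).

p.s. "verbose" means wordy or long-winded.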
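The n_jobs values follow a simple rule described in the joblib.Parallel documentation: positive values request that many workers, -1 means all CPUs, and values below -1 mean (number of CPUs + 1 + n_jobs), so -2 is all CPUs but one. The helper below is a hypothetical sketch of that rule for illustration, not a joblib function:

```python
import os


def resolve_n_jobs(n_jobs, n_cpus=None):
    """Hypothetical helper: number of workers a given n_jobs value asks for."""
    if n_cpus is None:
        n_cpus = os.cpu_count() or 1
    if n_jobs == 0:
        raise ValueError("n_jobs == 0 has no meaning")
    if n_jobs < 0:
        # -1 -> all CPUs, -2 -> all but one, ...; never fewer than one worker
        return max(1, n_cpus + 1 + n_jobs)
    return n_jobs


print(resolve_n_jobs(-1, n_cpus=8))  # 8
print(resolve_n_jobs(-2, n_cpus=8))  # 7
print(resolve_n_jobs(4, n_cpus=8))   # 4
```

With 8 CPUs, n_jobs=-1 resolves to 8 workers, which matches the "8 concurrent workers" line in the log below. To check your own machine, joblib.cpu_count() returns the number of CPUs joblib sees.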


In [7]:
Parallel(n_jobs=-1,verbose=1)(delayed(np.power)(i,2) for i in range(10))

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    0.4s remaining:    0.3s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    0.4s finished


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Complex Parallel Computing

    np.power is not a function whose calls you would normally need to parallelize: each call is so fast that the overhead of parallelization (starting worker processes and shipping arguments and results between them) outweighs any gain.

    Here's a function that actually takes a while to compute:

    - Linear convolution: https://baike.baidu.hk/item/%E7%B7%9A%E6%80%A7%E5%8D%B7%E7%A9%8D/5908978

        Readers don't need to fully understand what convolution is; it's enough to know that it becomes an expensive computation as the input size grows.
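For intuition only, here is a plain-Python sketch of what a full linear convolution computes (the default mode of np.convolve; NumPy's own implementation is compiled and much faster). linear_convolve is a hypothetical helper. The nested loop performs len(a) × len(b) multiply-adds, so the cost grows quickly with size, and the output has len(a) + len(b) - 1 elements.

```python
def linear_convolve(a, b):
    """Full linear convolution: out[k] = sum over i of a[i] * b[k - i]."""
    out = [0] * (len(a) + len(b) - 1)
    # Every pair (i, j) contributes once, so the work is len(a) * len(b)
    for i, x in enumerate(a):
        for j, y in enumerate(b):
            out[i + j] += x * y
    return out


print(linear_convolve([1, 2, 3], [1, 1]))          # [1, 3, 5, 3]
print(len(linear_convolve([0] * 100, [0] * 100)))  # 199
```

That length rule explains the sizes 199 and 219 printed below for inputs of length 100 and 110.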

In [8]:
def convolve_random(size):
    ''' Convolve two random arrays of length "size" '''
    return np.convolve(np.random.random_sample(size), np.random.random_sample(size))

In [9]:
convolve_random(10)

array([3.25184162e-01, 1.08763762e+00, 1.19716088e+00, 1.12011675e+00,
       1.24492314e+00, 1.26467220e+00, 1.39385762e+00, 1.84781234e+00,
       2.29287844e+00, 3.03601823e+00, 1.69915964e+00, 1.44255611e+00,
       1.21953639e+00, 9.94905586e-01, 1.05836613e+00, 1.08608258e+00,
       8.95703072e-01, 3.80152711e-01, 1.44145227e-03])

In [10]:
testing_loop_generate_list  = [ convolve_random(100 + i*10) for i in range(2)]

print("Len of loop :"  ,len(testing_loop_generate_list))
print("size of loop 1 :" ,len(testing_loop_generate_list[0]))
print("size of loop 2 :" ,len(testing_loop_generate_list[1]))

Len of loop : 2
size of loop 1 : 199
size of loop 2 : 219


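Now we build a list of 8 convolve_random results, with array lengths from 40,000 to 47,000, and time it with an ordinary sequential list comprehension: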

In [11]:
# Time the sequential version: 8 convolutions with array lengths from 40000 to 47000
%timeit [convolve_random(40000 + i*1000) for i in range(8)]

9.2 s ± 243 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


As you can see, this already takes a lot of computation time (about 9 seconds per run). Next, let's see how long it takes if we use Parallel:

In [12]:
%timeit Parallel(n_jobs=8,verbose=1)(delayed(convolve_random)(40000 + i*1000) for i in range(8))

[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done   2 out of   8 | elapsed:    0.6s remaining:    1.8s
[Parallel(n_jobs=8)]: Done   8 out of   8 | elapsed:    0.8s finished
[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done   2 out of   8 | elapsed:    0.6s remaining:    1.8s
[Parallel(n_jobs=8)]: Done   8 out of   8 | elapsed:    0.7s finished
[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done   2 out of   8 | elapsed:    0.5s remaining:    1.6s
[Parallel(n_jobs=8)]: Done   8 out of   8 | elapsed:    0.7s finished
[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done   2 out of   8 | elapsed:    0.6s remaining:    1.8s
[Parallel(n_jobs=8)]: Done   8 out of   8 | elapsed:    0.8s finished
[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done   2 out of   8 | elapsed:    0.5s remaining:    1.6s
[Parallel(n_jobs=8)]: Done   8 out of   8 | elapsed:    0.7s finished

729 ms ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


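We can see a major difference in computing time between the two methods: about 0.73 s with Parallel versus about 9.2 s for the sequential loop, more than 12 times faster on this machine with 8 workers.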

# More Complex Parallel Computing

    - The brutally honest question is: in what situations is joblib's parallel computing actually worth using?

        In my opinion, it's when you "must" loop through a bunch of files and perform the same task on each file, for example:

        - the order-book file of each stock_id (extremely large files)
        - the option file of each stock_id    (extremely large files)

In [45]:
# Root folder of the competition data (change this to your own path)
data_dir = r'/Users/chen-lichiang/Desktop/python/Optiver Kaggle /Deveopment/'

# Run the preprocessing for each stock_id in parallel and concatenate the results
def preprocessor(list_stock_ids):
    

    def orderbook_featrue_engineeering(order_book_df):
        """
        for single stock_id's orderbook
        """

        order_book_df['function_checking'] = "check"

        return order_book_df

    # Body of the parallel for loop: all the work for one stock_id
    def for_joblib(stock_id):
        """
        Put everything that needs to be done for each unique stock_id here.
        """

        file_path_book  = data_dir + "book_train.parquet/stock_id="  + str(stock_id)
        file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)

        orderbook_file  = pd.read_parquet(file_path_book)  # could just as easily be another storage format, e.g. pickle or SQL
        orderbook_file  = orderbook_featrue_engineeering(orderbook_file)

        tradebook_file  = pd.read_parquet(file_path_trade)

        combine_file = pd.merge(orderbook_file,tradebook_file)
        combine_file['stock_id'] = stock_id

        return combine_file

    # Call for_joblib once per stock_id, spread across all CPU cores (n_jobs=-1)
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    df = pd.concat(df,ignore_index=True)

    return df 

preprocessor(list_stock_ids=[i for i in range(0,5)])

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   2 out of   5 | elapsed:    0.7s remaining:    1.0s
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    1.0s finished


Unnamed: 0,time_id,seconds_in_bucket,bid_price1,ask_price1,bid_price2,ask_price2,bid_size1,ask_size1,bid_size2,ask_size2,function_checking,price,size,order_count,stock_id
0,5,21,1.001422,1.002818,1.001370,1.002922,3,30,2,100,check,1.002301,326,12,0
1,5,46,1.002818,1.003232,1.002301,1.003801,155,1,200,34,check,1.002778,128,4,0
2,5,50,1.002353,1.003025,1.002301,1.003232,3,20,100,301,check,1.002818,55,1,0
3,5,57,1.002508,1.003646,1.002457,1.003749,200,104,28,200,check,1.003155,121,5,0
4,5,68,1.002870,1.003749,1.002818,1.003801,100,100,11,34,check,1.003646,4,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1273020,32767,504,0.998583,0.999239,0.998566,0.999274,100,11,100,32,check,0.998902,1,1,4
1273021,32767,520,0.999203,0.999894,0.999186,0.999947,100,7,200,58,check,0.999437,200,7,4
1273022,32767,521,0.999203,0.999876,0.999186,0.999894,100,100,200,7,check,0.999805,1,1,4
1273023,32767,553,0.999203,1.000354,0.999079,1.000407,100,17,25,25,check,0.999664,1,1,4
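Note that for_joblib is defined inside preprocessor. The standard-library pickle module cannot serialize such nested functions, which the sketch below checks (make_task and std_pickle_can_serialize are hypothetical helpers). joblib's default loky backend relies on cloudpickle to send functions to worker processes, which is why the nested for_joblib above still runs in parallel.

```python
import pickle


def make_task():
    # A function defined inside another function, like for_joblib inside preprocessor
    def inner(stock_id):
        return stock_id * 2
    return inner


def std_pickle_can_serialize(obj):
    """Return True if the standard-library pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(std_pickle_can_serialize(make_task()))  # False
print(std_pickle_can_serialize(len))          # True
```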


# Create "Dict for aggregations" in Parallel Computing
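The key step in the next cell is get_stats_window: filter rows by seconds_in_bucket, group by time_id, and aggregate with a dict that maps each column to a list of statistics. A minimal sketch on made-up toy data shows where column names such as time_id_ and ask_price1_sum_450 come from. stats_window is a hypothetical re-implementation of the same steps; it uses the string names "sum", "mean", "std" instead of np.sum, np.mean, np.std, which some recent pandas versions flag with a FutureWarning.

```python
import pandas as pd

# Toy order-book rows (made-up numbers)
book = pd.DataFrame({
    "time_id":           [5, 5, 5, 11, 11],
    "seconds_in_bucket": [10, 200, 500, 100, 460],
    "ask_price1":        [1.0, 2.0, 3.0, 4.0, 6.0],
})


def stats_window(df, seconds_in_bucket, add_suffix=False):
    """Same steps as get_stats_window, with string aggregation names."""
    agg_spec = {"ask_price1": ["sum", "mean", "std"]}
    out = (
        df[df["seconds_in_bucket"] >= seconds_in_bucket]
        .groupby(["time_id"])
        .agg(agg_spec)
        .reset_index()
    )
    # MultiIndex columns ("time_id", "") and ("ask_price1", "sum")
    # become "time_id_" and "ask_price1_sum"
    out.columns = ["_".join(col) for col in out.columns]
    if add_suffix:
        out = out.add_suffix("_" + str(seconds_in_bucket))
    return out


full = stats_window(book, 0)
late = stats_window(book, 450, add_suffix=True)
print(list(full.columns))  # ['time_id_', 'ask_price1_sum', 'ask_price1_mean', 'ask_price1_std']
print(list(late.columns))  # ['time_id__450', 'ask_price1_sum_450', 'ask_price1_mean_450', 'ask_price1_std_450']

# Same kind of left merge as in the cell below
merged = full.merge(late, how="left", left_on="time_id_", right_on="time_id__450")
print(merged[["time_id_", "ask_price1_sum", "ask_price1_sum_450"]])
```

A window that keeps only one row per time_id (as the 450-second window does here) yields NaN for std, since pandas uses the sample standard deviation.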

In [89]:
# Root folder of the competition data (change this to your own path)
data_dir = r'/Users/chen-lichiang/Desktop/python/Optiver Kaggle /Deveopment/'

# Run the preprocessing for each stock_id in parallel and concatenate the results
def preprocessor(list_stock_ids):
    

    def orderbook_featrue_engineeering(order_book_df, stock_id):
        """
        for single stock_id's orderbook
        """

        order_book_df['function_checking'] = "check"

        df_feature     = get_stats_window(df=order_book_df,seconds_in_bucket = 0, add_suffix = False)
        df_feature_450 = get_stats_window(df=order_book_df,seconds_in_bucket = 450, add_suffix = True)
        df_feature_300 = get_stats_window(df=order_book_df,seconds_in_bucket = 300, add_suffix = True)
        df_feature_150 = get_stats_window(df=order_book_df,seconds_in_bucket = 150, add_suffix = True)

        # Merge all
        df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
        df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
        df_feature = df_feature.merge(df_feature_150, how = 'left', left_on = 'time_id_', right_on = 'time_id__150')

        # Drop the now-unnecessary windowed time_id key columns
        df_feature.drop(['time_id__450', 'time_id__300', 'time_id__150'], axis = 1, inplace = True)

        return df_feature

    
    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(df,seconds_in_bucket, add_suffix = False):

        # Dict for aggregations
        create_feature_dict = {
            'ask_price1': [np.sum, np.mean, np.std],
            'ask_price2': [np.sum, np.mean, np.std],
        }

        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()

        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]

        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))


        return df_feature

    # Body of the parallel for loop: all the work for one stock_id
    def for_joblib(stock_id):
        """
        Put everything that needs to be done for each unique stock_id here.
        """

        file_path_book  = data_dir + "book_train.parquet/stock_id="  + str(stock_id)
        file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)

        orderbook_file  = pd.read_parquet(file_path_book)  # could just as easily be another storage format, e.g. pickle or SQL
        orderbook_file  = orderbook_featrue_engineeering(orderbook_file,stock_id)
        tradebook_file  = pd.read_parquet(file_path_trade)

        combine_file = pd.merge(orderbook_file,tradebook_file,left_on='time_id_',right_on='time_id')

        return combine_file

    # Call for_joblib once per stock_id, spread across all CPU cores (n_jobs=-1)
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    df = pd.concat(df,ignore_index=True)

    return df 

data = preprocessor(list_stock_ids=[i for i in range(0,5)])
data

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   2 out of   5 | elapsed:    1.8s remaining:    2.7s
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    2.3s finished


Unnamed: 0,time_id_,ask_price1_sum,ask_price1_mean,ask_price1_std,ask_price2_sum,ask_price2_mean,ask_price2_std,ask_price1_sum_450,ask_price1_mean_450,ask_price1_std_450,...,ask_price1_mean_150,ask_price1_std_150,ask_price2_sum_150,ask_price2_mean_150,ask_price2_std_150,time_id,seconds_in_bucket,price,size,order_count
0,5,303.259064,1.004169,0.000601,303.304626,1.00432,0.000604,68.264679,1.003892,0.000438,...,1.004295,0.000407,233.030380,1.004441,0.000395,5,21,1.002301,326,12
1,5,303.259064,1.004169,0.000601,303.304626,1.00432,0.000604,68.264679,1.003892,0.000438,...,1.004295,0.000407,233.030380,1.004441,0.000395,5,46,1.002778,128,4
2,5,303.259064,1.004169,0.000601,303.304626,1.00432,0.000604,68.264679,1.003892,0.000438,...,1.004295,0.000407,233.030380,1.004441,0.000395,5,50,1.002818,55,1
3,5,303.259064,1.004169,0.000601,303.304626,1.00432,0.000604,68.264679,1.003892,0.000438,...,1.004295,0.000407,233.030380,1.004441,0.000395,5,57,1.003155,121,5
4,5,303.259064,1.004169,0.000601,303.304626,1.00432,0.000604,68.264679,1.003892,0.000438,...,1.004295,0.000407,233.030380,1.004441,0.000395,5,68,1.003646,4,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1273020,32767,245.918777,0.999670,0.000402,245.938507,0.99975,0.000405,75.968857,0.999590,0.000398,...,0.999638,0.000435,200.944031,0.999722,0.000440,32767,504,0.998902,1,1
1273021,32767,245.918777,0.999670,0.000402,245.938507,0.99975,0.000405,75.968857,0.999590,0.000398,...,0.999638,0.000435,200.944031,0.999722,0.000440,32767,520,0.999437,200,7
1273022,32767,245.918777,0.999670,0.000402,245.938507,0.99975,0.000405,75.968857,0.999590,0.000398,...,0.999638,0.000435,200.944031,0.999722,0.000440,32767,521,0.999805,1,1
1273023,32767,245.918777,0.999670,0.000402,245.938507,0.99975,0.000405,75.968857,0.999590,0.000398,...,0.999638,0.000435,200.944031,0.999722,0.000440,32767,553,0.999664,1,1


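Above, we don't aggregate the trade-book dataset, so the merge produces one row per trade: the order-book features on the left side of the DataFrame are repeated for every trade row of the same time_id, and the key appears twice (time_id_ and time_id).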
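One possible way to avoid the repetition (an assumption, not what the notebook does) is to aggregate the trade book per time_id before merging, and then drop the redundant key. A sketch on made-up toy data, with hypothetical feature names trade_size_sum and trade_price_mean, using pandas named aggregation:

```python
import pandas as pd

# Toy inputs (made-up numbers): one row per time_id of order-book features,
# and several trade rows per time_id
book_features = pd.DataFrame({"time_id_": [5, 11], "ask_price1_sum": [6.0, 10.0]})
trades = pd.DataFrame({
    "time_id": [5, 5, 5, 11, 11],
    "price": [1.00, 1.01, 1.02, 0.99, 1.00],
    "size": [100, 50, 10, 20, 30],
})

# Merging raw trades repeats each time_id's book features on every trade row
raw = book_features.merge(trades, left_on="time_id_", right_on="time_id")

# Aggregating trades first gives one row per time_id instead
trade_features = (
    trades.groupby("time_id")
    .agg(trade_size_sum=("size", "sum"), trade_price_mean=("price", "mean"))
    .reset_index()
)
combined = (
    book_features.merge(trade_features, left_on="time_id_", right_on="time_id")
    .drop(columns="time_id")  # keep a single copy of the key
)

print(len(raw), len(combined))  # 5 2
print(combined)
```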

# That's a basic introduction to parallel for loops with joblib. I hope it's helpful to you!