In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load
import os
# os.environ["OPENBLAS_NUM_THREADS"] = "1"
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn import datasets
import seaborn as sns
import pandas as pd
import numpy as np
import time
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

### Data Generation

In [2]:
from sklearn.model_selection import train_test_split

X, y = datasets.make_classification(n_samples=10000, n_features=10)
print(X.shape, y.shape)


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 2020)
print(X_train.shape, X_test.shape)
print(y_train.shape, y_test.shape)

(10000, 10) (10000,)
(8000, 10) (2000, 10)
(8000,) (2000,)


### Training

In [3]:
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import SGDClassifier
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier

clf_dict = {
    
    "NB": {'clf': GaussianNB(),'params': {}},
                    
    "LR": {'clf': LogisticRegression(),
           'params' : {'penalty': ['l1','l2', 'elasticnet'],
                       'C': [0.001,0.01,0.1,1,10,100,1000]}},
    "SVC": {'clf': LinearSVC(),
            'params': { 'penalty': ['l1','l2'],
                       'C': [0.01, 0.1,1,10, 100],}}, 

    "RF": {'clf': RandomForestClassifier(),
           'params': {}},
    
            }

In [4]:
from sklearn.model_selection import GridSearchCV 
from sklearn.metrics import accuracy_score
import joblib

def classify_data(X_train, y_train, X_test, y_test, verbose = True):
    
    results_df = pd.DataFrame(data=np.zeros(shape=(len(clf_dict), 4)),
                              columns = ['classifier', 'training_time',
                                         'train_acc', 'test_acc'])
    
    for i, key in enumerate(clf_dict.keys()):
        start = time.time()
        cv = GridSearchCV(clf_dict[key]['clf'],
                          clf_dict[key]['params'],
                          cv = 5, refit  = True,
                          scoring = 'accuracy',
                          n_jobs = -1)
        
        estimator = cv.fit(X_train, y_train)
        clf_dict[key]['best_params'] = cv.best_params_
        end = time.time()
        tdiff = end - start
        joblib.dump(cv.best_estimator_, f"{key}.joblib")
        if verbose:
            print("{c} training time: {f:.2f} s".format(c=key,f=tdiff))

        train_score = estimator.score(X_train,y_train)
        test_score = estimator.score(X_test,y_test)
        results_df.loc[i,'classifier'] = key
        results_df.loc[i,'train_acc'] = train_score
        results_df.loc[i,'test_acc'] = test_score
        results_df.loc[i,'training_time'] = tdiff
        
    return results_df

In [5]:
res_df = classify_data(X_train, y_train, X_test, y_test)
display(res_df.sort_values(by='test_acc', ascending=False))

NB training time: 1.73 s
LR training time: 0.40 s




SVC training time: 2.92 s
RF training time: 7.33 s


Unnamed: 0,classifier,training_time,train_acc,test_acc
3,RF,7.327809,1.0,0.938
1,LR,0.398792,0.912875,0.921
2,SVC,2.917563,0.91275,0.9205
0,NB,1.731807,0.910375,0.9185


### Inference

In [6]:
import sklearn
sklearn.__version__, joblib.__version__

('0.23.2', '1.0.0')

In [7]:
saved_models = [pt for pt in os.listdir('.') if pt.endswith('joblib')]
saved_models

['RF.joblib', 'SVC.joblib', 'LR.joblib', 'NB.joblib']

In [8]:
import joblib

def load_sklearn_joblib_model(path, n_jobs = 1):
    # Load from file
    model = joblib.load(path)

    # make n_jobs = 1, to avoid oversubcription.
    # because esemble models like RF,GBM are already parallelized even for predict method.
    # ensemble models build the trees parallely using all cores.
    if 'n_jobs' in model.get_params().keys():
        n_jobs_param = {'n_jobs': n_jobs}
        model.set_params(**n_jobs_param)
        
    return model

### Will Parallelizing the predict method (n_jobs > 1) for sklearn improve inference time ?

- We can use Joblib to parallelize the predict method of an algo if it is not already.
- If it is already parallelized, we can use n_jobs parameter.

### How to Tell if an algo used sklearn is already parallelized ?

- A Hack: using CPUs/second. What that means ?

If you ever used %%time magic command, it will give two outputs

`CPU times: user 42.2 ms, sys: 6.23 ms, total: 48.4 ms
 Wall time: 47 ms`
 
 CPU times: Total time your cpus took (all cores).
 Wall time: Time took to run the function (Wait time to get the result).
 
 `CPU/second = CPU times / Wall time`
 
 - if CPU/s =~ 1 , it means the algo is running only on single core using it 100%.
 - if CPU/s >> 1 (significantly), multiple cores are being used i.e. parallelized.
 
 - if CPU/s << 1: The lower the number, the more of its time the process spent waiting (for the network, or the harddrive, or locks, or other processes to release the CPU, or just sleeping). E.g. if CPU/s is 0.75, 25% of the time was spent waiting.

To explore more on this:

https://pythonspeed.com/articles/blocking-cpu-or-io/

In [9]:
def timer(f):
    """
    A simple python decorator
    to calculate CPU & Wall times
    of any function
    """
    
    import time
    def timed(*args, **kw):
        cs, ws = time.process_time(), time.time()
        result = f(*args, **kw)
        ce, we = time.process_time(), time.time()
        ct, wt = ce-cs, we-ws
        print(f"func: {f.__name__}, CPU/s: {ct/wt:.4f}")
        print(f"CPUtimes: {ct:.4f} s , Walltime: {wt:.4f} s") 
        return result
    return timed


@timer
def joblib_predict(model, X):
    predictions = model.predict(X)
    return predictions.tolist()

In [10]:
# generate fake data for inference
X_infer, y_infer = datasets.make_classification(n_samples=20000, n_features=10)
print('Input:', X_infer.shape)


for load_path in saved_models:
    print('\nModel: ',load_path)
    clf = load_sklearn_joblib_model(load_path)
    time.sleep(3)
    preds = joblib_predict(clf, X_infer)
    time.sleep(10)
    del clf, preds

Input: (20000, 10)

Model:  RF.joblib
func: joblib_predict, CPU/s: 0.9998
CPUtimes: 0.2574 s , Walltime: 0.2574 s

Model:  SVC.joblib
func: joblib_predict, CPU/s: 1.9948
CPUtimes: 0.0143 s , Walltime: 0.0071 s

Model:  LR.joblib
func: joblib_predict, CPU/s: 2.7214
CPUtimes: 0.0047 s , Walltime: 0.0017 s

Model:  NB.joblib
func: joblib_predict, CPU/s: 1.0004
CPUtimes: 0.0043 s , Walltime: 0.0043 s


#### From the above results:
 - LR - predict is parallelized.
 - GaussianNB - No n_jobs & predict method is not parallelized.
 - LinearSVC - No n_jobs & predict method runs in parallel.
 - RandomForest - parallelized, but can be controlled using n_jobs.

In [11]:
from sklearn.utils import gen_batches
from joblib import Parallel, delayed, cpu_count
print('num of cpus:', cpu_count())

@timer
def joblib_parallel_predict(model, X, 
                            jb_kwargs = {'prefer': "threads",
                                         'require': "sharedmem"}):
    n_jobs = max(cpu_count(), 1)
    slices = gen_batches(len(X), len(X)//n_jobs)
    parallel = Parallel(n_jobs=n_jobs, **jb_kwargs)
    results = parallel(delayed(model.predict)(X[s]) for s in slices)
    return np.vstack(results).flatten().tolist()

num of cpus: 4


- Since, The predict method of NB is not parallelized, I will try to use Joblib's Parallel & delayed functions to make it Parallel across all the cores.

- What happens if try to parallelize the code which is already runs in all cores ? It results in oversubcriptions of threads, increasing overhead. So the function runs even slower.

In [12]:
load_path = 'NB.joblib'
print('\nModel: ', load_path)
clf = load_sklearn_joblib_model(load_path)
time.sleep(3)
preds = joblib_parallel_predict(clf, X_infer)
assert len(preds) == len(X_infer)
del clf, preds


Model:  NB.joblib
func: joblib_parallel_predict, CPU/s: 0.1467
CPUtimes: 0.0157 s , Walltime: 0.1071 s


- One thing to notice: CPU/s is increased only slightly (i.e only using all cores partially), Parallelized NB still takes way more time than original one (0.10 > 0.0047) for batch size of 20k. 

- This is due to the overhead of slicing data & accumulating the results. Also, sklearn models are mostly optimized for larger batches. So let us check at what is minimum batch size to use to leverage power of parallelzing.

In [13]:
for ns in [10, 100, 10**3, 10**4, 10**5, 10**6, 10**7]:
    load_path = 'NB.joblib'
    print('Model: ', load_path)
    X_infer, _ = datasets.make_classification(n_samples=ns, n_features=10)
    
    # direct predict
    print('\nInput:', X_infer.shape, 'Parallel: ', False)
    clf = load_sklearn_joblib_model(load_path)
    time.sleep(5)
    preds = joblib_predict(clf, X_infer)
    time.sleep(10)
    del clf, preds
    
    # parallel predict threads
    print('\nInput:', X_infer.shape, 'Parallel: ', 'threads')
    clf = load_sklearn_joblib_model(load_path)
    time.sleep(5)
    preds = joblib_parallel_predict(clf, X_infer)
    assert len(preds) == len(X_infer)
    time.sleep(10)
    del clf, preds
    
    # parallel predict processes
    print('\nInput:', X_infer.shape, 'Parallel: ', 'processes')
    clf = load_sklearn_joblib_model(load_path)
    time.sleep(5)
    preds = joblib_parallel_predict(clf, X_infer, jb_kwargs = {'prefer': 'processes'})
    assert len(preds) == len(X_infer)
    print('-'*50)
    time.sleep(10)
    del clf, preds

Model:  NB.joblib

Input: (10, 10) Parallel:  False
func: joblib_predict, CPU/s: 1.0362
CPUtimes: 0.0004 s , Walltime: 0.0004 s

Input: (10, 10) Parallel:  threads
func: joblib_parallel_predict, CPU/s: 0.0757
CPUtimes: 0.0078 s , Walltime: 0.1034 s

Input: (10, 10) Parallel:  processes
func: joblib_parallel_predict, CPU/s: 0.8672
CPUtimes: 0.0135 s , Walltime: 0.0156 s
--------------------------------------------------
Model:  NB.joblib

Input: (100, 10) Parallel:  False
func: joblib_predict, CPU/s: 1.0074
CPUtimes: 0.0006 s , Walltime: 0.0006 s

Input: (100, 10) Parallel:  threads
func: joblib_parallel_predict, CPU/s: 0.0625
CPUtimes: 0.0065 s , Walltime: 0.1035 s

Input: (100, 10) Parallel:  processes
func: joblib_parallel_predict, CPU/s: 0.9156
CPUtimes: 0.0093 s , Walltime: 0.0101 s
--------------------------------------------------
Model:  NB.joblib

Input: (1000, 10) Parallel:  False
func: joblib_predict, CPU/s: 1.0058
CPUtimes: 0.0006 s , Walltime: 0.0006 s

Input: (1000, 10) Pa

- only at batch size >= 10**6, the parallelizing is showing some use
- But one important thing is CPU/s is << 1 for lower/mini batches i.e indicating that CPU is waiting most of the time, can we levarge that somehow ?

In [14]:
X_infer, _ = datasets.make_classification(n_samples=100000, n_features=10)
X_infer.shape

(100000, 10)

In [15]:
clf = load_sklearn_joblib_model(load_path)
r = joblib_predict(model = clf, X = X_infer)
# print(len(r))

func: joblib_predict, CPU/s: 1.0338
CPUtimes: 0.0202 s , Walltime: 0.0195 s


In [16]:
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import asyncio

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)


async def joblib_predict_async(**kwargs):
    loop = asyncio.get_running_loop()
    # func_args = {'model': clf, 'X': X_infer}
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, partial(joblib_predict, **kwargs))
        # print('thread pool', len(result))
    return result

In [17]:
clf = load_sklearn_joblib_model(load_path)
r = await joblib_predict_async(model = clf, X = X_infer)
# print(len(r))

func: joblib_predict, CPU/s: 1.0408
CPUtimes: 0.0227 s , Walltime: 0.0218 s


In [18]:
@timer
async def predict_parallel_async(model, X):
    loop = asyncio.get_event_loop()
    n_jobs = max(cpu_count(), 1)
    slices = gen_batches(len(X), len(X)//n_jobs)

    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        tasks = [loop.run_in_executor(pool, model.predict, X[s]) for s in slices]

    completed, pending = await asyncio.wait(tasks)
    results = []
    for t in completed:
        results.extend(t.result().tolist())
    return results

In [19]:
s= time.time()
clf = load_sklearn_joblib_model(load_path)
r = await predict_parallel_async(model = clf, X = X_infer)
print(time.time() - s)
print(len(r))

func: predict_parallel_async, CPU/s: 2.2230
CPUtimes: 0.0000 s , Walltime: 0.0000 s
0.02070903778076172
100000


- Asyncio also didn't help to improve inference times at smaller batches (expected as model.predict is cpu bound.)

- Joblib natively wont support async/await.

### WHY PARALLELIZING DIDN'T HELP ?

#### Training:

1. Both Thread Based & Process Based Parallelism can help. Why ?

- Because Training is a iterative process, the observarions must be used for train the algo multiple times. So in this case, With little overhead of creating process & sharing data the training can be faster.

- Also Training is done on large amount of data, so large batch sizes helps. (sklearn & numpy are optimized for matrix/larger batch calculations)

#### Predictions:

- Prediction is a single step process (we only use observation once to get prediction), So using process based parallelism adds unneccessary overhead and will not help in most cases.


- Usually predictions/inference happens on smaller set of data. Thread based parallelism can help to speed up the algo but if inference data is small then overhead of creating threads can overtake normal execution time. so If inference is running on smaller batches/single observation it may not improve the results.


- https://github.com/scikit-learn/scikit-learn/issues/7448
- https://github.com/scikit-learn/scikit-learn/pull/16310



### Solution to Faster & Scalable Machine Learning Inference APIs:

This can be tricky to answer without excat suitation but here are some options to try.

1. Make code/algo simpler so that it can run the predictions for single/mini-batch observation(s) in minimal possible time. Further, combine this ASGI server (reduces I/O time) with multiple workers for better request latency.

some framework(s) doing this: 
1. onnx (supports sklearn, xgboost & lgbm)
2. river

https://cloudblogs.microsoft.com/opensource/2020/12/17/accelerate-simplify-scikit-learn-model-inference-onnx-runtime/


OR

2. Use a Queue to collect the request, make predictions in batches & return response for all at once. Further, this can be also combined with ASGI server for better latency.

some framework(s) doing this: 

1. tf-serving (tensorflow)
2. BentoML (all major ml frameworks)
3. Clipper

OR

3.Other Alternative(s): 

- Using Distrubuted systems (Spark, Dask)
- Humming Bird (https://github.com/microsoft/hummingbird)
- Processor specific accelrators (like Intel's OpenVINO)
- Using GPU based infernece (TensorRT)