# Multiprocessing dans Python

### Multiprocessing ?

Multiprocessing (plusiers threads dans cas de [Python](https://docs.python.org/3.5/library/multiprocessing.html#module-multiprocessing.dummy]) ) permet de paralliser l'execution des scripts/programs sequentiels. 
Parallelisation peut d'accelerer l'execution des programs.

### Machine Learning en parallel
(https://www.slideshare.net/ogrisel/strategies-and-tools-for-parallel-machine-learning-in-python)
Les slides parlent de trois cas d'utilisation differentes :
1. Evaluation d'un modéle
2. Selection d'un modéle
3. Apprentissage d'un modéle tel que RandomForest, Boosting, Bagging

Toutes les choses possible à paralliser dans Machine Learning 'classique' avec Sklearn ont déjà une bon implementation. Le seul chose à faire est de definir le keyword `n_jobs=n` ou n est le nombre des cores disponibles. Par exemple, une [GridSearch](http://scikit-learn.org/stable/tutorial/statistical_inference/putting_together.html) avec plusieurs modèles et paramètres peut d'étre lancer parallerement avec `GridSearchCV(pipe, param_grid, ... , n_jobs=8)`.

### Réseau de neurones en parallel
*Adrian et les autres fans du deep, n'hesite pas corriger si je dis les betises !*
Dans le cas des réseau de neurones, l'execution est fait souvent sur les GPUs. L'execution peut d'étre partager entre plusieurs GPUs, il y a méme une [tutoriel](https://pytorch.org/tutorials/beginner/former_torchies/parallelism_tutorial.html) dans le doc du PyTorch. Je ne sais pas, si on aura plusieurs GPUs dans le hachathon ? 

### Prétraitement et posttraitement en parallel
C'est ici, où on aura besoin definir nous méme le parallelism. Il existe une API facile dans Python : `multiprocessing.Pool` (un [tutoriel](http://chriskiehl.com/article/parallelism-in-one-line/) bon). Il y a aussi beaucoup les implementations plus haut niveau, comme [Dask](https://dask.org/). Voila une petit exemple de tout les deux :

In [None]:
import os
import sys

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# Ajoute le package local `src` pour pouvoir l'utiliser dans un notebook
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from src.data.malimg import load_malimg

import time

X,_ = load_malimg() # PREPROCESSING example, so take only labels

In [3]:
from collections import Counter

def long_feature_engineering_task(line):
    """ performs a FAKE transformation line by line (supposed to be slow so it's not vectorized)
    
    Parameter
    ---------
    line : array like
        A list full of numbers.
    
    Returns
    -------
    new (fake) field 
    """
    line_str = [str(elem) for elem in line]
    line_str_extended = line_str * 4
    big_str = '_'.join(line_str_extended)
    counter = Counter(big_str)
    return np.array(counter.most_common(3)).astype(int)[:,1].sum()

# Sans parallelism
Le fake prétraitement sans parallelism

In [4]:
start = time.time()

result_column = []
for i in range(X.shape[0]):
    result_column.append(long_feature_engineering_task(X[i, :]))

end = time.time()

print(end - start)

39.18769645690918


# Python multiprocessing

In [6]:
def hundred_lines_job(indexes):
    """ Job started by threadpool.
    
        Executes feature engineering for a hundred lines.
    """
    return [long_feature_engineering_task(X[idx,:]) for idx in indexes]

In [7]:
from multiprocessing import Pool as ThreadPool

start = time.time()

n = int(X.shape[0] / 100)
job_indexes = np.array_split(np.arange(X.shape[0]), n) # indexes for each job

pool = ThreadPool()
result = pool.map(hundred_lines_job, job_indexes)
result_column = np.concatenate(result).ravel()

end = time.time()

print(end - start)

9.675493478775024


# Dask multiprocessing

In [8]:
from dask import compute, delayed

start = time.time()

n = int(X.shape[0] / 100)
job_indexes = np.array_split(np.arange(X.shape[0]), n) # indexes for each job

jobs = [delayed(hundred_lines_job)(indexes) for indexes in job_indexes]
result = compute(*jobs, scheduler='processes')
result_column = np.concatenate(result).ravel()

end = time.time()

print(end - start)

20.716754913330078


## Conclusion

J'ai voulu faire du pub pour le bibliotheque Dask, mais vu que c'est plus lente que pure Python dans cette tache, peut étre c'est mieux aller avec `from multiprocessing import Pool` simple :D. Dask est bon, si on a beaucoup des donnees, qui ne tient pas en memoire (alternative pour Spark etc)