# Parallelizing your code

In [1]:
import numpy as np

In [2]:
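import multiprocessing as mp
# Only needed in Jupyter: "fork" lets worker processes use functions defined in the notebook.
# "fork" is not available on Windows; don't do this in a regular Python script.
mp.set_start_method("fork")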
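In a plain `.py` script you would normally leave the start method at its default. Instead, put the code that creates worker processes behind an `if __name__ == "__main__":` guard. With the "spawn" start method (the default on Windows and macOS), worker processes re-import your module, and the guard stops them from re-running it. The following is a minimal sketch of such a script. `square` and `main` are hypothetical names used only for illustration:

```python
from concurrent.futures import ProcessPoolExecutor


def square(x):
    # Hypothetical example task; defined at module top level so it can be pickled
    return x * x


def main():
    # The pool is created only inside main(), which runs only when the file is executed directly
    with ProcessPoolExecutor(max_workers=2) as executor:
        return list(executor.map(square, range(5)))


if __name__ == "__main__":
    # Worker processes that re-import this module skip this block
    print(main())  # [0, 1, 4, 9, 16]
```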

Oftentimes we want to perform multiple tasks simultaneously and leverage multiple cores.

Previously we would use the `threading` or `multiprocessing` library directly to achieve this, but used directly they are pretty terrible to work with, for reasons including but not limited to:

- Scripts freezing due to deadlocks
- Zombie Python processes appearing
- Scripts not finishing at the end
- All other sadness


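Instead, we will use the `concurrent.futures` module, a higher-level interface built on top of `threading` and `multiprocessing`, to make this experience as smooth as possible. Before trying it, some background.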

Python has two modules for parallel execution: `threading` and `multiprocessing`. Threading runs jobs concurrently in the same Python process, while multiprocessing spawns new Python processes.

The difference matters because of the Global Interpreter Lock (GIL). Within a single Python process, only one thread can execute Python code at a time. However, if a task releases this lock, another thread can run in the meantime. Blocking I/O, such as reading from a file, and certain compiled functions (like Numba functions compiled with `nogil=True`) release the lock and allow threads to run concurrently.
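The following small sketch shows the lock being released. `time.sleep` releases the GIL while it waits, standing in here for blocking I/O. As a result, four sleeping tasks in a four-thread pool overlap instead of running back to back. The `wait` function and the 0.2 s delay are arbitrary choices for illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait(seconds):
    # time.sleep releases the GIL, so other threads can run meanwhile
    time.sleep(seconds)
    return seconds


delays = [0.2] * 4

# Run the four waits one after another
start = time.perf_counter()
for d in delays:
    wait(d)
serial_time = time.perf_counter() - start

# Run them on four threads; the sleeps overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(wait, delays))
threaded_time = time.perf_counter() - start

print(f"serial:   {serial_time:.2f} s")    # roughly 0.8 s
print(f"threaded: {threaded_time:.2f} s")  # roughly 0.2 s
```

A CPU-bound pure-Python function that never releases the lock would see little or no speedup from the same thread pool.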

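Multiprocessing overcomes this by creating new Python processes, each with its own interpreter and its own GIL, and running tasks there. However, all data has to be sent to and from those processes. The function, its arguments and its results are pickled and passed between processes, which adds overhead and means they must be picklable.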
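Everything sent to a `ProcessPoolExecutor` worker is pickled. The task function therefore has to be something `pickle` can serialize by name, such as a function defined at the top level of a module. The sketch below shows one quick way to check this. `is_picklable` and `top_level` are hypothetical helpers, not part of any library:

```python
import pickle


def is_picklable(obj):
    """Return True if obj can be serialized with pickle (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def top_level(x):
    # Module-level functions are pickled by reference to their qualified name
    return x + 1


print(is_picklable(top_level))         # True
print(is_picklable(lambda x: x + 1))   # False: a lambda cannot be looked up by name
print(is_picklable([1, 2.0, "a"]))     # True: plain data is fine
```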

In [3]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

In [4]:
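def f(x):
    return np.sin(x)

# submit() schedules a single task and immediately returns a Future
with ThreadPoolExecutor(max_workers=1) as e:
    future = e.submit(f, 10)
    print(future.result())  # blocks until the task has finished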

-0.5440211108893698
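`submit` hands back a `concurrent.futures.Future`, a placeholder for a result that may not exist yet. If the task raises, the exception is stored on the future and re-raised when `result()` is called, which is one reason to always collect results. The following is a minimal sketch. `risky` is a hypothetical function that fails for negative input:

```python
import math
from concurrent.futures import ThreadPoolExecutor


def risky(x):
    # Hypothetical task that fails for some inputs
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return math.sqrt(x)


with ThreadPoolExecutor(max_workers=2) as e:
    good = e.submit(risky, 16)
    bad = e.submit(risky, -1)

    print(good.result())          # 4.0
    print(repr(bad.exception()))  # the stored ValueError, returned rather than raised
    try:
        bad.result()              # re-raises the stored ValueError here
    except ValueError as err:
        print("caught:", err)     # caught: negative input: -1

print(good.done(), bad.done())    # True True
```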


We can submit a bunch more tasks if we want as well!

In [24]:
tasks = np.arange(0, 10)

with ThreadPoolExecutor(max_workers=8) as e:
    futures = [e.submit(f, x) for x in tasks]
    # result() blocks until each task is done; values come back in submission order
    all_values = [fut.result() for fut in futures]
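Collecting results in submission order means waiting on slow early tasks even if later ones have already finished. `concurrent.futures.as_completed` instead yields each future as soon as it finishes. Keeping a dictionary from future to input lets you tell which result is which. The sketch below uses a hypothetical `slow_square` whose run time varies with its input:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x):
    # Hypothetical task: larger inputs sleep less, so completion order differs from submission order
    time.sleep(0.05 * (5 - x))
    return x * x


results = {}
with ThreadPoolExecutor(max_workers=5) as e:
    # Map each Future back to the input that produced it
    future_to_x = {e.submit(slow_square, x): x for x in range(5)}
    for future in as_completed(future_to_x):
        x = future_to_x[future]
        results[x] = future.result()
        print(f"{x} finished -> {results[x]}")

print(dict(sorted(results.items())))  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
```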
    

This gets laborious for a large set of tasks. Instead, we can use the executor's `map` method to apply our function across all the inputs. It returns an iterator that yields the results in the same order as the inputs.

In [26]:
with ThreadPoolExecutor(max_workers=8) as e:
    for x in e.map(f, tasks):
        print(x)

0.0
0.8414709848078965
0.9092974268256817
0.1411200080598672
-0.7568024953079282
-0.9589242746631385
-0.27941549819892586
0.6569865987187891
0.9893582466233818
0.4121184852417566


In [27]:
with ThreadPoolExecutor(max_workers=8) as e:
    res = np.array(list(e.map(f, tasks)))
print(res)

[ 0.          0.84147098  0.90929743  0.14112001 -0.7568025  -0.95892427
 -0.2794155   0.6569866   0.98935825  0.41211849]
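`Executor.map` works like the built-in `map`: it also accepts several iterables, passes one item from each as separate arguments, and stops at the shortest. The following small sketch uses the built-in `pow`:

```python
from concurrent.futures import ThreadPoolExecutor

bases = [2, 3, 4]
exponents = [5, 2, 0]

with ThreadPoolExecutor(max_workers=3) as e:
    # Calls pow(2, 5), pow(3, 2), pow(4, 0); results keep input order
    powers = list(e.map(pow, bases, exponents))

print(powers)  # [32, 9, 1]
```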


In [6]:
import hashlib


def hash_one(n):
    """A somewhat CPU-intensive task."""

    for i in range(1, n):
        hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)

    return "done"


def hash_all(n):
    """Function that does hashing in serial."""

    for _ in range(n):
        hash_one(n)

    return "done"

In [7]:
%timeit hash_all(10)

2.11 s ± 31.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [18]:
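def hash_all_process(n):
    """Function that does hashing in parallel using a process pool."""

    with ProcessPoolExecutor(max_workers=4) as executor:
        # chunksize=2 sends tasks to the workers in batches of two to cut
        # inter-process overhead (it has no effect with ThreadPoolExecutor)
        for _ in executor.map(hash_one, range(n), chunksize=2):
            pass  # consume the results; this also re-raises any worker exception

    return "done"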

In [19]:
%timeit hash_all_process(10)

334 ms ± 2.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
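One caveat when reading these timings: the two functions do different amounts of work. `hash_all` calls `hash_one(n)` on every iteration, while `hash_all_process` calls `hash_one(i)` for each `i` in `range(n)`. For `n = 10`, the serial version therefore performs 4,500,000 PBKDF2 iterations against 1,200,000 for the parallel one.

The sketch below sets up a like-for-like comparison. It redefines `hash_one` so it runs on its own. `pbkdf2_rounds`, `hash_serial` and `hash_parallel` are hypothetical names, and the worker count of 4 and `chunksize=2` are carried over from above. The printed timings will depend on your machine:

```python
import hashlib
import time
from concurrent.futures import ProcessPoolExecutor


def hash_one(n):
    """Same CPU-intensive task as above."""
    for i in range(1, n):
        hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)
    return "done"


def pbkdf2_rounds(inputs):
    # Hypothetical helper: total PBKDF2 iterations hash_one performs over these inputs
    return sum(10000 * i for n in inputs for i in range(1, n))


def hash_serial(inputs):
    # Run hash_one on every input, one after another
    return [hash_one(n) for n in inputs]


def hash_parallel(inputs, max_workers=4):
    # Run hash_one on the same inputs in worker processes
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(hash_one, inputs, chunksize=2))


if __name__ == "__main__":
    n = 10
    print(pbkdf2_rounds([n] * n))   # 4500000: work done by hash_all(10)
    print(pbkdf2_rounds(range(n)))  # 1200000: work done by hash_all_process(10)

    inputs = list(range(n))         # give both versions identical work
    for func in (hash_serial, hash_parallel):
        start = time.perf_counter()
        func(inputs)
        print(f"{func.__name__}: {time.perf_counter() - start:.2f} s")
```

With equal work, four workers can give at most about a 4x speedup. Process start-up and pickling overhead eat into that for short jobs like this one.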
