# Parallelizing your code

In [1]:
import numpy as np

In [2]:
import multiprocessing as mp    
mp.set_start_method("fork")    # Only needed in Jupyter; do not do this in regular Python scripts

Often we want to perform multiple tasks simultaneously and leverage multiple cores.

Previously we would use the `threading` or `multiprocessing` libraries directly to achieve this, but
they can be painful to use, for reasons including (but not limited to):
    
    - Scripts freezing due to deadlock
    - Zombie Python processes appearing
    - Scripts not finishing at the end
    - All other sadness


Instead we will use the `concurrent.futures` module to make this experience as smooth as possible. Let's try it.

Python has two parallel execution modules: `threading` and `multiprocessing`. Threading runs parallel jobs in the same Python process, while multiprocessing spawns new Python processes.

The difference comes from the Global Interpreter Lock (GIL). Within a single Python process, only one thread can execute Python code at a time. However, if a task releases this lock, another task can run. Often, reading from a file or calling certain functions (like numba functions compiled with `nogil`) will release the lock and allow other threads to run concurrently.
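To make the effect of releasing the lock concrete, here is a minimal sketch. It assumes `time.sleep` as a stand-in for an I/O-style task that releases the lock while waiting; the helper names `wait_briefly` and `timed_run` are hypothetical and only serve the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_briefly(seconds):
    """Stand-in for I/O: time.sleep releases the GIL while it waits."""
    time.sleep(seconds)
    return seconds


def timed_run(n_workers, n_tasks=4, seconds=0.2):
    """Return the wall time needed to run n_tasks sleeps on n_workers threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        # list(...) forces us to wait for every result before stopping the clock
        list(executor.map(wait_briefly, [seconds] * n_tasks))
    return time.perf_counter() - start


one_thread = timed_run(n_workers=1)    # tasks run back to back
four_threads = timed_run(n_workers=4)  # tasks overlap while they sleep
print(f"1 thread: {one_thread:.2f} s, 4 threads: {four_threads:.2f} s")
```

With four threads the sleeps overlap, so the wall time should be close to a single sleep. A task that holds the lock the whole time (a pure-Python loop, for instance) would not be expected to speed up this way.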

Multiprocessing overcomes this by creating new Python processes and running tasks there. However, it requires explicitly sending data to those processes.
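A small sketch of what "sending data" means in practice: arguments are copied into the worker process, so changes made there do not reach the parent unless they are returned. This assumes a POSIX system where the `fork` start method is available (the same setting used at the top of this notebook); the function names are hypothetical.

```python
import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor


def append_and_report(values):
    """Runs in the worker process and mutates its own copy of the list."""
    values.append(99)
    return os.getpid(), values


def run_in_child(values):
    """Send `values` to a single worker process and return what it reports."""
    ctx = mp.get_context("fork")  # assumption: POSIX, fork is available
    with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor:
        return executor.submit(append_and_report, values).result()


original = [1, 2, 3]
child_pid, child_values = run_in_child(original)

print("parent pid:", os.getpid(), "worker pid:", child_pid)
print("parent list:", original)       # unchanged in the parent
print("worker list:", child_values)   # the copy sent back by the worker
```

The worker only sees a copy of `original`, which is why results have to come back through the return value of the task.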

In [4]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

In [5]:
def f(x):
    import time
    time.sleep(1.0)    # simulate a slow task
    return np.sin(x)

with ThreadPoolExecutor(max_workers=1) as e:
    future = e.submit(f, 10)    # returns immediately with a Future
    if future.done():
        print('Done')
    else:
        print('Not done')
    print(future.result())    # blocks until the task has finished

Not done
-0.5440211108893699


We can submit a bunch more tasks if we want as well!

In [9]:
tasks = np.arange(0,10)

with ThreadPoolExecutor(max_workers=8) as e:
    futures = [e.submit(f, x) for x in tasks]    # tasks are queued and run in the background
    for _ in range(10):
        print('Doing something else')
    all_values = [x.result() for x in futures]    # wait for each result, in submission order
all_values

Doing something else
Doing something else
Doing something else
Doing something else
Doing something else
Doing something else
Doing something else
Doing something else
Doing something else
Doing something else


[np.float64(0.0),
 np.float64(0.8414709848078965),
 np.float64(0.9092974268256817),
 np.float64(0.1411200080598672),
 np.float64(-0.7568024953079282),
 np.float64(-0.9589242746631385),
 np.float64(-0.27941549819892586),
 np.float64(0.6569865987187891),
 np.float64(0.9893582466233818),
 np.float64(0.4121184852417566)]
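Collecting results with `result()` in a list comprehension waits for the futures in submission order. If results are wanted as soon as each task finishes, `concurrent.futures.as_completed` is one option. The sketch below uses a hypothetical `slow_square` task with made-up delays so that later inputs tend to finish first.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x, delay):
    """Hypothetical task: sleep for `delay` seconds, then square x."""
    time.sleep(delay)
    return x * x


# Made-up delays for illustration: input 3 should usually finish first
delays = {1: 0.3, 2: 0.2, 3: 0.1}

finished = []
with ThreadPoolExecutor(max_workers=3) as executor:
    # Remember which input each future belongs to
    future_to_input = {executor.submit(slow_square, x, d): x
                       for x, d in delays.items()}
    # as_completed yields each future once it is done, whatever the submit order
    for future in as_completed(future_to_input):
        x = future_to_input[future]
        finished.append((x, future.result()))
        print(f"input {x} finished with result {future.result()}")
```

Keeping a dictionary from future to input is a common way to recover which task a result came from, since the completion order is not guaranteed.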

This can be laborious for a large set of tasks. Instead, we can use the executor's `map` method to map our function across the inputs.

In [10]:
%%time
with ThreadPoolExecutor(max_workers=8) as e:
    for x in e.map(f, tasks):
        print(x)

0.0
0.8414709848078965
0.9092974268256817
0.1411200080598672
-0.7568024953079282
-0.9589242746631385
-0.27941549819892586
0.6569865987187891
0.9893582466233818
0.4121184852417566
CPU times: user 4.78 ms, sys: 3.43 ms, total: 8.21 ms
Wall time: 2 s
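Two properties of `map` are worth seeing in isolation: results come back in input order even when later tasks finish first, and several iterables can be passed to supply multiple arguments. A minimal sketch, with a hypothetical `scaled_delay` function in which smaller inputs deliberately sleep longer:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def scaled_delay(x, scale):
    """Hypothetical task: smaller x sleeps longer, then returns x * scale."""
    time.sleep(0.05 * (3 - x))
    return x * scale


inputs = [0, 1, 2]
scales = [10, 10, 10]

with ThreadPoolExecutor(max_workers=3) as executor:
    # One iterable per positional argument; results follow the input order
    results = list(executor.map(scaled_delay, inputs, scales))

print(results)
```

Even though the task for `x=2` finishes first here, `results` is ordered like `inputs`, which makes `map` convenient when outputs need to line up with their inputs.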


In [11]:
def g(x):
    import time
    time.sleep(0.3)
    return np.cos(x)

with ThreadPoolExecutor(max_workers=8) as e:
    f_tasks = e.map(f, tasks)
    print('Ok now do g')
    g_tasks = e.map(g, tasks)
    print('Lets see f')
    print(list(f_tasks))
    print('Lets see g')
    print(list(g_tasks))


Ok now do g
Lets see f
[np.float64(0.0), np.float64(0.8414709848078965), np.float64(0.9092974268256817), np.float64(0.1411200080598672), np.float64(-0.7568024953079282), np.float64(-0.9589242746631385), np.float64(-0.27941549819892586), np.float64(0.6569865987187891), np.float64(0.9893582466233818), np.float64(0.4121184852417566)]
Lets see g
[np.float64(1.0), np.float64(0.5403023058681398), np.float64(-0.4161468365471424), np.float64(-0.9899924966004454), np.float64(-0.6536436208636119), np.float64(0.2836621854632263), np.float64(0.9601702866503661), np.float64(0.7539022543433046), np.float64(-0.14550003380861354), np.float64(-0.9111302618846769)]


In [12]:
%pip install tqdm

Collecting tqdm
  Downloading tqdm-4.66.4-py3-none-any.whl (78 kB)
     |████████████████████████████████| 78 kB 7.1 MB/s eta 0:00:01
Installing collected packages: tqdm
Successfully installed tqdm-4.66.4
You should consider upgrading via the '/Users/ahmed/venv/bin/python -m pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.


In [28]:
import hashlib


def hash_one(n):
    """A somewhat CPU-intensive task."""

    for i in range(1, n):
        hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)

    return "done"


def hash_all(n):
    """Function that does hashing in serial."""
    import tqdm
    for i in tqdm.tqdm(range(n)):
        hsh = hash_one(i)

    return "done"

In [32]:
hash_all(20)

100%|█████████████████████████████████████████████| 20/20 [00:06<00:00,  2.99it/s]


'done'

In [18]:
import tqdm

In [19]:
for x in tqdm.tqdm(range(10)):
    x*2

100%|██████████████████████████████████████████| 10/10 [00:00<00:00, 21833.96it/s]


In [33]:
def hash_all_process(n):
    """Function that does hashing in parallel using a pool of 4 threads."""
    import tqdm
    with ThreadPoolExecutor(max_workers=4) as executor:
        for arg, res in tqdm.tqdm(zip(range(n), executor.map(hash_one,range(n))),total=n):
            pass

    return "done"

In [34]:
hash_all_process(20)

100%|█████████████████████████████████████████████| 20/20 [00:02<00:00,  7.25it/s]


'done'