In [1]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import Thread
max_cpu = mp.cpu_count()
print(f'{max_cpu} cpu\'s detected')
import numpy as np

from math import sqrt
from time import sleep, time

4 cpu's detected


# Global Interpreter Lock
Please keep in mind the [Python Global Interpreter Lock (GIL)](https://realpython.com/python-gil/#what-problem-did-the-gil-solve-for-python), this means that - on a single thread - no two lines of Pyhton code can execute at the same time (ever). Parallel computing is achieved by sending the script to multiple cores.

# Two different worker functions
Below, we create a worker function that does some numpy computation and a working function that simply waits a few seconds. The reason we create these two is that they very widely in their potential for a speed-up once parallelized. This is because numpy is already relying C++ code in the back-end. I do not know the specifics, but I belief C++ makes use of vectorization allowing it to already compute multiple things at the same time. 

In [2]:
def poolworker(i, comp_cost_per_job):
    data = np.random.uniform(size=(1024, 1024))
    for i in range(comp_cost_per_job*30):
        np.matmul(data, data) # already done in parallel in by Numpy, use other function
    return i

In [3]:
def poolworker(i, comp_cost_per_job):
    sleep(comp_cost_per_job)
    return 

# Using (built-in) Concurrent futures

In [4]:
n_jobs = 5
comp_cost_per_job = 1
t0 = time()
results = [poolworker(i, comp_cost_per_job) for i in range(n_jobs)]
print("Sequential", time() - t0)

# Thread Pool
t0 = time()
pool = ThreadPoolExecutor(8) # can crash when using interactively
futures = [pool.submit(poolworker, i, comp_cost_per_job) for i in range(n_jobs)]
results = [future.result() for future in futures]
print("Concurrent Thread Pool", time() - t0)

# Process Pool
t0 = time()
pool = ProcessPoolExecutor(8) # can crash when using interactively
futures = [pool.submit(poolworker, i, comp_cost_per_job) for i in range(n_jobs)]
results = [future.result() for future in futures]
print("Concurrent Process Pool", time() - t0)

Sequential 5.018466949462891
Concurrent Thread Pool 1.005444049835205
Concurrent Process Pool 1.0631179809570312


Here Thread Pools don't give any speedup, whereas Process Pools do

Please Keep in mind that Process Pools use different address spaces, you can't use any variables defined outside the function's scope, whereas with Thread Pools, this is possible! :)

# multiprocessing package

In [5]:
import multiprocessing as mp


# Sequential
t0 = time()
results = [poolworker(i, comp_cost_per_job) for i in range(n_jobs)]
print("Sequential", time() - t0)


t0 = time()
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count()) # can crash when using interactively
# Step 2: `pool.apply` the `howmany_within_range()`
results = [pool.apply(poolworker, args=(i, comp_cost_per_job)) for i in range(n_jobs)]
# Step 3: Don't forget to close
pool.close()    
print("MP Process Pool", round(time() - t0, 1))

Sequential 5.016125917434692
MP Process Pool 5.0


# Joblib

In [6]:
from joblib import Parallel, delayed

# n_jobs = 10
# comp_cost_per_job = 4

# Sequential
t0 = time()
results = [poolworker(i, comp_cost_per_job) for i in range(n_jobs)]
print("Sequential", time() - t0)

# Joblib
t0 = time()
backend = 'multiprocessing' # can crash when using interactively
results = Parallel(n_jobs=-1, backend=backend, batch_size=1)(delayed(
    poolworker)(i, comp_cost_per_job) for i in range(n_jobs))
print(f"Joblib {backend} Pool", round(time() - t0, 1))

# Joblib
t0 = time()
backend = 'loky' # so far robust when using interactively
results = Parallel(n_jobs=-1, backend=backend)(delayed(
    poolworker)(i, comp_cost_per_job) for i in range(n_jobs))
print(f"Joblib {backend} Pool", round(time() - t0, 1))

Sequential 40.039032220840454
Joblib multiprocessing Pool 12.2
Joblib loky Pool 13.1


# Dask

In [None]:
from dask.distributed import Client
import bokeh

if 'client' not in globals():
    client = Client(processes=True, n_workers=3)  # start local workers as processes
# or
# clientthr = Client(processes=False)  # start local workers as threads

In [None]:
n_jobs = 10
comp_cost_per_job = 4
# sequestial
t0 = time()
results = [poolworker(i, comp_cost_per_job) for i in range(n_jobs)]
print("  Sequential",round(time() - t0, 1))
# Dask futures
# so far robust when using interactively
t0 = time() 
# results = Parallel(n_jobs=-1, backend=backend)(delayed(
#     poolworker)(i, comp_cost_per_job) for i in range(n_jobs))
futures = [client.submit(poolworker, i, comp_cost_per_job) for i in range(n_jobs)]
results = client.gather(futures)
print("Dask Process Pool", round(time() - t0, 1))