# A tour of python profiling and optimization (Part II)

*Antonino Ingargiola*, EuroScipy 2018

> premature optimization is the root of all evil (or at least most of it) in programming. 

                                                         - Donald Knuth

# Parallelism with Joblib

> [Joblib](https://joblib.readthedocs.io/) provides a *incredibly simple API* for both 
> **thread-based** a **process-based** parallelism (it also offer memoization).


*See also Part I: [Optimizing python code](Optimizing%20python%20code.ipynb).*

**Binder Warning**: If you are running this on *MyBinder.org* you may not see any
speed-up because each user is limited to 1 CPU.

## OU process function to be parallelized

*Continuing the Ornestein-Uhlenbeck process example from [Optimizing python code](Optimizing%20python%20code.ipynb).*

We define both Cython and Numba functions to be parallelized:

In [1]:
import time, os
import numpy as np
import numba
import math
from randomgen import RandomGenerator, Xoroshiro128

In [2]:
%load_ext cython

In [3]:
%%cython

import numpy as np
cimport numpy as np
from libc.math cimport exp, sqrt
import cython

@cython.boundscheck(False)
def OU_process_cy3(int num_steps, *, double[:] N_norm, double dt=0.1, 
                   double x0=0, double tau=1, double sigma=2):
    cdef Py_ssize_t i
    cdef double relax, diffuse
    cdef double[:] x = np.empty(num_steps + 1)
    
    with nogil:
        relax = exp(-dt / tau)
        diffuse = sigma * sqrt(1 - relax**2)
        x[0] = x0
        for i in range(num_steps):
            x[i+1] = x[i] * relax + diffuse * N_norm[i]
    return np.asfarray(x)

In [4]:
@numba.jit(nogil=True)
def OU_process_nb(num_steps, *, N_norm, δt=0.1, x0=0., τ=1., σ=2.):
    relax = math.exp(-δt / τ)
    diffuse = σ * math.sqrt(1 - relax**2)

    x = np.empty(num_steps + 1)
    x[0] = x0
    for i in range(num_steps):
        x[i+1] = x[i] * relax + diffuse * N_norm[i]
    return x

> **NOTE**: Both functions release the GIL! Can you find where?

# Joblib examples

*Reference: [Embarrassingly parallel for loops](https://joblib.readthedocs.io/en/latest/parallel.html)*

In [5]:
from joblib import Parallel, delayed

## Parallel sqrt

Example from the docs:

In [6]:
[math.sqrt(i ** 2) for i in range(10)]

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

In [7]:
Parallel(n_jobs=2)(delayed(math.sqrt)(i ** 2) for i in range(10))

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

## Busy sleep

In [8]:
def verbose_sleep(n):
    print(f'PID: {os.getpid()} - busy sleeping', flush=True)
    time.sleep(n)
    print(f'PID: {os.getpid()} - done sleeping', flush=True)
    return n

In [9]:
%%timeit -n1 -r1
[verbose_sleep(5) for i in range(4)]

PID: 54056 - busy sleeping
PID: 54056 - done sleeping
PID: 54056 - busy sleeping
PID: 54056 - done sleeping
PID: 54056 - busy sleeping
PID: 54056 - done sleeping
PID: 54056 - busy sleeping
PID: 54056 - done sleeping
20 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [10]:
%%timeit -n1 -r1
Parallel(n_jobs=4)(delayed(verbose_sleep)(5) for i in range(4))

5.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Random Numbers Generations

In [11]:
num_steps = int(5e6)
num_sim = 8

rg = RandomGenerator(Xoroshiro128())

First the single-core version using `np.random`:

In [12]:
%%timeit
np.random.randn(num_sim*num_steps)

1.55 s ± 144 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Here we use thread-based parallelism (`prefer='threads'`):

In [13]:
%%timeit
Parallel(n_jobs=4, prefer='threads')(delayed(np.random.randn)(num_steps) 
                                     for i in range(num_sim))

1.31 s ± 82.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Now the comparison using `randomgen`:

In [14]:
%%timeit
rg.randn(num_sim*num_steps)

630 ms ± 52 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [15]:
%%timeit
Parallel(n_jobs=4, prefer='threads')(delayed(rg.randn)(num_steps) 
                                     for i in range(num_sim))

580 ms ± 33.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Process-based parallelism, commented because too slow:

In [16]:
# %%timeit -n1 -r1
# Parallel(n_jobs=4)(delayed(np.random.randn)(num_steps) for i in range(num_sim))

In [17]:
# %%timeit -n1 -r1
# Parallel(n_jobs=4)(delayed(rg.randn)(num_steps) for i in range(num_sim))

# Parallelize the OU process

In the previous notebook we have split the simulation in
RNG and the deterministic computations. This is allows better
optimization and to easily switch to the more performant `randomgen` 
instead of `numpy.random`.

For parallelization purposes, here we define 2 wrapper function 
performing the full simulation (cython and numba versions):

In [18]:
def ou_process_cy_mp(num_steps, *, dt=0.1, x0=0, tau=1, sigma=2):
    rg = RandomGenerator(Xoroshiro128())
    N_norm = rg.randn(num_steps)
    x = OU_process_cy3(num_steps, N_norm=N_norm, dt=dt, x0=x0, tau=tau, sigma=sigma)
    return x

def ou_process_nb_mp(num_steps, *, dt=0.1, x0=0, tau=1, sigma=2):
    rg = RandomGenerator(Xoroshiro128())
    N_norm = rg.randn(num_steps)
    x = OU_process_nb(num_steps, N_norm=N_norm, δt=dt, x0=x0, τ=tau, σ=sigma)
    return x

Test calling the functions:

In [19]:
N_norm = rg.randn(num_steps)

[OU_process_cy3(num_steps, N_norm=N_norm) for _ in range(num_sim)]
[ou_process_cy_mp(num_steps) for _ in range(num_sim)]
[ou_process_nb_mp(num_steps) for _ in range(num_sim)]

[array([ 0.        , -1.35056857, -0.42634809, ..., -0.47090951,
        -1.56315924, -2.29438242]),
 array([ 0.        ,  0.39933098, -0.50241349, ..., -1.04086923,
        -1.45958082, -1.60680918]),
 array([ 0.        ,  0.96531688,  0.1634128 , ...,  0.72873354,
        -1.08460205,  0.34895678]),
 array([ 0.        , -0.05919879,  0.04225244, ..., -0.23945914,
        -0.14030399,  1.31228252]),
 array([ 0.        , -1.0005614 , -0.89184516, ..., -1.26894611,
        -1.00964957, -0.83159191]),
 array([ 0.        ,  0.70408421,  1.54412846, ..., -3.2284155 ,
        -2.7897333 , -2.14465114]),
 array([ 0.        , -1.58378376, -1.90680166, ..., -1.88280106,
        -1.26820058, -0.13192404]),
 array([ 0.        , -0.68223056, -0.71738807, ..., -2.98058024,
        -3.10178986, -3.06540171])]

# Parallelize Cython 

We need to release the GIL. How?

- cython functions only callable from C (`cdef`) can release the gil using
      cdef function_name() nogi:
- cython functions callable from python (`def`, `cpdef`) 
  can release the gil using a `with nogil:` block.
  
Since our function needs to be called from python, we use the second approach
(see the `OU_process_cy3` definition above).

In [20]:
num_steps = int(5e6)
num_sim = 8

### Parallel cython: OU process

In [21]:
%%timeit
[OU_process_cy3(num_steps, N_norm=N_norm) for _ in range(num_sim)]

285 ms ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [22]:
%%timeit
Parallel(n_jobs=4, prefer='threads')(delayed(OU_process_cy3)(num_steps, N_norm=N_norm) 
                                     for _ in range(num_sim))

175 ms ± 42.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Parallel cython: OU process + RNG

In [23]:
%%timeit
[ou_process_cy_mp(num_steps) for _ in range(num_sim)]

793 ms ± 19.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [24]:
%%timeit
Parallel(n_jobs=4, prefer='threads')(delayed(ou_process_cy_mp)(num_steps) 
                                     for _ in range(num_sim))

431 ms ± 112 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Results

- We get a **~2x** speed improvement on a 2-core system 
- Empirically, best performance with 4 threads on 2-core system

# Parallelize Numba

We need to release the GIL. How?

- use `numba.jit(nogil=True)`  (see the `OU_process_nb` definition above)

In [25]:
num_steps = int(5e6)
num_sim = 8

### Parallel Numba: OU process

In [26]:
%%timeit
[OU_process_nb(num_steps, N_norm=N_norm, δt=0.1, x0=0, σ=1, τ=1)
 for _ in range(num_sim)]

269 ms ± 96.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
%%timeit
Parallel(n_jobs=4, prefer='threads')(delayed(OU_process_nb)(num_steps, N_norm=N_norm, δt=0.1, x0=0, σ=1, τ=1)
                                     for _ in range(num_sim))

159 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


### Parallel Numba: OU process + RNG

In [28]:
%%timeit
[ou_process_nb_mp(num_steps) for _ in range(num_sim)]

773 ms ± 40.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [29]:
%%timeit
Parallel(n_jobs=4, prefer='threads')(delayed(ou_process_nb_mp)(num_steps) 
                                     for _ in range(num_sim))

403 ms ± 51.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Results

- We get a **~2x** speed improvement on a 2-core system 
- Empirically, best performance with 4 threads on 2-core system

# Take-home message

## Thread-based parallelism

*Release the GIL to unlock a performance boost with thread-based parallelism.*

Numpy's functions (including `randomgen`) already release the GIL! 🎉🎉🎉

## Process-based parallelism

Rarely achieves gains unless the application is heavily CPU-bound
and has relatively low RAM usage.

# Standard lib: Threading and Multiprocessing

The `mutiprocessing` package provides a relatively simple API (`Pool`)
compared to the `threading` package.

Unfortunately, even using `mutiprocessing` is much more cumbersome that joblib.

In [30]:
from multiprocessing import Pool

In [31]:
%%timeit -n1 -r1
with Pool(processes=4) as pool:
    try:
        Res = []
        for i in range(num_sim):
            result = pool.apply_async(
                ou_process_cy_mp, kwds=dict(num_steps=num_steps, 
                                            dt=0.1, x0=0, 
                                            tau=1, sigma=2))
            Res.append(result)
        X = []
        for i, result in enumerate(Res):
            X.append(result.get())
            #print(f'[Worker {i}] returned an array of shape {len(X[-1])}', flush=True)
    except KeyboardInterrupt:
        print('\n>>> Got keyboard interrupt.\n', flush=True)
print('Closing subprocess pool.', flush=True)

Closing subprocess pool.
8.88 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


The `threading` API is even more complex.

## Lessons

- Joblib's API rocks!