# Parallel computing

- distributing the workload of a complex process across several workers can speed it up
- many processes can be parallelised
- there are, however, cases where subprocesses cannot be split up any further

- there is a fundamental limit to the achievable speedup
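
The limit in the last point is commonly expressed as Amdahl's law: if a fraction p of the runtime can be parallelised over n workers, the best possible speedup is 1 / ((1 - p) + p / n). A minimal sketch follows; the function name `amdahl_speedup` and the example fraction of 0.9 are illustrative choices, not taken from this notebook.

```python
def amdahl_speedup(parallel_fraction, n_workers):
    """Upper bound on the speedup predicted by Amdahl's law."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_workers)


# Assumed example: 90 % of the runtime can be parallelised.
for n in (2, 4, 8, 1000):
    print(n, round(amdahl_speedup(0.9, n), 2))
```

Even with 1000 workers the speedup stays below 1 / (1 - 0.9) = 10, because the serial 10 % of the work always has to run on a single worker.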

In [1]:
from joblib import Parallel, delayed
from math import sqrt
import time
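import os
# Set this process's CPU affinity mask so that it may run on CPUs 0-31 (Linux only, needs the taskset tool)
os.system("taskset -p 0xFFFFFFFF %d" % os.getpid())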
import numpy as np

def f(k):
    return 2*k

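def benchmark(function, function_name):
    """Run function once and print the elapsed wall-clock time."""
    start = time.perf_counter()  # monotonic clock, better suited to timing than time.time()
    function()
    end = time.perf_counter()
    print("{0} seconds for {1}".format((end - start), function_name))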

#### Normal fast code

In [2]:
def list_single_thread(BIG=2000000):
    return [f(i) for i in range(BIG)]

#### Parallelized code

In [3]:
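def list_multi_thread(n_jobs=2, BIG=2000000):  # default is 2 parallel jobs
    # Each job receives one chunk of the range as a NumPy array, and f doubles the whole chunk at once.
    # The result is a list of n_jobs arrays; np.split requires BIG to be divisible by n_jobs.
    return Parallel(n_jobs=n_jobs)(delayed(f)(chunk) for chunk in np.split(np.arange(BIG), n_jobs))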

#### Do a benchmark

In [4]:
%time _ = list_single_thread()

CPU times: user 315 ms, sys: 35.4 ms, total: 351 ms
Wall time: 349 ms


In [5]:
%time _ = list_multi_thread(n_jobs=2)
%time _ = list_multi_thread(n_jobs=4)
%time _ = list_multi_thread(n_jobs=8)

CPU times: user 77.3 ms, sys: 126 ms, total: 204 ms
Wall time: 459 ms
CPU times: user 30.6 ms, sys: 71.2 ms, total: 102 ms
Wall time: 126 ms
CPU times: user 37 ms, sys: 86.5 ms, total: 124 ms
Wall time: 151 ms


# But why does it not help, or even get worse, with more jobs?
- the overhead of splitting the job and putting the results back together costs resources
- some tasks are easier to parallelize than others
- for example, running one simple task on many distinct parameter sets (grid scan, hyperparameter optimization)
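
A grid scan is a typical case for the last point, because every grid point is evaluated independently. The sketch below uses `concurrent.futures.ProcessPoolExecutor` from the standard library rather than joblib; the objective `score`, the grid and the `chunksize` value are hypothetical placeholders. Passing `chunksize` sends the grid points to the workers in batches, which reduces the per-task overhead named in the first point.

```python
from concurrent.futures import ProcessPoolExecutor
from itertools import product


def score(params):
    """Hypothetical objective: squared distance from the point (0.3, 0.7)."""
    a, b = params
    return (a - 0.3) ** 2 + (b - 0.7) ** 2


def grid_scan(grid, n_workers=2, chunksize=16):
    """Evaluate score on every grid point in parallel and return the best point."""
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        # chunksize > 1 ships several grid points per message to a worker
        scores = list(executor.map(score, grid, chunksize=chunksize))
    best = min(range(len(grid)), key=scores.__getitem__)
    return grid[best], scores[best]


if __name__ == "__main__":
    values = [i / 10 for i in range(11)]
    grid = list(product(values, values))
    print(grid_scan(grid))
```

For an objective this cheap the serial loop will likely still win; the pattern pays off once a single evaluation takes much longer than sending its arguments to a worker.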

In [6]:
def parallel_dot(A,B,n_jobs=2):
    """
     Computes the matrix product A x B using more CPUs.
     This works only when the number of rows of A
     is divisible by n_jobs (np.split raises an error otherwise).
    """
    parallelizer = Parallel(n_jobs=n_jobs)
    # this iterator returns the functions to execute for each task
    tasks_iterator = (delayed(np.dot)(A_block,B) 
                      for A_block in np.split(A,n_jobs))
    result = parallelizer(tasks_iterator)
    # merging the output of the jobs
    return np.vstack(result)

A = np.random.randint(0,high=10,size=(1000,1000))
B = np.random.randint(0,high=10,size=(1000,1000))
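
`np.split` raises an error when the rows of `A` cannot be divided evenly among the jobs, whereas NumPy's `np.array_split` accepts uneven splits. The pure-Python sketch below shows how such uneven row ranges can be computed; `row_ranges` is a hypothetical helper, not part of NumPy or joblib.

```python
def row_ranges(n_rows, n_jobs):
    """Split range(n_rows) into n_jobs contiguous (start, stop) pairs.

    The first n_rows % n_jobs ranges get one extra row,
    so the block sizes differ by at most one.
    """
    base, extra = divmod(n_rows, n_jobs)
    ranges = []
    start = 0
    for job in range(n_jobs):
        stop = start + base + (1 if job < extra else 0)
        ranges.append((start, stop))
        start = stop
    return ranges


print(row_ranges(1000, 3))  # [(0, 334), (334, 667), (667, 1000)]
```

Each pair can then be used as `A[start:stop]` in place of the blocks returned by `np.split(A, n_jobs)`.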

In [7]:
%time _ = np.dot(A,B)

CPU times: user 3.86 s, sys: 6.5 ms, total: 3.87 s
Wall time: 3.87 s


In [8]:
%time _ = parallel_dot(A,B,n_jobs=2)
%time _ = parallel_dot(A,B,n_jobs=4)
%time _ = parallel_dot(A,B,n_jobs=8)

CPU times: user 28.2 ms, sys: 44.3 ms, total: 72.5 ms
Wall time: 2.63 s
CPU times: user 591 ms, sys: 43.9 ms, total: 635 ms
Wall time: 1.63 s
CPU times: user 63 ms, sys: 48 ms, total: 111 ms
Wall time: 1.78 s


### Exercise 9.1

Parallelize the calculation of a polynomial.
Define a polynomial function of third order that takes four parameters.

Make a 10x10x10x10 grid of parameters with: $$ 0\leq a,b,c,d \leq 1$$ 

Calculate the polynomials for: $$ -2 \leq x \leq +2 $$

Test the speedup.

In [9]:
### Your code here
from collections import OrderedDict
import csv
from itertools import product

def poly(x,a,b,c,d):
    return a*x**3 + b*x**2 + c*x + d

params = OrderedDict([
    ('a' , np.linspace(0,1,10)),
    ('b' , np.linspace(0,1,10)),
    ('c' , np.linspace(0,1,10)),
    ('d' , np.linspace(0,1,10))
    ])

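# newline="" is what the csv module recommends for files passed to a writer
with open('grid_params.csv', "w", newline="") as csv_file:
    # descriptive names avoid shadowing the function f defined earlier
    writer = csv.DictWriter(csv_file, fieldnames=list(params.keys()))
    writer.writeheader()

    # one row per point of the 10x10x10x10 parameter grid
    for parameters in product(*params.values()):
        row = OrderedDict(zip(list(params.keys()), parameters))
        writer.writerow(row)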

In [10]:
import pandas as pd
x = np.linspace(-2,2,50)
df = pd.read_csv('grid_params.csv')
%time res = [poly(x,*line) for line in df.values]
%time res_par = Parallel(n_jobs=4)(delayed(poly)(x,*line) for line in df.values)

CPU times: user 284 ms, sys: 8.02 ms, total: 292 ms
Wall time: 292 ms
CPU times: user 788 ms, sys: 4.74 ms, total: 793 ms
Wall time: 954 ms


In [11]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())
pool = mp.Pool(mp.cpu_count())
# Pool.apply blocks until each call has returned, so the tasks run one after another
%time result = [pool.apply(poly, args=(x, *line)) for line in df.values]
pool.close()

Number of processors:  8
CPU times: user 4.89 s, sys: 740 ms, total: 5.63 s
Wall time: 8.6 s
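
A non-blocking variant can be sketched with `Pool.apply_async`, which returns a result handle immediately so that all tasks are submitted before any result is collected. The helper name `evaluate_async` and the two example rows are assumptions made for illustration; `poly` is repeated here so that the block runs on its own.

```python
import multiprocessing as mp


def poly(x, a, b, c, d):
    return a * x**3 + b * x**2 + c * x + d


def evaluate_async(pool, x, rows):
    """Submit one poly call per parameter row, then gather the results in order."""
    # submit everything first, so the workers can run the calls concurrently
    pending = [pool.apply_async(poly, args=(x, *row)) for row in rows]
    # get() blocks only until that particular result is ready
    return [handle.get() for handle in pending]


if __name__ == "__main__":
    rows = [(1.0, 0.0, 0.0, 0.0), (0.0, 1.0, 0.0, 0.0)]  # assumed example parameters
    with mp.Pool(2) as pool:
        print(evaluate_async(pool, 2.0, rows))
```

Every call still costs one round trip to a worker, so for very small tasks a batched call such as `Pool.starmap` remains the better fit.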


In [12]:
pool = mp.Pool(mp.cpu_count())
%time result = pool.starmap(poly, [(x, *line) for line in df.values])
pool.close()

CPU times: user 316 ms, sys: 27.5 ms, total: 343 ms
Wall time: 343 ms


# Are there other libraries for parallel processing?

- Yes, a lot of them: https://wiki.python.org/moin/ParallelProcessing

- multiprocessing, part of the Python standard library, is another one that is easy to use
