## Threads & processes

In [1]:
from threading import Thread

In [2]:
import random 

def calc_pi(N, name=None):
    printing = name is not None 
    if printing:
        print(f"{name}: starting")
    M = 0 
    for i in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
    
    if printing:
        print(f"{name}: Done")
            
    return 4*M/N

In [3]:
%%time 
calc_pi(10**7)

CPU times: user 5.5 s, sys: 4.05 ms, total: 5.5 s
Wall time: 5.5 s


3.1414272

In [5]:
%%time
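n = 10**4//2  # small workload: 10**4 samples in total
t1 = Thread(target=calc_pi, args=(n, "Thread 1"))
t2 = Thread(target=calc_pi, args=(n, "Thread 2"))

t1.start()
t2.start()

print("Have started threads")
# join() waits for each thread to finish; it returns None,
# so calc_pi's return value is discarded here
t1.join()
t2.join()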

Thread 1: starting
Thread 1: Done
Thread 2: starting
Thread 2: Done
Have started threads
CPU times: user 10.3 ms, sys: 0 ns, total: 10.3 ms
Wall time: 10.1 ms


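The threads above finish quickly because the workload is tiny (10**4 samples in total). With a larger workload, two threads running the pure-Python `calc_pi` are expected to take about as long as calling it twice in a row, because the GIL lets only one thread execute Python bytecode at a time. A sketch for checking this, assuming a standard (GIL-enabled) CPython build; `time_sequential` and `time_threaded` are hypothetical helper names, and the exact timings depend on the machine.

```python
import random
import time
from threading import Thread


def calc_pi(N):
    """Pure-Python Monte Carlo estimate of pi; the loop needs the GIL to run."""
    M = 0
    for _ in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
    return 4 * M / N


def time_sequential(n):
    """Wall time for two calc_pi(n) calls, one after the other."""
    start = time.perf_counter()
    calc_pi(n)
    calc_pi(n)
    return time.perf_counter() - start


def time_threaded(n):
    """Wall time for two calc_pi(n) calls, each in its own thread."""
    start = time.perf_counter()
    threads = [Thread(target=calc_pi, args=(n,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 10**6 // 2
    print(f"sequential:  {time_sequential(n):.2f} s")
    print(f"two threads: {time_threaded(n):.2f} s")
```

On a GIL-enabled build the two printed times should be close; the numba `nogil=True` approach in the next section is one way around this.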

### Circumventing the GIL

In [7]:
import numba

In [10]:

@numba.jit(nopython=True, nogil=True)
def calc_pi_nogil(N):
    M = 0 
    for i in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
            
    return 4 * M / N


@numba.jit(nopython=True)
def calc_pi_with_gil(N):
    M = 0 
    for i in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
            
    return 4 * M / N


In [11]:
# the first call of each function triggers JIT compilation; only the last value is displayed
calc_pi_with_gil(100)
calc_pi_nogil(100)

3.2

In [12]:
%timeit calc_pi_nogil(10**7)

63.2 ms ± 688 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [13]:
%timeit calc_pi_with_gil(10**7)

62.9 ms ± 387 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


**Exercise: threading on a NumPy function**

Many NumPy functions release the GIL while they run. Try sorting two randomly generated arrays in parallel with `np.sort`, one array per thread. Time it and compare with the sequential version.


In [14]:
import numpy as np 

In [15]:
a = np.random.random(10**6)
b = np.random.random(10**6)

In [18]:
%%timeit -n 10 -r 10
np.sort(a)
np.sort(b)

121 ms ± 5.76 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)


In [20]:
%%timeit -n 10 -r 10
t1 = Thread(target=np.sort, args=(a,))
t2 = Thread(target=np.sort, args=(b,))

t1.start()
t2.start()

t1.join()
t2.join()


62 ms ± 3.18 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)


### Multiprocessing

In [27]:
from multiprocessing import Process
import multiprocessing as mp

In [36]:
%%timeit -n 1 -r 1

if __name__ == "__main__":
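    n = 10**7//2
    # "fork" (POSIX only) lets the child processes inherit calc_pi
    # directly from the notebook's memory
    ctx = mp.get_context("fork")
    # note: Process discards calc_pi's return value
    p1 = ctx.Process(target=calc_pi, args=(n, ))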
    p2 = ctx.Process(target=calc_pi, args=(n, ))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
    

3.73 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
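The `ctx.Process` cell above throws away the value that `calc_pi` returns, because `Process` has no return channel. One way to get the estimates back is a `multiprocessing.Queue`. A sketch, where `worker` and `run_with_queue` are hypothetical helpers and the `"fork"` context (POSIX only) is assumed as in the cell above. The results are read from the queue before `join()`, because the multiprocessing documentation warns that a process with buffered queue items waits for them to be flushed before it exits, so joining first can deadlock.

```python
import multiprocessing as mp
import random


def calc_pi(N):
    """Pure-Python Monte Carlo estimate of pi (same algorithm as calc_pi above)."""
    M = 0
    for _ in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
    return 4 * M / N


def worker(N, queue):
    """Hypothetical helper: compute an estimate and send it back to the parent."""
    queue.put(calc_pi(N))


def run_with_queue(N, n_procs=2):
    """Start n_procs processes, each estimating pi with N samples; return their estimates."""
    ctx = mp.get_context("fork")  # same start method as the notebook cell; POSIX only
    queue = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(N, queue)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    # drain the queue before join(), so no child is left waiting to flush its data
    results = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(run_with_queue(10**5, n_procs=2))
```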



In [None]:
!python myscript.py

In [26]:
%%timeit -n 1 -r 1
calc_pi(10**7//2)
calc_pi(10**7//2)

6.32 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


### Overhead and the gains from multiprocessing
Using the original `calc_pi` function, write a program that runs the computation across multiple processes and records the time taken.
First, vary the amount of work and fix the number of processes.
Then, vary the number of processes and fix the amount of work.

By inspecting the running times, investigate the scaling behavior of the programs. What can we learn?

We recommend putting the programs in separate scripts and calling them from the notebook with `!python myscript.py`.

Hint: you can use [`Pool.starmap`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap) from `multiprocessing.pool`.

For timing, use `timeit` (`from timeit import timeit`), as in `timeit(lambda: ....)`.
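A possible starting point for such a script, sketched under assumptions: it uses `Pool.starmap` and `timeit` as the hints suggest, the `"fork"` start method as in the multiprocessing cell (POSIX only), and a hypothetical helper `parallel_pi`. It is not the `mp_pool.py` whose output appears below. Note that the pool is created inside the timed call, so process startup is part of the measurement.

```python
import multiprocessing as mp
import random
from timeit import timeit


def calc_pi(N, name=None):
    """Pure-Python Monte Carlo estimate of pi (same algorithm as calc_pi above; printing omitted)."""
    M = 0
    for _ in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
    return 4 * M / N


def parallel_pi(total_samples, n_procs):
    """Hypothetical helper: split the samples over n_procs processes and average the estimates."""
    ctx = mp.get_context("fork")  # same start method as the notebook; POSIX only
    # one (N, name) tuple per process; starmap unpacks each into calc_pi(N, name)
    args = [(total_samples // n_procs, None) for _ in range(n_procs)]
    with ctx.Pool(processes=n_procs) as pool:
        estimates = pool.starmap(calc_pi, args)
    return sum(estimates) / len(estimates)


if __name__ == "__main__":
    # vary the amount of work, fixed number of processes
    for samples in [10**3, 10**4, 10**5]:
        t = timeit(lambda: parallel_pi(samples, 4), number=1)
        print(f"Using {samples} samples took {t:.3f} seconds")
    # vary the number of processes, fixed amount of work
    for n_procs in [1, 2, 4]:
        t = timeit(lambda: parallel_pi(10**6, n_procs), number=1)
        print(f"Using {n_procs} processes took {t:.3f} seconds")
```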



In [43]:
from timeit import timeit

In [44]:
help(timeit)

Help on function timeit in module timeit:

timeit(stmt='pass', setup='pass', timer=<built-in function perf_counter>, number=1000000, globals=None)
    Convenience function to create Timer object and call timeit method.



In [46]:
timeit(lambda: print(4), number=4)

4
4
4
4


6.205400131875649e-05
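`timeit` returns the total time for all `number` calls, so the value above covers four calls of the lambda (including the `print`). A small sketch of turning that into a per-call time; `work` is a hypothetical placeholder workload.

```python
from timeit import repeat, timeit


def work():
    """Hypothetical placeholder workload."""
    return sum(i * i for i in range(10_000))


n_calls = 20
total = timeit(work, number=n_calls)  # total seconds for n_calls calls
per_call = total / n_calls            # average seconds per call
# repeat() runs the whole measurement several times; the minimum is usually the most
# informative value, since higher ones mostly reflect interference from other processes
best_per_call = min(repeat(work, number=n_calls, repeat=5)) / n_calls
print(f"total {total:.4f} s, per call {per_call * 1e3:.3f} ms, best {best_per_call * 1e3:.3f} ms")
```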

In [48]:
!python mp_pool.py

Using 100 samples took 0.2575781769992318 seconds
Using 1000 samples took 0.25124963400230627 seconds
Using 10000 samples took 0.2991928610026662 seconds
Using 1000000 samples took 3.2885874150015297 seconds
Using 10000000 samples took 37.70154950699725 seconds


In [53]:
!python mp_pool_vary_workers.py

using 1 jobs took 2.842753086999437 seconds
using 2 jobs took 1.4961389279997093 seconds
using 4 jobs took 0.9307320130028529 seconds
using 8 jobs took 0.8835392270011653 seconds
using 16 jobs took 0.9303198029992927 seconds
using 32 jobs took 0.8894892330026778 seconds
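One way to read the worker-scaling timings above is through the speedup $S(p) = T(1)/T(p)$ and the parallel efficiency $E(p) = S(p)/p$, where $T(p)$ is the time taken with $p$ jobs. Worked out from the printed numbers (rounded):

$$
\begin{aligned}
S(2) &= \frac{2.843}{1.496} \approx 1.90, & E(2) &\approx 0.95,\\
S(4) &= \frac{2.843}{0.931} \approx 3.05, & E(4) &\approx 0.76,\\
S(8) &= \frac{2.843}{0.884} \approx 3.22, & E(8) &\approx 0.40.
\end{aligned}
$$

The speedup stops growing somewhere between 4 and 8 jobs, and 16 or 32 jobs are no faster (0.93 s and 0.89 s). The sample-size runs point the same way: 100, 1000 and 10000 samples all take roughly 0.25 to 0.30 s, which suggests a fixed startup cost of that order. This is consistent with the work being limited by the number of CPU cores plus a fixed overhead for starting the processes, but the notebook does not record the machine's core count, so treat it as an interpretation rather than a measurement.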


## Collecting results from threads

- collecting return values from threads is awkward: for instance, [this method](https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread) is not easy to compile with numba
    - we could use a class (as in the above link), but I have not tried this
- problem: it's hard to make numba modify objects such as lists, which is what the threaded approach needs
    - in the approach above, each thread writes its result into a list at a different index, but numba cannot easily compile code that mutates such a list
- we can use `concurrent.futures` instead, which provides an easier-to-use interface
- under the hood, `ThreadPoolExecutor` still runs the work on `threading.Thread` workers, but it manages a pool of them and returns `Future` objects, so a call is submitted asynchronously and its return value is collected later. Johan will certainly be able to tell you more.
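A plain-Python sketch of the list-at-indices approach from the Stack Overflow link: preallocate a list and let each thread write its result into its own slot. The helpers `calc_pi_into` and `threaded_pi` are hypothetical, and the code is not numba-compiled, so it shows how results are collected rather than any speedup.

```python
import random
from threading import Thread


def calc_pi(N):
    """Pure-Python Monte Carlo estimate of pi (same algorithm as calc_pi above)."""
    M = 0
    for _ in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
    return 4 * M / N


def calc_pi_into(results, index, N):
    """Hypothetical helper: store the estimate in results[index]."""
    results[index] = calc_pi(N)


def threaded_pi(N, n_threads=2):
    """Hypothetical helper: split N samples over n_threads threads, one list slot each."""
    results = [None] * n_threads  # preallocated, one slot per thread
    threads = [
        Thread(target=calc_pi_into, args=(results, i, N // n_threads))
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # after join() returns, that thread's slot has been filled
    return results


if __name__ == "__main__":
    estimates = threaded_pi(10**5, n_threads=2)
    print(estimates, sum(estimates) / len(estimates))
```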

Documentation: https://docs.python.org/3/library/concurrent.futures.html#executor-objects

Comparison of multiprocessing, asyncio, threading and concurrent.futures: https://stackoverflow.com/questions/61351844/difference-between-multiprocessing-asyncio-threading-and-concurrency-futures-i

Key idea: executors play the role that `Pool` played in yesterday's multiprocessing session, and they work for both threads and processes.
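A sketch of the shared executor interface: the same code runs with a thread pool or a process pool. `submit` returns a `Future`, and `Future.result()` blocks until the call has finished and returns its value. The helper `run_chunks` is hypothetical, `calc_pi` is the plain-Python version, and the `"fork"` start method (POSIX only) is assumed for the process pool, as in the multiprocessing cell earlier.

```python
import multiprocessing as mp
import random
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed


def calc_pi(N):
    """Pure-Python Monte Carlo estimate of pi (same algorithm as calc_pi above)."""
    M = 0
    for _ in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
    return 4 * M / N


def run_chunks(executor, work):
    """Hypothetical helper: submit one calc_pi call per chunk, collect results as they complete."""
    futures = [executor.submit(calc_pi, n) for n in work]
    return [f.result() for f in as_completed(futures)]


if __name__ == "__main__":
    work = [10**5, 10**5]
    with ThreadPoolExecutor(max_workers=2) as executor:
        print("threads:  ", run_chunks(executor, work))
    with ProcessPoolExecutor(max_workers=2, mp_context=mp.get_context("fork")) as executor:
        print("processes:", run_chunks(executor, work))
```

Using the executor as a context manager shuts it down (and waits for pending work) when the block ends, which avoids leaving idle worker threads or processes behind.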


In [1]:
import numba
import random 

@numba.jit(nopython=True, nogil=True)
def calc_pi(N, name=None):
    printing = name is not None 
    if printing:
        print(f"{name}: starting")
    M = 0 
    for i in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            M += 1
    
    if printing:
        print(f"{name}: Done")
            
    return 4*M/N
    

calc_pi(100)  # the first call triggers JIT compilation


3.28

In [2]:
from concurrent.futures import ThreadPoolExecutor

In [3]:
# show the result
executor = ThreadPoolExecutor(max_workers=5)
a = executor.map(calc_pi, [10**7//2, 10**7//2])
list(a)  # a is a lazy generator; list() waits for and collects the results

[3.1426448, 3.1415656]

In [7]:
%%timeit -n 10 -r 10
n_chunks = 2
executor = ThreadPoolExecutor(max_workers=5)
work = [10**7//n_chunks for _ in range(n_chunks)]
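# caution: map() only submits the tasks and returns a lazy iterator;
# `a` is never consumed, so this timing does not wait for the computation
a = executor.map(calc_pi, work)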

16.1 ms ± 1.16 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)


In [8]:
%%timeit -n 10 -r 10
a = calc_pi(10**7)


70.2 ms ± 2.14 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)
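`executor.map` submits every call and returns a lazy iterator right away; it does not wait for the results. The `%%timeit` cell with the `ThreadPoolExecutor` therefore likely measures little more than task submission, which would explain why 16.1 ms is below the roughly 35 ms (70.2 ms / 2) that two parallel chunks would need even with perfect scaling. A sketch that makes the difference visible, using `time.sleep` as a hypothetical stand-in for GIL-releasing work (it releases the GIL while waiting); `time_submission_only` and `time_with_results` are hypothetical helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_work(seconds):
    """Hypothetical stand-in for GIL-releasing work: time.sleep releases the GIL."""
    time.sleep(seconds)
    return seconds


def time_submission_only(work):
    """Time executor.map without consuming its iterator (like the %%timeit cell)."""
    with ThreadPoolExecutor(max_workers=len(work)) as executor:
        start = time.perf_counter()
        executor.map(fake_work, work)  # returns at once; the tasks keep running
        elapsed = time.perf_counter() - start
    # leaving the with-block calls shutdown(wait=True), outside the timed region
    return elapsed


def time_with_results(work):
    """Time executor.map including collecting every result."""
    with ThreadPoolExecutor(max_workers=len(work)) as executor:
        start = time.perf_counter()
        results = list(executor.map(fake_work, work))  # blocks until all tasks finish
        elapsed = time.perf_counter() - start
    return elapsed, results


if __name__ == "__main__":
    work = [0.2, 0.2]
    print(f"submission only: {time_submission_only(work):.3f} s")
    elapsed, results = time_with_results(work)
    print(f"with results:    {elapsed:.3f} s -> {results}")
```

Wrapping the `map` call in `list(...)` inside the timed cell is the corresponding fix for the notebook timings.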


### Doing the same with processes

In [9]:
from concurrent.futures import ProcessPoolExecutor

In [11]:
# show the result
executor = ProcessPoolExecutor(max_workers=4)
n_chunks = 2
work = [10**7//n_chunks for _ in range(n_chunks)]
a = executor.map(calc_pi, work)
list(a)  # wait for and collect the results

In [12]:
%%timeit -n 10 -r 10
executor = ProcessPoolExecutor(max_workers=4)

n_chunks = 2
work = [10**7//n_chunks for _ in range(n_chunks)]
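# caution: map() only submits the tasks and returns a lazy iterator;
# `a` is never consumed, so this timing does not wait for the computation
a = executor.map(calc_pi, work)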


20.2 ms ± 862 µs per loop (mean ± std. dev. of 10 runs, 10 loops each)
