# Parallel Python
Adapted from https://blog.floydhub.com/multiprocessing-vs-threading-in-python-what-every-data-scientist-needs-to-know/ \
and from this book:
https://static.packt-cdn.com/products/9781787285378/cover/smaller

<table align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/dtrad/geoml_course/blob/master/ParallelPython.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
</table>

## Concurrency in Python
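Multiprocessing not only lets us use more of our machine's CPU cores, it also avoids the limitations that the Global Interpreter Lock (GIL) imposes in CPython: each process runs its own Python interpreter with its own GIL, so CPU-bound Python code in separate processes can run truly in parallel.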

In [33]:
import multiprocessing 
# returns an integer value of how many available CPUs we have 
multiprocessing.cpu_count()

12
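`multiprocessing.cpu_count()` counts the logical CPUs in the machine. That number is not necessarily how many CPUs this process may use, for example inside a container or under `taskset`. The following is a small sketch that prefers the affinity mask where the platform provides `os.sched_getaffinity` (e.g. Linux) and otherwise falls back to `cpu_count()`. The helper name `usable_cpus` is hypothetical.

```python
import multiprocessing
import os


def usable_cpus():
    """Return how many CPUs this process is allowed to run on.

    os.sched_getaffinity only exists on some platforms (e.g. Linux);
    elsewhere we fall back to the total logical CPU count.
    """
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))  # 0 means "the calling process"
    return multiprocessing.cpu_count()


print("logical CPUs:", multiprocessing.cpu_count())
print("usable CPUs: ", usable_cpus())
```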

## Process
A process is an instance of a computer program being executed. Each process has its own memory space, which it uses to store the instructions being run as well as any data it needs to store and access during execution. \
Let us first define the function we want to run in parallel and test it once, serially:

In [34]:
import random
from functools import reduce
import time

def func(number):
    random_list = random.sample(range(10), number)
    print(random_list)
    return reduce(lambda x, y: x*y, random_list)
func(5)

[0, 8, 9, 4, 2]


0

Next we make the function perform many more calculations and run two copies of it in parallel. First, let us look at the documentation of `reduce` and `multiprocessing.Process`:

In [35]:
reduce?

Docstring:
reduce(function, sequence[, initial]) -> value

Apply a function of two arguments cumulatively to the items of a sequence,
from left to right, so as to reduce the sequence to a single value.
For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
((((1+2)+3)+4)+5).  If initial is present, it is placed before the items
of the sequence in the calculation, and serves as a default when the
sequence is empty.
Type:      builtin_function_or_method

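The docstring above describes what `func` does with its random list: it multiplies all the elements together. The short sketch below checks that equivalence. It uses `operator.mul` in place of the lambda and `math.prod` (Python 3.8+) as a direct alternative, and it shows the `initial` argument acting as the default for an empty sequence. The fixed seed and the range starting at 1 are assumptions for illustration only. They keep the example reproducible and avoid the zero that made `func(5)` return 0 above.

```python
import math
import operator
import random
from functools import reduce

random.seed(0)  # fixed seed, only so the example is reproducible
sample = random.sample(range(1, 10), 5)  # start at 1 so the product is never 0

product_lambda = reduce(lambda x, y: x * y, sample)  # ((((a*b)*c)*d)*e)
product_operator = reduce(operator.mul, sample)      # same result, no lambda
product_prod = math.prod(sample)                     # built-in product, Python 3.8+

# With an initial value, reduce returns it for an empty sequence.
empty_product = reduce(operator.mul, [], 1)

print(sample, product_lambda, product_operator, product_prod, empty_product)
```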

In [36]:
multiprocessing.Process?

Init signature:
multiprocessing.Process(
    group=None,
    target=None,
    name=None,
    args=(),
    kwargs={},
    *,
    daemon=None,
)
Docstring:
Process objects represent activity that is run in a separate process

The class is analogous to `threading.Thread`
File:           ~/anaconda3/envs/tfgpu4/lib/python3.9/multiprocessing/context.py
Type:           type
Subclasses:

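Before timing two processes, we can quickly check the claim that each process has its own memory space. In the sketch below, a child changes a module-level variable, and the parent's copy stays untouched. The names `counter` and `add_hundred` are hypothetical and used only for this demo. The `if __name__ == "__main__":` guard is what the `multiprocessing` documentation recommends for start methods that re-import the main module, such as `spawn`.

```python
import multiprocessing

counter = 0  # hypothetical module-level variable used only for this demo


def add_hundred():
    """Runs in the child process: modifies the child's own copy of `counter`."""
    global counter
    counter += 100
    print("child sees counter =", counter)


if __name__ == "__main__":
    p = multiprocessing.Process(target=add_hundred)
    p.start()
    p.join()
    # The parent's copy was never modified, so this still prints 0.
    print("parent sees counter =", counter)
```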

In [37]:
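import multiprocessing
import random
from functools import reduce
import time

def func(number):
    # product of `number` distinct random integers: CPU-bound, pure-Python work
    random_list = random.sample(range(1000000), number)
    return reduce(lambda x, y: x*y, random_list)

number = 50000

# The guard is required when the "spawn" start method is used (Windows, macOS),
# because child processes re-import the main module.
if __name__ == "__main__":
    # Process discards func's return value; only the elapsed time is measured here.
    process1 = multiprocessing.Process(target=func, args=(number,))
    process2 = multiprocessing.Process(target=func, args=(number,))

    start = time.time()
    process1.start()   # launch both children
    process2.start()

    process1.join()    # wait for both to finish
    process2.join()
    end = time.time()
    print('multi-processing time', end - start)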

multi-processing time 1.2547197341918945


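## Threads
Threads are components of a process that can run concurrently. A process can contain multiple threads, and they all share the same memory space, namely that of the parent process. This means that the code being executed, as well as all the variables declared in the program, are shared by all threads. In CPython, however, the GIL lets only one thread execute Python bytecode at a time, so CPU-bound pure-Python code such as `func` does not run faster with threads. This is why the threaded timing below is slower than the multiprocessing one.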
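Because threads share memory, they can all update the same variable. The flip side is that a read-modify-write such as `counter += 1` must be protected, or updates from different threads can be lost. The minimal sketch below uses a `threading.Lock`. The names `counter`, `counter_lock`, and `add_many` are hypothetical.

```python
import threading

counter = 0                      # shared by every thread in this process
counter_lock = threading.Lock()  # serialises updates to `counter`


def add_many(n):
    """Increment the shared counter n times, holding the lock for each update."""
    global counter
    for _ in range(n):
        # `counter += 1` reads, adds, and writes back; the lock keeps
        # other threads from interleaving between those steps.
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000: all four threads updated the same variable
```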

In [38]:
import threading
import random
from functools import reduce
import time

def func(number):
    random_list = random.sample(range(1000000), number)
    return reduce(lambda x, y: x*y, random_list)

    
number = 50000

thread1 = threading.Thread(target=func, args=(number,))
thread2 = threading.Thread(target=func, args=(number,))

start = time.time()
thread1.start()
thread2.start()

thread1.join()
thread2.join()
end = time.time()

print('multithreading time',end - start)


multithreading time 2.308295726776123


In [39]:
threading.Thread?

Init signature:
threading.Thread(
    group=None,
    target=None,
    name=None,
    args=(),
    kwargs=None,
    *,
    daemon=None,
)
Docstring:
A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways
to specify the activity: by passing a callable object to the constructor, or
by overriding the run() method in a subclass.
Init docstring:
This constructor shou

# Creating threads
## Classes
Let us create a new class that controls a thread by inheriting from the `Thread` class.\
As usual with inheritance, we call the initializer of the base class (`Thread`) and define the methods we want to override.\
This is common when we want more control over the thread. Notice that the parent class `Thread` provides most of the methods we are using (`start`, `join`); we only override `run`, which `start()` invokes in the new thread.

In [41]:
from threading import Thread

class MyWorkerThread(Thread):
    def __init__(self):
        print("class initialization")
        super().__init__()              # initialise the Thread base class
    def run(self):
        # executed in the new thread once start() is called
        print("Thread is now running")

myThread = MyWorkerThread()
print("Created a Thread Object from the class")
myThread.start()
print("Started my thread")
myThread.join()                         # wait for run() to return
print("My Thread finished")

class initialization
Created a Thread Object from the class
Thread is now running
Started my thread
My Thread finished
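`run()` returns nothing to the caller. A common pattern when subclassing `Thread` is therefore to store the result on the instance and read it after `join()`. The sketch below uses a hypothetical `SumWorker` class to show this.

```python
from threading import Thread


class SumWorker(Thread):
    """Hypothetical worker that sums range(n) and keeps the result on the instance."""

    def __init__(self, n):
        super().__init__()   # always initialise the Thread base class first
        self.n = n
        self.result = None   # filled in by run()

    def run(self):
        # Runs in the new thread after start(); its return value would be
        # ignored, so the result is stored as an attribute instead.
        self.result = sum(range(self.n))


workers = [SumWorker(n) for n in (10, 100, 1000)]
for w in workers:
    w.start()
for w in workers:
    w.join()  # after join(), run() has finished and result is set

print([w.result for w in workers])  # [45, 4950, 499500]
```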


# Creating Processes
## Forks
To fork a process is to create an exact replica of it. In other words, when we fork, we effectively clone the current process and run the clone as a child of the process it was cloned from. \
`os.fork()` returns twice: in the child it returns 0, and in the parent it returns the PID of the new child, so we use the return value to know whether we are in the parent or the child. `os.fork` is only available on POSIX systems (not on Windows).

In [42]:
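import os

def child():
    print("We are in the child process with PID= %d" % os.getpid(), flush=True)

def parent():
    # flush before forking so buffered output is not duplicated in the child
    print("We are in the parent process with PID= %d" % os.getpid(), flush=True)
    newRef = os.fork()          # 0 in the child, the child's PID in the parent
    if newRef == 0:
        child()
        os._exit(0)             # end the child here instead of running the caller's code
    else:
        print("We are in the parent process and our child process has PID= %d" % newRef)
        os.waitpid(newRef, 0)   # wait for (reap) the child

parent()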

We are in the parent process with PID= 23530
We are in the parent process and our child process has PID= 991
We are in the child process with PID= 991
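Since a forked child has its own copy of memory, it needs an explicit channel to send data back. The POSIX-only sketch below uses `os.pipe`. The child writes its result and exits with `os._exit`, and the parent reads until end-of-file and reaps the child with `os.waitpid`. The function name `square_in_child` is hypothetical.

```python
import os


def square_in_child(x):
    """Compute x*x in a forked child and send it back to the parent through a pipe.

    POSIX only: relies on os.fork, os.pipe and os.waitpid.
    """
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: close the unused read end, write the result, exit immediately.
        os.close(read_fd)
        os.write(write_fd, str(x * x).encode())
        os.close(write_fd)
        os._exit(0)  # skip normal interpreter shutdown in the child
    # Parent: close the unused write end, then read until EOF.
    os.close(write_fd)
    chunks = []
    while True:
        data = os.read(read_fd, 1024)
        if not data:
            break
        chunks.append(data)
    os.close(read_fd)
    _, status = os.waitpid(pid, 0)  # reap the child so it does not linger as a zombie
    return int(b"".join(chunks).decode()), os.WEXITSTATUS(status)


result, exit_code = square_in_child(7)
print(result, exit_code)  # 49 0
```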


## Joblib

Examples and introduction taken from:
https://joblib.readthedocs.io/en/latest/why.html 

Joblib provides a simple helper class, `Parallel`, to write parallel for loops using multiprocessing. \
We write the code to be executed as a generator expression, and `Parallel` distributes the calls across workers. \
As an example, consider the following test, which computes `sqrt(i**2)` for `i` from 0 to 9 and therefore prints the integers 0 to 9 as floats:

In [43]:
from math import sqrt
output=[sqrt(i ** 2) for i in range(10)]
print(output)

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]


We can do this with 10 processes as follows:

In [44]:
from joblib import Parallel, delayed
output=Parallel(n_jobs=10)(delayed(sqrt)(i**2) for i in range(10))
print(output)

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]


In [45]:
delayed?

Signature: delayed(function)
Docstring: Decorator used to capture the arguments of a function.
File:      ~/anaconda3/envs/tfgpu4/lib/python3.9/site-packages/joblib/parallel.py
Type:      function


In [46]:
(delayed(sqrt)(i**2) for i in range(10))

<generator object <genexpr> at 0x7f50de8f3430>
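The generator above does not call `sqrt` yet. Each item is a captured call, and `Parallel` later decides which worker runs it. The idea is sketched below in plain Python. `my_delayed` and `run_sequential` are hypothetical names, not joblib's implementation. Here a call is captured as a `(function, args, kwargs)` tuple and evaluated later.

```python
from math import sqrt


def my_delayed(function):
    """Hypothetical illustration of the idea behind joblib.delayed:
    record a call as a (function, args, kwargs) tuple instead of running it."""
    def capture(*args, **kwargs):
        return function, args, kwargs
    return capture


def run_sequential(tasks):
    """Hypothetical executor: evaluate the captured calls one by one, in order."""
    return [function(*args, **kwargs) for function, args, kwargs in tasks]


tasks = (my_delayed(sqrt)(i ** 2) for i in range(10))  # nothing computed yet
print(run_sequential(tasks))  # [0.0, 1.0, 2.0, ..., 9.0]
```

A real parallel executor would hand these tuples to worker processes or threads instead of looping over them.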

Let us time this with different numbers of processes (`n_jobs=-1` means use all available CPUs):

In [47]:
import time
start = time.time()
Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(100000))
end = time.time()
print(end - start)

5.769952774047852


In [48]:
start = time.time()
Parallel(n_jobs=-1)(delayed(sqrt)(i**2) for i in range(100000))
end = time.time()
print(end - start)

1.0290379524230957


By default joblib.Parallel uses the 'loky' backend module to start separate Python worker processes to execute tasks concurrently on separate CPUs. \
This is a reasonable default for generic Python programs but can induce a significant overhead as the input and output \
data need to be serialized in a queue for communication with the worker processes.

However, if you have a function based on a compiled extension that releases the Python Global Interpreter Lock (GIL) \
during most of its computation then it is more efficient to use threads instead of Python processes as concurrent workers. 

To hint that your code can efficiently use threads, pass `prefer="threads"` as a parameter of the `joblib.Parallel` constructor. \
In this case joblib will automatically use the "threading" backend instead of the default "loky" backend:
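Hashing large buffers is an example of a compiled extension releasing the GIL. The CPython `hashlib` documentation states that the GIL is released while hashing data larger than 2047 bytes, so threads can hash different buffers at the same time without copying them. The sketch below uses the standard library's `ThreadPoolExecutor` rather than joblib. The workload (eight 4 MB random buffers) and `max_workers=4` are arbitrary assumptions, and any speedup depends on the machine, so no timing is claimed.

```python
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor

# Hypothetical workload: eight 4 MB buffers of random bytes.
buffers = [os.urandom(4 * 1024 * 1024) for _ in range(8)]


def digest(data):
    """SHA-256 hex digest of one buffer; hashlib releases the GIL for large inputs."""
    return hashlib.sha256(data).hexdigest()


# Threads share memory, so the buffers are neither copied nor pickled.
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(digest, buffers))

sequential = [digest(b) for b in buffers]
print(threaded == sequential)  # True: same digests, same order
```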

In [49]:
start = time.time()
output=Parallel(n_jobs=-1, prefer="threads")(delayed(sqrt)(i**2) for i in range(100000))
end = time.time()
print(output[0:10])
print(end - start)

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
4.574456214904785


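However, as the timing shows, threads gain little here. They are faster than `n_jobs=1` but much slower than the process-based run, because `sqrt` on a Python float does not release the GIL, so only one thread computes at a time. \
If we instead run something that releases the GIL, like the `sleep` function below, threads do give a speedup.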

In [51]:
from time import sleep
from joblib import Parallel, delayed
r = Parallel(n_jobs=-1, prefer="threads",verbose=10)(delayed(sleep)(.2) for _ in range(20)) 

[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done   3 out of  20 | elapsed:    0.2s remaining:    1.2s
[Parallel(n_jobs=-1)]: Done   6 out of  20 | elapsed:    0.2s remaining:    0.5s
[Parallel(n_jobs=-1)]: Done   9 out of  20 | elapsed:    0.2s remaining:    0.3s
[Parallel(n_jobs=-1)]: Done  12 out of  20 | elapsed:    0.2s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done  15 out of  20 | elapsed:    0.4s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done  18 out of  20 | elapsed:    0.4s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done  20 out of  20 | elapsed:    0.4s finished


In [52]:
from time import sleep
from joblib import Parallel, delayed
r = Parallel(n_jobs=1, prefer="threads",verbose=10)(delayed(sleep)(.2) for _ in range(20)) 

[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.2s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    0.4s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:    0.6s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   4 out of   4 | elapsed:    0.8s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   5 out of   5 | elapsed:    1.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   6 out of   6 | elapsed:    1.2s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   7 out of   7 | elapsed:    1.4s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   8 out of   8 | elapsed:    1.6s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   9 out of   9 | elapsed:    1.8s remaining:    0.0s
[Parallel(n_jobs=1)]: Done  20 out of  20 | elapsed:    4.0s finished
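The two timings follow from simple arithmetic. Sequentially, 20 tasks × 0.2 s = 4.0 s. With 12 threads, the 20 sleeps need ceil(20/12) = 2 rounds × 0.2 s = 0.4 s. The sketch below repeats the same experiment with only the standard library, assuming `ThreadPoolExecutor` as a stand-in for joblib's threading backend. The measured time will vary slightly from run to run.

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor

N_TASKS, DELAY, WORKERS = 20, 0.2, 12  # same numbers as the joblib runs above

# Expected wall times: each worker runs its share of sleeps one after another.
sequential_estimate = N_TASKS * DELAY                     # 4.0 s
threaded_estimate = math.ceil(N_TASKS / WORKERS) * DELAY  # 2 rounds -> 0.4 s

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=WORKERS) as executor:
    # time.sleep releases the GIL, so the sleeps overlap across threads.
    list(executor.map(time.sleep, [DELAY] * N_TASKS))
elapsed = time.perf_counter() - start

print(f"sequential estimate {sequential_estimate:.1f} s, "
      f"threaded estimate {threaded_estimate:.1f} s, measured {elapsed:.2f} s")
```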
