# Multi Processing

### Multiprocessing is the ability to run several tasks at once. If your CPU has several cores, these tasks can run in parallel on different cores, which can speed up your code.


# Multi Processing vs. Multi Threading

### This is the most common source of confusion when dealing with parallel computing, so we will try to shed some light on it.

# Multi Threading

### In simple words, a thread is a sequence of instructions within a program that can be executed independently of other code. For simplicity, you can think of a thread as a subset of a process: every process has at least one thread, and all threads of a process share its memory.

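### Python's standard library offers the `threading` module. Note that different threads are not necessarily executed on different CPU cores: in CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time.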


In [1]:
import threading 
import os 
  
def task1(): 
    print("Task 1 assigned to thread: {}".format(threading.current_thread().name)) 
    print("ID of process running task 1: {}".format(os.getpid())) 
  
def task2(): 
    print("Task 2 assigned to thread: {}".format(threading.current_thread().name)) 
    print("ID of process running task 2: {}".format(os.getpid())) 
  
if __name__ == "__main__": 
  
    # print ID of current process 
    print("ID of process running main program: {}".format(os.getpid())) 
  
    # print name of main thread 
    print("Main thread name: {}".format(threading.current_thread().name)) 
  
    # creating threads 
    t1 = threading.Thread(target=task1, name='t1') 
    t2 = threading.Thread(target=task2, name='t2')   
  
    # starting threads 
    t1.start() 
    t2.start() 
  
    # wait until all threads finish 
    t1.join() 
    t2.join() 

ID of process running main program: 6661
Main thread name: MainThread
Task 1 assigned to thread: t1
ID of process running task 1: 6661
Task 2 assigned to thread: t2
ID of process running task 2: 6661


### The program above runs two threads inside the same process: the main program and both tasks report the same process ID.
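### Because threads live inside the same process, they also share its memory. A minimal sketch (the names `results` and `record` are illustrative, not from the original): each thread appends to one module-level list, and the main thread sees every item once the threads have been joined.

```python
import threading

results = []  # a single list object, visible to every thread of this process

def record(value):
    # Each thread appends to the same list object
    results.append((threading.current_thread().name, value))

threads = [threading.Thread(target=record, args=(i,), name="t{}".format(i)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread sees the items added by all four threads
print(sorted(results))  # [('t0', 0), ('t1', 1), ('t2', 2), ('t3', 3)]
```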

# When is multi-threading useful, then?

### Multi-threading is useful for I/O-bound operations such as downloading data or pushing data to a database. These operations spend most of their time waiting for the download or the database update to complete, and in CPython a thread that is blocked on I/O releases the GIL while it waits. With multiple threads, instead of waiting idly we can run several downloads simultaneously, or send further updates while earlier ones are still pending.
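### A minimal sketch of the effect, assuming `time.sleep` as a stand-in for a blocking download (the function `fake_download` is hypothetical). Like real blocking I/O, `time.sleep` lets other threads run while it waits, so five 0.2 s waits take about 1 s one after the other but only about 0.2 s when each runs in its own thread.

```python
import threading
import time

def fake_download(seconds):
    # Hypothetical stand-in for a network call: the thread just waits
    time.sleep(seconds)

wait_times = [0.2] * 5

# One after the other: the waits add up
start = time.perf_counter()
for w in wait_times:
    fake_download(w)
sequential = time.perf_counter() - start

# One thread per "download": the waits overlap
start = time.perf_counter()
threads = [threading.Thread(target=fake_download, args=(w,)) for w in wait_times]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print("sequential: {:.2f} s, threaded: {:.2f} s".format(sequential, threaded))
```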

# Some remarks on multi threading

### As threads share the same process and its memory, we need to be careful to protect shared variables such as globals.


In [2]:
import threading 
  
# global variable x 
x = 0
  
def increment(): 
    """ 
    function to increment global variable x 
    """
    global x 
    x += 1
  
def thread_task(): 
    """ 
    task for thread 
    calls increment function 100000 times. 
    """
    for _ in range(100000): 
        increment() 
  
def main_task(): 
    global x 
    # setting global variable x as 0 
    x = 0
  
    # creating threads 
    t1 = threading.Thread(target=thread_task) 
    t2 = threading.Thread(target=thread_task) 
  
    # start threads 
    t1.start() 
    t2.start() 
  
    # wait until threads finish their job 
    t1.join() 
    t2.join() 
  
if __name__ == "__main__": 
    for i in range(10): 
        main_task() 
        print("Iteration {0}: x = {1}".format(i,x)) 

Iteration 0: x = 200000
Iteration 1: x = 123526
Iteration 2: x = 156748
Iteration 3: x = 168165
Iteration 4: x = 177653
Iteration 5: x = 200000
Iteration 6: x = 180494
Iteration 7: x = 200000
Iteration 8: x = 199976
Iteration 9: x = 200000


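### Both threads read and modify the global variable $x$. The statement `x += 1` is not atomic: it reads $x$, adds one and writes the result back, and the interpreter may switch threads between the read and the write, so one thread can overwrite the other's update. This is known as a race condition. Whether lost updates actually show up depends on the Python version and on timing, but the code is unsafe either way.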
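### One way to see why `x += 1` is not a single step is to disassemble it with the standard `dis` module. The exact opcode names vary between Python versions, but there is always a separate load of the global `x` and a later store back to it; a thread switch between the two is what loses updates.

```python
import dis

x = 0

def increment():
    global x
    x += 1

# List the bytecode instructions the interpreter runs for increment()
ops = [instr.opname for instr in dis.get_instructions(increment)]
print(ops)  # includes LOAD_GLOBAL ... STORE_GLOBAL, with the addition in between
```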

# Thread lock
### One way to protect our code is to use a lock. A lock makes sure that a locked piece of code is executed by only one thread at a time: any other thread trying to acquire the lock waits until it has been released.

In [3]:
import threading 
  
# global variable x 
x = 0
  
def increment(): 
    """ 
    function to increment global variable x 
    """
    global x 
    x += 1
  
def thread_task(lock): 
    """ 
    task for thread 
    calls increment function 100000 times. 
    """
    for _ in range(100000):
        # The code between acquire() and release() is protected by the lock:
        # only one thread at a time can execute it
        lock.acquire() 
        increment() 
        lock.release() 
  
def main_task(): 
    global x 
    # setting global variable x as 0 
    x = 0
  
    # creating a lock 
    lock = threading.Lock() 
  
    # creating threads 
    t1 = threading.Thread(target=thread_task, args=(lock,)) 
    t2 = threading.Thread(target=thread_task, args=(lock,)) 
  
    # start threads 
    t1.start() 
    t2.start() 
  
    # wait until threads finish their job 
    t1.join() 
    t2.join() 
  
if __name__ == "__main__": 
    for i in range(10): 
        main_task() 
        print("Iteration {0}: x = {1}".format(i,x)) 

Iteration 0: x = 200000
Iteration 1: x = 200000
Iteration 2: x = 200000
Iteration 3: x = 200000
Iteration 4: x = 200000
Iteration 5: x = 200000
Iteration 6: x = 200000
Iteration 7: x = 200000
Iteration 8: x = 200000
Iteration 9: x = 200000
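### The `acquire()`/`release()` pair above can also be written with `with lock:`. `threading.Lock` supports the context-manager protocol, so the lock is released when the block exits, even if the protected code raises an exception. A minimal sketch of the same counter:

```python
import threading

x = 0
lock = threading.Lock()

def thread_task(n):
    global x
    for _ in range(n):
        # Acquires the lock on entry and releases it on exit from the block
        with lock:
            x += 1

threads = [threading.Thread(target=thread_task, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(x)  # 200000
```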


# Thread pools

### Obviously, manually creating threads one at a time is not very practical. Luckily, Python's `concurrent.futures` module offers `ThreadPoolExecutor`, which creates a pool of worker threads and distributes the submitted tasks among them.
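### A minimal sketch of the typical pattern (the `task` function here is illustrative). Using the executor in a `with` statement calls `shutdown(wait=True)` automatically when the block ends, so the pool cannot be left running by accident.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    return n * n

# Leaving the "with" block calls pool.shutdown(wait=True)
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(task, i) for i in range(10)]
    # as_completed yields each future as soon as it finishes
    results = [f.result() for f in as_completed(futures)]

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```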

In [4]:
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures

def task(n):
    print("Processing {}".format(n))
    return n

number_of_threads = 8
pool = ThreadPoolExecutor(number_of_threads)

# Submit all the tasks we want to perform to the thread pool
futures = []
for i in range(20):
    futures.append(pool.submit(task, i))  # syntax: pool.submit(function, *args, **kwargs)

# Collect the results as the tasks complete (in completion order)
results = []
for F in concurrent.futures.as_completed(futures):
    results.append(F.result())

pool.shutdown(wait=True)
print("My result list", results)


Processing 0
Processing 1
Processing 2Processing 3

Processing 4
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
Processing 10
Processing 11
Processing 12
Processing 13
Processing 14
Processing 15
Processing 16
Processing 17
Processing 18
Processing 19
My result list [2, 0, 7, 3, 6, 4, 1, 5, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


### Since `as_completed` yields each future as soon as it finishes, and the operating system decides when each thread runs, we have no control over the order of the results.

### We can write similar code with the `.map()` method, which returns the results in the same order in which the inputs were submitted.

In [5]:
pool = ThreadPoolExecutor(number_of_threads)

results=pool.map(task,range(15))

pool.shutdown(wait=True)
print(results)
for value in results:
    print(value)

Processing 0
Processing 1Processing 2

Processing 3
Processing 4
Processing 5Processing 6

Processing 7
Processing 8Processing 9Processing 10
Processing 11
Processing 12
Processing 13Processing 14



<generator object Executor.map.<locals>.result_iterator at 0x7f06f03c22b0>
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
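### To see the ordering guarantee of `map` more directly, the sketch below makes later inputs finish first (the function `slow_identity` is hypothetical). The results still come back in input order, because `map` waits for each result in turn.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_identity(n):
    # Later inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (4 - n))
    return n

with ThreadPoolExecutor(max_workers=5) as pool:
    ordered = list(pool.map(slow_identity, range(5)))

print(ordered)  # [0, 1, 2, 3, 4]
```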


# A practical example with improved performance

In [6]:
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

Running without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/2 - exists
https://en.wikipedia.org/wiki/3 - exists
https://en.wikipedia.org/wiki/4 - exists
https://en.wikipedia.org/wiki/5 - exists
https://en.wikipedia.org/wiki/6 - exists
https://en.wikipedia.org/wiki/7 - exists
https://en.wikipedia.org/wiki/8 - exists
https://en.wikipedia.org/wiki/9 - exists
https://en.wikipedia.org/wiki/10 - exists
https://en.wikipedia.org/wiki/11 - exists
https://en.wikipedia.org/wiki/12 - exists
https://en.wikipedia.org/wiki/13 - exists
https://en.wikipedia.org/wiki/14 - exists
https://en.wikipedia.org/wiki/15 - exists
https://en.wikipedia.org/wiki/16 - exists
https://en.wikipedia.org/wiki/17 - exists
https://en.wikipedia.org/wiki/18 - exists
https://en.wikipedia.org/wiki/19 - exists
https://en.wikipedia.org/wiki/20 - exists
https://en.wikipedia.org/wiki/21 - exists
https://en.wikipedia.org/wiki/22 - exists
https://en.wikipedi

In [7]:
# get_wiki_page_existence and wiki_page_urls are defined in the previous cell
print("Running threaded 1:")
threaded_start = time.time()
number_of_threads = 8
pool = ThreadPoolExecutor(number_of_threads)

# pool.map returns a lazy iterator; consuming it with list() waits for every
# request and re-raises here any exception raised inside a thread
results = list(pool.map(get_wiki_page_existence, wiki_page_urls))
pool.shutdown(wait=True)

print("Threaded time:", time.time() - threaded_start)

#####################

print("Running threaded 2:")
threaded_start = time.time()
pool = ThreadPoolExecutor(number_of_threads)
futures = []
for url in wiki_page_urls:
    futures.append(pool.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
    print(future.result())
pool.shutdown(wait=True)

print("Threaded time:", time.time() - threaded_start)

Running threaded 1:
Threaded time: 7.253835916519165
Running threaded 2:
https://en.wikipedia.org/wiki/3 - exists
https://en.wikipedia.org/wiki/7 - exists
https://en.wikipedia.org/wiki/2 - exists
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/4 - exists
https://en.wikipedia.org/wiki/10 - exists
https://en.wikipedia.org/wiki/5 - exists
https://en.wikipedia.org/wiki/11 - exists
https://en.wikipedia.org/wiki/13 - exists
https://en.wikipedia.org/wiki/8 - exists
https://en.wikipedia.org/wiki/14 - exists
https://en.wikipedia.org/wiki/12 - exists
https://en.wikipedia.org/wiki/9 - exists
https://en.wikipedia.org/wiki/20 - exists
https://en.wikipedia.org/wiki/21 - exists
https://en.wikipedia.org/wiki/6 - exists
https://en.wikipedia.org/wiki/22 - exists
https://en.wikipedia.org/wiki/17 - exists
https://en.wikipedia.org/wiki/23 - exists
https://en.wikipedia.org/wiki/24 - exists
https://en.wikipedia.org/wiki/25 - exists
https://en.wi

# Multi Processing

### In simple words, multiprocessing refers to the ability of a system to use more than one processor (or core) at the same time. Applications are broken into smaller routines that run independently as separate processes, and the operating system distributes these processes across the available processors, improving the performance of the system.

### A multiprocessing system can have:

### a) a multiprocessor, i.e. a computer with more than one central processor;
### b) a multi-core processor, i.e. a single computing component with two or more independent processing units (called “cores”).
#### In both cases, the system can execute several tasks at once, with each task running on its own processor or core.
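### To choose a sensible number of worker processes, it helps to know how many logical CPUs the machine reports. A quick check with the standard library; note that the count includes hardware threads (e.g. hyper-threading), and `os.cpu_count()` returns `None` if the number cannot be determined.

```python
import multiprocessing
import os

# Number of logical CPUs reported by the operating system (may be None)
print("os.cpu_count():", os.cpu_count())
# Same information through the multiprocessing module
print("multiprocessing.cpu_count():", multiprocessing.cpu_count())
```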

### Luckily, Python's standard library includes the `multiprocessing` module.

In [8]:
import multiprocessing 
import os 
  
def worker1(): 
    # printing process id 
    print("ID of process running worker1: {}".format(os.getpid())) 
  
def worker2(): 
    # printing process id 
    print("ID of process running worker2: {}".format(os.getpid())) 
  
if __name__ == "__main__": 
    # printing main program process id 
    print("ID of main process: {}".format(os.getpid())) 
  
    # creating processes 
    p1 = multiprocessing.Process(target=worker1) 
    p2 = multiprocessing.Process(target=worker2) 
  
    # starting processes 
    p1.start() 
    p2.start() 
  
    # process IDs 
    print("ID of process p1: {}".format(p1.pid)) 
    print("ID of process p2: {}".format(p2.pid)) 
  
    # wait until processes are finished 
    p1.join() 
    p2.join() 
  
    # both processes finished 
    print("Both processes finished execution!") 
  
    # check if processes are alive 
    print("Process p1 is alive: {}".format(p1.is_alive())) 
    print("Process p2 is alive: {}".format(p2.is_alive())) 

ID of main process: 6661
ID of process running worker1: 8913
ID of process running worker2: 8914
ID of process p1: 8913
ID of process p2: 8914
Both processes finished execution!
Process p1 is alive: False
Process p2 is alive: False


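### As you can see, each process now gets its own process ID, different from that of the main program. Which core each process runs on is decided by the operating system, and separate processes can run in parallel on different cores. Moreover, each process runs its own Python interpreter with its own GIL, so processes do not block each other the way threads do.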
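### A consequence of running separate interpreters is that processes, unlike threads, do not share memory. In the sketch below (the names `counter`, `increment_counter` and `run_child_and_read_counter` are illustrative), the child increments its own copy of a global variable and the parent's copy is unchanged. The `if __name__ == "__main__":` guard is needed on platforms that start processes with the "spawn" method, because the child re-imports the main module.

```python
import multiprocessing

counter = 0  # module-level variable; each process has its own copy

def increment_counter():
    global counter
    counter += 1
    print("child sees counter =", counter)

def run_child_and_read_counter():
    p = multiprocessing.Process(target=increment_counter)
    p.start()
    p.join()
    return counter  # the parent's copy, untouched by the child

if __name__ == "__main__":
    print("parent sees counter =", run_child_and_read_counter())  # 0
```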

# Queues

### Typically, in a multiprocessing task we have a number of jobs to do, which can be larger than the number of cores available in the computer. We use a queue to hold the pending jobs and hand them out on a first-in, first-out (FIFO) basis. Most importantly, `multiprocessing.Queue` is thread- and process-safe: several processes can put and get items concurrently without corrupting it.

In [9]:
queue = multiprocessing.Queue()

In [10]:
queue

<multiprocessing.queues.Queue at 0x7f06f013c630>

In [11]:
queue.put("hello")
queue.put("world")

In [12]:
queue.get()

'hello'

In [13]:
queue.get()

'world'

In [14]:
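import multiprocessing
import os

def worker(queue):
    # Keep taking jobs until the sentinel value None is received.
    # (Checking queue.empty() before queue.get() is unreliable with several
    # consumers: another process may take the last item in between, and
    # get() then blocks forever.)
    for item in iter(queue.get, None):
        print("Process "+str(os.getpid())+"received: " + str(item))

queue = multiprocessing.Queue()

for i in range(20):
    queue.put(i)
num_processes = 4
# One sentinel per worker, queued after all the jobs (FIFO), so every process stops
for _ in range(num_processes):
    queue.put(None)

processes = []
for _ in range(num_processes):
    p = multiprocessing.Process(target=worker, args=[queue])
    processes.append(p)
    p.start()

for pr in processes:
    pr.join()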




Process 9056received: 0
Process 9056received: 2Process 9057received: 1
Process 9066received: 3

Process 9056received: 6Process 9071received: 4Process 9057received: 5

Process 9066received: 7
Process 9057received: 8Process 9071received: 9

Process 9066received: 10
Process 9057received: 11

Process 9071received: 12Process 9066received: 13Process 9057received: 14

Process 9071received: 15
Process 9066received: 16
Process 9057received: 17Process 9071received: 18


Process 9056received: 19
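### Workers usually also need to send results back. A sketch assuming a second queue for results and a `None` sentinel per worker to signal "no more jobs" (a common convention, not the only one; the names `square_worker` and `run` are hypothetical). The parent collects all results before joining, since the `multiprocessing` documentation warns that a process which still has buffered items in a queue may not terminate until they are consumed.

```python
import multiprocessing

def square_worker(tasks, results):
    # iter(tasks.get, None) keeps calling tasks.get() until it returns None
    for n in iter(tasks.get, None):
        results.put((n, n * n))

def run(num_jobs=10, num_processes=4):
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    for n in range(num_jobs):
        tasks.put(n)
    for _ in range(num_processes):
        tasks.put(None)  # one sentinel per worker, after all jobs (FIFO)

    workers = [multiprocessing.Process(target=square_worker, args=(tasks, results))
               for _ in range(num_processes)]
    for w in workers:
        w.start()
    # Read exactly num_jobs results before joining the workers
    collected = [results.get() for _ in range(num_jobs)]
    for w in workers:
        w.join()
    return sorted(collected)

if __name__ == "__main__":
    print(run())  # [(0, 0), (1, 1), (2, 4), ..., (9, 81)]
```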


# Process pools
### Similarly to thread pools, we can use process pools with exactly the same use case as before.

In [15]:
from concurrent.futures import ProcessPoolExecutor
import concurrent.futures

def task(n):
    print("Processing {}".format(n))
    return n

number_of_processes = 4
pool = ProcessPoolExecutor(number_of_processes)

# Submit all the tasks we want to perform to the process pool
futures = []
for i in range(20):
    futures.append(pool.submit(task, i))  # syntax: pool.submit(function, *args, **kwargs)

# Collect the results as the tasks complete (in completion order)
results = []
for F in concurrent.futures.as_completed(futures):
    results.append(F.result())

pool.shutdown(wait=True)
print("My result list", results)

Processing 0Processing 3Processing 1Processing 2



Processing 4Processing 6Processing 7
Processing 8
Processing 9

Processing 10
Processing 11
Processing 12

Processing 14Processing 13Processing 15
Processing 5
Processing 16
Processing 17


Processing 18
Processing 19
My result list [2, 0, 3, 1, 4, 6, 7, 8, 9, 10, 11, 12, 14, 13, 5, 16, 17, 15, 18, 19]
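### With a process pool, every task and result has to be sent between processes, which costs time when there are many small tasks. `ProcessPoolExecutor.map` accepts a `chunksize` argument that sends inputs to the workers in batches. A minimal sketch (the helper `squares_in_parallel` and the value 100 are illustrative; the best chunk size depends on the workload):

```python
from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

def squares_in_parallel(numbers, max_workers=4, chunksize=100):
    # chunksize groups inputs into batches sent to the workers together,
    # reducing inter-process communication for many small tasks
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(square, numbers, chunksize=chunksize))

if __name__ == "__main__":
    result = squares_in_parallel(range(1000))
    print(result[:5])  # [0, 1, 4, 9, 16]
```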


# Multi processing and random numbers

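### One needs to be careful when generating random numbers with multiple processes: worker processes started by forking (the usual method on Linux) inherit a copy of the parent's random generator state, so different processes can draw the same "random" numbers, as the output below shows.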
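### The mechanism can be mimicked in a single process: saving NumPy's global generator state with `np.random.get_state()` and restoring it before each draw plays the role of each forked worker starting from the parent's state. This is an illustration of the cause, not of how workers are actually started.

```python
import numpy as np

np.random.seed(0)
state = np.random.get_state()  # the state every forked worker would inherit

draws = []
for _ in range(4):
    np.random.set_state(state)  # each "worker" starts from the same state
    draws.append(np.random.uniform(0, 1))

print(draws)  # four identical numbers
```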

In [16]:
import numpy as np
import random
number_of_threads=4
pool = ProcessPoolExecutor(number_of_threads)

def Foo_np(seed=None):
    return np.random.uniform(0, 1)

results=pool.map(Foo_np, range(20))
for result in results:
    print(result)

0.408061032115131
0.408061032115131
0.408061032115131
0.408061032115131
0.6096351815323336
0.5753350473409706
0.6096351815323336
0.6096351815323336
0.6096351815323336
0.9928851053867359
0.5753350473409706
0.5753350473409706
0.5753350473409706
0.6811398943987135
0.9928851053867359
0.4997081962297416
0.9928851053867359
0.6296222884818174
0.9928851053867359
0.6811398943987135


### One way to fix this is to seed the generator explicitly in each task, here using the task index as the seed.

In [17]:
import numpy as np
import random
number_of_threads=4
pool = ProcessPoolExecutor(number_of_threads)

def Foo_np(seed=None):
    np.random.seed(seed)
    return np.random.uniform(0, 1, 1)

results=pool.map(Foo_np, range(20))
for result in results:
    print(result)

[0.5488135]
[0.417022]
[0.4359949]
[0.5507979]
[0.96702984]
[0.22199317]
[0.89286015]
[0.07630829]
[0.8734294]
[0.01037415]
[0.77132064]
[0.18026969]
[0.15416284]
[0.77770241]
[0.51394334]
[0.8488177]
[0.22329108]
[0.294665]
[0.65037424]
[0.0975336]
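### An alternative with NumPy's newer `Generator` API is to create a local generator inside each task with `np.random.default_rng(seed)`, which leaves the global state untouched. A sketch (the name `foo_rng` is hypothetical); it could be passed to `pool.map` in place of `Foo_np`, but is called serially here to stay self-contained.

```python
import numpy as np

def foo_rng(seed):
    # A local Generator per task: no global state is modified, and the
    # same seed always reproduces the same draw
    rng = np.random.default_rng(seed)
    return rng.uniform(0, 1)

values = [foo_rng(seed) for seed in range(5)]
print(values)
```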


# A simple Monte Carlo example: computing Pi
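### The estimator in `compute_pi_python` rests on an area ratio: a point drawn uniformly from the square $[-1,1]^2$ lands inside the unit disc with probability equal to the disc's area divided by the square's area.

$$
p = \mathbb{P}\left(X^2 + Y^2 \le 1\right) = \frac{\pi \cdot 1^2}{2 \cdot 2} = \frac{\pi}{4},
\qquad X, Y \sim U[-1, 1] \text{ independent},
$$

$$
\hat{\pi}_N = \frac{4}{N} \sum_{i=1}^{N} \mathbf{1}\{x_i^2 + y_i^2 \le 1\},
\qquad
\text{s.e.}(\hat{\pi}_N) = 4\sqrt{\frac{p(1-p)}{N}} \approx \frac{1.64}{\sqrt{N}}.
$$

### For $N = 10^7$ this standard error is about $5 \times 10^{-4}$, the same order as the gap between the estimate 3.14129 below and $\pi$.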

In [18]:
def compute_pi_python(num_sim,seed):
    np.random.seed(seed)
    x=2.0*(np.random.random(size=num_sim)-0.5)  # np.random.random returns U[0,1), so we rescale it to U[-1,1)
    y=2.0*(np.random.random(size=num_sim)-0.5)

    inside=np.sum(x*x+y*y<=1)
    
    
    pi=inside/num_sim*4

    return pi

In [19]:
num_sim=10000000

In [20]:
%timeit compute_pi_python(num_sim,0)

441 ms ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [21]:
compute_pi_python(num_sim,0)

3.14129

In [22]:
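import numpy as np
from concurrent.futures import ProcessPoolExecutor

def parallel_compute_pi_python(number_of_processes, num_sim):
    # Split the simulations evenly; each process gets its own seed (0, 1, ...)
    sims_per_process = num_sim // number_of_processes
    # The "with" block shuts the pool down (freeing its processes) on exit
    with ProcessPoolExecutor(number_of_processes) as pool:
        results = pool.map(compute_pi_python,
                           [sims_per_process] * number_of_processes,
                           range(number_of_processes))
        MC_mean = 0
        for result in results:
            MC_mean += result
    return MC_mean / number_of_processes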

In [23]:
parallel_compute_pi_python(4,num_sim)

3.1413320000000002

In [24]:
%timeit parallel_compute_pi_python(4,num_sim)

164 ms ± 8.8 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


### Remark 1: For the speed-up from multiprocessing to be noticeable, the Monte Carlo simulation needs to be heavy enough, as setting up the process pool adds overhead.

### Remark 2: We have made the oversimplification of choosing the random seed of each process arbitrarily (0, 1, 2, 3). Nothing guarantees that the resulting random streams are independent: sequences of random numbers could be repeated across processes and introduce correlation. Moreover, for the single-process and multi-process approaches to agree on the estimate, one would need to set the correct seed on each process. This topic is outside the scope of this course, but one needs to be very careful with it!
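### One tool NumPy provides for this is `np.random.SeedSequence`: a single root seed can `spawn()` child seeds that, according to NumPy's documentation, give independent streams with very high probability. A sketch under that assumption (the function `compute_pi_rng` is a hypothetical variant of `compute_pi_python` that takes a `Generator`); in a process pool, each worker would receive one child seed, while here the four chunks run one after the other for simplicity.

```python
import numpy as np

def compute_pi_rng(num_sim, rng):
    # Same estimator as compute_pi_python, drawing from the Generator passed in
    x = rng.uniform(-1.0, 1.0, size=num_sim)
    y = rng.uniform(-1.0, 1.0, size=num_sim)
    return 4.0 * np.mean(x * x + y * y <= 1.0)

# One root seed for the whole experiment, split into four child seeds
root = np.random.SeedSequence(12345)
child_seeds = root.spawn(4)

# Each chunk builds its own Generator from its child seed
estimates = [compute_pi_rng(250_000, np.random.default_rng(s)) for s in child_seeds]
print(estimates, np.mean(estimates))
```

### Because every child seed is derived from the root seed, rerunning with `SeedSequence(12345)` reproduces the same estimates.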