# Multiprocessing

## How to get the number of CPUs in Python?

The Python standard library provides two ways:

 * `multiprocessing.cpu_count()` function
 * `os.cpu_count()` function

Let's try both:

In [1]:
# TODO

In [2]:
import multiprocessing
multiprocessing.cpu_count()

12

In [3]:
import os
os.cpu_count()

12
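Both functions report the number of logical CPUs in the whole machine. This can be more than the current process is allowed to use, for example when it is pinned to a subset of cores. A minimal sketch, assuming a Linux-like platform where `os.sched_getaffinity()` exists, with a fallback elsewhere (`usable_cpu_count` is an illustrative helper name; Python 3.13 and later also offer `os.process_cpu_count()` for this purpose):

```python
import multiprocessing
import os


def usable_cpu_count():
    """Return how many CPUs the current process is allowed to run on."""
    # os.sched_getaffinity() only exists on some Unix platforms (e.g. Linux);
    # it honours CPU pinning such as `taskset`.
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Elsewhere fall back to the total count; os.cpu_count() may return None.
    return os.cpu_count() or 1


print("os.cpu_count():             ", os.cpu_count())
print("multiprocessing.cpu_count():", multiprocessing.cpu_count())
print("usable by this process:     ", usable_cpu_count())
```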

## How to use multiprocessing.Process in a for-loop?

Let's define a simple task function:

In [4]:
from time import sleep
from random import random

# execute a task
def task(arg):
    # block for 1 second
    sleep(1)
    # report a message
    print(f'Task {arg} done', flush=True)

In [5]:
%%time 
task(1)

Task 1 done
CPU times: user 7.34 ms, sys: 4.23 ms, total: 11.6 ms
Wall time: 1 s


**Sequential execution**

In [6]:
%%time
for i in range(20):
    task(i)

# report that all tasks are completed
print('Done', flush=True)

Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Task 10 done
Task 11 done
Task 12 done
Task 13 done
Task 14 done
Task 15 done
Task 16 done
Task 17 done
Task 18 done
Task 19 done
Done
CPU times: user 59.1 ms, sys: 5.05 ms, total: 64.1 ms
Wall time: 20.1 s


**Parallel execution**

In the loop above, the `task` function calls are all independent, so this is an embarrassingly parallel problem.

Try to use `multiprocessing.Process` to parallelize the loop. To do so, create a `multiprocessing.Process` for each task and call its `start()` method to run it. Do not forget to wait for all the processes to complete (with `join()`).

In [7]:
%%time
import multiprocessing

# TODO

# report that all tasks are completed
print('Done', flush=True)

Done
CPU times: user 2.33 ms, sys: 83 µs, total: 2.42 ms
Wall time: 2.02 ms


In [8]:
%%time
import multiprocessing

# Create all the processes
processes = [ multiprocessing.Process(target=task, args=(i,)) for i in range(24) ]

# Start all the processes
for process in processes:
    process.start()

# wait for all processes to complete
for process in processes:
    process.join()
# report that all tasks are completed
print('Done', flush=True)

Task 2 done
Task 1 doneTask 0 done

Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Task 10 done
Task 11 done
Task 12 done
Task 13 done
Task 14 done
Task 15 done
Task 16 done
Task 17 done
Task 18 done
Task 19 done
Task 20 done
Task 21 done
Task 22 done
Task 23 done
Done
CPU times: user 77.8 ms, sys: 103 ms, total: 181 ms
Wall time: 1.15 s


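The loop should run in just over 1 second, even if you don't have 24 CPUs on your machine. This is because the task uses almost no CPU: it spends its time in `sleep()`, and the operating system does not schedule a sleeping process, so it can run the other processes in the meantime. The sequential run above confirms this, with only about 60 ms of user CPU time for 20 seconds of wall time. Note that in the parallel run, the CPU times reported by `%%time` only cover the notebook process itself, not its child processes.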
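You can observe the same thing from inside Python by comparing wall-clock time (`time.perf_counter()`) with the CPU time of the current process (`time.process_time()`). A small sketch; `measure` and `busy_wait` are illustrative helper names, and the exact numbers depend on the machine:

```python
import time


def measure(func, *args):
    """Return (wall-clock seconds, CPU seconds of this process) for func(*args)."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()  # user + system CPU time of this process
    func(*args)
    return time.perf_counter() - wall_start, time.process_time() - cpu_start


def busy_wait(seconds):
    # Spin on the clock: keeps one CPU core busy the whole time
    end = time.perf_counter() + seconds
    while time.perf_counter() < end:
        pass


sleep_wall, sleep_cpu = measure(time.sleep, 0.5)
busy_wall, busy_cpu = measure(busy_wait, 0.5)
print(f"sleep: wall={sleep_wall:.2f}s cpu={sleep_cpu:.2f}s")
print(f"busy : wall={busy_wall:.2f}s cpu={busy_cpu:.2f}s")
```

A sleeping task shows a wall time of about 0.5 s but almost no CPU time, while the busy loop consumes CPU for most of its wall time.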

Let's start again by defining a task that actually consumes CPU. 

In [9]:
from time import perf_counter
from random import random

# execute a CPU-bound task
def task_cpu(arg):
    x = random()
    t = perf_counter()
    # keep the CPU busy with a calculation for 10 seconds of wall-clock time
    while (perf_counter() - t) < 10.0:
        x *= x
    # report a message
    print(f'Task {arg} done', flush=True)

Let's run one task. Look at the output of `%%time`. What happened?

In [10]:
%%time
task_cpu(1)

Task 1 done
CPU times: user 9.99 s, sys: 5.1 ms, total: 10 s
Wall time: 10 s


You can see now that the CPU user time is close to 10 seconds. This means that the task actually consumes 100% of one CPU for 10 seconds.

Let's try to parallelize the for-loop with the new task function. Try to interpret the wall time of the loop depending on the number of CPUs on your machine.

In [11]:
%%time 
import multiprocessing

# TODO

# report that all tasks are completed
print('Done', flush=True)

Done
CPU times: user 2.13 ms, sys: 16 µs, total: 2.15 ms
Wall time: 1.7 ms


In [12]:
%%time
import multiprocessing

# Create all the processes
processes = [ multiprocessing.Process(target=task_cpu, args=(i,)) for i in range(24) ]

# Start all the processes
for process in processes:
    process.start()

# wait for all processes to complete
for process in processes:
    process.join()
    
# report that all tasks are completed
print('Done', flush=True)

Task 2 doneTask 4 doneTask 0 done
Task 5 done

Task 7 doneTask 3 done

Task 6 doneTask 9 done

Task 10 doneTask 8 done

Task 11 doneTask 1 done

Task 13 doneTask 14 done
Task 15 done
Task 12 done


Task 17 doneTask 16 done

Task 18 done
Task 19 done
Task 20 done
Task 21 done
Task 22 done
Task 23 done
Done
CPU times: user 26.8 ms, sys: 164 ms, total: 191 ms
Wall time: 10.2 s
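Why is the reported user CPU time only 26.8 ms here, when 24 tasks each kept a CPU busy? `%%time` measures the notebook process, while the real work happens in child processes. On Unix, `resource.getrusage()` can report both sides separately. A sketch assuming a Unix-like system with the `fork` start method (`burn_cpu` and `user_cpu_times` are illustrative names):

```python
import multiprocessing
import resource  # Unix-only module
import time


def burn_cpu(seconds):
    # Keep one core busy for `seconds` of wall-clock time
    end = time.perf_counter() + seconds
    while time.perf_counter() < end:
        pass


def user_cpu_times():
    """User CPU seconds of this process and of its terminated, waited-for children."""
    me = resource.getrusage(resource.RUSAGE_SELF)
    children = resource.getrusage(resource.RUSAGE_CHILDREN)
    return me.ru_utime, children.ru_utime


# Assumption: a Unix-like system where the "fork" start method is available
ctx = multiprocessing.get_context("fork")

self_before, children_before = user_cpu_times()
processes = [ctx.Process(target=burn_cpu, args=(0.5,)) for _ in range(2)]
for p in processes:
    p.start()
for p in processes:
    p.join()  # children only count in RUSAGE_CHILDREN once they are waited for
self_after, children_after = user_cpu_times()

parent_cpu = self_after - self_before
children_cpu = children_after - children_before
print(f"parent user CPU:   {parent_cpu:.2f}s")
print(f"children user CPU: {children_cpu:.2f}s")
```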


## How to use multiprocessing.Pool in a for-loop?

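In this example we created 24 processes, one per task, even though the machine may have far fewer CPU cores. The loop still finished in about 10 seconds on a 12-CPU machine only because `task_cpu` stops after 10 seconds of wall-clock time (it checks `perf_counter()`), not after a fixed amount of work: with twice as many processes as cores, each process simply got roughly half a core. Creating a process also takes time. To be more efficient, we can use a **process pool**.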

A process pool is a programming pattern for automatically managing a pool of worker processes.

The pool is responsible for a fixed number of processes.
 * It controls when they are created, such as when they are needed.
 * It also controls what they should do when they are not being used, such as making them wait without consuming computational resources.
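Because the pool keeps its workers alive between tasks, many tasks end up running in the same few processes. A quick way to see this is to have each task return the PID of the worker that ran it. A sketch assuming the `fork` start method (`worker_pid` is an illustrative name):

```python
import multiprocessing
import os


def worker_pid(_):
    # Report which worker process handled this item
    return os.getpid()


# Assumption: Unix-like system with the "fork" start method
ctx = multiprocessing.get_context("fork")

with ctx.Pool(processes=2) as pool:
    pids = pool.map(worker_pid, range(20))

print(f"{len(pids)} tasks ran in {len(set(pids))} distinct worker processes")
```

With `processes=2`, the 20 tasks are spread over at most two worker processes, none of which is the parent.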

Try to parallelize the for-loop example with `multiprocessing.Pool`. You can use `map` to execute tasks.

In [13]:
%%time

# TODO

# report that all tasks are completed
print('Done', flush=True)

Done
CPU times: user 1.87 ms, sys: 14 µs, total: 1.88 ms
Wall time: 1.43 ms


In [14]:
%%time

# create a process pool that uses all CPUs
with multiprocessing.Pool() as pool:
    # call the function for each item in parallel
    pool.map(task_cpu, range(24))

# report that all tasks are completed
print('Done', flush=True)

Task 4 doneTask 0 doneTask 5 doneTask 2 doneTask 8 doneTask 1 doneTask 9 doneTask 3 doneTask 10 done




Task 11 done




Task 6 done
Task 7 done
Task 12 doneTask 14 doneTask 15 doneTask 17 doneTask 13 done
Task 18 done
Task 16 done




Task 19 done
Task 20 done
Task 21 done
Task 22 done
Task 23 done
Done
CPU times: user 62.7 ms, sys: 67.9 ms, total: 131 ms
Wall time: 20.2 s


Without arguments, the process pool creates one worker process per CPU. Try to configure the number of worker processes.

In [15]:
%%time

# TODO

# report that all tasks are completed
print('Done', flush=True)

Done
CPU times: user 2.28 ms, sys: 0 ns, total: 2.28 ms
Wall time: 1.88 ms


In [16]:
%%time

# create a process pool with 4 worker processes
with multiprocessing.Pool(processes=4) as pool:
    # call the function for each item in parallel
    pool.map(task_cpu, range(24))

# report that all tasks are completed
print('Done', flush=True)

Task 0 doneTask 6 doneTask 4 doneTask 2 done



Task 1 doneTask 7 doneTask 5 doneTask 3 done



Task 8 doneTask 10 doneTask 12 doneTask 14 done



Task 11 doneTask 15 doneTask 9 doneTask 13 done



Task 16 doneTask 20 doneTask 18 doneTask 22 done



Task 21 doneTask 19 doneTask 17 doneTask 23 done



Done
CPU times: user 38.2 ms, sys: 24 ms, total: 62.2 ms
Wall time: 1min
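These timings follow from simple arithmetic. With equal-length tasks, a pool of `n` workers runs the tasks in successive rounds of at most `n` tasks, so 24 tasks of 10 seconds take about 2 rounds (20 s) with 12 workers and 6 rounds (60 s) with 4 workers, matching the 20.2 s and 1 min measured above. A back-of-the-envelope helper, assuming equal task durations and ignoring start-up overhead (`estimated_wall_time` is an illustrative name):

```python
import math


def estimated_wall_time(n_tasks, n_workers, task_seconds):
    """Rough wall time for equal-length tasks on a fixed-size pool.

    Tasks run in ceil(n_tasks / n_workers) successive "rounds"; overheads
    such as process start-up and pickling are ignored.
    """
    rounds = math.ceil(n_tasks / n_workers)
    return rounds * task_seconds


print(estimated_wall_time(24, 12, 10))  # 20: default pool on a 12-CPU machine
print(estimated_wall_time(24, 4, 10))   # 60: Pool(processes=4)
```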


### Use the `concurrent.futures` interface

The `concurrent.futures` module provides a high-level interface for asynchronously executing callables.
The asynchronous execution can be performed with separate processes, using `ProcessPoolExecutor`.
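The two basic entry points are `submit()`, which schedules one call and returns a `Future`, and `map()`, its batch counterpart. A minimal sketch using the built-in `pow` as the task, assuming a Unix-like system with the `fork` start method (passed through the optional `mp_context` parameter):

```python
import concurrent.futures
import multiprocessing

# Assumption: Unix-like system with the "fork" start method
ctx = multiprocessing.get_context("fork")

with concurrent.futures.ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
    # submit() schedules one call and immediately returns a Future
    future = executor.submit(pow, 2, 10)
    # result() blocks until the call has finished in a worker process
    value = future.result()
    # map() applies the function to each set of arguments, results in input order
    squares = list(executor.map(pow, range(5), [2] * 5))

print(value, squares)  # 1024 [0, 1, 4, 9, 16]
```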

Try to parallelize the for-loop with `concurrent.futures`.

In [17]:
%%time 
import concurrent.futures

# TODO

# report that all tasks are completed
print('Done', flush=True)

Done
CPU times: user 0 ns, sys: 2.68 ms, total: 2.68 ms
Wall time: 2.15 ms


In [18]:
%%time
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as executor:
    # leaving the with block waits for all the submitted tasks to finish
    executor.map(task_cpu, range(24))

# report that all tasks are completed
print('Done', flush=True)

Task 0 doneTask 3 doneTask 1 doneTask 8 doneTask 5 doneTask 2 doneTask 9 doneTask 4 doneTask 6 doneTask 10 doneTask 7 doneTask 11 done











Task 12 doneTask 13 doneTask 15 doneTask 17 doneTask 16 doneTask 14 done

Task 18 done


Task 19 doneTask 21 done
Task 20 done



Task 23 doneTask 22 done

Done
CPU times: user 45.7 ms, sys: 96 ms, total: 142 ms
Wall time: 20.1 s


## Synchronization between processes

### Merge for-loop result

Let's define another task function. This time the function returns a value.

In [19]:
from time import sleep
from random import random

# execute a task
def task_with_return(arg):
    x = random()
    # block for 1 second
    sleep(1)
    # report a message
    print(f'Task {arg} x = {x}', flush=True)
    # return the random value
    return x

In [20]:
task_with_return(1)

Task 1 x = 0.02061786654731157


0.02061786654731157

**Sequential execution**

In [21]:
%%time 
result = 0

for i in range(24):
    result += task_with_return(i)

print(f'result = {result}')

Task 0 x = 0.07042293226994845
Task 1 x = 0.8027931897983235
Task 2 x = 0.15314958476143203
Task 3 x = 0.5119739029465238
Task 4 x = 0.31509560155347205
Task 5 x = 0.5885714998671596
Task 6 x = 0.5576350108411092
Task 7 x = 0.21875426084405836
Task 8 x = 0.4661202239594141
Task 9 x = 0.45792899284859745
Task 10 x = 0.04686538323461831
Task 11 x = 0.3748445473033528
Task 12 x = 0.38369702370146674
Task 13 x = 0.47664129066995964
Task 14 x = 0.08745084661007263
Task 15 x = 0.44279981590755924
Task 16 x = 0.6507507214834095
Task 17 x = 0.4442227494578893
Task 18 x = 0.7849689768995336
Task 19 x = 0.8233049581369861
Task 20 x = 0.7908920506607124
Task 21 x = 0.8382518267643617
Task 22 x = 0.36210922512100296
Task 23 x = 0.3023940538755846
result = 10.95163866951655
CPU times: user 66.3 ms, sys: 479 µs, total: 66.8 ms
Wall time: 24.1 s


**Parallel execution**

In [22]:
%%time
import concurrent.futures

result = 0

with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    # map() yields the results in the order of the inputs
    results = executor.map(task_with_return, range(24))
    for res in results:
        result += res

print(f'result = {result}')

Task 4 x = 0.6917439765457093Task 0 x = 0.8133627285664992Task 1 x = 0.04143390234437738Task 2 x = 0.13397935538713757Task 7 x = 0.3652963405675773Task 3 x = 0.8232202076433772Task 5 x = 0.4129277235699932Task 6 x = 0.389477612498472







Task 9 x = 0.0945893583923143Task 8 x = 0.2527632588471729Task 11 x = 0.3053253361321402Task 10 x = 0.5394807655581709Task 12 x = 0.17775185400103288Task 14 x = 0.8988999556196607Task 13 x = 0.01840864388953023


Task 15 x = 0.16277537661500407




Task 18 x = 0.1862193791387492Task 17 x = 0.645494765614521Task 22 x = 0.5963191113469579Task 16 x = 0.28306904463876603Task 19 x = 0.7719307617669617Task 21 x = 0.576663315652216Task 20 x = 0.08127373901058121Task 23 x = 0.9702781654421091







result = 10.232684678789031
CPU times: user 39 ms, sys: 52.4 ms, total: 91.4 ms
Wall time: 3.08 s
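`executor.map` returns results in the order of the inputs, even though the printed output shows tasks finishing out of order. If you would rather handle each result as soon as it is ready, `concurrent.futures.as_completed()` yields futures in completion order. A sketch assuming the `fork` start method; `slow_square` is an illustrative task:

```python
import concurrent.futures
import multiprocessing
import time


def slow_square(n):
    # Smaller inputs sleep longer, so they tend to finish last
    time.sleep(0.1 * (4 - n))
    return n * n


ctx = multiprocessing.get_context("fork")  # assumption: Unix-like system

with concurrent.futures.ProcessPoolExecutor(max_workers=4, mp_context=ctx) as executor:
    # map(): results come back in the order of the inputs
    in_order = list(executor.map(slow_square, range(4)))
    # as_completed(): futures are yielded as soon as each one finishes
    futures = [executor.submit(slow_square, n) for n in range(4)]
    by_completion = [f.result() for f in concurrent.futures.as_completed(futures)]

print(in_order)       # always [0, 1, 4, 9]
print(by_completion)  # completion order, typically largest input first
```

For a sum, the order does not matter, which is why accumulating the `map` results above works either way.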


### Lock

Let's define a task with a lock. Only one process can have the lock at any time. If a process does not release an acquired lock, it cannot be acquired again.
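A typical reason to need a lock is a shared counter: incrementing it means reading the value, adding one and writing it back, and two processes can interleave those steps and lose updates. A sketch using `multiprocessing.Value`, whose synchronized wrapper exposes its own lock through `get_lock()`, assuming the `fork` start method (`add_many` is an illustrative name):

```python
import multiprocessing


def add_many(counter, n):
    for _ in range(n):
        # `counter.value += 1` is a read-modify-write; without the lock two
        # processes can read the same value and one update gets lost
        with counter.get_lock():
            counter.value += 1


ctx = multiprocessing.get_context("fork")  # assumption: Unix-like system

counter = ctx.Value("i", 0)  # shared int, created with its own lock
processes = [ctx.Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
for p in processes:
    p.start()
for p in processes:
    p.join()

print(counter.value)  # 4000
```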

In [23]:
from time import sleep

def task_with_lock(arg, lock):
    # only one process at a time can run this block
    with lock:
        print(f'Task {arg} got the lock', flush=True)
        sleep(1)
        print(arg, 'world', flush=True)

Try to parallelize a for-loop. Use `multiprocessing.Manager` to create a lock: a Manager lock is a proxy object, so unlike a plain `multiprocessing.Lock` it can be passed as an argument to tasks submitted to a pool.

In [24]:
%%time
import multiprocessing
import concurrent.futures

# TODO

CPU times: user 10 µs, sys: 0 ns, total: 10 µs
Wall time: 14.8 µs


In [42]:
%%time
import multiprocessing
import concurrent.futures

with multiprocessing.Manager() as m:
    # the lock lives in the manager process; workers receive a proxy to it
    lock = m.Lock()
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(task_with_lock, num, lock) for num in range(24)]
        for future in futures:
            # result() also re-raises any exception raised by the task
            future.result()


Task 0 got the lock
0 world
Task 2 got the lock
2 world
Task 1 got the lock
1 world
Task 5 got the lock
5 world
Task 3 got the lock
3 world
Task 4 got the lock
4 world
Task 7 got the lock
7 world
Task 6 got the lock
6 world
Task 8 got the lock
8 world
Task 9 got the lock
9 world
Task 10 got the lock
10 world
Task 11 got the lock
11 world
Task 12 got the lock
12 world
Task 13 got the lock
13 world
Task 14 got the lock
14 world
Task 15 got the lock
15 world
Task 16 got the lock
16 world
Task 17 got the lock
17 world
Task 18 got the lock
18 world
Task 19 got the lock
19 world
Task 20 got the lock
20 world
Task 21 got the lock
21 world
Task 22 got the lock
22 world
Task 23 got the lock
23 world
CPU times: user 149 ms, sys: 92.5 ms, total: 241 ms
Wall time: 24.4 s


## Handle exceptions

Let's define a task that fails randomly: it raises an exception about half of the time.

In [43]:
from time import sleep
from random import random

# execute a task
def task_with_exc(arg):
    # block for 1 second
    sleep(1)
    # fail about half of the time
    if random() < 0.5:
        raise Exception(f"Task {arg} error")
    # return a message
    return f'Task {arg} done'

In [44]:
%%time 
result = 0

try:
    for i in range(24):
        task_with_exc(i)
except Exception as e:
    print(e)
print(f'result = {result}')

Task 0 error
result = 0
CPU times: user 0 ns, sys: 1.82 ms, total: 1.82 ms
Wall time: 1 s


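With `map`, an exception raised in a worker is re-raised in the parent when the result iterator reaches the failed task, so iteration stops at the first error and the results of the remaining tasks are not retrieved: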

In [45]:
%%time
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    results = executor.map(task_with_exc,range(24))
    try:
        for result in results:
            print(result)
    except Exception:
        print("Error during execution")


Task 0 done
Task 1 done
Task 2 done
Error during execution
CPU times: user 18.3 ms, sys: 39.8 ms, total: 58.1 ms
Wall time: 3.06 s


Now try to parallelize the loop with `submit()`. You can use the `concurrent.futures.as_completed()` function to gather the results as they complete.

In [46]:
%%time

import concurrent.futures

with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    # execute our tasks
    futures = [ executor.submit(task_with_exc, i) for i in range(24)]
    for future in concurrent.futures.as_completed(futures):
        # get the result from the task
        try:
            print(future.result())
        except Exception:
            print(future.exception())

Task 5 done
Task 4 done
Task 3 done
Task 6 done
Task 0 done
Task 7 error
Task 1 error
Task 2 error
Task 8 done
Task 9 done
Task 13 done
Task 14 done
Task 15 error
Task 10 error
Task 12 error
Task 11 error
Task 16 done
Task 18 error
Task 20 done
Task 19 error
Task 23 error
Task 22 done
Task 17 error
Task 21 error
CPU times: user 30.8 ms, sys: 35.9 ms, total: 66.7 ms
Wall time: 3.06 s
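Unlike `map`, each future keeps its own outcome, so one failure does not hide the other results. A common pattern maps each future back to its input, so you know which task failed. A sketch assuming the `fork` start method; `check_even` is an illustrative task:

```python
import concurrent.futures
import multiprocessing


def check_even(n):
    # Illustrative task: fails for odd inputs
    if n % 2:
        raise ValueError(f"{n} is odd")
    return n


ctx = multiprocessing.get_context("fork")  # assumption: Unix-like system

successes, failures = {}, {}
with concurrent.futures.ProcessPoolExecutor(max_workers=4, mp_context=ctx) as executor:
    # Remember which input each future belongs to
    future_to_arg = {executor.submit(check_even, n): n for n in range(6)}
    for future in concurrent.futures.as_completed(future_to_arg):
        arg = future_to_arg[future]
        error = future.exception()  # None if the call succeeded
        if error is None:
            successes[arg] = future.result()
        else:
            failures[arg] = repr(error)

print(sorted(successes))  # [0, 2, 4]
print(sorted(failures))   # [1, 3, 5]
```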


## Use shared memory

`multiprocessing.Array` creates an array in shared memory, so every process can access the same array.

Try to fill a shared array with the ID of each task.

In [47]:
import multiprocessing
    
def task_array(i):
    a[i] = i

# TODO

In [48]:
import multiprocessing

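def init(arr):
    # runs once in each worker: store the shared array in a global,
    # because a synchronized Array cannot be passed as a map() argument
    global a
    a = arr

def task_array(i):
    a[i] = i

N = 10
# shared array of N C ints, initialized to 0
arr = multiprocessing.Array('i', [0]*N)
with multiprocessing.Pool(initializer=init, initargs=(arr,)) as pool:
    pool.map(task_array, range(N))
print(arr[:])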

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
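The `initializer` is needed because a synchronized `Array` should only be shared by inheritance, that is, handed to a process when it starts, not pickled as a task argument. With `multiprocessing.Process`, passing the array in `args` is enough, since the arguments are handed over at process creation. A sketch assuming the `fork` start method; `fill_squares` is an illustrative name:

```python
import multiprocessing


def fill_squares(shared, start, stop):
    # Each process writes to its own slice, so no two processes touch the same index
    for i in range(start, stop):
        shared[i] = i * i


ctx = multiprocessing.get_context("fork")  # assumption: Unix-like system

N = 10
shared = ctx.Array("i", N)  # N ints in shared memory, initialised to 0
processes = [
    ctx.Process(target=fill_squares, args=(shared, start, min(start + 5, N)))
    for start in range(0, N, 5)
]
for p in processes:
    p.start()
for p in processes:
    p.join()

print(shared[:])  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```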


### More exercises

#### Fibonacci number

In [49]:
# calculate the nth fibonacci number
def fibonacci(n):
    # check for the start of the sequence
    if n <= 1:
        return n
    return (fibonacci(n-1) + fibonacci(n-2))

In [50]:
for i in range(20):
    print(f'f({i}) = {fibonacci(i)}')

f(0) = 0
f(1) = 1
f(2) = 1
f(3) = 2
f(4) = 3
f(5) = 5
f(6) = 8
f(7) = 13
f(8) = 21
f(9) = 34
f(10) = 55
f(11) = 89
f(12) = 144
f(13) = 233
f(14) = 377
f(15) = 610
f(16) = 987
f(17) = 1597
f(18) = 2584
f(19) = 4181


1. Calculate Fibonacci numbers iteratively

In [51]:
# TODO

In [52]:
# calculate the nth fibonacci number
def fibonacci_iteratively(n):
    # Init
    f0, f1 = 0, 1
    # Loop
    for _ in range(0, n):
        f0, f1 = f1, (f1 + f0)
    return f0

In [53]:
for i in range(20):
    print(f'f({i}) = {fibonacci_iteratively(i)}')

f(0) = 0
f(1) = 1
f(2) = 1
f(3) = 2
f(4) = 3
f(5) = 5
f(6) = 8
f(7) = 13
f(8) = 21
f(9) = 34
f(10) = 55
f(11) = 89
f(12) = 144
f(13) = 233
f(14) = 377
f(15) = 610
f(16) = 987
f(17) = 1597
f(18) = 2584
f(19) = 4181


2. Parallelize the computation

In [54]:
# TODO

In [55]:
%%time
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as executor:
    # fibonacci numbers to calculate
    numbers = range(30)
    # number of inputs sent to a worker process as one batch (chunksize)
    size = 2
    # calculate concurrently
    fibs = executor.map(fibonacci, numbers, chunksize=size)
    # store results
    results = dict(zip(numbers, fibs))
    print(results)
print('Done')

{0: 0, 1: 1, 2: 1, 3: 2, 4: 3, 5: 5, 6: 8, 7: 13, 8: 21, 9: 34, 10: 55, 11: 89, 12: 144, 13: 233, 14: 377, 15: 610, 16: 987, 17: 1597, 18: 2584, 19: 4181, 20: 6765, 21: 10946, 22: 17711, 23: 28657, 24: 46368, 25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229}
Done
CPU times: user 29.5 ms, sys: 43.7 ms, total: 73.1 ms
Wall time: 318 ms
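With `chunksize=2`, `ProcessPoolExecutor.map` groups the 30 inputs into batches that are each sent to a worker as a single task, which reduces inter-process communication when each call is cheap. The grouping is conceptually like the sketch below; this is an illustration, not the executor's internal code (`chunked` is a hypothetical helper, and Python 3.12+ offers the similar `itertools.batched`):

```python
from itertools import islice


def chunked(iterable, size):
    """Split an iterable into lists of at most `size` items."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, size)):
        yield chunk


chunks = list(chunked(range(30), 2))
print(len(chunks))  # 15
print(chunks[:3])   # [[0, 1], [2, 3], [4, 5]]
```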


#### Count numbers

Use the cell below to generate 20 files, each containing 1000 random numbers between 0 and 99.

In [56]:
import os
import numpy as np

for i in range(20):
    with open(os.path.join(os.getcwd(), f'numbers_{i}.txt'), 'w') as f:
        # text mode already translates '\n' to the platform line ending
        f.write('\n'.join(str(np.random.randint(0, 100)) for _ in range(1000)))

Try to count the occurrences of each number across all the files.
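The standard library's `collections.Counter` already covers both steps of this exercise: it counts the items of an iterable, and `update()` adds another set of counts key by key. One possible approach, sketched with two tiny illustrative files in a temporary directory instead of the generated ones, and assuming the `fork` start method (`count_numbers` is an illustrative name):

```python
import collections
import concurrent.futures
import multiprocessing
import os
import tempfile


def count_numbers(path):
    # Count each whitespace-separated token in one file
    with open(path) as f:
        return collections.Counter(f.read().split())


ctx = multiprocessing.get_context("fork")  # assumption: Unix-like system

with tempfile.TemporaryDirectory() as tmpdir:
    # Illustrative input: two small files
    paths = []
    for i, content in enumerate(["1 2 2", "2 3\n3 3"]):
        path = os.path.join(tmpdir, f"numbers_{i}.txt")
        with open(path, "w") as f:
            f.write(content)
        paths.append(path)

    total = collections.Counter()
    with concurrent.futures.ProcessPoolExecutor(mp_context=ctx) as executor:
        for per_file in executor.map(count_numbers, paths):
            total.update(per_file)  # adds counts key by key

print(sorted(total.items()))  # [('1', 1), ('2', 3), ('3', 3)]
```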

In [57]:
# TODO

In [58]:
%%time
import concurrent.futures
import glob

def count_words(filepath):
    # count how many times each word (here: each number) appears in one file
    counts = {}
    with open(filepath, 'r') as f:
        words = [word.strip() for word in f.read().split()]

    for word in words:
        if word not in counts:
            counts[word] = 0
        counts[word] += 1
    return counts


def merge_counts(counts1, counts2):
    # add the counts of counts2 into counts1 (in place)
    for word, count in counts2.items():
        if word not in counts1:
            counts1[word] = 0
        counts1[word] += count
    return counts1

counts = {}

with concurrent.futures.ProcessPoolExecutor() as executor:
    per_doc_counts = executor.map(count_words, glob.glob('numbers_*.txt'))
    for per_doc_count in per_doc_counts:
        merge_counts(counts, per_doc_count)
    print(counts)

{'34': 202, '83': 211, '76': 197, '28': 200, '42': 187, '26': 204, '15': 195, '21': 192, '96': 211, '98': 220, '38': 174, '1': 219, '52': 178, '60': 177, '75': 192, '58': 220, '20': 203, '2': 204, '67': 206, '84': 207, '69': 215, '85': 215, '97': 173, '5': 238, '10': 175, '57': 203, '40': 209, '24': 181, '89': 205, '41': 171, '6': 222, '7': 176, '95': 195, '62': 195, '17': 229, '44': 194, '90': 206, '16': 196, '14': 192, '9': 197, '63': 213, '18': 190, '78': 190, '71': 195, '65': 188, '59': 209, '43': 219, '49': 218, '25': 211, '3': 211, '27': 212, '68': 217, '32': 224, '12': 230, '72': 207, '93': 204, '99': 204, '31': 213, '86': 192, '46': 217, '55': 203, '92': 197, '77': 195, '11': 207, '33': 189, '54': 217, '48': 193, '87': 191, '66': 192, '74': 196, '0': 209, '22': 201, '70': 191, '50': 204, '53': 182, '82': 175, '56': 195, '36': 193, '88': 180, '37': 179, '64': 199, '45': 202, '61': 205, '13': 178, '73': 211, '94': 198, '8': 196, '35': 196, '91': 196, '80': 205, '4': 200, '81': 18