<img style="float: right;" src="http://www2.le.ac.uk/liscb1.jpg">
# Parallelisation in Python

In this notebook we'll go over how to use the `multiprocessing` library to run work in parallel in Python. Although its interface resembles that of the `threading` module, it starts separate processes rather than threads. There are a few important things to note first:
1. Don't start too many processes. Each subprocess is a full Python interpreter with its own memory, not a thread sharing memory with its parent. For CPU-bound work you generally shouldn't run more subprocesses than the number of processors on your computer.
1. Be careful with memory! If you're passing objects to each subprocess, they will be copied, which could use up a lot of memory.
1. Not everything can be passed to a subprocess. Arguments generally have to be picklable, so objects such as lambdas, locally defined functions and classes, and open file handles can't be passed.
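
The first and last points can be checked before any process is started. The sketch below is illustrative only: `is_picklable` is a hypothetical helper (it is not part of `multiprocessing`), and treating `os.cpu_count()` as the upper limit on workers is an assumption that suits CPU-bound work.

```python
import math
import os
import pickle
import threading

# Assumption: one worker per logical CPU is a sensible upper limit.
# os.cpu_count() can return None, so fall back to 1.
max_workers = os.cpu_count() or 1

def is_picklable(obj):
    """Hypothetical helper: True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

if __name__ == '__main__':
    print('Suggested maximum number of workers:', max_workers)
    print(is_picklable([1, 2, 3]))         # plain data
    print(is_picklable(math.sqrt))         # importable function, pickled by name
    print(is_picklable(lambda x: x * x))   # a lambda has no importable name
    print(is_picklable(threading.Lock()))  # wraps an OS-level resource
```

Plain data and importable functions pass the check, while the lambda and the thread lock do not.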

## Basics
The simplest way to spawn a subprocess is to use the `Process` class. The `.start()` method launches the new process, while `.join()` waits for the process to finish.

In [1]:
from multiprocessing import Process

def f(name):
    print('Hello', name)
    return

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Hello bob


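We can pass a different argument to each subprocess by spawning the subprocesses within a loop. When a new interpreter has to be started for the subprocess (the `spawn` and `forkserver` start methods), each argument is serialized using [Pickle](https://pymotw.com/2/pickle/index.html#module-pickle).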

In [2]:
def worker(num):
    """process worker function"""
    print('Worker:', num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
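
The loop above starts the workers but never waits for them, and nothing guarantees that the lines are printed in order. A minimal sketch of the same loop with a `.join()` on every process follows; `run_workers` is a hypothetical helper introduced here for illustration.

```python
from multiprocessing import Process

def worker(num):
    """Process worker function."""
    print('Worker:', num)

def run_workers(count):
    """Start `count` workers, wait for all of them, return their exit codes."""
    jobs = [Process(target=worker, args=(i,)) for i in range(count)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()  # block until this worker has finished
    return [p.exitcode for p in jobs]

if __name__ == '__main__':
    # The 'Worker:' lines may appear in any order.
    print(run_workers(5))
```

An exit code of `0` means the worker function returned without raising an exception.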


It isn't necessary to pass identifying data, such as a name, to each subprocess. A process can look up that information itself with `multiprocessing.current_process()`.

In [3]:
import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')

def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')

if __name__ == '__main__':
    service = multiprocessing.Process(name='my_service', target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1', target=worker)
    worker_2 = multiprocessing.Process(target=worker) # use default name

    worker_1.start()
    worker_2.start()
    service.start()

worker 1 Starting
Process-9 Starting
my_service Starting
worker 1 Exiting
Process-9 Exiting
my_service Exiting
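
The parent can also query a `Process` object directly. The following sketch reads the `name`, `pid` and `exitcode` attributes and calls `is_alive()` before, during and after a run; `nap` and `inspect_process` are hypothetical names, and the 0.2 second sleep is an arbitrary choice.

```python
import multiprocessing
import time

def nap(seconds):
    time.sleep(seconds)

def inspect_process():
    """Return snapshots of a process before, during and after it runs."""
    p = multiprocessing.Process(name='napper', target=nap, args=(0.2,))
    before = (p.name, p.pid, p.is_alive())  # no pid until start() is called
    p.start()
    during = p.is_alive()                   # the child is still sleeping
    p.join()
    after = (p.is_alive(), p.exitcode)      # finished, exit code is set
    return before, during, after

if __name__ == '__main__':
    print(inspect_process())
```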


## Communicating results
It's possible that some of the previous processes printed their output to the console at the same time, so that their messages were interleaved. To stop this behaviour, you need a process-safe way to return data or print messages.

One of the safest ways to return data from a subprocess is to use a `Queue`.

In [4]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']
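
The same idea extends to several workers sharing one queue. In this sketch, `square_into` and `collect_squares` are hypothetical names; each worker puts an `(input, result)` pair on the queue so the parent can match results to inputs regardless of arrival order.

```python
from multiprocessing import Process, Queue

def square_into(q, x):
    # Send the input along with the result so the parent can pair them up.
    q.put((x, x * x))

def collect_squares(values):
    """Square each value in its own process and gather the results."""
    q = Queue()
    procs = [Process(target=square_into, args=(q, v)) for v in values]
    for p in procs:
        p.start()
    # Read one item per worker *before* joining, so no worker is left
    # blocked trying to flush its data into the queue.
    results = dict(q.get() for _ in procs)
    for p in procs:
        p.join()
    return results

if __name__ == '__main__':
    print(collect_squares([0, 1, 2, 3]))
```

Reading from the queue before calling `.join()` follows the guidance in the `multiprocessing` documentation on joining processes that use queues.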


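Another method is to use a `Lock` to guard access to the console/output. While one process holds the lock, any other process that tries to acquire it has to wait, so only one process writes at a time. The lock does not control the order in which the processes run, so the lines can still appear in any order.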

In [5]:
from multiprocessing import Process, Lock

def f(l, i):
    # hold the lock while printing; 'with' releases it even if print() raises
    with l:
        print('hello world', i)

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9


## Using a pool of workers
Instead of launching subprocesses within a loop, you can use the `Pool` class to control a group of workers. The worker processes are started once, and each task then runs only when a worker is free. There are a number of different ways to launch jobs using a `Pool`, including `.map` and `.apply_async`, both of which are demonstrated below.

In [7]:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print("Using .map")
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        print("Using .imap_unordered")
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        print("Using .apply_async")
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        print("Using .apply_async")
        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        print("Using .apply_async in a list")
        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])
        
        print("Using .apply_async knowing it will timeout")
        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Using .map
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Using .imap_unordered
0
1
4
16
9
36
25
49
81
64
Using .apply_async
400
Using .apply_async
23338
Using .apply_async in a list
[23335, 23336, 23337, 23336]
Using .apply_async knowing it will timeout
We lacked patience and got a multiprocessing.TimeoutError
For the moment, the pool remains available for more work
Now the pool is closed and no longer available
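
The `.map` call above passes a single argument to each task. For functions that take several arguments, `Pool.starmap` unpacks each tuple of arguments. The sketch below is a minimal illustration: `power` and `run_starmap` are hypothetical names, and the pool size of 2 is an arbitrary choice.

```python
from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

def run_starmap(pairs, workers=2):
    """Apply power() to each (base, exponent) pair using a pool of workers."""
    with Pool(processes=workers) as pool:
        # Results come back in the same order as the input pairs.
        return pool.starmap(power, pairs)

if __name__ == '__main__':
    print(run_starmap([(2, 3), (3, 2), (10, 0)]))
```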


## Exercise

Use `multiprocessing.Pool` to run a simple function over 10 different input values.
1. First create your function using `def` and have it return a number based on the input value.
2. Within the main block (i.e. `if __name__ == "__main__":`), create a `Pool` of 5 workers.
3. Use `apply_async` to run your function on the pool of workers over a list of 10 input values.
4. Loop through the returned result objects and print each result.

Now add a `time.sleep()` to your function. Does it change the results? Are you querying processes that are still running? Can you make sure each process has finished before you print its result?