# Part 1: Multiprocessing

In this first mini-tutorial, we are going to have a look at Python's `multiprocessing` module. This is a built-in module within the core Python modules and does not need any further installation.

Multiprocessing is a useful approach to parallelism where you can divide a task into separate sub-tasks that require minimal communication between tasks during computation time. Multiprocessing avoids the Python GIL issue by creating separate operating-system level processes, each running an instance of a Python interpreter. If tasks require commumication between each OS-level process, the multiprocessing module has functionality to coordinate this. However, multiprocessing that requires significant communications between tasks is likely to incur a lot of additional overhead.

A common use of the `multiprocessing` module is to parallelise a task over a set of operating system processes for a CPU-bound problem. I/O bound problems can also be solved using `multiprocessing`, though there are other alternatives for this which are more appropriate. (We do not cover them in these tutorials, but you may wish to investigate `asyncio`, which is standard from Python 3.4 onwards.)

In this example, we are going to introduce a set of examples that explore the multiprocessing module by calculating pi using a Monte Carlo approach. It's a simple problem that parallelises easily, although it may not be the most efficient way of actually calculating pi in practice! 

## Estimating Pi with a parallel Monte Carlo method

An interesting way to calculate pi is to imagine throwing darts or arrows at a target with a circle printed on it. If we assume where we hit on the target is random (we are not veryu good archers or darts players) then the relationship between the number of arrows hitting inside the circle compared to outside the circle can be used to help us estimate pi.

The workload can be split evenly across a number of processes, each one running a separate Python instance, on a separate CPU core.

To get a good estimate of pi using this method, we need to through around 10,000 darts at our target, which will give us an estimate to the first three decimal places. There are of course much better methods for estimating pi, but this is a nice example of using the `multiprocessing` module.

With the Monte Carlo method, we can use the Pythagorean principle to test if our dart has landed inside the circle.

`sqrt(x^2 + y^2) <= 1^2`

https://en.wikipedia.org/wiki/Pythagorean_theorem

Since we are using the _unit circle_ (The circle segment selected by drawing a square around the circle from the centre - basically a quarter of the full circle) we can simplify this further by taking out the square root operation:

`x^2 + y^2 <= 1`

The code to calculate this is as follows:



In [1]:
import random

def estimate_number_points_in_quarter_circle(number_estimates):
    number_trials_in_quarter_circle = 0
    for step in range(int(number_estimates)):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        is_in_unit_circle = x * x + y * y <= 1.0
        number_trials_in_quarter_circle += is_in_unit_circle
    return number_trials_in_quarter_circle

This is the Python implementation of our pi estimator using the circle method.

To solve this problem with parallelism, running a simulation with 10,000 dart throws, we could apportion the work between the number of CPU cores we have available to us, and do the computation in parallel. So, on a four-core CPU system, we could do 2,500 dart throws on each CPU core. 

Now we need to use the `multiprocessing` module to apportion our work in the function above between separate parallel processes. We are going to use the `Pool` object for this to manage a set of worker tasks that can share the workload of our problem.

In [2]:
from multiprocessing import Pool

Firstly we import the Pool class from the module. `Pool` wraps the the `Process` class API representing an OS-level preocess into a convenient pool of worker tasks that share a chunk of work and return an aggregated result.

In [3]:
import time

number_samples_in_total = 10000
number_parallel_blocks = 16

Next we create a task pool with a given number of processes to create. 

In [4]:
pool = Pool(processes=number_parallel_blocks)

# Calculate how many samples to send to each worker task, based on the number of parallel block
number_samples_per_worker_task = number_samples_in_total / number_parallel_blocks

print("Making {} samples per worker task".format(number_samples_per_worker_task))

Making 625.0 samples per worker task


Next, we create a list containing the number of estimates divided by the number of workers.

In [5]:
number_trials_per_process = [number_samples_per_worker_task] * number_parallel_blocks
t1 = time.time()

The next bit is where we use multiprocessing to distribute the task evenly among the worker processes. We do this using the `pool.map` method, passing the function we want to parallelise: our dart throwing pi estimator, `estimate_number_points_in_quarter_circle`, and the argument it takes: `number_estimates`. Because we are distributing this task over `n` processes, we pass the value we calculated for: `number_of_trials_per_process`.

The value we get back from the `pool.map` function will contain the same number of results as `number_trials_per_process`.

In [6]:
number_in_unit_circles = pool.map(estimate_number_points_in_quarter_circle, number_trials_per_process)

Therefore, to obtain our final estimate of pi, we simply sum up this list of results from each process, multiply by four (to get the full circle from our quarter circle) and divide by the total number of samples.

In [7]:
pi_estimate = sum(number_in_unit_circles) * 4 / number_samples_in_total

# Let's look at the results and how long it took
print("Estimated pi {}".format(pi_estimate))
print("Time Delta: {}".format(time.time()-t1))

Estimated pi 3.1488
Time Delta: 0.02374100685119629


Hopefully, you have a reasonable looking value of pi!

Try changing the initial paramters for number of parallel blocks and number of samples. 

Do you get a speed up if you keep increasing the `number_parallel_blocks`? _Hint: It will be dependent on the machine you are running this on, and the number of CPU cores it has._

#### A final note on Pools

With a Pool, we can split up a chunk of work we know the size of beforehand and distribute it amon the available CPUs. This is useful where we know that the workload is static, i.e. does vary considerably among processes, and that the workload is ready to distribute evenly at the start of the task. 

A Pool would not be the best choice necessarily if we have workloads that arrive over time. For this scenario, `multiprocessing` provides the `Queue` type.

### Queues for distributing workloads

A Queue provides the ability for inter-process communication. It allows us to pass Python objects between processes, which is essential in any task that where data is required to be exchanged between separate steps of the computation, or where computations can be broken down in to separate, but inter-dependent stages. 

In the next example, we are going to use a function that checks whether a number is prime or not.

This first example simply runs the prime checker in serial, for a baseline comparison.

In [8]:
import math
import time

def check_prime(n):
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

primes = []
t1 = time.time()
# number_range = range(100000000, 100010000)  # A
# number_range = range(100000000, 100100000)  # B
number_range = range(100000000, 101000000)  # C
# number_range = range(1000000000, 1000100000)  # D
# number_range = range(100000000000, 100000100000)  # E

for possible_prime in number_range:
    if check_prime(possible_prime):
        primes.append(possible_prime)

print("Took:", time.time() - t1)
print(len(primes), primes[:10], primes[-10:])

Took: 25.61171579360962
54208 [100000007, 100000037, 100000039, 100000049, 100000073, 100000081, 100000123, 100000127, 100000193, 100000213] [100999889, 100999897, 100999901, 100999903, 100999919, 100999939, 100999949, 100999979, 100999981, 100999993]


We can vary the workload when we test this function by commenting or uncommenting the `number_range` lines above. The workload is variable though - most numbers are non-prime, some can be cheap to check for, others have multiple factors which must be checked. The sequence of primes is also not predictable, so we can't determine the expected cost given a range of input numbers to check. 

Next we are going to use a task `Pool` to see how it performs when distributing the workload of `check_prime`.

In [11]:
import math
import time
import multiprocessing
#import numpy as np
import itertools

def check_prime(n):
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

primes = []
NBR_PROCESSES = 4
pool = multiprocessing.Pool(processes=NBR_PROCESSES)

t1 = time.time()
# number_range = xrange(100000000, 100010000)  # A
# number_range = xrange(100000000, 100100000)  # B
number_range = range(100000000, 101000000)  # C
# number_range = xrange(1000000000, 1000100000)  # D
# number_range = xrange(100000000000, 100000100000)  # E

# are_primes = pool.map(check_prime, number_range)  # original
# primes = np.array(number_range)[np.array(are_primes)]  # original
#
# note using pool.map is fastest, but uses ram
# using pool.imap is slower but uses less ram
# pool.imap_unordered is even slower
are_primes = pool.map(check_prime, number_range)
primes = [p for p in itertools.compress(number_range, are_primes)]

print("Took:", time.time() - t1)
print(len(primes), primes[:10], primes[-10:])

Took: 13.538913488388062
54208 [100000007, 100000037, 100000039, 100000049, 100000073, 100000081, 100000123, 100000127, 100000193, 100000213] [100999889, 100999897, 100999901, 100999903, 100999919, 100999939, 100999949, 100999979, 100999981, 100999993]


Using a `Pool` is faster than the serial code, but what if we could do better? Let's see how this would work using a queue:



In [None]:
import math
import time
import multiprocessing
from multiprocessing import Pool

FLAG_ALL_DONE = b"WORK_FINISHED"
FLAG_WORKER_FINISHED_PROCESSING = b"WORKER_FINISHED_PROCESSING"

def check_prime(possible_primes_queue, definite_primes_queue):
    while True:
        n = possible_primes_queue.get()
        if n == FLAG_ALL_DONE:
            # flag that our results have all been pushed to the results queue
            definite_primes_queue.put(FLAG_WORKER_FINISHED_PROCESSING)
            break
        else:
            if n % 2 == 0:
                continue
            for i in range(3, int(math.sqrt(n)) + 1, 2):
                if n % i == 0:
                    break
            else:
                definite_primes_queue.put(n)

primes = []

manager = multiprocessing.Manager()
# We could limit the input queue size with e.g. `maxsize=3`
possible_primes_queue = manager.Queue()
definite_primes_queue = manager.Queue()

NBR_PROCESSES = 4
pool = Pool(processes=NBR_PROCESSES)
processes = []
for _ in range(NBR_PROCESSES):
    p = multiprocessing.Process(
        target=check_prime,
        args=(
            possible_primes_queue,
            definite_primes_queue))
    processes.append(p)
    p.start()

t1 = time.time()
# number_range = xrange(100000000, 100010000)  # A
# number_range = xrange(100000000, 100100000)  # B
number_range = range(100000000, 101000000)  # C
# number_range = xrange(1000000000, 1000100000)  # D
# number_range = xrange(100000000000, 100000100000)  # E

for possible_prime in number_range:
    possible_primes_queue.put(possible_prime)
print("ALL JOBS ADDED TO THE QUEUE")

# add poison pills to stop the remote workers
for n in range(NBR_PROCESSES):
    possible_primes_queue.put(FLAG_ALL_DONE)

print("NOW WAITING FOR RESULTS...")
processors_indicating_they_have_finished = 0
while True:
    # block whilst waiting for results
    new_result = definite_primes_queue.get()
    if new_result == FLAG_WORKER_FINISHED_PROCESSING:
        print("WORKER {} HAS JUST FINISHED".format(processors_indicating_they_have_finished))
        processors_indicating_they_have_finished += 1
        if processors_indicating_they_have_finished == NBR_PROCESSES:
            break
    else:
        primes.append(new_result)
assert processors_indicating_they_have_finished == NBR_PROCESSES

print("Took:", time.time() - t1)
print(len(primes), primes[:10], primes[-10:])

### Summary and strategies for effective use of multiprocessing

Multiprocessing is a very useful tool built in to the core Python library. It requires no extra installation of python packages. Multiprocessing provides _process based_ parallelism - it works by creating forks of the Python interpreter process at the operating system level, and having these worker tasks carry out work in parallel. 

The parallelism is therefore managed by how the operating system manages processes and whether it can distribute each process to multiple CPU cores available on the system.

There is a relatively high overhead to communication between worker processes because they are managed through operating system calls.

Multiprocessing is well suited to a class of problems known as _embarassingly parallel_ - i.e. tasks that do not rely on each other completing in a particular order or that have inter-task dependencies. For example, running a numerical model with multiple parameter input files for a sensitivity analysis - assuming each model run does not depend on the results from other model runs. 

#### Strategies for efficient multiprocesssing

 -  Split tasks into independent units of work
 - If worker tasks may take variable amounts of time (for example, dealing with different sized input files), consider randomising the sequence of work.
 - As a rule of thumb, align the number of jobs/processes with the number of CPUs available on the system.
 
#### Taking it further

The latest documentation for the multiprocessing module is here: https://docs.python.org/3.7/library/multiprocessing.html

The Software Carpentry organisation have a more in depth tutorial on multiprocessing here: http://swcarpentry.github.io/python-intermediate-mosquitoes/04-multiprocessing.html

