# Parallel Programming in Python

Before we get into it, let's first discuss two common parallel programming paradigms: 

* shared memory parallel programming (sometimes referred to as threading)
* distributed memory parallel programming

The first of these is a paradigm in which a number of threads execute in parallel and all of them have access to the same memory. OpenMP, for example, is a shared memory programming model. Shared memory is both a blessing and a curse. Sharing memory adds very little communication overhead, but the programmer must be very careful when modifying shared data: depending on the order in which the threads read and write memory, the program may yield different results from run to run (this is called a race condition).
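As a minimal sketch of the problem described above, the following increments a shared counter from several threads, once protected by a lock and once without it. The function name `run_counter` and its parameters are made up for this illustration. Whether the unprotected version actually loses updates depends on the interpreter version and on timing, so it may well print the expected value on your machine.

```python
import threading


def run_counter(n_threads=4, n_increments=50000, use_lock=True):
    """Increment a shared counter from several threads and return its final value."""
    state = {"count": 0}  # memory shared by every thread
    lock = threading.Lock()

    def worker():
        for _ in range(n_increments):
            if use_lock:
                # Only one thread at a time may perform the read-modify-write
                with lock:
                    state["count"] += 1
            else:
                # Unsynchronized read-modify-write: updates can be lost
                state["count"] += 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["count"]


expected = 4 * 50000
print("expected:    ", expected)
print("with lock:   ", run_counter(use_lock=True))
print("without lock:", run_counter(use_lock=False))
```

The lock makes the result deterministic at the cost of serializing the increments, which is the usual trade-off in shared memory programming.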

Distributed memory parallel programming avoids this issue by giving every process its own private memory, so that no process can directly modify another's data. In practice, this means that when we run a distributed memory program, each process runs its own separate copy of the code, and the programmer has to state explicitly how data is divided among, and communicated between, the processes.

We will address both of these paradigms in this lesson, focusing more on distributed memory programming for reasons that will become clear in a moment. 



## Threading

In [1]:
from __future__ import print_function, division
import urllib3
import shutil
import re
import os

urllib3.disable_warnings()

 
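def download_xkcd(start, stop):
    """Download comics start..stop-1 from xkcd, one after another."""
    http = urllib3.PoolManager()

    path = "xkcd_"
    for i in range(start, stop):
        # Fetch the comic's page and look for the image path in the HTML
        url = "http://www.xkcd.com/" + str(i) + "/"
        rd = http.request('GET', url)
        data = rd.data
        res = re.search(br"/comics/[a-z0-9_()]*\.(jpg|png)", data)
        if res:
            imgurl = "http://imgs.xkcd.com" + res.group()
            # Stream the image straight to disk instead of loading it into memory
            with http.request('GET', imgurl, preload_content=False) as r, \
                    open(path + str(i) + imgurl[-4:], 'wb') as out_file:
                shutil.copyfileobj(r, out_file)
        elif re.search(b"Not Found", data) and i != 404:
            # Comic 404 intentionally does not exist, so it is not reported
            print('Failed to download comic {}'.format(i))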

In [13]:
%%timeit
download_xkcd(10,20)

195 ms ± 6.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [4]:
from queue import Queue
from threading import Thread
import threading

In [34]:
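from queue import Empty

def download_worker(q):
    """Pull comic numbers off the queue and download them until it is empty."""
    http = urllib3.PoolManager()

    path = "xkcd_"

    while True:
        try:
            # get_nowait avoids the race between checking q.empty() and calling q.get()
            i = q.get_nowait()
        except Empty:
            return
        try:
            url = "http://www.xkcd.com/" + str(i) + "/"
            rd = http.request('GET', url)
            # The body is preloaded by default, so it is available as rd.data
            data = rd.data
            res = re.search(br'/comics/[a-z0-9_()]*\.(jpg|png)', data)
            if res:
                imgurl = "http://imgs.xkcd.com" + res.group()
                with http.request('GET', imgurl, preload_content=False) as r, \
                        open(path + str(i) + imgurl[-4:], 'wb') as out_file:
                    shutil.copyfileobj(r, out_file)
            elif re.search(b"Not Found", data) and i != 404:
                print('Failed to download comic {}'.format(i))
        finally:
            # Mark the item done only after the download attempt has finished
            q.task_done()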

In [35]:
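class DownloadThread(threading.Thread):
    """Thread that runs function(*args) once and tracks whether it is still running."""
    def __init__(self, function, args):
        self.running = False
        self.function = function
        self.args = args
        super(DownloadThread, self).__init__()

    def start(self):
        self.running = True
        super(DownloadThread, self).start()

    def run(self):
        # Run the worker once; it returns when the queue is empty,
        # so there is no need to keep calling it in a loop
        try:
            self.function(*self.args)
        finally:
            self.running = False

    def stop(self):
        self.running = False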

In [36]:
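def download_xkcd_threaded(nthreads, start, stop):
    threads = []

    q = Queue(maxsize=0)

    # Fill the queue with the comic numbers to download
    for i in range(start, stop):
        q.put(i)

    for i in range(nthreads):
        t = DownloadThread(download_worker, (q,))
        t.start()
        threads.append(t)

    # Block until every queued item has been marked done
    q.join()

    # Tell each thread to finish and wait for it to exit;
    # deleting the list entries alone does not stop a running thread
    for t in threads:
        t.stop()
        t.join()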

In [38]:
%%timeit
download_xkcd_threaded(5, 10, 20)

176 ms ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [39]:
%%timeit
download_xkcd_threaded(10, 10, 20)

345 ms ± 53.8 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [40]:
%%timeit
download_xkcd_threaded(20, 10, 20)

774 ms ± 189 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Exercise 1
Why does increasing the number of threads further not yield faster results?

## The GIL
Unfortunately, we can't use threading to do all of our parallel programming in Python. The reason for this is something called the Global Interpreter Lock (GIL). The GIL is the bane of threading in Python. It is a mutex (mutual exclusion object) that protects access to Python objects, preventing multiple threads from executing Python bytecode at once. Note that the GIL is a feature of CPython (the reference implementation of the Python language, written in C), not of the language itself. If many threads are running, the GIL is passed between them. Sometimes a thread gives up the GIL voluntarily, for example while it waits on I/O, but if one thread holds the GIL for a long time (by default 100 bytecode instructions in Python 2, or 5 ms in Python 3.2 and later) then it will have the GIL forcibly revoked.
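To make the effect of the GIL concrete, here is a rough sketch that times a pure-Python, CPU-bound loop run twice in a row and then once in each of two threads. The helper names (`count_down`, `time_serial`, `time_threaded`) and the workload size are arbitrary choices for this illustration. On a standard CPython build you would typically expect the threaded timing to be no better than the serial one, but the exact numbers depend on your machine and interpreter.

```python
import sys
import threading
import time


def count_down(n):
    """Pure-Python, CPU-bound loop: it needs the GIL for every iteration."""
    while n > 0:
        n -= 1
    return n


def time_serial(n, repeats):
    """Run count_down(n) `repeats` times back to back; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(repeats):
        count_down(n)
    return time.perf_counter() - start


def time_threaded(n, n_threads):
    """Run count_down(n) once in each of n_threads threads; return elapsed seconds."""
    threads = [threading.Thread(target=count_down, args=(n,))
               for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


# In Python 3.2+, a running thread is asked to release the GIL after this many seconds
print("switch interval (s):", sys.getswitchinterval())

n = 2000000  # arbitrary workload size, chosen only for illustration
print("serial,   2 runs:    {:.3f} s".format(time_serial(n, 2)))
print("threaded, 2 threads: {:.3f} s".format(time_threaded(n, 2)))
```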

**Note that the GIL does not prevent all race conditions**


Let's take a quick look at why:


In [20]:
def threaded_foo():

    def foo():
        global n
        n += 1
                
    threads = []
    for i in range(100):
        t = threading.Thread(target=foo)
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    print(n)    

In [21]:
n=0
for i in range(100):
    threaded_foo()

100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
2200
2300
2400
2500
2600
2700
2800
2900
3000
3100
3200
3300
3400
3500
3600
3700
3800
3900
4000
4100
4200
4300
4400
4500
4600
4700
4800
4900
5000
5100
5200
5300
5400
5500
5600
5700
5800
5900
6000
6100
6200
6300
6400
6500
6600
6700
6800
6900
7000
7100
7200
7300
7400
7500
7600
7700
7800
7900
8000
8100
8200
8300
8400
8500
8600
8700
8800
8900
9000
9100
9200
9300
9400
9500
9600
9700
9800
9900
10000


## Exercise 2
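In the run above, n happened to increase by exactly 100 on every call, but that is not guaranteed. Why might n fail to increment by 100 every time?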

Practically, what the GIL means is that only latency-bound programs, that is, programs that spend most of their time waiting for other things to happen, such as reading or writing a file, will experience speedups from threading.
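The following sketch illustrates the latency-bound case using `concurrent.futures.ThreadPoolExecutor` from the standard library. The function `fake_download` is a made-up stand-in that simply sleeps, on the assumption that sleeping behaves like waiting on a network or disk (both release the GIL). The delay and the number of items are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(i, delay=0.05):
    """Stand-in for a network request; time.sleep releases the GIL, as blocking I/O does."""
    time.sleep(delay)
    return i


def run_serial(items):
    """Process the items one after another."""
    return [fake_download(i) for i in items]


def run_threaded(items, n_threads):
    """Process the items with a pool of n_threads threads, keeping the input order."""
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        return list(pool.map(fake_download, items))


items = list(range(8))

start = time.perf_counter()
serial_results = run_serial(items)
t_serial = time.perf_counter() - start

start = time.perf_counter()
threaded_results = run_threaded(items, n_threads=8)
t_threaded = time.perf_counter() - start

print("serial:   {:.2f} s".format(t_serial))
print("threaded: {:.2f} s".format(t_threaded))
```

Because every thread spends nearly all of its time waiting, the waits overlap and the threaded version should finish in roughly the time of a single delay rather than the sum of all of them.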

The way to get around the GIL is to use multiple Python interpreters to parallelize code. One common implementation of such parallelism is the multiprocessing package.

## Multiprocessing

The most basic way to use the multiprocessing module is through the ``Process`` class. The syntax is very similar to that of ``threading.Thread``, except that each process is given its own Python interpreter, so each one has its own GIL and does not need to pass it around.

In [22]:
import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()

# Define an example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Get process results from the output queue
# (drain the queue before joining, so a child still flushing its data cannot block the join)
results = [output.get() for p in processes]

# Wait for the processes to exit
for p in processes:
    p.join()

print(results)

['ZVby6', 'mgGYI', '1eUxg', 'dxdX4']


### The ``Pool`` class
A more convenient approach to simple parallel processing is using ``Pool``. Let's set up a simple Monte Carlo integration. We'll go over ``apply`` and ``apply_async``, but there are also ``map`` and ``map_async`` functions that are similar to the built-in Python ``map`` function.

In [23]:
import numpy as np


In [33]:
def counts_in_a_circle(n):
    np.random.seed()
    x = np.random.uniform(-1, 1, size=n)
    y = np.random.uniform(-1, 1, size=n)
    
    n_in_circle = np.sum(np.sqrt(x**2 + y**2)<1)
    
    return n_in_circle, n

In [34]:
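def monte_carlo_area_mp(n_proc, n_samples):

    pool = mp.Pool(processes=n_proc)
    # Note: pool.apply blocks until each call returns, so these calls run one after another
    results = np.array([pool.apply(counts_in_a_circle, args=(n_samples//n_proc,)) for x in range(n_proc)])
    # Shut the worker processes down once the results are in
    pool.close()
    pool.join()
    
    area = 4*np.sum(results[:,0])/np.sum(results[:,1])
    
    print('Monte carlo area, fractional error: {}, {}'.format(area, np.abs(area-np.pi)/np.pi))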

In [38]:
%%timeit
monte_carlo_area_mp(2, 100000000)

Monte carlo area, fractional error: 3.14186344, 8.619399141301981e-05
Monte carlo area, fractional error: 3.1415376, 1.7524101901078094e-05
Monte carlo area, fractional error: 3.14180432, 6.737551094184428e-05
Monte carlo area, fractional error: 3.14147244, 3.8265174084851955e-05
Monte carlo area, fractional error: 3.14145104, 4.50770056490629e-05
Monte carlo area, fractional error: 3.14133904, 8.072771290168311e-05
Monte carlo area, fractional error: 3.14142656, 5.286923166283999e-05
Monte carlo area, fractional error: 3.14111948, 0.00015061583151223393
7.71 s ± 1.18 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [40]:
%%timeit
monte_carlo_area_mp(4, 100000000)

Monte carlo area, fractional error: 3.14158552, 2.2706921551221597e-06
Monte carlo area, fractional error: 3.1414928, 3.178438480209791e-05
Monte carlo area, fractional error: 3.1418304, 7.567703277353008e-05
Monte carlo area, fractional error: 3.1415322, 1.924297528648465e-05
Monte carlo area, fractional error: 3.14151052, 2.6143933618850822e-05
Monte carlo area, fractional error: 3.14190312, 9.88245276967883e-05
Monte carlo area, fractional error: 3.1415612, 1.0011988587136655e-05
Monte carlo area, fractional error: 3.14154516, 1.511767916145237e-05
7.4 s ± 708 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [41]:
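def monte_carlo_area_async(n_proc, n_samples):

    pool = mp.Pool(processes=n_proc)
    # apply_async returns a result handle immediately, so all tasks can run concurrently
    results = [pool.apply_async(counts_in_a_circle, args=(n_samples//n_proc,)) for x in range(n_proc)]
    print('Function called')

    # get() blocks until the corresponding task has finished
    output  = np.array([p.get() for p in results])
    print('Results returned')
    # Shut the worker processes down once the results are in
    pool.close()
    pool.join()
    
    area = 4*np.sum(output[:,0])/np.sum(output[:,1])
    
    print('Monte carlo area, fractional error: {}, {}'.format(area, np.abs(area-np.pi)/np.pi))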

In [42]:
monte_carlo_area_async(8, 100000000)

Function called
Results returned
Monte carlo area, fractional error: 3.14151568, 2.450145460622952e-05
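The ``map`` function mentioned earlier can be used for the same calculation. The sketch below is one possible version, not a drop-in replacement for the cells above: the names `counts_in_circle`, `area_from_counts`, and `monte_carlo_area_map` are invented here, and it uses the standard library `random` module instead of NumPy purely to stay dependency-free (so it is much slower per sample). The sample count is kept small for that reason.

```python
import math
import multiprocessing as mp
import random


def counts_in_circle(n):
    """Draw n points in [-1, 1]^2 and count how many land inside the unit circle."""
    rng = random.Random()  # new generator seeded from the OS on every call
    hits = 0
    for _ in range(n):
        x = rng.uniform(-1, 1)
        y = rng.uniform(-1, 1)
        if x * x + y * y < 1:
            hits += 1
    return hits, n


def area_from_counts(results):
    """Combine (hits, draws) pairs into an estimate of the circle's area."""
    hits = sum(r[0] for r in results)
    draws = sum(r[1] for r in results)
    return 4 * hits / draws


def monte_carlo_area_map(n_proc, n_samples):
    """Split the samples evenly over n_proc workers and combine their counts."""
    chunks = [n_samples // n_proc] * n_proc
    # The with-block shuts the pool down once map has returned all results
    with mp.Pool(processes=n_proc) as pool:
        results = pool.map(counts_in_circle, chunks)
    return area_from_counts(results)


if __name__ == "__main__":
    area = monte_carlo_area_map(2, 200000)
    print("Monte Carlo area, fractional error: {}, {}".format(
        area, abs(area - math.pi) / math.pi))
```

Unlike ``apply``, which blocks on each call in turn, ``map`` hands all chunks to the pool at once and only then waits for the results, so the workers can run at the same time.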


## MPI
MPI (Message Passing Interface) is probably the most useful and widely used parallel programming paradigm for scientific computing. It is similar to multiprocessing, but it is much more explicit about communication, is more flexible, and can scale to many machines.

The idea behind MPI is very simple. You write your program as if each process is running it independently. Every process allocates its own memory and runs all computations independently. The programmer must explicitly spell out any exceptions to this. We will see how this works below using the same Monte Carlo program as above.

I will first demonstrate how to use the simplest functionality in MPI, sending and receiving buffers between individual tasks.

In [43]:
%%file monte_carlo_area_sendrcv.py

from __future__ import print_function, division
from mpi4py import MPI
import numpy as np
import sys

def monte_carlo_area_sendrcv(n_samples):
    
    comm = MPI.COMM_WORLD
    
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    print('I am rank {} of {}'.format(rank, size))
    
    n_samples_per_task = n_samples // size
    
    x = np.random.uniform(-1, 1, size=n_samples_per_task)
    y = np.random.uniform(-1, 1, size=n_samples_per_task)
    
    n_in_circle = np.sum(np.sqrt(x**2 + y**2)<1)

    n_in_circle_sum = np.array([0])
    draws_sum = n_samples_per_task * size
    
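    if rank==0:
        # Rank 0 starts from its own count and adds the count sent by every other rank
        n_in_circle_sum = n_in_circle
        
        for i in range(size-1):
            # With no arguments, recv accepts a message from any source rank
            temp = comm.recv()
            print(temp)
            n_in_circle_sum += temp[0]
            
    else:
        # Every other rank sends its count to rank 0
        comm.send([n_in_circle], 0)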

    area = 4 * n_in_circle_sum/draws_sum
    
    print('[rank {}]: Monte carlo area, fractional error: {}, {}'.format(rank, area, np.abs(area-np.pi)/np.pi))

if __name__=='__main__':
    
    n_samples = int(sys.argv[1])
    
    monte_carlo_area_sendrcv(n_samples)
    

Writing monte_carlo_area_sendrcv.py


The first few lines in this code are generic to almost any MPI code. If you have used MPI in C or some other language before, you'll notice that I don't have to call ``MPI_Init``. This is done automatically when ``MPI`` is imported from ``mpi4py``.

The examples that follow also call ``comm.Barrier()`` right after each rank announces itself. This forces all tasks to wait at that command until every one of them has reached that point in the program.

Another commonly used set of functions is scatter and gather. These are inverses in some sense, with scatter dividing up an array on the root process and distributing those pieces to the other processes. Gather then takes many arrays on different processes and combines them in the root process.
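Before the Monte Carlo version, here is a minimal sketch of scatter and gather acting on an actual list. It assumes `mpi4py` is installed. The helper `split_into_chunks`, the function `main`, and the choice of squaring each element are all made up for illustration. The import of `mpi4py` sits inside `main` only so that the chunking helper can be tried without MPI.

```python
import sys


def split_into_chunks(data, n_chunks):
    """Split a list into n_chunks contiguous pieces whose lengths differ by at most one."""
    base, extra = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        stop = start + base + (1 if i < extra else 0)
        chunks.append(data[start:stop])
        start = stop
    return chunks


def main(n_items):
    from mpi4py import MPI  # importing MPI also initializes it

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Only the root builds the full data set; scatter needs one chunk per rank
    if rank == 0:
        chunks = split_into_chunks(list(range(n_items)), size)
    else:
        chunks = None

    # Each rank receives its own chunk
    my_chunk = comm.scatter(chunks, root=0)
    my_squares = [v * v for v in my_chunk]

    # The root receives a list with one entry per rank; other ranks receive None
    gathered = comm.gather(my_squares, root=0)
    if rank == 0:
        flat = [v for part in gathered for v in part]
        print(flat)


# Runs only when an item count is given on the command line
if __name__ == "__main__" and len(sys.argv) == 2:
    main(int(sys.argv[1]))
```

Saved under a name of your choosing, a script like this would typically be launched with something like `mpirun -n 4 python your_script.py 10`, although the exact launcher command depends on your MPI installation. The Monte Carlo version below uses the same two calls.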

In [44]:
%%file monte_carlo_area_scattergather.py

from __future__ import print_function, division
from mpi4py import MPI
import numpy as np
import sys

def monte_carlo_area_scattergather(n_samples):
    
    comm = MPI.COMM_WORLD
    
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    print('I am rank {} of {}'.format(rank, size))
    
    comm.Barrier()
    
    if rank == 0:
        n_samples_per_task = [n_samples // size for i in range(size)]
    else:
        n_samples_per_task = None
    
    n_samples_per_task = comm.scatter(n_samples_per_task, root=0)
    
    x = np.random.uniform(-1, 1, size=n_samples_per_task)
    y = np.random.uniform(-1, 1, size=n_samples_per_task)
    
    n_in_circle = np.sum(np.sqrt(x**2 + y**2)<1)

    n_in_circle_sum = np.array([0])
    draws_sum = n_samples_per_task * size

    results = comm.gather(n_in_circle, root=0)
    
    if rank==0:
        n_in_circle_sum = np.sum(results)

    area = 4 * n_in_circle_sum/draws_sum
    
    print('[rank {}]: Monte carlo area, fractional error: {}, {}'.format(rank, area, np.abs(area-np.pi)/np.pi))

if __name__=='__main__':
    
    n_samples = int(sys.argv[1])
    
    monte_carlo_area_scattergather(n_samples)
    

Writing monte_carlo_area_scattergather.py


You'll notice that in the last example I gathered everything and then had to sum the elements manually. This is a very common operation, and as such there is a command which does it for you, ``reduce``.

In [45]:
%%file monte_carlo_area_reduce.py

from __future__ import print_function, division
from mpi4py import MPI
import numpy as np
import sys

def monte_carlo_area_reduce(n_samples):
    
    comm = MPI.COMM_WORLD
    
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    print('I am rank {} of {}'.format(rank, size))
    
    comm.Barrier()
    
    n_samples_per_task = n_samples // size
    
    x = np.random.uniform(-1, 1, size=n_samples_per_task)
    y = np.random.uniform(-1, 1, size=n_samples_per_task)
    
    n_in_circle = np.sum(np.sqrt(x**2 + y**2)<1)

    n_in_circle_sum = np.array([0])
    draws_sum = n_samples_per_task * size
    
    n_in_circle_sum = comm.reduce(n_in_circle, root=0, op=MPI.SUM)
    
    if n_in_circle_sum is None:
        n_in_circle_sum = 0
        
        
    area = 4 * n_in_circle_sum/draws_sum
    
    print('[rank {}]: Monte carlo area, fractional error: {}, {}'.format(rank, area, np.abs(area-np.pi)/np.pi))

if __name__=='__main__':
    
    n_samples = int(sys.argv[1])
    
    monte_carlo_area_reduce(n_samples)

Writing monte_carlo_area_reduce.py


## Exercise 3
Look into the ``bcast`` function. Write a new Monte Carlo function which makes use of it.

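There are many variations of these functions that will be useful in various applications. Some common variations are:

* ``allgather`` and ``allreduce``, which deliver the combined result to every process rather than only to the root
* ``isend``, ``irecv``, and other ``i``-prefixed functions, which are non-blocking versions that return immediately
* ``Gather``, ``Reduce``, and other capitalized functions, which communicate buffer-like objects such as NumPy arrays directly rather than pickling general Python objects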
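As one example of these variations, the sketch below uses the lowercase ``allreduce`` so that every rank, not just the root, ends up with the total count and can compute the area. It assumes `mpi4py` is installed. The names `count_hits` and `main` are invented for this illustration, and the standard library `random` module stands in for NumPy to keep the example dependency-light.

```python
import math
import random
import sys


def count_hits(n, rng):
    """Draw n points in [-1, 1]^2 with rng and count those inside the unit circle."""
    hits = 0
    for _ in range(n):
        x = rng.uniform(-1, 1)
        y = rng.uniform(-1, 1)
        if x * x + y * y < 1:
            hits += 1
    return hits


def main(n_samples):
    from mpi4py import MPI  # importing MPI also initializes it

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    n_local = n_samples // size
    # random.Random() seeds itself from the OS, so each rank draws different points
    hits = count_hits(n_local, random.Random())

    # Every rank receives the sum over all ranks
    total_hits = comm.allreduce(hits, op=MPI.SUM)

    area = 4 * total_hits / (n_local * size)
    print('[rank {}]: Monte Carlo area, fractional error: {}, {}'.format(
        rank, area, abs(area - math.pi) / math.pi))


# Runs only when a sample count is given on the command line
if __name__ == "__main__" and len(sys.argv) == 2:
    main(int(sys.argv[1]))
```

With ``allreduce`` there is no need for the special handling of non-root ranks seen in the ``reduce`` example, since no rank is left holding ``None``.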