# Parallelisation

Sometimes projects you work on will be too large for even a highly optimised code to run on your desktop/laptop.  In this case the only way to get performance benefits is to write the code to take advantage of multiple CPUs. To do this can be tricky and is something you should consider in the prototyping stage as often serial code can prove impossible to paralleise efficiently without wholesale changes.

Parallelisation is becoming more important as in since 2005 serial performance of CPU's has not improved but the parallel performance has increased by 32x per core. Also machines are getting more CPU's as standard.

![](Plots/CPUClock.png)

The parallelisation model you settle on for your code should also take into account of the likely computer architecture you want to run on and you will need to consider tradeoffs between memory constrants and communication constraints.  In this section we will have a look at some of the issues we face when we try to think parallel

## Parallelisation levels

There are several levels in which you can paralleise your code

0. Multiple serial jobs. The first option for any code as it has perfect scaling and almost no overhead
1. CPU level parallelisim like SIMD (single instruction multiple data) or Vectorisation. Works on single CPU, mostly done by compilers
2. Multicore or Thread based parallelism. Works on multiple CPU's on the same machine
3. Distributed Computing or Process based paralellism.  Uses multiple machines/nodes

The 0th order approach is to just run multiple instances of your serial code. This works best when scanning parameter space or processing multiple data files.  You just break your inputs into blocks are run many simultaneous jobs each processing their own block.  If you can do this, do it.  It is simple, scales perfectly and is trivial to implement.  You will never write parallel code that out-performs this solution.

The 1st, vectorisation, is hard to control in python. You mainly access it by using optimised libraries which are written in a compiled code like C where there are lots of things you can do, this is why you should try to use numpy and scipy for calculations.  An example of vectorisation is when you apply an operation to a array.  The array can then be read into memory in blocks, which match the size of the CPU register, and operated on simultaneously with the same instruction.  If the register can hold 8 floating point numbers then you get a speed up of a factor 8 (in theory).

The 2nd, thread based parallelism, can be done in python with the `threading` or `multiprocessing` packages.  This creates multiple threads which share the same data and can all operate on it.  Unfortunatly this is usually pointless due to the Global Interpreter Lock (GIL).  The GIL blocks more than one thread from accessing the interpreter at a time so only one thread can do anything at one time.  As a result threaded code in python seldom runs any faster (there are some exceptions).  This is a general problem with python in that as it is designed to be interactive it is inherently opposed to parallelism.  Numpy does (sometimes) release the GIL so you can get a benefit with multi-threads, otherwise see https://scipy-cookbook.readthedocs.io/items/ParallelProgramming.html.  We will revisit this when we come to Cython which supports `OpenMP` which is the industry standard for thread parallelisim and here we can do it much more easily.  You can however utilise packages which support threading as they are compiled in C so avoid the GIL.

The third, process based parallelisim, can be done in python as it creates multiple interpreters so the GIL is not a problem.  In this case we make multiple seperate instances of your code which run independently and can communicate with each other.  This is the case we will look at here with the package `mpi4py` which wraps the industry standard Message Passing Interface (MPI) used in high performance computing.

## MPI in python

First we should note that parallel computing is hard.  You need to think carefully about what you are doing as parallel programmes can be difficult to debug as most debuggers can't track multiple processors.  Let's begin with a classic analogy to get us used to the idea of parallel computing, painting a heptagonal wall with a lovely rainbow pattern.

You have:
1. 7 painters who are ready to paint
2. 7 pots of paint, one in each colour of the rainbow
3. 7 walls that need to be painted with 7 stripes

What is the best way to co-ordinate this?  For the analogy to work we have to assume the painters are fixed as they are the CPU's and in high performance computing "take the back off the computer and move the chips around" is seldom the best approach.  It is fairly obvious the we should get each painter to paint a stripe in one colour then pass the paint to the right.  When they recieve the paint from the left they can then paint the next stripe.  If the do this 6 times the wall is finished.

Now we can translate this to MPI speak.
1. Each painter is a `rank` which controls one process
2. Each pot of paint represents a data process
3. Each wall represents a block of data.
4. Passing the paint is a `point to point message` either a `send` or a `recieve`

The main point is that it would be ridiculoues to keep the paint and to try to pass the walls instead.  Similarly the goal of parallelisation is to complete the process with the smallest amount of communication possible.  This is because communication is usually expensive, somewhere between reading from memory and reading from disk (the one it's closer to is very system dependent). As such 'distributed' usually applies to the largest data objects.

Simple models of using MPI in code would look like:

1. Take a list of tasks then break it into sections which are distributed to each rank (process).  Each process completes its tasks independently then sends the results back to rank 0 which collates the answers.

2. Evolve a large simulation which is distributed across multiple ranks.  Before each time step each rank sends the state of it's cells to its neighbours.  Now the time step can be carried out on each section and the new state can be communicated and we keep repeating this sequence.

3. Rank 0 has a long list of seperate tasks that need completion.  It distributres an initial task each of the other ranks. Then when they return the result to rank 0 it passes them the next task in the list. This is a master-slave system which is useful for large numbers of ranks as no processes wait for messages.  It's unsuitable for small numbers as obviously rank 0 does nothing other than coordinate so is wasted.

### Basic commands

Let's try some simple examples to see what MPI commands look like:

**Example 1:**
Here we just sent a message to each of the other ranks to say hello using `send` and `recv`

In [None]:
# Contents of Code/mpitest1.py (example x will be in file mpitest'x'.py)
# Run this with: mpiexec -n 8 python mpitest1.py
from mpi4py import MPI

# Set up communication stuff
comm = MPI.COMM_WORLD

# Find out which rank I am
rank = comm.Get_rank()

# Find out how many ranks there are
size = comm.Get_size()

print( "[{}] Starting".format(rank) )

# Send a message to all other ranks
for i in range(1,size):
#   Decide who I will send the message to
#   And who my message will come from
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
#   Send the message, tag is not required here but is usefull later
    comm.send("Hello from {}".format(rank), dest=rank_send, tag=11)
#   collect my message
    message = comm.recv(source=rank_recv, tag=11)
#   Print what happened
    print("[{}] sent:{} recv:{} Message: {}".format(rank,rank_send,rank_recv,message))

The tag helps you keep track when you recieve multiple messages eg: I could send 5 messages from 0 to 1 and tag would tell me which was which.

**Example 2:**
Send an array or buffer type object to the other ranks.  Note here we must use `Send` and `Recv` not `send` and `recv`.  This capitilisation difference exists for all commands.  From here on we will only demonstrate one

In [2]:
data_s = np.random.randint(1,9,(4))
data_r = np.zeros((4), dtype='int')

for i in range(1,size):
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
    
    comm.Send(data_s, dest=rank_send, tag=11)
    comm.Recv(data_r, source=rank_recv, tag=11)
    
    print("[{}] sent:{} recv:{} Array: ".format(rank,rank_send,rank_recv),data_r)

array([8, 6, 7, 3])

**Example 3:**
This type of operation where you are passing information around the ranks is best done with `sendrecv` which combines the two (we will discuss why later)

In [None]:
data_s = np.random.randint(1,9,(4))
data_r = np.zeros((4), dtype='int')

for i in range(1,size):
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
    
    comm.Sendrecv(data_s, dest=rank_send, recvbuf=data_r, source=rank_recv)
    
    print("[{}] sent:{} recv:{} Array: ".format(rank,rank_send,rank_recv),data_r)

**Example 4:**
Send data from one rank to all others using a broadcast `bcast` (`Bcast` works the same here except `None` becomes an empty array)

In [None]:
if rank==0:
    data = "0 is the best rank"
else:
    data = None
    
data = comm.bcast(data, root=0)

print("[{}] I just heard that {}".format(rank,data) )

**Example 5:**
More usefully we can break a data object up and send a part to each rank with `Scatter` or put it back together with `Gather`.  There is also `Allgather` for a `Gather` +`Bcast`

In [None]:
data_r = np.empty(5,dtype='int')

if rank==0:
    data_s = np.arange(5*size)
else:
    data_s = None

comm.Scatter(data_s,data_r,root=0)

print("[{}] My data is ".format(rank),data_r)

if rank==3:
    data_s = np.empty(5*size, dtype='int')

comm.Gather(data_r,data_s,root=3)

print("[{}] My data now is ".format(rank),data_s)

There is also `alltoall` which combines both

**Example 6:**
The most useful (in my opinion anyway) is the global reduce which takes buffers form each rank then combines them with an operation like: `MIN`, `MAX`, `SUM`, `PROD`,....

In [None]:
data_s = np.random.randint(1,9,(5))
data_r = np.empty(5,dtype='int')

print( "[{}] Starting with data: ".format(rank),data_s )

comm.Reduce(data_s,data_r,op=MPI.SUM,root=0)

if rank==0:
    print("summed data: ",data_r)

We also have `Allreduce` which is a `Reduce` combined with a `Bcast`.  One other usefull command is `Barrier` which has no arguments and simply holds all processes until all reach it so forces syncronisation.  Don't be tempted to use this unless you have to as it just slows down your code.

All the above commands are blocking commands in that the process doesn't move on until they are complete.  This means that we always have to wait until comunications have completed before we can continue with our code.  Communication is usually slow so these are also non-blocking versions of `send` and `recv` we can use.  This allows us to send information then continue calculation while we wait for it to arrive.  The non-blocking versions are called `Isend` and `Irecv` and work like this:

**Example 7:**
Use non-blocking commands so we can do stuff while we wait

In [None]:
data_s = np.random.randint(1,9,(200))
data_r = np.zeros((200), dtype='int')


for i in range(1,size):
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
    
#   Set both send and recieve, order doesn't matter as they don't block
    request = comm.Irecv(data_r, source=rank_recv, tag=11)
    comm.Isend(data_s, dest=rank_send, tag=11)
    
#   Count while I wait for message to arrive
    i=0
    while not request.Get_status():
        i+=1
        
print("[{}] while waiting I counted to ".format(rank),i)

### Deadlocks

One key thing to watch our for when using MPI is deadlocks.  This is when you have one rank stuck waiting for a message that another rank can't send.  This usually happens when you get you send\recv out of order which is easier to do that you think as you never know the order the ranks will meet the code.  It is possible you already met this as some of the examples are not safe.  The first and second example can both deadlock if they happen to be executed at almost exactly the same time.  

What happens if all ranks enter the `send` at the same time? As the sends are blocking sends they won't complete until the data is recieved by `recv`. But as all ranks are held in the `send` command, none make it to the `recv` and the code stalls forever.  Here this is unlikely as the message is short and the communication is fast and all we need is for the first rank to exit `send` before the last starts to avoid a deadlock.  This is the danger with parallel programming.  Small tests like this can be fine but if I then ported the code to a large machine with slow communication and I was sending large amounts of data this may happen most of the time.  This is why we have `sendrecv` which both sends and waits for data so can't deadlock. This solution forces the code to syncronise which can slow the code down.  The alternative is to use non-blocking communication which is more efficent but you have to check the messages complete before you try to use the data sent.  Collectives can also block is not all ranks can get to them.  For example this will stall forever:

In [None]:
data_r = np.empty(5,dtype='int')

if rank==0:
    data_s = np.arange(5*size)
else:
    data_s = None

if rank==0:
    comm.Scatter(data_s,data_r,root=0)

The best idea it to try to use collectives wherever possible then non-blocking then blocking

You should note that collectives can be either blocking or non-blocking depending on the implementation so don't rely on them to sync your ranks.  Also make sure all ranks see the same collectives in the same order as this may deadlock the code too (basicly don't use them in if statements like above)

## Parallel patterns

There are lots of different approaches to parallelising code.  The best one for your code is quite problem specific.  Here we will look as some basic "patterns" used to parallelise code.  First there are a few things to consider.

- How much of my code can be parallelised? If 30% of your code is serial then the maximum speedup you can achieve is $1/0.3 = 3.3$x.  This gives you a benchmark to work against and whether the effort will be worth it.

- How will my code scale?  If you plan to use you code for large problems but only use a small test data set you need to check how you approach will work on the full solution. Suppose you have two approaches the first is faster on the small problem but scales as $n^2$ (eg gaussian smoothing in 2D) the second is slower but scales as $n\ln(n)$ (eg gaussian smoothing via fft then weighting modes then ifft back).  Here the second would be best for the large problem

- Load balance.  How will you ensure that each CPU has the same amount of work to do?

Parallelisation also has costs

- Efficency (Sometimes we will chose algorithms that paralleise well but are slower for serial)
- Simplicity (Code get harder to read and maintain)
- Portability (parallel code is often writn with a particulare system in mind so it can be hard to migrate to new machines)


With this lets look at some simple paralleisation patterns (in order of complexity)

1. Task parallelisim
2. Domain decomposition
3. Divide and conquer
4. Recursive data
5. Pipelines

### Task parallelisim

Here we have a list of independent tasks that need to be completed and we just divide them up over the ranks.  The best example is splitting a loop like in the following example:

**Pattern 1**

In [None]:
# Contents of Code/mpipattern1.py (pattern x will be in file mpipattern'x'.py)
# Run this with: mpiexec -n 8 python mpipattern1.py

from mpi4py import MPI

def divide_loops(loops,size):
    loop_rank=loops//size
    auxloop = loops%size
    start_loop = rank*loop_rank
    end_loop = (rank+1)*loop_rank
    
    if(auxloop!=0):
        if (rank < auxloop):
            start_loop = start_loop + rank
            end_loop = end_loop + rank + 1
        else:
            start_loop = start_loop + auxloop
            end_loop = end_loop + auxloop

    return start_loop, end_loop

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# sum numbers from 1 to 1000

print( "[{}] Starting".format(rank) )

loops = 1000

s,e = divide_loops(loops,size)

x=0
for i in range(s,e):
    x += i+1

print( "[{}] Local sum {}".format(rank,x) )

y = comm.reduce(x,op=MPI.SUM,root=0)
if rank==0:
    print( "[{}] Total sum {}".format(rank,y) )

For multiple loops it is usually best to flatten them.  An example is looping over $i,j$ where $i<=j$.  Here splitting on $i$ would be bad as the ranks would get different amounts of work to do affecting load balance. Instead flatten them into a single loop. So to calculate:

In [28]:
imax = 10
x=0
for ii in range(imax):
    for jj in range(ii,imax):
        x += ii+jj
print(x)

495


We could do:

**Pattern 2**

In [None]:
from mpi4py import MPI

def divide_loops(loops,size):
    loop_rank=loops//size
    auxloop = loops%size
    start_loop = rank*loop_rank
    end_loop = (rank+1)*loop_rank
    
    if(auxloop!=0):
        if (rank < auxloop):
            start_loop = start_loop + rank
            end_loop = end_loop + rank + 1
        else:
            start_loop = start_loop + auxloop
            end_loop = end_loop + auxloop

    return start_loop, end_loop

def gen(s,e,imax):
    count = 1
    for ii in range(imax):
        for jj in range(ii,imax):
            if count>e:
                break
            if count>s:
                yield (ii,jj)
            count += 1


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# double sum numbers from 1 to 50

print( "[{}] Starting".format(rank) )


imax = 10
loops = imax*(imax+1)//2

s,e = divide_loops(loops,size)
G1 = gen(s,e,imax)

x=0
for item in G1:
    x += item[0]+item[1]

print( "[{}] Local sum {}".format(rank,x) )

y = comm.reduce(x,op=MPI.SUM,root=0)
if rank==0:
    print( "[{}] Total sum {}".format(rank,y) )

For very large jobs with extremly uneven workloads we could instead opt of a master-slave set up where the jobs are distributed by a master process.   This can be quite tricky to programme but here is an simple (relative to what it could look like) example that calculates $n(n+1)/2$ for $n$ from 0 to 10:

**Pattern 3**

In [None]:
from mpi4py import MPI

# 0 is master
# rest are slaves

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank==0:
    tasklist = (i for i in range(10))
    
    finished = 0
    
    status = MPI.Status()
    
    for i in range(1,size):
        try:
            message = next(tasklist)
        except StopIteration:
            message = -1
          
        print("[{}] Sending task: {} to rank {}".format(rank,message,i))
        comm.isend(message, dest=i, tag=i)

    while finished<size-1:
        flag = comm.iprobe(status=status, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
        if flag==True:
            source = status.source
            test = comm.irecv(source=source, tag=source)
            reply = test.wait()
            print("[{}] Recieving result: {} from rank {}".format(rank,reply,source))
            if reply==-1:
                finished +=1
                print("[{}] Recieved termination finished count: {}".format(rank,finished))
            else:
                print("[{}] Done with result {}".format(rank,reply))
                try:
                    message = next(tasklist)
                except StopIteration:
                    message = -1
                print("[{}] Sending task: {} to rank {}".format(rank,message,i))
                comm.isend(message, dest=source, tag=source)


else:
    while True:
        test = comm.irecv(source=0, tag=rank)
        task = test.wait()
        print("[{}] Recieving task: {} from rank {}".format(rank,task,0))
        
        if task==-1:
            comm.isend(-1, dest=0, tag=rank)
            print("[{}] Sending termination to rank {}".format(rank,0))
            break
        else:
            result = task*(task+1)//2
            comm.isend(result, dest=0, tag=rank)
            print("[{}] Sending result {} to rank {}".format(rank,result,0))

print("[{}] I'm done".format(rank))

### Domain Decomposition

Task decomposition distributes the tasks.  Here we will instead distribute the data. A simple example is multiplication of a $n\times m$ matrix $M$ by a $n\times n$ matrix $N$ where $m>>n$.  We split the matrix M up over processors then apply the dot product then gather. This would look like:

**Pattern 4**

In [None]:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

mm = 20
split = mm//size

matrix_M_r = np.empty((split,5),dtype='int')

matrix_N = np.arange(1,26).reshape((5,5))

if rank==0:
    matrix_M_s = np.array([range(i,i+5) for i in range(mm)])
else:
    matrix_M_s = None
    
if rank==0:
    print("[{}] My send data is ".format(rank),matrix_M_s)
    
comm.Scatter(matrix_M_s,matrix_M_r,root=0)

print("[{}] My recv data is: ".format(rank),matrix_M_r)

matrix_M_r = np.dot(matrix_M_r,matrix_N)

comm.Gather(matrix_M_r,matrix_M_s,root=0)
    
if rank==0:
    print("[{}] My gath data is \n".format(rank),matrix_M_s)

More usually ths is used for large simulations and so we need the different partitions to comunicate with each other.  Also we should note that there are many different ways to partition the data.  We can divide up the domain the simulation runs on, either into strips or squares (2D).  For nbody type codes where we are tracking particles this could lead to load imbalances when particles congregate in specific cells.  Here we could have an adaptive mesh or we could partition the particles instead.  Our example for domain decomposition with communication is left as an exercise using our "game of life" code.

### Divide and Conquer

This technique is used when dividing or recomdining tasks/data is non trivial so we instead scatter/gather iteritivley.  So our algorith looks like this:

![](Plots/DivideConquer.png)

A good example is sorting a very long list.  We can divide up the elements easily and sort then but merging the data back is difficult.  Instead here we merge pairs of lists back together and slowly recombine the full list

**Pattern 5**

In [None]:
from mpi4py import MPI
import numpy as np
import math

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

levels = int(math.log2(size))

def mergelist(list1,list2):
    size1 = len(list1)
    size2 = len(list2)
    size3 = size1+size2
    list3 = np.empty((size3),dtype='int')
    i=0
    j=0
    k=0
    while i<size1 and j<size2:
        if list1[i]<list2[j]:
            list3[k] = list1[i]
            i+=1
        else:
            list3[k] = list2[j]
            j+=1
        k+=1
    
    while i<size1:
        list3[k] = list1[i]
        i+=1
        k+=1
        
    while j<size2:
        list3[k] = list2[j]
        j+=1
        k+=1
    
    return list3
            
length = 200
split = length//size

list1 = np.empty((split),dtype='int')

if rank==0:
    listall = np.random.randint(0,100,(200))
else:
    listall = None
    
if rank==0:
    print("[{}] My send data is ".format(rank),listall)
    
comm.Scatter(listall,list1,root=0)

list1 = np.sort(list1)

level = 2
# now merge back
for i in range(1,levels+1):
    if rank%2**i == 2**(i-1):
        dest = rank - 2**(i-1)
        comm.Send(list1, dest=dest, tag=11)
    elif rank%2**i == 0:
        list2 = np.empty((split*2**(i-1)),dtype='int')
        src = rank + 2**(i-1)
        comm.Recv(list2, source=src, tag=11)
        list1 = mergelist(list1,list2)
        
if rank==0:
    print(list1)  

### Recursive data

Sometimes we will want to perform recursive operations on data which are inherently serial.  In these situations we can expose parallelisim by using recasting the problem to expose concurrency.  This can be difficult and often the algorithm is slower but with enough cores the code will still run faster.  A simple example is calcuating a cumulitive sum for a list.  Here we would normaly start at the begining and compute the sum as we progress along the list.  This is an $O(N)$ operation but is serial so can only use one core.  We can instead recast the problem by calculating recursive partial sums like so:

![](Plots/Recursive.png)

Now the algorithim is $O(N\ln(N))$ but each element is calculated separately. Now we have to be carefull as if we have fewer than $O(\ln(N))$ processors/threads then the code will be lower.  This couls still be useful if the list is very large so can't fit in memory. Here is an example of how this might look (This ends up being a divide and conquer algorithm like the previous example):

**Pattern 6**

In [None]:
from mpi4py import MPI
import numpy as np
import math

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

levels = int(math.log2(size))

length = 200
split = length//size

list1 = np.empty((split),dtype='int')

if rank==0:
    listall = np.random.randint(0,100,(length))
else:
    listall = None
    
if rank==0:
    print("[{}] My send data is ".format(rank),listall)
    
comm.Scatter(listall,list1,root=0)

x=0
for index, item in enumerate(list1):
     x += item
     list1[index] = x
     
for i in range(1,levels+1):
    if rank%2**i == 0:
        dest = rank + 2**(i-1)
        comm.send(list1[-1], dest=dest, tag=11)
    elif rank%2**i == 2**(i-1):
        src = rank - 2**(i-1)
        x = comm.recv(source=src, tag=11)
        list1 = list1+x

comm.Gather(list1,listall,root=0)

if rank==0:
    print(listall)

### Exercises

1. Task parallelisim: Take the code in triangle_int.py and parallelise it to split the calculation of $E_{ijk}$ across 4 processes.


2. Decomposing a domain: Parallelise the code in mpi_GOL.py to split the game of life across 4 ranks for a 10x40 game board.  Initial starting position is defined inside.  After 60 iterations the picture of a "duck" should be on the last tile

Documentation for mpi4py isn't great but some information can be found here:

https://info.gwdg.de/~ceulig/docs-dev/doku.php?id=en:services:application_services:high_performance_computing:mpi4py

https://mpi4py.readthedocs.io/en/stable/index.html