[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/JamesFergusson/Introduction-to-Research-Computing/blob/master/09_Parallelisation.ipynb)

# Parallelisation

Sometimes projects you work on will be too large or slow for even a highly optimised code to run on your desktop/laptop.  In this case the only way to get performance benefits is to write the code to take advantage of multiple CPUs. To do this can be tricky and is something you should consider in the prototyping stage as often serial code can prove impossible to parallelise efficiently without wholesale changes.

Parallelisation is becoming more important as since early 2000 serial performance of CPU's has not improved but the parallel performance has increased by 32x per core. This is the only way we are keeping up with Moores law and currently cutting edge CPUs are being built with 70+ cores which will filter down to normal computing soon so parallelisation is becoming more and more important.

![](Plots/CPUClock.png)

The parallelisation model you settle on for your code should also take into account of the likely computer architecture you want to run on and you will need to consider tradeoffs between memory constraints and communication constraints.  In this section we will have a look at some of the issues we face when we try to think parallel.

## Parallelisation levels

There are multiple levels in which you can parallelise your code

0. Multiple serial jobs. The first option for any code as it has perfect scaling and almost no overhead
1. CPU level parallelism like SIMD (single instruction multiple data) or Vectorisation. Works on single CPU, mostly done by compilers
2. Multicore or Thread based parallelism. Works on multiple CPU's on the same machine
3. Distributed Computing or Process based parallelism.  Uses multiple machines/nodes

The 0th order approach is to just run multiple instances of your serial code. This works best when scanning parameter space or processing multiple data files.  You just break your inputs into blocks and run many simultaneous jobs each processing their own block.  If you can do this, do it.  It is simple, scales perfectly and is trivial to implement.  You will never write parallel code that out-performs this solution.

The 1st, vectorisation, is hard to control in python. You mainly access it by using optimised libraries which are written in a compiled code like C where there are lots of things you can do, this is why you should try to use numpy and scipy for calculations.  An example of vectorisation is when you apply an operation to a array.  The array can then be read into memory in blocks, which match the size of the CPU register, and operated on simultaneously with the same instruction.  If the register can hold 8 floating point numbers then you get a speed up of a factor 8 (in theory).

The 2nd, thread based parallelism, can be done in python with the `threading` or `multiprocessing` packages.  This creates multiple threads which share the same data and can all operate on it.  Unfortunately this is usually pointless due to the Global Interpreter Lock (GIL).  The GIL blocks more than one thread from accessing the interpreter at a time so only one thread can do anything at one time.  As a result threaded code in python seldom runs any faster (there are some exceptions).  This is a general problem with python in that as it is designed to be interactive it is inherently opposed to parallelism.  Numpy does (sometimes) release the GIL so you can get a benefit with multi-threads, otherwise see https://scipy-cookbook.readthedocs.io/items/ParallelProgramming.html.  We will revisit this when we come to Cython which supports `OpenMP` which is the industry standard for thread parallelism and here we can do it much more easily.  You can however utilise packages which support threading as they are compiled in C so avoid the GIL.

The third, process based parallelism, can be done in python as it creates multiple interpreters so the GIL is not a problem.  In this case we make multiple separate instances of your code which run independently and can communicate with each other.  This is the case we will look at here with the package `mpi4py` which wraps the industry standard Message Passing Interface (MPI) used in high performance computing. Please note that the version of MPI that the mpi4py package was compiled against should be the same as the one used to run your MPI python code, otherwise it will fail. You can (probably) trust your cluster admin to sort that out, but this issue has a high chance to come up if you decide to run everything on your own machine, especially when installing things through a package manager. 

MPI is arguably the hardest so if you can master it then the rest should be easy(-er) to pick up.

## MPI in python

First we should note that parallel computing is hard.  You need to think carefully about what you are doing as parallel programmes can be difficult to debug as most debuggers can't track multiple processors.  Let's begin with a classic analogy to get us used to the idea of parallel computing, painting a heptagonal wall with a lovely rainbow pattern.

You have:
1. 7 painters who are ready to paint
2. 7 pots of paint, one in each colour of the rainbow
3. 7 walls that need to be painted with 7 stripes

What is the best way to co-ordinate this?  For the analogy to work we have to assume the painters are fixed as they are the CPU's and in high performance computing "take the back off the computer and move the chips around" is seldom the best approach.  It is fairly obvious that we should get each painter to paint a stripe in one colour then pass the paint to the right.  When they receive the paint from the left they can then paint the next stripe.  If they do this 6 times the wall is finished.

Now we can translate this to MPI speak.
1. Each painter is a `rank` which controls one process
2. Each pot of paint represents a data process
3. Each wall represents a block of data.
4. Passing the paint is a `point to point message` either a `send` or a `receive`

The main point is that it would be ridiculous to keep the paint and to try to pass the walls instead.  Similarly the goal of parallelisation is to complete the process with the smallest amount of communication possible.  This is because communication is usually expensive, somewhere between reading from memory and reading from disk (the one it's closer to is very system dependent). As such, 'distributed' usually applies to the largest data objects. 

Note that since your data objects are probably going to be rather abstract arrays, it helps immensely to plan your MPI code with pen and paper first, drawing some type of scheme for which cells you are going to pass around between processes before you actually start typing.  

### Basic commands


Let's try some simple examples to see what MPI commands look like:

**Example 1:**
Here we just sent a message to each of the other ranks to say hello using `send` and `recv`

In [None]:
# Contents of Code/mpitest1.py (example x will be in file mpitest'x'.py)
# Run this with: mpiexec -n 8 python mpitest1.py
from mpi4py import MPI

# Set up communication stuff
comm = MPI.COMM_WORLD

# Find out which rank I am
rank = comm.Get_rank()

# Find out how many ranks there are
size = comm.Get_size()

print( "[{}] Starting".format(rank) )

# Send a message to all other ranks
for i in range(1,size):
#   Decide who I will send the message to
#   And who my message will come from
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
#   Send the message, tag is not required here but is usefull later
    comm.send("Hello from {}".format(rank), dest=rank_send, tag=11)
#   collect my message
    message = comm.recv(source=rank_recv, tag=11)
#   Print what happened
    print("[{}] sent:{} recv:{} Message: {}".format(rank,rank_send,rank_recv,message))

The tag helps you keep track when you receive multiple messages eg: I could send 5 messages from 0 to 1 and tag would tell me which was which.

**Example 2:**
Send an array or buffer type object to the other ranks.  Note here we must use `Send` and `Recv` not `send` and `recv`.  This capitalisation difference exists for all commands.  From here on we will only demonstrate one

In [None]:
# Send an array (or buffer like object) to all other ranks
data_s = np.random.randint(1,9,(4))
data_r = np.zeros((4), dtype='int')

print( "[{}] Starting with array ".format(rank), data_s )

for i in range(1,size):
#   Decide who I will send the message to
#   And who my message will come from
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
#   Send the message, note capital "S"
    comm.Send(data_s, dest=rank_send, tag=11)
#   collect my message, note capital "R"
    comm.Recv(data_r, source=rank_recv, tag=11)
#   Print what happened
    print("[{}] sent:{} recv:{} Array: ".format(rank,rank_send,rank_recv),data_r)

**Example 3:**
This type of operation where you are passing information around the ranks is best done with `sendrecv` which combines the two (we will discuss why later)

In [None]:
# Send an array (or buffer like object) to all other ranks
data_s = np.random.randint(1,9,(4))
data_r = np.zeros((4), dtype='int')

print( "[{}] Starting with array ".format(rank), data_s )

for i in range(1,size):
    #   Decide who I will send the message to
    #   And who my message will come from
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
    #   Send and recieves messages simultaneously
    comm.Sendrecv(data_s, dest=rank_send, recvbuf=data_r, source=rank_recv)
    #   Print what happened
    print("[{}] sent:{} recv:{} Array: ".format(rank,rank_send,rank_recv),data_r)

**Example 4:**
Send data from one rank to all others using a broadcast `bcast` (`Bcast` works the same here except `None` becomes an empty array)

In [None]:
#initilise the data only on rank 0
if rank==0:
    data = "0 is the best rank"
else:
    data = None

#Send the data to all other ranks
data = comm.bcast(data, root=0)

print("[{}] I just heard that {}".format(rank,data) )

**Example 5:**
More usefully we can break a data object up and send a part to each rank with `Scatter` or put it back together with `Gather`.  There is also `Allgather` for a `Gather` +`Bcast`

In [None]:
# Create data
data_r = np.empty(5,dtype='int')

if rank==0:
    data_s = np.arange(5*size)
else:
    data_s = None

# Scattering
comm.Scatter(data_s,data_r,root=0)

print("[{}] My data is ".format(rank),data_r)

if rank==3:
    data_s = np.empty(5*size, dtype='int')
# Gather it back up
comm.Gather(data_r,data_s,root=3)

print("[{}] My data now is ".format(rank),data_s)

There is also `alltoall` which combines both

**Example 6:**
The most useful (in my opinion anyway) is the global reduce which takes buffers form each rank then combines them with an operation like: `MIN`, `MAX`, `SUM`, `PROD`,....

In [None]:
# Create data
data_s = np.random.randint(1,9,(5))
data_r = np.empty(5,dtype='int')

print( "[{}] Starting with data: ".format(rank),data_s )

comm.Reduce(data_s,data_r,op=MPI.SUM,root=0)

if rank==0:
    print("summed data: ",data_r)

We also have `Allreduce` which is a `Reduce` combined with a `Bcast`.  One other usefull command is `Barrier` which has no arguments and simply holds all processes until all reach it so forces syncronisation.  Don't be tempted to use this unless you have to as it just slows down your code.

All the above commands are blocking commands in that the process doesn't move on until they are complete (in theory).  This means that we always have to wait until comunications have completed before we can continue with our code.  Communication is usually slow so there are also non-blocking versions of `send` and `recv` we can use.  This allows us to send information then continue calculation while we wait for it to arrive.  The non-blocking versions are called `Isend` and `Irecv` and work like this:

**Example 7:**
Use non-blocking commands so we can do stuff while we wait

In [None]:
# Send an array (or buffer like object) to all other ranks
data_s = np.random.randint(1,9,(200))
data_r = np.zeros((200), dtype='int')


for i in range(1,size):
#   Decide who I will send the message to
#   And who my message will come from
    rank_send = (rank+i)%size
    rank_recv = (rank-i)%size
#   Send the message, note capital "S"

#   Create recieve
    request = comm.Irecv(data_r, source=rank_recv, tag=11)
#   Create request
    comm.Isend(data_s, dest=rank_send, tag=11)
    
#   Count while I wait for message to arrive
    i=0
    while not request.Get_status():
        i+=1

#   collect my message, note capital "R"
#   Print what happened
print("[{}] while waiting I counted to ".format(rank),i)

### Deadlocks

One key thing to watch our for when using MPI is deadlocks.  This is when you have one rank stuck waiting for a message that another rank can't send.  This usually happens when you get you send\recv out of order which is easier to do than you think as you never know the order the ranks will meet the code.  It is possible you already met this as some of the examples are not safe.  The first and second example can both deadlock if they happen to have large data that they can't buffer.  

What happens if all ranks enter the `send` at the same time? As the sends are blocking sends they won't complete until the data is received by `recv`. But as all ranks are held in the `send` command, none make it to the `recv` and the code stalls forever.  Here this is unlikely as the message is short so it is sent to buffer allowing the send to move onto the receive.  This is the danger with parallel programming.  Small tests like this can be fine but if I then ported the code to a large machine with slow communication and I was sending large amounts of data this may happen most of the time. Then when I move back to a small example to see what was going on it would work fine.  This is why we have `sendrecv` which both sends and waits for data so can't deadlock. This solution forces the code to synchronise which can slow the code down.  The alternative is to use non-blocking communication which is more efficient but you have to check the messages complete before you try to use the data sent.  Collectives can also block if not all ranks can get to them.  For example this code which to new people sort of looks sensible will stall forever:

In [None]:
data_r = np.empty(5,dtype='int')

if rank==0:
    data_s = np.arange(5*size)
else:
    data_s = None

if rank==0:
    comm.Scatter(data_s,data_r,root=0)

The best idea is to try to use collectives wherever possible, then non-blocking, then blocking.

You should note that collectives can be either blocking or non-blocking depending on the implementation so don't rely on them to sync your ranks.  Also make sure all ranks see the same collectives in the same order as this may deadlock the code too (basically don't use them in if statements like above)

## Parallel patterns

There are lots of different approaches to parallelising code.  The best one for your code is quite problem specific.  Here we will look as some basic "patterns" that are commonly used.  First there are a few things to consider.

- How much of my code can be parallelised? If 30% of your code is serial then the maximum speedup you can achieve is $1/0.3 = 3.3$x.  This gives you a benchmark to work against and whether the effort will be worth it. This is where profiling is important.

- How will my code scale?  If you plan to use you code for large problems but only use a small test data set you need to check how your approach will work on the full solution. Suppose you have two approaches, the first is faster on the small problem but scales as $O(N^2)$ (eg gaussian smoothing in 2D), the second is slower but scales as $O(N\ln(N))$ (eg gaussian smoothing via fft then weighting modes then ifft back).  Here the first would be faster for small problems but the second would be best for the large problem.

- Load balance.  How will you ensure that each CPU has the roughly same amount of work to do?

Parallelisation also has costs

- Efficiency (Sometimes we will chose algorithms that parallelise well but are slower for serial. Also communication can have significant costs)
- Simplicity (Code gets harder to read and maintain as we will see)
- Portability (parallel code is often written with a particular system in mind so it can be hard to migrate to new machines)


With this lets look at some simple parallelisation patterns (in order of complexity)

1. Task parallelisim
2. Domain decomposition
3. Divide and conquer
4. Recursive data
5. Pipelines

### Task parallelism

Here we have a list of independent tasks that need to be completed and we just divide them up over the ranks.  The best example is splitting a loop like in the following example:

**Pattern 1**

In [None]:
# Contents of Code/mpipattern1.py (pattern x will be in file mpipattern'x'.py)
# Run this with: mpiexec -n 8 python mpipattern1.py

"""
This routine sums the numbers from 1 to loops
using MPI to parallelise the calculation
"""
from mpi4py import MPI



def divide_loops(loops,size):
    """
    Divides 'loops' into 'size' groups
    
    Divide the number of loops into as close to equal
    size chunks as possible for use with parallelisation
    
    
    Parameters
    ----------
    loops: int
        the number of loops to divide up
    size: int
        the number of groups to break loops into
        
    Returns
    ----------
    start_loop: int
        The loop number to start from
    end_loop: int
        The loop number to stop at 
    """
    # floor division
    loop_rank=loops//size
    # remainder
    auxloop = loops%size
    #calculate start and end
    start_loop = rank*loop_rank
    end_loop = (rank+1)*loop_rank
    
    # allocate remainder across loops
    if(auxloop!=0):
        if (rank < auxloop):
            start_loop = start_loop + rank
            end_loop = end_loop + rank + 1
        else:
            start_loop = start_loop + auxloop
            end_loop = end_loop + auxloop
    # return start and end
    return start_loop, end_loop

# initilise MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print( "[{}] Starting".format(rank) )

# number to sum to
loops = 1000

# divide number across processors
s,e = divide_loops(loops,size)

# perform partial sums for each section
x=0
for i in range(s,e):
    x += i+1

print( "[{}] Local sum {}".format(rank,x) )

# combine results using reduce
y = comm.reduce(x,op=MPI.SUM,root=0)

# print output
if rank==0:
    print( "[{}] Total sum {}".format(rank,y) )

For multiple loops it is usually best to flatten them.  An example is looping over $i,j$ where $i<=j$.  Here splitting on $i$ would be bad as the ranks would get different amounts of work to do affecting load balance. Instead flatten them into a single loop. So to calculate:

In [None]:
imax = 10
x=0
for ii in range(imax):
    for jj in range(ii,imax):
        x += ii+jj
print(x)

We could do:

**Pattern 2**

In [None]:
"""
This routine sum the triangular double sum of numbers from 1 to imax
using MPI to parallelise the calculation

To ensure load balancing we flatten the loops
"""
from mpi4py import MPI

def divide_loops(loops,size):
    """
    Divides 'loops' into 'size' groups
    
    Divide the number of loops into as close to equal
    size chunks as possible for use with parallelisation
    
    
    Parameters
    ----------
    loops: int
        the number of loops to divide up
    size: int
        the number of groups to break loops into
        
    Returns
    ----------
    start_loop: int
        The loop number to start from
    end_loop: int
        The loop number to stop at 
    """
    # floor division
    loop_rank=loops//size
    # remainder
    auxloop = loops%size
    #calculate start and end
    start_loop = rank*loop_rank
    end_loop = (rank+1)*loop_rank
    
    # allocate remainder across loops
    if(auxloop!=0):
        if (rank < auxloop):
            start_loop = start_loop + rank
            end_loop = end_loop + rank + 1
        else:
            start_loop = start_loop + auxloop
            end_loop = end_loop + auxloop
    # return start and end
    return start_loop, end_loop

def gen(s,e,imax):
    """
    Create generator for part of triangular sum
    
    Parameters
    ----------
    s: int
        the loop number to begin at
    e: int
        the loop number to end at
        
    Returns
    ----------
    None: 
    
    """
    # initilise count
    count = 1
    #triangular loop
    for ii in range(imax):
        for jj in range(ii,imax):
            # gone to far then end
            if count>e:
                break
            # Don't return pair until we have gone s loops
            if count>s:
                yield (ii,jj)
            count += 1

# initilise MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print( "[{}] Starting".format(rank) )

# double triangular sum numbers from 1 to imax
imax = 100
loops = imax*(imax+1)//2

# divide up the loops
s,e = divide_loops(loops,size)
# create generator for these loops
G1 = gen(s,e,imax)

# compute double sum
x=0
for item in G1:
    x += item[0]+item[1]

print( "[{}] Local sum {}".format(rank,x) )

# combine results using reduce
y = comm.reduce(x,op=MPI.SUM,root=0)

# print output
if rank==0:
    print( "[{}] Total sum {}".format(rank,y) )

For very large jobs with extremely uneven workloads we could instead opt of a master-slave set up where the jobs are distributed by a master process with all the work being done on slaves.   This can be quite tricky to programme but here is a simple (relative to what it could look like) example that calculates $n(n+1)/2$ for $n$ from 0 to 10:

**Pattern 3**

In [None]:
"""
This is a master slave MPI code for calculating n(n+1)/2 for a set of n
The communication code is quite general however
"""
from mpi4py import MPI

# 0 is master
# rest are slaves

# intiilise MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Master rank section
if rank==0:
    # define list of tasks
    tasklist = (i for i in range(10))
    
    #set count of finished ranks
    finished = 0
    
    #create status object for MPI_probe
    status = MPI.Status()
    
    #initial allocation of jobs
    for i in range(1,size):
        # check there are enough jobs for the number of ranks
        # -1 is a flag to say "no more work" 
        try:
            message = next(tasklist)
        except StopIteration:
            message = -1
        
        # now we send initial job ID's to slaves
        print("[{}] Sending task: {} to rank {}".format(rank,message,i))
        comm.isend(message, dest=i, tag=i)

    # Now we check for messages from complete jobs
    # then allocate new jobs to the slaves
    while finished<size-1:
        # Check for waiting messages with probe 
        flag = comm.iprobe(status=status, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
        
        # if a message is waiting then enter this loop to recieve it
        if flag==True:
            # status object stores the origin of the message (and tag etc..)
            source = status.source
            #recv the message
            test = comm.irecv(source=source, tag=source)
            reply = test.wait()
            print("[{}] Recieving result: {} from rank {}".format(rank,reply,source))
            
            # now check the reply, -1 means the slave receieved a -1 and it's letting
            # us know that it's finished so we add it to our finshed count
            # otherwise we send it its next job ID (which may be -1 if there are 
            # no more jobs)
            if reply==-1:
                finished +=1
                print("[{}] Recieved termination finished count: {}".format(rank,finished))
            else:
                print("[{}] Done with result {}".format(rank,reply))
                try:
                    message = next(tasklist)
                except StopIteration:
                    message = -1
                print("[{}] Sending task: {} to rank {}".format(rank,message,i))
                comm.isend(message, dest=source, tag=source)

# Slave ranks section
else:
    # this loop keeps us checking for jobs until we recieve a -1
    while True:
        # recvout next job ID
        test = comm.irecv(source=0, tag=rank)
        task = test.wait()
        print("[{}] Recieving task: {} from rank {}".format(rank,task,0))
        
        # is job ID = -1 then no more jobs and we can stop
        # We let the master know we are stopping by sending -1 back
        # Otherwise we do the job associated with the ID we recieve
        if task==-1:
            comm.isend(-1, dest=0, tag=rank)
            print("[{}] Sending termination to rank {}".format(rank,0))
            break
        else:
            # This single line is the actual job
            result = task*(task+1)//2
            
            # now we send our result back to the master
            comm.isend(result, dest=0, tag=rank)
            print("[{}] Sending result {} to rank {}".format(rank,result,0))

# Finished message
print("[{}] I'm done".format(rank))

### Domain Decomposition

Task decomposition distributes the tasks.  Here we will instead distribute the data. A simple example is multiplication of a $n\times m$ matrix $M$ by a $n\times n$ matrix $N$ where $m>>n$.  We split the matrix M up over processors then apply the dot product then gather. This would look like:

**Pattern 4**

In [None]:
"""
A very simple domain decomposition code to multiple matricies in parallel
ie: M(mm,5) . N(5,5) in parallel using MPI
"""
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# The long matrix dimension
mm = 100
# divide this by number of processors
# we have made no allowances for remainder
# in a prefect world we would keep track of this
# and pad our matrixies accordingly
split = mm//size
 
# Object to recv matrix from scatter
matrix_M_r = np.empty((split,5),dtype='int')

# matric to multiply the large one by
matrix_N = np.arange(1,26).reshape((5,5))

# Initilise the large matrix on rank 0
if rank==0:
    matrix_M_s = np.array([range(i,i+5) for i in range(mm)])
else:
    matrix_M_s = None
    
if rank==0:
    print("[{}] My send data is ".format(rank),matrix_M_s)

# Scatter the large matric to all ranks
comm.Scatter(matrix_M_s,matrix_M_r,root=0)

print("[{}] My recv data is: ".format(rank),matrix_M_r)

# compute multiplication
matrix_M_r = np.dot(matrix_M_r,matrix_N)

# gather reults back together
comm.Gather(matrix_M_r,matrix_M_s,root=0)
    
if rank==0:
    print("[{}] My gath data is \n".format(rank),matrix_M_s)




More usually this is used for large simulations and so we need the different partitions to communicate with each other.  Also we should note that there are many different ways to partition the data.  We can divide up the domain the simulation runs on, either into strips or squares (2D) or planes, columns or cubes (3D).  For nbody type codes where we are tracking particles this could lead to load imbalances when particles congregate in specific cells.  Here we could either have an adaptive mesh or we could partition the particles instead.  Our example for 1D domain decomposition with communication is left as an exercise using our "game of life" code.  Domain decomposition is sufficiently common MPI has specific commands for dealing with this under the heading of Topologies.  There are two main classes: Cartesian and Graph.  This is getting quite advanced so I'm not going to get into general examples but here are what the basic commands look like for future reference:

**Pattern 5**

In [None]:
"""
Cartesian topology
"""
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size==8:
    cartesian3d = comm.Create_cart(dims = [2,2,2],periods = [True,True,True],reorder=True)

    coord3d = cartesian3d.Get_coords(rank)
    print ("In 3D topology, Processor ",rank, " has coordinates ",coord3d)

    left,right = cartesian3d.Shift(direction = 0,disp=1)
    up,down = cartesian3d.Shift(direction = 1,disp=1)
    back,front = cartesian3d.Shift(direction = 2,disp=1)

    print ("In 3D topology, Processor ",rank, " has neighbours ", left," and ", right)
    print ("In 3D topology, Processor ",rank, " has neighbours ", up," and ", down)
    print ("In 3D topology, Processor ",rank, " has neighbours ", back," and ", front)

#     We can also make sub-topologies so we only communicate to a small section of ranks
    cartesian2d = cartesian3d.Sub(remain_dims=[False,True,True])
    rank2d = cartesian2d.Get_rank()
    coord2d = cartesian2d.Get_coords(rank2d)

    print ("In 2D topology, Processor ",rank," has new rank ",rank2d," and coordinates ", coord2d)
    
    left,right = cartesian2d.Shift(direction = 0,disp=1)
    up,down = cartesian2d.Shift(direction = 1,disp=1)

    print ("In 2D topology, Processor ",rank, " has new rank ",rank2d," and neighbours ", left," and ", right)
    print ("In 2D topology, Processor ",rank, " has new rank ",rank2d," and neighbours ", up," and ", down)
    
    
# communication should be done using non-blocking send and receives
# rather than collectives as MPI can reorder process labels to 
# optimise communication.  However we should have the following collectives
# for neighbours (as they exist in the github: https://github.com/erdc/mpi4py/blob/master/src/MPI/Comm.pyx):

# output = cartesian3d.neighbor_allgather(sendobj=rank)
# output = cartesian3d.neighbor_alltoall(sendobj=rank)

# But they currently give: NotImplementedError it doesn't exist for my MPI implementation...

**Pattern 6**

In [None]:
"""
General Graph topology
"""
from mpi4py import MPI
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

index, edges = [0], []
for i in range(size):
    pos = index[-1]
    connections = random.randint(1,5)
    index.append(pos+connections)
    edges1=[]
    for j in range(connections):
        x = random.randint(0,size-1)
        while x==rank or x in edges1:
            x = random.randint(0,size-1)
        edges1.append(x)
        
    edges += edges1

graph = comm.Create_graph(index[1:], edges)

neighs = graph.Get_neighbors(rank)

print ("In Graph we assigned processor ",rank, " to have neighbours ", neighs)

if rank==0:
    print ("Graph has dimensions ",graph.dims)
    print ("Graph has #nodes ",graph.nnodes)
    print ("Graph has #edges ",graph.nedges)
    print ("Graph has index ",graph.index)
    print ("Graph has edges ",graph.edges)

print ("Processor ",rank," has in degree ",graph.indegree)
print ("Processor ",rank," has out degree ",graph.outdegree)
print ("Processor ",rank," has in edges ",graph.inedges)
print ("Processor ",rank," has out edges ",graph.outedges)

# We should be able to only specify connection to nodes,
# of which we can have several, on this rank with the following:

# sources = [rank]
# connections = random.randint(1,5)
# degrees = [connections]
# destinations = []
# for i in range (connections):
#     x = random.randint(0,size-1)
#     while x==rank:
#         x = random.randint(0,size-1)
#     destinations.append(x)
#
# graph = comm.Create_dist_graph(sources,degrees,destinations)

# But they currently give: NotImplementedError so it doesn't exist for my MPI implementation...

### Divide and Conquer

This technique is used when dividing or recombining tasks/data is non trivial so we instead scatter/gather iteratively.  So our algorithm looks like this:

![](Plots/DivideConquer.png)

A good example is sorting a very long list.  We can divide up the elements easily and sort then but merging the data back is difficult.  Instead here we merge pairs of lists back together and slowly recombine the full list

**Pattern 7**

In [None]:
"""
Code to compute a 'merge-sort' using MPI
"""
from mpi4py import MPI
import numpy as np
import math

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# calculate how many merge loops we will need
# we have assumed that the number or processors is a power of 2
# otherwise we need to have lots of logic for edge cases
# in our merge tree
levels = int(math.log2(size))

def mergelist(list1,list2):
    """
    Merges two sorted list into one sorted list
    
    
    Parameters
    ----------
    list1: numpy array
        our first sorted list
    list2: numpy array
        our second sorted list
        
    Returns
    ----------
    list3: numpy array
        our merged sorted list
    """
    # get sizes (len works on 1d arrays)
    size1 = len(list1)
    size2 = len(list2)
    size3 = size1+size2
    # create blank list
    list3 = np.empty((size3),dtype='int')
    # i counts list 1, j list 2 and k list3
    i=0
    j=0
    k=0
    #loop over both lists until we reach the end of one
    while i<size1 and j<size2:
        # test which is smaller and add it to list3
        # and increment the counter for that list and list3
        if list1[i]<list2[j]:
            list3[k] = list1[i]
            i+=1
        else:
            list3[k] = list2[j]
            j+=1
        k+=1
    
    # add remaining elements for the list
    # we didn't get to the end of
    while i<size1:
        list3[k] = list1[i]
        i+=1
        k+=1
        
    while j<size2:
        list3[k] = list2[j]
        j+=1
        k+=1
    
    return list3
 
# set list length
length = 200
# divide this by number of processors
# we have made no allowances for remainder
# in a prefect world we would keeep track of this
# and pad our arrays accordingly
split = length//size

# create list to recv data from scatter
list1 = np.empty((split),dtype='int')

# create long list on rank 0
if rank==0:
    listall = np.random.randint(0,100,(200))
else:
    listall = None
    
if rank==0:
    print("[{}] My send data is ".format(rank),listall)

#scatter orginal list
comm.Scatter(listall,list1,root=0)

# sort our section
list1 = np.sort(list1)

level = 2
# now merge back.  This is a bit fiddly
# we first send from 1mod2 to 0mod2 then from
# 2mod4 to 0mod4  and so on merging at each step
for i in range(1,levels+1):
    if rank%2**i == 2**(i-1):
        dest = rank - 2**(i-1)
        comm.Send(list1, dest=dest, tag=11)
    elif rank%2**i == 0:
        list2 = np.empty((split*2**(i-1)),dtype='int')
        src = rank + 2**(i-1)
        comm.Recv(list2, source=src, tag=11)
        list1 = mergelist(list1,list2)

#print answer
if rank==0:
    print(list1)  

### Recursive data

Sometimes we will want to perform recursive operations on data which are inherently serial.  In these situations we can expose parallelism by using recasting the problem to expose concurrency.  This can be difficult and often the algorithm is slower but with enough cores the code will still run faster.  A simple example is calculating a cumulative sum for a list.  Here we would normally start at the beginning and compute the sum as we progress along the list.  This is an $O(N)$ operation but is serial so can only use one core.  We can instead recast the problem by calculating recursive partial sums like so:

![](Plots/Recursive.png)

Now the algorithm is $O(N\ln(N))$ but each element is calculated separately. Now we have to be careful as if we have fewer than $O(\ln(N))$ processors/threads then the code will be lower.  This could still be useful if the list is very large so can't fit in memory. Here is an example of how this might look (This ends up being a divide and conquer algorithm like the previous example):

**Pattern 8**

In [None]:
"""
Code to compute a cumulitive sum using MPI after reacting the problem
"""
from mpi4py import MPI
import numpy as np
import math

# initilise
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# calculate how many send loops we will need
# we have assumed that the number or processors is a power of 2
# otherwise we need to have lots of logic for edge cases
# in our merge tree
levels = int(math.log2(size))

# set list length
length = 200
# divide this by number of processors
# we have made no allowances for remainder
# in a prefect world we would keeep track of this
# and pad our arrays accordingly
split = length//size

# create list to recv data from scatter
list1 = np.empty((split),dtype='int')

# create long list on rank 0
if rank==0:
    listall = np.random.randint(0,100,(length))
else:
    listall = None
    
if rank==0:
    print("[{}] My send data is ".format(rank),listall)

#scatter orginal list   
comm.Scatter(listall,list1,root=0)

# compute cumulitive sum for our section
x=0
for index, item in enumerate(list1):
    x += item
    list1[index] = x

# now send cum sum to other ranks.  This is a bit fiddly
# we first send from 1mod2 to 0mod2 then from
# 2mod4 to 0mod4  and so on adding the new section on at each setp    
for i in range(1,levels+1):
    if rank%2**i == 0:
        dest = rank + 2**(i-1)
        comm.send(list1[-1], dest=dest, tag=11)
    elif rank%2**i == 2**(i-1):
        src = rank - 2**(i-1)
        x = comm.recv(source=src, tag=11)
        list1 = list1+x

# gather the cumultive sums back together
comm.Gather(list1,listall,root=0)

# output result
if rank==0:
    print(listall)

### Pipelines

The final pattern we will meet is pipelines here the data flows through the ranks with different operations being applied at each stage.  This can work well for heavy compute algorithms or ones that use special chips like GPU's.  You may want to input a stream of vectors to be multiplied by several matrixes with different operations inbetween so rather than moving the matrixes you pass the vectors along a production line where the matrixes remain fixed.  This is just like a production line in a car plant for example.  Here the communication is just simple non-blocking send and recv's between the ranks.  I currently don't have a good example but I will try to find one and add it later.

## Scaling

Whatever approach you chose for your code, it is important to always try to check how your final result scales to a larger number of processes on some kind of shorter problem before submitting a 12 hours long job taking up half the cluster only to find out that you didn't benefit at all from increasing the number of tasks from, say, 64 to 128. For example, below is a picture of scaling for one of the matrix multiplication algorithms. We see that, while the computation time scales almost perfectly, the communication time pretty much doesn't benefit from increasing the number of processes and, as a result, this algorithm on the given data set size probably shouldn't ask for more than 80 tasks, as it cannot use those extra resources effectively. You should be able to somewhat predict the scaling for your algorithm, but MPI can absolutely surprise you sometimes, not to mention your predictions might be miscalculated, so testing the scaling is quite a necessary part of your parallel code development.    

![](Plots/scaling_example.png)

### Exercises

1. Task parallelism: Take the code `Solutions/mpi_triangle_int.py` and parallelise it to split the calculation of $E_{ijk}$ across 4 processes.


2. Decomposing a domain: Parallelise the code `Solutions/mpi_GOL.py` to split the game of life across 4 ranks for a 10x40 game board.  Initial starting position is defined inside.  After 60 iterations the picture of a "duck" should be on the last tile

Documentation for mpi4py isn't great but some information can be found here:

https://info.gwdg.de/~ceulig/docs-dev/doku.php?id=en:services:application_services:high_performance_computing:mpi4py

https://mpi4py.readthedocs.io/en/stable/index.html