# Introduction to MPI

MPI (Message Passing Interface) is the most used protocol for communicating between processes.  It doesn't matter if the processes that want to talk to each other are on the same or different nodes (i.e., computers). In this tutorial, we'll use `mpi4py` to learn about MPI and its API.

An MPI program can run on a single computer/node with one or more processes that share memory or on multiple computers that are connected to each other via some network (not shown).

![Six nodes](images/6nodes.png)



## Communicators

MPI processes talk to each other via communicators. The global communicator `MPI.COMM_WORLD` connects all MPI processes. Each process has a unique *rank* with `MPI.COMM_WORLD`. The process with `rank=0` is usually called the root process. MPI processes can participate in different communicators and may have a different rank in these communicators.

![Communicators](images/6nodeswcommranks.png)

## Point to point communication

Sending messages from one MPI task to another is the foundation of MPI. Messages consist of some meta information such as the *source* and the *destination*, a *tag* that identifies the message, the *data type*, and the *count* of data items and the actual data.

For each `Send` there has to be a matching `Recv`. This means that the meta information has to fit (including the tag). There are some wildcards and some additional commands that make this more flexible.

Sending and receiving can be blocking or non-blocking. In the latter case, the flow of the program continuous after the call. In the former the program waits until the message has been transmitted. There is a very real danger for deadlocks here!

![Point to point communication](images/6nodesptp.png)


## Collective communication

Collective calls are performed by all ranks of a communicator and thus *must* be called by all ranks. Examples for collective calls are `Bcast`, `Scatter`, `Gather`, and `Allreduce`. We'll look at some collective calls later in the notebook.

![Collective Call](images/6nodescoll1.png)

You could implement collective calls yourself using `Send` and `Recv`, but using the provided collective calls makes your code easier to read and allows vendors to optimize MPI for their platform. For example, in the picture above all the calls share the bandwidth of rank 0. One could use a tree-like pattern to balance the network load instead.

![Collective Call Tree](images/6nodescoll2.png)


You can find more information about MPI at

* http://materials.jeremybejarano.com/MPIwithPython/index.html

And the documentation of mpi4py at http://mpi4py.scipy.org/docs/usrman/index.html.

## Starting the engines

Before we can use MPI, we need to start some IPython engines. For this notebook, we'll use some local engines. The easiest way to start them is by typing ``ipcluster start --engines=MPIEngineSetLauncher`` into a terminal window. This will start as many engines as there are virtual processors on your machine.

Please, go ahead and start the engines before you proceed.

## Connecting to the engines

Next, we want to connect to the engines:

In [1]:
from ipyparallel import Client
rc = Client()
rc.ids

[0, 1, 2, 3, 4, 5, 6, 7]

Next, we need to create a view and activate it.

In [2]:
view = rc[:]
view.activate()
view.block = True

## Setting up for MPI

We are now ready to use MPI with our IPython notebook.

We will use the cell magic ``%%px`` and the line magic ``%px`` to execute commands on all the engines. So, whenenver you see ``%%px`` the cell is executed on *all* the engines, but not in the process that controls your notebook!

In [3]:
%px from mpi4py import MPI # Import and initialize MPI on the engines
from mpi4py import MPI # Import and initialize MPI in the notebook 

In [4]:
%px import numpy as np
import numpy as np

Importing MPI initializes the MPI and sets up the default communicator `COMM_WORLD`, which includes all processes involved in this MPI program.

Using `COMM_WORLD`, each process can determine its *rank* and the total number of ranks available:

In [5]:
%%px
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

In [6]:
%%px
print("I'm %d of %d. Resistance is futile." % (rank + 1, size))

[stdout:0] I'm 2 of 8. Resistance is futile.
[stdout:1] I'm 5 of 8. Resistance is futile.
[stdout:2] I'm 6 of 8. Resistance is futile.
[stdout:3] I'm 7 of 8. Resistance is futile.
[stdout:4] I'm 4 of 8. Resistance is futile.
[stdout:5] I'm 1 of 8. Resistance is futile.
[stdout:6] I'm 3 of 8. Resistance is futile.
[stdout:7] I'm 8 of 8. Resistance is futile.


## Point to point

We can send messages from one rank to another. 

The following exercises will only work if more than one process is available.

In [7]:
%%px
import sys
import numpy as np
comm = MPI.COMM_WORLD
a = np.ones(1)
b = np.zeros(1)
if (size < 2):
    print ("Warning! Not enough ranks available!" )
else:
    if rank == 0:
        print ("I'm rank zero and I'm sending a datum.")
        comm.Send(a[0], dest = 1) # Default destination is 0!
    elif rank == 1:
        print ("I'm rank one and I'm receiving a datum.")
        comm.Recv(b, source = 0) # Default source is 0!


[stdout:0] I'm rank one and I'm receiving a datum.
[stdout:5] I'm rank zero and I'm sending a datum.


## Parallel reduction
In the previous examples, we used send and receive. Now we are going to look at some collective communication. These commands involve all the ranks that belong to a communicator. Let's start by creating an array of random numbers and scattering it to all the ranks, i.e., each rank gets some part of the array.

Variables that are defined on the engines can be retrieved through the `view`. To get the variable rank, e.g., you can write

In [8]:
ranks = view['rank']

In [10]:
ranks

[1, 4, 5, 6, 3, 0, 2, 7]

The ranks are not in any particular order, which can be annoying, but we can get the right order by casting ranks to an ndarray and calling argsort. The result gives us the engines in MPI rank order. Give it a try.

In [9]:
ranksort = np.array(ranks).argsort()
rank0 = int(ranksort[0])

In [13]:
ranksort

array([5, 0, 6, 4, 1, 2, 3, 7])

In [14]:
# This cell is executed within the notebook. The engines don't know anything about it
import numpy as np
a = np.random.random(100000)

In [15]:
# First push the array to the engine with MPI rank 0
%px a = None
view.push({'a':a}, targets=rank0)

Next, we scatter the data from rank 0.

In [16]:
%%px
a_partial = np.zeros(100000)
comm.Scatter(a, a_partial, root = 0)

We can now calculate the partial sum on each rank and then sum up the partial results using `Reduce`.

In [17]:
%%px
total = np.zeros(1)
sum_partial = np.sum(a_partial)
comm.Reduce(sum_partial, total)

Use the view to get the result back into the notebook.

In [18]:
total = view.pull('total', targets=rank0)
print("The sum of the random numbers is %f. The average is %f." % (total, total / len(a)))

The sum of the random numbers is 50052.176449. The average is 0.500522.


## Upper vs. lowercase in mpi4py
`mpi4py` offers two version of many calls. The first one is written with in uppercase like the one in the previous cell. It uses memory buffers, e.g., `np.array`, and maps the call directly to the appropriate C call. The second version is written in lower case and takes arbitrary Python object. The result is given as the return value.

In [19]:
%%px
a_all = comm.gather(a_partial)
print(a_all)

[stdout:0] None
[stdout:1] None
[stdout:2] None
[stdout:3] None
[stdout:4] None
[stdout:5] 
[array([ 0.55049318,  0.61523789,  0.3082229 , ...,  0.        ,
        0.        ,  0.        ]), array([ 0.29773394,  0.22057284,  0.02175943, ...,  0.        ,
        0.        ,  0.        ]), array([ 0.80176278,  0.27859166,  0.65503097, ...,  0.        ,
        0.        ,  0.        ]), array([ 0.,  0.,  0., ...,  0.,  0.,  0.]), array([ 0.24361521,  0.22960397,  0.99653137, ...,  0.        ,
        0.        ,  0.        ]), array([ 0.,  0.,  0., ...,  0.,  0.,  0.]), array([ 0.,  0.,  0., ...,  0.,  0.,  0.]), array([ 0.,  0.,  0., ...,  0.,  0.,  0.])]
[stdout:6] None
[stdout:7] None


Now, `a_all` contains a `list` of `np.array`s.

This second version is convenient, but it is **much** slower:

In [20]:
%%px
%timeit a_all = comm.gather(a_partial)

[stdout:0] 100 loops, best of 3: 5.06 ms per loop
[stdout:1] 100 loops, best of 3: 5.06 ms per loop
[stdout:2] 100 loops, best of 3: 5.06 ms per loop
[stdout:3] 100 loops, best of 3: 5.07 ms per loop
[stdout:4] 100 loops, best of 3: 5.07 ms per loop
[stdout:5] 
The slowest run took 4.33 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 5.05 ms per loop
[stdout:6] 100 loops, best of 3: 5.07 ms per loop
[stdout:7] 100 loops, best of 3: 5.06 ms per loop


In [22]:
%%px
a_all = None
if rank == 0:
    a_all = np.zeros(len(a_partial))

%timeit comm.Gather(a_partial[:len(a_partial) // size], a_all)

[stdout:0] 
The slowest run took 49.87 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 105 µs per loop
[stdout:1] 
The slowest run took 172.82 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 105 µs per loop
[stdout:2] 
The slowest run took 17.24 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 105 µs per loop
[stdout:3] 
The slowest run took 33.13 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 105 µs per loop
[stdout:4] 
The slowest run took 61.92 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 105 µs per loop
[stdout:5] 
The slowest run took 126.58 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops,

It gets worse as arrays get bigger.

To retrieve the result, we use the `view` again and `ranksort[0]` gives us the index of rank 0.

In [23]:
a_all = np.array(view['a_all'])[ranksort[0]]
a_all.sum() - a.sum()

-25056.071341655923

## Remarks
While IPython notebooks are a nice way to teach about MPI and well tested MPI routines can be quite useful within a notebook, developing MPI code within notebooks can quickly become awkward because mistakes lead to blocking engines. I find it easier to write and test my MPI routines outside ipython notebooks and start the program with `mpiexec` from the command line.

For example, you could write the following program and save it to a file:

In [25]:
%%writefile helloparallelworld.py
from __future__ import print_function
from mpi4py import MPI
import numpy as np

rank = MPI.COMM_WORLD.Get_rank()
numberOfRanks = MPI.COMM_WORLD.Get_size()

a = np.zeros(1)
if rank == 0:
    print("There are %d MPI ranks." % numberOfRanks)
    a = np.random.random(1)

print("I'm rank %d." % rank)
if rank == 0:
    MPI.COMM_WORLD.Send(a, dest=1)
elif rank == 1:
    MPI.COMM_WORLD.Recv(a, source=1)
    

Overwriting helloparallelworld.py


You can then switch to a terminal and execute it as `mpiexec -n 2 python helloparallelworld.py`.

## Exercises

### Peak to Peak
Write a program that generates random numbers from a Gaussian distribution and then finds the minimum and maximum number generated.

a) Generate the random numbers on rank 0 and scatter them. Calculate the maximum and minimum for the partial data for    
   each rank and send the results back to rank 0. Find the maximum and minimum on rank 0 and compare it with numpy's
   `ptp` function.
   
b) Generate random numbers at each rank. Calculate the minimum and maximum and use `Reduce` to find the extrema and send 
   them to rank 0.

In [24]:
%%writefile gauss_mpi.py
from __future__ import print_function
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
numberOfRanks = comm.Get_size()

N = 100000
Nnode = int(N / numberOfRanks) + 1

# compute numbers and scatter to workers
if rank == 0:
    print('\nlocal min and max values')
    sys.stdout.flush()
    send_buffer = np.random.normal(size=(N))
else:
    send_buffer = np.zeros(Nnode)

# initialize buffer for received data
receive_buffer = np.zeros(Nnode)

# scatter the data 
comm.Scatter(send_buffer, receive_buffer, root = 0)

# compute minimal and maximal values
gauss_min = np.array(np.min(receive_buffer))
gauss_max = np.array(np.max(receive_buffer))

print(rank, ':', gauss_min, gauss_max)


# collect min and max values from all ranks
all_gauss_min = np.zeros(numberOfRanks)
all_gauss_max = np.zeros(numberOfRanks)

comm.Gather(gauss_min, all_gauss_min)
comm.Gather(gauss_max, all_gauss_max)

if rank == 0:
    print('\nlocal min and max values collected')
    print(all_gauss_min)
    print(all_gauss_max)

    
# compute the global min and max values

global_gauss_min = np.zeros(1)
global_gauss_max = np.zeros(1)

comm.Reduce(gauss_min, global_gauss_min, op=MPI.MIN, root=0)
comm.Reduce(gauss_max, global_gauss_max, op=MPI.MAX, root=0)

if rank == 0:
    print('\nglobal min and max values')
    print('reduced:  ', global_gauss_min, global_gauss_max)
    print('computed: ', np.min(send_buffer), np.max(send_buffer))
    print('')
    

Overwriting gauss_mpi.py


In [20]:
import sys