# 0. HPC - MPI tutorial

## Requirements

No requirements. 

**It is advised to do this tutorial with 4 processes in parallel in order to better understand the examples.**

## MPI tutorial with Mpi4py

MPI (Message Passing Interface) is a standard for inter-process communication. It was designed to make building parallel applications easier and standardized, since every MPI library must implement the same set of functions. Thus, between the very famous MPI for C and the less well-known Mpi4py, you will find the same set of functionalities (some almost identical). In other words, once you understand the logic of MPI and are able to program with it in one library and one language, it should be very easy to switch to another library in another language.

The aim of this first notebook of the MPI series is to help you understand the basics of MPI and, more specifically, the Mpi4py module, an MPI library for Python. **It is essential that you understand this notebook well, as it will make the following ones very easy to understand**.

The structure of this tutorial, as well as the idea of the Ring and the Random Walkers examples that you will see in it, **have been inspired by the very good MPI for C tutorial by Wes Kendall** (https://mpitutorial.com/tutorials/). However, these two examples have been rewritten and substantially modified in order to integrate more MPI functions and also (I hope) to slightly simplify their logic.

**The following steps are only needed if you want to run the MPI notebook with Jupyter on your local machine. If you are not interested in that, just skip these steps.**

First, it must be highlighted that running an MPI program from the terminal is not exactly the same as running MPI in a Jupyter notebook; the latter is a bit trickier. To bring MPI to Jupyter, some configuration is needed:
1. If it's not already the case, install an MPI library on your machine, such as OpenMPI (https://www.open-mpi.org/), so that the `mpiexec` or `mpirun` commands are available to start MPI processes. Test them in the terminal by running `mpirun` or `mpiexec`: you should get a message like `mpirun could not find anything to do`. If you get another message, you probably do not have an MPI library on your system.
2. You need Ipyparallel, which lets us run multiple MPI processes from Jupyter Lab (it also installs IPython), as well as Mpi4py itself. Run the following command in your Python virtual environment:</br>
`pip install ipyparallel mpi4py`
3. You should now be able to use the `ipython` command in the terminal. Run:</br> `ipython profile create --parallel --profile=mpi` to create a new profile called "mpi" for the IPython cluster. This profile will be used to get MPI in Jupyter Lab.
4. Now edit the file `~/.ipython/profile_mpi/ipcluster_config.py` (or `$IPYTHONDIR/profile_mpi/ipcluster_config.py` if you have set the `IPYTHONDIR` environment variable) by adding the line:</br> `c.IPClusterEngines.engine_launcher_class = 'MPIEngineSetLauncher'`. This line instructs ipcluster to use the MPI launcher.
5. You are all set! You can now start the cluster with the command `ipcluster start -n 4 --profile=mpi`, which runs a cluster of 4 processes.
6. Once the cluster is running, and before starting Jupyter Lab, enable the cluster functionality in it with the command:</br>`ipcluster nbextension enable`. Needless to say, you also need to install Jupyter Lab (recommended) or Jupyter Notebook if it's not already done.

Let's test whether everything works as expected. We initialize the Ipyparallel client, which handles the cluster. Note that the comment shows the command to use if you are running this notebook locally with the mpi profile created above. When executing this cell with a cluster of 4 processes, you should see the list of their engine IDs: `[0, 1, 2, 3]`.

In [1]:
import ipyparallel as ipp

rc = ipp.Client() # rc = ipp.Client(profile="mpi") on your local machine with the mpi profile
print("There are " + str(len(rc)) + " processes running in parallel")
print("IDs of the processes running in parallel", rc.ids)

There are 4 processes running in parallel
IDs of the processes running in parallel [0, 1, 2, 3]


Now that Ipyparallel is activated and working, we can use its `%%px` cell magic. From now on, this magic has to be put at the top of every cell that uses MPI: the code in such a cell is executed simultaneously by every process (engine) of the cluster.

### Initializing Mpi4py

When you import the `MPI` module of Mpi4py, MPI is initialized in every engine (Mpi4py takes care of the initialization for you). Here, we import this module and do 3 things:
1. First, and most importantly, we create the communicator object `comm`, the entry point to all the other MPI functionalities. `MPI.COMM_WORLD` is the communicator that wraps **all** available processes (**the WORLD of processes**). Later, if needed, you could create communicators for **smaller subsets** of processes. Thus, from now on, `comm` refers to all available processes.
2. Then, we call `.Get_rank()` on the `comm` object. In MPI, every process of a communicator is assigned a rank, a unique ID (from 0 to size - 1) that identifies it. `.Get_rank()` stores the rank of each process in its own `rank` variable.
3. Finally, `.Get_size()` returns the size of our communicator (our group of processes). This size should match the number of processes the cluster was started with.

In [3]:
%%px

# import the Mpi4py module, as well as Numpy for later use
from mpi4py import MPI
import numpy as np

# mpi initialization
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print("There are " + str(size) + " processes running in parallel")
print("Hello from process " + str(rank))

[stdout:0] 
There are 4 processes running in parallel
Hello from process 0
[stdout:1] 
There are 4 processes running in parallel
Hello from process 1
[stdout:2] 
There are 4 processes running in parallel
Hello from process 2
[stdout:3] 
There are 4 processes running in parallel
Hello from process 3


Let's comment on the output: 
* As you can see, Jupyter prints the output per process: `stdout:0` is the standard output stream of the process with rank 0. 
* Each "Hello" comes from a different process; each process has a unique rank, starting from 0. 
* Finally, one last very important point to understand with MPI: your code is **schizophrenic**! No joke, the cell you just ran has been executed **simultaneously** by 4 processes! You can see that the size of the communicator has been printed 4 times, once by each of the 4 processes executing this same instruction. **It is essential to keep in mind that the code of an MPI cell with the `%%px` magic is executed simultaneously by all available processes. Most of the time, the recurring bugs when programming with MPI come from unconsciously thinking that the code you are writing is executed only once**.

### The `%%px` magic cell command

Before continuing with MPI, let's emphasize the importance of the `%%px` command. Be very careful not to forget `%%px` in each cell **now that MPI is activated**. Let's run some tests to see what happens if you don't use this command in every cell. In the next cell, we declare a variable that will exist in all processes (because of the `%%px` command):

In [4]:
%%px

var1 = "Hey!" # this variable will now exist in all your MPI processes

Now let's try to read this variable from a cell without the `%%px` command: 

In [5]:
print(var1)

NameError: name 'var1' is not defined

Wait! How is that possible?! Why has our variable, which was declared on all MPI processes, now disappeared? 

It's simply because Ipyparallel, to bring MPI to Jupyter, uses **a cluster**: the 4 MPI processes run as separate **engines**, and another process called **a controller** relays the work between your notebook and those engines (it is actually more complicated than that, but we don't need more detail for our purposes). A cell without `%%px` runs in the notebook's own kernel, which is not one of the engines, so it cannot see their variables. That is also why we use MPI rather than the **Push** and **Pull** functions of the Ipyparallel cluster, which achieve the same goal as MPI, that is, moving information between processes. The **Push** and **Pull** functions are far less efficient than the MPI commands because the data must go through the controller before reaching its destination, while in MPI it is sent directly from the source process to the destination process.

To really illustrate this, let's do the inverse: we declare a variable in the notebook's own kernel and try to print it from the 4 processes working as engines of the cluster: 

In [6]:
var2 = "Ho!"

In [7]:
%%px

print(var2)

CompositeError: one or more exceptions from call to method: execute
[0:execute]: NameError: name 'var2' is not defined
[1:execute]: NameError: name 'var2' is not defined
[2:execute]: NameError: name 'var2' is not defined
[3:execute]: NameError: name 'var2' is not defined

Nope, none of the engines is aware of the existence of `var2`. This is why, from now on, you must not forget the `%%px` command in each cell, unless you want to get tangled up between the notebook kernel and the cluster. Furthermore, when you write your own MPI applications, independent of Jupyter, this cluster will no longer be running in the background. Indeed, when you execute the following command in the terminal:</br> `mpirun -np 4 python3 my_mpi_app.py`</br>
you only have those 4 processes executing MPI in parallel, and no other process such as a controller in the background. The Ipyparallel cluster is only here to enable Mpi4py in Jupyter Lab by giving you the `%%px` command; apart from that, **just ignore it**. 

### Sending and receiving with MPI - Blocking communication

Let's now dive into Mpi4py. Mpi4py basically has two big sets of commands (functions) that you can use:
1. The first set of methods sends **generic Python objects** through pickle-based communication. In other words, you can send Python objects (a string, a dictionary...) between processes. **However** (big however), although this is convenient and simple, every object has to be pickled before sending and unpickled after receiving, which adds significant memory and CPU overhead and makes it inefficient for large data.   
2. The second set of methods is the one you will find in every correctly implemented MPI library. These methods send contiguous blocks of memory (buffers) between processes very efficiently. In Mpi4py, this set of commands directly wraps the underlying MPI for C library: when you execute these Mpi4py commands, it is essentially the C library that does the work. This brings two big advantages:
    * The communication of contiguous blocks of memory is very fast and efficient, near C speed.
    * If you understand these methods well in Mpi4py, there will be almost no difference with the MPI for C library!

Apart from one small example later (just for your information), we will only use the second set of methods. To send these contiguous blocks of memory, Mpi4py relies on Numpy, another very efficient C-based library for matrix and vector computations.
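To make "contiguous" concrete, here is a small Numpy-only sketch (our own illustration, not part of the original tutorial): a column taken from a 2-D array is a strided view whose elements are not adjacent in memory, so it should be copied into a contiguous array, for example with `np.ascontiguousarray`, before being handed to the buffer-based methods:

```python
import numpy as np

matrix = np.arange(12, dtype=np.int64).reshape(3, 4)

row = matrix[0, :]     # a row of a C-ordered array: its elements are adjacent in memory
column = matrix[:, 0]  # a column: its elements are 4 * 8 = 32 bytes apart (strided view)

print(row.flags["C_CONTIGUOUS"])     # True
print(column.flags["C_CONTIGUOUS"])  # False

# copy the column into a fresh contiguous array before using it as a send buffer
column_buf = np.ascontiguousarray(column)
print(column_buf.flags["C_CONTIGUOUS"], column_buf)  # True [0 4 8]
```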

Let's go back to the code and run a very simple test: we want to print something only from process 0:

In [8]:
%%px

if rank == 0: 
    print("I am process " + str(rank))

[stdout:0] I am process 0


Here, all the processes executed this code, but only process 0 entered the condition and printed the string. The logic would be the same for any other process if you change the rank condition. As you can see, writing a piece of code executed by only one process is fairly easy. 

Now we are ready to see how to send and receive with MPI **using blocking communication**. When process 1 executes the receive command, it blocks (waits) until the message from process 0 has fully arrived. On the other side, when process 0 sends something to process 1, the send call returns only once the data has been handed over and the send buffer can safely be reused; depending on the MPI implementation and the message size, this may happen before process 1 has actually received the message, or only once it has. **Blocking communication** is quite easy to reason about, but be careful with deadlocks: if process 1 waits for a message from process 0 that never arrives, it will wait forever, and all MPI processes will have to be killed and restarted.

The form of the **blocking send function (second set of commands)** is as follows:</br>
`def Send(self, buf, int dest, int tag)`</br>
* In Python, `self` implicitly means that this function is called on an instantiated object, in this case a communicator (it's not a static function). Thus, `self` is not a parameter we have to fill in. 
* `buf` is the buffer object to be sent. It must be a contiguous array (a Numpy array for example). Specifying the MPI datatype of this array, matching its Numpy dtype, makes the code slightly more efficient, although it is not mandatory. For example, to send an array of 64-bit integers, we can pass a list containing the array and its MPI datatype as the `buf` parameter:</br> `[np.array([1, 2, 3], dtype=np.int64), MPI.INT64_T]`
* `dest` is the rank of the process to which the message must be sent.
* `tag` is an integer ID for this message that helps identify it (for instance when several messages are exchanged between the same processes). 

The form of the **receive function (second set of commands)** is as follows:</br>
`def Recv(self, buf, int source, int tag, Status status=None)`</br> 
* `buf` is the buffer into which the message is received, following the same logic as for the send function. We pass a list containing an array large enough for the incoming message and its MPI datatype as the `buf` parameter:</br> `[np.zeros(3, dtype=np.int64), MPI.INT64_T]`
* `source` is the rank of the process from which the message must be received.
* `tag` is the message ID that was set by the sender. It must match the tag of the message sent. 

Here, process 0 sends an array of integers to process 1. Process 1 squares each element of this array and sends the result back to process 0. **Remember, we use Numpy to send information in contiguous blocks of memory**:

In [9]:
%%px

if rank == 0: 
    # the array to send is initialized (64-bit integers, matching MPI.INT64_T)
    array_to_send = np.array([1, 2, 3], dtype=np.int64)
    # the array is sent to process 1 with message id (tag) 0
    comm.Send([array_to_send, MPI.INT64_T], 1, 0)
    
    # the modified array from process 1 is received into the same buffer
    comm.Recv([array_to_send, MPI.INT64_T], 1, 0)
    print("\nThe modified array received from process 1:", array_to_send)

if rank == 1: 
    # the array which will receive the one sent is initialized with all 0 elements
    array_to_recv = np.zeros(3, dtype=np.int64)
    # the array from process 0 with message id 0 is received
    comm.Recv([array_to_recv, MPI.INT64_T], 0, 0)
    print("The array received from process 0:", array_to_recv)
    
    # each element is squared
    array_to_send_back = array_to_recv**2
    print("The modified array to send back to 0:", array_to_send_back)
    # the array is sent back to process 0
    # (the message id can remain the same: the two messages go in opposite directions and the Recv above has completed)
    comm.Send([array_to_send_back, MPI.INT64_T], 0, 0)

[stdout:0] 
The modified array received from process 1: [1 4 9]
[stdout:1] 
The array received from process 0: [1 2 3]
The modified array to send back to 0: [1 4 9]


To illustrate the importance of contiguous memory in the second set of Mpi4py methods, let's do the same with a string as the message. Numpy's support for strings is not well suited here, and we will show that we can do without it. **To send our string, we have to encode it into an array of bytes**:

In [10]:
%%px

if rank == 0:
    # the string to send, which is then encoded into bytes
    string_to_send = 'Hey P1, you want to hear a joke?'
    print("Message to send to P1:", string_to_send)
    string_to_send = string_to_send.encode('utf-8')
    print("Length of the string to send:", len(string_to_send))
    
    # send the string encoded in bytes
    comm.Send([string_to_send, MPI.CHAR], 1, 0)
    
    # buffer for P1's response (must be at least as large as the response, here 23 bytes)
    response = bytearray(23)
    comm.Recv([response, MPI.CHAR], 1, 0)
    print("Response received from process 1:", response.decode('utf-8'))

if rank == 1: 
    # message from P0 received
    arr = bytearray(32) # empty bytearray for holding the incoming message (must be big enough)
    comm.Recv([arr, MPI.CHAR], 0, 0)
    print("Message received from P0:", arr.decode('utf-8'))
    
    # response to P0 sent
    response = 'You! Stop bothering me!'
    print("Message to send to P0:", response)
    response = response.encode('utf-8')
    print("Length of the string to send:", len(response))
    comm.Send([response, MPI.CHAR], 0, 0)

[stdout:0] 
Message to send to P1: Hey P1, you want to hear a joke?
Length of the string to send: 32
Response received from process 1: You! Stop bothering me!
[stdout:1] 
Message received from P0: Hey P1, you want to hear a joke?
Message to send to P0: You! Stop bothering me!
Length of the string to send: 23
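
One detail worth keeping in mind when sizing the receiving `bytearray`: its length is counted in bytes, and with UTF-8 a single character can take more than one byte. A small standard-library sketch (the message is made up; the slice assignment only stands in for what `Recv` would write into the buffer):

```python
message = "Hey P1, a café joke?"
encoded = message.encode("utf-8")

# 'é' takes 2 bytes in UTF-8, so the byte length exceeds the character count
print(len(message), len(encoded))  # 20 21

# the receiving side must therefore size its bytearray in bytes, not in characters
recv_buffer = bytearray(len(encoded))
recv_buffer[:] = encoded  # stands in for the bytes Recv would write into the buffer
print(recv_buffer.decode("utf-8"))  # Hey P1, a café joke?
```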


Now, just to show one example of the first set of Mpi4py methods (the one for sending generic Python objects), let's do the same string communication with a plain Python string. **The functions in this first set have the same names as those of the second set, but start with a lowercase letter**:

In [11]:
%%px

if rank == 0:
    # the string to send
    string_to_send = 'Hey P1, you want to hear a joke?'
    print("Message to send to P1:", string_to_send)
    # lowercase version of the send method for sending generic python object (first set of methods)
    comm.send(string_to_send, 1, 0)
    
    # response from P1 received
    string_to_recv = comm.recv(source=1, tag=0)
    print("Response received from P1:", string_to_recv)

if rank == 1: 
    # message from P0 received
    string_to_recv = comm.recv(source=0, tag=0)
    print("Message received from P0:", string_to_recv)
    
    # response to P0 is sent
    arr = 'You! Stop bothering me!'
    print("Message to send to P0:", arr)
    comm.send(arr, 0, 0)

[stdout:0] 
Message to send to P1: Hey P1, you want to hear a joke?
Response received from P1: You! Stop bothering me!
[stdout:1] 
Message received from P0: Hey P1, you want to hear a joke?
Message to send to P0: You! Stop bothering me!


Yes, it is beautifully easy, but very inefficient for big computations involving huge matrices and many exchanges. That being said, this kind of communication is useful when, for example, neural network models are trained in different processes and, at the end, are all sent to process 0. In such a case, **being able to send the whole object is very valuable, as it would be very difficult to send all the characteristics of a neural network through a Numpy array**. 

Let's illustrate the functions we have seen (sending and receiving contiguous blocks of memory) with a small example: **a ring communication**. Process 0 sends an integer (9) to the next process by rank, which itself sends it to the next process, and so on. When the process with the highest rank is reached, the program stops (with some slight modifications, you could make the message loop around as many times as you want). You may think that such a program is trivial, and yes it is; however, you will see that there are a few small points to consider:  

In [12]:
%%px

# the origin of the message for Recv function
origin_proc = rank - 1

# the destination of the message for Send function
destination_proc = rank + 1

# every process except 0 first waits for the value coming from the process just below it
# (process 0 starts the ring, so it has nothing to receive first)
if rank != 0:
    # variable holding the received value
    v = np.zeros(1, dtype=np.int64)
    comm.Recv([v, MPI.INT64_T], origin_proc, 0)

# if it is process 0 
if rank == 0:
    v = np.array([9], dtype=np.int64)
    comm.Send([v, MPI.INT64_T], destination_proc, 0)
    print("I am process " + str(rank) + " and I send value " + str(v[0]) + " to process " + str(destination_proc))
# if it is a process between 0 and the last one
elif rank > 0 and rank < size - 1:
    comm.Send([v, MPI.INT64_T], destination_proc, 0)
    print("I am process " + str(rank) + " and I received value " + str(v[0]) + " from process " + str(origin_proc) + " and I send it to process " + str(destination_proc))
# if it is the last process
else:
    print("I am process " + str(rank) + " and I received value " + str(v[0]) + " from process " + str(origin_proc))

[stdout:0] I am process 0 and I send value 9 to process 1
[stdout:1] I am process 1 and I received value 9 from process 0 and I send it to process 2
[stdout:2] I am process 2 and I received value 9 from process 1 and I send it to process 3
[stdout:3] I am process 3 and I received value 9 from process 2


### The status

We will continue our journey through MPI by looking at the status object.

We did not use it, but the `.Recv` function can take a status object as an argument (`.Send` does not). This status object holds additional information about the completed communication. We can get 5 pieces of information from it:
* The source of the message (rank of the sender). 
* The tag of the message.
* The length in bytes of the message. 
* The cancelled state (if it has been cancelled or not). 
* The error state (an integer indicating the error or the success of the communication).

Let's test it: 

In [13]:
%%px

if rank == 0: 
    array_to_send = np.array([28], dtype=np.int64)
    comm.Send([array_to_send, MPI.INT64_T], 1, 99)

if rank == 1: 

    array_to_recv = np.zeros(1, dtype=np.int64)
    
    # empty status object initialized
    status = MPI.Status()
    # we pass the empty status object as the last argument of the Recv function
    comm.Recv([array_to_recv, MPI.INT64_T], 0, 99, status)
    
    print("Status message source:", status.source)
    print("Status message tag:", status.tag)
    print("Status message length in bytes:", status.count)
    print("Status message cancelled:", status.cancelled)
    print("Status message error:", status.error)

[stdout:1] 
Status message source: 0
Status message tag: 99
Status message length in bytes: 8
Status message cancelled: False
Status message error: 0


The message comes from process 0 with tag 99 and contains one 8-byte (64-bit) integer; it has not been cancelled, and the error status is 0 (meaning no error, that is, success). 

Let's now run the same test with an error introduced: the array receiving the message will not be big enough, and the resulting exception will be caught: 

In [24]:
%%px

if rank == 0: 
    array_to_send = np.array([28], dtype=np.int64)
    comm.Send([array_to_send, MPI.INT64_T], 1, 99)

if rank == 1: 

    array_to_recv = np.zeros(0, dtype=np.int64)
    status = MPI.Status()
    
    # we handle the exception raised because the receiving array has size 0 instead of 1
    try:
        comm.Recv([array_to_recv, MPI.INT64_T], 0, 99, status)
    except MPI.Exception:
    
        print("Status message source:", status.source)
        print("Status message tag:", status.tag)
        print("Status message length in bytes:", status.count)
        print("Status message cancelled:", status.cancelled)
        print("Status message error:", status.error)
        # we extract the message associated with the error status code by using the Get_error_string() Mpi4py function
        print("The message associated with the error status " + str(status.error) + " is " + str(MPI.Get_error_string(status.error)))

[stdout:1] 
Status message source: 0
Status message tag: 99
Status message length in bytes: 0
Status message cancelled: False
Status message error: 0
The message associated with the error status 0 is No MPI error


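Indeed, the message has been truncated, and the `except` branch was triggered as a result. Note, however, that the error field of the status remains 0 (on the CSCS supercomputer too, the error code is 0 instead of the expected 15 even though the `except` branch is triggered). This is the expected behaviour: the MPI standard only updates the error field of a status in calls that return several statuses at once (such as `Waitall`), so for a single `Recv` the error is reported through the raised `MPI.Exception` instead. The status nevertheless provides a very useful functionality, as we will see now.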

### Sending and receiving with MPI - Dynamic communication

There is one last thing we need to look at before moving to non-blocking communication. As you have noticed in the previous examples, we somehow need to know the size of the object we receive. You will say that we do know it, as we program the instructions of each process, and you would not be wrong. However, in some cases you may need to send an object whose size cannot be known in advance. Of course, MPI handles this case for us with the `.Probe()` function and the `Status` object we have seen previously. 

This function simply blocks until a matching message is available and fills in the status object passed to it, without actually receiving the message. We can then use this status to prepare the reception of the message. Let's make process 0 send an array of 1s of random size to process 1: 

In [25]:
%%px

import random

if rank == 0: 
    # generate a random number between 1 (included) and 20 (included)
    r = random.randint(1, 20)
    # create a randomly sized array of 1s
    array_to_send = np.ones(r, dtype=np.int64)
    comm.Send([array_to_send, MPI.INT64_T], 1, 99)

if rank == 1: 

    # empty status object initialized
    status = MPI.Status()
    # we block with Probe until the message arrives, to get its status
    comm.Probe(source=0, tag=99, status=status)
    # we extract the length in bytes of the incoming message
    length_in_bytes = status.count
    print("The length in bytes of the incoming message is:", length_in_bytes)
    # number of 64-bit (8-byte) integers in the message, i.e. length_in_bytes // 8
    n_elements = status.Get_count(MPI.INT64_T)
    array_to_recv = np.zeros(n_elements, dtype=np.int64)
    comm.Recv([array_to_recv, MPI.INT64_T], 0, 99, status)
    print(array_to_recv)

[stdout:1] 
The length in bytes of the incoming message is: 96
[1 1 1 1 1 1 1 1 1 1 1 1]


Try running the previous cell several times to see that the message is correctly handled each time.

### Sending and receiving with MPI - Non-blocking communication and the requests

We are now going to quickly look at another type of MPI communication: non-blocking communication. The principle is almost the same as before; the only difference is that, when issuing a send or a receive, the process does not wait for the message to be sent or received. Execution continues, and the message is transferred at some point; we just don't know exactly when. As a consequence, the buffer of a non-blocking send must not be modified, and the buffer of a non-blocking receive must not be read, until the operation has completed. This makes safe programs harder to write and to debug than with blocking communication, but non-blocking communication is still useful in many circumstances, for example to overlap communication with computation or to avoid deadlocks. 

The non-blocking methods look like this: 

`def Isend(self, buf, int dest, int tag)` for sending.</br>
`def Irecv(self, buf, int source, int tag)` for receiving.

You may think that, apart from the name, there is no difference. Well, actually, there is a fundamental one: the blocking functions return nothing, while the non-blocking ones return an object called a `Request`. 

The request object is a sort of handle on a non-blocking operation. It can be used to check the state of the operation, to wait for it, to cancel it... Imagine you send a parcel to your friend and the courier, when taking it, gives you a smartphone showing the status of the delivery, with the options to cancel it or to wait for it to be delivered. This smartphone would be the `Request` object. 

Let's do a quick example:

In [26]:
%%px

import time

# Empty request initialized
sent_request = MPI.Request()

if rank == 0: 
    array_to_send = np.array([28], dtype=np.int64)
    sent_request = comm.Isend([array_to_send, MPI.INT64_T], 1, 99)

if rank == 1:
    array_to_recv = np.zeros(1, dtype=np.int64)
    comm.Irecv([array_to_recv, MPI.INT64_T], 0, 99)
    # deliberately incorrect: the buffer is read without waiting for the receive to complete
    print("Message is:", array_to_recv[0])

[stdout:1] Message is: 0


As we can see, the printed value is the initial one (0) and not the 28 sent by process 0. This is exactly the behaviour of non-blocking communication: the code continues its execution without waiting for the message. Let's now modify this example to wait for the message before printing it:

In [27]:
%%px

import time

if rank == 0: 
    array_to_send = np.array([28], dtype=np.int64)
    # the object returned is a Request object
    sent_request = comm.Isend([array_to_send, MPI.INT64_T], 1, 99)
    # waiting on it completes the send and frees the request (the buffer may then be reused)
    sent_request.Wait()

if rank == 1:
    array_to_recv = np.zeros(1, dtype=np.int64)
    # we also save the Request object of the receiving function, and this time we use it to wait for the message
    recv_request = comm.Irecv([array_to_recv, MPI.INT64_T], 0, 99)
    # we record the time we are waiting for the message
    start_time = time.time()
    # we wait for the message to come by using the Request object previously saved
    recv_request.Wait()
    end_time = time.time()-start_time
    
    print("Message is:", array_to_recv[0])
    print("Process 1 had to wait:", end_time, "seconds for the message to come")

[stdout:1] 
Message is: 28
Process 1 had to wait: 0.0010488033294677734 seconds for the message to come


We can wait for multiple requests with one command, and also test for their completion, that is, get a boolean indicating whether a request has completed or not. In case it was not clear before, **note that completion applies not only to receiving requests but also to sending requests: a communication is complete when both the sending and the receiving requests are complete**. 

Let's illustrate all that with a last example. Here is the story: process 0 has decided to be rude and to spam process 1 with arrays of 12000 integers. Process 1, thinking that there may be something relevant in this spam, receives it all. This very interesting communication is non-blocking. 

In [28]:
%%px

if rank == 0 or rank == 1:
    # the list of the 6 requests of process 0 or 1 (filled below)
    all_requests = []

# the spammer process
if rank == 0: 
    # an array of 12000 integers is sent 6 times to process 1
    # each message gets its own tag (0 to 5) so that process 1 can match each receive to a specific send
    # the request returned by each Isend is saved in the list of all requests
    # (the send buffer must not be modified until these requests are complete)
    array_to_send = np.ones(12000, dtype=np.int64)
    for tag in range(6):
        all_requests.append(comm.Isend([array_to_send, MPI.INT64_T], 1, tag))

# process 1 receives all messages (in reverse tag order) and saves all receive requests
if rank == 1:
    # one buffer per pending receive: two pending receives must never share the same buffer
    arrays_to_recv = [np.zeros(12000, dtype=np.int64) for _ in range(6)]
    for i, tag in enumerate(range(5, -1, -1)):
        all_requests.append(comm.Irecv([arrays_to_recv[i], MPI.INT64_T], 0, tag))

# the interesting part
if rank == 0 or rank == 1:
    # we print the completion of all requests of processes 0 and 1 before waiting
    completion_recv = [request.Test() for request in all_requests]
    print("Completion of the 6 requests before waiting (for process", str(rank), "):",completion_recv)
    # process 0 and 1 wait for all requests to finish
    MPI.Request.Waitall(all_requests)
    # we print again the completion after having waited
    completion_recv = [request.Test() for request in all_requests]
    print("Completion of the 6 requests after having waited (for process", str(rank), "):",completion_recv)

[stdout:0] 
Completion of the 6 requests before waiting (for process 0 ): [True, True, True, True, True, True]
Completion of the 6 requests after having waited (for process 0 ): [True, True, True, True, True, True]
[stdout:1] 
Completion of the 6 requests before waiting (for process 1 ): [False, False, False, False, False, False]
Completion of the 6 requests after having waited (for process 1 ): [True, True, True, True, True, True]
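Requests behave much like the futures of Python's standard library: `.Test()` is a non-blocking completion check, like `Future.done()`, and `MPI.Request.Waitall()` blocks until every request is complete, like `concurrent.futures.wait()`. The sketch below uses threads instead of MPI processes and only illustrates that analogy (`fake_receive()` is a hypothetical stand-in for a message that takes time to arrive); it does not reflect how mpi4py works internally.

```python
import concurrent.futures
import time

def fake_receive(tag):
    # hypothetical stand-in for a message that takes some time to arrive
    time.sleep(0.1)
    return tag

with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
    # post 6 "requests" without waiting for them, as with Irecv
    requests = [executor.submit(fake_receive, tag) for tag in range(6)]
    # non-blocking completion check, comparable to Request.Test()
    before = [request.done() for request in requests]
    # block until every request has completed, comparable to MPI.Request.Waitall()
    concurrent.futures.wait(requests)
    after = [request.done() for request in requests]

print("Completion before waiting:", before)
print("Completion after having waited:", after)
```

As with MPI requests, the values checked before waiting depend on timing, while after the wait every request is guaranteed to be complete.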


### Sending and receiving with MPI - Collective communication

This is the penultimate part. We will see how to do collective communication with MPI. Collective communication includes functions like `.Bcast()`, which make several processes communicate together with a single command. An essential point to keep in mind is that a collective command must be called by all the processes of the communicator: a process reaching it may have to wait until the others reach the same call, so it acts as a potential synchronization point in the execution flow of the code. Speaking of synchronization, let's quickly look at a command specially designed for synchronizing the processes.

#### Barrier

The MPI `.Barrier()` function is specially designed to synchronize all processes at a given point in the code: a process calling it blocks until all the processes of the communicator have called it. Let's look at an example.

In [29]:
%%px

for process in range(0, size):
    if rank == process:
        print("Hello from process", rank)

[stdout:0] Hello from process 0
[stdout:1] Hello from process 1
[stdout:2] Hello from process 2
[stdout:3] Hello from process 3


Here, the output suggests that the command was executed first by process 0, then by processes 1, 2... But this is not necessarily the case: Jupyter always orders the output by ascending process rank, even if, for example, process 3 printed before process 0. That's why, if you executed this piece of code in a Python file, outside of Jupyter, you could see an output like this one (for example):
```
Hello from process 2
Hello from process 3
Hello from process 0
Hello from process 1
```

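To fix this small inconvenience, we can simply put a barrier at each iteration of the for loop in order to synchronize the execution flow of the processes. In other words, each process has to wait for all the others to have executed the current iteration of the for loop before moving to the next one. Thus, **we can guarantee** that process 0 calls `print` first, then process 1, 2, 3... Note that this orders the `print` calls rather than the display itself: outside of Jupyter, the standard output of each process is forwarded by `mpiexec` and may be buffered, so lines can still occasionally appear out of order (passing `flush=True` to `print` usually helps).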

In [30]:
%%px

for process in range(0, size):
    if rank == process:
        print("Hello from process", rank)
    comm.Barrier()

[stdout:0] Hello from process 0
[stdout:1] Hello from process 1
[stdout:2] Hello from process 2
[stdout:3] Hello from process 3
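The same pattern can be reproduced with threads and Python's `threading.Barrier`, which may help to see why the loop works: no worker can start iteration `process + 1` before every worker has finished iteration `process`. This is only an analogy (threads of a single Python process, not MPI processes). Because the threads share one standard output, the recorded order is exactly 0, 1, 2, 3.

```python
import threading

NUM_WORKERS = 4                        # plays the role of `size`
barrier = threading.Barrier(NUM_WORKERS)
order = []                             # records which worker printed, in order
lock = threading.Lock()

def worker(rank):
    for process in range(NUM_WORKERS):
        if rank == process:
            with lock:
                order.append(rank)
                print("Hello from thread", rank)
        # like comm.Barrier(): wait until every worker has finished this iteration
        barrier.wait()

threads = [threading.Thread(target=worker, args=(rank,)) for rank in range(NUM_WORKERS)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```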


Now we will look at some collective communication functions. 

#### Bcast

Let's look at the MPI broadcast function. As its name indicates, this function broadcasts a message from one process to all the others. Here, process 0 will broadcast a message (an integer) to all the other processes. Please note that the broadcast function takes a parameter called `root`, which is the rank of the process broadcasting the message to the others.

In [31]:
%%px

# np.intc is the NumPy type matching MPI.INT (a C int)
if rank == 0:
    message = np.array([28], dtype=np.intc)
else:
    message = np.array([0], dtype=np.intc)
    
print("Before the broadcast from P0, I am process", rank, "and I have message", message[0])

# process 0 broadcasts integer 28 to all the other processes
comm.Bcast([message, MPI.INT], root=0)

print("After the broadcast from P0, I am process", rank, "and I have message", message[0])

[stdout:0] 
Before the broadcast from P0, I am process 0 and I have message 28
After the broadcast from P0, I am process 0 and I have message 28
[stdout:1] 
Before the broadcast from P0, I am process 1 and I have message 0
After the broadcast from P0, I am process 1 and I have message 28
[stdout:2] 
Before the broadcast from P0, I am process 2 and I have message 0
After the broadcast from P0, I am process 2 and I have message 28
[stdout:3] 
Before the broadcast from P0, I am process 3 and I have message 0
After the broadcast from P0, I am process 3 and I have message 28


`.Bcast()` is a good example of an MPI function that can be expressed with other, simpler MPI functions. To illustrate that, let's build our own (naive) broadcast function `myBcast()` from `.Send()` and `.Recv()`:

In [32]:
%%px

def myBcast(buff, root):
    
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    # if the process is the root, it has to send the message to all the others but itself
    if rank == root: 
        for i in range(0, size):
            if i != root:
                comm.Send(buff, i, 0)
    # if the process is not the root, it has to receive the message coming from the root
    else:
        comm.Recv(buff, root, 0)

Pretty easy right? Let's test it: 

In [33]:
%%px

if rank == 0:
    message = np.array([28], dtype=np.intc)
else:
    message = np.array([0], dtype=np.intc)
    
print("Before the broadcast from P0, I am process", rank, "and I have message", message[0])

myBcast([message, MPI.INT], root=0)

print("After the broadcast from P0, I am process", rank, "and I have message", message[0])

[stdout:0] 
Before the broadcast from P0, I am process 0 and I have message 28
After the broadcast from P0, I am process 0 and I have message 28
[stdout:1] 
Before the broadcast from P0, I am process 1 and I have message 0
After the broadcast from P0, I am process 1 and I have message 28
[stdout:2] 
Before the broadcast from P0, I am process 2 and I have message 0
After the broadcast from P0, I am process 2 and I have message 28
[stdout:3] 
Before the broadcast from P0, I am process 3 and I have message 0
After the broadcast from P0, I am process 3 and I have message 28


Everything works as expected!
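`myBcast()` is correct but naive: the root sends `size - 1` messages one after the other. MPI libraries usually implement broadcast with smarter algorithms, for instance a binomial tree, in which every process that already has the message forwards it, so that the number of rounds only grows like log2(size). The pure-Python function below (a hypothetical helper, not part of mpi4py) only computes such a schedule, round by round, to illustrate the idea. The algorithm actually used depends on your MPI library and on the message size.

```python
def binomial_bcast_schedule(size, root=0):
    """Return a list of rounds; each round is a list of (sender, receiver) ranks."""
    rounds = []
    mask = 1
    while mask < size:
        round_pairs = []
        # the processes whose rank relative to the root is below `mask` already have the message
        for rel in range(mask):
            if rel + mask < size:
                sender = (rel + root) % size
                receiver = (rel + mask + root) % size
                round_pairs.append((sender, receiver))
        rounds.append(round_pairs)
        mask *= 2
    return rounds

for number, pairs in enumerate(binomial_bcast_schedule(8), start=1):
    print("round", number, ":", pairs)
```

With 8 processes, this gives 3 rounds instead of the 7 sequential sends of `myBcast()`, and in each round several processes send at the same time.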

#### Scatter & gather

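Scatter and gather are MPI collective communication functions that are also very useful in some circumstances. Scatter takes an array held by the root process, splits it into equal consecutive chunks and sends the i-th chunk to the process of rank i, so the length of the array must be a multiple of the number of processes: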

In [34]:
%%px

array_to_scatter = np.arange((2*size), dtype=np.intc)

if rank == 0:
    print("Original array in process 0:", array_to_scatter)

scattered_array = np.zeros((len(array_to_scatter)//size), dtype=np.intc)

# scatter the array 
# the root process is the one having the full array to scatter
comm.Scatter([array_to_scatter, MPI.INT], [scattered_array, MPI.INT], root=0)

print("I am process", rank, "and I have", scattered_array, "of the original array")

[stdout:0] 
Original array in process 0: [0 1 2 3 4 5 6 7]
I am process 0 and I have [0 1] of the original array
[stdout:1] I am process 1 and I have [2 3] of the original array
[stdout:2] I am process 2 and I have [4 5] of the original array
[stdout:3] I am process 3 and I have [6 7] of the original array


Gather does the inverse of scatter; we will use it to reverse what we did in the previous cell:

In [35]:
%%px

reconstituted_array = np.zeros((2*size), dtype=np.intc)

comm.Gather([scattered_array, MPI.INT], [reconstituted_array, MPI.INT], root=0)

print("Reconstituted array:", reconstituted_array)

[stdout:0] Reconstituted array: [0 1 2 3 4 5 6 7]
[stdout:1] Reconstituted array: [0 0 0 0 0 0 0 0]
[stdout:2] Reconstituted array: [0 0 0 0 0 0 0 0]
[stdout:3] Reconstituted array: [0 0 0 0 0 0 0 0]


As you can see, only the process passed as the root argument of `Gather()` gets the full reconstituted array. It is also worth pointing out that `Gather()` assembles the chunks in rank order: the chunk of process i is placed at position i of the result.

Let's see a concrete example of Scatter and Gather in action. We will use them to compute the mean of a big array. 

In [36]:
%%px

# the array we want the mean of
# every process generates one, but only the copy of process 0 (the root) is scattered
big_array = np.random.randint(1000, size=(2500*size), dtype=np.intc)

# the variable which will hold the different parts of the above array
scattered_array = np.zeros((len(big_array)//size), dtype=np.intc)
# scatter the big_array into scattered_array in each process
comm.Scatter([big_array, MPI.INT], [scattered_array, MPI.INT], root=0)
# each process computes the sum of its part of the array
scattered_sum = np.array([np.sum(scattered_array)], dtype=np.intc)
# the variable which will hold the sums of all the scattered arrays previously calculated
reconstituted_sum = np.zeros((size), dtype=np.intc)
# gather all scattered_sum into reconstituted_sum
comm.Gather([scattered_sum, MPI.INT], [reconstituted_sum, MPI.INT], root=0)

# calculate and test final mean
if rank == 0:
    mean = np.sum(reconstituted_sum) / len(big_array)
    mean_nump = np.mean(big_array)
    print("Mean using Gather and Scatter:", mean)
    print("Mean using Numpy function to check:", mean_nump)

[stdout:0] 
Mean using Gather and Scatter: 501.6872
Mean using Numpy function to check: 501.6872
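`Scatter()` only works when the length of the array is a multiple of the number of processes, because every process receives a chunk of the same size. For other lengths, MPI provides `Scatterv()`, which takes one count and one displacement (offset) per process. The pure-Python sketch below uses a hypothetical helper, `scatterv_layout()`, to compute one common layout (the first `length % size` processes get one extra element). It also shows that dividing the sum of the partial sums by the total length still gives the mean when the chunks differ in size, whereas averaging per-process means would not. It does not call MPI.

```python
def scatterv_layout(length, size):
    """Hypothetical helper: counts and displacements splitting `length` elements over `size` processes."""
    base, extra = divmod(length, size)
    # the first `extra` processes receive one more element than the others
    counts = [base + 1 if rank < extra else base for rank in range(size)]
    # each chunk starts where the previous one ends
    displs = [sum(counts[:rank]) for rank in range(size)]
    return counts, displs

big_list = list(range(10))   # 10 elements cannot be scattered evenly over 4 processes
counts, displs = scatterv_layout(len(big_list), 4)
# the chunk each rank would receive
chunks = [big_list[d:d + c] for c, d in zip(counts, displs)]

partial_sums = [sum(chunk) for chunk in chunks]   # computed by each process
mean = sum(partial_sums) / len(big_list)          # computed by the root after gathering

print(counts, displs)
print(chunks)
print(mean)
```

With mpi4py, such counts and displacements are typically passed as `comm.Scatterv([big_array, counts, displs, MPI.INT], recvbuf, root=0)`; please check the mpi4py documentation of your version for the exact buffer specification.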


### Computations with MPI - Reduce and Allreduce

`Reduce()` combines the buffers of all processes into a single buffer on the root process by applying a given operation, such as `MPI.SUM`, `MPI.MAX` or `MPI.PROD`, element by element. Let's look at an example.

In [37]:
%%px

# one random integer between 0 and 9 is generated per process
random_number = np.random.randint(10, size=1, dtype=np.intc)
print("The random integer for process", rank, "is:", random_number)
# an array of size 1 for holding the final sum
tot_sum = np.array([0], dtype=np.intc)
# the function reduces the random numbers of each process into one total sum as we use the sum operation as parameter
# this sum is present only in the root process
comm.Reduce([random_number, MPI.INT], [tot_sum, MPI.INT], op=MPI.SUM, root=0)

print("The sum is {}".format(tot_sum))

[stdout:0] 
The random integer for process 0 is: [5]
The sum is [24]
[stdout:1] 
The random integer for process 1 is: [9]
The sum is [0]
[stdout:2] 
The random integer for process 2 is: [4]
The sum is [0]
[stdout:3] 
The random integer for process 3 is: [6]
The sum is [0]
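`Reduce()` is not limited to buffers of one element: with buffers of several elements, the operation is applied element by element across the processes, and the result has the same length as each buffer. The pure-Python sketch below mimics what the root would obtain with `op=MPI.SUM` and `op=MPI.MAX` for four hypothetical per-process buffers (the first column reuses the random integers of the output above). It only models the semantics and does not use MPI.

```python
from functools import reduce
import operator

# hypothetical send buffers of 3 elements, one per rank
buffers = [
    [5, 1, 7],  # rank 0
    [9, 4, 0],  # rank 1
    [4, 8, 2],  # rank 2
    [6, 3, 3],  # rank 3
]

def elementwise(op):
    """Turn a binary operation on numbers into one combining two buffers position by position."""
    return lambda a, b: [op(x, y) for x, y in zip(a, b)]

reduced_sum = reduce(elementwise(operator.add), buffers)  # like op=MPI.SUM
reduced_max = reduce(elementwise(max), buffers)           # like op=MPI.MAX

print(reduced_sum)  # [24, 16, 12]
print(reduced_max)  # [9, 8, 7]
```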


Only the process passed as the root argument of `Reduce()` holds the sum; the other processes keep the initial value 0 of `tot_sum`. The variant `Allreduce()` performs the same reduction but distributes the final result across all processes.

In [38]:
%%px

# one random integer between 0 and 9 is generated per process
random_number = np.random.randint(10, size=1, dtype=np.intc)
print("The random integer for process", rank, "is:", random_number)
# array of size 1 for holding the final sum
tot_sum = np.array([0], dtype=np.intc)
# the function reduces the random numbers of each process into one total sum
# but this time, the sum is then distributed to all processes
comm.Allreduce([random_number, MPI.INT], [tot_sum, MPI.INT], op=MPI.SUM)

print("The sum is {}".format(tot_sum))

[stdout:0] 
The random integer for process 0 is: [0]
The sum is [21]
[stdout:1] 
The random integer for process 1 is: [6]
The sum is [21]
[stdout:2] 
The random integer for process 2 is: [7]
The sum is [21]
[stdout:3] 
The random integer for process 3 is: [8]
The sum is [21]
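Conceptually, `Allreduce()` gives the same result as a `Reduce()` followed by a `Bcast()`, but MPI libraries often use algorithms in which all processes work at the same time. One classic example is recursive doubling: at each round, every process exchanges its partial result with the process whose rank differs in one bit and combines both, so that after log2(size) rounds every process holds the full result. The pure-Python simulation below (a hypothetical helper, assuming a power-of-two number of processes and a commutative operation) illustrates that exchange pattern; whether your MPI library uses it is implementation-dependent.

```python
def recursive_doubling_allreduce(values, op=lambda x, y: x + y):
    """Simulate recursive doubling; values[rank] is the contribution of each process."""
    size = len(values)
    assert size & (size - 1) == 0, "this sketch assumes a power-of-two number of processes"
    partial = list(values)
    mask = 1
    while mask < size:
        # every rank exchanges with partner = rank XOR mask and combines both partial results
        partial = [op(partial[rank], partial[rank ^ mask]) for rank in range(size)]
        mask *= 2
    return partial

# the random integers of the Allreduce output above
print(recursive_doubling_allreduce([0, 6, 7, 8]))  # [21, 21, 21, 21]
```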


### A small concrete example - The random walkers

In order to consolidate part of what we have seen so far, let's look at a last (slightly more complicated) example of an MPI mini-application. This application relies heavily on blocking communication and, to a lesser extent, on the broadcast and allreduce functions.

This fun example is called the random walkers. Basically, a number of walkers are initialized and then randomly walk a number of steps; if a walk goes beyond the domain of the current process, the walker is sent to the next process, as in the ring example. The principle is the following:
* You pass the total size of the walk. This size is then divided among the processes. For example, if your full walk is 100 and you have 4 processes, each process has a domain of size 25 (100/4).
* You pass the maximum number of steps of a random walk. For example, if you pass 200, a random walker performs walks consisting of a random number of steps between 1 and 200.
* You pass the number of walkers to initialize in process 0 at step 0.
* You pass the number of random walks you want the walkers to do before the program stops.

The following picture illustrates how it works:

![title](media/picture5.png)
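The division of the full walk into domains, with the last domain absorbing any remainder when the total size is not a multiple of the number of processes (as the program further down does), can be checked with a few lines of plain Python. `domain_sizes()` is a hypothetical helper, not part of the program:

```python
def domain_sizes(full_walk_size, num_processes):
    """Size of the domain of each process; the last domain absorbs the remainder."""
    per_domain_size = full_walk_size // num_processes
    sizes = [per_domain_size] * num_processes
    # same formula as the program: full_walk_size - (size - 1) * per_domain_size
    sizes[-1] = full_walk_size - (num_processes - 1) * per_domain_size
    return sizes

print(domain_sizes(100, 4))  # [25, 25, 25, 25]
print(domain_sizes(100, 3))  # [33, 33, 34]
```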

Let's first define the Walker class, which represents a random walker:

In [39]:
%%px

class Walker():
    # constructor
    def __init__(self, pos, max_walk_size, remaining_step_next_domain):
        self.position = pos                                              # a walker has a position
        self.max_walk_size = max_walk_size                               # the max walk size it can do
        self.remaining_step_next_domain = remaining_step_next_domain     # when arriving on another domain, the remaining steps it has to do
    
    # do a random walk 
    # this function takes as argument the remaining steps before having to leave the current domain
    # it also takes a boolean "first" which indicates if it is a new random walk or the remaining steps of a current random walk
    # returns False if the walker stays on the current domain and True if it has to be sent to the next domain
    def random_walk(self, remaining_step_current_domain, first):
        
        if first: # if new random walk, generate a random number of steps
            r = random.randint(1, self.max_walk_size)
        else: # else take the remaining steps saved as attribute when the walker was sent to another domain
            r = self.remaining_step_next_domain
            
        # if the number of steps to do is smaller than the remaining domain size, just increment the current position of the walker on the current domain
        # the walk is then finished: reset the remaining steps so that later calls with first=False do not move the walker again
        # return False
        if r < remaining_step_current_domain:
            self.position += r
            self.remaining_step_next_domain = 0
            return False
        # else save the remaining steps that will have to be done on the next domain
        # return True
        else:
            self.remaining_step_next_domain = r - remaining_step_current_domain
            return True

Now, let's define the Domain class:

In [40]:
%%px

class Domain():
    
    # constructor
    def __init__(self, rank, size, first):
        self.rank = rank               # rank of the process associated to that domain
        self.size = size               # size of that domain
        self.walkers = []              # list of walkers currently on this domain
        self.departing_walkers = []    # list of walkers which have to be sent to the next domain
        self.first = first             # True only for domain 0 before the first communication: process 0 has to start the chain instead of waiting for a message
    
    # function making all the walkers currently on this domain (in the "walkers" list) walk
    def make_them_walk(self, first):
        
        for walker in self.walkers[:]:
            remaining_step = self.size - walker.position
            departing = walker.random_walk(remaining_step, first)
            # if the walker is departing to the next domain
            # it is removed from the walkers list and added to the departing_walkers list
            if departing:
                self.departing_walkers.append(walker)
                self.walkers.remove(walker)
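Before writing the MPI logic, note what these two classes imply: because `random_walk()` hands a walker to the next domain as soon as it reaches the end of the current one (the test is `r < remaining_step_current_domain`), the number of walkers sent between processes is simply the number of domain boundaries the walkers reach on the ring. The sequential model below, without MPI and with hypothetical helper names, captures that idea and can be used to sanity-check the order of magnitude of the totals printed by the program below. It draws its own random numbers, so it will not reproduce a given MPI run exactly.

```python
import random

def domain_starts(full_walk_size, num_domains):
    """Global start position of each domain (the last domain absorbs any remainder)."""
    per_domain_size = full_walk_size // num_domains
    return [d * per_domain_size for d in range(num_domains)]

def count_handoffs(start, steps, full_walk_size, starts):
    """Number of domain starts reached when walking `steps` steps from global position `start` on the ring."""
    boundaries = set(starts)
    return sum(1 for k in range(1, steps + 1) if (start + k) % full_walk_size in boundaries)

def simulate(full_walk_size, num_domains, max_walk_size, first_walkers, tot_random_walk, seed=0):
    """Sequential model: total number of times a walker is handed to the next domain."""
    rng = random.Random(seed)
    starts = domain_starts(full_walk_size, num_domains)
    positions = [0] * first_walkers   # every walker starts at global position 0, on domain 0
    num_walkers_sent = 0
    for _ in range(tot_random_walk):
        for w in range(first_walkers):
            r = rng.randint(1, max_walk_size)   # same range as in Walker.random_walk
            num_walkers_sent += count_handoffs(positions[w], r, full_walk_size, starts)
            positions[w] = (positions[w] + r) % full_walk_size
    return num_walkers_sent

print(simulate(100, 4, 40, 2, 2))
```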

Finally, let's write the program itself with the MPI logic. The basic idea is the following:
* An outer while loop makes all walkers walk the number of times passed by the user.
* For each iteration of this loop, an inner while loop checks whether there are still departing walkers, that is, walkers which have not finished their current walk.
* Inside this inner loop, each process sends its departing walkers (possibly none) to the next process. The next process receives them, makes them walk and then sends the departing ones to the next process, and so on.

In [56]:
%%px

full_walk_size = 100         # the full walk is 100
max_walk_size = 40           # a walker can, at maximum, walk a full domain plus 15 steps of the next one per walk (with 4 processes, domains have size 25)
first_walkers = 2            # 2 walkers begin at step 0 on domain 0
tot_random_walk = 2          # the number of random walks each walker must do before the program stops
verbose_output = True

# statistic for printing the total number of walkers sent for all processes at the end
num_walkers_sent = 0

# size of a domain in a process
per_domain_size = full_walk_size // size

# create the Domain objects and the first walkers
# if rank 0, the walkers and the communication have to be initialized
# between 0 and the last rank, just the domain has to be created
# the last domain may be larger, as it absorbs the remainder when full_walk_size can not be divided evenly between all the processes
if rank == 0:
    domain = Domain(rank, per_domain_size, True)
    for _ in range(0, first_walkers):
        domain.walkers.append(Walker(0, max_walk_size, 0))
elif rank > 0 and rank < size - 1:
    domain = Domain(rank, per_domain_size, False)
else:
    per_domain_size = full_walk_size - (size-1) * per_domain_size
    domain = Domain(rank, per_domain_size, False)

random_walk_counter = 0 # counter of random walk
while random_walk_counter < tot_random_walk:

    # first all walkers do a walk with the True parameter meaning that it is a brand new random walk
    domain.make_them_walk(True)

    # variable initialized on True to enter the while loop
    remaining_walkers = True
    # while there are remaining walkers which need to be sent
    while remaining_walkers:

        # origin (incoming walkers) and destination (walkers being sent) of the current domain
        origin = domain.rank - 1 if domain.rank - 1 >= 0 else size - 1
        destination = domain.rank + 1 if domain.rank + 1 != size else 0 

        # every process first receives from the previous one, except process 0 at the very first iteration:
        # process 0 has to start the chain, otherwise all processes would wait for a message that never arrives (deadlock)
        if not domain.first:
            # the process receives a message from the previous one about the number of departing walkers
            # np.intc is the NumPy type matching MPI.INT (a C int)
            num_walkers = np.array([0], dtype=np.intc)
            comm.Recv([num_walkers, MPI.INT], origin, 0)

            if verbose_output:
                print("Process", str(rank), "receives", num_walkers[0],"walker(s) from process", origin)
            # the process receives all these walkers, one by one
            for i in range(0, num_walkers[0]):
                # the process needs the remaining steps to perform for the arriving walkers
                npwalker = np.array([0], dtype=np.intc)
                comm.Recv([npwalker, MPI.INT], origin, 1+i)
                # the process creates new walkers for the arriving ones with the correct number of remaining steps to do
                domain.walkers.append(Walker(0, max_walk_size, npwalker[0]))

        # the process makes the walkers of its domain continue their current walk (the incoming ones do their remaining steps)
        # here False as these walkers are still performing their current walk
        domain.make_them_walk(False)

        # the process now checks which walkers need to be sent to the next domain
        # it sends the number of departing walkers
        num_walkers = np.array([len(domain.departing_walkers)], dtype=np.intc)
        comm.Send([num_walkers, MPI.INT], destination, 0)

        remaining_walkers = False # assumed False for now

        if verbose_output:
            print("Process", str(rank), "sends", len(domain.departing_walkers),"walker(s) to process", destination)

        # number of walkers sent incremented for the current process (statistic)
        num_walkers_sent += len(domain.departing_walkers)

        # the process sends the walkers, one by one
        for i in range(0, len(domain.departing_walkers)):
            # the process only needs to send the remaining steps of each walker
            npwalker = np.array([domain.departing_walkers[i].remaining_step_next_domain], dtype=np.intc)
            comm.Send([npwalker, MPI.INT], destination, 1+i)
            # as walkers have been sent, there are still remaining walkers
            remaining_walkers = True

        # the departing walkers have been sent, the list is cleared
        domain.departing_walkers.clear()

        # the only thing which actually matters is whether the last process has departing walkers
        # because it would mean that a full new loop has to be done again
        if remaining_walkers:
            remain = np.array([1], dtype=np.intc)
        else:
            remain = np.array([0], dtype=np.intc)

        # so the last process broadcasts to all processes whether it has sent walkers back to process 0,
        # so that no process leaves the loop before all walkers have finished their current walk
        comm.Bcast([remain, MPI.INT], root=size-1)

        # update remaining_walkers in every process based on the value broadcast by the last process
        if remain[0] == 1:
            remaining_walkers = True
        else:
            remaining_walkers = False

        # if a new loop has to be done, this time process 0 has incoming messages from the last process
        domain.first = False

    # increment the number of random walks
    random_walk_counter += 1

# we have finished all the walks
# receive the last message sent by the last process, which process 0 has not received because all processes left the loop
# it is very important to never leave messages unreceived, especially with Jupyter Lab, as they persist until the next cell execution
if rank == 0:
    npwalker = np.array([0], dtype=np.intc)
    comm.Recv([npwalker, MPI.INT], size-1, 0)

# after all random walks have been performed, we combine the number of walkers sent by each process
# into the total number of walkers sent by all processes with Allreduce
tot_num_walkers_sent = np.array([0], dtype=np.intc)
comm.Allreduce([np.array([num_walkers_sent], dtype=np.intc), MPI.INT], [tot_num_walkers_sent, MPI.INT], op=MPI.SUM)

if rank == size - 1:
    print("Total number of walkers sent during the communication:", tot_num_walkers_sent[0])
    print("Cycle terminated")

[stdout:0] 
Process 0 sends 2 walker(s) to process 1
Process 0 receives 0 walker(s) from process 3
Process 0 sends 0 walker(s) to process 1
[stdout:1] 
Process 1 receives 2 walker(s) from process 0
Process 1 sends 0 walker(s) to process 2
Process 1 receives 0 walker(s) from process 0
Process 1 sends 2 walker(s) to process 2
[stdout:2] 
Process 2 receives 0 walker(s) from process 1
Process 2 sends 0 walker(s) to process 3
Process 2 receives 2 walker(s) from process 1
Process 2 sends 0 walker(s) to process 3
[stdout:3] 
Process 3 receives 0 walker(s) from process 2
Process 3 sends 0 walker(s) to process 0
Process 3 receives 0 walker(s) from process 2
Process 3 sends 0 walker(s) to process 0
Total number of walkers sent during the communication: 4
Cycle terminated


To understand the verbose output, look at the first line of process 0: it corresponds to the first line of process 1, which itself corresponds to the first line of process 2, and so on.

**Try higher values for the number of walkers, the max walk size and the number of walks, but disable the verbose output (otherwise you will be flooded with messages), just to see the number of walkers sent during the communication**.

One last remark: as you can see at the end of the previous cell, there is a condition, only for process 0, which handles the last message sent by the last process before all processes leave the loop. **In Jupyter Lab, it is essential to handle ALL MESSAGES and never leave a message unreceived. If you comment out this condition, you should see that the program still works, but when you rerun it multiple times the output will make no sense: there is always a message which has not been received and which will be received when you rerun the cell, which creates a bug that is very difficult to fix**. This kind of problem does not occur if you run this program outside of Jupyter, simply because the state of the processes is reset between runs. In contrast, in Jupyter, the processes keep running in the background, so they keep their state between different runs.

I hope you enjoyed this MPI tutorial and that you learned a lot of new things! If you understood it well, the following notebooks, which build a complete mini-app, should be quite easy!