<hr style="border: solid 1px red; margin-bottom: 2% ">

## ARCHER2 SCIENTIFIC PYTHON COURSE

# Parallel Processing and mpi4py
<hr style="border: solid 1px red; margin-bottom: -1%; ">

## Website:  http://www.archer2.ac.uk 

## Helpdesk: support@archer2.ac.uk

<br>

<img src="../images/ukri_logo.png" style="float: center">
<br>
<img src="../images/hpe_logo.png" style="float: center">
<br>
<img src="../images/epcc_logo.png" style="float: center">
<br>
<img src="../images/archer2_logo.png" style="float: center">
<br>

<img src="../images/reusematerial.png" width=50%>

<br>
<hr class="top">

## Message Passing with Python (mpi4py)

<hr class="bot">
<br>


There are a number of message passing interfaces for python; the most widely used is `mpi4py` (v3.1.3 was the latest release when this material was written).

The source is found at
https://github.com/mpi4py/mpi4py

#### Install

mpi4py is already provided on ARCHER2 through the `cray-python` module. If you are undertaking this lesson on ARCHER2 you do not need to install it yourself.

`module load cray-python`

If you have the Anaconda distribution, it is relatively straightforward to install `mpi4py` together with an actual MPI implementation (usually OpenMPI or MPICH), which is required to do the work underneath, e.g.,

```bash
local-shell> conda install --channel mpi4py mpich mpi4py
```

Note for users who already have MPI installed (e.g., for C/Fortran): to prevent name clashes in your `PATH`, it can be useful to install `mpi4py` and the associated MPI implementation in a separate conda environment. See
http://conda.pydata.org/docs/using/envs.html

#### Using MPI in the ipython notebook

While it is possible to use `mpi4py` directly in notebook cells, each cell runs in a single (serial) python process, so the execution model is not very helpful; if interested, see the `ipyparallel` package. Here, we will use the shell escape (`!`) to run parallel examples. If you are running on ARCHER2, you cannot run the mpi4py examples on the login nodes and will need to submit your jobs to the system with a batch script.

#### Note

Python documentation for MPI tends to be (at best) terse. If you are not familiar with the concepts behind MPI, it may be worth trying to familiarise yourself in a different programming context, e.g., C or Fortran, for which materials are generally more detailed.

Readers with no prior knowledge of the Message Passing model might like to consult material available at

https://www.archer2.ac.uk/training/courses/210317-mpi/



<br>
<hr class="top">

## General

<hr class="bot">
<br>


The `mpi4py` python interface follows the MPI C++ interface (which was deprecated in MPI-2.2 and removed from the MPI standard in MPI-3.0). Formally, the MPI standard says nothing about python.

Internally, the `mpi4py` _implementation_ uses C.

Some appreciation of object-oriented programming in python is useful.

The C++ names can, annoyingly, be slightly different from the corresponding C/Fortran bindings.
The order of the words is often reversed, e.g.,

```
MPI_Buffer_attach() becomes MPI.Attach_buffer()
```

The table on C++ bindings in the MPI standard (annex) can provide a useful quick reference.

In ipython, `help(mpi4py.MPI)` is rather long, so it can be helpful to narrow the search, e.g., `help(mpi4py.MPI.Intracomm)`. However, this only works if you know what you are looking for.


<br>
<hr class="top">

## MPI Environment

<hr class="bot">
<br>


Python code using message passing should import the `mpi4py` python module, typically,

```py
from mpi4py import MPI
```

Importing the `MPI` submodule (`from mpi4py import MPI`) initialises MPI (if not already initialised), and MPI is finalised automatically when the python process exits,
so there is no analogue of calls to `MPI_Init()` and `MPI_Finalize()` in typical python code.

### `MPI.COMM_WORLD`

All communicators are python objects, and the pre-defined communicator is
`COMM_WORLD` accessed via the `MPI` module, e.g.,

```py
rank = MPI.COMM_WORLD.rank    # May also see comm.Get_rank()
size = MPI.COMM_WORLD.size    # May also see comm.Get_size()
```



### Example


In [None]:
# %load example1.py
"""Importing and using MPI.COMM_WORLD"""

import sys
from mpi4py import MPI

def main(comm):
    """Report rank and size of this communicator"""
    rank = comm.rank
    size = comm.size
    sys.stdout.write("Hello from rank {:2d} of {:2d}\n".format(rank, size))

if __name__ == "__main__":
    """Execute in MPI.COMM_WORLD"""
    main(MPI.COMM_WORLD)


In [None]:
# While one may load the examples for inspection (as above),
# cells themselves execute in serial.

# Use the shell escape to run the script via srun
!srun -n 8 python example1.py

<br>
<br>
<hr class="top">

## Point-to-Point Messages

<hr class="bot">
<br>


The interface makes significant use of optional arguments:

```py
Comm.Send(self, buf, int dest, int tag=0)
Comm.Recv(self, buf, int source=ANY_SOURCE, int tag=ANY_TAG,
          Status status=None) 
```

Note **upper case** in `Send()` and `Recv()`: these are for "buffer-like" data.

There are also distinct `send()` and `recv()` methods; see below.

There are no count or data type arguments associated with the message buffer `buf` (at first sight).


<br>

### Point-to-point messages using `numpy.ndarray` buffers

A numerical application might typically wish to communicate contiguous arrays of data:


```py
import numpy
from mpi4py import MPI

rank = MPI.COMM_WORLD.rank
sz = 1000
buf = numpy.zeros(sz, dtype = numpy.double)

if rank == 0:
    MPI.COMM_WORLD.Send([buf, sz, MPI.DOUBLE], dest = 1, tag = 99)
elif rank == 1:
    MPI.COMM_WORLD.Recv([buf, sz, MPI.DOUBLE], source = 0, tag = 99)
```

Here the count and data type are given explicitly as part of a list; they can be omitted, in which case they are inferred from the `numpy` array itself.


<br>

### Example: Blocking Send/Recv



In [None]:
# %load example3.py
"""A simple blocking Send/Recv pair"""

import sys
import numpy
from mpi4py import MPI

def main(comm, sz):
    """Send a message between ranks 0 and 1"""
    rank = comm.Get_rank()

    if rank == 0:
        msg = numpy.ones(sz, numpy.double)
        comm.Ssend([msg, sz, MPI.DOUBLE], dest = 1, tag = 999)
    elif rank == 1:
        msg = numpy.zeros(sz, numpy.double)
        comm.Recv([msg, sz, MPI.DOUBLE], source = 0, tag = 999)
        sys.stdout.write("Rank 1 received {}\n".format(msg))

if __name__ == "__main__":

    sz = 4
    main(MPI.COMM_WORLD, sz)


In [None]:
!srun -n 2 python ./example3.py


<br>
<br>
<hr class="top">

## Non-Blocking Point-to-Point Messages

<hr class="bot">
<br>


Non-blocking ("immediate") methods are of the form:

```py
Comm.Isend(self, buf, int dest, int tag=0)
Comm.Irecv(self, buf, int source=ANY_SOURCE, int tag=ANY_TAG)
```

Both these return objects of the `Request` class. 

Request objects are handled either by instance methods, e.g.,
```py
request.Wait(self, Status status=None)
```

or class methods, e.g.,
```py
Request.Waitall(type cls, requests, statuses=None)
```

<br>

### Example: Non-blocking communication

In [None]:
# %load example4.py
"""An Example of non-blocking Isend/Irecv pairs"""

import sys
import numpy
from mpi4py import MPI

def main(comm):
    """Exchange messages with 'adjoining' ranks"""
    p1 = comm.rank + 1
    m1 = comm.rank - 1
    if p1 >= comm.size: p1 = 0
    if m1 < 0: m1 = comm.size - 1

    # numpy.int was deprecated in NumPy 1.20 and removed in 1.24; use a C int
    smsg = numpy.array([comm.rank], numpy.intc)
    rmsg = numpy.zeros(2, numpy.intc)

    reqs1 = comm.Issend(smsg, p1)
    reqs2 = comm.Issend(smsg, m1)
    reqr1 = comm.Irecv(rmsg[0:1], source = p1)
    reqr2 = comm.Irecv(rmsg[1:2], source = m1)

    reqr1.Wait(); reqr2.Wait()
    sys.stdout.write("Rank {} received messages from ranks {} and {}\n"\
                         .format(comm.rank, rmsg[1], rmsg[0]))

    MPI.Request.Waitall([reqs1, reqs2])

if __name__ == "__main__":

    main(MPI.COMM_WORLD)


In [None]:
! srun -n 4 python ./example4.py

<br>
<br>
<hr class="top">

## Python objects as messages

<hr class="bot">
<br>


As python objects can be serialised (`mpi4py` uses `pickle` by default), general python objects can also be passed as messages:

```py
Comm.send(self, obj, int dest, int tag=0)
Comm.recv(self, buf=None, int source=ANY_SOURCE, int tag=ANY_TAG,
          Status status=None)  
```

Note **lower case** `send()` and `recv()`.

Serialisation and deserialisation carry an overhead in memory and time; complex/large objects may be slow.
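To get a feel for the overhead, a minimal sketch (no MPI needed) comparing the size of the pickled form of a python list, roughly what a lower-case `send()` transmits, with the raw bytes of an equivalent `numpy` array, which is what `Send()` transmits. The pickling and unpickling steps also cost CPU time on both sides, which this sketch does not measure.

```python
import pickle
import numpy

n = 1000
as_list = [float(i) for i in range(n)]           # python list of floats
as_array = numpy.arange(n, dtype=numpy.double)  # equivalent numpy array

# Bytes produced by serialising the list
list_bytes = len(pickle.dumps(as_list, protocol=pickle.HIGHEST_PROTOCOL))

# Raw buffer size of the array: 8 bytes per double, no serialisation step
array_bytes = as_array.nbytes

print("pickled list:", list_bytes, "bytes; numpy buffer:", array_bytes, "bytes")
```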

<br>

### Example: sending a list

Note that the incoming message is received as the return value:

```py
msg = comm.recv(...)
```

In [None]:
# %load example5.py
"""Message passing a python object"""

import sys
from mpi4py import MPI

def main(comm):
    """Send a list from rank 0 to rank 1"""

    if comm.rank == 0:
        msg = ["Any", "old", "thing", comm.rank, {"size" : comm.size}]
        comm.send(msg, dest = 1, tag = 999)
    elif comm.rank == 1:
        msg = comm.recv(source = 0, tag = 999)
        sys.stdout.write("Rank 1 received {}\n".format(msg))

if __name__ == "__main__":

    main(MPI.COMM_WORLD)


In [None]:
! srun -n 2 python ./example5.py

<br>
<br>
<hr class="top">

## Collective Communication

<hr class="bot">
<br>


Collective operations are implemented as methods on `Comm`, e.g.,

```py
Comm.Bcast(self, buf, int root=0)
```

Use a `numpy.ndarray`, or another contiguous buffer, to hold the actual data.

Again, note the **upper case** `Bcast()`. The distinct lower-case `bcast()` etc. are intended for python objects.


<br>

### Reductions

Reductions involve the `Op` class, e.g.,

```py
Comm.Reduce(self, sendbuf, recvbuf, Op op=SUM, int root=0)
```

Operations include `MPI.MIN`, `MPI.MAX`, `MPI.SUM` and so on.




In [None]:
import numpy
import sys
from mpi4py import MPI

def main(comm):
    """The sum of all ranks is..."""

    total_sum = numpy.array(0, 'i')
    rank = numpy.array(comm.rank, 'i')
    # 'i' is a C int, so the matching MPI datatype is MPI.INT (MPI.INTEGER is Fortran's)
    comm.Reduce([rank, MPI.INT], [total_sum, MPI.INT], op=MPI.SUM, root=0)

    if comm.rank == 0:
        sys.stdout.write("Sum of ranks: {}\n".format(total_sum))

if __name__ == "__main__":

    main(MPI.COMM_WORLD)



### Reductions with python objects


The equivalent method for python objects is:

```py
Comm.reduce(self, sendobj, op=SUM, int root=0)
```

Built-in `Op` operations rely on the relevant special method [e.g., `__add__()`] being implemented for `sendobj`.

You can create your own operations using `MPI.Op.Create()`.

Standard python types should behave "as expected".
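As a serial model only (no MPI involved, and with hypothetical per-rank values), the following sketch shows what "as expected" means: combining objects with `MPI.SUM` amounts to applying `+`, which concatenates lists but is not defined for dictionaries. It does not claim to reproduce the order in which `mpi4py` combines contributions internally.

```python
import functools
import operator

# Hypothetical contributions, as if each of 4 ranks supplied [comm.rank]
contributions = [[0], [1], [2], [3]]

# SUM on python objects relies on __add__(): for lists this concatenates
combined = functools.reduce(operator.add, contributions)
print(combined)  # [0, 1, 2, 3]

# dict does not implement __add__(), so a SUM reduction of dicts would fail
try:
    functools.reduce(operator.add, [{"a": 1}, {"b": 2}])
    dict_sum_failed = False
except TypeError:
    dict_sum_failed = True
print("dict SUM fails:", dict_sum_failed)
```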


### Example

In [None]:
# %load example6.py
"""Reducing python objects with comm.reduce()"""

import sys
from mpi4py import MPI

def main(comm):
    """The sum of a list is..."""

    # Lower-case reduce() handles python objects; SUM on lists concatenates them
    mylist = comm.reduce([comm.rank], op=MPI.SUM, root=0)

    if comm.rank == 0:
        sys.stdout.write("List of ranks: {}\n".format(mylist))


if __name__ == "__main__":

    main(MPI.COMM_WORLD)



In [None]:
! srun -n 4 python ./example6.py

<br>
<br>
<hr class="top">

## Communicators

<hr class="bot">
<br>

A communicator provides the context within which communication will occur.


The class hierarchy for communicator objects is

```
Comm
    Intracomm
        Topocomm
            Cartcomm
            Distgraphcomm
            Graphcomm
    Intercomm
```

Many methods are implemented on `Comm` and inherited by subclasses, e.g.,

```
    comm.Barrier()
```


<br>

### Example

Creating a Cartesian communicator `Cartcomm` object from an existing `Intracomm` object uses:
```
Intracomm.Create_cart(self, dims, periods=None, reorder=False)
```

and will return a new Cartesian communicator.

Methods for this object include `Get_cart_rank()`, `Get_coords()` and so on.

<br>


In [None]:
# %load example2.py
"""Creating a Cartesian Communicator"""

import sys
from mpi4py import MPI

def main(parent):
    """Create a 2-d Cartesian communicator from parent communicator"""

    dims = MPI.Compute_dims(parent.size, 2)

    comm = parent.Create_cart(dims, periods = [True, False])
    rank = comm.Get_rank()
    coords = comm.Get_coords(rank)
    upx = comm.Shift(0, 1)
    upy = comm.Shift(1, 1)

    out = "Rank{:2d} coords{:2d} {:2d} upx(src,dst) {} upy(src,dst) {}\n"\
                         .format(rank, coords[0], coords[1], upx, upy)
    sys.stdout.write(out)

if __name__ == "__main__":
    """Execute in MPI.COMM_WORLD"""
    main(MPI.COMM_WORLD)


In [None]:
!srun -n 4 python ./example2.py

<br>
<br>
<hr class="top">

## Other features

<hr class="bot">
<br>


Most standard MPI functionality is available (as of `mpi4py` version 2.0), including:

* `Comm.Abort()`, `MPI.Wtime()`, ...


* User-defined types (methods implemented in the `Datatype` class)


* Single-sided communication (see the `Win` class)


* MPI-IO (see the `File` class)


* a few MPI functions may not be available, depending on the versions of `mpi4py` and of the underlying MPI library


<br>

### Some other considerations

* With large numbers of MPI tasks, having every task `import` python modules (each import reads many files from the file system) can take a long time and slow down the start-up of the parallel program.


* Python code can call code written in another language (e.g., C or Fortran)
    - a communicator and other relevant data can typically be passed
    - convenient access to C `MPI_Comm` handles requires `mpi4py` 2.0 or later.

<br>





In [None]:
from mpi4py import MPI
help(MPI)

<br>
<br>
<hr class="top">

## ARCHER2 Exercise: Compare `Send()` and `send()`

<hr class="bot">
<br>


#### Logging in

With your guest account:
```bash
local-shell> ssh -X username@login.archer2.ac.uk
```


<br>

#### The `Send()` example `exercise1.py`

This python script measures the time taken to pass a message back and forth between two MPI tasks. If we know the amount of data transferred, and the time taken, we can work out the bandwidth in bytes/second. In the
limit of zero message size, this time is the latency - the time taken to transfer a message at all.
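As a worked example of the arithmetic, using one common convention: if `nreps` round trips of an `nbytes` message take `elapsed` seconds in total, each one-way transfer takes `elapsed / (2 * nreps)` seconds. The helper name and the numbers below are hypothetical, not ARCHER2 measurements, and `exercise1.py` may report its results differently:

```python
def pingpong_bandwidth(nbytes, nreps, elapsed):
    """Bandwidth in bytes/second from a timed ping-pong (hypothetical helper)."""
    one_way_time = elapsed / (2 * nreps)  # each round trip is two one-way messages
    return nbytes / one_way_time

# Hypothetical timing: 100 round trips of an 8 MB message taking 0.2 s in total
bw = pingpong_bandwidth(nbytes=8_000_000, nreps=100, elapsed=0.2)
print("Bandwidth: {:.2e} bytes/s".format(bw))
```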

Have a look at the script `exercise1.py` and check exactly what the `Send()` - actually `Ssend()` - and `Recv()` are doing.


<br>

#### Running the Send() version

Compute jobs on ARCHER2 must be submitted to the queue system. A submission script is provided. This should look like:

```bash
#!/bin/bash --login

#SBATCH --job-name mpi4py
#SBATCH --nodes=1              # Request one node
#SBATCH --tasks-per-node=2     # Request two tasks per node
#SBATCH --cpus-per-task=1      # Request one cpu per task
#SBATCH --time=00:20:00

#SBATCH --account=ta054
#SBATCH --partition=standard
#SBATCH --qos=short

module load cray-python

# Run with two MPI tasks 
srun python exercise1.py
```

Note that the `cray-python` module is loaded to provide access to python (and `mpi4py`) on the compute nodes. For further information see

https://docs.archer2.ac.uk/user-guide/python/

<br>

#### Submitting the script

The `bash` script (not the python script) is submitted to the queue system using
```bash
guest123@login001:> sbatch <script-name>
```

Run the example script and check the output; this should report the time taken to exchange messages of a given size between two MPI tasks.

<br>

#### Change the placement of the MPI tasks (optional)

Try altering the submission script so that the two MPI tasks each run on a different node. The altered script should include the changes:
```bash
#SBATCH --nodes=2              # Request two nodes
#SBATCH --tasks-per-node=1     # Request one task per node
...
srun python ./exercise1.py
```

Can you see if there is any difference in the reported results?


<br>

#### Exercise

Rewrite the python example so that it uses `send()` and `recv()` rather than `Send()` and `Recv()`.

Instead of using a `numpy.ndarray` object for the message data, try using a standard python list containing the required number of data items.

What influence does this have on the measured bandwidth?
<br>

<br>
<hr class="top">

## Summary

<hr class="bot">
<br>

* `mpi4py` can be used to write parallel python programs


* Can be used to interact with code in other languages by passing a suitable communicator
  
<br>

* Useful links

  * https://mpi4py.readthedocs.io/en/stable/

 <br>
<hr class="top">
<hr class="bot">
<br>