# Lecture 7: MPI communication

## NOTES

* For these practicals we will be using a different `conda environment`. When opening a notebook or a terminal make sure you are using the **CuPy Kernel**!!!
* MPI code must typically be invoked from the command line and not from a notebook. However, for these lectures I will be using a workaround. Both approaches are explained in the first MPI practical.
* mpi4py documentation [here](https://mpi4py.readthedocs.io/en/stable/index.html).

## Two important mpi4py-specific notes

**1) All communication functions are member functions of an MPI communicator.**

To see the complete list of available functions, read the Communicator class API [here](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm).

Example:

```python
    from mpi4py import MPI
    comm = MPI.COMM_WORLD

    comm.barrier() # Collective communication: waits for all ranks to call this. Very useful for synchronization.
```

**2) Lower case functions are for Python objects. Upper case functions are for data buffers (e.g., arrays).**

**Note:** Buffer functions are much faster since they pass the underlying memory directly to MPI, whereas lower case functions must first serialise the object with Python [pickle](https://docs.python.org/3/library/pickle.html). However, buffer functions are slightly harder to use.

Examples: `MPI.COMM_WORLD.send` vs `MPI.COMM_WORLD.Send`. See more below.
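
As a rough illustration of the difference (plain Python, no MPI call is made), the sketch below contrasts the two paths: pickling an object into bytes and back, which is conceptually what the lower case functions do, versus exposing the raw memory of a typed array, which is what the upper case functions work with. The standard `array` module stands in for a numpy array here, and all variable names are made up for the example.

```python
import pickle
from array import array

# Object path: conceptually what a lower case call such as comm.send relies on.
obj = {'a': 7, 'b': 3.14}
payload = pickle.dumps(obj)        # serialise to bytes on the sending side
restored = pickle.loads(payload)   # rebuild the object on the receiving side

# Buffer path: a typed array already exposes contiguous raw memory.
values = array('d', [0.0, 1.0, 2.0, 3.0])
raw = memoryview(values)           # a view of that memory: no copy, no pickling

print(restored == obj)             # the round trip preserves the object
print(raw.nbytes)                  # number of raw bytes behind the array
```

The extra serialise and rebuild steps on the object path are where the additional cost comes from.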

## Point-to-point communication

Point-to-point means from one rank to one rank.

The basic mpi4py functions for point-to-point communication are:

* For sending: `send` (object version), `Send` (buffer version).
* For receiving: `recv` (object version), `Recv` (buffer version).

**Note:** You should use matching calls (e.g., object send with object receive).

Their typical syntax is:

```python
    # Object version
    comm.send(object_to_send, dest=destination_rank, tag=tag)
    sent_object = comm.recv(source=source_rank, tag=tag) # use keywords: the first positional argument of recv is not the source

    # Buffer version (faster, typically for numpy arrays)
    comm.Send(buffer_to_send, dest=destination_rank, tag=tag)
    comm.Recv(destination_buffer, source=source_rank, tag=tag) # destination_buffer will be overwritten
```

* `destination_rank` is the rank the message is sent to.
* `source_rank` is the rank the message is coming from.
* `buffer_to_send` and `destination_buffer` are numpy arrays. The former is the message to send. The latter is the destination buffer which will be overwritten. **Note:** The two buffers must have the same data type, and the destination buffer must be large enough to hold the whole message (in practice, give both the same size)!
* `sent_object` is where the message is stored after reception in the object version.
* `tag` is an optional integer which identifies the specific communication. It is recommended to use it if you are sending/receiving multiple messages so that processes can distinguish which message is being sent/received. This is more useful for non-blocking communication, but I am providing it for completeness.
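
To make the role of `source` and `tag` concrete, here is a toy model in plain Python (not MPI, and not how mpi4py is implemented): a list plays the role of the messages waiting at the receiving rank, and a receive only completes when both the source and the tag match. The helper names `toy_send` and `toy_recv` are hypothetical.

```python
# Toy model only: messages waiting to be received by one rank.
mailbox = []

def toy_send(payload, source, tag):
    """Queue a message together with the rank it came from and its tag."""
    mailbox.append((source, tag, payload))

def toy_recv(source, tag):
    """Return the oldest queued message whose source and tag both match."""
    for i, (src, t, payload) in enumerate(mailbox):
        if src == source and t == tag:
            del mailbox[i]
            return payload
    raise LookupError("no matching message: a real blocking recv would wait here")

# "Rank 0" sends two different messages, told apart by their tags.
toy_send("positions", source=0, tag=11)
toy_send("velocities", source=0, tag=12)

# The receiver can ask for each one by tag, in whichever order it needs them.
velocities = toy_recv(source=0, tag=12)
positions = toy_recv(source=0, tag=11)
print(positions, velocities)

# Asking for a message that nobody sent never finds a match.
try:
    toy_recv(source=1, tag=11)
    matched = True
except LookupError:
    matched = False
print("matched:", matched)
```

The last call shows why a wrong source or tag is dangerous: in this toy model it raises an exception, whereas a real blocking `recv` would simply keep waiting.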

### Examples

**Sending/receiving objects**

In [1]:
%%writefile temp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# We can use if statements to make different ranks do different things!

# From rank 0 to rank 1
if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1, tag=11)
    print("Rank %d." % rank, "Data being sent:", data, flush=True)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
    print("Rank %d." % rank, "Received data:  ", data, flush=True)

# The other way around
if rank == 1:
    data['c'] = 47
    comm.send(data, dest=0, tag=12)
    print("Rank %d." % rank, "Data being sent:", data, flush=True)
elif rank == 0:
    data = comm.recv(source=1, tag=12)
    print("Rank %d." % rank, "Received data:  ", data, flush=True)


Overwriting temp.py


In [2]:
!mpiexec -n 2 python3 temp.py

Rank 0. Data being sent: {'a': 7, 'b': 3.14}
Rank 1. Received data:   {'a': 7, 'b': 3.14}
Rank 1. Data being sent: {'a': 7, 'b': 3.14, 'c': 47}
Rank 0. Received data:   {'a': 7, 'b': 3.14, 'c': 47}


In [3]:
%%writefile temp.py
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = numpy.arange(4, dtype=numpy.int64)
    comm.Send(data, dest=1, tag=13)
    print("Rank %d." % rank, "Array being sent:", data, flush=True)
elif rank == 1:
    data = numpy.empty(4, dtype=numpy.int64) # IMPORTANT: must be the same data type!
    comm.Recv(data, source=0, tag=13)
    print("Rank %d." % rank, "Received array:  ", data, flush=True)


Overwriting temp.py


In [4]:
!mpiexec -n 2 python3 temp.py

Rank 0. Array being sent: [0 1 2 3]
Rank 1. Received array:   [0 1 2 3]


### Common mistakes

What is wrong with the following code snippets?

**Snippet 1**

In [5]:
%%writefile temp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# We can use if statements to make different ranks do different things!

# From rank 0 to rank 1
if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)

print("Rank %d." % rank, "I have this datum:", data, flush=True)

Overwriting temp.py


In [6]:
#!mpiexec -n 3 python3 temp.py

`data` was never defined on Rank 2! This is an easy fix:

In [7]:
%%writefile temp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# We can use if statements to make different ranks do different things!

data = None # Now data is a well-defined variable on all ranks!

# From rank 0 to rank 1
if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)

print("Rank %d." % rank, "I have this datum:", data, flush=True)

Overwriting temp.py


In [8]:
!mpiexec -n 3 python3 temp.py

Rank 2. I have this datum: None
Rank 0. I have this datum: {'a': 7, 'b': 3.14}
Rank 1. I have this datum: {'a': 7, 'b': 3.14}


**Snippet 2 - DEADLOCK**

In [9]:
%%writefile temp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# We can use if statements to make different ranks do different things!

data = None # Now data is a well-defined variable on all ranks!

# From rank 0 to rank 1
if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=1, tag=11) # WRONG SOURCE! It will wait forever! 

print("Rank %d." % rank, "I have this datum:", data, flush=True)

Overwriting temp.py


In [10]:
#!mpiexec -n 2 python3 temp.py

**Snippet 3 - Wrong data type**

In [11]:
%%writefile temp.py
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = numpy.arange(4, dtype=numpy.int32)
    comm.Send(data, dest=1, tag=13)
    print("Rank %d." % rank, "Array being sent:", data, flush=True)
elif rank == 1:
    # Super wrong! No error will be thrown, so this is a bug that is hard to catch!!
    data = numpy.empty(4, dtype=numpy.int64) # WRONG! Sending to a larger buffer. The 32-bit integers will be interpreted as 64-bit integers.
    comm.Recv(data, source=0, tag=13)
    print("Rank %d." % rank, "Received array:  ", data, flush=True)

if rank == 0:
    data = numpy.arange(4, dtype=numpy.int64)
    comm.Send(data, dest=1, tag=13)
    print("Rank %d." % rank, "Array being sent:", data, flush=True)
elif rank == 1:
    data = numpy.empty(4, dtype=numpy.int32) # WRONG! Sending to a smaller buffer. This throws an error luckily!
    comm.Recv(data, source=0, tag=13)
    print("Rank %d." % rank, "Received array:  ", data, flush=True)

Overwriting temp.py


In [12]:
#!mpiexec -n 2 python3 temp.py
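
The silent failure in the first half of Snippet 3 can be mimicked without MPI. The sketch below uses the standard `struct` module (an assumption made for illustration; the snippet itself uses numpy) to pack four 32-bit integers and then read the same 16 bytes back as 64-bit integers, which is roughly what the receiving side ends up doing. Little-endian byte order is forced with `<` so the result does not depend on the machine.

```python
import struct

# Four 32-bit integers, like numpy.arange(4, dtype=numpy.int32): 16 bytes.
message = struct.pack('<4i', 0, 1, 2, 3)

# Reading the same bytes as 64-bit integers yields only two values,
# each one built from a pair of neighbouring 32-bit integers.
misread = struct.unpack('<2q', message)

print(len(message))  # 16
print(misread)       # (4294967296, 12884901890)
```

None of the original values 0, 1, 2, 3 survive. In the real run only part of the receive buffer would typically be filled, and the remaining entries of the `numpy.empty` array keep whatever they happened to contain.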

**Snippet 4 - Writing to the same memory/file**

You really do not want multiple ranks writing to the same memory chunk and/or file at the same time since this will cause undefined behaviour (at the very least you won't know what is being written at the end).

In interpreted languages it is hard to write to the same memory chunk from multiple ranks, so you are safe from this behaviour unless you call a wrapped function written in a compiled language. However, nothing stops you from writing to the same file:

```python
# DO NOT RUN THIS
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# EXTREMELY BAD IDEA!!!!
with open("temp.txt", "w") as f:
    f.write("Rank %d has written!" % rank)
```

If you really want to do this, check out the mpi4py [MPI I/O docs](https://mpi4py.readthedocs.io/en/stable/tutorial.html#input-output-mpi-io), the [h5py](https://www.h5py.org/) software, or the [ADIOS2](https://adios2.readthedocs.io/en/latest/) library.
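
A simpler alternative, not covered above and only suitable when separate output files are acceptable, is to let every rank write to its own file. The sketch below is an illustration under stated assumptions: the helper `write_rank_file` and the file naming scheme are hypothetical, and the ranks are simulated with a loop so that it runs without MPI. In a real program each rank would call the helper once with `rank = comm.Get_rank()`.

```python
import os
import tempfile

def write_rank_file(rank, directory):
    """Write one rank's output to a file that no other rank touches."""
    path = os.path.join(directory, "temp_rank%d.txt" % rank)  # hypothetical naming scheme
    with open(path, "w") as f:
        f.write("Rank %d has written!\n" % rank)
    return path

# Simulate three ranks; a scratch directory keeps the example self-contained.
outdir = tempfile.mkdtemp()
paths = [write_rank_file(rank, outdir) for rank in range(3)]

for path in paths:
    with open(path) as f:
        print(os.path.basename(path), "->", f.read().strip())
```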

## Collective communication - <span style="color:red">Back to the slides</span>.


## Collective communication examples

### One-to-all: Broadcast functions

**Object broadcast**

In [13]:
%%writefile temp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = None
if rank == 0:
    data = {'ciao' : 'bello'}

data = comm.bcast(data, root=0)
print("Rank %d" % rank, "Data = ", data, flush=True)

Overwriting temp.py


In [14]:
!mpiexec -n 3 python3 temp.py

Rank 0 Data =  {'ciao': 'bello'}
Rank 2 Data =  {'ciao': 'bello'}
Rank 1 Data =  {'ciao': 'bello'}


**Buffer broadcast**

In [15]:
%%writefile temp.py
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(10, dtype=np.int64)
else:
    data = np.empty(10, dtype=np.int64)

comm.Bcast(data, root=0)

print("Rank %d" % rank, "Data = ", data, flush=True)

Overwriting temp.py


In [16]:
!mpiexec -n 3 python3 temp.py

Rank 0 Data =  [0 1 2 3 4 5 6 7 8 9]
Rank 1 Data =  [0 1 2 3 4 5 6 7 8 9]
Rank 2 Data =  [0 1 2 3 4 5 6 7 8 9]


### One-to-all: Scatter functions

**Object scatter**

In [17]:
%%writefile temp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = None
if rank == 0:
    data = [(i+1)**2 for i in range(size)]

# object version
data = comm.scatter(data, root=0) # root is the rank that sends the data

assert data == (rank+1)**2 # checking that communication was done correctly

Overwriting temp.py


In [18]:
!mpiexec -n 3 python3 temp.py

**Vector/buffer scatter**

In [19]:
%%writefile temp.py
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    sendbuf = np.empty([size, 10], dtype='i') # The array being sent has shape MPI_size-by-10
    # initialising the array to be sent
    for i in range(10):
        sendbuf[:,i] = np.arange(size)

# The array being received has length 10.
recvbuf = np.empty(10, dtype='i') # need to define this on all ranks, even the sending rank!

# Vector version with equal chunks being sent
comm.Scatter(sendbuf, recvbuf, root = 0) # root is the rank that sends the data

# checking that the communication was done correctly
assert np.allclose(recvbuf, rank) # recvbuf will be a vector of values [rank, rank, rank, ....]

# Note: arrays are always split along the first axis, so the first dimension must be equal to MPI_size.
# Note: There is also a Scatterv function which allows sending chunks of different sizes to different ranks.
#       It is slightly more complicated to use.

Overwriting temp.py


In [20]:
!mpiexec -n 3 python3 temp.py
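
The extra complication of `Scatterv` is mostly bookkeeping: the root has to state how many elements each rank gets and where each chunk starts in the send buffer. The sketch below (plain Python, no MPI call is made; the helper name `counts_and_displs` is hypothetical) computes those two lists for a near-even split and applies them to a list to show which elements each rank would receive.

```python
from itertools import accumulate

def counts_and_displs(n_items, size):
    """Split n_items over size ranks as evenly as possible.

    Returns (counts, displs): counts[r] is the number of items for rank r,
    displs[r] is the index in the send buffer where rank r's chunk starts.
    """
    base, extra = divmod(n_items, size)
    counts = [base + 1 if r < extra else base for r in range(size)]
    displs = [0] + list(accumulate(counts))[:-1]
    return counts, displs

data = list(range(10))   # made-up send buffer with 10 items
counts, displs = counts_and_displs(len(data), size=3)
chunks = [data[d:d + c] for c, d in zip(counts, displs)]

print(counts)   # [4, 3, 3]
print(displs)   # [0, 4, 7]
print(chunks)   # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

In mpi4py, lists like these are handed to `Scatterv` together with the send buffer; the documentation linked at the top of these notes describes the exact call.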

### All-to-one: Gather

**Object version**

In [21]:
%%writefile temp.py
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = rank**2
data = comm.gather(data, root=0)
if rank == 0:
    for i in range(size):
        assert data[i] == i**2
else:
    assert data is None

Overwriting temp.py


In [22]:
!mpiexec -n 3 python3 temp.py

**Buffer/numpy version**

In [23]:
%%writefile temp.py
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = rank*np.ones(10, dtype=np.int64)
recvbuf = None

if rank == 0:
    recvbuf = np.empty([size, 10], dtype=np.int64)

comm.Gather(sendbuf, recvbuf, root=0)

if rank == 0:
    assert np.allclose(recvbuf.T, np.arange(size))

Overwriting temp.py


In [24]:
!mpiexec -n 3 python3 temp.py

### All-to-one: Reduce

**Available reduce operations**

```python
from mpi4py import MPI

MPI.MAX # maximum
MPI.MIN # minimum
MPI.SUM # sum
MPI.PROD # product
MPI.LAND # logical and
MPI.BAND # bit-wise and
MPI.LOR # logical or
MPI.BOR # bit-wise or
MPI.LXOR # logical xor
MPI.BXOR # bit-wise xor
MPI.MAXLOC # max value and its location
MPI.MINLOC # min value and its location
```
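
To see what some of these operations compute, the reduction can be imitated in plain Python by combining the per-rank values pairwise (no MPI call is made; the list `per_rank` is made-up data standing for the value held by ranks 0 to 3).

```python
from functools import reduce
import operator

per_rank = [3, 4, 5, 6]   # value held by rank 0, 1, 2, 3

total = reduce(operator.add, per_rank)     # what MPI.SUM would give
product = reduce(operator.mul, per_rank)   # what MPI.PROD would give
largest = reduce(max, per_rank)            # what MPI.MAX would give
bit_or = reduce(operator.or_, per_rank)    # what MPI.BOR would give

# MAXLOC-style result: the maximum together with the rank that holds it.
max_rank, max_value = max(enumerate(per_rank), key=lambda pair: pair[1])

print(total, product, largest, bit_or)  # 18 360 6 7
print(max_value, max_rank)              # 6 3
```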

**Custom reduce operation (ADVANCED and OPTIONAL. Presented for completeness)**

In [25]:
%%writefile temp.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# create numpy arrays to reduce
src = (rank+3)*np.arange(1,6)
dst = np.zeros_like(src)

print("Rank %d. Initial array:" % rank, src, flush=True)

def myadd(xmem, ymem, dt):
    # here xmem and ymem are modifiable memory buffers.
    # we can treat them like numpy arrays as follows
    x = np.frombuffer(xmem, dtype=src.dtype)
    y = np.frombuffer(ymem, dtype=src.dtype)
    # if we modify what's inside x and y, then xmem and ymem
    # will also be modified.

    z = x + y

    print("Rank %d. Reduction: %s + %s = %s" % (rank, x, y, z), flush=True)

    # ymem is the output of the reduction.
    # We must assign through y[:] so that the result is written into ymem.
    # A plain `y = z` would only rebind the local name y and leave ymem unchanged.
    y[:] = z

op = MPI.Op.Create(myadd, commute=True)

comm.Reduce(src, dst, op=op, root=0)

if rank == 0:
    print("\nANSWER:\n %s" % dst, flush=True)

Overwriting temp.py


In [26]:
!mpiexec -n 4 python3 temp.py

Rank 3. Initial array: [ 6 12 18 24 30]
Rank 0. Initial array: [ 3  6  9 12 15]
Rank 1. Initial array: [ 4  8 12 16 20]
Rank 0. Reduction: [ 4  8 12 16 20] + [ 3  6  9 12 15] = [ 7 14 21 28 35]
Rank 2. Initial array: [ 5 10 15 20 25]
Rank 2. Reduction: [ 6 12 18 24 30] + [ 5 10 15 20 25] = [11 22 33 44 55]
Rank 0. Reduction: [11 22 33 44 55] + [ 7 14 21 28 35] = [18 36 54 72 90]

ANSWER:
 [18 36 54 72 90]


**Object version**

In [27]:
%%writefile temp.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = rank

# need to use pre-built MPI operation (see above for list of available ops)
ranksum = None
ranksum = comm.reduce(data, op=MPI.SUM, root=0)

if rank == 0:
    assert ranksum == (size*(size-1))//2

Overwriting temp.py


In [28]:
!mpiexec -n 4 python3 temp.py

**Buffer/numpy version**

In [29]:
%%writefile temp.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = np.arange(3, dtype=np.int64)
recvbuf = None

if rank == 0:
    recvbuf = np.empty(3, dtype=np.int64)
    
comm.Reduce(sendbuf, recvbuf, op=MPI.PROD, root=0)

if rank == 0:
    assert np.allclose(recvbuf, np.arange(3)**size)
else:
    assert recvbuf is None

Overwriting temp.py


In [30]:
!mpiexec -n 4 python3 temp.py

### All-to-all: Allgather

In [31]:
%%writefile temp.py
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Showing only the object version. Buffer version is the same.
# This is exactly the same as gather, but now all ranks will have the result
# Therefore, there is no need to specify a root.
data = rank**2
data = comm.allgather(data)
for i in range(size):
    assert data[i] == i**2

Overwriting temp.py


In [32]:
!mpiexec -n 3 python3 temp.py

### All-to-all: Allreduce

In [33]:
%%writefile temp.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = rank

# Showing only the object version. Buffer version is the same.
# This is exactly the same as reduce, but now all ranks will have the result
# Therefore, there is no need to specify a root.
ranksum = None
ranksum = comm.allreduce(data, op=MPI.SUM)

assert ranksum == (size*(size-1))//2

Overwriting temp.py


In [34]:
!mpiexec -n 3 python3 temp.py

### All-to-all: Alltoall

In [35]:
%%writefile temp.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

A = np.arange(3*size).reshape((size, 3))

a = A[rank,:]

if rank == 0:
    print("Initial matrix:\n", A, "\n", flush=True)

B = comm.alltoall(A) # row j of A is sent to rank j; B is the list of rows this rank received, one per sender
B = np.vstack(B)

print("Rank %d\n" % rank, "received matrix =\n", B, flush=True)

Overwriting temp.py


In [36]:
!mpiexec -n 4 python3 temp.py

Initial matrix:
 [[ 0  1  2]
 [ 3  4  5]
 [ 6  7  8]
 [ 9 10 11]] 

Rank 0
 received matrix =
 [[0 1 2]
 [0 1 2]
 [0 1 2]
 [0 1 2]]
Rank 1
 received matrix =
 [[3 4 5]
 [3 4 5]
 [3 4 5]
 [3 4 5]]
Rank 2
 received matrix =
 [[6 7 8]
 [6 7 8]
 [6 7 8]
 [6 7 8]]
Rank 3
 received matrix =
 [[ 9 10 11]
 [ 9 10 11]
 [ 9 10 11]
 [ 9 10 11]]
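
In the run above every rank starts from the same matrix `A`, which hides the general pattern: `alltoall` sends item `j` of each rank's sequence to rank `j`, so the result is a transpose across ranks. The plain-Python sketch below (no MPI call is made; the string labels are made up) lets each rank send distinct items so the pattern is visible.

```python
size = 3

# send[i][j] is the item that rank i addresses to rank j.
send = [["%d->%d" % (i, j) for j in range(size)] for i in range(size)]

# After alltoall, rank j holds everything addressed to it, ordered by sender.
recv = [[send[i][j] for i in range(size)] for j in range(size)]

for rank in range(size):
    print("Rank %d sends %s and receives %s" % (rank, send[rank], recv[rank]))
```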


### A common collective communication mistake causing Deadlock

Keep in mind that all collective communication will require synchronization between all ranks (a bit like a barrier). If a rank never makes it for whatever reason, deadlock will occur!

In [37]:
%%writefile temp.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = rank

# This is the same as the allreduce example.
# However, say we do some computation
# before calling allreduce that
# causes an error only on a single rank:
if rank == 1:
    raise RuntimeError("Rank 1 got it wrong!")

# Since rank 1 stops executing upon the error,
# it never reaches the allreduce call below, so
# the other ranks wait for it forever: deadlock!
ranksum = None
ranksum = comm.allreduce(data, op=MPI.SUM)

# Note that it may even happen that the error
# message never makes it to stdout so that
# you may not even realise an error has occurred!

assert ranksum == (size*(size-1))//2

Overwriting temp.py


In [38]:
#!mpiexec -n 3 python3 temp.py # commenting this out since it will cause deadlock
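
The same failure mode can be reproduced safely with threads from the standard library (an analogy, not MPI): a `threading.Barrier` plays the role of the collective call, and the thread standing in for rank 1 raises before reaching it. Real MPI collectives have no timeout; one is added here purely so that the demonstration terminates.

```python
import threading

N_RANKS = 3
barrier = threading.Barrier(N_RANKS)   # stands in for the collective call
outcome = {}                           # what happened on each simulated rank

def worker(rank):
    try:
        if rank == 1:
            raise RuntimeError("Rank 1 got it wrong!")
        # Every "rank" must arrive here before any of them may continue.
        barrier.wait(timeout=0.5)      # the timeout exists only for this demo
        outcome[rank] = "passed"
    except threading.BrokenBarrierError:
        outcome[rank] = "stuck (would wait forever in MPI)"
    except RuntimeError as err:
        outcome[rank] = "crashed: %s" % err

threads = [threading.Thread(target=worker, args=(r,)) for r in range(N_RANKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

for rank in sorted(outcome):
    print("Rank %d:" % rank, outcome[rank])
```

The two healthy "ranks" never get past the barrier because the third one never arrives, which is exactly the situation of the ranks waiting in `allreduce`.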