# MPI4Py - Message Passing Interface for Python

## Motivation

In [None]:
%%writefile mpi4py/fstmpi.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD # world communicator
rank = comm.Get_rank() # process rank
sndbuf = np.array([rank], dtype=np.intc) # send buffer (C int, matches MPI.INT)
rcvbuf = np.empty_like(sndbuf) # receive buffer

comm.Reduce([sndbuf, 1, MPI.INT], [rcvbuf, 1, MPI.INT], op=MPI.SUM, root=0) # sum reduction on process 0 (root)

print(f'Process {rank}: Sending {rank} to 0.')
if rank == 0:
    print(f'Root: Sum of ranks is {rcvbuf}')

In [None]:
!mpirun --oversubscribe --np 8 python3 mpi4py/fstmpi.py

## The Basics

#### Processes & Communicators
MPI programs follow the __SPMD__ (Single program, multiple data) programming paradigm, where each process runs the same executable but potentially with different data. In the pure (single-threaded) MPI model, each core on a node can execute an __MPI process__, even though there are most certainly multiple cores in each physical processor and perhaps even multiple processors on each node. These processes communicate via explicit __message passing__ over a __transparent network__, such that in general the programmer need not care whether the communicating processes are located on the same processors or distributed over several nodes.
MPI processes are organized in __logical sets__ that define which processes are allowed to communicate with each other. Such a set of processes is known as a __communicator__. One special communicator that contains all processes is created at the start of an MPI program; this communicator is called __MPI.COMM_WORLD__.

#### Pickling
__Pickling__ refers to the serialization and deserialization of Python objects, which is commonly done with the help of the `pickle` module. Before objects can be stored or transferred via network, they need to be converted into a byte stream that preserves the objects' structure. The inverse process converts this byte stream back into an object that is equivalent to the original. __MPI4Py__ provides pickle-based communication of generic Python objects as well as direct array data communication of buffer-provider objects, such as NumPy arrays. Communication functions with all-lowercase names are meant for generic pickled objects, while those starting with an __upper-case__ letter are used for __buffered objects__.
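
The round trip described above can also be tried in memory, without a file or a network. The following is a minimal sketch that uses only the standard `pickle` module; the dictionary `original` is a made-up payload for illustration and is not part of the examples in this notebook.

```python
import pickle

# made-up payload, built only from built-in types
original = {'msg': 'bottles of beer on the wall', 'counts': [4, 20, 19]}

stream = pickle.dumps(original)  # serialize the object into a byte stream
restored = pickle.loads(stream)  # rebuild an object from the byte stream

print(type(stream).__name__)     # the stream is a plain bytes object
print(restored == original)      # same content as the original
print(restored is original)      # but a separate copy, not the same object
```

The restored object compares equal to the original but lives at a different place in memory, which is also what a receiving process would end up with.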

In [None]:
%%writefile Bob.py

class Bob:
    def __init__(self, msg):
        self.msg = msg

    def report(self, a, b, c):
        return f'{a*b + c} {self.msg}'

In [None]:
import pickle
from Bob import *


bobj = Bob('bottles of beer on the wall') # creates a Bob object
print(bobj.report(4,20,19)) # calls Bob function

with open('bobfile.pkl', 'wb') as picklefile: # creates a pickle file (write binary mode)
    pickle.dump(bobj, picklefile) # pickles the Bob object

In [None]:
import pickle


with open('bobfile.pkl', 'rb') as file: # read the pickle file (read binary mode)
    bobj = pickle.load(file) # unpickles the Bob object

print(bobj.report(4,21,16)) # calls Bob function

In [None]:
%%writefile picklempi.py

import numpy as np
from mpi4py import MPI
import pickle


comm = MPI.COMM_WORLD # world communicator
rank = comm.Get_rank() # process rank
size = comm.Get_size() # communicator size

if rank == 0:
    with open('bobfile.pkl', 'rb') as file: # read the pickle file (read binary mode)
        bobj = pickle.load(file) # unpickles the Bob object
else:
    bobj = None

bobj = comm.bcast(bobj, root=0) # root broadcasts the Bob object
print(bobj.report(size,size,rank)) # calls Bob function

In [None]:
!mpirun --report-bindings --bind-to core --np 8 python3 picklempi.py

## Point-to-Point Communication

The simplest method to communicate with MPI is __point-to-point communication__ between two specific processes, a sender and a receiver. Both processes actively participate in this form of communication, where the sender must execute some send function while the receiver executes some receive function. Furthermore, both processes must have the following information: the communication partner (i.e. source or destination) and the tag that identifies the message.
MPI is equipped with two flavors of point-to-point communication: blocking and non-blocking.
With blocking communication, the processes wait until the communication has reached a certain state before they continue to process the data, while processes engaging in non-blocking communication continue immediately and require the programmer to check whether it is safe to process the data.

| SENDING | Blocking | Nonblocking |
|---|---|---|
| Synchronous | `Ssend` | `Issend` |
| Buffered  | `Bsend` | `Ibsend` |
| Standard  | `Send` | `Isend` |
| Ready  | `Rsend` | `Irsend` |

| RECEIVING | Blocking | Nonblocking |
|---|---|---|
| Standard  | `Recv` | `Irecv` |

| COMPLETING | Blocking | Nonblocking |
|---|---|---|
| Standard | `Wait` | `Test` |
| Any  | `Waitany` | `Testany` |
| Some  | `Waitsome` | `Testsome` |
| All  | `Waitall` | `Testall` |

### Blocking Point-to-Point Communication

Blocking send or receive functions cause the executing process to suspend until the message buffer is safe to use. After a blocking send, the process only continues when the data to be sent have been copied from the send buffer; however, this does not mean that the data have been received. In the case of a blocking receive, completion implies that the data have been copied to the receive buffer and are safe to use.

In [None]:
%%writefile mpi4py/demo-bp2p.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]

if rank == 0:
    sndbuf = np.array(data) # send buffer
    print(f'Process {rank} sends {sndbuf}')
    comm.Send(sndbuf, dest=1) # standard blocking send
elif rank == 1:
    rcvbuf = np.empty_like(data) # receive buffer
    comm.Recv(rcvbuf, source=0) # blocking receive
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --bind-to core --np 2 python3 mpi4py/demo-bp2p.py

#### Communication Modes

For blocking point-to-point communication, the MPI standard defines four modes of communication with subtle differences in their semantics:

__Synchronous Send__ is the most stringent communication mode, since the sending process requires the receiving process to provide a matching receive, i.e. it has to accept the handshake, in order to initiate the send. This means that the receiving process has to declare its readiness for receiving a message. Ideally, every MPI program still works correctly when standard send is replaced with synchronous send; however, if it is used incorrectly, it can lead to deadlocks and serialization. The standard use case for this mode is debugging.

In [None]:
%%writefile mpi4py/demo_bp2p-synchronous.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]

if rank == 0:
    sndbuf = np.array(data) # send buffer
    print(f'Process {rank} sends {sndbuf}')
    comm.Ssend(sndbuf, dest=1) # synchronous blocking send
elif rank == 1:
    rcvbuf = np.empty_like(data) # receive buffer
    comm.Recv(rcvbuf, source=0) # blocking receive
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --bind-to core --np 2 python3 mpi4py/demo_bp2p-synchronous.py

__Buffered Send__ copies the data from the message buffer to a buffer that is managed by the user and subsequently returns. Once a matching receive has been posted, the data will be transmitted over the network from the user's buffer. Naturally, this requires an additional buffer and an extra transfer between the buffers. However, this communication mode is local, and its completion does not depend on the occurrence of a matching receive. This communication mode also requires the programmer to attach and detach a user-managed buffer, where the detach call blocks until all messages in the buffer have been transmitted.

In [None]:
%%writefile mpi4py/demo_bp2p-buffered.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]

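if rank == 0:
    sndbuf = np.array(data) # send buffer
    usrbuf = bytearray(sndbuf.nbytes + MPI.BSEND_OVERHEAD) # user-managed buffer, separate from the send buffer
    MPI.Attach_buffer(usrbuf) # attach buffer
    print(f'Process {rank} sends {sndbuf}')
    comm.Bsend(sndbuf, dest=1) # buffered blocking send
    MPI.Detach_buffer() # detach buffer (blocks until buffered messages are transmitted)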
elif rank == 1:
    rcvbuf = np.empty_like(data) # receive buffer
    comm.Recv(rcvbuf, source=0) # blocking receive
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --bind-to core --np 2 python3 mpi4py/demo_bp2p-buffered.py

__Standard Mode__ is either synchronous or buffered, depending on the MPI library, and comes with the respective advantages and disadvantages.

In [None]:
%%writefile mpi4py/demo_bp2p-standard.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]

if rank == 0:
    sndbuf = np.array(data) # send buffer
    print(f'Process {rank} sends {sndbuf}')
    comm.Send(sndbuf, dest=1) # standard blocking send
elif rank == 1:
    rcvbuf = np.empty_like(data) # receive buffer
    comm.Recv(rcvbuf, source=0) # blocking receive
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --bind-to core --np 2 python3 mpi4py/demo_bp2p-standard.py

⚡ __Ready Send__ ⚡ communication works under the assumption that the matching receive has already been posted and thus the send call completes immediately. However, this call only succeeds if the matching receive has indeed been posted, otherwise the behaviour is undefined. This communication mode has the potential to be the fastest, but it should be handled with utmost care and used only when the control flow of the parallel program permits it.

In [None]:
%%writefile mpi4py/demo_bp2p-ready.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]

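if rank == 0:
    sndbuf = np.array(data) # send buffer
    comm.recv(source=1) # wait for the notice that the receive has been posted
    print(f'Process {rank} sends {sndbuf}')
    comm.Rsend(sndbuf, dest=1) # ready blocking send
elif rank == 1:
    rcvbuf = np.empty_like(data) # receive buffer
    req = comm.Irecv(rcvbuf, source=0) # post the receive first
    comm.send(None, dest=0) # notify the sender that the receive is posted
    req.Wait() # complete the receive
    print(f'Process {rank} receives {rcvbuf}')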

In [None]:
!mpirun --bind-to core --np 2 python3 mpi4py/demo_bp2p-ready.py

### Nonblocking Point-to-Point Communication

Nonblocking calls only initiate send or receive operations but do not complete them. The calls will return before the message has been copied to or from the buffer, and a separate call is necessary to complete the operation. It is the programmer's responsibility to leave the buffer unmodified between initiation and completion of a call. Only after calling test or wait functions can the buffers be safely read or written. The function names of these nonblocking send and receive calls, irrespective of their communication mode, start with a capital I, which stands for immediate.
The primary reason for introducing nonblocking communication into a program is to overlap computation and communication by offloading the communication part to the network hardware with minimal involvement of the CPU, which can continue with the computation part in the meantime.
MPI provides nonblocking alternatives to all the previously mentioned communication modes for sending and receiving as well as a set of functions to check for completion of transmission.

* Blocking sends can be used with nonblocking receives and vice versa
* Nonblocking calls followed immediately by a matching wait are equivalent to blocking calls

#### Test

`MPI.Test` returns immediately with a flag that indicates whether the given request is completed. Blocking behaviour can be emulated by calling `MPI.Test` inside a loop, which turns it into a safe polling mechanism.

In [None]:
%%writefile mpi4py/demo_ip2p-standard-test.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]

if rank == 0:
    sndbuf = np.array(data) # send buffer
    print(f'Process {rank} sends {sndbuf}')
    req = comm.Isend(sndbuf, dest=1) # standard nonblocking send
    if not req.Test(): # instance method, returns immediately
        print(f'Isend of process {rank} pending ...')
    while not req.Test(): # poll until the send has completed
        pass
    print(f'Isend of process {rank} successful.')
elif rank == 1:
    rcvbuf = np.empty_like(data) # receive buffer
    comm.Recv(rcvbuf, source=0) # blocking receive
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --bind-to core --np 2 python3 mpi4py/demo_ip2p-standard-test.py

#### Wait

`MPI.Wait` is essentially the blocking version of `MPI.Test` that returns only when the operation corresponding to the given request has been completed.

In [None]:
%%writefile mpi4py/demo_ip2p-standard-wait.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]

if rank == 0:
    sndbuf = np.array(data) # send buffer
    print(f'Process {rank} sends {sndbuf}')
    req = comm.Isend(sndbuf, dest=1) # standard nonblocking send
    req.Wait() # instance method, blocks until the send has completed
    print('Isend successful')
elif rank == 1:
    rcvbuf = np.empty_like(data) # receive buffer
    comm.Recv(rcvbuf, source=0) # blocking receive
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --bind-to core --np 2 python3 mpi4py/demo_ip2p-standard-wait.py

#### Testany

`MPI.Testany` and its friends can be used when the order of completed requests is irrelevant. This call returns immediately and indicates whether any one request has been completed.

In [None]:
%%writefile mpi4py/demo_ip2p-standard-testany.py

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]
reqs = [] # requests

if rank == 0:
    sndbuf = np.array(data) # send buffer
    print(f'Process {rank} sends {sndbuf}')
    for i in range(1, comm.Get_size()):
        req = comm.Isend(sndbuf, dest=i) # standard nonblocking send
        reqs.append(req)
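    index, flag = MPI.Request.Testany(reqs) # class method, returns immediately
    print(f'Testany: flag={flag}, index={index}')
    MPI.Request.Waitall(reqs) # complete all remaining requests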
elif 0 < rank:
    rcvbuf = np.empty_like(data) # receive buffer
    req = comm.Irecv(rcvbuf, source=0) # nonblocking receive
    # print(f'Process {rank} receives {rcvbuf}') # unsafe!
    req.Wait() # instance method
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --oversubscribe --np 8 python3 mpi4py/demo_ip2p-standard-testany.py

#### Waitall

`MPI.Waitall` blocks until all given requests have been completed. Like `MPI.Testany`, it has relatives (`Waitany`, `Waitsome`) for other completion conditions.

In [None]:
%%writefile mpi4py/demo_ip2p-standard-waitall.py

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = [0.12, 3.45, 6.78, 9.10]
reqs = []

if rank == 0:
    sndbuf = np.array(data) # send buffer
    print(f'Process {rank} sends {sndbuf}')
    for i in range(1, comm.Get_size()):
        req = comm.Isend(sndbuf, dest=i) # standard nonblocking send
        reqs.append(req)
    print(f'Process {rank} waits for all requests to complete')
    MPI.Request.Waitall(reqs) # class method
    print(f'Process {rank} finished waiting')
elif 0 < rank:
    rcvbuf = np.zeros_like(data) # receive buffer
    req = comm.Irecv(rcvbuf, source=0) # nonblocking receive
    # print(f'Process {rank} receives {rcvbuf}') # unsafe!
    req.Wait()
    print(f'Process {rank} receives {rcvbuf}')

In [None]:
!mpirun --oversubscribe --np 8 python3 mpi4py/demo_ip2p-standard-waitall.py

## Collective Communication


So far, processes have communicated directly with each other and without any involvement of other processes. In collective communication, on the other hand, all processes in a communicator are involved, either by sending messages directly to each other or by forwarding messages. Usually, the motivation behind using this mode of communication is to manipulate a shared set of information, e.g. a problem that requires distribution over several compute nodes due to its size. These collective communication routines are internally and transparently built upon point-to-point communication functions.

### Communicators

Communicators are the centerpiece of collective communication; all collective communication happens relative to them. For all intents and purposes, communicators consist of two parts: a group and a context. The context helps to distinguish messages within a communicator from those in other communicators and allows a process to be in several communicators at once. Messages sent in one context cannot be received in another context. A group is nothing else than the set of processes within a communicator. While the context is generally transparent to the user, groups are convenient for efficiently creating new communicators.

#### Splitting

A common way to create a new communicator is to __split__ the old one and separate its processes depending on their rank. To this end, each process is assigned a `color` and all processes with the same color end up in a common communicator. Additionally, the processes' ranks in the new communicator are ordered depending on the `key` value.

In [None]:
%%writefile mpi4py/demo_split.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print(f'Process {rank} in comm({size})')

newcomm = comm.Split(rank % 2, key=rank)
newrank = newcomm.Get_rank()
newsize = newcomm.Get_size()

print(f'Process {rank} in comm({size}) \t {newrank} in newcomm({newsize})')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_split.py

#### Groups

The main difference from communicators is that groups do not enable communication between processes. Instead, they provide local routines to build new groups via set operations, and new communicators can be established from these groups. This implies that set operations are local and the creation of a new communicator from a group is collective only over the processes within that group. Communicators are essentially groups with the additional ability to communicate.

In [None]:
%%writefile mpi4py/demo_groups.py

from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

grpworld = comm.Get_group() # local
grpeven = grpworld.Incl([0, 2, 4, 6]) # local
grpodd = MPI.Group.Difference(grpworld, grpeven) # local

commeven = comm.Create_group(grpeven) # collective only over grpeven
commodd = comm.Create_group(grpodd) # collective only over grpodd

if (commeven != MPI.COMM_NULL):
    print(f'Process {rank} is in commeven')
    
if (commodd != MPI.COMM_NULL):
    print(f'Process {rank} is in commodd')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_groups.py

### Collective Communication

Of course, there are more interesting things to do than splitting process groups, for example, moving data within a group of processes instead of only sending and receiving between two specific processes. These operations are where MPI can really play to its strengths. To this end, MPI provides __three types__ of collective data-movement routines: broadcast, gather, and scatter. In each of them, a process either sends a __fixed amount of data__ to all processes or receives it from all processes. For gathering and scattering, there are also versions that support a __variable amount of data__ for each process; their function names are suffixed with a lowercase v. Moreover, since MPI-3, __nonblocking collective communication__ functions are also available. Similar to the nonblocking point-to-point routines, their function names start with a capital I and they return a `request` object that can be passed to wait and test functions.

__NOTES__
* collectives need to be called by all processes in a communicator
* amount of data must be known



| FIXED-DATA | Blocking | Nonblocking |
|---|---|---|
| Broadcast | [`Bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Bcast) | [`Ibcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Ibcast) |
| Scatter  | [`Scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Scatter) | [`Iscatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Iscatter) |
| Gather  | [`Gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Gather) | [`Igather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Igather) |
| Allgather  | [`Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather) | [`Iallgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Iallgather) |
| Alltoall  | [`Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall) | [`Ialltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Ialltoall) |

| VARIABLE-DATA | Blocking | Nonblocking |
|---|---|---|
| Broadcast | - | - |
| Scatter  | [`Scatterv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Scatterv) | [`Iscatterv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Iscatterv) |
| Gather  | [`Gatherv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Gatherv) | [`Igatherv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Igatherv) |
| Allgather  | [`Allgatherv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgatherv) | [`Iallgatherv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Iallgatherv) |
| Alltoall  | [`Alltoallv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoallv) | [`Ialltoallv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Ialltoallv) |

#### Broadcast

A broadcast is used when one distinguished process, often called the "root", sends the same data to all processes in a communicator.
__one-to-all__

In [None]:
%%writefile mpi4py/demo_broadcast.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.array([2.71, 8.28, 1.82, 8.459], dtype=np.float32)
else:
    data = np.empty(4, dtype=np.float32)

print(f'Process {rank} initially has {data}')
if rank == 0: print(f'Process {rank} broadcasts {data}')
comm.Bcast(data, root=0) # blocking broadcast
print(f'Process {rank} received {data} from process 0')

In [None]:
!mpirun --np 4 --oversubscribe python3 mpi4py/demo_broadcast.py

#### Scatter

While a broadcast operation distributes the same data from the root process to all other processes, a scattering operation sends different data from the root to every process. __one-to-all__

In [None]:
%%writefile mpi4py/demo_scatter.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
sndbuf = None # send buffer
rcvbuf = np.empty(1, dtype=np.int32) # receive buffer

if rank == 0:
    sndbuf = np.arange(start=0, stop=comm.Get_size(), step=1, dtype=np.int32)**2
    print(f'Process {rank} scatters {sndbuf}')

comm.Scatter(sndbuf, rcvbuf, root=0) # blocking scatter
print(f'Process {rank} received {rcvbuf} from process 0')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_scatter.py

#### Gather

The gather operation allows a distinguished process to collect specific array elements from each process. This is the inverse of the scattering operation. __all-to-one__

In [None]:
%%writefile mpi4py/demo_gather.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
sndbuf = np.array([rank, rank], dtype=np.int32)**2 # send buffer
rcvbuf = None # receive buffer

if rank == 0:
    rcvbuf = np.empty(2*comm.Get_size(), dtype=np.int32) # receive buffer

print(f'Process {rank} sends {sndbuf}')
comm.Gather(sndbuf, rcvbuf, root=0) # blocking gather

if rank == 0:
    print(f'Process {rank} gathered {rcvbuf} from processes 0 to {comm.Get_size() - 1}')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_gather.py

#### Allgather

Allgather operations can be understood as a gather operation with the addition of all processes receiving the result, instead of only the root. Practically, this is equivalent to a gather operation followed by a broadcast by the gathering process; however, a respectable MPI library uses a specialized algorithm.

In [None]:
%%writefile mpi4py/demo_allgather.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
sndbuf = np.array([rank, rank], dtype=np.int32)**2 # send buffer
rcvbuf = np.empty(2*comm.Get_size(), dtype=np.int32) # receive buffer

print(f'Process {rank} sends {sndbuf}')
comm.Allgather(sndbuf, rcvbuf) # blocking allgather
print(f'Process {rank} allgathered {rcvbuf} from processes 0 to {comm.Get_size() - 1}')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_allgather.py

#### Allgatherv

This is simply the variable-data version of an allgather operation.

In [None]:
%%writefile mpi4py/demo_allgatherv.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
counts = [1, 1, 2, 3] # number of elements contributed by each process
sndbuf = np.array([0] if (rank == 0) else [rank]*rank, dtype=np.int32) # send buffer
rcvbuf = np.empty(sum(counts), dtype=np.int32) # receive buffer

print(f'Process {rank} sends {sndbuf}')
comm.Allgatherv(sndbuf, [rcvbuf, counts, MPI.INT]) # blocking variable-data allgather
print(f'Process {rank} allgatherved {rcvbuf}')

In [None]:
!mpirun --np 4 --oversubscribe python3 mpi4py/demo_allgatherv.py

#### Iscatterv

Unsurprisingly, there is also a nonblocking variable-data version of the scatter operation.

In [None]:
%%writefile mpi4py/demo_iscatterv.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
counts = [1, 1, 2, 3] # number of elements for each process
sndmsg = None # send message (only significant on root)
rcvbuf = np.empty(counts[rank], dtype=np.int32) # receive buffer

if (rank == 0):
    sndbuf = np.array([0, 1, 2, 2, 3, 3, 3], dtype=np.int32) # send buffer
    sndmsg = [sndbuf, counts, MPI.INT]
    print(f'Process {rank} scatters {sndbuf}')

req = comm.Iscatterv(sndmsg, rcvbuf, root=0) # nonblocking variable-data scatter
req.Wait() # complete the operation before reading the receive buffer
print(f'Process {rank} iscatterved {rcvbuf}')

In [None]:
!mpirun --np 4 --oversubscribe python3 mpi4py/demo_iscatterv.py

### Collective Computation

In addition to moving data between processes, MPI can also perform basic computations on distributed data. This is implemented in the form of __reduce__ and __scan__ operations, where the former returns only the complete result and the latter returns incremental results on each process. The operation to be performed is given to the routine as an argument, which can either be one of the __built-in operations__, e.g. summation or finding a maximum value, or a __user-defined operation__. The standard reduce function is a so-called __rooted collective__, since the result of the reduction is only available on a distinguished root process. There is also a __non-rooted__ version, where the result is available on all processes, as well as a __non-blocking__ version.
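
The difference between reduce and scan can be modelled in plain Python, without MPI. In the sketch below, the list `contributions` stands for the values held by four hypothetical processes; `functools.reduce` plays the role of a sum reduction and `itertools.accumulate` the role of an inclusive sum scan. This only illustrates the semantics and says nothing about how an MPI library computes the results.

```python
from functools import reduce
from itertools import accumulate
from operator import add

contributions = [0, 1, 2, 3]  # value held by each of four hypothetical processes

total = reduce(add, contributions)                # reduce: one complete result
inclusive = list(accumulate(contributions, add))  # inclusive scan: process i gets the sum over 0..i
exclusive = [None] + inclusive[:-1]               # exclusive scan: sum over 0..i-1, undefined on process 0

print(f'reduce:         {total}')
print(f'inclusive scan: {inclusive}')
print(f'exclusive scan: {exclusive}')
```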

__Note:__ In MPI4Py, the scan routines `Scan` and `Exscan` and their nonblocking variants are methods of `MPI.Intracomm` rather than `MPI.Comm`, so they are not listed on the `MPI.Comm` reference page.

| FIXED-DATA | Blocking | Nonblocking |
|---|---|---|
| Reduce | [`Reduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Reduce) | [`Ireduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Ireduce) |
| Allreduce  | [`Allreduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allreduce) | [`Iallreduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Iallreduce) |
| Scan  | `Scan` | `Iscan` |
| Exscan  | `Exscan` | `Iexscan` |

#### Reduce

In [1]:
%%writefile mpi4py/demo_reduce.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

rcvbuf = np.zeros(2, dtype=np.int32)
sndbuf = np.array([rank, size], dtype=np.int32)

print(f'Process {rank} sends {sndbuf} via Reduce')
comm.Reduce(sndbuf, rcvbuf, root=0, op=MPI.SUM)
print(f'Process {rank} receives {rcvbuf} via Reduce')

Writing mpi4py/demo_reduce.py


In [2]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_reduce.py

Process 7 sends [7 8] via Reduce
Process 0 sends [0 8] via Reduce
Process 4 sends [4 8] via Reduce
Process 5 sends [5 8] via Reduce
Process 5 receives [0 0] via Reduce
Process 7 receives [0 0] via Reduce
Process 1 sends [1 8] via Reduce
Process 3 sends [3 8] via Reduce
Process 1 receives [0 0] via Reduce
Process 3 receives [0 0] via Reduce
Process 6 sends [6 8] via Reduce
Process 6 receives [0 0] via Reduce
Process 2 sends [2 8] via Reduce
Process 2 receives [0 0] via Reduce
Process 0 receives [28 64] via Reduce
Process 4 receives [0 0] via Reduce


#### Allreduce

In [None]:
%%writefile mpi4py/demo_allreduce.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

rcvbuf = np.zeros(2, dtype=np.float32)
sndbuf = np.array([rank, size], dtype=np.float32)

print(f'Process {rank} sends {sndbuf} via Allreduce')
comm.Allreduce(sndbuf, rcvbuf, op=MPI.PROD)
print(f'Process {rank} receives {rcvbuf} via Allreduce')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_allreduce.py

#### Iallreduce

In [None]:
%%writefile mpi4py/demo_iallreduce.py

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

rcvbuf = np.zeros(2, dtype=np.float32)
sndbuf = np.array([rank, size], dtype=np.float32)

print(f'Process {rank} sends {sndbuf} via Iallreduce')
req = comm.Iallreduce(sndbuf, rcvbuf, op=MPI.SUM)
# some local computation
req.Wait()
print(f'Process {rank} receives {rcvbuf} via Iallreduce')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_iallreduce.py

### Collective Synchronization

With the barrier, MPI provides a single global synchronisation operation. A process that posts a call to this function halts until all other processes in the communicator have also done so. It is tempting to understand this as some kind of checkpoint from which all processes proceed at the same time, but this is not the case. The call to the barrier blocks until all processes have reached the barrier; afterwards, the processes are free to proceed. Barriers are useful only for a handful of cases and are not necessary in an ideal program.

#### Barrier

In [None]:
%%writefile mpi4py/demo_barrier.py

import time
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

time.sleep(float(rank)/2)
print(f'Process {rank} has reached the barrier')
comm.Barrier()
print(f'Process {rank} has passed the barrier')

In [None]:
!mpirun --np 8 --oversubscribe python3 mpi4py/demo_barrier.py

## One-sided Communication

Collective as well as point-to-point communication share the characteristic that in general two processes have to actively participate in the communication, a sender and a receiver. More precisely, they both are synchronous, two-sided modes of communication. This might cause delays when one of the processes has to wait frequently or longer for the other process. One-sided communication alleviates this issue by offering routines for remote memory access (RMA) that allow individual processes to initiate communication as either a sending or receiving party. RMA implements zero-copy networking by which data can be directly transferred between the main memories instead of passing it through the whole memory hierarchy up to the CPU. While the previously introduced two-sided communication routines require matching send and receive operations, where both participating processes have to anticipate the transfer, one-sided communication routines are more permissive.

In general, one-sided communication operates according to the following pattern: 
* Allocate memory __windows__ through a collective routine
* Start an RMA __epoch__
* Communicate via put, get, and accumulate
* Stop an RMA __epoch__
* Deallocate memory windows

#### Windows

In order to make one-sided communication possible, all processes must establish buffers in their respective local memory that remote processes can access at will. These buffers are called "windows" and they are created through a collective call that is executed by all processes willing to operate on these windows.

Three routines are provided for window creation:
* [MPI.Win.Create](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html?#mpi4py.MPI.Win.Create), in case the memory buffer is already allocated,
* [MPI.Win.Allocate](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Allocate) and [MPI.Win.Allocate_shared](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Allocate_shared), in case the memory has yet to be allocated, and
* [MPI.Win.Create_dynamic](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html?#mpi4py.MPI.Win.Create_dynamic), in case the necessary size of the memory buffer is unknown at creation time and memory is attached to the window later.

#### Epochs & Synchronization

The MPI standard defines two ways of synchronizing access to the window of a remote process. With __active target synchronization__, the remote window can only be accessed during a specific time period, the so-called "epoch". An epoch is opened by a call to [MPI.Win.Fence](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Fence) on both the origin and the target process, and it is closed by a second call on both processes. Between these two calls, the window can be accessed by a remote process. With __passive target synchronization__, on the other hand, only the origin process takes part: it encloses its operations on the target window between calls to [MPI.Win.Lock](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Lock) and [MPI.Win.Unlock](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Unlock).

#### Put, Get, Accumulate

One-sided communication relies on three basic communication routines:
* [MPI.Win.Get](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Get) for remote reads,
* [MPI.Win.Put](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Put) for remote writes, and
* [MPI.Win.Accumulate](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Accumulate) for remote updates where basic operations, such as summation or replacement, can be performed at the target windows.

#### Get with Active Target Synchronization

In [None]:
%%writefile mpi4py/demo_osc-get-ats.py

import numpy as np
from mpi4py import MPI
from mpi4py.util import dtlib


mpi_dtype = MPI.INT
np_dtype = dtlib.to_numpy_dtype(mpi_dtype)
itemsize = mpi_dtype.Get_size()

sndbuf = np.empty((), dtype=np_dtype)
rcvbuf = np.empty_like(sndbuf)

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

right = (rank + 1) % size # right neighbor in the ring (not used below)
left = (rank - 1 + size) % size # left neighbor in the ring
total = 0 # running sum of the received ranks
np.copyto(sndbuf, rank)

win = MPI.Win.Create(memory=sndbuf, disp_unit=sndbuf.itemsize, info=MPI.INFO_NULL, comm=comm) # create window

for i in range(size):
    win.Fence(MPI.MODE_NOPUT | MPI.MODE_NOPRECEDE) # active target synchronization: open epoch
    win.Get((rcvbuf, 1, MPI.INT), left, (0, 1, MPI.INT)) # read the send buffer of the left neighbor
    win.Fence(MPI.MODE_NOSTORE | MPI.MODE_NOPUT | MPI.MODE_NOSUCCEED) # active target synchronization: close epoch
    np.copyto(sndbuf, rcvbuf) # pass the received value on in the next iteration
    total += int(rcvbuf)

win.Free() # free window

print(f'Process {rank} computes\tsum = {total}')

In [None]:
!mpirun --np 6 --oversubscribe python3 mpi4py/demo_osc-get-ats.py

#### Get & Put with Passive Target Synchronization

In [None]:
%%writefile mpi4py/demo_osc-getput-pts.py

import time
import numpy as np
from mpi4py import MPI
from mpi4py.util import dtlib


mpi_dtype = MPI.INT
np_dtype = dtlib.to_numpy_dtype(mpi_dtype)
item_size = mpi_dtype.Get_size()

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
comm_size = comm.Get_size()

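# only rank 0 exposes memory, all other processes contribute a zero-sized window
win_size = (comm_size * item_size) if rank == 0 else 0
win = MPI.Win.Allocate(win_size, comm=comm)

if rank == 0:
    buf = np.arange(start=0, stop=comm_size, dtype=np_dtype)
    win.Lock(rank=0) # passive target synchronization: lock the window on rank 0
    win.Put(buf, target_rank=0) # fill the window with 0, 1, ..., comm_size-1
    win.Unlock(rank=0)
    comm.Barrier() # signal that the window content is ready
else:
    buf = np.empty(comm_size, dtype=np_dtype)
    comm.Barrier() # wait until rank 0 has filled its window
    win.Lock(rank=0)
    win.Get(buf, target_rank=0) # read the whole window of rank 0
    time.sleep(1) # hold the lock for a moment before releasing it
    win.Unlock(rank=0)

win.Free() # free window

print(f'Process {rank} computes\tbuf = {buf}')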

In [None]:
!mpirun --np 6 --oversubscribe python3 mpi4py/demo_osc-getput-pts.py