# (Distributed) Parallel Python
Unfortunately, not all problems are of the "embarrassingly parallel" type. For example, simulations of dynamical systems (nervous/weather/quantum systems) can easily become too computationally heavy for single machines, both in terms of memory and compute. One solution is to *distribute* such simulations across multiple machines. In particular this implies that we are using multiple processes which all work on a part of the simulation and hence need to communicate. The de-facto standard protocol for inter-process communication (in academia) is the Message Passing Interface (MPI).

## What is MPI?
MPI is a standard for passing data ("messages") between processes. In the case of `mpi4py` the data typically consists of pickled Python objects. As a user, you don't need to worry how the data gets from one process to the other (possibly via a local network), the MPI implementation takes care of that for you.

#### WARNING
Compared to the solutions we've considered so far, using MPI *effectively* requires significant cognitive and development overhead, so you should very carefully evaluate whether you need to get your hands this dirty before reimplementing your simulation (or do it for fun as a challenge while your supervisor is on holidays ;) ). In the following we will focus on the `mpi4py` package.

#### How to install MPI?
Briefly: Don't. Typically you'll want to use MPI on some cluster/supercomputer on which the system administrator has taken care of installing and configuring an MPI library. You can also install it on your machine, for Linux typically via the package manager.

## Starting multiple processes: who am I?
Using MPI requires you to change the way you start your Python program. First, we can not (easily) run it from a jupyter notebook. Second, instead of calling it like `python <script>` from the commandline, we need to run it via the `mpirun` executable. At this point, we also specify how many processes we'd like to start via `-np <number of processes>`. How many processes we use should reflect both the available hardware as well as our choices of how to distribute work.

In MPI each process is assigned a index ("rank" in MPI lingo). This helps you to organize work ("rank X does Y") and communication ("rank X sends Z to rank Y"). We first implement (one of) the simplest possible MPI program(s): report your rank and exit.

In [None]:
# %load mpi_hello_world.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
process = comm.Get_rank()

print(f"Hello world from {process = }")


## Performing different work on different processes
As mentioned, using the rank, we can let different processes do different work, for example, for a dynamical systems simulation, we can use it to determine initial positions of particles. For reproducibility, we would like to fix the seed of the random number generator. At the same time we want to make sure that different processes are using different number streams. A simple way to achieve this is to use some fixed seed in combination with the rank.

Let's see how we would generate initial conditions for a dynamical systems simulation.

In [None]:
# %load mpi_random_numbers.py
from mpi4py import MPI
import numpy as np


comm = MPI.COMM_WORLD  # get MPI communicator
process = comm.Get_rank()  # get process index
n_processes = comm.Get_size()  # get total number of processes

# initialize a random number generator for each process
rng = np.random.RandomState(1235 + process)

# define the number of particles per processes and time steps to simulate
n_particles_per_process = 3
n_particles = n_particles_per_process * n_processes
n_time_steps = 5

# determine work distribution between ranks
x_min = process / n_processes
x_max = (process + 1) / n_processes

# generate initial positions
x = np.random.uniform(x_min, x_max, size=n_particles_per_process)

print(f"{process=}: particle positions {x}")


So far, the ranks are acting independently. The distinguishing feature of MPI is that is allows communication between processes. In the following we consider *one particular view* of parallelization across processes.

## Distributed simulation
Now, let's consider a (super simplified) dynamical systems simulation. We have particles moving in a one-dimensional "box" between 0 and 1. Assuming lots and lots of particles, we may want to split the work of propagating the particles, i.e., computing their new position, across different processes. Here we decide that each process should propagate the particles within a certain "volume" of the box. For example, using two processes, one of them propagates all particles between 0 and 0.5, the other all particles between 0.5 and 1. Of course particles can cross the boundary from below 0.5 to above 0.5 and we hence need to communicate positions between the processes. Here, we go for a simple implementation: after each propagation step, information about the new positions is shared across all ranks. Each rank afterwards determines which particles it should propagate in the next step.

In [None]:
# %load mpi_particles.py
from mpi4py import MPI
import numpy as np
import time

from lib_particles import Particle, compute_new_particle_position


comm = MPI.COMM_WORLD
process = comm.Get_rank()
n_processes = comm.Get_size()

# initialize a random number generator for each process
rng = np.random.RandomState(1235 + process)

# define the number of particles per processes and time steps to simulate
n_particles_per_process = 3
n_particles = n_particles_per_process * n_processes
n_time_steps = 5

# determine work distribution between ranks
x_min = process / n_processes
x_max = (process + 1) / n_processes

# initialize local particles assuming homogeneity
local_particles = [
    Particle(n_particles_per_process * process + idx, rng.uniform(x_min, x_max))
    for idx in range(n_particles_per_process)
]
print(
    f"initial configuration ({process=}):",
    list(sorted(local_particles, key=lambda p: p.idx)),
)
# exit()

# simulate dynamics for a couple of time steps
for time_step in range(n_time_steps):

    # propagate particle positions
    for particle in local_particles:
        particle.x = compute_new_particle_position(particle.x)

    # communicate particle positions
    received_particles = []
    for source_process in range(n_processes):
        # we communicate the list `local_particles` from rank `root` to all
        # other ranks; all ranks receive the same list from rank `root`, which
        # is stored in `recv_buffer`; communication is *blocking*, i.e., all
        # ranks wait here until the have received the data
        received_particles += comm.bcast(local_particles, root=source_process)

    assert len(received_particles) == n_particles

    # after the communication is done, we figure out which particles this
    # rank now needs to keep track off
    local_particles = []
    for particle_new in received_particles:
        if x_min <= particle_new.x < x_max:
            local_particles.append(particle_new)

    print()
    print(
        f"{time_step=} ({process=}):",
        list(sorted(local_particles, key=lambda p: p.idx)),
    )


## Outlook
- optimizations/different communication patterns: e.g., send data only to the processes that need it, interleaving communication and computation
- (most) common issue: deadlocks, i.e., some processes are waiting for messages that they never receive :O
- as soon as you start using bigger systems, you will have to go through a scheduler to run your scripts, e.g., SLURM, LoadLeveler, etc.; this means writing a (bash) script which tells the scheduler what kind of resources you need and how to run your scripts

### Resources
- https://mpitutorial.com/
- https://www.open-mpi.org/doc/
- https://www.open-mpi.org/doc/