# Introduction to MPI
- [MPI](https://www.mpi-forum.org/) stands for Message Passing Interface
- Targets [distributed memory](https://en.wikipedia.org/wiki/Distributed_memory?useskin=vector) systems
- It is a library specification (implemented, e.g., by Open MPI and MPICH), not a language
- Each process has its own private memory space
- Communication is needed to share data: sender and receiver cooperate

<div style="text-align: center;">
    <img src="https://www.iue.tuwien.ac.at/phd/weinbub/figures/distributedmemory.png" alt="Image Description" width="800">
</div>


The documentation is extensive, and you will need it: about 6 basic functions are enough for many programs, but there are more than 125 functions in total!
- [MPI tutorial](https://mpitutorial.com/tutorials/)
- [Open MPI docs](https://www.open-mpi.org/doc/current/)
- [MPI cheat sheet](https://cheatography.com/test2000/cheat-sheets/parallel/)
- [ARCHER2 training](https://www.archer2.ac.uk/training/courses/200514-mpi/)
- [LLNL tutorial](https://hpc-tutorials.llnl.gov/mpi/)
- [Princeton Research Computing (PRC)](https://researchcomputing.princeton.edu/education/external-online-resources/mpi)
- [PRACE/PDC support: Introduction to MPI](https://pdc-support.github.io/introduction-to-mpi/about/index.html)
- ...

# MPI function calls: communication characteristics
- Point-to-point
- Collective
- Blocking and non-blocking

<table>
  <tr>
    <td>
      <div style="text-align: center;">
    <img src="https://hpc-tutorials.llnl.gov/mpi/images/buffer_recv.gif" alt="Image Description" width="600">
</div>
      </td>
    <td>
        <div style="text-align: center;">
    <img src="https://hpc-tutorials.llnl.gov/mpi/images/collective_comm.gif" alt="Image Description" width="600">
</div>
    </td>
  </tr>
  <tr>
      <td>
          <div style="text-align: center;">
    <img src="http://www.md-esg.eu/wp-content/uploads/2021/10/juwels_lammps_scaling.png" alt="Image Description" width="600">
</div>
      </td>
      <td>
         <div style="text-align: center;">
    <img src="https://hpc-tutorials.llnl.gov/mpi/images/Cartesian_topology.gif" alt="Image Description" width="600">
</div> 
      </td>
  </tr>
</table>


# MPI essential functions

In [3]:
%%writefile mpi_basic.cpp
#include "mpi.h"
#include <iostream>

int main(int argc, char **argv)
{
    int processId;    /* rank of this process */
    int noProcesses;  /* total number of processes */

    MPI_Init(&argc, &argv);                        /* Mandatory: before (almost) any other MPI call */
    MPI_Comm_size(MPI_COMM_WORLD, &noProcesses);   /* how many processes are in the communicator */
    MPI_Comm_rank(MPI_COMM_WORLD, &processId);     /* which one am I: 0 .. noProcesses-1 */

    std::cout << "Hello from process " << processId << " of " << noProcesses << "\n";

    MPI_Finalize();                                /* Mandatory: no MPI calls after this */

    return 0;
}


Overwriting mpi_basic.cpp


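To compile, use the `mpic++` wrapper compiler, which calls the underlying C++ compiler with the MPI include and library flags added. To see the exact command it runs, use `mpic++ --showme` (Open MPI; with MPICH, use `mpicxx -show`).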

In [4]:
!mpic++ mpi_basic.cpp -o mpi_basic.x

Now run it with `mpirun`; `-np 2` starts two processes:

In [5]:
!mpirun -np 2 ./mpi_basic.x

Hello from process 1 of 2
Hello from process 0 of 2


Function documentation:
- [`MPI_Init`](https://www.open-mpi.org/doc/current/man3/MPI_Init.3.php): initializes the MPI environment
- [`MPI_Comm_size`](https://www.open-mpi.org/doc/current/man3/MPI_Comm_size.3.php): number of processes in a communicator
- [`MPI_Comm_rank`](https://www.open-mpi.org/doc/current/man3/MPI_Comm_rank.3.php): rank of the calling process
- [`MPI_Finalize`](https://www.open-mpi.org/doc/current/man3/MPI_Finalize.3.php): shuts down the MPI environment

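Notice that every process runs the same program (the SPMD model: single program, multiple data); only the rank changes. Branching on the rank (e.g. `if (0 == processId)`) is how you make processes do different work. Also note that the output order is not deterministic: above, rank 1 printed before rank 0.

The processes can run on the same machine or on several machines (nodes).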

## Exercise
Launch the same program using Slurm, first on a single node, then on several nodes.

In [6]:
# Put the slurm script here


# MPI practical example: sum of a large array
This is analogous to the OpenMP example, but now the array is distributed across processes, so it can be as large as the combined memory of all the computers. First we write the serial version; then we solve the problem in parallel using point-to-point communication and, later, collective communication. Finally, we do the same in `python`.

## Serial version

In [12]:
%%writefile serial_main.cpp
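#include "serial_avg.h"
#include <string>   // std::stoi

int main(int argc, char **argv) {
    if (argc < 2) {
        std::cerr << "usage: " << argv[0] << " N\n";
        return 1;
    }
    const int N = std::stoi(argv[1]);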
    std::vector<double> data(N, 0.0);
    
    initialize(data);
    
    compute_avg(data);
    
    return 0;
}

Overwriting serial_main.cpp


In [13]:
%%writefile serial_avg.h
#pragma once
#include <vector>
#include <random>
#include <iostream>
void initialize(std::vector<double> & array);
void compute_avg(const std::vector<double> & array);

Overwriting serial_avg.h


In [14]:
%%writefile serial_avg.cpp
#include "serial_avg.h"
void initialize(std::vector<double> & array)
{
    std::mt19937 gen(0); // 0 == seed
    std::uniform_real_distribution<double> dis(0.0, 1.0);
    for (auto & x : array) {
        x = dis(gen);
    }
}

void compute_avg(const std::vector<double> & array) {
    double result = 0.0;
    for (auto & x : array) {
        result += x;
    }
    std::cout.setf(std::ios::scientific);
    std::cout.precision(16);
    std::cout << "avg: " << result/array.size() << "\n";
}


Overwriting serial_avg.cpp


In [17]:
!echo "Compiling ..."
!g++ -std=c++17 -O3  serial_main.cpp serial_avg.cpp -o serial_array_avg.x
!echo "Executing ..."
!./serial_array_avg.x 100000000

Compiling ...
Executing ...
avg: 5.0005594736679237e-01

## Point-to-point parallel version
Here we split the array among the processes: each one sums its own chunk, the partial sums are sent to rank 0 with `MPI_Send`/`MPI_Recv`, and rank 0 prints the average.
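Before writing any MPI calls, it helps to pin down what rank 0 has to compute. The sketch below is plain Python, not MPI: each tuple stands for the (partial sum, element count) that one rank would send, and the names `partials` and `combine_partials` are hypothetical. It also shows why the global average must be total sum / total count rather than the mean of the local averages when chunk sizes differ.

```python
def combine_partials(partials):
    """Combine (partial_sum, count) pairs, one per rank, into the global average.

    This mirrors the accumulation rank 0 performs after receiving every
    other rank's contribution; here the "messages" are just list entries.
    """
    total_sum = 0.0
    total_count = 0
    for partial_sum, count in partials:
        total_sum += partial_sum
        total_count += count
    return total_sum / total_count


# Hypothetical contributions from 3 ranks with unequal chunk sizes.
partials = [(1.0, 2), (3.0, 3), (10.0, 5)]   # local averages: 0.5, 1.0, 2.0
print(combine_partials(partials))           # 1.4  (= 14.0 / 10)
# Averaging the local averages would give (0.5 + 1.0 + 2.0) / 3, which is wrong.
```

With equal chunk sizes, as produced by the `N/np` split below, both approaches coincide; sending the count as well keeps the result correct for uneven splits.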

In [64]:
%%writefile parallel_main.cpp
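#include "parallel_avg.h"   // declares initialize() and compute_avg()
#include "mpi.h"
#include <string>           // std::stoi

int main(int argc, char **argv) {
    // MPI initialization
    MPI_Init(&argc, &argv); 
    int pid, np;
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    if (argc < 2) {
        if (0 == pid) std::cerr << "usage: " << argv[0] << " N\n";
        MPI_Finalize();
        return 1;
    }
    
    // local problem definition
    const int N = std::stoi(argv[1]); // total size
    int Nlocal = N/np; // size for this process (integer division: the remainder N % np is dropped)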
    std::vector<double> data(Nlocal, 0.0);
    
    initialize(data, pid, np);
    
    compute_avg(data, pid, np);
    
    MPI_Finalize();
    
    return 0;
}

Writing parallel_main.cpp
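`int Nlocal = N/np;` uses integer division, so when `N` is not a multiple of `np` the last `N % np` elements are silently dropped (the Python version below has the same issue with `int(N/npr)`). One common fix, sketched here in Python with a hypothetical helper `block_range`, gives one extra element to each of the first `N % np` ranks:

```python
def block_range(n, rank, size):
    """Return (start, count) of the contiguous block owned by `rank`.

    The first n % size ranks get one extra element, so block sizes differ
    by at most one and every element is assigned exactly once.
    """
    base, extra = divmod(n, size)
    count = base + (1 if rank < extra else 0)
    start = rank * base + min(rank, extra)
    return start, count


for rank in range(4):
    print(rank, block_range(10, rank, 4))
# 0 (0, 3)
# 1 (3, 3)
# 2 (6, 2)
# 3 (8, 2)
```

Each rank would then allocate `count` elements, and the average would be computed with the true `N` (or the sum of all counts).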


In [65]:
%%writefile parallel_avg.h
#pragma once
#include <vector>
#include <random>
#include <iostream>
#include "mpi.h"

void initialize(std::vector<double> & array, int pid, int np);
void compute_avg(const std::vector<double> & array, int pid, int np);

Writing parallel_avg.h


In [66]:
%%writefile parallel_avg_pointpoint.cpp
#include "parallel_avg.h"
void initialize(std::vector<double> & array, int pid, int np)
{
    std::mt19937 gen(pid); // pid == seed (what if seed == 0?)
    std::uniform_real_distribution<double> dis(0.0, 1.0);
    for (auto & x : array) {
        x = dis(gen);
    }
}

void compute_avg(const std::vector<double> & array, int pid, int np) {
    double result = 0.0;
    for (auto & x : array) {
        result += x;
    }
    // mpi communication: TODO
    if (0 == pid) {
        // receive and accumulate
    } else {
        // send
    }
    // TODO: only master prints the global average; for now each rank prints its local one
    std::cout.setf(std::ios::scientific);
    std::cout.precision(16);
    std::cout << "avg: " << result/array.size() << " (local, pid " << pid << ")\n";
}


Overwriting parallel_avg_pointpoint.cpp


In [67]:
!echo "Compiling ..."
!mpic++ -std=c++17 parallel_main.cpp parallel_avg_pointpoint.cpp -o parallel_array_pointpoint.x
!echo "Executing ..."
!mpirun -np 2 ./parallel_array_pointpoint.x 100000000

Compiling ...
Executing ...
avg: 4.9993301343222502e-01 (local, pid 1)
avg: 5.0003528264511499e-01 (local, pid 0)
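The `initialize` functions seed the generator with the rank and ask `(what if seed == 0?)`, i.e. what happens if every rank uses the same seed. A generator with a fixed seed always produces the same sequence, so every rank would hold an identical chunk and the "large array" would just be `np` copies of the same data. The snippet below illustrates this with Python's `random.Random` rather than `std::mt19937`; the helper `make_chunk` is hypothetical.

```python
import random


def make_chunk(seed, n):
    """Generate n uniform numbers in [0, 1) from a generator seeded with `seed`."""
    gen = random.Random(seed)
    return [gen.random() for _ in range(n)]


# Two "ranks" that both use seed 0 get identical data ...
same = [make_chunk(0, 5) for rank in range(2)]
print(same[0] == same[1])          # True
# ... while seeding with the rank, as in the notebook, gives different chunks.
per_rank = [make_chunk(rank, 5) for rank in range(2)]
print(per_rank[0] == per_rank[1])  # False
```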


## Collective communications version
Here a reduction (`MPI_Reduce`) is simpler: a single call combines the partial sums of all ranks onto the root. Read its documentation and implement the solution.
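Conceptually, `MPI_Reduce` with `MPI_SUM` combines one value per rank into a single result on the root. Besides being shorter to write, a collective lets the MPI library choose the communication pattern. One common choice is a binomial tree, which needs about log2(p) rounds instead of the p - 1 sequential receives of the point-to-point loop. The plain-Python simulation below (hypothetical name `tree_reduce`) only illustrates that idea; real implementations may use other algorithms.

```python
def tree_reduce(values):
    """Simulate a binomial-tree sum reduction onto rank 0.

    values[r] is the local partial sum of rank r. In each round, rank r
    receives from rank r + step whenever r is a multiple of 2*step, so the
    number of active ranks halves every round.
    Returns (result_on_rank_0, number_of_rounds).
    """
    buf = list(values)          # buf[r] models the buffer held by rank r
    size = len(buf)
    step = 1
    rounds = 0
    while step < size:
        for r in range(0, size, 2 * step):
            partner = r + step
            if partner < size:  # partner "sends" its partial sum to r
                buf[r] += buf[partner]
        step *= 2
        rounds += 1
    return buf[0], rounds


print(tree_reduce([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))  # (36.0, 3)
print(tree_reduce([1.0, 2.0, 3.0, 4.0, 5.0]))                 # (15.0, 3)
```

In the C++ exercise, `MPI_Reduce` takes the local `result` as send buffer, a receive buffer (meaningful on the root), the count, `MPI_DOUBLE`, `MPI_SUM`, the root rank and `MPI_COMM_WORLD`.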


In [69]:
%%writefile parallel_avg_collective.cpp
#include "parallel_avg.h"
void initialize(std::vector<double> & array, int pid, int np)
{
    std::mt19937 gen(pid); // pid == seed (what if seed == 0?)
    std::uniform_real_distribution<double> dis(0.0, 1.0);
    for (auto & x : array) {
        x = dis(gen);
    }
}

void compute_avg(const std::vector<double> & array, int pid, int np) {
    double result = 0.0;
    for (auto & x : array) {
        result += x;
    }
    // TODO: create reduction, collective communication
    
    // TODO: only master prints the global average; for now each rank prints its local one
    std::cout.setf(std::ios::scientific);
    std::cout.precision(16);
    std::cout << "avg: " << result/array.size() << " (local, pid " << pid << ")\n";
}


Writing parallel_avg_collective.cpp


In [70]:
!echo "Compiling ..."
!mpic++ -std=c++17 parallel_main.cpp parallel_avg_collective.cpp -o parallel_array_collective.x
!echo "Executing ..."
!mpirun -np 2 ./parallel_array_collective.x 100000000

Compiling ...
Executing ...
avg: 4.9993301343222502e-01 (local, pid 1)
avg: 5.0003528264511499e-01 (local, pid 0)


## Parallel metrics for both point-to-point and collective versions
Using Slurm, compute the parallel metrics (speedup and efficiency) for both the point-to-point and the collective communication versions. Run both on a single node and on several nodes.
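The usual metrics are the speedup S(p) = T(1)/T(p) and the parallel efficiency E(p) = S(p)/p, where T(p) is the wall time with p processes. A minimal helper (hypothetical name `parallel_metrics`), assuming you have already collected the timings, for example with `MPI_Wtime` around the computation or from Slurm's elapsed time. The timings in the usage lines are made-up placeholders, not measurements.

```python
def parallel_metrics(timings):
    """Compute speedup and efficiency from wall times.

    `timings` maps the number of processes p to the measured time T(p);
    it must contain p = 1, the serial reference.
    Returns {p: (speedup, efficiency)}.
    """
    t1 = timings[1]
    metrics = {}
    for p, tp in sorted(timings.items()):
        speedup = t1 / tp
        metrics[p] = (speedup, speedup / p)
    return metrics


# Placeholder numbers, only to show the usage.
for p, (s, e) in parallel_metrics({1: 8.0, 2: 4.0, 4: 2.5}).items():
    print(f"p={p}: speedup={s:.2f}, efficiency={e:.2f}")
```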

# Python mpi
You can use the [`mpi4py`](https://mpi4py.readthedocs.io/en/stable/) package to call MPI from Python.
First, let's implement the point-to-point example. Keep in mind that `mpi4py` can send arbitrary Python objects by serializing them with `pickle`, but that is slow for numerical data, so it is better to communicate `numpy` arrays directly through their buffers.
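In `mpi4py`, the lowercase methods (`comm.send`, `comm.recv`, `comm.allreduce`, ...) pickle generic Python objects, while the uppercase ones (`comm.Send`, `comm.Recv`, `comm.Reduce`, ...) transfer the raw memory of buffer-like objects such as NumPy arrays. The sketch below does not use MPI at all; it only compares, with the standard library, the bytes a pickled array carries against its raw buffer, to illustrate the extra serialization step the lowercase methods pay.

```python
import pickle
from array import array

data = array("d", [0.5] * 1000)   # 1000 doubles, a stand-in for a NumPy array

raw = memoryview(data).nbytes      # bytes a buffer-based (uppercase) call would move
pickled = len(pickle.dumps(data))  # bytes after pickling, on top of the (un)pickling work

print(raw)            # 8000
print(pickled > raw)  # True
```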
## Point to point comms


In [18]:
%%writefile sum_pointpoint.py
from mpi4py import MPI
import sys
import numpy as np

def initialize(size, pid, npr):
    np.random.seed(pid)  # one seed per rank, so the chunks differ
    return np.random.uniform(0.0, 1.0, size=size)
    
def parallel_avg(data, pid, npr, comm):
    tag = 11
    sumLocal = np.array([np.sum(data)])  # 1-element buffer for the buffer-based Send
    if pid >= 1:
        comm.Send([sumLocal, MPI.DOUBLE], dest=0, tag=tag)
    else:
        sumTotal = sumLocal[0]
        buf = np.empty(1, dtype=np.float64)
        for ii in range(1, npr):
            comm.Recv([buf, MPI.DOUBLE], source=ii, tag=tag)
            sumTotal += buf[0]
        N = data.size*npr  # number of elements actually generated
        print(f"avg: {sumTotal/N:20.15e}")
        
def main():
    # MPI is initialized automatically when mpi4py.MPI is imported
    comm = MPI.COMM_WORLD
    npr = comm.Get_size()
    pid = comm.Get_rank()
    N = int(sys.argv[1])
    Nlocal = N // npr  # integer division: the remainder N % npr is dropped
    data = initialize(Nlocal, pid, npr)
    parallel_avg(data, pid, npr, comm)

if __name__ == '__main__':
    main()


Overwriting sum_pointpoint.py


In [23]:
!mpirun -np 10 python sum_pointpoint.py 1000000000

avg: 5.000068163660500e-01


Now implement a `slurm` script and compute the parallel metrics

## Collective communications


In [29]:
%%writefile sum_collective.py
from mpi4py import MPI
import sys
import numpy as np

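def initialize(size, pid, npr):
    np.random.seed(pid)  # one seed per rank, so the chunks differ
    return np.random.uniform(0.0, 1.0, size=size)
    
def parallel_avg(data, pid, npr, comm):
    sumLocal = np.sum(data)
    sumTotal = comm.allreduce(sumLocal, op=MPI.SUM) # What is allreduce?
    # Buffer-based alternative, where only rank 0 receives the result:
    # sendbuf = np.array([sumLocal]); recvbuf = np.zeros(1)
    # comm.Reduce([sendbuf, MPI.DOUBLE], [recvbuf, MPI.DOUBLE], op=MPI.SUM, root=0)
    # sumTotal = recvbuf[0]  # meaningful on rank 0 only
    if 0 == pid:
        N = data.size*npr
        print(f"avg: {sumTotal/N:20.15e}")
        
def main():
    # MPI is initialized automatically when mpi4py.MPI is imported
    comm = MPI.COMM_WORLD
    npr = comm.Get_size()
    pid = comm.Get_rank()
    N = int(sys.argv[1])
    Nlocal = N // npr  # integer division: the remainder N % npr is dropped
    data = initialize(Nlocal, pid, npr)
    parallel_avg(data, pid, npr, comm)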

if __name__ == '__main__':
    main()


Overwriting sum_collective.py


In [31]:
!mpirun -np 10 python sum_collective.py 1000000000

avg: 5.000068163660500e-01
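The comment in `parallel_avg` asks what `allreduce` is: a reduction whose result is delivered to every rank, equivalent to a reduce followed by a broadcast. Here every rank computes `sumTotal`, although only rank 0 prints it; `comm.reduce(sumLocal, op=MPI.SUM, root=0)` would return the sum on rank 0 and `None` elsewhere. One algorithm a library may use, recursive doubling, is simulated below in plain Python (hypothetical name `recursive_doubling_allreduce`); the sketch assumes the number of ranks is a power of two, and real implementations may pick other algorithms.

```python
def recursive_doubling_allreduce(values):
    """Simulate a sum allreduce by recursive doubling.

    values[r] is rank r's local value; len(values) must be a power of two.
    In each round, rank r exchanges its current partial sum with rank
    r XOR step and both add, so after log2(p) rounds every rank holds the total.
    """
    size = len(values)
    if size == 0 or size & (size - 1) != 0:
        raise ValueError("number of ranks must be a power of two")
    buf = list(values)
    step = 1
    while step < size:
        # All exchanges of a round happen "simultaneously": build the new list from old values.
        buf = [buf[r] + buf[r ^ step] for r in range(size)]
        step *= 2
    return buf


print(recursive_doubling_allreduce([1.0, 2.0, 3.0, 4.0]))  # [10.0, 10.0, 10.0, 10.0]
```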


Now implement a `slurm` script and compute the parallel metrics

There is a lot more to learn about MPI, such as [blocking/non-blocking](https://hpc-tutorials.llnl.gov/mpi/routine_args/) operations, many other [collective communications](https://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node64.html), [topologies](https://hpc-tutorials.llnl.gov/mpi/virtual_topologies/), and [parallel I/O](https://docs.alliancecan.ca/wiki/Parallel_I/O_introductory_tutorial) (though for parallel I/O it is usually better to use HDF5 or NetCDF).