# Variation 2: MPI Parallel HDF5

**Table of contents:**
 - [Dividing the work](#Dividing-the-work)
 - [Passing arguments to our program](#Passing-arguments-to-our-program)
 - [Writing HDF5 files](#Writing-HDF5-files)
 - [Discussion](#Discussion)

In this section of the tutorial, we learn how to use the __[Message Massing Interface (MPI)](https://en.wikipedia.org/wiki/Message_Passing_Interface)__ to parallelize the sample path generation, and how multiple processes can simultaneously write to a shared HDF5 file. I turns out that apart from MPI boilerplate and the code that partitions the workload, the original code remains largely unchanged.

This is not an introduction to MPI. You can think of it as a framework that allows us to spawn multiple (many!) processes in a single machine or across a cluster of machines, which can then exchange messages process-to-process, or collectively among groups of processes. Given `nranks` $\geq 1$ processes, each process is identified by its `rank`, where $0\leq$ `rank` $<$ `nranks`.


Our goal is to store the floating-point values of `path_count` series with `step_count` values each in a two-dimensional array of `path_count` rows and `step_count` columns. Instead of a single process, we have `nranks` processes at our disposal for the sample path generation. The first point of discussion is how we divide the work among `nranks` processes. Since passing arguments to the program(s) is identical to the original program, there isn't much to new to say about that. The only other novelty is to explore how multiple processes write different parts of a single dataset (`/data`). In the section dealing with an unknown amount of data, we saw selections and partial I/O as a means of incrementally writing a dataset. We will use a similar technique when writing in parallel.

## Dividing the work

Given the task of creating `path_count` sample paths (of length `step_count`), and having `nranks` processes to accomplish that task, it is reasonable to evenly divide the work among processes. Since `path_count` may not be divisible by `nranks`, there will be a few ranks to which we assign an extra path. The function `partition_work()`, shown below, implements this idea. Given a total of `path_count` paths and `nranks` processes, for each process `rank`, it computes the range of rows [`start`, `stop`) for which the process will generate sample paths. In other words, process `rank` will generate and write the row-block `[start,stop) x step_count` of the two-dimensional dataset `/data` with dimensions `path_count x step_count`.

In [None]:
%%writefile src/partitioner.hpp

#ifndef PARTITIONER_HPP
#define PARTITIONER_HPP

#include <cstddef>

extern void partition_work(std::size_t path_count, int rank, int nranks, std::size_t &start, std::size_t &stop);

#endif

In [None]:
%%writefile src/partitioner.cpp
#include "partitioner.hpp"

using namespace std;

void partition_work(size_t path_count, int rank, int nranks, size_t &start, size_t &stop)
{
  size_t count = path_count / nranks;
  size_t remainder = path_count % nranks;

  if ((size_t)rank < remainder) {
    // The first 'remainder' ranks get 'count + 1' tasks each
    start = rank * (count + 1);
    stop = start + count;
  } else {
    // The remaining 'nranks - remainder' ranks get 'count' task each
    start = rank * count + remainder;
    stop = start + (count - 1);
  }
}

In [None]:
%%bash
g++ -std=c++17 -Wall -pedantic -I./include -c ./src/partitioner.cpp -o ./build/partitioner.o

## Passing arguments to our program

By now, this should be a familiar theme. We need to account for an extra argument, `--use_subfiling`, that tells us whether to use [sub-filing](https://docs.hdfgroup.org/hdf5/rfc/RFC_VFD_subfiling_200424.pdf), i.e., we use multiple sub-files instead of a single shared file.

In [None]:
%%writefile src/parse_arguments2.hpp
#ifndef PARSE_ARGUMENTS2_HPP
#define PARSE_ARGUMENTS2_HPP

#include "argparse.hpp"

// Sets the options for which we are looking
extern void set_options2(argparse::ArgumentParser& program);

// Tests the options and retrieves the arguments
extern int get_arguments2
(
    const argparse::ArgumentParser& program,
    size_t&                         path_count,
    size_t&                         step_count,
    double&                         dt,
    double&                         theta,
    double&                         mu,
    double&                         sigma,
    bool&                           use_subfiling
);

#endif

In [None]:
%%writefile src/parse_arguments2.cpp
#include "parse_arguments2.hpp"
#include <cfloat>
#include <iostream>

using namespace std;

void set_options2(argparse::ArgumentParser& program)
{
    program.add_argument("-p", "--paths")  // command line arguments
    .help("chooses the numnber of paths")  // synopsis
    .default_value(size_t{100})            // default value
    .scan<'u', size_t>();                  // expected type

    program.add_argument("-s", "--steps")
    .help("chooses the number of steps")
    .default_value(size_t{1000})
    .scan<'u', size_t>();

    program.add_argument("-d", "--dt")
    .help("chooses the time step")
    .default_value(double{0.01})
    .scan<'f', double>();

    program.add_argument("-t", "--theta")
    .help("chooses the rate of reversion to the mean")
    .default_value(double{1.0})
    .scan<'f', double>();

    program.add_argument("-m", "--mu")
    .help("chooses the long-term mean of the process")
    .default_value(double{0.0})
    .scan<'f', double>();

    program.add_argument("-g", "--sigma")
    .help("chooses the volatility of the process")
    .default_value(double{0.1})
    .scan<'f', double>();

    program.add_argument("--use_subfiling");
}

int get_arguments2
(
    const argparse::ArgumentParser& program,
    size_t&                         path_count,
    size_t&                         step_count,
    double&                         dt,
    double&                         theta,
    double&                         mu,
    double&                         sigma,
    bool&                           use_subfiling
)
{
    path_count = program.get<size_t>("--paths");
    if (path_count == 0) {
        cerr << "Number of paths must be greater than zero" << endl;
        return -1;
    }
    step_count = program.get<size_t>("--steps");
    if (step_count == 0) {
        cerr << "Number of steps must be greater than zero" << endl;
        return -1;
    }
    dt = program.get<double>("--dt");
    if (dt < DBL_MIN) {
        cerr << "Time step must be greater than zero" << endl;
        return -1;
    }
    theta = program.get<double>("--theta");
    if (theta < DBL_MIN) {
        cerr << "Reversion rate must be greater than zero" << endl;
        return -1;
    }
    mu = program.get<double>("--mu");
    sigma = program.get<double>("--sigma");
    if (sigma < DBL_MIN) {
        cerr << "Volatility must be greater than zero" << endl;
        return -1;
    }
    use_subfiling = program.is_used("--use_subfiling");

    return 0;
}

In [None]:
%%bash
g++ -std=c++17 -Wall -pedantic -I./include -c ./src/parse_arguments2.cpp -o ./build/parse_arguments2.o

## Writing HDF5 files

Lines 15-36 are just MPI boilerplate to initialize MPI and to determine the total number of MPI ranks, `nprocs`, and the rank of the current process, `myid`.
Each process obtains its workpackage from the `partition_work()` function (line 67). We know already how to create sample paths and can reuse the existing `ou_sampler()` function. (Line 70)

In lines 91-99, we construct the hyperslab selections ("NumPy slices"), in-memory and in-file, and are ready to call `H5Dwrite()` in line 105.

The attribute decoration (lines 116-131) works exactly the same as in the original code.

In [None]:
%%writefile src/ou_hdf5_mpi.cpp
#include "parse_arguments.hpp"
#include "parse_arguments2.hpp"
#include "partitioner.hpp"
#include "ou_sampler.hpp"

#include "hdf5.h"
#include <iostream>
#include <vector>

using namespace std;

int main(int argc, char *argv[])
{
    // <ALTERNATIVE>:: The standard MPI IO File Driver only requires:
    //
    // MPI_Init(&argc, &argv);
    //
    // However, the subfiling VFD requires MPI_Init_thread with MPI_THREAD_MULTIPLE

    int mpi_thread_required = MPI_THREAD_MULTIPLE;
    int mpi_thread_provided = 0;

    MPI_Init_thread(&argc, &argv, mpi_thread_required, &mpi_thread_provided);
    if (mpi_thread_provided < mpi_thread_required)
    {
      cout << "MPI_THREAD_MULTIPLE not supported" << endl;
      MPI_Abort(MPI_COMM_WORLD, -1);
    }

    // Get the number of processes
    int nprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    // Get the rank of the process
    int myid;
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    size_t path_count, step_count;
    double dt, theta, mu, sigma;
#ifdef H5_HAVE_SUBFILING_VFD    
    bool subfiling;
#endif    

    argparse::ArgumentParser program("ou_hdf5_mpi");
    set_options(program);
    program.parse_args(argc, argv);
#ifdef H5_HAVE_SUBFILING_VFD
    get_arguments2(program, path_count, step_count, dt, theta, mu, sigma, subfiling);
#else
    get_arguments(program, path_count, step_count, dt, theta, mu, sigma);
#endif     

    if (myid == 0)
    {
        cout << "Running on " << nprocs << " MPI ranks with parameters:"
            << " paths=" << path_count << " steps=" << step_count
            << " dt=" << dt << " theta=" << theta << " mu=" << mu << " sigma=" << sigma
#ifdef H5_HAVE_SUBFILING_VFD
            << " subfiling=" << subfiling
#endif
            << endl;
    }

    vector<double> ou_process;
    
    size_t start, stop;
    partition_work(path_count, myid, nprocs, start, stop);
    size_t my_path_count = stop - start + 1;

    ou_sampler(ou_process, my_path_count, step_count, dt, theta, mu, sigma);
    
    // Use the Subfiling or MPI-IO driver
    hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
#ifdef H5_HAVE_SUBFILING_VFD
    if(subfiling)
      H5Pset_fapl_subfiling(fapl, NULL);
    else
#endif
      H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);

    //
    // Write the sample paths to an HDF5 file using the HDF5 C-API!
    //
    auto file = H5Fcreate("ou_process.2.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

    { // create & write the dataset
        hsize_t dimsf[] = {(hsize_t)path_count, (hsize_t)step_count};
        auto filespace = H5Screate_simple(2, dimsf, NULL);
        auto dataset = H5Dcreate(file, "/dataset", H5T_NATIVE_DOUBLE, filespace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
        
        // Define, by rank, a selection in memory and write it to a hyperslab in the file.
        hsize_t count[]  = {my_path_count, step_count};
        hsize_t offset[] = {start, 0};
        
        hid_t memspace = H5Screate_simple(2, count, NULL);

        // Select hyperslab in the file.
        // hid_t filespace = H5Dget_space(dataset);
        H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, count, NULL);

        // <OPTIONAL> Create property list for collective dataset write.
        hid_t dxpl = H5Pcreate(H5P_DATASET_XFER);
        H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);

        H5Dwrite(dataset, H5T_NATIVE_DOUBLE, memspace, filespace, dxpl, ou_process.data());

        // housekeeping
        H5Pclose(dxpl);
        H5Sclose(memspace);
        H5Dclose(dataset);
        H5Pclose(fapl);
        H5Sclose(filespace);
    }

    { // make the file self-describing by adding a few attributes to `dataset`
        auto scalar = H5Screate(H5S_SCALAR);
        auto acpl = H5Pcreate(H5P_ATTRIBUTE_CREATE);
        H5Pset_char_encoding(acpl, H5T_CSET_UTF8);
        
        auto set_attribute = [&](const string& name, const double& value) {
            auto attr = H5Acreate_by_name(file, "dataset", name.c_str(), H5T_NATIVE_DOUBLE, scalar, acpl, H5P_DEFAULT, H5P_DEFAULT);
            H5Awrite(attr, H5T_NATIVE_DOUBLE, &value);
            H5Aclose(attr);
        };
        set_attribute("dt", dt);
        set_attribute("θ", theta);
        set_attribute("μ", mu);
        set_attribute("σ", sigma);

        H5Pclose(acpl);
        H5Sclose(scalar);
    }

    H5Fclose(file);

    MPI_Finalize();

    return 0;
}

In [None]:
%%bash
mpicxx -std=c++17 -Wall -pedantic -I/usr/include/hdf5/openmpi -L/usr/lib/x86_64-linux-gnu -I./include  ./src/ou_hdf5_mpi.cpp ./build/parse_arguments.o ./build/parse_arguments2.o ./build/ou_sampler.o ./build/partitioner.o -o ./build/ou_hdf5_mpi -lhdf5_openmpi -lmpi
mpiexec --host localhost:2 -n 2 ./build/ou_hdf5_mpi -p 10000
ls -iks ou_process.2.h5

## Discussion

In this section, we saw a simple example of multiple MPI processes simutaneously writing to the same dataset in a shared HDF5 file. This divide-and-conquer pattern is common practice in High-Performance Computing to efficiently read and write extremely large datasets across many thousands of processes. 