# Multiprocessing and Distributed Computing

**What**
* **single Julia process → multiple Julia processes** that coordinate to perform certain computations

**Why**
* **Scaling things up**: run computations on multiple CPU cores, potentially even on different machines, e.g. nodes of a supercomputer or a local cluster of desktop machines.
* Effectively increase your total memory, e.g. to process a large dataset that wouldn't fit into local memory.

**Two fundamental implementations and paradigms are available in Julia**
* Julia's built-in [Distributed.jl](https://docs.julialang.org/en/v1/stdlib/Distributed/) standard library
* [Message Passing Interface (MPI)](https://www.mpi-forum.org/) through [MPI.jl](https://github.com/JuliaParallel/MPI.jl)

 

## Distributed.jl (standard library) vs MPI

**Distributed.jl**
* convenient for **"ad-hoc" distributed computing** (e.g. data processing)
* intuitive **master-worker model** (often naturally aligns with the structure of scientific computations)
* "one-sided" communication
* **interactivity**, e.g. in a REPL / in Jupyter
* built-in, no external setup necessary
* higher overhead than MPI and doesn't scale as well (by default, it doesn't utilize high-speed interconnects such as InfiniBand/Omni-Path)

**MPI**
* **de facto industry standard** for massively parallel computing, e.g. large-scale distributed computing
* **known to scale well** up to thousands of compute nodes
* Single Program Multiple Data (SPMD) programming model (can be more challenging at first)
* **No (or very poor) interactivity** (see [MPIClusterManagers.jl](https://github.com/JuliaParallel/MPIClusterManagers.jl) or [tmpi](https://github.com/Azrael3000/tmpi))


The focus of this notebook is on **MPI** (Distributed.jl → later).

# Distributed Computing: Message Passing Interface (MPI)

Passing messages (i.e. moving data between processes) can be very costly in a distributed program. Reducing the number of messages and the amount of data sent is critical to achieving performance and scalability. For these reasons, MPI is centered around *explicit* message passing: the programmer decides exactly which data is sent, when, and to whom.

## MPI and MPI.jl

* **[MPI](https://www.mpi-forum.org/)**: A [standard](https://www.mpi-forum.org/docs/) with several specific implementations (e.g. [OpenMPI](https://www.open-mpi.org/), [IntelMPI](https://www.intel.com/content/www/us/en/developer/tools/oneapi/mpi-library.html#gs.73krlr), [MPICH](https://www.mpich.org/))
* **[MPI.jl](https://github.com/JuliaParallel/MPI.jl)**: Julia package and interface to (most) MPI implementations ([paper](https://proceedings.juliacon.org/papers/10.21105/jcon.00068))



### MPI programming model

<img src="./imgs/mpi_model.svg" width=850>

**Single Program Multiple Data (SPMD):**
* **all processes execute the same code** but have different IDs (ranks).
* **conditionals can be used to simulate different behavior (MPMD)**
* individual processes run at their own pace, **they can (and will) get out of sync**
* the number of processes isn't fixed in the code; it is chosen at launch time ("runtime")

**Message passing:**
* **Two-sided communication:** explicit `Send` and explicit `Recv`
* Conceptually, a message is a tuple (memory address, size, datatype) (e.g. a `Vector{Float64}`)
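MPI.jl makes this (address, size, datatype) triple explicit with the `MPI.Buffer` type, which communication functions such as `MPI.Send` construct internally from a Julia array. A minimal sketch, assuming MPI.jl v0.20 (where `MPI.Buffer(arr)` stores the array together with its element count and the matching MPI datatype), that inspects the buffer for a `Vector{Float64}`:

```julia
using MPI

MPI.Initialized() || MPI.Init()

data = zeros(Float64, 10)     # the message payload (a Julia array)
buf  = MPI.Buffer(data)       # wraps the array as (data, count, datatype)

println("count    = ", buf.count)     # number of elements in the message
println("datatype = ", buf.datatype)  # MPI datatype corresponding to Float64
```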

### Basic example: Hello World

```julia
using MPI

MPI.Init()

comm   = MPI.COMM_WORLD
rank   = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)

println("Hello world, I am rank $rank of $nranks")

MPI.Finalize()

# see mpi_examples/1_mpi_hello.jl
```

#### Fundamental MPI functions

* `MPI.Init()` and `MPI.Finalize()` (the latter is optional in Julia: by default, MPI.jl finalizes MPI automatically when Julia exits)

* `MPI.COMM_WORLD`: default *communicator*, includes all MPI ranks

* `MPI.Comm_rank(comm)`: unique rank of the process calling this function (**MPI rank ids start at 0!**)

* `MPI.Comm_size(comm)`: total number of ranks in the given communicator

### Running an MPI program

#### Getting MPI

By default, an appropriate MPI implementation is **automatically** downloaded when MPI.jl is added to a Julia environment (see e.g. MPICH_jll.jl). This works out of the box more often than not.

However, in particular on larger HPC clusters, one sometimes wants/needs to use a **system-wide MPI** installation. Potential reasons include:

* Vendor-specific MPI required for MPI to work at all.
* Fine-tuned MPI configuration necessary for best performance.
* CUDA-aware or ROCm-aware MPI

##### How to use a system MPI?

```julia
using MPIPreferences
MPIPreferences.use_system_binary()
```
If you do this before adding MPI.jl, no MPI will be downloaded.

For more, check out the [MPI.jl documentation](https://juliaparallel.org/MPI.jl/stable/configuration/#configure_system_binary).

#### Parallel startup: `mpiexecjl` driver

MPI.jl provides [**`mpiexecjl`**](https://juliaparallel.org/MPI.jl/stable/configuration/#Julia-wrapper-for-mpiexec) that wraps, e.g., `mpirun`, `mpiexec`, or `srun` (configurable) to run your Julia MPI program. This enables
* the use of different MPI implementations for different Julia environments/projects
* the use of the default MPI that was installed by MPI.jl automatically

**You should use the following command to run your MPI application:**

```
mpiexecjl --project -n N julia mycode.jl
```

Here, `N` is the desired number of MPI ranks.
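The `mpiexecjl` script is not on your `PATH` automatically; MPI.jl ships a helper that installs it. A sketch, assuming `MPI.install_mpiexecjl` with its default destination (the `bin` folder of the first Julia depot, which you then add to your `PATH` yourself):

```julia
using MPI

# Installs the `mpiexecjl` wrapper script into joinpath(DEPOT_PATH[1], "bin"),
# the default destination; `force=true` overwrites an existing copy.
MPI.install_mpiexecjl(force=true)

# Directory to add to PATH, e.g. in a shell: export PATH="$HOME/.julia/bin:$PATH"
println(joinpath(DEPOT_PATH[1], "bin"))
```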

<img src="./imgs/julia_mpi_example.png" width=700>

## Comment: MPI.jl vs MPI in C

MPI.jl is very similar to [mpi4py](https://mpi4py.readthedocs.io/en/stable/) for Python (see [this paper](https://proceedings.juliacon.org/papers/10.21105/jcon.00068)).

**Advantages**

* Julia MPI functions typically have **fewer function arguments** than their C counterparts (see below).
* MPI.jl functions can often automatically register and handle **custom Julia types and functions** (see e.g. `mpi_examples/6_mpi_custom_reduction.jl`)

**Disadvantages**

* Not every (exotic) function of the MPI API is wrapped yet.
* Minor translation necessary when porting C/Fortran code.

If you really need to, you can use the "low-level" C-API via `MPI.API.*`, which is identical to what you might know from C.

## Point-to-point communication

* **Sending:**
    ```julia
              MPI.Send(buf, comm; dest)
              
    ```
  * `buf`: data buffer, typically an array
  * `comm`: communicator
  * `dest`: target rank (to receive the data)
* **Receiving:**
    ```julia
              MPI.Recv!(recvbuf, comm; source=MPI.ANY_SOURCE)
              
    ```
  * `recvbuf`: buffer to store the received data in, typically an array
  * `comm`: communicator
  * `source`: source rank (who is sending the data); `MPI.ANY_SOURCE` accepts a message from any rank

These functions have **blocking semantics**, i.e. the calling process waits for the operation to "complete"!

(Note that for sending, "complete" only means that the buffer can be reused but not necessarily that the full communication has happened (e.g. the message sending might have completed but the receiving hasn't started yet). A good explanation of the different MPI sending modes is available in [this stackoverflow post](https://stackoverflow.com/a/47041382/2365675).)

Example:

<img src="./imgs/mpi_sendrecv.svg" width=300px>

```julia
msg = fill(rank, 10) # Vector{Int64} of length 10 filled with rank id

if rank != 0
    MPI.Send(msg, comm; dest=0) # blocking
else
    println(msg)
    for r in 1:nranks-1
        MPI.Recv!(msg, comm; source=r) # blocking
        println(msg)
    end
end

# full example, see mpi_examples/2_mpi_blocking_communication.jl
```

### Deadlocks
Note that blocking point-to-point communication **can lead to deadlocks!**

<img src="./imgs/mpi_deadlock.svg" width=400px>

```julia
# ring topology, i.e. periodic boundary conditions
left  = mod(rank - 1, nranks)
right = mod(rank + 1, nranks)

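# Warning: deadlock! Every rank blocks in Recv! waiting for its left neighbor,
# which is itself stuck in Recv! and never reaches its Send.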
MPI.Recv!(msg, comm; source=left)
MPI.Send(msg, comm; dest=right)

# the following will never be reached
```

**Solutions:**
* Alternate order of send and receive on neighboring ranks.
* `MPI.Sendrecv!`
* Non-blocking communication
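For the ring above, the `MPI.Sendrecv!` fix can be sketched as follows (assuming MPI.jl v0.20's keyword-based signature `MPI.Sendrecv!(sendbuf, recvbuf, comm; dest, source)`; buffer names are illustrative). Because the send and the receive are posted as one operation, the library can complete both without the circular wait:

```julia
using MPI

MPI.Initialized() || MPI.Init()
comm   = MPI.COMM_WORLD
rank   = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)

# ring topology, i.e. periodic boundary conditions
left  = mod(rank - 1, nranks)
right = mod(rank + 1, nranks)

send_msg = fill(rank, 10)      # data passed on to the right neighbor
recv_msg = similar(send_msg)   # separate buffer for data from the left neighbor

# combined send + receive: no deadlock, regardless of the order in which ranks arrive
MPI.Sendrecv!(send_msg, recv_msg, comm; dest=right, source=left)

println("rank $rank received data from rank $(recv_msg[1])")
```

Run it like the other examples, e.g. `mpiexecjl --project -n 4 julia ring_sendrecv.jl` (file name hypothetical).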

### Non-blocking communication

**Why?**

* Avoid deadlocks
* Avoid serialization/sequentialization
* Overlapping of communication with computations and/or other communication

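The function signatures are essentially the same as above, but the names differ (prefix `I` for "immediate") and so does the behavior: the calls return immediately instead of waiting for the operation to complete.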

* **Sending:**
    ```julia
              req = MPI.Isend(buf, comm[, req]; dest)
    ```
* **Receiving:**
    ```julia
              req = MPI.Irecv!(recvbuf, comm[, req]; source)
    ```
    
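Each function returns an `MPI.Request` (`req`), which can be passed to (blocking) waiting operations such as `MPI.Wait(req)` or `MPI.Waitall(reqs)`, or to testing operations. Optionally, the request object may be preallocated and passed as a third argument. **Until a request has completed, its buffer must not be modified (send) or read (receive).**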

```julia
# ring topology, i.e. periodic boundary conditions
left  = mod(rank - 1, nranks)
right = mod(rank + 1, nranks)

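# non-blocking communication → no deadlock
# (separate buffers: a buffer in use by a pending request must not be reused)
send_msg = fill(rank, 10)
recv_msg = similar(send_msg)
req1 = MPI.Irecv!(recv_msg, comm; source=left)
req2 = MPI.Isend(send_msg, comm; dest=right)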

# blocking wait
MPI.Waitall([req1, req2])

# the following will be reached

# full example, see mpi_examples/3_mpi_ring_communication_nonblocking.jl
```
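The overlap of communication with computation can be sketched like this: post the non-blocking receive and send, do work that touches neither message buffer, and only wait once the received data is actually needed. A minimal sketch, assuming MPI.jl v0.20's keyword API; `local_data` is a hypothetical stand-in for real rank-local work:

```julia
using MPI

MPI.Initialized() || MPI.Init()
comm   = MPI.COMM_WORLD
rank   = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)

left  = mod(rank - 1, nranks)
right = mod(rank + 1, nranks)

send_msg = fill(Float64(rank), 10)
recv_msg = zeros(Float64, 10)

# 1. start communication (both calls return immediately)
reqs = [MPI.Irecv!(recv_msg, comm; source=left),
        MPI.Isend(send_msg, comm; dest=right)]

# 2. local work that touches neither send_msg nor recv_msg
local_data = collect(1.0:1000.0)   # hypothetical rank-local data
local_sum  = sum(abs2, local_data)

# 3. wait for completion before reading recv_msg (or modifying send_msg)
MPI.Waitall(reqs)

result = local_sum + sum(recv_msg)
println("rank $rank: result = $result")
```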

## Collective communication

Perform operations involving **all processes/ranks** within a communicator.

* Types of collective operations
  * **synchronization**: let all ranks wait until all have reached a synchronization point (*barrier*)
  * **data movement**: one-to-many and many-to-many communications (broadcast, scatter, gather, all to all)
  * **collective computation**: parallel reduction (e.g. summation) of rank-local information → result on a designated root rank (the "master")

### Synchronization

* Example: **Barrier**
    ```julia
              MPI.Barrier(comm)
    ```

Each rank blocks until **all ranks in the communicator have reached the barrier**.

Exemplary use-case: time measurement.

```julia
MPI.Barrier(comm)                         # synchronize all ranks
time_start = MPI.Wtime()                  # start the clock

@time sleep(rank)                         # perform computations that take different amounts of time

MPI.Barrier(comm)                         # synchronize all ranks
elapsed_time = MPI.Wtime() - time_start   # stop the clock

# full example: mpi_examples/4_mpi_wtime.jl
```

### Data movement

* Example: **Broadcasting**
    ```julia
              MPI.Bcast!(buffer, comm; root=0)
              
    ```
  * `buffer`: source buffer for the `root` rank, destination buffer for all other ranks
  * `comm`: communicator
  * `root`: source rank (holds the data to be broadcasted)

<br>
<img src="imgs/mpi_bcast.svg" width=700>
<br>

Broadcasting can be implemented by means of the point-to-point communications → **exercise!**
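For reference, a typical call of `MPI.Bcast!` itself: every rank allocates a buffer of the same size and element type, but only the root's contents matter before the call. A minimal sketch, assuming MPI.jl v0.20 (the parameter values are illustrative):

```julia
using MPI

MPI.Initialized() || MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

# all ranks need a buffer of identical size and element type
params = zeros(Float64, 3)
if rank == 0
    params .= [1.0, 2.5, 4.0]   # e.g. values only the root has read from a file
end

MPI.Bcast!(params, comm; root=0)  # afterwards, every rank holds the root's values

println("rank $rank: params = $params")
```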

### Collective computation

* Example: **Reduction**
    ```julia
              recvbuf = MPI.Reduce(sendbuf, op, comm; root=0)
              
    ```
  * `sendbuf`: rank-local data buffer to be sent/reduced
  * `op`: reduction operation (e.g. `+` or `min`)
  * `comm`: communicator
  * `root`: target rank (to receive the result)
  * `recvbuf`: result of parallel reduction on `root` rank, `nothing` on all other ranks
  
<br>
<img src="imgs/mpi_reduce.svg" width=785>
<br>

Exemplary use-case: Parallel **trapezoidal integration**.

<img src="imgs/trapezoids.png" width=400>
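Written out, the composite trapezoidal rule used by `integrate_trapezoidal` below, with $n$ subintervals of width $h$ on $[a, b]$:

$$
\int_a^b f(x)\,dx \approx h\left[\frac{f(a) + f(b)}{2} + \sum_{i=1}^{n-1} f(a + i\,h)\right], \qquad h = \frac{b - a}{n}.
$$

Since the integral is additive over subintervals, each rank can apply this rule to its own part of $[0, 1]$, and the partial results only need to be summed: a reduction with `+`.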

```julia
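# full example: mpi_examples/5_mpi_trapezoidal_integration.jl
using MPI, Printf   # Printf provides @printf

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)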

"Function to be integrated (from 0 to 1). The analytic result is π."
f(x) = 4 * √(1 - x^2)

"Evaluate definite integral (from `a` to `b`) by using the trapezoidal rule."
function integrate_trapezoidal(a, b, n, h)
    y = (f(a) + f(b)) / 2.0
    for i in 1:n-1
        x = a + i * h
        y = y + f(x)
    end
    return y * h
end

# compute local integration interval etc....

# perform local integration
res_loc = integrate_trapezoidal(a_loc, b_loc, n_loc, h)

# parallel reduction (result on the root rank 0, `nothing` on all other ranks)
res = MPI.Reduce(res_loc, +, comm; root=0)

# print result
if 0 == rank
    @printf("π (numerical integration) ≈ %20.16f\n", res)
end
```
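One way to fill in the step `# compute local integration interval etc....` is sketched below. `local_interval` is a hypothetical helper (the full example file may do this differently): it gives each rank an equal share of the `n` trapezoids, assuming `n` is divisible by `nranks`. The two functions from above are repeated so the block runs standalone, and looping over all ranks serially checks the decomposition without MPI:

```julia
"Function to be integrated (from 0 to 1). The analytic result is π."
f(x) = 4 * √(1 - x^2)

"Trapezoidal rule on [a, b] with n subintervals of width h."
function integrate_trapezoidal(a, b, n, h)
    y = (f(a) + f(b)) / 2.0
    for i in 1:n-1
        y += f(a + i * h)
    end
    return y * h
end

"""
Hypothetical helper: sub-interval [a_loc, b_loc] and number of trapezoids n_loc
assigned to `rank` when n trapezoids on [a, b] are split evenly among `nranks`.
"""
function local_interval(rank, nranks, a, b, n)
    @assert n % nranks == 0 "n must be divisible by nranks"
    h     = (b - a) / n                        # global trapezoid width (same on every rank)
    n_loc = n ÷ nranks                         # number of trapezoids per rank
    a_loc = a + (b - a) * rank / nranks        # left end of this rank's piece
    b_loc = a + (b - a) * (rank + 1) / nranks  # right end (≈ b for the last rank)
    return a_loc, b_loc, n_loc, h
end

# serial emulation: summing the local results mimics MPI.Reduce(res_loc, +, comm)
nranks = 4
n      = 1_000_000
res = sum(0:nranks-1) do rank
    a_loc, b_loc, n_loc, h = local_interval(rank, nranks, 0.0, 1.0, n)
    integrate_trapezoidal(a_loc, b_loc, n_loc, h)
end
println("π ≈ ", res, " (error ", abs(res - π), ")")
```

In the MPI version, each rank would call `local_interval(rank, nranks, 0.0, 1.0, n)` only for its own `rank` and pass the result to `integrate_trapezoidal`.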

## MPI + CUDA

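The auto-shipped Julia artifacts for MPI are not CUDA-aware → use a CUDA-aware system MPI library (see "How to use a system MPI?" above).

Experience:

* the system setup isn't always trivial/stable
* but once it works, using MPI + CUDA is easy: `CuArray`s can be passed directly to MPI functions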

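```julia
using MPI, CUDA

MPI.Init()
comm   = MPI.COMM_WORLD
rank   = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)

# Example: sending and receiving CuArrays (ring: send right, receive from left)
N   = 1024   # message length (illustrative)
dst = mod(rank + 1, nranks)
src = mod(rank - 1, nranks)
send_mesg = CuArray{Float64}(undef, N)
recv_mesg = CuArray{Float64}(undef, N)
fill!(send_mesg, Float64(rank))

# pass GPU buffers (CuArrays) into MPI functions
MPI.Sendrecv!(send_mesg, recv_mesg, comm; dest=dst, source=src)

# Full examples, see mpi_examples/mpi_cuda_aware/*.jl
```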

## Profiling

Recommendation:
* [NVIDIA Nsight Systems](https://developer.nvidia.com/nsight-systems) + [NVTX.jl](https://github.com/JuliaGPU/NVTX.jl) (for instrumentation)

<img src="imgs/report1.png" width=800px>

(source: `notebooks/backup/mpi_profiling_nsys`)
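Instrumenting with NVTX.jl amounts to wrapping code regions in named ranges, which then appear on the Nsight Systems timeline of each rank (each rank is run under `nsys profile`). A minimal sketch, assuming NVTX.jl's `NVTX.@range` macro; the region names and the workload are illustrative:

```julia
using MPI, NVTX

MPI.Initialized() || MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

# named range around the computation
x = NVTX.@range "compute" begin
    sum(abs2, rand(10^6))          # stand-in for the real computation
end

# separate range around the communication, so both show up distinctly in the profile
total = NVTX.@range "reduce" begin
    MPI.Reduce(x, +, comm; root=0) # result on rank 0, `nothing` elsewhere
end
```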

Other options:
* [Extrae.jl](https://github.com/bsc-quantic/Extrae.jl) (beta)
* [ScoreP.jl](https://github.com/JuliaPerf/ScoreP.jl) (experimental)

## High-level tools

* [PartitionedArrays.jl](https://github.com/fverdugo/PartitionedArrays.jl): Data-oriented parallel implementation of partitioned vectors and sparse matrices needed in FD, FV, and FE simulations.
* [Elemental.jl](https://github.com/JuliaParallel/Elemental.jl): A package for dense and sparse distributed linear algebra and optimization.
* [PETSc.jl](https://github.com/JuliaParallel/PETSc.jl): Suite of data structures and routines for the scalable (parallel) solution of scientific applications modeled by partial differential equations. ([original website](https://petsc.org/release/))
* ...

## More? Good MPI resources

* Great [MPI self-study materials](https://www.hlrs.de/training/self-study-materials/mpi-course-material) provided by HLRS (slides + exercises).
* [Using MPI: Portable Parallel Programming with the Message-Passing Interface](https://mitpress.mit.edu/9780262527392/using-mpi/), by William Gropp, Ewing Lusk and Anthony Skjellum (book)