# Multiprocessing and Distributed Computing

**What**
* **single Julia process → multiple Julia processes** that coordinate to perform certain computations

**Why**
* **Scaling things up**: run computations on multiple CPU cores, potentially even on different machines, e.g. nodes of a supercomputer or a local cluster of desktop machines.
* Effectively increase your total memory, e.g. to process a large dataset that wouldn't fit into local memory.

**Julia provides two fundamental implementations and paradigms**
* Julia's built-in [Distributed.jl](https://docs.julialang.org/en/v1/stdlib/Distributed/) standard library
* [Message Passing Interface (MPI)](https://www.mpi-forum.org/) through [MPI.jl](https://github.com/JuliaParallel/MPI.jl)

 

## Distributed.jl (standard library) vs MPI

**Distributed.jl**
* convenient for **"ad-hoc" distributed computing** (e.g. data processing)
* intuitive **master-worker model** (often naturally aligns with the structure of scientific computations)
* "one-sided" communication
* **interactivity**, e.g. in a REPL / in Jupyter
* built-in, no external setup necessary
* higher overhead than MPI and doesn't scale as well (by default doesn't utilize InfiniBand/Omni-Path)

**MPI**
* **de-facto industry standard** for massively parallel computing, e.g. large scale distributed computing
* **known to scale well** up to thousands of compute nodes
* Single Program Multiple Data (SPMD) programming model (can be more challenging at first)
* **No (or very poor) interactivity** (see [MPIClusterManagers.jl](https://github.com/JuliaParallel/MPIClusterManagers.jl) or [tmpi](https://github.com/Azrael3000/tmpi))


The focus of this notebook is on **Distributed.jl** (MPI → later).

## Distributed.jl programming model

<img src="./imgs/distributedjl_model.svg" width=850>

**Master-worker paradigm**:
* One master process coordinates a set of worker processes (that eventually perform computations).
* Programmer only controls the master directly. The workers are "instructed" (**one-sided** communication).



## Adding and removing workers

In [None]:
using Distributed

In [None]:
nprocs()

In [None]:
nworkers() # the master is considered a worker as long as there are no other workers

To increase the number of workers, i.e. Julia processes, we can call **`addprocs`**.

(Alternatively, one can set the number of worker processes when starting julia with the `-p` option. E.g. `julia -p 4` gives 5 processes, 1 master and 4 workers.)

In [None]:
addprocs(4)

Every process has a Julia internal `pid` (process id). **The master always has pid 1.** You can get the worker pids from `workers()`.

In [None]:
workers()

To remove workers, we can pass an array of pids to `rmprocs`.

In [None]:
rmprocs(workers())

In [None]:
nworkers() # only the master is left

In [None]:
addprocs(4)

In [None]:
workers()

### One master to rule them all - `@spawn`, `@spawnat`, `@fetch`, `@fetchfrom`, `@everywhere`...

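To submit computations to any worker we can use the following macro:

* `@spawn`: run a command or a code block on any worker and return a `Future` (a reference to the not-yet-available result) for its result. (Since Julia 1.3, `@spawn` is deprecated in favor of the equivalent `@spawnat :any`.)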

Note the conceptual similarity between `Threads.@spawn` (task → thread) and `Distributed.@spawn` (task → process).

In [None]:
@spawn 3+3

In [None]:
result = @spawn 3+3

In [None]:
fetch(result)

`@fetch` combines `@spawn` and `fetch` in one command. It has **blocking** semantics and only returns once the result has arrived.

In [None]:
@fetch 3+3

In [None]:
@fetch rand(3,3)
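
To make the difference between non-blocking and blocking calls concrete, here is a minimal sketch. It assumes that four workers are available (the `addprocs` guard only matters when the code is run outside this notebook) and uses `@spawnat :any`, the documented replacement for `@spawn`.

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # only needed when run outside this notebook

# Launch four one-second tasks. Each @spawnat call returns a Future right away.
futures = map(1:4) do _
    @spawnat :any begin
        sleep(1)
        myid()
    end
end

# fetch blocks until the corresponding result has arrived.
t0 = time()
ids = fetch.(futures)
println("worker ids: ", ids, ", waited ", round(time() - t0; digits = 2), " s")
```

Because the four tasks run concurrently, the total waiting time should be about one second plus communication overhead, rather than four seconds.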

Which worker did the work?

In [None]:
@fetch begin
    println(myid());
    3+3
end
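
`@spawn` and `@fetch` let Julia pick the worker. The variants `@spawnat` and `@fetchfrom` take an explicit pid instead. A short sketch, assuming at least one worker has been added:

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # only needed when run outside this notebook

w = first(workers())           # pick one specific worker pid

f = @spawnat w myid()          # non-blocking: returns a Future
on_worker = fetch(f)           # the code ran on worker w, so this equals w

same = @fetchfrom w myid()     # blocking shorthand for fetch(@spawnat w ...)

on_master = @fetchfrom 1 myid() # pid 1 is the master itself
```

Targeting a fixed pid is mostly useful when a worker already holds data that the computation needs.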

OK, now that we have understood all that, let's delegate a *complicated* calculation:

In [None]:
using Random

function complicated_calculation()
    sleep(1) # so complex that it takes a long time :)
    randexp(5)
end

@fetch complicated_calculation()

What happened?

**Every worker is a separate Julia process.** (Think of having multiple Julia REPLs open at once.)

We only defined `complicated_calculation()` on the master process. The function doesn't exist on any of the workers.

The macro `@everywhere` allows us to perform steps on all processes (master and workers). This is particularly useful for loading packages and defining functions.

In [None]:
@everywhere begin # execute this block on all processes (master and workers)
    using Random
    
    function complicated_calculation()
        sleep(1)
        randexp(5)
    end
end

In [None]:
@fetch complicated_calculation()
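
`@everywhere` can also broadcast a value that exists only on the master by interpolating it with `$`. The following is a small sketch; the names `scale`, `factor`, and `triple` are hypothetical and chosen only for illustration.

```julia
using Distributed
nprocs() == 1 && addprocs(4)        # only needed when run outside this notebook

scale = 3                           # defined on the master only
@everywhere factor = $scale         # $ inserts the master's value before the expression is sent
@everywhere triple(x) = factor * x  # now every process knows `factor` and `triple`

result = @fetch triple(2)           # evaluated on some worker
```

Without the `$`, each worker would look up its own global `scale`, which does not exist there.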

### Data movement

Sending messages and moving data typically constitute most of the overhead in a distributed program. **Reducing the number of messages and the amount of data sent is critical to achieving performance and scalability.**

Data is **implicitly** transferred to a worker as part of a task that needs it. Example:

In [None]:
function method1()
    A = rand(100,100)
    B = rand(100,100)
    C = @fetch A^2 * B^2
end

`A` and `B` are created on the master process, transferred to a worker, squared and multiplied by the worker before the result is finally transferred back to the master.

Compare this to this similar implementation:

In [None]:
function method2()
    C = @fetch rand(100,100)^2 * rand(100,100)^2
end

Here, the entire computation happens on a worker process. Only the result is transferred to the master.

Let's benchmark:

In [None]:
using BenchmarkTools
@btime method1();
@btime method2();

Note that there are more tools for explicit data movement like `Channel`s/`RemoteChannel`s and [ParallelDataTransfer.jl](https://github.com/ChrisRackauckas/ParallelDataTransfer.jl/). For the sake of brevity, we will not cover them here. (See `notebooks/backup` for basic peer-to-peer communication via `RemoteChannel`s.)

(Fortunately, many useful parallel computations do not require (much) data movement at all. A common example is a direct Monte Carlo simulation, where multiple processes can handle independent simulation trials simultaneously. → **montecarlo_pi** exercise)

## High-level tools: `@distributed` and `pmap`

So far we have seen some of the fundamental building blocks for distributed computing with Distributed.jl. However, in practice, one wants to think as little as possible about how to distribute the work and explicitly spawn tasks in parallel programs.

Julia provides **high-level convenience** tools to
 * distribute `map` $\quad$ → $\quad$ [`pmap`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.pmap)
 * distribute loops $\quad$ → $\quad$ [`@distributed`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.@distributed)

### Parallel map: `pmap`

Very often, one simply wants to parallelize a `map` operation where a function `f` is applied to all elements of a collection. This is a typical instance of **data parallelism**, which covers a vast class of compute-intensive programs.

In [None]:
map(x->x^2, 1:10)

Such a pattern can be parallelized in Julia via the high-level function `pmap` ("parallel map").

#### Example: Singular values of multiple matrices

In [None]:
@everywhere using LinearAlgebra

M = Matrix{Float64}[rand(200,200) for i = 1:10]; # array holding 10 matrices

In [None]:
map(svdvals, M)

In [None]:
pmap(svdvals, M)

Let's check that this indeed utilized multiple workers.

In [None]:
pmap(M) do m
    println(myid())
    svdvals(m)
end

# do syntax:
#
# pmap(M) do m
#     ...
# end
#
# is the same as
#
# pmap(m -> ..., M)

In [None]:
@btime map($svdvals, $M);
@btime pmap($svdvals, $M);

### Distributed loops (`@distributed`)

#### Example: Reduction

Reductions (e.g. `sum`) can't be expressed with `pmap`: a reduction maps $\mathbb{R}^n \rightarrow \mathbb{R}$, whereas `map`/`pmap` maps $\mathbb{R}^n \rightarrow \mathbb{R}^n$.

Task: Counting heads in a series of coin tosses.

In [None]:
function count_heads_loop(n)
    c = 0
    for i = 1:n
        c += rand((0,1))
    end
    return c
end

N = 200_000_000
@btime count_heads_loop($N);

(Here, `+` is the **reducer function**)

We can parallelize this reduction using
* `@distributed (reducer function) for ...`.

Note that this has **blocking** character and returns the result once it has arrived.

In [None]:
function count_heads_distributed_loop(n)
    c = @distributed (+) for i in 1:n
        rand((0,1))
    end
    return c
end

In [None]:
@btime count_heads_distributed_loop($N);

With 4 workers, the distributed version should be about **4x faster**, which is all we could hope for.

Similar to `@threads :static`, `@distributed` distributes the work **evenly** among the workers.

In [None]:
function count_heads_distributed_verbose(n)
    c = @distributed (+) for i in 1:n
        x = rand((0,1))
        println(x)
        x
    end
    c
end

count_heads_distributed_verbose(8);
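
To see the even split directly, one can record which worker handles which iteration. This is a sketch that uses `vcat` as the reducer to collect `(pid, iteration)` pairs; it assumes workers have been added.

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # only needed when run outside this notebook

# Each iteration contributes a one-element vector; vcat glues them together.
assignment = @distributed (vcat) for i in 1:8
    [(myid(), i)]
end

for (pid, i) in assignment
    println("iteration ", i, " ran on worker ", pid)
end
```

With 4 workers and 8 iterations, each worker should receive a contiguous chunk of two iterations.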

#### Example: Array mutation (if time permits)

Apart from `@distributed (reducer) ...` there is also a `@distributed for ...` form. The latter is **non-blocking** and returns a `Task`. (You can think of it as a distributed version of `@spawn` for all the iterations.)

However, since the loop body will be executed on different processes, one must be careful to operate on **data structures that are available on all processes** (similar to the mistake highlighted above).

In [None]:
function square_broken()
    A = collect(1:10)
    @sync @distributed for i in eachindex(A)
        A[i] = A[i]^2
    end
    return A
end

In [None]:
square_broken()

To actually make all processes operate on the same array, one can use a `SharedArray`. For this to work, the **processes need to live on the same machine**.

In [None]:
@everywhere using SharedArrays # must be loaded everywhere

In [None]:
A = rand(3,2)

In [None]:
S = SharedArray(A)

In [None]:
function square!(X)
    for i in eachindex(X)
        sleep(0.001) # mimic some computational cost
        X[i] = X[i]^2
    end
end

function square_distributed!(X)
    @sync @distributed for i in eachindex(X)
        sleep(0.001) # mimic some computational cost
        X[i] = X[i]^2
    end
end

A = rand(10,10)
S = SharedArray(A)

@btime square!($A);
@btime square_distributed!($S);

Note that the functions `square!` and `square_distributed!` could have also been written with `map` and `pmap`, respectively. (data parallelism)
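
A minimal sketch of that alternative follows. The names `square_map` and `square_pmap` are hypothetical, and unlike `square!` these versions return a new array instead of mutating their argument.

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # only needed when run outside this notebook

A = rand(10, 10)

# The anonymous function is serialized and shipped to the workers,
# so no @everywhere definition is needed here.
square_map(X)  = map(x -> x^2, X)    # serial
square_pmap(X) = pmap(x -> x^2, X)   # distributed, one element per remote call

# Compare element-wise via vec, so the check does not depend on the
# shape of the collection that pmap returns.
agree = vec(square_pmap(A)) ≈ vec(square_map(A))
```

Since squaring a single number is very cheap, the `pmap` version is dominated by communication overhead here; it only pays off when each function call does substantial work.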

### `@distributed` vs `pmap`

Julia's `pmap` is designed for the case where

* one wants to apply **a function to a collection**,
* one needs **load-balancing** (e.g. workload is non-uniform), and/or
* each function call does enough work to amortize the overhead. 

On the other hand, `@distributed` is good for

* **reductions** (can't be written as `map`/`pmap`),
* loops where each iteration **takes about the same time** (uniform workload), and/or
* loops where the workload of each iteration is (very) small.
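
The following sketch illustrates the two `pmap` points above. It assumes a hypothetical non-uniform workload mimicked by `sleep`, and uses the `batch_size` keyword of `pmap` to group cheap items into fewer messages.

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # only needed when run outside this notebook

# Hypothetical non-uniform workload: item k takes k * 0.05 seconds.
durations = 0.05 .* (1:8)

# pmap hands out items one at a time as workers become free (load balancing).
pids = pmap(durations) do t
    sleep(t)
    myid()
end

# For cheap items, batch_size sends several items per remote call
# to reduce the per-item communication overhead.
squares = pmap(x -> x^2, 1:1000; batch_size = 100)
```

Inspecting `pids` shows which worker processed which item; workers that finish short items early simply pick up more of them.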

## High-level array abstractions: [DistributedArrays.jl](https://github.com/JuliaParallel/DistributedArrays.jl)

`DArray`: each process has local access to just a chunk of the data, and no two processes share the same chunk.

* `pmap(x::Array)`: parallel function on regular data structure
* `map(x::DArray)`: regular function on parallel data structure

In [None]:
using Distributed, BenchmarkTools; rmprocs(workers()); addprocs(4); # clean reset :)

In [None]:
@everywhere using DistributedArrays, LinearAlgebra

In [None]:
M = Matrix{Float64}[rand(200,200) for i = 1:10];

In [None]:
D = distribute(M)

Which part does each worker hold?

In [None]:
for p in workers()
    println(@fetchfrom p localindices(D))
end

In [None]:
@btime map($svdvals, $M) samples=5 evals=3;
@btime map($svdvals, $D) samples=5 evals=3;
@btime pmap($svdvals, $M) samples=5 evals=3;

## Final comments

### Creating workers on other machines

Starting worker processes (via `addprocs`) is handled by [ClusterManagers](https://docs.julialang.org/en/v1/manual/distributed-computing/#ClusterManagers).

* The default is `LocalManager`. It is automatically used when running `addprocs(i::Integer)`.
* Another manager is `SSHManager`. It is automatically used when running `addprocs(hostnames::Array)`, e.g. `addprocs(["node123", "node456"])`. The only requirement is **passwordless SSH access** to all specified hosts.
* Cluster managers for SLURM, PBS, and others are provided in [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl). For SLURM, this will make `addprocs` use `srun` under the hood.

*Example*

```julia
using Distributed

addprocs(["node123", "node456"]) # starts one worker on each host

@everywhere println(gethostname())
```

One can also start multiple processes per machine:
```julia
addprocs([("node123", 2), ("node456", 3)]) # starts 2 workers on node123 and 3 workers on node456

# Use :auto as the count, e.g. ("node123", :auto), to start as many workers as there are CPU threads on that machine
```

**Be aware of different paths:**
* By default, `addprocs` expects to find the julia executable on the remote machines under the same path as on the host (master).
* It will also try to `cd` to the same folder (set the working directory).


As you can see from `?addprocs`, `addprocs` takes a bunch of keyword arguments, two of which are of particular importance in this regard:

* `dir`: working directory for the worker processes
* `exename`: path to julia executable (potentially augmented with pre-commands) for the worker processes
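
As a sketch of how these two keyword arguments are passed, the example below starts two extra workers on the local machine so that it can be tried without a cluster. On remote hosts one would pass hostnames instead of a count and adjust both paths to what exists on those machines.

```julia
using Distributed

# Local stand-ins for the remote settings; both paths are assumptions
# that must match the target machines in a real cluster setup.
pids = addprocs(2;
    exename = joinpath(Sys.BINDIR, "julia"),  # julia executable used for the workers
    dir     = pwd(),                          # working directory of the workers
)

# Ask each new worker for its working directory.
dirs = map(p -> remotecall_fetch(pwd, p), pids)
println(dirs)

rmprocs(pids)   # remove the extra workers again
```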

In [None]:
# cleanup
rmprocs(workers())