<a href="https://colab.research.google.com/github/amontoison/Workshop-GERAD/blob/main/distributed_computing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Parallel computing and GPU programming with Julia
## Part II: Multi-processing and distributed computing
Alexis Montoison

In [2]:
import Pkg
Pkg.activate("colab4")
Pkg.add(["BenchmarkTools", "MPI"])

[32m[1m  Activating[22m[39m project at `/content/colab4`
[32m[1m   Resolving[22m[39m package versions...
[32m[1m  No Changes[22m[39m to `/content/colab4/Project.toml`
[32m[1m  No Changes[22m[39m to `/content/colab4/Manifest.toml`


In [3]:
using BenchmarkTools
using Distributed
using MPI

<img src="https://github.com/amontoison/Workshop-GERAD/blob/main/Graphics/multithreading_multiprocessing.png?raw=1" width=450px>

The `Distributed` module, part of the standard library shipped with Julia, provides an implementation of distributed-memory parallel computing.

Most modern computers have more than one CPU, and several computers can be combined into a cluster. Harnessing these multiple CPUs allows many computations to complete more quickly. Two major factors influence performance: the speed of the CPUs themselves and the speed of their access to memory. In a cluster, a given CPU has the fastest access to the RAM of its own computer (node).

![](https://github.com/amontoison/Workshop-GERAD/blob/main/Graphics/distributed_computing.png?raw=1)

Julia provides a multiprocessing environment based on message passing, which lets a program run on several processes at once, each in its own memory domain.

Julia's main implementation of message passing for distributed-memory systems is the `Distributed` module. Its approach differs from frameworks such as MPI in that communication is generally "one-sided": the programmer explicitly manages only one of the two processes involved in an operation.

<img src="https://github.com/amontoison/Workshop-GERAD/blob/main/Graphics/distributedjl_model.svg?raw=1" width=850>

**Master-worker paradigm**:
* One master process coordinates a set of worker processes, which perform the actual computations.
* The programmer controls only the master directly; the workers are "instructed" by it (**one-sided** communication).


Julia can be started with `p` local worker processes using the `-p` flag, e.g. with `p = 4`:
```bash
julia -p 4
```

The `Distributed` module is loaded automatically when the `-p` flag is used, but we can also add processes dynamically in a running Julia session:

In [4]:
using Distributed

println(nprocs())
addprocs(4)         # add 4 workers
println(nprocs())   # total number of processes
println(nworkers()) # only worker processes
# rmprocs(workers())  # remove worker processes

1
5
4


Note what happens here: there is one master process which can create additional worker processes, and as we shall see, it can also distribute work to these workers.
For running on a cluster, we instead need to provide the `--machine-file` option and the name of a file containing a list of machines that are accessible via password-less `ssh`. Support for running on clusters with various schedulers (including SLURM) can be found in the [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl) package.


In [5]:
myid()

1

In [6]:
pmap(i -> println("I'm worker $(myid()), working on i=$i"), 1:10)  # pmap blocks until all results are back, so @sync is not needed

      From worker 5:	I'm worker 5, working on i=4
      From worker 2:	I'm worker 2, working on i=1
      From worker 3:	I'm worker 3, working on i=2
      From worker 4:	I'm worker 4, working on i=3
      From worker 5:	I'm worker 5, working on i=5
      From worker 5:	I'm worker 5, working on i=6
      From worker 5:	I'm worker 5, working on i=7
      From worker 5:	I'm worker 5, working on i=8
      From worker 5:	I'm worker 5, working on i=9
      From worker 5:	I'm worker 5, working on i=10


10-element Vector{Nothing}:
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing

In [7]:
@sync @distributed for i in 1:10
  println("I'm worker $(myid()), working on i=$i")
end

      From worker 4:	I'm worker 4, working on i=7
      From worker 4:	I'm worker 4, working on i=8
      From worker 2:	I'm worker 2, working on i=1
      From worker 3:	I'm worker 3, working on i=4
      From worker 3:	I'm worker 3, working on i=5
      From worker 3:	I'm worker 3, working on i=6
      From worker 5:	I'm worker 5, working on i=9
      From worker 5:	I'm worker 5, working on i=10
      From worker 2:	I'm worker 2, working on i=2
      From worker 2:	I'm worker 2, working on i=3


Task (done) @0x00007e2b4e308010

`pmap` is preferable when each operation is expensive.
A `@distributed for` loop is better when each operation is very cheap.
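As a small illustration, the sketch below computes the same cheap operation three ways. It assumes nothing about the session except that `Distributed` is available (it adds two workers if none exist). `pmap` accepts a `batch_size` keyword that groups several elements into each remote call, which reduces messaging overhead when individual operations are cheap.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # make sure there are workers to talk to

xs = 1:1_000

# pmap: by default one remote call per element, scheduled dynamically
r_pmap = pmap(x -> x^2, xs)

# pmap with batch_size: fewer, larger messages for cheap operations
r_batched = pmap(x -> x^2, xs; batch_size = 100)

# @distributed with a reducer: the range is split into one chunk per worker
r_dist = @distributed (+) for x in xs
    x^2
end
```

For cheap work like `x^2`, the `@distributed` reduction or a batched `pmap` sends far fewer messages than the default `pmap`.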

In [8]:
a = [i for i=1:10]
@sync @distributed for i in 1:10
  println("working on i=$i")
  a[i] = i^2
end
a

      From worker 3:	working on i=4
      From worker 3:	working on i=5
      From worker 3:	working on i=6
      From worker 5:	working on i=9
      From worker 5:	working on i=10
      From worker 2:	working on i=1
      From worker 2:	working on i=2
      From worker 2:	working on i=3
      From worker 4:	working on i=7
      From worker 4:	working on i=8


10-element Vector{Int64}:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10

Nothing changed in `a`! The array is available to each worker, but essentially only for reading: a complete copy of `a` is sent to the memory space of each worker, and each worker modifies its own copy. Each worker has its own memory domain, for safety.
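On a single machine, one way to let workers write into a common array is the `SharedArrays` standard library. The sketch below redoes the squaring loop with a `SharedArray`; it assumes all workers run on the same host, since shared memory does not span nodes, and it adds two local workers if none exist.

```julia
using Distributed
nprocs() == 1 && addprocs(2)
@everywhere using SharedArrays   # workers need the module to deserialize the array

# a SharedArray is backed by memory mapped into every participating local process
a = SharedArray{Int}(10)
a .= 1:10

# @sync waits for all workers; their writes land in the shared buffer
@sync @distributed for i in 1:10
    a[i] = i^2
end
```

This time the master sees the squared values, because every worker writes into the same memory rather than into a private copy.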

In [9]:
printsquare(i) = println("working on i=$i: its square is $(i^2)")
@sync @distributed for i in 1:10
  printsquare(i) # error: printsquare is defined only on the master process
end

LoadError: TaskFailedException

    nested task error: On worker 2:
    UndefVarError: `#printsquare` not defined in `Main`
    Suggestion: add an appropriate import or assignment. This global was declared but not assigned.
    Stacktrace:
      [1] deserialize_datatype
        @ /usr/local/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1471
      [2] handle_deserialize
        @ /usr/local/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:897
      ⋮
     [24] #103
        @ /usr/local/share/julia/stdlib/v1.11/Distributed/src/process_messages.jl:121
    
    ...and 3 more exceptions.
    
    Stacktrace:
     [1] sync_end(c::Channel{Any})
       @ Base ./task.jl:466
     [2] macro expansion
       @ ./task.jl:499 [inlined]
     [3] (::Distributed.var"#177#179"{var"#7#8", UnitRange{Int64}})()
       @ Distributed /usr/local/share/julia/stdlib/v1.11/Distributed/src/macros.jl:278

For this to work, the function (and any package it uses) must be defined on every process with `@everywhere`:

In [20]:
@everywhere printsquare(i) = println("working on i=$i: its square is $(i^2)")
@sync @distributed for i in 1:10
  printsquare(i)
end

      From worker 2:	working on i=1: its square is 1
      From worker 2:	working on i=2: its square is 4
      From worker 2:	working on i=3: its square is 9
      From worker 5:	working on i=9: its square is 81
      From worker 5:	working on i=10: its square is 100
      From worker 3:	working on i=4: its square is 16
      From worker 3:	working on i=5: its square is 25
      From worker 3:	working on i=6: its square is 36
      From worker 4:	working on i=7: its square is 49
      From worker 4:	working on i=8: its square is 64


Task (done) @0x00007e2ba48f0970
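`@everywhere` is not limited to function definitions: it can also define data on every process, and values from the master can be spliced into the expression with `$`. A minimal sketch (the names `scale`, `SCALE` and `scaled` are purely illustrative):

```julia
using Distributed
nprocs() == 1 && addprocs(2)

scale = 3                        # exists only on the master
# $scale is evaluated on the master and its value is sent to every process
@everywhere const SCALE = $scale
# this function is now defined on the master and on all workers
@everywhere scaled(i) = SCALE * i

results = pmap(scaled, 1:5)
```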

**Warning**: 1 worker ≠ 1 CPU core or thread. If we ask for 50 workers on a machine with only 4 cores, we do get 50 workers, but several of them share the same core (while keeping separate memory domains). This is slower than asking for only 4 workers.

Each process has a unique identifier accessible via the `myid()` function (the master has `myid() == 1`).
The `@spawnat` macro sends work to a chosen process and immediately returns a `Future`; calling `fetch` on it waits for the computation and brings the result back to the master. The older `@spawn` macro picked the process automatically, but it is deprecated in recent Julia versions in favor of `@spawnat :any`.

In [11]:
# execute myid() and rand() on process 2
r = @spawnat 2 (myid(), rand())

# fetch the result
fetch(r)

(2, 0.5518871084618457)
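The sketch below shows `@spawnat :any`, which lets `Distributed` choose the process, together with `remotecall_fetch`, the function-call counterpart that runs a call on a given worker and waits for its value. It adds two workers if none exist; the computation `sum(1:100)` is only a placeholder.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# let Distributed pick the process
f = @spawnat :any (myid(), sum(1:100))
who, s = fetch(f)             # fetch blocks until the Future is ready

# function form: run sum(1:100) on the first worker and wait for the value
s2 = remotecall_fetch(sum, first(workers()), 1:100)

println("computed on process $who: $s, $s2")
```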

One use case is to distribute expensive function calls between processes by hand, but there are higher-level and simpler constructs than `@spawn` / `@spawnat`:
- the `@distributed` macro for `for` loops, which can be combined with a reduction operator to gather the results of the independent tasks;
- the `pmap` function, which maps a function over an array or range.

To illustrate the difference between these approaches, we revisit the `sum_sqrt` function from the Multithreading notebook. To use `pmap`, the function must accept a range, so we use the modified version below. Note that any function the workers call must be made available to all processes with the `@everywhere` macro:

In [12]:
@everywhere function sqrt_sum_range(A, r)
    s = zero(eltype(A))
    for i in r
        @inbounds s += sqrt(A[i])
    end
    return s
end

In [13]:
A = rand(100_000)
batch = length(A) ÷ 100   # chunk length; the chunking below assumes length(A) is a multiple of it

1000

In [14]:
# distributed (+)
@distributed (+) for r in [(1:batch) .+ offset for offset in 0:batch:length(A)-1]
    sqrt_sum_range(A, r)
end

66736.69641686781

In [15]:
# pmap
sum(pmap(r -> sqrt_sum_range(A, r), [(1:batch) .+ offset for offset in 0:batch:length(A)-1]))

66736.6964168678
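The comprehension used above covers every element only because `length(A)` is a multiple of `batch`; otherwise the last range would run past the end of `A` (and `@inbounds` would hide the error). A sketch that also handles a remainder uses `Iterators.partition`, which splits the index range into consecutive chunks of at most `batch` elements. The array length `100_003` is chosen only to force an uneven last chunk.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

@everywhere function sqrt_sum_range(A, r)
    s = zero(eltype(A))
    for i in r
        @inbounds s += sqrt(A[i])
    end
    return s
end

A = rand(100_003)        # deliberately not a multiple of the chunk size
batch = 1000
# consecutive index ranges; the last one holds the 3 leftover elements
chunks = collect(Iterators.partition(eachindex(A), batch))

s_dist = @distributed (+) for r in chunks
    sqrt_sum_range(A, r)
end
s_pmap = sum(pmap(r -> sqrt_sum_range(A, r), chunks))
```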

In [22]:
# @spawnat
futures = Array{Future}(undef, nworkers())

@time begin
    batch = length(A) ÷ nworkers()       # base chunk length per worker
    remainder = length(A) % nworkers()   # the first `remainder` workers get one extra element
    for (i, id) in enumerate(workers())
        if (i - 1) < remainder
            start = 1 + (i - 1) * (batch + 1)
            stop = start + batch
        else
            start = 1 + (i - 1) * batch + remainder
            stop = start + batch - 1
        end
        # run on worker `id`; `@spawnat myid()` would run everything on the master
        futures[i] = @spawnat id sqrt_sum_range(A, start:stop)
    end
    p = sum(fetch.(futures))
end

  0.067154 seconds (26.99 k allocations: 1.311 MiB, 99.08% compilation time)


66736.69641686778

The `@spawnat` version looks cumbersome in this case because the algorithm requires explicitly partitioning the array, as is common in MPI codes. (The three results differ only in the last digits because the floating-point additions are performed in different orders.) The `@distributed (+)` parallel `for` loop and the `pmap` mapping are much simpler, but which one is preferable for a given use case?
- `@distributed` is appropriate for reductions. It does not load-balance and simply divides the work evenly between processes. It is best when each loop iteration is cheap.
- `pmap` can handle reductions as well as other algorithms. It performs dynamic load-balancing, and since dynamic scheduling introduces some overhead, it is best used for computationally heavy tasks.
- With `@spawnat`, each call returns a `Future` immediately without blocking the master, which opens the door to asynchronous and uneven workloads.
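The asynchronous style of the last point can be sketched as follows: each `@spawnat` returns a `Future` right away, the master is free to do other work, and `fetch` collects the results later. The `sleep` durations are purely illustrative stand-ins for tasks of different cost, and two workers are added if none exist.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# one task per worker; task k sleeps 0.1k seconds to mimic uneven work
futures = [@spawnat w (sleep(0.1 * k); k^2) for (k, w) in enumerate(workers())]

# the master is not blocked while the workers run
master_part = sum(1:10)

# fetch waits for each Future in turn
results = fetch.(futures)
```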

Just like with multithreading, multiprocessing with Distributed comes with an overhead because of sending messages and moving data between processes.

Finally, it should be emphasized that a common use case of `pmap` involves heavy computations inside functions defined in imported packages, for example computing the singular value decomposition of many matrices:

In [17]:
@everywhere using LinearAlgebra
x=[rand(100,100) for i in 1:10]
@btime map(LinearAlgebra.svd, x);
@btime pmap(LinearAlgebra.svd, x);

  15.819 ms (173 allocations: 4.71 MiB)
  17.183 ms (1815 allocations: 1.61 MiB)
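
In this run `pmap` is actually slightly slower than `map`: each 100×100 SVD takes only about 1.6 ms, so sending every matrix to a worker and its factorization back likely costs as much as the computation itself (and `map` on the master may already use a multithreaded BLAS). `pmap` pays off when each call is much more expensive than the data it moves.
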
**DistributedArrays**: Another way to parallelize over multiple machines is the [DistributedArrays.jl](https://github.com/JuliaParallel/DistributedArrays.jl) package. A distributed array (`DArray`) is split across a set of workers. Each worker can read from and write to its local portion of the array and has read-only access to the portions held by other workers.

**Summary**:
- `@distributed` is good for reductions and fast inner loops with limited data transfer.
- `pmap` is good for expensive inner loops that return a value.
- `SharedArrays` can be an easier drop-in replacement for threading-like behaviors on a single machine.

## MPI
[MPI.jl](https://github.com/JuliaParallel/MPI.jl) is a Julia interface to the Message Passing Interface, which has been the standard workhorse of parallel computing for decades. Like `Distributed`, MPI belongs to the distributed-memory paradigm.

The idea behind MPI is that:
- Tasks have a rank and are numbered 0, 1, 2, 3, …
- Each task manages its own memory
- Each task can run multiple threads
- Tasks communicate and share data by sending messages.
- Many higher-level functions exist to distribute information to other tasks and gather information from other tasks.
- All tasks typically run the same program, so the code has to branch on the rank to make sure that different tasks do different work.

<img src="https://github.com/amontoison/Workshop-GERAD/blob/main/Graphics/mpi_model.svg?raw=1" width=850>

#### Fundamental MPI functions

* `MPI.Init()` and `MPI.Finalize()` (MPI.jl finalizes MPI automatically when Julia exits, so calling `MPI.Finalize()` is usually unnecessary)

* `MPI.COMM_WORLD`: default *communicator*, includes all MPI ranks

* `MPI.Comm_rank(comm)`: unique rank of the process calling this function (**MPI rank ids start at 0!**)

* `MPI.Comm_size(comm)`: total number of ranks in the given communicator

This is what a hello-world MPI program looks like in Julia:

In [18]:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD         # the communicator: a group of processes that can talk to each other
rank = MPI.Comm_rank(comm)    # rank (0, 1, 2, …) of the calling process
nranks = MPI.Comm_size(comm)  # total number of ranks (named nranks to avoid shadowing Base.size)
println("Hello from process $(rank) out of $(nranks)")
MPI.Barrier(comm)

Hello from process 0 out of 1
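
In the notebook the kernel is a single MPI process, hence `out of 1`. To run several ranks, save the code to a script and start it with an MPI launcher, for example `mpiexec -n 4 julia script.jl`; MPI.jl can also install an `mpiexecjl` wrapper (`MPI.install_mpiexecjl()`) that uses the MPI library Julia is configured with.
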
The MPI.jl package contains a lot of functionality, but in principle one can get away with only point-to-point communication (`MPI.send()` and `MPI.recv()`). However, collective communication can sometimes require less effort. In any case, it is good to have a mental model of different communication patterns in MPI.

In [19]:
using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)

if rank != 0
    # All ranks other than 0 send a message to rank 0
    local message = "Hello World, I'm rank $rank"
    MPI.send(message, comm, dest=0, tag=0)
else
    # Rank 0 receives the messages in rank order and prints them
    for sender in 1:(nranks-1)
        message = MPI.recv(comm, source=sender, tag=0)
        println(message)
    end
end
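For comparison with the point-to-point version above, here is a sketch of a collective operation, `MPI.Allreduce`, which combines a value from every rank and returns the combined result on all ranks. Each rank sums its own strided share of `1:N`; `N` and this work split are illustrative choices. Like the other MPI cells, it runs with a single rank in the notebook but is meant to be launched with several.

```julia
using MPI
MPI.Initialized() || MPI.Init()   # initialize only if an earlier cell has not done so

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)

N = 1000
# rank r takes the elements r+1, r+1+nranks, r+1+2nranks, ...
local_sum = sum((rank + 1):nranks:N)
# every rank contributes local_sum and receives the global total
total = MPI.Allreduce(local_sum, +, comm)

rank == 0 && println("sum(1:$N) = $total computed by $nranks rank(s)")
```

One collective call replaces the explicit loop of `send`/`recv` pairs, and every rank ends up with the result, not just rank 0.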

**Summary**:
- MPI is a standard workhorse of parallel computing.
- All communication is handled explicitly - not behind the scenes as in `Distributed`.
- Programming with MPI requires a different mental model since each parallel rank is executing the same program and the programmer needs to distribute the work by hand.

<img src='https://github.com/amontoison/Workshop-GERAD/blob/main/Graphics/meme_distributed_computing.png?raw=1' width='400'>

# References:
- https://github.com/hlrs-tasc/julia-on-hpc-systems
- https://docs.julialang.org/en/v1/manual/distributed-computing
- https://github.com/JuliaParallel/MPI.jl