# Distributed Computing: `Distributed` standard library

**What**
* **single Julia process -> multiple Julia processes** that coordinate to perform certain computations

**Why**
* Scaling things up: run computations on multiple CPU cores, potentially even on different machines, e.g. nodes of a supercomputer or a local cluster of desktop machines.
* Effectively increase your memory: process a large dataset, which wouldn't fit into local memory, in parallel across multiple machines with separate dedicated RAM.

**Julia provides two fundamental implementations and paradigms**
* Julia's built-in [`Distributed` standard library](https://docs.julialang.org/en/v1/stdlib/Distributed/)
  * master-worker model
* [Message Passing Interface (MPI)](https://www.mpi-forum.org/) through [MPI.jl](https://github.com/JuliaParallel/MPI.jl)
  * Single Program Multiple Data (SPMD)
  
The focus of this notebook is on the **`Distributed` standard library.**

## `Distributed` standard library

Julia's `Distributed` follows a master-worker paradigm for its native distributed parallelism: **One master process coordinates all the worker processes, which perform the actual computations.**

In [1]:
using Distributed

In [2]:
nprocs()

1

In [3]:
nworkers() # the master is considered a worker as long as there are no real workers

1

To increase the number of workers, i.e. Julia processes, from within a Julia session we can dynamically call **`addprocs`**.

Alternatively, when starting Julia from the command line, one can use the `-p` option up front. For example,

```
julia -p 4
```

will start Julia with 5 processes, 1 master and 4 workers.

In [4]:
addprocs(4)

4-element Vector{Int64}:
 2
 3
 4
 5

Every process has a Julia-internal `pid` (process id). The master always has pid 1. You can get the workers' pids from `workers()`.

In [5]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

Note that the four workers' pids aren't necessarily 2, 3, 4 and 5, and one shouldn't rely on those literal values. Let's remove the processes and add them once more.

In [6]:
rmprocs(workers())

Task (done) @0x00000001234f4fd0

In [7]:
nworkers() # only the master is left

1

In [8]:
addprocs(4)

4-element Vector{Int64}:
 6
 7
 8
 9

In [9]:
workers()

4-element Vector{Int64}:
 6
 7
 8
 9

### One master to rule them all - `@spawn`, `@spawnat`, `@fetch`, `@fetchfrom`, `@everywhere`...

To execute commands and start computations on workers we can use the following macros

* `@spawn`: run a command or a code block on any worker and return a `Future` to its result. It's basically a version of `@async` for remote processes. (Since Julia 1.3 the documentation recommends the equivalent `@spawnat :any` instead.)
* `@spawnat`: same as `@spawn` but one can choose a specific worker by providing its pid.

**Example:** Let's say we would like to generate a random matrix on one of the workers.

In [10]:
@spawn 3+3 # similar to @async

Future(6, 1, 10, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 4965123904, 0)), nothing)

In [11]:
result = @spawn 3+3

Future(7, 1, 11, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 4518087280, 4518087600)), nothing)

In [12]:
fetch(result)

6

Because the combination of spawning and fetching is so common, there is `@fetch`, which combines them.

In [13]:
@fetch 3+3

6

In [14]:
@fetch rand(3,3)

3×3 Matrix{Float64}:
 0.456195  0.611805  0.319853
 0.877583  0.763865  0.715715
 0.208555  0.497212  0.354929

Which worker did the work?

In [30]:
@fetch begin
    println(myid());
    3+3
end

      From worker 9:	9


6

Using `@spawnat` and `@fetchfrom` we can delegate the work to a specific worker.

In [42]:
@fetchfrom 7 begin
    println(myid());
    3+3
end

      From worker 7:	7


6
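For completeness, here is a small sketch of the non-blocking counterpart `@spawnat`. It assumes nothing beyond the `Distributed` standard library and picks a pid from `workers()` instead of hard-coding one; local workers are only started if the session has none.

```julia
using Distributed

# Assumption: start two local workers if this session has none yet.
nprocs() == 1 && addprocs(2)

w = first(workers())                 # choose an existing worker pid

fut = @spawnat w (myid(), 3 + 3)     # returns a Future immediately
pid, value = fetch(fut)              # blocks until the worker has finished

fut_any = @spawnat :any myid()       # :any lets Julia pick the worker
```

Here `pid` equals `w` and `value` is `6`, while `fetch(fut_any)` returns the pid of whichever worker was chosen.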

We can use `@sync` as a blocker to wait for all workers to complete their tasks.

In [43]:
@sync begin
    pids = workers()
    @spawn (sleep(2); println("Today is reverse day!"))
    @spawn (sleep(1); println(" class!"))
    @spawn println("Hello")
end;
println("Done!")

      From worker 8:	Hello
      From worker 7:	 class!
      From worker 6:	Today is reverse day!
Done!


OK, now that we have understood all that, let's delegate a *complicated* calculation.

In [44]:
using Random

function complicated_calculation()
    sleep(1) # so complex that it takes a long time :)
    randexp(5)
end

@fetch complicated_calculation()

LoadError: On worker 9:
UndefVarError: #complicated_calculation not defined
Stacktrace:
  [1] deserialize_datatype
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:1364
  [2] handle_deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:866
  [3] deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813
  [4] handle_deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:873
  [5] deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813 [inlined]
  [6] deserialize_global_from_main
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:160
  [7] #5
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:72 [inlined]
  [8] foreach
    @ ./abstractarray.jl:2774
  [9] deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:72
 [10] handle_deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:959
 [11] deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813
 [12] handle_deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:870
 [13] deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813
 [14] handle_deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:873
 [15] deserialize
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813 [inlined]
 [16] deserialize_msg
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Distributed/src/messages.jl:87
 [17] #invokelatest#2
    @ ./essentials.jl:729 [inlined]
 [18] invokelatest
    @ ./essentials.jl:726 [inlined]
 [19] message_handler_loop
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:176
 [20] process_tcp_streams
    @ ~/.julia/juliaup/julia-1.8.0+0.x64/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:133
 [21] #103
    @ ./task.jl:484

What happened?

**Every worker is a separate Julia process.** (Think of having multiple Julia REPLs open at once.)

We only defined `complicated_calculation()` on the master process. The function doesn't exist on any of the workers yet.

The macro `@everywhere` allows us to perform steps on all processes (master and workers). This is particularly useful for loading packages, defining functions, etc.

In [45]:
@everywhere begin # execute this block on all processes (master and workers)
    using Random
    
    function complicated_calculation()
        sleep(1)
        randexp(5) # lives in Random
    end
end

In [46]:
@fetch complicated_calculation()

5-element Vector{Float64}:
 0.6710237418692887
 1.4397357198845113
 0.7428484539865857
 1.6067567517086865
 0.5521923518284665

### Data movement

There is a crucial difference between the following two pieces of code. Can you guess what it is? (without reading on 😉)

In [47]:
function method1()
    A = rand(100,100)
    B = rand(100,100)
    C = @fetch A^2 * B^2
end

method1 (generic function with 1 method)

In [48]:
function method2()
    C = @fetch rand(100,100)^2 * rand(100,100)^2
end

method2 (generic function with 1 method)

Let's benchmark them.

In [49]:
using BenchmarkTools
@btime method1();
@btime method2();

  909.329 μs (97 allocations: 237.96 KiB)
  591.787 μs (75 allocations: 81.11 KiB)


Method 1 is slower, because `A` and `B` are created on the master process, transferred to a worker, and squared and multiplied on the worker process before the result is finally transferred back to the master.

Method 2, on the other hand, creates, squares, and multiplies the random matrices entirely on the worker process and only sends the result back to the master.

Hence, `method1` is **transferring 3x as much data** between the master and the worker!

**Efficient data movement is crucial for efficient parallel computing!**

In this toy example, it's rather easy to identify the faster method.

In a real program, however, understanding data movement does require more thought and likely some measurement.

For example, if the first process needs matrix `A` in a follow-up computation then the first method might be better in this case. Or, if computing `A` is expensive and only the current process has it, then moving it to another process might be unavoidable.
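When a follow-up computation can run on the same worker, one way to avoid moving the matrix at all is to keep only a `Future` handle on the master. The following is a minimal sketch under that assumption; the function name `sum_on_worker` is made up for illustration, and the snippet starts local workers only if none exist yet.

```julia
using Distributed

# Assumption: start two local workers if this session has none yet.
nprocs() == 1 && addprocs(2)

function sum_on_worker(w)
    # The matrix is created on worker `w`; the master only receives a Future.
    A_ref = @spawnat w rand(100, 100)
    # The follow-up runs on the same worker, where fetch(A_ref) should resolve
    # locally, so only the scalar result is sent back to the master.
    return @fetchfrom w sum(fetch(A_ref))
end

total = sum_on_worker(first(workers()))
```

Wrapping the calls in a function also keeps `A_ref` a local variable, so no global gets copied to the worker.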

#### Computer latency at a human scale

To understand why thinking about data is important it's instructive to look at the time scales involved in data access.

<img src="../imgs/latency_human_scales.png" width=900px>

(taken from https://www.prowesscorp.com/computer-latency-at-a-human-scale/)

#### Avoid globals (once more)

In [50]:
myglobal = 4

4

In [51]:
function whohas(s::String)
    @everywhere begin
        var = Symbol($s)
        if isdefined(Main, var)
            println("$var exists.")
        else
            println("Doesn't exist.")
        end
    end
    nothing
end

whohas (generic function with 1 method)

In [52]:
whohas("myglobal")

myglobal exists.
      From worker 7:	Doesn't exist.
      From worker 8:	Doesn't exist.
      From worker 6:	Doesn't exist.
      From worker 9:	Doesn't exist.


In [53]:
@fetchfrom 6 myglobal+2

6

In [54]:
whohas("myglobal")

myglobal exists.
      From worker 6:	myglobal exists.
      From worker 7:	Doesn't exist.
      From worker 9:	Doesn't exist.
      From worker 8:	Doesn't exist.


Globals get copied to workers and continue to exist as globals even after the call.

This could lead to **memory accumulation** if many globals are used (just as it would in a single Julia session).

It's better to avoid them.
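If a global's value is needed on a worker anyway, one option described in the Julia manual is to bind it to a local variable in a `let` block, so that the closure captures the local instead of the global. A minimal sketch, assuming a fresh global with the hypothetical name `myglobal2` that has not been sent to any worker before:

```julia
using Distributed

# Assumption: start two local workers if this session has none yet.
nprocs() == 1 && addprocs(2)

w = first(workers())
myglobal2 = 4   # hypothetical global that only exists on the master

# `x` is local to the let block, so the remote closure carries its value
# along without creating a global binding on the worker.
result = let x = myglobal2
    @fetchfrom w x + 2
end

# Check that the worker did not acquire a global named `myglobal2`.
still_undefined = !(@fetchfrom w isdefined(Main, :myglobal2))
```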

#### Explicit data movement: `Channel` and `RemoteChannel`

Channels implement communication between tasks. The relevant functions are `put!`, `take!`, `fetch`, `isready`, and `wait`.

In [55]:
ch = Channel{Int}(5) # a channel that can hold up to 5 integers

Channel{Int64}(5) (empty)

In [56]:
isready(ch) # something in the channel?

false

In [57]:
put!(ch, 3)

3

In [58]:
isready(ch)

true

In [59]:
fetch(ch)

3

In [60]:
take!(ch)

3

In [61]:
isready(ch)

false

In [62]:
put!(ch, 4)

4

In [63]:
fetch(ch)

4

In [64]:
take!(ch)

4

**Be careful**: `take!` (and `fetch`) block if the channel is empty, and `put!` blocks if the channel is full!

In [65]:
isready(ch)

false

In [None]:
# take!(ch) if we execute this, while isready(ch) == false, the current Julia session will hang.
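A common way to live with this blocking behaviour is to let a separate task produce the values and to close the channel when it is done. The sketch below uses only `Base` functionality; the buffer size and item count are arbitrary choices for illustration.

```julia
# Buffered channel with room for 2 items.
ch = Channel{Int}(2)

# Producer task: put! blocks only this task whenever the buffer is full.
producer = @async begin
    for i in 1:5
        put!(ch, i)
    end
    close(ch)   # tells consumers that no more items will arrive
end

# Consumer: iterating a channel takes items one by one and stops once the
# channel is closed and empty, so this loop cannot hang forever.
received = Int[]
for x in ch
    push!(received, x)
end
```

After the loop, `received` holds the items in the order they were put into the channel.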

##### `RemoteChannel`

* A `Channel` is local to a process. Worker 2 cannot directly refer to a `Channel` on worker 3 and vice-versa.


* A `RemoteChannel`, however, can put and take values across workers. A `RemoteChannel` can be thought of as a handle to a `Channel`.


* Any process with a reference to a `RemoteChannel` can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a `RemoteChannel` is associated with.


* The process id (pid) associated with a `RemoteChannel` identifies the process where the backing store, i.e. the backing `Channel`, exists.

In [66]:
nworkers()

4

In [67]:
function do_something()
    rc = RemoteChannel(()->Channel{Int}(10)) # lives on the master
    @sync for p in workers()
        @spawnat p put!(rc, myid())
    end
    rc
end

r = do_something()

RemoteChannel{Channel{Int64}}(1, 1, 15894)

In [68]:
isready(r)

true

In [69]:
while isready(r)
    @show take!(r)
end

take!(r) = 7
take!(r) = 8
take!(r) = 6
take!(r) = 9


The ecosystem also contains a couple of tools that make data transfer even simpler. See for example [ParallelDataTransfer.jl](https://github.com/ChrisRackauckas/ParallelDataTransfer.jl/).

## High-level tools: `@distributed` and `pmap`

So far we have seen some of the fundamental building blocks for distributed computing in Julia. However, in practice, one wants to think as little as possible about how to distribute the work and explicitly spawn tasks.

Fortunately, many useful parallel computations do not require (much) data movement at all. A common example is a direct Monte Carlo simulation, where multiple processes can handle independent simulation trials simultaneously. (We'll get to that later in the exercises!)

Julia provides **high-level convenience** tools to
 * parallelize loops ([**`@distributed`**](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.@distributed)) and
 * apply a function to all elements of a collection ([**`pmap`**](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.pmap))


#### Distributed loops (`@distributed`)

In [70]:
using Distributed, BenchmarkTools; rmprocs(workers()); addprocs(4); nworkers()

4

#### Example: Reduction

Task: Counting heads in a series of coin tosses.

In [72]:
rand(Bool, 3,3)

3×3 Matrix{Bool}:
 0  0  0
 0  0  0
 0  1  1

In [73]:
function count_heads_loop(n)
    c = 0
    for i = 1:n
        c += rand(Bool)
    end
    return c
end

N = 200_000_000
@btime count_heads_loop($N);

  322.239 ms (0 allocations: 0 bytes)


Note that these kinds of computations are called **reductions** (with `+` being the **reducer function**).

In [None]:
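# schematic form of a reduction: map each index to a value, then combine with `+`
# (any function could take the place of the anonymous one)
reduce(+, map(i -> rand(Bool), 1:10))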

In [74]:
count_heads_reduce(n) = mapreduce(i -> rand(Bool), +, 1:n)
@btime count_heads_reduce($N)

  327.932 ms (0 allocations: 0 bytes)


99995068

In [79]:
function count_heads_distributed_loop(n)
    c = @distributed (+) for i in 1:n
        Int(rand(Bool))
    end
    return c
end

count_heads_distributed_loop (generic function with 1 method)

In [80]:
@btime count_heads_distributed_loop($N);

  81.166 ms (302 allocations: 12.55 KiB)


The distributed version is about **4x faster**, which is all we could hope for with 4 workers.

With `@distributed` the work is **evenly distributed** between the workers.

In [81]:
function count_heads_distributed_verbose(n)
    c = @distributed (+) for i in 1:n
        x = Int(rand(Bool))
        println(x);
        x
    end
    c
end

count_heads_distributed_verbose(8);

      From worker 12:	0
      From worker 10:	1
      From worker 12:	0
      From worker 10:	1
      From worker 11:	0
      From worker 11:	1
      From worker 13:	0
      From worker 13:	1


However, by using `@distributed` we let Julia decide how to split up the work and can't control it ourselves.

#### A common mistake

In [94]:
function g(n)
    a = 0
    @distributed (+) for i in 1:n
        a += 1;
        println(a)
        a
    end

end

a = g(10);

      From worker 13:	1
      From worker 13:	2
      From worker 12:	1
      From worker 10:	1
      From worker 11:	1
      From worker 12:	2
      From worker 10:	2
      From worker 11:	2
      From worker 11:	3
      From worker 10:	3


What do you expect the value of `a` to be?

In [95]:
a

18
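The result is neither 10 nor 0: every worker operates on its own copy of `a`, so the `a` defined inside `g` on the master is never modified. The reduction simply sums the worker-local running counts that each iteration returned, which in the run above is (1+2) + (1+2) + (1+2+3) + (1+2+3) = 18, and this depends on how the iterations happen to be split. A safer pattern, sketched here with the hypothetical name `count_iterations`, is to let every iteration return only its own contribution:

```julia
using Distributed

# Assumption: start two local workers if this session has none yet.
nprocs() == 1 && addprocs(2)

# Each iteration returns its own contribution; no outer variable is mutated,
# so the result does not depend on how the range is split across workers.
function count_iterations(n)
    return @distributed (+) for i in 1:n
        1
    end
end

total_iterations = count_iterations(10)
```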

#### Example: `SharedArray`s

Apart from `@distributed (reducer) ...` there also is a `@distributed for ...` form. The latter is **non-blocking** and returns a `Task`. (You can think of it as a distributed version of `@spawn` for all the iterations.)

However, since the loop body will be executed on different processes, one must be careful to operate on data structures that are available on all processes (similar to the mistake highlighted above).

In [97]:
function square_broken()
    A = collect(1:10)
    @sync @distributed for i in eachindex(A)
        A[i] = A[i]^2
    end
    return A
end

square_broken (generic function with 1 method)

In [98]:
square_broken()

10-element Vector{Int64}:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10

To actually make all processes operate on the same array, one can use a `SharedArray`. For this to work, the **processes need to live on the same host**.

In [99]:
@everywhere using SharedArrays # must be loaded everywhere

In [100]:
A = rand(3,2)

3×2 Matrix{Float64}:
 0.969195  0.935103
 0.292818  0.0302614
 0.508691  0.0683493

In [101]:
S = SharedArray(A)

3×2 SharedMatrix{Float64}:
 0.969195  0.935103
 0.292818  0.0302614
 0.508691  0.0683493

In [102]:
function square!(X)
    for i in eachindex(X)
        sleep(0.001) # mimicking some computational cost
        X[i] = X[i]^2
    end
end

function square_distributed!(X)
    @sync @distributed for i in eachindex(X)
        sleep(0.001) # mimicking some computational cost
        X[i] = X[i]^2
    end
end

A = rand(10,10)
S = SharedArray(A)

@btime square!($A);
@btime square_distributed!($S);

  215.424 ms (508 allocations: 15.31 KiB)
  57.534 ms (574 allocations: 26.09 KiB)


### Parallel map: `pmap`

The `square!` functions above are typical `map` operations where a function `f` is applied to all elements of a collection.

In [103]:
map(x->x^2, 1:10)

10-element Vector{Int64}:
   1
   4
   9
  16
  25
  36
  49
  64
  81
 100

Such a pattern can be parallelized in Julia via the high-level function `pmap` ("parallel map").

#### Example: Singular values of multiple matrices

In [104]:
using Distributed, BenchmarkTools; rmprocs(workers()); addprocs(4); nworkers()

4

In [105]:
@everywhere using LinearAlgebra

M = Matrix{Float64}[rand(200,200) for i = 1:10];

In [106]:
svdvals(rand(2,2))

2-element Vector{Float64}:
 1.0416253690166695
 0.38207294202501374

In [107]:
map(svdvals, M)

10-element Vector{Vector{Float64}}:
 [100.22216010162019, 7.902964826634815, 7.863366839274766, 7.764180940316365, 7.728568696034237, 7.619445432235017, 7.509260755518567, 7.389142363513787, 7.302011040948945, 7.23728664449622  …  0.29583855129634457, 0.27915861522366114, 0.2760055951919928, 0.18987877772113662, 0.1503453058570444, 0.14509688045515612, 0.11594109178982377, 0.10656738289268992, 0.057043219949094115, 0.027303958851537333]
 [99.72317568478442, 8.04951866464346, 7.966285787199083, 7.745545037655448, 7.604392166260176, 7.598192208719936, 7.524463556383573, 7.446852110118335, 7.400323509879351, 7.207331727567699  …  0.30443464637937667, 0.28743798945022264, 0.21815690833040588, 0.18584889928307266, 0.1448917143135192, 0.12751916317962095, 0.09768315274287644, 0.059353917491157565, 0.03349897112088835, 0.019994795451369916]
 [100.29116038265839, 7.917122163685977, 7.905278390410856, 7.765393448186639, 7.674170521465391, 7.666093988178944, 7.502556019677619, 7.4325419199829, 7

In [108]:
pmap(svdvals, M)

10-element Vector{Vector{Float64}}:
 [100.22216010162019, 7.9029648266348085, 7.863366839274777, 7.7641809403163755, 7.728568696034231, 7.619445432235024, 7.509260755518561, 7.389142363513796, 7.302011040948944, 7.237286644496214  …  0.29583855129634423, 0.27915861522366187, 0.27600559519199364, 0.18987877772113748, 0.15034530585704392, 0.14509688045515634, 0.11594109178982402, 0.10656738289269002, 0.057043219949094434, 0.027303958851537583]
 [99.72317568478442, 8.049518664643456, 7.966285787199081, 7.745545037655454, 7.6043921662601806, 7.5981922087199525, 7.524463556383573, 7.446852110118339, 7.400323509879348, 7.2073317275676985  …  0.3044346463793764, 0.2874379894502229, 0.21815690833040582, 0.18584889928307255, 0.1448917143135191, 0.12751916317962078, 0.09768315274287591, 0.05935391749115699, 0.03349897112088893, 0.019994795451369517]
 [100.29116038265839, 7.917122163685977, 7.905278390410855, 7.765393448186638, 7.674170521465394, 7.666093988178941, 7.502556019677615, 7.4325419199

Let's check that this indeed utilized multiple workers.

In [110]:
pmap(M) do m
    println(myid());
    svdvals(m)
end;

      From worker 17:	17
      From worker 14:	14
      From worker 15:	15
      From worker 16:	16
      From worker 17:	17
      From worker 14:	14
      From worker 15:	15
      From worker 16:	16
      From worker 17:	17
      From worker 14:	14


In [109]:
pmap(m->begin println(myid()); svdvals(m) end, M)

      From worker 15:	15
      From worker 16:	16
      From worker 17:	17
      From worker 14:	14
      From worker 15:	15
      From worker 16:	16
      From worker 14:	14
      From worker 17:	17
      From worker 15:	15
      From worker 16:	16


10-element Vector{Vector{Float64}}:
 [100.22216010162019, 7.9029648266348085, 7.863366839274777, 7.7641809403163755, 7.728568696034231, 7.619445432235024, 7.509260755518561, 7.389142363513796, 7.302011040948944, 7.237286644496214  …  0.29583855129634423, 0.27915861522366187, 0.27600559519199364, 0.18987877772113748, 0.15034530585704392, 0.14509688045515634, 0.11594109178982402, 0.10656738289269002, 0.057043219949094434, 0.027303958851537583]
 [99.72317568478442, 8.049518664643456, 7.966285787199081, 7.745545037655454, 7.6043921662601806, 7.5981922087199525, 7.524463556383573, 7.446852110118339, 7.400323509879348, 7.2073317275676985  …  0.3044346463793764, 0.2874379894502229, 0.21815690833040582, 0.18584889928307255, 0.1448917143135191, 0.12751916317962078, 0.09768315274287591, 0.05935391749115699, 0.03349897112088893, 0.019994795451369517]
 [100.29116038265839, 7.917122163685977, 7.905278390410855, 7.765393448186638, 7.674170521465394, 7.666093988178941, 7.502556019677615, 7.4325419199

In [111]:
@btime map($svdvals, $M);
@btime pmap($svdvals, $M);

  50.904 ms (81 allocations: 4.22 MiB)
  15.402 ms (518 allocations: 37.42 KiB)


In [None]:
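# schematic: a loop over independent iterations can be phrased as a map
# (`iteration` and `n` are placeholders)
map(i->iteration(i), 1:n)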

### When to choose which? (`@distributed` vs `pmap`)

Julia's `pmap` is designed for the case where

* one wants to apply **a function to a collection**,
* each function call does a **larger amount of work**, and/or
* the **workload is non-uniform** (load-balancing).

On the other hand, `@distributed` is good for

* **reductions**, like sums, where
* **each iteration may be tiny**, i.e. perhaps only summing two numbers, and/or
* each iteration **takes about the same time** (uniform)
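To make the load-balancing point concrete, here is a small sketch with a deliberately non-uniform workload. The function `slow_task` and the durations are invented for illustration; which worker ends up with which item varies from run to run.

```julia
using Distributed

# Assumption: start two local workers if this session has none yet.
nprocs() == 1 && addprocs(2)

# Hypothetical task: sleeps for `t` seconds and reports which process ran it.
@everywhere slow_task(t) = (sleep(t); myid())

durations = [0.4, 0.1, 0.1, 0.1, 0.1, 0.1]   # one long task, several short ones

# pmap hands out the next item whenever a worker becomes free, so the short
# tasks need not wait behind the long one.
ran_on = pmap(slow_task, durations)
```

Inspecting `ran_on` shows how the items were spread across the workers in a particular run.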

## High-level array abstractions: [DistributedArrays.jl](https://github.com/JuliaParallel/DistributedArrays.jl)

In a `DArray`, each process has local access to just a chunk of the data, and no two processes share the same chunk. Processes can be on different hosts.

Distributed arrays are for example useful if

* Expensive calculations should be performed in parallel on parts of the array on different hosts.
* The data doesn't fit into the local machine's memory (e.g. loading big files in parallel).

In [113]:
using Distributed, BenchmarkTools; rmprocs(workers());

└ @ Distributed /Users/julia/.julia/scratchspaces/a66863c6-20e8-4ff4-8a62-49f30b1f605e/agent-cache/default-macmini-x64-6.0/build/default-macmini-x64-6-0/julialang/julia-release-1-dot-8/usr/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1048


In [114]:
# make sure that all workers use the same Julia environment
addprocs(4; exeflags="--project")

4-element Vector{Int64}:
 18
 19
 20
 21

In [115]:
# check
@everywhere @show Base.active_project()

Base.active_project() = "/Users/crstnbr/repos/JuliaWorkshops/JuliaHLRS22/Project.toml"
      From worker 18:	Base.active_project() = "/Users/crstnbr/repos/JuliaWorkshops/JuliaHLRS22/Project.toml"
      From worker 21:	Base.active_project() = "/Users/crstnbr/repos/JuliaWorkshops/JuliaHLRS22/Project.toml"
      From worker 20:	Base.active_project() = "/Users/crstnbr/repos/JuliaWorkshops/JuliaHLRS22/Project.toml"
      From worker 19:	Base.active_project() = "/Users/crstnbr/repos/JuliaWorkshops/JuliaHLRS22/Project.toml"


In [116]:
@everywhere using DistributedArrays, LinearAlgebra

In [117]:
M = Matrix{Float64}[rand(200,200) for i = 1:10];

In [118]:
D = distribute(M)

10-element DArray{Matrix{Float64}, 1, Vector{Matrix{Float64}}}:
 [0.9958450726667368 0.8146647116946115 … 0.8781761651635122 0.8136397144747828; 0.22430577802469576 0.22019411874526984 … 0.6093920446154011 0.7275124529536269; … ; 0.6208343837625583 0.32306908986431093 … 0.6184536708211557 0.586311855579728; 0.1984646471994016 0.501563698963817 … 0.4445045448442775 0.8886762618530463]
 [0.11521656213503484 0.42507800193048506 … 0.7597256252727945 0.39344835814088364; 0.10057599577712573 0.36392267098261377 … 0.5509618376163837 0.3716381856079186; … ; 0.3937617843406124 0.19941558289146477 … 0.015995849597608136 0.10049928711737932; 0.062006415396738235 0.2651184111822895 … 0.6702086460744835 0.23718258647599]
 [0.8928494719087141 0.43151467397779353 … 0.7940771420139011 0.5713269166086569; 0.12609181697247107 0.23467631126273214 … 0.5655667634763868 0.012923910054933985; … ; 0.4596890954044771 0.0019073209879029163 … 0.9516825814047944 0.682636229060409; 0.9800171829512591 0.10960564402

Which workers hold parts of D?

In [119]:
procs(D)

4-element Vector{Int64}:
 18
 19
 20
 21

In [120]:
workers()

4-element Vector{Int64}:
 18
 19
 20
 21

Which parts do they hold?

In [121]:
localpart(D) # the master doesn't hold anything

Matrix{Float64}[]

In [122]:
# Which parts do they hold?
for p in workers()
    println(@fetchfrom p DistributedArrays.localindices(D))
end

(1:3,)
(4:6,)
(7:8,)
(9:10,)


In [123]:
@btime map($svdvals, $M);
@btime map($svdvals, $D);

  50.767 ms (81 allocations: 4.22 MiB)
  13.728 ms (576 allocations: 30.41 KiB)


In [124]:
@btime pmap($svdvals, $M);

  15.486 ms (518 allocations: 37.56 KiB)


## *Actual* distributed computing: Comments

### Creating workers on other machines

So far we have worked with multiple processes on the same system, because we simply used `addprocs(::Integer)`. To put worker processes on other machines, e.g. nodes of a cluster, we need to modify the initial `addprocs` call appropriately.

In Julia, starting worker processes is handled by [ClusterManagers](https://docs.julialang.org/en/v1/manual/distributed-computing/#ClusterManagers).

* The default one is `LocalManager`. It is automatically used when running `addprocs(i::Integer)` and we have implicitly used it already.
* Another important one is `SSHManager`. It is automatically used when running `addprocs(hostnames::Array)`, e.g. `addprocs(["node123", "node456"])`. The only requirement is **passwordless SSH access** to all specified hosts.
* Cluster managers for SLURM, PBS, and others are provided in [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl). For SLURM, this will make `addprocs` use `srun` under the hood.

*Demonstrate in terminal from thp node*

```julia
using Distributed

addprocs(["l93", "l94"])

@everywhere println(gethostname())
```

One can also start multiple processes on different machines:
```julia
addprocs([("node123", 2), ("node456", 3)]) # starts 2 workers on node123 and 3 workers on node456

# Use :auto instead of a number, e.g. ("node123", :auto), to start as many
# workers as there are CPU threads on that host
```

**Be aware of different paths:**
* By default, `addprocs` expects to find the julia executable on the remote machines under the same path as on the host (master).
* It will also try to `cd` to the same folder (set the working directory).


As you can see from `?addprocs`, `addprocs` takes a bunch of keyword arguments, two of which are of particular importance in this regard:

* `dir`: working directory for the worker processes
* `exename`: path to julia executable (potentially augmented with pre-commands) for the worker processes
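The effect of these keywords can be tried out locally before moving to a cluster. The sketch below uses a temporary directory as a stand-in working directory; the host name and paths in the commented-out SSH variant are hypothetical placeholders, not values from this workshop.

```julia
using Distributed

# Local stand-in for a remote launch: `dir` and `exeflags` are keyword
# arguments of `addprocs`; the directory is just a temporary folder.
new_pids = addprocs(2; dir=tempdir(), exeflags="--project")

for p in new_pids
    wd = @fetchfrom p pwd()       # working directory as seen by the worker
    println(p, " => ", wd)
end

# On a cluster the same keywords are passed to the SSH-based call, e.g.
# (hypothetical host and paths):
# addprocs([("node123", 2)]; dir="/path/on/remote", exename="/path/to/julia")

rmprocs(new_pids)                 # remove only the workers added here
```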

In [None]:
# cleanup
rmprocs(workers())