# Parallel computing with MPI-3 RMA and Julia

<br/>
### JuliaCon 2018
<br/>
[Bart Janssens](https://github.com/barche)

## Bird's eye view

1. Classic MPI message passing
2. Native Julia parallelism
3. MPI-3 one-sided "Remote Memory Access" (RMA)
4. Applications:
    - MPI Arrays
    - Julia IO on MPI
    - New clustermanager

## The MPI way
* MPI: Message passing interface
* Original: **cooperative** communication

|Rank 0|Rank 1|
|---|---|---|
|`Init()`|`Init()`|
|`mymessage = "Hi 1!"`||
|`Send(mymessage,1)`|`Recv(buffer)`|
|`Finalize()`|`Finalize()`|



The same program is sent to all ranks:
```julia
using MPI

MPI.Init()

const comm = MPI.COMM_WORLD
const rank = MPI.Comm_rank(comm)
const tag = 0

if rank == 0
  msg = unsafe_wrap(Vector{UInt8},"Hi 1!")
  println("Sending message from rank 0 to rank 1")
  MPI.Send(msg, 1, tag, comm)
elseif rank == 1
  rec_buf = fill(UInt8('_'), 10)
  MPI.Recv!(rec_buf, 0, tag, comm)
  println("Received message $(String(rec_buf)) from rank 0")
end

println("Rank $rank is finalizing.")
MPI.Finalize()
```

Output of `mpirun -np 4 julia --project src/sendrecv.jl`:

```text
Sending message from rank 0 to rank 1
Rank 3 is finalizing.
Rank 2 is finalizing.
Received message Hi 1!_____ from rank 0
Rank 0 is finalizing.
Rank 1 is finalizing.
```

### Collective calls
* Send/Receive works only between two processes
* Collective calls exist, e.g. summing an array:

```julia
using MPI

MPI.Init()

const comm = MPI.COMM_WORLD
const rank = MPI.Comm_rank(comm)
const root = 0

vec = fill(1,10)
s = MPI.Reduce(sum(vec), +, root, comm) # MUST be called on all ranks
println("Sum on rank $rank: $(repr(s))")

MPI.Finalize()

```


Output of `mpirun -np 4 julia --project src/reduce.jl`:

```text
Sum on rank 2: nothing
Sum on rank 3: nothing
Sum on rank 0: 40
Sum on rank 1: nothing
```

Or using `Allreduce`:
```text
Sum on rank 2: 40Sum on rank 3: 40
Sum on rank 1: 40

Sum on rank 0: 40
```

## The Julia way
* One-sided
* Remote references and remote call
* Directed by a "master" process.
* "Workers" do the actual work and can be started and stopped using `addprocs` and `rmprocs`

Starting up:

In [1]:
using Distributed
addprocs(3)

3-element Array{Int64,1}:
 2
 3
 4

Creating some arrays remotely:

In [50]:
remote_arrays = [@spawnat w fill(myid(),10) for w in workers()]

3-element Array{Future,1}:
 Future(2, 1, 102, nothing)
 Future(3, 1, 103, nothing)
 Future(4, 1, 104, nothing)

Fetching the result:

In [51]:
fetch(remote_arrays[1])

10-element Array{Int64,1}:
 2
 2
 2
 2
 2
 2
 2
 2
 2
 2

Compute the sum:

In [52]:
sums = map(remote_arrays) do x
    @spawn begin
        println("Computing sum for $x on process $(myid())")
        return sum(fetch(x))
    end
end;

      From worker 3:	Computing sum for Future(3, 1, 103, nothing) on process 3
      From worker 4:	Computing sum for Future(4, 1, 104, nothing) on process 4
      From worker 2:	Computing sum for Future(2, 1, 102, Some([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])) on process 2


In [53]:
sum(fetch.(sums))

90

In [54]:
sums

3-element Array{Future,1}:
 Future(2, 1, 106, Some(20))
 Future(3, 1, 107, Some(30))
 Future(4, 1, 108, Some(40))

Or using the higher level `@distributed` macro:

In [55]:
@distributed (+) for x=remote_arrays
    println("Computing sum for $x on process $(myid())")
    sum(fetch(x))
end

      From worker 2:	Computing sum for Future(2, 1, 102, Some([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])) on process 2
      From worker 3:	Computing sum for Future(3, 1, 103, Some([3, 3, 3, 3, 3, 3, 3, 3, 3, 3])) on process 3
      From worker 4:	Computing sum for Future(4, 1, 104, Some([4, 4, 4, 4, 4, 4, 4, 4, 4, 4])) on process 4


90

## MPI one-sided communication
* First introduced in MPI-2, extended in MPI-3
* Focus here on "Remote Memory Access" (RMA)
* Basic idea: define a region of memory on each process, that other processes can read from or write to
* Target can be passive
* Needs locking!

#### Setup of the shared memory region

```julia
# set up an array of length N on each process and mark it shared
shared = zeros(N)
win = MPI.Win()
MPI.Win_create(shared, MPI.INFO_NULL, comm, win)
```

#### Have other ranks write the value of their rank to rank 0

```julia
# Every rank writes to the array held by rank 0 (dest) at its rank position
offset = rank
nb_elms = 1
MPI.Win_lock(MPI.LOCK_EXCLUSIVE, dest, no_assert, win)
MPI.Put([Float64(rank)], nb_elms, dest, offset, win)
MPI.Win_unlock(dest, win)
```

#### Read the array at rank 0
```julia
if rank == dest
  MPI.Win_lock(MPI.LOCK_SHARED, dest, no_assert, win)
  println("My partners sent me this: ", shared')
  MPI.Win_unlock(dest, win)
end
```

Output:
```text
My partners sent me this: [0.0 1.0 2.0 3.0]
```

## Application: MPIArrays

* Distributed array built around a per-processor Julia array that is shared as an MPI Window
* Implements `AbstractArray`
* Simple implementation

### How it works
See `src/simplempiarray.jl` file:
```julia
struct SimpleMPIVector{T} <: AbstractVector{T}
  localarray::Vector{T}
  comm::MPI.Comm
  win::MPI.Win
  myrank::Int

  function SimpleMPIVector{T}(comm::MPI.Comm, len) where {T}
    locarr = Vector{T}(undef, len)
    win = MPI.Win()
    MPI.Win_create(locarr, MPI.INFO_NULL, comm, win)
    return new{T}(locarr, comm, win, MPI.Comm_rank(comm))
  end
end
```

```julia
Base.IndexStyle(::Type{SimpleMPIVector{T}}) where {T} = IndexLinear()
Base.size(v::SimpleMPIVector) = size(v.localarray) .* MPI.Comm_size(v.comm)

function Base.getindex(v::SimpleMPIVector{T}, i::Int) where {T}
  loclen = length(v.localarray)
  target_rank = (i-1) ÷ loclen
  local_index = (i-1) % loclen
  result = Ref{T}()
  MPI.Win_lock(MPI.LOCK_SHARED, target_rank, 0, v.win)
  MPI.Get(result, 1, target_rank, local_index, v.win)
  MPI.Win_unlock(target_rank, v.win)
  return result[]
end
```

Usage example:
```julia
vec = SimpleMPIVector{Int}(comm, mylength)
rank == 0 && @show length(vec)

for i in mystart:myend
  vec[i] = rank
end

MPI.Barrier(comm)

rank == 0 && println("The global vector is ", vec')
```

Output of `mpirun -np 4 julia --project src/simplempiarray.jl`
```text
length(vec) = 16
The global vector is [0 0 0 0 1 1 1 1 2 2 2 2 3 3 3 3]
```

In [6]:
using Pkg
Pkg.activate(@__DIR__)
using DistributedArrays

In [7]:
distribute(rand(100), procs=procs(), dist=(nprocs(),))

Add `using Distributed` to your imports.
  likely near In[7]:1
Add `using Distributed` to your imports.
  likely near In[7]:1


100-element DArray{Float64,1,Array{Float64,1}}:
 0.8531564517665009  
 0.2089520384280661  
 0.4861428012318145  
 0.28061196776306474 
 0.6246679083390314  
 0.8919679135956722  
 0.9971890730164081  
 0.9376863917843443  
 0.38369774043545846 
 0.7200490732563147  
 0.2960357911575635  
 0.5553101074243334  
 0.7756154549737408  
 ⋮                   
 0.6253538163103969  
 0.409447688103004   
 0.17537936529799492 
 0.4931933437740019  
 0.5055103617375258  
 0.4544441735749394  
 0.6711289021422038  
 0.019533977256011026
 0.4761453639397015  
 0.40448152051286246 
 0.6382510233180076  
 0.12140908196787525 

  likely near /Users/bjanssens/.julia/packages/IJulia/RBfP/src/kernel.jl:32
  likely near /Users/bjanssens/.julia/packages/IJulia/RBfP/src/kernel.jl:32
  likely near /Users/bjanssens/.julia/packages/IJulia/RBfP/src/kernel.jl:32
