# Julia Parallel Computing

1. Asynchronous Tasks
2. Multi-Threading
3. Distributed Computing with Distributed.jl
4. Distributed Computing with MPI.jl
4. GPU Computing (available packages)

We will make use of the following basic Monte Carlo integration function through out this presentation

In [1]:
using Statistics
using BenchmarkTools # for the `@btime` macro

function mc_integrate(f::Function, a::Real=0, b::Real=1, n::Int=100000)
    ihat = 0.0
    for k in 1:n
        x = (b - a)*rand() + a
        ihat += (f(x) - ihat) / k
    end
    return ihat
end

function intense_computation(t::Real)
    sleep(t)
    return rand()
end;

# Tasks

1. What are Tasks?
2. Creating and Running Tasks
3. Communication Between Tasks

## What are Tasks?

**Tasks** are execution streams that do not depend on each other and can be done in any order. They can be executed asynchronously but they are not executed in parallel. That is, only one task is running at a given time but the order of execution is not predetermined.

Tasks are also known as **coroutines**.

## Creating and Running Tasks

Running a task is done in 3 steps:
1. Creation
2. Scheduling
3. Collect Results

## Creating and Running Tasks

Creating a task can be done directly with the `Task` object:

In [2]:
my_task = Task(()->mc_integrate(sin, -pi, pi))

Task (runnable) @0x00000001136e97b0

Note the `Task` constructor takes a function with no arguments.

We can always define an zero argument anonymous function to pass to the `Task` constructor. The `@task` macro exists for this purpose:

In [3]:
my_task = @task mc_integrate(sin, -pi, pi)

Task (runnable) @0x0000000133a55660

## Creating and Running Tasks

Next we schedule the task to run using the `schedule` function

In [4]:
schedule(my_task)

Task (done) @0x0000000133a55660

Many times we want to create and schedule a task immediately. We can do this with the `@async` macro:

In [5]:
my_task = @async mc_integrate(sin, -pi, pi)

Task (done) @0x0000000112f3a8c0

## Creating and Running Tasks

We can collect the results of the task once it has completed with the `fetch` function

In [6]:
fetch(my_task)

0.0002668053796949929

There are a few helpful details to know about `fetch`:
1. If the task has not finished when `fetch` is called, the call to `fetch` will block until the task has completed.
2. If the task raises an exception, `fetch` will raise a `TaskFailedException` which wraps the original exception.

## Creating and Running Tasks

Remember that tasks are not inherently parallel, just asynchronous execution streams. 

In [7]:
function run_mci()
    N = 10
    result = zeros(N)
    for k in 1:N
        result[k] = mc_integrate(sin, -pi, pi)
    end
    return mean(result)
end

function run_mci_task()
    N = 10
    task_res = zeros(N)
    @sync for k in 1:N
        @async(task_res[k] = mc_integrate(sin, -pi, pi))
    end
    return mean(task_res)
end;

In [8]:
@btime run_mci()
@btime run_mci_task();

  19.574 ms (1 allocation: 144 bytes)
  19.532 ms (74 allocations: 5.52 KiB)


**NOTE:** The `@sync` macro will block at the end of the code block until all inclosed `@async` statements have completed execution.

## Communicating Between Tasks

Sometimes we need to communicate between tasks. An easy way to accomplish this is to use Julia's `Channel` type. We can think of a `Channel` like a pipe or a queue: objects are put in at one end and taken off at the other.

Let's rewrite `run_mci_task` to use channels by dividing the `run_mci` workflow into two functions.

## Communicating Between Tasks

The first function will perform small Monte-Carlo integrations and put the results on a channel with the `put!` function. When it has finished the requested number of computations it will close the channel with `close` and return.

In [9]:
function integrator(output::Channel{Float64}, N::Int)
    for k in 1:N
        result = mc_integrate(sin, -pi, pi)
        put!(output, result)
    end
    close(output)
    return
end;

**NOTE:** If the channel is full, `put!` will block until space opens up.

## Communicating Between Tasks

The second function will take the results off the channel using the `take!` function and accumulate them into an average. We keep pulling results from the channel as long as there is a result or the channel is open. We can check the former with `isready` and the latter with `isopen`.

In [10]:
function accumulator(input::Channel{Float64})
    mean_val = 0.0
    k = 0
    while isready(input) || isopen(input)
        value = take!(input)
        k += 1
        mean_val += (value - mean_val) / k
    end
    return mean_val
end;

**NOTE:** If the channel is empty, the `take!` function will block until there is an item available.

## Communicating Between Tasks

Now we create channel which can hold 10 results, create and schedule both tasks and finally fetch the result.

In [11]:
function run_mci_chan()
    comm_ch = Channel{Float64}(10)
    atask = @async accumulator(comm_ch)
    @async integrator(comm_ch, 10)
    result = fetch(atask)    
    return result
end;

In [12]:
@btime run_mci_chan();

  19.583 ms (24 allocations: 1.72 KiB)


## Why Tasks?

If tasks aren't parallel, why are we talking about them in a parallel computing tutorial?

Remeber that tasks are discrete computation units. They naturally define boundaries between computational tasks. Julia's native parallel capabilities are ways of scheduling tasks on other processors.

# Multi-Threading

1. Starting Julia with Multiple Threads
1. `@threads` Macro
1. `@spawn` Macro
1. Using Channels

## Starting Julia with Multiple Threads

Julia (v1.3 or greater) has multithreading built into the language. By default, Julia starts with a single thread.  To start Julia with multiple threads either
* set the environment variable `JULIA_NUM_THREADS` to some value > 1
* start Julia with `--threads` or `-t` option (Julia v1.5 or greater)

Once started, we can see how many threads are running with the function `Threads.nthreads`

In [13]:
Threads.nthreads()

2

## `@threads` Macro

Many computations take the form of looping over an array where the result of the computation is put into an element in the array and these computations do not interact. In this case, we can make use of the `Threads.@threads` macro.

Lets apply this to our Monte-Carlo integration.

In [14]:
function run_mci_mt()
    N = 10
    mt_res = zeros(N)
    Threads.@threads for k in 1:N
        mt_res[k] = mc_integrate(sin, -pi, pi)
    end
    return mean(mt_res)
end;

In [15]:
@btime run_mci_mt();

  9.824 ms (14 allocations: 1.14 KiB)


## `@spawn` Macro

Some applications require dispatching individual tasks on different threads. We can do this using the `Threads.@spawn` macro. This is like the `@async` macro but will schedule the task on an available thread. That is, it creates a `Task` and schedules it but on an available thread.

In [16]:
function run_mci_mt2()
    N = 10
    mt_res = Vector{Float64}(undef, N)
    @sync for k in 1:N
        @async(mt_res[k] = fetch(Threads.@spawn mc_integrate(sin, -pi, pi)))
    end
    return mean(mt_res)
end;

In [17]:
@btime run_mci_mt2();

  9.946 ms (124 allocations: 9.89 KiB)


There are a couple of oddities about Julia's multi-threading capability to remember:
1. An available thread is any thread that has completed all assigned tasks or any remaining tasks are *blocked*.
2. As of Julia 1.6, once a task has been assigned to a thread, it remains on that thread even after blocking operations. This will likely change in future releases of Julia.

The combination of these two behaviors can lead to load imbalances amongst threads when there are blocking operations within a thread's tasks.

## Using Channels

Just as before, we can use a `Channel` to communicate between tasks in a multi-threaded environment. The only difference is that we replace `@async` with `Threads.@spawn`.

In [18]:
function run_mci_mt3()
    comm_ch = Channel{Float64}(10)
    itask = Threads.@spawn integrator(comm_ch, 10)
    atask = Threads.@spawn accumulator(comm_ch)
    result = fetch(atask)
    return result
end;

In [41]:
@btime run_mci_mt3();

LoadError: TaskFailedException

[91m    nested task error: [39mInvalidStateException: Channel is closed.
    Stacktrace:
     [1] [0m[1mtry_yieldto[22m[0m[1m([22m[90mundo[39m::[0mtypeof(Base.ensure_rescheduled)[0m[1m)[22m
    [90m   @ [39m[90mBase[39m [90m./[39m[90m[4mtask.jl:871[24m[39m
     [2] [0m[1mwait[22m[0m[1m([22m[0m[1m)[22m
    [90m   @ [39m[90mBase[39m [90m./[39m[90m[4mtask.jl:931[24m[39m
     [3] [0m[1mwait[22m[0m[1m([22m[90mc[39m::[0mBase.GenericCondition[90m{ReentrantLock}[39m[0m[1m)[22m
    [90m   @ [39m[90mBase[39m [90m./[39m[90m[4mcondition.jl:124[24m[39m
     [4] [0m[1mtake_buffered[22m[0m[1m([22m[90mc[39m::[0mChannel[90m{Float64}[39m[0m[1m)[22m
    [90m   @ [39m[90mBase[39m [90m./[39m[90m[4mchannels.jl:416[24m[39m
     [5] [0m[1mtake![22m
    [90m   @ [39m[90m./[39m[90m[4mchannels.jl:410[24m[39m[90m [inlined][39m
     [6] [0m[1maccumulator[22m[0m[1m([22m[90minput[39m::[0mChannel[90m{Float64}[39m[0m[1m)[22m
    [90m   @ [39m[35mMain[39m [90m./[39m[90m[4mIn[10]:5[24m[39m
     [7] [0m[1m(::var"#21#23"{Channel{Float64}})[22m[0m[1m([22m[0m[1m)[22m
    [90m   @ [39m[35mMain[39m [90m./[39m[90m[4mthreadingconstructs.jl:258[24m[39m

**NOTE:** We can see from the timing results this is not the best way to distribute the work since the `integrator` function has much more computational work than the `accumulator` function.

# Distributed Computing with Distributed.jl

1. Architecture
1. Setting Up
1. `@distributed` Macro
1. `@spawnat` Macro
1. Remote Channels
1. Shutting Down

## Architecture

Communication patterns are one-sided so users only manage one process. Communication itself takes the form of function or macro calls rather than explicit send and receive calls.

Distributed.jl is built on two basic types: remote calls and remote references. A remote call is a directive to execute a particular function on a particular process. A remote reference is a reference to a variable stored on a particular process.

There is a strong resemblence to the way Julia handles tasks: Function calls (wrapped in appropriate types) are scheduled on worker processes through remote calls which return remote references. The results of these calls are then retrieved by fetching the values using the remote references.

## Setting Up

We can launch more Julia processes on the same or other machines with the `addprocs` function. Here we launch 2 worker processes on the local machine:

In [20]:
using Distributed
addprocs(2);

Each Julia process is identified by a (64-bit) integer. We can get a list of all active processes with `procs`:

In [21]:
@show procs();

procs() = [1, 2, 3]


There is a distinction between the original Julia process and those we launched. The original Julia process is often called the **master** process and always has id equal to 1. The launched processes are called **workers**. We can obtain a list of workers with the `workers` function:

In [22]:
@show workers();

workers() = [2, 3]


By default, distributed processing operations use the workers only.

## Setting Up

We can also start up worker processes from the command lines using the `-p` or `--procs` option.

In order to launch Julia processes on other machines, we give `addprocs` a vector of tuples where each tuple is the hostname as a string paired with the number of processes to start on that host.

## Setting Up

The Julia global state is not copied in the new processes. We need to manually load any modules and define any functions we need. This is done with the `Distributed.@everywhere` macro:

In [23]:
@everywhere using Statistics
@everywhere function mc_integrate(f::Function, a::Real=0, b::Real=1, n::Int=100000)
    ihat = 0.0
    for k in 1:n
        x = (b - a)*rand() + a
        ihat += (f(x) - ihat) / k
    end
    return ihat
end;

## `@distributed` Macro

The `@distributed` macro is the distributed memory equivalent of the `Threads.@threads` macro. This macro partitions the range of the for loop and executes the computation on all worker processes. 

In [24]:
function run_mci_dist()
    N = 10
    total = @distributed (+) for k in 1:N
        mc_integrate(sin, -pi, pi)
    end
    return total/N
end;

In [25]:
@btime run_mci_dist();

  10.065 ms (143 allocations: 6.67 KiB)


Between the macro and the for loop is an optional reduction. Here we have used `+` but this can be any valid reduction operator including a user defined function. The values given to the reduction are the values of the last expression in the loop.

**NOTE:** If we do not provide a reduction, `@distributed` creates a task for each element of the loop and schedules them on worker processes and returns without waiting for the tasks to complete. To wait for completion of the tasks, the whole block can be wrapped with `@sync` macro.

## `@spawnat` Macro

Julia also provides more fine grained control for launching tasks on workers with the `@spawnat` Macro:

In [26]:
function run_mci_dist2()
    N = 10
    futures = Vector{Future}(undef, N)
    for k in 1:N
        futures[k] = @spawnat(:any, mc_integrate(sin, -pi, pi))
    end
    return mean(fetch.(futures))
end;

The first argument to `@spawnat` is the worker to run the computation on. Here we have used `:any` indicating that Julia should pick a process for us. If we wanted to execute the computation on a particular worker, we could specify which one with the worker id value. The second argument is the expression to compute.

`@spawnat` returns a `Future` which is a remote reference. We call `fetch` on it to retrieve the value of the computation. Note that `fetch` will block until the computation is complete.

In [27]:
@btime run_mci_dist2();

  11.928 ms (1233 allocations: 52.48 KiB)


## `@spawnat` Macro

**WARNING:** The entire expression is sent to the worker process before anything in the expression is executed. This can cause performance issues if we need a small part of a big object or array.

In [28]:
@everywhere struct MyData
    Data::Vector{Float64}
    N::Int
end
function slow(my_data::MyData)
    return fetch(@spawnat(2, mean(rand(my_data.N))))
end;

In [29]:
large_data = MyData(rand(1000000), 5)
@btime slow(large_data);

  1.740 ms (100 allocations: 4.13 KiB)


## `@spawnat` Macro

**WARNING:** The entire expression is sent to the worker process before anything in the expression is executed. This can cause performance issues if we need a small part of a big object or array.

This is easily fixed using a local variable:

In [30]:
function fast(my_data::MyData)
    n = my_data.N
    return fetch(@spawnat(2, mean(rand(n))))
end;

In [31]:
@btime fast(large_data);

  164.427 μs (92 allocations: 3.87 KiB)


In [32]:
large_data = nothing

**TODO:** Do I want to do this bit?

In [33]:
function run_mci_dist3()
    N = 10
    vals = Vector{Float64}(undef, N)
    @sync for k in 1:N
        @async(vals[k] = @fetch(mc_integrate(sin, -pi, pi)))
    end
    return mean(vals)
end

run_mci_dist3 (generic function with 1 method)

In [34]:
@btime run_mci_dist3();

  10.536 ms (605 allocations: 29.14 KiB)


In [35]:
using SharedArrays
function run_mci_dist4()
    N = 10
    vals = SharedArray{Float64}(N)
    @sync @distributed for k in 1:N
        vals[k] = mc_integrate(sin, -pi, pi)
    end
    return mean(vals)
end

run_mci_dist4 (generic function with 1 method)

In [36]:
@btime run_mci_dist4();

  10.850 ms (493 allocations: 21.78 KiB)


## Remote Channels

As suggested by the name, these are the remote versions of the `Channel` type we've already seen. If you look at the source code, they are actually wrap an `AbstractChannel` to provide the needed remote functionality. We can effectively treat them just like a `Channel`.

Let's redo our our `integrator` - `accumulator` workflow but this time let's do a better job of distributing the work:

In [37]:
@everywhere function integrator(output::RemoteChannel{Channel{Float64}}, N::Int)
    for k in 1:N
        result = mc_integrate(sin, -pi, pi)
        put!(output, result)
    end
    put!(output, NaN)
    return
end;
@everywhere function accumulator(input::RemoteChannel{Channel{Float64}}, nworkers::Int)
    mean_val = 0.0
    k = 0
    finished = 0
    while finished < nworkers
        value = take!(input)
        if value === NaN
            finished += 1
        else
            k += 1
            mean_val += (value - mean_val) / k
        end
    end
    return mean_val
end;

In [38]:
function run_mci_rc()
    comm_ch = RemoteChannel(()->Channel{Float64}(10), 1)
    @spawnat(2, integrator(comm_ch, 5))
    @spawnat(3, integrator(comm_ch, 5))
    atask = @async accumulator(comm_ch, nworkers())
    return fetch(atask)
end;

Here we create a `RemoteChannel` on the master process, divide the computationally intensive `integrator` function into two calls and remotely execute them on the worker processes. Then we start a task on the master process to accumulate the values and call fetch to wait for and retrieve the result.

In [39]:
@btime run_mci_rc();

  11.281 ms (1130 allocations: 46.59 KiB)


## Shutting Down

To shutdown the worker processes we can use `rmprocs`.

In [40]:
rmprocs(workers())

Task (done) @0x0000000112924010

Alternatively, we can also just exit Julia and the workers will be shutdown as part of the exit process.

# Distributed Computing with MPI.jl

1. Overview of MPI.jl
1. Example

<!-- **TODO:** Stuff:
1. Outline
2. describe MPI.jl api and capabilities
2. breakup script
3. describe what we're doing -->

## Overview of MPI.jl

[MPI.jl](https://github.com/JuliaParallel/MPI.jl) is a Julia wrapper around an MPI library. By default it will download an MPI library suitable for running on the installing system. However, it is easily configured to use an existing system MPI implementation (e.g. one of the MPI modules on Eagle). See the documentation for [instructions on how to do this](https://juliaparallel.github.io/MPI.jl/stable/configuration/).

MPI.jl mostly requires transmitted things to be buffers of basic types (types that are easily converted to C). Some functions can transmit arbitrary data by serializing them but this functionality is not as fleshed out as in mpi4py.

## Example

We first need to load and initialize MPI.

```julia
using MPI
MPI.Init()
```

`MPI.Init` loads the MPI library and calls `MPI_Init` as well as sets up types for that specific MPI library.

## Example

Now we can implement our Monte-Carlo integration workflow using MPI

```julia
function run_mci_mpi()

    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    size = MPI.Comm_size(comm)

    if rank == 0
        N = 10
        num = [N]
    else
        num = Vector{Int}(undef, 1)
    end
    MPI.Bcast!(num, 0, comm)

    rank_sum = 0.0
    for k in rank+1:size:num[1]
        rank_sum += mc_integrate(sin, -pi, pi)
    end

    total = MPI.Reduce([rank_sum], MPI.SUM, 0, comm)
    if rank == 0
        result = total / N
    else
        result = nothing
    end

    return result
end
```

## Example

To benchmark this we time it many (10000) times and track the minimal value (this is similar to what the `@btime` macro does).

```julia
function run_loop(nruns::Int)
    
    min_time = 1e10
    result = 0.0
    
    for _ in 1:nruns
        MPI.Barrier(MPI.COMM_WORLD)
        start = time()
        result = run_mci_mpi()
        stop = time()
        elapsed = stop - start
        if elapsed < min_time
            min_time = elapsed
        end
    end

    if MPI.Comm_rank(MPI.COMM_WORLD) == 0
        println("Elapsed time: ", min_time)
    end

    return
end

run_loop(10000)
```

## Example

Results

```shell
mpirun -n 2 julia mpi_mci.jl
  Activating environment at `~/HPC_Apps/julia-tutorial/Project.toml`
  Activating environment at `~/HPC_Apps/julia-tutorial/Project.toml`
Elapsed time: 0.01108694076538086
```

## GPU Computing

We provide a brief survey of available packages that can be used to get started.

Packages exist for [NVIDIA's CUDA](https://github.com/JuliaGPU/CUDA.jl), [AMD's ROCm](https://github.com/JuliaGPU/AMDGPU.jl), and [Intel's oneAPI](https://github.com/JuliaGPU/oneAPI.jl). CUDA.jl is the most mature while the other two, as of this writing, are still underdevelopment.

The package [KernelAbstractions.jl](https://github.com/JuliaGPU/KernelAbstractions.jl) is an abstraction layer for enabling different GPU backends.

See the JuliaGPU organization's [webpage](https://juliagpu.org/) or [github repo](https://github.com/JuliaGPU) for a great place to get started.

# Additional Resources

The following are great resource for learning more

*  [Julia Documentation](https://docs.julialang.org/en/v1/) -- the manual discusses the inner workings of Julia including the native parallel computing capabilities
*  [Julia community](https://julialang.org/community/) especially the following discourse channels
    - [Julia discourse](https://discourse.julialang.org/) -- all channels
    - [Julia at Scale discourse](https://discourse.julialang.org/c/domain/parallel/34) -- for scalable Julia
    - [Julia GPU discourse](https://discourse.julialang.org/c/domain/gpu/11) -- for GPU Julia computing
*  [Julia Youtube Channel](https://www.youtube.com/c/TheJuliaLanguage) -- tutorials for Julia and Julia packages
*  MPI.jl [package repo](https://github.com/JuliaParallel/MPI.jl) and [documentation](https://juliaparallel.github.io/MPI.jl/stable/)
*  JuliaGPU [webpage](https://juliagpu.org/) and [github repo](https://github.com/JuliaGPU)