# Parallelism in Julia

Julia offers three main methods for parallelism:

1. Coroutines ("green threads" or Tasks)
2. Multi-Threading
3. Multi-Core or Distributed Processing (Multi Processing)

There is also an additional level that's relevant for MixedModels, but which we won't highlight here: concurrency in external libraries, such as BLAS.

Note that the material presented here is very condensed and based on several Julia documents as well as my experience implementing multi-threading for `MixedModels.parametricbootstrap` and `MixedModelsSim.simulate_waldtests`.

1. [Announcing composable multi-threaded parallelism in Julia](https://julialang.org/blog/2019/07/multithreading/)
2. [Parallel Computing](https://docs.julialang.org/en/v1/manual/parallel-computing/)
3. [Surprising capture boxing behavior in closure](https://discourse.julialang.org/t/surprising-capture-boxing-behavior-in-closure/20254).
4. [`ProgressMeter.jl` documentation](https://github.com/timholy/ProgressMeter.jl)

## Coroutines

Coroutines don't depend on the particulars of the host operating system at all. They are handled completely within Julia. But they require a complete restructuring of your code.

If you've ever written a generator in Python (i.e. written a function that uses `yield` instead of `return`), then you've used a form of coroutines. They are not impossible to right, but they're hard to strap on post hoc. 

That said, if you're using comprehensions or `map` with independent entries, you should check out [`asyncmap`](https://docs.julialang.org/en/v1/base/parallel/#Base.asyncmap).

Also, there seems to be a serious restriction to these:

> Currently, all tasks in Julia are executed in a single OS thread co-operatively. Consequently, asyncmap is beneficial only when the mapping function involves any I/O - disk, network, remote worker invocation, etc.

Note that a lot of magic happens in the other two options that gives a surface appearance similar to coroutines (e.g. `fetch()` is defined on Tasks).


## Multi-Threading

Multi-Threading works by splitting the workload among threads on a single machine. This has implications for the cost of copying objects as well as how we can share memory.

Note that this appears to fall somewhere between Python's `threading` and `multiprocessing` libraries. The Julia documentation consistently calls things "threads" or even "operating system threads", yet discusses the "fork and join" approach, which on Unix systems would be closer to "processes". Based on my examination of running processes on my Linux system, Julia appears to be using "threads", but I haven't taken the time to go through the source. This may also be operating system dependent, similar to `multicore` vs. `multisession` in R. In either case, Julia does not have a GIL (Global Interpreter Lock) like Python.

If you have used the relatively new `future` package in R, then you will be relatively well preparing for multi-threading and distributed processing.

## Distributed Processing

Distributed processing creates additional "worker" processes that may or may not reside on a single machine.  This has implications for the cost of copying objects as well as how we can share memory.

It is much simpler than R's `snow` package, yet also allows for processes to be moved across machines. Some functinality maps onto serial Julia much the way functionality in Python's `multiprocessing` maps onto base Python.


# A word of warning

Parellization and concurrency are hard problems. Really hard. They can introduce subtle bugs that are hard to detect because of the stochastic nature of race conditions. They can also make it nearly impossible to kill 'a' process because you actually have lots of processes. So, as the saying goes, "measure twice, cut once".

# Multi-Threading

For several of the examples here, we will use rather naive/inefficient algorithms and then parallelize them. We use the naive version because they make easy pedagogical examples, both in terms of understanding the individual examples and in terms of highlighting that concurrency isn't a replacemant for good algorithms.

Note that multi-threading in Julia is still considered 'experimental', but Julia respects semantic versioning and so the examples presented here will continue working will future 1.x releases. Some of the functionality I present here was introduced 1.3, but that's what we require for MixedModels.jl, so you're all using it anyway.

The number and availability of threads depends on the environment variable `JULIA_NUM_THREADS`.

In [None]:
Threads.nthreads()

In [None]:
Threads.threadid()

In [None]:
using BenchmarkTools

For our purposes, we consider the Fibonnaci numbers to be the following sequence:

$ 0, 1, 2, 3, 5, 8, \ldots $

## Naive Single-Threading

In [None]:
function fib(n::BigInt)
    if n < 2
        return n
    end
   
    return fib(n - 1) + fib(n - 2)
end

fib(n::Int) = fib(BigInt(n))

In [None]:
@benchmark fib(15)

## Naive Multi-Threading

In [None]:
function fib_mt(n::BigInt)
    if n < 2
        return n
    end
    t = Threads.@spawn fib(n - 2)
    return fib(n - 1) + fetch(t)
end

fib_mt(n::Int) = fib_mt(BigInt(n))

Note this is different in internal magic, but very similar in application to R's `future`.

In [None]:
@benchmark fib_mt(15)

That's not really a huge speed boost. In some places, it was even worse. 

Pedagogical moment: Why?


There are two ways we could do much better.

## Efficient single-threaded implementation with different form.

In [None]:
function fib_loop(n::BigInt)
    if n < 2
        return n
    end
    prev_val = 0
    val = 1
    for i in 2:n
        val, prev_val = val + prev_val, val
    end
    return val
end

fib_loop(n::Int) = fib_loop(BigInt(n))

In [None]:
fib(5)

In [None]:
fib_loop(5)

In [None]:
@benchmark fib_loop(15)

## Single-Threading with Memoizing

In [None]:
let 
    global fib_memo
    memo = Dict()
    function fib_memo(n::BigInt)
        get!(memo, n) do 
            fib(n)
        end
    end
end

fib_memo(n::Int) = fib_memo(BigInt(n))

In [None]:
@benchmark fib_memo(12)

## Multi-Threading with Memoizing

In [None]:
let 
    global fib_memo_mt
    global fib_mt
    memo = Dict()
    function fib_memo_mt(n::BigInt)
        get!(memo, n) do 
            fib_mt(n)
        end
    end
end

fib_memo_mt(n::Int) = fib_memo_mt(BigInt(n))

In [None]:
@benchmark fib_memo_mt(12)

## Looping with Multi-Threading

In [None]:
for i in 1:10
    println(Threads.threadid())
end

For loops where each iteration depends on no other iteration, we can easily parallelize with `Threads.@threads`. Note that the execution order is not guaranteed and simultaneously modifying the same objects can lead to inconsistent state and nasty race conditions!

In [None]:
Threads.@threads for i in 1:10
    println(Threads.threadid())
end

In [None]:
using Random

In [None]:
Random.seed!(42)
for i in 1:10
    print(randstring(1))
end

In [None]:
Random.seed!(42)
Threads.@threads for i in 1:10
    print(randstring(1))
end

Note that both the sequence and character differ! In order to guarantee thread safety, Julia creates a freshly seeded copy of the default RNG for every thread, which means that we're drawing different random numbers for the extra threads. 

Let's wrap this up in a function and show how to make it multithreaded.

In [None]:
function argle_bargle(draws; seed=42, use_threads=false)
    val = []
    Random.seed!(seed)
    for i in 1:draws
        append!(val,randstring(1))
    end
    val
end

In [None]:
join(argle_bargle(10))

The default Julia RNG is the MersenneTwister, so we can just create our own RNG and share it among threads. However, random number generation is not *atomic* (roughly "an instruction/operation with no intermediate states") because a lot of calculations go into generating the next draw. In other words, we cannot allow two threads to access an RNG at the same time because they may leave the RNG in an inconsistent state. Moreover, when we access an RNG, we often ask it to generate lots of numbers (e.g. the random noise for all observations in a simulation), and so there is an additional level of not being atomic -- each draw is not atomic and we're asking for lots of draws. So we create a *lock* that blocks two threads from accessing the RNG at once.

Anything that has *side effects* -- in Julia indicated by convention with a `!` in the name is by definition not atomic.

In [None]:
function argle_bargle(draws; seed=42, use_threads=false)
    val = []
    rng = MersenneTwister(seed)
    if use_threads
        rnglock = ReentrantLock()
        Threads.@threads for i in 1:draws
            lock(rnglock)
            s = randstring(rng,1)
            append!(val,s)
            unlock(rnglock)
        end
    else
        for i in 1:draws
            s = randstring(rng,1)
            append!(val,s)
        end
    end
    val
end

In [None]:
join(argle_bargle(10))

In [None]:
join(argle_bargle(10, use_threads=true))

This is so simple compared to the granularity of the locking that the threads are largely scheduled sequentially, but it does highlight how to do the locking.

In addition to this approach, Julia allows "fast-forwarding" the MersenneTwister. This can be useful, if you know how much to fast forward by.

## Advanced Multi-Threading

For a more advanced example, this is the naive version of `simulate_waldtests` without multi-threading

```julia
function simulate_waldtests(
    rng::AbstractRNG,
    n::Integer,
    morig::MixedModel{T};
    β = morig.β,
    σ = morig.σ,
    θ = morig.θ,
) where {T}
    β = convert(Vector{T},β)
    σ = T(σ)
    θ = convert(Vector{T},θ)

    nβ, mod = length(β), deepcopy(morig)
    replicate(n, use_threads=false) do
        mod = simulate!(rng, mod, β = β, σ = σ, θ = θ)
        refit!(mod)
        ct = coeftable(mod)
        names = Tuple(Symbol.(ct.rownms))
        (
         β = NamedTuple{names}(ct.cols[1]),
         se = NamedTuple{names}(ct.cols[2]),
         z = NamedTuple{names}(ct.cols[3]),
         p = NamedTuple{names}(ct.cols[4]),
        )
    end
end
```

And this is the version with threading

```julia
function simulate_waldtests(
    rng::AbstractRNG,
    n::Integer,
    morig::MixedModel{T};
    β = morig.β,
    σ = morig.σ,
    θ = morig.θ,
    use_threads = false,
) where {T}
    β = convert(Vector{T},β)
    σ = T(σ)
    θ = convert(Vector{T},θ)

    nβ, m = length(β), deepcopy(morig)
    # we need to do for in-place operations to work across threads
    m_threads = [m]

    if use_threads
        Threads.resize_nthreads!(m_threads)
    end

    rnglock = ReentrantLock()
    replicate(n, use_threads=use_threads) do
        mod = m_threads[Threads.threadid()]
        lock(rnglock)
        mod = simulate!(rng, mod, β = β, σ = σ, θ = θ)
        unlock(rnglock)
        refit!(mod)
        ct = coeftable(mod)
        names = Tuple(Symbol.(ct.rownms))
        (
        # ct.testvalcol
         se = NamedTuple{names}(ct.cols[2]),
         z = NamedTuple{names}(ct.cols[3]),
         p = NamedTuple{names}(ct.cols[4]),
        )
    end
end
```

# Distributed Processing

Quoting from the documentation,

>  Julia provides a multiprocessing environment based on message passing ... Distributed programming in Julia is built on two primitives: remote references and remote calls. 

Distributed computing is more "mainline" / less experimental than multi-threading, but it does take a fair amount of finesse to really maximize its potential.

The number of workers available is determined by the startup flag `-p` in Julia. We can check it:

In [None]:
import Distributed

In [None]:
Distributed.workers()

We can also add workers from within Julia, unlike with threads. Note that the default cluster is simply the local machine.

In [None]:
Distributed.addprocs()

In [None]:
Distributed.workers()

In [None]:
Distributed.rmprocs(Distributed.workers())

In [None]:
Distributed.workers()

In [None]:
Distributed.addprocs()

In [None]:
Distributed.nprocs(), Distributed.nworkers()

Distributed processing is bit more complex than multithreading with a lot more options (especially if you want to use a proper cluster), but the basics look a lot like multi-threading.

In [None]:
function fib_dp(n::BigInt)
    if n < 2
        return n
    end
    t = Distributed.@spawn fib(n - 2)
    return fib(n - 1) + fetch(t)
end

fib_dp(n::Int) = fib_mt(BigInt(n))

In [None]:
@benchmark fib_dp(15)

In [None]:
fib(15), fib_mt(15),fib_dp(15)

Loops are a bit more tricky because memory isn't automatically shared.

In [None]:
Distributed.@distributed for i in 1:10
    println(Distributed.myid())
end

In [None]:
val = zeros(Int, 20);
for i = 1:length(val)
    val[i] = i
end
show(val)

In [None]:
val = zeros(Int, 20);
Threads.@threads for i = 1:length(val)
    val[i] = Threads.threadid()
end
show(val)

In [None]:
val = zeros(Int, 20);
Distributed.@distributed for i in 1:10
    val[i] = Distributed.myid()
end
show(val)
println()
sleep(1)
show(val)

In [None]:
using SharedArrays

In [None]:
val = SharedArray{Int}(20);
Distributed.@distributed for i in 1:10
    val[i] = Distributed.myid()
end

show(val)
println()
sleep(1)
show(val)

In [None]:
val = SharedArray{Int}(20);
Distributed.@sync Distributed.@distributed for i in 1:10
    val[i] = Distributed.myid()
end

show(val)
println()
sleep(1)
show(val)

Although this is very similar to threading, there are some differences that matter when you really get into performance tuning. For example

In [None]:
Threads.@spawn(1)

In [None]:
Distributed.@spawn(1)

If you're comfortable using `map`, then you should checkout [`pmap`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.pmap), especially combined with [ProgressMeter](https://github.com/timholy/ProgressMeter.jl).

In [None]:
using ProgressMeter
using LinearAlgebra
import Distributed.pmap

In [None]:
M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
@showprogress pmap(svdvals, M);

More advanced features in distributed processing include `@async`, `@sync`,`@spawnat`, `fetchfrom`, `@everywhere`, `remotecall_wait`, `wait`, `put!`, `take!`, `isready`, `SharedArray`s, `DArrays`, and `ClusterManager`s. 

I haven't yet gotten too far into how locking and synchronization works in distributed processing, so I don't have a good, optimized RNG example for you, sorry. In the future, I might add an example of distributed processing across a cluster for `parametricbootstrap` or `simulate_waldtests`.