# Multithreading

## What are threads?
Threads are execution units within a process that can run simultaneously.

<img src="./imgs/processes_threads.png" width=400px>

While processes are entirely separate, threads run in a **shared memory** space.

## Starting Julia with multiple threads

By default, Julia starts with a single *user thread*. We must tell it explicitly to start multiple user threads. There are two ways to do this:

* Environment variable: `JULIA_NUM_THREADS=4`
* Command line argument: `julia -t 4`

**Jupyter lab:**

The simplest way is to globally set the environment variable `JULIA_NUM_THREADS` (e.g. in the `.bashrc`). But one can also create a specific Jupyter kernel for multithreaded Julia:

```julia
using IJulia
installkernel("Julia (4 threads)", env=Dict("JULIA_NUM_THREADS"=>"4"))
```

We can readily check how many threads we are running:

In [1]:
Threads.nthreads()

6

### User threads vs default threads

Technically, the Julia process already spawns multiple threads even in "single-threaded" mode, for example
* a thread for unix signal listening
* multiple OpenBLAS threads for BLAS/LAPACK operations

For this reason, we call the threads specified via `-t` or the environment variable *user threads* or simply *Julia threads*.
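
The distinction can be checked directly. The following minimal sketch queries both numbers; it only assumes the standard library `LinearAlgebra`, and the values printed depend on your machine and configuration.

```julia
using LinearAlgebra  # provides the BLAS submodule

# Number of Julia (user) threads, set via `-t` or JULIA_NUM_THREADS
n_julia = Threads.nthreads()

# Number of threads used by the BLAS library; configured separately,
# e.g. via BLAS.set_num_threads
n_blas = BLAS.get_num_threads()

println("Julia threads: ", n_julia)
println("BLAS threads:  ", n_blas)
```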

## Tasks

By default, Julia waits for commands to finish ("**blocking**") and runs everything sequentially.

**Tasks** are a feature that allows (parts of) computations to be scheduled (suspended and resumed) in a flexible manner to implement **concurrency** (multitasking) and **parallelism**.

* Concurrency is about dealing with lots of things at once.
* Parallelism is about doing lots of things at once.

Example (concurrency): **asynchronous I/O** like
 * **multiple user input** (Why not already process some of the input?)
 * **data dumping to disk** (Maybe it's possible to continue a calculation?)
 * **receiving calculations from worker processes**
 
Example (parallelism): **multithreading, distributed computing**

## `@async` and `@sync`

We can create a task for asynchronous execution with the [`@async` macro](https://docs.julialang.org/en/v1/base/parallel/#Base.@async). Julia starts a task for whatever falls into the macro's scope and then proceeds to whatever comes next in the script without waiting for the task to complete ("**non-blocking**").

(**Note:** The Julia documentation recommends `Threads.@spawn` (see below) over `@async`, but we briefly cover it here for pedagogical reasons.)

In [5]:
@time sleep(2);

  2.003316 seconds (66 allocations: 1.688 KiB)


In [6]:
@time @async sleep(2)

  0.009963 seconds (7.35 k allocations: 448.450 KiB)


Task (runnable) @0x0000000112f35f90

Julia allows the script to proceed (and the `@time` macro to fully execute) without waiting for the task (in this case, sleeping for two seconds) to complete.

We can use the partner macro `@sync` to synchronize, that is, to wait for all encapsulated tasks (see `?@sync`).

In [7]:
@time @sync @async sleep(2)

  2.037175 seconds (8.27 k allocations: 563.504 KiB, 1.57% compilation time)


Task (done) @0x0000000113085e40

Of course, here it doesn't make much sense to write `@sync @async` - we could simply drop it altogether. A better example is the following.

In [8]:
@time @sync begin
    @async sleep(2.0)
    @async sleep(2.0)
end

  2.017607 seconds (15.93 k allocations: 1.073 MiB, 0.79% compilation time)


Task (done) @0x000000011328c940

In [9]:
A = rand(1000,1000)
B = rand(1000,1000)

t = @async A * B

Task (done) @0x00000001108c0160

In [10]:
wait(t)

In [11]:
fetch(t)

1000×1000 Matrix{Float64}:
 258.863  251.988  251.66   248.055  …  254.089  258.598  252.147  251.962
 256.655  246.754  252.633  248.102     255.455  252.97   255.976  246.44
 247.152  241.599  241.174  241.047     244.05   246.678  246.611  236.804
 261.366  255.333  257.759  250.723     253.528  262.9    263.362  251.323
 251.187  248.668  251.973  242.18      248.841  239.106  248.397  241.887
 253.758  251.199  255.334  241.883  …  254.323  250.681  251.337  244.613
 246.54   251.146  249.127  241.508     252.774  246.718  246.471  244.15
 251.284  248.965  242.299  239.541     247.846  251.664  252.859  242.705
 250.135  245.174  246.241  243.463     243.931  254.803  250.343  247.529
 257.451  251.01   250.736  241.272     250.654  252.469  254.773  252.831
 238.57   240.734  235.473  230.755  …  239.8    234.493  240.531  230.98
 254.714  243.69   248.54   240.839     246.479  249.655  252.558  245.778
 252.709  246.558  245.182  247.438     248.51   249.056  249.118  244.268
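
The difference between `wait` and `fetch` can be seen in a smaller setting. This is a minimal sketch with an arbitrary toy computation (the sleep duration and the returned value are made up for illustration):

```julia
# Create a task that sleeps briefly and then returns a value
t = @async begin
    sleep(0.5)
    21 * 2
end

println(istaskdone(t))  # most likely false: the task is still sleeping

wait(t)                 # block until the task has finished
println(istaskdone(t))  # the task is done now

result = fetch(t)       # wait (if necessary) and return the task's result
println(result)
```

`wait` only blocks, whereas `fetch` blocks and additionally hands back the value of the task's last expression.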
 

## Task-based multithreading

In traditional HPC, we typically care about threads directly. Using e.g. OpenMP, we essentially tell each thread what to do.

Conceptually, Julia takes a different approach and implements **task-based** multithreading. In this paradigm, a task - e.g. a computational piece of a code - is marked for **parallel** execution on **any** of the available Julia threads. Julia's **dynamic scheduler** will automatically put the task on one of the threads and trigger the execution of the task on said thread.

Ideally, **a user should think about tasks and not threads**.

**Advantages:**
* high-level and convenient
* **composability / nestability** (Multithreaded code can call multithreaded code can call multithreaded code ....)

**Disadvantages:**
* potential scheduling overhead
* **can get in the way when performance engineering**
  * scheduler has limited information (e.g. about the system topology)
  * low-level profiling (e.g. with LIKWID) currently requires a known task -> thread -> cpu core mapping.

(Blog post: [Announcing composable multi-threaded parallelism in Julia](https://julialang.org/blog/2019/07/multithreading/))

### Spawning tasks on threads: `Threads.@spawn`
`Threads.@spawn` spawns a task on a Julia thread. Specifically, it creates (and immediately returns) a `Task` and schedules it for execution on an available Julia thread.

To avoid having to prefix `Threads.` to `@spawn` (and other threading-related functions) let's load everything from `Base.Threads` into global scope.

In [14]:
using Base.Threads

In [15]:
@spawn println("test")

Task (runnable) @0x00000001132aa8c0

test


While `Threads.@spawn` returns the task right away - it is **non-blocking** - the result might only be fetchable after some time.

In [16]:
t = @spawn begin
    sleep(3);
    "result"
end
@time fetch(t)

  2.983542 seconds (95 allocations: 2.547 KiB)


"result"

Note that we can use (some of) the control flow tools that we've already covered, like `@sync`.

In [17]:
t = @sync @spawn begin # assign outside of `@sync`, whose body runs in a local scope
    sleep(3);
    "result"
end
@time fetch(t)

  0.000004 seconds


"result"

In [18]:
for i in 1:2*nthreads()
    @spawn println("Hi, I'm ", threadid())
end

Hi, I'm 5
Hi, I'm 6
Hi, I'm 6
Hi, I'm 1
Hi, I'm 3
Hi, I'm 6
Hi, I'm 6
Hi, I'm 6
Hi, I'm 6
Hi, I'm 2
Hi, I'm 6
Hi, I'm 4

#### Example: Recursive Fibonacci series

$$ F(n) = F(n-1) + F(n-2), \qquad F(1) = F(2) = 1$$

We can nest `@spawn` calls freely!

In [19]:
function fib(n)
    n < 2 && return n
    t = @spawn fib(n-2)
    return fib(n-1) + fetch(t)
end

fib (generic function with 1 method)

In [20]:
fib.(1:10)

10-element Vector{Int64}:
  1
  1
  2
  3
  5
  8
 13
 21
 34
 55


(Note: Algorithmically, this is a highly inefficient implementation of the Fibonacci series, of course!)
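
One common way to reduce the task overhead of such a recursion is to spawn tasks only for large subproblems and fall back to serial code below a threshold. The sketch below illustrates this idea; `fib_serial`, `fib_cutoff`, and the `cutoff` parameter are hypothetical names introduced here, and the default value is an arbitrary choice rather than a tuned one.

```julia
using Base.Threads: @spawn

# Plain serial recursion, used below the cutoff
fib_serial(n) = n < 2 ? n : fib_serial(n - 1) + fib_serial(n - 2)

# Spawn tasks only for large subproblems; `cutoff` is a hypothetical tuning parameter
function fib_cutoff(n; cutoff = 20)
    n <= cutoff && return fib_serial(n)
    t = @spawn fib_cutoff(n - 2; cutoff = cutoff)
    return fib_cutoff(n - 1; cutoff = cutoff) + fetch(t)
end

fib_cutoff(25) == fib_serial(25)
```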

#### Example: `tmap` (threaded `map`)

(again, not the most efficient implementation but fine for now)

In [21]:
tmap(fn, itr) = map(fetch, map(i -> Threads.@spawn(fn(i)), itr))

tmap (generic function with 1 method)
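
As a quick sanity check, the results come back in input order even though the tasks may finish in any order, because the outer `map(fetch, ...)` fetches the tasks in the order in which they were created. A minimal sketch (the definition is repeated so that the snippet is self-contained):

```julia
# Same definition as above, repeated so that the snippet is self-contained
tmap(fn, itr) = map(fetch, map(i -> Threads.@spawn(fn(i)), itr))

# The squaring function is just an arbitrary example workload
squares = tmap(x -> x^2, 1:5)
println(squares)
```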

In [22]:
using LinearAlgebra

In [23]:
M = [rand(200,200) for i in 1:10];

In [24]:
tmap(svdvals, M)

10-element Vector{Vector{Float64}}:
 [100.22170255091726, 8.24052689368954, 7.920930771051425, 7.753311895941921, 7.662172505595194, 7.566895404062618, 7.491885588995352, 7.429194837557763, 7.333251767465407, 7.295327268695022  …  0.2589182851822991, 0.23679912303817682, 0.21108330393241795, 0.19554370286798717, 0.165883670293, 0.12749719318923072, 0.10676202595374887, 0.09115153599396487, 0.054574455869891195, 0.02642592857816373]
 [99.82297643141749, 8.000913007213283, 7.883720528251313, 7.756708339652432, 7.71526567248039, 7.5986296018015596, 7.510962866843324, 7.4605892912946805, 7.28255516167145, 7.232648003920655  …  0.334930969474883, 0.2710295768435623, 0.2637360301764507, 0.21983630641359803, 0.1981284648151442, 0.15540874909087712, 0.10733525370730641, 0.048014330373218686, 0.029627805895823492, 0.006789924306356225]
 [100.19897835581133, 7.91330160167902, 7.839735145235196, 7.800795447444994, 7.631962190125877, 7.562194445720498, 7.514048943814794, 7.333581657411916, 7.25905

In [25]:
tmap(i -> println(i, " ($(threadid()))"), 1:10);

9 (5)
3 (5)
7 (2)
8 (3)
2 (5)
5 (4)
1 (5)
10 (6)
6 (5)
4 (1)


Note, however, that this implementation creates temporary allocations and thus isn't particularly efficient.

In [27]:
using BenchmarkTools

@btime tmap($svdvals, $M);
@btime map($svdvals, $M);

  20.900 ms (148 allocations: 4.22 MiB)
  53.099 ms (81 allocations: 4.22 MiB)


### Multithreading for-loops: `@threads`

In [47]:
@threads for i in 1:2*nthreads()
    println("Hi, I'm ", threadid())
end

Hi, I'm 2
Hi, I'm 6
Hi, I'm 3
Hi, I'm 4
Hi, I'm 1
Hi, I'm 3
Hi, I'm 4
Hi, I'm 2
Hi, I'm 2
Hi, I'm 6
Hi, I'm 5
Hi, I'm 4


By default, `@threads` creates `nthreads()` many tasks each processing a contiguous region of the iteration space. Each task is then essentially spawned with `@spawn`.
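
To make this description concrete, here is a rough hand-written counterpart. It is a sketch of the idea only, not the actual implementation of `@threads`; the function name `square_spawn!` and the way the blocks are formed are assumptions made for illustration.

```julia
using Base.Threads: @spawn, nthreads

# Rough hand-written counterpart of `@threads for i in eachindex(x) ... end`
function square_spawn!(x)
    # split the index range into (at most) nthreads() contiguous blocks
    blocksize = max(1, cld(length(x), nthreads()))
    @sync for block in Iterators.partition(eachindex(x), blocksize)
        # one task per block; `@sync` waits for all of them
        @spawn for i in block
            x[i] = x[i]^2
        end
    end
    return x
end

x = collect(1.0:10.0)
square_spawn!(x)
```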

In [48]:
using BenchmarkTools

function square!(x)
    for i in eachindex(x)
        x[i] = x[i]^2
    end
end

function square_threads!(x)
    @threads for i in eachindex(x)
        x[i] = x[i]^2
    end
end

x = rand(1_000_000)
@btime square!($x);
@btime square_threads!($x);

  302.639 μs (0 allocations: 0 bytes)
  76.121 μs (32 allocations: 2.81 KiB)


### Task-based vs thread-based multithreading

If one is coming from an OpenMP background (or similar), it is very easy to overlook the task-based nature of Julia's multithreading. This might even be reinforced by names like `@threads` and the existence of functions like `threadid()`. Unfortunately, this can readily lead to incorrect code.

#### Task migration and `threadid()`

Since the user should conceptually only care about tasks, Julia's scheduler isn't only dynamically assigning tasks to any of the Julia threads, but it is also free to **migrate tasks between threads**. For example, a task might start running on Julia thread 1, then be paused and moved to Julia thread 3, where it then finishes execution. Hence, by default, there is **no fixed task-thread mapping**.

→ **`threadid()` should be used with extreme care** as its output isn't guaranteed to be constant across the execution of a task!

##### Unsafe (!) example: partial sums

In [75]:
function partial_sums_unsafe(data)
    psums = zeros(nthreads())
    @threads for x in data
        tid = threadid()
        old_sum = psums[tid]
        new_sum = old_sum + x
        psums[tid] = new_sum
    end
    return psums
end

partial_sums_unsafe (generic function with 1 method)

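Why is this conceptually unsafe? Because `tid` is only a snapshot: if the task yields and is migrated to another thread between reading `psums[tid]` and writing it back, a different task running on thread `tid` may update the same slot in the meantime, and that update is then overwritten.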

Note that while semantically unsafe, the function above might still work fine in practice. This is because task migration is (at least as of now) very rare. (The scheduler isn't using the freedom much.)

In [71]:
data = rand(1_000 * nthreads());

In [76]:
sum(partial_sums_unsafe(data)) ≈ sum(data) # very likely still gives true

true

##### How to fix the issue?

* **Option 1:** Iterate over (e.g. thread) indices instead of data. Then use the **loop variable** (which is constant across one iteration) to process chunks of the data in each iteration (task).

In [91]:
function partial_sums_safe1(data)
    psums = zeros(nthreads())
    
    # manual partitioning of data (assumes that length(data) is divisible by nthreads())
    data_chunks = collect(Iterators.partition(data, length(data)÷nthreads()))
    
    @threads for tid in 1:nthreads() # iterate over thread ids
        for x in data_chunks[tid] # iterate over data chunk
            old_sum = psums[tid]
            new_sum = old_sum + x
            psums[tid] = new_sum
        end
    end
    return psums
end

partial_sums_safe1 (generic function with 1 method)

In [92]:
sum(partial_sums_safe1(data)) ≈ sum(data)

true

The package [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl) simplifies this pattern of manual partitioning.

In [114]:
using ChunkSplitters

In [115]:
collect(chunks(data, nthreads()))

6-element Vector{Tuple{UnitRange{Int64}, Int64}}:
 (1:1000, 1)
 (1001:2000, 2)
 (2001:3000, 3)
 (3001:4000, 4)
 (4001:5000, 5)
 (5001:6000, 6)

In [116]:
function partial_sums_safe_chunks(data; nchunks=nthreads())
    psums = zeros(nchunks)
    @threads for (data_range, ichunk) in chunks(data, nchunks)
        for idata in data_range
            old_sum = psums[ichunk]
            new_sum = old_sum + data[idata]
            psums[ichunk] = new_sum
        end
    end
    return psums
end

partial_sums_safe_chunks (generic function with 1 method)

In [117]:
sum(partial_sums_safe_chunks(data)) ≈ sum(data)

true

Note that this chunking scheme also isn't "thread-biased" anymore in the sense that we can choose `nchunks != nthreads()`.
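
A related pattern avoids the shared `psums` array altogether: each task accumulates into a task-local value and returns it. The following is a minimal sketch using only Base; the name `sum_chunked` and the partitioning via `Iterators.partition` are choices made here for illustration and are not part of ChunkSplitters.jl.

```julia
using Base.Threads: @spawn, nthreads

# Each task sums its own chunk and returns the result;
# no shared `psums` array and no `threadid()` is needed.
function sum_chunked(data; nchunks = nthreads())
    chunksize = max(1, cld(length(data), nchunks))
    tasks = map(Iterators.partition(data, chunksize)) do chunk
        @spawn sum(chunk)
    end
    # combine the task-local results
    return sum(fetch, tasks; init = zero(eltype(data)))
end

data = rand(10_000)
sum_chunked(data; nchunks = 7) ≈ sum(data)
```

Since nothing is indexed by a thread or chunk id, the number of chunks can be chosen freely.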

#### Scheduling options

Syntax: `@threads [schedule] for ...`

  * `:dynamic` (**default**)
    * creates O(`nthreads()`) many tasks each processing a contiguous region of the iteration space
    * each task essentially spawned with `@spawn`
      * -> task migration
      * -> composability / nestability
    
  * `:static`
    * evenly splits up the iteration space and creates one task per block
    * **statically** maps tasks to threads, specifically: task 1 -> thread 1, task 2 -> thread 2, etc.
      * -> no task migration, i.e. **fixed task-thread mapping**
      * -> not composable / nestable
      * -> only little overhead

* **Task migration (!)**: Since tasks can migrate between threads, the output of `threadid()` isn't guaranteed to stay constant within a task. (Technically, printing `threadid()` in our `tmap` example above is therefore ill-defined.)
* **Spawning tasks on specific threads**: Julia doesn't have a built-in tool for this (as of now). However, some packages like [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl) export `@tspawnat <threadid> ...`, which allows spawning *sticky* tasks.

In [46]:
using ThreadPinning

@tspawnat 2 println("running on thread ", threadid())

Task (runnable) @0x00000001127753c0

running on thread 2


In [None]:
@threads :dynamic for i in 1:2*nthreads()
    println(i, " -> thread ", threadid())
end

In [None]:
@threads :static for i in 1:2*nthreads()
    println(i, " -> thread ", threadid())
end

For `@threads :static`, every thread handles precisely two iterations!
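
This claim can also be checked programmatically. The sketch below records the thread id of every iteration and counts the iterations per thread; the bookkeeping via a `Dict` is just one possible way to do this.

```julia
using Base.Threads: @threads, nthreads, threadid

# Record which thread executed each iteration
ids = zeros(Int, 2 * nthreads())
@threads :static for i in eachindex(ids)
    ids[i] = threadid()
end

# Count the number of iterations per thread id
counts = Dict{Int,Int}()
for id in ids
    counts[id] = get(counts, id, 0) + 1
end
println(counts)
```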

In [None]:
@threads :dynamic for i in 1:3
    @threads :dynamic for j in 1:3
        println("$i, $j")
    end
end

In [None]:
@threads :static for i in 1:3
    @threads :static for j in 1:3
        println("$i, $j")
    end
end

With `:dynamic` scheduling, the loops nest without problems. The nested `:static` version, in contrast, throws an error because `@threads :static` cannot be used from inside another `@threads` loop.

### Load-balancing

In [None]:
function compute_nonuniform_spawn!(a, niter = zeros(Int, nthreads()), load = zeros(Int, nthreads()))
    @sync for i in 1:length(a)
        Threads.@spawn begin
            a[i] = sum(abs2, rand() for j in 1:i)
            
            # only for bookkeeping
            niter[threadid()] += 1
            load[threadid()] += i
        end
    end
    return niter, load
end

In [None]:
a = zeros(nthreads()*20)
niter, load = compute_nonuniform_spawn!(a)

In [None]:
using Plots

b1 = bar(niter, xlab="threadid", ylab="# iterations", title="Number of iterations", legend=false)
b2 = bar(load, xlab="threadid", ylab="workload", title="Workload", legend=false)

display(b1)
display(b2)

In [None]:
function compute_nonuniform_threads!(a, niter = zeros(Int, nthreads()), load = zeros(Int, nthreads()))
    @threads for i in 1:length(a)
        a[i] = sum(abs2, rand() for j in 1:i)

        # only for bookkeeping
        niter[threadid()] += 1
        load[threadid()] += i
    end
    return niter, load
end

In [None]:
a = zeros(nthreads()*20)
niter, load = compute_nonuniform_threads!(a)

In [None]:
b1 = bar(niter, xlab="threadid", ylab="# iterations", title="Number of iterations", legend=false)
b2 = bar(load, xlab="threadid", ylab="workload", title="Workload", legend=false)

display(b1)
display(b2)

(There might be a scheduling option for `@threads` that implements load-balancing in the future.)

## Multithreading: Things to be aware of

### Race conditions and thread safety

In [None]:
function sum_serial(x)
    s = zero(eltype(x))
    for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
function sum_threads_naive(x)
    s = zero(eltype(x))
    @threads for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
numbers = rand(nthreads()*10_000);

In [None]:
@show sum(numbers);
@show sum_serial(numbers);
@show sum_threads_naive(numbers);

**Wrong** result! Even worse, it's **non-deterministic** and different every time! It's also slow...

In [None]:
@btime sum_serial($numbers);
@btime sum_threads_naive($numbers);

Reason: There is a [race condition](https://en.wikipedia.org/wiki/Race_condition).

Note that race conditions aren't specific to reductions. More generally, they can appear when multiple threads are modifying a shared "global" state simultaneously.

Not all of Julia and its packages in the ecosystem are thread-safe! In general, it is safer to assume that they're not unless proven otherwise.

#### Fix 1: Divide the work

In [None]:
function sum_threads_subsums(x)
    # the partitioning below assumes that length(x) is divisible by nthreads()
    @assert length(x) % nthreads() == 0
    blocksize = length(x) ÷ nthreads()
    idcs = collect(Iterators.partition(1:length(x), blocksize))
    
    subsums = zeros(eltype(x), nthreads())
    @threads for tid in 1:nthreads()
        for i in idcs[tid]
            @inbounds subsums[tid] += x[i]
        end
    end
    return sum(subsums)
end

In [None]:
@show sum(numbers);
@show sum_serial(numbers);
@show sum_threads_subsums(numbers);

In [None]:
@btime sum_threads_subsums($numbers);

Speedup and correct result. But not ideal:

* cumbersome to do this manually
* can have more subtle performance issues like [false sharing](https://en.wikipedia.org/wiki/False_sharing)

#### Fix 2: Atomics

See [Atomic Operations](https://docs.julialang.org/en/v1/manual/multi-threading/#Atomic-Operations) in the Julia documentation for more information. In general, however, one should avoid atomics as much as possible since they limit the parallelism.
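
For completeness, here is a minimal sketch of the threaded sum with an atomic accumulator, using `Threads.Atomic` and `Threads.atomic_add!` from Base. The function name `sum_threads_atomic` is made up for this illustration, and the variant is expected to be slow because every update has to be serialized.

```julia
using Base.Threads: @threads, Atomic, atomic_add!

function sum_threads_atomic(x::AbstractVector{Float64})
    s = Atomic{Float64}(0.0)
    @threads for i in eachindex(x)
        atomic_add!(s, x[i])  # atomic read-modify-write, so no update is lost
    end
    return s[]                # read the current value of the atomic
end

numbers = rand(10_000)
sum_threads_atomic(numbers) ≈ sum(numbers)
```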

### Garbage collection

[As of now](https://www.youtube.com/watch?v=Ks0p6PQyIPs), **Julia's GC is not parallel** and doesn't work nicely with multithreading.

If it gets triggered, it essentially "stops the world" (all threads) for clearing up memory.

Hence, when using multithreading, it is even more important to **avoid heap allocations!**

(If you can't avoid allocations, consider using multiprocessing instead.)
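
As a small illustration of what avoiding allocations can look like in a threaded loop, the sketch below computes column norms once with array slices (which copy) and once with views into a preallocated output. The function names are hypothetical and the example is only meant to show the pattern, not to quantify the GC impact.

```julia
using Base.Threads: @threads
using LinearAlgebra: norm

# Allocating variant: `A[:, j]` copies the column in every iteration
function colnorms_alloc!(out, A)
    @threads for j in axes(A, 2)
        out[j] = norm(A[:, j])
    end
    return out
end

# Variant that avoids the copy: `@view` references the column in place
function colnorms_view!(out, A)
    @threads for j in axes(A, 2)
        out[j] = norm(@view A[:, j])
    end
    return out
end

A = rand(100, 50)
out1 = colnorms_alloc!(zeros(50), A)
out2 = colnorms_view!(zeros(50), A)
out1 ≈ out2
```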

## High-level tools for parallel computing

### [ThreadsX.jl](https://github.com/tkf/ThreadsX.jl)

*Parallelized Base functions*

In [None]:
using ThreadsX

In [None]:
sum(numbers)

In [None]:
ThreadsX.sum(numbers)

In [None]:
@btime ThreadsX.sum($numbers);

### [FLoops.jl](https://github.com/JuliaFolds/FLoops.jl)

*Fast sequential, threaded, and distributed for-loops for Julia*

In [None]:
using FLoops

In [None]:
function sum_floops(x)
    @floop for xi in x
        @reduce(s = zero(eltype(x)) + xi)
    end
    return s
end

In [None]:
@btime sum_floops($numbers);

In [None]:
numbers = rand(nthreads()*10_000);

sum_floops(numbers) ≈ sum(numbers)

In [None]:
@btime sum_serial($numbers);
@btime sum_floops($numbers);

`@floop` supports different *executors* that allow for easy switching between serial and threaded execution.

In [None]:
function sum_floops(x, executor)
    @floop executor for xi in x
        @reduce(s += xi)
    end
    return s
end

In [None]:
@btime sum_floops($numbers, $(SequentialEx()));
@btime sum_floops($numbers, $(ThreadedEx()));

There are many more [executors](https://juliafolds.github.io/FLoops.jl/stable/tutorials/parallel/#tutorials-executor), like `DistributedEx` or `CUDAEx`. See, e.g., [FoldsThreads.jl](https://github.com/JuliaFolds/FoldsThreads.jl) and [FoldsCUDA.jl](https://github.com/JuliaFolds/FoldsCUDA.jl).

Under the hood, FLoops is built on top of [Transducers.jl](https://juliafolds.github.io/Transducers.jl/stable/tutorials/tutorial_parallel/) (i.e. it translates for-loop semantics into folds).

### [Tullio.jl](https://github.com/mcabbott/Tullio.jl)

*Tullio is a very flexible einsum macro* ([Einstein notation](https://en.wikipedia.org/wiki/Einstein_notation))

In [None]:
using Tullio

In [None]:
A = rand(10,10)
B = rand(10,10)

C = @tullio C[i,j] := A[i,k] * B[k,j] # matrix multiplication

C ≈ A * B

In [None]:
sum_tullio(xs) = @tullio S := xs[i]

In [None]:
@btime sum_tullio($numbers);

(Uses `fastmath` and other tricks to be faster here.)

### [LoopVectorization.jl](https://github.com/JuliaSIMD/LoopVectorization.jl)

*Macro(s) for vectorizing loops.*

In [None]:
using LoopVectorization

In [None]:
function sum_turbo(x)
    s = zero(eltype(x))
    @tturbo for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
@btime sum_turbo($numbers);

(Uses all kinds of SIMD tricks to be faster than the others.)

## System topology and thread affinity

### Hawk compute node

<img src="../imgs/lstopo_hawk.svg" width=100%>

**Not pinning threads (or pinning them badly) can degrade performance massively!**

### Pinning Julia threads to CPU threads

What about external tools like `numactl`, `taskset`, etc.? They don't work reliably because they [can't distinguish](https://discourse.julialang.org/t/thread-affinitization-pinning-julia-threads-to-cores/58069/5) between Julia threads and other internal threads.

**Options:**

* Environment variable: `JULIA_EXCLUSIVE=1` (compact pinning)
* More control and convenient visualization: [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl)
  * `compact`: pin to cpu thread 0, 1, 2, 3, ... one after another
  * `spread`: alternate between sockets so, e.g., 0, 64, 1, 65, 2, 66, .... (if a socket has 64 cores)
  * `numa`: same as `spread` but alternate between NUMA domains so, e.g., 0, 16, 32, 48, 64, .... (if a NUMA domain has 16 cores)
  * **Caveat:** currently only works on Linux.

<img src="../imgs/threadinfo.png" width=1000px>