# Multithreading

## What are threads?
Threads are execution units within a process that can run simultaneously.

<img src="./imgs/what-are-threads.png" width=500px>

While processes are entirely separate, threads run in a **shared memory** space (heap).

## Starting Julia with multiple threads

By default, Julia starts with a single *user thread*. We must tell it explicitly to start multiple user threads. There are two ways to do this:

* Environment variable: `JULIA_NUM_THREADS=4`
* Command line argument: `julia -t 4`

**Jupyter lab:**

The simplest way is to globally set the environment variable `JULIA_NUM_THREADS` (e.g. in the `.bashrc`). But one can also create a specific Jupyter kernel for multithreaded Julia:

```julia
using IJulia
installkernel("Julia (4 threads)", env=Dict("JULIA_NUM_THREADS"=>"4"))
```

We can readily check how many threads we are running:

In [None]:
Threads.nthreads()

**It is currently not (easily) possible to change the number of threads at runtime!** (Will likely change in the future.)

### User threads vs default threads

Technically, the Julia process is also spawning multiple threads already in "single-threaded" mode, like
* a thread for unix signal listening
* multiple OpenBLAS threads for BLAS/LAPACK operations

For this reason, we call the threads specified via `-t` or the environment variable *user threads* or simply *Julia threads*.

## Tasks

By default, Julia waits for commands to finish ("**blocking**") and runs everything sequentially.

**Tasks** are a feature that allows (parts of) computations to be scheduled (suspended and resumed) in a flexible manner to implement **concurrency** (multitasking) and **parallelism**.

* Concurrency is about dealing with lots of things at once.
* Parallelism is about doing lots of things at once .

Example (concurrency): **asynchronous I/O** like
 * **multiple user input** (Why not already process some of the input?)
 * **data dumping to disk** (Maybe it's possible to continue a calculation?)
 * **receiving calculations from worker processes**
 
Example (parallelism): **multithreading, distributed computing**

## `@async` and `@sync`

We can create a task for asynchronous execution with the [`@async` macro](https://docs.julialang.org/en/v1/base/parallel/#Base.@async). What this means is that for whatever falls into its scope, Julia will start a task to then proceed to whatever comes next in the script without waiting for the task to complete ("**non-blocking**").

(**Note:** `@async` is kind of deprecated in favor of `@spawn` below, but we quickly mention it here nonetheless for pedagogical reasons)

In [None]:
@time sleep(2);

In [None]:
@time @async sleep(2)

Julia allows the script to proceed (and the `@time` macro to fully execute) without waiting for the task (in this case, sleeping for two seconds) to complete.

We can use the partner macro `@sync` to synchronize, that is wait for all encapsulated tasks. (see `?@sync`). 

In [None]:
@time @sync @async sleep(2)

Of course, here it doesn't make much sense to write `@sync @async` - we could simply drop it altogether. A better example is the following.

In [None]:
@time @sync begin
    @async sleep(2.0)
    @async sleep(2.0)
end

In [None]:
A = rand(1000,1000)
B = rand(1000,1000)

t = @async A * B

In [None]:
wait(t)

In [None]:
fetch(t)

## Task-based multithreading

In traditional HPC, we typically care about threads directly. Using e.g. OpenMP, we essentially tell each thread what to do.

Conceptually, Julia takes a different approach and implements **task-based** multithreading. In this paradigm, a task - e.g. a computational piece of a code - is marked for **parallel** execution on **any** of the available Julia threads. Julias **dynamic scheduler** will automatically put the task on one of the threads and trigger the execution of the task on said thread.

**Users should think about tasks and not threads**.

<br>
<img src="./imgs/task-based-parallelism.png" width=250px>
<br>

**Advantages:**
* high-level abstraction
* **composability / nestability** (Multithreaded code can call multithreaded code can call multithreaded code ....)

**Disadvantages:**
* potential scheduling overhead
* task → thread assignment uncertain (can vary dynamically + task migration)
* **can get in the way when performance engineering**
  * scheduler has limited information (e.g. about the system topology)
  * low-level profiling (e.g. with LIKWID) requires fixed `task -> thread -> core` mapping.

### Spawning tasks on threads: `Threads.@spawn`
`Threads.@spawn` spawns a task on a Julia thread. Specifically, it creates (and immediately returns) a `Task` and schedules it for execution on an available Julia thread.

To avoid having to prefix `Threads.` to `@spawn` (and other threading-related functions) let's load everything from `Base.Threads` into global scope.

In [None]:
using Base.Threads

In [None]:
@spawn println("test")

While `Threads.@spawn` returns the task right away - it is **non-blocking** - the result might only be fetchable after some time.

In [None]:
t = @spawn begin
    sleep(3);
    "result"
end
@time fetch(t)

Note that we can use (some of) the control flow tools that we've already covered, like `@sync`.

In [None]:
@sync t = @spawn begin
    sleep(3);
    "result"
end
@time fetch(t)

In [None]:
for i in 1:2*nthreads()
    @spawn println("Hi, I'm ", threadid())
end

#### Example: Recursive Fibonacci series

$$ F(n) = F(n-1) + F(n-2), \qquad F(1) = F(2) = 1$$

We can nest `@spawn` calls freely!

In [None]:
function fib(n)
    n < 2 && return n
    t = @spawn fib(n-2)
    return fib(n-1) + fetch(t)
end

In [None]:
fib.(1:10)

(Note: Algorithmically, this is a highly inefficient implementation of the Fibonacci series, of course!)

#### Example: `tmap` (threaded `map`)

(again, not the most efficient implementation but fine for now)

In [None]:
tmap(fn, itr) = map(fetch, map(i -> Threads.@spawn(fn(i)), itr))

In [None]:
using LinearAlgebra

In [None]:
M = [rand(200,200) for i in 1:10];

In [None]:
tmap(svdvals, M)

In [None]:
tmap(i -> println(i, " ($(threadid()))"), 1:10);

Note, however, that this implementation creates temporary allocations and thus isn't particularly efficient.

In [None]:
using BenchmarkTools

@btime tmap($svdvals, $M);
@btime map($svdvals, $M);

#### Load-balancing

If there are many tasks (e.g. many more than there are threads), Julia's scheduler balances the load of these tasks among threads. (**non-uniform workloads**)

In [None]:
function compute_nonuniform_spawn!(a, niter = zeros(Int, nthreads()), load = zeros(Int, nthreads()))
    @sync for i in 1:length(a)
        Threads.@spawn begin
            a[i] = sum(abs2, rand() for j in 1:i)
            
            # poor-mans bookkeeping (unsafe!)
            niter[threadid()] += 1
            load[threadid()] += i
        end
    end
    return niter, load
end

In [None]:
using Plots

a = zeros(nthreads()*10_000)
niter, load = compute_nonuniform_spawn!(a)

b1 = bar(niter, xlab="threadid", ylab="# iterations", title="Number of iterations (@spawn)", legend=false)
b2 = bar(load, xlab="threadid", ylab="workload", title="Workload (@spawn)", legend=false, color=:green)

display(b1)
display(b2)

### Multithreading for-loops: `@threads`

In [None]:
@threads for i in 1:2*nthreads()
    println("Hi, I'm ", threadid())
end

By default, `@threads` creates `nthreads()` many tasks each processing a contigious region of the iteration space. Each task is then essentially spawned with `@spawn`.

In [None]:
using BenchmarkTools

function square!(x)
    for i in eachindex(x)
        x[i] = x[i]^2
    end
end

function square_threads!(x)
    @threads for i in eachindex(x)
        x[i] = x[i]^2
    end
end

In [None]:
x = rand(1_000_000)
@btime square!($x);
@btime square_threads!($x);

#### Nestability / Composability

Multithreaded loops can be nested! → composability

In [None]:
function square_threads_all!(xs)
    @threads for i in eachindex(xs)
        square_threads!(xs[i])
    end
end
function square_all!(xs)
    @threads for i in eachindex(xs)
        square!(xs[i])
    end
end

In [None]:
xs = [rand(n,n) for n in (100,1000,10_000)]

@btime square_threads_all!($xs) samples = 5 evals = 3
@btime square_all!($xs) samples = 5 evals = 3

### Task-based vs thread-based multithreading

If one is coming from an OpenMP background (or similar), it is very easy to not forget about the task-based nature of Julia's multithreading. This might even be reinforced by names like `@threads` and the existence of functions like `threadid()`. Unfortunately, this can readily lead to incorrect code.

#### Task migration and `threadid()`

Julia's scheduler isn't only dynamically assigning tasks to any of the Julia threads, but it is also free to **migrate tasks between threads**. For example, a task might start running on Julia thread 1, then be paused and moved to Julia thread 3, where it then finishes execution. Hence, by default, there is **no fixed task-thread mapping**.

→ **`threadid()` should be used with extreme care** as its output isn't guaranteed to be constant across the exectution of a task!

(Also, `@threads` by default doesn't make many (long-time) guarantees about how iterations get mapped to tasks.)

### Problematic example: parallel summation

In [None]:
function sum_threads_unsafe(data)
    psums = zeros(nthreads())
    @threads for x in data
        psums[threadid()] += x
    end
    return sum(psums)
end

Why is this conceptually unsafe?

Note that while semantically unsafe, the function above might still work fine in practice. This is because task migration is (at least as of now) very rare (and, on a technical note, the above loop iterations don't yield).

In [None]:
data = rand(1_000 * nthreads());

In [None]:
sum_threads_unsafe(data) ≈ sum(data) # almost certainly still gives true

##### Fix 1: Chunking

**Idea:** Partition the data into chunks and then iterate over chunk indices instead of the data itself. In each iteration (task) one chunk is processed. (Below the number of chunks is chosen as `nthreads()`.)

In [None]:
function sum_threads_chunks(data)
    psums = zeros(nthreads())
    
     # manual partitioning of data
    data_chunks = collect(Iterators.partition(data, length(data)÷nthreads()))
    
    @threads for tid in 1:nthreads() # iterate over chunk/thread ids
        for x in data_chunks[tid] # iterate over data chunk
            psums[tid] += x # tid is safe because it is constant across one iteration
        end
    end
    return sum(psums)
end

**Note:** The **iteration variable** is always constant across one iteration.

In [None]:
sum_threads_chunks(data) ≈ sum(data)

The package [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl) simplifies this pattern of manual chunking a little bit.

In [None]:
using ChunkSplitters

In [None]:
collect(chunks(data, nthreads()))

In [None]:
function sum_threads_chunksplitters(data; nchunks=nthreads())
    psums = zeros(nchunks)
    @threads for (data_range, ichunk) in chunks(data, nchunks)
        for idata in data_range
            psums[ichunk] += data[idata]
        end
    end
    return sum(psums)
end

In [None]:
sum_threads_chunksplitters(data) ≈ sum(data)

Note that this chunking scheme also isn't "thread-biased" anymore in the sense that we can choose `nchunks != nthreads()`.

##### Fix 2: Opt-out of task migration (*sticky tasks*)

We can choose the `:static` scheduling option for `@threads` to opt-out of Julia's dynamic scheduling and get **guarantees about the task-thread assignment** (and the iterations → task mapping). Clean solution for "traditional HPC", where one wants to think about threads at a low-level, but non-composable etc.


Syntax: `@threads :static for ...`

 * splits up the iteration space into `nthreads()` even, contiguous blocks (in-order) and creates precisely one task per block
 * **statically** maps tasks to threads, specifically: task 1 -> thread 1, task 2 -> thread 2, etc.
   * -> no task migration, i.e. **fixed task-thread mapping** 👍
   * -> only little overhead 👍
   * -> not composable / nestable 👎
     

In [None]:
@threads :dynamic for i in 1:2*nthreads()
    println(i, " -> thread ", threadid())
end

In [None]:
@threads :static for i in 1:2*nthreads()
    println(i, " -> thread ", threadid())
end

For `@threads :static`, every thread handles precisely two iterations!

In [None]:
@threads :dynamic for i in 1:3
    @threads :dynamic for j in 1:3
        println("$i, $j")
    end
end

In [None]:
@threads :static for i in 1:3
    @threads :static for j in 1:3
        println("$i, $j")
    end
end

**Note about `@spawn`**: we can also opt-out of task migration for `@spawn` and **spawn *sticky* tasks on specific threads**. However, Julia doesn't have a built-in tool for this (as of now) and one needs to use a package like, e.g., [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl). The latter exports a function `@tspawnat <threadid> ...`.

In [None]:
using ThreadPinning

@tspawnat 2 println("running on thread ", threadid())

#### No load-balancing

Currently, `@threads` doesn't give load-balancing (with any scheduler) as it only starts `nthreads()` tasks in total.

In [None]:
function compute_nonuniform_threads!(a, niter = zeros(Int, nthreads()), load = zeros(Int, nthreads()))
    @threads for i in 1:length(a)
        a[i] = sum(abs2, rand() for j in 1:i)

        # poor-mans bookkeeping (unsafe!)
        niter[threadid()] += 1
        load[threadid()] += i
    end
    return niter, load
end

In [None]:
using Plots

a = zeros(nthreads()*10_000)
niter, load = compute_nonuniform_threads!(a)

b1 = bar(niter, xlab="threadid", ylab="# iterations", title="Number of iterations (@threads)", legend=false)
b2 = bar(load, xlab="threadid", ylab="workload", title="Workload (@threads)", legend=false, color=:green)

display(b1)
display(b2)

(There will likely be a scheduling option for `@threads` that implements load-balancing in the future.)

## Multithreading: Things to be aware of

### Data races and thread safety

In [None]:
function sum_serial(x)
    s = zero(eltype(x))
    for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
function sum_threads_naive(x)
    s = zero(eltype(x))
    @threads for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
numbers = rand(nthreads()*10_000);

In [None]:
@show sum(numbers);
@show sum_serial(numbers);
@show sum_threads_naive(numbers);

**Wrong** result! Even worse, it's **non-deterministic** and different every time! It's also slow...

In [None]:
@btime sum_serial($numbers);
@btime sum_threads_naive($numbers);

Reason: There is a [race condition](https://en.wikipedia.org/wiki/Race_condition).

Note that race conditions aren't specific to reductions. More generally, they can appear when multiple tasks are modifying a shared "global" state simultaneously.

Not all of Julia and its packages in the ecosystem are thread-safe! In general, it is safer to assume that they're not unless documented/proven otherwise. (Example: `Dict` isn't thread-safe!)

#### Fix 1: Chunking (divide the work)

In [None]:
function sum_threads_chunksplitters(x; nchunks=nthreads())
    psums = zeros(eltype(x), nchunks)
    @threads for (data_range, ichunk) in chunks(x, nchunks)
        for idata in data_range
            @inbounds psums[ichunk] += x[idata]
        end
    end
    return sum(psums)
end

In [None]:
@show sum(numbers);
@show sum_serial(numbers);
@show sum_threads_chunksplitters(numbers);

In [None]:
@btime sum_threads_chunksplitters($numbers);

Speedup and correct result!


**Note: [False sharing](https://en.wikipedia.org/wiki/False_sharing#:~:text=In%20computer%20science%2C%20false%20sharing,managed%20by%20the%20caching%20mechanism.)**

`sum_threads_cunksplitters` above still has a more subtle performance issue because different tasks mutate non-local state (`psums`) in parallel. There is no (obvious) data race because they access different slots (`ichunk`). However, CPU cores work with cache **lines** instead of single elements.

Different tasks modify the same cache line → need for synchronization → performance decrease.

Possible solution: Entirely **task-local** computations

In [None]:
function sum_threads_chunks_tasklocal(x)
    chunks = Iterators.partition(x, length(x) ÷ Threads.nthreads())
    tasks = map(chunks) do chunk
        Threads.@spawn sum(chunk)
    end
    chunk_sums = fetch.(tasks)
    return sum(chunk_sums)
end

* no non-local mutation
* each task computes a partial sum independently

In [None]:
@btime sum_threads_chunks_tasklocal($numbers);

#### Fix 2 (not recommended): Low-level atomics and locks

See [Atomic Operations](https://docs.julialang.org/en/v1/manual/multi-threading/#Atomic-Operations) and/or [Data-race freedom](https://docs.julialang.org/en/v1/manual/multi-threading/#Data-race-freedom) in the Julia doc for more information. In general, one should avoid using them as much as possible since they actually limit the parallelism (especially if you don't know what you're doing).

### Garbage collection

[As of now](https://www.youtube.com/watch?v=Ks0p6PQyIPs), **Julia's GC is not parallel** and doesn't work nicely with multithreading. (Update: In Julia 1.10 the GC will be parallel!)

If it gets triggered, it stops the world (all threads) for clearing up memory.

Hence, when using multithreading, it is even more important to **avoid heap allocations!**

(If you can't avoid allocations, consider using multiprocessing instead.)

## High-level tools for parallel computing

### [ThreadsX.jl](https://github.com/tkf/ThreadsX.jl)

*Parallelized Base functions*

In [None]:
using ThreadsX

In [None]:
sum(numbers)

In [None]:
ThreadsX.sum(numbers)

In [None]:
@btime ThreadsX.sum($numbers);

### [FLoops.jl](https://github.com/JuliaFolds/FLoops.jl)

*Fast sequential, threaded, and distributed for-loops for Julia*

In [None]:
using FLoops

In [None]:
function sum_floops(x)
    @floop for xi in x
        @reduce(s = zero(eltype(x)) + xi)
    end
    return s
end

In [None]:
@btime sum_floops($numbers);

In [None]:
numbers = rand(nthreads()*10_000);

sum_floops(numbers) ≈ sum(numbers)

In [None]:
@btime sum_serial($numbers);
@btime sum_floops($numbers);

`@floop` supports different *executors* that allow for easy switching between serial and threaded execution

In [None]:
function sum_floops(x, executor)
    @floop executor for xi in x
        @reduce(s += xi)
    end
    return s
end

In [None]:
@btime sum_floops($numbers, $(SequentialEx()));
@btime sum_floops($numbers, $(ThreadedEx()));

There are many more [executors](https://juliafolds.github.io/FLoops.jl/stable/tutorials/parallel/#tutorials-executor), like `DistributedEx` or `CUDAEx`. See, e.g., [FoldsThreads.jl](https://github.com/JuliaFolds/FoldsThreads.jl) and [FoldsCUDA.jl](https://github.com/JuliaFolds/FoldsCUDA.jl).

Under the hood, FLoops is built on top of [Transducers.jl](https://juliafolds.github.io/Transducers.jl/stable/tutorials/tutorial_parallel/) (i.e. it translates for-loop semantics into folds).

### [Tullio.jl](https://github.com/mcabbott/Tullio.jl)

*Tullio is a very flexible einsum macro* ([Einstein notation](https://en.wikipedia.org/wiki/Einstein_notation))

In [None]:
using Tullio

In [None]:
A = rand(10,10)
B = rand(10,10)

C = @tullio C[i,j] := A[i,k] * B[k,j] # matrix multiplication

C ≈ A * B

In [None]:
sum_tullio(xs) = @tullio S := xs[i]

In [None]:
@btime sum_tullio($numbers);

(Uses `fastmath` and other tricks to be faster here.)

### [LoopVectorization.jl](https://github.com/JuliaSIMD/LoopVectorization.jl)

*Macro(s) for vectorizing loops.*

In [None]:
using LoopVectorization

In [None]:
function sum_turbo(x)
    s = zero(eltype(x))
    @tturbo for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
@btime sum_turbo($numbers);

(Uses all kinds of SIMD tricks to be faster than the others.)

## Thread affinity/pinning

A compute node has a complex topology (two sockets, multiple memory channels/domains). **It matters (dramatically) for performance where your Julia threads are running!** → Thread pinning

### Hawk compute node

<img src="./imgs/lstopo_hawk.svg" width=100%>

### Pinning Julia threads to CPU threads/cores

What about external tools like `numactl`, `taskset`, etc.? Doesn't work reliably because they often [can't distinguish](https://discourse.julialang.org/t/thread-affinitization-pinning-julia-threads-to-cores/58069/5) between Julia threads and other internal threads.

**Options:**

* Environment variable: `JULIA_EXCLUSIVE=1` (compact pinning)
* More control and convenient visualization: [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl) -> **Exercise saxpy_cpu**

#### [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl)

(See my short talk at JuliaCon2023 @ MIT: https://youtu.be/6Whc9XtlCC0)

**Pinning at three conceptual levels**

<br>
<img src="./imgs/threadpinning_pinthreads.svg" width=600px>
<br>

* `:cputhreads:` pin to CPU threads (incl. "hypterthreads") one after another
* `:cores:` pin to CPU cores one after another
* `:numa:` alternate between NUMA domains so, e.g., 0, 16, 32, 48, 64, .... (if a NUMA domain has 16 cores)
* `:sockets:` alternate between sockets so, e.g., 0, 64, 1, 65, 2, 66, .... (if a socket has 64 cores)


**Visualization of cluster topology and thread affinities**

<br>
<img src="./imgs/threadinfo.png" width=1000px>