# Multithreading in Julia

_Part of this notebook is inspired by the material of the [Julia for HPC Course @ UCL ARC](https://github.com/carstenbauer/JuliaUCL24) by Carsten Bauer._

## Setup

In [1]:
# Running this cell is important to make sure we install all the necessary packages.
using Pkg
Pkg.instantiate()

## Thread pinning

In [2]:
using ThreadPinning
pinthreads(:cores)
threadinfo(; slurm=ThreadPinning.SLURM.isslurmjob())

Hostname: 	nid001041
CPU(s): 	1 x AMD EPYC 7763 64-Core Processor
CPU target: 	znver3
Cores: 		64 (128 CPU-threads due to 2-way SMT)
NUMA domains: 	4 (16 cores each)

SLURM: 128 assigned CPU-threads (entire node).

Julia threads: 	16

CPU socket 1
  0,64, 1,65, 2,66, 3,67, 4,68, 5,69, 6,70, 7,71, 
  8,72, 9,73, 10,74, 11,75, 12,76, 13,77, 14,78, 15,79, 
  16,80, 17,81, 18,82, 19,83, 20,84, 21,

## Spawning parallel tasks

In [3]:
using Base.Threads

@show nthreads();

nthreads() = 16


In [4]:
@time t = @spawn begin # `@spawn` returns right away
    sleep(2)
    3+3
end

@time fetch(t) # `fetch` waits for the task to finish

  0.006382 seconds (2.81 k allocations: 202.148 KiB, 98.85% compilation time)
  1.974770 seconds (273 allocations: 17.328 KiB, 0.18% compilation time)


6

## Example: multi-threaded `map`

In [5]:
using LinearAlgebra, BenchmarkTools

BLAS.set_num_threads(1) # Fix number of BLAS threads

function tmap(fn, itr)
    # for each i ∈ itr, spawn a task to compute fn(i)
    tasks = map(i -> @spawn(fn(i)), itr)
    # fetch and return all the results
    return fetch.(tasks)
end

M = [rand(100,100) for i in 1:(8 * nthreads())];

tmap(svdvals, M);

@btime  map(svdvals, $M) samples=10 evals=3;
@btime tmap(svdvals, $M) samples=10 evals=3;

  73.021 ms (1025 allocations: 17.26 MiB)
  6.353 ms (1673 allocations: 17.32 MiB)


***Exercise***: do you see any difference if you increase the number of BLAS threads?

## Example: multi-threaded `for` loop (reduction)

In [6]:
using ChunkSplitters, Base.Threads, BenchmarkTools

function sum_threads(fn, data; nchunks=nthreads())
    psums = zeros(eltype(data), nchunks)
    @threads :static for (c, elements) in enumerate(chunks(data; n=nchunks))
        psums[c] = sum(fn, elements)
    end
    return sum(psums)
end

v = randn(20_000_000);

@btime sum(sin, $v);

@btime sum_threads(sin, $v);

  236.035 ms (0 allocations: 0 bytes)
  16.543 ms (85 allocations: 8.73 KiB)


***Exercise***: do you see differences if you change the scheduler type? Remember you can choose between `:dynamic` (currently the default if omitted), `:greedy` (only if using Julia v1.11+), and `:static`.

In [7]:
function sum_map_spawn(fn, data; nchunks=nthreads())
    ts = map(chunks(data, n=nchunks)) do elements
        @spawn sum(fn, elements)
    end
    return sum(fetch.(ts))
end

@btime sum_map_spawn(sin, $v);

  16.517 ms (112 allocations: 9.34 KiB)


In [8]:
using OhMyThreads: @tasks

function sum_tasks(fn, data; nchunks=nthreads())
    psums = zeros(eltype(data), nchunks)
    @tasks for (c, elements) in enumerate(chunks(data; n=nchunks))
        psums[c] = sum(fn, elements)
    end
    return sum(psums)
end

@btime sum_tasks(sin, $v);

  16.492 ms (104 allocations: 9.70 KiB)


In [9]:
using OhMyThreads: tmapreduce

@btime tmapreduce(sin, +, $v);

  16.517 ms (345 allocations: 28.81 KiB)


## Multi-threading: is it always worth it?

In [10]:
using BenchmarkTools

function overhead!(v)
    for idx in eachindex(v)
        v[idx] = idx
    end
end
    
function overhead_threads!(v)
    @threads for idx in eachindex(v)
        v[idx] = idx
    end
end

N = 10

@btime overhead!(v) setup=(v = Vector{Int}(undef, N))
@btime overhead_threads!(v) setup=(v = Vector{Int}(undef, N))

  15.922 ns (0 allocations: 0 bytes)
  22.153 μs (81 allocations: 8.17 KiB)


***Exercise***: do you see any improvement in the parallel efficiency if you change the size of the problem (here: `N`)?
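One way to act on the measurement above is to fall back to a serial loop when the input is small. The following is a minimal sketch, not a recommendation: `fill_indices!` and the default `threshold` are hypothetical, and a sensible cutoff would have to be found by benchmarking on the target machine.

```julia
using Base.Threads: @threads

# Hypothetical helper: fill `v` with its own indices, using threads only
# when the vector is long enough to amortise the task-spawning overhead.
# The default `threshold` is an arbitrary placeholder, not a measured value.
function fill_indices!(v; threshold = 10_000)
    if length(v) < threshold
        # Small input: a plain loop avoids the threading overhead entirely.
        for idx in eachindex(v)
            v[idx] = idx
        end
    else
        # Large input: split the iterations across the available threads.
        @threads for idx in eachindex(v)
            v[idx] = idx
        end
    end
    return v
end

fill_indices!(Vector{Int}(undef, 10))       # takes the serial branch
fill_indices!(Vector{Int}(undef, 100_000))  # takes the threaded branch
```

Both branches write the same values, so the choice of `threshold` only affects performance, never the result.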

## Unbalanced workload: computing hexadecimal $\pi$

_This section is inspired by the blogpost [Computing the hexadecimal value of pi](https://giordano.github.io/blog/2017-11-21-hexadecimal-pi/) by Mosè Giordano._

The [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) is one of the [several algorithms to compute $\pi$](https://en.wikipedia.org/wiki/Approximations_of_%CF%80):

$$
\pi = \sum_{k = 0}^{\infty}\left[ \frac{1}{16^k} \left( \frac{4}{8k + 1} -
\frac{2}{8k + 4} - \frac{1}{8k + 5} - \frac{1}{8k + 6} \right) \right]
$$

What makes this formula stand out among other approximations of $\pi$ is that it allows one to directly extract the $n$-th fractional digit of the hexadecimal value of $\pi$ without computing the preceding ones.
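Before extracting individual digits, it can help to check the series itself numerically. The sketch below sums the first few terms of the formula in double precision; `bbp_term` and `bbp_partial_sum` are hypothetical helper names that are not used elsewhere in this notebook.

```julia
# k-th term of the Bailey–Borwein–Plouffe series, evaluated in Float64.
bbp_term(k) = (4 / (8k + 1) - 2 / (8k + 4) - 1 / (8k + 5) - 1 / (8k + 6)) / 16.0^k

# Partial sum of the series, truncated after the term with index K.
bbp_partial_sum(K) = sum(bbp_term, 0:K)

# Print the absolute error with respect to π for a few truncation points.
for K in (0, 2, 5, 10)
    println("K = ", K, ": error = ", abs(bbp_partial_sum(K) - π))
end
```

Each extra term divides the remaining error by roughly 16, that is, it contributes about one more hexadecimal digit.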

The Wikipedia article about the Bailey–Borwein–Plouffe formula explains that the $(n + 1)$-th fractional digit $d_n$ is given by

$$
d_{n} = 16 \left[ 4 \Sigma(n, 1) - 2 \Sigma(n, 4) - \Sigma(n, 5) - \Sigma(n,
6) \right]
$$

where

$$
\Sigma(n, j) = \sum_{k = 0}^{n} \frac{16^{n-k} \bmod (8k+j)}{8k+j} + \sum_{k
= n+1}^{\infty} \frac{16^{n-k}}{8k+j}
$$

Only the fractional part of the expression in square brackets on the right-hand side of $d_n$ is relevant. Thus, to avoid rounding errors, we can keep only the fractional part of each term of the finite sum as we compute it. This lets us use ordinary double-precision floating-point arithmetic throughout, without resorting to arbitrary-precision numbers. Note also that the terms of the infinite sum quickly become very small, so we can stop the summation once they are negligible.
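The reason the modulo in the finite sum is harmless is that $16^{n-k} = q\,(8k+j) + r$ with integer $q$, so $16^{n-k}/(8k+j) = q + r/(8k+j)$ and the integer $q$ does not affect the fractional part. The sketch below illustrates why computing $r$ with `powermod` matters in practice; the values of `n`, `k` and `j` are arbitrary illustrative choices, not taken from the notebook.

```julia
n, k, j = 100, 3, 1          # illustrative values only
denom = 8k + j               # 25

# Reference value computed with arbitrary-precision integers.
exact = big(16)^(n - k) % denom

# `powermod` reduces modulo `denom` at every step, so it never overflows.
r = powermod(16, n - k, denom)
println("powermod result: ", r, ", BigInt reference: ", exact)

# The naive Int64 power silently wraps around, so its remainder is wrong.
naive = 16^(n - k) % denom
println("naive Int64 result: ", naive)
```

With `powermod` every intermediate value stays below `denom^2`, which is why plain `Int` and `Float64` arithmetic is enough for the finite sum.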

### Serial implementation

In [11]:
# Return the fractional part of x, modulo 1, always positive
fpart(x) = mod(x, one(x))

function Σ(n, j)
    # Compute the finite sum
    s = 0.0
    denom = j
    for k in 0:n
        s = fpart(s + powermod(16, n - k, denom) / denom)
        denom += 8
    end
    # Compute the infinite sum
    num = 1 / 16
    while (frac = num / denom) > eps(s)
        s     += frac
        num   /= 16
        denom += 8
    end
    return fpart(s)
end

pi_digit(n) =
    floor(Int, 16 * fpart(4Σ(n-1, 1) - 2Σ(n-1, 4) - Σ(n-1, 5) - Σ(n-1, 6)))

pi_string(n) = "0x3." * join(string.(pi_digit.(1:n), base = 16)) * "p0"

pi_string (generic function with 1 method)

Let's make sure this works:

In [12]:
pi_string(13)

"0x3.243f6a8885a30p0"

In [13]:
# Parse the string as a double-precision floating point number
parse(Float64, pi_string(13))

3.141592653589793

In [14]:
Float64(π) == parse(Float64, pi_string(13))

true
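To make the hexadecimal floating-point literal less opaque: the digit in position $i$ after the point contributes $d_i / 16^i$, and the trailing `p0` is a binary exponent of zero. The sketch below rebuilds the value by hand from the 13 digits printed above; the variable names are hypothetical.

```julia
# Fractional hexadecimal digits copied from the output of `pi_string(13)`.
hexdigits = "243f6a8885a30"

# Integer part 3, plus each hexadecimal digit weighted by 16^(-position).
approx = 3 + sum(parse(Int, c; base = 16) / 16.0^i for (i, c) in enumerate(hexdigits))

println(approx)
```

Thirteen hexadecimal digits carry 52 bits, which matches the width of the `Float64` fraction field.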

In [15]:
N_pi = 1_000

setprecision(BigFloat, 4 * N_pi) do
    BigFloat(π) == parse(BigFloat, pi_string(N_pi))
end

true

In [16]:
using BenchmarkTools

b = @benchmark pi_string(N_pi)

pi_serial_t = minimum(b.times)

b

BenchmarkTools.Trial: 20 samples with 1 evaluation.
 Range (min … max):  247.706 ms … 264.021 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     248.879 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   251.850 ms ±   5.038 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%

### Multi-threaded implementation

Since the Bailey–Borwein–Plouffe formula extracts the $n$-th digit of $\pi$ without computing the other ones, we can write a multi-threaded version of `pi_string`, taking advantage of the native support for [multi-threading](https://docs.julialang.org/en/v1/manual/multi-threading/) in Julia. Note, however, that the computational cost of `pi_digit` is $O(n\log(n))$: the larger the value of $n$, the longer the function takes, which makes this workload very unbalanced. ***Question***: what do you expect to be the worst performing scheduler?

#### For-loop: static scheduler

In [17]:
function pi_string_threads_static(N)
    digits = Vector{Int}(undef, N)
    @threads :static for n in eachindex(digits)
        digits[n] = pi_digit(n)
    end
    return "0x3." * join(string.(digits, base = 16)) * "p0"
end

@assert pi_string_threads_static(N_pi) == pi_string(N_pi)

b = @benchmark pi_string_threads_static(N_pi)

pi_threads_static_t = minimum(b.times)

display(b)

pi_serial_t / pi_threads_static_t / nthreads() * 100

BenchmarkTools.Trial: 139 samples with 1 evaluation.
 Range (min … max):  36.141 ms … 36.480 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     36.180 ms              ┊ GC (median):    0.00%
 Time  (mean ± σ):   36.187 ms ± 38.037 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

42.836672630255215
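The number printed last is the parallel efficiency in percent: the speed-up over the serial run divided by the number of threads $p$. Written out with the minimum timings reported above (serial 247.706 ms, static 36.141 ms, $p = 16$):

$$
E = \frac{T_\text{serial}}{p \, T_\text{parallel}} \times 100\%
\approx \frac{247.706}{16 \times 36.141} \times 100\% \approx 42.8\%
$$

An efficiency of 100% would mean that the run time shrinks exactly by a factor of $p$.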

#### For-loop: dynamic scheduler

In [18]:
function pi_string_threads_dynamic(N)
    digits = Vector{Int}(undef, N)
    @threads :dynamic for n in eachindex(digits)
        digits[n] = pi_digit(n)
    end
    return "0x3." * join(string.(digits, base = 16)) * "p0"
end

@assert pi_string_threads_dynamic(N_pi) == pi_string(N_pi)

b = @benchmark pi_string_threads_dynamic(N_pi)

pi_threads_dynamic_t = minimum(b.times)

display(b)

pi_serial_t / pi_threads_dynamic_t / nthreads() * 100

BenchmarkTools.Trial: 138 samples with 1 evaluation.
 Range (min … max):  36.138 ms …  37.668 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     36.194 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   36.265 ms ± 186.407 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

42.83972490659247
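The static and dynamic results are nearly identical, which would be consistent with both schedulers handing each thread one contiguous, near-equal slice of the range, so that the thread holding the largest $n$ finishes last. The sketch below is a rough cost model under that assumption, not a description of how `@threads` is implemented: `contiguous_chunks`, `cost` and `predicted_efficiency` are hypothetical helpers, the cost of digit $n$ is assumed to be proportional to $n \log(n)$, and `N` is assumed to be at least `nchunks`.

```julia
# Hypothetical helper: split 1:N into `nchunks` contiguous, near-equal ranges.
function contiguous_chunks(N, nchunks)
    q, r = divrem(N, nchunks)
    ranges = UnitRange{Int}[]
    start = 1
    for c in 1:nchunks
        len = q + (c <= r ? 1 : 0)   # the first `r` chunks get one extra element
        push!(ranges, start:(start + len - 1))
        start += len
    end
    return ranges
end

# Assumed cost model: digit n costs about n * log(n), in arbitrary units.
cost(n) = n * log(n)

function predicted_efficiency(N, nchunks)
    chunk_costs = [sum(cost, r) for r in contiguous_chunks(N, nchunks)]
    # The run lasts as long as the most expensive chunk; with perfect
    # balance every chunk would cost the mean instead.
    return sum(chunk_costs) / (nchunks * maximum(chunk_costs))
end

println(predicted_efficiency(1_000, 16))
```

In this model the efficiency is the ratio between the mean and the maximum chunk cost, so it can only approach 100% when the chunks cost about the same.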

#### For-loop: greedy scheduler (only Julia v1.11+)

In [19]:
@static if VERSION >= v"1.11"

function pi_string_threads_greedy(N)
    digits = Vector{Int}(undef, N)
    @threads :greedy for n in eachindex(digits)
        digits[n] = pi_digit(n)
    end
    return "0x3." * join(string.(digits, base = 16)) * "p0"
end

@assert pi_string_threads_greedy(N_pi) == pi_string(N_pi)
    
b = @benchmark pi_string_threads_greedy(N_pi)

pi_threads_greedy_t = minimum(b.times)

display(b)

pi_serial_t / pi_threads_greedy_t / nthreads() * 100

end

#### Tasks

In [20]:
function pi_string_tasks(N)
    tasks = [Threads.@spawn pi_digit(n) for n in 1:N]
    digits = [fetch(t) for t in tasks]
    return "0x3." * join(string.(digits, base = 16)) * "p0"
end

@assert pi_string_tasks(N_pi) == pi_string(N_pi)

b = @benchmark pi_string_tasks(N_pi)

pi_tasks_t = minimum(b.times)

display(b)

pi_serial_t / pi_tasks_t / nthreads() * 100

BenchmarkTools.Trial: 282 samples with 1 evaluation.
 Range (min … max):  17.612 ms …  27.506 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     17.743 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   17.781 ms ± 581.837 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

87.90555239444942

#### OhMyThreads.jl

In [21]:
using OhMyThreads: @tasks

function pi_string_omt(N; ntasks::Int=8 * nthreads(), scheduler::Symbol=:dynamic)
    digits = Vector{Int}(undef, N)
    @tasks for n in eachindex(digits)
        @set ntasks=ntasks
        @set scheduler=scheduler
        digits[n] = pi_digit(n)
    end
    return "0x3." * join(string.(digits, base = 16)) * "p0"
end

@assert pi_string_omt(N_pi) == pi_string(N_pi)

b = @benchmark pi_string_omt(N_pi; ntasks=32 * nthreads())

pi_omt_t = minimum(b.times)

display(b)

pi_serial_t / pi_omt_t / nthreads() * 100

BenchmarkTools.Trial: 282 samples with 1 evaluation.
 Range (min … max):  17.629 ms … 18.014 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     17.772 ms              ┊ GC (median):    0.00%
 Time  (mean ± σ):   17.773 ms ± 54.271 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

87.81962078092401