# JuliaHEP 2023 Workshop -  HPC Tutorial

**When:** November 7, 2023

**Where:** Erlangen Centre for Astroparticle Physics (ECAP)

<div style="color: red"><b>TODO: What's the plan for the next hour</b></div>

## Julia on HPC Clusters (interactively)

* **JupyterHub** (if available): https://jh.pc2.uni-paderborn.de/
* **VS Code** → Remote SSH Extension
  * login node (easy)
  * compute node (tricky, sometimes impossible): [PC2 docs](https://upb-pc2.atlassian.net/wiki/spaces/PC2DOK/pages/1902225/Access+for+Applications+like+Visual+Studio+Code#Compute-nodes)
* Traditional terminal approach (SSH + e.g. vim + REPL)

## Computational Kernel: AXPY

"*A time X plus Y*"

$$ \vec{y} = a \cdot \vec{x} + \vec{y} $$

Depending on the data type / precision:

* **S**AXPY (S = single precision, i.e. `Float32`)
* **D**AXPY (D = double precision, i.e. `Float64`)

In [1]:
function axpy_serial!(y, a, x)
    #
    # TODO: Implement the (serial) AXPY kernel.
    #
    for i in eachindex(x,y)
        @inbounds y[i] = a * x[i] + y[i]
    end
    return nothing
end

axpy_serial! (generic function with 1 method)

Let's benchmark the performance.

In [2]:
using BenchmarkTools

const N = 2^30

a = 3.141
x = rand(N)
y = rand(N)

@btime axpy_serial!($y, $a, $x) samples=5 evals=3;

  678.868 ms (0 allocations: 0 bytes)


#### Questions
* How many **bytes** are transferred per iteration?
* How many **flops** (floating point operations) are performed per iteration?

**Bonus question:** How many **bytes** would be transferred in a non-inplace variant, i.e. `z[i] = a * x[i] + y[i]`?

<div style="color: red"><b>TODO: Hardware imbalance FLOPS vs MEM</b></div>

In [3]:
function generate_input_data(; N, dtype, kwargs...)
    a = dtype(3.141)
    x = rand(dtype, N)
    y = rand(dtype, N)
    return a,x,y
end

function measure_perf(f::F; N=2^30, dtype=Float64, verbose=true, kwargs...) where {F}  
    # input data
    a,x,y = generate_input_data(; N, dtype, kwargs...)

    # time measurement
    t = @belapsed $f($y, $a, $x) evals = 2 samples = 10
    
    # compute memory bandwidth and flops
    bytes = 3 * sizeof(dtype) * N # TODO: num bytes transferred in AXPY kernel (all iterations)
    flops = 2 * N # TODO: num flops performed in AXPY kernel (all iterations)
    mem_rate = bytes * 1e-9 / t # TODO: memory bandwidth in GB/s
    flop_rate = flops * 1e-9 / t # TODO: flops in GFLOP/s
    
    if verbose
        println("Dtype: $dtype")
        println("\tMemory Bandwidth (GB/s): ", round(mem_rate; digits=2))
        println("\tCompute (GFLOP/s): ", round(flop_rate; digits=2))
    end
    return mem_rate, flop_rate
end

measure_perf (generic function with 1 method)

In [4]:
measure_perf(axpy_serial!);

Dtype: Float64
	Memory Bandwidth (GB/s): 39.26
	Compute (GFLOP/s): 3.27


## Node-Level Parallelisation (Multithreading)

**SIMD:** `axpy_serial!` is already *parallel* at instruction level

In [5]:
@code_native debuginfo=:none axpy_serial!(y,a,x)

	[0m.text
	[0m.file	[0m"axpy_serial!"
	[0m.globl	[0m"julia_axpy_serial!_1072"       [90m# -- Begin function julia_axpy_serial!_1072[39m
	[0m.p2align	[33m4[39m[0m, [33m0x90[39m
	[0m.type	[0m"julia_axpy_serial!_1072"[0m,[0m@function
[91m"julia_axpy_serial!_1072":[39m              [90m# @"julia_axpy_serial!_1072"[39m
	[0m.cfi_startproc
[90m# %bb.0:                                # %top[39m
	[96m[1mpushq[22m[39m	[0m%rbp
	[0m.cfi_def_cfa_offset [33m16[39m
	[0m.cfi_offset [0m%rbp[0m, [33m-16[39m
	[96m[1mmovq[22m[39m	[0m%rsp[0m, [0m%rbp
	[0m.cfi_def_cfa_register [0m%rbp
	[96m[1mpushq[22m[39m	[0m%r15
	[96m[1mpushq[22m[39m	[0m%r14
	[96m[1mpushq[22m[39m	[0m%r13
	[96m[1mpushq[22m[39m	[0m%r12
	[96m[1mpushq[22m[39m	[0m%rbx
	[96m[1mandq[22m[39m	[33m$-32[39m[0m, [0m%rsp
	[96m[1msubq[22m[39m	[33m$96[39m[0m, [0m%rsp
	[0m.cfi_offset [0m%rbx[0m, [33m-56[39m
	[0m.cfi_offset [0m%r12[0m, [33m-48[39m
	[0m.cfi_of

We want to parallelize our AXPY kernel via multithreading. Julia provides the `@threads` macro to multithread for-loops.

**Make sure that you actually have multiple threads!** (I recommend 8 threads on Noctua 2.)

In [6]:
using Base.Threads: @threads, nthreads

@assert nthreads() > 1
nthreads()

128

In [7]:
function axpy_multithreading_dynamic!(y, a, x)
    #
    # TODO: Implement a naive multithreaded AXPY kernel (with @threads).
    #
    @threads for i in eachindex(x,y)
        @inbounds y[i] = a * x[i] + y[i]
    end
    return nothing
end

axpy_multithreading_dynamic! (generic function with 1 method)

In [8]:
measure_perf(axpy_multithreading_dynamic!);

Dtype: Float64
	Memory Bandwidth (GB/s): 32.55
	Compute (GFLOP/s): 2.71


🙁 **What's going on?! Why no (or not much) speedup?!** 😢

### Pinning Julia threads

**Why** pin threads?

* stable performance (e.g. avoid fluctuations in benchmarks)
* avoid double occupation of CPU-cores / CPU-threads
* fixed memory locality
* (hardware performance monitoring → [LIKWID.jl](https://github.com/JuliaPerf/LIKWID.jl))

**How** pin Julia threads? → [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl)

What about external tools like `numactl`, `taskset`, etc.? Doesn't work reliably because they often [can't distinguish](https://discourse.julialang.org/t/thread-affinitization-pinning-julia-threads-to-cores/58069/5) between Julia threads and other internal threads.

<br>
<img src="./imgs/threadpinning_pinthreads.svg" width=700>
<br>

(More? See my short talk at JuliaCon2023 @ MIT: https://youtu.be/6Whc9XtlCC0)

In [9]:
using ThreadPinning

In [10]:
threadinfo()


System: 128 cores (no SMT), 2 sockets, 8 NUMA domains

[0m[1m| [22m[39m0,[33m[1m1[22m[39m,[33m[1m2[22m[39m,[33m[1m3[22m[39m,[33m[1m4[22m[39m,[33m[1m5[22m[39m,[33m[1m6[22m[39m,[33m[1m7[22m[39m,[33m[1m8[22m[39m,[33m[1m9[22m[39m,[33m[1m10[22m[39m,[33m[1m11[22m[39m,[33m[1m12[22m[39m,[33m[1m13[22m[39m,[33m[1m14[22m[39m,[33m[1m15[22m[39m,
  [39m16,[33m[1m17[22m[39m,[33m[1m18[22m[39m,[33m[1m19[22m[39m,[33m[1m20[22m[39m,[33m[1m21[22m[39m,[33m[1m22[22m[39m,[33m[1m23[22m[39m,[33m[1m24[22m[39m,[33m[1m25[22m[39m,[33m[1m26[22m[39m,[33m[1m27[22m[39m,[33m[1m28[22m[39m,[33m[1m29[22m[39m,[33m[1m30[22m[39m,[33m[1m31[22m[39m,
  [33m[1m32[22m[39m,[33m[1m33[22m[39m,[33m[1m34[22m[39m,[33m[1m35[22m[39m,[33m[1m36[22m[39m,[33m[1m37[22m[39m,[33m[1m38[22m[39m,[33m[1m39[22m[39m,[33m[1m40[22m[39m,[33m[1m41[22m[39m,[33m[1m42[22m[39m,[33m[1m43[22

In [11]:
pinthreads(:cores)

In [12]:
threadinfo()


System: 128 cores (no SMT), 2 sockets, 8 NUMA domains

[0m[1m| [22m[33m[1m0[22m[39m,[33m[1m1[22m[39m,[33m[1m2[22m[39m,[33m[1m3[22m[39m,[33m[1m4[22m[39m,[33m[1m5[22m[39m,[33m[1m6[22m[39m,[33m[1m7[22m[39m,[33m[1m8[22m[39m,[33m[1m9[22m[39m,[33m[1m10[22m[39m,[33m[1m11[22m[39m,[33m[1m12[22m[39m,[33m[1m13[22m[39m,[33m[1m14[22m[39m,[33m[1m15[22m[39m,
  [33m[1m16[22m[39m,[33m[1m17[22m[39m,[33m[1m18[22m[39m,[33m[1m19[22m[39m,[33m[1m20[22m[39m,[33m[1m21[22m[39m,[33m[1m22[22m[39m,[33m[1m23[22m[39m,[33m[1m24[22m[39m,[33m[1m25[22m[39m,[33m[1m26[22m[39m,[33m[1m27[22m[39m,[33m[1m28[22m[39m,[33m[1m29[22m[39m,[33m[1m30[22m[39m,[33m[1m31[22m[39m,
  [33m[1m32[22m[39m,[33m[1m33[22m[39m,[33m[1m34[22m[39m,[33m[1m35[22m[39m,[33m[1m36[22m[39m,[33m[1m37[22m[39m,[33m[1m38[22m[39m,[33m[1m39[22m[39m,[33m[1m40[22m[39m,[33m[1m41[22m[39m,[33m[1m

In [13]:
pinthreads(:sockets)

In [14]:
threadinfo()


System: 128 cores (no SMT), 2 sockets, 8 NUMA domains

[0m[1m| [22m[33m[1m0[22m[39m,[33m[1m1[22m[39m,[33m[1m2[22m[39m,[33m[1m3[22m[39m,[33m[1m4[22m[39m,[33m[1m5[22m[39m,[33m[1m6[22m[39m,[33m[1m7[22m[39m,[33m[1m8[22m[39m,[33m[1m9[22m[39m,[33m[1m10[22m[39m,[33m[1m11[22m[39m,[33m[1m12[22m[39m,[33m[1m13[22m[39m,[33m[1m14[22m[39m,[33m[1m15[22m[39m,
  [33m[1m16[22m[39m,[33m[1m17[22m[39m,[33m[1m18[22m[39m,[33m[1m19[22m[39m,[33m[1m20[22m[39m,[33m[1m21[22m[39m,[33m[1m22[22m[39m,[33m[1m23[22m[39m,[33m[1m24[22m[39m,[33m[1m25[22m[39m,[33m[1m26[22m[39m,[33m[1m27[22m[39m,[33m[1m28[22m[39m,[33m[1m29[22m[39m,[33m[1m30[22m[39m,[33m[1m31[22m[39m,
  [33m[1m32[22m[39m,[33m[1m33[22m[39m,[33m[1m34[22m[39m,[33m[1m35[22m[39m,[33m[1m36[22m[39m,[33m[1m37[22m[39m,[33m[1m38[22m[39m,[33m[1m39[22m[39m,[33m[1m40[22m[39m,[33m[1m41[22m[39m,[33m[1m

#### Benchmark with pinned threads

In [15]:
pinthreads(:cores)
measure_perf(axpy_multithreading_dynamic!);

Dtype: Float64
	Memory Bandwidth (GB/s): 32.47
	Compute (GFLOP/s): 2.71


**Still the same performance?!** 😢

### Data placement (NUMA)

One (of two) AMD Milan CPUs in Noctua 2:

<img src="./imgs/amd_milan_cpu_die.svg" width=800>

**Image source:** AMD, [High Performance Computing (HPC) Tuning Guide for AMD EPYCTM 7003 Series Processors](https://www.amd.com/system/files/documents/high-performance-computing-tuning-guide-amd-epyc7003-series-processors.pdf)

<img src="./imgs/noctua2_topo.svg" width=1000>

In [16]:
threadinfo(; groupby=:numa) # switch from socket/CPU grouping to NUMA grouping


System: 128 cores (no SMT), 2 sockets, 8 NUMA domains

[0m[1m| [22m[33m[1m0[22m[39m,[33m[1m1[22m[39m,[33m[1m2[22m[39m,[33m[1m3[22m[39m,[33m[1m4[22m[39m,[33m[1m5[22m[39m,[33m[1m6[22m[39m,[33m[1m7[22m[39m,[33m[1m8[22m[39m,[33m[1m9[22m[39m,[33m[1m10[22m[39m,[33m[1m11[22m[39m,[33m[1m12[22m[39m,[33m[1m13[22m[39m,[33m[1m14[22m[39m,[33m[1m15[22m[39m[0m[1m |[22m
[0m[1m| [22m[33m[1m16[22m[39m,[33m[1m17[22m[39m,[33m[1m18[22m[39m,[33m[1m19[22m[39m,[33m[1m20[22m[39m,[33m[1m21[22m[39m,[33m[1m22[22m[39m,[33m[1m23[22m[39m,[33m[1m24[22m[39m,[33m[1m25[22m[39m,[33m[1m26[22m[39m,[33m[1m27[22m[39m,[33m[1m28[22m[39m,[33m[1m29[22m[39m,[33m[1m30[22m[39m,[33m[1m31[22m[39m[0m[1m |[22m
[0m[1m| [22m[33m[1m32[22m[39m,[33m[1m33[22m[39m,[33m[1m34[22m[39m,[33m[1m35[22m[39m,[33m[1m36[22m[39m,[33m[1m37[22m[39m,[33m[1m38[22m[39m,[33m[1m39[22m[39m

#### How to control data placement (explicitly)?
→ [NUMA.jl](https://github.com/JuliaPerf/NUMA.jl)

`Vector{Float64}(numanode(i), length)` (kind of similar to `Vector{Float64}(undef, length)`)

In [17]:
using NUMA, Random

In [18]:
data = Vector{Float64}(numanode(1), 100); rand!(data);

In [19]:
which_numa_node(data)

1

In [20]:
data = Vector{Float64}(numanode(8), 100); rand!(data);

In [21]:
which_numa_node(data)

8

Let's do a quick and dirty benchmark to get an idea how much this matters for performance.

In [22]:
node1 = current_numa_node()
node2 = mod1(current_numa_node() + nnumanodes()÷2, nnumanodes()) # numa node in other CPU/socket

println("local NUMA node")
x = Vector{Float64}(numanode(node1), N); rand!(x)
y = Vector{Float64}(numanode(node1), N); rand!(y)

@btime axpy_serial!($y, $a, $x) samples=5 evals=3;

println("distant NUMA node")
x = Vector{Float64}(numanode(node2), N); rand!(x)
y = Vector{Float64}(numanode(node2), N); rand!(y)

@btime axpy_serial!($y, $a, $x) samples=5 evals=3;

local NUMA node
  724.972 ms (0 allocations: 0 bytes)
distant NUMA node
  1.047 s (0 allocations: 0 bytes)


Note that the performance issue will be mouch more pronounced in multithreaded cases, where different threads might try to access the same non-local data over the same memory channel(s).

#### How to control data placement (implicitly)?

→ **"First-touch" policy**

```julia
x = Vector{Float64}(undef, 10)   # allocation, no "touch" yet
rand!(x)                         # first touch == first write
```

In [23]:
pinthreads(:numa)
threadinfo(; groupby=:numa)


System: 128 cores (no SMT), 2 sockets, 8 NUMA domains

[0m[1m| [22m[33m[1m0[22m[39m,[33m[1m1[22m[39m,[33m[1m2[22m[39m,[33m[1m3[22m[39m,[33m[1m4[22m[39m,[33m[1m5[22m[39m,[33m[1m6[22m[39m,[33m[1m7[22m[39m,[33m[1m8[22m[39m,[33m[1m9[22m[39m,[33m[1m10[22m[39m,[33m[1m11[22m[39m,[33m[1m12[22m[39m,[33m[1m13[22m[39m,[33m[1m14[22m[39m,[33m[1m15[22m[39m[0m[1m |[22m
[0m[1m| [22m[33m[1m16[22m[39m,[33m[1m17[22m[39m,[33m[1m18[22m[39m,[33m[1m19[22m[39m,[33m[1m20[22m[39m,[33m[1m21[22m[39m,[33m[1m22[22m[39m,[33m[1m23[22m[39m,[33m[1m24[22m[39m,[33m[1m25[22m[39m,[33m[1m26[22m[39m,[33m[1m27[22m[39m,[33m[1m28[22m[39m,[33m[1m29[22m[39m,[33m[1m30[22m[39m,[33m[1m31[22m[39m[0m[1m |[22m
[0m[1m| [22m[33m[1m32[22m[39m,[33m[1m33[22m[39m,[33m[1m34[22m[39m,[33m[1m35[22m[39m,[33m[1m36[22m[39m,[33m[1m37[22m[39m,[33m[1m38[22m[39m,[33m[1m39[22m[39m

In [24]:
for tid in 1:8
    @sync @tspawnat tid begin            # ThreadPinning.@tspawnat creates *sticky* tasks that don't migrate between threads
        x = Vector{Float64}(undef, 10)   # allocation, no "touch" yet
        rand!(x)                         # first touch
        @show tid, which_numa_node(x)
    end
end

(tid, which_numa_node(x)) = (1, 1)
(tid, which_numa_node(x)) = (2, 2)
(tid, which_numa_node(x)) = (3, 3)
(tid, which_numa_node(x)) = (4, 4)
(tid, which_numa_node(x)) = (5, 5)
(tid, which_numa_node(x)) = (6, 6)
(tid, which_numa_node(x)) = (7, 7)
(tid, which_numa_node(x)) = (8, 8)


##### NUMA-optimized AXPY

**Question**
* How can we modify our AXPY benchmark to optimize for local memory accesses (based on the first-touch policy)?

In [25]:
using Random

function generate_input_data(; N, dtype, parallel=false, kwargs...)
    #
    # TODO: introduce a new keyword argument that, when set to true, initializes the data in parallel
    #       (in the same way as we'll later use it)
    #
    a = dtype(3.141)
    x = Vector{dtype}(undef, N)
    y = Vector{dtype}(undef, N)
    if !parallel
        rand!(x)
        rand!(y)
    else
        @threads for i in eachindex(x,y)
            x[i] = rand()
            y[i] = rand()
        end
    end
    return a,x,y
end

generate_input_data (generic function with 1 method)

In [26]:
pinthreads(:numa)
measure_perf(axpy_multithreading_dynamic!; parallel=false);
measure_perf(axpy_multithreading_dynamic!; parallel=true);

Dtype: Float64
	Memory Bandwidth (GB/s): 32.2
	Compute (GFLOP/s): 2.68
Dtype: Float64
	Memory Bandwidth (GB/s): 232.58
	Compute (GFLOP/s): 19.38


**Speedup! Yeah!** 😄 🎉

But.... less than expected!? 😕

**Question**
* What kind of speedup would we expect (ideally)?

In [27]:
threadinfo(; groupby=:numa)


System: 128 cores (no SMT), 2 sockets, 8 NUMA domains

[0m[1m| [22m[33m[1m0[22m[39m,[33m[1m1[22m[39m,[33m[1m2[22m[39m,[33m[1m3[22m[39m,[33m[1m4[22m[39m,[33m[1m5[22m[39m,[33m[1m6[22m[39m,[33m[1m7[22m[39m,[33m[1m8[22m[39m,[33m[1m9[22m[39m,[33m[1m10[22m[39m,[33m[1m11[22m[39m,[33m[1m12[22m[39m,[33m[1m13[22m[39m,[33m[1m14[22m[39m,[33m[1m15[22m[39m[0m[1m |[22m
[0m[1m| [22m[33m[1m16[22m[39m,[33m[1m17[22m[39m,[33m[1m18[22m[39m,[33m[1m19[22m[39m,[33m[1m20[22m[39m,[33m[1m21[22m[39m,[33m[1m22[22m[39m,[33m[1m23[22m[39m,[33m[1m24[22m[39m,[33m[1m25[22m[39m,[33m[1m26[22m[39m,[33m[1m27[22m[39m,[33m[1m28[22m[39m,[33m[1m29[22m[39m,[33m[1m30[22m[39m,[33m[1m31[22m[39m[0m[1m |[22m
[0m[1m| [22m[33m[1m32[22m[39m,[33m[1m33[22m[39m,[33m[1m34[22m[39m,[33m[1m35[22m[39m,[33m[1m36[22m[39m,[33m[1m37[22m[39m,[33m[1m38[22m[39m,[33m[1m39[22m[39m

### Tasks vs Threads

Conceptually, Julia implements **task-based multithreading**.

**A user shouldn't care about threads but tasks!**

<img src="./imgs/julia_tasks_vs_threads.png" width=1000>

In **"traditional" HPC**, we typically care about threads directly, i.e. we tell every thread what it should do.

In Julia's **task-based multithreading**, a task - e.g. a computational piece of a code - is only marked for **parallel execution** (`@spawn`, `@threads`) on **any** of the available Julia threads. Julias **dynamic scheduler** will then take care of running the task on any of the threads (the task might even migrate!).

*Advantages:*
* high-level abstraction
* **composability / nestability** (Multithreaded code can call multithreaded code can call multithreaded code ....)

*Disadvantages:*
* potential scheduling overhead
* **task → thread assignment uncertain (can vary dynamically + task migration)**
* can get in the way when performance engineering
  * scheduler has limited information (e.g. about the system topology)
  * low-level profiling (e.g. with LIKWID) requires fixed `task → thread → core` mapping.

#### Opt-out of dynamic scheduling

We can pt-out of Julia's dynamic scheduling and get **guarantees about the task-thread assignment** (and the iterations → task mapping).

Syntax: `@threads :static for ...`

 * splits up the iteration space into `nthreads()` even, contiguous blocks (in-order) and creates precisely one task per block
 * **statically** maps tasks to threads, specifically: task 1 -> thread 1, task 2 -> thread 2, etc.
   * no task migration, i.e. **fixed task-thread mapping** 👍
   * only little overhead 👍
   * not composable / nestable 👎
     

**In short:**

Dynamic scheduling: `@spawn`, `@threads :dynamic` (default)

Static scheduling (i.e. fixed task → thread mapping): `ThreadPinning.@tspawnat`, `@threads :static`

#### Statically scheduled AXPY

In [28]:
function axpy_multithreading_static!(y, a, x)
    #
    # TODO: Implement a statically scheduled multithreaded AXPY kernel (with @threads :static).
    #
    @threads :static for i in eachindex(x,y)
        @inbounds y[i] = a * x[i] + y[i]
    end
    return nothing
end

axpy_multithreading_static! (generic function with 1 method)

We also need to adapt the input data generation.

In [29]:
function generate_input_data(; N, dtype, parallel=false, static=false, kwargs...)
    #
    # TODO: introduce a new keyword argument `static` that, when set to true, initializes the data in parallel with static scheduling
    #       (in the same way as we'll later use it)
    #
    a = dtype(3.141)
    x = Vector{dtype}(undef, N)
    y = Vector{dtype}(undef, N)
    if !parallel
        rand!(x)
        rand!(y)
    else
        if !static
            @threads for i in eachindex(x,y)
                x[i] = rand()
                y[i] = rand()
            end
        else
            @threads :static for i in eachindex(x,y)
                x[i] = rand()
                y[i] = rand()
            end
        end
    end
    return a,x,y
end

generate_input_data (generic function with 1 method)

In [30]:
pinthreads(:numa)
measure_perf(axpy_multithreading_static!; parallel=false, static=true);
measure_perf(axpy_multithreading_static!; parallel=true, static=true);

Dtype: Float64
	Memory Bandwidth (GB/s): 32.24
	Compute (GFLOP/s): 2.69
Dtype: Float64
	Memory Bandwidth (GB/s): 314.47
	Compute (GFLOP/s): 26.21


**Finally, the expected speedup!** 😄 🎉