# Shared-memory parallelism

In [1]:
using Pkg
Pkg.activate(@__DIR__)
Pkg.instantiate()

[32m[1m  Activating[22m[39m project at `~/NERSC/julia-hpc-tutorial-juliacon25/parts/multithreading`


In [2]:
Sys.cpu_summary()

Apple M1 Max: 
          speed         user         nice          sys         idle          irq
#1-10  2400 MHz    2237547 s          0 s     864656 s   18805078 s          0 s


In [4]:
using Hwloc
topology_info()

Machine: 1 (3.17 GB)
 Package: 1 (3.17 GB)
  NUMANode: 1 (3.17 GB)
   L2Cache: 3 (4.0 MB)
    L1Cache: 10 (64.0 kB)
     Core: 10
      PU: 10
       OS_Device: 1


In [5]:
import Base.Threads: @sync, @spawn

In [6]:
function fib(x)
	if x <= 1
		return 1
	else
		b = @spawn fib(x-2)
		a = fib(x-1)
		return a+(fetch(b)::Int)
	end
end

fib (generic function with 1 method)

In [7]:
fib(10)

89

## Parallel-loops

In [8]:
import Base.Threads: @threads, nthreads, threadid

In [9]:
let 
	a = zeros(Int, nthreads()*2)
	@threads for i in 1:length(a)
	    a[i] = threadid()
	end
	a
end

2-element Vector{Int64}:
 1
 1

### Schedulers

Julia has diffferent schedulers for parallel for-loops:

- `:dynamic` (the default): Chunks the iteration-space.
- `:greedy`: One-task-per-thread, good for unequal workloads. Iteration-space is interpreted as a channel.
- `:static`: One-task-per-thread, equal division of iteration-space. Can not be nested.


In [10]:
let 
	a = zeros(Int, 1000)
	@threads :greedy for i in 1:length(a)
	    a[i] = threadid()
	end
	a
end

1000-element Vector{Int64}:
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 ⋮
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1

## Parallel Primitives

`@threads` seems nice, but is difficult in many ways. Reductions is an immediate issue:

In [11]:
 function sum_bad(a)
    s = 0
    Threads.@threads for i in a
        s += i
    end
    s
end
sum_bad(1:1_000_000)

500000500000

Instead we have alternatives for things like `map`, `reduce` and `mapreduce` from `OhMyThreads.jl`:

- `tmap`
- `treduce`
- `tmapreduce`
- `tforeach`

In [13]:
using OhMyThreads

In [15]:
function test_tforeach()
	a = zeros(Int, nthreads()*2)
	tforeach(1:length(a)) do i
	    a[i] = threadid()
	end
	a
end
test_tforeach()

2-element Vector{Int64}:
 1
 1

In [17]:
using BenchmarkTools
data = rand(1_000_000 * nthreads());

### Sequential Sum

In [18]:
function simple_sum(data)
	acc = zero(eltype(data))
	for i in eachindex(data)
		acc += data[i]
	end
	acc
end

simple_sum (generic function with 1 method)

In [31]:
@benchmark sum($data)

BenchmarkTools.Trial: 10000 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m109.292 μs[22m[39m … [35m141.417 μs[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m111.875 μs               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m112.211 μs[22m[39m ± [32m  1.840 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m [39m [39m [39m▂[39m [39m [39m [39m [39m [39m▁[39m▄[39m▆[39m█[39m█[34m█[39m[32m▆[39m[39m▄[39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m▂[39m▃[39m▃[39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▂
  [39m▅[39

In [20]:
@benchmark simple_sum($data)

BenchmarkTools.Trial: 5333 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m929.250 μs[22m[39m … [35m 1.590 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m931.958 μs              [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m936.556 μs[22m[39m ± [32m18.053 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m█[39m▃[34m▇[39m[39m▂[39m▃[39m▁[32m▂[39m[39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▁
  [39m█[39m█[34m█[

### Naive Parallel

In [21]:
function parallel_sum_falsesharing(data; nchunks = nthreads())
    psums = zeros(eltype(data), nchunks)
    @sync for (c, idcs) in enumerate(OhMyThreads.index_chunks(data; n = nchunks))
        @spawn begin
            for i in idcs
                psums[c] += data[i]
            end
        end
    end
    return sum(psums)
end

parallel_sum_falsesharing (generic function with 1 method)

In [22]:
 sum(data) ≈ parallel_sum_falsesharing(data)

true

In [23]:
@benchmark parallel_sum_falsesharing($data)

BenchmarkTools.Trial: 1437 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m3.422 ms[22m[39m … [35m 31.595 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m3.433 ms               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m3.479 ms[22m[39m ± [32m752.162 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m▆[39m▇[39m█[34m▄[39m[39m▄[39m▃[39m▃[39m▃[39m▃[39m▁[39m▁[39m▁[39m [32m [39m[39m [39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m 
  [39m█[39m█[39m█[34m█[39m

In [24]:
const CACHE_LINE_SIZE = 64
function parallel_sum_padded(data; nchunks = nthreads())
	# pad each entry
	stride = CACHE_LINE_SIZE ÷ sizeof(eltype(data))
    psums = zeros(eltype(data), nchunks * stride)
    @sync for (c, idcs) in enumerate(OhMyThreads.index_chunks(data; n = nchunks))
        @spawn begin
			c_idx = (c-1) * stride + 1
            for i in idcs
                psums[c_idx] += data[i]
            end
        end
    end
    return sum(psums)
end

parallel_sum_padded (generic function with 1 method)

In [25]:
@benchmark parallel_sum_padded($data)

BenchmarkTools.Trial: 1431 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m3.422 ms[22m[39m … [35m 26.797 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m3.433 ms               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m3.493 ms[22m[39m ± [32m753.884 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m▄[39m▅[39m█[34m▅[39m[39m▃[39m▂[39m▂[39m▂[39m▂[39m▁[39m▁[39m▁[39m [39m [39m [39m [39m [39m [32m [39m[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m 
  [39m█[39m█[39m█[34m█[39m

### Task-local parallel sum

In [26]:
function parallel_sum_tasklocal(data; nchunks = nthreads())
    psums = zeros(eltype(data), nchunks)
    @sync for (c, idcs) in enumerate(OhMyThreads.index_chunks(data; n = nchunks))
        @spawn begin
            local s = zero(eltype(data))
            for i in idcs
                s += data[i]
            end
            psums[c] = s
        end
    end
    return sum(psums)
end

parallel_sum_tasklocal (generic function with 1 method)

In [27]:
@benchmark parallel_sum_tasklocal($data)

BenchmarkTools.Trial: 5166 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m941.792 μs[22m[39m … [35m 16.158 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m945.208 μs               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m965.743 μs[22m[39m ± [32m221.039 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m█[39m▇[34m▅[39m[39m▃[39m▂[39m▂[39m▁[39m▂[39m▂[39m▂[39m▂[32m▁[39m[39m▂[39m▂[39m▁[39m▁[39m▁[39m▁[39m▁[39m▁[39m▁[39m▁[39m [39m▁[39m [39m [39m [39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▁
  [39m█[39m

In [28]:
function parallel_sum_map(data; nchunks = nthreads())
    psums = zeros(eltype(data), nchunks)
    @sync for (c, idcs) in enumerate(OhMyThreads.index_chunks(data; n = nchunks))
        @spawn begin
            psums[c] = sum(view(data, idcs))
        end
    end
    return sum(psums)
end

parallel_sum_map (generic function with 1 method)

In [29]:
@benchmark parallel_sum_map($data)

BenchmarkTools.Trial: 10000 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m123.375 μs[22m[39m … [35m210.708 μs[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m127.334 μs               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m128.290 μs[22m[39m ± [32m  5.180 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m [39m▃[39m▂[39m▁[39m▅[39m█[39m█[39m▆[39m█[34m█[39m[39m▆[32m▄[39m[39m▄[39m▂[39m▃[39m▂[39m▂[39m▁[39m [39m [39m▁[39m▁[39m [39m [39m [39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▂
  [39m█[39

In [30]:
@benchmark treduce($+, $data; ntasks = $nthreads())

BenchmarkTools.Trial: 10000 samples with 1 evaluation per sample.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m109.459 μs[22m[39m … [35m142.500 μs[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m112.000 μs               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m112.220 μs[22m[39m ± [32m  1.282 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m [39m [39m [39m [39m [39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m▂[39m▄[39m▅[39m▇[39m█[39m█[34m█[39m[39m█[32m▇[39m[39m▅[39m▃[39m▂[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▂[39m▂[39m▃[39m▂[39m▂[39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▃
  [39m▃[39

## Extra Content

### Channels

Julia tasks are *communicating*, communication can happen with dedicated programming concepts like `Channel`, or directly through memory shared with another tasks.

Channels are first-in-first-out queues that can either be buffered (e.g. contain a reservoir for a number of elements), or un-buffered/blocking. 

In [32]:
let ch = Channel{Int}(Inf) # buffered
	@sync begin
		for i in 1:10
			@spawn put!(ch, rand(Int))
		end
	end
	close(ch) # Otherwise collect will wait for more data
	collect(ch)
end

10-element Vector{Int64}:
  5425055943555394301
 -5264605782367559992
 -4078813599772696767
  -894277178252281183
  9089634521574073254
 -7216591403843538244
  6280963797893285664
  7734217592546287403
 -3617760441057519500
 -5574986787406518659

### Race-conditions

`Channel` is a concurrent data-structure and ensure that it safe to use with multiple tasks. When we use our own data-structures we have to make sure that we make them safe if necessary, otherwise we will observe data-races.

In [33]:
mutable struct BrokenCounter
	x::Int
end

In [34]:
let a = BrokenCounter(0)
	N = 10
	K = 100_000
	@sync for i in 1:N
		@spawn for i in 1:K
			a.x += 1
			GC.safepoint()
		end
	end
	a.x, N*K, a.x/(N*K)
end

(1000000, 1000000, 1.0)

### Atomics & Locks

One way this can be fixed is to use atomics. Atomics allow to express the read-and-increment operation as one operation.

In [35]:
mutable struct AtomicCounter
	@atomic x::Int
end

In [36]:
let a = AtomicCounter(0)
	N = 10
	K = 100_000
	@sync for i in 1:N
		@spawn for i in 1:K
			@atomic a.x += 1
			GC.safepoint()
		end
	end
	a.x, N*K, a.x/(N*K)
end

(1000000, 1000000, 1.0)

In [37]:
let a = AtomicCounter(0)
	a.x = 10
end

LoadError: ConcurrencyViolationError("setfield!: atomic field cannot be written non-atomically")

In [38]:
let a = AtomicCounter(0)
	@atomic :sequentially_consistent a.x = 1+2
end

3

In [39]:
let a = Base.Lockable(BrokenCounter(0), Base.ReentrantLock())
	N = 10
	K = 100_000
	@sync for i in 1:N
		@spawn for i in 1:K
			@lock(a, a[].x += 1)
			GC.safepoint()
		end
	end
	@lock a begin
		a[].x, N*K, a[].x/(N*K)
	end
end

(1000000, 1000000, 1.0)