# Multi-Threading Tutorial

This notebook is a tutorial for Julia's built-in multi-threading capabilities.

In [1]:
using LinearAlgebra

## Multi-threading Overview

Multi-threading is a type of *shared memory* parallel programming.

- It is a way to make use of all the cores available on your computer when you run computations.
- All of these cores have access to the same memory (technically, they still have their own cache hierarchies)
- Each core can be doing work separately

**Exercise 0**: How many cores does your computer have? Look it up!

## Getting Started

All of the multi-threading capabilities are handeled in Base Julia, specifically in a package `Base.Threads`.

- You don't need to load this package since it's part of Base
- You do need to write macros like `Threads.@spawn` unless you import these functions specifically

Let's call `nthreads` to see how many threads we have available to us.

In [2]:
Threads.nthreads()

4

Huh, I only have one thread! Why is this?

**We need to tell Julia how many threads it has available when it starts up**

When you run a julia executable or interactive julia in the REPL, this can be done with

```
julia --threads=4
```

**Exercise 1**: Start an interactive Julia session with the `--threads` command, and run `Threads.nthreads()`.

### Multi-threading in a Jupyter Notebook

In order to get multi-threading to work in a Jupyter notebook, you need to launch a kernel that has set `--threads` to a specific value.
Here is a [discourse link](https://github.com/JuliaLang/IJulia.jl/issues/882#issuecomment-579520246) on how to do that.

1. Launch an interactive Julia session
2. Run the following commands
```julia
using IJulia
IJulia.installkernel("Julia 4 Threads", env=Dict(
    "JULIA_NUM_THREADS" => "4",
))
```
3. Exit Julia `exit()`
4. Launch jupyter notebook `jupyter notebook`
5. You can now switch your kernel!

**Exercise 2**: Do this with the number of cores you have on your machine and launch your multi-threaded Jupyter notebook

### Re-running with a new kernel

I am going to re-run with a new kernel and see if I have more threads.

In [3]:
Threads.nthreads()

4

Yup, this number should now be 4.

## Illustration 1: Minimum Distance

Let's suppose I have a dataset $x_1 \dots x_n \in \mathbb{R}^d$.

- Here is code that will compute the minimum distance across all pairs of vectors.
- You should copy this code

In [4]:
function compute_min_dist(dataset; slow=false)
    n = length(dataset)
    
    min_dist = Inf
    for i=1:n, j=(i+1):n
        dist_val = norm(dataset[i] - dataset[j])
        min_dist = min(dist_val, min_dist)
    end
    
    # simulate an intensive computation
    if slow
        sleep(5)
    end
    
    return min_dist
end

compute_min_dist (generic function with 1 method)

Let's just check that this runs on a small dataset.

In [5]:
n = 10
d = 3

# generate dataset
dataset = [randn(d) for i=1:n]

# compute min dist
compute_min_dist(dataset)

0.8004991222082375

Now suppose that I had several datasets.
Let's start by making 3 datasets of a larger size.

In [6]:
n = 1000
d = 20

# generate 4 datasets
dataset1 = [randn(d) for i=1:n]
dataset2 = [randn(d) for i=1:n]
dataset3 = [randn(d) for i=1:n]
dataset4 = [randn(d) for i=1:n];

Let's time how long it takes to run serial code on all three of these datasets

In [7]:
@time begin
    md1 = compute_min_dist(dataset1)
    md2 = compute_min_dist(dataset2)
    md3 = compute_min_dist(dataset3)
    md4 = compute_min_dist(dataset4)
end

  0.276913 seconds (4.00 M allocations: 426.822 MiB, 23.81% gc time)


2.302077920504382

In our serial code above, the minimum distance is computed for the first dataset, then for the second, then for the third.

Now let's see how to use multi-threading to have these run in parallel!

### Multi-Threading in Action: Syntax

We can use the `Threads.@spawn` macro which is as follows:

- `t = Threads.@spawn exp` creates a task `t` that will evaluate the expression `exp` using an available thread
- In order to retrieve the value given by `exp`, we need to use `fetch(t)`

In [8]:
@time begin
    
    # spawning threads
    t1 = Threads.@spawn compute_min_dist(dataset1)
    t2 = Threads.@spawn compute_min_dist(dataset2)
    t3 = Threads.@spawn compute_min_dist(dataset3)
    t4 = Threads.@spawn compute_min_dist(dataset4)

    # fetching results
    p_md1 = fetch(t1)
    p_md2 = fetch(t2)
    p_md3 = fetch(t3)
    p_md4 = fetch(t4)
end

@assert isapprox(md1, p_md1)
@assert isapprox(md2, p_md2)
@assert isapprox(md3, p_md3)
@assert isapprox(md4, p_md4)

  0.087556 seconds (4.04 M allocations: 428.862 MiB, 14.49% gc time, 67.83% compilation time)


**Questions**

- What is the speed increase? Is it proportioal to the # of threads?
- Why or why not? Does `@time` tell us anything helpful here?

### Multi-Threading a `for` Loop

You can also use the `Threads` package to automatically run `for` loops with multi-threading.

```julia
Threads.@threads for i=1:n
    x[i] = myfunc(i)
end
```

- this will automatically create tasks to run
- there is another syntax we can use, but let's see it later

We'll recreate the example above, but with 25 datasets.

In [9]:
num_datasets = 25
n = 1000
d = 40

# slow -- set this true if you want compute_min_dist to be artificially challenging
slow = false

# generate many datasets
datasets = [ [randn(d) for i=1:n] for k=1:num_datasets ]

# initialize the array of results
md_val = zeros(num_datasets)
p_md_val = zeros(num_datasets)

# run the serial code
@time begin
    for k=1:num_datasets
       md_val[k] = compute_min_dist(datasets[k]; slow=slow) 
    end
end

# run the parallel code
@time begin
    Threads.@threads for k=1:num_datasets
       p_md_val[k] = compute_min_dist(datasets[k]; slow=slow) 
    end
end

# check the same answer
@assert all(isapprox.(md_val, p_md_val))

  1.320962 seconds (24.98 M allocations: 4.652 GiB, 17.29% gc time, 1.67% compilation time)
  0.714050 seconds (25.16 M allocations: 4.661 GiB, 16.59% gc time, 32.76% compilation time)


Questions

- What is the speed up now? 
- Is this close or far from optimal, given the number of threads?
- Why is this happening?
- What changes if we set `slow = true`? Why?

Note: there is nothing to "fetch" here because the expression `p_md_vals[k] = compute_min_dist(datasets[k])` does not itself have an output.

## Main Exercise: Monte Carlo Estimation

In this exercise, I want you to use the `Threads` package to paralleize a simple Monte Carlo estimation.

- If $(X,Y) \sim \textrm{Unif}[0,1]$ then $\Pr( X^2 + Y^2 \leq 1 ) = \frac{\pi}{4}$ *why?*
- So, we can use Monte Carlo sampling (i.e. law of large numbers) to estimate $\pi$ by the following:
  - Sample $\{X_i, Y_i\}_{i=1}^n \sim \textrm{Unif}[0,1]$
  - Compute the estimate $\hat{p} = \frac{1}{n} \sum_{i=1}^n \mathbf{1}\{X_i^2 + Y_i^2 \leq 1\}$
  - Compute an estimate for $\pi$ via $\hat{\pi} = 4 \hat{p}$

The following serial code will run this. Your goal is to parallelize it, and time the difference.

In [10]:
function mc_estimate_pi(num_samples)
    count = 0
    for i=1:num_samples
        x,y = rand(2)
        if x^2 + y^2 <= 1
            count += 1
        end
    end
    return 4 * (count / num_samples)
end

mc_estimate_pi (generic function with 1 method)

In [11]:
num_sample_vec = [10, 100, 1000, 100_000, 10_000_000, 100_000_000]

for num_samples in num_sample_vec
    elap_time = @elapsed pi_est = mc_estimate_pi(num_samples)
    println("Num Samples: $num_samples \t Time: $elap_time \t Estimated Pi: $pi_est")
end

Num Samples: 10 	 Time: 0.057777025 	 Estimated Pi: 2.8
Num Samples: 100 	 Time: 6.079e-6 	 Estimated Pi: 3.24
Num Samples: 1000 	 Time: 2.9875e-5 	 Estimated Pi: 3.188
Num Samples: 100000 	 Time: 0.003114607 	 Estimated Pi: 3.13384
Num Samples: 10000000 	 Time: 0.346879577 	 Estimated Pi: 3.1418496
Num Samples: 100000000 	 Time: 3.356312932 	 Estimated Pi: 3.14159376


Now it's your turn - implement a multi-threaded version of this and time the difference!

## Possible Strategies

Unfortunately, class ended right about here and I waved my hands a bit.
I want to highlight a few possible strategies that may or may not have worked.

### Naive Implementation: Race Conditions

The most naive implementation has a race condition.
We'll see what that means next (and how to prevent it!), but I just want to highlight that it will result in severely underestimating $\pi$ *due to memory over-write issues*

In [12]:
function multi_mc_estimate_pi_bad1(num_samples)
    count = 0
    Threads.@threads for i=1:num_samples
        x,y = rand(2)
        if x^2 + y^2 <= 1
            count += 1 # race condition ! overwriting shared memory
        end
    end
    return 4 * (count / num_samples)
end

num_sample_vec = [10, 100, 1000, 100_000, 10_000_000, 100_000_000]

for num_samples in num_sample_vec
    elap_time = @elapsed pi_est = multi_mc_estimate_pi_bad1(num_samples)
    println("Num Samples: $num_samples \t Time: $elap_time \t Estimated Pi: $pi_est")
end

Num Samples: 10 	 Time: 0.109941842 	 Estimated Pi: 3.2
Num Samples: 100 	 Time: 7.7727e-5 	 Estimated Pi: 3.12
Num Samples: 1000 	 Time: 0.000115161 	 Estimated Pi: 2.184
Num Samples: 100000 	 Time: 0.004403535 	 Estimated Pi: 0.98616
Num Samples: 10000000 	 Time: 0.45870064 	 Estimated Pi: 0.9367492
Num Samples: 100000000 	 Time: 4.918535244 	 Estimated Pi: 0.8907236


The above shows us that we are severely underestimating $\pi$. See more below on what is a race condition and why this is happening: `hint`: closely examine the shared variable `count`

### Safe Implementation

Here's another implementation which avoids the race conditions:

- Create a shared array `events` for the 0/1 event of whether $X^2 + Y^2 \leq 1$
- Each task writes to the shared array in parallel (no race conditions)
- We sum the array at the end

In [13]:
function multi_mc_estimate_pi_bad2(num_samples)
    event_array = zeros(num_samples)
    Threads.@threads for i=1:num_samples
        x,y = rand(2)
        event_array[i] = x^2 + y^2 <= 1
    end
    return 4 * (sum(event_array) / num_samples)
end

num_sample_vec = [10, 100, 1000, 100_000, 10_000_000, 100_000_000]

for num_samples in num_sample_vec
    elap_time = @elapsed pi_est = multi_mc_estimate_pi_bad2(num_samples)
    println("Num Samples: $num_samples \t Time: $elap_time \t Estimated Pi: $pi_est")
end

Num Samples: 10 	 Time: 0.142836314 	 Estimated Pi: 2.4
Num Samples: 100 	 Time: 6.7688e-5 	 Estimated Pi: 3.2
Num Samples: 1000 	 Time: 9.252e-5 	 Estimated Pi: 3.172
Num Samples: 100000 	 Time: 0.005108917 	 Estimated Pi: 3.14108
Num Samples: 10000000 	 Time: 0.268595659 	 Estimated Pi: 3.1408504
Num Samples: 100000000 	 Time: 2.169292747 	 Estimated Pi: 3.14180504


What do we observed

- We've fixed the race condition, so the estimated $\pi$ is correct.
- While we are seeing a little bit of speed-up here for larger `num_samples`, it is not very significant.

The reason we don't have super significant speed up is that these computational tasks aren't that demanding relative to the start-up cost.

- **Exercise**: Does increasing `num_samples` yield a better speed up? What computational costs might this have?

## Pitfalls of Multi-Threading: Race Conditions

Let's demonstrate a common pitfall of multi-threading known as *race conditions*.

Let's start by seeing how we might parallelize a sum.

In [14]:
function sum_single(a)
    sum_val = 0
    for x in a
        sum_val += x
    end
    return sum_val
end

sum_single (generic function with 1 method)

In [15]:
a = collect(1:100)
sum_single(a)

5050

Okay, great - how let's try to just apply the `Threads.@threads` macro to parallelize this loop

In [16]:
function sum_multi_bad(a)
    sum_val = 0
    Threads.@threads for x in a
        sum_val += x
    end
    return sum_val
end

sum_multi_bad (generic function with 1 method)

In [17]:
a = collect(1:100)
sum_multi_bad(a)

4791

This code is not returning the right answer! And what's worse is that it returns a different answer every time I run it! **What the heck!**

The **problem** is that we're abusing the *shared memory*

- Each thread has read / write access to the variable `sum_val`
- Thread A goes to read `sum_val` and then computes `y = sum_val + x`
- At the same time, Thread B reads `sum_val` and then computes `y' = sum_val + x'`
- Then Thread A writes `sum_val = y`
- Then Thread B overwrites `sum_val = y'`
- And it's as if the addition of `x` didn't even happen.

This is called a *race condition*. It was the reason that x-ray machine was killing people.

We have two options to avoid race conditions:

- **Lock Mechanisms**: We could place additional "locks" on the variable `sum_val` so race conditions don't occur. But, this kind of defeats the purpose of parallelism here.
- **Separate Data Buffers**: We could split the data up, have each thread work on it's own data, and then merge the answers together again.

We're going to focus on the second approach. 
But to do this, we'll need to take a detour.

## Detour: Map and Iterators

Let me show off a few things about Julia here, which will make implementing separate data buffers much easier.

### The `map` function

The `map` function exists is many functional programming languages.
It has two inputs `map(f, x)` which applies the function `f` to every element in `x` and returns the answer.
This is, by definition, *embarassingly parallel*.
Let's see how it works.

In [18]:
x = [1, 4, 9, 16]

map(sqrt, x)

4-element Vector{Float64}:
 1.0
 2.0
 3.0
 4.0

Sometimes, the function `f` you want to apply isn't pre-made. 
So, you have to make it. 
You can write an *anonymous function* like this:

In [19]:
x = [1, 2, 3, 4]

map(t->t^2, x)

4-element Vector{Int64}:
  1
  4
  9
 16

You can read this like "the function `f` takes in `t` and outputs `t^2`".
This allows you to write your own functions here without explicitly naming them.

In [20]:
x = [1, 2, 3, 4]

map(t->t^2 + 4*t - sin(t), x)

4-element Vector{Float64}:
  4.158529015192103
 11.090702573174319
 20.85887999194013
 32.75680249530793

But writing this notation can be really annoying and awkward, especially if your function `f` naturally has multiple lines.
What do you do in this case?
Well, there is an alternative specification for `map`.

It looks like this:

In [21]:
x = [1,2,3,4,5]

y = map(x) do t
    return sqrt(t)
end

5-element Vector{Float64}:
 1.0
 1.4142135623730951
 1.7320508075688772
 2.0
 2.23606797749979

Here's the same style of `map` call but with a more complex `f`

In [22]:
x = [1, 5, 9, 12, 43, 29]

y = map(x) do t
    # collatz conjecture
    if iseven(t)
        return div(t,2)
    else
        return 3 * t + 1
    end
end

6-element Vector{Int64}:
   4
  16
  28
   6
 130
  88

Note that `map` calls are *embarassingly parallel*, which makes them especially good for multi-threading.

### Another detour: reduce and mapreduce

The `reduce` function is a cousin to `map`.

```julia
reduce(op, itr; init)

```
- apply the binary operation `op` successively to elements in iterable `itr`
- the optional argument `init` is the initial value

So, for example:

In [23]:
x = [1,2,3,4,5]

reduce(+, x; init=0)

15

In [24]:
reduce(*, x; init=1)

120

A **very common** and **very parallelizable** type of operation is to combine `map` and `reduce`.
This is called `mapreduce`.

```julia
mapreduce(f, op, itr; init)

```
- compute `x = map(f, itr)`
- return `y = reduce(op, x; init)`

In [25]:
x = [1,2,3,4,5]

mapreduce(x->x^2, +, x; init=0)

55

If we have time, I will show you how `mapreduce` can be implemented in Distributed Memory model of parallel programming using Julia.

### Final detour: partition iterator

The last thing I want to show you is an iterator construction.
There is a fancy way to iterate over partitions of an array of a fixed size.

It's easier to just show you

In [26]:
a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# this is an iterator, it doesn't create new memory
chunks = Iterators.partition(a, 3)

# I am going to iterate now
for chunk in chunks
    println(chunk)
end

[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]


And one last thing, we can use the function `cld` to compute `ceil( x / y)`.

In [27]:
cld(100, 6)

17

This will be helpful for computing one chunk for each of our threads!

## Back to Multi-threading: Data Buffer

So now let's see how to use `Iterations.partition` and `map` together to create a data buffer for multi-tasking

In [28]:
function sum_multi_good(a)
   chunks = Iterators.partition(a, cld(length(a), Threads.nthreads()))
   tasks = map(chunks) do chunk
       Threads.@spawn sum_single(chunk)
   end
   chunk_sums = fetch.(tasks)
   return sum(chunk_sums)
end

sum_multi_good (generic function with 1 method)

**Question**: Break this code down line-by-line and explain it

1. What is the code doing?
2. How does this avoid the race condition?

In [29]:
a = collect(1:100)

sum_multi_good(a)

5050

Yay - it's working!

But let us make a sobering remark

In [30]:
@time sum(a)
@time sum_multi_good(a)

  0.000017 seconds (1 allocation: 16 bytes)
  0.000203 seconds (39 allocations: 2.266 KiB)


5050

The overhead here for parallelization was not worth it.

- The built-in `sum` function is super fast
- Re-writing a fast function and parallelizing it is typically a waste of time
- You only want to parallelize over functions that take a long time to compute

So, this example really served to demonstrate race conditions.
Trying to re-implement `sum` is not advisable.

## Final Thoughts

Let me make a few remarks:

- When you are multi-threading (or coding anything, really) -- **don't re-invent the wheel**
- You can often check to see if someone has a solution for the problem you have (not on HW though)
- For example, `OhMyThreads.jl` has higher level support for threaded mapreduce and more