# Multithreading

By default, Julia starts with a single thread. We must tell it explicitly to start multiple threads.

#### Environment variable

On Linux/MacOS:

```bash
export JULIA_NUM_THREADS=4
```

On Windows (Command Prompt):

```bat
set JULIA_NUM_THREADS=4
```

Afterwards, start Julia from that same shell session.

#### Command line argument

```bash
julia -t 4
```

or

```bash
julia --threads 4
```

#### Jupyter kernel

You can also create a *Jupyter kernel* for multithreaded Julia:

```julia
using IJulia
installkernel("Julia (4 threads)", env=Dict("JULIA_NUM_THREADS"=>"4"))
```

We can readily check how many threads we are running:

```julia
Threads.nthreads()
```

```
1
```


## `Threads.@spawn`

The `Threads.@spawn` macro is very similar to the `Distributed.@spawn` macro. However, instead of spawning tasks on different worker processes (potentially on different machines), it spawns tasks on different threads. Basically, it creates a `Task`, puts it into the scheduler's queue of pending work, and immediately returns the task. The scheduler then dynamically assigns these tasks to the available threads.
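A minimal sketch of this task lifecycle: `@spawn` hands back a `Task` right away, `wait` blocks until it has finished, and `fetch` returns its result. The half-second `sleep` merely stands in for real work.

```julia
import Base.Threads: @spawn

# @spawn wraps the expression in a Task, schedules it and returns immediately
t = @spawn begin
    sleep(0.5)      # simulate half a second of work
    sum(1:100)      # the task's return value
end

println(typeof(t))       # Task
println(istaskdone(t))   # most likely false: the task is probably still sleeping
wait(t)                  # block until the task has finished
println(istaskdone(t))   # true
println(fetch(t))        # 5050
```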

Let's import everything we need from `Base.Threads`.

```julia
import Base.Threads: @spawn, nthreads, threadid
```

```julia
@spawn println("test")
```

```
test
Task (done) @0x00007f84c400cd00
```

While `Threads.@spawn` returns the task right away, we might have to wait until the task is done and then fetch our result (again, just as for `Distributed.@spawn`).

```julia
t = @spawn begin sleep(3); return 4 end # returns right away
@time fetch(t) # we need to wait until the task is done
```

```
  3.014648 seconds (4.83 k allocations: 270.199 KiB, 0.43% compilation time)
4
```

Note that we can use (some of) the control flow tools that we've already seen:

```julia
t = @sync @spawn begin sleep(3); return 4 end # @sync only returns (the task) once it is done
@time fetch(t) # no need to wait, the task is already done
```

```
  0.000004 seconds
4
```
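`@sync` is not limited to a single task: it waits for every task spawned (with `@spawn` or `@async`) lexically inside its block. A minimal sketch that fills an array with one task per entry; the array name `squares` is purely illustrative.

```julia
import Base.Threads: @spawn

squares = zeros(Int, 8)

# @sync waits for every task spawned lexically inside its block
@sync for i in 1:8
    @spawn squares[i] = i^2   # each task writes only its own slot
end

# all tasks have finished at this point, so the array is complete
println(squares)
```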

Of course, we can spawn many tasks in a loop.

```julia
for i in 1:2*nthreads()
    @spawn println("Hi, I'm ", threadid())
end
```

```
Hi, I'm 1
Hi, I'm 1
```


Note that the tasks are **dynamically scheduled**: when several threads are available, some threads may end up handling more values of `i` than others.
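One way to observe the dynamic scheduling is to have every task report the thread it ran on and then count the results. The sketch below spawns `4 * nthreads()` tasks (an arbitrary choice). The counts will vary from run to run, and with a single thread all tasks land on the same thread.

```julia
import Base.Threads: @spawn, nthreads, threadid

ntasks = 4 * nthreads()

# each task reports the id of the thread it ran on
ids = fetch.([@spawn(threadid()) for _ in 1:ntasks])

# tally how many tasks each thread ended up running
counts = Dict(id => count(==(id), ids) for id in unique(ids))
println(counts)
```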

## `@threads`

Roughly speaking, the macro `Threads.@threads` is for multithreading (threads) what `Distributed.@distributed` is for distributed computing (processes).

```julia
import Base.Threads: @threads
```

```julia
@threads for i in 1:2*nthreads()
    println("Hi, I'm ", threadid())
end
```

```
Hi, I'm 1
Hi, I'm 1
```


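In contrast to `@spawn` above, `@threads` is **statically scheduled** (by default up to Julia 1.7). The iteration range of the for loop is divided into equal contiguous chunks, one per thread, so with `2*nthreads()` iterations every thread handles precisely two of them. Since Julia 1.8, the default schedule of `@threads` is `:dynamic`. Static scheduling can still be requested explicitly with `@threads :static`.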
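To get static scheduling explicitly (the `:static` option exists since Julia 1.5), the schedule can be passed to the macro. The sketch below records which thread handled each iteration. With `:static`, each thread should own exactly two consecutive iterations. The array name `owner` is illustrative only.

```julia
import Base.Threads: @threads, nthreads, threadid

n = 2 * nthreads()
owner = zeros(Int, n)

# :static splits 1:n into nthreads() contiguous chunks, one per thread
@threads :static for i in 1:n
    owner[i] = threadid()   # remember which thread ran iteration i
end

println(owner)
```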

### Where is my `pmap` counterpart?

Currently, a threaded `pmap` doesn't exist in Base Julia. It is only available in packages such as [ThreadPools.jl](https://github.com/tro3/ThreadPools.jl) or [ThreadsX.jl](https://github.com/tkf/ThreadsX.jl). Conceptually, however, we can implement it ourselves as follows:

```julia
# spawn one task per element, then fetch all results (in order)
tmap(fn, itr) = map(fetch, map(i -> Threads.@spawn(fn(i)), itr))
```

```
tmap (generic function with 1 method)
```

```julia
tmap(i -> println(i, " ($(Threads.threadid()))"), 1:10);
```

```
7 (1)
8 (4)
1 (2)
6 (5)
4 (3)
9 (6)
5 (2)
2 (2)
10 (2)
3 (2)
```
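For comparison, a `@threads`-based counterpart of `tmap` can be sketched that writes into a preallocated output vector. `tmap!` is a hypothetical name (it is not a function from Base, ThreadPools.jl or ThreadsX.jl), and the sketch assumes `out` and `xs` have the same indices.

```julia
import Base.Threads: @threads

# Hypothetical helper: a @threads-based, in-place counterpart to tmap
function tmap!(f, out::AbstractVector, xs::AbstractVector)
    # eachindex(out, xs) throws a DimensionMismatch if the indices differ
    @threads for i in eachindex(out, xs)
        out[i] = f(xs[i])   # every iteration writes only to its own slot
    end
    return out
end

inp = collect(1:10)
out = similar(inp)
tmap!(x -> x^2, out, inp)
println(out)   # [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

Preallocating `out` sidesteps the question of the result's element type, at the price of the caller having to choose it (here via `similar`).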


## `@threads` vs `@spawn` / `tmap`

#### `@threads`

* rather lightweight, i.e. small overhead
* **statically scheduled**
* good for uniform workload

#### `@spawn` / `tmap`

* more overhead
* **dynamically scheduled**
* handles non-uniform workloads well
* efficient nesting
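To illustrate what nesting means for `@spawn`, here is a sketch of a recursive divide-and-conquer sum in which spawned tasks spawn further tasks, and the scheduler distributes all of them over the available threads. The function `psum` and its `cutoff` keyword (below which it sums serially) are hypothetical choices for this example.

```julia
import Base.Threads: @spawn

# Hypothetical example: recursive (nested) parallel sum.
# Each call splits its index range in half, spawns a task for the left half
# and handles the right half itself, so tasks spawn further tasks.
function psum(xs, lo=firstindex(xs), hi=lastindex(xs); cutoff=1_000)
    if hi - lo + 1 <= cutoff
        return sum(@view xs[lo:hi])   # small enough: sum serially
    end
    mid = (lo + hi) ÷ 2
    left = @spawn psum(xs, lo, mid; cutoff=cutoff)   # nested task
    right = psum(xs, mid + 1, hi; cutoff=cutoff)
    return fetch(left) + right
end

println(psum(collect(1:10_000)))   # 50005000
```

The cutoff keeps individual tasks from becoming so small that scheduling overhead dominates.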

# Example: filling an array in parallel

Our goal is to fill an array in parallel, using `@threads` and `@spawn`, respectively.

Note that while we had to use things like `SharedArrays` for distributed computing, threads share the same memory! We also don't need constructs like `@everywhere`, since everything runs in the same Julia process.
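A tiny sketch of what shared memory means in practice: a spawned task writes into an ordinary `Array` owned by the caller, and the change is visible once the task has finished.

```julia
import Base.Threads: @spawn

v = zeros(Int, 4)

# the spawned task writes directly into the caller's array:
# no copying and no SharedArray needed, since all threads share memory
t = @spawn (v[2] = 42)
wait(t)

println(v)   # [0, 42, 0, 0]
```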

```julia
import Base.Threads: @threads, @spawn, nthreads, threadid
```

```julia
function fill_array_threads(a)
    @threads for i in 1:length(a)
        a[i] = threadid()   # record which thread wrote element i
    end
    return a
end
```

```julia
a = zeros(nthreads()*10);
```

```julia
fill_array_threads(a)
```

And here is a version using `Threads.@spawn`.

```julia
function fill_array_spawn(a)
    @sync for i in 1:length(a)
        @spawn(a[i] = threadid())   # one task per element
    end
    return a
end
```

```julia
a = zeros(nthreads()*10);
fill_array_spawn(a)
```

Let's benchmark both variants!

```julia
using BenchmarkTools
@btime fill_array_threads(x) samples=10 setup=(x = zeros(nthreads()*10));
@btime fill_array_spawn(x) samples=10 setup=(x = zeros(nthreads()*10));
```

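We should find that the `@threads`-based implementation is much faster. This shouldn't be surprising. Each element needs only a tiny, uniform amount of work, so the overhead of creating and scheduling one task per element with `@spawn` dominates, whereas `@threads` creates just one task per thread.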
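If one wants to keep `@spawn` but reduce its overhead for cheap, uniform work, a common remedy is to spawn one task per chunk of indices rather than one per element. A minimal sketch; the name `fill_array_spawn_chunked` is hypothetical, and using `nthreads()` chunks is an assumption, not a tuned choice.

```julia
import Base.Threads: @spawn, nthreads, threadid

# Hypothetical variant of fill_array_spawn: one task per chunk instead of per element
function fill_array_spawn_chunked(a)
    chunk = cld(length(a), nthreads())   # ceiling division: at most nthreads() chunks
    @sync for idxs in Iterators.partition(eachindex(a), chunk)
        @spawn for i in idxs             # each task fills a whole block of indices
            a[i] = threadid()
        end
    end
    return a
end

a = zeros(nthreads() * 10)
fill_array_spawn_chunked(a)
```

It can be benchmarked exactly like the two variants above.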

## Non-uniform workload

```julia
using LinearAlgebra
```

```julia
function fill_array_threads_nonuniform(a)
    @threads for i in 1:length(a)
        # calculate the squared magnitude of random vectors growing cubically in size
        a[i] = sum(abs2, rand() for j in 1:(i^3))
    end
    return a
end
```

```julia
a = zeros(nthreads()*10);
```

```julia
fill_array_threads_nonuniform(a);
```

```julia
function fill_array_spawn_nonuniform(a)
    @sync for i in 1:length(a)
        Threads.@spawn(a[i] = sum(abs2, rand() for j in 1:(i^3)))
    end
    return a
end
```

```julia
fill_array_spawn_nonuniform(a);
```

```julia
using BenchmarkTools
@btime fill_array_threads_nonuniform(x) samples=10 setup=(x = zeros(nthreads()*10));
@btime fill_array_spawn_nonuniform(x) samples=10 setup=(x = zeros(nthreads()*10));
```

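Compared to the uniform case above, the result should flip: thanks to its dynamic scheduling, the `@spawn` implementation is now more efficient. The work for element `i` grows like `i^3`, so under static scheduling the thread that receives the last block of indices has far more to do than the others. Dynamically scheduled tasks, on the other hand, keep all threads busy.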
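A back-of-the-envelope calculation shows why static scheduling suffers here. The following assumptions are made only for illustration: there are 4 threads (so the array has 40 elements), iteration `i` costs time proportional to `i^3`, and static scheduling hands each thread one contiguous block of 10 indices. Under these assumptions, we can tally the work per thread.

```julia
# Assumptions: cost of iteration i is proportional to i^3, and static
# scheduling gives each of p threads one contiguous chunk of 1:N
N, p = 40, 4   # e.g. nthreads() == 4, so length(a) == 40
chunks = Iterators.partition(1:N, cld(N, p))
work = [sum(i^3 for i in c) for c in chunks]   # work per thread

println(work)                        # [3025, 41075, 172125, 456175]
println(sum(work) / maximum(work))   # best possible speedup, about 1.47
```

The last thread gets roughly two thirds of the total work, so under these assumptions the static version can be at most about 1.5 times faster than a serial loop. Dynamically scheduled tasks are not limited in the same way.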

# Threads require more care: parallel summation (naive)

```julia
function mysum(xs)
    s = zero(eltype(xs))
    for x in xs
        s += x
    end
    return s
end
```

```julia
function mysum_threaded_naive(xs)
    s = zero(eltype(xs))
    @threads for x in xs
        # data race: all threads read and update the same `s` concurrently,
        # so some additions get lost and the result is wrong
        s += x
    end
    return s
end
```

```julia
xs = rand(100_000);
```

```julia
@show sum(xs);
@show mysum(xs);
@show mysum_threaded_naive(xs);
```

## Parallel summation (divide the work)

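```julia
function mysum_threaded(xs)
    # chunk size chosen so that there are (at most) nthreads() chunks
    b = ceil(Int, length(xs)/nthreads())
    # one task per chunk sums that chunk; then fetch all partial sums and add them up
    map(sub_xs -> Threads.@spawn(sum(sub_xs)), Iterators.partition(xs, b)) .|> fetch |> sum
end
```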
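To see what the partitioning step in `mysum_threaded` produces, here is a small example with a 10-element vector that pretends `nthreads() == 4` (an assumption made only for illustration). The variable names are likewise illustrative.

```julia
v = collect(1:10)
bsize = ceil(Int, length(v) / 4)   # pretend nthreads() == 4; gives 3
parts = collect(Iterators.partition(v, bsize))

println(length.(parts))               # [3, 3, 3, 1]
println(sum.(parts))                  # [6, 15, 24, 10]
println(sum(sum.(parts)) == sum(v))   # true
```

Because the chunk size is rounded up, there can be fewer chunks than threads: for 9 elements and 4 threads, `b == 3` gives only three chunks.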

```julia
@show sum(xs);
@show mysum(xs);
@show mysum_threaded(xs);
```

```julia
using BenchmarkTools
@btime mysum($xs);
@btime mysum_threaded($xs);
```

(Alternatively, one can use [Atomic Operations](https://docs.julialang.org/en/v1/manual/multi-threading/#Atomic-Operations) to make the naive version correct.)
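As a sketch of the atomic alternative, the naive loop can be made correct by replacing `s += x` with an atomic addition. This assumes the element type is one that `Threads.Atomic` supports (e.g. `Float64` or `Int`). The function name `mysum_atomic` is hypothetical.

```julia
import Base.Threads: @threads, Atomic, atomic_add!

# Hypothetical name: a race-free version of mysum_threaded_naive
function mysum_atomic(xs)
    s = Atomic{eltype(xs)}(zero(eltype(xs)))
    @threads for x in xs
        atomic_add!(s, x)   # read-modify-write as one indivisible step
    end
    return s[]              # read the final value
end

data = rand(100_000)
println(mysum_atomic(data) ≈ sum(data))   # true (summation order differs, hence ≈)
```

Because every addition goes through the same memory location, the threads contend with each other, so this is typically much slower than dividing the work as in `mysum_threaded`.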