# Multithreading

By default, Julia starts with a single thread. We must tell it explicitly to start multiple threads.

#### Environment variable

On Linux/macOS:

```bash
export JULIA_NUM_THREADS=4
```

On Windows (Command Prompt):

```bat
set JULIA_NUM_THREADS=4
```

Afterwards, start Julia.

#### Command line argument

```bash
julia -t 4
```

or

```bash
julia --threads 4
```

#### Jupyter kernel

You can also create a *Jupyter kernel* for multithreaded Julia:

```julia
using IJulia
installkernel("Julia (4 threads)", env=Dict("JULIA_NUM_THREADS"=>"4"))
```

We can readily check how many threads we are running:

In [1]:
Threads.nthreads()

6

Note that, in contrast to our distributed computing discussion, we will consider only a single process:

In [3]:
import Distributed: nprocs

nprocs()

1

## `Threads.@spawn`

The `Threads.@spawn` macro is very similar to the `Distributed.@spawn` macro. However, instead of spawning tasks on different worker processes (potentially on different machines) it spawns tasks on different threads. Basically, it creates (and immediately returns) a `Task` and puts it onto a todo-list. The scheduler will then dynamically assign these tasks to threads.

Let's import everything we need from `Base.Threads`.

In [1]:
import Base.Threads: @spawn, nthreads, threadid

In [116]:
@spawn println("test")

Task (runnable) @0x0000000128e6df90

test


While `Threads.@spawn` returns the task right away, we might have to wait until the task is done and then fetch our result (again, just as for `Distributed.@spawn`).

In [123]:
t = @spawn begin sleep(3); return 4 end # returns right away
@time fetch(t) # we need to wait until the task is done

  2.994932 seconds (95 allocations: 2.906 KiB)


4
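
The same pattern extends to several tasks at once. As a minimal sketch (the squaring workload and the variable names are purely illustrative), we spawn one task per input and then fetch all of them. Broadcasting `fetch` returns the results in the order in which the tasks were created, regardless of which task finishes first.

```julia
import Base.Threads: @spawn

# Spawn one task per input; each @spawn returns a Task immediately.
tasks = [@spawn(i^2) for i in 1:4]

# fetch blocks until the corresponding task is done and returns its value.
results = fetch.(tasks)
```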

Note that we can use (some of) the control flow tools that we've already seen:

In [124]:
t = @sync @spawn begin sleep(3); return 4 end # only returns the task once it is done
@time fetch(t) # no need to wait, the task is already done

  0.000001 seconds


4

Of course, we can spawn many tasks in a loop.

In [125]:
for i in 1:2*nthreads()
    @spawn println("Hi, I'm ", threadid())
end

Hi, I'm 2
Hi, I'm 5
Hi, I'm 2
Hi, I'm 3
Hi, I'm 4
Hi, I'm 2
Hi, I'm 6
Hi, I'm 2
Hi, I'm 2
Hi, I'm 1
Hi, I'm 1
Hi, I'm 1


Note that the tasks are **dynamically scheduled**. Some threads do more work (more values of i) than others!
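
To make the imbalance visible, we can tally how many tasks each thread ended up running. The following is only a sketch: `thread_per_task` is a hypothetical helper introduced here, and the resulting counts will differ from run to run.

```julia
import Base.Threads: @spawn, nthreads, threadid

# Hypothetical helper: run `ntasks` trivial tasks and record which thread ran each one.
function thread_per_task(ntasks)
    tasks = [@spawn(threadid()) for _ in 1:ntasks]
    return fetch.(tasks)
end

ids = thread_per_task(2 * nthreads())

# Tally how many tasks each thread handled.
counts = Dict{Int,Int}()
for id in ids
    counts[id] = get(counts, id, 0) + 1
end
counts
```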

## `@threads`

Roughly speaking, the macro `Threads.@threads` is for multithreading (threads) what `Distributed.@distributed` was for distributed computing (processes).

In [11]:
import Base.Threads: @threads

In [14]:
@threads for i in 1:2*nthreads()
    println("Hi, I'm ", threadid())
end

Hi, I'm 1
Hi, I'm 5
Hi, I'm 4
Hi, I'm 5
Hi, I'm 3
Hi, I'm 4
Hi, I'm 6
Hi, I'm 2
Hi, I'm 3
Hi, I'm 6
Hi, I'm 2
Hi, I'm 1


In contrast to `@spawn` above, `@threads` is **statically scheduled** in the Julia version used here. The iteration range of the for loop is divided equally between the threads. Every thread handles precisely two iterations!

(Newer Julia versions offer further scheduling policies: since Julia 1.8, `@threads` defaults to `:dynamic`, which behaves more like `@spawn`, and the static behavior shown above can be requested explicitly with `@threads :static`.)

In [None]:
# @threads :static for i in 1:2*nthreads()
#     println("Hi, I'm ", threadid())
# end

### Where is my `pmap` counterpart?

Currently, it doesn't exist. But, conceptually, we can implement it as follows:

In [50]:
tmap(fn, itr) = map(fetch, map(i -> Threads.@spawn(fn(i)), itr))

tmap (generic function with 1 method)

In [57]:
tmap(i -> println(i, " ($(threadid()))"), 1:10);

1 (2)
9 (4)
5 (6)
6 (5)
2 (3)
10 (5)
3 (2)
8 (5)
7 (5)
4 (1)


Note, however, that this implementation creates temporary allocations.
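
Like `pmap`, this `tmap` returns its results in input order even though the tasks may finish in any order: the inner `map` collects the tasks in order, and the outer `map` fetches them in that same order. A small check, with the definition repeated so that the snippet is self-contained (the squaring function is just an illustrative workload):

```julia
# Same definition as above.
tmap(fn, itr) = map(fetch, map(i -> Threads.@spawn(fn(i)), itr))

# The results come back in the order of the inputs.
squares = tmap(i -> i^2, 1:5)
```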

## `@threads` vs `@spawn`

#### `@threads`

* rather lightweight, i.e. small overhead
* **statically scheduled**
* good for uniform workload

#### `@spawn`

* more overhead
* **dynamically scheduled**
* handles non-uniform workloads well
* efficient nesting
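
To illustrate the "efficient nesting" point, here is a minimal sketch of a recursive divide-and-conquer sum in which every call spawns a further task. The function `psum` and its `cutoff` parameter are hypothetical names chosen for this example, and the cutoff value is an arbitrary guess rather than a tuned setting.

```julia
import Base.Threads: @spawn

# Hypothetical example: recursive divide-and-conquer sum.
# Each call spawns a task for the left half and handles the right half itself.
function psum(xs, lo=firstindex(xs), hi=lastindex(xs); cutoff=1_000)
    if hi - lo < cutoff
        # Small range: sum serially to keep the task overhead in check.
        s = zero(eltype(xs))
        for i in lo:hi
            s += xs[i]
        end
        return s
    end
    mid = (lo + hi) >>> 1
    left = @spawn psum(xs, lo, mid; cutoff=cutoff)  # nested spawn
    right = psum(xs, mid + 1, hi; cutoff=cutoff)
    return fetch(left) + right
end

psum(collect(1:10_000))
```

Because spawned tasks are scheduled dynamically, the nested tasks can be picked up by any free thread, so the recursion does not need to know how many threads are available.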

# Example: filling an array in parallel

Our goal is to fill an array in parallel using `@threads` and `@spawn`, respectively.

Note that while we had to use things like `SharedArrays` in the case of distributed computing, threads share the same memory! We also don't have to use constructs like `@everywhere` since everything is running in the same Julia process.

In [1]:
import Base.Threads: @threads, @spawn, nthreads, threadid

In [2]:
function fill_array_threads(a)
    @threads for i in 1:length(a)
        a[i] = threadid()
    end
    return a
end

fill_array_threads (generic function with 1 method)

In [3]:
a = zeros(nthreads()*10);

In [4]:
fill_array_threads(a)

60-element Array{Float64,1}:
 1.0
 1.0
 1.0
 1.0
 1.0
 1.0
 1.0
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 ⋮
 5.0
 5.0
 6.0
 6.0
 6.0
 6.0
 6.0
 6.0
 6.0
 6.0
 6.0
 6.0

And here is a version using `Threads.@spawn`.

In [37]:
function fill_array_spawn(a)
    @sync for i in 1:length(a)
        @spawn(a[i] = threadid())
    end
    return a
end

fill_array_spawn (generic function with 1 method)

In [40]:
a = zeros(nthreads()*10);
fill_array_spawn(a)

60-element Array{Float64,1}:
 2.0
 3.0
 3.0
 2.0
 3.0
 4.0
 3.0
 5.0
 3.0
 4.0
 2.0
 2.0
 5.0
 ⋮
 3.0
 6.0
 4.0
 2.0
 4.0
 3.0
 4.0
 6.0
 5.0
 3.0
 4.0
 4.0

Let's benchmark both variants!

In [42]:
using BenchmarkTools
@btime fill_array_threads(x) samples=10 setup=(x = zeros(nthreads()*10));
@btime fill_array_spawn(x) samples=10 setup=(x = zeros(nthreads()*10));

  47.508 μs (376 allocations: 43.70 KiB)
  2.906 ms (1300 allocations: 2.97 MiB)


As we can see, the `@threads` based implementation is much faster. However, this shouldn't be surprising, given that our tasks are lightweight and uniform.
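
Much of the `@spawn` overhead here comes from creating one task per array element. A common way to reduce it is to spawn one task per chunk of indices instead. The following is only a sketch: `fill_array_spawn_chunked` and its `ntasks` keyword are hypothetical names, and we have not benchmarked this variant here.

```julia
import Base.Threads: @spawn, nthreads, threadid

# Hypothetical variant: one task per chunk instead of one task per element.
function fill_array_spawn_chunked(a; ntasks=nthreads())
    chunksize = max(1, cld(length(a), ntasks))
    @sync for idxs in Iterators.partition(eachindex(a), chunksize)
        # Each task fills its own contiguous block of indices.
        @spawn for i in idxs
            a[i] = threadid()
        end
    end
    return a
end

a = fill_array_spawn_chunked(zeros(nthreads() * 10))
```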

## Non-uniform workload

In [54]:
using LinearAlgebra

In [86]:
function fill_array_threads_nonuniform(a)
    @threads for i in 1:length(a)
        a[i] = norm(rand(i^3))
    end
    return a
end

fill_array_threads_nonuniform (generic function with 1 method)

In [87]:
a = zeros(nthreads()*10);

In [88]:
fill_array_threads_nonuniform(a);

In [89]:
function fill_array_spawn_nonuniform(a)
    @sync for i in 1:length(a)
        Threads.@spawn(a[i] = norm(rand(i^3)))
    end
    return a
end

fill_array_spawn_nonuniform (generic function with 1 method)

In [90]:
fill_array_spawn_nonuniform(a);

In [91]:
using BenchmarkTools
@btime fill_array_threads_nonuniform(x) samples=10 setup=(x = zeros(nthreads()*10));
@btime fill_array_spawn_nonuniform(x) samples=10 setup=(x = zeros(nthreads()*10));

  6.214 ms (143 allocations: 25.56 MiB)
  3.691 ms (545 allocations: 25.60 MiB)


We see that, compared to above, the result has flipped: the `@spawn` implementation, due to its dynamic scheduling, is more efficient.

# Threads require more care: parallel summation (naive)

In [99]:
function mysum(xs)
    s = zero(eltype(xs))
    for x in xs
        s += x
    end
    return s
end

mysum (generic function with 1 method)

In [100]:
function mysum_threaded_naive(xs)
    s = zero(eltype(xs))
    @threads for x in xs
        s += x
    end
    return s
end

mysum_threaded_naive (generic function with 1 method)

In [101]:
xs = rand(100_000);

In [102]:
@show sum(xs);
@show mysum(xs);
@show mysum_threaded_naive(xs);

sum(xs) = 49947.95658092538
mysum(xs) = 49947.95658092537
mysum_threaded_naive(xs) = 8712.984662727036

The threaded result is wrong because `s += x` is not an atomic operation: each thread reads `s`, adds `x`, and writes the result back, so concurrent updates overwrite each other (a race condition).


## Parallel summation (divide the work)

In [103]:
function mysum_threaded(xs)
    b = ceil(Int, length(xs)/nthreads())
    map(sub_xs -> Threads.@spawn(sum(sub_xs)), Iterators.partition(xs, b)) .|> fetch |> sum
end

mysum_threaded (generic function with 1 method)

In [108]:
# function mysum_threaded_alternative(xs)
#     b = ceil(Int, length(xs)/nthreads())
#     tasks = [Threads.@spawn(sum(subx)) for subx in Iterators.partition(xs, b)]
#     return sum(fetch(t) for t in tasks)
# end
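
The same divide-the-work idea can also be written with `@threads`, as long as every iteration writes to its own slot. The sketch below indexes the partial sums by chunk number rather than by `threadid()`, so it does not depend on how iterations are mapped to threads. The name `mysum_threads_chunks` and the `nchunks` keyword are hypothetical.

```julia
import Base.Threads: @threads, nthreads

# Hypothetical variant: each iteration sums one chunk and writes to its own slot,
# so no two iterations ever touch the same memory location.
function mysum_threads_chunks(xs; nchunks=nthreads())
    b = max(1, cld(length(xs), nchunks))
    chunks = collect(Iterators.partition(xs, b))
    partial = zeros(eltype(xs), length(chunks))
    @threads for k in 1:length(chunks)
        partial[k] = sum(chunks[k])
    end
    return sum(partial)
end

xs = rand(100_000)
mysum_threads_chunks(xs) ≈ sum(xs)
```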

In [105]:
@show sum(xs);
@show mysum(xs);
@show mysum_threaded(xs);

sum(xs) = 49947.95658092538
mysum(xs) = 49947.95658092537
mysum_threaded(xs) = 49947.95658092538


In [107]:
using BenchmarkTools
@btime mysum($xs);
@btime mysum_threaded($xs);

  97.828 μs (0 allocations: 0 bytes)
  6.685 μs (48 allocations: 4.76 KiB)


## Parallel summation (atomics)

In [18]:
import Base.Threads: Atomic, atomic_add!

function mysum_threaded_atomics(xs)
    s = Atomic{eltype(xs)}(zero(eltype(xs)))
    @threads for x in xs
        atomic_add!(s, x)
    end
    return s[]
end

mysum_threaded_atomics (generic function with 1 method)

In [19]:
@show mysum(xs);
@show mysum_threaded_atomics(xs);

mysum(xs) = 50104.088123258254
mysum_threaded_atomics(xs) = 50104.088123258334


In [20]:
@btime mysum(xs);
@btime mysum_threaded_atomics(xs);
@btime mysum_threaded(xs);

  97.846 μs (1 allocation: 16 bytes)
  4.821 ms (33 allocations: 4.19 KiB)
  7.360 μs (48 allocations: 4.76 KiB)


See [Atomic Operations](https://docs.julialang.org/en/v1/manual/parallel-computing/#Atomic-Operations-1) in the Julia doc for more information.
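
As the benchmark above suggests, atomics become expensive when every iteration updates the same variable. They tend to fit better when updates are comparatively rare, for example a shared counter. A minimal sketch, where `count_even_atomic` is a hypothetical function made up for this illustration:

```julia
import Base.Threads: @threads, Atomic, atomic_add!

# Hypothetical example: count the even numbers in 1:n from multiple threads.
function count_even_atomic(n)
    c = Atomic{Int}(0)
    @threads for i in 1:n
        if iseven(i)
            atomic_add!(c, 1)  # indivisible read-modify-write
        end
    end
    return c[]
end

count_even_atomic(1_000)
```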

## Nesting

Spawning of tasks can also be nested.

In [6]:
function threaded_fun()
    x = threadid()
    @spawn println("job1", " (spawned from $x, processed by $(threadid()))")
    @spawn println("job2", " (spawned from $x, processed by $(threadid()))")
    @spawn println("job3", " (spawned from $x, processed by $(threadid()))")
end

for i in 1:nthreads()
    @spawn threaded_fun()
end

job2 (spawned from 2, processed by 2)
job1 (spawned from 4, processed by 4)
job3 (spawned from 4, processed by 5)
job1 (spawned from 3, processed by 4)
job1 (spawned from 2, processed by 3)
job2 (spawned from 4, processed by 6)
job3 (spawned from 2, processed by 4)
job3 (spawned from 3, processed by 6)
job3 (spawned from 5, processed by 4)
job2 (spawned from 3, processed by 6)
job2 (spawned from 5, processed by 5)
job1 (spawned from 5, processed by 2)
job3 (spawned from 1, processed by 5)
job1 (spawned from 6, processed by 2)
job1 (spawned from 1, processed by 5)
job2 (spawned from 6, processed by 6)
job2 (spawned from 1, processed by 6)
job3 (spawned from 6, processed by 1)


In [7]:
function threaded_fun2()
    x = threadid()
    @threads for i in 1:3
        println("job$i", " (spawned from $x, processed by $(threadid()))")
    end
end

@threads for i in 1:nthreads()
    threaded_fun2()
end

job1 (spawned from 6, processed by 6)
job2 (spawned from 6, processed by 6)
job3 (spawned from 6, processed by 6)
job1 (spawned from 2, processed by 2)
job2 (spawned from 2, processed by 2)
job3 (spawned from 2, processed by 2)
job1 (spawned from 5, processed by 5)
job2 (spawned from 5, processed by 5)
job3 (spawned from 5, processed by 5)
job1 (spawned from 1, processed by 1)
job2 (spawned from 1, processed by 1)
job3 (spawned from 1, processed by 1)
job1 (spawned from 4, processed by 4)
job2 (spawned from 4, processed by 4)
job3 (spawned from 4, processed by 4)
job1 (spawned from 3, processed by 3)
job2 (spawned from 3, processed by 3)
job3 (spawned from 3, processed by 3)

Note the difference between the two outputs: the nested `@spawn` tasks are picked up by whichever thread is free, while each nested `@threads` loop runs entirely on the thread that started it.
