[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/jolin-io/fall-in-love-with-julia/main?filepath=12%20concurrent%20programming%20-%2001%20introduction.ipynb)

<a href="https://www.jolin.io" target="_blank" rel="noreferrer noopener">
<img src="https://www.jolin.io/assets/Jolin/Jolin-Banner-Website-v1.1-darkmode.webp">
</a>

# Fall-in-love-with-Julia: Concurrent Programming in Julia 101

an introduction session

I am Stephan Sahm, and today we are going to learn all about concurrent programming in Julia.

1. Asynchronous programming 🚲
2. Multi-threading 🚘
3. Parallel computing 🚀

# Asynchronous programming 🚲

Compute concurrently on the same thread.

`@async`

In [None]:
using Dates

In [None]:
function countdown(from=10)
    @async begin
        for i in from:-1:0
            println("countdown $(i)")
            sleep(1)
        end
    end
end

In [None]:
countdown()

In [None]:
rand(10)

In [None]:
function sleep_blocking(seconds)
    now = Dates.now()
    until = now + Dates.Second(seconds)

    while now < until
        now = Dates.now()
    end
end

In [None]:
countdown(5)
sleep_blocking(3)
println("awake")

In [None]:
countdown(5)
sleep(3)
println("awake")

### your space 😎

👉 Print the current time in addition to the countdown value to see that execution is really stopped.

In [None]:
# your space

👉 What would happen if we used `sleep_blocking` in the async task? Try it.

In [None]:
# your space

Behind the scenes Julia is running a scheduler which takes care of switching between tasks.

> 🪧 Now you understand why `sleep` may actually sleep much longer than you specified. `sleep` returns control to the scheduler, and if another task becomes active in between, `sleep` can only return once that task hands control back to the scheduler.
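
The effect can be reproduced in a few lines. This is a minimal sketch that is not part of the original session: `spin` is a hypothetical helper that busy-waits without ever yielding (much like `sleep_blocking`), and the durations are arbitrary.

```julia
# Hypothetical helper: keeps the thread busy and never yields to the scheduler.
function spin(seconds)
    start = time()
    while time() - start < seconds
    end
end

@async spin(1.0)                 # scheduled on the current thread, not yet running
elapsed = @elapsed sleep(0.1)    # `sleep` yields, so `spin` starts and hogs the thread
println("asked for 0.1 s, slept $(round(elapsed; digits=2)) s")
```

The measured time should be close to one second rather than 0.1, because `sleep` can only return after `spin` has finished.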

## `wait` and `fetch`

Besides `sleep`, there are other ways to organise green threads.

In [None]:
task = countdown()

In [None]:
wait(task)
println("done")

In [None]:
task = countdown()
result = fetch(task)
@show result

### your space 🤓

👉 Change the return value of your asynchronous countdown task

In [None]:
# your space

### `@sync`
If you want to wait on multiple `@async` calls directly, you can use `@sync`.

In [None]:
function countdown_blocking(id, from=10)
    println("begin countdown $id")
    wait(countdown(from))
    println("end countdown $id")
end

In [None]:
@sync begin
    @async countdown_blocking("from 4", 4)
    
    @async countdown_blocking("from 6", 6)
    
    @async countdown_blocking("from 2", 2)
end
println("all done") 
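
A compact way to check what `@sync` does is to record the order in which the tasks finish. The sketch below is an illustration under assumptions: the `finished` vector, the labels and the sleep durations are made up for this example.

```julia
finished = String[]   # filled by the tasks in the order they complete

elapsed = @elapsed @sync begin
    @async (sleep(0.3); push!(finished, "slow"))
    @async (sleep(0.2); push!(finished, "medium"))
    @async (sleep(0.1); push!(finished, "fast"))
end

# `@sync` only returns after all three tasks are done
@show finished
println("waited $(round(elapsed; digits=2)) s in total")
```

The tasks sleep concurrently, so the total waiting time is governed by the slowest task and not by the sum of all three.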

### `notify` & `Condition`

In [None]:
cond = Condition()  # think of this as a trigger

In [None]:
@async begin
    # this will block until we trigger the Condition
    wait(cond)
    println("triggered $(Dates.now())")
end

In [None]:
notify(cond)

> 🪧 Condition == a one-off trigger: rerunning the `wait` will wait again until the next `notify`
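
The trigger behaviour can be made visible through the return value of `notify`, which is the number of tasks that were woken up. A minimal sketch, with illustrative variable names (`lost`, `woken`, `waiter`) and an arbitrary short pause:

```julia
cond = Condition()

lost = notify(cond)        # nobody is waiting yet, so this notification is lost
waiter = @async begin
    wait(cond)             # blocks until the *next* notify
    :woken
end
sleep(0.1)                 # give the task time to reach `wait`
woken = notify(cond)       # wakes the waiting task

@show lost woken fetch(waiter)
```

A `notify` that happens before the `wait` has no effect on later waiters, which is the main difference to an `Event`.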

### `notify` & `Event`

In [None]:
event = Base.Event()  # think of this as a state

In [None]:
@async begin
    # this will block until the Event is set to "on"
    wait(event)
    println("triggered $(Dates.now())")
end

In [None]:
notify(event)  # set state to "on"

> 🪧 Event == rerunning the `wait` will succeed immediately (until Event is reset) 

In [None]:
reset(event)  # set state to "off"
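
The state-like behaviour of an `Event` can be sketched as follows. The variable names and the short pause are illustrative assumptions; `reset` on an `Event` needs a reasonably recent Julia version.

```julia
event = Base.Event()

notify(event)                  # state "on"
wait(event)                    # returns immediately
wait(event)                    # and again: the state is remembered

reset(event)                   # state "off"
waiter = @async (wait(event); :released)
sleep(0.1)                     # give the task time to reach `wait`
blocked = !istaskdone(waiter)  # the task is stuck in `wait`

notify(event)                  # state "on" again, the task continues
result = fetch(waiter)
@show blocked result
```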

## `Channel` & `take!` & `put!`

A `Channel` is *the* means of communication between concurrent code (holds true for all kinds of concurrent programming).

In [None]:
function create_countdown_channel(from=10)
    Channel(4) do ch
        for i in from:-1:0
            sleep(1)
            put!(ch, i)
        end
    end
end

In [None]:
channel_countdown = create_countdown_channel()

In [None]:
while isopen(channel_countdown)
    i = take!(channel_countdown)
    println("countdown $i")
end

In [None]:
for i in channel_countdown
    println("countdown $i")
end

> 🪧 Channel() will pass only a single message at a time. `put!` blocks until another task calls `take!`.

> 🪧 Channel(4) will buffer 4 elements in the channel itself.
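
The difference between the two kinds of channel can be observed directly. The following is a small sketch with made-up names and values, not the countdown channel itself:

```julia
buffered = Channel{Int}(2)
put!(buffered, 1)              # returns immediately, the buffer has room
put!(buffered, 2)              # buffer is now full; a third put! would block

unbuffered = Channel{Int}(0)   # same capacity as Channel(): no buffer at all
producer = @async begin
    put!(unbuffered, 42)       # blocks until someone takes the value
    :delivered
end
sleep(0.1)                     # give the producer time to reach `put!`
blocked = !istaskdone(producer)

value = take!(unbuffered)      # hands over the value and unblocks the producer
status = fetch(producer)
@show blocked value status
```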

### your space 😎

👉 Change the number of buffered elements. What do you think will change?

In [None]:
# your space

👉 Exchange the `sleep` and `put!`. Can you guess what will change?

In [None]:
# your space

# Multi-threading 🚘

Use `Threads.@spawn` instead of `@async`

⚠️ be careful to write threadsafe code ⚠️

> Warning from Julia documentation: "Multi-threaded programming has many inherent difficulties, and if a program using threads exhibits unusual or undesirable behavior (e.g. crashes or mysterious results), thread interactions should typically be suspected first."

👉 Why is multi-threading more dangerous than asynchronous programming?

In [None]:
# It is not possible in Julia to add threads within a running Julia process.
# They need to be requested at startup, e.g. `julia --threads 4` on the command line
# or via the environment variable JULIA_NUM_THREADS.
Threads.nthreads()

In [None]:
Threads.threadid()

In [None]:
function threads_countdown(from=10)
    Threads.@spawn begin
        Core.println(Threads.threadid())
        for i in from:-1:0
            Core.println("countdown $(i) $(Dates.now())")
            sleep_blocking(1)
        end
    end
end

In [None]:
println(Dates.now())
threads_countdown(5)
sleep_blocking(3)
println("awake $(Dates.now())")

`wait`, `fetch` and `@sync` all work out of the box for threads as well, which is nice, but...

### CAUTION

There is a known [issue](https://github.com/JuliaLang/julia/issues/43952) with `Threads.@spawn`

- using standard `println` will break the independence
- using `sleep(1)` instead of `sleep_blocking(1)` will also break it

Here is [another issue](https://github.com/JuliaLang/julia/issues/41586#issuecomment-880875258) which describes further difficulties. And [another](https://github.com/JuliaLang/julia/issues/34267).

The support for threads will improve over the next Julia versions.

### For what should you use Threads then?

Speed up by "simple" parallelization of independent tasks

- package [`ThreadPools`](https://github.com/tro3/ThreadPools.jl), and the similar but already builtin `Threads.@threads`
- package [`ThreadsX`](https://github.com/tkf/ThreadsX.jl) gives threaded versions of many builtin functions like `any`, `sum`, etc.
- package [`Transducers`](https://juliafolds.github.io/Transducers.jl/dev/parallelism/#overview-parallel) is all about stream transformations and also supports threaded parallelization
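
As a minimal sketch of such a "simple" parallelization, here is the builtin `Threads.@threads` applied to independent loop iterations. The arrays `xs` and `squares` are illustrative; the important assumption is that every iteration writes only to its own index.

```julia
xs = collect(1:1000)
squares = similar(xs)

# Every iteration writes to its own index, so no two threads touch the same memory.
Threads.@threads for i in eachindex(xs)
    squares[i] = xs[i]^2
end

@show Threads.nthreads()
@show squares == xs .^ 2
```

The result is the same for any number of threads; only the runtime changes.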

### Another warning: Atomic operations example

In [None]:
acc = Ref(0)

Threads.@threads for i in 1:1000
    acc[] += 1
end

acc[]

In [None]:
acc = Threads.Atomic{Int64}(0)

Threads.@threads for i in 1:1000
    Threads.atomic_add!(acc, 1)
end

acc[]

### If you really want to use more low-level threads

1. Read about **locks** in the [documentation](https://docs.julialang.org/en/v1/manual/multi-threading/#Data-race-freedom) and api, e.g. [ReentrantLock](https://docs.julialang.org/en/v1/base/parallel/#Base.ReentrantLock), [Semaphore](https://docs.julialang.org/en/v1/base/parallel/#Base.Semaphore), and [SpinLock](https://docs.julialang.org/en/v1/base/multi-threading/#Low-level-synchronization-primitives)

2. Read about **atomic operations** in the [documentation](https://docs.julialang.org/en/v1/manual/multi-threading/#man-atomics) and [api](https://docs.julialang.org/en/v1/base/multi-threading/#Base.@atomic)
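
As a starting point for the lock-based approach, the racy counter example can be protected with a `ReentrantLock`. This is a sketch, not a recommendation over atomics; the name `lk` is arbitrary.

```julia
acc = Ref(0)
lk = ReentrantLock()

Threads.@threads for i in 1:1000
    lock(lk) do          # only one thread at a time may run this block
        acc[] += 1
    end
end

@show acc[]
```

The lock serializes the increments, so the result is correct, but for a single counter the atomic version is the lighter tool.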

# Parallel computing 🚀

doing things completely independently on multiple worker processes

while the main process is usually orchestrating

In [None]:
using Distributed  # standard library which comes with every julia distribution

In [None]:
Distributed.nprocs()

In [None]:
Distributed.addprocs(1)

In [None]:
Distributed.nprocs()

`RemoteChannel`

In [None]:
Distributed.workers()

### `@everywhere`

run code on all processes

In [None]:
Distributed.@everywhere using InteractiveUtils

In [None]:
varinfo()

### `@spawnat` and `remotecall`

In [None]:
future = @spawnat :any varinfo()

In [None]:
fetch(future)

In [None]:
# @spawnat :any calls nextproc to determine the process
Distributed.nextproc()

combine spawn and fetch with `Distributed.@fetch`, `Distributed.@fetchfrom`, or `remotecall_fetch`:

In [None]:
remotecall_fetch(varinfo, Distributed.nextproc())
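
The different spellings can be compared side by side. The sketch below assumes that starting one local worker is acceptable and uses `myid()` as a stand-in for real work; the variable names are illustrative.

```julia
using Distributed

nprocs() == 1 && addprocs(1)   # make sure there is at least one worker
w = first(workers())

# three ways to run `myid()` on worker `w` and get the result back
via_spawnat    = fetch(@spawnat w myid())
via_fetchfrom  = @fetchfrom w myid()
via_remotecall = remotecall_fetch(myid, w)

@show w via_spawnat via_fetchfrom via_remotecall
```

All three calls should report the id of the chosen worker.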

### Caution Magic: (implicitly) passing data between processes

In [None]:
A = rand(10,10)

In [None]:
future = @spawnat 2 begin
    sum(A)
end
fetch(future)

In [None]:
Distributed.@fetchfrom 2 varinfo()

no global is defined on the worker when the data is passed as an extra `remotecall` argument

In [None]:
B = rand(10,10)

remotecall_fetch(B -> sum(B), 2, B)

In [None]:
Distributed.@fetchfrom 2 varinfo()

magic actually also works within `remotecall`

In [None]:
remotecall_fetch(()->sum(B), 2)

In [None]:
Distributed.@fetchfrom 2 varinfo()

if you want to use the magical data transfer, you can use `let` to prevent global assignments 

In [None]:
C = rand(10,10)
let C=C
    remotecall_fetch(()->sum(C), 2)
end

In [None]:
Distributed.@fetchfrom 2 varinfo()

In [None]:
Distributed.clear!(:A)
Distributed.@fetchfrom 2 varinfo()

### your space 🤓

👉 Check how much better `remotecall_fetch` is compared to `fetch(remotecall(...))`

In [None]:
# your space

using BenchmarkTools
@btime 1 + 1;

### `RemoteChannel`

Here is a really nice example of complex communication between different machines

In [None]:
const jobs = RemoteChannel(()->Channel{Int}(32));
const results = RemoteChannel(()->Channel{Tuple}(32));

In [None]:
@everywhere function do_work(jobs, results) # define work function everywhere
    while true
        job_id = take!(jobs)
        exec_time = rand()
        sleep(exec_time) # simulates elapsed time doing actual work
        put!(results, (job_id, exec_time, myid()))
    end
end

In [None]:
function make_jobs(n)
    for i in 1:n
        put!(jobs, i)
    end
end;

n = 12;
errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs

In [None]:
for p in workers() # start tasks on the workers to process requests in parallel
    remote_do(do_work, p, jobs, results)
end

@elapsed while n > 0 # print out results
    job_id, exec_time, where = take!(results)
    println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
    global n = n - 1
end

### Convenience helpers

In [None]:
nheads = @distributed (+) for i = 1:20000
    Int(rand(Bool))
end

In [None]:
using LinearAlgebra

M = [rand(5,5) for i = 1:4]
pmap(svdvals, M)

other packages
- stdlib [`SharedArrays`](https://docs.julialang.org/en/v1/stdlib/SharedArrays/) to share one large array on multiple processes **on the same machine**
- package [`DistributedArrays`](https://juliaparallel.org/DistributedArrays.jl/stable/) distribute Arrays over multiple machines 
- package [`DTables`](https://github.com/JuliaParallel/DTables.jl) distribute Tables (e.g. Dataframes) over multiple machines

# Wouldn't it be great to just have a distributed `@spawn`?

Welcome [`Dagger.jl`](https://github.com/JuliaParallel/Dagger.jl)!

In [None]:
using Dagger

In [None]:
a = Dagger.@spawn 1+3

In [None]:
b = Dagger.@spawn rand(a, 4)

In [None]:
c = Dagger.@spawn sum(b)
fetch(c)

# Summary (taken from the Julia documentation)

Julia supports these four categories of concurrent and parallel programming:

1. **Asynchronous "tasks", or coroutines**:

    Julia Tasks allow suspending and resuming computations
    for I/O, event handling, producer-consumer processes, and similar patterns.
    Tasks can synchronize through operations like [`wait`](https://docs.julialang.org/en/v1/base/parallel/#Base.wait) and [`fetch`](https://docs.julialang.org/en/v1/base/parallel/#Base.fetch-Tuple{Task}), and
    communicate via [`Channel`](https://docs.julialang.org/en/v1/base/parallel/#Base.Channel)s. While strictly not parallel computing by themselves,
    Julia lets you schedule [`Task`](https://docs.julialang.org/en/v1/base/parallel/#Core.Task)s on several threads.

2. **Multi-threading**:

    Julia's [multi-threading](https://docs.julialang.org/en/v1/manual/multi-threading/#man-multithreading) provides the ability to schedule Tasks
    simultaneously on more than one thread or CPU core, sharing memory. This is usually the easiest way
    to get parallelism on one's PC or on a single large multi-core server. Julia's multi-threading
    is composable. When one multi-threaded function calls another multi-threaded function, Julia
    will schedule all the threads globally on available resources, without oversubscribing.

3. **Distributed computing**:

    Distributed computing runs multiple Julia processes with separate memory spaces. These can be on the same
    computer or multiple computers. The [`Distributed`](https://docs.julialang.org/en/v1/stdlib/Distributed/#man-distributed) standard library provides the capability for remote execution
    of a Julia function. With this basic building block, it is possible to build many different kinds of
    distributed computing abstractions. Packages like [`DistributedArrays.jl`](https://github.com/JuliaParallel/DistributedArrays.jl)
    are an example of such an abstraction. On the other hand, packages like [`MPI.jl`](https://github.com/JuliaParallel/MPI.jl) and
    [`Elemental.jl`](https://github.com/JuliaParallel/Elemental.jl) provide access to the existing MPI ecosystem of libraries.

4. **GPU computing**:

    The Julia GPU compiler provides the ability to run Julia code natively on GPUs. There
    is a rich ecosystem of Julia packages that target GPUs. The [JuliaGPU.org](https://juliagpu.org)
    website provides a list of capabilities, supported GPUs, related packages and documentation.


# Thank you for your participation

for questions or suggestions please contact me at stephan.sahm@jolin.io


Sponsored by [Jolin.io](https://www.jolin.io)


Jolin.io is an IT-consultancy focussing on Julia

We are there to help you, if you want to
- try out Julia at your company, or
- transition Matlab, Fortran, R, Python, etc. to Julia
- or speed up your existing Julia code