# Lab 10: Parallel computing

In this lab we are going to introduce tools that Julia's ecosystem offers for different ways of parallel computing. As an ilustration for how capable Julia was/is consider the fact that it has joined (alongside C,C++ and Fortran) the so-called "PetaFlop club"[^1][], a list of languages capable of running at over 1PFLOPS.


[^1]: https://juliacomputing.com/media/2017/09/julia-joins-petaflop-club/ "Julia Joins Petaflop Club"

## Introduction
Nowadays there is no need to convince anyone about the advantages of having more cores available for your computation be it on a laptop, workstation or a cluster. The trend can be nicely illustrated in the figure bellow:
<img src="./42-years-processor-trend.png" align="center">
Image source[^2]

[^2]: https://www.karlrupp.net/2018/02/42-years-of-microprocessor-trend-data/ "Performance metrics trend of CPUs in the last 42years"

However there are some shortcomings when going from sequential programming, that we have to note
- We don't think in parallel
- We learn to write and reason about programs serially
- The desire for parallelism often comes *after* you've written your algorithm (and found it too slow!)
- Harder to reason and therefore harder to debug
- The number of cores is increasing, thus knowing how the program scales is crucial (not just that it runs better)
- Benchmarking parallel code, that tries to exhaust the processor pool is much more affected by background processes


<div class="alert alert-block alert-warning">
<b>Warning:</b> "Shortcomings of parallelism": Parallel computing brings its own set of problems and not an insignificant overhead with data manipulation and communication, therefore try always to optimize your serial code as much as you can before advancing to parallel acceleration.
</div>

<div class="alert alert-block alert-warning">
<b>Warning:</b>"Disclaimer":
    With the increasing complexity of computer HW some statements may become outdated. Moreover we won't cover as many tips that you may encounter on a parallel programming specific course, which will teach you more in the direction of how to think in parallel, whereas here we will focus on the tools that you can use to realize the knowledge gained therein.
</div>
    
## Process based parallelism
As the name suggest process based parallelism is builds on the concept of running code on multiple processes, which can run even on multiple machines thus allowing to scale computing from a local machine to a whole network of machines - a major difference from the other parallel concept of threads. In Julia this concept is supported within standard library `Distributed` and the scaling to cluster can be realized by 3rd party library [`ClusterManagers.jl`](https://github.com/JuliaParallel/ClusterManagers.jl).

Let's start simply with knowing how to start up additional Julia processes. There are two ways:
- by adding processes using cmd line argument `-p ##`
```bash
julia -p 4
```
- by adding processes after startup using the `addprocs(##)` function from std library `Distributed`
```julia


In [1]:
cd("Scientific-Programing-in-Julia") # just for me

In [2]:
using Distributed
addprocs(4)

4-element Vector{Int64}:
 2
 3
 4
 5

In [3]:
nworkers()  # returns number of workers

4

In [4]:
nprocs()    # returns number of processes `nworkers() + 1`

5

The result shown in a process manager such as `htop`:
```bash
.../julia-1.6.2/bin/julia --project                                                                                     
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
```

Both of these result in total of 5 running processes - 1 controller, 4 workers - with their respective ids accessible via `myid()` function call. Note that the controller process has always id 1 and other processes are assigned subsequent integers, see for yourself with `@everywhere` macro, which runs easily code on all or a subset of processes.

In [5]:
@everywhere println(myid())

1
      From worker 5:	5
      From worker 3:	3
      From worker 4:	4
      From worker 2:	2


In [6]:
myid()

1

In [7]:
@everywhere [2,3] println(myid()) # select a subset of workers

      From worker 2:	2
      From worker 3:	3


The same way that we have added processes we can also remove them

In [8]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [9]:
rmprocs(2)  # kills worker with id 2

Task (done) @0x0000000169b5b0a0

In [10]:
workers()

3-element Vector{Int64}:
 3
 4
 5

As we have seen from the `htop/top` output, added processes start with specific cmd line arguments, however they are not shared with any aliases that we may have defined, e.g. `julia` ~ `julia --project=.`. Therefore in order to use an environment, we have to first activate it on all processes

In [11]:
@everywhere begin
  using Pkg; Pkg.activate(@__DIR__);Pkg.instantiate() # @__DIR__ equivalent to a call to pwd()
end

[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`


      From worker 4:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 3:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 5:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`


or we can load files containing this line on all processes with cmdline option `-L ###.jl` together with `-p ##`.

#### There are generally two ways of working with multiple processes
- using low level functionality - we specify what/where is loaded, what/where is being run and when we fetch results
    + `@everywhere` to run everywhere and wait for completion
    + `@spawnat` and `remotecall` to run at specific process and return `Future` (a reference to a future result - remote reference)
    + `fetch` - fetching remote reference
    * `pmap` - for easily mapping a function over a collection

- using high level functionality - define only simple functions and apply them on collections
    + [`DistributedArrays`](https://github.com/JuliaParallel/DistributedArrays.jl)' with `DArray`s
    + [`Transducers.jl`](https://github.com/JuliaFolds/Transducers.jl) pipelines

### Sum with processes (First example)
Writing your own sum of an array function is a good way to show all the potential problems, you may encounter with parallel programming. For comparison here is the naive version that uses `zero` for initialization and `@inbounds` for removing boundschecks.




In [12]:
function naive_sum(a)
    r = zero(eltype(a))
    for i in eachindex(a)
        @inbounds r += a[i]
    end
    r
end

naive_sum (generic function with 1 method)

Its performance will serve us as a sequential baseline.

In [13]:
using BenchmarkTools

In [14]:
a = rand(10_000_000); # 10^7

In [15]:
sum(a) ≈ naive_sum(a)

true

In [16]:
@btime sum($a)

  1.843 ms (0 allocations: 0 bytes)


5.00036166607632e6

In [17]:
@btime naive_sum($a)

  9.309 ms (0 allocations: 0 bytes)


5.000361666077414e6

In [18]:
rmprocs(workers()) # clean workers

Task (done) @0x0000000118aacbe0

Note that the built-in `sum` exploits single core parallelism with Single instruction, multiple data (SIMD instructions) and is thus faster.


<div class="alert alert-block alert-success">
<b>Exercise:</b> 

Write a distributed/multiprocessing version of `sum` function `dist_sum(a, np=nworkers())` without the help of `DistributedArrays`. Measure the speed up when doubling the number of workers (up to the number of logical cores - see [note](@ref lab10_thread) on hyper threading).
    
**HINTS**:
- map builtin `sum` over chunks of the array using `pmap`
- there are built in partition iterators `Iterators.partition(array, chunk_size)`
- `chunk_size` should relate to the number of available workers
- `pmap` has the option to pass the ids of workers as the second argument `pmap(f, WorkerPool([2,4]), collection)`
- `pmap` collects the partial results to the controller where it can be collected with another `sum`
    
</div>
    
####

<div class="alert alert-block alert-info">
<b>Solution</b>: </div>

####

In [48]:
#using Distributed
addprocs(8)

@everywhere begin
    using Pkg; Pkg.activate(@__DIR__)
end

function dist_sum(a, np=nworkers())
    chunk_size = div(length(a), np)
    sum(pmap(sum, WorkerPool(workers()[1:np]), Iterators.partition(a, chunk_size)))
end

dist_sum(a) ≈ sum(a)
@btime dist_sum($a)

@time dist_sum(a, 1) 
@time dist_sum(a, 3)
@time dist_sum(a, 4)
@time dist_sum(a, 8)
rmprocs(workers()) # clean workers

[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`


      From worker 18:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 15:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 16:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 17:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 21:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 14:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 19:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 20:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
  15.410 ms (625 allocations: 76.32 MiB)
  0.066097 seconds (249 allocations: 76.309 MiB)
  0.126273 seconds (438 alloca

Task (done) @0x00000001183e4550

###
As you can see the built-in `pmap` already abstracts quite a lot from the process and all the data movement is handled internally, however in order to show off how we can abstract even more, let's use the `DistributedArrays.jl` pkg.

<div class="alert alert-block alert-success">
<b>Exercise:</b> 
Write a distributed/multiprocessing version of `sum` function `dist_sum_lib(a, np=nworkers())` with the help of `DistributedArrays`. Measure the speed up when doubling the number of workers (up to the number of logical cores - see note on hyper threading).

**HINTS**:
- chunking and distributing the data can be handled for us using the `distribute` function on an array (creates a `DArray`)
- `distribute` has an option to specify on which workers should an array be distributed to
- `sum` function has a method for `DArray`
- remember to run `using DistributedArrays` on every process
</div>

In [81]:
using DistributedArrays

In [82]:
function dist_sum_lib(a, np=nworkers())
    adist = distribute(a,procs=workers()[1:np])
    sum(adist)
end

dist_sum_lib (generic function with 2 methods)

<div class="alert alert-block alert-info">
<b>Solution</b>: </div>

####

In [87]:
using Distributed
addprocs(8)

@everywhere begin
    using Pkg; Pkg.activate(@__DIR__)
end

@everywhere begin
    using DistributedArrays
end 

#And the actual computation.

adist = distribute(a)       # distribute array to workers |> typeof - DArray
@time adist = distribute(a); # we should not disregard this time
@btime sum($adist)          # call the built-in function (dispatch on DArrray)

function dist_sum_lib(a, np=nworkers())
    adist = distribute(a, procs = workers()[1:np])
    sum(adist)
end

dist_sum_lib(a) ≈ sum(a)
@btime dist_sum_lib($a)
@time dist_sum_lib(a, 1) # 80ms
@time dist_sum_lib(a, 2) # 54ms
@time dist_sum_lib(a, 4) # 48ms
@time dist_sum_lib(a, 8) # 33ms
rmprocs(workers()) # clean workers

[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`


      From worker 77:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 76:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 70:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 71:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 72:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 75:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 74:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 73:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 80:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From

Task (done) @0x00000001194cce80

###
In both previous examples we have included the data transfer time from the controller process, in practice however distributed computing is used in situations where the data may be stored on individual local machines. As a general rule of thumb we should always send only instruction what to do and not the actual data to be processed. This will be more clearly demonstrated in the next more practical example.

### Distributed file processing
`Distributed` is often used in processing of files, such as the commonly encountered `mapreduce` jobs with technologies like [`Hadoop`](https://hadoop.apache.org/), [`Spark`](http://spark.apache.org/), where the files live on a distributed file system and a typical job requires us to map over all the files and gather some statistics such as histograms, sums and others. We will simulate this situation with the Julia's pkg codebase, which on a typical user installation can contain up to hundreds of thousand of `.jl` files (depending on how extensively one uses Julia).

<div class="alert alert-block alert-success">
<b>Exercise:</b>
Write a distributed pipeline for computing a histogram of symbols found in AST by parsing Julia source files in your `.julia/packages/` directory. We have already implemented most of the code that you will need (available as source code 
 [`here`](https://github.com/JuliaTeachingCTU/Scientific-Programming-in-Julia/blob/master/docs/src/lecture_10/pkg_processing.jl) .
    
Your task is to write a function that does the `map` and `reduce` steps, that will create and gather the dictionaries from different workers. There are two ways to do a map
- either over directories inside `.julia/packages/` - call it `distributed_histogram_pkgwise`
- or over all files obtained by concatenation of `filter_jl` outputs (*NOTE* that this might not be possible if the listing itself is expensive - speed or memory requirements) - call it `distributed_histogram_filewise`
Measure if the speed up scales linearly with the number of processes by restricting the number of workers inside a `pmap`.

**HINTS**:
- for each file path apply `tokenize` to extract symbols and follow it with the update of a local histogram
- try writing sequential version first
- either load `./pkg_processing.jl` on startup with `-L` and `-p` options or `include("./pkg_processing.jl")` inside `@everywhere`
- use `pmap` to easily iterate in parallel over a collection - the result should be an array of histogram, which has to be merged on the controller node (use builtin `mergewith!` function in conjunction with `reduce`)
- `pmap` supports `do` syntax
```julia
pmap(collection) do item
    do_something(item)
end
```
- pkg directory can be obtained with `joinpath(DEPOT_PATH[1], "packages")`

**BONUS**:
What is the most frequent symbol in your codebase?
    
</div>

In [90]:
include("lab10/pkg_processing.jl")

tokenize

<div class="alert alert-block alert-info">
<b>Solution</b>: </div>

###

Let's implement first a sequential version as it is much easier to debug.

In [88]:
using ProgressMeter
function sequential_histogram(path)
    h = Dict{Symbol, Int}()
    @showprogress for pkg_dir in sample_all_installed_pkgs(path)
        for jl_path in filter_jl(pkg_dir)
            syms = tokenize(jl_path)
            for s in syms
                v = get!(h, s, 0)
                h[s] += 1
            end
        end
    end
    h
end

sequential_histogram (generic function with 1 method)

In [91]:
path = joinpath(DEPOT_PATH[1], "packages") # usually the first entry
@time h = sequential_histogram(path)

[32mProgress: 100%|█████████████████████████████████████████| Time: 0:00:37[39m


 37.181339 seconds (45.63 M allocations: 4.896 GiB, 0.85% gc time, 2.50% compilation time)


Dict{Symbol, Int64} with 68056 entries:
  :MBEDTLS_TLS_RSA_WITH_CAMELLIA_256_GCM_SHA384 => 1
  :ptr1                                         => 11
  :set_unresolved                               => 2
  :wrap_dot                                     => 5
  :cind                                         => 2
  :ci_quesenberry_hurst                         => 2
  :cuStreamWaitEvent                            => 2
  :cusparseDestroyCsrsm2Info                    => 3
  :lowertype                                    => 3
  :fluxρ_z                                      => 4
  :assert_in_interval                           => 8
  :acol                                         => 27
  :eix                                          => 3
  :effectiveDevicePixelRatio                    => 2
  :Base64                                       => 39
  :slice_arg                                    => 9
  :ptr_index                                    => 8
  :S′                                           => 9
  :

First we try to distribute over package folders. TODO add the ability to run it only on some workers

In [92]:
addprocs(8)

@everywhere begin
    cd("Scientific-Programing-in-Julia")
    using Pkg; Pkg.activate(@__DIR__); Pkg.instantiate()
    using ProgressMeter
    # we have to realize that the code that workers have access to functions we have defined
    include("lab10/pkg_processing.jl") 
end

"""
    merge_with!(h1, h2)

Merges count dictionary `h2` into `h1` by adding the counts. Equivalent to `Base.mergewith!(+)`.
"""
function merge_with!(h1, h2)
    for s in keys(h2)
        get!(h1, s, 0)
        h1[s] += h2[s]
    end
    h1
end

[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`


      From worker 89:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 86:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 91:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 93:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 92:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 87:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 88:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`
      From worker 90:	[32m[1m  Activating[22m[39m project at `~/AI_Center/Scientific-Programing-in-Julia`


merge_with!

In [93]:
function distributed_histogram_pkgwise(path, np=nworkers())
    r = @showprogress pmap(WorkerPool(workers()[1:np]), sample_all_installed_pkgs(path)) do pkg_dir
        h = Dict{Symbol, Int}()
        for jl_path in filter_jl(pkg_dir)
            syms = tokenize(jl_path)
            for s in syms
                v = get!(h, s, 0)
                h[s] += 1
            end
        end
        h
    end
    reduce(merge_with!, r)
end

distributed_histogram_pkgwise (generic function with 2 methods)

In [94]:
path = joinpath(DEPOT_PATH[1], "packages")

@time h = distributed_histogram_pkgwise(path, 2); # 41.5s
@time h = distributed_histogram_pkgwise(path, 4); # 24.0s
@time h = distributed_histogram_pkgwise(path, 8); # 24.0s

[32mProgress: 100%|█████████████████████████████████████████| Time: 0:00:20[39m


 20.714750 seconds (3.92 M allocations: 220.398 MiB, 0.16% gc time, 5.34% compilation time)


[32mProgress: 100%|█████████████████████████████████████████| Time: 0:00:10[39m


 10.256315 seconds (298.66 k allocations: 25.198 MiB, 0.26% gc time, 0.23% compilation time)


[32mProgress: 100%|█████████████████████████████████████████| Time: 0:00:08[39m


  8.193193 seconds (269.32 k allocations: 23.443 MiB)


Second we try to distribute over all files.

In [95]:
function distributed_histogram_filewise(path, np=nworkers())
    jl_files = reduce(vcat, filter_jl(pkg_dir) for pkg_dir in sample_all_installed_pkgs(path))
    r = @showprogress pmap(WorkerPool(workers()[1:np]), jl_files) do jl_path
        h = Dict{Symbol, Int}()
        syms = tokenize(jl_path)
        for s in syms
            v = get!(h, s, 0)
            h[s] += 1
        end
        h
    end
    reduce(merge_with!, r)
end

distributed_histogram_filewise (generic function with 2 methods)

In [96]:
path = joinpath(DEPOT_PATH[1], "packages")
@time h = distributed_histogram_filewise(path, 2); # 46.9s
@time h = distributed_histogram_filewise(path, 4); # 24.8s
@time h = distributed_histogram_filewise(path, 8); # 20.4s

[32mProgress: 100%|█████████████████████████████████████████| Time: 0:00:20[39m


 21.158707 seconds (2.63 M allocations: 178.579 MiB, 0.46% gc time, 1.55% compilation time)


[32mProgress: 100%|█████████████████████████████████████████| Time: 0:00:10[39m


 11.079841 seconds (1.70 M allocations: 128.355 MiB, 0.35% gc time, 0.03% compilation time)


[32mProgress: 100%|█████████████████████████████████████████| Time: 0:00:07[39m


  8.257418 seconds (1.69 M allocations: 127.583 MiB, 0.54% gc time)


Here we can see that we have improved the timings a bit by increasing granularity of tasks.

**BONUS**: You can do some analysis with `DataFrames`

In [97]:
using DataFrames
df = DataFrame(:sym => collect(keys(h)), :count => collect(values(h)));
sort!(df, :count, rev=true);
df[1:50,:]

Unnamed: 0_level_0,sym,count
Unnamed: 0_level_1,Symbol,Int64
1,call,764136
2,=,192197
3,block,158812
4,.,154421
5,macrocall,153231
6,::,101525
7,curly,96004
8,tuple,68521
9,@test,67672
10,ref,64542


In [98]:
rmprocs(workers())

Task (done) @0x000000016b7e2a10

## Threading
The number of threads for a Julia process can be set up in an environmental variable `JULIA_NUM_THREADS` or directly on Julia startup with cmd line option `-t ##` or `--threads ##`. If both are specified the latter takes precedence.
```bash
julia -t 8
```
In order to find out how many threads are currently available, there exist the `nthreads` function inside `Base.Threads` library. There is also an analog to the Distributed `myid` example, called `threadid`.

In [99]:
using Base.Threads

In [100]:
nthreads()

4

In [101]:
threadid()

1

As opposed to distributed/multiprocessing programming, threads have access to the whole memory of Julia's process, therefore we don't have to deal with separate environment manipulation, code loading and data transfers. However we have to be aware of the fact that memory can be modified from two different places and that there may be some performance penalties of accessing memory that is physically further from a given core (e.g. caches of different core or different NUMA[^3] nodes). Another significant difference from distributed computing is that we cannot spawn additional threads on the fly in the same way that we have been able to do with `addprocs` function.

[^3]: https://en.wikipedia.org/wiki/Non-uniform\_memory\_access  "NUMA" 

<div class="alert alert-block alert-warning">
<b>Info:</b> "Hyper threads"
    In most of today's CPUs the number of threads is larger than the number of physical cores. These additional threads are usually called hyper threads[^4] or when talking about cores - logical cores. The technology relies on the fact, that for a given "instruction" there may be underutilized parts of the CPU core's machinery (such as one of many arithmetic units) and if a suitable work/instruction comes in it can be run simultaneously. In practice this means that adding more threads than physical cores may not be accompanied with the expected speed up.
    
[^4]: https://en.wikipedia.org/wiki/Hyper-threading "Hyperthreading"
    
</div>

The easiest (not always yielding the correct result) way how to turn a code into multi threaded code is putting the `@threads` macro in front of a for loop, which instructs Julia to run the body on separate threads.

In [102]:
A = Array{Union{Int,Missing}}(missing, nthreads());

In [103]:
for i in 1:nthreads()
    A[threadid()] = threadid()
end

In [104]:
A # only the first element is filled

4-element Vector{Union{Missing, Int64}}:
 1
  missing
  missing
  missing

In [105]:
A = Array{Union{Int,Missing}}(missing, nthreads());

In [106]:
@threads for i in 1:nthreads()
    A[threadid()] = threadid()
end

In [107]:
A

4-element Vector{Union{Missing, Int64}}:
 1
 2
 3
 4

### Multithreaded sum
Armed with this knowledge let's tackle the problem of the simple `sum`.

In [108]:
function threaded_sum_naive(a)
    r = zero(eltype(a))
    @threads for i in eachindex(a)
        @inbounds r += a[i]
    end
    return r
end

threaded_sum_naive (generic function with 1 method)

Comparing this with the built-in sum we see not an insignificant discrepancy (one that cannot be explained by reordering of computation) and moreover the timings show us some ridiculous overhead.

In [109]:
a = rand(10_000_000); # 10^7

In [110]:
sum(a), threaded_sum_naive(a)

(4.9998796361902645e6, 1.2509275137483007e6)

In [111]:
@btime sum($a)

  1.845 ms (0 allocations: 0 bytes)


4.9998796361902645e6

In [112]:
@btime threaded_sum_naive($a)

  279.292 ms (20000024 allocations: 305.18 MiB)


1.2502303664049988e6

Recalling what has been said above we have to be aware of the fact that the data can be accessed from multiple threads at once, which if not taken into an account means that each thread reads possibly outdated value and overwrites it with its own updated state. 

There are two solutions which we will tackle in the next two exercises. 

<div class="alert alert-block alert-success">
<b>Exercise:</b>
Implement `threaded_sum_atom`, which uses `Atomic` wrapper around the accumulator variable `r` in order to ensure correct locking of data access. 

**HINTS**:
- use `atomic_add!` as a replacement of `r += A[i]`
- "collect" the result by dereferencing variable `r` with empty bracket operator `[]`

!!! info "Side note on dereferencing"
    In Julia we can create references to a data types, which are guarranteed to point to correct and allocated type in memory, as long as a reference exists the memory is not garbage collected. These are constructed with `Ref(x)`, `Ref(a, 7)` or `Ref{T}()` for reference to variable `x`, `7`th element of array `a` and an empty reference respectively. Dereferencing aka asking about the underlying value is done using empty bracket operator `[]`.
    ```@repl lab10_refs
    x = 1       # integer
    rx = Ref(x) # reference to that particular integer `x`
    x == rx[]   # dereferencing yields the same value
    ```
    There also exist unsafe references/pointers `Ptr`, however we should not really come into a contact with those.

**BONUS**:
Try chunking the array and calling sum on individual chunks to obtain some real speedup.


    
</div>

<div class="alert alert-block alert-info">
<b>Solution</b>: </div>

#### 1 option

In [113]:
function threaded_sum_atom(a)
    r = Atomic{eltype(a)}(zero(eltype(a)))
    @threads for i in eachindex(a)
        @inbounds atomic_add!(r, a[i])
    end
    return r[]
end

threaded_sum_atom (generic function with 1 method)

In [114]:
sum(a) ≈ threaded_sum_atom(a)

true

In [117]:
@btime threaded_sum_atom($a)

  618.387 ms (24 allocations: 1.88 KiB)


4.999879636189357e6

#### Bonus

That's better but far from the performance we need. 

**BONUS**: There is a fancier and faster way to do this by chunking the array

In [None]:
function threaded_sum_fancy_atom(a)
    r = Atomic{eltype(a)}(zero(eltype(a)))
    len, rem = divrem(length(a), nthreads())
    @threads for t in 1:nthreads()
        rₜ = zero(eltype(a))
        @simd for i in (1:len) .+ (t-1)*len
            @inbounds rₜ += a[i]
        end
        atomic_add!(r, rₜ)
    end
    # catch up any stragglers
    result = r[]
    @simd for i in length(a)-rem+1:length(a)
        @inbounds result += a[i]
    end
    return result
end

In [None]:
sum(a) ≈ threaded_sum_fancy_atom(a)

In [None]:
@btime threaded_sum_fancy_atom($a)

Finally we have beaten the "sequential" sum. The quotes are intentional, because the Base's implementation of a sum uses Single instruction, multiple data (SIMD) instructions as well, which allow to process multiple elements at once.

###

<div class="alert alert-block alert-success">
<b>Exercise:</b>
Implement `threaded_sum_buffer`, which uses an array of length `nthreads()` (we will call this buffer) for local aggregation of results of individual threads. 

**HINTS**:
- use `threadid()` to index the buffer array
- sum the buffer array to obtain final result
    
</div>

<div class="alert alert-block alert-info">
<b>Solution</b>: </div>

####

In [118]:
function threaded_sum_buffer(a)
    R = zeros(eltype(a), nthreads())
    @threads for i in eachindex(a)
        @inbounds R[threadid()] += a[i]
    end
    r = zero(eltype(a))
    # sum the partial results from each thread
    for i in eachindex(R)
        @inbounds r += R[i]
    end
    return r
end

threaded_sum_buffer (generic function with 1 method)

In [119]:
sum(a) ≈ threaded_sum_buffer(a)

true

In [128]:
@btime threaded_sum_buffer($a)

  4.558 ms (22 allocations: 1.92 KiB)


4.999879636190381e6

In [127]:
@btime sum($a)

  1.847 ms (0 allocations: 0 bytes)


4.9998796361902645e6


Though this implementation is cleaner and faster, there is possible drawback with this implementation, as the buffer `R` lives in a continuous part of the memory and each thread that accesses it brings it to its caches as a whole, thus invalidating the values for the other threads, which it in the same way.

###

Seeing how multithreading works on a simple example, let's apply it on the "more practical" case of the Symbol histogram from exercise above.

<div class="alert alert-block alert-success">
<b>Exercise:</b>

Write a multithreaded analog of the file processing pipeline from [exercise](@ref lab10_dist_file_p) above. Again the task is to write the `map` and `reduce` steps, that will create and gather the dictionaries from different workers. There are two ways to map
- either over directories inside `.julia/packages/` - `threaded_histogram_pkgwise`
- or over all files obtained by concatenation of `filter_jl` outputs - `threaded_histogram_filewise`
Compare the speedup with the version using process based parallelism.

**HINTS**:
- create a separate dictionary for each thread in order to avoid the need for atomic operations
- 

**BONUS**:
In each of the cases count how many files/pkgs each thread processed. Would the dynamic scheduler help us in this situation?
</div>

<div class="alert alert-block alert-info">
<b>Solution</b>: </div>

####

Setup is now much simpler.
```julia
using Base.Threads
include("./pkg_processing.jl") 
path = joinpath(DEPOT_PATH[1], "packages")
```
Firstly the version with folder-wise parallelism.

In [None]:
path = joinpath(DEPOT_PATH[1], "packages")

In [122]:
function threaded_histogram_pkgwise(path)
    ht = [Dict{Symbol, Int}() for _ in 1:nthreads()]
    @threads for pkg_dir in sample_all_installed_pkgs(path)
        h = ht[threadid()]
        for jl_path in filter_jl(pkg_dir)
            syms = tokenize(jl_path)
            for s in syms
                v = get!(h, s, 0)
                h[s] += 1
            end
        end
    end
    reduce(mergewith!(+), ht)
end

threaded_histogram_pkgwise (generic function with 1 method)

In [124]:
@time h = threaded_histogram_pkgwise(path);

 14.718499 seconds (42.30 M allocations: 4.754 GiB, 1.80% gc time)


Secondly the version with file-wise parallelism.

In [125]:
function threaded_histogram_filewise(path)
    jl_files = reduce(vcat, filter_jl(pkg_dir) for pkg_dir in sample_all_installed_pkgs(path))
    ht = [Dict{Symbol, Int}() for _ in 1:nthreads()]
    @threads for jl_path in jl_files
        h = ht[threadid()]
        syms = tokenize(jl_path)
        for s in syms
            v = get!(h, s, 0)
            h[s] += 1
        end
    end
    reduce(mergewith!(+), ht)
end

threaded_histogram_filewise (generic function with 1 method)

In [126]:
@time h = threaded_histogram_filewise(path);

 15.913946 seconds (42.48 M allocations: 4.775 GiB, 1.87% gc time, 0.39% compilation time)


## Task switching
There is a way how to run "multiple" things at once, which does not necessarily involve either threads or processes. In Julia this concept is called task switching or asynchronous programming, where we fire off our requests in a short time and let the cpu/os/network handle the distribution. As an example which we will try today is querying a web API, which has some variable latency. In the usuall sequantial fashion we can always post queries one at a time, however generally the APIs can handle multiple request at a time, therefore in order to better utilize them, we can call them asynchronously and fetch all results later, in some cases this will be faster.

<div class="alert alert-block alert-warning">
<b>Burst requests:</b>
    It is a good practice to check if an API supports some sort of batch request, because making a burst of single request might lead to a worse performance for others and a possible blocking of your IP/API key.
</div>

Consider following functions

In [129]:
function aa()
    for i in 1:10
        sleep(1)
    end
end

function bb()
    for i in 1:10
        @async sleep(1)
    end
end

function cc()
    @sync for i in 1:10
        @async sleep(1)
    end
end

cc (generic function with 1 method)

How much time will the execution of each of them take?

In [130]:
@time aa()
@time bb() 
@time cc()

 10.018780 seconds (359 allocations: 10.328 KiB)
  0.000024 seconds (40 allocations: 4.062 KiB)
  1.005568 seconds (726 allocations: 48.406 KiB, 0.15% compilation time)


<div class="alert alert-block alert-success">
<b>Exercise:</b>
Choose one of the free web APIs and query its endpoint using the `HTTP.jl` library. Implement both sequential and asynchronous version. Compare them on an burst of 10 requests.

**HINTS**:
- use `HTTP.request` for `GET` requests on your chosen API, e.g. `r = HTTP.request("GET", "https://catfact.ninja/fact")` for random cat fact
- converting body of a response can be done simply by constructing a `String` out of it - `String(r.body)`
- in order to parse a json string use `JSON.jl`'s parse function
- Julia offers `asyncmap` - asynchronous `map`

</div>

<div class="alert alert-block alert-info">
<b>Solution</b>: </div>

####

In [131]:
using HTTP, JSON

function query_cat_fact()
    r = HTTP.request("GET", "https://catfact.ninja/fact")
    j = String(r.body)
    d = JSON.parse(j)
    d["fact"]
end

# without asyncmap
function get_cat_facts_async(n)
    facts = Vector{String}(undef, n)
    @sync for i in 1:10
        @async facts[i] = query_cat_fact()
    end
    facts
end

get_cat_facts_async(n) = asyncmap(x -> query_cat_fact(), Base.OneTo(n))
get_cat_facts(n) = map(x -> query_cat_fact(), Base.OneTo(n))

@time get_cat_facts_async(10);
@time get_cat_facts(10);

  8.916924 seconds (14.90 M allocations: 790.576 MiB, 1.84% gc time, 55.42% compilation time)
  1.668796 seconds (27.82 k allocations: 2.145 MiB, 1.93% compilation time)
