# Parallel Computing made even simpler
We've already seen some examples of multithreaded and distributed programming, and it was already pretty simple. Just slap a macro in front of your `for` loop.

But the fun does not stop there. There are a couple of really nice packages that allow to define computations like folds (also known as Map-Reduce) and then to choose whether to run them single threaded, multithreaded or distributed (and even on the GPU).

You may have to run the next cell to install a couple packages, otherwise skip it.

In [None]:
using Pkg
Pkg.add("Folds")
Pkg.add("Transducers")
Pkg.add("Dagger")

In [1]:
using Folds
using Transducers
using Distributed
using Dagger

The `Distributed` package allows us to get some processes to run our code in parallel using the `addprocs` function. Let's get four of them.

In [2]:
addprocs(4)

4-element Vector{Int64}:
 2
 3
 4
 5

## Low level interface

In [3]:
# call `rand` on processor 2 with argument (2,2)
r = remotecall(rand, 2, 2, 2)

Future(2, 1, 6, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 0, 0)), nothing)

In [4]:
# evaluate `1 .+ fetch(r)` on processor 2
s = @spawnat 2 1 .+ fetch(r)

Future(2, 1, 7, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 139737700013952, 0)), nothing)

In [5]:
fetch(s)

2×2 Matrix{Float64}:
 1.78364  1.95816
 1.90316  1.74173

In [6]:
t = @spawnat :any fetch(s)^2

Future(2, 1, 9, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 139736901260560, 0)), nothing)

In [7]:
fetch(t)

2×2 Matrix{Float64}:
 6.90807  6.90325
 6.70935  6.76033

The `@everywhere` macro is used to make function definitions, as well as loading files/packages across all processes.

In [8]:
@everywhere norm2(x) = sum(abs2.(x))

Let's work through an example of taking a serial loop and parallelising it. 

In [9]:
n = 10000
count = 0
for i in 1:n
    count += (norm2(rand(2)) < 1.0)
end
4 * count / n

3.1464

For this kind of *Map-Reduce* style operations, we can use slap the `@distributed` macro in front of our loop.

In [10]:
n = 1000000
count = @distributed (+) for i in 1:n
    Int(norm2(rand(2)) < 1.0)
end
4 * count / n

3.141968

External variables can be used inside the loop, as long as they're read-only.

In [11]:
a = rand(1000000)
@distributed (+) for i in eachindex(a)
    a[i]
end

499873.8555342362

Finally, we can also use the `pmap` function which applies a function to each element of an iterator in parallel and return the results

In [12]:
@everywhere collatz(n) = iseven(n) ? div(n,2) : 3n+1
@everywhere function collatz_length(n)
    count = 0
    while  n != 1
        n = collatz(n)
        count += 1
    end
    count
end

In [13]:
pmap(collatz_length, 1:1000)

1000-element Vector{Int64}:
   0
   1
   7
   2
   5
   8
  16
   3
  19
   6
   ⋮
 111
  93
  23
  23
  49
  49
  49
  49
 111

## Shared Arrays
When trying to modify an array in-place in parallel, the following naive solution will not work

In [14]:
a = zeros(100)
@distributed for i in eachindex(a)
    a[i] = i
end
a

100-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 ⋮
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

This is because, each process works on its own local copy of the array, not the true one. Instead, one should use a `SharedArray` which is a special array type shared across processes.

In [15]:
using SharedArrays

a = SharedArray{Int}(100)
@distributed for i in eachindex(a)
    a[i] = i
end
a

100-element SharedVector{Int64}:
   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
   ⋮
  92
  93
  94
  95
  96
  97
  98
  99
 100

## Using Dagger to parallelize nested loops
Beyond the standard library, the Dagger package provides some higher level interface for running things over multiple processes/servers.

One notable thing Dagger can do is parallelize nested loops, which `@distributed` and `Threads.@threads` can't.

In [18]:
@everywhere begin
    using Dagger
    using Random
    Random.seed!(0)

    # Some "expensive" functions that complete at different speeds
    const crn = abs.(randn(20, 7))
    f(i) = sleep(crn[i, 7])
    g(i, j, y) = sleep(crn[i, j])
end
function nested_dagger()
    @sync for i in 1:20
        y = Dagger.@spawn f(i)
        for j in 1:6
            z = Dagger.@spawn g(i, j, y)
        end
    end
end







nested_dagger (generic function with 1 method)

In [19]:
nested_dagger()

For comparison, here's the two variants using only `@distributed`, parallelizing either the outer or inner loop.

In [40]:
function nested_dist_outer()
    @distributed for i in 1:20
        y = f(i)
        for j in 1:6
            z = g(i, j, y)
        end
    end
end
function nested_dist_inner()
    for i in 1:20
        y = f(i)
        @distributed for j in 1:6
            z = g(i, j, y)
        end
    end
end

nested_dist_inner (generic function with 1 method)

In [42]:
nested_dist_outer() |> fetch

## Using Folds and Transducers

As a simple example, we'll try to compute the number of palindromic primes under $10^8$.

Let's start by defining functions that return whether a given integer is a prime, or palindromic.

In [3]:
@everywhere function isprime(n)
    k = 2
    while k^2 <= n
        if mod(n,k) == 0
            return false
        end
        k += 1
    end
    return true
end

In [4]:
@everywhere function ispalindromic(n)
    let xs = digits(n)
        xs == reverse(xs)
    end
end

In [5]:
N = 10^8

100000000

The next two lines show different ways of computing what we want. The first one uses `Folds.sum` with the functionalities from `Transducers`, and must be read from left to right (the `|>` is the pipe operator).

By default, `Folds` runs in multithreaded mode (which is pretty slow in this case because the notebook only has a single thread).

In [6]:
1:N |> Filter(isprime) |> Filter(ispalindromic) |> Map(n -> 1) |> Folds.sum

782

This next line uses the `foldxd` function from transducers, which does everything distributed.

In [7]:
foldxd(+, (1 for n in 1:N if isprime(n) && ispalindromic(n)))

782

For more complicated Map-Reduce operations, the `FLoops` package allows for specifying a `for` loop with (almost) arbitrary operations, and it is compatible with the various execution modes from `Folds`.