# Parallel and Distributed Computing with Julia

## Tasks 

Tasks (also known as [coroutines](https://en.wikipedia.org/wiki/Coroutine)) form the basis of Julia's support for **parallel processing**. They are sometimes called lightweight or green threads.

When some code is executed as a task, it is possible to suspend it and switch to another task. The original task can be resumed and will continue from where it was suspended.
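
As a minimal sketch of suspending and resuming, the cell below creates two tasks that each record a step, give up control with `yield()`, and then record a second step once they are resumed. The names `events`, `worker_a` and `worker_b` are hypothetical and chosen only for this illustration.

```julia
# Records the order in which the steps actually run.
events = String[]

# Each task does one step, suspends itself, and continues when resumed.
worker_a = Task() do
    push!(events, "a: step 1")
    yield()                      # suspend here; another task may run now
    push!(events, "a: step 2")
end

worker_b = Task() do
    push!(events, "b: step 1")
    yield()
    push!(events, "b: step 2")
end

# Hand both tasks to the scheduler, then wait until they are done.
schedule(worker_a)
schedule(worker_b)
wait(worker_a)
wait(worker_b)

foreach(println, events)
```

Each task continues from exactly the point where it called `yield()`, so "step 2" of a task always follows its own "step 1".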

Tasks cooperate by using a **producer-consumer** mechanism. 

A producer task suspends once it has values ready to be consumed, and a separate consumer task can then access those values.

Producer and consumer tasks can both continue to run by exchanging values as necessary.
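
One way to sketch the producer-consumer mechanism is with a `Channel`. The function `produce_squares` and the variable names are assumptions made for this example. The channel is unbuffered (capacity 0), so each `put!` suspends the producer until the consumer takes the value.

```julia
# Producer: hands over one value at a time through the channel.
function produce_squares(ch::Channel{Int}, n::Int)
    for i in 1:n
        put!(ch, i^2)            # blocks until a consumer takes the value
    end
end

# Unbuffered channel shared by the producer and the consumer.
squares = Channel{Int}(0)

# Run the producer as a task and close the channel when it is finished.
producer = @async begin
    produce_squares(squares, 5)
    close(squares)
end

# Consumer: iterating over a channel takes values until it is closed.
consumed = Int[]
for v in squares
    push!(consumed, v)
end

println(consumed)                # [1, 4, 9, 16, 25]
```

The producer and the consumer take turns: control switches between them at every `put!` and every iteration of the loop.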

### Message passing -- Basic parallel primitives

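Julia provides a multiprocessing environment based on *message passing*. Communication is generally **one-sided**: the programmer explicitly manages only one process in a two-process operation.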

In [None]:
@time rand(100,100);

In [None]:
?@time

In [None]:
a = @sync println("hi")

In [None]:
?@sync

In [None]:
a = @async 1+2

In [None]:
?@async

In [None]:
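# Since Julia 0.7 the multiprocessing functions live in the Distributed standard library
using Distributed
nprocs()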

In [None]:
# add 3 workers
addprocs(3)

In [None]:
nprocs()

In [None]:
#run a command on a different worker
rmatrix = remotecall(rand, 4, 2, 2)

In [None]:
?remotecall

In [None]:
print(rmatrix)

In [None]:
print(fetch(rmatrix))

In [None]:
task = remotecall(rand, 2, 3, 3)

In [None]:
fetch(task)

In [None]:
?fetch

In [None]:
task2 = @spawnat 2 5 .+ fetch(task)

In [None]:
@spawnat 2 fetch(task2)[1,1]=200

In [None]:
fetch(task2)

In [None]:
?@spawnat

In [None]:
fetch(task2)

In [None]:
remotecall_fetch(getindex, 2, task2, 1,1)

In [None]:
?remotecall_fetch

In [None]:
r = remotecall(rand,2, 100,100)

In [None]:
fetch(r)

In [None]:
r = @spawn rand(100,100);

In [None]:
nprocs()

In [None]:
myid()

In [None]:
?myid

In [None]:
@everywhere println(myid())

In [None]:
?@everywhere

In [None]:
r = @spawn myid()

In [None]:
?@spawn

In [None]:
fetch(r)

In [None]:
fetch(@spawn myid())

In [None]:
fetch(@spawn myid())

In [None]:
fetch(@spawn myid())

In [None]:
fetch(@spawn myid())

There is a pattern! :-) Successive `@spawn` calls are handed to the worker processes in turn.

In [None]:
fetch(@spawnat 2 myid())

In [None]:
fetch(@spawnat 3 myid())

### Another level of basic parallelism

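#### Parallel list comprehension

In [None]:
# `@distributed` only accepts a `for` loop, so the comprehension is written
# as a loop whose per-iteration values are concatenated with `vcat`
a = @distributed (vcat) for i = 1:10
    2*i
end

`@parallel` was renamed to `@distributed` in Julia 0.7, and the comprehension form `@parallel [2*i for i = 1:10]` is not accepted by `@distributed`.

In [None]:
?@distributed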

In [None]:
function p_rand()
   n = 10^4
   x = @distributed (+) for i in 1:n
       sum(rand(10^4))
   end
   x / n
end

In [None]:
p_rand()

In [None]:
using Distributed

# Without a reducer `@distributed` returns immediately, so `@sync` waits for the loop to finish
@sync @distributed for N in 1:5:20
    println("The N of this iteration is $N")
end

### Parallel Map and Loops

We can use `@spawn` to flip coins on two processes (example from the Julia manual).

In [None]:
@everywhere function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

In [None]:
a = @spawn count_heads(100000000)

In [None]:
b = @spawn count_heads(100000000)

In [None]:
fetch(a)+fetch(b)

In [None]:
fetch(a)

In [None]:
fetch(a)+fetch(b)

In [None]:
a

This example demonstrates a powerful and often-used parallel programming pattern: **Reduction** 

Many iterations run independently over several processes, and then their results are combined 
using some function. 

The combination process is called a **reduction**, since it is generally 
tensor-rank-reducing: a vector of numbers is reduced to a single number, 
or a matrix is reduced to a single row or column, etc. 

In code, this typically looks like the pattern `x = f(x,v[i])`, where 
* `x` is the accumulator, 
* `f` is the *reduction* function, and 
* the `v[i]` are the elements being reduced. 

It is desirable for `f` to be associative, so that it does not matter what order the operations are performed in.
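
The accumulator pattern and the role of associativity can be sketched without any worker processes. The helper `reduce_by_hand` and the variable names below are hypothetical. Splitting the vector in two stands in for the chunks that separate processes would reduce independently.

```julia
# The pattern x = f(x, v[i]) written out explicitly.
function reduce_by_hand(f, v, init)
    x = init                     # accumulator
    for i in eachindex(v)
        x = f(x, v[i])           # fold in the next element
    end
    return x
end

v = collect(1:8)
total = reduce_by_hand(+, v, 0)                  # 36

# + is associative: reducing two chunks separately and combining the
# partial results gives the same answer as one sequential pass.
left  = reduce_by_hand(+, v[1:4], 0)             # 10
right = reduce_by_hand(+, v[5:8], 0)             # 26
@assert left + right == total

# - is not associative: the chunked result differs from the sequential one.
seq     = reduce_by_hand(-, v, 0)                # -36
chunked = reduce_by_hand(-, v[1:4], 0) - reduce_by_hand(-, v[5:8], 0)   # 16
println((total, seq, chunked))
```

With a non-associative `f`, the result of a parallel reduction depends on how the elements happen to be divided among the processes.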

### Generalized reduction

We used two explicit `@spawn` statements, which limits the parallelism to two processes. To **run on any number of processes**, we can use a parallel for loop, running in distributed memory, which can be written in Julia using `@distributed` like this:

In [None]:
nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

In [None]:
using Distributed, SharedArrays

a = SharedArray{Float64}(10)
# `@sync` blocks until every worker has finished writing to `a`
@sync @distributed for i = 1:10
    a[i] = i
end

In [None]:
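# SharedArrays ships with Julia as a standard library, so this step is normally unnecessary
import Pkg
Pkg.add("SharedArrays")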

### Parallel map

In [None]:
M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

In [None]:
?pmap

In [None]:
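# `svdvals` lives in LinearAlgebra, which must be loaded on every process
@everywhere using LinearAlgebra
pmap(svdvals, M)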

#### Conclusion on `pmap` and `@distributed`

*  Julia's `pmap` is designed for the case where each function call does a **large amount of work**. 

*  In contrast, `@distributed for` can handle situations where **each iteration is tiny**, perhaps merely summing two numbers. 

Only worker processes are used by both `pmap` and `@distributed for` for the parallel computation. 

In case of `@distributed for`, the *final reduction* is done on the *calling process*.
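
The contrast can be sketched side by side. The function `slow_square` is a hypothetical stand-in for an expensive computation, and the guard that starts two local workers when none exist is an assumption made so the cell also runs in a fresh session.

```julia
using Distributed

# Start two local workers only if the session has none yet.
nprocs() == 1 && addprocs(2)

# Hypothetical expensive function, defined on every process.
@everywhere function slow_square(x)
    sleep(0.1)                   # pretend each call does a lot of work
    return x^2
end

# pmap: one remote call per element; results come back in input order.
squares = pmap(slow_square, 1:8)

# @distributed: the range is split into chunks, each worker reduces its
# chunk with (+), and the calling process combines the partial sums.
total = @distributed (+) for i in 1:1_000
    i^2
end

println(squares)                 # [1, 4, 9, 16, 25, 36, 49, 64]
println(total)                   # 333833500
```

For the tiny `i^2` iterations, issuing one remote call per element would spend far more time on communication than on arithmetic, which is why the loop is chunked instead.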

## Example:  parallel Fibonacci

In [None]:
# `@everywhere` defines `fib` on all workers so that `@spawn` can call it remotely
@everywhere function fib(n)
    if n < 2
        return n
    else
        return fib(n-1) + fib(n-2)
    end
end

In [None]:
fib(10)

In [None]:
z = @spawn fib(10)

In [None]:
fetch(z)

In [None]:
@time [fib(i) for i=1:45];

In [None]:
@everywhere function fib_parallel(n)
    if n < 20
        return fib(n)
    else
        x = @spawn fib_parallel(n-1)
        y = fib_parallel(n-2)
        return fetch(x) + y
    end
end

In [None]:
@time [fib_parallel(i) for i=1:45];