# Parallel Computing in Julia

Julia was designed from the ground up with parallel computing in mind. Let's start by understanding the parallel computing model in Julia. 

- Julia's implementation of message passing is different from other environments such as MPI. 
- Communication in Julia is generally "one-sided", meaning that the programmer needs to explicitly manage only one process in a two-process operation. 
- Furthermore, these operations typically do not look like "message send" and "message receive" but rather resemble higher-level operations like calls to user functions.

But before we start exploring these constructs, let us look at what a Julia `Task` is: 

## Tasks

A good first reference for most things in Julia is the built-in documentation. You can open it by typing `?` followed by the name you'd like to look up. 

Since our first discussion is on Tasks, let us load the documentation for a `Task`:

In [None]:
?Task

A Julia Task is: 

- A very lightweight coroutine
- Not a thread!
- Internal to, and scheduled by, a Julia process

In [None]:
function mytask()
    println("Going to take a nap.")
    sleep(10)
    println("Woke up.")
    rand()
end

t=Task(mytask)

What happened here? We've created a task just like how the documentation told us. But is it running? 

The task is currently `runnable`, which means that it has been _created_ but not yet _scheduled_. 
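The lifecycle described above can be checked directly. The following is a minimal sketch, assuming Julia 1.x (where `fetch` returns a task's value); the name `demo` and the short sleep are only for illustration.

```julia
# Sketch for Julia 1.x: follow a task from creation to completion.
demo = Task(() -> begin
    sleep(0.1)      # stand-in for real work
    42              # value the task returns
end)

started_before = istaskstarted(demo)   # false: created but not yet scheduled
schedule(demo)                         # hand the task to the scheduler; returns immediately
result = fetch(demo)                   # block until the task finishes, then get its value
done_after = istaskdone(demo)          # true once the task has finished
```

`istaskstarted` and `istaskdone` are a way to query a task without reading its internal fields.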

## Scheduling and waiting on a task

`schedule` starts the task, but will *return immediately*. This means that it does **not** block the master process.

(**NOTE**: Run the next two cells immediately one after the other before looking at the accompanying text)

In [None]:
schedule(t)

## Waiting on a task

The task has now been scheduled and is actively running in the background. Since it hasn't blocked the master process, we can perform some computation in the meantime. 

In [None]:
println("Doing something else while t is taking a nap...")
inv(rand(100, 100))
@time @show wait(t)
@show t.state
println("task finished")

## `@async` - syntax sugar for creating and scheduling tasks

Of course, we can **create and schedule tasks in one go** by putting code in an `@async` block.

In [None]:
t=@async begin
    println("Going to take a nap.")
    sleep(5)
    println("Woke up.")
end

Sure enough, before the 5 seconds of sleep time are up, the master process is free to run other computation. 

In [None]:
21+21
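To see that tasks started with `@async` really overlap, here is a small sketch that launches three sleeping tasks and waits for all of them. It assumes Julia 1.x and uses `@sync`, a Base macro not otherwise used in this notebook, which blocks until every enclosed `@async` task has finished.

```julia
# Three tasks that each sleep for 0.5 s, started together.
elapsed = @elapsed @sync begin
    for i in 1:3
        @async begin
            sleep(0.5)   # while one task sleeps, the scheduler runs the others
        end
    end
end
# `elapsed` should be close to 0.5 s rather than 1.5 s,
# because the three sleeps overlap.
```

Note that the overlap comes from the tasks yielding while they sleep; tasks that only compute would still run one at a time on a single process.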

## Channels

Channels are used for communication between Tasks. To demonstrate, consider the following simple producer-consumer model:

In [None]:
input = Channel{Int}(1)
result = Channel{Int}(1)
doubler = @async while true
    x = take!(input)
    println("Got message $x")
    put!(result, 2x)
end

printer = @async while true
    res = take!(result)
    @show res
end

Now let's add some input to the `Channel` via the `put!` function.

In [None]:
using Interact
@manipulate for i=1:100
    put!(input, i)
end
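The tasks above loop forever. A variant that terminates cleanly can be sketched as follows, assuming Julia 1.x; the names `numbers` and `doubled` are illustrative. Closing a channel ends any `for` loop iterating over it once the remaining items have been taken.

```julia
numbers = Channel{Int}(1)
doubled = Channel{Int}(1)

# Consumer/producer in the middle: doubles every value it receives.
doubler = @async begin
    for x in numbers         # ends once `numbers` is closed and drained
        put!(doubled, 2x)
    end
    close(doubled)           # signal downstream that no more results will come
end

# Producer: feeds five values, then closes the channel.
producer = @async begin
    for i in 1:5
        put!(numbers, i)
    end
    close(numbers)
end

results = collect(doubled)   # [2, 4, 6, 8, 10]
```

Because each channel has a buffer of size 1, `put!` blocks whenever the consumer has not yet taken the previous value, which keeps the two tasks in step.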

## Adding Julia Processes, running "Remote Tasks"

Now let's start running tasks remotely, on other Julia processes. First, we need to ask our cluster manager (`JuliaRun`) for 8 worker Julia processes. This means that the **master** process can now **schedule work** on those 8 worker processes. 

In [None]:
using JuliaRunClient
ctx = Context()
nb = self()

In [None]:
initParallel()
@result setJobScale(ctx, nb, 8)
waitForWorkers(8)

If you were on your own laptop or on a cluster that isn't set up with `JuliaRun`, you should use the `addprocs` command to initialize Julia worker processes.

In [None]:
# Run if using the notebook on your own computer
# addprocs(4)

Counting the master process, `procs()` returns one more id than the number of workers: 9 ids with the 8 `JuliaRun` workers requested above, or 5 if you ran `addprocs(4)` locally. 

In [None]:
procs()

Now let's consider a simple example to demonstrate the use of these new tools. 



## Estimate $\pi$ in parallel

There's a simple Monte Carlo method one can use to estimate $\pi$: 
1. Remember that the ratio of the area of a circle of radius $r$ to the area of the square that encloses it (side $2r$) is: 
$$ \pi r^2 / 4r^2 = \pi / 4$$ The same ratio holds between a quarter of the unit circle and the unit square.
2. Next, remember that the sum of the squares of a point's coordinates gives the square of its distance from the origin. 
3. We can now randomly draw `N` points in the unit square, and calculate the fraction of points that fall within the unit circle. 
4. This fraction estimates the ratio of the two areas, so 4 times it gives an estimate of $\pi$.
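
Written out, and assuming the points $(x_i, y_i)$ are drawn uniformly from the unit square (which is what `rand()` provides), the steps above amount to the estimator

$$ \hat{\pi}_N = \frac{4}{N} \sum_{i=1}^{N} \mathbf{1}\left[x_i^2 + y_i^2 < 1\right] $$

Each term of the sum is 1 with probability $p = \pi/4$, so the standard error of the estimate is

$$ 4\sqrt{\frac{p(1-p)}{N}} \approx \frac{1.64}{\sqrt{N}} $$

For $N = 10^8$ this is roughly $1.6 \times 10^{-4}$, so about three correct decimal places can be expected.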

In [None]:
@everywhere function trials(numsteps=1000)  # default value of the parameter
    pos = 0 
    for j in 1:numsteps
        pos += Int(rand()^2 + rand()^2 < 1)
    end
    return pos
end

function estimate_pi(in_circle, N)
    4in_circle / N
end

Let's see if it works with 10^8 trials.

In [None]:
estimate_pi(trials(10^8), 10^8)

## `@spawnat` - schedule tasks on different processes 

`@spawnat` is like `@async`, but runs the code on a different process.

In [None]:

f=@spawnat 3 begin
    println("Process ", myid(), " starting random trials")
    res = trials(10^8)
    println("Process ", myid(), " done")
    res
end

In [None]:
typeof(f)

What's the curious `Future(3,1,12,Nullable{Any}())` thing?

In [None]:
f[]

A `Future` is a reference to a computation on a Julia worker (aka remote) process. Calling `f[]`, which is equivalent to `fetch(f)`, waits for the computation to finish and returns its value.
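
As a small self-contained check of this behaviour, here is a sketch that assumes Julia 1.x, where these functions live in the `Distributed` standard library. It starts one local worker if none exists; `remotecall` is the function-call counterpart of `@spawnat`.

```julia
using Distributed
nprocs() == 1 && addprocs(1)     # make sure there is at least one worker

w = first(workers())             # id of a worker process

fut = @spawnat w myid()          # returns a Future immediately
ran_on = fetch(fut)              # blocks until done; the id of the process that ran it

fut2 = remotecall(+, w, 1, 2)    # call `+(1, 2)` on worker w
three = fetch(fut2)              # 3
```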

Our Monte Carlo simulation to estimate $\pi$ is embarrassingly parallel, so we can offload some of the computation to another Julia process. This is just like creating a task with `@async`, except that the task now runs on a remote process.

In [None]:
function remote_trials(pid,n)
    @spawnat pid begin
        println("Process ", myid(), " starting trials")
        trials(n)
    end
end

For example, let us schedule 1000 trials on process 2. Since this task is scheduled on another process, it returns a `Future`. 

In [None]:
remote_trials(2, 1000)

Therefore, to estimate $\pi$ in parallel, we need to spawn trials on all our worker processes.

In [None]:
function parallel_trials(n, pids=workers())
    @time futures = [remote_trials(p,n) for p in pids]
    sum([f[] for f in futures])
end

Each of them would start a number of trials and return the number of trials that fell within the unit circle. Eventually, we divide by the total number of trials, and estimate the value of $\pi$. Let us see if our simulation works. 

In [None]:
@time estimate_pi(parallel_trials(10^8), 10^8*nworkers())

(Remember to run it twice to get the true time: the first run also includes compilation time!)

Let us compare with time in serial:

In [None]:
@time estimate_pi(trials(10^8*nworkers()), 10^8*nworkers())
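
The spawn-and-sum pattern used in `parallel_trials` can also be written as a parallel reduction. The following is only a sketch of that alternative, assuming Julia 1.x with the `Distributed` standard library; the function name `distributed_hits` is hypothetical and not part of the notebook.

```julia
using Distributed
nprocs() == 1 && addprocs(2)     # start two local workers if none exist

# Split the loop across the workers and add up the per-iteration results.
function distributed_hits(n)
    @distributed (+) for i in 1:n
        Int(rand()^2 + rand()^2 < 1)   # 1 if the point falls inside the circle
    end
end

n = 2 * 10^6
pi_estimate = 4 * distributed_hits(n) / n
```

Here `@distributed (+)` divides the range among the workers and combines the partial sums on the calling process, so no `Future`s need to be handled by hand.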

# Distributed Arrays

We've played with the basic elements of Julia's parallel computing infrastructure. This is a one-sided communication model, and using some of these basic constructs requires understanding the model.

But what if, as a user, you don't really want to think about your parallel model? What if you can abstract out this logic and just use a simple array interface to perform distributed computation?

In [None]:
# If you were running this on your local notebook, you would do this to add processes
# addprocs(8)
# Use package for distributed arrays
using DistributedArrays

In [None]:
# The original definition of `C` is not shown; assume a vector of integers
C = collect(1:32)
# Apply a map to the vector
map(t -> t*t, C)

To convert this `Array` into a `DArray` (a distributed array), use `distribute`

In [None]:
# Make the vector distributed
D = distribute(C)

The `distribute` command cuts the `Array` into chunks and then stores them on the different processes.

In [None]:
# show how the vector is distributed across the workers
D.indexes
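
To build intuition for what "cutting into chunks" means, here is a plain-Julia sketch that splits `1:n` into `k` contiguous, nearly equal index ranges. It is an illustration only: `chunk_ranges` is a hypothetical helper, and the partitioning that `DistributedArrays` actually uses may differ.

```julia
# Split the indices 1:n into k contiguous ranges whose lengths differ by at most 1.
function chunk_ranges(n, k)
    base, extra = divrem(n, k)           # every chunk gets `base`; the first `extra` get one more
    ranges = UnitRange{Int}[]
    start = 1
    for i in 1:k
        len = base + (i <= extra ? 1 : 0)
        push!(ranges, start:(start + len - 1))
        start += len
    end
    ranges
end

chunks = chunk_ranges(10, 4)   # [1:3, 4:6, 7:8, 9:10]
```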

Now when you run a `map` on a `DArray`, it runs in parallel!

In [None]:
# apply map to distributed vector (looks identical to non-distributed case)
map(t -> t*t, D)

The nice thing about this is `DArray`s aren't restricted to numeric types.

In [None]:
map(t -> Dates.monthname((t - 1) % 12 + 1), D)

See if you can parse and understand this next example.

In [None]:
monthString = map(t -> Dates.monthname((t - 1) % 12 + 1) |> s -> s*" is my favorite month.\n", D) |>
    t -> reduce(*, Array(t))
println(monthString)

We can also declare a distributed array of matrices via a distributed comprehension. 

In [None]:
D55 = @DArray [randn(5,5) for i = 1:32]

And subsequently `map` a function on them in parallel. 

In [None]:
# Compute singular values of the distributed vector of matrices
Dsvd = map(svdvals, D55)

# Random walks

Now, we will look at one of the simplest types of Monte Carlo numerical simulation, random walks.

In the simplest random walk, a particle starts at $0$ and jumps to the left ($-1$) or the right ($+1$) with equal probability.
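
In symbols, writing $s_i \in \{-1, +1\}$ for the $i$-th jump, the position after $n$ steps is

$$ X_n = \sum_{i=1}^{n} s_i $$

Since each jump has mean $0$ and variance $1$, and the jumps are independent,

$$ \mathbb{E}[X_n] = 0, \qquad \mathrm{Var}(X_n) = n $$

so a walker typically ends up at a distance of order $\sqrt{n}$ from the origin, about $32$ for $n = 1000$.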

The following is a simple implementation of a single random walk:

In [None]:
@time begin
    
    numsteps = 1000
    pos = 0 
    for j in 1:numsteps

        if rand() < 0.5
            step = -1
        else
            step = +1
        end

        pos += step 
    end
    
end

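Let's wrap it in a function, which is good programming practice and allows us to have `numsteps` as a parameter.
It also has an additional, important effect in Julia: code inside a function is compiled to specialized machine code and no longer works on untyped global variables, so it usually runs much faster.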

In [None]:
"""Single 1D random walk from the origin.
Returns the final position after `numsteps` steps."""
function walk(numsteps=1000)  # default value of the parameter
    
    pos = 0 
    
    for j in 1:numsteps

        if rand() < 0.5   # can replace by rand(Bool)
            step = -1
        else
            step = +1
        end

        pos += step 
    end
    
    return pos
    
end

In [None]:
@time walk(1)

In [None]:
@time walk(100)

In [None]:
@time walk(1000)
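
The effect of wrapping code in a function can be measured directly. The sketch below, assuming Julia 1.x, times the same loop once at global scope and once inside a function; the names `total` and `walk_sum` are illustrative, and no particular timings are claimed since they depend on the machine.

```julia
# Version 1: loop at global scope, updating an untyped global variable.
total = 0
t_global = @elapsed for j in 1:10^6
    global total += rand(Bool) ? 1 : -1
end

# Version 2: the same loop inside a function, using only local variables.
function walk_sum(n)
    s = 0
    for j in 1:n
        s += rand(Bool) ? 1 : -1
    end
    s
end

walk_sum(1)                        # warm-up call so compilation is not timed
t_func = @elapsed walk_sum(10^6)
```

Comparing `t_global` and `t_func` on your own machine shows how much the global-scope version pays for not knowing the type of `total` in advance.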

## Draw a random walk

One way to understand what each walker is doing is by visualizing its path. In Julia, most visualization is done via the `Plots` package, which is an umbrella package with a uniform API across different plotting libraries (aka *backends*). Let's load the `plotly` backend.

In [None]:
using Plots; plotly()

Let us record the position at each step via a new function `trajectory`

In [None]:
function trajectory(numsteps=1000)

    pos = 0 
    positions = [pos]

    for j in 1:numsteps

        if rand() < 0.5
            step = -1
        else
            step = +1
        end

        pos += step 
        push!(positions, pos)

    end
    
    positions
end
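
The same trajectory can be sketched without an explicit loop, since each position is the running sum of the steps taken so far. This is an alternative formulation, not the notebook's own; `trajectory_cumsum` is a hypothetical name.

```julia
# Positions of a random walk as the cumulative sum of its steps.
function trajectory_cumsum(numsteps=1000)
    steps = rand([-1, 1], numsteps)   # each step is -1 or +1 with equal probability
    vcat(0, cumsum(steps))            # prepend the starting position 0
end

path = trajectory_cumsum(1000)        # numsteps + 1 positions, starting at 0
```

Like `trajectory`, it returns `numsteps + 1` values because the starting position is included.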



And now, plot:

In [None]:
numsteps = 1000
# trajectory returns numsteps + 1 positions (the start plus one per step)
plot(0:numsteps, trajectory(numsteps))

Now let us get a sense of how much time this takes. 

In [None]:
using Interact

In [None]:
@manipulate for k in 3:9
    @elapsed walk(10^k)
end

In [None]:
plot(3:9, [@elapsed walk(10^k) for k = 3:9])

## Add parallelism

Now that we have a sense of how much time it takes, we can offload work to other Julia processes. Let's use `DArray`s to parallelize this random walk. 

In [None]:
@everywhere using DistributedArrays

Here's our `walk` function again. 

In [None]:
@everywhere function walk(numsteps)
    pos = 0

    for j in 1:numsteps
        
        if rand(Bool)  # NB
            step = -1
        else
            step = +1
        end
        
        pos += step # ifelse(rand() < 0.5, -1, +1)
    end
    
    return pos
end

Let us define how many walkers we want and how many steps we want them to walk. In serial, all our walkers are present on a single process.

In [None]:
@everywhere begin
    numsteps   = 10000
    numwalkers = 100000 
end
serialwalkers = collect(1:numwalkers)


But with `distribute`, the walkers are distributed across all worker processes.

In [None]:
parallelwalkers = distribute(serialwalkers)

And, as earlier, we can examine the distribution by looking at the indices stored on each worker.

In [None]:
parallelwalkers.indexes

In [None]:
typeof(parallelwalkers)

## Benchmarking

Most benchmarking in Julia is done via the package `BenchmarkTools`. 

In [None]:
using Compat
using BenchmarkTools

Let us perform the random walks by calling the `map` function over all the walkers. 

In serial:

In [None]:
@benchmark map(_ -> walk(numsteps), serialwalkers)

In parallel:

In [None]:
@benchmark positions = map( _ -> walk(numsteps), parallelwalkers)
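
For comparison, the same set of walks can be farmed out with `pmap` from the `Distributed` standard library, without any distributed array. This is a sketch assuming Julia 1.x; `walk_demo` is a renamed copy of `walk` so that the block is self-contained, and the sizes are kept small.

```julia
using Distributed
nprocs() == 1 && addprocs(2)      # start two local workers if none exist

# Define the walk on every process so the workers can call it.
@everywhere function walk_demo(numsteps)
    pos = 0
    for j in 1:numsteps
        pos += rand(Bool) ? -1 : +1
    end
    pos
end

# Run 1000 walkers of 100 steps each; send the work out in batches of 100
# so that communication does not dominate the short walks.
positions = pmap(_ -> walk_demo(100), 1:1000; batch_size=100)
```

The `batch_size` keyword matters here because a single short walk is cheap compared with the cost of one round trip to a worker.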