# Parallelize code using native Julia methods

This notebook presents an example of a typical parallel problem (counting items in a big dataset)
and solves it using only native Julia code. The code involves two steps:

- 1) Split data across processes, make independent computations on each process and get partial results
- 2) Join partial results

This notebook focuses on the use of `pmap`, `@spawn`, `fetch` and `remotecall`.
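The two steps can be sketched on a toy problem before moving to the real examples. This is a minimal sketch, assuming Julia 1.x, where the parallel functions live in the `Distributed` standard library (in the Julia 0.6 session used in this notebook they are part of `Base`). The `chunks` data is made up for illustration.

```julia
using Distributed  # assumption: Julia 1.x; not needed on Julia 0.6

# Step 1: split the data into chunks and compute one partial result per chunk.
# pmap hands each chunk to an available worker (or runs locally if none were added).
chunks = [1:250, 251:500, 501:750, 751:1000]
partial_sums = pmap(sum, chunks)

# Step 2: join the partial results on the calling process.
total = sum(partial_sums)
```

Here `total` equals `sum(1:1000)`, so the parallel result can be checked against the serial one.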

Some related material:

- http://docs.julialang.org/en/release-0.5/manual/parallel-computing/
- https://github.com/JuliaLang/julia/blob/master/examples/wordcount.jl
- https://blog.ajdecon.org/parallel-word-count-with-julia-an-interesting/



#### Monte Carlo example

In [1]:
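# Estimate pi by sampling n random points in the unit square
function find_pi(n)
    inside = 0
    for i = 1:n
        x = rand(); y = rand()
        # count the points that fall inside the quarter circle of radius 1
        inside += (x^2 + y^2) <= 1
    end
    4 * inside / n
end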

find_pi (generic function with 1 method)

In [5]:
@time find_pi(10);

  0.015498 seconds (5.16 k allocations: 274.803 KiB)


In [6]:
@time find_pi(1_000_000_000);

  4.723181 seconds (5 allocations: 176 bytes)


In [7]:
workers()

1-element Array{Int64,1}:
 1

In [8]:
addprocs(4);

In [10]:
@everywhere function find_pi(n) 
    inside = 0
    for i = 1:n
        x = rand(); y = rand()
        inside += (x^2 + y^2) <= 1
    end
    4 * inside/ n
end

In [13]:
# Each worker gets an integer share of the N samples (assumes N is divisible by nworkers())
pfind_pi(N) = mean(pmap(find_pi, [div(N, nworkers()) for i = 1:nworkers()]))

pfind_pi (generic function with 1 method)

In [18]:
@time pfind_pi(1_00);

  0.001603 seconds (676 allocations: 62.375 KiB)


#### Serial vs parallel versions

In [16]:
@time pfind_pi(1_000_000_000);

  2.580743 seconds (737 allocations: 64.578 KiB)


In [17]:
@time find_pi(1_000_000_000);

  4.773313 seconds (2.71 k allocations: 140.011 KiB)


## Counting elements

In [1]:
addprocs(4) 

4-element Array{Int64,1}:
 2
 3
 4
 5

In [20]:
workers()

4-element Array{Int64,1}:
 2
 3
 4
 5

In [21]:
big_array = rand(1:10, 10^8);

In [22]:
function count_elements(array::Array{Int64})
    counts = Dict{Int64,Int64}()
    for i in array
        if haskey(counts, i)
            counts[i] += 1
        else
            counts[i] = 1
        end
    end
    return counts
end

count_elements (generic function with 1 method)

In [23]:
@time result_sequential = count_elements(big_array);

  5.487689 seconds (20.31 k allocations: 1.113 MiB)


#### Faster way to create counts

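Notice that this version uses **`get(counts,i,0)`**, which returns the count stored for `i`, or `0` if the key is not in the dictionary yet, so the explicit membership test is no longer needed.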

In [28]:
function count_elements2(array::Array{Int64})
    counts = Dict{Int64,Int64}()
    for i in array
        counts[i] = get(counts, i, 0) + 1
    end
    return counts
end

count_elements2 (generic function with 1 method)

In [29]:
@time count_elements2(big_array);

  3.884165 seconds (1.34 k allocations: 70.337 KiB)
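The behaviour of `get` with a default value can be seen on a small dictionary. This is a minimal sketch with made-up input values, not part of the original notebook.

```julia
counts = Dict{Int64,Int64}()

# Key 3 is absent, so `get` returns the supplied default instead of throwing a KeyError.
first_lookup = get(counts, 3, 0)

# The same expression therefore handles both the first and later occurrences of a key.
for i in [3, 3, 7]
    counts[i] = get(counts, i, 0) + 1
end
```

After the loop, `counts` holds one entry per distinct value, with no branch needed for the first occurrence.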


### pmap function

Now we will build a custom reducer to aggregate the partial results. Then we will split the data
into chunks of similar size and distribute the workload across the worker processes.

In [30]:
# reducer: sums the per-key counts of several dictionaries into a single one
function count_reduce(array_of_count_dicts)
    counts_combined = Dict{Int64,Int64}()

    for d in array_of_count_dicts
        for (k, v) in d
            counts_combined[k] = get(counts_combined, k, 0) + v
        end
    end
    return counts_combined
end

count_reduce (generic function with 1 method)
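As an aside, a reducer like `count_reduce` can probably also be written with the built-in `merge`, which accepts a combining function for keys that appear in more than one dictionary. This is a minimal sketch, assuming Julia 1.x, where newer releases also offer `mergewith` for the same purpose. The `partial_counts` data is made up for illustration.

```julia
# Two hypothetical partial results, as they might come back from two workers.
partial_counts = [Dict(1 => 2, 2 => 5), Dict(2 => 1, 3 => 4)]

# merge(combine, d, others...) resolves duplicate keys by calling `combine`
# on the two values; with `+` the counts are added together.
combined = merge(+, partial_counts...)
```

The hand-written loop and the `merge` call should give the same dictionary. The loop makes the aggregation step explicit, which is why the notebook spells it out.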

In [31]:
# This code will fail because the workers do not have the "count_elements" function defined
@time begin
    n = length(big_array)
    n_processors = length(workers())
    splits_ind = [Int(x) for x in 1:(n/n_processors):(n+1)]
    big_array_splits = [big_array[x:y-1] for (x,y) in zip(splits_ind[1:end-1], splits_ind[2:end])]
    res = pmap(count_elements, big_array_splits)
    d = count_reduce(res)
end

LoadError: On worker 4:
UndefVarError: #count_elements not defined
deserialize_datatype at ./serialize.jl:969
handle_deserialize at ./serialize.jl:674
deserialize at ./serialize.jl:634
handle_deserialize at ./serialize.jl:681
deserialize_msg at ./distributed/messages.jl:98
message_handler_loop at ./distributed/process_messages.jl:161
process_tcp_streams at ./distributed/process_messages.jl:118
#99 at ./event.jl:73

In [32]:
@everywhere function count_elements(array::Array{Int64})
    counts = Dict{Int64,Int64}()
    for i in array
        if haskey(counts, i)
            counts[i] += 1
        else
            counts[i] = 1
        end
    end
    return counts
end

In [34]:
@time begin
    n = length(big_array)
    n_processors = length(workers())
    splits_ind = [Int(x) for x in 1:(n/n_processors):(n+1)]
    big_array_splits = [big_array[x:y-1] for (x,y) in zip(splits_ind[1:end-1], splits_ind[2:end])]
    res = pmap(count_elements, big_array_splits)
    result_paralel = count_reduce(res);
end

  4.916874 seconds (15.95 k allocations: 763.813 MiB, 5.67% gc time)


Dict{Int64,Int64} with 10 entries:
  7  => 10002619
  4  => 9997099
  9  => 9995722
  10 => 9998713
  2  => 9996473
  3  => 10002016
  8  => 10001209
  5  => 10003816
  6  => 9997415
  1  => 10004918

In [16]:
# Both computations yield exactly the same result
result_paralel == result_sequential

true
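The split indices used in this section come from the range `1:(n/n_processors):(n+1)`, which only contains whole numbers when `n` is divisible by `n_processors`. Otherwise `Int(x)` throws an `InexactError`. One way to handle arbitrary lengths is sketched below. `chunk_ranges` is a hypothetical helper introduced here for illustration, not a function from the notebook or from Julia itself.

```julia
# Hypothetical helper: split 1:n into k contiguous index ranges
# whose lengths differ by at most one element.
function chunk_ranges(n::Integer, k::Integer)
    base, extra = divrem(n, k)      # the first `extra` chunks get one more element
    ranges = UnitRange{Int}[]
    start = 1
    for j in 1:k
        len = base + (j <= extra ? 1 : 0)
        push!(ranges, start:(start + len - 1))
        start += len
    end
    return ranges
end

ranges = chunk_ranges(10, 4)
```

The chunks could then be built as `[big_array[r] for r in chunk_ranges(length(big_array), n_processors)]`.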

### @spawn and fetch functions

Using **`@spawn`** and **`fetch`** we can build our own `pmap`-like function.

- **`@spawn`**: Creates a closure around an expression and runs it on an automatically chosen process, returning a `Future` to the result.

- **`fetch`**: Waits for the computation behind a `Future` (such as the one returned by **`@spawn`**) to finish and returns its value.
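The spawn-then-fetch pattern can be tried on a trivial expression first. This is a minimal sketch, assuming Julia 1.3 or later, where the `Distributed` documentation recommends `@spawnat :any` in place of `@spawn`. On the Julia 0.6 session used in this notebook, `@spawn` plays the same role.

```julia
using Distributed  # assumption: Julia 1.x; not needed on Julia 0.6

# Schedule the expression on a process chosen by Julia.
# The macro returns a Future immediately, without waiting for the result.
future_sum = @spawnat :any sum(1:100)

# fetch blocks until the computation has finished, then returns its value.
value = fetch(future_sum)
```

Because the `Future` comes back right away, several computations can be started before any of them is fetched.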

In [35]:
workers()

4-element Array{Int64,1}:
 2
 3
 4
 5

In [36]:
# 1) Splits the input array into n_processors equal-sized chunks,
# 2) @spawns count_elements() on each chunk so that the chunks are processed in parallel,
# 3) then fetch()es the partial results and combines them with count_reduce().

function parallel_wordcount(big_array, n_processors)
    
    n = length(big_array)
    splits_ind = [Int(x) for x in 1:(n/n_processors):(n+1)]
    big_array_splits = [big_array[x:y-1] for (x,y) in zip(splits_ind[1:end-1], splits_ind[2:end])]
    
    partial_res = []
    for subarray in big_array_splits
        push!(partial_res, @spawn count_elements(subarray) )
    end    
    results = [fetch(r) for r in partial_res]
    return count_reduce(results)
end

parallel_wordcount (generic function with 1 method)

In [37]:
@time r = parallel_wordcount(big_array, 4);

  5.593987 seconds (251.25 k allocations: 777.149 MiB, 4.69% gc time)


In [38]:
r

Dict{Int64,Int64} with 10 entries:
  7  => 10002619
  4  => 9997099
  9  => 9995722
  10 => 9998713
  2  => 9996473
  3  => 10002016
  8  => 10001209
  5  => 10003816
  6  => 9997415
  1  => 10004918

### Let us look at the code piece by piece

In [49]:
workers()

4-element Array{Int64,1}:
 2
 3
 4
 5

In [50]:
?remotecall

search: remotecall remotecall_wait remotecall_fetch RemoteChannel



```
remotecall(f, id::Integer, args...; kwargs...) -> Future
```

Call a function `f` asynchronously on the given arguments on the specified process. Returns a `Future`. Keyword arguments, if any, are passed through to `f`.

```
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
```

`WorkerPool` variant of `remotecall(f, pid, ....)`. Waits for and takes a free worker from `pool` and performs a `remotecall` on it.
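In the signature documented above, the function comes first, the process id second, and the function's own arguments last. This is a minimal sketch, assuming Julia 1.x with the `Distributed` standard library. The variable names are illustrative, and the target process is taken from `workers()` so that the code also runs when no workers have been added.

```julia
using Distributed  # assumption: Julia 1.x; not needed on Julia 0.6

# Pick a process id to target. With no workers added, workers() returns [1].
pid = first(workers())

# Function first, then the process id, then the arguments passed to the function.
future_matrix = remotecall(rand, pid, 2, 2)
matrix = fetch(future_matrix)

# remotecall_fetch combines the call and the fetch in a single step.
total = remotecall_fetch(sum, pid, 1:10)
```

`remotecall` is the explicit counterpart of `@spawn`: the caller decides which process runs the function instead of leaving the choice to Julia.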


In [48]:
# Run a command on a different worker: the function comes first, then the worker id
rmatrix = remotecall(rand, 2, 2, 2)
print(rmatrix)

In [43]:
rmatrix

In [44]:
fetch(rmatrix)

In [None]:
partial_res = []
for (pid, subarray) in zip(workers(), big_array_splits)
    # Run count_elements on worker `pid` explicitly; remotecall returns a Future
    push!(partial_res, remotecall(count_elements, pid, subarray))
end