# Parallelize code using native Julia methods

This notebook presents an example of a typical parallel problem (counting items in a big dataset)
and solves it using native Julia code only. The code involves two steps:

- 1) Split data across processes, make independent computations on each process and get partial results
- 2) Join partial results

This notebook will focus on the use of `@parallel`, `pmap`, `@spawn`, `fetch` and `remotecall`.

Some related material:

- http://docs.julialang.org/en/release-0.5/manual/parallel-computing/
- https://github.com/JuliaLang/julia/blob/master/examples/wordcount.jl
- https://blog.ajdecon.org/parallel-word-count-with-julia-an-interesting/



# Map-reduce-like problems

#### parallel for with the reduction operation (+)


Suppose we want to apply a function `elem_op` to every element visited by a for loop and then aggregate the results. Here the aggregation operation is the sum.

In [4]:
function elem_op(x)
  return x/1000
end

elem_op (generic function with 1 method)

In [5]:
function sum_seq(n)
    aux = 0
    for i in 1:n
       aux += elem_op(i)
    end 
    return aux
end

sum_seq (generic function with 1 method)

In [6]:
@time sum_seq(10_000_000)

  0.213696 seconds (30.00 M allocations: 457.864 MiB, 23.59% gc time)


5.000000499999999e10

In [7]:
workers()

1-element Array{Int64,1}:
 1

In [8]:
addprocs(4) 

4-element Array{Int64,1}:
 2
 3
 4
 5

In [10]:
@everywhere function elem_op(x)
  return x/1000
end

function sum_par(n)
    return @parallel (+) for i in 1:n
        elem_op(i)
    end
end

In [18]:
@time sum_seq(1_000_000)

  0.024713 seconds (3.00 M allocations: 45.777 MiB, 32.12% gc time)


5.0000050000000006e8

In [78]:
@time sum_par(1_000_000)

  0.001097 seconds (537 allocations: 37.547 KiB)


5.000005e8
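
For readers on a newer Julia, here is a minimal sketch of the same reduction, assuming Julia 1.x, where `@parallel` is called `@distributed` and lives in the `Distributed` standard library. The name `sum_dist` is hypothetical and only used for illustration. The sketch also runs when no workers have been added, in which case the loop executes on the calling process.

```julia
using Distributed

# Define the per-element function on every process, including any workers
# that were added beforehand with addprocs.
@everywhere elem_op(x) = x / 1000

# Hypothetical name: Julia 1.x counterpart of sum_par.
function sum_dist(n)
    # Each process reduces its share of 1:n with +, and the partial sums
    # are then combined with + on the calling process.
    return @distributed (+) for i in 1:n
        elem_op(i)
    end
end

sum_dist(1_000_000)
```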

### Understanding the internals of `@parallel`


In [22]:
?@parallel

```
@parallel
```

A parallel for loop of the form :

```
@parallel [reducer] for var = range
    body
end
```

The specified range is partitioned and locally executed across all workers. In case an optional reducer function is specified, `@parallel` performs local reductions on each worker with a final reduction on the calling process.

Note that without a reducer function, `@parallel` executes asynchronously, i.e. it spawns independent tasks on all available workers and returns immediately without waiting for completion. To wait for completion, prefix the call with [`@sync`](@ref), like :

```
@sync @parallel for var = range
    body
end
```


In [25]:
n = 100
f(n) = @parallel (+) for i in 1:n;  i end 

f (generic function with 1 method)

In [80]:
# We can see that @parallel ends up rewriting our code as a call to Base.Distributed.preduce
@macroexpand @parallel (+) for i in 1:100; i end 

:((Base.Distributed.preduce)(+, begin  # distributed/macros.jl, line 157:
            function (#143#reducer, #144#R, #145#lo::Base.Distributed.Int, #146#hi::Base.Distributed.Int) # distributed/macros.jl, line 158:
                i = #144#R[#145#lo] # distributed/macros.jl, line 159:
                #142#ac = begin  # In[80], line 2:
                        i
                    end # distributed/macros.jl, line 160:
                if #145#lo != #146#hi # distributed/macros.jl, line 161:
                    for i = #144#R[#145#lo + 1:#146#hi] # distributed/macros.jl, line 162:
                        #142#ac = #143#reducer(#142#ac, begin  # In[80], line 2:
                                    i
                                end)
                    end
                end # distributed/macros.jl, line 165:
                #142#ac
            end
        end, 1:100))

Let us dive into `Base.Distributed.preduce`

In [81]:
?Base.Distributed.preduce

No documentation found.

`Base.Distributed.preduce` is a `Function`.

```
# 1 method for generic function "preduce":
preduce(reducer, f, R) in Base.Distributed at distributed/macros.jl:138
```
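
Since `preduce` has no docstring, the following is a conceptual, serial sketch of the pattern that the `@parallel` documentation describes: the range is partitioned, each part is reduced locally, and the partial results are reduced once more. It is not the library's implementation. The names `chunk_ranges` and `sketch_preduce` are hypothetical, and the chunks are processed one after another instead of on separate workers.

```julia
# Hypothetical helper: split the indices 1:len into at most `nchunks`
# contiguous ranges of nearly equal length.
function chunk_ranges(len::Int, nchunks::Int)
    ranges = UnitRange{Int}[]
    len == 0 && return ranges
    nchunks = min(nchunks, len)
    base, extra = divrem(len, nchunks)
    lo = 1
    for c in 1:nchunks
        # The first `extra` chunks get one additional element.
        hi = lo + base - 1 + (c <= extra ? 1 : 0)
        push!(ranges, lo:hi)
        lo = hi + 1
    end
    return ranges
end

# Serial stand-in for the distributed reduction. Assumes R is non-empty.
function sketch_preduce(reducer, body, R, nchunks::Int)
    partials = map(chunk_ranges(length(R), nchunks)) do idx
        # Local reduction over one chunk (this part would run on a worker).
        acc = body(R[first(idx)])
        for i in R[first(idx)+1:last(idx)]
            acc = reducer(acc, body(i))
        end
        acc
    end
    # Final reduction of the partial results (on the calling process).
    return reduce(reducer, partials)
end

sketch_preduce(+, identity, 1:100, 4)   # 5050
```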


In [50]:
@everywhere some_complicated_function_per_iteration(x) = (log(x+1)+(x+2))/x

In [33]:
test_1([2,3,412121])

412126

In [51]:
@parallel (+) for i in 1:100000; some_complicated_function_per_iteration(i) end 

100091.6389972468

In [57]:
@which Base.Distributed.preduce(+,f,2)

In [59]:
reduce(+,[1,2,3,4])

10

In [71]:
reduce(log,[1,2,3,4])

-0.0

In [72]:
@which reduce(log,[1,2,3,4])

#### what `reduce` does in the source code 


```julia
reduce(op, itr) = mapreduce(identity, op, itr)
reduce(op, a::Number) = a  
```


The first definition tells us that applying `reduce` with an operation to an iterable is the same as calling `mapreduce(identity, op, itr)`.

The second definition tells us that reducing a single number simply returns that number.
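
A short check of both definitions, written as a sketch that should behave the same on Julia 1.x:

```julia
xs = [1, 2, 3, 4]

# reduce is mapreduce with the identity map
same = reduce(+, xs) == mapreduce(identity, +, xs)   # true

# with a non-trivial map, each element is transformed before being combined
sum_of_squares = mapreduce(x -> x^2, +, xs)          # 1 + 4 + 9 + 16 = 30

# reducing a single number returns the number itself
single = reduce(+, 7)                                # 7
```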

In [76]:
?mapreduce(identity, +, [1,2,3,4])

```
mapreduce(f, op, itr)
```

Like `mapreduce(f, op, v0, itr)`. In general, this cannot be used with empty collections (see `reduce(op, itr)`).


In [77]:
mapreduce(identity, +, [1,2,3,4])

10

#### Monte Carlo example

In [None]:
function find_pi(n)
    inside = 0
    for i = 1:n
        x = rand(); y = rand()
        inside += (x^2 + y^2) <= 1
    end
    4 * inside / n
end

In [None]:
@time find_pi(10);

In [None]:
@time find_pi(1_000_000_000);

In [None]:
workers()

In [None]:
addprocs(4);

In [None]:
@everywhere function find_pi(n) 
    inside = 0
    for i = 1:n
        x = rand(); y = rand()
        inside += (x^2 + y^2) <= 1
    end
    4 * inside/ n
end

In [None]:
# Each worker gets an integer share of the N samples; the partial estimates are averaged
pfind_pi(N) = mean(pmap(find_pi, [N ÷ nworkers() for i in 1:nworkers()]))

In [None]:
@time pfind_pi(1_00);

#### Serial vs parallel versions

In [None]:
@time pfind_pi(1_000_000_000);

In [None]:
@time find_pi(1_000_000_000);

## Counting elements

In [None]:
addprocs(4) 

In [None]:
workers()

In [None]:
big_array = rand(1:10, 10^8);

In [None]:
function count_elements(array::Array{Int64})
    counts = Dict{Int64,Int64}()
    for i in array
        if haskey(counts, i)
            counts[i] += 1
        else
            counts[i] = 1
        end
    end
    return counts
end

In [None]:
@time result_sequential = count_elements(big_array);

#### Faster way to create counts

Notice that this version is using **`get(counts,i,0)`**.
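
A small sketch of how `get` with a default value behaves. A likely reason for the speed-up is that `get` replaces the separate membership test, so each element needs one lookup fewer.

```julia
counts = Dict{Int64,Int64}()

# The key is absent, so get returns the default and does not insert anything.
missing_count = get(counts, 7, 0)      # 0
inserted = haskey(counts, 7)           # false

# Read-or-default followed by a write: this is the counting idiom.
counts[7] = get(counts, 7, 0) + 1
counts[7] = get(counts, 7, 0) + 1
counts[7]                              # 2
```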

In [None]:
function count_elements2(array::Array{Int64})
    counts = Dict{Int64,Int64}()
    for i in array
        counts[i] = get(counts, i, 0) + 1
    end
    return counts
end

In [None]:
@time count_elements2(big_array);

### pmap function

Now we will build a custom reducer to aggregate the partial results. Then we will split the data
into chunks of similar size and distribute the workload across different processes.

In [None]:
# reducer
function count_reduce(array_of_count_dicts)
    counts_combined = Dict{Int64,Int64}()

    for d in array_of_count_dicts
        for k in keys(d)
            if haskey(counts_combined, k)
                counts_combined[k] += d[k]
            else
                counts_combined[k] = d[k]
            end
        end
    end
    return counts_combined
end
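
As an alternative sketch, the same reduction can be expressed with Base's `merge`, which accepts a combining function for keys that appear in more than one dictionary. This assumes Julia 1.x (for the `init` keyword of `reduce`). The name `count_reduce_merge` is hypothetical.

```julia
# Hypothetical alternative to count_reduce: merge the dictionaries pairwise,
# adding the counts of keys that appear on both sides.
function count_reduce_merge(array_of_count_dicts)
    return reduce((a, b) -> merge(+, a, b), array_of_count_dicts;
                  init = Dict{Int64,Int64}())
end

partials = [Dict(1 => 2, 2 => 1), Dict(2 => 4, 3 => 1), Dict(1 => 1)]
count_reduce_merge(partials)   # Dict(1 => 3, 2 => 5, 3 => 1)
```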

In [None]:
# This code will fail because the workers do not have the `count_elements` function defined
@time begin
    n = length(big_array)
    n_processors = length(workers())
    splits_ind = [Int(x) for x in 1:(n/n_processors):(n+1)]
    big_array_splits = [big_array[x:y-1] for (x,y) in zip(splits_ind[1:end-1], splits_ind[2:end])]
    res = pmap(count_elements, big_array_splits)
    d = count_reduce(res)
end

In [None]:
@everywhere function count_elements(array::Array{Int64})
    counts = Dict{Int64,Int64}()
    for i in array
        if haskey(counts, i)
            counts[i] += 1
        else
            counts[i] = 1
        end
    end
    return counts
end

In [None]:
@time begin
    n = length(big_array)
    n_processors = length(workers())
    splits_ind = [Int(x) for x in 1:(n/n_processors):(n+1)]
    big_array_splits = [big_array[x:y-1] for (x,y) in zip(splits_ind[1:end-1], splits_ind[2:end])]
    res = pmap(count_elements, big_array_splits)
    result_parallel = count_reduce(res);
end

In [None]:
# Both computations yield exactly the same result
result_parallel == result_sequential
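
The index arithmetic used for splitting above relies on the array length being a multiple of the number of workers; otherwise `Int(x)` is applied to a non-integer value and throws. A minimal sketch of a splitting helper that works for any length, using `Iterators.partition` from Base (the name `split_chunks` is hypothetical):

```julia
# Hypothetical helper: split `v` into at most `k` contiguous chunks.
# All chunks have the same length except possibly the last one.
function split_chunks(v::AbstractVector, k::Integer)
    chunk_len = max(1, cld(length(v), k))
    # collect turns each chunk into a plain Vector, which is what a method
    # typed on Array{Int64} (such as count_elements) expects.
    return [collect(c) for c in Iterators.partition(v, chunk_len)]
end

chunks = split_chunks(collect(1:10), 4)
length.(chunks)   # [3, 3, 3, 1]
```

The chunks can then be passed to `pmap` in place of `big_array_splits`.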

### @spawn and fetch functions

Using **```@spawn```** and **```fetch```** we can build our own `pmap`-like function.

- **```@spawn```**: Creates a closure around an expression and runs it on an automatically-chosen process, returning a Future to the result.

- **```fetch```**: Waits for the computation behind a Future (such as the one returned by **```@spawn```**) to finish and returns its result.
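
A minimal sketch of the spawn-then-fetch pattern, assuming Julia 1.3 or later, where `@spawnat :any` from the `Distributed` standard library is the recommended spelling of the `@spawn` used in this notebook. With no workers added, the expression simply runs on the calling process.

```julia
using Distributed

# Schedule the expression on an automatically chosen process.
# The call returns immediately with a Future.
fut = @spawnat :any sum(1:100)

is_future = fut isa Future   # true

# fetch blocks until the computation has finished and returns its value.
total = fetch(fut)           # 5050
```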

In [None]:
workers()

In [None]:
# 1) Splits the input array into n_processors equal-sized chunks,
# 2) @spawns count_elements() on each chunk so that the chunks are processed in parallel,
# 3) then fetch()es the partial results and combines them with count_reduce().

function parallel_wordcount(big_array, n_processors)
    
    n = length(big_array)
    splits_ind = [Int(x) for x in 1:(n/n_processors):(n+1)]
    big_array_splits = [big_array[x:y-1] for (x,y) in zip(splits_ind[1:end-1], splits_ind[2:end])]
    
    partial_res = []
    for subarray in big_array_splits
        push!(partial_res, @spawn count_elements(subarray) )
    end    
    results = [fetch(r) for r in partial_res]
    return count_reduce(results)
end

In [None]:
@time r = parallel_wordcount(big_array, 4);

In [None]:
r

### Let us look at the code piece by piece

In [None]:
workers()

In [None]:
?remotecall

In [None]:
# Run a function on a specific worker: remotecall(f, id, args...)
rmatrix = remotecall(rand, 2, 2, 2)
print(rmatrix)

In [None]:
rmatrix

In [None]:
fetch(rmatrix)
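
A self-contained sketch of the same calls, assuming Julia 1.x with the `Distributed` standard library. The function comes first, then the process id, then the arguments of the function. The id is taken from `workers()`, which contains only process 1 when no workers have been added, so the sketch runs in either case.

```julia
using Distributed

pid = first(workers())

# Asynchronous: returns a Future right away, fetch waits for the value.
fut = remotecall(rand, pid, 2, 2)
m = fetch(fut)
size(m)                                    # (2, 2)

# remotecall_fetch combines both steps in a single call.
three = remotecall_fetch(+, pid, 1, 2)     # 3
```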

In [None]:
partial_res = []
# remotecall(f, id, args...) needs an explicit worker id, whereas @spawn picks a process automatically
for (w, subarray) in zip(workers(), big_array_splits)
    push!(partial_res, remotecall(count_elements, w, subarray))
end

# pmap


Let us consider the case where we have an array and we want to apply some function at every position of the array and get a new array with the results.  We can approach this problem in a variety of ways.
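
For reference, a sketch of three serial ways to express this element-wise pattern. The function `square_plus_one` is a hypothetical stand-in for whatever needs to be applied.

```julia
square_plus_one(x) = x^2 + 1   # hypothetical example function

xs = [1, 2, 3]

a = map(square_plus_one, xs)              # higher-order function
b = square_plus_one.(xs)                  # broadcasting (dot syntax)
c = [square_plus_one(x) for x in xs]      # comprehension

a == b == c   # true, all three give [2, 5, 10]
```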


#### pmap solution

This type of problem can be parallelized with the **`pmap`** function. This only makes sense when the function applied to each element of the collection is expensive to compute. Otherwise the communication overhead makes it much slower than the simple non-parallel solution.
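
When the function is cheap, one way to reduce the per-element overhead is the `batch_size` keyword of `pmap`, which sends elements to the workers in groups. The following is a sketch assuming Julia 1.x. It uses `iseven` from Base as a deliberately cheap function, and whether batching helps at all depends on the workload.

```julia
using Distributed

vals = rand(1:1_000_000, 100_000)

serial  = map(iseven, vals)
# Elements are shipped in batches of up to 10_000 instead of one at a time.
batched = pmap(iseven, vals; batch_size = 10_000)

serial == batched   # both approaches compute the same result
```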

In [158]:
vals = rand(Int32, 100_000)

for i in 1:length(vals)
    if vals[i]<0
        vals[i] = -vals[i]
    end
end

In [159]:
# The comparison already returns a Bool, so no if/else is needed
@everywhere is_multiple_of_20(x) = x % 20 == 0

In [160]:
@time is_multiple_of_20.(vals);

  0.016363 seconds (6.90 k allocations: 382.055 KiB)


In [161]:
@time map(is_multiple_of_20,vals);

  0.012561 seconds (7.96 k allocations: 508.472 KiB)


In [162]:
workers()

4-element Array{Int64,1}:
 2
 3
 4
 5

In [28]:
@time pmap(is_multiple_of_20,vals);

LoadError: UndefVarError: is_multiple_of_20 not defined

In [30]:
using BenchmarkTools
addprocs()

In [31]:
x=[rand(100,100) for i in 1:10];

In [32]:
@benchmark map(svd, x)

BenchmarkTools.Trial: 
  memory estimate:  5.47 MiB
  allocs estimate:  192
  --------------
  minimum time:     16.352 ms (0.00% GC)
  median time:      16.994 ms (0.00% GC)
  mean time:        17.231 ms (1.08% GC)
  maximum time:     21.518 ms (5.52% GC)
  --------------
  samples:          290
  evals/sample:     1

In [38]:
@benchmark pmap(svd,x)

BenchmarkTools.Trial: 
  memory estimate:  1.67 MiB
  allocs estimate:  1548
  --------------
  minimum time:     5.512 ms (0.00% GC)
  median time:      7.082 ms (0.00% GC)
  mean time:        7.183 ms (0.94% GC)
  maximum time:     12.217 ms (0.00% GC)
  --------------
  samples:          696
  evals/sample:     1