# Parallel For Loops in Julia

## Setting

A computational problem ends up taking the form of **iterating over dimensions a large multidimensional array**. **Some dimensions** of this array are **independent** from each others. One may use this feature to **parallelize calculations** to obtain a speed-up compared to the serial outcome.

## How ?

This notebook illustrates how one may **parallelize a for loop** using :

1. native parallel julia 
2. `SharedArrays`
3. `DistibutedArrays`

## A Toy Problem

The program takes a 3-dimensional matrix `A[:,:,:]` and sets its rows and columns to be equal to the third dimension. For instance `A[:,:,1] = ones(n1,n2)`, `A[:,:,2] = ones(n1,n2)*2`, `A[:,:,3] = ones(n1,n2)*3`, where $n1$ and $n2$ are the number of rows and columns respectively.

This is a toy example, because three lines of code would suffice
```julia
for k=1:size(A,3)
    A[:,:,k] = k
end
```

Yet, here we are going to act **as if the first and the second dimensions of $A$ are dependent** (calculations are to be serially done, respecting the order):
```julia
# This loop can be parallelized
for k=1:size(A,3)
    # We cannot parallelize this level
    for j=1:size(A,2)
        for i=1:size(A,1)
             A[i,j,k] = k
         end
      end
    end
end
```

This may look like a silly constraint, but this type of restriction often arises when solving economic models via backwad induction.

## Preliminaries

Let's add workers to the current session, define some constants and load the required packages.

In [50]:
using Distributed

if nworkers() < 3 # Let's say we want 3 workers
    addprocs(3)
end

println(nworkers())

@everywhere using SharedArrays
@everywhere using DistributedArrays
@everywhere using Distributions
@everywhere const dim1 = 100
@everywhere const dim2 = 100
@everywhere const dim3 = nworkers()*1

3


# I. The serial solution

## The obvious candidate

The following solution does what it should, but does not respect the constraint that calculations at the level of the first and second dimensions should be done serially:

In [9]:
t = zeros(dim1,dim2,dim3)

@time for k=1:size(t,3)
    println(k)
    t[:,:,k] .= k
end

1
2
3
  0.068585 seconds (270.07 k allocations: 14.209 MiB)


## Taking into account the serial constraints

The following solution respects the constraint:

In [11]:
@time for k=1:size(t,3)
    # We cannot parallelize this level
    #----------------------------------
    for j=1:size(t,2)
        for i=1:size(t,1)
         t[i,j,k] = k
         end
    end
end

  0.005491 seconds (32.03 k allocations: 1.011 MiB)


Let's spice up the problem by making each execution slower.
Let's use `sleep(0.0)`, which (suprisingly) takes more than 0 second to run.

In [12]:
@everywhere function give_my_id_serial!(x::Array{Float64,3}, id::Int64)
    for indexDim2 = 1:dim2
        for indexDim1 = 1:dim1
            x[indexDim1, indexDim2, id] = id
            #sleep (0.0) still take some time:
            #0.000236 seconds (37 allocations: 800 bytes)
            sleep(0.0)
        end
    end
end

In [13]:
function wrapper(x::Array{Float64,3})
    give_my_id_serial!(x, 1)
    give_my_id_serial!(x, 2)
    give_my_id_serial!(x, 3)
end

wrapper (generic function with 1 method)

In [14]:
@time wrapper(t)

 35.104333 seconds (151.98 k allocations: 4.540 MiB)


As illustrated above, the task we want to accomplish takes approximately 35 seconds.
Yet, as the code makes it obvious, 3 workers could be used in parallel to divide the execution time by
approximately 3.

## II. Using native Julia

Let's split the data evenly among the 3 workers. In do that by initializing a 3-dimensional array on each worker.  

In [16]:
asyncmap(fetch, (@spawnat p eval(:(myLocalArray = zeros(dim1, dim2, 1)))) for p in workers())

3-element Array{Array{Float64,3},1}:
 [0.0 0.0 … 0.0 0.0; 0.0 0.0 … 0.0 0.0; … ; 0.0 0.0 … 0.0 0.0; 0.0 0.0 … 0.0 0.0]
 [0.0 0.0 … 0.0 0.0; 0.0 0.0 … 0.0 0.0; … ; 0.0 0.0 … 0.0 0.0; 0.0 0.0 … 0.0 0.0]
 [0.0 0.0 … 0.0 0.0; 0.0 0.0 … 0.0 0.0; … ; 0.0 0.0 … 0.0 0.0; 0.0 0.0 … 0.0 0.0]

The following function is built so that each worker assigns a specific value equal to `id`, respecting the serial constraint

In [17]:
@everywhere function give_my_id_parallel!(x::Array{Float64,3}, id::Int64)
    for indexDim2 = 1:dim2
        for indexDim1 = 1:dim1
            x[indexDim1, indexDim2, 1] = id
            sleep(0.0)
        end
    end
end

In [18]:
@time asyncmap(fetch, (@spawnat p give_my_id_parallel!(myLocalArray, p-1)) for p in workers())

 12.089481 seconds (393.82 k allocations: 20.636 MiB)


3-element Array{Nothing,1}:
 nothing
 nothing
 nothing

Now, let's recombine the data. For this part, I get inspiration from this excellent post:
https://stackoverflow.com/questions/39058884/julia-use-of-pmap-with-arrays-vs-sharedarrays

In [19]:
#inspired by: https://stackoverflow.com/questions/39058884/julia-use-of-pmap-with-arrays-vs-sharedarrays
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))

function recombine_data(Data::Symbol)
    Results = zeros(dim1,dim2,dim3)
    for (idx, pid) in enumerate(workers())
        Results[:,:,idx] = getfrom(pid, Data)
    end
    return Results
end

recombine_data (generic function with 1 method)

In [20]:
@time k = recombine_data(:myLocalArray)

  0.061616 seconds (52.64 k allocations: 3.372 MiB)


100×100×3 Array{Float64,3}:
[:, :, 1] =
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0

## III. Using DistributedArrays

The package `DistributedArrays` was created to distribute "chunks" of a mutlidimensional array (matrix) to workers.
Each worker owns only a part of the multidimensional array. In the code below, I create a 3-dimensional matrix `a`. Using the function `dzeros`, I specify that I want it to be spread among all the available workers. In the last part of the command, I tell ̀`DistributedArrays` that I want the matrix to be spread among workers along the third dimension only.

In [28]:
#a = dzeros(dim1,dim2,dim3);
#Split along the 3rd dimension
#-----------------------------
a = dzeros((dim1,dim2,dim3), workers(), [1,1,nworkers()]);

Using the function `procs`, one can see that:
* workers 2 owns the chunk `t[:, :, 1]` of `a`
* workers 3 owns the chunk `t[:, :, 2]` of `a`.
* workers 4 owns the chunk `t[:, :, 3]` o `a`.

In [29]:
procs(a)

1×1×3 Array{Int64,3}:
[:, :, 1] =
 2

[:, :, 2] =
 3

[:, :, 3] =
 4

The function `give_my_id_parallel!` can be sent to workers using `asyncmap` and the usual `fetch @spawnat` commands:

In [30]:
@time asyncmap(fetch, (@spawnat p give_my_id_parallel!(localpart(a), p-1)) for p in procs(a))

 11.746890 seconds (460.22 k allocations: 23.943 MiB)


1×1×3 Array{Nothing,3}:
[:, :, 1] =
 nothing

[:, :, 2] =
 nothing

[:, :, 3] =
 nothing

As expected, execution time is divided by approximately 3.
By inspecting `a`, one sees that it has the correct dimension and values.

In [31]:
@time [@spawnat p println(size(localpart(a))) for p in procs(a)]

  0.043735 seconds (121.68 k allocations: 6.261 MiB)


1×1×3 Array{Future,3}:
[:, :, 1] =
 Future(2, 1, 14504, nothing)

[:, :, 2] =
 Future(3, 1, 14505, nothing)

[:, :, 3] =
 Future(4, 1, 14506, nothing)

      From worker 3:	(100, 100, 1)
      From worker 2:	(100, 100, 1)
      From worker 4:	(100, 100, 1)


In [33]:
map(fetch, (@spawnat p size(localpart(a))) for p=procs(a) )

1×1×3 Array{Tuple{Int64,Int64,Int64},3}:
[:, :, 1] =
 (100, 100, 1)

[:, :, 2] =
 (100, 100, 1)

[:, :, 3] =
 (100, 100, 1)

Let's check that we have the right values:

In [36]:
for k=1:3
    println(a[1,1,k])
end

1.0
2.0
3.0


## IV. Using SharedArrays

`SharedArrays` are `DistributedArrays` for which the **chunks are equal to the full array**. That is, each worker receives the entire array.


In [41]:
@time b = SharedArray(zeros(dim1,dim2,dim3));

@everywhere function give_my_id!(x::SharedArray{Float64,3}, indexDim3)
    for indexDim2 = 1:dim2
        for indexDim1 = 1:dim1
            x[indexDim1, indexDim2, indexDim3] = indexDim3
            sleep(0.0)
        end
    end
end

@time asyncmap(fetch, (@spawnat p give_my_id!(b, p)) for p=1:nworkers())

  0.493586 seconds (361.09 k allocations: 19.209 MiB, 2.90% gc time)
 12.050383 seconds (1.26 M allocations: 62.050 MiB, 0.09% gc time)


3-element Array{Nothing,1}:
 nothing
 nothing
 nothing

Once again, execution time is approximately divided by  3.
Inspecting the array `b`, we see that we have the correct result

In [42]:
for k=1:3
    println(b[1,1,k])
end

1.0
2.0
3.0


## V. Timing initialization and execution

In [43]:
function initialize_solve_serial()
    
    # initialization
    k = zeros(dim1,dim2,dim3)
    
    # calculation
    wrapper(k)
    
    return k
end


function initialize_solve_native()
    
    # initialization
    asyncmap(fetch, (@spawnat p eval(:(myLocalArray = zeros(dim1, dim2, 1)))) for p in workers())

    # calculation
    asyncmap(fetch, (@spawnat p give_my_id_parallel!(myLocalArray, p-1)) for p in workers())
    
    # recombine results
    k = recombine_data(:myLocalArray)
end

function initialize_solve_shared()
    
    # initialization
    k = SharedArray(zeros(dim1,dim2,dim3));
    
    # calculation
    asyncmap(fetch, (@spawnat p give_my_id!(k, p)) for p=1:nworkers())
    
    return k
end

function initialize_solve_distributed()
    
    @sync begin 
        k = dzeros((dim1,dim2,dim3), workers(), [1,1,nworkers()]);
    end
    
    asyncmap(fetch, (@spawnat p give_my_id_parallel!(localpart(a), p-1)) for p in procs(a))
    
    return k
    
bend

initialize_solve_distributed (generic function with 1 method)

To make the benchmark more accurate, let's use `BenchmarkTools` rather than the usual `@time` macro

In [44]:
using BenchmarkTools

In [45]:
@btime initialize_solve_serial()

  35.596 s (151994 allocations: 4.77 MiB)


100×100×3 Array{Float64,3}:
[:, :, 1] =
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0

In [46]:
@btime initialize_solve_native()

  11.995 s (1723 allocations: 767.14 KiB)


100×100×3 Array{Float64,3}:
[:, :, 1] =
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0

In [47]:
@btime initialize_solve_distributed()

  11.451 s (7776 allocations: 416.05 KiB)


100×100×3 Array{Float64,3}:
[:, :, 1] =
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0

In [48]:
@btime initialize_solve_shared()

  11.485 s (51442 allocations: 1.78 MiB)


100×100×3 SharedArray{Float64,3}:
[:, :, 1] =
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0  …  1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.0  1.0  1.0  1.0  1.0     1.0  1.0  1.0  1.0  1.0  1.0  1.0
 1.0  1.0  1.0  1.

## VI. Benchmark Results


### Serial
* 35.596 s (151994 allocations: 4.77 MiB)

### Native Julia
* 11.995 s (1723 allocations: 767.14 KiB)

### DistributedArrays
* 11.451 s (7776 allocations: 416.05 KiB)

### SharedArrays
* 11.485 s (51442 allocations: 1.78 MiB)


This benchmark suggests that `DistributedArrays` is as good as native Julia, if not better.
This benchmark also illustrates that there is a small (marginal) penalty associated to using `SharedArrays`, associated with the cost of copying the entire data to each worker, but this penalty is likely to be marginal when each calculation takes more time (look at the allocations). 

---

## Appendix

### Version

In [10]:
versioninfo()

Julia Version 1.5.2
Commit 539f3ce943 (2020-09-23 23:17 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Core(TM) i7-8850H CPU @ 2.60GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, skylake)


### Safety checks

Let's make sure that all the results are equal.
For the tests to make sense, we need to convert DistributedArrays and SharedArrays to native Julia Arrays.

In [None]:
using Base.Test

# Conversion
aArray = convert(Array{Float64,3}, a)
bArray = convert(Array{Float64,3}, b)

# Tets
println(@test t==aArray)
println(@test t==bArray)