

# Distributed arrays

Parallel computing is a mess! There are many different models and tools for parallelism:

- MPI
- CUDA
- OpenMP
- Threads

Idea: `DistributedArrays` provides one of the easiest routes to parallel computing when the problem is "embarrassingly parallel", i.e. when we want different processes to do the same thing independently, each on its own piece of the data. It's an *easy* form of parallelism, when it works.
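
For comparison, the `Distributed` standard library (which `DistributedArrays` builds on) already covers the simplest embarrassingly parallel pattern with `pmap`, which applies the same function to independent inputs on different workers. A minimal sketch, assuming two local workers are enough; `slow_square` is a hypothetical stand-in for a real computation:

```julia
using Distributed

# Start two local worker processes if only the master is running.
if nprocs() == 1
    addprocs(2)
end

# Hypothetical stand-in for an expensive, independent computation.
# It must be defined on every process, hence @everywhere.
@everywhere function slow_square(x)
    sleep(0.1)   # pretend to do some work
    return x^2
end

# pmap hands each element to a free worker; the tasks never communicate.
results = pmap(slow_square, 1:8)
```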

A **distributed array** is an array that lives on several processors (or cores): each processor holds a part of the array (its "local part"). The array is **partitioned** among the different processors.

A distributed array (type `DArray`) will **look (to us) like a standard Julia array**. The partitioning among the processors should be **hidden**; i.e. it is an **abstraction**. (Hence `DArray` is a subtype of `AbstractArray`: something that *behaves like* an array.)
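
To see what "hidden partitioning" means, here is a toy, single-process sketch. The `ChunkedVector` type is hypothetical and not part of `DistributedArrays`: it stores its elements in separate chunks, but because it implements the `AbstractArray` interface, generic functions such as `sum`, `collect` and broadcasting work on it unchanged. A `DArray` applies the same idea, with its chunks living on different processes.

```julia
# Toy illustration (hypothetical type, not part of DistributedArrays):
# the elements are stored in separate chunks, but the AbstractArray
# interface hides the partitioning from the user.
struct ChunkedVector{T} <: AbstractVector{T}
    chunks::Vector{Vector{T}}
    offsets::Vector{Int}   # offsets[k] = number of elements stored before chunk k
end

function ChunkedVector(chunks::Vector{Vector{T}}) where {T}
    offsets = cumsum([0; length.(chunks)[1:end-1]])
    return ChunkedVector{T}(chunks, offsets)
end

# AbstractArray interface: size and getindex are required, IndexStyle is optional.
Base.size(v::ChunkedVector) = (sum(length, v.chunks),)
Base.IndexStyle(::Type{<:ChunkedVector}) = IndexLinear()

function Base.getindex(v::ChunkedVector, i::Int)
    @boundscheck checkbounds(v, i)
    k = searchsortedlast(v.offsets, i - 1)   # last chunk that starts at or before i
    return v.chunks[k][i - v.offsets[k]]
end

v = ChunkedVector([[1, 2, 3], [4, 5], [6]])
sum(v)      # 21, computed by the generic AbstractArray code
v .^ 2      # broadcasting also works and returns an ordinary Vector
```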

To run on a cluster, use the `ClusterManagers.jl` package to control the acquisition of processors (for example, from a batch scheduler).

In [None]:
using Pkg
Pkg.add("DistributedArrays")

In [None]:
using DistributedArrays  # precompile on one process

In [None]:
# Add processes (addprocs lives in the Distributed standard library):
using Distributed
addprocs(4)

# Load the package on each process:
@everywhere using DistributedArrays

In [None]:
procs()  # the master process (id 1) plus 4 worker processes

In [None]:
@macroexpand @everywhere using DistributedArrays  # show what @everywhere expands to

In [None]:
rmprocs(2:5)  # remove the 4 workers again

In [None]:
workers()

In [None]:
addprocs()  # with no argument: one worker per logical core (Sys.CPU_THREADS)

In [None]:
workers()

In [None]:
rmprocs(workers())

In [None]:
workers()

In [None]:
addprocs(4)

In [None]:
@everywhere using DistributedArrays

In [None]:
# Make some data:

a = 1:10^3  

b = a .^ 2

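`a` is a range (`1:1000`) and `b` is an ordinary `Vector`. As a serial baseline, let's time how long it takes to sum the squares of a million random numbers, once with an array comprehension and once with a generator: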

In [None]:
function bench_sum()
    a = rand(10^6)
    
    @time sum([t^2 for t in a])
    @time sum(t^2 for t in a)
end

In [None]:
bench_sum()
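
One visible difference between the two timings is memory: the comprehension `[t^2 for t in a]` first builds a temporary array of a million elements, while the generator `(t^2 for t in a)` feeds values to `sum` one at a time. A small sketch to check this with `@allocated` (the function names `comp_sum` and `gen_sum` are our own); exact byte counts depend on the Julia version:

```julia
# Same two forms as in bench_sum, wrapped in functions so they get compiled.
comp_sum(a) = sum([t^2 for t in a])   # allocates a temporary Vector{Float64}
gen_sum(a)  = sum(t^2 for t in a)     # generator: no temporary array

a = rand(10^6)
comp_sum(a); gen_sum(a)               # first calls trigger compilation

bytes_comp = @allocated comp_sum(a)   # at least 8 bytes per element
bytes_gen  = @allocated gen_sum(a)    # close to zero
println((bytes_comp, bytes_gen))
```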

## Pleasantly parallel (independent calculations)

In [None]:
procs()

One "master" process and 4 "workers".

In [None]:
a = rand(10^6);  # standard Julia object

In [None]:
d = distribute(a)  # split a into pieces and copy them to the workers

The `distribute` command requires a transfer of data between processes. Instead, we can create the `DArray` directly on the different processes, so that each worker builds its own piece:

In [None]:
D = @DArray [i+j for i in 1:10, j in 1:10]
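
The same trade-off can be seen with the plain `Distributed` standard library. In this sketch (our own illustration, not how `DistributedArrays` is implemented) the first call ships a whole array created on the master to a worker, whereas the second creates the array on the worker itself, so only the final scalar travels back:

```julia
using Distributed
if nprocs() == 1
    addprocs(2)          # start two local workers if none exist
end
w = first(workers())

# Option 1: create the data on the master, then send all of it to worker w.
a = rand(10^6)
s1 = remotecall_fetch(sum, w, a)      # `a` (about 8 MB) is serialized and sent

# Option 2: create the data directly on worker w.
r = @spawnat w rand(10^6)             # a Future; the array itself stays on w
s2 = remotecall_fetch(x -> sum(fetch(x)), w, r)   # only the Float64 sum comes back
```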

We can see which pieces of the array live on each worker:

In [None]:
D.indices  # index block owned by each worker (this field was called `indexes` in older versions)
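
The output lists, for every worker, the block of indices that it owns. As a rough sketch of how one dimension of length `n` can be cut into `p` nearly equal contiguous blocks (the helper `block_ranges` is hypothetical; `DistributedArrays` uses its own rule, which may place the cuts differently):

```julia
# Hypothetical helper: cut 1:n into p contiguous blocks whose lengths
# differ by at most one, as one might assign pieces to p workers.
function block_ranges(n::Integer, p::Integer)
    base, extra = divrem(n, p)
    ranges = UnitRange{Int}[]
    start = 1
    for k in 1:p
        len = base + (k <= extra ? 1 : 0)   # the first `extra` blocks get one more element
        push!(ranges, start:(start + len - 1))
        start += len
    end
    return ranges
end

block_ranges(10, 4)   # [1:3, 4:6, 7:8, 9:10]
```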

We can create a distributed random matrix:

In [None]:
drand(10, 10)

In [None]:
T = typeof(D)

In [None]:
supertype(T)

In [None]:
show(D)  # requires data transfer

In [None]:
fieldnames(typeof(D))  # fieldnames takes a type, not an instance

We want to write *the same code* and have it *just work* on a DistributedArray:

In [None]:
[x^2 for x in D]

Note that the result is an ordinary `Array`, *not* a distributed array, so the data has been transferred to the master process.

In [None]:
@everywhere f(t) = t^2  # define the function on every process (master and workers)

f.(D)

In [None]:
f(t) = t^2  # redefined on the master only; the workers still use the definition sent by @everywhere above
dD = f.(D)

In [None]:
@everywhere f(t) = t^2

dD = map(f, D)
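
Why `@everywhere`? Each worker is a separate Julia process with its own memory, so a named function defined only on the master is unknown to the workers. A minimal sketch with the `Distributed` standard library (the function name `h` is arbitrary):

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end
w = first(workers())

h(t) = t^2                        # defined on the master process only

failed_before = try
    remotecall_fetch(h, w, 3)     # the worker cannot find `h`
    false
catch err
    true                          # typically a RemoteException wrapping an UndefVarError
end

@everywhere h(t) = t^2            # now defined on every process
result = remotecall_fetch(h, w, 3)
```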

In [None]:
# map on a distributed array looks identical to the non-distributed case;
# its result is again a DArray, which we compare with the broadcast result dD

dD == map(t->t^2, D)

In [None]:
@fetchfrom first(workers()) localpart(dD)  # the piece computed by the first worker

In [None]:
import DistributedArrays.localpart
# Our own convenience method: fetch the local part stored on process p
localpart(dD::DArray, p::Integer) = @fetchfrom p localpart(dD)

In [None]:
localpart(dD, first(workers()))

Remember: NEVER do performance comparisons in global scope; always do them inside a function.
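
The reason is that a non-constant global variable can change type at any moment, so the compiler cannot specialize code that reads it. In this sketch (the function names are hypothetical), the loop over the global `x` allocates heavily, while the same loop over a function argument does not:

```julia
x = rand(10^6)        # non-constant global: its type could change at any time

function sum_global()
    s = 0.0
    for t in x        # the compiler cannot infer the type of `x` here
        s += t
    end
    return s
end

function sum_arg(v)
    s = 0.0
    for t in v        # compiled for the concrete type of `v`
        s += t
    end
    return s
end

sum_global(); sum_arg(x)                 # compile both first
alloc_global = @allocated sum_global()   # many small allocations
alloc_arg    = @allocated sum_arg(x)     # (almost) none
```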

In [None]:
@everywhere begin
    using DistributedArrays
    using BenchmarkTools
end

In [None]:
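function compare_timings()

    # serial: 500 random 100x100 matrices
    a = [rand(100, 100) for i in 1:500]
    display(@benchmark map(t->t^2, $a))  # `$a` splices the local variable into the benchmark

    # parallel: the same matrices, distributed over the workers
    da = distribute(a)
    display(@benchmark map(t->t^2, $da))
end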

compare_timings()
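
Note that for a matrix `t`, `t^2` in the benchmark above is the matrix product `t*t`, not the elementwise square, so each of the 500 tasks is a 100×100 matrix multiplication. A quick check of the difference:

```julia
A = [1 2; 3 4]

A^2     # matrix product A*A:  [7 10; 15 22]
A .^ 2  # elementwise square:  [1 4; 9 16]
```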

In [None]:
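# Distributed arrays are not restricted to numeric element types
@everywhere using Dates   # Dates is a standard library; the mapped function runs on the workers

map(t -> Dates.monthname((t - 1) % 12 + 1), D)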
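
The expression `(t - 1) % 12 + 1` folds any positive integer onto a month number between 1 and 12. A serial sketch of the same mapping (the name `month_index` is ours); `mod1(t, 12)` is the built-in equivalent for positive `t` and, unlike `%`, also stays in range for `t ≤ 0`:

```julia
using Dates

month_index(t) = (t - 1) % 12 + 1

month_index.([1, 12, 13, 20])          # [1, 12, 1, 8]
Dates.monthname(month_index(20))       # "August"

# mod1 agrees for positive integers and stays in 1:12 for zero or negative ones.
mod1(0, 12), month_index(0)            # (12, 0): only mod1 gives a valid month
```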

In [None]:
# A slightly more complicated example of map and reduce

monthString = map(t -> Dates.monthname((t - 1) % 12 + 1) * " is my favorite month.\n", D) |>
    t -> reduce(*, Array(t))
println(monthString)
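
Since string concatenation with `*` is associative, the `map` followed by `reduce` can also be written as a single `mapreduce`, or with `join`. A serial sketch on an ordinary vector (the helper `phrase` is ours):

```julia
using Dates

phrase(t) = Dates.monthname(mod1(t, 12)) * " is my favorite month.\n"

ts = [2, 3, 4]
s1 = reduce(*, map(phrase, ts))   # two steps, as in the cell above
s2 = mapreduce(phrase, *, ts)     # one pass, no intermediate array of strings
s3 = join(phrase.(ts))            # concatenation via join
print(s2)
```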

In [None]:
# Distributed array comprehension

D55 = @DArray [randn(5,5) for i = 1:32]

In [None]:
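# Compute eigenvalues of the distributed vector of matrices
# (eigvals lives in the LinearAlgebra standard library, needed on every process):
@everywhere using LinearAlgebra

Deig = map(eigvals, D55)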
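
As a baseline for the exercise below, the same computation can be done serially on an ordinary `Vector`, or with `pmap` from the `Distributed` standard library. A sketch, assuming local workers; for 5×5 matrices each task is tiny, so communication overhead may well dominate, and larger matrices are more likely to show a speedup:

```julia
using Distributed, LinearAlgebra
if nprocs() == 1
    addprocs(2)
end
@everywhere using LinearAlgebra      # eigvals must be available on every worker

mats = [randn(5, 5) for i in 1:32]

serial_eigs   = map(eigvals, mats)   # runs on the master process only
parallel_eigs = pmap(eigvals, mats)  # one matrix per task, spread over the workers

# Both calls above already compiled the code, so these timings exclude compilation.
t_serial   = @elapsed map(eigvals, mats)
t_parallel = @elapsed pmap(eigvals, mats)
println((t_serial, t_parallel))
```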

**Exercise**: Time `map(eigvals, D55)` against `map(eigvals, Array(D55))` on the master process. Is it 4 times faster with 4 workers?