

# Distributed arrays

Parallel computing is a mess. There are too many kinds of parallelism:
- MPI
- CUDA
- OpenMP
- Threads

Idea: `DistributedArrays` *will* be (it is not quite there yet) the best and easiest way to do parallel computing.

It's an *easy* form of parallelism, when it works.

A distributed array is an array that lives on several processors: each processor holds a part of the array (its "local part"). The array is **partitioned** across the different processors.
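To make "partitioned" concrete, here is a minimal sketch in plain Julia of one way to split the indices `1:n` into contiguous chunks, one per processor. The helper `partition_ranges` is hypothetical and only illustrates the idea; it is not a function from `DistributedArrays`, and the package may choose its cuts differently.

```julia
# Hypothetical helper, not part of DistributedArrays: split 1:n into p
# contiguous, nearly equal index ranges, one per processor.
function partition_ranges(n::Integer, p::Integer)
    ranges = UnitRange{Int}[]
    start = 1
    for k in 1:p
        # The first `n % p` chunks get one extra element.
        len = div(n, p) + (k <= n % p ? 1 : 0)
        push!(ranges, start:(start + len - 1))
        start += len
    end
    return ranges
end

partition_ranges(1000, 2)   # [1:500, 501:1000]
partition_ranges(10, 3)     # [1:4, 5:7, 8:10]
```

Each range plays the role of one processor's "local part": together the ranges cover every index exactly once.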

Note that the idea of referring to a vector as just one object, `v`, was a big deal in Matlab / Python etc.

Now `v` will refer to an array that is a more complicated object.

A distributed array (type `DArray`) will just **look like** (to us) a standard Julia array. The complicated stuff inside is hidden by an **abstraction**.
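The idea of hiding the pieces behind an abstraction can be sketched with a toy type in plain Julia. The type `ChunkedVector` below is hypothetical and is not how `DArray` is implemented (all of its chunks live in one process); it only shows that once a type implements the `AbstractArray` interface, generic code treats it like any other vector.

```julia
# Toy illustration only: a vector stored as several chunks that still
# behaves like one ordinary vector. This is NOT how DArray is implemented.
struct ChunkedVector{T} <: AbstractVector{T}
    chunks::Vector{Vector{T}}
end

# The two methods required for a read-only AbstractArray.
Base.size(v::ChunkedVector) = (sum(length, v.chunks),)

function Base.getindex(v::ChunkedVector, i::Int)
    @boundscheck checkbounds(v, i)
    # Walk through the chunks until the index falls inside one of them.
    for c in v.chunks
        i <= length(c) && return c[i]
        i -= length(c)
    end
    throw(BoundsError(v, i))
end

v = ChunkedVector([[1, 2, 3], [4, 5]])
length(v)               # 5
v[4]                    # 4
sum(v)                  # 15
v == [1, 2, 3, 4, 5]    # true
```

Functions such as `sum` and `==` were never told about chunks; they only use the array interface.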

In [1]:
using Pkg; Pkg.add("DistributedArrays")

  Updating registry at `~/.julia/registries/General`
  Updating git-repo `https://github.com/JuliaRegistries/General.git`
  Updating `~/.julia/environments/v1.0/Project.toml`
 [no changes]
  Updating `~/.julia/environments/v1.0/Manifest.toml`
 [no changes]


In [2]:
using Distributed
# Add processes:
addprocs(2)

2-element Array{Int64,1}:
 2
 3

In [3]:
using DistributedArrays

In [4]:
# Load the "distributed arrays" package on every process (master and workers)
@everywhere using DistributedArrays

## Parallelize it (where "it" is embarrassingly parallel)

In [5]:
procs()

3-element Array{Int64,1}:
 1
 2
 3

In [6]:
workers()

2-element Array{Int64,1}:
 2
 3
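The difference between `procs()` and `workers()` is that process 1 is the master process that runs the notebook, while the workers are the extra processes added with `addprocs`. A minimal sketch using only the standard `Distributed` library, assuming local worker processes can be started on this machine:

```julia
using Distributed

# Start two local worker processes if none exist yet.
nprocs() == 1 && addprocs(2)

myid()                        # 1: the id of the master process
nprocs() == nworkers() + 1    # true once workers have been added

# Ask each worker for its own process id.
ids = [(@fetchfrom w myid()) for w in workers()]
ids == workers()              # true
```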

In [7]:
a = [1:1000;]

1000-element Array{Int64,1}:
    1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
    ⋮
  989
  990
  991
  992
  993
  994
  995
  996
  997
  998
  999
 1000

In [8]:
# Distribute the data:

D = distribute(a)

1000-element DArray{Int64,1,Array{Int64,1}}:
    1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
    ⋮
  989
  990
  991
  992
  993
  994
  995
  996
  997
  998
  999
 1000

In [9]:
T = typeof(D)

DArray{Int64,1,Array{Int64,1}}

In [10]:
supertype(T)

AbstractArray{Int64,1}

To get at the information inside the object, type `D.` and press TAB to list its fields, or ask for the field names directly:

In [11]:
fieldnames(typeof(D))

(:id, :dims, :pids, :indices, :cuts, :localpart, :release)

The `indices` field records which piece of the `DArray` is stored on each worker:

In [12]:
D.indices

2-element Array{Tuple{UnitRange{Int64}},1}:
 (1:500,)   
 (501:1000,)
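The same information can also be requested from the workers themselves instead of reading the field directly. The following is a sketch that assumes a version of `DistributedArrays` exporting `localindices` and defining `procs` for a `DArray`; it sets up its own small session so that it can be run on its own.

```julia
using Distributed
nprocs() == 1 && addprocs(2)          # start workers if there are none
@everywhere using DistributedArrays

D = distribute([1:1000;])

# Ask every process that holds a piece which indices it owns.
for p in procs(D)
    idx = @fetchfrom p localindices(D)
    println("process ", p, " holds indices ", idx)
end

# Number of elements stored on each of those processes.
counts = [(@fetchfrom p length(localpart(D))) for p in procs(D)]
sum(counts) == length(D)              # true: the pieces cover the whole array
```

With two workers, as in the session above, the printed ranges should match the two entries of `D.indices`.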

We want to write the *same* code and have it "just work":

In [13]:
dD = map(t -> t^2, D)  # dD is the distributed answer

1000-element DArray{Int64,1,Array{Int64,1}}:
       1
       4
       9
      16
      25
      36
      49
      64
      81
     100
     121
     144
     169
       ⋮
  978121
  980100
  982081
  984064
  986049
  988036
  990025
  992016
  994009
  996004
  998001
 1000000

In [14]:
# The same map applied to the ordinary (non-distributed) array looks identical

dD == map(t->t^2, a)  # the comparison gathers dD back onto the master process

true

In [15]:
@fetchfrom 3 localpart(dD)  # the part of the result computed by process 3 (the second worker)

500-element Array{Int64,1}:
  251001
  252004
  253009
  254016
  255025
  256036
  257049
  258064
  259081
  260100
  261121
  262144
  263169
       ⋮
  978121
  980100
  982081
  984064
  986049
  988036
  990025
  992016
  994009
  996004
  998001
 1000000
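Because every worker can reach its own local part, a reduction can be done in two stages: each worker reduces its piece, and the master combines the partial results. Below is a minimal sketch of that pattern using `localpart` and `@fetchfrom` as above; it assumes `procs` is defined for a `DArray`, and it sets up its own session so that it can be run on its own.

```julia
using Distributed
nprocs() == 1 && addprocs(2)          # start workers if there are none
@everywhere using DistributedArrays

D  = distribute([1:1000;])
dD = map(t -> t^2, D)                 # distributed vector of squares

# Stage 1: each process sums only the piece it stores.
partials = [(@fetchfrom p sum(localpart(dD))) for p in procs(dD)]

# Stage 2: the master adds up the partial sums.
total = sum(partials)
total == sum(t -> t^2, 1:1000)        # true
```

Only one number per worker travels back to the master, rather than the whole array.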

In [16]:
@everywhere using Dates

In [17]:
# Distributed vectors are not restricted to numerical types

map(t -> Dates.monthname((t - 1) % 12 + 1), D)

1000-element DArray{String,1,Array{String,1}}:
 "January"  
 "February" 
 "March"    
 "April"    
 "May"      
 "June"     
 "July"     
 "August"   
 "September"
 "October"  
 "November" 
 "December" 
 "January"  
 ⋮          
 "May"      
 "June"     
 "July"     
 "August"   
 "September"
 "October"  
 "November" 
 "December" 
 "January"  
 "February" 
 "March"    
 "April"    

In [None]:
# A slightly more complicated example of map and reduce

sentences = map(t -> Dates.monthname((t - 1) % 12 + 1) * " is my favorite month.\n", D)
monthString = reduce(*, Array(sentences))
println(monthString)

In [19]:
# Distributed array comprehension

D55 = @DArray [randn(500,500) for i = 1:32]

32-element DArray{Array{Float64,2},1,Array{Array{Float64,2},1}}:
 [0.540243 -1.09863 … -0.0320705 -2.13346; 1.47592 -2.45283 … -1.79427 -1.35625; … ; 1.46436 0.562266 … -1.61478 0.754182; -0.00183824 0.657098 … -0.128423 -0.630894]     
 [-0.748836 0.724214 … 2.6096 -0.513093; 2.24875 2.36366 … 0.420865 1.21235; … ; -0.875299 0.144333 … 1.59832 1.62788; 0.446644 -1.54994 … -0.880385 0.00976228]           
 [0.465207 -0.00523897 … 2.05525 -0.640681; 1.0202 0.193896 … 0.625545 0.0710984; … ; -0.640639 0.932369 … 0.544009 -0.670359; 0.00330771 0.373471 … 0.267226 -0.0792813]  
 [0.629801 1.06126 … 0.520062 -0.0844751; 1.11461 0.0411959 … -1.70078 0.0721042; … ; 1.16154 0.161893 … 0.683445 -2.00771; -0.0486126 0.292486 … 0.513096 -1.05025]       
 [2.0821 0.401563 … -0.678405 0.0764716; 0.481985 0.029227 … -1.05093 -0.831277; … ; 2.04734 -1.5946 … 1.43547 -1.52479; -0.690757 0.485431 … -0.081544 -0.491193]         
 [-0.543271 -0.711215 … 0.306203 1.02335; -0.0745167 0.97604 … 0.771504 -0.

In [20]:
using LinearAlgebra

In [29]:
# Compute singular values of the distributed vector of matrices: 

@time Dsvd = map(svdvals, D55);

  0.963862 seconds (7.53 k allocations: 415.067 KiB)


In [23]:
# Use a single BLAS thread on the master process, so the serial run below is single-threaded
BLAS.set_num_threads(1)

In [24]:
# The same kind of data as an ordinary (non-distributed) vector of matrices
d5 = [randn(500,500) for i = 1:32];

In [30]:
# Serial baseline, for comparison with the distributed run
@time map(svdvals, d5);

  1.631661 seconds (326 allocations: 70.335 MiB)
