## 9. Parallel Computing


# `sum`: recalling the performance lecture, an easy enough function to understand

The **sum** function `sum(a)` computes

$$
\mathrm{sum}(a) = \sum_{i=1}^n a_i.
$$

In [None]:
#Pkg.add("BenchmarkTools")
using BenchmarkTools

## 1a. Serial Julia (as fast as handwritten C)

In [None]:
function jsum(A)
    s = 0.0  # Float64 accumulator; s = zero(eltype(A)) would work for any element type
    for a in A
        s += a
    end
    s
end
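`jsum` is a plain loop. A common refinement, sketched here as a hypothetical `jsum_simd`, makes the accumulator generic with `zero(eltype(A))` (as the comment in `jsum` suggests) and lets the compiler vectorize the loop with `@inbounds @simd`. Because `@simd` allows the additions to be reordered, the floating-point result can differ from `jsum` in the last bits.

```julia
# Hypothetical variant of jsum: generic accumulator plus SIMD vectorization
function jsum_simd(A)
    s = zero(eltype(A))              # accumulator has the array's element type
    @inbounds @simd for i in eachindex(A)
        s += A[i]                    # @simd permits reassociating these additions
    end
    return s
end

jsum_simd([1.0, 2.0, 3.0])
```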

In [None]:
a = rand(10^7) # array of random numbers, uniform on [0,1)
j_bench = @benchmark jsum(erf($a))
minimum(j_bench.times)/1e6   # fastest run, converted from nanoseconds to milliseconds

## 1b. Built-in Julia `sum`

In [None]:
sum_bench = @benchmark sum(erf($a))
minimum(sum_bench.times)/1e6
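Part of the difference between `jsum` and the built-in `sum` is the algorithm: for floating-point arrays, Base's `sum` uses pairwise (recursive halving) summation, which vectorizes well and accumulates less rounding error than a single running total. A simplified sketch; the name `pairwise_sum` and the cutoff `PAIRWISE_BLOCK = 128` are assumptions chosen for illustration, not Base's actual code or block size.

```julia
# Hypothetical base-case size: below this, fall back to a simple loop
const PAIRWISE_BLOCK = 128

# Pairwise summation of a[lo:hi]: split in half, sum each half, add the two results
function pairwise_sum(a::AbstractVector, lo::Int = 1, hi::Int = length(a))
    if hi - lo < PAIRWISE_BLOCK
        s = zero(eltype(a))
        @inbounds @simd for i in lo:hi
            s += a[i]
        end
        return s
    end
    mid = (lo + hi) >>> 1            # midpoint without overflow
    return pairwise_sum(a, lo, mid) + pairwise_sum(a, mid + 1, hi)
end

pairwise_sum(collect(1.0:1000.0))
```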

## 2a. Distributed Computing with DArrays (my favorite for the high level; I wish it were more complete)

In [None]:
workers()

In [None]:
Pkg.add("DistributedArrays")
using DistributedArrays

In [None]:
A = distribute(a)  # one could have used A = drand(10^7) for a parallel constructor
A.indexes

In [None]:
addprocs(2)

In [None]:
workers() # processes 2 and 3 are workers (1 is the master), so two processes share the parallel work

In [None]:
@everywhere using DistributedArrays
A = distribute(a)
A.indexes
A.pids
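`A.indexes` shows how `distribute` cut the index range into one contiguous block per worker. A minimal sketch of such a split; the helper name `chunk_ranges` is hypothetical, and DistributedArrays' own rule may place the leftover elements differently, so compare with what `A.indexes` reports.

```julia
# Hypothetical helper: split 1:n into p contiguous, nearly equal ranges.
# The first r = n % p ranges each get one extra element.
function chunk_ranges(n::Integer, p::Integer)
    d, r = divrem(n, p)
    ranges = UnitRange{Int}[]
    lo = 1
    for k in 1:p
        len = d + (k <= r ? 1 : 0)
        push!(ranges, lo:(lo + len - 1))
        lo += len
    end
    return ranges
end

chunk_ranges(10^7, 2)   # two halves: 1:5000000 and 5000001:10000000
chunk_ranges(10, 4)     # 1:3, 4:6, 7:8, 9:10
```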

In [None]:
sum(erf(A))-sum(erf(a))
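The difference above need not be exactly zero. The distributed sum adds the same numbers in a different order (per-worker partial sums, then their total), and floating-point addition is not associative. A three-number illustration:

```julia
x = (0.1 + 0.2) + 0.3   # left to right, like a serial loop
y = 0.1 + (0.2 + 0.3)   # a different grouping, like combining partial sums
println(x == y)          # false
println(x - y)           # on the order of one unit in the last place
```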

In [None]:
minimum((@benchmark sum(erf($a))).times)/1e6

In [None]:
minimum((@benchmark sum(erf($A))).times)/1e6

In [None]:
addprocs(2)

In [None]:
workers()

In [None]:
@everywhere using DistributedArrays

In [None]:
A = distribute(a);

In [None]:
minimum((@benchmark sum(erf($A))).times)/1e6

In [None]:
A.indexes

In [None]:
addprocs(1)
workers()

In [None]:
@everywhere using DistributedArrays

In [None]:
minimum((@benchmark sum(erf($A))).times)/1e6   # A was created before addprocs(1), so it still spans only the earlier workers

In [None]:
A = distribute(a)  # one could have used A = drand(10^7) for a parallel constructor
sum(A) - sum(a)

In [None]:
minimum((@benchmark sum($A)).times)/1e6

In [None]:
summary(A)

In [None]:
@which sum(A)

In [None]:
@which reduce(+,A)

In [None]:
A = drand(100,100)

In [None]:
@which reduce(+,A)

## 2b. How do we implement DArray functionality? Answer: bootstrap from serial functionality.
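The parallel versions of `sum` in this section rest on one identity: split the indices into disjoint blocks $I_1, \dots, I_P$, one per process, sum each block locally with the serial code, then add the $P$ partial sums.

$$
\mathrm{sum}(a) = \sum_{i=1}^n a_i = \sum_{p=1}^{P} \Big( \sum_{i \in I_p} a_i \Big),
\qquad I_1 \cup \dots \cup I_P = \{1, \dots, n\}, \quad I_p \cap I_q = \emptyset \ (p \neq q).
$$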

In [None]:
#@everywhere using DistributedArrays
A = drand(10^7)

In [None]:
a = convert(Array{Float64,1}, A)  # gather the distributed array into an ordinary Array on the master

In [None]:
# Simulated Parallel Algorithm: sum the block each worker owns, then add the partial sums
s = sum.([a[i[1]] for i in A.indexes])
display(s)
sum(s)

In [None]:
# Actual Parallel Algorithm

In [None]:
for p in A.pids
    display(fetch(@spawnat p sum(localpart(A))))
end

In [None]:
results = Float64[]
@sync for p in A.pids
     @async push!(results, fetch(@spawnat p sum(localpart(A))))
end

println(results)
sum(results)
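On Julia 1.x the same bootstrap idea works without DistributedArrays, using only the `Distributed` standard library (where `workers`, `@spawnat` and `fetch` live there). In this sketch, whose function name `chunked_remote_sum` is hypothetical, the data starts on the master and each worker is sent only its chunk on every call; a `DArray`, by contrast, keeps its chunks on the workers between calls.

```julia
using Distributed   # Julia 1.x: workers, @spawnat, fetch, Future

# Hypothetical helper: sum x by sending one contiguous chunk to each worker
function chunked_remote_sum(x::Vector{Float64})
    ps  = workers()                  # just [1] (the master) if addprocs was never called
    n   = length(x)
    len = cld(n, length(ps))         # chunk length, rounded up
    futures = Future[]
    for (k, p) in enumerate(ps)
        lo, hi = (k - 1) * len + 1, min(k * len, n)
        chunk = x[lo:hi]                        # slice on the master; may be empty
        push!(futures, @spawnat p sum(chunk))   # only `chunk` is serialized to p
    end
    return sum(fetch.(futures))      # combine the partial sums on the master
end

x = rand(10^6)
chunked_remote_sum(x) - sum(x)       # tiny: same numbers, different summation order
```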

In [None]:
sum(a)

In [None]:
std(A)

In [None]:
mean(A)

In [None]:
# What about a histogram?

In [None]:
Pkg.add("StatsBase")
using StatsBase

In [None]:
using PyPlot   # bar comes from PyPlot, so load it before plotting
w = fit(Histogram, rand(3000), 0:0.05:1.0).weights   # counts in 20 bins of width 0.05
bar(0:.05:.95,w,.05)

In [None]:
using PyPlot

In [None]:
A = drand(10^8)
@everywhere using StatsBase
@everywhere w1(x) = fit(Histogram,x,0:0.05:1.0).weights
results = Array{Int64,1}[]
@sync for p in A.pids
    @async push!(results, fetch(@spawnat p w1(localpart(A))))
end
sum(results)
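The histogram parallelizes for the same reason `sum` does: bin counts of disjoint chunks simply add. A StatsBase-free sketch; the helper `bincounts` is hypothetical and counts values into left-closed bins `[edges[k], edges[k+1])`, then checks that per-chunk counts add up to the counts of the whole array.

```julia
# Hypothetical helper: count the values of x in each bin [edges[k], edges[k+1])
function bincounts(x, edges)
    counts = zeros(Int, length(edges) - 1)
    for v in x
        k = searchsortedlast(edges, v)   # index of the last edge <= v
        if 1 <= k < length(edges)        # ignore values outside [first, last)
            counts[k] += 1
        end
    end
    return counts
end

x = rand(1000)
edges = 0:0.05:1.0
whole = bincounts(x, edges)
parts = bincounts(x[1:400], edges) .+ bincounts(x[401:end], edges)   # two "workers"
whole == parts
```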

In [None]:
nprocs()

## 3. @parallel  (looks like a "for loop", data starts and ends on master)

The history of Julia includes a back-and-forth between "for loop" notation and vectorized
notation. For some problems one form or the other (or both) is more natural or more
performant, but not for the old reason from dynamic languages that "for loops" must always
be avoided: in Julia, loops compile to fast native code.
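To make the point concrete, here is one statistic written both ways; the function names are illustrative. In Julia the hand-written loop is not slow merely because it is a loop, and the choice between forms can rest on clarity (or on avoiding temporaries, as `sum(abs2, x)` does).

```julia
# Explicit loop: sum of squares
function sumsq_loop(x)
    s = zero(eltype(x))
    for v in x
        s += v * v
    end
    return s
end

# Vectorized and functional forms of the same statistic
sumsq_vec(x)  = sum(x .^ 2)     # builds a temporary array of squares
sumsq_func(x) = sum(abs2, x)    # applies abs2 elementwise without a temporary

sumsq_loop([1, 2, 3]), sumsq_vec([1, 2, 3]), sumsq_func([1, 2, 3])
```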

In [None]:
foo(a) = @parallel (+) for aa in a
    aa # this could be a more complicated statistic
end
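In Julia 0.7/1.0, `@parallel` was renamed `@distributed` and moved, with the rest of this API, into the `Distributed` standard library. A minimal sketch of the same reduction for Julia 1.x; with no workers added it simply runs on the master process.

```julia
using Distributed   # Julia 1.x home of @distributed, addprocs, workers

# Same reduction as foo above: split the iterations across workers, combine with +
foo(a) = @distributed (+) for aa in a
    aa   # this could be a more complicated statistic
end

foo(collect(1:10))
```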

In [None]:
a

In [None]:
@benchmark foo($a)


In [None]:
@everywhere randstat(n) = maximum(eigvals(SymTridiagonal(randn(n),randn(n-1))))

In [None]:
workers()

In [None]:
@benchmark @parallel (+) for i in 1:250
    randstat(500) 
end

In [None]:
@benchmark mapreduce(randstat,+,500*dones(Int,250))
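For a few hundred expensive, independent calls like `randstat(500)`, `pmap` is the other standard tool in Julia 1.x: it hands each call to whichever worker is free, while `@distributed` splits the iterations evenly up front and suits many cheap iterations. A sketch assuming Julia 1.x, where `Distributed` and `LinearAlgebra` are standard libraries; the names `serial_total`, `samples` and `parallel_total` are illustrative.

```julia
using Distributed
@everywhere using LinearAlgebra

# Same statistic as above: largest eigenvalue of a random symmetric tridiagonal matrix
@everywhere randstat(n) = maximum(eigvals(SymTridiagonal(randn(n), randn(n - 1))))

# Serial reference: 250 samples on one process
serial_total = sum(randstat(500) for _ in 1:250)

# pmap sends each call to an available worker (the master itself if none were added).
# The draws are random, so the two totals differ.
samples = pmap(randstat, fill(500, 250))
parallel_total = sum(samples)
```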

## 4. MPI is possible for the die-hards and also some libraries

(See separate notebook)

## 5. Threads under development

In [None]:
function sim(n, seed = rand(UInt))
    nt = Threads.nthreads()
    ra = zeros(nt)                       # one partial sum per chunk
    d, r = divrem(n, nt)                 # split the n draws into nt chunks
    Threads.@threads for i in 1:nt
        rng = MersenneTwister(seed + i)  # independent generator per chunk
        bs  = d + (i <= r)               # the first r chunks take one extra draw
        ra[i] = sum(rand(rng, bs))       # index by chunk i, not by threadid()
    end
    return sum(ra)
end

In [None]:
@benchmark sim(10^7)

In [None]:
function sim2(a)
    nt = Threads.nthreads()   # unused: sim2 just calls the serial built-in sum, for comparison
    return sum(a)
end

In [None]:
a = rand(10^7)
@benchmark sim2($a)
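Since `sim2` only calls the serial `sum`, here is a sketch of a genuinely threaded sum of an existing array; the name `tsum` is hypothetical. It uses the same chunking as `sim` and stores each chunk's partial sum at its chunk index `k` rather than at `threadid()`, so the result does not depend on how iterations are scheduled onto threads. Start Julia with several threads (for example `JULIA_NUM_THREADS=4`) to see a speedup; with one thread it still returns the correct sum.

```julia
# Hypothetical threaded sum: one contiguous chunk per thread, partial sums combined at the end
function tsum(a::AbstractVector)
    nt = Threads.nthreads()
    partial = zeros(eltype(a), nt)
    d, r = divrem(length(a), nt)
    Threads.@threads for k in 1:nt
        lo = (k - 1) * d + min(k - 1, r) + 1   # chunks 1..r have length d + 1
        hi = lo + d + (k <= r) - 1
        s = zero(eltype(a))
        @inbounds for i in lo:hi
            s += a[i]
        end
        partial[k] = s                         # each chunk writes its own slot
    end
    return sum(partial)
end

tsum(collect(1:100))
```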

In [None]:
workers()

## 6. GPU Libraries using ArrayFire

Step 1. From a terminal, follow the directions at https://github.com/JuliaComputing/ArrayFire.jl to install the ArrayFire library on your machine.

In [None]:
using ArrayFire

In [None]:
getActiveBackend()

In [None]:
getAvailableBackends()