<img
src="http://www.telecom-em.eu/sites/default/files/logoimt2016.JPG"
WIDTH=180 HEIGHT=180>

<CENTER>
<p>
<font size="5"> Introduction to parallel computing</font></p>
</CENTER>

----------------

# Parallel programming

Use the official documentation 
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html

Julia evolves quickly so you will be sure that the official documentation is up to date.

Other resources can be found on the internet but not all of them are up to date...
(e.g. https://codingclubuc3m.github.io/2018-06-06-Parallel-computing-Julia.html , replace all occurences of  @parallel by @distributed)



In [1]:
# Using Pkg
# Pkg.add("Distributed")

using Distributed

CPU_CORES = 4 # number of cores on the machine

# Before adding workers.
nprocs()
nworkers() # when there are no no extra workers, nprocs() = nworkers().

# After adding them.
addprocs(CPU_CORES - 1) # 4 cores
# addprocs(1) # 2 cores
nprocs()
nworkers()
workers()

# nworkers() #Number of available worker processes. nworkers() = nprocs() - 1.
# rmproc()

3-element Vector{Int64}:
 2
 3
 4

In [4]:
workers() #identifiants des "workers" autre que le master "1"

3-element Vector{Int64}:
 2
 3
 4

In [None]:
nprocs() #nb total de processus

4

In [None]:
nworkers() #nb de workers dispo excpeté master 

3

### Using tasks

In [7]:
a() = sum(i for i in 1:1000) # sum of integers from 1 to 1000 = 500500
b   = Task(a)  # or b = @task a()  # associate task to this function

Task (runnable) @0x000001d4a0bf3a30

In [8]:
istaskstarted(b)  #vérifie si la tâche a commencé (resources CPU pour effectuer le calcul)

false

In [9]:
schedule(b) # Adds the task to the scheduler's queue

Task (done) @0x000001d4a0bf3a30

In [10]:
istaskstarted(b)

true

In [11]:
istaskdone(b)

true

In [12]:
b

Task (done) @0x000001d4a0bf3a30

In [13]:
fetch(b) # get the answer of task

500500

### Using several workers

In [14]:
workers() # find out how many workers are active

3-element Vector{Int64}:
 2
 3
 4

In [10]:
addprocs(2) # add two workers

2-element Array{Int64,1}:
 5
 6

In [15]:
workers() 

3-element Vector{Int64}:
 2
 3
 4

In [16]:
nprocs(),rmprocs(7) # list number of active processes, remove process with PID 7
                      # rmprocs(1) does not work because procs = 1 is not a worker

(4, Task (done) @0x000001d4a324dd20)

In [17]:
w = 2 # worker number 2
r = remotecall(rand, w, 2, 2) # RemoteRef: reference to computed result.
# call rand function to be executed on worker 2.A "Future" object is returned (something that will be fetched later on)

Future(2, 1, 5, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 0, 0)), nothing)

In [18]:
fetch(r)  # gets the computed result
          # blocks main processor until result is available 

2×2 Matrix{Float64}:
 0.0851432  0.828852
 0.296953   0.883665

In [19]:
x,y = 3,4
ψ= (x,y)->sqrt(x^2+y^2)
remotecall_fetch(ψ,w,x,y) # obtain value of computation of psi by worker number w and passing arguments x,y (both remotecall and fetch are performed at the same time)

5.0

In [20]:
using LinearAlgebra: diagm

w  = 3
d  = diagm(0=>[1,1]) # diagonal matrix with ones on the diagonal (first argument is the index of the diagonal used)
ex = :($d+fetch($r)) # this computes the value of d + r (which was computed by process 2)
s  = @spawnat w eval(ex) # worker w calculates expression ex
         # equivalent to s = @spawnat 3 [1 0;0 1]+fetch(r)
fetch(s)

# here we just passed information from worker 2 to 3 (if task performed at 3 requires info from task performed at 2)

2×2 Matrix{Float64}:
 1.08514   0.828852
 0.296953  1.88366

In [21]:
s= @spawn  sin.(randn(2,2)) #worker is chosen automatically if not specified
fetch(s)

2×2 Matrix{Float64}:
 0.983798   -0.420118
 0.0568309   0.09142

In [22]:
@everywhere using LinearAlgebra: det # execute this on all processes (load this function on every process)
result = [@spawnat w det(randn(10,10)) for w in workers()] # execute this function on all processes through a loop
# all processes have access to the function det
[fetch(result[k]) for k=1:nprocs()-1] # get results on all current processes

3-element Vector{Float64}:
  1666.5089948099342
 -1485.0663484656561
  1145.2060732177708

In [23]:
@everywhere println(det(randn(10,10))) # same calculation on each worker

-22.661163182446714
      From worker 3:	-909.2604297905103
      From worker 4:	10248.427994196905


      From worker 2:	23.729089101013805


In [24]:
@everywhere function f(x::Float64)
    println(x +randn())
end 
@everywhere x=5.
@everywhere f(x)

5.2034962943531875
      From worker 4:	5.041548545449668
      From worker 2:	4.627296814449237
      From worker 3:	3.6108370160957253


### Parallel loops

In [26]:
function MC_pi(n) # approximation of pi using MC method
    s = 0
    for k in 1:n
        if rand()^2+rand()^2<1. # if we fall in the disk (all possible numbers are in the square)
            s=s+1 # add 1
        end
    end
    4*s/n # ratio between area of circle and square is π/4, s/n is an estimation of that ratio 
    #(number of points falling in the circle/total)
end
@time MC_pi(10^5)

  0.000437 seconds


3.14616

workers()
addprocs(3)
workers()

### The map and pmaps functions

map allows to call the same function for different inputs of the same sizes.

pmap does the same but distributes all the arguments to the different processes

In [25]:
function MC_pi_parallel(n)
    @everywhere function g(x) # function computing whether a point is in the circle or not
        rand()^2+rand()^2<1. ? 1 : 0 # no data is required here, function returns 1 if condition is true, 0 othw
    end
    s = sum(pmap(g,collect(1:n))) # we call function g with N different arguments, and distribute the computations
    4*s/n
end
@time MC_pi_parallel(10^5)


  8.155285 seconds (9.22 M allocations: 495.892 MiB, 2.86% gc time, 19 lock conflicts, 8.80% compilation time)


3.14272

"pmap" is slower here! The reason is that the tasks made by each worker are too simple and fast, but they are numerous, so the overhead from transferring data in and out for each worker dominates the computing time

In [27]:
M = [randn(1000,1000) for k=1:100];
@time [det(m) for m in M];
@time pmap(det,M);  #parallel mapping of a function


  1.478719 seconds (41.95 k allocations: 765.808 MiB, 12.73% gc time, 1.75% compilation time)
  2.469497 seconds (448.58 k allocations: 22.220 MiB, 15.75% compilation time)


### Recovering returned values, Sending data and passing arguments to pmap

In [29]:
M = 100
N = 1000
X = randn(M,N)

X_parallel = [X[:,k] for k = 1:N]; # this creates an array containing all columns of X that can be iterated over

@everywhere using LinearAlgebra: det

@time dets = pmap(x -> det(x*x'),X_parallel);
@time dets = map(x -> det(x*x'),X_parallel);

println(size(dets))
println(dets[2]) # this is 0 because matrix has always a rank of 1

  0.398738 seconds (281.19 k allocations: 16.032 MiB, 41.40% compilation time)
  0.240329 seconds (56.77 k allocations: 156.811 MiB, 12.29% gc time, 12.83% compilation time)
(1000,)
-0.0


In [31]:
# we need anonymous functions to call custom functions with pmap
# need the correct prototype to send parameters
@everywhere using LinearAlgebra: I

@everywhere function custom_det(x,λ,M) # computes the determinant of a new matrix with parameters
    return  determinant = det(x'*x .+ λ*Matrix{Float64}(I, M, M))
end
λ = 1

@time dets = pmap(x -> custom_det(x,λ,M),X_parallel);
@time dets = map(x -> custom_det(x,λ,M),X_parallel);

println(size(dets))
println(dets[2]) # this is 0 because matrix has always a rank of 1

  0.524321 seconds (281.20 k allocations: 16.033 MiB, 29.29% compilation time)
  0.468030 seconds (81.15 k allocations: 310.467 MiB, 13.24% gc time, 35.45% compilation time)
(1000,)
8521.480646418373


### Distributed arrays

In [None]:
#using Pkg
#Pkg.add("DistributedArrays") # compatibility pb with julia 1.2


[32m[1m    Updating[22m[39m registry at `C:\Users\vince\.julia\registries\General.toml`
[32m[1m   Resolving[22m[39m package versions...
[32m[1m   Installed[22m[39m DistributedArrays ─ v0.6.7
[32m[1m   Installed[22m[39m Primes ──────────── v0.5.7
[32m[1m   Installed[22m[39m IntegerMathUtils ── v0.1.3
[32m[1m    Updating[22m[39m `C:\Users\vince\.julia\environments\v1.11\Project.toml`
  [90m[aaf54ef3] [39m[92m+ DistributedArrays v0.6.7[39m
[32m[1m    Updating[22m[39m `C:\Users\vince\.julia\environments\v1.11\Manifest.toml`
  [90m[aaf54ef3] [39m[92m+ DistributedArrays v0.6.7[39m
  [90m[18e54dd8] [39m[92m+ IntegerMathUtils v0.1.3[39m
  [90m[27ebfcd6] [39m[92m+ Primes v0.5.7[39m
[92m[1mPrecompiling[22m[39m project...
  38899.5 ms[32m  ✓ [39m[90mIntegerMathUtils[39m
   2887.7 ms[32m  ✓ [39m[90mPrimes[39m
   5227.4 ms[32m  ✓ [39mDistributedArrays
  3 dependencies successfully precompiled in 98 seconds. 436 already precompiled.


In [33]:
@everywhere using DistributedArrays # distributed arrays are arrays that can be accessed and modified by any process

In [34]:
A = drandn(100,100) # directly generate distributed array
B = randn(100,100) 
dB = distribute(B) # convert array to distributed array
#C = @DArray [i+j for i = 1:100, j = 1:100];

100×100 DArray{Float64, 2, Matrix{Float64}}:
  0.559415   0.864566   -0.120601   …  -1.27598    -0.334256    0.683437
  1.3749     0.389149   -2.86064       -0.0330292   0.969102   -0.595086
  0.247098  -0.377818   -0.544308      -0.954368    0.685744   -0.482122
 -0.674017   0.216606    0.684336       1.2016     -0.558792   -1.34543
  0.242152  -1.82626     0.301732       1.36243    -1.01877    -0.76866
 -2.06718    0.792555    0.107558   …  -0.770056   -0.591014    1.246
 -1.8685    -1.75361     0.70944       -1.92153    -1.47071    -1.74159
 -0.962513   0.519184   -0.529195      -0.0779362  -0.0697712   0.274783
 -0.155531   1.55098    -1.034          1.63434    -0.164336   -1.05915
  1.4215    -1.45593     0.241222       0.329127   -0.633941   -1.22289
  1.44913    0.900084   -0.930967   …   0.987568   -1.07236     1.63509
 -0.56764    0.46079    -0.996194      -0.0250393  -0.433401   -0.33864
 -0.627784   0.70548     0.560018      -1.4997     -0.7564      1.74477
  ⋮              

In [35]:
@time A*A; # distributed array
@time dB*dB; # converted distributed array
@time B*B; # standard array

  4.932886 seconds (2.43 M allocations: 122.180 MiB, 0.82% gc time, 7 lock conflicts, 23.26% compilation time)
  0.003412 seconds (1.83 k allocations: 167.492 KiB, 7 lock conflicts, 0.02% compilation time)
  0.769964 seconds (4.34 M allocations: 215.981 MiB, 8.03% gc time, 99.27% compilation time)


In [36]:
@time [i+j for i = 1:100, j = 1:100];
@time @DArray [i+j for i = 1:100, j = 1:100];

  0.059471 seconds (166.62 k allocations: 8.688 MiB, 99.63% compilation time)
  0.241436 seconds (101.91 k allocations: 5.053 MiB, 43.11% compilation time)


In [41]:
println(nprocs())
addprocs(7)
# rmprocs(12:13)
@everywhere using DistributedArrays
# WorkerPool([2,3,4])
# @DArray [@show x^2 for x = 1:10]; # print all squares from 1 to 10 (not in order because taken care of by different processes)

4


In [42]:
# results depending on nb of procs and data size
@time @DArray [x^2 for x = 1:10^8]; # distributed version (only advantageous for large datasets)
@time [x^2 for x = 1:10^8]; # standard version

# test as a function of number of processes and data size

  2.211755 seconds (89.60 k allocations: 4.413 MiB, 2.08% compilation time)
  0.446623 seconds (31.77 k allocations: 764.536 MiB, 26.80% gc time, 6.89% compilation time)


In [43]:
dzeros((100,100), workers()[1:4], [1,4]) # creates a distributed zero array where 
# the second dimension is spread around workers 1 to 4

100×100 DArray{Float64, 2, Matrix{Float64}}:
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0

------------