<img
src="http://www.telecom-em.eu/sites/default/files/logoimt2016.JPG"
WIDTH=180 HEIGHT=180>

<CENTER>
<p>
<font size="5"> Introduction to parallel computing</font></p>
</CENTER>

----------------

# Parallel programming

Use the official documentation 
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html

Julia evolves quickly so you will be sure that the official documentation is up to date.

Other resources can be found on the internet but not all of them are up to date...
(e.g. https://codingclubuc3m.github.io/2018-06-06-Parallel-computing-Julia.html , replace all occurences of  @parallel by @distributed)



In [1]:
# Using Pkg
# Pkg.add("Distributed")

using Distributed

CPU_CORES = 4 # number of cores on the machine

# Before adding workers.
nprocs()
nworkers() # when there are no no extra workers, nprocs() = nworkers().

# After adding them.
addprocs(CPU_CORES - 1) # 4 cores
# addprocs(1) # 2 cores
nprocs()
nworkers()
workers()

# nworkers() #Number of available worker processes. nworkers() = nprocs() - 1.
# rmproc()

3-element Vector{Int64}:
 2
 3
 4

### Using tasks

In [2]:
a() = sum(i for i in 1:1000) # sum of integers from 1 to 1000 = 500500
b   = Task(a)  # or b = @task a()  # associate task to this function

Task (runnable) @0x0000016370f17080

In [3]:
istaskstarted(b) 

false

In [4]:
schedule(b) # Adds the task to the scheduler's queue

Task (done) @0x0000016370f17080

In [5]:
istaskstarted(b)

true

In [6]:
istaskdone(b)

true

In [7]:
b

Task (done) @0x0000016370f17080

In [8]:
fetch(b) # get the answer of task

500500

### Using several workers

In [9]:
workers() # find out how many workers are active

3-element Vector{Int64}:
 2
 3
 4

In [10]:
addprocs(2) # add two workers

2-element Vector{Int64}:
 5
 6

In [11]:
workers() 

5-element Vector{Int64}:
 2
 3
 4
 5
 6

In [12]:
nprocs(),rmprocs(7) # list number of active processes, remove process with PID 7
                      # rmprocs(1) does not work because procs = 1 is not a worker

(6, Task (done) @0x00000163712a9750)

In [13]:
w = 2 # worker number 2
r = remotecall(rand, w, 2, 2) # RemoteRef: reference to computed result.
# call rand function to be executed on worker 2.A "Future" object is returned (something that will be fetched later on)

Future(2, 1, 7, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 1526583197952, 1526617392336)), nothing)

In [14]:
fetch(r)  # gets the computed result
          # blocks main processor until result is available 

2×2 Matrix{Float64}:
 0.768383   0.756695
 0.0655836  0.556093

In [15]:
x,y = 3,4
ψ= (x,y)->sqrt(x^2+y^2)
remotecall_fetch(ψ,w,x,y) # obtain value of computation of psi by worker number w and passing arguments x,y (both remotecall and fetch are performed at the same time)

5.0

In [16]:
using LinearAlgebra: diagm

w  = 3
d  = diagm(0=>[1,1]) # diagonal matrix with ones on the diagonal (first argument is the index of the diagonal used)
ex = :($d+fetch($r)) # this computes the value of d + r (which was computed by process 2)
s  = @spawnat w eval(ex) # worker w calculates expression ex
         # equivalent to s = @spawnat 3 [1 0;0 1]+fetch(r)
fetch(s)

# here we just passed information from worker 2 to 3 (if task performed at 3 requires info from task performed at 2)

2×2 Matrix{Float64}:
 1.76838    0.756695
 0.0655836  1.55609

In [17]:
s= @spawn  sin.(randn(2,2)) #worker is chosen automatically if not specified
fetch(s)

2×2 Matrix{Float64}:
 0.236882  0.358619
 0.977097  0.840854

In [18]:
@everywhere using LinearAlgebra: det # execute this on all processes (load this function on every process)
result = [@spawnat w det(randn(10,10)) for w in workers()] # execute this function on all processes through a loop
# all processes have access to the function det
[fetch(result[k]) for k=1:nprocs()-1] # get results on all current processes

5-element Vector{Float64}:
 -2581.4272353560655
   227.44715452527038
   -30.691584101535643
   110.9495426632715
  -656.9499866938274

In [19]:
@everywhere println(det(randn(10,10))) # same calculation on each worker

-2997.767123325436
      From worker 2:	137.60166161796403
      From worker 4:	3102.9780024007705


In [20]:
@everywhere function f(x::Float64)
    println(x +randn())
end 
@everywhere x=5.
@everywhere f(x)

      From worker 5:	-2422.770130536894
      From worker 3:	-10.174328111618731
      From worker 6:	21.90191099733196
5.059522439701156
      From worker 2:	4.422386297319377
      From worker 4:	4.930406395848587
      From worker 6:	5.512892318184002
      From worker 5:	6.693447597016816

### Parallel loops

In [21]:
function MC_pi(n) # approximation of pi using MC method
    s = 0
    for k in 1:n
        if rand()^2+rand()^2<1. # if we fall in the disk (all possible numbers are in the square)
            s=s+1 # add 1
        end
    end
    4*s/n # ratio between area of circle and square is π/4, s/n is an estimation of that ratio 
    #(number of points falling in the circle/total)
end
@time MC_pi(10^5)


      From worker 3:	5.20697166819353
  0.000932 seconds


3.14956

workers()
addprocs(3)
workers()

### The map and pmaps functions

map allows to call the same function for different inputs of the same sizes.

pmap does the same but distributes all the arguments to the different processes

In [22]:
function MC_pi_parallel(n)
    @everywhere function g(x) # function computing whether a point is in the circle or not
        rand()^2+rand()^2<1. ? 1 : 0 # no data is required here, function returns 1 if condition is true, 0 othw
    end
    s = sum(pmap(g,collect(1:n))) # we call function g with N different arguments, and distribute the computations
    4*s/n
end
@time MC_pi_parallel(10^5)


 15.781999 seconds (5.31 M allocations: 187.768 MiB, 0.48% gc time, 6.98% compilation time)


3.141

"pmap" is slower here! The reason is that the tasks made by each worker are too simple and fast, but they are numerous, so the overhead from transferring data in and out for each worker dominates the computing time

In [23]:
M = [randn(1000,1000) for k=1:100];
@time [det(m) for m in M];
@time pmap(det,M);  #parallel mapping of a function


  3.975591 seconds (35.69 k allocations: 766.035 MiB, 4.02% gc time, 0.96% compilation time)
  3.372015 seconds (417.59 k allocations: 27.853 MiB, 32.46% compilation time)


### Recovering returned values, Sending data and passing arguments to pmap

In [24]:
M = 100
N = 1000
X = randn(M,N)

X_parallel = [X[:,k] for k = 1:N]; # this creates an array containing all columns of X that can be iterated over

@everywhere using LinearAlgebra: det

@time dets = pmap(x -> det(x*x'),X_parallel);
@time dets = map(x -> det(x*x'),X_parallel);

println(size(dets))
println(dets[2]) # this is 0 because matrix has always a rank of 1

  1.347254 seconds (491.73 k allocations: 32.344 MiB, 52.57% compilation time)
  3.293958 seconds (288.51 k allocations: 172.764 MiB, 2.45% gc time, 13.14% compilation time)
(1000,)
0.0


In [25]:
# we need anonymous functions to call custom functions with pmap
# need the correct prototype to send parameters
@everywhere using LinearAlgebra: I

@everywhere function custom_det(x,λ,M) # computes the determinant of a new matrix with parameters
    return  determinant = det(x'*x .+ λ*Matrix{Float64}(I, M, M))
end
λ = 1

@time dets = pmap(x -> custom_det(x,λ,M),X_parallel);
@time dets = map(x -> custom_det(x,λ,M),X_parallel);

println(size(dets))
println(dets[2]) # this is 0 because matrix has always a rank of 1

  0.833735 seconds (121.44 k allocations: 7.486 MiB, 23.51% compilation time)
  2.682360 seconds (291.45 k allocations: 325.226 MiB, 2.24% gc time, 15.14% compilation time)
(1000,)
12619.514295933546


### Distributed arrays

In [34]:
using Pkg
Pkg.add("DistributedArrays") # compatibility pb with julia 1.2


[32m[1m   Resolving[22m[39m package versions...
[32m[1m   Installed[22m[39m Primes ──────────── v0.5.5
[32m[1m   Installed[22m[39m IntegerMathUtils ── v0.1.2
[32m[1m   Installed[22m[39m DistributedArrays ─ v0.6.7
[32m[1m    Updating[22m[39m `C:\Users\ASUS\.julia\environments\v1.9\Project.toml`
  [90m[aaf54ef3] [39m[92m+ DistributedArrays v0.6.7[39m
[32m[1m    Updating[22m[39m `C:\Users\ASUS\.julia\environments\v1.9\Manifest.toml`
  [90m[aaf54ef3] [39m[92m+ DistributedArrays v0.6.7[39m
  [90m[18e54dd8] [39m[92m+ IntegerMathUtils v0.1.2[39m
  [90m[27ebfcd6] [39m[92m+ Primes v0.5.5[39m
[32m[1mPrecompiling[22m[39m project...
[32m  ✓ [39m[90mIntegerMathUtils[39m
[32m  ✓ [39m[90mlibsixel_jll[39m
[32m  ✓ [39m[90mXSLT_jll[39m
[32m  ✓ [39m[90mOpenSSL[39m
[32m  ✓ [39m[90mJpegTurbo[39m
[32m  ✓ [39m[90mImageMagick_jll[39m
[32m  ✓ [39mSymPy
[32m  ✓ [39m[90mPrimes[39m
[32m  ✓ [39m[90mXorg_libxcb_jll[39m
[32m  ✓ [39m[90m

In [35]:
@everywhere using DistributedArrays # distributed arrays are arrays that can be accessed and modified by any process

In [36]:
A = drandn(100,100) # directly generate distributed array
B = randn(100,100) 
dB = distribute(B) # convert array to distributed array
#C = @DArray [i+j for i = 1:100, j = 1:100];

100×100 DArray{Float64, 2, Matrix{Float64}}:
  1.18814     0.422681    0.450962   -0.482658  …   0.357781    -0.0210447
 -0.41439     0.286582    0.0111516  -0.640546      1.97215     -0.157579
  0.947658   -0.258985   -0.330954    0.373281     -0.988894     0.648848
 -0.239755    0.323074    0.142217   -0.447602      1.20229      1.26713
 -0.771207    1.35612    -0.0833835   1.63494       0.0146511    0.880515
 -1.47298    -0.649393   -0.860647   -0.461034  …   1.09864      0.129528
  0.476586   -0.0464275   1.91947     1.27594       0.406454    -0.294126
 -0.0208718  -0.228948   -1.07559    -0.841695      0.327868    -0.724863
 -1.01898     1.19777     1.33577     1.01085       0.439644     0.405333
  0.375037    0.164794    0.823919   -0.447744      0.714095     0.0416126
 -1.65748    -0.195948    1.2115     -0.863233  …   0.703068     0.0310144
 -2.5549      0.649844    1.26907    -0.128673      0.304069     2.33844
 -1.74131    -1.05486    -0.030598    0.185805      1.02395     -0

In [37]:
@time A*A; # distributed array
@time dB*dB; # converted distributed array
@time B*B; # standard array

 11.217067 seconds (2.16 M allocations: 141.912 MiB, 19.24% compilation time: 3% of which was recompilation)
  0.021145 seconds (3.70 k allocations: 435.883 KiB)
  1.208637 seconds (1.23 M allocations: 83.788 MiB, 99.49% compilation time)


In [38]:
@time [i+j for i = 1:100, j = 1:100];
@time @DArray [i+j for i = 1:100, j = 1:100];

  0.059840 seconds (37.38 k allocations: 2.682 MiB, 97.88% compilation time)
  0.523431 seconds (122.28 k allocations: 8.195 MiB, 51.51% compilation time: 37% of which was recompilation)


In [39]:
println(nprocs())
# addprocs(7)
# rmprocs(12:13)
@everywhere using DistributedArrays
# WorkerPool([2,3,4])
# @DArray [@show x^2 for x = 1:10]; # print all squares from 1 to 10 (not in order because taken care of by different processes)

6


In [40]:
# results depending on nb of procs and data size
@time @DArray [x^2 for x = 1:10^8]; # distributed version (only advantageous for large datasets)
@time [x^2 for x = 1:10^8]; # standard version

# test as a function of number of processes and data size

  1.229742 seconds (467.75 k allocations: 27.557 MiB, 60.15% compilation time)
  4.789114 seconds (54.76 k allocations: 766.464 MiB, 1.95% gc time, 3.46% compilation time)


In [41]:
dzeros((100,100), workers()[1:4], [1,4]) # creates a distributed zero array where 
# the second dimension is spread around workers 1 to 4

100×100 DArray{Float64, 2, Matrix{Float64}}:
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0

------------