# Einführung in die Parallelisierung (Julia 1.4.1)

Zunächst ein paar Liks mit Infos, die im Folgenden aufbereitet wurden:

Einführender Artikel im Linux-Magazin: https://www.linux-magazin.de/ausgaben/2015/10/julia/4/ 

Interessantes Tutorial zum Thema: https://www.youtube.com/watch?v=HWLV6oTmfO8

Parallel Computing in Julia Docs : https://docs.julialang.org/en/v1/manual/parallel-computing/

DistributedArrays.jl: https://juliaparallel.github.io/DistributedArrays.jl/stable/#Example-1


### Motivation

In [1]:
## Beispiel 0: Approximation von Pi mit nur einem Prozess
using Distributed

# Teste wie viele Prozesse arbeiten
print("Number of Cores: ", nprocs()," and Number of Workers: ", nworkers(), "    ---    ")

# define a function that counts the number of points falling inside the circle
@everywhere function points_inside_circle(n)
    n_in = 0
    for i = 1:n
        x, y = rand(), rand()
        n_in += (x*x + y*y) <= 1
    end
    return n_in
end

# define a function wrapper that computes the approximation of pi in parallel
@everywhere function pi_p(n)    
    p = nworkers()
    n_in = @distributed (+) for i = 1:p
        points_inside_circle(n/p)
    end
    return 4 * n_in / n
end


@time pi_p(1_000_000_000)


Number of Cores: 1 and Number of Workers: 1    ---     12.448137 seconds (385.42 k allocations: 19.520 MiB, 0.12% gc time)


3.141633092

### Erste Schritte

### Beispiele

In [2]:
## Beispiel 1: Füge fünf neue Prozesse hinzu 
using Distributed
addprocs(5)
print("Number of Cores: ", nprocs()," and Number of Workers: ", nworkers(), "    ---    ")
# Somit haben wir einen Leader (immer ID 1) und n Worker verteilt auf n+1 Prozesse

Number of Cores: 6 and Number of Workers: 5    ---    

In [3]:
## Beispiel 2: Rufe Informationen über die Prozesse ab
for i in workers()
    id, pid, host = fetch(@spawnat i (myid(), getpid(), gethostname()))
    print("| I have the ID: ", id, " with PID: ", pid, " and HOST: ", host, " |")
end

| I have the ID: 2 with PID: 20205 and HOST: MacBook-3.local || I have the ID: 3 with PID: 20206 and HOST: MacBook-3.local || I have the ID: 4 with PID: 20207 and HOST: MacBook-3.local || I have the ID: 5 with PID: 20208 and HOST: MacBook-3.local || I have the ID: 6 with PID: 20209 and HOST: MacBook-3.local |

### Prozesse ausführen

Wichtige Funktionen: 
* $remotecall()$ - Aufgabe von einem bestimmten Prozess ausführen lassen. (Asynchroner Aufruf, kein Blocking, Return sofort)
* $fetch()$ - Ergebnis abholen. (Julia verwendet eine Future-Referenz. Diese ist sofort verfügbar, das Ergebnis muss aber abgholt werden.)
* $remotecall$_$fetch()$ - Synchroner Aufruf inkl. Abholung des Ergebnisses. (Kombiniert $remotecal()$ und $fetch()$.)
* $@spawn$ - Aufgaben an andere Prozesse senden
* $@spawnat$ - Aufgabe an einen bestimmten Prozess senden

### Beispiele

In [4]:
## Beispiel 3: Prozess mit ID 2 soll 1 + 3 rechnen

result = remotecall(+, 2, 1, 3) 

fetch(result) # Ergebnis muss abgeholt werden

4

In [5]:
## Beispiel 4: Prozess 2 nutzt Ergebnis von Prozess 4

result = remotecall(rand, 4, 3, 4) # Prozess 4 erzeugt eine 3x4 Zufallsmatrix
s = @spawnat 2 1 .+ fetch(result)  # Prozess 2 benutzt das Ergebnis von 4 und hängt 1. davor
fetch(s)

3×4 Array{Float64,2}:
 1.49902  1.33954  1.43374  1.09485
 1.68542  1.57466  1.26226  1.13836
 1.09244  1.54261  1.56586  1.71432

In [6]:
## Beispiel 5: Approximation von Pi mit mehreren Prozessen
# Mit @distributed wird die for-loop in der zweiten Funktion verteilt ausgeführt.
using Distributed
# define a function that counts the number of points falling inside the circle
@everywhere function points_inside_circle(n)
    n_in = 0
    for i = 1:n
        x, y = rand(), rand()
        n_in += (x*x + y*y) <= 1
    end
    return n_in
end

# define a function wrapper that computes the approximation of pi in parallel
@everywhere function pi_p(n)    
    p = nworkers()
    n_in = @distributed (+) for i = 1:p 
        points_inside_circle(n/p)
    end
    return 4 * n_in / n
end


@time pi_p(1_000_000_000)


  6.453599 seconds (82.94 k allocations: 4.094 MiB)


3.1415535

Wie man hier sehen  kann, halbiert sich die Berechnungszeit fast, indem man verteilt auf sechs Prozesse rechnet. Ich rechne auf 6 Kernen, also macht diese Aufteilung für mich Sinn.

# Einführung in das Paket Distributed Arrays
Große Berechnungen sind meist aus großen Arrays aufgebaut.
In diesem Fall macht es Sinn, die Arrays unter verschiedenen Prozessen aufzuteilen.
Indem die Speicherkapazitäten von verschiedenen Maschinen genutzt werden können, können so unter anderem Arrays berechnet werden, die sonst zu groß wären.
Jeder Prozess kann den ihm zugewiesenen Teil des Arrays lesen und überschreiben. Gleichzeitig kann jeder Prozess das vollständige Array lesen (read-only).

In Julia werden Distributed Arrays durch den "DArray"-Typ beschrieben. 
* Elementtyp und Dimensionen sind wie bei normalen Arrays
* Enthaltene Daten werden durch eine Aufteilung der Indexmenge in eine bestimmte Anzahl von Blöcken in jeder Dimension aufgeteilt

Vorteile:
* When dividing data among a large number of processes, one often sees diminishing returns in performance. Placing DArrays on a subset of processes allows multiple DArray computations to happen at once, with a higher ratio of work to communication on each process.
* Each Process has a global view on all distributed objects.

Beachte:
* Das Konzept unterscheidet sich sehr stark von SPMD.

In [7]:
@everywhere using DistributedArrays

### Beispiele für Distributed Arrays:
* Julia übernimmt hier die gesamte Arbeit: Die einzelnen Matrixelemente sind auf die verschiedenen Prozesse verteilt und auch initialisiert.

### Für mehr Kontrolle:
* Spezifizifikation eines bestimmten Prozesses für eine Aufgabe
* Festlegung, wie die Daten aufgeteilt werden sollen

Beispiel:

In [9]:
#= 
Second argument: Array should be created on the first three workers.
Third argument: Specify a distribution; the nth element of this array specifies
how many pieces dimension n should be divided into.
HERE: The first dimension will not be divided, the second dimension will be divided into 4 pieces.
Therefore each local chunk will be of size (100,25).
=#
dzeros((100,100), workers()[1:4], [1, 4])

100×100 DArray{Float64,2,Array{Float64,2}}:
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0 

## Hilfreiche Funktionen:

Note: Indexing a DArray (square brackets) with ranges of indices always creates a SubArray, not copying any data.

# Initialisierung

* $init$ - vom Programmierer bereitzustellen; bekommt ein Tupel von Index-Bereichen übergeben, für die das Array dann innerhalb der Init-Funktion initialisiert wird; Init-Funktion initialisiert also lokale Bereiche des Distributed Arrays
* $dims$ - Dimension des Distributed Arrays 
* $procs$ - Optional: Spezifiziert einen Vektor von Prozess IDs, die genutzt werden sollen
* $dist$ - Optional: Integer Vektor, der angibt wie der Distributed Array in jeder Dimension unter den Prozessen aufgeteilt werden soll

### Beispiel: Verteilte Zufallsmatrix

In [10]:
@everywhere using DistributedArrays
# ------------------------------------
#=Erzeuge eine verteilte Zufallsmatrix und gib den Teil zurück, für den Prozess 2 verantwortlich ist.
Mehrmaliges Hintereinanderausführen führt zu einer immer feineren, automatischen Aufteilung der Matrix,
da mehr Prozesse hinzugefügt werden.=#

d = drand(10,10);
r = @spawnat 2 begin
    localpart(d)
end
fetch(r)

10×2 Array{Float64,2}:
 0.659764     0.763937
 0.496316     0.455932
 0.230941     0.682584
 0.396005     0.956611
 0.292389     0.98873
 0.857961     0.897359
 0.11904      0.414582
 0.993662     0.804279
 0.825948     0.860532
 0.000595759  0.413251

Das Praktische: 
Julia versteckt die gesamte Kommunikation vor dem Benutzer. Will man z.B. die Summe aller Elemente der Matrix berechnen, so ruft man einfach $sum(d)$ auf. Das liefert direkt das Ergebnis. Julia summiert dabei mit jedem einzelnen Prozess alle Elemente des Teilarray auf. Anschließend werden dann alle Teilergebnisse zur Gesamtsumme addiert.

In [11]:
#=Erzeuge eine verteilte Zufallsmatrix.=#

d = drand(10,10);

@time sum(d)

  1.283986 seconds (1.59 M allocations: 91.756 MiB, 1.34% gc time)


51.07062451268587

### Beispiel: Matrix verteilt an vier Prozesse angeordnet in einem 2x2-Raster

Jeder Prozess ist einem Teil der Matrix zugeordnet, hat aber trotzdem über die globalen Indizes remoten Zugriff auf den Rest der Matrix, obwaohl diese nicht lokal ist. 


In [12]:
@everywhere using LinearAlgebra
@everywhere using InteractiveUtils 

# erstelle globale, dünnbesetzte nxn Matrizen aa(n), b1(n) und b2(n)
@everywhere function aa(n)
    la = zeros(n, n)
    la[diagind(la, 0)] .= 2.0
    la[diagind(la, -1)] .= -1.0
    la[diagind(la, 1)] .= -1.0
    return la
end

@everywhere function b1(n)
    la = zeros(n, n);
    la[1, n] = -1.0;
    return la
end

@everywhere function b2(n)
    la = zeros(n, n);
    la[n, 1] = -1.0;
    return la
end

#=  Ordne die Prozesse (Workers) einzelnen Teilen zu
    => Es entsteht keine große Datenkommunikation zwischen dem Leader und den Workers
    d11 bis d22 sind keine Matrizen, sondern Handles - Futures
    => sie nehmen keinen Speicherplatz ein =#
   
d11 = @spawnat 2 aa(4)
d12 = @spawnat 3 b1(4)
d21 = @spawnat 4 b2(4)
d22 = @spawnat 5 aa(4)

#=  erzeuge eine verteilte Matrix
    DA ist keine ganze matrix, sondern eine Referenz =#

DA = DArray(reshape([d11 d12 d21 d22], (2, 2)))  

8×8 DArray{Float64,2,Array{Float64,2}}:
  2.0  -1.0   0.0   0.0   0.0   0.0   0.0   0.0
 -1.0   2.0  -1.0   0.0   0.0   0.0   0.0   0.0
  0.0  -1.0   2.0  -1.0   0.0   0.0   0.0   0.0
  0.0   0.0  -1.0   2.0  -1.0   0.0   0.0   0.0
  0.0   0.0   0.0  -1.0   2.0  -1.0   0.0   0.0
  0.0   0.0   0.0   0.0  -1.0   2.0  -1.0   0.0
  0.0   0.0   0.0   0.0   0.0  -1.0   2.0  -1.0
  0.0   0.0   0.0   0.0   0.0   0.0  -1.0   2.0

In [13]:
# Setzte n = 100 und betrachte die Variablen, die im Leader-Prozess gespeichert sind:

n = 100
d11 = @spawnat 2 aa(n)
d12 = @spawnat 3 b1(n)
d21 = @spawnat 4 b2(n)
d22 = @spawnat 5 aa(n)
DA = DArray(reshape([d11 d12 d21 d22], (2, 2))); 
varinfo()


| name                 |      size | summary                                    |
|:-------------------- | ---------:|:------------------------------------------ |
| Base                 |           | Module                                     |
| Core                 |           | Module                                     |
| DA                   | 544 bytes | 200×200 DArray{Float64,2,Array{Float64,2}} |
| Main                 |           | Module                                     |
| aa                   |   0 bytes | typeof(aa)                                 |
| b1                   |   0 bytes | typeof(b1)                                 |
| b2                   |   0 bytes | typeof(b2)                                 |
| d                    | 600 bytes | 10×10 DArray{Float64,2,Array{Float64,2}}   |
| d11                  |  32 bytes | Future                                     |
| d12                  |  32 bytes | Future                                     |
| d21                  |  32 bytes | Future                                     |
| d22                  |  32 bytes | Future                                     |
| n                    |   8 bytes | Int64                                      |
| pi_p                 |   0 bytes | typeof(pi_p)                               |
| points_inside_circle |   0 bytes | typeof(points_inside_circle)               |
| r                    | 240 bytes | Future                                     |
| result               |  32 bytes | Future                                     |
| s                    | 176 bytes | Future                                     |


DA ist eine 200x200 Matrix und besetzt 544 Bytes Speicher. Jeder Prozess erhält eine 100x100 Matrix zugewiesen.

In [14]:
# Betrachte die gespeicherten Variablen des Prozesses 2:
fetch(@spawnat 2 varinfo())

| name                 |      size | summary                                  |
|:-------------------- | ---------:|:---------------------------------------- |
| Base                 |           | Module                                   |
| Core                 |           | Module                                   |
| Distributed          | 1.511 MiB | Module                                   |
| Main                 |           | Module                                   |
| aa                   |   0 bytes | typeof(aa)                               |
| b1                   |   0 bytes | typeof(b1)                               |
| b2                   |   0 bytes | typeof(b2)                               |
| d                    | 760 bytes | 10×10 DArray{Float64,2,Array{Float64,2}} |
| n                    |   8 bytes | Int64                                    |
| pi_p                 |   0 bytes | typeof(pi_p)                             |
| points_inside_circle |   0 bytes | typeof(points_inside_circle)             |
| result               | 176 bytes | Future                                   |


Zurück zur kleineren Matrix: Nun wollen wir die Matrixmultiplikation A*A direkt auf dem Distributed Array durchführen:

In [15]:
d11 = @spawnat 2 aa(4)
d12 = @spawnat 3 b1(4)
d21 = @spawnat 4 b2(4)
d22 = @spawnat 5 aa(4)
DA = DArray(reshape([d11 d12 d21 d22], (2, 2)));

DB = dzeros(8, 8)
DB = DA * DA

# Betrachte die Remote-Variablen von Prozess drei:
remotecall_fetch(localpart, 3, DB)

4×4 Array{Float64,2}:
 0.0  0.0  1.0  -4.0
 0.0  0.0  0.0   1.0
 0.0  0.0  0.0   0.0
 0.0  0.0  0.0   0.0

Wir können über die globalen Indizes remote die ganze Matrix erreichen, auch wenn dieser Teil einem bestimmten Prozess zugeordnet ist. Als Output wird ein View ausgegeben:

In [16]:
DB[5:8,1:4]

4×4 view(::DArray{Float64,2,Array{Float64,2}}, 5:8, 1:4) with eltype Float64:
 0.0  0.0  1.0  -4.0
 0.0  0.0  0.0   1.0
 0.0  0.0  0.0   0.0
 0.0  0.0  0.0   0.0

### Beispiel: Julia mit Julia (funktioniert hier leider noch nicht wie gewünscht^^)

http://mathemartician.blogspot.com/2012/07/julia-set-in-julia.html

zum Nachlesen!

In [None]:
#DistributedArrays, WIDTH, HEIGHT und MAXITER sind global
@everywhere WIDTH = 1000
@everywhere HEIGHT = 500
@everywhere MAXITER = 100

#Images ist lokal
using Images

# Julia-Funktion
@everywhere function julia(z, maxiter::Int64)
    c = -0.8 + 0.156im
    for n = 1:maxiter
        if abs(z) > 2.0
            return n-1
        end
        z = z^2 + c
    end
    return maxiter
end

# Init-Funktion zum Anlegen des Arrays
@everywhere function parJuliaInit(I)
    e = (size(I[1], 1), size(I[2], 1))
    m = Array(Int, e)
    xMin=I[2][1]
    yMin=I[1][1]
    
    for x = I[2], y = I[1]
        c = complex((x - WIDTH/2) / (HEIGHT/2), (y - HEIGHT/2) / (HEIGHT/2))
        m[y - yMin + 1, x - xMin + 1] = julia(c, MAXITER)
    end
    return m
end

# Distributed Array anlegen
Dm = DArray(parJuliaInit, (HEIGHT, WIDTH))

# Distributed Array auf den lokalen Prozess holen
#m = convert(Array, Dm)

#Bild als PNG speichern
#imwrite(grayim(transpose(m)/(1.0*MAXITER)),"juliaset.png")