Programmieren 3 - Verteilung

Peter Rösch, Fakultät für Informatik

Hochschule Augsburg, 2023/2024

# Einführung

**Frage:** Mit welchen Verteilten Systemen haben Sie schon gearbeitet?

[Email von Lesley Lamport](https://lamport.azurewebsites.net/pubs/distributed-system.txt) 

Informationen zu verteilten Systemen finden Sie z.B. über die Bibliothek, [O'Reilly](https://www.hs-augsburg.de/Bibliothek/O-Reilly.html) oder bei Wikipedia: [Verteiltes System](http://de.wikipedia.org/wiki/Verteilte_Systeme), [Verteilte Anwendungen](http://de.wikipedia.org/wiki/Verteilte_Anwendung). 

**Frage:** Wie können Sie ein verteiltes System halbwegs bequem auf Ihrem Rechner (Singular) erstellen und testen?

# Docker

**Frage:** Wie hat sich der Standard-Ansatz zur Erstellung eines Web-Servers mit der Zeit verändert?

(Ausgemusterter) Rechner $\rightarrow$ Virtuelle Maschine $\rightarrow$ Container (Docker)

Welche Gründe gab es für diese Entwicklung?

## Einführung

Quelle: Jeffrey Nickoloff, Stephen Kuenzli: Docker in Action, Second Edition [O'Reilly](https://learning.oreilly.com/library/view/docker-in-action/9781617294761).

Hello world:

In [None]:
%%bash
docker run dockerinaction/hello_world

Was ist da gerade passiert?

[Visualisierung](https://learning.oreilly.com/api/v2/epubs/urn:orm:book:9781617294761/files/01fig01_alt.jpg)

Visual Studio Code bietet ein Plugin für Docker (Demo).

[Mehrere Container auf einem Linux-System](https://learning.oreilly.com/api/v2/epubs/urn:orm:book:9781617294761/files/01fig04_alt.jpg)

Frage: Was ist der Unterschied zu Virtuellen Maschinen?

Beispiel für ein vom host isoliertes verteiltes System: [*nginx*, *watcher* und *mailer*](https://learning.oreilly.com/api/v2/epubs/urn:orm:book:9781617294761/files/02fig01_alt.jpg)

Wir werden nur *nginx* und *watcher* ausprobieren.

In [None]:
%%bash
docker run --detach --name web nginx:latest

Starten eines interaktiven Containers, der mit den anderen Containern verbunden ist.

Eingabe im Terminal:

    docker run --interactive --tty --link web:web --name web_test busybox:1.29 /bin/sh

Von *web_test* aus ausführen:
    
    wget -O - http://web:80/

Es gibt jede Menge erprobter [Docker-Images](https://hub.docker.com/search?type=image).

**Frage:** Welche Einsatzmöglichkeiten sehen Sie für *docker*?

# Middleware

Auch hier finden Sie Informationen über die [Bibliothek](https://www.hs-augsburg.de/Bibliothek.html) oder im entsprechenden [Wikipedia-Artikel](http://de.wikipedia.org/wiki/Middleware).

Eine früher populäre Middleware ist [corba](https://corba.org). Diese Middleware hatte einige [Nachteile](https://www.answers.com/Q/What_are_the_advantages_and_disadvantage_OF_CORBA).

Einen Vergleich unterschiedlicher Ansätze (2010) finden Sie [hier](https://pdfs.semanticscholar.org/b1d8/666a51ed67cef7ba7f0b6b7e8d0e0a777bba.pdf).

Wir verwenden die Middleware *ice* von [zeroc](http://www.zeroc.com). Die Dokumentation finden Sie [hier](https://doc.zeroc.com/ice/3.7).

Frage: Welche Probleme können bei einer verteilten Anwendung auftauchen?

## Ein Server zur Berechnung von $\pi$ mit *zeroc-ice*

**Aufgabe:**

Erstellen Sie einen Server in Java, der die Zahl $\pi$ mit der Monte-Carlo-Methode berechnet. Der Server soll zu Demonstrationszwecken von einem in Python implementierten Client aus aufgerufen werden. Es werden maximal eine Million Versuche unterstützt.

Sie können davon ausgehen, dass keine Firewalls involviert sind und kein redundanter Service notwendig ist.

Als Technologie soll *zeroc-ice* verwendet werden.

**Aufgabe verstehen:**

Gibt es Fragen?

**Recherche:**
    
* Es existiert eine Implementierung der $\pi$-Berechnung in Java (siehe unten)
* Da die Technologie vorgegeben ist, findet man die Terminologie in der [Dokumentation](https://doc.zeroc.com/ice/3.7). 

In [None]:
%%file /tmp/PiMonteCarlo.java
import java.util.Random; 

public class PiMonteCarlo{
    private Random rand;
    private long seed = 20220512;

    public PiMonteCarlo(){
        rand = new Random(seed); 
    }
    public double calcPi(long n){
        long hits = 0;
        for (int i=0; i<n; i++){
            double x = this.rand.nextDouble();
            double y = this.rand.nextDouble();
            if (x * x + y * y <= 1.0)
                    hits++;
        }
        return 4.0 * hits / n;
    }
    public static void main(String[] args){
        long n = Long.valueOf(args[0]);
        PiMonteCarlo aPiMonteCarlo  = new PiMonteCarlo();
        double pi = aPiMonteCarlo.calcPi(n);
        System.out.println("Pi:" + pi);
    }
}

In [None]:
%%script bash
cd /tmp
javac PiMonteCarlo.java
java PiMonteCarlo 100000

**Zerlegung in Teilaufgaben:**

Die Zerlegung ist hier weitgehend vorgegeben.

* Entwurf der Architektur
* Definition der Schnittstellen
* Erstellung des Servers
* Erstellung des Clients

**Lösungsansätze (Algorithmen und Datenstrukturen):**
    
* Der Algorithmus zur Berechnung von $\pi$ existiert (siehe oben)
* Es wird ein Server, ein Client und eine Middleware benötigt
* In diesem Fall müssen nur einfache Datentypen (Anzahl der Schritte, Ergebnis für $\pi$) ausgetauscht werden


**Entscheidungen (Ansatz, Werkzeuge):**

Hier gibt es klare Vorgaben:

* Middleware: *zeroc-ice*
* Server: Programmiersprache Java, Code zur Berechnung wie oben dargestellt
* Client: Programmiersprache Python
* Es soll mit einem möglichst einfaches System begonnen werden

In [None]:
%%bash
mkdir -p ice_example/slice
mkdir -p ice_example/client
mkdir -p ice_example/server

### Die Schnittstelle (*slice*)

Ein Blick in die [Dokumentation](https://doc.zeroc.com/ice/3.7/the-slice-language) kann sehr hilfreich sein ...

In [None]:
%%file ice_example/slice/PiMonteCarlo.ice

#pragma once

module Calculators{

    exception RangeError{
        string reason = "out of range";
    };
 
    interface PiMonteCarlo {
        double calculatePi(long N)
            throws RangeError;
    };
};

### Automatische Erzeugung der sprachspezifischen Dateien.

In [None]:
%%bash
cd ice_example/client
slice2py ../slice/PiMonteCarlo.ice
cd ../server
slice2java ../slice/PiMonteCarlo.ice

### Implementierung des Servers

Und wieder schauen wir in die [Dokumentation](https://doc.zeroc.com/ice/3.7/language-mappings/java-mapping/server-side-slice-to-java-mapping) ...

In [None]:
%%file ice_example/server/PiMonteCarloI.java
import java.util.Random; 

public class PiMonteCarloI implements Calculators.PiMonteCarlo{
    private Random rand;
    private long seed = 2122019;

    public PiMonteCarloI(){
        rand = new Random(seed); 
    }

    public double calculatePi(long n, com.zeroc.Ice.Current current) 
            throws Calculators.RangeError{
        if (n<=0 || n > 1e6)
            throw new Calculators.RangeError();
        long hits = 0;
        for (int i=0; i<n; i++){
            double x = this.rand.nextDouble();
            double y = this.rand.nextDouble();
            if (x * x + y * y <= 1.0)
                    hits++;
        }
        return 4.0 * hits / n;
    }
}

In [None]:
%%file ice_example/server/Server.java

public class Server {
    public static void
    main(String[] args)
    {
        int status = 0;
        com.zeroc.Ice.Communicator ic = null;
        try {
            ic = com.zeroc.Ice.Util.initialize(args);
            com.zeroc.Ice.ObjectAdapter adapter =
                ic.createObjectAdapterWithEndpoints("PiCruncherAdapter", 
                        "default -p 34000");
            com.zeroc.Ice.Object obj = new PiMonteCarloI();
            adapter.add(obj, com.zeroc.Ice.Util.stringToIdentity("PiCruncher"));
            adapter.activate();
            ic.waitForShutdown();
        } catch (com.zeroc.Ice.LocalException e) {
            e.printStackTrace();
            status = 1;
        } catch (Exception e) {
            System.err.println(e.getMessage());
            status = 1;
        }
        if (ic != null) {
            // Clean up
            //
            try {
                ic.destroy();
            } catch (Exception e) {
                System.err.println(e.getMessage());
                status = 1;
            }
        }
        System.exit(status);
    }
}

In [None]:
%%bash
cd ice_example/server
mkdir classes
javac -d classes -classpath ./classes:/usr/share/java/ice-3.7.8.jar *.java Calculators/*.java

### Implementierung des Clients

Wieder hilft die [Dokumentation](https://doc.zeroc.com/ice/3.6/language-mappings/python-mapping) ...

In [None]:
%%file ice_example/client/PiMonteCarloClientIce.py

import Calculators
import Ice
import traceback
import sys

if __name__ == '__main__':
    status = 0
    ic = None
    try:
        ic = Ice.initialize(sys.argv)
        obj = ic.stringToProxy('PiCruncher:default -h "localhost" -p 34000')
        #obj = ic.stringToProxy('PiCruncher:default -h "rechentier.informatik.hs-augsburg.de" -p 34000')
        PiCalculator = Calculators.PiMonteCarloPrx.checkedCast(obj)
        pi = PiCalculator.calculatePi(1000000)
        print('pi:', pi)
    except Exception:
        traceback.print_exc()
        status = 1
    finally:
        try:
            if ic is not None:
                ic.destroy()
        except Execption:
            traceback.print_exc()
            status = 1
    sys.exit(status)

### Demonstration / Plausibilitätstest 

In [None]:
%%file ice_example/server/start_server.sh
export CLASSPATH=$CLASSPATH:./classes:/usr/share/java/ice-3.7.8.jar:.
java Server

Diese Date muss von der shell aus ausgeführt werden mit
    
    cd ice_example/server
    source start_server.sh

In [None]:
%%bash
cd ice_example/client
python PiMonteCarloClientIce.py

## Pi-Server in einem Docker-Container

Ziel: Erstellung eines Docker-Containers, der unseren Pi-Server unter Port 34000 zur Verfügung stellt.

In [None]:
%%file Dockerfile

FROM debian:bookworm

LABEL maintainer="prog3@hs-augsburg.de"

RUN apt update && \
    apt-get -y install openjdk-17-jre && \
    apt-get -y install libzeroc-ice3.7-java && \
    adduser pi_calculator

ENV APPROOT="/app" \
    APP="start_server.sh" \
    VERSION="0.42"

WORKDIR ${APPROOT}

COPY ["./ice_example/server/", "${APPROOT}"]

RUN chmod a+x ${APPROOT}/${APP} && \
    chown -R pi_calculator:pi_calculator ${APPROOT}

USER pi_calculator:pi_calculator

ENTRYPOINT ${APPROOT}/${APP}
#ENTRYPOINT ["/bin/bash"]

EXPOSE 34000

In [None]:
%%bash
docker image build -t pi_server:0.42 .

In [None]:
%%bash
docker run -d --rm -p 34000:34000/tcp pi_server:0.42

# Verteilung mit multiprocessing

Mit dem *multiprocessing*-Paket sind nur wenige Änderungen nötig, um eine auf Queues basierende parallelisierte Anwendung auf mehrere Rechner zu verteilen.

Die Queues werden von einem *TaskManager* verwaltet, der auf einem der Rechner im Cluster läuft und über einen bestimmten Port erreichbar ist. Der *TaskManager* kümmert sich um den Zugriff aller Rechner auf die Objekte, die er verwaltet (in diesem Fall zwei Queues).

In [None]:
%%file /tmp/distributedQueue.py

from multiprocessing.managers import BaseManager
from multiprocessing import JoinableQueue, Queue

class TaskManager(BaseManager):
    pass

if __name__ == '__main__':
    from sys import argv, exit
    if len(argv) != 2:
        print('usage:', argv[0], b'socket_nr')
        exit(0)
    master_socket = int(argv[1])
    task_queue = JoinableQueue()
    result_queue = Queue()
    TaskManager.register('get_job_queue', 
                         callable = lambda:task_queue)
    TaskManager.register('get_result_queue', 
                         callable = lambda:result_queue)
    m = TaskManager(address = ('', master_socket), 
                    authkey = b'secret')
    print('starting queue server, socket', master_socket)
    m.get_server().serve_forever()

Der Worker muss die IP-Adresse und den Port kennen, unter dem er den *TaskManager* und somit die beiden Queues erreichen kann. Auf jedem Rechner im Cluster sollte ein Worker gestartet werden.

In [None]:
%%file /tmp/workerTSP.py

from multiprocessing import cpu_count, Process
from distributedQueue import TaskManager
import math

def _distance(p1, p2):
    delta_x = p2[0] - p1[0]
    delta_y = p2[1] - p1[1]
    return math.sqrt(delta_x * delta_x + delta_y * delta_y)

def _path_length(path):
    total_len = 0.0
    for i in range(1, len(path)):
        total_len += _distance(path[i-1], path[i])
    return total_len

def _shortest_closed_path(path_completed, path_ahead,
        min_length=None, min_path=None):
    if len(path_ahead) == 1:
        # create a closed path by adding the starting point
        total_path = path_completed + path_ahead + (path_completed[0], )
        total_path_length = _path_length(total_path)
        if min_length == None or total_path_length < min_length:
            min_length, min_path = total_path_length, total_path  
    else:
        # continue recursion
        for i in range(len(path_ahead)):
            min_length, min_path = \
                _shortest_closed_path(path_completed + (path_ahead[i], ),
                                       path_ahead[:i] + path_ahead[i+1:],
                                       min_length, min_path)
    return min_length, min_path

def _worker_function(job_queue, result_queue):
    while True:
        task = job_queue.get()
        result = _shortest_closed_path(*task)
        result_queue.put(result)
        job_queue.task_done()

def _start_workers(m):
    job_queue, result_queue = m.get_job_queue(), m.get_result_queue()
    nr_of_processes = cpu_count()
    processes = [Process(target = _worker_function,
            args = (job_queue, result_queue))
        for i in range(nr_of_processes)]
    for p in processes:
        p.start()
    return nr_of_processes

if __name__ == '__main__':
    from sys import argv, exit
    if len(argv) < 3:
        print('usage:', argv[0], 'server_IP server_socket')
        exit(0)
    server_ip = argv[1]
    server_socket = int(argv[2])
    TaskManager.register('get_job_queue')
    TaskManager.register('get_result_queue')
    m = TaskManager(address=(server_ip, server_socket), authkey = b'secret')
    m.connect()
    nr_of_processes = _start_workers(m)
    print(nr_of_processes, 'workers started')

Der Master füllt die *jobQueue* mit Auftägen und wertet die Ergebnisse, die von den Workern auf die *resultQueue* gelegt wurden, aus. Dazu muss er ebenfalls wissen, wie er den TaskManager finden kann. 

In [None]:
%%file /tmp/masterTSP.py

from distributedQueue import TaskManager
import random, math, time

staedte_positionen = ((0.010319427306382911, 0.8956251389386756), (0.6999898714299346, 0.42254500074835377), (0.4294574582950912, 0.4568408794115657), (0.6005454852683483, 0.9295407203370832), (0.9590226056623925, 0.581453646599427), (0.748521134122647, 0.5437775417153159), (0.7571232013282426, 0.606435031856663), (0.07528757443413125, 0.07854082131763074), (0.32346175150639334, 0.7291706487873425), (0.012935451483722882, 0.974440252089956), (0.7894689664351368, 0.8925464165283283), (0.5017081207027582, 0.2323298297211428), (0.5994368069089712, 0.006438246252584379), (0.3471372841416518, 0.32362936726486546), (0.9080568556459205, 0.5872162265716462), (0.008216651916432838, 0.5605251786730867), (0.12281649843134745, 0.778836327426156), (0.9698199622470612, 0.9108771425774694), (0.22977122891732482, 0.9692739885317619), (0.8192293086323663, 0.5857981607663957), (0.1422079724040628, 0.8147259475583606), (0.6706795717064135, 0.591561956032189), (0.15756919328106178, 0.6331745919782176), (0.9932745190952539, 0.20429268341528184), (0.21104352892679712, 0.8836996377783977), (0.15162951778287448, 0.43829883402923786), (0.1014198097226855, 0.5877946138306056), (0.8961534561384676, 0.6498866051905969), (0.02348788064910401, 0.2555771312427847), (0.7629752603198586, 0.031097354437254032), (0.9202799257088203, 0.8545409146117934), (0.4740012769258859, 0.30554661789326976), (0.9662984341217945, 0.24235140218349704), (0.236385903920734, 0.8065137287975154), (0.7509340695304845, 0.9276718423781918), (0.891709366337186, 0.9691233497708065), (0.45766675798331646, 0.3966074453757069), (0.362463818656684, 0.629782983287922), (0.3895828182648007, 0.11182372435220689), (0.8007718207811885, 0.07083259575886258), (0.9395297121272306, 0.003549829042441055), (0.9990444201768337, 0.4816092706412669), (0.806664037655748, 0.45636915118812094), (0.7248316046403981, 0.4136143673445848), (0.9797254747122175, 0.5348075095243779), (0.832410347070477, 0.36236092065071435), (0.17697174259486892, 0.09903555437885947), (0.3320429025096797, 0.42538137689172295), (0.010390541304141299, 0.3196764197089256), (0.13647705960093703, 0.6166884292149969), (0.7413967117502017, 0.6758731780971651), (0.5057620560480408, 0.6176726900765315), (0.811221033004999, 0.15436803010778977), (0.5010541138760939, 0.35001152238091926), (0.9413826105193199, 0.9418596542666187), (0.891256361420491, 0.7886584654021789), (0.3676445849723219, 0.9387145658378656), (0.7976904766536591, 0.7297167662430665), (0.5966826978617474, 0.29179542156826277), (0.6209578021367281, 0.22193571777470145), (0.8298034730084203, 0.5164834220744453), (0.1974315640582841, 0.9764209254933037), (0.3181560706032852, 0.9659291942205317), (0.8665674546422951, 0.8281710981528015), (0.341232980616892, 0.5707946637100852), (0.8931358896561539, 0.40864805338293986), (0.26644032823825714, 0.9989727471390323), (0.3993087575662785, 0.009572468741341433), (0.7385521851703551, 0.8947961501854975), (0.3265958212912289, 0.12135269959328665), (0.33657186037515696, 0.04678149607307802), (0.6574688023519235, 0.14620381872693322), (0.9232073321379433, 0.464399378682132), (0.3350568606219765, 0.8140710044746052), (0.43439242705535963, 0.6850627844635814), (0.6748600302251079, 0.17179426903224415), (0.3257145924815924, 0.17892361406234325), (0.9843761318782708, 0.7246387654097534), (0.3302488609623919, 0.5461838792803725), (0.942182061647097, 0.271796972592925), (0.7992439374549364, 0.3344916623897427), (0.07722251160513627, 0.5998378921773792), (0.9551490162437984, 0.99084148343811), (0.2994585617190968, 0.8420506992016424), (0.692980959785355, 0.832838090803397), (0.31555831127132894, 0.06401272570899819), (0.02665227648457802, 0.5242147042171419), (0.1974784428862567, 0.9137326594564479), (0.8486377116437235, 0.773093204292392), (0.6588651068050204, 0.6191834372968826), (0.9294759207447961, 0.04471010558595201), (0.9407045003182903, 0.7240803846820537), (0.6814942236797052, 0.6579517970003296), (0.2956248273119104, 0.4141031496785965), (0.729642956744248, 0.18897087844791205), (0.6092213719795501, 0.12514914017649392), (0.7431271140678826, 0.12660475585183406), (0.9023640654012873, 0.21133242457776658), (0.3513947221768753, 0.10988741056845952), (0.7560785506387285, 0.1994584377393509))

def _create_argument_list_TSP(path_completed, path_ahead, r_depth, l):
    if len(path_completed) == r_depth:
        l.append((path_completed, path_ahead))
    else:
        for i in range(len(path_ahead)):
            _create_argument_list_TSP(path_completed + (path_ahead[i],) , 
                                       path_ahead[:i] + path_ahead[i+1:], r_depth, l)

def _calculate(m, nr_of_cities):
    job_queue, result_queue = m.get_job_queue(), m.get_result_queue()

    in_list = []
    result_list = []
    _create_argument_list_TSP((staedte_positionen[0], ), 
                            staedte_positionen[1:nr_of_cities], 3, in_list)
    for arg in in_list:
        job_queue.put(arg)
    job_queue.join()
    while not result_queue.empty():
        result_list.append(result_queue.get()) 
    return min(result_list)

if __name__ == '__main__':
    from sys import argv, exit
    if len(argv) != 4:
        print('usage:', argv[0], 'server_IP server_socket nr_of_cities')
        exit(0)
    server_ip = argv[1]
    server_socket = int(argv[2])
    TaskManager.register('get_job_queue')
    TaskManager.register('get_result_queue')
    m = TaskManager(address=(server_ip, server_socket), authkey = b'secret')
    m.connect()

    t1 = time.time()
    result = _calculate(m, int(argv[3]))
    t2 = time.time()
    print(' result: ', result)
    print(' time:   ', t2-t1, ' s\n')

# Verteilung mit IPython

Die Verteilung erfolgt mit den Paket *IPython.parallel*, die Dokumentation finden Sie [hier](http://ipython.org/ipython-doc/stable/parallel/parallel_intro.html)

Installation:

    conda install -y ipyparallel

Folgende Schritte sind nötig, um ein Cluster mit IPython und python3 auf den Rechnern im Labor (Linux) aufzusetzen:
 1. Auswahl eines Ports, 34000+N (N: Gruppen-Nummer), Platzhalter PORT
 1. Feststellen der IP-Adresse, auf dem der *ipcontroller3* laufen soll z.B. mit 
            hostname -I
   Platzhalter ist IP
 1. Auf einem Rechner muss der Controller gestartet werden mit
          ipcontroller --ip=IP --port=PORT
 Dabei werden die Dateien *ipcontroller-client.json* und *ipcontroller-engine.json* im Verzeichnis *~/.ipython/profile_default/security* erzeugt, die von den Engines und dem Notebook, das das Cluster nutzt, benötigt werden.
 1. Jetzt können Sie sich *mit der selben Benutzerkennung* auf anderen Rechnern anmelden und Engines starten mit
          ipengine&
  Sie können natürlich mehrere Engines pro Rechner starten.
 1. Starten Sie den Jupyter-Server, *wieder mit der gleichen Benutzerkennung*, durch Eingabe von 
         jupyter-notebook
         
Falls sich die beteiligten Rechner *nicht* das Verzeichnis *~/.ipython* teilen, müssen Sie die Dateien *ipcontroller-client.json* und *ipcontroller-engine.json* aus dem Verzeichnis *~/.ipython/profile_default/security* des Rechners, auf dem der Controller läuft, in das entsprechende Verzeichnis jedes Rechners kopieren, der für das Cluster rechnet bzw. das Cluster nutzen soll. **Dieser Schritt muss wiederholt werden, sobald der Controller neu gestartet wird.** Details finden Sie in der [Dokumentation](http://ipython.org/ipython-doc/stable/parallel/parallel_intro.html).

Der Befehl *Client* gibt eine Liste aller Engines zurück, die Teil des Clusters sind. 

Wird dagegen der Name einer Datei übergeben, so werden die Verbindungs-Daten zum *ipcontroller* aus dieser Datei gelesen (siehe unten).

In [None]:
staedte_positionen = (
    (0.010319427306382911, 0.8956251389386756),
    (0.6999898714299346, 0.42254500074835377),
    (0.4294574582950912, 0.4568408794115657),
    (0.6005454852683483, 0.9295407203370832),
    (0.9590226056623925, 0.581453646599427),
    (0.748521134122647, 0.5437775417153159),
    (0.7571232013282426, 0.606435031856663),
    (0.07528757443413125, 0.07854082131763074),
    (0.32346175150639334, 0.7291706487873425),
    (0.012935451483722882, 0.974440252089956),
    (0.7894689664351368, 0.8925464165283283),
    (0.5017081207027582, 0.2323298297211428),
    (0.5994368069089712, 0.006438246252584379),
    (0.3471372841416518, 0.32362936726486546),
    (0.9080568556459205, 0.5872162265716462),
    (0.008216651916432838, 0.5605251786730867),
    (0.12281649843134745, 0.778836327426156),
    (0.9698199622470612, 0.9108771425774694),
    (0.22977122891732482, 0.9692739885317619),
    (0.8192293086323663, 0.5857981607663957),
    (0.1422079724040628, 0.8147259475583606),
    (0.6706795717064135, 0.591561956032189),
    (0.15756919328106178, 0.6331745919782176),
    (0.9932745190952539, 0.20429268341528184),
    (0.21104352892679712, 0.8836996377783977),
    (0.15162951778287448, 0.43829883402923786),
    (0.1014198097226855, 0.5877946138306056),
    (0.8961534561384676, 0.6498866051905969),
    (0.02348788064910401, 0.2555771312427847),
    (0.7629752603198586, 0.031097354437254032),
    (0.9202799257088203, 0.8545409146117934),
    (0.4740012769258859, 0.30554661789326976),
    (0.9662984341217945, 0.24235140218349704),
    (0.236385903920734, 0.8065137287975154),
    (0.7509340695304845, 0.9276718423781918),
    (0.891709366337186, 0.9691233497708065),
    (0.45766675798331646, 0.3966074453757069),
    (0.362463818656684, 0.629782983287922),
    (0.3895828182648007, 0.11182372435220689),
    (0.8007718207811885, 0.07083259575886258),
    (0.9395297121272306, 0.003549829042441055),
    (0.9990444201768337, 0.4816092706412669),
    (0.806664037655748, 0.45636915118812094),
    (0.7248316046403981, 0.4136143673445848),
    (0.9797254747122175, 0.5348075095243779),
    (0.832410347070477, 0.36236092065071435),
    (0.17697174259486892, 0.09903555437885947),
    (0.3320429025096797, 0.42538137689172295),
    (0.010390541304141299, 0.3196764197089256),
    (0.13647705960093703, 0.6166884292149969),
    (0.7413967117502017, 0.6758731780971651),
    (0.5057620560480408, 0.6176726900765315),
    (0.811221033004999, 0.15436803010778977),
    (0.5010541138760939, 0.35001152238091926),
    (0.9413826105193199, 0.9418596542666187),
    (0.891256361420491, 0.7886584654021789),
    (0.3676445849723219, 0.9387145658378656),
    (0.7976904766536591, 0.7297167662430665),
    (0.5966826978617474, 0.29179542156826277),
    (0.6209578021367281, 0.22193571777470145),
    (0.8298034730084203, 0.5164834220744453),
    (0.1974315640582841, 0.9764209254933037),
    (0.3181560706032852, 0.9659291942205317),
    (0.8665674546422951, 0.8281710981528015),
    (0.341232980616892, 0.5707946637100852),
    (0.8931358896561539, 0.40864805338293986),
    (0.26644032823825714, 0.9989727471390323),
    (0.3993087575662785, 0.009572468741341433),
    (0.7385521851703551, 0.8947961501854975),
    (0.3265958212912289, 0.12135269959328665),
    (0.33657186037515696, 0.04678149607307802),
    (0.6574688023519235, 0.14620381872693322),
    (0.9232073321379433, 0.464399378682132),
    (0.3350568606219765, 0.8140710044746052),
    (0.43439242705535963, 0.6850627844635814),
    (0.6748600302251079, 0.17179426903224415),
    (0.3257145924815924, 0.17892361406234325),
    (0.9843761318782708, 0.7246387654097534),
    (0.3302488609623919, 0.5461838792803725),
    (0.942182061647097, 0.271796972592925),
    (0.7992439374549364, 0.3344916623897427),
    (0.07722251160513627, 0.5998378921773792),
    (0.9551490162437984, 0.99084148343811),
    (0.2994585617190968, 0.8420506992016424),
    (0.692980959785355, 0.832838090803397),
    (0.31555831127132894, 0.06401272570899819),
    (0.02665227648457802, 0.5242147042171419),
    (0.1974784428862567, 0.9137326594564479),
    (0.8486377116437235, 0.773093204292392),
    (0.6588651068050204, 0.6191834372968826),
    (0.9294759207447961, 0.04471010558595201),
    (0.9407045003182903, 0.7240803846820537),
    (0.6814942236797052, 0.6579517970003296),
    (0.2956248273119104, 0.4141031496785965),
    (0.729642956744248, 0.18897087844791205),
    (0.6092213719795501, 0.12514914017649392),
    (0.7431271140678826, 0.12660475585183406),
    (0.9023640654012873, 0.21133242457776658),
    (0.3513947221768753, 0.10988741056845952),
    (0.7560785506387285, 0.1994584377393509),
)

In [None]:
from ipyparallel import Client

In [None]:
c = None


def setup_client():
    global c
    c = Client()
    print("Nr of Engines:", len(c))

Die folgende Funktion verwendet eine *load_balanced_view*, um den Algorithmus zum TSP verteilt im Cluster auszuführen.

In [None]:
setup_client()

In [None]:
def TSP_IPython_cluster(path_completed, path_ahead, r_depth):
    def _shortest_closed_path_tuple_call(arg_tupel):
        return _shortest_closed_path(*arg_tupel)

    def _create_argument_list_TSP(path_completed, path_ahead, r_depth, l):
        if len(path_completed) == r_depth:
            l.append((path_completed, path_ahead))
        else:
            for i in range(len(path_ahead)):
                _create_argument_list_TSP(
                    path_completed + (path_ahead[i],),
                    path_ahead[:i] + path_ahead[i + 1 :],
                    r_depth,
                    l,
                )

    l_in = []
    _create_argument_list_TSP(path_completed, path_ahead, r_depth, l_in)
    #
    # Load-Balanced view (fuer Cluster)
    lview = c.load_balanced_view()
    lOut = lview.map_sync(_shortest_closed_path_tuple_call, l_in)
    return min(lOut)

Bevor die nächsten Zeilen funktionieren, müssen erst einige Engines und ggf. ein Controller gestartet werden.

In [None]:
setup_client()
nr_of_cities = 10

r_depth = 2

Das magische Kommando *%%px* bewirkt, das der Inhalt der Zelle auf allen Engines ausgeführt wird.

In [None]:
%%px

import math

def _distance(p1, p2):
    delta_x = p2[0] - p1[0]
    delta_y = p2[1] - p1[1]
    return math.sqrt(delta_x * delta_x + delta_y * delta_y)

def _path_length(path):
    total_len = 0.0
    for i in range(1, len(path)):
        total_len += _distance(path[i-1], path[i])
    return total_len

def _shortest_closed_path(path_completed, path_ahead,
        min_length=None, min_path=None):
    if len(path_ahead) == 1:
        # create a closed path by adding the starting point
        total_path = path_completed + path_ahead + (path_completed[0], )
        total_path_length = _path_length(total_path)
        if min_length == None or total_path_length < min_length:
            min_length, min_path = total_path_length, total_path  
    else:
        # continue recursion
        for i in range(len(path_ahead)):
            min_length, min_path = \
                _shortest_closed_path(path_completed + (path_ahead[i], ),
                                       path_ahead[:i] + path_ahead[i+1:],
                                       min_length, min_path)
    return min_length, min_path

Jetzt kann der TSP-Algorithmus im Cluster ausgeführt werden:

In [None]:
%%timeit
TSP_IPython_cluster((staedte_positionen[0], ), staedte_positionen[1:nr_of_cities], r_depth)

# Ausblick: High-Level-Ansatz mit *dask*

Durch eine Repräsentation einer Aufgabe als Datenfluß-Graph ('Task Graph') kann [*dask*](https://dask.org) das Problem unter Einsatz verschiedener [Scheduler](https://docs.dask.org/en/latest/) lösen. Je nach Scheduler werden die Berechnungen auf einem Rechner seriell, parallel oder verteilt in einem Cluster gelöst.

Im Terminal eingeben:

    conda install -y dask

In [None]:
import dask.array as da
import numpy as np
from dask.threaded import get as get_threaded
from operator import add

data_size = (10000, 10000)

data = da.random.normal(41.41, 1, size=data_size, chunks=(1000, 1000))
dsk = {"x": data, "y": (add, "x", 1.01), "z": (da.mean, "y")}

In [None]:
data

Die Berechnungen werden hier parallelisiert auf einem Rechner ausgeführt.

In [None]:
m = np.array(get_threaded(dsk, "z"))
print(m)

Dir Rechenzeit pro Aufruf wird mit *timeit* bestimmt:

In [None]:
%timeit m  = np.array(get_threaded(dsk, 'z'))

Falls genügend Hauptspeicher vorhanden ist, können die Daten im RAM gehalten werden.

In [None]:
data_p = data.persist()
dsk_p = {"x": data_p, "y": (add, "x", 1.01), "z": (da.mean, "y")}

In [None]:
%timeit m  = np.array(get_threaded(dsk_p, 'z'))

In [None]:
del data, data_p

In [None]:
np_data = np.random.normal(41.41, 1, size=(10000, 10000))

In [None]:
%%timeit
m = np.mean(np_data+1.01)

In [None]:
del np_data

# Aufgaben am 05.12. und 07.12.2023

**Wichtiger Hinweis:** Um Konflikte zu vermeiden, verwendet jedes Team den Port $34000 + N_{\rm G}$, wobei 
$N_{\rm G}$ der Team-Nummer entspricht.

## Multiprocessing

Sie sollen die Berechnung von Pi nach Monte Carlo auf mehrere Rechner verteilen.
1. Skizzieren Sie die benötigten Komponenten und das Zusammenspiel (Datenfluss).
1. Implementieren die verteilte Anwendung und messen Sie die Rechenzeiten der auf mehrere Rechner verteilten Pi-Berechnung im labor M2.02.
1. Stellen Sie sicher, dass Sie auch alle Prozessoren auf den einzelnen Rechnern auslasten.
1. Visualisieren Sie die Systemlast mit Xosview und erklären Sie Ihre Beobachtungen.

## Middleware / docker

1. Erstellen Sie ein Java-Programm zur Berechnung der Werte der allgemeinen harmonischen und der allgemeinen alternierenden harmonischen Reihe erstellen. Eingabegrößen sind: Art der Reihe, $\alpha$ sowie die Grenze $N$, bis zu der die Summation durchgeführt wird.
1. Nennen und beschreiben Sie zwei wichtige Vorteile, die Entwickler durch den Einsatz einer Middleware haben? (ca. vier Sätze)
1. Vergegenwärtigen Sie sich nochmals die Struktur und Funktionsweise der im Rahmen der Vorlesung vorgestellten verteilten Monte-Carlo-Simulation, die [Ice](http://www.zeroc.com) verwendet.
1. Sie sollen eine verteilte Anwendung zur Berechnung der Werte der allgemeinen harmonischen und der allgemeinen alternierenden harmonischen Reihe erstellen. Eingabegrößen sind: Art der Reihe, $\alpha$ sowie die Grenze $N$, bis zu der die Summation durchgeführt wird. Dabei soll der Server, der die Berechnungen durchführt, in Java und der Client in Python implementiert werden. Als Middleware ist [Ice](http://www.zeroc.com) zu verwenden. Gehen Sie nach dem in der ersten Veranstaltung eingeführten Schema vor, wobei Sie aufgrund der Vorgaben einige Schritte überspringen können. 
1. Erstellen und testen Sie ein Docker-Image für den in den letzten Teilaufgabe erstellten Server. Beachten Sie hierzu die Anleitung zur Verwendung von Docker in moodle.

**Hinweis:** Seite 535 des Buchs [Konkrete Mathematik (nicht nur) für Informatiker* von Edmund Weitz](https://link.springer.com/book/10.1007/978-3-662-62618-4)

# Überprüfung

1. Was bedeutet der Begriff "Remote Procedure Call" (RPC)? (max. drei Sätze)
1. Sie wollen eine Berechnung, die Sie auf einem Rechner mit *multiprocessing* parallelisiert haben, auf mehrere Rechner verteilen. Auf welche Faktoren müssen Sie achten, damit die Verteilung zu einer Verkürzung der Rechenzeit führt? (ca. vier Sätze)
1. Was bedeutet der Begriff Lastverteilung ("load balancing")? (max. fünf Sätze)
1. Geben Sie ein Beispiel für ein parallelisierbares Problem, das sich *nicht* für eine Verteilung auf mehrere Rechner eignet. (ca. vier Sätze)
1. Welche Vorteile bietet *docker* im Vergleich zu *VirtualBox*?