# MapReduce
MapReduce ist ein von Google eingeführtes Framework zur Verarbeitung großer, verteilt gespeicherter Datenmengen. Insbesondere für Aggregationen stellt MapReduce einen performanten, aber dennoch einfach zu implementierenden Weg bereit. Es wird beispielsweise von Apache Hadoop implementiert.

In [None]:
import re
import multiprocessing

from tui_dsmt.parallel import MapReduce
from tui_dsmt.parallel.datasets import load_texts

texts = load_texts()

## Inhaltsverzeichnis
- [Map und Reduce in Python](#Map-und-Reduce-in-Python)
- [Mathematische Formulierung](#Mathematische-Formulierung)
- [MapReduce in Software](#MapReduce-in-Software)
- [Zusammenfassung](#Zusammenfassung)

## Map und Reduce in Python
Die beiden Funktionen `map` und `reduce` werden in der funktionalen Programmierung verwendet, um Listen zu verarbeiten. Auch in Python sind diese Funktionen im Kern der Sprache enthalten und werden mit jeder Installation mitgeliefert.

`map` erhält als Parameter eine Funktion und eine Liste und wirkt wie eine Abbildung. Die übergebene Funktion wird für jedes Element einzeln aufgerufen und alle Werte zu einer neuen Liste zusammengefasst. (In Python akzeptiert die Funktion als zweiten Parameter ein beliebiges, iterierbares Objekt und gibt einen Generator zurück!)

In [None]:
def multiply_by_2(x):
    print(f'multiply_by_2 aufgerufen mit {x=}')
    return x * 2

my_list = [1, 2, 3, 4, 5]
list(map(multiply_by_2, my_list))

Da die Funktionsaufrufe vollständig unabhängig voneinander erfolgen und immer nur auf einem spezifischen Wert arbeiten, lässt sich der Aufruf problemlos parallelisieren. Python liefert dazu fertige Methoden:

In [None]:
with multiprocessing.Pool(processes=2) as pool:
    my_new_list = pool.map(multiply_by_2, my_list)

my_new_list

`reduce` dagegen reduziert dem Namen entsprechend eine Liste mit Hilfe einer angegebenen Funktion. Diese hat zwei Parameter: `a` bezeichnet den `accumulator`, also das aktuellste Zwischenergebnis, während `v` den aktuellen Wert `value` repräsentiert. (In Python versteckt sich diese Funktion im Modul `functools`, da andere Methoden wie beispielsweise Comprehensions als lesbarer gelten.)

In [None]:
def sum_up(a, v):
    print(f'sum_up aufgerufen mit {a=} und {v=}')
    return a + v

from functools import reduce
reduce(sum_up, my_list, 0)

## Mathematische Formulierung
Seien $K$ und $L$ Mengen von Schlüsseln, während $V$ und $W$ Mengen von Werten bezeichnen. Sind die Mengen mit einem Asterisk versehen, so ist die Menge aller möglichen Listen aus der Grundmenge gemeint. (Diese Listen können Elemente mehrfach enthalten und ebenfalls leer sein.)

Die Map-Funktion soll nun aus jedem Paar von Schlüssel und Wert $(k, v) \in K x V$ eine beliebige Anzahl neuer Paare bestehend aus Schlüssel und Wert $(l, w) \in (L, W)$ bestimmen. Für Map gilt also:

$$
Map: K \times V \rightarrow (L \times W)^* \quad\text{ bzw. }\quad (k, v) \rightarrow [(l_1, w_1), \dots, (l_n, w_n)]
$$

Sei nun $U$ eine weitere Menge von Werten. Die Reduce Funktion hat zum Ziel, alle Werte $w_1, \dots, w_m \in W$, die einem gemeinsamen Schlüssel $l \in L$ zugeordnert werden, auf eine Liste von Werten $u_1, \dots, u_p \in U$ zu reduzieren. Es gilt daher:
$$
Reduce: L \times W^* \rightarrow U^* \quad\text{ bzw. }\quad (l, [w_1, w_2, \dots, w_m]) \rightarrow (u_1, u_2, \dots, u_p)
$$

## MapReduce in Software
MapReduce ist allerdings kein rein mathematisches Problem. Stattdessen sollen Daten, die über viele Server verteilt gespeichert werden, effizient bearbeitet werden. Der Ablauf von MapReduce sieht veranschaulicht wie folgt aus:

<center>
     <br>
    <small>Ville Tuulos, Andreas Pietzowski | <a href="https://creativecommons.org/licenses/by-sa/3.0/deed.en">CC BY-SA 3.0 DEED</a></small>
</center>

Im Folgenden soll das Vorgehen anhand eines Beispiels demonstriert werden.

### Verteilte Daten
Zuerst werden die Daten geladen. In diesem Beispiel befinden sich verschiedene Bücher auf verschiedenen (virtuellen) Servern, die mit `Node1` bis `Node4` gekennzeichnet sind. (Mit dem Parameter `num_nodes` können Sie die Anzahl der Server verändern.)

In [None]:
mr = MapReduce(texts, num_nodes=4)
mr.show()

### Abbildung: Map
*Map* erhält einen Schlüssel und einen Wert und gibt eine beliebige Anzahl an Tupeln bestehend aus einem Schlüssel und einem Wert zurück. In diesem Schritt kann die Funktion auf beliebige Daten und auf einem beliebigen Server ohne die Notwendigkeit zur Synchronisation angewendet werden.

Die folgende Map-Funktion gibt Tupel bestehend aus einem Wort und der Zahl $1$ zurück. (Durch Aufsummieren der Zahlen können somit die Wörter gezählt werden.)

In [None]:
def map_function(title, data):
    for value in data.split():
        value = re.sub(r'[^A-Za-zÄÖÜäöü]', '', value)
        if value:
            yield value.lower(), 1

map_result = mr.map(map_function)
map_result.show()

### Umverteilung: Shuffle
*Shuffle* sorgt dafür, dass Listen der Werte, die dem gleichen Schlüssel zugeordnet wurden, auf den selben Server transportiert werden. Da hier der einzige Austausch über das Netzwerk geschieht, ist dieser Schritt bei einer schlechten Implementierung besonders anfällig für Performance-Probleme. In der Regel wird das Sortieren und Versenden zwar vom Framework übernommen, durch geschickte Wahl der Tupel und eine Zwischenreduzierung lässt sich der Aufwand des Shuffle allerdings massiv reduzieren.

In [None]:
shuffle_result = map_result.shuffle()
shuffle_result.show()

### Reduktion: Reduce
*Reduce* wird auf die entstandenen Listen angewendet und reduziert sie auf die endgültigen Ergebnisse. Auch diese Operation kann wieder vollständig lokal und ohne eine Synchronisierung zwischen den Knoten erfolgen.

Die erste Reduce-Funktion zählt dabei das Vorkommen jedes Wortes, indem die Werte aufsummiert werden.

In [None]:
def reduce_function(key, values):
    yield key, sum(values)

reduce_result = shuffle_result.reduce(reduce_function)
reduce_result.show()

In der Reduce-Funktion ist es jetzt ebenfalls möglich, Filter über die aggregierten Ergebnisse zu implementieren.

In [None]:
def reduce_function(key, values):
    my_sum = sum(values)
    if my_sum > 150:
        yield key, my_sum

reduce_result = shuffle_result.reduce(reduce_function)
reduce_result.show()

Da keine weitere Shuffle-Phase folgt, können im Reduce-Schritt **keine** beliebigen Tupel zurückgegeben werden!

In [None]:
# funktioniert nicht, da keine zweite Shuffle Phase!
def reduce_function(key, values):
    my_sum = sum(values)
    for v in key:
        yield v, my_sum

reduce_result = shuffle_result.reduce(reduce_function)
reduce_result.show()

## Zusammenfassung
MapReduce ist ein weiteres Modell, das durch seine starke Abstraktion eine einfache Programmierung ermöglicht und gleichzeitig die üblichen Fehler vermeidet. Besonders geeignet ist es, wenn verteilte Dateien für die Analyse in relativ kleine Schlüssel-Wert-Paare geteilt und anschließend reduziert werden können, wie es beispielsweise bei der Textanalyse häufig der Fall ist.