# Gruppe 3 - Big Data WS2020/21

## 1. Anwendungsfall: Topic Modeling mit LDA bei Wikipedia-Daten
Die Menge an Daten, die erstellt, vervielfältigt und konsumiert werden beträgt 40 Zetabyte jährlich. Mit einer sich ständig erhöhenden Menge an Daten, wird es immer schwieriger gesuchte Informationen zu finden. Dies trägt dazu bei, dass der Überblick über die relevanten Details verloren geht. Auch führt es dazu, dass bei spezieller Informationssuche mittlerweile "Google-Skills" gebraucht werden, um die gewünschten Informationen zu erhalten.

Unser Anwendungsfall basiert auf dem Ansatz des Topic Modelings, mit welchem themenverwandte Artikel auf Wikipedia zu finden sind. Dabei bietet das Topic Modeling Methoden zum Entdecken von versteckten Themen in der Sammlung, sowie die Verwendung der Klassifizierung zur Organisation/Zusammenfassung/Suche der Dokumente. 

### Warum ist das Problem ein "Big Data" Problem?
Zur Beantwortung der Frage werden die Aspekte Volume, Velocity und Variety betrachtet.

###### VOLUME
Große Mengen an Wikipedia-Artikeln (täglich 600 neue Artikel bei aktuell 55 Millionen, “Current incarnation takes up 44 GB”)
###### VELOCITY
Datenverarbeitung bei dem LDA-Algorithmus ist aufwendig: Alle Wörter in den Artikeln werden vektorisiert und durchlaufen mehrere Itrerationen (in unserem Beispiel: 15), wobei jedes Mal mathematische Berechnungen vorgenommen werden. Die Ergebnisse des Algorithmus sollen zeitnah zur Weiterarbeit an den Data Scientist geschickt werden. 
###### VARIETY
* Unstrukturierte Daten in Textformat
* Abwesenheit von Normalisierung
* Benötigen individuelle Datenvorbereitung

### Warum kann man das Problem nicht mit der herkömmlichen Storage/Analyse/Datenbank-Technologie lösen?
Der LDA-Algorithmus benötigt interaktive, iterative Datenabfragen, wofür eine relationale Datenbank nicht konzipiert wurde ist. Zusätzlicher Nachteile bei der Verwendung einer relationalen Datenbank ist, die deklarative Abfragesprache, die zu viele Implementierungsdetails versteckt. Dies ist vor allem in der parallelen Datenverarbeitung von großem Nachteil. 
Mit herkömmlichen relationalen Datenbanken oder DWH-Systemen kann auf Massendaten lediglich vertikal bis zu einem gewissen Punkt skaliert werden. Durch Spark kann bei Bedarf zusätzliche Knoten hinzugefügt werden und das Problem der Massendaten durch Parallelisierung begegnet werden. 

## 2. Datengrundlage

Für Testzwecke mit kleinen Datensätzen können über die Export Pages Seite von Wikipedia in unterschiedlicher Anzahl Artikel heruntergeladen werden, um später mit unterschiedlichen Dateigrößen und Artikel-Collections arbeiten zu können. (https://en.wikipedia.org/wiki/Special:Export)

Der komplette, aktuelle Wikipedia-Dump kann über https://dumps.wikimedia.org/enwiki/20210201/ heruntergeladen werden. Dieser beträgt komprimiert zum heutigen Stand (Februar 2021) 17.9 GB. Die Daten erhält man in einem .xml.bz2 Format. Um diese Daten verarbeiten zu können, müssen sie mithilfe von einem Wikiextractor extrahiert und gesäubert werden. 

Der Wikiextractor ist eine Python Bibliothek, welches reinen Text von Wikipedia Dumps extrahiert. Dabei erhält der Wikiextractor die heruntergeladenen Wikipedia Dumps als ein Argument. Um den reinen Textfluss zu erhalten, werden unter anderem Bilder, Links und Verweise der einzelnen Artikeln entfernt. 

Die Ausgabe wird in mehreren Dateien ähnlicher Größe in einem Verzeichnis gespeichert. Das Verzeichnis wird hierbei vom Wikiextractor selbst erstellt. Jede Datei enthält dann mehrere Dokumente in diesem Dokumentenformat.

Quelle: https://github.com/attardi/wikiextractor
Hinweis für Windows: der Wikiextraktor aus dem Masterbranche funktioniert aufgrund der schlechten Unterstützung für StringIO in der Python-Implementierung unter Windows nicht. Das Problem wurde aber in einem anderen Branch gefixt, der aber noch nicht in den Masterbranch gemergt wurde. https://github.com/attardi/wikiextractor/pull/238

## 3. Packages

#### `findspark, pyspark`
Für die Verwendung von Pyspark in einem Jupyter Notebook, wird die Funktion "init" von dem Python-Package "findspark" verwendet.

#### `pyspark.sql`
* SparkSession ist der Haupteinstiegspunkt für die später verwendeten DataFrame- und SQL-Funktionalitäten
* Über das SQLContext werden die strukturierten Daten mithilfe von SQL verarbeitet

#### `pyspark.ml`
Von der Bibliothek von PySpark ML (Nachfolger der MLib) für maschinelles Lernen werden verschiedene Features verwendet.
* CountVectorizer: Sammlung von Textdokumenten in Vektoren von Token-Zählungen konvertieren. Erzeugt die Repräsentation für die Dokumenten über das Vokabular, die dann an den LDA-Algorithmus übergeben werden. 
* RegexTokenizer: Ermöglicht eine erweiterte Tokenisierung auf der Grundlage des Abgleichs mit regulären Ausdrücken
* StopWordsRemover: Entfernt alle Stoppwörter aus den Eingabesequenzen. 
* LDA: Clustering Algorithmen (LDA) in ML
* Pipeline: Beim maschinellen Lernen ist es üblich, eine Abfolge von Algorithmen auszuführen, um Daten zu verarbeiten und daraus zu lernen. ML stellt einen solchen Workflow als Pipeline dar, die aus einer Abfolge von PipelineStages (Transformers und Estimators) besteht, die in einer bestimmten Reihenfolge ausgeführt werden.

#### `nltk` 
In unserem Projekt befassen wir uns mit der Analyse natürlicher Sprache. Dies fällt in den Bereich der natürlichen Sprachverarbeitung (NLP). Deshalb wird in diesem Projekt das Natural Language Toolkit (NLTK), eine NLP-Bibliothek in Python, für die Analyse der Textdaten verwendet. 

#### `nltk.stem.porter`
Mithilfe des Porter Stemming Algorithmus werden morphologische Affixe entfernt, dass nur der Wortstamm übrig bleibt.

#### `nltk.corpus import stopwords`
Eine der wichtigsten Formen der Vorverarbeitung ist das Herausfiltern unbrauchbarer Daten (Stopwords).

#### `nltk.tokenize`
Zusätzlich wird der Tokenizer verwendet, der Zeichenketten in Listen von Teilstrings zerlegt.

#### `import wikipedia`
Die Wikipedia-API von Python wird verwendet, um einen Testartikel von der Wikipedia-Webseiten runterzuladen.

#### `time` 
Wird zur Unterstützung bei den Messungen später verwendet.

In [2]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline
import re
import nltk
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
import wikipedia
import time
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

## 4. SparkContext

Der SparkContext ist hierbei der Einstiegspunkt für Spark. 

Dabei wurden unterschiedliche Modi zur Vergleichbarkeit verwendet.

Local Mode: im Local Mode wird jeder Prozessor des lokalen Rechners als Worker verwendet, um die Berechnung auszuführen.

Der Standalone Mode wird mit 2 Workern initalisiert. Um zu testen wie sich die Performance unseres Programms verhält, wenn die CPU-Zahl nach oben skaliert wird, haben wir unterschiedliche Einstellungen ausprobiert:
1. Zustand: 2 Worker - jeweils 2 CPUs - jeweils 8 GB RAM
2. Zustand: 2 Worker - jeweils 4 CPUs - jeweils 8 GB RAM
3. Zustand: 2 Worker - jeweils 6 CPUs - jeweils 8 GB RAM

Das Hochschulcluster wurde ausprobiert und es wurde die Erkenntnis gewonnen, dass die verwendete Datenmenge noch zu klein ist, dass sich das Cluster lohnt. Erst bei einer größeren Datenmengen würde die Verarbeitungszeit die Zeit, die es für den Verbindungsaufbau braucht, kompensieren.

In [3]:
#LocalMode
spark = SparkSession.builder.getOrCreate()

#StandaloneMode
#spark = SparkSession.builder.master('spark://localhost:7077').getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [5]:
spark

## 5. Function Definitions
Im folgenden werden die Entwicklungsdetails zu den implementierten Funktionen beschrieben.

#### `splitByDoc`
Die rohen Daten mit allen Wikipedia-Artiken sind in Textdateien über mehrere Ordner verteilt. Die einzelnen Artikel werden in den Dateien durch ein `</doc>` Tag getrennt. Mit dieser Funktion werden die Artikel für die weitere Verarbeitung voneinander getrennt und als Array zurückgegeben. 

#### `get_title`
Es wird ein Artikel als Parameter übergeben. Bei jedem Artikel folgt nach dem Titel zunächst eine neue Zeile. Basierend darauf kann der Titel und Inhalt des Artikels unterteilt werden. In dieser Funktion wird ausschließlich der Titel zurückgegeben. 

#### `get_content`
Das Prinzip ist das gleiche wie bei `get_title`. Hier wird jedoch der Inhalt des Artikels zurückgegeben.

#### `clean`
Diese Funktion wird ausschließlich für die Datenbereinigung von einem einzelnen Testartikel verwendet. Dies geschieht nach dem Training des Modells und wird verwendet, um das Modell zu validieren. Die Funktionsweise der Datenbereinigung wird im Kapitel 'Data Pre-Processing' genauer beschrieben.

In [4]:
def splitByDoc(textfile):
    return list(filter(lambda x: x != '\n', textfile[1].split('</doc>')))

def get_title(content):
    # Remove any leading or lagging space if present 
    content = content.strip()
    title = ''
    try:
        if(content != ''):
            # Split the content on the basis of new line
            arr = content.split('\n', 2)
            # Second line is the title
            title = arr[1]
            # Rest is the actual content
            actual_content = arr[2]
    except:
        title = 'error'
    return title

def get_content(content):
    # Remove any leading or lagging space if present 
    content = content.strip()
    actual_content = ''
    try:
        if(content != ''):
            # Split the content on the basis of new line
            arr = content.split('\n', 2)
            # Second line is the title
            title = arr[1]
            # Rest is the actual content
            actual_content = arr[2]
    except:
        actual_content = 'error'
    return actual_content

def clean(article):
    title = article[0]
    document = article[1]
    tokens = RegexpTokenizer(r'\w+').tokenize(document.lower())
    tokens_clean = [token for token in tokens if token not in stopwords.words('english')]
    tokens_stemmed = [PorterStemmer().stem(token) for token in tokens_clean]
    return (title, tokens_stemmed)

## 6. Data Pre-Processing
Ziel der Datenbereinigung ist es, fehlerhafte und irrelevante Daten in den Wikipedia-Artikeln zu finden und sie zu korrigieren. Außerdem sollen die Textdokumente so aufbereitet werden, dass diese als Input für den LDA-Algorithmus verwendet werden können. Folgende Schritte wurden hierfür vorgenommen:

* Daten einlesen sowie fehlerhafte/leere Artikel entfernen
* RegexTokenizer: Artikel in Wörtern (Token) aufspalten und Token mit weniger als drei Zeichen entfernen
* Stopwörter entfernen
* CountVectorizer: Token in Vektoren transformieren

### Daten einlesen
Die Wikipedia-Artikel befinden sich zusammengefasst in Textdateien über mehrere Ordner verteilt. Alle Dateien werden aus den angegebenen Ordnern in ein RDD eingelesen. Für eine effiziente Verarbeitung werden die Artikel getrennt und einzeln gespeichert. Daraufhin werden Titel und Inhalt der Artikel separiert. Zusätzlich werden fehlerhafte und leere Artikel gefiltert. Bei den leeren Datensätzen handelt es sich, um Artikel mit einem validen Titel, aber einem leeren Inhaltskorpus. Dies entsteht, wenn mehrere Titel auf den selben Artikel referenzieren. Schließlich wird das RDD für die weitergehende Transformation in ein DataFrame (DF) konvertiert, da die Funktionen der `pyspark.ml` Bibliothek DataFrames als Input erwarten. Gleichzeitig hat DF gegenüber RDD den Vorteil, dass die Verarbeitung großer Datensätze einfacher sowie performanter gestaltet wird.

### RegexTokenizer
Unter Tokenisierung versteht man die Zerlegung einer Folge von Zeichenketten in Teile wie Wörter, Phrasen und andere Elemente, die Token genannt werden. Dabei werden einige Zeichen wie Satzzeichen verworfen. Der Regex-basierender Tokenizer extrahiert die Token, indem er das angegebene Regex-Muster verwendet `'\\W'`. Der Inhalt der Artikel wurde in unserem Beispiel in Wörter aufgespaltet. Mit dem optionalen Parameter `minTokenLength` wurden alle Wörter mit einer Länge unter 4 Zeichen entfernt. Der Rückgabewert ist ein Array.

### Stopwörter entfernen
Es werden Stoppwörter aus der Sammlung der Token herausgefiltert, um die Qualität und Aussagekraft der Topics zu steigern. Außerdem wurden manuell weitere Stopwörter hinzugefügt. 

### CountVectorizer
Der LDA-Algorithmus erwartet als Input nicht die rohen Textdokumente, sondern eine Matrix von Vektoren. Mit dem CountVectorizer wird jeder Token in einen Vektor konvertiert. 

### Pipeline
Eine Pipeline verkettet mehrere Transformatoren miteinander, um einen ML-Workflow zu spezifizieren. In unserem Beispiel wird in der Pipeline die Datenvorverarbeitung sowie die Berechnung des LDA-Algorithmus ausgeführt. 

## 7. LDA Model mit PySpark

### PySpark LDA-module

<code>pyspark.ml.clustering.LDA</code>

Das <code>pyspark.ml</code> package ist ein Python-package, das auf DataFrames aufbaut und für Maschinelles Lernen (ML) diverse APIs bereitstellt. Dadurch lassen sich schnell ML-Pipelines einrichten und konfigurieren. Das <code>pyspark.ml.clustering</code> module beinhaltet vier Klassen zum LDA-Algorithmus, die wir teilweise in diesem Notebook verwenden.

Die Bibliothek von PySpark bietet weniger Funktionalitäten zu LDA als die gensim-Bibliothek, profitiert jedoch von der parallelen Durchführung der Operationen über PySpark und damit einer schnellen Berechnung der Ergebnisse.

- LDA in Spark Dokumentation: https://spark.apache.org/docs/latest/ml-clustering.html
- LDA API: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.clustering.LDA

### Training des Modells

Auf Basis der Token, die zu Vektoren zugeordnet wurden, kann nun das LDA-Model trainiert werden, um Themen aus den Token zu erschließen.

- 20 Themen werden erstellt und 15 Iterationen zum Trainieren des Modells durchgeführt.
- Pipeline führt Vorverarbietung und Training sequenziell durch.

In [6]:
#Performancemessung
start = time.time()

#6. Data Pre-Processing
data = sc.wholeTextFiles('C:/Users/Alina/Big Data/Wikipedia Exports/all_articles_2mb/*/*')
pagesRaw = data.flatMap(splitByDoc)
pagesTitleContent = pagesRaw.map(lambda x: (get_title(x), get_content(x))).filter(lambda x: x[0] != 'error' and x[0] != '')

#RDD to DataFrame
dfPagesTitleContent = sqlContext.createDataFrame(pagesTitleContent, ['title', 'content'])

regexTokenizer = RegexTokenizer(inputCol='content', outputCol='list_of_words_raw', pattern='\\W', minTokenLength=4)

stopWordsRemover = StopWordsRemover(inputCol='list_of_words_raw', outputCol='list_of_words')
stopwordsSpark = stopWordsRemover.getStopWords()
stopwordsSpark.extend(['also'])
stopWordsRemover.setStopWords(stopwordsSpark)

countVectorizer = CountVectorizer(inputCol='list_of_words', outputCol='features')

#7. LDA Model mit PySpark
lda = LDA(k=20, maxIter=15)
pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, lda])
model = pipeline.fit(dfPagesTitleContent)

#Performancemessung
end = time.time()
print(end - start)

18.725643634796143


#### Ergebnis der Pipeline 
Ergebnis der vier Operationen (Tokenzier, Stopwörter entfernen, Vektorzuordnung und LDA-Model trainieren)

In [7]:
model.stages

[RegexTokenizer_fc3281ed9360,
 StopWordsRemover_3bfa94a727f4,
 CountVectorizerModel: uid=CountVectorizer_177bd0f5f2f4, vocabularySize=24678,
 LocalLDAModel: uid=LDA_35ec442247bb, k=20, numFeatures=24678]

In [8]:
cleanedPagesTitles = pagesTitleContent.map(lambda x: x[0])
cleanedPagesTitles.take(20)
#cleanedPagesTitles.count()

['Anarchism\r',
 'Autism\r',
 'Albedo\r',
 'A\r',
 'Alabama\r',
 'Achilles\r',
 'Abraham Lincoln\r',
 'Aristotle\r',
 'An American in Paris\r',
 'Academy Award for Best Production Design\r',
 'Academy Awards\r',
 'Actrius\r',
 'Animalia (book)\r',
 'International Atomic Time\r',
 'Altruism\r',
 'Ayn Rand\r',
 'Alain Connes\r',
 'Allan Dwan\r',
 'Algeria\r',
 'List of Atlas Shrugged characters\r']

## 8. Result des LDA mit PySpark

### 8.1 Übersicht über erstellte "Topics" durch LDA

Nachdem das Modell trainiert wurde, sollen nun die erstellten Themen vorgestellt werden.

Im Folgenden werden für jede der 20 Themen die 10 am stärksten gewichteten Wörter angezeigt. 

In [9]:
model.stages[3].describeTopics(10).show()

# Genauere Darstellung
# model.stages[3].describeTopics(10).show(truncate=False)

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[21501, 17949, 44...|[5.90065693494601...|
|    1|[56, 1136, 1471, ...|[0.00144882192452...|
|    2|[599, 1258, 930, ...|[2.24467344719559...|
|    3|[161, 3417, 1017,...|[1.16927537452539...|
|    4|[13394, 16986, 67...|[5.80221435792415...|
|    5|[8499, 10935, 210...|[5.81321402653009...|
|    6|[24412, 12702, 24...|[5.66337017891234...|
|    7|[20052, 5159, 239...|[5.69584585574013...|
|    8|[9514, 15321, 229...|[5.86089455893770...|
|    9|[6158, 6519, 3629...|[5.73687311495423...|
|   10|[599, 259, 930, 1...|[3.86562899730206...|
|   11|[6330, 19542, 106...|[5.95510184606180...|
|   12|[7008, 19694, 192...|[6.10605986452751...|
|   13|[485, 16816, 1574...|[6.31448857484985...|
|   14|[7, 40, 0, 58, 2,...|[0.00545638404192...|
|   15|[13764, 4344, 169...|[5.93892669375734...|
|   16|[14341, 354, 9449...|[5.86385995742331...|


Wörter werden in <code>describeTopics()</code> nur anhand ihrer Indizen angezeigt.

Nun zeigen wir die konkreten Wörter in den Topics an.

In [10]:
# Vokabeln aus dem "countVectorizerModel"
vocab = model.stages[2].vocabulary

# describeTopics(): Übersicht über Themen, Top-Wörter und deren Gewichtung
topics = model.stages[3].describeTopics(3)

# Wörter anhand ihrer Indizen zuordnen und darstellen
topicsRdd = topics.rdd
topicsRaw = topicsRdd.map(lambda row: row['termIndices']).collect()
list_words = map(lambda entry: [vocab[idx] for idx in entry], topicsRaw)
print(list(list_words))

[['parthia', 'scribbling', 'findings'], ['achilles', 'hector', 'patroclus'], ['swift', 'modest', 'proposal'], ['animal', 'animalia', 'refer'], ['germanium', 'formula_5th', 'grandson'], ['profoundly', 'integers', 'bicycles'], ['pennock', 'futile', 'hannibal'], ['tile', 'louisiana', 'stucco'], ['lungless', 'skiing', 'disarray'], ['descriptions', 'posthumously', 'fertile'], ['swift', 'anatomy', 'proposal'], ['knight', 'respects', 'upper'], ['europa', 'analysing', 'roster'], ['landing', 'unsd', 'methodius'], ['lincoln', 'aristotle', 'first'], ['siphnian', 'chemicals', 'faunal'], ['matrilineal', 'half', 'figurative'], ['ismenus', 'borrell', 'prophesied'], ['1800s', 'afrotheres', 'punched'], ['alternation', 'pseudohalogens', 'surinam']]


### 8.2 Neuen Artikel hinzufügen, um Ergebnis zu testen

PySpark bietet keine direkten Funktionen an, um einen zusätzlichen Datensatz (hier: Artikel) mit den existieren Topics zu vergleichen. Aus diesem Grund wenden wir für Testzwecke den LDA-Algorithmus auf einen Artikel an, um die häufigsten Wörter zu erhalten.

- Themennahen Artikel auswählen
- <code>clean()</code> zur Datenvorverarbeitung
- DF erzeugen, da benötigtes Format
- Vektoren zuordnen
- Modell trainieren und häufigste Wörter im Topic anzeigen

In [12]:
article_title = "Lamborghini"
article_content_test = clean([article_title, wikipedia.page(article_title).content])[1]
article_content_test_rdd = sc.parallelize([article_content_test]).zipWithIndex()
df_txts_test  = sqlContext.createDataFrame(article_content_test_rdd, ['list_of_words', 'index'])
cv_test = CountVectorizer(inputCol='list_of_words', outputCol='features')

cvmodel_test = cv_test.fit(df_txts_test)
result_cv_test = cvmodel_test.transform(df_txts_test)

result_cv_test.select('list_of_words').rdd.flatMap(list).flatMap(list).take(20)

['automobili',
 'lamborghini',
 'p',
 'italian',
 'pronunci',
 'lamborˈɡiːni',
 'italian',
 'brand',
 'manufactur',
 'luxuri',
 'sport',
 'car',
 'suv',
 'base',
 'sant',
 'agata',
 'bolognes',
 'compani',
 'own',
 'volkswagen']

### 8.4 Schlussfolgerungen ziehen

Für Schlussfolgerungen referenzieren wir uns auf die "Log Likelihood" und "Perplexity", wie in der PySpark-Dokumentation angegeben.
https://spark.apache.org/docs/latest/ml-clustering.html#latent-dirichlet-allocation-lda

Log Likelihood = Calculates a lower bound on the log likelihood of the entire corpus.
- "Log Likelihood value is a measure of goodness of fit for any model. Higher the value, better is the model."
- "Lower bound: a value that is less than or equal to every element of a set of data."

Perplexity = Calculate an upper bound on perplexity. (Lower is better.)
- "It captures how surprised a model is of new data it has not seen before"

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.clustering.LDA

https://towardsdatascience.com/evaluate-topic-model-in-python-latent-dirichlet-allocation-lda-7d57484bb5d0

In [13]:
ll = model.stages[3].logLikelihood(result_cv_test)
lp = model.stages[3].logPerplexity(result_cv_test)
print('The lower bound on the log likelihood of the entire corpus: ' + str(ll))
print('The upper bound on perplexity: ' + str(lp))

The lower bound on the log likelihood of the entire corpus: -704087.733206806
The upper bound on perplexity: 240.13906316739633


## 9. Leistungsmessung

Während der Erarbeitung und Analyse zu Spark und Gensim wurden auch verschiedene Testläuft durchgeführt. Dabei wurden die Zeiten gemessen, welche Spark oder Gensim benötigt haben, um die verwendeten Daten zu verarbeiten. Dabei ist zu beachten, dass die Vorverarbeitung der Daten jeweils in Spark geschieht und die Unterschiede durch die Verarbeitung der Daten (also der Modellerstellung) mit Spark, bzw. Gensim, zustandekommen. Die Zeiten unserer Testdurchläufe haben wir in den folgenden Diagrammen grafisch aufbereitet. Für die Tests wurden Datensätze mit 10, 100, 150, 1.000 und 10.000 MegaByte (MB)verwendet. Der Fokus hierbei lag auf den Datensätzen bis 1.000 MB.
<br>Alle Tests wurden auf derselben Hardware durchgeführt, einem „HP Spectre x360 15-df0126ng“.

   - CPU    = Intel Core i7-8750H mit 6 x 2,2GHz (TurboBoost: 4,1GHz)
   - RAM    = 2 x 8GB DDR4

Bei den Messungen wurden Messungen für Gensim im SingleCore Modus (SC) und im MultiCore Modus (MC) durchgeführt, da hier der Modus selbst gewählt werden kann.<br>
Die ersten Tests wurden mit dem Local-Mode von Spark durchgeführt. Dabei ist aufgefallen, dass hier der Rechner mit 16 GB Arbeitsspeicher immer wieder an seine Grenzen gekommen ist. Spark hatte bereits ab einem Datensatz von 150 MB Probleme („out of memory – error“). Unter Gensim trat dieses Problem erst bei dem Datensatz mit 1.000 MB auf.

<img src="Diagramm_01.png">

Vergleicht man die Zeiten fällt direkt auf, dass Spark im Local Mode deutlich schneller bei der Berechnung ist als Gensim. Dabei sind vor allem die Werte des MC-Modus mit Spark zu vergleichen, da Spark ebenfalls im Mehrkernbetrieb operiert.<br>
Im Anschluss daran haben wir den Standalone Mode von Spark verwendet. Die Testläufe wurden jeweils mit 2 Workern durchgeführt und die Anzahl der CPU-Kerne von Test zu Test verändert. Jedem Worker standen 8 GB Arbeitsspeicher zur Verfügung.<br>
Hier konnten die Spark Tests erfreulicherweise mit allen Datensätzen durchgeführt werden. Zum Schluss haben wir noch einen 10.000 MB Datensatz getestet und konnten eine Durchlaufzeit von 1:07:51, bei 2 Workern mit je 4 CPU-Kernen und jeweils 8 GB Arbeitsspeicher, erzielen. 

<img src="Diagramm_02.png">

Die erste Darstellung zeigt die Entwicklung der Durchlaufzeiten für die jeweiligen Datensätze. Angefangen mit 10 MB hin zu 1.000 MB. Dabei ist ersichtlich, dass die Zeiten bei kleinen Datensätzen relativ ähnlich sind, sobald wir aber 1.000 MB verwenden, die Zeiten auseinanderdriften.

<img src="Diagramm_03.png">

Das zweite Diagramm stellt die Zeiten im Vergleich zu den verwendeten CPU-Kernen dar. Dabei fällt auf, dass die beste Kombination in diesem Testfall der Datensatz mit 1.000 MB und je 4 CPU Kernen pro Worker zu sein scheint. Bei den kleineren Datenmengen waren die Zeiten, trotz verschiedener Konfigurationen, sehr konstant.<br>
Im Anschluss daran haben wir uns gefragt, wie sich die Dauer wohl mit einem extrem großen Datensatz entwickeln würde und einen Datensatz mit 10.000 MB verwendet. Die Daten dazu sind im nachfolgenden Diagramm zu finden.

<img src="Diagramm_04.png">

Vergleicht man den Anstieg der Zeiten kann man feststellen, dass die Dauer im Verhältnis zur Datenmenge weniger zunimmt. Bei der grauen Linie, welche der leistungsstärksten Konfiguration entspricht, hätten bei linearem Verlauf durch die anfangs 18 Sekunden für 10 MB die folgenden Werten für die anderen Datensätze erwartet werden können:<br>
- 100 MB    = 180 statt 64 Sekunden       -> 35% der Zeit
- 1.000 MB  = 1.800 statt 512 Sekunden    -> 28% der Zeit
- 10.000 MB = 18.000 statt 4.071 Sekunden -> 22% der Zeit<br>
Dadurch wird ersichtlich, dass die Datenverarbeitung mit Spark tatsächlich auf sehr große Datensätze ausgelegt ist und dort ihre Vorteile erst richtig entfalten kann. Je größer die Datensätze in unserem Beispiel wurden, desto mehr Zeit konnte im Verhältnis eingespart werden.

Um zu überprüfen, ob sich der Trend mit geringeren Durchlaufzeit bei der Konfiguration mit je 4 CPU-Kernen fortsetzt, wurde zusätzlich noch eine Messung mit dieser Konfiguration und dem 10.000 MB Datensatz durchgeführt. Auch hier war die Durchlaufzeit geringer als mit 6 CPU-Kernen je Worker. Jedoch ist der Abstand hier im Verhältnis zueinander nichtmehr so groß wie zuvor. 
<br>
Für unsere Tests mit Gensim haben wir eine Art Abkürzung gewählt und nur 2 verschiedene Konfigurationen für den Standalone Mode verwendet. Einmal mit 2 CPU-Kernen je Worker und einmal 6 CPU-Kerne je Worker, da hier vor allem die Unterschiede zwischen Single- und Multi-Core Verarbeitung durch Gensim im Vordergrund standen. Messungen mit 1.000 MB großen Datensätzen konnten aufgrund des dann auftretenden „out of memory“ Fehlers – also aufgrund von Arbeitsspeichermangel - nicht durchgeführt werden.

<img src="Diagramm_05.png">

Deutlich zu sehen in der obigen Grafik ist, dass es quasi keine Auswirkung hat wie viele CPU-Kerne dem Standalone Modus von Spark zur Verfügung stehen. Dafür hängt es umso mehr davon ab, ob Gensim im Singlecore oder im Multicore Modus ausgeführt wird. Der Multicore Modus reduziert die jeweilige Bearbeitungszeit deutlich. Auch zu beobachten ist, dass der Datensatz mit den 150 MB im Multicore Modus schneller bearbeitet werde konnte als der Datensatz mit 100 MB (siehe Diagramm unten). Dies lässt darauf schließen, dass hier ebenfalls eine Steigerung der Geschwindigkeit im Verhältnis zur Datenmenge feststellbar ist, wenn die Datensätze größer werden.<br>
Dies ist in der nachfolgenden Grafik nochmals deutlich eindeutiger zu sehen:

<img src="Diagramm_06.png">

Bei sehr kleinen Datensätzen kann in der Ausführungsdauer kaum ein Unterschied beobachtet werden. Bei großen Datensätzen ist also vor allem die Durchführung mit mehreren CPUs von entscheidender Bedeutung.<br>

Durch unsere Messungen konnten wir schlussendlich mehrere Kernpunkte identifizieren:
-	Je größer die Datenmenge ist, desto besser performen sowohl Spark als auch Gensim.
-	Bei lokaler Durchführung stoßen normale Computer schnell an ihre Leistungsgrenzen, vor allem den Arbeitsspeicher betreffend, wenn große Datenmengen verwendet werden.
-	Spark ist deutlich performanter als Gensim und führ die Verarbeitung der Daten deutlich schneller durch.
-	Auch bei großen Datenmengen kann es einen Unterschied machen wie viele Hardware-Ressourcen verwendet werden, da eine höhere Parallelisierung auch immer mehr Koordination und Kommunikation benötigt.