# Big Data Analytics
Praktikum Sommersemester 2023. <small>Version 1.0</small>

**Aufgabe 3: Abfragen mit Apache Spark** 

Machen Sie sich mit Apache Spark vertraut. Bearbeiten Sie die Aufgaben indem Sie **Spark RDD** entsprechend transformieren. 

## Arbeitsanweisung
Nutzen Sie die markierten Zellen im vorliegenden Notebook `BDA1_A3_Spark.ipynb` für Ihre Lösungen und laden Sie es in Ilias hoch. In den Zellen muss ausführbarer python code vorliegen. Die Ausgabe soll unterhalb der jeweiligen Zellen produziert werden.
Liefern Sie auch aussagekräftiges Markdown zu Ihrem Code (Vorgehen, Quellen, etc) ab.

**Hinweis**: Verwenden Sie für diese Aufgaben *nicht* Spark SQL und *keine* Dataframes. 

----

## Vorbereitung
* Verwenden Sie immer den vorgegeben Spark Master um Inkonsistenzen der python3 Versionen zwischen Worker und Client zu verhindern.
* Ändern Sie nicht die SparkContext Konfiguration und beenden Sie bitte den SparkContext nachdem Sie die Bearbeitung beenden, um die Resourcen wieder frei zu geben! (`stop_sc1()`)
* Stellen Sie sicher, dass `pyspark` installiert ist (pip install)

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[K     |████████████████████████████▏   | 273.4 MB 116.9 MB/s eta 0:00:01

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



[K     |████████████████████████████████| 310.8 MB 22 kB/s 
[?25hCollecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 57.0 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=177e06ebb4cd33c3794e685e80071d1a294c8b5ff0d4ed057e0d6d9ce1fa3a85
  Stored in directory: /home/jovyan/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.0


Für diese Aufgabe steht ein HDFS mit dem Namenode `namenode` unter Port `19000` bereit. 

Sie finden die folgenden Dateien darin:

* **`/data/bda1/co2data.tsv`**<br>Datensatz von Messungen verschiedener CO<sub>2</sub>-Sensoren
* **`/data/bda1/co2data_pm.tsv`**<br>Datensatz von Messungen verschiedener CO<sub>2</sub>-Sensoren mit Partikelmessung, pm ist die Partikeldichte, npm die Partikelanzahl bestimmter Größen

In [108]:
import pyspark
from pprint import pprint
import os
import sys

print(sys.version)  # python3 version

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'ipython3'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
def stop_sc1():
    """stop spark context if exists"""
    try:
        sc1.stop()
        print('Spark Context stopped')
    except Exception as ex1:
        print(f'No context stopped: {ex1}')

3.9.6 | packaged by conda-forge | (default, Jul 11 2021, 03:39:48) 
[GCC 9.3.0]


### Spark Context erzeugen
Die Session mit dem vorgegebenen Spark Master öffnen.

In [109]:
stop_sc1()

# never ever change these lines!
config = pyspark.SparkConf().setAll([('spark.executor.memory', '1g'), ('spark.executor.cores', '1'), ('spark.cores.max', '2'), ('spark.driver.memory','1g'), ("spark.app.name", os.environ['JUPYTERHUB_CLIENT_ID'])])
sc1 = pyspark.SparkContext(master='spark://jupiter.bigdata.fh-aachen.de:17077', conf=config)

Spark Context stopped


## Aufgabe 3 a

Untersuchen Sie die Sensordaten unter ``hdfs://namenode/data/bda1/*``. Beantworten Sie die folgenden Fragen, indem Sie jeweils geeignete RDD Operationen in Spark ausführen:

1. Wieviele Messungen sind der Datenmenge `co2data.tsv` zu finden? Wieviele sind es in `co2data_pm.tsv` ?
2. Wie lauten die Attributnamen der Datenmenge `co2data_pm.tsv` ? Geben Sie sie zeilenweise aus.
3. Wieviele verschiedene Sensoren (angegeben im Feld _serial_number_) enhält die Datenmenge `co2data.tsv` ?  Beachten Sie den Hinweis unter 3a)
4. Wieviele Datenpunkte je Sensor liegen vor in `co2data.tsv` ? Geben Sie sowohl Sensor als auch Anzahl aus und beachten Sie den Hinweis unter 3a)
5. Was ist der höchste, und was der niedrigste Temperaturwert in der Datenmenge `co2data_pm.tsv` ? 
6. Was ist der durchschnittliche CO<sub>2</sub>-Wert je Sensor in der Datenmenge `co2data_pm.tsv` ? Runden Sie gerne auf drei Nachkommastellen und beachten Sie den Hinweis unter 3a)

**Hinweis:**<br>Die Serial Number besteht aus drei Komponenten: `s_` als Präfix, die eindeutige MAC-Adresse des Sensors und einer Zahl. Die Eindeutigkeit wird nur über die MAC-Adresse bestimmt. Nutzen Sie die MAC-Adresse.

Nachdem wir die Konfigurationen gesetzt und den Spark Kontext erzeugt haben, haben wir den Spark Kontext unter der Variable
`sc1` gespeichert. Um die Datei unter `hdfs://namenode/data/bda1/*` zu lesen haben wir den Port `19000` wie oben in der Aufgabenstellung
angekündigt definiert und einen Pfad: `hdfs://namenode:19000/data/bda1/co2data.tsv` bzw. `hdfs://namenode:19000/data/bda1/co2data_pm.tsv`
erhalten. Mit der Funktion `textFile()` können wir nun die Dateien lesen und die Anzahl der Messungen mit der Operation `count() - 1` finden. 
Wir zielen 1 ab, um den Header zu überspringen und reine Daten zu zählen.

In [116]:
# Aufgabe 3a 1
read_co2data = sc1.textFile("hdfs://namenode:19000/data/bda1/co2data.tsv")
counter1 = read_co2data.count() - 1
read_co2data_pm = sc1.textFile("hdfs://namenode:19000/data/bda1/co2data_pm.tsv")
counter2 = read_co2data_pm.count() - 1
print("Anzahl der Messungen in co2data.tsv:", counter1)
print("Anzahl der Messungen in co2data_pm.tsv:", counter2)



Anzahl der Messungen in co2data.tsv: 14013884
Anzahl der Messungen in co2data_pm.tsv: 14001337


                                                                                

Mit der Funktion `first()` bekommen wir die erste Zeile der Datenmenge und geben sie dann mit einer for Schleife aus, wobei wir 
jede Zeile der RDD durch einen Tab Delimiter `\t` getrennt haben, um die einzelnen Felder zu erhalten.

In [117]:
# Aufgabe 3a 2
attributes = read_co2data_pm.first()
for attr in attributes.split("\t"):
    print(attr)

"timestamp"
"measurement_count"
"version"
"serial_number"
"co2_ppm"
"temperature_celsius"
"relative_humidity_percent"
"pm1"
"pm2"
"pm4"
"pm10"
"npm0"
"npm1"
"npm2"
"npm4"
"npm10"
"ps"


In dieser Aufgabe zählen wir mithilfe von mapReduce die Anzahl der eindeutigen Sensoren. Zunächst definieren wir die erste Zeile als Header,
um es in der nächsten Zeile zu überspringen und Daten ohne Header zu erhalten. Danach erzeugen wir die Liste verschiedener Sensoren, wobei
wir als Eigenschaft der Eindeutigkeit die Mac-Adresse nehmen, die zwischen der 3 und 15 Charakter liegt. Dann zählen wir die einzelnen
Sensoren mithilfe von MapReduce.

In [118]:
# Aufgabe 3a 3
header = read_co2data.first()
data_without_header = read_co2data.filter(lambda row: row != header)
co2_data = data_without_header.map(lambda row: (row.split("\t")[1][3:15]))
unique_s = co2_data.distinct()
cnt = unique_s.map(lambda attr: (attr, 1)).reduceByKey(lambda x, y: x + y)
count_unique_sens = cnt.count()
print("Anzahl verschiedenen Sensoren:", count_unique_sens)



Anzahl verschiedenen Sensoren: 10


                                                                                

Hier zählen wir die Anzahl der eindeutigen Sensoren mit der Funktion `countByValue()` und geben diese Anzahl aus.

In [119]:
# Aufgabe 3a 4
cnt_byValue = co2_data.countByValue()
for value, amt in cnt_byValue.items():
    print("Sensor: ", value, ", Amount: ", amt)



Sensor:  d8bfc014724e , Amount:  2103522
Sensor:  10521c0202ab , Amount:  2064
Sensor:  e8db84c5f771 , Amount:  1665530
Sensor:  8caab57c3e19 , Amount:  1561046
Sensor:  e8db84c5f33d , Amount:  2270696
Sensor:  8caab57a6dd9 , Amount:  2781677
Sensor:  10521c01cf19 , Amount:  385105
Sensor:  d8bfc0147061 , Amount:  578457
Sensor:  3c6105d3abae , Amount:  1533919
Sensor:  8caab57cc961 , Amount:  1131868


                                                                                

In dieser Aufgabe arbeiten wir der `co2data_pm`. Wir überspringen den Header(wollte schon oben definierte Variable nehmen, habe aber einen
Fehler bekommen, daher nochmal definiert). Dann nehmen wir die Spalte mit den Temperaturwerten und stellen sicher, dass wir keinen `null`
oder leeren Wert nehmen. Das realisieren wir mit der `filter()` Funktion. Danach wieder mit map ersetzen wir die Anführungszeichen bei
Werten und casten, um eine Zahl zu erhalten. Schließlich haben wir eine Liste mit allen numerischen Temperaturwerten bekommen, woraus wir
einen maximalen und minimalen Temperaturwert mit den Funktionen `max()` und `min()` bekommen.

In [120]:
# Aufgabe 3a 5
attributes = read_co2data_pm.first()
data_pm_without_header = read_co2data_pm.filter(lambda row: row != attributes)
co2_data_pm = data_pm_without_header.map(lambda row: row.split("\t")[5])
data_without_null = co2_data_pm.filter(lambda row: row != '"null"' and row != '')
numbers = data_without_null.map(lambda n: float(n.replace('"', '')))
max_temp = numbers.max()
min_temp = numbers.min()
print("Maximale Temperatur: ", max_temp, ", Minimale Temperatur: ", min_temp)



Maximale Temperatur:  41.0 , Minimale Temperatur:  6.0


                                                                                

Wir arbeiten immer noch mit der Datei aus 3a 5. Nun brauchen wir ein RDD mit zwei Werten: Sensorname und durchschnittlicher CO2 Wert für
diesen Sensor. Wir überspringen den Header wieder und erstellen ein RDD mit zwei Werten aus unserer Datei, hier haben wir noch den Sensor
und sein CO2 Wert. Jetzt filtern wir wieder beide Spalten, um die `null` Werte zu entfernen und casten auf `float` den zweiten Wert aus
der Tupel, was ein CO2 Wert ist. Und dann gruppieren wir die Keys, was in diesem Fall die Sensornamen sind und weisen diesen einzelnen Sensornamen 
den durchschnittlichen CO2 Wert zu, den wir durch Division der Summe aller CO2 Werte durch die Anzahl bekommen. Darüber hinaus runden wir
die CO2 Zahlen bis auf 3. Nachkommazahl. Zum Schluss nutzen wir die Funktion `collect()`, um eine Liste der Tupeln mit Sensornamen und 
Durchschnittswerten zu bekommen und geben die Werte aus.

In [121]:
# Aufgabe 3a 6
attributes = read_co2data_pm.first()
data_pm_without_header = read_co2data_pm.filter(lambda row: row != attributes)
co2_data_pm_values = data_pm_without_header.map(lambda sensor: (sensor.split("\t")[3][3:15], sensor.split("\t")[4]))
data_without_null = co2_data_pm_values.filter(lambda row: row[0] != '"null"' and row[1] != '"null"')
numbers_pm = data_without_null.map(lambda n: (n[0], float(n[1].replace('"',''))))
avg_numbers = numbers_pm.groupByKey().mapValues(lambda num: sum(num) / len(num))
round_numbers = avg_numbers.mapValues(lambda num: round(num, 3))
num_list = round_numbers.collect()
for sensor, co2_val in num_list:
    print("Sensor: ", sensor, ", Avg_CO2_Wert: ", co2_val)



Sensor:  e8db84c5fc6a , Avg_CO2_Wert:  554.979
Sensor:  308398a2a1f6 , Avg_CO2_Wert:  657.518
Sensor:  8caab57c9751 , Avg_CO2_Wert:  519.263
Sensor:  8caab57cc813 , Avg_CO2_Wert:  547.52
Sensor:  8caab57d01da , Avg_CO2_Wert:  574.967
Sensor:  308398a2ddcb , Avg_CO2_Wert:  481.186
Sensor:  3c6105d3908f , Avg_CO2_Wert:  730.933
Sensor:  8caab57cbb52 , Avg_CO2_Wert:  496.846
Sensor:  a848fac03782 , Avg_CO2_Wert:  549.538
Sensor:  ac0bfbd6547d , Avg_CO2_Wert:  486.608
Sensor:  308398b5b69c , Avg_CO2_Wert:  1058.714
Sensor:  308398a2fb52 , Avg_CO2_Wert:  541.715
Sensor:  308398a2f790 , Avg_CO2_Wert:  688.314
Sensor:  ac0bfbd64321 , Avg_CO2_Wert:  580.367
Sensor:  308398b595c0 , Avg_CO2_Wert:  531.905
Sensor:  e8db84c62ab4 , Avg_CO2_Wert:  560.562
Sensor:  3c6105d381e8 , Avg_CO2_Wert:  546.007
Sensor:  e8db84c5f33d , Avg_CO2_Wert:  613.406
Sensor:  e8db84c5f771 , Avg_CO2_Wert:  960.3
Sensor:  ac0bfbd85271 , Avg_CO2_Wert:  573.868
Sensor:  3c6105d467fd , Avg_CO2_Wert:  640.568
Sensor:  8caab5

                                                                                

## Aufgabe 3 b

Machen Sie Aussagen in Markdown zu den beiden Datenmengen, nachdem Sie sich die Information aus Spark gezogen haben.

* Kommen Sensoren (MAC-Adresse in Serial Number) in beiden Datenmengen vor? Wenn ja, welche?
* Anhand welchen Attributs können Sie auf vorhandene Werte in den Attributen `pm*` und `npm*` filtern? Geben Sie ein Beispiel.
* Enthält eine Datenmenge "fehlerhafte" CO<sub>2</sub>-Messungen? Wenn ja, wieviele Messungen sind betroffen? Fehlerhaft ist ein Wert `null`


<hr>
In dieser Aufgabe haben wir zwei Listen der verschiedenen Sensoren aus `co2data` und `co2data_pm` erzeugt. Um die Duplikate aus den beiden
Listen zu bekommen, haben wir die Funktion `intersection()` benutzt und mit `collect()` eine Liste der in beiden RDDs vorkommenden Sensoren
ausgegeben.

In [122]:
# keine Zellenvorgabe
header1 = read_co2data.first()
data_without_header_1 = read_co2data.filter(lambda row: row != header1)
co2_data = data_without_header_1.map(lambda row: (row.split("\t")[1][3:15]))
unique_s = co2_data.distinct()

header_2 = read_co2data_pm.first()
data_without_header_2 = read_co2data_pm.filter(lambda row: row != header_2)
co2_data_pm = data_without_header_2.map(lambda row: (row.split("\t")[3][3:15]))
unique_s_pm = co2_data_pm.distinct()

duplicates = unique_s.intersection(unique_s_pm).collect()
print("Folgende Sensoren kommen in beiden Datenmengen vor: ", duplicates)



Folgende Sensoren kommen in beiden Datenmengen vor:  ['e8db84c5f33d', 'e8db84c5f771', '10521c0202ab', '8caab57c3e19', '3c6105d3abae']


                                                                                

Hier nehmen wir den RDD von co2_data_pm und nehmen z.B die 7 Spalte, die `pm1` ist. Wir filtern mit der Funktion `filter()` auf vorhandene
Werte `is not None and p!= '"null"'`. Dann nehmen wir die ersten 10 Werte, weil die ganze Liste zu groß ist und geben die aus. 
Die weiteren pm und npm Spalten können analog mit entsprechendem Spaltenindex ausgewählt und ausgegeben werden.

In [123]:
header = read_co2data_pm.first()
data_without_header = read_co2data_pm.filter(lambda row: row != header)
pm1 = data_without_header.map(lambda row: (row.split("\t")[7]))
filter_pm1 = pm1.filter(lambda p: p is not None and p != '"null"').take(10)
print("Die ersten 10 Werte von pm1 auf existierende gefiltert: ", filter_pm1)

Die ersten 10 Werte von pm1 auf existierende gefiltert:  ['"1.58"', '"2.12"', '"1.59"', '"2.15"', '"1.69"', '"2.1"', '"1.77"', '"2.01"', '"1.78"', '"1.93"']


In der letzten Aufgabe haben wir die mit der `co2_data_pm` gearbeitet. Die RDD haben wir die `null` Werte gefiltert und sie dann
mit `count()` gezählt. Zum Schluss haben wir die Anzahl der fehlerhaften Messungen mit `print()` ausgegeben.

In [124]:
co2_numbers = data_without_header.map(lambda row: (row.split("\t")[4]))
null_vals = co2_numbers.filter(lambda k: k == '"null"').count()
print("Anzahl der fehlerhaften Messungen: ", null_vals)



Anzahl der fehlerhaften Messungen:  9


                                                                                

In [125]:
stop_sc1()  # always exit your spark context after work!

Spark Context stopped


## Nützliche Links
* https://spark.apache.org/docs/latest/rdd-programming-guide.html
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
* https://spark.apache.org/examples.html