___

In [1]:
import findspark
findspark.init()

In [3]:
# Imports
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pandas as pd
import numpy as np

# (I can't get no) satisfaction
Wir arbeiten weiterhin mit dem Datensatz über die Zufriedenheit von Angestellten einer fiktiven Firma. In dieser Kurseinheit betrachten wir die parallele Verarbeitung mit Spark. 

*Anmerkung: Der verwendete Datensatz ist natürlich eher klein, so dass normalerweise keine Parallelisierung benötigt wird. In der Kurseinheit dient er aber dazu, die grundlegenden Konzepte von Spark zu vermitteln.*

*Wichtige Methoden finden Sie im Foliensatz, es empfiehlt sich jedoch auch, die <a href="https://spark.apache.org/docs/latest/api/python/reference/index.html">pyspark-Doku</a> für die Bearbeitung zu nutzen.*

*In dieser Kurseinheit werden u.a. Lambda-Funktionen verwendet. Wenn Sie damit noch nicht vetraut sind, hilft ein Blick in die <a href="https://www.w3schools.com/python/python_lambda.asp">Python-Lambas-Doku</a>.*

Den Eintrittspunkt in die Funktionalität von Spark stellt der SparkContext dar. Dieser repräsentiert auch die Verbindung zu den Exekutoren. Bei der Erstellung wird außerdem eine Sammlung von Konfigurationsparametern mitgegeben. Wir beschränken uns zunächst auf die Angabe des Masters und einem App-Namen:
- **master:** Wir verwenden YARN als Ressourcen-Manager und starten die Anwendung im Client-Modus.
- **appName:** Hier vergeben wir einen Namen für unsere Applikation.

Da wir später auch mit SparkSQL arbeiten wollen, erstellen wir zudem direkt eine SparkSession. Diese stellt den Eingangspunkt für die SparkSQL-Funktionalität zur Verfügung und beinhaltet auch den SparkContext.

In [76]:
# SparkSession erstellen
spark = SparkSession.builder \
    .appName("My Spark Application") \
    .getOrCreate()

# SparkContext
sc = spark.sparkContext

## RDDs
Zunächst betrachten wir die Arbeit mit RDDs. Wie Sie in den Vorlesungsvideos gelernt haben, bilden diese die Grundlage der parallelen Verarbeitung. In der Praxis wird meist nicht direkt mit RDDs, sondern mit einer High-level-Bibliothek wie SparkSQL gearbeitet. Es ist jedoch wichtig, die Konzepte von RDDs zu verstehen, da diese die Basis von Spark darstellen. 

### Eigenschaften von RDDs
Betrachten wir zunächst die wichtigsten Eigenschaften von RDDs:
- In-Memory-Verarbeitung
- Lazy
- Partitioniert
- Unveränderlich
- Lineage

#### In-Memory-Verarbeitung
Beschreiben Sie die Vorteile der In-Memory-Verarbeitung.

#### Lazy
Beschreiben Sie, was es für die Programmierung bedeutet, dass die Verarbeitung lazy ausgeführt wird. Programmieren Sie ein Code-Beispiel Ihrer Wahl, in dem Sie die Auswirkungen der lazy Ausführung demonstrieren. Nennen Sie zudem die wichtigsten Unterschiede zwischen Transformations und Actions.

In [1]:
'''
Ein gutes Beispiel für Lazy-Ausführung ist die Verwendung von Generatoren in Python. 
Ein Generator erzeugt Werte “on the fly”, aber nur wenn sie benötigt werden. 
'''
def count_up_to(n):
    count = 1
    while count <= n:
        yield count
        count += 1

# Erstellen des Generatord, aber es wird noch nichts berechnet
numbers = count_up_to(5)

# Die Zahlen werden erst generiert, wenn wir durch sie iterieren
for number in numbers:
    print(number)


1
2
3
4
5


#### Partitioniert
Erstellen Sie aus der vorgegebenen Liste (`data`) ein RDD namens `rdd_part` mit drei Partitionen. Kontrollieren Sie nach der Erstellung, dass die korrekte Anzahl Partitionen erzeugt wurde.

In [78]:
data = [1 for i in range (10)] + [0 for i in range (5)]
data

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0]

In [79]:
# RDD erstellen
rdd_part =  sc.parallelize(data, 3)
rdd_part

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [80]:
# Ihre Lösung
num_partitions = rdd_part.getNumPartitions()

In [81]:
# Anzahl Partitionen anzeigen
'''
n diesem Code wird ein RDD namens rdd_part erstellt, das die Daten aus der Liste data enthält und in drei Partitionen aufgeteilt ist. Die Anzahl der Partitionen wird dann mit getNumPartitions überprüft und ausgegeben.'''

print(f"Das RDD hat {num_partitions} Partitionen.")


Das RDD hat 3 Partitionen.


Im Folgenden schauen wir uns die Verteilung der Daten auf die unterschiedlichen Partitionen an.

> **Achtung:** Die Anwendung von `collect()` in Kombination mit `glom()` ist nur für kleine Datensätze geeignet!

In [None]:
rdd_part.glom().collect()

Wie wir sehen können, bestehen zwei Partitionen aus 1en und eine aus 0en. Filtern Sie das RDD nun so, dass nur die Werte größer 0 übrig bleiben. Erstellen Sie hierfür ein neues RDD namens `rdd_filtered`.

In [9]:
# Ihre Lösung
rdd_filtered = rdd_part.filter(lambda x: x > 0)

Wenn wir uns nun die Verteilung der Daten auf die Partitionen ansehen, sehen wir, dass wir eine leere Partition erzeugt haben:

In [None]:
rdd_filtered.glom().collect()

Beschreiben Sie die Nachteile von leeren Partitionen. Wie kann dieses Problem gelöst werden (Stichwort *Shuffling*) und warum sollte Shuffling normalerweise vermieden werden?

#### Unveränderlich und Lineage
Warum sind RDDs unveränderlich und warum kennt jedes RDD seine Abstammung (Lineage)?

### Daten laden
Laden Sie den Datensatz `employees_satisfaction_transformed.csv` in ein RDD. Denken Sie daran, dass die Daten standardmäßig aus dem HDFS geladen werden. Aus diesem Grund müssen die Daten zunächst ins HDFS geladen werden. Verwenden Sie hierzu das Notebook `hdfs-upload.ipynb`, das wir Ihnen in Moodle zur Verfügung gestellt haben.

Laden Sie anschließend die Daten in ein RDD.

In [82]:
rdd_df = sc.textFile("./employees_satisfaction_transformed.csv")
rdd_df

./employees_satisfaction_transformed.csv MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

### Erste Datenexploration

Lassen Sie sich dann die ersten zehn Zeilen des RDDs anzeigen.

In [21]:
rdd_df.take(10)

[',age,department,education,recruitment_type,job_level,rating,awards,certifications,salary,gender,entry_date,satisfied',
 '0,28,HR,Postgraduate,Referral,5,2.0,1,0,78075.0,Male,2019-02-01,1',
 '1,50,Technology,Postgraduate,Recruitment Agency,3,5.0,2,1,38177.1,Male,2017-01-17,0',
 '2,43,Technology,Undergraduate,Referral,4,1.0,2,0,59143.5,Female,2012-08-27,1',
 '3,44,Sales,Postgraduate,On-Campus,2,3.0,0,0,26824.5,Female,2017-07-25,1',
 '4,33,HR,Undergraduate,Recruitment Agency,2,1.0,5,0,26824.5,Male,2019-05-17,1',
 '5,40,Purchasing,Undergraduate,Walk-in,3,3.0,7,1,38177.1,Male,2004-04-22,1',
 '6,26,Purchasing,Undergraduate,Referral,5,5.0,2,0,78075.0,Male,2019-12-10,1',
 '7,25,Technology,Undergraduate,Recruitment Agency,1,1.0,4,0,21668.4,Female,2017-03-18,0',
 '8,35,HR,Postgraduate,Referral,3,4.0,0,0,38177.1,Female,2015-04-02,1']

Die erste Zeile entspricht dem Header. Entfernen Sie diese aus dem Datensatz.

In [9]:
# Ihre Lösung
header = rdd_df.first()

In [10]:
rdd_no_header = rdd_df.filter(lambda line: line != header)

In [12]:
rdd_no_header.take(5)

['0,28,HR,Postgraduate,Referral,5,2.0,1,0,78075.0,Male,2019-02-01,1',
 '1,50,Technology,Postgraduate,Recruitment Agency,3,5.0,2,1,38177.1,Male,2017-01-17,0',
 '2,43,Technology,Undergraduate,Referral,4,1.0,2,0,59143.5,Female,2012-08-27,1',
 '3,44,Sales,Postgraduate,On-Campus,2,3.0,0,0,26824.5,Female,2017-07-25,1',
 '4,33,HR,Undergraduate,Recruitment Agency,2,1.0,5,0,26824.5,Male,2019-05-17,1']

Zählen Sie nun die Anzahl der Angestellten.

In [25]:
# Ihre Lösung
num_employees = rdd_no_header.count()
print("Anzahl der Angestellten: ", num_employees)

Anzahl der Angestellten:  498


### Datenverarbeitung

Zählen Sie nun die Angestellten pro Department.

*Sollten Sie keinen Lösungsansatz haben, finden Sie unter dem Button "Show Solution" die Beschreibung der notwendigen Verarbeitungs-Schritte.*

1. Aktuell ist jede Zeile des RDD ein String, in dem die einzelnen Felder durch Komma getrennt sind. Dieser String muss zunächst gesplittet werden.
2. Im nächsten Schritt müssen Key-Value-Paare gebildet werden. Der Key ist das Department und der Wert ist 1.
3. Dann müssen pro Key alle Werte addiert werden.

*Wenn Sie Probleme mit der konkreten Umsetzung der Aufgabe haben, finden Sie unter dem Button "Show Solution" Hinweise zur Programmierung.*

- Das Splitten der einzelnen Funktionen erfolgt mit `split(",")`.
- Anschließend liegen die Daten in einer Liste vor. Das dritte Element (`line[2]`) entspricht dem Department.
- Mithilfe der Funktion `reducyByKey()` können die Werte pro Key addiert werden. 

In [15]:
# Ihre Lösung
#Splitting
rdd_split = rdd_no_header.map(lambda line: line.split(','))
rdd_split.take(5)

[['0',
  '28',
  'HR',
  'Postgraduate',
  'Referral',
  '5',
  '2.0',
  '1',
  '0',
  '78075.0',
  'Male',
  '2019-02-01',
  '1'],
 ['1',
  '50',
  'Technology',
  'Postgraduate',
  'Recruitment Agency',
  '3',
  '5.0',
  '2',
  '1',
  '38177.1',
  'Male',
  '2017-01-17',
  '0'],
 ['2',
  '43',
  'Technology',
  'Undergraduate',
  'Referral',
  '4',
  '1.0',
  '2',
  '0',
  '59143.5',
  'Female',
  '2012-08-27',
  '1'],
 ['3',
  '44',
  'Sales',
  'Postgraduate',
  'On-Campus',
  '2',
  '3.0',
  '0',
  '0',
  '26824.5',
  'Female',
  '2017-07-25',
  '1'],
 ['4',
  '33',
  'HR',
  'Undergraduate',
  'Recruitment Agency',
  '2',
  '1.0',
  '5',
  '0',
  '26824.5',
  'Male',
  '2019-05-17',
  '1']]

In [16]:
#Key-Value-Paare erstellen:
rdd_kv = rdd_split.map(lambda line: (line[2], 1))  # line[2] ist das Department
rdd_kv.take(5)

[('HR', 1), ('Technology', 1), ('Technology', 1), ('Sales', 1), ('HR', 1)]

In [22]:
#Werte pro Key addieren:
rdd_counts = rdd_kv.reduceByKey(lambda a, b: a + b)
employee_per_department = rdd_counts.collect()

In [23]:
for department, count in employee_per_department:
    print(f"Department: {department}, Anzahl der Angestellten: {count}")


Department: HR, Anzahl der Angestellten: 106
Department: Technology, Anzahl der Angestellten: 98
Department: Sales, Anzahl der Angestellten: 87
Department: Purchasing, Anzahl der Angestellten: 114
Department: Marketing, Anzahl der Angestellten: 93


Die Arbeit mit RDDs ist offensichtlich recht mühsam, weshalb in der Praxis häufig eine High-level API wie SparkSQL verwendet wird. Diese schauen wir uns im Folgenden an.

## DataFrames
Im Folgenden arbeiten wir mit der High-level API SparkSQL.

*Vorsicht: DataFrames sind die zentrale Abstraktion der SparkSQL-Bibiliothek. Die Bibliothek bietet zwei praktisch gleichwertige APIs, um mit DataFrames zu arbeiten: Die **DataFrame API** und die **SQL API**. Wir arbeiten zunächst mit der DataFrame API.*

### Daten laden
Laden Sie die Daten aus der Datei `employees_satisfaction_transformed.csv` in ein DataFrame. Die Spaltenbezeichnungen sollen dabei im Header stehen und nicht Teil der Daten sein, nutzen Sie außerdem die automatische Schemaerkennung von Spark.

In [24]:
# Ihre Lösung
df = spark.read.csv("./employees_satisfaction_transformed.csv", header=True, inferSchema=True)

In [16]:
df

DataFrame[_c0: int, age: int, department: string, education: string, recruitment_type: string, job_level: int, rating: string, awards: int, certifications: int, salary: double, gender: string, entry_date: date, satisfied: int]

Lassen Sie sich das Schema ausgeben.

In [30]:
# Ihre Lösung
# Schema ausgeben
df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- education: string (nullable = true)
 |-- recruitment_type: string (nullable = true)
 |-- job_level: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- awards: integer (nullable = true)
 |-- certifications: integer (nullable = true)
 |-- salary: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- entry_date: date (nullable = true)
 |-- satisfied: integer (nullable = true)



Lassen Sie sich die Daten anzeigen.

In [17]:
# Ihre Lösung
df.show(10)

+---+---+----------+-------------+------------------+---------+------+------+--------------+-------+------+----------+---------+
|_c0|age|department|    education|  recruitment_type|job_level|rating|awards|certifications| salary|gender|entry_date|satisfied|
+---+---+----------+-------------+------------------+---------+------+------+--------------+-------+------+----------+---------+
|  0| 28|        HR| Postgraduate|          Referral|        5|   2.0|     1|             0|78075.0|  Male|2019-02-01|        1|
|  1| 50|Technology| Postgraduate|Recruitment Agency|        3|   5.0|     2|             1|38177.1|  Male|2017-01-17|        0|
|  2| 43|Technology|Undergraduate|          Referral|        4|   1.0|     2|             0|59143.5|Female|2012-08-27|        1|
|  3| 44|     Sales| Postgraduate|         On-Campus|        2|   3.0|     0|             0|26824.5|Female|2017-07-25|        1|
|  4| 33|        HR|Undergraduate|Recruitment Agency|        2|   1.0|     5|             0|26824

Geben Sie die Anzahl Angestellter aus.

In [31]:
# Ihre Lösung
# Anzahl der Angestellten ermitteln
num_employees = df.count()

# Anzahl der Angestellten ausgeben
print(f"Die Anzahl der Angestellten ist {num_employees}.")


Die Anzahl der Angestellten ist 498.


DataFrames basieren auf RDDs. Deshalb kann auch jedes DataFrame in ein RDD umgewandelt werden.

Erstellen Sie aus dem DataFrame der Angestellten ein RDD und lassen Sie sich die ersten fünf Zeilen anzeigen.

In [32]:
# Ihre Lösung
rdd = df.rdd
rdd.take(5)

[Row(_c0=0, age=28, department='HR', education='Postgraduate', recruitment_type='Referral', job_level=5, rating='2.0', awards=1, certifications=0, salary=78075.0, gender='Male', entry_date=datetime.date(2019, 2, 1), satisfied=1),
 Row(_c0=1, age=50, department='Technology', education='Postgraduate', recruitment_type='Recruitment Agency', job_level=3, rating='5.0', awards=2, certifications=1, salary=38177.1, gender='Male', entry_date=datetime.date(2017, 1, 17), satisfied=0),
 Row(_c0=2, age=43, department='Technology', education='Undergraduate', recruitment_type='Referral', job_level=4, rating='1.0', awards=2, certifications=0, salary=59143.5, gender='Female', entry_date=datetime.date(2012, 8, 27), satisfied=1),
 Row(_c0=3, age=44, department='Sales', education='Postgraduate', recruitment_type='On-Campus', job_level=2, rating='3.0', awards=0, certifications=0, salary=26824.5, gender='Female', entry_date=datetime.date(2017, 7, 25), satisfied=1),
 Row(_c0=4, age=33, department='HR', educa

Das RDD besteht aus einzelnen Rows. Mittels Index kann auf die einzelnen Rows zugegriffen werden. Der Zugriff auf die Spalten erfolgt entweder auch durch die Angabe des Index oder durch den Spaltennamen.

Lassen Sie sich das Gehalt der ersten Row ausgeben.

In [34]:
# Erste Zeile (Row) des RDD erhalten
first_row = rdd.first() #third_row = rdd.take(3)[2]

# Gehalt der ersten Zeile ausgeben
print(f"Das Gehalt der ersten Row ist {first_row['salary']}.")


Das Gehalt der ersten Row ist 78075.0.


### Verarbeitung
*Hinweis: Für die Bearbeitung der folgenden Aufaben ist es hilfreich, sich die `pyspark.sql.functions` anzusehen. Diese sind bereits importiert und können mit `f.<function_name>` verwendet werden.*

Fügen Sie eine neue Spalte `service_days` hinzu, in der angegeben wird, seit wie vielen Tagen ein\*e Angestellte\*r im Unternehmen ist. Verwenden Sie als Stichtag den 01.01.2021 (`comparison`)

In [27]:
comparison = "2021-01-01"
comparison = f.to_date(f.lit(comparison))
comparison

Column<'to_date(2021-01-01)'>

In [29]:
# Ihre Lösung
# Neue Spalte hinzufügen
df = df.withColumn("service_days", f.datediff(comparison, f.to_date("entry_date")))

# DataFrame anzeigen
df.show()

+---+---+----------+-------------+------------------+---------+------+------+--------------+-------+------+----------+---------+------------+
|_c0|age|department|    education|  recruitment_type|job_level|rating|awards|certifications| salary|gender|entry_date|satisfied|service_days|
+---+---+----------+-------------+------------------+---------+------+------+--------------+-------+------+----------+---------+------------+
|  0| 28|        HR| Postgraduate|          Referral|        5|   2.0|     1|             0|78075.0|  Male|2019-02-01|        1|         700|
|  1| 50|Technology| Postgraduate|Recruitment Agency|        3|   5.0|     2|             1|38177.1|  Male|2017-01-17|        0|        1445|
|  2| 43|Technology|Undergraduate|          Referral|        4|   1.0|     2|             0|59143.5|Female|2012-08-27|        1|        3049|
|  3| 44|     Sales| Postgraduate|         On-Campus|        2|   3.0|     0|             0|26824.5|Female|2017-07-25|        1|        1256|
|  4| 

Lassen Sie sich anschließend das Eintrittsdatum und die Tage im Unternehmen von allen Angestellten anzeigen, die länger als 6000 Tage im Unternehmen sind.

In [40]:
# Ihre Lösung
# DataFrame filtern
df_filtered = df.filter(df['service_days'] > 6000)

# Eintrittsdatum und Tage im Unternehmen anzeigen
df_filtered.select('entry_date', 'service_days').show()


+----------+------------+
|entry_date|service_days|
+----------+------------+
|2004-04-22|        6098|
|2004-03-19|        6132|
|2004-01-16|        6195|
|2004-07-13|        6016|
|2004-02-24|        6156|
|2004-07-15|        6014|
|2004-06-20|        6039|
|2004-01-20|        6191|
|2004-06-01|        6058|
|2004-04-19|        6101|
|2004-04-13|        6107|
|2004-07-05|        6024|
|2004-06-08|        6051|
|2004-01-20|        6191|
|2004-01-22|        6189|
|2004-01-05|        6206|
|2004-04-17|        6103|
|2004-07-05|        6024|
+----------+------------+



### Analyse

Lassen Sie sich die Departments des Datensatzes anzeigen (jedes nur einmal).

In [None]:
# Ihre Lösung
# Einzigartige Departments anzeigen
df.select("Department").distinct().show()


Ermitteln Sie (wie eben bei der Arbeit mit RDDs) die Anzahl Angestellte pro Department.

In [None]:
# Ihre Lösung
# Anzahl der Angestellten pro Abteilung ermitteln
df_dept_counts = df.groupBy("Department").count()

# Ergebnisse ausgeben
df_dept_counts.show()


Geben Sie durchschnittliche Zufriedenheit aller Personen aus, die in der Abteilung *HR* arbeiten, sowie das Alter der ältesten Person der Abteilung.

In [32]:
# Ihre Lösung
df.where(f.col('Department')=='HR') \
  .agg({'satisfied': 'mean', 'age': 'max'}) \
  .show()

+------------------+--------+
|    avg(satisfied)|max(age)|
+------------------+--------+
|0.6886792452830188|      54|
+------------------+--------+



Wie erstellen nun ein weiteres DataFrame. Erstellen Sie aus den Daten in `dept_data` das DataFrame `departments` mit den angegebenen Spaltennamen. Geben Sie das erzeugte DataFrame aus.

In [43]:
dept_data = [("Sales", "New York", 10001), 
            ("HR", "Los Angeles", 90001),
            ("Purchasing", "San Francisco", 94104),
            ("Marketing", "Philadelphia", 19110),
            ("Technology", "Los Angeles", 90099)]

column_names = ["name", "location", "zip_code"]

In [44]:
# Ihre Lösung
departments = spark.createDataFrame(dept_data, column_names)
departments.show(5)

+----------+-------------+--------+
|      name|     location|zip_code|
+----------+-------------+--------+
|     Sales|     New York|   10001|
|        HR|  Los Angeles|   90001|
|Purchasing|San Francisco|   94104|
| Marketing| Philadelphia|   19110|
|Technology|  Los Angeles|   90099|
+----------+-------------+--------+



Wie viele Angestellte arbeiten in Los Angeles?

In [42]:
# Ihre Lösung
df.join(departments, df.department == departments.name) \
  .where(departments.location == "Los Angeles") \
  .count()

204

Lassen Sie sich die ersten 10 Gehälter der Angestellten ausgeben, die an der Ostküste arbeiten (in diesem Fall `zip_code < 90000`).

In [49]:
# Ihre Lösung
df.join(departments, departments.name == df.department) \
  .where(departments.zip_code < 90000) \
  .select(df.salary) \
  .show(10)

+-------+
| salary|
+-------+
|38177.1|
|38177.1|
|59143.5|
|21668.4|
|59143.5|
|21668.4|
|78075.0|
|26824.5|
|38177.1|
|78075.0|
+-------+
only showing top 10 rows



### Caching
Durch die In-Memory-Verarbeitung werden DataFrames in der Regel nicht zwischengespeichert. Mit jeder Action werden alle Verarbeitungsschritte neu ausgeführt. Manchmal kann es aber sinnvoll sein, ein DataFrame zwischenzuspeichern (zu *cachen*). Beim Aufruf der ersten Action wird das DataFrame dann im Speicher gehalten.

(*Hinweis*: Gleiches gilt natürlich auch für RDDs, da DataFrames ja auf RDDs basieren)

Betrachten Sie den folgenden Code: An welcher Stelle könnte es sinnvoll sein, ein Caching durchzuführen und warum? Fügen Sie den Code für das Caching ein. Schreiben Sie in die untere Zelle Ihre Begründung und beschreiben Sie, welche DataFrames auf die gecachte Version zugreifen.

```python
df1 = spark.read.csv(path = "my_data.csv", header = True)
df2 = df1.filter(df1.salary.between(40000, 50000))
df3 = df2.select("name", "salary", "department", "satisfied")

# Cache df3
df3.cache()

df4 = df3.groupby("department").agg({"satisfied": "mean"})
df4.show()
df5 = df3.groupby("department").agg({"salary": "mean"})
df5.show()
df6 = df3.select("name").where(df3["satisfied"] == 1)
df6.show()
```

## SQL
Im Folgenden verwenden wir SQL zur Datenabfrage. Dafür erstellen wir zunächst eine temporale View, auf der dann die SQL-Abfragen erfolgen.

In [50]:
spark.sql("\
CREATE TEMP VIEW employees \
USING CSV \
OPTIONS (path = './employees_satisfaction_transformed.csv', header = 'True', inferSchema = 'True')")

DataFrame[]

Ermitteln Sie nun wieder die Anzahl der Angestellten pro Department, aber verwenden Sie dieses Mal SQL.

In [51]:
# Ihre Lösung
spark.sql("""
SELECT department, COUNT(*) as employee_count
FROM employees
GROUP BY department
""").show()

+----------+--------------+
|department|employee_count|
+----------+--------------+
|     Sales|            87|
|        HR|           106|
|Purchasing|           114|
| Marketing|            93|
|Technology|            98|
+----------+--------------+



## Ressourcen-Vebrauch

Sie arbeiten in diesen Übungsaufgaben mit ihrem eigenen Cluster. In der Praxis ist dies natürlich selten der Fall, hier werden meist viele Jobs gleichzeitig gestartet und konkurrieren um Ressourcen.

Öffnen Sie die __[WebUI von YARN](http://127.0.0.1:8088)__ und sehen sich die aktuellen Applikationen an. Sie sollten dort das aktuelle Jupyter Notebook finden. Solange die SparkSession (der SparkContext) erstellt ist, wird das Notebook dort als *RUNNING* aufgelistet. 

Wie viele Kerne und wie viel Speicher verbraucht die Spark-Applikation? Wie können Sie diese Summe berechnen?

*Hinweis: Werfen Sie einen Blick in die Datei `spark-defaults.conf` im `SPARK_CONF_DIR`.*

*Sollten Sie nicht wissen, wie Sie diese Datei finden können, finden Sie unter dem "Show Solution"-Button hilfreiche Kommandos.*

- Über "New" -> "Terminal" können Sie ein Terminal öffnen. Alternativ können Sie in einer Code-Zelle einen Bash-Befehl ausführen, indem Sie ein ! an den Anfang der Zelle schreiben
- Zugriff auf den Inhalt einer Umgebungs-Variablen mittels `$`, z.B. `$SPARK_CONF_DIR`
- Anzeigen von Dateiinhalten: `cat`

Stoppen Sie den SparkContext, um alle Ressourcen, die dieses Notebook benötigt, freizugeben.

In [75]:
# Ihre Lösung
sc.stop()

Öffnen Sie ein Terminal und starten Sie eine pyspark-Shell mit YARN als Ressourcen-Manager im client-mode mit 2 Exekutoren mit jeweils 1GB Memory und 2 Cores. Wie viele Cores und wie viel Speicher benötigt die Applikation und wie setzen sich diese Zahlen zusammen? Kontrollen Sie in der WebUI von YARN, ob Ihre Berechnungen korrekt sind.

Starten Sie anschließend in einem zweiten Terminal eine weitere pyspark-Shell (ohne die erste Shell zu beenden) mit YARN als Ressourcen-Manager im client-mode mit der gleichen Konfiguration (2 Exekutoren mit jeweils 1GB Memory und 2 Cores). Erstellen Sie dort ein RDD mit den Werten 1, 2, 3, 4 und lassen sich dieses mittels `collect()` ausgeben.

Was passiert und warum? Wie können Sie das Problem beheben?

Schließen Sie beide Shells und starten anschließend eine neue pyspark-Shell mit folgenden Eigenschaften:
- YARN als Ressourcen-Manager im client-mode
- 2 Exekutoren mit jeweils
    - 2GB Memory
    - 3 Cores
    
Wie viele Ressourcen vebraucht diese Applikation und wie setzen sich diese zusammen?

```python
export PYSPARK_PYTHON=python3
pyspark --master yarn --deploy-mode client --executor-memory 2g --executor-cores 3 --num-executors 2
```

In [73]:
from pyspark import SparkConf, SparkContext

# Erstellen Sie ein SparkConf-Objekt
conf = SparkConf()

conf.set('spark.executor.instances', '2')
conf.set('spark.executor.memory', '2g')
conf.set('spark.executor.cores', '3')

spark2 = SparkSession.builder \
    .master('yarn') \
    .appName("My Spark Application") \
    .config(conf = conf) \
    .getOrCreate()

print("Application ID: ", spark2.sparkContext.applicationId)


Application ID:  local-1708355894509


Starten Sie anschließend in einem zweiten Terminal noch eine pypsark-Shell (ohne die erste Shell zu beenden) mit folgendem Befehl:

`pyspark --master yarn --conf spark.yarn.am.cores=3`

Was passiert und warum? Überprüfen Sie auch den Status der Applikation in der WebUI von YARN.