## 🧠 1. Was ist Spark? Warum PySpark?

> "Alle reden über Spark — aber was löst es eigentlich?"

- **Apache Spark** ist eine verteilte Datenverarbeitungs-Engine.
  - Anstatt dass ein Computer alles alleine macht (wie dein Laptop mit Python), verteilt Spark die Arbeit auf *viele* Rechner oder CPU-Kerne.
  - *Einsatzgebiet*: Riesige Datenmengen und Machine Learning auf Skalenniveau.

- **Warum nicht einfach Pandas oder reines Python?**
  - *Wenn die Daten bequem in den Arbeitsspeicher passen → Pandas ist völlig ausreichend.*
  - *Sind die Daten größer als der RAM → Pandas bricht zusammen.*
  - *Wenn echte Parallelverarbeitung gefragt ist → Spark ist die richtige Wahl.*

- **Warum PySpark?**
  - Spark ist in Scala und Java geschrieben — aber wer will für normale Datenverarbeitung in Java rumfrickeln?
  - **PySpark** erlaubt dir, Spark mit Python zu steuern. Fast alle Vorteile, aber einfacher zu schreiben.
 
- **Wann Scala/Java doch besser ist?**
    - Wenn du maximale Performance brauchst (z. B. komplexe UDFs).
    - Bei extrem großen Datasets (Python-Overhead kann spürbar werden).
    - Für Spark-Interna-Entwicklung (z. B. eigene Spark-Erweiterungen).

**Ehrliche Einschätzung**:  
> Nutze Spark nur, wenn du *musst* (Daten zu groß, Performance wird kritisch).  
> Ansonsten: **Pandas > PySpark** bei kleinen bis mittleren Daten.

## ⚙️ 2. Cluster Mode vs Local Mode

Stell dir vor, du hast eine schwere Datenverarbeitung.

| Modus | Was passiert | Wann verwenden | Wichtigster Punkt |
|:----|:------------|:----------------|:-----------|
| **Local Mode** | Spark läuft auf deinem Rechner, nutzt mehrere CPU-Kerne. | Entwicklung, Tests, kleine Datenmengen. | *Trainingsmodus.* |
| **Cluster Mode** | Spark läuft auf mehreren Servern (Nodes). | Produktion, Big Data. | *Echte Skalierung.* |

**Skizze:**

```plaintext
Local Mode:
+-----------------+
| Laptop          |
| [Core1][Core2]  |
| [Core3][Core4]  |
+-----------------+

Cluster Mode:
+--------------------+     +-------------------+     +-------------------+
| Worker Node 1      |     | Worker Node 2      |     | Worker Node 3      |
| [Task1][Task2]     |     | [Task3][Task4]     |     | [Task5][Task6]     |
+--------------------+     +-------------------+     +-------------------+
          \                    |                    /
           \                   |                   /
                +----------------------------------+
                |        Spark Driver Program     |
                +----------------------------------+
```

> **Wichtig**: *Im Cluster Mode steuert ein Programm viele Maschinen.*  
> **Local Mode simuliert nur ein Mini-Cluster auf deinem Laptop.*

## ⚙️ 3. Spark Architecture

In [1]:
from IPython.display import Image

Image(url="https://spark.apache.org/docs/latest/img/cluster-overview.png")

### Spark Architektur: Cluster Overview)

Dieses Diagramm zeigt **wie Spark verteilt arbeitet**:

| Komponente | Beschreibung |
|:---|:---|
| **Driver Program** | Dein Hauptprogramm. Es steuert alles: Job-Aufteilung, Kommunikation, Fehlerbehandlung. |
| **SparkContext** | Die zentrale Verbindung vom Driver zu Spark. Du programmierst damit die Aktionen und Transformationen. |
| **Cluster Manager** | Verwaltet die Ressourcen im Cluster (CPU, RAM). Beispiele: YARN, Kubernetes, Standalone. |
| **Worker Nodes** | Die Maschinen, die die eigentliche Datenverarbeitung übernehmen. |
| **Executor** | Ein Prozess auf einem Worker, der Tasks ausführt und Daten im Speicher hält. |
| **Task** | Die kleinste Recheneinheit. Ein Spark-Job wird in viele Tasks aufgeteilt. |
| **Cache** | Zwischenspeicher (RAM) für Daten, die mehrfach gebraucht werden, damit sie nicht immer neu berechnet werden müssen. |

---

### 🔥 Wichtig zu verstehen:

- **Driver** → Teilt Jobs auf und schickt sie über den **Cluster Manager** an die **Worker Nodes**.
- Jeder **Worker** hat mindestens einen **Executor**, der wiederum mehrere **Tasks** parallel ausführt.
- **Caches** sparen Zeit, indem sie Daten im RAM statt auf der Platte halten.

---

### 🧠 Skeptische Beobachtungen:

- **Single Point of Failure**: Wenn der Driver abstürzt, ist der ganze Job verloren (außer du hast High Availability konfiguriert).
- **Ressourcenverschwendung möglich**: Executors brauchen RAM – schlecht abgestimmt → viele Out-of-Memory-Fehler.
- **Netzwerk Bottleneck**: Schweres Shuffling (viel Datentausch zwischen Workern) kann Spark-Jonehmer nicht einfach abschalten.)

# ✨ 4.`SparkSession` als Einstiegspunkt

- In Spark 2.0 und neuer ist **`SparkSession`** der *offizielle Einstiegspunkt* für jede Spark-Anwendung.
- Früher musste man `SparkContext`, `SQLContext`, `HiveContext` usw. separat erstellen — heute bündelt `SparkSession` alles in einem Objekt.

**Merksatz**:  
> **Ohne SparkSession → kein Zugriff auf Spark.**

**Beispiel:**

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Localspark") \
    .master("local[*]") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [22]:
df = spark.createDataFrame(
    [("Alice", "Engineering", 65000, 28),
    ("Bob", "Marketing", 58000, 32),
    ("Carol", "Engineering", 72000, 35)], 
                           ["name", "department", "salary", "age"])

In [23]:
df.show()

+-----+-----------+------+---+
| name| department|salary|age|
+-----+-----------+------+---+
|Alice|Engineering| 65000| 28|
|  Bob|  Marketing| 58000| 32|
|Carol|Engineering| 72000| 35|
+-----+-----------+------+---+



In [2]:
type(spark)

pyspark.sql.session.SparkSession

In [3]:
type(spark.sparkContext)

pyspark.context.SparkContext

## 🆚 5. DataFrame vs RDD (nur die Basics)

| Feature | DataFrame | RDD |
|:--------|:----------|:----|
| **Definition** | Tabelle mit Spalten und Datentypen (ähnlich SQL oder Pandas) | Rohes, unstrukturiertes Datenset (verteilte Liste von Objekten) |
| **Benutzerfreundlichkeit** | Hoch – SQL-ähnliche Operationen | Niedrig – eigene Map/Reduce-Logik schreiben |
| **Performance** | Optimiert durch Catalyst & Tungsten Engine | Weniger optimiert (du musst dich selbst um Performance kümmern) |
| **Anwendungsfall** | Klassische Datenverarbeitung, Analytics, Machine Learning Pipelines | Low-Level Transformationen, wenn extreme Flexibilität gebraucht wird |

**Skeptische Wahrheit**:  
> Wer heute noch direkt mit RDDs arbeitet, hat meist ein sehr spezielles Problem — oder kein Vertrauen in Spark-Optimierungen.

In [27]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# RDD aus Liste erstellen (verteilte Rohdaten)
rdd = sc.parallelize([
    ("Müller", 35, "Berlin"),
    ("Schmidt", 28, "München")
])

data = rdd.collect()
print(data)

[('Müller', 35, 'Berlin'), ('Schmidt', 28, 'München')]


In [28]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# DataFrame mit Schema erstellen
df = spark.createDataFrame(
    [("Müller", 35, "Berlin"), ("Schmidt", 28, "München")],
    ["Name", "Alter", "Stadt"]
)

df.show()

+-------+-----+-------+
|   Name|Alter|  Stadt|
+-------+-----+-------+
| Müller|   35| Berlin|
|Schmidt|   28|München|
+-------+-----+-------+



In [32]:
# Map-Operation (manuelle Transformation)
rdd_mapped = rdd.map(lambda x: (x[0], x[1] * 2))  # Verdopple das Alter
print(rdd_mapped.collect()) 

[('Müller', 70), ('Schmidt', 56)]


In [30]:
# SQL-ähnliche Operation
df_transformed = df.select("Name", (df.Alter * 2).alias("Doppeltes_Alter"))
df_transformed.show()

+-------+---------------+
|   Name|Doppeltes_Alter|
+-------+---------------+
| Müller|             70|
|Schmidt|             56|
+-------+---------------+



In [40]:
# Durchschnittsalter in einem Schritt berechnen
durchschnittsalter = rdd.map(lambda x: (x[1], 1)) \
                       .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))

print(durchschnittsalter)
durchschnittsalter = durchschnittsalter[0] / durchschnittsalter[1]
print("Durchschnittsalter:", durchschnittsalter)

(63, 2)
Durchschnittsalter: 31.5


Was passiert Schritt für Schritt:

    1. Vorbereitung:

        - Ihr RDD enthält nach der map-Operation Paare von (Alter, 1)

        - Beispiel: [(35, 1), (28, 1)]

    2. Erster Reduzierungsschritt:

        - a = (35, 1), b = (28, 1)

        - Die Lambda-Funktion berechnet:

            a[0] + b[0] → 35 + 28 → 63 (Summe der Alter)

            a[1] + b[1] → 1 + 1 → 2 (Anzahl der Personen)

        - Ergebnis: (63, 2)

    3. Endergebnis:

        - Da nur zwei Elemente im RDD sind, ist dies das finale Ergebnis

In [42]:
from pyspark.sql import functions as F

df.agg(F.avg("Alter").alias("Durchschnittsalter")).show()

+------------------+
|Durchschnittsalter|
+------------------+
|              31.5|
+------------------+



## 🔄 6. Lebenszyklus eines DataFrames in Spark

| Phase                  | Was passiert?                                                                                                  | Warum es wichtig ist                                         |
| ---------------------- | -------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------ |
| **1 Transformation**   | Du **schreibst** Befehle (`select`, `filter`, `join`). Spark merkt sie sich – aber rechnet noch **gar nicht**. | Kein Warten, du kannst erst mal “Pipeline basteln”.          |
| **2 Logischer Plan**   | Spark malt sich eine **To-do-Liste**: „Welche Spalten brauche ich? Welche Tabelle wird womit verknüpft?“       | Tippfehler wie „Spalte existiert nicht“ fliegen hier auf.    |
| **3 Optimierter Plan** | Spark **sortiert** die To-do-Liste, damit es schneller geht (z. B. zuerst filtern → dann join).                | Der Optimierer (Catalyst-Optimierer + Adaptive Query Execution (AQE)): gut sortiert = viel weniger Daten bewegen.        |
| **4 Physischer Plan**  | Spark sagt jetzt: „So verteile ich die Arbeit auf die Kerne / Rechner – los geht’s!“                           | Hier entstehen Kosten wie Shuffle oder Sortieren.            |
| **5 Action**           | Ein Befehl wie `show()`, `count()` oder `write()` **startet** die Ausführung.                                  | Erst jetzt siehst du Tasks & Stages im Spark-UI (Port 4040). |




### Job vs Stage vs Task

| Ebene     | Was ist das? (auf den Punkt)                                                                                                                              | Wann entsteht sie?                                                                                   | Parallelität                                  |
| --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- | --------------------------------------------- |
| **Job**   | **Ein Action-Aufruf** wie `show()`, `count()`, `write()` startet einen Job.                                                                               | Immer genau dann, wenn du so eine Action ausführst.                                                  | Besteht aus mehreren Stages.                  |
| **Stage** | **Ein Arbeitsschritt, den Spark ohne neues Verteilen der Daten durchrechnen kann.**<br>(Über das „Daten-Verteilen“ – Shuffle – sprechen wir gleich noch.) | Spark setzt eine neue Stage, sobald Daten neu sortiert / über das Netzwerk geschickt werden müssten. | Viele Tasks laufen parallel in einer Stage.   |
| **Task**  | **Die kleinste Aufgabe:** „Verarbeite Partition X“                                                                                                        | Spark erzeugt 1 Task pro Partition in der Stage.                                                     | 1 Task benutzt 1 CPU-Kern auf einem Executor. |


In [8]:
from pyspark.sql import SparkSession

# 1️⃣ Session starten (lokal mit 4 Kernen = 4 parallele Tasks pro Stage)
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("jobs-stages-tasks-demo")
    .getOrCreate()
)

# 2️⃣ Mini-DataFrame im Code erzeugen
data = [
    ("Alice", "Engineering", 65000),
    ("Bob",   "Marketing",  58000),
    ("Carol", "Engineering", 72000),
    ("Dave",  "Marketing",  55000),
    ("Eve",   "Engineering", 90000),
]
cols = ["name", "department", "salary"]
df = spark.createDataFrame(data, cols)

# 3️⃣ Spark zwingen, mindestens einen Shuffle zu bauen
#spark.conf.set("spark.sql.shuffle.partitions", 4)  # → 4 Tasks in der Reduce-Stage

result = (
    df.repartition("department")            # löst einen Shuffle aus (Stage 0)
      .groupBy("department")                # Reduce-Seite (Stage 1)
      .avg("salary")
      .orderBy("avg(salary)", ascending=False)
)

# 4️⃣ Planschilderung anzeigen (Job noch nicht gestartet)
result.explain("simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [avg(salary)#114 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(avg(salary)#114 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=334]
      +- HashAggregate(keys=[department#105], functions=[avg(salary#106L)])
         +- HashAggregate(keys=[department#105], functions=[partial_avg(salary#106L)])
            +- Exchange hashpartitioning(department#105, 200), REPARTITION_BY_COL, [plan_id=329]
               +- Project [department#105, salary#106L]
                  +- Scan ExistingRDD[name#104,department#105,salary#106L]




In [9]:
# 5️⃣ Action → start Job → öffne Spark-UI unter http://localhost:4040
result.show()

+-----------+-----------------+
| department|      avg(salary)|
+-----------+-----------------+
|Engineering|75666.66666666667|
|  Marketing|          56500.0|
+-----------+-----------------+



> **Shuffling** -> Spark muss Daten quer über alle Worker „umparken“, damit Zeilen, die zusammengehören, auch wirklich zusammenliegen.
>
> Du gruppierst oder joinst nach einer Spalte – Spark braucht dann alle Zeilen desselben Keys in derselben Partition. Dafür werden Blöcke durchs Netzwerk geschickt → das nennt man Shuffle.

## 📊 7. Daten erkunden und transformieren

### Dataframe erstellen

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeutscheDaten").getOrCreate()

# DataFrame mit deutschen Spaltennamen erstellen
data = [
    ("Müller", 35, "Berlin", 4000),
    ("Schmidt", 28, "München", 3200),
    ("Fischer", 42, "Hamburg", 5100)
]
df = spark.createDataFrame(data, ["Name", "Alter", "Stadt", "Gehalt"])
df.show()

+-------+-----+-------+------+
|   Name|Alter|  Stadt|Gehalt|
+-------+-----+-------+------+
| Müller|   35| Berlin|  4000|
|Schmidt|   28|München|  3200|
|Fischer|   42|Hamburg|  5100|
+-------+-----+-------+------+



In [50]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Mit expliziten Datentypen
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Alter", IntegerType(), True),
    StructField("Stadt", StringType(), True),
    StructField("Gehalt", IntegerType(), True)
])

df_mit_schema = spark.createDataFrame(data, schema)
df_mit_schema.printSchema()


root
 |-- Name: string (nullable = true)
 |-- Alter: integer (nullable = true)
 |-- Stadt: string (nullable = true)
 |-- Gehalt: integer (nullable = true)



In [48]:
df_mit_schema.show()

+-------+-----+-------+------+
|   Name|Alter|  Stadt|Gehalt|
+-------+-----+-------+------+
| Müller|   35| Berlin|  4000|
|Schmidt|   28|München|  3200|
|Fischer|   42|Hamburg|  5100|
+-------+-----+-------+------+



### Übung 1: Grundlegende DataFrame-Erstellung

Aufgabe: Erstelle ein DataFrame mit Mitarbeiterdaten, das folgende Spalten enthält:

    Vorname (z.B. "Anna", "Thomas")
    Abteilung (z.B. "IT", "Vertrieb")
    Eintrittsjahr (z.B. 2015, 2020)

In [51]:
# Lösung
mitarbeiter_data = [
    ("Anna", "IT", 2018),
    ("Thomas", "Vertrieb", 2015),
    ("Julia", "HR", 2020)
]
mitarbeiter_df = spark.createDataFrame(mitarbeiter_data, ["Vorname", "Abteilung", "Eintrittsjahr"])
mitarbeiter_df.show()

+-------+---------+-------------+
|Vorname|Abteilung|Eintrittsjahr|
+-------+---------+-------------+
|   Anna|       IT|         2018|
| Thomas| Vertrieb|         2015|
|  Julia|       HR|         2020|
+-------+---------+-------------+



### CSV Lesen

In [137]:
import requests

# Korrekte RAW-URL
url = "https://raw.githubusercontent.com/gadanes/spark_kurs/main/notebooks/data.csv"

# Lade die CSV-Datei herunter
response = requests.get(url)
with open("/tmp/data.csv", "wb") as f:
    f.write(response.content)

# Lese die CSV-Datei mit Spark ein
df = spark.read.option("header", "true").csv("/tmp/data.csv")
df.show(100)

+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+
|first_name| last_name|age|      city|               email|          job_title|salary|department|years_experience|
+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+
|      Anna|   Mueller| 28|    Berlin|anna.mueller@gmai...|       Data Analyst| 52000|      Tech|               4|
|       Ben|   Schmidt| 35|   Hamburg|ben.schmidt@outlo...|  Software Engineer|  NULL|      Tech|              12|
|     Clara|     Klein| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|     Other|               1|
|     David| Schneider| 40|   Cologne|david.schneider@c...|    Project Manager| 83000|Management|              15|
|       Eva|     Huber| 31| Stuttgart|    eva.huber@web.de|     Data Scientist|  NULL|      Tech|               6|
|     Felix|    Wagner| 29|    Berlin|felix.wagner@exam...|    DevOps Engineer| 

### Spalten auswählen (`select`)

Mit `.select()` kannst du gezielt bestimmte Spalten aus dem DataFrame auswählen.

**Merke:**  
> `.select()` **verändert** das ursprüngliche DataFrame **nicht**.  
> Es erzeugt eine **neue** Version.

In [58]:
df.select("first_name", "last_name", "city", "job_title").show()

+----------+---------+----------+-------------------+
|first_name|last_name|      city|          job_title|
+----------+---------+----------+-------------------+
|      Anna|  Mueller|    Berlin|       Data Analyst|
|       Ben|  Schmidt|   Hamburg|  Software Engineer|
|     Clara|    Klein|    Munich|Marketing Assistant|
|     David|Schneider|   Cologne|    Project Manager|
|       Eva|    Huber| Stuttgart|     Data Scientist|
|     Felix|   Wagner|    Berlin|    DevOps Engineer|
|      Gina|  Fischer|   Hamburg|         HR Manager|
|    Hannah|     Koch| Frankfurt|        UI Designer|
|      Igor|   Keller|    Munich|      Sales Manager|
|     Julia|  Schmitt|Düsseldorf|    Content Creator|
|      Karl|    Bauer|    Berlin|                CTO|
|      Lina|    Maier| Stuttgart|         Accountant|
|       Max|    Frank|   Cologne|   Network Engineer|
|      Nina|  Lehmann|   Hamburg|      Product Owner|
|    Oliver|    Weber|    Berlin|  Backend Developer|
|     Paula| Hartmann|    Mu

In [61]:
import pyspark.sql.functions as F

In [67]:
df.select(F.concat(F.col("first_name") , F.lit(" ") , F.col("last_name")), "city", "job_title").show()

+--------------------------------+----------+-------------------+
|concat(first_name,  , last_name)|      city|          job_title|
+--------------------------------+----------+-------------------+
|                    Anna Mueller|    Berlin|       Data Analyst|
|                     Ben Schmidt|   Hamburg|  Software Engineer|
|                     Clara Klein|    Munich|Marketing Assistant|
|                 David Schneider|   Cologne|    Project Manager|
|                       Eva Huber| Stuttgart|     Data Scientist|
|                    Felix Wagner|    Berlin|    DevOps Engineer|
|                    Gina Fischer|   Hamburg|         HR Manager|
|                     Hannah Koch| Frankfurt|        UI Designer|
|                     Igor Keller|    Munich|      Sales Manager|
|                   Julia Schmitt|Düsseldorf|    Content Creator|
|                      Karl Bauer|    Berlin|                CTO|
|                      Lina Maier| Stuttgart|         Accountant|
|         

In [70]:
df.select(F.concat("first_name", lit(" ") , "last_name"), "city", "job_title").show()

+--------------------------------+----------+-------------------+
|concat(first_name,  , last_name)|      city|          job_title|
+--------------------------------+----------+-------------------+
|                    Anna Mueller|    Berlin|       Data Analyst|
|                     Ben Schmidt|   Hamburg|  Software Engineer|
|                     Clara Klein|    Munich|Marketing Assistant|
|                 David Schneider|   Cologne|    Project Manager|
|                       Eva Huber| Stuttgart|     Data Scientist|
|                    Felix Wagner|    Berlin|    DevOps Engineer|
|                    Gina Fischer|   Hamburg|         HR Manager|
|                     Hannah Koch| Frankfurt|        UI Designer|
|                     Igor Keller|    Munich|      Sales Manager|
|                   Julia Schmitt|Düsseldorf|    Content Creator|
|                      Karl Bauer|    Berlin|                CTO|
|                      Lina Maier| Stuttgart|         Accountant|
|         

In [72]:
df.select(F.concat_ws(" ", "first_name", "last_name").alias("full_name"), "city", "job_title").show()

+---------------+----------+-------------------+
|      full_name|      city|          job_title|
+---------------+----------+-------------------+
|   Anna Mueller|    Berlin|       Data Analyst|
|    Ben Schmidt|   Hamburg|  Software Engineer|
|    Clara Klein|    Munich|Marketing Assistant|
|David Schneider|   Cologne|    Project Manager|
|      Eva Huber| Stuttgart|     Data Scientist|
|   Felix Wagner|    Berlin|    DevOps Engineer|
|   Gina Fischer|   Hamburg|         HR Manager|
|    Hannah Koch| Frankfurt|        UI Designer|
|    Igor Keller|    Munich|      Sales Manager|
|  Julia Schmitt|Düsseldorf|    Content Creator|
|     Karl Bauer|    Berlin|                CTO|
|     Lina Maier| Stuttgart|         Accountant|
|      Max Frank|   Cologne|   Network Engineer|
|   Nina Lehmann|   Hamburg|      Product Owner|
|   Oliver Weber|    Berlin|  Backend Developer|
| Paula Hartmann|    Munich|   Product Designer|
| Quentin Schulz| Frankfurt|         Consultant|
|      Rita Lang|Düs

### Übung 2: Grundlegende DataFrame-Erstellung

Aufgabe: Zeige vollständigen Namen, Berufsbezeichnung und Monatsgehalt an

In [86]:
# Zeige vollständigen Namen, Berufsbezeichnung und Monatsgehalt an
df.select(
    F.concat_ws(" ", "first_name", "last_name").alias("full_name"),
    "job_title",
    F.round(F.col("salary") / 12, 2).alias("monatsgehalt")
).show()


+---------------+-------------------+------------+
|      full_name|          job_title|monatsgehalt|
+---------------+-------------------+------------+
|   Anna Mueller|       Data Analyst|     4333.33|
|    Ben Schmidt|  Software Engineer|        NULL|
|    Clara Klein|Marketing Assistant|      3500.0|
|David Schneider|    Project Manager|     6916.67|
|      Eva Huber|     Data Scientist|        NULL|
|   Felix Wagner|    DevOps Engineer|     5633.33|
|   Gina Fischer|         HR Manager|      5750.0|
|    Hannah Koch|        UI Designer|        NULL|
|    Igor Keller|      Sales Manager|      6250.0|
|  Julia Schmitt|    Content Creator|     3416.67|
|     Karl Bauer|                CTO|     10000.0|
|     Lina Maier|         Accountant|        NULL|
|      Max Frank|   Network Engineer|     5916.67|
|   Nina Lehmann|      Product Owner|     5533.33|
|   Oliver Weber|  Backend Developer|     5583.33|
| Paula Hartmann|   Product Designer|     5416.67|
| Quentin Schulz|         Consu

### Zeilen filtern (`filter`, `where`)

Mit `.filter()` oder `.where()` kannst du Zeilen nach Bedingungen auswählen.

In [92]:
df.filter(df.age >= 30).show()

+----------+---------+---+---------+--------------------+-----------------+------+----------+----------------+
|first_name|last_name|age|     city|               email|        job_title|salary|department|years_experience|
+----------+---------+---+---------+--------------------+-----------------+------+----------+----------------+
|       Ben|  Schmidt| 35|  Hamburg|ben.schmidt@outlo...|Software Engineer|  NULL|      Tech|              12|
|     David|Schneider| 40|  Cologne|david.schneider@c...|  Project Manager| 83000|Management|              15|
|       Eva|    Huber| 31|Stuttgart|    eva.huber@web.de|   Data Scientist|  NULL|      Tech|               6|
|      Gina|  Fischer| 45|  Hamburg|gina.fischer@yaho...|       HR Manager| 69000|Management|              20|
|      Igor|   Keller| 38|   Munich|  igor.keller@gmx.de|    Sales Manager| 75000|Management|              14|
|      Karl|    Bauer| 50|   Berlin|karl.bauer@exampl...|              CTO|120000|     Other|              25|
|

In [90]:
df.filter(df.salary > 80000).show()

+----------+---------+---+-------+--------------------+---------------+------+----------+----------------+
|first_name|last_name|age|   city|               email|      job_title|salary|department|years_experience|
+----------+---------+---+-------+--------------------+---------------+------+----------+----------------+
|     David|Schneider| 40|Cologne|david.schneider@c...|Project Manager| 83000|Management|              15|
|      Karl|    Bauer| 50| Berlin|karl.bauer@exampl...|            CTO|120000|     Other|              25|
+----------+---------+---+-------+--------------------+---------------+------+----------+----------------+



In [93]:
df.where(df.age > 30).show()

+----------+---------+---+---------+--------------------+-----------------+------+----------+----------------+
|first_name|last_name|age|     city|               email|        job_title|salary|department|years_experience|
+----------+---------+---+---------+--------------------+-----------------+------+----------+----------------+
|       Ben|  Schmidt| 35|  Hamburg|ben.schmidt@outlo...|Software Engineer|  NULL|      Tech|              12|
|     David|Schneider| 40|  Cologne|david.schneider@c...|  Project Manager| 83000|Management|              15|
|       Eva|    Huber| 31|Stuttgart|    eva.huber@web.de|   Data Scientist|  NULL|      Tech|               6|
|      Gina|  Fischer| 45|  Hamburg|gina.fischer@yaho...|       HR Manager| 69000|Management|              20|
|      Igor|   Keller| 38|   Munich|  igor.keller@gmx.de|    Sales Manager| 75000|Management|              14|
|      Karl|    Bauer| 50|   Berlin|karl.bauer@exampl...|              CTO|120000|     Other|              25|
|

In [116]:
df.where(df.age > 30).select("first_name", "last_name", "age").show()

+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|       Ben|  Schmidt| 35|
|     David|Schneider| 40|
|       Eva|    Huber| 31|
|      Gina|  Fischer| 45|
|      Igor|   Keller| 38|
|      Karl|    Bauer| 50|
|      Lina|    Maier| 33|
|       Max|    Frank| 41|
|     Paula| Hartmann| 36|
|   Quentin|   Schulz| 43|
|    Stefan|   Becker| 39|
|      Tina|    Kraus| 32|
+----------+---------+---+



**Hinweis:**  
> **`filter`** und **`where`** sind **identisch** – es ist reine Geschmackssache.

### Übung 3: selektieren
1. Welche Mitarbeiter sind 30 Jahre oder älter und arbeiten in Tech Department?
2. Zeige alle Mitarbeiter aus Berlin oder München mit ihrem Vor- und Nachnamen.
3. Welche Mitarbeiter haben kein Gehalt (NULL)?

In [96]:
#1
df.filter(
    (F.col("age") >= 30) & 
    (F.col("department") == "Tech")
).select("first_name", "last_name", "age", "job_title").show()

+----------+---------+---+-----------------+
|first_name|last_name|age|        job_title|
+----------+---------+---+-----------------+
|       Ben|  Schmidt| 35|Software Engineer|
|       Eva|    Huber| 31|   Data Scientist|
|       Max|    Frank| 41| Network Engineer|
|      Tina|    Kraus| 32|    Data Engineer|
+----------+---------+---+-----------------+



In [None]:
df.filter(F.col("city").)

df.with

In [111]:
#2
df.filter(
    (F.col("city") == "Berlin") | (F.col("city") == "Munich")
).select("first_name", "last_name", "city").show()

+----------+---------+------+
|first_name|last_name|  city|
+----------+---------+------+
|      Anna|  Mueller|Berlin|
|     Clara|    Klein|Munich|
|     Felix|   Wagner|Berlin|
|      Igor|   Keller|Munich|
|      Karl|    Bauer|Berlin|
|    Oliver|    Weber|Berlin|
|     Paula| Hartmann|Munich|
+----------+---------+------+



In [112]:
#2
df.filter(
    F.col("city").isin(["Berlin", "Munich"])
).select("first_name", "last_name", "city").show()

+----------+---------+------+
|first_name|last_name|  city|
+----------+---------+------+
|      Anna|  Mueller|Berlin|
|     Clara|    Klein|Munich|
|     Felix|   Wagner|Berlin|
|      Igor|   Keller|Munich|
|      Karl|    Bauer|Berlin|
|    Oliver|    Weber|Berlin|
|     Paula| Hartmann|Munich|
+----------+---------+------+



In [99]:
#3
df.filter(
    F.col("salary").isNull()
).select("first_name", "last_name", "job_title").show()

+----------+---------+-----------------+
|first_name|last_name|        job_title|
+----------+---------+-----------------+
|       Ben|  Schmidt|Software Engineer|
|       Eva|    Huber|   Data Scientist|
|    Hannah|     Koch|      UI Designer|
|      Lina|    Maier|       Accountant|
|   Quentin|   Schulz|       Consultant|
|      Tina|    Kraus|    Data Engineer|
+----------+---------+-----------------+



### withColumn():
    - Eine neue Spalte zu einem DataFrame hinzufügen
    ODER
    - Eine bestehende Spalte modifizieren

### Neue Spalten erstellen (`withColumn`)

In [23]:
from pyspark.sql.functions import col

df = df.withColumn("age_plus_5", col("age") + 5)
df.show()

+-------+---+----------+--------------------+-------------------+------+----------+
|   name|age|      city|               email|          job_title|salary|age_plus_5|
+-------+---+----------+--------------------+-------------------+------+----------+
|   Anna| 28|    Berlin|anna.mueller@exam...|       Data Analyst| 52000|      33.0|
|    Ben| 35|   Hamburg|ben.schmidt@examp...|  Software Engineer| 74000|      40.0|
|  Clara| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|      27.0|
|  David| 40|   Cologne|david.schneider@e...|    Project Manager| 83000|      45.0|
|    Eva| 31| Stuttgart|eva.huber@example...|     Data Scientist| 68000|      36.0|
|  Felix| 29|    Berlin|felix.wagner@exam...|    DevOps Engineer|  NULL|      34.0|
|   Gina| 45|   Hamburg|gina.fischer@exam...|         HR Manager| 69000|      50.0|
| Hannah| 26| Frankfurt|hannah.koch@examp...|        UI Designer| 54000|      31.0|
|   Igor| 38|    Munich|igor.keller@examp...|      Sales Manager| 75000|    

In [121]:
df_mit_land = df.withColumn("country", F.lit("Germany"))
df_mit_land.show()

+----------+---------+---+----------+--------------------+-------------------+------+----------+----------------+-------+
|first_name|last_name|age|      city|               email|          job_title|salary|department|years_experience|country|
+----------+---------+---+----------+--------------------+-------------------+------+----------+----------------+-------+
|      Anna|  Mueller| 28|    Berlin|anna.mueller@gmai...|       Data Analyst| 52000|      Tech|               4|Germany|
|       Ben|  Schmidt| 35|   Hamburg|ben.schmidt@outlo...|  Software Engineer|  NULL|      Tech|              12|Germany|
|     Clara|    Klein| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|     Other|               1|Germany|
|     David|Schneider| 40|   Cologne|david.schneider@c...|    Project Manager| 83000|Management|              15|Germany|
|       Eva|    Huber| 31| Stuttgart|    eva.huber@web.de|     Data Scientist|  NULL|      Tech|               6|Germany|
|     Felix|   Wagner| 2

### Übung 3: Column hinzufügen
1. Geburtsjahrspalte hinzufügen

In [131]:
# Füge "Geburtsjahr" hinzu
df_mit_geburtsjahr = df.withColumn("Geburtsjahr", (F.lit(2025) - F.col("age")).cast("integer"))
df_mit_geburtsjahr.show()

+----------+---------+---+----------+--------------------+-------------------+------+----------+----------------+-----------+
|first_name|last_name|age|      city|               email|          job_title|salary|department|years_experience|Geburtsjahr|
+----------+---------+---+----------+--------------------+-------------------+------+----------+----------------+-----------+
|      Anna|  Mueller| 28|    Berlin|anna.mueller@gmai...|       Data Analyst| 52000|      Tech|               4|       1997|
|       Ben|  Schmidt| 35|   Hamburg|ben.schmidt@outlo...|  Software Engineer|  NULL|      Tech|              12|       1990|
|     Clara|    Klein| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|     Other|               1|       2003|
|     David|Schneider| 40|   Cologne|david.schneider@c...|    Project Manager| 83000|Management|              15|       1985|
|       Eva|    Huber| 31| Stuttgart|    eva.huber@web.de|     Data Scientist|  NULL|      Tech|               6|     

**Wichtig:**  
> Jede `.withColumn()`-Operation erstellt **intern ein neues DataFrame** — Spark verändert nie das Originalobjekt direkt.

### Bedingte Logik (`when`, `otherwise`)

Mit `when` und `otherwise` kannst du **Bedingungen** einbauen, ähnlich wie `if-else`.

In [143]:
from pyspark.sql.functions import when

df = df.withColumn(
    "income_category",
    when(F.col("salary") >= 70000, "Hochverdiener")
    .when((F.col("salary") >= 28000) & (F.col("salary") < 70000), "Normalverdiener")
    .otherwise("Geringverdiener")
)
df.show(100)

+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+---------------+
|first_name| last_name|age|      city|               email|          job_title|salary|department|years_experience|income_category|
+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+---------------+
|      Anna|   Mueller| 28|    Berlin|anna.mueller@gmai...|       Data Analyst| 52000|      Tech|               4|Normalverdiener|
|       Ben|   Schmidt| 35|   Hamburg|ben.schmidt@outlo...|  Software Engineer|  NULL|      Tech|              12|Geringverdiener|
|     Clara|     Klein| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|     Other|               1|Normalverdiener|
|     David| Schneider| 40|   Cologne|david.schneider@c...|    Project Manager| 83000|Management|              15|  Hochverdiener|
|       Eva|     Huber| 31| Stuttgart|    eva.huber@web.de|     Data Scientist|  NU

#### Übung 4: Kategorisiere Mitarbeiter nach Berufserfahrung

Aufgabe: Erstelle eine neue Spalte "Erfahrungsstufe", die Mitarbeiter basierend auf ihren Berufsjahren kategorisiert:

    "Senior": 15+ Jahre Erfahrung

    "Mid-Level": 5-14 Jahre Erfahrung

    "Junior": Weniger als 5 Jahre

In [148]:
df = df.withColumn(
    "Erfahrungsstufe",
    F.when(F.col("years_experience") >= 15, "Senior")
     .when(F.col("years_experience") >= 5, "Mid-Level")
     .otherwise("Junior")
)
df.select("first_name", "last_name", "years_experience", "Erfahrungsstufe").show()

+----------+---------+----------------+---------------+
|first_name|last_name|years_experience|Erfahrungsstufe|
+----------+---------+----------------+---------------+
|      Anna|  Mueller|               4|         Junior|
|       Ben|  Schmidt|              12|      Mid-Level|
|     Clara|    Klein|               1|         Junior|
|     David|Schneider|              15|         Senior|
|       Eva|    Huber|               6|      Mid-Level|
|     Felix|   Wagner|               5|      Mid-Level|
|      Gina|  Fischer|              20|         Senior|
|    Hannah|     Koch|               3|         Junior|
|      Igor|   Keller|              14|      Mid-Level|
|     Julia|  Schmitt|               2|         Junior|
|      Karl|    Bauer|              25|         Senior|
|      Lina|    Maier|               8|      Mid-Level|
|       Max|    Frank|              16|         Senior|
|      Nina|  Lehmann|               7|      Mid-Level|
|    Oliver|    Weber|               6|      Mid

**Merke:**  
> Viele verschachtelte `when`-Bedingungen können unübersichtlich werden → sauber strukturieren!

### Umgang mit fehlenden Werten (`fillna`, `dropna`)

In [30]:
# Fehlende Werte füllen (`fillna`):
# Ersetzt fehlende Gehälter durch 50000
df_filled = df.fillna({"salary": 50000})
df_filled.show()

+-------+---+----------+--------------------+-------------------+------+----------+---------------+
|   name|age|      city|               email|          job_title|salary|age_plus_5|income_category|
+-------+---+----------+--------------------+-------------------+------+----------+---------------+
|   Anna| 28|    Berlin|anna.mueller@exam...|       Data Analyst| 52000|      33.0|Normalverdiener|
|    Ben| 35|   Hamburg|ben.schmidt@examp...|  Software Engineer| 74000|      40.0|Normalverdiener|
|  Clara| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|      27.0|Geringverdiener|
|  David| 40|   Cologne|david.schneider@e...|    Project Manager| 83000|      45.0|  Hochverdiener|
|    Eva| 31| Stuttgart|eva.huber@example...|     Data Scientist| 68000|      36.0|Normalverdiener|
|  Felix| 29|    Berlin|felix.wagner@exam...|    DevOps Engineer| 50000|      34.0|Geringverdiener|
|   Gina| 45|   Hamburg|gina.fischer@exam...|         HR Manager| 69000|      50.0|Normalverdiener|


In [29]:
# Zeilen mit fehlenden Werten löschen (`dropna`):
# Entfernt alle Zeilen, die mindestens einen `null`-Wert enthalten.
df_clean = df.dropna()
df_clean.show()

+-------+---+----------+--------------------+-------------------+------+----------+---------------+
|   name|age|      city|               email|          job_title|salary|age_plus_5|income_category|
+-------+---+----------+--------------------+-------------------+------+----------+---------------+
|   Anna| 28|    Berlin|anna.mueller@exam...|       Data Analyst| 52000|      33.0|Normalverdiener|
|    Ben| 35|   Hamburg|ben.schmidt@examp...|  Software Engineer| 74000|      40.0|Normalverdiener|
|  Clara| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|      27.0|Geringverdiener|
|  David| 40|   Cologne|david.schneider@e...|    Project Manager| 83000|      45.0|  Hochverdiener|
|    Eva| 31| Stuttgart|eva.huber@example...|     Data Scientist| 68000|      36.0|Normalverdiener|
|   Gina| 45|   Hamburg|gina.fischer@exam...|         HR Manager| 69000|      50.0|Normalverdiener|
| Hannah| 26| Frankfurt|hannah.koch@examp...|        UI Designer| 54000|      31.0|Normalverdiener|


### Gruppieren und Aggregieren (`groupBy` + `agg`)

Mit `.groupBy()` kannst du dein DataFrame nach einer oder mehreren Spalten **gruppieren**.  
Mit `.agg()` kannst du dann **Aggregationfunktionen** auf jede Gruppe anwenden.

In [157]:
from pyspark.sql import functions as F

# 1. Gruppierung nach Abteilung (department)
# Frage: "Wie viele Mitarbeiter gibt es pro Abteilung?"

df.groupBy("department") \
  .agg(F.count("*").alias("Anzahl_Mitarbeiter")) \
  .show()

+----------+------------------+
|department|Anzahl_Mitarbeiter|
+----------+------------------+
|Management|                 4|
|   Finance|                 1|
|     Other|                11|
|      Tech|                 6|
|  Creative|                 3|
+----------+------------------+



In [159]:
# 2. Durchschnittsalter pro Stadt
# Frage: "Wie alt sind Mitarbeiter im Durchschnitt pro Stadt?"

df.groupBy("city") \
  .agg(F.avg("age").alias("Durchschnittsalter")) \
  .orderBy("Durchschnittsalter", ascending=False) \
  .show()

+----------+------------------+
|      city|Durchschnittsalter|
+----------+------------------+
| Stuttgart|34.333333333333336|
|   Cologne|             33.75|
|   Hamburg|             33.25|
| Frankfurt|              31.0|
|    Berlin|              31.0|
|    Munich|              29.0|
|Düsseldorf|              24.5|
+----------+------------------+



In [160]:
# 3. Höchstgehalt pro Abteilung (ohne NULL-Werte)
# Frage: "Was ist das maximale Gehalt in jeder Abteilung?"

df.filter(F.col("salary").isNotNull()) \
  .groupBy("department") \
  .agg(F.max("salary").alias("Maximalgehalt")) \
  .show()

+----------+-------------+
|department|Maximalgehalt|
+----------+-------------+
|  Creative|        65000|
|Management|        83000|
|     Other|        67000|
|      Tech|        71000|
+----------+-------------+



In [161]:
# 4. Kombinierte Aggregationen
# Frage: "Zeige pro Abteilung: Anzahl, Durchschnittsgehalt und Erfahrungssumme"

df.groupBy("department") \
  .agg(
      F.count("*").alias("Anzahl"),
      F.avg("salary").alias("Durchschnittsgehalt"),
      F.sum("years_experience").alias("Gesamterfahrung")
  ) \
  .show()

+----------+------+-------------------+---------------+
|department|Anzahl|Durchschnittsgehalt|Gesamterfahrung|
+----------+------+-------------------+---------------+
|Management|     4|  75666.66666666667|           67.0|
|   Finance|     1|               NULL|            8.0|
|     Other|    11|  48763.63636363636|           58.0|
|      Tech|     6| 63533.333333333336|           51.0|
|  Creative|     3|            53000.0|           16.0|
+----------+------+-------------------+---------------+



#### Übung 4: Aggregation
1. Frage: "Wie viele Mitarbeiter arbeiten in jeder Stadt?"
1. Frage: "Welche Abteilungen haben mehr als 5 Mitarbeiter?"
2. Frage: "Wie viele Normalverdiener (20.000-70.000€) und Hochverdiener (>70.000€) gibt es pro Stadt?"

In [166]:
#1
df.groupBy("city") \
  .agg(F.count("*").alias("Anzahl_Mitarbeiter")) \
  .show()

+----------+------------------+
|      city|Anzahl_Mitarbeiter|
+----------+------------------+
| Frankfurt|                 3|
|    Berlin|                 5|
|Düsseldorf|                 2|
|   Hamburg|                 4|
| Stuttgart|                 3|
|   Cologne|                 4|
|    Munich|                 4|
+----------+------------------+



In [165]:
#2
df.groupBy("department") \
  .agg(F.count("*").alias("Anzahl")) \
  .filter(F.col("Anzahl") > 5) \
  .show()

+----------+------+
|department|Anzahl|
+----------+------+
|     Other|    11|
|      Tech|     6|
+----------+------+



In [164]:
#3
from pyspark.sql import functions as F

# Zuerst Gehaltskategorie-Spalte erstellen
df_mit_kategorie = df.withColumn(
    "Gehaltsklasse",
    F.when(F.col("salary") >= 70000, "Hochverdiener")
     .when(F.col("salary") >= 20000, "Normalverdiener")
     .otherwise("Geringverdiener")
)

# Dann gruppieren
df_mit_kategorie.filter(F.col("Gehaltsklasse").isin(["Hochverdiener", "Normalverdiener"])) \
  .groupBy("city", "Gehaltsklasse") \
  .agg(F.count("*").alias("Anzahl")) \
  .orderBy("city") \
  .show()

+----------+---------------+------+
|      city|  Gehaltsklasse|Anzahl|
+----------+---------------+------+
|    Berlin|  Hochverdiener|     1|
|    Berlin|Normalverdiener|     4|
|   Cologne|Normalverdiener|     1|
|   Cologne|  Hochverdiener|     2|
|Düsseldorf|Normalverdiener|     2|
| Frankfurt|Normalverdiener|     1|
|   Hamburg|Normalverdiener|     3|
|    Munich|  Hochverdiener|     1|
|    Munich|Normalverdiener|     2|
| Stuttgart|Normalverdiener|     1|
+----------+---------------+------+



**Wichtig zu wissen:**  
> `.groupBy()` alleine macht noch nichts. Erst `.agg()`, `.count()`, `.sum()` oder `.avg()` lösen die echte Berechnung aus.

### Sortieren und Reihenfolge ändern (`orderBy`, `sort`)

Mit `.orderBy()` oder `.sort()` kannst du dein DataFrame sortieren.

In [185]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [None]:
df.orderBy(F.col("salary")).show()

In [197]:
df.dtypes

[('first_name', 'string'),
 ('last_name', 'string'),
 ('age', 'string'),
 ('city', 'string'),
 ('email', 'string'),
 ('job_title', 'string'),
 ('salary', 'int'),
 ('department', 'string'),
 ('years_experience', 'string'),
 ('income_category', 'string'),
 ('Erfahrungsstufe', 'string')]

In [195]:
df = df.withColumn("salary", F.col("salary").cast(IntegerType()))
df.orderBy(F.col("salary")).show()


+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+---------------+---------------+
|first_name| last_name|age|      city|               email|          job_title|salary|department|years_experience|income_category|Erfahrungsstufe|
+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+---------------+---------------+
|       Ben|   Schmidt| 35|   Hamburg|ben.schmidt@outlo...|  Software Engineer|  NULL|      Tech|              12|Geringverdiener|      Mid-Level|
|       Eva|     Huber| 31| Stuttgart|    eva.huber@web.de|     Data Scientist|  NULL|      Tech|               6|Geringverdiener|      Mid-Level|
|    Hannah|      Koch| 26| Frankfurt|hannah.koch@examp...|        UI Designer|  NULL|  Creative|               3|Geringverdiener|         Junior|
|      Lina|     Maier| 33| Stuttgart|lina.maier@t-onli...|         Accountant|  NULL|   Finance|               8|Geri

In [175]:
display(df)

DataFrame[first_name: string, last_name: string, age: string, city: string, email: string, job_title: string, salary: string, department: string, years_experience: string, income_category: string, Erfahrungsstufe: string]

In [190]:
# Oder aufsteigend (Standard)
df.orderBy("age").show(100)

+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+---------------+---------------+
|first_name| last_name|age|      city|               email|          job_title|salary|department|years_experience|income_category|Erfahrungsstufe|
+----------+----------+---+----------+--------------------+-------------------+------+----------+----------------+---------------+---------------+
|       Tom|   Richter| 20|    Munich|tom.richter@examp...|    Working Student| 18000|     Other|               0|Geringverdiener|         Junior|
|     Marco|Zimmermann| 21|    Berlin|marco.zimmermann@...|             Intern| 28000|     Other|               0|Normalverdiener|         Junior|
|     Clara|     Klein| 22|    Munich|clara.klein@examp...|Marketing Assistant| 42000|     Other|               1|Normalverdiener|         Junior|
|     Kevin|   Neumann| 22|   Cologne|kevin.neumann@gmx.de|Warehouse Assistant| 31000|     Other|               1|Norm

## 🔃 Sortieren und Reihenfolge ändern (`orderBy`, `sort`)

Mit `.orderBy()` oder `.sort()` kannst du dein DataFrame sortieren.

**Beispiel: Nach Gehalt absteigend sortieren:**

```python
df.orderBy(F.desc("salary")).show()
```

**Oder aufsteigend (Standard):**

```python
df.orderBy("age").show()

**Hinweis:**  
> `orderBy` und `sort` sind **gleichwertig**.  
> Bei riesigen DataFrames kann Sortieren teuer werden → vorsichtig einsetzen!

## ✅ Daten von Azure einlesen

In [10]:
spark.stop()

In [1]:
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os

spark = SparkSession.builder \
    .appName("Localspark") \
    .master("local[*]") \
    .config("spark.jars.packages",
        "com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [2]:
# Load variables from .env into os.environ
load_dotenv("env")
server_name = os.environ.get("SERVERNAME")
database_name = os.environ.get("DATABASENAME")
username = os.environ.get("USERNAME")
password = os.environ.get("PASSWORD")

In [3]:
# Standard JDBC URL format for SQL Server
jdbc_url = f"jdbc:sqlserver://{server_name}:1433;databaseName={database_name}"

#table name to read
table_name = "BIB.Autor"

In [4]:
try:
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url", jdbc_url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .load()

    # Display schema and a few rows if successful
    jdbcDF.printSchema()
    jdbcDF.show(5)

except Exception as e:
    print("JDBC read failed!")
    print("Error:", e)

root
 |-- ID_Autor: integer (nullable = true)
 |-- Vorname_1: string (nullable = true)
 |-- Vorname_2: string (nullable = true)
 |-- Nachname: string (nullable = true)

+--------+---------+---------+--------+
|ID_Autor|Vorname_1|Vorname_2|Nachname|
+--------+---------+---------+--------+
|       1|      Abe|     NULL|    Kobo|
|       2|   Chinua|     NULL|  Achebe|
|       3|  Theodor|       W.|  Adorno|
|       4|     NULL|     NULL|    Äsop|
|       5|   Samuel|     NULL|   Agnon|
+--------+---------+---------+--------+
only showing top 5 rows



### 🧩 Beispiel: join() in PySpark
🎯 Ziel: Zwei DataFrames anhand einer Spalte verbinden (JOIN)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("Erweiterte Joins").getOrCreate()

df_kunden = spark.createDataFrame([
    Row(kunden_id=1, name="Anna"),
    Row(kunden_id=2, name="Ben"),
    Row(kunden_id=3, name="Clara"),
    Row(kunden_id=4, name="Dieter"),
    Row(kunden_id=5, name="Eva"),
    Row(kunden_id=6, name="Fritz"),
    Row(kunden_id=7, name="Gina")
])

df_bestellungen = spark.createDataFrame([
    Row(bestell_id=101, kunden_id=1, produkt="Buch"),
    Row(bestell_id=102, kunden_id=2, produkt="Tasse"),
    Row(bestell_id=103, kunden_id=4, produkt="Stift"),
    Row(bestell_id=104, kunden_id=1, produkt="Laptop"),
    Row(bestell_id=105, kunden_id=6, produkt="Kabel"),
    Row(bestell_id=106, kunden_id=8, produkt="Lampe")  # kein passender Kunde
])


In [2]:
df_kunden.show()

+---------+------+
|kunden_id|  name|
+---------+------+
|        1|  Anna|
|        2|   Ben|
|        3| Clara|
|        4|Dieter|
|        5|   Eva|
|        6| Fritz|
|        7|  Gina|
+---------+------+



In [4]:
df_bestellungen.show()

+----------+---------+-------+
|bestell_id|kunden_id|produkt|
+----------+---------+-------+
|       101|        1|   Buch|
|       102|        2|  Tasse|
|       103|        4|  Stift|
|       104|        1| Laptop|
|       105|        6|  Kabel|
|       106|        8|  Lampe|
+----------+---------+-------+



### Inner join
✅ Nur Kunden mit passenden Bestellungen.

In [3]:
df_kunden.join(df_bestellungen, on="kunden_id", how="inner").show()

+---------+------+----------+-------+
|kunden_id|  name|bestell_id|produkt|
+---------+------+----------+-------+
|        1|  Anna|       101|   Buch|
|        1|  Anna|       104| Laptop|
|        2|   Ben|       102|  Tasse|
|        4|Dieter|       103|  Stift|
|        6| Fritz|       105|  Kabel|
+---------+------+----------+-------+



### left Join
✅ Alle Kunden, auch wenn sie keine Bestellungen haben.

In [4]:
df_kunden.join(df_bestellungen, on="kunden_id", how="left")

DataFrame[kunden_id: bigint, name: string, bestell_id: bigint, produkt: string]

## right Join

In [7]:
df_kunden.join(df_bestellungen, on="kunden_id", how="right").show()

+---------+------+----------+-------+
|kunden_id|  name|bestell_id|produkt|
+---------+------+----------+-------+
|        1|  Anna|       101|   Buch|
|        2|   Ben|       102|  Tasse|
|        4|Dieter|       103|  Stift|
|        1|  Anna|       104| Laptop|
|        6| Fritz|       105|  Kabel|
|        8|  NULL|       106|  Lampe|
+---------+------+----------+-------+



## outer Join
✅ Alle Daten aus beiden Tabellen, inkl. null, wenn kein Match.

In [8]:
df_kunden.join(df_bestellungen, on="kunden_id", how="outer").show()

+---------+------+----------+-------+
|kunden_id|  name|bestell_id|produkt|
+---------+------+----------+-------+
|        1|  Anna|       101|   Buch|
|        1|  Anna|       104| Laptop|
|        2|   Ben|       102|  Tasse|
|        3| Clara|      NULL|   NULL|
|        4|Dieter|       103|  Stift|
|        5|   Eva|      NULL|   NULL|
|        6| Fritz|       105|  Kabel|
|        7|  Gina|      NULL|   NULL|
|        8|  NULL|       106|  Lampe|
+---------+------+----------+-------+



### left_semi Join
✅ Nur Kunden, die mindestens eine Bestellung haben (keine Bestelldaten sichtbar).

In [24]:
df_kunden.join(df_bestellungen, on="kunden_id", how="left_semi").show()

+---------+------+
|kunden_id|  name|
+---------+------+
|        1|  Anna|
|        2|   Ben|
|        4|Dieter|
|        6| Fritz|
+---------+------+



#### 🧠 Wann braucht man einen left_semi Join?

Ein left_semi Join wird verwendet, wenn du:

    ✅ nur Zeilen aus dem linken DataFrame behalten willst,
    ✅ nur dann, wenn es einen passenden Eintrag im rechten DataFrame gibt,
    ❌ aber keine Spalten aus dem rechten DataFrame brauchst.
    
<p style="text-align: center;"><b>Vergleich left_semi vs inner </b></p>

| Eigenschaft                  | `inner` Join | `left_semi` Join |
| ---------------------------- | ------------ | ---------------- |
| Gibt linke Zeilen zurück     | ✅            | ✅                |
| Gibt rechte Spalten zurück   | ✅            | ❌                |
| Nutzt man als Filter         | ❌            | ✅                |
| Performance bei großen Daten | Mittel       | Sehr gut         |

#### ⚙️ Performance-Optimierung

    - Schneller als inner join, weil keine unnötigen Spalten gezogen werden.
    - Reduziert Speicherbedarf und Shuffle bei großen Datenmengen.

 ### left_anti Join
✅ Nur Kunden, die keine Bestellung haben.

In [25]:
df_kunden.join(df_bestellungen, on="kunden_id", how="left_anti").show()

+---------+-----+
|kunden_id| name|
+---------+-----+
|        3|Clara|
|        5|  Eva|
|        7| Gina|
+---------+-----+



#### 🚀 Was ist ein Broadcast Join?

Ein Broadcast Join sendet (broadcastet) eine kleine Tabelle an alle Worker-Nodes, sodass ein teurer Shuffle-Vorgang vermieden wird.

Wann?

    - Wenn ein DataFrame klein ist (z. B. eine Lookup-Tabelle).
    - Ideal bei Skewed Joins oder Star Schema.

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("Broadcast Join").getOrCreate()

# Große Tabelle
sales = spark.createDataFrame([
    Row(user_id=1, amount=200),
    Row(user_id=2, amount=150),
    Row(user_id=3, amount=100),
    Row(user_id=4, amount=50),
    Row(user_id=5, amount=300)
])

# Kleine Lookup-Tabelle
users = spark.createDataFrame([
    Row(user_id=1, name="Anna"),
    Row(user_id=2, name="Ben"),
    Row(user_id=3, name="Clara")
])

# Broadcast Join
joined = sales.join(broadcast(users), on="user_id", how="inner")
joined.show()


+-------+------+-----+
|user_id|amount| name|
+-------+------+-----+
|      1|   200| Anna|
|      2|   150|  Ben|
|      3|   100|Clara|
+-------+------+-----+



#### 🧠 Wann braucht man einen Broadcast Join?

    - Wenn du eine kleine Tabelle hast, z. B. < 10 MB.
    - Die kleine Tabelle wird an alle Worker geschickt.
    - Spart Shuffle-Zeit und Speicher – ideal für Star Schema Joins.


<p style="text-align: center;"><b>📌 Typische Anwendungsfälle</b></p>

| Anwendung                      | Warum Broadcast?                               |
| ------------------------------ | ---------------------------------------------- |
| Produkttabelle in E-Commerce   | Produktinfos sind klein, Bestellungen riesig   |
| Benutzerrolle-Lookup           | Rollen sind fix, schnell zu broadcasten        |
| ISO-Ländercodes oder Metadaten | Kleine Referenzdaten, oft in Queries gebraucht |


In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ShuffleBeispiel").getOrCreate()

df1 = spark.createDataFrame([
    (1, "A"), (2, "B"), (3, "C")
], ["id", "wert1"])

df2 = spark.createDataFrame([
    (1, "X"), (2, "Y"), (4, "Z")
], ["id", "wert2"])

joined = df1.join(df2, on="id")

# Execution Plan anzeigen
joined.explain()
joined.show()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#55L, wert1#56, wert2#60]
   +- SortMergeJoin [id#55L], [id#59L], Inner
      :- Sort [id#55L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#55L, 200), ENSURE_REQUIREMENTS, [plan_id=183]
      :     +- Filter isnotnull(id#55L)
      :        +- Scan ExistingRDD[id#55L,wert1#56]
      +- Sort [id#59L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#59L, 200), ENSURE_REQUIREMENTS, [plan_id=184]
            +- Filter isnotnull(id#59L)
               +- Scan ExistingRDD[id#59L,wert2#60]


+---+-----+-----+
| id|wert1|wert2|
+---+-----+-----+
|  1|    A|    X|
|  2|    B|    Y|
+---+-----+-----+



In [10]:
spark.catalog.listTables()

[]

### Spark-SQL API in PySpark

| Was?                   | Kurz erklärt                                                                                 | Warum das zählt                                                                  |
| ---------------------- | -------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------- |
| **Spark SQL**          | SQL-Engine **im** Cluster: DataFrames wirken wie Tabellen → du kannst Standard-SQL absetzen. | Analyst\:innen nutzen vertraute Syntax, Entwickler\:innen kriegen Cluster-Power. |
| **PySpark-Bridge**     | DataFrame → `createOrReplaceTempView()` → `spark.sql("…")`.                                  | Trennscharf: Python, SQL und Optimierer teilen denselben Plan.                   |
| **Vorteile**           | ✓ Vertraute Syntax <br>✓ Catalyst-Optimierung gratis <br>✓ Frei mischbar: SQL ↔ DataFrame    | Weniger Code-Duplizierung – du nimmst jeweils die ausdrucksstärkere API.         |
| **Temp Views** | leben nur solange die Session lebt; nach Notebook-Restart sind sie weg.           | Erst sichern / als Tabelle speichern, wenn du sie später noch brauchst.                   |


#### 1. Basis-Workflow

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .master("local[4]")
         .appName("sql-api-demo")
         .getOrCreate())

# 1. Mini-DataFrame im Arbeitsspeicher erzeugen
data = [
    ("Alice", "Engineering", 65000),
    ("Bob",   "Marketing",  58000),
    ("Carol", "Engineering", 72000),
    ("Dave",  "Marketing",  55000),
    ("Eve",   "HR",         50000),
]
cols = ["name", "dept", "salary"]

df = spark.createDataFrame(data, cols)

# 2. DataFrame als temporäre Tabelle registrieren
df.createOrReplaceTempView("employees")


#### 2. SQL direkt ausführen

In [12]:
# Durchschnittsgehalt pro Abteilung (SQL)
avg_salary_sql = spark.sql("""
    SELECT dept,
           ROUND(AVG(salary), 0) AS avg_salary,
           COUNT(*)             AS headcount
    FROM   employees
    GROUP  BY dept
    ORDER  BY avg_salary DESC
""")

avg_salary_sql.show()


+-----------+----------+---------+
|       dept|avg_salary|headcount|
+-----------+----------+---------+
|Engineering|   68500.0|        2|
|  Marketing|   56500.0|        2|
|         HR|   50000.0|        1|
+-----------+----------+---------+



#### 3. SQL und DataFrame-API kombinieren

In [13]:
# Ergebnis in DataFrame-Notation weiterverarbeiten
top_dept = (avg_salary_sql
            .filter(col("headcount") > 1)
            .withColumnRenamed("dept", "abteilung"))

top_dept.show()


+-----------+----------+---------+
|  abteilung|avg_salary|headcount|
+-----------+----------+---------+
|Engineering|   68500.0|        2|
|  Marketing|   56500.0|        2|
+-----------+----------+---------+



### Window Functions

Window Functions erlauben dir, über eine **Teilmenge** deiner Daten zu arbeiten, **ohne** sie vollständig zu aggregieren.

In [40]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("city").orderBy(F.desc("salary"))

df = df.withColumn("rank_in_city", F.rank().over(window_spec))
df.show()

+-------+---+----------+--------------------+-------------------+------+----------+---------------+------------+
|   name|age|      city|               email|          job_title|salary|age_plus_5|income_category|rank_in_city|
+-------+---+----------+--------------------+-------------------+------+----------+---------------+------------+
| Oliver| 27|    Berlin|oliver.weber@exam...|  Backend Developer| 67000|      32.0|Normalverdiener|           1|
|   Anna| 28|    Berlin|anna.mueller@exam...|       Data Analyst| 52000|      33.0|Normalverdiener|           2|
|   Karl| 50|    Berlin|karl.bauer@exampl...|                CTO|120000|      55.0|  Hochverdiener|           3|
|  Felix| 29|    Berlin|felix.wagner@exam...|    DevOps Engineer|  NULL|      34.0|Geringverdiener|           4|
|  David| 40|   Cologne|david.schneider@e...|    Project Manager| 83000|      45.0|  Hochverdiener|           1|
|   Tina| 32|   Cologne|tina.kraus@exampl...|      Data Engineer| 73000|      37.0|Normalverdien

25/04/29 08:00:42 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

### Lesen von Azure Datenbank

In [None]:
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os

spark = SparkSession.builder \
    .appName("Localspark") \
    .master("local[*]") \
    .config("spark.jars.packages",
        "com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")