In [None]:
# ---------------------------------------------------------
# ZELLE 1: SETUP & SPARK SESSION
# ---------------------------------------------------------
# Hier initialisieren wir den "Driver".
# Wir nutzen Spark im "Local Mode" (master="local[*]"), um einen
# Cluster auf diesem Rechner zu simulieren.
# Außerdem definieren wir relative Pfade, damit das Notebook bei jedem im Team läuft.
# ---------------------------------------------------------

import os
import shutil
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, from_unixtime
from pyspark.sql.types import DoubleType, LongType, StringType

spark = SparkSession.builder \
    .appName("OpenSky Flight Data Analysis") \
    .master("local[*]") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

current_dir = os.getcwd()
RAW_PATH = os.path.join(current_dir, "..", "data", "raw")
PROCESSED_PATH = os.path.join(current_dir, "..", "data", "processed")

print(f"Spark Version: {spark.version}")
print(f"Lese Rohdaten aus: {RAW_PATH}")

In [None]:
# ---------------------------------------------------------
# ZELLE 2: PHASE 1 - INGESTION (RAW ZONE / BRONZE LAYER)
# ---------------------------------------------------------
# Wir laden die Rohdaten (CSV) direkt aus dem "Data Lake" (lokaler Ordner).
# Strategie: "Schema-on-Read" -> Spark versucht, Datentypen automatisch zu erkennen.
# Dies entspricht dem Zugriff auf unstrukturierte Daten, bevor eine Validierung stattfindet.
# ---------------------------------------------------------

df_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(RAW_PATH)

print("Schema der Rohdaten:")
df_raw.printSchema()

print(f"Anzahl der Zeilen im Raw Layer: {df_raw.count()}")

In [None]:
# ---------------------------------------------------------
# ZELLE 3: PHASE 2 - PROCESSING & STORAGE (SILVER LAYER)
# ---------------------------------------------------------
# Dies ist der Kern des Data Engineering (ETL):
# 1. Cleaning: Casting von Strings zu korrekten Datentypen (Double/Long).
# 2. Filter: Entfernen von Datensätzen ohne Geokoordinaten.
# 3. Storage Optimization: Wir speichern die Daten als PARQUET.
# ---------------------------------------------------------

df_cleaned = df_raw.select(
    col("time").cast(LongType()),
    col("callsign").cast(StringType()),
    col("lat").cast(DoubleType()),
    col("lon").cast(DoubleType()),
    col("velocity").cast(DoubleType()),
    col("geoaltitude").cast(DoubleType())
)
df_cleaned = df_cleaned.filter(col("lat").isNotNull() & col("lon").isNotNull())

df_silver = df_cleaned.withColumn("flight_date", to_date(from_unixtime(col("time"))))

print(f"Schreibe optimierte Daten nach: {PROCESSED_PATH} ...")
df_silver.write \
    .mode("overwrite") \
    .partitionBy("flight_date") \
    .parquet(PROCESSED_PATH)

print("ETL Erfolgreich. Daten sind jetzt im Silver Layer.")

In [None]:
# ---------------------------------------------------------
# ZELLE 4: PHASE 3 - ANALYTICS (GOLD LAYER)
# ---------------------------------------------------------
# Ab hier arbeiten wir nur noch mit den optimierten Parquet-Daten (Silver Layer).
# Wir führen eine OLAP-Aggregation durch (Group By + Count), um den
# Datensatz mit den meisten Wegpunkten (aktivster Flug) zu finden.
# Dank Parquet muss Spark hierfür nur die relevanten Spalten laden.
# ---------------------------------------------------------

df_analytics = spark.read.parquet(PROCESSED_PATH)

top_flights = df_analytics.groupBy("callsign") \
    .count() \
    .orderBy(col("count").desc())

print("Top 5 aktivste Callsigns:")
top_flights.show(5)

target_callsign = top_flights.first()["callsign"]
print(f"Wir visualisieren jetzt Flug: {target_callsign}")

In [None]:
# ---------------------------------------------------------
# ZELLE 5: PHASE 4 - VISUALISIERUNG (SERVING LAYER)
# ---------------------------------------------------------
# Schnittstelle zwischen "Big Data" (Spark) und "Small Data" (Python/Pandas).
# 1. Wir filtern auf EINEN spezifischen Flug (Reduktion der Datenmenge).
# 2. Wir nutzen .toPandas(), um die Daten vom Cluster-Speicher in den lokalen RAM zu holen.
# 3. Visualisierung der Route mittels Folium (interaktive Karte).
# ---------------------------------------------------------

import folium
import pandas as pd

flight_trace = df_analytics \
    .filter(col("callsign") == target_callsign) \
    .select("lat", "lon", "geoaltitude", "time") \
    .orderBy("time") \
    .limit(500) \
    .toPandas()

if not flight_trace.empty:
    center_lat = flight_trace["lat"].mean()
    center_lon = flight_trace["lon"].mean()
    
    m = folium.Map(location=[center_lat, center_lon], zoom_start=6)

    route_points = list(zip(flight_trace["lat"], flight_trace["lon"]))
    
    folium.PolyLine(route_points, color="red", weight=3, opacity=0.8).add_to(m)
    
    folium.Marker(route_points[0], popup="Start", icon=folium.Icon(color="green")).add_to(m)
    folium.Marker(route_points[-1], popup="Ende", icon=folium.Icon(color="blue")).add_to(m)
    
    display(m)
else:
    print("Keine Daten gefunden.")