Verbesserungsideen:
1. Zwischendateien eliminieren
2. Airport Datei filtern auf Continent Europa und Type nur small, medium, large airport
3. time ohne zeitzonen Informationen speichern

1. Sortieren der Input Datei

In [1]:
import pandas as pd

# CSV-Datei laden
input_file = '/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/20230126_merged.csv'
output_file = '/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/20230126_sorted.csv'

# Nur die relevanten Spalten laden und die Zeitzonen entfernen
df = pd.read_csv(input_file, usecols=['time', 'icao24', 'lat', 'lon', 'onground'])

# 'time' Spalte in ein Datum-Zeit-Objekt konvertieren und Zeitzoneninformationen entfernen
df['time'] = pd.to_datetime(df['time']).dt.tz_localize(None)

# Nach 'time' aufsteigend sortieren
df = df.sort_values(by='time')

# Neue Datei speichern
df.to_csv(output_file, index=False)

print(f"Die bereinigte Datei wurde unter '{output_file}' gespeichert.")

Die bereinigte Datei wurde unter '/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/20230126_sorted.csv' gespeichert.


2. Auswertung der Start und Landezeiten

In [2]:
import findspark
findspark.init()
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lead, lag, unix_timestamp, lit, concat, floor, row_number
from pyspark.sql.window import Window

# Spark-Session erstellen
spark = SparkSession.builder.appName("StartsUndLandungen").getOrCreate()

# Eingabedatei und Ausgabedatei
input_file = '/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/20230126_sorted.csv'
output_file = '/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/Starts_und_Landungen_pyspark.csv'

# Alle CSV-Dateien laden
df = spark.read.csv(input_file, header=True, inferSchema=True)

# Sicherstellen, dass die Spalte 'time' als Timestamp erkannt wird
df = df.withColumn("time", col("time").cast("timestamp"))

# Fensterfunktion für ICAO24, sortiert nach Zeit
window_icao24 = Window.partitionBy("icao24").orderBy("time")

# Lag und Lead berechnen
df = df.withColumn("prev_onground", lag("onground").over(window_icao24))
df = df.withColumn("next_onground", lead("onground").over(window_icao24))

# Landungen identifizieren
landungen_df = df.filter((col("prev_onground") == 0) & (col("onground") == 1)).select(
    col("icao24").alias("landung_icao24"),
    col("time").alias("landung_time"),
    col("lat").alias("landung_lat"),
    col("lon").alias("landung_lon")
)

# Starts identifizieren
starts_df = df.filter((col("onground") == 0) & (col("prev_onground") == 1)).select(
    col("icao24").alias("start_icao24"),
    col("time").alias("start_time"),
    col("lat").alias("start_lat"),
    col("lon").alias("start_lon")
)

# Verknüpfen von Landungen und Starts
joined_df = landungen_df.crossJoin(starts_df).filter(
    (col("landung_icao24") == col("start_icao24")) &  # Gleiche Flugzeugkennung
    (col("start_time") > col("landung_time"))         # Start nach Landung
)

# Berechnung der Zeitdifferenz
joined_df = joined_df.withColumn(
    "time_difference",
    unix_timestamp("start_time") - unix_timestamp("landung_time")
)

# Fensterfunktion für die nächste Startzeit nach jeder Landung
window_landung = Window.partitionBy("landung_icao24", "landung_time").orderBy("time_difference")

# Bestimme den nächsten Startzeitpunkt
joined_df = joined_df.withColumn("row_number", row_number().over(window_landung)).filter(
    col("row_number") == 1  # Nur den nächsten Start auswählen
).drop("row_number")

# Berechnung der Zeit am Boden
final_df = joined_df.withColumn(
    "time_am_boden_seconds",
    unix_timestamp("start_time") - unix_timestamp("landung_time")
).withColumn(
    "time_am_boden_minutes",
    floor(col("time_am_boden_seconds") / 60)
).withColumn(
    "time_am_boden_remaining_seconds",
    col("time_am_boden_seconds") % 60
).withColumn(
    "time_am_boden",
    concat(
        col("time_am_boden_minutes").cast("string"), lit("m "),
        col("time_am_boden_remaining_seconds").cast("string"), lit("s")
    )
)

# Ergebnis-Datenrahmen mit relevanten Spalten
result_df = final_df.select(
    col("landung_icao24").alias("icao24"),
    "landung_time",
    "landung_lat",
    "landung_lon",
    "start_time",
    "start_lat",
    "start_lon",
    "time_am_boden"
)

# Ergebnisse in einer einzigen Datei speichern
result_df.coalesce(1).write.csv(output_file, header=True, mode="overwrite")

print(f"Die Analyse wurde abgeschlossen und alle Ergebnisse wurden in '{output_file}' gespeichert.")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/26 14:31:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


CodeCache: size=131072Kb used=23273Kb max_used=23494Kb free=107798Kb
 bounds [0x000000010a9f8000, 0x000000010c118000, 0x00000001129f8000]
 total_blobs=9410 nmethods=8488 adapters=835
 compilation: disabled (not enough contiguous free space left)


[Stage 4:>                                                          (0 + 1) / 1]

Die Analyse wurde abgeschlossen und alle Ergebnisse wurden in '/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/Starts_und_Landungen_pyspark.csv' gespeichert.


                                                                                

3. Zuordnung zu Flugplätzen

In [4]:
from pyspark.sql.functions import col, lit, row_number, radians, sin, cos, sqrt, atan2

# Datei mit Flugdaten einlesen
flugdaten_file = "/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/Starts_und_Landungen_pyspark.csv/part-00000-a0720a09-afb5-4a5a-b624-5cc80df23b81-c000.csv"
result_df = spark.read.csv(flugdaten_file, header=True, inferSchema=True)

# Datei mit Flughafendaten einlesen
airport_file = "/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/airports.csv"
airports_df = spark.read.csv(airport_file, header=True, inferSchema=True)

# Flughafendaten vorbereiten: Spalten korrekt benennen
airports_df = airports_df.withColumnRenamed("latitude_deg", "airport_lat") \
                         .withColumnRenamed("longitude_deg", "airport_lon")

# Haversine-Funktion definieren
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Erdradius in km
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

# Kreuzprodukt zwischen Flugdaten und Flughafendaten
expanded_df = result_df.crossJoin(airports_df)

# Entfernung berechnen
expanded_df = expanded_df.withColumn(
    "distance_to_airport",
    haversine(
        col("landung_lat"), col("landung_lon"),
        col("airport_lat"), col("airport_lon")
    )
)

# Filter auf Umkreis von 10 km
filtered_df = expanded_df.filter(col("distance_to_airport") <= 10)

# Nächsten Flughafen finden
window = Window.partitionBy("icao24", "landung_time").orderBy("distance_to_airport")

nearest_airport_df = filtered_df.withColumn("row_number", row_number().over(window)) \
                                .filter(col("row_number") == 1) \
                                .select(
                                    "icao24",
                                    "landung_time",
                                    "landung_lat",
                                    "landung_lon",
                                    "start_time",
                                    "start_lat",
                                    "start_lon",
                                    "time_am_boden",
                                    "name",  # Flughafenname
                                    "distance_to_airport",
                                    "type"
                                )

# Ergebnisse anzeigen oder speichern
nearest_airport_df.write.csv("/Users/jankozirkelbach/Library/CloudStorage/GoogleDrive-janko.zirkelbach@hotmail.com/Meine Ablage/Master/Advanced Data Engineering/Projekt/result_with_nearest_airports.csv", header=True, mode="overwrite")


                                                                                