In [37]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [38]:
spark = (SparkSession.builder
    .appName("air-traffic-year-join-ml")
    .master("local[*]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.ui.enabled", "false")
    .getOrCreate()
)

spark.version

'4.0.1'

In [39]:
TRAFFIC_CSV_PATH = "../DataStorageLayer/export/ExportVerkehr.csv"
AIR_CSV_PATH     = "../DataStorageLayer/export/ExportSchadstoff.csv"

CSV_SEP = ";"
HAS_HEADER = True

In [40]:
traffic_raw = (spark.read
               .option("header", str(HAS_HEADER).lower())
               .option("sep", CSV_SEP)
               .option("inferSchema", "false")
               .csv(TRAFFIC_CSV_PATH))

air_raw = (spark.read
           .option("header", str(HAS_HEADER).lower())
           .option("sep", CSV_SEP)
           .option("inferSchema", "false")
           .csv(AIR_CSV_PATH))

print("traffic rows:", traffic_raw.count(), "cols:", len(traffic_raw.columns))
print("air rows:", air_raw.count(), "cols:", len(air_raw.columns))

traffic_raw.show(5, truncate=False)
air_raw.show(5, truncate=False)

traffic rows: 33 cols: 12
air rows: 52890 cols: 11
+------------------------+-----+-----+-----+-------------+-----------------+----+-------+--------+------------+---------+-----------------------+
|_id                     |NUTS1|NUTS2|NUTS3|DISTRICT_CODE|SUB_DISTRICT_CODE|YEAR|UNIT   |REF_YEAR|ROAD_TRAFFIC|SCWR_CALC|_imported_at           |
+------------------------+-----+-----+-----+-------------+-----------------+----+-------+--------+------------+---------+-----------------------+
|695f9339440359c0b02efb73|AT1  |AT13 |AT130|90001        |0                |1990|1.000 t|2022    |2162,23     |1344,17  |2026-01-08 11:21:29.340|
|695f9339440359c0b02efb74|AT1  |AT13 |AT130|90001        |0                |1991|1.000 t|2022    |2396,25     |1386,75  |2026-01-08 11:21:29.340|
|695f9339440359c0b02efb75|AT1  |AT13 |AT130|90001        |0                |1992|1.000 t|2022    |2384,47     |1437,86  |2026-01-08 11:21:29.340|
|695f9339440359c0b02efb76|AT1  |AT13 |AT130|90001        |0              

In [41]:
traffic_raw = traffic_raw.toDF(*[c.strip() for c in traffic_raw.columns])
air_raw = air_raw.toDF(*[c.strip() for c in air_raw.columns])

print("Traffic columns:", traffic_raw.columns)
print("Air columns:", air_raw.columns)

Traffic columns: ['_id', 'NUTS1', 'NUTS2', 'NUTS3', 'DISTRICT_CODE', 'SUB_DISTRICT_CODE', 'YEAR', 'UNIT', 'REF_YEAR', 'ROAD_TRAFFIC', 'SCWR_CALC', '_imported_at']
Air columns: ['_id', 'Region', 'Schadstoff', 'Einheit', 'NFR_Code', 'Trendbericht_Sektor', 'Quelle', 'Datenstand', 'Jahr', 'Werte', '_imported_at']


In [42]:
def cast_de_number_safe(df, colname: str):
    """
    Robust: ' 215.855,04 ' -> 215855.04 (double)
    ' NA ' / '' / '-' -> NULL
    Ungültige Werte -> NULL (durch SQL try_cast)
    """
    tmp = f"__{colname}_norm"

    # 1) trim + string
    df = df.withColumn(tmp, F.trim(F.col(colname).cast("string")))

    # 2) NA/Noise -> NULL
    df = df.withColumn(
        tmp,
        F.when(
            F.col(tmp).isNull()
            | (F.col(tmp) == "")
            | (F.lower(F.col(tmp)).isin("na", "n/a", "null", "none", "-", "—")),
            F.lit(None),
        ).otherwise(F.col(tmp))
    )

    # 3) de-DE cleanup
    df = df.withColumn(tmp, F.regexp_replace(F.col(tmp), r"\.", ""))  # Tausenderpunkte raus
    df = df.withColumn(tmp, F.regexp_replace(F.col(tmp), r",", "."))  # Komma -> Punkt

    # 4) try_cast (SQL) -> ungültig wird NULL statt Fehler
    df = df.withColumn(colname, F.expr(f"try_cast({tmp} as double)")).drop(tmp)

    return df

In [43]:
# Verkehr.csv
TRAFFIC_YEAR_COL  = "YEAR"
TRAFFIC_VALUE_COL = "ROAD_TRAFFIC"

# Schadstoff.csv
AIR_REGION_COL    = "Region"
AIR_YEAR_COL      = "Jahr"
AIR_POLLUTANT_COL = "Schadstoff"
AIR_VALUE_COL     = "Werte"   # nach trimmen heißt es "Werte"

TARGET_REGION = "Wien"

In [None]:
traffic = traffic_raw
# YEAR kann auch mal Spaces haben -> trimmen + cast
traffic = traffic.withColumn(TRAFFIC_YEAR_COL, F.trim(F.col(TRAFFIC_YEAR_COL)).cast("int"))
 
# ROAD_TRAFFIC safe zu double
traffic = cast_de_number_safe(traffic, TRAFFIC_VALUE_COL)

# Verkehr ist laut dir ohnehin Wien -> Region-Spalte setzen
traffic = traffic.withColumn("Region", F.lit(TARGET_REGION))

traffic.select(TRAFFIC_YEAR_COL, "Region", TRAFFIC_VALUE_COL).show(10, truncate=False)

+----+------+------------+
|YEAR|Region|ROAD_TRAFFIC|
+----+------+------------+
|1990|Wien  |2162.23     |
|1991|Wien  |2396.25     |
|1992|Wien  |2384.47     |
|1993|Wien  |2396.73     |
|1994|Wien  |2395.62     |
|1995|Wien  |2422.97     |
|1996|Wien  |2646.42     |
|1997|Wien  |2495.57     |
|1998|Wien  |2789.61     |
|1999|Wien  |2689.96     |
+----+------+------------+
only showing top 10 rows


In [45]:
air = air_raw

air = air.withColumn(AIR_YEAR_COL, F.col(AIR_YEAR_COL).cast("int"))
air = air.filter(F.trim(F.col(AIR_REGION_COL)) == TARGET_REGION)

air = cast_de_number_safe(air, AIR_VALUE_COL)

air.select(AIR_REGION_COL, AIR_YEAR_COL, AIR_POLLUTANT_COL, AIR_VALUE_COL).show(10, truncate=False)

+------+----+----------+--------+
|Region|Jahr|Schadstoff|Werte   |
+------+----+----------+--------+
|Wien  |1990|NOX       |28240.89|
|Wien  |1991|NOX       |28419.92|
|Wien  |1992|NOX       |27484.44|
|Wien  |1993|NOX       |24922.14|
|Wien  |1994|NOX       |23045.81|
|Wien  |1995|NOX       |22544.01|
|Wien  |1996|NOX       |24875.68|
|Wien  |1997|NOX       |22953.32|
|Wien  |1998|NOX       |24639.01|
|Wien  |1999|NOX       |23612.15|
+------+----+----------+--------+
only showing top 10 rows


In [46]:
# Verkehr pro Jahr (falls mehrere Zeilen pro Jahr vorhanden sind: avg)
traffic_year = (traffic
    .groupBy(TRAFFIC_YEAR_COL, "Region")
    .agg(F.avg(F.col(TRAFFIC_VALUE_COL)).alias("traffic_road_traffic_avg"))
)

traffic_year.orderBy(TRAFFIC_YEAR_COL).show(30, truncate=False)

+----+------+------------------------+
|YEAR|Region|traffic_road_traffic_avg|
+----+------+------------------------+
|1990|Wien  |2162.23                 |
|1991|Wien  |2396.25                 |
|1992|Wien  |2384.47                 |
|1993|Wien  |2396.73                 |
|1994|Wien  |2395.62                 |
|1995|Wien  |2422.97                 |
|1996|Wien  |2646.42                 |
|1997|Wien  |2495.57                 |
|1998|Wien  |2789.61                 |
|1999|Wien  |2689.96                 |
|2000|Wien  |2814.25                 |
|2001|Wien  |3016.53                 |
|2002|Wien  |3345.62                 |
|2003|Wien  |3607.81                 |
|2004|Wien  |3688.71                 |
|2005|Wien  |3740.25                 |
|2006|Wien  |3518.52                 |
|2007|Wien  |3515.83                 |
|2008|Wien  |3283.44                 |
|2009|Wien  |3159.47                 |
|2010|Wien  |3232.48                 |
|2011|Wien  |3108.74                 |
|2012|Wien  |3062.89     

In [47]:
# Schadstoffe pro Jahr & Schadstoff (avg)
air_year = (air
    .groupBy(AIR_YEAR_COL, AIR_POLLUTANT_COL)
    .agg(F.avg(F.col(AIR_VALUE_COL)).alias("poll_value_avg"))
)

air_year.orderBy(AIR_YEAR_COL, AIR_POLLUTANT_COL).show(30, truncate=False)

+----+----------+------------------+
|Jahr|Schadstoff|poll_value_avg    |
+----+----------+------------------+
|1990|NH3       |117.24            |
|1990|NMVOC     |10016.954285714284|
|1990|NOX       |8068.825714285716 |
|1990|PM2.5     |585.3585714285715 |
|1990|SO2       |2534.442857142857 |
|1991|NH3       |141.3457142857143 |
|1991|NMVOC     |9911.905714285715 |
|1991|NOX       |8119.975714285714 |
|1991|PM2.5     |NULL              |
|1991|SO2       |2687.5971428571424|
|1992|NH3       |149.8957142857143 |
|1992|NMVOC     |9045.205714285714 |
|1992|NOX       |7852.697142857142 |
|1992|PM2.5     |NULL              |
|1992|SO2       |1366.53           |
|1993|NH3       |161.6357142857143 |
|1993|NMVOC     |8280.477142857144 |
|1993|NOX       |7120.611428571427 |
|1993|PM2.5     |NULL              |
|1993|SO2       |1278.925714285714 |
|1994|NH3       |179.02285714285716|
|1994|NMVOC     |7619.9299999999985|
|1994|NOX       |6584.518571428571 |
|1994|PM2.5     |NULL              |
|

In [48]:
air_year_pivot = (air_year
    .groupBy(AIR_YEAR_COL)
    .pivot(AIR_POLLUTANT_COL)
    .agg(F.first("poll_value_avg"))
)

air_year_pivot.orderBy(AIR_YEAR_COL).show(30, truncate=False)
print("Pivot columns:", air_year_pivot.columns)

+----+------------------+------------------+------------------+------------------+------------------+
|Jahr|NH3               |NMVOC             |NOX               |PM2.5             |SO2               |
+----+------------------+------------------+------------------+------------------+------------------+
|1990|117.24            |10016.954285714284|8068.825714285716 |585.3585714285715 |2534.442857142857 |
|1991|141.3457142857143 |9911.905714285715 |8119.975714285714 |NULL              |2687.5971428571424|
|1992|149.8957142857143 |9045.205714285714 |7852.697142857142 |NULL              |1366.53           |
|1993|161.6357142857143 |8280.477142857144 |7120.611428571427 |NULL              |1278.925714285714 |
|1994|179.02285714285716|7619.9299999999985|6584.518571428571 |NULL              |1091.5914285714287|
|1995|187.27428571428572|7000.300000000001 |6441.145714285714 |577.6457142857141 |1064.7757142857142|
|1996|190.42428571428576|6534.689999999998 |7107.337142857144 |NULL              |

In [49]:
joined = (air_year_pivot
    .join(
        traffic_year.withColumnRenamed(TRAFFIC_YEAR_COL, "YEAR_join"),
        air_year_pivot[AIR_YEAR_COL] == F.col("YEAR_join"),
        how="left"
    )
    .drop("YEAR_join")
)

joined.orderBy(AIR_YEAR_COL).show(50, truncate=False)

+----+------------------+------------------+------------------+------------------+------------------+------+------------------------+
|Jahr|NH3               |NMVOC             |NOX               |PM2.5             |SO2               |Region|traffic_road_traffic_avg|
+----+------------------+------------------+------------------+------------------+------------------+------+------------------------+
|1990|117.24            |10016.954285714284|8068.825714285716 |585.3585714285715 |2534.442857142857 |Wien  |2162.23                 |
|1991|141.3457142857143 |9911.905714285715 |8119.975714285714 |NULL              |2687.5971428571424|Wien  |2396.25                 |
|1992|149.8957142857143 |9045.205714285714 |7852.697142857142 |NULL              |1366.53           |Wien  |2384.47                 |
|1993|161.6357142857143 |8280.477142857144 |7120.611428571427 |NULL              |1278.925714285714 |Wien  |2396.73                 |
|1994|179.02285714285716|7619.9299999999985|6584.518571428571 

In [50]:
# Jahre in Schadstoffen (Wien) ohne Verkehrseintrag
missing_traffic_years = (joined
    .filter(F.col("traffic_road_traffic_avg").isNull())
    .select(AIR_YEAR_COL)
    .orderBy(AIR_YEAR_COL))

missing_traffic_years.show(200, truncate=False)

+----+
|Jahr|
+----+
|2023|
|2024|
|2025|
|2026|
|2027|
|2028|
|2029|
|2030|
+----+



In [51]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [69]:
# ---------------------------
# HIER ANPASSEN
# ---------------------------

# Beispiel: setze hier einen existierenden Schadstoff-Spaltennamen aus dem Pivot:
# LABEL_COL = # oder "SO2", "NMVOC", "NH3", "PM2_5"
LABEL_COL = "NOX"

# Features: fürs Erste nur der Traffic (du kannst später weitere Features ergänzen)
FEATURE_COLS = ["traffic_road_traffic_avg", AIR_YEAR_COL]

# Option 1: fehlende Traffic-Werte auf 0 setzen (nur fürs schnelle Testen!)
IMPUTE_MISSING_TRAFFIC_WITH_ZERO = True

In [70]:
if IMPUTE_MISSING_TRAFFIC_WITH_ZERO:
    model_df = joined.fillna({"traffic_road_traffic_avg": 0.0})
else:
    model_df = joined

# Wenn du schon joined hast:
if "PM2.5" in joined.columns:
    joined = joined.withColumnRenamed("PM2.5", "PM2_5")

print("Spalten im joined:", model_df.columns)

Spalten im joined: ['Jahr', 'NH3', 'NMVOC', 'NOX', 'PM2_5', 'SO2', 'Region', 'traffic_road_traffic_avg']


In [71]:
if LABEL_COL is None:
    print("Bitte LABEL_COL setzen. Verfügbare Schadstoff-Spalten (Pivot):")
    # Pivot-Spalten sind alle außer Jahr + traffic feature
    pivot_cols = [c for c in model_df.columns if c not in [AIR_YEAR_COL, "traffic_road_traffic_avg", "Region"]]
    print(pivot_cols)
else:
    cols = [AIR_YEAR_COL, LABEL_COL] + FEATURE_COLS
    data = model_df.select(*cols).dropna(subset=[LABEL_COL] + FEATURE_COLS)

    train, test = data.randomSplit([0.8, 0.2], seed=42)

    assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    model = RandomForestRegressor(featuresCol="features", labelCol=LABEL_COL, numTrees=200, maxDepth=8)

    pipeline = Pipeline(stages=[assembler, model])
    fitted = pipeline.fit(train)

    preds = fitted.transform(test)

    rmse = RegressionEvaluator(labelCol=LABEL_COL, predictionCol="prediction", metricName="rmse").evaluate(preds)
    r2 = RegressionEvaluator(labelCol=LABEL_COL, predictionCol="prediction", metricName="r2").evaluate(preds)

    print("RMSE:", rmse)
    print("R2:", r2)

    preds.select(AIR_YEAR_COL, LABEL_COL, "prediction").orderBy(AIR_YEAR_COL).show(200, truncate=False)

AnalysisException: [AMBIGUOUS_COLUMN_OR_FIELD] Column or field `Jahr` is ambiguous and has 2 matches. SQLSTATE: 42702

In [73]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

LABEL_COL = "NOX"
FEATURE_COLS = ["traffic_road_traffic_avg", "year_feature"]

model_df = joined.withColumn("year_feature", F.col("Jahr").cast("double"))

cols = ["Jahr", LABEL_COL] + FEATURE_COLS
data = model_df.select(*cols).dropna(subset=[LABEL_COL] + FEATURE_COLS)

train, test = data.randomSplit([0.8, 0.2], seed=42)

assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
model = RandomForestRegressor(featuresCol="features", labelCol=LABEL_COL, numTrees=200, maxDepth=8)

pipeline = Pipeline(stages=[assembler, model])
fitted = pipeline.fit(train)

preds = fitted.transform(test)

rmse = RegressionEvaluator(labelCol=LABEL_COL, predictionCol="prediction", metricName="rmse").evaluate(preds)
r2 = RegressionEvaluator(labelCol=LABEL_COL, predictionCol="prediction", metricName="r2").evaluate(preds)

print("RMSE:", rmse)
print("R2:", r2)

preds.select("Jahr", LABEL_COL, "prediction").orderBy("Jahr").show(200, truncate=False)


RMSE: 400.8001065693109
R2: 0.9068242535709177
+----+------------------+------------------+
|Jahr|NOX               |prediction        |
+----+------------------+------------------+
|1992|7852.697142857142 |7506.0829071428425|
|1996|7107.337142857144 |6640.1187785714155|
|1998|7039.717142857143 |6656.407249999989 |
|2003|8161.374285714286 |8098.179778571419 |
|2009|6556.835714285714 |6526.15471190474  |
|2013|5777.665714285715 |5676.9191261904725|
|2019|3965.2000000000007|4755.1616642857   |
+----+------------------+------------------+



In [74]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

FEATURE_COLS = ["traffic_road_traffic_avg", "year_feature"]
pollutants = ["NOX", "SO2", "NMVOC", "NH3", "PM2_5"]

results = []

for label in pollutants:
    data = model_df.select("Jahr", label, *FEATURE_COLS).dropna(subset=[label] + FEATURE_COLS)
    
    if data.count() < 10:
        print(f"⚠️ {label}: zu wenig Daten")
        continue
    
    train, test = data.randomSplit([0.8, 0.2], seed=42)

    assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    model = RandomForestRegressor(
        featuresCol="features",
        labelCol=label,
        numTrees=200,
        maxDepth=8,
        seed=42
    )

    pipe = Pipeline(stages=[assembler, model])
    fitted = pipe.fit(train)
    preds = fitted.transform(test)

    rmse = RegressionEvaluator(labelCol=label, predictionCol="prediction", metricName="rmse").evaluate(preds)
    r2 = RegressionEvaluator(labelCol=label, predictionCol="prediction", metricName="r2").evaluate(preds)

    results.append((label, rmse, r2))
    print(f"{label}: RMSE={rmse:.2f}, R2={r2:.3f}")


NOX: RMSE=410.56, R2=0.902
SO2: RMSE=221.97, R2=0.746
NMVOC: RMSE=540.71, R2=0.947
NH3: RMSE=9.93, R2=0.924
PM2_5: RMSE=38.62, R2=0.899
