In [None]:
%%info

In [None]:
print(f'Start Spark name:{spark._sc.appName}, version:{spark.version}')

In [None]:
username = spark.conf.get("spark.executorEnv.USERNAME", "default_value")
username

### Data Retrieval

In [None]:
import json
import math
import os
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, count, lag, col, to_timestamp, concat_ws, lit, lpad
from pyspark.sql.functions import col, to_date, to_timestamp, year, month, dayofmonth, dayofweek, expr, when, row_number, count
from pyspark.sql.types import BooleanType, IntegerType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# List files in the directory
files = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration()) \
    .listStatus(sc._jvm.org.apache.hadoop.fs.Path('/data/geo/json'))
for file_status in files:
    print(file_status.getPath())

## Istdaten Processing

### 1) Select Relevant Columns and Rename for Convenience

In [None]:
istdaten_df = spark.read.orc('/data/sbb/orc/istdaten/year=2023')

istdaten_df = istdaten_df.select([
      istdaten_df['betriebstag'].alias('date'),
      istdaten_df['fahrt_bezeichner'].alias('trip_id'),
      istdaten_df['betreiber_id'].alias('operator_id'),
      istdaten_df['bpuic'].alias('stop_id'),
      istdaten_df['produkt_id'].alias('transportation'),
      istdaten_df['faellt_aus_tf'].alias('cancelled'),
      istdaten_df['ankunftszeit'].alias('arrival_time'),
      istdaten_df['an_prognose'].alias('arrival_prognosis'),
      istdaten_df['an_prognose_status'].alias('arr_prog_status'),
      istdaten_df['abfahrtszeit'].alias('departure_time'),
      istdaten_df['ab_prognose'].alias('departure_prognosis'),
      istdaten_df['ab_prognose_status'].alias('dep_prog_status'),
])

stops_df = spark.read.csv('/data/sbb/csv/timetables/stops', header=True)\
                .select(['stop_id','stop_lat', 'stop_lon'])\
                .drop(*['year','month','day'])\
                .dropDuplicates(['stop_id'])\
                .withColumnRenamed("stop_id", "stops_stop_id")\
                .withColumn("stop_lat", col("stop_lat").cast(DoubleType()))\
                .withColumn("stop_lon", col("stop_lon").cast(DoubleType()))

istdaten_df = istdaten_df.join(
    stops_df,
    istdaten_df.stop_id == stops_df.stops_stop_id,
    "left"
).drop('stops_stop_id')

istdaten_df.show(5)
istdaten_df.printSchema()
istdaten_df.count()

### 2) Filtering and Parsing

In [None]:
distinct_arr_prog_status = istdaten_df.select("arr_prog_status").distinct().collect()
distinct_arr_prog_status_list = [row.arr_prog_status for row in distinct_arr_prog_status]
print(distinct_arr_prog_status_list)

In [None]:
format_date = "dd.MM.yyyy"
format_timetable = "dd.MM.yyyy HH:mm"
format_prognosis = "dd.MM.yyyy HH:mm:ss"

istdaten_df = istdaten_df.filter(istdaten_df.arr_prog_status=='REAL')\
                         .withColumn("date", to_timestamp(col("date"), format_date))\
                         .withColumn("arrival_time", to_timestamp(col("arrival_time"), format_timetable))\
                         .withColumn("arrival_prognosis", to_timestamp(col("arrival_prognosis"), format_prognosis))\
                         .withColumn("departure_time", to_timestamp(col("departure_time"), format_timetable))\
                         .withColumn("departure_prognosis", to_timestamp(col("departure_prognosis"), format_prognosis))

istdaten_df = istdaten_df.filter((col("arr_prog_status") != "") & (col("arr_prog_status") != "UNBEKANNT") & (col('cancelled') == 'false'))

istdaten_df.show(5)
istdaten_df.printSchema()
istdaten_df.count()


In [None]:
hdfs_path = f"/user/{username}/stop_matching.csv"
stop_in_region = spark.read.csv(hdfs_path, header="True")
stop_in_region.show(5)

In [None]:
stop_ids_to_filter = stop_in_region.select("isdaten_stop_id").distinct().rdd.flatMap(lambda x: x).collect()

istdaten_df = istdaten_df.filter(istdaten_df.stop_id.isin(stop_ids_to_filter))

istdaten_df.printSchema()
istdaten_df.count()

### 3) Istdaten Feature Engineering

In [None]:
istdaten_df = istdaten_df.withColumn("delay_arrival", (col("arrival_prognosis").cast("long") - col("arrival_time").cast("long")))\
                         .withColumn("delay_departure", (col("departure_prognosis").cast("long") - col("departure_time").cast("long")))\
                         .withColumn("delay_at_station", (col("delay_departure") - col("delay_arrival")))\
                         .withColumn("year", year(col("date")))\
                         .withColumn("month", month(col("date")))\
                         .withColumn("day", dayofmonth(col("date")))\
                         .withColumn("day_of_week", (dayofweek(col("date")) + 5) % 7)\
                         .withColumn("is_weekday", (col("day_of_week") <= 4))\
                         .withColumn("is_weekend", (col("day_of_week") > 4))\
                         .withColumn("is_peak_time",
                                     when((col("is_weekday")) & 
                                          ((col("arrival_time").between(expr("make_timestamp(year(date), month(date), day(date), 6, 30, 0)"), 
                                                                        expr("make_timestamp(year(date), month(date), day(date), 8, 30, 0)"))) |
                                           (col("arrival_time").between(expr("make_timestamp(year(date), month(date), day(date), 16, 30, 0)"), 
                                                                        expr("make_timestamp(year(date), month(date), day(date), 18, 30, 0)")))), True)
                                     .otherwise(False))
istdaten_df.show(5)
istdaten_df.printSchema()
istdaten_df.count()

### 4) Additional Features

In [None]:
# Haversine formula to calculate the distance between two points on the earth
def haversine(lat1, lon1, lat2, lon2):
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    R = 6371  # Radius of the Earth in kilometers
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) * math.sin(dlon / 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    distance = R * c  # Distance in kilometers
    return distance

# Register the UDF
haversine_udf = udf(haversine, DoubleType())

In [None]:
# Define the window spec for calculating trip_stop_seq with year, month, and day
trip_stop_seq_window = Window.partitionBy("trip_id", "year", "month", "day").orderBy("arrival_time")
istdaten_df = istdaten_df.withColumn("trip_stop_seq", row_number().over(trip_stop_seq_window))

# Define the window spec for calculating trip_num_stops with year, month, and day
trip_num_stops_window = Window.partitionBy("trip_id", "year", "month", "day")
istdaten_df = istdaten_df.withColumn("trip_num_stops", count("stop_id").over(trip_num_stops_window))

# Add columns for the previous stop_id and departure_time
window_spec_seq = Window.partitionBy("trip_id", "year", "month", "day").orderBy("trip_stop_seq")
istdaten_df = istdaten_df.withColumn("prev_stop_id", lag("stop_id", 1).over(window_spec_seq))\
                         .withColumn("prev_departure_time", lag("departure_time", 1).over(window_spec_seq))\
                         .withColumn("prev_departure_prognosis", lag("departure_prognosis", 1).over(window_spec_seq))\
                         .withColumn("prev_stop_lat", lag("stop_lat", 1).over(window_spec_seq))\
                         .withColumn("prev_stop_lon", lag("stop_lon", 1).over(window_spec_seq))\
                         .withColumn("scheduled_traveling_time_min", ((col("arrival_time").cast("long") - col("prev_departure_time").cast("long"))) / 60)\
                         .withColumn("actual_traveling_time_min", ((col("arrival_prognosis").cast("long") - col("prev_departure_prognosis").cast("long"))) / 60)\
                         .withColumn("traveling_distance_km", haversine_udf(col("prev_stop_lat"), col("prev_stop_lon"),col("stop_lat"), col("stop_lon")))

# Define a window spec ordered by trip identifier
windowSpec = Window.partitionBy("trip_id").orderBy("departure_time") 

# Create columns for previous stop departure time and prognose
istdaten_df = istdaten_df.withColumn("prev_delay", lag("delay_arrival", 1).over(windowSpec))

istdaten_df.show(5)
istdaten_df.printSchema()
istdaten_df.count()

### Weather Data [Optional]

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Calculate the distance between each stop and each weather station
wstations_df = spark.read.csv('/data/wunderground/csv/stations', header=True)
distance_df = stops_df.crossJoin(wstations_df).withColumn(
    "distance",
    haversine_udf(col("stop_lat"),col("stop_lon"), col("lat_wgs84").cast(DoubleType()), col("lon_wgs84").cast(DoubleType()))
)

# Using window function to find the nearest station
windowSpec = Window.partitionBy("stops_stop_id").orderBy("distance")
nearest_stations = distance_df.withColumn("rank", row_number().over(windowSpec)) \
                              .filter(col("rank") == 1) \
                              .drop("rank")\
                              .select(['stops_stop_id', 'site', 'lat_wgs84', 'lon_wgs84', 'distance'])

# Show the results
nearest_stations.show(5)
nearest_stations.printSchema()
nearest_stations.count()

In [None]:
from pyspark.sql.functions import unix_timestamp, to_timestamp, round, from_unixtime, hour
from pyspark.sql.functions import broadcast

weather_data_df = spark.read.orc('/data/share/weather_win3h_avg_df.orc')

# Merge weather data with station coordinates
weather_stations_df = weather_data_df.join(
    nearest_stations,
    weather_data_df.site == nearest_stations.site,
    "inner"
)

# Select necessary columns including coordinates in WGS84 which will help in geographical matching
weather_stations_df = weather_stations_df.select(
    "valid_time_gmt", "temp", "avg_temp", "lat_wgs84", "lon_wgs84", "stops_stop_id"
).withColumn("weather_hour", (col("valid_time_gmt") / 3600).cast("integer") * 3600)

istdaten_df = istdaten_df.withColumn("arrival_unix", unix_timestamp(to_timestamp(col("arrival_time"))))\
                         .withColumn("arrival_unix_hour", (col("arrival_unix") / 3600).cast("integer") * 3600)

istdaten_df = istdaten_df.join(
    weather_stations_df,
    (istdaten_df.stop_id == weather_stations_df.stops_stop_id) &
    (istdaten_df.arrival_unix_hour == weather_stations_df.weather_hour),
    "left_outer"  # left_outer to keep transport data regardless of match
)

istdaten_df.show(5)
istdaten_df.printSchema()
istdaten_df.count()

### Feature Selection

In [None]:
from pyspark.sql.functions import hour

cols_to_keep = ['year' ,'month' ,'day' ,'day_of_week' ,'is_weekday' ,'is_weekend','is_peak_time', 
 'trip_id', 'stop_id','prev_stop_id','transportation', 'arrival_time', 'stop_lat','stop_lon', 'prev_stop_lat', 'prev_stop_lon','delay_arrival', 'delay_at_station',
'trip_stop_seq','trip_num_stops','scheduled_traveling_time_min','actual_traveling_time_min','traveling_distance_km','prev_delay','temp','avg_temp','lat_wgs84','lon_wgs84']

features = istdaten_df.select(cols_to_keep)\
                      .withColumn("arrival_hour", hour(col("arrival_time")))\

features.show(5)
features.printSchema()
features.count()

In [None]:
from pyspark.sql import functions as F

# Compute mean and std deviation for relevant columns per stop_id
statistics_df = features.groupBy('stop_id').agg(
    F.mean('delay_arrival').alias('station_mean_delay_arrival'),
    F.stddev('delay_arrival').alias('station_stddev_delay_arrival'),
    F.mean('scheduled_traveling_time_min').alias('station_mean_scheduled_traveling_time_min'),
    F.stddev('scheduled_traveling_time_min').alias('station_stddev_scheduled_traveling_time_min'),
    F.mean('actual_traveling_time_min').alias('station_mean_actual_traveling_time_min'),
    F.stddev('actual_traveling_time_min').alias('station_stddev_actual_traveling_time_min'),
    F.mean('traveling_distance_km').alias('station_mean_traveling_distance_km'),
    F.stddev('traveling_distance_km').alias('station_stddev_traveling_distance_km')
).withColumn(
    'mean_traveling_time_delta', 
    F.col('station_mean_actual_traveling_time_min') - F.col('station_mean_scheduled_traveling_time_min')
)

# Join the statistics for current stop
features_with_stats = features.join(statistics_df, features.stop_id == statistics_df.stop_id, how='left').drop(statistics_df.stop_id)

# Prepare statistics for previous stop with renamed columns
prev_statistics_df = statistics_df.select(*[F.col(x).alias('prev_' + x) for x in statistics_df.columns])

# Join the statistics for previous stop
features_with_stats = features_with_stats.join(prev_statistics_df, features_with_stats.prev_stop_id == prev_statistics_df.prev_stop_id, how='left').drop(prev_statistics_df.prev_stop_id)

# Show the result
features_with_stats.printSchema()
features_with_stats.count()


In [None]:
from pyspark.sql import functions as F

# Step 1: Aggregate on [stop_id, prev_stop_id]
edge_stats_df = features.groupBy('stop_id', 'prev_stop_id').agg(
    F.mean('scheduled_traveling_time_min').alias('edge_mean_scheduled_traveling_time_min'),
    F.stddev('scheduled_traveling_time_min').alias('edge_stddev_scheduled_traveling_time_min'),
    F.mean('actual_traveling_time_min').alias('edge_mean_actual_traveling_time_min'),
    F.stddev('actual_traveling_time_min').alias('edge_stddev_actual_traveling_time_min')
)

# Step 2: Join the computed statistics back to the original dataframe
features_with_edge_stats = features_with_stats.join(
    edge_stats_df, 
    (features_with_stats.stop_id == edge_stats_df.stop_id) & (features_with_stats.prev_stop_id == edge_stats_df.prev_stop_id), 
    how='left'
).drop(edge_stats_df.stop_id).drop(edge_stats_df.prev_stop_id)

# Optional: Show the results and schema to verify correctness
features_with_edge_stats.show(5)
features_with_stats.count()

### Null value processing

In [None]:
# Handle NaNs for categorical and numerical columns
categorical_cols = ['transportation']
features_with_edge_stats = features_with_edge_stats.na.fill("unknown", subset=categorical_cols)
features_with_edge_stats = features_with_edge_stats.na.replace("", "unknown", subset=categorical_cols)

boolean_cols = ["is_weekday", "is_peak_time"]
for col_name in boolean_cols:
    features_with_edge_stats = features_with_edge_stats.withColumn(col_name, when(col(col_name), 1).otherwise(0))

numerical_cols = ['delay_arrival','arrival_hour','day','month','year','day_of_week','stop_lat','stop_lon','prev_stop_lat','prev_stop_lon','trip_stop_seq','trip_num_stops',
 'scheduled_traveling_time_min','traveling_distance_km','edge_mean_scheduled_traveling_time_min','edge_stddev_scheduled_traveling_time_min','edge_mean_actual_traveling_time_min',
 'edge_stddev_actual_traveling_time_min','station_mean_delay_arrival','station_stddev_delay_arrival','prev_station_mean_delay_arrival','prev_station_stddev_delay_arrival',
 'actual_traveling_time_min','traveling_distance_km','delay_arrival','temp','avg_temp']

features_with_edge_stats = features_with_edge_stats.na.fill(0, subset=numerical_cols)
features_with_edge_stats = features_with_edge_stats.na.fill(0, subset=['delay_arrival'])

In [None]:
# save the data
hdfs_path = f"/user/{username}/features_with_edge_stats.parquet"
features_with_edge_stats.write.mode("overwrite").parquet(hdfs_path)

print(f"DataFrame saved to {hdfs_path}")

### Fit Training Data

In [None]:
# Setup the stages in the pipeline
stages = []
for column in categorical_cols:
    string_indexer = StringIndexer(inputCol=column, outputCol=column + "_idx", handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[column + "classVec"])
    stages.append(string_indexer)
    stages.append(encoder)

# Append the VectorAssembler and the corrected model to the pipeline stages
all_features = [col + "classVec" for col in categorical_cols] + numerical_cols
vectorAssembler = VectorAssembler(inputCols=all_features, outputCol="assembled_features", handleInvalid="skip")
stages.append(vectorAssembler)

# Apply StandardScaler to scale the features 
scaler = StandardScaler(inputCol="assembled_features", outputCol="features") 
stages.append(scaler)

# Initialise a Random forest-model for binary classification
model = RandomForestRegressor(featuresCol='features', labelCol='delay_arrival', numTrees=100) 
stages.append(model)

# Initialise pipeline
pipeline = Pipeline(stages=stages)

# Proceed to train the model
trainData, testData = features_with_edge_stats.randomSplit([0.8, 0.2], seed=42)
pipelineModel = pipeline.fit(trainData)

### Predict and Evaluate Model

In [None]:
# Make predictions on the test data
predictions = pipelineModel.transform(testData)

# Initialize the evaluator
evaluator_rmse = RegressionEvaluator(labelCol="delay_arrival", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="delay_arrival", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="delay_arrival", predictionCol="prediction", metricName="r2")

# Compute the metrics
rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

# Print the evaluation metrics
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared (R²): {r2}")

In [None]:
predictions.select("delay_arrival", "prediction").show(15)

In [None]:
# Save the model
hdfs_path = f"/user/{username}/models"
pipelineModel.write().overwrite().save(hdfs_path)
print(f"Pipeline model saved to {hdfs_path}")

In [None]:
spark.stop()