#Bus Delay Analysis for Milestone 2
Using AWS EMR, Spark, S3 & Zeppelin

This notebook loads 46K rows of real-time bus arrival data collected
from LTA DataMall API, performs big-data transformations, computes
delay patterns, ETA drift, and builds predictive models.

#dataset
Stored on S3 as:
`s3://transportbuddy-bucket/data/bus_arrivals.csv`

#Steps
1️ Load data from S3  
2️ Feature engineering (hour/minute extraction)  
3 ️ETA drift calculation (delay behaviour over time)  
4a Average ETA by Hour of Day(overall buses)
4b Average ETA Per Service Per Hour
5 Average ETA Per-service hourly pattern
6 ️Average ETA Drift by Service
7 Binary Classification Model predicting the likelihood of a bus service being significantly delayed
8Export all relevant files to create charts on frontend




In [1]:
%spark.pyspark
#STEP 1 - Load CSV from S3

from pyspark.sql.functions import col, to_timestamp

# Load dataset (5 days, 670k+ rows)
df_raw = spark.read.csv(
    "s3://transportbuddy-bucket/data/bus_arrivals.csv",
    header=True,
    inferSchema=True
)

df = df_raw \
    .withColumn("timestamp", to_timestamp("collected_at")) \
    .withColumn("bus_stop", col("bus_stop_code")) \
    .withColumn("service", col("service_no")) \
    .withColumn("eta_minutes", col("minutes_away")) \
    .withColumn("eta_seconds", col("minutes_away") * 60) \
    .select(
        "timestamp",
        "bus_stop",
        "service",
        "eta_minutes",
        "eta_seconds",
        "operator",
        "load",
        "bus_type",
        "latitude",
        "longitude"
    )

df.show(5)
df.printSchema()


In [2]:
%spark.pyspark
#STEP 2 - Feature Engineering (Hour, Minute, Day)

from pyspark.sql.functions import hour, minute, dayofweek

df2 = df \
    .withColumn("hour", hour("timestamp")) \
    .withColumn("minute", minute("timestamp")) \
    .withColumn("weekday", dayofweek("timestamp"))

df2.show(5)
df2.printSchema()


In [3]:
%spark.pyspark
# ========================== STEP 3 ==========================
# NOTES
# ETA Drift Calculation (in minutes)
# This shows how the ETA prediction changes over time.

# INTERPRETATION:
# 1)eta_change_minutes > 0 (ETA increased where bus is expected to arrive later than before)
#(bus is more delayed than previous reading)
#
# 2)eta_change_minutes < 0 ( ETA decreased where bus is expected to arrive earlier than before)
#(prediction is moving forward → bus catching up)
#
# 3)eta_change_minutes = 0  (ETA remained the same → prediction stable)

# 4)eta_change_minutes = NULL ( Occurs on the first record of each (bus_stop, service) group
# because there is no previous ETA to compare.)
# =================================================================

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, abs

windowSpec = Window.partitionBy("bus_stop", "service").orderBy("timestamp")

# Previous ETA in minutes
df3 = df2.withColumn(
    "prev_eta_minutes",
    lag("eta_minutes").over(windowSpec)
)

# ETA drift = current ETA - previous ETA 
df3 = df3.withColumn(
    "eta_change_minutes",
    col("eta_minutes") - col("prev_eta_minutes")
)

# Added for richer insights: drift volatility (speed of change)
df3 = df3.withColumn(
    "eta_volatility",
    abs(col("eta_change_minutes"))
)

z.show(
    df3.select(
        "timestamp",
        "bus_stop",
        "service",
        "eta_minutes",
        "prev_eta_minutes",
        "eta_change_minutes",
        "eta_volatility"
    ).limit(20)
)


In [4]:
%spark.pyspark
# ======================= STEP 4a ==============================
# Average ETA by Hour of Day(overall buses)
#
# Expectations:
# 7-9 AM: Higher average ETA (morning peak)
# 5-7 PM: Higher average ETA (evening peak)
# Off-peak hours: Lower and more stable ETA
# Timeframe dataset was collected 5pm - 9 pm
# ============================================================

from pyspark.sql.functions import avg, stddev

# Now show weekday + hour to see patterns across 5 full days
avg_by_hour = df2.groupBy("weekday", "hour") \
    .agg(
        avg("eta_minutes").alias("avg_eta"),
        stddev("eta_minutes").alias("eta_variability")
    ) \
    .orderBy("weekday", "hour")

z.show(avg_by_hour)


In [5]:
%spark.pyspark
# ======================= STEP 4B ==============================
# Per-Service Hourly ETA 

from pyspark.sql.functions import avg, stddev

avg_by_service_hour = df2.groupBy("service", "weekday", "hour") \
    .agg(
        avg("eta_minutes").alias("avg_eta"),
        stddev("eta_minutes").alias("eta_variability")
    ) \
    .orderBy("service", "weekday", "hour")

z.show(avg_by_service_hour.limit(50))


In [6]:
%spark.pyspark
# ======================= STEP 5 ==============================
# Median ETA + Service Stability Stats
# Median ETA by Bus Service
# Median gives the true typical ETA as average is distorted by outliers.
#
# To Identify which bus services have longer or shorter
# predicted arrival times on average.
# ==============================================================

from pyspark.sql.functions import percentile_approx, avg, stddev

median_by_service = df3.groupBy("service") \
    .agg(
        percentile_approx("eta_minutes", 0.5).alias("median_eta"),
        avg("eta_minutes").alias("avg_eta"),
        stddev("eta_minutes").alias("eta_variability"),
        avg("eta_volatility").alias("drift_volatility")
    ) \
    .orderBy("median_eta", ascending=False)

z.show(median_by_service)


In [7]:
%spark.pyspark
# -----------------------------------------------
# STEP 6 - ETA Drift Analysis (Per Bus Service)
#
# Identify which bus services have more unstable ETA behaviour.
# Positive drift - ETA keeps increasing (bus getting delayed)
# Negative drift - ETA decreasing (bus catching up)
# Near zero - stable predictions
#
# Analyse the average drift per service using eta_change_minutes
# created in Step 3.
# -----------------------------------------------

from pyspark.sql.functions import avg, stddev

# Use df3 because it contains:
# - prev_eta_minutes
# - eta_change_minutes
# - eta_volatility (added in Step 3)
drift_by_service = (
    df3.filter(df3.eta_change_minutes.isNotNull())   # remove first rows (NULL)
       .groupBy("service")
       .agg(
           avg("eta_change_minutes").alias("avg_eta_drift"),
           stddev("eta_change_minutes").alias("drift_variability"),
           avg("eta_volatility").alias("avg_volatility")
       )
       .orderBy("avg_eta_drift", ascending=False)
)

z.show(drift_by_service)


In [8]:
%spark.pyspark

# Step 7- FINAL ML MODEL

from pyspark.sql.functions import col, percentile_approx, when, isnan
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

print("Starting FINAL ML...")#To check

#Feature Engineering & Label Creation (Data Preparation)

# 1) Median ETA per service (used only for label creation)
median_df = df3.groupBy("service").agg(
    percentile_approx("eta_minutes", 0.5).alias("median_eta")
)
df_ml = df3.join(median_df, on="service", how="left")

# 2) Create label
# Label is 1 if current ETA is higher than the historical median ETA for that service
df_ml = df_ml.withColumn(
    "label",
    (col("eta_minutes") > col("median_eta")).cast("int")
)

# 3) AGGRESSIVE CLEANING - Fix for NaN, null, and inf
all_cols = [
    "eta_minutes", "prev_eta_minutes", "eta_change_minutes",
    "eta_volatility", "hour", "minute", "weekday", "bus_stop"
]

for c in all_cols:
    df_ml = df_ml.withColumn(
        c,
        when(col(c).isNull() | isnan(col(c)), 0.0)
        .otherwise(col(c))
        .cast("double")
    )

# Filter out null labels
df_ml = df_ml.filter(col("label").isNotNull())
df_ml = df_ml.filter(col("median_eta").isNotNull())

print(f"Records after cleaning: {df_ml.count()}")


# 4) Encode service as CONTINUOUS feature
service_indexer = StringIndexer(
    inputCol="service",
    outputCol="service_index",
    handleInvalid="keep"
)

df_indexed = service_indexer.fit(df_ml).transform(df_ml)
df_indexed = df_indexed.withColumn("service_index", col("service_index").cast("double"))


#Model Setup

# 5)Assemble features
feature_cols = [
    "prev_eta_minutes", 
    "hour", 
    "minute", 
    "weekday", 
    "bus_stop", 
    "service_index"
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip"
)


# 6)Random Forest Classifier
rf= RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxDepth=8,
    seed=42
)

# Pipeline
pipeline = Pipeline(stages=[assembler, rf])


# 7) Split Data 
train, test = df_indexed.randomSplit([0.70, 0.30], seed=42)
print(f"Train size: {train.count()}")
print(f"Test size: {test.count()}")


# 8)Fit model (using only the training data)
print("Fitting model...")
model = pipeline.fit(train)
print("Model fitted successfully!")


# 9)Predict
pred = model.transform(test)

# 10)Evaluate
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = evaluator.evaluate(pred)
print(f"\n AUC = {auc:.4f}")


# 11)Show predictions
pred.select(
    "service", "bus_stop", "eta_minutes", "prev_eta_minutes",
    "eta_change_minutes", "eta_volatility",
    "label", "prediction", "probability"
).show(5, truncate=False)

print("\nFINAL ML")

In [9]:
%spark.pyspark

# ============================================================
# Export all datasets needed for frontend (SINGLE FILE EACH)
# ============================================================
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

print("Starting exports...") #To check

# 1)Export avg_by_hour
avg_by_hour.coalesce(1).write.csv(
    "s3://transportbuddy-bucket/exports/avg_by_hour/",
    mode="overwrite",
    header=True
)

# 2)Export avg_by_service_hour
avg_by_service_hour.coalesce(1).write.csv(
    "s3://transportbuddy-bucket/exports/avg_by_service_hour/",
    mode="overwrite",
    header=True
)

# 3)Export median_by_service
median_by_service.coalesce(1).write.csv(
    "s3://transportbuddy-bucket/exports/median_by_service/",
    mode="overwrite",
    header=True
)

# 4)Export drift_by_service
drift_by_service.coalesce(1).write.csv(
    "s3://transportbuddy-bucket/exports/drift_by_service/",
    mode="overwrite",
    header=True
)

# 5)Export top 10 worst services
top10_worst = drift_by_service.orderBy("avg_eta_drift", ascending=False).limit(10)
top10_worst.coalesce(1).write.csv(
    "s3://transportbuddy-bucket/exports/top10_worst/",
    mode="overwrite",
    header=True
)

# 6)Export FULL ML predictions (vector -> array)
pred_fixed = pred.withColumn(
    "prob_array",
    vector_to_array("probability")
)

pred_export = pred_fixed.select(
    "timestamp",
    "service",
    "bus_stop",
    "eta_minutes",
    "prev_eta_minutes",
    "eta_change_minutes",
    "eta_volatility",
    "label",
    "prediction",
    col("prob_array")[1].alias("prob_delay_increase")  # probability ETA will grow
)

pred_export.coalesce(1).write.csv(
    "s3://transportbuddy-bucket/exports/full_predictions/",
    mode="overwrite",
    header=True
)


print("All 6 export files successfully written to S3 (single CSV each).")


In [10]:
%python
%spark.pyspark
