In [0]:
import pyspark.sql.functions as F
import pyspark.sql.window as W
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import matplotlib.pyplot as plt
import sys

# Load the flight table from the otpw_12m parquet file under folder_path
# (spark and folder_path are expected to exist in the notebook session).
df = spark.read.parquet(f"{folder_path}/otpw_12m.parquet")

# binary target label we are predicting: 1 when DEP_DELAY (as double) is
# above 15, else 0. A null DEP_DELAY makes the comparison null -> label 0.
departed_late = col("DEP_DELAY").cast("double") > 15
df = df.withColumn("label", when(departed_late, 1).otherwise(0))

# average delay rates: mean of `label` per carrier / origin / destination,
# computed over every row currently in df
mean_label = F.mean("label")
carrier_delay = df.groupBy("OP_UNIQUE_CARRIER").agg(mean_label.alias("CARRIER_DELAY_RATE"))
origin_delay = df.groupBy("ORIGIN").agg(mean_label.alias("ORIGIN_DELAY_RATE"))
dest_delay = df.groupBy("DEST").agg(mean_label.alias("DEST_DELAY_RATE"))

# attach each rate back onto the flights through its grouping key
df = (
    df.join(carrier_delay, on="OP_UNIQUE_CARRIER", how="left")
    .join(origin_delay, on="ORIGIN", how="left")
    .join(dest_delay, on="DEST", how="left")
)

# time features: DEP_HOUR is the integer part of CRS_DEP_TIME / 100; the
# weekend / season flags match DAY_OF_WEEK and MONTH against string literals
month_col = col("MONTH")
df = (
    df.withColumn("DEP_HOUR", (col("CRS_DEP_TIME").cast("int") / 100).cast("int"))
    .withColumn("IS_WEEKEND", when(col("DAY_OF_WEEK").isin("6", "7"), 1).otherwise(0))
    .withColumn("IS_WINTER", when(month_col.isin("12", "1", "2"), 1).otherwise(0))
    .withColumn("IS_SPRING", when(month_col.isin("3", "4", "5"), 1).otherwise(0))
    .withColumn("IS_SUMMER", when(month_col.isin("6", "7", "8"), 1).otherwise(0))
    .withColumn("IS_FALL", when(month_col.isin("9", "10", "11"), 1).otherwise(0))
)
# Dates (yyyy-MM-dd strings) that are matched against FL_DATE to derive IS_HOLIDAY.
# Every entry carries its own trailing comma: Python silently concatenates
# adjacent string literals, so a missing comma would merge two dates into a
# single value that never matches any flight date.
holiday_dates = [
    "2015-01-01",  # New Year's Day
    "2015-01-19",  # Martin Luther King Jr. Day
    "2015-02-14",  # Valentine's Day
    "2015-02-16",  # Presidents Day
    "2015-05-25",  # Memorial Day
    "2015-07-04",  # Independence Day
    "2015-09-07",  # Labor Day
    "2015-11-26",  # Thanksgiving
    "2015-12-25",  # Christmas
    "2015-12-31",  # New Year's Eve
]
# Flag flights whose FL_DATE is one of the listed holiday dates.
df = df.withColumn(
    "IS_HOLIDAY",
    when(col("FL_DATE").isin(holiday_dates), 1).otherwise(0)
)


def _hhmm_to_minutes(hhmm):
    """Convert an integer HHMM clock-time Column to minutes since midnight."""
    return (hhmm / 100).cast("int") * 60 + hhmm % 100


# Scheduled departure as an integer. Ordering on the cast value keeps the
# windows in numeric order even if CRS_DEP_TIME is stored as a string
# (where "1000" would otherwise sort before "900").
sched_dep_hhmm = col("CRS_DEP_TIME").cast("int")

# window definitions
# - w_tailnum: one aircraft's flights in scheduled order (for lag features)
# - w_airport_history / w_dest_history: every strictly earlier row of the same
#   origin / destination (unbounded start, frame ends one row before current)
w_tailnum = W.Window.partitionBy("TAIL_NUM").orderBy("FL_DATE", sched_dep_hhmm)
w_airport_history = (
    W.Window.partitionBy("ORIGIN")
    .orderBy("FL_DATE", sched_dep_hhmm)
    .rowsBetween(W.Window.unboundedPreceding, -1)
)
w_dest_history = (
    W.Window.partitionBy("DEST")
    .orderBy("FL_DATE", sched_dep_hhmm)
    .rowsBetween(W.Window.unboundedPreceding, -1)
)

# Values carried over from the same tail number's previous flight
# (null for the first flight of each tail number).
df = df.withColumn("Prev_TaxiIn", F.lag("TAXI_IN").over(w_tailnum))
df = df.withColumn("Prev_TaxiOut", F.lag("TAXI_OUT").over(w_tailnum))
df = df.withColumn("Prev_ArrDelay", F.lag("ARR_DELAY").over(w_tailnum))
df = df.withColumn("Prev_ArrTime", F.lag("ARR_TIME").over(w_tailnum))

# Minutes between the previous arrival and this scheduled departure. Both
# clock times are HHMM, so they are converted to minutes before subtracting
# (raw HHMM subtraction gives 1005 - 955 = 50 instead of 10 minutes).
# Only clock times are compared, so the value is negative when the previous
# arrival clock time is later than this departure's.
df = df.withColumn(
    "Turnaround_Time",
    _hhmm_to_minutes(sched_dep_hhmm) - _hhmm_to_minutes(col("Prev_ArrTime").cast("int"))
)

# convert times to timestamp for rolling window logic
df = df.withColumn("DEP_DATETIME", F.concat_ws(" ", col("FL_DATE"), F.format_string("%04d", col("CRS_DEP_TIME").cast("int"))))
df = df.withColumn("DEP_TIMESTAMP", F.unix_timestamp("DEP_DATETIME", "yyyy-MM-dd HHmm"))

# delayed or cancelled flights at airport within prior 2 hours
# (range is in seconds of DEP_TIMESTAMP; rows with the same timestamp as the
# current row are outside the frame because it ends at -1)
w_airport_2h = (
    W.Window.partitionBy("ORIGIN")
    .orderBy("DEP_TIMESTAMP")
    .rangeBetween(-7200, -1)  # last 2 hours
)
df = df.withColumn(
    "Num_airport_wide_delays",
    F.sum(F.when(col("DEP_DELAY").cast("double") > 15, 1).otherwise(0)).over(w_airport_2h)
)
df = df.withColumn(
    "Num_airport_wide_cancelations",
    F.sum(F.when(col("CANCELLED").cast("double") == 1, 1).otherwise(0)).over(w_airport_2h)
)

# flights arriving at origin 2 hours before scheduled departure
# NOTE(review): the window has no ordering or frame, and the condition compares
# each row's own ARR_TIME with its own CRS_DEP_TIME, so every row of an
# (ORIGIN, FL_DATE) partition receives the same partition-wide count.
# Confirm this is the intended definition.
df = df.withColumn(
    "Oncoming_flights",
    F.sum(
        F.when(
            (col("ARR_TIME").cast("int") >= (col("CRS_DEP_TIME").cast("int") - 200)) &
            (col("ARR_TIME").cast("int") <= col("CRS_DEP_TIME").cast("int")), 1
        ).otherwise(0)
    ).over(W.Window.partitionBy("ORIGIN", "FL_DATE"))
)

# rolling on time arrival and departure percentages over all earlier rows of
# the same destination / origin (null for the first row of each partition)
df = df.withColumn(
    "OntimeArrivalPct",
    F.avg(F.when(col("ARR_DELAY").cast("double") <= 0, 1).otherwise(0)).over(w_dest_history)
)
df = df.withColumn(
    "OntimeDeparturePct",
    F.avg(F.when(col("DEP_DELAY").cast("double") <= 0, 1).otherwise(0)).over(w_airport_history)
)

# Raw columns used as model inputs, grouped by theme and concatenated in order.
calendar_features = ["YEAR", "QUARTER", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK"]
schedule_features = ["CRS_DEP_TIME", "CRS_ARR_TIME", "DISTANCE"]
location_features = ["ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID", "ORIGIN_WAC", "DEST_WAC"]
weather_features = [
    "HourlyPrecipitation",
    "HourlyVisibility",
    "HourlyWindSpeed",
    "HourlyWindGustSpeed",
    "HourlyWindDirection",
    "HourlyDryBulbTemperature",
    "HourlyDewPointTemperature",
    "HourlyWetBulbTemperature",
    "HourlyRelativeHumidity",
    "HourlySeaLevelPressure",
    "HourlyPresentWeatherType",
]
base_features = [
    *calendar_features,
    *schedule_features,
    *location_features,
    *weather_features,
]

# Columns derived earlier in this cell.
flag_features = [
    "DEP_HOUR", "IS_WEEKEND", "IS_WINTER", "IS_SPRING", "IS_SUMMER", "IS_FALL", "IS_HOLIDAY",
]
rate_features = ["CARRIER_DELAY_RATE", "ORIGIN_DELAY_RATE", "DEST_DELAY_RATE"]
previous_leg_features = ["Prev_TaxiIn", "Prev_TaxiOut", "Prev_ArrDelay", "Turnaround_Time"]
airport_load_features = [
    "Num_airport_wide_delays", "Num_airport_wide_cancelations", "Oncoming_flights",
]
ontime_features = ["OntimeArrivalPct", "OntimeDeparturePct"]
engineered_features = [
    *flag_features,
    *rate_features,
    *previous_leg_features,
    *airport_load_features,
    *ontime_features,
]

# Full candidate list: raw columns first, engineered columns after.
all_features = [*base_features, *engineered_features]

# Add a double-typed "<name>_dbl" copy of every candidate feature. Values that
# cannot be cast to double become null in the copy.
for c in all_features:
    df = df.withColumn(c + "_dbl", col(c).cast("double"))

# drop columns with too many nulls (counted over every column, raw and _dbl)
null_threshold = 1000000
null_counts = (
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
    .collect()[0]
    .asDict()
)
sparse_cols = [name for name, n_null in null_counts.items() if n_null > null_threshold]
df = df.drop(*sparse_cols)

# drop rows with nulls (in any remaining column, not only the model features)
df = df.dropna()

# Keep a feature only if its "_dbl" copy survived the pruning. The copy can be
# dropped on its own when the cast produced the nulls, so checking the raw
# name alone would leave a dropped column in the list and break the select.
final_cols = [
    c + "_dbl"
    for c in all_features
    if c not in sparse_cols and c + "_dbl" not in sparse_cols
]

# Sort before projecting: the sort keys are not among the selected columns.
df_model = (
    df.orderBy("FL_DATE", "CRS_DEP_TIME", "ORIGIN", "DEST")
    .select(["label"] + final_cols)
)
df_model = df_model.cache()
df_model.count()  # action that materialises the cache

# Assemble the selected double columns into one vector and fit a
# gradient-boosted tree classifier on all of df_model.
assembler = VectorAssembler(inputCols=final_cols, outputCol="features")
gbt_settings = {
    "labelCol": "label",
    "featuresCol": "features",
    "maxDepth": 5,
    "maxIter": 20,
    "stepSize": 0.1,
    "subsamplingRate": 1.0,
    "featureSubsetStrategy": "all",
    "seed": 31,
}
gbt = GBTClassifier(**gbt_settings)
pipeline = Pipeline(stages=[assembler, gbt])
model = pipeline.fit(df_model)

# The fitted classifier is the last pipeline stage; pair each importance with
# the assembler input column at the same position.
fitted_gbt = model.stages[-1]
importances = fitted_gbt.featureImportances.toArray()
feature_tuples = [(name, float(weight)) for name, weight in zip(final_cols, importances)]
schema = StructType([
    StructField("feature", StringType(), False),
    StructField("importance", DoubleType(), False),
])
feature_importance_df = (
    spark.createDataFrame(feature_tuples, schema)
    .orderBy(col("importance").desc())
)

display(feature_importance_df)

# Horizontal bar chart, most important feature at the top.
pandas_df = feature_importance_df.toPandas()
plt.figure(figsize=(12, 6))
plt.barh(pandas_df["feature"], pandas_df["importance"])
plt.title("GBTClassifier Feature Importances")
plt.xlabel("Importance")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()

In [0]:
# Logistic regression baseline model
import pyspark.sql.functions as F
import pyspark.sql.window as W
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, classification_report
import sys

# Load the flight table from the otpw_12m parquet file under folder_path.
df = spark.read.parquet(f"{folder_path}/otpw_12m.parquet")

# Target: 1 when DEP_DELAY (as double) is above 15, else 0 (null delay -> 0).
departed_late = col("DEP_DELAY").cast("double") > 15
df = df.withColumn("label", when(departed_late, 1).otherwise(0))

# Mean of `label` per carrier / origin / destination.
# NOTE(review): the means are taken over every row of df, including months
# 10-12 that this cell later holds out as the test set.
mean_label = F.mean("label")
carrier_delay = df.groupBy("OP_UNIQUE_CARRIER").agg(mean_label.alias("CARRIER_DELAY_RATE"))
origin_delay = df.groupBy("ORIGIN").agg(mean_label.alias("ORIGIN_DELAY_RATE"))
dest_delay = df.groupBy("DEST").agg(mean_label.alias("DEST_DELAY_RATE"))
df = (
    df.join(carrier_delay, on="OP_UNIQUE_CARRIER", how="left")
    .join(origin_delay, on="ORIGIN", how="left")
    .join(dest_delay, on="DEST", how="left")
)

# Hour of scheduled departure (integer part of CRS_DEP_TIME / 100) and weekend flag.
df = df.withColumn("DEP_HOUR", (col("CRS_DEP_TIME").cast("int") / 100).cast("int"))
df = df.withColumn("IS_WEEKEND", when(col("DAY_OF_WEEK").isin("6", "7"), 1).otherwise(0))

# One 0/1 flag per season; MONTH is matched against string literals.
season_months = {
    "IS_WINTER": ("12", "1", "2"),
    "IS_SPRING": ("3", "4", "5"),
    "IS_SUMMER": ("6", "7", "8"),
    "IS_FALL": ("9", "10", "11"),
}
for season_flag, season_values in season_months.items():
    df = df.withColumn(season_flag, when(col("MONTH").isin(*season_values), 1).otherwise(0))
# (date, holiday name) pairs; only the date strings are used downstream.
holiday_calendar = (
    ("2015-01-01", "New Year's Day"),
    ("2015-01-19", "Martin Luther King Jr. Day"),
    ("2015-02-14", "Valentine's Day"),
    ("2015-02-16", "Presidents Day"),
    ("2015-05-25", "Memorial Day"),
    ("2015-07-04", "Independence Day"),
    ("2015-09-07", "Labor Day"),
    ("2015-11-26", "Thanksgiving"),
    ("2015-12-25", "Christmas"),
    ("2015-12-31", "New Year's Eve"),
)
holiday_dates = [day for day, _holiday_name in holiday_calendar]
# Flag flights whose FL_DATE is one of the listed holiday dates.
df = df.withColumn(
    "IS_HOLIDAY",
    when(col("FL_DATE").isin(holiday_dates), 1).otherwise(0)
)


def _hhmm_to_minutes(hhmm):
    """Convert an integer HHMM clock-time Column to minutes since midnight."""
    return (hhmm / 100).cast("int") * 60 + hhmm % 100


# Scheduled departure as an integer. Ordering on the cast value keeps the
# windows in numeric order even if CRS_DEP_TIME is stored as a string
# (where "1000" would otherwise sort before "900").
sched_dep_hhmm = col("CRS_DEP_TIME").cast("int")

# w_tailnum: one aircraft's flights in scheduled order (for lag features).
# w_airport_history / w_dest_history: every strictly earlier row of the same
# origin / destination (unbounded start, frame ends one row before current).
w_tailnum = W.Window.partitionBy("TAIL_NUM").orderBy("FL_DATE", sched_dep_hhmm)
w_airport_history = (
    W.Window.partitionBy("ORIGIN")
    .orderBy("FL_DATE", sched_dep_hhmm)
    .rowsBetween(W.Window.unboundedPreceding, -1)
)
w_dest_history = (
    W.Window.partitionBy("DEST")
    .orderBy("FL_DATE", sched_dep_hhmm)
    .rowsBetween(W.Window.unboundedPreceding, -1)
)

# Values carried over from the same tail number's previous flight
# (null for the first flight of each tail number).
df = df.withColumn("Prev_TaxiIn", F.lag("TAXI_IN").over(w_tailnum))
df = df.withColumn("Prev_TaxiOut", F.lag("TAXI_OUT").over(w_tailnum))
df = df.withColumn("Prev_ArrDelay", F.lag("ARR_DELAY").over(w_tailnum))
df = df.withColumn("Prev_ArrTime", F.lag("ARR_TIME").over(w_tailnum))

# Minutes between the previous arrival and this scheduled departure. Both
# clock times are HHMM, so they are converted to minutes before subtracting
# (raw HHMM subtraction gives 1005 - 955 = 50 instead of 10 minutes).
# Only clock times are compared, so the value is negative when the previous
# arrival clock time is later than this departure's.
df = df.withColumn(
    "Turnaround_Time",
    _hhmm_to_minutes(sched_dep_hhmm) - _hhmm_to_minutes(col("Prev_ArrTime").cast("int"))
)

# Scheduled departure as epoch seconds, needed for the range-based window below.
df = df.withColumn("DEP_DATETIME", F.concat_ws(" ", col("FL_DATE"), F.format_string("%04d", col("CRS_DEP_TIME").cast("int"))))
df = df.withColumn("DEP_TIMESTAMP", F.unix_timestamp("DEP_DATETIME", "yyyy-MM-dd HHmm"))

# Same-origin rows scheduled in the 7200 seconds before the current row; rows
# sharing the current timestamp are excluded because the frame ends at -1.
w_airport_2h = (
    W.Window.partitionBy("ORIGIN")
    .orderBy("DEP_TIMESTAMP")
    .rangeBetween(-7200, -1)
)
df = df.withColumn(
    "Num_airport_wide_delays",
    F.sum(F.when(col("DEP_DELAY").cast("double") > 15, 1).otherwise(0)).over(w_airport_2h)
)
df = df.withColumn(
    "Num_airport_wide_cancelations",
    F.sum(F.when(col("CANCELLED").cast("double") == 1, 1).otherwise(0)).over(w_airport_2h)
)

# NOTE(review): the window has no ordering or frame, and the condition compares
# each row's own ARR_TIME with its own CRS_DEP_TIME, so every row of an
# (ORIGIN, FL_DATE) partition receives the same partition-wide count.
# Confirm this is the intended definition.
df = df.withColumn(
    "Oncoming_flights",
    F.sum(
        F.when(
            (col("ARR_TIME").cast("int") >= (col("CRS_DEP_TIME").cast("int") - 200)) &
            (col("ARR_TIME").cast("int") <= col("CRS_DEP_TIME").cast("int")), 1
        ).otherwise(0)
    ).over(W.Window.partitionBy("ORIGIN", "FL_DATE"))
)

# Share of earlier rows per destination / origin with delay <= 0
# (null for the first row of each partition).
df = df.withColumn(
    "OntimeArrivalPct",
    F.avg(F.when(col("ARR_DELAY").cast("double") <= 0, 1).otherwise(0)).over(w_dest_history)
)
df = df.withColumn(
    "OntimeDeparturePct",
    F.avg(F.when(col("DEP_DELAY").cast("double") <= 0, 1).otherwise(0)).over(w_airport_history)
)

# Raw columns used as model inputs, grouped by theme and concatenated in order.
calendar_features = ["YEAR", "QUARTER", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK"]
schedule_features = ["CRS_DEP_TIME", "CRS_ARR_TIME", "DISTANCE"]
location_features = ["ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID", "ORIGIN_WAC", "DEST_WAC"]
weather_features = [
    "HourlyPrecipitation",
    "HourlyVisibility",
    "HourlyWindSpeed",
    "HourlyWindGustSpeed",
    "HourlyWindDirection",
    "HourlyDryBulbTemperature",
    "HourlyDewPointTemperature",
    "HourlyWetBulbTemperature",
    "HourlyRelativeHumidity",
    "HourlySeaLevelPressure",
    "HourlyPresentWeatherType",
]
base_features = [
    *calendar_features,
    *schedule_features,
    *location_features,
    *weather_features,
]

# Columns derived earlier in this cell.
flag_features = [
    "DEP_HOUR", "IS_WEEKEND", "IS_WINTER", "IS_SPRING", "IS_SUMMER", "IS_FALL", "IS_HOLIDAY",
]
rate_features = ["CARRIER_DELAY_RATE", "ORIGIN_DELAY_RATE", "DEST_DELAY_RATE"]
previous_leg_features = ["Prev_TaxiIn", "Prev_TaxiOut", "Prev_ArrDelay", "Turnaround_Time"]
airport_load_features = [
    "Num_airport_wide_delays", "Num_airport_wide_cancelations", "Oncoming_flights",
]
ontime_features = ["OntimeArrivalPct", "OntimeDeparturePct"]
engineered_features = [
    *flag_features,
    *rate_features,
    *previous_leg_features,
    *airport_load_features,
    *ontime_features,
]

# Full candidate list: raw columns first, engineered columns after.
all_features = [*base_features, *engineered_features]

# Add a double-typed "<name>_dbl" copy of every candidate feature. Values that
# cannot be cast to double become null in the copy.
for c in all_features:
    df = df.withColumn(c + "_dbl", col(c).cast("double"))

# Drop columns with too many nulls (counted over every column, raw and _dbl)
null_threshold = 1000000
null_counts = (
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
    .collect()[0]
    .asDict()
)
sparse_cols = [name for name, n_null in null_counts.items() if n_null > null_threshold]
df = df.drop(*sparse_cols)

# Drop rows with nulls (in any remaining column, not only the model features)
df = df.dropna()

# Categorical columns to index
categorical_cols = ["ORIGIN", "DEST", "OP_UNIQUE_CARRIER"]

# Define StringIndexers for categoricals; handleInvalid='keep' lets labels
# unseen during fitting pass through transform instead of raising.
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid='keep') for c in categorical_cols
]

# Final feature list. Keep a feature only if its "_dbl" copy survived the
# pruning: the copy can be dropped on its own when the cast produced the nulls,
# so checking the raw name alone would hand the assembler a missing column.
numeric_features = [
    c + "_dbl"
    for c in all_features
    if c not in sparse_cols and c + "_dbl" not in sparse_cols
]
feature_cols = [f"{c}_idx" for c in categorical_cols] + numeric_features

# Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# First 3 quarters for training, last quarter for test
train_df = df.filter(col("MONTH").between(1, 9))
test_df = df.filter(col("MONTH").between(10, 12))

# Logistic regression with default hyper-parameters, behind the categorical
# indexers and the vector assembler.
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[*indexers, assembler, lr])

# Fit on the training months, score the held-out months.
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

# Area under the ROC curve on the held-out predictions.
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Baseline Logistic Regression AUC: {auc:.3f}")

# Collect labels and hard predictions to the driver for scikit-learn metrics.
pdf = predictions.select("label", "prediction").toPandas()
y_true = pdf["label"]
y_pred = pdf["prediction"]
class_names = ["Not Delayed", "Delayed"]

# Confusion matrix heatmap (rows: true label, columns: predicted label).
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(5,4))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    cbar=False,
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()

# Metrics for the positive class (label=1, i.e. delayed).
precision_delayed = precision_score(y_true, y_pred, pos_label=1)
recall_delayed = recall_score(y_true, y_pred, pos_label=1)
f1_delayed = f1_score(y_true, y_pred, pos_label=1)

print(f"Precision (Delayed class): {precision_delayed:.3f}")
print(f"Recall (Delayed class): {recall_delayed:.3f}")
print(f"F1 (Delayed class): {f1_delayed:.3f}")

# Per-class summary for both labels.
print("\nClassification report:\n")
print(classification_report(y_true, y_pred, target_names=class_names))

In [0]:
# Logistic regression with 3 month blocking time series cross-validation and grid search for parameter selection for first 3/4 of data for training, last 1/4 for test 
import pyspark.sql.functions as F
import pyspark.sql.window as W
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report, confusion_matrix
import pandas as pd
import sys


# Load the flight table from the otpw_12m parquet file under folder_path.
df = spark.read.parquet(f"{folder_path}/otpw_12m.parquet")

# Target: 1 when DEP_DELAY (as double) is above 15, else 0 (null delay -> 0).
departed_late = col("DEP_DELAY").cast("double") > 15
df = df.withColumn("label", when(departed_late, 1).otherwise(0))

# Mean of `label` per carrier / origin / destination.
# NOTE(review): the means are taken over every row of df, including the
# validation blocks and the months 10-12 test set used later in this cell.
mean_label = F.mean("label")
carrier_delay = df.groupBy("OP_UNIQUE_CARRIER").agg(mean_label.alias("CARRIER_DELAY_RATE"))
origin_delay = df.groupBy("ORIGIN").agg(mean_label.alias("ORIGIN_DELAY_RATE"))
dest_delay = df.groupBy("DEST").agg(mean_label.alias("DEST_DELAY_RATE"))
df = (
    df.join(carrier_delay, on="OP_UNIQUE_CARRIER", how="left")
    .join(origin_delay, on="ORIGIN", how="left")
    .join(dest_delay, on="DEST", how="left")
)

# Hour of scheduled departure, weekend flag and one 0/1 flag per season;
# DAY_OF_WEEK and MONTH are matched against string literals.
month_col = col("MONTH")
df = (
    df.withColumn("DEP_HOUR", (col("CRS_DEP_TIME").cast("int") / 100).cast("int"))
    .withColumn("IS_WEEKEND", when(col("DAY_OF_WEEK").isin("6", "7"), 1).otherwise(0))
    .withColumn("IS_WINTER", when(month_col.isin("12", "1", "2"), 1).otherwise(0))
    .withColumn("IS_SPRING", when(month_col.isin("3", "4", "5"), 1).otherwise(0))
    .withColumn("IS_SUMMER", when(month_col.isin("6", "7", "8"), 1).otherwise(0))
    .withColumn("IS_FALL", when(month_col.isin("9", "10", "11"), 1).otherwise(0))
)
# Dates (yyyy-MM-dd strings) matched against FL_DATE to derive IS_HOLIDAY.
holiday_dates = [
    "2015-01-01",  # New Year's Day
    "2015-01-19",  # Martin Luther King Jr. Day
    "2015-02-14",  # Valentine's Day
    "2015-02-16",  # Presidents Day
    "2015-05-25",  # Memorial Day
    "2015-07-04",  # Independence Day
    "2015-09-07",  # Labor Day
    "2015-11-26",  # Thanksgiving
    "2015-12-25",  # Christmas
    "2015-12-31",  # New Year's Eve
]
# Flag flights whose FL_DATE is one of the listed holiday dates.
df = df.withColumn("IS_HOLIDAY", when(col("FL_DATE").isin(holiday_dates), 1).otherwise(0))


def _hhmm_to_minutes(hhmm):
    """Convert an integer HHMM clock-time Column to minutes since midnight."""
    return (hhmm / 100).cast("int") * 60 + hhmm % 100


# Scheduled departure as an integer. Ordering on the cast value keeps the
# windows in numeric order even if CRS_DEP_TIME is stored as a string
# (where "1000" would otherwise sort before "900").
sched_dep_hhmm = col("CRS_DEP_TIME").cast("int")

# w_tailnum: one aircraft's flights in scheduled order (for lag features).
# w_airport_history / w_dest_history: every strictly earlier row of the same
# origin / destination (unbounded start, frame ends one row before current).
w_tailnum = W.Window.partitionBy("TAIL_NUM").orderBy("FL_DATE", sched_dep_hhmm)
w_airport_history = (
    W.Window.partitionBy("ORIGIN")
    .orderBy("FL_DATE", sched_dep_hhmm)
    .rowsBetween(W.Window.unboundedPreceding, -1)
)
w_dest_history = (
    W.Window.partitionBy("DEST")
    .orderBy("FL_DATE", sched_dep_hhmm)
    .rowsBetween(W.Window.unboundedPreceding, -1)
)

# Values carried over from the same tail number's previous flight
# (null for the first flight of each tail number).
df = df.withColumn("Prev_TaxiIn", F.lag("TAXI_IN").over(w_tailnum))
df = df.withColumn("Prev_TaxiOut", F.lag("TAXI_OUT").over(w_tailnum))
df = df.withColumn("Prev_ArrDelay", F.lag("ARR_DELAY").over(w_tailnum))
df = df.withColumn("Prev_ArrTime", F.lag("ARR_TIME").over(w_tailnum))

# Minutes between the previous arrival and this scheduled departure. Both
# clock times are HHMM, so they are converted to minutes before subtracting
# (raw HHMM subtraction gives 1005 - 955 = 50 instead of 10 minutes).
# Only clock times are compared, so the value is negative when the previous
# arrival clock time is later than this departure's.
df = df.withColumn(
    "Turnaround_Time",
    _hhmm_to_minutes(sched_dep_hhmm) - _hhmm_to_minutes(col("Prev_ArrTime").cast("int"))
)

# Scheduled departure as epoch seconds, needed for the range-based window below.
df = df.withColumn("DEP_DATETIME", F.concat_ws(" ", col("FL_DATE"), F.format_string("%04d", col("CRS_DEP_TIME").cast("int"))))
df = df.withColumn("DEP_TIMESTAMP", F.unix_timestamp("DEP_DATETIME", "yyyy-MM-dd HHmm"))

# Same-origin rows scheduled in the 7200 seconds before the current row; rows
# sharing the current timestamp are excluded because the frame ends at -1.
w_airport_2h = (W.Window.partitionBy("ORIGIN").orderBy("DEP_TIMESTAMP").rangeBetween(-7200, -1))
df = df.withColumn("Num_airport_wide_delays", F.sum(F.when(col("DEP_DELAY").cast("double") > 15, 1).otherwise(0)).over(w_airport_2h))
df = df.withColumn("Num_airport_wide_cancelations", F.sum(F.when(col("CANCELLED").cast("double") == 1, 1).otherwise(0)).over(w_airport_2h))

# NOTE(review): the window has no ordering or frame, and the condition compares
# each row's own ARR_TIME with its own CRS_DEP_TIME, so every row of an
# (ORIGIN, FL_DATE) partition receives the same partition-wide count.
# Confirm this is the intended definition.
df = df.withColumn("Oncoming_flights", F.sum(F.when((col("ARR_TIME").cast("int") >= (col("CRS_DEP_TIME").cast("int") - 200)) & (col("ARR_TIME").cast("int") <= col("CRS_DEP_TIME").cast("int")), 1).otherwise(0)).over(W.Window.partitionBy("ORIGIN", "FL_DATE")))

# Share of earlier rows per destination / origin with delay <= 0
# (null for the first row of each partition).
df = df.withColumn("OntimeArrivalPct", F.avg(F.when(col("ARR_DELAY").cast("double") <= 0, 1).otherwise(0)).over(w_dest_history))
df = df.withColumn("OntimeDeparturePct", F.avg(F.when(col("DEP_DELAY").cast("double") <= 0, 1).otherwise(0)).over(w_airport_history))

# Raw columns used as model inputs, grouped by theme and concatenated in order.
calendar_features = ["YEAR", "QUARTER", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK"]
schedule_features = ["CRS_DEP_TIME", "CRS_ARR_TIME", "DISTANCE"]
location_features = ["ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID", "ORIGIN_WAC", "DEST_WAC"]
weather_features = [
    "HourlyPrecipitation",
    "HourlyVisibility",
    "HourlyWindSpeed",
    "HourlyWindGustSpeed",
    "HourlyWindDirection",
    "HourlyDryBulbTemperature",
    "HourlyDewPointTemperature",
    "HourlyWetBulbTemperature",
    "HourlyRelativeHumidity",
    "HourlySeaLevelPressure",
    "HourlyPresentWeatherType",
]
base_features = [
    *calendar_features,
    *schedule_features,
    *location_features,
    *weather_features,
]

# Columns derived earlier in this cell.
flag_features = [
    "DEP_HOUR", "IS_WEEKEND", "IS_WINTER", "IS_SPRING", "IS_SUMMER", "IS_FALL", "IS_HOLIDAY",
]
rate_features = ["CARRIER_DELAY_RATE", "ORIGIN_DELAY_RATE", "DEST_DELAY_RATE"]
previous_leg_features = ["Prev_TaxiIn", "Prev_TaxiOut", "Prev_ArrDelay", "Turnaround_Time"]
airport_load_features = [
    "Num_airport_wide_delays", "Num_airport_wide_cancelations", "Oncoming_flights",
]
ontime_features = ["OntimeArrivalPct", "OntimeDeparturePct"]
engineered_features = [
    *flag_features,
    *rate_features,
    *previous_leg_features,
    *airport_load_features,
    *ontime_features,
]

# Full candidate list: raw columns first, engineered columns after.
all_features = [*base_features, *engineered_features]

# Add a double-typed "<name>_dbl" copy of every candidate feature. Values that
# cannot be cast to double become null in the copy.
for c in all_features:
    df = df.withColumn(c + "_dbl", col(c).cast("double"))

# Drop every column (raw or _dbl) with more than null_threshold nulls, then
# drop every row that still has a null in any remaining column.
null_threshold = 1000000
null_counts = (
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
    .collect()[0]
    .asDict()
)
sparse_cols = [name for name, n_null in null_counts.items() if n_null > null_threshold]
df = df.drop(*sparse_cols)
df = df.dropna()

# String categoricals are index-encoded; handleInvalid='keep' lets labels
# unseen during fitting pass through transform instead of raising.
categorical_cols = ["ORIGIN", "DEST", "OP_UNIQUE_CARRIER"]
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid='keep') for c in categorical_cols
]

# Keep a feature only if its "_dbl" copy survived the pruning: the copy can be
# dropped on its own when the cast produced the nulls, so checking the raw
# name alone would hand the assembler a missing column.
numeric_features = [
    c + "_dbl"
    for c in all_features
    if c not in sparse_cols and c + "_dbl" not in sparse_cols
]
feature_cols = [f"{c}_idx" for c in categorical_cols] + numeric_features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Months 1-9 are used for training / cross-validation, months 10-12 for test.
train_df = df.filter(col("MONTH").between(1, 9))
test_df = df.filter(col("MONTH").between(10, 12))


# Inclusive month ranges used as time-ordered blocks. Fold i trains on all
# blocks before block i and validates on block i (expanding window).
blocks = [(1, 3), (4, 6), (7, 9)]

param_grid = [
    {"regParam": 0.01, "elasticNetParam": 0.0},   # Ridge
    {"regParam": 0.01, "elasticNetParam": 0.5},   # Elastic Net
    {"regParam": 0.1,  "elasticNetParam": 0.0},   # Ridge
    {"regParam": 0.1,  "elasticNetParam": 0.5},   # Elastic Net
    {"regParam": 1.0,  "elasticNetParam": 0.0},   # Ridge
    {"regParam": 1.0,  "elasticNetParam": 0.5},   # Elastic Net
]

# Build the folds once: they do not depend on the hyper-parameters, so the
# emptiness checks (Spark jobs) are not repeated for every parameter set.
cv_folds = []
for i in range(1, len(blocks)):
    train_months = [month for b in blocks[:i] for month in range(b[0], b[1] + 1)]
    val_months = list(range(blocks[i][0], blocks[i][1] + 1))

    cv_train = train_df.filter(col("MONTH").isin(train_months))
    cv_val = train_df.filter(col("MONTH").isin(val_months))

    # Skip folds where either side has no rows; limit(1) keeps the check cheap.
    if cv_train.limit(1).count() == 0 or cv_val.limit(1).count() == 0:
        continue
    cv_folds.append((cv_train, cv_val))

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

all_grid_results = []
for params in param_grid:
    cv_results = []
    for cv_train, cv_val in cv_folds:
        lr = LogisticRegression(featuresCol="features", labelCol="label",
                               regParam=params["regParam"],
                               elasticNetParam=params["elasticNetParam"])
        pipeline = Pipeline(stages=indexers + [assembler, lr])

        cv_model = pipeline.fit(cv_train)
        cv_pred = cv_model.transform(cv_val)

        auc = evaluator.evaluate(cv_pred)

        # Labels and hard predictions are collected to the driver for sklearn.
        pdf = cv_pred.select("label", "prediction").toPandas()
        if pdf.empty:
            continue

        precision_delayed = precision_score(pdf["label"], pdf["prediction"], pos_label=1)
        recall_delayed = recall_score(pdf["label"], pdf["prediction"], pos_label=1)
        f1_delayed = f1_score(pdf["label"], pdf["prediction"], pos_label=1)
        accuracy = (pdf["label"] == pdf["prediction"]).mean()

        cv_results.append({
            "regParam": params["regParam"],
            "elasticNetParam": params["elasticNetParam"],
            "AUC": auc,
            "accuracy": accuracy,
            "precision_delayed": precision_delayed,
            "recall_delayed": recall_delayed,
            "f1_delayed": f1_delayed
        })
    # Average the fold metrics for this parameter set.
    if cv_results:
        df_cv_results = pd.DataFrame(cv_results)
        avg_auc = df_cv_results["AUC"].mean()
        avg_f1 = df_cv_results["f1_delayed"].mean()
        all_grid_results.append({
            "regParam": params["regParam"],
            "elasticNetParam": params["elasticNetParam"],
            "mean_AUC": avg_auc,
            "mean_f1": avg_f1
        })

# Without any result the selection below would fail with an opaque KeyError.
if not all_grid_results:
    raise ValueError(
        "Grid search produced no results: no CV fold had both training and "
        "validation rows. Check that train_df covers the months in `blocks`."
    )

grid_results_df = pd.DataFrame(all_grid_results)
print("\nGrid search CV results:")
print(grid_results_df.sort_values("mean_f1", ascending=False))

# Select best params (highest mean F1 for the delayed class). Convert the
# numpy scalars from the pandas row to plain Python floats.
best_row = grid_results_df.loc[grid_results_df["mean_f1"].idxmax()]
best_reg = float(best_row["regParam"])
best_elastic = float(best_row["elasticNetParam"])
print(f"\nBest parameters: regParam={best_reg}, elasticNetParam={best_elastic}")


# Refit on the full training range with the selected hyper-parameters and
# score the held-out test months.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    regParam=best_reg,
    elasticNetParam=best_elastic,
)
pipeline = Pipeline(stages=[*indexers, assembler, lr])
final_model = pipeline.fit(train_df)
final_predictions = final_model.transform(test_df)

# Labels and hard predictions are collected to the driver for sklearn metrics.
final_pdf = final_predictions.select("label", "prediction").toPandas()
y_true = final_pdf["label"]
y_pred = final_pdf["prediction"]
class_names = ["Not Delayed", "Delayed"]

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
final_auc = evaluator.evaluate(final_predictions)

# Positive-class (label=1) metrics and overall accuracy.
precision_delayed = precision_score(y_true, y_pred, pos_label=1)
recall_delayed = recall_score(y_true, y_pred, pos_label=1)
f1_delayed = f1_score(y_true, y_pred, pos_label=1)
accuracy = (y_true == y_pred).mean()

print("\nFinal holdout Q4 results:")
print(f"AUC: {final_auc:.3f}")
print(f"Accuracy: {accuracy:.3f}")
print(f"Precision (Delayed): {precision_delayed:.3f}")
print(f"Recall (Delayed): {recall_delayed:.3f}")
print(f"F1 (Delayed): {f1_delayed:.3f}")

print("\nClassification report for Q4 test set:\n")
print(classification_report(y_true, y_pred, target_names=class_names))

# Confusion matrix heatmap (rows: true label, columns: predicted label).
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(5,4))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    cbar=False,
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix (Q4 Test Set)")
plt.show()

In [0]:
import sys
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.stat import Correlation
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd
import mlflow
import mlflow.spark
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, classification_report

def plot_correlation_heatmap(
    df, 
    columns, 
    sample_size=5000, 
    title="Correlation Matrix Heatmap"
):
    """
    Draw an annotated Pearson correlation heatmap for selected columns.

    Rows with a null in any selected column are removed, and at most
    `sample_size` of the remaining rows are kept via `limit` (the first rows
    Spark returns, not a random sample).

    Parameters:
        df (Spark DataFrame): Input DataFrame; the selected columns must be
            accepted by VectorAssembler.
        columns (list): Column names (strings) to include in the matrix.
        sample_size (int): Maximum number of rows used (default 5000).
        title (str): Title for the plot.

    Returns:
        None. The figure is shown with plt.show().
    """
    n_columns = len(columns)
    vector_col = "features_corr"

    # null-free subset, capped at sample_size rows
    subset = df.select(columns).dropna().limit(sample_size)

    # Correlation.corr works on a single vector column
    vectorised = VectorAssembler(inputCols=columns, outputCol=vector_col).transform(subset)
    corr_matrix = Correlation.corr(vectorised, vector_col, "pearson").head()[0].toArray()

    # label rows and columns with the feature names for plotting
    corr_df = pd.DataFrame(corr_matrix, index=columns, columns=columns)

    # figure size scales with the number of columns
    plt.figure(figsize=(1.5*n_columns, 1.2*n_columns))
    heatmap_options = {
        "cmap": 'coolwarm',
        "center": 0,
        "annot": True,
        "fmt": ".2f",
        "annot_kws": {"size": 6},
        "linewidths": 0.5,
        "cbar_kws": {"shrink": 0.7},
    }
    sns.heatmap(corr_df, **heatmap_options)
    plt.title(title, fontsize=16)
    plt.xticks(rotation=45, ha='right', fontsize=8)
    plt.yticks(rotation=0, fontsize=8)
    plt.tight_layout()
    plt.show()

# Example usage after all feature engineering
# corr_cols = [
#     "label_dbl", "DEP_HOUR_dbl", "CARRIER_DELAY_RATE_dbl", "ORIGIN_DELAY_RATE_dbl",
#     "Prev_TaxiIn_dbl", "Prev_TaxiOut_dbl", "Prev_ArrDelay_dbl", "Turnaround_Time_dbl"
#     # ... add any more _dbl numeric columns you want
# ]

# plot_correlation_heatmap(df, corr_cols, sample_size=5000, title="My Flight Delay Feature Correlation Heatmap")

def plot_gbt_feature_importances(
    df,
    feature_cols,
    label_col="label",
    maxDepth=5,
    maxIter=20,
    stepSize=0.1,
    subsamplingRate=1.0,
    featureSubsetStrategy="all",
    seed=31,
    top_n=20,
    figsize=(12,6)
):
    """
    Fit a GBTClassifier and plot feature importances.

    Parameters:
        df (DataFrame): Spark DataFrame with features already engineered (numeric)
        feature_cols (list): List of column names (strings) to use as features
        label_col (str): Column name for label (default: "label")
        maxDepth, maxIter, stepSize, subsamplingRate, featureSubsetStrategy, seed: GBTClassifier params
        top_n (int): Plot the top_n most important features
        figsize (tuple): Matplotlib figure size

    Returns:
        Spark DataFrame with columns "feature" and "importance" for ALL
        features (not only the plotted top_n), sorted by importance descending.
    """
    
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    gbt = GBTClassifier(
        labelCol=label_col,
        featuresCol="features",
        maxDepth=maxDepth,
        maxIter=maxIter,
        stepSize=stepSize,
        subsamplingRate=subsamplingRate,
        featureSubsetStrategy=featureSubsetStrategy,
        seed=seed
    )
    pipeline = Pipeline(stages=[assembler, gbt])

    # fit GBT
    model = pipeline.fit(df)

    # extract importances; position i corresponds to feature_cols[i]
    importances = model.stages[-1].featureImportances.toArray()
    feature_tuples = [(feature, float(importance)) for feature, importance in zip(feature_cols, importances)]

    # create dataframe for sorting and plotting
    schema = StructType([
        StructField("feature", StringType(), False),
        StructField("importance", DoubleType(), False)
    ])
    # Use the session that owns df. DataFrame.sparkSession is the supported
    # accessor; sql_ctx is kept only as a fallback for PySpark versions
    # that do not expose it.
    session = getattr(df, "sparkSession", None)
    if session is None:
        session = df.sql_ctx.sparkSession
    # Sorting by column name keeps this function independent of a `col`
    # import, which this notebook cell does not perform.
    feature_importance_df = session.createDataFrame(feature_tuples, schema).orderBy("importance", ascending=False)

    # toPandas for plotting
    pandas_df = feature_importance_df.toPandas().head(top_n)
    plt.figure(figsize=figsize)
    plt.barh(pandas_df["feature"], pandas_df["importance"])
    plt.xlabel("Importance")
    plt.title(f"GBTClassifier Feature Importances (Top {top_n})")
    plt.gca().invert_yaxis()  # most important feature at the top
    plt.tight_layout()
    plt.show()
    return feature_importance_df

# Example usage after feature engineering:
# feature_cols = ["DEP_HOUR_dbl", "Prev_ArrDelay_dbl", ...] 
# feature_importance_df = plot_gbt_feature_importances(df_model, feature_cols, label_col="label", top_n=15)

def run_logreg_baseline(
    df,
    feature_cols,
    label_col="label",
    split_col="MONTH",
    train_months=range(1,10),
    test_months=range(10,13),
    experiment_name="/Users/<your-username>/flight-delay-logreg",  # set your Databricks user or shared experiment path
    run_name="logreg_baseline"
):
    """
    Train, evaluate, and log a logistic regression baseline on a feature-engineered Spark DataFrame.
    Saves the model as a Databricks experiment with MLflow.

    Parameters:
        df (DataFrame): Spark DataFrame containing feature_cols, label_col and split_col.
        feature_cols (list): Column names assembled into the feature vector.
        label_col (str): Name of the label column (default: "label").
        split_col (str): Column whose values decide the train/test membership.
        train_months (iterable): split_col values used for training.
        test_months (iterable): split_col values used for testing.
        experiment_name (str): MLflow experiment passed to mlflow.set_experiment.
        run_name (str): Name of the MLflow run.

    Returns:
        The fitted PipelineModel (VectorAssembler + LogisticRegression).

    Side effects:
        Logs the model ("logreg-model"), five metrics and a confusion-matrix
        image to the MLflow run, writes "confusion_matrix.png" to the current
        working directory, and prints the metrics and a classification report.
        The test-set labels and predictions are collected to the driver.
    """
    # set experiment (creates if doesn't exist)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=run_name):
        
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        lr = LogisticRegression(featuresCol="features", labelCol=label_col)
        pipeline = Pipeline(stages=[assembler, lr])

        # train/test split by time.
        # Column.isin only unpacks a single list or set argument; any other
        # iterable (such as the default range objects) is passed through as
        # one literal value, so the month collections are converted to lists.
        # df[split_col] is used instead of col(...) because this notebook
        # cell does not import `col`.
        train_values = list(train_months)
        test_values = list(test_months)
        train_df = df.filter(df[split_col].isin(train_values))
        test_df = df.filter(df[split_col].isin(test_values))

        # train model
        model = pipeline.fit(train_df)
        # save model with MLflow
        mlflow.spark.log_model(model, "logreg-model")

        # predict with test data
        predictions = model.transform(test_df)
        pdf = predictions.select(label_col, "prediction").toPandas()

        # metrics (precision / recall / F1 are for the positive class, label 1)
        evaluator = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderROC")
        auc = evaluator.evaluate(predictions)
        precision_delayed = precision_score(pdf[label_col], pdf["prediction"], pos_label=1)
        recall_delayed = recall_score(pdf[label_col], pdf["prediction"], pos_label=1)
        f1_delayed = f1_score(pdf[label_col], pdf["prediction"], pos_label=1)
        accuracy = (pdf[label_col] == pdf["prediction"]).mean()

        # log metrics
        mlflow.log_metric("AUC", auc)
        mlflow.log_metric("Precision_Delayed", precision_delayed)
        mlflow.log_metric("Recall_Delayed", recall_delayed)
        mlflow.log_metric("F1_Delayed", f1_delayed)
        mlflow.log_metric("Accuracy", accuracy)

        # confusion matrix plot, saved to disk and attached to the run
        cm = confusion_matrix(pdf[label_col], pdf["prediction"])
        plt.figure(figsize=(5, 4))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
                    xticklabels=["Not Delayed", "Delayed"], yticklabels=["Not Delayed", "Delayed"])
        plt.xlabel("Predicted Label")
        plt.ylabel("True Label")
        plt.title("Confusion Matrix")
        plt.tight_layout()
        plt.savefig("confusion_matrix.png")
        plt.close()
        mlflow.log_artifact("confusion_matrix.png")

        # print metrics
        print(f"AUC: {auc:.3f}")
        print(f"Accuracy: {accuracy:.3f}")
        print(f"Precision (Delayed class): {precision_delayed:.3f}")
        print(f"Recall (Delayed class): {recall_delayed:.3f}")
        print(f"F1 (Delayed class): {f1_delayed:.3f}")

        print("\nClassification report:\n")
        print(classification_report(pdf[label_col], pdf["prediction"], target_names=["Not Delayed", "Delayed"]))

        return model

# Usage Example:
# feature_cols = ["DEP_HOUR", "Prev_ArrDelay", ...] # numeric, feature-engineered columns
# model = run_logreg_baseline(
#     df=my_featured_df,
#     feature_cols=feature_cols,
#     label_col="label",
#     experiment_name="/Users/<your-username>/flight-delay-logreg"
# )

def _delayed_class_metrics(pdf, label_col):
    """Return accuracy and positive-class (label 1) precision/recall/F1.

    `pdf` is a pandas DataFrame holding the true labels in `label_col` and the
    model output in a "prediction" column.
    """
    y_true = pdf[label_col]
    y_pred = pdf["prediction"]
    return {
        "accuracy": (y_true == y_pred).mean(),
        "precision_delayed": precision_score(y_true, y_pred, pos_label=1),
        "recall_delayed": recall_score(y_true, y_pred, pos_label=1),
        "f1_delayed": f1_score(y_true, y_pred, pos_label=1),
    }


def run_blocked_ts_logreg_cv(
    df,
    feature_cols,
    label_col="label",
    month_col="MONTH",
    blocks=[(1, 3), (4, 6), (7, 9)],
    test_months=range(10, 13),
    param_grid=[
        {"regParam": 0.01, "elasticNetParam": 0.0},   # Ridge
        {"regParam": 0.01, "elasticNetParam": 0.5},   # Elastic Net
        {"regParam": 0.1,  "elasticNetParam": 0.0},   # Ridge
        {"regParam": 0.1,  "elasticNetParam": 0.5},   # Elastic Net
        {"regParam": 1.0,  "elasticNetParam": 0.0},   # Ridge
        {"regParam": 1.0,  "elasticNetParam": 0.5},   # Elastic Net
    ],
    experiment_name="/Users/<your-username>/flight-delay-logreg-cv",  # change to your workspace path
    run_name="blocked_ts_logreg_cv"
):
    """
    Time series blocked cross-validated logistic regression with grid search.
    Assumes input Spark DataFrame has all numeric features, already indexed.
    Logs model and results to MLflow (Databricks Experiments).

    Folds are expanding windows over `blocks`: fold i trains on the months of
    blocks[0..i-1] and validates on the months of blocks[i]. Every entry of
    `param_grid` is scored by its mean F1 (label 1) across folds; the best
    entry is refit on all `blocks` months and evaluated on `test_months`.

    Args:
        df: Spark DataFrame containing `feature_cols`, `label_col`, `month_col`.
        feature_cols: names of the columns assembled into the feature vector.
        label_col: name of the binary label column (1 is the positive class).
        month_col: column whose values are matched against the month numbers.
        blocks: sequence of inclusive (first_month, last_month) pairs.
            Only read, never mutated.
        test_months: iterable of month numbers used as the final holdout.
        param_grid: sequence of dicts with "regParam" and "elasticNetParam".
            Only read, never mutated.
        experiment_name: MLflow experiment path.
        run_name: MLflow run name.

    Returns:
        (final_model, grid_results_df): the fitted PipelineModel and a pandas
        DataFrame with one row per grid entry (mean_AUC, mean_f1).

    Raises:
        ValueError: if no grid entry could be scored on any fold.

    Side effects:
        Writes "grid_search_cv_results.csv" and "confusion_matrix.png" to the
        current working directory and logs them, the metrics and the final
        model to MLflow.
    """
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=run_name):
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        evaluator = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderROC")

        # Build the folds once: they do not depend on the hyper-parameters, so
        # the filters and the emptiness checks (two Spark count jobs per fold)
        # should not be repeated for every grid entry.
        folds = []
        for i in range(1, len(blocks)):
            train_months = [month for b in blocks[:i] for month in range(b[0], b[1] + 1)]
            val_months = list(range(blocks[i][0], blocks[i][1] + 1))

            cv_train = df.filter(col(month_col).isin(train_months))
            cv_val = df.filter(col(month_col).isin(val_months))

            if cv_train.count() == 0 or cv_val.count() == 0:
                continue
            folds.append((cv_train, cv_val))

        all_grid_results = []
        for params in param_grid:
            cv_results = []
            for cv_train, cv_val in folds:
                lr = LogisticRegression(featuresCol="features", labelCol=label_col,
                                       regParam=params["regParam"],
                                       elasticNetParam=params["elasticNetParam"])
                pipeline = Pipeline(stages=[assembler, lr])

                cv_model = pipeline.fit(cv_train)
                cv_pred = cv_model.transform(cv_val)

                auc = evaluator.evaluate(cv_pred)
                pdf = cv_pred.select(label_col, "prediction").toPandas()
                if len(pdf) == 0:
                    continue

                fold_result = {
                    "regParam": params["regParam"],
                    "elasticNetParam": params["elasticNetParam"],
                    "AUC": auc,
                }
                fold_result.update(_delayed_class_metrics(pdf, label_col))
                cv_results.append(fold_result)

            if len(cv_results) > 0:
                df_cv_results = pd.DataFrame(cv_results)
                all_grid_results.append({
                    "regParam": params["regParam"],
                    "elasticNetParam": params["elasticNetParam"],
                    "mean_AUC": df_cv_results["AUC"].mean(),
                    "mean_f1": df_cv_results["f1_delayed"].mean()
                })

        # Without this guard an empty result list produces a column-less frame
        # and the sort below fails with an opaque KeyError on "mean_f1".
        if not all_grid_results:
            raise ValueError(
                "Grid search produced no results: no fold had both training and "
                "validation rows, or param_grid is empty. Check `blocks`, "
                "`param_grid` and the values in `month_col`."
            )

        grid_results_df = pd.DataFrame(all_grid_results)
        print("\nGrid search CV results:")
        print(grid_results_df.sort_values("mean_f1", ascending=False))

        # log grid search results as artifact
        grid_results_df.to_csv("grid_search_cv_results.csv", index=False)
        mlflow.log_artifact("grid_search_cv_results.csv")

        # select best params (plain floats rather than numpy scalars)
        best_row = grid_results_df.loc[grid_results_df["mean_f1"].idxmax()]
        best_reg = float(best_row["regParam"])
        best_elastic = float(best_row["elasticNetParam"])
        print(f"\nBest parameters: regParam={best_reg}, elasticNetParam={best_elastic}")

        mlflow.log_param("best_regParam", best_reg)
        mlflow.log_param("best_elasticNetParam", best_elastic)

        # train on full training data (all months covered by `blocks`)
        train_months = [month for b in blocks for month in range(b[0], b[1] + 1)]
        train_df = df.filter(col(month_col).isin(train_months))
        # Column.isin only unpacks a single list/set argument; materialize the
        # default `range` (or any other iterable) so each month is a literal.
        test_df = df.filter(col(month_col).isin(list(test_months)))

        lr = LogisticRegression(featuresCol="features", labelCol=label_col,
                               regParam=best_reg,
                               elasticNetParam=best_elastic)
        pipeline = Pipeline(stages=[assembler, lr])
        final_model = pipeline.fit(train_df)
        mlflow.spark.log_model(final_model, "final-model")

        # predict with test data (the `test_months` holdout)
        final_predictions = final_model.transform(test_df)
        final_pdf = final_predictions.select(label_col, "prediction").toPandas()
        final_auc = evaluator.evaluate(final_predictions)
        final_metrics = _delayed_class_metrics(final_pdf, label_col)
        precision_delayed = final_metrics["precision_delayed"]
        recall_delayed = final_metrics["recall_delayed"]
        f1_delayed = final_metrics["f1_delayed"]
        accuracy = final_metrics["accuracy"]

        mlflow.log_metric("AUC", final_auc)
        mlflow.log_metric("Accuracy", accuracy)
        mlflow.log_metric("Precision_Delayed", precision_delayed)
        mlflow.log_metric("Recall_Delayed", recall_delayed)
        mlflow.log_metric("F1_Delayed", f1_delayed)

        print("\nFinal holdout Q4 results:")
        print(f"AUC: {final_auc:.3f}")
        print(f"Accuracy: {accuracy:.3f}")
        print(f"Precision (Delayed): {precision_delayed:.3f}")
        print(f"Recall (Delayed): {recall_delayed:.3f}")
        print(f"F1 (Delayed): {f1_delayed:.3f}")

        # Pin labels to [0, 1] so the report and the 2x2 matrix keep their
        # shape (and match the two tick labels) even when only one class
        # appears in the holdout labels and predictions.
        print("\nClassification report for Q4 test set:\n")
        print(classification_report(final_pdf[label_col], final_pdf["prediction"],
                                    labels=[0, 1], target_names=["Not Delayed", "Delayed"]))

        # confusion matrix plot
        cm = confusion_matrix(final_pdf[label_col], final_pdf["prediction"], labels=[0, 1])
        plt.figure(figsize=(5,4))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
                    xticklabels=["Not Delayed", "Delayed"], yticklabels=["Not Delayed", "Delayed"])
        plt.xlabel("Predicted Label")
        plt.ylabel("True Label")
        plt.title("Confusion Matrix (Q4 Test Set)")
        plt.tight_layout()
        plt.savefig("confusion_matrix.png")
        plt.close()
        mlflow.log_artifact("confusion_matrix.png")

        return final_model, grid_results_df

# Example usage:
# final_model, grid_results_df = run_blocked_ts_logreg_cv(
#     df=your_df,
#     feature_cols=your_feature_cols,
#     label_col="label",
#     month_col="MONTH",
#     experiment_name="/Users/your-user/flight-delay-logreg-cv"
# )

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Build the one-hot + scaled feature vector for df_clean.
# Exclude target and date columns from the feature set.
exclude_columns = {"delay_class", "sched_depart_date_time_UTC", "FL_DATE"}
column_types = dict(df_clean.dtypes)

# Numeric-typed columns that are really category codes and must be one-hot
# encoded instead of scaled. Every entry needs its own trailing comma: Python
# silently concatenates adjacent string literals, which previously fused
# 'season'+'ORIGIN_WAC' and 'distance_category'+'departure_dayofweek' into two
# bogus names and sent those four columns down the numerical path.
categorical_int_cols = {
    'DAY_OF_WEEK', 'MONTH', 'QUARTER', 'DISTANCE_GROUP', 'YEAR', 'season',
    'ORIGIN_WAC', 'origin_type', 'dest_type', 'DEST_WAC', 'is_weekend', 'is_peak_hour', 'distance_category',
    'departure_dayofweek', 'departure_month', 'hour_category', 'is_holiday_window', 'STATION', 'departure_hour',
}
numeric_types = {'int', 'bigint', 'double', 'float'}

numerical_features = []
categorical_features = []

# Separate features by type; columns of any other dtype are left out.
for col_name, col_type in column_types.items():
    if col_name in exclude_columns:
        continue

    if col_type in numeric_types:
        if col_name in categorical_int_cols:
            categorical_features.append(col_name)
        else:
            numerical_features.append(col_name)
    elif col_type == 'string':
        categorical_features.append(col_name)

# Build categorical stages
indexers = [StringIndexer(inputCol=f, outputCol=f+"_indexed", handleInvalid="keep") for f in categorical_features]
encoders = [OneHotEncoder(inputCol=f+"_indexed", outputCol=f+"_encoded", dropLast=True) for f in categorical_features]

# Assemble categorical features
categorical_assembler = VectorAssembler(
    inputCols=[f+"_encoded" for f in categorical_features],
    outputCol="categorical_features_vector",
    handleInvalid="keep"
)

# Assemble numerical features
numerical_assembler = VectorAssembler(
    inputCols=numerical_features,
    outputCol="numerical_features_vector",
    handleInvalid="keep"
)

# Scale numerical features
scaler = StandardScaler(
    inputCol="numerical_features_vector",
    outputCol="scaled_numerical_features",
    withMean=True,
    withStd=True
)

# Combine all stages into one pipeline
pipeline_stages = indexers + encoders + [categorical_assembler, numerical_assembler, scaler]

pipeline = Pipeline(stages=pipeline_stages)

# Fit and transform on the same DataFrame
model = pipeline.fit(df_clean)
df_transformed = model.transform(df_clean)

# Combine categorical and scaled numerical features into final feature vector
final_assembler = VectorAssembler(
    inputCols=["categorical_features_vector", "scaled_numerical_features"],
    outputCol="features",
    handleInvalid="keep"
)

df_final = final_assembler.transform(df_transformed)

df_modeling = df_final.select("features", "delay_class", "sched_depart_date_time_UTC", "FL_DATE")

# Check feature sizes (the scaled vector has one slot per numerical column,
# so the remainder of the final vector is the one-hot part)
total_features = df_modeling.select("features").first()[0].size
categorical_size = total_features - len(numerical_features)

print(f"Total feature vector size: {total_features}")
print(f"Categorical features (one-hot): {categorical_size}")
print(f"Numerical features (scaled): {len(numerical_features)}")
print(f"Dataset ready for PCA and modeling")
row_count = df_modeling.count()
print(f"Total rows in final dataset: {row_count}")

df_modeling.show(5)


In [0]:
from pyspark.ml.feature import PCA
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np

# Fit PCA with many components to find variance threshold.
# PCA cannot extract more components than the feature vector has entries, so
# cap the exploratory k at the actual vector size instead of assuming >= 124.
n_input_features = df_modeling.select("features").first()[0].size
max_components = min(124, n_input_features)
pca_full = PCA(k=max_components, inputCol="features", outputCol="pca_features")
pca_full_model = pca_full.fit(df_modeling)

# Get explained variance ratios
explained_variance = pca_full_model.explainedVariance.toArray()
cumulative_variance = np.cumsum(explained_variance)

# Find number of components for 95% variance.
# np.argmax on an all-False mask returns 0, which would silently select a
# single component when the threshold is never reached; fall back to every
# fitted component in that case.
reaches_95 = cumulative_variance >= 0.95
if reaches_95.any():
    components_95 = int(np.argmax(reaches_95)) + 1
else:
    components_95 = int(len(cumulative_variance))
    print(f"Warning: {components_95} components explain only "
          f"{cumulative_variance[-1]:.3f} of the variance (< 0.95); using all of them")
print(f"Components needed for 95% variance: {components_95}")

# Fit PCA with optimal number of components (reuse the exploratory model when
# it already has exactly that many components)
pca_optimal = PCA(k=components_95, inputCol="features", outputCol="pca_features")
if components_95 == max_components:
    pca_optimal_model = pca_full_model
else:
    pca_optimal_model = pca_optimal.fit(df_modeling)
df_final_pca = pca_optimal_model.transform(df_modeling)

print(f"\nPCA Results:")
print(f"Optimal PCA components: {components_95}")
print(f"Variance explained: {cumulative_variance[components_95-1]:.3f}")

final_components = components_95

# Select final columns for modeling
df_model_ready = df_final_pca.select("pca_features", "delay_class", "sched_depart_date_time_UTC").withColumnRenamed("pca_features", "features")

print(f"\nFinal dataset:")
print(f"Features: {final_components} PCA components")
print(f"Target: delay_class (6 classes)")
print(f"Rows: {df_model_ready.count()}")

# Show class distribution
print(f"\nClass distribution:")
df_model_ready.groupBy("delay_class").count().orderBy("delay_class").show()

In [0]:
# ONE-HOT ENCODING FOR CATEGORICAL FEATURES
# Defines (but does not fit) a pipeline: index -> one-hot -> assemble.
print("=== One-Hot Encoding Categorical Features ===")

# One StringIndexer per categorical column; "keep" routes unseen or invalid
# categories to an extra index instead of failing.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_indexed", handleInvalid="keep")
    for name in categorical_features
]

# One OneHotEncoder per indexed column; dropLast avoids multicollinearity.
encoders = [
    OneHotEncoder(inputCol=f"{name}_indexed", outputCol=f"{name}_encoded", dropLast=True)
    for name in categorical_features
]

# Concatenate every one-hot vector into a single vector column.
categorical_encoded_cols = [f"{name}_encoded" for name in categorical_features]
categorical_assembler = VectorAssembler(
    outputCol="categorical_features_vector",
    inputCols=categorical_encoded_cols,
    handleInvalid="keep",
)

# Indexers run first, then encoders, then the assembler.
categorical_stages = [*indexers, *encoders, categorical_assembler]
categorical_pipeline = Pipeline(stages=categorical_stages)

print(f"Created categorical pipeline with {len(categorical_features)} features")
print(f"Pipeline stages: {len(indexers)} indexers + {len(encoders)} encoders + 1 assembler")


In [0]:
# SCALING FOR NUMERICAL FEATURES
# Defines (but does not fit) a pipeline: assemble -> standardize.
print("=== Scaling Numerical Features ===")

# Pack the numerical columns into one vector column.
numerical_assembler = VectorAssembler(
    outputCol="numerical_features_vector",
    inputCols=numerical_features,
    handleInvalid="keep",
)

# Standardize that vector: subtract the mean and divide by the std deviation.
scaler = StandardScaler(
    outputCol="scaled_numerical_features",
    inputCol="numerical_features_vector",
    withStd=True,
    withMean=True,
)

# Assembler first, scaler second.
numerical_stages = [numerical_assembler, scaler]
numerical_pipeline = Pipeline(stages=numerical_stages)

print(f"Created numerical pipeline with {len(numerical_features)} features")
print("StandardScaler: withMean=True, withStd=True (good for PCA)")

In [0]:
# COMBINE BOTH PIPELINES AND FIT
# Fits the categorical and numerical pipelines on df_clean and produces
# df_processed with the target plus both feature vectors for every row.
print("=== Fitting Complete Feature Pipeline ===")

# Fit categorical pipeline
print("Fitting categorical pipeline...")
categorical_model = categorical_pipeline.fit(df_clean)
df_categorical = categorical_model.transform(df_clean)

# Fit numerical pipeline
print("Fitting numerical pipeline...")
numerical_model = numerical_pipeline.fit(df_clean)

# Combine the results by chaining the transforms on the same rows.
# The two outputs must NOT be joined on DEP_DELAY_GROUP: it is the target and
# is not unique per row, so an inner join on it yields every categorical row
# paired with every numerical row of the same class (a per-class cross
# product) and attaches numerical features from unrelated flights.
df_numerical = numerical_model.transform(df_categorical)
df_processed = df_numerical.select(
    "DEP_DELAY_GROUP",  # Keep target variable
    "categorical_features_vector",
    "scaled_numerical_features"
)

print("Feature processing complete!")
print(f"Final dataset shape: {df_processed.count()} rows")

# Show vector dimensions (one fetched row is enough for both vectors)
sample_row = df_processed.select("categorical_features_vector", "scaled_numerical_features").first()
categorical_size = sample_row[0].size
numerical_size = sample_row[1].size

print(f"Categorical features vector size: {categorical_size}")
print(f"Numerical features vector size: {numerical_size}")
print(f"Total features: {categorical_size + numerical_size}")

In [0]:
# FINAL FEATURE VECTOR ASSEMBLY
# Concatenates the two intermediate vectors of df_processed into "features".
print("=== Creating Final Feature Vector ===")

# Categorical (one-hot) part first, scaled numerical part second.
final_input_vectors = ["categorical_features_vector", "scaled_numerical_features"]
final_assembler = VectorAssembler(
    outputCol="features",
    inputCols=final_input_vectors,
    handleInvalid="keep",
)

df_final = final_assembler.transform(df_processed)

# Keep only the feature vector and the target for modeling.
df_modeling = df_final.select("features", "DEP_DELAY_GROUP")

print("Final modeling dataset ready!")
print(f"Shape: {df_modeling.count()} rows")
print(f"Feature vector size: {df_modeling.select('features').first()[0].size}")
print(f"Target classes: {df_modeling.select('DEP_DELAY_GROUP').distinct().count()}")

# Preview a few rows.
print("\nSample of final dataset:")
df_modeling.show(5)

In [0]:
# 3.3: Handle ALL Categorical Features
# For every string column of df_features, add 0/1 indicator columns for its
# most frequent values (at most 10 per column).
print("\n=== HANDLING ALL CATEGORICAL FEATURES ===")

# Identify all string columns as categorical features
categorical_features = [field.name for field in df_features.schema.fields
                       if isinstance(field.dataType, StringType)]

print(f"Found {len(categorical_features)} categorical features to encode")

# Process all categorical features
for col_name in categorical_features:
    # Calculate cardinality
    distinct_count = df_features.select(col_name).distinct().count()
    print(f"Encoding {col_name} with {distinct_count} distinct values")

    # Get value frequencies
    value_counts = df_features.groupBy(col_name).count().orderBy("count", ascending=False)

    # Take up to 10 most common values
    top_n = min(10, distinct_count)
    top_values = [row[col_name] for row in value_counts.limit(top_n).collect()]

    # Count what is actually created: a null among the top values is skipped,
    # so len(top_values) would over-report the number of indicators.
    created_count = 0
    for value in top_values:
        if value is None:  # Skip None values
            continue
        # Normalize the value into a column-name-friendly suffix.
        safe_value = str(value).replace(" ", "_").replace("-", "_").replace(".", "_").lower()
        new_col = f"{col_name}_{safe_value}"
        df_features = df_features.withColumn(
            new_col,
            when(col(col_name) == value, 1).otherwise(0)
        )
        created_count += 1
    print(f"Created {created_count} indicator variables for top values of {col_name}")

In [0]:
# 3.4: Scale ALL Numeric Features
# Adds a "<name>_scaled" z-score column for every Double/Integer/Float column
# of df_features that is neither a binary indicator nor a target.
# Imported here so the cell does not depend on an earlier cell for these types.
from pyspark.sql.types import DoubleType, FloatType, IntegerType

print("\n=== SCALING ALL NUMERIC FEATURES ===")

# Identify all numeric columns
numeric_features = [field.name for field in df_features.schema.fields
                   if isinstance(field.dataType, (DoubleType, IntegerType, FloatType))]

# Exclude binary indicator columns and target variables.
# str.startswith / str.endswith accept a tuple of alternatives; an empty tuple
# never matches, same as any() over an empty sequence.
indicator_suffixes = (
    "_short", "_medium", "_long", "_very_long",
    "_morning", "_afternoon", "_evening", "_night",
)
categorical_prefixes = tuple(f"{c}_" for c in categorical_features)
binary_indicators = [name for name in df_features.columns
                    if name.startswith("is_")
                    or name.endswith(indicator_suffixes)
                    or name.startswith(categorical_prefixes)]

target_vars = ["DEP_DELAY", "DEP_DEL15"]

# Filter numeric features to exclude binary indicators and target variables
excluded_names = set(binary_indicators) | set(target_vars)
numeric_to_scale = [name for name in numeric_features
                   if name not in excluded_names
                   and not name.endswith("_scaled")]  # Avoid scaling already scaled features

print(f"Found {len(numeric_to_scale)} numeric features to scale")

# Compute mean and stddev of every column in ONE aggregation job instead of
# one full-table job per column. Aliases are index-based so unusual column
# names cannot collide or break the alias.
all_stats = None
if numeric_to_scale:
    stat_exprs = []
    for idx, name in enumerate(numeric_to_scale):
        stat_exprs.append(F.mean(col(name)).alias(f"mean_{idx}"))
        stat_exprs.append(F.stddev(col(name)).alias(f"stddev_{idx}"))
    all_stats = df_features.select(*stat_exprs).collect()[0]

# Apply z-score scaling to all numeric features
for idx, col_name in enumerate(numeric_to_scale):
    mean_val = all_stats[f"mean_{idx}"]
    stddev_val = all_stats[f"stddev_{idx}"]

    if mean_val is not None and stddev_val is not None and stddev_val > 0:
        # Create standardized version (z-score: (x - mean) / stddev)
        df_features = df_features.withColumn(
            col_name + "_scaled",
            (col(col_name) - mean_val) / stddev_val
        )
        print(f"Applied z-score scaling to {col_name}")
    else:
        print(f"Skipping scaling for {col_name} (constant or missing data)")

#### Drop Diverted Flights:

# Remove diverted flights (DIVERTED == 1); rows with a null flag are kept.
print("=== Dropping Diverted Flights ===")
before_count = otpw_3m_imputed.count()
diverted_flag = col("DIVERTED")
otpw_3m_imputed = otpw_3m_imputed.where(diverted_flag.isNull() | (diverted_flag != 1))
after_count = otpw_3m_imputed.count()
diverted_dropped = before_count - after_count
print(f"Dropped {diverted_dropped:,} diverted flights ({diverted_dropped/before_count*100:.2f}%)")
print(f"Remaining flights: {after_count:,}")

%md
#### Verify No Null Categorical Features After Cleaning:

# Report how many nulls remain in each categorical column of otpw_3m_imputed.
print("\n=== Verifying Categorical Features Have No Nulls ===")
categorical_cols = [
    "ORIGIN", "DEST", "OP_UNIQUE_CARRIER", "TAIL_NUM",
    "ORIGIN_AIRPORT_SEQ_ID", "DEST_AIRPORT_SEQ_ID",
    "ORIGIN_CITY_MARKET_ID", "DEST_CITY_MARKET_ID",
    "ORIGIN_CITY_NAME", "DEST_CITY_NAME", "ORIGIN_STATE_ABR", "DEST_STATE_ABR",
    "HourlyPresentWeatherType", "CANCELLATION_CODE", "DAY_OF_WEEK"
]

# Only check the columns that are actually present.
available_columns = set(otpw_3m_imputed.columns)
present_categorical = [name for name in categorical_cols if name in available_columns]

# Count the nulls of every column in a single aggregation pass rather than one
# filter+count job per column. count() ignores the nulls produced by when()
# for non-matching rows, so it tallies exactly the null entries (0 if none).
categorical_null_check = []
if present_categorical:
    null_row = otpw_3m_imputed.select(
        [F.count(F.when(col(name).isNull(), 1)).alias(f"nulls_{idx}")
         for idx, name in enumerate(present_categorical)]
    ).first()
    categorical_null_check = [
        (name, int(null_row[idx])) for idx, name in enumerate(present_categorical)
    ]

cat_null_df = pd.DataFrame(categorical_null_check, columns=["Column", "Null Count"])
print("\nCategorical Columns Null Count:")
display(cat_null_df)

if cat_null_df["Null Count"].sum() == 0:
    print("✓ All categorical features have been successfully cleaned (no nulls)")
else:
    print(f"⚠️  Warning: {cat_null_df[cat_null_df['Null Count'] > 0].shape[0]} categorical columns still have nulls")

%md
#### Remove All Leakage Features:

# Drop every column that is only known after the flight has happened.
print("\n=== Removing Data Leakage Features ===")

# Leakage columns grouped by the reason they leak; the flattened list keeps
# the groups in this order.
leakage_groups = {
    "actual times": ["DEP_TIME", "ARR_TIME", "WHEELS_OFF", "WHEELS_ON"],
    "actual delays": [
        "DEP_DELAY", "DEP_DELAY_NEW", "DEP_DELAY_GROUP",
        "ARR_DELAY", "ARR_DELAY_NEW", "ARR_DELAY_GROUP",
    ],
    "taxi times": ["TAXI_OUT", "TAXI_IN"],
    "flight durations": ["ACTUAL_ELAPSED_TIME", "AIR_TIME"],
    "delay breakdowns": [
        "CARRIER_DELAY", "WEATHER_DELAY", "NAS_DELAY",
        "SECURITY_DELAY", "LATE_AIRCRAFT_DELAY",
    ],
    "other post-flight info": ["FIRST_DEP_TIME", "TOTAL_ADD_GTIME", "LONGEST_ADD_GTIME"],
    # The arrival-delay flag is removed as well: the target here is DEP_DEL15.
    "arrival delay flag": ["ARR_DEL15"],
}
leakage_features = [name for group in leakage_groups.values() for name in group]

# Restrict to the leakage columns that are present, then report and drop them.
present_columns = otpw_3m_imputed.columns
existing_leakage = [name for name in leakage_features if name in present_columns]
print(f"Found {len(existing_leakage)} leakage features to remove:")
for feat in existing_leakage:
    print(f"  - {feat}")

otpw_3m_clean_no_leakage = otpw_3m_imputed.drop(*existing_leakage)

print(f"\nColumns before: {len(otpw_3m_imputed.columns)}")
print(f"Columns after: {len(otpw_3m_clean_no_leakage.columns)}")
print(f"Removed: {len(otpw_3m_imputed.columns) - len(otpw_3m_clean_no_leakage.columns)} columns")

%md
#### Feature Selection: Correlation Analysis with Target:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
import seaborn as sns

# Rank numerical features by their Pearson correlation with DEP_DEL15.
print("\n=== Feature Correlation Analysis ===")

# Select numerical features for correlation analysis
numerical_features = [
    "QUARTER", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK",
    "CRS_DEP_TIME", "CRS_ARR_TIME", "CRS_ELAPSED_TIME",
    "DISTANCE", "DISTANCE_GROUP",
    "HourlyDryBulbTemperature", "HourlyDewPointTemperature",
    "HourlyRelativeHumidity", "HourlyWindSpeed", "HourlyVisibility",
    "HourlyPrecipitation", "HourlySeaLevelPressure"
]

# Filter to existing numerical columns
available_columns = set(otpw_3m_clean_no_leakage.columns)
existing_numerical = [name for name in numerical_features if name in available_columns]

# Create temporary dataset with no nulls for correlation
# (a row is dropped if ANY of the selected columns is null)
temp_df = otpw_3m_clean_no_leakage.select(["DEP_DEL15"] + existing_numerical).na.drop()

# Calculate correlation with target: all coefficients come from ONE
# aggregation pass instead of one Spark job per feature over the uncached
# temp_df. Aliases are index-based so column names cannot clash.
correlations = []
if existing_numerical:
    corr_row = temp_df.select(
        [F.corr("DEP_DEL15", name).alias(f"corr_{idx}")
         for idx, name in enumerate(existing_numerical)]
    ).first()
    for idx, feature in enumerate(existing_numerical):
        raw_corr = corr_row[idx]
        # An undefined correlation (e.g. a constant column) may come back as
        # null; represent it as NaN so abs() and sorting still work.
        corr = float("nan") if raw_corr is None else float(raw_corr)
        correlations.append((feature, abs(corr), corr))

# Create correlation dataframe
corr_df = pd.DataFrame(correlations, columns=["Feature", "Abs_Correlation", "Correlation"])
corr_df = corr_df.sort_values("Abs_Correlation", ascending=False)

print("\nTop 15 Features by Absolute Correlation with DEP_DEL15:")
top_corr = corr_df.head(15)
display(top_corr)

# Visualization
plt.figure(figsize=(10, 8))
plt.barh(top_corr["Feature"], top_corr["Correlation"])
plt.xlabel("Correlation with DEP_DEL15")
plt.title("Top 15 Features Correlated with Departure Delay")
plt.tight_layout()
plt.show()

%md
#### Prepare Features for Modeling:

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Choose the model's input columns and persist the leakage-free dataset.
print("\n=== Preparing Features for Modeling ===")

# Candidate columns to be one-hot encoded.
categorical_features = [
    "OP_UNIQUE_CARRIER", "ORIGIN", "DEST",
    "ORIGIN_STATE_ABR", "DEST_STATE_ABR",
    "DAY_OF_WEEK", "MONTH",
]

# Candidate columns to be used as numeric inputs.
numerical_features_for_model = [
    "QUARTER", "DAY_OF_MONTH",
    "CRS_DEP_TIME", "CRS_ARR_TIME", "CRS_ELAPSED_TIME",
    "DISTANCE",
    "HourlyDryBulbTemperature", "HourlyDewPointTemperature",
    "HourlyRelativeHumidity", "HourlyWindSpeed", "HourlyVisibility",
    "HourlyPrecipitation", "HourlySeaLevelPressure",
]

# Keep only the candidates that exist in the cleaned dataset, in list order.
clean_columns = otpw_3m_clean_no_leakage.columns
categorical_features = [name for name in categorical_features if name in clean_columns]
numerical_features_for_model = [name for name in numerical_features_for_model if name in clean_columns]

print(f"Categorical features: {len(categorical_features)}")
print(f"Numerical features: {len(numerical_features_for_model)}")

# Persist the cleaned dataset before it is split (replaces any earlier copy).
clean_writer = otpw_3m_clean_no_leakage.write.mode("overwrite")
clean_writer.parquet(f"{folder_path}/otpw_3m_clean_no_leakage.parquet")
print(f"\n✓ Saved clean dataset (no leakage): {folder_path}/otpw_3m_clean_no_leakage.parquet")

%md
#### Train/Validation/Test Split (60/20/20):

from pyspark.sql.functions import rand

# Randomly split the cleaned dataset 60/20/20, report sizes and class balance,
# and save the three splits as parquet.
print("\n=== Creating Train/Validation/Test Split ===")

# Set seed for reproducibility
seed = 42

# Split: 60% train, 20% validation, 20% test
train_data, val_data, test_data = otpw_3m_clean_no_leakage.randomSplit([0.6, 0.2, 0.2], seed=seed)

# Cache for performance
train_data.cache()
val_data.cache()
test_data.cache()

# Count each DataFrame exactly once. Previously the uncached parent was
# counted four times and every split two to three times, each a Spark job.
total_rows = otpw_3m_clean_no_leakage.count()
train_rows = train_data.count()
val_rows = val_data.count()
test_rows = test_data.count()

print(f"Total rows: {total_rows:,}")
print(f"Training set: {train_rows:,} ({train_rows/total_rows*100:.1f}%)")
print(f"Validation set: {val_rows:,} ({val_rows/total_rows*100:.1f}%)")
print(f"Test set: {test_rows:,} ({test_rows/total_rows*100:.1f}%)")

# Check target distribution in each set (split sizes are reused from above)
print("\n=== Target Distribution ===")
for name, dataset, total in [("Train", train_data, train_rows),
                             ("Validation", val_data, val_rows),
                             ("Test", test_data, test_rows)]:
    delayed = dataset.filter(col("DEP_DEL15") == 1).count()
    print(f"{name}: {delayed:,} delayed ({delayed/total*100:.2f}%) | {total-delayed:,} on-time ({(total-delayed)/total*100:.2f}%)")

# Save splits
train_data.write.mode("overwrite").parquet(f"{folder_path}/train_data.parquet")
val_data.write.mode("overwrite").parquet(f"{folder_path}/val_data.parquet")
test_data.write.mode("overwrite").parquet(f"{folder_path}/test_data.parquet")
print(f"\n✓ Saved data splits to {folder_path}")

%md
#### Apply One-Hot Encoding and Standard Scaler (Training Data Only):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Build the index -> one-hot -> assemble -> scale pipeline, fit it on the
# training split, apply it to all three splits and save the results.
print("\n=== Building Feature Engineering Pipeline ===")

# Stages 1 and 2: one StringIndexer and one OneHotEncoder per categorical
# column. The loop variable is called `name` so it does not reuse the name of
# the pyspark `col` function.
indexers = []
encoders = []
for name in categorical_features:
    indexers.append(
        StringIndexer(inputCol=name, outputCol=f"{name}_indexed", handleInvalid="keep")
    )
    encoders.append(
        OneHotEncoder(inputCol=f"{name}_indexed", outputCol=f"{name}_encoded")
    )

# Stage 3: numeric columns first, then the one-hot vectors, in one vector.
encoded_cols = [f"{name}_encoded" for name in categorical_features]
assembler_inputs = numerical_features_for_model + encoded_cols
assembler = VectorAssembler(outputCol="features_unscaled", inputCols=assembler_inputs)

# Stage 4: standardize the assembled vector (mean-centered, unit std).
scaler = StandardScaler(
    outputCol="features",
    inputCol="features_unscaled",
    withMean=True,
    withStd=True,
)

# Chain the stages in execution order.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, scaler])

print("Pipeline stages:")
print(f"  - {len(indexers)} StringIndexers")
print(f"  - {len(encoders)} OneHotEncoders")
print(f"  - 1 VectorAssembler")
print(f"  - 1 StandardScaler")

# The pipeline is fitted on the training split only; validation and test rows
# are merely transformed by the fitted model.
print("\n⚠️  IMPORTANT: Fitting pipeline on training data only to prevent data leakage")
pipeline_model = pipeline.fit(train_data)

# Apply the fitted pipeline to each split.
train_transformed, val_transformed, test_transformed = (
    pipeline_model.transform(split) for split in (train_data, val_data, test_data)
)

print("\n✓ Pipeline fitted on training data")
print("✓ Transformations applied to all datasets")

# Preview a few transformed rows.
print("\nExample of transformed features:")
train_transformed.select("DEP_DEL15", "features").show(5, truncate=False)

# Persist the transformed splits, then the fitted pipeline itself.
transformed_outputs = [
    (train_transformed, f"{folder_path}/train_transformed.parquet"),
    (val_transformed, f"{folder_path}/val_transformed.parquet"),
    (test_transformed, f"{folder_path}/test_transformed.parquet"),
]
for transformed_split, output_path in transformed_outputs:
    transformed_split.write.mode("overwrite").parquet(output_path)
pipeline_model.write().overwrite().save(f"{folder_path}/feature_pipeline_model")

print(f"\n✓ Saved transformed datasets and pipeline to {folder_path}")
print("\nReady for model training! ✨")