In [None]:
# 📌 Spark Setup
from pyspark.sql import SparkSession

# Obtain the session for this notebook under the application name "FlightDelayML".
spark = (
    SparkSession.builder
    .appName("FlightDelayML")
    .getOrCreate()
)


In [2]:
# 📌 Load Dataset
# Treat the first CSV row as column names and let Spark infer the column types.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("Airline_Delay_Cause.csv")

# Preview the first five rows.
df.show(5)


+----+-----+-------+-----------------+-------+--------------------+-----------+---------+----------+----------+------+-----------+----------------+-------------+------------+---------+-------------+-------------+---------+--------------+-------------------+
|year|month|carrier|     carrier_name|airport|        airport_name|arr_flights|arr_del15|carrier_ct|weather_ct|nas_ct|security_ct|late_aircraft_ct|arr_cancelled|arr_diverted|arr_delay|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|
+----+-----+-------+-----------------+-------+--------------------+-----------+---------+----------+----------+------+-----------+----------------+-------------+------------+---------+-------------+-------------+---------+--------------+-------------------+
|2023|    8|     9E|Endeavor Air Inc.|    ABE|Allentown/Bethleh...|       89.0|     13.0|      2.25|       1.6|  3.16|        0.0|            5.99|          2.0|         1.0|   1375.0|         71.0|        761.0|    118.0|    

In [None]:
# 📌 Keep only rows whose airport code is ABE, AEX, ALB or AGS
# NOTE(review): this cell was previously headed "Filter for JFK and LAX only",
# which does not match the codes used below — confirm which airport set is intended.
airport_codes = ["ABE", "AEX", "ALB", "AGS"]
df = df.where(df["airport"].isin(airport_codes))


In [8]:
# 📌 Create binary label: 1 when arr_delay > 15, otherwise 0
from pyspark.sql.functions import when, col

# NOTE(review): in the preview output a row with arr_flights = 89.0 shows
# arr_delay = 1375.0, so arr_delay looks like a per-row total rather than a
# per-flight value — confirm that a 15-minute cut-off is the intended label.
# Rows where the comparison is not true (including a null arr_delay) get 0.
exceeds_threshold = col("arr_delay") > 15
df = df.withColumn("label", when(exceeds_threshold, 1).otherwise(0))


In [11]:
# 📌 Feature Engineering
# Cast the month to an integer, then keep only the model inputs and the label.
df = (
    df.withColumn("MONTH", col("MONTH").cast("int"))
    .select("MONTH", "carrier", "airport", "label")
)


In [12]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Index categorical columns.
# handleInvalid="keep" sends values that were not present when the indexer was
# fitted (or nulls) to an extra index instead of raising at transform() time.
# The default is "error", which would abort scoring whenever a carrier or
# airport shows up only in the held-out split.
indexers = [
    StringIndexer(inputCol="carrier", outputCol="AIRLINE_INDEX", handleInvalid="keep"),
    StringIndexer(inputCol="airport", outputCol="DEST_INDEX", handleInvalid="keep"),
]

# One-hot encode the indexed columns
encoder = OneHotEncoder(inputCols=["AIRLINE_INDEX", "DEST_INDEX"],
                        outputCols=["AIRLINE_VEC", "DEST_VEC"])

# Assemble all features into one vector
assembler = VectorAssembler(
    inputCols=["MONTH", "AIRLINE_VEC", "DEST_VEC"],  # removed DEP_HOUR
    outputCol="features"
)

# Preprocessing-only pipeline (indexers -> encoder -> assembler); no estimator
# stage is attached here.
pipeline = Pipeline(stages=indexers + [encoder, assembler])


In [13]:
# 📌 Train-Test Split
# Random split with weights 0.8 / 0.2, seeded with 42.
split_parts = df.randomSplit(weights=[0.8, 0.2], seed=42)
train, test = split_parts


In [15]:
# 📌 Train & Evaluate Function
import time
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

def train_and_evaluate(model, model_name):
    """Fit the preprocessing stages plus ``model`` and print its test metrics.

    A pipeline made of the notebook-level ``indexers``, ``encoder`` and
    ``assembler`` followed by ``model`` is fitted on the notebook-level
    ``train`` DataFrame and applied to ``test``.

    Args:
        model: Estimator appended as the final pipeline stage.
        model_name: Label printed on the "Model:" line of the report.

    Returns:
        None. The model name, accuracy, the BinaryClassificationEvaluator
        metric (printed as "AUC") and the elapsed wall-clock time are
        written to stdout.
    """
    full_pipeline = Pipeline(stages=[*indexers, encoder, assembler, model])

    # The timer covers fitting, scoring and both evaluations.
    started_at = time.time()
    scored = full_pipeline.fit(train).transform(test)

    accuracy = MulticlassClassificationEvaluator(
        labelCol="label", metricName="accuracy"
    ).evaluate(scored)
    area_under_curve = BinaryClassificationEvaluator(labelCol="label").evaluate(scored)

    print(f"Model: {model_name}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"AUC: {area_under_curve:.4f}")
    elapsed = time.time() - started_at
    print(f"Time Taken: {elapsed:.2f} sec\n")


In [16]:
# 📌 Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Logistic regression capped at 10 iterations, fitted and scored via the shared helper.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
)
train_and_evaluate(model=lr, model_name="Logistic Regression")


Model: Logistic Regression
Accuracy: 0.9677
AUC: 0.7627
Time Taken: 9.88 sec



In [17]:
# 📌 Random Forest
from pyspark.ml.classification import RandomForestClassifier

# Random forest with 50 trees, fitted and scored via the shared helper.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
)
train_and_evaluate(model=rf, model_name="Random Forest")


Model: Random Forest
Accuracy: 0.9677
AUC: 0.7827
Time Taken: 5.67 sec



In [18]:
# 📌 Gradient Boosted Trees
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees capped at 30 iterations, fitted and scored via the shared helper.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=30,
)
train_and_evaluate(model=gbt, model_name="Gradient Boosted Trees")


Model: Gradient Boosted Trees
Accuracy: 0.9677
AUC: 0.7380
Time Taken: 19.68 sec

