We use the popular Flights Dataset to analyze and predict flight delays at airports based on past flight records.
For this dataset, we will only look at flights in 2014.

In [None]:
!pip install pyspark



# Create a local SparkContext and SparkSession (pyspark is pip-installed above, so it is importable as a regular library without findspark)

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Build (or reuse) one local Spark session. Going through getOrCreate() keeps
# this cell re-runnable: constructing SparkContext(conf=conf) directly raises
# once a context already exists in the kernel.
conf = SparkConf().setAppName('appName').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext  # RDD entry point used by the loading cell below

# Importing the packages we need

In [None]:
import datetime
import os.path
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyarrow as pa
from IPython.display import display
from ipywidgets import interact
from pyarrow import csv

# Star import kept first in the pyspark group so the explicit imports below win on any name clash.
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.param import Param, Params
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, when
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
%matplotlib inline

Removing the header row from the dataset; the columns are given readable names when the DataFrame is built below, which makes the later analysis easier.

In [None]:
# Read the CSV as raw text lines and break every line into its comma-separated fields.
textFile = sc.textFile('cleaned_dataset.csv')
textFileRDD = textFile.map(lambda line: line.split(','))

# The first record carries the column names; keep only records that differ from it.
header = textFileRDD.first()
textRDD = textFileRDD.filter(lambda fields: fields != header)

# Creating the Dataframe from RDD (Resilient Distributed Dataset)

In [None]:
def parse(r):
    """Convert one split CSV record into a ``Row``, or ``None`` if it is malformed.

    Parameters
    ----------
    r : sequence of str
        The fields of one CSV line, addressed by position (indices 0-17 are read).

    Returns
    -------
    Row or None
        A ``Row`` whose fields are listed below in keyword order, or ``None``
        when a field is missing or cannot be converted to a number. Callers
        are expected to filter the ``None`` results out.
    """
    try:
        # Several numeric fields go through float() first, so values written
        # with a decimal point (e.g. "1513.0") still convert to int.
        return Row(
            Year=int(r[0]),
            Month=int(r[1]),
            DayofMonth=int(r[2]),
            DayOfWeek=int(r[3]),
            DepTime=int(float(r[8])),
            CRSDepTime=int(r[7]),
            ArrTime=int(float(r[11])),
            CRSArrTime=int(r[10]),
            UniqueCarrier=r[4],
            DepDelay=int(float(r[9])),
            Origin=r[5],
            Dest=r[6],
            Distance=int(float(r[12])),
            CarrierDelay=int(float(r[13])),
            WeatherDelay=int(float(r[14])),
            NASDelay=int(float(r[15])),
            SecurityDelay=int(float(r[16])),
            LateAircraftDelay=int(float(r[17])),
        )
    except (ValueError, IndexError, OverflowError):
        # ValueError: empty / non-numeric / NaN field; IndexError: short record;
        # OverflowError: infinite value. Anything else (e.g. a NameError) is a
        # programming error and must surface instead of silently dropping rows.
        return None

# Parse every record and discard the ones parse() rejected (returned None).
rowRDD = textRDD.map(parse).filter(lambda row: row is not None)

# Turn the RDD of Rows into a DataFrame; the schema is taken from the Row fields.
sqlContext = SQLContext(sc)
airline_df = sqlContext.createDataFrame(rowRDD)



DepDelayed is a new binary label column added to the DataFrame

Feature Engineering

1 (delayed) when the departure delay is > 15 minutes

0 (on time) when the departure delay is <= 15 minutes

In [None]:
# Binary label: 1 when the departure delay exceeds 15 minutes, otherwise 0.
is_delayed = airline_df['DepDelay'] > 15
airline_df = airline_df.withColumn('DepDelayed', when(is_delayed, 1).otherwise(0))

In [None]:
# Derive the hour of day from an hhmm-encoded time value
def hour_ex(x):
    """Return the first two digits of ``x`` after zero-padding it to four characters.

    ``x`` is passed through ``int()`` first, so ints, floats and numeric
    strings are all accepted (e.g. ``930`` -> ``"0930"`` -> ``9``).
    """
    padded = f"{int(x):04d}"
    return int(padded[:2])

# Expose hour_ex to Spark twice: under a SQL function name, and as a column UDF.
sqlContext.udf.register("hour_ex_py", hour_ex, IntegerType())
f_udf = udf(hour_ex, IntegerType())

# CRSDepTime: scheduled departure time (local, hhmm)
airline_df = airline_df.withColumn('hour', f_udf(airline_df.CRSDepTime))
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
airline_df.createOrReplaceTempView("airlineDF")



In [None]:
# Peek at the first five rows to sanity-check the parsed and derived columns.
airline_df.head(n=5)

[Row(Year=2014, Month=1, DayofMonth=3, DayOfWeek=5, DepTime=1513, CRSDepTime=1510, ArrTime=1621, CRSArrTime=1620, UniqueCarrier='AA', DepDelay=3, Origin='DFW', Dest='SAT', Distance=247, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, DepDelayed=0, hour=15),
 Row(Year=2014, Month=1, DayofMonth=4, DayOfWeek=6, DepTime=1522, CRSDepTime=1510, ArrTime=1618, CRSArrTime=1620, UniqueCarrier='AA', DepDelay=12, Origin='DFW', Dest='SAT', Distance=247, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, DepDelayed=0, hour=15),
 Row(Year=2014, Month=1, DayofMonth=5, DayOfWeek=7, DepTime=1522, CRSDepTime=1510, ArrTime=1624, CRSArrTime=1620, UniqueCarrier='AA', DepDelay=12, Origin='DFW', Dest='SAT', Distance=247, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, DepDelayed=0, hour=15),
 Row(Year=2014, Month=1, DayofMonth=6, DayOfWeek=1, DepTime=1612, CRSDepTime=1510, ArrTime=1710, CRSArrTime=1620, UniqueCa

Feature Selection:

1.   Decide which columns will be features (X) and the target (y).
2.   Drop unnecessary or redundant columns that won't contribute to the prediction.

In [None]:
# Drop columns that are not needed for modeling. DepDelay is the source of the
# DepDelayed label, so it must not remain available as a feature.
# NOTE(review): 'ArrDelay' is not among the fields parse() puts into each Row;
# the recorded column listing further down shows the drop still went through.
columns_to_drop = ['Year', 'DepDelay', 'ArrDelay', 'DepTime', 'ArrTime', 'CRSArrTime']
airline_df = airline_df.drop(*columns_to_drop)

One-Hot Encoding

*   Convert categorical columns (UniqueCarrier, Origin, Dest) into numerical formats using one-hot encoding.


Assemble Features

*   Combine all features into a single vector for model input.



Train-Test Split

Check for Class Imbalance

**Based on the class distribution we can see:**

1.   1143834 delayed flights (1) (about 20% of the dataset)
2.   4532505 on-time flights (0) (about 80% of the dataset)



Our dataset is imbalanced, with a much higher proportion of on-time flights compared to delayed flights.

**What Does This Mean for Your Model?**

Accuracy: If your model predicts no delay (class 0) for almost all flights, it could still achieve a high accuracy (~80%) because the majority class (on-time) is so dominant.

Under Sampling Majority Class

In [None]:
# Checking class distribution based on the 'DepDelayed' column
class_counts_df = airline_df.groupBy("DepDelayed").count()
class_counts_df.show()

# Read the counts back rather than hard-coding them, so the sampling fraction
# always matches the data actually loaded (the recorded output of this cell
# shows counts that differ from the previously hard-coded 1143834 / 4532505).
class_counts = {row["DepDelayed"]: row["count"] for row in class_counts_df.collect()}
n_delayed = class_counts.get(1, 0)
n_on_time = class_counts.get(0, 0)

# Undersample the majority class (0 - on-time flights)
fraction_delayed = 1.0  # Keep all delayed flights (1s)
# Keep on-time flights at the rate that brings them down to roughly the delayed
# count. Capped at 1.0 because sampleBy fractions must lie in [0, 1]; falls
# back to 1.0 when there are no on-time rows to sample.
fraction_on_time = min(1.0, n_delayed / n_on_time) if n_on_time else 1.0

# Undersampling the majority class
balanced_airline_df = airline_df.sampleBy("DepDelayed", fractions={0: fraction_on_time, 1: fraction_delayed}, seed=42)

# Verifying new class distribution
balanced_airline_df.groupBy("DepDelayed").count().show()


+----------+-------+
|DepDelayed|  count|
+----------+-------+
|         1|1147992|
|         0|4542191|
+----------+-------+

+----------+-------+
|DepDelayed|  count|
+----------+-------+
|         1|1147992|
|         0|1146245|
+----------+-------+



In [None]:
# Index and encode categorical variables
indexer_carrier = StringIndexer(inputCol="UniqueCarrier", outputCol="UniqueCarrierIndex")
encoder_carrier = OneHotEncoder(inputCol="UniqueCarrierIndex", outputCol="UniqueCarrierVec")

indexer_origin = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
encoder_origin = OneHotEncoder(inputCol="OriginIndex", outputCol="OriginVec")

indexer_dest = StringIndexer(inputCol="Dest", outputCol="DestIndex")
encoder_dest = OneHotEncoder(inputCol="DestIndex", outputCol="DestVec")

# Create a pipeline to ensure transformations are sequential
pipeline = Pipeline(stages=[
    indexer_carrier, encoder_carrier,
    indexer_origin, encoder_origin,
    indexer_dest, encoder_dest
])

# Fit and transform the pipeline on the balanced dataset
model = pipeline.fit(balanced_airline_df)
balanced_airline_df = model.transform(balanced_airline_df)

In [None]:
# Numeric columns plus the one-hot vectors that make up the model input
feature_inputs = ["hour", "Distance", "UniqueCarrierVec", "OriginVec", "DestVec"]

# Pack them into a single 'features' vector column
assembler = VectorAssembler(outputCol="features", inputCols=feature_inputs)
balanced_airline_df = assembler.transform(balanced_airline_df)

In [None]:
# List every column now present, including the indexer, encoder and assembler outputs.
print(balanced_airline_df.columns)

['Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest', 'Distance', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'DepDelayed', 'hour', 'UniqueCarrierIndex', 'UniqueCarrierVec', 'OriginIndex', 'OriginVec', 'DestIndex', 'DestVec', 'features']


In [None]:
# Random split with weights 0.8 (train) and 0.2 (test); the fixed seed keeps it repeatable.
train_data, test_data = balanced_airline_df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Random forest of 50 trees that predicts the DepDelayed label from the
# assembled 'features' vector; seeded so repeated fits are comparable.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="DepDelayed",
    numTrees=50,
    seed=42
)

In [None]:
# Fit the random forest on the training split.
# Note: this rebinds `model`, which until now held the fitted encoding pipeline.
model = rf.fit(train_data)

In [None]:
def calculate_metrics(predictions):
    """Compute precision, recall and F1 for the delayed class (label 1).

    Parameters
    ----------
    predictions : DataFrame
        Must support ``groupBy("DepDelayed", "prediction").count().toPandas()``,
        producing one row per (label, prediction) pair with a ``count`` column.

    Returns
    -------
    tuple
        ``(precision, recall, f1_score)``; each value falls back to ``0.0``
        when its denominator is zero.
    """
    confusion = predictions.groupBy("DepDelayed", "prediction").count().toPandas()
    labels = confusion["DepDelayed"]
    guesses = confusion["prediction"]

    def tally(label, guess):
        # Sum of `count` over the rows matching this (label, prediction) pair
        return confusion.loc[(labels == label) & (guesses == guess), "count"].sum()

    TP = tally(1, 1)
    FP = tally(0, 1)
    FN = tally(1, 0)

    predicted_positive = TP + FP
    actual_positive = TP + FN

    if predicted_positive > 0:
        precision = TP / predicted_positive
    else:
        precision = 0.0

    if actual_positive > 0:
        recall = TP / actual_positive
    else:
        recall = 0.0

    if (precision + recall) > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)
    else:
        f1_score = 0.0

    return precision, recall, f1_score

In [None]:
# Score the held-out split with the fitted model
predictions = model.transform(test_data)

# Ranking quality: area under the ROC curve, computed from the raw scores
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol="DepDelayed",
)
auc = evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc}")

# Threshold-based quality: precision / recall / F1 for the delayed class
precision, recall, f1_score = calculate_metrics(predictions)
print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}")

# Confusion counts behind the figures above
predictions.groupBy("DepDelayed", "prediction").count().show()


AUC-ROC: 0.6730430321524823
Precision: 0.6109722321124058, Recall: 0.7104431689916805, F1-Score: 0.6569638107519948
+----------+----------+------+
|DepDelayed|prediction| count|
+----------+----------+------+
|         1|       0.0| 66442|
|         0|       0.0|125157|
|         1|       1.0|163019|
|         0|       1.0|103800|
+----------+----------+------+



The Random Forest accuracy on the test split is approximately 62.87%, i.e. (125157 + 163019) correct predictions out of 458418 test rows.




In [None]:
# Fit on the training split and score the held-out split, the same protocol the
# random forest uses. Scoring the rows the model was fitted on would overstate
# its quality and make the models incomparable.
lr = LogisticRegression(labelCol="DepDelayed", featuresCol="features")
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

# Evaluate
lr_auc = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression AUC-ROC: {lr_auc}")
precision, recall, f1_score = calculate_metrics(lr_predictions)
print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}")
lr_predictions.groupBy("DepDelayed", "prediction").count().show()


Logistic Regression AUC-ROC: 0.6858332470556392
Precision: 0.634198924758563, Recall: 0.6508616784785957, F1-Score: 0.6424222727206289
+----------+----------+------+
|DepDelayed|prediction| count|
+----------+----------+------+
|         1|       0.0|400808|
|         0|       0.0|715275|
|         1|       1.0|747184|
|         0|       1.0|430970|
+----------+----------+------+



In [None]:
# Fit on the training split and score the held-out split, the same protocol the
# random forest uses. Scoring the rows the tree was fitted on would overstate
# its quality and make the models incomparable.
dt = DecisionTreeClassifier(labelCol="DepDelayed", featuresCol="features", maxDepth=5)
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)

# Evaluate
dt_auc = evaluator.evaluate(dt_predictions)
print(f"Decision Tree AUC-ROC: {dt_auc}")
precision, recall, f1_score = calculate_metrics(dt_predictions)
print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}")
dt_predictions.groupBy("DepDelayed", "prediction").count().show()


Decision Tree AUC-ROC: 0.5332783605705813
Precision: 0.6049811413420918, Recall: 0.6867295242475557, F1-Score: 0.6432685313207461
+----------+----------+------+
|DepDelayed|prediction| count|
+----------+----------+------+
|         1|       0.0|359632|
|         0|       0.0|631490|
|         1|       1.0|788360|
|         0|       1.0|514755|
+----------+----------+------+



In [None]:

# NOTE(review): this cell rebinds spark, assembler, pipeline, train_data,
# test_data, lr, evaluator and predictions, all of which earlier cells also
# define; re-running an earlier cell after this one picks up these objects.
spark = SparkSession.builder \
    .appName("Logistic Regression with Hyperparameter Tuning") \
    .getOrCreate()

# Reload the CSV, this time using its header row for names and inferring column types.
data = spark.read.csv("cleaned_dataset.csv", header=True, inferSchema=True)

# Create a binary label column (1: Delayed > 15 mins, 0: Not Delayed)
# This overwrites the raw DEP_DELAY minutes with the 0/1 label.
data = data.withColumn("DEP_DELAY", when(col("DEP_DELAY") > 15, 1).otherwise(0))

# Map each categorical string column to a numeric index.
# NOTE(review): these indices enter the feature vector as plain numbers (there
# is no one-hot step here, unlike the earlier pipeline) - confirm that is
# intended for a linear model.
carrier_indexer = StringIndexer(inputCol="OP_UNIQUE_CARRIER", outputCol="CarrierIndex", handleInvalid="skip")
origin_indexer = StringIndexer(inputCol="ORIGIN", outputCol="OriginIndex", handleInvalid="skip")
dest_indexer = StringIndexer(inputCol="DEST", outputCol="DestIndex", handleInvalid="skip")

# Assemble feature vector
feature_columns = ['MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'CRS_DEP_TIME',
                   'CarrierIndex', 'OriginIndex', 'DestIndex', 'DISTANCE']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Indexers first, then the assembler that consumes their output columns.
pipeline = Pipeline(stages=[carrier_indexer, origin_indexer, dest_indexer, assembler])

data = pipeline.fit(data).transform(data)

train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Adding class weights (to handle class imbalance)
# Delayed rows (label 1) get weight 1.5, all other rows weight 1.
train_data = train_data.withColumn(
    "classWeight",
    when(col("DEP_DELAY") == 1, 1.5).otherwise(1)
)

# Base estimator; regParam, elasticNetParam and maxIter are overridden by the grid below.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="DEP_DELAY",
    weightCol="classWeight",
    regParam=0.1,
    elasticNetParam=0.5,
    maxIter=50
)

# Define Hyperparameter Grid (3 x 3 x 3 = 27 combinations)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 50, 100]) \
    .build()

# Define Cross-Validator
evaluator = BinaryClassificationEvaluator(
    labelCol="DEP_DELAY",
    metricName="areaUnderROC"
)
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42  # fix the fold assignment so the selected parameters are reproducible
)

print("Starting model training...")
cv_model = crossval.fit(train_data)
print("Model training completed.")

# Evaluate Best Model
best_model = cv_model.bestModel
# Read the winning parameters through the model's public getters rather than
# reaching into the private _java_obj handle.
print(f"Best Model Parameters: regParam={best_model.getRegParam()}, "
      f"elasticNetParam={best_model.getElasticNetParam()}, "
      f"maxIter={best_model.getMaxIter()}")

# Score the held-out split, which took no part in cross-validation.
predictions = best_model.transform(test_data)

# Evaluate Predictions
auc_roc = evaluator.evaluate(predictions)
print(f"Test AUC-ROC: {auc_roc:.4f}")

# Persist the selected model, replacing any previous save at this path.
best_model.write().overwrite().save("logistic_regression_best_model")


Starting model training...
Model training completed.
Best Model Parameters: regParam=0.01, elasticNetParam=0.0, maxIter=100
Test AUC-ROC: 0.6539
