<a href="https://colab.research.google.com/github/MounicaSrinivasan163/Flight-Delay-Prediction-PySpark-Big-Data-/blob/main/Flight_Delay_Prediction_PySpark_Big_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install a headless JDK 11 so Spark has a JVM to run on, then print the Java version as a check.
# The r2u apt source list is removed first (errors are ignored if it does not exist).
# NOTE(review): presumably removed because it interfered with `apt-get update` on Colab — confirm.
!sudo rm /etc/apt/sources.list.d/r2u.list 2>/dev/null || true
!sudo apt-get clean
!sudo apt-get update -y
!sudo apt-get install -y openjdk-11-jdk-headless
!java -version


0% [Working]            Hit:1 https://cli.github.com/packages stable InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 129 kB in 2s (84.1 kB/s)
Rea

In [None]:
# Download and unpack a standalone Spark 3.1.2 (Hadoop 3.2) distribution; SPARK_HOME is pointed at it below.
# NOTE(review): re-running this cell re-downloads the tarball (wget saves a ".1" copy); consider guarding it.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

# NOTE(review): `pyspark` is unpinned, so pip may provide a different Spark version than the 3.1.2
# tarball that SPARK_HOME points to — confirm the two match (or drop the tarball and rely on pip).
# NOTE(review): findspark is installed but findspark.init() is not called in any visible cell.
!pip install -q pyspark findspark


In [None]:
import os

# Point PySpark at the JDK installed via apt and at the unpacked Spark 3.1.2 distribution.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
    }
)


In [None]:
# 1. Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
# 2. Start Spark session
# getOrCreate() returns the already-running session if there is one, so re-running this cell is safe.
spark = (
    SparkSession.builder
    .appName("FlightDelayPrediction")
    .getOrCreate()
)
spark

In [None]:
# 3. Load dataset
# header=True takes column names from the first line; inferSchema=True costs an extra pass over
# the file to pick column types (the result is shown by printSchema below).
# NOTE(review): describe() reports one more non-null Year than every other column, which suggests a
# single partial/malformed row in the CSV — confirm (it is later removed by the label dropna).
df = spark.read.csv(
    path="/content/flightdelaydata.csv",
    header=True,
    inferSchema=True,
)
df.show(5)

+----+-----+----------+---------+-------+---------------+-------------+----------+--------+--------+----------+--------+--------+---------+
|Year|Month|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|CRSDepTime|DepDelay|DepDel15|CRSArrTime|ArrDelay|ArrDel15|Cancelled|
+----+-----+----------+---------+-------+---------------+-------------+----------+--------+--------+----------+--------+--------+---------+
|2013|    4|        19|        5|     DL|          11433|        13303|       837|      -3|       0|      1138|       1|       0|        0|
|2013|    4|        19|        5|     DL|          14869|        12478|      1705|       0|       0|      2336|      -8|       0|        0|
|2013|    4|        19|        5|     DL|          14057|        14869|       600|      -4|       0|       851|     -15|       0|        0|
|2013|    4|        19|        5|     DL|          15016|        11433|      1630|      28|       1|      1903|      24|       1|        0|
|2013|    4|        

In [None]:
# Inspect the column types Spark inferred from the CSV header and data.
df.printSchema()


root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- DepDel15: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- ArrDel15: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)



In [None]:
# Total number of rows loaded, including rows that contain nulls.
df.count()

674564

In [None]:
# Per-column summary statistics. The "count" row is the number of non-null values, so it exposes
# missing data (e.g. DepDelay and ArrDelay have fewer non-null values than the other columns).
df.describe().show()


+-------+------+-------------------+-----------------+-----------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|summary|  Year|              Month|       DayofMonth|        DayOfWeek|Carrier|   OriginAirportID|     DestAirportID|        CRSDepTime|          DepDelay|          DepDel15|        CRSArrTime|          ArrDelay|          ArrDel15|           Cancelled|
+-------+------+-------------------+-----------------+-----------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|  count|674564|             674563|           674563|           674563| 674563|            674563|            674563|            674563|            667279|            667279|            674563|            666963|            674563|      

In [None]:
from pyspark.sql.functions import col, when, substring, concat_ws

# Derived calendar / time / route features.
# NOTE(review): none of the columns created here are listed in numeric_cols / categorical_cols
# in the pipeline section, so they are currently not fed to the model.

# 1. Extract departure hour from CRSDepTime (e.g., 1345 → 13); null stays null.
df = df.withColumn("DepHour", (col("CRSDepTime") / 100).cast("int"))

# 2. Create time-of-day bucket. A missing departure hour is labelled "Unknown" explicitly;
#    without the first branch a null DepHour would silently fall through to "Night".
df = df.withColumn(
    "DepTimeBucket",
    when(col("DepHour").isNull(), "Unknown")
    .when((col("DepHour") >= 5) & (col("DepHour") < 12), "Morning")
    .when((col("DepHour") >= 12) & (col("DepHour") < 17), "Afternoon")
    .when((col("DepHour") >= 17) & (col("DepHour") < 21), "Evening")
    .otherwise("Night")
)

# 3. Weekend flag: DayOfWeek values 6 and 7 count as weekend (a null DayOfWeek gives 0).
df = df.withColumn(
    "IsWeekend",
    when((col("DayOfWeek") == 6) | (col("DayOfWeek") == 7), 1).otherwise(0)
)

# 4. Route (Origin–Dest pair), e.g. "11433_13303".
df = df.withColumn("Route", concat_ws("_", col("OriginAirportID"), col("DestAirportID")))

# 5. Bucket departure delay (minutes). Rows with a null DepDelay (~7.3k per describe()) are
#    "Unknown"; previously they fell through to .otherwise() and were labelled "HeavyDelay".
df = df.withColumn(
    "DepDelayBucket",
    when(col("DepDelay").isNull(), "Unknown")
    .when(col("DepDelay") <= 0, "OnTime")
    .when(col("DepDelay") <= 15, "SlightDelay")
    .otherwise("HeavyDelay")
)

df.select("CRSDepTime", "DepHour", "DepTimeBucket", "IsWeekend", "Route", "DepDelayBucket").show(10, False)


+----------+-------+-------------+---------+-----------+--------------+
|CRSDepTime|DepHour|DepTimeBucket|IsWeekend|Route      |DepDelayBucket|
+----------+-------+-------------+---------+-----------+--------------+
|837       |8      |Morning      |0        |11433_13303|OnTime        |
|1705      |17     |Evening      |0        |14869_12478|OnTime        |
|600       |6      |Morning      |0        |14057_14869|OnTime        |
|1630      |16     |Afternoon    |0        |15016_11433|HeavyDelay    |
|1615      |16     |Afternoon    |0        |11193_12892|OnTime        |
|1726      |17     |Evening      |0        |10397_15016|OnTime        |
|1900      |19     |Evening      |0        |15016_10397|OnTime        |
|2145      |21     |Night        |0        |10397_14869|SlightDelay   |
|2157      |21     |Night        |0        |10397_10423|HeavyDelay    |
|1900      |19     |Evening      |0        |11278_10397|HeavyDelay    |
+----------+-------+-------------+---------+-----------+--------

# PySpark ML Pipeline

In [None]:
# 4. Define features
# Raw numeric inputs, passed straight to the VectorAssembler.
numeric_cols = [
    "Month",
    "DayofMonth",
    "DayOfWeek",
    "CRSDepTime",
    # NOTE(review): DepDelay is only known once the flight has departed; using it is fine if
    # predictions are made after departure, otherwise it leaks information.
    "DepDelay",
]
# Columns that are string-indexed and one-hot encoded (airport IDs are treated as labels).
categorical_cols = [
    "Carrier",
    "OriginAirportID",
    "DestAirportID",
]
# Binary target column.
label_col = "ArrDel15"

In [None]:
# 5. Null handling
# Airport IDs are integers in the CSV; casting every categorical column to string lets
# StringIndexer treat the values as labels and lets them take the "Unknown" placeholder.
for c in categorical_cols:
    df = df.withColumn(c, col(c).cast("string"))

# One fillna pass: "Unknown" for categorical columns, 0 for numeric columns.
# NOTE(review): DepDelay is null for ~7.3k rows (see describe() above); filling 0 treats those
# flights as departing on time — confirm that is intended (they may be cancelled flights).
df = df.fillna({
    **{name: "Unknown" for name in categorical_cols},
    **{name: 0 for name in numeric_cols},
})

# Rows without a label cannot be used for training or evaluation.
df = df.dropna(subset=[label_col])

In [None]:
# 6. Split data
# 80/20 train/test split; the fixed seed keeps the split repeatable for the same input data.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# 7. Categorical encoding
# handleInvalid="keep" on both stages reserves an extra slot for categories that were not seen
# while fitting on train_df (e.g. an airport that only appears in test_df).
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_idx", handleInvalid="keep")
    for name in categorical_cols
]
ohe = OneHotEncoder(
    inputCols=[f"{name}_idx" for name in categorical_cols],
    outputCols=[f"{name}_ohe" for name in categorical_cols],
    handleInvalid="keep",
)

In [None]:
# 8. Assemble features
# Raw numeric columns first, then one one-hot vector per categorical column.
assembler_inputs = [*numeric_cols, *(f"{name}_ohe" for name in categorical_cols)]
# handleInvalid="skip" drops rows whose inputs contain null/NaN at transform time — silently,
# and on test_df too. After the null-filling step this should drop nothing.
# NOTE(review): "error" would surface unexpected nulls instead of quietly shrinking the test set.
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features",
    handleInvalid="skip",
)

In [None]:
# 9. Scale features
# Unit-variance scaling without mean-centering (centering would make the sparse one-hot vectors dense).
# NOTE(review): tree ensembles like the random forest below are insensitive to feature scaling,
# so this stage most likely does not change the model — consider dropping it.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)

In [None]:
# 10. Random Forest Classifier
# 50-tree forest trained on the scaled feature vector to predict label_col.
# seed is pinned (same value as the randomSplit seed) so bootstrap sampling and per-node feature
# subsampling are reproducible. PySpark's default seed is hash() of the class name, and Python
# salts str hashes per process unless PYTHONHASHSEED is set, so without an explicit seed the
# forest (and every metric reported below) can change after a kernel restart.
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol=label_col, numTrees=50, seed=42)


In [None]:
# 11. Build pipeline
# Stage order: string-index each categorical → one-hot encode → assemble → scale → random forest.
pipeline = Pipeline(stages=[*indexers, ohe, assembler, scaler, rf])

In [None]:
# 12. Train model
# Fits every pipeline stage on the training split. The result is a PipelineModel (not just the
# forest); its last stage is the trained random forest, as used in the feature-importance cell.
rf_model = pipeline.fit(train_df)

In [None]:
# 13. Predictions
# Score the held-out split with the fitted pipeline and preview the label next to the
# predicted class and class-probability vector.
preds = rf_model.transform(test_df)
preds.select(col(label_col), col("prediction"), col("probability")).show(n=10)

+--------+----------+--------------------+
|ArrDel15|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.85014713236954...|
|       0|       0.0|[0.85014713236954...|
|       0|       0.0|[0.85014713236954...|
|       0|       0.0|[0.84866576950919...|
|       0|       0.0|[0.84453782431030...|
|       0|       0.0|[0.84453782431030...|
|       1|       0.0|[0.84685220262571...|
|       0|       0.0|[0.84923401672453...|
|       1|       0.0|[0.84685220262571...|
|       0|       0.0|[0.84544577574716...|
+--------+----------+--------------------+
only showing top 10 rows



In [None]:
# 14. Evaluation
# preds is scored several times below; cache it so the pipeline transform is not recomputed per metric.
preds = preds.cache()

# Weighted metrics: per-class scores averaged by class support.
evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName="f1")
f1 = evaluator.evaluate(preds)
precision = evaluator.evaluate(preds, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(preds, {evaluator.metricName: "weightedRecall"})

print(f"F1-score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

# Weighted scores can look strong even when the minority class is rarely predicted (the preview in
# the previous cell shows prediction 0.0 for every row, including rows with ArrDel15 == 1), so also
# report precision/recall for the positive class and the threshold-free ROC AUC.
delayed_precision = evaluator.evaluate(
    preds, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1.0}
)
delayed_recall = evaluator.evaluate(
    preds, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1.0}
)
roc_auc = BinaryClassificationEvaluator(
    labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC"
).evaluate(preds)

print(
    f"Delayed class (label 1) -> Precision: {delayed_precision:.4f}, "
    f"Recall: {delayed_recall:.4f}; ROC AUC: {roc_auc:.4f}"
)

F1-score: 0.8283, Precision: 0.8638, Recall: 0.8549


In [None]:
# 15. Feature importance
rf_model_stages = rf_model.stages[-1]  # get trained RandomForest stage
importances = rf_model_stages.featureImportances

# The raw importance vector (164 slots in the run above) is unreadable on its own, so map each
# slot back to a name using the attribute metadata VectorAssembler attaches to "features".
# StandardScaler rescales element-wise, so slot i of "scaledFeatures" is slot i of "features".
# Slots without a recorded name fall back to "feature_<i>".
feature_attrs = preds.schema["features"].metadata.get("ml_attr", {}).get("attrs", {})
slot_names = {
    attr["idx"]: attr.get("name", f"feature_{attr['idx']}")
    for group in feature_attrs.values()
    for attr in group
}

top_n = 15
ranked = sorted(
    (
        (slot_names.get(i, f"feature_{i}"), float(weight))
        for i, weight in enumerate(importances.toArray())
        if weight > 0
    ),
    key=lambda pair: pair[1],
    reverse=True,
)
print(f"Feature importances (top {top_n} of {len(importances)} slots):")
for name, weight in ranked[:top_n]:
    print(f"  {name:<40} {weight:.4f}")

Feature importances: (164,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,18,19,20,22,23,24,26,27,30,31,32,33,34,35,36,37,39,41,42,44,45,47,48,50,52,53,58,59,62,67,68,70,72,74,75,76,77,81,84,87,91,93,94,96,97,98,99,100,101,102,103,105,106,107,109,110,112,113,115,116,117,118,121,122,126,127,128,131,133,135,136,141,142,143,145,155,156,159],[0.001128470074931612,0.028156970159298977,0.031324759525033594,0.0699202318191295,0.7122513570138483,9.092949370251746e-05,0.023850237827113974,0.009739523457629554,4.867665930797147e-05,0.02169771796307674,0.0002545456192731215,0.0001713176211335955,6.37937611137357e-05,0.00041681728895374975,2.9953046787085107e-05,0.0013404600899958852,0.024380084804031074,0.0001227617935102159,6.47649984415786e-05,6.067287362632376e-07,0.013350755640881154,0.012604042872634556,0.008109074683494075,0.0015212803663528485,0.0001308873232835138,2.1336687590661265e-05,8.32266782241622e-05,8.584861995006532e-06,4.892384387344494e-06,5.3711316453252425e-06,2.3530582834392355e-0