In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
from pyspark.sql import SparkSession

# Start a Spark session 
spark = SparkSession.builder.appName("FlightDelayPrediction").getOrCreate()

# Load the flight delay dataset from a CSV file, including headers and inferring data types
df = spark.read.csv("/content/drive/MyDrive/flight_data_with_delay.csv", header=True, inferSchema=True)

# Print the structure and data types of each column in the dataset
df.printSchema()

# Display the first 10 rows of data to get an initial view of the dataset
df.show(10)

# Calculate and print the total number of rows in the dataset
total_rows = df.count()
print(f"Total number of rows: {total_rows}")


root
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepDel15: double (nullable = true)
 |-- delayed: integer (nullable = true)

+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|Month|DayOfWeek|DepDel15|delayed|
+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|Commutair Aka Cha...|   GJT| DEN|    false|   false|      1133| 1123.0|    

In [4]:
from pyspark.sql.functions import col, when, year, month, dayofweek
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

# Count the number of missing (null) values in each column
missing_values = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the missing values count for each column
missing_values.show()

+-------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|Month|DayOfWeek|DepDel15|delayed|
+-------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|      0|     0|   0|        0|       0|         0| 120433|         120495|  120495|    0|        0|  120495|      0|
+-------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+



In [5]:
# Fill missing values for specific columns
df = df.fillna({
    "DepDelayMinutes": 0,  # Fill with 0 for delay-related columns
    "DepDelay": 0,         # Fill with 0 for DepDelay
    "DepDel15": 0,         # Fill with 0 for DepDel15
    "delayed": 0           # Fill with 0 for delayed column
})


In [6]:
# Check for missing values again after handling them
missing_values_after = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
missing_values_after.show()

+-------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|Month|DayOfWeek|DepDel15|delayed|
+-------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|      0|     0|   0|        0|       0|         0| 120433|              0|       0|    0|        0|       0|      0|
+-------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+



In [7]:
df_cleaned = df.dropna(subset=["depTime"])

In [8]:
df_cleaned.show()

+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|Month|DayOfWeek|DepDel15|delayed|
+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-----+---------+--------+-------+
|Commutair Aka Cha...|   GJT| DEN|    false|   false|      1133| 1123.0|            0.0|   -10.0|    4|        1|     0.0|      0|
|Commutair Aka Cha...|   HRL| IAH|    false|   false|       732|  728.0|            0.0|    -4.0|    4|        1|     0.0|      0|
|Commutair Aka Cha...|   DRO| DEN|    false|   false|      1529| 1514.0|            0.0|   -15.0|    4|        1|     0.0|      0|
|Commutair Aka Cha...|   IAH| GPT|    false|   false|      1435| 1430.0|            0.0|    -5.0|    4|        1|     0.0|      0|
|Commutair Aka Cha...|   DRO| DEN|    false|   false|      1135| 1135.0|           

In [9]:
# 2. Dealing with duplicates
df_cleaned = df_cleaned.dropDuplicates()

In [10]:
# 3. Data type conversion
df_cleaned = df_cleaned.withColumn("CRSDepTime", col("CRSDepTime").cast("timestamp"))
df_cleaned = df_cleaned.withColumn("DepTime", col("DepTime").cast("timestamp"))
df_cleaned = df_cleaned.withColumn("Month", col("Month").cast(IntegerType()))
df_cleaned = df_cleaned.withColumn("DayOfWeek", col("DayOfWeek").cast(IntegerType()))

In [11]:
# 4. Filtering out invalid rows (e.g., rows with negative or zero departure delays)
df_cleaned = df_cleaned.filter((col("DepDelayMinutes") >= 0) & (col("DepDelayMinutes").isNotNull()))

In [12]:
# 5. Normalization or scaling
# For example, normalizing the 'DepDelayMinutes' column
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Assemble the features into a vector
assembler = VectorAssembler(inputCols=["DepDelayMinutes"], outputCol="features")
df_featured = assembler.transform(df_cleaned)

scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_featured)
df_scaled = scaler_model.transform(df_featured)

# Show transformed dataset
df_scaled.show(10)


+--------------------+------+----+---------+--------+-------------------+-------------------+---------------+--------+-----+---------+--------+-------+--------+--------------------+
|             Airline|Origin|Dest|Cancelled|Diverted|         CRSDepTime|            DepTime|DepDelayMinutes|DepDelay|Month|DayOfWeek|DepDel15|delayed|features|     scaled_features|
+--------------------+------+----+---------+--------+-------------------+-------------------+---------------+--------+-----+---------+--------+-------+--------+--------------------+
|GoJet Airlines, L...|   IAD| RIC|    false|   false|1970-01-01 00:20:34|1970-01-01 00:20:27|            0.0|    -7.0|    4|        7|     0.0|      0|   [0.0]|               [0.0]|
|Commutair Aka Cha...|   HRL| IAH|    false|   false|1970-01-01 00:20:09|1970-01-01 00:21:48|           59.0|    59.0|    4|        7|     1.0|      1|  [59.0]|[0.00816835110065...|
|Commutair Aka Cha...|   IAH| MEM|    false|   false|1970-01-01 00:27:01|1970-01-01 00:28:

**DATA ANALYSIS USING SQL**

In [13]:
# creating DataFrame as a temporary SQL view
df_cleaned.createOrReplaceTempView("flights")

# 1. Aggregation: Summary statistics
agg_df = spark.sql("""
    SELECT
        AVG(DepDelayMinutes) AS avg_depdelay,
        STDDEV(DepDelayMinutes) AS stddev_depdelay,
        MIN(DepDelayMinutes) AS min_depdelay,
        MAX(DepDelayMinutes) AS max_depdelay
    FROM flights
""")
agg_df.show()
avg_delay = agg_df.collect()[0]['avg_depdelay']
print(f"1. What is the average departure delay?")
print(f"Answer: {avg_delay}")

+------------------+------------------+------------+------------+
|      avg_depdelay|   stddev_depdelay|min_depdelay|max_depdelay|
+------------------+------------------+------------+------------+
|16.951034000982784|53.757603044003645|         0.0|      7223.0|
+------------------+------------------+------------+------------+

1. What is the average departure delay?
Answer: 16.951034000982784


**Observation:** The average departure delay across all flights in the dataset is approximately 16.95 minutes. This means that, on average, flights were delayed by about 17 minutes

In [14]:
# 2. Grouping and filtering: Example of grouping by airline and calculating the average delay
grouped_df = spark.sql("""
    SELECT airline, AVG(DepDelayMinutes) AS avg_delay
    FROM flights
    GROUP BY airline
    ORDER BY avg_delay DESC
""")
grouped_df.show()

top_airline = grouped_df.collect()[0]['airline']
top_airline_avg_delay = grouped_df.collect()[0]['avg_delay']
print(f"2. What airline has the highest average departure delay?")
print(f"Answer: {top_airline}, {top_airline_avg_delay}")

+--------------------+------------------+
|             airline|         avg_delay|
+--------------------+------------------+
|     JetBlue Airways|29.915710107336995|
|Frontier Airlines...|24.323954232283466|
|       Allegiant Air|23.040772532188843|
|GoJet Airlines, L...|21.326688562973562|
|  Mesa Airlines Inc.|20.078229033105217|
|    Spirit Air Lines| 19.19650936216831|
|American Airlines...| 18.84348847387083|
|         Comair Inc.| 18.01541812486234|
|Commutair Aka Cha...|17.379058502049435|
|United Air Lines ...| 17.16639744971648|
|Southwest Airline...| 16.70069956317939|
|SkyWest Airlines ...| 16.07441887129177|
|   Republic Airlines|15.822337686014448|
|   Endeavor Air Inc.|14.790233300229556|
|Delta Air Lines Inc.|14.751198668051984|
|Air Wisconsin Air...|13.917513106159895|
|Capital Cargo Int...|  13.1522756273926|
|           Envoy Air|11.032422414365987|
|Alaska Airlines Inc.|10.541964522107493|
|         Horizon Air| 9.408613063209696|
+--------------------+------------

**Observation:** JetBlue Airways has the highest average departure delay, with flights being delayed by an average of approximately 29.92 minutes.

In [15]:
# 3. Time-based analysis: Example analysis over months
time_analysis_df = spark.sql("""
    SELECT Month, AVG(DepDelayMinutes) AS avg_delay
    FROM flights
    GROUP BY Month
    ORDER BY avg_delay DESC
""")
time_analysis_df.show()

# Find the month with the highest average delay
highest_delay_row = time_analysis_df.collect()[0]
month_with_highest_delay = highest_delay_row['Month']
highest_month_avg_delay = highest_delay_row['avg_delay']

# Print
print(f"3. How do delays vary by month?")
print(f"Answer: Month {month_with_highest_delay}, {highest_month_avg_delay}")


+-----+------------------+
|Month|         avg_delay|
+-----+------------------+
|    6| 19.23793084789784|
|    7|18.828658075634678|
|    4| 17.02915422343938|
|    5|16.218215708727833|
|    3|16.128246709076365|
|    1| 15.48457275939608|
|    2|15.274601887034112|
+-----+------------------+

3. How do delays vary by month?
Answer: Month 6, 19.23793084789784


**Observation:** The month with the highest average departure delay is June (Month 6), with an average delay of approximately 19.24 minutes.

In [16]:
# 4. Count the number of canceled flights by airline
canceled_flights_df = spark.sql("""
    SELECT airline, COUNT(*) AS canceled_flights
    FROM flights
    WHERE cancelled = 1
    GROUP BY airline
    ORDER BY canceled_flights DESC
""")
canceled_flights_df.show()

top_airline_canceled = canceled_flights_df.collect()[0]['airline']
top_airline_canceled_count = canceled_flights_df.collect()[0]['canceled_flights']
print(f"4. Which airline has the most canceled flights?")
print(f"Answer: {top_airline_canceled}, {top_airline_canceled_count}")


+--------------------+----------------+
|             airline|canceled_flights|
+--------------------+----------------+
|American Airlines...|             481|
|   Republic Airlines|             343|
|         Comair Inc.|             192|
|Delta Air Lines Inc.|             187|
|     JetBlue Airways|             181|
|SkyWest Airlines ...|             180|
|United Air Lines ...|             163|
|   Endeavor Air Inc.|             137|
|           Envoy Air|             132|
|Southwest Airline...|             126|
|Alaska Airlines Inc.|             103|
|    Spirit Air Lines|             102|
|       Allegiant Air|              83|
|Frontier Airlines...|              78|
|         Horizon Air|              63|
|Capital Cargo Int...|              61|
|GoJet Airlines, L...|              49|
|  Mesa Airlines Inc.|              40|
|Air Wisconsin Air...|              28|
|Commutair Aka Cha...|              28|
+--------------------+----------------+
only showing top 20 rows

4. Which airli

**Observation:** The dataset shows that American Airlines Inc. had the highest number of canceled flights, totaling 481.

In [17]:
# 5. Average departure delay for diverted flights
diverted_flights_delay_df = spark.sql("""
    SELECT AVG(DepDelayMinutes) AS avg_delay_diverted
    FROM flights
    WHERE diverted = 1
""")
diverted_flights_delay_df.show()

# Print question and answer for diverted flights
avg_delay_diverted = diverted_flights_delay_df.collect()[0]['avg_delay_diverted']
print(f"5. What is the average departure delay for diverted flights?")
print(f"Answer: {avg_delay_diverted}")


+------------------+
|avg_delay_diverted|
+------------------+
|30.967071736573892|
+------------------+

5. What is the average departure delay for diverted flights?
Answer: 30.967071736573892


**Observation:** The average departure delay for diverted flights is approximately 30.97 minutes. This indicates that, on average, diverted flights experience significant delays.

**Model Training (Classification)**

In [18]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline


In [19]:
from pyspark.sql.functions import hour, minute, col
from pyspark.ml.feature import VectorAssembler

# Convert CRSDepTime and DepTime columns to hour and minute
df_cleaned = df_cleaned.withColumn("CRSDepHour", hour(col("CRSDepTime"))) \
                       .withColumn("CRSDepMinute", minute(col("CRSDepTime"))) \
                       .withColumn("DepHour", hour(col("DepTime"))) \
                       .withColumn("DepMinute", minute(col("DepTime")))

In [20]:
# Assemble features (selecting relevant columns)
assembler = VectorAssembler(inputCols=["Month", "DayOfWeek", "CRSDepHour", "CRSDepMinute", "DepHour", "DepMinute"], outputCol="features")
df_featured = assembler.transform(df_cleaned)

# Split data into training and testing sets
train_data, test_data = df_featured.randomSplit([0.8, 0.2], seed=42)

# Define the Logistic Regression model
lr = LogisticRegression(labelCol="delayed", featuresCol="features")

# Create the pipeline (no need to include assembler here, it was already applied)
pipeline = Pipeline(stages=[lr])

# Fit the model
model = pipeline.fit(train_data)

In [21]:
# Make predictions
predictions = model.transform(test_data)

In [23]:
# Evaluate the model using accuracy
correct_predictions = predictions.filter(predictions.delayed == predictions.prediction).count()
total_predictions = predictions.count()
accuracy = correct_predictions / total_predictions

print(f"Model Accuracy: {accuracy}")

Model Accuracy: 0.692447458863505


**Observation:** The model accuracy of 0.6924 (approximately 69.24%) indicates that about 69.24% of the predictions made by the model are correct, meaning that the model was able to correctly classify whether a flight would be delayed or not for almost 70% of the cases.

**Model Tuning and Evaluation**

In [25]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Define parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.5]) \
    .addGrid(lr.maxIter, [10, 50]) \
    .build()

# Cross-validation setup
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=BinaryClassificationEvaluator(labelCol="delayed"),
                          numFolds=3)

In [26]:
# Fit the model using cross-validation
cv_model = crossval.fit(train_data)


In [28]:
# Get predictions from the cross-validated model
cv_predictions = cv_model.transform(test_data)

# Calculate accuracy
correct_predictions = cv_predictions.filter(cv_predictions.delayed == cv_predictions.prediction).count()
total_predictions = cv_predictions.count()
accuracy = correct_predictions / total_predictions

# Print the accuracy
print(f"Model Accuracy: {accuracy}")

Model Accuracy: 0.6126538553856592


**Observation:** The Model Accuracy of 0.6127 means that approximately 61.27% of the predictions made by the model are correct. This suggests that the model performs moderately well, but there is still room for improvement.