TASK 1 : Data Loading and Exploration (RDD & DataFrame): 

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FlightDataAnalysis") \
    .getOrCreate()


In [0]:
data = spark.read.csv("/FileStore/tables/flights_sample_3m.csv", header=True, inferSchema=True)  

In [0]:
data.show(5)

+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+--------------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|   FL_DATE|             AIRLINE|         AIRLINE_DOT|AIRLINE_CODE|DOT_CODE|FL_NUMBER|ORIGIN|        ORIGIN_CITY|DEST|           DEST_CITY|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|DELAY_DUE_CARRIER|DELAY_DUE_WEATHER|DELAY_DUE_NAS|DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+--------------------+------------+--------

In [0]:
#Convert DataFrame to RDD and Perform Operations

rdd = data.rdd
# Count the number of flights  
num_flights = rdd.count()
print(f"Total Flights: {num_flights}")

# Convert RDD back to DataFrame  
data_from_rdd = spark.createDataFrame(rdd, schema=data.schema)
data_from_rdd.show(5)

Total Flights: 3000000
+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+--------------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|   FL_DATE|             AIRLINE|         AIRLINE_DOT|AIRLINE_CODE|DOT_CODE|FL_NUMBER|ORIGIN|        ORIGIN_CITY|DEST|           DEST_CITY|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|DELAY_DUE_CARRIER|DELAY_DUE_WEATHER|DELAY_DUE_NAS|DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+-------------------

In [0]:
data.columns

Out[6]: ['FL_DATE',
 'AIRLINE',
 'AIRLINE_DOT',
 'AIRLINE_CODE',
 'DOT_CODE',
 'FL_NUMBER',
 'ORIGIN',
 'ORIGIN_CITY',
 'DEST',
 'DEST_CITY',
 'CRS_DEP_TIME',
 'DEP_TIME',
 'DEP_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'ARR_TIME',
 'ARR_DELAY',
 'CANCELLED',
 'CANCELLATION_CODE',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'DELAY_DUE_CARRIER',
 'DELAY_DUE_WEATHER',
 'DELAY_DUE_NAS',
 'DELAY_DUE_SECURITY',
 'DELAY_DUE_LATE_AIRCRAFT']

In [0]:
data.printSchema()

root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)

2. Data Cleaning and Transformation (DataFrame)

In [0]:
#Handle Missing Values
from pyspark.sql.functions import col

data = data.fillna({"DEP_DELAY": 0, "ARR_DELAY": 0})

# Verify if missing values are replaced
data.select("DEP_DELAY", "ARR_DELAY").show(5)


+---------+---------+
|DEP_DELAY|ARR_DELAY|
+---------+---------+
|     -4.0|    -14.0|
|     -6.0|     -5.0|
|      6.0|      0.0|
|     -1.0|     24.0|
|     -2.0|     -1.0|
+---------+---------+
only showing top 5 rows



In [0]:
#Convert FL_DATE to Date Type
from pyspark.sql.functions import to_date

data = data.withColumn("FL_DATE", to_date(col("FL_DATE"), "yyyy-MM-dd"))

# Verify the data type
data.printSchema()


root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = false)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = false)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = tru

In [0]:
#Create a New Column TotalDelay
from pyspark.sql.functions import expr

data = data.withColumn("TotalDelay", expr("DEP_DELAY + ARR_DELAY"))

# Show result
data.select("DEP_DELAY", "ARR_DELAY", "TotalDelay").show(5)


+---------+---------+----------+
|DEP_DELAY|ARR_DELAY|TotalDelay|
+---------+---------+----------+
|     -4.0|    -14.0|     -18.0|
|     -6.0|     -5.0|     -11.0|
|      6.0|      0.0|       6.0|
|     -1.0|     24.0|      23.0|
|     -2.0|     -1.0|      -3.0|
+---------+---------+----------+
only showing top 5 rows



In [0]:
#Remove flights where CANCELLED = 1
data = data.filter(col("CANCELLED") == 0)

# Verify cancellation filter
data.select("FL_DATE", "AIRLINE", "CANCELLED").show(5)


+----------+--------------------+---------+
|   FL_DATE|             AIRLINE|CANCELLED|
+----------+--------------------+---------+
|2019-01-09|United Air Lines ...|      0.0|
|2022-11-19|Delta Air Lines Inc.|      0.0|
|2022-07-22|United Air Lines ...|      0.0|
|2023-03-06|Delta Air Lines Inc.|      0.0|
|2020-02-23|    Spirit Air Lines|      0.0|
+----------+--------------------+---------+
only showing top 5 rows



In [0]:
#Remove flights where CANCELLED = 1
data = data.filter(col("CANCELLED") == 0)

data.select("FL_DATE", "AIRLINE", "CANCELLED").show(5)


+----------+--------------------+---------+
|   FL_DATE|             AIRLINE|CANCELLED|
+----------+--------------------+---------+
|2019-01-09|United Air Lines ...|      0.0|
|2022-11-19|Delta Air Lines Inc.|      0.0|
|2022-07-22|United Air Lines ...|      0.0|
|2023-03-06|Delta Air Lines Inc.|      0.0|
|2020-02-23|    Spirit Air Lines|      0.0|
+----------+--------------------+---------+
only showing top 5 rows



In [0]:
data.write.mode("overwrite").csv("/FileStore/tables/cleaned_flight_data.csv", header=True)


3. Data Analysis and Aggregation (DataFrame): 

In [0]:
# Calculate the Average Departure Delay per Airline (Sorted Descending)
from pyspark.sql.functions import avg, desc

avg_dep_delay = data.groupBy("AIRLINE") \
                  .agg(avg("DEP_DELAY").alias("Avg_Dep_Delay")) \
                  .orderBy(desc("Avg_Dep_Delay"))

# Show the result
avg_dep_delay.show()


+--------------------+------------------+
|             AIRLINE|     Avg_Dep_Delay|
+--------------------+------------------+
|     JetBlue Airways| 18.24340421656573|
|Frontier Airlines...|15.997834394904459|
|       Allegiant Air|13.904974679773607|
|    Spirit Air Lines|12.954614075097163|
|ExpressJet Airlin...|12.773196448390676|
|American Airlines...|12.569004752833834|
|  Mesa Airlines Inc.|12.227925094589633|
|United Air Lines ...|11.184855081777577|
|Southwest Airline...|10.808051992351954|
|SkyWest Airlines ...|  9.44232005523941|
|Delta Air Lines Inc.|  8.09660455688658|
|   PSA Airlines Inc.| 7.913136512159153|
|           Envoy Air| 6.672530032391624|
|   Endeavor Air Inc.| 5.921167631213148|
|    Republic Airline| 5.742866222257531|
|Hawaiian Airlines...| 5.086332976107924|
|         Horizon Air| 4.804442250740375|
|Alaska Airlines Inc.| 4.622522403661717|
+--------------------+------------------+



In [0]:
#Find the Top 5 Most Frequent Origin-Destination Pairs
from pyspark.sql.functions import count

top_routes = data.groupBy("ORIGIN", "DEST") \
               .agg(count("*").alias("Flight_Count")) \
               .orderBy(desc("Flight_Count")) \
               .limit(5)

# Show the result
top_routes.show()

+------+----+------------+
|ORIGIN|DEST|Flight_Count|
+------+----+------------+
|   SFO| LAX|        5219|
|   LAX| SFO|        5078|
|   OGG| HNL|        4589|
|   LAX| LAS|        4543|
|   LGA| ORD|        4516|
+------+----+------------+



In [0]:
#Calculate the Percentage of Flights Delayed More than 15 Minutes
from pyspark.sql.functions import count

total_flights = data.count()
delayed_flights = data.filter(col("DEP_DELAY") > 15).count()

# Calculate percentage
percentage_delayed = (delayed_flights / total_flights) * 100

print(f"Percentage of flights delayed more than 15 minutes: {percentage_delayed:.2f}%")

Percentage of flights delayed more than 15 minutes: 17.56%


In [0]:
#Calculate the Average Arrival Delay per Destination Airport
avg_arr_delay = data.groupBy("DEST") \
                  .agg(avg("ARR_DELAY").alias("Avg_Arr_Delay")) \
                  .orderBy(desc("Avg_Arr_Delay"))

# Show the result
avg_arr_delay.show(10)

+----+------------------+
|DEST|     Avg_Arr_Delay|
+----+------------------+
| IPT| 72.27272727272727|
| UIN| 30.87272727272727|
| SHD|29.890196078431373|
| PVU|26.567251461988302|
| CDB| 21.41176470588235|
| OGS| 20.59550561797753|
| ILG|20.307692307692307|
| PGD|20.253991830672113|
| BQN|20.074944071588366|
| ART|              17.5|
+----+------------------+
only showing top 10 rows



4. Advanced Analysis (DataFrame): 

In [0]:
#Calculate Running Average of TotalDelay for Each Airline
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Define a window partitioned by AIRLINE and ordered by FL_DATE
window_spec = Window.partitionBy("AIRLINE").orderBy("FL_DATE").rowsBetween(-3, 0)

# Calculate the running average (last 4 days including current)
data = data.withColumn("Running_Avg_TotalDelay", avg("TotalDelay").over(window_spec))

# Show results
data.select("FL_DATE", "AIRLINE", "TotalDelay", "Running_Avg_TotalDelay").show(10)


+----------+-------------+----------+----------------------+
|   FL_DATE|      AIRLINE|TotalDelay|Running_Avg_TotalDelay|
+----------+-------------+----------+----------------------+
|2019-01-01|Allegiant Air|     196.0|                 196.0|
|2019-01-01|Allegiant Air|     -14.0|                  91.0|
|2019-01-01|Allegiant Air|      13.0|                  65.0|
|2019-01-01|Allegiant Air|      -5.0|                  47.5|
|2019-01-01|Allegiant Air|       5.0|                 -0.25|
|2019-01-01|Allegiant Air|      34.0|                 11.75|
|2019-01-01|Allegiant Air|       7.0|                 10.25|
|2019-01-01|Allegiant Air|     -53.0|                 -1.75|
|2019-01-01|Allegiant Air|     -38.0|                 -12.5|
|2019-01-01|Allegiant Air|     -45.0|                -32.25|
+----------+-------------+----------+----------------------+
only showing top 10 rows



In [0]:

#Find the Airline with the Most Flights Each Month

from pyspark.sql.functions import month, row_number
from pyspark.sql.window import Window

# Extract the month from the FlightDate
data = data.withColumn("Month", month("FL_DATE"))

# Group by Month & Airline, count flights
monthly_flight_counts = data.groupBy("Month", "AIRLINE") \
                          .count() \
                          .withColumnRenamed("count", "Flight_Count")

# Define window partitioned by month, ordered by flight count (descending)
window_spec = Window.partitionBy("Month").orderBy(col("Flight_Count").desc())

# Get top airline per month
top_airlines = monthly_flight_counts.withColumn("rank", row_number().over(window_spec)) \
                                    .filter(col("rank") == 1) \
                                    .drop("rank")

# Show results
top_airlines.show()


+-----+--------------------+------------+
|Month|             AIRLINE|Flight_Count|
+-----+--------------------+------------+
|    1|Southwest Airline...|       48856|
|    2|Southwest Airline...|       43921|
|    3|Southwest Airline...|       52026|
|    4|Southwest Airline...|       46211|
|    5|Southwest Airline...|       48179|
|    6|Southwest Airline...|       50913|
|    7|Southwest Airline...|       55687|
|    8|Southwest Airline...|       54918|
|    9|Southwest Airline...|       38416|
|   10|Southwest Airline...|       40195|
|   11|Southwest Airline...|       39315|
|   12|Southwest Airline...|       38368|
+-----+--------------------+------------+



5. Saving Results (DataFrame): 


In [0]:
from pyspark.sql.functions import avg

avg_dep_delay = data.groupBy("AIRLINE") \
                  .agg(avg("DEP_DELAY").alias("Avg_Dep_Delay"))

# Show results
avg_dep_delay.show()


+--------------------+------------------+
|             AIRLINE|     Avg_Dep_Delay|
+--------------------+------------------+
|   Endeavor Air Inc.| 5.921167631213148|
|       Allegiant Air|13.904974679773607|
|ExpressJet Airlin...|12.773196448390676|
|SkyWest Airlines ...|  9.44232005523941|
|   PSA Airlines Inc.| 7.913136512159153|
|         Horizon Air| 4.804442250740375|
|United Air Lines ...|11.184855081777577|
|    Republic Airline| 5.742866222257531|
|Frontier Airlines...|15.997834394904459|
|Southwest Airline...|10.808051992351954|
|     JetBlue Airways| 18.24340421656573|
|           Envoy Air| 6.672530032391624|
|Hawaiian Airlines...| 5.086332976107924|
|Alaska Airlines Inc.| 4.622522403661717|
|Delta Air Lines Inc.|  8.09660455688658|
|  Mesa Airlines Inc.|12.227925094589633|
|American Airlines...|12.569004752833834|
|    Spirit Air Lines|12.954614075097163|
+--------------------+------------------+



In [0]:
output_path = "/FileStore/tables/avg_dep_delay_by_airline.csv"

avg_dep_delay.write.mode("overwrite").csv(output_path, header=True)

6. SQL