
# Ex2 - Big Data Engineering - PySpark Assignment
By: **_GROUP 2_ - Oshri Mandelawi, Ofek Shaharabani & Idan Kanat** - 3.7.2025

**Author:** Hadar Engel

##### Imports:

In [0]:
# Relevant imports:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, asc, desc
from pyspark.sql.window import Window

In [0]:
# Reuse the active SparkSession if one already exists; otherwise build a new one
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

##### Loading the dataset:

In [0]:
# Load the flights sample table from the catalog as a Spark DataFrame
df = spark.read.table("workspace.default.flights_sample")

# Inspect column names / types, then preview the first 10 rows
df.printSchema()
display(df.limit(10))

root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: long (nullable = true)
 |-- FL_NUMBER: long (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: long (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: long (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ELAPSE

FL_DATE,AIRLINE,AIRLINE_DOT,AIRLINE_CODE,DOT_CODE,FL_NUMBER,ORIGIN,ORIGIN_CITY,DEST,DEST_CITY,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,DELAY_DUE_CARRIER,DELAY_DUE_WEATHER,DELAY_DUE_NAS,DELAY_DUE_SECURITY,DELAY_DUE_LATE_AIRCRAFT
2023-03-23,Delta Air Lines Inc.,Delta Air Lines Inc.: DL,DL,19790,2788,ATL,"Atlanta, GA",BWI,"Baltimore, MD",845,842.0,-3.0,21.0,903.0,1025.0,7.0,1035,1032.0,-3.0,0.0,,0.0,110.0,110.0,82.0,577.0,,,,,
2019-11-15,American Airlines Inc.,American Airlines Inc.: AA,AA,19805,2640,DFW,"Dallas/Fort Worth, TX",PBI,"West Palm Beach/Palm Beach, FL",1440,1441.0,1.0,14.0,1455.0,1814.0,5.0,1815,1819.0,4.0,0.0,,0.0,155.0,158.0,139.0,1102.0,,,,,
2020-10-12,Envoy Air,Envoy Air: MQ,MQ,20398,4072,MCI,"Kansas City, MO",DFW,"Dallas/Fort Worth, TX",1545,1538.0,-7.0,11.0,1549.0,1703.0,13.0,1728,1716.0,-12.0,0.0,,0.0,103.0,98.0,74.0,460.0,,,,,
2022-05-04,Southwest Airlines Co.,Southwest Airlines Co.: WN,WN,19393,2797,CMH,"Columbus, OH",STL,"St. Louis, MO",1115,1113.0,-2.0,10.0,1123.0,1130.0,7.0,1140,1137.0,-3.0,0.0,,0.0,85.0,84.0,67.0,409.0,,,,,
2019-09-10,PSA Airlines Inc.,PSA Airlines Inc.: OH,OH,20397,5035,CLT,"Charlotte, NC",TOL,"Toledo, OH",1145,1143.0,-2.0,24.0,1207.0,1314.0,3.0,1325,1317.0,-8.0,0.0,,0.0,100.0,94.0,67.0,466.0,,,,,
2022-03-13,American Airlines Inc.,American Airlines Inc.: AA,AA,19805,2359,DFW,"Dallas/Fort Worth, TX",DEN,"Denver, CO",1042,1041.0,-1.0,19.0,1100.0,1132.0,5.0,1158,1137.0,-21.0,0.0,,0.0,136.0,116.0,92.0,641.0,,,,,
2022-08-13,SkyWest Airlines Inc.,SkyWest Airlines Inc.: OO,OO,20304,4096,OMA,"Omaha, NE",MSP,"Minneapolis, MN",1325,1321.0,-4.0,15.0,1336.0,1428.0,5.0,1442,1433.0,-9.0,0.0,,0.0,77.0,72.0,52.0,282.0,,,,,
2021-09-13,American Airlines Inc.,American Airlines Inc.: AA,AA,19805,1302,DFW,"Dallas/Fort Worth, TX",BNA,"Nashville, TN",1039,1035.0,-4.0,14.0,1049.0,1212.0,6.0,1237,1218.0,-19.0,0.0,,0.0,118.0,103.0,83.0,631.0,,,,,
2023-07-12,Endeavor Air Inc.,Endeavor Air Inc.: 9E,9E,20363,5025,LGA,"New York, NY",STL,"St. Louis, MO",1240,1332.0,52.0,9.0,1341.0,1438.0,5.0,1435,1443.0,8.0,0.0,,0.0,175.0,131.0,117.0,888.0,,,,,
2019-12-15,Southwest Airlines Co.,Southwest Airlines Co.: WN,WN,19393,6542,STL,"St. Louis, MO",RDU,"Raleigh/Durham, NC",1745,1817.0,32.0,33.0,1850.0,2108.0,5.0,2035,2113.0,38.0,0.0,,0.0,110.0,116.0,78.0,667.0,32.0,0.0,6.0,0.0,0.0


### Question / Query 1:

In [0]:
# Query 1: Ranks Airlines by Average Arrival Delay per Year-Month & categorizes them into delay levels
#
# Notes:
# - F.avg skips null ARR_DELAY values. A group in which every ARR_DELAY is null therefore gets a
#   null AVG_ARR_DELAY; F.desc places nulls last, so such a group receives the last rank of its month.
# - DELAY_LEVEL thresholds: High > 20 min, Medium 10–20 min, Low < 10 min. A null average matches
#   none of them and is left as a null DELAY_LEVEL instead of being mislabelled "Low".

display(
    df
    # Step 1: Add two new columns YEAR and MONTH extracted from the FL_DATE column
    .withColumn("YEAR", F.year(col("FL_DATE")))                      # Extract year from flight date
    .withColumn("MONTH", F.month(col("FL_DATE")))                    # Extract month from flight date

    # Step 2: Group by AIRLINE, YEAR, MONTH and calculate average ARR_DELAY for each group (as demanded in the instructions - i.e. not just grouping by month)
    .groupBy("AIRLINE", "YEAR", "MONTH")
    .agg(F.avg(col("ARR_DELAY")).alias("AVG_ARR_DELAY"))             # Compute average arrival delay

    # Step 3: Add a RANK column, ranking airlines by descending AVG_ARR_DELAY per (YEAR, MONTH)
    .withColumn(
        "RANK",
        F.rank().over(
            Window.partitionBy("YEAR", "MONTH")
                  .orderBy(F.desc("AVG_ARR_DELAY"))                  # Highest average delay gets rank 1
        )
    )

    # Step 4: Add a DELAY_LEVEL column classifying delay severity
    .withColumn(
        "DELAY_LEVEL",
        F.when(col("AVG_ARR_DELAY") > 20, "High")                    # More than 20 mins = High
         .when(col("AVG_ARR_DELAY") >= 10, "Medium")                 # 10–20 mins (> 20 already handled above)
         .when(col("AVG_ARR_DELAY") < 10, "Low")                     # Less than 10 mins = Low
                                                                     # no otherwise(): a null average stays null
    )

    # Step 5: Sort the final output for readability; AIRLINE breaks ties between equal ranks
    .orderBy("YEAR", "MONTH", "RANK", "AIRLINE")
)

AIRLINE,YEAR,MONTH,AVG_ARR_DELAY,RANK,DELAY_LEVEL
JetBlue Airways,2019,1,14.944398340248965,1,Medium
ExpressJet Airlines LLC d/b/a aha!,2019,1,14.542745098039216,2,Medium
SkyWest Airlines Inc.,2019,1,10.504681796540233,3,Medium
Republic Airline,2019,1,8.696883852691219,4,Low
Allegiant Air,2019,1,8.637829912023461,5,Low
United Air Lines Inc.,2019,1,7.874431695172115,6,Low
Envoy Air,2019,1,7.574979625101874,7,Low
Mesa Airlines Inc.,2019,1,5.837235228539576,8,Low
Frontier Airlines Inc.,2019,1,5.255102040816326,9,Low
Spirit Air Lines,2019,1,4.813245033112583,10,Low


### Question / Query 2:

In [0]:
# Query 2: Computes Average & Standard Deviation of Daily Cancellation Proportions per Airline (Only on Days with >= 100 Flights)
#
# A "busy day" is an (airline, date) pair with at least 100 flights. F.stddev is the sample standard
# deviation, so an airline with only one busy day gets a null STDDEV_DAILY_CANCEL_PROP.

display(
    df
    # Step 1: Per-day key: FL_DATE normalised to a pure date
    .withColumn("DATE", F.to_date("FL_DATE"))

    # Step 2: Daily totals per airline: all flights, and cancelled flights (CANCELLED summed)
    .groupBy("AIRLINE", "DATE")
    .agg(
        F.count(F.lit(1)).alias("NUM_FLIGHTS"),
        F.sum(col("CANCELLED")).alias("NUM_CANCELLED"),
    )

    # Step 3: Busy days only
    .where(col("NUM_FLIGHTS") >= 100)

    # Step 4: Share of that day's flights that were cancelled
    .withColumn("CANCEL_PROP", F.col("NUM_CANCELLED") / F.col("NUM_FLIGHTS"))

    # Step 5: Summarise each airline's daily proportions (mean and sample std-dev)
    .groupBy("AIRLINE")
    .agg(
        F.mean("CANCEL_PROP").alias("AVG_DAILY_CANCEL_PROP"),
        F.stddev("CANCEL_PROP").alias("STDDEV_DAILY_CANCEL_PROP"),
    )

    # Step 6: Most reliable airlines on busy days first (lowest mean proportion)
    .orderBy(col("AVG_DAILY_CANCEL_PROP").asc())
)

AIRLINE,AVG_DAILY_CANCEL_PROP,STDDEV_DAILY_CANCEL_PROP
Alaska Airlines Inc.,0.0,
Spirit Air Lines,0.0099009900990099,
Endeavor Air Inc.,0.0135714285714285,0.0507796359633606
Delta Air Lines Inc.,0.0150292630378116,0.0617153950692927
JetBlue Airways,0.0156368776093984,0.0392163070419962
United Air Lines Inc.,0.0195819915452815,0.0660677328809677
PSA Airlines Inc.,0.0225361171182795,0.0219820591613077
SkyWest Airlines Inc.,0.0236685089550162,0.0538935047713091
American Airlines Inc.,0.0278194236375991,0.0721638040637417
Mesa Airlines Inc.,0.03,


In [0]:
# Internal Check: if a specific airline had any cancellations on days with >= 100 flights:
# Lists every busy day (>= 100 flights) on which the airline below had at least one cancelled flight.

display(
    df
    .filter(col("AIRLINE") == "Spirit Air Lines")                  # Airline under inspection
    .withColumn("DATE", F.to_date(col("FL_DATE")))                 # Pure calendar date
    .groupBy("AIRLINE", "DATE")
    .agg(
        F.count(F.lit(1)).alias("NUM_FLIGHTS"),
        F.sum(col("CANCELLED")).alias("NUM_CANCELLED"),
    )
    .where((col("NUM_FLIGHTS") >= 100) & (col("NUM_CANCELLED") > 0))  # Busy days that also had cancellations
    .sort("DATE")
)

AIRLINE,DATE,NUM_FLIGHTS,NUM_CANCELLED
Spirit Air Lines,2022-11-28,101,1.0


### Question / Query 3:

In [0]:
# Query 3: Identifies the 5 Most Frequent Routes - Airport Pairs (Origin-Destination) with High Average TOTAL Delays (> 45 mins) and Calculates the Share of Flights with No Delay at All
#
# Notes on nulls:
# - TOTAL_DELAY is null whenever DEP_DELAY or ARR_DELAY is null, and F.avg skips nulls, so
#   AVG_TOTAL_DELAY is averaged only over flights that have both delays recorded.
# - NUM_FLIGHTS counts every row on the route. In PERCENT_NO_DELAY a flight with a null delay fails
#   the "no delay" test (the when() is not true) and is therefore counted as delayed (0).

display(
    df
    # Step 1: Compute the Total Delay = Departure Delay + Arrival Delay
    .withColumn("TOTAL_DELAY", col("DEP_DELAY") + col("ARR_DELAY"))

    # Step 2: Group by origin-destination pair and compute:
    # - Total number of flights on the route
    # - Average total (departure + arrival) delay
    # - Percentage of flights with no delay at all (neither departure nor arrival delayed)
    .groupBy("ORIGIN", "DEST")
    .agg(
        F.count("*").alias("NUM_FLIGHTS"),                                                             # Total number of flights on the route
        F.avg("TOTAL_DELAY").alias("AVG_TOTAL_DELAY"),                                                 # Average total (dep + arr) delay
        F.round(                                                                                       # Share of perfectly on-time flights (both delays <= 0)
            F.avg(F.when((col("DEP_DELAY") <= 0) & (col("ARR_DELAY") <= 0), 100).otherwise(0)), 2
        ).alias("PERCENT_NO_DELAY")
    )

    # Step 3: Keep only routes with high average total delay (greater than 45 minutes)
    .filter(col("AVG_TOTAL_DELAY") > 45)

    # Step 4: Sort by number of flights descending, to get busiest high-delay routes.
    # ORIGIN / DEST break ties so that limit(5) returns the same routes on every run.
    .orderBy(F.desc("NUM_FLIGHTS"), "ORIGIN", "DEST")

    # Step 5: Keep only the top / busiest 5 origin-destination pairs
    .limit(5)
)

ORIGIN,DEST,NUM_FLIGHTS,AVG_TOTAL_DELAY,PERCENT_NO_DELAY
DEN,ASE,1002,45.32747252747253,43.61
MSN,DFW,465,50.95604395604396,49.46
ORD,ASE,449,45.14572864321608,41.43
ASE,DFW,411,68.67297297297297,44.04
ASE,ORD,407,83.11666666666666,43.73


### Question / Query 4:

In [0]:
# Query 4: Calculates Average Arrival Delay and the Proportion of On-Time or Early Arrivals by Time-of-Day Category

display(
    df
    # Step 0: Keep only flights with a recorded departure time AND a recorded arrival delay.
    # - A null DEP_TIME (presumably flights that never departed, e.g. cancelled) gives a null DEP_HOUR.
    #   Every when() test below is then null, so the row would fall through to otherwise("Night")
    #   and silently inflate the Night bucket.
    # - F.avg already ignores null ARR_DELAY, but the on-time indicator below would count such rows
    #   as late (0). Requiring ARR_DELAY makes both metrics use the same population of flights.
    .filter(col("DEP_TIME").isNotNull() & col("ARR_DELAY").isNotNull())

    # Step 1: Extract departure hour from 4-digit HHMM-format DEP_TIME (e.g., 1430 -> 14).
    # A DEP_TIME of 2400 yields hour 24, which is classified as Night below.
    .withColumn("DEP_HOUR", (col("DEP_TIME") / 100).cast("int"))

    # Step 2: Assign a time-of-day category
    .withColumn(
        "TIME_OF_DAY",
        F.when((col("DEP_HOUR") >= 5) & (col("DEP_HOUR") < 12), "Morning")      # Morning: 05:00–11:59
         .when((col("DEP_HOUR") >= 12) & (col("DEP_HOUR") < 17), "Afternoon")   # Afternoon: 12:00–16:59
         .when((col("DEP_HOUR") >= 17) & (col("DEP_HOUR") < 21), "Evening")     # Evening: 17:00–20:59
         .otherwise("Night")                                                    # Night: 21:00–04:59
    )

    # Step 3: Group by departure period - TIME_OF_DAY and compute:
    # - Proportion - Mean amount of flights with ARR_DELAY <= 0 (on-time or early)
    # - Average arrival delay
    .groupBy("TIME_OF_DAY")
    .agg(
        F.avg("ARR_DELAY").alias("AVG_ARR_DELAY"),                                            # Mean arrival delay
        F.avg(F.when(col("ARR_DELAY") <= 0, 100).otherwise(0)).alias("ON_TIME_PERCENTAGE")    # Proportion - Average amount of on-time or early arrivals, represented as a percentage
    )

    # Step 4: Enforce chronological order of TIME_OF_DAY: Morning → Afternoon → Evening → Night
    .orderBy(
        F.when(col("TIME_OF_DAY") == "Morning", 1)
         .when(col("TIME_OF_DAY") == "Afternoon", 2)
         .when(col("TIME_OF_DAY") == "Evening", 3)
         .otherwise(4)
    )
)



### Question / Query 5:

In [0]:
# Query 5: Classifies Flight Duration Performance by Observed vs. Scheduled (Expected) Elapsed Time and Summarizes, per Airline and Performance Category, the Share of Flights and the Coverage (Number of Unique Destinations)
#
# Output columns: AIRLINE, PERFORMANCE_CATEGORY,
#   PERCENT_FLIGHTS (share of the airline's flights in this category),
#   NUM_UNIQUE_DEST (distinct destinations among the airline's flights in this category).

display(
    df
    # Step 1: Filter out degenerate cases: Ensure CRS_ELAPSED_TIME > 0 and ELAPSED_TIME is not null
    .filter((col("CRS_ELAPSED_TIME") > 0) & (col("ELAPSED_TIME").isNotNull()))

    # Step 2: Compute performance ratio = actual observed elapsed time / scheduled elapsed
    .withColumn("PERFORMANCE_RATIO", col("ELAPSED_TIME") / col("CRS_ELAPSED_TIME"))

    # Step 3: Classify each flight into a performance category
    .withColumn(
        "PERFORMANCE_CATEGORY",
        F.when(col("PERFORMANCE_RATIO") < 0.9, "Faster - Significantly")           # Much Faster than expected: Performance Ratio < 0.9
         .when(col("PERFORMANCE_RATIO") <= 1.1, "Moderate / On Time (±10%)")   # Moderate performance / On Time: Performance Ratio between 0.9 and 1.1
         .otherwise("Slower - Significantly")                                      # Much Slower than expected: Performance Ratio > 1.1
    )

    # Step 4: Aggregate first, per AIRLINE and PERFORMANCE_CATEGORY:
    # - number of flights in the category (used for the percentage below)
    # - number of unique destinations served in that category
    .groupBy("AIRLINE", "PERFORMANCE_CATEGORY")
    .agg(
        F.count("*").alias("CATEGORY_FLIGHTS"),
        F.countDistinct("DEST").alias("NUM_UNIQUE_DEST")                                     # Number of unique destinations in each group
    )

    # Step 5: Airline total = sum of its category counts. The window now runs over at most
    # 3 aggregated rows per airline instead of over every raw flight row.
    .withColumn(
        "PERCENT_FLIGHTS",
        F.round(
            col("CATEGORY_FLIGHTS") / F.sum("CATEGORY_FLIGHTS").over(Window.partitionBy("AIRLINE")) * 100, 2
        )                                                                                    # Percentage (%) of flights in each category
    )

    # Keep the same output columns, in the same order, as before (helper count dropped)
    .select("AIRLINE", "PERFORMANCE_CATEGORY", "PERCENT_FLIGHTS", "NUM_UNIQUE_DEST")

    # Step 6: Sort output by AIRLINE and category (alphabetically preserved: Faster → On Time → Slower)
    .orderBy("AIRLINE", "PERFORMANCE_CATEGORY")
)

