In [0]:
# Storage details for the raw flight-data CSV held in Azure Blob Storage.
import os

storage_account_name = "flightdatastore1634"
container_name = "flightdata"
folder_path = "flightdata/raw"
file_name = "flight_data_2018_2024.csv"

# A SAS token is a credential and must not live in the notebook source, where it
# ends up in version control and in every exported copy. Read it from the
# FLIGHTDATA_SAS_TOKEN environment variable instead (on Databricks a cluster
# environment variable can be backed by a secret scope).
sas_token = os.environ.get("FLIGHTDATA_SAS_TOKEN")
if not sas_token:
    raise RuntimeError(
        "FLIGHTDATA_SAS_TOKEN is not set; provide a valid SAS token for the "
        f"'{container_name}' container before running this notebook."
    )

# Register the token for this container/account pair so wasbs:// reads can authenticate.
spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
    sas_token
)

# Fully qualified wasbs:// URI of the raw CSV.
file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{folder_path}/{file_name}"


#Bronze Layer

In [0]:
from pyspark.sql.functions import col

# Load the raw CSV: the first row supplies the column names and Spark infers
# the column types from the data.
csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
df = csv_reader.csv(file_path)

# Column headers may carry stray leading/trailing whitespace; rename every
# column to its trimmed form so later cells can refer to them by clean names.
new_columns = list(map(str.strip, df.columns))
df_clean = df.toDF(*new_columns)

# Preview a few rows, then report the number of rows that were read.
df_clean.show(5)
df.count()

+----+-------+-----+----------+---------+----------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+---------------------------------------+----------------------------------------------+-------------------------------------------------+--------------------------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+--------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+------------

582425

In [0]:
# Base path under which the bronze and silver Delta tables are stored.
# NOTE(review): no visible cell creates this mount - confirm it is set up
# outside the notebook before a fresh run.
mount_point = "/mnt/flightdata"

# Delta location of the bronze table.
delta_bronze_path = f"{mount_point}/bronze/bronze_flight_delays"

# Persist the whitespace-cleaned raw data, replacing any previous bronze snapshot.
bronze_writer = df_clean.write.format("delta").mode("overwrite")
bronze_writer.save(delta_bronze_path)

#Silver Layer

In [0]:
from pyspark.sql.functions import col, to_date, date_format, to_timestamp, when

# Silver layer: de-duplicated, completed (not cancelled / not diverted) flights
# with typed date and delay columns, built from the bronze Delta table.
bronze_path = f"{mount_point}/bronze/bronze_flight_delays"
df_bronze = spark.read.format("delta").load(bronze_path)

# Remove duplicates based on key columns.
# NOTE(review): the key does not include Origin/Dest, so two legs flown under the
# same flight number on the same day would be collapsed into one row - confirm
# this is intended. Which of the colliding rows survives is arbitrary.
df_silver = df_bronze.dropDuplicates(
    ["FlightDate", "Flight_Number_Marketing_Airline", "IATA_Code_Marketing_Airline"]
)

# Keep only flights that were neither cancelled nor diverted.
df_silver = df_silver.filter((col("Cancelled") == 0) & (col("Diverted") == 0))

# Convert FlightDate to a date and derive a "yyyy-MM" YearMonth string from it.
df_silver = df_silver.withColumn("FlightDate", to_date("FlightDate", "yyyy-MM-dd")) \
                     .withColumn("YearMonth", date_format("FlightDate", "yyyy-MM"))

# Cast the delay columns to int, substituting 0 where the value is missing.
for delay_col in ("DepDelayMinutes", "ArrDelayMinutes"):
    df_silver = df_silver.withColumn(
        delay_col,
        when(col(delay_col).isNull(), 0).otherwise(col(delay_col).cast("int"))
    )

# Add a timestamp version of FlightDate for time-based analysis.
df_silver = df_silver.withColumn("FlightDateTS", to_timestamp(col("FlightDate"), "yyyy-MM-dd"))

# Save the silver table with schema merge enabled.
silver_path = f"{mount_point}/silver/silver_flight_delays"
df_silver.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(silver_path)

# Inspect what was actually persisted. Calling show()/count() on df_silver would
# re-run the whole lineage twice more, and because dropDuplicates keeps an
# arbitrary survivor the recomputed rows need not match the stored ones.
df_silver_saved = spark.read.format("delta").load(silver_path)
df_silver_saved.show(5)
print(f"Total rows in silver table: {df_silver_saved.count()}")


+----+-------+-----+----------+---------+----------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+---------------------------------------+----------------------------------------------+-------------------------------------------------+--------------------------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+---------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+-------------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+------

#Gold Layer

In [0]:
from pyspark.sql.functions import col, avg, count, sum, when, round, to_date, date_format

# Re-load the silver table from its Delta location so the gold aggregations run
# against the persisted data. Note that `sum` and `round` from this import
# shadow the Python builtins for the rest of the notebook.
silver_path = f"{mount_point}/silver/silver_flight_delays"
delta_reader = spark.read.format("delta")
df_silver = delta_reader.load(silver_path)


In [0]:
# Gold-layer aggregates. Everything except the cancellation breakdown is derived
# from df_silver; `round`, `avg`, `count`, `col` and `date_format` are the
# pyspark.sql.functions versions imported earlier in the notebook.

# 1. Flight Delay Distribution by Airline (Operating Airline)
gold_delay_dist = (
    df_silver.groupBy("IATA_Code_Operating_Airline", "DepDel15")
    .agg(round(avg("ArrDelayMinutes"), 2).alias("AvgArrDelayMinutes"))
    .orderBy("IATA_Code_Operating_Airline", "DepDel15")
)

# 2. Delay Trend Over Time (Daily averages)
gold_delay_trend = (
    df_silver.groupBy("FlightDate")
    .agg(
        round(avg("ArrDelayMinutes"), 2).alias("AvgArrDelayMinutes"),
        round(avg("DepDelayMinutes"), 2).alias("AvgDepDelayMinutes"),
    )
    .orderBy("FlightDate")
)

# 3. Cancellation Count by Reason (Marketing Airline Network)
# The silver table keeps only rows with Cancelled == 0, so filtering it for
# Cancelled == 1 always yields an empty result. Cancellations are therefore read
# from the bronze table, applying the same de-duplication key the silver build
# uses so the counts stay comparable.
df_cancelled = (
    spark.read.format("delta")
    .load(f"{mount_point}/bronze/bronze_flight_delays")
    .dropDuplicates(
        ["FlightDate", "Flight_Number_Marketing_Airline", "IATA_Code_Marketing_Airline"]
    )
    .filter(col("Cancelled") == 1)
)
gold_cancellation_reason = (
    df_cancelled.groupBy("Marketing_Airline_Network", "CancellationCode")
    .agg(count("*").alias("CancellationCount"))
    .orderBy("Marketing_Airline_Network", "CancellationCode")
)

# 4. Most Frequent Routes
gold_routes = (
    df_silver.groupBy("Origin", "Dest")
    .agg(count("*").alias("FlightCount"))
    .orderBy("FlightCount", ascending=False)
)

# 5. Geographic View of Average Delays (by OriginCityName)
gold_geo_delays = (
    df_silver.groupBy("OriginCityName")
    .agg(
        count("*").alias("FlightCount"),
        round(avg("ArrDelayMinutes"), 2).alias("AvgArrDelayMinutes"),
    )
    .orderBy("OriginCityName")
)

# 6. Monthly Aggregated Delay Metrics
# YearMonth is derived into a separate frame instead of rebinding df_silver, so
# re-running this cell never changes the frame the other cells read.
df_monthly = df_silver.withColumn("YearMonth", date_format("FlightDate", "yyyy-MM"))
gold_monthly_metrics = (
    df_monthly.groupBy("YearMonth")
    .agg(
        round(avg("DepDelayMinutes"), 2).alias("AvgDepDelayMinutes"),
        round(avg("ArrDelayMinutes"), 2).alias("AvgArrDelayMinutes"),
        round(avg("TaxiOut"), 2).alias("AvgTaxiOut"),
        round(avg("TaxiIn"), 2).alias("AvgTaxiIn"),
    )
    .orderBy("YearMonth")
)


In [0]:
from pyspark.sql.functions import avg, desc


def _top10_origins_by_avg(frame, delay_col, alias):
    """Return the ten Origin values with the highest mean of `delay_col`.

    The mean is exposed under the column name `alias` and rows are sorted by
    it in descending order.
    """
    return (
        frame.groupBy("Origin")
        .agg(avg(delay_col).alias(alias))
        .orderBy(desc(alias))
        .limit(10)
    )


# Origin airports ranked by average arrival delay.
top10_arrival_delays = _top10_origins_by_avg(df_silver, "ArrDelayMinutes", "AvgArrDelay")

# Origin airports ranked by average departure delay.
top10_departure_delays = _top10_origins_by_avg(df_silver, "DepDelayMinutes", "AvgDepDelay")

top10_arrival_delays.show()
top10_departure_delays.show()


+------+-----------------+
|Origin|      AvgArrDelay|
+------+-----------------+
|   CKB|             86.2|
|   ELM|          85.1875|
|   PLN|85.10526315789474|
|   MQT|80.72972972972973|
|   ALO| 73.3529411764706|
|   FSM|69.97872340425532|
|   ART|69.58536585365853|
|   STC|69.11111111111111|
|   SMX|67.33333333333333|
|   MLI|65.58646616541354|
+------+-----------------+

+------+------------------+
|Origin|       AvgDepDelay|
+------+------------------+
|   ELM|            88.125|
|   CKB|              86.0|
|   PLN| 85.26315789473684|
|   MQT| 81.67567567567568|
|   ALO| 73.50980392156863|
|   STC| 71.61111111111111|
|   SMX| 69.66666666666667|
|   FSM| 64.70212765957447|
|   MLI| 62.11654135338346|
|   CIU|61.666666666666664|
+------+------------------+



In [0]:
# `count` and `desc` are used below; import them here so the cell does not
# depend on imports made by other cells.
from pyspark.sql.functions import sum, col, count, desc

# This is the first cell that writes into flight_data_db, and no visible cell
# creates it; without it saveAsTable fails on a fresh metastore.
spark.sql("CREATE DATABASE IF NOT EXISTS flight_data_db")

# Flag on-time flights: arrival delay <= 15 min (1 = on time, 0 = late).
df_ontime = df_silver.withColumn("OnTimeArr", (col("ArrDelayMinutes") <= 15).cast("int"))

# Percentage of on-time arrivals per operating airline, best performers first.
ontime_performance = (
    df_ontime.groupBy("IATA_Code_Operating_Airline")
    .agg(
        (sum("OnTimeArr") / count("*") * 100).alias("PctOnTimeArrival")
    )
    .orderBy(desc("PctOnTimeArrival"))
)

# Persist the result as a managed Delta table, replacing any previous version.
ontime_performance.write.format("delta").mode("overwrite").saveAsTable("flight_data_db.ontime_performance")

ontime_performance.show()


+---------------------------+-----------------+
|IATA_Code_Operating_Airline| PctOnTimeArrival|
+---------------------------+-----------------+
|                         YX|83.58105956046289|
|                         PT|82.06002728512959|
|                         DL|81.95449844881075|
|                         G7|81.05503423788993|
|                         C5|78.66032210834554|
|                         UA| 78.5860655737705|
|                         G4|78.29309108295482|
|                         9E| 77.2605267572555|
|                         WN|77.04613927298082|
|                         QX|76.97368421052632|
|                         OO|76.49106940990278|
|                         YV|75.54662379421222|
|                         OH|75.48449612403101|
|                         NK|74.51322145311488|
|                         HA|74.39382239382239|
|                         MQ|73.70005473453749|
|                         F9|73.58436124936883|
|                         AS|73.11732504

In [0]:
# Flights whose arrival delay exceeds 15 minutes.
df_delayed = df_silver.where(col("ArrDelayMinutes") > 15)

# Number of delayed flights per operating airline, largest first.
delayed_per_airline = df_delayed.groupBy("IATA_Code_Operating_Airline").count()
repeat_delayed_airlines = delayed_per_airline.orderBy(col("count").desc())

# Number of delayed flights per origin airport, largest first.
delayed_per_airport = df_delayed.groupBy("Origin").count()
repeat_delayed_airports = delayed_per_airport.orderBy(col("count").desc())

repeat_delayed_airlines.show()
repeat_delayed_airports.show()


+---------------------------+-----+
|IATA_Code_Operating_Airline|count|
+---------------------------+-----+
|                         WN|17447|
|                         AA|17171|
|                         DL|10470|
|                         UA|10450|
|                         OO|10398|
|                         B6| 5403|
|                         NK| 5118|
|                         MQ| 3844|
|                         F9| 3662|
|                         AS| 3634|
|                         YX| 3093|
|                         OH| 2783|
|                         9E| 2711|
|                         G4| 1816|
|                         HA| 1658|
|                         YV| 1521|
|                         ZW| 1246|
|                         C5| 1166|
|                         PT| 1052|
|                         QX| 1050|
+---------------------------+-----+
only showing top 20 rows
+------+-----+
|Origin|count|
+------+-----+
|   DFW| 6076|
|   ORD| 5998|
|   DEN| 5387|
|   ATL| 5098|
|   CL

In [0]:
from pyspark.sql.functions import col, count, desc

# Flights whose arrival delay exceeds 15 minutes.
df_delayed = df_silver.where(col("ArrDelayMinutes") > 15)

# Count delayed flights per origin city and rank cities from most to fewest.
delayed_flight_count = count("*").alias("DelayedFlightCount")
repeat_delayed_cities = (
    df_delayed.groupBy("OriginCityName")
    .agg(delayed_flight_count)
    .orderBy("DelayedFlightCount", ascending=False)
)

# Persist the ranking as a Delta table, replacing any previous version.
(
    repeat_delayed_cities.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("flight_data_db.repeat_delayed_cities")
)


In [0]:
# Persist every gold aggregate as a Delta table, replacing any previous version.
# Each entry pairs the target table name with the DataFrame that feeds it; the
# tables are written in the listed order.
gold_outputs = [
    ("flight_data_db.gold_delay_dist", gold_delay_dist),
    ("flight_data_db.gold_delay_trend", gold_delay_trend),
    ("flight_data_db.gold_cancellation_reason", gold_cancellation_reason),
    ("flight_data_db.gold_routes", gold_routes),
    ("flight_data_db.gold_geo_delays", gold_geo_delays),
    ("flight_data_db.gold_monthly_metrics", gold_monthly_metrics),
]

for gold_table_name, gold_frame in gold_outputs:
    gold_frame.write.format("delta").mode("overwrite").saveAsTable(gold_table_name)
