In [0]:
# ============================================================
# Create business-ready analytics tables (Gold layer)
# from the curated Silver flights fact table.
#
# Gold layer characteristics:
# - Aggregated data
# - Business metrics
# - Optimized for reporting
# ============================================================

In [0]:
# Root DBFS folder that holds every layer of the flight project
base_path = "dbfs:/FileStore/tables/FileStores/flight_project"

# Input: the curated flights fact table written by the Silver layer
silver_flights_path = "/".join([base_path, "silver", "flights_fact"])

# Output: parent folder under which the Gold tables are saved
gold_path = "/".join([base_path, "gold"])

In [0]:
# Load the Silver flights fact table (Delta format).
# It is the curated input that the Gold aggregates are computed from.
silver_df = (
    spark.read
    .format("delta")
    .load(silver_flights_path)
)

# Preview the rows, then print the column names and types
silver_df.display()
silver_df.printSchema()

YEAR,MONTH,DAY,AIRLINE,airline_name,ORIGIN_AIRPORT,airport_name,city,state,ARRIVAL_DELAY,DEPARTURE_DELAY,is_delayed,is_cancelled,FLIGHT_NUMBER
2015,1,1,AA,American Airlines Inc.,IAH,George Bush Intercontinental Airport,Houston,TX,54.0,58.0,1,0,89
2015,1,1,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,-12.0,-2.0,0,0,115
2018,5,1,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209
2018,5,24,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209
2018,9,28,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209
2018,10,5,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209
2020,11,21,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209
2021,3,24,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209
2021,8,16,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209
2022,9,25,AA,American Airlines Inc.,LAX,Los Angeles International Airport,Los Angeles,CA,,,0,0,209


root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- is_delayed: integer (nullable = true)
 |-- is_cancelled: integer (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)



In [0]:
# Airline-level traffic: number of flight rows per AIRLINE code
from pyspark.sql.functions import count

flights_by_airline_df = (
    silver_df
    .groupBy("AIRLINE")
    .agg(count("*").alias("total_flights"))
)

In [0]:
# Print the per-airline flight counts as a plain-text table
flights_by_airline_df.show()

+-------+-------------+
|AIRLINE|total_flights|
+-------+-------------+
|     UA|           39|
|     NK|           12|
|     AA|           81|
|     EV|           51|
|     B6|           51|
|     DL|           87|
|     OO|           56|
|     F9|           15|
|     US|           70|
|     MQ|           23|
|     HA|           14|
|     AS|           22|
|     VX|            9|
|     WN|          160|
+-------+-------------+



In [0]:
# Save the per-airline counts as a Delta table under the Gold folder.
# Overwrite replaces any earlier copy; acceptable because Gold can be
# rebuilt from Silver at any time.
(
    flights_by_airline_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_path}/flights_by_airline")
)

In [0]:
# Read the saved Gold table back and display it to check the write
(
    spark.read
    .format("delta")
    .load(f"{gold_path}/flights_by_airline")
    .display()
)

AIRLINE,total_flights
UA,39
NK,12
AA,81
EV,51
B6,51
DL,87
OO,56
F9,15
US,70
MQ,23


In [0]:
# Monthly flight volume: one row per (YEAR, MONTH) with its flight count.
# Feeds trend and seasonality analysis.
# `count` is the pyspark function imported in the flights-by-airline cell.
monthly_trends_df = (
    silver_df
    .groupBy("YEAR", "MONTH")
    .agg(count("*").alias("monthly_flights"))
)

In [0]:
# Preview the monthly flight counts before they are written to Gold
monthly_trends_df.display()

YEAR,MONTH,monthly_flights
2022,10,9
2019,10,11
2020,6,10
2019,5,10
2018,10,7
2021,8,5
2021,6,12
2021,5,7
2019,3,9
2021,10,9


In [0]:
# Save the monthly counts as a Delta table under the Gold folder,
# replacing any earlier copy.
(
    monthly_trends_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_path}/monthly_trends")
)

In [0]:
# Read the saved Gold table back and display it to check the write
(
    spark.read
    .format("delta")
    .load(f"{gold_path}/monthly_trends")
    .display()
)

YEAR,MONTH,monthly_flights
2022,10,9
2019,10,11
2020,6,10
2019,5,10
2018,10,7
2021,8,5
2021,6,12
2021,5,7
2019,3,9
2021,10,9


In [0]:
# Delay and cancellation KPIs per airline (key operational metrics).
#
# The Spark functions are reached through the `F` alias instead of being
# imported by name: `from pyspark.sql.functions import sum` would replace
# Python's built-in sum() for every cell that runs after this one.
from pyspark.sql import functions as F

delay_summary_df = silver_df.groupBy("AIRLINE").agg(
    # avg() ignores NULL delays, so an airline whose delay values are all
    # NULL ends up with a NULL average.
    F.avg("ARRIVAL_DELAY").alias("avg_arrival_delay"),
    F.avg("DEPARTURE_DELAY").alias("avg_departure_delay"),
    # is_cancelled / is_delayed are integer columns (0/1 in the sample rows),
    # so their sums give the number of flagged flights.
    F.sum("is_cancelled").alias("total_cancelled_flights"),
    F.sum("is_delayed").alias("total_delayed_flights"),
)


In [0]:
# Preview the per-airline delay and cancellation KPIs before they are written to Gold
display(delay_summary_df)

AIRLINE,avg_arrival_delay,avg_departure_delay,total_cancelled_flights,total_delayed_flights
UA,4.428571428571429,7.142857142857143,0,5
NK,0.0833333333333333,4.916666666666667,0,3
AA,9.9,11.25,1,5
EV,5.5,8.2,0,1
B6,1.181818181818182,5.318181818181818,0,4
DL,-1.25,7.3125,0,1
OO,11.526315789473683,10.421052631578949,3,5
F9,-5.0,3.75,0,1
US,-1.8571428571428568,0.6428571428571429,0,1
MQ,,,5,0


In [0]:
# Save the delay and cancellation KPIs as a Delta table under the Gold
# folder, replacing any earlier copy.
(
    delay_summary_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_path}/delay_summary")
)

In [0]:
# Read the saved Gold table back and display it to check the write
(
    spark.read
    .format("delta")
    .load(f"{gold_path}/delay_summary")
    .display()
)

AIRLINE,avg_arrival_delay,avg_departure_delay,total_cancelled_flights,total_delayed_flights
UA,4.428571428571429,7.142857142857143,0,5
NK,0.0833333333333333,4.916666666666667,0,3
AA,9.9,11.25,1,5
EV,5.5,8.2,0,1
B6,1.181818181818182,5.318181818181818,0,4
DL,-1.25,7.3125,0,1
OO,11.526315789473683,10.421052631578949,3,5
F9,-5.0,3.75,0,1
US,-1.8571428571428568,0.6428571428571429,0,1
MQ,,,5,0


In [0]:
# Root DBFS folder of the flight project
base_path = "dbfs:/FileStore/tables/FileStores/flight_project"

# Locations of the three Gold Delta tables
gold_flights_by_airline = base_path + "/gold/flights_by_airline"
gold_monthly_trends = base_path + "/gold/monthly_trends"
gold_delay_summary = base_path + "/gold/delay_summary"



In [0]:
# ------------------------------------------------------------
# OPTIMIZE: flights_by_airline
#
# Compaction merges small Delta files into larger ones, which
# speeds up queries and leaves less file metadata to manage.
# ------------------------------------------------------------

spark.sql(f"OPTIMIZE delta.`{gold_flights_by_airline}`")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# ------------------------------------------------------------
# OPTIMIZE: monthly_trends
#
# Queries on this table usually filter by YEAR and MONTH;
# compacting its files makes those scans faster.
# ------------------------------------------------------------

spark.sql(f"OPTIMIZE delta.`{gold_monthly_trends}`")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# ------------------------------------------------------------
# OPTIMIZE: delay_summary
#
# Per-airline KPI table; compacting its files helps the
# dashboard queries that read it run faster.
# ------------------------------------------------------------

spark.sql(f"OPTIMIZE delta.`{gold_delay_summary}`")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# ------------------------------------------------------------
# Z-ORDER: flights_by_airline
#
# Why Z-ORDER?
# - Improves data skipping
# - Faster filtering on AIRLINE column
#
# NOTE(review): the table printed earlier has only 14 rows (one
# per airline), so the gain from Z-ordering is presumably
# negligible at this size -- confirm it is still wanted.
# NOTE(review): this table was already compacted by a plain
# OPTIMIZE in an earlier cell -- confirm both passes are needed.
# ------------------------------------------------------------

# The DataFrame of OPTIMIZE metrics returned by spark.sql is the cell's output.
spark.sql(
    f"""
    OPTIMIZE delta.`{gold_flights_by_airline}`
    ZORDER BY (AIRLINE)
    """
)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# ------------------------------------------------------------
# Z-ORDER: monthly_trends
#
# Improves performance for time-based queries.
# Z-orders on YEAR and MONTH, the same columns the table was
# grouped by when it was built.
#
# NOTE(review): the table holds one row per (YEAR, MONTH), so it
# is presumably small -- confirm Z-ordering gives a measurable
# benefit here.
# ------------------------------------------------------------

# The DataFrame of OPTIMIZE metrics returned by spark.sql is the cell's output.
spark.sql(
    f"""
    OPTIMIZE delta.`{gold_monthly_trends}`
    ZORDER BY (YEAR, MONTH)
    """
)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,