In [0]:
# Exploratory look at every raw CSV matching the glob. header/inferSchema are
# passed as csv() keywords, the same reader options as .option(...).
# NOTE(review): this repeats the full read done for the bronze layer below.
df_flight = spark.read.csv(
    "/Volumes/workspace/flight_delay/flight_delay_data/*.csv",
    header=True,
    inferSchema=True,
)

df_flight.display()

# Medallion Architecture 
## 1. Bronze Layer

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Bronze: load all years of flight CSVs as-is and stamp every row with the
# time at which it was ingested.
df_bronze = spark.read.csv(
    "/Volumes/workspace/flight_delay/flight_delay_data/*.csv",
    header=True,
    inferSchema=True,
).withColumn("ingestion_ts", current_timestamp())

df_bronze.display()


In [0]:
# Persist the bronze DataFrame as a Delta table, replacing its previous contents.
df_bronze.write.saveAsTable(
    "workspace.flight_delay.bronze_flights",
    format="delta",
    mode="overwrite",
)

## 2. Silver Layer

In [0]:
from pyspark.sql.functions import *

# Silver: project the raw bronze rows down to the columns used downstream.
df_silver = df_bronze.select(
    # date, carrier and route endpoints
    "FL_DATE", "AIRLINE",
    "ORIGIN", "ORIGIN_CITY",
    "DEST", "DEST_CITY",
    # scheduled times
    "CRS_DEP_TIME", "CRS_ARR_TIME",
    # observed delays, distance and flight status
    "DEP_DELAY", "ARR_DELAY", "DISTANCE",
    "CANCELLED", "DIVERTED",
    # per-cause delay breakdown
    "DELAY_DUE_CARRIER", "DELAY_DUE_WEATHER", "DELAY_DUE_NAS",
    "DELAY_DUE_SECURITY", "DELAY_DUE_LATE_AIRCRAFT",
)


In [0]:
# Every delay-related column in the silver projection. The list keeps this name
# so any later cell that refers to delay_cols keeps working.
delay_cols = [
    "DEP_DELAY", "ARR_DELAY",
    "DELAY_DUE_CARRIER", "DELAY_DUE_WEATHER",
    "DELAY_DUE_NAS", "DELAY_DUE_SECURITY",
    "DELAY_DUE_LATE_AIRCRAFT"
]

# Zero-fill only the per-cause breakdown (DELAY_DUE_*): a missing value there is
# treated as "no minutes attributed to this cause".
# NOTE(review): confirm nulls in DELAY_DUE_* really mean "no delay from this cause".
# DEP_DELAY / ARR_DELAY deliberately stay null. Filling them with 0 would record a
# flight whose delay is unknown as exactly on time, and would drag averages and
# percentiles toward zero. Spark aggregates such as avg skip nulls instead.
delay_cause_cols = [c for c in delay_cols if c.startswith("DELAY_DUE_")]
df_silver = df_silver.fillna(0, subset=delay_cause_cols)

# Drop rows that are missing the date, either route endpoint, or the carrier.
df_silver = df_silver.dropna(
    subset=["FL_DATE", "ORIGIN", "DEST", "AIRLINE"]
)


In [0]:
# Keep only flights with CANCELLED == 0. A null CANCELLED makes the comparison
# null, so those rows are dropped as well.
df_silver = df_silver.where(col("CANCELLED") == 0)


In [0]:
# Calendar and schedule features:
#   month       - month number of FL_DATE
#   day_of_week - Spark dayofweek, 1 = Sunday ... 7 = Saturday
#   dep_hour    - CRS_DEP_TIME divided by 100 and floored (read as an hhmm number)
df_features = df_silver.select(
    "*",
    month("FL_DATE").alias("month"),
    dayofweek("FL_DATE").alias("day_of_week"),
    floor(col("CRS_DEP_TIME") / 100).alias("dep_hour"),
)


In [0]:
# 1 when the scheduled departure hour falls in the morning (7-10) or evening
# (16-20) window, bounds inclusive; any other hour, including null, gives 0.
df_features = df_features.withColumn(
    "is_peak_hour",
    when(col("dep_hour").between(7, 10), 1)
    .when(col("dep_hour").between(16, 20), 1)
    .otherwise(0),
)


In [0]:
# Label: 1 when the flight arrived more than 15 minutes late, otherwise 0.
# A null ARR_DELAY makes the comparison null, and coalesce maps that to 0.
# NOTE(review): BTS counts a flight as delayed at 15 minutes or more (>= 15);
# this uses a strict > 15 -- confirm which definition is intended.
df_features = df_features.withColumn(
    "is_delayed",
    coalesce((col("ARR_DELAY") > 15).cast("int"), lit(0)),
)


In [0]:
# Cap ARR_DELAY at (approximately) its 99th percentile into ARR_DELAY_CAPPED.
#
# approxQuantile's relativeError bounds the *rank* error: the value returned for
# p lies between ranks floor((p - err) * N) and ceil((p + err) * N). With
# err = 0.01 and p = 0.99 that range reaches the maximum, so the "cap" could be
# a no-op. err = 0.001 keeps it between roughly the 98.9th and 99.1st percentile.
arr_delay_quantiles = df_features.approxQuantile("ARR_DELAY", [0.99], 0.001)

if arr_delay_quantiles:
    p99 = arr_delay_quantiles[0]
    arr_delay_capped = when(col("ARR_DELAY") > p99, p99).otherwise(col("ARR_DELAY"))
else:
    # approxQuantile returns [] when ARR_DELAY has no non-null values
    # (e.g. an empty DataFrame); there is nothing to cap, so copy the column.
    p99 = None
    arr_delay_capped = col("ARR_DELAY")

df_features = df_features.withColumn("ARR_DELAY_CAPPED", arr_delay_capped)

In [0]:
# Persist the cleaned silver flights (without the engineered feature columns) as
# a Delta table, replacing its previous contents.
df_silver.write.saveAsTable(
    "workspace.flight_delay.silver_flights",
    format="delta",
    mode="overwrite",
)

## 3. Gold Layer

In [0]:
# Gold: feature set plus the is_delayed label.
# NOTE(review): the DELAY_DUE_* columns are presumably only non-zero for flights
# that were already delayed, which would leak the is_delayed label into the
# features -- confirm before training a model on this table.
df_gold = df_features.select(
    # carrier / route descriptors
    "AIRLINE", "ORIGIN", "DEST", "DISTANCE",
    # schedule features
    "dep_hour", "month", "day_of_week", "is_peak_hour",
    # per-cause delay minutes
    "DELAY_DUE_WEATHER", "DELAY_DUE_NAS",
    "DELAY_DUE_CARRIER", "DELAY_DUE_LATE_AIRCRAFT",
    # label
    "is_delayed",
)

df_gold.display()

In [0]:
# Persist the gold feature table as Delta, replacing its previous contents.
df_gold.write.saveAsTable(
    "workspace.flight_delay.gold_flight_features",
    format="delta",
    mode="overwrite",
)


In [0]:
%sql
-- Delta table history (one row per table version) of the bronze table.
describe history workspace.flight_delay.bronze_flights;

In [0]:
%sql
-- Row count of the first bronze version, read via Delta time travel.
-- NOTE(review): old versions stop being readable once VACUUM removes their files.
select count(*)
  from workspace.flight_delay.bronze_flights
  version as of 0;

In [0]:
%sql
-- Row count of the current bronze version, for comparison with version 0.
select count(*)
  from workspace.flight_delay.bronze_flights;

In [0]:
%sql
-- Compact the silver table's data files.
optimize workspace.flight_delay.silver_flights;

In [0]:
%sql
-- Compact the gold table and Z-order it by route and carrier, presumably the
-- columns most queries filter on.
optimize workspace.flight_delay.gold_flight_features
zorder by (ORIGIN, DEST, AIRLINE);

## Transformation and Business Logic

In [0]:
# Business rule: a flight is "delayed" when it arrives more than 15 minutes
# late; a null ARR_DELAY falls into the otherwise branch and becomes 0.
# NOTE(review): BTS uses >= 15 minutes; confirm the strict > 15 is intended.
df_transformed = df_silver.select(
    "*",
    when(col("ARR_DELAY") > 15, 1).otherwise(0).alias("is_delayed"),
)


In [0]:
# Safeguard: keep only CANCELLED == 0 rows (null CANCELLED is dropped too).
# Expected to be a no-op when df_silver was already filtered on CANCELLED.
df_transformed = df_transformed.where(col("CANCELLED") == 0)


In [0]:
# dep_hour: CRS_DEP_TIME divided by 100 and floored (read as an hhmm number).
# is_peak_hour: 1 for hours 7-10 or 16-20 (inclusive), else 0 (null -> 0).
# dep_hour must exist before is_peak_hour references it, hence two withColumns.
df_transformed = (
    df_transformed
    .withColumn("dep_hour", floor(col("CRS_DEP_TIME") / 100))
    .withColumn(
        "is_peak_hour",
        when(
            col("dep_hour").between(7, 10) | col("dep_hour").between(16, 20),
            1,
        ).otherwise(0),
    )
)


In [0]:
def _primary_delay_reason(reasons):
    """Build a Column naming the cause that contributed the most delay minutes.

    reasons: ordered (column_name, label) pairs with at least two entries
        (greatest() needs two or more columns). On a tie, the earlier pair wins.

    Returns a string Column. Rows where no cause has positive minutes
    (including all-null rows) are labelled "No Delay".
    """
    # greatest() skips nulls, so this is the largest non-null cause value per row.
    max_minutes = greatest(*[col(name) for name, _ in reasons])
    expr = None
    for cause_col, label in reasons:
        # A cause is primary only if it is positive AND equal to the row maximum.
        is_primary = (col(cause_col) > 0) & (col(cause_col) == max_minutes)
        expr = when(is_primary, label) if expr is None else expr.when(is_primary, label)
    return expr.otherwise("No Delay")


# All five BTS delay-cause columns take part, including DELAY_DUE_SECURITY.
df_transformed = df_transformed.withColumn(
    "primary_delay_reason",
    _primary_delay_reason([
        ("DELAY_DUE_WEATHER", "Weather"),
        ("DELAY_DUE_NAS", "Air Traffic"),
        ("DELAY_DUE_CARRIER", "Carrier"),
        ("DELAY_DUE_LATE_AIRCRAFT", "Late Aircraft"),
        ("DELAY_DUE_SECURITY", "Security"),
    ]),
)


In [0]:
# Cap ARR_DELAY at (approximately) its 99th percentile into ARR_DELAY_CAPPED.
#
# approxQuantile's relativeError bounds the *rank* error: the value returned for
# p lies between ranks floor((p - err) * N) and ceil((p + err) * N). With
# err = 0.01 and p = 0.99 that range reaches the maximum, so the "cap" could be
# a no-op. err = 0.001 keeps it between roughly the 98.9th and 99.1st percentile.
arr_delay_quantiles = df_transformed.approxQuantile("ARR_DELAY", [0.99], 0.001)

if arr_delay_quantiles:
    p99 = arr_delay_quantiles[0]
    arr_delay_capped = when(col("ARR_DELAY") > p99, p99).otherwise(col("ARR_DELAY"))
else:
    # approxQuantile returns [] when ARR_DELAY has no non-null values
    # (e.g. an empty DataFrame); there is nothing to cap, so copy the column.
    p99 = None
    arr_delay_capped = col("ARR_DELAY")

df_transformed = df_transformed.withColumn("ARR_DELAY_CAPPED", arr_delay_capped)


In [0]:
# Per-route (ORIGIN -> DEST) KPIs.
#   total_flights            - number of rows for the route
#   avg_arrival_delay        - mean raw ARR_DELAY (nulls skipped)
#   delayed_flights          - count of is_delayed == 1
#   avg_arrival_delay_capped - mean of the percentile-capped delay, so a few
#                              extreme delays cannot dominate the average
#   delay_rate               - delayed_flights / total_flights, comparable across
#                              routes of different volume (total_flights >= 1
#                              for every group, so no division by zero)
# The explicit F namespace avoids relying on the wildcard import shadowing
# Python's builtin sum.
df_route_kpis = (
    df_transformed
    .groupBy("ORIGIN", "DEST")
    .agg(
        F.count("*").alias("total_flights"),
        F.avg("ARR_DELAY").alias("avg_arrival_delay"),
        F.sum("is_delayed").alias("delayed_flights"),
        F.avg("ARR_DELAY_CAPPED").alias("avg_arrival_delay_capped"),
    )
    .withColumn("delay_rate", F.col("delayed_flights") / F.col("total_flights"))
)


In [0]:
%sql
-- List the privileges granted on the gold feature table.
show grants on table workspace.flight_delay.gold_flight_features;
