In [0]:
silver_path = "/mnt/silver/delta/airline_delays"           # silver delta path
gold_base_path = "/mnt/gold-parquet/airline_delays_gold"            # base path for gold Delta tables
gold_monthly_path = f"{gold_base_path}/monthly"             # partitioned by year/month
gold_carrier_path = f"{gold_base_path}/carrier"             # partitioned by carrier/year/month
gold_causes_path = f"{gold_base_path}/causes" 
gold_master_path = f"{gold_base_path}/master"

In [0]:

storage_account_name = "saflightdelays"
container_name = "gold"
container_name_parq='gold-parquet'
storage_account_key = ""

mount_point = f"/mnt/{container_name_parq}"
source_url = f"wasbs://{container_name_parq}@{storage_account_name}.blob.core.windows.net"

# Check if already mounted and unmount if needed
for mnt in dbutils.fs.mounts():
    if mnt.mountPoint == mount_point:
        print(f"Already mounted at {mount_point}, unmounting first...")
        dbutils.fs.unmount(mount_point)

# Mount
dbutils.fs.mount(
    source=source_url,
    mount_point=mount_point,
    extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
)

print(f"Mounted successfully: {mount_point}")

Already mounted at /mnt/gold-parquet, unmounting first...
/mnt/gold-parquet has been unmounted.
Mounted successfully: /mnt/gold-parquet


In [0]:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType
from delta.tables import DeltaTable

# Paths 
silver_path = "/mnt/silver/delta/airline_delays"           # silver delta path
gold_base_path = "/mnt/gold/airline_delays_gold"            # base path for gold Delta tables
gold_monthly_path = f"{gold_base_path}/monthly"             # partitioned by year/month
gold_carrier_path = f"{gold_base_path}/carrier"             # partitioned by carrier/year/month
gold_airport_path = f"{gold_base_path}/airport"             # partitioned by airport/year/month

#1) Read silver table 
df = spark.read.format("delta").load(silver_path)

df = df.na.fill({c: 0 for c in ["arr_flights", "arr_del15", "carrier_ct", "weather_ct", "nas_ct", "security_ct", "late_aircraft_ct", "arr_cancelled", "arr_diverted", "arr_delay", "carrier_delay"]})


df = (df
      .withColumn("year", F.col("year").cast(IntegerType()))
      .withColumn("month", F.col("month").cast(IntegerType()))
      .withColumn("arr_flights", F.col("arr_flights").cast(DoubleType()))
      .withColumn("arr_del15", F.col("arr_del15").cast(DoubleType()))
      .withColumn("carrier_ct", F.col("carrier_ct").cast(DoubleType()))
      .withColumn("weather_ct", F.col("weather_ct").cast(DoubleType()))
      .withColumn("nas_ct", F.col("nas_ct").cast(DoubleType()))
      .withColumn("security_ct", F.col("security_ct").cast(DoubleType()))
      .withColumn("late_aircraft_ct", F.col("late_aircraft_ct").cast(DoubleType()))
      .withColumn("arr_cancelled", F.col("arr_cancelled").cast(DoubleType()))
      .withColumn("arr_diverted", F.col("arr_diverted").cast(DoubleType()))
      .withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
      .withColumn("carrier_delay", F.col("carrier_delay").cast(DoubleType()))
     )

In [0]:
# Create a canonical timestamp column if available; otherwise derive a synthetic date from year/month

df = df.withColumn("year_month", F.concat_ws("-", F.col("year"), F.lpad(F.col("month").cast("string"), 2, "0")))

# 2) Define KPI formulas 
# delay_rate = arr_del15 / arr_flights  (fraction of flights delayed >15)
# avg_delay_per_flight = arr_delay / arr_flights (minutes per flight)
# cancel_rate = arr_cancelled / arr_flights
# divert_rate = arr_diverted / arr_flights
# percent_by_cause = cause_ct / sum_of_cause_cts  (normalize carrier/weather/nas/security/late_aircraft)
# also compute P95 and median if you have per-flight delay distribution (we have aggregated sums so P95 needs original events)

cause_cols = ["carrier_ct", "weather_ct", "nas_ct", "security_ct", "late_aircraft_ct"]

# Add basic KPIs at row level (guard divide-by-zero)
df_k = (df
       .withColumn("delay_rate", F.when(F.col("arr_flights") > 0, F.col("arr_del15") / F.col("arr_flights")).otherwise(F.lit(0.0)))
       .withColumn("avg_delay_per_flight", F.when(F.col("arr_flights") > 0, F.col("arr_delay") / F.col("arr_flights")).otherwise(F.lit(0.0)))
       .withColumn("cancel_rate", F.when(F.col("arr_flights") > 0, F.col("arr_cancelled") / F.col("arr_flights")).otherwise(F.lit(0.0)))
       .withColumn("divert_rate", F.when(F.col("arr_flights") > 0, F.col("arr_diverted") / F.col("arr_flights")).otherwise(F.lit(0.0)))
      )

# compute sum of cause counts (for normalization)
df_k = df_k.withColumn("cause_total", sum([F.col(c) for c in cause_cols]))

for c in cause_cols:
    df_k = df_k.withColumn(f"{c}_pct", F.when(F.col("cause_total") > 0, F.col(c) / F.col("cause_total")).otherwise(F.lit(0.0)))


In [0]:
display(df_k)

year,month,carrier,carrier_name,airport,airport_name,arr_flights,arr_del15,carrier_ct,weather_ct,nas_ct,security_ct,late_aircraft_ct,arr_cancelled,arr_diverted,arr_delay,carrier_delay,year_month,delay_rate,avg_delay_per_flight,cancel_rate,divert_rate,cause_total,carrier_ct_pct,weather_ct_pct,nas_ct_pct,security_ct_pct,late_aircraft_ct_pct
2025,7,YV,Mesa Airlines Inc.,CRP,"Corpus Christi, TX: Corpus Christi International",19.0,4.0,0.71,0.3,0.7,0.0,2.29,1.0,0.0,823.0,45.0,2025-07,0.2105263157894736,43.31578947368421,0.0526315789473684,0.0,4.0,0.1775,0.075,0.175,0.0,0.5725
2025,7,YV,Mesa Airlines Inc.,CVG,"Cincinnati, OH: Cincinnati/Northern Kentucky International",75.0,14.0,7.64,1.0,3.57,0.0,1.79,0.0,0.0,722.0,485.0,2025-07,0.1866666666666666,9.626666666666669,0.0,0.0,14.0,0.5457142857142857,0.0714285714285714,0.255,0.0,0.1278571428571428
2025,7,YV,Mesa Airlines Inc.,DFW,"Dallas/Fort Worth, TX: Dallas/Fort Worth International",97.0,23.0,8.09,0.73,4.85,0.0,9.33,1.0,0.0,2125.0,655.0,2025-07,0.2371134020618556,21.90721649484536,0.0103092783505154,0.0,23.0,0.3517391304347826,0.0317391304347826,0.2108695652173912,0.0,0.4056521739130435
2025,7,YV,Mesa Airlines Inc.,GUC,"Gunnison, CO: Gunnison-Crested Butte Regional",8.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,150.0,0.0,2025-07,0.125,18.75,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0
2025,7,YV,Mesa Airlines Inc.,IAD,"Washington, DC: Washington Dulles International",978.0,234.0,52.94,14.27,67.41,0.0,99.39,48.0,6.0,23417.0,3756.0,2025-07,0.2392638036809816,23.94376278118609,0.0490797546012269,0.0061349693251533,234.01,0.2262296483056279,0.06098029998718,0.2880646126233921,0.0,0.4247254390837998
2025,5,OO,SkyWest Airlines Inc.,BFL,"Bakersfield, CA: Meadows Field",133.0,30.0,12.31,3.0,8.98,0.0,5.71,0.0,0.0,2589.0,563.0,2025-05,0.225563909774436,19.466165413533837,0.0,0.0,30.0,0.4103333333333333,0.1,0.2993333333333333,0.0,0.1903333333333333
2025,5,OO,SkyWest Airlines Inc.,BGM,"Binghamton, NY: Greater Binghamton/Edwin A. Link Field",25.0,2.0,2.0,0.0,0.0,0.0,0.0,0.0,1.0,60.0,60.0,2025-05,0.08,2.4,0.0,0.04,2.0,1.0,0.0,0.0,0.0,0.0
2025,5,OO,SkyWest Airlines Inc.,BHM,"Birmingham, AL: Birmingham-Shuttlesworth International",176.0,54.0,31.88,6.83,7.12,0.0,8.18,1.0,0.0,3970.0,2103.0,2025-05,0.3068181818181818,22.556818181818183,0.0056818181818181,0.0,54.01,0.5902610627661544,0.1264580633216071,0.1318274393630809,0.0,0.1514534345491575
2025,5,OO,SkyWest Airlines Inc.,BIL,"Billings, MT: Billings Logan International",175.0,24.0,10.29,1.68,4.63,0.0,7.4,1.0,1.0,1579.0,470.0,2025-05,0.1371428571428571,9.022857142857143,0.0057142857142857,0.0057142857142857,24.0,0.4287499999999999,0.0699999999999999,0.1929166666666666,0.0,0.3083333333333333
2025,5,OO,SkyWest Airlines Inc.,BIS,"Bismarck/Mandan, ND: Bismarck Municipal",144.0,29.0,8.59,4.71,7.3,0.0,8.4,2.0,1.0,4274.0,1089.0,2025-05,0.2013888888888889,29.68055555555556,0.0138888888888888,0.0069444444444444,29.0,0.2962068965517241,0.1624137931034482,0.2517241379310345,0.0,0.2896551724137931


In [0]:
# 3) Monthly-level aggregations (gold_monthly) 
agg_monthly = (df_k
               .groupBy("year", "month", "year_month")
               .agg(
                   F.sum("arr_flights").alias("total_arr_flights"),
                   F.sum("arr_del15").alias("total_arr_del15"),
                   F.sum("arr_delay").alias("total_arr_delay_minutes"),
                   F.sum("arr_cancelled").alias("total_arr_cancelled"),
                   F.sum("arr_diverted").alias("total_arr_diverted"),
                   *[F.sum(c).alias(f"sum_{c}") for c in cause_cols]
               )
              )

agg_monthly = (agg_monthly
               .withColumn("delay_rate", F.when(F.col("total_arr_flights")>0, F.col("total_arr_del15")/F.col("total_arr_flights")).otherwise(0.0))
               .withColumn("avg_delay_per_flight", F.when(F.col("total_arr_flights")>0, F.col("total_arr_delay_minutes")/F.col("total_arr_flights")).otherwise(0.0))
               .withColumn("cancel_rate", F.when(F.col("total_arr_flights")>0, F.col("total_arr_cancelled")/F.col("total_arr_flights")).otherwise(0.0))
               .withColumn("divert_rate", F.when(F.col("total_arr_flights")>0, F.col("total_arr_diverted")/F.col("total_arr_flights")).otherwise(0.0))
              )

# Add normalized cause shares
for c in cause_cols:
    agg_monthly = agg_monthly.withColumn(f"{c}_pct", F.when((F.col("sum_" + c) + 0) > 0, F.col("sum_" + c) / (F.expr(" + ".join([f"sum_{col}" for col in cause_cols])))).otherwise(0.0))


In [0]:
display(agg_monthly.limit(5))

year,month,year_month,total_arr_flights,total_arr_del15,total_arr_delay_minutes,total_arr_cancelled,total_arr_diverted,sum_carrier_ct,sum_weather_ct,sum_nas_ct,sum_security_ct,sum_late_aircraft_ct,delay_rate,avg_delay_per_flight,cancel_rate,divert_rate,carrier_ct_pct,weather_ct_pct,nas_ct_pct,security_ct_pct,late_aircraft_ct_pct
2024,12,2024-12,631944.0,132515.0,9585044.0,4579.0,1752.0,43489.29000000008,5391.969999999997,34704.26999999997,308.71999999999974,48620.75999999997,0.2096942134113149,15.16755282113605,0.0072458952059043,0.0027723975542136,0.3281838789432237,0.0406895037777229,0.2618893512516051,0.0023296983488889,0.3669075676785593
2024,8,2024-08,660639.0,150010.0,11292440.0,13434.0,2174.0,46480.130000000005,6147.579999999993,41086.280000000086,331.8900000000001,55964.15999999992,0.2270680356442777,17.093208242322962,0.0203348576151271,0.0032907533463813,0.3098467942545712,0.040981123663456,0.273890200949217,0.0022124519132186,0.373069429219537
2025,5,2025-05,667586.0,155262.0,11438449.0,7549.0,1898.0,43212.07000000002,7216.090000000004,49775.12000000002,203.42,54855.019999999975,0.2325722828219884,17.134045651047206,0.0113079063970784,0.0028430793935163,0.2783176046226978,0.046476942288157,0.3205884876194854,0.0013101748454158,0.3533067906242438
2025,4,2025-04,644084.0,124749.0,8747498.0,5291.0,1443.0,37564.129999999925,4662.500000000001,36994.31999999997,231.10999999999996,45297.05999999998,0.1936843641512597,13.5812999546643,0.0082147670179666,0.0022403910048999,0.3011173946557696,0.0373750131463853,0.2965497472046296,0.0018525982387691,0.3631052467544463
2024,6,2024-06,651799.0,157404.0,11642182.0,8315.0,2105.0,50697.429999999935,6373.670000000002,38325.49000000005,435.89000000000016,61571.86999999999,0.2414916254857709,17.86161377970816,0.0127570002408718,0.0032295232119104,0.3220840465971871,0.0404923370923357,0.2434843128541241,0.0027692373177742,0.3911700661385787


In [0]:
#4) Carrier-level KPIs (gold_carrier) 
agg_carrier = (df_k
               .groupBy("carrier", "carrier_name", "year", "month")
               .agg(
                   F.sum("arr_flights").alias("total_arr_flights"),
                   F.sum("arr_del15").alias("total_arr_del15"),
                   F.sum("arr_delay").alias("total_arr_delay_minutes"),
                   F.sum("arr_cancelled").alias("total_arr_cancelled"),
                   F.sum("arr_diverted").alias("total_arr_diverted"),
                   *[F.sum(c).alias(f"sum_{c}") for c in cause_cols]
               )
              )

agg_carrier = (agg_carrier
               .withColumn("delay_rate", F.when(F.col("total_arr_flights")>0, F.col("total_arr_del15")/F.col("total_arr_flights")).otherwise(0.0))
               .withColumn("avg_delay_per_flight", F.when(F.col("total_arr_flights")>0, F.col("total_arr_delay_minutes")/F.col("total_arr_flights")).otherwise(0.0))
               .withColumn("cancel_rate", F.when(F.col("total_arr_flights")>0, F.col("total_arr_cancelled")/F.col("total_arr_flights")).otherwise(0.0))
               .withColumn("divert_rate", F.when(F.col("total_arr_flights")>0, F.col("total_arr_diverted")/F.col("total_arr_flights")).otherwise(0.0))
              )

for c in cause_cols:
    agg_carrier = agg_carrier.withColumn(f"{c}_pct", F.when((F.col("sum_" + c) + 0) > 0, F.col("sum_" + c) / (F.expr(" + ".join([f"sum_{col}" for col in cause_cols])))).otherwise(0.0))


In [0]:
agg_carrier.limit(5).display()

carrier,carrier_name,year,month,total_arr_flights,total_arr_del15,total_arr_delay_minutes,total_arr_cancelled,total_arr_diverted,sum_carrier_ct,sum_weather_ct,sum_nas_ct,sum_security_ct,sum_late_aircraft_ct,delay_rate,avg_delay_per_flight,cancel_rate,divert_rate,carrier_ct_pct,weather_ct_pct,nas_ct_pct,security_ct_pct,late_aircraft_ct_pct
9E,Endeavor Air Inc.,2025,2,16896.0,3312.0,243201.0,339.0,41.0,681.68,121.5,1241.4599999999996,1.44,1265.9599999999998,0.1960227272727272,14.393998579545457,0.0200639204545454,0.0024266098484848,0.2058187703047065,0.0366843395611164,0.3748324295600294,0.0004347773577613798,0.3822296832163863
OH,PSA Airlines Inc.,2024,5,18829.0,5329.0,429500.0,178.0,64.0,1206.2900000000002,333.8700000000002,1532.06,17.5,2239.230000000001,0.2830208720590578,22.81055818152849,0.0094535025758139,0.0033990121620903,0.2263654190788054,0.062652117208831,0.2874975370382532,0.0032839489955807,0.4202009776785296
G7,GoJet Airlines LLC d/b/a United Express,2025,2,5220.0,1394.0,140921.0,150.0,6.0,289.44,25.21,461.6499999999999,0.0,617.72,0.2670498084291188,26.996360153256703,0.028735632183908,0.0011494252873563,0.207629732715456,0.0180843890331559,0.3311645457023571,0.0,0.4431213325490309
G7,GoJet Airlines LLC d/b/a United Express,2024,11,5558.0,1435.0,137827.0,40.0,14.0,303.82,8.28,503.42,0.16,619.2900000000001,0.2581863979848866,24.797948902482908,0.0071968333933069,0.0025188916876574,0.211725680676251,0.0057701554736335,0.350822665282201,0.00011150058886248492,0.4315699979790519
QX,Horizon Air,2025,1,7754.0,1285.0,66695.0,143.0,32.0,324.39000000000004,59.44,372.63,2.64,525.95,0.1657209182357493,8.601367036368327,0.0184420944028888,0.004126902244003,0.2524337574413447,0.046255009532703,0.289973152795611,0.0020543947706314,0.4092836854597097


In [0]:
# 5) Delay Causes KPI 
from pyspark.sql.functions import round as round_, sum as sum_, col, expr

causes = ["carrier_ct", "weather_ct", "nas_ct", "security_ct", "late_aircraft_ct"]

agg_causes = (
    df_k
    .groupBy("carrier", "carrier_name", "year", "month")
    .agg(*[sum_(col(c).cast("double")).alias(f"sum_{c}") for c in causes])
)

# compute total and percent shares
total_expr = " + ".join([f"sum_{c}" for c in causes])
agg_causes = (
    agg_causes
    .withColumn("total_cause_minutes", expr(total_expr))
)

for c in causes:
    agg_causes = agg_causes.withColumn(f"{c}_pct", expr(f"CASE WHEN total_cause_minutes>0 THEN round(100.0*sum_{c}/total_cause_minutes,2) ELSE null END"))



In [0]:
# === 6) Persist gold tables as Delta (partition for performance) ===
# monthly: partition by year/month
# (agg_monthly
#  .write
#  .format("delta")
#  .mode("overwrite")   # use append for incremental jobs; overwrite for first-run dev
#  .option("overwriteSchema", "true")
#  .partitionBy("year","month")
#  .save(gold_monthly_path)
# )

# # carrier-level: partition by carrier/year/month
# (agg_carrier
#  .write
#  .format("delta")
#  .mode("overwrite")
#  .option("overwriteSchema", "true")
#  .partitionBy("carrier","year","month")
#  .save(gold_carrier_path)
# )

# # delay causes kpi

# ( agg_causes
#     .write
#     .format("delta")
#     .mode("overwrite")
#     .option("overwriteSchema","true")
#     .partitionBy("year", "month")
#     .save(gold_causes_path)
# )




In [0]:
# register in the Hive metastore / Databricks catalog for SQL access
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_monthly USING DELTA LOCATION '{gold_monthly_path}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_carrier USING DELTA LOCATION '{gold_carrier_path}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_causes USING DELTA LOCATION '{gold_causes_path}'")

print("Gold tables written and registered.")


Gold tables written and registered.


In [0]:
#  Variables: set your target parquet paths (adjust to your storage mount / abfss / wasbs path) ---


compression_codec = "snappy"   # fast + splittable -> good for parquet
target_repartition_files = 8   # tune for your cluster / file size needs (8 is an example)

def write_parquet(df, path, partition_by=None, mode="overwrite", repart_files=None, compression="snappy"):
    """
    df: Spark DataFrame
    path: destination path
    partition_by: list of partition column names (or None)
    mode: 'overwrite' for first-run dev; use 'append' for incremental jobs
    repart_files: None or int - if set, will call df.repartition(repart_files, *partition_by) when partition_by provided
    compression: parquet compression codec
    """
    writer = df
    # If repartition requested and partition columns exist, repartition by partition columns + target number
    if repart_files:
        if partition_by:
            writer = df.repartition(repart_files, *[col for col in partition_by])
        else:
            writer = df.repartition(repart_files)

    w = writer.write
    w = w.format("parquet").mode(mode).option("compression", compression)
    if partition_by:
        w = w.partitionBy(*partition_by)
    # For first-run / dev you may want to overwrite; for production incremental use append
    w.save(path)
    print(f"Wrote dataframe -> {path}  (mode={mode}, partition_by={partition_by}, files={repart_files}, compression={compression})")


# 1) monthly: partition by year/month

write_parquet(
    df = agg_monthly,
    path = gold_monthly_path,
    mode = "overwrite",             # change to "append" for incremental
    repart_files = target_repartition_files,
    compression = compression_codec
)


# 2) carrier-level: 
write_parquet(
    df = agg_carrier,
    path = gold_carrier_path,
    mode = "overwrite",
    repart_files = target_repartition_files,
    compression = compression_codec
)



# 3) delay causes KPI 

write_parquet(
    df = agg_causes,
    path = gold_causes_path,
    mode = "overwrite",
    repart_files = target_repartition_files,
    compression = compression_codec
)

write_parquet(
    df = df_k,
    path = gold_master_path,
    mode = "overwrite",
    repart_files = target_repartition_files,
    compression = compression_codec
)


# 4) Quick verification (Databricks helpers)

try:
    display(dbutils.fs.ls(gold_monthly_path))
    display(dbutils.fs.ls(gold_carrier_path))
    display(dbutils.fs.ls(gold_causes_path))
    display(dbutils.fs.ls(gold_master_path))
except Exception as e:
    print("dbutils not available; falling back to spark read-sample")
    # quick read sample
    print("Monthly sample:")
    spark.read.parquet(gold_monthly_path).limit(5).display()
    print("Carrier sample:")
    spark.read.parquet(gold_carrier_path).limit(5).display()
    print("Causes sample:")
    spark.read.parquet(gold_causes_path).limit(5).display()
    print("Causes sample:")
    spark.read.parquet(gold_master_path).limit(5).display()

Wrote dataframe -> /mnt/gold-parquet/airline_delays_gold/monthly  (mode=overwrite, partition_by=None, files=8, compression=snappy)
Wrote dataframe -> /mnt/gold-parquet/airline_delays_gold/carrier  (mode=overwrite, partition_by=None, files=8, compression=snappy)
Wrote dataframe -> /mnt/gold-parquet/airline_delays_gold/causes  (mode=overwrite, partition_by=None, files=8, compression=snappy)
Wrote dataframe -> /mnt/gold-parquet/airline_delays_gold/master  (mode=overwrite, partition_by=None, files=8, compression=snappy)


path,name,size,modificationTime
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/_SUCCESS,_SUCCESS,0,1762640803000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/_committed_2108347030398625207,_committed_2108347030398625207,816,1762640366000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/_committed_5258064619359259591,_committed_5258064619359259591,1626,1762640802000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/_started_2108347030398625207,_started_2108347030398625207,0,1762640365000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/_started_5258064619359259591,_started_5258064619359259591,0,1762640801000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/part-00000-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-123-1.c000.snappy.parquet,part-00000-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-123-1.c000.snappy.parquet,5677,1762640801000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/part-00001-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-124-1.c000.snappy.parquet,part-00001-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-124-1.c000.snappy.parquet,5674,1762640801000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/part-00002-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-125-1.c000.snappy.parquet,part-00002-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-125-1.c000.snappy.parquet,5831,1762640801000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/part-00003-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-126-1.c000.snappy.parquet,part-00003-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-126-1.c000.snappy.parquet,5673,1762640801000
dbfs:/mnt/gold-parquet/airline_delays_gold/monthly/part-00004-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-127-1.c000.snappy.parquet,part-00004-tid-5258064619359259591-2d6c21ef-f1d9-4470-9a48-7b41a5ada167-127-1.c000.snappy.parquet,5683,1762640802000


path,name,size,modificationTime
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/_SUCCESS,_SUCCESS,0,1762640806000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/_committed_5481466059340698406,_committed_5481466059340698406,1626,1762640806000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/_committed_5847147899503606816,_committed_5847147899503606816,816,1762640369000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/_started_5481466059340698406,_started_5481466059340698406,0,1762640805000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/_started_5847147899503606816,_started_5847147899503606816,0,1762640368000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/part-00000-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-136-1.c000.snappy.parquet,part-00000-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-136-1.c000.snappy.parquet,14920,1762640805000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/part-00001-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-137-1.c000.snappy.parquet,part-00001-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-137-1.c000.snappy.parquet,15055,1762640805000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/part-00002-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-138-1.c000.snappy.parquet,part-00002-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-138-1.c000.snappy.parquet,15136,1762640805000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/part-00003-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-139-1.c000.snappy.parquet,part-00003-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-139-1.c000.snappy.parquet,15114,1762640805000
dbfs:/mnt/gold-parquet/airline_delays_gold/carrier/part-00004-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-140-1.c000.snappy.parquet,part-00004-tid-5481466059340698406-a254b36e-6ba1-4f03-9665-bca3ee0b6ad1-140-1.c000.snappy.parquet,15012,1762640805000


path,name,size,modificationTime
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/_SUCCESS,_SUCCESS,0,1762640809000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/_committed_4452467296939432791,_committed_4452467296939432791,1632,1762640809000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/_committed_5367492173537244362,_committed_5367492173537244362,822,1762640372000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/_started_4452467296939432791,_started_4452467296939432791,0,1762640808000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/_started_5367492173537244362,_started_5367492173537244362,0,1762640371000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/part-00000-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-149-1.c000.snappy.parquet,part-00000-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-149-1.c000.snappy.parquet,9044,1762640808000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/part-00001-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-150-1.c000.snappy.parquet,part-00001-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-150-1.c000.snappy.parquet,9032,1762640808000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/part-00002-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-151-1.c000.snappy.parquet,part-00002-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-151-1.c000.snappy.parquet,9081,1762640808000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/part-00003-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-152-1.c000.snappy.parquet,part-00003-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-152-1.c000.snappy.parquet,9079,1762640808000
dbfs:/mnt/gold-parquet/airline_delays_gold/causes/part-00004-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-153-1.c000.snappy.parquet,part-00004-tid-4452467296939432791-f4f48876-44fc-4f67-8c7e-b75012c14363-153-1.c000.snappy.parquet,9028,1762640808000


path,name,size,modificationTime
dbfs:/mnt/gold-parquet/airline_delays_gold/master/_SUCCESS,_SUCCESS,0,1762640813000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/_committed_7318516604140017921,_committed_7318516604140017921,824,1762640812000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/_started_7318516604140017921,_started_7318516604140017921,0,1762640811000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/part-00000-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-161-1.c000.snappy.parquet,part-00000-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-161-1.c000.snappy.parquet,470677,1762640812000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/part-00001-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-162-1.c000.snappy.parquet,part-00001-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-162-1.c000.snappy.parquet,469383,1762640812000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/part-00002-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-163-1.c000.snappy.parquet,part-00002-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-163-1.c000.snappy.parquet,470854,1762640812000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/part-00003-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-164-1.c000.snappy.parquet,part-00003-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-164-1.c000.snappy.parquet,471004,1762640812000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/part-00004-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-165-1.c000.snappy.parquet,part-00004-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-165-1.c000.snappy.parquet,476317,1762640812000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/part-00005-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-166-1.c000.snappy.parquet,part-00005-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-166-1.c000.snappy.parquet,475071,1762640812000
dbfs:/mnt/gold-parquet/airline_delays_gold/master/part-00006-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-167-1.c000.snappy.parquet,part-00006-tid-7318516604140017921-10e68129-c4ca-43db-add3-a2625b59b589-167-1.c000.snappy.parquet,477059,1762640812000


In [0]:
from pyspark.sql import DataFrame

def inspect_dataframe(df: DataFrame, name: str, sample_rows: int = 5):
    print(f"\n {name} SCHEMA ")
    df.printSchema()

    print(f"\n  {name} COLUMN TYPES ")
    for field in df.schema.fields:
        print(f"{field.name:30}  →  {field.dataType}")

    print(f"\n  {name} SAMPLE DATA (first {sample_rows} rows) ")
    display(df.limit(sample_rows))

# Inspect all your key DataFrames
inspect_dataframe(agg_carrier, "agg_carrier")
inspect_dataframe(agg_monthly, "agg_monthly")
inspect_dataframe(agg_causes, "agg_causes")


===== 🔎 agg_carrier SCHEMA =====
root
 |-- carrier: string (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- total_arr_flights: double (nullable = true)
 |-- total_arr_del15: double (nullable = true)
 |-- total_arr_delay_minutes: double (nullable = true)
 |-- total_arr_cancelled: double (nullable = true)
 |-- total_arr_diverted: double (nullable = true)
 |-- sum_carrier_ct: double (nullable = true)
 |-- sum_weather_ct: double (nullable = true)
 |-- sum_nas_ct: double (nullable = true)
 |-- sum_security_ct: double (nullable = true)
 |-- sum_late_aircraft_ct: double (nullable = true)
 |-- delay_rate: double (nullable = true)
 |-- avg_delay_per_flight: double (nullable = true)
 |-- cancel_rate: double (nullable = true)
 |-- divert_rate: double (nullable = true)
 |-- carrier_ct_pct: double (nullable = true)
 |-- weather_ct_pct: double (nullable = true)
 |-- nas_ct_pct: double (nullable = true)
 |--

carrier,carrier_name,year,month,total_arr_flights,total_arr_del15,total_arr_delay_minutes,total_arr_cancelled,total_arr_diverted,sum_carrier_ct,sum_weather_ct,sum_nas_ct,sum_security_ct,sum_late_aircraft_ct,delay_rate,avg_delay_per_flight,cancel_rate,divert_rate,carrier_ct_pct,weather_ct_pct,nas_ct_pct,security_ct_pct,late_aircraft_ct_pct
9E,Endeavor Air Inc.,2025,2,16896.0,3312.0,243201.0,339.0,41.0,681.68,121.5,1241.4599999999996,1.44,1265.9599999999998,0.1960227272727272,14.393998579545457,0.0200639204545454,0.0024266098484848,0.2058187703047065,0.0366843395611164,0.3748324295600294,0.0004347773577613798,0.3822296832163863
OH,PSA Airlines Inc.,2024,5,18829.0,5329.0,429500.0,178.0,64.0,1206.2900000000002,333.8700000000002,1532.06,17.5,2239.230000000001,0.2830208720590578,22.81055818152849,0.0094535025758139,0.0033990121620903,0.2263654190788054,0.062652117208831,0.2874975370382532,0.0032839489955807,0.4202009776785296
G7,GoJet Airlines LLC d/b/a United Express,2025,2,5220.0,1394.0,140921.0,150.0,6.0,289.44,25.21,461.6499999999999,0.0,617.72,0.2670498084291188,26.996360153256703,0.028735632183908,0.0011494252873563,0.207629732715456,0.0180843890331559,0.3311645457023571,0.0,0.4431213325490309
G7,GoJet Airlines LLC d/b/a United Express,2024,11,5558.0,1435.0,137827.0,40.0,14.0,303.82,8.28,503.42,0.16,619.2900000000001,0.2581863979848866,24.797948902482908,0.0071968333933069,0.0025188916876574,0.211725680676251,0.0057701554736335,0.350822665282201,0.00011150058886248492,0.4315699979790519
QX,Horizon Air,2025,1,7754.0,1285.0,66695.0,143.0,32.0,324.39000000000004,59.44,372.63,2.64,525.95,0.1657209182357493,8.601367036368327,0.0184420944028888,0.004126902244003,0.2524337574413447,0.046255009532703,0.289973152795611,0.0020543947706314,0.4092836854597097



===== 🔎 agg_monthly SCHEMA =====
root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year_month: string (nullable = false)
 |-- total_arr_flights: double (nullable = true)
 |-- total_arr_del15: double (nullable = true)
 |-- total_arr_delay_minutes: double (nullable = true)
 |-- total_arr_cancelled: double (nullable = true)
 |-- total_arr_diverted: double (nullable = true)
 |-- sum_carrier_ct: double (nullable = true)
 |-- sum_weather_ct: double (nullable = true)
 |-- sum_nas_ct: double (nullable = true)
 |-- sum_security_ct: double (nullable = true)
 |-- sum_late_aircraft_ct: double (nullable = true)
 |-- delay_rate: double (nullable = true)
 |-- avg_delay_per_flight: double (nullable = true)
 |-- cancel_rate: double (nullable = true)
 |-- divert_rate: double (nullable = true)
 |-- carrier_ct_pct: double (nullable = true)
 |-- weather_ct_pct: double (nullable = true)
 |-- nas_ct_pct: double (nullable = true)
 |-- security_ct_pct: double (nullable = tru

year,month,year_month,total_arr_flights,total_arr_del15,total_arr_delay_minutes,total_arr_cancelled,total_arr_diverted,sum_carrier_ct,sum_weather_ct,sum_nas_ct,sum_security_ct,sum_late_aircraft_ct,delay_rate,avg_delay_per_flight,cancel_rate,divert_rate,carrier_ct_pct,weather_ct_pct,nas_ct_pct,security_ct_pct,late_aircraft_ct_pct
2024,12,2024-12,631944.0,132515.0,9585044.0,4579.0,1752.0,43489.29000000008,5391.969999999997,34704.26999999997,308.71999999999974,48620.75999999997,0.2096942134113149,15.16755282113605,0.0072458952059043,0.0027723975542136,0.3281838789432237,0.0406895037777229,0.2618893512516051,0.0023296983488889,0.3669075676785593
2024,8,2024-08,660639.0,150010.0,11292440.0,13434.0,2174.0,46480.130000000005,6147.579999999993,41086.280000000086,331.8900000000001,55964.15999999992,0.2270680356442777,17.093208242322962,0.0203348576151271,0.0032907533463813,0.3098467942545712,0.040981123663456,0.273890200949217,0.0022124519132186,0.373069429219537
2025,5,2025-05,667586.0,155262.0,11438449.0,7549.0,1898.0,43212.07000000002,7216.090000000004,49775.12000000002,203.42,54855.019999999975,0.2325722828219884,17.134045651047206,0.0113079063970784,0.0028430793935163,0.2783176046226978,0.046476942288157,0.3205884876194854,0.0013101748454158,0.3533067906242438
2025,4,2025-04,644084.0,124749.0,8747498.0,5291.0,1443.0,37564.129999999925,4662.500000000001,36994.31999999997,231.10999999999996,45297.05999999998,0.1936843641512597,13.5812999546643,0.0082147670179666,0.0022403910048999,0.3011173946557696,0.0373750131463853,0.2965497472046296,0.0018525982387691,0.3631052467544463
2024,6,2024-06,651799.0,157404.0,11642182.0,8315.0,2105.0,50697.429999999935,6373.670000000002,38325.49000000005,435.89000000000016,61571.86999999999,0.2414916254857709,17.86161377970816,0.0127570002408718,0.0032295232119104,0.3220840465971871,0.0404923370923357,0.2434843128541241,0.0027692373177742,0.3911700661385787



===== 🔎 agg_causes SCHEMA =====
root
 |-- carrier: string (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- sum_carrier_ct: double (nullable = true)
 |-- sum_weather_ct: double (nullable = true)
 |-- sum_nas_ct: double (nullable = true)
 |-- sum_security_ct: double (nullable = true)
 |-- sum_late_aircraft_ct: double (nullable = true)
 |-- total_cause_minutes: double (nullable = true)
 |-- carrier_ct_pct: double (nullable = true)
 |-- weather_ct_pct: double (nullable = true)
 |-- nas_ct_pct: double (nullable = true)
 |-- security_ct_pct: double (nullable = true)
 |-- late_aircraft_ct_pct: double (nullable = true)


===== 🧩 agg_causes COLUMN TYPES =====
carrier                         →  StringType()
carrier_name                    →  StringType()
year                            →  IntegerType()
month                           →  IntegerType()
sum_carrier_ct                  →  DoubleType()
sum_w

carrier,carrier_name,year,month,sum_carrier_ct,sum_weather_ct,sum_nas_ct,sum_security_ct,sum_late_aircraft_ct,total_cause_minutes,carrier_ct_pct,weather_ct_pct,nas_ct_pct,security_ct_pct,late_aircraft_ct_pct
9E,Endeavor Air Inc.,2025,2,681.68,121.5,1241.4599999999996,1.44,1265.9599999999998,3312.039999999999,20.58,3.67,37.48,0.04,38.22
OH,PSA Airlines Inc.,2024,5,1206.2900000000002,333.8700000000002,1532.06,17.5,2239.230000000001,5328.950000000001,22.64,6.27,28.75,0.33,42.02
G7,GoJet Airlines LLC d/b/a United Express,2025,2,289.44,25.21,461.6499999999999,0.0,617.72,1394.02,20.76,1.81,33.12,0.0,44.31
G7,GoJet Airlines LLC d/b/a United Express,2024,11,303.82,8.28,503.42,0.16,619.2900000000001,1434.97,21.17,0.58,35.08,0.01,43.16
QX,Horizon Air,2025,1,324.39000000000004,59.44,372.63,2.64,525.95,1285.0500000000002,25.24,4.63,29.0,0.21,40.93
