In [0]:
# To access Kaggle datasets, you must authenticate using a Kaggle API token.
# Download your 'kaggle.json' from your Kaggle account settings and provide its path below.
kaggle_json_path = "kaggle.json"


dataset_name = "Flight Delay and Cancellation Dataset (2019-2023)"
dataset_url = "https://www.kaggle.com/api/v1/datasets/download/patrickzel/flight-delay-and-cancellation-dataset-2019-2023"
download_dir = f"/tmp" # in regular Databricks env we could write to our workspace or other DFS path

In [0]:
# Load Kaggle API credentials from the provided JSON file
import json
with open(kaggle_json_path, 'r') as f:
    kaggle_creds = json.load(f)
KAGGLE_USERNAME = kaggle_creds['username']
KAGGLE_KEY = kaggle_creds['key']

In [0]:
# download the dataset using the Kaggle API
import requests
import os

os.makedirs(download_dir, exist_ok=True)

session = requests.Session()
session.auth = (KAGGLE_USERNAME, KAGGLE_KEY)
response = session.get(dataset_url, stream=True)
if response.status_code != 200:
    raise Exception(f"Failed to download file, status code: {response.status_code}")

zip_path = os.path.join(download_dir, dataset_name + ".zip")
with open(zip_path, "wb") as f:
    for chunk in response.iter_content(chunk_size=10485760):
        if chunk:
            f.write(chunk)

print(f"File downloaded to {zip_path}")

import zipfile
with zipfile.ZipFile(zip_path, "r") as zip_ref:
    zip_ref.extractall(download_dir)

print(f"Files extracted to {download_dir}")

File downloaded to /tmp/Flight Delay and Cancellation Dataset (2019-2023).zip
Files extracted to /tmp


In [0]:
flights_name = "flights_sample_3m.csv"
flights_path = os.path.join(download_dir, flights_name)

import pandas as pd
import os

df_flights = pd.read_csv(flights_path)
print("Shape of the df: ", df_flights.shape)

Shape of the df:  (3000000, 32)


In [0]:
df_flights.display()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:139)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:139)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# Use a Unity Catalog catalog and schema, e.g., "main.bronze"
flights_permanent_name = "flights_permanent"

# Drop table if exists (in Unity Catalog)
spark.sql(
    f"DROP TABLE IF EXISTS {flights_permanent_name}"
)
spark_df_flights = spark.createDataFrame(df_flights)
# Save as permanent table (Delta format by default)
spark_df_flights.write.format("delta").mode("overwrite").saveAsTable(
    flights_permanent_name
)

In [0]:
%sql
SELECT * FROM flights_permanent LIMIT 10

FL_DATE,AIRLINE,AIRLINE_DOT,AIRLINE_CODE,DOT_CODE,FL_NUMBER,ORIGIN,ORIGIN_CITY,DEST,DEST_CITY,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,DELAY_DUE_CARRIER,DELAY_DUE_WEATHER,DELAY_DUE_NAS,DELAY_DUE_SECURITY,DELAY_DUE_LATE_AIRCRAFT
2023-05-26,Envoy Air,Envoy Air: MQ,MQ,20398,3778,SGF,"Springfield, MO",DFW,"Dallas/Fort Worth, TX",852,841.0,-11.0,10.0,851.0,944.0,10.0,1027,954.0,-33.0,0.0,,0.0,95.0,73.0,53.0,364.0,,,,,
2019-06-04,SkyWest Airlines Inc.,SkyWest Airlines Inc.: OO,OO,20304,5395,FAT,"Fresno, CA",SFO,"San Francisco, CA",1751,1755.0,4.0,17.0,1812.0,1840.0,4.0,1854,1844.0,-10.0,0.0,,0.0,63.0,49.0,28.0,158.0,,,,,
2022-12-06,Southwest Airlines Co.,Southwest Airlines Co.: WN,WN,19393,887,OKC,"Oklahoma City, OK",DEN,"Denver, CO",935,939.0,4.0,20.0,959.0,1020.0,6.0,1020,1026.0,6.0,0.0,,0.0,105.0,107.0,81.0,495.0,,,,,
2023-03-20,Republic Airline,Republic Airline: YX,YX,20452,4520,JFK,"New York, NY",BNA,"Nashville, TN",1705,1657.0,-8.0,18.0,1715.0,1822.0,6.0,1846,1828.0,-18.0,0.0,,0.0,161.0,151.0,127.0,765.0,,,,,
2020-01-21,Spirit Air Lines,Spirit Air Lines: NK,NK,20416,1960,FLL,"Fort Lauderdale, FL",SJU,"San Juan, PR",1930,1929.0,-1.0,16.0,1945.0,2247.0,5.0,2259,2252.0,-7.0,0.0,,0.0,149.0,143.0,122.0,1046.0,,,,,
2020-03-18,American Airlines Inc.,American Airlines Inc.: AA,AA,19805,510,PHX,"Phoenix, AZ",DCA,"Washington, DC",840,830.0,-10.0,18.0,848.0,1516.0,4.0,1601,1520.0,-41.0,0.0,,0.0,261.0,230.0,208.0,1979.0,,,,,
2023-06-25,Allegiant Air,Allegiant Air: G4,G4,20368,2837,TRI,"Bristol/Johnson City/Kingsport, TN",SFB,"Sanford, FL",2212,2357.0,105.0,4.0,1.0,116.0,5.0,2344,121.0,97.0,0.0,,0.0,92.0,84.0,75.0,535.0,0.0,0.0,3.0,0.0,94.0
2021-07-06,Delta Air Lines Inc.,Delta Air Lines Inc.: DL,DL,19790,2082,DTW,"Detroit, MI",MSY,"New Orleans, LA",850,843.0,-7.0,11.0,854.0,951.0,4.0,1021,955.0,-26.0,0.0,,0.0,151.0,132.0,117.0,926.0,,,,,
2019-01-31,Alaska Airlines Inc.,Alaska Airlines Inc.: AS,AS,19930,683,MCI,"Kansas City, MO",SEA,"Seattle, WA",700,828.0,88.0,22.0,850.0,1030.0,6.0,905,1036.0,91.0,0.0,,0.0,245.0,248.0,220.0,1489.0,88.0,0.0,3.0,0.0,0.0
2022-12-02,American Airlines Inc.,American Airlines Inc.: AA,AA,19805,2343,DFW,"Dallas/Fort Worth, TX",COS,"Colorado Springs, CO",1254,1252.0,-2.0,16.0,1308.0,1346.0,7.0,1351,1353.0,2.0,0.0,,0.0,117.0,121.0,98.0,592.0,,,,,


In [0]:
%sql
CREATE OR REPLACE TABLE flights_clean AS
WITH t AS (
  SELECT
    *,
    -- converting e.g. 5, 945, 1530 to 0005, 0945, 1530
    LPAD(CAST(CRS_DEP_TIME AS STRING), 4, '0') AS crs_dep_str,
    LPAD(CAST(DEP_TIME     AS STRING), 4, '0') AS dep_str,
    LPAD(CAST(CRS_ARR_TIME AS STRING), 4, '0') AS crs_arr_str,
    LPAD(CAST(ARR_TIME     AS STRING), 4, '0') AS arr_str
  FROM flights_permanent
)
SELECT
  -- flight date
  TO_DATE(FL_DATE, 'yyyy-MM-dd') AS fl_date,

  -- basic info
  AIRLINE_CODE,
  AIRLINE,
  ORIGIN,
  ORIGIN_CITY,
  DEST,
  DEST_CITY,
  DEP_DELAY,
  ARR_DELAY,
  CANCELLED,
  CANCELLATION_CODE,

  -- timestamps (errors converted to NULL via try_to_timestamp)
  try_to_timestamp(
    CONCAT(FL_DATE, ' ',
           SUBSTRING(crs_dep_str, 1, 2), ':',
           SUBSTRING(crs_dep_str, 3, 2)),
    'yyyy-MM-dd HH:mm'
  ) AS crs_dep_ts,

  try_to_timestamp(
    CONCAT(FL_DATE, ' ',
           SUBSTRING(dep_str, 1, 2), ':',
           SUBSTRING(dep_str, 3, 2)),
    'yyyy-MM-dd HH:mm'
  ) AS dep_ts,

  try_to_timestamp(
    CONCAT(FL_DATE, ' ',
           SUBSTRING(crs_arr_str, 1, 2), ':',
           SUBSTRING(crs_arr_str, 3, 2)),
    'yyyy-MM-dd HH:mm'
  ) AS crs_arr_ts,

  try_to_timestamp(
    CONCAT(FL_DATE, ' ',
           SUBSTRING(arr_str, 1, 2), ':',
           SUBSTRING(arr_str, 3, 2)),
    'yyyy-MM-dd HH:mm'
  ) AS arr_ts,

  -- simple flags
  CASE WHEN DEP_DELAY > 15 THEN 1 ELSE 0 END AS is_dep_delayed_15m,
  CASE WHEN ARR_DELAY > 15 THEN 1 ELSE 0 END AS is_arr_delayed_15m,
  CASE WHEN CANCELLED = 1 THEN 1 ELSE 0 END AS is_cancelled

FROM t;

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE silver_flights AS
WITH formatted_strings AS (
  SELECT
    *,
    -- Standardizing hours to 4-digit format (e.g. '5' -> '0005')
    LPAD(CAST(CRS_DEP_TIME AS STRING), 4, '0') AS crs_dep_str,
    LPAD(CAST(DEP_TIME     AS STRING), 4, '0') AS dep_str,
    LPAD(CAST(CRS_ARR_TIME AS STRING), 4, '0') AS crs_arr_str,
    LPAD(CAST(ARR_TIME     AS STRING), 4, '0') AS arr_str
  FROM flights_permanent
)
SELECT
  -- 1. Keys and Dates
  f.FL_DATE,
  TO_DATE(f.FL_DATE, 'yyyy-MM-dd') AS fl_date_iso,
  
  -- 2. Airlines
  f.AIRLINE_CODE,
  f.AIRLINE AS airline_name, 

  -- 3. Origin Airport - JOIN with airports_data
  f.ORIGIN AS origin_code,
  COALESCE(ap_org.name, f.ORIGIN_CITY) AS origin_airport_name,
  f.ORIGIN_CITY AS origin_city,
  ap_org.lat AS origin_lat,
  ap_org.lon AS origin_lon,

  -- 4. Destination Airport - JOIN with airports_data
  f.DEST AS dest_code,
  COALESCE(ap_dst.name, f.DEST_CITY) AS dest_airport_name,
  f.DEST_CITY AS dest_city,
  ap_dst.lat AS dest_lat,
  ap_dst.lon AS dest_lon,

  -- 5. Timestamps (String to timestamp conversion)
  try_to_timestamp(CONCAT(f.FL_DATE, ' ', SUBSTRING(f.crs_dep_str, 1, 2), ':', SUBSTRING(f.crs_dep_str, 3, 2)), 'yyyy-MM-dd HH:mm') AS crs_dep_ts,
  try_to_timestamp(CONCAT(f.FL_DATE, ' ', SUBSTRING(f.dep_str, 1, 2), ':', SUBSTRING(f.dep_str, 3, 2)), 'yyyy-MM-dd HH:mm') AS dep_ts,
  try_to_timestamp(CONCAT(f.FL_DATE, ' ', SUBSTRING(f.crs_arr_str, 1, 2), ':', SUBSTRING(f.crs_arr_str, 3, 2)), 'yyyy-MM-dd HH:mm') AS crs_arr_ts,
  try_to_timestamp(CONCAT(f.FL_DATE, ' ', SUBSTRING(f.arr_str, 1, 2), ':', SUBSTRING(f.arr_str, 3, 2)), 'yyyy-MM-dd HH:mm') AS arr_ts,

  -- 6. Metrics and Flags (0/1)
  f.DEP_DELAY,
  f.ARR_DELAY,
  f.CANCELLED,
  f.CANCELLATION_CODE,
  CASE WHEN f.DEP_DELAY > 15 THEN 1 ELSE 0 END AS is_dep_delayed_15m,
  CASE WHEN f.ARR_DELAY > 15 THEN 1 ELSE 0 END AS is_arr_delayed_15m,
  CASE WHEN f.CANCELLED = 1 THEN 1 ELSE 0 END AS is_cancelled

FROM formatted_strings f
-- JOIN by IATA code (crucial for correct airport names)
LEFT JOIN airports_data ap_org ON f.ORIGIN = ap_org.iata
LEFT JOIN airports_data ap_dst ON f.DEST = ap_dst.iata;

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE gold_airline_stats AS
SELECT
  AIRLINE_CODE,
  airline_name,
  
  COUNT(*) AS total_flights,
  AVG(ARR_DELAY) AS avg_arr_delay_min,
  
  -- For airlines, arrival delay is more important (passenger experience)
  ROUND(AVG(is_arr_delayed_15m), 4) AS arrival_delay_pct,
  ROUND(AVG(is_cancelled), 4) AS cancel_pct

FROM silver_flights
GROUP BY 1, 2;

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE gold_airline_stats AS
SELECT
  AIRLINE_CODE,
  airline_name,
  
  COUNT(*) AS total_flights,
  AVG(ARR_DELAY) AS avg_arr_delay_min,
  
  -- Dla linii ważniejsze jest opóźnienie przylotu (doświadczenie pasażera)
  ROUND(AVG(is_arr_delayed_15m), 4) AS arrival_delay_pct,
  ROUND(AVG(is_cancelled), 4) AS cancel_pct

FROM silver_flights
GROUP BY 1, 2;

num_affected_rows,num_inserted_rows


In [0]:
%sql
CREATE OR REPLACE TABLE gold_route_risk AS
SELECT
  origin_code,
  origin_city,
  dest_code,
  dest_city,
  
  COUNT(*) AS total_flights,
  ROUND(AVG(is_dep_delayed_15m), 4) AS delay_pct,

  -- === NEW: Distance Calculation (Haversine Formula) ===
  -- 6371 is Earth's radius in km. We use AVG() because we group by route, 
  -- and distance for a given route is constant (averaging changes nothing but avoids SQL error).
  CAST(
    AVG(
      6371 * 2 * ASIN(SQRT(
        POWER(SIN(RADIANS(dest_lat - origin_lat) / 2), 2) +
        COS(RADIANS(origin_lat)) * COS(RADIANS(dest_lat)) *
        POWER(SIN(RADIANS(dest_lon - origin_lon) / 2), 2)
      ))
    ) AS INT
  ) AS distance_km,

  -- Risk model business logic
  CASE 
    WHEN AVG(is_dep_delayed_15m) >= 0.30 THEN 'High Risk'
    WHEN AVG(is_dep_delayed_15m) >= 0.15 THEN 'Medium Risk'
    ELSE 'Low Risk'
  END AS risk_category,
  
  -- Ready label
  CONCAT(origin_code, ' -> ', dest_code) AS route_label

FROM silver_flights
-- Important: skip rows where coordinates could not be matched (missing in dictionary)
-- so math doesn't return NULL
WHERE origin_lat IS NOT NULL AND dest_lat IS NOT NULL

GROUP BY 1, 2, 3, 4
HAVING count(*) > 10;

num_affected_rows,num_inserted_rows
