In [1]:
from prefect import flow
from prefect.logging import get_run_logger
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import (
        col, to_date, concat_ws, lpad, try_to_timestamp, lit
    )
import duckdb
import pandas as pd

@flow(name="FlightDelayETL-Pipeline")
def flight_delay_pipeline():
    """Run the flight-delay ETL end to end and log a delay summary.

    Steps, in order:

    1. Get or create a Spark session.
    2. Read flights (Parquet), weather (multi-line JSON) and airports (CSV)
       from ``data/source`` and print their row counts.
    3. Drop cancelled flights and derive ``flight_date`` / ``scheduled_dep``.
    4. Split the airports' ``coordinates`` column into numeric latitude and
       longitude, then build origin- and destination-aliased lookups.
    5. Flatten the hourly weather arrays into one row per location and hour.
    6. Left-join flights to both airport lookups and to the weather rows.
    7. Write the selected columns as Parquet partitioned by ``flight_date``.
    8. Load that Parquet output into the ``flights_analysis.duckdb`` database
       and compute the 15 origins with the highest average departure delay.

    Returns:
        None. Results are written to disk, stored in DuckDB, printed, and
        logged through the Prefect run logger.
    """
    logger = get_run_logger()
    logger.info("Pipeline started")

    # =========================
    # Spark session
    # =========================

    spark = (
        SparkSession.builder
        .appName("FlightDelayETL")
        .config("spark.sql.parquet.enableVectorizedReader", "false")
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("WARN")

    # =========================
    # data loading
    # =========================

    flights_df = spark.read.parquet("data/source/flight_all")

    weather_df = (
        spark.read
        .option("multiline", "true")
        .option("mode", "PERMISSIVE")
        .json("data/source/weather_all")
    )

    airports_df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv("data/source/airport-codes.csv")
    )

    # Each count() is a Spark action and triggers a full read of its source.
    print("Flights:", flights_df.count())
    print("Weather:", weather_df.count())
    print("Airports:", airports_df.count())

    # =========================
    # cleaning flights
    # =========================

    flights_clean = (
        flights_df
        # Column comparison: `== False` builds a Spark expression, so it
        # cannot be replaced with Python's `not` / `is False`.
        .filter(col("Cancelled") == False)  # noqa: E712
        .withColumn("flight_date", to_date(col("FlightDate")))
        .withColumn(
            "scheduled_dep",
            # CRSDepTime is left-padded to four characters so it matches the
            # "HHmm" pattern; try_to_timestamp yields NULL instead of raising
            # when a value does not parse.
            try_to_timestamp(
                concat_ws(
                    " ",
                    col("flight_date").cast("string"),
                    lpad(col("CRSDepTime").cast("string"), 4, "0")
                ),
                lit("yyyy-MM-dd HHmm")
            )
        )
    )

    # =========================
    # airports cleaning
    # =========================

    # NOTE(review): this treats `coordinates` as "latitude, longitude".
    # Confirm the order against the CSV; if the file lists longitude first,
    # the two derived columns are swapped.
    airports_fixed = (
        airports_df
        .filter(col("iata_code").isNotNull())
        .withColumn("lat_str", trim(split(col("coordinates"), ",")[0]))
        .withColumn("lon_str", trim(split(col("coordinates"), ",")[1]))
        .withColumn("latitude", col("lat_str").cast("double"))
        .withColumn("longitude", col("lon_str").cast("double"))
        .select(
            col("iata_code"),
            col("name"),
            col("latitude"),
            col("longitude")
        )
    )

    # Same lookup aliased twice so it can be joined once per flight endpoint
    # without column-name clashes.
    orig_airports = airports_fixed.select(
        col("iata_code").alias("origin"),
        col("latitude").alias("origin_lat"),
        col("longitude").alias("origin_lon"),
        col("name").alias("origin_name")
    )

    dest_airports = airports_fixed.select(
        col("iata_code").alias("dest"),
        col("latitude").alias("dest_lat"),
        col("longitude").alias("dest_lon"),
        col("name").alias("dest_name")
    )

    # =========================
    # weather cleaning
    # =========================

    weather_clean = (
        weather_df
        .withColumnRenamed("latitude", "weather_lat")
        .withColumnRenamed("longitude", "weather_lon")
        .select(
            "weather_lat",
            "weather_lon",
            # posexplode emits one row per element of hourly.time together
            # with its position; the measurement arrays are carried along
            # whole and indexed by that position in the second select.
            posexplode("hourly.time").alias("idx", "weather_time"),
            col("hourly.temperature_2m").alias("temperature_2m"),
            col("hourly.precipitation").alias("precipitation"),
            col("hourly.windspeed_10m").alias("windspeed_10m")
        )
        .withColumn("weather_time", to_timestamp("weather_time"))
        .withColumn("weather_hour", date_trunc("hour", col("weather_time")))
        .select(
            "weather_lat",
            "weather_lon",
            "weather_hour",
            col("temperature_2m")[col("idx")].alias("temperature_2m"),
            col("precipitation")[col("idx")].alias("precipitation"),
            col("windspeed_10m")[col("idx")].alias("windspeed_10m")
        )
    )

    # =========================
    # enrichment joins
    # =========================

    flights_enriched = (
        flights_clean
        .join(broadcast(orig_airports), "origin", "left")
        .join(broadcast(dest_airports), "dest", "left")
    )

    # NOTE(review): the weather match is an exact equality on floating-point
    # coordinates plus the truncated hour. Because this is a left join, any
    # flight without an exact match keeps NULL weather columns - verify that
    # the airport and weather coordinates really are identical values.
    flights_final = (
        flights_enriched
        .withColumn("scheduled_hour", date_trunc("hour", col("scheduled_dep")))
        .join(
            weather_clean,
            (flights_enriched.origin_lat == weather_clean.weather_lat) &
            (flights_enriched.origin_lon == weather_clean.weather_lon) &
            (col("scheduled_hour") == weather_clean.weather_hour),
            "left"
        )
    )

    final_df = flights_final.select(
        "flight_date",
        "origin",
        "origin_name",
        "dest",
        "dest_name",
        "scheduled_dep",
        "temperature_2m",
        "precipitation",
        "windspeed_10m",
        "ArrDelay",
        "DepDelay"
    )

    final_df.show(10, truncate=False)
    print("Final row count:", final_df.count())

    # =========================
    # write parquet
    # =========================

    (
        final_df
        .write
        .mode("overwrite")
        .partitionBy("flight_date")
        .parquet("data/final/flights_enriched.parquet")
    )

    # =========================
    # DuckDB load & analytics
    # =========================

    con = duckdb.connect(database="flights_analysis.duckdb", read_only=False)
    try:
        # The glob walks the flight_date=... partition directories Spark wrote.
        con.execute("""
        CREATE OR REPLACE TABLE flights_enriched AS
        SELECT * FROM read_parquet('data/final/flights_enriched.parquet/**/*.parquet')
    """)

        avg_delay_by_origin = con.execute("""
        SELECT 
            origin,
            origin_name,
            COUNT(*) AS flights,
            AVG(DepDelay) AS avg_dep_delay,
            AVG(ArrDelay) AS avg_arr_delay
        FROM flights_enriched
        WHERE DepDelay IS NOT NULL
        GROUP BY origin, origin_name
        ORDER BY avg_dep_delay DESC
        LIMIT 15
    """).df()
    finally:
        # Always release the database file handle, even when a query fails.
        con.close()

    # Surface the analytics result instead of silently discarding it.
    logger.info(
        "Top origins by average departure delay:\n%s",
        avg_delay_by_origin.to_string(index=False),
    )

    logger.info("Pipeline completed successfully")


# Run the flow only when this file is executed directly, not when imported.
if __name__ == "__main__":
    flight_delay_pipeline()


Flights: 29193782
Weather: 91
Airports: 82808
+-----------+------+------------------------------------------------+----+------------------------------------------------+-------------------+--------------+-------------+-------------+--------+--------+
|flight_date|origin|origin_name                                     |dest|dest_name                                       |scheduled_dep      |temperature_2m|precipitation|windspeed_10m|ArrDelay|DepDelay|
+-----------+------+------------------------------------------------+----+------------------------------------------------+-------------------+--------------+-------------+-------------+--------+--------+
|2018-01-23 |ABY   |Southwest Georgia Regional Airport              |ATL |Hartsfield Jackson Atlanta International Airport|2018-01-23 12:02:00|NULL          |NULL         |NULL         |-8.0    |-5.0    |
|2018-01-24 |ABY   |Southwest Georgia Regional Airport              |ATL |Hartsfield Jackson Atlanta International Airport|2018-01-24 