In [1]:
# Imports used across ETL exploration and final incremental pipeline
import time
import os, json, shutil
from datetime import datetime, timezone
from pyspark.sql.functions import col, current_timestamp, input_file_name, to_date, unix_timestamp, broadcast

In [2]:
# Centralized project paths for input, output, lookup, and ETL state
BASE_PATH = "/home/jovyan/work"

INBOX = f"{BASE_PATH}/data/inbox"
OUTBOX = f"{BASE_PATH}/data/outbox/trips_enriched.parquet"
LOOKUP = f"{BASE_PATH}/data/taxi_zone_lookup.parquet"
STATE = f"{BASE_PATH}/state/manifest.json"

In [3]:
# Read manifest file that tracks already processed input files
def load_manifest(path):
    if not os.path.exists(path):
        return {"processed_files": []}
    with open(path, "r") as f:
        return json.load(f)

In [4]:
# Persist updated manifest to keep ingestion incremental and idempotent
def save_manifest(path, manifest):
    with open(path, "w") as f:
        json.dump(manifest, f, indent=2)

In [5]:
# Compute full input list and the subset of files that are new in this run
manifest = load_manifest(STATE)
processed = set(x["filename"] for x in manifest["processed_files"])

all_files = sorted([f for f in os.listdir(INBOX) if f.endswith(".parquet")])
new_files = [f for f in all_files if f not in processed]

all_files, new_files

(['yellow_tripdata_2025-01.parquet', 'yellow_tripdata_2025-02.parquet'], [])

In [6]:
# Initialize Spark session for the ETL run
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Project1-ETL").getOrCreate()

## ETL rules used in the final run

This cell documents the business rules implemented in the final incremental Spark job.

- **Cleaning rules**: keep only rows where required fields are non-null, `passenger_count > 0`, `trip_distance > 0`, and `tpep_dropoff_datetime > tpep_pickup_datetime`.
- **Dedup key**: `VendorID`, `tpep_pickup_datetime`, `tpep_dropoff_datetime`, `PULocationID`, `DOLocationID`.
- **Incremental behavior**: process only files not present in `state/manifest.json`.
- **Idempotency**: anti-join new batch against existing outbox using the dedup key, then append only unseen records.

In [7]:
# Helper to verify whether output parquet folder already contains data files
def outbox_has_data(path: str) -> bool:
    return (
        os.path.exists(path)
        and os.path.isdir(path)
        and any(name.startswith("part-") and name.endswith(".parquet") for name in os.listdir(path))
    )

In [8]:
# Cleanup safety: remove empty/corrupted outbox folder before writing
if os.path.exists(OUTBOX) and not outbox_has_data(OUTBOX):
    print("OUTBOX exists but has no parquet parts. Deleting corrupted/empty outbox folder...")
    shutil.rmtree(OUTBOX)

In [9]:
# Log file discovery result for this run (all vs new files)
print("All files:", all_files)
print("New files:", new_files)

All files: ['yellow_tripdata_2025-01.parquet', 'yellow_tripdata_2025-02.parquet']
New files: []


In [10]:
# Final incremental ETL: process only new files, clean, dedup, enrich, append, and update manifest
if not new_files:
    print("Nothing new to process. Exiting without changes.")
else:
    import time
    start = time.time()
    now = datetime.now(timezone.utc).isoformat()

    new_paths = [os.path.join(INBOX, f) for f in new_files]
    df_new = spark.read.parquet(*new_paths).withColumn("source_file", input_file_name())

    # ---------- Types ----------
    df_typed = (
        df_new
        .withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast("timestamp"))
        .withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast("timestamp"))
        .withColumn("passenger_count", col("passenger_count").cast("int"))
        .withColumn("trip_distance", col("trip_distance").cast("double"))
        .withColumn("PULocationID", col("PULocationID").cast("int"))
        .withColumn("DOLocationID", col("DOLocationID").cast("int"))
        .withColumn("VendorID", col("VendorID").cast("int"))
    )

    # ---------- Cleaning ----------
    df_clean = df_typed.filter(
        col("tpep_pickup_datetime").isNotNull()
        & col("tpep_dropoff_datetime").isNotNull()
        & col("passenger_count").isNotNull()
        & col("trip_distance").isNotNull()
        & col("PULocationID").isNotNull()
        & col("DOLocationID").isNotNull()
        & col("VendorID").isNotNull()
        & (col("passenger_count") > 0)
        & (col("trip_distance") > 0)
        & (col("tpep_dropoff_datetime") > col("tpep_pickup_datetime"))
    )

    # ---------- Dedup ----------
    dedup_key = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "PULocationID", "DOLocationID"]

    # 1) remove duplicados dentro do batch
    df_dedup_batch = df_clean.dropDuplicates(dedup_key)

    # 2) opcional: remove registros já existentes no OUTBOX (idempotência mesmo sem manifest)
    if outbox_has_data(OUTBOX):
        existing_keys = (
            spark.read.parquet(OUTBOX)
            .select(*dedup_key)
            .dropDuplicates(dedup_key)
        )
        df_dedup = df_dedup_batch.join(existing_keys, on=dedup_key, how="left_anti")
    else:
        df_dedup = df_dedup_batch

    # ---------- Enrichment ----------
    zones = spark.read.parquet(LOOKUP)

    zones_pickup = zones.select(
        col("LocationID").alias("PULocationID"),
        col("Zone").alias("pickup_zone")
    )

    zones_dropoff = zones.select(
        col("LocationID").alias("DOLocationID"),
        col("Zone").alias("dropoff_zone")
    )

    df_enriched = (
        df_dedup
        .join(broadcast(zones_pickup), on="PULocationID", how="left")
        .join(broadcast(zones_dropoff), on="DOLocationID", how="left")
    )

    # ---------- Derived columns ----------
    df_ready = (
        df_enriched
        .withColumn(
            "trip_duration_minutes",
            (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0
        )
        .withColumn("pickup_date", to_date("tpep_pickup_datetime"))
        .withColumn("ingested_at", current_timestamp())
    )

    # ---------- Select required output columns (helps grading) ----------
    df_out = df_ready.select(
        "tpep_pickup_datetime", "tpep_dropoff_datetime",
        "PULocationID", "DOLocationID",
        "pickup_zone", "dropoff_zone",
        "passenger_count", "trip_distance",
        "trip_duration_minutes", "pickup_date",
        "source_file", "ingested_at"
    )

    # ---------- Metrics for README ----------
    input_rows = df_new.count()
    clean_rows = df_clean.count()
    dedup_rows = df_dedup.count()
    final_rows = df_out.count()  # should match dedup_rows in most cases

    print("Row counts (new batch):", {
        "input": input_rows,
        "after_cleaning": clean_rows,
        "after_dedup": dedup_rows,
        "final_output": final_rows
    })

    # ---------- Write ----------
    if final_rows > 0:
        df_out.coalesce(4).write.mode("append").parquet(OUTBOX)
        wrote = final_rows
    else:
        wrote = 0

    # ---------- Runtime ----------
    end = time.time()
    print("Full job runtime (seconds):", round(end - start, 2))
    print("Rows written:", wrote)

    # ---------- Manifest update (always) ----------
    for f in new_files:
        file_path = os.path.join(INBOX, f)
        manifest["processed_files"].append({
            "filename": f,
            "size_bytes": os.path.getsize(file_path),
            "processed_at": now
        })

    save_manifest(STATE, manifest)

    print("✅ Run complete!")
    print("Output path:", OUTBOX)
    print("Manifest updated:", STATE)
    print("Cleaning rules: non-null required fields, passenger_count > 0, trip_distance > 0, dropoff > pickup")
    print("Dedup key:", dedup_key)


Nothing new to process. Exiting without changes.


In [11]:
bad_rows = df_typed.filter(
    (col("passenger_count") <= 0) |
    (col("trip_distance") <= 0) |
    (col("tpep_dropoff_datetime") <= col("tpep_pickup_datetime"))
)

bad_rows.select(
    "passenger_count",
    "trip_distance",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime"
).show(3, truncate=False)


NameError: name 'df_new' is not defined

In [None]:
df_final = spark.read.parquet(OUTBOX)
df_final.show(5, truncate=False)