# Project 1

### Group J

In [1]:
# Imports
import json
import os
import shutil
from datetime import datetime, timezone

# PySpark is imported defensively: instead of failing the cell, record in
# `spark_ok` whether the import worked and print the reason when it did not.
try:
    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession
    from pyspark.context import SparkContext
except Exception as import_error:
    spark_ok = False
    print("PySpark not available:", import_error)
else:
    spark_ok = True

In [2]:
# Init Spark

# Build the session through the builder only: getOrCreate() reuses an already
# running session/context, so this cell can be re-run safely. Constructing
# SparkContext(...) directly raises on a second run because a context is
# already active in the kernel.
spark = (
    SparkSession.builder
    .master('local')
    .appName('project_1')
    .getOrCreate()
)

# Keep `sc` available under the same name, now derived from the session.
sc = spark.sparkContext

print(spark.version)

4.1.0


In [17]:
# Variables, helpers

# Input side: directory scanned for parquet files, and the file name of the
# zone lookup table.
INBOX_DIR = "data/inbox"
LOOKUP_TABLE = "taxi_zone_lookup.parquet"

# Output side: directory and dataset path of the enriched trips.
OUTBOX_DIR = "data/outbox"
OUTPUT_PATH = os.path.join(OUTBOX_DIR, "trips_enriched.parquet")

# Bookkeeping: JSON manifest location.
STATE_DIR = "state"
MANIFEST_PATH = os.path.join(STATE_DIR, "manifest.json")


def load_manifest(path):
    """Read the JSON manifest stored at ``path``.

    Returns the parsed JSON content when the file exists, otherwise a fresh
    manifest of the form ``{"processed_files": []}``.
    """
    if not os.path.exists(path):
        return {"processed_files": []}

    with open(path, "r", encoding="utf-8") as manifest_file:
        return json.load(manifest_file)


def save_manifest(path, manifest_obj):
    """Write ``manifest_obj`` as indented JSON to ``path``.

    The JSON is first written to ``path + ".tmp"`` and then moved over
    ``path`` with ``os.replace``, so ``path`` is only replaced once the dump
    has completed.

    Args:
        path: Destination file. May be a bare file name without a directory.
        manifest_obj: JSON-serialisable object to store.

    Raises:
        Whatever ``open``/``json.dump``/``os.replace`` raise (e.g. ``OSError``,
        ``TypeError`` for non-serialisable content). The temporary file is
        removed before the exception propagates.
    """
    parent_dir = os.path.dirname(path)
    # os.makedirs("") raises, so only create a directory when path has one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    tmp = path + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(manifest_obj, f, indent=2)
        os.replace(tmp, path)
    except Exception:
        # Do not leave a half-written temp file next to the manifest.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise


def list_parquet_files(inbox_dir):
    """Return the sorted paths of all ``*.parquet`` entries in ``inbox_dir``.

    The extension check is case-insensitive. Each returned path is
    ``inbox_dir`` joined with the entry name. A missing or non-directory
    ``inbox_dir`` yields an empty list.
    """
    if not os.path.isdir(inbox_dir):
        return []

    parquet_paths = []
    for entry_name in os.listdir(inbox_dir):
        if entry_name.lower().endswith(".parquet"):
            parquet_paths.append(os.path.join(inbox_dir, entry_name))
    parquet_paths.sort()
    return parquet_paths


def file_size_bytes(path):
    """Return the size of ``path`` in bytes, or ``None`` if it cannot be read.

    Any ``OSError`` from the size lookup (e.g. missing file) maps to ``None``.
    """
    try:
        size = os.path.getsize(path)
    except OSError:
        size = None
    return size


def read_existing_output(spark, output_path):
    """Load a previously written parquet dataset, if there is one.

    Returns ``spark.read.parquet(output_path)`` when ``output_path`` exists on
    the local filesystem, otherwise ``None``.
    """
    if not os.path.exists(output_path):
        return None
    return spark.read.parquet(output_path)

In [18]:
# Load data

manifest = load_manifest(MANIFEST_PATH)

# File names (basenames) the manifest lists as already processed.
already_processed = {
    record["file"] for record in manifest.get("processed_files", [])
}

inbox_files = list_parquet_files(INBOX_DIR)

# Ignore the lookup table file if it's sitting in the inbox
candidate_trip_files = [
    path for path in inbox_files if os.path.basename(path) != LOOKUP_TABLE
]

# Keep only the candidates whose basename is not in the manifest yet.
new_trip_files = [
    path
    for path in candidate_trip_files
    if os.path.basename(path) not in already_processed
]

print(f"Total parquet files in inbox: {len(inbox_files)}")
print(f"Trip candidate files: {len(candidate_trip_files)}")
print(f"New trip files to process: {len(new_trip_files)}")
for p in new_trip_files:
    print("  NEW:", os.path.basename(p))

Total parquet files in inbox: 3
Trip candidate files: 2
New trip files to process: 2
  NEW: yellow_tripdata_2025-01.parquet
  NEW: yellow_tripdata_2025-02.parquet


In [19]:
# Load zone lookup table

LOOKUP_PATH = os.path.join(INBOX_DIR, LOOKUP_TABLE)

# Keep the four lookup columns; LocationID is cast to int, the rest pass through.
lookup_df = spark.read.parquet(LOOKUP_PATH).select(
    F.col("LocationID").cast("int").alias("LocationID"),
    "Borough",
    "Zone",
    "service_zone",
)

lookup_df.printSchema()
lookup_df.show(5, truncate=False)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+----------+-------------+-----------------------+------------+
|LocationID|Borough      |Zone                   |service_zone|
+----------+-------------+-----------------------+------------+
|1         |EWR          |Newark Airport         |EWR         |
|2         |Queens       |Jamaica Bay            |Boro Zone   |
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |
|4         |Manhattan    |Alphabet City          |Yellow Zone |
|5         |Staten Island|Arden Heights          |Boro Zone   |
+----------+-------------+-----------------------+------------+
only showing top 5 rows


In [20]:
# Minimum transformations

def normalize_trip_schema(df):
    """
    Rename the Yellow (tpep_*) or Green (lpep_*) taxi columns to common names.

    Renames performed:
      <prefix>_pickup_datetime  -> pickup_ts
      <prefix>_dropoff_datetime -> dropoff_ts
      PULocationID              -> pickup_location_id   (only if present)
      DOLocationID              -> dropoff_location_id  (only if present)

    All other columns are left as they are. The tpep_* prefix is checked
    before lpep_*.

    Raises:
        ValueError: if neither tpep_pickup_datetime nor lpep_pickup_datetime
        is among the columns.
    """
    for prefix in ("tpep", "lpep"):
        pickup_src = f"{prefix}_pickup_datetime"
        if pickup_src in df.columns:
            dropoff_src = f"{prefix}_dropoff_datetime"
            break
    else:
        raise ValueError("Could not find pickup/dropoff datetime columns (tpep_* or lpep_*)")

    renamed = (
        df
        .withColumnRenamed(pickup_src, "pickup_ts")
        .withColumnRenamed(dropoff_src, "dropoff_ts")
    )

    location_renames = (
        ("PULocationID", "pickup_location_id"),
        ("DOLocationID", "dropoff_location_id"),
    )
    for old_name, new_name in location_renames:
        if old_name in renamed.columns:
            renamed = renamed.withColumnRenamed(old_name, new_name)

    return renamed


def transform_minimum(raw_df):
    """
    Apply the minimum required transformations to a raw trip DataFrame.

    Steps, in order:
      1) normalize column names (normalize_trip_schema) and cast types
      2) drop invalid rows (rules are commented inline)
      3) deduplicate on source_file + timestamps + location IDs +
         passenger_count + trip_distance
      4) add trip_duration_minutes and pickup_date

    The input must already carry a `source_file` column, since it is part of
    the deduplication key.
    """
    trips = normalize_trip_schema(raw_df)

    # 1) Cast/parse types, one column at a time in this order.
    typed_columns = {
        "pickup_ts": F.to_timestamp("pickup_ts"),
        "dropoff_ts": F.to_timestamp("dropoff_ts"),
        "pickup_location_id": F.col("pickup_location_id").cast("int"),
        "dropoff_location_id": F.col("dropoff_location_id").cast("int"),
        "passenger_count": F.col("passenger_count").cast("int"),
        "trip_distance": F.col("trip_distance").cast("double"),
    }
    for column_name, typed_expr in typed_columns.items():
        trips = trips.withColumn(column_name, typed_expr)

    # 2) Cleaning rules
    # Timestamps must both be present, and dropoff must be strictly after pickup.
    both_timestamps_present = (
        F.col("pickup_ts").isNotNull() & F.col("dropoff_ts").isNotNull()
    )
    trips = trips.where(both_timestamps_present)
    trips = trips.where(F.col("dropoff_ts") > F.col("pickup_ts"))

    # Location IDs must be present and positive.
    for location_col in ("pickup_location_id", "dropoff_location_id"):
        location_is_valid = (
            F.col(location_col).isNotNull() & (F.col(location_col) > 0)
        )
        trips = trips.where(location_is_valid)

    # Passenger count: a null becomes 0; negative counts are dropped.
    filled_passengers = F.coalesce(F.col("passenger_count"), F.lit(0))
    trips = trips.withColumn("passenger_count", filled_passengers)
    trips = trips.where(F.col("passenger_count") >= 0)

    # Trip distance: a null becomes 0.0; anything <= 0 is dropped.
    filled_distance = F.coalesce(F.col("trip_distance"), F.lit(0.0))
    trips = trips.withColumn("trip_distance", filled_distance)
    trips = trips.where(F.col("trip_distance") > 0.0)

    # 3) Deduplication
    dedup_key = [
        "source_file",
        "pickup_ts", "dropoff_ts",
        "pickup_location_id", "dropoff_location_id",
        "passenger_count", "trip_distance",
    ]
    trips = trips.dropDuplicates(dedup_key)

    # 4) Derived fields: duration in minutes from the epoch-second difference.
    duration_seconds = (
        F.col("dropoff_ts").cast("long") - F.col("pickup_ts").cast("long")
    )
    return (
        trips
        .withColumn("trip_duration_minutes", duration_seconds / F.lit(60.0))
        .withColumn("pickup_date", F.to_date("pickup_ts"))
    )

In [21]:
# Enrichment functions

def enrich_with_zones(df, lookup_df):
    """Attach pickup and dropoff zone names to the trips in ``df``.

    ``lookup_df`` must provide ``LocationID`` and ``Zone``. Two left joins are
    done, on ``pickup_location_id`` and ``dropoff_location_id``, adding the
    columns ``pickup_zone_name`` and ``dropoff_zone_name``. Trips without a
    matching LocationID keep a null zone name.
    """
    def zone_names_for(side):
        # Lookup projected to "<side>_location_id" / "<side>_zone_name".
        return lookup_df.select(
            F.col("LocationID").alias(f"{side}_location_id"),
            F.col("Zone").alias(f"{side}_zone_name"),
        )

    enriched = df
    for side in ("pickup", "dropoff"):
        # The lookup side is marked with a broadcast hint for the join.
        enriched = enriched.join(
            F.broadcast(zone_names_for(side)),
            on=f"{side}_location_id",
            how="left",
        )
    return enriched


def select_required_fields(df):
    """Project ``df`` down to the output columns, in their output order."""
    required_columns = (
        "pickup_ts",
        "dropoff_ts",
        "pickup_location_id",
        "dropoff_location_id",
        "pickup_zone_name",
        "dropoff_zone_name",
        "passenger_count",
        "trip_distance",
        "trip_duration_minutes",
        "pickup_date",
        "source_file",
        "ingested_at",
    )
    return df.select(*required_columns)

In [22]:
# Correctness helper for sampling bad rows before cleaning

def find_bad_rows_examples(raw_df, limit=3):
    """Return up to ``limit`` raw rows that the cleaning rules would drop.

    The conditions mirror the filters in ``transform_minimum``:
      - missing pickup or dropoff timestamp, or dropoff <= pickup
      - missing or non-positive pickup/dropoff location ID
      - negative passenger_count (a null count is NOT bad: cleaning turns it
        into 0 and keeps the row)
      - missing or non-positive trip_distance (cleaning turns null into 0.0,
        which the `> 0` filter then removes)

    Args:
        raw_df: Raw trip DataFrame with tpep_*/lpep_* columns and a
            ``source_file`` column.
        limit: Maximum number of example rows to return.

    Returns:
        A DataFrame with the timestamp, location, passenger_count,
        trip_distance and source_file columns of the offending rows.
    """
    df = normalize_trip_schema(raw_df)

    # Same casts as the cleaning step, so the conditions see the same values.
    df = (
        df
        .withColumn("pickup_ts", F.to_timestamp("pickup_ts"))
        .withColumn("dropoff_ts", F.to_timestamp("dropoff_ts"))
        .withColumn("pickup_location_id", F.col("pickup_location_id").cast("int"))
        .withColumn("dropoff_location_id", F.col("dropoff_location_id").cast("int"))
        .withColumn("passenger_count", F.col("passenger_count").cast("int"))
        .withColumn("trip_distance", F.col("trip_distance").cast("double"))
    )

    bad_conditions = (
        F.col("pickup_ts").isNull()
        | F.col("dropoff_ts").isNull()
        | (F.col("dropoff_ts") <= F.col("pickup_ts"))
        | F.col("pickup_location_id").isNull() | (F.col("pickup_location_id") <= 0)
        | F.col("dropoff_location_id").isNull() | (F.col("dropoff_location_id") <= 0)
        # Null passenger_count is deliberately not listed: those rows survive cleaning.
        | (F.col("passenger_count") < 0)
        | F.col("trip_distance").isNull() | (F.col("trip_distance") <= 0)
    )

    return df.filter(bad_conditions).select(
        "pickup_ts", "dropoff_ts",
        "pickup_location_id", "dropoff_location_id",
        "passenger_count", "trip_distance",
        "source_file"
    ).limit(limit)

In [23]:
# INCREMENTAL RUN JOB
# read -> clean -> enrich -> merge -> write -> manifest

if not new_trip_files:
    print("No new files to process. Job is idempotent; nothing to do.")
else:
    # Read new data; tag every row with the file name it came from (last path
    # segment of the input file) and an ingestion timestamp.
    new_raw_df = (
        spark.read.parquet(*new_trip_files)
        .withColumn("source_file", F.regexp_extract(F.input_file_name(), r"([^/\\\\]+)$", 1))
        .withColumn("ingested_at", F.current_timestamp())
    )

    print("\nNew raw schema:")
    new_raw_df.printSchema()

    # Correctness evidence
    input_rows = new_raw_df.count()
    print("\nInput rows (new files):", input_rows)

    print("\nBad row examples (up to 3):")  # Bad row examples
    for r in find_bad_rows_examples(new_raw_df, limit=3).collect():
        print(r)

    # Transform (cached because it is counted here and reused below)
    new_clean_df = transform_minimum(new_raw_df).cache()
    after_clean_rows = new_clean_df.count()
    print("\nRows after cleaning + dedup (new batch):", after_clean_rows)

    # Enrich
    new_enriched_df = enrich_with_zones(new_clean_df, lookup_df)
    new_enriched_df = select_required_fields(new_enriched_df)

    # Merge with previous output
    existing_df = read_existing_output(spark, OUTPUT_PATH)
    if existing_df is None:
        prev_rows = 0
        merged_df = new_enriched_df
    else:
        prev_rows = existing_df.count()
        merged_df = existing_df.unionByName(new_enriched_df, allowMissingColumns=True)

    # Global dedup
    global_dedup_key = [
        "source_file",
        "pickup_ts", "dropoff_ts",
        "pickup_location_id", "dropoff_location_id",
        "passenger_count", "trip_distance",
    ]
    final_df = merged_df.dropDuplicates(global_dedup_key)

    final_rows = final_df.count()
    print("\nPrevious output rows:", prev_rows)
    print("Final output rows after merge:", final_rows)

    # Write output
    # When a previous output exists, final_df still lazily reads from
    # OUTPUT_PATH (through existing_df). Overwriting that same path in place
    # would target the very files the plan reads, so the result is written to
    # a staging directory first and swapped in only after the write finished.
    os.makedirs(OUTBOX_DIR, exist_ok=True)
    staging_path = OUTPUT_PATH + ".staging"
    (
        final_df
        .coalesce(1)
        .write
        .mode("overwrite")  # also clears a staging dir left by a failed run
        .parquet(staging_path)
    )

    # Swap: drop the old dataset, then move the staged one into place.
    if os.path.isdir(OUTPUT_PATH):
        shutil.rmtree(OUTPUT_PATH)
    elif os.path.exists(OUTPUT_PATH):
        os.remove(OUTPUT_PATH)
    os.replace(staging_path, OUTPUT_PATH)

    # Re-point final_df at the published files; its previous plan referenced
    # the old output files that were just removed.
    final_df = spark.read.parquet(OUTPUT_PATH)
    print("\nWrote output dataset to:", OUTPUT_PATH)

    # Update manifest (only reached after the output was published)
    processed_records = manifest.get("processed_files", [])
    ingested_time_str = datetime.now(timezone.utc).isoformat()

    for p in new_trip_files:
        processed_records.append({
            "file": os.path.basename(p),
            "size_bytes": file_size_bytes(p),
            "ingested_utc": ingested_time_str,
        })

    manifest["processed_files"] = processed_records
    save_manifest(MANIFEST_PATH, manifest)
    print("Updated manifest:", MANIFEST_PATH)


New raw schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)
 |-- source_file: string (nullable = false)
 |-- ingested_a