### Load
Idempotent monthly load: before a given month is imported, any existing rows for that source file are removed with `DELETE ... WHERE source_file = :sf`. This makes the load repeatable and extensible at any time without creating duplicates.

In [1]:
## DB SetUp and SetUp Check
## For additional help, check the .env.example file.

import os
from urllib.parse import quote_plus

from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

# Pull the connection settings from the environment; load_dotenv() also reads a
# local .env file if one exists.
load_dotenv()

PG_USER = os.getenv("POSTGRES_USER")
PG_PASS_RAW = os.getenv("POSTGRES_PASS")
PG_HOST = os.getenv("POSTGRES_HOST")
PG_PORT = os.getenv("POSTGRES_PORT", "5432")
PG_DB = os.getenv("POSTGRES_DB")
PG_SCHEMA = os.getenv("POSTGRES_SCHEMA", "public")

# Fail fast and report every unset/empty variable at once instead of failing
# later with an opaque connection error.
required_settings = {
    "POSTGRES_USER": PG_USER,
    "POSTGRES_PASS": PG_PASS_RAW,
    "POSTGRES_HOST": PG_HOST,
    "POSTGRES_DB": PG_DB,
    "POSTGRES_SCHEMA": PG_SCHEMA,
}
missing = [name for name, value in required_settings.items() if not value]
if missing:
    raise ValueError(f"Missing env vars: {missing}")

# Retained only so that any cell still referring to the URL-escaped password
# keeps working; the engine URL below no longer needs it.
PG_PASS = quote_plus(PG_PASS_RAW)

# Build the URL from its components rather than by string formatting: URL.create
# takes the raw values, so special characters ("@", ":", "/", ...) in the user
# name or password cannot corrupt the DSN.
url = URL.create(
    drivername="postgresql+psycopg2",
    username=PG_USER,
    password=PG_PASS_RAW,
    host=PG_HOST,
    port=int(PG_PORT) if PG_PORT else None,
    database=PG_DB,
)
engine = create_engine(url, future=True)

# Connectivity check: shows who we are connected as and where.
# Note that current_schema() reports the session's search-path schema, which is
# not necessarily the configured PG_SCHEMA printed below.
with engine.connect() as conn:
    print(conn.execute(text("SELECT current_user, current_database(), current_schema();")).fetchone())

print("Setup OK. Schema:", PG_SCHEMA)

('patrickpaubandt', 'nf_da_onl_13102025', 'public')
Setup OK. Schema: s_patrickpaubandt


In [None]:
# Base URL the monthly parquet files are downloaded from.
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

# Months to load, each given as "YYYY-MM".
MONTHS = [
    "2025-07",
    "2025-08",
    "2025-09",
]


def file_name(month: str) -> str:
    """Return the parquet file name for ``month`` (a "YYYY-MM" string)."""
    return "yellow_tripdata_{}.parquet".format(month)


def file_url(month: str) -> str:
    """Return the full download URL of the parquet file for ``month``."""
    return "/".join((BASE_URL, file_name(month)))


In [None]:
# Download + read parquet files

import os
import requests
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
from sqlalchemy import text

# Local directory the downloaded parquet files are stored in.
# Set NYC_TAXI_DATA_DIR (in the environment or in .env) to use a different
# location on another machine; when it is unset the original path is used, so
# existing setups behave exactly as before.
DATA_DIR = os.getenv(
    "NYC_TAXI_DATA_DIR",
    r"C:\Users\patri\Documents\Data Analytics - neuefische\Capstone-project-NYCTaxi\data",
)

# Columns written to the target table, in insert order
# (passenger_count and store_and_fwd_flag are deliberately left out).
TARGET_COLS = [
    "vendorid","tpep_pickup_datetime","tpep_dropoff_datetime","trip_distance","ratecodeid",
    "pulocationid","dolocationid","payment_type",
    "fare_amount","extra","mta_tax","tip_amount","tolls_amount","improvement_surcharge","total_amount",
    "congestion_surcharge","airport_fee","cbd_congestion_fee",
    "source_file"
]

# Columns requested from the parquet file. source_file is not part of the file;
# normalize_df adds it afterwards.
READ_COLS = [c for c in TARGET_COLS if c != "source_file"]


def normalize_df(df: pd.DataFrame, source_file: str) -> pd.DataFrame:
    """Return a new frame shaped exactly like ``TARGET_COLS``.

    Column names are stripped and lower-cased, a ``source_file`` column holding
    ``source_file`` is added, every target column that is absent is created and
    filled with ``None``, and the result is restricted to ``TARGET_COLS`` in
    that order (any other column is dropped).

    The input frame is left untouched and the result is an independent copy, so
    callers can assign to its columns without affecting ``df`` and without
    pandas flagging the assignment as a write to a slice.

    Args:
        df: Raw batch as read from the parquet file.
        source_file: Name of the file the batch came from.

    Returns:
        A new DataFrame with exactly the columns in ``TARGET_COLS``.
    """
    # rename() yields a new object, so the caller's column labels stay as they were.
    normalized = df.rename(columns=lambda name: name.strip().lower())

    normalized["source_file"] = source_file

    # Create placeholders for target columns the file does not provide.
    for col in TARGET_COLS:
        if col not in normalized.columns:
            normalized[col] = None

    # Explicit copy: a plain column selection is tracked by pandas as derived
    # from `normalized` and would warn on later column assignment.
    return normalized[TARGET_COLS].copy()

# Number of rows pulled from the parquet file per iteration.
BATCH_SIZE = 200_000

# Columns coerced to numeric before the import filters run; values that cannot
# be parsed become NaN.
NUMERIC_FILTER_COLS = ["trip_distance", "fare_amount", "total_amount", "tip_amount", "payment_type"]


def _download_if_missing(source_url: str, local_path: str) -> None:
    """Download ``source_url`` to ``local_path`` unless that file already exists.

    The payload is streamed into ``<local_path>.part`` and only renamed to its
    final name once it is complete. An interrupted download therefore never
    leaves a truncated file that a later run would mistake for a finished one.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    if os.path.exists(local_path):
        print("File already exists:", local_path)
        return

    os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
    print("Downloading to:", local_path)

    partial_path = local_path + ".part"
    try:
        with requests.get(source_url, stream=True, timeout=300) as r:
            r.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1MB
                    if chunk:
                        f.write(chunk)
        os.replace(partial_path, local_path)
    finally:
        # After a successful rename there is nothing left to remove; after a
        # failure (including a kernel interrupt) this discards the fragment.
        if os.path.exists(partial_path):
            os.remove(partial_path)


def _positive(col: str):
    """Build a mask function keeping rows where ``col`` is non-null and > 0."""
    return lambda d: d[col].notna() & (d[col] > 0)


# Import filters as (audit counter, keep-mask function) pairs. They are applied
# one after another, so each counter only counts rows that survived all earlier
# filters.
# NOTE(review): no filter restricts pickup timestamps to the month of the file.
_IMPORT_FILTERS = [
    # payment_type = 1
    ("dropped_payment_not_1", lambda d: d["payment_type"] == 1),
    # trip_distance > 0
    ("dropped_trip_distance_le_0_or_null", _positive("trip_distance")),
    # fare_amount > 0
    ("dropped_fare_amount_le_0_or_null", _positive("fare_amount")),
    # total_amount > 0
    ("dropped_total_amount_le_0_or_null", _positive("total_amount")),
    # dropoff > pickup (duration > 0)
    (
        "dropped_dropoff_not_after_pickup_or_null",
        lambda d: (
            d["tpep_pickup_datetime"].notna()
            & d["tpep_dropoff_datetime"].notna()
            & (d["tpep_dropoff_datetime"] > d["tpep_pickup_datetime"])
        ),
    ),
    # tip_amount >= 0 (a missing tip is accepted)
    ("dropped_tip_amount_lt_0", lambda d: d["tip_amount"].isna() | (d["tip_amount"] >= 0)),
]


def _apply_import_filters(df: pd.DataFrame, audit: dict) -> pd.DataFrame:
    """Apply ``_IMPORT_FILTERS`` in order and return the surviving rows.

    For every filter, the number of rows it removes is added to the matching
    counter in ``audit`` (which is updated in place).
    """
    for counter, keep_mask in _IMPORT_FILTERS:
        mask = keep_mask(df)
        audit[counter] += int((~mask).sum())
        df = df[mask]
    return df


for month in MONTHS:
    sf = file_name(month)
    # Deliberately not called `url`: that name holds the database URL created
    # in the setup cell and must not be overwritten.
    source_url = file_url(month)
    local_path = os.path.join(DATA_DIR, sf)

    # Fetch and open the file BEFORE touching the database, so a failed download
    # or an unreadable file cannot wipe a month that was loaded earlier.
    print("\nWill load from:", source_url)
    _download_if_missing(source_url, local_path)

    pf = pq.ParquetFile(local_path)
    print("Rows in file:", pf.metadata.num_rows)
    print("Row groups:", pf.num_row_groups)

    # Build a case-safe projection list from the parquet schema:
    # lowercase name -> actual column name in the file.
    col_map = {c.lower(): c for c in pf.schema.names}
    READ_COLS_PARQUET = [col_map[c] for c in READ_COLS if c in col_map]

    missing_cols = [c for c in READ_COLS if c not in col_map]
    if missing_cols:
        print("WARNING: Diese Columns gibt's im Parquet nicht (werden als NULL gefüllt):", missing_cols)

    inserted_total = 0

    # Audit counters: rows read, rows kept, and rows removed per filter.
    audit = {
        "read_rows": 0,
        "kept_rows": 0,
        "dropped_payment_not_1": 0,
        "dropped_trip_distance_le_0_or_null": 0,
        "dropped_fare_amount_le_0_or_null": 0,
        "dropped_total_amount_le_0_or_null": 0,
        "dropped_dropoff_not_after_pickup_or_null": 0,
        "dropped_tip_amount_lt_0": 0,
    }

    # One transaction per month: the DELETE and every batch insert commit
    # together or not at all. Any error (including the guard-rail below) rolls
    # the month back to its previous state instead of leaving it half loaded.
    with engine.begin() as conn:
        # Idempotent: remove any rows previously loaded from this file.
        deleted = conn.execute(
            text(f"DELETE FROM {PG_SCHEMA}.stg_yellow_trips WHERE source_file = :sf"),
            {"sf": sf}
        ).rowcount
        print(f"Deleted existing rows for {sf}: {deleted}")

        for i, batch in enumerate(pf.iter_batches(batch_size=BATCH_SIZE, columns=READ_COLS_PARQUET)):
            df = batch.to_pandas()
            audit["read_rows"] += len(df)

            df = normalize_df(df, sf)

            # Type normalization for the filters. assign() builds a new frame,
            # so this never writes into a slice of another frame.
            df = df.assign(
                tpep_pickup_datetime=pd.to_datetime(df["tpep_pickup_datetime"], errors="coerce"),
                tpep_dropoff_datetime=pd.to_datetime(df["tpep_dropoff_datetime"], errors="coerce"),
                **{c: pd.to_numeric(df[c], errors="coerce") for c in NUMERIC_FILTER_COLS},
            )

            df = _apply_import_filters(df, audit)
            audit["kept_rows"] += len(df)

            if len(df) > 0:
                # Guard-rail: stop if location IDs are basically all NULL in this batch.
                null_rates = df[["pulocationid", "dolocationid"]].isna().mean()
                if (null_rates > 0.9).any():
                    raise ValueError(f"STOP: Mapping failed in batch {i}. Null rates: {null_rates.to_dict()}")

                # Insert through the open connection so the rows join the
                # surrounding transaction.
                df.to_sql(
                    name="stg_yellow_trips",
                    con=conn,
                    schema=PG_SCHEMA,
                    if_exists="append",
                    index=False,
                    method="multi",
                    chunksize=10_000
                )

            inserted_total += len(df)

            # Progress print every batch
            print(f"Batch {i+1}: inserted {len(df):,} rows | total inserted: {inserted_total:,}")

    # Reached only after the transaction has been committed.
    print("DONE. Total inserted:", inserted_total)
    print("AUDIT:", audit)



Deleted existing rows for yellow_tripdata_2025-07.parquet: 0
Will load from: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-07.parquet
File already exists: C:\Users\patri\Documents\Data Analytics - neuefische\Capstone-project-NYCTaxi\data\yellow_tripdata_2025-07.parquet
Rows in file: 3898963
Row groups: 4
Batch 1: inserted 159,633 rows | total inserted: 159,633
Batch 2: inserted 149,251 rows | total inserted: 308,884
Batch 3: inserted 157,401 rows | total inserted: 466,285
Batch 4: inserted 161,510 rows | total inserted: 627,795
Batch 5: inserted 158,678 rows | total inserted: 786,473
Batch 6: inserted 158,150 rows | total inserted: 944,623
Batch 7: inserted 162,202 rows | total inserted: 1,106,825
Batch 8: inserted 161,667 rows | total inserted: 1,268,492
Batch 9: inserted 157,682 rows | total inserted: 1,426,174
Batch 10: inserted 160,038 rows | total inserted: 1,586,212
Batch 11: inserted 162,025 rows | total inserted: 1,748,237
Batch 12: inserted 157,687 rows

In [None]:
## Validation of the Import 

from sqlalchemy import text

# Validate every file the loader handles (one per entry in MONTHS) rather than a
# single hard-coded month, so later months are checked as well.
for sf in [file_name(month) for month in MONTHS]:
    with engine.connect() as conn:
        # 1) Row count by file
        n = conn.execute(text(f"""
            SELECT COUNT(*)
            FROM {PG_SCHEMA}.stg_yellow_trips
            WHERE source_file = :sf
        """), {"sf": sf}).scalar()

        # 2) Date coverage
        date_cov = conn.execute(text(f"""
            SELECT
              MIN(tpep_pickup_datetime) AS min_pickup,
              MAX(tpep_pickup_datetime) AS max_pickup,
              MIN(tpep_dropoff_datetime) AS min_dropoff,
              MAX(tpep_dropoff_datetime) AS max_dropoff
            FROM {PG_SCHEMA}.stg_yellow_trips
            WHERE source_file = :sf
        """), {"sf": sf}).fetchone()

        # 3) Critical NULLs (should be low for location IDs)
        nulls = conn.execute(text(f"""
            SELECT
              SUM(CASE WHEN pulocationid IS NULL THEN 1 ELSE 0 END) AS pu_nulls,
              SUM(CASE WHEN dolocationid IS NULL THEN 1 ELSE 0 END) AS do_nulls,
              SUM(CASE WHEN tpep_pickup_datetime IS NULL THEN 1 ELSE 0 END) AS pickup_nulls,
              SUM(CASE WHEN tpep_dropoff_datetime IS NULL THEN 1 ELSE 0 END) AS dropoff_nulls
            FROM {PG_SCHEMA}.stg_yellow_trips
            WHERE source_file = :sf
        """), {"sf": sf}).fetchone()

        # 4) Quick payment sanity (only payment_type 1 should remain after the import filter)
        payment = conn.execute(text(f"""
            SELECT payment_type, COUNT(*) AS n
            FROM {PG_SCHEMA}.stg_yellow_trips
            WHERE source_file = :sf
            GROUP BY 1
            ORDER BY n DESC
        """), {"sf": sf}).fetchall()

    print(f"\n=== {sf} ===")
    print("Rows loaded for file:", n)
    print("Date coverage:", date_cov)
    print("Critical nulls:", nulls)
    print("Payment counts:", payment)

    # With zero rows the aggregates above are all NULL, which is easy to misread
    # as a clean result; call it out explicitly.
    if n == 0:
        print("WARNING: no rows found for this file - has it been loaded?")


Rows loaded for file: 2278002
Date coverage: (datetime.datetime(2009, 1, 1, 8, 52, 26), datetime.datetime(2025, 7, 31, 23, 59, 58), datetime.datetime(2009, 1, 1, 10, 0, 26), datetime.datetime(2025, 8, 1, 22, 24, 4))
Critical nulls: (0, 0, 0, 0)
Payment counts: [(1, 2278002)]
