## 01 - Ingest GTFS Static Data (Bronze Layer with SCD2)

This notebook downloads and ingests the **static GTFS feed** from King County Metro into the **Bronze Delta Lake layer**, using a **Slowly Changing Dimension Type 2 (SCD2)** approach.

### Purpose
To extract and version core static transit reference data (routes, stops, trips) in a way that preserves historical changes over time. This data will be enriched and analyzed in later stages.

### Workflow Summary
- Downloads the latest GTFS static `.zip` file from [King County Metro GTFS](https://metro.kingcounty.gov/gtfs/)
- Extracts and parses key `.txt` files (`routes.txt`, `stops.txt`, `trips.txt`)
- Converts them into Spark DataFrames
- Compares the new data with the latest existing Delta tables
- Updates Delta tables using SCD2:
  - Inserts new or changed records
  - Marks outdated records with `end_time` and `is_current = false`
- Stores each ingested table as a Delta Lake dataset in the Bronze layer


In [0]:
### Download and Ingest GTFS Static Files
from pyspark.sql import functions as F
import requests, zipfile, io, datetime as dt, os, shutil, tempfile   
from pyspark.sql.functions import lit, current_timestamp


In [0]:
# One-time bootstrap config, kept for reference only.
# The body below is wrapped in a triple-quoted string, so Python evaluates it as
# a plain string expression: none of the assignments inside take effect.
# It records the settings used for the one-off run that added SCD2 columns to
# the historical data.
'''TODAY       = "2025-05-21"          # Static ingest date
BRONZE_BASE = "dbfs:/bronze"
BRONZE_PATH = f"{BRONZE_BASE}/gtfs_static"'''

In [0]:
# One-time SCD2 backfill, kept for reference only.
# The body below is wrapped in a triple-quoted string, so nothing in it runs.
# If re-enabled (together with the TODAY / BRONZE_PATH config it relies on), it would, for each
# of routes/stops/trips:
#   1. load the Delta table stored under <BRONZE_PATH>/<TODAY>/<table>,
#   2. add the SCD2 columns (start_time, end_time, is_current, processed_at),
#   3. overwrite the Delta table at <BRONZE_PATH>/<table> with the result.
# NOTE(review): start_time is set from lit(TODAY), i.e. a string literal rather than a
# timestamp, whereas the regular ingest uses current_timestamp() — confirm the column
# type of tables produced by this backfill.
'''target_tables = {
    "routes.txt": "route_id",
    "stops.txt": "stop_id",
    "trips.txt": "trip_id"
}

for name, key_col in target_tables.items():
    table_name = name.replace(".txt", "")
    table_path = f"{BRONZE_PATH}/{table_name}"
    old_path = os.path.join(BRONZE_PATH, TODAY, table_name)
   
    df = spark.read.format("delta").load(old_path)
    df_new = df.withColumn("start_time", lit(TODAY)) \
               .withColumn("end_time", lit(None).cast("timestamp")) \
               .withColumn("is_current", lit(True)) \
               .withColumn("processed_at", current_timestamp())
    df_new.display()
    df_new.write.format("delta").mode("overwrite").save(table_path)
    print(f"✅ Updated {table_name}")'''

    

In [0]:

from pyspark.sql import functions as F
import requests, zipfile, io, datetime as dt, os, shutil, tempfile  # dt alias

# Used below to recognise "the target Delta table does not exist yet".
from pyspark.sql.utils import AnalysisException

# ---------- config ----------
GTFS_URL    = "https://metro.kingcounty.gov/gtfs/google_transit.zip"
BRONZE_BASE = "dbfs:/bronze"
BRONZE_PATH = f"{BRONZE_BASE}/gtfs_static/"

# Shared temp folder in DBFS where executors can read the files
DBFS_TMP_DIR = "dbfs:/tmp/gtfs_static"          # lives in DBFS (NOT driver /tmp)
dbutils.fs.mkdirs(DBFS_TMP_DIR)            # idempotent

# Create driver‑local temporary directory for the ZIP extraction
# (removed again by the clean-up cell at the end of the notebook).
tmp_dir = tempfile.mkdtemp(prefix="gtfs_")
print(f"Driver temp dir: {tmp_dir}")

# GTFS file name -> business key column used to match rows between feed versions
target_tables = {
    "routes.txt": "route_id",
    "stops.txt": "stop_id",
    "trips.txt": "trip_id"
}

# ---------- download ----------
# Failures are reported and the cell carries on (best effort); `z` stays None so
# the per-table loop below is skipped.
z = None
try:
    print("Downloading GTFS zip …")
    response = requests.get(GTFS_URL, timeout=30)
    # Fail here on an HTTP error instead of trying to unzip an error page.
    response.raise_for_status()
    z = zipfile.ZipFile(io.BytesIO(response.content))
except Exception as e:
    print(f"Error downloading GTFS zip: {e}")

# ---------- per-table SCD2 merge ----------
zip_members = set(z.namelist()) if z is not None else set()
tables_to_process = list(target_tables.items()) if z is not None else []

for name, key_col in tables_to_process:
    table_name = name.replace(".txt", "")
    table_path = f"{BRONZE_PATH}{table_name}"

    # Skip tables not in zip (if any)
    if name not in zip_members:
        print(f"⚠️ Skipping {name} — not found in zip.")
        continue

    print(f"📥 Processing {name} with key column '{key_col}'")

    # One failing table must not abort the others, and must not be reported as a
    # download error; errors are printed with the table name.
    try:
        # Extract to the driver, then stage in DBFS so executors can read the file.
        local_file = os.path.join(tmp_dir, name)
        with z.open(name) as src, open(local_file, "wb") as dst:
            dst.write(src.read())
        print("z open successful!", name)
        dbfs_file = f"{DBFS_TMP_DIR}/{name}"
        # Explicit encoding instead of the platform default.
        with open(local_file, "r", encoding="utf-8") as f:
            dbutils.fs.put(dbfs_file, f.read(), overwrite=True)

        # No schema inference: every feed column is read as a string.
        df = spark.read.option("header", True).csv(dbfs_file)

        if key_col not in df.columns:
            raise ValueError(f"key column '{key_col}' not found in {name}; columns: {df.columns}")

        # Add SCD2 columns
        df_new = (
            df.withColumn("start_time", F.current_timestamp())
            .withColumn("end_time", F.lit(None).cast("timestamp"))
            .withColumn("is_current", F.lit(True))
            .withColumn("processed_at", F.current_timestamp())
        )

        # Only the "table is missing" case is treated as a first run; any other
        # failure falls through to the per-table handler below.
        try:
            df_existing = spark.read.format("delta").load(table_path)
        except AnalysisException:
            df_existing = None

        if df_existing is None:
            # "errorifexists" guarantees an existing history is never overwritten
            # if the load above failed for some reason other than a missing table.
            df_new.write.format("delta").mode("errorifexists").save(table_path)
            print(f"✓ Created new SCD2 table for {table_name}")
            continue

        # ---- align schemas ----
        # Ordered union (existing columns first, then columns new in this feed) so
        # the column order of the rewritten table is stable between runs.
        existing_col_names = list(df_existing.columns)
        new_col_names = list(df_new.columns)
        existing_col_set = set(existing_col_names)
        new_col_set = set(new_col_names)
        all_cols = existing_col_names + [c for c in new_col_names if c not in existing_col_set]

        # Use the real column when present, otherwise a NULL string placeholder
        # (feed columns are strings because the CSV is read without inference).
        df_new = df_new.select([
            F.col(c) if c in new_col_set else F.lit(None).cast("string").alias(c)
            for c in all_cols
        ])
        df_existing = df_existing.select([
            F.col(c) if c in existing_col_set else F.lit(None).cast("string").alias(c)
            for c in all_cols
        ])

        # ---- detect new / changed rows ----
        compare_cols = [c for c in df.columns if c != key_col]
        df_existing_current = df_existing.filter("is_current")

        # `!=` yields NULL when either side is NULL, which silently dropped both
        # brand-new keys (all existing.* NULL after the left join) and NULL<->value
        # changes. `<=>` is the null-safe comparison; the explicit IS NULL test
        # keeps keys that have no current row in the table.
        diff_condition = " OR ".join(
            ["existing.is_current IS NULL"]
            + [f"NOT (new.`{c}` <=> existing.`{c}`)" for c in compare_cols]
        )

        df_changes = (
            df_new.alias("new")
            .join(df_existing_current.alias("existing"), on=key_col, how="left")
            .filter(diff_condition)
            .select("new.*")
        )

        # limit(1) avoids counting every changed row just to test for emptiness.
        if df_changes.limit(1).count() > 0:
            changed_keys = df_changes.select(key_col)

            # Current rows that are superseded by a changed row: close them out.
            df_to_expire = (
                df_existing_current
                .join(changed_keys, on=key_col, how="left_semi")
                .withColumn("end_time", F.current_timestamp())     # when this version was expired
                .withColumn("is_current", F.lit(False))
                .withColumn("processed_at", F.current_timestamp())
            )

            # Current rows that did not change stay as they are.
            # NOTE: keys that disappeared from the feed are not expired here; they
            # remain current because changes are derived from the new feed only.
            df_final_current = df_existing_current.join(changed_keys, on=key_col, how="left_anti")

            # Everything that is not current, including rows with a NULL flag, so
            # the overwrite below cannot drop rows.
            df_historical = df_existing.filter("is_current IS NULL OR NOT is_current")

            df_final = (
                df_final_current
                .unionByName(df_to_expire)
                .unionByName(df_changes)
                .unionByName(df_historical)
            )
            df_final.display()
            df_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(table_path)
            print(f"✅ Updated {table_name} (SCD2)")
        else:
            print(f"✓ No changes found in {table_name}")

    except Exception as e:
        print(f"❌ Error processing {table_name}: {e}")

In [0]:
# Remove the driver-local extraction directory created with tempfile.mkdtemp in the
# ingest cell (requires that cell to have run so `tmp_dir` is defined).
# ignore_errors=True means a failed or repeated removal does not raise.
shutil.rmtree(tmp_dir, ignore_errors=True) # Clean up temp files