Download the source data and save it to the landing volume

In [0]:
# Fetch the Calgary Transit GTFS archive and store it, unmodified, in the landing volume.
import requests
from datetime import datetime

url = "https://data.calgary.ca/download/npk7-z3bj/application%2Fx-zip-compressed"

# The run date (YYYY-MM-DD) tags the landing file; later cells reuse `date_str` and `zip_path`.
date_str = datetime.now().strftime("%Y-%m-%d")
zip_path = f"/Volumes/calgary_transit/landing/landing_vol/calgary_gtfs_{date_str}.zip"

# Stream in 1 MiB pieces so the archive is never held fully in memory.
# The HTTP status is checked before the destination file is opened, so a failed
# request never creates/truncates the landing file.
with requests.get(url, stream=True, timeout=120) as response:
    response.raise_for_status()
    with open(zip_path, "wb") as sink:
        # filter(None, ...) drops empty keep-alive chunks
        for piece in filter(None, response.iter_content(chunk_size=1024 * 1024)):
            sink.write(piece)

print("Saved:", zip_path)


Unzip into Bronze volume

In [0]:
import os
import zipfile

# Unpack this run's landing archive into a per-date folder on the bronze volume.
# Relies on `date_str` and `zip_path` defined by the download cell.
extract_dir = f"/Volumes/calgary_transit/bronze/bronze_vol/calgary_gtfs/{date_str}/"
os.makedirs(extract_dir, exist_ok=True)  # no-op when the folder already exists (re-runs)

with zipfile.ZipFile(zip_path, mode="r") as archive:
    archive.extractall(path=extract_dir)

print("Extracted to:", extract_dir)


Load the Bronze volume files into Bronze Delta tables

In [0]:
from pyspark.sql import functions as F
import os, glob
from datetime import datetime, timezone

# Load every extracted GTFS .txt file for this run into its own Delta table in the
# bronze schema. Each row is tagged with _ingest_date, _ingest_ts and _source_file.

# Reuse the run date set by the download/unzip cells, so every stage points at the same
# dated folder even if this cell runs after midnight. Fall back to today when the cell
# is run on its own.
date_str = globals().get("date_str") or datetime.now().strftime("%Y-%m-%d")
extract_dir = f"/Volumes/calgary_transit/bronze/bronze_vol/calgary_gtfs/{date_str}/"

# Where tables will live
catalog = "calgary_transit"
schema = "bronze"   # <-- your bronze schema that holds Delta tables
table_prefix = "gtfs_"  # results like calgary_transit.bronze.gtfs_stops

txt_files = glob.glob(os.path.join(extract_dir, "*.txt"))

if not txt_files:
    raise FileNotFoundError(f"No .txt files found in: {extract_dir}")

# Use one timestamp for the whole batch. F.current_timestamp() is re-evaluated by every
# Spark query, even when the Column is built once outside the loop, so each table would
# get a different _ingest_ts. A literal captured here is identical across all tables.
ingest_ts = F.lit(datetime.now(timezone.utc))

for fp in sorted(txt_files):
    base = os.path.splitext(os.path.basename(fp))[0]   # stops, routes, trips, ...
    table_name = f"{catalog}.{schema}.{table_prefix}{base}"

    # NOTE(review): inferSchema can turn zero-padded IDs into integers. It can also infer
    # a different type for the same column on a later day, which mergeSchema cannot
    # reconcile. Reading every column as a string is the usual bronze-layer choice.
    # Switching now would conflict with the existing tables' types, so it is left as is.
    df = (
        spark.read
            .option("header", True)
            .option("inferSchema", True)
            .option("mode", "PERMISSIVE")
            .csv(fp)
            .withColumn("_ingest_date", F.lit(date_str))
            .withColumn("_ingest_ts", ingest_ts)
            .withColumn("_source_file", F.lit(fp))
    )

    # Replace only the rows for this run date. Re-running the cell for the same date
    # therefore no longer appends duplicates; other dates' rows are left untouched.
    # saveAsTable still creates the table on the first run.
    (df.write
       .format("delta")
       .mode("overwrite")
       .option("replaceWhere", f"_ingest_date = '{date_str}'")
       .option("mergeSchema", "true")   # lets newly added columns through
       .saveAsTable(table_name))

    # count() re-scans the source CSV; acceptable for GTFS-sized files.
    print(f"Wrote {df.count():,} rows ({date_str}) -> {table_name}")
