In [1]:
import requests
import json
import logging
import csv
import time
import yaml
import os
from datetime import datetime,timezone
from pathlib import Path
import pandas as pd
from src.tfl_client import *

# Gets all lines and their valid routes for given modes, including the name and id of the originating and terminating stops for each route
# GET https://api.tfl.gov.uk/Line/Mode/bus/Route

# Get disruptions for all lines of the given modes.
# GET https://api.tfl.gov.uk/Line/Mode/bus/Disruption

# Get the list of arrival predictions for given line ids based at the given stop
# GET https://api.tfl.gov.uk/Line/{lineId}/Arrivals

def load_schema(path: "str | os.PathLike[str]") -> dict:
    """Load a YAML schema file and return its top-level mapping.

    Args:
        path: Location of the YAML file. Both ``str`` and ``pathlib.Path`` are
            accepted (this notebook passes both).

    Returns:
        The parsed top-level mapping.

    Raises:
        ValueError: if the document's top level is not a mapping, e.g. an empty
            file (which ``yaml.safe_load`` returns as ``None``) or a list.
    """
    # Explicit UTF-8 so parsing does not depend on the platform's locale encoding.
    with open(path, "r", encoding="utf-8") as f:
        schema = yaml.safe_load(f)
    if not isinstance(schema, dict):
        raise ValueError(
            f"Schema file {path} must contain a mapping, got {type(schema).__name__}"
        )
    return schema

def load_config(path: "str | os.PathLike[str]") -> dict:
    """Load a JSON config file and return its top-level object.

    Args:
        path: Location of the JSON file (``str`` or ``pathlib.Path``).

    Returns:
        The parsed top-level JSON object as a dict.

    Raises:
        ValueError: if the top-level JSON value is not an object. The error is
            raised here rather than later at a ``config[...]`` lookup.
    """
    # JSON is UTF-8 by specification; don't rely on the locale default.
    with open(path, "r", encoding="utf-8") as f:
        config = json.load(f)
    if not isinstance(config, dict):
        raise ValueError(
            f"Config file {path} must contain a JSON object, got {type(config).__name__}"
        )
    return config

def load_secrets():
    """Read the TfL API credentials from the environment.

    Returns:
        tuple[str, str]: ``(app_id, app_key)``, taken from ``TFL_APP_ID`` and
        ``TFL_APP_KEY``.

    Raises:
        RuntimeError: if either variable is unset or set to an empty string.
    """
    credentials = tuple(os.getenv(var) for var in ("TFL_APP_ID", "TFL_APP_KEY"))
    # Unset (None) and empty ("") are both treated as missing.
    if not all(credentials):
        raise RuntimeError("Missing TfL credentials: set TFL_APP_ID and TFL_APP_KEY")
    return credentials

# Configure root logging for the notebook session. Then load the credentials up
# front: load_secrets raises if they are missing, so the API cells below never run
# without them.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

app_id, app_key = load_secrets()

In [3]:
# Preprocess the Line routes data: flatten each line's route sections into one row
# apiece, write a dated Parquet snapshot, and keep a small CSV sample.
snapshot_date = datetime.now(timezone.utc).date().isoformat()
OUTPUT_DIR = Path(f"data/reference/line_routes_snapshot/dt={snapshot_date}")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_FILE = OUTPUT_DIR / "line_routes.parquet"  # the artefact written below

# Load config, schemas and params. `config` is also read by later cells (line_ids).
config = load_config(Path("config/routes.json"))
schema = load_schema(Path("schemas/reference/line_routes.yaml"))

# Fetch data using the client
data = get_line_routes(app_id=app_id, app_key=app_key)

rows = []

for line in data:
    line_id = line.get("id")
    # Only purely numeric string ids are kept; alphanumeric ids are skipped.
    # NOTE(review): presumably restricts to standard numbered routes — confirm intent.
    if not isinstance(line_id, str) or not line_id.isdigit():
        continue

    mode = line.get("modeName")

    # `or []` also tolerates an explicit null routeSections in the payload.
    for rs in line.get("routeSections") or []:
        # Raw values (not f-strings) so a missing field stays null instead of
        # becoming the literal string "None".
        rows.append({
            "line_id": line_id,
            "mode": mode,
            "direction": rs.get("direction"),
            "origination_name": rs.get("originationName"),
            "destination_name": rs.get("destinationName"),
        })

if not rows:
    raise ValueError("No line routes with numeric line ids were returned by the API")

# --- schema validation ---
required = set(schema["required_columns"])
for i, row in enumerate(rows):
    missing = required - row.keys()
    if missing:
        raise ValueError(f"Row {i} missing fields: {missing}")

output_columns = list(schema["output_columns"].keys())

df = pd.DataFrame(rows)[output_columns]
df.to_parquet(OUTPUT_FILE, engine="pyarrow")

SAMPLE_DIR = Path("data/reference/line_routes_snapshot/samples")
SAMPLE_DIR.mkdir(parents=True, exist_ok=True)
df.head(300).to_csv(
    SAMPLE_DIR / f"line_routes_{snapshot_date}.csv",
    index=False,
)

logger.info(f"Saved {len(rows)} rows to {OUTPUT_FILE}")

2026-01-22 02:31:42,151 - INFO - Saved 1026 rows to data/reference/line_routes_snapshot/dt=2026-01-22/line_routes.parquet


In [4]:
# Preprocess the stops on the configured line ids: one row per
# (line, direction, stop), with the stop's position along the route.
schema = load_schema("schemas/reference/line_stop.yaml")

snapshot_date = datetime.now(timezone.utc).date().isoformat()
OUTPUT_DIR = Path(f"data/reference/line_stop_snapshot/dt={snapshot_date}")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

rows = []
line_ids = config["line_ids"]
logger.info(f"Loading stops for lines: {line_ids}")

for line in line_ids:
    inbound, outbound = get_stops_sequence(id=line, app_id=app_id, app_key=app_key)

    # Same extraction for both directions. Inbound rows precede outbound rows for
    # each line; the timetable cell's drop_duplicates keeps the first occurrence.
    for direction, payload in (("inbound", inbound), ("outbound", outbound)):
        ordered_routes = payload.get("orderedLineRoutes") or []
        if not ordered_routes:
            continue
        # Only the first ordered route is used; stop_sequence is its 0-based index.
        for seq, stop_id in enumerate(ordered_routes[0].get("naptanIds") or []):
            rows.append({
                "snapshot_date": snapshot_date,
                "line_id": line,
                "direction": direction,
                "stop_id": stop_id,
                "stop_sequence": seq,
            })

if not rows:
    raise ValueError(f"No stop sequences returned for lines: {line_ids}")

# --- schema validation ---
required = set(schema["required_columns"])
for i, row in enumerate(rows):
    missing = required - row.keys()
    if missing:
        raise ValueError(f"Row {i} missing fields: {missing}")

output_columns = list(schema["output_columns"].keys())

# Save in Parquet
parquet_file = OUTPUT_DIR / "line_stop.parquet"
df = pd.DataFrame(rows)[output_columns]
df.to_parquet(parquet_file, engine="pyarrow")

SAMPLE_DIR = Path("data/reference/line_stop_snapshot/samples")
SAMPLE_DIR.mkdir(parents=True, exist_ok=True)
sample_file = SAMPLE_DIR / f"line_stop_{snapshot_date}.csv"
df.head(300).to_csv(sample_file, index=False)

# Report the paths that were actually written.
logger.info(f"Saved {len(rows)} rows to {parquet_file} (sample: {sample_file})")

2026-01-22 02:32:00,927 - INFO - Loading stops for lines: ['12', '34']
2026-01-22 02:32:03,437 - INFO - Saved 180 rows to data/reference/line_stop_snapshot/samples/line_stop.csv and data/reference/line_stop_snapshot/dt=2026-01-22/line_stop.parquet


In [5]:
# Get the timetable for every stop on the configured lines, using the line_stop
# snapshot written by the previous cell.
# NOTE(review): snapshot_date is recomputed from the clock rather than reused, so if
# the previous cell ran before a UTC date rollover the dt= path below won't exist.
snapshot_date = datetime.now(timezone.utc).date().isoformat()
line_stop_DIR = Path(f"data/reference/line_stop_snapshot/dt={snapshot_date}")
line_stop_df = pd.read_parquet(line_stop_DIR / "line_stop.parquet")
OUTPUT_DIR = Path(f"data/reference/stop_timetable_snapshot/dt={snapshot_date}")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

schema = load_schema(Path("schemas/reference/stop_timetable.yaml"))

# One timetable request per (line, stop). drop_duplicates keeps the first row, so a
# stop listed more than once on a line keeps the earliest row's stop_sequence.
line_stop_pairs = (
    line_stop_df[["line_id", "stop_id", "stop_sequence"]]
    .drop_duplicates(subset=["line_id", "stop_id"])
    .sort_values(["line_id", "stop_sequence"], kind="stable")
)

total_pairs = len(line_stop_pairs)
logger.info(f"Starting timetable ingestion for {total_pairs} line-stop pairs")
processed = 0
success = 0
skipped = 0
failed = 0
LOG_EVERY = 50  # emit a progress line every N processed pairs, and after the last

timetable_rows = []

for line_id, stop_id, stop_sequence in line_stop_pairs.itertuples(index=False):
    processed += 1

    try:
        timetable_json = get_timetable(
            id=line_id,
            stop_id=stop_id,
            app_id=app_id,
            app_key=app_key
        )

        # A None payload counts as skipped. This uses if/else rather than
        # `continue`: a `continue` would bypass the progress log below, so a
        # skipped final pair would never emit the closing summary line.
        if timetable_json is None:
            skipped += 1
        else:
            timetable_rows.extend(
                extract_timetable_rows(
                    timetable_json=timetable_json,
                    snapshot_date=snapshot_date,
                    stop_sequence=stop_sequence
                )
            )
            success += 1

    except Exception as e:
        # Best-effort: a failing pair is logged and counted; the loop carries on.
        failed += 1
        logger.error(
            f"Failed line={line_id}, stop={stop_id}: {e}"
        )

    if processed % LOG_EVERY == 0 or processed == total_pairs:
        logger.info(
            "Progress: %d/%d processed | success=%d | skipped=%d | failed=%d",
            processed,
            total_pairs,
            success,
            skipped,
            failed
        )

df = pd.DataFrame(timetable_rows)

# --- schema validation ---
required = set(schema["required_columns"])
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns: {missing}")

output_columns = list(schema["output_columns"].keys())
df = df[output_columns]
OUTPUT_FILE = OUTPUT_DIR / f"stop_timetable_{snapshot_date}.parquet"
df.to_parquet(OUTPUT_FILE, engine="pyarrow")

SAMPLE_DIR = Path("data/reference/stop_timetable_snapshot/samples")
SAMPLE_DIR.mkdir(parents=True, exist_ok=True)
SAMPLE_FILE = SAMPLE_DIR / f"stop_timetable_{snapshot_date}.csv"
df.head(300).to_csv(SAMPLE_FILE, index=False)

logger.info(f"Wrote {len(df)} rows to {OUTPUT_FILE}")
logger.info(f"Sample written to {SAMPLE_FILE}")

2026-01-22 02:32:12,361 - INFO - Starting timetable ingestion for 180 line-stop pairs
2026-01-22 02:32:20,427 - INFO - Progress: 50/180 processed | success=50 | skipped=0 | failed=0
2026-01-22 02:32:30,071 - INFO - Progress: 100/180 processed | success=98 | skipped=2 | failed=0
2026-01-22 02:32:37,647 - INFO - Progress: 150/180 processed | success=148 | skipped=2 | failed=0
2026-01-22 02:32:42,530 - INFO - Wrote 65399 rows to data/reference/stop_timetable_snapshot/dt=2026-01-22/stop_timetable_2026-01-22.parquet
2026-01-22 02:32:42,531 - INFO - Sample written to data/reference/stop_timetable_snapshot/samples/stop_timetable_2026-01-22.csv


In [6]:
def validate_bronze_arrival_predictions(df: pd.DataFrame, schema_path: str) -> pd.DataFrame:
    """Validate a bronze arrival-predictions frame against a YAML column schema.

    The schema file must have a top-level ``columns`` mapping of
    ``column_name -> {type: ..., nullable: ...}``. The checks are:

    1. every schema column is present in ``df``;
    2. columns with ``nullable: false`` contain no nulls (``nullable`` defaults to true);
    3. ``type: datetime`` columns are parsed with ``pd.to_datetime(..., utc=True)``;
    4. ``type: int`` columns already have an integer dtype.

    Args:
        df: Frame to validate. It is not modified.
        schema_path: Path to the YAML schema file.

    Returns:
        A copy of ``df`` with datetime columns converted to tz-aware UTC.

    Raises:
        ValueError: on a missing column, nulls in a non-nullable column, an
            unparseable datetime column, or a non-integer ``int`` column.
    """
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = yaml.safe_load(f)

    columns = schema["columns"]

    # 1. Column existence
    for col in columns:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Work on a copy so the datetime conversion doesn't mutate the caller's frame.
    validated = df.copy()

    # 2. Nullability + type checks
    for col, meta in columns.items():
        meta = meta or {}  # a bare `col:` entry in YAML loads as None
        col_type = meta.get("type")

        if not meta.get("nullable", True) and validated[col].isnull().any():
            raise ValueError(f"Non-nullable column has nulls: {col}")

        if col_type == "datetime":
            try:
                validated[col] = pd.to_datetime(validated[col], utc=True)
            except Exception as e:
                raise ValueError(f"Failed to parse datetime column {col}: {e}") from e
        elif col_type == "int":
            if not pd.api.types.is_integer_dtype(validated[col]):
                raise ValueError(f"Column {col} is not integer typed")

    return validated

In [7]:
# Bronze: ingest arrival predictions for the configured line ids into one Parquet
# part file per run, under the UTC-date partition.
import pyarrow as pa
import pyarrow.parquet as pq

# Timezone-aware UTC "now" (datetime.utcnow() is deprecated since Python 3.12).
ingestion_ts = datetime.now(timezone.utc)
date_part = ingestion_ts.strftime("%Y-%m-%d")

output_dir = Path(f"data/bronze/arrival_predictions/dt={date_part}")
output_dir.mkdir(parents=True, exist_ok=True)

# Part file keyed by the run's UTC timestamp (second resolution).
run_id = ingestion_ts.strftime("%Y%m%dT%H%M%S")
output_file = output_dir / f"part-{run_id}.parquet"

line_ids = config["line_ids"]
logger.info(f"Fetching arrival predictions for lines: {line_ids}")
data = get_arrivals(ids=line_ids, app_id=app_id, app_key=app_key)

# Serialised without the UTC offset, matching the naive-UTC isoformat already
# present in earlier bronze part files read together by the silver glob.
ingestion_ts_str = ingestion_ts.replace(tzinfo=None).isoformat()

rows = [
    {
        # partition / snapshot metadata
        "ingestion_ts": ingestion_ts_str,

        # core identifiers
        "vehicle_id": vehicle.get("vehicleId"),
        "line_id": vehicle.get("lineId"),

        # stop / location
        "stop_id": vehicle.get("naptanId"),
        "stop_name": vehicle.get("stationName"),

        # movement
        "direction": vehicle.get("direction"),
        "destination_name": vehicle.get("destinationName"),

        # KPIs
        "time_to_station_sec": vehicle.get("timeToStation"),
        "expected_arrival": vehicle.get("expectedArrival"),  # Event time
    }
    for vehicle in data
]

# Persisted columns, in order. destination_name is collected above but is
# currently excluded from the bronze output.
FIELDNAMES = [
    "ingestion_ts",
    "vehicle_id",
    "line_id",
    "stop_id",
    "stop_name",
    "direction",
    "time_to_station_sec",
    "expected_arrival",
]

rows_df = (
    pd.DataFrame(rows)
    .sort_values(["line_id", "vehicle_id", "expected_arrival"], kind="stable")
    [FIELDNAMES]
)

# --- schema validation ---
rows_df = validate_bronze_arrival_predictions(
    rows_df,
    "schemas/bronze/arrival_predictions.yaml"
)

table = pa.Table.from_pandas(rows_df, preserve_index=False)
pq.write_table(table, output_file)

logger.info(f"Saved {len(rows_df)} rows to {output_file}")


2026-01-22 02:33:40,794 - INFO - Saved 85 rows to data/bronze/arrival_predictions/dt=2026-01-22/part-20260122T013340.parquet


example_id:  ['12', '34']


In [17]:
# Silver
# Keep set-based transformations with window semantics in SQL as the canonical logic.
# Python is used only as an orchestration and materialization layer,
# which makes the pipeline portable to DuckDB locally and SparkSQL in production.

import re

import duckdb

# Partition to process. It must match existing bronze and line_stop dt= partitions.
execution_date = "2026-01-21"  # Example execution date

bronze_path = f"data/bronze/arrival_predictions/dt={execution_date}/*.parquet"

line_stop_path = (
    f"data/reference/line_stop_snapshot/"
    f"dt={execution_date}/line_stop.parquet"
)


def _sql_string_literal(value: str) -> str:
    """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


sql = Path("src/silver/sql/arrival_predictions_route_head.sql").read_text(encoding="utf-8")

sql = sql.replace("{{ bronze_path }}", _sql_string_literal(bronze_path))
sql = sql.replace("{{ line_stop_path }}", _sql_string_literal(line_stop_path))

# Fail loudly if the template contains placeholders this cell does not fill.
unrendered = re.findall(r"\{\{\s*\w+\s*\}\}", sql)
if unrendered:
    raise ValueError(f"Unrendered placeholders in silver SQL: {unrendered}")

con = duckdb.connect(database=":memory:")
try:
    df = con.execute(sql).fetch_df()
finally:
    con.close()  # release the in-memory database even if the query fails

output_dir = Path(f"data/silver/arrival_predictions_route_head/dt={execution_date}")
output_dir.mkdir(parents=True, exist_ok=True)

df.to_parquet(
    output_dir / f"part-{execution_date}.parquet",
    index=False,
)

# Last expression so the notebook renders a preview of the materialised rows.
df.head()