In [14]:
import os
from pathlib import Path

# Local cache directory for fetched / derived CSVs; reruns reuse whatever is already here.
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

# Inclusive date window shared by the rainfall cache naming and the rail queries.
START_DATE = "2022-05-01"
END_DATE = "2022-05-30"

# Cloud cache switch. When True:
# - a CSV missing locally is looked up in the GCS bucket first
# - freshly computed CSVs are uploaded to the bucket afterwards.
USE_GCS_CACHE = True
BUCKET_NAME = "de-candidate-task-results-sp"
SERVICE_ACCOUNT_FILE = "searchlab-bq-training-sp-key.json"

# "Night before" rainfall window: hours of day D-1 (inclusive bounds) whose
# precipitation is summed and attached to rail date D.
RAINFALL_START_TIME = 18
RAINFALL_END_TIME = 23

# One dict per route: rail origin/destination CRS codes plus the CEDA MIDAS
# weather station whose rainfall is attached to that route.
# Adding a route means adding a dict here; no other code changes are needed.
ROUTES = [
    {
        "from_loc": "LDS",
        "to_loc": "KGX",
        "weather_station_name": "bramham",
        "weather_region": "west-yorkshire",
        "weather_station_id": "00534",
        "ceda_csv_url": "https://dap.ceda.ac.uk/badc/ukmo-midas-open/data/uk-hourly-rain-obs/dataset-version-202507/west-yorkshire/00534_bramham/qc-version-1/midas-open_uk-hourly-rain-obs_dv-202507_west-yorkshire_00534_bramham_qcv-1_2022.csv?download=1",
    },
    {
        "from_loc": "LDS",
        "to_loc": "BDQ",
        "weather_station_name": "bradford",
        "weather_region": "west-yorkshire",
        "weather_station_id": "00516",
        "ceda_csv_url": "https://dap.ceda.ac.uk/badc/ukmo-midas-open/data/uk-hourly-rain-obs/dataset-version-202507/west-yorkshire/00516_bradford/qc-version-1/midas-open_uk-hourly-rain-obs_dv-202507_west-yorkshire_00516_bradford_qcv-1_2022.csv?download=1",
    },
]

# Echo the loaded configuration so a run log shows what was actually used.
print("Configured routes:", len(ROUTES))
for route_cfg in ROUTES:
    print(
        route_cfg["from_loc"], "->", route_cfg["to_loc"],
        "| weather:", route_cfg["weather_station_name"], route_cfg["weather_station_id"],
    )

# Secrets are read from the environment; only report whether each one is present.
for env_name in ("CEDA_ACCESS_TOKEN", "RAIL_USER", "RAIL_PASSWORD"):
    print(f"{env_name} set:", bool(os.getenv(env_name)))


Configured routes: 2
LDS -> KGX | weather: bramham 00534
LDS -> BDQ | weather: bradford 00516
CEDA_ACCESS_TOKEN set: True
RAIL_USER set: True
RAIL_PASSWORD set: True


In [15]:
def ensure_local_from_gcs(bucket_name, blob_name, local_path, service_account_file):
    """
    Make sure `local_path` exists, pulling it from the GCS cache when possible.

    Cache strategy:
      - If the local file exists, use it (fastest).
      - Else, if cloud caching is enabled and the blob exists, download it from GCS.
      - Else, the caller must compute/fetch the data itself.

    Args:
        bucket_name: Name of the GCS bucket used as the shared cache.
        blob_name: Object name inside the bucket.
        local_path: Destination file (``pathlib.Path`` or ``str``).
        service_account_file: Path to the service-account JSON key used for GCS.

    Returns:
        True if the file is available locally when the function returns, False otherwise.
    """
    local_path = Path(local_path)  # also accept plain string paths

    if local_path.exists():
        return True
    if not USE_GCS_CACHE:
        return False

    # Imported lazily so the notebook still runs without the GCS client
    # libraries when cloud caching is switched off.
    from google.cloud import storage
    from google.oauth2 import service_account

    creds = service_account.Credentials.from_service_account_file(service_account_file)
    client = storage.Client(credentials=creds)

    blob = client.bucket(bucket_name).blob(blob_name)
    if not blob.exists(client):
        return False

    # parents=True: the cache path may be nested more than one level deep.
    local_path.parent.mkdir(parents=True, exist_ok=True)

    # Download to a sibling temp file and rename once complete. An interrupted
    # download must never leave a truncated file at `local_path`, because every
    # later run would treat that file as a valid cache hit.
    partial_path = local_path.with_name(local_path.name + ".part")
    try:
        blob.download_to_filename(str(partial_path))
        partial_path.replace(local_path)
    finally:
        if partial_path.exists():
            partial_path.unlink()

    print(f"[INFO] Downloaded gs://{bucket_name}/{blob_name} -> {local_path}")
    return True


def upload_file_to_gcs(local_file_path, bucket_name, blob_name, service_account_file):
    """
    Push a locally written file to the GCS cache bucket.

    Does nothing when USE_GCS_CACHE is off. Otherwise the file at
    `local_file_path` is uploaded as `blob_name` in `bucket_name`, authenticating
    with the service-account key at `service_account_file`, so later runs can
    download the result instead of recomputing it.
    """
    if USE_GCS_CACHE:
        # Lazy imports: only needed when cloud caching is actually enabled.
        from google.cloud import storage
        from google.oauth2 import service_account

        credentials = service_account.Credentials.from_service_account_file(service_account_file)
        gcs_client = storage.Client(credentials=credentials)

        target_blob = gcs_client.bucket(bucket_name).blob(blob_name)
        target_blob.upload_from_filename(str(local_file_path))
        print(f"[INFO] Uploaded -> gs://{bucket_name}/{blob_name}")


In [16]:
import pandas as pd
from ceda_utils import get_weather_data

def compute_night_before_rainfall(weather_df: pd.DataFrame) -> pd.DataFrame:
    """
    Turn hourly rainfall observations into a per-day "night before" total.

    `weather_df` must provide the columns `ob_end_time`, `prcp_amt` and
    `weather_station`; the input frame is not modified.

    Steps:
      - parse `ob_end_time` (unparseable values become NaT and are dropped by the hour filter)
      - coerce `prcp_amt` to numeric, counting unparseable/missing amounts as 0.0
      - keep rows whose hour lies in [RAINFALL_START_TIME, RAINFALL_END_TIME] (inclusive)
      - sum per observation date and station
      - move the date forward by one day so the total lines up with the next day's rail date

    Returns:
        DataFrame with columns: date (YYYY-MM-DD string), weather_station,
        rainfall_nightbefore_mm.
    """
    hourly = weather_df.assign(
        ob_end_time=pd.to_datetime(weather_df["ob_end_time"], errors="coerce"),
        prcp_amt=pd.to_numeric(weather_df["prcp_amt"], errors="coerce").fillna(0.0),
    )

    # Evening window of the day before the rail date; NaT hours compare False and drop out.
    obs_hour = hourly["ob_end_time"].dt.hour
    in_window = (obs_hour >= RAINFALL_START_TIME) & (obs_hour <= RAINFALL_END_TIME)
    evening = hourly[in_window]

    # One total per (observation date, station).
    obs_date = evening["ob_end_time"].dt.date.rename("obs_date")
    nightly = evening.groupby([obs_date, "weather_station"])["prcp_amt"].sum().reset_index()

    # Evening of D-1 becomes the rainfall feature for rail date D.
    rail_date = pd.to_datetime(nightly["obs_date"]) + pd.Timedelta(days=1)
    nightly = nightly.assign(date=rail_date.dt.strftime("%Y-%m-%d"))

    renamed = nightly.rename(columns={"prcp_amt": "rainfall_nightbefore_mm"})
    return renamed[["date", "weather_station", "rainfall_nightbefore_mm"]]


def get_or_build_rainfall(route: dict) -> pd.DataFrame:
    """
    Return the "night before" rainfall table for a route's weather station.

    Lookup order:
      1. local cached CSV in DATA_DIR
      2. the same CSV in the GCS cache (if enabled)
      3. fetch the raw observations from CEDA, compute the feature, then write
         the CSV locally and upload it to GCS

    One CSV is kept per station (named after station id/name and the configured
    date window) so different stations are never mixed in one file.

    Raises:
        ValueError: if data has to be fetched and CEDA_ACCESS_TOKEN is not set.
    """
    station_id = route["weather_station_id"]
    station_name = route["weather_station_name"]

    rain_csv = DATA_DIR / f"rainfall_{station_id}_{station_name}_{START_DATE}_to_{END_DATE}.csv"
    rain_blob = rain_csv.name

    if not ensure_local_from_gcs(BUCKET_NAME, rain_blob, rain_csv, SERVICE_ACCOUNT_FILE):
        # Nothing cached anywhere: build the feature from the CEDA source file.
        ceda_token = os.getenv("CEDA_ACCESS_TOKEN")
        if not ceda_token:
            raise ValueError("CEDA_ACCESS_TOKEN not set")

        raw_weather = get_weather_data(url=route["ceda_csv_url"], access_token=ceda_token)
        rainfall = compute_night_before_rainfall(raw_weather)

        rainfall.to_csv(rain_csv, index=False)
        print(f"[INFO] Saved rainfall -> {rain_csv}")

        upload_file_to_gcs(rain_csv, BUCKET_NAME, rain_blob, SERVICE_ACCOUNT_FILE)
        return rainfall

    # Cache hit (local file, possibly just downloaded from GCS).
    cached_rainfall = pd.read_csv(rain_csv)
    print(f"[INFO] Loaded rainfall cache -> {rain_csv}")
    return cached_rainfall


In [17]:
import time
from base64 import b64encode
from datetime import datetime, timedelta
import requests

HSP_URL = "https://hsp-prod.rockshore.net/api/v1/serviceMetrics"

def call_hsp_api(url, headers, payload, retries=3, backoff_seconds=3):
    """
    POST `payload` to the HSP API with simple retry handling.

    The HSP API is known to intermittently return 5xx and time out, so any
    non-200 response, timeout or other error is logged and retried after a
    fixed pause.

    Args:
        url: Endpoint to POST to.
        headers: HTTP headers (including authorization).
        payload: JSON-serialisable request body.
        retries: Maximum number of attempts.
        backoff_seconds: Pause between consecutive attempts, in seconds.

    Returns:
        The decoded JSON body of the first 200 response, or None if every
        attempt failed.
    """
    for attempt in range(retries):
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=90)
            if response.status_code == 200:
                return response.json()
            print(f"[WARNING] Status {response.status_code}: {response.text[:200]}")
        except requests.exceptions.ReadTimeout:
            print(f"[WARNING] Timeout attempt {attempt + 1}")
        except Exception as e:
            # Deliberately broad: this is a best-effort wrapper and the caller
            # treats None as "skip this day and fill the gap on a later run".
            print(f"[ERROR] Attempt {attempt + 1}: {e}")

        # Back off only when another attempt follows; sleeping after the final
        # failure would just delay reporting it.
        if attempt < retries - 1:
            time.sleep(backoff_seconds)

    print("[ERROR] All retries failed")
    return None


def get_daily_service_metrics(from_loc, to_loc, start_date, end_date):
    """
    Call serviceMetrics once per day in [start_date, end_date] and total the counts per date.

    Output schema matches the task requirement:
      date, departure_rail_station_crs, destination_rail_station_crs,
      service_count_total, service_count_ontime

    Notes:
      - tolerance=["5"] is used, so "on time" means within 5 minutes.
      - A morning time window (06:00–09:59) is queried as part of the task assumptions.
      - The request's `days` field is derived from each date's weekday
        (WEEKDAY / SATURDAY / SUNDAY). Sending WEEKDAY for a weekend date yields
        no services and would record a misleading zero count for that day.
      - Days whose request fails are skipped (no row) so a rerun can fill the gap.

    Args:
        from_loc: Departure station CRS code.
        to_loc: Destination station CRS code.
        start_date: First date to query, "YYYY-MM-DD".
        end_date: Last date to query (inclusive), "YYYY-MM-DD".

    Returns:
        DataFrame with the schema above; it has those columns even when no day
        could be fetched.

    Raises:
        ValueError: if RAIL_USER or RAIL_PASSWORD is not set.
    """
    username = os.getenv("RAIL_USER")
    password = os.getenv("RAIL_PASSWORD")
    if not username or not password:
        raise ValueError("RAIL_USER and RAIL_PASSWORD must be set")

    # Basic Auth header required by HSP API (username:password base64)
    token = b64encode(f"{username}:{password}".encode()).decode()
    headers = {"Authorization": f"Basic {token}", "Content-Type": "application/json"}

    columns = [
        "date",
        "departure_rail_station_crs",
        "destination_rail_station_crs",
        "service_count_total",
        "service_count_ontime",
    ]

    first_day = datetime.strptime(start_date, "%Y-%m-%d")
    last_day = datetime.strptime(end_date, "%Y-%m-%d")

    records = []

    # range() is empty when end_date precedes start_date, so nothing is queried.
    for offset in range((last_day - first_day).days + 1):
        day = first_day + timedelta(days=offset)
        date_str = day.strftime("%Y-%m-%d")

        # datetime.weekday(): Monday == 0 ... Saturday == 5, Sunday == 6.
        weekday_index = day.weekday()
        if weekday_index == 5:
            day_type = "SATURDAY"
        elif weekday_index == 6:
            day_type = "SUNDAY"
        else:
            day_type = "WEEKDAY"

        payload = {
            "from_loc": from_loc,
            "to_loc": to_loc,
            "from_time": "0600",
            "to_time": "0959",
            "from_date": date_str,
            "to_date": date_str,
            "days": day_type,
            "tolerance": ["5"],
        }

        print(f"[INFO] Fetching rail {from_loc}->{to_loc} {date_str}")
        data = call_hsp_api(HSP_URL, headers, payload)

        if not data:
            # We skip failed days rather than failing the entire pipeline.
            # This supports "rerun to fill gaps".
            print(f"[ERROR] Skipping {date_str}")
            continue

        total = 0
        on_time = 0

        # Each entry in "Services" carries a "Metrics" list; with a single
        # tolerance value requested, only the first metrics entry is read.
        for svc in data.get("Services", []):
            metrics = svc.get("Metrics", [])
            if not metrics:
                continue
            m = metrics[0]
            total += int(m["num_not_tolerance"]) + int(m["num_tolerance"])
            on_time += int(m["num_tolerance"])

        records.append({
            "date": date_str,
            "departure_rail_station_crs": from_loc,
            "destination_rail_station_crs": to_loc,
            "service_count_total": total,
            "service_count_ontime": on_time,
        })

        time.sleep(1)  # polite delay to reduce proxy errors / rate sensitivity

    # Explicit columns keep the schema stable when every day failed (no records).
    return pd.DataFrame(records, columns=columns)


In [18]:
from datetime import datetime, timedelta

def daterange(start_date: str, end_date: str):
    """
    Yield every day from `start_date` to `end_date` (both inclusive) as a
    YYYY-MM-DD string. Yields nothing when `end_date` is before `start_date`.

    Used to work out which days are absent from a cached CSV.
    """
    first_day = datetime.strptime(start_date, "%Y-%m-%d")
    last_day = datetime.strptime(end_date, "%Y-%m-%d")
    day_count = (last_day - first_day).days + 1
    for offset in range(day_count):
        yield (first_day + timedelta(days=offset)).strftime("%Y-%m-%d")


def get_or_build_rail(route: dict) -> pd.DataFrame:
    """
    Return daily rail metrics for a route, fetching only what the cache lacks.

    Rail caching strategy:
      - Try the local rail CSV
      - Else download it from the GCS cache if enabled
      - Identify which days of [START_DATE, END_DATE] are missing
      - Fetch the span min(missing)..max(missing) and keep only the missing days
      - Upsert into the cached CSV and re-upload it

    Rationale:
      - The HSP API is slow/flaky, so caching per route saves a lot of time.
      - This supports the "run once, rerun to fill gaps" approach.

    If nothing usable comes back from the API, the cache is left untouched
    (no file is written or uploaded) and the rows already cached are returned.

    Args:
        route: Route config dict with at least "from_loc" and "to_loc".

    Returns:
        DataFrame with columns date, departure_rail_station_crs,
        destination_rail_station_crs, service_count_total, service_count_ontime.
    """
    rail_columns = [
        "date",
        "departure_rail_station_crs",
        "destination_rail_station_crs",
        "service_count_total",
        "service_count_ontime",
    ]

    from_loc = route["from_loc"]
    to_loc = route["to_loc"]

    rail_csv = DATA_DIR / f"rail_{from_loc}_{to_loc}_{START_DATE}_to_{END_DATE}.csv"
    rail_blob = rail_csv.name

    # local → gcs download
    ensure_local_from_gcs(BUCKET_NAME, rail_blob, rail_csv, SERVICE_ACCOUNT_FILE)
    # With no cache, start from an empty frame that already has the schema so
    # callers can still select/merge on the expected columns.
    existing = pd.read_csv(rail_csv) if rail_csv.exists() else pd.DataFrame(columns=rail_columns)

    existing_dates = set(existing["date"].astype(str)) if not existing.empty else set()
    all_dates = set(daterange(START_DATE, END_DATE))
    missing = sorted(all_dates - existing_dates)

    print(f"[INFO] Rail cache {from_loc}->{to_loc}: cached={len(existing_dates)} missing={len(missing)}")

    if not missing:
        if existing.empty:
            print("[WARNING] Rail cache empty and nothing fetched (unexpected).")
        return existing

    # Fetch only the range covering missing days, then filter down.
    # This keeps code simple while still avoiding refetching already cached days.
    fetched = get_daily_service_metrics(from_loc, to_loc, min(missing), max(missing))

    # When every API call failed the result can be empty (possibly without any
    # columns), so check before indexing by "date".
    if not fetched.empty and "date" in fetched.columns:
        fetched = fetched[fetched["date"].isin(missing)].reset_index(drop=True)
    else:
        fetched = fetched.iloc[0:0]

    if fetched.empty:
        # Do not overwrite the cache with nothing: a column-less CSV cannot be
        # read back on the next run.
        print(f"[WARNING] No rail data fetched for {from_loc}->{to_loc}; cache left unchanged.")
        return existing

    if existing.empty:
        out = fetched
    else:
        out = pd.concat([existing, fetched], ignore_index=True)
        out = out.drop_duplicates(
            subset=["date", "departure_rail_station_crs", "destination_rail_station_crs"],
            keep="last"
        )
        out = out.sort_values("date").reset_index(drop=True)

    out.to_csv(rail_csv, index=False)
    print(f"[INFO] Saved rail -> {rail_csv}")
    upload_file_to_gcs(rail_csv, BUCKET_NAME, rail_blob, SERVICE_ACCOUNT_FILE)
    return out


In [19]:
def build_final_dataset(rail_df: pd.DataFrame, rainfall_df: pd.DataFrame) -> pd.DataFrame:
    """
    Final join step: attach the "night before" rainfall to each rail row by date.

    - Left join: rail rows are kept even when no rainfall row exists for the date;
      their rainfall becomes 0.0.
    - When `rainfall_df` describes exactly one weather station, rail rows without
      a rainfall match still get that station's name instead of a missing value,
      so the station column is never left empty for a route.
    - Counts and rainfall are coerced to numeric types and the columns are put in
      the final order.

    Args:
        rail_df: Columns date, departure_rail_station_crs,
            destination_rail_station_crs, service_count_total, service_count_ontime.
        rainfall_df: Columns date, weather_station, rainfall_nightbefore_mm.

    Returns:
        DataFrame with columns date, departure_rail_station_crs,
        destination_rail_station_crs, weather_station, rainfall_nightbefore_mm,
        service_count_total, service_count_ontime.
    """
    out = rail_df.merge(rainfall_df, on="date", how="left")

    # Unmatched dates come out of the left join with a missing station name even
    # though their rainfall is filled below; use the single known station if any.
    station_names = rainfall_df["weather_station"].dropna().unique()
    if len(station_names) == 1:
        station_col = out["weather_station"].astype(object)
        out["weather_station"] = station_col.where(station_col.notna(), station_names[0])

    # Schema enforcement: ensures consistent CSV output + BigQuery load stability.
    out["rainfall_nightbefore_mm"] = pd.to_numeric(out["rainfall_nightbefore_mm"], errors="coerce").fillna(0.0)
    out["service_count_total"] = pd.to_numeric(out["service_count_total"], errors="coerce").fillna(0).astype(int)
    out["service_count_ontime"] = pd.to_numeric(out["service_count_ontime"], errors="coerce").fillna(0).astype(int)

    # Exact schema ordering required by the task
    out = out[
        [
            "date",
            "departure_rail_station_crs",
            "destination_rail_station_crs",
            "weather_station",
            "rainfall_nightbefore_mm",
            "service_count_total",
            "service_count_ontime",
        ]
    ]
    return out

# Per-route final frames, collected for the combined export below.
final_frames = []

for route in ROUTES:
    # Rainfall feature for the route's weather station (served from cache when possible).
    rain_df = get_or_build_rainfall(route)

    # Daily rail metrics for the route (cached; only missing days are fetched).
    rail_df = get_or_build_rail(route)

    # Join both into the required output schema.
    final_df = build_final_dataset(rail_df, rain_df)

    # Identifiers that make up the per-route output file name.
    from_loc, to_loc = route["from_loc"], route["to_loc"]
    station_id, station_name = route["weather_station_id"], route["weather_station_name"]

    # Keep one final CSV per route as well: handy for debugging and partial reruns.
    final_csv = DATA_DIR / f"final_{from_loc}_{to_loc}_{station_id}_{station_name}_{START_DATE}_to_{END_DATE}.csv"
    final_df.to_csv(final_csv, index=False)
    print(f"[INFO] Saved final -> {final_csv}")

    upload_file_to_gcs(final_csv, BUCKET_NAME, final_csv.name, SERVICE_ACCOUNT_FILE)

    final_frames.append(final_df)

# All routes in one file, ordered by route then date (a single file is convenient
# for BigQuery ingestion).
combined_sort_keys = ["departure_rail_station_crs", "destination_rail_station_crs", "date"]
combined_final = pd.concat(final_frames, ignore_index=True)
combined_final = combined_final.sort_values(combined_sort_keys).reset_index(drop=True)

combined_csv = DATA_DIR / f"final_ALL_ROUTES_{START_DATE}_to_{END_DATE}.csv"
combined_final.to_csv(combined_csv, index=False)
print(f"[INFO] Saved combined final -> {combined_csv}")

upload_file_to_gcs(combined_csv, BUCKET_NAME, combined_csv.name, SERVICE_ACCOUNT_FILE)


[INFO] Loaded rainfall cache -> data\rainfall_00534_bramham_2022-05-01_to_2022-05-30.csv
[INFO] Rail cache LDS->KGX: cached=30 missing=0
[INFO] Saved final -> data\final_LDS_KGX_00534_bramham_2022-05-01_to_2022-05-30.csv
[INFO] Uploaded -> gs://de-candidate-task-results-sp/final_LDS_KGX_00534_bramham_2022-05-01_to_2022-05-30.csv
[INFO] Loaded rainfall cache -> data\rainfall_00516_bradford_2022-05-01_to_2022-05-30.csv
[INFO] Rail cache LDS->BDQ: cached=30 missing=0
[INFO] Saved final -> data\final_LDS_BDQ_00516_bradford_2022-05-01_to_2022-05-30.csv
[INFO] Uploaded -> gs://de-candidate-task-results-sp/final_LDS_BDQ_00516_bradford_2022-05-01_to_2022-05-30.csv
[INFO] Saved combined final -> data\final_ALL_ROUTES_2022-05-01_to_2022-05-30.csv
[INFO] Uploaded -> gs://de-candidate-task-results-sp/final_ALL_ROUTES_2022-05-01_to_2022-05-30.csv


In [20]:
# Coverage check: which dates of the configured window have no rail row?
# Checked per route against the combined output. Reading `rail_df` here would
# only cover the last route processed by the loop above, hiding gaps in the others.
all_dates = set(daterange(START_DATE, END_DATE))

missing_by_route = {}
for route in ROUTES:
    on_route = combined_final[
        (combined_final["departure_rail_station_crs"] == route["from_loc"])
        & (combined_final["destination_rail_station_crs"] == route["to_loc"])
    ]
    have_dates = set(on_route["date"].astype(str))
    route_label = f"{route['from_loc']}->{route['to_loc']}"
    missing_by_route[route_label] = sorted(all_dates - have_dates)

    print(f"[INFO] Still missing rail dates {route_label}:", len(missing_by_route[route_label]))
    print(missing_by_route[route_label][:20])

# Every date that is missing for at least one route (sorted list of YYYY-MM-DD strings).
still_missing = sorted(set().union(*missing_by_route.values()))


[INFO] Still missing rail dates: 0
[]


In [21]:
from google.cloud import bigquery
from google.oauth2 import service_account

# SERVICE_ACCOUNT_FILE, BUCKET_NAME, START_DATE and END_DATE come from the
# configuration cell; they are intentionally not redefined here so the load
# always targets the file the pipeline above just produced.
creds = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE)

PROJECT_ID = "searchlab-bq-training"
DATASET_ID = "de_candidate_task_seanparrott"

# Same object name the combined CSV was uploaded under (derived from the date
# window rather than hard-coded, so changing the window cannot load a stale file).
GCS_URI = f"gs://{BUCKET_NAME}/final_ALL_ROUTES_{START_DATE}_to_{END_DATE}.csv"

TARGET_TABLE  = f"{PROJECT_ID}.{DATASET_ID}.daily_route_metrics"
STAGING_TABLE = f"{PROJECT_ID}.{DATASET_ID}.daily_route_metrics_staging"

client = bigquery.Client(credentials=creds, project=PROJECT_ID)

# Explicit schema (no autodetect) so column types are identical on every load.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("departure_rail_station_crs", "STRING"),
        bigquery.SchemaField("destination_rail_station_crs", "STRING"),
        bigquery.SchemaField("weather_station", "STRING"),
        bigquery.SchemaField("rainfall_nightbefore_mm", "FLOAT"),
        bigquery.SchemaField("service_count_total", "INTEGER"),
        bigquery.SchemaField("service_count_ontime", "INTEGER"),
    ],
    skip_leading_rows=1,  # the CSV has a header row
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # staging is always replaced
    # Partition the staging table by day on the `date` column.
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="date",
    ),
)

job = client.load_table_from_uri(GCS_URI, STAGING_TABLE, job_config=job_config)
job.result()  # block until the load job finishes (raises if it failed)

print("[INFO] Loaded staging:", STAGING_TABLE)
print("[INFO] Staging rows:", client.get_table(STAGING_TABLE).num_rows)


[INFO] Loaded staging: searchlab-bq-training.de_candidate_task_seanparrott.daily_route_metrics_staging
[INFO] Staging rows: 60


In [22]:
from google.api_core.exceptions import NotFound

# 1) Ensure target table exists (create an empty table with the staging schema,
#    partitioned by day on `date`, if it doesn't)
try:
    client.get_table(TARGET_TABLE)
    print("[INFO] Target table exists:", TARGET_TABLE)
except NotFound:
    schema = client.get_table(STAGING_TABLE).schema
    table = bigquery.Table(TARGET_TABLE, schema=schema)
    table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="date")
    client.create_table(table)
    print("[INFO] Created target table:", TARGET_TABLE)

# 2) MERGE staging -> target (upsert)
#    A row is identified by date + route. weather_station is an attribute of the
#    row, not part of its key: keeping it in the ON clause means a NULL station
#    never matches (NULL = NULL is not true in SQL), so such rows would be
#    inserted again on every run. It is updated alongside the measures instead.
#    NOTE: this assumes staging holds at most one row per date + route.
merge_sql = f"""
MERGE `{TARGET_TABLE}` T
USING `{STAGING_TABLE}` S
ON  T.date = S.date
AND T.departure_rail_station_crs = S.departure_rail_station_crs
AND T.destination_rail_station_crs = S.destination_rail_station_crs

WHEN MATCHED THEN UPDATE SET
  weather_station = S.weather_station,
  rainfall_nightbefore_mm = S.rainfall_nightbefore_mm,
  service_count_total = S.service_count_total,
  service_count_ontime = S.service_count_ontime

WHEN NOT MATCHED THEN INSERT (
  date,
  departure_rail_station_crs,
  destination_rail_station_crs,
  weather_station,
  rainfall_nightbefore_mm,
  service_count_total,
  service_count_ontime
) VALUES (
  S.date,
  S.departure_rail_station_crs,
  S.destination_rail_station_crs,
  S.weather_station,
  S.rainfall_nightbefore_mm,
  S.service_count_total,
  S.service_count_ontime
);
"""

# Row counts before/after plus the DML-affected count give a quick sanity check
# that a rerun updates rows in place instead of duplicating them.
before_count = client.get_table(TARGET_TABLE).num_rows

merge_job = client.query(merge_sql)
merge_job.result()  # wait for the MERGE to complete

after_count = client.get_table(TARGET_TABLE).num_rows
affected = merge_job.num_dml_affected_rows

print("------ BigQuery Upsert Summary ------")
print("Rows before merge:", before_count)
print("Rows inserted/updated:", affected)
print("Rows after merge:", after_count)
print("--------------------------------------")



[INFO] Target table exists: searchlab-bq-training.de_candidate_task_seanparrott.daily_route_metrics
------ BigQuery Upsert Summary ------
Rows before merge: 60
Rows inserted/updated: 60
Rows after merge: 60
--------------------------------------


In [23]:
# Fully-qualified name of the read-only view exposed on top of the target table.
VIEW_REF = f"{PROJECT_ID}.{DATASET_ID}.v_daily_route_metrics"

# The view lists its columns explicitly (same order as the target table) and is
# (re)created on every run, so rerunning this cell is safe.
view_sql = f"""
CREATE OR REPLACE VIEW `{VIEW_REF}` AS
SELECT
  date,
  departure_rail_station_crs,
  destination_rail_station_crs,
  weather_station,
  rainfall_nightbefore_mm,
  service_count_total,
  service_count_ontime
FROM `{TARGET_TABLE}`;
"""

view_job = client.query(view_sql)
view_job.result()  # wait for the DDL statement to finish
print("[INFO] View ready:", VIEW_REF)


[INFO] View ready: searchlab-bq-training.de_candidate_task_seanparrott.v_daily_route_metrics


In [24]:
# Quick look at the view: fetch the first rows ordered by date and route, then
# display the head of the resulting frame as the cell output.
query = f"""
SELECT *
FROM `{VIEW_REF}`
ORDER BY date, departure_rail_station_crs, destination_rail_station_crs
LIMIT 20
"""
preview_job = client.query(query)
preview_job.to_dataframe().head()




Unnamed: 0,date,departure_rail_station_crs,destination_rail_station_crs,weather_station,rainfall_nightbefore_mm,service_count_total,service_count_ontime
0,2022-05-01,LDS,BDQ,bradford,2.2,0,0
1,2022-05-01,LDS,KGX,bramham,0.0,0,0
2,2022-05-02,LDS,BDQ,bradford,0.0,6,6
3,2022-05-02,LDS,KGX,bramham,0.0,9,7
4,2022-05-03,LDS,BDQ,bradford,1.4,6,5
