<a href="https://colab.research.google.com/github/hawa1983/DATA-602/blob/main/Service_Alert_Feeds.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
!pip install gtfs-realtime-bindings


Collecting gtfs-realtime-bindings
  Downloading gtfs-realtime-bindings-1.0.0.tar.gz (6.2 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: gtfs-realtime-bindings
  Building wheel for gtfs-realtime-bindings (setup.py) ... [?25l[?25hdone
  Created wheel for gtfs-realtime-bindings: filename=gtfs_realtime_bindings-1.0.0-py3-none-any.whl size=5987 sha256=8fd51ced4c81af7ef875058552b5a2fa5acf8210d85c47344fad783e96a1ece1
  Stored in directory: /root/.cache/pip/wheels/b6/43/38/17a10a2cdd30cb86acceb42e24e7d2d6bb98b2c59ff8983e20
Successfully built gtfs-realtime-bindings
Installing collected packages: gtfs-realtime-bindings
Successfully installed gtfs-realtime-bindings-1.0.0


### **Code Overview: MTA GTFS-Realtime Subway Alerts Parser**

This script connects to the **Metropolitan Transportation Authority (MTA)**’s **GTFS-Realtime “subway-alerts” feed**, retrieves live service alerts in **Protocol Buffers (protobuf)** format, and converts them into a structured, analysis-ready **pandas DataFrame**.

GTFS-Realtime feeds provide continuously updated information on **service changes, delays, and planned work** across subway lines. The feed encodes alerts in a compact binary format defined by Google’s GTFS-Realtime specification (`gtfs_realtime.proto`). This script handles the complete workflow:

1. **Fetches the raw protobuf feed** from the MTA API using multiple candidate URLs (to handle encoding inconsistencies such as `%2F`).
2. **Parses the binary data** into human-readable fields such as route ID, stop ID, trip ID, active time period, cause, effect, and severity.
3. **Flags active alerts** based on current timestamps, allowing integration with delay or headway datasets.
4. **Outputs a structured `pandas` DataFrame**, which can be merged with historical performance data to study relationships between alerts and operational delays.

This code forms the **data ingestion and transformation foundation** for modeling and visualization tasks in the MTA capstone project — such as predicting delay classes, identifying systemic service issues, or evaluating the operational impact of different alert types.

Excellent — here’s a well-formatted **Data Dictionary** section that goes right after your introductory text and before the code block.
It’s written in clear academic style, ready for inclusion in your Capstone report or notebook.

---

### 📊 **Data Dictionary: Parsed GTFS-Realtime Subway Alerts**

| **Column Name**  | **Data Type**    | **Description / Meaning**                                                                                                                        | **Example Value**                                                 |
| ---------------- | ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------- |
| `alert_id`       | *string*         | Unique identifier assigned by MTA for each alert record. Used to track or deduplicate alerts across feeds.                                       | `"1234567890"`                                                    |
| `is_active_now`  | *boolean*        | Indicates whether the alert is currently active at the time of data retrieval (`True` if now ∈ [start_ts, end_ts]).                              | `True`                                                            |
| `start_ts`       | *datetime (UTC)* | Start time when the alert became effective, converted from UNIX timestamp.                                                                       | `2025-10-15 10:00:00+00:00`                                       |
| `end_ts`         | *datetime (UTC)* | End time when the alert expires or is scheduled to end.                                                                                          | `2025-10-15 14:00:00+00:00`                                       |
| `route_id`       | *string*         | Identifier for the affected subway line(s). Corresponds to route IDs in the GTFS static feed (e.g., “A”, “6”, “Q”).                              | `"D"`                                                             |
| `stop_id`        | *string / None*  | ID of the affected stop or station, if specified in the alert’s informed entity.                                                                 | `"R14"`                                                           |
| `trip_id`        | *string / None*  | ID of the affected train trip, if provided. Useful for mapping specific runs or schedules.                                                       | `"067800_A..S05R"`                                                |
| `header`         | *string*         | Short human-readable summary of the alert, intended for passenger display.                                                                       | `"Delays on the D line"`                                          |
| `description`    | *string*         | Longer, detailed description of the service issue or advisory.                                                                                   | `"Signal problems at 125 St — expect delays in both directions."` |
| `cause`          | *categorical*    | Programmatic cause of the alert (enumeration from GTFS spec). Examples include `SIGNAL_PROBLEM`, `TRACK_MAINTENANCE`, `WEATHER`, etc.            | `"SIGNAL_PROBLEM"`                                                |
| `effect`         | *categorical*    | Operational effect of the alert (enumeration from GTFS spec). Typical values include `NO_SERVICE`, `REDUCED_SERVICE`, `SIGNIFICANT_DELAYS`, etc. | `"SIGNIFICANT_DELAYS"`                                            |
| `severity_level` | *categorical*    | Alert severity indicator, mapped to one of the GTFS-Realtime `SeverityLevel` enum values: `INFO`, `WARNING`, or `SEVERE`.                        | `"SEVERE"`                                                        |

**Usage Note:**
This structured DataFrame can be:

* **Merged** with train movement or headway datasets using `route_id` and temporal windows.
* **Aggregated** to compute alert frequency by route or severity level.
* **Labeled** to support supervised learning models predicting `delay_class` or `EWT` outcomes.


In [15]:
# ============================================================
# MTA Subway Alerts Feed Parser
# Author: Fomba (Sahr) Kassoh
# Purpose: Fetch and parse GTFS-Realtime service alerts from the MTA API
# Output: Structured pandas DataFrame of active and past alerts
# ============================================================

# --- Import necessary libraries ---
import requests                      # For making HTTP requests to MTA API
import pandas as pd                  # For storing and manipulating tabular data
from datetime import datetime, timezone  # For handling UTC timestamps
from google.transit import gtfs_realtime_pb2  # For parsing GTFS-Realtime protobuf data

# ============================================================
# Define possible GTFS-Realtime alert feed URLs.
# MTA sometimes encodes the "/" character differently (%2F), so
# we include multiple variants to ensure at least one works.
# ============================================================
CANDIDATE_URLS = [
    "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys/subway-alerts",
    "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fsubway-alerts",
    "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys/all-alerts",
    "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fall-alerts",
]

# ============================================================
# Define HTTP headers for the request.
# - The "User-Agent" header tricks the server into thinking the
#   request is coming from a web browser (avoids 403 errors).
# - The "Accept" header tells the server we expect protobuf data.
# ============================================================
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36"
    ),
    "Accept": "application/x-protobuf",
}

# ============================================================
# Function: fetch_feed()
# Purpose:  Attempt to download the GTFS-Realtime feed from the
#           list of candidate URLs, handling errors gracefully.
# Returns:  Binary protobuf content (feed bytes)
# ============================================================
def fetch_feed():
    last_err = None
    for url in CANDIDATE_URLS:
        try:
            # Send GET request with custom headers
            r = requests.get(url, headers=HEADERS, timeout=30)

            # Raise HTTPError if the response code is not 200 (OK)
            r.raise_for_status()

            # Return the raw content (protobuf binary)
            return r.content
        except requests.HTTPError as e:
            # Save the last error and try the next URL
            last_err = e
            continue

    # If all attempts failed, raise the last error encountered
    raise last_err

# ============================================================
# Function: parse_alerts(pb_bytes)
# Purpose:  Parse the binary protobuf feed into a structured
#           pandas DataFrame with alert details.
# Input:    pb_bytes - raw protobuf bytes returned by fetch_feed()
# Output:   pandas DataFrame of alert records
# ============================================================
def parse_alerts(pb_bytes):
    # Create a GTFS FeedMessage object and parse binary data
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(pb_bytes)

    # --- Helper function: convert UNIX timestamp → datetime ---
    def _ts(x):
        return None if not x else datetime.fromtimestamp(int(x), tz=timezone.utc)

    # --- Helper function: extract translated text (string) ---
    def _txt(ts):
        return ts.translation[0].text if ts and ts.translation else None

    # Prepare to store parsed alert rows
    rows = []
    now = datetime.now(tz=timezone.utc).timestamp()  # current UTC time in seconds

    # ============================================================
    # Loop through each entity in the GTFS feed.
    # Each entity can be an alert, trip update, or vehicle position.
    # We're only interested in alert entities.
    # ============================================================
    for ent in feed.entity:
        if not ent.HasField("alert"):
            continue  # skip non-alert entities

        a = ent.alert  # shorthand for the alert object

        # Extract alert metadata
        periods = a.active_period or [gtfs_realtime_pb2.TimeRange()]  # start/end times
        informed = a.informed_entity or [gtfs_realtime_pb2.EntitySelector()]  # affected routes/trips/stops

        # Map numeric enum values to human-readable names
        cause  = gtfs_realtime_pb2.Alert.Cause.Name(a.cause) if a.HasField("cause") else None
        effect = gtfs_realtime_pb2.Alert.Effect.Name(a.effect) if a.HasField("effect") else None
        sev    = gtfs_realtime_pb2.Alert.SeverityLevel.Name(a.severity_level) if a.HasField("severity_level") else None

        # ============================================================
        # Iterate through all active time periods and affected entities
        # to generate one row per (alert × entity × time period)
        # ============================================================
        for ap in periods:
            # Determine if the alert is currently active
            active = (ap.start or 0) <= now <= (ap.end or 2**31 - 1)

            for inf in informed:
                # Build a row dictionary for each affected entity
                rows.append({
                    "alert_id": ent.id,  # unique alert identifier
                    "is_active_now": bool(active),  # whether it's active now
                    "start_ts": _ts(ap.start),      # start time (UTC datetime)
                    "end_ts": _ts(ap.end),          # end time (UTC datetime)
                    "route_id": getattr(inf, "route_id", None) or None,  # route affected (e.g., 'A', 'D', '6')
                    "stop_id": getattr(inf, "stop_id", None) or None,    # specific stop (if applicable)
                    # extract trip_id safely (nested object)
                    "trip_id": getattr(inf, "trip", None).trip_id if hasattr(inf, "trip") and inf.trip.trip_id else None,
                    "header": _txt(a.header_text),            # short alert summary
                    "description": _txt(a.description_text),  # detailed description
                    "cause": cause,                           # cause (e.g., SIGNAL_PROBLEM)
                    "effect": effect,                         # effect (e.g., SIGNIFICANT_DELAYS)
                    "severity_level": sev,                    # severity level (INFO, WARNING, SEVERE)
                })

    # Convert the collected list of dictionaries into a pandas DataFrame
    # and drop duplicate rows that may occur due to multiple selectors.
    return pd.DataFrame(rows).drop_duplicates()

# ============================================================
# MAIN EXECUTION
# ============================================================
if __name__ == "__main__":
    # Step 1: Fetch live GTFS-Realtime alerts feed from MTA API
    pb = fetch_feed()

    # Step 2: Parse the protobuf feed into a structured DataFrame
    alerts_df = parse_alerts(pb)

    # Step 3: Display the first few rows as a sanity check
    print(alerts_df.head())

# ============================================================
# SAMPLE OUTPUT (structure)
# ============================================================
#   alert_id  is_active_now           start_ts              end_ts route_id  stop_id  trip_id  header                description              cause          effect             severity_level
# 0 12345678  True            2025-10-15 10:00:00 2025-10-15 14:00:00   D       None     None     "Delays on D Line"  "Signal problems at 125 St"  SIGNAL_PROBLEM SIGNIFICANT_DELAYS SEVERE
# ============================================================


           alert_id  is_active_now                  start_ts end_ts route_id  \
0  lmm:alert:474838           True 2025-10-15 16:36:05+00:00    NaT        A   
1  lmm:alert:474838           True 2025-10-15 16:36:05+00:00    NaT        C   
2  lmm:alert:474838           True 2025-10-15 16:36:05+00:00    NaT        E   
3  lmm:alert:474838           True 2025-10-15 16:36:05+00:00    NaT     None   
4  lmm:alert:474835           True 2025-10-15 15:32:44+00:00    NaT        D   

  stop_id trip_id                                             header  \
0    None    None  Uptown [A][C][E] trains are running with delay...   
1    None    None  Uptown [A][C][E] trains are running with delay...   
2    None    None  Uptown [A][C][E] trains are running with delay...   
3     A27    None  Uptown [A][C][E] trains are running with delay...   
4    None    None  Uptown [D] trains are running with delays whil...   

                                         description cause effect  \
0  Uptown [A] tra

# What this code does (purpose → method → output)

## Purpose

Turn raw **alerts** (from your GTFS-RT parser) into **time-aligned features** you can join to your **headway/delay fact table**. The result is one row per `(timestamp_bin, route_id, stop_id)` with:

* a binary `incident_flag`
* one-hot flags for alert **severity** and **effect**
* the most common `alert_type` and `alert_scope` in that bin

These become model inputs for classification/regression (e.g., `delay_class`, `EWT`).

---

## Methodology (step-by-step)

1. **Inputs (assumed schemas)**

   * `alerts_df`: columns like `alert_id, start_ts, end_ts, route_id, stop_id, header, description, cause, effect, severity_level, is_active_now`
   * `fact_df`: your target index with `timestamp_bin` (UTC), `route_id`, `stop_id`

2. **Scope detection**

   * `_alert_scope(row)`:

     * if `stop_id` present → `"stop"` (very specific)
     * else if `route_id` present → `"route"` (line-level)
     * else → `"system"` (agency-wide)

3. **Normalization & tokens**

   * Convert `start_ts`/`end_ts` to timezone-aware UTC datetimes.
   * Uppercase categorical tokens:

     * `severity_token` from `severity_level`
     * `effect_token` from `effect`

4. **Light NLP for `alert_type`**

   * Build a coarse `alert_type` by scanning `header/description`:

     * contains “planned work”/“maintenance” → `PLANNED_WORK`
     * contains “delay”/“running local” → `DELAYS`
     * contains “no service”/“suspended” → `NO_SERVICE`
     * otherwise → `GENERAL`
   * (You can swap in a richer classifier later.)

5. **Temporal expansion into bins**

   * For each alert, generate a **date_range** of bin timestamps between `start_ts` and `end_ts` using `bin_freq` (default `"10min"`).
   * If `end_ts` is missing (open-ended alert), cap it at **now + `horizon_hours`** (default 6h) so bins don’t run forever.

6. **Emit rows by scope**

   * For **route/system** scope: emit `(timestamp_bin, route_id, stop_id=None, …)`
   * For **stop** scope: emit `(timestamp_bin, route_id, stop_id, …)`
   * Keep `alert_id`, `alert_type`, `severity_token`, `effect_token`, `alert_scope`.

7. **Aggregate per (bin, route, stop)**

   * Group by `timestamp_bin, route_id, stop_id` and compute:

     * `incident_flag`: `nunique(alert_id)` → then converted to binary (`>0`)
     * `alert_type`: **mode** within the bin
     * `alert_scope`: **mode** within the bin
     * `sev_set`: set of severities present
     * `eff_set`: set of effects present

8. **One-hot encode**

   * For severity: `INFO, WARNING, SEVERE` → `alert_severity_*` (1 if present in `sev_set`)
   * For effect: `NO_SERVICE, REDUCED_SERVICE, SIGNIFICANT_DELAYS, DETOUR` → `alert_effect_*`

9. **Join to your fact table**

   * Left-merge the aggregated alert features onto `fact_df` by `(timestamp_bin, route_id, stop_id)`.
   * Fill missing one-hots and `incident_flag` with 0 (int).

10. **Edge case handling**

    * If `alerts_df` is empty, return `fact_df` with all alert features set to 0 and `alert_scope/alert_type` as `NA`.

---

## Output (what you get back)

A DataFrame aligned to your **fact table index** with these added columns:

* `incident_flag` (0/1) — at least one alert intersects the bin/scope
* `alert_severity_INFO`, `alert_severity_WARNING`, `alert_severity_SEVERE` (0/1)
* `alert_effect_NO_SERVICE`, `alert_effect_REDUCED_SERVICE`, `alert_effect_SIGNIFICANT_DELAYS`, `alert_effect_DETOUR` (0/1)
* `alert_scope` (mode of `stop`/`route`/`system` for that bin)
* `alert_type` (mode of `PLANNED_WORK` / `DELAYS` / `NO_SERVICE` / `GENERAL`)

### Key parameters you can tune

* `bin_freq="10min"`: temporal granularity of alert influence
* `horizon_hours=6`: cap for open-ended alerts without `end_ts`

### Why this is useful

* Produces **model-ready** binary/categorical features that explain **operational state** in each time bin.
* Respects **spatial scope** (stop vs route vs system) and **temporal overlap**.
* Easy to extend (e.g., more effects, richer NLP, severity weighting, windowed lags).


In [16]:
import pandas as pd
import numpy as np

# --- inputs ---
# alerts_df columns: alert_id, start_ts, end_ts, route_id, stop_id, header, description, cause, effect, severity_level, is_active_now
# fact_df columns:   timestamp_bin (UTC, dt64[ns, UTC]), route_id, stop_id  (your headway/delay table index)

def _alert_scope(row):
    if pd.notna(row.stop_id):  return "stop"
    if pd.notna(row.route_id): return "route"
    return "system"  # (agency-wide/system-wide)

def build_alert_features(alerts_df: pd.DataFrame,
                         fact_df: pd.DataFrame,
                         bin_freq="10min",
                         horizon_hours=6) -> pd.DataFrame:
    df = alerts_df.copy()
    if df.empty:
        return fact_df.assign(
            incident_flag=0,
            alert_severity_INFO=0, alert_severity_WARNING=0, alert_severity_SEVERE=0,
            alert_effect_NO_SERVICE=0, alert_effect_REDUCED_SERVICE=0,
            alert_effect_SIGNIFICANT_DELAYS=0, alert_effect_DETOUR=0,
            alert_scope=pd.NA, alert_type=pd.NA
        )

    # normalize times & scope
    df["start_ts"] = pd.to_datetime(df["start_ts"], utc=True)
    df["end_ts"]   = pd.to_datetime(df["end_ts"],   utc=True)
    df["alert_scope"] = df.apply(_alert_scope, axis=1)

    # map severity/effect to uppercase tokens
    df["severity_token"] = df["severity_level"].str.upper().fillna("UNKNOWN")
    df["effect_token"]   = df["effect"].str.upper().fillna("UNKNOWN")

    # derive a simple alert_type from header/description (customize as you like)
    def infer_type(h, d):
        text = f"{h or ''} {d or ''}".lower()
        if "planned work" in text or "maintenance" in text: return "PLANNED_WORK"
        if "delay" in text or "running local" in text:      return "DELAYS"
        if "no service" in text or "suspended" in text:     return "NO_SERVICE"
        return "GENERAL"
    df["alert_type"] = [infer_type(h, d) for h, d in zip(df["header"], df["description"])]

    # expand alerts into 10-min bins between start and end (cap very long open-ended alerts)
    end_cap = pd.Timestamp.utcnow().tz_localize("UTC") + pd.Timedelta(hours=horizon_hours)
    df["end_ts_filled"] = df["end_ts"].fillna(end_cap)

    records = []
    for idx, r in df.iterrows():
        try:
            bins = pd.date_range(r.start_ts.floor(bin_freq),
                                 r.end_ts_filled.ceil(bin_freq),
                                 freq=bin_freq, tz="UTC")
        except Exception:
            continue
        scope = r.alert_scope
        # emit route-level rows
        if scope in ("route", "system"):
            for ts in bins:
                records.append((ts, r.route_id if pd.notna(r.route_id) else None, None,
                                r.alert_id, r.alert_type, r.severity_token, r.effect_token, scope))
        # emit stop-level rows
        if scope == "stop":
            for ts in bins:
                records.append((ts, r.route_id, r.stop_id,
                                r.alert_id, r.alert_type, r.severity_token, r.effect_token, scope))

    long = pd.DataFrame.from_records(
        records,
        columns=["timestamp_bin","route_id","stop_id","alert_id","alert_type","severity","effect","alert_scope"]
    ).drop_duplicates()

    # Aggregate to one row per (bin, route, stop)
    agg = (long
           .groupby(["timestamp_bin","route_id","stop_id"], dropna=False)
           .agg(
               incident_flag = ("alert_id", "nunique"),
               alert_type    = ("alert_type", lambda s: s.mode().iat[0] if len(s.mode()) else None),
               alert_scope   = ("alert_scope", lambda s: s.mode().iat[0] if len(s.mode()) else None),
               sev_set       = ("severity", lambda s: set(s.dropna())),
               eff_set       = ("effect",   lambda s: set(s.dropna())),
           ).reset_index())

    # one-hots
    for sev in ["INFO","WARNING","SEVERE"]:
        agg[f"alert_severity_{sev}"] = agg["sev_set"].apply(lambda S: int(sev in S))
    for eff in ["NO_SERVICE","REDUCED_SERVICE","SIGNIFICANT_DELAYS","DETOUR"]:
        agg[f"alert_effect_{eff}"] = agg["eff_set"].apply(lambda S: int(eff in S))

    agg["incident_flag"] = (agg["incident_flag"] > 0).astype(int)
    agg = agg.drop(columns=["sev_set","eff_set"])

    # join to fact table
    features = fact_df.merge(
        agg,
        on=["timestamp_bin","route_id","stop_id"],
        how="left",
        suffixes=("","")
    )

    # fill NA where appropriate
    for c in [c for c in features.columns if c.startswith("alert_severity_") or c.startswith("alert_effect_")]:
        features[c] = features[c].fillna(0).astype(int)
    features["incident_flag"] = features["incident_flag"].fillna(0).astype(int)

    return features


Here’s the straight-shot read of what that script does.

# What this code does (purpose → method → output)

## Purpose

Build a **headway fact table** for NYC Subway from the **NYCT GTFS-Realtime (nyct/gtfs)** feed, suitable for analysis and ML. It:

* polls the realtime feed for a short window,
* infers **arrival events** from **VehiclePositions**,
* computes **headways** and rolling stats per `(route_id, stop_id, direction_id)`,
* adds simple time features,
* saves a **Parquet** file you can later enrich (alerts, turnstiles, weather).

---

## How it works (step-by-step)

1. **Config**

   * `DURATION_MIN` (default 20): how long to poll.
   * `POLL_EVERY_SEC` (15): polling cadence.
   * `BIN_FREQ` ("10min"): binning granularity for aggregates.
   * `OUTPUT_PATH`: Parquet output path.
   * Optional `MTA_API_KEY` via env var; sets `x-api-key` header if present.

2. **Fetch GTFS-RT bytes**

   * Tries the two canonical `nyct/gtfs` URLs (with/without `%2F`).
   * Uses browsery headers; raises on HTTP errors.

3. **Parse VehiclePositions**

   * For each entity with `vehicle`:

     * Extracts `route_id`, `stop_id`, `direction_id` (0/1), `vehicle_id`, `current_status`, and a UTC **timestamp** (vehicle ts; falls back to feed header ts; else `now`).
     * Keeps only rows where all of `route_id`, `stop_id`, `direction_id`, `vehicle_id` exist.

4. **Detect “arrival events”**

   * Treats an arrival when `current_status == STOPPED_AT` (GTFS-RT enum value `1`).
   * **De-dupes** with a cooldown: `(vehicle_id, stop_id)` must be at least `ARRIVAL_COOLDOWN_SEC` (60s) apart to register another arrival (prevents repeated detections during dwell).
   * Accumulates arrivals during the polling window, sleeping `POLL_EVERY_SEC` between polls.
   * Returns a DataFrame of arrivals: `arrival_ts, route_id, stop_id, direction_id, vehicle_id`.

5. **Compute headways & features**

   * Sort by `(route_id, direction_id, stop_id, arrival_ts)`.
   * `Hr_sec` = time since previous arrival at that stop & direction; `Hr_min` in minutes.
   * `timestamp_bin` = `arrival_ts.floor(BIN_FREQ)` for aggregation.
   * Rolling stats per `(route, dir, stop)`:

     * `Hr_roll_mean`, `Hr_roll_std` over last 6 observations (≈ ~1 hr if headways ~10 min).
   * Calendar features: `hour`, `weekday`.
   * **Crude `delay_class`** label by comparing `Hr_min` to `Hr_roll_mean`:

     * `on_time` (≤1.25×), `minor_delay` (≤1.75×), `major_delay` (>1.75×), else `unknown`.
     * (Meant as a placeholder until you join schedule-based labels from GTFS-Static.)

6. **Aggregate to fact table**

   * Group by `(timestamp_bin, route_id, stop_id, direction_id)` and take the **last** values in the bin for:

     * `Hr_obs` (last headway in minutes), `Hr_roll_mean`, `Hr_roll_std`, `last_arrival_ts`, `delay_class`.
   * Adds placeholders you’ll join later:

     * `entries_rate` (turnstiles), `precip_mm` (NOAA) initialized to 0.0.

7. **Persist**

   * Writes `fact_transit_performance.parquet` if non-empty.

---

## Output

* **File:** `fact_transit_performance.parquet`
* **Grain:** one row per `(timestamp_bin, route_id, stop_id, direction_id)`
* **Key columns:**

  * `Hr_obs` — most recent headway (min) observed in the bin
  * `Hr_roll_mean`, `Hr_roll_std` — recent rolling stats (min)
  * `last_arrival_ts` — timestamp of the latest arrival in the bin
  * `delay_class` — coarse categorical label
  * `hour`, `weekday` — time features
  * `entries_rate`, `precip_mm` — placeholders for later joins

---

## Key assumptions & limitations (good to note in your capstone)

* **Arrival proxy:** Uses `STOPPED_AT` vehicle status as an arrival event; doesn’t use TripUpdates’ stop-times. This is robust in practice but can miss edge cases.
* **Cooldown heuristic:** 60s filter to avoid duplicate arrivals during dwell; tune for different dwell profiles.
* **Short polling window:** You need longer runs or repeated batches to build a rich dataset; `DURATION_MIN` is just a starter.
* **Delay labeling:** Relative to recent observed headways, **not** schedule adherence. Replace with schedule-based labels by joining GTFS-Static `stop_times.txt` for production.
* **Coverage:** Only routes/stops where VehiclePositions are published with `stop_id` populated will generate arrivals.

---

## Easy extensions (drop-in ideas)

* **Join alerts:** merge the alert features you built (`incident_flag`, severity/effect one-hots) by `(timestamp_bin, route_id, stop_id)`.
* **Weather & demand:** join NOAA precipitation/temperature and turnstile entry rates to model causal drivers.
* **Lag features:** add lagged headways (t-1, t-2), expanding means/std over longer windows.
* **Schedule deviation:** compute `actual_headway – scheduled_headway` using GTFS-Static to get a real “delay” metric.


In [14]:
"""
Build a headway fact table from MTA NYCT GTFS-RT (nyct/gtfs).

Installs (once):
  pip install gtfs-realtime-bindings pandas numpy pyarrow requests

Outputs:
  fact_transit_performance.parquet  (core headway fact table)

Notes:
  - Polls the realtime feed every 15s for DURATION_MIN minutes.
  - Derives arrival events from VehiclePositions when current_status == STOPPED_AT.
  - Computes headways per (route_id, stop_id, direction_id).
  - Adds time features (hour, weekday). You can later join turnstiles/weather.
"""

import os
import time
import math
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from collections import defaultdict

from google.transit import gtfs_realtime_pb2

# ===============================
# CONFIG
# ===============================
DURATION_MIN      = 20           # how long to poll (minutes). Increase for richer data.
POLL_EVERY_SEC    = 15           # poll cadence
BIN_FREQ          = "10min"      # time bin for aggregates
OUTPUT_PATH       = "fact_transit_performance.parquet"

# NYCT GTFS-RT feed (TripUpdates + VehiclePositions)
GTFS_URLS = [
    "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct/gtfs",
    "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs",
]

HEADERS_BASE = {
    "User-Agent": "Mozilla/5.0",
    "Accept": "application/x-protobuf",
}
API_KEY = os.getenv("MTA_API_KEY")  # optional; some environments still require it

# ===============================
# Helpers
# ===============================
def fetch_gtfs_bytes():
    headers = dict(HEADERS_BASE)
    if API_KEY:
        headers["x-api-key"] = API_KEY
    last_err = None
    for url in GTFS_URLS:
        try:
            r = requests.get(url, headers=headers, timeout=30)
            r.raise_for_status()
            return r.content
        except requests.HTTPError as e:
            last_err = e
            continue
    raise last_err

def parse_vehicle_positions(pb_bytes):
    """
    Returns list of dict rows with:
      ts, route_id, stop_id, direction_id, vehicle_id, current_status
    """
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(pb_bytes)

    rows = []
    for ent in feed.entity:
        if not ent.HasField("vehicle"):
            continue
        v = ent.vehicle

        # Some producers put route_id in vehicle.trip.route_id, others in vehicle.vehicle.label
        route_id = None
        try:
            if v.trip and v.trip.route_id:
                route_id = v.trip.route_id
        except Exception:
            pass

        stop_id = None
        try:
            if v.stop_id:
                stop_id = v.stop_id
        except Exception:
            pass

        direction_id = None
        try:
            if v.trip and (v.trip.direction_id in (0,1)):
                direction_id = v.trip.direction_id
        except Exception:
            pass

        vehicle_id = None
        try:
            if v.vehicle and v.vehicle.id:
                vehicle_id = v.vehicle.id
        except Exception:
            pass

        # current_status: 0 = IN_TRANSIT_TO, 1 = STOPPED_AT, 2 = INCOMING_AT (per GTFS-RT enum)
        status = None
        try:
            status = v.current_status
        except Exception:
            pass

        # timestamp
        ts = None
        try:
            # vehicle timestamp is epoch seconds
            if v.timestamp:
                ts = datetime.fromtimestamp(int(v.timestamp), tz=timezone.utc)
        except Exception:
            pass
        # fallback to feed header time if missing
        if ts is None:
            try:
                ts = datetime.fromtimestamp(int(feed.header.timestamp), tz=timezone.utc)
            except Exception:
                ts = datetime.now(timezone.utc)

        if route_id and stop_id and (direction_id in (0,1)) and vehicle_id is not None:
            rows.append({
                "obs_ts": ts,
                "route_id": str(route_id),
                "stop_id": str(stop_id),
                "direction_id": int(direction_id),
                "vehicle_id": str(vehicle_id),
                "current_status": int(status) if status is not None else None,
            })
    return rows

# ===============================
# Arrival event extraction logic
# ===============================
"""
We treat an 'arrival event' when a vehicle is observed at
current_status == STOPPED_AT at a stop. We de-dupe by (vehicle_id, stop_id)
with a small cooldown so a long dwell doesn’t produce duplicates.
"""

STOPPED_AT = 1
ARRIVAL_COOLDOWN_SEC = 60  # minimum gap to register another arrival for same (vehicle, stop)

def accumulate_arrivals(duration_min=DURATION_MIN, poll_sec=POLL_EVERY_SEC):
    seen_last_arrival = {}  # (vehicle_id, stop_id) -> last_arrival_ts (epoch)
    arrivals = []           # list of dicts with arrival events

    t_end = time.time() + duration_min * 60.0
    iter_n = 0
    while time.time() < t_end:
        iter_n += 1
        try:
            pb = fetch_gtfs_bytes()
            vp_rows = parse_vehicle_positions(pb)
        except Exception as e:
            # keep going on transient errors
            time.sleep(poll_sec)
            continue

        now_epoch = time.time()
        for r in vp_rows:
            if r["current_status"] != STOPPED_AT:
                continue
            key = (r["vehicle_id"], r["stop_id"])

            last = seen_last_arrival.get(key)
            # register new arrival if cooldown elapsed or first sighting
            if (last is None) or ((now_epoch - last) > ARRIVAL_COOLDOWN_SEC):
                seen_last_arrival[key] = now_epoch
                arrivals.append({
                    "arrival_ts": r["obs_ts"],  # UTC-aware datetime
                    "route_id": r["route_id"],
                    "stop_id": r["stop_id"],
                    "direction_id": r["direction_id"],
                    "vehicle_id": r["vehicle_id"],
                })
        time.sleep(poll_sec)

    return pd.DataFrame(arrivals)

# ===============================
# Build headways & fact table
# ===============================
def build_headways(df_arrivals: pd.DataFrame) -> pd.DataFrame:
    if df_arrivals.empty:
        return pd.DataFrame(columns=[
            "timestamp_bin","route_id","stop_id","direction_id",
            "arrival_ts","Hr_sec","Hr_min","Hr_roll_mean","Hr_roll_std",
            "hour","weekday","delay_class"  # placeholder label if you want to add now
        ])

    df = df_arrivals.sort_values(["route_id","direction_id","stop_id","arrival_ts"]).copy()
    # compute headway = difference between consecutive arrivals at same (route, dir, stop)
    df["prev_arrival_ts"] = df.groupby(["route_id","direction_id","stop_id"])["arrival_ts"].shift(1)
    df["Hr_sec"] = (df["arrival_ts"] - df["prev_arrival_ts"]).dt.total_seconds()
    df = df.dropna(subset=["Hr_sec"])
    df["Hr_min"] = df["Hr_sec"] / 60.0

    # time bin
    df["timestamp_bin"] = df["arrival_ts"].dt.floor(BIN_FREQ)

    # rolling stats per stop/dir
    df["Hr_roll_mean"] = (
        df.groupby(["route_id","direction_id","stop_id"])["Hr_min"]
          .transform(lambda s: s.rolling(window=6, min_periods=2).mean())  # ~ last ~1 hr if 10-min-ish headways
    )
    df["Hr_roll_std"] = (
        df.groupby(["route_id","direction_id","stop_id"])["Hr_min"]
          .transform(lambda s: s.rolling(window=6, min_periods=2).std())
    )

    # simple calendar features
    df["hour"] = df["arrival_ts"].dt.hour
    df["weekday"] = df["arrival_ts"].dt.weekday  # Monday=0

    # OPTIONAL: create a quick categorical 'delay_class' based on headway inflation vs recent mean
    # (For true delay vs schedule you will later join GTFS-Static stop_times)
    def label_delay(hr, hr_mean):
        # crude thresholds; you will replace with schedule-based labels later
        if pd.isna(hr_mean):
            return "unknown"
        ratio = hr / (hr_mean if hr_mean > 0 else hr)
        if ratio <= 1.25:
            return "on_time"
        elif ratio <= 1.75:
            return "minor_delay"
        else:
            return "major_delay"
    df["delay_class"] = [label_delay(hr, m) for hr, m in zip(df["Hr_min"], df["Hr_roll_mean"])]

    # We keep the granular rows (one per arrival); for modeling we aggregate to one row per bin
    fact = (df.groupby(["timestamp_bin","route_id","stop_id","direction_id"])
              .agg(
                  Hr_obs          = ("Hr_min","last"),
                  Hr_roll_mean    = ("Hr_roll_mean","last"),
                  Hr_roll_std     = ("Hr_roll_std","last"),
                  last_arrival_ts = ("arrival_ts","last"),
                  delay_class     = ("delay_class","last")
              )
              .reset_index())

    # placeholders you can fill later when you join other sources
    fact["entries_rate"] = 0.0     # join turnstiles later
    fact["precip_mm"]    = 0.0     # join NOAA later

    return fact

# ===============================
# Orchestrate
# ===============================
def main():
    print(f"Polling nyct/gtfs for {DURATION_MIN} minutes…")
    arrivals = accumulate_arrivals(DURATION_MIN, POLL_EVERY_SEC)
    print(f"Collected arrival events: {len(arrivals):,}")

    fact = build_headways(arrivals)
    print(f"Headway fact rows: {len(fact):,}")

    # Save
    if not fact.empty:
        fact.to_parquet(OUTPUT_PATH, index=False)
        print(f"Wrote: {OUTPUT_PATH}")
    else:
        print("No data collected. Increase DURATION_MIN and try again.")

if __name__ == "__main__":
    main()


Polling nyct/gtfs for 20 minutes…


KeyboardInterrupt: 

In [12]:
# X = numerical + alert features
alert_cols = [
  "incident_flag","alert_severity_INFO","alert_severity_WARNING","alert_severity_SEVERE",
  "alert_effect_NO_SERVICE","alert_effect_REDUCED_SERVICE","alert_effect_SIGNIFICANT_DELAYS","alert_effect_DETOUR"
]
num_cols = ["Hr_roll_mean","Hr_roll_std","entries_rate","precip_mm","hour","weekday"]  # example
X = features[num_cols + alert_cols]
y = features["delay_class"]  # or Hr_next/EWT_pw_sec

from xgboost import XGBClassifier
clf = XGBClassifier(
    n_estimators=500, max_depth=8, learning_rate=0.05,
    subsample=0.9, colsample_bytree=0.8, eval_metric="mlogloss", random_state=42
)
clf.fit(X, y)


NameError: name 'features' is not defined

In [13]:
mask = features["incident_flag"] == 1
print("Alert windows F1:", f1_score(y[mask], clf.predict(X[mask]), average="macro"))
print("Normal windows F1:", f1_score(y[~mask], clf.predict(X[~mask]), average="macro"))


NameError: name 'features' is not defined

### **Merged Dataset: Headway Fact Table × Alert Features**

This process creates a unified, modeling-ready dataset that combines operational headway data with service alert indicators.
Below is an overview of what this step does and what the resulting dataset looks like.

---

#### **Purpose**

* Integrates train performance data (headways, delays, and timing features) with corresponding MTA service alert data.
* Produces a single dataset where each record represents a specific ten-minute time bin for a given route and stop.
* Enables modeling and analysis of how different alert types and severities influence train delays.

---

#### **Method**

* Both datasets’ timestamps are standardized to UTC to ensure alignment across time bins.
* A **left join** is performed on the keys:

  * **timestamp_bin** (the 10-minute interval)
  * **route_id** (the subway line)
  * **stop_id** (the affected station)
* Missing alert fields are replaced with default values:

  * Numeric flags → 0 (meaning no alert)
  * Text categories → “none” (meaning not applicable)
* A quick summary prints the total number of bins joined and how many contained active alerts.
* The merged dataset is saved as **fact_with_alerts.parquet** for later use in modeling.

---

#### **Resulting Columns**

* **timestamp_bin** – start time of the 10-minute interval (UTC)
* **route_id** – subway line identifier (e.g., “A”, “6”, “D”)
* **stop_id** – station identifier (e.g., “R14”)
* **Hr_obs** – most recent observed headway (minutes)
* **Hr_roll_mean**, **Hr_roll_std** – recent rolling mean and standard deviation of headway
* **delay_class** – categorical label: on_time, minor_delay, major_delay
* **incident_flag** – 1 if an alert was active during the bin; 0 otherwise
* **alert_severity_INFO**, **alert_severity_WARNING**, **alert_severity_SEVERE** – one-hot indicators for severity level
* **alert_effect_NO_SERVICE**, **alert_effect_REDUCED_SERVICE**, **alert_effect_SIGNIFICANT_DELAYS**, **alert_effect_DETOUR** – one-hot indicators for operational impact
* **alert_scope** – level of impact (stop, route, or system)
* **alert_type** – general classification (DELAYS, PLANNED_WORK, NO_SERVICE, or GENERAL)

---

#### **Sample Output (illustrative)**

| timestamp_bin        | route_id | stop_id | Hr_obs | delay_class | incident_flag | alert_severity_SEVERE | alert_effect_SIGNIFICANT_DELAYS | alert_scope | alert_type   |
| -------------------- | -------- | ------- | ------ | ----------- | ------------- | --------------------- | ------------------------------- | ----------- | ------------ |
| 2025-10-15 10:00 UTC | D        | R14     | 9.2    | on_time     | 0             | 0                     | 0                               | none        | none         |
| 2025-10-15 10:10 UTC | D        | R14     | 15.8   | minor_delay | 1             | 1                     | 1                               | route       | DELAYS       |
| 2025-10-15 10:20 UTC | D        | R14     | 21.5   | major_delay | 1             | 0                     | 1                               | route       | PLANNED_WORK |

---

#### **Outcome**

* The merged table now holds both **performance metrics** and **alert indicators**.
* It serves as the foundation for **supervised learning** or **causal analysis** linking service disruptions to train delay patterns.


### **Merged Dataset: Headway Fact Table × Alert Features**

This step produces a single, modeling-ready dataset that fuses the train-performance metrics from your headway fact table with alert indicators from the GTFS-Realtime alerts feed.

---

#### **Purpose**

* Combine operational headway and delay metrics with service-alert information for each ten-minute time bin.
* Provide a complete view of how route- or stop-level alerts coincide with observed headway performance.
* Prepare a unified table for statistical analysis or machine-learning models.

---

#### **Method**

* Convert timestamps in both tables to UTC to align bins.
* Perform a **left join** on the shared keys:
  • `timestamp_bin` • `route_id` • `stop_id`
* Fill missing alert values with defaults so periods without alerts are explicitly coded:
  • numeric flags → 0 • text categories → “none”.
* Print a brief summary confirming dataset size and number of bins with active alerts.
* Save the merged result as **fact_with_alerts.parquet** for downstream use.

---

#### **Key Columns**

* **timestamp_bin** – start of the ten-minute interval (UTC)
* **route_id** – subway line identifier (e.g., A, 6, D)
* **stop_id** – station identifier
* **Hr_obs**, **Hr_roll_mean**, **Hr_roll_std** – observed and rolling headways (minutes)
* **delay_class** – categorical label: on_time / minor_delay / major_delay
* **incident_flag** – 1 if any alert active, 0 otherwise
* **alert_severity_INFO**, **_WARNING**, **_SEVERE** – severity one-hot flags
* **alert_effect_NO_SERVICE**, **_REDUCED_SERVICE**, **_SIGNIFICANT_DELAYS**, **_DETOUR** – effect one-hot flags
* **alert_scope** – impact level (stop / route / system / none)
* **alert_type** – classified alert category (DELAYS, PLANNED_WORK, NO_SERVICE, GENERAL, none)

---

#### **Sample Output (illustrative)**

| timestamp_bin        | route_id | stop_id | Hr_obs | delay_class | incident_flag | alert_severity_SEVERE | alert_effect_SIGNIFICANT_DELAYS | alert_scope | alert_type   |
| -------------------- | -------- | ------- | ------ | ----------- | ------------- | --------------------- | ------------------------------- | ----------- | ------------ |
| 2025-10-15 10:00 UTC | D        | R14     | 9.2    | on_time     | 0             | 0                     | 0                               | none        | none         |
| 2025-10-15 10:10 UTC | D        | R14     | 15.8   | minor_delay | 1             | 1                     | 1                               | route       | DELAYS       |
| 2025-10-15 10:20 UTC | D        | R14     | 21.5   | major_delay | 1             | 0                     | 1                               | route       | PLANNED_WORK |

---

#### **Outcome**

* Each record now includes both **performance** and **alert context**, forming the foundation for modeling the relationship between service disruptions and train delay behavior.


In [17]:
# ============================================================
# Merge: Headway Fact Table  ×  Alert Features
# ============================================================

# --- Inputs expected ---
# fact_df   → output from build_headways()
# alerts_df → output from build_alert_features()

# Ensure timestamps are aligned and UTC-normalized
fact_df["timestamp_bin"] = pd.to_datetime(fact_df["timestamp_bin"], utc=True)
alerts_df["timestamp_bin"] = pd.to_datetime(alerts_df["timestamp_bin"], utc=True)

# --- Join operation (left join retains all headway rows) ---
merged_df = fact_df.merge(
    alerts_df[
        [
            "timestamp_bin", "route_id", "stop_id",
            "incident_flag",
            "alert_severity_INFO", "alert_severity_WARNING", "alert_severity_SEVERE",
            "alert_effect_NO_SERVICE", "alert_effect_REDUCED_SERVICE",
            "alert_effect_SIGNIFICANT_DELAYS", "alert_effect_DETOUR",
            "alert_scope", "alert_type"
        ]
    ],
    on=["timestamp_bin", "route_id", "stop_id"],
    how="left",
    suffixes=("", "_alert")
)

# --- Fill missing alert features with defaults (no alert observed) ---
for c in [
    "incident_flag",
    "alert_severity_INFO", "alert_severity_WARNING", "alert_severity_SEVERE",
    "alert_effect_NO_SERVICE", "alert_effect_REDUCED_SERVICE",
    "alert_effect_SIGNIFICANT_DELAYS", "alert_effect_DETOUR"
]:
    merged_df[c] = merged_df[c].fillna(0).astype(int)

merged_df["alert_scope"] = merged_df["alert_scope"].fillna("none")
merged_df["alert_type"]  = merged_df["alert_type"].fillna("none")

# --- Optional sanity check ---
print("Merged dataset shape:", merged_df.shape)
print("Alerts joined on:", merged_df[['incident_flag']].sum().item(), "bins with active alerts")

# --- Save merged dataset ---
merged_df.to_parquet("fact_with_alerts.parquet", index=False)
print("✅ fact_with_alerts.parquet written successfully.")


NameError: name 'fact_df' is not defined

In [18]:
# gtfs_static_reader.py
# ------------------------------------------------------------
# Read Static GTFS (local ZIP), resolve service active on a date,
# build timezone-aware scheduled stop times, and compute headways.
#
# Usage (example at bottom):
#   gtfs = read_gtfs_zip("nyct_gtfs_static.zip")
#   svc = active_service_ids(gtfs, "2025-10-15")
#   sched = build_scheduled_stop_times(gtfs, "2025-10-15", tz="America/New_York")
#   hw = scheduled_headways_by_stop_route_dir(sched, bin_freq="10min")
# ------------------------------------------------------------

import io
import zipfile
from datetime import datetime, date, timedelta
from zoneinfo import ZoneInfo

import numpy as np
import pandas as pd


# ---------------------------
# Core loaders
# ---------------------------
def read_gtfs_zip(path_zip: str, tables=None) -> dict[str, pd.DataFrame]:
    """
    Load selected GTFS tables from a local .zip into pandas DataFrames.
    Returns a dict like {"stops": df, "trips": df, ...}.
    """
    default_tables = [
        "agency", "routes", "trips", "stops",
        "stop_times", "calendar", "calendar_dates", "shapes"
    ]
    want = set(tables or default_tables)

    dfs = {}
    with zipfile.ZipFile(path_zip, "r") as zf:
        names_in_zip = {n.lower(): n for n in zf.namelist()}
        for t in want:
            fname = f"{t}.txt"
            if fname not in names_in_zip and fname.lower() in names_in_zip:
                fname = names_in_zip[fname.lower()]
            if fname not in names_in_zip:
                # Not all feeds include all files (e.g., calendar or shapes may be missing)
                continue
            with zf.open(fname) as f:
                # Read as UTF-8 with dtype inference; you can add specific dtypes if needed.
                dfs[t] = pd.read_csv(io.TextIOWrapper(f, encoding="utf-8"))
    return dfs


# ---------------------------
# Service calendar resolution
# ---------------------------
def _to_ymd(d) -> date:
    if isinstance(d, str):
        return datetime.strptime(d, "%Y-%m-%d").date()
    if isinstance(d, datetime):
        return d.date()
    return d  # assume date


def _dow_mask(target: date) -> str:
    # GTFS calendar columns are monday..sunday (1/0)
    return ["monday","tuesday","wednesday","thursday","friday","saturday","sunday"][target.weekday()]


def active_service_ids(gtfs: dict, when, *, include_exceptions=True) -> set[str]:
    """
    Return the set of service_id that run on 'when' (YYYY-MM-DD|date|datetime).
    Uses calendar + calendar_dates (exceptions).
    """
    d = _to_ymd(when)

    base = set()
    cal = gtfs.get("calendar")
    if cal is not None and not cal.empty:
        # filter date range and day-of-week
        cal = cal.copy()
        cal["start_date"] = pd.to_datetime(cal["start_date"], format="%Y%m%d", errors="coerce")
        cal["end_date"]   = pd.to_datetime(cal["end_date"],   format="%Y%m%d", errors="coerce")
        mask = (
            (cal["start_date"].dt.date <= d) &
            (cal["end_date"].dt.date   >= d) &
            (cal[_dow_mask(d)] == 1)
        )
        base = set(cal.loc[mask, "service_id"].astype(str))

    if include_exceptions:
        cdx = gtfs.get("calendar_dates")
        if cdx is not None and not cdx.empty:
            cdx = cdx.copy()
            cdx["date"] = pd.to_datetime(cdx["date"], format="%Y%m%d", errors="coerce").dt.date
            # exception_type: 1 = service added; 2 = service removed
            added  = set(cdx.loc[(cdx["date"] == d) & (cdx["exception_type"] == 1), "service_id"].astype(str))
            removed= set(cdx.loc[(cdx["date"] == d) & (cdx["exception_type"] == 2), "service_id"].astype(str))
            base = (base | added) - removed

    return base


# ---------------------------
# Time parsing helpers
# ---------------------------
def parse_gtfs_hhmmss_to_seconds(s: str) -> int | None:
    """
    GTFS times can exceed 24:00:00 (e.g., 27:15:00).
    Return seconds from service-day midnight. None for invalid.
    """
    if pd.isna(s):
        return None
    try:
        parts = str(s).split(":")
        if len(parts) != 3:
            return None
        h, m, sec = int(parts[0]), int(parts[1]), int(parts[2])
        return h*3600 + m*60 + sec
    except Exception:
        return None


def seconds_to_datetime_local(base_date: date, seconds_from_midnight: int, tz: str) -> datetime:
    """
    Convert seconds since service-day midnight (possibly > 24h) into
    a timezone-aware datetime in local time.
    """
    base_dt = datetime.combine(base_date, datetime.min.time()).replace(tzinfo=ZoneInfo(tz))
    return base_dt + timedelta(seconds=int(seconds_from_midnight))


# ---------------------------
# Build scheduled stop times
# ---------------------------
def build_scheduled_stop_times(
    gtfs: dict,
    service_date,
    tz: str = "America/New_York",
    include_departure=False,
    keep_columns_extra=None
) -> pd.DataFrame:
    """
    Return a DataFrame of scheduled stop events for the given service_date with:
      ['service_date','route_id','trip_id','stop_id','stop_sequence','direction_id',
       'arrival_sec','arrival_time','departure_sec','departure_time','shape_id', ...]
    Times are timezone-aware datetimes in 'tz'.
    """
    d = _to_ymd(service_date)
    svc_ids = active_service_ids(gtfs, d)
    if not svc_ids:
        return pd.DataFrame(columns=[
            "service_date","route_id","trip_id","stop_id","stop_sequence","direction_id",
            "arrival_sec","arrival_time","departure_sec","departure_time","shape_id"
        ])

    trips = gtfs.get("trips", pd.DataFrame()).copy()
    stop_times = gtfs.get("stop_times", pd.DataFrame()).copy()
    routes = gtfs.get("routes", pd.DataFrame()).copy()

    # Filter trips by active service_ids
    trips = trips[trips["service_id"].astype(str).isin(svc_ids)].copy()
    if trips.empty or stop_times.empty:
        return pd.DataFrame(columns=[
            "service_date","route_id","trip_id","stop_id","stop_sequence","direction_id",
            "arrival_sec","arrival_time","departure_sec","departure_time","shape_id"
        ])

    # Join stop_times ↔ trips ↔ routes (route_id, direction_id, shape_id)
    st = stop_times.merge(
        trips[["trip_id","route_id","direction_id","shape_id"]].astype({"trip_id":"string"}),
        on="trip_id", how="inner"
    ).merge(
        routes[["route_id"]].astype({"route_id":"string"}),
        on="route_id", how="left"
    )

    # Parse HH:MM:SS → seconds from midnight (can exceed 86400)
    st["arrival_sec"]   = st["arrival_time"].apply(parse_gtfs_hhmmss_to_seconds)
    st["departure_sec"] = st["departure_time"].apply(parse_gtfs_hhmmss_to_seconds)

    # Build timezone-aware datetimes
    st["arrival_time"]   = st["arrival_sec"].apply(lambda s: seconds_to_datetime_local(d, s, tz) if pd.notna(s) else pd.NaT)
    st["departure_time"] = st["departure_sec"].apply(lambda s: seconds_to_datetime_local(d, s, tz) if pd.notna(s) else pd.NaT)

    # Clean types
    st["stop_sequence"] = pd.to_numeric(st["stop_sequence"], errors="coerce").astype("Int64")
    st["direction_id"]  = pd.to_numeric(st["direction_id"], errors="coerce").astype("Int8")
    st["service_date"]  = pd.Timestamp(d).tz_localize(ZoneInfo(tz))

    cols = [
        "service_date","route_id","trip_id","stop_id","stop_sequence","direction_id",
        "arrival_sec","arrival_time"
    ]
    if include_departure:
        cols += ["departure_sec","departure_time"]
    if "shape_id" in st.columns:
        cols += ["shape_id"]
    if keep_columns_extra:
        cols += [c for c in keep_columns_extra if c in st.columns]

    st = st[cols].sort_values(["route_id","direction_id","stop_id","arrival_time"])
    return st.reset_index(drop=True)


# ---------------------------
# Scheduled headways (optional)
# ---------------------------
def scheduled_headways_by_stop_route_dir(sched_df: pd.DataFrame, bin_freq="10min") -> pd.DataFrame:
    """
    Compute scheduled headways at each (route_id, stop_id, direction_id).
    Returns an aggregated fact table by time bin:
      ['timestamp_bin','route_id','stop_id','direction_id','Sch_Hr_min']
    """
    if sched_df.empty:
        return pd.DataFrame(columns=[
            "timestamp_bin","route_id","stop_id","direction_id","Sch_Hr_min"
        ])

    df = sched_df.dropna(subset=["arrival_time"]).copy()
    df["arrival_time"] = pd.to_datetime(df["arrival_time"], utc=False).dt.tz_convert("UTC")
    df = df.sort_values(["route_id","direction_id","stop_id","arrival_time"])

    # Headway in minutes between consecutive scheduled arrivals at the same stop/direction
    df["prev_arrival"] = df.groupby(["route_id","direction_id","stop_id"])["arrival_time"].shift(1)
    df["Sch_Hr_min"] = (df["arrival_time"] - df["prev_arrival"]).dt.total_seconds() / 60.0
    df = df.dropna(subset=["Sch_Hr_min"])

    # Bin on UTC to align with your realtime facts (which you stored in UTC bins)
    df["timestamp_bin"] = df["arrival_time"].dt.tz_convert("UTC").dt.floor(bin_freq)

    fact = (df.groupby(["timestamp_bin","route_id","stop_id","direction_id"])
              .agg(Sch_Hr_min=("Sch_Hr_min","last"))
              .reset_index())

    return fact


# ---------------------------
# Example usage (uncomment and edit path/date)
# ---------------------------
if __name__ == "__main__":
    # 1) Point to your local GTFS static ZIP
    GTFS_ZIP = "nyct_gtfs_static.zip"  # e.g., downloaded from your MTA developer portal

    # 2) Load core tables
    gtfs = read_gtfs_zip(GTFS_ZIP)

    # 3) Pick a service date (NYC time)
    service_date = "2025-10-15"

    # 4) Build scheduled stop times with TZ-aware datetimes
    sched = build_scheduled_stop_times(gtfs, service_date, tz="America/New_York", include_departure=True)
    print("Scheduled rows:", len(sched))
    print(sched.head(6))

    # 5) (Optional) Compute scheduled headways per stop/route/direction, binned to 10 minutes
    sch_hw = scheduled_headways_by_stop_route_dir(sched, bin_freq="10min")
    print("Scheduled headway fact rows:", len(sch_hw))
    print(sch_hw.head(6))


FileNotFoundError: [Errno 2] No such file or directory: 'nyct_gtfs_static.zip'

In [19]:
import io
import zipfile
import requests
import pandas as pd

# URL of the live MTA Subway GTFS (static)
GTFS_URL = "https://rrgtfsfeeds.s3.amazonaws.com/gtfs_subway.zip"

def read_gtfs_from_url(url):
    r = requests.get(url, timeout=60)
    r.raise_for_status()
    zf = zipfile.ZipFile(io.BytesIO(r.content))
    gtfs_tables = {}
    for name in zf.namelist():
        if name.endswith(".txt"):
            with zf.open(name) as f:
                gtfs_tables[name.replace(".txt", "")] = pd.read_csv(f)
    return gtfs_tables

# Example: load and inspect
gtfs = read_gtfs_from_url(GTFS_URL)
print(gtfs.keys())
print(gtfs["routes"].head())


dict_keys(['agency', 'calendar_dates', 'calendar', 'routes', 'shapes', 'stop_times', 'stops', 'transfers', 'trips'])
  route_id agency_id route_short_name            route_long_name  \
0        1  MTA NYCT                1  Broadway - 7 Avenue Local   
1        2  MTA NYCT                2           7 Avenue Express   
2        3  MTA NYCT                3           7 Avenue Express   
3        4  MTA NYCT                4   Lexington Avenue Express   
4        5  MTA NYCT                5   Lexington Avenue Express   

                                          route_desc  route_type  \
0  Trains operate between 242 St in the Bronx and...           1   
1  Trains operate between Wakefield-241 St, Bronx...           1   
2  Trains operate between 148 St, Manhattan, and ...           1   
3  Trains operate daily between Woodlawn/Jerome A...           1   
4  Weekdays daytime, most trains operate between ...           1   

                                        route_url route_color rou