In [6]:
%pip install gtfs-realtime-bindings protobuf

Collecting gtfs-realtime-bindings
  Using cached gtfs_realtime_bindings-2.0.0-py3-none-any.whl.metadata (650 bytes)
Using cached gtfs_realtime_bindings-2.0.0-py3-none-any.whl (5.3 kB)
Installing collected packages: gtfs-realtime-bindings
Successfully installed gtfs-realtime-bindings-2.0.0
Note: you may need to restart the kernel to use updated packages.


In [2]:
import requests
from google.transit import gtfs_realtime_pb2
from google.protobuf.json_format import MessageToDict

url = "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fsubway-alerts"

# Download the GTFS-Realtime alerts feed, decode the protobuf payload and
# convert it to a plain dict (`feed_dict`) that later cells consume.
try:
    # requests has no default timeout; without one a stalled connection
    # would block the kernel indefinitely.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    print(f"Response Status: {response.status_code}")

    # Parse Protocol Buffer
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(response.content)

    # Convert to Dict for easier reading
    feed_dict = MessageToDict(feed)

    # Look the entity list up once and reuse it for the count and the sample.
    alert_entities = feed_dict.get('entity', [])
    print(f"Successfully fetched {len(alert_entities)} alerts.")

    if alert_entities:
        print("Sample Alert:")
        print(alert_entities[0])

except Exception as e:
    # Deliberate best-effort: report network, HTTP-status and decode failures
    # instead of raising, so the rest of the notebook can still be executed.
    # Note that `feed_dict` is left undefined when this branch is taken.
    print(f"Error: {e}")

Response Status: 200
Successfully fetched 368 alerts.
Sample Alert:
{'id': 'lmm:alert:496244', 'alert': {'activePeriod': [{'start': '1767371924', 'end': '1767372519'}], 'informedEntity': [{'agencyId': 'MTASBWY', 'routeId': '1'}, {'agencyId': 'MTASBWY', 'stopId': '125'}], 'headerText': {'translation': [{'text': 'Uptown [1] trains are running with delays after NYPD conducted an investigation at 59 St-Columbus Circle.', 'language': 'en'}, {'text': '<p>Uptown [1] trains are running with delays after NYPD conducted an investigation at <b>59 St-Columbus Circle</b>.</p>', 'language': 'en-html'}]}}}


In [3]:
import re
import time

def parse_alert_to_features(alert_entity, target_route_id="A", current_time=None):
    """
    Parses a single GTFS-Realtime alert entity into model features.

    Args:
        alert_entity: Dict form of one feed entity (as produced by
            ``MessageToDict``); the alert payload is read from its 'alert' key.
        target_route_id: Route id the alert must reference in one of its
            'informedEntity' items to be considered relevant.
        current_time: POSIX timestamp (seconds) at which the alert must be
            active. Defaults to ``time.time()``; pass an explicit value to
            evaluate alerts against a historical moment.

    Returns:
        A dict of 0/1 flags (``is_delay``, ``is_reroute``, ``is_suspension``,
        ``cause_police``, ``cause_medical``, ``cause_signal``,
        ``cause_mechanical``) derived from keywords in the English header
        text, or None if the alert does not reference the target route or is
        not active at ``current_time``.
    """
    alert = alert_entity.get('alert', {})

    # 1. Filter by route: keep the alert only if some informed entity names it.
    targets_route = any(
        informed.get('routeId') == target_route_id
        for informed in alert.get('informedEntity', [])
    )
    if not targets_route:
        return None

    # 2. Check active period. An alert without any period is treated as
    #    active; otherwise at least one period must contain `current_time`.
    if current_time is None:
        current_time = time.time()

    active_periods = alert.get('activePeriod', [])
    if active_periods:
        is_active = any(
            # MessageToDict renders 64-bit timestamps as strings, hence int().
            # A missing 'end' is treated as open-ended (far-future sentinel).
            int(period.get('start', 0)) <= current_time <= int(period.get('end', 9999999999))
            for period in active_periods
        )
        if not is_active:
            return None

    # 3. Extract semantics (cause & effect) by keyword matching on the
    #    lower-cased English header text. The 'cause'/'effect' enums are not
    #    consulted here.
    header_text = ""
    if 'headerText' in alert and 'translation' in alert['headerText']:
        for translation in alert['headerText']['translation']:
            if translation.get('language') == 'en':
                header_text = translation.get('text', '').lower()
                break

    def mentions(*keywords):
        # Substring match on purpose, so inflected forms such as "delays",
        # "delayed" or "rerouted" are caught as well.
        return any(keyword in header_text for keyword in keywords)

    # "ems" must be matched as a whole word: as a plain substring it also
    # fires on "problems", "systems" and "items", which mislabels e.g.
    # "signal problems" as a medical cause.
    mentions_ems = re.search(r"\bems\b", header_text) is not None

    features = {
        "is_delay": int(mentions("delay")),
        "is_reroute": int(mentions("reroute", "running over")),
        "is_suspension": int(mentions("suspended")),
        "cause_police": int(mentions("investigation", "nypd", "police")),
        "cause_medical": int(mentions("medical") or mentions_ems),
        "cause_signal": int(mentions("signal")),
        "cause_mechanical": int(mentions("mechanical", "brakes")),
    }

    return features

# --- Smoke test against the feed fetched earlier ---
# Route '1' is used here because the sample alert printed by the fetch cell
# belongs to that line, which makes the parser's output easy to verify by eye.
print("Parsing alerts for Route '1'...")
active_alerts_vector = []

if 'feed_dict' not in locals():
    # The fetch cell has not run (or failed), so there is nothing to parse.
    print("feed_dict not found. Run the previous cell first.")
else:
    for entity in feed_dict.get('entity', []):
        features = parse_alert_to_features(entity, target_route_id="1")
        if not features:
            # Alert is for another route or is not currently active.
            continue
        print(f"Found Active Alert: {features}")
        # A production pipeline would aggregate these per-alert vectors
        # (e.g. element-wise max over all active alerts) instead of listing them.
        active_alerts_vector.append(features)

    print(f"\nTotal Active Relevant Alerts: {len(active_alerts_vector)}")

Parsing alerts for Route '1'...
Found Active Alert: {'is_delay': 1, 'is_reroute': 0, 'is_suspension': 0, 'cause_police': 1, 'cause_medical': 0, 'cause_signal': 0, 'cause_mechanical': 0}

Total Active Relevant Alerts: 1
