# MTA Tracker - Real-time Transit Data Pipeline

Fetch, parse, and analyze the MTA's real-time GTFS-Realtime (protobuf) feed.

## 1. Setup

In [None]:
import requests
import json
import csv
import struct
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, Any, List, Set
from collections import defaultdict
import logging

# Configure the root logger to emit INFO and above as bare messages
# (the format string contains only the message, no level or timestamp).
logging.basicConfig(level=logging.INFO, format='%(message)s')
# Visible confirmation that this setup cell ran to completion.
print('✓ Libraries loaded')

In [None]:
class MTATracker:
    """Small HTTP client that downloads the raw feed bytes from ``BASE_URL``.

    Attributes:
        session: ``requests.Session`` reused for every request.
        data: Bytes of the most recent successful fetch, or ``None`` if no
            fetch has succeeded yet.
    """

    BASE_URL = 'https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs'
    TIMEOUT = 30  # passed to requests as the per-request timeout

    def __init__(self):
        self.session = requests.Session()
        self.data = None

    def fetch_data(self):
        """Download the feed and return its raw bytes.

        Returns:
            The response body as ``bytes`` on success. On any failure
            (network error, timeout, or an HTTP error status) the error is
            printed and ``None`` is returned; ``self.data`` keeps whatever the
            last successful fetch stored.
        """
        try:
            response = self.session.get(self.BASE_URL, timeout=self.TIMEOUT)
            # Without this check the body of a 4xx/5xx error page would be
            # stored and handed to the parser as if it were feed bytes.
            response.raise_for_status()
        except Exception as e:
            # Deliberately best-effort: the notebook reports the problem and
            # carries on instead of aborting the cell.
            print(f'✗ Error: {e}')
            return None

        self.data = response.content
        return self.data

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

print('✓ MTATracker class defined')

In [None]:
# Create the client and pull one snapshot of the feed; `data` is falsy
# (None or empty) when the fetch did not produce any bytes.
tracker = MTATracker()
data = tracker.fetch_data()

fetch_status = (
    f'✓ Fetched {len(data):,} bytes from MTA API'
    if data
    else '✗ Failed to fetch data'
)
print(fetch_status)

## 2. Data Transformation

In [None]:
class ProtobufParser:
    """Dependency-free decoder for the parts of a GTFS-realtime feed used here.

    The parser reads the protobuf wire format directly. Field numbers follow
    the public ``gtfs-realtime.proto`` schema (FeedMessage, FeedHeader,
    FeedEntity, TripUpdate, VehiclePosition, Alert and their sub-messages).
    Unknown fields are skipped; truncated or malformed input ends parsing of
    the affected message instead of raising.
    """

    @staticmethod
    def decode_varint(data: bytes, pos: int) -> tuple:
        """Decode one base-128 varint starting at ``data[pos]``.

        Returns:
            ``(value, new_pos)`` where ``new_pos`` is the index just past the
            varint. If the data ends mid-varint, the bits read so far are
            returned and ``new_pos == len(data)``.
        """
        value = 0
        shift = 0
        while pos < len(data):
            byte = data[pos]
            pos += 1
            value |= (byte & 0x7f) << shift
            if (byte & 0x80) == 0:
                break
            shift += 7
        return value, pos

    @staticmethod
    def _read_fields(data: bytes) -> List[tuple]:
        """Split one serialized message into ``(field_num, wire_type, value)``.

        ``value`` is an ``int`` for varints (wire type 0) and the raw payload
        ``bytes`` for length-delimited (2), fixed64 (1) and fixed32 (5) fields.
        Reading stops at the first field that is truncated or uses a wire type
        whose length cannot be determined, because nothing after it can be
        located reliably.
        """
        fields = []
        pos = 0
        end = len(data)
        while pos < end:
            tag, pos = ProtobufParser.decode_varint(data, pos)
            field_num = tag >> 3
            wire_type = tag & 0x07

            if wire_type == 0:
                if pos >= end:
                    break  # tag present but its value is missing
                value, pos = ProtobufParser.decode_varint(data, pos)
            elif wire_type == 2:
                length, pos = ProtobufParser.decode_varint(data, pos)
                if pos + length > end:
                    break  # declared length runs past the buffer
                value = data[pos:pos + length]
                pos += length
            elif wire_type in (1, 5):
                size = 8 if wire_type == 1 else 4
                if pos + size > end:
                    break
                value = data[pos:pos + size]
                pos += size
            else:
                break  # groups / reserved wire types: length unknown

            fields.append((field_num, wire_type, value))
        return fields

    @staticmethod
    def _to_int32(value: int) -> int:
        """Reinterpret a decoded varint as a signed 32-bit integer.

        Negative ``int32`` values arrive as large unsigned varints (two's
        complement), so the low 32 bits are taken and sign-converted.
        """
        value &= 0xFFFFFFFF
        return value - 0x100000000 if value & 0x80000000 else value

    @staticmethod
    def _parse_translated_string(data: bytes) -> Optional[str]:
        """Return the text of the first translation in a TranslatedString.

        Returns ``None`` when the message carries no translation text.
        """
        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if field_num == 1 and wire_type == 2:  # repeated Translation
                for sub_num, sub_type, sub_value in ProtobufParser._read_fields(value):
                    if sub_num == 1 and sub_type == 2:  # Translation.text
                        return sub_value.decode('utf-8', errors='ignore')
        return None

    @staticmethod
    def parse_feed(data: bytes) -> Dict[str, Any]:
        """Parse a serialized FeedMessage.

        Returns:
            ``{'header': {...}, 'entities': [...]}``. Entities without an id
            are dropped. Empty input yields an empty header and entity list.
        """
        feed = {'header': {}, 'entities': []}

        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type != 2:
                continue
            if field_num == 1:
                feed['header'] = ProtobufParser.parse_header(value)
            elif field_num == 2:
                entity = ProtobufParser.parse_entity(value)
                if entity:
                    feed['entities'].append(entity)

        return feed

    @staticmethod
    def parse_header(data: bytes) -> Dict[str, Any]:
        """Parse a FeedHeader.

        Returns:
            Dict with any of ``gtfs_realtime_version`` (str, field 1),
            ``incrementality`` (int enum value, field 2) and ``timestamp``
            (int, field 3) that are present.
        """
        header = {}

        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type == 2 and field_num == 1:
                header['gtfs_realtime_version'] = value.decode('utf-8', errors='ignore')
            elif wire_type == 0 and field_num == 2:
                header['incrementality'] = value
            elif wire_type == 0 and field_num == 3:
                header['timestamp'] = value

        return header

    @staticmethod
    def parse_entity(data: bytes) -> Optional[Dict[str, Any]]:
        """Parse a FeedEntity.

        Returns:
            Dict with ``id`` plus, when present and non-empty, one of
            ``trip_update`` / ``vehicle`` / ``alert`` and a matching ``type``
            label (``TRIP_UPDATE`` / ``VEHICLE`` / ``ALERT``). ``None`` if the
            entity has no id.
        """
        entity = {}

        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type != 2:
                continue  # e.g. the is_deleted bool (field 2)
            if field_num == 1:
                entity['id'] = value.decode('utf-8', errors='ignore')
            elif field_num == 3:
                trip_update = ProtobufParser.parse_trip_update(value)
                if trip_update:
                    entity['trip_update'] = trip_update
                    entity['type'] = 'TRIP_UPDATE'
            elif field_num == 4:
                vehicle = ProtobufParser.parse_vehicle(value)
                if vehicle:
                    entity['vehicle'] = vehicle
                    entity['type'] = 'VEHICLE'
            elif field_num == 5:
                alert = ProtobufParser.parse_alert(value)
                if alert:
                    entity['alert'] = alert
                    entity['type'] = 'ALERT'

        return entity if 'id' in entity else None

    @staticmethod
    def parse_trip_update(data: bytes) -> Optional[Dict[str, Any]]:
        """Parse a TripUpdate.

        Returns:
            Dict with ``trip`` (see ``_parse_trip_descriptor``) and ``delay``
            in seconds. ``delay`` is the trip-level delay (field 5) when the
            feed provides one; otherwise the delay of the first stop-time
            update, which is ``None`` when the feed reports no delay.
            ``None`` is returned if nothing was parsed.
        """
        trip = {}
        trip_delay = None
        stop_delay = None
        saw_stop_update = False

        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type == 2:
                if field_num == 1:
                    trip['trip'] = ProtobufParser._parse_trip_descriptor(value)
                elif field_num == 2:
                    # Only the first stop-time update feeds the summary delay.
                    if not saw_stop_update:
                        stop_delay = ProtobufParser._parse_stop_time_update(value)
                    saw_stop_update = True
            elif wire_type == 0 and field_num == 5:
                trip_delay = ProtobufParser._to_int32(value)

        if trip_delay is not None:
            trip['delay'] = trip_delay
        elif saw_stop_update:
            trip['delay'] = stop_delay

        return trip if trip else None

    @staticmethod
    def parse_vehicle(data: bytes) -> Optional[Dict[str, Any]]:
        """Parse a VehiclePosition.

        Returns:
            Dict with any of ``trip``, ``position``, ``current_stop_sequence``
            (int, field 3) and ``label`` (from the VehicleDescriptor in
            field 8) that are present, or ``None`` if none are.
        """
        vehicle = {}

        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type == 2:
                if field_num == 1:
                    vehicle['trip'] = ProtobufParser._parse_trip_descriptor(value)
                elif field_num == 2:
                    vehicle['position'] = ProtobufParser._parse_position(value)
                elif field_num == 8:
                    for sub_num, sub_type, sub_value in ProtobufParser._read_fields(value):
                        if sub_num == 2 and sub_type == 2:  # VehicleDescriptor.label
                            vehicle['label'] = sub_value.decode('utf-8', errors='ignore')
            elif wire_type == 0 and field_num == 3:
                vehicle['current_stop_sequence'] = value

        return vehicle if vehicle else None

    @staticmethod
    def parse_alert(data: bytes) -> Optional[Dict[str, Any]]:
        """Parse an Alert.

        Returns:
            Dict with ``header_text`` (field 10) and/or ``description_text``
            (field 11), each the text of the first translation, or ``None``
            if neither is present.
        """
        alert = {}

        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type != 2:
                continue  # cause/effect enums are not extracted
            if field_num == 10:
                key = 'header_text'
            elif field_num == 11:
                key = 'description_text'
            else:
                continue
            text = ProtobufParser._parse_translated_string(value)
            if text is not None:
                alert[key] = text

        return alert if alert else None

    @staticmethod
    def _parse_trip_descriptor(data: bytes) -> Dict[str, str]:
        """Parse a TripDescriptor into ``trip_id`` (field 1) / ``route_id`` (field 5)."""
        trip = {}
        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type != 2:
                continue
            if field_num == 1:
                trip['trip_id'] = value.decode('utf-8', errors='ignore')
            elif field_num == 5:
                trip['route_id'] = value.decode('utf-8', errors='ignore')
        return trip

    @staticmethod
    def _parse_position(data: bytes) -> Dict[str, float]:
        """Parse a Position into ``latitude`` (field 1) / ``longitude`` (field 2).

        Both are fixed32 floats, which protobuf stores little-endian.
        """
        pos_data = {}
        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type != 5:
                continue
            if field_num == 1:
                pos_data['latitude'] = struct.unpack('<f', value)[0]
            elif field_num == 2:
                pos_data['longitude'] = struct.unpack('<f', value)[0]
        return pos_data

    @staticmethod
    def _parse_stop_time_update(data: bytes) -> Optional[int]:
        """Return the delay in seconds reported by one StopTimeUpdate.

        The delay lives in the nested StopTimeEvent messages: the arrival
        event (field 2) is preferred, then the departure event (field 3).
        Returns ``None`` when neither event carries a delay.
        """
        delays = {}
        for field_num, wire_type, value in ProtobufParser._read_fields(data):
            if wire_type == 2 and field_num in (2, 3):
                for ev_num, ev_type, ev_value in ProtobufParser._read_fields(value):
                    if ev_num == 1 and ev_type == 0:  # StopTimeEvent.delay
                        delays[field_num] = ProtobufParser._to_int32(ev_value)
        if 2 in delays:
            return delays[2]
        return delays.get(3)

    @staticmethod
    def _parse_stop_sequence(data: bytes) -> Optional[int]:
        """Return the first field of ``data`` if it is a varint, else ``None``.

        Kept for backward compatibility; ``parse_vehicle`` now reads
        ``current_stop_sequence`` directly as a varint field.
        """
        pos = 0
        tag, pos = ProtobufParser.decode_varint(data, pos)
        if (tag & 0x07) == 0:
            value, pos = ProtobufParser.decode_varint(data, pos)
            return value
        return None

print('✓ ProtobufParser class defined')

In [None]:
if data:
    # Decode the raw bytes once; `feed` and the three counts below are
    # module-level names that the following cell reads.
    feed = ProtobufParser.parse_feed(data)

    # Tally entities by the 'type' label the parser attached to each one.
    type_counts = defaultdict(int)
    for parsed_entity in feed['entities']:
        type_counts[parsed_entity.get('type')] += 1

    vehicles = type_counts['VEHICLE']
    trip_updates = type_counts['TRIP_UPDATE']
    alerts = type_counts['ALERT']

    print(f'✓ Parsed {len(feed["entities"])} entities')
    print(f'  - Vehicles: {vehicles}')
    print(f'  - Trip Updates: {trip_updates}')
    print(f'  - Alerts: {alerts}')

In [None]:
if data and 'feed' in locals():
    # Each run writes into its own timestamped directory under logs/ so
    # successive snapshots never overwrite each other.
    logs_dir = Path('logs')
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    run_dir = logs_dir / ts
    run_dir.mkdir(parents=True, exist_ok=True)  # also creates logs/ itself

    # Save JSON: the full parsed feed; default=str stringifies any value the
    # json module cannot serialize natively.
    json_file = run_dir / f'mta_feed_{ts}.json'
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(feed, f, indent=2, default=str)

    # Save CSV: one row per entity. Explicit UTF-8 keeps non-ASCII ids from
    # failing on platforms whose default encoding is narrower.
    csv_file = run_dir / f'mta_entities_{ts}.csv'
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Entity_ID', 'Type', 'Route', 'Trip_ID', 'Delay_Seconds'])

        for entity in feed['entities']:
            trip_update = entity.get('trip_update') or {}
            vehicle = entity.get('vehicle') or {}
            # Vehicle entities carry their own trip descriptor; fall back to
            # it so their Route/Trip_ID columns are not left blank.
            trip_info = trip_update.get('trip') or vehicle.get('trip') or {}

            entity_id = entity.get('id', '')
            ent_type = entity.get('type', 'UNKNOWN')
            route = trip_info.get('route_id', '')
            trip_id = trip_info.get('trip_id', '')
            delay = trip_update.get('delay', '')  # None is written as an empty cell
            writer.writerow([entity_id, ent_type, route, trip_id, delay])

    # Save metadata: the summary counts computed by the parsing cell.
    meta_file = run_dir / f'mta_metadata_{ts}.txt'
    with open(meta_file, 'w', encoding='utf-8') as f:
        f.write(f'Timestamp: {ts}\n')
        f.write(f'Total Entities: {len(feed["entities"])}\n')
        f.write(f'Vehicles: {vehicles}\n')
        f.write(f'Trip Updates: {trip_updates}\n')
        f.write(f'Alerts: {alerts}\n')

    print(f'✓ Saved to logs/{ts}/')
    print(f'  - {json_file.name}')
    print(f'  - {csv_file.name}')
    print(f'  - {meta_file.name}')

## 3. Implementation

Add custom analysis and features here.