# Python Data Processing

Functionally: the MTA doesn't like us sending requests to their API from client
devices. The server needs to make the requests and buffer the responses for the
client. This means doing protobuf processing in Python (unless there's a way to
pass the unmodified protobuf files to the already-working TypeScript processor).
It remains to be seen how much of the data processing should happen in Python
and how much should stay client-side. Either way, I'm going to end up making my
own API.

So what data would we like to receive from a rendering standpoint? Assuming a 30s lag time...

A list of **Trains**:

**Train**:
- tripID : string
- nextStop : { stopTime: number, stopID: string }
- prevStop : { stopTime: number, stopID: string }
- route_id : string
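
As a rough sketch of that shape in Python, the record could be written as a pair of dataclasses and serialized to JSON for the client. The class names `Stop` and `Train` and the `prevStop` values are placeholders for illustration; the field names follow the list above.

```python
from dataclasses import dataclass, asdict
import json


@dataclass
class Stop:
    stopTime: int  # unix time in seconds
    stopID: str    # e.g. "L05N"


@dataclass
class Train:
    tripID: str
    nextStop: Stop
    prevStop: Stop
    route_id: str


train = Train(
    tripID="063650_L..N",
    nextStop=Stop(stopTime=1748444973, stopID="L05N"),
    # Placeholder values: the previous stop is not something the feed hands us.
    prevStop=Stop(stopTime=1748444900, stopID="L06N"),
    route_id="L",
)

# asdict() converts nested dataclasses too, so the result is JSON-ready.
payload = json.dumps(asdict(train))
```

Keeping the names identical to the client-side type should mean the TypeScript renderer can consume the JSON without a mapping layer.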

This is a little easier than it would have been on the client side, because the
server can be *always running* and keep a backlog of data. We can buffer the
data through a database: one Python script feeds the database, and the server
(which never touches the realtime URLs itself) just pulls trains from the
database to serve to clients.
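
A minimal sketch of that split, assuming SQLite as the buffer. The table layout, column names, and the helpers `open_db`, `upsert_train`, and `fresh_trains` are all hypothetical; the second poll's values are made up for the example.

```python
import sqlite3
import time

# Hypothetical schema: one row per trip, overwritten on every poll.
SCHEMA = """
CREATE TABLE IF NOT EXISTS trains (
    trip_id        TEXT PRIMARY KEY,
    route_id       TEXT NOT NULL,
    next_stop_id   TEXT,
    next_stop_time INTEGER,
    prev_stop_id   TEXT,
    prev_stop_time INTEGER,
    updated_at     INTEGER NOT NULL
)
"""


def open_db(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # rows can be converted to dicts
    conn.execute(SCHEMA)
    return conn


def upsert_train(conn, train, now=None):
    """Feeder side: store the latest known state for one trip."""
    now = int(time.time()) if now is None else now
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO trains "
            "(trip_id, route_id, next_stop_id, next_stop_time, "
            " prev_stop_id, prev_stop_time, updated_at) "
            "VALUES (:trip_id, :route_id, :next_stop_id, :next_stop_time, "
            " :prev_stop_id, :prev_stop_time, :updated_at)",
            {**train, "updated_at": now},
        )


def fresh_trains(conn, max_age=60, now=None):
    """Server side: return trains updated within the last max_age seconds."""
    now = int(time.time()) if now is None else now
    rows = conn.execute(
        "SELECT * FROM trains WHERE updated_at >= ? ORDER BY trip_id",
        (now - max_age,),
    ).fetchall()
    return [dict(row) for row in rows]


conn = open_db()
upsert_train(conn, {
    "trip_id": "063650_L..N", "route_id": "L",
    "next_stop_id": "L05N", "next_stop_time": 1748444973,
    "prev_stop_id": None, "prev_stop_time": None,
}, now=1748444965)
# A later poll overwrites the same trip (made-up values).
upsert_train(conn, {
    "trip_id": "063650_L..N", "route_id": "L",
    "next_stop_id": "L03N", "next_stop_time": 1748445033,
    "prev_stop_id": "L05N", "prev_stop_time": 1748444973,
}, now=1748444995)
trains = fresh_trains(conn, max_age=60, now=1748445000)
```

Filtering on `updated_at` is one way to keep trips that have dropped out of the feed from being served forever; the 60-second cutoff is arbitrary.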



In [16]:
import requests
from google.transit import gtfs_realtime_pb2
import sqlite3
import time

In [17]:
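url = "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-l"

# Fetch the L-train realtime feed and parse the protobuf payload.
feed = gtfs_realtime_pb2.FeedMessage()
response = requests.get(url, timeout=10)
response.raise_for_status()  # fail loudly on HTTP errors instead of parsing an error page
feed.ParseFromString(response.content)
print("Current unix time:", int(time.time()))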

Current unix time: 1748444965


Combine fields from trip_updates and vehicles. Data for each train is split across these two entity types, so we merge them into one record that can always be indexed by the trip ID (which is unique per train, unlike the route ID, which is shared by every train on the line).

In [18]:
from collections import defaultdict

# One record per trip_id, merging the trip_update and vehicle entities.
updates = defaultdict(lambda: {'trip':{},'trip_update':{},'vehicle':{}})
n = 0  # total number of entities in the feed
for entity in feed.entity:
    n += 1

    if entity.HasField('trip_update'):
        tid = entity.trip_update.trip.trip_id
        updates[tid]['trip_update'] = entity.trip_update
        updates[tid]['trip'] = entity.trip_update.trip

    if entity.HasField('vehicle'):
        tid = entity.vehicle.trip.trip_id
        updates[tid]['vehicle'] = entity.vehicle

print(n)

51


In [19]:
for obj in dict(updates).values():
    print(obj)
    print('='*10)

{'trip': trip_id: "063650_L..N"
start_date: "20250528"
schedule_relationship: SCHEDULED
route_id: "L"
direction_id: 0
, 'trip_update': trip {
  trip_id: "063650_L..N"
  start_date: "20250528"
  schedule_relationship: SCHEDULED
  route_id: "L"
  direction_id: 0
}
stop_time_update {
  stop_sequence: 20
  arrival {
    delay: 48
    time: 1748444973
    uncertainty: 0
  }
  departure {
    delay: 48
    time: 1748444988
    uncertainty: 0
  }
  stop_id: "L05N"
  schedule_relationship: SCHEDULED
}
stop_time_update {
  stop_sequence: 21
  arrival {
    delay: 48
    time: 1748445033
    uncertainty: 0
  }
  departure {
    delay: 48
    time: 1748445048
    uncertainty: 0
  }
  stop_id: "L03N"
  schedule_relationship: SCHEDULED
}
stop_time_update {
  stop_sequence: 22
  arrival {
    delay: 48
    time: 1748445123
    uncertainty: 0
  }
  departure {
    delay: 48
    time: 1748445138
    uncertainty: 0
  }
  stop_id: "L02N"
  schedule_relationship: SCHEDULED
}
stop_time_update {
  stop_seq

Stop time updates work differently than I expected. Rather than storing
old information, they store the stop times for *all* future stops. This is
great, but it still presents a challenge: we need to get the previous stop and
its time. As a first check, count how many trips list at least one arrival
earlier than the trip update's own timestamp.

In [34]:
pos = 0  # trips with at least one listed arrival earlier than the trip_update timestamp
ct = 0   # total trips
for obj in updates.values():
    ct += 1
    ts = obj['trip_update'].timestamp

    for stu in obj['trip_update'].stop_time_update:
        if stu.arrival.time < ts:
            pos += 1
            break  # count each trip at most once

print(pos, ct)



17 32
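
Since the server is always running, one possible way to recover the previous stop is to remember the first listed stop for each trip between polls: when a different stop shows up at the head of the list, the remembered one becomes the previous stop. The sketch below works on plain `(stop_id, arrival_time)` tuples rather than protobuf objects so it stays self-contained. `PrevStopTracker` is a hypothetical helper, and it assumes passed stops drop off the front of `stop_time_update`. The count above suggests a listed arrival can already be in the past, so a real version would probably also need to compare against the timestamp.

```python
class PrevStopTracker:
    """Remembers the head of each trip's stop list between polls."""

    def __init__(self):
        self._head = {}  # trip_id -> (stop_id, arrival_time) last seen first in the list
        self._prev = {}  # trip_id -> (stop_id, arrival_time) that most recently dropped off

    def observe(self, trip_id, stops):
        """Record one poll for a trip and return its previous stop, if known.

        stops: list of (stop_id, arrival_time) tuples in feed order.
        """
        if not stops:
            return self._prev.get(trip_id)

        head = stops[0]
        seen = self._head.get(trip_id)
        # If the head of the list changed, the old head is now behind the train.
        if seen is not None and seen[0] != head[0]:
            self._prev[trip_id] = seen
        self._head[trip_id] = head
        return self._prev.get(trip_id)


tracker = PrevStopTracker()

# First poll: nothing to compare against yet, so no previous stop.
first = tracker.observe(
    "063650_L..N",
    [("L05N", 1748444973), ("L03N", 1748445033), ("L02N", 1748445123)],
)

# Later poll (made-up values): L05N has dropped off the list.
second = tracker.observe(
    "063650_L..N",
    [("L03N", 1748445040), ("L02N", 1748445130)],
)
```

Here `first` is `None` and `second` is the remembered `("L05N", 1748444973)` entry. A train that is already mid-route when the feeder starts has no previous stop until its list changes once.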
