In [1]:
# Notebook setup: autoreload edits to src/, import project helpers and libraries,
# fetch a Mobility Database access token, and set pandas display options.
%load_ext autoreload
%autoreload 2
import os 
# NOTE(review): hardcoded absolute path to one developer's checkout. The relative output
# paths used later (data/raw-speeds/...) and presumably the `src.*` imports below rely on
# this working directory; consider making it configurable (e.g. via an environment variable).
os.chdir("/Users/luohy/Documents/Projects/bus-observatory/gtfs-realtime-performance")
from src.s3 import list_files_in_bucket, filter_files_by_pattern, read_parquet_from_s3, load_all_parquet_files
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from src.gtfs_segments import GTFS_shape_processor
from src.speeds import BusSpeedCalculator
from src.api import parse_zipped_gtfs
import geopandas as gpd
from src.api import query_feed_data, get_access_token
# Token passed to query_feed_data for Mobility Database API calls.
ACCESS_TOKEN = get_access_token()
pd.set_option('display.max_columns', None)
# NOTE(review): several imports here (e.g. ThreadPoolExecutor, filter_files_by_pattern,
# defaultdict, Point/Polygon, contextily) appear unused in the cells shown — prune if unused.
import matplotlib.pyplot as plt
from collections import defaultdict
from shapely.geometry import Point, Polygon
from datetime import datetime
import pytz
import contextily as ctx


# Preparation 

In [2]:
# Realtime vehicle-position parquet files live in this bucket; each day sits under
# f"{prefix}date=YYYY-MM-DD/".
bucket = "dataclinic-gtfs-rt"
prefix = "norm/bus-mta-vp/vehicles/"

# Mobility Database id of the static GTFS feed family (https://mobilitydatabase.org/feeds/mdb-513).
mdb_id = "mdb-513"

## Step 1: Figure out which static feed is correct.

Pretty annoying. The trips and shapes are defined in the static GTFS feeds, and every time a new static feed is released, the old `trip_id` values become obsolete. This is unfortunate because the realtime `trip_id`s can only be joined to the static feed that was in effect when they were recorded. It should be possible to just load all of the relevant static feeds and run a quick check against the relevant realtime feed data to see which static feed is correct for the given dates. However, we can also just look through the outputs from MobilityData's API and find the one we want. Below, I load all the feeds for Manhattan.

TODO:
- save segment as geojson to /table-map

In [5]:
# Raw API listing of every archived static feed for this agency
# (browse it at https://mobilitydatabase.org/feeds/<mdb_id>).
# Uses `mdb_id` from the config cell instead of repeating the literal.
response = query_feed_data(mdb_id, ACCESS_TOKEN)
response

[{'id': 'mdb-513-202402080022',
  'feed_id': 'mdb-513',
  'hosted_url': 'https://files.mobilitydatabase.org/mdb-513/mdb-513-202402080022/mdb-513-202402080022.zip',
  'note': None,
  'downloaded_at': '2024-02-08T00:52:25.481924Z',
  'hash': '25ebc713eff01d5d5da7e6a877926d636152417153dc591ef0763fa0851b3819',
  'bounding_box': {'minimum_latitude': 40.701536,
   'maximum_latitude': 40.865316,
   'minimum_longitude': -74.018088,
   'maximum_longitude': -73.864375},
  'validation_report': None,
  'service_date_range_start': None,
  'service_date_range_end': None},
 {'id': 'mdb-513-202404010033',
  'feed_id': 'mdb-513',
  'hosted_url': 'https://files.mobilitydatabase.org/mdb-513/mdb-513-202404010033/mdb-513-202404010033.zip',
  'note': None,
  'downloaded_at': '2024-04-01T00:26:36.207481Z',
  'hash': '4e9fce8767c4fe3eb5f0529ea49d29270543d0563d7fbf0d8c4c0920b61a3fbf',
  'bounding_box': {'minimum_latitude': 40.701536,
   'maximum_latitude': 40.865316,
   'minimum_longitude': -74.018088,
   'max

In [6]:
def fetch_all_static_feeds(mdb_id: str, ACCESS_TOKEN: str) -> pd.DataFrame:
    """Return one row per archived static GTFS feed that the Mobility Database holds for `mdb_id`.

    Missing ``service_date_range_start`` values are filled with the date
    (YYYY-MM-DD) portion of ``downloaded_at``. This is a proxy: the download
    date is not necessarily the first service date. The column is created
    when the API omits it entirely.

    Args:
        mdb_id: Mobility Database feed id, e.g. "mdb-513".
        ACCESS_TOKEN: API token passed through to ``query_feed_data``.

    Returns:
        DataFrame built from the API response records.

    Raises:
        ValueError: if ``query_feed_data`` returns ``None``.
    """
    response = query_feed_data(mdb_id, ACCESS_TOKEN)
    if response is None:
        # Format the id into the message (passing it as a second argument would render a tuple).
        raise ValueError(f"No response for mdb_id: {mdb_id}")

    feed_updates = pd.DataFrame(response)

    # Without a download timestamp there is nothing to fill from; return the records as-is.
    if "downloaded_at" not in feed_updates.columns:
        return feed_updates

    # Date portion (YYYY-MM-DD) of the ISO downloaded_at timestamp.
    download_dates = feed_updates["downloaded_at"].str.extract(r"(\d{4}-\d{2}-\d{2})")[0]
    if "service_date_range_start" in feed_updates.columns:
        # Only fill nulls; keep service dates the API actually reported.
        feed_updates["service_date_range_start"] = feed_updates["service_date_range_start"].fillna(download_dates)
    else:
        feed_updates["service_date_range_start"] = download_dates
    return feed_updates

# List every archived static feed for the agency and show the columns needed to pick one.
mdb_id = "mdb-513"
preview_columns = ['id', 'service_date_range_start', 'hosted_url']
feed_updates = fetch_all_static_feeds(mdb_id, ACCESS_TOKEN=ACCESS_TOKEN)
print(feed_updates.loc[:, preview_columns])


                     id service_date_range_start  \
0  mdb-513-202402080022               2024-02-08   
1  mdb-513-202404010033               2024-04-01   
2  mdb-513-202407010038               2024-07-01   
3  mdb-513-202408290052               2024-08-29   
4  mdb-513-202409090026               2024-09-09   
5  mdb-513-202412120015               2024-08-31   
6  mdb-513-202501020055               2025-01-04   
7  mdb-513-202501230024               2025-01-04   
8  mdb-513-202502170105               2025-02-09   

                                          hosted_url  
0  https://files.mobilitydatabase.org/mdb-513/mdb...  
1  https://files.mobilitydatabase.org/mdb-513/mdb...  
2  https://files.mobilitydatabase.org/mdb-513/mdb...  
3  https://files.mobilitydatabase.org/mdb-513/mdb...  
4  https://files.mobilitydatabase.org/mdb-513/mdb...  
5  https://files.mobilitydatabase.org/mdb-513/mdb...  
6  https://files.mobilitydatabase.org/mdb-513/mdb...  
7  https://files.mobilitydatabase.org/m

In [19]:
# Define the static feed and the analysis date range.
# The download URL is derived from feed_id so the two cannot drift apart.
feed_id = "mdb-513-202501020055"
assert feed_id.startswith(f"{mdb_id}-"), f"{feed_id} is not a feed of {mdb_id}"
url = f"https://files.mobilitydatabase.org/{mdb_id}/{feed_id}/{feed_id}.zip"
start = "2025-01-05"
end = "2025-01-23"

# Parse the chosen static feed. 4326 / 2263 are presumably the source (WGS 84 lat/lon)
# and projected (NY State Plane, US feet) EPSG codes — confirm against GTFS_shape_processor.
segment_df = GTFS_shape_processor(url, 4326, 2263).process_shapes()
GTFS_dict = parse_zipped_gtfs(url)

# Preview segment and GTFS_dict
print("--- segment_df.columns ---")
print(segment_df.columns)
print("--- GTFS_dict.keys() ---")
print(GTFS_dict.keys())

Parsed GTFS static feed


  return lib.line_locate_point(line, other)


Parsed GTFS static feed
--- segment_df.columns ---
Index(['trip_id', 'shape_id', 'stop_sequence', 'stop_id', 'stop_name',
       'prev_stop_id', 'prev_stop_name', 'projected_position',
       'prev_projected_position', 'segment_length', 'geometry'],
      dtype='object')
--- GTFS_dict.keys() ---
dict_keys(['agency.txt', 'calendar.txt', 'calendar_dates.txt', 'routes.txt', 'shapes.txt', 'stops.txt', 'stop_times.txt', 'trips.txt'])


# Step 2: Bus Speeds Calculation


In [8]:
def calculate_speeds_for_date_route(segment_df: pd.DataFrame, GTFS_dict: dict, bucket: str, date: str, route_id: str,
                                    s3_prefix: "str | None" = None, max_speed_mph: float = 70):
    """Compute per-segment bus speeds for one route on one date.

    Steps:
      1. Load every realtime parquet file under ``{s3_prefix}date={date}/`` in ``bucket``.
      2. Keep rows whose ``trip.route_id`` equals ``route_id``.
      3. Compute speeds with ``BusSpeedCalculator``.
      4. Drop helper columns and rows with ``speed_mph >= max_speed_mph``.
         NaN speeds are dropped too.
      5. Add New York local-time columns.

    Args:
        segment_df: Segment table from ``GTFS_shape_processor(...).process_shapes()``.
        GTFS_dict: Parsed static GTFS feed (file name -> DataFrame).
        bucket: S3 bucket holding the realtime parquet files.
        date: Partition date, "YYYY-MM-DD".
        route_id: Route to keep, e.g. "M50".
        s3_prefix: Key prefix for the realtime files. Defaults to the notebook-level ``prefix``.
        max_speed_mph: Outlier cutoff. Rows at or above it are removed.

    Returns:
        DataFrame of segment speeds with ``route_id``, ``datetime_nyc`` (tz-aware),
        ``date`` (datetime.date), ``weekday`` (Monday=0) and ``hour`` columns.
        ``interpolated_time`` is dropped.
    """
    if s3_prefix is None:
        # Fall back to the notebook-level config value.
        s3_prefix = prefix

    # Load relevant realtime data from s3 bucket for the given date
    daily_files = list_files_in_bucket(bucket_name=bucket, prefix=f"{s3_prefix}date={date}/")
    vehicle_positions = load_all_parquet_files(file_list=daily_files, bucket=bucket)
    vehicle_positions = vehicle_positions[vehicle_positions['trip.route_id'] == route_id]

    # Calculate speeds
    speed_calculator = BusSpeedCalculator(vehicle_positions, GTFS_dict, segment_df)
    speeds = speed_calculator.create_trip_speeds()
    speeds["route_id"] = route_id

    speeds = speeds.drop(columns=["stop_sequence", "stop_name", "prev_stop_name", "projected_position",
                                  "prev_projected_position", "unique_trip_id"])

    # Remove outliers. The .copy() makes the column assignments below act on a real frame,
    # not a view of the filtered slice (avoids SettingWithCopyWarning).
    speeds = speeds[speeds["speed_mph"] < max_speed_mph].copy()

    # Naive timestamps are treated as UTC, as the original code assumed.
    # Already tz-aware timestamps are converted directly instead of raising.
    times = pd.to_datetime(speeds["interpolated_time"])
    if times.dt.tz is None:
        times = times.dt.tz_localize("UTC")
    speeds["datetime_nyc"] = times.dt.tz_convert("America/New_York")

    speeds["date"] = speeds["datetime_nyc"].dt.date        # datetime.date objects
    speeds["weekday"] = speeds["datetime_nyc"].dt.weekday  # integer, Monday=0
    speeds["hour"] = speeds["datetime_nyc"].dt.hour        # integer, local hour

    return speeds.drop(columns=["interpolated_time"])

In [10]:
# Single-day, single-route smoke test of calculate_speeds_for_date_route.
# Disabled by default because it loads a full day of realtime files from S3;
# set RUN_SMOKE_TEST = True to run it. Results go to smoke_* names so the
# batch cells' variables are not overwritten.
RUN_SMOKE_TEST = False
if RUN_SMOKE_TEST:
    smoke_date = "2025-01-11"
    smoke_route_id = "M50"
    smoke_speeds = calculate_speeds_for_date_route(segment_df, GTFS_dict, bucket, smoke_date, smoke_route_id)
    display(smoke_speeds.head())

# Step 3: batch - raw speed
TODOs:
- nested loop: for dates, for route
    - can be improved with spark?

- design of parquet file
    - one large file?
    - partition by route? date?

In [25]:
# Config for the batch run: static feed, date window, and routes.
# The download URL is derived from feed_id so the two cannot drift apart.
feed_id = "mdb-513-202501020055"
assert feed_id.startswith(f"{mdb_id}-"), f"{feed_id} is not a feed of {mdb_id}"
url = f"https://files.mobilitydatabase.org/{mdb_id}/{feed_id}/{feed_id}.zip"
start = "2025-01-05"
end = "2025-01-07"

# Parse the static feed that matches the realtime trip_ids for this window.
# 4326 / 2263 are presumably source / projected EPSG codes — confirm against GTFS_shape_processor.
segment_df = GTFS_shape_processor(url, 4326, 2263).process_shapes()
GTFS_dict = parse_zipped_gtfs(url)

# Outputs are grouped per static feed: data/raw-speeds/{feed_id}/
output_dir = f"data/raw-speeds/{feed_id}"
os.makedirs(output_dir, exist_ok=True)

# 1. save GTFS "stops.txt" df as Parquet
GTFS_dict["stops.txt"].to_parquet(f"{output_dir}/stops.parquet")

# 2. save segment df as GEOJSON
segment_df.to_file(f"{output_dir}/segments.geojson", driver="GeoJSON")

# 3. Daily speed files (bus_speeds_{date}.parquet) are written by the batch loop,
#    one per date in date_list, restricted to route_list.

# Get date list from start and end date (inclusive): List[str]
date_list = pd.date_range(start=start, end=end).strftime('%Y-%m-%d').tolist()
print("--- date range ---")
print(date_list)

# Define route list: List[str]
route_list = ["M50"]
print("--- route list ---")
print(route_list)

Parsed GTFS static feed


  return lib.line_locate_point(line, other)


Parsed GTFS static feed
--- date range ---
['2025-01-05', '2025-01-06', '2025-01-07']
--- route list ---
['M50']


In [26]:
# Batch processing: write one parquet file of raw segment speeds per date.
# An existing output file is treated as complete and skipped, so each file is first
# written to a temporary path and then renamed into place. A crash mid-write therefore
# never leaves a truncated file that would be skipped on the next run.
# NOTE(review): `date` here is the S3 partition date, while the `date` column below is the
# New York local date; rows near midnight may carry a different date than their file name.
batch_output_dir = f"data/raw-speeds/{feed_id}"
MAX_SPEED_MPH = 70  # rows at or above this are removed as outliers
NYC_TZ = "America/New_York"
UNUSED_SPEED_COLUMNS = ["stop_sequence", "stop_name", "prev_stop_name", "projected_position",
                        "prev_projected_position", "unique_trip_id"]

for date in date_list:

    print(f"-- Processing Date {date} --")
    out_path = f"{batch_output_dir}/bus_speeds_{date}.parquet"

    # First check if data is already in destination
    if os.path.exists(out_path):
        print(f"Data already exists for {date}")
        print("Skipping to next date")
        continue

    # Load relevant realtime data from s3 bucket for the given date.
    # Listing is inside the try so one bad day does not abort the whole batch.
    try:
        daily_files = list_files_in_bucket(bucket_name=bucket, prefix=f"{prefix}date={date}/")
        if not daily_files:
            print(f"No realtime files found for {date}")
            print("Skipping to next date")
            continue
        vehicle_positions = load_all_parquet_files(file_list=daily_files, bucket=bucket)
    except Exception as e:
        print(f"Error loading parquet files for {date}: {e}")
        continue

    # Filter by route
    vehicle_positions = vehicle_positions[vehicle_positions['trip.route_id'].isin(route_list)]
    if vehicle_positions.empty:
        print(f"No vehicle positions for routes {route_list} on {date}")
        print("Skipping to next date")
        continue

    # Calculate speeds (constructor included in the try: it may also fail on bad input)
    try:
        speed_calculator = BusSpeedCalculator(vehicle_positions, GTFS_dict, segment_df)
        speeds = speed_calculator.create_trip_speeds()
    except Exception as e:
        print(f"Error calculating speeds for {date}: {e}")
        continue

    if speeds.empty:
        print(f"No data found for {date}. Check if the feed id and date match.")
        print("Skipping to next date")
        continue

    # Route id is taken from the second-to-last "_"-separated token of trip_id
    # (assumes the MTA trip_id layout — verify if the feed format changes).
    speeds["route_id"] = speeds["trip_id"].str.split("_").str[-2]

    speeds = speeds.drop(columns=UNUSED_SPEED_COLUMNS)

    # Remove outliers. The .copy() makes later column assignments act on a real frame,
    # not a view of the filtered slice (avoids SettingWithCopyWarning).
    speeds = speeds[speeds["speed_mph"] < MAX_SPEED_MPH].copy()

    # Naive timestamps are treated as UTC, as before. Tz-aware ones are converted directly.
    times = pd.to_datetime(speeds["interpolated_time"])
    if times.dt.tz is None:
        times = times.dt.tz_localize("UTC")
    speeds["datetime_nyc"] = times.dt.tz_convert(NYC_TZ)

    speeds["date"] = speeds["datetime_nyc"].dt.date        # datetime.date objects
    speeds["weekday"] = speeds["datetime_nyc"].dt.weekday  # integer, Monday=0
    speeds["hour"] = speeds["datetime_nyc"].dt.hour        # integer, local hour
    speeds = speeds.drop(columns=["interpolated_time"])

    tmp_path = f"{out_path}.tmp"
    speeds.to_parquet(tmp_path)
    os.replace(tmp_path, out_path)
    print(f"Wrote daily data for {date}")


Processing Date:  2025-01-05
Data already exists for 2025-01-05
Skipping to next date
Processing Date:  2025-01-06
Data already exists for 2025-01-06
Skipping to next date
Processing Date:  2025-01-07
Read 1401 parquet files from s3
Processing 133 trips...


100%|██████████| 133/133 [00:00<00:00, 364.13it/s]

Wrote daily data for 2025-01-07





# Step 4: batch - aggregation

TODOs:

- table for chart
- table for map