In [None]:
# build_db.py
import duckdb, pandas as pd, zipfile, io
from pyproj import Transformer

from pathlib import Path
# Locate the "data" folder that sits next to this file. Notebook kernels do not
# define __file__ (referencing it raises NameError), so in that case fall back
# to a "data" folder relative to the current working directory.
try:
    _DATA_DIR = Path(__file__).with_name("data")
except NameError:
    _DATA_DIR = Path("data")

GTFS_ZIP = str(_DATA_DIR / "gtfs_m.zip")        # input: zipped GTFS feed
DB_PATH = str(_DATA_DIR / "mta_gtfs.duckdb")    # output: DuckDB database file

# GTFS_ZIP = "gtfs_m.zip"  
# DB_PATH  = "mta_gtfs.duckdb"

## Load and set up data

### Load GTFS files

In [10]:
def read_csv(z, name, **kw):
    """Read one CSV member of an open zip archive into a DataFrame.

    Args:
        z: An open ``zipfile.ZipFile``.
        name: Name of the archive member to read.
        **kw: Forwarded to ``pd.read_csv``. ``encoding`` is consumed here
            (default ``"utf-8"``) and used to decode the member's bytes.

    Returns:
        The parsed ``pandas.DataFrame``.
    """
    charset = kw.pop("encoding", "utf-8")
    with z.open(name) as raw:
        # ZipFile.open yields bytes; wrap it so pandas receives decoded text.
        text_stream = io.TextIOWrapper(raw, encoding=charset)
        return pd.read_csv(text_stream, **kw)

def to_sec(hms):
    """Convert a clock string to a number of seconds.

    Accepts "HH:MM:SS" and "HH:MM" (seconds default to 0). The hour is not
    capped at 23, so times like "24:15:00" and later are supported.

    Args:
        hms: The time string. Anything that is not a non-blank string
            (e.g. ``None`` or the float NaN produced for an empty CSV field)
            is treated as a missing time.

    Returns:
        Seconds as an ``int``, or ``None`` when the time is missing.

    Raises:
        ValueError: If the string is not in HH:MM or HH:MM:SS form, or a
            component is not an integer.
    """
    # A missing time arrives as NaN/None rather than a string; report it as
    # missing instead of crashing on `.split`.
    if not isinstance(hms, str) or not hms.strip():
        return None

    parts = hms.strip().split(":")
    if len(parts) not in (2, 3):
        raise ValueError(f"expected HH:MM or HH:MM:SS, got {hms!r}")

    h, m, *rest = map(int, parts)
    s = rest[0] if rest else 0
    return h*3600 + m*60 + s

# Read the GTFS tables used below straight out of the feed archive.
with zipfile.ZipFile(GTFS_ZIP) as z:
    routes = read_csv(z, "routes.txt")
    trips  = read_csv(z, "trips.txt")
    stops  = read_csv(z, "stops.txt")
    stimes = read_csv(z, "stop_times.txt")
    cal    = read_csv(z, "calendar.txt")
    # calendar_dates.txt may be absent from the archive; use an empty frame then
    caldates = read_csv(z, "calendar_dates.txt") if "calendar_dates.txt" in z.namelist() else pd.DataFrame()

# times → seconds
stimes["arrival_sec"] = stimes["arrival_time"].map(to_sec)

# base stop level only: keep rows whose location_type is 0 or blank.
# When the column is missing entirely, every row is kept. (The earlier
# `stops.get("location_type", 0).fillna(0)` returned a plain int in that case
# and crashed on `.fillna`.)
if "location_type" in stops.columns:
    is_base_stop = stops["location_type"].fillna(0).eq(0)
else:
    is_base_stop = pd.Series(True, index=stops.index)

# .copy() so the projected columns added below are written to an independent
# frame rather than to a slice of the raw stops table.
stops = stops.loc[is_base_stop, ["stop_id","stop_name","stop_lat","stop_lon","parent_station"]].copy()

# project to EPSG:2263 (feet-based CRS for NYC)
# always_xy=True → transform() takes (lon, lat) and returns (x, y)
tf = Transformer.from_crs("EPSG:4326", "EPSG:2263", always_xy=True)
x, y = tf.transform(stops["stop_lon"].values, stops["stop_lat"].values)
stops["x2263"] = x
stops["y2263"] = y

### Create Database

In [15]:
# Build the DuckDB database file at DB_PATH from the DataFrames loaded above.
# Statement order matters: *_base tables are created first, then the dim/fact
# tables and the day-type views that select from them. Every statement uses
# CREATE OR REPLACE, so existing objects with the same names are overwritten.
con = duckdb.connect(DB_PATH)
con.execute("INSTALL spatial; LOAD spatial;")  # lets us use ST_* if desired (nothing in this cell calls ST_*)

# Register the pandas DataFrames under *_df names so the SQL below can select from them
con.register("routes_df", routes)
con.register("trips_df", trips)
con.register("stops_df", stops)
con.register("stimes_df", stimes)
con.register("cal_df", cal)
# caldates is an empty frame when the feed had no calendar_dates.txt
if not caldates.empty:
    con.register("caldates_df", caldates)


# Base/source tables (persisted) -> raw tables from GTFS data
con.execute("CREATE OR REPLACE TABLE routes_base AS SELECT * FROM routes_df")
# trips: keep only the five columns used downstream
con.execute("""
            CREATE OR REPLACE TABLE trips_base AS
            SELECT trip_id, route_id, direction_id, trip_headsign, service_id
            FROM trips_df
""")
# stop_times: keep the computed arrival_sec, not the original time strings
con.execute("""
            CREATE OR REPLACE TABLE stop_times_base AS
            SELECT trip_id, stop_id, stop_sequence, arrival_sec  -- arrival_sec already added in pandas
            FROM stimes_df
""")
# calendar: start_date / end_date are cast to VARCHAR
con.execute("""
            CREATE OR REPLACE TABLE calendar_base AS
            SELECT CAST(start_date AS VARCHAR) AS start_date,
                CAST(end_date   AS VARCHAR) AS end_date,
                service_id, monday,tuesday,wednesday,thursday,friday,saturday,sunday
            FROM cal_df
""")
# calendar_dates_base is only (re)built when the feed supplied calendar_dates.txt
if not caldates.empty:
    con.execute("""
    CREATE OR REPLACE TABLE calendar_dates_base AS
    SELECT service_id, CAST(date AS VARCHAR) AS date, exception_type
    FROM caldates_df
    """)

# Dim tables (persisted) -> reference tables (cleaned/descriptive lookup tables)
con.execute("CREATE OR REPLACE TABLE dim_routes AS SELECT * FROM routes_base")
# dim_stops is read directly from the registered stops_df (there is no stops_base table)
# and carries the projected x2263 / y2263 columns
con.execute("""
CREATE OR REPLACE TABLE dim_stops  AS
SELECT stop_id, stop_name, stop_lat, stop_lon, parent_station, x2263, y2263
FROM stops_df
""")

# Fact table (persisted) -> big event table, every stop arrival (to be queried from dim tables)
# Inner join: stop_times rows whose trip_id has no match in trips_base are dropped
con.execute("""
CREATE OR REPLACE TABLE fact_stop_events AS
SELECT
  t.route_id,
  t.direction_id,
  t.service_id,
  st.stop_id,
  st.stop_sequence,
  st.arrival_sec,
  t.trip_id
FROM stop_times_base st
JOIN trips_base      t ON st.trip_id = t.trip_id
""")

# Convenience copy of trips with just what you need (persisted)
# (same five columns as trips_base)
con.execute("""
CREATE OR REPLACE TABLE dim_trips AS
SELECT trip_id, route_id, direction_id, trip_headsign, service_id
FROM trips_base
""")

# helper VIEWS for day types
# svcs_weekday: service_ids flagged for at least one of Monday–Friday
con.execute("""
CREATE OR REPLACE VIEW svcs_weekday AS
SELECT DISTINCT service_id
FROM calendar_base
WHERE monday=1 OR tuesday=1 OR wednesday=1 OR thursday=1 OR friday=1
""")
# svcs_saturday: service_ids flagged for Saturday
con.execute("""
CREATE OR REPLACE VIEW svcs_saturday AS
SELECT DISTINCT service_id
FROM calendar_base
WHERE saturday=1
""")
# svcs_sunday: service_ids flagged for Sunday
con.execute("""
CREATE OR REPLACE VIEW svcs_sunday AS
SELECT DISTINCT service_id
FROM calendar_base
WHERE sunday=1
""")


print(f"✅ Built {DB_PATH}")

✅ Built mta_gtfs.duckdb


## Explore DuckDB

In [46]:
# Close the currently open DuckDB connection before `con` is rebound further down.
con.close()

In [None]:
# con = duckdb.connect(DB_PATH, read_only=True)

# # If you want spatial functions (ST_*):
# con.execute("INSTALL spatial; LOAD spatial;")

In [None]:
import os
from pathlib import Path
import duckdb
import pandas as pd

# Root folder of the Parquet files, as a forward-slash path string.
# The queries in this notebook read '{PARQ_BASE}/<table>/*.parquet'.
PARQ_BASE = Path(r"parquet").as_posix()

con = duckdb.connect()  # in-memory; we're just querying Parquet

In [2]:
def attach_parquet_views(base: str):
    """Create (or replace) one view per Parquet folder found under ``base``.

    For each of dim_stops, dim_trips, dim_routes, calendar_base and
    fact_stop_events, a view of that name is defined on the module-level
    ``con`` connection as ``SELECT *`` over ``{base}/<name>/*.parquet``.

    Args:
        base: Folder path interpolated directly into the SQL text.
    """
    view_names = (
        "dim_stops",
        "dim_trips",
        "dim_routes",
        "calendar_base",
        "fact_stop_events",
    )
    for view_name in view_names:
        parquet_glob = f"{base}/{view_name}/*.parquet"
        con.execute(
            f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM read_parquet('{parquet_glob}')"
        )

# Define the five Parquet-backed views on the in-memory connection.
attach_parquet_views(PARQ_BASE)

# quick sanity: row counts
# One (view name, row count) row per view; columns are aliased t and n.
con.execute("""
SELECT 'dim_stops' t, COUNT(*) n FROM dim_stops
UNION ALL SELECT 'dim_trips', COUNT(*) FROM dim_trips
UNION ALL SELECT 'dim_routes', COUNT(*) FROM dim_routes
UNION ALL SELECT 'calendar_base', COUNT(*) FROM calendar_base
UNION ALL SELECT 'fact_stop_events', COUNT(*) FROM fact_stop_events
""").fetchdf()


Unnamed: 0,t,n
0,dim_stops,14456
1,dim_trips,230451
2,dim_routes,1532
3,calendar_base,136
4,fact_stop_events,7447673


In [3]:
# Distinct feed_id values present in dim_routes, sorted; reused by later cells as the feed list.
feeds = con.execute("SELECT DISTINCT feed_id FROM dim_routes ORDER BY feed_id").fetchdf()
feeds

Unnamed: 0,feed_id
0,mta-nyct-bus-bronx
1,mta-nyct-bus-brooklyn
2,mta-nyct-bus-busco
3,mta-nyct-bus-manhattan
4,mta-nyct-bus-queens
5,mta-nyct-bus-si


In [4]:
# The 25 (feed_id, route_id) pairs with the most rows in fact_stop_events.
df_top_routes = con.execute("""
SELECT feed_id, route_id, COUNT(*) AS stop_events
FROM fact_stop_events
GROUP BY 1,2
ORDER BY stop_events DESC
LIMIT 25
""").fetchdf()
df_top_routes

Unnamed: 0,feed_id,route_id,stop_events
0,mta-nyct-bus-brooklyn,B6,135232
1,mta-nyct-bus-brooklyn,B15,97827
2,mta-nyct-bus-si,S78,80098
3,mta-nyct-bus-brooklyn,B82,77476
4,mta-nyct-bus-manhattan,M101,76687
5,mta-nyct-bus-brooklyn,B41,75908
6,mta-nyct-bus-brooklyn,B46,74993
7,mta-nyct-bus-brooklyn,B44,72989
8,mta-nyct-bus-brooklyn,B8,71967
9,mta-nyct-bus-manhattan,M15,71490


In [5]:
# Number of distinct stop_ids per feed in dim_stops.
df_stops = con.execute("""
SELECT feed_id, COUNT(DISTINCT stop_id) AS n_stops
FROM dim_stops
GROUP BY 1
ORDER BY 1
""").fetchdf()
df_stops

Unnamed: 0,feed_id,n_stops
0,mta-nyct-bus-bronx,1893
1,mta-nyct-bus-brooklyn,4600
2,mta-nyct-bus-busco,2758
3,mta-nyct-bus-manhattan,1826
4,mta-nyct-bus-queens,1414
5,mta-nyct-bus-si,1965


In [6]:
# List every table/view on this connection together with its column names and types
con.execute("SHOW ALL TABLES").fetchdf()

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,memory,main,calendar_base,"[service_id, monday, tuesday, wednesday, thurs...","[VARCHAR, BIGINT, BIGINT, BIGINT, BIGINT, BIGI...",False
1,memory,main,dim_routes,"[route_id, agency_id, route_short_name, route_...","[VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, ...",False
2,memory,main,dim_stops,"[stop_id, stop_name, stop_desc, lat, lon, loca...","[BIGINT, VARCHAR, VARCHAR, DOUBLE, DOUBLE, BIG...",False
3,memory,main,dim_trips,"[trip_id, route_id, direction_id, service_id, ...","[VARCHAR, VARCHAR, BIGINT, VARCHAR, VARCHAR, V...",False
4,memory,main,fact_stop_events,"[route_id, direction_id, service_id, stop_id, ...","[VARCHAR, BIGINT, VARCHAR, BIGINT, BIGINT, BIG...",False


In [7]:
# Peek at a table: first 5 rows of dim_stops, all columns
con.execute("SELECT * FROM dim_stops LIMIT 5").fetchdf()

Unnamed: 0,stop_id,stop_name,stop_desc,lat,lon,location_type,parent_station,zone_id,feed_id,x2263,y2263
0,100014,BEDFORD PK BLVD/GRAND CONCOURSE,,40.872562,-73.888156,0,,,mta-nyct-bus-bronx,1015182.0,257195.345044
1,100017,PAUL AV/W 205 ST,,40.876836,-73.88971,0,,,mta-nyct-bus-bronx,1014750.0,258751.986309
2,100018,PAUL AV/WEST MOSHOLU PKWY SOUTH,,40.880392,-73.886081,0,,,mta-nyct-bus-bronx,1015752.0,260048.862093
3,100019,GRAND CONCOURSE/E 138 ST,,40.813496,-73.929489,0,,,mta-nyct-bus-bronx,1003768.0,235663.490524
4,100020,GRAND CONCOURSE/E 144 ST,,40.816812,-73.928001,0,,,mta-nyct-bus-bronx,1004179.0,236871.962862


In [36]:
# Peek at dim_routes: first 5 rows, all columns
con.execute("SELECT * FROM dim_routes LIMIT 5").fetchdf()

Unnamed: 0,route_id,agency_id,route_short_name,route_long_name,route_desc,route_type,route_color,route_text_color,feed_id
0,B1,MTA NYCT,B1,Bay Ridge - Manhattan Beach,via 86th St / Ocean Pkwy,3,00AEEF,FFFFFF,mta-nyct-bus-bronx
1,B11,MTA NYCT,B11,Sunset Park - Midwood,via 49th & 50th St / Avenue J,3,006CB7,FFFFFF,mta-nyct-bus-bronx
2,B12,MTA NYCT,B12,Lefferts Gardens - East New York,via Clarkson Av / Empire Blvd / East New York Av,3,6CBE45,FFFFFF,mta-nyct-bus-bronx
3,B13,MTA NYCT,B13,Spring Creek - Wyckoff Hospital,via Crescent St / Jamaica Av / Wyckoff Av,3,FAA61A,FFFFFF,mta-nyct-bus-bronx
4,B14,MTA NYCT,B14,Spring Creek - Crown Heights,via Sutter Av / Pitkin Av,3,00AEEF,FFFFFF,mta-nyct-bus-bronx


In [38]:
# Peek at calendar_base: first 5 rows, all columns
con.execute("SELECT * FROM calendar_base LIMIT 5").fetchdf()

Unnamed: 0,service_id,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date,feed_id
0,GH_D5-Sunday,0,0,0,0,0,0,1,20250629,20260101,mta-nyct-bus-bronx
1,GH_D5-Weekday-SDon,1,1,1,1,1,0,0,20250630,20260102,mta-nyct-bus-bronx
2,GH_D5-Weekday-SDon-BM,1,1,1,1,0,0,1,20250629,20260101,mta-nyct-bus-bronx
3,GH_D5-Saturday,0,0,0,0,0,1,0,20250705,20260103,mta-nyct-bus-bronx
4,GH_O5-Weekday,1,1,1,1,1,0,0,20251013,20251231,mta-nyct-bus-bronx


In [None]:
# schema — two ways to inspect the columns of dim_stops.
# A notebook cell only auto-renders its final expression, so the DESCRIBE
# result used to be computed and thrown away; display() renders both.
display(con.execute("DESCRIBE dim_stops").fetchdf())
display(con.execute("PRAGMA table_info('dim_stops')").fetchdf())

In [39]:
# Column-level catalogue of the 'main' schema: one row per (table, column) with
# its data type, ordered by table name and then by column position.
con.execute("""
SELECT table_schema, table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema='main'
ORDER BY table_name, ordinal_position
""").fetchdf()

Unnamed: 0,table_schema,table_name,column_name,data_type
0,main,calendar_base,service_id,VARCHAR
1,main,calendar_base,monday,BIGINT
2,main,calendar_base,tuesday,BIGINT
3,main,calendar_base,wednesday,BIGINT
4,main,calendar_base,thursday,BIGINT
5,main,calendar_base,friday,BIGINT
6,main,calendar_base,saturday,BIGINT
7,main,calendar_base,sunday,BIGINT
8,main,calendar_base,start_date,BIGINT
9,main,calendar_base,end_date,BIGINT


### Query Testing

In [8]:
# First four feed ids as a plain Python list (requires `feeds` from the earlier cell)
feeds["feed_id"].tolist()[0:4]

['mta-nyct-bus-bronx',
 'mta-nyct-bus-brooklyn',
 'mta-nyct-bus-busco',
 'mta-nyct-bus-manhattan']

In [20]:
def to_sec(hms: str) -> int:
    """Convert "HH:MM" or "HH:MM:SS" into a count of seconds.

    The seconds field is optional and defaults to 0. Only the first field
    after the minutes is read as seconds; anything after it is ignored.
    """
    hours, minutes, *remainder = hms.split(":")
    if remainder:
        seconds = int(remainder[0])
    else:
        seconds = 0
    return 3600 * int(hours) + 60 * int(minutes) + seconds

def buses_by_stop_route_dir_within_radius(
    lon: float,
    lat: float,
    start_time: str,       # "HH:MM" or "HH:MM:SS"
    end_time: str,         # "HH:MM" or "HH:MM:SS"
    day_type: str,         # "Weekday" | "Saturday" | "Sunday"
    radius_ft: int = 250,
    selected_feeds: list[str] | None = None,
    con: duckdb.DuckDBPyConnection | None = None,
) -> pd.DataFrame:
    """
    Count scheduled buses per (route, direction, stop) near a point.

    Returns one row per (feed_id, route_id, direction_id, stop_id) for stops
    within ``radius_ft`` of the query point, with stop name + lat/lon and the
    number of stop events in the inclusive time window.
    Handles midnight-spanning windows (e.g., 23:30–00:30).

    Args:
        lon, lat: Query point in EPSG:4326; it is projected to EPSG:2263 and
            compared against the stops' x2263 / y2263 columns.
        start_time, end_time: Window bounds, parsed with ``to_sec``. When
            ``end_time`` is earlier than ``start_time`` the window is treated
            as wrapping past midnight.
        day_type: "Weekday", "Saturday" or "Sunday". Any other value matches
            no service and yields an empty result.
        radius_ft: Search radius, in the units of the x2263 / y2263 columns.
        selected_feeds: feed_ids to restrict to. ``None`` or an empty list
            means all feeds.
        con: DuckDB connection to run on. When ``None``, a temporary
            in-memory connection is opened and closed again; the query reads
            the Parquet files under ``PARQ_BASE`` directly, so it needs no
            pre-existing views.

    Returns:
        DataFrame with columns feed_id, route_id, direction_id, stop_id,
        stop_name, stop_lat, stop_lon, buses_scheduled.
    """

    # project the query point to EPSG:2263 (NY state plane feet)
    x0, y0 = Transformer.from_crs("EPSG:4326", "EPSG:2263", always_xy=True).transform(lon, lat)
    win_start, win_end = to_sec(start_time), to_sec(end_time)

    # define placeholders for feeds selection
    sel = list(selected_feeds or [])
    if sel:
        values = ",".join(["(?)"] * len(sel))           # -> "(?),(?),(?)"
        chosen_cte = f"chosen_feeds(feed_id) AS (VALUES {values}),"
        feed_pred = "feed_id IN (SELECT feed_id FROM chosen_feeds)"
    else:
        chosen_cte = ""                                  # no CTE
        feed_pred = "TRUE"                               # no filter = all feeds

    # Placeholders are positional, so `params` below must follow the order in
    # which the `?` marks appear in this text:
    # chosen_feeds → svcs → win → near_stops.
    sql = f"""
    WITH
    {chosen_cte}
    dim_stops AS (SELECT * FROM read_parquet('{PARQ_BASE}/dim_stops/*.parquet')),
    dim_trips  AS (SELECT * FROM read_parquet('{PARQ_BASE}/dim_trips/*.parquet')),
    dim_routes AS (SELECT * FROM read_parquet('{PARQ_BASE}/dim_routes/*.parquet')),
    calendar_base AS (SELECT * FROM read_parquet('{PARQ_BASE}/calendar_base/*.parquet')),
    fact_stop_events AS (SELECT * FROM read_parquet('{PARQ_BASE}/fact_stop_events/*.parquet')),
    svcs AS (
      SELECT DISTINCT feed_id, service_id
      FROM calendar_base
      WHERE {feed_pred}
      AND (
        (? = 'Weekday'  AND (monday=1 OR tuesday=1 OR wednesday=1 OR thursday=1 OR friday=1))
        OR (? = 'Saturday' AND saturday=1)
        OR (? = 'Sunday'   AND sunday=1)
        )
    ),
    win AS (SELECT ?::INTEGER AS win_start, ?::INTEGER AS win_end),
    near_stops AS (
      SELECT feed_id, stop_id, stop_name, lat, lon
      FROM dim_stops
      WHERE {feed_pred}
      AND ((x2263 - ?)*(x2263 - ?) + (y2263 - ?)*(y2263 - ?)) <= ?*?
    )
    SELECT
      r.feed_id,
      r.route_id,
      t.direction_id,
      s.stop_id,
      s.stop_name,
      s.lat  AS stop_lat,
      s.lon  AS stop_lon,
      COUNT(*) AS buses_scheduled
    FROM fact_stop_events f
    JOIN dim_trips  t ON f.feed_id = t.feed_id AND f.trip_id = t.trip_id
    JOIN dim_routes r ON t.feed_id = r.feed_id AND t.route_id = r.route_id
    JOIN svcs       v ON f.feed_id = v.feed_id AND f.service_id = v.service_id
    JOIN near_stops s ON f.feed_id = s.feed_id AND f.stop_id   = s.stop_id
    CROSS JOIN win w
    WHERE
      (
        w.win_end >= w.win_start
        AND f.arrival_sec BETWEEN w.win_start AND w.win_end
      )
      OR
      (
        w.win_end < w.win_start   -- midnight wrap
        AND (
          -- late evening, plus after-midnight arrivals stored as 24:00:00+,
          -- capped at the window end on the following day
          f.arrival_sec BETWEEN w.win_start AND w.win_end + 86400
          -- early-morning arrivals stored as 00:00:00+
          OR f.arrival_sec <= w.win_end
        )
      )
    GROUP BY r.feed_id, r.route_id, t.direction_id, s.stop_id, s.stop_name, s.lat, s.lon
    ORDER BY s.stop_name, r.feed_id, r.route_id, t.direction_id;
    """

    # Only the feed values are conditional. The day-type, window and spatial
    # placeholders are always present in the SQL, so they must always be bound
    # (they used to be appended only when a feed filter was given).
    params = []
    if sel:
        params += sel                                # feeds for chosen_feeds CTE (once)
    params += [day_type, day_type, day_type]         # 3 day-type placeholders
    params += [win_start, win_end]                   # window
    params += [x0, x0, y0, y0, int(radius_ft), int(radius_ft)]  # spatial

    # Fall back to a throwaway in-memory connection when none was passed in.
    owns_con = con is None
    if owns_con:
        con = duckdb.connect()
    try:
        return con.execute(sql, params).fetchdf()
    finally:
        if owns_con:
            con.close()

In [None]:
# --- inputs you can tweak ---
lon, lat = -73.998522, 40.745306      # intersection point
radius_ft = 250                       # search radius around the point
day_type = "Weekday"                  # "Weekday" | "Saturday" | "Sunday"
# every feed_id found in dim_routes (needs `feeds` from the earlier cell)
selected_feeds = feeds["feed_id"].tolist()  # or specify ['mta-nyct-bus', ...]

# Buses scheduled between 07:45 and 08:45 at stops within radius_ft of the point
buses_by_stop_route_dir_within_radius(lon, lat, "07:45", "08:45", day_type, radius_ft, selected_feeds, con)

Unnamed: 0,feed_id,route_id,direction_id,stop_id,stop_name,stop_lat,stop_lon,buses_scheduled
0,mta-nyct-bus-manhattan,M20,0,405623,8 AV/W 23 ST,40.744919,-73.998656,6
1,mta-nyct-bus-manhattan,M23+,0,402126,W 23 ST/8 AV,40.745059,-73.998141,15
2,mta-nyct-bus-manhattan,M23+,1,402154,W 23 ST/8 AV,40.745547,-73.998888,14


In [22]:
# Example intersection (lon, lat)
lon, lat = -73.998522, 40.745306

# Project the point to EPSG:2263 so it can be compared with x2263 / y2263.
# always_xy=True → transform() takes (lon, lat) and returns (x, y).
from pyproj import Transformer
tf = Transformer.from_crs("EPSG:4326", "EPSG:2263", always_xy=True)
x0, y0 = tf.transform(lon, lat)

# Stops whose squared distance to (x0, y0) is at most 250*250.
# Same spatial predicate as the near_stops CTE in
# buses_by_stop_route_dir_within_radius, with no feed or time filter.
nearby = con.execute("""
SELECT stop_id, stop_name, lat, lon
FROM dim_stops
WHERE (x2263 - ?)*(x2263 - ?) + (y2263 - ?)*(y2263 - ?) <= ?*?
""", [x0, x0, y0, y0, 250, 250]).fetchdf()
nearby

Unnamed: 0,stop_id,stop_name,lat,lon
0,402126,W 23 ST/8 AV,40.745059,-73.998141
1,402154,W 23 ST/8 AV,40.745547,-73.998888
2,405623,8 AV/W 23 ST,40.744919,-73.998656
