# Data Pipeline Methodology

> Ahmed Hasan (7932883) — COMP 4710 Group 11, Winnipeg PTN Analysis

This notebook documents the data sources, pipeline architecture, validation checks, and frequency-analysis outputs drawn from the project's DuckDB database.


## 1) Data Sources

| Source Category | Dataset / Feed | ID | Purpose | Access |
|---|---|---|---|---|
| GTFS (Current) | Winnipeg Transit GTFS zip | n/a | Base schedule network (stops, routes, trips, stop_times, shapes, calendars) | https://gtfs.winnipegtransit.com/google_transit.zip |
| GTFS (Historical) | Transitland feed versions | n/a | Pre/post PTN comparison when available | https://www.transit.land/terms |
| Winnipeg Open Data | Neighbourhoods | `8k6x-xxsy` | Coverage boundaries | https://data.winnipeg.ca |
| Winnipeg Open Data | Community Areas | `gfvw-fk34` | Coarser aggregation boundaries | https://data.winnipeg.ca |
| Winnipeg Open Data | Cycling Network | `kjd9-dvf5` | Active mobility context | https://data.winnipeg.ca |
| Winnipeg Open Data | Pass-ups | `mer2-irmb` | Service quality events | https://data.winnipeg.ca |
| Winnipeg Open Data | On-time Performance | `gp3k-am4u` | Deviation/reliability analysis | https://data.winnipeg.ca |
| Winnipeg Open Data | Passenger Counts | `bv6q-du26` | Ridership proxy | https://data.winnipeg.ca |

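The Open Data IDs in the table can be turned into download URLs. The sketch below assumes the portal exposes the usual Socrata-style `/resource/<id>.json` endpoint with `$limit`/`$offset` paging; the helper name `resource_url` and the dictionary keys are illustrative and not part of the project code.

```python
from urllib.parse import urlencode

# Dataset IDs copied from the table above; the keys are illustrative labels.
DATASET_IDS = {
    "neighbourhoods": "8k6x-xxsy",
    "community_areas": "gfvw-fk34",
    "cycling_network": "kjd9-dvf5",
    "pass_ups": "mer2-irmb",
    "on_time_performance": "gp3k-am4u",
    "passenger_counts": "bv6q-du26",
}

PORTAL = "https://data.winnipeg.ca"


def resource_url(dataset_key: str, limit: int = 1000, offset: int = 0) -> str:
    """Build a paged JSON URL (assumes a Socrata-style /resource/<id>.json endpoint)."""
    dataset_id = DATASET_IDS[dataset_key]
    # safe="$" keeps the "$" in the paging parameter names unescaped.
    query = urlencode({"$limit": limit, "$offset": offset}, safe="$")
    return f"{PORTAL}/resource/{dataset_id}.json?{query}"


print(resource_url("pass_ups", limit=5))
```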
PTN launch date: **June 29, 2025**.

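For pre/post comparisons, each service date has to be assigned to one side of the launch. A minimal sketch, assuming the launch day itself counts as post-PTN (the function name `ptn_period` and the labels are hypothetical):

```python
from datetime import date

PTN_LAUNCH = date(2025, 6, 29)  # launch date stated above


def ptn_period(service_date: date) -> str:
    """Label a service date; assumption: the launch day counts as post-PTN."""
    return "post_ptn" if service_date >= PTN_LAUNCH else "pre_ptn"


for d in (date(2025, 6, 28), date(2025, 6, 29), date(2025, 9, 1)):
    print(d.isoformat(), ptn_period(d))
```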

## 2) Technology Stack

| Component | Role in Project |
|---|---|
| DuckDB | Embedded analytical database for storage, SQL transforms, and querying |
| DuckDB Spatial extension | Geospatial SQL operations |
| NetworkX | Graph analytics on transit network edges/stops |
| `gtfs-kit` | GTFS feed/service-date utilities |
| `gtfs-functions` | GTFS helper workflows |
| `pandera` | Data validation |
| `keplergl` | Interactive map visualization (downstream) |


## 3) Pipeline Architecture

1. **Ingest**: Download GTFS + Open Data into `data/raw/`.
2. **Load**: Materialize raw tables in DuckDB (`raw_gtfs_*`, `raw_open_data_*`, and boundary tables such as `raw_neighbourhoods`).
3. **Transform (SQL-first)**: Build `agg_*`, `ref_*`, `v_*` via SQL scripts under `ptn_analysis/data/sql/`.
4. **Consume**: Analysis modules read raw/agg/ref/view tables.

Primary command: `make data`

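The table prefixes in the steps above double as a layer label. The sketch below groups table names by that convention; the layer names and both helper functions are illustrative, not part of `ptn_analysis`.

```python
# Prefix-to-layer mapping based on the naming convention described above.
LAYER_PREFIXES = {
    "raw_": "raw",
    "agg_": "aggregate",
    "ref_": "reference",
    "v_": "view",
}


def table_layer(table_name: str) -> str:
    """Return the pipeline layer implied by a table's prefix."""
    # str.startswith matches the underscore literally (unlike SQL LIKE,
    # where "_" is a single-character wildcard).
    for prefix, layer in LAYER_PREFIXES.items():
        if table_name.startswith(prefix):
            return layer
    return "unclassified"


def group_by_layer(table_names):
    """Group table names into {layer: [sorted names]}."""
    grouped = {}
    for name in sorted(table_names):
        grouped.setdefault(table_layer(name), []).append(name)
    return grouped


print(group_by_layer([
    "raw_gtfs_stops",
    "agg_stops_per_neighbourhood",
    "ref_route_mapping",
    "v_route_performance",
]))
```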

## 4) Setup

Run this cell first. It locates the repository root, resolves the absolute path to the DuckDB file under `data/processed/`, and opens a connection with the spatial extension loaded.


In [None]:
from pathlib import Path
import sys
import duckdb

repo_root = Path.cwd()
while repo_root != repo_root.parent and not (repo_root / 'ptn_analysis').exists():
    repo_root = repo_root.parent
if not (repo_root / 'ptn_analysis').exists():
    raise RuntimeError('Could not find repo root containing ptn_analysis.')

repo_root_str = str(repo_root)
sys.path = [p for p in sys.path if p != repo_root_str]
sys.path.insert(0, repo_root_str)

absolute_db_path = (repo_root / 'data/processed/wpg_transit.duckdb').resolve()
if not absolute_db_path.exists():
    raise FileNotFoundError(f'DuckDB file missing: {absolute_db_path}. Run `make data`.')

con = duckdb.connect(str(absolute_db_path))
con.execute('INSTALL spatial; LOAD spatial;')

print(f'DB path: {absolute_db_path}')
database_connections = con.execute('PRAGMA database_list').fetchdf()
database_connections = database_connections.rename(columns={
    'seq': 'connection_sequence',
    'name': 'database_alias',
    'file': 'database_file_path',
})
display(database_connections)
print('Connected file (full path):', database_connections.loc[0, 'database_file_path'])


## 5) Data Status


In [None]:
tables = [
    ('raw_gtfs_stops', 'Transit Stops'),
    ('raw_gtfs_routes', 'Routes'),
    ('raw_gtfs_trips', 'Trips'),
    ('raw_gtfs_stop_times', 'Stop Times'),
    ('raw_gtfs_edges_weighted', 'Network Edges'),
    ('raw_neighbourhoods', 'Neighbourhoods'),
    ('raw_community_areas', 'Community Areas'),
    ('raw_open_data_pass_ups', 'Pass-up Data'),
    ('raw_open_data_on_time', 'On-time Data'),
    ('raw_open_data_passenger_counts', 'Passenger Counts'),
    ('agg_stops_per_neighbourhood', 'Coverage by Neighbourhood'),
    ('agg_stops_per_community', 'Coverage by Community'),
    ('agg_route_passups_summary', 'Route Pass-up Summary'),
    ('agg_route_ontime_summary', 'Route On-time Summary'),
    ('agg_stop_ontime_summary', 'Stop On-time Summary'),
    ('ref_route_mapping', 'Route Mapping'),
    ('ref_stop_mapping', 'Stop Mapping'),
    ('v_route_performance', 'Route Performance View'),
    ('v_stop_performance', 'Stop Performance View'),
]

print('=' * 56)
print('DATA PIPELINE STATUS')
print('=' * 56)

for table, name in tables:
    try:
        count = con.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0]
        print(f'{name:.<40} {count:>12,} rows')
    except duckdb.Error:
        # A missing table raises a DuckDB error; unrelated bugs still surface
        print(f'{name:.<40} NOT LOADED')

print('\nTable prefix totals:')
for prefix in ('raw_', 'agg_', 'ref_', 'v_'):
    # starts_with() is used instead of LIKE because '_' is a single-character wildcard in LIKE
    names = [
        r[0] for r in con.execute(
            "SELECT table_name FROM information_schema.tables WHERE table_schema='main' AND starts_with(table_name, ?) ORDER BY table_name",
            [prefix]
        ).fetchall()
    ]
    total_rows = 0
    for table_name in names:
        total_rows += con.execute(f'SELECT COUNT(*) FROM {table_name}').fetchone()[0]
    print(f'  {prefix:<4} total rows (sum of table counts): {total_rows:,}')

print('\nTable counts by prefix:')
for prefix in ('raw_', 'agg_', 'ref_', 'v_'):
    # starts_with() matches the prefix literally; LIKE would treat '_' as a wildcard
    table_count = con.execute(
        "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema='main' AND starts_with(table_name, ?)",
        [prefix]
    ).fetchone()[0]
    print(f'  {prefix:<4} tables: {table_count}')


In [None]:
# Report the GTFS feed date range from feed_info, if it was loaded
try:
    feed = con.execute('SELECT feed_start_date, feed_end_date FROM raw_gtfs_feed_info').fetchone()
    if feed:
        print(f'GTFS Feed Period: {feed[0]} to {feed[1]}')
    else:
        print('Feed info table is empty')
except duckdb.Error:
    print('Feed info not loaded')

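Printing the feed period does not by itself confirm that the feed covers the dates of interest. A possible follow-up check is sketched below; it assumes the dates arrive either as `date` objects or in the GTFS `YYYYMMDD` form, and the helper names and sample dates are made up for illustration.

```python
from datetime import date, datetime

PTN_LAUNCH = date(2025, 6, 29)  # launch date from the Data Sources section


def parse_gtfs_date(value) -> date:
    """Accept a date/datetime object or a GTFS-style 'YYYYMMDD' string or int."""
    if isinstance(value, datetime):  # check datetime first: it subclasses date
        return value.date()
    if isinstance(value, date):
        return value
    return datetime.strptime(str(value), "%Y%m%d").date()


def feed_covers(feed_start, feed_end, target: date = PTN_LAUNCH) -> bool:
    """True when the target date falls inside the feed period (inclusive)."""
    return parse_gtfs_date(feed_start) <= target <= parse_gtfs_date(feed_end)


# Hypothetical feed period, not the real feed's values.
print(feed_covers("20250601", "20250831"))
```

In the notebook, the two values returned by the `raw_gtfs_feed_info` query could be passed straight to `feed_covers`.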

## 6) Frequency Analysis (Ahmed)


In [None]:
import importlib
import ptn_analysis.analysis.frequency as frequency_module

importlib.reload(frequency_module)

summary_stats = frequency_module.get_frequency_summary(con=con)
print('Frequency Summary (Network-wide):')
for metric_name, metric_value in summary_stats.items():
    if isinstance(metric_value, float):
        print(f'  {metric_name}: {metric_value:.2f}')
    else:
        print(f'  {metric_name}: {metric_value:,}')

print('\nTop 10 routes by trip departures:')
route_frequency = frequency_module.compute_route_frequency(con=con).copy()
route_frequency = route_frequency.rename(columns={
    'route_id': 'gtfs_route_id',
    'total_trips': 'total_trip_departures',
    'peak_trips': 'peak_period_trip_departures',
    'avg_headway_peak': 'peak_period_avg_headway_minutes',
    'avg_headway_offpeak': 'midday_avg_headway_minutes',
})
route_frequency_columns = [
    'route_name',
    'gtfs_route_id',
    'total_trip_departures',
    'peak_period_trip_departures',
    'peak_period_avg_headway_minutes',
    'midday_avg_headway_minutes',
    'service_span_hours',
]
available_route_frequency_columns = [column for column in route_frequency_columns if column in route_frequency.columns]
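# Sort explicitly so the "top 10" label holds regardless of the module's row order
if 'total_trip_departures' in route_frequency.columns:
    route_frequency = route_frequency.sort_values('total_trip_departures', ascending=False)
display(route_frequency[available_route_frequency_columns].head(10))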

print('\nHourly departure profile (all routes):')
hourly_profile = frequency_module.get_hourly_profile(con=con).copy()
hourly_profile = hourly_profile.rename(columns={'hour': 'service_hour', 'trip_count': 'trips_departing'})
display(hourly_profile)

print('\nRoute direction labels (derived from trip headsign):')
route_direction_labels = frequency_module.get_route_direction_labels(con=con).copy()
route_direction_labels = route_direction_labels.rename(columns={'route_id': 'gtfs_route_id', 'direction_id': 'direction_code'})
display(route_direction_labels.head(20))

print('\nTrips by route, direction, and service hour (sample):')
trips_per_hour = frequency_module.compute_trips_per_hour(con=con).copy()
trips_per_hour = trips_per_hour.rename(columns={
    'route_id': 'gtfs_route_id',
    'hour': 'service_hour',
    'trip_count': 'trips_departing',
    'direction_id': 'direction_code',
})
display(trips_per_hour.head(20))

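To make the headway and trips-per-hour columns concrete, the sketch below computes both from a list of GTFS departure times. It is an illustration of the general idea under simple assumptions (one route, one direction, one service day), not the implementation inside `ptn_analysis.analysis.frequency`; the function names and sample times are made up.

```python
from collections import Counter


def gtfs_time_to_seconds(value: str) -> int:
    """Convert 'HH:MM:SS' to seconds; GTFS allows hours >= 24 for after-midnight trips."""
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def trips_per_hour(departures):
    """Count departures per service hour (hour 25 means 1 a.m. the next day)."""
    return Counter(gtfs_time_to_seconds(t) // 3600 for t in departures)


def average_headway_minutes(departures):
    """Mean gap between consecutive departures, in minutes; None if fewer than two."""
    times = sorted(gtfs_time_to_seconds(t) for t in departures)
    if len(times) < 2:
        return None
    gaps = [later - earlier for earlier, later in zip(times, times[1:])]
    return sum(gaps) / len(gaps) / 60


# Made-up departures for one route and direction.
sample = ["07:00:00", "07:10:00", "07:20:00", "07:40:00"]
print(trips_per_hour(sample))
print(average_headway_minutes(sample))
```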

## 7) References

- GTFS reference: https://gtfs.org/schedule/reference/
- Winnipeg Open Data portal: https://data.winnipeg.ca
- DuckDB docs: https://duckdb.org/docs/
- Transitland terms: https://www.transit.land/terms

---
Prepared by: **Ahmed Hasan (7932883)**
