# Station Trip Counts per Hour

Computes trip counts per hour (0–23) for each station and route from raw trip data, binned by day, month, and year. Station counts split into incoming (destination) and outgoing (origin).

**Outputs:**
- `prepared-data/stations/station_totals.parquet` – station_id, in, out
- `prepared-data/stations/station_by_year.parquet` – period, station_id, direction, hour, count
- `prepared-data/stations/station_by_month.parquet` – period, station_id, direction, hour, count
- `prepared-data/stations/station_by_day_YYYY.parquet` – by_day (yearly Parquet files)
- `prepared-data/stations/station_trip_counts_meta.json` – last_execution metadata
- `prepared-data/routes/route_pair_counts.parquet` – flat totals (route_key, count)
- `prepared-data/routes/route_by_year.parquet` – year-binned (period, route_key, hour, count)
- `prepared-data/routes/route_by_month.parquet` – month-binned (period, route_key, hour, count)
- `prepared-data/routes/route_by_day_YYYY.parquet` – by_day (yearly Parquet files)
- `prepared-data/routes/route_trip_counts_meta.json` – last_execution metadata

Data stays separate from routes for use in various frontend visualizations.

## Setup

In [1]:
# =============================================================================
# SETUP: Paths, execution banner
# =============================================================================
from pathlib import Path
import json
import sys
from collections import defaultdict
from datetime import datetime

cwd = Path.cwd()
project_root = cwd if (cwd / "package.json").exists() else cwd.parent.parent
raw_dir = project_root / "raw-data"
prepared_dir = project_root / "prepared-data"
stations_dir = prepared_dir / "stations"
routes_dir = prepared_dir / "routes"
stations_dir.mkdir(parents=True, exist_ok=True)
routes_dir.mkdir(parents=True, exist_ok=True)

sys.path.insert(0, str(project_root / "data-pipeline"))
from execution_utils import show_execution_banner, write_with_execution_metadata

out_path = stations_dir / "station_trip_counts_meta.json"
_pipeline_start_time = show_execution_banner(out_path)

print("Project root:", project_root)
print("Raw data:", raw_dir)
print("Output:", stations_dir)

No previous execution info (file does not exist yet).
Project root: c:\Users\Nicol\Desktop\INF252-Course-Project
Raw data: c:\Users\Nicol\Desktop\INF252-Course-Project\raw-data
Output: c:\Users\Nicol\Desktop\INF252-Course-Project\prepared-data\stations


## Load raw trips and compute counts

In [2]:
# =============================================================================
# Load raw trips, count per station per hour (in/out split), binned by day/month/year
# Station: period -> station_id -> "in"|"out" -> hour -> count
# Route: period -> route_key -> hour -> count, plus flat pair_counts
# =============================================================================
station_by_year = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int))))
station_by_month = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int))))
station_by_day = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int))))
station_totals = defaultdict(lambda: {"in": 0, "out": 0})
route_by_year = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
route_by_month = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
route_by_day = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
pair_counts = defaultdict(int)  # flat totals: "origin_id|dest_id" -> count

if not raw_dir.exists():
    raise FileNotFoundError("raw-data/ not found. Run npm run download first.")

total_trips = 0
for year_dir in sorted(raw_dir.iterdir()):
    if not year_dir.is_dir():
        continue
    year_str = year_dir.name
    for json_path in sorted(year_dir.glob("*.json")):
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        trips = data if isinstance(data, list) else data.get("data", data.get("trips", []))
        for t in trips:
            started_at = t.get("started_at")
            if not started_at:
                continue
            try:
                dt = datetime.fromisoformat(started_at.replace("Z", "+00:00"))
            except (ValueError, TypeError):
                continue
            hour = dt.hour
            year = str(dt.year)
            month = f"{dt.year:04d}-{dt.month:02d}"
            day = f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}"
            start_id = str(t.get("start_station_id", "")).strip()
            end_id = str(t.get("end_station_id", "")).strip()
            if start_id:
                station_by_year[year][start_id]["out"][hour] += 1
                station_by_month[month][start_id]["out"][hour] += 1
                station_by_day[day][start_id]["out"][hour] += 1
                station_totals[start_id]["out"] += 1
            if end_id and end_id != start_id:
                station_by_year[year][end_id]["in"][hour] += 1
                station_by_month[month][end_id]["in"][hour] += 1
                station_by_day[day][end_id]["in"][hour] += 1
                station_totals[end_id]["in"] += 1
            if start_id and end_id and start_id != end_id:
                route_key = f"{start_id}|{end_id}"
                route_by_year[year][route_key][hour] += 1
                route_by_month[month][route_key][hour] += 1
                route_by_day[day][route_key][hour] += 1
                pair_counts[route_key] += 1
            total_trips += 1

print(f"Processed {total_trips:,} trips")
print(f"Stations: {len(station_by_year)} years, {len(station_by_month)} months, {len(station_by_day)} days")
print(f"Routes: {len(pair_counts)} pairs, {len(route_by_year)} years")

Processed 10,034,294 trips
Stations: 8 years, 80 months, 2398 days
Routes: 80805 pairs, 8 years


## Prune zeros and write output

In [3]:
# =============================================================================
# Prune zeros (omit keys with count 0), convert to JSON-serializable dict
# =============================================================================
import pandas as pd

def prune_zeros(nested: dict) -> dict:
    """For routes: period -> route_key -> hour -> count. Omit zero hours."""
    result = {}
    for period, entities in nested.items():
        result[period] = {}
        for entity_id, hours in entities.items():
            pruned = {str(h): c for h, c in hours.items() if c > 0}
            if pruned:
                result[period][entity_id] = pruned
    return result


def prune_station_zeros(nested: dict) -> dict:
    """For stations: period -> station_id -> "in"|"out" -> hour -> count. Omit zeros."""
    result = {}
    for period, stations in nested.items():
        result[period] = {}
        for station_id, dirs in stations.items():
            pruned_in = {str(h): c for h, c in dirs.get("in", {}).items() if c > 0}
            pruned_out = {str(h): c for h, c in dirs.get("out", {}).items() if c > 0}
            if pruned_in or pruned_out:
                result[period][station_id] = {}
                if pruned_in:
                    result[period][station_id]["in"] = pruned_in
                if pruned_out:
                    result[period][station_id]["out"] = pruned_out
    return result


def prune_totals(totals: dict) -> dict:
    """Station totals: {sid: {"in": x, "out": y}}. Omit if both zero."""
    return {
        sid: {"in": d["in"], "out": d["out"]}
        for sid, d in totals.items()
        if d["in"] or d["out"]
    }


def flatten_route_binned(nested: dict) -> pd.DataFrame:
    """Flatten route binned dict (period -> route_key -> hour -> count) to DataFrame for Parquet."""
    rows = []
    for period, routes in nested.items():
        for route_key, hours in routes.items():
            for hour, count in hours.items():
                if count > 0:
                    rows.append({"period": period, "route_key": route_key, "hour": int(hour), "count": count})
    return pd.DataFrame(rows) if rows else pd.DataFrame(columns=["period", "route_key", "hour", "count"])


def flatten_station_binned(nested: dict) -> pd.DataFrame:
    """Flatten station binned dict (period -> station_id -> in/out -> hour -> count) to DataFrame for Parquet."""
    rows = []
    for period, stations in nested.items():
        for station_id, dirs in stations.items():
            for direction, hours in dirs.items():
                for hour, count in hours.items():
                    if count > 0:
                        rows.append({"period": period, "station_id": station_id, "direction": direction, "hour": int(hour), "count": count})
    return pd.DataFrame(rows) if rows else pd.DataFrame(columns=["period", "station_id", "direction", "hour", "count"])


# Clean up legacy files at prepared-data root and in subfolders
for old in list(prepared_dir.glob("station_trip_counts*.json")) + list(prepared_dir.glob("route_trip_counts*.json")):
    if old.is_file():
        old.unlink()
        print(f"Removed legacy {old.relative_to(prepared_dir)}")
for old in list(stations_dir.glob("station_trip_counts*.json")):
    if old.is_file():
        old.unlink()
        print(f"Removed legacy stations/{old.name}")


# Station outputs (Parquet format to stations/)
totals_pruned = prune_totals(station_totals)
totals_df = pd.DataFrame([{"station_id": sid, "in": d["in"], "out": d["out"]} for sid, d in totals_pruned.items()])
totals_df.to_parquet(stations_dir / "station_totals.parquet", index=False)
print(f"Wrote {stations_dir / 'station_totals.parquet'} ({len(totals_df)} stations)")

station_by_year_pruned = prune_station_zeros(station_by_year)
df_year = flatten_station_binned(station_by_year_pruned)
df_year.to_parquet(stations_dir / "station_by_year.parquet", index=False)
print(f"Wrote {stations_dir / 'station_by_year.parquet'} ({len(df_year)} rows)")

station_by_month_pruned = prune_station_zeros(station_by_month)
df_month = flatten_station_binned(station_by_month_pruned)
df_month.to_parquet(stations_dir / "station_by_month.parquet", index=False)
print(f"Wrote {stations_dir / 'station_by_month.parquet'} ({len(df_month)} rows)")

station_by_day_pruned = prune_station_zeros(station_by_day)
station_years = sorted({day[:4] for day in station_by_day_pruned})
for year in station_years:
    year_data = {day: station_by_day_pruned[day] for day in station_by_day_pruned if day.startswith(year)}
    df_day = flatten_station_binned(year_data)
    year_path = stations_dir / f"station_by_day_{year}.parquet"
    df_day.to_parquet(year_path, index=False)
    print(f"Wrote {year_path} ({len(df_day)} rows)")

station_meta_path = stations_dir / "station_trip_counts_meta.json"
write_with_execution_metadata(station_meta_path, {}, _pipeline_start_time)
print(f"Wrote {station_meta_path}")

# Route outputs (Parquet format to routes/)
# Remove legacy JSON files if present
for old in [routes_dir / "route_trip_counts.json"] + list(routes_dir.glob("route_trip_counts_by_day_*.json")):
    if old.exists():
        old.unlink()
        print(f"Removed legacy {old.name}")

pair_df = pd.DataFrame([{"route_key": k, "count": v} for k, v in pair_counts.items() if v > 0])
pair_df.to_parquet(routes_dir / "route_pair_counts.parquet", index=False)
print(f"Wrote {routes_dir / 'route_pair_counts.parquet'} ({len(pair_df)} routes)")

df_year = flatten_route_binned(prune_zeros(route_by_year))
df_year.to_parquet(routes_dir / "route_by_year.parquet", index=False)
print(f"Wrote {routes_dir / 'route_by_year.parquet'} ({len(df_year)} rows)")

df_month = flatten_route_binned(prune_zeros(route_by_month))
df_month.to_parquet(routes_dir / "route_by_month.parquet", index=False)
print(f"Wrote {routes_dir / 'route_by_month.parquet'} ({len(df_month)} rows)")

route_by_day_pruned = prune_zeros(route_by_day)
route_years = sorted({day[:4] for day in route_by_day_pruned})
for year in route_years:
    year_data = {day: route_by_day_pruned[day] for day in route_by_day_pruned if day.startswith(year)}
    df_day = flatten_route_binned(year_data)
    year_path = routes_dir / f"route_by_day_{year}.parquet"
    df_day.to_parquet(year_path, index=False)
    print(f"Wrote {year_path} ({len(df_day)} rows)")

# Route metadata (last_execution only)
route_meta_path = routes_dir / "route_trip_counts_meta.json"
write_with_execution_metadata(route_meta_path, {}, _pipeline_start_time)
print(f"Wrote {route_meta_path}")

Removed legacy stations/station_trip_counts.json
Removed legacy stations/station_trip_counts_by_day_2019.json
Removed legacy stations/station_trip_counts_by_day_2020.json
Removed legacy stations/station_trip_counts_by_day_2021.json
Removed legacy stations/station_trip_counts_by_day_2022.json
Removed legacy stations/station_trip_counts_by_day_2023.json
Removed legacy stations/station_trip_counts_by_day_2024.json
Removed legacy stations/station_trip_counts_by_day_2025.json
Removed legacy stations/station_trip_counts_by_day_2026.json
Wrote c:\Users\Nicol\Desktop\INF252-Course-Project\prepared-data\stations\station_totals.parquet (292 stations)
Wrote c:\Users\Nicol\Desktop\INF252-Course-Project\prepared-data\stations\station_by_year.parquet (82156 rows)
Wrote c:\Users\Nicol\Desktop\INF252-Course-Project\prepared-data\stations\station_by_month.parquet (718694 rows)
Wrote c:\Users\Nicol\Desktop\INF252-Course-Project\prepared-data\stations\station_by_day_2019.parquet (1346816 rows)
Wrote c:\U