# 🚍 NYC Bus Reliability vs Weather, Traffic, and Service Alerts

![Banner](./assets/banner.jpeg)

**Goal**: Quantify how weather conditions, traffic congestion, and service disruptions affect **NYC bus reliability**, measured via **Wait Assessment (WA)** — the share of observed trips that meet scheduled headways.

This notebook fulfills **Checkpoint 2: Exploratory Data Analysis & Visualization** for IT4063C Data Technologies Analytics.

## 🧭 Project Overview
NYC buses operate on city streets, making them vulnerable to **weather**, **congestion**, and **service alerts**. Reliable service is critical for millions of daily riders. This analysis explores how environmental and operational factors influence reliability.

## ❓ Research Question
**How do precipitation, snowfall, temperature, traffic speeds, and MTA bus alerts relate to monthly NYC bus Wait Assessment (2020–2024)?**

**Hypothesis:** Heavy rain, snow, and traffic congestion reduce Wait Assessment (lower reliability). Months with higher alert volumes (detours, delays) will also show lower WA.

## 🗂️ Data Sources
- **MTA Bus Performance (Wait Assessment)** — `data/bus_data.csv`
- **NOAA GHCN-Daily – Central Park (USW00094728)** — `data/weather_data.csv`
- **NYC DOT Traffic API (JSON)** — https://data.cityofnewyork.us/resource/i4gi-tjb9.json
- **MTA Bus Alerts Feed (Protocol Buffers)** — https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fbus-alerts

These sources represent **three acquisition methods**: CSV, JSON API, and Protocol Buffers. See `data_types.md` for detailed schema definitions.

In [None]:
import os, sys, json, math, textwrap, time, warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import requests

from google.transit import gtfs_realtime_pb2

warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', 200)
sns.set(style='whitegrid')

print("[INFO] Notebook start at:", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print("[INFO] Python:", sys.version)
print("[INFO] Pandas:", pd.__version__)
print("[INFO] Using working directory:", os.getcwd())

## ⚙️ Configuration & Utility Helpers
Config lives here to centralize retry/backoff, URLs, and required environment variables. The helper functions add explicit debug prints and hard assertions so failures are self-explanatory.

In [None]:
class Backoff:
    @staticmethod
    def sleep(i):
        # Exponential backoff with jitter
        delay = min(2 ** i + np.random.rand(), 10)
        print(f"[DEBUG] Backoff sleeping {delay:.2f}s before retry {i+1}")
        time.sleep(delay)

CFG = {
    'traffic_url': 'https://data.cityofnewyork.us/resource/i4gi-tjb9.json',
    'mta_alerts_url': 'https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fbus-alerts',
    'months_back_for_traffic': 24,
    'csv_bus': 'data/bus_data.csv',
    'csv_weather': 'data/weather_data.csv'
}

def require_file(path):
    assert os.path.exists(path), f"[FATAL] Required file missing: {path}. Place it in the repo (see Data Sources)."
    print(f"[OK] Found file: {path}")

def require_env(varname, optional=False):
    val = os.getenv(varname)
    if not val:
        msg = f"[{'WARN' if optional else 'FATAL'}] Env var {varname} is {'recommended' if optional else 'required'}"
        if optional:
            print(msg)
        else:
            raise EnvironmentError(msg)
    else:
        print(f"[OK] Env var {varname} present ({'optional' if optional else 'required'})")
    return val

def nonempty_df(df, name="<df>"):
    assert isinstance(df, pd.DataFrame), f"[FATAL] {name} is not a DataFrame"
    assert len(df) > 0, f"[FATAL] {name} is empty"
    print(f"[OK] {name}: shape={df.shape}")
    return df

def must_have_cols(df, cols, name="<df>"):
    missing = [c for c in cols if c not in df.columns]
    assert not missing, f"[FATAL] {name} is missing columns: {missing}\nAvailable: {list(df.columns)}"
    print(f"[OK] {name} has required columns: {cols}")
    return df

def print_range(df, col, name):
    if col in df:
        m, M = df[col].min(), df[col].max()
        print(f"[DEBUG] {name}.{col} range: {m} → {M}")
    else:
        print(f"[WARN] {name} missing column for range: {col}")

def pct(n, d):
    return (n / d) * 100 if d else float('nan')

## 📥 Load & Normalize Datasets
Three distinct ingestion methods:
1) **CSV**: Bus performance, Weather
2) **JSON API**: NYC DOT traffic
3) **Protocol Buffers**: MTA bus alerts

In [None]:
# --- CSV 1: MTA Bus Performance ---
require_file(CFG['csv_bus'])
bus = pd.read_csv(CFG['csv_bus'], na_values=['', ' ', 'null', 'NULL'])
nonempty_df(bus, 'bus_raw')
bus.columns = [c.strip().lower().replace(' ', '_') for c in bus.columns]
must_have_cols(bus, ['month','wait_assessment','number_of_trips_passing_wait','number_of_scheduled_trips'], 'bus_raw')

bus['date'] = pd.to_datetime(bus['month'], errors='coerce')
bad_dates = bus['date'].isna().sum()
print(f"[DEBUG] bus_raw bad date rows: {bad_dates} / {len(bus)} ({pct(bad_dates, len(bus)):.2f}%)")
bus = bus.dropna(subset=['date'])
for col in ['number_of_trips_passing_wait','number_of_scheduled_trips']:
    bus[col] = pd.to_numeric(bus[col].astype(str).str.replace(',', '', regex=False), errors='coerce')
    na_cnt = bus[col].isna().sum()
    print(f"[DEBUG] bus_raw {col} NaN after numeric coercion: {na_cnt}")
bus['wait_assessment'] = pd.to_numeric(
    bus['wait_assessment'].astype(str).str.rstrip('%'), errors='coerce') / 100.0
print_range(bus, 'wait_assessment', 'bus_raw')

# --- CSV 2: NOAA Weather ---
require_file(CFG['csv_weather'])
weather = pd.read_csv(CFG['csv_weather'])
nonempty_df(weather, 'weather_raw')
weather.columns = [c.strip().lower() for c in weather.columns]
must_have_cols(weather, ['date'], 'weather_raw')
weather['date'] = pd.to_datetime(weather['date'], errors='coerce')
weather = weather.dropna(subset=['date'])

num_cols = ['prcp','snow','snwd','tmax','tmin','tavg','awnd']
for col in num_cols:
    if col in weather:
        weather[col] = pd.to_numeric(weather[col], errors='coerce')
        print(f"[DEBUG] weather_raw {col} NaN: {weather[col].isna().sum()}")
    else:
        print(f"[WARN] weather_raw missing optional column: {col}")

# Convert tenths-based NOAA values if present
if 'tmax' in weather: weather['tmax'] = weather['tmax'] / 10.0
if 'tmin' in weather: weather['tmin'] = weather['tmin'] / 10.0
if 'tavg' in weather: weather['tavg'] = weather['tavg'] / 10.0
if 'prcp' in weather: weather['prcp_mm'] = weather['prcp'] / 10.0
if 'snow' in weather: weather['snow_mm'] = weather['snow'] / 10.0
if 'awnd' in weather: weather['awnd_ms'] = weather['awnd'] / 10.0

# Fill missing TAVG from min/max if possible
if all(c in weather for c in ['tavg','tmin','tmax']):
    mask = weather['tavg'].isna()
    filled = mask.sum()
    weather.loc[mask, 'tavg'] = (weather.loc[mask, 'tmin'] + weather.loc[mask, 'tmax']) / 2
    print(f"[DEBUG] weather_raw tavg filled from tmin/tmax: {filled}")

print_range(weather, 'tavg', 'weather_raw')
print_range(weather, 'prcp_mm', 'weather_raw')
print_range(weather, 'snow_mm', 'weather_raw')

In [None]:
# --- JSON API: NYC DOT Traffic Speeds ---
def fetch_traffic_json(url, months_back=24, app_token=None, verify=True):
    start_dt = (datetime.utcnow() - timedelta(days=30*months_back)).strftime('%Y-%m-%dT00:00:00.000')
    params = {
        '$where': f"recordedtimestamp >= '{start_dt}'",
        '$limit': 50000,
        '$order': 'recordedtimestamp DESC'
    }
    headers = {}
    if app_token:
        headers['X-App-Token'] = app_token

    last_err = None
    for i in range(4):
        try:
            print(f"[INFO] Fetching traffic JSON (attempt {i+1}) from {url}")
            r = requests.get(url, params=params, headers=headers, timeout=25, verify=verify)
            if r.status_code != 200:
                last_err = RuntimeError(f"HTTP {r.status_code}: {r.text[:200]}")
                print(f"[WARN] Traffic attempt {i+1} failed → {last_err}")
                Backoff.sleep(i); continue
            data = r.json()
            if not isinstance(data, list) or not data:
                last_err = RuntimeError("Traffic API returned empty or non-list JSON")
                print(f"[WARN] Traffic attempt {i+1} returned empty data")
                Backoff.sleep(i); continue
            print(f"[OK] Traffic records fetched: {len(data)}")
            return pd.DataFrame(data)
        except requests.exceptions.SSLError as e:
            raise  # fail fast so the trust store/CA issue can be fixed explicitly
        except Exception as e:
            last_err = e
            print(f"[WARN] Traffic attempt {i+1} error → {e}")
            Backoff.sleep(i)
    raise last_err

socrata_token = os.getenv('SODA_APP_TOKEN')  # optional
traffic_raw = fetch_traffic_json(
    CFG['traffic_url'],
    months_back=CFG['months_back_for_traffic'],
    app_token=socrata_token,
    verify=True,
)
nonempty_df(traffic_raw, 'traffic_raw')

# Normalize and sanity check
ts_col = 'recordedtimestamp' if 'recordedtimestamp' in traffic_raw.columns else 'data_as_of'
must_have_cols(traffic_raw, [ts_col, 'speed'], 'traffic_raw')
traffic_raw['recordedtimestamp'] = pd.to_datetime(traffic_raw[ts_col], errors='coerce')
bad_ts = traffic_raw['recordedtimestamp'].isna().sum()
print(f"[DEBUG] traffic_raw bad timestamps: {bad_ts} / {len(traffic_raw)}")
traffic_raw = traffic_raw.dropna(subset=['recordedtimestamp'])
traffic_raw['speed'] = pd.to_numeric(traffic_raw['speed'], errors='coerce')
speed_nan = traffic_raw['speed'].isna().sum()
print(f"[DEBUG] traffic_raw speed NaN after coercion: {speed_nan}")
traffic_raw = traffic_raw.dropna(subset=['speed'])
print_range(traffic_raw, 'speed', 'traffic_raw')

traffic_raw['date'] = traffic_raw['recordedtimestamp'].dt.to_period('M').dt.to_timestamp()
traffic_monthly = (traffic_raw
    .groupby('date', as_index=False)['speed']
    .mean()
    .rename(columns={'speed':'mean_speed_mph'}))
nonempty_df(traffic_monthly, 'traffic_monthly')
print_range(traffic_monthly, 'mean_speed_mph', 'traffic_monthly')
print(f"[INFO] traffic_monthly rows: {len(traffic_monthly)}; date span: {traffic_monthly['date'].min()} → {traffic_monthly['date'].max()}")

In [None]:
# --- Protocol Buffers: MTA Bus Alerts ---
def fetch_mta_alerts_feed(url, api_key):
    headers = {'x-api-key': api_key}
    for i in range(4):
        try:
            print(f"[INFO] Fetching MTA GTFS-rt alerts (attempt {i+1})")
            r = requests.get(url, headers=headers, timeout=25)
            if r.status_code != 200:
                print(f"[WARN] Alerts attempt {i+1} HTTP {r.status_code}: {r.text[:160]}")
                Backoff.sleep(i); continue
            feed = gtfs_realtime_pb2.FeedMessage()
            feed.ParseFromString(r.content)
            print(f"[OK] Alerts entities fetched: {len(feed.entity)}")
            return feed
        except Exception as e:
            print(f"[WARN] Alerts attempt {i+1} error → {e}")
            Backoff.sleep(i)
    raise RuntimeError("Failed to fetch/parse MTA GTFS-rt alerts after retries")

mta_api_key = require_env('MTA_API_KEY', optional=False)
alerts_feed = fetch_mta_alerts_feed(CFG['mta_alerts_url'], mta_api_key)
header_ts = pd.to_datetime(datetime.utcfromtimestamp(alerts_feed.header.timestamp), utc=True).tz_convert('America/New_York') if alerts_feed.header.timestamp else pd.Timestamp.utcnow()
alerts = [e for e in alerts_feed.entity if e.HasField('alert')]
alerts_df = pd.DataFrame([
    {
        'date': header_ts.normalize(),
        'alert_count': len(alerts)
    }
])
nonempty_df(alerts_df, 'alerts_df')
print(alerts_df)

## 🔍 Exploratory Data Analysis (EDA)
Quick statistical summaries, distribution checks, correlations, and basic data health scans.

In [None]:
print('--- BUS DATA (raw) ---')
display(bus.describe(include='all'))
print('\n--- WEATHER DATA (raw) ---')
display(weather.describe(include='all'))
print('\n--- TRAFFIC DATA (monthly) ---')
display(traffic_monthly.describe(include='all'))
print('\n--- ALERTS SNAPSHOT ---')
display(alerts_df)

# Types & null scans
def scan_df(df, name):
    print(f"\n[SCAN] {name} dtypes:")
    print(df.dtypes)
    nulls = df.isna().sum().sort_values(ascending=False)
    print(f"[SCAN] {name} null counts (top 10):\n{nulls.head(10)}")
    print(f"[SCAN] {name} duplicated rows: {df.duplicated().sum()}")

scan_df(bus, 'bus')
scan_df(weather, 'weather')
scan_df(traffic_monthly, 'traffic_monthly')

# Identify obvious issues
print("\n[CHECK] Negative or zero scheduled trips in bus data:")
if 'number_of_scheduled_trips' in bus:
    print(bus.loc[bus['number_of_scheduled_trips'] <= 0, ['date','number_of_scheduled_trips']].head())
else:
    print('[WARN] number_of_scheduled_trips column missing after normalization')

print("\n[CHECK] Out-of-range WA values (should be 0–1):")
if 'wait_assessment' in bus:
    print(bus.loc[(bus['wait_assessment'] < 0) | (bus['wait_assessment'] > 1), ['date','wait_assessment']].head())
else:
    print('[WARN] wait_assessment column missing after normalization')

### 🧮 Aggregate Monthly Metrics
All sources are aligned at the **month** granularity for merging and comparison.

In [None]:
b = bus.copy()
b['wa_w'] = b['wait_assessment'] * b['number_of_scheduled_trips']
bus_monthly = (b.groupby('date', as_index=False)
    .agg(
        wa_w=('wa_w','sum'),
        scheduled=('number_of_scheduled_trips','sum'),
        passing=('number_of_trips_passing_wait','sum')
    )
    .assign(
        wa_weighted=lambda x: x.wa_w/x.scheduled,
        pct_passing=lambda x: x.passing/x.scheduled
    )
)
nonempty_df(bus_monthly, 'bus_monthly')
print_range(bus_monthly, 'wa_weighted', 'bus_monthly')

weather_monthly = (weather
    .groupby(weather['date'].dt.to_period('M').dt.to_timestamp(), as_index=False)
    .agg(
        tavg=('tavg','mean'), tmax=('tmax','mean'), tmin=('tmin','mean'),
        prcp_mm=('prcp_mm','sum'), snow_mm=('snow_mm','sum'), awnd_ms=('awnd_ms','mean')
    )
    .rename(columns={'date':'date'})
)
nonempty_df(weather_monthly, 'weather_monthly')

merged = (bus_monthly
    .merge(weather_monthly, on='date', how='inner')
    .merge(traffic_monthly, on='date', how='left')
    .merge(alerts_df, on='date', how='left')
)
nonempty_df(merged, 'merged')
print(f"[INFO] merged rows: {len(merged)}; date span: {merged['date'].min()} → {merged['date'].max()}")
display(merged.head(10))

# Critical sanity: WA within [0,1]
wa_oob = merged[(merged['wa_weighted'] < 0) | (merged['wa_weighted'] > 1)]
assert len(wa_oob) == 0, f"[FATAL] Found {len(wa_oob)} out-of-bounds WA rows"
print('[OK] All WA values within [0,1] after aggregation')

## 📊 Visualizations
At least four distinct visualizations using multiple libraries. Short descriptions accompany each plot.

In [None]:
# 1) Distribution of Monthly Wait Assessment (Seaborn)
sns.histplot(merged['wa_weighted'].dropna(), kde=True, bins=20)
plt.title('Distribution of Monthly Wait Assessment (Weighted)')
plt.xlabel('Wait Assessment (0–1)')
plt.ylabel('Frequency')
plt.show()
print("Insight: WA is generally concentrated toward higher values if the histogram skews right; heavy left-tail suggests frequent reliability issues.")

In [None]:
# 2) Correlation Heatmap (Seaborn)
num_cols = [c for c in ['wa_weighted','tavg','prcp_mm','snow_mm','awnd_ms','mean_speed_mph','alert_count'] if c in merged]
corr = merged[num_cols].corr()
sns.heatmap(corr, annot=True, cmap='coolwarm', fmt='.2f')
plt.title('Correlation Heatmap: Weather, Traffic, Alerts vs Reliability')
plt.show()
print("Insight: Negative correlation between precip/snow and WA suggests weather impact; positive correlation between mean traffic speed and WA suggests congestion effects.")

In [None]:
# 3) Traffic Speed vs WA (Matplotlib)
plt.scatter(merged['mean_speed_mph'], merged['wa_weighted'])
plt.xlabel('Mean Traffic Speed (mph)')
plt.ylabel('Wait Assessment (0–1)')
plt.title('Bus Reliability vs Traffic Congestion')
plt.grid(True, linewidth=0.4)
plt.show()
print("Insight: An upward slope indicates that higher speeds associate with better reliability; dispersion reveals noise and other drivers.")

In [None]:
# 4) Trends: WA & Mean Traffic Speed over Time (Plotly)
fig = px.line(
    merged.sort_values('date'),
    x='date', y=[c for c in ['wa_weighted','mean_speed_mph'] if c in merged],
    title='Trends: Wait Assessment & Mean Traffic Speed'
)
fig.update_layout(xaxis_title='Date', yaxis_title='Value')
fig.show()
print("Insight: Co-movement over months hints at traffic-reliability coupling; decoupling suggests other factors (alerts, incidents, seasonal ops).")

In [None]:
# 5) Precipitation vs Alert Count (Plotly, colored by WA)
if all(c in merged for c in ['prcp_mm','alert_count','wa_weighted']):
    fig2 = px.scatter(
        merged, x='prcp_mm', y='alert_count', color='wa_weighted',
        title='Precipitation vs Alert Count (Colored by WA)'
    )
    fig2.show()
    print("Insight: Clusters at high precipitation and high alerts with lower WA would support the hypothesis that weather + disruptions degrade reliability.")
else:
    print('[WARN] Missing one of prcp_mm/alert_count/wa_weighted; skipping scatter.')

## 🧹 Data Cleaning & Transformation
Actions reflect the issues found during EDA: outliers (precip/snow), missing values, duplicate rows, and data type coercions. The goal is a modeling-ready monthly dataset aligned across sources.

In [None]:
model_df = merged.copy()

# Drop rows lacking the target
before = len(model_df)
model_df = model_df.dropna(subset=['wa_weighted'])
after = len(model_df)
print(f"[CLEAN] Dropped {before - after} rows without WA")

# Clip extreme precipitation/snow outliers using IQR
for col in ['prcp_mm','snow_mm']:
    if col in model_df:
        q1, q3 = model_df[col].quantile([0.25,0.75])
        iqr = q3 - q1
        lower, upper = q1 - 1.5*iqr, q3 + 1.5*iqr
        clipped = ((model_df[col] < lower) | (model_df[col] > upper)).sum()
        model_df[col] = model_df[col].clip(lower, upper)
        print(f"[CLEAN] {col} clipped outliers: {clipped} (bounds {lower:.2f}..{upper:.2f})")
    else:
        print(f"[CLEAN] {col} not present; skipping outlier clipping")

# De-duplicate monthly rows if any
dups = model_df.duplicated(subset=['date']).sum()
if dups:
    print(f"[CLEAN] Found {dups} duplicate month rows; aggregating mean over duplicates")
    model_df = model_df.groupby('date', as_index=False).mean(numeric_only=True)
else:
    print("[CLEAN] No duplicate monthly rows detected")

display(model_df.head())
print_range(model_df, 'wa_weighted', 'model_df')

# Final schema sanity
required_for_model = ['wa_weighted','tavg','prcp_mm','snow_mm','awnd_ms','mean_speed_mph','alert_count']
missing_for_model = [c for c in required_for_model if c not in model_df.columns]
if missing_for_model:
    print(f"[WARN] Model features missing: {missing_for_model}. Modeling section will handle partial features.")
else:
    print('[OK] All planned model features present')

### Cleaning Summary
- **Missing values:** Dropped for `wa_weighted` and critical joins.
- **Outliers:** Clipped for precipitation and snow via IQR.
- **Duplicates:** Checked at monthly level; aggregated if present.
- **Types:** Coerced numeric fields; standardized `date` to monthly timestamps.
- **Sanity:** Asserted WA in [0,1]; scanned ranges and nulls for each dataset.

## 🧩 Machine Learning Plan (Preview)
- **Target:** `wa_weighted`
- **Candidate features:** `tavg`, `prcp_mm`, `snow_mm`, `awnd_ms`, `mean_speed_mph`, `alert_count`
- **Models:** Linear Regression → Random Forest → Ridge/Lasso
- **Evaluation:** MAE, RMSE, R² using a time-based split (train on early months, validate on later months).
- **Challenges:** Limited sample size at monthly granularity; collinearity (e.g., `tavg` and seasonality); missing alert coverage for past months (alerts snapshot is current). Consider aggregating alerts over history if API access allows or proxy with incident-rich periods.
- **Mitigations:** Regularization (Ridge/Lasso), feature scaling where appropriate, and careful backtesting windows to avoid leakage.

## 🔄 Prior Feedback & Updates
- Expanded from weather-only to include **traffic** (JSON API) and **service alerts** (Protocol Buffers).
- Added robust **sanity checks**, **debug messages**, and **assertions** to make failures explicit.
- Ensured **4+ visualizations** across **multiple libraries** (Seaborn, Matplotlib, Plotly).
- Strengthened cleaning and integration pipeline; enforced WA bounds and typed coercions.

In [None]:
# Keep this as the last cell for grading automation
!jupyter nbconvert --to python source.ipynb