# Imports

In [26]:
from __future__ import annotations

import logging
from pathlib import Path
from typing import List

import pandas as pd

# Logging setup

In [27]:
# Configure root logging for the whole notebook/script.
#
# force=True replaces any handlers already attached to the root logger.
# Without it, basicConfig() silently does nothing once the root logger has a
# handler, so re-running this cell in a live kernel (or running it after some
# other code configured logging) would leave a stale format in effect.
logging.basicConfig(
    format="%(asctime)s %(levelname)8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
    force=True,
)
# Module-level logger used by every helper below.
logger = logging.getLogger(__name__)


# Constants and schema

In [28]:

def _project_root() -> Path:
    """Return the directory that every data/report path below hangs off.

    As a standalone script this is the grandparent of the file's own location
    (``parents[1]`` of the resolved ``__file__``). Inside a Jupyter kernel
    ``__file__`` is undefined, so the current working directory is used.
    """
    try:
        return Path(__file__).resolve().parents[1]
    except NameError:
        # Notebook execution: no __file__ in the kernel namespace.
        return Path.cwd()


ROOT = _project_root()

# Input folder holding the raw 2023 trip files.
RAW_FOLDER = ROOT.joinpath("data", "raw", "2023-citibike-tripdata")
# Consolidated ride table written by the pipeline.
OUTPUT_FILE = ROOT.joinpath("data", "processed", "2023", "citibike_2023_all.parquet")
# Station lookup table written by the pipeline.
STATION_FILE = ROOT.joinpath("data", "processed", "stations_2023.parquet")

# Columns (and their order) that every ingested frame is projected onto.
CANONICAL_COLUMNS_BIKE = [
    "ride_id", "rideable_type",
    "started_at", "ended_at",
    "start_station_name", "start_station_id",
    "end_station_name", "end_station_id",
    "start_lat", "start_lng",
    "end_lat", "end_lng",
    "member_casual",
]

# Dtypes applied while reading the raw CSVs. The two timestamp columns are
# absent on purpose: they are handled through `parse_dates` at read time.
# NOTE: Some Citi Bike station IDs are alphanumeric (e.g. "SYS038"), so station
# IDs are ingested as *strings* and can be cast later if needed.
_DTYPES = dict(
    ride_id="string",
    rideable_type="category",
    start_station_name="string",
    start_station_id="string",
    end_station_name="string",
    end_station_id="string",
    start_lat="float32",
    start_lng="float32",
    end_lat="float32",
    end_lng="float32",
    member_casual="category",
)

# Bounding box used to flag rides whose coordinates fall outside the nominal
# service area (inclusive on both ends).
_LAT_MIN = 40.5
_LAT_MAX = 41.0
_LON_MIN = -74.3
_LON_MAX = -73.6

# Helpers

In [29]:

def list_raw_files(folder: Path) -> List[Path]:
    """Return all Citi Bike CSV/CSV.GZ files for 2023, searching recursively.

    The raw directory contains 12 monthly sub‑folders (202301‑…‑202312‑citibike‑tripdata)
    that in turn hold one or more split CSV files ( _1.csv, _2.csv, … ).

    Only regular files are returned. Entries that match the name pattern but
    are not trip data are skipped: directories, dot-files (e.g. the AppleDouble
    ``._*`` companions found in macOS-created archives) and anything located
    under a ``__MACOSX`` folder inside ``folder``.

    Args:
        folder: Directory to search.

    Returns:
        The matching files, sorted by path.

    Raises:
        FileNotFoundError: If no matching file exists under ``folder``.
    """
    files = sorted(
        path
        for path in folder.rglob("*citibike-tripdata*.csv*")
        if path.is_file()
        and not path.name.startswith(".")
        # Only inspect the part of the path below `folder`, so the location of
        # the raw directory itself never causes files to be excluded.
        and "__MACOSX" not in path.relative_to(folder).parts
    )
    if not files:
        raise FileNotFoundError(f"No Citi Bike raw files found recursively under {folder}")
    logger.info("Discovered %d raw files", len(files))
    return files


def read_monthly_file(fp: Path) -> pd.DataFrame:
    """Load one raw Citi Bike trip file into a canonically ordered frame.

    The file is parsed with the module-level ``_DTYPES`` mapping, and the two
    trip timestamps are parsed as dates. Compression handling is left to
    pandas' defaults.

    Args:
        fp: Path of the raw CSV file to read.

    Returns:
        A frame holding exactly the ``CANONICAL_COLUMNS_BIKE`` columns, in
        that order; any other column present in the file is discarded.
    """
    logger.debug("Reading %s", fp.name)

    timestamp_columns = ["started_at", "ended_at"]
    raw_trips = pd.read_csv(
        fp, dtype=_DTYPES, parse_dates=timestamp_columns, low_memory=False
    )

    # Selecting by the canonical list both reorders the columns and drops extras.
    return raw_trips[CANONICAL_COLUMNS_BIKE]


def validate(df: pd.DataFrame) -> pd.DataFrame:
    """Run basic sanity checks and add derived columns to a ride frame.

    *No rows are dropped*; off‑grid coordinates are flagged via an `off_grid` boolean
    so downstream visualisations can decide what to show. A row is off‑grid when
    any of its four coordinates lies outside the configured latitude/longitude
    box; rows with a missing coordinate are flagged too, because a missing
    value never compares as inside the box.

    The frame is modified in place (and also returned): ``off_grid`` and
    ``start_hour`` are added and ``member_casual`` is replaced by its
    stripped, lower-cased form. All checks run *before* any column is
    written, so a failed validation leaves ``df`` untouched.

    Args:
        df: Ride frame with at least ``ride_id``, the four coordinate columns,
            ``member_casual`` and a datetime ``started_at`` column.

    Returns:
        The same ``df`` object, with the columns described above.

    Raises:
        AssertionError: If ``ride_id`` values are not unique, or if any
            ``member_casual`` value is something other than member/casual
            after normalisation (missing values included).
    """
    # Explicit raises rather than `assert` statements: asserts are removed when
    # Python runs with -O, which would silently disable these data checks. The
    # exception type is kept as AssertionError for existing callers.
    if not df["ride_id"].is_unique:
        raise AssertionError("ride_id must be unique")

    # Rows whose coordinates all fall inside the nominal Citi Bike service area
    inside_box = (
        df["start_lat"].between(_LAT_MIN, _LAT_MAX)
        & df["start_lng"].between(_LON_MIN, _LON_MAX)
        & df["end_lat"].between(_LAT_MIN, _LAT_MAX)
        & df["end_lng"].between(_LON_MIN, _LON_MAX)
    )
    off_grid = ~inside_box

    # Member/casual flag integrity (some files use TITLE‑CASE or extra spaces).
    # Anything that is not member/casual maps to a missing value and fails below.
    # NOTE(review): this round-trip likely yields a non-categorical column even
    # when the input was categorical — confirm whether that matters downstream.
    member_flags = (
        df["member_casual"].str.strip().str.lower().map({"member": "member", "casual": "casual"})
    )
    if not member_flags.isin(["member", "casual"]).all():
        raise AssertionError("Invalid member_casual entries")

    # Derived hour column for downstream aggregation
    start_hour = df["started_at"].dt.floor("h")

    # Every check passed: only now touch the caller's frame.
    df["off_grid"] = off_grid
    off_grid_count = int(off_grid.sum())
    if off_grid_count:
        logger.warning("%d rides flagged as off‑grid (kept for viz)", off_grid_count)
    df["member_casual"] = member_flags
    df["start_hour"] = start_hour

    return df


# Visualization helper

In [30]:

def quick_visualize(parquet_path: Path, sample_frac: float = 0.05) -> None:
    """Create an interactive geo‑scatter map of ride start points.

    Writes an HTML file to <root>/reports/citibike_startpoints_map.html.

    Args:
        parquet_path: Parquet file holding at least the ``start_lat``,
            ``start_lng`` and ``member_casual`` columns.
        sample_frac: Fraction of rows to plot. Values below 1.0 trigger a
            seeded random sample (``random_state=42``); otherwise every row
            with start coordinates is plotted.

    Returns:
        None. If plotly is not installed an error is logged and nothing is
        written.
    """
    try:
        # Imported lazily so the rest of the pipeline works without plotly.
        import plotly.express as px
    except ImportError:
        logger.error("plotly is not installed. Run `pip install plotly` to enable visualisation.")
        return

    logger.info("Loading parquet for visualisation → %s", parquet_path)
    points = pd.read_parquet(parquet_path, columns=["start_lat", "start_lng", "member_casual"])
    # Rows without start coordinates cannot be placed on the map.
    points = points.dropna(subset=["start_lat", "start_lng"])

    if sample_frac < 1.0:
        points = points.sample(frac=sample_frac, random_state=42)
        logger.info("Sampled %.0f%% of rows (%d) for plotting", sample_frac * 100, len(points))

    # plotly deprecated scatter_mapbox in favour of scatter_map, whose layout
    # key is `map_style` instead of `mapbox_style`. Prefer the new API when the
    # installed plotly provides it and fall back to the old one otherwise.
    if hasattr(px, "scatter_map"):
        scatter, style_key = px.scatter_map, "map_style"
    else:
        scatter, style_key = px.scatter_mapbox, "mapbox_style"

    fig = scatter(
        points,
        lat="start_lat",
        lon="start_lng",
        color="member_casual",
        zoom=11,
        height=650,
        opacity=0.35,
        title="Citi Bike ride start locations (sample)",
    )
    fig.update_layout(
        margin={"r": 0, "t": 40, "l": 0, "b": 0},
        **{style_key: "carto-positron"},
    )

    reports_dir = ROOT / "reports"
    reports_dir.mkdir(exist_ok=True, parents=True)
    html_path = reports_dir / "citibike_startpoints_map.html"
    fig.write_html(html_path)
    logger.info("Interactive map written to %s", html_path)

# ETL Main flow

In [31]:

def _build_station_table(rides: pd.DataFrame) -> pd.DataFrame:
    """Return one row per station ID seen at either end of any ride.

    Start-side rows are collected first, so when a station appears on both
    sides its name and coordinates come from a ride that started there.
    Rows without a station ID are dropped: a null key is useless in a lookup
    table and, in a pandas merge, would match every ride whose ID is missing.
    """
    sides = []
    for side in ("start", "end"):
        renames = {
            f"{side}_station_id": "station_id",
            f"{side}_station_name": "station_name",
            f"{side}_lat": "lat",
            f"{side}_lng": "lon",
        }
        sides.append(
            rides[list(renames)]
            # De-duplicate first so the later steps work on a small frame.
            .drop_duplicates(f"{side}_station_id")
            .rename(columns=renames)
            .dropna(subset=["station_id"])
        )
    return (
        pd.concat(sides, ignore_index=True)
        .drop_duplicates("station_id")
        .reset_index(drop=True)
    )


def main() -> None:
    """Run the full pipeline: ingest, validate, consolidate, export, visualise.

    Reads every raw file under ``RAW_FOLDER``, validates each one, concatenates
    them, removes rows with a repeated ``ride_id``, then writes the ride table
    to ``OUTPUT_FILE`` and a station lookup table to ``STATION_FILE`` before
    producing the optional start-point map.
    """
    logger.info("Citi Bike validation pipeline start …")

    frames: List[pd.DataFrame] = [
        validate(read_monthly_file(fp)) for fp in list_raw_files(RAW_FOLDER)
    ]

    df_all = pd.concat(frames, ignore_index=True)
    # The per-file frames are no longer needed once concatenated; release them
    # so they do not sit in memory next to the combined table.
    del frames
    before = len(df_all)
    df_all = df_all.drop_duplicates("ride_id")
    logger.info("Removed %d duplicate rows (before=%d, after=%d)", before - len(df_all), before, len(df_all))

    OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Announce the write before it starts, so the log reflects what is in progress.
    logger.info("Writing consolidated Parquet → %s", OUTPUT_FILE)
    df_all.to_parquet(OUTPUT_FILE, engine="pyarrow", compression="snappy", index=False)

    # Save station metadata for joins (includes off‑grid stations if any).
    # Built from both ride ends so that end_station_id joins also find a match.
    stations = _build_station_table(df_all)
    STATION_FILE.parent.mkdir(parents=True, exist_ok=True)
    stations.to_parquet(STATION_FILE, engine="pyarrow", compression="snappy", index=False)
    logger.info("Station metadata saved → %s", STATION_FILE)

    # Quick interactive map (optional)
    quick_visualize(OUTPUT_FILE, sample_frac=0.05)

    logger.info("Citi Bike raw data validated, saved, and visualised")


# Entry point: run the whole pipeline when this code is executed directly
# (as a script, or as a notebook cell) rather than imported as a module.
if __name__ == "__main__":
    main()


2025-05-10 00:41:33,906 INFO     Citi Bike validation pipeline start …


2025-05-10 00:41:33,914 INFO     Discovered 40 raw files
2025-05-10 00:42:59,139 INFO     Removed 0 duplicate rows (before=35107030, after=35107030)
2025-05-10 00:43:16,069 INFO     Writing consolidated Parquet → /Users/vaibhavranshoor/Downloads/Citibike_rides/data/processed/2023/citibike_2023_all.parquet
2025-05-10 00:43:17,095 INFO     Station metadata saved → /Users/vaibhavranshoor/Downloads/Citibike_rides/data/processed/stations_2023.parquet
2025-05-10 00:43:18,563 INFO     Loading parquet for visualisation → /Users/vaibhavranshoor/Downloads/Citibike_rides/data/processed/2023/citibike_2023_all.parquet
2025-05-10 00:43:22,039 INFO     Sampled 5% of rows (1755352) for plotting
  fig = px.scatter_mapbox(
2025-05-10 00:43:22,663 INFO     Interactive map written to /Users/vaibhavranshoor/Downloads/Citibike_rides/reports/citibike_startpoints_map.html
2025-05-10 00:43:22,667 INFO     Citi Bike raw data validated, saved, and visualised
