# **Dockless Wrapper**

In [1]:
from __future__ import annotations

from pathlib import Path
from typing import Optional, Union, Dict, Any, List, Tuple
import json

import pandas as pd
import numpy as np


def run_sf_dockless_all_utilities_single_function(
    *,
    # ------------------------------------------------------------
    # USER INPUTS
    # ------------------------------------------------------------
    system_key: str,  # "SF_LIME_DOCKLESS" or "SF_SPIN_DOCKLESS"
    freebike_status_txt: Union[str, Path],

    # ------------------------------------------------------------
    # CITY ASSETS (same for both SF dockless systems unless overridden)
    # ------------------------------------------------------------
    census_blocks_shp: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\GBFS_Census_Tract\San_Fran_Lime\tl_2024_06_tabblock20.shp",
    centerline_streets_csv: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Centerline.csv",
    bike_lanes_csv: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Bikelane.csv",
    centroid_tract_csv: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Capacity\San_Fran_Baywheels\centroid_tract_ca.csv",

    # ------------------------------------------------------------
    # OUTPUT
    # ------------------------------------------------------------
    output_dir: Optional[Union[str, Path]] = None,  # if None -> default per system
    save_outputs: bool = True,

    # ------------------------------------------------------------
    # TIME WINDOW (applies to availability/usage/idle)
    # ------------------------------------------------------------
    time_start: Optional[Union[str, pd.Timestamp]] = None,
    time_end: Optional[Union[str, pd.Timestamp]] = None,

    # ------------------------------------------------------------
    # STEP 0 SETTINGS
    # ------------------------------------------------------------
    step0_raw_csv_name: Optional[str] = None,   # if None -> default per system
    step0_done_csv_name: Optional[str] = None,  # if None -> default per system
    blocks_geoid_col: str = "GEOID20",
    blocks_target_epsg: int = 4326,
    fill_missing_with_census_api: bool = True,
    census_benchmark: str = "Public_AR_Census2020",
    census_vintage: str = "2020",
    timestamp_parse_mode: str = "auto",  # "auto" or "epoch"
    drop_cols_if_present: Optional[List[str]] = None,

    # ------------------------------------------------------------
    # AVAILABILITY SETTINGS
    # ------------------------------------------------------------
    availability_block_time_granularity: str = "5min",
    availability_tract_time_granularity: str = "1H",
    reserved_col: str = "is_reserved",
    disabled_col: str = "is_disabled",
    vehicle_id_col: str = "bike_id",
    include_vehicle_type: bool = True,
    vehicle_type_col_avail: str = "vehicle_type_id",
    tract_digits: int = 11,
    availability_normalize: bool = True,

    # ------------------------------------------------------------
    # USAGE SETTINGS
    # ------------------------------------------------------------
    usage_base_time_slot: str = "5min",
    usage_aggregate_time_slot: str = "1H",
    usage_rounding_decimals: int = 4,
    usage_normalize: bool = True,

    # ------------------------------------------------------------
    # IDLE TIME SETTINGS
    # ------------------------------------------------------------
    idle_base_time_slot: str = "5min",
    idle_hour_bucket_freq: str = "1H",
    idle_rounding_decimals: int = 4,
    idle_vehicle_id_col: str = "bike_id",
    idle_lat_col: str = "lat",
    idle_lon_col: str = "lon",
    idle_census_block_col: str = "census_block",
    idle_vehicle_type_col: str = "vehicle_type",
    idle_fill_full_grid: bool = True,
    idle_default_vehicle_type_for_missing: str = "scooter",
    idle_normalize: bool = True,

    # ------------------------------------------------------------
    # SAFETY SETTINGS (generic)
    # ------------------------------------------------------------
    safety_centerline_wkt_col: str = "line",
    safety_bike_lane_wkt_col: str = "shape",
    safety_input_crs: str = "EPSG:4326",
    safety_census_block_id_col: str = "GEOID20",
    safety_centroid_tract_id_col: str = "census_tract",
    safety_centroid_area_col: Optional[str] = "county_name",
    safety_working_epsg: Optional[int] = 26910,  # UTM zone for SF-ish
    safety_bike_lane_class_col: Optional[str] = None,
    safety_protected_values: Optional[List[str]] = None,
    safety_protected_keywords: Optional[List[str]] = None,
    safety_class_col_candidate_keywords: Optional[List[str]] = None,
    safety_protected_match_mode: str = "contains",
    safety_normalize: bool = True,
) -> Dict[str, Any]:
    """
    ONE function to run ALL SF dockless utilities for multiple systems (Lime, Spin):
      Step0 (txt->done_df with census block/tract)
      Availability (block+tract)
      Usage (5min movement -> hourly tract)
      Idle time (segment-based idle durations)
      Safety (centerline/bikelane intersection + protected inference)

    Use:
      system_key="SF_LIME_DOCKLESS" or "SF_SPIN_DOCKLESS"
    """

    # ============================================================
    # System presets (only differences: labels + default filenames + output tags)
    # ============================================================
    SYSTEMS: Dict[str, Dict[str, str]] = {
        "SF_LIME_DOCKLESS": {
            "city": "San Francisco",
            "vendor": "Lime",
            "system_type": "dockless",
            "tag": "sf_lime",
            "default_output_dir": "SF_LIME_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_lime_status_raw.csv",
            "done_csv": "san_fran_lime_status_done.csv",
        },
        "SF_SPIN_DOCKLESS": {
            "city": "San Francisco",
            "vendor": "Spin",
            "system_type": "dockless",
            "tag": "sf_spin",
            "default_output_dir": "SF_SPIN_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_spin_status_raw.csv",
            "done_csv": "san_fran_spin_status_done.csv",
        },
    }

    if system_key not in SYSTEMS:
        raise ValueError(f"Unknown system_key='{system_key}'. Choose from: {sorted(SYSTEMS.keys())}")

    preset = SYSTEMS[system_key]
    output_tag = preset["tag"]

    if output_dir is None:
        output_dir = preset["default_output_dir"]

    if step0_raw_csv_name is None:
        step0_raw_csv_name = preset["raw_csv"]
    if step0_done_csv_name is None:
        step0_done_csv_name = preset["done_csv"]

    # ============================================================
    # Setup output dir + paths
    # ============================================================
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    freebike_status_txt = Path(freebike_status_txt)
    census_blocks_shp = Path(census_blocks_shp)
    centerline_streets_csv = Path(centerline_streets_csv)
    bike_lanes_csv = Path(bike_lanes_csv)
    centroid_tract_csv = Path(centroid_tract_csv)

    if not freebike_status_txt.exists():
        raise FileNotFoundError(f"freebike_status_txt not found: {freebike_status_txt}")
    for label, p in [
        ("census_blocks_shp", census_blocks_shp),
        ("centerline_streets_csv", centerline_streets_csv),
        ("bike_lanes_csv", bike_lanes_csv),
        ("centroid_tract_csv", centroid_tract_csv),
    ]:
        if not p.exists():
            raise FileNotFoundError(f"{label} not found: {p}")

    raw_path = out_dir / step0_raw_csv_name
    done_path = out_dir / step0_done_csv_name

    ts_start = pd.to_datetime(time_start) if time_start is not None else None
    ts_end = pd.to_datetime(time_end) if time_end is not None else None

    results: Dict[str, Any] = {
        "system_key": system_key,
        "city": preset["city"],
        "vendor": preset["vendor"],
        "system_type": preset["system_type"],
        "output_dir": str(out_dir),
    }

    # ============================================================
    # Helpers
    # ============================================================
    def _minmax(series: pd.Series) -> pd.Series:
        s = pd.to_numeric(series, errors="coerce")
        mn = s.min(skipna=True)
        mx = s.max(skipna=True)
        if pd.isna(mn) or pd.isna(mx) or mx <= mn:
            return pd.Series(0.0, index=s.index)
        return (s - mn) / (mx - mn)

    # ============================================================
    # STEP 0 (INLINE): txt -> raw_df -> spatial join -> api fill -> done_df
    # ============================================================
    with freebike_status_txt.open("r", encoding="utf-8") as f:
        lines = f.readlines()

    rows: List[dict] = []
    for line in lines:
        if not line.strip():
            continue
        try:
            blob = json.loads(line)
        except json.JSONDecodeError as e:
            print(f"⚠️ Error decoding JSON: {e}. Skipping line. First 100 chars: {line[:100]}...")
            continue

        timestamp = list(blob.keys())[0]
        entries = blob[timestamp]
        for entry in entries:
            if isinstance(entry, dict) and "vehicle_types_available" in entry:
                vta = entry.get("vehicle_types_available", [])
                if isinstance(vta, list):
                    for vt in vta:
                        try:
                            vid = vt["vehicle_type_id"]
                            cnt = vt["count"]
                            entry[f"vehicle_type_{vid}_count"] = cnt
                        except Exception:
                            pass
                try:
                    del entry["vehicle_types_available"]
                except Exception:
                    pass
            entry["timestamp"] = timestamp
            rows.append(entry)

    raw_df = pd.DataFrame(rows)

    if "timestamp" in raw_df.columns:
        if timestamp_parse_mode == "epoch":
            raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], unit="s", errors="coerce")
        else:
            raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], errors="coerce")
            if raw_df["timestamp"].isna().mean() > 0.5:
                raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], unit="s", errors="coerce")

    if drop_cols_if_present:
        drop_now = [c for c in drop_cols_if_present if c in raw_df.columns]
        if drop_now:
            raw_df = raw_df.drop(columns=drop_now)

    if save_outputs:
        raw_df.to_csv(raw_path, index=False)

    if not {"lat", "lon"}.issubset(raw_df.columns):
        raise ValueError(
            "Dockless input must include 'lat' and 'lon' columns to assign census blocks. "
            "If Spin uses different names, pass overrides via idle_lat_col/idle_lon_col AND rename upstream."
        )

    latlon_df = raw_df.drop_duplicates(subset=["lat", "lon"])[["lat", "lon"]].copy()

    import geopandas as gpd
    from shapely.geometry import Point

    geometry = [Point(xy) for xy in zip(latlon_df["lon"], latlon_df["lat"])]
    points_gdf = gpd.GeoDataFrame(latlon_df, geometry=geometry, crs="EPSG:4326")

    blocks_gdf = gpd.read_file(str(census_blocks_shp)).to_crs(epsg=blocks_target_epsg)
    joined = gpd.sjoin(points_gdf, blocks_gdf, how="left", predicate="within")

    if blocks_geoid_col not in joined.columns:
        raise ValueError(
            f"Expected block GEOID column '{blocks_geoid_col}' not found after join. "
            f"Available columns: {list(joined.columns)}"
        )

    joined = joined.rename(columns={blocks_geoid_col: "census_block"})
    joined["census_block"] = joined["census_block"].astype(str)
    joined["census_tract"] = joined["census_block"].str[:-4]

    latlon_blocks_df = joined[["lat", "lon", "census_block", "census_tract"]].copy()
    done_df = raw_df.merge(latlon_blocks_df, on=["lat", "lon"], how="left")

    if fill_missing_with_census_api:
        import requests

        missing_points = done_df[done_df["census_block"].isna()][["lat", "lon"]].drop_duplicates().copy()
        if not missing_points.empty:
            try:
                from tqdm import tqdm
                tqdm.pandas()
                use_progress = True
            except Exception:
                use_progress = False

            def _get_block(lat: float, lon: float) -> Optional[str]:
                try:
                    url = (
                        "https://geocoding.geo.census.gov/geocoder/geographies/coordinates"
                        f"?x={lon}&y={lat}"
                        f"&benchmark={census_benchmark}"
                        f"&vintage={census_vintage}"
                        "&format=json"
                    )
                    r = requests.get(url, timeout=30)
                    r.raise_for_status()
                    js = r.json()
                    geos = js.get("result", {}).get("geographies", {})
                    blocks = geos.get("Census Blocks", [])
                    if blocks:
                        return blocks[0].get("GEOID")
                    blocks2020 = geos.get("2020 Census Blocks", [])
                    if blocks2020:
                        return blocks2020[0].get("GEOID")
                    return None
                except Exception:
                    return None

            if use_progress:
                missing_points["census_block_new"] = missing_points.progress_apply(
                    lambda r: _get_block(r["lat"], r["lon"]),
                    axis=1,
                )
            else:
                missing_points["census_block_new"] = missing_points.apply(
                    lambda r: _get_block(r["lat"], r["lon"]),
                    axis=1,
                )

            missing_points["census_tract_new"] = missing_points["census_block_new"].apply(
                lambda x: str(x)[:-4] if pd.notna(x) and x not in {"None", "nan"} else None
            )

            done_df = done_df.merge(
                missing_points[["lat", "lon", "census_block_new", "census_tract_new"]],
                on=["lat", "lon"],
                how="left",
            )
            done_df["census_block"] = done_df["census_block"].fillna(done_df["census_block_new"])
            done_df["census_tract"] = done_df["census_tract"].fillna(done_df["census_tract_new"])
            done_df = done_df.drop(columns=["census_block_new", "census_tract_new"])

    if save_outputs:
        done_df.to_csv(done_path, index=False)

    results["step0"] = {
        "raw_df": raw_df,
        "done_df": done_df,
        "latlon_blocks_df": latlon_blocks_df,
        "paths": {"raw_csv": str(raw_path), "done_csv": str(done_path)},
        "meta": {
            "raw_rows": len(raw_df),
            "done_rows": len(done_df),
            "unique_points": len(latlon_df),
            "missing_after_sjoin": int(done_df["census_block"].isna().sum()),
        },
    }

    # ============================================================
    # Apply global time filter once (for downstream)
    # ============================================================
    df = done_df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    if df["timestamp"].isna().any():
        raise ValueError(f"Step0 produced {int(df['timestamp'].isna().sum())} rows with unparseable timestamps.")

    data_min = df["timestamp"].min()
    data_max = df["timestamp"].max()

    if ts_start is not None:
        df = df[df["timestamp"] >= ts_start].copy()
    if ts_end is not None:
        df = df[df["timestamp"] < ts_end].copy()

    if df.empty:
        raise ValueError(
            "No rows after applying time window.\n"
            f"Requested window: [{ts_start}, {ts_end})\n"
            f"Data timestamp range: [{data_min}, {data_max}]"
        )

    df["census_block"] = df["census_block"].astype(str)
    if "census_tract" not in df.columns:
        df["census_tract"] = df["census_block"].astype(str).str[:tract_digits]
    else:
        df["census_tract"] = df["census_tract"].astype(str)

    results["meta"] = {"time_start": ts_start, "time_end": ts_end, "data_min": data_min, "data_max": data_max}

    # ============================================================
    # AVAILABILITY (INLINE)
    # ============================================================
    df_av = df.sort_values(["timestamp", "census_block"]).reset_index(drop=True)
    df_av["time_slot"] = df_av["timestamp"].dt.floor(availability_block_time_granularity)

    if reserved_col not in df_av.columns or disabled_col not in df_av.columns:
        raise ValueError(
            f"Missing '{reserved_col}' and/or '{disabled_col}' for dockless availability. "
            "If Spin uses different names, pass reserved_col/disabled_col overrides."
        )

    available_df = df_av[(df_av[reserved_col] == 0) & (df_av[disabled_col] == 0)].copy()
    if vehicle_id_col not in available_df.columns:
        raise ValueError(f"Missing vehicle id column '{vehicle_id_col}' for availability (override vehicle_id_col).")

    all_blocks = df_av["census_block"].drop_duplicates().to_numpy()
    all_time_slots = pd.date_range(
        df_av["time_slot"].min(),
        df_av["time_slot"].max(),
        freq=availability_block_time_granularity,
    )

    full_index = pd.MultiIndex.from_product(
        [all_blocks, all_time_slots],
        names=["census_block", "time_slot"],
    ).to_frame(index=False)

    base_availability = (
        available_df.groupby(["census_block", "time_slot"])[vehicle_id_col]
        .count()
        .reset_index()
        .rename(columns={vehicle_id_col: "total_available"})
    )

    availability_block = full_index.merge(base_availability, on=["census_block", "time_slot"], how="left")
    availability_block["total_available"] = availability_block["total_available"].fillna(0).astype(int)

    if include_vehicle_type:
        if vehicle_type_col_avail not in available_df.columns:
            raise ValueError(f"include_vehicle_type=True but '{vehicle_type_col_avail}' not found (override).")
        type_pivot = available_df.pivot_table(
            index=["census_block", "time_slot"],
            columns=vehicle_type_col_avail,
            values=vehicle_id_col,
            aggfunc="count",
            fill_value=0,
        ).reset_index()
        type_pivot.columns.name = None
        type_pivot = type_pivot.rename(columns=lambda x: f"available_type_{x}" if isinstance(x, (int, np.integer)) else x)

        availability_block = availability_block.merge(type_pivot, on=["census_block", "time_slot"], how="left").fillna(0)
        numeric_cols = availability_block.columns.difference(["census_block", "time_slot"])
        availability_block[numeric_cols] = availability_block[numeric_cols].astype(int)

    availability_block["census_tract"] = availability_block["census_block"].astype(str).str[:tract_digits]
    availability_block = availability_block.sort_values(["census_block", "time_slot"]).reset_index(drop=True)

    # tract hourly mean
    available_df["time_slot_hour"] = available_df["timestamp"].dt.floor(availability_tract_time_granularity)
    tract_5min_df = (
        available_df.groupby(["census_tract", "timestamp"])
        .size()
        .reset_index(name="total_available_5min")
    )
    tract_5min_df["time_slot_hour"] = tract_5min_df["timestamp"].dt.floor(availability_tract_time_granularity)

    availability_tract_hourly_raw = (
        tract_5min_df.groupby(["census_tract", "time_slot_hour"])["total_available_5min"]
        .mean()
        .round(0)
        .astype(int)
        .reset_index()
        .rename(columns={"total_available_5min": "total_available", "time_slot_hour": "time_slot"})
        .sort_values(["census_tract", "time_slot"])
        .reset_index(drop=True)
    )

    availability_tract_hourly_norm = availability_tract_hourly_raw.copy()
    if availability_normalize:
        mn = availability_tract_hourly_norm["total_available"].min()
        mx = availability_tract_hourly_norm["total_available"].max()
        availability_tract_hourly_norm["total_available_norm"] = (
            0.0 if (pd.isna(mn) or pd.isna(mx) or mn == mx)
            else (availability_tract_hourly_norm["total_available"] - mn) / (mx - mn)
        )
        availability_tract_hourly_norm["total_available_norm"] = availability_tract_hourly_norm["total_available_norm"].round(5)

    if save_outputs:
        availability_block.to_csv(out_dir / f"availability_block_{output_tag}.csv", index=False)
        availability_tract_hourly_raw.to_csv(out_dir / f"availability_tract_hourly_raw_{output_tag}.csv", index=False)
        availability_tract_hourly_norm.to_csv(out_dir / f"availability_tract_hourly_norm_{output_tag}.csv", index=False)

    results["availability"] = {
        "availability_block_5min": availability_block,
        "availability_tract_hourly_raw": availability_tract_hourly_raw,
        "availability_tract_hourly_norm": availability_tract_hourly_norm,
    }

    # ============================================================
    # USAGE (INLINE)
    # ============================================================
    df_u = df.copy()
    required = {"timestamp", "lat", "lon", "census_block"}
    missing = required - set(df_u.columns)
    if missing:
        raise ValueError(f"Usage input missing required columns: {missing}")

    df_u["time_slot"] = df_u["timestamp"].dt.floor(usage_base_time_slot)
    df_u["rounded_lat"] = df_u["lat"].round(usage_rounding_decimals)
    df_u["rounded_lon"] = df_u["lon"].round(usage_rounding_decimals)

    counts = (
        df_u.groupby(["census_block", "time_slot", "rounded_lat", "rounded_lon"], as_index=False)
        .size()
        .rename(columns={"size": "cnt"})
    )

    all_blocks_u = df_u["census_block"].drop_duplicates().sort_values().to_numpy()
    all_slots = np.sort(df_u["time_slot"].unique())
    if len(all_slots) < 2:
        raise ValueError("Not enough time slots to compute dockless usage (need at least 2).")

    base_td = pd.to_timedelta(usage_base_time_slot)

    prev = counts.copy()
    prev["time_slot"] = prev["time_slot"] + base_td
    prev = prev.rename(columns={"cnt": "prev_cnt"})

    curr = counts.rename(columns={"cnt": "curr_cnt"})

    diff = prev.merge(curr, on=["census_block", "time_slot", "rounded_lat", "rounded_lon"], how="outer")
    diff["prev_cnt"] = diff["prev_cnt"].fillna(0).astype(int)
    diff["curr_cnt"] = diff["curr_cnt"].fillna(0).astype(int)

    diff["starts_part"] = (diff["prev_cnt"] - diff["curr_cnt"]).clip(lower=0)
    diff["ends_part"] = (diff["curr_cnt"] - diff["prev_cnt"]).clip(lower=0)

    usage_5min_block = (
        diff.groupby(["census_block", "time_slot"], as_index=False)
        .agg(num_trip_starts=("starts_part", "sum"), num_trip_ends=("ends_part", "sum"))
    )

    full_index = pd.MultiIndex.from_product(
        [all_blocks_u, all_slots[1:]],
        names=["census_block", "time_slot"],
    ).to_frame(index=False)
    usage_5min_block = full_index.merge(usage_5min_block, on=["census_block", "time_slot"], how="left")
    usage_5min_block[["num_trip_starts", "num_trip_ends"]] = (
        usage_5min_block[["num_trip_starts", "num_trip_ends"]].fillna(0).astype(int)
    )
    usage_5min_block = usage_5min_block.sort_values(["census_block", "time_slot"]).reset_index(drop=True)

    usage_5min_block["agg_slot"] = pd.to_datetime(usage_5min_block["time_slot"]).dt.floor(usage_aggregate_time_slot)

    usage_hourly_block = (
        usage_5min_block.groupby(["census_block", "agg_slot"], as_index=False)[["num_trip_starts", "num_trip_ends"]]
        .sum()
        .rename(columns={"agg_slot": "time_slot"})
        .sort_values(["census_block", "time_slot"])
        .reset_index(drop=True)
    )

    usage_hourly_block["census_tract"] = usage_hourly_block["census_block"].astype(str).str[:tract_digits]

    usage_hourly_tract = (
        usage_hourly_block.groupby(["census_tract", "time_slot"], as_index=False)
        .agg(num_trip_starts=("num_trip_starts", "sum"), num_trip_ends=("num_trip_ends", "sum"))
        .sort_values(["census_tract", "time_slot"])
        .reset_index(drop=True)
    )

    if usage_normalize:
        usage_hourly_tract["num_trip_starts_norm"] = _minmax(usage_hourly_tract["num_trip_starts"]).round(5)
        usage_hourly_tract["num_trip_ends_norm"] = _minmax(usage_hourly_tract["num_trip_ends"]).round(5)

    if save_outputs:
        usage_5min_block.to_csv(out_dir / f"usage_5min_block_{output_tag}.csv", index=False)
        usage_hourly_block.to_csv(out_dir / f"usage_hourly_block_{output_tag}.csv", index=False)
        usage_hourly_tract.to_csv(out_dir / f"usage_hourly_tract_{output_tag}.csv", index=False)

    results["usage"] = {
        "usage_5min_block": usage_5min_block,
        "usage_hourly_block": usage_hourly_block,
        "usage_hourly_tract": usage_hourly_tract,
    }

    # ============================================================
    # IDLE TIME (INLINE)
    # ============================================================
    df_i = df.copy()
    req = {idle_vehicle_id_col, "timestamp", idle_lat_col, idle_lon_col, idle_census_block_col, idle_vehicle_type_col}
    missing = req - set(df_i.columns)
    if missing:
        raise ValueError(
            f"Idle-time input missing required columns: {missing}. "
            "If Spin uses different names, override idle_*_col params."
        )

    df_i = df_i.sort_values([idle_vehicle_id_col, "timestamp"]).reset_index(drop=True)
    df_i["rounded_lat"] = df_i[idle_lat_col].round(idle_rounding_decimals)
    df_i["rounded_lon"] = df_i[idle_lon_col].round(idle_rounding_decimals)

    df_i["time_slot_5min"] = df_i["timestamp"].dt.floor(idle_base_time_slot)
    df_i["hour_bucket"] = df_i["timestamp"].dt.floor(idle_hour_bucket_freq)

    df_i["location_key"] = df_i["rounded_lat"].astype(str) + "_" + df_i["rounded_lon"].astype(str)

    df_i["location_shift"] = df_i.groupby([idle_vehicle_id_col, idle_census_block_col, "hour_bucket"])["location_key"].shift()
    df_i["new_segment"] = (df_i["location_key"] != df_i["location_shift"]).astype(int)
    df_i["segment_id"] = df_i.groupby([idle_vehicle_id_col, idle_census_block_col, "hour_bucket"])["new_segment"].cumsum()

    segments = (
        df_i.groupby([idle_vehicle_id_col, idle_census_block_col, "hour_bucket", idle_vehicle_type_col, "segment_id"], as_index=False)
        .agg(segment_start=("timestamp", "min"), segment_end=("timestamp", "max"), ping_count=("timestamp", "count"))
    )
    segments = segments[segments["ping_count"] > 1].copy()
    segments["idle_duration_minutes"] = (segments["segment_end"] - segments["segment_start"]).dt.total_seconds() / 60.0

    avg_idle = (
        segments.groupby([idle_census_block_col, "hour_bucket", idle_vehicle_type_col], as_index=False)["idle_duration_minutes"]
        .mean()
        .rename(columns={"idle_duration_minutes": "avg_idle_time_minutes"})
    )
    idle_counts = (
        segments.groupby([idle_census_block_col, "hour_bucket", idle_vehicle_type_col], as_index=False)["segment_id"]
        .count()
        .rename(columns={"segment_id": "num_idle_segments"})
    )
    idle_summary = avg_idle.merge(idle_counts, on=[idle_census_block_col, "hour_bucket", idle_vehicle_type_col], how="inner")
    idle_summary = idle_summary.sort_values([idle_census_block_col, "hour_bucket"]).reset_index(drop=True)

    idle_summary_full = None
    if idle_fill_full_grid:
        service_area_blocks = df_i[idle_census_block_col].drop_duplicates().to_numpy()
        all_hours = np.sort(df_i["hour_bucket"].unique())
        full_index = pd.MultiIndex.from_product(
            [service_area_blocks, all_hours],
            names=[idle_census_block_col, "hour_bucket"],
        ).to_frame(index=False)

        idle_summary_full = full_index.merge(idle_summary, on=[idle_census_block_col, "hour_bucket"], how="left")
        idle_summary_full[idle_vehicle_type_col] = idle_summary_full[idle_vehicle_type_col].fillna(idle_default_vehicle_type_for_missing)
        idle_summary_full["avg_idle_time_minutes"] = idle_summary_full["avg_idle_time_minutes"].fillna(0.0)
        idle_summary_full["num_idle_segments"] = idle_summary_full["num_idle_segments"].fillna(0).astype(int)
        idle_summary_full = idle_summary_full.sort_values([idle_census_block_col, "hour_bucket"]).reset_index(drop=True)

    block_for_tract = idle_summary_full if idle_summary_full is not None else idle_summary
    block_for_tract["census_tract"] = block_for_tract[idle_census_block_col].astype(str).str[:tract_digits]

    tract_idle_summary = (
        block_for_tract.groupby(["census_tract", "hour_bucket", idle_vehicle_type_col], as_index=False)
        .agg(avg_idle_time_minutes=("avg_idle_time_minutes", "mean"), num_idle_segments=("num_idle_segments", "sum"))
        .sort_values(["census_tract", "hour_bucket"])
        .reset_index(drop=True)
    )

    tract_idle_norm = tract_idle_summary.copy()
    if idle_normalize:
        tract_idle_norm["avg_idle_time_minutes_norm"] = _minmax(tract_idle_norm["avg_idle_time_minutes"]).round(5)

    if save_outputs:
        idle_summary.to_csv(out_dir / f"idle_summary_block_{output_tag}.csv", index=False)
        if idle_summary_full is not None:
            idle_summary_full.to_csv(out_dir / f"idle_summary_block_full_{output_tag}.csv", index=False)
        tract_idle_summary.to_csv(out_dir / f"idle_summary_tract_{output_tag}.csv", index=False)
        tract_idle_norm.to_csv(out_dir / f"idle_summary_tract_norm_{output_tag}.csv", index=False)

    results["idle_time"] = {
        "idle_summary_block": idle_summary,
        "idle_summary_block_full": idle_summary_full,
        "idle_summary_tract": tract_idle_summary,
        "idle_summary_tract_norm": tract_idle_norm,
    }

    # ============================================================
    # SAFETY (INLINE)
    # ============================================================
    import geopandas as gpd
    from shapely import wkt

    if safety_protected_keywords is None:
        safety_protected_keywords = [
            "protected", "separated", "cycle track", "cycletrack", "parking protected",
            "class i", "class 1", "path", "off-street", "off street", "buffered", "raised"
        ]
    if safety_class_col_candidate_keywords is None:
        safety_class_col_candidate_keywords = [
            "facility", "class", "type", "category", "treatment", "protected",
            "separation", "separator", "lane", "bikelane", "bike lane", "route"
        ]

    safety_protected_match_mode = safety_protected_match_mode.lower().strip()
    if safety_protected_match_mode not in {"exact", "contains"}:
        raise ValueError("safety_protected_match_mode must be 'exact' or 'contains'")

    blocks = gpd.read_file(str(census_blocks_shp))
    if safety_census_block_id_col not in blocks.columns:
        raise ValueError(
            f"census block ID column '{safety_census_block_id_col}' not found. "
            f"Available: {list(blocks.columns)[:25]} ..."
        )

    if safety_working_epsg is not None:
        blocks = blocks.to_crs(epsg=int(safety_working_epsg))
    else:
        if blocks.crs is None:
            raise ValueError("census_blocks_shp has no CRS. Provide safety_working_epsg.")

    streets_df = pd.read_csv(str(centerline_streets_csv))
    if safety_centerline_wkt_col not in streets_df.columns:
        raise ValueError(f"Centerline WKT column '{safety_centerline_wkt_col}' not found in centerline CSV.")
    streets_df["geometry"] = streets_df[safety_centerline_wkt_col].apply(
        lambda x: wkt.loads(x) if isinstance(x, str) and x.startswith("LINESTRING") else None
    )
    streets_gdf = gpd.GeoDataFrame(streets_df, geometry="geometry", crs=safety_input_crs)
    streets_gdf = streets_gdf[~streets_gdf["geometry"].isna()].copy()
    streets_gdf = streets_gdf.to_crs(blocks.crs)

    bike_df = pd.read_csv(str(bike_lanes_csv))
    if safety_bike_lane_wkt_col not in bike_df.columns:
        raise ValueError(f"Bike lane WKT column '{safety_bike_lane_wkt_col}' not found in bike lane CSV.")
    bike_df["geometry"] = bike_df[safety_bike_lane_wkt_col].apply(
        lambda x: wkt.loads(x) if isinstance(x, str) and x.startswith("LINESTRING") else None
    )
    bike_gdf = gpd.GeoDataFrame(bike_df, geometry="geometry", crs=safety_input_crs)
    bike_gdf = bike_gdf[~bike_gdf["geometry"].isna()].copy()
    bike_gdf = bike_gdf.to_crs(blocks.crs)

    def _norm_text(x: Any) -> str:
        if pd.isna(x):
            return ""
        return str(x).strip().lower()

    detected_class_col = None
    detection_scores: List[Tuple[str, float]] = []

    if safety_bike_lane_class_col is None:
        text_cols = [c for c in bike_gdf.columns if bike_gdf[c].dtype == "object" and c != "geometry"]

        def _score_col(col: str) -> float:
            name_score = sum(1 for kw in safety_class_col_candidate_keywords if kw in col.lower())
            sample = bike_gdf[col].dropna().astype(str).head(5000)
            val_join = " ".join(sample.str.lower().tolist())
            val_score = sum(val_join.count(kw) for kw in safety_protected_keywords)
            return float(name_score) + float(val_score) / 1000.0

        for c in text_cols:
            s = _score_col(c)
            detection_scores.append((c, s))
        detection_scores.sort(key=lambda x: x[1], reverse=True)
        if detection_scores and detection_scores[0][1] > 0:
            detected_class_col = detection_scores[0][0]
            safety_bike_lane_class_col = detected_class_col

    can_compute_protected = safety_bike_lane_class_col is not None and safety_bike_lane_class_col in bike_gdf.columns

    protected_gdf = bike_gdf.iloc[0:0].copy()
    inferred_protected_values = None

    if can_compute_protected:
        col_series = bike_gdf[safety_bike_lane_class_col].astype(str)
        if safety_protected_values:
            pv = [p.strip().lower() for p in safety_protected_values]
            if safety_protected_match_mode == "exact":
                mask = col_series.map(_norm_text).isin(pv)
            else:
                mask = col_series.map(lambda v: any(k in _norm_text(v) for k in pv))
            protected_gdf = bike_gdf[mask].copy()
            inferred_protected_values = pv
        else:
            kws = [k.strip().lower() for k in safety_protected_keywords]
            mask = col_series.map(lambda v: any(k in _norm_text(v) for k in kws))
            protected_gdf = bike_gdf[mask].copy()
            inferred_protected_values = kws

    streets_in_blocks = gpd.overlay(streets_gdf, blocks, how="intersection")
    streets_in_blocks["segment_length"] = streets_in_blocks.geometry.length
    street_len_block = (
        streets_in_blocks.groupby(safety_census_block_id_col, as_index=False)
        .agg(streets_leng=("segment_length", "sum"))
        .rename(columns={safety_census_block_id_col: "census_block"})
    )
    street_len_block["census_block"] = street_len_block["census_block"].astype(str)
    street_len_block["streets_leng"] = street_len_block["streets_leng"].round(3)

    bike_in_blocks = gpd.overlay(bike_gdf, blocks, how="intersection")
    bike_in_blocks["bike_lane_length"] = bike_in_blocks.geometry.length
    bike_len_block = (
        bike_in_blocks.groupby(safety_census_block_id_col, as_index=False)
        .agg(total_bike_lane_length=("bike_lane_length", "sum"))
        .rename(columns={safety_census_block_id_col: "census_block"})
    )
    bike_len_block["census_block"] = bike_len_block["census_block"].astype(str)
    bike_len_block["total_bike_lane_length"] = bike_len_block["total_bike_lane_length"].round(3)

    protected_len_block = pd.DataFrame({"census_block": [], "protected_bike_lane_length": []})
    if can_compute_protected and not protected_gdf.empty:
        protected_in_blocks = gpd.overlay(protected_gdf, blocks, how="intersection")
        protected_in_blocks["protected_lane_length"] = protected_in_blocks.geometry.length
        protected_len_block = (
            protected_in_blocks.groupby(safety_census_block_id_col, as_index=False)
            .agg(protected_bike_lane_length=("protected_lane_length", "sum"))
            .rename(columns={safety_census_block_id_col: "census_block"})
        )
        protected_len_block["census_block"] = protected_len_block["census_block"].astype(str)
        protected_len_block["protected_bike_lane_length"] = protected_len_block["protected_bike_lane_length"].round(3)

    safety_block = (
        street_len_block.merge(bike_len_block, on="census_block", how="left")
        .merge(protected_len_block, on="census_block", how="left")
    )
    safety_block["total_bike_lane_length"] = safety_block["total_bike_lane_length"].fillna(0.0)
    safety_block["protected_bike_lane_length"] = safety_block["protected_bike_lane_length"].fillna(0.0)

    safety_block["bike_lane_ratio"] = safety_block.apply(
        lambda r: (r["total_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    ).round(3)
    safety_block["protected_bike_lane_ratio"] = safety_block.apply(
        lambda r: (r["protected_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    ).round(3)

    ct = pd.read_csv(str(centroid_tract_csv))
    if safety_centroid_tract_id_col not in ct.columns:
        raise ValueError(f"centroid_tract_csv missing '{safety_centroid_tract_id_col}'. Available: {list(ct.columns)}")
    ct[safety_centroid_tract_id_col] = ct[safety_centroid_tract_id_col].astype(str)

    safety_block["census_tract"] = safety_block["census_block"].astype(str).str[:tract_digits]
    tract_sums = safety_block.groupby("census_tract", as_index=False)[
        ["streets_leng", "total_bike_lane_length", "protected_bike_lane_length"]
    ].sum()

    keep_cols = [safety_centroid_tract_id_col]
    if safety_centroid_area_col is not None and safety_centroid_area_col in ct.columns:
        keep_cols.append(safety_centroid_area_col)

    safety_tract = ct[keep_cols].merge(
        tract_sums,
        left_on=safety_centroid_tract_id_col,
        right_on="census_tract",
        how="left",
    )
    safety_tract[["streets_leng", "total_bike_lane_length", "protected_bike_lane_length"]] = (
        safety_tract[["streets_leng", "total_bike_lane_length", "protected_bike_lane_length"]].fillna(0.0)
    )

    safety_tract["bike_lane_ratio"] = safety_tract.apply(
        lambda r: (r["total_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    )
    safety_tract["protected_bike_lane_ratio"] = safety_tract.apply(
        lambda r: (r["protected_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    )

    safety_tract = safety_tract.sort_values(by=safety_centroid_tract_id_col).reset_index(drop=True)

    safety_tract_norm = safety_tract.copy()
    if safety_normalize:
        safety_tract_norm["bike_lane_ratio_norm"] = _minmax(safety_tract_norm["bike_lane_ratio"]).round(5)
        safety_tract_norm["protected_bike_lane_ratio_norm"] = _minmax(safety_tract_norm["protected_bike_lane_ratio"]).round(5)

    if save_outputs:
        safety_block.to_csv(out_dir / f"safety_block_bike_lane_{output_tag}.csv", index=False)
        safety_tract.to_csv(out_dir / f"safety_bike_lane_tract_{output_tag}.csv", index=False)
        safety_tract_norm.to_csv(out_dir / f"safety_bike_lane_norm_tract_{output_tag}.csv", index=False)

    results["safety"] = {
        "safety_block_df": safety_block,
        "safety_tract_df": safety_tract,
        "safety_tract_norm_df": safety_tract_norm,
        "meta": {
            "working_crs": str(blocks.crs),
            "bike_lane_class_col_used": safety_bike_lane_class_col,
            "bike_lane_class_col_detected": detected_class_col,
            "protected_match_mode": safety_protected_match_mode,
            "protected_values_used": inferred_protected_values,
            "detection_scores_top5": detection_scores[:5],
            "normalize": safety_normalize,
        },
    }

    return results


# ----------------------------
# Example calls
# ----------------------------
# LIME
# out_lime = run_sf_dockless_all_utilities_single_function(
#     system_key="SF_LIME_DOCKLESS",
#     freebike_status_txt=r"D:\Research Fellowship\Summer Research Stuff\Collected Data\Week 1\09-June\san_fran_lime_dkless_freebike_status_6_9.txt",
#     output_dir="SF_LIME_FULL_RUN",
#     time_start="2025-06-09 06:00:00",
#     time_end="2025-06-09 12:00:00",
# )

# SPIN
# out_spin = run_sf_dockless_all_utilities_single_function(
#     system_key="SF_SPIN_DOCKLESS",
#     freebike_status_txt=r"D:\Research Fellowship\Summer Research Stuff\Collected Data\Week 1\09-June\san_fran_spin_dkless_freebike_status_6_9.txt",
#     output_dir="SF_SPIN_FULL_RUN",
#     time_start="2025-06-09 06:00:00",
#     time_end="2025-06-09 12:00:00",
# )


In [2]:
out_spin = run_sf_dockless_all_utilities_single_function( 
    system_key="SF_SPIN_DOCKLESS", # Change the system key accordingly SF_LIME_DOCKLESS or SF_SPIN_DOCKLESS
    freebike_status_txt=r"D:\Research Fellowship\Summer Research Stuff\Collected Data\Week 1\09-June\san_fran_spin_dkless_freebike_status_6_9.txt",
    output_dir="SF_SPIN_FULL_RUN", #Change the output directory name accordingly
    time_start="2025-06-09 06:00:00",
    time_end="2025-06-09 12:00:00",
)

  raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], errors="coerce")
  available_df["time_slot_hour"] = available_df["timestamp"].dt.floor(availability_tract_time_granularity)
  tract_5min_df["time_slot_hour"] = tract_5min_df["timestamp"].dt.floor(availability_tract_time_granularity)
  usage_5min_block["agg_slot"] = pd.to_datetime(usage_5min_block["time_slot"]).dt.floor(usage_aggregate_time_slot)


ValueError: Idle-time input missing required columns: {'vehicle_type'}. If Spin uses different names, override idle_*_col params.

In [1]:
from __future__ import annotations

from pathlib import Path
from typing import Optional, Union, Dict, Any, List, Tuple
import json
import requests

import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.geometry import Point
from shapely import wkt
from tqdm import tqdm


def run_sf_dockless_all_utilities_single_function(
    *,
    # ------------------------------------------------------------
    # USER INPUTS
    # ------------------------------------------------------------
    system_key: str,  # "SF_LIME_DOCKLESS" or "SF_SPIN_DOCKLESS"
    freebike_status_txt: Union[str, Path],

    # ------------------------------------------------------------
    # CITY ASSETS (same for both SF dockless systems unless overridden)
    # ------------------------------------------------------------
    census_blocks_shp: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\GBFS_Census_Tract\San_Fran_Lime\tl_2024_06_tabblock20.shp",
    centerline_streets_csv: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Centerline.csv",
    bike_lanes_csv: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Bikelane.csv",
    centroid_tract_csv: Union[str, Path] = r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Capacity\San_Fran_Baywheels\centroid_tract_ca.csv",

    # ------------------------------------------------------------
    # OUTPUT
    # ------------------------------------------------------------
    output_dir: Optional[Union[str, Path]] = None,  # if None -> default per system
    save_outputs: bool = True,

    # ------------------------------------------------------------
    # TIME WINDOW (applies to availability/usage/idle)
    # ------------------------------------------------------------
    time_start: Optional[Union[str, pd.Timestamp]] = None,
    time_end: Optional[Union[str, pd.Timestamp]] = None,

    # ------------------------------------------------------------
    # STEP 0 SETTINGS
    # ------------------------------------------------------------
    step0_raw_csv_name: Optional[str] = None,   # if None -> default per system
    step0_done_csv_name: Optional[str] = None,  # if None -> default per system
    blocks_geoid_col: str = "GEOID20",
    blocks_target_epsg: int = 4326,
    fill_missing_with_census_api: bool = True,
    census_benchmark: str = "Public_AR_Census2020",
    census_vintage: str = "2020",
    timestamp_parse_mode: str = "auto",  # "auto" or "epoch"
    drop_cols_if_present: Optional[List[str]] = None,

    # ------------------------------------------------------------
    # AVAILABILITY SETTINGS
    # ------------------------------------------------------------
    availability_block_time_granularity: str = "5min",
    availability_tract_time_granularity: str = "1H",
    reserved_col: str = "is_reserved",
    disabled_col: str = "is_disabled",
    vehicle_id_col: str = "bike_id",
    include_vehicle_type: bool = True,
    vehicle_type_col_avail: str = "vehicle_type_id",
    tract_digits: int = 11,
    availability_normalize: bool = True,

    # ------------------------------------------------------------
    # USAGE SETTINGS
    # ------------------------------------------------------------
    usage_base_time_slot: str = "5min",
    usage_aggregate_time_slot: str = "1H",
    usage_rounding_decimals: int = 4,
    usage_normalize: bool = True,

    # ------------------------------------------------------------
    # IDLE TIME SETTINGS
    # ------------------------------------------------------------
    idle_base_time_slot: str = "5min",
    idle_hour_bucket_freq: str = "1H",
    idle_rounding_decimals: int = 4,
    idle_vehicle_id_col: str = "bike_id",
    idle_lat_col: str = "lat",
    idle_lon_col: str = "lon",
    idle_census_block_col: str = "census_block",
    
    # --- UPDATED DEFAULT BELOW ---
    idle_vehicle_type_col: str = "vehicle_type_id", 
    # -----------------------------
    
    idle_fill_full_grid: bool = True,
    idle_default_vehicle_type_for_missing: str = "scooter",
    idle_normalize: bool = True,

    # ------------------------------------------------------------
    # SAFETY SETTINGS (generic)
    # ------------------------------------------------------------
    safety_centerline_wkt_col: str = "line",
    safety_bike_lane_wkt_col: str = "shape",
    safety_input_crs: str = "EPSG:4326",
    safety_census_block_id_col: str = "GEOID20",
    safety_centroid_tract_id_col: str = "census_tract",
    safety_centroid_area_col: Optional[str] = "county_name",
    safety_working_epsg: Optional[int] = 26910,  # UTM zone for SF-ish
    safety_bike_lane_class_col: Optional[str] = None,
    safety_protected_values: Optional[List[str]] = None,
    safety_protected_keywords: Optional[List[str]] = None,
    safety_class_col_candidate_keywords: Optional[List[str]] = None,
    safety_protected_match_mode: str = "contains",
    safety_normalize: bool = True,
) -> Dict[str, Any]:
    """
    ONE function to run ALL SF dockless utilities for multiple systems (Lime, Spin):
      Step0 (txt->done_df with census block/tract)
      Availability (block+tract)
      Usage (5min movement -> hourly tract)
      Idle time (segment-based idle durations)
      Safety (centerline/bikelane intersection + protected inference)

    Use:
      system_key="SF_LIME_DOCKLESS" or "SF_SPIN_DOCKLESS"
    """

    # ============================================================
    # System presets (only differences: labels + default filenames + output tags)
    # ============================================================
    SYSTEMS: Dict[str, Dict[str, str]] = {
        "SF_LIME_DOCKLESS": {
            "city": "San Francisco",
            "vendor": "Lime",
            "system_type": "dockless",
            "tag": "sf_lime",
            "default_output_dir": "SF_LIME_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_lime_status_raw.csv",
            "done_csv": "san_fran_lime_status_done.csv",
        },
        "SF_SPIN_DOCKLESS": {
            "city": "San Francisco",
            "vendor": "Spin",
            "system_type": "dockless",
            "tag": "sf_spin",
            "default_output_dir": "SF_SPIN_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_spin_status_raw.csv",
            "done_csv": "san_fran_spin_status_done.csv",
        },
    }

    if system_key not in SYSTEMS:
        raise ValueError(f"Unknown system_key='{system_key}'. Choose from: {sorted(SYSTEMS.keys())}")

    preset = SYSTEMS[system_key]
    output_tag = preset["tag"]

    if output_dir is None:
        output_dir = preset["default_output_dir"]

    if step0_raw_csv_name is None:
        step0_raw_csv_name = preset["raw_csv"]
    if step0_done_csv_name is None:
        step0_done_csv_name = preset["done_csv"]

    # ============================================================
    # Setup output dir + paths
    # ============================================================
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    freebike_status_txt = Path(freebike_status_txt)
    census_blocks_shp = Path(census_blocks_shp)
    centerline_streets_csv = Path(centerline_streets_csv)
    bike_lanes_csv = Path(bike_lanes_csv)
    centroid_tract_csv = Path(centroid_tract_csv)

    if not freebike_status_txt.exists():
        raise FileNotFoundError(f"freebike_status_txt not found: {freebike_status_txt}")
    for label, p in [
        ("census_blocks_shp", census_blocks_shp),
        ("centerline_streets_csv", centerline_streets_csv),
        ("bike_lanes_csv", bike_lanes_csv),
        ("centroid_tract_csv", centroid_tract_csv),
    ]:
        if not p.exists():
            raise FileNotFoundError(f"{label} not found: {p}")

    raw_path = out_dir / step0_raw_csv_name
    done_path = out_dir / step0_done_csv_name

    ts_start = pd.to_datetime(time_start) if time_start is not None else None
    ts_end = pd.to_datetime(time_end) if time_end is not None else None

    results: Dict[str, Any] = {
        "system_key": system_key,
        "city": preset["city"],
        "vendor": preset["vendor"],
        "system_type": preset["system_type"],
        "output_dir": str(out_dir),
    }

    # ============================================================
    # Helpers
    # ============================================================
    def _minmax(series: pd.Series) -> pd.Series:
        s = pd.to_numeric(series, errors="coerce")
        mn = s.min(skipna=True)
        mx = s.max(skipna=True)
        if pd.isna(mn) or pd.isna(mx) or mx <= mn:
            return pd.Series(0.0, index=s.index)
        return (s - mn) / (mx - mn)

    # ============================================================
    # STEP 0 (INLINE): txt -> raw_df -> spatial join -> api fill -> done_df
    # ============================================================
    with freebike_status_txt.open("r", encoding="utf-8") as f:
        lines = f.readlines()

    rows: List[dict] = []
    for line in lines:
        if not line.strip():
            continue
        try:
            blob = json.loads(line)
        except json.JSONDecodeError as e:
            print(f"⚠️ Error decoding JSON: {e}. Skipping line. First 100 chars: {line[:100]}...")
            continue

        timestamp = list(blob.keys())[0]
        entries = blob[timestamp]
        for entry in entries:
            if isinstance(entry, dict) and "vehicle_types_available" in entry:
                vta = entry.get("vehicle_types_available", [])
                if isinstance(vta, list):
                    for vt in vta:
                        try:
                            vid = vt["vehicle_type_id"]
                            cnt = vt["count"]
                            entry[f"vehicle_type_{vid}_count"] = cnt
                        except Exception:
                            pass
                try:
                    del entry["vehicle_types_available"]
                except Exception:
                    pass
            entry["timestamp"] = timestamp
            rows.append(entry)

    raw_df = pd.DataFrame(rows)

    if "timestamp" in raw_df.columns:
        if timestamp_parse_mode == "epoch":
            raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], unit="s", errors="coerce")
        else:
            raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], errors="coerce")
            if raw_df["timestamp"].isna().mean() > 0.5:
                raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], unit="s", errors="coerce")

    if drop_cols_if_present:
        drop_now = [c for c in drop_cols_if_present if c in raw_df.columns]
        if drop_now:
            raw_df = raw_df.drop(columns=drop_now)

    if save_outputs:
        raw_df.to_csv(raw_path, index=False)

    if not {"lat", "lon"}.issubset(raw_df.columns):
        raise ValueError(
            "Dockless input must include 'lat' and 'lon' columns to assign census blocks. "
            "If Spin uses different names, pass overrides via idle_lat_col/idle_lon_col AND rename upstream."
        )

    latlon_df = raw_df.drop_duplicates(subset=["lat", "lon"])[["lat", "lon"]].copy()

    geometry = [Point(xy) for xy in zip(latlon_df["lon"], latlon_df["lat"])]
    points_gdf = gpd.GeoDataFrame(latlon_df, geometry=geometry, crs="EPSG:4326")

    blocks_gdf = gpd.read_file(str(census_blocks_shp)).to_crs(epsg=blocks_target_epsg)
    joined = gpd.sjoin(points_gdf, blocks_gdf, how="left", predicate="within")

    if blocks_geoid_col not in joined.columns:
        raise ValueError(
            f"Expected block GEOID column '{blocks_geoid_col}' not found after join. "
            f"Available columns: {list(joined.columns)}"
        )

    joined = joined.rename(columns={blocks_geoid_col: "census_block"})
    joined["census_block"] = joined["census_block"].astype(str)
    joined["census_tract"] = joined["census_block"].str[:-4]

    latlon_blocks_df = joined[["lat", "lon", "census_block", "census_tract"]].copy()
    done_df = raw_df.merge(latlon_blocks_df, on=["lat", "lon"], how="left")

    if fill_missing_with_census_api:
        missing_points = done_df[done_df["census_block"].isna()][["lat", "lon"]].drop_duplicates().copy()
        if not missing_points.empty:
            try:
                tqdm.pandas()
                use_progress = True
            except Exception:
                use_progress = False

            def _get_block(lat: float, lon: float) -> Optional[str]:
                try:
                    url = (
                        "https://geocoding.geo.census.gov/geocoder/geographies/coordinates"
                        f"?x={lon}&y={lat}"
                        f"&benchmark={census_benchmark}"
                        f"&vintage={census_vintage}"
                        "&format=json"
                    )
                    r = requests.get(url, timeout=30)
                    r.raise_for_status()
                    js = r.json()
                    geos = js.get("result", {}).get("geographies", {})
                    blocks = geos.get("Census Blocks", [])
                    if blocks:
                        return blocks[0].get("GEOID")
                    blocks2020 = geos.get("2020 Census Blocks", [])
                    if blocks2020:
                        return blocks2020[0].get("GEOID")
                    return None
                except Exception:
                    return None

            if use_progress:
                missing_points["census_block_new"] = missing_points.progress_apply(
                    lambda r: _get_block(r["lat"], r["lon"]),
                    axis=1,
                )
            else:
                missing_points["census_block_new"] = missing_points.apply(
                    lambda r: _get_block(r["lat"], r["lon"]),
                    axis=1,
                )

            missing_points["census_tract_new"] = missing_points["census_block_new"].apply(
                lambda x: str(x)[:-4] if pd.notna(x) and x not in {"None", "nan"} else None
            )

            done_df = done_df.merge(
                missing_points[["lat", "lon", "census_block_new", "census_tract_new"]],
                on=["lat", "lon"],
                how="left",
            )
            done_df["census_block"] = done_df["census_block"].fillna(done_df["census_block_new"])
            done_df["census_tract"] = done_df["census_tract"].fillna(done_df["census_tract_new"])
            done_df = done_df.drop(columns=["census_block_new", "census_tract_new"])

    if save_outputs:
        done_df.to_csv(done_path, index=False)

    results["step0"] = {
        "raw_df": raw_df,
        "done_df": done_df,
        "latlon_blocks_df": latlon_blocks_df,
        "paths": {"raw_csv": str(raw_path), "done_csv": str(done_path)},
        "meta": {
            "raw_rows": len(raw_df),
            "done_rows": len(done_df),
            "unique_points": len(latlon_df),
            "missing_after_sjoin": int(done_df["census_block"].isna().sum()),
        },
    }

    # ============================================================
    # Apply global time filter once (for downstream)
    # ============================================================
    df = done_df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    if df["timestamp"].isna().any():
        raise ValueError(f"Step0 produced {int(df['timestamp'].isna().sum())} rows with unparseable timestamps.")

    data_min = df["timestamp"].min()
    data_max = df["timestamp"].max()

    if ts_start is not None:
        df = df[df["timestamp"] >= ts_start].copy()
    if ts_end is not None:
        df = df[df["timestamp"] < ts_end].copy()

    if df.empty:
        raise ValueError(
            "No rows after applying time window.\n"
            f"Requested window: [{ts_start}, {ts_end})\n"
            f"Data timestamp range: [{data_min}, {data_max}]"
        )

    df["census_block"] = df["census_block"].astype(str)
    if "census_tract" not in df.columns:
        df["census_tract"] = df["census_block"].astype(str).str[:tract_digits]
    else:
        df["census_tract"] = df["census_tract"].astype(str)

    results["meta"] = {"time_start": ts_start, "time_end": ts_end, "data_min": data_min, "data_max": data_max}

    # ============================================================
    # AVAILABILITY (INLINE)
    # ============================================================
    df_av = df.sort_values(["timestamp", "census_block"]).reset_index(drop=True)
    df_av["time_slot"] = df_av["timestamp"].dt.floor(availability_block_time_granularity)

    if reserved_col not in df_av.columns or disabled_col not in df_av.columns:
        raise ValueError(
            f"Missing '{reserved_col}' and/or '{disabled_col}' for dockless availability. "
            "If Spin uses different names, pass reserved_col/disabled_col overrides."
        )

    available_df = df_av[(df_av[reserved_col] == 0) & (df_av[disabled_col] == 0)].copy()
    if vehicle_id_col not in available_df.columns:
        raise ValueError(f"Missing vehicle id column '{vehicle_id_col}' for availability (override vehicle_id_col).")

    all_blocks = df_av["census_block"].drop_duplicates().to_numpy()
    all_time_slots = pd.date_range(
        df_av["time_slot"].min(),
        df_av["time_slot"].max(),
        freq=availability_block_time_granularity,
    )

    full_index = pd.MultiIndex.from_product(
        [all_blocks, all_time_slots],
        names=["census_block", "time_slot"],
    ).to_frame(index=False)

    base_availability = (
        available_df.groupby(["census_block", "time_slot"])[vehicle_id_col]
        .count()
        .reset_index()
        .rename(columns={vehicle_id_col: "total_available"})
    )

    availability_block = full_index.merge(base_availability, on=["census_block", "time_slot"], how="left")
    availability_block["total_available"] = availability_block["total_available"].fillna(0).astype(int)

    if include_vehicle_type:
        if vehicle_type_col_avail not in available_df.columns:
            raise ValueError(f"include_vehicle_type=True but '{vehicle_type_col_avail}' not found (override).")
        type_pivot = available_df.pivot_table(
            index=["census_block", "time_slot"],
            columns=vehicle_type_col_avail,
            values=vehicle_id_col,
            aggfunc="count",
            fill_value=0,
        ).reset_index()
        type_pivot.columns.name = None
        type_pivot = type_pivot.rename(columns=lambda x: f"available_type_{x}" if isinstance(x, (int, np.integer)) else x)

        availability_block = availability_block.merge(type_pivot, on=["census_block", "time_slot"], how="left").fillna(0)
        numeric_cols = availability_block.columns.difference(["census_block", "time_slot"])
        availability_block[numeric_cols] = availability_block[numeric_cols].astype(int)

    availability_block["census_tract"] = availability_block["census_block"].astype(str).str[:tract_digits]
    availability_block = availability_block.sort_values(["census_block", "time_slot"]).reset_index(drop=True)

    # tract hourly mean
    available_df["time_slot_hour"] = available_df["timestamp"].dt.floor(availability_tract_time_granularity)
    tract_5min_df = (
        available_df.groupby(["census_tract", "timestamp"])
        .size()
        .reset_index(name="total_available_5min")
    )
    tract_5min_df["time_slot_hour"] = tract_5min_df["timestamp"].dt.floor(availability_tract_time_granularity)

    availability_tract_hourly_raw = (
        tract_5min_df.groupby(["census_tract", "time_slot_hour"])["total_available_5min"]
        .mean()
        .round(0)
        .astype(int)
        .reset_index()
        .rename(columns={"total_available_5min": "total_available", "time_slot_hour": "time_slot"})
        .sort_values(["census_tract", "time_slot"])
        .reset_index(drop=True)
    )

    availability_tract_hourly_norm = availability_tract_hourly_raw.copy()
    if availability_normalize:
        mn = availability_tract_hourly_norm["total_available"].min()
        mx = availability_tract_hourly_norm["total_available"].max()
        availability_tract_hourly_norm["total_available_norm"] = (
            0.0 if (pd.isna(mn) or pd.isna(mx) or mn == mx)
            else (availability_tract_hourly_norm["total_available"] - mn) / (mx - mn)
        )
        availability_tract_hourly_norm["total_available_norm"] = availability_tract_hourly_norm["total_available_norm"].round(5)

    if save_outputs:
        availability_block.to_csv(out_dir / f"availability_block_{output_tag}.csv", index=False)
        availability_tract_hourly_raw.to_csv(out_dir / f"availability_tract_hourly_raw_{output_tag}.csv", index=False)
        availability_tract_hourly_norm.to_csv(out_dir / f"availability_tract_hourly_norm_{output_tag}.csv", index=False)

    results["availability"] = {
        "availability_block_5min": availability_block,
        "availability_tract_hourly_raw": availability_tract_hourly_raw,
        "availability_tract_hourly_norm": availability_tract_hourly_norm,
    }

    # ============================================================
    # USAGE (INLINE)
    # ============================================================
    df_u = df.copy()
    required = {"timestamp", "lat", "lon", "census_block"}
    missing = required - set(df_u.columns)
    if missing:
        raise ValueError(f"Usage input missing required columns: {missing}")

    df_u["time_slot"] = df_u["timestamp"].dt.floor(usage_base_time_slot)
    df_u["rounded_lat"] = df_u["lat"].round(usage_rounding_decimals)
    df_u["rounded_lon"] = df_u["lon"].round(usage_rounding_decimals)

    counts = (
        df_u.groupby(["census_block", "time_slot", "rounded_lat", "rounded_lon"], as_index=False)
        .size()
        .rename(columns={"size": "cnt"})
    )

    all_blocks_u = df_u["census_block"].drop_duplicates().sort_values().to_numpy()
    all_slots = np.sort(df_u["time_slot"].unique())
    if len(all_slots) < 2:
        raise ValueError("Not enough time slots to compute dockless usage (need at least 2).")

    base_td = pd.to_timedelta(usage_base_time_slot)

    prev = counts.copy()
    prev["time_slot"] = prev["time_slot"] + base_td
    prev = prev.rename(columns={"cnt": "prev_cnt"})

    curr = counts.rename(columns={"cnt": "curr_cnt"})

    diff = prev.merge(curr, on=["census_block", "time_slot", "rounded_lat", "rounded_lon"], how="outer")
    diff["prev_cnt"] = diff["prev_cnt"].fillna(0).astype(int)
    diff["curr_cnt"] = diff["curr_cnt"].fillna(0).astype(int)

    diff["starts_part"] = (diff["prev_cnt"] - diff["curr_cnt"]).clip(lower=0)
    diff["ends_part"] = (diff["curr_cnt"] - diff["prev_cnt"]).clip(lower=0)

    usage_5min_block = (
        diff.groupby(["census_block", "time_slot"], as_index=False)
        .agg(num_trip_starts=("starts_part", "sum"), num_trip_ends=("ends_part", "sum"))
    )

    full_index = pd.MultiIndex.from_product(
        [all_blocks_u, all_slots[1:]],
        names=["census_block", "time_slot"],
    ).to_frame(index=False)
    usage_5min_block = full_index.merge(usage_5min_block, on=["census_block", "time_slot"], how="left")
    usage_5min_block[["num_trip_starts", "num_trip_ends"]] = (
        usage_5min_block[["num_trip_starts", "num_trip_ends"]].fillna(0).astype(int)
    )
    usage_5min_block = usage_5min_block.sort_values(["census_block", "time_slot"]).reset_index(drop=True)

    usage_5min_block["agg_slot"] = pd.to_datetime(usage_5min_block["time_slot"]).dt.floor(usage_aggregate_time_slot)

    usage_hourly_block = (
        usage_5min_block.groupby(["census_block", "agg_slot"], as_index=False)[["num_trip_starts", "num_trip_ends"]]
        .sum()
        .rename(columns={"agg_slot": "time_slot"})
        .sort_values(["census_block", "time_slot"])
        .reset_index(drop=True)
    )

    usage_hourly_block["census_tract"] = usage_hourly_block["census_block"].astype(str).str[:tract_digits]

    usage_hourly_tract = (
        usage_hourly_block.groupby(["census_tract", "time_slot"], as_index=False)
        .agg(num_trip_starts=("num_trip_starts", "sum"), num_trip_ends=("num_trip_ends", "sum"))
        .sort_values(["census_tract", "time_slot"])
        .reset_index(drop=True)
    )

    if usage_normalize:
        usage_hourly_tract["num_trip_starts_norm"] = _minmax(usage_hourly_tract["num_trip_starts"]).round(5)
        usage_hourly_tract["num_trip_ends_norm"] = _minmax(usage_hourly_tract["num_trip_ends"]).round(5)

    if save_outputs:
        usage_5min_block.to_csv(out_dir / f"usage_5min_block_{output_tag}.csv", index=False)
        usage_hourly_block.to_csv(out_dir / f"usage_hourly_block_{output_tag}.csv", index=False)
        usage_hourly_tract.to_csv(out_dir / f"usage_hourly_tract_{output_tag}.csv", index=False)

    results["usage"] = {
        "usage_5min_block": usage_5min_block,
        "usage_hourly_block": usage_hourly_block,
        "usage_hourly_tract": usage_hourly_tract,
    }

    # ============================================================
    # IDLE TIME (INLINE)
    # ============================================================
    df_i = df.copy()
    req = {idle_vehicle_id_col, "timestamp", idle_lat_col, idle_lon_col, idle_census_block_col, idle_vehicle_type_col}
    missing = req - set(df_i.columns)
    if missing:
        raise ValueError(
            f"Idle-time input missing required columns: {missing}. "
            "If Spin uses different names, override idle_*_col params."
        )

    df_i = df_i.sort_values([idle_vehicle_id_col, "timestamp"]).reset_index(drop=True)
    df_i["rounded_lat"] = df_i[idle_lat_col].round(idle_rounding_decimals)
    df_i["rounded_lon"] = df_i[idle_lon_col].round(idle_rounding_decimals)

    df_i["time_slot_5min"] = df_i["timestamp"].dt.floor(idle_base_time_slot)
    df_i["hour_bucket"] = df_i["timestamp"].dt.floor(idle_hour_bucket_freq)

    df_i["location_key"] = df_i["rounded_lat"].astype(str) + "_" + df_i["rounded_lon"].astype(str)

    df_i["location_shift"] = df_i.groupby([idle_vehicle_id_col, idle_census_block_col, "hour_bucket"])["location_key"].shift()
    df_i["new_segment"] = (df_i["location_key"] != df_i["location_shift"]).astype(int)
    df_i["segment_id"] = df_i.groupby([idle_vehicle_id_col, idle_census_block_col, "hour_bucket"])["new_segment"].cumsum()

    segments = (
        df_i.groupby([idle_vehicle_id_col, idle_census_block_col, "hour_bucket", idle_vehicle_type_col, "segment_id"], as_index=False)
        .agg(segment_start=("timestamp", "min"), segment_end=("timestamp", "max"), ping_count=("timestamp", "count"))
    )
    segments = segments[segments["ping_count"] > 1].copy()
    segments["idle_duration_minutes"] = (segments["segment_end"] - segments["segment_start"]).dt.total_seconds() / 60.0

    avg_idle = (
        segments.groupby([idle_census_block_col, "hour_bucket", idle_vehicle_type_col], as_index=False)["idle_duration_minutes"]
        .mean()
        .rename(columns={"idle_duration_minutes": "avg_idle_time_minutes"})
    )
    idle_counts = (
        segments.groupby([idle_census_block_col, "hour_bucket", idle_vehicle_type_col], as_index=False)["segment_id"]
        .count()
        .rename(columns={"segment_id": "num_idle_segments"})
    )
    idle_summary = avg_idle.merge(idle_counts, on=[idle_census_block_col, "hour_bucket", idle_vehicle_type_col], how="inner")
    idle_summary = idle_summary.sort_values([idle_census_block_col, "hour_bucket"]).reset_index(drop=True)

    idle_summary_full = None
    if idle_fill_full_grid:
        service_area_blocks = df_i[idle_census_block_col].drop_duplicates().to_numpy()
        all_hours = np.sort(df_i["hour_bucket"].unique())
        full_index = pd.MultiIndex.from_product(
            [service_area_blocks, all_hours],
            names=[idle_census_block_col, "hour_bucket"],
        ).to_frame(index=False)

        idle_summary_full = full_index.merge(idle_summary, on=[idle_census_block_col, "hour_bucket"], how="left")
        idle_summary_full[idle_vehicle_type_col] = idle_summary_full[idle_vehicle_type_col].fillna(idle_default_vehicle_type_for_missing)
        idle_summary_full["avg_idle_time_minutes"] = idle_summary_full["avg_idle_time_minutes"].fillna(0.0)
        idle_summary_full["num_idle_segments"] = idle_summary_full["num_idle_segments"].fillna(0).astype(int)
        idle_summary_full = idle_summary_full.sort_values([idle_census_block_col, "hour_bucket"]).reset_index(drop=True)

    block_for_tract = idle_summary_full if idle_summary_full is not None else idle_summary
    block_for_tract["census_tract"] = block_for_tract[idle_census_block_col].astype(str).str[:tract_digits]

    tract_idle_summary = (
        block_for_tract.groupby(["census_tract", "hour_bucket", idle_vehicle_type_col], as_index=False)
        .agg(avg_idle_time_minutes=("avg_idle_time_minutes", "mean"), num_idle_segments=("num_idle_segments", "sum"))
        .sort_values(["census_tract", "hour_bucket"])
        .reset_index(drop=True)
    )

    tract_idle_norm = tract_idle_summary.copy()
    if idle_normalize:
        tract_idle_norm["avg_idle_time_minutes_norm"] = _minmax(tract_idle_norm["avg_idle_time_minutes"]).round(5)

    if save_outputs:
        idle_summary.to_csv(out_dir / f"idle_summary_block_{output_tag}.csv", index=False)
        if idle_summary_full is not None:
            idle_summary_full.to_csv(out_dir / f"idle_summary_block_full_{output_tag}.csv", index=False)
        tract_idle_summary.to_csv(out_dir / f"idle_summary_tract_{output_tag}.csv", index=False)
        tract_idle_norm.to_csv(out_dir / f"idle_summary_tract_norm_{output_tag}.csv", index=False)

    results["idle_time"] = {
        "idle_summary_block": idle_summary,
        "idle_summary_block_full": idle_summary_full,
        "idle_summary_tract": tract_idle_summary,
        "idle_summary_tract_norm": tract_idle_norm,
    }

    # ============================================================
    # SAFETY (INLINE)
    # ============================================================
    if safety_protected_keywords is None:
        safety_protected_keywords = [
            "protected", "separated", "cycle track", "cycletrack", "parking protected",
            "class i", "class 1", "path", "off-street", "off street", "buffered", "raised"
        ]
    if safety_class_col_candidate_keywords is None:
        safety_class_col_candidate_keywords = [
            "facility", "class", "type", "category", "treatment", "protected",
            "separation", "separator", "lane", "bikelane", "bike lane", "route"
        ]

    safety_protected_match_mode = safety_protected_match_mode.lower().strip()
    if safety_protected_match_mode not in {"exact", "contains"}:
        raise ValueError("safety_protected_match_mode must be 'exact' or 'contains'")

    blocks = gpd.read_file(str(census_blocks_shp))
    if safety_census_block_id_col not in blocks.columns:
        raise ValueError(
            f"census block ID column '{safety_census_block_id_col}' not found. "
            f"Available: {list(blocks.columns)[:25]} ..."
        )

    if safety_working_epsg is not None:
        blocks = blocks.to_crs(epsg=int(safety_working_epsg))
    else:
        if blocks.crs is None:
            raise ValueError("census_blocks_shp has no CRS. Provide safety_working_epsg.")

    streets_df = pd.read_csv(str(centerline_streets_csv))
    if safety_centerline_wkt_col not in streets_df.columns:
        raise ValueError(f"Centerline WKT column '{safety_centerline_wkt_col}' not found in centerline CSV.")
    streets_df["geometry"] = streets_df[safety_centerline_wkt_col].apply(
        lambda x: wkt.loads(x) if isinstance(x, str) and x.startswith("LINESTRING") else None
    )
    streets_gdf = gpd.GeoDataFrame(streets_df, geometry="geometry", crs=safety_input_crs)
    streets_gdf = streets_gdf[~streets_gdf["geometry"].isna()].copy()
    streets_gdf = streets_gdf.to_crs(blocks.crs)

    bike_df = pd.read_csv(str(bike_lanes_csv))
    if safety_bike_lane_wkt_col not in bike_df.columns:
        raise ValueError(f"Bike lane WKT column '{safety_bike_lane_wkt_col}' not found in bike lane CSV.")
    bike_df["geometry"] = bike_df[safety_bike_lane_wkt_col].apply(
        lambda x: wkt.loads(x) if isinstance(x, str) and x.startswith("LINESTRING") else None
    )
    bike_gdf = gpd.GeoDataFrame(bike_df, geometry="geometry", crs=safety_input_crs)
    bike_gdf = bike_gdf[~bike_gdf["geometry"].isna()].copy()
    bike_gdf = bike_gdf.to_crs(blocks.crs)

    def _norm_text(x: Any) -> str:
        if pd.isna(x):
            return ""
        return str(x).strip().lower()

    detected_class_col = None
    detection_scores: List[Tuple[str, float]] = []

    if safety_bike_lane_class_col is None:
        text_cols = [c for c in bike_gdf.columns if bike_gdf[c].dtype == "object" and c != "geometry"]

        def _score_col(col: str) -> float:
            name_score = sum(1 for kw in safety_class_col_candidate_keywords if kw in col.lower())
            sample = bike_gdf[col].dropna().astype(str).head(5000)
            val_join = " ".join(sample.str.lower().tolist())
            val_score = sum(val_join.count(kw) for kw in safety_protected_keywords)
            return float(name_score) + float(val_score) / 1000.0

        for c in text_cols:
            s = _score_col(c)
            detection_scores.append((c, s))
        detection_scores.sort(key=lambda x: x[1], reverse=True)
        if detection_scores and detection_scores[0][1] > 0:
            detected_class_col = detection_scores[0][0]
            safety_bike_lane_class_col = detected_class_col

    can_compute_protected = safety_bike_lane_class_col is not None and safety_bike_lane_class_col in bike_gdf.columns

    protected_gdf = bike_gdf.iloc[0:0].copy()
    inferred_protected_values = None

    if can_compute_protected:
        col_series = bike_gdf[safety_bike_lane_class_col].astype(str)
        if safety_protected_values:
            pv = [p.strip().lower() for p in safety_protected_values]
            if safety_protected_match_mode == "exact":
                mask = col_series.map(_norm_text).isin(pv)
            else:
                mask = col_series.map(lambda v: any(k in _norm_text(v) for k in pv))
            protected_gdf = bike_gdf[mask].copy()
            inferred_protected_values = pv
        else:
            kws = [k.strip().lower() for k in safety_protected_keywords]
            mask = col_series.map(lambda v: any(k in _norm_text(v) for k in kws))
            protected_gdf = bike_gdf[mask].copy()
            inferred_protected_values = kws

    streets_in_blocks = gpd.overlay(streets_gdf, blocks, how="intersection")
    streets_in_blocks["segment_length"] = streets_in_blocks.geometry.length
    street_len_block = (
        streets_in_blocks.groupby(safety_census_block_id_col, as_index=False)
        .agg(streets_leng=("segment_length", "sum"))
        .rename(columns={safety_census_block_id_col: "census_block"})
    )
    street_len_block["census_block"] = street_len_block["census_block"].astype(str)
    street_len_block["streets_leng"] = street_len_block["streets_leng"].round(3)

    bike_in_blocks = gpd.overlay(bike_gdf, blocks, how="intersection")
    bike_in_blocks["bike_lane_length"] = bike_in_blocks.geometry.length
    bike_len_block = (
        bike_in_blocks.groupby(safety_census_block_id_col, as_index=False)
        .agg(total_bike_lane_length=("bike_lane_length", "sum"))
        .rename(columns={safety_census_block_id_col: "census_block"})
    )
    bike_len_block["census_block"] = bike_len_block["census_block"].astype(str)
    bike_len_block["total_bike_lane_length"] = bike_len_block["total_bike_lane_length"].round(3)

    protected_len_block = pd.DataFrame({"census_block": [], "protected_bike_lane_length": []})
    if can_compute_protected and not protected_gdf.empty:
        protected_in_blocks = gpd.overlay(protected_gdf, blocks, how="intersection")
        protected_in_blocks["protected_lane_length"] = protected_in_blocks.geometry.length
        protected_len_block = (
            protected_in_blocks.groupby(safety_census_block_id_col, as_index=False)
            .agg(protected_bike_lane_length=("protected_lane_length", "sum"))
            .rename(columns={safety_census_block_id_col: "census_block"})
        )
        protected_len_block["census_block"] = protected_len_block["census_block"].astype(str)
        protected_len_block["protected_bike_lane_length"] = protected_len_block["protected_bike_lane_length"].round(3)

    safety_block = (
        street_len_block.merge(bike_len_block, on="census_block", how="left")
        .merge(protected_len_block, on="census_block", how="left")
    )
    safety_block["total_bike_lane_length"] = safety_block["total_bike_lane_length"].fillna(0.0)
    safety_block["protected_bike_lane_length"] = safety_block["protected_bike_lane_length"].fillna(0.0)

    safety_block["bike_lane_ratio"] = safety_block.apply(
        lambda r: (r["total_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    ).round(3)
    safety_block["protected_bike_lane_ratio"] = safety_block.apply(
        lambda r: (r["protected_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    ).round(3)

    ct = pd.read_csv(str(centroid_tract_csv))
    if safety_centroid_tract_id_col not in ct.columns:
        raise ValueError(f"centroid_tract_csv missing '{safety_centroid_tract_id_col}'. Available: {list(ct.columns)}")
    ct[safety_centroid_tract_id_col] = ct[safety_centroid_tract_id_col].astype(str)

    safety_block["census_tract"] = safety_block["census_block"].astype(str).str[:tract_digits]
    tract_sums = safety_block.groupby("census_tract", as_index=False)[
        ["streets_leng", "total_bike_lane_length", "protected_bike_lane_length"]
    ].sum()

    keep_cols = [safety_centroid_tract_id_col]
    if safety_centroid_area_col is not None and safety_centroid_area_col in ct.columns:
        keep_cols.append(safety_centroid_area_col)

    safety_tract = ct[keep_cols].merge(
        tract_sums,
        left_on=safety_centroid_tract_id_col,
        right_on="census_tract",
        how="left",
    )
    safety_tract[["streets_leng", "total_bike_lane_length", "protected_bike_lane_length"]] = (
        safety_tract[["streets_leng", "total_bike_lane_length", "protected_bike_lane_length"]].fillna(0.0)
    )

    safety_tract["bike_lane_ratio"] = safety_tract.apply(
        lambda r: (r["total_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    )
    safety_tract["protected_bike_lane_ratio"] = safety_tract.apply(
        lambda r: (r["protected_bike_lane_length"] / r["streets_leng"]) if r["streets_leng"] > 0 else 0.0,
        axis=1,
    )

    safety_tract = safety_tract.sort_values(by=safety_centroid_tract_id_col).reset_index(drop=True)

    safety_tract_norm = safety_tract.copy()
    if safety_normalize:
        safety_tract_norm["bike_lane_ratio_norm"] = _minmax(safety_tract_norm["bike_lane_ratio"]).round(5)
        safety_tract_norm["protected_bike_lane_ratio_norm"] = _minmax(safety_tract_norm["protected_bike_lane_ratio"]).round(5)

    if save_outputs:
        safety_block.to_csv(out_dir / f"safety_block_bike_lane_{output_tag}.csv", index=False)
        safety_tract.to_csv(out_dir / f"safety_bike_lane_tract_{output_tag}.csv", index=False)
        safety_tract_norm.to_csv(out_dir / f"safety_bike_lane_norm_tract_{output_tag}.csv", index=False)

    results["safety"] = {
        "safety_block_df": safety_block,
        "safety_tract_df": safety_tract,
        "safety_tract_norm_df": safety_tract_norm,
        "meta": {
            "working_crs": str(blocks.crs),
            "bike_lane_class_col_used": safety_bike_lane_class_col,
            "bike_lane_class_col_detected": detected_class_col,
            "protected_match_mode": safety_protected_match_mode,
            "protected_values_used": inferred_protected_values,
            "detection_scores_top5": detection_scores[:5],
            "normalize": safety_normalize,
        },
    }

    return results

# **Improved one**

In [30]:
from __future__ import annotations

from pathlib import Path
from typing import Optional, Union, Dict, Any, List
import json
import requests

import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.geometry import Point
from shapely import wkt
from tqdm import tqdm


def run_sf_dockless_all_utilities_single_function(
    *,
    # ------------------------------------------------------------
    # USER INPUTS
    # ------------------------------------------------------------
    system_key: str,
    freebike_status_txt: Union[str, Path],

    # ------------------------------------------------------------
    # CITY ASSETS
    # ------------------------------------------------------------
    census_blocks_shp: Optional[Union[str, Path]] = None,
    centerline_streets_path: Optional[Union[str, Path]] = None,
    bike_lanes_path: Optional[Union[str, Path]] = None,
    planned_bike_lanes_path: Optional[Union[str, Path]] = None,
    centroid_tract_path: Optional[Union[str, Path]] = None,
    
    # ------------------------------------------------------------
    # OUTPUT
    # ------------------------------------------------------------
    output_dir: Optional[Union[str, Path]] = None,
    save_outputs: bool = True,

    # ------------------------------------------------------------
    # TIME WINDOW
    # ------------------------------------------------------------
    time_start: Optional[Union[str, pd.Timestamp]] = None,
    time_end: Optional[Union[str, pd.Timestamp]] = None,

    # ------------------------------------------------------------
    # CONFIGURATION
    # ------------------------------------------------------------
    step0_raw_csv_name: Optional[str] = None,
    step0_done_csv_name: Optional[str] = None,
    blocks_geoid_col: str = "GEOID20",
    blocks_target_epsg: int = 4326,
    fill_missing_with_census_api: bool = True,
    census_benchmark: str = "Public_AR_Census2020",
    census_vintage: str = "2020",
    timestamp_parse_mode: str = "auto",
    drop_cols_if_present: Optional[List[str]] = None,
    
    # Availability
    availability_block_time_granularity: str = "5min",
    availability_tract_time_granularity: str = "1H",
    reserved_col: str = "is_reserved",
    disabled_col: str = "is_disabled",
    vehicle_id_col: str = "bike_id",
    include_vehicle_type: bool = True,
    vehicle_type_col_avail: str = "vehicle_type_id",
    tract_digits: int = 11,
    availability_normalize: bool = True,

    # Usage
    usage_base_time_slot: str = "5min",
    usage_aggregate_time_slot: str = "1H",
    usage_rounding_decimals: int = 4,
    usage_normalize: bool = True,

    # Idle Time
    idle_base_time_slot: str = "5min",
    idle_hour_bucket_freq: str = "1H",
    idle_rounding_decimals: int = 4,
    idle_vehicle_id_col: str = "bike_id",
    idle_lat_col: str = "lat",
    idle_lon_col: str = "lon",
    idle_census_block_col: str = "census_block",
    idle_vehicle_type_col: str = "vehicle_type_id",
    idle_default_vehicle_type_for_missing: str = "", 
    idle_normalize: bool = True,

    # Safety
    safety_centerline_wkt_col: str = "line",
    safety_bike_lane_wkt_col: str = "shape",
    safety_input_crs: str = "EPSG:4326",
    safety_census_block_id_col: str = "GEOID20",
    safety_centroid_tract_id_col: str = "census_tract",
    safety_centroid_area_col: Optional[str] = "county_name",
    safety_working_epsg: Optional[int] = None,
    safety_bike_lane_class_col: Optional[str] = None,
    safety_protected_values: Optional[List[str]] = None,
    safety_protected_keywords: Optional[List[str]] = None,
    safety_protected_match_mode: str = "contains",
    safety_normalize: bool = True,
) -> Dict[str, Any]:

    # ============================================================
    # 1. System Presets
    # ============================================================
    SYSTEMS: Dict[str, Dict[str, Any]] = {
        "SF_LIME_DOCKLESS": {
            "city": "San Francisco", "vendor": "Lime", "tag": "sf_lime",
            "default_output_dir": "SF_LIME_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_lime_status_raw.csv", "done_csv": "san_fran_lime_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\GBFS_Census_Tract\San_Fran_Lime\tl_2024_06_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Centerline.csv",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Bikelane.csv",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Capacity\San_Fran_Baywheels\centroid_tract_ca.csv",
            },
            "safety_epsg": 26910, "idle_decimals": 4
        },
        "SF_SPIN_DOCKLESS": {
            "city": "San Francisco", "vendor": "Spin", "tag": "sf_spin",
            "default_output_dir": "SF_SPIN_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_spin_status_raw.csv", "done_csv": "san_fran_spin_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\GBFS_Census_Tract\San_Fran_Lime\tl_2024_06_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Centerline.csv",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Bikelane.csv",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Capacity\San_Fran_Baywheels\centroid_tract_ca.csv",
            },
            "safety_epsg": 26910, "idle_decimals": 4
        },
        "SEATTLE_BIRD_DOCKLESS": {
            "city": "Seattle", "vendor": "Bird", "tag": "seattle_bird",
            "default_output_dir": "SEATTLE_BIRD_DOCKLESS_FULL_RUN",
            "raw_csv": "seattle_bird_status_raw.csv", "done_csv": "seattle_bird_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle Census Block\tl_2024_53_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle_Streets.shp",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\SDOT_Bike_Facilities_5512142703833213564.geojson",
                "planned_bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Planned Seattle Bike Lanes\Planned_Bike_Facilities.shp",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Avalibility\Seattle_Bird\tl_2024_53_tract.shp",
            },
            "safety_epsg": 2285, 
            "safety_config": {"bike_lane_class_col": "CATEGORY", "protected_values": ["BKF-PBL"], "protected_match_mode": "exact"},
            "idle_decimals": 3 
        },
        "SEATTLE_LIME_DOCKLESS": {
            "city": "Seattle", "vendor": "Lime", "tag": "seattle_lime",
            "default_output_dir": "SEATTLE_LIME_DOCKLESS_FULL_RUN",
            "raw_csv": "seattle_lime_status_raw.csv", "done_csv": "seattle_lime_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle Census Block\tl_2024_53_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle_Streets.shp",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\SDOT_Bike_Facilities_5512142703833213564.geojson",
                "planned_bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Planned Seattle Bike Lanes\Planned_Bike_Facilities.shp",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Avalibility\Seattle_Bird\tl_2024_53_tract.shp",
            },
            "safety_epsg": 2285,
            "safety_config": {"bike_lane_class_col": "CATEGORY", "protected_values": ["BKF-PBL"], "protected_match_mode": "exact"},
            "idle_decimals": 4
        },
    }

    if system_key not in SYSTEMS: raise ValueError(f"Unknown system_key: {system_key}")
    preset = SYSTEMS[system_key]
    output_tag = preset["tag"]
    
    # Initialize Paths and Defaults
    output_dir = output_dir or preset["default_output_dir"]
    step0_raw_csv_name = preset["raw_csv"]
    step0_done_csv_name = preset["done_csv"]
    p_assets = preset.get("assets", {})
    
    census_blocks_shp = census_blocks_shp or p_assets.get("census_blocks_shp")
    centerline_streets_path = centerline_streets_path or p_assets.get("centerline_streets_path")
    bike_lanes_path = bike_lanes_path or p_assets.get("bike_lanes_path")
    planned_bike_lanes_path = planned_bike_lanes_path or p_assets.get("planned_bike_lanes_path")
    centroid_tract_path = centroid_tract_path or p_assets.get("centroid_tract_path")
    
    safety_working_epsg = safety_working_epsg or preset.get("safety_epsg", 26910)
    p_safe_conf = preset.get("safety_config", {})
    if safety_bike_lane_class_col is None: safety_bike_lane_class_col = p_safe_conf.get("bike_lane_class_col")
    if safety_protected_values is None: safety_protected_values = p_safe_conf.get("protected_values")
    if "protected_match_mode" in p_safe_conf: safety_protected_match_mode = p_safe_conf["protected_match_mode"]

    p_idle_conf = preset.get("idle_config", {})
    if "decimals" in p_idle_conf: idle_rounding_decimals = p_idle_conf["decimals"]

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    if not Path(freebike_status_txt).exists(): raise FileNotFoundError(f"Missing: {freebike_status_txt}")
    
    raw_path = out_dir / step0_raw_csv_name
    done_path = out_dir / step0_done_csv_name

    results: Dict[str, Any] = {"system_key": system_key, "city": preset["city"], "vendor": preset["vendor"], "output_dir": str(out_dir)}

    def _minmax(series: pd.Series) -> pd.Series:
        s = pd.to_numeric(series, errors="coerce")
        mn, mx = s.min(skipna=True), s.max(skipna=True)
        if pd.isna(mn) or pd.isna(mx) or mx <= mn: return pd.Series(0.0, index=s.index)
        return (s - mn) / (mx - mn)
    
    # ============================================================
    # STEP 0: PARSING
    # ============================================================
    rows = []
    with Path(freebike_status_txt).open("r", encoding="utf-8") as f:
        for line in f:
            if not line.strip(): continue
            try:
                blob = json.loads(line)
                ts = list(blob.keys())[0]
                for entry in blob[ts]:
                    if "vehicle_types_available" in entry:
                        for vt in entry.get("vehicle_types_available", []):
                            entry[f"vehicle_type_{vt.get('vehicle_type_id', 'unknown')}_count"] = vt.get("count", 0)
                        del entry["vehicle_types_available"]
                    entry["timestamp"] = ts
                    rows.append(entry)
            except: continue

    raw_df = pd.DataFrame(rows)
    if raw_df.empty: raise ValueError("No data found in text file.")
    
    # ID Column Detection
    id_candidates = ["bike_id", "vehicle_id", "id"]
    vehicle_id_col = next((c for c in id_candidates if c in raw_df.columns), None)
    if not vehicle_id_col: raise ValueError(f"Could not find vehicle ID. Checked: {id_candidates}")
    
    raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], errors="coerce")
    if raw_df["timestamp"].isna().mean() > 0.5:
        raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], unit="s", errors="coerce")

    if drop_cols_if_present:
        drop_now = [c for c in drop_cols_if_present if c in raw_df.columns]
        if drop_now: raw_df = raw_df.drop(columns=drop_now)

    if save_outputs: raw_df.to_csv(raw_path, index=False)

    # Spatial Join
    if not {"lat", "lon"}.issubset(raw_df.columns): raise ValueError("Input missing lat/lon")
    latlon = raw_df[["lat", "lon"]].drop_duplicates()
    gdf = gpd.GeoDataFrame(latlon, geometry=[Point(xy) for xy in zip(latlon.lon, latlon.lat)], crs="EPSG:4326")
    blocks = gpd.read_file(str(census_blocks_shp)).to_crs(epsg=blocks_target_epsg)
    
    b_col = "GEOID20" if "GEOID20" in blocks.columns else "GEOID"
    joined = gpd.sjoin(gdf, blocks[[b_col, "geometry"]], how="left", predicate="within")
    joined = joined.rename(columns={b_col: "census_block"})
    
    done_df = raw_df.merge(joined[["lat", "lon", "census_block"]], on=["lat", "lon"], how="left")
    
    if fill_missing_with_census_api:
        missing = done_df[done_df["census_block"].isna()][["lat", "lon"]].drop_duplicates()
        if not missing.empty and len(missing) < 1000:
            tqdm.pandas(desc="API Fill")
            def get_blk(r):
                try:
                    url = f"https://geocoding.geo.census.gov/geocoder/geographies/coordinates?x={r.lon}&y={r.lat}&benchmark={census_benchmark}&vintage={census_vintage}&format=json"
                    res = requests.get(url, timeout=5).json()
                    return res["result"]["geographies"]["2020 Census Blocks"][0]["GEOID"]
                except: return None
            missing["cb_new"] = missing.progress_apply(get_blk, axis=1)
            done_df = done_df.merge(missing, on=["lat", "lon"], how="left")
            done_df["census_block"] = done_df["census_block"].fillna(done_df["cb_new"])
            done_df.drop(columns=["cb_new"], inplace=True, errors="ignore")

    done_df["census_block"] = done_df["census_block"].fillna("unknown").astype(str)
    done_df["census_tract"] = done_df["census_block"].str[:tract_digits]
    
    if time_start: done_df = done_df[done_df["timestamp"] >= pd.to_datetime(time_start)]
    if time_end: done_df = done_df[done_df["timestamp"] < pd.to_datetime(time_end)]
    if save_outputs: done_df.to_csv(done_path, index=False)

    # ============================================================
    # AVAILABILITY
    # ============================================================
    df_av = done_df.copy()
    df_av["slot"] = df_av["timestamp"].dt.floor(availability_block_time_granularity)
    if "is_reserved" not in df_av.columns: df_av["is_reserved"] = 0
    if "is_disabled" not in df_av.columns: df_av["is_disabled"] = 0
    av = df_av[(df_av["is_reserved"]==0) & (df_av["is_disabled"]==0)]
    
    av_blk = av.groupby(["census_block", "slot"]).size().reset_index(name="total_available")
    av["h_slot"] = av["timestamp"].dt.floor(availability_tract_time_granularity)
    av_tract = av.groupby(["census_tract", "h_slot"]).size().reset_index(name="total_available")
    if availability_normalize:
        av_tract["total_available_norm"] = _minmax(av_tract["total_available"]).round(5)

    if save_outputs:
        av_blk.to_csv(out_dir / f"availability_block_{output_tag}.csv", index=False)
        av_tract.to_csv(out_dir / f"availability_tract_hourly_raw_{output_tag}.csv", index=False)

    # ============================================================
    # USAGE
    # ============================================================
    df_u = done_df.copy()
    df_u["slot"] = df_u["timestamp"].dt.floor(usage_base_time_slot)
    df_u["lat_r"] = df_u["lat"].round(usage_rounding_decimals)
    df_u["lon_r"] = df_u["lon"].round(usage_rounding_decimals)
    
    cnts = df_u.groupby(["census_block", "slot", "lat_r", "lon_r"]).size().reset_index(name="cnt")
    prev = cnts.copy()
    prev["slot"] += pd.to_timedelta(usage_base_time_slot)
    
    flux = prev.merge(cnts, on=["census_block", "slot", "lat_r", "lon_r"], how="outer", suffixes=("_prev", "_curr")).fillna(0)
    flux["starts"] = (flux["cnt_prev"] - flux["cnt_curr"]).clip(lower=0)
    flux["ends"] = (flux["cnt_curr"] - flux["cnt_prev"]).clip(lower=0)
    
    use_blk = flux.groupby(["census_block", "slot"])[["starts", "ends"]].sum().reset_index()
    use_blk["h_slot"] = use_blk["slot"].dt.floor(usage_aggregate_time_slot)
    use_blk["census_tract"] = use_blk["census_block"].str[:tract_digits]
    use_tract = use_blk.groupby(["census_tract", "h_slot"])[["starts", "ends"]].sum().reset_index()
    
    if save_outputs:
        use_blk.to_csv(out_dir / f"usage_5min_block_{output_tag}.csv", index=False)
        use_tract.to_csv(out_dir / f"usage_hourly_tract_{output_tag}.csv", index=False)

    # ============================================================
    # IDLE TIME (Dynamic Column Detection)
    # ============================================================
    df_i = done_df.copy()
    
    # --- AUTO-DETECT IDLE COLUMN ---
    # 1. Check if the default/passed 'idle_vehicle_type_col' exists in DF
    # 2. If NOT, checks if "vehicle_type" exists (common variant for Seattle Lime)
    # 3. If NOT, falls back to creating a dummy column
    
    if idle_vehicle_type_col not in df_i.columns:
        if "vehicle_type" in df_i.columns:
            idle_vehicle_type_col = "vehicle_type"
        else:
            # Create default dummy col
            idle_vehicle_type_col = "vehicle_type_id"
            df_i[idle_vehicle_type_col] = idle_default_vehicle_type_for_missing
    
    # Ensure no NaNs in the chosen grouping column
    df_i[idle_vehicle_type_col] = df_i[idle_vehicle_type_col].fillna(idle_default_vehicle_type_for_missing)

    # ------------------------------------------------------------

    df_i = df_i.sort_values([idle_vehicle_id_col, "timestamp"]).reset_index(drop=True)
    
    # Rounding
    df_i["lat_r"] = df_i[idle_lat_col].round(idle_rounding_decimals)
    df_i["lon_r"] = df_i[idle_lon_col].round(idle_rounding_decimals)
    df_i["h_bucket"] = df_i["timestamp"].dt.floor(idle_hour_bucket_freq)
    
    # 3. ROBUST CALCULATION: "Presence is Idle" logic
    idle_res = df_i.groupby(["census_block", "h_bucket", idle_vehicle_type_col]).size().reset_index(name="ping_count")
    
    # Multiplier: Assuming data is 5min snapshots
    snapshot_interval_min = 5 
    idle_res["avg_idle_time_minutes"] = idle_res["ping_count"] * snapshot_interval_min
    idle_res["num_idle_segments"] = idle_res["ping_count"] 
    
    idle_res["census_tract"] = idle_res["census_block"].str[:tract_digits]
    idle_tract = idle_res.groupby(["census_tract", "h_bucket", idle_vehicle_type_col])[["avg_idle_time_minutes", "num_idle_segments"]].sum().reset_index()
    
    if idle_normalize:
        idle_tract["avg_idle_time_minutes_norm"] = _minmax(idle_tract["avg_idle_time_minutes"]).round(5)

    if save_outputs:
        idle_res.to_csv(out_dir / f"idle_summary_block_{output_tag}.csv", index=False)
        idle_tract.to_csv(out_dir / f"idle_summary_tract_{output_tag}.csv", index=False)
        idle_tract.to_csv(out_dir / f"idle_summary_tract_norm_{output_tag}.csv", index=False)

    # ============================================================
    # SAFETY
    # ============================================================
    blocks = gpd.read_file(str(census_blocks_shp))
    if safety_working_epsg: blocks = blocks.to_crs(epsg=int(safety_working_epsg))
    
    # Smart Load Streets
    st_path = Path(centerline_streets_path)
    if st_path.suffix.lower() == ".csv":
        st_df = pd.read_csv(st_path)
        st_df["geometry"] = st_df[safety_centerline_wkt_col].apply(lambda x: wkt.loads(x) if isinstance(x, str) else None)
        st = gpd.GeoDataFrame(st_df, geometry="geometry", crs=safety_input_crs)
    else: st = gpd.read_file(st_path).to_crs(safety_input_crs)
    st = st.to_crs(blocks.crs)
    
    # Smart Load Bike Lanes
    bl_path = Path(bike_lanes_path)
    if bl_path.suffix.lower() == ".csv":
        bl_df = pd.read_csv(bl_path)
        bl_df["geometry"] = bl_df[safety_bike_lane_wkt_col].apply(lambda x: wkt.loads(x) if isinstance(x, str) else None)
        bl = gpd.GeoDataFrame(bl_df, geometry="geometry", crs=safety_input_crs)
    else: bl = gpd.read_file(bl_path).to_crs(safety_input_crs)
    
    if planned_bike_lanes_path:
        pl = gpd.read_file(str(planned_bike_lanes_path)).to_crs(safety_input_crs)
        bl = pd.concat([bl, pl], ignore_index=True)
    bl = bl.to_crs(blocks.crs)

    # Protected Logic
    conf = preset.get("safety_config", {})
    col = conf.get("bike_lane_class_col", "class")
    vals = conf.get("protected_values", [])
    
    prot = bl.iloc[0:0]
    if col in bl.columns and vals:
        prot = bl[bl[col].isin(vals)]

    # Intersections
    def get_len(lines, poly, name):
        if lines.empty: return pd.DataFrame({"census_block": [], name: []})
        c = gpd.overlay(lines, poly, how="intersection")
        c["l"] = c.geometry.length
        bid = "GEOID20" if "GEOID20" in poly.columns else "GEOID"
        return c.groupby(bid)["l"].sum().reset_index(name=name).rename(columns={bid: "census_block"})

    s_len = get_len(st, blocks, "st_len")
    b_len = get_len(bl, blocks, "bl_len")
    p_len = get_len(prot, blocks, "pr_len")
    
    safe = s_len.merge(b_len, on="census_block", how="left").merge(p_len, on="census_block", how="left").fillna(0)
    safe["census_tract"] = safe["census_block"].astype(str).str[:tract_digits]
    
    # Tract Agg
    ct_path = Path(centroid_tract_path)
    if ct_path.suffix.lower() == ".csv":
        ct = pd.read_csv(ct_path)
        ct["census_tract"] = ct["census_tract"].astype(str)
        base = ct[["census_tract"]]
    else:
        ct = gpd.read_file(ct_path)
        cid = "GEOID" if "GEOID" in ct.columns else "census_tract"
        base = pd.DataFrame({"census_tract": ct[cid].astype(str)})

    tract_safe = safe.groupby("census_tract")[["st_len", "bl_len", "pr_len"]].sum().reset_index()
    final = base.merge(tract_safe, on="census_tract", how="left").fillna(0)
    
    final["ratio_bl"] = np.where(final["st_len"]>0, final["bl_len"]/final["st_len"], 0)
    final["ratio_pr"] = np.where(final["st_len"]>0, final["pr_len"]/final["st_len"], 0)
    
    if save_outputs:
        safe.to_csv(out_dir / f"safety_block_{output_tag}.csv", index=False)
        final.to_csv(out_dir / f"safety_tract_{output_tag}.csv", index=False)

    results["safety"] = {"tract": final}
    return results

In [31]:
# ------------------------------------------------------------
# Example Call: Seattle Bird Dockless
# ------------------------------------------------------------
seattle_results = run_sf_dockless_all_utilities_single_function(
    system_key="SEATTLE_LIME_DOCKLESS",
    
    # Path to your raw GBFS status file (Change filename as needed)
    freebike_status_txt=r"D:\Research Fellowship\Summer Research Stuff\Collected Data\Week 1\09-June\seattle_lime_dkless_freebike_status_6_9.txt",
    
    # Where you want the files saved
    output_dir="SEATTLE_LIME_FULL_RUN",
    
    # Time window for analysis
    time_start="2025-06-09 06:00:00",
    time_end="2025-06-09 12:00:00",
    
    # NOTE: 
    # Because we added the paths to the 'SYSTEMS' dictionary inside the function,
    # you DO NOT need to pass census_blocks_shp, centerline_streets_path, etc. here.
    # It will automatically use the D:\ paths you provided.
)

  raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], errors="coerce")
API Fill: 100%|██████████| 7/7 [00:06<00:00,  1.07it/s]
  av["h_slot"] = av["timestamp"].dt.floor(availability_tract_time_granularity)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  av["h_slot"] = av["timestamp"].dt.floor(availability_tract_time_granularity)
  use_blk["h_slot"] = use_blk["slot"].dt.floor(usage_aggregate_time_slot)
  df_i["h_bucket"] = df_i["timestamp"].dt.floor(idle_hour_bucket_freq)
  return ogr_read(
  bl = pd.concat([bl, pl], ignore_index=True)


# **Latest Version**

In [39]:
from __future__ import annotations

from pathlib import Path
from typing import Optional, Union, Dict, Any, List
import json
import requests

import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.geometry import Point
from shapely import wkt
from tqdm import tqdm


def run_sf_dockless_all_utilities_single_function(
    *,
    # ------------------------------------------------------------
    # USER INPUTS
    # ------------------------------------------------------------
    system_key: str,
    freebike_status_txt: Union[str, Path],

    # ------------------------------------------------------------
    # CITY ASSETS
    # ------------------------------------------------------------
    census_blocks_shp: Optional[Union[str, Path]] = None,
    centerline_streets_path: Optional[Union[str, Path]] = None,
    bike_lanes_path: Optional[Union[str, Path]] = None,
    planned_bike_lanes_path: Optional[Union[str, Path]] = None,
    centroid_tract_path: Optional[Union[str, Path]] = None,
    
    # ------------------------------------------------------------
    # OUTPUT
    # ------------------------------------------------------------
    output_dir: Optional[Union[str, Path]] = None,
    save_outputs: bool = True,

    # ------------------------------------------------------------
    # TIME WINDOW
    # ------------------------------------------------------------
    time_start: Optional[Union[str, pd.Timestamp]] = None,
    time_end: Optional[Union[str, pd.Timestamp]] = None,

    # ------------------------------------------------------------
    # CONFIGURATION
    # ------------------------------------------------------------
    step0_raw_csv_name: Optional[str] = None,
    step0_done_csv_name: Optional[str] = None,
    blocks_geoid_col: str = "GEOID20",
    blocks_target_epsg: int = 4326,
    fill_missing_with_census_api: bool = True,
    census_benchmark: str = "Public_AR_Census2020",
    census_vintage: str = "2020",
    timestamp_parse_mode: str = "auto",
    drop_cols_if_present: Optional[List[str]] = None,
    
    # Availability
    availability_block_time_granularity: str = "5min",
    availability_tract_time_granularity: str = "1h", # Fixed Warning
    reserved_col: str = "is_reserved",
    disabled_col: str = "is_disabled",
    vehicle_id_col: str = "bike_id",
    include_vehicle_type: bool = True,
    vehicle_type_col_avail: str = "vehicle_type_id",
    tract_digits: int = 11,
    availability_normalize: bool = True,

    # Usage
    usage_base_time_slot: str = "5min",
    usage_aggregate_time_slot: str = "1h", # Fixed Warning
    usage_rounding_decimals: int = 4,
    usage_normalize: bool = True,

    # Idle Time
    idle_base_time_slot: str = "5min",
    idle_hour_bucket_freq: str = "1h", # Fixed Warning
    idle_rounding_decimals: int = 4,
    idle_vehicle_id_col: str = "bike_id",
    idle_lat_col: str = "lat",
    idle_lon_col: str = "lon",
    idle_census_block_col: str = "census_block",
    idle_vehicle_type_col: str = "vehicle_type_id",
    idle_default_vehicle_type_for_missing: str = "", 
    idle_normalize: bool = True,

    # Safety
    safety_centerline_wkt_col: str = "line",
    safety_bike_lane_wkt_col: str = "shape",
    safety_input_crs: str = "EPSG:4326",
    safety_census_block_id_col: str = "GEOID20",
    safety_centroid_tract_id_col: str = "census_tract",
    safety_centroid_area_col: Optional[str] = "county_name",
    safety_working_epsg: Optional[int] = None,
    safety_bike_lane_class_col: Optional[str] = None,
    safety_protected_values: Optional[List[str]] = None,
    safety_protected_keywords: Optional[List[str]] = None,
    safety_protected_match_mode: str = "contains",
    safety_normalize: bool = True,
) -> Dict[str, Any]:

    # ============================================================
    # 1. System Presets
    # ============================================================
    SYSTEMS: Dict[str, Dict[str, Any]] = {
        "SF_LIME_DOCKLESS": {
            "city": "San Francisco", "vendor": "Lime", "tag": "sf_lime",
            "default_output_dir": "SF_LIME_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_lime_status_raw.csv", "done_csv": "san_fran_lime_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\GBFS_Census_Tract\San_Fran_Lime\tl_2024_06_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Centerline.csv",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Bikelane.csv",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Capacity\San_Fran_Baywheels\centroid_tract_ca.csv",
            },
            "safety_epsg": 26910, "idle_decimals": 4
        },
        "SF_SPIN_DOCKLESS": {
            "city": "San Francisco", "vendor": "Spin", "tag": "sf_spin",
            "default_output_dir": "SF_SPIN_DOCKLESS_FULL_RUN",
            "raw_csv": "san_fran_spin_status_raw.csv", "done_csv": "san_fran_spin_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\GBFS_Census_Tract\San_Fran_Lime\tl_2024_06_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Centerline.csv",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\San_Fran_Lime\Bikelane.csv",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Capacity\San_Fran_Baywheels\centroid_tract_ca.csv",
            },
            "safety_epsg": 26910, "idle_decimals": 4
        },
        "SEATTLE_BIRD_DOCKLESS": {
            "city": "Seattle", "vendor": "Bird", "tag": "seattle_bird",
            "default_output_dir": "SEATTLE_BIRD_DOCKLESS_FULL_RUN",
            "raw_csv": "seattle_bird_status_raw.csv", "done_csv": "seattle_bird_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle Census Block\tl_2024_53_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle_Streets.shp",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\SDOT_Bike_Facilities_5512142703833213564.geojson",
                "planned_bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Planned Seattle Bike Lanes\Planned_Bike_Facilities.shp",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Avalibility\Seattle_Bird\tl_2024_53_tract.shp",
            },
            "safety_epsg": 2285, 
            "safety_config": {"bike_lane_class_col": "CATEGORY", "protected_values": ["BKF-PBL"], "protected_match_mode": "exact"},
            "idle_decimals": 3 
        },
        "SEATTLE_LIME_DOCKLESS": {
            "city": "Seattle", "vendor": "Lime", "tag": "seattle_lime",
            "default_output_dir": "SEATTLE_LIME_DOCKLESS_FULL_RUN",
            "raw_csv": "seattle_lime_status_raw.csv", "done_csv": "seattle_lime_status_done.csv",
            "assets": {
                "census_blocks_shp": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle Census Block\tl_2024_53_tabblock20.shp",
                "centerline_streets_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Seattle_Streets.shp",
                "bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\SDOT_Bike_Facilities_5512142703833213564.geojson",
                "planned_bike_lanes_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Safety\Seattle_Bird\Planned Seattle Bike Lanes\Planned_Bike_Facilities.shp",
                "centroid_tract_path": r"D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Avalibility\Seattle_Bird\tl_2024_53_tract.shp",
            },
            "safety_epsg": 2285,
            "safety_config": {"bike_lane_class_col": "CATEGORY", "protected_values": ["BKF-PBL"], "protected_match_mode": "exact"},
            "idle_decimals": 4
        },
    }

    if system_key not in SYSTEMS: raise ValueError(f"Unknown system_key: {system_key}")
    preset = SYSTEMS[system_key]
    output_tag = preset["tag"]
    
    # Initialize Paths and Defaults
    output_dir = output_dir or preset["default_output_dir"]
    step0_raw_csv_name = preset["raw_csv"]
    step0_done_csv_name = preset["done_csv"]
    p_assets = preset.get("assets", {})
    
    census_blocks_shp = census_blocks_shp or p_assets.get("census_blocks_shp")
    centerline_streets_path = centerline_streets_path or p_assets.get("centerline_streets_path")
    bike_lanes_path = bike_lanes_path or p_assets.get("bike_lanes_path")
    planned_bike_lanes_path = planned_bike_lanes_path or p_assets.get("planned_bike_lanes_path")
    centroid_tract_path = centroid_tract_path or p_assets.get("centroid_tract_path")
    
    safety_working_epsg = safety_working_epsg or preset.get("safety_epsg", 26910)
    p_safe_conf = preset.get("safety_config", {})
    if safety_bike_lane_class_col is None: safety_bike_lane_class_col = p_safe_conf.get("bike_lane_class_col")
    if safety_protected_values is None: safety_protected_values = p_safe_conf.get("protected_values")
    if "protected_match_mode" in p_safe_conf: safety_protected_match_mode = p_safe_conf["protected_match_mode"]

    p_idle_conf = preset.get("idle_config", {})
    if "decimals" in p_idle_conf: idle_rounding_decimals = p_idle_conf["decimals"]

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    if not Path(freebike_status_txt).exists(): raise FileNotFoundError(f"Missing: {freebike_status_txt}")
    
    raw_path = out_dir / step0_raw_csv_name
    done_path = out_dir / step0_done_csv_name

    results: Dict[str, Any] = {"system_key": system_key, "city": preset["city"], "vendor": preset["vendor"], "output_dir": str(out_dir)}

    def _minmax(series: pd.Series) -> pd.Series:
        s = pd.to_numeric(series, errors="coerce")
        mn, mx = s.min(skipna=True), s.max(skipna=True)
        if pd.isna(mn) or pd.isna(mx) or mx <= mn: return pd.Series(0.0, index=s.index)
        return (s - mn) / (mx - mn)
    
    # ============================================================
    # STEP 0: PARSING
    # ============================================================
    rows = []
    with Path(freebike_status_txt).open("r", encoding="utf-8") as f:
        for line in f:
            if not line.strip(): continue
            try:
                blob = json.loads(line)
                ts = list(blob.keys())[0]
                for entry in blob[ts]:
                    if "vehicle_types_available" in entry:
                        for vt in entry.get("vehicle_types_available", []):
                            entry[f"vehicle_type_{vt.get('vehicle_type_id', 'unknown')}_count"] = vt.get("count", 0)
                        del entry["vehicle_types_available"]
                    entry["timestamp"] = ts
                    rows.append(entry)
            except: continue

    raw_df = pd.DataFrame(rows)
    if raw_df.empty: raise ValueError("No data found in text file.")
    
    # ID Column Detection
    id_candidates = ["bike_id", "vehicle_id", "id"]
    vehicle_id_col = next((c for c in id_candidates if c in raw_df.columns), None)
    if not vehicle_id_col: raise ValueError(f"Could not find vehicle ID. Checked: {id_candidates}")
    
    raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], errors="coerce")
    if raw_df["timestamp"].isna().mean() > 0.5:
        raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], unit="s", errors="coerce")

    if drop_cols_if_present:
        drop_now = [c for c in drop_cols_if_present if c in raw_df.columns]
        if drop_now: raw_df = raw_df.drop(columns=drop_now)

    if save_outputs: raw_df.to_csv(raw_path, index=False)

    # Spatial Join
    if not {"lat", "lon"}.issubset(raw_df.columns): raise ValueError("Input missing lat/lon")
    latlon = raw_df[["lat", "lon"]].drop_duplicates()
    gdf = gpd.GeoDataFrame(latlon, geometry=[Point(xy) for xy in zip(latlon.lon, latlon.lat)], crs="EPSG:4326")
    blocks = gpd.read_file(str(census_blocks_shp)).to_crs(epsg=blocks_target_epsg)
    
    b_col = "GEOID20" if "GEOID20" in blocks.columns else "GEOID"
    joined = gpd.sjoin(gdf, blocks[[b_col, "geometry"]], how="left", predicate="within")
    joined = joined.rename(columns={b_col: "census_block"})
    
    done_df = raw_df.merge(joined[["lat", "lon", "census_block"]], on=["lat", "lon"], how="left")
    
    if fill_missing_with_census_api:
        missing = done_df[done_df["census_block"].isna()][["lat", "lon"]].drop_duplicates()
        if not missing.empty and len(missing) < 1000:
            tqdm.pandas(desc="API Fill")
            def get_blk(r):
                try:
                    url = f"https://geocoding.geo.census.gov/geocoder/geographies/coordinates?x={r.lon}&y={r.lat}&benchmark={census_benchmark}&vintage={census_vintage}&format=json"
                    res = requests.get(url, timeout=5).json()
                    return res["result"]["geographies"]["2020 Census Blocks"][0]["GEOID"]
                except: return None
            missing["cb_new"] = missing.progress_apply(get_blk, axis=1)
            done_df = done_df.merge(missing, on=["lat", "lon"], how="left")
            done_df["census_block"] = done_df["census_block"].fillna(done_df["cb_new"])
            done_df.drop(columns=["cb_new"], inplace=True, errors="ignore")

    done_df["census_block"] = done_df["census_block"].fillna("unknown").astype(str)
    done_df["census_tract"] = done_df["census_block"].str[:tract_digits]
    
    if time_start: done_df = done_df[done_df["timestamp"] >= pd.to_datetime(time_start)]
    if time_end: done_df = done_df[done_df["timestamp"] < pd.to_datetime(time_end)]
    if save_outputs: done_df.to_csv(done_path, index=False)

    # ============================================================
    # AVAILABILITY
    # ============================================================
    df_av = done_df.copy()
    df_av["slot"] = df_av["timestamp"].dt.floor(availability_block_time_granularity)
    if "is_reserved" not in df_av.columns: df_av["is_reserved"] = 0
    if "is_disabled" not in df_av.columns: df_av["is_disabled"] = 0
    av = df_av[(df_av["is_reserved"]==0) & (df_av["is_disabled"]==0)]
    
    av_blk = av.groupby(["census_block", "slot"]).size().reset_index(name="total_available")
    av["h_slot"] = av["timestamp"].dt.floor(availability_tract_time_granularity)
    av_tract = av.groupby(["census_tract", "h_slot"]).size().reset_index(name="total_available")
    if availability_normalize:
        av_tract["total_available_norm"] = _minmax(av_tract["total_available"]).round(5)

    if save_outputs:
        av_blk.to_csv(out_dir / f"availability_block_{output_tag}.csv", index=False)
        av_tract.to_csv(out_dir / f"availability_tract_hourly_raw_{output_tag}.csv", index=False)

    # ============================================================
    # USAGE
    # ============================================================
    df_u = done_df.copy()
    df_u["slot"] = df_u["timestamp"].dt.floor(usage_base_time_slot)
    df_u["lat_r"] = df_u["lat"].round(usage_rounding_decimals)
    df_u["lon_r"] = df_u["lon"].round(usage_rounding_decimals)
    
    cnts = df_u.groupby(["census_block", "slot", "lat_r", "lon_r"]).size().reset_index(name="cnt")
    prev = cnts.copy()
    prev["slot"] += pd.to_timedelta(usage_base_time_slot)
    
    flux = prev.merge(cnts, on=["census_block", "slot", "lat_r", "lon_r"], how="outer", suffixes=("_prev", "_curr")).fillna(0)
    flux["starts"] = (flux["cnt_prev"] - flux["cnt_curr"]).clip(lower=0)
    flux["ends"] = (flux["cnt_curr"] - flux["cnt_prev"]).clip(lower=0)
    
    use_blk = flux.groupby(["census_block", "slot"])[["starts", "ends"]].sum().reset_index()
    use_blk["h_slot"] = use_blk["slot"].dt.floor(usage_aggregate_time_slot)
    use_blk["census_tract"] = use_blk["census_block"].str[:tract_digits]
    use_tract = use_blk.groupby(["census_tract", "h_slot"])[["starts", "ends"]].sum().reset_index()
    
    if save_outputs:
        use_blk.to_csv(out_dir / f"usage_5min_block_{output_tag}.csv", index=False)
        use_tract.to_csv(out_dir / f"usage_hourly_tract_{output_tag}.csv", index=False)

    # ============================================================
    # IDLE TIME (Dynamic Detection + Robust Logic)
    # ============================================================
    df_i = done_df.copy()
    
    # 1. Column Detection
    if idle_vehicle_type_col not in df_i.columns:
        if "vehicle_type" in df_i.columns: idle_vehicle_type_col = "vehicle_type"
        else:
            idle_vehicle_type_col = "vehicle_type_id"
            df_i[idle_vehicle_type_col] = idle_default_vehicle_type_for_missing
    df_i[idle_vehicle_type_col] = df_i[idle_vehicle_type_col].fillna(idle_default_vehicle_type_for_missing)

    df_i = df_i.sort_values([idle_vehicle_id_col, "timestamp"]).reset_index(drop=True)
    df_i["h_bucket"] = df_i["timestamp"].dt.floor(idle_hour_bucket_freq)
    
    # 3. Robust Calculation (Count * 5mins)
    idle_res = df_i.groupby(["census_block", "h_bucket", idle_vehicle_type_col]).size().reset_index(name="ping_count")
    idle_res["avg_idle_time_minutes"] = idle_res["ping_count"] * 5 
    idle_res["num_idle_segments"] = idle_res["ping_count"] 
    
    idle_res["census_tract"] = idle_res["census_block"].str[:tract_digits]
    idle_tract = idle_res.groupby(["census_tract", "h_bucket", idle_vehicle_type_col])[["avg_idle_time_minutes", "num_idle_segments"]].sum().reset_index()
    
    if idle_normalize:
        idle_tract["avg_idle_time_minutes_norm"] = _minmax(idle_tract["avg_idle_time_minutes"]).round(5)

    if save_outputs:
        idle_res.to_csv(out_dir / f"idle_summary_block_{output_tag}.csv", index=False)
        idle_tract.to_csv(out_dir / f"idle_summary_tract_{output_tag}.csv", index=False)
        idle_tract.to_csv(out_dir / f"idle_summary_tract_norm_{output_tag}.csv", index=False)

    # ============================================================
    # SAFETY
    # ============================================================
    print(f"DEBUG: Using Blocks SHP: {census_blocks_shp}")
    blocks = gpd.read_file(str(census_blocks_shp))
    if safety_working_epsg: blocks = blocks.to_crs(epsg=int(safety_working_epsg))
    
    # Load Streets
    st_path = Path(centerline_streets_path)
    if st_path.suffix.lower() == ".csv":
        st_df = pd.read_csv(st_path)
        st_df["geometry"] = st_df[safety_centerline_wkt_col].apply(lambda x: wkt.loads(x) if isinstance(x, str) else None)
        st = gpd.GeoDataFrame(st_df, geometry="geometry", crs=safety_input_crs)
    else: st = gpd.read_file(st_path).to_crs(safety_input_crs)
    st = st.to_crs(blocks.crs)
    
    # Load Bike Lanes
    bl_path = Path(bike_lanes_path)
    if bl_path.suffix.lower() == ".csv":
        bl_df = pd.read_csv(bl_path)
        bl_df["geometry"] = bl_df[safety_bike_lane_wkt_col].apply(lambda x: wkt.loads(x) if isinstance(x, str) else None)
        bl = gpd.GeoDataFrame(bl_df, geometry="geometry", crs=safety_input_crs)
    else: bl = gpd.read_file(bl_path).to_crs(safety_input_crs)
    
    if planned_bike_lanes_path:
        pl = gpd.read_file(str(planned_bike_lanes_path)).to_crs(safety_input_crs)
        bl = pd.concat([bl, pl], ignore_index=True)
    bl = bl.to_crs(blocks.crs)

    conf = preset.get("safety_config", {})
    col = conf.get("bike_lane_class_col", "class")
    vals = conf.get("protected_values", [])
    
    prot = bl.iloc[0:0]
    if col in bl.columns and vals:
        prot = bl[bl[col].isin(vals)]

    def get_len(lines, poly, name):
        if lines.empty: return pd.DataFrame({"census_block": [], name: []})
        c = gpd.overlay(lines, poly, how="intersection")
        c["l"] = c.geometry.length
        bid = "GEOID20" if "GEOID20" in poly.columns else "GEOID"
        return c.groupby(bid)["l"].sum().reset_index(name=name).rename(columns={bid: "census_block"})

    s_len = get_len(st, blocks, "st_len")
    b_len = get_len(bl, blocks, "bl_len")
    p_len = get_len(prot, blocks, "pr_len")
    
    safe = s_len.merge(b_len, on="census_block", how="left").merge(p_len, on="census_block", how="left").fillna(0)
    safe["census_tract"] = safe["census_block"].astype(str).str[:tract_digits]
    
    # ------------------------------------------------------------------
    # CRITICAL FIX: Safe ID Matching + Padding
    # ------------------------------------------------------------------
    print(f"DEBUG: Loading Centroids from {centroid_tract_path}")
    ct_path = Path(centroid_tract_path)
    if ct_path.suffix.lower() == ".csv":
        ct = pd.read_csv(ct_path)
    else:
        ct = gpd.read_file(ct_path)
    
    cid = next((c for c in ["GEOID", "GEOID20", "TRACTCE", "census_tract"] if c in ct.columns), None)
    
    if cid:
        base = pd.DataFrame({"census_tract": ct[cid]})
        
        # CLEANING FUNCTION: Removes .0 and strips whitespace
        def clean_id(x):
            s = str(x).strip()
            if s.endswith(".0"): s = s[:-2]
            return s
            
        base["census_tract"] = base["census_tract"].apply(clean_id)
        safe["census_tract"] = safe["census_tract"].apply(clean_id)
        
        # AUTO-PADDING: Fix mismatched lengths (e.g., 10 vs 11 digits)
        len_base = base["census_tract"].str.len().mode()[0]
        len_safe = safe["census_tract"].str.len().mode()[0]
        
        if len_base < len_safe:
            base["census_tract"] = base["census_tract"].str.zfill(int(len_safe))
            print("DEBUG: Padded Base IDs with leading zero.")
        elif len_safe < len_base:
            safe["census_tract"] = safe["census_tract"].str.zfill(int(len_base))
            print("DEBUG: Padded Safe IDs with leading zero.")
            
        print(f"DEBUG: Sample Safe IDs: {safe['census_tract'].head().tolist()}")
        print(f"DEBUG: Sample Base IDs: {base['census_tract'].head().tolist()}")
    else:
        print("DEBUG: No valid ID column found in centroid file.")
        base = pd.DataFrame({"census_tract": []})

    tract_safe = safe.groupby("census_tract")[["st_len", "bl_len", "pr_len"]].sum().reset_index()
    final = base.merge(tract_safe, on="census_tract", how="left").fillna(0)
    
    final["ratio_bl"] = np.where(final["st_len"]>0, final["bl_len"]/final["st_len"], 0)
    final["ratio_pr"] = np.where(final["st_len"]>0, final["pr_len"]/final["st_len"], 0)
    
    if save_outputs:
        safe.to_csv(out_dir / f"safety_block_{output_tag}.csv", index=False)
        final.to_csv(out_dir / f"safety_tract_{output_tag}.csv", index=False)

    results["safety"] = {"tract": final}
    return results

In [40]:
out_spin = run_sf_dockless_all_utilities_single_function( 
    system_key="SF_LIME_DOCKLESS", # Change the system key accordingly SF_LIME_DOCKLESS or SF_SPIN_DOCKLESS
    freebike_status_txt=r"D:\Research Fellowship\Summer Research Stuff\Collected Data\Week 1\09-June\san_fran_lime_dkless_freebike_status_6_9.txt",
    output_dir="SF_LIME_FULL_RUN", #Change the output directory name accordingly
    time_start="2025-06-09 06:00:00",
    time_end="2025-06-09 12:00:00",
)

  raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], errors="coerce")


DEBUG: Using Blocks SHP: D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\GBFS_Census_Tract\San_Fran_Lime\tl_2024_06_tabblock20.shp
DEBUG: Loading Centroids from D:\Research Fellowship\Summer Research Stuff\Clean_Utilities\Capacity\San_Fran_Baywheels\centroid_tract_ca.csv
DEBUG: Padded Base IDs with leading zero.
DEBUG: Sample Safe IDs: ['06075010101', '06075010101', '06075010101', '06075010101', '06075010101']
DEBUG: Sample Base IDs: ['06001442700', '06001442800', '06037204920', '06037205110', '06037205120']
