In [1]:
# Step 0 — Config & Imports
# -------------------------
# Minimal configuration for a journal annex: fixed CRS, decay bins, paths, and a modality
# default (MRI). Comments are concise and UK English. Keep constants explicit.

from pathlib import Path
from typing import Dict, Tuple

import geopandas as gpd
import numpy as np
import pandas as pd

# --- Core settings (edit to taste) ---
MODALITY: str = "MRI"              # Default modality (your project defaults to MRI)
CRS_EPSG: int = 27700              # British National Grid for siting and spatial ops
OD_CUTOFF_MIN: int = 60            # Car travel-time cut-off for scoring
RANDOM_STATE: int = 42             # Reproducible clustering/seeding later on

# Piecewise decay weights applied to car minutes. Zero beyond 60 by construction.
# Each tuple is (min_inclusive, max_inclusive, weight).
DECAY_BINS_MINUTES: Tuple[Tuple[int, int, float], ...] = (
    (0, 10, 1.00),
    (10, 20, 0.75),
    (20, 30, 0.50),
    (30, 40, 0.30),
    (40, 50, 0.15),
    (50, 60, 0.05),
)

# Optional exclusions (e.g., Isles of Scilly) for clustering; scoring may still include them.
EXCLUDED_LSOAS: set[str] = {"E06000053"}  # Isles of Scilly

# --- File paths (replace with your actual locations) ---
# Keep paths explicit and stable for reproducibility.
PROCESSED_DIR = Path("data/processed")

LSOA_PATH = PROCESSED_DIR / "LSOA_MRI_Demand_with_Demographics_2024_v1.gpkg"
OD_PATH = PROCESSED_DIR / "LSOA_to_LSOA_car_minutes_dense.csv"  # dense for siting; trimmed at scoring
SITES_BASELINE_PATH = PROCESSED_DIR / "MRI_sites_capability_baseline.csv"

# --- Column standards (lightweight and explicit) ---
LSOA_ID_COL = "lsoa21cd"
OD_ORIG_COL = "origin"
OD_DEST_COL = "destination"
OD_MINS_COL = "car_mins"
SITE_LSOA_COL = "lsoa21cd"         # destination LSOA of the site
SITE_NAME_COL = "site_name"
SITE_CAPACITY_COL = "scanner_count"  # capacity units (scanners/rooms)

# Demand column candidates; first match wins. MRI default first, then fallbacks.
DEMAND_COL_CANDIDATES = (
    "mri_total_demand",
    "ct_total_demand",
    "total_demand",
    "demand",
)


In [2]:
# Step 1 — Load inputs and light validation
# -----------------------------------------
# One guard per failure class (I/O, fields, shape, values). Silent success; clear errors.

def _require_file(p: Path) -> None:
    """Raise a clear error if a required input file is missing."""
    if not p.exists():
        raise FileNotFoundError(f"Required file not found: {p}")

def _select_demand_col(df: pd.DataFrame) -> str:
    """Pick the first present demand column from the candidate list."""
    for c in DEMAND_COL_CANDIDATES:
        if c in df.columns:
            return c
    raise KeyError(
        "No demand column found. Expected one of: "
        f"{', '.join(DEMAND_COL_CANDIDATES)}"
    )

def _normalise_lsoa_codes(s: pd.Series) -> pd.Series:
    """Trim and uppercase LSOA codes to avoid subtle mismatches."""
    return s.astype(str).str.strip().str.upper()

def load_inputs(
    lsoa_path: Path,
    od_path: Path,
    sites_path: Path,
    modality: str = MODALITY,
) -> Dict[str, object]:
    """
    Load baseline LSOA GeoData, dense OD matrix (car minutes), and baseline site capacity.

    Returns:
        dict with keys:
            - gdf_lsoa (GeoDataFrame): LSOA geometry + demand.
            - od (DataFrame): origin, destination, car_mins (dense; later trimmed to ≤60).
            - sites (DataFrame): baseline site capacity by destination LSOA.
            - demand_col (str): chosen demand column name.
    """
    # --- I/O guards ---
    _require_file(lsoa_path)
    _require_file(od_path)
    _require_file(sites_path)

    # --- LSOA GeoData ---
    gdf_lsoa = gpd.read_file(lsoa_path)
    if LSOA_ID_COL not in gdf_lsoa.columns:
        raise KeyError(f"Missing '{LSOA_ID_COL}' in {lsoa_path.name}")

    # Normalise IDs; ensure target CRS for any later spatial work.
    gdf_lsoa[LSOA_ID_COL] = _normalise_lsoa_codes(gdf_lsoa[LSOA_ID_COL])
    if gdf_lsoa.crs is None:
        raise ValueError("LSOA GeoDataFrame has no CRS; expected EPSG:27700.")
    if gdf_lsoa.crs.to_epsg() != CRS_EPSG:
        gdf_lsoa = gdf_lsoa.to_crs(CRS_EPSG)

    demand_col = _select_demand_col(gdf_lsoa)
    # Demand must be numeric and non-negative (zeros permitted).
    gdf_lsoa[demand_col] = pd.to_numeric(gdf_lsoa[demand_col], errors="coerce")
    if gdf_lsoa[demand_col].isna().any():
        raise ValueError("Demand column contains NaNs after coercion to numeric.")
    if (gdf_lsoa[demand_col] < 0).any():
        raise ValueError("Demand column contains negative values, which are invalid.")

    # --- OD matrix (dense; used for siting, later trimmed for scoring) ---
    od = pd.read_csv(od_path)
    missing_od_cols = {OD_ORIG_COL, OD_DEST_COL, OD_MINS_COL} - set(od.columns)
    if missing_od_cols:
        raise KeyError(f"OD matrix missing columns: {sorted(missing_od_cols)}")

    # Normalise IDs; minutes must be numeric and ≥ 0.
    od[OD_ORIG_COL] = _normalise_lsoa_codes(od[OD_ORIG_COL])
    od[OD_DEST_COL] = _normalise_lsoa_codes(od[OD_DEST_COL])
    od[OD_MINS_COL] = pd.to_numeric(od[OD_MINS_COL], errors="coerce")

    if od[OD_MINS_COL].isna().any() or (od[OD_MINS_COL] < 0).any():
        raise ValueError("OD car minutes must be non-negative numeric values.")

    # Single shape check: OD covers the LSOA universe and includes the diagonal.
    lsoas = set(gdf_lsoa[LSOA_ID_COL].unique())
    od_orig = set(od[OD_ORIG_COL].unique())
    od_dest = set(od[OD_DEST_COL].unique())
    if not (lsoas <= od_orig and lsoas <= od_dest):
        raise ValueError("OD matrix does not cover all LSOAs present in the baseline file.")

    diag = od.loc[od[OD_ORIG_COL] == od[OD_DEST_COL], [OD_ORIG_COL, OD_MINS_COL]]
    missing_diag = lsoas - set(diag[OD_ORIG_COL].unique())
    if missing_diag:
        raise ValueError("OD matrix is missing diagonal rows for some LSOAs (self-pairs).")
    # Allow 0 or very small self-times; enforce non-negative handled above.

    # --- Baseline site capability ---
    sites = pd.read_csv(sites_path)
    missing_site_cols = {SITE_LSOA_COL, SITE_CAPACITY_COL} - set(sites.columns)
    if missing_site_cols:
        raise KeyError(f"Sites file missing columns: {sorted(missing_site_cols)}")

    sites[SITE_LSOA_COL] = _normalise_lsoa_codes(sites[SITE_LSOA_COL])
    sites[SITE_CAPACITY_COL] = pd.to_numeric(sites[SITE_CAPACITY_COL], errors="coerce")

    if sites[SITE_CAPACITY_COL].isna().any() or (sites[SITE_CAPACITY_COL] < 0).any():
        raise ValueError("Site capacity must be non-negative numeric values.")

    # Light deduplication by destination LSOA in case of accidental repeats.
    sites = (
        sites.groupby(SITE_LSOA_COL, as_index=False)[SITE_CAPACITY_COL]
        .sum()
        .rename(columns={SITE_CAPACITY_COL: SITE_CAPACITY_COL})
    )

    # Return a tidy bundle; later steps can trim OD to ≤60 and compute R_j and F_i.
    return {
        "gdf_lsoa": gdf_lsoa,
        "od": od,
        "sites": sites,
        "demand_col": demand_col,
    }


# --- Usage (example; leave commented in the annex) ---
# data = load_inputs(LSOA_PATH, OD_PATH, SITES_BASELINE_PATH, modality=MODALITY)
# gdf_lsoa, od, sites, demand_col = (
#     data["gdf_lsoa"], data["od"], data["sites"], data["demand_col"]
# )


In [3]:
# Step 2 — Prepare OD for scoring (trim, decay, denominators)
# -----------------------------------------------------------
# We trim the dense OD matrix to ≤60 minutes, apply piecewise decay weights,
# attach origin demand, and aggregate to a per-destination denominator used
# for computing site pressure R_j in the next step.

from typing import Dict


def weight_for_minutes(minutes: float) -> float:
    """
    Return the decay weight for a given car travel time in minutes.
    Uses inclusive bins defined in DECAY_BINS_MINUTES; zero beyond the cut-off.
    """
    m = float(minutes)
    for lo, hi, w in DECAY_BINS_MINUTES:
        if lo <= m <= hi:
            return float(w)
    return 0.0


def prepare_scoring_frames(bundle: Dict[str, object], lsoa_id_col: str = LSOA_ID_COL) -> Dict[str, pd.DataFrame]:
    """
    Build trimmed, decay-weighted OD and the per-destination denominator.

    Args:
        bundle: dict from load_inputs() with keys 'gdf_lsoa', 'od', 'sites', 'demand_col'.
        lsoa_id_col: LSOA code column name (default from config).

    Returns:
        dict with:
            - od_scoring: origin, destination, car_mins, decay_w, demand, weighted_demand (≤60 min only)
            - site_denominator: destination LSOA with 'total_weighted_demand'
            - sites_ready: baseline site capacity table keyed by destination LSOA
    """
    gdf_lsoa: gpd.GeoDataFrame = bundle["gdf_lsoa"]
    od: pd.DataFrame = bundle["od"]
    sites: pd.DataFrame = bundle["sites"]
    demand_col: str = bundle["demand_col"]

    # 1) Trim to the scoring cut-off (≤60 minutes) and drop zero-weight rows.
    od_scoring = od.loc[od[OD_MINS_COL] <= OD_CUTOFF_MIN, [OD_ORIG_COL, OD_DEST_COL, OD_MINS_COL]].copy()
    od_scoring["decay_w"] = od_scoring[OD_MINS_COL].apply(weight_for_minutes)
    od_scoring = od_scoring.loc[od_scoring["decay_w"] > 0.0].copy()

    # 2) Attach origin demand and compute weighted demand.
    origin_demand = gdf_lsoa[[lsoa_id_col, demand_col]].rename(columns={lsoa_id_col: OD_ORIG_COL})
    od_scoring = od_scoring.merge(origin_demand, on=OD_ORIG_COL, how="left", validate="many_to_one")

    if od_scoring[demand_col].isna().any():
        # One clear failure if any origin demand is missing in the trimmed OD.
        raise ValueError("Trimmed OD references origins with no demand value in the LSOA table.")

    od_scoring["weighted_demand"] = od_scoring[demand_col] * od_scoring["decay_w"]

    # 3) Aggregate to per-destination denominator for R_j.
    site_denominator = (
        od_scoring.groupby(OD_DEST_COL, as_index=False)["weighted_demand"]
        .sum()
        .rename(columns={"weighted_demand": "total_weighted_demand"})
    )

    # 4) Ensure every site destination appears in the denominator table
    #    (sites with no ≤60-minute origins get zero, so R_j becomes 0 later).
    sites_ready = sites[[SITE_LSOA_COL, SITE_CAPACITY_COL]].copy()
    sites_ready = (
        sites_ready.groupby(SITE_LSOA_COL, as_index=False)[SITE_CAPACITY_COL].sum()
        .rename(columns={SITE_LSOA_COL: OD_DEST_COL})
    )

    site_denominator = sites_ready.merge(
        site_denominator, on=OD_DEST_COL, how="left", validate="one_to_one"
    )
    site_denominator["total_weighted_demand"] = site_denominator["total_weighted_demand"].fillna(0.0)

    return {
        "od_scoring": od_scoring,
        "site_denominator": site_denominator,  # columns: destination, scanner_count, total_weighted_demand
        "sites_ready": sites_ready,            # keyed by destination for any later joins
    }


# --- Usage (example; leave commented in the annex) ---
# data = load_inputs(LSOA_PATH, OD_PATH, SITES_BASELINE_PATH)
# frames = prepare_scoring_frames(data)
# od_scoring = frames["od_scoring"]
# site_denominator = frames["site_denominator"]
# sites_ready = frames["sites_ready"]


In [4]:
# Step 3 — Compute site pressure (R_j) and baseline accessibility (F_i)
# --------------------------------------------------------------------
# R_j: site capacity divided by decay-weighted demand within ≤60 minutes.
# F_i: sum over destinations of (decay weight × R_j) for each origin LSOA.

from typing import Dict

RJ_COL = "Rj"
BASELINE_SCORE_COL = "score_baseline"


def compute_site_pressure(site_denominator: pd.DataFrame) -> pd.DataFrame:
    """
    Compute site pressure R_j = capacity / total_weighted_demand.
    Zero denominator yields R_j = 0.0 (no contribution to any F_i).
    """
    df = site_denominator.copy()
    denom = df["total_weighted_demand"].to_numpy()
    cap = df[SITE_CAPACITY_COL].to_numpy()

    with np.errstate(divide="ignore", invalid="ignore"):
        rj = np.where(denom > 0.0, cap / denom, 0.0)

    df[RJ_COL] = rj.astype(float)
    return df[[OD_DEST_COL, SITE_CAPACITY_COL, "total_weighted_demand", RJ_COL]]


def compute_baseline_access(
    bundle: Dict[str, object], frames: Dict[str, pd.DataFrame]
) -> Dict[str, object]:
    """
    Merge R_j onto the trimmed OD, sum decay-weighted contributions per origin,
    and append the baseline score to the LSOA GeoDataFrame.

    Args:
        bundle: dict from load_inputs() with keys 'gdf_lsoa', 'demand_col', etc.
        frames: dict from prepare_scoring_frames() with keys 'od_scoring', 'site_denominator'.

    Returns:
        dict with:
            - rj_table (DataFrame): destination, capacity, denominator, Rj
            - gdf_baseline (GeoDataFrame): original LSOAs + 'score_baseline'
    """
    gdf_lsoa: gpd.GeoDataFrame = bundle["gdf_lsoa"]
    od_scoring: pd.DataFrame = frames["od_scoring"]
    site_denominator: pd.DataFrame = frames["site_denominator"]

    # 1) Site pressure R_j
    rj_table = compute_site_pressure(site_denominator)

    # 2) Join R_j to OD and sum contributions per origin
    od_with_rj = od_scoring.merge(
        rj_table[[OD_DEST_COL, RJ_COL]],
        on=OD_DEST_COL,
        how="left",
        validate="many_to_one",
    )

    # Any destination with no R_j (should not happen) contributes zero
    od_with_rj[RJ_COL] = od_with_rj[RJ_COL].fillna(0.0)

    # Contribution = decay_w × R_j
    od_with_rj["contrib"] = od_with_rj["decay_w"] * od_with_rj[RJ_COL]

    fi = (
        od_with_rj.groupby(OD_ORIG_COL, as_index=False)["contrib"]
        .sum()
        .rename(columns={"contrib": BASELINE_SCORE_COL})
    )

    # 3) Attach F_i back to all LSOAs (fill 0 for any with no ≤60-minute links)
    gdf_baseline = gdf_lsoa.merge(
        fi.rename(columns={OD_ORIG_COL: LSOA_ID_COL}),
        on=LSOA_ID_COL,
        how="left",
        validate="one_to_one",
    )
    gdf_baseline[BASELINE_SCORE_COL] = gdf_baseline[BASELINE_SCORE_COL].fillna(0.0)

    return {"rj_table": rj_table, "gdf_baseline": gdf_baseline}


# --- Usage (example; leave commented in the annex) ---
# data = load_inputs(LSOA_PATH, OD_PATH, SITES_BASELINE_PATH)
# frames = prepare_scoring_frames(data)
# out_baseline = compute_baseline_access(data, frames)
# rj_table = out_baseline["rj_table"]
# gdf_baseline = out_baseline["gdf_baseline"]  # contains 'score_baseline'
