In [40]:
# Step 0 — Setup, paths, helpers
# - Uses ONLY files present in your test_data_ICB_level folder.
# - Creates tables/maps/matrices subfolders if missing.

from __future__ import annotations

import json
import os
import warnings
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, Optional

import fiona
import geopandas as gpd
import numpy as np
import pandas as pd

# Root data folder. Set the REACH_BASE_DIR environment variable to point the
# notebook at another copy of test_data_ICB_level; when it is unset (or empty)
# the original hard-coded location below is used, so existing runs are unchanged.
_DEFAULT_BASE = (
    Path("/Users/rosstaylor/Downloads/Code Repositories/REACH Map (NHS SW)")
    / "GitHub Repo" / "REACH-Map-NHS-SW" / "data" / "raw" / "test_data_ICB_level"
)
BASE = Path(os.environ.get("REACH_BASE_DIR") or _DEFAULT_BASE).expanduser()

# Output subfolders, created under BASE if they do not exist yet.
TABLES   = BASE / "tables"
MATRICES = BASE / "matrices"
MAPS     = BASE / "maps"
for _out_dir in (TABLES, MATRICES, MAPS):
    _out_dir.mkdir(parents=True, exist_ok=True)

# Thresholds compared (with <=) against t_resp_min / t_conv_min in Step 5.
RESPONSE_THRESHOLDS      = (7, 15, 18, 40)
SCENE_TO_AE_THRESHOLDS   = (30, 45, 60)

# Run-level metadata copied into every *.provenance.json sidecar by _save_prov.
RUN_META = {
    "notebook": "02a_coverage",
    "utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "base": str(BASE),
    "resp_thresholds": RESPONSE_THRESHOLDS,
    "conv_thresholds": SCENE_TO_AE_THRESHOLDS,
}

def _ok(msg: str) -> None: print(f"[OK] {msg}")
def _warn(msg: str) -> None: warnings.warn(msg, stacklevel=2)
def _die(msg: str) -> None: raise RuntimeError(msg)

def _save_prov(target: Path, extras: dict | None = None) -> None:
    """Write a provenance sidecar next to `target`.

    The sidecar is named `<target name>.provenance.json` (the suffix is appended
    to the target's existing suffix) and contains RUN_META overlaid with
    `extras`, serialised as indented JSON.
    """
    record = RUN_META.copy()
    if extras:
        record.update(extras)
    sidecar = target.with_suffix(target.suffix + ".provenance.json")
    payload = json.dumps(record, indent=2)
    sidecar.write_text(payload)

def _choose_col(df: pd.DataFrame, candidates: Iterable[str]) -> str:
    for c in candidates:
        if c in df.columns: return c
    _die(f"None of {list(candidates)} present. Found: {list(df.columns)[:12]}")

def _check_monotone(counts_by_t: dict[int, int]) -> None:
    vals = [counts_by_t[t] for t in sorted(counts_by_t)]
    if any(b < a for a, b in zip(vals, vals[1:])):
        _die("Coverage counts are not monotone with increasing thresholds.")


In [41]:
# Step 1 — LSOA universe + geometry

# LSOA lookup inputs: the GeoPackage is tried first; the CSV is only read when
# the GeoPackage is absent or cannot be read.
LOOKUP_GPKG = BASE / "cornwall_icb_lsoa_lookup.gpkg"
LOOKUP_CSV  = BASE / "cornwall_icb_lsoa_lookup.csv"

def _read_lookup_gpkg(gpkg: Path) -> gpd.GeoDataFrame:
    # Try named layer; else grab the first available
    layer_name = "cornwall_icb_lsoa_lookup"
    if gpkg.exists():
        try:
            layers = fiona.listlayers(gpkg)
            lyr = layer_name if layer_name in layers else layers[0]
            g = gpd.read_file(gpkg, layer=lyr)
            return g
        except Exception as e:
            _warn(f"GPKG read failed ({e}); will try CSV fallback.")
    return None

# Load the lookup: GeoPackage first, CSV when the GeoPackage yields nothing.
lsoa_g = _read_lookup_gpkg(LOOKUP_GPKG)
if lsoa_g is None:
    if not LOOKUP_CSV.exists():
        _die("No lookup GPKG/CSV found.")
    lsoa_g = gpd.read_file(LOOKUP_CSV)
    if "geometry" not in lsoa_g.columns:
        # No geometry column came back from the CSV: keep attributes only as a
        # plain DataFrame (map export is meant to be skipped in that case).
        csv_attrs = pd.read_csv(LOOKUP_CSV, dtype={"lsoa_code": "string"})
        lsoa_g = pd.DataFrame(csv_attrs)

# The rest of the notebook keys everything on 'lsoa_code'.
if "lsoa_code" not in lsoa_g.columns:
    _die("Lookup lacks 'lsoa_code'.")

# Normalise the key, keep one row per LSOA, and freeze the universe order.
lsoa_g["lsoa_code"] = lsoa_g["lsoa_code"].astype("string")
lsoa_g = lsoa_g.drop_duplicates(subset=["lsoa_code"])
lsoa_g = lsoa_g.reset_index(drop=True)
lsoa_index = lsoa_g["lsoa_code"].tolist()
_ok(f"LSOA universe={len(lsoa_index):,} | CRS={getattr(lsoa_g, 'crs', None)}")


[OK] LSOA universe=336 | CRS=EPSG:27700


In [42]:
# Step 2 — Canonical population parquet

# POP_PARQUET is the standardised population table: read when it already
# exists, otherwise built from GH_CSV and written here.
POP_PARQUET = TABLES / "population_by_lsoa.parquet"
GH_CSV      = BASE / "demographics_general_health_icb.csv"

def _standardise_population(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["lsoa_code"] = df["lsoa_code"].astype("string")
    # Prefer 'population_total'; fall back to 'population'; else sum numerics
    if "population_total" in df.columns:
        out = df.rename(columns={"population_total": "population"})[["lsoa_code","population"]].copy()
    elif "population" in df.columns:
        out = df[["lsoa_code","population"]].copy()
    else:
        num_cols = [c for c in df.select_dtypes(include="number").columns if c != "lsoa_code"]
        if not num_cols: _die("No numeric columns to infer population from.")
        out = df.assign(population=df[num_cols].sum(axis=1, skipna=True))[["lsoa_code","population"]].copy()
    out.loc[:, "population"] = pd.to_numeric(out["population"], errors="coerce")
    if out["population"].isna().any(): _die("Population has NaNs after standardisation.")
    return out

if POP_PARQUET.exists():
    # Reuse the cached table written by an earlier run.
    population = pd.read_parquet(POP_PARQUET)
    population["lsoa_code"] = population["lsoa_code"].astype("string")
    # The cache is used as-is, so check it still covers the current LSOA
    # universe; a stale file would otherwise surface later as NaN populations.
    _in_cache = pd.Series(lsoa_index, dtype="string").isin(population["lsoa_code"])
    _n_absent = int((~_in_cache).sum())
    if _n_absent:
        _warn(
            f"Cached {POP_PARQUET.name} lacks {_n_absent} of {len(lsoa_index)} universe LSOAs; "
            f"delete it to rebuild from {GH_CSV.name}."
        )
else:
    if not GH_CSV.exists():
        _die("Missing demographics_general_health_icb.csv.")
    population = pd.read_csv(GH_CSV, dtype={"lsoa_code": "string"})
    population = _standardise_population(population)
    # Align to universe and persist
    population = pd.DataFrame({"lsoa_code": lsoa_index}).merge(population, on="lsoa_code", how="left")
    if population["population"].isna().any():
        _die("Population missing for some LSOAs.")
    population.to_parquet(POP_PARQUET, index=False)
    _save_prov(POP_PARQUET, {"source": "02a_standardised"})
    _ok(f"Wrote {POP_PARQUET.name} | rows={len(population):,} | sum={int(population['population'].sum()):,}")


In [43]:
# Step 3 — Equity parquet (IMD quintile + RUC)

# EQUITY_PARQUET is the per-LSOA equity table: read when it already exists,
# otherwise built by _build_equity from the IMD and RUC CSVs and written here.
EQUITY_PARQUET = TABLES / "lsoa_lookup_equity.parquet"
IMD_CSV = BASE / "imd_icb.csv"
RUC_CSV = BASE / "ruc_icb.csv"

def _first_row_per_lsoa(frame: pd.DataFrame, label: str) -> pd.DataFrame:
    """Return a copy of `frame` with one row per lsoa_code (first kept), warning if any were dropped."""
    dup = frame["lsoa_code"].duplicated(keep="first")
    if dup.any():
        _warn(f"{label}: dropped {int(dup.sum())} duplicate lsoa_code row(s); kept the first of each.")
    return frame.loc[~dup].copy()


def _build_equity() -> pd.DataFrame:
    """Build the per-LSOA equity table for the LSOA universe.

    Starts from `lsoa_index`, adds the optional 'lad_name' / 'icb_name' labels
    when the lookup has them, then left-joins:
      * 'imd_quintile' (1-5, nullable Int64): quintiles of the IMD rank column,
        computed across the rows of the IMD file itself;
      * 'ruc_category': the rural/urban column from the RUC file.

    Raises
    ------
    RuntimeError (via `_die`)
        If either CSV is missing or lacks a recognised column.
    """
    eq = pd.DataFrame({"lsoa_code": lsoa_index})
    # Keep optional labels if present in lookup
    for src, dst in (("ladnm", "lad_name"), ("icb_name", "icb_name")):
        if src in lsoa_g.columns:
            eq = eq.merge(lsoa_g[["lsoa_code", src]].rename(columns={src: dst}), on="lsoa_code", how="left")

    if not IMD_CSV.exists():
        _die("Missing imd_icb.csv.")
    imd = pd.read_csv(IMD_CSV, dtype={"lsoa_code": "string"})
    imd_col = next((c for c in ("imd19", "IMD2019_Rank", "imd_rank") if c in imd.columns), None)
    if imd_col is None:
        _die("IMD file lacks an IMD rank column (imd19/IMD2019_Rank/imd_rank).")
    imd = imd[["lsoa_code", imd_col]].rename(columns={imd_col: "imd_rank"})
    # A repeated LSOA would both skew the quintile cut points and multiply rows
    # in the left join below, so keep a single row per code.
    imd = _first_row_per_lsoa(imd, IMD_CSV.name)
    imd["imd_rank"] = pd.to_numeric(imd["imd_rank"], errors="coerce")
    imd = imd.sort_values("imd_rank")
    # rank(method="first") makes every value distinct so qcut can always form
    # five bins even when ranks tie.
    imd["imd_quintile"] = pd.qcut(imd["imd_rank"].rank(method="first"), 5, labels=[1, 2, 3, 4, 5]).astype("Int64")
    imd = imd[["lsoa_code", "imd_quintile"]]

    if not RUC_CSV.exists():
        _die("Missing ruc_icb.csv.")
    ruc = pd.read_csv(RUC_CSV, dtype={"lsoa_code": "string"})
    ruc_col = next((c for c in ("ruc21nm", "urban_rural_flag", "ruc_label") if c in ruc.columns), None)
    if ruc_col is None:
        # Message now names every accepted column, including 'ruc_label'.
        _die("RUC file lacks ruc21nm/urban_rural_flag/ruc_label.")
    ruc = ruc[["lsoa_code", ruc_col]].rename(columns={ruc_col: "ruc_category"})
    ruc = _first_row_per_lsoa(ruc, RUC_CSV.name)
    return (eq.merge(imd, on="lsoa_code", how="left")
              .merge(ruc, on="lsoa_code", how="left"))

# Build-once cache: compute and persist the equity table only when no parquet
# exists yet; otherwise load the stored copy unchanged.
if not EQUITY_PARQUET.exists():
    equity = _build_equity()
    equity.to_parquet(EQUITY_PARQUET, index=False)
    _save_prov(EQUITY_PARQUET, {"source":"02a_built"})
    _ok(f"Wrote {EQUITY_PARQUET.name} | rows={len(equity):,}")
else:
    equity = pd.read_parquet(EQUITY_PARQUET)


In [48]:
# Step 4 — Travel + sites → baseline min times (lsoa21cd-aware)

TRAVEL_CSV   = BASE / "travel_matrix_lsoa_icb.csv"
STATIONS_CSV = BASE / "ambulance_stations_icb.csv"
ACUTE_CSV    = BASE / "acute_hospitals_icb.csv"

# All three inputs are mandatory; stop at the first one that is absent.
for _required_csv in (TRAVEL_CSV, STATIONS_CSV, ACUTE_CSV):
    if not _required_csv.exists():
        _die(f"Missing {_required_csv.name}")

# Travel matrix: one row per (origin_lsoa, dest_lsoa) pair plus a time column,
# which is normalised to the name 'time_car_min' and stored as float32.
_TIME_COLUMN_CANDIDATES = ("time_car_min", "time_min", "minutes", "drive_min", "t_min")
travel = pd.read_csv(TRAVEL_CSV, dtype={"origin_lsoa": "string", "dest_lsoa": "string"})
time_col = _choose_col(travel, _TIME_COLUMN_CANDIDATES)
if time_col != "time_car_min":
    travel = travel.rename(columns={time_col: "time_car_min"})
travel = travel.assign(time_car_min=travel["time_car_min"].astype("float32"))

def _load_site_codes(path: Path) -> np.ndarray:
    """Load the distinct site LSOA codes from a CSV.

    The first of these columns found is used:
    lsoa21cd, lsoa_code, lsoa11cd, LSOA, site_lsoa, lsoa.

    Returns
    -------
    1-D object-dtype NumPy array of stripped, non-empty codes in order of first
    appearance.

    Raises
    ------
    RuntimeError (via `_die`)
        If none of the accepted columns is present.
    """
    accepted = ("lsoa21cd", "lsoa_code", "lsoa11cd", "LSOA", "site_lsoa", "lsoa")
    df = pd.read_csv(path)
    col = next((c for c in accepted if c in df.columns), None)
    if col is None:
        _die(f"{path.name} lacks an LSOA column (expect one of "
             f"lsoa21cd/lsoa_code/lsoa11cd/LSOA/site_lsoa/lsoa).")
    cleaned = df[col].astype("string").str.strip().dropna()
    # Whitespace-only cells become "" after stripping; they are not real codes.
    cleaned = cleaned[cleaned != ""]
    # Convert explicitly so the result matches the annotated np.ndarray return
    # type instead of a pandas extension array.
    codes = cleaned.drop_duplicates().to_numpy(dtype=object)
    _ok(f"{path.name}: using column '{col}' → {len(codes)} LSOAs")
    return codes

station_lsoas = _load_site_codes(STATIONS_CSV)
acute_lsoas   = _load_site_codes(ACUTE_CSV)

# Response mins: min(station→LSOA) per demand LSOA
resp_by_dest = (
    travel.loc[travel["origin_lsoa"].isin(station_lsoas)]
    .groupby("dest_lsoa")["time_car_min"]
    .min()
)
t_resp_min = resp_by_dest.reindex(lsoa_index).astype("float32").rename("t_resp_min")

# Conveyance mins: min(LSOA→acute) per demand LSOA
conv_by_origin = (
    travel.loc[travel["dest_lsoa"].isin(acute_lsoas)]
    .groupby("origin_lsoa")["time_car_min"]
    .min()
)
t_conv_min = conv_by_origin.reindex(lsoa_index).astype("float32").rename("t_conv_min")

# An LSOA with no matching travel rows is NaN after the reindex, and NaN <= t is
# False, so it would silently count as "not covered". Make that visible.
for _label, _series in (("response", t_resp_min), ("conveyance", t_conv_min)):
    _n_missing = int(_series.isna().sum())
    if _n_missing:
        _warn(
            f"{_n_missing} of {len(_series)} LSOAs have no {_label} travel time; "
            "they will count as not covered at every threshold."
        )

# Persist baseline min-times + labels for 02b/02c
# NOTE(review): the code arrays are stored with object dtype, so readers must
# call np.load(..., allow_pickle=True); only do that for files you trust.
BASELINE_NPZ = MATRICES / "baseline_min_times.npz"
np.savez_compressed(
    BASELINE_NPZ,
    t_resp_base=t_resp_min.to_numpy(dtype="float32"),
    t_conv_base=t_conv_min.to_numpy(dtype="float32"),
    lsoa_codes=np.array(lsoa_index, dtype=object),
    station_lsoas=station_lsoas,
    acute_lsoas=acute_lsoas,
)
_save_prov(BASELINE_NPZ, {"source": "computed_from_travel"})
_ok(
    f"Baseline mins → {BASELINE_NPZ.name} | "
    f"resp(mean)={float(t_resp_min.mean()):.1f} | "
    f"conv(mean)={float(t_conv_min.mean()):.1f}"
)


[OK] ambulance_stations_icb.csv: using column 'lsoa21cd' → 14 LSOAs
[OK] acute_hospitals_icb.csv: using column 'lsoa21cd' → 3 LSOAs
[OK] Baseline mins → baseline_min_times.npz | resp(mean)=12.9 | conv(mean)=36.4


In [49]:
# Step 5 — Coverage flags + monotonicity

# Population per LSOA in universe order (float32).
pop = population.set_index("lsoa_code")["population"]
pop = pop.astype("float32").reindex(lsoa_index)

# One row per LSOA: population plus the two baseline minimum times.
df = pd.DataFrame({
    "lsoa_code": lsoa_index,
    "population": pop.to_numpy(),
    "t_resp_min": t_resp_min.to_numpy(),
    "t_conv_min": t_conv_min.to_numpy(),
}).set_index("lsoa_code")

# Boolean flag per threshold: True where the minimum time is within it.
resp_flags = {f"covered_T{t}": df["t_resp_min"].le(t) for t in RESPONSE_THRESHOLDS}
conv_flags = {f"conv_T{t}": df["t_conv_min"].le(t) for t in SCENE_TO_AE_THRESHOLDS}
df = df.assign(**resp_flags, **conv_flags)

# Number of covered LSOAs at each response threshold; must not decrease as the
# threshold grows.
covered_counts = {t: int(df[f"covered_T{t}"].sum()) for t in RESPONSE_THRESHOLDS}
print("[CHECK] Response coverage counts:", covered_counts)
_check_monotone(covered_counts)
_ok("Monotonicity confirmed.")


[CHECK] Response coverage counts: {7: 106, 15: 213, 18: 253, 40: 334}
[OK] Monotonicity confirmed.


In [50]:
# Step 6 — KPI summary CSV (+ map layer if geometry available)

def _summary_overall(frame: pd.DataFrame) -> pd.DataFrame:
    rows = []
    tot_pop = int(frame["population"].sum())
    for t in RESPONSE_THRESHOLDS:
        cov_pop = int(frame.loc[frame[f"covered_T{t}"], "population"].sum())
        rows.append({"dimension":"overall","group":"ALL","metric":f"resp_T{t}",
                     "covered_pop":cov_pop,"total_pop":tot_pop,
                     "covered_pct": round(100*cov_pop/tot_pop, 2) if tot_pop else 0.0})
    for t in SCENE_TO_AE_THRESHOLDS:
        cov_pop = int(frame.loc[frame[f"conv_T{t}"], "population"].sum())
        rows.append({"dimension":"overall","group":"ALL","metric":f"conv_T{t}",
                     "covered_pop":cov_pop,"total_pop":tot_pop,
                     "covered_pct": round(100*cov_pop/tot_pop, 2) if tot_pop else 0.0})
    return pd.DataFrame(rows)

coverage_summary = _summary_overall(df)
COVERAGE_CSV = TABLES / "coverage_summary.csv"
coverage_summary.to_csv(COVERAGE_CSV, index=False)
_save_prov(COVERAGE_CSV, {"sources":["population_by_lsoa.parquet","baseline_min_times.npz"]})
_ok(f"Wrote {COVERAGE_CSV.name} | rows={len(coverage_summary):,}")

# Equity splits can be added later; for now we persist a mapping layer if geometry exists.
# A GeoDataFrame whose geometry is entirely null (possible on the CSV fallback)
# is treated as "no geometry", so an empty map layer is never written.
_lookup_geometry = getattr(lsoa_g, "geometry", None) if isinstance(lsoa_g, gpd.GeoDataFrame) else None
if _lookup_geometry is not None and _lookup_geometry.notna().any():
    FLAGS_GPKG = MAPS / "lsoa_flags_baseline.gpkg"
    cols = ["population","t_resp_min","t_conv_min"] + [f"covered_T{t}" for t in RESPONSE_THRESHOLDS]
    # Right join on the flags frame's index so the layer has one feature per
    # row of `df`.
    gout = lsoa_g.merge(df[cols], left_on="lsoa_code", right_index=True, how="right")
    gout = gpd.GeoDataFrame(gout, geometry=gout.geometry, crs=getattr(lsoa_g, "crs", None))
    gout.to_file(FLAGS_GPKG, layer="lsoa_flags_baseline", driver="GPKG")
    _save_prov(FLAGS_GPKG, {"layer":"lsoa_flags_baseline"})
    _ok(f"Wrote {FLAGS_GPKG.name} (layer='lsoa_flags_baseline')")
else:
    _warn("No geometry available; skipped map GPKG export.")

# Compact acceptance
_ordered_counts = [covered_counts[t] for t in sorted(RESPONSE_THRESHOLDS)]
print("\n=== ACCEPTANCE ===")
print("Universe LSOAs:", len(lsoa_index))
print("Population sum:", int(population['population'].sum()))
print("Coverage counts:", covered_counts)
print("Monotone:", all(b >= a for a, b in zip(_ordered_counts, _ordered_counts[1:])))


[OK] Wrote coverage_summary.csv | rows=7
[OK] Wrote lsoa_flags_baseline.gpkg (layer='lsoa_flags_baseline')

=== ACCEPTANCE ===
Universe LSOAs: 336
Population sum: 570604
Coverage counts: {7: 106, 15: 213, 18: 253, 40: 334}
Monotone: True
