# Prepare Master Parcel Attribute Table (MPAT) Inputs
*Author: Alemarie Ceria*

## Purpose

This notebook creates the analysis-ready inputs used to build the MPAT in `notebooks/02_build_mpat.ipynb`. Separating the preparation avoids reruns of expensive tasks. Each prep step writes a single canonical output and skips work if the output already exists, unless `FORCE=True`.

### Definitions

The following breaks down the data directories used in this notebook:

- `data/01_inputs/source/`: vendor/original downloads
- `data/01_inputs/prepared/`: validated/corrected/mosaicked + CRS-harmonized, used by MPAT
- `data/01_inputs/prepared/review_queue/`: outputs for manual review; may be incorporated into prepared inputs in later iterations but are not read directly by the MPAT build notebook
- `data/01_inputs/prepared/_archive/`: outdated prepared layers moved here into dated subfolders (e.g., `_archive/20260111/`) when schema changes or reprocessing occurs; preserves audit trail without cluttering active directory
- `data/02_interim/`: temporary working layers

### Outputs

- `data/01_inputs/prepared/`
    - `parcels_hi_higp_validated_32604.gpkg`
    - `cesspools_inventory_hi_hcpt_corrected_32604.gpkg`
    - `dem_hi_pacioos_mosaic_32604.tif`
    - `watertable_hi_hcpt_mosaic_32604.tif`
    - `annual_rainfall_hi_hcpt_32604.tif`
    - `coastline_hi_op_32604.gpkg`
    - `sma_hi_op_32604.gpkg`
- `data/01_inputs/prepared/review_queue/`
    - `cesspools_inventory_hi_hcpt_review_queue_32604.gpkg`

### Input / Output Contract

- This notebook reads only from `data/01_inputs/source/` and any existing prepared inputs.
- Canonical prepared inputs are written to `data/01_inputs/prepared/`.
- Features requiring manual review are written to `data/01_inputs/prepared/review_queue/` and are not read directly by the MPAT build notebook.
- Temporary working layers created during preparation are written to `data/02_interim/` and may be deleted and rebuilt.
- The MPAT build notebook (`02_build_mpat.ipynb`) reads only from `data/01_inputs/prepared/`.

### Workflow Overview

The main tasks of this notebook are: 

1. Validating the parcel layer and dissolving known duplicate parcel with cesspools
2. Correcting TMK string values and geometries in the cesspool layer
3. Mosaicking the multi-raster water table layers
4. Reprojecting all layers to `EPSG:32604` to ensure consistent spatial operations
5. Export vectors as geopackages and rasters to GeoTIFFs for consistency

## Setup

### Imports

In [1]:
from pathlib import Path
# import yaml
import time
from contextlib import contextmanager
import geopandas as gpd
import pandas as pd
import numpy as np
import janitor
from shapely import make_valid
import rasterio
from rasterio.merge import merge
from rasterio.warp import calculate_default_transform, reproject, Resampling
from rasterstats import point_query
import folium
from folium.plugins import GroupedLayerControl
from IPython.display import display

### Configurations

In [None]:
CONFIG = {
    "crs": {
        "target": "EPSG:32604"
    },
    "paths": {
        "source": "data/01_inputs/source",
        "prepared": "data/01_inputs/prepared",
        "review_queue": "data/01_inputs/prepared/_review_queue",
        "interim": "data/02_interim"
    },
    "inputs": {
        "source": {
            "cesspools": "cesspools_inventory_hi_hcpt/2025CP_Expld_V2_clean.shp",
            "dem_dir": "dem_hi_pacioos",
            "watertable_dir": "watertable_hi_hcpt",
            "slope_dir": "slope_hi_hcpt",
            "streams": "streams_hi_hcpt/Streams_prj.shp",
            "mun_wells": "wells_hi_hcpt/CWRM_Wells_MUN_prj.shp",
            "dom_wells": "wells_hi_hcpt/CWRM_Wells_DOM_prj.shp",
            "rainfall": "annual_rainfall_hi_hcpt/Rain_inann.tif",
            "coastline": "coastline_hi_op/coastline.shp",
            "sma": "sma_hi_op/sma.shp",
        },
        "interim": {
            "parcels_repaired": "tempspace/parcels_hi_higp_repaired_32604/parcels_hi_higp_repaired_32604.shp",
        }
    },
    "outputs": {
        # Base mpat layers
        "parcels_preprocessed": "parcels_hi_higp_preprocessed_32604.gpkg",
        "cesspools_corrected": "cesspools_inventory_hi_hcpt_corrected_32604.gpkg",
        "cesspools_review": "cesspools_inventory_hi_hcpt_review_queue_32604.gpkg",

        # Rasters
        "dem_mosaic": "dem_hi_pacioos_mosaic_32604.tif",
        "watertable_mosaic": "watertable_hi_hcpt_mosaic_32604.tif",
        "rainfall": "annual_rainfall_hi_hcpt_32604.tif",

        # Vectors
        "streams":   "streams_hi_hcpt_32604.gpkg",
        "mun_wells": "wells_hi_hcpt_mun_32604.gpkg",
        "dom_wells": "wells_hi_hcpt_dom_32604.gpkg",
        "coastline": "coastline_hi_op_32604.gpkg",
        "sma": "sma_hi_op_32604.gpkg",
    },
    "fields": {
        "parcels": {
            "keep_cols": ["tmk_txt", "island"],
            "rename_map": {"tmk_txt": "tmk_parcels"},
            "tmk_col": "tmk_parcels",  # Name after renaming, used for QA
            "target_dup_tmk": "139019004"
        },
        "cesspools": {
            "keep_cols": [
                "tmk", "class_i", "class_ii", "class_iii", "class_iv", 
                "class_v", "osds_qty", "bedroom", "effluent", "nitrogen_f", "phosphorus"
            ],
            "rename_map": {"tmk": "tmk_cps", "nitrogen_f": "nitrogen"},
            "tmk_col": "tmk_cps",      # Name after renaming
            "filter_col": "class_iv",  # Filter to class_iv != 0
            "filter_value": 0
        }
    },
    "thresholds": {
        "tmk_correction_distance_m": 25
    },
    "run": {
        "force": False,
        "write_log": True,
        "dry_run": False,
        "stop_on_error": True
    }
}

### Paths

In [3]:
# Project root
PROJECT_ROOT = Path.cwd().parent

# Data directories
SOURCE = PROJECT_ROOT / CONFIG["paths"]["source"]
PREP = PROJECT_ROOT / CONFIG["paths"]["prepared"]
REVIEW = PROJECT_ROOT / CONFIG["paths"]["review_queue"]
INTERIM = PROJECT_ROOT / CONFIG["paths"]["interim"]

# CRS
TARGET_CRS = CONFIG["crs"]["target"]

# Inputs
IN_SOURCE = {k: SOURCE / v for k, v in CONFIG["inputs"]["source"].items()}
IN_INTERIM = {k: INTERIM / v for k, v in CONFIG["inputs"]["interim"].items()}

# Outputs
OUT_PREP = {k: PREP / v for k, v in CONFIG["outputs"].items() if "review" not in k}
OUT_REVIEW = {
    "cesspools_review": REVIEW / CONFIG["outputs"]["cesspools_review"]
}

In [4]:
# Export configs to config.yaml
# yaml_path = PROJECT_ROOT / "config.yaml"
# yaml_path.write_text(yaml.safe_dump(CONFIG, sort_keys=False))

In [5]:
# Load config.yaml
# CONFIG = yaml.safe_load((PROJECT_ROOT / "config.yaml").read_text())

### Run Controls

For execution choices. Allows us to define:

- Skip (assumes existing file matches current processing steps) <br>or rerun expensive step (do when output schema is changed)
- Print progress/QA messages
- Run partial code 

In [6]:
FORCE = CONFIG["run"]["force"]                    # rerun steps even if outputs exist
WRITE_LOG = CONFIG["run"]["write_log"]            # print progress and QA messages
DRY_RUN = CONFIG["run"]["dry_run"]                # define steps but do not write outputs
STOP_ON_ERROR = CONFIG["run"]["stop_on_error"]

In [7]:
# Example step pattern
# if output_path.exists() and not FORCE:
#     log("Output exists; skipping.")
# else:
#     try:
#         if DRY_RUN:
#             log(f"[DRY RUN] Would write: {output_path}")
#         else:
#             # do work
#             write_output(...)
#             log(f"Wrote: {output_path}")
#     except Exception as e:
#         log(f"ERROR: {e}")
#         if STOP_ON_ERROR:
#             raise

### Helper Functions

#### Utility

In [8]:
def log(msg: str) -> None:
    """
    Print a formatted progress or QA message if logging is enabled.

    This helper centralizes all user-facing messages so verbosity can be
    controlled globally via the WRITE_LOG run flag.

    Example
    -------
    log("Loaded parcels layer")
    """
    if WRITE_LOG:
        print(f"\n--- {msg} ---")

def should_run(out_path: Path, force: bool) -> bool:
    """
    Determine whether a processing step should execute.

    A step should run if:
    - the expected output does not yet exist, or
    - FORCE is True (explicit override).

    This supports safe re-runs of the notebook without unintentionally
    recomputing expensive steps.

    Example
    -------
    if should_run(out_path, FORCE):
        run_step()
    else:
        log("Output exists; skipping.")
    """
    return force or (not out_path.exists())

@contextmanager
def timed_step(label: str):
    """
    Context manager to time an expensive processing step.

    Timing messages are logged only when WRITE_LOG is True.
    Uses wall-clock time (time.perf_counter) for accurate runtime reporting.

    Example
    -------
    with timed_step("Geometry validation"):
        parcels["geometry"] = parcels["geometry"].apply(make_valid)
    """
    if not WRITE_LOG:
        # No timing or logging; just run the block
        yield
        return

    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if elapsed < 60:
            log(f"{label} completed in {elapsed:.2f} seconds")
        else:
            log(f"{label} completed in {elapsed / 60:.2f} minutes")

#### I/O

In [9]:
def read_vector(path, layer_name: str):
    """
    Read a vector dataset into a GeoDataFrame.

    Example
    -------
    parcels = read_vector(in_path, "parcels_repaired")
    """
    log(f"Reading {layer_name}: {path}")
    with timed_step(f"Read {layer_name}"):
        gdf = gpd.read_file(path).clean_names()
    log(f"{layer_name}: rows={len(gdf):,} | crs={gdf.crs}")
    return gdf

def write_vector_gpkg(gdf, out_path, layer_name: str):
    """
    Write a GeoDataFrame to a GeoPackage using Fiona (stable CRS writing in this env).

    Example
    -------
    write_vector_gpkg(parcels, OUT_PREP["parcels_validated"], "parcels_validated")
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with timed_step(f"Write {layer_name} (GPKG)"):
        gdf.to_file(out_path, layer=layer_name, driver="GPKG", engine="fiona")
    log(f"Wrote: {out_path}")


#### Vector

In [10]:
def ensure_vector_crs(gdf, target_crs: str, layer_name: str):
    """
    Ensure a GeoDataFrame is in the target CRS.

    Reprojects only if the CRS differs. Logs the action taken.

    Example
    -------
    parcels = ensure_vector_crs(parcels, TARGET_CRS, "parcels")
    """
    if gdf.crs is None:
        raise ValueError(f"{layer_name} CRS is missing. Cannot reproject safely.")

    target = gpd.GeoSeries([], crs=target_crs).crs  # normalize to CRS object

    if gdf.crs != target:
        log(f"{layer_name}: reprojecting from {gdf.crs} to {target_crs}")
        with timed_step(f"Reproject {layer_name}"):
            return gdf.to_crs(target)

    log(f"{layer_name}: already in target CRS ({target_crs})")
    return gdf

def validate_geometry_if_needed(gdf, layer_name: str):
    """
    Validate geometries using shapely.make_valid only when invalid geometries exist.

    Example
    -------
    parcels = validate_geometry_if_needed(parcels, "parcels")
    """
    n_invalid_before = int((~gdf.is_valid).sum())
    log(f"{layer_name}: invalid geometries (before): {n_invalid_before:,}")

    if n_invalid_before > 0:
        with timed_step(f"{layer_name}: make_valid"):
            invalid_mask = ~gdf.is_valid
            gdf.loc[invalid_mask, "geometry"] = (
                gdf.loc[invalid_mask, "geometry"].apply(make_valid)
            )

    n_invalid_after = int((~gdf.is_valid).sum())
    log(f"{layer_name}: invalid geometries (after):  {n_invalid_after:,}")

    if n_invalid_after > 0:
        log(f"WARNING: {layer_name} still has invalid geometries after make_valid.")

    return gdf

def check_geometry_quality(gdf, layer_name: str) -> dict:
    """
    Check for null, empty, and invalid geometries.
    
    Returns dict with counts for logging/decisions.
    
    Example
    -------
    geom_qa = check_geometry_quality(parcels, "parcels")
    """
    qa = {
        "null": int(gdf.geometry.isna().sum()),
        "empty": int(gdf.geometry.is_empty.sum()),
        "invalid": int((~gdf.is_valid).sum()),
    }
    log(f"{layer_name} geometry QA: null={qa['null']:,}, empty={qa['empty']:,}, invalid={qa['invalid']:,}")
    return qa

def summarize_vector(path: Path, name: str) -> dict:
    """
    Get summary stats for a vector file (GPKG/SHP/etc).

    Example
    -------
    stats = summarize_vector(OUT_PREP["parcels_validated"], "parcels_validated")
    """
    gdf = gpd.read_file(path)
    return {
        "name": name,
        "path": path.name,
        "crs": str(gdf.crs),
        "rows": len(gdf),
        "columns": len(gdf.columns),
    }

def dissolve_duplicate_tmk_polygons(
    parcels: gpd.GeoDataFrame,
    tmk_col: str,
    target_tmk: str,
    layer_name: str = "parcels",
) -> gpd.GeoDataFrame:
    if parcels is None or parcels.empty:
        log(f"{layer_name}: empty; skipping targeted dissolve.")
        return parcels

    if tmk_col not in parcels.columns:
        raise KeyError(f"{layer_name}: missing TMK column '{tmk_col}'")
    if "geometry" not in parcels.columns:
        raise KeyError(f"{layer_name}: missing geometry column")

    gdf = parcels.copy()
    gdf[tmk_col] = gdf[tmk_col].astype("string")
    target_tmk = str(target_tmk)

    target_mask = (gdf[tmk_col] == target_tmk)
    existing_n = int(target_mask.sum())
    log(f"{layer_name}: targeted dissolve TMK={target_tmk} occurrences={existing_n:,}")

    if existing_n <= 1:
        log(f"{layer_name}: nothing to dissolve (<=1 occurrence).")
        return gdf

    dup_parts = gdf.loc[target_mask].copy()
    dup_parts = dup_parts.loc[dup_parts.geometry.notna() & ~dup_parts.geometry.is_empty].copy()

    if len(dup_parts) < 2:
        log(f"{layer_name}: duplicates exist but <2 non-empty geometries; skipping.")
        return gdf

    geoms = dup_parts.geometry.tolist()
    with timed_step(f"{layer_name}: dissolve {target_tmk} (pairwise union)"):
        merged_geom = geoms[0]
        for geom in geoms[1:]:
            merged_geom = merged_geom.union(geom)

    if merged_geom is None or merged_geom.is_empty:
        raise ValueError(f"{layer_name}: union produced empty geometry for TMK {target_tmk}")

    parcels_keep = gdf.loc[~target_mask].copy()

    merged_row = dup_parts.iloc[[0]].copy()
    merged_row.geometry = [merged_geom]

    # IMPORTANT: assign a plain string scalar (avoid dtype weirdness)
    merged_row[tmk_col] = target_tmk

    out = pd.concat([parcels_keep, merged_row], ignore_index=True)
    out = gpd.GeoDataFrame(out, geometry="geometry", crs=gdf.crs)

    # IMPORTANT: compare against plain string, handle NA safely
    final_n = int((out[tmk_col].astype("string").fillna("") == target_tmk).sum())
    if final_n != 1:
        raise ValueError(f"{layer_name}: expected 1 row for {target_tmk}, got {final_n}")

    log(f"{layer_name}: dissolved TMK {target_tmk} to 1 row.")
    return out


#### Raster

In [11]:
def check_raster_crs(input_dir: Path, layer_name: str) -> dict:
    """
    Check CRS status of all rasters in a directory.
    
    Use before mosaicking to identify files needing CRS assignment.
    
    Parameters
    ----------
    input_dir : Path
        Directory containing .tif files.
    layer_name : str
        Name for logging.
        
    Returns
    -------
    dict
        Keys: 'valid' (list of filenames with CRS), 
              'missing' (list of filenames without CRS),
              'crs_values' (dict of filename: CRS string)
        
    Example
    -------
    crs_check = check_raster_crs(SOURCE / "watertable_hi_hcpt", "watertable")
    # Returns: {'valid': ['oahu_wt.tif', ...], 'missing': ['hawaii_wt.tif'], 'crs_values': {...}}
    """
    tif_files = sorted(input_dir.glob("*.tif"))
    log(f"{layer_name}: checking CRS for {len(tif_files)} rasters")
    
    result = {"valid": [], "missing": [], "crs_values": {}}
    
    for tif_path in tif_files:
        with rasterio.open(tif_path) as src:
            if src.crs is None:
                result["missing"].append(tif_path.name)
                result["crs_values"][tif_path.name] = None
                log(f"  {tif_path.name}: NO CRS")
            else:
                result["valid"].append(tif_path.name)
                result["crs_values"][tif_path.name] = str(src.crs)
                log(f"  {tif_path.name}: {src.crs}")
    
    if result["missing"]:
        log(f"{layer_name}: {len(result['missing'])} raster(s) missing CRS")
    else:
        log(f"{layer_name}: all rasters have valid CRS")
    
    return result
    
def set_raster_crs(
    in_path: Path,
    out_path: Path,
    epsg: int,
    layer_name: str,
    force: bool = False,
    dry_run: bool = False,
):
    """
    Assign a CRS to a raster that has no spatial reference.
    
    This does NOT reproject the data; it only sets the CRS metadata.
    Use when you know what CRS the raster should have but it's missing.
    
    Parameters
    ----------
    in_path : Path
        Input raster path (missing CRS).
    out_path : Path
        Output raster path (with CRS assigned).
    epsg : int
        EPSG code to assign (e.g., 4326, 32604).
    layer_name : str
        Name for logging.
    force : bool
        Overwrite output if exists.
    dry_run : bool
        Log actions without writing.
        
    Returns
    -------
    Path
        Path to output raster with CRS assigned.
        
    Example
    -------
    # Fix raster with missing CRS before mosaicking
    set_raster_crs(
        in_path=SOURCE / "watertable_hi_hcpt" / "hawaii_wt.tif",
        out_path=INTERIM / "hawaii_wt_crs_fixed.tif",
        epsg=4326,
        layer_name="watertable_hawaii",
        force=False,
        dry_run=False,
    )
    """
    if out_path.exists() and not force:
        log(f"{layer_name}: output exists; skipping CRS assignment.")
        return out_path
    
    with rasterio.open(in_path) as src:
        if src.crs is not None:
            log(f"WARNING: {layer_name} already has CRS={src.crs}. Skipping.")
            return in_path
        
        log(f"{layer_name}: assigning EPSG:{epsg} to raster with no CRS")
        
        if dry_run:
            log(f"[DRY RUN] Would write: {out_path}")
            return out_path
        
        profile = src.profile.copy()
        profile.update(crs=rasterio.crs.CRS.from_epsg(epsg))
        
        out_path.parent.mkdir(parents=True, exist_ok=True)
        
        with timed_step(f"Set CRS for {layer_name}"):
            with rasterio.open(out_path, "w", **profile) as dst:
                dst.write(src.read())
    
    log(f"Wrote: {out_path}")
    return out_path

def mosaic_rasters(
    input_dir: Path,
    out_path: Path,
    layer_name: str,
    force: bool = False,
    dry_run: bool = False,
):
    if out_path.exists() and not force:
        log(f"{layer_name}: mosaic exists; skipping.")
        return out_path

    tif_files = sorted(input_dir.glob("*.tif"))
    log(f"{layer_name}: found {len(tif_files)} input rasters")

    if len(tif_files) == 0:
        raise FileNotFoundError(f"No .tif files found in {input_dir}")

    datasets = []
    crs_values = set()
    nodata_values = set()

    for tif_path in tif_files:
        src = rasterio.open(tif_path)
        if src.crs is None:
            for ds in datasets:
                ds.close()
            raise ValueError(f"{layer_name}: missing CRS in {tif_path.name}. Fix before mosaicking.")

        datasets.append(src)
        crs_values.add(str(src.crs))
        nodata_values.add(src.nodata)

    # IMPORTANT: merge assumes inputs are already in the same CRS/grid space
    if len(crs_values) > 1:
        for ds in datasets:
            ds.close()
        raise ValueError(f"{layer_name}: inputs have multiple CRS values; reproject first. Found: {sorted(crs_values)}")

    # Choose nodata (prefer existing; otherwise set a safe DEM nodata)
    nodata = next((v for v in nodata_values if v is not None), None)
    if nodata is None:
        nodata = -9999.0
        log(f"WARNING {layer_name}: inputs have no nodata; using {nodata} for mosaic output.")

    if dry_run:
        log(f"[DRY RUN] Would mosaic {len(datasets)} rasters to {out_path}")
        for ds in datasets:
            ds.close()
        return out_path

    with timed_step(f"Mosaic {layer_name}"):
        mosaic_data, mosaic_transform = merge(
            datasets,
            nodata=nodata,
        )

        mosaic_crs = datasets[0].crs
        mosaic_dtype = datasets[0].dtypes[0]

        for ds in datasets:
            ds.close()

        profile = {
            "driver": "GTiff",
            "height": mosaic_data.shape[1],
            "width": mosaic_data.shape[2],
            "count": mosaic_data.shape[0],
            "dtype": mosaic_dtype,
            "crs": mosaic_crs,
            "transform": mosaic_transform,
            "nodata": nodata,
        }

        out_path.parent.mkdir(parents=True, exist_ok=True)
        with rasterio.open(out_path, "w", **profile) as dst:
            dst.write(mosaic_data)

    log(f"Wrote: {out_path}")
    return out_path
    
def ensure_raster_crs(
    in_path,
    out_path,
    target_epsg: int,
    layer_name: str,
    force: bool = False,
    dry_run: bool = False,
):
    """
    Ensure a raster is in the target CRS by writing a CRS-harmonized copy.

    Behavior
    --------
    - If out_path exists and force=False: skip and return out_path
    - If input CRS matches target EPSG: copy raster to out_path
    - Otherwise: reproject raster to target EPSG and write out_path

    Parameters
    ----------
    in_path : pathlib.Path
        Input raster path.
    out_path : pathlib.Path
        Output raster path (prepared input).
    target_epsg : int
        Target EPSG code (e.g., 32604).
    layer_name : str
        Name used in log messages.
    force : bool
        If True, overwrite/recompute even if out_path exists.
    dry_run : bool
        If True, log actions but do not write outputs.

    Returns
    -------
    pathlib.Path
        Path to the raster in the target CRS (out_path).

    Example
    -------
    ensure_raster_crs(
        in_path=SOURCE / "annual_rainfall_hi_hcpt" / "Rain_inann.tif",
        out_path=OUT_PREP["rainfall"],
        target_epsg=32604,
        layer_name="rainfall",
        force=FORCE,
        dry_run=DRY_RUN,
    )
    """
    if out_path.exists() and not force:
        log(f"{layer_name}: output exists; skipping CRS ensure step.")
        return out_path

    with rasterio.open(in_path) as src:
        if src.crs is None:
            raise ValueError(f"{layer_name} CRS is missing. Cannot reproject safely.")

        target_crs = rasterio.crs.CRS.from_epsg(target_epsg)

        # Ensure output directory exists (unless dry run)
        if not dry_run:
            out_path.parent.mkdir(parents=True, exist_ok=True)

        # -----------------------------
        # Case 1: Already in target CRS -> copy
        # -----------------------------
        if src.crs == target_crs:
            log(f"{layer_name}: CRS already EPSG:{target_epsg}")

            if dry_run:
                log(f"[DRY RUN] Would copy: {in_path} -> {out_path}")
                return out_path

            with timed_step(f"Copy {layer_name} (already in target CRS)"):
                profile = src.profile.copy()
                data = src.read()

                with rasterio.open(out_path, "w", **profile) as dst:
                    dst.write(data)

            log(f"Wrote: {out_path}")
            return out_path

        # -----------------------------
        # Case 2: CRS differs -> reproject
        # -----------------------------
        log(f"{layer_name}: reprojecting raster -> EPSG:{target_epsg}")

        transform, width, height = calculate_default_transform(
            src.crs, target_crs, src.width, src.height, *src.bounds
        )

        nodata = src.nodata
        if nodata is None:
            nodata = -9999.0
            log(f"WARNING {layer_name}: src nodata missing; using {nodata} for reprojection output.")

        profile = src.profile.copy()
        profile.update(
            crs=target_crs,
            transform=transform,
            width=width,
            height=height,
            nodata=nodata,
        )

        if dry_run:
            log(f"[DRY RUN] Would reproject: {in_path} -> {out_path}")
            return out_path

        with timed_step(f"Reproject {layer_name}"):
            with rasterio.open(out_path, "w", **profile) as dst:
                for i in range(1, src.count + 1):
                    reproject(
                        source=rasterio.band(src, i),
                        destination=rasterio.band(dst, i),
                        src_transform=src.transform,
                        src_crs=src.crs,
                        dst_transform=transform,
                        dst_crs=target_crs,
                        src_nodata=src.nodata,
                        dst_nodata=nodata,
                        resampling=Resampling.bilinear,
                    )

        log(f"Wrote: {out_path}")
        return out_path

def summarize_raster(path: Path, name: str) -> dict:
    """
    Get summary stats for a raster file.
    
    Example
    -------
    stats = summarize_raster(OUT_PREP["dem_mosaic"], "dem")
    """
    with rasterio.open(path) as src:
        return {
            "name": name,
            "path": path.name,
            "crs": str(src.crs),
            "dimensions": f"{src.width} x {src.height}",
            "resolution": f"{src.res[0]:.2f} x {src.res[1]:.2f}",
            "dtype": src.dtypes[0],
            "bands": src.count,
            "size_mb": path.stat().st_size / (1024 * 1024),
        }

#### Prep

In [12]:
def is_valid_tmk(tmk) -> bool:
    """
    Check if TMK is valid: 9 digits, numeric only, not null.
    
    Example
    -------
    valid_mask = parcels['tmk'].apply(is_valid_tmk)
    """
    if pd.isna(tmk):
        return False
    s = str(tmk).strip()
    return len(s) == 9 and s.isdigit()

def ensure_tmk_string(gdf, tmk_col: str, layer_name: str):
    """
    Ensure TMK column is string type (9 digits, zero-padded).
    
    Handles numeric TMKs that lost leading zeros by converting through Int64.
    
    Parameters
    ----------
    gdf : GeoDataFrame
        Input GeoDataFrame.
    tmk_col : str
        Name of TMK column.
    layer_name : str
        Name for logging.
        
    Returns
    -------
    GeoDataFrame
        GeoDataFrame with TMK as string type.
        
    Example
    -------
    parcels = ensure_tmk_string(parcels, "tmk_parcels", "parcels")
    """
    if gdf[tmk_col].dtype == "string":
        log(f"{layer_name}: {tmk_col} already string type")
        return gdf
    
    log(f"{layer_name}: converting {tmk_col} to string (was {gdf[tmk_col].dtype})")
    gdf[tmk_col] = (
        pd.to_numeric(gdf[tmk_col], errors="coerce")
        .round(0)
        .astype("Int64")
        .astype("string")
        .str.zfill(9)  # Zero-pad to 9 digits
    )
    return gdf

def load_and_prep_cesspools(
    in_path: Path,
    keep_cols: list,
    rename_map: dict,
    tmk_col: str,
    filter_col: str,
    filter_value,
    target_crs: str,
) -> gpd.GeoDataFrame:
    """
    Load cesspool layer and apply standard prep: subset, rename, reproject, 
    validate geometry, ensure TMK is string, filter to class IV.
    
    Example
    -------
    cesspools = load_and_prep_cesspools(
        in_path=SOURCE / "cesspools.shp",
        keep_cols=CONFIG["fields"]["cesspools"]["keep_cols"],
        rename_map=CONFIG["fields"]["cesspools"]["rename_map"],
        tmk_col="tmk_cps",
        filter_col="class_iv",
        filter_value=0,
        target_crs=TARGET_CRS,
    )
    """
    # Load
    cesspools = read_vector(in_path, "cesspools_raw")
    
    # Subset columns
    cols_with_geom = keep_cols + ["geometry"]
    log(f"Subsetting to columns: {cols_with_geom}")
    cesspools = cesspools[cols_with_geom].copy()
    
    # Rename columns
    log(f"Renaming columns: {rename_map}")
    cesspools = cesspools.rename(columns=rename_map)
    
    # Reproject
    cesspools = ensure_vector_crs(cesspools, target_crs, "cesspools")
    
    # Validate geometries
    cesspools = validate_geometry_if_needed(cesspools, "cesspools")
    
    # Remove null/empty geometries
    n_before = len(cesspools)
    null_mask = cesspools.geometry.isna()
    empty_mask = cesspools.geometry.is_empty
    cesspools = cesspools[~null_mask & ~empty_mask].copy()
    n_removed = n_before - len(cesspools)
    if n_removed > 0:
        log(f"Removed {n_removed:,} cesspools with null/empty geometries")
    
    # Ensure TMK is string
    cesspools = ensure_tmk_string(cesspools, tmk_col, "cesspools")
    
    # Filter to class IV (filter_col != filter_value)
    n_before = len(cesspools)
    cesspools = cesspools[cesspools[filter_col] != filter_value].copy()
    n_filtered = n_before - len(cesspools)
    log(f"Filtered to {filter_col} != {filter_value}: {len(cesspools):,} rows ({n_filtered:,} removed)")
    
    return cesspools


def check_tmk_correction_preconditions(
    cesspools: gpd.GeoDataFrame,
    parcels: gpd.GeoDataFrame,
    cp_tmk_col: str,
    pr_tmk_col: str,
    target_crs: str,
    thresh_m: float,
) -> int:
    """
    Verify inputs before TMK correction workflow. Raises errors if preconditions fail.
    Returns count of malformed cesspool TMKs (for logging).
    
    Example
    -------
    n_malformed = check_tmk_correction_preconditions(
        cesspools, parcels, "tmk_cps", "tmk_parcels", TARGET_CRS, 25
    )
    """
    # Ensure required columns exist
    required_cp = [cp_tmk_col, "geometry"]
    required_pr = [pr_tmk_col, "geometry"]
    
    missing_cp = [c for c in required_cp if c not in cesspools.columns]
    missing_pr = [c for c in required_pr if c not in parcels.columns]
    
    if missing_cp:
        raise KeyError(f"cesspools missing required columns: {missing_cp}")
    if missing_pr:
        raise KeyError(f"parcels missing required columns: {missing_pr}")
    
    # Ensure CRS match
    if cesspools.crs is None:
        raise ValueError("cesspools has no CRS set.")
    if parcels.crs is None:
        raise ValueError("parcels has no CRS set.")
    if cesspools.crs != parcels.crs:
        raise ValueError(f"CRS mismatch: cesspools={cesspools.crs}, parcels={parcels.crs}")
    
    # Ensure no null/empty geometries
    if cesspools.geometry.isna().any() or cesspools.geometry.is_empty.any():
        raise ValueError("cesspools contains null/empty geometries.")
    if parcels.geometry.isna().any() or parcels.geometry.is_empty.any():
        raise ValueError("parcels contains null/empty geometries.")
    
    # Parcels TMK quality (authoritative - should be clean)
    pr_invalid = ~parcels[pr_tmk_col].apply(is_valid_tmk)
    if pr_invalid.any():
        raise ValueError(
            f"Parcels (authoritative) has {pr_invalid.sum():,} invalid TMKs. Fix upstream."
        )
    
    # Cesspools TMK quality (will be corrected)
    cp_malformed_count = int((~cesspools[cp_tmk_col].apply(is_valid_tmk)).sum())
    
    log("Preconditions passed âœ“")
    log(f"  Cesspools: {len(cesspools):,} | Parcels: {len(parcels):,}")
    log(f"  CRS: {target_crs} | Threshold: {thresh_m}m")
    log(f"  Malformed cesspool TMKs: {cp_malformed_count:,} (will be flagged)")
    
    return cp_malformed_count

def prep_vector(
    in_path: Path,
    out_path: Path,
    layer_name: str,
    target_crs: str,
    keep_cols: list[str] | None = None,
    rename_map: dict[str, str] | None = None,
):
    """
    Prep a vector dataset to the project target CRS and write to a single-layer GPKG.

    Workflow:
      read -> validate geometry -> reproject if needed -> optional subset/rename -> QA -> write gpkg
    """
    gdf = read_vector(in_path, layer_name)

    gdf = validate_geometry_if_needed(gdf, layer_name)
    gdf = ensure_vector_crs(gdf, target_crs, layer_name)

    if keep_cols is not None:
        keep_cols = keep_cols + ["geometry"]
        missing = [c for c in keep_cols if c not in gdf.columns]
        if missing:
            raise KeyError(f"{layer_name}: missing required columns: {missing}")

        log(f"{layer_name}: subsetting to columns: {keep_cols}")
        gdf = gdf[keep_cols].copy()

    if rename_map:
        log(f"{layer_name}: renaming columns: {rename_map}")
        gdf = gdf.rename(columns=rename_map)

    _ = check_geometry_quality(gdf, layer_name)

    write_vector_gpkg(gdf, out_path, layer_name)
    return gdf

#### QA/QC

In [13]:
def add_island_column(gdf, tmk_col: str, out_col: str = "island"):
    """
    Add island name derived from the first TMK digit.

    TMK convention used here:
      1 = Hawaii, 2 = Maui, 3 = Oahu, 4 = Kauai
    """
    island_map = {"1": "Hawaii", "2": "Maui", "3": "Oahu", "4": "Kauai"}

    if gdf is None or len(gdf) == 0:
        return gdf

    if tmk_col not in gdf.columns:
        gdf[out_col] = "Unknown"
        return gdf

    gdf[out_col] = gdf[tmk_col].astype("string").str[0].map(island_map).fillna("Unknown")
    return gdf


def _style(fill_color: str, line_color: str, weight: int = 2, fill_opacity: float = 0.3):
    """
    Folium style function factory (avoids lambda closure issues).
    """
    def fn(_):
        return {
            "fillColor": fill_color,
            "color": line_color,
            "weight": weight,
            "fillOpacity": fill_opacity,
        }
    return fn


def build_review_qaqc_map(
    review_main: gpd.GeoDataFrame,
    parcels: gpd.GeoDataFrame,
    web_crs: str,
    cp_tmk_col: str,
    pr_tmk_col: str,
    nearest_pr_tmk_col: str = "nearest_tmk_parcels",
    status_col: str = "tmk_status",
    last_digit_col: str = "last_digit_off_by_1",
    dist_nearest_col: str = "dist_to_nearest_m",
    dist_claimed_col: str = "dist_to_tmk_cps_parcel_m",
):
    """
    Build an interactive Folium QA/QC map for review cases.

    Expects review_main to contain only review rows (not the full corrected layer),
    but will still work if review_main includes other statuses.

    Returns
    -------
    folium.Map
    """
    # -----------------------------
    # Preconditions
    # -----------------------------
    required = [status_col, cp_tmk_col, "geometry"]
    missing = [c for c in required if c not in review_main.columns]
    if missing:
        raise KeyError(f"review_main missing required columns: {missing}")

    if review_main.empty:
        log("QA/QC map: review_main is empty; nothing to map.")
        return None

    # Work on copies (do not mutate upstream objects)
    review = review_main.copy()
    pr = parcels[[pr_tmk_col, "geometry"]].copy()

    # Ensure WEB CRS for mapping
    review_web = review.to_crs(web_crs)
    pr_web = pr.to_crs(web_crs)

    # Normalize last_digit_off_by_1 to bool when present
    if last_digit_col in review_web.columns:
        # Accept bool, 0/1, "0"/"1", NaN
        review_web[last_digit_col] = (
            pd.to_numeric(review_web[last_digit_col], errors="coerce")
            .fillna(0)
            .astype(int)
            .astype(bool)
        )

    # Island label (non-blocking if TMK malformed/NA)
    island_map = {"1": "Hawaii", "2": "Maui", "3": "Oahu", "4": "Kauai"}
    review_web["island"] = (
        review_web[cp_tmk_col].astype("string").str[0].map(island_map).fillna("Unknown")
    )

    # -----------------------------
    # Split review subsets
    # -----------------------------
    review_hit = review_web.loc[review_web[status_col] == "hit_but_tmk_exists_elsewhere"].copy()
    review_nohit = review_web.loc[review_web[status_col] == "review_nohit_beyond_threshold"].copy()
    review_malformed = review_web.loc[review_web[status_col] == "tmk_malformed"].copy()

    if len(review_nohit) > 0 and last_digit_col in review_nohit.columns:
        review_nohit_off_by_1 = review_nohit.loc[review_nohit[last_digit_col] == True].copy()
        review_nohit_way_off = review_nohit.loc[review_nohit[last_digit_col] == False].copy()
    else:
        review_nohit_off_by_1 = gpd.GeoDataFrame(columns=review_web.columns, crs=review_web.crs)
        review_nohit_way_off = gpd.GeoDataFrame(columns=review_web.columns, crs=review_web.crs)

    # -----------------------------
    # Map extent
    # -----------------------------
    all_bounds = review_web.total_bounds  # [minx, miny, maxx, maxy]
    center_lat = (all_bounds[1] + all_bounds[3]) / 2
    center_lon = (all_bounds[0] + all_bounds[2]) / 2

    m = folium.Map(location=[center_lat, center_lon], zoom_start=8)
    m.fit_bounds([[all_bounds[1], all_bounds[0]], [all_bounds[3], all_bounds[2]]])

    # -----------------------------
    # HIT AMBIGUOUS
    # -----------------------------
    if len(review_hit) > 0:
        fg_hit_parcel_a = folium.FeatureGroup(name="[HIT AMBIGUOUS] Parcel A: where point landed (purple)")
        fg_hit_parcel_b = folium.FeatureGroup(name="[HIT AMBIGUOUS] Parcel B: claimed by tmk_cps (blue)")
        fg_hit_points = folium.FeatureGroup(name="[HIT AMBIGUOUS] Cesspool points (blue)")

        hit_parcel_a_tmks = review_hit[pr_tmk_col].dropna().unique()
        hit_parcel_b_tmks = review_hit[cp_tmk_col].dropna().unique()

        hit_parcels_a = pr_web.loc[pr_web[pr_tmk_col].isin(hit_parcel_a_tmks)].copy()
        hit_parcels_b = pr_web.loc[pr_web[pr_tmk_col].isin(hit_parcel_b_tmks)].copy()

        for _, row in hit_parcels_a.iterrows():
            folium.GeoJson(
                row.geometry.__geo_interface__,
                style_function=lambda x: {"fillColor": "purple", "color": "purple", "weight": 2, "fillOpacity": 0.3},
                tooltip=f"Parcel A (landed): {row[pr_tmk_col]}",
            ).add_to(fg_hit_parcel_a)

        for _, row in hit_parcels_b.iterrows():
            folium.GeoJson(
                row.geometry.__geo_interface__,
                style_function=lambda x: {"fillColor": "blue", "color": "blue", "weight": 2, "fillOpacity": 0.3},
                tooltip=f"Parcel B (claimed): {row[pr_tmk_col]}",
            ).add_to(fg_hit_parcel_b)

        for _, row in review_hit.iterrows():
            tooltip_text = (
                f"tmk_cps (claimed): {row.get(cp_tmk_col, 'NA')}<br>"
                f"tmk_parcels (landed): {row.get(pr_tmk_col, 'NA')}<br>"
                f"Dist to claimed parcel: {row.get(dist_claimed_col, 'N/A')} m<br>"
                f"Island: {row.get('island', 'Unknown')}"
            )
            folium.CircleMarker(
                location=[row.geometry.y, row.geometry.x],
                radius=4,
                color="blue",
                fill=True,
                fillColor="blue",
                fillOpacity=0.8,
                tooltip=tooltip_text,
            ).add_to(fg_hit_points)

        fg_hit_parcel_a.add_to(m)
        fg_hit_parcel_b.add_to(m)
        fg_hit_points.add_to(m)

    # -----------------------------
    # NO-HIT OFF BY 1 (orange)
    # -----------------------------
    if len(review_nohit_off_by_1) > 0:
        fg_nohit_off1_parcel = folium.FeatureGroup(name="[NO-HIT OFF BY 1] Nearest parcel (orange)")
        fg_nohit_off1_points = folium.FeatureGroup(name="[NO-HIT OFF BY 1] Cesspool points (orange)")

        if nearest_pr_tmk_col not in review_nohit_off_by_1.columns:
            log(f"WARNING: {nearest_pr_tmk_col} missing; cannot draw nearest parcels for no-hit.")
        else:
            nohit_off1_tmks = review_nohit_off_by_1[nearest_pr_tmk_col].dropna().unique()
            nohit_off1_parcels = pr_web.loc[pr_web[pr_tmk_col].isin(nohit_off1_tmks)].copy()

            for _, row in nohit_off1_parcels.iterrows():
                folium.GeoJson(
                    row.geometry.__geo_interface__,
                    style_function=lambda x: {"fillColor": "orange", "color": "orange", "weight": 2, "fillOpacity": 0.3},
                    tooltip=f"Nearest parcel: {row[pr_tmk_col]}",
                ).add_to(fg_nohit_off1_parcel)

        for _, row in review_nohit_off_by_1.iterrows():
            tooltip_text = (
                f"tmk_cps: {row.get(cp_tmk_col, 'NA')}<br>"
                f"Nearest parcel: {row.get(nearest_pr_tmk_col, 'NA')}<br>"
                f"Dist to nearest: {row.get(dist_nearest_col, 'N/A')} m<br>"
                f"Last digit off by 1: True<br>"
                f"Island: {row.get('island', 'Unknown')}"
            )
            folium.CircleMarker(
                location=[row.geometry.y, row.geometry.x],
                radius=4,
                color="orange",
                fill=True,
                fillColor="orange",
                fillOpacity=0.8,
                tooltip=tooltip_text,
            ).add_to(fg_nohit_off1_points)

        fg_nohit_off1_parcel.add_to(m)
        fg_nohit_off1_points.add_to(m)

    # -----------------------------
    # NO-HIT WAY OFF (red)
    # -----------------------------
    if len(review_nohit_way_off) > 0:
        fg_nohit_wayoff_parcel = folium.FeatureGroup(name="[NO-HIT WAY OFF] Nearest parcel (red)")
        fg_nohit_wayoff_points = folium.FeatureGroup(name="[NO-HIT WAY OFF] Cesspool points - MANUAL REVIEW (red)")

        if nearest_pr_tmk_col not in review_nohit_way_off.columns:
            log(f"WARNING: {nearest_pr_tmk_col} missing; cannot draw nearest parcels for no-hit.")
        else:
            nohit_wayoff_tmks = review_nohit_way_off[nearest_pr_tmk_col].dropna().unique()
            nohit_wayoff_parcels = pr_web.loc[pr_web[pr_tmk_col].isin(nohit_wayoff_tmks)].copy()

            for _, row in nohit_wayoff_parcels.iterrows():
                folium.GeoJson(
                    row.geometry.__geo_interface__,
                    style_function=lambda x: {"fillColor": "red", "color": "red", "weight": 2, "fillOpacity": 0.3},
                    tooltip=f"Nearest parcel: {row[pr_tmk_col]}",
                ).add_to(fg_nohit_wayoff_parcel)

        for _, row in review_nohit_way_off.iterrows():
            tooltip_text = (
                f"tmk_cps: {row.get(cp_tmk_col, 'NA')}<br>"
                f"Nearest parcel: {row.get(nearest_pr_tmk_col, 'NA')}<br>"
                f"Dist to nearest: {row.get(dist_nearest_col, 'N/A')} m<br>"
                f"Last digit off by 1: False<br>"
                f"Island: {row.get('island', 'Unknown')}<br>"
                f"<b>NEEDS MANUAL REVIEW</b>"
            )
            folium.CircleMarker(
                location=[row.geometry.y, row.geometry.x],
                radius=4,
                color="red",
                fill=True,
                fillColor="red",
                fillOpacity=0.8,
                tooltip=tooltip_text,
            ).add_to(fg_nohit_wayoff_points)

        fg_nohit_wayoff_parcel.add_to(m)
        fg_nohit_wayoff_points.add_to(m)

    # -----------------------------
    # TMK MALFORMED (yellow)
    # -----------------------------
    if len(review_malformed) > 0:
        fg_malformed_points = folium.FeatureGroup(name="[TMK MALFORMED] Cesspool points (yellow)")

        for _, row in review_malformed.iterrows():
            tooltip_text = (
                f"tmk_cps: {row.get(cp_tmk_col, 'NA')}<br>"
                f"Island: {row.get('island', 'Unknown')}<br>"
                f"<b>MALFORMED TMK - NEEDS MANUAL REVIEW</b>"
            )
            folium.CircleMarker(
                location=[row.geometry.y, row.geometry.x],
                radius=4,
                color="yellow",
                fill=True,
                fillColor="yellow",
                fillOpacity=0.8,
                tooltip=tooltip_text,
            ).add_to(fg_malformed_points)

        fg_malformed_points.add_to(m)

    folium.LayerControl(collapsed=False).add_to(m)

    # -----------------------------
    # Legend (counts reflect what was actually plotted)
    # -----------------------------
    log("QA/QC REVIEW MAP LEGEND")
    print("=" * 70)

    print("\n[HIT AMBIGUOUS] hit_but_tmk_exists_elsewhere:")
    print("  Purple polygon = Parcel A (where point landed)")
    print("  Blue polygon   = Parcel B (claimed by tmk_cps)")
    print("  Blue point     = Cesspool")
    print(f"  Count: {len(review_hit):,}")

    print("\n[NO-HIT OFF BY 1] last_digit_off_by_1=True:")
    print("  Orange polygon = Nearest parcel")
    print("  Orange point   = Cesspool (likely typo)")
    print(f"  Count: {len(review_nohit_off_by_1):,}")

    print("\n[NO-HIT WAY OFF] last_digit_off_by_1=False:")
    print("  Red polygon    = Nearest parcel")
    print("  Red point      = Cesspool (NEEDS MANUAL REVIEW)")
    print(f"  Count: {len(review_nohit_way_off):,}")

    if len(review_malformed) > 0:
        print("\n[TMK MALFORMED]:")
        print("  Yellow point   = Cesspool (invalid TMK format)")
        print(f"  Count: {len(review_malformed):,}")

    print("\n" + "=" * 70)
    print(f"Total review cases: {len(review_web):,}")
    print("=" * 70)

    return m

## Geoprocessing Steps

### Step 1: Preprocess Parcels

- Input: `data/02_interim/parcels_hi_higp_repaired_32604/tmk_state.shp`
- Output: `data/02_interim/parcels_hi_higp_validated_32604.gpkg`
- Workflow: 
    - Load raw parcels polygon (python)
    - Load and repair raw parcels polygon using Repair Geometry (ArcGIS; manual step)
    - Reload repaired parcels polygon (python)
    - Reproject to 32604 (if needed)
    - Validate remaining invalid geometries using Shapely (`make_valid`)
    - Identify parcel rows for TMK `139019004`
    - If duplicate parcel geometries exist:
        - Drop null or empty geometries
        - Validate geometries if needed
        - Dissolve parcel parts into a single geometry using pairwise Shapely `geometry.union()`
            - (avoids `union_all` / `unary_union`, which fail in this environment)
        - Replace the multiple parcel rows with the single merged parcel geometry
    - QA: assert TMK `139019004` appears exactly once in `parcels`
    - Export as GPKG

In [None]:
# Load error in python
# parcels_path = SOURCE / "parcels_hi_higp/tmk_state.shp"
# parcels = gpd.read_file(parcels_path)

# -------------------------------------------------------
# GEOSException         Traceback (most recent call last)
# Cell In[3], line 2
#       1 filepath = RAW / "parcels_hi_higp/tmk_state.shp"
# ----> 2 gdf = gpd.read_file(filepath)

# File ~\miniconda3\envs\geo\Lib\site-packages\geopandas\io\file.py:294, in _read_file(filename, bbox, mask, columns, rows, engine, **kwargs)
#     291             from_bytes = True
#     293 if engine == "pyogrio":
# --> 294     return _read_file_pyogrio(
#     295         filename, bbox=bbox, mask=mask, columns=columns, rows=rows, **kwargs
#     296     )
#     298 elif engine == "fiona":
#     299     if pd.api.types.is_file_like(filename):

# File ~\miniconda3\envs\geo\Lib\site-packages\geopandas\io\file.py:547, in _read_file_pyogrio(path_or_bytes, bbox, mask, rows, **kwargs)
#     538     warnings.warn(
#     539         "The 'include_fields' and 'ignore_fields' keywords are deprecated, and "
#     540         "will be removed in a future release. You can use the 'columns' keyword "
#    (...)
#     543         stacklevel=3,
#     544     )
#     545     kwargs["columns"] = kwargs.pop("include_fields")
# --> 547 return pyogrio.read_dataframe(path_or_bytes, bbox=bbox, **kwargs)

# File ~\miniconda3\envs\geo\Lib\site-packages\pyogrio\geopandas.py:327, in read_dataframe(path_or_buffer, layer, encoding, columns, read_geometry, force_2d, skip_features, max_features, where, bbox, mask, fids, sql, sql_dialect, fid_as_index, use_arrow, on_invalid, arrow_to_pandas_kwargs, **kwargs)
#     324 if geometry is None or not read_geometry:
#     325     return df
# --> 327 geometry = shapely.from_wkb(geometry, on_invalid=on_invalid)
#     329 return gp.GeoDataFrame(df, geometry=geometry, crs=meta["crs"])

# File ~\miniconda3\envs\geo\Lib\site-packages\shapely\io.py:320, in from_wkb(geometry, on_invalid, **kwargs)
#     316 # ensure the input has object dtype, to avoid numpy inferring it as a
#     317 # fixed-length string dtype (which removes trailing null bytes upon access
#     318 # of array elements)
#     319 geometry = np.asarray(geometry, dtype=object)
# --> 320 return lib.from_wkb(geometry, invalid_handler, **kwargs)

# GEOSException: IllegalArgumentException: Points of LinearRing do not form a closed linestring

The parcel layer geometry was repaired in ArcGIS Pro using the Repair Geometry tool with default settings to correct invalid polygon rings and topology errors. The repaired output was saved as an intermediate layer at `data/02_interim/parcels_hi_higp_repaired_32604/tmk_state.shp` and used as input for subsequent geometry validation steps using Shapely.

In [None]:
log("Step 1: Prep parcels (repaired > preprocessed)")

in_path = IN_INTERIM["parcels_repaired"]
out_path = OUT_PREP["parcels_preprocessed"]

parcels_cfg = CONFIG["fields"]["parcels"]
keep_cols = parcels_cfg["keep_cols"] + ["geometry"]
rename_map = parcels_cfg["rename_map"]
tmk_col = parcels_cfg["tmk_col"]
target_dup_tmk = parcels_cfg["target_dup_tmk"]

log(f"Input:  {in_path}")
log(f"Output: {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; loading from interim output.")
    parcels = read_vector(out_path, "parcels_preprocessed")
    _ = check_geometry_quality(parcels, "parcels")

else:
    try:
        parcels = read_vector(in_path, "parcels_repaired")

        parcels = validate_geometry_if_needed(parcels, "parcels")
        parcels = ensure_vector_crs(parcels, TARGET_CRS, "parcels")

        missing_cols = [c for c in keep_cols if c not in parcels.columns]
        if missing_cols:
            raise KeyError(f"Missing required columns in parcels input: {missing_cols}")

        log(f"Subsetting to columns: {keep_cols}")
        parcels = parcels[keep_cols].copy()

        log(f"Renaming columns: {rename_map}")
        parcels = parcels.rename(columns=rename_map)

        parcels = ensure_tmk_string(parcels, tmk_col, "parcels")

        valid_tmk_mask = parcels[tmk_col].apply(is_valid_tmk)
        n_malformed = int((~valid_tmk_mask).sum())
        if n_malformed > 0:
            malformed_values = parcels.loc[~valid_tmk_mask, tmk_col].astype("string").unique().tolist()
            log(f"Filtering {n_malformed:,} parcels with malformed TMKs: {malformed_values}")
            parcels = parcels[valid_tmk_mask].copy()
        else:
            log("TMK QA: all TMKs valid (9 numeric digits)")

        # ------------------------------------------------------------
        # Targeted duplicate attribute consistency check (pre-dissolve)
        # ------------------------------------------------------------
        target_tmk_str = str(target_dup_tmk)
        target_mask = (parcels[tmk_col].astype("string") == target_tmk_str)
        target_n = int(target_mask.sum())

        if target_n > 1:
            dup_parts = parcels.loc[target_mask].copy()
            non_geom_cols = [c for c in dup_parts.columns if c != "geometry"]

            inconsistent = [
                c for c in non_geom_cols
                if dup_parts[c].astype("string").nunique(dropna=False) > 1
            ]

            if inconsistent:
                log(
                    f"WARNING parcels: target TMK {target_tmk_str} has inconsistent "
                    f"attributes across duplicates: {inconsistent}"
                )
        # ------------------------------------------------------------

        # Targeted dissolve as part of the prep step
        parcels = dissolve_duplicate_tmk_polygons(
            parcels=parcels,
            tmk_col=tmk_col,
            target_tmk=target_dup_tmk,
            layer_name="parcels",
        )

        _ = check_geometry_quality(parcels, "parcels")
        log(f"Final columns: {parcels.columns.tolist()}")

        if DRY_RUN:
            log(f"[DRY RUN] Would write: {out_path}")
        else:
            write_vector_gpkg(parcels, out_path, "parcels_preprocessed")

        del valid_tmk_mask, n_malformed

    except Exception as e:
        log(f"ERROR in Step 1: {e}")
        if STOP_ON_ERROR:
            raise

log(f"Step 1 complete: {len(parcels):,} parcels ready for cesspool TMK correction")
display(parcels.head())

In [None]:
# Quick map check: confirm TMK 139019004 is now a single parcel row + visualize window
log("Map check: verify parcel dissolve for TMK 139019004")

base_tmk = "139019004"

# Build small TMK window for spatial context
base_int = int(base_tmk)
tmk_window = [str(base_int + i) for i in range(-2, 3)]

# Slice parcels + cesspools using existing objects
par_near = parcels.loc[
    parcels["tmk_parcels"].astype("string").isin(tmk_window)
].copy()

# cp_near = cesspools.loc[
#     cesspools["tmk_validated"].astype("string").isin(tmk_window)
# ].copy()

# QA: confirm exactly one parcel row for target TMK
n_target = int((par_near["tmk_parcels"] == base_tmk).sum())
log(f"Parcels rows for target TMK in window: {n_target} (expected 1)")

# Parcel map
m = par_near.explore(
    column="tmk_parcels",
    categorical=True,
    legend=True,
    style_kwds={
        "fillOpacity": 0.25,
        "weight": 2,
    },
)

# # Overlay cesspools
# cp_near.explore(
#     m=m,
#     color="blue",
#     marker_kwds={"radius": 5},
#     tooltip=[
#         "tmk_validated",
#         "class_iv",
#         "effluent",
#         "nitrogen",
#     ],
# )

m

### Step 2: Prep rasters

- Workflow: 
    - Load raw raster
    - Mosaic tiles (if needed)
    - Reproject to 32604 (if needed)
    - Export GeoTIFF

#### Step 2a: DEM mosaic

- Inputs: `data/01_inputs/source/dem_hi_pacioos/*.tif`
- Output: `data/01_inputs/prepared/dem_hi_pacioos_mosaic_32604.tif`

In [14]:
log("Step 2a: Mosaic DEM rasters")

in_dir = IN_SOURCE["dem_dir"]
out_path = OUT_PREP["dem_mosaic"]

log(f"Input dir: {in_dir}")
log(f"Output:    {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 2a.")
else:
    try:
        # Check CRS of source rasters
        crs_check = check_raster_crs(in_dir, "dem")
        
        if crs_check["missing"]:
            raise ValueError(f"DEM rasters missing CRS: {crs_check['missing']}. Fix source files first.")
        
        # Check if reprojection needed after mosaic
        source_crs_values = set(crs_check["crs_values"].values())
        needs_reproject = not all("32604" in str(crs) or "UTM zone 4N" in str(crs) for crs in source_crs_values)
        
        if needs_reproject:
            # Mosaic to interim, then reproject to prepared
            mosaic_path = INTERIM / "tempspace/dem_hi_pacioos_mosaic.tif"
            mosaic_rasters(in_dir, mosaic_path, "dem", force=FORCE, dry_run=DRY_RUN)
            ensure_raster_crs(mosaic_path, out_path, 32604, "dem", force=FORCE, dry_run=DRY_RUN)
        else:
            # Mosaic directly to prepared (already in target CRS)
            mosaic_rasters(in_dir, out_path, "dem", force=FORCE, dry_run=DRY_RUN)
        
        log("Step 2a complete: DEM mosaic ready")
        
    except Exception as e:
        log(f"ERROR in Step 2a: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 2a: Mosaic DEM rasters ---

--- Input dir: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\dem_hi_pacioos ---

--- Output:    f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\dem_hi_pacioos_mosaic_32604\dem_hi_pacioos_mosaic_32604.tif ---

--- dem: checking CRS for 8 rasters ---

---   usgs_dem_10m_bigisland.tif: EPSG:4326 ---

---   usgs_dem_10m_kahoolawe.tif: EPSG:4326 ---

---   usgs_dem_10m_kauai.tif: EPSG:4326 ---

---   usgs_dem_10m_lanai.tif: EPSG:4326 ---

---   usgs_dem_10m_maui.tif: EPSG:4326 ---

---   usgs_dem_10m_molokai.tif: EPSG:4326 ---

---   usgs_dem_10m_niihau.tif: EPSG:4326 ---

---   usgs_dem_10m_oahu.tif: EPSG:4326 ---

--- dem: all rasters have valid CRS ---

--- dem: mosaic exists; skipping. ---

--- dem: reprojecting raster -> EPSG:32604 ---

--- Reproject dem completed in 3.95 minutes ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs

#### Step 2b: Water table Mosaic
- Inputs: `data/01_inputs/source/watertable_hi_hcpt/*.tif`
- Output: `data/01_inputs/prepared/watertable_hi_hcpt_mosaic_32604.tif`

In [15]:
log("Step 2b: Mosaic water table rasters")

in_dir = IN_SOURCE["watertable_dir"]
out_path = OUT_PREP["watertable_mosaic"]

log(f"Input dir: {in_dir}")
log(f"Output:    {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 2b.")
else:
    try:
        # Check CRS of source rasters
        crs_check = check_raster_crs(in_dir, "watertable")
        
        if crs_check["missing"]:
            raise ValueError(f"Water table rasters missing CRS: {crs_check['missing']}. Fix source files first.")
        
        # Check if reprojection needed after mosaic
        source_crs_values = set(crs_check["crs_values"].values())
        needs_reproject = not all("32604" in str(crs) or "UTM zone 4N" in str(crs) for crs in source_crs_values)
        
        if needs_reproject:
            # Mosaic to interim, then reproject to prepared
            mosaic_path = INTERIM / "tempspace/watertable_hi_hcpt_mosaic.tif"
            mosaic_rasters(in_dir, mosaic_path, "watertable", force=FORCE, dry_run=DRY_RUN)
            ensure_raster_crs(mosaic_path, out_path, 32604, "watertable", force=FORCE, dry_run=DRY_RUN)
        else:
            # Mosaic directly to prepared (already in target CRS)
            mosaic_rasters(in_dir, out_path, "watertable", force=FORCE, dry_run=DRY_RUN)
        
        log("Step 2b complete: water table mosaic ready")
        
    except Exception as e:
        log(f"ERROR in Step 2b: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 2b: Mosaic water table rasters ---

--- Input dir: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\watertable_hi_hcpt ---

--- Output:    f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\watertable_hi_hcpt_mosaic_32604\watertable_hi_hcpt_mosaic_32604.tif ---

--- watertable: checking CRS for 4 rasters ---

---   BI_wt_prj.tif: EPSG:4326 ---

---   kauai_wt_prj.tif: EPSG:4326 ---

---   maui_wt_prj.tif: EPSG:4326 ---

---   oahu_wt_prj.tif: EPSG:4326 ---

--- watertable: all rasters have valid CRS ---

--- watertable: mosaic exists; skipping. ---

--- watertable: reprojecting raster -> EPSG:32604 ---

--- Reproject watertable completed in 0.13 seconds ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\watertable_hi_hcpt_mosaic_32604\watertable_hi_hcpt_mosaic_32604.tif ---

--- Step 2b complete: water table mosaic ready ---


#### Step 2c: Slopes

- Inputs: `data/01_inputs/source/slope_hi_hcpt/*.tif`
- Output: `data/01_inputs/prepared/.tif`

In [None]:
# Rasters pulled from HCPT are incomplete
# Prob just need to reproject

#### Step 2d: Rainfall

- Input: `data/01_inputs/source/annual_rainfall_hi_hcpt/Rain_inann.tif`
- Output: `data/01_inputs/prepared/annual_rainfall_hi_hcpt_32604.tif`

In [17]:
log("Step 2d: Reproject rainfall raster")

in_path = IN_SOURCE["rainfall"]
out_path = OUT_PREP["rainfall"]

log(f"Input:  {in_path}")
log(f"Output: {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 2d.")
else:
    try:
        ensure_raster_crs(
            in_path=in_path,
            out_path=out_path,
            target_epsg=32604,
            layer_name="rainfall",
            force=FORCE,
            dry_run=DRY_RUN,
        )
        log("Step 2d complete: rainfall raster ready")
        
    except Exception as e:
        log(f"ERROR in Step 2d: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 2d: Reproject rainfall raster ---

--- Input:  f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\annual_rainfall_hi_hcpt\Rain_inann.tif ---

--- Output: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\annual_rainfall_hi_hcpt_32604\annual_rainfall_hi_hcpt_32604.tif ---

--- rainfall: reprojecting raster -> EPSG:32604 ---

--- Reproject rainfall completed in 0.44 seconds ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\annual_rainfall_hi_hcpt_32604\annual_rainfall_hi_hcpt_32604.tif ---

--- Step 2d complete: rainfall raster ready ---


### Step 3: Prep vectors

- Workflow:
    - Load > validate geometries > reproject to EPSG:32604 if needed > geometry QA > export GPKG
- Sub-steps:
    - 3a: Coastline
    - 3b: Special Management Area (SMA)
    - 3c: Streams
    - 3d: Municipal wells
    - 3e: Domestic wells

#### Step 3a: Coastline

- Input: `data/01_inputs/source/coastline_hi_op/coastline.shp`
- Output: `data/01_inputs/prepared/coastline_hi_op_32604.gpkg`

In [18]:
log("Step 3a: Prep coastline vector (source > prepared)")

in_path = IN_SOURCE["coastline"]
out_path = OUT_PREP["coastline"]

log(f"Input:  {in_path}")
log(f"Output: {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 3a.")
else:
    try:
        _ = prep_vector(
            in_path=in_path,
            out_path=out_path,
            layer_name="coastline",
            target_crs=TARGET_CRS,
        )
        log("Step 3a complete: coastline ready")

    except Exception as e:
        log(f"ERROR in Step 3a: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 3a: Prep coastline vector (source > prepared) ---

--- Input:  f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\coastline_hi_op\coastline.shp ---

--- Output: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\coastline_hi_op_32604.gpkg ---

--- Reading coastline: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\coastline_hi_op\coastline.shp ---


  _init_gdal_data()



--- Read coastline completed in 3.48 seconds ---

--- coastline: rows=13 | crs=EPSG:3750 ---

--- coastline: invalid geometries (before): 0 ---

--- coastline: invalid geometries (after):  0 ---

--- coastline: reprojecting from EPSG:3750 to EPSG:32604 ---

--- Reproject coastline completed in 0.10 seconds ---

--- coastline geometry QA: null=0, empty=0, invalid=0 ---

--- Write coastline (GPKG) completed in 0.64 seconds ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\coastline_hi_op_32604.gpkg ---

--- Step 3a complete: coastline ready ---


#### Step 3b: Special Management Area (SMA)

- Input: `data/01_inputs/source/sma_hi_op/sma.shp`
- Output: `data/01_inputs/prepared/sma_hi_op_32604.gpkg`

In [19]:
log("Step 3b: Prep SMA vector (source > prepared)")

in_path = IN_SOURCE["sma"]
out_path = OUT_PREP["sma"]

log(f"Input:  {in_path}")
log(f"Output: {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 3b.")
else:
    try:
        _ = prep_vector(
            in_path=in_path,
            out_path=out_path,
            layer_name="sma",
            target_crs=TARGET_CRS,
        )
        log("Step 3b complete: SMA ready")

    except Exception as e:
        log(f"ERROR in Step 3b: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 3b: Prep SMA vector (source > prepared) ---

--- Input:  f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\sma_hi_op\sma.shp ---

--- Output: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\sma_hi_op_32604.gpkg ---

--- Reading sma: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\sma_hi_op\sma.shp ---

--- Read sma completed in 0.17 seconds ---

--- sma: rows=45 | crs=EPSG:3750 ---

--- sma: invalid geometries (before): 2 ---

--- sma: make_valid completed in 2.00 seconds ---

--- sma: invalid geometries (after):  0 ---

--- sma: reprojecting from EPSG:3750 to EPSG:32604 ---

--- Reproject sma completed in 0.14 seconds ---

--- sma geometry QA: null=0, empty=0, invalid=0 ---

--- Write sma (GPKG) completed in 0.61 seconds ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\sma_hi_op_32604.gpkg ---

--- Step 3b comple

#### Step 3c: Streams

- Inputs: `data/01_inputs/source//*.tif`
- Output: `data/01_inputs/prepared/.tif`

In [20]:
log("Step 3c: Prep streams (source > prepared)")

in_path = IN_SOURCE["streams"]
out_path = OUT_PREP["streams"]

log(f"Input:  {in_path}")
log(f"Output: {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 3c.")
else:
    try:
        _ = prep_vector(in_path, out_path, "streams", TARGET_CRS)
        log("Step 3c complete: streams ready")
    except Exception as e:
        log(f"ERROR in Step 3c: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 3c: Prep streams (source > prepared) ---

--- Input:  f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\streams_hi_hcpt\Streams_prj.shp ---

--- Output: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\streams_hi_hcpt_32604.gpkg ---

--- Reading streams: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\streams_hi_hcpt\Streams_prj.shp ---

--- Read streams completed in 0.41 seconds ---

--- streams: rows=10,785 | crs=EPSG:4326 ---

--- streams: invalid geometries (before): 0 ---

--- streams: invalid geometries (after):  0 ---

--- streams: reprojecting from EPSG:4326 to EPSG:32604 ---

--- Reproject streams completed in 0.19 seconds ---

--- streams geometry QA: null=0, empty=0, invalid=0 ---

--- Write streams (GPKG) completed in 2.10 seconds ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\streams_hi_hcpt_32604.gp

#### Step 3d: Municipal Wells

- Inputs: `data/01_inputs/source//*.tif`
- Output: `data/01_inputs/prepared/.tif`

In [21]:
log("Step 3d: Prep municipal wells (source > prepared)")

in_path = IN_SOURCE["mun_wells"]
out_path = OUT_PREP["mun_wells"]

log(f"Input:  {in_path}")
log(f"Output: {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 3d.")
else:
    try:
        _ = prep_vector(in_path, out_path, "mun_wells", TARGET_CRS)
        log("Step 3d complete: mun_wells ready")
    except Exception as e:
        log(f"ERROR in Step 3d: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 3d: Prep municipal wells (source > prepared) ---

--- Input:  f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\wells_hi_hcpt\CWRM_Wells_MUN_prj.shp ---

--- Output: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\wells_hi_hcpt_mun_32604.gpkg ---

--- Reading mun_wells: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\wells_hi_hcpt\CWRM_Wells_MUN_prj.shp ---

--- Read mun_wells completed in 0.20 seconds ---

--- mun_wells: rows=534 | crs=EPSG:4326 ---

--- mun_wells: invalid geometries (before): 0 ---

--- mun_wells: invalid geometries (after):  0 ---

--- mun_wells: reprojecting from EPSG:4326 to EPSG:32604 ---

--- Reproject mun_wells completed in 0.00 seconds ---

--- mun_wells geometry QA: null=0, empty=0, invalid=0 ---

--- Write mun_wells (GPKG) completed in 0.79 seconds ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_input

#### Step 3e: Domestic Wells

- Inputs: `data/01_inputs/source//*.tif`
- Output: `data/01_inputs/prepared/.tif`

In [22]:
log("Step 3e: Prep domestic wells (source > prepared)")

in_path = IN_SOURCE["dom_wells"]
out_path = OUT_PREP["dom_wells"]

log(f"Input:  {in_path}")
log(f"Output: {out_path}")

if not should_run(out_path, FORCE):
    log("Output exists and FORCE=False; skipping Step 3e.")
else:
    try:
        _ = prep_vector(in_path, out_path, "dom_wells", TARGET_CRS)
        log("Step 3e complete: dom_wells ready")
    except Exception as e:
        log(f"ERROR in Step 3e: {e}")
        if STOP_ON_ERROR:
            raise


--- Step 3e: Prep domestic wells (source > prepared) ---

--- Input:  f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\wells_hi_hcpt\CWRM_Wells_DOM_prj.shp ---

--- Output: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\prepared\wells_hi_hcpt_dom_32604.gpkg ---

--- Reading dom_wells: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs\source\wells_hi_hcpt\CWRM_Wells_DOM_prj.shp ---

--- Read dom_wells completed in 0.19 seconds ---

--- dom_wells: rows=910 | crs=EPSG:4326 ---

--- dom_wells: invalid geometries (before): 0 ---

--- dom_wells: invalid geometries (after):  0 ---

--- dom_wells: reprojecting from EPSG:4326 to EPSG:32604 ---

--- Reproject dom_wells completed in 0.01 seconds ---

--- dom_wells geometry QA: null=0, empty=0, invalid=0 ---

--- Write dom_wells (GPKG) completed in 1.15 seconds ---

--- Wrote: f:\projects\shuler_lab_projects\HiOSDS-TechSuitabilityAnalysis\data\01_inputs

### Step 4: Correct/Align Cesspool TMKs

- Workflow:
    - 4a: Load and prep cesspools (subset, rename, reproject, filter to class IV)
    - 4b: Spatial join cesspools to parcels + deduplication
    - 4c: Process hit cases (match_ok, tmk_corrected, hit_but_tmk_exists_elsewhere)
    - 4d: Process no-hit cases (geometry_corrected, nearest_corrected, review)
    - 4e: Build review table and export
    - 4f: QA/QC review map (if needed)

See Appendix: TMK Correction Decision Workflow at the end of this notebook for the full decision logic.

#### Step 4a: Load and prep cesspools

- Inputs:
    - `data/01_inputs/prepared/parcels_hi_higp_validated_32604.gpkg` (from Step 1)
    - `data/01_inputs/source/cesspools_inventory_hi_hcpt/2025CP_Expld_V2_clean.shp`
- Sub-steps:
    - Load raw cesspools layer
    - Subset to needed columns, rename
    - Reproject to EPSG:32604
    - Validate geometries, remove null/empty
    - Ensure TMK is string type
    - Filter to class IV records only

In [None]:
log("Step 4a: Load and prep cesspools")

in_path_cp = IN_SOURCE["cesspools"]
out_path_corrected = OUT_PREP["cesspools_corrected"]
out_path_review = OUT_REVIEW["cesspools_review"]

# Config
cp_cfg = CONFIG["fields"]["cesspools"]
cp_tmk_col = cp_cfg["tmk_col"]
pr_tmk_col = CONFIG["fields"]["parcels"]["tmk_col"]
thresh_m = CONFIG["thresholds"]["tmk_correction_distance_m"]

log(f"Input cesspools: {in_path_cp}")
log(f"Input parcels:   (from Step 1, {len(parcels):,} rows)")
log(f"Output corrected: {out_path_corrected}")
log(f"Output review:    {out_path_review}")

if not should_run(out_path_corrected, FORCE):
    log("Output exists and FORCE=False; loading from prepared outputs.")
    cesspools_corrected = read_vector(out_path_corrected, "cesspools_corrected")
    
    if out_path_review.exists():
        cesspools_review = read_vector(out_path_review, "cesspools_review")
    else:
        cesspools_review = gpd.GeoDataFrame()
    
    log(f"Loaded: {len(cesspools_corrected):,} corrected, {len(cesspools_review):,} review")
    SKIP_STEP_4 = True
else:
    SKIP_STEP_4 = False
    
    # Load and prep
    cesspools = load_and_prep_cesspools(
        in_path=in_path_cp,
        keep_cols=cp_cfg["keep_cols"],
        rename_map=cp_cfg["rename_map"],
        tmk_col=cp_tmk_col,
        filter_col=cp_cfg["filter_col"],
        filter_value=cp_cfg["filter_value"],
        target_crs=TARGET_CRS,
    )
    
    # Preconditions check
    n_malformed = check_tmk_correction_preconditions(
        cesspools, parcels, cp_tmk_col, pr_tmk_col, TARGET_CRS, thresh_m
    )
    
    # Store original count for validation
    n_original = len(cesspools)
    
    log(f"Step 4a complete: {len(cesspools):,} cesspools ready for TMK correction")

#### Step 4b: Spatial join cesspools to parcels

- Sub-steps:
    - Join cesspools to parcels using spatial intersection
    - Identify hits (landed in parcel) vs no-hits (outside all parcels)
    - Deduplicate boundary cases where one cesspool intersects multiple parcels

In [None]:
log("Step 4b: Spatial join cesspools to parcels")

if SKIP_STEP_4:
    log("Skipping (outputs exist; loaded in global environment)")
else:
    try:
        # Spatial join
        with timed_step("Spatial join"):
            cp_join = gpd.sjoin(
                cesspools.copy(),
                parcels[[pr_tmk_col, "geometry"]].copy(),
                how="left",
                predicate="intersects",
            )
        
        # Add stable row ID for tracking
        cp_join["_cp_rowid"] = range(len(cp_join))
        
        # Summary
        n_joined = len(cp_join)
        n_hits = int(cp_join[pr_tmk_col].notna().sum())
        n_nohits = int(cp_join[pr_tmk_col].isna().sum())
        n_extra = n_joined - n_original
        
        log(f"Original: {n_original:,} | Joined: {n_joined:,}")
        log(f"  Hits: {n_hits:,} | No-hits: {n_nohits:,} | Extra (boundary dupes): {n_extra:,}")
        
        # Deduplicate
        if n_extra > 0:
            log(f"Deduplicating {n_extra:,} boundary duplicates")
            cp_match_counts = cp_join.groupby(cp_join.index).size()
            cp_multi_idx = cp_match_counts[cp_match_counts > 1].index
            cp_join["_multi_match_flag"] = cp_join.index.isin(cp_multi_idx)
            cp_join = cp_join.loc[~cp_join.index.duplicated(keep="first")].copy()
            log(f"  Cesspools matching multiple parcels: {len(cp_multi_idx):,}")
        else:
            cp_join["_multi_match_flag"] = False
        
        # Validate row count
        if len(cp_join) != n_original:
            raise ValueError(f"Row count mismatch after dedup: {len(cp_join):,} vs {n_original:,}")
        
        log(f"Step 4b complete: {len(cp_join):,} rows after dedup")
        
    except Exception as e:
        log(f"ERROR in Step 4b: {e}")
        if STOP_ON_ERROR:
            raise

#### Step 4c: Process hit cases

- Sub-steps:
    - Compute match flags (is_hit, tmk_match, tmk_cps_in_parcels, tmk_cps_malformed)
    - Assign status for hit cases:
        - `match_ok`: TMKs match, no action needed
        - `tmk_corrected`: TMK mismatch, corrected to parcel TMK (string change only)
        - `hit_but_tmk_exists_elsewhere`: Ambiguous case, flagged for review
    - Assign `tmk_malformed` status for invalid TMKs

In [None]:
log("Step 4c: Process hit cases")

if SKIP_STEP_4:
    log("Skipping (outputs exist; loaded in global environment)")
else:
    try:
        # Compute flags
        log("Computing match flags")
        parcels_tmk_set = set(parcels[pr_tmk_col].dropna().unique())
        
        cp_join["is_hit"] = cp_join[pr_tmk_col].notna()
        cp_join["tmk_match"] = cp_join["is_hit"] & (cp_join[cp_tmk_col] == cp_join[pr_tmk_col])
        cp_join["tmk_cps_in_parcels"] = cp_join[cp_tmk_col].isin(parcels_tmk_set)
        cp_join["tmk_cps_malformed"] = ~cp_join[cp_tmk_col].apply(is_valid_tmk)
        
        # Initialize output columns
        cp_join["tmk_validated"] = cp_join[cp_tmk_col]
        cp_join["tmk_status"] = None
        
        # Malformed TMKs (regardless of hit/no-hit)
        cp_join.loc[cp_join["tmk_cps_malformed"], "tmk_status"] = "tmk_malformed"
        
        # Hit + TMK match = match_ok
        match_ok_mask = cp_join["is_hit"] & cp_join["tmk_match"] & ~cp_join["tmk_cps_malformed"]
        cp_join.loc[match_ok_mask, "tmk_status"] = "match_ok"
        
        # Hit + mismatch cases
        hit_mismatch_mask = cp_join["is_hit"] & ~cp_join["tmk_match"] & ~cp_join["tmk_cps_malformed"]
        
        # Mismatch where tmk_cps exists elsewhere = ambiguous
        hit_mismatch_exists = hit_mismatch_mask & cp_join["tmk_cps_in_parcels"]
        cp_join.loc[hit_mismatch_exists, "tmk_status"] = "hit_but_tmk_exists_elsewhere"
        
        # Mismatch where tmk_cps doesn't exist = correct to parcel TMK
        hit_mismatch_not_exists = hit_mismatch_mask & ~cp_join["tmk_cps_in_parcels"]
        cp_join.loc[hit_mismatch_not_exists, "tmk_status"] = "tmk_corrected"
        cp_join.loc[hit_mismatch_not_exists, "tmk_validated"] = cp_join.loc[hit_mismatch_not_exists, pr_tmk_col]
        
        # Summary
        n_malformed = int((cp_join["tmk_status"] == "tmk_malformed").sum())
        n_match_ok = int((cp_join["tmk_status"] == "match_ok").sum())
        n_tmk_corrected = int((cp_join["tmk_status"] == "tmk_corrected").sum())
        n_hit_ambiguous = int((cp_join["tmk_status"] == "hit_but_tmk_exists_elsewhere").sum())
        
        log(f"  tmk_malformed: {n_malformed:,}")
        log(f"  match_ok: {n_match_ok:,}")
        log(f"  tmk_corrected: {n_tmk_corrected:,}")
        log(f"  hit_but_tmk_exists_elsewhere: {n_hit_ambiguous:,}")
        
        log("Step 4c complete")
        
    except Exception as e:
        log(f"ERROR in Step 4c: {e}")
        if STOP_ON_ERROR:
            raise

#### Step 4d: Process no-hit cases

- Sub-steps:
    - No-hit where `tmk_cps` exists in parcels > `geometry_corrected` (move point to matching parcel)
    - No-hit where `tmk_cps` doesn't exist > find nearest parcel:
        - Within threshold > `nearest_corrected` (correct TMK and geometry)
        - Beyond threshold > `review_nohit_beyond_threshold` (flagged for review)

In [None]:
log("Step 4d: Process no-hit cases")

if SKIP_STEP_4:
    log("Skipping (outputs exist; loaded in global environment)")
else:
    try:
        nohit_mask = ~cp_join["is_hit"] & ~cp_join["tmk_cps_malformed"]
        n_nohits_valid = int(nohit_mask.sum())
        log(f"No-hit cases (excluding malformed): {n_nohits_valid:,}")
        
        # Initialize for later use
        nohit_review = gpd.GeoDataFrame()
        
        if n_nohits_valid > 0:
            # -----------------------------------------------------------------
            # No-hit where tmk_cps exists in parcels = geometry_corrected
            # -----------------------------------------------------------------
            nohit_tmk_exists = nohit_mask & cp_join["tmk_cps_in_parcels"]
            n_geom_correct = int(nohit_tmk_exists.sum())
            
            if n_geom_correct > 0:
                cp_join.loc[nohit_tmk_exists, "tmk_status"] = "geometry_corrected"
                
                # Move points to matching parcel
                for idx in cp_join.loc[nohit_tmk_exists].index:
                    tmk_val = cp_join.loc[idx, cp_tmk_col]
                    matching_parcel = parcels.loc[parcels[pr_tmk_col] == tmk_val]
                    if len(matching_parcel) > 0:
                        new_point = matching_parcel.geometry.iloc[0].representative_point()
                        cp_join.loc[idx, "geometry"] = new_point
                
                log(f"  geometry_corrected: {n_geom_correct:,}")
            
            # -----------------------------------------------------------------
            # No-hit where tmk_cps doesn't exist = nearest parcel lookup
            # -----------------------------------------------------------------
            nohit_tmk_not_exists = nohit_mask & ~cp_join["tmk_cps_in_parcels"]
            n_needs_nearest = int(nohit_tmk_not_exists.sum())
            
            if n_needs_nearest > 0:
                log(f"  Finding nearest parcel for {n_needs_nearest:,} records...")
                
                nohit_nearest_input = cp_join.loc[nohit_tmk_not_exists].copy()
                drop_cols = [c for c in ["index_right", pr_tmk_col] if c in nohit_nearest_input.columns]
                if drop_cols:
                    nohit_nearest_input = nohit_nearest_input.drop(columns=drop_cols)
                
                with timed_step("Nearest parcel lookup"):
                    nohit_nearest = gpd.sjoin_nearest(
                        nohit_nearest_input,
                        parcels[[pr_tmk_col, "geometry"]].copy(),
                        how="left",
                        distance_col="dist_to_nearest_m"
                    )
                
                # Deduplicate ties
                nohit_nearest = (
                    nohit_nearest
                    .sort_values(["dist_to_nearest_m", "index_right"])
                    .drop_duplicates(subset=["_cp_rowid"], keep="first")
                    .copy()
                )
                
                log(f"  Distance range: {nohit_nearest['dist_to_nearest_m'].min():.1f}m - {nohit_nearest['dist_to_nearest_m'].max():.1f}m")
                
                # Within threshold = nearest_corrected
                within_thresh = nohit_nearest["dist_to_nearest_m"] <= thresh_m
                n_within = int(within_thresh.sum())
                n_beyond = int((~within_thresh).sum())
                
                if n_within > 0:
                    for _, row in nohit_nearest.loc[within_thresh].iterrows():
                        cp_idx = cp_join.loc[cp_join["_cp_rowid"] == row["_cp_rowid"]].index[0]
                        cp_join.loc[cp_idx, "tmk_validated"] = row[pr_tmk_col]
                        cp_join.loc[cp_idx, "tmk_status"] = "nearest_corrected"
                        
                        matching_parcel = parcels.loc[parcels[pr_tmk_col] == row[pr_tmk_col]]
                        if len(matching_parcel) > 0:
                            new_point = matching_parcel.geometry.iloc[0].representative_point()
                            cp_join.loc[cp_idx, "geometry"] = new_point
                    
                    log(f"  nearest_corrected: {n_within:,} (within {thresh_m}m)")
                
                # Beyond threshold = review
                nohit_review = nohit_nearest.loc[~within_thresh].copy()
                
                if n_beyond > 0:
                    for rowid in nohit_review["_cp_rowid"].values:
                        cp_idx = cp_join.loc[cp_join["_cp_rowid"] == rowid].index[0]
                        cp_join.loc[cp_idx, "tmk_status"] = "review_nohit_beyond_threshold"
                    
                    # Add last_digit_off_by_1 flag
                    nohit_review["last_digit_off_by_1"] = (
                        nohit_review[cp_tmk_col].str[-1].astype(int) -
                        nohit_review[pr_tmk_col].str[-1].astype(int)
                    ).abs() == 1
                    
                    n_off_by_1 = int(nohit_review["last_digit_off_by_1"].sum())
                    log(f"  review_nohit_beyond_threshold: {n_beyond:,} (last digit off by 1: {n_off_by_1:,})")
        
        log("Step 4d complete")
        
    except Exception as e:
        log(f"ERROR in Step 4d: {e}")
        if STOP_ON_ERROR:
            raise

#### Step 4e: Build review table and export

- Outputs:
    - `data/01_inputs/prepared/cesspools_inventory_hi_hcpt_corrected_32604.gpkg`
    - `data/01_inputs/prepared/review_queue/cesspools_inventory_hi_hcpt_review_queue_32604.gpkg`
- Sub-steps:
    - Combine flagged records into review table
    - Add diagnostic columns (distance to claimed parcel, nearest parcel TMK)
    - Export corrected cesspools and review table as GPKG
    - Export review table as CSV for easier viewing

In [None]:
log("Step 4e: Build review table and export")

if SKIP_STEP_4:
    log("Skipping (outputs exist; loaded in global environment)")
else:
    try:
        # Build review table
        log("Building review table")
        review_statuses = ["tmk_malformed", "hit_but_tmk_exists_elsewhere", "review_nohit_beyond_threshold"]
        review_main = cp_join.loc[cp_join["tmk_status"].isin(review_statuses)].copy()
        
        # Add distance to claimed parcel for ambiguous hit cases
        if "hit_but_tmk_exists_elsewhere" in review_main["tmk_status"].values:
            review_main["dist_to_tmk_cps_parcel_m"] = None
            hit_ambig_mask = review_main["tmk_status"] == "hit_but_tmk_exists_elsewhere"
            for idx in review_main.loc[hit_ambig_mask].index:
                tmk_val = review_main.loc[idx, cp_tmk_col]
                point_geom = review_main.loc[idx, "geometry"]
                matching_parcel = parcels.loc[parcels[pr_tmk_col] == tmk_val]
                if len(matching_parcel) > 0:
                    dist = point_geom.distance(matching_parcel.geometry.iloc[0])
                    review_main.loc[idx, "dist_to_tmk_cps_parcel_m"] = round(dist, 1)
        
        # Merge nohit_review columns
        if len(nohit_review) > 0:
            nohit_review_cols = nohit_review[["_cp_rowid", pr_tmk_col, "dist_to_nearest_m", "last_digit_off_by_1"]].copy()
            nohit_review_cols = nohit_review_cols.rename(columns={pr_tmk_col: "nearest_tmk_parcels"})
            review_main = review_main.merge(nohit_review_cols, on="_cp_rowid", how="left")
        
        log(f"Review table: {len(review_main):,} records")
        
        # ---------------------------------------------------------------------
        # PREPARE FINAL OUTPUTS
        # ---------------------------------------------------------------------
        
        # Corrected layer: keep original columns + tmk_validated + tmk_status
        # Apply rename_map to get post-rename column names
        export_cols = [cp_cfg["rename_map"].get(c, c) for c in cp_cfg["keep_cols"]]
        export_cols += ["tmk_validated", "tmk_status", "geometry"]
        
        cesspools_corrected = cp_join[export_cols].copy()
        cesspools_review = review_main.copy()
        
        log(f"Export columns: {[c for c in export_cols if c != 'geometry']}")
        
        # Validate row count
        if len(cesspools_corrected) != n_original:
            raise ValueError(f"Row count mismatch: {len(cesspools_corrected):,} vs {n_original:,}")
        
        # Status summary
        log("=" * 60)
        log("TMK CORRECTION SUMMARY")
        status_counts = cesspools_corrected["tmk_status"].value_counts()
        for status, count in status_counts.items():
            pct = count / len(cesspools_corrected) * 100
            log(f"  {status}: {count:,} ({pct:.1f}%)")
        log("=" * 60)
        
        # Export
        if DRY_RUN:
            log(f"[DRY RUN] Would write: {out_path_corrected}")
            log(f"[DRY RUN] Would write: {out_path_review}")
        else:
            write_vector_gpkg(cesspools_corrected, out_path_corrected, "cesspools_corrected")
            write_vector_gpkg(cesspools_review, out_path_review, "cesspools_review")
            
            # CSV for easier viewing
            review_csv_path = REVIEW / "cesspools_review.csv"
            cesspools_review.drop(columns=["geometry"]).to_csv(review_csv_path, index=False)
            log(f"Wrote review CSV: {review_csv_path}")
        
        log("Step 4e complete")
        
    except Exception as e:
        log(f"ERROR in Step 4e: {e}")
        if STOP_ON_ERROR:
            raise

# Final summary (runs for both branches)
log(f"Cesspools corrected: {len(cesspools_corrected):,} rows")
log(f"Cesspools for review: {len(cesspools_review):,} rows")

#### Step 4f: QA/QC review map (if needed)

Interactive map for reviewing flagged cases:
- Purple/Blue: Hit ambiguous cases (landed in parcel A, TMK claims parcel B)
- Orange: No-hit cases where last digit is off by 1 (likely typo)
- Red: No-hit cases needing manual review
- Yellow: Malformed TMKs

In [None]:
log("Step 4f: QA/QC review map (optional)")

if len(cesspools_review) > 0:
    try:
        review_map = build_review_qaqc_map(
            review_main=cesspools_review,
            parcels=parcels,
            web_crs=WEB_CRS,
            cp_tmk_col=cp_tmk_col,
            pr_tmk_col=pr_tmk_col,
            nearest_pr_tmk_col="nearest_tmk_parcels",
        )
        if review_map is not None:
            display(review_map)
    except Exception as e:
        log(f"QA/QC map failed (non-blocking): {e}")
else:
    log("No review cases; skipping QA/QC map.")

In [None]:
log("QA/QC map inputs: status counts")
display(cesspools_review["tmk_status"].value_counts(dropna=False))

log("QA/QC map inputs: last_digit_off_by_1 counts (review_nohit only)")
mask_nohit = cesspools_review["tmk_status"] == "review_nohit_beyond_threshold"
if mask_nohit.any() and "last_digit_off_by_1" in cesspools_review.columns:
    display(cesspools_review.loc[mask_nohit, "last_digit_off_by_1"].value_counts(dropna=False))
else:
    log("No review_nohit_beyond_threshold rows (or last_digit_off_by_1 missing).")

#### Review questions for PI/team

*Data quality:*

1. How should we handle malformed TMKs (NULL, wrong length, non-numeric)? Options: exclude, attempt correction, or flag for manual lookup?

*Ambiguous hit cases:*

2. If a cesspool lands in parcel A but `tmk_cps` matches parcel B elsewhere, which is truth: spatial location or attributed TMK? 

*No-hit cases beyond threshold:*

3. If a cesspool exceeds the distance threshold but the nearest TMK differs by only 1 digit in the last position, should we correct? Sequential TMKs typically indicate adjacent parcels.
4. Is there a hard maximum distance beyond which we never correct? Current auto-correction threshold: `thresh_m = 25`.

*General:*

5. For unresolvable review cases: exclude from MPAT, include with flag, or hold for manual correction?

## Prep Summary
Table: dataset name, path, CRS, row count / raster metadata

In [None]:
log("Prep Summary")

# Vector summaries
vector_summary = []
for name, path in {**OUT_PREP, **OUT_REVIEW}.items():
    if path.suffix in [".gpkg", ".shp"] and path.exists():
        vector_summary.append(summarize_vector(path, name))

# Raster summaries
raster_summary = []
for name, path in OUT_PREP.items():
    if path.suffix == ".tif" and path.exists():
        raster_summary.append(summarize_raster(path, name))

log("Vector layers:")
df_vec = pd.DataFrame(vector_summary)
display(df_vec)

log("Raster layers:")
df_ras = pd.DataFrame(raster_summary)
# Format size: show GB if > 1000 MB
df_ras["size"] = df_ras["size_mb"].apply(
    lambda x: f"{x/1024:.2f} GB" if x > 1000 else f"{x:.1f} MB"
)
display(df_ras.drop(columns=["size_mb"]))

# Cesspool TMK correction status
log("Cesspool TMK correction status:")
status_summary = (
    cesspools_corrected["tmk_status"]
    .value_counts(dropna=False)
    .reset_index()
    .rename(columns={"index": "status", "tmk_status": "count"})
)
status_summary.columns = ["status", "count"]
status_summary["percent"] = (status_summary["count"] / len(cesspools_corrected) * 100).round(1)
display(status_summary)

#### Cesspool TMK Correction Status Definitions

| Status | Step | Correction Applied | Description |
|--------|------|-------------------|-------------|
| `match_ok` | 4c | None | Cesspool landed in parcel and TMKs match. No action needed. |
| `tmk_corrected` | 4c | String only | Cesspool landed in parcel but TMKs didn't match; `tmk_cps` didn't exist elsewhere in parcels. Corrected `tmk_validated` to parcel TMK. |
| `geometry_corrected` | 4d | Geometry only | Cesspool landed outside all parcels, but `tmk_cps` exists in parcels. Moved point to matching parcel. |
| `nearest_corrected` | 4d | String + geometry | Cesspool landed outside all parcels, `tmk_cps` doesn't exist, but nearest parcel within 25m. Corrected both TMK and geometry to nearest parcel. |
| `hit_but_tmk_exists_elsewhere` | 4c | None (review) | Cesspool landed in parcel A, but `tmk_cps` matches parcel B elsewhere. Ambiguous. Flagged for manual review. |
| `review_nohit_beyond_threshold` | 4d | None (review) | Cesspool landed outside all parcels, `tmk_cps` doesn't exist, and nearest parcel > 25m away. Flagged for manual review. |
| `tmk_malformed` | 4c | None (review) | Invalid TMK format (null, wrong length, or non-numeric). Flagged for manual review. |

*Notes*:

- Review queue: Statuses ending in "review" or flagged are exported to `data/01_inputs/review_queue/` for PI/team manual review. 
- See step 4f for interactive QA/QC review map
- See Appendix: TMK Correction Decision Workflow for full logic.

## Appendix

### TMK Correction Decision Workflow

#### Goal
Ensure each cesspool has a validated TMK matching the parcel layer.

#### Key assumptions
- Parcel TMK values (`tmk_parcels`) are authoritative
- Output: `cesspools_validated` GeoDataFrame with:
  - `tmk_validated` column
  - Corrected geometries where needed

#### Definitions
- Hit: cesspool point lands within a parcel polygon
- No-hit: cesspool point falls outside all parcel polygons
- TMK match: `tmk_cps` string equals `tmk_parcels` string

#### Decision workflow

Step 0: TMK data quality check
- Parcels (authoritative layer):
  - Malformed TMKs filtered upstream in `process_parcels`
  - If any remain, raise error
- Cesspools (layer to be corrected):
  - Is `tmk_cps` valid? (not null, 9 digits, numeric only)
    - Valid: continue to Step 1
    - Invalid: flag = `tmk_malformed`

Step 1: Spatial join
- Does cesspool intersect a parcel polygon?
  - Hit: go to Step 2a
  - No-hit: go to Step 2b

Step 2a: Hit cases (geometry is fine)
- Does `tmk_cps` == `tmk_parcels`?
  - Yes: status = `match_ok`, no action needed
  - No: does `tmk_cps` exist elsewhere in parcels layer?
    - Yes: flag = `hit_but_tmk_exists_elsewhere`. Ambiguous case where point landed in parcel A, but `tmk_cps` matches parcel B.
    - No: status = `tmk_corrected`. Set `tmk_validated` = `tmk_parcels`. String change only.

Step 2b: No-hit cases (geometry needs correction)
- Does `tmk_cps` exist anywhere in parcels layer?
  - Yes: status = `geometry_corrected`. Move point to parcel matching `tmk_cps`. No string change.
  - No: find nearest parcel, check distance
    - Distance <= threshold: status = `nearest_corrected`. Correct both string and geometry to nearest parcel.
    - Distance > threshold: status = `review_nohit_beyond_threshold`. Add `last_digit_off_by_1` flag column. Output to review table.

#### Status categories

| Status | Correction type | Description |
|--------|-----------------|-------------|
| `match_ok` | None | TMKs match, no correction needed |
| `tmk_corrected` | String only | Hit, corrected `tmk_cps` to `tmk_parcels` |
| `geometry_corrected` | Geometry only | No-hit, `tmk_cps` exists in parcels, moved point |
| `nearest_corrected` | String + geometry | No-hit, corrected to nearest parcel within threshold |
| `tmk_malformed` | None (flagged) | Invalid TMK format, needs manual review |
| `hit_but_tmk_exists_elsewhere` | None (flagged) | Hit parcel A, but `tmk_cps` matches parcel B |
| `review_nohit_beyond_threshold` | None (flagged) | No-hit, beyond threshold, needs manual review |

#### Review subset flags

| Flag/Status | Condition |
|-------------|-----------|
| `tmk_malformed` | NULL, wrong length (not 9), or non-numeric |
| `hit_but_tmk_exists_elsewhere` | Hit parcel A, but `tmk_cps` matches parcel B |
| `review_nohit_beyond_threshold` | No-hit, beyond threshold |
| `last_digit_off_by_1` | Boolean column: nearest TMK off by 1 in last digit |