In [None]:
import json
# Neighborhood agglomeration notebook (script-style)
# Implements the algorithm from spec.md, including memoized neighbor graph and R^2 edge scoring.

import warnings
from dataclasses import dataclass
from typing import Dict, Set, Tuple, Optional, List

import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.ops import unary_union
from sklearn.neighbors import NearestNeighbors
import os
import multiprocessing as mp

from utilities import ensure_feet_crs, compute_market_value_proxy, ols_r2


In [None]:
# Parameters (edit to your environment)
PARCELS_PATH = "parcels.parquet"
TILES_PATH = "neighborhood_tiles.parquet"
OUTPUT_PREFIX = "intermediate_tiles_"  # parquet written as f"{OUTPUT_PREFIX}{iter_num}.parquet"
TARGET_TILE_COUNT: Optional[int] = None  # desired final number of tiles
TARGET_FIT_PCT: Optional[float] = .69
CRS_EPSG_FEET: Optional[int] = 2236 # SW Florida
#CRS_EPSG_FEET: Optional[int] = 2264  #guilford
BUFFER_FEET = 30.0  # adjacency buffer per spec
K_NEIGHBORS = 3  # spatial lag k
RANDOM_STATE = 42  # for any deterministic ordering if needed


In [None]:
os.makedirs("intermediates", exist_ok=True)

In [None]:
# Build initial tile-parcel membership using spatial join (parcels within tiles)

def assign_parcels_to_tiles(
        parcels: gpd.GeoDataFrame,
        tiles: gpd.GeoDataFrame,
        parcel_key: str = "parcel_key",
        tile_key: str = "tile_key"
) -> Dict[str, Set[int]]:
    joined = gpd.sjoin(parcels[[parcel_key, "geometry"]].reset_index().rename(columns={"index": "pidx"}),
                       tiles[[tile_key, "geometry"]], how="inner", predicate="within")
    mapping: Dict[str, Set[int]] = {}
    for tid, grp in joined.groupby(tile_key):
        mapping[str(tid)] = set(grp["pidx"].tolist())
    return mapping


In [None]:
@dataclass
class Tile:
    key: str
    geometry: object
    r_squared: Optional[float] = np.nan  # R^2 from the merge that created this tile


from mp_helpers import _init_pool_buffered, _pair_overlaps_area, _init_pool_edges, _score_edge_pair_worker
from cache_io import make_cache_key, load_cache, save_cache, cache_valid, df_to_adjacency, df_to_edge_scores


def build_adjacency(tiles_gdf: gpd.GeoDataFrame, buffer_feet: float, n_jobs: Optional[int] = None) -> Dict[str, Set[str]]:
    # Defensive: ensure projected CRS in feet or convert buffer to data units
    crs = tiles_gdf.crs
    if crs is not None:
        unit = None
        try:
            unit = crs.axis_info[0].unit_name.lower()
        except Exception:
            pass
        # Approximate conversion if CRS uses meters
        if unit and unit.startswith("met"):
            buf = buffer_feet * 0.3048
        elif unit and ("foot" in unit):
            buf = buffer_feet
        else:
            raise Exception(f"Unsupported CRS unit: {unit}")
            # Unknown or degrees – safer to treat as meters-like fallback
            # Consider reprojecting before calling this, or pass CRS_EPSG_FEET
            buf = buffer_feet * 0.3048
    else:
        # No CRS – assume feet distance intent but we can’t be sure. Treat as feet.
        buf = buffer_feet

    keys = tiles_gdf["tile_key"].astype(str).tolist()
    geoms = tiles_gdf.geometry
    buffered = geoms.buffer(buf)
    buffered_list: List[object] = list(buffered.values)

    adjacency: Dict[str, Set[str]] = {k: set() for k in keys}

    # Spatial index and candidate generation (with fallback for older GeoPandas/rtree backends)
    buffered_gs = gpd.GeoSeries(buffered_list)
    sindex = buffered_gs.sindex
    cand_pairs: List[Tuple[int, int]] = []
    print("Generating candidate pairs")

    # Determine jobs early so the fallback can use threads
    if n_jobs is None:
        # Use all but one core by default, minimum 1
        try:
            cpu = os.cpu_count() or 1
        except Exception:
            cpu = 1
        n_jobs = max(1, cpu - 1)
    n_jobs = max(1, int(n_jobs))

    try:
        # Preferred fast path
        if hasattr(sindex, "query_bulk"):
            left_idx, right_idx = sindex.query_bulk(buffered_gs, predicate="intersects")
            cand_pairs = [(int(i), int(j)) for i, j in zip(left_idx, right_idx) if int(j) > int(i)]
        else:
            raise AttributeError("query_bulk not available")
    except Exception:
        # Fallback path: per-geometry queries using predicate when available, else bbox intersection
        def _cand_for_i(i: int) -> List[Tuple[int, int]]:
            geom = buffered_list[i]
            try:
                cand_idx = list(sindex.query(geom, predicate="intersects"))
            except Exception:
                try:
                    cand_idx = list(sindex.intersection(getattr(geom, "bounds", ())))
                except Exception:
                    cand_idx = []
            out: List[Tuple[int, int]] = []
            for j in cand_idx:
                j = int(j)
                if j > i:
                    out.append((i, j))
            return out

        if n_jobs > 1:
            print(f"Candidate generation (fallback): threading across {n_jobs} workers...")
            try:
                import concurrent.futures as _fut
                with _fut.ThreadPoolExecutor(max_workers=n_jobs) as ex:
                    for res in ex.map(_cand_for_i, range(len(buffered_list))):
                        if res:
                            cand_pairs.extend(res)
            except Exception:
                # If threading fails for any reason, use single-threaded loop
                for i in range(len(buffered_list)):
                    cand_pairs.extend(_cand_for_i(i))
        else:
            for i in range(len(buffered_list)):
                cand_pairs.extend(_cand_for_i(i))

    print(f"{len(cand_pairs)} candidate pairs")

    # Determine jobs
    if n_jobs is None:
        # Use all but one core by default, minimum 1
        try:
            cpu = os.cpu_count() or 1
        except Exception:
            cpu = 1
        n_jobs = max(1, cpu - 1)
    n_jobs = max(1, int(n_jobs))

    if len(cand_pairs) == 0:
        return adjacency


    # Single-process evaluation
    for i, j in cand_pairs:
        ki = keys[i]; kj = keys[j]
        adjacency[ki].add(kj)
        adjacency[kj].add(ki)
    return adjacency



In [None]:
# Edge scoring with memoization

@dataclass
class EdgeScore:
    r2: float
    n_obs: int
    n_sales: int


def score_edge(tile_a: str, tile_b: str,
               parcel_idx_by_tile: Dict[str, Set[int]],
               parcels: gpd.GeoDataFrame) -> EdgeScore:
    idxs = list(parcel_idx_by_tile.get(tile_a, set()) | parcel_idx_by_tile.get(tile_b, set()))
    if len(idxs) == 0:
        return EdgeScore(0.0, 0, 0)
    sub = parcels.loc[list(idxs)]

    # Sales count threshold (using adj_sale_price presence)
    n_sales = sub["adj_sale_price"].notna().sum() if "adj_sale_price" in sub.columns else 0
    if n_sales < 3:
        return EdgeScore(0.0, len(sub), int(n_sales))

    # Predict market_value_proxy using built and land area
    cols = ["market_value_proxy", "built_area_sqft", "land_area_sqft"]
    if not all(c in sub.columns for c in cols):
        return EdgeScore(0.0, len(sub), int(n_sales))
    df = sub[cols].astype(float).dropna()
    if len(df) < 3:
        return EdgeScore(0.0, len(sub), int(n_sales))

    y = df["market_value_proxy"].values
    X = df[["built_area_sqft", "land_area_sqft"]].values
    r2 = ols_r2(y, X)
    return EdgeScore(r2=float(r2), n_obs=int(len(df)), n_sales=int(n_sales))


In [None]:
# Spatial lag: 3-NN inverse-distance weighted imputation for a numeric column.

def spatial_lag_impute(points_gdf: gpd.GeoDataFrame, value_col: str, k: int = 3) -> pd.Series:
    """
    Impute missing values in value_col using k-NN inverse-distance weighted average of neighbors.
    Uses centroids for distance computations. Returns a Series of the same length with imputed values.
    """
    s = points_gdf[value_col].astype(float).copy()
    missing_mask = s.isna()
    if not missing_mask.any():
        return s

    # Build coordinate array
    centroids = points_gdf.geometry.centroid
    X = np.vstack([centroids.x.values, centroids.y.values]).T

    # Fit kNN on all points
    n_neighbors = min(k, len(points_gdf))
    nbrs = NearestNeighbors(n_neighbors=n_neighbors, algorithm="auto").fit(X)
    distances, indices = nbrs.kneighbors(X)

    # For each missing point, compute weighted avg from neighbors that have values.
    for i in np.where(missing_mask.values)[0]:
        idxs = indices[i]
        dists = distances[i]
        # Avoid zero distance
        dists = np.where(dists == 0, 1e-6, dists)
        vals = s.iloc[idxs].values
        # If some neighbors also missing, restrict to ones with values
        mask_has = ~np.isnan(vals)
        if not mask_has.any():
            continue  # nothing to impute with; leave NaN
        vals = vals[mask_has]
        d = dists[mask_has]
        w = 1.0 / d
        w = w / w.sum()
        s.iloc[i] = np.dot(w, vals)
    return s


In [None]:
# Merge mechanics and iteration

def dissolve_geometries(tiles_gdf: gpd.GeoDataFrame, members: List[str], tile_index_field="tile_key") -> object:
    geoms = tiles_gdf.set_index(tile_index_field).loc[members, "geometry"].values.tolist()
    return unary_union(geoms)


def run_agglomeration(parcels_path: str = PARCELS_PATH,
                      tiles_path: str = TILES_PATH,
                      target_tile_count: Optional[int] = TARGET_TILE_COUNT,
                      target_fit_pct: Optional[float] = TARGET_FIT_PCT,
                      buffer_feet: float = BUFFER_FEET,
                      k_neighbors: int = K_NEIGHBORS,
                      crs_epsg_feet: Optional[int] = CRS_EPSG_FEET,
                      output_prefix: str = OUTPUT_PREFIX,
                      n_jobs: Optional[int] = None,
                      cache_dir: str = ".cache",
                      cache_prefix: str = "init",
                      use_cache: bool = True,
                      force_rebuild: bool = False):
    # Load data
    parcels = gpd.read_parquet(parcels_path)
    tiles = gpd.read_parquet(tiles_path)

    # Reproject if desired
    parcels = ensure_feet_crs(parcels, crs_epsg_feet)
    tiles = ensure_feet_crs(tiles, crs_epsg_feet)

    print(list(parcels.columns))

    parcels["built_area_sqft"] = parcels["bldg_area_finished_sqft"] #TODO
    parcels["assessed_value"] = parcels["assr_market_value"] #TODO
    parcels["adj_sale_price"] = parcels["sale_price_time_adj"] #TODO
    parcels["parcel_key"] = parcels["key"] #TODO
    # Compute/impute built_area_sqft
    if "built_area_sqft" not in parcels.columns:
        print(list(parcels.columns))
        raise ValueError("parcels missing built_area_sqft column")
    parcels["built_area_sqft"] = parcels["built_area_sqft"].astype(float)
    parcels["built_area_sqft"] =  parcels["built_area_sqft"].fillna(0.0)
    parcels["built_area_sqft"] = spatial_lag_impute(parcels, "built_area_sqft", k=k_neighbors)

    # Market value proxy, then spatial lag
    parcels["market_value_proxy"] = compute_market_value_proxy(parcels)
    parcels["market_value_proxy"] = spatial_lag_impute(parcels, "market_value_proxy", k=k_neighbors)

    # Ensure land_area_sqft numeric
    if "land_area_sqft" in parcels.columns:
        parcels["land_area_sqft"] = parcels["land_area_sqft"].astype(float)

    # Initialize tiles structure
    tiles = tiles.copy()
    tiles["tile_key"] = tiles["tile_key"].astype(str)
    if "r_squared" not in tiles.columns:
        tiles["r_squared"] = np.nan

    # Prefilter tiles: drop any tile that does not intersect at least one parcel (speeds adjacency)
    try:
        j = gpd.sjoin(tiles[["tile_key", "geometry"]], parcels[["geometry"]], how="inner", predicate="intersects")
        keep_keys = set(j["tile_key"].astype(str).unique().tolist())
        before_n = len(tiles)
        tiles = tiles[tiles["tile_key"].isin(keep_keys)].reset_index(drop=True)
        after_n = len(tiles)
        print(f"Prefiltered tiles by parcel intersection: {before_n} -> {after_n}")
    except Exception as e:
        warnings.warn(f"Prefilter step failed; proceeding without drop. Error: {e}")

    # Initial parcel membership (on filtered tiles)
    parcel_idx_by_tile = assign_parcels_to_tiles(parcels, tiles)

    # Cache key after prefilter
    meta_now = make_cache_key(parcels_path, tiles_path, buffer_feet, crs_epsg_feet, k_neighbors, tiles)

    adjacency: Optional[Dict[str, Set[str]]] = None
    edge_scores: Dict[frozenset, EdgeScore] = {}

    if use_cache and not force_rebuild:
        cached_meta, df_adj, df_scores = load_cache(cache_dir=cache_dir, prefix=cache_prefix)
        if cached_meta is not None and df_adj is not None and cache_valid(cached_meta, meta_now):
            print("Loading adjacency/edge scores from cache...")
            adjacency = df_to_adjacency(df_adj)
            # Edge scores may be empty or missing; it's okay
            raw_scores = df_to_edge_scores(df_scores)
            for k, d in raw_scores.items():
                edge_scores[k] = EdgeScore(r2=float(d.get("r2", 0.0)), n_obs=int(d.get("n_obs", 0)), n_sales=int(d.get("n_sales", 0)))
        else:
            if cached_meta is not None:
                print("Cache present but invalid/stale; rebuilding...")

    if adjacency is None:
        # Build initial adjacency
        adjacency = build_adjacency(tiles[["tile_key", "geometry"]], buffer_feet, n_jobs=n_jobs)

        # Build unique edge list from adjacency
        _edge_pairs: List[Tuple[str, str]] = []
        for a, nbs in adjacency.items():
            for b in nbs:
                if a < b:
                    _edge_pairs.append((a, b))

        print(f"Scoring {len(_edge_pairs)} initial edges...")

        # Determine n_jobs if not provided
        _jobs = n_jobs
        if _jobs is None:
            try:
                _cpu = os.cpu_count() or 1
            except Exception:
                _cpu = 1
            _jobs = max(1, _cpu - 1)
        _jobs = max(1, int(_jobs))

        if len(_edge_pairs) == 0:
            pass
        elif _jobs == 1:
            # Single-thread scoring
            for a, b in _edge_pairs:
                edge_scores[frozenset([a, b])] = score_edge(a, b, parcel_idx_by_tile, parcels)
        else:
            # Parallel scoring via multiprocessing (spawn)
            try:
                cols_needed = [c for c in ["market_value_proxy", "built_area_sqft", "land_area_sqft", "adj_sale_price"] if c in parcels.columns]
                parcels_min_df = parcels[cols_needed].copy()
                chunksize = max(1, len(_edge_pairs) // (_jobs * 16))
                print(f"Initial edge scoring: {len(_edge_pairs):,} pairs across {_jobs} processes...")
                with mp.get_context("spawn").Pool(processes=_jobs, initializer=_init_pool_edges, initargs=(parcels_min_df, parcel_idx_by_tile)) as pool:
                    for a, b, r2, n_obs, n_sales in pool.imap_unordered(_score_edge_pair_worker, _edge_pairs, chunksize=chunksize):
                        edge_scores[frozenset([a, b])] = EdgeScore(r2=float(r2), n_obs=int(n_obs), n_sales=int(n_sales))
            except Exception as e:
                warnings.warn(f"Parallel edge scoring failed ({e}); falling back to single-thread.")
                for a, b in _edge_pairs:
                    edge_scores[frozenset([a, b])] = score_edge(a, b, parcel_idx_by_tile, parcels)
        print("Initial edge scoring complete")

        # Save cache
        try:
            save_cache(adjacency, edge_scores, meta_now, cache_dir=cache_dir, prefix=cache_prefix)
            print("Saved adjacency and initial edge scores cache.")
        except Exception as e:
            warnings.warn(f"Failed to save cache: {e}")

    # Define recompute helper after adjacency/edge_scores available
    def recompute_edges_for(tile_id: str):
        for nb in adjacency.get(tile_id, set()):  # type: ignore[union-attr]
            key = frozenset([tile_id, nb])
            edge_scores[key] = score_edge(tile_id, nb, parcel_idx_by_tile, parcels)

    # Helper to select best edge with tie-breakers
    def pick_best_edge() -> Optional[Tuple[str, str, EdgeScore]]:
        if not edge_scores:
            return None
        # Convert to list with union parcel count for tie-break
        candidates = []
        for k, es in edge_scores.items():
            if len(k) != 2:
                continue
            a, b = tuple(k)
            union_count = len(parcel_idx_by_tile.get(a, set()) | parcel_idx_by_tile.get(b, set()))
            candidates.append((a, b, es, union_count))
        if not candidates:
            return None
        # First by r2 desc, then union parcel count desc, then deterministic by sorted keys
        candidates.sort(key=lambda t: (t[2].r2, t[3], tuple(sorted([t[0], t[1]]))), reverse=True)
        a, b, es, _ = candidates[0]
        return a, b, es

    # Iterative merge
    iteration = 0
    most_recent_fit_quality = 1.0
    target_tile_count = 1 if target_tile_count is None else target_tile_count
    target_fit_pct = 0.0 if target_fit_pct is None else target_fit_pct
    fit_quality: list[float] = []
    while len(tiles) > target_tile_count and most_recent_fit_quality > target_fit_pct:
        pick = pick_best_edge()
        if pick is None:
            print("No prospective joins remain. Stopping.")
            break
        a, b, es = pick
        if es.r2 is None or not np.isfinite(es.r2):
            es = EdgeScore(0.0, es.n_obs, es.n_sales)

        most_recent_fit_quality = es.r2
        fit_quality.append(most_recent_fit_quality)
        # Merge a and b into new tile id
        new_key = f"{a}+{b}"
        if iteration % 10 == 0:
            # print(f"Iteration {iteration}: merging {a} + {b} with R^2={es.r2:.4f} (n={es.n_obs}, sales={es.n_sales}) -> {new_key}")
            print(f"Iteration {iteration}: merging regions with R^2={es.r2:.4f} (n={es.n_obs}, sales={es.n_sales})")

        # Geometry union
        new_geom = dissolve_geometries(tiles, [a, b])

        # Update tiles GeoDataFrame
        new_row = pd.DataFrame({"tile_key": [new_key], "r_squared": [es.r2]})
        new_gdf = gpd.GeoDataFrame(new_row, geometry=[new_geom], crs=tiles.crs)

        # Remove a, b; add new
        tiles = tiles[~tiles["tile_key"].isin([a, b])]
        tiles = pd.concat([tiles, new_gdf], ignore_index=True)

        # Update parcel memberships
        parcel_idx_by_tile[new_key] = parcel_idx_by_tile.get(a, set()) | parcel_idx_by_tile.get(b, set())
        parcel_idx_by_tile.pop(a, None)
        parcel_idx_by_tile.pop(b, None)

        # Update adjacency
        neighbors = (adjacency.get(a, set()) | adjacency.get(b, set())) - {a, b}
        # Remove references to a and b
        for nb in neighbors:
            adjacency[nb].discard(a)
            adjacency[nb].discard(b)
        adjacency.pop(a, None)
        adjacency.pop(b, None)
        # Set new adjacency entry
        adjacency[new_key] = set()
        for nb in neighbors:
            if nb == new_key:
                continue
            # Rook check using shared boundary on current geometries
            # We'll conservatively assume neighbors remain neighbors; validate via geometry if needed
            adjacency[new_key].add(nb)
            adjacency[nb].add(new_key)
        # Recompute edges involving new_key; remove old edges involving a or b
        keys_to_remove = [k for k in list(edge_scores.keys()) if a in k or b in k]
        for k in keys_to_remove:
            edge_scores.pop(k, None)
        recompute_edges_for(new_key)

        # Write intermediate parquet snapshot
        if iteration % 200 == 0:
            out_path = f"intermediates/{output_prefix}{iteration}.parquet"
            tiles[["tile_key", "r_squared", "geometry"]].to_parquet(out_path)
            print(f"Wrote {out_path} with {len(tiles)} tiles")

        iteration += 1
        if len(adjacency.get(new_key, set())) == 0 and len(tiles) > target_tile_count:
            # If the new tile has no neighbors, check if any edges remain; otherwise stop
            if not edge_scores:
                print("No edges remain after merge; stopping.")
                break

    print(f"Finished with {len(tiles)} tiles with a minimum fit score of {most_recent_fit_quality}.")
    out_path = f"output_tiles_final.parquet"
    tiles[["tile_key", "r_squared", "geometry"]].to_parquet(out_path)
    print(f"Wrote {out_path} with {len(tiles)} tiles")
    print("Saving fit quality to fit_quality.js")
    with open("fit_quality.js", "w") as f:
        json.dump(fit_quality, f)
    return tiles


In [None]:
# Example run (will execute when run as a script or in this notebook)
if __name__ == "__main__":
    try:
        run_agglomeration()
    except FileNotFoundError as e:
        print(f"Skipping run: {e}. Ensure input parquet files exist.")
