# Thurston County Broadband — Coverage Overview & Terrain-Aware Cell Decay Analysis

This notebook builds a reproducible workflow to:

1. Produce a county-level overview of fixed broadband availability and measured performance.
2. Estimate cellular fallback coverage from OSM-scraped towers using a distance-decay + terrain-aware approach.
3. Attach results to parcels/buildings and compute proximity to WSDOT dig-once corridors and road-based extension costs.

The notebook contains runnable code *skeletons* and detailed pseudocode explanations for each block so you can adapt it to your local data paths and parameter choices.

Save this notebook and run cells sequentially. Replace placeholder paths with your actual files in the `data/` folder.

## 1) Setup — imports, constants, and default parameters

This cell imports the Python libraries commonly used in the analysis and defines default parameters (frequency, TX power, CRS, etc.).

Pseudocode explanation:

- Import geopandas, numpy, rasterio, osmnx, shapely, networkx, scipy, and matplotlib.
- Set `TARGET_CRS` to your county projected CRS (e.g., `EPSG:2285` for WA State Plane South).
- Define radio parameters (frequency in MHz, tx power in dBm, antenna heights, receiver threshold).
- Define paths for your `data/` and `output/` directories.

Run this cell first to confirm your environment has the required libraries.

In [None]:

# Standard imports
import os
from pathlib import Path
import geopandas as gpd
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import rasterio
from rasterio.transform import from_origin
from shapely.geometry import Point, LineString
import osmnx as ox
import networkx as nx
import nbformat
import json

# Parameters - change these to match your environment and preferences
TARGET_CRS = "EPSG:2285"  # WA State Plane South (example)
DATA_DIR = Path("data")
OUTPUT_DIR = Path("output")
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Radio parameters (tweakable)
F_MHZ = 700.0         # center frequency in MHz for rural cellular band (example)
TX_DBM = 46.0         # typical macro site transmit power (dBm)
TX_ANT_GAIN_DB = 15.0 # transmitter antenna gain (dBi)
RX_ANT_GAIN_DB = 0.0  # receiver antenna gain (dBi)
ANTENNA_HEIGHT_M = 25.0  # typical tower antenna height (m)
RECV_HEIGHT_M = 1.5     # assumed receiver height on building (m)
OTHER_LOSSES_DB = 10.0  # miscellaneous losses (dB)
RECV_THRESHOLD_DBM = -100.0  # usable threshold
GRID_RES_M = 100  # coarse grid raster resolution for initial propagation (meters)


## 2) Data inputs — expected files and brief descriptions

Place or point this notebook to the following files under `data/`:

- `parcels_thurston.geojson` — parcel polygons (county cadastral)
- `thurston_blocks.geojson` — census block geometries (or block groups)
- `block_speed_summary.geojson` — your aggregated speed-test results by block
- `fcc_bdc_fixed.gpkg` — fixed broadband availability polygons (FCC / WABO)
- `wsdot_dig_once.geojson` — WSDOT dig-once corridors or road segments
- `dem_thurston.tif` — DEM covering the county (for LOS / slope)

If you don't have these exact files, adapt paths to your available layers. The code below attempts to read them and will raise errors that point to missing files.

In [None]:

# Load vector layers (replace filenames if different)
parcels_fp = DATA_DIR / "parcels_thurston.geojson"
blocks_fp = DATA_DIR / "thurston_blocks.geojson"
speed_blocks_fp = DATA_DIR / "block_speed_summary.geojson"
fcc_fp = DATA_DIR / "fcc_bdc_fixed.gpkg"
wsdot_fp = DATA_DIR / "wsdot_dig_once.geojson"
dem_fp = DATA_DIR / "dem_thurston.tif"

# Quick loader with checks
def load_gpkg(fp):
    if fp.exists():
        return gpd.read_file(fp).to_crs(TARGET_CRS)
    else:
        raise FileNotFoundError(f"Missing file: {fp}")

# Attempt to load (comment out or adapt if you don't have files yet)
loaded = {}
for name, fp in [("parcels", parcels_fp), ("blocks", blocks_fp),
                 ("speed_blocks", speed_blocks_fp), ("fcc", fcc_fp),
                 ("wsdot", wsdot_fp)]:
    try:
        loaded[name] = gpd.read_file(fp).to_crs(TARGET_CRS)
        print(f"Loaded {name} -> {fp.name}, count: {len(loaded[name])}")
    except Exception as e:
        print(f"Could not load {name}: {e}")

# DEM check
if dem_fp.exists():
    print(f"DEM found: {dem_fp} (size: {dem_fp.stat().st_size} bytes)")
else:
    print(f"DEM not found at {dem_fp}. You can use SRTM or WA state DEM and save it to this path.")


## 3) Build a coverage overview & categories

Objective: create a county block-level or parcel-level `coverage_status` that combines availability (FCC/WABO) and measured performance (median speed).

Pseudocode explanation:

1. Ensure `blocks` and `fcc` are in the same CRS.
2. Create boolean `fixed_available` by testing intersection/contains with the FCC polygons.
3. Merge `speed_blocks` (median download) into `blocks` by GEOID.
4. Create `coverage_status` categories: `served_fixed_good`, `served_fixed_poor`, `no_fixed`.

This block outputs a GeoDataFrame called `blocks_cov` that you can map immediately.

In [None]:

# Example coverage overview; the `loaded` dict from previous cell must contain blocks/fcc/speed_blocks
blocks = loaded.get("blocks")
fcc = loaded.get("fcc")
speed_blocks = loaded.get("speed_blocks")

if blocks is None:
    print("blocks layer not loaded. Please provide a blocks GeoJSON and re-run.")
else:
    # 1) Fixed availability flag
    if fcc is not None:
        # Use unary_union for faster geometry tests if polygons are many
        fcc_union = fcc.unary_union
        blocks["fixed_available"] = blocks.geometry.apply(lambda g: g.intersects(fcc_union))
    else:
        blocks["fixed_available"] = False
        print("FCC layer missing; all fixed_available set to False (adjust manually if needed).")
    
    # 2) Merge speed summary if present
    if speed_blocks is not None and "GEOID" in speed_blocks.columns:
        blocks = blocks.merge(speed_blocks[["GEOID","median_down","num_tests"]], on="GEOID", how="left")
    else:
        blocks["median_down"] = np.nan
        blocks["num_tests"] = 0
        print("Speed summary layer missing or missing GEOID column.")
    
    # 3) Define coverage_status
    def coverage_category(row):
        md = row.get("median_down", 0) if pd.notnull(row.get("median_down", None)) else 0
        if row.get("fixed_available", False) and md >= 25:
            return "served_fixed_good"
        if row.get("fixed_available", False) and md < 25:
            return "served_fixed_poor"
        return "no_fixed"
    
    blocks["coverage_status"] = blocks.apply(coverage_category, axis=1)
    blocks_cov = blocks.copy()
    print("blocks_cov created. Value counts:")
    print(blocks_cov["coverage_status"].value_counts())


## 4) Scrape cell towers from OpenStreetMap (OSM) via OSMnx

Goal: fetch OSM features likely to represent towers/antennas. OSM tags commonly used:

- `man_made = mast|tower`
- `communication = tower` or `tower:type` or `antenna`

Pseudocode:
1. Derive bounding box from county `blocks_cov` extent.
2. Use `ox.geometries_from_bbox` with a tag filter.
3. Convert polygons to centroids, clean duplicates, and reproject to TARGET_CRS.

Note: OSM completeness varies; validate results and optionally supplement with FCC/ASR.

In [None]:

# Scrape towers in bbox using osmnx. This can be slow; restrict bbox carefully.
if 'blocks_cov' not in globals():
    print("Run the blocks coverage cell to generate blocks_cov before scraping towers.")
else:
    bounds = blocks_cov.total_bounds  # minx, miny, maxx, maxy (in TARGET_CRS)
    minx, miny, maxx, maxy = bounds[0], bounds[1], bounds[2], bounds[3]
    # osmnx expects lat/lon bbox by default; convert bbox to latlon if needed
    # We'll convert centroid bounds to latlon via geopandas
    bbox_poly = gpd.GeoSeries([
        Point(minx, miny), Point(maxx, maxy)
    ], crs=TARGET_CRS).unary_union.envelope
    bbox_latlon = gpd.GeoSeries([bbox_poly]).set_crs(TARGET_CRS).to_crs("EPSG:4326").total_bounds
    lat_min, lon_min, lat_max, lon_max = bbox_latlon[1], bbox_latlon[0], bbox_latlon[3], bbox_latlon[2]
    
    # OSM filter tags: try multiple keys to capture towers/masts/antenna
    tags = {"man_made": ["mast", "tower"], "tower:type": True, "communication": True, "antenna": True}
    try:
        gdf_osm = ox.geometries_from_bbox(lat_max, lat_min, lon_max, lon_min, tags)
        # Keep point geometries; convert polygons to centroids
        gdf_osm = gdf_osm.explode(index_parts=False).copy()
        gdf_osm["geometry"] = gdf_osm.geometry.centroid
        towers = gdf_osm[["geometry"]].dropna().reset_index(drop=True).set_crs("EPSG:4326").to_crs(TARGET_CRS)
        # Deduplicate by snapping within 50m
        towers["x"] = towers.geometry.x.round(1)
        towers["y"] = towers.geometry.y.round(1)
        towers = towers.drop_duplicates(subset=["x","y"]).drop(columns=["x","y"])
        towers.to_file(OUTPUT_DIR / "osm_towers_thurston.geojson", driver="GeoJSON")
        print(f"Scraped {len(towers)} candidate towers and saved to output/osm_towers_thurston.geojson")
    except Exception as e:
        print('OSM scraping failed:', e)
        towers = gpd.GeoDataFrame(columns=["geometry"], crs=TARGET_CRS)


## 5) Quick propagation raster (FSPL-based) — coarse grid

This block computes a coarse raster of the **maximum** received power (dBm) from the set of towers using Free-Space Path Loss (FSPL).

Pseudocode:
1. Build an evenly spaced grid (resolution GRID_RES_M) covering county extent.
2. For each grid cell, compute distances to all towers and FSPL -> received power Pr = Tx + gains - FSPL - losses.
3. Store the maximum Pr (best-serving tower) for each cell and save as a GeoTIFF.

Notes: This is fast but ignores terrain and diffraction, so it's used as an initial estimate.

In [None]:

# FSPL quick raster generation
from math import log10
if 'towers' not in globals() or len(towers)==0:
    print("No towers detected in workspace. Run the OSM scrape cell or place a towers layer at data/osm_towers_thurston.geojson.")
else:
    # bounding box
    minx, miny, maxx, maxy = blocks_cov.total_bounds
    buffer = 5000  # meters extra padding around bbox
    minx -= buffer; miny -= buffer; maxx += buffer; maxy += buffer
    res = GRID_RES_M
    nx = int(np.ceil((maxx - minx) / res))
    ny = int(np.ceil((maxy - miny) / res))
    xs = np.linspace(minx + res/2, maxx - res/2, nx)
    ys = np.linspace(miny + res/2, maxy - res/2, ny)
    
    tower_coords = np.array([[pt.x, pt.y] for pt in towers.geometry])
    def fspl_db(dist_m, f_mhz):
        # avoid zero distances
        d_km = np.maximum(dist_m / 1000.0, 0.001)
        return 20.0 * np.log10(d_km) + 20.0 * np.log10(f_mhz) + 32.44
    
    pr_grid = np.full((ny, nx), -9999.0, dtype=float)
    # iterate rows (careful with large grids)
    for i, y in enumerate(ys[::-1]):
        row_vals = np.empty(nx, dtype=float)
        for j, x in enumerate(xs):
            dists = np.sqrt((tower_coords[:,0] - x)**2 + (tower_coords[:,1] - y)**2)
            pl = fspl_db(dists, F_MHZ)
            pr_vals = TX_DBM + TX_ANT_GAIN_DB + RX_ANT_GAIN_DB - pl - OTHER_LOSSES_DB
            row_vals[j] = pr_vals.max() if len(pr_vals)>0 else -9999.0
        pr_grid[i,:] = row_vals
    
    # create raster
    transform = from_origin(minx, maxy, res, res)
    out_meta = {
        'driver': 'GTiff',
        'height': pr_grid.shape[0],
        'width': pr_grid.shape[1],
        'count': 1,
        'dtype': 'float32',
        'crs': TARGET_CRS,
        'transform': transform
    }
    out_raster = OUTPUT_DIR / "fspl_pr_coarse.tif"
    with rasterio.open(out_raster, 'w', **out_meta) as dst:
        dst.write(pr_grid.astype('float32'), 1)
    print("Saved FSPL coarse PR raster to:", out_raster)


## 6) Terrain-aware LOS sampling (pseudocode + helper function)

Goal: for a tower → receiver pair, sample the DEM along the straight line and determine if any terrain blocks the line-of-sight.

Pseudocode explanation:
- For a pair of coordinates, generate N equally spaced points along the straight line.
- Sample DEM elevations at those points.
- Interpolate the height of the straight line from tower antenna height to receiver height.
- If DEM height + vegetation/building assumptions exceed the straight line at any sample point, mark NLOS and return a blocking metric.

We'll provide a helper function `profile_los` that returns (los_boolean, max_obstruction_m).

In [None]:

# DEM LOS sampling helper function
import math
from rasterio import Affine
from rasterio.enums import Resampling

def profile_los(dem_path, tower_xy, target_xy, tower_elev_m=None, rx_elev_m=None, n_samples=200):
    """Sample DEM along the line from tower_xy to target_xy.
    dem_path: path to DEM in same CRS as coordinates (or transform coordinates accordingly).
    tower_xy, target_xy: tuples (x,y) in DEM CRS.
    tower_elev_m: additional antenna height above ground for tower (m).
    rx_elev_m: receiver height above ground (m).
    
    Returns:
      dict with keys: 'blocked' (bool), 'max_obstruction_m' (float), 'profile' (array), 'dist_m' (float)
    """
    if not Path(dem_path).exists():
        raise FileNotFoundError(f"DEM file not found: {dem_path}")
    with rasterio.open(dem_path) as src:
        # sample points along linear interpolation
        line = LineString([Point(tower_xy), Point(target_xy)])
        length = line.length
        xs = np.linspace(tower_xy[0], target_xy[0], n_samples)
        ys = np.linspace(tower_xy[1], target_xy[1], n_samples)
        coords = [(x, y) for x, y in zip(xs, ys)]
        samples = np.array([v[0] for v in src.sample(coords)])
        # determine heights
        tower_base_elev = samples[0]
        rx_base_elev = samples[-1]
        tower_h = tower_base_elev + (ANTENNA_HEIGHT_M if tower_elev_m is None else tower_elev_m)
        rx_h = rx_base_elev + (RECV_HEIGHT_M if rx_elev_m is None else rx_elev_m)
        distances = np.linspace(0, length, n_samples)
        los_line = tower_h + (rx_h - tower_h) * (distances / distances[-1])
        obstruction = samples - los_line
        max_ob = obstruction.max()
        blocked = max_ob > 0
        return {
            "blocked": bool(blocked),
            "max_obstruction_m": float(max_ob),
            "profile_elev": samples.tolist(),
            "dist_m": float(length)
        }


## 7) Attach cellular estimates to parcels/buildings

Tasks:
- Sample the coarse FSPL PR raster at parcel centroids to assign a `cell_rssi_dbm`.
- Optionally, for candidate parcels (no fixed service or PR near threshold), run `profile_los` against nearest towers to refine NLOS vs LOS.

Pseudocode:
- Compute parcel centroids.
- Sample `fspl_pr_coarse.tif` at centroid points using rasterio.sample.
- For parcels with PR within 10 dB of threshold, sample a few tower profiles with the DEM to check obstruction and add diffraction penalty if blocked.


In [None]:

# Sample coarse PR raster at parcel centroids
from rasterio import open as rio_open

pr_raster_path = OUTPUT_DIR / "fspl_pr_coarse.tif"
if not pr_raster_path.exists():
    print("PR raster missing; run the FSPL raster generation cell first.")
else:
    parcels = loaded.get("parcels")
    if parcels is None:
        print("Parcels not loaded; ensure parcels_thurston.geojson exists in data/")
    else:
        parcels = parcels.to_crs(TARGET_CRS)
        parcels["centroid"] = parcels.geometry.centroid
        cent_coords = [(pt.x, pt.y) for pt in parcels["centroid"]]
        with rio_open(pr_raster_path) as src:
            samples = list(src.sample(cent_coords))
            parcels["cell_rssi_dbm"] = [float(s[0]) if s is not None else np.nan for s in samples]
        # Simple class
        parcels["cell_class"] = pd.cut(parcels["cell_rssi_dbm"].fillna(-9999),
                                       bins=[-9999, -110, -95, -60, 999],
                                       labels=["none","poor","marginal","good"])
        parcels.to_file(OUTPUT_DIR / "parcels_with_cell_rssi.geojson", driver="GeoJSON")
        print("Saved parcels_with_cell_rssi.geojson with cell_rssi_dbm and cell_class")


## 8) Proximity to WSDOT dig-once corridors (Euclidean and network distance)

Pseudocode:
- Compute centroid-to-dig-once Euclidean distance using `unary_union` on the dig-once geometry.
- Build an OSMnx road graph for the county and compute network shortest path distance from parcel centroid to nearest dig-once node.
- Save both metrics to the parcel table for prioritization.

Notes: Building a full county road graph can be resource-intensive; you may restrict to a buffer around candidate parcels.

In [None]:

# Proximity to dig-once (euclidean + network)
wsdot = loaded.get("wsdot")
parcels = loaded.get("parcels")
if wsdot is None or parcels is None:
    print("wsdot or parcels not loaded; please provide wsodt_dig_once.geojson and parcels.")
else:
    # Euclidean distance to nearest dig-once geometry
    wsdot_union = wsdot.unary_union
    parcels["dist_to_digonce_m"] = parcels.geometry.centroid.apply(lambda p: p.distance(wsdot_union))
    print("Computed Euclidean distance to dig-once for parcels.")
    
    # Network distance: build an OSMnx graph for the bbox (latlon)
    # Convert bbox to latlon
    bbox = parcels.total_bounds  # minx, miny, maxx, maxy in TARGET_CRS
    bbox_gs = gpd.GeoSeries([
        Point(bbox[0], bbox[1]), Point(bbox[2], bbox[3])
    ], crs=TARGET_CRS)
    bbox_latlon = bbox_gs.to_crs("EPSG:4326").unary_union.envelope.bounds  # minx,miny,maxx,maxy in lon/lat
    minlon, minlat, maxlon, maxlat = bbox_latlon[0], bbox_latlon[1], bbox_latlon[2], bbox_latlon[3]
    
    print("Building OSMnx road graph (this may take some time)...")
    try:
        G = ox.graph_from_bbox(maxlat, minlat, maxlon, minlon, network_type='drive')
        # Convert dig-once centroids to nearest nodes
        dig_nodes = [ox.distance.nearest_nodes(G, X=pt.x, Y=pt.y) for pt in wsdot.to_crs('EPSG:4326').geometry.centroid]
        # For each parcel centroid, compute nearest graph node and shortest distance to any dig node
        # This is potentially slow; you can vectorize or sample candidate parcels only
        parcel_nodes = []
        for centroid in parcels.geometry.centroid.to_crs('EPSG:4326'):
            node = ox.distance.nearest_nodes(G, X=centroid.x, Y=centroid.y)
            parcel_nodes.append(node)
        parcels["osmnx_node"] = parcel_nodes
        # compute shortest path length (meters) to nearest dig node for each parcel
        def nearest_network_dist(u):
            try:
                dists = [nx.shortest_path_length(G, u, v, weight='length') for v in dig_nodes]
                return min(dists) if len(dists)>0 else np.nan
            except Exception as e:
                return np.nan
        parcels["dist_to_digonce_net_m"] = parcels["osmnx_node"].apply(nearest_network_dist)
        parcels.to_file(OUTPUT_DIR / "parcels_with_digonce_dist.geojson", driver="GeoJSON")
        print("Saved parcels_with_digonce_dist.geojson")
    except Exception as e:
        print("OSMnx graph-building or network distances failed:", e)


## 9) Road extension cost surface and least-cost path (pseudocode)

Goal: create a raster cost surface where cost increases with slope and is lower along roads. Then compute least-cost routes from dig-once corridors to candidate parcels.

Pseudocode:
- Derive slope raster from DEM (degrees or percent).
- Compute base cost per cell = distance * (1 + alpha * normalized_slope).
- Rasterize roads (from OSMnx or local roads) and reduce cost on road cells.
- Use `skimage.graph.route_through_array` or Dijkstra on raster to compute least-cost path and total cost.

This cell provides a code skeleton; adapting depends on the exact DEM resolution and available road data.

In [None]:

# Road extension cost surface (skeleton)
try:
    import skimage.graph as skgraph
    import rasterio.features as rfeatures
except Exception as e:
    print("skimage or rasterio.features import issue (install skimage).")

dem_path = dem_fp
roads_gdf = None
try:
    # use OSMnx to get roads if no local road layer provided
    bbox = parcels.total_bounds
    bbox_gs = gpd.GeoSeries([Point(bbox[0], bbox[1]), Point(bbox[2], bbox[3])], crs=TARGET_CRS)
    bbox_latlon = bbox_gs.to_crs("EPSG:4326").unary_union.envelope.bounds
    minlon, minlat, maxlon, maxlat = bbox_latlon[0], bbox_latlon[1], bbox_latlon[2], bbox_latlon[3]
    roads_graph = ox.graph_from_bbox(maxlat, minlat, maxlon, minlon, network_type='drive')
    roads = ox.graph_to_gdfs(roads_graph, nodes=False, edges=True).to_crs(TARGET_CRS)
    roads_gdf = roads
    print(f"Loaded {len(roads_gdf)} road segments from OSM for cost surface.")
except Exception as e:
    print("OSMnx roads load failed or skipped:", e)

if dem_path.exists() and roads_gdf is not None:
    # 1) compute slope using rasterio (very basic approach)
    with rasterio.open(dem_path) as src:
        dem = src.read(1).astype('float32')
        transform = src.transform
        # compute slope via finite differences (simple, not hydrology-optimized)
        x, y = np.gradient(dem, src.res[0], src.res[1])
        slope = np.sqrt(x*x + y*y)
        # normalize slope for cost factor
        slope_norm = slope / (np.nanpercentile(slope, 98) + 1e-6)
        alpha = 3.0  # slope penalty multiplier (tune)
        base_cost = 1.0
        cost_raster = base_cost * (1 + alpha * slope_norm)
        # rasterize roads to same grid and reduce cost where roads exist
        road_shapes = ((geom, 1) for geom in roads_gdf.geometry)
        roads_raster = rfeatures.rasterize(road_shapes, out_shape=dem.shape, transform=transform, fill=0, dtype='uint8')
        cost_raster[roads_raster==1] *= 0.2  # road cells are cheaper (20% of base)
        # Save cost raster
        cost_meta = src.meta.copy()
        cost_meta.update({'dtype': 'float32', 'count': 1})
        with rasterio.open(OUTPUT_DIR / "road_extension_cost.tif", 'w', **cost_meta) as dst:
            dst.write(cost_raster.astype('float32'), 1)
    print("Saved cost raster to output/road_extension_cost.tif")
else:
    print("DEM or roads missing; cost surface step skipped.")    


## 10) Prioritization scoring & outputs

Create a combined score for parcels that indicates priority for fiber/duct extension. Example criteria and weights:

- `unserved` or `cellular_only`: high base priority
- `dist_to_digonce_net_m`: smaller distance is better
- `road_extension_cost` or least-cost path cost: smaller cost is better
- population/households (from ACS): more households increases priority

Pseudocode:
- Normalize each metric to 0-1 range with appropriate direction.
- Weighted sum to produce `priority_score`.
- Export top N parcels as GeoJSON/CSV.


In [None]:

# Prioritization skeleton
parcels = parcels if 'parcels' in globals() else loaded.get('parcels')
if parcels is None:
    print("Parcels not available. Run earlier cells to load and compute parcel attributes.")
else:
    # Example flagging: rely on previous cell outputs
    if "cell_rssi_dbm" not in parcels.columns:
        parcels["cell_rssi_dbm"] = np.nan
    if "dist_to_digonce_m" not in parcels.columns:
        parcels["dist_to_digonce_m"] = np.nan
    # create base priority: unserved or cellular_only
    parcels["fixed_available"] = parcels.get("fixed_available", False)
    parcels["reliance"] = "fixed"
    parcels.loc[(~parcels["fixed_available"]) & (parcels["cell_rssi_dbm"] >= -110), "reliance"] = "cellular_only"
    parcels.loc[(~parcels["fixed_available"]) & (parcels["cell_rssi_dbm"] < -110), "reliance"] = "unserved"
    parcels["base_score"] = parcels["reliance"].map({"unserved": 1.0, "cellular_only": 0.7, "fixed": 0.0})
    # normalize distance (smaller is better)
    parcels["dist_norm"] = parcels["dist_to_digonce_m"].fillna(1e6)
    parcels["dist_norm"] = 1 - np.clip((parcels["dist_norm"] / parcels["dist_norm"].max()), 0, 1)
    # placeholder for cost_norm (lower cost = higher priority)
    parcels["cost_norm"] = 1.0  # replace with actual cost normalization
    # combine weights
    parcels["priority_score"] = 0.5*parcels["base_score"] + 0.3*parcels["dist_norm"] + 0.2*parcels["cost_norm"]
    # export top parcels
    top = parcels.sort_values("priority_score", ascending=False).head(500)
    top.to_file(OUTPUT_DIR / "priority_parcels_top500.geojson", driver="GeoJSON")
    print("Saved top 500 parcels to output/priority_parcels_top500.geojson")    


## 11) Visualization examples

Simple maps using geopandas and matplotlib:

- PR raster as base (imshow)
- Parcels colored by `reliance` or `priority_score`
- Dig-once corridors and top priority parcels overlayed


In [None]:

# Simple visualization examples (static)
import matplotlib.pyplot as plt
from rasterio.plot import show

# show PR raster and parcel overlay
pr_path = OUTPUT_DIR / "fspl_pr_coarse.tif"
parcels_file = OUTPUT_DIR / "parcels_with_cell_rssi.geojson"
if pr_path.exists() and Path(parcels_file).exists():
    with rasterio.open(pr_path) as src:
        fig, ax = plt.subplots(figsize=(10,10))
        show(src, ax=ax)
        parcels = gpd.read_file(parcels_file).to_crs(src.crs)
        parcels_sample = parcels.sample(min(2000, len(parcels)))  # sample for plotting speed
        parcels_sample.plot(ax=ax, column="cell_class", markersize=2, alpha=0.6, legend=True)
        plt.title("Coarse PR raster with sample parcels overlay (cell_class)")
        plt.show()
else:
    print("PR raster or parcels_with_cell_rssi.geojson missing; run earlier cells.")


## 12) Next steps, validation, and sensitivity tests

Recommendations:
- Validate OSM tower locations against FCC ASR or operator records.
- Compare FSPL estimates against real speedtest measurements where available.
- Add vegetation/building clutter models or use a propagation tool (SPLAT!, ITM/Longley-Rice) for higher accuracy.
- Run sensitivity analysis on frequency/antenna height/other_losses.
- Document results and store parameter settings for reproducibility.

This notebook is designed to be a practical starting point — adapt layers, parameters, and refinement steps to suit your needs.