# 01 – APD Crime Spatio-Temporal Modeling Pipeline


## Purpose
Build a complete, reproducible, end-to-end data pipeline for Atlanta Police 
Department (APD) crime data that:

1. **Cleans, standardizes, and de-duplicates** raw APD incident CSVs  
2. **Enriches** each incident with:
   - NPU, APD zone, neighborhood, and city (via spatial joins)
   - Historical hourly + daily weather (Open-Meteo archive)
   - Grid-based density features (7-day rolling window)
   - Rolling NPU crime trends (30-day average)
   - Campus proximity features (distance + closest school)
   - Temporal cyclical features (hour_sin, hour_cos, holidays, weekends)

3. Produces three modeling-ready datasets:
   - **Full cleaned + enriched APD dataset**
   - **Burglary-related subset (~117k rows)**
   - **NPU × Hour modeling panels**  
      - *Sparse panel* — only hours that contain ≥1 burglary  
      - *Dense panel* — all NPUs × all hours (including zero-count hours)

4. Supports downstream **spatio-temporal forecasting**, including:
   - Naive seasonal baselines (24h, 168h)
   - ARIMA/SARIMA
   - Prophet
   - Poisson GLM / Negative Binomial / Zero-Inflated Poisson (ZIP)
   - XGBoost / LightGBM / CatBoost (Poisson reg mode)
   - LSTM / TCN deep learning temporal models

## Primary Modeling Target
**burglary_count per (NPU, hour_ts)**  

## Key Outputs
Saved under: `../data/processed/apd/`

- `target_crimes.parquet`  
- `apd_burglary.parquet`  
- `npu_sparse_panel.parquet`  
- `npu_dense_panel.parquet`  

--- Imports & Init ---

In [1]:
import re
import warnings
from pathlib import Path
from typing import List, Dict, Any, Tuple
from datetime import datetime

import numpy as np
import pandas as pd
import geopandas as gpd

from dateutil import parser
import holidays

import openmeteo_requests
import requests_cache
from retry_requests import retry

from rich.console import Console
from rich.table import Table
from rich.panel import Panel

warnings.filterwarnings("ignore", "SettingWithCopyWarning")

console = Console()
console.print(
    Panel.fit(
        "[bold green]Libraries imported. Spatial + weather + rich logging ready.[/bold green]",
        title="Import Summary",
        border_style="cyan",
    )
)

--- Section 1: Config, Logger, Validation, Core Helpers ---

In [2]:
# Pipeline logger
pipeline_log: List[Dict[str, Any]] = []


def log_step(step_name: str, df: pd.DataFrame) -> None:
    """Log pipeline step name + shape."""
    if not isinstance(df, pd.DataFrame) or df.empty:
        rows_val: Any = "N/A"
        cols_val: Any = "N/A"
        rows_str = "N/A"
        cols_str = "N/A"
    else:
        rows_val = int(df.shape[0])
        cols_val = int(df.shape[1])
        rows_str = f"{rows_val:,}"
        cols_str = str(cols_val)

    pipeline_log.append({"step": step_name, "rows": rows_val, "cols": cols_val})
    console.print(f"[green]{step_name}[/green] [cyan]shape: {rows_str} x {cols_str}[/cyan]")


def show_pipeline_table() -> None:
    """Pretty-print pipeline log."""
    if not pipeline_log:
        console.print("[red]No pipeline steps logged yet.[/red]")
        return

    table = Table(title="Data Pipeline Summary", show_lines=True)
    table.add_column("Step", style="cyan", no_wrap=True)
    table.add_column("Rows", style="green")
    table.add_column("Cols", style="yellow")

    for entry in pipeline_log:
        rows_val = entry["rows"]
        cols_val = entry["cols"]
        rows_str = f"{rows_val:,}" if isinstance(rows_val, int) else str(rows_val)
        cols_str = f"{cols_val:,}" if isinstance(cols_val, int) else str(cols_val)
        table.add_row(entry["step"], rows_str, cols_str)

    console.print(table)


console.print(Panel("[bold green]Pipeline logger configured.[/bold green]", border_style="green"))


# --- Validation helpers ---


def run_validation_checks(df: pd.DataFrame, step_name: str, check_npu: bool = False) -> None:
    """
    Key integrity checks:
      - incident_number uniqueness
      - coordinate bounds (rough ATL box)
      - core completeness
      - NPU category sanity (optional)
    """
    LAT_MIN, LAT_MAX = 33.5, 34.0
    LON_MIN, LON_MAX = -84.6, -84.2

    # Uniqueness
    if "incident_number" in df.columns:
        duplicates = df.duplicated(subset=["incident_number"]).sum()
        if duplicates > 0:
            console.print(
                f"[bold red]FAIL: {step_name} - {duplicates:,} incident_number duplicates.[/bold red]"
            )
        else:
            console.print(f"[green]PASS: {step_name} - incident_number unique.[/green]")

    # Bounds
    if all(c in df.columns for c in ["latitude", "longitude"]):
        out_of_bounds = df[
            (df["latitude"] < LAT_MIN)
            | (df["latitude"] > LAT_MAX)
            | (df["longitude"] < LON_MIN)
            | (df["longitude"] > LON_MAX)
        ].shape[0]
        if out_of_bounds > 0:
            console.print(
                f"[bold yellow]WARNING: {step_name} - {out_of_bounds:,} rows outside expected ATL bounds.[/bold yellow]"
            )
        else:
            console.print(f"[green]PASS: {step_name} - coordinates within expected bounds.[/green]")

    # Completeness
    core_cols = ["report_date", "nibrs_offense"]
    for col in core_cols:
        if col in df.columns:
            missing_pct = df[col].isna().sum() / len(df)
            if missing_pct > 0.01:
                console.print(
                    f"[bold red]FAIL: {step_name} - '{col}' missing {missing_pct:.2%} (>1%).[/bold red]"
                )
            else:
                console.print(
                    f"[green]PASS: {step_name} - '{col}' completeness OK ({missing_pct:.2%} missing).[/green]"
                )

    # NPU category integrity
    if check_npu and "npu" in df.columns and df["npu"].dtype == "category":
        valid_npus = set("ABCDEFGHIJKLMNOPQRSTUVWXYZ") | {"OUT_OF_NPU_BOUNDS"}
        current_npus = set(df["npu"].cat.categories.str.upper().tolist())
        invalid_npus = current_npus - valid_npus
        if invalid_npus:
            console.print(
                f"[bold red]FAIL: {step_name} - unexpected NPU categories: {invalid_npus}[/bold red]"
            )
        else:
            console.print(
                f"[green]PASS: {step_name} - NPU categories valid (A–Z, OUT_OF_NPU_BOUNDS).[/green]"
            )


def show_missing_comparison(df_local: pd.DataFrame, snapshot: Dict[str, Any], step_name: str) -> None:
    """Compare missingness before/after a step."""
    table = Table(
        title=f"{step_name} - Missing Data Comparison",
        show_header=True,
        header_style="bold magenta",
    )
    table.add_column("Column", style="cyan")
    table.add_column("Before", justify="right", style="red")
    table.add_column("After", justify="right", style="green")
    table.add_column("Filled", justify="right", style="blue")

    for col, (before, before_pct) in snapshot.items():
        if col in df_local.columns:
            after = df_local[col].isna().sum()
            filled = before - after
            table.add_row(
                col,
                f"{before:,} ({before_pct:.1f}%)",
                f"{after:,} ({after/len(df_local)*100:.1f}%)",
                f"{filled:,}",
            )

    console.print(table)


# --- Paths & constants ---

DATA_DIR = Path("../data")
RAW_DATA_FOLDER = DATA_DIR / "raw" / "apd"
INTERIM_DATA_FOLDER = DATA_DIR / "interim" / "apd"
PROCESSED_DATA_FOLDER = DATA_DIR / "processed" / "apd"
EXTERNAL_DATA_FOLDER = DATA_DIR / "external"
SHAPEFILES_DIR = DATA_DIR / "raw" / "shapefiles"

for folder in [EXTERNAL_DATA_FOLDER, INTERIM_DATA_FOLDER, PROCESSED_DATA_FOLDER]:
    folder.mkdir(parents=True, exist_ok=True)

HOURLY_WEATHER_PATH = EXTERNAL_DATA_FOLDER / "atlanta_hourly_weather_2021_to_current.csv"
DAILY_WEATHER_PATH = EXTERNAL_DATA_FOLDER / "atlanta_daily_weather_2021_to_current.csv"
HOURLY_WEATHER_PARQUET = EXTERNAL_DATA_FOLDER / "atlanta_hourly_weather_2021_to_current.parquet"
DAILY_WEATHER_PARQUET = EXTERNAL_DATA_FOLDER / "atlanta_daily_weather_2021_to_current.parquet"

CITIES_SHP = SHAPEFILES_DIR / "census_boundary_2024_sf" / "ga_census_places_2024.shp"
CAMPUS_SHP = SHAPEFILES_DIR / "area_landmark_2024_sf" / "ga_census_landmarks_2023.shp"
NEIGHBORHOOD_SHP = SHAPEFILES_DIR / "atl_neighborhood_sf" / "atl_neighborhoods.shp"
NPU_SHP = SHAPEFILES_DIR / "atl_npu_sf" / "atl_npu_boundaries.shp"
APD_ZONE_SHP = SHAPEFILES_DIR / "apd_zone_2019_sf" / "apd_police_zones_2019.shp"

SCHOOL_CENTERS = {
    "GSU": (33.7530, -84.3863),
    "GA_Tech": (33.7756, -84.3963),
    "Emory": (33.7925, -84.3239),
    "Clark": (33.7533, -84.4124),
    "Spelman": (33.7460, -84.4129),
    "Morehouse": (33.7483, -84.4126),
    "Morehouse_Med": (33.7505, -84.4131),
    "Atlanta_Metro": (33.7145, -84.4020),
    "Atlanta_Tech": (33.7126, -84.4034),
    "SCAD": (33.7997, -84.3920),
    "John_Marshall": (33.7621, -84.3896),
}

CAMPUS_ENCODING = {
    "none": 0,
    "GSU": 1,
    "GA_Tech": 2,
    "Emory": 3,
    "Clark": 4,
    "Spelman": 5,
    "Morehouse": 6,
    "Morehouse_Med": 7,
    "Atlanta_Metro": 8,
    "Atlanta_Tech": 9,
    "SCAD": 10,
    "John_Marshall": 11,
}

# --- Generic helpers ---


def standardize_column_name(col: str) -> str:
    col = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", col)
    col = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", col)
    col = col.lower()
    col = re.sub(r"[\s\-\.\,\(\)\[\]\{\}]+", "_", col)
    col = re.sub(r"[^\w]", "", col)
    col = re.sub(r"_+", "_", col).strip("_")
    return col


def parse_report_date(x: Any) -> pd.Timestamp:
    """Safe mixed-format parser for APD dates."""
    if pd.isna(x):
        return pd.NaT
    s = str(x).strip()
    if s.lower() == "nan":
        return pd.NaT
    # Common explicit pattern (mm/dd/yyyy hh:mm:ss AM/PM)
    if re.match(r"^\d{1,2}/\d{1,2}/\d{4} \d{1,2}:\d{2}:\d{2} [APMapm]{2}$", s):
        return pd.to_datetime(s, format="%m/%d/%Y %I:%M:%S %p", errors="coerce")
    try:
        return parser.parse(s, fuzzy=True)
    except Exception:
        return pd.NaT


def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast numeric columns to save memory."""
    import pandas.api.types as ptypes

    mem_before = df.memory_usage(deep=True).sum() / (1024**2)
    for col in list(df.columns):
        if ptypes.is_numeric_dtype(df[col]):
            if ptypes.is_integer_dtype(df[col]):
                df[col] = pd.to_numeric(df[col], downcast="integer")
            elif ptypes.is_float_dtype(df[col]):
                df[col] = pd.to_numeric(df[col], downcast="float")
    mem_after = df.memory_usage(deep=True).sum() / (1024**2)
    console.print(
        f"[cyan]optimize_dtypes:[/cyan] {mem_before:.2f} → {mem_after:.2f} MB ({mem_after - mem_before:+.2f} MB)"
    )
    return df


def add_count_encoding(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Count-encode a categorical column."""
    df = df.copy()
    if col in df.columns:
        new_col = f"{col}_count"
        console.print(f"[cyan]Count-encoding:[/cyan] '{col}' → '{new_col}'")
        df[new_col] = df[col].map(df[col].value_counts()).fillna(0).astype(int)
    return df


def cleanup_duplicate_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Drop duplicate columns by name, keeping first occurrence."""
    cols = df.columns
    seen = set()
    keep = []
    for c in cols:
        if c not in seen:
            keep.append(c)
            seen.add(c)
        else:
            console.print(f"[yellow]Dropped duplicate column:[/yellow] {c}")
    return df.loc[:, keep]


log_step("Step 1: Logger, paths, helpers configured", pd.DataFrame())

--- Section 2: Weather Fetch/Cache ---

In [3]:
def fetch_atlanta_weather_full(lat: float = 33.749, lon: float = -84.388) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch full historical weather (2021→today) and cache to CSV/Parquet."""
    start_date = "2021-01-01"
    end_date = datetime.now().strftime("%Y-%m-%d")

    console.print(
        Panel(
            f"[bold cyan]Fetching Weather Data (2021→{end_date})[/bold cyan]\n"
            f"Location: ({lat:.3f}, {lon:.3f})",
            border_style="cyan",
        )
    )

    cache_session = requests_cache.CachedSession(".cache", expire_after=-1)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start_date,
        "end_date": end_date,
        "hourly": [
            "temperature_2m",
            "precipitation",
            "rain",
            "apparent_temperature",
            "weather_code",
            "is_day",
        ],
        "daily": [
            "sunrise",
            "daylight_duration",
            "sunshine_duration",
            "precipitation_hours",
            "rain_sum",
            "temperature_2m_mean",
            "weather_code",
        ],
        "timezone": "America/New_York",
        "temperature_unit": "fahrenheit",
    }

    responses = openmeteo.weather_api(url, params=params)
    response = responses[0]

    # Hourly
    hourly = response.Hourly()
    hourly_df = pd.DataFrame(
        {
            "datetime": pd.date_range(
                start=pd.to_datetime(hourly.Time(), unit="s", utc=True),
                end=pd.to_datetime(hourly.TimeEnd(), unit="s", utc=True),
                freq=pd.Timedelta(seconds=hourly.Interval()),
                inclusive="left",
            ),
            "temp_f": hourly.Variables(0).ValuesAsNumpy(),
            "precip_in": hourly.Variables(1).ValuesAsNumpy(),
            "rain_in": hourly.Variables(2).ValuesAsNumpy(),
            "apparent_temp_f": hourly.Variables(3).ValuesAsNumpy(),
            "weather_code_hourly": hourly.Variables(4).ValuesAsNumpy(),
            "is_daylight": hourly.Variables(5).ValuesAsNumpy().astype(int),
        }
    )
    hourly_df["datetime"] = (
        hourly_df["datetime"].dt.tz_convert("America/New_York").dt.tz_localize(None)
    )

    # Daily
    daily = response.Daily()
    daily_df = pd.DataFrame(
        {
            "date": pd.date_range(
                start=pd.to_datetime(daily.Time(), unit="s", utc=True),
                end=pd.to_datetime(daily.TimeEnd(), unit="s", utc=True),
                freq=pd.Timedelta(seconds=daily.Interval()),
                inclusive="left",
            ),
            "sunrise": daily.Variables(0).ValuesInt64AsNumpy(),
            "daylight_duration_sec": daily.Variables(1).ValuesAsNumpy(),
            "sunshine_duration_sec": daily.Variables(2).ValuesAsNumpy(),
            "precip_hours": daily.Variables(3).ValuesAsNumpy(),
            "rain_sum_in": daily.Variables(4).ValuesAsNumpy(),
            "temp_mean_f": daily.Variables(5).ValuesAsNumpy(),
            "weather_code_daily": daily.Variables(6).ValuesAsNumpy(),
        }
    )
    daily_df["date"] = (
        daily_df["date"].dt.tz_convert("America/New_York").dt.tz_localize(None).dt.date
    )

    # Cache
    hourly_df.to_csv(HOURLY_WEATHER_PATH, index=False)
    daily_df.to_csv(DAILY_WEATHER_PATH, index=False)
    try:
        hourly_df.to_parquet(HOURLY_WEATHER_PARQUET, index=False, compression="snappy")
        daily_df.to_parquet(DAILY_WEATHER_PARQUET, index=False, compression="snappy")
    except Exception as e:
        console.print(f"[yellow]Weather Parquet export failed: {e}[/yellow]")

    console.print(
        f"[green]Weather saved:[/green] {len(hourly_df):,} hourly, {len(daily_df):,} daily rows."
    )
    return hourly_df, daily_df


REFRESH_WEATHER = False
CACHE_EXISTS = HOURLY_WEATHER_PATH.exists() and DAILY_WEATHER_PATH.exists()

console.print(Panel("Weather Data Initialization", border_style="cyan"))

if REFRESH_WEATHER or not CACHE_EXISTS:
    console.print("[yellow]Fetching weather history from API...[/yellow]")
    hourly_df, daily_df = fetch_atlanta_weather_full()
    log_step("Step 2: Weather fetched from API", pd.DataFrame())
else:
    console.print("[green]Using cached weather CSVs.[/green]")
    hourly_df = pd.read_csv(HOURLY_WEATHER_PATH)
    daily_df = pd.read_csv(DAILY_WEATHER_PATH)
    log_step("Step 2: Weather loaded from cache", pd.DataFrame())

--- Section 3: Standardize & Combine Raw APD CSVs ---

In [4]:
def combine_and_deduplicate(files: List[Path], dedupe_key: str) -> pd.DataFrame:
    console.print("\n[bold cyan]Combining APD CSVs...[/bold cyan]")
    dfs = []
    for fp in files:
        console.print(f"[cyan]Reading:[/cyan] {fp.name}")
        df = pd.read_csv(fp)
        df.columns = [standardize_column_name(c) for c in df.columns]
        dfs.append(df)

    df_combined = pd.concat(dfs, ignore_index=True)
    total_rows = len(df_combined)

    dedupe_key_std = standardize_column_name(dedupe_key)
    if dedupe_key_std not in df_combined.columns:
        raise KeyError(
            f"Dedupe key '{dedupe_key_std}' missing after standardization. "
            "Check raw columns for IncidentNumber."
        )

    df_dedup = df_combined.drop_duplicates(subset=[dedupe_key_std])
    dupes = total_rows - len(df_dedup)

    console.print(f"[yellow]Combined rows:[/yellow] {total_rows:,}")
    console.print(f"[red]Removed duplicates:[/red] {dupes:,}")
    log_step(f"Step 3: Combined & deduped by {dedupe_key_std}", df_dedup)
    return df_dedup


input_files = list(RAW_DATA_FOLDER.glob("*.csv"))
if not input_files:
    raise FileNotFoundError(
        f"No CSVs found in {RAW_DATA_FOLDER}. Cannot run pipeline without raw data."
    )

df_combined = combine_and_deduplicate(input_files, "IncidentNumber")
df_combined = cleanup_duplicate_columns(df_combined)
run_validation_checks(df_combined, "Step 3: Post-ingestion")

--- Section 4: Core Cleaning & Text Normalization ---

In [5]:
def clean_apd_data(df: pd.DataFrame) -> pd.DataFrame:
    console.print("\n[bold cyan]Cleaning APD data...[/bold cyan]")
    df = df.copy()

    drop_cols = [
        "report_number",
        "fire_arm_involved",
        "object_id",
        "occurred_from_date",
        "occurred_to_date",
        "part",
        "vic_count",
        "is_bias_motivation_involved",
        "x",
        "y",
        "beat_text",
    ]
    drop_actual = [c for c in drop_cols if c in df.columns]
    if drop_actual:
        df = df.drop(columns=drop_actual)
        console.print(f"[yellow]Dropped columns:[/yellow] {', '.join(drop_actual)}")

    # One-hot encode event_watch
    if "event_watch" in df.columns:
        console.print("[cyan]One-hot encoding 'event_watch'...[/cyan]")
        one_hot = pd.get_dummies(df["event_watch"], prefix="event_watch", dummy_na=False)
        one_hot.columns = [standardize_column_name(c) for c in one_hot.columns]
        df = pd.concat([df.drop(columns=["event_watch"]), one_hot], axis=1)

    # Normalize text columns
    text_cols = ["location_type", "street_address", "nibrs_offense", "nhood_name"]
    for c in text_cols:
        if c in df.columns:
            df[c] = df[c].astype(str).str.lower().str.strip()

    # Keep raw neighborhood name separate from shapefile label
    if "nhood_name" in df.columns:
        df = df.rename(columns={"nhood_name": "neighborhood_raw"})

    return df


df_clean = clean_apd_data(df_combined)
log_step("Step 4: Core cleaning", df_clean)

df_clean = add_count_encoding(df_clean, "location_type")
log_step("Step 4.5: Location type count-encoding", df_clean)

--- Section 5: Robust Date Standardization ---

In [6]:
console.print("\n[bold cyan]Standardizing report_date...[/bold cyan]")

if "report_date" not in df_clean.columns:
    raise KeyError("'report_date' column not found in APD data.")

total_rows = len(df_clean)
df_clean["raw_report_date"] = df_clean["report_date"].apply(
    lambda x: str(x).strip() if pd.notna(x) else np.nan
)
df_clean["report_date"] = df_clean["raw_report_date"].apply(parse_report_date)

invalid = df_clean["report_date"].isna().sum()
parsed = total_rows - invalid

console.print(f"[cyan]Rows:[/cyan] {total_rows:,}")
console.print(f"[green]Parsed dates:[/green] {parsed:,}")
console.print(f"[yellow]Dropped invalid dates:[/yellow] {invalid:,}")

df_clean = df_clean.dropna(subset=["report_date"]).copy()
df_clean["report_date"] = pd.to_datetime(df_clean["report_date"])

log_step("Step 5: report_date → datetime", df_clean)

--- Section 6: Spatial Enrichment (NPU, Zone, Campus, Neighborhood, City) ---

In [7]:
def to_gdf(df: pd.DataFrame, lon_col: str = "longitude", lat_col: str = "latitude") -> gpd.GeoDataFrame:
    for c in (lon_col, lat_col):
        if c not in df.columns:
            raise KeyError(f"Expected coordinate column '{c}' not found.")
    return gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df[lon_col], df[lat_col]),
        crs="EPSG:4326",
    )


def load_shapefile(path: Path, target_crs: str) -> gpd.GeoDataFrame:
    """Load shapefile and align CRS."""
    try:
        gdf = gpd.read_file(path)
    except Exception:
        console.print(f"[bold red]Could not load shapefile:[/bold red] {path}")
        raise
    if gdf.crs is None:
        gdf = gdf.set_crs("EPSG:4326")
    return gdf.to_crs(target_crs)


def attach_shapefile(
    gdf: gpd.GeoDataFrame,
    shp_path: Path,
    name_col_candidates: List[str],
    target_col: str,
    predicate: str = "within",
    nearest: bool = False,
) -> gpd.GeoDataFrame:
    """
    Generic spatial join wrapper for nearest or within joins.
    - For campus, we allow nearest (EPSG:3857) join.
    - For city/hood, we use polygon within join.
    """
    if not shp_path.exists():
        console.print(f"[yellow]Skipping {target_col}: shapefile not found.[/yellow]")
        return gdf

    console.print(f"[cyan]Spatial attach:[/cyan] {target_col} (nearest={nearest})")

    shp_gdf = load_shapefile(shp_path, gdf.crs)

    # Identify the label/name column
    name_col = next((c for c in name_col_candidates if c in shp_gdf.columns), shp_gdf.columns[0])

    # For campus polygons: filter to educational landmarks
    if "MTFCC" in shp_gdf.columns and "campus" in target_col:
        shp_gdf = shp_gdf[shp_gdf["MTFCC"] == "S1400"]

    shp_gdf = shp_gdf[[name_col, "geometry"]].rename(columns={name_col: "_temp_target"})

    if nearest:
        # project to metric CRS for distance & nearest join
        orig_index = gdf.index
        gdf_proj = gdf.to_crs("EPSG:3857")
        shp_proj = shp_gdf.to_crs("EPSG:3857")
        joined = gpd.sjoin_nearest(gdf_proj, shp_proj, how="left")
        joined = joined.to_crs(gdf.crs)
        joined = joined.set_index(orig_index, drop=False)
    else:
        joined = gpd.sjoin(gdf, shp_gdf, how="left", predicate=predicate)

    out = joined.copy()
    out = out[~out.index.duplicated(keep="first")]
    out = out.rename(columns={"_temp_target": target_col})
    out = out.drop(columns=["index_right"], errors="ignore")

    return out


def enrich_spatial(df: pd.DataFrame) -> pd.DataFrame:
    console.print("\n[bold cyan]Spatial enrichment...[/bold cyan]")

    df = df.copy()

    # --- KEEP original APD-provided NPU ---
    if "npu" in df.columns:
        df = df.rename(columns={"npu": "npu_raw"})
        console.print("[green]Preserved APD NPU as 'npu_raw'.[/green]")

    # Remove rows lacking coordinates
    df_temp = df.dropna(subset=["longitude", "latitude"]).copy()
    dropped = len(df) - len(df_temp)
    if dropped > 0:
        console.print(f"[yellow]Dropped {dropped:,} rows missing coordinates.[/yellow]")

    gdf = to_gdf(df_temp)

    # Attach shapefile NPU (stored as npu_shp)
    gdf = attach_shapefile(
        gdf, NPU_SHP, ["NPU", "NPU_ID", "NAME"], "npu_shp", nearest=True
    )

    # Attach zones, campus, neighborhood, city
    gdf = attach_shapefile(gdf, APD_ZONE_SHP, ["ZONE", "Zone"], "zone_raw")
    gdf = attach_shapefile(gdf, CAMPUS_SHP, ["FULLNAME"], "campus_label_shp", nearest=True)
    gdf = attach_shapefile(gdf, NEIGHBORHOOD_SHP, ["NAME"], "neighborhood_shp")
    gdf = attach_shapefile(gdf, CITIES_SHP, ["NAME"], "city_label")

    df_enriched = pd.DataFrame(gdf.drop(columns=["geometry"], errors="ignore"))

    # --- FINAL NPU ASSIGNMENT ---
    df_enriched["npu"] = df_enriched["npu_shp"].fillna(df_enriched.get("npu_raw"))
    df_enriched["npu"] = df_enriched["npu"].astype(str).str.upper().str.strip()

    return df_enriched



df_spatial = enrich_spatial(df_clean)
log_step("Step 6: Spatial enrichment", df_spatial)

--- Section 7: Temporal & Contextual Features ---

In [8]:
# --- Section 7: Temporal & Contextual Features ---

def engineer_date_context_features(df: pd.DataFrame) -> pd.DataFrame:
    console.print("\n[bold cyan]Engineering temporal/context features...[/bold cyan]")
    df = df.copy()
    date_col = "report_date"

    df[date_col] = pd.to_datetime(df[date_col])
    dt = df[date_col].dt

    # --- Core temporal ------------------------------------------------------
    df["incident_datetime"] = df[date_col]
    df["incident_date"] = dt.date
    df["incident_hour"] = dt.hour
    df["year"] = dt.year
    df["month"] = dt.month

    # Canonical day-of-week string
    df["day_of_week"] = dt.day_name()

    # APD-style day_number: Sunday=1, Monday=2, ..., Saturday=7
    dow = dt.dayofweek  # Monday = 0 ... Sunday = 6
    apd_map = {
        6: 1,  # Sunday
        0: 2,  # Monday
        1: 3,
        2: 4,
        3: 5,
        4: 6,
        5: 7,  # Saturday
    }
    df["day_number"] = dow.map(apd_map).astype("int8")

    # 6 × 4-hour bins: 0–4, 5–8, 9–12, 13–16, 17–20, 21–24
    bin_edges = [0, 5, 9, 13, 17, 21, 24]
    bin_labels = [
        "Early Night (0–4)",
        "Early Morning (5–8)",
        "Late Morning (9–12)",
        "Afternoon (13–16)",
        "Evening (17–20)",
        "Late Night (21–24)",
    ]
    df["hour_block"] = pd.cut(
        df["incident_hour"],
        bins=bin_edges,
        labels=bin_labels,
        right=False,        # [0,5) [5,9) ...
        include_lowest=True,
    )

    # Holiday flag (Georgia, US)
    years = sorted(df[date_col].dt.year.unique().tolist())
    holiday_dates = holidays.country_holidays(country="US", subdiv="GA", years=years)
    df["is_holiday"] = df[date_col].dt.date.isin(holiday_dates)

    # Hour cyclic encoding & weekend
    df["hour_sin"] = np.sin(2 * np.pi * df["incident_hour"] / 24)
    df["hour_cos"] = np.cos(2 * np.pi * df["incident_hour"] / 24)
    df["is_weekend"] = (dow >= 5)

    # Offense category
    if "nibrs_offense" in df.columns:
        df["offense_category"] = np.select(
            [
                df["nibrs_offense"].str.contains("burglary|robbery", case=False, na=False),
                df["nibrs_offense"].str.contains("motor vehicle theft", case=False, na=False),
                df["nibrs_offense"].str.contains(
                    "theft|larceny|shoplift|fraud|swindle|embezzelment|stolen property|false pretenses",
                    case=False,
                    na=False,
                ),
                df["nibrs_offense"].str.contains(
                    "assault|murder|rape|battery|intimidation|extortion|kidnapping",
                    case=False,
                    na=False,
                ),
            ],
            ["Burglary/Robbery", "Motor Vehicle Theft", "Theft/Larceny/Fraud", "Violent Crime"],
            default="Other/Misc.",
        )

    # Drop any legacy day-of-week columns that conflict
    legacy_cols = [c for c in ["day_of_the_week"] if c in df.columns]
    if legacy_cols:
        df = df.drop(columns=legacy_cols)

    return df


df_features = engineer_date_context_features(df_spatial)
df_features = cleanup_duplicate_columns(df_features)
log_step("Step 7: Temporal & contextual features", df_features)


--- Section 8: Weather Merge & Flags ---

In [9]:
console.print("\n[bold cyan]Merging weather data...[/bold cyan]")

# Hourly merge
df_features["report_hour"] = pd.to_datetime(df_features["report_date"]).dt.floor("h")
hourly_df["datetime"] = pd.to_datetime(hourly_df["datetime"])
hourly_df["weather_hour"] = hourly_df["datetime"].dt.floor("h")
hourly_df = hourly_df.drop(columns=["datetime"])

df_features = df_features.merge(
    hourly_df,
    left_on="report_hour",
    right_on="weather_hour",
    how="left",
).drop(columns=["report_hour", "weather_hour"], errors="ignore")

# Daily merge
daily_df["weather_date"] = pd.to_datetime(daily_df["date"]).dt.date
df_features["incident_date"] = pd.to_datetime(df_features["report_date"]).dt.date
daily_df = daily_df.drop(columns=["date"])

df_features = df_features.merge(
    daily_df,
    left_on="incident_date",
    right_on="weather_date",
    how="left",
).drop(columns=["incident_date", "weather_date"], errors="ignore")

console.print("[green]Weather merge complete.[/green]")


def derive_weather_flags(df: pd.DataFrame) -> pd.DataFrame:
    console.print("[bold cyan]Deriving weather flags...[/bold cyan]")
    temp_col = "temp_f"
    precip_col = "precip_in"
    if temp_col not in df.columns or precip_col not in df.columns:
        console.print("[yellow]Weather columns missing; skipping flags.[/yellow]")
        return df

    p85 = df[temp_col].dropna().quantile(0.85)
    p15 = df[temp_col].dropna().quantile(0.15)

    df["is_raining"] = (df[precip_col].fillna(0) > 0.01).astype(int)
    df["is_hot"] = (df[temp_col] >= p85).astype(int)
    df["is_cold"] = (df[temp_col] <= p15).astype(int)
    return df


df_features = cleanup_duplicate_columns(df_features)
df_features = derive_weather_flags(df_features)
df_features = optimize_dtypes(df_features)

log_step("Step 8: Weather & flags", df_features)

--- Section 9: Advanced Spatio-Temporal Features (Grid-Based) ---

In [10]:
console.print("\n[bold cyan]Step 9: Spatio-Temporal Feature Generation (Grid Density)[/bold cyan]\n")

mem_before = df_features.memory_usage(deep=True).sum() / (1024**2)
console.print(f"[cyan]Memory before Step 9:[/cyan] {mem_before:.2f} MB")


def build_spatial_grid(gdf, cell_size_m=500):
    """
    Build a 500m grid over crime points in EPSG:3857.
    Returns:
        gdf_projected, x_edges, y_edges
    """
    gdf = gpd.GeoDataFrame(
        gdf,
        geometry=gpd.points_from_xy(gdf["longitude"], gdf["latitude"]),
        crs="EPSG:4326",
    ).to_crs("EPSG:3857")

    xmin, ymin, xmax, ymax = gdf.total_bounds

    x_edges = np.arange(xmin, xmax + cell_size_m, cell_size_m)
    y_edges = np.arange(ymin, ymax + cell_size_m, cell_size_m)

    return gdf, x_edges, y_edges


def assign_grid_id(gdf, x_edges, y_edges):
    """Assign each point to a grid cell via searchsorted."""
    gdf["grid_x"] = np.searchsorted(x_edges, gdf.geometry.x) - 1
    gdf["grid_y"] = np.searchsorted(y_edges, gdf.geometry.y) - 1
    gdf["grid_id"] = gdf["grid_x"].astype(str) + "_" + gdf["grid_y"].astype(str)
    return gdf


def compute_daily_grid_counts(gdf):
    """Count crimes per grid per day."""
    gdf["date"] = gdf["report_date"].dt.date

    daily_grid = (
        gdf.groupby(["grid_id", "date"])
        .size()
        .reset_index(name="grid_daily_count")
    )

    daily_grid["date"] = pd.to_datetime(daily_grid["date"])
    return daily_grid


def compute_grid_rolling_density(daily_grid, window_days=7):
    """7-day rolling sum per grid cell."""
    daily_grid = daily_grid.sort_values(["grid_id", "date"])

    daily_grid["grid_density_7d"] = daily_grid.groupby("grid_id")["grid_daily_count"].transform(
        lambda x: x.rolling(window_days, min_periods=1).sum()
    )

    return daily_grid


console.print("[cyan]Projecting coordinates and constructing 500m grid...[/cyan]")
gdf_proj, x_edges, y_edges = build_spatial_grid(df_features, cell_size_m=500)
gdf_proj = assign_grid_id(gdf_proj, x_edges, y_edges)

console.print("[cyan]Computing daily crime counts per grid cell...[/cyan]")
daily_grid = compute_daily_grid_counts(gdf_proj)

console.print("[cyan]Calculating 7-day rolling grid density...[/cyan]")
daily_grid = compute_grid_rolling_density(daily_grid, window_days=7)

console.print("[cyan]Merging grid densities back to main dataframe...[/cyan]")

daily_grid["date"] = daily_grid["date"].dt.date
gdf_proj["date"] = gdf_proj["report_date"].dt.date

gdf_proj = gdf_proj.merge(
    daily_grid[["grid_id", "date", "grid_density_7d"]],
    on=["grid_id", "date"],
    how="left",
)

df_features["grid_density_7d"] = gdf_proj["grid_density_7d"].values
df_features["grid_id"] = gdf_proj["grid_id"].values

log_step("Step 9.1: 7-day grid-based density added", df_features)


def npu_rolling_average(df_in: pd.DataFrame, window_days: int = 30) -> pd.DataFrame:
    """
    Compute NPU 30-day rolling average crime count and merge back
    as 'npu_crime_avg_{window_days}d'.
    """
    df_local = df_in[["npu", "report_date"]].copy()
    df_local["date"] = pd.to_datetime(df_local["report_date"].dt.date)

    daily = (
        df_local.groupby(["npu", "date"])
        .size()
        .reset_index(name="daily_npu_count")
    )

    daily = daily.sort_values(["npu", "date"])
    feature_name = f"npu_crime_avg_{window_days}d"

    daily[feature_name] = daily.groupby("npu")["daily_npu_count"].transform(
        lambda x: x.shift(1).rolling(window_days, min_periods=1).mean()
    )

    df_merge = df_in.copy()
    df_merge["date"] = pd.to_datetime(df_merge["report_date"].dt.date)

    df_merge = df_merge.merge(
        daily[["npu", "date", feature_name]],
        on=["npu", "date"],
        how="left",
    )
    df_merge[feature_name] = df_merge[feature_name].fillna(0)
    df_merge = df_merge.drop(columns=["date"], errors="ignore")

    return df_merge


console.print("[cyan]Computing NPU 30-day rolling averages...[/cyan]")
df_features = npu_rolling_average(df_features, window_days=30)
log_step("Step 9.2: NPU 30-day trend added", df_features)

mem_after = df_features.memory_usage(deep=True).sum() / (1024**2)
console.print(f"[green]Step 9 complete. Memory change: {mem_after - mem_before:+.2f} MB[/green]")
console.print("[bold green]Grid-based spatio-temporal density successfully computed.[/bold green]")

--- Section 10: Campus Distance & Proximity Flags ---

In [11]:
console.print(Panel("[bold cyan]Step 10: Campus distance & proximity[/bold cyan]", border_style="cyan"))

DISTANCE_THRESHOLD_M = 2414.016  # ~1.5 miles
console.print(f"[cyan]Distance threshold:[/cyan] {DISTANCE_THRESHOLD_M:.0f} m (~1.5 mi)")


def calculate_nearest_campus_fully_vectorized(df_in: pd.DataFrame) -> Tuple[pd.Series, pd.Series]:
    """Vectorized Haversine distance from each crime to nearest campus center."""
    df_local = df_in.copy()
    valid_mask = df_local["latitude"].notna() & df_local["longitude"].notna()
    valid_df = df_local[valid_mask].copy()

    nearest_campus = pd.Series("none", index=df_local.index)
    nearest_distance = pd.Series(np.nan, index=df_local.index)

    if len(valid_df) == 0:
        return nearest_campus, nearest_distance

    campus_names = list(SCHOOL_CENTERS.keys())
    campus_lats = np.array([lat for lat, lon in SCHOOL_CENTERS.values()])
    campus_lons = np.array([lon for lat, lon in SCHOOL_CENTERS.values()])

    crime_lats = valid_df["latitude"].values[:, np.newaxis]
    crime_lons = valid_df["longitude"].values[:, np.newaxis]

    R = 6371000
    lat1, lon1 = np.radians(crime_lats), np.radians(crime_lons)
    lat2, lon2 = np.radians(campus_lats), np.radians(campus_lons)

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    distances = R * c  # meters

    min_idx = distances.argmin(axis=1)
    min_distances = distances[np.arange(len(distances)), min_idx]

    nearest_distance.loc[valid_df.index] = min_distances
    within = min_distances <= DISTANCE_THRESHOLD_M
    nearest_campus.loc[valid_df.index[within]] = [campus_names[i] for i in min_idx[within]]

    return nearest_campus, nearest_distance


df_features["campus_label"], df_features["campus_distance_m"] = calculate_nearest_campus_fully_vectorized(
    df_features
)
df_features["campus_distance_m"] = df_features["campus_distance_m"].round(4).fillna(0)
df_features["campus_code"] = df_features["campus_label"].map(CAMPUS_ENCODING).fillna(0).astype(int)

# Binary flags per campus, including near_gsu
for campus in SCHOOL_CENTERS.keys():
    col = f"near_{campus.lower()}"
    df_features[col] = (df_features["campus_label"] == campus).astype(int)

# Drop shapefile-based campus label to avoid confusion
df_features = df_features.drop(columns=["campus_label_shp"], errors="ignore")

log_step("Step 10: Campus distance & flags", df_features)

--- Section 11: Spatial Repair & NPU Cleanup ---

In [12]:
console.print(
    Panel.fit("[bold cyan]Step 11: Spatial repair & NPU cleanup[/bold cyan]", border_style="cyan")
)

df = df_features.copy()

cols_to_inspect = ["zone_int", "npu", "neighborhood", "city_label", "campus_label"]
snapshot_before: Dict[str, Any] = {}
for col in cols_to_inspect:
    if col in df.columns:
        missing = df[col].isna().sum()
        pct = (missing / len(df)) * 100
        snapshot_before[col] = (missing, pct)

valid_df = df.dropna(subset=["longitude", "latitude"]).copy()
gdf = gpd.GeoDataFrame(
    valid_df,
    geometry=gpd.points_from_xy(valid_df["longitude"], valid_df["latitude"]),
    crs="EPSG:4326",
)

# NPU repair
if NPU_SHP.exists() and "npu" in df.columns:
    console.print("[cyan]Repairing NPU via spatial join...[/cyan]")
    gdf_npu = gpd.read_file(NPU_SHP).to_crs("EPSG:4326")
    cand = ["NPU", "NPU_ID", "NAME"]
    npu_col = next((c for c in cand if c in gdf_npu.columns), gdf_npu.columns[0])

    missing_mask = df["npu"].isna() | df["npu"].astype(str).str.upper().str.strip().isin(
        ["0", "NAN", ""]
    )
    if missing_mask.sum() > 0:
        gdf_missing = gdf[gdf.index.isin(df[missing_mask].index)]
        joined = gpd.sjoin(
            gdf_missing,
            gdf_npu[[npu_col, "geometry"]],
            how="left",
            predicate="intersects",
        ).drop(columns=["index_right"], errors="ignore")
        joined_clean = joined.groupby(joined.index).first()
        valid = joined_clean[npu_col].notna()
        idx = joined_clean[valid].index.intersection(df.index)
        df.loc[idx, "npu"] = joined_clean.loc[idx, npu_col].values
        console.print(f"[green]Filled {len(idx):,} NPU values.[/green]")

# Zone repair
if APD_ZONE_SHP.exists() and "zone_int" in df.columns:
    console.print("[cyan]Repairing zone_int via spatial join...[/cyan]")
    gdf_zones = gpd.read_file(APD_ZONE_SHP).to_crs("EPSG:4326")
    zone_col = "ZONE" if "ZONE" in gdf_zones.columns else gdf_zones.columns[0]
    missing_mask = df["zone_int"].isna()
    if missing_mask.sum() > 0:
        gdf_missing = gdf[gdf.index.isin(df[missing_mask].index)]
        joined = gpd.sjoin(
            gdf_missing, gdf_zones[[zone_col, "geometry"]], how="left", predicate="intersects"
        ).drop(columns=["index_right"], errors="ignore")
        joined_clean = joined.groupby(joined.index).first()
        zone_int_filled = pd.to_numeric(
            joined_clean[zone_col].astype(str).str.extract(r"(\d+)", expand=False),
            errors="coerce",
        )
        valid = zone_int_filled.notna()
        idx = zone_int_filled[valid].index.intersection(df.index)
        df.loc[idx, "zone_int"] = zone_int_filled.loc[idx].values
        console.print(f"[green]Filled {len(idx):,} zone_int values.[/green]")

# Neighborhood repair
if NEIGHBORHOOD_SHP.exists() and "neighborhood" in df.columns:
    console.print("[cyan]Repairing neighborhood via spatial join...[/cyan]")
    gdf_neigh = gpd.read_file(NEIGHBORHOOD_SHP).to_crs("EPSG:4326")
    name_col = "NAME"
    missing_mask = df["neighborhood"].isna() | (df["neighborhood"] == "")
    if missing_mask.sum() > 0:
        gdf_missing = gdf[gdf.index.isin(df[missing_mask].index)]
        joined = gpd.sjoin(
            gdf_missing, gdf_neigh[[name_col, "geometry"]], how="left", predicate="intersects"
        ).drop(columns=["index_right"], errors="ignore")
        joined_clean = joined.groupby(joined.index).first()
        valid = joined_clean[name_col].notna()
        idx = joined_clean[valid].index.intersection(df.index)
        df.loc[idx, "neighborhood"] = (
            joined_clean.loc[idx, name_col].str.lower().values
        )
        console.print(f"[green]Filled {len(idx):,} neighborhood values.[/green]")

console.print("[green]Spatial repair done.[/green]")
show_missing_comparison(df, snapshot_before, "Step 11: Spatial Repair")

# Clean duplicate suffix columns from merges
dup_suffixes = ("_left", "_right", "_x", "_y")
drop_dup_cols = [c for c in df.columns if c.endswith(dup_suffixes)]
if drop_dup_cols:
    df = df.drop(columns=drop_dup_cols, errors="ignore")
    console.print(f"[green]Removed {len(drop_dup_cols)} duplicate columns.[/green]")

# NPU normalization (A–Z + OUT_OF_NPU_BOUNDS)
if "npu" in df.columns:
    df["npu"] = df["npu"].astype(str).str.upper().str.strip()
    miss_npu = df["npu"].isin(["", "0", "NAN"]) | df["npu"].isna()
    df.loc[miss_npu, "npu"] = "OUT_OF_NPU_BOUNDS"
    df["npu"] = df["npu"].astype("category")
    console.print(
        f"[green]NPU normalized. OUT_OF_NPU_BOUNDS records: {miss_npu.sum():,}[/green]"
    )

log_step("Step 11: Spatial repair & NPU cleanup", df)



In [13]:
#unique_npu = df['npu_raw'].unique().tolist()
#print(unique_npu)

--- Section 12: Filter for Target Offenses & Save Interim ---

In [14]:
console.print("\n[bold cyan]Filtering for target offenses...[/bold cyan]")

TARGET_OFFENSES = [
    "larceny",
    "theft",
    "robbery",
    "burglary",
    "prowling",
    "shoplifting",
    "fraud",
    "swindle",
    "embezzelment",
    "credit card",
    "wire fraud",
    "impersonation",
    "motor vehicle theft",
]

if "nibrs_offense" not in df.columns:
    raise KeyError("'nibrs_offense' missing; cannot filter target offenses.")

mask = df["nibrs_offense"].str.contains("|".join(TARGET_OFFENSES), case=False, na=False)
df_model = df[mask].copy()

if "incident_number" in df_model.columns:
    before = len(df_model)
    df_model = df_model.drop_duplicates(subset=["incident_number"])
    after = len(df_model)
    console.print(
        f"[yellow]Removed {before - after:,} duplicate incident_numbers in target offense subset.[/yellow]"
    )

INTERIM_CSV_PATH = INTERIM_DATA_FOLDER / "apd_model_data_target_crimes.csv"
INTERIM_PARQUET_PATH = INTERIM_DATA_FOLDER / "apd_model_data_target_crimes.parquet"

df_model.to_csv(INTERIM_CSV_PATH, index=False)
try:
    df_model.to_parquet(INTERIM_PARQUET_PATH, index=False, compression="snappy")
    console.print(f"[green]Saved interim Parquet:[/green] {INTERIM_PARQUET_PATH.name}")
except Exception as e:
    console.print(f"[yellow]Interim Parquet export failed: {e}[/yellow]")

console.print(
    f"[bold yellow]Target offense rows:[/bold yellow] {len(df_model):,} / {len(df):,} total."
)
log_step("Step 12: Interim target offenses dataset saved", df_model)

In [15]:
# --- Section 12.5: Final tidy for target_crimes export ---------------------

console.print(
    Panel("[bold cyan]Step 12.5: Finalize target_crimes for modeling & export[/bold cyan]",
          border_style="cyan")
)

def finalize_target_dataset(df_model: pd.DataFrame) -> pd.DataFrame:
    df_model = df_model.copy()
    df_model["report_date"] = pd.to_datetime(df_model["report_date"])

    # Sort chronologically
    df_model = df_model.sort_values("report_date").reset_index(drop=True)

    # Re-enforce canonical temporal fields from report_date
    dt = df_model["report_date"].dt
    df_model["incident_hour"] = dt.hour
    df_model["year"] = dt.year
    df_model["month"] = dt.month
    df_model["day_of_week"] = dt.day_name()

    dow = dt.dayofweek
    apd_map = {6: 1, 0: 2, 1: 3, 2: 4, 3: 5, 4: 6, 5: 7}
    df_model["day_number"] = dow.map(apd_map).astype("int8")

    # Ensure hour_block exists and matches 0–4,5–8,... pattern
    bin_edges = [0, 5, 9, 13, 17, 21, 24]
    bin_labels = [
        "Early Night (0–4)",
        "Early Morning (5–8)",
        "Late Morning (9–12)",
        "Afternoon (13–16)",
        "Evening (17–20)",
        "Late Night (21–24)",
    ]
    df_model["hour_block"] = pd.cut(
        df_model["incident_hour"],
        bins=bin_edges,
        labels=bin_labels,
        right=False,
        include_lowest=True,
    )

    # Drop raw / shapefile duplicates we don’t want in modeling dataset
    cols_to_drop = [
        "day_of_the_week",  # legacy
        "zone", "zone_raw",  # keep zone_int only
        "npu_raw", "npu_shp",
        "neighborhood_raw", "neighborhood_shp",
        "campus_label_shp",
    ]
    drop_existing = [c for c in cols_to_drop if c in df_model.columns]
    if drop_existing:
        df_model = df_model.drop(columns=drop_existing)

    return df_model


df_model = finalize_target_dataset(df_model)
log_step("Step 12.5: target_crimes finalized (sorted, bins, cleaned columns)", df_model)


--- Section 13: Build NPU × Hour Modeling Panels & Export Feature Store ---

In [16]:
console.print(
    Panel("[bold cyan]Step 13: Build NPU–Hour panel for spatio-temporal modeling[/bold cyan]", 
          border_style="cyan")
)

df_final = df_model.copy()

In [17]:
# Ensure timestamp is properly normalized
df_final["hour_ts"] = pd.to_datetime(df_final["report_date"]).dt.floor("h")

# Ensure day_of_week is string
if "day_of_week" in df_final.columns:
    df_final["day_of_week"] = df_final["day_of_week"].astype(str)

In [18]:
# Always normalize to raw Python strings
df_final["npu"] = df_final["npu"].astype(str).str.upper().str.strip()

# Replace anything not A–Z with OUT_OF_NPU_BOUNDS
df_final.loc[~df_final["npu"].isin(list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")), "npu"] = "OUT_OF_NPU_BOUNDS"

# Now drop OUT_OF_NPU_BOUNDS
df_final = df_final[df_final["npu"] != "OUT_OF_NPU_BOUNDS"]


In [19]:
# Burglary count target per NPU per hour
console.print("[cyan]Aggregating burglary counts per NPU per hour...[/cyan]")
panel_target = (
    df_final.groupby(["npu", "hour_ts"])
    .size()
    .reset_index(name="burglary_count")
)


In [20]:
npu_data = df_final['npu'].unique().tolist()
print(npu_data)

['M', 'E', 'L', 'G', 'K', 'O', 'R', 'S', 'N', 'P', 'X', 'J', 'V', 'B', 'F', 'W', 'I', 'D', 'T', 'H', 'Y', 'C', 'A', 'Z', 'Q']


In [21]:
agg_dict = {
    "grid_density_7d": "mean",
    "npu_crime_avg_30d": "first",
    "temp_f": "mean",
    "is_raining": "max",
    "is_hot": "max",
    "is_cold": "max",
    "is_daylight": "max",
    "is_weekend": "first",
    "is_holiday": "first",
    "day_number": "first",
    "month": "first",
    "year": "first",
    "hour_sin": "first",
    "hour_cos": "first",
    "day_of_week": "first",
    "campus_distance_m": "mean",
    "location_type_count": "mean",
}

panel_features = (
    df_final.groupby(["npu", "hour_ts"])
    .agg({col: func for col, func in agg_dict.items() if col in df_final.columns})
    .reset_index()
)


In [22]:
# Merge target + features → Sparse panel
console.print("[cyan]Merging aggregated features with burglary target panel...[/cyan]")

panel_merged = panel_target.merge(
    panel_features,
    on=["npu", "hour_ts"],
    how="left",
)


# Fill numeric with 0, categorical with "MISSING"
num_cols = panel_merged.select_dtypes(include=["number"]).columns
for col in num_cols:
    panel_merged[col] = panel_merged[col].fillna(0)


cat_cols = panel_merged.select_dtypes(include=["object", "category"]).columns

for col in cat_cols:
    panel_merged[col] = panel_merged[col].astype(str).fillna("MISSING")


console.print(
    f"[green]Sparse NPU–Hour panel created: {panel_merged.shape[0]:,} rows × {panel_merged.shape[1]} columns[/green]"
)
console.print(panel_merged.head(10))

In [23]:

# ----- Export full enriched datasets + panels -----

console.print(
    Panel("[bold cyan]Saving full dataset, burglary-only subset, sparse panel, and dense panel...[/bold cyan]",
          border_style="cyan")
)
# --- Ensure all boolean indicator columns are clean int64 before parquet ---
boolish = [
    "is_raining", "is_hot", "is_cold", "is_daylight",
    "is_weekend", "is_holiday"
]
# 1. Full cleaned & enriched dataset (after Step 11)
FULL_PATH_PARQUET = PROCESSED_DATA_FOLDER / "all_apd_crimes.parquet"
FULL_PATH_CSV = PROCESSED_DATA_FOLDER / "all_apd_crimes.csv"

df.to_parquet(FULL_PATH_PARQUET, index=False, compression="snappy")
df.to_csv(FULL_PATH_CSV, index=False)
console.print(f"[green]Saved full cleaned dataset →[/green] {FULL_PATH_PARQUET.name}")

# 2. Burglary-related dataset (target offenses)
BURGLARY_PARQUET = PROCESSED_DATA_FOLDER / "target_crimes.parquet"
BURGLARY_CSV = PROCESSED_DATA_FOLDER / "target_crimes.csv"

df_model.to_parquet(BURGLARY_PARQUET, index=False, compression="snappy")
df_model.to_csv(BURGLARY_CSV, index=False)
console.print(f"[green]Saved burglary-only dataset →[/green] {BURGLARY_PARQUET.name}")

# 3. Sparse NPU–hour panel
SPARSE_PANEL_PARQUET = PROCESSED_DATA_FOLDER / "npu_sparse_panel.parquet"
SPARSE_PANEL_CSV = PROCESSED_DATA_FOLDER / "npu_sparse_panel.csv"
for col in boolish:
    if col in panel_merged.columns:
        panel_merged[col] = (
            panel_merged[col]
            .replace("MISSING", 0)
            .fillna(0)
            .astype(int)
        )

panel_merged.to_parquet(SPARSE_PANEL_PARQUET, index=False, compression="snappy")
panel_merged.to_csv(SPARSE_PANEL_CSV, index=False)
console.print(f"[green]Saved sparse NPU–hour panel →[/green] {SPARSE_PANEL_PARQUET.name}")

# 4. Dense NPU–hour panel (all NPUs × all hours)
console.print("[cyan]Building dense NPU–hour panel...[/cyan]")
valid_npus = sorted(
    [n for n in df_final["npu"].unique() if isinstance(n, str) and n in list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")]
)
start_ts = df_final["hour_ts"].min()
end_ts = df_final["hour_ts"].max()
all_hours = pd.date_range(start=start_ts, end=end_ts, freq="h")
all_npus = sorted(valid_npus)

panel_index = pd.MultiIndex.from_product(
    [all_npus, all_hours],
    names=["npu", "hour_ts"],
)
panel_dense = pd.DataFrame(index=panel_index).reset_index()

panel_dense = panel_dense.merge(panel_target, on=["npu", "hour_ts"], how="left")
panel_dense = panel_dense.merge(panel_features, on=["npu", "hour_ts"], how="left")

num_cols = panel_dense.select_dtypes(include=["number"]).columns
cat_cols = panel_dense.select_dtypes(include=["object", "category"]).columns

panel_dense[num_cols] = panel_dense[num_cols].fillna(0)

for col in cat_cols:
    if pd.api.types.is_categorical_dtype(panel_dense[col]):
        if "MISSING" not in panel_dense[col].cat.categories:
            panel_dense[col] = panel_dense[col].cat.add_categories(["MISSING"])
        panel_dense[col] = panel_dense[col].fillna("MISSING")
    else:
        panel_dense[col] = panel_dense[col].fillna("MISSING")

DENSE_PANEL_PARQUET = PROCESSED_DATA_FOLDER / "npu_dense_panel.parquet"
DENSE_PANEL_CSV = PROCESSED_DATA_FOLDER / "npu_dense_panel.csv"



for col in boolish:
    if col in panel_dense.columns:
        panel_dense[col] = (
            panel_dense[col]
            .replace("MISSING", 0)  # if it ever snuck in
            .fillna(0)
            .astype(int)
        )


panel_dense.to_parquet(DENSE_PANEL_PARQUET, index=False, compression="snappy")
panel_dense.to_csv(DENSE_PANEL_CSV, index=False)

console.print(
    f"[green]Dense panel created →[/green] {panel_dense.shape[0]:,} rows × {panel_dense.shape[1]} columns"
)

console.print(
    Panel.fit(
        "[bold green]Feature store exports complete (full, burglary-only, sparse, dense).[/bold green]",
        border_style="green",
    )
)

log_step("Step 13: NPU panels & feature store exports", df_model)

  if pd.api.types.is_categorical_dtype(panel_dense[col]):


--- Section 14: Pipeline Summary ---

SyntaxError: invalid syntax (3878976397.py, line 1)

In [24]:
show_pipeline_table()