In [1]:
from pathlib import Path
import os
import arcpy
import re
import pandas as pd
import matplotlib.pyplot as plt

# ------------------------------------------------------------------
# Project root directory
# ------------------------------------------------------------------
# PROJECT_ROOT should be set to the absolute path of the project root
# directory (i.e. the folder that contains this notebook and the
# data/, output/, and workspace/ subdirectories).
#
# Please make sure this path is correct for your local environment.
#
# Example:
# If the "slope_analysis_delivery" folder is located directly under
# the C: drive, then PROJECT_ROOT should be:
# r"C:\slope_analysis_delivery"
#
PROJECT_ROOT = r"C:\Gis_Projects\slope_analysis_delivery"

# Print the value of PROJECT_ROOT
print("PROJECT_ROOT =", PROJECT_ROOT)
print(
    "Note:\n"
    "Please ensure that PROJECT_ROOT is set to the absolute path of your actual "
    "\"slope_analysis_delivery\" project directory.\n"
    "For example:\n"
    "PROJECT_ROOT = r\"C:\\GIS_Projects\\slope_analysis_delivery\""
)

PROJECT_ROOT = C:\Gis_Projects\slope_analysis_delivery
Note:
Please ensure that PROJECT_ROOT is set to the absolute path of your actual "slope_analysis_delivery" project directory.
For example:
PROJECT_ROOT = r"C:\GIS_Projects\slope_analysis_delivery"


In [2]:
# ==================================================================
# Project configuration
# ==================================================================
# This section defines all project-level configuration variables,
# including input datasets, workspace locations, output paths, and
# analysis parameters used throughout the slope analysis pipeline.
# ==================================================================

# ------------------------------------------------------------------
# 1. Input dataset paths
# ------------------------------------------------------------------

# 1.1. DEM raster path (LiDAR-derived, 1 m resolution)
DEM = os.path.join(PROJECT_ROOT, r"data\input\dem\dem_1m_hamilton.tif")

# 1.2. Hamilton Primary Parcels dataset path (already clipped to the Hamilton City boundary)
PARCELS_GDB = os.path.join(PROJECT_ROOT, r"data\input\parcels\parcels.gdb")
PARCELS_RAW_FC = os.path.join(PARCELS_GDB, "hamilton_primary_parcels")

# 1.3. Constraint dataset paths (areas excluded from developable land analysis
CONSTRAINTS_GDB = os.path.join(PROJECT_ROOT, r"data\input\constraints\constraints.gdb")
SNA_FC = os.path.join(CONSTRAINTS_GDB, "Hamilton_Significant_Natural_Area")
OPEN_SPACE_FC = os.path.join(CONSTRAINTS_GDB, "Hamilton_Open_Space_Zone")
KNOWLEDGE_FC = os.path.join(CONSTRAINTS_GDB, "Hamilton_Knowledge_Zone")
FACILITIES_FC = os.path.join(CONSTRAINTS_GDB, "Hamilton_Facilities_Zone")

# 1.4. The residential and non-residential dataset path
ZONING_GDB = os.path.join(PROJECT_ROOT, r"data\input\zoning\zoning.gdb")
RES_NONRES_ZONE_FC = os.path.join(ZONING_GDB, "hamilton_residential_and_nonResidential_zone")
RES_NONRES_ZONE_FIELD = "Zone"  

# 1.5. Hamilton City boundary dataset path
BOUNDARY_GDB = os.path.join(PROJECT_ROOT, r"data\input\boundary\boundary.gdb")
BOUNDARY_HAMILTON_FC = os.path.join(BOUNDARY_GDB, "boundary_hamilton")


# ------------------------------------------------------------------
# 2. Workspace dataset paths (intermediate analysis outputs)
# ------------------------------------------------------------------

WORKSPACE_GDB = os.path.join(PROJECT_ROOT, r"data\workspace\ws.gdb")
CONSTRAINTS_MERGED_FC = os.path.join(WORKSPACE_GDB, "constraints_merged")
CONSTRAINTS_DISSOLVED_FC = os.path.join(WORKSPACE_GDB, "constraints_dissolved")
PARCELS_ERASED_FC = os.path.join(WORKSPACE_GDB, "parcels_erased_constraints")
PARCELS_ZONED_FC = os.path.join(WORKSPACE_GDB, "parcels_zoned")
PARCEL_ID_FIELD = "id"

# ------------------------------------------------------------------
# 3. Output dataset paths (final analysis results and deliverables)
# ------------------------------------------------------------------

SLOPE_GDB = os.path.join(PROJECT_ROOT, r"data\output\slope.gdb")
# The path of parcels_slope dataset, which contains all calculated values and also remain all original values
PARCELS_SLOPE = os.path.join(SLOPE_GDB, "parcels_slope")
# The final deliverable feature class for parcel-level slope analysis
PARCELS_SLOPE_DELIVER = os.path.join(SLOPE_GDB, "parcels_slope_deliver")

# ------------------------------------------------------------------
# 4. Slope classification thresholds (degrees)
# ------------------------------------------------------------------

SLOPE_CLASS = [
    (0, 6, "SLOPE_0_6"),
    (6, 12, "SLOPE_6_12"),
    (12, 18, "SLOPE_12_18"),
    (18, 25, "SLOPE_18_25"),
    (25, 35, "SLOPE_25_35"),
    (35, 90, "SLOPE_35_90"),
]

In [3]:
# ==================================================================
# Preflight project structure validation
# ==================================================================
# This module performs a fail-fast validation of the project structure
# and required input datasets before running the slope analysis pipeline.
#
# Purpose
# -------
# - Ensure all required input datasets exist and are accessible.
# - Verify that the expected project folder structure is present.
# - Automatically create missing workspace and output containers
#   (directories and empty File Geodatabases), if enabled.
#
# Important notes
# ---------------
# - Input datasets are NEVER created or modified by this module.
# - Only workspace and output containers may be created.
# - This validation is intended to be run once at the start of the
#   notebook, before any GIS processing is executed.
# ==================================================================


def ensure_gdb(path):
    """
    Ensure a File Geodatabase (.gdb) exists at the given path.

    Parameters
    ----------
    path : str
        Absolute path to the target File Geodatabase, e.g.
        r"C:\\GIS_Projects\\slope_analysis_delivery\\data\\workspace\\ws.gdb".

    Notes
    -----
    - This function creates the .gdb container only (not feature classes/tables).
    - The geodatabase will be created in the parent directory if it does not exist.
    """

    folder, name = os.path.split(path)

    if not arcpy.Exists(path):
        print(f"The GDB file does not exist, now creating a new GDB file: {path}")
        arcpy.CreateFileGDB_management(folder, name)


def _exists_dataset(path: str) -> bool:
    """
    Return True if an ArcGIS dataset exists.

    This helper wraps arcpy.Exists() to safely check existence of:
    - File Geodatabases (.gdb)
    - Feature classes / tables inside a geodatabase
    - Raster datasets

    Notes
    -----
    - Use os.path.isdir / os.path.isfile for normal filesystem paths.
    - arcpy.Exists() is the correct check for ArcGIS datasets in a GDB.
    """
    try:
        return arcpy.Exists(path)
    except Exception:
        return False


def validate_project_structure(
    exit_on_fail: bool = True, create_missing_outputs: bool = True
) -> None:
    """
    Preflight validation for the slope analysis pipeline.

    This function performs a fail-fast validation of the required project
    structure and input datasets before running any GIS processing.

    Validation rules
    ----------------
    - Input folders and input datasets MUST already exist. Missing inputs
      are always treated as errors.
    - Workspace and output containers may be created automatically when
      create_missing_outputs=True (e.g., ws.gdb, slope.gdb, csv/, plots/).
    - This function NEVER creates or modifies input datasets. It only creates
      missing output containers (directories and empty .gdb files).

    Parameters
    ----------
    exit_on_fail : bool
        If True, stop execution immediately when validation errors are detected.
    create_missing_outputs : bool
        If True, create missing workspace/output containers. Inputs are never created.
    """

    errors = []
    warnings = []

    # ------------------------------------------------------------------
    # A) Required folder structure (filesystem directories)
    # ------------------------------------------------------------------
    data_dir = os.path.join(PROJECT_ROOT, "data")
    input_dir = os.path.join(PROJECT_ROOT, r"data\input")
    workspace_dir = os.path.join(PROJECT_ROOT, r"data\workspace")
    output_dir = os.path.join(PROJECT_ROOT, r"data\output")

    required_dirs = [
        ("data", data_dir),
        ("data/input", input_dir),
        ("data/workspace", workspace_dir),
        ("data/output", output_dir),
    ]

    for label, d in required_dirs:
        if not os.path.isdir(d):
            errors.append(f"[MISSING DIR] {label}: {d}")

    # ------------------------------------------------------------------
    # B) Required input datasets (ArcGIS datasets; must exist)
    # ------------------------------------------------------------------
    required_inputs = [
        ("DEM raster", DEM),
        ("Parcels GDB", PARCELS_GDB),
        ("Parcels feature class", PARCELS_RAW_FC),
        ("Constraints GDB", CONSTRAINTS_GDB),
        ("SNA feature class", SNA_FC),
        ("Open Space feature class", OPEN_SPACE_FC),
        ("Knowledge feature class", KNOWLEDGE_FC),
        ("Facilities feature class", FACILITIES_FC),
        ("Zoning GDB", ZONING_GDB),
        ("Res/Non-res zone feature class", RES_NONRES_ZONE_FC),
        ("Boundary GDB", BOUNDARY_GDB),
        ("Hamilton boundary feature class", BOUNDARY_HAMILTON_FC),
    ]

    for label, p in required_inputs:
        if not _exists_dataset(p):
            errors.append(f"[MISSING INPUT] {label}: {p}")

    # ------------------------------------------------------------------
    # C) Output containers (deliverables and derived products)
    # ------------------------------------------------------------------
    csv_dir = os.path.join(PROJECT_ROOT, r"data\output\csv")
    plots_dir = os.path.join(PROJECT_ROOT, r"data\output\plots")

    output_dirs = [
        ("data/output/csv", csv_dir),
        ("data/output/plots", plots_dir),
    ]

    for label, d in output_dirs:
        if not os.path.isdir(d):
            if create_missing_outputs:
                os.makedirs(d, exist_ok=True)
                warnings.append(f"[CREATED DIR] {label}: {d}")
            else:
                errors.append(f"[MISSING DIR] {label}: {d}")

    # Output GDB (final outputs)
    if not _exists_dataset(SLOPE_GDB):
        if create_missing_outputs:
            try:
                ensure_gdb(SLOPE_GDB)
                warnings.append(f"[CREATED GDB] slope.gdb: {SLOPE_GDB}")
            except Exception as e:
                errors.append(f"[FAILED CREATE] slope.gdb: {SLOPE_GDB} | {e}")
        else:
            errors.append(f"[MISSING OUTPUT] slope.gdb: {SLOPE_GDB}")

    # ------------------------------------------------------------------
    # D) Workspace container (intermediate datasets; can be regenerated)
    # ------------------------------------------------------------------
    if not _exists_dataset(WORKSPACE_GDB):
        if create_missing_outputs:
            try:
                ensure_gdb(WORKSPACE_GDB)
                warnings.append(f"[CREATED GDB] ws.gdb: {WORKSPACE_GDB}")
            except Exception as e:
                errors.append(f"[FAILED CREATE] ws.gdb: {WORKSPACE_GDB} | {e}")
        else:
            errors.append(f"[MISSING WORKSPACE] ws.gdb: {WORKSPACE_GDB}")

    # ------------------------------------------------------------------
    # E) Report + exit behaviour
    # ------------------------------------------------------------------
    print(
        "\n=== Preflight Validation Report "
        "(inputs must exist; workspace/output may be created) ==="
    )

    if warnings:
        print("\nWarnings / Actions:")
        for w in warnings:
            print(" -", w)

    if errors:
        print("\nErrors:")
        for e in errors:
            print(" -", e)

        msg = f"\nPreflight validation failed: {len(errors)} issue(s) found."
        if exit_on_fail:
            raise SystemExit(msg)
        else:
            print(msg)
    else:
        print(
            "\nAll required folders and datasets are present. Ready to run the pipeline.\n"
        )


# ------------------------------------------------------------------
# Entry point: run preflight validation before executing the pipeline
# ------------------------------------------------------------------
validate_project_structure(exit_on_fail=True, create_missing_outputs=True)


=== Preflight Validation Report (inputs must exist; workspace/output may be created) ===

All required folders and datasets are present. Ready to run the pipeline.



In [4]:
# ==================================================================
# ArcPy environment configuration
# ==================================================================
# Configure geoprocessing environment settings used by the slope
# analysis pipeline (workspace, spatial reference, raster alignment,
# extent, and mask).
# ==================================================================


def set_env() -> None:
    """
    Initialise ArcPy environment settings for the slope analysis pipeline.

    This function should be called once at the start of the workflow,
    before running any raster or spatial analysis tools.

    It configures:
    - workspace and overwrite behaviour
    - Spatial Analyst license checkout
    - output coordinate system (from DEM)
    - snapRaster and cellSize (aligned to DEM)
    - extent and mask (prefer Hamilton boundary; fallback to DEM extent)
    """
    # Core geoprocessing behaviour
    arcpy.env.overwriteOutput = True
    arcpy.env.workspace = WORKSPACE_GDB

    # Required extension for Spatial Analyst tools (e.g., Slope, Reclassify, TabulateArea)
    arcpy.CheckOutExtension("Spatial")

    # Read DEM properties (spatial reference, extent, etc.)
    dem_desc = arcpy.Describe(DEM)

    # Ensure outputs align to DEM (reproducible raster processing)
    arcpy.env.outputCoordinateSystem = dem_desc.spatialReference
    arcpy.env.snapRaster = DEM
    arcpy.env.cellSize = DEM

    # Constrain processing extent/mask to Hamilton boundary when available;
    # otherwise fallback to the DEM extent.
    if arcpy.Exists(BOUNDARY_HAMILTON_FC):
        arcpy.env.extent = BOUNDARY_HAMILTON_FC
        arcpy.env.mask = BOUNDARY_HAMILTON_FC
        print(f"Boundary extent/mask: {BOUNDARY_HAMILTON_FC}")
    else:
        arcpy.env.extent = dem_desc.extent
        # Mask is intentionally not set when boundary is missing
        print("Boundary feature not found; using DEM extent only.")

    print(f"Workspace: {arcpy.env.workspace}")
    print(f"Spatial Reference: {dem_desc.spatialReference.name}")


# Entry point: initialise ArcPy env once before running the pipeline
set_env()

Boundary extent/mask: C:\Gis_Projects\slope_analysis_delivery\data\input\boundary\boundary.gdb\boundary_hamilton
Workspace: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb
Spatial Reference: NZGD_2000_Transverse_Mercator


In [5]:
# ------------------------------------------------------------------
# Spatial join: attach zoning information to parcels
# ------------------------------------------------------------------


def run_spatial_join_zone_to_parcels(overwrite: bool = False) -> None:
    """
    Attach zoning information to parcels using a spatial join.

    This step assigns a zoning category (Residential / Non-Residential)
    to each parcel by performing a Spatial Join where the parcel centroid
    falls within a zoning polygon (match_option="HAVE_THEIR_CENTER_IN").

    Only parcels with a valid zoning assignment are retained.

    Output
    ------
    PARCELS_ZONED_FC
        Parcel feature class with an added zoning field, stored in
        WORKSPACE_GDB.

    Parameters
    ----------
    overwrite : bool, optional
        If True, delete and recreate the output feature class if it
        already exists. Default is False.
    """

    ensure_gdb(WORKSPACE_GDB)

    if overwrite and arcpy.Exists(PARCELS_ZONED_FC):
        print(f"Deleting existing output: {PARCELS_ZONED_FC}")
        arcpy.management.Delete(PARCELS_ZONED_FC)

    if arcpy.Exists(PARCELS_ZONED_FC) and not overwrite:
        print(f"Output exists, skip (overwrite=False): {PARCELS_ZONED_FC}")
        return

    if not arcpy.Exists(PARCELS_RAW_FC):
        raise FileNotFoundError(f"Target parcels not found: {PARCELS_RAW_FC}")
    if not arcpy.Exists(RES_NONRES_ZONE_FC):
        raise FileNotFoundError(f"Zoning FC not found: {RES_NONRES_ZONE_FC}")

    # FieldMappings: preserve all parcel attributes and append zoning field
    fms = arcpy.FieldMappings()
    fms.addTable(PARCELS_RAW_FC)

    zone_fm = arcpy.FieldMap()
    zone_fm.addInputField(RES_NONRES_ZONE_FC, RES_NONRES_ZONE_FIELD)

    out_field = zone_fm.outputField
    out_field.name = RES_NONRES_ZONE_FIELD
    out_field.aliasName = RES_NONRES_ZONE_FIELD
    zone_fm.outputField = out_field

    zone_fm.mergeRule = "First"
    fms.addFieldMap(zone_fm)

    print("Running Spatial Join (parcel centroid within zoning polygon)...")
    arcpy.analysis.SpatialJoin(
        target_features=PARCELS_RAW_FC,
        join_features=RES_NONRES_ZONE_FC,
        out_feature_class=PARCELS_ZONED_FC,
        join_operation="JOIN_ONE_TO_ONE",
        join_type="KEEP_COMMON",  # Retain only parcels with a valid zoning assignment
        field_mapping=fms,
        match_option="HAVE_THEIR_CENTER_IN",
    )

    # Post-join validation: ensure required fields exist for downstream steps
    out_fields = {f.name for f in arcpy.ListFields(PARCELS_ZONED_FC)}
    if RES_NONRES_ZONE_FIELD not in out_fields:
        raise RuntimeError(f"Zone field not found in output: {RES_NONRES_ZONE_FIELD}")
    if PARCEL_ID_FIELD not in out_fields:
        raise RuntimeError(
            f"Parcel id field missing in output ({PARCEL_ID_FIELD}). "
            "Downstream zonal/join expects it."
        )

    print(f"Created parcels-with-zone FC: {PARCELS_ZONED_FC}")


# Entry point: attach zoning information to parcels
run_spatial_join_zone_to_parcels(overwrite=True)

Running Spatial Join (parcel centroid within zoning polygon)...
Created parcels-with-zone FC: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\parcels_zoned


In [6]:
# ------------------------------------------------------------------
# Preprocess parcels by planning constraints (merge, dissolve, erase)
# ------------------------------------------------------------------


def preprocess_parcels_by_constraints(overwrite: bool = False) -> None:
    """
    Remove non-developable areas from parcels based on planning constraints.

    This step preprocesses the zoned parcel dataset by:
    1) Merging multiple planning constraint layers into a single feature class
    2) Dissolving the merged constraints to remove internal boundaries
    3) Erasing constrained areas from parcels (parcels minus constraints)

    The result represents parcels with constrained land removed, suitable
    for subsequent slope and feasibility analysis.

    Output
    ------
    PARCELS_ERASED_FC
        Parcel feature class with all constraint areas removed, stored in
        WORKSPACE_GDB.

    Parameters
    ----------
    overwrite : bool, optional
        If True, existing intermediate outputs will be deleted and recreated.
        Default is False.
    """

    ensure_gdb(WORKSPACE_GDB)

    _delete_if_exists(CONSTRAINTS_MERGED_FC, overwrite)
    _delete_if_exists(CONSTRAINTS_DISSOLVED_FC, overwrite)
    _delete_if_exists(PARCELS_ERASED_FC, overwrite)

    # Skip processing when all intermediate outputs already exist
    # and overwrite is disabled
    if (
        arcpy.Exists(CONSTRAINTS_MERGED_FC)
        and arcpy.Exists(CONSTRAINTS_DISSOLVED_FC)
        and arcpy.Exists(PARCELS_ERASED_FC)
        and not overwrite
    ):
        print("Preprocess outputs already exist, skip (overwrite=False).")
        return

    # Validate required input constraint layers and parcel dataset
    inputs = [SNA_FC, OPEN_SPACE_FC, KNOWLEDGE_FC, FACILITIES_FC]
    _require_exists(inputs)
    if not arcpy.Exists(PARCELS_ZONED_FC):
        raise FileNotFoundError(f"Raw parcels not found: {PARCELS_ZONED_FC}")

    # 1) Merge constraints
    print("Merging 4 constraint feature classes...")
    arcpy.management.Merge(inputs=inputs, output=CONSTRAINTS_MERGED_FC)
    print(f"Created: {CONSTRAINTS_MERGED_FC}")

    # 2) Dissolve constraints
    print("Dissolving merged constraints...")
    arcpy.management.Dissolve(
        in_features=CONSTRAINTS_MERGED_FC,
        out_feature_class=CONSTRAINTS_DISSOLVED_FC,
        dissolve_field=None,
        multi_part="MULTI_PART",
    )
    print(f"Created: {CONSTRAINTS_DISSOLVED_FC}")

    # 3) Erase parcels by constraints
    print("Erasing parcels by dissolved constraints (parcels minus constraints)...")
    arcpy.analysis.Erase(
        in_features=PARCELS_ZONED_FC,
        erase_features=CONSTRAINTS_DISSOLVED_FC,
        out_feature_class=PARCELS_ERASED_FC,
    )
    print(f"Created: {PARCELS_ERASED_FC}")


def _delete_if_exists(path: str, overwrite: bool) -> None:
    """
    Delete an existing ArcGIS dataset when overwrite is enabled.

    Parameters
    ----------
    path : str
        Path to the dataset to delete.
    overwrite : bool
        If True, the dataset will be deleted when it exists.
    """

    if overwrite and arcpy.Exists(path):
        print(f"Deleting existing output: {path}")
        arcpy.management.Delete(path)


def _require_exists(paths: list[str]) -> None:
    """
    Ensure that all required ArcGIS datasets exist.

    Parameters
    ----------
    paths : list of str
        Paths to required datasets.

    Raises
    ------
    FileNotFoundError
        If any required dataset is missing.
    """

    missing = [p for p in paths if not arcpy.Exists(p)]
    if missing:
        raise FileNotFoundError(f"Missing required input datasets: {missing}")


# Entry point: preprocess parcels by removing constrained areas
preprocess_parcels_by_constraints(overwrite=True)

Merging 4 constraint feature classes...
Created: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\constraints_merged
Dissolving merged constraints...
Created: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\constraints_dissolved
Erasing parcels by dissolved constraints (parcels minus constraints)...
Created: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\parcels_erased_constraints


In [7]:
# ------------------------------------------------------------------
# Slope calculation and parcel-level zonal statistics
# ------------------------------------------------------------------


def run_slope_and_zonal(overwrite=False):
    """
    Calculate slope from the DEM and derive parcel-level slope statistics.

    This step computes a slope raster (in degrees) from the input DEM and
    calculates parcel-level zonal statistics (MEAN and MAX) using parcels
    with constraint areas already removed.

    The resulting slope metrics are joined back to a parcel feature class
    for subsequent analysis and delivery.

    Parameters
    ----------
    overwrite : bool, optional
        If True, force recalculation of intermediate outputs (e.g., slope raster
        and zonal statistics table). Default is False.

        - overwrite=False:
            Reuse existing intermediate outputs when available. This speeds up
            repeated runs and avoids unnecessary recalculation.

        - overwrite=True:
            Force a full recalculation. Recommended when:
                * the DEM has changed
                * slope calculation parameters have changed
                * previous results are suspected to be incorrect
                * a full workflow refresh is required

    Workflow
    --------
    1. Compute slope raster from DEM (degrees)
    2. Calculate parcel-level zonal statistics (MEAN, MAX)
    3. Create output parcel feature class
    4. Join slope statistics to parcels

    Outputs
    -------
    slope raster
        Stored in WORKSPACE_GDB as an intermediate product.
    zonal statistics table
        Stored in WORKSPACE_GDB.
    PARCELS_SLOPE
        Parcel feature class with MEAN and MAX slope values, stored in SLOPE_GDB.
    """

    ensure_gdb(WORKSPACE_GDB)
    ensure_gdb(SLOPE_GDB)

    slope_raster = os.path.join(WORKSPACE_GDB, "slope_degree")
    stats_table = os.path.join(WORKSPACE_GDB, "zonal_all")

    # Step 1: DEM → Slope (use the Slope tool)
    if overwrite or not arcpy.Exists(slope_raster):
        print("Calculating slope raster using Slope tool...")

        out_slope = arcpy.sa.Slope(
            in_raster=DEM,
            output_measurement="DEGREE",
            z_factor=1,  # vertical exaggeration factor (units already in meters)
            method="PLANAR",
        )
        out_slope.save(slope_raster)
        print(f"Slope raster saved: {slope_raster}")
    else:
        print(f"Using existing slope raster: {slope_raster}")

    # Step 2: Calculate parcel-level zonal statistics (MEAN, MAX)
    print("Running ZonalStatisticsAsTable (ALL)...")
    arcpy.sa.ZonalStatisticsAsTable(
        in_zone_data=PARCELS_ERASED_FC,
        zone_field=PARCEL_ID_FIELD,
        in_value_raster=slope_raster,
        out_table=stats_table,
        ignore_nodata="DATA",  # ignore nodata
        statistics_type="ALL",
    )
    print(f"Zonal table saved: {stats_table}")

    # Step 3: Create output parcel feature class for slope results
    print(f"Copying parcels to output FC: {PARCELS_SLOPE}")
    arcpy.management.CopyFeatures(PARCELS_ERASED_FC, PARCELS_SLOPE)

    # Step 4: Join slope statistics (MEAN, MAX) to parcels
    fields_to_join = ["MEAN", "MAX"]
    print(f"Joining fields {fields_to_join} into {PARCELS_SLOPE}...")
    arcpy.management.JoinField(
        in_data=PARCELS_SLOPE,
        in_field=PARCEL_ID_FIELD,
        join_table=stats_table,
        join_field=PARCEL_ID_FIELD,
        fields=fields_to_join,
    )

    print(f"Created parcels slope FC: {PARCELS_SLOPE}")


# Entry point: run slope and zonal statistics step
run_slope_and_zonal(overwrite=True)

Calculating slope raster using Slope tool...
Slope raster saved: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\slope_degree
Running ZonalStatisticsAsTable (ALL)...
Zonal table saved: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\zonal_all
Copying parcels to output FC: C:\Gis_Projects\slope_analysis_delivery\data\output\slope.gdb\parcels_slope
Joining fields ['MEAN', 'MAX'] into C:\Gis_Projects\slope_analysis_delivery\data\output\slope.gdb\parcels_slope...
Created parcels slope FC: C:\Gis_Projects\slope_analysis_delivery\data\output\slope.gdb\parcels_slope


In [8]:
# ------------------------------------------------------------------
# Parcel-level slope class area calculation (TabulateArea workflow)
# ------------------------------------------------------------------


def calc_slope_class_area_by_parcel(overwrite: bool = False) -> None:
    """
    Calculate parcel-level area (m²) for each slope class.

    This step derives parcel-level slope composition by reclassifying a
    continuous slope raster (degrees) into discrete slope classes and
    computing the area of each class within individual parcels using
    the TabulateArea operation.

    The resulting slope-class area statistics are permanently joined to
    the parcel feature class as AREA_SLOPE_* fields, enabling subsequent
    parcel-, zone-, and city-level analysis of slope constraints.

    Parameters
    ----------
    overwrite : bool, optional
        If False (default), existing slope-class area tables and joined
        parcel fields are reused when available.
        If True, existing slope-class area tables and previously joined
        AREA_SLOPE_* fields are removed and fully rebuilt to reflect the
        current slope classification settings.

    Workflow
    --------
    1. Reclassify the continuous slope raster (degrees) into discrete
       integer-based slope classes defined in SLOPE_CLASS
    2. Use TabulateArea to compute the area (m²) of each slope class
       within individual parcels
    3. Join the resulting area fields back to the parcel feature class
    4. Rename fields to semantic AREA_SLOPE_* names

    Outputs
    -------
    parcel slope-class area fields
        AREA_SLOPE_* fields added to PARCELS_SLOPE and stored in SLOPE_GDB.
    """

    # Path to continuous slope raster (degrees)
    slope_raster = os.path.join(WORKSPACE_GDB, "slope_degree")

    # Output table storing parcel × slope-class area results
    area_table = os.path.join(WORKSPACE_GDB, "parcel_slope_class_area")

    # ------------------------------------------------------------------
    # Pre-checks
    # ------------------------------------------------------------------
    # Ensure parcel feature class exists before running any analysis
    if not arcpy.Exists(PARCELS_SLOPE):
        print(f"{PARCELS_SLOPE} not found, skip slope area calculation.")
        return

    # ------------------------------------------------------------------
    # Step 1 and 2: Reclassify slope raster and run TabulateArea
    # ------------------------------------------------------------------
    # If area table already exists and overwrite is False, reuse it
    if arcpy.Exists(area_table) and not overwrite:
        print("Using existing slope class area table.")
    else:

        print("Reclassifying slope raster into slope classes...")

        # Build RemapRange from SLOPE_CLASS
        # Each slope interval is mapped to a sequential integer code:
        #   1 = SLOPE_0_6, 2 = SLOPE_6_12, 3 = SLOPE_12_18,
        #   4 = SLOPE_18_25, 5 = SLOPE_25_35, 6 = SLOPE_35_90
        remap_ranges = []
        for lo, hi, _ in SLOPE_CLASS:
            remap_ranges.append([lo, hi, len(remap_ranges) + 1])
        remap = arcpy.sa.RemapRange(remap_ranges)

        # Reclassify continuous slope raster to discrete class raster
        # Any slope values outside defined ranges are set to NoData
        slope_class_raster = arcpy.sa.Reclassify(
            slope_raster,
            "Value",
            remap,
            "NODATA",
        )

        # Persist the reclassified slope raster for verification and cartographic purposes.
        # This raster is saved to allow visual inspection of slope class boundaries
        # and to support subsequent map production; it is not used in any further analysis.
        out_raster = os.path.join(WORKSPACE_GDB, "slope_class_raster_all_bins")
        slope_class_raster.save(out_raster)

        # Compute area (m²) of each slope class within each parcel
        print("Running TabulateArea (parcel x slope class)...")
        arcpy.sa.TabulateArea(
            in_zone_data=PARCELS_SLOPE,
            zone_field=PARCEL_ID_FIELD,
            in_class_data=slope_class_raster,
            class_field="Value",
            out_table=area_table,
        )

        print(f"Slope class area table created: {area_table}")

    # ------------------------------------------------------------------
    # Step 3: Join slope-class area fields back to parcel feature class
    # ------------------------------------------------------------------

    # Collect VALUE_* fields generated by TabulateArea that should be joined
    print("Joining slope class area fields back to parcels...")
    join_fields = []
    for idx, (_, _, label) in enumerate(SLOPE_CLASS, start=1):
        src = f"VALUE_{idx}"  # field name in area_table
        dst = f"AREA_{label}"  # intended final field name in parcels

        if src in [f.name for f in arcpy.ListFields(area_table)]:
            join_fields.append(src)

    # Identify existing joined fields in parcels
    # This includes both raw VALUE_* fields and renamed AREA_SLOPE_* fields
    existing_fields = [f.name for f in arcpy.ListFields(PARCELS_SLOPE)]
    to_delete = [
        f
        for f in existing_fields
        if f.startswith("VALUE_") or f.startswith("AREA_SLOPE_")
    ]

    # If overwrite=True, remove previously joined fields to ensure a clean rebuild
    if overwrite and to_delete:
        print(f"Deleting existing joined fields: {len(to_delete)} fields")
        arcpy.management.DeleteField(PARCELS_SLOPE, to_delete)

    # Permanently join slope-class area fields to parcel feature class
    arcpy.management.JoinField(
        in_data=PARCELS_SLOPE,
        in_field=PARCEL_ID_FIELD,
        join_table=area_table,
        join_field=PARCEL_ID_FIELD,
        fields=join_fields,
    )

    # ------------------------------------------------------------------
    # Step 4: Rename VALUE_* fields to semantic AREA_SLOPE_* field names
    # ------------------------------------------------------------------
    existing_fields = {f.name for f in arcpy.ListFields(PARCELS_SLOPE)}

    for idx, (_, _, label) in enumerate(SLOPE_CLASS, start=1):
        old = f"VALUE_{idx}"  # default field name from TabulateArea
        new = f"AREA_{label}"  # descriptive, analysis-ready field name

        # Skip if VALUE_* field does not exist (already renamed or missing)
        if old not in existing_fields:
            continue

        # Skip renaming if target field name already exists
        if new in existing_fields:
            print(f"Field already exists, skip rename: {old} -> {new}")
            continue

        # Rename field and update alias
        arcpy.management.AlterField(
            PARCELS_SLOPE,
            old,
            new,
            new,
        )

        # Update cached field name set to reflect the rename
        existing_fields.remove(old)
        existing_fields.add(new)

    print("Slope class area calculation for each parcel finished.")


# Entry point: calculate parcel-level slope class areas
calc_slope_class_area_by_parcel(overwrite=True)

Reclassifying slope raster into slope classes...
Running TabulateArea (parcel x slope class)...
Slope class area table created: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\parcel_slope_class_area
Joining slope class area fields back to parcels...
Slope class area calculation for each parcel finished.


In [9]:
# ------------------------------------------------------------------
# Export steep slope zones (>=18°) and intersect with parcels
# ------------------------------------------------------------------


def export_steep_slope_zones(overwrite: bool = False) -> None:
    """
    Generate steep slope polygons (>=18 degrees) and intersect them with parcels.

    This step extracts steep slope areas from the slope raster, converts them
    into polygon features, and intersects them with parcel boundaries to
    identify the location and extent of steep terrain within individual parcels.

    The output supports parcel-level analysis of steep slope distribution,
    enabling assessment of where and how steep slopes occur within parcels
    rather than simply indicating their presence or absence.

    Parameters
    ----------
    overwrite : bool, optional
        If True, existing steep slope outputs are deleted and fully rebuilt.
        If False (default), existing outputs are reused when available.

    Workflow
    --------
    1. Reclassify slope raster to retain steep slopes (>=18 degrees)
    2. Convert steep slope raster to polygon features
    3. Intersect steep slope polygons with parcels
    4. Calculate area (m²) for each parcel-level steep slope feature

    Outputs
    -------
    city_level_steep_slope_zones
        Polygon features representing contiguous steep slope areas (>=18°),
        classified into three steepness categories (18-25, 25-35, 35-90 degrees).
    parcel_level_steep_slope_zones
        Parcel-split steep slope polygons with slope_class labels and area (m²),
        stored in SLOPE_GDB.
    """

    # ------------------------------------------------------------------
    # Input raster: continuous slope values (degrees)
    # Each cell value represents the local slope angle in degrees
    # ------------------------------------------------------------------
    slope_raster = os.path.join(WORKSPACE_GDB, "slope_degree")

    # Ensure parcel layer exists; steep slope zones are only meaningful
    # when analysed within parcel boundaries
    if not arcpy.Exists(PARCELS_SLOPE):
        print(f"{PARCELS_SLOPE} not found, skip steep slope zone export.")
        return

    # Ensure slope raster exists; it must be generated beforehand
    if not arcpy.Exists(slope_raster):
        raise ValueError(
            f"{slope_raster} not found. Please run run_slope_and_zonal() first."
        )

    # ------------------------------------------------------------------
    # Output paths and overwrite handling
    # ------------------------------------------------------------------
    # These outputs are reused across runs unless overwrite=True
    out_steep_poly = os.path.join(SLOPE_GDB, "city_level_steep_slope_zones")
    out_parcel_steep = os.path.join(SLOPE_GDB, "parcel_level_steep_slope_zones")

    # If outputs already exist:
    # - overwrite=False → skip processing to avoid unnecessary recomputation
    # - overwrite=True  → delete and fully rebuild outputs
    for p in (out_steep_poly, out_parcel_steep):
        if arcpy.Exists(p):
            if overwrite:
                print(f"Deleting existing output: {p}")
                arcpy.management.Delete(p)
            else:
                print(f"Output exists (overwrite=False), skip: {p}")
                return

    # ------------------------------------------------------------------
    # Step 1: Reclassify slope raster to retain only steep slopes (>=18°)
    # ------------------------------------------------------------------
    # This step performs two operations simultaneously:
    # 1) Thresholding: slope values <18° are excluded (assigned NoData)
    # 2) Discrete classification: retained values are mapped to class codes
    print("Reclassifying slope raster to steep bins (>=18 degrees)...")

    # Build RemapRange dynamically from SLOPE_CLASS,
    # but only keep slope intervals >=18°
    remap_ranges = []
    code = 0
    for lo, hi, _ in SLOPE_CLASS:
        # Skip non-steep slope classes
        if hi <= 18:
            continue
        code += 1
        remap_ranges.append([lo, hi, code])

    # Class code mapping:
    #   1 = SLOPE_18_25
    #   2 = SLOPE_25_35
    #   3 = SLOPE_35_90
    remap = arcpy.sa.RemapRange(remap_ranges)

    # Reclassify the continuous slope raster:
    # - Cells <18° → NoData
    # - Cells >=18° → integer class codes (1, 2, 3)
    steep_raster = arcpy.sa.Reclassify(
        in_raster=slope_raster,
        reclass_field="Value",
        remap=remap,
        missing_values="NODATA",  # explicitly mask non-steep slopes
    )

    # Persist intermediate raster for QA and validation purposes only.
    # This raster supports visual inspection and comparison with derived polygons;
    # it is not used in downstream analytical steps.
    steep_raster_path = os.path.join(
        WORKSPACE_GDB, "city_level_steep_slope_classified_raster"
    )
    steep_raster.save(steep_raster_path)
    print(f"Saved steep raster: {steep_raster_path}")

    # ------------------------------------------------------------------
    # Step 2: Convert steep slope raster to polygon features
    # ------------------------------------------------------------------
    # Contiguous raster cells with the same class code are converted
    # into polygon features representing steep slope zones
    print("Converting steep raster to polygons (RasterToPolygon)...")

    arcpy.conversion.RasterToPolygon(
        in_raster=steep_raster_path,
        out_polygon_features=out_steep_poly,
        simplify="SIMPLIFY",
        raster_field="Value",  # preserve slope class codes (1/2/3)
    )

    # ------------------------------------------------------------------
    # Add human-readable slope class labels to polygon output
    # ------------------------------------------------------------------
    # RasterToPolygon may output either 'gridcode' or 'Value'
    value_field = (
        "gridcode"
        if "gridcode" in [f.name for f in arcpy.ListFields(out_steep_poly)]
        else "Value"
    )

    # Create a text field to store descriptive slope class labels
    if "slope_class" not in [f.name for f in arcpy.ListFields(out_steep_poly)]:
        arcpy.management.AddField(
            out_steep_poly, "slope_class", "TEXT", field_length=32
        )

    # Map integer class codes to semantic slope class labels
    code_to_label = {
        1: "SLOPE_18_25",
        2: "SLOPE_25_35",
        3: "SLOPE_35_90",
    }

    # Populate slope_class field based on class code
    with arcpy.da.UpdateCursor(out_steep_poly, [value_field, "slope_class"]) as cursor:
        for v, _ in cursor:
            cursor.updateRow((v, code_to_label.get(int(v), "UNKNOWN")))

    print(f"Created steep slope polygons: {out_steep_poly}")

    # ------------------------------------------------------------------
    # Step 3: Intersect steep slope polygons with parcels
    # ------------------------------------------------------------------
    # This operation splits steep slope polygons by parcel boundaries.
    # Each resulting feature represents a specific portion of a parcel
    # occupied by a particular steep slope class.
    print("Intersecting steep polygons with parcels...")

    arcpy.analysis.Intersect(
        in_features=[PARCELS_SLOPE, out_steep_poly],
        out_feature_class=out_parcel_steep,
        join_attributes="ALL",  # retain attributes from both inputs
        cluster_tolerance=None,
        output_type="INPUT",
    )

    # ------------------------------------------------------------------
    # Step 4: Calculate area (m²) for each parcel-level steep slope feature
    # ------------------------------------------------------------------
    # The calculated area supports parcel-level quantification of
    # steep slope extent and distribution
    if "area_m2" not in [f.name for f in arcpy.ListFields(out_parcel_steep)]:
        arcpy.management.AddField(out_parcel_steep, "area_m2", "DOUBLE")

    arcpy.management.CalculateGeometryAttributes(
        out_parcel_steep,
        [["area_m2", "AREA"]],
        area_unit="SQUARE_METERS",
    )

    print(f"Created parcel steep slope zones: {out_parcel_steep}")
    print("Steep slope zone export finished.")


# Entry point: export steep slope zones and intersect with parcels
export_steep_slope_zones(overwrite=True)

Reclassifying slope raster to steep bins (>=18 degrees)...
Saved steep raster: C:\Gis_Projects\slope_analysis_delivery\data\workspace\ws.gdb\city_level_steep_slope_classified_raster
Converting steep raster to polygons (RasterToPolygon)...
Created steep slope polygons: C:\Gis_Projects\slope_analysis_delivery\data\output\slope.gdb\city_level_steep_slope_zones
Intersecting steep polygons with parcels...
Created parcel steep slope zones: C:\Gis_Projects\slope_analysis_delivery\data\output\slope.gdb\parcel_level_steep_slope_zones
Steep slope zone export finished.


In [10]:
# ------------------------------------------------------------------
# Export parcels affected by steep slopes (parcel-level selection)
# ------------------------------------------------------------------


def export_steep_parcels(overwrite: bool = False) -> None:
    """
    Export parcels that are affected by steep slopes (>=18 degrees).

    This step selects and exports whole parcel polygons that contain
    any non-zero area in steep slope classes (18-25, 25-35, or 35-90 degrees),
    based on previously calculated parcel-level slope area fields.

    The output represents parcel-level exposure to steep slopes, rather
    than the geometry of steep slope areas themselves.

    Selection rule
    --------------
    A parcel is selected if:
        AREA_SLOPE_18_25 > 0
        OR AREA_SLOPE_25_35 > 0
        OR AREA_SLOPE_35_90 > 0

    Parameters
    ----------
    overwrite : bool, optional
        If True, existing output will be deleted and rebuilt.
        If False (default), existing output will be reused.
    """

    # ------------------------------------------------------------------
    # Pre-check: ensure parcel dataset exists
    # ------------------------------------------------------------------

    if not arcpy.Exists(PARCELS_SLOPE):
        print(f"{PARCELS_SLOPE} not found, skip steep parcel export.")
        return

    # ------------------------------------------------------------------
    # Output feature class
    # ------------------------------------------------------------------

    # This output contains whole parcel polygons affected by steep slopes (>=18°);
    # it does not represent steep slope geometries themselves.
    out_fc = os.path.join(SLOPE_GDB, "parcels_affected_by_steep_slopes")

    # Handle overwrite behaviour:
    # - overwrite=False → reuse existing output and skip processing
    # - overwrite=True  → delete existing output and rebuild
    if arcpy.Exists(out_fc) and not overwrite:
        print(f"{out_fc} already exists, skip (overwrite=False).")
        return
    if arcpy.Exists(out_fc) and overwrite:
        print(f"Deleting existing output: {out_fc}")
        arcpy.management.Delete(out_fc)

    # ------------------------------------------------------------------
    # Precondition check: required slope area fields
    # ------------------------------------------------------------------

    # These AREA_SLOPE_* fields store the parcel-level area (m²) of
    # steep slope classes and are generated by calc_slope_class_area_by_parcel().
    # Without them, parcel-level steep slope selection is not possible
    required_fields = {"AREA_SLOPE_18_25", "AREA_SLOPE_25_35", "AREA_SLOPE_35_90"}
    existing_fields = {f.name for f in arcpy.ListFields(PARCELS_SLOPE)}
    missing = sorted(required_fields - existing_fields)
    if missing:
        raise ValueError(
            "Missing required area fields for steep selection. "
            f"Please run calc_slope_class_area_by_parcel() first. Missing: {missing}"
        )

    # ------------------------------------------------------------------
    # Selection logic
    # ------------------------------------------------------------------
    # A parcel is considered affected by steep slopes if it contains
    # any non-zero area in one or more steep slope classes (>=18°).
    # This is a parcel-level boolean selection, not a spatial extraction
    # of steep slope geometry.
    where = "AREA_SLOPE_18_25 > 0 OR " "AREA_SLOPE_25_35 > 0 OR " "AREA_SLOPE_35_90 > 0"

    print("Selecting parcels with slope area >= 18 degrees (by area fields)")

    # Create a temporary feature layer to apply attribute selection
    lyr = "parcels_slope_lyr"
    arcpy.management.MakeFeatureLayer(PARCELS_SLOPE, lyr)

    # Apply attribute-based selection using slope area fields
    arcpy.management.SelectLayerByAttribute(lyr, "NEW_SELECTION", where)

    # Report number of selected parcels for logging / QA
    selected_count = int(arcpy.management.GetCount(lyr)[0])
    print(f"Selected parcels: {selected_count}")

    # ------------------------------------------------------------------
    # Export selected parcels
    # ------------------------------------------------------------------
    # The exported feature class contains full parcel geometries
    # for all parcels affected by steep slopes.
    print(f"Exporting steep parcels to: {out_fc}")
    arcpy.management.CopyFeatures(lyr, out_fc)

    # Clean up temporary layer
    arcpy.management.Delete(lyr)

    print("Steep parcel export finished.")


# Entry point: export parcels affected by steep slopes
export_steep_parcels(overwrite=True)

Selecting parcels with slope area >= 18 degrees (by area fields)
Selected parcels: 23663
Exporting steep parcels to: C:\Gis_Projects\slope_analysis_delivery\data\output\slope.gdb\parcels_affected_by_steep_slopes
Steep parcel export finished.


In [11]:
# ==================================================================
# Parcel-level steep slope summary metrics (attribute-based)
# ==================================================================
# This module derives parcel-level steep slope summary metrics from
# existing AREA_SLOPE_* fields generated by the slope classification
# and TabulateArea workflow.
#
# IMPORTANT:
# - This module performs NO spatial analysis.
# - All operations are attribute-based calculations on an existing
#   feature class.
# - Input AREA_SLOPE_* fields must already exist.
# ==================================================================


SLOPE_BINS = [
    "AREA_SLOPE_0_6",
    "AREA_SLOPE_6_12",
    "AREA_SLOPE_12_18",
    "AREA_SLOPE_18_25",
    "AREA_SLOPE_25_35",
    "AREA_SLOPE_35_90",
]

# Subset of slope bins considered "steep slopes" (>= 18 degrees)
STEEP_BINS = ["AREA_SLOPE_18_25", "AREA_SLOPE_25_35", "AREA_SLOPE_35_90"]

# ------------------------------------------------------------------
# Derived output field names
# ------------------------------------------------------------------
# TOTAL_FIELD:
#   Total slope area within each parcel (sum of all AREA_SLOPE_* fields)
TOTAL_FIELD = "TOTAL_SLOPE_AREA_M2"

# STEEP_M2_FIELD:
#   Total steep slope area (>=18°) within each parcel (m²)
STEEP_M2_FIELD = "STEEP_AREA_M2"

# STEEP_PCT_FIELD:
#   Percentage of steep slope area relative to total slope area (%)
STEEP_PCT_FIELD = "STEEP_AREA_PCT"

# BIN_FIELD:
#   Discrete 5% interval classification of STEEP_AREA_PCT
BIN_FIELD = "STEEP_BIN"


# ------------------------------------------------------------------
# Utility functions
# ------------------------------------------------------------------
def _ensure_field(fc: str, name: str, ftype: str, length: int | None = None) -> None:
    """
    Ensure that a field exists on the target feature class.

    If the field already exists, no action is taken.
    """

    fields = {f.name for f in arcpy.ListFields(fc)}
    if name in fields:
        return
    if ftype.upper() == "TEXT":
        arcpy.management.AddField(fc, name, ftype, field_length=length or 50)
    else:
        arcpy.management.AddField(fc, name, ftype)


def _calc_total_area(fc: str) -> None:
    """
    Calculate TOTAL_SLOPE_AREA_M2 as the sum of all slope-class area fields.
    Represents the total slope-covered area within each parcel.
    """

    # Treat NULL/None as 0 to avoid null propagation during summation
    expression = " + ".join([f"(!{c}! or 0)" for c in SLOPE_BINS])
    arcpy.management.CalculateField(fc, TOTAL_FIELD, expression, "PYTHON3")


def _calc_steep_area(fc: str) -> None:
    """
    Calculate STEEP_AREA_M2 as the sum of slope areas >=18 degrees.
    Represents the absolute extent of steep terrain within each parcel.
    """

    # Treat NULL/None as 0 to avoid null propagation during summation
    expression = " + ".join([f"(!{c}! or 0)" for c in STEEP_BINS])
    arcpy.management.CalculateField(fc, STEEP_M2_FIELD, expression, "PYTHON3")


def _calc_steep_pct(fc: str) -> None:
    """
    Calculate STEEP_AREA_PCT as the proportion of steep slope area
    relative to total slope area within each parcel.
    """

    # Guard against NULL/None and divide-by-zero
    expression = (
        f"(100.0 * (!{STEEP_M2_FIELD}! or 0) / (!{TOTAL_FIELD}! or 0)) "
        f"if (!{TOTAL_FIELD}! or 0) > 0 else 0"
    )
    arcpy.management.CalculateField(fc, STEEP_PCT_FIELD, expression, "PYTHON3")


def _calc_bin(fc: str) -> None:
    """
    Classify STEEP_AREA_PCT into discrete 5% interval bins (e.g. 0-5%, 5-10%, …, 95-100%).

    The resulting STEEP_BIN field supports parcel-level grouping,
    statistical summaries, and categorical visualisation.
    """

    code_block = """
def bin_label(pct):
    if pct is None:
        return None
    try:
        pct = float(pct)
    except:
        return None    

    # handle 100 explicitly
    if pct >= 100:
        return "95-100%"

    edges = list(range(0, 105, 5))  # 0...100
    for i in range(len(edges)-1):
        lo = edges[i]
        hi = edges[i+1]
        if lo <= pct < hi:
            return f"{lo}-{hi}%"
    return None
"""

    arcpy.management.CalculateField(
        fc,
        BIN_FIELD,
        f"bin_label(!{STEEP_PCT_FIELD}!)",
        "PYTHON3",
        code_block,
    )


def add_steep_bins(fc: str) -> None:
    """
    Derive and populate parcel-level steep slope summary fields.

    This function computes parcel-level steep slope metrics from
    precomputed AREA_SLOPE_* fields, including:
      - total slope area
      - total steep slope area (>=18°)
      - percentage of steep slope area
      - 5% interval steep slope proportion bins

    Preconditions
    -------------
    - The target feature class must already contain AREA_SLOPE_* fields
      generated by calc_slope_class_area_by_parcel().
    """

    if not arcpy.Exists(fc):
        raise FileNotFoundError(f"Target FC not found: {fc}")

    # Verify required input fields exist
    existing = {f.name for f in arcpy.ListFields(fc)}
    missing_bins = [c for c in SLOPE_BINS if c not in existing]
    if missing_bins:
        raise ValueError(
            "Missing required slope area fields on target FC. "
            f"Missing: {missing_bins}. Run calc_slope_class_area_by_parcel() first."
        )

    # Ensure derived fields exist
    _ensure_field(fc, TOTAL_FIELD, "DOUBLE")
    _ensure_field(fc, STEEP_M2_FIELD, "DOUBLE")
    _ensure_field(fc, STEEP_PCT_FIELD, "DOUBLE")
    _ensure_field(fc, BIN_FIELD, "TEXT", length=12)

    # Perform attribute calculations
    print("Calculating TOTAL_SLOPE_AREA_M2...")
    _calc_total_area(fc)

    print("Calculating STEEP_AREA_M2...")
    _calc_steep_area(fc)

    print("Calculating STEEP_AREA_PCT...")
    _calc_steep_pct(fc)

    print("Calculating STEEP_BIN...")
    _calc_bin(fc)

    print("Steep bin fields added and calculated.")

In [12]:
# ------------------------------------------------------------------
# Build delivery-ready parcel slope Feature Class
# ------------------------------------------------------------------
def build_parcels_slope_delivery_fc(in_fc: str, out_fc: str) -> str:
    """
    Build a delivery-ready parcel slope Feature Class using a whitelist strategy.

    This function prepares a clean, analysis-ready Feature Class for downstream
    delivery (CSV export, pandas analysis, reporting) by:
      - copying the source parcel Feature Class
      - retaining only an explicitly approved set of delivery fields
      - removing all intermediate and non-essential attributes

    The whitelist approach ensures a stable and controlled delivery schema,
    independent of internal processing or intermediate fields.

    Parameters
    ----------
    in_fc : str
        Input parcel Feature Class containing full analysis results.
    out_fc : str
        Output delivery Feature Class path.

    Returns
    -------
    str
        Path to the cleaned, delivery-ready Feature Class.
    """

    # These fields represent:
    # - parcel identifiers and zoning context
    # - geometric attributes
    # - parcel-level slope metrics (area by slope class)
    keep_fields = [
        "id",
        "Zone",
        "calc_area",
        "Shape_Length",
        "Shape_Area",
        "MEAN",
        "MAX",
        "AREA_SLOPE_0_6",
        "AREA_SLOPE_6_12",
        "AREA_SLOPE_12_18",
        "AREA_SLOPE_18_25",
        "AREA_SLOPE_25_35",
        "AREA_SLOPE_35_90",
    ]

    # Ensure input feature class exists
    if not arcpy.Exists(in_fc):
        raise FileNotFoundError(f"Input FC not found: {in_fc}")

    # ------------------------------------------------------------------
    # Create a fresh delivery FC
    # Always rebuild to ensure a clean output
    # ------------------------------------------------------------------
    if arcpy.Exists(out_fc):
        arcpy.management.Delete(out_fc)

    print(f"Creating delivery FC: {out_fc}")
    arcpy.management.CopyFeatures(in_fc, out_fc)

    # ------------------------------------------------------------------
    # Identify system-managed fields that must never be deleted
    # ------------------------------------------------------------------
    desc = arcpy.Describe(out_fc)
    oid_field = getattr(desc, "OIDFieldName", None)
    shape_field = getattr(desc, "shapeFieldName", None)

    system_keep = {f for f in [oid_field, shape_field] if f}

    # Final whitelist = user-defined fields + required system fields
    keep_set = set(keep_fields) | system_keep

    # ------------------------------------------------------------------
    # Field cleanup using whitelist strategy
    # ------------------------------------------------------------------
    existing = [f.name for f in arcpy.ListFields(out_fc)]

    # Warn if expected delivery fields are missing
    missing = [f for f in keep_fields if f not in existing]
    if missing:
        print(f"WARNING: Missing expected fields in delivery FC: {missing}")

    # Delete all fields not explicitly approved for delivery
    to_delete = [f for f in existing if f not in keep_set]
    if to_delete:
        print(f"Deleting {len(to_delete)} fields from delivery FC.")
        arcpy.management.DeleteField(out_fc, to_delete)
    else:
        print("No fields to delete; delivery FC already clean.")

    print("Delivery FC created and cleaned.")
    return out_fc


# ------------------------------------------------------------------
# Final delivery: export parcel slope metrics to CSV
# ------------------------------------------------------------------
def export_parcels_slope_to_csv() -> None:
    """
    Export parcel-level slope and steep-slope metrics to CSV for analysis.

    This function represents the final delivery step of the parcel-level
    slope analysis workflow. It:
      1. Builds a clean, delivery-ready parcel Feature Class
      2. Derives steep-slope summary metrics on the delivery dataset
      3. Exports the resulting attribute table to CSV

    The exported CSV is analysis-ready and intended for:
      - pandas-based statistical analysis
      - plotting and visualisation
      - reporting and appendix tables
    """

    # ------------------------------------------------------------------
    # Pre-check: ensure parcel slope dataset exists
    # ------------------------------------------------------------------
    if not arcpy.Exists(PARCELS_SLOPE):
        print(f"{PARCELS_SLOPE} not found, skip CSV export.")
        return

    # ------------------------------------------------------------------
    # Output directory for CSV delivery
    # ------------------------------------------------------------------
    out_dir = os.path.join(PROJECT_ROOT, r"data\output\csv")
    os.makedirs(out_dir, exist_ok=True)

    out_name = "parcels_slope_steep_metrics.csv"
    out_path = os.path.join(out_dir, out_name)

    # ------------------------------------------------------------------
    # Step 1: Build delivery-clean Feature Class
    # This removes all intermediate and non-essential fields
    # ------------------------------------------------------------------
    delivery_fc = build_parcels_slope_delivery_fc(PARCELS_SLOPE, PARCELS_SLOPE_DELIVER)

    # ------------------------------------------------------------------
    # Step 2: Derive steep-slope metrics on the delivery FC
    # (TOTAL_SLOPE_AREA_M2, STEEP_AREA_M2, STEEP_AREA_PCT, STEEP_BIN)
    # ------------------------------------------------------------------
    add_steep_bins(delivery_fc)

    # ------------------------------------------------------------------
    # Step 3: Export attribute table to CSV for pandas
    # ------------------------------------------------------------------
    print(f"Exporting delivery parcels_slope to CSV: {out_path}")

    arcpy.conversion.TableToTable(
        in_rows=delivery_fc,
        out_path=out_dir,
        out_name=out_name,
    )

    print("CSV export finished.")


# Entry point: final CSV delivery of parcel slope metrics
export_parcels_slope_to_csv()

Creating delivery FC: C:\Gis_Projects\slope_analysis_delivery\data\output\slope.gdb\parcels_slope_deliver
Deleting 10 fields from delivery FC.
Delivery FC created and cleaned.
Calculating TOTAL_SLOPE_AREA_M2...
Calculating STEEP_AREA_M2...
Calculating STEEP_AREA_PCT...
Calculating STEEP_BIN...
Steep bin fields added and calculated.
Exporting delivery parcels_slope to CSV: C:\Gis_Projects\slope_analysis_delivery\data\output\csv\parcels_slope_steep_metrics.csv
CSV export finished.


In [13]:
# ==================================================================
# Parcel-level steep slope analysis (post-processing, attribute-only)
# ==================================================================
# This script performs attribute-based analysis on parcel-level outputs
# exported from the ArcPy slope analysis pipeline (CSV delivery).
#
# Scope
# -----
# - Load parcel-level slope metrics from CSV
# - Produce city-level, zone-level, and parcel-level summary tables
# - Derive steep-slope exposure statistics using predefined slope bins
# - Export analysis-ready CSV outputs for reporting and plotting
#
# Notes
# -----
# - No spatial operations are performed in this script.
# - All calculations are based on tabulated slope-area attributes.
# - Designed to be reproducible and runnable as a standalone analysis step.
# ==================================================================

# --------------------------------------------------
# Paths
# --------------------------------------------------

# Output directory for derived CSV tables produced by this script.
CSV_PATH = os.path.join(
    PROJECT_ROOT,
    r"data\output\csv\parcels_slope_steep_metrics.csv",
)

# Output directory for all derived CSV tables generated by this script.
OUT_DIR = os.path.join(PROJECT_ROOT, r"data\output\csv")


# --------------------------------------------------
# Constant fields (analysis schema)
# These column names define the expected and stable schema of the delivery CSV.
# --------------------------------------------------

# Slope-area fields representing discrete slope classes (degrees).
# These fields are produced by the ArcPy slope classification + TabulateArea workflow.
SLOPE_BINS = [
    "AREA_SLOPE_0_6",
    "AREA_SLOPE_6_12",
    "AREA_SLOPE_12_18",
    "AREA_SLOPE_18_25",
    "AREA_SLOPE_25_35",
    "AREA_SLOPE_35_90",
]

# Core attribute fields expected in the input CSV
ZONE_COL = "Zone"  # Residential / Non-Residential classification
PARCEL_ID_COL = "id"  # Unique parcel identifier

# Optional derived fields that may already exist in the CSV.
# If present, they will be reused to maintain consistency with ArcPy outputs.
OPTIONAL_COLS = [
    "TOTAL_SLOPE_AREA_M2",
    "STEEP_AREA_M2",
    "STEEP_AREA_PCT",
    "STEEP_BIN",
]

# Zones explicitly targeted in comparative analysis
TARGET_ZONES = ["Residential Zone", "Non Residential Zone"]

os.makedirs(OUT_DIR, exist_ok=True)

# --------------------------------------------------
# Utilities
# --------------------------------------------------


def round_pct(df: pd.DataFrame, cols: list[str], ndigits: int = 1) -> pd.DataFrame:
    """
    Round percentage columns for presentation only.

    Notes
    -----
    - Applied at final output stage.
    - Rounding may cause totals to deviate slightly from 100%.
    """

    df = df.copy()
    for c in cols:
        if c in df.columns:
            df[c] = df[c].round(ndigits)
    return df


def normalize_steep_bin_label(val: object) -> str:
    """
    Normalise STEEP_BIN labels into a canonical format (e.g., '10-15%').

    This function does not reclassify data; it standardises label formatting
    to ensure reliable grouping and aggregation.
    """

    if val is None or (isinstance(val, float) and pd.isna(val)):
        return ""

    s = str(val).strip()
    if not s:
        return ""

    # Replace en dash / em dash with hyphen
    s = s.replace("–", "-").replace("—", "-")

    # Remove spaces
    s = re.sub(r"\s+", "", s)

    # Ensure it ends with %
    if not s.endswith("%"):
        s = s + "%"

    # Canonicalize patterns like "10to15%" if any (rare)
    s = s.replace("to", "-")

    return s


def expected_bins_0_to_100(step: int = 5) -> list[str]:
    """
    Return the expected steep-bin sequence for reporting (0-100% in fixed steps).

    This ensures bins with zero counts are still present in output tables.
    """

    bins = []
    for lo in range(0, 100, step):
        hi = lo + step
        bins.append(f"{lo}-{hi}%")
    return bins


# --------------------------------------------------
# Load & clean
# --------------------------------------------------


def load_parcel_slope_data() -> pd.DataFrame:
    """
    Load the delivery CSV and enforce a clean analysis schema.

    Responsibilities
    ----------------
    - Validate presence of required base fields (id, Zone, AREA_SLOPE_*)
    - Retain optional derived fields if already present
    - Enforce numeric types for slope-area attributes
    - Normalise STEEP_BIN labels for consistent aggregation

    Notes
    -----
    - No spatial metrics are recomputed here.
    - This mirrors downstream logic while preserving upstream results when available.
    """

    df = pd.read_csv(CSV_PATH)

    required_cols = [PARCEL_ID_COL, ZONE_COL] + SLOPE_BINS

    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Keep required columns + optional columns if present in the CSV
    keep_cols = required_cols + [c for c in OPTIONAL_COLS if c in df.columns]
    df = df[keep_cols].copy()

    # Keep only parcels with valid Zone (equivalent to KEEP_COMMON in Spatial Join)
    df = df[df[ZONE_COL].notna()].copy()

    # Ensure numeric area values for slope bins
    for c in SLOPE_BINS:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0)

    # Total slope-covered area per parcel (compute only if not already present)
    if "TOTAL_SLOPE_AREA_M2" not in df.columns:
        df["TOTAL_SLOPE_AREA_M2"] = df[SLOPE_BINS].sum(axis=1)
    else:
        df["TOTAL_SLOPE_AREA_M2"] = pd.to_numeric(
            df["TOTAL_SLOPE_AREA_M2"], errors="coerce"
        ).fillna(0.0)

    # If STEEP_AREA_* exist in CSV, keep them as numeric; otherwise, they will be computed
    # only where needed by parcel-level function (to keep behavior consistent).
    if "STEEP_AREA_M2" in df.columns:
        df["STEEP_AREA_M2"] = pd.to_numeric(
            df["STEEP_AREA_M2"], errors="coerce"
        ).fillna(0.0)
    if "STEEP_AREA_PCT" in df.columns:
        df["STEEP_AREA_PCT"] = pd.to_numeric(
            df["STEEP_AREA_PCT"], errors="coerce"
        ).fillna(0.0)

    # Normalize STEEP_BIN values if present (does not change meaning, only formatting)
    if "STEEP_BIN" in df.columns:
        df["STEEP_BIN"] = df["STEEP_BIN"].apply(normalize_steep_bin_label)

    return df


# --------------------------------------------------
# City-level statistics
# --------------------------------------------------


def city_level_slope_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute city-level slope-area totals and percentages.

    Returns
    -------
    DataFrame with columns:
    - area_m2: total area (m²) per slope class
    - area_pct: percentage of total slope-covered area
    """

    area = df[SLOPE_BINS].sum()
    pct = area / area.sum() * 100.0

    out = pd.DataFrame({"area_m2": area, "area_pct": pct})
    return out.sort_values("area_m2", ascending=False)


# --------------------------------------------------
# Zone-level statistics
# --------------------------------------------------


def zone_level_slope_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute zone-level slope exposure statistics.

    Returns
    -------
    DataFrame indexed by Zone, including:
    - TOTAL_AREA_M2
    - AREA_18PLUS_M2 / PCT_18PLUS
    - AREA_25PLUS_M2 / PCT_25PLUS
    - AREA_35PLUS_M2 / PCT_35PLUS
    """

    zone_area = df.groupby(ZONE_COL)[SLOPE_BINS].sum()
    zone_area["TOTAL_AREA_M2"] = zone_area.sum(axis=1)

    zone_area["AREA_18PLUS_M2"] = (
        zone_area["AREA_SLOPE_18_25"]
        + zone_area["AREA_SLOPE_25_35"]
        + zone_area["AREA_SLOPE_35_90"]
    )
    zone_area["AREA_25PLUS_M2"] = (
        zone_area["AREA_SLOPE_25_35"] + zone_area["AREA_SLOPE_35_90"]
    )
    zone_area["AREA_35PLUS_M2"] = zone_area["AREA_SLOPE_35_90"]

    zone_area["PCT_18PLUS"] = (
        zone_area["AREA_18PLUS_M2"] / zone_area["TOTAL_AREA_M2"] * 100.0
    )
    zone_area["PCT_25PLUS"] = (
        zone_area["AREA_25PLUS_M2"] / zone_area["TOTAL_AREA_M2"] * 100.0
    )
    zone_area["PCT_35PLUS"] = (
        zone_area["AREA_35PLUS_M2"] / zone_area["TOTAL_AREA_M2"] * 100.0
    )

    return zone_area.sort_values("TOTAL_AREA_M2", ascending=False)


def export_res_vs_nonres(zone_df: pd.DataFrame) -> pd.DataFrame:
    """
    Extract a Residential vs Non-Residential comparison table from zone-level stats.

    Returns a two-row DataFrame for the target zones with core exposure metrics.
    """

    available = [z for z in TARGET_ZONES if z in zone_df.index]
    if len(available) < 2:
        raise ValueError(
            "Expected both Residential Zone and Non Residential Zone in zone data."
        )

    cols = [
        "TOTAL_AREA_M2",
        "AREA_18PLUS_M2",
        "PCT_18PLUS",
        "AREA_25PLUS_M2",
        "PCT_25PLUS",
        "AREA_35PLUS_M2",
        "PCT_35PLUS",
    ]
    return zone_df.loc[available, cols].copy()


# --------------------------------------------------
# Parcel-level steep analysis
# --------------------------------------------------


def parcel_level_steep_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute parcel-level steep slope exposure metrics for ranking and filtering.

    Behaviour
    ---------
    - Reuse STEEP_AREA_M2 / STEEP_AREA_PCT if present
    - Compute missing derived fields when required
    - Return only parcels with STEEP_AREA_M2 > 0
    """

    df = df.copy()

    if "STEEP_AREA_M2" not in df.columns:
        df["STEEP_AREA_M2"] = (
            df["AREA_SLOPE_18_25"] + df["AREA_SLOPE_25_35"] + df["AREA_SLOPE_35_90"]
        )

    if "STEEP_AREA_PCT" not in df.columns:
        df["STEEP_AREA_PCT"] = 0.0
        mask = df["TOTAL_SLOPE_AREA_M2"] > 0
        df.loc[mask, "STEEP_AREA_PCT"] = (
            df.loc[mask, "STEEP_AREA_M2"] / df.loc[mask, "TOTAL_SLOPE_AREA_M2"] * 100.0
        )

    steep_df = df[df["STEEP_AREA_M2"] > 0].copy()
    return steep_df.sort_values("STEEP_AREA_M2", ascending=False)


# --------------------------------------------------
# STEEP_BIN distribution counts (10-15% ... 95-100%)
# --------------------------------------------------


def steep_bin_counts_overall_res_nonres(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute STEEP_BIN distribution counts and percentages (0-100% in 5% bins).

    Outputs include distributions for:
    - All parcels
    - Residential Zone
    - Non Residential Zone
    """

    if "STEEP_BIN" not in df.columns:
        raise ValueError(
            "STEEP_BIN column not found in input CSV. Please ensure it is exported."
        )

    bins = expected_bins_0_to_100(step=5)

    # Normalize STEEP_BIN labels
    sbin = df["STEEP_BIN"].apply(normalize_steep_bin_label)
    valid = sbin.ne("")

    # ---- totals (denominators) ----
    total_all = int(valid.sum())
    total_res = int((valid & (df[ZONE_COL] == "Residential Zone")).sum())
    total_nonres = int((valid & (df[ZONE_COL] == "Non Residential Zone")).sum())

    out_rows = []
    for b in bins:
        all_cnt = int((valid & (sbin == b)).sum())
        res_cnt = int(
            (valid & (sbin == b) & (df[ZONE_COL] == "Residential Zone")).sum()
        )
        nonres_cnt = int(
            (valid & (sbin == b) & (df[ZONE_COL] == "Non Residential Zone")).sum()
        )

        out_rows.append(
            {
                "STEEP_BIN": b,
                "All_parcels": all_cnt,
                "All_%": (all_cnt / total_all * 100.0) if total_all > 0 else 0.0,
                "Residential_Zone": res_cnt,
                "Res_%": (res_cnt / total_res * 100.0) if total_res > 0 else 0.0,
                "Non_Residential_Zone": nonres_cnt,
                "Non-Res_%": (
                    nonres_cnt / total_nonres * 100.0 if total_nonres > 0 else 0.0
                ),
            }
        )

    out_df = pd.DataFrame(out_rows)

    # Optional: round percentage columns (consistent with rest of pipeline)
    out_df = round_pct(out_df, ["All_%", "Res_%", "Non-Res_%"])

    return out_df


# --------------------------------------------------
# Main
# --------------------------------------------------


def run_analysis():
    """
    operate post-processing analysis and export derived CSV tables.

    Workflow
    --------
    1. Load and validate parcel-level delivery CSV
    2. Export city-level slope summary table
    3. Export zone-level slope exposure tables (including res vs non-res)
    4. Export parcel-level steep exposure table
    5. Export STEEP_BIN distribution counts (overall + res/non-res)

    Notes
    -----
    - No plotting or spatial operations are performed in this function.
    """

    df = load_parcel_slope_data()
    print("Loaded parcels:", df.shape)

    # City-level summary
    city = city_level_slope_stats(df)
    city = round_pct(city, ["area_pct"])
    city.to_csv(os.path.join(OUT_DIR, "city_slope_stats.csv"))
    print("Saved city_slope_stats.csv")

    # Zone-level summary
    zone = zone_level_slope_stats(df)
    zone = round_pct(zone, ["PCT_18PLUS", "PCT_25PLUS", "PCT_35PLUS"])
    zone.to_csv(os.path.join(OUT_DIR, "zone_slope_stats.csv"))
    print("Saved zone_slope_stats.csv")

    # Residential vs Non-Residential comparison
    res_nonres = export_res_vs_nonres(zone)
    res_nonres = round_pct(res_nonres, ["PCT_18PLUS", "PCT_25PLUS", "PCT_35PLUS"])
    res_nonres.to_csv(os.path.join(OUT_DIR, "zone_slope_stats_res_vs_nonres.csv"))
    print("Saved zone_slope_stats_res_vs_nonres.csv")

    # Parcel-level steep stats
    steep = parcel_level_steep_stats(df)
    steep = round_pct(steep, ["STEEP_AREA_PCT"])
    steep.to_csv(os.path.join(OUT_DIR, "parcel_steep_stats.csv"), index=False)
    print("Saved parcel_steep_stats.csv")

    # STEEP_BIN counts for 0-5% ... 95-100% (overall + res/nonres)
    steep_bin_counts = steep_bin_counts_overall_res_nonres(df)
    steep_bin_counts.to_csv(
        os.path.join(OUT_DIR, "parcel_steep_bin_counts_0_100_res_nonres.csv"),
        index=False,
    )
    print("Saved parcel_steep_bin_counts_0_100_res_nonres.csv")


# Entry point: run post-processing analysis
run_analysis()

Loaded parcels: (56481, 12)
Saved city_slope_stats.csv
Saved zone_slope_stats.csv
Saved zone_slope_stats_res_vs_nonres.csv
Saved parcel_steep_stats.csv
Saved parcel_steep_bin_counts_0_100_res_nonres.csv


In [14]:
# ==================================================================
# Plot generation for Hamilton parcel-level slope analysis (attribute-only)
# ==================================================================
# This script reads analysis-ready CSV tables produced by the post-processing
# statistics stage and generates publication-ready figures for reporting.
#
# Inputs (from data/output/csv)
# -----------------------------
# - zone_slope_stats.csv
# - city_slope_stats.csv (optional)
# - parcel_steep_bin_counts_0_100_res_nonres.csv
#
# Outputs (to data/output/plots)
# -----------------------------
# - Zone slope composition pie charts
# - Zone steep exposure comparison charts (bar + stacked bar)
# - Parcel steep-bin distribution charts (pie + res vs non-res bar)
#
# Notes
# -----
# - No ArcPy or spatial processing is performed here.
# - All operations are pandas + matplotlib only.
# - Figures are written to OUT_DIR.
# ==================================================================


# --------------------------------------------------
# Paths
# --------------------------------------------------

# Directory containing analysis-ready CSV tables generated by the statistics script.
CSV_DIR = os.path.join(PROJECT_ROOT, r"data\output\csv")

# Output directory for all figure files generated by this script.
OUT_DIR = os.path.join(PROJECT_ROOT, r"data\output\plots")

# Ensure output directory exists so that fig.savefig() will not fail.
os.makedirs(OUT_DIR, exist_ok=True)

# --------------------------------------------------
# Constants
# --------------------------------------------------

# Zone-level summary table produced by zone_level_slope_stats()
# Contains per-zone slope bin areas and derived steep indicators.
ZONE_STATS_CSV = os.path.join(CSV_DIR, "zone_slope_stats.csv")

# Parcel steep-bin distribution table produced by steep_bin_counts_overall_res_nonres()
# Contains parcel counts by steep-slope proportion bins for overall/res/non-res.
STEEP_BIN_COUNTS_CSV = os.path.join(
    CSV_DIR, "parcel_steep_bin_counts_0_100_res_nonres.csv"
)

# Canonical zone labels used across the analysis pipeline.
RES_ZONE = "Residential Zone"
NONRES_ZONE = "Non Residential Zone"

# Slope bin area fields used in zone_slope_stats.csv (degrees).
# NOTE: These labels must match the Zone values in zone_slope_stats.csv.
SLOPE_BINS = [
    "AREA_SLOPE_0_6",
    "AREA_SLOPE_6_12",
    "AREA_SLOPE_12_18",
    "AREA_SLOPE_18_25",
    "AREA_SLOPE_25_35",
    "AREA_SLOPE_35_90",
]

# Subset of bins defined as "steep" for zone-level comparisons (>=18 degrees).
STEEP_BINS = [
    "AREA_SLOPE_18_25",
    "AREA_SLOPE_25_35",
    "AREA_SLOPE_35_90",
]

# Columns expected in the parcel_steep_bin_counts_0_100_res_nonres.csv table.
# These are counts (not percentages).
STEEP_COUNT_COLS = ["All_parcels", "Residential_Zone", "Non_Residential_Zone"]


# --------------------------------------------------
# Helper
# --------------------------------------------------


def pretty_bin(label: str) -> str:
    """
    Convert a slope-area field name into a human-readable bin label.

    Example
    -------
    'AREA_SLOPE_0_6' -> '0-6°'
    """

    return label.replace("AREA_SLOPE_", "").replace("_", "-") + "°"


def load_zone_stats() -> pd.DataFrame:
    """
    Load and validate the zone-level slope statistics table.

    Input
    -----
    zone_slope_stats.csv (ZONE_STATS_CSV)

    Validation
    ----------
    - Requires a 'Zone' column (used as index)
    - Requires all SLOPE_BINS columns
    - Requires TOTAL_AREA_M2 (for percentage-based plots)

    Returns
    -------
    DataFrame indexed by Zone.
    """

    df = pd.read_csv(ZONE_STATS_CSV)

    if "Zone" not in df.columns:
        raise ValueError("zone_slope_stats.csv must contain a 'Zone' column.")

    df = df.set_index("Zone")

    missing_bins = [c for c in SLOPE_BINS if c not in df.columns]
    if missing_bins:
        raise ValueError(
            f"zone_slope_stats.csv missing slope bin columns: {missing_bins}"
        )

    if "TOTAL_AREA_M2" not in df.columns:
        raise ValueError("zone_slope_stats.csv must contain 'TOTAL_AREA_M2'.")

    return df


def require_zones(df: pd.DataFrame, zones: list[str]) -> None:
    """
    Validate that expected zone labels are present in the zone stats index.
    """

    missing = [z for z in zones if z not in df.index]
    if missing:
        raise ValueError(f"Missing expected zones in zone stats: {missing}")


# --------------------------------------------------
# Steep-bin counts loaders / plots
# --------------------------------------------------


def load_steep_bin_counts() -> pd.DataFrame:
    """
    Load and validate the parcel STEEP_BIN distribution counts table.

    Input
    -----
    parcel_steep_bin_counts_0_100_res_nonres.csv (STEEP_BIN_COUNTS_CSV)

    Validation
    ----------
    - Requires STEEP_BIN column
    - Requires expected count columns defined in STEEP_COUNT_COLS

    Returns
    -------
    DataFrame in the bin order provided by the CSV (typically 0-5% ... 95-100%).
    """

    df = pd.read_csv(STEEP_BIN_COUNTS_CSV)

    if "STEEP_BIN" not in df.columns:
        raise ValueError("Steep-bin counts CSV must contain 'STEEP_BIN' column.")

    missing = [c for c in STEEP_COUNT_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"Steep-bin counts CSV missing expected columns: {missing}")

    # Keep order as provided in the CSV (typically 0-5% ... 95-100%)
    return df


def plot_steep_bin_pie(column: str, out_name: str) -> None:
    """
    Plot a STEEP_BIN distribution pie chart for a specified group.

    Parameters
    ----------
    column : str
        Count column to plot (e.g., 'All_parcels', 'Residential_Zone', 'Non_Residential_Zone').
    out_name : str
        Output filename under OUT_DIR.

    Output
    ------
    Saved figure file in OUT_DIR.
    """

    df = load_steep_bin_counts()

    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in steep-bin counts CSV.")

    labels = df["STEEP_BIN"].astype(str).tolist()
    values = pd.to_numeric(df[column], errors="coerce").fillna(0).astype(int).tolist()

    total = sum(values)
    if total == 0:
        raise ValueError(f"Total count is 0 for column '{column}'. Nothing to plot.")

    # Legend text: bin + count + percentage
    legend_labels = []
    for lab, v in zip(labels, values):
        pct = (v / total) * 100.0
        legend_labels.append(f"{lab}: {v} ({pct:.1f}%)")

    fig, ax = plt.subplots()
    wedges, _ = ax.pie(values, labels=None, startangle=90)

    ax.legend(
        wedges,
        legend_labels,
        title="STEEP_BIN (count, %)",
        loc="center left",
        bbox_to_anchor=(1.0, 0.5),
    )

    ax.set_title(f"Parcel distribution by STEEP_BIN ({column})")
    fig.tight_layout()
    fig.savefig(os.path.join(OUT_DIR, out_name), dpi=300, bbox_inches="tight")
    plt.close(fig)


def plot_res_vs_nonres_steep_bin_bar(out_name: str) -> None:
    """
    Plot a side-by-side bar chart comparing Residential vs Non-Residential parcel
    counts across STEEP_BIN categories.

    Output
    ------
    Saved figure file in OUT_DIR.
    """

    df = load_steep_bin_counts()

    bins = df["STEEP_BIN"].astype(str).tolist()
    res = (
        pd.to_numeric(df["Residential_Zone"], errors="coerce")
        .fillna(0)
        .astype(int)
        .tolist()
    )
    nonres = (
        pd.to_numeric(df["Non_Residential_Zone"], errors="coerce")
        .fillna(0)
        .astype(int)
        .tolist()
    )

    x = list(range(len(bins)))
    width = 0.42

    plt.figure(figsize=(12, 5))

    bars_res = plt.bar(
        [i - width / 2 for i in x],
        res,
        width=width,
        label="Residential Zone",
    )
    bars_nonres = plt.bar(
        [i + width / 2 for i in x],
        nonres,
        width=width,
        label="Non-Residential Zone",
    )

    # ---- add value labels ----
    for bar in bars_res:
        height = bar.get_height()
        if height > 0:
            plt.text(
                bar.get_x() + bar.get_width() / 2,
                height,
                f"{int(height)}",
                ha="center",
                va="bottom",
                fontsize=8,
            )

    for bar in bars_nonres:
        height = bar.get_height()
        if height > 0:
            plt.text(
                bar.get_x() + bar.get_width() / 2,
                height,
                f"{int(height)}",
                ha="center",
                va="bottom",
                fontsize=8,
            )

    plt.xticks(x, bins, rotation=45, ha="right")
    plt.ylabel("Parcel count")
    plt.title("Residential vs Non-Residential parcel counts by steep slope proportion")
    plt.legend(loc="upper right")

    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, out_name), dpi=300)
    plt.close()


def plot_city_slope():
    """
    Plot city-level slope composition (area %), if city_slope_stats.csv is available.

    Input
    -----
    city_slope_stats.csv (generated by city_level_slope_stats())

    Output
    ------
    fig_city_slope_distribution.png saved to OUT_DIR.
    """

    df = pd.read_csv(os.path.join(CSV_DIR, "city_slope_stats.csv"))

    # city_slope_stats.csv stores slope bin labels in the first column (often 'Unnamed: 0' after to_csv()).
    labels = [pretty_bin(c) for c in df["Unnamed: 0"]]
    values = df["area_pct"]

    plt.figure()
    bars = plt.bar(labels, values)

    # Annotate each bar with its percentage value
    for bar in bars:
        height = bar.get_height()
        plt.text(
            bar.get_x() + bar.get_width() / 2,
            height,
            f"{height:.1f}%",
            ha="center",
            va="bottom",
            fontsize=9,
        )

    plt.title("City-level slope distribution (area %)")
    plt.xlabel("Slope bin (degrees)")
    plt.ylabel("Percentage of total area (%)")
    plt.xticks(rotation=30, ha="right")
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, "fig_city_slope_distribution.png"), dpi=300)
    plt.close()


def plot_zone_slope_pie(zone_name: str):
    """
    Plot slope composition (area %) as a pie chart for a single zone.

    Input
    -----
    zone_slope_stats.csv

    Output
    ------
    Figure saved to OUT_DIR with a zone-specific filename.
    """

    df = load_zone_stats()
    require_zones(df, [zone_name])

    row = df.loc[zone_name, SLOPE_BINS]
    total = float(df.loc[zone_name, "TOTAL_AREA_M2"])

    # Convert to percentages of the zone total area
    pct = (row / total) * 100.0
    values = pct.values

    # Legend text: category + percentage
    legend_labels = [
        f"{pretty_bin(bin_name)} ({pct_value:.1f}%)"
        for bin_name, pct_value in zip(SLOPE_BINS, values)
    ]

    fig, ax = plt.subplots()

    wedges, _ = ax.pie(
        values,
        labels=None,
        startangle=90,
    )

    ax.legend(
        wedges,
        legend_labels,
        title="Slope bin",
        loc="center left",
        bbox_to_anchor=(1.0, 0.5),
    )

    ax.set_title(f"Slope composition within '{zone_name}'")
    fig.tight_layout()

    out_name = (
        "fig_residential_slope_pie.png"
        if zone_name == RES_ZONE
        else "fig_nonresidential_slope_pie.png"
    )
    fig.savefig(os.path.join(OUT_DIR, out_name), dpi=300, bbox_inches="tight")
    plt.close(fig)


def plot_steep_total_comparison():
    """
    Plot total steep slope exposure (>=18°) comparison between Residential and Non-Residential zones.

    Metric
    ------
    Percentage of total zone area classified as steep (>=18°).

    Output
    ------
    fig_steep_total_comparison.png saved to OUT_DIR.
    """

    df = load_zone_stats()
    require_zones(df, [RES_ZONE, NONRES_ZONE])

    # Prefer PCT_18PLUS if present; otherwise compute from area fields
    if "PCT_18PLUS" in df.columns:
        res = float(df.loc[RES_ZONE, "PCT_18PLUS"])
        nonres = float(df.loc[NONRES_ZONE, "PCT_18PLUS"])
    elif "AREA_18PLUS_M2" in df.columns:
        res = float(
            df.loc[RES_ZONE, "AREA_18PLUS_M2"]
            / df.loc[RES_ZONE, "TOTAL_AREA_M2"]
            * 100.0
        )
        nonres = float(
            df.loc[NONRES_ZONE, "AREA_18PLUS_M2"]
            / df.loc[NONRES_ZONE, "TOTAL_AREA_M2"]
            * 100.0
        )
    else:
        raise ValueError(
            "zone_slope_stats.csv must contain PCT_18PLUS or AREA_18PLUS_M2."
        )

    zones = [RES_ZONE, NONRES_ZONE]
    values = [res, nonres]

    plt.figure()
    bars = plt.bar(zones, values)

    # Annotate
    for bar in bars:
        height = bar.get_height()
        plt.text(
            bar.get_x() + bar.get_width() / 2,
            height,
            f"{height:.1f}%",
            ha="center",
            va="bottom",
            fontsize=9,
        )

    plt.ylabel("Percentage of zone area (%)")
    plt.title("Total steep slope exposure by zone (≥18°)")
    plt.xticks(rotation=15, ha="right")
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, "fig_steep_total_comparison.png"), dpi=300)
    plt.close()


def plot_steep_composition_comparison():
    """
    Plot steep slope composition by zone as stacked bars (>=18° bins).

    Interpretation
    --------------
    Bars show each steep bin (18-25, 25-35, 35-90) as a percentage of TOTAL zone area.
    Values are not renormalised to sum to 100% of steep-only area.

    Output
    ------
    fig_steep_composition_comparison.png saved to OUT_DIR.
    """

    df = load_zone_stats()
    require_zones(df, [RES_ZONE, NONRES_ZONE])

    zones = [RES_ZONE, NONRES_ZONE]
    sub = df.loc[zones, STEEP_BINS]

    # Percent of TOTAL zone area
    pct = sub.div(df.loc[zones, "TOTAL_AREA_M2"], axis=0) * 100.0

    x = range(len(zones))
    bottom = None

    plt.figure()
    for col in STEEP_BINS:
        vals = pct[col]
        label = pretty_bin(col)
        if bottom is None:
            plt.bar(x, vals, label=label)
            bottom = vals.copy()
        else:
            plt.bar(x, vals, bottom=bottom, label=label)
            bottom += vals

    # Annotate total steep percentage per zone
    total_vals = pct.sum(axis=1)
    for i, zone in enumerate(zones):
        total = total_vals.loc[zone]
        plt.text(
            i,
            total,
            f"{total:.1f}%",
            ha="center",
            va="bottom",
            fontsize=9,
            fontweight="bold",
        )

    plt.xticks(x, zones, rotation=15, ha="right")
    plt.ylabel("Percentage of zone area (%)")
    plt.title("Steep slope composition by zone (≥18° bins)")
    plt.legend(title="Slope bin", bbox_to_anchor=(1.02, 1), loc="upper left")
    plt.tight_layout()

    plt.savefig(os.path.join(OUT_DIR, "fig_steep_composition_comparison.png"), dpi=300)
    plt.close()


# --------------------------------------------------
# Main
# --------------------------------------------------


def generate_plots():
    """
    Main entry point for figure generation.

    Preconditions
    -------------
    The following CSV inputs must already exist in CSV_DIR:
    - city_slope_stats.csv (optional)
    - zone_slope_stats.csv
    - parcel_steep_bin_counts_0_100_res_nonres.csv

    Output
    ------
    All figures are saved to OUT_DIR.
    """

    plot_city_slope()
    plot_zone_slope_pie(RES_ZONE)
    plot_zone_slope_pie(NONRES_ZONE)
    plot_steep_total_comparison()
    plot_steep_composition_comparison()

    # Pie charts for All_parcels / Residential_Zone / Non_Residential_Zone
    plot_steep_bin_pie("All_parcels", "fig_steep_bin_pie_all_parcels.png")
    plot_steep_bin_pie("Residential_Zone", "fig_steep_bin_pie_residential.png")
    plot_steep_bin_pie("Non_Residential_Zone", "fig_steep_bin_pie_non_residential.png")

    # Bar chart comparing Residential vs Non-Residential counts by STEEP_BIN
    plot_res_vs_nonres_steep_bin_bar("fig_steep_bin_counts_res_vs_nonres.png")

    print("All plots generated.")


# Entry point: generate all plots
generate_plots()

All plots generated.
