In [None]:
import osmnx as ox
import geopandas as gpd
import pandas as pd
import os
import pickle
from shapely.geometry import box
from pyproj import Transformer


### Utility Functions ###
def read_path_from_file(file_path: str) -> str:
    """Read OneDrive path from a text file."""
    try:
        with open(file_path, "r") as file:
            path = file.readline().strip()  # Read the first line and strip whitespace
        return path
    except FileNotFoundError:
        print(f"Error: File '{file_path}' does not exist.")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


def save_to_cache(data, filename, cache_dir="cache"):
    """Save a GeoDataFrame or Python object to a pickle file in the cache directory."""
    os.makedirs(cache_dir, exist_ok=True)
    cache_path = os.path.join(cache_dir, filename)
    print(f"Saving data to cache: {cache_path}")
    with open(cache_path, "wb") as f:
        pickle.dump(data, f)


def load_from_cache(filename, cache_dir="cache"):
    """Load a GeoDataFrame or Python object from a pickle file in the cache directory."""
    cache_path = os.path.join(cache_dir, filename)
    if os.path.exists(cache_path):
        print(f"Loading data from cache: {cache_path}")
        with open(cache_path, "rb") as f:
            return pickle.load(f)
    return None


def latlon_to_utm(lat: float, lon: float, epsg: int = 32614):
    """
    Convert latitude and longitude (EPSG:4326) to UTM coordinates (e.g., EPSG:32614).
    """
    transformer = Transformer.from_crs("EPSG:4326", f"EPSG:{epsg}", always_xy=True)
    x, y = transformer.transform(lon, lat)  # Transform to target coordinates
    return x, y


def subsample_data(center_point: tuple, scale: float, blocks: gpd.GeoDataFrame, buildings: gpd.GeoDataFrame, roads: gpd.GeoDataFrame):
    """
    Subsample blocks, buildings, and roads using a bounding box around a central point.
    Ensures proper alignment of CRS before subsampling.
    """
    print("- Subsampling data using bounding box...")

    # Default bounding box size: ~10,000 feet (~3,048 meters) on each side
    default_bbox_size = 3048  # Half the total size on each dimension, in meters
    scaled_bbox_size = default_bbox_size * scale

    # Calculate bounding box geometry based on scaled size
    center_x, center_y = center_point
    bbox = box(
        center_x - scaled_bbox_size,  # Min X
        center_y - scaled_bbox_size,  # Min Y
        center_x + scaled_bbox_size,  # Max X
        center_y + scaled_bbox_size   # Max Y
    )
    print("-- Bounding box geometry:", bbox)

    # Debugging: Check CRS alignment for all datasets
    print("-- Blocks CRS:", blocks.crs)
    print("-- Buildings CRS before reprojection:", buildings.crs)
    print("-- Roads CRS:", roads.crs)

    # Reproject buildings to match the CRS of the bounding box (EPSG:32614)
    if buildings.crs.to_string().lower() != "epsg:32614":
        print("-- Reprojecting buildings to EPSG:32614...")
        buildings = buildings.to_crs(epsg=32614)

    print("-- Buildings CRS after reprojection:", buildings.crs)

    # Filter each GeoDataFrame by the bounding box intersection
    blocks_subset = blocks[blocks.geometry.intersects(bbox)]
    buildings_subset = buildings[buildings.geometry.intersects(bbox)]
    roads_subset = roads[roads.geometry.intersects(bbox)]

    # Debugging: Check if any buildings are retained in the subset
    print("-- Buildings subset shape after intersects:", buildings_subset.shape)
    if len(buildings_subset) == 0:
        print("-- Warning: No buildings found within the bounding box.")
        print("-- Original buildings dataset sample (after reprojection):")
        print(buildings.geometry.head())  # Inspect original geometries

    print(f"-- Subsampled {len(blocks_subset)} blocks, {len(buildings_subset)} buildings, and {len(roads_subset)} roads.")
    return blocks_subset, buildings_subset, roads_subset


def load_data(block_path, osm_boundary_place, cache_dir="cache", subsample_scale=1.0):
    """
    Load blocks, buildings, and roads data from cache (if available), process, and optionally subsample using a scaled bounding box.
    Uses only the 'height' column for building heights and outputs the percentage of valid values.
    """
    # Corpus Christi center point from Google Maps
    corpus_christi_lat = 27.783611  # Latitude
    corpus_christi_lon = -97.414779  # Longitude

    print(f"Converting center coordinates ({corpus_christi_lat}, {corpus_christi_lon}) to UTM...")
    corpus_christi_center_utm = latlon_to_utm(corpus_christi_lat, corpus_christi_lon)  # Convert to UTM (EPSG:32614)
    CC_Padre_center_utm = latlon_to_utm(27.614956, -97.269562)
    print(f"Corpus Christi center in UTM (EPSG:32614): {corpus_christi_center_utm}")

    # Check cache for blocks
    print("- Checking cache for blocks...")
    blocks = load_from_cache("blocks.pkl", cache_dir)
    if blocks is None:
        print("-- Loading block data...")
        blocks = gpd.read_file(block_path).to_crs(epsg=32614)
        blocks.loc[:, "POP20"] = pd.to_numeric(blocks["POP20"], errors="coerce")
        save_to_cache(blocks, "blocks.pkl", cache_dir)

    # Check cache for buildings
    print("- Checking cache for buildings...")
    buildings = load_from_cache("buildings.pkl", cache_dir)
    if buildings is None:
        print("-- Fetching building data...")
        buildings = ox.features_from_place(
            osm_boundary_place,
            tags={"building": True, "building:height": True, "building:levels": True}
        )

        print("-- Raw buildings dataset columns:")
        print(buildings.columns)  # Print available columns for validation
        print("-- Preview of raw building dataset:")
        print(buildings.head())  # Inspect raw data for possible errors

        if "height" in buildings.columns:
            print("-- 'height' column found. Attempting numerical conversion...")
            buildings.loc[:, "height"] = pd.to_numeric(buildings["height"], errors="coerce")

            # Debugging step to Calculate percentage of valid values (>0 and numeric)
            total_height_values = len(buildings)
            valid_height_values = buildings["height"].dropna().gt(0).sum()
            percentage_valid = (valid_height_values / total_height_values) * 100 if total_height_values > 0 else 0
            print(f"-- Valid height values: {valid_height_values} / Total: {total_height_values} ({percentage_valid:.2f}%)")

            print("-- Converted 'height' values to numeric. Example values:")
            print(buildings["height"].head())
        else:
            print("-- ERROR: 'height' column is missing in the dataset! Setting height to None.")
            buildings.loc[:, "height"] = None

        # Reproject buildings to CRS: EPSG:32614
        print("-- Reprojecting buildings to EPSG:32614...")
        buildings = buildings.to_crs(epsg=32614)

        # Save the buildings dataset to cache
        save_to_cache(buildings, "buildings.pkl", cache_dir)

    # Check cache for roads
    print("- Checking cache for roads...")
    roads = load_from_cache("roads.pkl", cache_dir)
    if roads is None:
        print("-- Fetching road data...")
        roads = ox.graph_to_gdfs(ox.graph_from_place(osm_boundary_place, network_type="drive"), nodes=False)
        roads = roads.to_crs(epsg=32614)
        save_to_cache(roads, "roads.pkl", cache_dir)

    # Subsample datasets using the bounding box around Corpus Christi center in UTM
    print("- Subsampling datasets...")
    # CC Padre Island as center of box
    #blocks_subset, buildings_subset, roads_subset = subsample_data(CC_Padre_center_utm, subsample_scale, blocks, buildings, roads)
    # CC Corpus Christi Downtown as the center of box
    blocks_subset, buildings_subset, roads_subset = subsample_data(corpus_christi_center_utm, subsample_scale, blocks, buildings, roads)
    print("- Data loading complete.")

    '''
    # IF you want to calculate all of Corpus Christi, uncomment the lines below and comment out the lines above for subsampling
    blocks_subset = blocks
    building_subset = buildings
    road_subset = roads
    '''
    
    # Calculate building setbacks (distance to nearest road)
    print("Calculating building setbacks...")
    roads_union = roads_subset.geometry.unary_union  # Combine all road geometries into one
    buildings_subset["setback_ft"] = buildings_subset.geometry.apply(
        lambda building_geom: building_geom.distance(roads_union) * 3.28084  # Convert meters to feet
    )
    return blocks_subset, buildings_subset, roads_subset


# Define paths to block shapefile and Corpus Christi boundary
one_drive_path = read_path_from_file("OneDrive.txt")
block_path = os.path.join(one_drive_path, "Data", "tl_2023_48_tabblock20", "tl_2023_48_tabblock20.shp")
osm_boundary_place = "Corpus Christi, Texas, USA"

# Load data with subsampling
print("Starting data loading...")
blocks_subset, buildings_subset, roads_subset = load_data(block_path, osm_boundary_place, subsample_scale=0.1) # CHANGE SCALE OF BOUNDING BOX HERE

# Inspect subsampled data
print("-- Blocks subset:")
print(blocks_subset.head())
print("-- Buildings subset:")
print(buildings_subset.head())
print("-- Roads subset:")
print(roads_subset.head())

Starting data loading...
Converting center coordinates (27.783611, -97.414779) to UTM...
Corpus Christi center in UTM (EPSG:32614): (656184.519263486, 3074239.5261650323)
- Checking cache for blocks...
Loading data from cache: cache\blocks.pkl
- Checking cache for buildings...
Loading data from cache: cache\buildings.pkl
- Checking cache for roads...
Loading data from cache: cache\roads.pkl
- Subsampling datasets...
- Subsampling data using bounding box...
-- Bounding box geometry: POLYGON ((656489.3192634861 3073934.7261650325, 656489.3192634861 3074544.326165032, 655879.719263486 3074544.326165032, 655879.719263486 3073934.7261650325, 656489.3192634861 3073934.7261650325))
-- Blocks CRS: EPSG:32614
-- Buildings CRS before reprojection: EPSG:32614
-- Roads CRS: EPSG:32614
-- Buildings CRS after reprojection: EPSG:32614
-- Buildings subset shape after intersects: (405, 198)
-- Subsampled 35 blocks, 405 buildings, and 126 roads.
- Data loading complete.
Calculating building setbacks...


  roads_union = roads_subset.geometry.unary_union  # Combine all road geometries into one
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  super().__setitem__(key, value)


-- Blocks subset:
       STATEFP20 COUNTYFP20 TRACTCE20 BLOCKCE20          GEOID20  \
316275        48        355    001000      2012  483550010002012   
359470        48        355    001000      1017  483550010001017   
359630        48        355    001000      3008  483550010003008   
359631        48        355    001000      2022  483550010002022   
367172        48        355    001000      1019  483550010001019   

                       GEOIDFQ20     NAME20 MTFCC20 UR20 UACE20 FUNCSTAT20  \
316275  1000000US483550010002012  Block2012   G5040    U  20287          S   
359470  1000000US483550010001017  Block1017   G5040    U  20287          S   
359630  1000000US483550010003008  Block3008   G5040    U  20287          S   
359631  1000000US483550010002022  Block2022   G5040    U  20287          S   
367172  1000000US483550010001019  Block1019   G5040    U  20287          S   

        ALAND20  AWATER20   INTPTLAT20    INTPTLON20  HOUSING20  POP20  \
316275     5968         0  +27

In [None]:
def calculate_block_metrics(blocks_subset: gpd.GeoDataFrame, buildings_subset: gpd.GeoDataFrame, roads_subset: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Calculate block-level metrics, including a new HHpBLD metric based on households per building count.
    """
    print("Available columns in buildings dataset:")
    print(buildings_subset.columns)
    # Check if required columns exist in the buildings dataset
    required_columns = ["geometry", "height"]
    missing_columns = [col for col in required_columns if col not in buildings_subset.columns]
    if missing_columns:
        print(f"Error: Missing required columns in buildings dataset: {missing_columns}")
        print("Preview of buildings dataset:")
        print(buildings_subset.head())
        raise KeyError(f"Required columns {missing_columns} are not found in buildings dataset.")

    print("Calculating block metrics...")
    blocks_subset["BLOCK_ID"] = blocks_subset.index  # Assign unique ID to each block
    buildings_subset["build_idx"] = buildings_subset.index  # Assign unique ID to each building
    
    # Calculate building area (square meters -> square miles)
    print("Calculating building area coverage (sq-mile)...")
    buildings_subset["build_area_sm"] = buildings_subset.geometry.area * 0.00000038610215855  # Convert area to sq-mile
    
    # Handle building height (convert meters to feet and filter valid heights)
    print("Processing building heights...")
    buildings_subset["height_m"] = pd.to_numeric(buildings_subset["height"], errors="coerce")
    buildings_subset["height_ft"] = buildings_subset["height_m"] * 3.28084  # Convert height to feet
    buildings_subset = buildings_subset[buildings_subset["height_ft"] > 0]  # Filter buildings with positive heights
    
    # Spatial join to associate buildings with blocks
    print("Associating buildings with blocks via spatial join...")
    buildings_with_blocks = gpd.sjoin(buildings_subset, blocks_subset, how="inner", predicate="intersects")
    
    # Aggregate metrics per block
    print("Aggregating building metrics per block...")
    building_metrics = buildings_with_blocks.groupby("BLOCK_ID").agg(
        avg_hght=("height_ft", "mean"),  # Average building height
        avg_sback=("setback_ft", "mean"),  # Average setback distance
        bld_area=("build_area_sm", "sum"),  # Sum building areas
        bld_cnt=("build_idx", "count"),  # Count number of buildings
    )
    
    # Calculate population density for blocks
    print("Calculating population density and other metrics for blocks...")
    blocks_subset["area_sm"] = blocks_subset.geometry.area * 0.00000038610215855  # Convert block area to sq-mile
    blocks_subset["pop_den"] = blocks_subset["POP20"] / blocks_subset["area_sm"]  # Population density per sq-mile
    
    # additional metrics
    print("Calculating additional block metrics: Building count per square mile, building area percentage, and households per building count...")
    building_metrics["bld_ctsm"] = building_metrics["bld_cnt"] / blocks_subset["area_sm"]  # Building count per sq-mile
    building_metrics["bld_prc"] = (building_metrics["bld_area"] / blocks_subset["area_sm"]) * 100  # Building area as percentage of block area
    
    # Add Households per Building Count (HHpBLD)
    print("Calculating Households per Building Count (HHpBLD)...")
    building_metrics["HHpBLD"] = blocks_subset["HOUSING20"] / building_metrics["bld_cnt"]
    building_metrics["HHpBLD"] = building_metrics["HHpBLD"].fillna(0)  # Fill missing values with 0 (e.g., no buildings)

    # Merge metrics into block dataset
    print("Merging building metrics into block dataset...")
    blocks_subset = blocks_subset.merge(building_metrics, on="BLOCK_ID", how="left")
    
    # Fill missing values explicitly for columns
    print("Filling missing values for block metrics...")
    blocks_subset["avg_hght"] = blocks_subset["avg_hght"].fillna(0)  
    blocks_subset["avg_sback"] = blocks_subset["avg_sback"].fillna(0)  
    blocks_subset["bld_area"] = blocks_subset["bld_area"].fillna(0)  
    blocks_subset["bld_ctsm"] = blocks_subset["bld_ctsm"].fillna(0)  
    blocks_subset["bld_prc"] = blocks_subset["bld_prc"].fillna(0)  
    blocks_subset["bld_cnt"] = blocks_subset["bld_cnt"].fillna(0)  
    blocks_subset["HHpBLD"] = blocks_subset["HHpBLD"].fillna(0)  

    print("Block metrics calculated successfully.")
    return blocks_subset

# Calculate block-level metrics
blocks_processed = calculate_block_metrics(blocks_subset, buildings_subset, roads_subset)
print("Block metrics successfully calculated:")
print(blocks_processed.head())

Available columns in buildings dataset:
Index(['geometry', 'addr:state', 'building', 'ele', 'gnis:feature_id', 'name',
       'source', 'addr:city', 'addr:housename', 'addr:housenumber',
       ...
       'fuel:octane_87', 'fuel:octane_89', 'fuel:octane_93', 'female', 'male',
       'portable', 'toilets:handwashing', 'capacity', 'size', 'setback_ft'],
      dtype='object', length=199)
Calculating block metrics...
Calculating building area coverage (sq-mile)...
Processing building heights...
Associating buildings with blocks via spatial join...
Aggregating building metrics per block...
Calculating population density and other metrics for blocks...
Calculating additional block metrics: Building count per square mile, building area percentage, and households per building count...
Calculating Households per Building Count (HHpBLD)...
Merging building metrics into block dataset...
Filling missing values for block metrics...
Block metrics calculated successfully.
Block metrics successfully c

In [10]:
def calculate_road_metrics(blocks_subset: gpd.GeoDataFrame, roads_subset: gpd.GeoDataFrame, buffer_size_feet=50) -> gpd.GeoDataFrame:
    """
    Calculate road-level metrics based on spatial overlap with blocks (weighted aggregation).
    Includes Block IDs and Overlap Percentages as columns, flags roads intersecting evacuation routes using buffered geometry,
    and calculates new columns: agg_HHpBLD (weighted households per building count) and length_cal (road segment length).
    """
    print(f"- Calculating road metrics with buffer size (for spatial analysis): {buffer_size_feet} feet...")
    buffer_size_meters = buffer_size_feet * 0.3048  # Convert buffer size to meters

    # Blocks subset should maintain the original BLOCK_ID
    blocks_subset = blocks_subset.copy()
    print(blocks_subset.head())  # Output a sample of blocks_subset

    roads_subset = roads_subset.copy().reset_index(drop=True)
    roads_subset["road_id"] = roads_subset.index

    # Check if required block-level metrics exist
    required_columns = ["area_sm", "pop_den", "bld_area", "bld_ctsm", "avg_hght", "avg_sback", "HHpBLD"]
    missing_columns = [col for col in required_columns if col not in blocks_subset.columns]
    if missing_columns:
        print(f"ERROR: The following required block-level metrics are missing: {missing_columns}")
        raise KeyError(f"Missing required columns: {missing_columns}")

    # Create buffers for roads
    print("- Creating road buffers...")
    road_buffers = roads_subset.copy()
    road_buffers["geometry"] = roads_subset.geometry.buffer(buffer_size_meters)
    road_buffers["road_id"] = roads_subset["road_id"]
    road_buffers["buffer_area"] = road_buffers.geometry.area  # Area of the road buffer, for overlap proportion calculation
    print("-- Road buffers structure after buffering:")
    print(road_buffers.columns)

    # Perform spatial join of blocks with road buffers
    print("- Performing spatial join of blocks with road buffers...")
    intersections = gpd.overlay(blocks_subset, road_buffers, how="intersection")
    print("-- Intersections structure after spatial join:")
    print(intersections.head())

    # Validate road buffer areas and block intersections
    print("-- Adding road buffer areas for overlap proportion calculation...")
    intersections["road_buffer_area"] = intersections["road_id"].map(road_buffers.set_index("road_id")["buffer_area"])
    intersections["overlap_area"] = intersections.geometry.area
    intersections["overlap_proportion"] = intersections["overlap_area"] / intersections["road_buffer_area"]
    print("-- Intersection overlap proportions sample:")
    print(intersections[["BLOCK_ID", "road_id", "overlap_area", "road_buffer_area", "overlap_proportion"]].head())

    # Create overlap dictionary for roads
    print("- Creating overlap dictionary for roads...")
    road_block_overlap = (
        intersections.groupby("road_id")
        .apply(lambda rows: dict(zip(rows["BLOCK_ID"], rows["overlap_proportion"])))
        .to_dict()
    )
    print("-- Adding block IDs and overlap percentages columns...")
    # Add block IDs and overlap percentages columns
    roads_subset["block_ids"] = [
        list(overlap_dict.keys()) for overlap_dict in road_block_overlap.values()
    ]
    roads_subset["overlap_percs"] = [
        list(overlap_dict.values()) for overlap_dict in road_block_overlap.values()
    ]

    # Load Evacuation Routes
    print("- Loading evacuation routes shapefile...")
    evac_path = os.path.join(one_drive_path, "Data", "TxDOT Evacuation Routes AGO.shp")
    evacuation_routes = gpd.read_file(evac_path)

    # Ensure both datasets use the same CRS for spatial operations
    if evacuation_routes.crs != road_buffers.crs:
        evacuation_routes = evacuation_routes.to_crs(road_buffers.crs)

    print("-- Flagging roads based on evacuation route overlap...")

    def flag_evacuation_routes(road_buffer_geom):
        """
        Flag roads based on overlap with evacuation routes.
        0 = No overlap
        1 = Major Evacuation Routes
        2 = Potential Contraflow
        3 = Potential EvacuLanes
        """
        for _, evac_row in evacuation_routes.iterrows():
            if road_buffer_geom.intersects(evac_row.geometry):
                route_type = evac_row["ROUTE_TYPE"]
                if route_type == "Major Evacuation Routes":
                    return 1
                elif route_type == "Potential Contraflow":
                    return 2
                elif route_type == "Potential EvacuLanes":
                    return 3
        return 0  # No overlap

    # Use the buffered geometry to calculate evacuation flags
    roads_subset["evac_flag"] = road_buffers.geometry.apply(flag_evacuation_routes)

    # Calculate weighted metrics for roads
    print("- Weighting metrics for roads...")
    def compute_weighted_metrics(road_id, overlap_dict):
        """
        Compute weighted metrics for a road using normalized weights for metrics like `agg_hght` and `agg_sback`.
        Includes HHpBLD as a new weighted metric.
        """
        blocks = blocks_subset.set_index("BLOCK_ID").loc[overlap_dict.keys()]
        raw_weights = pd.Series({BLOCK_ID: overlap_dict[BLOCK_ID] for BLOCK_ID in blocks.index})
        total_weight = raw_weights.sum()
        if total_weight > 0:
            normalized_weights = raw_weights / total_weight
        else:
            return pd.Series({
                "agg_pop": 0,
                "agg_area": 0,
                "agg_ctsm": 0,
                "agg_bldprc": 0,
                "agg_hght": 0,
                "agg_sback": 0,
                "agg_HHpBLD": 0,
            })
        valid_blocks_for_hght_sback = blocks[(blocks["avg_hght"] > 0) & (blocks["avg_sback"] > 0)]
        aggregated_metrics = {
            "agg_pop": (blocks["pop_den"] * normalized_weights).sum(),
            "agg_area": (blocks["bld_area"] * normalized_weights).sum(),
            "agg_ctsm": (blocks["bld_ctsm"] * normalized_weights).sum(),
            "agg_bldprc": (blocks["bld_prc"] * normalized_weights).sum(),
            "agg_hght": (valid_blocks_for_hght_sback["avg_hght"] * normalized_weights.loc[valid_blocks_for_hght_sback.index]).sum()
            if not valid_blocks_for_hght_sback.empty else 0,
            "agg_sback": (valid_blocks_for_hght_sback["avg_sback"] * normalized_weights.loc[valid_blocks_for_hght_sback.index]).sum()
            if not valid_blocks_for_hght_sback.empty else 0,
            "agg_HHpBLD": (blocks["HHpBLD"] * normalized_weights).sum(),
        }
        return pd.Series(aggregated_metrics)

    print("- Aggregating weighted metrics across roads...")
    road_metrics_df = pd.DataFrame([
        compute_weighted_metrics(road_id, overlap_dict)
        for road_id, overlap_dict in road_block_overlap.items()
    ], index=road_block_overlap.keys())
    print("-- Aggregated road metrics structure:")
    print(road_metrics_df.head())

    # Merge aggregated metrics into roads dataset
    print("- Merging metrics into roads dataset...")
    roads_subset = roads_subset.merge(road_metrics_df, left_on="road_id", right_index=True, how="left")

    # Calculate road length and add as a new column
    print("-- Adding calculated road segment lengths as length_cal...")
    roads_subset["length_cal"] = roads_subset.geometry.length  # Calculate length based on geometry

    print("-- Final roads structure:")
    print(roads_subset.head())
    return roads_subset

def add_urban_area_flag(roads: gpd.GeoDataFrame, polygons: gpd.GeoDataFrame, city_limits: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Adds a flag ('UrbanArea') to the roads GeoDataFrame indicating whether
    a road segment is inside or touches any polygon in the polygons GeoDataFrame.
    """
    print("- Adding 'UrbanArea' flag to roads based on intersection or touching polygons...")

    # Ensure CRS is consistent for spatial operations
    if roads.crs != polygons.crs:
        print("-- Reprojecting urban area polygons to match roads CRS...")
        
        polygons = polygons.to_crs(roads.crs)

    def check_intersects_or_touches(road_geom):
        """
        Check if a road segment intersects or touches any polygon.
        Returns 1 if true (overlap/match), else 0 (no overlap/match).
        """
        for poly_geom in polygons.geometry:
            if road_geom.intersects(poly_geom) or road_geom.touches(poly_geom):
                return 1
        return 0

    # Apply the function to calculate 'UrbanArea' flag
    roads["UrbanArea"] = roads.geometry.apply(check_intersects_or_touches)
    print("-- Added 'UrbanArea' column to roads GeoDataFrame:")
    
    return roads

def associate_roads_with_employment_density(roads: gpd.GeoDataFrame, polygons: gpd.GeoDataFrame, density_column: str, city_limits: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Associates roads with employment density polygons based on spatial intersection.
    Trims employment density polygons to be within the city limits (provided city_limits GeoDataFrame).
    Converts jobs/acre to jobs/square mile.
    If a road intersects multiple polygons, assigns the value of the polygon with the higher employment density.
    """
    print("- Ensuring CRS consistency and clipping polygons...")

    # Check if the CRS of polygons and city limits match
    if polygons.crs != city_limits.crs:
        print("-- CRS mismatch between polygons and city limits.")
        if roads.crs == polygons.crs:
            print("--- Converting city limits CRS to match polygons CRS...")
            city_limits = city_limits.to_crs(polygons.crs)
        else:
            print("--- Converting both city limits and polygons CRS to match roads CRS...")
            city_limits = city_limits.to_crs(roads.crs)
            polygons = polygons.to_crs(roads.crs)

    # If CRS matches between polygons and city limits, trim polygons first, then optionally convert
    if polygons.crs == city_limits.crs:
        print("-- CRS match between polygons and city limits. Clipping polygons to city limits...")
        polygons_clipped = gpd.clip(polygons, city_limits)
        if roads.crs != polygons.crs:
            print("--- Converting clipped polygons CRS to match roads CRS...")
            polygons_clipped = polygons_clipped.to_crs(roads.crs)
    else:
        polygons_clipped = polygons  # Already transformed earlier, no need to clip again

    print("-- Sample of clipped polygons:")
    print(polygons_clipped.head())

    # Convert employment density to jobs per square mile
    print(f"- Converting {density_column} from jobs/acre to jobs/square mile...")
    polygons_clipped["emp_den"] = polygons_clipped[density_column] * 640  # 1 square mile = 640 acres
    print("-- Updated density column with jobs/square mile (after clipping):")
    print(polygons_clipped[["emp_den"]].head())

    # Perform spatial join to associate roads with clipped polygons
    print("- Performing spatial join to associate roads with employment density polygons...")
    roads_with_density = gpd.sjoin(roads, polygons_clipped, how="left", predicate="intersects")
    print("-- Results of spatial join sample:")
    print(roads_with_density[[density_column, "emp_den"]].head())

    # Identify the road-polygon intersections and select the highest density value for each road
    print("- Aggregating road intersections to take the highest employment density for each road...")
    roads_with_density_agg = (
        roads_with_density.groupby("road_id")  # Assuming "road_id" is unique for roads
        .agg({"emp_den": "max"})  # Take the maximum density value
        .reset_index()
    )
    print("-- Aggregated density by road (max jobs/sq mile):")
    print(roads_with_density_agg.head())

    # Merge the maximum density back into the roads GeoDataFrame
    print("- Merging maximum density values back into the roads GeoDataFrame...")
    roads = roads.merge(roads_with_density_agg, on="road_id", how="left")
    print("-- Final roads sample with employment density:")
    print(roads.head())

    return roads




# Execute road metrics calculation
roads_processed = calculate_road_metrics(blocks_processed, roads_subset, buffer_size_feet=50)

print("- Loading the shapefile with polygons for 'UrbanArea' flagging...")
urbanarea_file = os.path.join(one_drive_path, "Data", "2020_Census_Urban_Areas", "2020_Census_Urban_Areas.shp")
urban_area_polygons = gpd.read_file(urbanarea_file)
# Path to the Corpus Christi city limits shapefile
cc_limits_file = os.path.join(one_drive_path, "Data", "CC-citylimits", "Citylimits.shp")
city_limits = gpd.read_file(cc_limits_file)

# Read the employment density shapefile and inspect column names
emp_density_file = os.path.join(one_drive_path, "Data", "CBG2010_SLD_YY", "CBG2010_SLD_YY.shp")
employment_density_polygons = gpd.read_file(emp_density_file)

roads_processed = add_urban_area_flag(roads_processed, urban_area_polygons, city_limits)

print("-- Final roads with 'UrbanArea' flag sample:")
print(roads_processed.head())



# Specify the column name for employment density
density_column = "D1C" 

# Process the roads to associate employment density (clipped to Corpus Christi city limits)
roads_processed = associate_roads_with_employment_density(
    roads=roads_processed,
    polygons=employment_density_polygons,
    density_column=density_column,
    city_limits=city_limits
)

# Final debugging output
print("-- Final roads GeoDataFrame with employment density sample:")
print(roads_processed.head())


- Calculating road metrics with buffer size (for spatial analysis): 50 feet...
  STATEFP20 COUNTYFP20 TRACTCE20 BLOCKCE20          GEOID20  \
0        48        355    001000      2012  483550010002012   
1        48        355    001000      1017  483550010001017   
2        48        355    001000      3008  483550010003008   
3        48        355    001000      2022  483550010002022   
4        48        355    001000      1019  483550010001019   

                  GEOIDFQ20     NAME20 MTFCC20 UR20 UACE20  ... BLOCK_ID  \
0  1000000US483550010002012  Block2012   G5040    U  20287  ...   316275   
1  1000000US483550010001017  Block1017   G5040    U  20287  ...   359470   
2  1000000US483550010003008  Block3008   G5040    U  20287  ...   359630   
3  1000000US483550010002022  Block2022   G5040    U  20287  ...   359631   
4  1000000US483550010001019  Block1019   G5040    U  20287  ...   367172   

    area_sm       pop_den   avg_hght   avg_sback  bld_area  bld_cnt  \
0  0.002304   

In [8]:
import warnings
import os
import geopandas as gpd

def save_shapefiles(buildings_subset, blocks_processed, roads_processed, output_dir="output"):
    """
    Save GeoDataFrames to shapefiles while ensuring the correct geometry column is activated.
    Includes functionality to validate geometries only for buildings and trims `overlap_percs` in roads to 4 decimal places.
    """
    # Create output directory
    os.makedirs(output_dir, exist_ok=True)
    print("Saving subsets to shapefiles...")

    # Suppress warnings about truncated column names for ESRI Shapefiles
    warnings.filterwarnings("ignore", message=".*Normalized/laundered field name.*", category=RuntimeWarning)

    def ensure_unique_column_names(df, name):
        """
        Ensure column names in a GeoDataFrame are unique by appending trailing numbers to duplicates.
        """
        duplicates = df.columns[df.columns.duplicated()].unique()
        if len(duplicates) > 0:
            print(f"WARNING: Duplicate column names detected in {name}: {duplicates}")
            df = df.rename(columns=lambda x: x[:10])  # Truncate names to 10 characters for ESRI compatibility
            # Resolve duplicates by appending a suffix
            seen = set()
            new_columns = []
            for col in df.columns:
                if col in seen:
                    count = sum([existing.startswith(col) for existing in seen]) + 1
                    new_col = f"{col[:7]}_{count}"  # Add numeric suffix to resolve duplicates
                    print(f"    Renaming column '{col}' to '{new_col}' to resolve duplication.")
                    new_columns.append(new_col)
                else:
                    new_columns.append(col)
                seen.add(new_columns[-1])
            df.columns = new_columns
        return df

    def print_columns(df, name):
        """
        Print column names in their entirety for a GeoDataFrame.
        """
        print(f"-- Full column names in {name}:")
        for col in df.columns:
            print(f"  {col}")

    def drop_extraneous_columns(df, columns_to_keep, name):
        """
        Drop columns not included in the `columns_to_keep` list, only if they exist in the dataset.
        """
        columns_to_drop = [col for col in df.columns if col not in columns_to_keep]
        columns_to_drop = [col for col in columns_to_drop if col in df.columns]  # Ensure column exists
        if columns_to_drop:
            print(f"-- Dropping extraneous columns from {name}: {columns_to_drop}")
            df = df.drop(columns=columns_to_drop)
        print_columns(df, f"{name} after dropping")  # Print columns after dropping
        return df

    def validate_and_fix_geometries(df, name):
        """
        Validate geometries in a GeoDataFrame, ensuring they are of the correct type.
        Filter out invalid geometries or attempt to convert them to `Polygon`/`MultiPolygon`.
        Applied **only** to buildings.
        """
        print(f"-- Validating geometries in {name}...")
        
        # Filter valid geometries
        valid_types = ["Polygon", "MultiPolygon"]
        
        # Separate invalid geometries
        invalid_geometries = df[~df.geometry.type.isin(valid_types)]
        if len(invalid_geometries) > 0:
            print(f"ERROR: Invalid geometry type detected in {name}. Expected Polygon or MultiPolygon.")
            print(f"-- Invalid geometries: {len(invalid_geometries)}")
            
            # Attempt to convert invalid geometries to polygons
            print("-- Attempting to convert invalid geometries to valid Polygon/MultiPolygon...")
            def convert_to_polygon(geom):
                # Try to buffer as a fix to convert Point/LineString geometries
                if geom.is_valid:
                    return geom.buffer(0)  # Buffer to create a polygon around invalid geometry
                return None  # Skip if geometry cannot be converted
                
            # Apply conversion
            df.loc[~df.geometry.type.isin(valid_types), "geometry"] = df.loc[~df.geometry.type.isin(valid_types), "geometry"].apply(convert_to_polygon)
            
            # Remove any geometries that could not be converted
            df = df[df.geometry.type.isin(valid_types)]
            print(f"-- After conversion: {len(df)} valid polygons remaining.")
        else:
            print(f"-- All geometries are valid in {name}.")
        
        return df

    def trim_overlap_percs(df, column_name, name):
        """
        Trim values in the `overlap_percs` field to 4 decimal places.
        """
        print(f"-- Trimming values in {column_name} to 4 decimal places for {name}...")
        if column_name in df.columns:
            df[column_name] = df[column_name].apply(lambda x: [round(value, 4) for value in x])
        return df

    # Process buildings subset
    print("- Saving buildings subset...")
    print_columns(buildings_subset, "buildings_subset")
    buildings_subset = drop_extraneous_columns(buildings_subset, columns_to_keep=[
        "geometry", "height", "setback_ft", "build_idx", "build_area_sm", "height_m", "height_ft"
    ], name="buildings_subset")
    buildings_subset = validate_and_fix_geometries(buildings_subset, "buildings_subset")  # Apply validation/fixing ONLY here
    buildings_subset = ensure_unique_column_names(buildings_subset, "buildings_subset")
    buildings_subset.to_file(f"{output_dir}/Corpus_Christi_buildings_subset.shp")

    # Process blocks subset
    print("- Saving blocks processed...")
    print_columns(blocks_processed, "blocks_processed")
    blocks_processed = drop_extraneous_columns(blocks_processed, columns_to_keep=[
        "STATEFP20", "COUNTYFP20", "TRACTCE20", "BLOCKCE20", "GEOID20",
        "ALAND20", "AWATER20", "HOUSING20", "POP20", "geometry", "BLOCK_ID", "area_sm",
        "pop_den", "avg_hght", "avg_sback", "bld_area", "bld_cnt", "bld_ctsm", "bld_prc"
    ], name="blocks_processed")
    blocks_processed = ensure_unique_column_names(blocks_processed, "blocks_processed")
    blocks_processed.to_file(f"{output_dir}/Corpus_Christi_blocks_processed.shp")

    # Process roads subset
    print("- Saving roads processed...")
    print_columns(roads_processed, "roads_processed")
    roads_processed = drop_extraneous_columns(roads_processed, columns_to_keep=[
        "osmid", "highway", "lanes", "name", "oneway", "length", "maxspeed", "geometry",
        "road_id", "agg_pop", "agg_area", "agg_ctsm", "agg_bldprc", "agg_HHpBLD", "agg_hght", "agg_sback",
        "block_ids", "overlap_percs", "evac_flag", "length_cal", "UrbanArea", "emp_den"
    ], name="roads_processed")
    roads_processed = trim_overlap_percs(roads_processed, "overlap_percs", "roads_processed")  # Trim `overlap_percs`
    roads_processed = ensure_unique_column_names(roads_processed, "roads_processed")
    roads_processed.to_file(f"{output_dir}/Corpus_Christi_roads_processed.shp")

    print("Shapefiles saved successfully.")

# Save the processed data to shapefiles
output_dir = "output/rev7"
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
save_shapefiles(buildings_subset, blocks_processed, roads_processed, output_dir)

Saving subsets to shapefiles...
- Saving buildings subset...
-- Full column names in buildings_subset:
  geometry
  addr:state
  building
  ele
  gnis:feature_id
  name
  source
  addr:city
  addr:housename
  addr:housenumber
  addr:postcode
  addr:street
  amenity
  brand
  brand:wikidata
  cuisine
  healthcare
  healthcare:counselling
  height
  phone
  website
  generator:method
  generator:output:electricity
  generator:source
  generator:type
  layer
  power
  shop
  opening_hours
  ref
  wholesale
  addr:unit
  official_name
  takeaway
  note
  email
  museum
  tourism
  addr:country
  alt_name
  contact:email
  contact:facebook
  contact:twitter
  man_made
  content
  office
  start_date
  building:levels
  leisure
  sport
  wikidata
  check_date
  fee
  fixme
  heritage
  heritage:operator
  historic
  ref:nrhp
  ship:type
  wikipedia
  building:part
  disused:shop
  type
  aeroway
  operator
  short_name
  parking
  old_name
  branch
  air_conditioning
  drive_through
  openin

  buildings_subset.to_file(f"{output_dir}/Corpus_Christi_buildings_subset.shp")


- Saving blocks processed...
-- Full column names in blocks_processed:
  STATEFP20
  COUNTYFP20
  TRACTCE20
  BLOCKCE20
  GEOID20
  GEOIDFQ20
  NAME20
  MTFCC20
  UR20
  UACE20
  FUNCSTAT20
  ALAND20
  AWATER20
  INTPTLAT20
  INTPTLON20
  HOUSING20
  POP20
  geometry
  BLOCK_ID
  area_sm
  pop_den
  avg_hght
  avg_sback
  bld_area
  bld_cnt
  bld_ctsm
  bld_prc
  HHpBLD
-- Dropping extraneous columns from blocks_processed: ['GEOIDFQ20', 'NAME20', 'MTFCC20', 'UR20', 'UACE20', 'FUNCSTAT20', 'INTPTLAT20', 'INTPTLON20', 'HHpBLD']
-- Full column names in blocks_processed after dropping:
  STATEFP20
  COUNTYFP20
  TRACTCE20
  BLOCKCE20
  GEOID20
  ALAND20
  AWATER20
  HOUSING20
  POP20
  geometry
  BLOCK_ID
  area_sm
  pop_den
  avg_hght
  avg_sback
  bld_area
  bld_cnt
  bld_ctsm
  bld_prc
- Saving roads processed...
-- Full column names in roads_processed:
  osmid
  highway
  lanes
  name
  oneway
  ref
  reversed
  length
  maxspeed
  geometry
  bridge
  junction
  width
  access
  road_i

  roads_processed.to_file(f"{output_dir}/Corpus_Christi_roads_processed.shp")


In [None]:
import os  # Import os for directory creation
import pandas as pd
import geopandas as gpd

# Function to resolve classifications into priority-based order
def resolve_to_priority(class_list: list, priority_mapping: dict, default_classification: str = "NOT FOUND IN LIST") -> str:
    """
    Resolves multiple classifications to a single one based on priority csv
    """
    if len(class_list) == 0:
        return default_classification
    valid_classes = [cls for cls in class_list if cls in priority_mapping]
    if len(valid_classes) == 0:
        return default_classification
    return sorted(valid_classes, key=lambda c: priority_mapping[c])[0]

# Function for decision-tree-based classification
def classify_using_decision_tree(input_shp: str, rules_csv: str, priority_csv: str, output_dir: str):
    """
    Classifies roads using decision tree rules and resolves classifications based on priority.
    Saves a shapefile with classifications prioritized
    """
    print("- Reading roads shapefile...")
    roads_gdf = gpd.read_file(input_shp)
    print("- Reading classification rules CSV...")
    rules = pd.read_csv(rules_csv)
    print("- Reading classification priority CSV...")
    priority = pd.read_csv(priority_csv)
    priority_mapping = dict(zip(priority["Classification"], priority["Priority"]))
    default_classification = "NOT FOUND IN LIST"

    # Initialize empty classifications for roads
    roads_gdf["All_Class"] = [[] for _ in range(len(roads_gdf))]

    print("- Applying classification rules...")
    for _, rule in rules.iterrows():
        # Parse conditions, operators, and values from rule
        conditions = []
        for i in range(1, 10):  # Process up to 9 conditions
            condition = rule.get(f"Condition_{i}")
            operator = rule.get(f"Operator_{i}")
            value = rule.get(f"Value_{i}")

            # Skip if any part of the condition is missing
            if pd.isna(condition) or pd.isna(operator) or pd.isna(value):
                continue

            # Format the value properly (add quotes for string, leave numeric as is)
            if isinstance(value, str):
                value = f'"{value}"'
            else:
                value = str(value)

            # Append the condition as a valid query string
            conditions.append(f"({condition} {operator} {value})")

        # Combine conditions into a query string
        if len(conditions) > 0:
            query = " and ".join(conditions)
        else:
            continue  # Skip rules without valid conditions

        # Classification from rule
        classification = rule["Classification"]

        try:
            # Evaluate query and get matching road indices
            matching_indices = roads_gdf.query(query).index
            for idx in matching_indices:
                roads_gdf.at[idx, "All_Class"].append(classification)
        except Exception as e:
            print(f"Error applying rule: {rule}")
            print(f"Generated query: {query}")
            print(f"Error details: {e}")
            continue  # Skip to the next rule

    print("- Resolving classifications to single priority-based classification...")
    roads_gdf["FINALCLASS"] = roads_gdf["All_Class"].apply(
        lambda class_list: resolve_to_priority(class_list, priority_mapping, default_classification)
    )

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Save the resulting shapefile
    output_shp = f"{output_dir}/Corpus_Christi_roads_processed_decisiontree.shp"
    print(f"- Saving updated shapefile to: {output_shp}")
    roads_gdf.to_file(output_shp, driver="ESRI Shapefile")
    print("- Decision tree classification completed and file saved.")

# Function for point-based metric classification
def classify_using_metrics(input_shp: str, metrics_csv: str, priority_csv: str, output_dir: str):
    """
    Classifies roads based on metrics using ranges, weights, and priorities
    Resolves classification ties using the resolve_to_priority function
    Saves a shapefile with classifications
    """
    print("- Reading roads shapefile...")
    roads_gdf = gpd.read_file(input_shp)
    print("- Reading metrics CSV...")
    metrics = pd.read_csv(metrics_csv)
    metrics = metrics[metrics["Metric"].notna()]
    print(f"-- Metrics loaded. {len(metrics)} rows to process.")
    print("- Reading classification priority CSV...")
    priority = pd.read_csv(priority_csv)
    priority_mapping = dict(zip(priority["Classification"], priority["Priority"]))
    default_classification = "Unclassified"

    print("- Initializing classification data structures...")
    roads_gdf["Class_PNT"] = [{} for _ in range(len(roads_gdf))]

    print("- Applying metric-based classifications...")
    for _, rule in metrics.iterrows():
        metric = rule["Metric"]
        min_value = rule["Min"]
        max_value = rule["Max"]
        weight = rule["Weight"]
        classification = rule["Classification"]

        # Build query conditions for the metric range
        query = f"{metric} > {min_value}" if not pd.isna(min_value) else ""
        if not pd.isna(max_value):
            query += f" and {metric} <= {max_value}" if query else f"{metric} <= {max_value}"

        # Apply query to get matching indices
        matching_indices = roads_gdf.query(query).index
        for idx in matching_indices:
            # Update points for classification
            roads_gdf.at[idx, "Class_PNT"].setdefault(classification, 0)
            roads_gdf.at[idx, "Class_PNT"][classification] += weight

    print("- Resolving final classification using priority...")
    def resolve_tie(classes_points):
        """
        Resolves the classification when point values tie
        Uses the resolve_to_priority function to determine the final priority-based classification
        """
        if not classes_points:
            return default_classification
        max_point_value = max(classes_points.values())
        tied_classes = [
            c for c, points in classes_points.items() if points == max_point_value
        ]
        if len(tied_classes) == 1:  # No tie
            return tied_classes[0]
        # Resolve tie using priority mapping
        return resolve_to_priority(tied_classes, priority_mapping, default_classification)

    # Apply tie-resolution logic to each row
    roads_gdf["FINALCLASS"] = roads_gdf["Class_PNT"].apply(resolve_tie)

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Save the resulting shapefile
    output_shp = f"{output_dir}/Corpus_Christi_roads_processed_metrics.shp"
    print(f"- Saving updated shapefile to: {output_shp}")
    roads_gdf.to_file(output_shp, driver="ESRI Shapefile")
    print("- Metric-based classification completed and file saved.")

# CHANGE VALUES HERE FOR YOUR INPUTS
input_shp_path = "output/rev7/Corpus_Christi_roads_processed.shp"
rules_csv_path = "decisiontree_3.csv"
priority_csv_path = "Classification_Priority.csv"
metrics_csv_path = "Classification_Point.csv"
output_directory = "output/rev7"

# Run Decision Tree Classification
print("- Running decision tree method...")
classify_using_decision_tree(
    input_shp=input_shp_path,
    rules_csv=rules_csv_path,
    priority_csv=priority_csv_path,
    output_dir=output_directory
)

# Run Metric-Based Classification
print("- Running metric-based classification method...")
classify_using_metrics(
    input_shp=input_shp_path,
    metrics_csv=metrics_csv_path,
    priority_csv=priority_csv_path,
    output_dir=output_directory
)

- Running decision tree method...
- Reading roads shapefile...
- Reading classification rules CSV...
- Reading classification priority CSV...
- Applying classification rules...
- Resolving classifications to single priority-based classification...
- Saving updated shapefile to: output/rev7/Corpus_Christi_roads_processed_decisiontree.shp
- Decision tree classification completed and file saved.
- Running metric-based classification method...
- Reading roads shapefile...
- Reading metrics CSV...
-- Metrics loaded. 25 rows to process.
- Reading classification priority CSV...
- Initializing classification data structures...
- Applying metric-based classifications...
- Resolving final classification using priority...
- Saving updated shapefile to: output/rev7/Corpus_Christi_roads_processed_metrics.shp
- Metric-based classification completed and file saved.
