# Create validation data
3/14/2025


In [None]:
import arcpy
import pandas as pd
import numpy as np
import os
from tqdm import tqdm

arcpy.env.overwriteOutput = True
arcpy.env.outputCoordinateSystem =  arcpy.SpatialReference("USA Contiguous Albers Equal Area Conic USGS")

data_path = "your_path"
results_path = "your_results_path"
scratch_gdb = "your_scratch_path"

# Local parcel data
meck = os.path.join(data_path, "city_buildings/Mecklenburg_NC/TaxData_2020.shp")
sacramento = os.path.join(data_path, "city_buildings/Sacramento_CA/Parcels.shp")
miami = os.path.join(data_path, "city_buildings/Miami_FL/parcels_with_landuse.gdb/parcels_with_landuse")

## Functions

In [None]:
# Classification functions
def classify_sacramento(row, field_index):
    residential_types = {"Residential"} 
    non_residential_types = {"Agricultural", "Care / Health", "Church / Welfare","Industrial", "Miscellaneous", "Office", "Public / Utilities","Recreational","Retail / Commercial"}
    return 1 if row[field_index] in residential_types else 0 if row[field_index] in non_residential_types else None
    
def classify_meck(input_shapefile, output_shapefile):
    # Set up the environment
    arcpy.env.overwriteOutput = True
    
    # Define land use classifications
    residential_types = {'Condo/Townhome', 'Multi-Family', 'Single-Family'}
    non_residential_types = {'Commercial', 'Govt-Inst', 'Hotel/Motel', 'Office', 'Warehouse'}
    
    # Copy input shapefile to output location
    print("Copying features...")
    arcpy.CopyFeatures_management(input_shapefile, output_shapefile)
    
    # Add the res_or_not field if it doesn't exist
    if "res_or_not" not in [f.name for f in arcpy.ListFields(output_shapefile)]:
        arcpy.AddField_management(output_shapefile, "res_or_not", "SHORT")
    
    # Get total count for progress bars
    total_count = int(arcpy.GetCount_management(output_shapefile)[0])
    
    # Dictionary to store objectid -> list of all valid descproper values
    objectid_descproper = {}
    print("First pass: Collecting land use classifications for each objectid...")
    # Collect all unique descproper values per objectid
    with arcpy.da.SearchCursor(output_shapefile, ["objectid", "descproper"]) as cursor:
        with tqdm(total=total_count, desc="Scanning features") as pbar:
            for objectid, landuse in cursor:
                if objectid not in objectid_descproper:
                    objectid_descproper[objectid] = set()
                # Include all values, even empty ones, but strip them
                if landuse is not None:
                    objectid_descproper[objectid].add(landuse.strip())
                pbar.update(1)
    
    # Dictionary to store the best res_or_not value per objectid
    objectid_res_or_not = {}
    print("Assigning residential classifications...")
    # Determine the best classification per objectid
    for objectid, landuse_values in tqdm(objectid_descproper.items(), desc="Processing classifications"):
        if any(lu in residential_types for lu in landuse_values):
            objectid_res_or_not[objectid] = 1  # Residential
        elif any(lu in non_residential_types for lu in landuse_values):
            objectid_res_or_not[objectid] = 0  # Non-Residential
        else:
            objectid_res_or_not[objectid] = None  # Unclassified (NULL)
    
    print("Second pass: Updating dataset with classifications...")
    # Create a dictionary to store the best row for each objectid
    best_rows = {}
    
    def is_better_row(current_descproper, current_classification, best_descproper, best_classification):
        """Helper function to determine if current row is better than the stored best row"""
        # Normalize descproper values
        current_descproper = current_descproper.strip() if current_descproper else ""
        best_descproper = best_descproper.strip() if best_descproper else ""
        
        # If current is residential and best isn't, current wins
        if current_classification == 1 and best_classification != 1:
            return True
            
        # If both are residential or both are non-residential, prefer non-empty descproper
        if current_classification == best_classification:
            # If current is a valid non-residential type and best is empty/blank
            if (current_descproper in non_residential_types and 
                (not best_descproper or best_descproper.isspace())):
                return True
            # If both are empty or both are non-empty, keep the existing one
            return False
            
        # If current is non-residential and best is unclassified, current wins
        if current_classification == 0 and best_classification is None:
            return True
            
        # In all other cases, keep the existing best row
        return False
    
    with arcpy.da.SearchCursor(output_shapefile, ["objectid", "descproper", "SHAPE@", "OID@"]) as cursor:
        with tqdm(total=total_count, desc="Finding best rows") as pbar:
            for row in cursor:
                objectid, descproper, shape, oid = row
                current_classification = objectid_res_or_not.get(objectid)
                
                # Initialize best row if we haven't seen this objectid
                if objectid not in best_rows:
                    best_rows[objectid] = (oid, descproper, current_classification)
                    pbar.update(1)
                    continue
                
                # Get the existing best row's information
                best_oid, best_descproper, best_classification = best_rows[objectid]
                
                # Check if current row is better using our helper function
                if is_better_row(descproper, current_classification, 
                               best_descproper, best_classification):
                    best_rows[objectid] = (oid, descproper, current_classification)
                
                pbar.update(1)
    
    print("Third pass: Applying final classifications and removing duplicates...")
    with arcpy.da.UpdateCursor(output_shapefile, ["objectid", "res_or_not", "OID@"]) as cursor:
        with tqdm(total=total_count, desc="Finalizing features") as pbar:
            for row in cursor:
                objectid, _, oid = row
                if objectid in best_rows:
                    best_oid, _, classification = best_rows[objectid]
                    if oid == best_oid:
                        row[1] = classification  # Update classification
                        cursor.updateRow(row)
                    else:
                        cursor.deleteRow()  # Remove non-best rows
                else:
                    row[1] = None  # Set classification to NULL for any remaining rows
                    cursor.updateRow(row)
                pbar.update(1)
    
    print(f"Process completed successfully. Output saved to: {output_shapefile}")
    # Return count of residential parcels for verification
    with arcpy.da.SearchCursor(output_shapefile, ["res_or_not"]) as cursor:
        residential_count = sum(1 for row in cursor if row[0] == 1)
    print(f"Total residential parcels: {residential_count}")

def create_miami_parcels_w_lu(parcel_polygons, landuse_layer, output_fc, workspace):
    """
    Samples land use data for parcels by converting parcels to points and sampling.
    Uses file-based storage for temporary data and cursor-based joining for speed.
    
    Parameters:
    parcel_polygons (str): Path to input parcel polygon feature class
    landuse_layer (str): Path to land use polygon feature class
    output_fc (str): Path to output feature class
    workspace (str): Path to folder for temporary files
    """
    import arcpy
    import os
    from datetime import datetime
    
    # Set up environment
    arcpy.env.overwriteOutput = True
    
    # Generate timestamp for unique temporary file names
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    # Temporary feature classes in specified workspace
    temp_points = os.path.join(workspace, f"parcel_points_{timestamp}")
    temp_landuse = os.path.join(workspace, f"landuse_projected_{timestamp}")
    temp_sampled_points = os.path.join(workspace, f"sampled_points_{timestamp}")
    
    try:
        print("Converting parcels to points...")
        arcpy.FeatureToPoint_management(
            in_features=parcel_polygons,
            out_feature_class=temp_points,
            point_location="INSIDE"
        )
        
        print("Reprojecting land use layer...")
        arcpy.FeatureClassToFeatureClass_conversion(
            in_features=landuse_layer,
            out_path=workspace,
            out_name=f"landuse_projected_{timestamp}"
        )
        
        print("Sampling land use at points...")
        arcpy.SpatialJoin_analysis(
            target_features=temp_points,
            join_features=temp_landuse,
            out_feature_class=temp_sampled_points,
            join_operation="JOIN_ONE_TO_ONE",
            join_type="KEEP_ALL",
            match_option="INTERSECT"
        )
        
        print("Copying parcels to output location...")
        arcpy.CopyFeatures_management(parcel_polygons, output_fc)
        
        # Add LU field if it doesn't exist
        if "LU" not in [f.name for f in arcpy.ListFields(output_fc)]:
            arcpy.AddField_management(output_fc, "LU", "TEXT", field_length=50)
        
        print("Creating land use lookup dictionary...")
        landuse_dict = {}
        with arcpy.da.SearchCursor(temp_sampled_points, ["PID", "LU"]) as cursor:
            for pid, lu in cursor:
                landuse_dict[pid] = lu
        
        print("Updating parcels with land use data...")
        with arcpy.da.UpdateCursor(output_fc, ["PID", "LU"]) as cursor:
            for row in cursor:
                pid = row[0]
                if pid in landuse_dict:
                    row[1] = landuse_dict[pid]
                    cursor.updateRow(row)
        
    except Exception as e:
        print(f"Error: {str(e)}")
        raise
        
    finally:
        print("Cleaning up temporary data...")
        # Clean up temporary feature classes
        for temp_fc in [temp_points, temp_landuse, temp_sampled_points]:
            if arcpy.Exists(temp_fc):
                arcpy.Delete_management(temp_fc)
    
    print(f"Process completed successfully. Output saved to: {output_fc}")
    parcel_count = int(arcpy.GetCount_management(output_fc)[0])
    print(f"Total parcels processed: {parcel_count}")

def classify_miami(row, field_index):
    residential_types = {"0", "10", "11", "12", "13", "20", "30", "35", "50", "61", "435"} 
    none_types = {"65", "69", "160", "170", "180", "800", "801", "802", "803", "804", "805"}
    return 1 if row[field_index] in residential_types else None if row[field_index] in none_types else 0

def get_classifier_and_field(city, fields):
    """Returns the appropriate classifier function and field index based on city."""
    city = city.lower()
    if city == "sacramento":
        return classify_sacramento, fields.index("LU_GENERAL")
    elif city == "miami":
        return classify_miami, fields.index("LU")
    else:
        raise ValueError(f"Unsupported city: {city}")

def process_shapefile(input_shapefile, output_location, city):
    """
    Process a shapefile by copying it and adding residential classification.
    
    Parameters:
        input_shapefile (str): Path to input shapefile
        output_location (str): Directory for output
        city (str): City name for classification rules
    """
    # Create the output paths
    output_name = f"{city}_residential_classified"
    output_path = os.path.join(output_location, output_name)
    
    # Copy the input feature class
    arcpy.CopyFeatures_management(input_shapefile, output_path)
    
    # Add the new field
    arcpy.AddField_management(output_path, "res_or_not", "SHORT")
    
    # Get the total count for progress bar
    total_count = int(arcpy.GetCount_management(output_path)[0])
    
    # Process the data
    with arcpy.da.UpdateCursor(output_path, ["*"]) as cursor:
        # Get classifier function and field index once, outside the loop
        classifier_func, field_index = get_classifier_and_field(city, cursor.fields)
        
        # Create progress bar
        with tqdm(total=total_count, desc=f"Processing {city}") as pbar:
            for row in cursor:
                # Classify directly using row data and field index
                res_value = classifier_func(row, field_index)
                row[-1] = res_value
                cursor.updateRow(row)
                pbar.update(1)

def update_shapefile(input_shapefile, output_location, city):
    """
    Update an existing classified shapefile.
    
    Parameters:
        input_shapefile (str): Path to input shapefile
        output_location (str): Directory containing the file
        city (str): City name for classification rules
    """
    output_name = f"{city}_residential_classified"
    output_path = os.path.join(output_location, output_name)
    
    # Get the total count for progress bar
    total_count = int(arcpy.GetCount_management(output_path)[0])
    
    with arcpy.da.UpdateCursor(output_path, ["*"]) as cursor:
        # Get classifier function and field index once, outside the loop
        classifier_func, field_index = get_classifier_and_field(city, cursor.fields)
        
        # Create progress bar
        with tqdm(total=total_count, desc=f"Updating {city}") as pbar:
            for row in cursor:
                # Classify directly using row data and field index
                res_value = classifier_func(row, field_index)
                row[-1] = res_value
                cursor.updateRow(row)
                pbar.update(1)

def join_classifications(city_data, state_data, city_name, output_workspace):
    """Perform spatial join and calculate accuracy metrics."""
    print(f"\nProcessing {city_name}...")
    
    # Define output file for spatial join
    joined_fc = os.path.join(output_workspace, f"{city_name}_validation")

    # Perform spatial join using LARGEST_OVERLAP
    print("Performing spatial join...")
    arcpy.analysis.SpatialJoin(
        target_features=city_data,
        join_features=state_data,
        out_feature_class=joined_fc,
        join_operation="JOIN_ONE_TO_ONE",
        join_type="KEEP_COMMON",
        match_option="LARGEST_OVERLAP"
    )
    print("Joined!") 

## Classify local parcel datasets

In [None]:
# Specify your output workspace
output_workspace = os.path.join(results_path, "validation/input_cities.gdb")  

# Need to add land use to Miami parcels
create_miami_parcels_w_lu(os.path.join(data_path, "city_buildings/Miami_FL/Parcel.shp"), os.path.join(data_path, "city_buildings/Miami_FL/Land_Use.shp"), os.path.join("city_buildings/Miami_FL/parcels_with_landuse.gdb/parcels_with_landuse"),os.path.join(data_path, "city_buildings/Miami_FL"))

# Separate classification function for Mecklenburg county
classify_meck(meck, os.path.join(output_workspace, "meck_residential_classified"))

# Classify Sacramento and Miami
cities = {
    "sacramento": sacramento,
    "miami": miami
}

for city, shapefile in cities.items():
    print(f"Processing {city}...")
    # process_shapefile(shapefile, output_workspace, city)
    
    process_shapefile(shapefile, output_workspace, city)
    print(f"Created new feature class for {city}")


## Set up for creating validation data

In [None]:
# Update city paths with updated classified parcel data
output_workspace = os.path.join(results_path, "validation/input_cities.gdb")

meck = os.path.join(output_workspace, "meck_residential_classified")
sacramento = os.path.join(output_workspace, "sacramento_residential_classified")
miami = os.path.join(output_workspace, "miami_residential_classified")

# Paths to state building footprints
state_footprints = {
    "NC": os.path.join(data_path, "overture/overture_classified.gdb/NC_buildings_class"),
    "FL": os.path.join(data_path, "overture/overture_classified.gdb/FL_buildings_class"),
    "CA": os.path.join(data_path, "overture/overture_classified.gdb/CA_buildings_class")
}

# Define city-state mapping
city_state_mapping = {
    "meck": "NC",
    "sacramento": "CA",
    "miami": "FL"
}

city_to_city_mapping = {
    "meck": meck,
    "sacramento": sacramento,
    "miami": miami
}

## Create validation data

In [None]:
# Output workspace for results
output_workspace = os.path.join(results_path, "validation/sj_cities.gdb")

# census blocks layer
arcpy.management.MakeFeatureLayer(os.path.join(results_path, "census/blocks.gdb/blocks_floodpop"), "blocks_w_buildings")

# Process each city
results = {}
for city in ["meck", "miami", "sacramento"]:
    state = city_state_mapping[city]
    state_data = state_footprints[state]
    state_block_data = state_footprints[state]
    city_data = city_to_city_mapping[city]

    # Perform spatial join 
    join_classifications(city_data, state_data, city, output_workspace)

    # Select and export buildings that intersect with the city data
    print("- Selecting buildings that intersect parcel data")
    arcpy.management.MakeFeatureLayer(state_data, "state_layer")
    
    arcpy.management.SelectLayerByLocation(
        in_layer="state_layer",
        overlap_type="INTERSECT",
        select_features=city_data,
        search_distance=None,
        selection_type="NEW_SELECTION",
        invert_spatial_relationship="NOT_INVERT"
    )

    city_fc_path = os.path.join(results_path, f"validation/buildings_cities.gdb/{city}_buildings")
    arcpy.conversion.ExportFeatures(
        in_features="state_layer",
        out_features=city_fc_path,
        where_clause="",
        use_field_alias_as_name="NOT_USE_ALIAS",
        sort_field=None
    )

    print("- Creating buildings layer with land use")
    city_val_path = os.path.join(results_path, f"validation/sj_cities.gdb/{city}_validation")
    building_w_lu_path = os.path.join(results_path, f"validation/buildings_cities.gdb/{city}_buildings_w_lu")
    
    arcpy.analysis.SpatialJoin(
        target_features=city_fc_path,
        join_features=city_val_path,
        out_feature_class=building_w_lu_path,
        join_operation="JOIN_ONE_TO_ONE",
        join_type="KEEP_ALL",
        match_option="HAVE_THEIR_CENTER_IN",
        search_radius=None,
        distance_field_name="",
        match_fields=None
    )

    city_val_blocks_path = os.path.join(results_path, f"validation/validation_blocks.gdb/{city}_blocks")
    
    print("- Creating census blocks for the city")
    arcpy.analysis.SpatialJoin(
        target_features="blocks_w_buildings",
        join_features=building_w_lu_path,
        out_feature_class=city_val_blocks_path,
        join_operation="JOIN_ONE_TO_ONE",
        join_type="KEEP_COMMON",
        match_option="INTERSECT",
        search_radius=None,
        distance_field_name="",
        match_fields=None
    )