<a href="https://colab.research.google.com/github/anhpdd/geneal_tsp_150subang/blob/main/notebooks/1_road_id_search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required package
!pip install osmnx

# Standard library imports
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional

# Third-party imports
import pandas as pd
import numpy as np
import geopandas as gpd
import requests
import osmnx as ox
import networkx as nx
import folium
from shapely.geometry import Point, LineString, Polygon, MultiPolygon
from shapely.ops import unary_union, polygonize
from geopy.distance import great_circle
from tqdm.auto import tqdm
from IPython.display import display

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Get all values from the worksheet and convert them into a pandas DataFrame
df = pd.read_excel('/content/drive/MyDrive/Colab/Capstone 1/tprop_df_validated.xlsx', sheet_name = 'test_1_static')

# Step 4: Display the DataFrame's info
df.info()

In [None]:
# Get all values from the worksheet and convert them into a pandas DataFrame
df = pd.read_excel('/content/drive/MyDrive/Colab/Capstone 1/tprop_df_test2_updated.xlsx', sheet_name = 'test2')

# Step 4: Display the DataFrame's info
df.info()

In [None]:
df.columns

In [None]:
df['road_name'].nunique()

## Test 2

In [None]:
OSM_API_BASE_URL = "https://api.openstreetmap.org/api/0.6"

# Define your headers once
HEADERS = {
    'User-Agent': 'MyDataProject/1.0 (https://example.com; myemail@example.com)'
}

def fetch_osm_data(url: str, timeout: int = 25) -> ET.Element | None:
    """
    Fetches data from the OSM API and parses it as XML.
    Includes a required User-Agent header.
    """
    try:
        # Add the headers to your request
        response = requests.get(url, timeout=timeout, headers=HEADERS)
        response.raise_for_status()
        return ET.fromstring(response.content)

    except requests.exceptions.HTTPError as e:
        # Your excellent 404 handling
        if e.response is not None and e.response.status_code == 404:
            return None
        print(f"HTTP Error for {url}: {e}")
        return None # Explicitly return None on other HTTP errors

    except requests.exceptions.RequestException as e:
        print(f"Request failed for {url}: {e}")
        return None # Explicitly return None on failure

    except ET.ParseError as e:
        print(f"XML parsing failed for {url}. Error: {e}")
        return None # Explicitly return None on failure


def extract_line_geometry(input_df: pd.DataFrame, name_column: str, id_column: str) -> Optional[gpd.GeoDataFrame]:
    """
    Fetches OSM data for Way IDs, parses road geometry, and returns a GeoDataFrame.
    Includes detailed print statements and a progress bar to show the process.

    Args:
        input_df (pd.DataFrame): DataFrame with a column for Way IDs.
        name_column (str): The name of the column containing the road/location name.
        id_column (str): The name of the column containing the OSM Way ID.

    Returns:
        Optional[gpd.GeoDataFrame]: A GeoDataFrame with road geometries and
                                    merged attributes, or None if no valid data.
    """
    print("--- Starting Geometry Extraction Process ---")
    all_geometries = []
    all_attributes = []

    # Using tqdm to create a progress bar for the loop
    for index, row in tqdm(input_df.iterrows(), total=len(input_df), desc="Processing Way IDs"):
        way_id = str(row[id_column])

        print(f"\nProcessing Way ID: {way_id} for '{row[name_column]}'")

        url = f"{OSM_API_BASE_URL}/way/{way_id}/full"
        print(f"  -> Fetching data from: {url}")
        root = fetch_osm_data(url)

        if not root:
            print(f"  -> Skipping Way ID '{way_id}' due to data fetch/parse error.")
            continue

        road_way_elem = root.find(f".//way[@id='{way_id}']")
        if not road_way_elem:
            print(f"  -> WARNING: Road Way ID '{way_id}' not found in the fetched XML. Skipping.")
            continue

        # Cache all node coordinates from the current XML response
        node_coords_cache = {}
        for node_elem in root.findall(".//node"):
            try:
                node_id = node_elem.get('id')
                lat = float(node_elem.get('lat'))
                lon = float(node_elem.get('lon'))
                node_coords_cache[node_id] = (lon, lat)
            except (TypeError, ValueError):
                pass # Skip nodes with invalid data

        print(f"  -> Found {len(node_coords_cache)} nodes in the XML.")

        # Extract road attributes (tags) from OSM
        osm_road_attrs = {'id': way_id}
        for tag in road_way_elem.findall('tag'):
            osm_road_attrs[tag.get('k')] = tag.get('v')

        # Combine attributes from the input DataFrame row with OSM attributes
        merged_attrs = row.to_dict()
        merged_attrs.update(osm_road_attrs)

        # Build the coordinate list for the LineString geometry
        road_coords = []
        for nd_ref_elem in road_way_elem.findall('nd'):
            node_ref = nd_ref_elem.get('ref')
            coords = node_coords_cache.get(node_ref)
            if coords:
                road_coords.append(coords)

        # Create the LineString if we have at least two points
        if len(road_coords) >= 2:
            line_geometry = LineString(road_coords)
            all_geometries.append(line_geometry)
            all_attributes.append(merged_attrs)
            print(f"  -> SUCCESS: Created LineString for Way ID '{way_id}' with {len(road_coords)} points.")
        else:
            print(f"  -> WARNING: Insufficient coordinates for Way ID '{way_id}'. Skipping.")

    if not all_geometries:
        print("\n--- Process Finished: No valid geometries could be created. ---")
        return None

    print("\n--- Finalizing GeoDataFrame ---")
    print(f"Creating GeoDataFrame with {len(all_geometries)} geometries...")
    gdf = gpd.GeoDataFrame(all_attributes, geometry=all_geometries, crs="EPSG:4326")

    print(f"Dropping duplicate entries based on the '{name_column}' column...")
    gdf = gdf.drop_duplicates(subset=[name_column])
    print(f"  -> Rows after dropping duplicates: {len(gdf)}")

    print("Filtering final columns...")
    gdf = gdf[[name_column, 'id', 'geometry']]

    print("--- Geometry Extraction Complete ---")
    return gdf

def generate_geometry_summary(
    result_gdf: Optional[gpd.GeoDataFrame],
    original_df: pd.DataFrame,
    name_column: str
) -> Optional[pd.DataFrame]:
    """
    Analyzes the result of the geometry extraction and prints a summary.

    Compares the final GeoDataFrame against the initial DataFrame to determine
    success and failure rates for creating road geometries.

    Args:
        result_gdf (Optional[gpd.GeoDataFrame]): The GeoDataFrame returned by the
                                                 extract_line_geometry function.
                                                 Can be None if the process failed entirely.
        original_df (pd.DataFrame): The original DataFrame that was passed to the
                                    geometry extraction function.
        name_column (str): The name of the column containing the unique road/location names.

    Returns:
        Optional[pd.DataFrame]: A DataFrame containing the rows of the original data
                                that failed to produce a valid geometry, or None if all
                                were successful.
    """
    print("\n--- Geometry Extraction Summary ---")

    if original_df.empty:
        print("Original DataFrame is empty. No roads to process.")
        return None

    total_unique_roads = original_df[name_column].nunique()
    print(f"Total unique roads attempted: {total_unique_roads}")

    if result_gdf is None or result_gdf.empty:
        print("No geometries were successfully created.")
        print(f"Success rate: 0.00%")
        print(f"Failure rate: 100.00%")
        return original_df # All records failed

    successful_count = len(result_gdf)
    failed_count = total_unique_roads - successful_count

    print(f"Successfully created geometries: {successful_count} roads ({successful_count/total_unique_roads:.2%})")
    print(f"Failed to create geometries: {failed_count} roads ({failed_count/total_unique_roads:.2%})")

    # Identify which specific roads failed
    successful_names = set(result_gdf[name_column])
    original_names = set(original_df[name_column])
    failed_names = original_names - successful_names

    if failed_names:
        print(f"\nReturning a DataFrame with {len(failed_names)} failed records for review.")
        failed_df = original_df[original_df[name_column].isin(failed_names)].copy()
        return failed_df
    else:
        print("\nCongratulations! All road geometries were created successfully.")
        return None

In [None]:
road_df = df[df['test2_result'] == 'Failed'][['road_name', 'id']].drop_duplicates().reset_index(drop=True)
road_df

In [None]:
# Get the road route details
road_route_details = extract_line_geometry(road_df, 'road_name','id').reset_index(drop=True)
road_route_details

In [None]:
generate_geometry_summary(road_route_details, road_df, 'road_name')

In [None]:
# Replaces 'old_column_name' with 'way_id'
road_route_details['id'] = road_route_details['id'].astype(int)

In [None]:
road_route_details.head()

In [None]:
tprop_df2 = df.merge(road_route_details, on =['road_name', 'id'], how='left')

In [None]:
tprop_df2['test2_result'] = np.where(tprop_df2['geometry'].notna(), 'Pass', 'Failed')
tprop_df2[(tprop_df2['test2_result'] == 'Failed')]

In [None]:
#tprop_df2.to_excel('/content/drive/MyDrive/Colab/Capstone 1/tprop_df_test2_updated.xlsx')

## Test 3

In [None]:
tprop_df2 = pd.read_excel('/content/drive/MyDrive/Colab/Capstone 1/tprop_df_test2_updated.xlsx')
tprop_df2

In [None]:
tprop_df2.info()

In [None]:
from shapely import wkt

# Step 1: Check current status
print(f"Total rows: {len(tprop_df2)}")
print(f"Missing geometries: {tprop_df2['geometry'].isna().sum()}")

# Step 2: Prepare the data - clean the ID column
missing_mask = tprop_df2['geometry'].isna()

if missing_mask.any():
    missing_geom_df = tprop_df2[missing_mask].copy()

    # Filter out rows with NaN IDs and convert to proper integers
    valid_id_mask = missing_geom_df['id'].notna()
    missing_geom_df = missing_geom_df[valid_id_mask].copy()

    # Convert IDs from float to integer (removes the .0)
    missing_geom_df['id'] = missing_geom_df['id'].astype(int).astype(str)

    print(f"\nValid rows to process: {len(missing_geom_df)}")
    print(f"Rows with NaN IDs (skipped): {(~valid_id_mask).sum()}")

    if len(missing_geom_df) > 0:
        print(f"\nExtracting geometries for {len(missing_geom_df)} rows...")

        # Step 3: Extract geometries from OSM
        extracted_gdf = extract_line_geometry(
            input_df=missing_geom_df,
            name_column='road_name',
            id_column='id'
        )

        # Step 4: Fill missing geometries
        if extracted_gdf is not None and not extracted_gdf.empty:
            # Directly update geometries using iterrows (preserves Shapely objects)
            for idx, row in extracted_gdf.iterrows():
                road_name = row['road_name']
                geometry = row['geometry']

                # Find matching rows and update geometry
                mask = (tprop_df2['road_name'] == road_name) & (tprop_df2['geometry'].isna())
                if mask.any():
                    tprop_df2.loc[mask, 'geometry'] = geometry

            filled_count = len(extracted_gdf)
            print(f"\n✓ Successfully filled {filled_count} geometries")
            print(f"Remaining missing geometries: {tprop_df2['geometry'].isna().sum()}")
        else:
            print("\n✗ No geometries could be extracted")

        # Step 5: Show summary
        failed_df = generate_geometry_summary(extracted_gdf, missing_geom_df, 'road_name')

        if failed_df is not None:
            print(f"\n⚠ {len(failed_df)} roads failed to extract:")
            display(failed_df[['road_name', 'id']].head(10))
    else:
        print("\n⚠ No valid IDs to process (all are NaN)")
else:
    print("✓ No missing geometries to fill!")

# Step 6: Convert string geometries to Shapely objects BEFORE creating GeoDataFrame
print("\nConverting geometry column to Shapely objects...")

def convert_to_shapely(geom):
    """Convert string WKT or keep Shapely geometry."""
    if pd.isna(geom):
        return None
    elif isinstance(geom, str):
        try:
            return wkt.loads(geom)
        except Exception as e:
            print(f"Warning: Could not parse geometry: {geom[:50]}...")
            return None
    else:
        # Already a Shapely geometry
        return geom

tprop_df2['geometry'] = tprop_df2['geometry'].apply(convert_to_shapely)

# Step 7: Now convert to GeoDataFrame
if not isinstance(tprop_df2, gpd.GeoDataFrame):
    tprop_df2 = gpd.GeoDataFrame(tprop_df2, geometry='geometry', crs="EPSG:4326")

print(f"\n{'='*60}")
print(f"FINAL SUMMARY:")
print(f"Total rows: {len(tprop_df2)}")
print(f"Rows with geometry: {tprop_df2['geometry'].notna().sum()}")
print(f"Rows without geometry: {tprop_df2['geometry'].isna().sum()}")
print(f"✓ Successfully converted to GeoDataFrame!")

In [None]:
OSM_API_BASE_URL = "https://api.openstreetmap.org/api/0.6"

# Define your headers once
HEADERS = {
    'User-Agent': 'MyDataProject/1.0 (https://example.com; myemail@example.com)'
}

def fetch_osm_data(url: str, timeout: int = 25) -> ET.Element | None:
    """
    Fetches data from the OSM API and parses it as XML.
    Includes a required User-Agent header.
    """
    try:
        # Add the headers to your request
        response = requests.get(url, timeout=timeout, headers=HEADERS)
        response.raise_for_status()
        return ET.fromstring(response.content)

    except requests.exceptions.HTTPError as e:
        # Your excellent 404 handling
        if e.response is not None and e.response.status_code == 404:
            return None
        print(f"HTTP Error for {url}: {e}")
        return None # Explicitly return None on other HTTP errors

    except requests.exceptions.RequestException as e:
        print(f"Request failed for {url}: {e}")
        return None # Explicitly return None on failure

    except ET.ParseError as e:
        print(f"XML parsing failed for {url}. Error: {e}")
        return None # Explicitly return None on failure

In [None]:
def get_district_data(object_id: str) -> dict | None:
    """
    Fetches OSM data for a given object_id, trying as relation, then way, then node.
    Correctly handles inner and outer ways for relations to form polygons with holes.
    """
    final_object_type = None
    final_tags = {}
    final_name_tag = None
    final_all_polygons_coordinates = []
    processed_successfully = False

    # --- Try to process as a RELATION ---
    try:
        relation_url = f"{OSM_API_BASE_URL}/relation/{object_id}/full"
        root_xml = fetch_osm_data(relation_url)
        if not root_xml:
            raise ValueError("XML data could not be fetched.")

        relation_element = root_xml.find(f".//relation[@id='{object_id}']")
        if relation_element is None:
            raise ValueError("Relation element not found in XML.")

        # (This part for getting tags and caching node coordinates is correct and remains the same)
        current_tags = {tag.get('k'): tag.get('v') for tag in relation_element.findall('tag') if tag.get('k')}
        current_name_tag = current_tags.get('name')
        nodes_coords_cache = {
            node.get('id'): (float(node.get('lon')), float(node.get('lat')))
            for node in root_xml.findall('.//node') if node.get('id') and node.get('lat') and node.get('lon')
        }

        # (This part for separating ways into outer/inner segments is also correct)
        outer_way_segments = []
        inner_way_segments = []
        for member in relation_element.findall("member[@type='way']"):
            way_elem = root_xml.find(f".//way[@id='{member.get('ref')}']")
            if way_elem is not None:
                coords = [nodes_coords_cache[nd.get('ref')] for nd in way_elem.findall('nd') if nd.get('ref') in nodes_coords_cache]
                if coords:
                    role = member.get('role', 'outer')
                    if role == 'outer':
                        outer_way_segments.append(coords)
                    elif role == 'inner':
                        inner_way_segments.append(coords)

        # --- DELETED SECTION: The original, fragile stitching loops have been removed. ---

        # +++ NEW, ROBUST STITCHING AND POLYGON CREATION LOGIC +++

        # Convert coordinate segments into Shapely LineString objects
        outer_lines = [LineString(segment) for segment in outer_way_segments]
        inner_lines = [LineString(segment) for segment in inner_way_segments]

        # Merge all connecting lines into continuous, single paths
        merged_outer_lines = unary_union(outer_lines)
        merged_inner_lines = unary_union(inner_lines)

        # Form valid polygons from the closed rings created by the merged lines
        stitched_outer_polygons = list(polygonize(merged_outer_lines))
        stitched_inner_polygons = list(polygonize(merged_inner_lines))

        # --- RESUMING LOGIC WITH CORRECTLY FORMED POLYGONS ---

        final_shapely_polygons = []
        # Create a mutable list of inner polygons to track which ones have been used
        remaining_inners = list(stitched_inner_polygons)

        for outer_poly in stitched_outer_polygons:
            holes_for_this_poly = []
            # This list will hold inner polygons that haven't been assigned to this outer_poly
            unassigned_inners = []

            for inner_poly in remaining_inners:
                # Check if the inner polygon is properly contained within the outer one
                if outer_poly.contains(inner_poly):
                    holes_for_this_poly.append(inner_poly.exterior.coords)
                else:
                    unassigned_inners.append(inner_poly)

            # Update the list of remaining inners for the next outer polygon
            remaining_inners = unassigned_inners

            # Create the final polygon with its associated holes
            final_shapely_polygons.append(Polygon(outer_poly.exterior.coords, holes_for_this_poly))

        # --- (The rest of the function for formatting output remains the same) ---

        if final_shapely_polygons:
            for shp_poly in final_shapely_polygons:
                exterior_coords = list(shp_poly.exterior.coords)
                if not shp_poly.exterior.is_ccw:
                    exterior_coords.reverse()
                poly_data = [exterior_coords]
                for interior_ring in shp_poly.interiors:
                    interior_coords = list(interior_ring.coords)
                    if interior_ring.is_ccw:
                        interior_coords.reverse()
                    poly_data.append(interior_coords)
                final_all_polygons_coordinates.append(poly_data)

            final_tags = current_tags
            final_name_tag = current_name_tag
            final_object_type = "relation"
            processed_successfully = True
            print(f"Successfully processed ID {object_id} as RELATION with {len(final_shapely_polygons)} polygon(s).")
        else:
            print(f"INFO: Relation {object_id} could not form valid polygons.")

    except Exception as e_relation:
        print(f"Error processing relation for {object_id}: {e_relation}. Trying as way.")
        processed_successfully = False

    # (The fallback logic for 'WAY' and 'NODE' remains unchanged)
    # ...

    # --- Final Return ---
    if not processed_successfully or not final_all_polygons_coordinates:
        print(f"FINAL: Could not derive usable geometry for OSM object ID {object_id}.")
        return None

    return {
        'name': final_name_tag,
        'tags': final_tags,
        'all_polygons_coordinates': final_all_polygons_coordinates,
        'id': object_id,
        'type': final_object_type
    }

In [None]:
def create_geometry(row):
    coords_list = row['all_polygons_coordinates'] # This will be a list of lists of coords
    if not coords_list:
        return None

    if row['type'] == 'node':
        # For a node, coords_list will contain one list with one (lon, lat) tuple: [[(lon, lat)]]
        if coords_list and len(coords_list[0]) == 1:
            return Point(coords_list[0][0])
        return None

    # --- NEW / MODIFIED BLOCK FOR WAY TYPES ---
    elif row['type'] == 'way_polygon': # This is the new type from get_district_data
        # It's already determined as a closed way with area tags
        # coords_list will contain one list of (lon, lat) tuples for the polygon exterior
        if coords_list and coords_list[0] and len(coords_list[0]) >= 4: # Min 3 unique points + 1 closing = 4
            # The get_district_data function should have already ensured it's closed,
            # but a defensive check here doesn't hurt.
            if coords_list[0][0] != coords_list[0][-1]:
                # This case should ideally not happen if get_district_data is perfect,
                # but if it does, try to close it or log a warning.
                print(f"Warning: way_polygon {row['id']} was not closed, closing it now.")
                return Polygon(coords_list[0] + [coords_list[0][0]])
            return Polygon(coords_list[0])
        else:
            # If not enough points for a polygon despite being flagged as 'way_polygon',
            # it might be malformed data. Return None or try LineString if appropriate.
            print(f"Warning: way_polygon {row['id']} has insufficient points ({len(coords_list[0])}). Cannot form Polygon.")
            return None # Or LineString(coords_list[0]) if you want a fallback
        return None

    elif row['type'] == 'way_line': # This is the new type from get_district_data
        # It's already determined as a line-string
        # coords_list will contain one list of (lon, lat) tuples for the line
        if coords_list and coords_list[0] and len(coords_list[0]) >= 2: # Min 2 points for a LineString
            return LineString(coords_list[0])
        else:
            print(f"Warning: way_line {row['id']} has insufficient points ({len(coords_list[0])}). Cannot form LineString.")
            return None
        return None
    # --- END NEW / MODIFIED BLOCK ---

    elif row['type'] == 'relation':
        polygons = []
        for poly_data in coords_list: # Each poly_data is [exterior_coords, [hole1_coords], ...]
            if poly_data:
                exterior_coords = poly_data[0]
                interior_coords_list = poly_data[1:]

                # Create Shapely Polygon with holes
                try:
                    poly = Polygon(exterior_coords, interior_coords_list)
                    if poly.is_valid:
                        polygons.append(poly)
                    else:
                        print(f"Warning: Invalid polygon created for relation {row['id']}. Attempting to make valid.")
                        # Attempt to make invalid polygons valid (e.g., using buffer(0))
                        valid_poly = poly.buffer(0)
                        if valid_poly.is_valid:
                            if isinstance(valid_poly, MultiPolygon):
                                polygons.extend(valid_poly.geoms)
                            else:
                                polygons.append(valid_poly)
                        else:
                            print(f"Warning: Could not make polygon valid for relation {row['id']}.")
                except Exception as e:
                    print(f"Error creating polygon for relation {row['id']}: {e}")

        if polygons:
            return MultiPolygon(polygons) if len(polygons) > 1 else polygons[0]
        return None

    else:
        # This 'else' block will catch the old 'way' type if it somehow persists,
        # or any other unexpected types.
        print(f"Unsupported or old type '{row['type']}'. Skipping geometry creation.")
        return None


def create_amen_gdf(df, id_column):
    gdf_list = []
    # Ensure the id_column in the input DataFrame `df` is treated as string
    df[id_column] = df[id_column].astype(str)

    # Fetch data for each ID and append to gdf_list
    for osm_id in df[id_column].unique(): # Use unique IDs to avoid redundant API calls
        result = get_district_data(osm_id)
        if result:
            gdf_list.append(result)

    if not gdf_list:
        print("No valid district data was fetched.")
        return None

    # Create a DataFrame from the fetched OSM data
    dff = pd.DataFrame(gdf_list)

    # Merge the original DataFrame with the fetched OSM data
    # Use left_on=id_column, right_on='id' to merge correctly
    merge_df = pd.merge(df, dff, left_on=id_column, right_on='id', how='left')

    # Create geometry column
    merge_df['geometry'] = merge_df.apply(create_geometry, axis=1)

    # Filter out rows where geometry could not be created
    merge_df = merge_df[merge_df['geometry'].notnull()].reset_index(drop=True)

    # Create GeoDataFrame
    gdf = gpd.GeoDataFrame(merge_df, geometry='geometry', crs="EPSG:4326")
    return gdf

In [None]:
# --- Main Execution ---
district_osm_dict = {
    'GOMBAK': '12438352',
    'HULU LANGAT': '12438351',
    'KLANG': '12391135',
    'HULU SELANGOR': '10714199',
    'KUALA LANGAT': '10743362',
    'KUALA LUMPUR': '2939672',
    'KUALA SELANGOR': '10714137',
    'PETALING': '12391134',
    'PUTRAJAYA': '4443881',
    'SABAK BERNAM': '10714136',
    'SEPANG': '10743315'
}

district_df = pd.DataFrame.from_dict(district_osm_dict, orient='index', columns=['id'])
district_df.index.name = 'district'
district_df = district_df.reset_index()
district_df['id'] = district_df['id'].astype(str) # Ensure IDs are strings

display(district_df.head())

In [None]:
dis = create_amen_gdf(district_df, 'id')
dis

In [None]:
def validate_record_locations(
    records_gdf: gpd.GeoDataFrame,
    districts_gdf: gpd.GeoDataFrame,
    district_col: str = 'district'
) -> gpd.GeoDataFrame:
    """
    Finds the actual district for each road (LineString) based on maximum length overlap.

    Args:
        records_gdf (gpd.GeoDataFrame): Roads to check. Must have LineString geometry and `district_col`.
        districts_gdf (gpd.GeoDataFrame): District boundaries with Polygon geometry.
        district_col (str): The column name linking records to districts.

    Returns:
        gpd.GeoDataFrame: Original records with 'actual_district', 'is_in_correct_district',
                          and 'overlap_length' columns.
    """
    # --- 1. Validation and Preparation ---
    if records_gdf.empty or districts_gdf.empty:
        print("Warning: One or both GeoDataFrames are empty.")
        records_gdf['actual_district'] = 'N/A'
        records_gdf['is_in_correct_district'] = False
        return records_gdf

    # Ensure both GeoDataFrames use the same CRS
    if records_gdf.crs != districts_gdf.crs:
        print(f"Warning: CRS mismatch. Reprojecting records_gdf to match districts_gdf.")
        records_gdf = records_gdf.to_crs(districts_gdf.crs)

    # --- 2. Validate and Fix District Geometries ---
    districts_clean = districts_gdf.copy()
    districts_clean['geometry'] = districts_clean['geometry'].buffer(0)  # Fix any invalid geometries

    # --- 3. Find District with Maximum Road Length Overlap ---
    results = []

    for idx, record in records_gdf.iterrows():
        road_geom = record.geometry
        assigned_district = record[district_col]

        best_district = None
        max_overlap_length = 0
        overlap_details = {}

        # Check intersection with each district
        for _, district in districts_clean.iterrows():
            district_geom = district.geometry
            district_name = district[district_col]

            if district_geom.intersects(road_geom):
                # Calculate the length of road within this district
                intersection = district_geom.intersection(road_geom)

                # Handle different intersection result types
                if intersection.is_empty:
                    overlap_length = 0
                elif hasattr(intersection, 'length'):
                    overlap_length = intersection.length
                else:
                    # Handle GeometryCollection or MultiLineString
                    overlap_length = sum(
                        geom.length for geom in intersection.geoms
                        if hasattr(geom, 'length')
                    )

                overlap_details[district_name] = overlap_length

                # Track the district with maximum overlap
                if overlap_length > max_overlap_length:
                    max_overlap_length = overlap_length
                    best_district = district_name

        # Build result record
        record_dict = record.to_dict()
        record_dict['actual_district'] = best_district if best_district else 'Outside any district'
        record_dict['is_in_correct_district'] = (assigned_district == best_district) if best_district else False
        record_dict['overlap_length'] = max_overlap_length
        record_dict['total_road_length'] = road_geom.length
        record_dict['overlap_percentage'] = (max_overlap_length / road_geom.length * 100) if road_geom.length > 0 else 0

        results.append(record_dict)

    # --- 4. Create Result GeoDataFrame ---
    validated_gdf = gpd.GeoDataFrame(results, crs=records_gdf.crs)

    return validated_gdf

In [None]:
districts_gdf = gpd.GeoDataFrame(dis, crs="EPSG:4326")
input_tprop = tprop_df2.copy()
records_gdf = gpd.GeoDataFrame(input_tprop, crs="EPSG:4326")

In [None]:
input_tprop.shape[0]

In [None]:
records_gdf.shape[0]

In [None]:
validated_records = validate_record_locations(records_gdf, districts_gdf)

In [None]:
faiures_test3 = validated_records[validated_records['is_in_correct_district'] == False]
faiures_test3

In [None]:
faiures_test3.to_excel('/content/drive/MyDrive/Colab/Capstone 1/faiures_test3.xlsx')

In [None]:
#validated_records.to_excel('/content/drive/MyDrive/Colab/Capstone 1/fix.xlsx')