In [1]:
import math
from shapely.geometry import Polygon, Point, MultiPolygon, LineString
import geopandas as gpd
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pickle

# Notebook overview

In this notebook we analyze the processed clusters based on their shape and size. We check for two requirements:
1. Is the cluster a rectangle?
2. Do the sides match the lengths of a crosswalk stripe?

Once we have checked each cluster, we merge the ones that stem from the same original polygon into one crosswalk polygon.

Additionally, we take the two outer stripes of each crosswalk polygon and extend these to create a polygon on each end of the crosswalk. These polygons will be run through notebook 2 in order to check if any crosswalk stripes were missed.

**Input**: cluster dictionary as created in notebook 2.

**Output**: Shapely file with extended polygons that serve as input for the next step.

**Previous notebook**: 2. Point cloud processing, matching and cluster growing.

**Next notebook**: 2. Point cloud processing, matching and cluster growing

## Settings

In [4]:
CRS = 'epsg:28992'

# Loading data

In [5]:
# Load cluster dictionary
path = "../data/output/cluster dict.pkl"

with open(path, 'rb') as file:
    clusters = pickle.load(file)

# Plotting function

In [6]:
# Function to plot PC polygon (can be used if desired, not necessary)
def plot_PC_2D(PC_pol_dict, coords, intensity):
    x = PC_pol_dict[coords][:, 0]
    y = PC_pol_dict[coords][:, 1]
    plt.figure()
    plt.scatter(x, y, c=PC_pol_dict[intensity], cmap='viridis', vmin=0)
    plt.colorbar(label='Reflective index')  # Add colorbar to show gradient values
    plt.xlabel('X')
    plt.ylabel('Y')

    plt.grid(True)
    plt.axis('equal')
    plt.show()

# Filtering polygons

In [7]:
# Create Shapely polygons out of the PC polygons

# Loop over each cluster group
for cluster_group in clusters:

    # Loop over each cluster
    for cluster in cluster_group:

        pols = []

        # Loop over coordinates in cluster and buffer them to create small polygons
        for coor in cluster['clean_coords']:
            temp_point = Point(coor[0], coor[1])
            buff_point = temp_point.buffer(0.08)
            pols.append(buff_point)
        
        # Merge all small coordinate polygons together into one Shapely polgyon
        multi_pol = MultiPolygon(pols)
        cluster['Multi Polygon'] = multi_pol
        cluster['Polygon'] = multi_pol.convex_hull

In [8]:
# Function to plot the polygons
def plot_polygon(polygon):
    # Extract polygon coordinates
    x_coords, y_coords = polygon.exterior.xy
    
    # Calculate aspect ratio based on the range of x and y coordinates
    x_range = max(x_coords) - min(x_coords)
    y_range = max(y_coords) - min(y_coords)
    aspect_ratio = y_range / x_range
    
    # Create the plot with the calculated aspect ratio
    fig, ax = plt.subplots(figsize=(5, 5 * aspect_ratio))  # Adjust the figsize as needed
    ax.set_aspect('equal')  # Ensure equal aspect ratio
    
    # Plot the polygon
    ax.plot(x_coords, y_coords, color='blue')
    
    # Show the plot
    plt.show()

In [9]:
# Function to check if a polygon is a crosswalk stripe
def is_rectangle(polygon):

    # We first check if the minimum rotated rectangle overlaps for at least 90% with the original polygon
    # This filters out shapes that are not square/rectangular
    min_rect = polygon.minimum_rotated_rectangle
    
    area = polygon.area/min_rect.area

    if area > .90:
        area_bool = True
        
    else:
        area_bool = False

    # We then check the lengths of the vertices of the polygon to see if they resemble a crosswalk stripe
    if area_bool == True:
        coords = min_rect.boundary.coords
        
        linestrings = [LineString(coords[k:k+2]) for k in range(len(coords) - 1)]

        lengths = []

        for line in linestrings:
            lengths.append(line.length)
        
        # Define the conditions
        condition1 = sum(1.4 <= length <= 7 for length in lengths)
        condition2 = sum(0.6 <= length <= 0.8 for length in lengths)

        # Check if the conditions are met
        if not (condition1 == 2 and condition2 == 2):
            area_bool = False

    
    return area, area_bool


In [10]:
# Function to merge rectangular clusters of one crosswalk polygon
def merge_clusters(clusters):

    polygons = []
    clean_coords = []
    clean_intensity = []
    coords = []
    intensity = []

    for cluster in clusters:
        len_coords = len(cluster['clean_coords'])

        break_triggered = False
        for item in clean_coords:
            if len(item) == len_coords:
                break_triggered = True
                break  # Exit the inner loop if a match is found

        if break_triggered:
            continue  # Skip the rest of the outer loop for this cluster

        # Only executed if no match is found
        polygons.append(cluster['Polygon'])
        coords.append(cluster['coordinates'])
        intensity.append(cluster['intensity'])
        clean_coords.append(cluster['clean_coords'])
        clean_intensity.append(cluster['clean_intensity'])
    
    PC_coords = np.concatenate(coords, axis=0)
    PC_intensity = np.concatenate(intensity, axis=0)
    PC_coords_clean = np.concatenate(clean_coords, axis=0)
    PC_intensity_clean = np.concatenate(clean_intensity, axis=0)
    
    multi_pol = MultiPolygon(polygons)
    
    pol = multi_pol.convex_hull

    pol_dict = {'CW_index': clusters[0]['CW_index'], 'coordinates': PC_coords, 'intensity': PC_intensity, 'clean_coords': PC_coords_clean, 'clean_intensity': PC_intensity_clean, 'polygon': pol}

    return pol_dict

In [11]:
# Function to filter out duplicates in the rectangle cluster dictionary
def filter_duplicates(data):
    seen_lengths = set()
    unique_items = []
    
    for item in data:
        length = len(item['clean_coords'])
        if length not in seen_lengths:
            seen_lengths.add(length)
            unique_items.append(item)
    
    return unique_items

In [12]:
# Function to check if the polygons are rectangles 
def check_rectangles(clusters):
    # Initialize list of rectangular clusters
    rect_clusters = []

    # Initialize list of final polygons
    polygon_dict = []

    for i in range(0, len(clusters)):

        merged = merge_clusters(clusters[i])

        # Initialize list to keep track of rectangular cluster dictionaries in a crosswalk
        cluster_list = []

        for cluster in clusters[i]:
            
            area, rect_bool = is_rectangle(cluster['Polygon'])
            
            if rect_bool is True:

                # Append cluster to list of rectangular clusters of the crosswalk
                cluster_list.append(cluster)        

        if len(cluster_list) > 0:

            rect_clusters.append(filter_duplicates(cluster_list))

            # Merge clusters
            pol = merge_clusters(cluster_list)

            # Add final polygon to polygon dict
            polygon_dict.append(pol)

    return polygon_dict, rect_clusters

In [13]:
polygon_dict, rect_clusters = check_rectangles(clusters)

In [14]:
# Save final polygons as gdf
path = "../data/output/filtered polygons.shp"
polygons_df = pd.DataFrame.from_records(polygon_dict)
polygons_df = polygons_df.drop(columns=['CW_index', 'coordinates', 'intensity', 'clean_coords', 'clean_intensity'])
polygons_gdf = gpd.GeoDataFrame(polygons_df, crs='epsg:28992', geometry='polygon')
polygons_gdf.to_file(path)

In [15]:
# Function to compute the eucledian distance between two points
def euclidean_distance(p1, p2):
    return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

In [16]:
# Function to get the outer clusters of a group of clusters that stem from the same polygon
def get_outer_clusters(clusters):
    if len(clusters) == 1:
        return [clusters[0], clusters[0]]
    
    centroid_points = []

    for cluster in clusters:
        centroid = np.array(cluster['Polygon'].centroid.coords[0])
        centroid_point = Point(centroid[0], centroid[1])
        centroid_points.append(centroid_point)
    point_coords = np.array([(point.x, point.y) for point in centroid_points])

    max_distance = 0
    outer_points = (None, None)

    for i in range(len(point_coords)):
        for j in range(i + 1, len(point_coords)):
            dist = euclidean_distance(point_coords[i], point_coords[j])
            if dist > max_distance:
                max_distance = dist
                outer_points = (point_coords[i], point_coords[j])

    # Retrieve original cluster
    # Iterate through the clusters
    outer_clusters = []

    # Iterate through the clusters
    for cluster in clusters:
        # Extract the centroid of the cluster
        centroid = np.array(cluster['Polygon'].centroid.coords[0])
        
        # Create a Point object from the centroid coordinates
        centroid_point = Point(centroid[0], centroid[1])
        
        # Check if the centroid matches either of the outer points
        if np.array_equal(centroid_point.coords[0], outer_points[0]): 
            outer_clusters.append(cluster)

        elif np.array_equal(centroid_point.coords[0], outer_points[1]):
            outer_clusters.append(cluster)
    
    centroids = []
    for cluster in outer_clusters:
        centroid = cluster['Polygon'].centroid.x
        centroids.append(centroid)
    
    if len(centroids) > 1:
        if centroids[1] < centroids[0]:
            outer_clusters = [outer_clusters[1], outer_clusters[0]]

    return outer_clusters

In [17]:
# Get the outer clusters of each group of clusters
outer_cluster_dict = []

for clusters in rect_clusters:
    outer_clusters = get_outer_clusters(clusters)
    outer_cluster_dict.append(outer_clusters)

In [18]:
# Function to extend a polygon to either the left or the right
def extend_polygon(outer_clusters, direction):
    # Set the outer cluster based on direction
    if direction == 'right':
        outer_cluster = outer_clusters[1]
    if direction == 'left':
        outer_cluster = outer_clusters[0]

    # Get minimum rectangle
    min_rect = outer_cluster['Polygon'].minimum_rotated_rectangle

    # Extract the coordinates of the rectangle
    rect_coords = list(min_rect.exterior.coords)

    # Remove the duplicate last point
    rect_coords = rect_coords[:-1]

    # Calculate the lengths of the sides
    sides = []
    for i in range(len(rect_coords)):
        p1 = rect_coords[i]
        p2 = rect_coords[(i + 1) % len(rect_coords)]
        side = LineString([p1, p2])
        sides.append((side.length, side))

    # Sort sides by length
    sides.sort(key=lambda x: x[0])

    # Short sides are the first two in the sorted list
    short_sides = sides[:2]

    # Extract start and end points
    if short_sides[0][1].coords[0][0] < short_sides[0][1].coords[1][0]:
        start_point1 = short_sides[0][1].coords[0]
        end_point1 = short_sides[0][1].coords[1]
    else:
        start_point1 = short_sides[0][1].coords[1]
        end_point1 = short_sides[0][1].coords[0]

    if short_sides[1][1].coords[0][0] < short_sides[1][1].coords[1][0]:
        start_point2 = short_sides[1][1].coords[0]
        end_point2 = short_sides[1][1].coords[1]
    else:
        start_point2 = short_sides[1][1].coords[1]
        end_point2 = short_sides[1][1].coords[0]

    # Calculate the direction vector
    direction_vector = (end_point1[0] - start_point1[0], end_point1[1] - start_point1[1])
    # Calculate the length of the direction vector
    length = math.sqrt(direction_vector[0]**2 + direction_vector[1]**2)

    # Normalize the direction vector
    normalized_direction = (direction_vector[0] / length, direction_vector[1] / length)

    # Define the extension length
    extension_length = 2  # for example, extend by 1 unit

    if direction == 'right':
        # Calculate the new end points for right direction
        new_end_point1 = (
            end_point1[0] + normalized_direction[0] * extension_length,
            end_point1[1] + normalized_direction[1] * extension_length
        )

        new_end_point2 = (
            end_point2[0] + normalized_direction[0] * extension_length,
            end_point2[1] + normalized_direction[1] * extension_length
        )

        new_start_point1 = end_point1
        new_start_point2 = end_point2
    else:
        # Calculate the new start points for left direction
        new_start_point1 = (
            start_point1[0] - normalized_direction[0] * extension_length,
            start_point1[1] - normalized_direction[1] * extension_length
        )

        new_start_point2 = (
            start_point2[0] - normalized_direction[0] * extension_length,
            start_point2[1] - normalized_direction[1] * extension_length
        )

        new_end_point1 = start_point1
        new_end_point2 = start_point2

    new_pol = [new_start_point1, new_start_point2, new_end_point1, new_end_point2]
    new_pol = Polygon(new_pol).minimum_rotated_rectangle

    return new_pol


In [19]:
extended_polygons = []

for clusters in outer_cluster_dict:
    if len(clusters) > 1:
        # Extend polygons and plot them in green
        left_extension = extend_polygon(clusters, "left")
        right_extension = extend_polygon(clusters, "right")
        extended_polygons.append(left_extension)
        extended_polygons.append(right_extension)

In [20]:
extended_df = pd.DataFrame({'geometry': extended_polygons})
extended_gdf = gpd.GeoDataFrame(extended_df, geometry='geometry')
extended_gdf = extended_gdf.set_crs(CRS)

In [21]:
# Save extended outer clusters 
path = "../data/output/extended polygons.shp"

extended_gdf.to_file(path)