## Width Measuring Algorithm

This notebook serves as the final step in the road width extraction pipeline. It uses a line map to help shape or fill in gaps in the mask, which would then be used together to measure the width of roads.

Kindly ensure you the image files of the **satellite image** (`/roadimages`) and **outputted mask** (`/roadmasks`) in the `/data` folder.

Also make sure that you have a copy of the **OpenStreetMap dataset** of the area you wish to get road widths from, and place it in the `/shapefiles` folder (in this case, the dataset used is the [**Philippines Road Dataset**](https://data.humdata.org/dataset/hotosm_phl_roads))

### **Libraries & Modules**

- random
- cv2
- geopandas
- shapely
- math
- numpy
- pandas
- matplotlib
- pathlib

In [1]:
# %pip install -r requirements.txt
import random
import cv2

import geopandas as gpd
import shapely as sp

import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pathlib import Path

 ### Define Bounds of Extracted Area

In [2]:
def get_sorted_file_parts(data_folder, data_subfolder, sort_index):
    file_paths = [file_path for file_path in (data_folder / data_subfolder).rglob("*") if file_path.is_file()]
    file_names = [file_path.stem for file_path in file_paths]
    file_parts = [file_name.split("_") for file_name in file_names]
    sorted_file_parts = sorted(file_parts, key=lambda x: float(x[sort_index]))
    return sorted_file_parts, file_paths

def get_min_max_coordinates(sorted_file_parts, index):
    min_coord = float(sorted_file_parts[0][index])
    max_coord = float(sorted_file_parts[-1][index])
    return min_coord, max_coord

### Get the OSM Dataset

In [3]:
def get_dataset_area():
    shape = gpd.read_file("data/shapefiles/gis_osm_roads_free_1.shp")
    dataset_area = gpd.read_file("data/shapefiles/dataset_area.shp")
    dataset_roads = shape[dataset_area['geometry'].item().contains(shape['geometry'])]
    # print(dataset_roads.head())
    # dataset_roads[:10000].plot()
    # print(dataset_roads['fclass'].unique())
    return dataset_roads

### Get Patch Parameters and Bounding Box

In [4]:
dataset_roads = get_dataset_area()
centerx, centery = (121.035004, 14.555672)
offset = 0.00068
minx, miny, maxx, maxy = (centerx - offset, centery - offset, centerx + offset, centery + offset)
bbox = sp.geometry.box(minx, miny, maxx, maxy)

### View the Road Patch Dataset

This simply serves as a way to peek at what the patch dataset would look like

In [5]:
# road_patch = dataset_roads.cx[minx:maxx, miny:maxy]
# road_patch.loc[:, 'geometry'] = road_patch['geometry'].apply(lambda geom: geom.intersection(bbox))
# road_patch = road_patch[~road_patch.is_empty]
# road_patch.plot()
# road_patch.head()

### Constants

In [6]:
PATCH_SIZE = 512 # Size of the patches
EARTH_CIRCUFERENCE = 40075016.686 # in meters
SEARCH_INTERVAL = 50  # Distance between interpolated points
SEARCH_RANGE = 10 # pixels the road can be adjusted to find the best fit [coords-SEARCH_RANGE, coords+SEARCH_RANGE]
LINE_LENGTH = 20  # Total length of perpendicular lines
MAX_DISTANCE_FROM_ROAD = 50 # Maximum distance a point can be from a road mask
MAX_ROAD_WIDTH = 150 # Maximum width of a road for it to be considered
LAT_SIZE = 0.0013364 # Estimated degrees per 512 pixels (latitude)
LONG_SIZE = 0.00137216 # Estimated degrees per 512 pixels (longitude)
DISPLAY_ALLOWANCE = 0.0000 # Allowance for displaying the road mask, to avoid clipping at the edges

Display Random Patch

In [7]:
def get_patch_paramaters(patch_path):
    #set patch size
    lat_size = LAT_SIZE/2
    long_size = LONG_SIZE/2
    #right = increase lon offset; up = increase lat offset
    lat_shift = 0.0000
    lon_shift = -0.000053
    patch_lat = float(patch_path.stem.split("_")[-2])
    patch_lon = float(patch_path.stem.split("_")[-1])

    #adjust linestring coordinates 
    minx, miny, maxx, maxy = (patch_lon - long_size, patch_lat - lat_size, patch_lon + long_size, patch_lat + lat_size)
    minx = minx - lon_shift
    maxx = maxx - lon_shift
    miny = miny - lat_shift
    maxy = maxy - lat_shift
    bbox = sp.geometry.box(minx, miny, maxx, maxy)

    #remove roads outside of patch
    patch_roads = dataset_roads.cx[minx:maxx, miny:maxy]
    return (patch_lat, patch_lon, minx, miny, maxx, maxy, bbox, patch_roads)

def get_mask_image(mask_path):
    return cv2.imread(str(mask_path))

def display_patch(patch_path, mask_path, patch_roads, patch_lat, patch_lon, minx, miny, maxx, maxy):

    print("Random patch path:", patch_path)

    image = cv2.imread(str(patch_path))
    mask_image = cv2.imread(str(mask_path))

    if patch_roads.empty:
        print("No roads found in the random patch.")
        plt.imshow(image)
    else:

        #show road patch
        fig, axes = plt.subplots(2, 3, figsize=(15, 7))
        axes[0][0].imshow(image)

        #plot linestrings (roadlines)
        patch_roads.plot(ax=axes[0][1])

        axes[0][1].set_xlim(minx-DISPLAY_ALLOWANCE, maxx+DISPLAY_ALLOWANCE)
        axes[0][1].set_ylim(miny-DISPLAY_ALLOWANCE, maxy+DISPLAY_ALLOWANCE)
        axes[0][1].set_aspect('equal')

        #overlay linestrings on patch
        axes[0][2].imshow(image, extent=[minx, maxx, miny, maxy])
        patch_roads.plot(ax=axes[0][2], color='red')
        axes[0][2].set_title("Overlayed Roads on Image")
        axes[0][2].set_xlim(minx-DISPLAY_ALLOWANCE, maxx+DISPLAY_ALLOWANCE)
        axes[0][2].set_ylim(miny-DISPLAY_ALLOWANCE, maxy+DISPLAY_ALLOWANCE)
        axes[0][2].set_aspect('equal')

        #coordinates relative to total dataset area
        axes[1][0].scatter(patch_lon, patch_lat, color='red')
        axes[1][0].set_xlim(min_lon, max_lon)
        axes[1][0].set_ylim(min_lat, max_lat)
        axes[1][0].set_aspect('equal')
        
        #ground truth mask
        axes[1][1].imshow(mask_image)

        #overlay linestrings on ground truth mask
        axes[1][2].imshow(mask_image, extent=[minx, maxx, miny, maxy])
        patch_roads.plot(ax=axes[1][2], color='red')
        axes[1][2].set_title("Overlayed Roads on Image")
        axes[1][2].set_xlim(minx, maxx)
        axes[1][2].set_ylim(miny, maxy)
        axes[1][2].set_aspect('equal')

        plt.show()


In [8]:
#create scaled version of cropped linemap to fit mask
# Calculate scaling factors
# Function to scale geometries
def scale_geometry(geom, x_scale, y_scale):
    return sp.affinity.scale(geom, xfact=x_scale, yfact=y_scale, origin=(0,0))

def shift_geometry(geom, x_shift, y_shift):
    return sp.affinity.translate(geom, xoff=-x_shift, yoff=-y_shift)

def scale_patch_roads(patch_roads, minx, miny, maxx, maxy):
    # Create a copy of the GeoDataFrame and apply scaling to the copy
    scaled_patch_roads = patch_roads.copy()

    scaled_patch_roads["geometry_scaled"] = scaled_patch_roads["geometry"].apply(
        lambda geom: shift_geometry(geom, minx, miny)
    )
    scaled_patch_roads["geometry_scaled"] = scaled_patch_roads["geometry_scaled"].apply(
        lambda geom: scale_geometry(geom, 512/(maxx-minx), 512/(maxy-miny))
    )

    x_shift_initial = minx
    y_shift_initial = miny
    x_scale = 512/(maxx-minx)
    y_scale = 512/(maxy-miny)

    return scaled_patch_roads, x_shift_initial, y_shift_initial, x_scale, y_scale

In [9]:
def get_binary_mask_from_image(image):
    if len(image.shape) == 3:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, image_mask = cv2.threshold(image, 127, 255, cv2.THRESH_BINARY)
    return np.flipud(image_mask)

def get_binary_mask_from_roads(scaled_patch_roads):
    road_mask = np.zeros((512, 512), dtype=np.uint8)
    for geom in scaled_patch_roads["geometry_scaled"]:
        if geom.is_empty:
            continue
        if isinstance(geom, sp.LineString):
            coords = np.array(geom.coords, dtype=np.int32)
            cv2.polylines(road_mask, [coords], isClosed=False, color=1, thickness=1)
    return road_mask

In [10]:
def get_intersection(road_mask, image_mask):
    return np.sum((road_mask > 0) & (image_mask > 0))

In [11]:
def get_shifted_coordinates(scaled_patch_roads, image_mask, display=False):

    shifted_road_masks = [[None for _ in range(2 * SEARCH_RANGE + 1)] for _ in range(2 * SEARCH_RANGE + 1)]

    for i in range(-SEARCH_RANGE, SEARCH_RANGE + 1):
        for j in range(-SEARCH_RANGE, SEARCH_RANGE + 1):
            road_copy = scaled_patch_roads.copy()
            road_copy["geometry_scaled"] = road_copy["geometry_scaled"].apply(
                lambda geom: shift_geometry(geom, i, j)
            )
            road_mask = get_binary_mask_from_roads(road_copy)
            shifted_road_masks[i + SEARCH_RANGE][j + SEARCH_RANGE] = road_mask

    max_intersection = -1
    max_i = -1
    max_j = -1

    # Iterate through the shifted_road_masks to find the greatest intersection
    for i in range(2 * SEARCH_RANGE + 1):
        for j in range(2 * SEARCH_RANGE + 1):
            road_mask = shifted_road_masks[i][j]
            intersection = get_intersection(road_mask, image_mask)
            if intersection > max_intersection:
                max_intersection = intersection
                max_i = i
                max_j = j

    best_road_mask = shifted_road_masks[max_i][max_j]

    best_shift_values = (max_i - SEARCH_RANGE, max_j - SEARCH_RANGE)


    x_shift_second, y_shift_second = best_shift_values[0], best_shift_values[1]

    if display:

        print(f"Maximum intersection: {max_intersection}")
        print(f"Shift coordinates: ({max_i - SEARCH_RANGE}, {max_j - SEARCH_RANGE})")

        plt.figure(figsize=(10, 10))
        plt.imshow(image_mask, cmap='gray', interpolation='none')
        plt.imshow(best_road_mask, cmap='Reds', alpha=0.5, interpolation='none')  # Overlay road_mask in red with transparency
        plt.title("Intersection of Road Mask and Image Mask")
        plt.gca().invert_yaxis()
        plt.show()
    
    return x_shift_second, y_shift_second

In [12]:
def interpolate_points(line, distance_interval):
    length = line.length
    num_points = math.ceil(length / distance_interval)
    distances = [i * distance_interval for i in range(num_points + 1)]
    points = [line.interpolate(distance) for distance in distances]
    #remove points that are out of bounds
    points = [point for point in points if point.x >= 0 and point.x < 512 and point.y >= 0 and point.y < 512]
    return points

In [13]:
def perpendicular_line_eq(line, point):
    coords = list(line.coords)

    for i in range(len(coords) - 1):
        x1, y1 = coords[i]
        x2, y2 = coords[i + 1]
        segment = sp.LineString([(x1, y1), (x2, y2)])

        if segment.distance(point) <= 1e-7:  # Check if point is close to the segment
            # Compute original slope
            if x2 - x1 == 0:  # Vertical line
                return (0, point.y)
            elif y2 - y1 == 0:  # Horizontal line
                return (float('inf'), point.x)
            
            slope = (y2 - y1) / (x2 - x1)
            perp_slope = -1 / slope  # Perpendicular slope

            x, y = point.x, point.y
            b = y - perp_slope * x

            return (perp_slope, b)
    
    return None

In [14]:
#gets point slope pairs of a road line
def compute_points_and_perpendiculars(road_line, distance_interval):
    point_slope_pairs = []
    points = interpolate_points(road_line, distance_interval)
    for point in points:
                perp_result = perpendicular_line_eq(road_line, point)
                if perp_result is not None:
                    slope, intercept = perp_result
                    point_slope_pairs.append((point, slope))
    return point_slope_pairs
    

In [15]:
def display_perpendiculars(scaled_patch_roads):

    bbox = sp.geometry.box(0, 0, 512, 512)
    for road in scaled_patch_roads['geometry_scaled'].items():
        geom = road[1]
        
        if isinstance(geom, sp.MultiLineString):
            for line in geom.geoms:
                point_slope_pairs = compute_points_and_perpendiculars(line, SEARCH_INTERVAL)
                for point_slope_pair in point_slope_pairs:
                    point, slope = point_slope_pair
                    if slope == float('inf'):
                        x1, x2 = point.x, point.x
                        y1, y2 = point.y - LINE_LENGTH / 2, point.y + LINE_LENGTH / 2
                        plt.plot([x1, x2], [y1, y2], color='red')
                    else:
                        dx = LINE_LENGTH / (2 * (1 + slope ** 2) ** 0.5)
                        dy = slope * dx
                        x1, y1 = point.x - dx, point.y - dy
                        x2, y2 = point.x + dx, point.y + dy
                        plt.plot([x1, x2], [y1, y2], color='red')
        elif isinstance(geom, sp.LineString):
            point_slope_pairs = compute_points_and_perpendiculars(geom, SEARCH_INTERVAL)
            for point_slope_pair in point_slope_pairs:
                point, slope = point_slope_pair
                if slope == float('inf'):
                    x1, x2 = point.x
                    y1, y2 = point.y - LINE_LENGTH / 2, point.y + LINE_LENGTH / 2
                    plt.plot([x1, x2], [y1, y2], color='red')
                else:
                    dx = LINE_LENGTH / (2 * (1 + slope ** 2) ** 0.5)
                    dy = slope * dx
                    x1, y1 = point.x - dx, point.y - dy
                    x2, y2 = point.x + dx, point.y + dy
                    plt.plot([x1, x2], [y1, y2], color='red')

    plt.xlim(0, 512)
    plt.ylim(0, 512)
    plt.gca().set_aspect('equal', adjustable='box')
    plt.show()

In [16]:
def measure_nearest_road_to_point(point, slope, image_mask, plot=True):
    min_distance = 2
    x, y = int(point.x), int(point.y)
    width, height = image_mask.shape  # Fix: Ensure correct order (height, width)

    # Check if the point is inside the image bounds
    if not (0 <= x < width and 0 <= y < height):
        return None  # Ensure point is within bounds

    search_mode = "inside" if image_mask[y, x] == 255 else "outside"
    points_forward = []
    points_backward = []

    if slope == float('inf'):  # **Vertical case**
        # Move upward (-y direction)
        found_white = False
        for i in range(y - 1, -1, -1):
            if search_mode == "inside" and (image_mask[i, x] == 0 or i == 0 or i == height - 1):
                points_backward.append((x, i))
                break
            if search_mode == "outside":
                if image_mask[i, x] == 255 and not found_white:
                    points_backward.append((x, i))
                    found_white = True
                    continue
                if found_white and image_mask[i, x] == 0:
                    points_backward.append((x, i))
                    break

        # Move downward (+y direction)
        found_white = False
        for i in range(y + 1, height):
            if search_mode == "inside" and (image_mask[i, x] == 0 or i == 0 or i == height - 1):
                points_forward.append((x, i))
                break
            if search_mode == "outside":
                if image_mask[i, x] == 255 and not found_white:
                    points_forward.append((x, i))
                    found_white = True
                    continue
                if found_white and image_mask[i, x] == 0:
                    points_forward.append((x, i))
                    break

    else:  # **Non-vertical case (diagonal/horizontal)**
        step_size = 1 / max(abs(slope), 1)  # Normalize step size
        dx = step_size if slope >= 0 else -step_size
        dy = slope * dx

        # Move forward
        xi, yi = x + dx, y + dy
        found_white = False
        while 0 <= int(round(xi)) < width and 0 <= int(round(yi)) < height:
            if search_mode == "inside" and (image_mask[int(round(yi)), int(round(xi))] == 0 or int(round(xi)) == 0 or int(round(xi)) == width - 1 or int(round(yi)) == 0 or int(round(yi)) == height - 1):
                points_forward.append((int(round(xi)), int(round(yi))))
                break
            if search_mode == "outside":
                if image_mask[int(round(yi)), int(round(xi))] == 255 and not found_white:
                    points_forward.append((int(round(xi)), int(round(yi))))
                    found_white = True
                    xi += dx
                    yi += dy
                    continue
                if found_white and image_mask[int(round(yi)), int(round(xi))] == 0:
                    points_forward.append((int(round(xi)), int(round(yi))))
                    break
            xi += dx
            yi += dy

        # Move backward
        xi, yi = x - dx, y - dy
        found_white = False
        while 0 <= int(round(xi)) < width and 0 <= int(round(yi)) < height:
            if search_mode == "inside" and (image_mask[int(round(yi)), int(round(xi))] == 0 or int(round(xi)) == 0 or int(round(xi)) == width - 1 or int(round(yi)) == 0 or int(round(yi)) == height - 1):
                points_backward.append((int(round(xi)), int(round(yi))))
                break
            if search_mode == "outside":
                if image_mask[int(round(yi)), int(round(xi))] == 255 and not found_white:
                    points_backward.append((int(round(xi)), int(round(yi))))
                    found_white = True
                    xi -= dx
                    yi -= dy
                    continue
                if found_white and image_mask[int(round(yi)), int(round(xi))] == 0:
                    points_backward.append((int(round(xi)), int(round(yi))))
                    break
            xi -= dx
            yi -= dy

    # **Visualization**
    if plot:
        plt.imshow(image_mask, cmap='gray')
        plt.scatter(x, y, color='red', label='Start Point', edgecolors='black', marker='o')
        if points_forward:
            plt.scatter(*zip(*points_forward), color='blue', label='Forward Boundary', edgecolors='black', marker='x')
        if points_backward:
            plt.scatter(*zip(*points_backward), color='green', label='Backward Boundary', edgecolors='black', marker='s')
        plt.gca().invert_yaxis()
        plt.legend()
        plt.show()

    # **Return total measured width**
    if search_mode == "inside":
        if points_forward and points_backward:
            total_dist = math.dist((point.x, point.y), points_forward[0]) + math.dist((point.x, point.y), points_backward[0])
            if total_dist > MAX_ROAD_WIDTH:
                return None
            return total_dist
    else:
        forward_dist = math.dist(points_forward[0], points_forward[1]) if len(points_forward) > 1 else None
        backward_dist = math.dist(points_backward[0], points_backward[1]) if len(points_backward) > 1 else None
        if forward_dist is not None and forward_dist > MAX_ROAD_WIDTH:
            forward_dist = None
        if backward_dist is not None and backward_dist > MAX_ROAD_WIDTH:
            backward_dist = None

        if forward_dist is not None and backward_dist is not None:
            if math.dist((point.x, point.y), points_forward[0]) < math.dist((point.x, point.y), points_backward[0]) and math.dist((point.x, point.y), points_forward[0]) < MAX_DISTANCE_FROM_ROAD and forward_dist > 1.1:
                return forward_dist
            elif math.dist((point.x, point.y), points_backward[0]) < MAX_DISTANCE_FROM_ROAD and backward_dist > 1.1:
                return backward_dist
        elif forward_dist is not None and forward_dist > 1.1 and math.dist((point.x, point.y), points_forward[0]) < MAX_DISTANCE_FROM_ROAD:
            return forward_dist
        elif backward_dist is not None and backward_dist > 1.1 and math.dist((point.x, point.y), points_backward[0]) < MAX_DISTANCE_FROM_ROAD:
            return backward_dist

    return None

In [17]:
def convert_coords(point_i, x_shift_initial, y_shift_initial, x_scale, y_scale, x_shift_second, y_shift_second):
    point = shift_geometry(point_i, -x_shift_second, -y_shift_second)
    point = scale_geometry(point, 1/x_scale, 1/y_scale)
    point = shift_geometry(point, -x_shift_initial, -y_shift_initial)
    return point

In [18]:
#get width measurements of one road line
def measure_road_width(line,  image_mask, x_shift_initial, y_shift_initial, x_scale, y_scale, x_shift_second, y_shift_second, plot=False):
    point_slope_pairs = compute_points_and_perpendiculars(line, SEARCH_INTERVAL)
    widths = []
    for point_slope_pair in point_slope_pairs:
        point, slope = point_slope_pair
        width = measure_nearest_road_to_point((point), slope, image_mask, plot=plot)
        coords = convert_coords(point, x_shift_initial, y_shift_initial, x_scale, y_scale, x_shift_second, y_shift_second)
        if width is not None:
            widths.append((point_slope_pair, coords, width))
    return widths

In [19]:
# Old method to measure road width (by averaging the width of multiple perpendicular lines)

# instance = scaled_patch_roads.iloc[0]
# geom = instance['geometry_scaled']

# if isinstance(geom, sp.MultiLineString):
#     for line in geom.geoms:
#         widths = measure_road_width(line, image_mask, True)
# elif isinstance(geom, sp.LineString):
#     widths = measure_road_width(geom, image_mask, True)

# average_width = np.mean([width for _, _, width in widths])
# print(f"Average road width: {average_width:.2f} pixels")

In [20]:
# def get_patch_road_widths(roads, image_mask):
#     road_widths = []
#     for road in roads.itertuples():
#         geom = road.geometry_scaled
#         osm_id = road.osm_id
#         if isinstance(geom, sp.MultiLineString):
#             for line in geom.geoms:
#                 widths = measure_road_width(line, image_mask)
#                 road_widths.append((osm_id, widths))
#         elif isinstance(geom, sp.LineString):
#             widths = measure_road_width(geom, image_mask)
#             road_widths.append((osm_id, widths))
#     return road_widths

In [21]:
def get_patch_road_widths_df(roads, image_mask, x_shift_initial, y_shift_initial, x_scale, y_scale, x_shift_second, y_shift_second):
    road_widths = []
    for road in roads.itertuples():
        geom = road.geometry_scaled
        osm_id = road.osm_id
        if isinstance(geom, sp.MultiLineString):
            for line in geom.geoms:
                widths = measure_road_width(line, image_mask, x_shift_initial, y_shift_initial, x_scale, y_scale, x_shift_second, y_shift_second)
                for point_slope_pair, coords, width in widths:
                    point, slope = point_slope_pair
                    road_widths.append({
                        'osm_id': osm_id,
                        'coordinates_on_image': point,
                        'map_coordinates': coords,
                        'slope': slope,
                        'width': width
                    })
        elif isinstance(geom, sp.LineString):
            widths = measure_road_width(geom, image_mask, x_shift_initial, y_shift_initial, x_scale, y_scale, x_shift_second, y_shift_second)
            for point_slope_pair, coords, width in widths:
                point, slope = point_slope_pair
                road_widths.append({
                    'osm_id': osm_id,
                    'coordinates_on_image': point,
                    'map_coordinates': coords,
                    'slope': slope,
                    'width': width
                })
    return pd.DataFrame(road_widths)


In [22]:
def generate_width_statistics(patch_road_widths_df):
    width_stats = patch_road_widths_df.groupby('osm_id')['width'].describe()
    return width_stats

#calculate m per pixel: https://wiki.openstreetmap.org/wiki/Zoom_levels
def meters_per_pixel(lat):
    return EARTH_CIRCUFERENCE * abs(math.cos(math.radians(lat))) / (512 * (2 ** 18))

def get_average_meters_per_pixel(min_lat, max_lat):
    return (meters_per_pixel(min_lat) + meters_per_pixel(max_lat)) / 2

In [23]:
# Get the meter width of the roads in the patch
def get_meter_widths(patch_road_widths_df, avg_m_per_pixel):
    patch_road_widths_df['meter_width'] = patch_road_widths_df['width'] * avg_m_per_pixel
    return patch_road_widths_df 

# Include nameof the road in the DataFrame
def include_road_names(patch_road_widths_df, roads):
    road_names = roads.set_index('osm_id')['name'].to_dict()
    patch_road_widths_df['road_name'] = patch_road_widths_df['osm_id'].map(road_names)
    return patch_road_widths_df

def display_points_with_widths(patch_road_widths_df, image_mask):
    plt.figure(figsize=(12, 12))
    plt.imshow(image_mask, cmap='gray')
    n = len(patch_road_widths_df)
    cmap = plt.get_cmap('tab20', n)  # Use a categorical colormap

    for idx, (_, row) in enumerate(patch_road_widths_df.iterrows()):
        point = row['coordinates_on_image']
        width = row['meter_width']
        road_name = row.get('road_name', 'Unknown')
        osm_id = row['osm_id']
        color = cmap(idx)
        plt.scatter(point.x, point.y, color=color, s=80, edgecolors='black', linewidths=1.5)
        # Annotate with width and road name
        plt.annotate(f"{width:.1f}m\n{road_name}\n{osm_id}", (point.x, point.y), textcoords="offset points", xytext=(5,5), ha='left', fontsize=9, color=color, weight='bold')

    plt.title("Road Widths on Image")
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()

# Average the width of those with the same osm_id (and then df should just be osm_id, coordinates, road_name, meter_width)
def average_widths_by_osm_id(patch_road_widths_df):
    averaged_df = patch_road_widths_df.groupby('osm_id').agg({
        'coordinates_on_image': 'first',
        'map_coordinates': 'first',
        'slope': 'first',
        'meter_width': 'median',
        'road_name': 'first'
    }).reset_index()
    return averaged_df[['osm_id', 'map_coordinates',  'road_name', 'meter_width']]

In [24]:
def extract_road_width_from_patch(min_lat, max_lat, patch_path = None, file_paths = None, display_patch_with_widths=False):

    if patch_path is None and file_paths is None:
        print("Please provide either a patch path or a list of file paths.")

    if patch_path is None:
        patch_path = random.choice(file_paths)
        print("Random patch path:", patch_path)
        
    mask_path = str(patch_path).replace("roadimages", "roadmasks")

    mask_image = get_mask_image(mask_path)
    patch_lat, patch_lon, minx, miny, maxx, maxy, bbox, patch_roads = get_patch_paramaters(patch_path)
    # display_patch(random_patch_path, mask_path, patch_roads, patch_lat, patch_lon, minx, miny, maxx, maxy)
    # print(patch_roads.head())

    scaled_patch_roads, x_shift_initial, y_shift_initial, x_scale, y_scale = scale_patch_roads(patch_roads, minx, miny, maxx, maxy)

    image_mask = get_binary_mask_from_image(mask_image)
    road_mask = get_binary_mask_from_roads(scaled_patch_roads)
    # plt.imshow(road_mask, cmap='gray')
    # plt.gca().invert_yaxis()

    best_shift_values = get_shifted_coordinates(scaled_patch_roads, 
                                                image_mask,
                                                display=False)

    scaled_patch_roads["geometry_scaled"] = scaled_patch_roads["geometry_scaled"].apply(
        lambda geom: shift_geometry(geom, best_shift_values[0], best_shift_values[1])
    )
    # print(scaled_patch_roads)

    # display_perpendiculars(scaled_patch_roads)
    # test = scaled_patch_roads.copy()
    # print(test["geometry"].iloc[0])
    # point = sp.Point(test["geometry_scaled"].iloc[0].coords[0])
    # print(convert_coords(point))

    # patch_road_widths = get_patch_road_widths(scaled_patch_roads, image_mask)
    patch_road_widths_df = get_patch_road_widths_df(scaled_patch_roads, image_mask, x_shift_initial, y_shift_initial, x_scale, y_scale, best_shift_values[0], best_shift_values[1])
    # width_statistics = generate_width_statistics(patch_road_widths_df)

    avg_m_per_pixel = get_average_meters_per_pixel(min_lat, max_lat)

    patch_road_widths_df = get_meter_widths(patch_road_widths_df, avg_m_per_pixel)
    patch_road_widths_df = include_road_names(patch_road_widths_df, scaled_patch_roads)
    agg_patch_road_widths_df = average_widths_by_osm_id(patch_road_widths_df)

    # Remove rows where name is None or empty
    agg_patch_road_widths_df = agg_patch_road_widths_df[agg_patch_road_widths_df['road_name'].notna() & (agg_patch_road_widths_df['road_name'] != '')]

    if display_patch_with_widths:
        display_points_with_widths(patch_road_widths_df, image_mask)
        
    return agg_patch_road_widths_df

In [None]:
def extract_road_widths_from_random_patch(data_folder, data_subfolder, display_patch_with_widths=False):
    sorted_by_lon, file_paths = get_sorted_file_parts(data_folder, data_subfolder, -1)
    min_lon, max_lon = get_min_max_coordinates(sorted_by_lon, -1)
    sorted_by_lat, file_paths = get_sorted_file_parts(data_folder, data_subfolder, -2)
    min_lat, max_lat = get_min_max_coordinates(sorted_by_lat, -2)

    patch_road_widths_df = extract_road_width_from_patch(min_lat, max_lat, file_paths=file_paths, display_patch_with_widths=display_patch_with_widths)
    return patch_road_widths_df

def extract_road_widths_from_all_patches(data_folder, data_subfolder):

    sorted_by_lon, file_paths = get_sorted_file_parts(data_folder, data_subfolder, -1)
    min_lon, max_lon = get_min_max_coordinates(sorted_by_lon, -1)
    sorted_by_lat, file_paths = get_sorted_file_parts(data_folder, data_subfolder, -2)
    min_lat, max_lat = get_min_max_coordinates(sorted_by_lat, -2)
    widths_df = pd.DataFrame()

    for file_path in file_paths:
        patch_road_widths_df = extract_road_width_from_patch(min_lat, max_lat, patch_path=file_path)
        widths_df = pd.concat([widths_df, patch_road_widths_df], ignore_index=True)

    # Remove outliers from rows with the same road_name using the IQR method
    for road_name, group in widths_df.groupby('road_name'):
        if len(group) < 3:
            continue
        q1 = group['meter_width'].quantile(0.25)
        q3 = group['meter_width'].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        widths_df = widths_df[~((widths_df['road_name'] == road_name) & 
                                ((widths_df['meter_width'] < lower_bound) | 
                                 (widths_df['meter_width'] > upper_bound)))]

    # Aggregate by osm_id and get the median width (to remove outliers in the same segment)
    widths_df = widths_df.groupby('osm_id').agg({
        'map_coordinates': 'first',
        'road_name': 'first',
        'meter_width': 'median'
    }).reset_index()

    # Reset index after aggregation
    widths_df.reset_index(drop=True, inplace=True)

    # Sort the DataFrame by road_name, then osm_id
    widths_df.sort_values(by=['road_name', 'osm_id'], inplace=True)

    # Save the DataFrame to a CSV file (/data/roadwidths/{data_subfolder}_road_widths.csv)
    output_path = Path("data/roadwidths") / f"{data_subfolder}_road_widths.csv"
    output_path.parent.mkdir(parents=True, exist_ok=True)
    widths_df.to_csv(output_path, index=False)

### Runner

In [26]:
# This path should point to the folder containing the road images
data_folder = Path("data/roadimages")
# Define the subfolders to process (must be the same in roadimages and roadmasks)
data_subfolders = ["bgc", "jaycee", "nicole", "raffy"]

For saving to csv

In [27]:
for data_subfolder in data_subfolders:
    print(f"Extracting road widths from {data_subfolder}...")
    extract_road_widths_from_all_patches(data_folder, data_subfolder)
    print(f"Road widths extracted and saved to data/roadwidths/{data_subfolder}_road_widths.csv")

Extracting road widths from bgc...


IndexError: list index out of range

For testing

In [None]:
extract_road_widths_from_random_patch(data_folder, data_subfolders[0], display_patch_with_widths=False)

Random patch path: data\roadimages\images\17__14.595545_121.024017.png


Unnamed: 0,osm_id,map_coordinates,road_name,meter_width
0,1289196715,POINT (121.0244582 14.5960002),Lakandili Street,11.002723
1,1289196716,POINT (121.0245004 14.5951577),Saloysoy Street,8.053123
2,24248438,POINT (121.0243544 14.5952125),Saloysoy Street,6.869955
3,24248460,POINT (121.0235596 14.5955095),Lakandili Street,5.960567
4,24509303,POINT (121.02452254031931 14.59597824237464),Aliw-iw Street,6.237479


### View Dataset

In [None]:
road_widths_df = pd.read_csv("data/roadwidths/images_road_widths.csv")

In [None]:
# Display sorted by road name
road_widths_df

Unnamed: 0,osm_id,map_coordinates,road_name,meter_width
0,1251985898,POINT (121.0214196371705 14.595102438022156),3rd Street,7.891973
1,563387015,POINT (121.0229834 14.5966411),Agham Street,6.219081
2,24509303,POINT (121.02452254031931 14.59597824237464),Aliw-iw Street,8.250669
3,563387014,POINT (121.02187028260131 14.597296533113054),Alley 1,7.611858
4,1289196713,POINT (121.0217305 14.5958073),Dupil Street,8.412547
5,24248441,POINT (121.02191802187119 14.596005776585722),Dupil Street,6.705068
6,24248525,POINT (121.0238709 14.597147),Dupil Street,9.337655
7,694195950,POINT (121.0224719 14.5953793),Emergency Entrance,7.628384
8,735083441,POINT (121.0223892 14.5951941),Emergency Entrance,14.763926
9,1289196715,POINT (121.0244582 14.5960002),Lakandili Street,11.002723
