In [1]:
### parameters
# Name of the study area; it selects the per-place folder `places/<place>` used below.
place = 'tel_aviv'
# Name of the feature being computed; it names the output sub-folder and the final shapefile.
feature = 'sidewalk_width'

In [2]:
import pandas as pd
import geopandas as gpd
from geopandas import GeoDataFrame
from shapely.geometry import MultiLineString, LineString, Point


from tqdm import tqdm
import numpy as np
import os
from pathlib import Path
import warnings
# Silence every warning for the whole notebook.
# NOTE(review): this also hides geopandas CRS-mismatch and shapefile
# field-name truncation warnings - consider narrowing the filter.
warnings.filterwarnings(action='ignore')

# Projected CRS used for the outputs of this notebook.
crs_prj = 'EPSG:2039'

# Folder the notebook is executed from.
cwd = Path().resolve()

# Folder layout, rooted next to the working directory:
#   <parent of cwd>/places/<place>/shp/<feature>
parent_folder = f'{cwd.parent}/places/{place}'
data_folder = f'{parent_folder}/shp'
detail_folder = f'{data_folder}/{feature}'

# Creating the deepest folder also creates every missing parent folder.
os.makedirs(detail_folder, exist_ok=True)

In [3]:
# Street network of the place, read from the place-level shapefile folder.
streets = gpd.read_file(f'{data_folder}/streets.shp')
# Working copy, so the frame as loaded stays untouched.
street_edges = streets.copy()
# Feature-specific inputs, read from the `<feature>` sub-folder.
no_sidewalks = gpd.read_file(f'{detail_folder}/no_sidewalks.shp')
sidewalks = gpd.read_file(f'{detail_folder}/sidewalks.shp')

In [5]:


def create_perpendicular_line(point, direction_vector, length=50, side='left'):
    """Build a straight segment that leaves `point` at a right angle to a direction.

    Parameters
    ----------
    point : object with `.x` / `.y`
        Start of the segment (a shapely Point in this notebook).
    direction_vector : pair of numbers
        (dx, dy) direction the segment must be perpendicular to.
    length : number, default 50
        Length of the segment, in the units of the coordinates.
    side : str, default 'left'
        'left' rotates the direction by +90 degrees; any other value rotates
        it by -90 degrees.

    Returns
    -------
    LineString or None
        Segment from `point` to the offset end point, or None when the
        direction vector has zero length.
    """
    vx, vy = direction_vector
    norm = np.hypot(vx, vy)
    if norm == 0:
        # A zero vector has no direction, hence no perpendicular.
        return None

    # Unit vector along the given direction.
    ux, uy = vx / norm, vy / norm

    # Rotate the unit vector by +90 degrees (left) or -90 degrees (otherwise).
    if side == 'left':
        offset_x, offset_y = -uy, ux
    else:
        offset_x, offset_y = uy, -ux

    far_end = Point(point.x + offset_x * length, point.y + offset_y * length)
    return LineString([point, far_end])

def get_point_and_direction(line, fraction):
    """Return the point at `fraction` along `line` and the local direction there.

    Parameters
    ----------
    line : object with `interpolate(distance, normalized=True)`
        The line to sample (a shapely LineString in this notebook).
    fraction : float
        Normalised position along the line, 0 = start and 1 = end.

    Returns
    -------
    tuple
        (point, direction): `point` is the interpolated point and `direction`
        is a NumPy array [dx, dy] between two points sampled 0.001
        (normalised) before and after `fraction`, clamped to [0, 1].
        The direction is not normalised.
    """
    step = 0.001
    # Clamp the two sampling positions so they stay on the line.
    before = line.interpolate(max(fraction - step, 0), normalized=True)
    after = line.interpolate(min(fraction + step, 1), normalized=True)

    anchor = line.interpolate(fraction, normalized=True)
    tangent = np.array([after.x - before.x, after.y - before.y])
    return anchor, tangent

# Generate side lines: for every street edge, cast perpendicular probe lines to
# the left and to the right at three positions along the edge.
# Label -> normalised position along the edge (1/3, 1/2 and 2/3 of its length).
sample_fractions = {'start': 1/3, 'middle': 0.5, 'end': 2/3}
side_lines = []

for idx, row in street_edges.iterrows():
    geom = row.geometry

    # Only non-degenerate LineStrings are probed; anything else (e.g. a
    # MultiLineString or a zero-length edge) is reported and skipped.
    if not isinstance(geom, LineString) or geom.length == 0:
        print(f'{idx} is not a valid street edge')
        continue

    for loc, fraction in sample_fractions.items():
        point, direction = get_point_and_direction(geom, fraction)
        for side in ['left', 'right']:
            line = create_perpendicular_line(point, direction, side=side)
            if line:
                side_lines.append({
                    'oidrechov': row['oidrechov'],
                    # e.g. 'start_left', 'middle_right', ...
                    'side': f'{loc}_{side}',
                    'geometry': line,
                })

# Convert to GeoDataFrame and save the raw probe lines
side_lines_gdf = GeoDataFrame(side_lines, crs=street_edges.crs)
side_lines_gdf.to_file(f'{detail_folder}/side_lines_gdf0.shp', driver='ESRI Shapefile')

# Ensure each perpendicular line has a unique index
side_lines_gdf = side_lines_gdf.reset_index(drop=True)

# Spatial join: find which perpendicular lines intersect which street edges.
# The edges get a tiny buffer so that a probe line touching an edge counts as
# intersecting it. The buffered frame is given the same CRS as the probe lines
# (both come from `street_edges`), so the two join operands always agree.
buffered_streets = GeoDataFrame(
    geometry=street_edges.geometry.buffer(0.005),
    crs=street_edges.crs,
)
joined = gpd.sjoin(side_lines_gdf, buffered_streets, how='left', predicate='intersects')
joined['line_index'] = joined.index

# Count how many street segments each perpendicular line intersects
intersection_counts = (
    joined.groupby('line_index')
    .size()
    .reset_index(name='count')
)

# Merge counts back to the probe lines and save them with the new 'count' column
side_lines_gdf = side_lines_gdf.merge(
    intersection_counts, left_index=True, right_on='line_index', how='left'
).drop(columns=['line_index'])
side_lines_gdf.to_file(f'{detail_folder}/side_lines_gdf1.shp', driver='ESRI Shapefile')

In [26]:
# Distribution of the number of (buffered) street edges hit by each probe line.
# NOTE(review): presumably 1 means the line only touches the street it starts on - confirm.
side_lines_gdf['count'].value_counts()

In [5]:
# Get all unique side labels from the 'side' column
side_keys = side_lines_gdf['side'].unique()

# Step 1: Spatial join between side lines and sidewalks.
# A left join keeps probe lines that hit no sidewalk (their 'index_right' is NaN).
intersections = gpd.sjoin(side_lines_gdf, sidewalks, how='left').drop(columns=['Layer'])

# Step 2: Group intersections by line index (one group per probe line)
grouped = intersections.groupby(intersections.index)

# Street geometry per street id, built once so the loop below does not rescan
# the whole street table for every street. The first edge wins when an id
# appears more than once, as with the previous per-street lookup.
# NOTE(review): results are keyed by 'oidrechov' only - if several edges share
# one id, their probe results overwrite each other; confirm ids are unique.
street_geom_by_oid = {}
for street_oid, street_geom in zip(street_edges['oidrechov'], street_edges['geometry']):
    street_geom_by_oid.setdefault(street_oid, street_geom)

# Step 3: Initialize results dictionary: {street id -> {side label -> result, 'geometry' -> street geometry}}
side_line_results = {}

# Step 4: Process each side line.
# Result codes:  0 = no sidewalk hit, -1 = the probe line also crosses another
# street, otherwise the index (in `sidewalks`) of the matched sidewalk.
for line_id, group in grouped:
    line_row = side_lines_gdf.loc[line_id]
    side_label = line_row['side']
    oid = line_row['oidrechov']

    # Determine intersection result
    if pd.isna(group['index_right'].iloc[0]):
        res = 0
    else:
        count = line_row['count']
        if count > 1:
            res = -1
        elif len(group) == 1:
            res = group.iloc[0]['index_right']
        else:
            # Multiple intersections - choose the furthest from the start
            start_point = Point(line_row.geometry.coords[0])
            group = group.copy()
            # Fetch original sidewalk geometries using index_right
            group['sidewalk_geom'] = group['index_right'].apply(lambda idx: sidewalks.loc[idx, 'geometry'])
            # Compute distance from start point to each sidewalk
            group['dist'] = group['sidewalk_geom'].apply(lambda g: g.distance(start_point))
            res = group.sort_values('dist', ascending=False).iloc[0]['index_right']

    # Initialize entry if oid not in results
    if oid not in side_line_results:
        side_line_results[oid] = {key: None for key in side_keys}
        side_line_results[oid]['geometry'] = street_geom_by_oid[oid]

    # Store result (every branch above assigns a number to `res`)
    side_line_results[oid][side_label] = float(res)

# Step 5: Convert results to GeoDataFrame (one row per street, one column per side label)
records = []
for oid, values in side_line_results.items():
    record = {'oidrechov': oid, 'geometry': values['geometry']}
    for key in side_keys:
        record[key] = values.get(key)
    records.append(record)

gdf = GeoDataFrame(records, crs=crs_prj)
gdf.to_file(f'{detail_folder}/streets_sidewalks_mid.shp', driver='ESRI Shapefile')



In [47]:

# Rebuild a line from its start point with a given length, then buffer it
def extend_and_buffer(line, length=30, buffer_dist=1):
    """Return a buffered straight segment that follows the overall direction of `line`.

    The segment starts at the first coordinate of `line`, points towards its
    last coordinate and is `length` long (so it is shorter than the input when
    the input is longer than `length`). The segment is then buffered.

    Parameters
    ----------
    line : LineString
        Input line; only its first and last coordinates are used.
    length : number, default 30
        Length of the rebuilt segment, in coordinate units.
    buffer_dist : number, default 1
        Buffer distance applied to the rebuilt segment.

    Returns
    -------
    geometry or None
        The buffered segment, or None when `line` is not a LineString with at
        least two coordinates (a message is printed) or when its first and
        last coordinates coincide.
    """
    is_usable = isinstance(line, LineString) and len(line.coords) >= 2
    if not is_usable:
        print('error in the geometry of the line')
        return None

    first = line.coords[0]
    last = line.coords[-1]
    span_x = last[0] - first[0]
    span_y = last[1] - first[1]

    span = (span_x**2 + span_y**2) ** 0.5
    if span == 0:
        # Start and end coincide: there is no direction to follow.
        return None

    # Unit direction from the first to the last coordinate.
    unit_x = span_x / span
    unit_y = span_y / span

    tip = (first[0] + unit_x * length, first[1] + unit_y * length)
    return LineString([first, tip]).buffer(buffer_dist)

# Apply to all perpendicular lines: replace each probe line by the buffered
# segment produced by extend_and_buffer (default length and buffer distance).
buffered_perp_lines = side_lines_gdf.copy()
buffered_perp_lines['geometry'] = buffered_perp_lines['geometry'].apply(extend_and_buffer)
buffered_perp_lines = buffered_perp_lines.dropna(subset=['geometry'])
buffered_perp_lines.to_file(f'{detail_folder}/extended_line.shp', driver='ESRI Shapefile')

# Inner spatial join: keep only the buffered probes that hit a non-sidewalk
# feature, and attach the geometry of the feature that was hit.
intersections = gpd.sjoin(buffered_perp_lines, no_sidewalks)
intersections['no_sidewalk_geom'] = intersections['index_right'].apply(lambda idx: no_sidewalks.loc[idx, 'geometry'])

# One width column ('<side>_w') per probe label
side_fields = side_keys
side_fields_w = [side + '_w' for side in side_keys]

gdf_widths = gdf.copy()

# Every width starts as -1 (unknown); each column is initialised exactly once.
for width_field in side_fields_w:
    gdf_widths[width_field] = -1.0

# Pre-index: {(oid, side) -> line geometry}
line_geom_lookup = {
    (row['oidrechov'], row['side']): row.geometry
    for idx, row in side_lines_gdf.iterrows()
}

# Pre-index: {(oid, side) -> intersection GeoDataFrame}
intersection_groups = dict(tuple(intersections.groupby(['oidrechov', 'side'])))

# Compute a width per street and per probe
for idx, row in tqdm(gdf.iterrows(), total=len(gdf), desc="Processing widths"):
    oid = row['oidrechov']
    values = [row[f] for f in side_fields]

    # Fast path: no probe of this street matched a sidewalk (all codes are 0 or -1).
    # The whole street gets 0 if any probe reported "no sidewalk", otherwise -1.
    if all((v == 0) or (v == -1) for v in values):
        result_value = 0 if any(v == 0 for v in values) else -1
        for side in side_fields:
            gdf_widths.at[idx, f'{side}_w'] = result_value
        continue

    for side in side_fields:
        sid = row[side]
        # Skip probes without a matched sidewalk; their width stays -1.
        # A missing (NaN) code is skipped too instead of failing in the
        # `sidewalks.loc[sid]` lookup below.
        if pd.isna(sid) or sid in [-1, 0]:
            continue

        # Lookup intersections
        group_key = (oid, side)
        if group_key not in intersection_groups:
            gdf_widths.at[idx, f'{side}_w'] = -1
            continue

        matching = intersection_groups[group_key]

        # Lookup line geometry
        line_geom = line_geom_lookup.get(group_key)
        if line_geom is None or line_geom.is_empty:
            gdf_widths.at[idx, f'{side}_w'] = -1
            continue

        start_point = Point(line_geom.coords[0])

        # Pick the non-sidewalk feature closest to the start of the probe line.
        # (Row-wise apply on a copy, so the cached group is not modified.)
        matching = matching.copy()
        matching['dist_to_start'] = matching['no_sidewalk_geom'].apply(lambda g: g.distance(start_point))
        closest = matching.sort_values('dist_to_start').iloc[0]['no_sidewalk_geom']

        # Width = distance between the matched sidewalk line and that feature
        dist = sidewalks.loc[sid].geometry.distance(closest)
        gdf_widths.at[idx, f'{side}_w'] = dist

# NOTE(review): ESRI Shapefile field names are limited to 10 characters, so
# columns such as 'start_left' and 'start_left_w' cannot keep distinct full
# names on disk - check the written field names or use another format.
gdf_widths.to_file(f'{detail_folder}/gdf_widths.shp', driver='ESRI Shapefile')


In [48]:
# Preview the first rows only instead of dumping the whole frame.
gdf_widths.head()

In [49]:


# Define left and right _w fields: the width columns of the left-hand probes
# (names ending in '_left_w') and of the right-hand probes ('_right_w').
left_fields = [f for f in gdf_widths.columns if f.endswith('_left_w')]
right_fields = [f for f in gdf_widths.columns if f.endswith('_right_w')]

def compute_side_final(values, max_gap=10):
    """Combine the width measurements of one street side into a single value.

    Measurements equal to -1 are ignored. Of the remaining ones, the two that
    are closest to each other are used: their mean when they agree, or the
    smaller of the two when they differ by more than `max_gap`.

    Parameters
    ----------
    values : iterable of numbers
        Width measurements; -1 marks a missing measurement.
    max_gap : number, default 10
        Largest difference between the two chosen measurements for which they
        are still averaged.

    Returns
    -------
    number
        -1 when nothing was measured, the single measurement when only one is
        available, otherwise the combined value described above.
    """
    measured = sorted(v for v in values if v != -1)
    if not measured:
        return -1
    if len(measured) == 1:
        return measured[0]

    # In sorted order the two closest measurements are always neighbours, so
    # only adjacent pairs need to be compared (the first such pair wins ties).
    low, high = min(zip(measured, measured[1:]), key=lambda pair: pair[1] - pair[0])

    if high - low > max_gap:
        # The measurements disagree too much: keep the smaller one.
        return low
    return (low + high) / 2

# Row-wise combination of the per-probe widths
def compute_final_columns(row):
    """Derive the left, right and overall width of one street.

    The widths listed in `left_fields` and `right_fields` are each reduced
    with `compute_side_final`. The overall value is the mean of both sides
    when both are available, the available side when only one is, and -1
    when neither is (-1 marks "not available").

    Returns a Series with the keys 'final_left', 'final_right' and 'final'.
    """
    final_left = compute_side_final([row[name] for name in left_fields])
    final_right = compute_side_final([row[name] for name in right_fields])

    # Sides that actually produced a width
    available = [width for width in (final_left, final_right) if width != -1]

    if not available:
        final = -1
    elif len(available) == 1:
        final = available[0]
    else:
        final = (final_left + final_right) / 2

    return pd.Series({'final_left': final_left, 'final_right': final_right, 'final': final})

# Apply across DataFrame: add the per-street 'final_left', 'final_right' and
# 'final' columns, then save the result under the feature name.
# NOTE(review): ESRI Shapefile field names are limited to 10 characters, so
# longer column names (e.g. 'final_right', 'start_left_w') are shortened on disk.
gdf_widths[['final_left', 'final_right', 'final']] = gdf_widths.apply(compute_final_columns, axis=1)
gdf_widths.to_file(f'{detail_folder}/{feature}.shp',driver='ESRI Shapefile')

In [42]:
### This code selects the sidewalks from the raw data - it only needs to be run the first time ###
# NOTE(review): this cell rebinds `gdf` and `sidewalks`, which other cells of
# this notebook also use - re-run those cells after running this one.
# -----------------------------------------------------------------------------
# Step 1: Read the 'translation' layer from a File Geodatabase
# -----------------------------------------------------------------------------
# This loads only the 'geometry' and 'Layer' columns from the specified layer
# within the ESRI File Geodatabase located at 'ASC/ASC.gdb' (a path relative to
# the current working directory).
gdf = gpd.read_file('ASC/ASC.gdb', layer="translation")[['geometry', 'Layer']]

# -----------------------------------------------------------------------------
# Step 2: Define a function to drop the Z dimension from 3D line geometries
# -----------------------------------------------------------------------------
# The function converts a LineString Z or MultiLineString Z to its 2D
# counterpart by removing the Z (elevation) component from each point.
# Geometries that are already 2D, and missing geometries, are returned unchanged.
def drop_z(geom):
    """Return `geom` without its Z coordinates.

    Parameters
    ----------
    geom : LineString, MultiLineString or None
        Geometry to flatten.

    Returns
    -------
    The same object when `geom` is None or has no Z dimension; otherwise a new
    LineString (for a LineString input) or MultiLineString (for a
    multi-part input) holding only the X and Y of every point.
    """
    # Nothing to do for a missing geometry or one that is already 2D
    if geom is None or not geom.has_z:
        return geom

    # A single line has no `.geoms`; rebuild it directly
    if isinstance(geom, LineString):
        return LineString([(x, y) for x, y, _z in geom.coords])

    # Rebuild each LineString within the MultiLineString, dropping the Z value
    return MultiLineString([
        LineString([(x, y) for x, y, _z in line.coords])  # keep only X and Y
        for line in geom.geoms  # iterate over individual LineStrings
    ])

# -----------------------------------------------------------------------------
# Step 3: Apply the drop_z function to each geometry in the GeoDataFrame
# -----------------------------------------------------------------------------
# This replaces every geometry by the result of drop_z (Z values removed),
# then reprojects the frame to the project CRS (`crs_prj`).
gdf['geometry'] = gdf['geometry'].apply(drop_z)
gdf = gdf.to_crs(crs_prj)
# -----------------------------------------------------------------------------
# Step 4: Export the reprojected geometries to a new shapefile
# -----------------------------------------------------------------------------
# The resulting GeoDataFrame (the 'geometry' and 'Layer' columns) is saved as
# 'itm_data.shp' in the place-level shapefile folder (`data_folder`).
gdf.to_file(f'{data_folder}/itm_data.shp')

### YOU DON'T NEED TO RUN THIS CELL ###
# -----------------------------------------------------------------------------
# Step 1: Filter sidewalk and non-sidewalk features based on the 'Layer' field
# -----------------------------------------------------------------------------
# Sidewalks are identified where 'Layer' == '2404'
# All other features are treated as non-sidewalks
# NOTE(review): the files are written to `data_folder` as 'sidewalks.shp' and
# 'no_sidewalk.shp', while the loading cell of this notebook reads
# 'sidewalks.shp' and 'no_sidewalks.shp' from `detail_folder` - confirm how the
# files get from here to there, or align the paths and names.
sidewalks = gdf[gdf['Layer'] == '2404']
no_sidewalk = gdf[~(gdf['Layer'] == '2404')]
sidewalks.to_file(f'{data_folder}/sidewalks.shp')
### YOU DON'T NEED TO RUN THIS CELL ###
# Optional: Save non-sidewalk features to a shapefile for inspection or use
no_sidewalk.to_file(f'{data_folder}/no_sidewalk.shp')