In [9]:
## Run when initialise the code
import os

import geopandas as gpd
import osmnx as ox
from geopandas import GeoDataFrame, GeoSeries
from osmnx import io

from pandas import DataFrame
project_crs = 'epsg:3857'
from sklearn.cluster import DBSCAN
from shapely.geometry import  Point, LineString, MultiPolygon, MultiPoint
import math
import warnings
import pandas as pd
from general_functions import *

from tqdm import tqdm
import time
import pickle
warnings.filterwarnings(action='ignore')
from momepy import remove_false_nodes,extend_lines
pjr_loc = os.path.dirname(os.getcwd())
import ast # to convert str with list to list of string
from itertools import combinations
import numpy as np
from math import log2
def get_five_largest_streets(simplified_network:GeoDataFrame,name_col:str,path:str):
    """
    FOR TEST PURPOSES - write the five longest simplified streets to a shapefile.

    :param simplified_network: network with an 'is_simplified' column; only rows flagged 1 are used
    :param name_col: the column that stores the street name (segments are merged per value of it)
    :param path: output file path without extension ('.shp' is appended)
    :return: None, the result is only written to disk
    """
    # Keep the simplified segments and merge them into one geometry per street name
    simplified_only = simplified_network[simplified_network['is_simplified'] == 1]
    per_street = simplified_only.dissolve(by=name_col)
    per_street['street_length'] = per_street['geometry'].length
    # Longest streets first; only the top five are exported
    longest_five = per_street.sort_values(by='street_length', ascending=False).head(5)
    longest_five.to_file(f'{path}.shp')

In [12]:
## Run when initialising the notebook
# Configure the study area and the folders used by the rest of the notebook.
# NOTE(review): the previous comments in this cell described downloading the network from OpenStreetMap with
# OSMnx (by place name or by polygon). No download happens in this cell - presumably it is done elsewhere; TODO confirm.

# Name of the location being processed; it is also used as the folder name under 'places'

place = 'San_Francisco__California'
print(place)
# Data folder: <parent of the current working directory>/places/<place>, with ',' and ' ' replaced by '_'
data_folder =  f'{pjr_loc}/places/{place.replace(",","_").replace(" ","_")}'
# Path for simplification test outputs (only the path is built here; the folder itself is not created)
data_folder_test  = f'{data_folder}/test/simplification'
# Make sure the 'delete_2_nodes' and 'split_tp_intersection' output folders exist
os.makedirs(f'{data_folder}/delete_2_nodes',exist_ok = True)
os.makedirs(f'{data_folder}/split_tp_intersection',exist_ok = True)
# Show the resolved data folder as the cell output
data_folder

In [13]:
# Load the street segments stored in 'before_df.shp'; the cells below read its 'name', 'angle', 'highway' and 'geometry' columns
df_pro = gpd.read_file(f'{data_folder}/before_df.shp')

In [14]:
# region
# Functions and classes to be utilized - Module 2
def check_parallelism(to_translate: GeoDataFrame) -> bool:
    """
    Tell whether at least one segment of the group runs alongside another segment of the group.

    Every segment is buffered by 30 CRS units and is also shifted sideways by 35 CRS units, once to
    each side (parallel segments may be drawn in opposite directions). A shifted segment counts as
    parallel when more than 10% of its reference length lies inside the buffer of a *different* segment.

    Side effect: helper columns 'geometry_right' / 'geometry_left' are added to ``to_translate`` and its
    'geometry' column is overwritten, so pass a copy when the caller still needs the frame.

    :param to_translate: line segments to test. If a 'length' column exists it is used as the reference
        length of each segment (as before); otherwise the length of the shifted geometry is used.
    :return: True when a parallel pair is found, False otherwise
    """
    # See if there are parallel lines
    my_buffer = to_translate['geometry'].buffer(cap_style=2, distance=30, join_style=3)
    # Offset to both sides since the parallel lines could be in opposite directions
    to_translate['geometry_right'] = to_translate['geometry'].apply(lambda x: x.parallel_offset(35, 'right'))
    to_translate['geometry_left'] = to_translate['geometry'].apply(lambda x: x.parallel_offset(35, 'left'))

    def is_parallel(my_s_join: GeoDataFrame, the_buffer: GeoSeries, geo_field: str) -> bool:
        """Check the lines stored in ``geo_field`` against the buffers of the other segments."""
        # Make the shifted lines the active geometry so the spatial join works on them
        my_s_join['geometry'] = my_s_join[geo_field]
        buffer_gdf = GeoDataFrame(geometry=the_buffer, crs=project_crs)
        new_data_0 = my_s_join.sjoin(buffer_gdf, how='inner').reset_index()
        if not len(new_data_0):
            return False
        # Remove overlay of polylines with its own buffer
        new_data_1 = new_data_0[new_data_0['index'] != new_data_0['index_right']]
        for _, translated_line in new_data_1.iterrows():
            geo_tr_line = GeoDataFrame(data=pd.DataFrame([translated_line]), crs=project_crs)
            other_buffer = GeoDataFrame(geometry=the_buffer.loc[geo_tr_line['index_right']], crs=project_crs)
            overlay = gpd.overlay(geo_tr_line, other_buffer, how='intersection')
            if overlay.empty:
                # The line only touches the buffer: overlay keeps line results only, so nothing is left to measure
                continue
            # The original expression `translated_line.length` resolved to the 'length' column of the row.
            # NOTE(review): that column is presumably the segment length stored with the data - confirm its units.
            if 'length' in translated_line.index:
                reference_length = translated_line['length']
            else:
                reference_length = translated_line['geometry'].length
            if (overlay.length / reference_length).iloc[0] * 100 > 10:
                return True
        return False

    # The left side is evaluated only when the right side found nothing
    return (is_parallel(to_translate, my_buffer, 'geometry_right')
            or is_parallel(to_translate, my_buffer, 'geometry_left'))


def create_center_line(one_poly):
    """
    Build the ordered points of one new centre line for the segments covered by a buffer polygon.

    The segments are read from the module-level ``data`` frame (the group currently being processed),
    which must have an 'angle' column. The two end points of the new line are picked among all pairs of
    segment end points; intermediate points are added by ``add_more_pnts_to_new_lines`` when the
    segments' angles spread over 1 degree or more.

    :param one_poly: polygon that selects the segments of ``data`` to merge
    :return: list of shapely Points, from the first to the last point of the new line
    """
    lines_in_buffer = data.sjoin(GeoDataFrame(geometry=[one_poly], crs=project_crs)).drop(columns='index_right')

    # Get the start/end points of these polylines
    list_pnts_of_line_group = []
    for line_local in lines_in_buffer['geometry']:
        list_pnts_of_line_group.extend([Point(line_local.coords[0]), Point(line_local.coords[-1])])

    # Find all the unidirectional combinations between each two pair of points
    point_combinations = list(combinations(list_pnts_of_line_group, 2))

    # Save it into a DataFrame and calculate the distance of every pair
    df_test = DataFrame()
    df_test['point_1'] = [pair[0] for pair in point_combinations]
    df_test['point_2'] = [pair[1] for pair in point_combinations]
    df_test['dist'] = [first.distance(second) for first, second in point_combinations]

    # Direction of every pair in degrees; values that are not positive are shifted by 180
    dx = np.array([second.x - first.x for first, second in point_combinations], dtype=float)
    dy = np.array([second.y - first.y for first, second in point_combinations], dtype=float)
    pair_angle = np.degrees(np.arctan2(dy, dx))
    df_test['angle'] = np.where(pair_angle > 0, pair_angle, pair_angle + 180)

    # Score every pair by its distance and by how its angle relates to the mean angle of the polylines.
    # The angle is less important, hence the 0.5 weight.
    # NOTE(review): the angular deviation is *added* to the score and is a plain (non-circular) difference,
    # so pairs that deviate more from the mean angle score higher - confirm this is intended.
    avg = lines_in_buffer['angle'].mean()
    dis = abs(df_test['angle'] - avg)
    max_dist = df_test['dist'].max()
    max_dis = dis.max()
    # A zero maximum would make the normalisation 0/0 = NaN for every pair and the choice below arbitrary;
    # in that case the term is the same for all pairs, so it contributes nothing.
    dist_score = df_test['dist'] / max_dist if max_dist > 0 else 0.0
    angle_score = dis / max_dis if max_dis > 0 else 0.0
    df_test['ratio'] = dist_score + 0.5 * angle_score
    max_points = df_test.sort_values(by='ratio', ascending=False).iloc[0]

    # These points serve as the initial reference in order to find more points
    pnt_f = max_points['point_1']
    pnt_l = max_points['point_2']

    angl_rng = lines_in_buffer['angle'].max() - lines_in_buffer['angle'].min()
    if angl_rng < 1:  # If the angle range is less than 1 degree the line is based on the first and last points only
        lines_pnt_geo = [pnt_f]
    else:
        if angl_rng > 100:  # Very wide angle range: check every 10 units along the line
            length_to_check = 10
        else:
            # The wider the range, the shorter the step (logarithm to create more changes at the beginning)
            length_to_check = 75 - log2(angl_rng) * 10
        lines_pnt_geo = add_more_pnts_to_new_lines(pnt_f, pnt_l, [pnt_f], length_to_check, lines_in_buffer)
    lines_pnt_geo.append(pnt_l)
    return lines_pnt_geo


def add_more_pnts_to_new_lines(pnt_f_loc: Point, pnt_l_loc: Point, line_pnts: list, lngth_chck: float,
                               test_poly: GeoDataFrame) -> list:
    """
    Add intermediate points to a new line wherever the straight line drifts away from the old network.

    Starting at ``pnt_f_loc`` the straight line towards ``pnt_l_loc`` is sampled every ``lngth_chck``
    units. When a sample is more than 10 units from its nearest polyline in ``test_poly``, the sample is
    projected onto that polyline, the projected point is appended to ``line_pnts`` and sampling restarts
    from it. This is the loop form of the original tail recursion, so long streets cannot exhaust
    Python's recursion limit.

    :param pnt_f_loc: first point of the new line (start of the sampling)
    :param pnt_l_loc: last point of the new line
    :param line_pnts: points collected so far; it is extended in place
    :param lngth_chck: distance between two consecutive checks along the line
    :param test_poly: polylines among which the closest one is searched in each iteration
    :return: ``line_pnts`` (the same list object) with the added points
    """
    start_pnt = pnt_f_loc
    while True:
        # Calculate distance and azimuth between the current start point and the last point
        dist = start_pnt.distance(pnt_l_loc)
        x_0 = start_pnt.coords[0][0]
        y_0 = start_pnt.coords[0][1]
        bearing = math.atan2(pnt_l_loc.coords[0][0] - x_0, pnt_l_loc.coords[0][1] - y_0)
        bearing = bearing + 2 * math.pi if bearing < 0 else bearing
        # Calculate the number of checks going to be carried out
        loops = int(dist / lngth_chck)
        next_start = None
        for dis_on_line in range(1, loops):
            x_new = x_0 + lngth_chck * dis_on_line * math.sin(bearing)
            y_new = y_0 + lngth_chck * dis_on_line * math.cos(bearing)
            # Find the network line closest to the sample point
            one_pnt_df = GeoDataFrame(geometry=[Point(x_new, y_new)], crs=project_crs)
            s_join_loc = one_pnt_df.sjoin_nearest(test_poly, distance_col='dis').iloc[0]

            if s_join_loc['dis'] > 10:
                # 'index_right' is a label of test_poly, so the line is looked up there (not in a global frame)
                line = test_poly.loc[s_join_loc['index_right'], 'geometry']
                pnt_med = line.interpolate(line.project(s_join_loc['geometry']))
                if pnt_med.distance(start_pnt) < 10:  # Otherwise the code may get stuck in an endless loop
                    continue
                line_pnts.append(pnt_med)
                next_start = pnt_med
                break
        if next_start is None:
            # Every sample is close enough to the network (or was skipped): nothing more to add
            return line_pnts
        start_pnt = next_start


def update_df_with_center_line(new_line, is_simplified=0, group_name=-1):
    """
    Append one row to the module-level ``dic_final`` dictionary of lists.

    Besides its arguments the function reads two module-level names: ``name`` (stored as the street
    name) and ``data`` (the 'highway' value of its first row and the mean of its 'angle' column are stored).

    :param new_line: geometry to store
    :param is_simplified: 1 when the geometry is a newly created centre line, 0 when it is kept unchanged
    :param group_name: According to the DBSCAN algorithm, if no =-1
    :return: None
    """
    # Resolve every value and every target list before appending anything, so a failure
    # (for example a missing 'highway' column) cannot leave the lists with different lengths.
    record = {
        'name': name,
        'geometry': new_line,
        'highway': data.iloc[0]['highway'],
        'bearing': data['angle'].mean(),
        'group': group_name,
        'is_simplified': is_simplified,
    }
    targets = [(dic_final[column], value) for column, value in record.items()]
    for column_values, value in targets:
        column_values.append(value)


# Function to calculate circular_distance
def circular_distance(angle1, angle2):
    """
    Smallest difference between two angles when angles that are 180 degrees apart are the same direction.

    :param angle1: angle(s) in degrees, scalar or array-like
    :param angle2: angle(s) in degrees, scalar or array-like
    :return: difference(s) in the range 0-90 degrees
    """
    wrapped = np.mod(np.abs(angle1 - angle2), 180)
    return np.minimum(wrapped, 180 - wrapped)


# Module-level accumulator filled by @def update_df_with_center_line: one independent list per output column
dic_final = {column: [] for column in ('name', 'geometry', 'highway', 'bearing', 'group', 'is_simplified')}
# endregion

In [31]:
# This cell simplifies only the street specified in @street_name
street_name = 'Kongahällavägen'
print(street_name)
# Make sure @dic_final is empty before executing this code
dic_final= {key: [] for key in dic_final}
street= df_pro.groupby('name').get_group(street_name) # all the segments that carry this street name
for_time = len(street)

smplfcton_fldr = f'{data_folder}/simplification/streets/'
os.makedirs(smplfcton_fldr, exist_ok=True)  # the output folder must exist before the shapefile is written
number_of_parallel = 0 # count the number of polylines that were refined

name = street_name  # read by update_df_with_center_line
# Remove segments without angle
res = street.dropna(subset=['angle'], axis=0)
if len(res) < 2:
    # Fewer than two segments: nothing can be parallel, keep them unchanged and skip the clustering.
    # (Previously the flow fell through, so the segment was appended again by the steps below.)
    data = res
    for geom in data['geometry']:
        update_df_with_center_line(geom)
else:
    # Use DBSCAN to classify the segments based on their angle. Outliers (label -1) cannot be
    # considered parallel with any segment, so they are kept unchanged by the loop below.
    res = res.assign(group=DBSCAN(eps=10, min_samples=2).fit(res['angle'].to_numpy().reshape(-1, 1)).labels_)

    # The parallel test is on street segments that have the same name and belong to the same angle group.
    # When every label is -1 the loop sees a single -1 group, so no separate "all -1" step is needed.
    for group in res.groupby('group'):
        group_label, data = group  # `data` is read by the helper functions as a module-level name
        if group_label == -1 or not check_parallelism(data.copy()):
            # Outliers and groups without parallel segments are copied as they are
            for geom in data['geometry']:
                update_df_with_center_line(geom)
            continue

        # Remove 'service'/'unclassified' segments whose highway type covers at most 1/15 of the group
        min_num_of_polylines = len(data) / 15
        condition = (data['highway'].isin(['service','unclassified'])) & (data.groupby('highway')['highway'].transform('count') <= min_num_of_polylines)
        data = data[~condition]

        number_of_parallel+=len(data) # Update the number of parallel polylines

        # Unify the buffered lines into one geometry
        buffers = data.buffer(cap_style=3, distance=30, join_style=3)
        one_buffer = buffers.unary_union
        # A MultiPolygon is handled part by part; `.geoms` is used because multi-part geometries
        # are not iterable themselves in Shapely 2
        polygons = one_buffer.geoms if isinstance(one_buffer, MultiPolygon) else [one_buffer]
        for polygon in polygons:
            lines_pnt_geo_final = create_center_line(polygon)
            # Update dic_final
            update_df_with_center_line(LineString(lines_pnt_geo_final),1,group_label)

print(number_of_parallel)
print('create new files')
# NOTE(review): an earlier comment here said "remove short lines", but no filtering is done in this cell
final_cols = ['name', 'geometry', 'highway', 'bearing', 'length']
new_network = GeoDataFrame(dic_final, crs=project_crs)
new_network['length']= new_network.length  # column name now matches @final_cols (it was misspelled 'lenght')
# create network
new_network.to_file(f'{smplfcton_fldr}{street_name}.shp')