In [1]:
def initiate_path():
    """
    Build the relative folder names used for the notebook's output files.

    :return: tuple ``(base folder, roundabout folder, networks folder)``
    """
    base_dir = 'output'
    roundabout_dir, networks_dir = (f'{base_dir}/{sub}' for sub in ('roundabout', 'networks'))
    return base_dir, roundabout_dir, networks_dir

In [2]:
import geopandas as gpd
import osmnx as ox
import pandas as pd
from geopandas import GeoDataFrame, GeoSeries
from osmnx import io

# Resolve the output folders once; later cells build their file paths from these three names
path, path_round_about, path_network = initiate_path()
# CRS used for every projection and GeoDataFrame construction in this notebook
project_crs = 'epsg:3857'
from sklearn.cluster import DBSCAN
from shapely.geometry import Polygon, Point, LineString, MultiPolygon, MultiPoint
import math
import warnings

warnings.filterwarnings(action='ignore')
from momepy import remove_false_nodes,extend_lines

### Run the next cell only when downloading the data for the first time

In [3]:
# Create graph from OSM server and project it
# Download the street network (all network types) for the place query 'Torino'
graph = ox.graph_from_place('Torino', network_type='all')
# Add bearings to the edges; the 'bearing' column is read later when the angle is computed
graph = ox.bearing.add_edge_bearings(graph, precision=1)
graph_pro = ox.projection.project_graph(graph, to_crs=project_crs)
# NOTE(review): the graph is saved with filepath='.', while the next cell reads f'{path}/edges.shp' -
# confirm how the saved output ends up at that location
io.save_graph_geopackage(graph_pro, filepath='.', encoding='utf-8', directed=False)

### End

In [4]:
# Keep only the segments that carry a street name
my_gdf = gpd.read_file(f'{path}/edges.shp')
not_null = my_gdf.dropna(subset='name')
not_null.to_file(f'{path}/not_null.shp')

# Separate roundabout edges from the rest; tunnels, cycleways and paths are discarded as well
junction, tunnel, highway = (not_null[column] for column in ('junction', 'tunnel', 'highway'))
excluded = (
    (junction == 'roundabout') | (junction == 'circular')
    | (tunnel == 'building_passage') | (tunnel == 'yes')
    | (highway == 'cycleway') | (highway == 'path')
)
not_roundaout = not_null[~excluded]
round_about = not_null[junction.isin(['roundabout', 'circular'])]
not_roundaout.to_file(f'{path}/not_roundabout_tunnel.shp')
round_about.to_file(f'{path_round_about}/roundabout.shp')

# Project the remaining segments and fold the bearing into an undirected angle (0 to 180)
df_pro = not_roundaout.to_crs(project_crs)
df_pro['angle'] = df_pro['bearing'].apply(lambda bearing: bearing - 180 if not bearing < 180 else bearing)
df_pro['length'] = df_pro.length
df_pro.to_file(f'{path_network}/pro.shp')

In [5]:
# Reload the projected network that the preprocessing cell wrote to disk
df_pro = gpd.read_file(f'{path_network}/pro.shp')

In [6]:
def length_of_parallel(my_s_join: GeoDataFrame, the_buffer: GeoSeries, geo_field: str) -> int:
    """
    Count the spatial matches between the geometries in ``geo_field`` and the buffers of *other* lines.

    :param my_s_join: lines that hold the geometries to test in the column ``geo_field``
    :param the_buffer: buffers around the original lines; their index is compared with the index of
        ``my_s_join`` to recognise a line matched with its own buffer
    :param geo_field: name of the column whose geometries are joined against the buffers
    :return: number of matches whose buffer belongs to a different line (0 when nothing matches)
    """
    # Work on a copy: assigning the tested geometries straight onto the argument would overwrite the
    # caller's 'geometry' column
    candidates = my_s_join.copy()
    candidates['geometry'] = candidates[geo_field]
    buffers = GeoDataFrame(geometry=the_buffer, crs=project_crs)
    matches = candidates.sjoin(buffers, how='inner').reset_index()
    if len(matches) == 0:
        return 0
    # 'index' is the line, 'index_right' the buffer: equal values mean a line met its own buffer
    return len(matches[matches['index'] != matches['index_right']])

In [7]:
def check_parallelism(to_translate: GeoDataFrame, is_test_local: bool = False) -> bool:
    """
    Tell whether any line of the group runs parallel to another line of the same group.

    Every line is offset by 35 units to its right and to its left; the group counts as parallel when
    an offset line meets the 30-unit buffer of a different line.

    :param to_translate: lines to check; the frame is not modified
    :param is_test_local: when True the buffers are also written to ``{path}/test_data/buffers_test.shp``
    :return: True if at least one offset line meets the buffer of another line
    """
    # Copy so the helper columns below are not added to the caller's frame
    lines = to_translate.copy()
    my_buffer = lines['geometry'].buffer(cap_style=2, distance=30, join_style=3)
    lines['geometry_right'] = lines['geometry'].apply(lambda x: x.parallel_offset(35, 'right'))
    lines['geometry_left'] = lines['geometry'].apply(lambda x: x.parallel_offset(35, 'left'))
    # NOTE: the author marked this debug export as "currently not working"
    if is_test_local:
        my_buffer.to_file(f'{path}/test_data/buffers_test.shp')
    # The right side is tested first; the left side is only evaluated when the right one finds nothing
    return any(length_of_parallel(lines, my_buffer, side) > 0 for side in ('geometry_right', 'geometry_left'))

In [8]:
def simplify(my_polygon: "Polygon", x: "int | float") -> "Polygon":
    """
    Simplify a polygon, halving the tolerance until the result keeps an area of at least 50.

    :param my_polygon: polygon to simplify (topology is not preserved)
    :param x: initial simplification tolerance
    :return: the first simplification whose area is at least 50; if no tolerance above the floor
        achieves that (e.g. the input itself is smaller than 50), the last simplification tried
    """
    # Floor for the tolerance: without it a polygon that can never reach the area threshold would be
    # retried forever (the recursive version ended in RecursionError)
    min_tolerance = 1e-6
    tolerance = x
    simplify_poly = my_polygon.simplify(tolerance, preserve_topology=False)
    while simplify_poly.area < 50 and tolerance > min_tolerance:
        tolerance = tolerance / 2
        simplify_poly = my_polygon.simplify(tolerance, preserve_topology=False)
    return simplify_poly

In [9]:
def create_center_line(one_poly):
    """
    Pick two far-apart vertices of a polygon and store them as the end points of a future centre line.

    The chosen vertices are appended to the module-level ``max_dist`` dictionary under the name
    ``id_pol + 2``; nothing is returned. When ``is_test`` is set the polygon is also exported.

    :param one_poly: polygon whose exterior vertices are compared pairwise
    :return: None
    """
    # The closing vertex repeats the first one, so it is left out
    vertices = [Point(xy) for xy in one_poly.exterior.coords[:-1]]
    largest, second_largest, third_largest = 0, 0, 0
    chosen_pair = (-1, -1)
    # NOTE(review): a new maximum does not push the previous maximum down to the lower slots, so the
    # three trackers are not a strict top-3 ranking; only the pair recorded by the last branch is used
    for k, start in enumerate(vertices):
        for j, end in enumerate(vertices[k + 1:], start=k + 1):
            gap = start.distance(end)
            if gap > largest:
                largest = gap
            elif gap > second_largest:
                second_largest = gap
            elif gap > third_largest:
                chosen_pair = (k, j)
                third_largest = gap
    if is_test:
        debug_layer = {'index': [0], 'geometry': one_poly}
        GeoDataFrame(debug_layer, crs='epsg:3857').to_file(f'{path}/test_data/simplify_poly_{unit[0]}_{id_pol + 2}.shp')
    # Both points share one name so they can be paired again after the spatial join
    max_dist['name'].extend([id_pol + 2] * 2)
    max_dist['geometry'].extend([vertices[chosen_pair[0]], vertices[chosen_pair[1]]])

In [10]:
def update_df_with_center_line(new_line):
    """
    Append one centre line and its attributes to the module-level ``dic_final`` dictionary.

    The street name, the current group (``unit``) and its segments (``data``) are read from
    module-level variables set by the main loop.

    :param new_line: geometry of the new centre line
    :return: None
    """
    target = dic_final
    target['name'].append(name)
    target['geometry'].append(new_line)
    # The highway type is copied from the first segment of the group
    first_segment = data.iloc[0]
    target['highway'].append(first_segment['highway'])
    # The bearing of the new line is the mean angle of the group's segments
    mean_angle = data['angle'].mean()
    target['bearing'].append(mean_angle)
    target['group'].append(unit[0])


In [11]:
def update_list(line_local):
    """
    Store the first and the last vertex of a line in the module-level ``list_pnts_of_line_group``.

    :param line_local: line geometry exposing ``coords``
    :return: None
    """
    first_pnt = Point(line_local.coords[0])
    last_pnt = Point(line_local.coords[-1])
    list_pnts_of_line_group.extend((first_pnt, last_pnt))


In [12]:
def add_more_pnts_to_new_lines(pnt_f_loc: Point, pnt_l_loc: Point, line_pnts: list) -> list:
    """
    Add intermediate vertices to a new line wherever the straight connection leaves the old roads.

    The straight segment from ``pnt_f_loc`` to ``pnt_l_loc`` is probed every 100 units. At the first
    probe lying more than 10 units from the nearest line of the module-level ``data``, that probe is
    projected onto the line, the projected point is appended to ``line_pnts`` and the search restarts
    from the probe.

    :param pnt_f_loc: point the probing starts from
    :param pnt_l_loc: end point of the new line
    :param line_pnts: vertices collected so far (extended in place)
    :return: ``line_pnts`` including any added vertices
    """
    spacing = 100
    start_x = pnt_f_loc.coords[0][0]
    start_y = pnt_f_loc.coords[0][1]
    span = pnt_f_loc.distance(pnt_l_loc)
    # Azimuth measured from the y axis, normalised to [0, 2*pi)
    azimuth = math.atan2(pnt_l_loc.coords[0][0] - start_x, pnt_l_loc.coords[0][1] - start_y)
    if azimuth < 0:
        azimuth = azimuth + 2 * math.pi
    checks = int(span / spacing)

    for step in range(1, checks):
        probe = Point(start_x + spacing * step * math.sin(azimuth),
                      start_y + spacing * step * math.cos(azimuth))
        probe_gdf = GeoDataFrame(geometry=[probe], crs=project_crs)
        nearest = probe_gdf.sjoin_nearest(data, distance_col='dis').iloc[0]
        if not nearest['dis'] > 10:
            # Close enough to the old network, keep walking along the straight line
            continue
        pnt_med = nearest['geometry']
        road = data.loc[nearest['index_right']]['geometry']
        line_pnts.append(road.interpolate(road.project(pnt_med)))
        # Restart the probing from the point that was too far away
        return add_more_pnts_to_new_lines(pnt_med, pnt_l_loc, line_pnts)
    return line_pnts

In [15]:
# Main point to start
# For every street name: cluster the segments by angle, check whether a cluster contains parallel
# lines and, if so, replace the segments of every cluster with centre lines.

# When True only the hard-coded street below is processed and debug layers are written to test_data
is_test = True
my_groupby = df_pro.groupby('name')
dic_final = {'name': [], 'geometry': [], 'highway': [], 'bearing': [], 'group': []}

for_time = len(my_groupby)
for i, street in enumerate(my_groupby):
    # Progress as a percentage of the streets handled so far
    print(f'{round(i / for_time * 100, 2)}\t', end="")
    res = street[1]
    name = street[0]
    if is_test:
        name = 'Corso Giovanni Agnelli'
        res = my_groupby.get_group(name)
    # Cluster the segments by angle; segments without an angle are ignored
    res = res.dropna(subset=['angle'], axis=0)
    if len(res) == 0:
        continue
    res['group'] = DBSCAN(eps=5, min_samples=2).fit(res['angle'].to_numpy().reshape(-1, 1)).labels_
    # Label -1 marks segments DBSCAN left outside every cluster
    cur_group = res[res['group'] > -1].groupby('group')
    is_parallel = False
    for group in cur_group:
        data = group[1]
        if check_parallelism(data, is_test):
            # If some lines with the same angle are parallel, find the centre line for each group
            is_parallel = True
            for unit in cur_group:
                # 'data', 'unit', 'id_pol', 'max_dist' and 'list_pnts_of_line_group' are read as
                # module-level variables by the helper functions called below
                data = unit[1]

                # Start/end points of every line of the group
                list_pnts_of_line_group = []
                data['geometry'].apply(update_list)
                df_pnts = GeoDataFrame(geometry=list_pnts_of_line_group, crs='epsg:3857').drop_duplicates()

                # Unify the lines into one polygon
                buffers = data.buffer(cap_style=3, distance=30, join_style=3)
                one_buffer = buffers.unary_union

                max_dist = {'name': [], 'geometry': []}
                # A MultiPolygon is handled part by part
                if isinstance(one_buffer, MultiPolygon):
                    # Iterate through .geoms: multi-part geometries are not directly iterable in Shapely 2
                    for id_pol, polygon in enumerate(one_buffer.geoms):
                        create_center_line(polygon)
                else:
                    id_pol = -1
                    create_center_line(one_buffer)
                max_df = GeoDataFrame(max_dist, crs='epsg:3857')
                # For each selected vertex find the closest original end point; these end points
                # define the new line
                s_join = max_df.sjoin_nearest(df_pnts).groupby('name')
                for geo in s_join:
                    same_name = geo[1]
                    # Both vertices snapped to the same end point: no line can be built
                    if same_name.iloc[0]['index_right'] == same_name.iloc[1]['index_right']:
                        continue
                    in_0 = same_name.iloc[0]['index_right']
                    in_1 = same_name.iloc[1]['index_right']
                    # These points serve as the initial reference for finding more points
                    pnt_f = df_pnts.loc[in_0]['geometry']
                    pnt_l = df_pnts.loc[in_1]['geometry']
                    lines_pnt_geo = add_more_pnts_to_new_lines(pnt_f, pnt_l, [pnt_f])
                    lines_pnt_geo.append(pnt_l)
                    # Update dic_final
                    update_df_with_center_line(LineString(lines_pnt_geo))
                if is_test:
                    buffers.to_file(f'{path}/test_data/buffers_{unit[0]}.shp')
        # If one group is found to be parallel, all groups were already processed above, so the
        # remaining groups do not need to be checked
        if is_parallel:
            break
    # Test export: write the result for the single test street and stop
    if is_test:
        final = GeoDataFrame(dic_final, crs=project_crs)
        final = final[final.length > 20]
        final.to_file(f'{path}/test_data/one_line.shp')
        res.to_file(f'{path}/test_data/groups.shp')
        break

if not is_test:
    print('create new files')
    # Remove short lines
    final_cols = ['name', 'geometry', 'highway', 'bearing', 'length']
    final = GeoDataFrame(dic_final, crs=project_crs)
    final['is_simplify'] = 1
    final['length'] = final.length
    final = final[final.length > 100]
    final.to_file(f'{path}/one_line_third.shp')
    # Create the network: streets that were not simplified plus the new centre lines
    new_network = df_pro[~df_pro['name'].isin(dic_final['name'])][final_cols]
    new_network['is_simplify'] = 0
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
    pd.concat([new_network, final]).to_file(f'{path_network}/new_network_third.shp')

0.0	

Intersection

Split in intersection

In [12]:
class Intersection:
    """
    Split the lines of a network at the points where they cross other lines.

    Intermediate and final layers are written as shapefiles into the folder named by the
    module-level ``cur_path``.
    """

    def __init__(self, network):
        """
        :param network: GeoDataFrame of line geometries; it needs a 'length' column
        """
        self.my_network = network
        # Crossing points found so far; 'name' holds the index label of the line to split
        self.inter_pnt_dic = {'geometry': [], 'name': []}
        # Index labels of lines to drop because they cross another line more than once
        self.lines_to_delete = []

    def delete_false_intersection(self):
        """
        Remove false nodes from the network with momepy and refresh the 'length' column.
        """
        # First clean all the false nodes
        self.my_network = remove_false_nodes(self.my_network)
        # The previous function has changed the topology, so the length should be updated
        self.my_network['length'] = self.my_network.length
        self.my_network.to_file(f'{cur_path}/remove_false_nodes.shp')

    def intersection_network(self):
        """
        Find the crossings between lines, split the crossed lines there and store the result in
        ``self.my_network`` (also written to ``{cur_path}/split.shp``).
        """
        # Create a buffer around each element
        buffer_around_lines = self.my_network['geometry'].buffer(cap_style=3, distance=1, join_style=3)
        buffer_around_lines.to_file(f'{cur_path}/buffer.shp')

        # Spatial join between the buffers and the lines
        s_join_0 = gpd.sjoin(left_df=GeoDataFrame(geometry=buffer_around_lines, crs=project_crs),
                             right_df=self.my_network)

        # Drop the matches of a buffer with its own line
        s_join = s_join_0[s_join_0.index != s_join_0['index_right']]
        s_join.to_file(f'{cur_path}/s_join.shp')

        # Find new intersections that are not at the beginning or end of the line
        s_join.apply(self.__find_intersection_points, axis=1)
        inter_pnt_gdf = GeoDataFrame(self.inter_pnt_dic, crs=project_crs)
        inter_pnt_gdf.to_file(f'{cur_path}/inter_pnt.shp')

        # Split the lines by the points
        segments = {'geometry': [], 'org_id': []}
        # Group the points by name (which is the line they should split)
        for group_pnts in inter_pnt_gdf.groupby('name'):
            points = group_pnts[1]
            points['is_split'] = True

            # Get the line to split by comparing the name
            row = self.my_network.loc[group_pnts[0]]
            current = list(row.geometry.coords)
            points_line = [Point(x) for x in current]
            points_line_gdf = GeoDataFrame(geometry=points_line, crs=project_crs)
            points_line_gdf['is_split'] = False

            # Put all the points together (line vertices and split points);
            # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
            line_all_pnts = pd.concat([points_line_gdf, points])

            # Order the points by their distance from the beginning of the line, along the line
            line_all_pnts['dis_from_the_start'] = line_all_pnts['geometry'].apply(lambda x: row.geometry.project(x))
            line_all_pnts.sort_values('dis_from_the_start', inplace=True)

            # Split the line: a segment is closed at every split point, which also opens the next one
            seg = []
            for point in line_all_pnts.iterrows():
                prop = point[1]
                seg.append(prop['geometry'])
                if prop['is_split']:
                    segments['geometry'].append(LineString(seg))
                    segments['org_id'].append(row.name)
                    seg = [prop['geometry']]
            segments['geometry'].append(LineString(seg))
            segments['org_id'].append(row.name)
        network_split = GeoDataFrame(data=segments, crs=project_crs)
        # NOTE(review): assumes the geometry column is the last column of the network - confirm
        cols_no_geometry = self.my_network.columns[:-1]
        network_split_final = network_split.set_index('org_id')
        # Copy the attributes of the original line onto its segments (aligned on the index)
        network_split_final[cols_no_geometry] = self.my_network[cols_no_geometry]
        network_split_final.to_file(f'{cur_path}/only_split.shp')
        # Remove the old and redundant lines from the network and add the new segments
        kept_lines = self.my_network.drop(index=network_split_final.index.unique())
        network_split = pd.concat([kept_lines, network_split_final]).drop(index=self.lines_to_delete)
        network_split['length'] = network_split.length
        self.my_network = network_split
        self.my_network.to_file(f'{cur_path}/split.shp')

    def __find_intersection_points(self, row):
        r"""
        Find the intersection point between the line of a buffer and the line it was joined with.

        Interior crossings are stored in ``self.inter_pnt_dic``. When two lines cross more than once,
        the shorter one is added to ``self.lines_to_delete`` instead.

        :param row: one row of the spatial join; ``row.name`` and ``row['index_right']`` are the
            index labels of the two lines
        :return: None
        """
        # Defined before the try block so the error message below can always be formatted
        pnt = None
        try:
            line_1 = self.my_network.loc[row.name]
            line_2 = self.my_network.loc[row['index_right']]
            pnt = line_1.geometry.intersection(line_2.geometry)
            # If there is more than one intersection between two lines, one of the lines should be deleted
            if isinstance(pnt, MultiPoint):
                # line_1 / line_2 are rows, so '.length' reads the network's 'length' column
                temp_line = line_1.name if line_1.length < line_2.length else line_2.name
                if temp_line not in self.lines_to_delete:
                    self.lines_to_delete.append(temp_line)
                return
            # Skip if the lines do not intersect or the crossing is the first/last vertex of line_1
            if len(pnt.coords) == 0 or pnt.coords[0] == line_1.geometry.coords[0] or pnt.coords[0] == \
                    line_1.geometry.coords[-1]:
                return
            self.inter_pnt_dic['geometry'].append(pnt)
            self.inter_pnt_dic['name'].append(row.name)
        except Exception:
            # Best effort: pairs whose intersection cannot be handled (e.g. a result without
            # '.coords') are reported and skipped so the remaining pairs are still processed
            print(f"{row.name},{row['index_right']}:{pnt}")


In [4]:
# Folder read by the Intersection methods (as the module-level name cur_path) for their output layers
cur_path = f'{path}/rearrange'
# Start from the network written by the main cell
new_gpd = gpd.read_file(f'{path_network}/new_network_third.shp')
obj_intersection = Intersection(new_gpd)
# Clean false nodes first, then split the lines at their crossings
obj_intersection.delete_false_intersection()
obj_intersection.intersection_network()

Roundabout

In [5]:
class EnvEntity:
    """
    Base helper around a line network: finds dead-end lines and merges changed lines back in.
    """

    def __init__(self, network):
        """
        :param network: GeoDataFrame of lines with a 'geometry' column
        """
        # end-point coordinate -> labels of the lines that start or end there
        self.pnt_dic = {}
        self.first_last_dic = {'geometry': [], 'line_name': [], 'position': []}
        self.network = network

    def __populate_pnt_dic(self, point: tuple, name_of_line):
        """
        Make "pnt_dic" contain a list of all the lines connected to each point.

        :param point: coordinate tuple of a line end
        :param name_of_line: index label of the line
        :return: None
        """
        if point not in self.pnt_dic:
            self.pnt_dic[point] = []
        self.pnt_dic[point].append(name_of_line)

    def __send_pnts(self, temp_line: "GeoSeries"):
        """
        Send the first and the last point of a line to populate_pnt_dic.

        :param temp_line: one row of the network
        :return: None
        """
        my_geom = temp_line['geometry']
        self.__populate_pnt_dic(my_geom.coords[0], temp_line.name)
        self.__populate_pnt_dic(my_geom.coords[-1], temp_line.name)

    def get_deadend_gdf(self, delete_short: int = 0) -> "GeoDataFrame":
        """
        Return the lines that have at least one end point not shared with any other line.

        :param delete_short: when positive, dead-end lines shorter than this value are removed from
            ``self.network`` and only dead-end lines longer than it are returned
        :return: the dead-end lines, each line once
        """
        # Rebuild the point index on every call; otherwise a second call would count every line twice
        self.pnt_dic = {}
        self.network.apply(self.__send_pnts, axis=1)
        # A line whose two ends are both unconnected is found twice; dict.fromkeys keeps the first
        # occurrence only, so the line is neither returned nor re-added to the network twice
        deadend_list = list(dict.fromkeys(lines[0] for lines in self.pnt_dic.values() if len(lines) == 1))
        if delete_short > 0:
            # If it is necessary to eliminate short dead-end segments, it is important to delete them
            # from the network as well
            deadend_gdf = self.network.loc[deadend_list]
            self.network = self.network.drop(index=deadend_gdf[deadend_gdf.length < delete_short].index)
            return deadend_gdf[deadend_gdf.length > delete_short]
        return self.network.loc[deadend_list]

    def update_the_current_network(self, temp_network):
        r"""
        Replace the lines of the current network that appear in ``temp_network`` with their new
        version, refresh the 'length' column and drop lines that are not longer than 1.

        :param temp_network: changed lines, indexed like the network
        :return: None
        """
        untouched = self.network.drop(index=temp_network.index)
        # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
        self.network = pd.concat([untouched, temp_network])
        self.network['length'] = self.network.length
        self.network = self.network[self.network['length'] > 1]

In [6]:
class Roundabout(EnvEntity):
    """
    Connect dead-end lines that end near a roundabout to the centre of that roundabout.

    The roundabout edges are read from ``{path_round_about}/roundabout.shp``.
    """

    def __init__(self, network: GeoDataFrame):
        """
        :param network: GeoDataFrame of lines; its 'name' column is renamed to 'str_name' in place
        """
        EnvEntity.__init__(self, network)
        self.pnt_dic = {}
        self.centroid = self.__from_roundabout_to_centroid()
        self.network.rename(columns={'name': 'str_name'}, inplace=True)

    def __from_roundabout_to_centroid(self):
        """
        Find the centre of each roundabout: buffer the roundabout edges, union the buffers and take
        the centroid of every resulting polygon.

        :return: GeoDataFrame of centroids with a running number in 'name'
            (also written to ``{path_round_about}/centroid.shp``)
        """
        round_about = gpd.read_file(f'{path_round_about}/roundabout.shp')
        round_about_buffer = round_about.to_crs(project_crs)['geometry'].buffer(cap_style=1, distance=10,
                                                                                join_style=1).unary_union
        # Multi-part results expose their parts through .geoms (they are not directly iterable in
        # Shapely 2); a single-part result has no .geoms and is treated as a one-element list
        parts = getattr(round_about_buffer, 'geoms', [round_about_buffer])
        dic_data = {'name': [], 'geometry': []}
        for ii, xx in enumerate(parts):
            dic_data['name'].append(ii)
            dic_data['geometry'].append(xx.centroid)
        centroid = GeoDataFrame(dic_data, crs=project_crs)
        centroid.to_file(f'{path_round_about}/centroid.shp')
        return centroid

    def __first_last_pnt_of_line(self, row: GeoSeries):
        r"""
        Store the first and the last point of a line in first_last_dic, together with the label of
        the line and the position (0 or -1) of the point in the line's coordinate list.

        :param row: one row of the dead-end lines
        :return: None
        """
        geo = list(row['geometry'].coords)
        self.first_last_dic['geometry'].extend([Point(geo[0]), Point(geo[-1])])
        self.first_last_dic['line_name'].extend([row.name] * 2)
        self.first_last_dic['position'].extend([0, -1])

    def deadend(self, path_to_save: str, export_files: bool = False):
        r"""
        Return the dead-end lines and their end points.

        :param path_to_save: folder for the optional export
        :param export_files: when True the end-point index, the end points and the dead-end lines
            are written to ``path_to_save``
        :return: tuple ``(dead-end lines, end points of those lines)``
        """
        # Get deadend_gdf
        deadend_gdf = self.get_deadend_gdf()

        # Create a gdf of line points with a reference to the line they belong to
        deadend_gdf.apply(self.__first_last_pnt_of_line, axis=1)
        first_last_gdf = GeoDataFrame(self.first_last_dic, crs=project_crs)

        if export_files:
            # Optional - create new files
            temp_list = {}
            temp_list['geometry'] = [Point(x) for x in self.pnt_dic.keys()]
            temp_list['lines'] = [str(x) for x in self.pnt_dic.values()]
            # All exports go to path_to_save (this one used to read a module-level path instead)
            GeoDataFrame(temp_list, crs=project_crs).to_file(f'{path_to_save}/line_pnts.shp')
            first_last_gdf.to_file(f'{path_to_save}/first_last_pnts.shp')
            deadend_gdf.to_file(f'{path_to_save}/deadend_gdf_all.shp')

        return deadend_gdf, first_last_gdf

    def __update_geometry(self, cur, s_join):
        r"""
        Move the end points of a line that lie near a roundabout onto the roundabout centre.

        :param cur: one row of the dead-end lines
        :param s_join: end points joined with their nearest roundabout centre
        :return: the new line geometry, or the unchanged geometry for footways and for lines
            without a nearby roundabout
        """
        if cur['highway'] == 'footway':
            # Don't snap footway to roundabout
            return cur['geometry']
        points_lines = s_join[s_join['line_name'] == cur.name]
        if len(points_lines) == 0:
            # No roundabout nearby
            return cur['geometry']
        # Get the line coordinates to change the first and/or last point
        geo_cur = list(cur['geometry'].coords)

        # One or two end points near a roundabout: replace each of them with the matched centre.
        # More than two matches leave the coordinates untouched, as before.
        if len(points_lines) in (1, 2):
            for _, end_point in points_lines.iterrows():
                # int(): 'index_right' is stored as a float after the left join
                centre = self.centroid.loc[int(end_point['index_right'])]['geometry']
                geo_cur[int(end_point['position'])] = centre.coords[0]
        return LineString(geo_cur)

    def my_spatial_join(self, deadend_lines, deadend_pnts):
        """
        Join the dead-end points with the roundabout centres within 60 units and rebuild the
        geometry of the dead-end lines accordingly.

        The intermediate layers are written to the folder named by the module-level ``path_deadend``.

        :param deadend_lines: dead-end lines
        :param deadend_pnts: end points of the dead-end lines
        :return: copy of ``deadend_lines`` with the updated geometries
        """
        s_join = gpd.sjoin_nearest(left_df=deadend_pnts, right_df=self.centroid, how='left', max_distance=60,
                                   distance_col='dist').dropna(subset='dist')
        s_join.to_file(f'{path_deadend}/roundabout_near_pnts.shp')
        # Update the geometry so the roundabout will be part of the line geometry
        change_geo = deadend_lines.copy()

        change_geo['geometry'] = change_geo.apply(lambda x: self.__update_geometry(x, s_join), axis=1)
        change_geo.reset_index().to_file(f'{path_deadend}/connect_roundabout.shp')
        return change_geo


In [19]:
# Network Improvement - to roundabout
# Find unconnected lines
# OLD CODE -  my_roundabout=Roundabout(obj_intersection.my_network.copy().reset_index())
# The network is read back from disk, so this cell does not depend on the Intersection object
exist_data = gpd.read_file(f'{path_network}/new_network_third.shp')
my_roundabout=Roundabout(exist_data.reset_index())
# Folder also read by Roundabout.my_spatial_join as a module-level name
path_deadend = f'{path}/deadend'

deadend_lines, deadend_pnts = my_roundabout.deadend(path_to_save=path_deadend,export_files=True)
# Inspect one dead-end line (hard-coded position); the same row is used in the test area below
deadend_lines.iloc[2262]

index                                                     10860
str_name                                Piazza Giuseppe Perotti
highway                                             residential
bearing                                                   334.2
length                                                47.201322
is_simplif                                                    0
group                                                       NaN
geometry      LINESTRING (851804.0976559754 5634581.04569036...
Name: 10860, dtype: object

In [None]:
# update the current network
# Snap the dead-end lines to nearby roundabout centres, merge them back and save the network
change_geo = my_roundabout.my_spatial_join(deadend_lines, deadend_pnts)
my_roundabout.update_the_current_network(change_geo)
my_roundabout.network.to_file(f'{path_network}/network_ra.shp')

## TEST AREA

In [17]:

# Debug: the same nearest join that Roundabout.my_spatial_join performs, kept as a variable for inspection
s_join_test = gpd.sjoin_nearest(left_df=deadend_pnts, right_df=my_roundabout.centroid, how='left', max_distance=60,
                           distance_col='dist').dropna(subset='dist')

In [21]:
# Debug: take one dead-end line and list its end points that were matched with a roundabout centre
cur = deadend_lines.iloc[2262]
points_lines = s_join_test[s_join_test['line_name'] == cur.name]
geo_cur = list(cur['geometry'].coords)
points_lines

Unnamed: 0,geometry,line_name,position,index_right,name,dist
4524,POINT (851804.098 5634581.046),10860,0,37.0,37.0,58.478351
4525,POINT (851783.704 5634623.298),10860,-1,37.0,37.0,44.368053


In [None]:


# Debug copy of the geometry update done inside the Roundabout class, run step by step on `cur`.
# Unlike the method, the first two checks only print and do not stop the cell.
if cur['highway'] == 'footway':
    # Don't snap footway to roundabout
    print('footway')
points_lines = s_join_test[s_join_test['line_name'] == cur.name]
if len(points_lines) == 0:
    # No roundabout nearby
    print(cur['geometry'])
# get the line geometry to change the first and/ or last point
geo_cur = list(cur['geometry'].coords)

if len(points_lines) == 1:
    # One line point near roundabout
    geo_cur[points_lines['position'].item()] = my_roundabout.centroid.loc[points_lines['index_right'].item()]['geometry'].coords[
        0]
if len(points_lines) == 2:
    # two line points near roundabout
    for ind in range(2):
        points_line = points_lines.iloc[ind]
        geo_cur[points_line['position'].item()] =my_roundabout.centroid.loc[points_line['index_right'].item()]['geometry'].coords[0]


## TEST AREA


## Things to improve in the code - Roundabout:
1. Snap all lines near a roundabout to the roundabout
2. Replace the nearest points on the line with the roundabout (and not the end points)
3. Up to 50 meters - connect. 50-100 meters - connect only if the line does not cross any other line

In [11]:
# Extend
# Read the network with the roundabout connections and let momepy extend its lines (tolerance 100)
new_network2 =gpd.read_file(f'{path_network}/network_ra.shp')
extend_lines_f= extend_lines(new_network2,100)
extend_lines_f['length'] = extend_lines_f.length
# NOTE(review): cur_path still holds the value set by an earlier cell at this point
# (f'{path}/rearrange' when the cells run top to bottom) - confirm this is the intended folder
extend_lines_f.to_file(f'{cur_path}/extend_lines.shp')

Things to do:
1. There are still some bugs to fix
2. Put more effort into connecting unconnected lines

In [13]:
# Second intersection pass, this time on the extended network; its layers go to the 'finals' folder
cur_path = f'{path}/finals'
obj_intersection2 = Intersection(extend_lines_f)
obj_intersection2.delete_false_intersection()
# Pairs whose intersection cannot be handled are printed by the class and skipped
obj_intersection2.intersection_network()


7449,8489:GEOMETRYCOLLECTION (LINESTRING (852163.1978101947 5629603.470719143, 852163.1978101946 5629603.470719143), POINT (852182.3919685575 5629598.286382937))
8489,7449:GEOMETRYCOLLECTION (LINESTRING (852163.1978101947 5629603.470719143, 852163.1978101946 5629603.470719143), POINT (852182.3919685575 5629598.286382937))


In [31]:
# Clear short segments
# NOTE: 'final' is rebound here to an EnvEntity; in the main cell the same name holds a GeoDataFrame
final = EnvEntity(obj_intersection2.my_network.reset_index())
# Dead-end lines shorter than 30 are removed from the network; the longer ones are merged back
final.update_the_current_network(final.get_deadend_gdf(delete_short=30))
final.network.to_file(f'{path_network}/final.shp')