In [None]:
#import all the packages
import time
import osmnx as ox
import networkx as nx
from shapely.geometry import Point
from shapely.geometry import MultiPolygon
from shapely.geometry import Polygon
from shapely.ops import nearest_points
ox.settings.log_console=True
ox.settings.use_cache=True
import pandas as pd
import geopandas as gpd
import contextily
from IPython.display import GeoJSON
import math

In [None]:
#import the road networks & dataset
bxlzone=['Brussels, Belgium', 'Anderlecht, Belgium', 'Auderghem, Belgium', 'Berchem-Sainte-Agathe, Belgium', 'Etterbeek, Belgium', 'Evere, Belgium', 'Forest, Belgium', 'Ganshoren, Belgium', 'Ixelles, Belgium', 'Jette, Belgium', 'Koekelberg, Belgium', 'Molenbeek-Saint-Jean, Belgium', 'Saint-Gilles, Belgium', 'Saint-Josse-ten-Noode, Belgium', 'Schaerbeek, Belgium', 'Uccle, Belgium', 'Watermael-Boitsfort, Belgium', 'Woluwe-Saint-Lambert, Belgium', 'Woluwe-Saint-Pierre, Belgium']
wka=ox.geometries_from_place(bxlzone,tags={'leisure':['park'],'nature':True,"highway": ["street_lamp","traffic_signals"],
                                          "amenity": ["bench", 'toilets',"shower", 'parcel_locker']})
bxl = pd.read_csv(r"D:\Postgrad study source\Master thesis\final\BXLwalk\bxl_walk_walking.csv", delimiter=',', index_col=False)
G=ox.graph.graph_from_place(bxlzone, network_type='walk', simplify=True, retain_all=False, truncate_by_edge=False, which_result=None, buffer_dist=None, clean_periphery=True, custom_filter=None)

In [None]:
#retrieve the start and end coordinates
slat=[lat for lat in bxl['start_lat']]
slon=[lon for lon in bxl['start_lon']]
elat=[lat for lat in bxl['end_lat']]
elon=[lon for lon in bxl['end_lon']]

In [None]:
#Scenery POIs retrieve and thereof calculation

scenery = ox.geometries_from_place(bxlzone, tags={'leisure':['park'],'nature':True})
scenery_list = scenery["name"].tolist()
park_names=scenery_list
park_nodes = []


# loop over park names
# loop over park names
for name in park_names:
    # check if name is NaN
    if isinstance(name, float) and math.isnan(name):
        # retrieve the first coordinates instead
        first_coord = park_gdf["geometry"].iloc[0].exterior.coords[0]
        park_node = [(first_coord[1], first_coord[0])]
    else:
        # geocode park name to a GeoDataFrame
        park_gdf = ox.geocode_to_gdf(name)
        # get the polygon of the park
        park_poly = park_gdf["geometry"].iloc[0]
        # extract the minimum bounding rectangle of the park polygon
        park_mbr = park_poly.minimum_rotated_rectangle
        # convert the MBR into a set of four nodes
        park_node = [(park_mbr.exterior.coords[i][1], park_mbr.exterior.coords[i][0]) for i in range(4)]
    park_nodes.append(park_node)

park_flattened_list = [item for sublist in park_nodes if sublist is not None for item in sublist]
#Scenery Nodes
scenery_nodes = []
for coord in park_flattened_list:
    scenery_node = ox.distance.nearest_nodes(G, coord[1], coord[0])
    scenery_nodes.append(scenery_node)

#Scenery Edges  
import time

scenery_egs = []

start_time = time.time()

for coord in park_flattened_list:
    scenery_eg = ox.distance.nearest_edges(G, coord[1], coord[0], interpolate=1, return_dist=False)
    scenery_egs.append(scenery_eg)

end_time = time.time()
elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time} seconds")
# count how many times each edge appears in the list
scenery_edge_count = {}
for edge in scenery_egs:
    if edge in scenery_edge_count:
        scenery_edge_count[edge] += 1
    else:
        scenery_edge_count[edge] = 1
scenery_edge_count

In [None]:
# light Visibility POIs retrieve and thereof calculation
lv = ox.geometries_from_place(bxlzone, tags={"highway": ["street_lamp"]})
lv_coords_list = []

for geom in lv.geometry:
    if isinstance(geom, Polygon):
        lv_coords_list.append(list(geom.centroid.coords)[0])
    elif isinstance(geom, Point):
        lv_coords_list.append((geom.x, geom.y))
#lv nodes
lv_nodes = []
for coord in lv_coords_list:
    lv_node = ox.distance.nearest_nodes(G, coord[0], coord[1])
    lv_nodes.append(lv_node1)
#lv edges
lv_egs = []

start_time = time.time()

for coord in lv_coords_list:
    lv_eg = ox.distance.nearest_edges(G, coord[0], coord[1], interpolate=1, return_dist=False)
    lv_egs.append(lv_eg)

end_time = time.time()
elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time} seconds")
lv_egs

In [None]:
# Traffic Signal POIs retrieve and thereof calculation
ts= ox.geometries_from_place(bxlzone, tags={"highway": ["traffic_signals"]})
ts_coords_list = []
for geom in ts.geometry:
    if isinstance(geom, Polygon):
        lv_coords_list.append(list(geom.centroid.coords[0]))
    elif isinstance(geom, Point):
        ts_coords_list.append((geom.x, geom.y))
#ts nodes
ts_nodes = []
for coord in ts_coords_list:
    ts_node = ox.distance.nearest_nodes(G, coord[0], coord[1])
    ts_nodes.append(ts_node)

#ts edges
ts_egs = []

start_time = time.time()

for coord in ts_coords_list:
    ts_eg = ox.distance.nearest_edges(G, coord[0], coord[1], interpolate=1, return_dist=False)
    ts_egs.append(ts_eg)

end_time = time.time()
elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time} seconds")
ts_egs

In [None]:
# Toilet & Bench POIs retrieve and thereof calculation
toilet = ox.geometries_from_place(bxlzone, tags={"amenity": ["bench", 'toilets']}) 
toilet_coords_list = []
for geom in toilet.geometry:
    if isinstance(geom, Polygon):
        toilet_coords_list.append(list(geom.centroid.coords[0]))
    elif isinstance(geom, Point):
        toilet_coords_list.append((geom.x, geom.y))
    elif isinstance(geom, MultiPolygon):
        for poly in geom:
            toilet_coords_list.append(list(poly.centroid.coords[0]))
#toilet nodes
toilet_nodes = []
for coord in toilet_coords_list:
    toilet_node = ox.distance.nearest_nodes(G, coord[0], coord[1])
    toilet_nodes.append(toilet_node)
#toilet edges
toilet_egs = []

start_time = time.time()

for coord in toilet_coords_list:
    toilet_eg = ox.distance.nearest_edges(G, coord[0], coord[1], interpolate=1, return_dist=False)
    toilet_egs.append(toilet_eg)

end_time = time.time()
elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time} seconds")

In [None]:
# Fasicilaty_shower_lock POIs retrieve and thereof calculation
fsl= ox.geometries_from_place(bxlzone, tags={"amenity": ["shower", 'parcel_locker']})
fsl_coords_list = []
for geom in fsl.geometry:
    if isinstance(geom, Polygon):
        fsl_coords_list.append(list(geom.centroid.coords[0]))
    elif isinstance(geom, Point):
        fsl_coords_list.append((geom.x, geom.y))
    elif isinstance(geom, MultiPolygon[0]):
        for poly in geom:
            fsl_coords_list.append(list(poly.centroid.coords[0]))
#fsl nodes
fsl_nodes = []
for coord in fsl_coords_list:
    fsl_node = ox.distance.nearest_nodes(G, coord[0], coord[1])
    fsl_nodes.append(fsl_node)
#fsl edges
fsl_egs = []

start_time = time.time()

for coord in fsl_coords_list:
    fsl_eg = ox.distance.nearest_edges(G, coord[0], coord[1], interpolate=1, return_dist=False)
    fsl_egs.append(fsl_eg)

end_time = time.time()
elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time} seconds")
# count how many times each edge appears in the list
fsl_edge_count = {}
for edge in fsl_egs:
    if edge in fsl_edge_count:
        fsl_edge_count[edge] += 1
    else:
        fsl_edge_count[edge] = 1
fsl_edge_count

In [None]:
#visualize the POIs nodes
wka.explore(tiles='Stamen Toner')

In [None]:
#Below is refinement

In [None]:
#create a new attribute in G
for u, v, k, data in G.edges(keys=True,data=True):
    # check if this edge appears in the list
    edge = (u, v, k)
    # modify the length of this edge
    data['wl'] = data['length']

In [None]:
# Refined the new weight based on the road attribute
for u, v, k, data in G.edges(keys=True, data=True):
    # check if this edge has a 'highway' attribute of 'pedestrian'
    if 'highway' in data and data['highway'] == 'pedestrian':
        # modify the length of this edge
        data['wl'] = data['wl']/2*(1-0.2934)

In [None]:
# Define the scaling factors for each feature
feature_scales = {
    'fsl': 0.0097,
    'scenery': 0.5618,
    'ts': 0.0541,
    'toilet': 0.2895,
    'lv': 0.0521
}

# Loop over the edges in the graph to refined other POIs
for u, v, k, data in G.edges(keys=True, data=True):
    # Check if this edge appears in any of the feature edge lists
    for feature in feature_scales.keys():
        edge_list_name = feature + '_egs'
        edge_count_name = feature + '_edge_count'
        scale = feature_scales[feature]
        if (u, v, k) in locals()[edge_list_name]:
            # Modify the length of this edge according to the feature scaling factor
            count = locals()[edge_count_name][(u, v, k)]
            data['wl'] = ((data['wl'] / (count + 1)) * (scale))

In [None]:
#original routes
routes=[]
start_nodes=[]
end_nodes=[]
for slat,slon,elat,elon in zip(slat,slon,elat,elon):
    start_node = ox.distance.nearest_nodes(G, slon,slat)
    end_node=ox.distance.nearest_nodes(G, elon, elat)
    start_nodes.append(start_node)
    end_nodes.append(end_node)
    route = nx.shortest_path(G,
                                  start_node,
                                  end_node,
                                  weight='length')
    routes.append(route)

In [None]:
# Calculate the original distance of the routes seperately and overall distance
odist = []
total_distance = 0
for start_node, end_node in zip(start_nodes, end_nodes):
    distance = nx.shortest_path_length(G, source=start_node, target=end_node, weight='length', method='dijkstra')
    odist.append(distance)
    total_distance += distance
odist
print('The total distance is', total_distance, 'meter')

In [None]:
#get the new routes with condition of POIs nodes
nroutes=[]
for start_node, end_node in zip(start_nodes, end_nodes):
    nroute = nx.shortest_path(G,
                                  start_node,
                                  end_node,
                                  weight='wl')
    nroutes.append(nroute)

In [None]:
# the distance of new routes
ndist=[]
ntotal=0
for nroute in nroutes:
    edge_lengths = ox.utils_graph.get_route_edge_attributes(G, nroute, 'length')
    ndist.append(edge_lengths)
dts=[]
for nd in ndist:
    nds=sum(nd)
    dts.append(nds)
dts
#total new routes distance
ntotal=sum(dts)
ntotal

In [None]:
#new routes visualization
fig, ax = ox.plot_graph_routes(G, routes, figsize=(20,15), node_size=0)

In [None]:
#calculate the number of certain type POIs node in routes & new routes
count = 0
for sublist in routes:
    if any(item in sublist for item in lv_nodes):
        count += 1

print(count)

In [None]:
count = 0
for sublist in nroutes:
    if any(item in sublist for item in lv_nodes):
        count += 1

print(count)

In [None]:
#zoom in a specific leg to gain more intelligible view

In [None]:
single_slat=50.847
single_slon=4.354
single_elat=50.845
single_elon=4.34

In [None]:
single_start_node = ox.distance.nearest_nodes(G, single_slon, single_slat)
single_dest_node = ox.distance.nearest_nodes(G, single_elon, single_elat)
single_route = nx.shortest_path(G, single_start_node, single_dest_node, weight="length")
#get the route edges
#depict the original shortest route
# define the start and end points as latitude-longitude coordinates
start_coords = (single_slat, single_slon)
end_coords = (single_elat, single_elon)

# create a bounding box around the start and end points
north = 50.86
south = 50.83
east = 4.33
west = 4.36
bbox = (north, south, east, west)

# plot the graph with the specified routes and bounding box
fig, ax = ox.plot_graph_routes(G, [single_route, single_nroute], route_colors=['r', 'g'], node_size=0, bbox=bbox)

In [None]:
single_distance = nx.shortest_path_length(G, source=single_start_node , target=single_dest_node ,
                                          weight='length', method='dijkstra')
single_distance

In [None]:
edge_lengths = ox.utils_graph.get_route_edge_attributes(G, single_nroute, 'length')