In [69]:
import osmnx as ox
import geopandas as gpd
import networkx as nx
from shapely.geometry import Point
import numpy as np
import seaborn as sns
import pandas as pd
import requests
import xml.etree.ElementTree as ET
import h3
from shapely import wkt
from shapely.geometry import box, Polygon
import folium

# Load data
all_lts_df = pd.read_csv("/Users/leonardo/Desktop/Tesi/LTSBikePlan/data/Montereale_Valcellina_all_lts.csv")
all_lts_df['geometry'] = all_lts_df['geometry'].apply(wkt.loads)
gdf_nodes = pd.read_csv("/Users/leonardo/Desktop/Tesi/LTSBikePlan/data/Montereale_Valcellina_gdf_nodes.csv", index_col=0)
gdf_nodes['geometry'] = gdf_nodes['geometry'].apply(wkt.loads)

#Convert the DataFrame to a GeoDataFrame
all_lts = gpd.GeoDataFrame(all_lts_df, geometry='geometry')
all_lts.crs = "EPSG:32632"
all_lts_projected = all_lts.to_crs(epsg=4326)

nodes = gpd.GeoDataFrame(gdf_nodes, geometry='geometry')
nodes.crs = "EPSG:32632"
nodes_projected = nodes.to_crs(epsg=4326)

def classify_stress(row):
    if pd.notna(row['lts']):  # Checks if 'lts' value is not NaN
        if row['lts'] in [1, 2]:
            return 'low'
        elif row['lts'] in [3, 4]:
            return 'high'
    return None  # Returns None for NaN or unclassified values

# Apply the function to create 'type_stress' column
all_lts_projected['type_stress'] = all_lts_projected.apply(classify_stress, axis=1)
nodes_projected['type_stress'] = nodes_projected.apply(classify_stress, axis=1)

# Create a network
G = nx.Graph()
# Add nodes from nodes_projected
for idx, row in nodes_projected.iterrows():
    # Add node with attributes
    G.add_node(idx, **row.to_dict())

# Add edges from all_lts_projected
for _, row in all_lts_projected.iterrows():
    # Extract start and end node IDs
    u = row['u']
    v = row['v']
    # Add edge with attributes
    G.add_edge(u, v, **row.to_dict())

print(f"Number of nodes: {G.number_of_nodes()}")
print(f"Number of edges: {G.number_of_edges()}")

# Print node attributes
if G.number_of_nodes() > 0:
    first_node = next(iter(G.nodes))
    print("Node Attributes:", list(G.nodes[first_node].keys()))
else:
    print("No nodes in the graph.")

# Print edge attributes
if G.number_of_edges() > 0:
    first_edge = next(iter(G.edges))
    print("Edge Attributes:", list(G.edges[first_edge].keys()))
else:
    print("No edges in the graph.")

Number of nodes: 1233
Number of edges: 1104
Node Attributes: ['y', 'x', 'street_count', 'highway', 'ref', 'geometry', 'lts', 'message', 'type_stress']
Edge Attributes: ['u', 'v', 'key', 'osmid', 'lanes', 'name', 'highway', 'maxspeed', 'geometry', 'length', 'rule', 'lts', 'group', 'slope', 'slope_class', 'lanes_assumed', 'maxspeed_assumed', 'message', 'short_message', 'type_stress']


In [70]:
# Function to retrieve population data from OpenStreetMap
def get_osm_population(city_name):
    # Construct the Overpass API query
    query = f"""
    [out:xml][timeout:25];
    area[name="{city_name}"]->.searchArea;
    (
      node["population"](area.searchArea);
      way["population"](area.searchArea);
      relation["population"](area.searchArea);
    );
    out body;
    """
    url = "https://overpass-api.de/api/interpreter"
    
    # Send the request
    response = requests.get(url, params={'data': query})
    
    # Parse the XML response
    root = ET.fromstring(response.content)
    
    # Extract population data
    for element in root.iter('tag'):
        if element.attrib['k'] == 'population':
            return element.attrib['v']
    
    return "Population data not found"

# Retrieve population data for the specific city
total_city_population = int(get_osm_population("Montereale Valcellina"))

# Function to create a hexagonal grid within the city boundaries
def create_hex_grid_within_city_bounds(city_boundary, resolution):
    # Generate hexagons within the city boundary
    hexagons = h3.polyfill(city_boundary.__geo_interface__, resolution, geo_json_conformant=True)

    filtered_hexagons = []
    for h in hexagons:
        hex_polygon = Polygon(h3.h3_to_geo_boundary(h, geo_json=True))
        if hex_polygon.intersects(city_boundary):
            filtered_hexagons.append(hex_polygon)

    hex_grid = gpd.GeoDataFrame([{'geometry': hexagon} for hexagon in filtered_hexagons])
    hex_grid.crs = 'EPSG:4326'

    return hex_grid

# Create the hex grid within the city boundaries
city_boundary = all_lts_projected.geometry.unary_union.buffer(0.005)
hex_grid_within_city = create_hex_grid_within_city_bounds(city_boundary, 9)
hex_grid_projected = hex_grid_within_city.to_crs('EPSG: 32632')

# Calculate area for each hexagon to distribute population
hex_grid_projected['area'] = hex_grid_projected['geometry'].area

# Distribute the city's population across the hexagons
hex_grid_projected['estimated_population'] = (hex_grid_projected['area'] / hex_grid_projected['area'].sum()) * total_city_population

# Project the hex grid back to the original CRS for consistency with other data
hex_grid_projected = hex_grid_projected.to_crs('EPSG:4326')

# Choose an appropriate projected CRS for centroid calculation
projected_crs = 'EPSG:32632'  # Replace with a CRS that's appropriate for your area

# Project the all_lts data to the new CRS for centroid calculation
all_lts_projected_crs = all_lts_projected.to_crs(projected_crs)

# Calculate centroids
centroids = all_lts_projected_crs.geometry.centroid

# Reproject centroids back to the original CRS
centroids = centroids.to_crs(epsg=4326)

# Get the center latitude and longitude for the map
center_lat, center_lon = centroids.iloc[0].y, centroids.iloc[0].x

# Convert the hexagonal grid to GeoJSON for visualization
hex_grid_geojson = hex_grid_projected.to_json()

# Function to retrieve building data 
def get_building_data(city_name):
    # Retrieve buildings from OSM within the city boundary
    buildings = ox.features_from_place(city_name, tags={'building': True})
    return buildings

# Retrieve building data for Montereale Valcellina
buildings = get_building_data("Montereale Valcellina, Italy")

# Ensure CRS match between buildings and hex grid
buildings = buildings.to_crs(hex_grid_projected.crs)

# Ensure hex_grid_projected has an explicit index column
hex_grid_projected.reset_index(inplace=True)
hex_grid_projected.rename(columns={'index': 'hex_index'}, inplace=True)

# Spatial join to count buildings in each hexagon
hex_grid_with_buildings = gpd.sjoin(hex_grid_projected, buildings, how='left', predicate='intersects')

# Count buildings based on the hexagon index
building_counts = hex_grid_with_buildings.groupby('hex_index').size()

# Assign building counts to the hex grid
hex_grid_projected['building_count'] = hex_grid_projected['hex_index'].map(building_counts).fillna(0)

# Adjust population estimation based on building count
# Here: simple average of area-based and building-based estimates
hex_grid_projected['adjusted_population'] = hex_grid_projected.apply(
    lambda row: (row['estimated_population'] + row['building_count']) / 2, axis=1)
assert 'adjusted_population' in hex_grid_projected.columns, "Adjusted population column not found"

# Convert the hexagonal grid to GeoJSON for visualization
hex_grid_geojson = hex_grid_projected.to_crs(epsg=4326).to_json()

m = folium.Map(location=[center_lat, center_lon], zoom_start=13)

def color_function(feature):
    population = feature['properties']['adjusted_population']
    
    high_population_threshold = total_city_population * 0.02  # 2% of total population
    medium_population_threshold = total_city_population * 0.01  # 1% of total population

    if population > high_population_threshold:
        return '#ff0000'  # Red 
    elif population > medium_population_threshold:
        return '#ffff00'  # Yellow 
    else:
        return '#00ff00'  # Green 

folium.GeoJson(
    hex_grid_geojson,
    name='Hexagonal Grid',
    style_function=lambda feature: {
        'fillColor': color_function(feature),
        'color': 'black',
        'weight': 1,
        'fillOpacity': 0.5,
    }
).add_to(m)

# Add layer control and display the map
folium.LayerControl().add_to(m)
m

In [94]:
# Spatial join between hexagons and edges
edges_gdf = gpd.GeoDataFrame(all_lts_projected[['geometry', 'u', 'v', 'type_stress']], geometry='geometry')
hex_edges = gpd.sjoin(hex_grid_projected, edges_gdf, how='left', predicate='intersects')

# Group edges by hexagon using a list for column names
hexagon_edges = hex_edges.groupby('hex_index')[['u', 'v']].apply(lambda x: list(zip(x['u'], x['v'])))
# Remove hexagons with only NaN values in their edge list
filtered_hexagon_edges = {hex_id: edges for hex_id, edges in hexagon_edges.items() if not all(np.isnan(edge).any() for edge in edges)}

print(filtered_hexagon_edges)


{0: [(3506974525.0, 4187438666.0), (4187438666.0, 3506974525.0), (3506974548.0, 4187438666.0), (4187438666.0, 3506974548.0), (737610454.0, 481334646.0), (3506974548.0, 5018109329.0), (5018109329.0, 3506974548.0), (3506974548.0, 3506974552.0), (3506974552.0, 3506974548.0), (481334646.0, 3506974552.0), (3506974552.0, 481334646.0), (481334646.0, 481334639.0), (481334639.0, 481334646.0), (481334646.0, 737610454.0)], 1: [(3481946812.0, 1947733632.0), (1947733632.0, 3481946812.0), (1947733632.0, 564874543.0), (564874543.0, 1947733632.0), (1947733632.0, 3783691229.0), (3783691229.0, 1947733632.0), (1947733632.0, 1875944520.0), (1875944520.0, 1947733632.0)], 2: [(3452104942.0, 2143808112.0), (2143808112.0, 3452104942.0), (2143808184.0, 2143808112.0), (2143808112.0, 2143808184.0), (912306922.0, 2143808184.0), (2143808184.0, 912306922.0), (318694413.0, 912296263.0), (912296263.0, 318694413.0), (3481947868.0, 912311919.0), (912311919.0, 3481947868.0)], 6: [(1945789497.0, 3723637999.0), (372363799

In [98]:
# Assuming you have a function to find the centroid of a hexagon
def find_hexagon_centroid(hexagon_geometry):
    return hexagon_geometry.centroid

# Find the representative node for each hexagon
representative_nodes = {}
for hex_id, edges in filtered_hexagon_edges.items():
    hexagon_geometry = hex_grid_projected.loc[hex_grid_projected['hex_index'] == hex_id, 'geometry'].iloc[0]
    centroid = find_hexagon_centroid(hexagon_geometry)
    
    # Find the closest node to the centroid among the nodes of the hexagon's edges
    closest_node = None
    min_distance = float('inf')
    for u, v in edges:
        for node in [u, v]:
            if node in G.nodes:  # Check if the node is in the graph
                node_geom = Point(G.nodes[node]['x'], G.nodes[node]['y'])  # assuming nodes have 'x' and 'y' attributes
                distance = node_geom.distance(centroid)
                if distance < min_distance:
                    closest_node = node
                    min_distance = distance

    representative_nodes[hex_id] = closest_node

#print(representative_nodes)

shortest_paths = {}
for hex_id, start_node in representative_nodes.items():
    shortest_paths[hex_id] = {}
    for target_hex_id, target_node in representative_nodes.items():
        if hex_id != target_hex_id:
            if nx.has_path(G, start_node, target_node):  # Check if a path exists
                path_length = nx.dijkstra_path_length(G, source=start_node, target=target_node, weight='length')  # Assuming 'length' is the edge attribute for distance
                shortest_paths[hex_id][target_hex_id] = path_length
            else:
                shortest_paths[hex_id][target_hex_id] = None  # Indicate no path exists


In [104]:
# Create a subgraph with only low-stress edges
G_low_stress = nx.Graph()

# Add nodes to the subgraph
for node, data in G.nodes(data=True):
    G_low_stress.add_node(node, **data)

# Add only low-stress edges to the subgraph
for u, v, data in G.edges(data=True):
    if data.get('type_stress') == 'low':
        G_low_stress.add_edge(u, v, **data)


shortest_paths_low_stress = {}
for hex_id, start_node in representative_nodes.items():
    shortest_paths_low_stress[hex_id] = {}
    for target_hex_id, target_node in representative_nodes.items():
        if hex_id != target_hex_id:
            if nx.has_path(G_low_stress, start_node, target_node):  # Check if a path exists in the low-stress graph
                path_length = nx.dijkstra_path_length(G_low_stress, source=start_node, target=target_node, weight='length')  # Use the low-stress graph
                shortest_paths_low_stress[hex_id][target_hex_id] = path_length
            else:
                shortest_paths_low_stress[hex_id][target_hex_id] = None  # Indicate no path exists

#shortest_paths_low_stress

In [107]:
hexagon_connections = {}
detour_threshold = 1.5
for hex_id in shortest_paths:
    hexagon_connections[hex_id] = {}
    for target_hex_id in shortest_paths[hex_id]:
        baseline_path_length = shortest_paths[hex_id][target_hex_id]
        low_stress_path_length = shortest_paths_low_stress[hex_id][target_hex_id]

        # Check if the low-stress path exists and is within the threshold
        if low_stress_path_length is not None and (baseline_path_length is None or low_stress_path_length <= baseline_path_length * detour_threshold):
            hexagon_connections[hex_id][target_hex_id] = True
        else:
            hexagon_connections[hex_id][target_hex_id] = False

# hexagon_connections now contains information about which hexagons are connected via low-stress paths
hexagon_connections

{0: {1: False,
  2: False,
  6: False,
  7: False,
  8: False,
  9: False,
  10: True,
  11: False,
  13: False,
  14: True,
  15: False,
  16: False,
  17: True,
  18: False,
  19: False,
  21: False,
  22: False,
  23: False,
  25: False,
  27: False,
  28: False,
  31: False,
  32: False,
  33: False,
  35: False,
  36: False,
  37: False,
  38: False,
  39: False,
  43: False,
  46: True,
  47: False,
  48: False,
  49: False,
  51: False,
  52: False,
  53: False,
  56: False,
  57: False,
  58: False,
  59: True,
  60: False,
  61: False,
  62: False,
  64: False,
  65: False,
  66: False,
  68: False,
  69: False,
  70: False,
  72: False,
  73: False,
  75: False,
  77: False,
  78: False,
  80: False,
  81: False,
  82: True,
  83: False,
  84: False,
  86: True,
  87: False,
  89: True,
  90: False,
  91: False,
  92: False,
  93: False,
  95: False,
  97: False,
  98: False,
  99: False,
  100: False,
  101: False,
  105: False,
  106: False,
  109: False,
  110: False,
  11