In [None]:
import pandas as pd
import geopandas as gpd

from shapely.geometry import LineString, MultiLineString, Point
from shapely.ops import unary_union, snap

import osmnx

import matplotlib.pyplot as plt
import seaborn as sns

import os
import sys
sys.path.append(os.path.join(os.path.abspath(".."), "functions"))

import data_reader
import spatial_assignment
from constants import CRS_LATLONG, CRS_PROJECTED

# 1) Frequency

In [None]:
# Load the "area verde" layer through the project reader.
av = data_reader.AV_shape(namefile="area_verde_manual_v1.geojson", datapath="../data")

# Work on a copy so that `av` keeps its original geometries.
b_av = av.copy()
# Replace every geometry with its 3000-unit buffer.
# NOTE(review): the unit is that of av's CRS — presumably metres; confirm.
b_av['geometry'] = b_av.buffer(distance=3000)

In [None]:
# Load the TPER public-transport table; later cells group it by `stop_id`
# and count `trip_id`.
data = gpd.read_parquet("../data/TPM/tper-vigente.parquet")
data.head()

In [None]:
# Number of (non-null) trip records observed at each stop, ascending.
counts_per_stop = (
    data.groupby('stop_id')['trip_id']
    .count()
    .reset_index(name='counts')
    .sort_values(by='counts')
)
print(counts_per_stop.head())
print(counts_per_stop.tail())

# Distribution of the per-stop counts.
sns.histplot(data=counts_per_stop, x='counts')

In [None]:
# Average number of passages per stop per hour.
# NOTE(review): dividing by 24 assumes `data` covers one 24-hour day — confirm.

mean_passages_per_stop = counts_per_stop['counts'].mean()
mean_passages_per_stop / 24

In [42]:
# Average headway in hours: the reciprocal of the mean hourly number of
# passages per stop (i.e. hours between two passages, not a frequency).

mean_hourly_passages = counts_per_stop['counts'].mean() / 24
float(1 / mean_hourly_passages)

0.32591402575063977

# 2) Capillarity

In [None]:
def process_lines(gdf, snap_tolerance=1e-8):
    """Collapse a set of line geometries into unique segments and measure them.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        Frame with a ``geometry`` column of line geometries. It is not
        modified; no reprojection is performed here, so lengths are expressed
        in the units of ``gdf.crs``.
    snap_tolerance : float, default 1e-8
        Tolerance passed to ``shapely.ops.snap`` when each geometry is snapped
        onto itself.

    Returns
    -------
    (geopandas.GeoDataFrame, float)
        The de-duplicated segments (same CRS as the input) and the sum of
        their lengths.
    """
    # 1. Keep only the geometry column. Take an explicit copy so the column
    #    assignments below never write into (a view of) the caller's frame.
    lines = gdf[['geometry']].copy()

    # 1.5 Remove directionality so A->B and B->A compare equal.
    lines['geometry'] = lines['geometry'].apply(normalize_linestring_direction)

    # 2. Remove exact duplicates.
    lines = lines.drop_duplicates(subset='geometry').copy()

    # 3. Snap each geometry to itself.
    # NOTE(review): snapping a geometry onto itself is presumably a no-op for
    # closing gaps *between* different lines — confirm this is what is wanted.
    lines['geometry'] = lines['geometry'].apply(
        lambda geom: snap(geom, geom, tolerance=snap_tolerance)
    )

    # 4. Dissolve everything into one geometry; overlapping stretches are
    #    merged by the union. unary_union always returns a geometry, so only
    #    the single-LineString case needs wrapping.
    merged = unary_union(lines.geometry)
    if isinstance(merged, LineString):
        merged = MultiLineString([merged])

    # 5. Explode into individual parts and drop repeated segments.
    exploded = gpd.GeoSeries(merged).explode(index_parts=False)
    unique = exploded.drop_duplicates()

    # 6. Total length of the unique segments.
    total_length = unique.length.sum()

    # Return the segments as a GeoDataFrame carrying the input CRS.
    unique_gdf = gpd.GeoDataFrame(geometry=unique, crs=lines.crs)
    return unique_gdf, total_length

def normalize_linestring_direction(geom):
    """Return ``geom`` with a canonical vertex order.

    For a ``LineString`` the coordinate sequence and its reverse are compared
    lexicographically and the smaller one is kept, so a line and its reversed
    twin map to the same geometry. Any other geometry is returned unchanged.
    """
    if not isinstance(geom, LineString):
        return geom

    forward = list(geom.coords)
    backward = forward[::-1]
    if forward < backward:
        chosen = forward
    else:
        chosen = backward
    return LineString(chosen)


In [None]:
# Load the TPER GeoJSON layer; its geometries are processed as lines below.
shp = gpd.read_file('../data/TPM/tper-vigente.geojson')

In [None]:
# Calculate the length of the PT road network

# Keep only features that carry a `geo_point_2d` value, geometry column only.
has_point = shp['geo_point_2d'].notnull()
pt_lines = shp[has_point][['geometry']]
pt_lines = pt_lines.set_crs(CRS_LATLONG).to_crs(CRS_PROJECTED)

unique_pt_roads, length_pt_roads = process_lines(gdf=pt_lines)

print(length_pt_roads)

In [None]:
# Quick visual check of the de-duplicated PT segments.
unique_pt_roads.plot()

In [None]:
# Create a convex hull around the roads (to extract roads from OSM)

unique_pt_roads = unique_pt_roads[['geometry']]

# Build the hull from a temporary lat/long copy. `unique_pt_roads` itself stays
# in CRS_PROJECTED, so re-running this cell never pushes it through a
# projected -> lat/long -> projected round trip (which adds floating-point noise).
merged = unary_union(unique_pt_roads.to_crs(CRS_LATLONG).geometry)
convex_hull = merged.convex_hull

In [None]:
# Download the road network from OSM



In [None]:
# Drivable road network inside the convex hull of the PT network.
# The OSM download is slow, so it only runs when the cached parquet is missing;
# otherwise the cached edges are read straight from disk.
ROAD_NETWORK_PATH = "../data/road_newtork_all_bologna_v1.parquet"

if not os.path.exists(ROAD_NETWORK_PATH):
    rn = osmnx.graph_from_polygon(convex_hull, network_type="drive", simplify=True)

    # Keep only the edges, flattened to plain columns.
    rn2 = osmnx.convert.graph_to_gdfs(G=rn, nodes=False, edges=True)
    rn2 = rn2.reset_index()[['u','v','key','length','geometry']]
    rn2.to_parquet(path=ROAD_NETWORK_PATH)

rn2 = gpd.read_parquet(path=ROAD_NETWORK_PATH)

In [None]:
# Calculate the length of the whole road network

# Geometry column only, declared as lat/long and then projected.
all_lines = rn2[['geometry']].set_crs(CRS_LATLONG).to_crs(CRS_PROJECTED)

unique_all_roads, length_all_roads = process_lines(gdf=all_lines)

print(length_all_roads)

In [None]:
# Quick visual check of the de-duplicated road-network segments.
unique_all_roads.plot()

In [None]:
# Overlay the PT-served segments (blue) on the full road network (gray).

fig, ax = plt.subplots(figsize=(10, 10))

# Drawn in order: the full network first, PT segments on top.
network_layers = [
    (unique_all_roads, dict(color='gray', linewidth=0.5, alpha=1, label='All Roads')),
    (unique_pt_roads, dict(color='blue', linewidth=1, label='Roads with TPM')),
]
for layer, style in network_layers:
    layer.plot(ax=ax, **style)

plt.legend()
plt.show()


In [41]:
# Capillarity index: share of the total road length that is served by PT.

capillarity_index = length_pt_roads / length_all_roads
float(capillarity_index)

0.1962080743230842

# 3) Cost


Associo le località tramite vicinanza spaziale: per le zone OD disponiamo del punto preciso, mentre per le località delle tariffe le coordinate sono state ottenute tramite geocoding.

In [None]:
# Inputs: OD zone centroids (shapefile) and geocoded fare localities (CSV).
file1 = "../data/OD/Shape_zone_centroid.SHP"
file2 = "../data/TPM/tariffe_zona_bologna_geocoding.csv"

# set_crs only declares the CRS of the shapefile; to_crs does the reprojection.
localita_file1 = gpd.read_file(file1)
localita_file1 = localita_file1.set_crs("EPSG:23032").to_crs(CRS_PROJECTED)

print("Località 1")
print(localita_file1.head())

localita_file2 = pd.read_csv(file2)
# Drop rows without coordinates *before* building geometries, so no point is
# ever created (and reprojected) from NaN longitude/latitude.
localita_file2 = localita_file2.dropna(subset=['Latitude', 'Longitude'])
localita_file2 = gpd.GeoDataFrame(
    localita_file2,
    geometry=gpd.points_from_xy(localita_file2['Longitude'], localita_file2['Latitude']),
    crs=CRS_LATLONG,
).to_crs(CRS_PROJECTED)

print("Località 2")
print(localita_file2.head())

In [None]:
# Map every fare locality (LOCALITA) to the 'NO' code of the closest OD
# centroid, or to None when the closest centroid is 5000 or more CRS units away.
nearest_mapping = {}

for point_code, point in zip(localita_file2['LOCALITA'], localita_file2.geometry):
    # Distance from this locality to every OD centroid.
    distances = localita_file1.distance(point)

    if distances.min() < 5000:
        closest_idx = distances.idxmin()
        nearest_mapping[point_code] = int(localita_file1.loc[closest_idx, 'NO'])
    else:
        nearest_mapping[point_code] = None

nearest_mapping


In [None]:
# Fare zones in sorted order, paired positionally with their ticket cost.
# Missing TARIFFA values are excluded (they sorted last and had no cost anyway).
zona = localita_file2['TARIFFA'].dropna().sort_values().unique()
costo = [2.9, 3.7, 4.5, 5.4, 5.4, 6.2, 6.8, 7.4, 4, 2.3 ]

# The pairing is purely positional: a zone added to or removed from the data
# would silently shift every cost, so fail loudly on any length mismatch.
if len(zona) != len(costo):
    raise ValueError(
        f"Expected {len(costo)} fare zones to match the cost list, "
        f"found {len(zona)}: {list(zona)}"
    )

zone_tariffe_mapping = dict(zip(zona, costo))

zone_tariffe_mapping

In [None]:
# For every OD zone: {ticket cost: number of fare localities mapped to the zone
# that have this cost}.
tariffe_mapping = {}

for localita, zone_id in nearest_mapping.items():
    # Localities with no OD centroid close enough are skipped.
    if zone_id is None:
        continue

    tariffa_match = localita_file2.loc[localita_file2['LOCALITA'] == localita, 'TARIFFA']
    if tariffa_match.empty:
        continue

    # When a locality appears more than once, its first fare zone is used.
    cost_new = zone_tariffe_mapping[tariffa_match.iloc[0]]

    # Create the zone entry only once a cost is known, so no zone is ever left
    # with an empty dict (which would divide by zero when averaging).
    zone_costs = tariffe_mapping.setdefault(zone_id, {})
    zone_costs[cost_new] = zone_costs.get(cost_new, 0) + 1

tariffe_mapping

In [None]:
# Average ticket cost per OD zone, weighted by how many fare localities of each
# cost fall in the zone (rounded to 4 decimals).
cost_mapping = {}

for zone_id, cost_counts in tariffe_mapping.items():
    tot_cost = 0
    tot_n = 0
    for c, n in cost_counts.items():
        tot_cost += c*n
        tot_n += n

    # A zone without any counted fare has no defined average: leave it out
    # instead of dividing by zero.
    if tot_n == 0:
        continue

    cost_mapping[zone_id] = round(tot_cost/tot_n,4)

cost_mapping

In [None]:
# Flag, for every OD zone id, the `mostly_within_area_verde` value produced by
# the project's spatial assignment.
av = data_reader.AV_shape(namefile="area_verde_manual_v1.geojson", datapath="../data")

od_zones = data_reader.OD_shapes(
    namefile_polygons="Shape_zone.SHP",
    namefile_centers="Shape_zone_centroid.SHP",
    datapath="../data/OD",
)
od_zones_flagged = spatial_assignment.OD_to_AV(df_od=od_zones, df_av=av)

flow_shape = od_zones_flagged[['id', 'mostly_within_area_verde']]
flow_shape.head()

In [None]:
flow_data = data_reader.OD_flows(namefile="PROGETTO-OD.xlsx", datapath="../data/OD")

# Attach the area-verde flag of the origin zone, then of the destination zone,
# as `from_mwav` / `to_mwav`. Left joins keep flows whose zone has no flag.
for endpoint in ('from', 'to'):
    flow_data = (
        flow_data
        .merge(flow_shape, how='left', left_on=endpoint, right_on='id')
        .drop(columns='id')
        .rename(columns={'mostly_within_area_verde': f'{endpoint}_mwav'})
    )

flow_data.head()

In [None]:
# Ticket cost per OD flow (`cost_list`) and a mask of the flows that received
# one (`del_list`, True = cost available).
cost_list = []
del_list = []

trip_columns = zip(
    flow_data['from'], flow_data['to'], flow_data['from_mwav'], flow_data['to_mwav']
)

for origin, destination, origin_in_av, destination_in_av in trip_columns:
    cost = None
    has_cost = False

    # The flags are compared explicitly with True/False because unmatched
    # zones carry a missing value, which must match neither.
    if (origin_in_av == True) and (destination_in_av == True):
        # Urban trip: both ends inside the area verde.
        cost = zone_tariffe_mapping['urb.BO']
        has_cost = True
    elif not ((origin_in_av == False) and (destination_in_av == False)):
        # Extra-urban trip: price it by the end that lies outside the area.
        if origin_in_av == True:
            outer_zone = destination
        elif destination_in_av == True:
            outer_zone = origin
        else:
            outer_zone = None

        if outer_zone is not None and outer_zone in cost_mapping:
            cost = cost_mapping[outer_zone]
            has_cost = True
    # Otherwise: external trip (both ends outside), no cost assigned.

    cost_list.append(cost)
    del_list.append(has_cost)

flow_data['cost'] = cost_list
flow_data.head()
    

In [40]:
# Average cost paid: flow-weighted mean of the ticket cost over the flows that
# were assigned one.

priced_flows = flow_data.loc[del_list, 'flow']
priced_costs = flow_data.loc[del_list, 'cost']

sum(priced_flows * priced_costs) / sum(priced_flows)

3.074469699017669