In [None]:
import osmnx as ox
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
from matplotlib.animation import FuncAnimation
from IPython.display import display, HTML
from tqdm import tqdm
import warnings
import time
import statistics as st

In [None]:
warnings.filterwarnings("ignore")
plt.rcParams['animation.embed_limit'] = 2**128

start_time e end_time serviranno alla fine per il tempo di computazione

In [None]:
start_time = time.time()

In [None]:
end_time = time.time()

print(f'time total = {end_time - start_time}s')

Function computing global efficiency of the entire graph

In [None]:
def centrality_analysis(graph,city : str):
    fig, axs = plt.subplots(2, 2, figsize=(13,13))

    # compute degree centrality
    degree_centrality = nx.degree_centrality(graph)

    # compute edge betweness centrality
    edge_betweenness_centrality = nx.edge_betweenness_centrality(graph)

    # compute betweness centrality
    betweenness_centrality = nx.betweenness_centrality(graph)

    # compute closeness centrality
    closeness_centrality = nx.closeness_centrality(graph)

    #normalized degree centrality values
    dc_values = np.array([degree_centrality[node] for node in graph.nodes])
    norm_dc_values = (dc_values - min(dc_values)) / (max(dc_values) - min(dc_values))

    #normalized edge betweenness centrality values
    ebc_values = np.array([edge_betweenness_centrality[edge] for edge in graph.edges])
    norm_ebc_values = (ebc_values - min(ebc_values)) / (max(ebc_values) - min(ebc_values))

    #normalized betweenness centrality values
    bc_values = np.array([betweenness_centrality[node] for node in graph.nodes])
    norm_bc_values = (bc_values - min(bc_values)) / (max(bc_values) - min(bc_values))

    #normalized closeness centrality values
    c_values = np.array([closeness_centrality[node] for node in graph.nodes])
    norm_c_values = (c_values - min(c_values)) / (max(c_values) - min(c_values))
    
    cmap = 'viridis'
    values = [norm_dc_values , norm_ebc_values , norm_bc_values , norm_c_values]
    names = [city + ' degree centrality map',city + ' edge betweenness centrality map', city + ' betweenness centrality map',city + ' closeness centrality map']
    map_names = ['Degree centrality','Edge Betweenness centrality','Betweenness centrality','Closeness centrality']
    
    for i,values in enumerate(values):
        row = i // 2
        col = i % 2
        ax = axs[row, col]

        norm=plt.Normalize(vmin=values.min(), vmax=values.max())
        sm = plt.cm.ScalarMappable(norm=norm, cmap=cmap)
        sm.set_array([])
        if i == 1:
            ox.plot_graph(
                    graph,
                    ax,
                    bgcolor='white',
                    node_size=0,
                    edge_color=plt.cm.viridis(values),  
                    edge_linewidth=1, 
                    show=False)
        else: 
            ox.plot_graph(graph, ax,
                        node_color=plt.cm.viridis(values), 
                        node_size=3, 
                        edge_linewidth=0.5, 
                        bgcolor = 'white', 
                        show=False)

        cb = fig.colorbar(sm, ax=ax, label = map_names[i])
        plt.axis('on')
        ax.set_title(names[i])

    plt.show()

In [None]:
def global_efficiency(graph):
    nodes, edges = ox.graph_to_gdfs(graph, nodes=True, edges=True)
    efficiency = 0
    efficiency_euclid = 0
    N = len(graph.nodes)
    for i in range(N):
        for j in range(N):
            if i!=j:
                if graph.has_edge(i,j):
                    d_ij = nx.shortest_path_length(G, i, j, weight='length')
                    efficiency += 1 / d_ij
                    #euclid_distance = (2*np.pi*6371000*ox.distance.euclidean(nodes['lat'][i],nodes['lon'][i],nodes['lat'][j],nodes['lon'][j]))/360
                    #efficiency_euclid += 1 / euclid_distance
                    global_efficiency = efficiency/(N*(N-1))
    return global_efficiency

Function setting velocity limit in a given number of streets

In [None]:
def set_max_speed(graph,street_names, max_speed : int):
    '''street_names : array of strings with streets names
        max_speed : in Km/h
    '''
    for _,_,data in graph.edges(data=True):
        if 'name' in data:
            if data['name'] in street_names:
                data['maxspeed'] = max_speed
                #set automatically the new travelling time
                data['travelling_time'] = data['length']*3.6 / float(data['maxspeed'])

Function setting velocity limit in every street

In [None]:
def set_max_speed_all(graph,max_speed : int):
    '''max_speed : in Km/h'''
    for _,_,data in graph.edges(data=True):
        data['maxspeed'] = max_speed

        #set automatically the new travelling time
        data['travelling_time'] = data['length']*3.6 / float(data['maxspeed'])

Function removing streets

In [None]:
def remove_edges(graph,street_names):
    edges_to_be_removed = []
    for u,v,data in graph.edges(data=True):
        if 'name' in data and data['name'] in street_names:
            edges_to_be_removed.append([u,v])
    
    for u,v in edges_to_be_removed:
        if u in graph.nodes:
            graph.remove_node(u)
        if v in graph.nodes:
            graph.remove_node(v)

Function cleaning data of Osmnx edges data and setting travelling times (work fine for Bologna but it's not meant to be used on other cities)

<div style="color: red; font-weight:bold"> 
 Needs to be runned before others when initializing the graph
 
 It's setted on Bologna, it's not meant to be runned on other cities

</div> 

In [None]:
def clean_graph_data(graph):
    '''function cleaning data in graph.edges and setting travelling time

        needs to be runned as first function after the urban network is initialized

        it's setted on Bologna, it's not meant to be runned on other cities '''
    
    for _,_,data in graph.edges(data=True):
        data['passage'] = 0
        data['accident'] = 0
        if 'maxspeed' not in data:
            data['maxspeed'] = 30.0
        if type(data['maxspeed']) == list:
            data['maxspeed'] = data['maxspeed'][0]
        if data['maxspeed'] == 'signals' or data['maxspeed'] == 'IT:urban':
            data['maxspeed'] = 30.0
        if 'name' in data and type(data['name']) == list:
            data['name'] = data['name'][0]

        data['maxspeed'] = float(data['maxspeed'])
        data['travelling_time'] = data['length']*3.6 / float(data['maxspeed'])

    for _,data in graph.nodes(data=True):
        data['accident'] = 0

Function showing traffic flow and accidents

In [None]:
def show_traffic_and_accident(graph,street_passage,accident_positions):
    nodes,edges = ox.graph_to_gdfs(graph,nodes=True,edges=True)

    for edge in street_passage:
        edges.loc[(edge[0],edge[1]),'passage'][0] += edge[2]
        #edges.loc[(edge[0],edge[1]),'passage'][0] += 1

    for edge in accident_positions:
        edges.loc[edge,'accident'][0] += 1

    graph = ox.graph_from_gdfs(nodes,edges)

    # Get edge passages
    edge_values = np.array([data['passage'] for u, v, key, data in graph.edges(keys=True, data=True)])
    norm_edge_values = (edge_values - min(edge_values))/(max(edge_values - min(edge_values)))

    # Define a colormap
    cmap = plt.cm.RdBu 

    # Normalize edge lengths
    norm = plt.Normalize(min(norm_edge_values), max(norm_edge_values))

    # Map edge lengths to colors
    colors = [cmap(norm(value)) for value in norm_edge_values]

    #Plot the network with colored edges
    fig , ax = ox.plot_graph(graph, edge_color=colors, node_size=0, bgcolor='black', edge_linewidth=0.5,show=False)

    # Add colorbar
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    plt.colorbar(sm, ax=ax,label='Edge Passage')
    plt.title('Roads more populated')
    plt.show()

    # Get edge passages
    edge_values = [data['accident'] for u, v, key, data in graph.edges(keys=True, data=True)]

    # Define a colormap
    cmap = plt.cm.RdBu 

    # Normalize edge lengths
    norm = plt.Normalize(min(edge_values), max(edge_values))

    # Map edge lengths to colors
    colors = [cmap(norm(value)) for value in edge_values]

    #Plot the network with colored edges
    fig , ax = ox.plot_graph(graph, edge_color=colors, node_size=0, bgcolor='black', edge_linewidth=0.5,show=False)

    # Add colorbar
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    plt.colorbar(sm, ax=ax,label='Accidents')
    plt.title('Accident')
    plt.show()

Latitudes and longitudes of 'Zona 30' cities except Minneapolis (all the italians except Bologna are in discussion)

In [None]:
cities = {'Bologna' : (44.495555, 11.3428), 'Ferrara' : (44.835297, 11.619865), 'Modena' : (44.64582, 10.92572), 'Reggio Emilia' : (44.7, 10.633333), 
          'Firenze' : (43.771389, 11.254167), 'Torino' : (45.079167, 7.676111), 'Napoli' : (40.833333, 14.25), 'Milano' : (45.466944, 9.19), 
          'Roma' : (41.893056, 12.482778), 'Bergamo' : (45.695, 9.67), 'Verona' : (45.438611, 10.992778), 'Lecce' : (40.352011, 18.169139), 
          'Padova' : (45.406389, 11.877778), 'Olbia' : (40.916667, 9.5), 'Pesaro' : (43.91015, 12.9133), 'Lecco' : (45.85334, 9.39048),
          'Nantes' : (47.218056, -1.552778), 'Minneapolis' : (44.98, -93.268333), 'Bilbao' : (43.266667, -2.933334),'Zurich' : (47.374444, 8.541111), 
          'Valencia' : (39.483333, -0.366667), 'Barcelona' : (41.3825, 2.176944), 'Madrid' : (40.415524, -3.707488), 'Toronto' : (43.716589, -79.340686), 
          'Bruxelles' : (50.846667, 4.351667), 'Lyon' : (45.766944, 4.834167), 'Graz' : (47.070833, 15.438611), 'Edinburgh' : (55.953333, -3.189167)}

In [None]:
city = 'Bologna'
city_radius = 3900 #in meters

# Create urban network from latitude and longitude
G = ox.graph_from_point(cities[city], dist=city_radius, dist_type='bbox', network_type='drive_service', simplify = True)
G = ox.project_graph(G)
osmids = list(G.nodes)
G = nx.relabel.convert_node_labels_to_integers(G)

# give each node its original osmid as attribute since we relabeled them
osmid_values = {k: v for k, v in zip(G.nodes, osmids)}
nx.set_node_attributes(G, osmid_values, "osmid")


fig, ax = ox.plot.plot_graph(G, bgcolor='black',node_size=5, node_color='white', show=False)
plt.axis('on')
plt.title(city)
plt.show()

#inizializing some useful graph attributes
clean_graph_data(G)

ztl = ['Tangenziale di Bologna', 'Autostrada Bologna-Padova','Autostrada Adriatica','Via Francesco Rizzoli','Via Ugo Bassi',"Via dell'Indipendenza"]
tangenziale = ['Tangenziale di Bologna', 'Autostrada Bologna-Padova','Autostrada Adriatica']
remove_edges(G,tangenziale)
fig, ax = ox.plot.plot_graph(G, bgcolor='black',node_size=5, node_color='white', show=False)
plt.axis('on')
plt.title(city)
plt.show()

In [None]:
ztl = ['Tangenziale di Bologna', 'Autostrada Bologna-Padova','Autostrada Adriatica','Via Francesco Rizzoli','Via Ugo Bassi',"Via dell'Indipendenza"]
tangenziale = ['Tangenziale di Bologna', 'Autostrada Bologna-Padova','Autostrada Adriatica']
remove_edges(G,tangenziale)
fig, ax = ox.plot.plot_graph(G, bgcolor='black',node_size=5, node_color='white', show=False)
plt.axis('on')
plt.title(city)
plt.show()

Queste due sotto per decidere un nodo di arrivo 'interessante'

In [None]:
# Compute degree centrality
closeness_centrality = nx.closeness_centrality(G)


In [None]:
c_values = np.array([closeness_centrality[node] for node in G.nodes])
#c_values = np.array([1/(closeness_centrality[node]+1) for node in G.nodes])
norm_c_values = (c_values - min(c_values)) / (max(c_values) - min(c_values))

cmap = 'viridis'
values = norm_c_values
names = [city + ' degree centrality map',city + ' edge betweenness centrality map', city + ' betweenness centrality map',city + ' closeness centrality map']

fig,ax=ox.plot_graph(G,
                        node_color=plt.cm.viridis(values), 
                        node_size=3, 
                        edge_linewidth=0.5, 
                        bgcolor = 'white', 
                        show=False)
plt.show()

In [None]:


# Convert degree centrality dictionary to an array
nodes = list(closeness_centrality.keys())
degree_values = np.array(list(closeness_centrality.values()))

# Noc_values = np.array([closeness_centrality[node] for node in G.nodes])rmalize degree centrality values to create a probability distribution
probability_distribution = degree_values / np.sum(degree_values)

# Sample a node based on the probability distribution
selected_node = np.random.choice(nodes, p=probability_distribution)

selected_node

Queste due sotto serviranno dopo per la Città50

In [None]:
#DON'T RUN!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#DON'T 
#RUN
with open('strade30.txt','w') as file:
    for u,v,data in G.edges(data=True):
        if data['maxspeed'] < 50:
            if 'name' in data:
                file.write(f'{data['name']}\n{data['maxspeed']}\n')

In [None]:
with open('strade10.txt','w') as file:
    for u,v,data in G.edges(data=True):
        if data['maxspeed'] < 30:
            if 'name' in data:
                file.write(f'{data['name']}\n{data['maxspeed']}\n')

In [None]:
strade10 = []
limite10 = []
i=1
with open('strade10.txt','r') as file:
    for line in file:
    #lines = [line.strip() for line in file.readlines()]
        #values = line.split()
        if i%2 != 0:
            lines = line.strip()
            strade10.append(lines)
        else: 
            limite10.append(float(line))
        i+=1

In [None]:
strade = []
limite = []
i=1
with open('strade30.txt','r') as file:
    for line in file:
    #lines = [line.strip() for line in file.readlines()]
        #values = line.split()
        if i%2 != 0:
            lines = line.strip()
            strade.append(lines)
        else: 
            limite.append(float(line))
        i+=1

In [None]:
def città10(graph,streets,limits):
    for _,_,data in graph.edges(data=True):
        if 'name' in data:
            if data['name'] in streets:
                index = streets.index(data['name'])
                data['maxspeed'] = limits[index]

In [None]:
def città50(graph,streets,limits):
    set_max_speed_all(graph,50)
    for _,_,data in graph.edges(data=True):
        if 'name' in data:
            if data['name'] in streets:
                index = streets.index(data['name'])
                data['maxspeed'] = limits[index]

In [None]:
città50(G,strade,limite)
set_max_speed(G,['Via Francesco Zanardi','Via San Donato','Via Giacomo Matteotti','Via Castiglione','Via Leonetto Cipriani',
                 'Via Attilio Muggia','Via Iacopo Barozzi','Via Vallescura','Via Cino da Pistoia','Via Francesco Petrarca','Viale Antonio Aldini',
                 'Via Bambaglioli Graziolo','Viale Pietro Pietramellara','Viale Angelo Masini','Via Amedeo Parmeggiani'],50)

set_max_speed(G,['Via Gastone Rossi','Via Edmondo De Amicis','Via Paolo Fabbri','Via Oreste Regnoli','Via Massenzio Masia','Via Mario Musolesi','Via Gianni Palmieri','Via Giuseppe Bentivogli',
                 'Via Pietro Loreta','Via Athos Bellettini','Via Serena','Via Caduti della Via Fani','Via Ferruccio Garavaglia','Via Giuseppe Gioannetti','Via Virginia Marini',
                 'Via della Villa','Via Tommaso Salvini','Via Piana','Via Michelino','Via Oreste Trebbi','Via Corrado Masetti','Via Stefano Bottari','Via Giovanni Bertini',
                 'Via Enrico Ferri','Via Francesco Bartoli','Via Ferruccio Benini','Via Angelo Beolco','Via Enrico Capelli','Piazzetta Carlo Musi','Via Filippo e Angelo Cuccoli',
                 'Via Alfonso Torreggiani','Via Carlo Cignani','Via Ambrogio Magenta','Via Giuseppe Maria Mitelli','Via Pietro Faccini','Via del Mastelletta',
                 "Via de' Gandolfi",'Via Alfonso Lombardi','Via Bartolomeo Passarotti','Via Vittorio Alfieri','Via Giorgio Vasari','Via Bruno Arnaud','Via don Giovanni Fornasini',
                 'Via Alfredo Calzolari','Via Jacopo di Palo','Via Francesco Primaticcio', 'Via Delfino Insolera','Via John Cage','Piazza Lucio Dalla','Via Valerio Zurini',
                 'Via Cesare Masina','Via Giorgio Bassani','Via Carlo Togliani','Via Agostino Bignardi','Via della Selva Pescarola',"Via della Ca' Bianca",'Via Molino di Pescarola',
                 'Via Gino Rocchi','Via Cherubino Ghirardacci','Via Gaetano Monti','Via Gian Ludovico Bianconi','Via Giuseppe Albini','Via Giuseppe Ruggi','Via Umberto Monari',
                 'Via Aristide Busi','Via Carlo Francioni','Via Gaetano Giordani','Via Luigi Silvagni','Via di Villa Pardo','Via Antonio Francesco Ghiselli',
                 'Via dei Carrettieri','Via dal Lino','Via Brigate Partigiane','Via Volontari della Libertà','Via dello Sport','Via Estro Menabue','Via Porrettana',"Via Pietro de Coubertin",
                 'Via Eugenio Curiel','Via Irma Bandiera','Via Pietro Busacchi','Via Paolo Giovanni Martini','Via M. Bastia','Via Gino Onofri','Via Giovanni Cerbai',
                 'Via XXI Aprile 1945','Via Paride Pasquali','Via Francesco Orioli','Via Fernando De Rosa','Via Luigi Valeriani','Via Antonio Zoccoli',
                 'Via Antonio Zannoni','Via Rino Ruscello','Via Marino Dalmonte','Via Giovanni Battista Melloni','Via Edoardo Brizio',
                 'Via Carlo Zucchi','Via Domenico Bianchini','Via Rodolfo Audinot','Via Giuseppe Galletti','Via Giuseppe Pacchioni','Via Andrea Costa',
                 'Via Filippo Turati', 'Via Girolamo Giaccobbi','Via Francesco Roncati','Via Giovanni Spataro','Via Giambattista Martini','Via Stanislao Mattei','Via Pietro Busacchi',
                 'Piazza della Pace','Via del Partigiano','Via Duccio Galimberti','Via Rino Ruscello','Via Martino dal Monte','Via Girolamo Giacobbi','Via Luigi Breventani',
                 'Via Alessandro Guidotti','Via Carlo Rusconi','Via Luigi Tanari','Via dello Scalo','Via Innocenzo Malvasia','Via S.Pio V','Via della Ghisiliera','Via della Secchia',
                 'Via Floriano Ambrosini','Via Amedeo Parmeggiani','Via Montello','Via Gorizia','Via Vittorio Veneto','Via Bainsizza','Via Cimabue','Via Indro Montanelli',
                 "Via Ragazzi del '99",'Via Francesco Baracca','Via Oreste Vancini','Via Greta Garbo','Via Carlo Togliani','Via Domenico Svampa',
                 'Via Melozzo da Forlì','Via Giovanni Segantini','Via Berretta Rossa','Via Camonia','Via Amedeo Modigliani','Via Innocenzo da Imola',
                 'Via Pomponia','Via Valeria','Via Lemonia','Via Decumana','Via Tiziano Vecellio','Via della Ferriera','Via Giovanni Fattori','Via Telemaco Signorini',
                 'Via Armando Spadini','Via del Giglio','Via del Cardo','Via Giorgione','Via Gino Cervi','Via Demetrio Martinelli','Via Pinturicchio',
                 'Via del Giacinto','Via Elio Bernardi','Via di Saliceto','Via Piero Gobetti'],30)

#città10(G,strade10,limite10)

# Get edge maxspeed
edge_values = np.array([data['maxspeed'] for u, v, key, data in G.edges(keys=True, data=True)])
norm_edge_values = (edge_values - min(edge_values))/(max(edge_values - min(edge_values)))

# Define a colormap
cmap = plt.cm.RdBu 

# Normalize edge lengths
norm = plt.Normalize(min(edge_values), max(edge_values))

# Map edge lengths to colors
colors = [cmap(norm(value)) for value in edge_values]

#Plot the network with colored edges
fig , ax = ox.plot_graph(G, edge_color=colors, node_size=0, bgcolor='black', edge_linewidth=0.5,show=False)

# Add colorbar
sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])
plt.colorbar(sm, ax=ax,label='Edge Passage')
plt.title('Roads more populated')
plt.show()

In [None]:
#connectivity analysis
n_edges = G.number_of_edges()
n_nodes = G.number_of_nodes()

#meshedness coefficient
alpha = (n_edges - n_nodes + 1) / (2 * n_nodes - 5)

#connectivity
beta = n_edges/n_nodes

# gammaindex is a measure of the relation between the real number of edges and the number of all possible edges in a network
gamma = n_edges/(3*(n_nodes-2))

#characteristic path length
#l_geo = nx.average_shortest_path_length(G)

#print results
print(city + ' urban network connectivity analysis:\n')
print('# Edges: ', n_edges)
print('# Nodes: ', n_nodes)
print("meshedness coefficient:", alpha)
print("beta connectivity:", beta)
print("gamma index", gamma)
#print("characteristic path length:", l_geo)


In [None]:
degree_sequence = sorted((d for n, d in G.degree()), reverse=True)

fig = plt.figure("Degree of a random graph", figsize=(15, 15))

axgrid = fig.add_gridspec(5, 4)



ax2 = fig.add_subplot(axgrid[3:, 2:])
ax2.bar(*np.unique(degree_sequence, return_counts=True))
ax2.set_title(city + " urban network degree histogram")
ax2.set_xlabel("Degree")
ax2.set_ylabel("# of Nodes")

In [None]:
centrality_analysis(G,city)

In [None]:
#computing Global Efficiency
G1 = global_efficiency(G)

#computing Information Centrality
G.remove_edge(4,7) #remove some edges wiht for loops
G2 = global_efficiency(G) #ma perchè scriviamo in inglese?
print(G1,G2, G1-G2, (G1-G2)/G1) #scemo chi legge
#G.edges #scemo chi l'edges

# CLASSE CARS

In [None]:
class Cars():
    def __init__(self,n_cars):
        self.journey = np.zeros((n_cars,2))
        self.road = []
        self.positions = np.zeros((n_cars,2))
        self.velocities = np.zeros((n_cars,2))
        self.accel = np.zeros((n_cars,2))
        self.max_speed = np.zeros(n_cars)
        self.turn_range = np.zeros(n_cars)
        self.travelling_time = np.zeros((n_cars,2))
        self.position_along_path = np.zeros(n_cars,dtype=int)
        self.n_cars = n_cars
        self.speed_up = 1
        self.accidents = 0


    def get_positions(self):
        return self.positions
    
    def get_velocities(self):
        return self.velocities
    
    def get_accel(self):
        return self.accel
    
    def get_n_cars(self):
        return self.n_cars
    
    def number_of_accidents(self):
        return self.accidents
    
    def set_acceleration_zero(self):
        for i in range(self.n_cars):
            self.accel[i] = (0,0)
    
    def get_velocity_angle(self,car_i):
        vx = self.velocities[car_i][0]
        v = np.linalg.norm(self.velocities[car_i])
        arccos = np.arccos(vx/v)
        if arccos > 0:
            if self.velocities[car_i][1] > 0:
                angle = arccos*180/np.pi
            elif self.velocities[car_i][1] < 0:
                angle = -arccos*180/np.pi
            else:
                angle = 0
        
        if arccos < 0:
            if self.velocities[car_i][1] > 0:
                angle = arccos*180/np.pi
            elif self.velocities[car_i][1] < 0:
                angle = -arccos*180/np.pi
            else:
                angle = 180

        if arccos == 0:
            if self.velocities[car_i][1] > 0:
                angle = 90
            elif self.velocities[car_i][1] < 0:
                angle = 270

        return angle
             
    def get_angle(self,vector):
        x = vector[0]
        y = vector[1]
        v = np.linalg.norm(vector)
        arccos = np.arccos(x/v)
        if arccos > 0:
            if y > 0:
                angle = arccos*180/np.pi
            elif y < 0:
                angle = -arccos*180/np.pi
            else:
                angle = 0
        
        if arccos < 0:
            if y > 0:
                angle = arccos*180/np.pi
            elif y < 0:
                angle = -arccos*180/np.pi
            else:
                angle = 180

        if arccos == 0:
            if y > 0:
                angle = 90
            elif y < 0:
                angle = 270

        return angle

    def another_car_is_near(self,car_i):
        for n in range(self.n_cars):
            diff_position = self.positions[n] - self.positions[car_i]
            dist = np.linalg.norm(diff_position)
            v_i = np.linalg.norm(self.velocities[car_i])
            v_n = np.linalg.norm(self.velocities[n])
            
            if n!=car_i and dist <= 2*self.turn_range[car_i] and dist > 0:
                diff_angles = np.abs(self.get_velocity_angle(n) - self.get_velocity_angle(car_i))
                diff_velocities = v_i - v_n
                angle_positions = np.abs(self.get_angle(diff_position)-self.get_velocity_angle(car_i))

                if diff_angles <= 160 and diff_angles >= 20:
                    return True , dist
                
                if diff_angles <= 20 and diff_velocities > 0 and angle_positions <= 20:
                    return True , dist
                
                else:
                    return False , dist
            else: 
                return False , dist


    def accident_probability_distribution(self,v,size,radius):
        dv = v*3.6*2*radius/(size*self.speed_up)
        p = 0.01/(0.01+np.exp(-dv*2/100))

        return p/50
    

    def accident(self,car_i,size,radius):
        for n in range(self.n_cars):
            dist = np.linalg.norm(self.positions[car_i] - self.positions[n])
            v_i = np.linalg.norm(self.velocities[car_i])
            a_i = np.linalg.norm(self.accel[car_i])
            diff_velocities = np.linalg.norm(self.velocities[car_i] - self.velocities[n])

            if n!=car_i and dist <= 10*self.turn_range[car_i]: # and a_i >= 4*size*self.speed_up/(2*radius) A THRESHOLD CAMBIALO
                diff_angles = np.abs(self.get_velocity_angle(n) - self.get_velocity_angle(car_i))
                if diff_angles <= 180 and diff_angles >= 20:
                    r = np.random.uniform(0,1)
                    if r <= self.accident_probability_distribution(diff_velocities,size,radius):
                        return True
                    else:
                        return False
                else: 
                    return False
            else: 
                return False

    def start_another_journey(self,car_i,graph,size,radius,edge_passage,edge_counts,nodes,edges,dt,closeness_centrality):
        # Convert degree centrality dictionary to an array
        nodes_list = list(closeness_centrality.keys())
        degree_values = np.array(list(closeness_centrality.values()))
        inverse_degree_values = np.array([1/(closeness_centrality[node]+1) for node in nodes_list])

        # Normalize degree centrality values to create a probability distribution
        probability_distribution = degree_values / np.sum(degree_values)
        inverse_probability_distribution = inverse_degree_values/np.sum(inverse_degree_values)

        start = np.random.choice(nodes_list, p=inverse_probability_distribution)
            
        while start not in graph.nodes:
            start = np.random.choice(nodes_list, p=inverse_probability_distribution)

        # Convert degree centrality dictionary to an array
        nodes_list = list(closeness_centrality.keys())
        degree_values = np.array(list(closeness_centrality.values()))

        # Normalize degree centrality values to create a probability distribution
        probability_distribution = degree_values / np.sum(degree_values)

        target = np.random.choice(nodes_list, p=probability_distribution)

        while target not in graph.nodes or target == start:
            target = np.random.choice(nodes_list, p=probability_distribution)
    

        road = ox.shortest_path(graph,start,target,weight='travelling_time')
        road_length = self.road_length(road,edges)

        while road is None:
            start = np.random.choice(nodes_list, p=inverse_probability_distribution)
            while start not in graph.nodes:
                start = np.random.choice(nodes_list, p=inverse_probability_distribution)
            target = np.random.choice(nodes_list, p=probability_distribution)
            while target not in graph.nodes or target == start:
                target = np.random.choice(nodes_list, p=probability_distribution)
            road = ox.shortest_path(graph,start,target,weight='travelling_time')
            road_length = self.road_length(road,edges)
            
        for l in range(len(road)-1):
            if (road[l],road[l+1]) in edge_passage:
                index = edge_passage.index((road[l],road[l+1]))
                edge_counts[index] += 1
                continue
            edge_passage.append((road[l],road[l+1]))
            edge_counts.append(1)
        
        #graph = ox.graph_from_gdfs(nodes,edges) #CONTROLLA E CORREGGI QUESTA PARTE MOLTO IMPORTANTE
            
        self.road[car_i] = road
        self.journey[car_i] = (start , target)

        node_direction = np.array([nodes['x'][road[1]] - nodes['x'][road[0]] , nodes['y'][road[1]] - nodes['y'][road[0]]])
        node_direction /= np.linalg.norm(node_direction)

        self.positions[car_i] = (nodes['x'][start] , nodes['y'][start])

        self.max_speed[car_i] = float(edges['maxspeed'][road[0]][road[1]][0])*self.speed_up*size/(3.6*2*radius)  #speed limit in m/s
        v = random.uniform(self.max_speed[car_i]/4 , self.max_speed[car_i])
        self.velocities[car_i] = v*node_direction

        self.turn_range[car_i] = v*dt
        

    def road_length(self,road,edges):
        if road is None:
            return 0
        edge_series = edges['length']
        length = 0
        for i in range(len(road)-1):
            length += edge_series[road[i]][road[i+1]][0]
            return length
        
    def set_initial_condition(self,graph,size,radius,speed_up,dt,edge_passage,edge_counts,nodes,edges,closeness_centrality):
        #self.max_speed = max_speed
        self.speed_up = speed_up

        for i in tqdm(range(self.n_cars)):
            # Convert degree centrality dictionary to an array
            nodes_list = list(closeness_centrality.keys())
            degree_values = np.array(list(closeness_centrality.values()))
            inverse_degree_values = np.array([1/(closeness_centrality[node]+1) for node in nodes_list])

            # Normalize degree centrality values to create a probability distribution
            probability_distribution = degree_values / np.sum(degree_values)
            inverse_probability_distribution = inverse_degree_values/np.sum(inverse_degree_values)

            start = np.random.choice(nodes_list, p=inverse_probability_distribution)
            
            while start not in graph.nodes:
                start = np.random.choice(nodes_list, p=inverse_probability_distribution)


            target = np.random.choice(nodes_list, p=probability_distribution)

            while target not in graph.nodes or target == start:
                target = np.random.choice(nodes_list, p=probability_distribution)
        

            road = ox.shortest_path(graph,start,target,weight='travelling_time')
            road_length = self.road_length(road,edges)

            while road is None:
                start = np.random.choice(nodes_list, p=inverse_probability_distribution)
                while start not in graph.nodes:
                    start = np.random.choice(nodes_list, p=inverse_probability_distribution)
                target = np.random.choice(nodes_list, p=probability_distribution)
                while target not in graph.nodes or target == start:
                    target = np.random.choice(nodes_list, p=probability_distribution)
                road = ox.shortest_path(graph,start,target,weight='travelling_time')
                road_length = self.road_length(road,edges)
            
            for l in range(len(road)-1):
                if (road[l],road[l+1]) in edge_passage:
                    index = edge_passage.index((road[l],road[l+1]))
                    edge_counts[index] += 1
                    continue
                edge_passage.append((road[l],road[l+1]))
                edge_counts.append(1)
                
            #graph = ox.graph_from_gdfs(nodes,edges) #CONTROLLA E CORREGGI QUESTA PARTE MOLTO IMPORTANTE
                
            self.road.append(road)
            self.journey[i] = (start , target)

            node_direction = np.array([nodes['x'][road[1]] - nodes['x'][road[0]] , nodes['y'][road[1]] - nodes['y'][road[0]]])
            node_direction /= np.linalg.norm(node_direction)

            self.positions[i] = (nodes['x'][start] , nodes['y'][start])

            self.max_speed[i] = float(edges['maxspeed'][road[0]][road[1]][0])*speed_up*size/(3.6*2*radius)  #speed limit in m/s
            v = random.uniform(self.max_speed[i]/4 , self.max_speed[i])
            self.velocities[i] = v*node_direction

            self.turn_range[i] = v*dt



    def update(self,graph,size,radius,speed_up,dt,n_frame,nodes,edges,edge_passage,edge_counts,accident_position,travelling_times,closeness_centrality):

        #nodes, _ = ox.graph_to_gdfs(graph, nodes=True, edges=True)
        #nodes['x'] = ((nodes['x'] - min(nodes['x']))/(max(nodes['x'])-min(nodes['x'])))*size
        #nodes['y'] = ((nodes['y']-min(nodes['y']))/(max(nodes['y']) - min(nodes['y'])))*size

        for i in range(self.n_cars):
            road = self.road[i]
            v_abs = np.linalg.norm(self.velocities[i])
            L = len(road)
        
            #start another journey when target reached
            if np.linalg.norm(self.positions[i]- np.array([nodes['x'][road[L-1]] , nodes['y'][road[L-1]]])) <= self.turn_range[i]:
                self.start_another_journey(i,graph,size,radius,edge_passage,edge_counts,nodes,edges,dt,closeness_centrality)
                self.position_along_path[i] = 0
                self.travelling_time[i][0] = (n_frame * dt * speed_up) - self.travelling_time[i][1]
                self.travelling_time[i][1] = n_frame * dt * speed_up
                travelling_times.append(self.travelling_time[i][0])
                #self.velocities[i] = (0.0001,0.0001)
                #self.positions[i] = (-1,-1)
                continue
            

            #ACCELERAZIONI PER I NODI, GUARDA BENE E CAMBIA COSE 
            if np.linalg.norm(self.positions[i]-np.array([(nodes['x'][road[self.position_along_path[i]+1]]) , nodes['y'][road[self.position_along_path[i]+1]]])) <= 2*self.turn_range[i]:
                #self.accel[i] = (0,0)
                self.accel[i] = -self.velocities[i]/7
                #self.velocities[i] += self.accel[i]*dt
            elif np.linalg.norm(self.positions[i]-np.array([(nodes['x'][road[self.position_along_path[i]+1]]) , nodes['y'][road[self.position_along_path[i]+1]]])) >= 2*self.turn_range[i]:
                self.accel[i] = self.velocities[i]/5
                #self.velocities[i] += self.accel[i]*dt
                
            #another_car , dist = self.another_car_is_near(i)
            #DECELERAZIONI SE VEDE MACCHINE, ATTENZIONE A QUEL +=
            if self.another_car_is_near(i)[0]:
                self.accel[i] -= self.velocities[i]/5
                #self.accel[i] -= self.velocities[i]/self.another_car_is_near(i)[1]
                #self.velocities[i] += self.accel[i]*dt
            else:
                if v_abs <= 0.1*self.max_speed[i]:
                    #self.accel[i] = self.max_speed[i]*self.velocities[i]/(4*v_abs)
                    #self.velocities[i] += self.accel[i]*dt
                    self.accel[i] += (self.max_speed[i]-v_abs)*self.velocities[i]/v_abs
                    #self.accel[i] += self.max_speed[i]*self.velocities[i]/5

                    
            v = np.linalg.norm(self.velocities[i])
            self.turn_range[i] = v*dt

            #ACCIDENTS
            if self.accident(i,size,radius):
                self.accidents += 1   
                self.velocities[i] = (0.0,0.0)
                accident_position.append((road[self.position_along_path[i]],road[self.position_along_path[i]+1]))

            #change direction if an intermediate node is near
            if np.linalg.norm(self.positions[i]-np.array([(nodes['x'][road[self.position_along_path[i]+1]]) , nodes['y'][road[self.position_along_path[i]+1]]])) <= self.turn_range[i]:
                self.position_along_path[i] += 1
        
                v_abs = np.linalg.norm(self.velocities[i])
                self.max_speed[i] = float(edges['maxspeed'][road[self.position_along_path[i]]][road[self.position_along_path[i]+1]][0])*self.speed_up*size/(3.6*2*radius)

                node_direction = np.array([nodes['x'][road[self.position_along_path[i]+1]]- self.positions[i][0] , nodes['y'][road[self.position_along_path[i]+1]] - self.positions[i][1]])
                node_direction /= np.linalg.norm(node_direction)

                self.velocities[i] = v_abs*node_direction
                self.turn_range[i] = v_abs*dt
            
            #if exceed limit, put velocity back to speed limit
            v_abs = np.linalg.norm(self.velocities[i])
            if v_abs >= self.max_speed[i]:
                self.accel[i] -= (v_abs - self.max_speed[i])*self.velocities[i]/v_abs
                #self.velocities[i] += self.accel[i]*dt
                v = np.linalg.norm(self.velocities[i])
                self.turn_range[i] = v*dt

            '''if v_abs <= 0.05*self.max_speed[i]:
                #print(f'D: {self.accel[i]},{self.velocities[i]}')
                node_direction = np.array([nodes['x'][road[self.position_along_path[i]+1]]- self.positions[i][0] , nodes['y'][road[self.position_along_path[i]+1]] - self.positions[i][1]])
                node_direction /= np.linalg.norm(node_direction)
                self.accel[i] = self.max_speed[i]*node_direction'''

            #update positions
            self.velocities[i] += self.accel[i]*dt
            v = np.linalg.norm(self.velocities[i])
            self.turn_range[i] = v*dt
            self.positions[i] += self.velocities[i]*dt + self.accel[i]*dt*dt/2
            

# Funzione che usa la classe CARS

In [None]:
def traffic_simulation(graph, n_cars, radius, speed_up, closeness_centrality, size = 100, scale = 0.3, dt = 0.05, n_frame = 300, animate = True):
    
    ''' speed_up = speed_up factor making simulation faster than reality
    
        size : refers to maximum value of animation image boundaries. Needs to be converted in meters using Bologna diameter

        scale : refers to arrows dimensions within the animation
       
        turn range : range with which the car sees a new node and turn its velocity. Take it around  max_speed*dt
       '''
    cars = Cars(n_cars)
    edges_passage = []
    edge_counts = []
    accident_positions = []
    travelling_times = []

    nodes, edges = ox.graph_to_gdfs(graph, nodes=True, edges=True)
    nodes['x'] = ((nodes['x'] - min(nodes['x']))/(max(nodes['x'])-min(nodes['x'])))*size
    nodes['y'] = ((nodes['y']-min(nodes['y']))/(max(nodes['y']) - min(nodes['y'])))*size
    
    cars.set_initial_condition(graph,size,radius,speed_up,dt,edges_passage,edge_counts,nodes,edges,closeness_centrality)
    print('\nInitialization done\n')

    n_cars = cars.get_n_cars()
    c_positions = np.zeros((n_frame, n_cars, 2))
    c_velocities = np.zeros((n_frame, n_cars, 2))
    c_accel = np.zeros((n_frame, n_cars, 2))

    #for loop computing time steps for animation
    for n in tqdm(range(n_frame)):
        c_positions[n] = cars.get_positions()
        c_velocities[n] = cars.get_velocities()
        c_accel[n] = cars.get_accel()
        
        cars.set_acceleration_zero()

        cars.update(graph,size,radius,speed_up,dt,n,nodes,edges,edges_passage,edge_counts,accident_positions,travelling_times,closeness_centrality)
        

    if animate:
        fig, ax = plt.subplots(figsize=(7,7))

        for N in range(len(graph.nodes)):
            circle = plt.Circle((nodes['x'][N] , nodes['y'][N]) , size/900, color = 'grey')
            ax.add_patch(circle)
        
        
        velocities_magnitudes = np.linalg.norm(c_velocities[0], axis=1)
        velocities_normalized = c_velocities[0] / np.vstack([velocities_magnitudes, velocities_magnitudes]).T 
        #velocities_normalized = c_velocities[0] * 0.5
        scat = ax.quiver(c_positions[0][:,0], 
                        c_positions[0][:,1],
                        velocities_normalized[:,0],
                        velocities_normalized[:,1], color = 'red')
        
        
        ax.set_xlim(0,size)
        ax.set_ylim(0,size)

        #update function useful for animation
        def update(frame):
            scat.set_offsets(c_positions[frame])

            velocities_magnitudes = np.linalg.norm(c_velocities[frame], axis=1)
            velocities_normalized = c_velocities[frame]/ np.vstack([velocities_magnitudes, velocities_magnitudes]).T 
            #velocities_normalized *= scale
            scat.set_UVC(velocities_normalized[:,0]*scale, 
                        velocities_normalized[:,1]*scale)

            return scat,

        ani = FuncAnimation(fig, update, frames=n_frame, blit=True)
        print("Simulation finished. Video processing ...\n")
        display(HTML(ani.to_jshtml()))
    
    cars_velocities = np.zeros((np.shape(c_velocities)[0],np.shape(c_velocities)[1]))
    max_velocity_per_frame = np.zeros(n_frame)
    mean_velocity_per_frame = np.zeros(n_frame)


    for i in range(np.shape(c_velocities)[0]):
        for n in range(np.shape(c_velocities)[1]):
            cars_velocities[i][n] = np.linalg.norm(c_velocities[i][n])*3.6*2*radius/(size*speed_up)
        mean_velocity_per_frame[i] = np.mean(cars_velocities[i])
        max_velocity_per_frame[i] = max(cars_velocities[i])

    mean_speed = np.mean(mean_velocity_per_frame)

    cars_accelerations = np.zeros((np.shape(c_accel)[0],np.shape(c_accel)[1]))
    max_accel_per_frame = np.zeros(n_frame)
    min_accel_per_frame = np.zeros(n_frame)
    mean_accel_per_frame = np.zeros(n_frame)

    for i in range(np.shape(c_accel)[0]):
        for n in range(np.shape(c_accel)[1]):
            cars_accelerations[i][n] = np.linalg.norm(c_accel[i][n])*3.6*2*radius/(size*speed_up)
        mean_accel_per_frame[i] = np.mean(cars_accelerations[i])
        max_accel_per_frame[i] = max(cars_accelerations[i])
        min_accel_per_frame[i] = min(cars_accelerations[i])

    mean_accel = np.mean(mean_accel_per_frame)
    
    mean_travelling_time = sum(travelling_times)/len(travelling_times)

    print(f"\nVelocity respect to real life is {speed_up} times faster \n")
    print(f"Number of accidents: {cars.number_of_accidents()}\n")
    print(f'Mean speed: {mean_speed:.2f} Km/h\n')
    print(f'Mean acceleration: {mean_accel:.2f}Km/h/s\n')
    print(f'Min accel: {min(min_accel_per_frame)}, Max accel: {max(max_accel_per_frame)}\n')
    print(f'Mean travelling time: {int(mean_travelling_time/60)}m {round((mean_travelling_time/60 - int(mean_travelling_time/60))*60,2)}s\n')
    #nodes, edges = ox.graph_to_gdfs(traffic_graph, nodes=True, edges=True)
    street_passage = [(x[0],x[1],y) for x,y in zip(edges_passage,edge_counts)]

    return cars_velocities , cars_accelerations, max_accel_per_frame, max_velocity_per_frame, mean_speed, mean_accel, travelling_times, mean_travelling_time, street_passage,accident_positions

In [None]:
closeness_centrality = nx.closeness_centrality(G)

In [None]:
n_frame, dt, speed_up, n_cars= 100, 0.1, 100, 1000

cars_velocities , cars_accelerations, max_accel_per_frame, max_velocity_per_frame, mean_speed, mean_accel, travelling_times, mean_travelling_time, street_passage, accident_positions = traffic_simulation(G,
                n_cars = n_cars,
                radius = city_radius,
                speed_up = speed_up,
                closeness_centrality=closeness_centrality,
                size = 2*city_radius,
                scale = 0.3,
                dt = dt,
                n_frame = n_frame, 
                animate=False)

if max(max_velocity_per_frame) < 100:
    already_passed = []
    edge_counts = []
    with open('edges_passage.txt','r') as file:
        for line in file:
            values = line.split()
            # Convert the values to integers (assuming they're integers)
            edge1 = int(values[0])
            edge2 = int(values[1])
            count = int(values[2])
            already_passed.append((edge1,edge2))
            edge_counts.append(count)


    for edge in street_passage:
        if (edge[0],edge[1]) in already_passed:
            index = already_passed.index((edge[0],edge[1]))
            edge_counts[index] += 1
            continue
        already_passed.append((edge[0],edge[1]))
        edge_counts.append(edge[2])
            
    with open('edges_passage.txt','w') as file:
        pass

    with open('edges_passage.txt','w') as file:
        for edge in already_passed:
            index = already_passed.index(edge)
            file.write(f"{edge[0]} {edge[1]} {edge_counts[index]}\n")

    # travelling times and mean speed
    with open('travelling_times.txt','a') as file:
        for time in travelling_times:
            file.write(f"{time}\n")

    with open('mean_speed.txt','a') as file:
        file.write(f"{mean_speed}\n")
    
    with open('mean_accel.txt','a') as file:
    file.write(f"{mean_accel}\n")

    #accident positions
    if accident_positions:
        with open('accident_position.txt','a') as file:
            for accident in accident_positions:
                file.write(f"{accident[0]} {accident[1]}\n")
    print('Alright my man...')

In [None]:
max(max_velocity_per_frame)

In [None]:
#DON'T RUN!!!!!!!!!
#DON'T
#RUN!!!!!!!!!!1
with open('edges_passage.txt','w') as file:
    pass

with open('mean_speed.txt','w') as file:
    pass

with open('mean_accel.txt','w') as file:
    pass

with open('travelling_times.txt','w') as file:
    pass

with open('accident_position.txt','w') as file:
    pass

## Writing on a txt file the edge passage

In [None]:
already_passed = []
edge_counts = []
with open('edges_passage.txt','r') as file:
    for line in file:
        values = line.split()
        # Convert the values to integers (assuming they're integers)
        edge1 = int(values[0])
        edge2 = int(values[1])
        count = int(values[2])
        already_passed.append((edge1,edge2))
        edge_counts.append(count)


for edge in street_passage:
    if (edge[0],edge[1]) in already_passed:
        index = already_passed.index((edge[0],edge[1]))
        edge_counts[index] += 1
        continue
    already_passed.append((edge[0],edge[1]))
    edge_counts.append(edge[2])
        
with open('edges_passage.txt','w') as file:
    pass

with open('edges_passage.txt','w') as file:
    for edge in already_passed:
        index = already_passed.index(edge)
        file.write(f"{edge[0]} {edge[1]} {edge_counts[index]}\n")

# travelling times and mean speed
with open('travelling_times.txt','a') as file:
    for time in travelling_times:
        file.write(f"{time}\n")

with open('mean_speed.txt','a') as file:
    file.write(f"{mean_speed}\n")

with open('mean_accel.txt','a') as file:
    file.write(f"{mean_accel}\n")

#accident positions
if accident_positions:
    with open('accident_position.txt','a') as file:
        for accident in accident_positions:
            file.write(f"{accident[0]} {accident[1]}\n")

In [None]:
len(already_passed)

## Reading from a txt file

In [None]:
streets = []
with open('edges_passage.txt','r') as file:
    for line in file:
        values = line.split()
        # Convert the values to integers (assuming they're integers)
        edge1 = int(values[0])
        edge2 = int(values[1])
        count = int(values[2])
        streets.append((edge1,edge2,count))


Travelling_times = []
with open('travelling_times.txt','r') as file:
    for line in file:
        values = line.split()
        # Convert the values to integers (assuming they're integers)
        time = float(values[0])
        Travelling_times.append(time)

Travelling_times = np.array(Travelling_times)
# Create a figure and axis for plotting
fig, ax = plt.subplots()

# Define the initial histogram
n_bins = int(max(Travelling_times/60))
hist, _ = np.histogram(Travelling_times/60, bins=n_bins)
bars = ax.bar(range(n_bins), hist)

ax.set_title('Travelling times')
ax.set_xlabel('minutes')
ax.set_ylabel('Frequency')

mean_travelling_time = sum(Travelling_times)/len(Travelling_times)
mode = st.mode(Travelling_times)

plt.text(0.95, 0.95, f"Mean: {int(mean_travelling_time/60)}m {(mean_travelling_time/60 - int(mean_travelling_time/60))*60:.2f}s\n Mode: {int(mode/60)}m {(mode/60 - int(mode/60))*60:.2f}s\n", 
         horizontalalignment='right', verticalalignment='top', transform=plt.gca().transAxes)
#print(f'Mode: {int(mode/60)}m {(mode/60 - int(mode/60))*60:.2f}s\n')
#print(f'Mean travelling time: {int(mean_travelling_time/60)}m {round((mean_travelling_time/60 - int(mean_travelling_time/60))*60,2)}s\n')

plt.show()

In [None]:
show_traffic_and_accident(G,street_passage,accident_positions)

In [None]:
max(max_velocity_per_frame)

### Animazione velocità 

In [None]:
# Create a figure and axis for plotting
fig, ax = plt.subplots()

# Define the initial histogram
n_bins = int(max(max_velocity_per_frame))
#n_bins = int(max(cars_velocities[0]))
hist, _ = np.histogram(cars_velocities[0], bins=n_bins)
bars = ax.bar(range(n_bins), hist)

# Update function for each frame
def update(frame):
    ax.cla()  # Clear previous plot
    #n_bins = int(max(cars_velocities[frame]))
    hist, _ = np.histogram(cars_velocities[frame], bins=n_bins)
    ax.bar(range(n_bins), hist)
    ax.set_title('Speed')
    ax.set_xlabel('Km/h')
    ax.set_ylabel('Frequency')

    mean = np.mean(cars_velocities[frame])
    mode = st.mode(cars_velocities[frame])
    plt.text(0.95, 0.95, f"#Cars: {n_cars}\n Mean: {mean:.2f}Km/h\n Mode: {mode:.2f}Km/h\n Time: {int(speed_up*frame*dt/60)}m {(speed_up*frame*dt/60 - int(speed_up*frame*dt/60))*60:.1f}s\n", 
         horizontalalignment='right', verticalalignment='top', transform=plt.gca().transAxes)
# Create the animation
ani = FuncAnimation(fig, update, frames=np.shape(cars_velocities)[0])
display(HTML(ani.to_jshtml()))

plt.show()

Acceleration animation (?)

In [None]:
# Create a figure and axis for plotting
fig, ax = plt.subplots()

# Define the initial histogram
n_bins = int(max(max_accel_per_frame))
#n_bins = int(max(cars_velocities[0]))
hist, _ = np.histogram(cars_accelerations[0], bins=n_bins)
bars = ax.bar(range(n_bins), hist)

# Update function for each frame
def update(frame):
    ax.cla()  # Clear previous plot
    #n_bins = int(max(cars_velocities[frame]))
    hist, _ = np.histogram(cars_accelerations[frame], bins=n_bins)
    ax.bar(range(n_bins), hist)
    ax.set_title('Accelerations')
    ax.set_xlabel('$\\frac{Km}{h \cdot s}$')
    ax.set_ylabel('Frequency')

    mean = np.mean(cars_accelerations[frame])
    mode = st.mode(cars_accelerations[frame])
    plt.text(0.95, 0.95, f"#Cars: {n_cars}\n Mean: {mean:.2f}Km/h\n Mode: {mode:.2f}Km/h\n Time: {int(speed_up*frame*dt/60)}m {(speed_up*frame*dt/60 - int(speed_up*frame*dt/60))*60:.1f}s\n", 
         horizontalalignment='right', verticalalignment='top', transform=plt.gca().transAxes)
# Create the animation
ani = FuncAnimation(fig, update, frames=np.shape(cars_velocities)[0])
display(HTML(ani.to_jshtml()))

plt.show()

# DATI TRAFFICO REALI

In [None]:
traffico = pd.read_excel('rilevazione-flusso-veicoli-tramite-spire-anno-2024.xlsx')

In [None]:
traffico['total'] = np.zeros(len(traffico['x']))

In [None]:
traffico

In [None]:
for dati in traffico:
    if dati == '05:00-06:00':
        print(dati)
a = 11
b = 13
a_s = str(11)
b_s = str(13)
d = b-a
for _ in range(d):
    print(a_s+':00-'+str(int(a_s) + 1)+':00')
    a += 1
    a_s = str(a)


#traffico[a+':00-'+str(int(a) + 1)+':00']
#traffico['11:00-12:00']


# CAMBIA IL NUMERO DI 'PASSAGE'

In [None]:
def show_traffic_data(graph,traffic_data,days,hours = [0,24]): 
    traffic_edges = []
    passage_values = []
    
    #data = traffic_data['latitudine'],traffic_data['longitudine'],traffic_data['giorno settimana']

    # Filter traffic data based on 'giorno settimana'
    filtered_data = traffic_data[traffic_data['giorno settimana'].isin(days)]
    min_hour = hours[0]
    max_hour = hours[1]
    diff_hours = max_hour - min_hour
    min_hour_str = str(min_hour)
    max_hour_str = str(max_hour)
    d = max_hour - min_hour
    for _ in range(d):
        filtered_data['total'] += filtered_data[min_hour_str+':00-'+str(int(min_hour_str) + 1)+':00']
        min_hour += 1
        min_hour_str = str(min_hour)

    # Use `nearest_edges` function on filtered data
    #traffic_edges = filtered_data.apply(lambda row: ox.nearest_edges(graph, row['latitudine'], row['longitudine'])[:2], axis=1)

    '''with tqdm(total=len(filtered_data)) as pbar:
        # Use apply function to apply nearest_edges function on each row
        traffic_edges = filtered_data.apply(lambda row: ox.nearest_edges(graph, row['latitudine'], row['longitudine'])[:2], axis=1)
        pbar.update(len(filtered_data))'''
    # Calculate the total number of rows in the DataFrame
    #total_rows = len(filtered_data)

    traffic_edges = ox.nearest_edges(graph,filtered_data['x'],filtered_data['y'])

    for a in filtered_data['total']:
        passage_values.append(a)

    
    #with tqdm(total=total_rows) as pbar:
        #for lat, long in zip(filtered_data['latitudine'], filtered_data['longitudine']):
            #traffic_edges.append(ox.nearest_edges(graph,lat,long)[:2])

            #pbar.update(1)

    #for lat,long in tqdm(zip(filtered_data['latitudine'],filtered_data['longitudine'])):
        #if filtered_data['giorno settimana'][i] in days:
        #traffic_edges.append(ox.nearest_edges(graph,data[0][i],data[1][i])[:2])
        #traffic_edges.append(ox.nearest_edges(graph,lat,long)[:2])

    nodes,edges = ox.graph_to_gdfs(graph,nodes=True,edges=True)

    for i in range(len(traffic_edges)):
        edges.loc[(traffic_edges[i][0],traffic_edges[i][1]),'passage'][0] = passage_values[i]
    

    min_passage = sum(passage_values)/len(passage_values)
    
    for u,v,data in graph.edges(data=True):
        if data['passage'] == 0:
            edges.loc[(u,v),'passage'][0] += 0
            
    #for edge in traffic_edges:
        #edges.loc[(edge[0],edge[1]),'passage'][0] += edges[2]

    graph = ox.graph_from_gdfs(nodes,edges)

    #Get edge passages
    edge_values = np.array([data['passage'] for u, v, data in graph.edges(data=True)])
    norm_edge_values = (edge_values - min(edge_values))/(max(edge_values - min(edge_values)))

    # Define a colormap
    cmap = plt.cm.RdBu 

    # Normalize edge lengths
    norm = plt.Normalize(min(edge_values), max(edge_values))

    # Map edge lengths to colors
    colors = [cmap(norm(value)) for value in edge_values]

    #Plot the network with colored edges
    fig , ax = ox.plot_graph(graph, edge_color=plt.cm.RdBu(norm_edge_values), node_size=0, bgcolor='black', edge_linewidth=0.5,show=False)

    # Add colorbar
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    plt.colorbar(sm, ax=ax,label='Edge Passage')
    plt.title('Traffic data')
    plt.show()

    return passage_values
    

In [None]:
days = ['Mercoledì']
passage_values = show_traffic_data(G,traffico,days,[11,13])

In [None]:
'''for _,_,data in G.edges(data=True):
        if float(data['maxspeed']) > 40:
            data['maxspeed'] = 0.001'''
# Get edge passages
edge_values = np.array([data['maxspeed'] for u, v, key, data in G.edges(keys=True, data=True)])
norm_edge_values = (edge_values - min(edge_values))/(max(edge_values - min(edge_values)))

# Define a colormap
cmap = plt.cm.RdBu 

# Normalize edge lengths
norm = plt.Normalize(min(edge_values), max(edge_values))

# Map edge lengths to colors
colors = [cmap(norm(value)) for value in edge_values]

#Plot the network with colored edges
fig , ax = ox.plot_graph(G, edge_color=colors, node_size=0, bgcolor='black', edge_linewidth=0.5,show=False)

# Add colorbar
sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])
plt.colorbar(sm, ax=ax,label='Edge Passage')
plt.title('Roads more populated')
plt.show()

# Incidenti

In [None]:
incidenti = pd.read_excel('Incidenti.xlsx')

In [None]:
incidenti

In [None]:
print(incidenti['DATA'][0].year,type(incidenti['DATA'][0].year))
print(type(incidenti['GIORNO'][0]))
print(incidenti['ORA'][0],type(incidenti['ORA'][0]))
print(type(incidenti['X'][1].astype(float)))
len(incidenti['X'])
a = [2010,2011]
incidenti['DATA'][0].year in a

In [None]:
def show_accident_data(graph,accident_data,years,hours=[0,24]):
    nodes,edges = ox.graph_to_gdfs(graph,nodes=True,edges=True)
    
    graph = ox.graph_from_gdfs(nodes,edges)
    
    edge_accident = []
    hour = accident_data['ORA'].astype(float)

    x = accident_data['X'].astype(float)
    y = accident_data['Y'].astype(float)


    
    for i in range(len(accident_data)):
        if accident_data['DATA'][i].year in years: #and hour[i] > hours[0] and hour[i] < hours[1]:
            edge_accident.append(ox.nearest_edges(graph,x[i],y[i]))

    for edge in edge_accident:
        edges.loc[(edge[0],edge[1]),'accident'][0] += 1
    

    graph = ox.graph_from_gdfs(nodes,edges)

    #Get edge passages
    edge_values = np.array([data['accident'] for u, v, data in graph.edges(data=True)])
    norm_edge_values = (edge_values - min(edge_values))/(max(edge_values)-min(edge_values))
    # Define a colormap
    cmap = plt.cm.RdBu 

    # Normalize edge lengths
    norm = plt.Normalize(min(edge_values), max(edge_values))

    # Map edge lengths to colors
    colors = [cmap(norm(value)) for value in edge_values]

    #Plot the network with colored edges
    fig , ax = ox.plot_graph(graph, edge_color=plt.cm.RdBu(norm_edge_values), node_size=0., bgcolor='black', edge_linewidth=0.5,show=False)

    # Add colorbar
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    plt.colorbar(sm, ax=ax,label='Edge Passage')
    plt.title('Traffic data')
    plt.show()


    


In [None]:
show_accident_data(G,incidenti,[2018,2019,2020])

In [None]:
x=incidenti['X']
y=incidenti['Y']

plt.scatter(x,y,s=0.5)
plt.show()

In [None]:
# Must be an overpass instance which supports attic
ox.settings.overpass_endpoint = "https://overpass-api.de/api"

def query_year(coordinate, year):
    date = f'{year}-01-01T00:00:00Z'
    # Request attic data
    ox.settings.overpass_settings = '[out:json][timeout:{timeout}][date:"' + date + '"]{maxsize}'
    graph = ox.graph_from_point(cities['Bologna'], dist=4000, dist_type='bbox', network_type='drive_service', simplify = True)
    # Restore old settings
    ox.settings.overpass_settings = '[out:json][timeout:{timeout}]{maxsize}'
    fig, ax = ox.plot.plot_graph(G, bgcolor='black',node_size=5, node_color='white', show=False)
    plt.axis('on')
    plt.title('Bologna')
    plt.show()
    return graph

G = query_year((44.495555, 11.3428),2024)
clean_graph_data(G)
# Get edge passages
edge_values = np.array([data['maxspeed'] for u, v, key, data in G.edges(keys=True, data=True)])
norm_edge_values = (edge_values - min(edge_values))/(max(edge_values - min(edge_values)))

# Define a colormap
cmap = plt.cm.RdBu 

# Normalize edge lengths
norm = plt.Normalize(min(edge_values), max(edge_values))

# Map edge lengths to colors
colors = [cmap(norm(value)) for value in edge_values]

#Plot the network with colored edges
fig , ax = ox.plot_graph(G, edge_color=colors, node_size=0, bgcolor='black', edge_linewidth=0.5,show=False)

# Add colorbar
sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])
plt.colorbar(sm, ax=ax,label='Edge Passage')
plt.title('Roads more populated')
plt.show()