In [3]:
import traci
import networkx as nx
import numpy as np
from typing import Dict, List, Optional, Tuple
import sumolib
from pathlib import Path

class EmergencyRouter:
    def __init__(self):
        """Initialize the emergency router with a directed graph"""
        self.network = nx.DiGraph()
        # Load the network file
        self.net = sumolib.net.readNet('additional/osm.net.xml')
        # Network will be built when first needed
        self.nodes = [node.getID() for node in self.net.getNodes()]
    
    def _build_network(self):
        """Build the network graph from SUMO network"""
        # Clear existing network
        self.network.clear()
        # ===============================================
        # Add nodes (junctions)
        for junction in self.net.getNodes():
            self.network.add_node(junction.getID())
        # ===============================================

        # Add edges (roads)
        for edge in self.net.getEdges():
            from_node = edge.getFromNode().getID()
            to_node = edge.getToNode().getID()
            length = edge.getLength()
            self.network.add_edge(from_node, to_node, weight=length, edge_id=edge.getID())

        print ("Loaded graph with ", len(self.network.nodes()), " nodes and ", len(self.network.edges()), " edges")

        # ===============================================

    def optimize_route(self, emergency_vehicle: Dict) -> Optional[List[str]]:
        """
        Optimize route for an emergency vehicle based on current traffic conditions
        
        Args:
            emergency_vehicle: Dictionary containing vehicle information
            
        Returns:
            List of edge IDs representing the optimized route, or None if no route found
        """
        # Build network if not already built
        if len(self.network.edges) == 0:
            self._build_network()
        # Get current position and destination
        current_edge = traci.vehicle.getRoadID(emergency_vehicle['id'])
        if current_edge.startswith(":"):
            print ("==== [Debug] ==== : Vehicle is in junction")
            parts = current_edge.replace(":", "").split("_")[:-1]
            current_node = "_".join(parts)
            return None
        elif current_edge in self.nodes:
            print ("==== [Debug] ==== : Vehicle is in junction")
            current_node = current_edge
            return None
        else:   
            current_node = self._get_current_node(current_edge,type='from')

        print ("==== [Debug] ==== : current_edge: ",current_edge)

        print ("==== [Debug] ==== : current_node: ",current_node)

        route = emergency_vehicle['route']
        destination = route[-1]
        destination_node = self._get_current_node(destination,type='to')
        print ("==== [Debug] ==== : destination_edge: ",destination)
        print ("==== [Debug] ==== : destination_node: ",destination_node)

        if current_node == destination_node:
            print ("==== [Debug] ==== : current_node is the same as destination_node")
            return None
        # Update edge weights based on current traffic conditions
        self._update_edge_weights()
        
        try:
            # Find shortest path using Dijkstra's algorithm
            path = nx.shortest_path(
                self.network,
                source=current_node,
                target=destination_node,
                weight='weight'
            )
            
            # Convert node path to edge path
            edge_path = []
            for i in range(len(path) - 1):
                edge = self.network[path[i]][path[i + 1]]['edge_id']
                edge_path.append(edge)
            print ("==== [Debug] ==== : edge_path: ",path)
            return [current_edge] + edge_path
            
        except nx.NetworkXNoPath:
            print(f"No path found for vehicle {emergency_vehicle['id']}")
            return None
    
    def _update_edge_weights(self):
        """Update edge weights based on current traffic conditions"""

        edge_id_list = [edge.getID() for edge in self.net.getEdges()]

        
        # for edge_id in traci.edge.getIDList():
        for edge_id in edge_id_list:
            # Get current traffic data
            vehicle_count = traci.edge.getLastStepVehicleNumber(edge_id)
            mean_speed = traci.edge.getLastStepMeanSpeed(edge_id)
            
            # Get edge from network
        
            edge = self.net.getEdge(edge_id)
            if edge:
                max_speed = edge.getSpeed()
                
                # Calculate congestion factor
                if mean_speed > 0:
                    congestion_factor = 1 + (vehicle_count * (1 - mean_speed/max_speed))
                else:
                    congestion_factor = float('inf')
                
                # Update edge weight
                base_length = edge.getLength()
                from_node = edge.getFromNode().getID()
                to_node = edge.getToNode().getID()
                
                if self.network.has_edge(from_node, to_node):
                    self.network[from_node][to_node]['weight'] = base_length * congestion_factor
    def _get_current_node(self, edge_id,type='from'):
        edge = self.net.getEdge(edge_id)
        if edge:
            if type == 'from':
                return edge.getToNode().getID()
            elif type == 'to':
                return edge.getToNode().getID()
        else:
            return None
        
    def get_route_metrics(self, route: List[str]) -> Dict[str, float]:
        """
        Calculate metrics for a given route
        
        Args:
            route: List of edge IDs representing the route
            
        Returns:
            Dictionary containing route metrics
        """
        total_length = 0
        total_time = 0
        
        for edge_id in route:
            # Get edge from network
            edge = self.net.getEdge(edge_id)
            if edge:
                # Get edge length
                length = edge.getLength()
                total_length += length
                
                # Estimate time based on current speed
                mean_speed = traci.edge.getLastStepMeanSpeed(edge_id)
                if mean_speed > 0:
                    total_time += length / mean_speed
        
        return {
            'total_length': total_length,
            'estimated_time': total_time,
            'num_edges': len(route)
        } 

In [4]:
def bulid_graph ():
    net = sumolib.net.readNet('additional/osm.net.xml')
    graph = nx.DiGraph()
    # Add nodes (junctions)
    for junction in net.getNodes():
        graph.add_node(junction.getID())

    # Add edges (roads)
    for edge in net.getEdges():
        if edge.allows('passenger'):
            from_node = edge.getFromNode().getID()
            to_node = edge.getToNode().getID()
            length = edge.getLength()
            graph.add_edge(from_node, to_node, weight=length, edge_id=edge.getID())

    return graph, net
        
def is_emergency_vehicle(veh_id):
    """Check if a vehicle is an emergency vehicle"""
    try:
        veh_type = traci.vehicle.getTypeID(veh_id)
        return veh_type in ['ambulance', 'fire_truck', 'police']
    except Exception as e:
        print(f"❌ Vehicle type check error for {veh_id}: {str(e)}")
        return False

def test_emergency_detection():
    """Test emergency vehicle detection"""
    try:
        emergency_vehicles = []
        for veh_id in traci.vehicle.getIDList():
            if is_emergency_vehicle(veh_id):
                emergency_vehicles.append({
                    'id': veh_id,
                    'position': traci.vehicle.getPosition(veh_id),
                    'speed': traci.vehicle.getSpeed(veh_id),
                    'route': traci.vehicle.getRoute(veh_id),
                    'type': traci.vehicle.getTypeID(veh_id)
                })
        
        if emergency_vehicles:
            print(f"✓ Detected {len(emergency_vehicles)} emergency vehicles")
        
        return emergency_vehicles
        
    except Exception as e:
        print(f"❌ Emergency detection error: {str(e)}")
        return []
    

def update_vehicle_route(vehicle_id, new_route):
    """Update vehicle's route with the optimized route"""
    try:
        # Get current route
        current_route = traci.vehicle.getRoute(vehicle_id)
        
        # Only update if the new route is different
        if new_route != current_route:
            print(f"\n🔄 Updating route for vehicle {vehicle_id}")
            print(f"Original route: {current_route}")
            print(f"New optimized route: {new_route}")
            
            # Update the route
            traci.vehicle.setRoute(vehicle_id, new_route)
            print("✓ Route updated successfully")
            
            # Highlight the new route edges
            for edge_id in new_route:
                traci.edge.setColor(edge_id, (0, 255, 0, 255))  # Green color for optimized route
    except Exception as e:
        print(f"❌ Error updating route: {str(e)}")


In [5]:
def run_simulation():
    """Run a simple SUMO simulation with TraCI"""

    # Initialize SUMO
    print("=== Starting SUMO Simulation ===")
    
    # Get the path to the SUMO configuration file
    sumocfg_path = 'osm.sumocfg'
    
    # Start SUMO with GUI
    sumo_binary = sumolib.checkBinary('sumo-gui')
    traci.start([sumo_binary, '-c', sumocfg_path])
    
    # Simulation loop
    step = 0
    max_steps = 500  # Run for 1 hour of simulation time
    
    print ("\n\n\n===============================================")
    print(f"Simulation will run for {max_steps} steps")
    # ===============================================
    # initialize 
    graph, net = bulid_graph()
    print ("Loaded graph with ", len(graph.nodes()), " nodes and ", len(graph.edges()), " edges")
    router = EmergencyRouter()
    router._build_network()
    # ===============================================


    while step < max_steps and traci.simulation.getMinExpectedNumber() > 0:
        # Perform simulation step
        traci.simulationStep()
        # ===============================================
        # Test emergency vehicle detection
        if step % 10 == 0:
            emergency_vehicles = test_emergency_detection()
 
            for ev in emergency_vehicles:
                ev_id = ev['id']
                ev_type = ev['type'] 
                print(f"  - {ev_type}: {ev_id}")
                original_route = traci.vehicle.getRoute(ev_id)
                print (f"Original Route of {ev_id}: ",original_route)
                optimized_route = router.optimize_route(ev)
                print (f"Optimized Route for {ev_id}: ",optimized_route)
                if optimized_route and optimized_route != original_route:
                    print(f"✓ Route optimized for {ev_id}")
                    traci.vehicle.setRoute(ev_id, optimized_route)
                    print("✓ Route updated successfully")
                else:
                    print ("No optimization")
                
        step += 1
    
    print("\n=== Simulation Complete ===")
    print(f"Total steps: {step}")
        

    traci.close()

if __name__ == "__main__":
    run_simulation() 

=== Starting SUMO Simulation ===
 Retrying in 1 seconds







Simulation will run for 500 steps
Loaded graph with  682  nodes and  475  edges
✓ Detected 1 emergency vehicles
  - ambulance: amb1
==== [Debug] ==== : Edge real time:  308558730
==== [Debug] ==== : Route real time:  ('308558730', '-518147860#4', '-518147860#3', '-518147860#2', '-518147861', '-238483769#2', '-238483769#1', '-238483769#0', '238483771#0', '238483771#1', '238483771#2', '1326528807#0', '1326528807#1', '238742346#0', '238742346#2')
Original Route of amb1:  ('308558730', '-518147860#4', '-518147860#3', '-518147860#2', '-518147861', '-238483769#2', '-238483769#1', '-238483769#0', '238483771#0', '238483771#1', '238483771#2', '1326528807#0', '1326528807#1', '238742346#0', '238742346#2')
==== [Debug] ==== : current_edge:  308558730
==== [Debug] ==== : current_node:  cluster_1270070821_1270070862
==== [Debug] ==== : destination_edge:  238742346#2
==== [Debug] ==== : destination_node:  282282903
==== [Debug] ==== : edge_path:  ['cluster_1270070821_1270070862', '5080634555', '43

FatalTraCIError: Connection closed by SUMO.