# Bando Traffic Model with Mesa-Geo

This notebook implements the Bando Optimal Velocity traffic model using the Mesa-Geo agent-based modeling framework.

## Model Components

**Agents:**
- **VehicleAgent**: Represents individual vehicles following Bando dynamics
- **RoadAgent**: Represents road segments from the Singapore road network

**Environment:** 
- Road network from OpenStreetMap (Singapore)
- GeoSpace for spatial interactions

**Dynamics:**
- Bando Optimal Velocity Model for car-following behavior
- Vehicles adjust speed based on distance to leading vehicle
- Optional bottleneck effects for congestion modeling

**Parameters:**
- N: Number of vehicles per road segment
- tau: Optimal velocity scaling parameter
- alpha: Sensitivity to optimal velocity (vs neighbor velocity)
- epsilon: Bottleneck strength
- L: Road segment length (normalized)

In [None]:
# Imports
import osmnx as ox
import networkx as nx
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString
import folium

# Mesa and Mesa-Geo
import mesa
from mesa.visualization import SolaraViz, make_plot_component
import mesa_geo as mg
from mesa_geo.visualization import make_geospace_component

# Settings
ox.settings.use_cache = True

## 1. Build Road Network

Load Singapore road network from OpenStreetMap and prepare edge data.

In [None]:
# Build network
G = ox.graph_from_place("Singapore", network_type="drive")
G = ox.add_edge_speeds(G)
G = ox.add_edge_travel_times(G)

# Get edges
edges_gdf = ox.graph_to_gdfs(G, nodes=False, fill_edge_geometry=True)

# Prepare edges DataFrame
if not {"u", "v", "key"}.issubset(edges_gdf.columns):
    edges_gdf = edges_gdf.reset_index()

# Select relevant columns
edge_cols = [c for c in ["u", "v", "key", "geometry", "length", "highway", "maxspeed", "speed_kph", "travel_time"]
             if c in edges_gdf.columns]
edges = edges_gdf[edge_cols].copy()
edges = edges.set_crs(4326) if edges.crs is None else edges.to_crs(4326)

print(f"Total edges: {len(edges)}")
edges.head()

Total edges: 45949


Unnamed: 0,u,v,key,geometry,length,highway,maxspeed,speed_kph,travel_time
0,25451929,6749812859,0,"LINESTRING (103.87254 1.29523, 103.87103 1.295...",765.027747,motorway,70,70.0,39.344284
1,25455287,1637003462,0,"LINESTRING (103.874 1.29544, 103.87413 1.2955,...",629.055082,motorway_link,50,50.0,45.291966
2,25455287,2521018789,0,"LINESTRING (103.874 1.29544, 103.87438 1.29544...",652.575579,motorway,90,90.0,26.103023
3,26777521,172424179,0,"LINESTRING (103.82357 1.30398, 103.82359 1.303...",113.99577,primary,40,40.0,10.259619
4,26777521,1889379421,0,"LINESTRING (103.82357 1.30398, 103.82372 1.304...",86.667744,residential,50,50.0,6.240078


## 2. Bando Model Functions

Core functions for the Bando Optimal Velocity Model.

In [None]:
def optimal_vel(h, tau, L):
    """Optimal velocity function - returns normalized velocity (0-1)"""
    opt_v = tau/L * (np.tanh(h-2) + np.tanh(2)) / (1+np.tanh(2))
    return opt_v

def optimal_vel_bottleneck(h, tau, L, x, epsilon):
    """Optimal velocity with bottleneck effect"""
    xi = x % L
    bottleneck_factor = (1 - epsilon * np.exp(- (xi - (L/2))**2 ))
    opt_v = optimal_vel(h, tau, L)
    return bottleneck_factor * opt_v

def bando_acceleration(x, v, N, tau, alpha, bottleneck, epsilon, L):
    """Calculate Bando model acceleration for all vehicles"""
    xi1, xi2 = x, np.append(x, x+L)[1:N+1]  # periodic boundary
    vi1, vi2 = v, np.append(v, v)[1:N+1]
    h = xi2 - xi1  # headway (distance to leading vehicle)

    if bottleneck:
        bando_term = optimal_vel_bottleneck(h, tau, L, x, epsilon) - vi1
    else:
        bando_term = optimal_vel(h, tau, L) - vi1

    aggre_term = vi2 - vi1
    acceleration = alpha * bando_term + (1-alpha) * aggre_term
    return acceleration

## 3. Vehicle Agent Class

Vehicles follow Bando dynamics along road segments.

In [None]:
class VehicleAgent(mg.GeoAgent):
    """Vehicle agent following Bando dynamics."""

    def __init__(self, model, geometry, crs, road_segment, position, velocity, vehicle_index):
        super().__init__(model, geometry, crs)
        self.road_segment = road_segment  # Reference to road agent
        self.position = position  # Position on road (0-1 normalized)
        self.velocity = velocity  # Current velocity (normalized)
        self.vehicle_index = vehicle_index  # Index in vehicle array
        self.acceleration = 0.0
        
    def __repr__(self):
        return f"Vehicle {self.unique_id} on road {self.road_segment.edge_id}"
    
    def update_position_on_road(self):
        """Update geometry based on normalized position along road."""
        # Get point along line at current position
        road_geom = self.road_segment.geometry
        point_on_road = road_geom.interpolate(self.position, normalized=True)
        self.geometry = point_on_road
    
    def step(self):
        """Update vehicle position based on Bando dynamics."""
        # Dynamics are calculated at road segment level
        # Individual vehicles just update their position
        self.update_position_on_road()

## 4. Road Agent Class

Road segments manage vehicles and apply Bando dynamics.

In [None]:
class RoadAgent(mg.GeoAgent):
    """Road segment agent managing vehicle flow."""
    
    def __init__(self, model, geometry, crs, edge_id, u, v, key, 
                 length_m, speed_kph, highway_type, num_vehicles):
        super().__init__(model, geometry, crs)
        self.edge_id = edge_id
        self.u = u
        self.v = v
        self.key = key
        self.length_m = length_m
        self.speed_kph = speed_kph
        self.highway_type = highway_type
        self.num_vehicles = num_vehicles
        
        # Bando parameters (highway-specific)
        self.set_bando_params()
        
        # Vehicle state arrays
        self.positions = None  # Vehicle positions (normalized 0-1)
        self.velocities = None  # Vehicle velocities (normalized 0-1)
        self.vehicles = []  # List of vehicle agents on this road
        
        # Velocity history for steady-state calculation
        self.velocity_history = []
        self.internal_step_count = 0
        self.warmup_steps = 0  # Will be set after initialization
        
        # Performance metrics
        self.mean_speed_kph = speed_kph  # Initialize to free flow
        self.speed_ratio = 1.0
        
    def set_bando_params(self):
        """Set Bando parameters based on highway type."""
        highway_params = {
            'motorway': {'tau': 40, 'alpha': 0.9, 'epsilon': 0.2, 'base_ratio': 0.75},
            'trunk': {'tau': 35, 'alpha': 0.85, 'epsilon': 0.3, 'base_ratio': 0.80},
            'primary': {'tau': 30, 'alpha': 0.8, 'epsilon': 0.4, 'base_ratio': 0.85},
            'secondary': {'tau': 25, 'alpha': 0.75, 'epsilon': 0.5, 'base_ratio': 0.90},
            'tertiary': {'tau': 20, 'alpha': 0.7, 'epsilon': 0.6, 'base_ratio': 0.95},
            'residential': {'tau': 15, 'alpha': 0.6, 'epsilon': 0.7, 'base_ratio': 0.95}
        }
        
        params = highway_params.get(self.highway_type, highway_params['residential'])
        self.tau = params['tau']
        self.alpha = params['alpha']
        self.epsilon = params['epsilon']
        self.base_ratio = params['base_ratio']
        self.L = 1.0  # Normalized length
        self.bottleneck = self.highway_type in ['motorway', 'trunk', 'primary']
        
    def initialize_vehicles(self):
        """Initialize vehicle positions and velocities."""
        N = self.num_vehicles
        
        # Initialize positions uniformly with small perturbation
        self.positions = np.linspace(0, self.L, N, endpoint=False)
        perturbation = self.model.rng.random(N) * 0.1
        self.positions = (self.positions + perturbation) % self.L
        
        # Initialize velocities
        if self.bottleneck:
            self.velocities = np.array([optimal_vel_bottleneck(self.L/N, self.tau, self.L, xi, self.epsilon) 
                                       for xi in self.positions])
        else:
            self.velocities = np.ones(N) * optimal_vel(self.L/N, self.tau, self.L)
        
        # Add small velocity perturbation
        perturbation_v = self.model.rng.random(N) * 0.05
        self.velocities = np.clip(self.velocities + perturbation_v, 0, None)
        
        # Run internal warmup simulation to reach steady state
        self._run_warmup_simulation()
        
    def _run_warmup_simulation(self, time_end=8.0, delta_t=0.005):
        """Run simulation to steady state before main simulation starts."""
        total_steps = int(time_end / delta_t)
        self.warmup_steps = int(0.75 * total_steps)  # Use last 25% for steady-state
        
        velocity_history = []
        
        for step in range(total_steps):
            # Calculate acceleration
            a = bando_acceleration(self.positions, self.velocities, self.num_vehicles,
                                  self.tau, self.alpha, self.bottleneck, self.epsilon, self.L)
            
            # Euler update
            self.positions = self.positions + delta_t * self.velocities
            self.velocities = self.velocities + delta_t * a
            
            # Periodic boundary conditions
            self.positions = self.positions % self.L
            self.velocities = np.clip(self.velocities, 0, None)
            
            # Store velocities after warmup
            if step >= self.warmup_steps:
                velocity_history.append(self.velocities.copy())
        
        # Calculate steady-state speed using same approach as sg_traffic_bando
        if len(velocity_history) > 0:
            v_steady = np.array(velocity_history)
            mean_v_normalized = np.mean(v_steady)
            
            # IMPROVED SCALING: Use target ratios with variation (matching sg_traffic_bando)
            variation = (mean_v_normalized - 0.5) * 0.3  # ±15% variation around base
            speed_ratio = self.base_ratio + variation
            speed_ratio = np.clip(speed_ratio, 0.4, 1.1)  # realistic bounds
            
            # Add some random traffic variation (matching sg_traffic_bando)
            traffic_noise = self.model.rng.normal(0, 0.05)
            speed_ratio += traffic_noise
            speed_ratio = np.clip(speed_ratio, 0.3, 1.2)
            
            self.speed_ratio = speed_ratio
            self.mean_speed_kph = self.speed_kph * speed_ratio
        
    def update_bando_dynamics(self, delta_t=0.005):
        """Update vehicle dynamics using Bando model (Euler method)."""
        if self.num_vehicles == 0:
            return
            
        # Calculate acceleration
        a = bando_acceleration(self.positions, self.velocities, self.num_vehicles,
                              self.tau, self.alpha, self.bottleneck, self.epsilon, self.L)
        
        # Euler update
        self.positions = self.positions + delta_t * self.velocities
        self.velocities = self.velocities + delta_t * a
        
        # Periodic boundary conditions
        self.positions = self.positions % self.L
        self.velocities = np.clip(self.velocities, 0, None)
        
        # Store velocity history for ongoing analysis
        self.velocity_history.append(self.velocities.copy())
        self.internal_step_count += 1
        
        # Update metrics periodically using recent history
        if self.internal_step_count % 50 == 0 and len(self.velocity_history) > 10:
            # Use last 10 steps for moving average
            recent_v = np.array(self.velocity_history[-10:])
            mean_v_normalized = np.mean(recent_v)
            
            # Use same scaling approach as initialization
            variation = (mean_v_normalized - 0.5) * 0.3
            speed_ratio = self.base_ratio + variation
            speed_ratio = np.clip(speed_ratio, 0.4, 1.1)
            
            self.speed_ratio = speed_ratio
            self.mean_speed_kph = self.speed_kph * speed_ratio
        
    def __repr__(self):
        return f"Road {self.edge_id} ({self.highway_type})"
    
    def step(self):
        """Update road and vehicle dynamics."""
        # Update Bando dynamics
        self.update_bando_dynamics(delta_t=self.model.delta_t)
        
        # Update vehicle agent positions
        for i, vehicle in enumerate(self.vehicles):
            if i < len(self.positions):
                vehicle.position = self.positions[i]
                vehicle.velocity = self.velocities[i]

## 5. Bando Traffic Model Class

Main model class managing the traffic simulation.

In [None]:
class BandoTrafficModel(mesa.Model):
    """Mesa-Geo model for Bando traffic simulation."""
    
    def __init__(self, edges_gdf, num_roads=50, vehicles_per_road=10, 
                 delta_t=0.005, min_road_length=200):
        super().__init__()
        
        # Model parameters
        self.num_roads = num_roads
        self.vehicles_per_road = vehicles_per_road
        self.delta_t = delta_t
        self.min_road_length = min_road_length
        
        # GeoSpace
        self.space = mg.GeoSpace(warn_crs_conversion=False)
        
        # Data collection
        self.datacollector = mesa.DataCollector(
            model_reporters={
                "Mean Speed (km/h)": lambda m: np.mean([r.mean_speed_kph for r in m.agents_by_type[RoadAgent]]) if m.agents_by_type[RoadAgent] else 0,
                "Mean Speed Ratio": lambda m: np.mean([r.speed_ratio for r in m.agents_by_type[RoadAgent]]) if m.agents_by_type[RoadAgent] else 0,
                "Total Vehicles": lambda m: sum([r.num_vehicles for r in m.agents_by_type[RoadAgent]]) if m.agents_by_type[RoadAgent] else 0,
            }
        )
        
        # Initialize road network
        self.setup_roads(edges_gdf)
        
        # Initialize vehicles
        self.setup_vehicles()
        
        self.running = True
        
    def setup_roads(self, edges_gdf):
        """Create road agents from edges GeoDataFrame."""
        # Filter roads
        filtered_edges = edges_gdf[edges_gdf['length'] >= self.min_road_length].copy()
        
        # Sample roads if needed
        if len(filtered_edges) > self.num_roads:
            sampled_edges = filtered_edges.sample(self.num_roads, random_state=42)
        else:
            sampled_edges = filtered_edges
        
        # Create road agents
        for idx, row in sampled_edges.iterrows():
            # Extract edge info
            if isinstance(idx, tuple):
                u, v, key = idx
            else:
                u = row.get('u', idx)
                v = row.get('v', None)
                key = row.get('key', 0)
            
            # Handle highway type
            highway_raw = row.get('highway', 'residential')
            if isinstance(highway_raw, list):
                highway_type = highway_raw[0] if highway_raw else 'residential'
            else:
                highway_type = str(highway_raw) if highway_raw else 'residential'
            
            # Determine number of vehicles based on highway type
            highway_vehicles = {
                'motorway': 20,
                'trunk': 15,
                'primary': 12,
                'secondary': 10,
                'tertiary': 8,
                'residential': 6
            }
            num_vehicles = highway_vehicles.get(highway_type, self.vehicles_per_road)
            
            # Create road agent
            road_agent = RoadAgent(
                model=self,
                geometry=row['geometry'],
                crs=self.space.crs,
                edge_id=f"{u}_{v}_{key}",
                u=u,
                v=v,
                key=key,
                length_m=row['length'],
                speed_kph=row.get('speed_kph', 50),
                highway_type=highway_type,
                num_vehicles=num_vehicles
            )
            
            # Initialize vehicle dynamics
            road_agent.initialize_vehicles()
            
            # Add to space
            self.space.add_agents(road_agent)
    
    def setup_vehicles(self):
        """Create vehicle agents on roads."""
        for road in self.agents_by_type[RoadAgent]:
            for i in range(road.num_vehicles):
                # Get position along road
                position_normalized = road.positions[i]
                point_on_road = road.geometry.interpolate(position_normalized, normalized=True)
                
                # Create vehicle
                vehicle = VehicleAgent(
                    model=self,
                    geometry=point_on_road,
                    crs=self.space.crs,
                    road_segment=road,
                    position=position_normalized,
                    velocity=road.velocities[i],
                    vehicle_index=i
                )
                
                road.vehicles.append(vehicle)
                self.space.add_agents(vehicle)
    
    def step(self):
        """Run one step of the model."""
        # Update roads (which update vehicles)
        self.agents_by_type[RoadAgent].do("step")
        
        # Update vehicle positions
        self.agents_by_type[VehicleAgent].do("step")
        
        # Collect data
        self.datacollector.collect(self)

## 6. Run Basic Simulation

Test the model with a simple run.

In [None]:
# Create model
model = BandoTrafficModel(edges, num_roads=50, vehicles_per_road=10)

print(f"Total roads: {len(model.agents_by_type[RoadAgent])}")
print(f"Total vehicles: {len(model.agents_by_type[VehicleAgent])}")

# Run simulation
for i in range(100):
    model.step()
    if i % 20 == 0:
        print(f"Step {i}: Mean speed = {model.datacollector.model_vars['Mean Speed (km/h)'][-1]:.2f} km/h")

# Display results
results_df = model.datacollector.get_model_vars_dataframe()
results_df

Total roads: 50
Total vehicles: 468
Step 0: Mean speed = 5.92 km/h
Step 20: Mean speed = 5.96 km/h
Step 40: Mean speed = 5.99 km/h
Step 60: Mean speed = 6.03 km/h
Step 80: Mean speed = 6.07 km/h


Unnamed: 0,Mean Speed (km/h),Mean Speed Ratio,Total Vehicles
0,5.922294,0.111626,468
1,5.923229,0.111622,468
2,5.924155,0.111619,468
3,5.925071,0.111615,468
4,5.927115,0.111625,468
...,...,...,...
95,6.102065,0.113192,468
96,6.105943,0.113256,468
97,6.109748,0.113318,468
98,6.113481,0.113379,468


## 7. Visualization Setup

Create map visualization of traffic speeds.

In [None]:
def speed_color(speed_kph):
    """Color function for observed speeds (matching sg_traffic_bando)."""
    if speed_kph is None or np.isnan(speed_kph):
        return '#999999'
    
    # Speed ranges matching sg_traffic_bando visualization
    if speed_kph < 20:     return '#d73027'    # Red: Very slow
    elif speed_kph < 35:   return '#fc8d59'    # Orange: Slow
    elif speed_kph < 50:   return '#fee08b'    # Yellow: Moderate
    elif speed_kph < 70:   return '#91cf60'    # Light green: Fast
    else:                  return '#1a9850'    # Dark green: Very fast

def create_traffic_map(model, zoom=12):
    """Create Folium map showing traffic speeds."""
    # Create base map
    traffic_map = folium.Map(
        location=(1.3521, 103.8198),
        zoom_start=zoom,
        control_scale=True,
        tiles='CartoDB positron'
    )
    
    # Add road segments
    for road in model.agents_by_type[RoadAgent]:
        folium.GeoJson(
            road.geometry.__geo_interface__,
            style_function=lambda x, speed=road.mean_speed_kph: {
                'color': speed_color(speed),
                'weight': 3,
                'opacity': 0.8
            },
            popup=folium.Popup(
                f"""
                <b>Road:</b> {road.edge_id}<br>
                <b>Type:</b> {road.highway_type}<br>
                <b>Free-flow:</b> {road.speed_kph:.1f} km/h<br>
                <b>Current Speed:</b> {road.mean_speed_kph:.1f} km/h<br>
                <b>Speed Ratio:</b> {road.speed_ratio:.3f}<br>
                <b>Vehicles:</b> {road.num_vehicles}<br>
                """,
                max_width=300
            ),
            tooltip=f"Speed: {road.mean_speed_kph:.1f} km/h"
        ).add_to(traffic_map)
    
    # Add legend (updated to match sg_traffic_bando thresholds)
    legend_html = '''
    <div style="position: fixed; 
                bottom: 50px; left: 50px; width: 220px; height: 130px; 
                background-color: white; border:2px solid grey; z-index:9999; 
                font-size:14px; padding: 10px">
    <b>Traffic Speed (km/h)</b><br>
    <i class="fa fa-square" style="color:#d73027"></i> &lt; 20 Very Slow<br>
    <i class="fa fa-square" style="color:#fc8d59"></i> 20-35 Slow<br>
    <i class="fa fa-square" style="color:#fee08b"></i> 35-50 Moderate<br>
    <i class="fa fa-square" style="color:#91cf60"></i> 50-70 Fast<br>
    <i class="fa fa-square" style="color:#1a9850"></i> &gt; 70 Very Fast<br>
    </div>
    '''
    traffic_map.get_root().html.add_child(folium.Element(legend_html))
    
    return traffic_map

# Create and display map
traffic_map = create_traffic_map(model)
traffic_map

## 8. Interactive Mesa-Geo Visualization

Create interactive visualization with parameter controls.

In [None]:
def traffic_portrayal(agent):
    """Portrayal function for agents."""
    portrayal = {}
    
    if isinstance(agent, VehicleAgent):
        # Color vehicles by speed
        speed_kph = agent.velocity * agent.road_segment.speed_kph
        portrayal["color"] = speed_color(speed_kph)
        portrayal["radius"] = 3
        
    elif isinstance(agent, RoadAgent):
        # Color roads by average speed
        portrayal["color"] = speed_color(agent.mean_speed_kph)
        portrayal["weight"] = 2
        portrayal["opacity"] = 0.6
    
    return portrayal

# Model parameters for interface
model_params = {
    "edges_gdf": edges,  # Fixed parameter
    "num_roads": {
        "type": "SliderInt",
        "value": 30,
        "label": "Number of Road Segments",
        "min": 10,
        "max": 100,
        "step": 10,
    },
    "vehicles_per_road": {
        "type": "SliderInt",
        "value": 10,
        "label": "Vehicles per Road (default)",
        "min": 5,
        "max": 30,
        "step": 5,
    },
    "delta_t": {
        "type": "SliderFloat",
        "value": 0.01,
        "label": "Time Step (delta_t)",
        "min": 0.001,
        "max": 0.05,
        "step": 0.001,
    },
}

In [None]:
# # Create visualization components
# geo_component = make_geospace_component(traffic_portrayal, zoom=12, scroll_wheel_zoom=True)

# # Create visualization
# page = SolaraViz(
#     BandoTrafficModel,
#     model_params,
#     name="Bando Traffic Model - Singapore",
#     components=[
#         geo_component,
#         make_plot_component(["Mean Speed (km/h)", "Mean Speed Ratio"]),
#         make_plot_component(["Total Vehicles"]),
#     ],
# )

# # Render visualization
# page

## 9. Analysis

Analyze simulation results by highway type.

In [None]:
# Collect road statistics
road_stats = []
for road in model.agents_by_type[RoadAgent]:
    road_stats.append({
        'edge_id': road.edge_id,
        'highway_type': road.highway_type,
        'free_speed_kph': road.speed_kph,
        'mean_speed_kph': road.mean_speed_kph,
        'speed_ratio': road.speed_ratio,
        'num_vehicles': road.num_vehicles,
        'length_m': road.length_m
    })

road_stats_df = pd.DataFrame(road_stats)

# Summary by highway type
print("\nSpeed Statistics by Highway Type:")
summary = road_stats_df.groupby('highway_type').agg({
    'mean_speed_kph': ['mean', 'std'],
    'speed_ratio': ['mean', 'std'],
    'num_vehicles': 'mean'
}).round(2)
print(summary)

road_stats_df.head(10)


Speed Statistics by Highway Type:
               mean_speed_kph       speed_ratio       num_vehicles
                         mean   std        mean   std         mean
highway_type                                                      
motorway                10.16  0.48        0.12  0.00         20.0
motorway_link            4.77  0.80        0.08  0.01         10.0
primary                  6.83  1.29        0.11  0.02         12.0
residential              5.99  0.67        0.13  0.00          6.0
secondary                6.12  0.81        0.12  0.00         10.0
secondary_link           5.11   NaN        0.10   NaN         10.0
tertiary                 6.43  1.04        0.13  0.02          8.0
trunk                    6.31  0.79        0.09  0.01         15.0
unclassified             4.13  0.48        0.08  0.01         10.0


Unnamed: 0,edge_id,highway_type,free_speed_kph,mean_speed_kph,speed_ratio,num_vehicles,length_m
0,992571662_319637230_0,secondary,50.0,5.992198,0.119844,10,344.302934
1,5175441602_243677085_0,secondary,60.0,6.813118,0.113552,10,214.539769
2,12076505781_12076491368_0,secondary,53.903529,6.534581,0.121227,10,215.317059
3,5596873869_258305277_0,residential,50.0,6.101793,0.122036,6,209.938376
4,4593590665_253912978_0,residential,50.0,6.657677,0.133154,6,270.141454
5,244001050_244001064_0,unclassified,50.0,3.703709,0.074074,10,367.479134
6,3846201904_148145492_0,motorway_link,70.0,5.337358,0.076248,10,525.799845
7,246294564_246294558_0,primary,70.0,4.883784,0.069768,12,351.466966
8,246579426_6248463353_0,trunk,70.0,6.867931,0.098113,15,247.054699
9,2379764009_259577285_0,residential,50.0,6.253448,0.125069,6,389.948135


## 10. Test Fixed Implementation

Run a new simulation with the fixed speed scaling and warmup period.

In [None]:
# Create new model with fixed implementation
print("Creating model with improved speed scaling...")
model_fixed = BandoTrafficModel(edges, num_roads=50, vehicles_per_road=10, delta_t=0.005)

print(f"Total roads: {len(model_fixed.agents_by_type[RoadAgent])}")
print(f"Total vehicles: {len(model_fixed.agents_by_type[VehicleAgent])}")
print("\nInitial speeds after warmup simulation:")

# Collect initial statistics
initial_stats = []
for road in model_fixed.agents_by_type[RoadAgent]:
    initial_stats.append({
        'highway_type': road.highway_type,
        'free_speed_kph': road.speed_kph,
        'mean_speed_kph': road.mean_speed_kph,
        'speed_ratio': road.speed_ratio
    })

initial_df = pd.DataFrame(initial_stats)
print(f"\nSpeed range: {initial_df['mean_speed_kph'].min():.1f} - {initial_df['mean_speed_kph'].max():.1f} km/h")
print(f"Speed ratio range: {initial_df['speed_ratio'].min():.3f} - {initial_df['speed_ratio'].max():.3f}")

print("\nBy highway type:")
summary = initial_df.groupby('highway_type').agg({
    'mean_speed_kph': ['mean', 'std'],
    'speed_ratio': ['mean', 'std']
}).round(2)
print(summary)

In [None]:
# Run the simulation for 100 steps
print("\nRunning simulation for 100 steps...")
for i in range(100):
    model_fixed.step()
    if i % 20 == 0:
        model_fixed.datacollector.collect(model_fixed)
        print(f"Step {i}: Mean speed = {model_fixed.datacollector.model_vars['Mean Speed (km/h)'][-1]:.2f} km/h")

# Display results
results_fixed = model_fixed.datacollector.get_model_vars_dataframe()
print(f"\nFinal mean speed: {results_fixed['Mean Speed (km/h)'].iloc[-1]:.2f} km/h")
print(f"Final speed ratio: {results_fixed['Mean Speed Ratio'].iloc[-1]:.3f}")
results_fixed

In [None]:
# Create comparison map with fixed implementation
traffic_map_fixed = create_traffic_map(model_fixed)
traffic_map_fixed