In [3]:
import osmnx as ox
import networkx as nx
import folium
import random
import mesa
from mesa.space import ContinuousSpace
from shapely.geometry import Point
from PIL import Image
import io
from IPython.display import display, Image as IPImage

# Bounding box coordinates for Bangalore
north, south, east, west = 12.976, 12.961, 77.599, 77.579  # Adjusted coordinates for Bangalore
bbox = (north, south, east, west)

# Create a graph from the bounding box using the bbox parameter
G = ox.graph_from_bbox(north, south, east, west, network_type='drive')

# Extract graph bounds
min_x = min(nx.get_node_attributes(G, 'x').values())
max_x = max(nx.get_node_attributes(G, 'x').values())
min_y = min(nx.get_node_attributes(G, 'y').values())
max_y = max(nx.get_node_attributes(G, 'y').values())

# Define the TrafficGeoSpace class to integrate with the OSMnx graph
class TrafficGeoSpace(ContinuousSpace):
    def __init__(self, G, crs="EPSG:3857"):
        x_max = max_x - min_x
        y_max = max_y - min_y
        super().__init__(x_max, y_max, False)
        self.G = G  # Assign the OSMnx graph to the space
        self.crs = crs  # Coordinate Reference System
        self.congestion_map = {}  # Dictionary to track congestion per node

    def move_agent(self, agent, pos):
        # Update congestion map
        node = agent.route[agent.current_step]
        if node not in self.congestion_map:
            self.congestion_map[node] = 0
        self.congestion_map[node] += 1
        super().move_agent(agent, pos)

    def get_congestion(self, node):
        return self.congestion_map.get(node, 0)

# Modify the VehicleAgent class to include a route, vehicle type, speed, and congestion handling
class VehicleAgent(mesa.Agent):
    def __init__(self, unique_id, model, vehicle_type, route=None):
        super().__init__(unique_id, model)
        self.vehicle_type = vehicle_type  # Assign vehicle type (car, truck, bike)
        self.route = route  # Assign the route for the vehicle
        self.current_step = 0  # Initialize the current step in the route
        self.speed = random.uniform(1, 3)  # Speed factor, higher means faster
        self.congested = False  # Initially, not in a congested area

    def step(self):
        # Handle traffic signals and congestion
        if self.model.traffic_signals.get(self.route[self.current_step], False):
            return  # Vehicle waits at the red light

        if self.route and self.current_step < len(self.route):
            next_node = self.route[self.current_step]
            x = self.model.space.G.nodes[next_node]['x'] - min_x
            y = self.model.space.G.nodes[next_node]['y'] - min_y
            
            # Adjust speed based on congestion
            if self.model.space.get_congestion(next_node) > 5:  # Threshold for congestion
                self.speed = max(0.5, self.speed - 0.5)  # Slow down in congestion
                self.congested = True
            else:
                self.congested = False

            self.model.space.move_agent(self, (x, y))
            self.current_step += int(self.speed)  # Move according to speed
        else:
            pass  # Optionally, handle reaching the end of the route

# Modify the TrafficModel to use the road network from OSMnx and include traffic signals and congestion
class TrafficModel(mesa.Model):
    def __init__(self, G, num_vehicles):
        super().__init__()
        self.schedule = mesa.time.RandomActivation(self)
        self.space = TrafficGeoSpace(G)
        self.traffic_signals = self.create_traffic_signals()
        self.create_vehicles(num_vehicles)

    def create_traffic_signals(self):
        # Define traffic signals at intersections (nodes)
        signals = {}
        # Use node degree to identify potential intersections
        intersections = [node for node, degree in dict(self.space.G.degree()).items() if degree > 3]
        for node in intersections:
            signals[node] = random.choice([True, False])  # True for red, False for green
        return signals

    def create_vehicles(self, num_vehicles):
        vehicle_types = ['car', 'truck', 'bike']
        for i in range(num_vehicles):
            start_point = (random.uniform(south, north), random.uniform(west, east))
            end_point = (random.uniform(south, north), random.uniform(west, east))
            start_node = ox.distance.nearest_nodes(self.space.G, X=start_point[1], Y=start_point[0])
            end_node = ox.distance.nearest_nodes(self.space.G, X=end_point[1], Y=end_point[0])

            try:
                route = nx.shortest_path(self.space.G, source=start_node, target=end_node, weight='length')
                vehicle_type = random.choice(vehicle_types)
                vehicle = VehicleAgent(i, self, vehicle_type=vehicle_type, route=route)
                start_position = (self.space.G.nodes[start_node]['x'] - min_x, self.space.G.nodes[start_node]['y'] - min_y)  # Adjust coordinates
                self.space.place_agent(vehicle, start_position)
                self.schedule.add(vehicle)
            except nx.NetworkXNoPath:
                pass  # Skip this vehicle if no route is found

    def step(self):
        # Update traffic signals at each step
        for node in self.traffic_signals.keys():
            self.traffic_signals[node] = not self.traffic_signals[node]  # Toggle signal state
        self.schedule.step()

# Running the modified TrafficModel
num_vehicles = 51
model = TrafficModel(G, num_vehicles)

# Running the simulation and updating the Folium map
map_images = []
for _ in range(10):  # Run for 10 steps
    model.step()

    # Create a new map for each step
    temp_map = folium.Map(location=[(north + south) / 2, (east + west) / 2], zoom_start=15)
    
    # Add the road network to the map
    edges = ox.graph_to_gdfs(G, nodes=False, edges=True)
    for _, row in edges.iterrows():
        points = [(row['geometry'].coords[i][1], row['geometry'].coords[i][0]) for i in range(len(row['geometry'].coords))]
        folium.PolyLine(points, color='blue', weight=2.5, opacity=1).add_to(temp_map)

    # Add vehicles to the map
    for vehicle in model.schedule.agents:
        if isinstance(vehicle, VehicleAgent) and vehicle.current_step > 0 and vehicle.current_step <= len(vehicle.route):
            vehicle_position = (vehicle.model.space.G.nodes[vehicle.route[vehicle.current_step - 1]]['y'],
                                vehicle.model.space.G.nodes[vehicle.route[vehicle.current_step - 1]]['x'])
            color = 'red' if vehicle.vehicle_type == 'car' else 'blue' if vehicle.vehicle_type == 'truck' else 'green'
            folium.CircleMarker(location=vehicle_position, radius=5, color=color, fill=True).add_to(temp_map)

    # Save the map state as an image
    img_data = temp_map._to_png(5)
    img = Image.open(io.BytesIO(img_data))
    map_images.append(img)

# Create an animated GIF from the saved map images
gif_path = 'bangalore_traffic_congestion.gif'
map_images[0].save(gif_path, save_all=True, append_images=map_images[1:], duration=500, loop=0)

# Display the animated GIF in the notebook
display(IPImage(filename=gif_path))

print("Simulation complete. Check the notebook for the Bangalore traffic congestion animation.")


  G = ox.graph_from_bbox(north, south, east, west, network_type='drive')


IndexError: list index out of range

In [4]:
import mesa
import osmnx as ox
import networkx as nx
import random
import folium

# Define the TrafficGeoSpace for spatial interactions
class TrafficGeoSpace:
    def __init__(self, G):
        self.G = G

    def get_nearest_node(self, pos):
        return ox.distance.nearest_nodes(self.G, pos[1], pos[0])

# Define the VehicleAgent class
class VehicleAgent(mesa.Agent):
    def __init__(self, unique_id, model, vehicle_type):
        super().__init__(unique_id, model)
        self.vehicle_type = vehicle_type
        self.route = []
        self.current_step = 0
        self.start_node = random.choice(list(self.model.space.G.nodes))
        self.end_node = random.choice(list(self.model.space.G.nodes))

        # Calculate route using shortest path
        try:
            self.route = nx.shortest_path(self.model.space.G, self.start_node, self.end_node, weight='length')
        except nx.NetworkXNoPath:
            self.route = []

    def step(self):
        if not self.route:
            return

        if self.current_step < len(self.route):
            if self.model.traffic_signals.get(self.route[self.current_step], False):
                return  # Vehicle waits at the red light
            self.current_step += 1
        else:
            # Handle end of route, possibly reset or remove the agent
            self.current_step = 0
            self.start_node = self.end_node
            self.end_node = random.choice(list(self.model.space.G.nodes))
            try:
                self.route = nx.shortest_path(self.model.space.G, self.start_node, self.end_node, weight='length')
            except nx.NetworkXNoPath:
                self.route = []

# Define the TrafficModel class
class TrafficModel(mesa.Model):
    def __init__(self, G, num_vehicles):
        self.schedule = mesa.time.RandomActivation(self)
        self.space = TrafficGeoSpace(G)
        self.traffic_signals = self.create_traffic_signals()
        self.create_vehicles(num_vehicles)

    def create_traffic_signals(self):
        signals = {}
        intersections = list(self.space.G.nodes)
        for node in intersections:
            signals[node] = random.choice([True, False])  # True for red, False for green
        return signals

    def create_vehicles(self, num_vehicles):
        for i in range(num_vehicles):
            vehicle_type = random.choice(['car', 'truck', 'bus'])
            vehicle = VehicleAgent(i, self, vehicle_type)
            self.schedule.add(vehicle)

    def step(self):
        # Toggle traffic signals at each step
        for node in self.traffic_signals.keys():
            self.traffic_signals[node] = not self.traffic_signals[node]  # Toggle signal state
        self.schedule.step()

# Define map boundaries (latitude and longitude)
north, south, east, west = 40.7577, 40.7510, -73.9857, -73.9870

# Create the road network graph using bounding box
bbox = (north, south, east, west)
G = ox.graph_from_bbox(*bbox, network_type='drive')

# Initialize and run the TrafficModel
num_vehicles = 10
model = TrafficModel(G, num_vehicles)

# Run the simulation and update the Folium map
map_images = []
for _ in range(10):  # Run for 10 steps
    model.step()

    # Create a new map for each step
    temp_map = folium.Map(location=[(north + south) / 2, (east + west) / 2], zoom_start=15)

    for vehicle in model.schedule.agents:
        if isinstance(vehicle, VehicleAgent):
            if vehicle.current_step > 0 and vehicle.current_step < len(vehicle.route):
                vehicle_position = (vehicle.model.space.G.nodes[vehicle.route[vehicle.current_step]]['y'],
                                    vehicle.model.space.G.nodes[vehicle.route[vehicle.current_step]]['x'])
                color = 'red' if vehicle.vehicle_type == 'car' else 'blue' if vehicle.vehicle_type == 'truck' else 'green'
                folium.CircleMarker(location=vehicle_position, radius=5, color=color, fill=True).add_to(temp_map)

    map_images.append(temp_map._to_png(5))  # Save the map as an image

# Save the map as an HTML file to visualize
final_map = folium.Map(location=[(north + south) / 2, (east + west) / 2], zoom_start=15)
final_map.save("traffic_simulation.html")


  G = ox.graph_from_bbox(*bbox, network_type='drive')


RuntimeError: The Mesa Model class was not initialized. You must explicitly initialize the Model by calling super().__init__() on initialization.

In [1]:
import osmnx as ox
import networkx as nx
import mesa
from mesa.space import ContinuousSpace
from shapely.geometry import Point
import random
import folium

# Bounding box coordinates for Bangalore
north, south, east, west = 12.976, 12.961, 77.599, 77.579

# Create a graph from the bounding box using the bbox parameter
G = ox.graph_from_bbox(north, south, east, west, network_type='drive')

# Extract graph bounds
min_x = min(nx.get_node_attributes(G, 'x').values())
max_x = max(nx.get_node_attributes(G, 'x').values())
min_y = min(nx.get_node_attributes(G, 'y').values())
max_y = max(nx.get_node_attributes(G, 'y').values())

# Define the TrafficGeoSpace class to integrate with the OSMnx graph
class TrafficGeoSpace(ContinuousSpace):
    def __init__(self, G, crs="EPSG:3857"):
        x_max = max_x - min_x
        y_max = max_y - min_y
        super().__init__(x_max, y_max, False)
        self.G = G  # Assign the OSMnx graph to the space
        self.crs = crs  # Coordinate Reference System

# Define the VehicleAgent class
class VehicleAgent(mesa.Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        self.route = None
        self.current_step = 0

    def step(self):
        # Logic for vehicle movement goes here
        pass


  G = ox.graph_from_bbox(north, south, east, west, network_type='drive')


In [2]:
class TrafficModel(mesa.Model):
    def __init__(self, G, num_vehicles):
        self.space = TrafficGeoSpace(G)
        self.schedule = mesa.time.RandomActivation(self)
        self.num_vehicles = num_vehicles

        # Add vehicle agents to the model
        for i in range(self.num_vehicles):
            route = self._generate_route()
            agent = VehicleAgent(i, self)
            agent.route = route
            self.schedule.add(agent)

    def _generate_route(self):
        # Generate a route for each vehicle using OSMnx graph
        start_node = random.choice(list(self.space.G.nodes))
        end_node = random.choice(list(self.space.G.nodes))
        route = nx.shortest_path(self.space.G, start_node, end_node)
        return route

    def step(self):
        self.schedule.step()


In [3]:
# Visualization Function
def visualize_traffic(model):
    m = ox.plot_graph_folium(model.space.G, popup_attribute='name', weight=2, color='blue')
    
    # Plot vehicles on the map
    for agent in model.schedule.agents:
        if agent.route:
            node_id = agent.route[agent.current_step]
            x, y = model.space.G.nodes[node_id]['x'], model.space.G.nodes[node_id]['y']
            folium.Marker([y, x], popup=f'Vehicle {agent.unique_id}').add_to(m)
    
    # Save the map as an HTML file or display it
    m.save('traffic_simulation.html')
    display(IPImage(io.BytesIO(m._to_png())))

# Initialize and run the model
model = TrafficModel(G, num_vehicles=10)
for _ in range(10):
    model.step()

# Visualize the traffic simulation
visualize_traffic(model)


RuntimeError: The Mesa Model class was not initialized. You must explicitly initialize the Model by calling super().__init__() on initialization.