<a href="https://colab.research.google.com/github/TC2008B-Team5/Multiagent-Systems-T5/blob/main/MESA_Team5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -U 'mesa[rec]'

Collecting mesa[rec]
  Downloading mesa-3.0.3-py3-none-any.whl.metadata (9.8 kB)
Collecting solara (from mesa[rec])
  Downloading solara-1.41.0-py2.py3-none-any.whl.metadata (8.9 kB)
Collecting solara-server==1.41.0 (from solara-server[dev,starlette]==1.41.0->solara->mesa[rec])
  Downloading solara_server-1.41.0-py2.py3-none-any.whl.metadata (2.8 kB)
Collecting solara-ui==1.41.0 (from solara-ui[all]==1.41.0->solara->mesa[rec])
  Downloading solara_ui-1.41.0-py2.py3-none-any.whl.metadata (7.3 kB)
Collecting rich-click (from solara-server==1.41.0->solara-server[dev,starlette]==1.41.0->solara->mesa[rec])
  Downloading rich_click-1.8.4-py3-none-any.whl.metadata (7.9 kB)
Collecting watchdog (from solara-server[dev,starlette]==1.41.0->solara->mesa[rec])
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting watchfiles (from solara-s

In [2]:
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.datacollection import DataCollector
from mesa.time import RandomActivation
from mesa.visualization import SolaraViz
import seaborn as sns
import numpy as np
import pandas as pd
import random
from heapq import heappush, heappop

In [3]:
class SemaphoreAgent(Agent):
    """Represents a traffic semaphore."""

    def __init__(self, unique_id, model, positions, cycle_length=5):
        super().__init__(unique_id, model)
        self.positions = positions
        self.state = "green"  # Possible states: "green" or "red"
        self.timer = 0
        self.cycle_length = cycle_length

        # Initialize semaphore property layer
        value = 1 if self.state == "green" else 0
        for pos in self.positions:
            self.model.semaphore_layer.set_cell(pos, value)

    def step(self):
        """Advance the semaphore's state."""
        self.timer += 1
        if self.timer >= self.cycle_length:
            self.timer = 0
            # Toggle the semaphore state
            self.state = "red" if self.state == "green" else "green"
            # Update the semaphore property layer
            value = 1 if self.state == "green" else 0
            for pos in self.positions:
                self.model.semaphore_layer.set_cell(pos, value)

In [4]:
class CarAgent(Agent):
    """Represents a car navigating from one parking lot to another."""

    def __init__(self, unique_id, model, start_parking_lot, destination_parking_lot):
        super().__init__(unique_id, model)
        self.start_parking_lot = start_parking_lot
        self.destination_parking_lot = destination_parking_lot
        self.state = "moving"
        self.path = []  # Path will be calculated in the first step
        # Record the parking lot numbers
        self.start_parking_lot_number = start_parking_lot.number
        self.destination_parking_lot_number = destination_parking_lot.number

    def calculate_path(self, start, end):
        """Calculate the path using A* algorithm, considering property layers."""
        grid = self.model.grid
        open_set = []
        heappush(open_set, (0, start))
        came_from = {}
        g_score = {start: 0}
        f_score = {start: self.heuristic(start, end)}
        closed_set = set()

        while open_set:
            _, current = heappop(open_set)
            if current == end:
                return self.reconstruct_path(came_from, current)

            closed_set.add(current)
            neighbors = grid.get_neighborhood(
                current, moore=False, include_center=False
            )
            for neighbor in neighbors:
                if grid.out_of_bounds(neighbor):
                    continue
                if neighbor in closed_set:
                    continue
                # Avoid positions with buildings
                if self.model.building_layer.get_cell(neighbor) == 1:
                    continue
                # Avoid other cars
                cell_contents = self.model.grid.get_cell_list_contents([neighbor])
                if any(isinstance(agent, CarAgent) for agent in cell_contents):
                    continue
                tentative_g_score = g_score[current] + 1  # Assuming uniform cost
                if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g_score
                    f_score[neighbor] = tentative_g_score + self.heuristic(
                        neighbor, end
                    )
                    heappush(open_set, (f_score[neighbor], neighbor))
        return []

    def heuristic(self, a, b):
        """Manhattan distance heuristic for A* pathfinding."""
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    def reconstruct_path(self, came_from, current):
        """Reconstruct the path from start to end."""
        total_path = [current]
        while current in came_from:
            current = came_from[current]
            total_path.insert(0, current)
        return total_path

    def step(self):
        """Advance the car agent's state."""
        if not self.path:
            # Calculate path from current position to destination
            self.path = self.calculate_path(
                self.pos, self.destination_parking_lot.pos
            )
            if not self.path:
                # No path found; remove agent
                self.remove()
                return

        if self.pos == self.destination_parking_lot.pos:
            self.state = "arrived"
            # Handle arrival at destination
            self.destination_parking_lot.add_car()
            self.remove()
            return

        next_pos = self.path.pop(0)
        # Check if the cell is empty and within bounds
        if self.model.grid.out_of_bounds(next_pos):
            # Recalculate path if out of bounds
            self.path = self.calculate_path(
                self.pos, self.destination_parking_lot.pos
            )
            return

        # Check for obstacles
        if self.model.building_layer.get_cell(next_pos) == 1:
            # Recalculate path if building in the way
            self.path = self.calculate_path(
                self.pos, self.destination_parking_lot.pos
            )
            return

        cell_contents = self.model.grid.get_cell_list_contents([next_pos])
        if any(isinstance(agent, CarAgent) for agent in cell_contents):
            # Recalculate path if blocked by another car
            self.path = self.calculate_path(
                self.pos, self.destination_parking_lot.pos
            )
            return

        # Respect semaphores by checking the semaphore layer
        if self.model.semaphore_layer.get_cell(next_pos) == 0:
            # Semaphore is red; wait
            self.path.insert(0, next_pos)
            return

        # Move agent
        self.model.grid.move_agent(self, next_pos)

    def remove(self):
        """Remove the agent from the model and grid."""
        self.model.grid.remove_agent(self)
        self.model.schedule.remove(self)

In [5]:
class Building:
    def __init__(self, positions):
        self.positions = positions


class ParkingLot:
    def __init__(self, pos, number, capacity=3):
        self.pos = pos
        self.number = number  # Unique identifier for the parking lot
        self.capacity = capacity
        self.current_cars = capacity  # Start full

    def add_car(self):
        if self.current_cars < self.capacity:
            self.current_cars += 1
            return True
        return False

    def remove_car(self):
        if self.current_cars > 0:
            self.current_cars -= 1
            return True
        return False

In [6]:
class CityModel(Model):
    """A model representing a city with parking lots, cars, and semaphores."""

    def __init__(
        self,
        width,
        height,
        building_positions,
        parking_lot_positions,
        semaphore_positions,
        num_cars,
    ):
        super().__init__()
        self.width = width
        self.height = height
        self.schedule = RandomActivation(self)
        self.grid = MultiGrid(width, height, torus=False)

        # Initialize property layers
        self.semaphore_layer = self.grid.add_property_layer("semaphore")
        self.building_layer = self.grid.add_property_layer("building")
        self.parking_layer = self.grid.add_property_layer("parking")

        # Initialize collections
        self.buildings = []
        self.parking_lots = {}
        self.semaphores = []

        # Initialize data collector
        self.datacollector = DataCollector(
            model_reporters={"Num_Cars": lambda m: m.num_cars}
        )

        # Initialize components
        self.initialize_buildings(building_positions)
        self.initialize_parking_lots(parking_lot_positions)
        self.initialize_semaphores(semaphore_positions)
        self.spawn_car_agents(num_cars)

    def initialize_buildings(self, building_positions):
        """Initialize buildings in the model."""
        for positions in building_positions:
            building = Building(positions=positions)
            self.buildings.append(building)
            for pos in positions:
                # Set building property layer values
                self.building_layer.set_cell(pos, 1)  # Mark as occupied

    def initialize_parking_lots(self, parking_lot_positions):
        """Initialize parking lots in the model."""
        for number, pos in enumerate(parking_lot_positions, start=1):
            lot = ParkingLot(pos=pos, number=number, capacity=3)
            self.parking_lots[pos] = lot
            # Set parking property layer values
            self.parking_layer.set_cell(pos, lot.current_cars)

    def initialize_semaphores(self, semaphore_positions):
        """Initialize semaphore agents in the model."""
        for positions in semaphore_positions:
            semaphore = SemaphoreAgent(self.next_id(), self, positions)
            self.semaphores.append(semaphore)
            for pos in positions:
                self.grid.place_agent(semaphore, pos)
                # Initialize semaphore layer
                self.semaphore_layer.set_cell(pos, 1)  # Initially green
            self.schedule.add(semaphore)

    def spawn_car_agents(self, num_cars):
        """Spawn car agents in the model."""
        for _ in range(num_cars):
            start_lots = [
                lot for lot in self.parking_lots.values() if lot.current_cars > 0
            ]
            if not start_lots:
                break
            start_parking_lot = self.random.choice(start_lots)
            start_parking_lot.remove_car()  # Remove car from the starting parking lot
            dest_lots = [
                lot for lot in self.parking_lots.values() if lot != start_parking_lot
            ]
            if not dest_lots:
                break
            destination_parking_lot = self.random.choice(dest_lots)
            car_agent = CarAgent(
                self.next_id(), self, start_parking_lot, destination_parking_lot
            )
            self.grid.place_agent(car_agent, start_parking_lot.pos)
            self.schedule.add(car_agent)
            # Indicate car's start and destination parking lots
            print(
                f"Car {car_agent.unique_id} exits from Parking Lot {car_agent.start_parking_lot_number} and heads to Parking Lot {car_agent.destination_parking_lot_number}"
            )

    @property
    def num_cars(self):
        """Return the number of car agents currently in the model."""
        return sum(isinstance(agent, CarAgent) for agent in self.schedule.agents)

    def step(self):
        """Advance the model by one step."""
        self.datacollector.collect(self)
        self.schedule.step()