In [187]:
import random
from enum import Enum
from math import sqrt
from random import choice
from typing import Tuple, TypedDict, List

import matplotlib
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML
from IPython.display import clear_output
from mesa import Agent, Model
from mesa.datacollection import DataCollector
from mesa.space import MultiGrid, Position
from mesa.time import SimultaneousActivation

In [188]:
class DropOff:
    """Pairs an agent with the grid position where it should be placed.

    Attributes:
        agent: The agent waiting to be put back into the simulation.
        position: Grid coordinates at which the agent is to be placed.
    """

    def __init__(self, agent: Agent, position: Position) -> None:
        self.agent, self.position = agent, position

In [189]:
def get_status_grid(model):
    """Encode the model's grid as a 2-D array of numeric display codes.

    The returned array has shape ``(grid.height, grid.width)`` and is indexed
    ``[y][x]``.  Codes: car = 0, person = 255, block = 50, street = 100,
    intersection = 150.  Agents sharing a coordinate are visited in the order
    the grid reports them, so the last recognised agent decides the value.

    Args:
        model: Object exposing ``grid`` with ``height``, ``width`` and
            ``coord_iter()`` yielding ``(content, x, y)`` triples.

    Returns:
        ``numpy.ndarray`` of floats holding one code per coordinate.
    """
    tile_codes = {
        Cell.Category.Block: 50,
        Cell.Category.Street: 100,
        Cell.Category.Intersection: 150,
    }

    grid = model.grid
    status = np.zeros((grid.height, grid.width))

    for (occupants, x, y) in grid.coord_iter():
        for occupant in occupants:
            if isinstance(occupant, Car):
                code = 0
            elif isinstance(occupant, Person):
                code = 255
            elif isinstance(occupant, Cell):
                # Unknown categories yield None and leave the entry untouched.
                code = tile_codes.get(occupant.category)
            else:
                code = None

            if code is not None:
                status[y, x] = code

    return status

In [190]:
class Directions(Enum):
    """Compass headings shared by moving agents and traffic lights.

    Definition order matters: iterating the enum yields North, East, South,
    West, and code that builds a list from it relies on that sequence.
    """

    North = 'NORTH'  # increasing y
    East = 'EAST'    # increasing x
    South = 'SOUTH'  # decreasing y
    West = 'WEST'    # decreasing x

In [191]:
class Cell(Agent):
    """Static grid tile: a city block, a street segment or an intersection.

    Intersection tiles double as traffic lights.  ``is_green`` holds the
    ``Directions`` member currently allowed through; it advances through
    North -> East -> South -> West whenever ``steps_elapsed`` is a multiple
    of ``green_duration``.  Because ``steps_elapsed`` starts at 0, the first
    rotation happens on the very first step.  Block and street tiles carry
    the same attributes but never rotate.

    Args:
        unique_id: Identifier handed to the Mesa ``Agent`` base class.
        model: Owning model.
        category: Which kind of tile this is.
        green_duration: Number of steps between light changes (default 10,
            the value that used to be hard-coded).

    Raises:
        ValueError: If ``green_duration`` is not a positive integer.
    """

    class Category(Enum):
        Block = "BLOCK"
        Intersection = "INTERSECTION"
        Street = "STREET"

    def __init__(self, unique_id: str, model: Model, category: Category,
                 green_duration: int = 10) -> None:
        if green_duration <= 0:
            raise ValueError("green_duration must be a positive number of steps")

        super().__init__(unique_id, model)
        self.category = category
        self.steps_elapsed = 0
        self.green_duration = green_duration

        # Phases are visited in enum definition order.
        self.traffic_directions = list(Directions)
        self.index = 0
        self.is_green = self.traffic_directions[self.index]
        self.next_is_green = None

    def step(self):
        """Compute the light for the next tick without applying it yet."""
        self.next_is_green = self.is_green

        if (self.category == Cell.Category.Intersection
                and self.steps_elapsed % self.green_duration == 0):
            self.index += 1
            phase = self.index % len(self.traffic_directions)
            self.next_is_green = self.traffic_directions[phase]

        self.steps_elapsed += 1

    def advance(self):
        """Apply the light computed in ``step``."""
        self.is_green = self.next_is_green

In [192]:
class MovingAgent(Agent):
    """Base class for agents that travel across the grid to a destination.

    Subclasses are expected to provide ``direction`` (a ``Directions`` member
    or ``None``) and ``next_position``; ``move`` reads the former and writes
    the latter.
    """

    # (dx, dy) displacement of a single step for each heading.
    _STEP_DELTAS = {
        Directions.North: (0, 1),
        Directions.East: (1, 0),
        Directions.South: (0, -1),
        Directions.West: (-1, 0),
    }

    def __init__(self, unique_id: str, model: Model, destination: Position):
        super().__init__(unique_id, model)
        self.destination = destination

    def is_in_same_direction(self, agent_1: Agent, agent_2: Agent):
        """Return True if ``agent_2`` lies in the heading of ``agent_1``.

        Raises:
            ValueError: If both agents occupy the same position.
        """
        return agent_1.direction == self.get_direction_to_position(agent_1.pos, agent_2.pos)

    def calculate_distance(self, pos1: Position, pos2: Position):
        """Return the Euclidean distance between two ``(x, y)`` positions."""
        pos1_x, pos1_y = pos1
        pos2_x, pos2_y = pos2

        diff_y = pos2_y - pos1_y
        diff_x = pos2_x - pos1_x

        return sqrt(pow(diff_y, 2) + pow(diff_x, 2))

    def get_direction_to_position(self, origin: Position, destination: Position) -> Directions:
        """Return the heading that leads from ``origin`` toward ``destination``.

        The x axis takes precedence: the y axis is only consulted when both
        positions share the same x.

        Raises:
            ValueError: If both positions are identical.
        """
        x_origin, y_origin = origin
        x_destination, y_destination = destination

        if x_destination < x_origin:
            return Directions.West
        elif x_destination > x_origin:
            return Directions.East
        elif y_destination > y_origin:
            return Directions.North
        elif y_destination < y_origin:
            return Directions.South
        else:
            raise ValueError("Origin and destination are the same position")

    def get_cell_on(self):
        """Return the ``Cell`` tile at this agent's position, or ``None``."""
        # Look only at the agent's own coordinate instead of collecting the
        # whole neighbourhood and filtering it back down to one position.
        for occupant in self.model.grid.get_cell_list_contents([self.pos]):
            if isinstance(occupant, Cell):
                return occupant

        return None

    # Movement logic goes here
    def move(self):
        """Set ``next_position`` one tile ahead in the current ``direction``.

        Does nothing when ``direction`` is not a ``Directions`` member
        (for example ``None``).  No bounds check is performed here.
        """
        delta = self._STEP_DELTAS.get(self.direction)
        if delta is None:
            return

        x, y = self.pos
        self.next_position = (x + delta[0], y + delta[1])

In [193]:
class Car(MovingAgent):
    """Vehicle that drives along streets to ``destination`` carrying people.

    State machine (as driven by ``step``):
      * ``Stopped`` / ``Unstoppable`` -- choose a heading and start moving.
      * ``Moving`` -- keep going on street tiles; on an intersection tile
        switch to ``Unstoppable`` so a new heading is chosen next step.
      * ``Inactive`` -- set once the car is on its destination.

    ``step`` only computes ``next_state`` / ``next_position``; ``advance``
    applies them.
    """

    class States(Enum):
        Moving = 0
        Stopped = 1
        Unstoppable = 2
        Inactive = 3

    def __init__(self, unique_id: str, model: Model, destination: Position) -> None:
        super().__init__(unique_id, model, destination)
        self.state = Car.States.Stopped
        self.people_on_board = []
        self.direction = None
        self.next_position = self.pos
        # Mirror the current state so advance() can never overwrite ``state``
        # with None when a step leaves next_state untouched.
        self.next_state = self.state
        self.capacity = 2

    def check_neighbors(self):
        """Decide whether something adjacent forces the car to stop.

        Returns:
            True (after setting ``next_state`` to ``Stopped``) when either
            an adjacent car that is not ``Stopped`` lies in this car's
            heading, or an adjacent intersection lies in this car's heading
            and its green direction differs from the heading.  False otherwise.
        """
        neighbors = self.model.grid.get_neighbors(
            self.pos,
            moore=False,
            include_center=False
        )
        for neighbor in neighbors:
            # NOTE(review): only cars that are NOT stopped block this car;
            # a stopped car ahead is ignored -- confirm this is intended.
            if (isinstance(neighbor, Car)
                    and neighbor.state != Car.States.Stopped
                    and self.is_in_same_direction(self, neighbor)):
                self.next_state = Car.States.Stopped
                return True

            if isinstance(neighbor, Cell) and neighbor.category == Cell.Category.Intersection:
                if (neighbor.is_green != self.direction
                        and self.get_direction_to_position(self.pos, neighbor.pos) == self.direction):
                    self.next_state = Car.States.Stopped
                    return True

        return False

    def check_offboard(self):
        """Drop every passenger who has started moving away from their goal.

        For each passenger the distance from the car to the passenger's
        destination is compared with the best distance recorded so far:
        if it has not grown, the record is updated; otherwise the passenger
        is dropped off at the car's current position.
        """
        # Iterate over a snapshot: offboard() removes from people_on_board,
        # and mutating the list being iterated would skip passengers.
        for person in list(self.people_on_board):
            current_distance = self.calculate_distance(self.pos, person.destination)
            if current_distance <= person.distance_to_destination:
                person.distance_to_destination = current_distance
                # Keep checking the remaining passengers instead of bailing
                # out after the first one that is still getting closer.
                continue
            self.offboard(person)

    def can_board(self):
        """Return True while there is a free seat."""
        return len(self.people_on_board) < self.capacity

    def board(self, person):
        """Take ``person`` on board and ask the model to remove them from the grid."""
        self.people_on_board.append(person)
        self.model.eventually_remove(person)

    def offboard(self, person):
        """Drop ``person`` at the car's position and queue them for placement.

        The passenger may no longer be scheduled, so its ``advance`` might
        never run to copy ``next_state`` into ``state``.  ``Person.step``
        restarts from ``state``, so both fields are set here; otherwise the
        passenger would stay ``Onboard`` forever after being placed back.
        """
        person.state = Person.States.Stopped
        person.next_state = Person.States.Stopped
        person.boarded_car = None

        drop_off = DropOff(person, self.pos)
        self.people_on_board.remove(person)
        self.model.eventually_place(drop_off)

    def check_same_position(self, position_1, position_2):
        """Return True when both ``(x, y)`` positions coincide."""
        x_1, y_1 = position_1
        x_2, y_2 = position_2

        return x_1 == x_2 and y_1 == y_2

    def step(self):
        """Plan this tick: passengers, obstacles, arrival, then movement."""
        self.next_position = self.pos

        self.check_offboard()

        if self.check_neighbors():
            return

        if self.check_same_position(self.pos, self.destination):
            # Snapshot again: offboard() shrinks people_on_board as we go.
            for person in list(self.people_on_board):
                self.offboard(person)
            self.next_state = Car.States.Inactive
            return

        # Original author's note: "maybe this is not necessary?"
        if (self.direction is None
                or self.state == Car.States.Stopped
                or self.state == Car.States.Unstoppable):
            neighbors = self.model.grid.get_neighbors(
                self.pos,
                moore=False,
                include_center=False
            )

            closest_pos, closest_distance = None, float("inf")

            for neighbor in neighbors:
                if not isinstance(neighbor, Cell):
                    continue

                if neighbor.category != Cell.Category.Block:
                    distance = self.calculate_distance(neighbor.pos, self.destination)
                    if distance < closest_distance:
                        closest_pos, closest_distance = neighbor.pos, distance

                # NOTE(review): an adjacent intersection overrides the target
                # without updating closest_distance, so a later, closer tile
                # can still replace it -- the outcome depends on neighbor
                # order.  Kept as-is; confirm the intended priority.
                if neighbor.category == Cell.Category.Intersection:
                    closest_pos = neighbor.pos

            if closest_pos is not None:
                self.direction = self.get_direction_to_position(self.pos, closest_pos)
                self.next_state = Car.States.Moving

            self.move()

            return

        if self.state == Car.States.Moving:
            cell_on = self.get_cell_on()

            if cell_on is not None:
                if cell_on.category == Cell.Category.Intersection:
                    # Pause on the intersection; a heading is chosen next step.
                    self.next_state = Car.States.Unstoppable

                if cell_on.category == Cell.Category.Street:
                    self.move()

    def advance(self):
        """Apply the state and position planned in ``step``."""
        self.state = self.next_state
        self.model.grid.move_agent(self, self.next_position)

In [194]:
class Person(MovingAgent):
    """Pedestrian that walks toward ``destination`` and may hitch a ride.

    A person only walks on ticks where ``model.counter`` is a multiple of 5,
    waits at adjacent intersections whose light is on the cross axis, and
    boards an adjacent car with a free seat when that car's destination is
    closer to the person's goal than the person currently is.
    """

    class States(Enum):
        Walking = 0
        Stopped = 1
        Onboard = 2
        Inactive = 3

    def __init__(self, unique_id: str, model: Model, origin: Position, destination: Position) -> None:
        super().__init__(unique_id, model, destination)
        self.state = Person.States.Stopped
        self.next_state = None
        self.direction = Directions.North
        self.next_position = self.pos
        self.boarded_car = None
        # Best distance to the goal seen so far; starts as origin -> goal.
        self.distance_to_destination = self.calculate_distance(origin, destination)

    def get_opposite_direction(self):
        """Return the heading opposite to ``direction``.

        Raises:
            ValueError: If ``direction`` is not a ``Directions`` member.
        """
        opposites = {
            Directions.North: Directions.South,
            Directions.South: Directions.North,
            Directions.West: Directions.East,
            Directions.East: Directions.West,
        }
        if self.direction not in opposites:
            raise ValueError("Invalid direction")
        return opposites[self.direction]

    def is_closer_to_destination(self, pos):
        """Return True if ``pos`` is strictly nearer the goal than ``self.pos``."""
        from_candidate = self.calculate_distance(self.destination, pos)
        from_here = self.calculate_distance(self.pos, self.destination)
        return from_candidate < from_here

    def check_neighbors(self):
        """React to adjacent intersections and cars.

        Returns:
            True when the person has to wait at a light (``next_state`` set
            to ``Stopped``) or has boarded a car (``next_state`` set to
            ``Onboard``); False when nothing adjacent applies.
        """
        adjacent = self.model.grid.get_neighbors(
            self.pos,
            moore=False,
            include_center=False
        )

        for other in adjacent:
            if isinstance(other, Cell) and other.category == Cell.Category.Intersection:
                # Wait when the light is green for neither this heading nor
                # its opposite and the intersection is straight ahead.
                must_wait = (
                    other.is_green != self.direction
                    and other.is_green != self.get_opposite_direction()
                    and self.get_direction_to_position(self.pos, other.pos) == self.direction
                )
                if must_wait:
                    self.next_state = Person.States.Stopped
                    return True

            if isinstance(other, Car):
                if other.can_board() and self.is_closer_to_destination(other.destination):
                    self.boarded_car = other
                    self.next_state = Person.States.Onboard
                    other.board(self)
                    return True

        return False

    def _closest_walkable_position(self):
        """Return the adjacent non-block tile position nearest the goal, or None."""
        adjacent = self.model.grid.get_neighbors(
            self.pos,
            moore=False,
            include_center=False
        )
        walkable = (
            tile for tile in adjacent
            if isinstance(tile, Cell) and tile.category != Cell.Category.Block
        )
        # min() keeps the first of several equally distant tiles.
        nearest = min(
            walkable,
            key=lambda tile: self.calculate_distance(tile.pos, self.destination),
            default=None,
        )
        return None if nearest is None else nearest.pos

    def step(self):
        """Plan this tick; ``advance`` applies the result."""
        self.next_position = self.pos
        self.next_state = self.state

        # Passengers do nothing on their own.
        if self.state == Person.States.Onboard:
            return

        if self.check_neighbors():
            return

        # Walking only happens every fifth tick of the model counter.
        if self.model.counter % 5 != 0:
            return

        if self.pos == self.destination:
            self.next_state = Person.States.Inactive
            return

        if self.direction is None or self.state == Person.States.Stopped:
            target = self._closest_walkable_position()

            if target is not None:
                self.direction = self.get_direction_to_position(self.pos, target)
                self.next_state = Person.States.Walking

            self.move()
            return

        if self.direction is not None and self.state == Person.States.Walking:
            tile = self.get_cell_on()

            if tile is not None:
                if tile.category == Cell.Category.Intersection:
                    # Pause so a fresh heading is chosen on a later tick.
                    self.next_state = Person.States.Stopped
                elif tile.category == Cell.Category.Street:
                    self.move()

    def advance(self):
        """Apply the planned state and, if still attached to a model, the move."""
        self.state = self.next_state
        if self.model is not None:
            self.model.grid.move_agent(self, self.next_position)
    

In [195]:
class CityGridModel(Model):
    """City of square blocks separated by one-tile-wide streets.

    The grid is ``x_blocks * (block_size + 1) + 1`` tiles wide and
    ``y_blocks * (block_size + 1) + 1`` tiles high and does not wrap around.
    Every coordinate holds one ``Cell``; cars and people are placed on
    randomly chosen street tiles.

    Args:
        block_size: Side length of a block, in tiles.
        x_blocks: Number of blocks along x.
        y_blocks: Number of blocks along y.
        num_people: Number of ``Person`` agents to create.
        num_cars: Number of ``Car`` agents to create.
        **kwargs: Stored on ``self.kwargs``.  Mesa's ``Model`` reads a
            ``seed`` keyword to seed ``self.random``, which is what places
            the agents.
    """

    def __init__(self, block_size, x_blocks, y_blocks, num_people, num_cars, **kwargs) -> None:
        # Let Mesa initialise its own bookkeeping before anything is added.
        super().__init__()

        self.num_people = num_people
        self.num_cars = num_cars
        self.kwargs = kwargs

        self.counter = 0

        self.street_positions = []
        self.intersection_positions = []

        # Queues filled by agents during a step and drained in step();
        # created before any agent exists so they are always available.
        self.agents_to_remove = []
        self.agents_to_place = []

        self.grid = MultiGrid(x_blocks * (block_size + 1) + 1,
                              y_blocks * (block_size + 1) + 1, False)
        self.schedule = SimultaneousActivation(self)

        self.setup_grid(block_size)
        self.setup_cars()
        self.setup_people()

        self.datacollector = DataCollector(
            model_reporters={"Grid": get_status_grid}
        )

    def setup_grid(self, block_size):
        """Create one ``Cell`` per coordinate and record street/intersection tiles.

        A coordinate is an intersection when both x and y are multiples of
        ``block_size + 1``, a street when exactly one of them is, and a block
        otherwise.
        """
        period = block_size + 1

        for (content, x, y) in self.grid.coord_iter():
            cell_id = f"Cell-{x},{y}"
            position = (x, y)
            on_vertical_street = x % period == 0
            on_horizontal_street = y % period == 0

            if on_vertical_street and on_horizontal_street:
                new_cell = Cell(cell_id, self, Cell.Category.Intersection)
                self.intersection_positions.append(position)
            elif on_vertical_street or on_horizontal_street:
                new_cell = Cell(cell_id, self, Cell.Category.Street)
                self.street_positions.append(position)
            else:
                new_cell = Cell(cell_id, self, Cell.Category.Block)

            self.schedule.add(new_cell)
            self.grid.place_agent(new_cell, position)

    def setup_cars(self):
        """Create ``num_cars`` cars with random street origins and destinations."""
        for i in range(self.num_cars):
            origin = self.get_random_street_position()
            destination = self.get_random_street_position()

            new_car = Car(f"Car-{i+1}", self, destination)

            self.schedule.add(new_car)
            self.grid.place_agent(new_car, origin)

    def setup_people(self):
        """Create ``num_people`` people with random street origins and destinations."""
        for i in range(self.num_people):
            origin = self.get_random_street_position()
            destination = self.get_random_street_position()

            new_person = Person(f"Person-{i+1}", self, origin, destination)

            self.schedule.add(new_person)
            self.grid.place_agent(new_person, origin)

    def get_random_street_position(self) -> Tuple[int, int]:
        """Return a random street coordinate.

        Uses the model's own random generator so that constructing the model
        with ``seed=...`` makes agent placement reproducible.
        """
        return self.random.choice(self.street_positions)

    def print_city(self):
        """Plot how many agents occupy each coordinate.

        The count array is indexed ``[x][y]``, so image rows correspond to x.
        """
        agent_counts = np.zeros((self.grid.width, self.grid.height))

        for cell in self.grid.coord_iter():
            cell_content, x, y = cell
            agent_count = len(cell_content)
            agent_counts[x][y] = agent_count
        plt.imshow(agent_counts, interpolation="nearest")
        plt.colorbar()

    def eventually_remove(self, agent):
        """Queue ``agent`` for removal from schedule and grid after this step."""
        self.agents_to_remove.append(agent)

    def eventually_place(self, agent):
        """Queue a ``DropOff`` whose agent is placed after this step."""
        self.agents_to_place.append(agent)

    def step(self):
        """Advance one tick, then apply the queued removals and placements."""
        self.counter += 1
        self.datacollector.collect(self)
        self.schedule.step()

        # Swap each queue for a fresh list before draining it.  Removing
        # items from a list while iterating over it skips every other entry,
        # which left agents behind and could make a later schedule.add()
        # hit an agent that was still scheduled.
        pending_removals, self.agents_to_remove = self.agents_to_remove, []
        for agent in pending_removals:
            self.schedule.remove(agent)
            self.grid.remove_agent(agent)

        # Removals run first so an agent that boarded and was dropped within
        # the same tick is taken off before being placed again.
        pending_drop_offs, self.agents_to_place = self.agents_to_place, []
        for drop_off in pending_drop_offs:
            self.schedule.add(drop_off.agent)
            self.grid.place_agent(drop_off.agent, drop_off.position)


In [196]:
# Fixed seed so that Restart & Run All reproduces the same agent placement.
SEED = 42

# Seeds the global generator behind `random.choice`.
random.seed(SEED)

my_model = CityGridModel(
    block_size=4,
    x_blocks=3,
    y_blocks=3,
    num_people=1,
    num_cars=3,
    # Accepted through **kwargs; Mesa's Model uses `seed` for its own
    # per-model random generator.
    seed=SEED,
)

iterations = 200

for _step in range(iterations):
    my_model.step()

# One row per step; the "Grid" column holds the array collected at that step.
data = my_model.datacollector.get_model_vars_dataframe()


In [197]:
%%capture
fig, axs = plt.subplots(figsize=(7, 7))

# Select the collected grids by column label; integer keys on a
# label-indexed Series (`data.iloc[i][0]`) rely on a deprecated positional
# fallback in recent pandas.
grid_frames = data["Grid"]

patch = axs.imshow(grid_frames.iloc[0],
                   cmap=plt.cm.plasma,
                   origin="lower",
                   vmin=0,
                   vmax=255)
axs.set_title("City grid simulation")

plt.show()

def animate(index):
    """Show the grid collected at step ``index``."""
    patch.set_data(grid_frames.iloc[index])


# Take the frame count from the collected data so the animation cannot run
# past the last recorded step.
anim = animation.FuncAnimation(fig, animate, frames=len(grid_frames))

In [198]:
# Render the animation as an HTML/JavaScript player in the cell output.
HTML(anim.to_jshtml())