## Avance de proyecto 1 - Equipo 2 - Modelación de sistemas multiagentes

Simulación de un cruce de cuatro altos, donde la regla estándar es rotar turnos entre las 4 direcciones de llegada según hayan llegado los carros primero.
<br><br>
Tecnológico de Monterrey - 17 de noviembre de 2021
<br>
Código basado en la actividad 3 de Carlos del Rosal, integrante del equipo

In [168]:
!pip install mesa

# Paquete esencial que ayuda a modelar sistemas multiagentes
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.time import SimultaneousActivation
from mesa.datacollection import DataCollector

# Paquete matemático utilizado para matrices de declaración sencilla
import numpy as np

# Paquetes útiles para trabajar y graficar la animación de la simulación
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import pandas as pd

# Nativos de Python para medir tiempos y sus diferencias
import datetime
import time

# Nativo de Python para aleatorizar la aparición de carros
import random



In [169]:
# Clase para el ambiente "debajo" de los agentes móviles, calle, jardín o alto
class Terrain(Agent):
  # Constructor
  def __init__(self, id, model, terrain_type):
    # Construcción de la clase padre Agent
    super().__init__(id, model)
    self.id = id

    # Tipo de la celda terreno {0: Jardín, 1: Calle, 2: Alto}
    self.terrain_type = terrain_type

In [170]:
class Car(Agent):
  # Constructor
  def __init__(self, id, model, state, origin, destination, start_pos):
    # Construcción de la clase padre Agent
    super().__init__(id, model)
    self.id = id

    # Estado del vehículo {-1: Por destruir, 0: Detenido, 1: Avanzando}
    self.state = state
    self.queued = False

    # Datos de ubicación y dirección del movimiento del vehículo
    self.origin = origin
    self.destination = destination
    self.pos = start_pos
    self.next_pos = None

    # Desplazamiento inicial
    self.dx = -1 if self.origin == "West" else 1 if self.origin == "East" else 0
    self.dy = 1 if self.origin == "North" else -1 if self.origin == "South" else 0

  # Instante de acción, definición de cambios del agente en una nueva iteración
  def step(self):
    # Cambios de variables, no exactamente estados, que dependen de la posición
    # Si la posición requiere formarse en un 4 altos, así lo hace
    if self.pos in self.model.stop_points and not(self.queued):
      self.model.passing_queue.append(self.origin)
      self.queued = True
    # O bien, si se sale del 4 altos, así lo indica al resto de los carros
    elif self.pos in self.model.continue_points:
      self.model.passing_queue.pop(0)
      self.queued = False
    # Checa si self.dx y self.dy deben de ser modificados por dar una vuelta
    elif self.pos in self.model.cross_points:
      self.check_turn()

    # Siguiente posición según la dirección, puede que por el alto no se mueva ahí
    future_pos = (self.pos[0] + self.dx, self.pos[1] + self.dy)

    # Si la nueva posición saca al carro del modelo, llama su destrucción
    if self.model.grid.out_of_bounds(future_pos):
      self.state = -1
      return

    # Máquina de estados del carro - Forma de árbol para mejor lectura
    if self.state == 0:
      # No puede cambiar de detenido si el camino delante no está libre
      if self.see_free_road(future_pos):
        # No es necesario checar el alto en esta posición
        if self.pos not in self.model.stop_points: self.state = 1
        # Solo si el modelo dice que es turno de este carro se avanza
        elif self.model.passing_queue[0] == self.origin:
          # Por último, se checa que no haya ya un carro cruzando
          if not(self.model.car_crossing()):
            self.state = 1
    elif self.state == 1:
      # Detiene el carro si no hay espacio delante
      if not(self.see_free_road(future_pos)): self.state = 0
      # Solo podría detenerlo también estar en un alto
      elif self.pos in self.model.stop_points:
        # Por ejemplo, si es turno de otro carro, o aún hay un carro cruzando
        if self.model.passing_queue[0] != self.origin or self.model.car_crossing():
          self.state = 0

    # Solo guarda el desplazamiento si la máquina anterior así lo dice
    self.next_pos = future_pos if self.state == 1 else self.next_pos

  # Instante de acción, aplicación de cambios del agente en una nueva iteración
  def advance(self):
    # Avanza si se requiere, a menos de que sea necesario que sea destruído
    if self.state == 1:
      # Actualiza los valores y mueve al agente
      self.model.grid.move_agent(self, self.next_pos)
      self.pos = self.next_pos
    elif self.state == -1:
      # Destruye al agente desde el modelo mismo
      self.model.destroy_car(self)
  
  # Función de visión del espacio delante, true si se puede avanzar sin chocar
  def see_free_road(self, future_pos):
    for agent in self.model.grid.get_cell_list_contents(future_pos):
      if isinstance(agent, Car):
        # Solo regresa false para un carro parado, bien pueden avanzar juntos
        if agent.state == 0:
          return False
    return True

  def check_turn(self):
    # Casos donde nunca se da vuelta
    if (self.origin, self.destination) in [("North", "South"), ("South", "North"),
      ("West", "East"), ("East", "West")]: return
    
    # Vuelta al norte
    if self.destination == "North" and self.pos[0] == self.model.v_road[1]:
      self.dx, self.dy = [0, -1]
    # Vuelta al oeste
    elif self.destination == "West" and self.pos[1] == self.model.h_road[1]:
      self.dx, self.dy = [1, 0]
    # Vuelta al sur
    elif self.destination == "South" and self.pos[0] == self.model.v_road[0]:
      self.dx, self.dy = [0, 1]
    # Vuelta al este
    elif self.destination == "East" and self.pos[1] == self.model.h_road[0]:
      self.dx, self.dy = [-1, 0]

In [171]:
# Función auxiliar para capturar el modelo en un momento como datos
# Se almacenan: {0: Calle, 1: Jardín, 2: Carro, 3: Choque, 4: Alto}
def get_grid(model):
  # Itera el modelo llenando una cuadrilla que inicia vacía
  grid = np.zeros((model.grid.width, model.grid.height))
  for (cell_content, x, y) in model.grid.coord_iter():
    if len(cell_content) == 1 and isinstance(cell_content[0], Terrain):
      # Los únicos escenarios con un solo agente son los terrenos
      if cell_content[0].terrain_type in [0,1]:
        grid[x][y] = cell_content[0].terrain_type
      else:
        grid[x][y] = 4
    elif len(cell_content) == 2:
      # El único escenario con 2 agentes es el de un carro sobre la calle
      grid[x][y] = 2
    else:
      # El único escenario con 2+ agentes en un lugar es el choque
      grid[x][y] = 3

  # Transposición para que (x,y) queden como cartesianas y se anime Width*Height
  return np.transpose(grid)

In [172]:
class CrossroadModel(Model):
  # Constructor
  def __init__(self, M, N, SPAWN_RATE):
    # Inicialización de atributos para almacenar los datos recibidos
    self.M = M
    self.N = N
    self.SPAWN_RATE = SPAWN_RATE
    self.cars_spawned = 0
    
    # Creacíon de un Multigrid() para poder tener más de un agente por celda
    self.grid = MultiGrid(M, N, False)

    # Permite activar al mismo tiempo todos los componentes del modelo
    self.schedule = SimultaneousActivation(self)

    # Recolector de datos para futura representación gráfica
    self.datacollector = DataCollector(model_reporters = {"Grid": get_grid})

    # Definición de las calles y los puntos donde se crean y destruyen carros
    self.v_road = [M // 2 - 1, M // 2]
    self.h_road = [N // 2 - 1, N // 2]
    self.spawns = {
        "North": (self.v_road[0], 0), "West": (self.M - 1, self.h_road[0]),
        "South": (self.v_road[1], self.N - 1), "East": (0, self.h_road[1])
    }
    
    # Definición de los puntos para pararse por una señal de alto
    self.stop_points = [
        (self.v_road[0], self.h_road[0] - 1), # North
        (self.v_road[1] + 1, self.h_road[0]), # West
        (self.v_road[1], self.h_road[1] + 1), # South
        (self.v_road[0] - 1, self.h_road[1]) # East
    ]
    self.cross_points = [(v,h) for v in self.v_road for h in self.h_road]
    self.continue_points = [
        (self.v_road[1], self.h_road[0] - 1), # North
        (self.v_road[1] + 1, self.h_road[1]), # West
        (self.v_road[0], self.h_road[1] + 1), # South
        (self.v_road[0] - 1, self.h_road[0])  # East
    ]

    # Definición de las señales de alto decorativas
    stop_sign_pos = [
        (self.v_road[1] + 1, self.h_road[0] - 1), (self.v_road[1] + 1, self.h_road[1] + 1), 
        (self.v_road[0] - 1, self.h_road[1] + 1), (self.v_road[0] - 1, self.h_road[0] - 1)
    ]

    # Colocación de los terrenos calle o jardín en todas las coordenadas
    for (content, x, y) in self.grid.coord_iter():
      if x in self.v_road or y in self.h_road:
        # Crea un terreno con tipo 1 (Calle)
        new_terrain = Terrain((x, y), self, 1)
      elif (x, y) in stop_sign_pos:
        new_terrain = Terrain((x,y), self, 2)
      else:
        # Crea un terreno con tipo 0 (Jardín)
        new_terrain = Terrain((x, y), self, 0)
      self.grid.place_agent(new_terrain, (x, y))
    
    # Definición de la variable que permite el paso de un carro
    self.passing_queue = []

  # Unidad de cambio del modelo. También se llama a actuar a los agentes
  def step(self):
    self.spawn_cars()
    self.datacollector.collect(self)
    self.schedule.step()
  
  # Función que cada step coloca carros en cualquiera de las 4 direcciones
  def spawn_cars(self):
    for dir in self.spawns:
      # Considera también que no haya ya un carro ahí
      if (random.randint(1, 100) < self.SPAWN_RATE * 100 and
          not(self.cars_there(self.spawns[dir]))):
        # Se elige una dirección de fin que no sea la misma
        other_dir = dir
        while other_dir == dir: other_dir = random.choice([key for key in self.spawns])
        new_car = Car(self.cars_spawned, self, 1, dir, other_dir, self.spawns[dir])
        
        # Se coloca el agente creado con un id que se mantiene único
        self.grid.place_agent(new_car, new_car.pos)
        self.schedule.add(new_car)
        self.cars_spawned += 1
  
  # Función que elimina carros que hayan cumplido el recorrido
  def destroy_car(self, car_instance):
    self.grid.remove_agent(car_instance)
    self.schedule.remove(car_instance)
    
  # Función que checa las calles por un choque, sirve para detener la simulación
  def cars_crashed(self):
    # Checa la calle horizontal
    for i in range(self.M):
      if (self.cars_there((i, self.h_road[0])) > 1 or 
          self.cars_there((i, self.h_road[1])) > 1):
        self.datacollector.collect(self)
        return True
    # Checa la calle vertical
    for i in range(self.N):
      if (self.cars_there((self.v_road[0], i)) > 1 or 
          self.cars_there((self.v_road[0], i)) > 1):
        self.datacollector.collect(self)
        return True
    return False

  # Devuelve un entero indicando cuantos carros hay en la posición elegida
  def cars_there(self, pos):
    # Obtiene los agentes en la celda deseada y checa si son de tipo carro
    car_counter = 0
    agents_there = self.grid.get_cell_list_contents(pos)
    for agent in agents_there:
      if isinstance(agent, Car): car_counter += 1
    return car_counter
  
  # Indica si el punto medio de los 4 altos está libre de carros
  def car_crossing(self):
    for point in self.cross_points:
      agents_there = self.grid.get_cell_list_contents(point)
      for agent in agents_there:
        if isinstance(agent, Car): return True
    return False

In [173]:
# Parámetros de la simulación
# Una simulación interesante usa un tablero de 16x16 y un spawn rate de 0.1
M = 16
N = 16
SPAWN_RATE = 0.1
MAX_ITERATIONS = 200
i = 0

# Construcción del modelo y ejecución de las iteraciones
model = CrossroadModel(M, N, SPAWN_RATE)
while i < MAX_ITERATIONS and not(model.cars_crashed()):
  model.step()
  i += 1

In [174]:
# Recopila los datos del recolector por ser animados
all_grids = model.datacollector.get_model_vars_dataframe()

In [175]:
# Genera una animación con los datos anteriores
%%capture

# Modificación del mapa de colores para representación bonita
old_cmap = matplotlib.cm.get_cmap('viridis', 5)
old_colors = old_cmap(np.linspace(0, 1, 5))
old_colors[0] = np.array([96/256, 170/256, 70/256, 1]) # Verde pasto - Jardín
old_colors[1] = np.array([50/256, 50/256, 50/256, 1]) # Gris oscuro - Calle
old_colors[2] = np.array([0/256, 50/256, 100/256, 1]) # Azul oscuro - Carro
old_colors[3] = np.array([230/256, 100/256, 20/256, 1]) # Naranja - Choque
old_colors[4] = np.array([256/256, 0/256, 0/256, 1]) # Rojo puro - Señal de alto
new_cmap = matplotlib.colors.ListedColormap(old_colors)

# Modificación de parámetros de matplotlib para una impresión mejor
plt.rcParams["animation.html"] = "jshtml"
plt.rcParams["axes.titlesize"] = 28
matplotlib.rcParams['animation.embed_limit'] = 2**128

# Construcción y personalización de la gráfica animada
fig, axs = plt.subplots(figsize=(7,7))
axs.set_title("4-way stop signs simulation")
axs.set_xticks([])
axs.set_yticks([])
patch = plt.imshow(all_grids.iloc[0][0], cmap=new_cmap)

# Función que selecciona datos para cada frame i
def animate(i):
    patch.set_data(all_grids.iloc[i][0])

# Creación del objeto animación
crossroad_simulation = animation.FuncAnimation(fig, animate, 
                                               frames = all_grids.size)

In [176]:
# Corre la animación generada
crossroad_simulation