# M3. Actividad

Realizado por Andrea Corona Arroyo A01366768

Link a Github: https://github.com/AndreaCorona523/MultiAgentes.git

Modela, de manera individual, el sistema multiagente necesario para simular una intersección controlada por señales de semáforos inteligentes:

- Mientras no haya un vehículo cercano, el semáforo estará en luz amarilla. 
- Cuando un vehículo se acerque a la intersección, enviará un mensaje con el tiempo estimado de arribo.
- El semáforo dará luz verde al semáforo más cercano y establecerá un programa de luces a partir de ese punto para el resto de los vehículos.

Detalla cuáles sería los agentes involucrados, qué tipo de agente sería y forma de interacción entre ellos. Adicionalmente, desarrolla una simulación implemente tu modelo.

# Agentes e Interacciones

En el modelo que he diseñado, se cuentan con dos tipos de agente: Auto y Semáforo. Como sus nombres lo indican el primero representa los automóviles que circulan por la calle y el segundo son los semáforos que indican qué automóviles pueden cruzar y cuáles no. 

A continación se detalla el agente tipo Auto:
- Tipo de agente: Auto
- Medidas de rendimiento: Seguro y rápido, esto de manera que pueda a llegar a su destino sin inconvenientes y que no se quede detenido en un semáforo por mucho tiempo.
- Entorno: Calles, otro trafico (coches), semáforos.
- Actuadores: Dirección, freno, señal. 
- Sensores: GPS, dado que posee instrucciones al girar hacia los lados. 

Este tipo de agentes es reactivo simple, dado que tiene reglas de reglas de situación-acción, las cuales son las siguientes:
- En cada paso elige aleatoriamente si sigue derecho, da vuelta a la izquierda o a la derecha.
- Si el semáforo no está en rojo o el auto no se encuentra a punto de entrar a la intersección, el auto puede avanzar. 
- Si el auto desea moverse a un lugar, donde se encuentra otro auto, este espera en su posición. 
- El auto envía una señal cuando se encuentra cerca del semáforo.

A continación se detalla el agente tipo Semáforo:
- Tipo de agente: Semáforo
- Medidas de rendimiento: Movibilidad vehicular eficiente, sin tráfico pesado. 
- Entorno: Calles, coches, otros semáforos.
- Actuadores: Luz.
- Sensores: Sensor de señal.

Este tipo de agentes es reactivo simple, dado que tiene reglas de reglas de situación-acción, las cuales son las siguientes:
- Si hay un agente a cerca, cambia su luz a verde.
- Si hay autos cerca de dos semáforos en calles con direcciones opuestas, ambos se van a prender. 
- Si detecta una señal, sólo se cambiará de verde a rojo, hasta que haya detectado que el cruce esté libre (sin autos en circulación).
- Si no hay autos cercanos, el semáforo permanece en amarillo.

Por otro lado, el modelo va a ser la calle por la que circulan los automóviles.

Las interacciones entre ellos, se definen por las reglas establecidas previamente. En otras palabras, la interacción principal será que el coche envía una señal que el semáforo recibe y este último actúa en consecuencia, así como el estado del semáforo, afecta el comportamiento del auto. El modelo regula la actualizaron de las señales y los semáforos. 

## Imports

Antes de empezar a crear el modelo de los robots de limpieza con multiagentes es necesario tener instalado los siguientes paquetes:
- `python`
- `mesa`: el framework de Python para el modelado de agentes.
- `numpy`
- `matplotlib`

Para poder modelar el juego de la vida usando el framework de `mesa` es necesario importar dos clases: una para el modelo general, y otro para los agentes. 

In [1]:
# La clase `Model` se hace cargo de los atributos a nivel del modelo, maneja los agentes. 
# Cada modelo puede contener múltiples agentes y todos ellos son instancias de la clase `Agent`.
from mesa import Agent, Model 

# Debido a que no se permitirá más de un agente por celda elegimos `SingleGrid`.
from mesa.space import SingleGrid

# Con `SimultaneousActivation` hacemos que todos los agentes se activen de manera simultanea.
from mesa.time import SimultaneousActivation

# Vamos a hacer uso de `DataCollector` para obtener el grid completo cada paso (o generación) y lo usaremos para graficarlo.
from mesa.datacollection import DataCollector

# mathplotlib lo usamos para graficar/visualizar como evoluciona el autómata celular.
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
plt.rcParams["animation.html"] = "jshtml"
matplotlib.rcParams['animation.embed_limit'] = 2**128

# Definimos los siguientes paquetes para manejar valores númericos.
import numpy as np
import pandas as pd
import random

# Definimos otros paquetes que vamos a usar para medir el tiempo de ejecución de nuestro algoritmo.
import time
import datetime

# Creación del Modelo

In [2]:
def obtenerCalle(model):
    '''
    Esta es una función auxiliar que nos permite guardar el grid para cada uno de los agentes.
    param model: El modelo del cual optener el grid.
    return una matriz con la información del grid del agente.
    '''
    grid = np.zeros((model.grid.width, model.grid.height))
    for cell in model.grid.coord_iter():
        cell_content, x, y = cell
        if isinstance(cell_content, CarAgent):
            grid[x][y] = 0
        elif isinstance(cell_content, TrafficLight):
            grid[x][y] = cell_content.status
        else:
            grid[x][y] = 1
    return grid

class CarAgent(Agent):
    '''
    Representa a un coche.
    '''
    def __init__(self, unique_id, model,orientation, direction):
        '''
        Agente representa un automóvil, se inicializa con un id, con el numero de movimientos realizados en 0, 
        con su siguiente posición en None, su orientación (V - vertical y H - horizontal),
        su direccion (U - arriba y D - abajo), el tipo de movimiento que va a 
        realizar (F - adelante, TR - vuelta a la derecha y TL - vuelta a la izquierda), una lista vacía de pasos 
        siguientes, y con el color del coche
        '''
        
        super().__init__(unique_id, model)
        self.numMovimientos = 0
        self.nextPos = None
        self.orientation = orientation
        self.direction = direction
        self.nextSteps = []
        self.movement = "F"
        self.color = random.uniform(0, 0.29)
    
    def moveForward(self):
        '''
        Este método se utiliza para avanzar el agente
        '''
        nextX = self.pos[0]
        nextY = self.pos[1]
        if self.orientation == "V":
            if self.direction == "U":
                nextX -= 1
            else:
                nextX += 1
        else:
            if self.direction == "U":
                nextY += 1
            else:
                nextY -= 1
        self.nextPos = (nextX, nextY)
           
    def turnLeft(self): 
        '''
        Este método se utiliza para girar el agente a la izquierda
        '''
        if self.orientation == "V":
            self.orientation = "H"
            if self.direction == "U":
                self.direction = "D"
            else:
                self.direction = "U"
        else:
            self.orientation = "V"

    def turnRight(self):
        '''
        Este método se utiliza para girar el agente a la derecha
        '''
        if self.orientation == "V":
            self.orientation = "H"
        else:
            self.orientation = "V"
            if self.direction == "U":
                self.direction = "D"
            else:
                self.direction = "U"
       
    def findClosestTrafficLight(self):
        '''
        Este método se utiliza para encontrar el semáforo más cercano
        '''
        if self.orientation == "V":
            if self.direction == "U":
                return 3
            return 1
        else:
            if self.direction == "U":
                return 0
            return 2
        
    def closestTrafficLight(self):
        '''
        Este método se utiliza para calcular la distancia entre el auto y el semáforo
        '''
        idx = self.findClosestTrafficLight()
        if idx % 2 == 0: 
            return idx,  abs(self.pos[1] - model.trafficLights[idx].pos[1])
        return idx, abs(self.pos[0] - model.trafficLights[idx].pos[0])
    
    def sendSignal(self):
        '''
        Este método se utiliza para enviar la señal al semáforo.
        '''
        idx, dist = self.closestTrafficLight()
        if dist <= 2:
            if not self.unique_id in model.trafficLights[idx].signals and not model.carInIntersection(self):
                model.trafficLights[idx].signals.append(self.unique_id)
                
                return True
        return False
        
    def checkTrafficLight(self):
        '''
        Este método se utiliza para revisar si el semáforo más cercano está en rojo y si puede avanzar
        '''
        idx, dist = self.closestTrafficLight()
        if model.trafficLights[idx].status == model.trafficLights[idx].RED and dist == 0 and len(self.nextSteps) == 0:
            return False
        return True
                 
    def movementTurn(self, direc):
        '''
        Este método se utiliza para realizar la vuelta dependiendo si es a la izquierda o derecha
        '''
        if direc == "TL":
            self.nextSteps.extend(["F", "TL"])
        else:
            self.nextSteps.extend(["F", "F", "TR"])
            
    def step(self):
        '''
        En este método el agente realiza las acciones del agente por cada paso.
        '''
        
        move = "S"
        #Checa el siguiente movimiento del auto
        if self.checkTrafficLight():
            move = "F"
            if self.closestTrafficLight()[1] == 0 and self.movement == "F":
                self.movement = random.choice(["F", "TR", "TL"])
                
            if (self.movement == "TL" or self.movement == "TR") and len(self.nextSteps) == 0:
                self.movementTurn(self.movement)

            if len(self.nextSteps) > 0:
                move = self.nextSteps.pop(0)
                
        if move == "TR":
            self.turnRight()
            self.movement = "F"
        elif move == "TL":
            self.turnLeft()
            self.movement = "F"
            
        
        if (self.movement == "F" or move == "F") and move != "S":
            self.moveForward()
            
             
        #Actualizar la posición en la lista de posiciones de los autos
        
        carsPositions = [car.nextPos for car in model.cars if car != self]
        
        if self.nextPos in carsPositions:
            self.nextPos = self.pos
            if (self.movement == "TL" or self.movement == "TR") and move == "F":
                self.nextSteps.insert(0, "F")
        else:
            self.numMovimientos += 1
        
        self.sendSignal()
            

    def advance(self):
        '''
        Define el nuevo estado calculado del método step.
        '''
        self.model.grid.move_agent(self, self.nextPos)
        self.numMovimientos += 1

class TrafficLight(Agent):
    '''
    Representa a un semáforo con los tres colores verde, amarillo y rojo.
    '''
    GREEN = 0.3
    YELLOW = 0.52
    RED = 0.7
    
    def __init__(self, unique_id, model):
        '''
        Agente representa una celda del piso, se inicializa con un id y 
        con el status de rojo
        '''
        super().__init__(unique_id, model)
        self.status = self.YELLOW
        self.signals = []
        
    def addToQueue(self):
        '''
        Este método se utiliza para agregar el semáforo a la fila de espera en caso de que reciba señales,
        o quitarse en caso de que ya no tenga señales. Asimismo, si no tiene señales se agrega a la lista 
        del modelo que almacena los semáforos libres.
        '''
        idxSelf = model.findTrafficLightIndex(self.unique_id) 
        if len(self.signals) > 0:
            if not self.unique_id in model.queueTrafficLights:
                model.queueTrafficLights.append(self.unique_id)
            if self.unique_id in model.freeTrafficLights:
                model.freeTrafficLights.remove(self.unique_id)
        else:
            if idxSelf in model.trafficLightsOn:
                model.trafficLightsOn.remove(idxSelf)
            if self.unique_id in model.queueTrafficLights:
                model.queueTrafficLights.remove(self.unique_id)
            if not self.unique_id in model.freeTrafficLights:
                model.freeTrafficLights.append(self.unique_id)
    
    
    def checkDistFromCar(self, idxTF):
        '''
        Este método se utiliza para revisar la distancia de un auto con respecto a un 
        semáforo determinado
        '''
        idxSelf = model.findTrafficLightIndex(self.unique_id) 
        for car in model.cars:
            idx, dist = car.closestTrafficLight()
            if idx == idxSelf and not model.carInIntersection(car):
                return dist
        return -1
    
    def updateSignals(self):
        '''
        Este método se utiliza para actualizar las señales de los semáforos.
        '''
        idxSelf = model.findTrafficLightIndex(self.unique_id) 

        for car in model.cars:
            idx, dist = car.closestTrafficLight()
            if car.unique_id in self.signals and not model.carInIntersection(car):
                if idx != idxSelf :
                    self.signals.remove(car.unique_id)

                if idx == idxSelf:
                    if car.direction == "U" and car.orientation == "H" and car.pos[1] > self.pos[1]:
                        self.signals.remove(car.unique_id)
                        
                    elif car.direction == "D" and car.orientation == "H" and car.pos[1] < self.pos[1]:
                        self.signals.remove(car.unique_id)
                    elif car.direction == "U" and car.orientation == "V" and car.pos[0] < self.pos[0]:
                        self.signals.remove(car.unique_id)
                    elif car.direction == "D" and car.orientation == "V" and car.pos[0] > self.pos[0]:
                        self.signals.remove(car.unique_id)
                    
        
class StreetModel(Model):
    '''
    Define el modelo de la calle donde se encuentran los autos.
    '''
    def __init__(self, num, width, height):
        self.grid = SingleGrid(width, height, True)
        self.width = height
        self.height = height
        self.maxNumCars = num
        self.midWidth = self.width // 2
        self.midHeight = self.height // 2
        self.schedule = SimultaneousActivation(self)
        self.firstTrafficLightName = ""
        
        self.cars = []
        self.trafficLights = []
        self.queueTrafficLights = []
        self.freeTrafficLights = []
        self.trafficLightsOn = []
        
        
        #Posicionar semáforos 

        a = TrafficLight("trafLight Sup Left", self)
        self.grid.place_agent(a, (self.midWidth - 2, self.midHeight - 2))
        self.schedule.add(a)
        self.trafficLights.append(a)
        
        
        b = TrafficLight("trafLight Sup Right", self)
        self.grid.place_agent(b, (self.midWidth - 2, self.midHeight + 1))
        self.schedule.add(b)
        self.trafficLights.append(b)
        
        c = TrafficLight("trafLight Inf Right", self)
        self.grid.place_agent(c, (self.midWidth + 1, self.midHeight + 1))
        self.schedule.add(c)
        self.trafficLights.append(c)
        
        d = TrafficLight("trafLight Inf Left", self)
        self.grid.place_agent(d, (self.midWidth + 1, self.midHeight - 2))
        self.schedule.add(d)
        self.trafficLights.append(d) 
        
        
        #Posicionar autos hasta un máximo de ocho a la vez
        maxNumCars = min(8, self.maxNumCars)
        numCars = random.randint(1, maxNumCars)
        self.addCars(numCars)
        
        
        # Aquí definimos con colector para obtener la habitación y el número de movimientos totales.
        self.datacollector = DataCollector(
            model_reporters={'Habitacion': obtenerCalle},
            agent_reporters={'Movimientos': lambda a: getattr(a, 'numMovimientos', None)},
        )
        
    
    def step(self):
        '''
        En cada paso el colector tomará la información que se definió y almacenará el grid para luego graficarlo.
        '''
        #Se actualiza la luz de los semáforos
        self.changeTrafficLight()
        
        self.datacollector.collect(self)
        self.schedule.step()
        
        #Se agregan la cantidad de autos faltantes de forma paulatina
        carsLeft = self.maxNumCars - len(self.cars)
        if  carsLeft > 0 :
            carsLeft = min(8, carsLeft)
            numCars = random.randint(1, carsLeft)
            self.addCars(numCars)

    def addCars(self, n):
        '''
        Este método se utiliza para agregar n cantidad de autos.
        '''
        for n in range(n):
            approved = False
            while not approved:
                pos = random.randint(self.midWidth - 1, self.midWidth)
                orientation = random.choice(["H","V"])
                if pos < self.midWidth:
                    direction = "U" 
                    if orientation == "H":
                        coords = (pos, 0)
                    else: 
                        coords = (self.height - 1, pos)

                else:
                    direction = "D"
                    if orientation == "H":
                        coords = (pos, self.width - 1)
                    else: 
                        coords = (0, pos)
                
                carsPositions = [car.pos for car in self.cars]
                
                if not coords in carsPositions:
                    approved = True
            
            
            a = CarAgent(len(self.cars), self, orientation, direction)
            self.grid.place_agent(a, coords)
            self.schedule.add(a)
            self.cars.append(a)
    
    def intersectionEmpty(self):
        '''
        Este método se utiliza para verificar la intersección está libre.
        '''
        for i in range(self.midWidth - 1, self.midWidth + 1):
            for j in range(self.midHeight - 1, self.midHeight + 1):
                if not self.grid.is_cell_empty((i,j)):
                    return False
        return True
    
    def carInIntersection(self, car):
        '''
        Este método se utiliza para verificar si un coche se encuentra o no en la intersección.
        '''
        for i in range(self.midWidth - 1, self.midWidth + 1):
            for j in range(self.midHeight - 1, self.midHeight + 1):
                if car.pos == (i,j):
                    return True
        return False
                
    def findTrafficLightIndex(self, nameTrafficLight):
        '''
        Este método se utiliza para encontrar el indice de un semáforo en la lista de semáforos.
        '''
        for i in range(len(self.trafficLights)):
                if self.trafficLights[i].unique_id == nameTrafficLight:
                    return i

    def changeTrafficLight(self):
        '''
        Este método se utiliza para realizar el cambio de los semáforos.
        '''
            
        #Se actualiza la lista de señales del semáforo
        for idxTF in range(len(self.trafficLights)):
            
            self.trafficLights[idxTF].updateSignals()
            self.trafficLights[idxTF].addToQueue()
            
            if idxTF in self.trafficLightsOn:
                self.trafficLights[idxTF].status = self.trafficLights[idxTF].GREEN
            elif self.trafficLights[idxTF].unique_id in self.queueTrafficLights:
                self.trafficLights[idxTF].status = self.trafficLights[idxTF].RED
            elif self.trafficLights[idxTF].unique_id in self.freeTrafficLights:
                self.trafficLights[idxTF].status = self.trafficLights[idxTF].YELLOW
        
        #Se cambian a verde los semáforos correspondientes
        if len(self.queueTrafficLights) > 0:
            if len(self.trafficLightsOn) == 0:
                self.firstTrafficLightName = self.queueTrafficLights[0]
                idx = self.findTrafficLightIndex(self.firstTrafficLightName)
                self.trafficLights[idx].status = self.trafficLights[idx].GREEN
                self.trafficLightsOn.append(idx)

            else:
                idx = self.trafficLightsOn[0]

            modTF = idx % 2
            if modTF == idx:
                idxOther = idx + 2
            else:
                idxOther = idx - 2
            if self.trafficLights[idxOther].unique_id in self.queueTrafficLights:
                self.trafficLights[idxOther].status = self.trafficLights[idxOther].GREEN
                if not idxOther in self.trafficLightsOn:
                    self.trafficLightsOn.append(idxOther)
                    
            
            

A continuación se corre el modelo:

In [3]:
# Definimos el tamaño del Grid
M = 12
N = 12

# Definimos el número de agentes
NUM_AGENTS = 5

# Definimos tiempo máximo (segundos)
MAX_TIME = 0.06

# Registramos el tiempo de inicio y corremos el modelo
num_generations = 0

start_time = time.time()
model = StreetModel(NUM_AGENTS, M, N)
while time.time() - start_time < MAX_TIME:
    num_generations += 1
    model.step()

Obtenemos la información que almacenó el colector, este nos entregará un DataFrame de pandas que contiene toda la información.

In [4]:
all_grid = model.datacollector.get_model_vars_dataframe()

Graficamos la información usando matplotlib

In [5]:
%%capture

fig, axs = plt.subplots(figsize=(7,7))
axs.set_xticks([])
axs.set_yticks([])
patch = plt.imshow(all_grid.iloc[0][0], cmap=plt.colormaps["gist_ncar"])

def animate(i):
    patch.set_data(all_grid.iloc[i][0])
    
anim = animation.FuncAnimation(fig, animate, frames=num_generations)

In [6]:
anim