In [None]:
#pip install mesa
#pip install seaborn
#pip install requests

In [None]:
# hola

# Rodrigo Merino de la Parra A00836396

# Simulacion de interseccion con semaforos inteligentes

Jupyter Notebook

LIBRERIAS 

pip install --upgrade mesa (Install Mesa)

pip install seaborn ( used for data visualization )

link video de simulacion: https://www.youtube.com/watch?v=kVRj0xavIhk

In [None]:
import mesa
from mesa.visualization.ModularVisualization import ModularServer
from mesa.visualization.modules import CanvasGrid
import seaborn as sns
import numpy as np 
import pandas as pd
import random
import queue
import requests

# Clase modelo de la interseccion
class IntersectionModel(mesa.Model):
    '''
    Modela una interseccion controlada por semaforos inteligentes
    '''
    

    def __init__(self, width, height):
        super().__init__()
        self.grid = mesa.space.MultiGrid(width, height, True)
        self.schedule = mesa.time.RandomActivation(self)
        self.directions = ['N_S', 'S_N', 'W_E', 'E_W']
        self.width = width
        self.height = height
        self.traffic_lights = {}
        self.mensajes = queue.Queue()
        self.current_step_data = {} # Diccionario para almacenar datos de cada paso para enviar a la visualización
        

        # Creamos los semaforos
        traffic_light_E_W = TrafficLight(1, self)
        traffic_light_W_E = TrafficLight(2, self)
        traffic_light_N_S = TrafficLight(3, self)
        traffic_light_S_N = TrafficLight(4, self)

        # Ponemos los semaforos en las posiciones correctas
        self.grid.place_agent(traffic_light_W_E, (self.width // 2 - 2, self.height // 2 - 1))  # left
        self.grid.place_agent(traffic_light_S_N, (self.width // 2, self.height // 2 - 2))  # bottom 
        self.grid.place_agent(traffic_light_E_W, (self.width // 2 + 1, self.height // 2))  # right
        self.grid.place_agent(traffic_light_N_S, (self.width // 2 - 1, self.height // 2 + 1))  # top 

        traffic_light_data = {
            'lights': {
                'W_E': {'pos': (self.width // 2 - 2, self.height // 2 - 1), 'state': 'red'},
                'S_N': {'pos': (self.width // 2, self.height // 2 - 2), 'state': 'red'},
                'E_W': {'pos': (self.width // 2 + 1, self.height // 2), 'state': 'red'},
                'N_S': {'pos': (self.width // 2 - 1, self.height // 2 + 1), 'state': 'red'}
            
            }

            
        }

        # CHECAR ESTO DE LA API --------------------------------------------------------------------------------------------------
        response = requests.post('http://127.0.0.1:5000/api/post_traffic_lights_data', json=traffic_light_data)
        


        # Ponemos las direcciones de los semaforos
        traffic_light_E_W.direction = 'E_W'
        traffic_light_W_E.direction = 'W_E'
        traffic_light_N_S.direction = 'N_S'
        traffic_light_S_N.direction = 'S_N'

        # Almacena los semaforos en un diccionario
        self.traffic_lights['W_E'] = traffic_light_W_E
        self.traffic_lights['N_S'] = traffic_light_N_S
        self.traffic_lights['E_W'] = traffic_light_E_W
        self.traffic_lights['S_N'] = traffic_light_S_N

        # Schedule a los semaforos
        self.schedule.add(traffic_light_W_E)
        self.schedule.add(traffic_light_S_N)
        self.schedule.add(traffic_light_E_W)
        self.schedule.add(traffic_light_N_S)

        # Creamos los vehiculos
        self.spawn_vehicle()
    
        # Creamos un agente central para controlar los semaforos
        central_controller = CentralController(5, self)  
        self.schedule.add(central_controller)

    def spawn_vehicle(self):
        # Seleccionamos una direccion aleatoria
        direction = random.choice(self.directions)
        vehicle = Vehicle(6, self, direction) # Creamos el vehiculo
        coordinate = self.get_starting_coordinate(direction)
        x, y = coordinate
        self.grid.place_agent(vehicle, (x, y))


        # Agrergamos el vehiculo al schedule
        self.schedule.add(vehicle) 

    # Funcion para las coordenadas iniciales de los vehiculos
    def get_starting_coordinate(self, direction):
        if direction == 'N_S':
            return (self.width // 2 - 1, self.height - 1)
        elif direction == 'S_N':
            return (self.width // 2, 0)
        elif direction == 'W_E':
            return (0, (self.height // 2) - 1)
        elif direction == 'E_W':
            return (self.width - 1, self.height // 2)
        
    # Funcion para mandar un mensaje
    def mandar_mensaje(self, mensaje):
        self.mensajes.put(mensaje)


    def step(self):
        self.schedule.step()
        if self.schedule.steps % 5 == 0:
            self.spawn_vehicle()

        # ---------------------------------------- ALMACENAMIENTO DE DATOS ---------------------------------------- #
        # CHECAR ESTO DE LA API --------------------------------------------------------------------------------------------------

        self.current_step_data['step'] = self.schedule.steps
        self.current_step_data['agentes'] = {}


        for agent in self.schedule.agents:
            if isinstance(agent, Vehicle):
                self.current_step_data['agentes'][agent.unique_id] = {
                    'pos': {'x': agent.pos[0], 'y': agent.pos[1]}
                }
                
        print('Sending data to API...')  # For debugging
        response = requests.post('http://127.0.0.1:5000/api/vehicle_data', json=self.current_step_data)
        if response.status_code == 200:
            print('Data successfully sent to API')
        else:
            print(f'Failed to send data to API. Status code: {response.status_code}')



# Clase del agente vehiculo
class Vehicle(mesa.Agent):
    '''
    Agente de vehiculo
    '''

    def __init__(self, unique_id, model, direction):
        self.unique_id = unique_id
        self.model = model
        self.direction = direction
        self.pos = (1,1)
        self.new_direction = None 

    def step(self):
        self.move()


    # Equivalente a la funcion accion()
    def move(self):
        next_position = self.pos
        arriba_izquierda = (self.model.width // 2 - 1, self.model.height // 2)
        arriba_derecha = (self.model.width // 2, self.model.height // 2)
        abajo_izquierda = (self.model.width // 2 - 1, self.model.height // 2 - 1)
        abajo_derecha = (self.model.width // 2, self.model.height // 2 - 1)

        # Movemos el vehículo dependiendo de su dirección
        if self.can_move():

            
            if self.direction == 'W_E' and not self.new_direction:
                if self.pos == abajo_izquierda and random.choice([True, False]):
                    self.new_direction = 'S'  
                elif self.pos == abajo_derecha and random.choice([True, False]):
                    self.new_direction = 'N' 

            elif self.direction == 'E_W' and not self.new_direction:
                if self.pos == arriba_derecha and random.choice([True, False]):
                    self.new_direction = 'N'
                elif self.pos == arriba_izquierda and random.choice([True, False]):
                    self.new_direction = 'S'

            elif self.direction == 'N_S' and not self.new_direction:
                if self.pos == arriba_izquierda and random.choice([True, False]):
                    self.new_direction = 'W'
                elif self.pos == abajo_izquierda and random.choice([True, False]):
                    self.new_direction = 'E'

            elif self.direction == 'S_N' and not self.new_direction:
                if self.pos == abajo_derecha and random.choice([True, False]):
                    self.new_direction = 'E'
                elif self.pos == arriba_derecha and random.choice([True, False]):
                    self.new_direction = 'W'
            


            # Actualiza la posición basada en la dirección original o la nueva dirección
            if self.direction == 'N_S':
                next_position = (self.pos[0], self.pos[1] - 1)
            elif self.direction == 'S_N':
                next_position = (self.pos[0], self.pos[1] + 1)
            elif self.direction == 'W_E':
                next_position = (self.pos[0] + 1, self.pos[1])
            elif self.direction == 'E_W':
                next_position = (self.pos[0] - 1, self.pos[1])        

            if self.new_direction == 'S':
                next_position = (self.pos[0], self.pos[1] - 1)
            elif self.new_direction == 'N':
                next_position = (self.pos[0], self.pos[1] + 1)
            elif self.new_direction == 'E':
                next_position = (self.pos[0] + 1, self.pos[1])
            elif self.new_direction == 'W':
                next_position = (self.pos[0] - 1, self.pos[1])
            

            
            # Checamos si la siguiente posición está dentro de los límites del grid
            if 0 <= next_position[0] < self.model.grid.width and 0 <= next_position[1] < self.model.grid.height:

                # Verifica si la celda destino ya está ocupada por otro vehículo
                cell_contents = self.model.grid.get_cell_list_contents(next_position)
                vehicles_in_cell = [obj for obj in cell_contents if isinstance(obj, Vehicle)]

                if not vehicles_in_cell:  # Si no hay vehículos en la celda destino
                    self.model.grid.move_agent(self, next_position)

            else:
                # Si la celda destino está fuera de los límites del grid, elimina el agente
                self.model.grid.remove_agent(self)
                self.model.schedule.remove(self)
        else:
            pass # se espera 

    # Equivalente a la funcion see()
    def can_move(self):
        # Coordenadas del cruzamiento
        arriba_izquierda = (self.model.width // 2 - 1, self.model.height // 2)
        arriba_derecha = (self.model.width // 2, self.model.height // 2)
        abajo_izquierda = (self.model.width // 2 - 1, self.model.height // 2 - 1)
        abajo_derecha = (self.model.width // 2, self.model.height // 2 - 1)

        # Verifica si el vehículo está en alguna de las coordenadas del cruce
        esta_en_cruce = self.pos in [arriba_izquierda, arriba_derecha, abajo_izquierda, abajo_derecha]

        # Si el vehículo está en el cruce, puede moverse sin importar el estado del semáforo
        if esta_en_cruce:
            return True

        # Obtiene el semáforo relevante basado en la dirección del vehículo.
        traffic_light = self.model.traffic_lights[self.direction]

        # Verifica si el semáforo está en rojo.
        is_light_red = traffic_light.state == 'red'

        # Calcula si el vehículo está adyacente al semáforo.
        is_adjacent_to_light = self.get_distance() == 1

        # Si el semáforo está en rojo y el vehículo está adyacente a él, no puede moverse.
        if is_light_red and is_adjacent_to_light:
            return False

        # En cualquier otro caso, el vehículo puede moverse.
        return True 

    def get_distance(self):
        # Obtenemos la posicion del semaforo 
        traffic_light_pos = self.model.traffic_lights[self.direction].pos
        # Calculamos distancia Manhattan
        distance = abs(self.pos[0] - traffic_light_pos[0]) + abs(self.pos[1] - traffic_light_pos[1])
        return distance 


# Clase para el agente semaforo
class TrafficLight(mesa.Agent):
    '''
    Agente de semaforo inteligente
    '''

    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        self.state = 'yellow' # Estado inicial
        self.phase = None # Fase actual
        self.phase_counter = 0 # Contador de pasos en la fase actual
        self.neighbors = []
        self.direction = {}
        self.cantidad_carros = 0

    def see(self):
        self.neighbors = [carro for carro in self.model.grid.get_neighbors(self.pos, moore=False, include_center=True, radius=(self.model.width // 2)) if isinstance(carro, Vehicle) and carro.direction == self.direction]
        self.cantidad_carros = len(self.neighbors)
        return self.cantidad_carros

    # ESTA FUNCION VA A MANDAR UN MENSAJE A LA CLASE CENTRAL CONTROLLER PARA QUE SU FUNCION SEE LO RECIBA
    def accion(self):

        mensaje = Message(
            performative="inform",
            content=f"{self.cantidad_carros},{self.direction}",
            sender=str(self.unique_id),
            is_reply=False
        )

        self.model.mandar_mensaje(mensaje)


    def step(self):
        self.see()
        self.accion()


# Clase para los mensajes que enviaran los semaforos al controlador
class Message():

    performatives = ["request","inform"]
    parameters = ["content","sender","reply-with","in-reply-to"]

    def __init__(self,msg="",performative="",content="",sender="",query="q1",is_reply=True):
        """Constructor to build a new message"""
        self.empty = False
        self.request = False
        self.inform = False
        self.msg = msg

        #If we want to build a message from the paramters
        if msg == "":
            self.is_reply = is_reply
            self.query = query
            assert performative in Message.performatives , f"Performaive: {performative}"
            self.performative=performative
            self.content = content
            self.sender = sender

        #if we want to build a message from a string (a KQML message)
        else:
            self.decode()

        #Identify if its either Request or Inform performative
        if self.performative == "request":
            self.request = True
        elif self.performative == "inform":
            self.inform = True
        else:
            self.empty = True

    def decode(self):
        """Method to convert a string message (KQML format) to message parameters"""
        current = self.msg[1:-1]
        current = current.split("\n")
        self.performative = current[0]
        assert self.performative in Message.performatives , f"Performaive: {self.performative}"
        parameterList = current[1].split(":")[1:]
        parametersDict = {}
        for parameter in parameterList:
            pair = parameter.split(" ")
            parametersDict[pair[0]] = pair[1]
        if "in-reply-to" in parametersDict.keys():
            self.query = parametersDict["in-reply-to"]
            self.is_reply = True
        else:
            self.query = parametersDict["reply-with"]
            self.is_reply = False
        self.content = parametersDict["content"]
        self.sender = parametersDict["sender"]

    def __str__(self):
        """Method to convert message paramters to a string (KQML format)"""
        s = "("
        s+= self.performative + "\n"
        s+= ":sender " + self.sender
        s+= ":content "+self.content
        if self.is_reply:
            s+= ":in-reply-to " + self.query
        else:
            s+= ":reply-with " + self.query
        s+= ")"
        return s


# Clase del control central de los semaforos
class CentralController(mesa.Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        self.phase_duration = 10  # Duración de la fase actual
        self.phase_counter = 0  # Contador de pasos en la fase actual
        self.traffic_info = {}  # Diccionario con información del tráfico
        self.wait_steps = 2  # Pasos a esperar antes de cambiar a verde
        self.change_phase_in_next_step = False  # Indica si se debe cambiar la fase en el próximo paso
        self.next_traffic_light_to_green = None  # Próximo semáforo a poner en verde



    # FUNCION PARA VER Y RECIBIR EL MENSAJE QUE NOS ENVIA EL SEMAFORO
    def see(self):

        while not self.model.mensajes.empty():
            mensaje = self.model.mensajes.get()  # Obtiene el mensaje más antiguo
            content = mensaje.content.split(',')
            cantidad_carros = int(content[0])
            direction = content[1]
            # Aquí podrías actualizar un diccionario o estructura de datos con esta información
            self.traffic_info[direction] = cantidad_carros

    def decide_next_traffic_light(self):
        # Decidir cuál semáforo tiene más vehículos esperando
        max_vehicles = -1
        for direction, car_count in self.traffic_info.items():
            if car_count > max_vehicles:
                max_vehicles = car_count
                self.next_traffic_light_to_green = direction


    def step(self):
        self.see()
        self.phase_counter += 1
        if self.change_phase_in_next_step:
            if self.phase_counter >= self.wait_steps:
                self.act()
                self.phase_counter = 0  # Reiniciamos el contador de la fase
                self.change_phase_in_next_step = False
        else:
            # Decidimos qué semáforo cambiará en el próximo ciclo permitido
            self.decide_next_traffic_light()
            self.change_phase_in_next_step = True


    def act(self):
        # Poner todos los semáforos en rojo excepto el seleccionado
        for direction in self.traffic_info.keys():
            self.model.traffic_lights[direction].state = 'red'
        # Poner en verde el semáforo seleccionado
        if self.next_traffic_light_to_green:
            self.model.traffic_lights[self.next_traffic_light_to_green].state = 'green'
            print(f"Semaforo {self.next_traffic_light_to_green} en verde")


In [None]:

def agent_portrayal(agent):
    """
    Funcion para definir como se vera cada agente en el tablero
    """
    portrayal = {"Shape": "rect", "w": 0.8, "h": 0.8, "Filled": "true"}

    if isinstance(agent, Vehicle):
        portrayal["Color"] = "blue"
        portrayal["Layer"] = 1
    elif isinstance(agent, TrafficLight):
        if agent.state == 'yellow':
            portrayal["Color"] = "yellow"
        elif agent.state == 'red':
            portrayal["Color"] = "red"
        elif agent.state == 'green':
            portrayal["Color"] = "green"
        portrayal["Layer"] = 2
        portrayal["w"] = 0.5
        portrayal["h"] = 0.5
    return portrayal

# Definir tamaño del multi grid 
grid_width = 50
grid_height = 50

grid = CanvasGrid(agent_portrayal, grid_width, grid_height, 500, 500)

# Parametros del modelo
model_params = {
    "width": grid_width,
    "height": grid_height
}

# Crear y lanzar el servidor
server = ModularServer(IntersectionModel, [grid], "Intersection Model", model_params)

server.port = 8521 # El puerto predeterminado
server.launch()