In [1]:
import agentpy as ap
import numpy as np
import matplotlib.pyplot as plt
import math
import IPython
import json 
import random

In [2]:
class Semaphore(ap.Agent):
    def setup(self):
        self.time_state_green = 0
        self.step_time = 0.1
        self.direction = [0,1]
        self.state = 0 # verde = 0, amarillo = 1, rojo = 2
        self.state_time = 0
        
        self.green_duration = 90
        self.yellow_duration = 5
        self.red_duration = 60
        
    def update(self):
        self.state_time += self.step_time
        
        #verde a amarillo
        if self.state == 0:
            if self.state_time >= self.green_duration:
                self.model.votar = True # Cuando acaba el verde volvemos a votar
                self.time_state_green += 1 # Le subimos a su contador de veces en verde
                self.state = 1
                self.state_time = 0
                
        #amarillo a rojo        
        if self.state == 1:
            if self.state_time >= self.yellow_duration:
                self.state = 2
                self.state_time = 0
                
        #rojo a verde      
        if self.state == 2:
            if self.state_time >= self.red_duration:
                self.state = 0
                self.state_time = 0
        
    def set_green(self):
        self.state = 0
        self.state_time = 0
        #self.time_state_green += 1 
    
    def set_yellow(self):
        self.state = 1
        self.state_time = 0
        
    def set_red(self):
        self.state = 2
        self.state_time = 0
    
    def cars(self):
        """Este método calculo el número de autos que están frente al semáforo a menos de 150 metros. """
        count = 0

        # Obten la posición actual del semáforo 
        p = self.model.avenue.positions[self]

        for car in self.model.cars:
            if car != self:
                # Verifica si el carro va en dirección al semáforo
                dot_p1 = self.direction[0]*car.direction[0] + self.direction[1]*car.direction[1]

                # Verifica si el carro está atrás o adelante
                p2 = self.model.avenue.positions[car]
                dot_p2 = (p2[0]-p[0])*self.direction[0] + (p2[1]-p[1])*self.direction[1]

                if dot_p1 < 0 and dot_p2 > 0:
                    d = math.sqrt((p[0]-p2[0])**2 + (p[1]-p2[1])**2)

                    if d < 150:
                        count += 1
                        
        return count

In [3]:
class Car(ap.Agent):
    
    def setup(self):
        self.step_time = 0.1
        self.direction = [1,0]
        self.speed = 0.0
        self.max_speed = 16.66
        self.state = 1 # 0 = chocado, 1 = ok
        
    def update_position(self):
        if self.state == 0:
            return 
        
        self.model.avenue.move_by(self,[self.speed*self.step_time*self.direction[0],self.speed*self.step_time*self.direction[1]])
        
    def update_speed(self):
        if self.state == 0:
            return
        
        p = self.model.avenue.positions[self]
        
        min_car_distance = 1000000
        
        for car in self.model.cars:
            if car != self:
                
                dot_p1 = self.direction[0]*car.direction[0] + self.direction[1]*car.direction[1]
            
                p2 = self.model.avenue.positions[car]
            
                dot_p2 = (p2[0]-p[0])*self.direction[0] + (p2[1]-p[1])*self.direction[1]
            
                if dot_p1 > 0 and dot_p2 > 0:
                    d = math.sqrt((p[0]-p2[0])**2 + (p[1]-p2[1])**2)
                    
                    if d < min_car_distance:
                        min_car_distance = d
        
        min_semaphore_distance = 1000000
        semaphore_state = 0
        for semaphore in self.model.semaphores:
                
            dot_p1 = self.direction[0]*semaphore.direction[0] + self.direction[1]*semaphore.direction[1]
            
            p2 = self.model.avenue.positions[semaphore]
            
            dot_p2 = (p2[0]-p[0])*self.direction[0] + (p2[1]-p[1])*self.direction[1]
            
            if dot_p1 < 0 and dot_p2 > 0:
                d = math.sqrt((p2[0]-p[0])**2 + (p2[1]-p[1])**2)
                
                if d < min_semaphore_distance:
                    min_semaphore_distance = d
                    semaphore_state = semaphore.state
            
        if min_car_distance < 2:
            self.speed = 0
            self.state = 1
            
        elif min_car_distance < 20:
            self.speed = np.maximum(self.speed - 10*self.step_time, 0)
        
        elif min_car_distance < 30:
            self.speed = np.maximum(self.speed - 5*self.step_time, 0)
            
        elif min_semaphore_distance < 25 and semaphore_state == 1:
            self.speed = np.minimum(self.speed + 2.5*self.step_time, self.max_speed)
            
        elif min_semaphore_distance < 40 and semaphore_state == 1:
            self.speed = np.maximum(self.speed - 2.5*self.step_time, 5)
            
        elif min_semaphore_distance < 30 and semaphore_state == 2:
            self.speed = np.maximum(self.speed - 10*self.step_time, 0)
        
        elif min_semaphore_distance < 50 and semaphore_state == 2:
            self.speed = np.maximum(self.speed - 2.5*self.step_time, 5)
        
        else:
            self.speed = np.minimum(self.speed + 2.0*self.step_time, self.max_speed)
        
        
        

In [4]:
class AvenueModel(ap.Model):
    
    def setup(self):
        # Preparamos estado de votar
        self.votar = True
        
        self.cars = ap.AgentList(self, self.p.cars, Car)
        self.cars.step_time = self.p.step_time
        self.cars.speed = self.p.v0
        
        c_north = int(self.p.cars/4)
        c_south = int(self.p.cars/4) 
        c_east = int(self.p.cars/4)
        c_west = int(self.p.cars/4)
        
        for k in range(c_north):
            self.cars[k].direction = [0,1]
            
        for k in range(c_south):
            self.cars[c_north + k].direction = [0,-1]

        for k in range(c_east):
            self.cars[c_north + c_south + k].direction = [1,0]

        for k in range(c_west):
            self.cars[c_north + c_south + c_east + k].direction = [-1,0]
            
        self.semaphores = ap.AgentList(self, self.p.semaphores, Semaphore)
        self.semaphores.step_time = self.p.step_time
        self.semaphores.green_duration = self.p.green
        self.semaphores.yellow_duration = self.p.yellow
        self.semaphores.red_duration = self.p.red
        self.semaphores[0].direction = [-1,0]
        self.semaphores[1].direction = [1,0]
        self.semaphores[2].direction = [0,1]
        self.semaphores[3].direction = [0,-1]

        # self.semaphores[0].state = 2
        # self.semaphores[1].state = 2
        
        self.avenue = ap.Space(self, shape = [self.p.size ,self.p.size], torus = True)
        
        self.avenue.add_agents (self.semaphores, random = True)
        self.avenue.move_to(self.semaphores[0],[self.p.size*0.5 - 25, self.p.size*0.5 - 20])
        self.avenue.move_to(self.semaphores[1],[self.p.size*0.5 + 25, self.p.size*0.5 + 20])
        self.avenue.move_to(self.semaphores[2],[self.p.size*0.5 - 25, self.p.size*0.5 + 20])
        self.avenue.move_to(self.semaphores[3],[self.p.size*0.5 + 25, self.p.size*0.5 - 20])
        
        self.avenue.add_agents(self.cars, random = True)
        
        for k in range(c_north):
            self.avenue.move_to(self.cars[k],[self.p.size*0.5 + 5,10*(k+1)])
            
        for k in range(c_south):
            self.avenue.move_to(self.cars[k + c_north],[self.p.size*0.5 - 5,self.p.size - 10*(k+1)])

        for k in range(c_east):
            self.avenue.move_to(self.cars[c_north + c_south + k],[10*(k+1),self.p.size*0.5 - 5])

        for k in range(c_west):
            self.avenue.move_to( self.cars[c_north + c_south + c_east + k],[self.p.size - 10*(k+1),self.p.size*0.5 + 5])
                
        self.frames = 0

        # Archivo json
        self.data = {}
        self.data['size'] = []
        self.data['size'].append({'size': self.p.size})
        self.data['cars'] = []
        self.data['semaphores'] = []
        self.data['frames'] = []

        # Carga de carros
        for k in range(self.p.cars):
            self.data['cars'].append({
                'id': k,
                'x': self.model.avenue.positions[self.cars[k]][0],
                'z': self.model.avenue.positions[self.cars[k]][1],
                'dir': math.atan2(self.cars[k].direction[0], self.cars[k].direction[1]) * 180 / math.pi,
                'type': random.randint(0, 6),
                'StateColor': self.semaphores[0].state,
                'StateColor2':self.semaphores[1].state,
                
            })

        # Carga de semaforos
        for k in range(len(self.semaphores)):
            self.data['semaphores'].append({
                'id': k + model.p.cars,
                'x': self.model.avenue.positions[self.semaphores[k]][0],
                'z': self.model.avenue.positions[self.semaphores[k]][1],
                'dir': math.atan2(self.semaphores[k].direction[0], self.semaphores[k].direction[1]) * 180 / math.pi,
                'stateColor': self.semaphores[k].state,
                'stateColor2': self.semaphores[k].state
            })
        
    def step(self):
        self.semaphores.update()
        
        self.cars.update_position()
        self.cars.update_speed()

        car_list = []
        for k in range(self.p.cars):
            car_list.append({
                'id': k,
                'x': self.model.avenue.positions[self.cars[k]][0],
                'z': self.model.avenue.positions[self.cars[k]][1],
                'dir': math.atan2(self.cars[k].direction[0], self.cars[k].direction[1]) * 180 / math.pi,
                'StateColor': self.semaphores[0].state,
                'StateColor2':self.semaphores[2].state,
            })
        sempahore_list = []
        for k in range(len(self.semaphores)):
            sempahore_list.append({
                'id': k + model.p.cars,
                'state': self.semaphores[k].state
            })

        self.data['frames'].append({
            'frame': self.frames,
            'cars': car_list,
            'semaphores': sempahore_list
        })
        
        # inicializacion de las votaciones
        votes = [0,0,0,0]
        
        # Si el estado de votar es verdadero, votamos
        if self.votar == True:
            # primer voto
            voteOne = random.randint(0,3)
            votes[voteOne] += 1

            # segundo voto
            count = 0
            indice = -1
            for i in range(len(self.semaphores)):
                if(self.semaphores[i].cars() > count):
                    indice = i
                    count = self.semaphores[i].cars()

            if(indice != -1):
                votes[indice] += 1 
            else:
                votes[random.randint(0,3)] +=1

            # tercer voto 
            green_quantity = 100000000
            indice_green = -1
            for i in range(len(self.semaphores)):
                if(self.semaphores[i].time_state_green < green_quantity):
                    indice_green = i
                    green_quantity = self.semaphores[i].time_state_green
                elif(self.semaphores[i].time_state_green == green_quantity):
                    indice_green = -1

            if(indice_green != -1):
                votes[indice_green] += 1
            else:
                votes[random.randint(0,3)] +=1

            # cuarto voto
            if(self.semaphores[0].state == 2):
                votes[random.randint(0,1)] +=1
            else:
                votes[random.randint(2,3)] +=1

            # Contamos votaciones
            votes_count = 0
            indice_votes = -1
            for i in range(len(votes)):
                if(votes[i] > votes_count):
                    votes_count = votes[i]
                    indice_votes = i

            if(indice_votes == 0 or indice_votes == 1):
                if self.semaphores[0].state == 0 or self.semaphores[1].state == 0 or self.semaphores[0].state == 1 or self.semaphores[1].state == 1:
                    self.semaphores[0].set_green()
                    self.semaphores[1].set_green()
                    self.semaphores[2].set_red()
                    self.semaphores[3].set_red()
                
            elif(indice_votes == 2 or indice_votes == 3):
                if self.semaphores[2].state == 0 or self.semaphores[3].state == 0 or self.semaphores[2].state == 1 or self.semaphores[3].state == 1:
                    self.semaphores[2].set_green()
                    self.semaphores[3].set_green()
                    self.semaphores[0].set_red()
                    self.semaphores[1].set_red()
            else:
                azar = random.randint(0,3)
                if(azar == 0 or azar == 1):
                    if self.semaphores[0].state == 0 or self.semaphores[1].state == 0 or self.semaphores[0].state == 1 or self.semaphores[1].state == 1:
                        self.semaphores[0].set_green()
                        self.semaphores[1].set_green()
                        self.semaphores[2].set_red()
                        self.semaphores[3].set_red()
                else:
                    if self.semaphores[2].state == 0 or self.semaphores[3].state == 0 or self.semaphores[2].state == 1 or self.semaphores[3].state == 1:
                        self.semaphores[2].set_green()
                        self.semaphores[3].set_green()
                        self.semaphores[0].set_red()
                        self.semaphores[1].set_red() 
            self.votar = False
        # Terminamos votaciones

        self.frames += 1

        
    def update(self):
        crashes = len(self.cars.select(self.cars.state == 0))
        self.record('Colisiones', crashes)
        self.record('semaforo_1',self.semaphores[0].cars())
        self.record('semaforo_2',self.semaphores[1].cars())
        self.record('semaforo_3',self.semaphores[2].cars())
        self.record('semaforo_4',self.semaphores[3].cars())
        
    def end(self):
        crashes = len(self.cars.select(self.cars.state == 0))
        self.report('Colisiones', crashes)
        json_file = json.dumps(self.data)
        with open('simul_data.json', 'w') as outfile:
            outfile.write(json_file)
        
    

In [5]:
def animation_plot_single(m,ax):
    ## generar grafica
    
    ax.set_title(f"Avenida t={m.t*m.p.step_time: .2f}")
    # , Semaforo 1: {m.semaphores[0].cars():d}, Semaforo 2: {m.semaphores[1].cars():d}, Semaforo 3: {m.semaphores[2].cars():d}, Semaforo 4: {m.semaphores[3].cars():d}
    
    colors = ["green", "yellow", "red"]
    
    pos_s1 = m.avenue.positions[m.semaphores[0]]
    ax.scatter(*pos_s1, s=20, c=colors[m.semaphores[0].state])
    
    pos_s1 = m.avenue.positions[m.semaphores[1]]
    ax.scatter(*pos_s1, s=20, c=colors[m.semaphores[1].state])

    pos_s1 = m.avenue.positions[m.semaphores[2]]
    ax.scatter(*pos_s1, s=20, c=colors[m.semaphores[2].state])

    pos_s1 = m.avenue.positions[m.semaphores[3]]
    ax.scatter(*pos_s1, s=20, c=colors[m.semaphores[3].state])

    ax.set_xlim(0, m.avenue.shape[0])
    ax.set_ylim(0, m.avenue.shape[1])
    
    for car in m.cars:
        pos_c = m.avenue.positions[car]
        ax.scatter(*pos_c, s=20, c="black")
        
    ax.set_axis_off()
    ax.set_aspect('equal','box')
    
def animation_plot(m,p):
    #pa animar
        
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(111)
    animation = ap.animate(m(p), fig, ax, animation_plot_single)
    return IPython.display.HTML(animation.to_jshtml(fps=20))
    

In [6]:
parameters = {
    'steps' : 20000,
    'step_time' : 0.1,
    'size' : 500,
    'green' : 15,
    'yellow' : 5,
    'red' : 20,
    'cars' : 20,
    'v0' : 0,
    'semaphores' : 4
}

In [7]:
model = AvenueModel(parameters)
results = model.run()

Completed: 20000 steps
Run time: 0:00:18.900438
Simulation finished


In [8]:
results.variables.AvenueModel

Unnamed: 0_level_0,Colisiones,semaforo_1,semaforo_2,semaforo_3,semaforo_4
t,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,0,0,0,0,0
1,0,0,0,0,0
2,0,0,0,0,0
3,0,0,0,0,0
4,0,0,0,0,0
...,...,...,...,...,...
19996,0,2,2,5,5
19997,0,2,2,5,5
19998,0,2,2,5,5
19999,0,2,2,5,5


In [9]:
results.reporters

Unnamed: 0,seed,Colisiones
0,161367819613041466108490439171542754152,0


In [10]:
#animation_plot(AvenueModel, parameters)