In [91]:
import agentpy as ap
import numpy as np
import matplotlib.pyplot as plt
import math
import IPython
import json

In [92]:
myJson={}

def convertToJson():
    with open("variables.json", "w") as outfile:
        json.dump(myJson, outfile)

class Semaphore(ap.Agent):
    def setup(self):
        self.step_time = 0.1
        self.direction = [0, 1]
        self.state = 0 # green = 0, yellow = 1, red = 2
        self.state_time = 0

        self.green_duration = 50
        self.yellow_duration = 5
        self.red_duration = 45

    def update(self) :
        self.state_time += self.step_time


        if self.state == 0:
            if self.state_time >= self.green_duration:
                self.state = 1
                self.state_time = 0

        if self.state == 1:
            if self.state_time >= self.yellow_duration:
                self.state = 2
                self.state_time = 0

        if self.state == 2:
            if self.state_time >= self.red_duration:
                self.state = 0
                self.state_time = 0

    def set_green(self):
        self.state = 0
        self.state_time = 0
    
    def set_yellow(self):
        self.state = 1
        self.state_time = 0

    def set_red(self):
        self.state = 2
        self.state_time = 0

In [93]:
class Car(ap.Agent):
    def setup(self):
        self.step_time = 0.1
        self.direction = [0, 1]
        self.speed = 0.0
        self.max_speed = 16.66
        self.state = 1 # 0 = chocado, 1 = ok

    def update_position(self):
        if self.state == 0:
            return
        
        self.model.avenue.move_by(self, [self.speed*self.step_time*self.direction[0], self.speed*self.step_time*self.direction[1]])

    def update_speed(self):
        if self.state == 0:
            return
        
        p = self.model.avenue.positions[self]

        min_car_distance = 10000000
        for car in self.model.cars:
            if car == self:
                continue

            dot_p1 = self.direction[0]*car.direction[0] + self.direction[1]*car.direction[1]

            p2 = self.model.avenue.positions[car]
            dot_p2 = (p2[0] - p[0])*self.direction[0] + (p2[1] - p[1])*self.direction[1]

            if dot_p1 > 0 and dot_p2 > 0:
                d = math.sqrt((p2[0] - p[0])**2 + (p2[1] - p[1])**2)
                if d < min_car_distance:
                    min_car_distance = d

        min_semaphore_distance = 10000000
        semaphore_state = 0
        for semaphore in self.model.semaphores:

            dot_p1 = self.direction[0]*semaphore.direction[0] + self.direction[1]*semaphore.direction[1]

            p2 = self.model.avenue.positions[semaphore]
            dot_p2 = (p2[0] - p[0])*self.direction[0] + (p2[1] - p[1])*self.direction[1]

            if dot_p1 < 0 and dot_p2 > 0:
                d = math.sqrt((p2[0] - p[0])**2 + (p2[1] - p[1])**2)
                if d < min_semaphore_distance:
                    min_semaphore_distance = d
                    semaphore_state = semaphore.state
                
        if min_car_distance < 2:
            self.speed = 0
            self.state = 1

        elif min_car_distance < 10:
            self.speed = np.maximum(self.speed - 10*self.step_time, 0)
        
        elif min_car_distance < 15:
            self.speed = np.maximum(self.speed - 5*self.step_time, 0)
        
        elif min_semaphore_distance < 20 and semaphore_state == 1:
            self.speed = np.minimum(self.speed + 2.5*self.step_time, self.max_speed)

        elif min_semaphore_distance < 40 and semaphore_state == 1:
            self.speed = np.maximum(self.speed - 2.5*self.step_time, 5)

        elif min_semaphore_distance < 30 and semaphore_state == 2:
            self.speed = np.maximum(self.speed - 10*self.step_time, 0)

        elif min_semaphore_distance < 50 and semaphore_state == 2:
            self.speed = np.maximum(self.speed - 2.5*self.step_time, 5)

        else:
            self.speed = np.minimum(self.speed + 2.0*self.step_time, self.max_speed)

In [94]:
stepCount=0
class AvenueModel(ap.Model):
    def setup(self):
        self.cars = ap.AgentList(self, self.p.cars, Car)
        self.cars.step_time = self.p.step_time
        self.cars.speed = self.p.v0
        self.stepCount=0

        c_north = int(self.p.cars/2)
        c_south = self.p.cars - c_north

        for k in range(c_north):
            self.cars[k].direction = [0, 1]

        for k in range(c_south):
            self.cars[c_north + k].direction = [0, -1]
        
        self.semaphores = ap.AgentList(self, 2, Semaphore)
        self.semaphores.step_time = self.p.step_time
        self.semaphores.green_duration = self.p.green
        self.semaphores.yellow_duration = self.p.yellow
        self.semaphores.red_duration = self.p.red
        self.semaphores[0].direction = [0, 1]
        self.semaphores[1].direction = [0, -1]

        self.avenue = ap.Space(self, shape = [60, self.p.size], torus = True)

        self.avenue.add_agents(self.semaphores, random=True)
        self.avenue.move_to(self.semaphores[0], [10, self.p.size*0.5 - 5])
        self.avenue.move_to(self.semaphores[1], [50, self.p.size*0.5 + 5])

        self.avenue.add_agents(self.cars, random=True)

        for k in range(c_north):
            self.avenue.move_to(self.cars[k], [40, 10*(k+1)])

        for k in range(c_south):
            self.avenue.move_to(self.cars[k+c_north], [20, self.p.size - 10*(k+1)])
        print(self.cars)
        
        
        #Usamos la info para generar el JSON
        cars=[]
        semaphores=[]
        for c in range(len(self.cars)):
            currentInfo={
                "x":self.model.avenue.positions[self.cars[c]][0],
                "z":self.model.avenue.positions[self.cars[c]][1],
                "direction":self.cars[c].direction
            }
            cars.append(currentInfo)
        
        for s in range(len(self.semaphores)):
            currentInfo={
                "x":self.model.avenue.positions[self.semaphores[s]][0],
                "z":self.model.avenue.positions[self.semaphores[s]][1],
                "direction":self.semaphores[s].direction,
                "state":self.semaphores[s].state,
                
            }
            semaphores.append(currentInfo)
        
        myJson[self.stepCount]={
            "step":self.stepCount,
            "cars":cars,
            "semaphores":semaphores
        }

        
        
    def step(self):
        self.stepCount+=1

        self.semaphores.update()

        self.cars.update_position()
        self.cars.update_speed()
        
        #Usamos la info para generar el JSON
        cars=[]
        semaphores=[]
        for c in range(len(self.cars)):
            currentInfo={
                "x":self.model.avenue.positions[self.cars[c]][0],
                "z":self.model.avenue.positions[self.cars[c]][1],
                "direction":self.cars[c].direction
            }
            cars.append(currentInfo)
        
        for s in range(len(self.semaphores)):
            currentInfo={
                "x":self.model.avenue.positions[self.semaphores[s]][0],
                "z":self.model.avenue.positions[self.semaphores[s]][1],
                "direction":self.semaphores[s].direction,
                "state":self.semaphores[s].state,
            }
            semaphores.append(currentInfo)
        
        myJson[self.stepCount]={
            "step":self.stepCount,
            "cars":cars,
            "semaphores":semaphores
        }
    
    def update(self):
        crashes = len(self.cars.select(self.cars.state == 0))
        self.record('Colisiones', crashes)
    
    def end(self):
        crashes = len(self.cars.select(self.cars.state == 0))
        self.report('Colisiones', crashes)
        convertToJson()

In [95]:
parameters = {
    'steps': 500,
    'step_time': 0.1,
    'size': 300,
    'green': 20,
    'yellow': 5,
    'red': 10,
    'cars': 20,
    'v0': 0
}

Visualización

In [96]:
def animation_plot_single(m, ax):
    ax.set_title(f"Avenida t={m.t*m.p.step_time:.2f}")

    colors = ["green", "yellow", "red"]

    pos_s1 = m.avenue.positions[m.semaphores[0]]
    ax.scatter(*pos_s1, s=20, c=colors[m.semaphores[0].state])

    pos_s2 = m.avenue.positions[m.semaphores[1]]
    ax.scatter(*pos_s2, s=20, c=colors[m.semaphores[1].state])

    ax.set_xlim(0, m.avenue.shape[0])
    ax.set_ylim(0, m.avenue.shape[1])

    for car in m.cars:
        pos_c = m.avenue.positions[car]
        ax.scatter(*pos_c, s=20, c="black")

    ax.set_axis_off()
    ax.set_aspect('equal', 'box')

def animation_plot(m, p):
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111)
    animation = ap.animate(m(p), fig, ax, animation_plot_single)
    return IPython.display.HTML(animation.to_jshtml(fps=20))

In [97]:
model = AvenueModel(parameters)
results = model.run()

AgentList (20 objects)
Completed: 500 steps
Run time: 0:00:00.470083
Simulation finished


In [98]:
# Autos colisionados en cada iteración
results.variables.AvenueModel

Unnamed: 0_level_0,Colisiones
t,Unnamed: 1_level_1
0,0
1,0
2,0
3,0
4,0
...,...
496,0
497,0
498,0
499,0


In [99]:
# Información al final de la simulación
results.reporters

Unnamed: 0,seed,Colisiones
0,336806804398936551692936565382109226238,0


In [100]:
animation_plot(AvenueModel, parameters)

AgentList (20 objects)
