In [1]:
# Model design
import agentpy as ap

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import IPython
from IPython.display import HTML

# Create Json
import json

# API
from http.server import BaseHTTPRequestHandler, HTTPServer
import logging

# Spawn points
spawns = ((16,1),(15,18),(30,10),(1,9))         # Up, Down, Left, Right

stoplights = ((17,8), (14,11), (17,11), (14,8)) # Up, Down, Left, Right

roadup = ((16,1),(16,2),(16,3),(16,4),(16,5),(16,6),(16,7),(16,8),(16,9),(16,10),(16,11),(16,12),(16,13),(16,14),(16,15))
roaddown = ((15,1),(15,2),(15,3),(15,4),(15,5),(15,6),(15,7),(15,8),(15,9),(15,10),(15,11),(15,12),(15,13),(15,14),(15,15),(15,16),(15,17),(15,18))
roadleft = ((1,10),(2,10),(3,10),(4,10),(5,10),(6,10),(7,10),(8,10),(9,10),(10,10),(11,10),(12,10),(13,10),(14,10),(15,10),(16,10),(17,10),(18,10),(19,10),(20,10),(21,10),(22,10),(23,10),(24,10),(25,10),(26,10),(27,10),(28,10),(29,10),(30,10))
roadright = ((1,9),(2,9),(3,9),(4,9),(5,9),(6,9),(7,9),(8,9),(9,9),(10,9),(11,9),(12,9),(13,9),(14,9),(15,9),(16,9),(17,9),(18,9),(19,9),(20,9),(21,9),(22,9),(23,9),(24,9),(25,9),(26,9),(27,9),(28,9),(29,9),(30,9))

# Direct Routes
route1 = ((16,1), (16,8), (16,9), (16,18), (17,8))          # Up
route2 = ((15,18), (15,11), (15,10), (15,1), (14,11))       # Down
route3 = ((30,10), (17,10), (16,10), (1,10), (17,11))       # Left
route4 = ((1,9), (14,9), (15,9), (30,9), (14,8))            # Right

routes = (route1, route2, route3, route4)

# Stoplight routes that GOD will check
stopLight1 = ((16,8),(16,7),(16,6))             # Up
stopLight2 = ((15,13),(15,12),(15,11))          # Down
stopLight3 = ((17,10),(18,10),(19,10))          # Left
stopLight4 = ((14,9),(13,9),(12,9))             # Right

stopLightsX = (stopLight3, stopLight4)          # Left and Right stops
stopLightsY = (stopLight1, stopLight2)          # Up and Down stops

stopLightsArray = (stopLight1, stopLight2, stopLight3, stopLight4)


In [2]:
class Car(ap.Agent):
    
    def setup(self):
        # Initialize an attribute with a parameter
        self.grid = self.model.grid
        self.random = self.model.random
        self.route = self.random.choice(routes)
        self.id = 0;

        # Spawn the car in random from list of spawn points
        # Car must have objetive
        # Tracks that the objetive was reached, 0 = not reached, 1 = reached
        self.goal = 0
        self.group = 1
        
        # Agent position in x and y
        self.x = self.route[0][0]
        self.y = self.route[0][1]

        # Agent objective in x and y
        self.objx = self.route[3][0]
        self.objy = self.route[3][1]
        
        # Agent in movement or stopped, 0 = stopped, 1 = moving
        self.moving = 0

        # Step that Agent is at
        self.actStep = 1

        self.speed = 1
        #self.speed = self.random.choice(range(1,3))
        
        self.position = (self.x, self.y)

    def check_goal(self):
        if self.x == self.objx and self.y == self.objy:
            self.goal = 1
        else:
            self.goal = 0
            
    # Agent's Movement
    def moveXright(self):
        self.x += self.speed
        self.grid.move_to(self, (self.x,self.y))
        self.actStep += 1

    def moveXleft(self):
        self.x -= self.speed
        self.grid.move_to(self, (self.x,self.y))
        self.actStep += 1

    def moveYup(self):
        self.y += self.speed
        self.grid.move_to(self, (self.x,self.y))
        self.actStep += 1

    def moveYdown(self):
        self.y -= self.speed
        self.grid.move_to(self, (self.x,self.y))
        self.actStep += 1

    def move(self, trafficLights):
        # Check the traffic light status
        self.checkLight(trafficLights)
        # move
        if (self.x != self.objx):
            if (self.x > self.objx):
                self.moveXleft()
            else:
                self.moveXright()
        else:
            if (self.y > self.objy):
                self.moveYdown()
            else:
                self.moveYup()

    def whereAmI(self):
        self.position = (self.x, self.y)
        return (self.position)

    def checkLight(self, trafficLights):
        if self.x == self.route[1][0] and self.y == self.route[1][1]:
            i = 0
            for light in trafficLights:
                if light.position == self.route[4]:
                    if light.showLight() == False:
                        self.speed = 0
                    else:
                        self.speed = 1
                else:
                    i += 1


In [31]:
class God(ap.Agent):
    def setup(self):
        
        self.grid = self.model.grid
        self.pathsToCheck = stopLightsArray
        self.group = 0
        self.lightPathToTurnOn = 0
        
        # Tracks if cars should continue moving or not, 0 = stop, 1 = move
        self.turnLight = 0

        # Counts cars in
        # countX[0] = stopLight3
        # countX[1] = stopLight4
        self.countX = [0,0]

        # countY[0] = stopLight1
        # countY[1] = stopLight2
        self.countY = [0,0]

    #Checks amount of cars in 1 strip of path
    # must receive an array from Stoplights (3 locations)
    # Ex. countCars(stopLightsArray[1], cars_on_road)
    def countCars(self, spotList, cars_on_road):
        count = 0
        for spot in spotList:
            for car in cars_on_road:
                if car.whereAmI() == spot:
                    count += 1
        return count

    def passOrNot(self):
        maxX = max(self.countX[0], self.countX[1])
        maxY = max(self.countY[0], self.countY[1])

        # Returns a list that contains (bool, maxIndex). 0 = X and 1 = Y 
        # if self.countX[maxIndexX] >= self.countY[maxIndexY]:
        if maxX >= maxY:
            #return (0, maxIndexX)
            return False
        else:
            #return (1, maxIndexY)
            return True

    # Counts all of the cars in the designated spots
    def broadcast(self):
        self.countY[0] = self.countCars(self.pathsToCheck[0], self.model.cars_on_road)
        self.countY[1] = self.countCars(self.pathsToCheck[1], self.model.cars_on_road)
        self.countX[0] = self.countCars(self.pathsToCheck[2], self.model.cars_on_road)
        self.countX[1] = self.countCars(self.pathsToCheck[3], self.model.cars_on_road)

        self.lightPathToTurnOn = self.passOrNot()
        
    # Makes a decision based on broadcast results
    def callBack(self):
        return self.lightPathToTurnOn

In [42]:
# Stoplight Agent
class StopLight(ap.Agent): 

    def setup(self):
        # Initialize an attribute with a parameter
        self.state = False
        self.group = 0
        self.id = 0
        self.timer = 10
        self.position = (0, 0)
    
    def showLight(self):
        return self.state


    def changeColor(self):
        if self.timer == 0:
            self.timer = 10
            if self.state == True:
                self.state = not self.state
                self.group = 0
            else:
                self.state = not self.state
                self.group = 2
        else:
            self.timer = self.timer - 1

# no jala 😭
# aunque traiga false, siempre entra a true
# fuera y dentro de model
"""
        if self.model.god.callBack(): # si al llamar a Dios dice que es TRUE :       
            if self.id == 2 or self.id == 3:
                self.state = not self.state
                self.group = 2
            else:
                self.state = not self.state
                self.group = 0
        else:
            if self.id == 0 or self.id == 1:
                print("y pass")
                self.state = not self.state
                self.group = 2
            else:
                self.state = not self.state
                self.group = 0
                 """
        


'\n        if self.model.god.callBack(): # si al llamar a Dios dice que es TRUE :       \n            if self.id == 2 or self.id == 3:\n                self.state = not self.state\n                self.group = 2\n            else:\n                self.state = not self.state\n                self.group = 0\n        else:\n            if self.id == 0 or self.id == 1:\n                print("y pass")\n                self.state = not self.state\n                self.group = 2\n            else:\n                self.state = not self.state\n                self.group = 0\n                 '

In [21]:
class Traffic(ap.Model):
    def setup(self):
        n = self.p.cars
        m = self.p.trafficLights
        #t = self.p.timer
        
        self.grid = ap.Grid(self, (self.p.x, self.p.y), track_empty=True)
        self.timer = 0
        
        # Initiate car agents on to the grid
        self.agents = ap.AgentList(self, n, Car)
        self.grid.add_agents(self.agents, random=False, empty=False)
        
        id_car = 0
        for car in self.agents:
            car.id = id_car
            id_car += 1

        # Initiate traffic lights on the grid
        self.trafficLights = ap.AgentList(self, m, StopLight)
        self.grid.add_agents(self.trafficLights, stoplights, random=False, empty=False)

        id_light = 0
        for tl in self.trafficLights:
            tl.id = id_light
            tl.position = stoplights[id_light]
            id_light += 1


        #Initiate God Agent
        self.god = ap.AgentList(self, 1, God)
        self.grid.add_agents(self.god, random=True, empty=True)
        
        self.carDICT = {}

        posDICT = []
        for c in self.agents:
            car = {
                "id" : c.id,
                "x" : c.x,
                "y" : 0,
                "z" : c.y
            }
            posDICT.append(car)
        
        self.carDICT["car"] = posDICT

    def step(self):
        self.god.broadcast()
        self.trafficLights.changeColor()

        self.cars_on_road.move(self.trafficLights)
        
        self.carDICT = {}

        posDICT = []
        for c in self.agents:
            car = {
                "id" : c.id,
                "x" : c.x,
                "y" : 0,
                "z" : c.y
            }
            posDICT.append(car)
        
        self.carDICT["car"] = posDICT

    def update(self):
        self.agents.check_goal()
        self.cars_on_road = self.agents.select(self.agents.goal == 0)

        # Executed after every step done
        if len(self.cars_on_road) == 0:
           self.stop()

        
    def end(self):
        # Called at the end of the simulation
        self.report('Simulation ran', 1)
        #jsonString = json.dumps(self.carDICT)
        #print(jsonString)
        #with open('intento.json', 'w') as file:
            #json.dump(self.carDICT, file, indent = 2)



In [6]:
# Define parameters
parameters = {
    'steps' : 40,
    'cars' : 4, # Amount of cars
    'timer' : 6, #Timer
    'trafficLights': 4,
    'x' : 30, # Width of grid
    'y' : 18, # Height of grid
}

In [7]:
model = Traffic(parameters)
print(model)
results = model.run()
print(model.carDICT)

Traffic
updated callBack
[0]
Y pass
Completed: 1 stepsupdated callBack
[0]
Y pass
Completed: 2 stepsupdated callBack
[0]
Y pass
Completed: 3 stepsupdated callBack
[0]
Y pass
Completed: 4 stepsupdated callBack
[0]
Y pass
Completed: 5 stepsupdated callBack
[1]
Y pass
Completed: 6 stepsupdated callBack
[1]
Y pass
Completed: 7 stepsupdated callBack
[1]
Y pass
Completed: 8 stepsupdated callBack
[0]
Y pass
Completed: 9 stepsupdated callBack
[0]
Y pass
Completed: 10 stepsupdated callBack
[0]
Y pass
Completed: 11 stepsupdated callBack
[0]
Y pass
Completed: 12 stepsupdated callBack
[0]
Y pass
Completed: 13 stepsupdated callBack
[0]
Y pass
Completed: 14 stepsupdated callBack
[0]
Y pass
Completed: 15 stepsupdated callBack
[0]
Y pass
Completed: 16 stepsupdated callBack
[0]
Y pass
Completed: 17 stepsupdated callBack
[0]
Y pass
Completed: 18 stepsupdated callBack
[0]
Y pass
Completed: 19 stepsupdated callBack
[0]
Y pass
Completed: 20 stepsupdated callBack
[0]
Y pass
Completed: 21 stepsupdated callBa

In [None]:
# Convert string to json
jsonString = json.dumps(model.carDICT)

# Server instance
class Server(BaseHTTPRequestHandler):
    def _set_response(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        
    def do_Car(self):
        logging.info("GET request,\nPath: %s\nHeaders:\n%s\n", str(self.path), str(self.headers))
        self._set_response()
        message = "Pal Nortec"
        self.wfile.write(jsonString.encode('utf-8'))

    def do_stopLight(self):
        logging.info("GET request,\nPath: %s\nHeaders:\n%s\n", str(self.path), str(self.headers))
        self._set_response()
        message = "Pal Nortec"
        self.wfile.write(bytes(message,'utf-8'))

    def do_GET(self):
        if self.path == '/car':
            self.do_Car()
        elif self.path == '/trafficLight':
            self.do_stopLight()

# Run server
def run(server_class=HTTPServer, handler_class=Server, port=8585):
    logging.basicConfig(level=logging.INFO)
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    logging.info("Starting httpd...\n") 
    try:
        httpd.serve_forever()
    except KeyboardInterrupt: 
        pass
    httpd.server_close()
    logging.info("Stopping httpd...\n")

if __name__ == '__main__':
    from sys import argv
    
    if len(argv) == 2:
        run(port=int(argv[1]))
    else:
        run()


In [43]:
def animation_plot(model, ax):
    group_grid = model.grid.attr_grid('group')
    color_dict = {0:'#d62c2c', 1:'#019FAF', 2:'#66cdaa', None:'#ffffff'}
    ap.gridplot(group_grid, ax=ax, color_dict=color_dict, convert=True)
    ax.set_title(f"Traffic Simulation")

fig, ax = plt.subplots() 
model = Traffic(parameters)
animation = ap.animate(model, fig, ax, animation_plot)
IPython.display.HTML(animation.to_jshtml())