In [1]:
import numpy as np
import random

In [171]:
class TrafficLightNetworkEnv():
    """
    Gym style environment for RL. You may also inherit the class structure from OpenAI Gym. 
    Parameters:
        n_time_steps:       int
                            Total number of time steps within each episode
        n_traffic_nodes:    int
                            Number of traffic nodes (intersections) in the network
        n_tl_queue_places:  int
                            Number of queue places before each traffic light
        n_initial_cars:     int
                            Number of cars to be initially allocated in the network
        seed:               int
                            seed of the RNG (for reproducibility)
    """
    
    def __init__(self, n_time_steps, n_traffic_nodes, n_tl_queue_places, n_initial_cars, seed):
        """
        Initialize the environment.
        
        """
        
        self.n_time_steps = n_time_steps
        self.n_traffic_nodes = n_traffic_nodes
        self.n_traffic_lights = n_traffic_nodes*8
        self.n_initial_cars = n_initial_cars
        self.n_tl_queue_places = n_tl_queue_places
        
        if n_traffic_nodes == 1:
          self.n_destinations = 4
        elif n_traffic_nodes == 2:
          self.n_destinations = 6
        elif n_traffic_nodes == 4:
          self.n_destinations = 8
        elif n_traffic_nodes == 6:
          self.n_destinations = 10

        self.traffic_nodes = {} # a dictionary to hold array of traffic light ids, where the key is id
        self.traffic_lights = {} # a dictionary of tuples (tl_state, [occupied places]), where the key is id
        self.cars = {} # a dictionary of tuples (entrance, destination, current_tl_id, current_queue_place, current_route_step, [route]), where key is id

        ### define action space variables
        # car agent actions
        self.car_actions = np.array([0,1,2]) # [go straight, turn left, turn right]

        # traffic node (intersection) agent actions        
        self.intersection_actions = np.array([0,1,2,3,4,5]) # [west, north, east, south, east-west, south-north]
        
        ### define state space variables
        # state space is defined by each car with values [tl, place, des]
                
        self.set_seed(seed)
        self.set_route_map()
        self.reset()
        
    
    def step(self, node_actions):
        """
        Interface between environment and agent. Performs one step in the environemnt.
        Parameters:
            action: list of (int, int)
                    for each traffic node: the index of the respective action in the intersection_actions array:
                    (node_id, action)
        Returns:
            output: ( object, float, bool)
                    information provided by the environment about its current state:
                    (state, reward, done)
        """

        ### set traffic light states according to the action in the respective traffic node
        for action in node_actions:
          
          # extract list of traffic lights for the node
          tls = self.traffic_nodes[action[0]]
          # produce new state for each traffic light according to the action of the node
          tls_state = self.intersection_lights(action[1])

          # update the state of each of the traffic lights, keeping the number of occupied places in its queue
          for tl_id, new_tl_state in zip(tls, tls_state):
            tl_queue = self.traffic_lights[tl_id][1]
            self.traffic_lights[tl_id] = (new_tl_state, tl_queue)

        ### move cars
        sorted_cars = [key for (key, value) in sorted(self.cars.items(), key = lambda x: (x[1][2], x[1][3]))]
        stuck_cars = []
        for car in sorted_cars:
          move_flag = self.move_car(car)
          if not move_flag:
            stuck_cars.append(car)

        print("stuck cars: ", stuck_cars)
        self.state = self.extract_state()
        reward = 0
        done = False
        return self.state, reward, done

    
    
    def set_seed(self,seed=0):
        """
        Sets the seed of the RNG.
        
        """
        np.random.seed(seed)
    
    
    
    def reset(self):
        """
        Resets the environment to its initial values.
        Returns:
            state:  object
                    the initial state of the environment
        """
        self.current_step = 0

        ### initialize nodes & traffic lights
        self.traffic_nodes = {}
        self.traffic_lights = {}
        for i in range(0, self.n_traffic_nodes):
          node_id = i+1
          tls = [k for k in range(i*8+1, (i+1)*8 + 1)]
          self.traffic_nodes[node_id] = tls

          # randomly choose inital action that will set corresponding traffic lights states
          initial_action = random.choice(self.intersection_actions)
          tls_state = self.intersection_lights(initial_action)

          # initialize traffic lights for this node
          for tl_id, tl_state in zip(tls, tls_state):
            self.traffic_lights[tl_id] = (tl_state, [])
        
        ### place randomly cars in the network
        self.cars = {}
        i=1
        s = set()
        while i<=self.n_initial_cars:
          # a car is defined by (entrance, destination, current_tl_id, current_queue_place, current_route_step, [route])
          ent_des = np.random.choice(range(1, self.n_destinations+1), 2, replace=False)
          route = self.get_route(ent_des[0], ent_des[1])
          tl = route[0]
          place = np.random.randint(1, self.n_tl_queue_places)          
          car = (ent_des[0],
                 ent_des[1],
                 tl,
                 place,
                 0,
                 route
                 )
          # discard cars at duplicate places
          if not (tl, place) in s:
            s.add((tl, place))
            self.cars[i] = car
            self.traffic_lights[tl][1].append(place)
            i=i+1
        
        # sort traffic light queues
        for tl in self.traffic_lights:
          self.traffic_lights[tl][1].sort()

        self.state = self.extract_state()

        return self.state
    
    def render(self):
        """
        Plots the state of the environment. For visulization purposes only. 

        """
        pass
    
    def set_route_map(self):
        """
         Creates and stores all possible routes in the network. A route consists of a list of traffic light ids.
          Routes assume particular numbering of the network exits and traffic lights (within a node) - starting from the street on the left(west), clockwise rotation.
          Traffic lights with even numbers are left turns.
        """
        self.route_map = {}

        if(self.n_traffic_nodes == 2): # 6 network exits
          self.route_map[(1,2)] = [2]
          self.route_map[(1,3)] = [1,10]
          self.route_map[(1,4)] = [1,9]
          self.route_map[(1,5)] = [1,9]
          self.route_map[(1,6)] = [1]

          self.route_map[(2,1)] = [3]
          self.route_map[(2,3)] = [4,10]
          self.route_map[(2,4)] = [4,9]
          self.route_map[(2,5)] = [4,9]
          self.route_map[(2,6)] = [3]

          self.route_map[(3,1)] = [11,5]
          self.route_map[(3,2)] = [11,5]
          self.route_map[(3,4)] = [12]
          self.route_map[(3,5)] = [11]
          self.route_map[(3,6)] = [11,6]

          self.route_map[(4,1)] = [13,5]
          self.route_map[(4,2)] = [13,5]
          self.route_map[(4,3)] = [13]
          self.route_map[(4,5)] = [14]
          self.route_map[(4,6)] = [13,6]

          self.route_map[(5,1)] = [16,5]
          self.route_map[(5,2)] = [16,5]
          self.route_map[(5,3)] = [15]
          self.route_map[(5,4)] = [15]
          self.route_map[(5,6)] = [16,6]

          self.route_map[(6,1)] = [8]
          self.route_map[(6,2)] = [7]
          self.route_map[(6,3)] = [7,10]
          self.route_map[(6,4)] = [7,9]
          self.route_map[(6,5)] = [7,9]
    
    def get_route(self, entrance, destination):
        """
        Provides the route from entrance to destination in the network(city). A route consists of a list of traffic light ids.
        Parameters:
            entrance:     int                          
            destination:  int
        Returns:
            output:  list
                     list of traffic lights that form the route
        """
        return self.route_map[(entrance, destination)]


    def extract_state(self):
        """
        Extracts the state from self.cars
        Returns:
            state:  object
                    the state of the environment
        """      
        state = []
        for k in self.cars:
          car = self.cars[k]
          state.append((car[2], car[3], car[1]))

        return state

    def intersection_lights(self, action):
        """
        Provides the states of the traffic lights in an intersection, based on the action.
         The numbering of the traffic lights in the node starts from the street on left(west), clockwise rotation. Traffic lights with even numbers are left turns.
        Parameters:
            action:  int
                     the index of the respective action in intersection_actions
        Returns:
            output:  list
                     list of traffic light states, where each state can be one of ('green', 'red')
        """
        if action == 0: # west
          return ['green', 'green', 'red', 'red', 'red', 'red', 'red', 'red']
        elif action == 1: # north
          return ['red', 'red', 'green', 'green', 'red', 'red', 'red', 'red']
        elif action == 2: # east
          return ['red', 'red', 'red', 'red', 'green', 'green', 'red', 'red']
        elif action == 3: # south
          return ['red', 'red', 'red', 'red', 'red', 'red', 'green', 'green']
        elif action == 4: # east-west
          return ['green', 'red', 'red', 'red', 'green', 'red', 'red', 'red']
        elif action == 5: # south-north
          return ['red', 'red', 'green', 'red', 'red', 'red', 'green', 'red']
    
    def traffic_light_state(self, traffic_light):
        """
        Provides the states of the traffic lights in an intersection, based on the action.
        Parameters:
            traffic_light:  int
                            the identifier of the traffic light
        Returns:
            output:  string
                     the state of the traffic light ('green', 'red')
        """
        return self.traffic_lights[traffic_light][0]

    def update_tl_queue(self, traffic_light, place):
        """
        Updates the occupied places for the specified traffic light
        Parameters:
          traffic_light:  int
                          the identifier of the traffic light
          value:          int
                          value to update the number of occupied places
          place:          int
                          the place being occupied that causes the change
        """
        tl = self.traffic_lights[traffic_light]
        if place in tl[1]:
          tl[1].remove(place)
        else:
          tl[1].append(place) 

    def queue_place_occupied(self, traffic_light, place):
        """
        Returns whether the place of the specified traffic light is occupied
        Parameters:
          traffic_light:  int
                          the identifier of the traffic light
          place:          int
                          the place to check
        Returns:
          output: boolean
                  True if the queue place is occupied
        """
        return (place in self.traffic_lights[traffic_light][1])
    
    def next_place(self, car):
        """
        Calculates the next queue place for a car
        Parameters:
          car:        int
                      the identifier of the car to be moved
        Returns:
          output:  (int,int,int)
                   values for (traffic_light, queue_place, route_step)
                   
        """
        c = self.cars[car]
        current_tl = c[2]
        current_queue_place = c[3]
        current_route_step = c[4]
        route = c[5]

        is_last_route_step = (current_route_step == len(route)-1)

        # the car should get out of the city
        if current_queue_place == 1 and is_last_route_step:
          output = (-1,-1,-1)
        # the car should cross an intersection and queue at another traffic light
        elif current_queue_place == 1:
          new_route_step = current_route_step + 1
          output = (route[new_route_step], self.n_tl_queue_places, new_route_step)
        # the car should move one place further within the same queue
        else:
          output = (current_tl, current_queue_place - 1, current_route_step)
        
        return output

    def move_car(self, car):
        """
        Moves car one place further. Assumption is made that the car (if exists) at the next queue place of the same traffic light has already moved.
        Parameters:
          car:        int
                      the identifier of the car to be moved
        Returns:
          output:  boolean
                   whether the car was able to move as opposed to being stuck
        """
        c = self.cars[car]
        entrance = c[0]
        destination = c[1]
        current_tl = c[2]
        current_queue_place = c[3]
        current_route_step = c[4]
        route = c[5]

        (new_tl, new_queue_place, new_route_step) = self.next_place(car)
        next_place_occupied = (False if new_tl == -1 else self.queue_place_occupied(new_tl, new_queue_place))
        tl_state = self.traffic_light_state(current_tl)

        can_move = True
        actually_moved = False
        left_city = False

        ### the car cannot move because of a red light
        if tl_state == 'red' and (current_queue_place == 1 or next_place_occupied):
          can_move = False
          actually_moved = False

        ### handle when the car gets out of the city
        elif tl_state == 'green' and new_tl == -1:
          # decrease the number of cars before this traffic light and remove the car
          self.update_tl_queue(current_tl, 1)
          self.cars.pop(car)
          actually_moved = True
          left_city = True
 
        ### handle when the car stays in the city
        
        # the car crosses the intersection and queues at the next traffic light
        elif tl_state == 'green' and current_queue_place == 1:
          if not next_place_occupied:
            # update the queue of cars before current and next traffic lights
            self.update_tl_queue(current_tl, 1)
            self.update_tl_queue(new_tl, new_queue_place)
            actually_moved = True
          
          # explicitly state that the car is stuck and the last car at the next traffic light should move first
          else:
            actually_moved = False

        # the car stays in the same queue - movement is possible on both green(always) and red(if next place unoccupied) light
        else:
          if not next_place_occupied:
            # update the queue of cars before this traffic light
            self.update_tl_queue(new_tl, current_queue_place)
            self.update_tl_queue(new_tl, new_queue_place)
            actually_moved = True
          
        # update car information
        if actually_moved and not left_city:
          self.cars[car] = (entrance, destination, new_tl, new_queue_place, new_route_step, route)
        
        flag_stuck = can_move and not actually_moved
        
        return not flag_stuck
   

In [197]:
env = TrafficLightNetworkEnv(n_time_steps=1, n_traffic_nodes=2, n_tl_queue_places=20, n_initial_cars=160, seed=1)
print(env.traffic_nodes)
print(env.state)

{1: [1, 2, 3, 4, 5, 6, 7, 8], 2: [9, 10, 11, 12, 13, 14, 15, 16]}
[(11, 10, 2), (11, 8, 6), (13, 6, 1), (3, 15, 6), (1, 1, 6), (11, 2, 1), (13, 14, 3), (7, 8, 5), (7, 18, 3), (3, 14, 6), (16, 16, 6), (14, 10, 5), (1, 7, 3), (7, 15, 5), (15, 14, 3), (7, 7, 3), (13, 8, 6), (11, 5, 2), (13, 19, 3), (16, 8, 2), (7, 1, 4), (15, 11, 4), (13, 1, 6), (15, 1, 3), (11, 14, 1), (2, 16, 2), (7, 6, 5), (12, 18, 4), (15, 17, 4), (13, 15, 6), (7, 2, 3), (3, 1, 6), (2, 11, 2), (4, 14, 3), (2, 8, 2), (3, 5, 1), (11, 6, 2), (16, 5, 2), (16, 10, 6), (15, 19, 4), (7, 17, 4), (11, 16, 5), (16, 15, 2), (4, 15, 3), (12, 19, 4), (1, 13, 4), (3, 8, 6), (4, 17, 5), (11, 1, 1), (4, 3, 4), (15, 10, 4), (12, 13, 4), (16, 17, 1), (16, 4, 2), (1, 5, 4), (7, 9, 3), (4, 12, 4), (13, 17, 2), (13, 12, 3), (4, 18, 3), (14, 5, 5), (1, 17, 5), (16, 9, 1), (13, 3, 2), (1, 6, 6), (15, 13, 4), (7, 19, 3), (3, 13, 6), (13, 11, 3), (4, 16, 3), (16, 6, 6), (1, 14, 4), (1, 4, 5), (4, 19, 4), (4, 8, 3), (3, 6, 6), (16, 14, 6), (13

In [207]:
a = env.intersection_actions
for i in range(0,100):
  a1=random.choice(a)
  a2=random.choice(a)
  _,_,_ = env.step([(1,a1), (2,a1)])

stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  [75, 92, 93, 57, 150, 34, 44, 70, 48, 60, 74]
stuck cars:  []
stuck cars:  [110, 73, 55, 65, 13, 106, 108, 81, 79, 46, 72, 118, 142, 62, 114, 104]
stuck cars:  []
stuck cars:  []
stuck cars:  [27, 16, 8, 56, 132, 94, 113, 152, 122, 14, 80, 41, 9, 67]
stuck cars:  []
stuck cars:  []
stuck cars:  [73, 55, 65, 13, 106, 108, 81, 79, 46, 72, 118, 142, 62, 114, 104]
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  [132, 94, 113, 152, 122, 14, 80, 41, 9, 67]
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  [73, 55, 65, 13, 106, 108, 81, 79, 46, 72, 118, 142, 62, 114, 104]
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  [152, 122, 14, 80, 41, 9, 67]
stuck cars:  []
stuck cars:  [152, 122, 14, 80, 41, 9, 67]
stuck cars:  []
stuck cars:  []
stuck cars:  []
stuck cars:  []
s

In [184]:
env.traffic_lights

{1: ('green', [2]),
 2: ('green', []),
 3: ('red', [9, 10]),
 4: ('red', []),
 5: ('red', []),
 6: ('red', []),
 7: ('red', [2, 3, 10, 13]),
 8: ('red', []),
 9: ('green', []),
 10: ('green', []),
 11: ('red', [1, 2, 3, 5]),
 12: ('red', []),
 13: ('red', [1, 3, 9, 14]),
 14: ('red', [5]),
 15: ('red', [9]),
 16: ('red', [3, 11])}

In [209]:
# print cars
[(key, value) for (key, value) in sorted(env.cars.items(), key = lambda x: (x[1][2], x[1][3]))]

[(43, (5, 2, 5, 1, 1, [16, 5])),
 (53, (5, 1, 5, 6, 1, [16, 5])),
 (107, (5, 2, 5, 13, 1, [16, 5])),
 (77, (5, 6, 6, 1, 1, [16, 6])),
 (11, (5, 6, 6, 3, 1, [16, 6])),
 (62, (1, 5, 9, 1, 1, [1, 9])),
 (74, (2, 4, 9, 2, 1, [4, 9])),
 (114, (1, 4, 9, 3, 1, [1, 9])),
 (104, (1, 4, 9, 4, 1, [1, 9])),
 (44, (2, 3, 10, 1, 1, [4, 10])),
 (70, (2, 3, 10, 2, 1, [4, 10])),
 (60, (2, 3, 10, 3, 1, [4, 10])),
 (9, (6, 3, 10, 4, 1, [7, 10])),
 (67, (6, 3, 10, 5, 1, [7, 10]))]