In [1]:
import gym
import numpy as np
from roundabout.env import Env


In [2]:
RLParams = {'desired_steps': 2400,
            'warmup_steps': 800,
            'Q_upperbound': 35,
            'Q_lowerbound': 20,
            'Q_randomness': 3}

In [12]:
class RLRoundabout(gym.Env):
    def __init__(self, envParams, initParams, RLParams):
        self.env = Env(envParams)
        self.initParams = initParams
        self.desired_steps = RLParams['desired_steps']
        self.warmup_steps = RLParams['warmup_steps']
        self.Q_upperbound = RLParams['Q_upperbound']
        self.Q_lowerbound = RLParams['Q_lowerbound']
        self.Q_randomness = RLParams['Q_randomness']
        self.action_space = gym.spaces.Box(low = 0.0, high = 1.0, shape = (4,), dtype=np.float32)
        self.observation_space = gym.spaces.Dict({"new_vehicles": gym.spaces.MultiBinary([4]),
                                                  "approaching_vehicles": gym.spaces.Box(low=0, high=100, shape = (4,), dtype=np.int64),
                                                  "slot_occupied_rate": gym.spaces.Box(low=0, high=1, shape = (1,), dtype=np.float32),
                                                  "current_queue": gym.spaces.Box(low = 0, high = 30, shape = (4,), dtype=np.int64)})
        
        self.env.initialize(self.initParams)
    
    def step(self, action):
        
        self.env.P = action
        self.env.Q = np.random.uniform(self.Q_lowerbound, self.Q_upperbound, 4) + np.random.normal(0, self.Q_randomness, 4)
        self.env.step()

        state = dict()
        state["approaching_vehicles"] = np.array(list(map(len, self.env.approaching_vehicles)))
        state["current_queue"] = np.array(self.env.queue_length)
        state["new_vehicles"] = np.array(self.env.new_vehicles)
        state["slot_occupied_rate"] =[1 - sum(list(map(lambda slot: slot.virtual_vehicle is None, self.env.slots)))/self.env.numSlots]
        
        reward = - sum(np.square(self.env.queue_length))

        done = (self.env.num_step >= self.desired_steps) 
        info = {}        
        return state, reward, done, info
    
    def reset(self): 
        self.env.initialize(self.initParams)

        for _ in range(self.warmup_steps):
            state, _, _, _ = self.step(action = np.ones(4))
        
        # state = dict()
        # state["new_vehicle"] = self.env.new_vehicles
        # state["approaching_vehicles"] = np.array(list(map(len, self.env.approaching_vehicles)))
        # state["current_queue"] = np.array(self.env.queue_length)
        
        return state

In [13]:
envParams = {
    "a": 3, 
    "v": 8,
    "C": 3,
    "veh_length": 5,
    "headway": 3,
    "Q": np.array([35, 35, 35, 35])*6/8,
    "step_size": 0.05,
    "boundary": 250,
    "eta": np.array([[0.0, 0.0, 5/7, 2/7],
                     [2/7, 0.0, 0.0, 5/7],
                     [5/7, 2/7, 0.0, 0.0],
                     [0.0, 5/7, 2/7, 0.0]]),
    "fifo": True
}

initParams = {
    "l_0": np.array([0, 0, 0, 0]),  
    #"l_0": np.array([14, 15, 13, 14]),  
}

In [14]:
test_env = RLRoundabout(envParams, initParams, RLParams)
test_env.reset()

{'approaching_vehicles': array([ 6,  6, 10,  7]),
 'current_queue': array([0, 0, 0, 0]),
 'new_vehicles': array([0, 1, 0, 1]),
 'slot_occupied_rate': [0.8333333333333334]}

In [6]:
test_env.step(test_env.action_space.sample())

({'approaching_vehicles': array([ 8, 10,  9,  3]),
  'current_queue': array([0, 1, 0, 0]),
  'new_vehicles': [0, 0, 0, 0],
  'slot_occupied_rate': [0.8333333333333334]},
 -1,
 False,
 {})

In [7]:
test_env.reset()

{'approaching_vehicles': array([5, 5, 4, 3]),
 'current_queue': array([0, 0, 0, 0]),
 'new_vehicles': [0, 0, 0, 0],
 'slot_occupied_rate': [0.9166666666666666]}

In [17]:
test_env.observation_space.sample()

OrderedDict([('approaching_vehicles', array([ 64, 100,  41,  13])),
             ('current_queue', array([20,  3, 30, 26])),
             ('new_vehicles', array([[1, 0, 0, 1]], dtype=int8)),
             ('slot_occupied_rate', array([0.5345858], dtype=float32))])

In [15]:
isinstance(OrderedDict(test_env.reset()), gym.spaces.Dict)

False

In [16]:
from collections import OrderedDict
OrderedDict(test_env.reset())

OrderedDict([('approaching_vehicles', array([7, 8, 3, 4])),
             ('current_queue', array([0, 0, 0, 0])),
             ('new_vehicles', array([0, 0, 1, 0])),
             ('slot_occupied_rate', [0.6666666666666667])])