-
Notifications
You must be signed in to change notification settings - Fork 367
/
green_wave_env.py
574 lines (497 loc) · 21.8 KB
/
green_wave_env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
import numpy as np
import re
from gym.spaces.discrete import Discrete
from gym.spaces.box import Box
from gym.spaces.tuple_space import Tuple
from flow.core import rewards
from flow.envs.base_env import Env
ADDITIONAL_ENV_PARAMS = {
# minimum switch time for each traffic light (in seconds)
"switch_time": 2.0,
# whether the traffic lights should be actuated by sumo or RL
# options are "controlled" and "actuated"
"tl_type": "controlled",
# determines whether the action space is meant to be discrete or continuous
"discrete": False,
}
ADDITIONAL_PO_ENV_PARAMS = {
# num of vehicles the agent can observe on each incoming edge
"num_observed": 2,
# velocity to use in reward functions
"target_velocity": 30,
}
class TrafficLightGridEnv(Env):
"""Environment used to train traffic lights to regulate traffic flow
through an n x m grid.
Required from env_params:
* switch_time: minimum switch time for each traffic light (in seconds).
Earlier RL commands are ignored.
* tl_type: whether the traffic lights should be actuated by sumo or RL
options are respectively "actuated" and "controlled"
* discrete: determines whether the action space is meant to be discrete or
continuous
States
An observation is the distance of each vehicle to its intersection, a
number uniquely identifying which edge the vehicle is on, and the speed
of the vehicle.
Actions
The action space consist of a list of float variables ranging from 0-1
specifying whether a traffic light is supposed to switch or not. The
actions are sent to the traffic light in the grid from left to right
and then top to bottom.
Rewards
The reward is the negative per vehicle delay minus a penalty for
switching traffic lights
Termination
A rollout is terminated once the time horizon is reached.
Additional
Vehicles are rerouted to the start of their original routes once they
reach the end of the network in order to ensure a constant number of
vehicles.
"""
def __init__(self, env_params, sim_params, scenario, simulator='traci'):
for p in ADDITIONAL_ENV_PARAMS.keys():
if p not in env_params.additional_params:
raise KeyError(
'Environment parameter "{}" not supplied'.format(p))
self.grid_array = scenario.net_params.additional_params["grid_array"]
self.rows = self.grid_array["row_num"]
self.cols = self.grid_array["col_num"]
# self.num_observed = self.grid_array.get("num_observed", 3)
self.num_traffic_lights = self.rows * self.cols
self.tl_type = env_params.additional_params.get('tl_type')
super().__init__(env_params, sim_params, scenario, simulator)
# Saving env variables for plotting
self.steps = env_params.horizon
self.obs_var_labels = {
'edges': np.zeros((self.steps, self.k.vehicle.num_vehicles)),
'velocities': np.zeros((self.steps, self.k.vehicle.num_vehicles)),
'positions': np.zeros((self.steps, self.k.vehicle.num_vehicles))
}
self.node_mapping = scenario.get_node_mapping()
# Keeps track of the last time the traffic lights in an intersection
# were allowed to change (the last time the lights were allowed to
# change from a red-green state to a red-yellow state.)
self.last_change = np.zeros((self.rows * self.cols, 1))
# Keeps track of the direction of the intersection (the direction that
# is currently being allowed to flow. 0 indicates flow from top to
# bottom, and 1 indicates flow from left to right.)
self.direction = np.zeros((self.rows * self.cols, 1))
# Value of 1 indicates that the intersection is in a red-yellow state.
# value 0 indicates that the intersection is in a red-green state.
self.currently_yellow = np.zeros((self.rows * self.cols, 1))
# when this hits min_switch_time we change from yellow to red
# the second column indicates the direction that is currently being
# allowed to flow. 0 is flowing top to bottom, 1 is left to right
# For third column, 0 signifies yellow and 1 green or red
self.min_switch_time = env_params.additional_params["switch_time"]
if self.tl_type != "actuated":
for i in range(self.rows * self.cols):
self.k.traffic_light.set_state(
node_id='center' + str(i), state="GrGr")
self.currently_yellow[i] = 0
# # Additional Information for Plotting
# self.edge_mapping = {"top": [], "bot": [], "right": [], "left": []}
# for i, veh_id in enumerate(self.k.vehicle.get_ids()):
# edge = self.k.vehicle.get_edge(veh_id)
# for key in self.edge_mapping:
# if key in edge:
# self.edge_mapping[key].append(i)
# break
# check whether the action space is meant to be discrete or continuous
self.discrete = env_params.additional_params.get("discrete", False)
@property
def action_space(self):
"""See class definition."""
if self.discrete:
return Discrete(2 ** self.num_traffic_lights)
else:
return Box(
low=-1,
high=1,
shape=(self.num_traffic_lights,),
dtype=np.float32)
@property
def observation_space(self):
"""See class definition."""
speed = Box(
low=0,
high=1,
shape=(self.initial_vehicles.num_vehicles,),
dtype=np.float32)
dist_to_intersec = Box(
low=0.,
high=np.inf,
shape=(self.initial_vehicles.num_vehicles,),
dtype=np.float32)
edge_num = Box(
low=0.,
high=1,
shape=(self.initial_vehicles.num_vehicles,),
dtype=np.float32)
traffic_lights = Box(
low=0.,
high=1,
shape=(3 * self.rows * self.cols,),
dtype=np.float32)
return Tuple((speed, dist_to_intersec, edge_num, traffic_lights))
def get_state(self):
"""See class definition."""
# compute the normalizers
grid_array = self.net_params.additional_params["grid_array"]
max_dist = max(grid_array["short_length"],
grid_array["long_length"],
grid_array["inner_length"])
# get the state arrays
speeds = [
self.k.vehicle.get_speed(veh_id) / self.k.scenario.max_speed()
for veh_id in self.k.vehicle.get_ids()
]
dist_to_intersec = [
self.get_distance_to_intersection(veh_id) / max_dist
for veh_id in self.k.vehicle.get_ids()
]
edges = [
self._convert_edge(self.k.vehicle.get_edge(veh_id)) /
(self.k.scenario.network.num_edges - 1)
for veh_id in self.k.vehicle.get_ids()
]
state = [
speeds, dist_to_intersec, edges,
self.last_change.flatten().tolist(),
self.direction.flatten().tolist(),
self.currently_yellow.flatten().tolist()
]
return np.array(state)
def _apply_rl_actions(self, rl_actions):
"""See class definition."""
# check if the action space is discrete
if self.discrete:
# convert single value to list of 0's and 1's
rl_mask = [int(x) for x in list('{0:0b}'.format(rl_actions))]
rl_mask = [0] * (self.num_traffic_lights - len(rl_mask)) + rl_mask
else:
# convert values less than 0 to zero and above 0 to 1. 0 indicates
# that should not switch the direction, and 1 indicates that switch
# should happen
rl_mask = rl_actions > 0.0
for i, action in enumerate(rl_mask):
if self.currently_yellow[i] == 1: # currently yellow
self.last_change[i] += self.sim_step
# Check if our timer has exceeded the yellow phase, meaning it
# should switch to red
if self.last_change[i] >= self.min_switch_time:
if self.direction[i] == 0:
self.k.traffic_light.set_state(
node_id='center{}'.format(i),
state="GrGr")
else:
self.k.traffic_light.set_state(
node_id='center{}'.format(i),
state='rGrG')
self.currently_yellow[i] = 0
else:
if action:
if self.direction[i] == 0:
self.k.traffic_light.set_state(
node_id='center{}'.format(i),
state='yryr')
else:
self.k.traffic_light.set_state(
node_id='center{}'.format(i),
state='ryry')
self.last_change[i] = 0.0
self.direction[i] = not self.direction[i]
self.currently_yellow[i] = 1
def compute_reward(self, rl_actions, **kwargs):
"""See class definition."""
return - rewards.min_delay_unscaled(self) \
- rewards.boolean_action_penalty(rl_actions >= 0.5, gain=1.0)
# ===============================
# ============ UTILS ============
# ===============================
def get_distance_to_intersection(self, veh_ids):
"""Determines the smallest distance from the current vehicle's position
to any of the intersections.
Parameters
----------
veh_ids : str
vehicle identifier
Returns
-------
tup
1st element: distance to closest intersection
2nd element: intersection ID (which also specifies which side of
the intersection the vehicle will be arriving at)
"""
if isinstance(veh_ids, list):
return [self.find_intersection_dist(veh_id) for veh_id in veh_ids]
else:
return self.find_intersection_dist(veh_ids)
def find_intersection_dist(self, veh_id):
"""Return distance from the vehicle's current position to the position
of the node it is heading toward."""
edge_id = self.k.vehicle.get_edge(veh_id)
# FIXME this might not be the best way of handling this
if edge_id == "":
return -10
if 'center' in edge_id:
return 0
edge_len = self.k.scenario.edge_length(edge_id)
relative_pos = self.k.vehicle.get_position(veh_id)
dist = edge_len - relative_pos
return dist
def _convert_edge(self, edges):
"""Converts the string edge to a number.
Start at the bottom left vertical edge and going right and then up, so
the bottom left vertical edge is zero, the right edge beside it is 1.
The numbers are assigned along the lowest column, then the lowest row,
then the second lowest column, etc. Left goes before right, top goes
before bot.
The values are are zero indexed.
Parameters
----------
edges : list of str or str
name of the edge(s)
Returns
-------
list of int or int
a number uniquely identifying each edge
"""
if isinstance(edges, list):
return [self._split_edge(edge) for edge in edges]
else:
return self._split_edge(edges)
def _split_edge(self, edge):
"""Utility function for convert_edge"""
if edge:
if edge[0] == ":": # center
center_index = int(edge.split("center")[1][0])
base = ((self.cols + 1) * self.rows * 2) \
+ ((self.rows + 1) * self.cols * 2)
return base + center_index + 1
else:
pattern = re.compile(r"[a-zA-Z]+")
edge_type = pattern.match(edge).group()
edge = edge.split(edge_type)[1].split('_')
row_index, col_index = [int(x) for x in edge]
if edge_type in ['bot', 'top']:
rows_below = 2 * (self.cols + 1) * row_index
cols_below = 2 * (self.cols * (row_index + 1))
edge_num = rows_below + cols_below + 2 * col_index + 1
return edge_num if edge_type == 'bot' else edge_num + 1
if edge_type in ['left', 'right']:
rows_below = 2 * (self.cols + 1) * row_index
cols_below = 2 * (self.cols * row_index)
edge_num = rows_below + cols_below + 2 * col_index + 1
return edge_num if edge_type == 'left' else edge_num + 1
else:
return 0
def additional_command(self):
"""Used to insert vehicles that are on the exit edge and place them
back on their entrance edge."""
for veh_id in self.k.vehicle.get_ids():
self._reroute_if_final_edge(veh_id)
def _reroute_if_final_edge(self, veh_id):
"""Checks if an edge is the final edge. If it is return the route it
should start off at."""
edge = self.k.vehicle.get_edge(veh_id)
if edge == "":
return
if edge[0] == ":": # center edge
return
pattern = re.compile(r"[a-zA-Z]+")
edge_type = pattern.match(edge).group()
edge = edge.split(edge_type)[1].split('_')
row_index, col_index = [int(x) for x in edge]
# find the route that we're going to place the vehicle on if we are
# going to remove it
route_id = None
if edge_type == 'bot' and col_index == self.cols:
route_id = "bot{}_0".format(row_index)
elif edge_type == 'top' and col_index == 0:
route_id = "top{}_{}".format(row_index, self.cols)
elif edge_type == 'left' and row_index == 0:
route_id = "left{}_{}".format(self.rows, col_index)
elif edge_type == 'right' and row_index == self.rows:
route_id = "right0_{}".format(col_index)
if route_id is not None:
type_id = self.k.vehicle.get_type(veh_id)
lane_index = self.k.vehicle.get_lane(veh_id)
# remove the vehicle
self.k.vehicle.remove(veh_id)
# reintroduce it at the start of the network
self.k.vehicle.add(
veh_id=veh_id,
edge=route_id,
type_id=str(type_id),
lane=str(lane_index),
pos="0",
speed="max")
def k_closest_to_intersection(self, edges, k):
"""
Return the veh_id of the k closest vehicles to an intersection for
each edge. Performs no check on whether or not edge is going toward an
intersection or not. Does no padding
"""
if k < 0:
raise IndexError("k must be greater than 0")
dists = []
def sort_lambda(veh_id):
return self.get_distance_to_intersection(veh_id)
if isinstance(edges, list):
for edge in edges:
vehicles = self.k.vehicle.get_ids_by_edge(edge)
dist = sorted(
vehicles,
key=sort_lambda
)
dists += dist[:k]
else:
vehicles = self.k.vehicle.get_ids_by_edge(edges)
dist = sorted(
vehicles,
key=lambda veh_id: self.get_distance_to_intersection(veh_id))
dists += dist[:k]
return dists
class PO_TrafficLightGridEnv(TrafficLightGridEnv):
"""Environment used to train traffic lights to regulate traffic flow
through an n x m grid.
Required from env_params:
* switch_time: minimum switch time for each traffic light (in seconds).
Earlier RL commands are ignored.
* num_observed: number of vehicles nearest each intersection that is
observed in the state space; defaults to 2
States
An observation is the number of observe vehicles in each intersection
closest to the traffic lights, a
number uniquely identifying which edge the vehicle is on, and the speed
of the vehicle.
Actions
The action space consist of a list of float variables ranging from 0-1
specifying whether a traffic light is supposed to switch or not. The
actions are sent to the traffic light in the grid from left to right
and then top to bottom.
Rewards
The reward is the delay of each vehicle minus a penalty for switching
traffic lights
Termination
A rollout is terminated once the time horizon is reached.
Additional
Vehicles are rerouted to the start of their original routes once they
reach the end of the network in order to ensure a constant number of
vehicles.
"""
def __init__(self, env_params, sim_params, scenario, simulator='traci'):
super().__init__(env_params, sim_params, scenario, simulator)
for p in ADDITIONAL_PO_ENV_PARAMS.keys():
if p not in env_params.additional_params:
raise KeyError(
'Environment parameter "{}" not supplied'.format(p))
# number of vehicles nearest each intersection that is observed in the
# state space; defaults to 2
self.num_observed = env_params.additional_params.get("num_observed", 2)
# used during visualization
self.observed_ids = []
@property
def observation_space(self):
"""
Partially observed state space.
Velocities, distance to intersections, edge number (for nearby
vehicles), and traffic light state.
"""
tl_box = Box(
low=0.,
high=1,
shape=(12 * self.num_observed * self.num_traffic_lights +
2 * len(self.k.scenario.get_edge_list()) +
3 * self.num_traffic_lights,),
dtype=np.float32)
return tl_box
def get_state(self):
"""
Returns self.num_observed number of vehicles closest to each traffic
light and for each vehicle its velocity, distance to intersection,
edge_number traffic light state. This is partially observed
"""
speeds = []
dist_to_intersec = []
edge_number = []
max_speed = max(
self.k.scenario.speed_limit(edge)
for edge in self.k.scenario.get_edge_list())
grid_array = self.net_params.additional_params["grid_array"]
max_dist = max(grid_array["short_length"], grid_array["long_length"],
grid_array["inner_length"])
all_observed_ids = []
for node, edges in self.scenario.get_node_mapping():
for edge in edges:
observed_ids = \
self.k_closest_to_intersection(edge, self.num_observed)
all_observed_ids += observed_ids
# check which edges we have so we can always pad in the right
# positions
speeds += [
self.k.vehicle.get_speed(veh_id) / max_speed
for veh_id in observed_ids
]
dist_to_intersec += [
(self.k.scenario.edge_length(
self.k.vehicle.get_edge(veh_id))
- self.k.vehicle.get_position(veh_id)) / max_dist
for veh_id in observed_ids
]
edge_number += \
[self._convert_edge(self.k.vehicle.get_edge(veh_id))
/ (self.k.scenario.network.num_edges - 1)
for veh_id in observed_ids]
if len(observed_ids) < self.num_observed:
diff = self.num_observed - len(observed_ids)
speeds += [0] * diff
dist_to_intersec += [0] * diff
edge_number += [0] * diff
# now add in the density and average velocity on the edges
density = []
velocity_avg = []
for edge in self.k.scenario.get_edge_list():
ids = self.k.vehicle.get_ids_by_edge(edge)
if len(ids) > 0:
density += [5 * len(ids) / self.k.scenario.edge_length(edge)]
velocity_avg += [
np.mean(
[self.k.vehicle.get_speed(veh_id)
for veh_id in ids]) / max_speed
]
else:
density += [0]
velocity_avg += [0]
self.observed_ids = all_observed_ids
return np.array(
np.concatenate([
speeds, dist_to_intersec, edge_number, density, velocity_avg,
self.last_change.flatten().tolist(),
self.direction.flatten().tolist(),
self.currently_yellow.flatten().tolist()
]))
def compute_reward(self, rl_actions, **kwargs):
"""See class definition."""
if self.env_params.evaluate:
return - rewards.min_delay_unscaled(self)
else:
return (- rewards.min_delay_unscaled(self) +
rewards.penalize_standstill(self, gain=0.2))
def additional_command(self):
"""See class definition."""
# specify observed vehicles
[self.k.vehicle.set_observed(veh_id) for veh_id in self.observed_ids]
class GreenWaveTestEnv(TrafficLightGridEnv):
"""
Class that overrides RL methods of green wave so we can test
construction without needing to specify RL methods
"""
def _apply_rl_actions(self, rl_actions):
"""See class definition."""
pass
def compute_reward(self, rl_actions, **kwargs):
"""No return, for testing purposes."""
return 0