-
Notifications
You must be signed in to change notification settings - Fork 367
/
highway_ramps.py
234 lines (195 loc) · 9 KB
/
highway_ramps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""Contains the highway with ramps scenario class."""
from flow.scenarios.base_scenario import Scenario
from flow.core.params import InitialConfig, TrafficLightParams
from collections import defaultdict
from numpy import pi, sin, cos
ADDITIONAL_NET_PARAMS = {
# lengths of highway, on-ramps and off-ramps respectively
"highway_length": 300,
"on_ramps_length": 100,
"off_ramps_length": 100,
# number of lanes on highway, on-ramps and off-ramps respectively
"highway_lanes": 1,
"on_ramps_lanes": 1,
"off_ramps_lanes": 1,
# speed limit on highway, on-ramps and off-ramps respectively
"highway_speed": 10,
"on_ramps_speed": 10,
"off_ramps_speed": 10,
# positions of the on-ramps
"on_ramps_pos": [],
# positions of the off-ramps
"off_ramps_pos": [],
# probability for a vehicle to exit the highway at the next off-ramp
"next_off_ramp_proba": 0.2
}
class HighwayRampsScenario(Scenario):
"""Scenario class for a highway section with on and off ramps.
This scenario consists of a single or multi-lane highway network with a
variable number of on-ramps and off-ramps at arbitrary positions,
with arbitrary numbers of lanes. It can be used to generate periodic
perturbation on a more realistic highway.
Parameters in net_params:
* **highway_length** : total length of the highway
* **on_ramps_length** : length of each on-ramp
* **off_ramps_length** : length of each off-ramp
* **highway_lanes** : number of lanes on the highway
* **on_ramps_lanes** : number of lanes on each on-ramp
* **off_ramps_lanes** : number of lanes on each off-ramp
* **highway_speed** : speed limit on the highway
* **on_ramps_speed** : speed limit on each on-ramp
* **off_ramps_speed** : speed limit on each off-ramp
* **on_ramps_pos** : positions of the in-ramps on the highway (int list)
* **off_ramps_pos** : positions of the off-ramps on the highway (int list)
* **next_off_ramp_proba** : probability for a vehicle to exit the highway
at the next off-ramp
"""
def __init__(self,
name,
vehicles,
net_params,
initial_config=InitialConfig(),
traffic_lights=TrafficLightParams()):
"""Initialize a highway with on and off ramps scenario."""
for p in ADDITIONAL_NET_PARAMS.keys():
if p not in net_params.additional_params:
raise KeyError('Network parameter "{}" not supplied'.format(p))
# load parameters into class
params = net_params.additional_params
self.highway_length = params['highway_length']
self.on_ramps_length = params['on_ramps_length']
self.off_ramps_length = params['off_ramps_length']
self.highway_lanes = params['highway_lanes']
self.on_ramps_lanes = params['on_ramps_lanes']
self.off_ramps_lanes = params['off_ramps_lanes']
self.highway_speed = params['highway_speed']
self.on_ramps_speed = params['on_ramps_speed']
self.off_ramps_speed = params['off_ramps_speed']
self.on_ramps_pos = params['on_ramps_pos']
self.off_ramps_pos = params['off_ramps_pos']
self.p = params['next_off_ramp_proba']
# generate position of all network nodes
self.ramps_pos = sorted(self.on_ramps_pos + self.off_ramps_pos)
self.nodes_pos = sorted(list(set([0] + self.ramps_pos +
[self.highway_length])))
# highway_pos[x] = id of the highway node whose starting position is x
self.highway_pos = {x: i for i, x in enumerate(self.nodes_pos)}
# ramp_pos[x] = id of the ramp node whose intersection with the highway
# is at position x
self.ramp_pos = {x: "on_ramp_{}".format(i)
for i, x in enumerate(self.on_ramps_pos)}
self.ramp_pos.update({x: "off_ramp_{}".format(i)
for i, x in enumerate(self.off_ramps_pos)})
# make sure scenario is constructible
if (len(self.ramps_pos) > 0 and
(min(self.ramps_pos) <= 0 or
max(self.ramps_pos) >= self.highway_length)):
raise ValueError('All ramps positions should be positive and less '
'than highway length. Current ramps positions: {}'
'. Current highway length: {}.'.format(
self.ramps_pos, self.highway_length))
if len(self.ramps_pos) != len(list(set(self.ramps_pos))):
raise ValueError('Two ramps positions cannot be equal.')
super().__init__(name, vehicles, net_params, initial_config,
traffic_lights)
def specify_nodes(self, net_params):
"""See parent class."""
angle_on_ramps = - 3 * pi / 4
angle_off_ramps = - pi / 4
nodes_highway = [{
"id": "highway_{}".format(i),
"x": self.nodes_pos[i],
"y": 0
} for i in range(len(self.nodes_pos))]
nodes_on_ramps = [{
"id": "on_ramp_{}".format(i),
"x": x + self.on_ramps_length * cos(angle_on_ramps),
"y": self.on_ramps_length * sin(angle_on_ramps)
} for i, x in enumerate(self.on_ramps_pos)]
nodes_off_ramps = [{
"id": "off_ramp_{}".format(i),
"x": x + self.off_ramps_length * cos(angle_off_ramps),
"y": self.off_ramps_length * sin(angle_off_ramps)
} for i, x in enumerate(self.off_ramps_pos)]
return nodes_highway + nodes_on_ramps + nodes_off_ramps
def specify_edges(self, net_params):
"""See parent class."""
highway_edges = [{
"id": "highway_{}".format(i),
"type": "highway",
"from": "highway_{}".format(i),
"to": "highway_{}".format(i + 1),
"length": self.nodes_pos[i + 1] - self.nodes_pos[i]
} for i in range(len(self.nodes_pos) - 1)]
on_ramps_edges = [{
"id": "on_ramp_{}".format(i),
"type": "on_ramp",
"from": "on_ramp_{}".format(i),
"to": "highway_{}".format(self.highway_pos[x]),
"length": self.on_ramps_length
} for i, x in enumerate(self.on_ramps_pos)]
off_ramps_edges = [{
"id": "off_ramp_{}".format(i),
"type": "off_ramp",
"from": "highway_{}".format(self.highway_pos[x]),
"to": "off_ramp_{}".format(i),
"length": self.off_ramps_length
} for i, x in enumerate(self.off_ramps_pos)]
return highway_edges + on_ramps_edges + off_ramps_edges
def specify_routes(self, net_params):
"""See parent class."""
def get_routes(start_node_pos):
"""Compute the routes recursively."""
if start_node_pos not in self.nodes_pos:
raise ValueError('{} is not a node position.'.format(
start_node_pos))
id_highway_node = self.highway_pos[start_node_pos]
if id_highway_node + 1 >= len(self.nodes_pos):
return [(["highway_{}".format(id_highway_node - 1)], 1)]
id_ramp_node = self.ramp_pos[start_node_pos]
routes = get_routes(self.nodes_pos[id_highway_node + 1])
if id_ramp_node.startswith("on"):
return ([
(["highway_{}".format(id_highway_node - 1)] + route, prob)
for route, prob in routes if not route[0].startswith("on")
] + [
([id_ramp_node] + route, prob)
for route, prob in routes if not route[0].startswith("on")
] + [
(route, prob)
for route, prob in routes if route[0].startswith("on")
])
else:
return ([
(["highway_{}".format(id_highway_node - 1)] + route,
(1 - self.p) * prob)
for route, prob in routes if not route[0].startswith("on")
] + [
(["highway_{}".format(id_highway_node - 1), id_ramp_node],
self.p * prob)
for route, prob in routes if not route[0].startswith("on")
] + [
(route, prob)
for route, prob in routes if route[0].startswith("on")
])
routes = get_routes(self.nodes_pos[1])
rts = defaultdict(list)
for route, prob in routes:
rts[route[0]].append((route, prob))
return rts
def specify_types(self, net_params):
"""See parent class."""
types = [{
"id": "highway",
"numLanes": self.highway_lanes,
"speed": self.highway_speed
}, {
"id": "on_ramp",
"numLanes": self.on_ramps_lanes,
"speed": self.on_ramps_speed
}, {
"id": "off_ramp",
"numLanes": self.off_ramps_lanes,
"speed": self.off_ramps_speed
}]
return types