-
Notifications
You must be signed in to change notification settings - Fork 353
/
follow_leading_vehicle.py
325 lines (268 loc) · 15.2 KB
/
follow_leading_vehicle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#!/usr/bin/env python
# Copyright (c) 2018-2020 Intel Corporation
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
Follow leading vehicle scenario:
The scenario realizes a common driving behavior, in which the
user-controlled ego vehicle follows a leading car driving down
a given road. At some point the leading car has to slow down and
finally stop. The ego vehicle has to react accordingly to avoid
a collision. The scenario ends either via a timeout, or if the ego
vehicle stopped close enough to the leading vehicle
"""
import random
import py_trees
import carla
from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
from srunner.scenariomanager.scenarioatomics.atomic_behaviors import (ActorTransformSetter,
ActorDestroy,
KeepVelocity,
StopVehicle,
WaypointFollower)
from srunner.scenariomanager.scenarioatomics.atomic_criteria import CollisionTest
from srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions import (InTriggerDistanceToVehicle,
InTriggerDistanceToNextIntersection,
DriveDistance,
StandStill)
from srunner.scenariomanager.timer import TimeOut
from srunner.scenarios.basic_scenario import BasicScenario
from srunner.tools.scenario_helper import get_waypoint_in_distance
class FollowLeadingVehicle(BasicScenario):
"""
This class holds everything required for a simple "Follow a leading vehicle"
scenario involving two vehicles. (Traffic Scenario 2)
This is a single ego vehicle scenario
"""
timeout = 120 # Timeout of scenario in seconds
def __init__(self, world, ego_vehicles, config, randomize=False, debug_mode=False, criteria_enable=True,
timeout=60):
"""
Setup all relevant parameters and create scenario
If randomize is True, the scenario parameters are randomized
"""
self._map = CarlaDataProvider.get_map()
self._first_vehicle_location = 25
self._first_vehicle_speed = 10
self._reference_waypoint = self._map.get_waypoint(config.trigger_points[0].location)
self._other_actor_max_brake = 1.0
self._other_actor_stop_in_front_intersection = 20
self._other_actor_transform = None
# Timeout of scenario in seconds
self.timeout = timeout
super(FollowLeadingVehicle, self).__init__("FollowVehicle",
ego_vehicles,
config,
world,
debug_mode,
criteria_enable=criteria_enable)
if randomize:
self._ego_other_distance_start = random.randint(4, 8)
# Example code how to randomize start location
# distance = random.randint(20, 80)
# new_location, _ = get_location_in_distance(self.ego_vehicles[0], distance)
# waypoint = CarlaDataProvider.get_map().get_waypoint(new_location)
# waypoint.transform.location.z += 39
# self.other_actors[0].set_transform(waypoint.transform)
def _initialize_actors(self, config):
"""
Custom initialization
"""
first_vehicle_waypoint, _ = get_waypoint_in_distance(self._reference_waypoint, self._first_vehicle_location)
self._other_actor_transform = carla.Transform(
carla.Location(first_vehicle_waypoint.transform.location.x,
first_vehicle_waypoint.transform.location.y,
first_vehicle_waypoint.transform.location.z + 1),
first_vehicle_waypoint.transform.rotation)
first_vehicle_transform = carla.Transform(
carla.Location(self._other_actor_transform.location.x,
self._other_actor_transform.location.y,
self._other_actor_transform.location.z - 500),
self._other_actor_transform.rotation)
first_vehicle = CarlaDataProvider.request_new_actor('vehicle.nissan.patrol', first_vehicle_transform)
first_vehicle.set_simulate_physics(enabled=False)
self.other_actors.append(first_vehicle)
def _create_behavior(self):
"""
The scenario defined after is a "follow leading vehicle" scenario. After
invoking this scenario, it will wait for the user controlled vehicle to
enter the start region, then make the other actor to drive until reaching
the next intersection. Finally, the user-controlled vehicle has to be close
enough to the other actor to end the scenario.
If this does not happen within 60 seconds, a timeout stops the scenario
"""
# to avoid the other actor blocking traffic, it was spawed elsewhere
# reset its pose to the required one
start_transform = ActorTransformSetter(self.other_actors[0], self._other_actor_transform)
# let the other actor drive until next intersection
# @todo: We should add some feedback mechanism to respond to ego_vehicle behavior
driving_to_next_intersection = py_trees.composites.Parallel(
"DrivingTowardsIntersection",
policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
driving_to_next_intersection.add_child(WaypointFollower(self.other_actors[0], self._first_vehicle_speed))
driving_to_next_intersection.add_child(InTriggerDistanceToNextIntersection(
self.other_actors[0], self._other_actor_stop_in_front_intersection))
# stop vehicle
stop = StopVehicle(self.other_actors[0], self._other_actor_max_brake)
# end condition
endcondition = py_trees.composites.Parallel("Waiting for end position",
policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL)
endcondition_part1 = InTriggerDistanceToVehicle(self.other_actors[0],
self.ego_vehicles[0],
distance=20,
name="FinalDistance")
endcondition_part2 = StandStill(self.ego_vehicles[0], name="StandStill")
endcondition.add_child(endcondition_part1)
endcondition.add_child(endcondition_part2)
# Build behavior tree
sequence = py_trees.composites.Sequence("Sequence Behavior")
sequence.add_child(start_transform)
sequence.add_child(driving_to_next_intersection)
sequence.add_child(stop)
sequence.add_child(endcondition)
sequence.add_child(ActorDestroy(self.other_actors[0]))
return sequence
def _create_test_criteria(self):
"""
A list of all test criteria will be created that is later used
in parallel behavior tree.
"""
criteria = []
collision_criterion = CollisionTest(self.ego_vehicles[0])
criteria.append(collision_criterion)
return criteria
def __del__(self):
"""
Remove all actors upon deletion
"""
self.remove_all_actors()
class FollowLeadingVehicleWithObstacle(BasicScenario):
"""
This class holds a scenario similar to FollowLeadingVehicle
but there is an obstacle in front of the leading vehicle
This is a single ego vehicle scenario
"""
timeout = 120 # Timeout of scenario in seconds
def __init__(self, world, ego_vehicles, config, randomize=False, debug_mode=False, criteria_enable=True):
"""
Setup all relevant parameters and create scenario
"""
self._map = CarlaDataProvider.get_map()
self._first_actor_location = 25
self._second_actor_location = self._first_actor_location + 41
self._first_actor_speed = 10
self._second_actor_speed = 1.5
self._reference_waypoint = self._map.get_waypoint(config.trigger_points[0].location)
self._other_actor_max_brake = 1.0
self._first_actor_transform = None
self._second_actor_transform = None
super(FollowLeadingVehicleWithObstacle, self).__init__("FollowLeadingVehicleWithObstacle",
ego_vehicles,
config,
world,
debug_mode,
criteria_enable=criteria_enable)
if randomize:
self._ego_other_distance_start = random.randint(4, 8)
def _initialize_actors(self, config):
"""
Custom initialization
"""
first_actor_waypoint, _ = get_waypoint_in_distance(self._reference_waypoint, self._first_actor_location)
second_actor_waypoint, _ = get_waypoint_in_distance(self._reference_waypoint, self._second_actor_location)
first_actor_transform = carla.Transform(
carla.Location(first_actor_waypoint.transform.location.x,
first_actor_waypoint.transform.location.y,
first_actor_waypoint.transform.location.z - 500),
first_actor_waypoint.transform.rotation)
self._first_actor_transform = carla.Transform(
carla.Location(first_actor_waypoint.transform.location.x,
first_actor_waypoint.transform.location.y,
first_actor_waypoint.transform.location.z + 1),
first_actor_waypoint.transform.rotation)
yaw_1 = second_actor_waypoint.transform.rotation.yaw + 90
second_actor_transform = carla.Transform(
carla.Location(second_actor_waypoint.transform.location.x,
second_actor_waypoint.transform.location.y,
second_actor_waypoint.transform.location.z - 500),
carla.Rotation(second_actor_waypoint.transform.rotation.pitch, yaw_1,
second_actor_waypoint.transform.rotation.roll))
self._second_actor_transform = carla.Transform(
carla.Location(second_actor_waypoint.transform.location.x,
second_actor_waypoint.transform.location.y,
second_actor_waypoint.transform.location.z + 1),
carla.Rotation(second_actor_waypoint.transform.rotation.pitch, yaw_1,
second_actor_waypoint.transform.rotation.roll))
first_actor = CarlaDataProvider.request_new_actor(
'vehicle.nissan.patrol', first_actor_transform)
second_actor = CarlaDataProvider.request_new_actor(
'vehicle.diamondback.century', second_actor_transform)
first_actor.set_simulate_physics(enabled=False)
second_actor.set_simulate_physics(enabled=False)
self.other_actors.append(first_actor)
self.other_actors.append(second_actor)
def _create_behavior(self):
"""
The scenario defined after is a "follow leading vehicle" scenario. After
invoking this scenario, it will wait for the user controlled vehicle to
enter the start region, then make the other actor to drive towards obstacle.
Once obstacle clears the road, make the other actor to drive towards the
next intersection. Finally, the user-controlled vehicle has to be close
enough to the other actor to end the scenario.
If this does not happen within 60 seconds, a timeout stops the scenario
"""
# let the other actor drive until next intersection
driving_to_next_intersection = py_trees.composites.Parallel(
"Driving towards Intersection",
policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
obstacle_clear_road = py_trees.composites.Parallel("Obstalce clearing road",
policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
obstacle_clear_road.add_child(DriveDistance(self.other_actors[1], 4))
obstacle_clear_road.add_child(KeepVelocity(self.other_actors[1], self._second_actor_speed))
stop_near_intersection = py_trees.composites.Parallel(
"Waiting for end position near Intersection",
policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
stop_near_intersection.add_child(WaypointFollower(self.other_actors[0], 10))
stop_near_intersection.add_child(InTriggerDistanceToNextIntersection(self.other_actors[0], 20))
driving_to_next_intersection.add_child(WaypointFollower(self.other_actors[0], self._first_actor_speed))
driving_to_next_intersection.add_child(InTriggerDistanceToVehicle(self.other_actors[1],
self.other_actors[0], 15))
# end condition
endcondition = py_trees.composites.Parallel("Waiting for end position",
policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL)
endcondition_part1 = InTriggerDistanceToVehicle(self.other_actors[0],
self.ego_vehicles[0],
distance=20,
name="FinalDistance")
endcondition_part2 = StandStill(self.ego_vehicles[0], name="FinalSpeed")
endcondition.add_child(endcondition_part1)
endcondition.add_child(endcondition_part2)
# Build behavior tree
sequence = py_trees.composites.Sequence("Sequence Behavior")
sequence.add_child(ActorTransformSetter(self.other_actors[0], self._first_actor_transform))
sequence.add_child(ActorTransformSetter(self.other_actors[1], self._second_actor_transform))
sequence.add_child(driving_to_next_intersection)
sequence.add_child(StopVehicle(self.other_actors[0], self._other_actor_max_brake))
sequence.add_child(TimeOut(3))
sequence.add_child(obstacle_clear_road)
sequence.add_child(stop_near_intersection)
sequence.add_child(StopVehicle(self.other_actors[0], self._other_actor_max_brake))
sequence.add_child(endcondition)
sequence.add_child(ActorDestroy(self.other_actors[0]))
sequence.add_child(ActorDestroy(self.other_actors[1]))
return sequence
def _create_test_criteria(self):
"""
A list of all test criteria will be created that is later used
in parallel behavior tree.
"""
criteria = []
collision_criterion = CollisionTest(self.ego_vehicles[0])
criteria.append(collision_criterion)
return criteria
def __del__(self):
"""
Remove all actors upon deletion
"""
self.remove_all_actors()