-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
avoidance.py
354 lines (284 loc) · 12.4 KB
/
avoidance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
from __future__ import print_function
"""
avoidance.py: Demonstrate autopilot collision avoidance functionality
Full documentation is provided at http://python.dronekit.io/examples/avoidance.html
"""
from dronekit import connect, VehicleMode, LocationGlobalRelative, LocationGlobal, Command
import time
import math
from pymavlink import mavutil
import threading
import dronekit
import mavlink_hub
#Set up option parsing to get connection string
import argparse
parser = argparse.ArgumentParser(description='Demonstrates autopilot automatic avoidance functionality.')
parser.add_argument('--extra-connection',
help="extra MAVLink connections to make.")
parser.add_argument('--binary',
help="path to autopilot binary to use")
parser.add_argument('--defaults',
help="path to autopilot defaults file to use")
parser.add_argument('--resolution', choices=["TCAS", "RTL", "PERPENDICULAR"],
help="Specify resolution behaviour for conflict",
default="PERPENDICULAR")
args = parser.parse_args()
# example of a binary path: $HOME/rc/ardupilot/build/sitl-debug/bin/arducopter-quad
# example of a defaults path: $HOME/rc/ardupilot/Tools/autotest/copter_params.parm
sitls = []
hub_thread = None
target_systems = []
hub_connection_strings = []
vehicle_setup = [{}, {}]
vehicle_setup[0] = {
"lat": -35.363261,
"lon": 149.165230,
"target-lat": -35.363261 + 0.00064,
"target-lon": 149.165230,
}
vehicle_setup[1] = {
"lat": vehicle_setup[0]["lat"] + 0.00064,
"lon": vehicle_setup[0]["lon"] + 0.00032,
# "target-lat": -35.363261 + 1.25*0.00120,
"target-lat": -35.363261 + -1.25*0.00120,
# "target-lat": -35.363261 + 0.00032,
"target-lon": 149.165230 - 0.5*0.00128,
}
# ready data for many dronekit-sitl processes
simulation_count = 2
for i in range(0,simulation_count):
print("Creating simulator (SITL) %d" % (i,))
from dronekit_sitl import SITL
sitl = SITL(instance=i, path=args.binary, defaults_filepath=args.defaults, gdb=True)
# sitl.download('copter', '3.3', verbose=True)
lat = -35.363261 + i*0.00128
lat = vehicle_setup[i]["lat"]
lon = vehicle_setup[i]["lon"]
sitl_args = ['--model', 'quad', '--home=%s,%s,584,353' % (lat,lon,) ]
sitls.append([sitl, sitl_args])
hub_connection_strings.append(sitl.connection_string())
# start the SITLs one at a time, giving each a unique SYSID_THISMAV
def change_sysid_target(i, new_sysid, connection_string):
print("%d: Launching SITL (%s)" % (i,str(sitls[i][1])))
sitls[i][0].launch(sitls[i][1], await_ready=True, verbose=True, speedup=50,)
print("%d: Connecting to its vehicle 1" % (i,))
vehicle = dronekit.connect(connection_string, wait_ready=True, target_system=1)
while vehicle.parameters["SYSID_THISMAV"] != new_sysid:
print("%d: Resetting its SYID_THISMAV to %d" % (i, new_sysid,))
vehicle.parameters["SYSID_THISMAV"] = new_sysid
time.sleep(0.1)
# set avoidance behaviour to RTL:
if i == 0:
# enable ADSB:
vehicle.parameters["ADSB_ENABLE"] = 1
# enable avoidance:
vehicle.parameters["AVD_ENABLE"] = 1
# set the warn radius down to let us see everything on a reasonable scale:
vehicle.parameters["AVD_W_DIST_XY"] = 30
vehicle.parameters["AVD_W_DIST_Z"] = 20
# set the warn horizon down to let us see everything on a reasonable scale:
vehicle.parameters["AVD_W_TIME"] = 10
# set the fail radius down to let us see everything on a reasonable scale:
vehicle.parameters["AVD_F_DIST_XY"] = 20
vehicle.parameters["AVD_F_DIST_Z"] = 10
# set the time horizon down to let us see everything on a reasonable scale:
vehicle.parameters["AVD_F_TIME"] = 10
vehicle.parameters["AVD_W_ACTION"] = mavutil.mavlink.MAV_COLLISION_ACTION_REPORT
vehicle.parameters["AVD_W_RCVRY"] = 1
vehicle.parameters["AVD_F_RCVRY"] = 1
if args.resolution == "RTL":
vehicle.parameters["AVD_F_ACTION"] = mavutil.mavlink.MAV_COLLISION_ACTION_RTL
elif args.resolution == "PERPENDICULAR":
vehicle.parameters["AVD_F_ACTION"] = mavutil.mavlink.MAV_COLLISION_ACTION_MOVE_PERPENDICULAR
elif args.resolution == "TCAS":
vehicle.parameters["AVD_F_ACTION"] = mavutil.mavlink.MAV_COLLISION_ACTION_TCAS
print("%d: Allowing time for parameter write" % (i,))
time.sleep(2)
print("%d: Stop" % (i,))
sitls[i][0].stop()
vehicle.disconnect()
change_sysid_threads = []
for i in range(0,len(sitls)):
new_sysid = len(sitls)-i+1
change_sysid_threads.append(threading.Thread(target=change_sysid_target,
args=(i,new_sysid, hub_connection_strings[i])))
change_sysid_threads[-1].start()
target_systems.append(new_sysid)
# mav.remove_message_listener(vehicle)
time.sleep(1) # mavlink not thread safe...
for thread in change_sysid_threads:
print("Waiting for thread...")
thread.join()
print("Sleeping a little to let SITLs go away...")
time.sleep(2)
print("Launching SITLs")
for i in range(0,len(sitls)):
sitl = sitls[i][0]
sitl_args = sitls[i][1]
sitl.launch(sitl_args, await_ready=True, restart=True, use_saved_data=True, wd=sitl.wd, verbose=True)
# create another connection so a GCS can be connected for visualisation
if args.extra_connection:
hub_connection_strings.append(args.extra_connection)
# dronekit-python (us!) port:
connection_string = "udpout:localhost:2345"
hub_connection_strings.append("udpin:localhost:2345")
hub_should_quit = False
def mavlink_hub_target():
print("Hub thread starting")
hub = mavlink_hub.MAVLinkHub(hub_connection_strings)
hub.init()
while not hub_should_quit:
hub.loop()
print("Hub quitting")
hub.connection_maintenance_target_should_live = False
print("Starting mavlink_hub thread")
hub_thread = threading.Thread(target=mavlink_hub_target)
hub_thread.start()
# Create a MAVLink connection:
print("Attempting connect to (%s)" % (connection_string,))
mav = dronekit.mavlink.MAVConnection(connection_string)
print("Connected")
vehicles = [None] * len(target_systems)
# Connect to the Vehicle
def vehicle_connect_target(mav, id, offset):
vehicle = dronekit.connect(mav, wait_ready=True, target_system=id)
vehicles[offset] = vehicle
offset = 0
connect_threads = []
for id in target_systems:
print('Connecting to vehicle with system ID (%d)' % id)
connect_threads.append(threading.Thread(target=vehicle_connect_target,
args=(mav,id,offset)))
connect_threads[-1].start()
offset += 1
time.sleep(2) # mav is not thread safe?!
for thread in connect_threads:
print("Waiting for connect thread...")
thread.join()
def dump_vehicle_state(vehicle):
print(" Autopilot Firmware version: %s" % vehicle.version)
print(" Major version number: %s" % vehicle.version.major)
print(" Minor version number: %s" % vehicle.version.minor)
print(" Patch version number: %s" % vehicle.version.patch)
print(" Release type: %s" % vehicle.version.release_type())
print(" Release version: %s" % vehicle.version.release_version())
print(" Stable release?: %s" % vehicle.version.is_stable())
for vehicle in vehicles:
dump_vehicle_state(vehicle)
def print_collision_message(conn, name , m):
print("Got collision message: %s" % (str(m),))
vehicles[0].add_message_listener('COLLISION', print_collision_message)
mav_lock = threading.Lock() # FIXME; move locking into MAVSystem
def arm_and_takeoff(vehicle, aTargetAltitude):
"""
Arms vehicle and fly to aTargetAltitude.
"""
print("Basic pre-arm checks")
# Don't try to arm until autopilot is ready
while not vehicle.is_armable:
print(" Waiting for vehicle to initialise...")
time.sleep(1)
# Copter should arm in GUIDED mode
while vehicle.mode != 'GUIDED':
print("Setting mode GUIDED")
mav_lock.acquire()
vehicle.mode = VehicleMode("GUIDED")
mav_lock.release()
time.sleep(0.5)
# Confirm vehicle armed before attempting to take off
while not vehicle.armed:
print("Arming motors")
mav_lock.acquire()
vehicle.armed = True
mav_lock.release()
time.sleep(0.5)
print("Taking off!")
mav_lock.acquire()
vehicle.simple_takeoff(aTargetAltitude) # Take off to target altitude
mav_lock.release()
# Wait until the vehicle reaches a safe height before processing the goto (otherwise the command
# after Vehicle.simple_takeoff will execute immediately).
while True:
print(" Mode: %s Altitude: %f" % (str(vehicle.mode), vehicle.location.global_relative_frame.alt,))
#Break and return from function just below target altitude.
if vehicle.location.global_relative_frame.alt>=aTargetAltitude*0.95:
print("Reached target altitude")
break
time.sleep(1)
# sleep so we can see the change in map
def watch_things_for_a_while(duration=30):
start = time.time()
while time.time() - start < duration:
for i in range(0,len(vehicles)):
vehicle = vehicles[i]
print("Vehicle %d (%s) Lat=%f Lon=%f Alt=%f MODE=%s Hdg=%f" % (target_systems[i], vehicle.mode, vehicle.location.global_frame.lat, vehicle.location.global_frame.lon, vehicle.location.global_relative_frame.alt, str(vehicle.mode), vehicle.heading))
time.sleep(1)
def vehicle_launcher_target(i):
arm_and_takeoff(vehicles[i], 5)
def launch_all_vehicles():
launcher_threads = []
for i in range(0,len(vehicles)):
print("Launching Vehicle %d:" % (target_systems[i],))
launcher_threads.append(threading.Thread(target=vehicle_launcher_target,
args=(i,)))
launcher_threads[-1].start()
for thread in launcher_threads:
print("Waiting for launcher thread...")
thread.join()
speed = 2
for i in range(0,len(vehicles)):
print("Set default/target airspeed to %d on vehicle %d (%s)" % (target_systems[i],speed, str(target_systems[i])))
vehicles[i].airspeed = speed
speed += 1
def oblique_chase():
print("Starting oblique chase")
bring_vehicles_to_altitudes([ 10, 5 ])
for i in range(0,len(vehicles)):
print("vehicle %d: doing simple_goto ..." % (target_systems[i],))
point = LocationGlobalRelative(vehicle_setup[i]["target-lat"], vehicle_setup[i]["target-lon"], 20)
vehicles[i].simple_goto(point)
watch_things_for_a_while(60)
for i in range(0, len(vehicles)):
while vehicles[i].mode != "RTL":
print("Setting returning to Launch - vehicle %d" % (target_systems[i],))
vehicles[i].mode = VehicleMode("RTL")
time.sleep(0.1)
watch_things_for_a_while(duration=60)
print("Finished oblique chase")
def bring_vehicles_to_altitudes(altitudes):
print("having vehicles climb a bit")
while True:
all_done = True
for i in range(0, len(vehicles)):
v_location = vehicles[i].location.global_relative_frame
print("%d alt: %f" % (i, v_location.alt))
if v_location.alt < altitudes[i]-2:
v_location.alt = altitudes[i]
vehicles[i].simple_goto(v_location)
all_done = False
if all_done:
break
time.sleep(1)
def vertical_separation():
bring_vehicles_to_altitudes([ 5, 30 ])
def should_see_no_collision_msgs(conn, name , m):
print("UNEXPECTED collision message: %s" % (str(m),))
vehicles[0].add_message_listener('COLLISION', should_see_no_collision_msgs)
for i in range(0,len(vehicles)):
print("vehicle %d going towards first point for 30 seconds ..." % (target_systems[i],))
point = LocationGlobalRelative(vehicle_setup[i]["target-lat"], vehicle_setup[i]["target-lon"], altitudes[i])
vehicles[i].simple_goto(point)
watch_things_for_a_while(60)
for i in range(0, len(vehicles)):
while vehicles[i].mode != "RTL":
print("Setting returning to Launch - vehicle %d" % (target_systems[i],))
vehicles[i].mode = VehicleMode("RTL")
time.sleep(0.1)
watch_things_for_a_while(duration=60)
launch_all_vehicles()
oblique_chase()
# vertical_separation()
print("All done")
if hub_thread is not None:
hub_should_quit = True
hub_thread.join()