-
Notifications
You must be signed in to change notification settings - Fork 17.2k
/
common.py
807 lines (706 loc) · 30.3 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
from __future__ import print_function
import abc
import math
import os
import shutil
import sys
import time
from pymavlink import mavwp, mavutil
from pysim import util, vehicleinfo
# a list of pexpect objects to read while waiting for
# messages. This keeps the output to stdout flowing
expect_list = []
# get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__))
# Check python version for abstract base class
if sys.version_info[0] >= 3 and sys.version_info[1] >= 4:
ABC = abc.ABC
else:
ABC = abc.ABCMeta('ABC', (), {})
class ErrorException(Exception):
"""Base class for other exceptions"""
pass
class AutoTestTimeoutException(ErrorException):
pass
class WaitModeTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given mode change."""
pass
class WaitAltitudeTimout(AutoTestTimeoutException):
"""Thrown when fails to achieve given altitude range."""
pass
class WaitGroundSpeedTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given ground speed range."""
pass
class WaitRollTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given roll in degrees."""
pass
class WaitPitchTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given pitch in degrees."""
pass
class WaitHeadingTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given heading."""
pass
class WaitDistanceTimeout(AutoTestTimeoutException):
"""Thrown when fails to attain distance"""
pass
class WaitLocationTimeout(AutoTestTimeoutException):
"""Thrown when fails to attain location"""
pass
class WaitWaypointTimeout(AutoTestTimeoutException):
"""Thrown when fails to attain waypoint ranges"""
pass
class SetRCTimeout(AutoTestTimeoutException):
"""Thrown when fails to send RC commands"""
pass
class MsgRcvTimeoutException(AutoTestTimeoutException):
"""Thrown when fails to receive an expected message"""
pass
class NotAchievedException(ErrorException):
"""Thrown when fails to achieve a goal"""
pass
class PreconditionFailedException(ErrorException):
"""Thrown when a precondition for a test is not met"""
pass
class AutoTest(ABC):
"""Base abstract class.
It implements the common function for all vehicle types.
"""
def __init__(self,
viewerip=None,
use_map=False):
self.mavproxy = None
self.mav = None
self.viewerip = viewerip
self.use_map = use_map
def progress(self, text):
"""Display autotest progress text."""
print("AUTOTEST: " + text)
# following two functions swiped from autotest.py:
def buildlogs_dirpath(self):
return os.getenv("BUILDLOGS", util.reltopdir("../buildlogs"))
def buildlogs_path(self, path):
'''return a string representing path in the buildlogs directory'''
bits = [self.buildlogs_dirpath()]
if isinstance(path, list):
bits.extend(path)
else:
bits.append(path)
return os.path.join(*bits)
def sitl_streamrate(self):
'''allow subclasses to override SITL streamrate'''
return 10
def mavproxy_options(self):
'''returns options to be passed to MAVProxy'''
ret = ['--sitl=127.0.0.1:5501',
'--out=127.0.0.1:19550',
'--streamrate=%u' % self.sitl_streamrate()]
if self.viewerip:
ret.append("--out=%s:14550" % self.viewerip)
if self.use_map:
ret.append('--map')
return ret
def vehicleinfo_key(self):
return self.log_name
def apply_parameters_using_sitl(self):
'''start SITL, apply parameter file, stop SITL'''
sitl = util.start_SITL(self.binary,
wipe=True,
model=self.frame,
home=self.home,
speedup=self.speedup_default)
self.mavproxy = util.start_MAVProxy_SITL(self.log_name)
self.progress("WAITING FOR PARAMETERS")
self.mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
vinfo = vehicleinfo.VehicleInfo()
if self.params is None:
frames = vinfo.options[self.vehicleinfo_key()]["frames"]
self.params = frames[self.frame]["default_params_filename"]
if not isinstance(self.params, list):
self.params = [self.params]
for x in self.params:
self.mavproxy.send("param load %s\n" % os.path.join(testdir, x))
self.mavproxy.expect('Loaded [0-9]+ parameters')
self.set_parameter('LOG_REPLAY', 1)
self.set_parameter('LOG_DISARMED', 1)
# kill this SITL instance off:
util.pexpect_close(self.mavproxy)
util.pexpect_close(sitl)
self.mavproxy = None
def close(self):
'''tidy up after running all tests'''
if self.use_map:
self.mavproxy.send("module unload map\n")
self.mavproxy.expect("Unloaded module map")
self.mav.close()
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
valgrind_log = util.valgrind_log_filepath(binary=self.binary,
model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log,
self.buildlogs_path("%s-valgrind.log" %
self.log_name))
#################################################
# GENERAL UTILITIES
#################################################
def expect_list_clear(self):
"""clear the expect list."""
global expect_list
for p in expect_list[:]:
expect_list.remove(p)
def expect_list_extend(self, list_to_add):
"""Extend the expect list."""
global expect_list
expect_list.extend(list_to_add)
def idle_hook(self, mav):
"""Called when waiting for a mavlink message."""
global expect_list
for p in expect_list:
util.pexpect_drain(p)
def message_hook(self, mav, msg):
"""Called as each mavlink msg is received."""
self.idle_hook(mav)
def expect_callback(self, e):
"""Called when waiting for a expect pattern."""
global expect_list
for p in expect_list:
if p == e:
continue
util.pexpect_drain(p)
#################################################
# SIM UTILITIES
#################################################
def get_sim_time(self):
"""Get SITL time."""
m = self.mav.recv_match(type='SYSTEM_TIME', blocking=True)
return m.time_boot_ms * 1.0e-3
def sim_location(self):
"""Return current simulator location."""
m = self.mav.recv_match(type='SIMSTATE', blocking=True)
return mavutil.location(m.lat*1.0e-7,
m.lng*1.0e-7,
0,
math.degrees(m.yaw))
def save_wp(self):
"""Trigger RC 7 to save waypoint."""
self.mavproxy.send('rc 7 1000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000',
blocking=True)
self.wait_seconds(1)
self.mavproxy.send('rc 7 2000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==2000',
blocking=True)
self.wait_seconds(1)
self.mavproxy.send('rc 7 1000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000',
blocking=True)
self.wait_seconds(1)
def log_download(self, filename, timeout=360):
"""Download latest log."""
self.disarm_vehicle()
self.mav.wait_heartbeat()
self.mavproxy.send("log list\n")
self.mavproxy.expect("numLogs")
self.mav.wait_heartbeat()
self.mav.wait_heartbeat()
self.mavproxy.send("set shownoise 0\n")
self.mavproxy.send("log download latest %s\n" % filename)
self.mavproxy.expect("Finished downloading", timeout=timeout)
self.mav.wait_heartbeat()
self.mav.wait_heartbeat()
def show_gps_and_sim_positions(self, on_off):
"""Allow to display gps and actual position on map."""
if on_off is True:
# turn on simulator display of gps and actual position
self.mavproxy.send('map set showgpspos 1\n')
self.mavproxy.send('map set showsimpos 1\n')
else:
# turn off simulator display of gps and actual position
self.mavproxy.send('map set showgpspos 0\n')
self.mavproxy.send('map set showsimpos 0\n')
def mission_count(self, filename):
"""Load a mission from a file and return number of waypoints."""
wploader = mavwp.MAVWPLoader()
wploader.load(filename)
num_wp = wploader.count()
return num_wp
def load_mission_from_file(self, filename):
"""Load a mission from a file to flight controller."""
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('wp list\n')
self.mavproxy.expect('Requesting [0-9]+ waypoints')
# update num_wp
wploader = mavwp.MAVWPLoader()
wploader.load(filename)
num_wp = wploader.count()
return num_wp
def save_mission_to_file(self, filename):
"""Save a mission to a file"""
self.mavproxy.send('wp save %s\n' % filename)
self.mavproxy.expect('Saved ([0-9]+) waypoints')
num_wp = int(self.mavproxy.match.group(1))
self.progress("num_wp: %d" % num_wp)
return num_wp
def set_rc_default(self):
"""Setup all simulated RC control to 1500."""
for chan in range(1, 16):
self.mavproxy.send('rc %u 1500\n' % chan)
def set_rc(self, chan, pwm, timeout=5):
"""Setup a simulated RC control to a PWM value"""
tstart = self.get_sim_time()
while self.get_sim_time() < tstart + timeout:
self.mavproxy.send('rc %u %u\n' % (chan, pwm))
m = self.mav.recv_match(type='RC_CHANNELS', blocking=True)
chan_pwm = getattr(m, "chan" + str(chan) + "_raw")
if chan_pwm == pwm:
return True
self.progress("Failed to send RC commands to channel %s" % str(chan))
raise SetRCTimeout()
def armed(self):
'''Return true if vehicle is armed and safetyoff'''
return self.mav.motors_armed();
def arm_vehicle(self):
"""Arm vehicle with mavlink arm message."""
self.mavproxy.send('arm throttle\n')
self.mav.motors_armed_wait()
self.progress("ARMED")
return True
def disarm_vehicle(self):
"""Disarm vehicle with mavlink disarm message."""
self.mavproxy.send('disarm\n')
self.mav.motors_disarmed_wait()
self.progress("DISARMED")
return True
def set_parameter(self, name, value):
for i in range(1, 10):
self.mavproxy.send("param set %s %s\n" % (name, str(value)))
self.mavproxy.send("param fetch %s\n" % name)
self.mavproxy.expect("%s = (.*)" % (name,))
returned_value = self.mavproxy.match.group(1)
if float(returned_value) == float(value):
# yes, exactly equal.
break
self.progress("Param fetch returned incorrect value (%s) vs (%s)"
% (returned_value, value))
def get_parameter(self, name):
self.mavproxy.send("param fetch %s\n" % name)
self.mavproxy.expect("%s = ([0-9.]*)" % (name,))
return float(self.mavproxy.match.group(1))
#################################################
# UTILITIES
#################################################
@staticmethod
def get_distance(loc1, loc2):
"""Get ground distance between two locations."""
dlat = loc2.lat - loc1.lat
dlong = loc2.lng - loc1.lng
return math.sqrt((dlat*dlat) + (dlong*dlong)) * 1.113195e5
@staticmethod
def get_bearing(loc1, loc2):
"""Get bearing from loc1 to loc2."""
off_x = loc2.lng - loc1.lng
off_y = loc2.lat - loc1.lat
bearing = 90.00 + math.atan2(-off_y, off_x) * 57.2957795
if bearing < 0:
bearing += 360.00
return bearing
def do_get_autopilot_capabilities(self):
self.mavproxy.send("long REQUEST_AUTOPILOT_CAPABILITIES 1\n")
m = self.mav.recv_match(type='AUTOPILOT_VERSION',
blocking=True,
timeout=10)
if m is None:
self.progress("AUTOPILOT_VERSION not received")
raise NotAchievedException()
self.progress("AUTOPILOT_VERSION received")
def do_set_mode_via_command_long(self):
base_mode = mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED
custom_mode = 4 # hold
start = time.time()
while time.time() - start < 5:
self.mavproxy.send("long DO_SET_MODE %u %u\n" %
(base_mode, custom_mode))
m = self.mav.recv_match(type='HEARTBEAT',
blocking=True,
timeout=10)
if m is None:
raise ErrorException()
if m.custom_mode == custom_mode:
return
time.sleep(0.1)
return AutoTestTimeoutException()
def reach_heading_manual(self, heading):
"""Manually direct the vehicle to the target heading."""
if self.mav.mav_type in [mavutil.mavlink.MAV_TYPE_QUADROTOR,
mavutil.mavlink.MAV_TYPE_HELICOPTER,
mavutil.mavlink.MAV_TYPE_HEXAROTOR,
mavutil.mavlink.MAV_TYPE_OCTOROTOR,
mavutil.mavlink.MAV_TYPE_COAXIAL,
mavutil.mavlink.MAV_TYPE_TRICOPTER]:
self.mavproxy.send('rc 4 1580\n')
self.wait_heading(heading)
self.mavproxy.send('rc 4 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan4_raw==1500',
blocking=True)
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING:
self.progress("NOT IMPLEMENTED")
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 1 1700\n')
self.mavproxy.send('rc 3 1550\n')
self.wait_heading(heading)
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500',
blocking=True)
self.mavproxy.send('rc 1 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan1_raw==1500',
blocking=True)
def reach_distance_manual(self, distance):
"""Manually direct the vehicle to the target distance from home."""
if self.mav.mav_type in [mavutil.mavlink.MAV_TYPE_QUADROTOR,
mavutil.mavlink.MAV_TYPE_HELICOPTER,
mavutil.mavlink.MAV_TYPE_HEXAROTOR,
mavutil.mavlink.MAV_TYPE_OCTOROTOR,
mavutil.mavlink.MAV_TYPE_COAXIAL,
mavutil.mavlink.MAV_TYPE_TRICOPTER]:
self.mavproxy.send('rc 2 1350\n')
self.wait_distance(distance, accuracy=5, timeout=60)
self.mavproxy.send('rc 2 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan2_raw==1500',
blocking=True)
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING:
self.progress("NOT IMPLEMENTED")
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 3 1700\n')
self.wait_distance(distance, accuracy=2)
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500',
blocking=True)
#################################################
# WAIT UTILITIES
#################################################
def wait_seconds(self, seconds_to_wait):
"""Wait some second in SITL time."""
tstart = self.get_sim_time()
tnow = tstart
while tstart + seconds_to_wait > tnow:
tnow = self.get_sim_time()
def wait_altitude(self, alt_min, alt_max, timeout=30):
"""Wait for a given altitude range."""
climb_rate = 0
previous_alt = 0
tstart = self.get_sim_time()
self.progress("Waiting for altitude between %u and %u" %
(alt_min, alt_max))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
climb_rate = m.alt - previous_alt
previous_alt = m.alt
self.progress("Wait Altitude: Cur:%u, min_alt:%u, climb_rate: %u"
% (m.alt, alt_min, climb_rate))
if m.alt >= alt_min and m.alt <= alt_max:
self.progress("Altitude OK")
return True
self.progress("Failed to attain altitude range")
raise WaitAltitudeTimout()
def wait_groundspeed(self, gs_min, gs_max, timeout=30):
"""Wait for a given ground speed range."""
tstart = self.get_sim_time()
self.progress("Waiting for groundspeed between %.1f and %.1f" %
(gs_min, gs_max))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
self.progress("Wait groundspeed %.1f, target:%.1f" %
(m.groundspeed, gs_min))
if m.groundspeed >= gs_min and m.groundspeed <= gs_max:
return True
self.progress("Failed to attain groundspeed range")
raise WaitGroundSpeedTimeout()
def wait_roll(self, roll, accuracy, timeout=30):
"""Wait for a given roll in degrees."""
tstart = self.get_sim_time()
self.progress("Waiting for roll of %d at %s" % (roll, time.ctime()))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch)
r = math.degrees(m.roll)
self.progress("Roll %d Pitch %d" % (r, p))
if math.fabs(r - roll) <= accuracy:
self.progress("Attained roll %d" % roll)
return True
self.progress("Failed to attain roll %d" % roll)
raise WaitRollTimeout()
def wait_pitch(self, pitch, accuracy, timeout=30):
"""Wait for a given pitch in degrees."""
tstart = self.get_sim_time()
self.progress("Waiting for pitch of %u at %s" % (pitch, time.ctime()))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch)
r = math.degrees(m.roll)
self.progress("Pitch %d Roll %d" % (p, r))
if math.fabs(p - pitch) <= accuracy:
self.progress("Attained pitch %d" % pitch)
return True
self.progress("Failed to attain pitch %d" % pitch)
raise WaitPitchTimeout()
def wait_heading(self, heading, accuracy=5, timeout=30):
"""Wait for a given heading."""
tstart = self.get_sim_time()
self.progress("Waiting for heading %u with accuracy %u" %
(heading, accuracy))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
self.progress("Heading %u" % m.heading)
if math.fabs(m.heading - heading) <= accuracy:
self.progress("Attained heading %u" % heading)
return True
self.progress("Failed to attain heading %u" % heading)
raise WaitHeadingTimeout()
def wait_distance(self, distance, accuracy=5, timeout=30):
"""Wait for flight of a given distance."""
tstart = self.get_sim_time()
start = self.mav.location()
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(start, pos)
self.progress("Distance %.2f meters" % delta)
if math.fabs(delta - distance) <= accuracy:
self.progress("Attained distance %.2f meters OK" % delta)
return True
if delta > (distance + accuracy):
self.progress("Failed distance - overshoot delta=%f dist=%f"
% (delta, distance))
raise WaitDistanceTimeout()
self.progress("Failed to attain distance %u" % distance)
raise WaitDistanceTimeout()
def wait_location(self,
loc,
accuracy=5,
timeout=30,
target_altitude=None,
height_accuracy=-1):
"""Wait for arrival at a location."""
tstart = self.get_sim_time()
if target_altitude is None:
target_altitude = loc.alt
self.progress("Waiting for location"
"%.4f,%.4f at altitude %.1f height_accuracy=%.1f" %
(loc.lat, loc.lng, target_altitude, height_accuracy))
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(loc, pos)
self.progress("Distance %.2f meters alt %.1f" % (delta, pos.alt))
if delta <= accuracy:
height_delta = math.fabs(pos.alt - target_altitude)
if (height_accuracy != -1 and height_delta > height_accuracy):
continue
self.progress("Reached location (%.2f meters)" % delta)
return True
self.progress("Failed to attain location")
raise WaitLocationTimeout()
def wait_waypoint(self,
wpnum_start,
wpnum_end,
allow_skip=True,
max_dist=2,
timeout=400):
"""Wait for waypoint ranges."""
tstart = self.get_sim_time()
# this message arrives after we set the current WP
start_wp = self.mav.waypoint_current()
current_wp = start_wp
mode = self.mav.flightmode
self.progress("\ntest: wait for waypoint ranges start=%u end=%u\n\n"
% (wpnum_start, wpnum_end))
# if start_wp != wpnum_start:
# self.progress("test: Expected start waypoint %u but got %u" %
# (wpnum_start, start_wp))
# raise WaitWaypointTimeout()
while self.get_sim_time() < tstart + timeout:
seq = self.mav.waypoint_current()
m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT',
blocking=True)
wp_dist = m.wp_dist
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# if we changed mode, fail
if self.mav.flightmode != mode:
self.progress('Exited %s mode' % mode)
raise WaitWaypointTimeout()
self.progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u,"
"wpnum_end: %u" %
(seq, wp_dist, m.alt, current_wp, wpnum_end))
if seq == current_wp+1 or (seq > current_wp+1 and allow_skip):
self.progress("test: Starting new waypoint %u" % seq)
tstart = self.get_sim_time()
current_wp = seq
# the wp_dist check is a hack until we can sort out
# the right seqnum for end of mission
# if current_wp == wpnum_end or (current_wp == wpnum_end-1 and
# wp_dist < 2):
if current_wp == wpnum_end and wp_dist < max_dist:
self.progress("Reached final waypoint %u" % seq)
return True
if seq >= 255:
self.progress("Reached final waypoint %u" % seq)
return True
if seq > current_wp+1:
self.progress("Failed: Skipped waypoint! Got wp %u expected %u"
% (seq, current_wp+1))
raise WaitWaypointTimeout()
self.progress("Failed: Timed out waiting for waypoint %u of %u" %
(wpnum_end, wpnum_end))
raise WaitWaypointTimeout()
def wait_mode(self, mode, timeout=None):
"""Wait for mode to change."""
mode_map = self.mav.mode_mapping()
if mode_map is None or mode not in mode_map:
self.progress("Unknown mode '%s'" % mode)
self.progress("Available modes '%s'" % mode_map.keys())
raise ErrorException()
self.progress("Waiting for mode %s" % mode)
tstart = self.get_sim_time()
hastimeout = False
while self.mav.flightmode != mode and not hastimeout:
if timeout is not None:
hastimeout = self.get_sim_time() > tstart + timeout
self.mav.wait_heartbeat()
self.progress("Got mode %s" % mode)
if self.mav.flightmode != mode and hastimeout:
raise WaitModeTimeout()
return True
def wait_ready_to_arm(self, timeout=None):
# wait for EKF checks to pass
return self.wait_ekf_happy(timeout=timeout)
def wait_ekf_happy(self, timeout=30):
"""Wait for EKF to be happy"""
tstart = self.get_sim_time()
required_value = 831
self.progress("Waiting for EKF value %u" % required_value)
while timeout is None or self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='EKF_STATUS_REPORT', blocking=True)
current = m.flags
if (tstart - self.get_sim_time()) % 5 == 0:
self.progress("Wait EKF.flags: required:%u current:%u" %
(required_value, current))
if current == required_value:
self.progress("EKF Flags OK")
return True
self.progress("Failed to get EKF.flags=%u" % required_value)
raise AutoTestTimeoutException()
def run_test(self, desc, function):
self.progress("#")
self.progress("########## %s ##########" % (desc))
self.progress("#")
try:
function()
except Exception as e:
self.progress('FAILED: "%s": %s' % (desc, repr(e)))
self.fail_list.append((desc, e))
return
self.progress('PASSED: "%s"' % desc)
@abc.abstractmethod
def init(self):
"""Initilialize autotest feature."""
pass
# def test_common_feature(self):
# """Common feature to test."""
# sucess = True
# # TEST ARMING/DISARM
# if not self.arm_vehicle():
# self.progress("Failed to ARM")
# sucess = False
# if not self.disarm_vehicle():
# self.progress("Failed to DISARM")
# sucess = False
# if not self.test_arm_motors_radio():
# self.progress("Failed to ARM with radio")
# sucess = False
# if not self.test_disarm_motors_radio():
# self.progress("Failed to ARM with radio")
# sucess = False
# if not self.test_autodisarm_motors():
# self.progress("Failed to AUTO DISARM")
# sucess = False
# # TODO: Test failure on arm (with arming check)
# # TEST MISSION FILE
# # TODO : rework that to work on autotest server
# # self.progress("TEST LOADING MISSION")
# # num_wp = self.load_mission_from_file(
# os.path.join(testdir, "fake_mission.txt"))
# # if num_wp == 0:
# # self.progress("Failed to load all_msg_mission")
# # sucess = False
# #
# # self.progress("TEST SAVING MISSION")
# # num_wp_old = num_wp
# # num_wp = self.save_mission_to_file(os.path.join(testdir,
# "fake_mission2.txt"))
# # if num_wp != num_wp_old:
# # self.progress("Failed to save all_msg_mission")
# # sucess = False
#
# self.progress("TEST CLEARING MISSION")
# self.mavproxy.send("wp clear\n")
# self.mavproxy.send('wp list\n')
# self.mavproxy.expect('Requesting [0-9]+ waypoints')
# num_wp = mavwp.MAVWPLoader().count()
# if num_wp != 0:
# self.progress("Failed to clear mission ")
# sucess = False
#
# return sucess
#
# # TESTS FAILSAFE
# @abc.abstractmethod
# def test_throttle_failsafe(self, home, distance_min=10, side=60,
# timeout=180):
# """Test that RTL success in case of thottle failsafe."""
# pass
#
# # TEST ARM RADIO
# @abc.abstractmethod
# def test_arm_motors_radio(self):
# """Test arming with RC sticks."""
# pass
#
# # TEST DISARM RADIO
# @abc.abstractmethod
# def test_disarm_motors_radio(self):
# """Test disarming with RC sticks."""
# pass
#
# # TEST AUTO DISARM
# @abc.abstractmethod
# def test_autodisarm_motors(self):
# """Test auto disarming."""
# pass
#
# # TEST RC OVERRIDE
# # TEST RC OVERRIDE TIMEOUT
# @abc.abstractmethod
# def test_rtl(self, home, distance_min=10, timeout=250):
# """Test that RTL success."""
# self.progress("# Enter RTL")
# self.mavproxy.send('switch 3\n')
# tstart = self.get_sim_time()
# while self.get_sim_time() < tstart + timeout:
# m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# pos = self.mav.location()
# home_distance = self.get_distance(home, pos)
# self.progress("Alt: %u HomeDistance: %.0f" %
# (m.alt, home_distance))
# if m.alt <= 1 and home_distance < distance_min:
# self.progress("RTL Complete")
# return True
# return False
#
# @abc.abstractmethod
# def test_mission(self, filename):
# pass
@abc.abstractmethod
def autotest(self):
"""Autotest used by ArduPilot autotest CI."""
pass