-
Notifications
You must be signed in to change notification settings - Fork 0
/
t265_to_mavlink_as.py
446 lines (363 loc) · 17.1 KB
/
t265_to_mavlink_as.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
#!/usr/bin/env python3
#####################################################
## librealsense T265 to MAVLink ##
#####################################################
# This script assumes pyrealsense2.[].so file is found under the same directory as this script
# Install required packages:
# pip install pyrealsense2
# pip install transformations
# pip3 install dronekit
# pip3 install apscheduler
# Set the path for IDLE
import sys
sys.path.append("/usr/local/lib/")
# Set MAVLink protocol to 2.
import os
os.environ["MAVLINK20"] = "1"
# Import the libraries
import pyrealsense2 as rs
import numpy as np
import transformations as tf
import math as m
import time
import argparse
import threading
#from apscheduler.schedulers.background import BackgroundScheduler
from dronekit import connect, VehicleMode
from pymavlink import mavutil
#######################################
# Parameters
#######################################
# Default configurations for connection to the FCU
connection_string_default = '/dev/ttyAMA0'
#connection_baudrate_default = 921600
connection_baudrate_default = 500000
vision_msg_hz_default = 30
#vision_msg_hz_default = 14
confidence_msg_hz_default = 1
camera_orientation_default = 0
# In NED frame, offset from the IMU or the center of gravity to the camera's origin point
body_offset_enabled = 1
body_offset_x = 0.11 # In meters (m), so 0.05 = 5cm
body_offset_y = -0.01 # In meters (m)
body_offset_z = 0.08 # In meters (m)
# Global scale factor, position x y z will be scaled up/down by this factor
scale_factor = 1.0
# Enable using yaw from compass to align north (zero degree is facing north)
compass_enabled = 0
# Default global position of home/ origin
home_lat = 425633500 # Terni
home_lon = 126432900 # Terni
home_alt = 163000
vehicle = None
pipe = None
# pose data confidence: 0x0 - Failed / 0x1 - Low / 0x2 - Medium / 0x3 - High
pose_data_confidence_level = ('Failed', 'Low', 'Medium', 'High')
#######################################
# Parsing user' inputs
#######################################
parser = argparse.ArgumentParser(description='Reboots vehicle')
parser.add_argument('--connect',
help="Vehicle connection target string. If not specified, a default string will be used.")
parser.add_argument('--baudrate', type=float,
help="Vehicle connection baudrate. If not specified, a default value will be used.")
parser.add_argument('--vision_msg_hz', type=float,
help="Update frequency for VISION_POSITION_ESTIMATE message. If not specified, a default value will be used.")
parser.add_argument('--confidence_msg_hz', type=float,
help="Update frequency for confidence level. If not specified, a default value will be used.")
parser.add_argument('--scale_calib_enable', type=bool,
help="Scale calibration. Only run while NOT in flight")
parser.add_argument('--camera_orientation', type=int,
help="Configuration for camera orientation. Currently supported: forward, usb port to the right - 0; downward, usb port to the right - 1")
parser.add_argument('--debug_enable',type=int,
help="Enable debug messages on terminal")
args = parser.parse_args()
connection_string = args.connect
connection_baudrate = args.baudrate
vision_msg_hz = args.vision_msg_hz
confidence_msg_hz = args.confidence_msg_hz
scale_calib_enable = args.scale_calib_enable
camera_orientation = args.camera_orientation
debug_enable = args.debug_enable
# Using default values if no specified inputs
if not connection_string:
connection_string = connection_string_default
print("INFO: Using default connection_string", connection_string)
else:
print("INFO: Using connection_string", connection_string)
if not connection_baudrate:
connection_baudrate = connection_baudrate_default
print("INFO: Using default connection_baudrate", connection_baudrate)
else:
print("INFO: Using connection_baudrate", connection_baudrate)
if not vision_msg_hz:
vision_msg_hz = vision_msg_hz_default
print("INFO: Using default vision_msg_hz", vision_msg_hz)
else:
print("INFO: Using vision_msg_hz", vision_msg_hz)
if not confidence_msg_hz:
confidence_msg_hz = confidence_msg_hz_default
print("INFO: Using default confidence_msg_hz", confidence_msg_hz)
else:
print("INFO: Using confidence_msg_hz", confidence_msg_hz)
if body_offset_enabled == 1:
print("INFO: Using camera position offset: Enabled, x y z is", body_offset_x, body_offset_y, body_offset_z)
else:
print("INFO: Using camera position offset: Disabled")
if compass_enabled == 1:
print("INFO: Using compass: Enabled. Heading will be aligned to north.")
else:
print("INFO: Using compass: Disabled")
if scale_calib_enable == True:
print("\nINFO: SCALE CALIBRATION PROCESS. DO NOT RUN DURING FLIGHT.\nINFO: TYPE IN NEW SCALE IN FLOATING POINT FORMAT\n")
else:
if scale_factor == 1.0:
print("INFO: Using default scale factor", scale_factor)
else:
print("INFO: Using scale factor", scale_factor)
if not camera_orientation:
camera_orientation = camera_orientation_default
print("INFO: Using default camera orientation", camera_orientation)
else:
print("INFO: Using camera orientation", camera_orientation)
# Transformation to convert different camera orientations to NED convention. Replace camera_orientation_default for your configuration.
# 0: Forward, USB port to the right
# 1: Downfacing, USB port to the right
# Important note for downfacing camera: you need to tilt the vehicle's nose up a little - not flat - before you run the script, otherwise the initial yaw will be randomized, read here for more details: https://github.com/IntelRealSense/librealsense/issues/4080. Tilt the vehicle to any other sides and the yaw might not be as stable.
if camera_orientation == 0:
# Forward, USB port to the right
H_aeroRef_T265Ref = np.array([[0,0,-1,0],[1,0,0,0],[0,-1,0,0],[0,0,0,1]])
H_T265body_aeroBody = np.linalg.inv(H_aeroRef_T265Ref)
elif camera_orientation == 1:
# Downfacing, USB port to the right
H_aeroRef_T265Ref = np.array([[0,0,-1,0],[1,0,0,0],[0,-1,0,0],[0,0,0,1]])
H_T265body_aeroBody = np.array([[0,1,0,0],[1,0,0,0],[0,0,-1,0],[0,0,0,1]])
else:
# Default is facing forward, USB port to the right
H_aeroRef_T265Ref = np.array([[0,0,-1,0],[1,0,0,0],[0,-1,0,0],[0,0,0,1]])
H_T265body_aeroBody = np.linalg.inv(H_aeroRef_T265Ref)
if not debug_enable:
debug_enable = 0
else:
debug_enable = 1
np.set_printoptions(precision=4, suppress=True) # Format output on terminal
print("INFO: Debug messages enabled.")
#######################################
# Functions
#######################################
# https://mavlink.io/en/messages/common.html#VISION_POSITION_ESTIMATE
def send_vision_position_message():
global current_time, H_aeroRef_aeroBody
if H_aeroRef_aeroBody is not None:
rpy_rad = np.array( tf.euler_from_matrix(H_aeroRef_aeroBody, 'sxyz'))
msg = vehicle.message_factory.vision_position_estimate_encode(
current_time, # us Timestamp (UNIX time or time since system boot)
H_aeroRef_aeroBody[0][3], # Global X position
H_aeroRef_aeroBody[1][3], # Global Y position
H_aeroRef_aeroBody[2][3], # Global Z position
rpy_rad[0], # Roll angle
rpy_rad[1], # Pitch angle
rpy_rad[2] # Yaw angle
)
vehicle.send_mavlink(msg)
vehicle.flush()
# For a lack of a dedicated message, we pack the confidence level into a message that will not be used, so we can view it on GCS
# Confidence level value: 0 - 3, remapped to 0 - 100: 0% - Failed / 33.3% - Low / 66.6% - Medium / 100% - High
def send_confidence_level_dummy_message():
global data, current_confidence
if data is not None:
# Show confidence level on terminal
print("INFO: Tracking confidence: ", pose_data_confidence_level[data.tracker_confidence])
# Send MAVLink message to show confidence level numerically
msg = vehicle.message_factory.vision_position_delta_encode(
0, #us Timestamp (UNIX time or time since system boot)
0, #Time since last reported camera frame
[0, 0, 0], #angle_delta
[0, 0, 0], #position_delta
float(data.tracker_confidence * 100 / 3)
)
vehicle.send_mavlink(msg)
vehicle.flush()
# If confidence level changes, send MAVLink message to show confidence level textually and phonetically
if current_confidence is None or current_confidence != data.tracker_confidence:
current_confidence = data.tracker_confidence
confidence_status_string = 'Tracking confidence: ' + pose_data_confidence_level[data.tracker_confidence]
status_msg = vehicle.message_factory.statustext_encode(
3, #severity, defined here: https://mavlink.io/en/messages/common.html#MAV_SEVERITY, 3 will let the message be displayed on Mission Planner HUD
confidence_status_string.encode() #text char[50]
)
vehicle.send_mavlink(status_msg)
vehicle.flush()
# Send a mavlink SET_GPS_GLOBAL_ORIGIN message (http://mavlink.org/messages/common#SET_GPS_GLOBAL_ORIGIN), which allows us to use local position information without a GPS.
def set_default_global_origin():
msg = vehicle.message_factory.set_gps_global_origin_encode(
int(vehicle._master.source_system),
home_lat,
home_lon,
home_alt
)
vehicle.send_mavlink(msg)
vehicle.flush()
# Send a mavlink SET_HOME_POSITION message (http://mavlink.org/messages/common#SET_HOME_POSITION), which allows us to use local position information without a GPS.
def set_default_home_position():
x = 0
y = 0
z = 0
q = [1, 0, 0, 0] # w x y z
approach_x = 0
approach_y = 0
approach_z = 1
msg = vehicle.message_factory.set_home_position_encode(
int(vehicle._master.source_system),
home_lat,
home_lon,
home_alt,
x,
y,
z,
q,
approach_x,
approach_y,
approach_z
)
vehicle.send_mavlink(msg)
vehicle.flush()
# Request a timesync update from the flight controller, for future work.
# TODO: Inspect the usage of timesync_update
def update_timesync(ts=0, tc=0):
if ts == 0:
ts = int(round(time.time() * 1000))
msg = vehicle.message_factory.timesync_encode(
tc, # tc1
ts # ts1
)
vehicle.send_mavlink(msg)
vehicle.flush()
# Listen to messages that indicate EKF is ready to set home, then set EKF home automatically.
def statustext_callback(self, attr_name, value):
# These are the status texts that indicates EKF is ready to receive home position
if value.text == "GPS Glitch" or value.text == "GPS Glitch cleared" or value.text == "EKF2 IMU1 ext nav yaw alignment complete":
time.sleep(0.1)
print("INFO: Set EKF home with default GPS location")
set_default_global_origin()
set_default_home_position()
# Listen to attitude data to acquire heading when compass data is enabled
def att_msg_callback(self, attr_name, value):
global heading_north_yaw
if heading_north_yaw is None:
heading_north_yaw = value.yaw
print("INFO: Received first ATTITUDE message with heading yaw", heading_north_yaw * 180 / m.pi, "degrees")
else:
heading_north_yaw = value.yaw
print("INFO: Received ATTITUDE message with heading yaw", heading_north_yaw * 180 / m.pi, "degrees")
def vehicle_connect():
global vehicle
try:
vehicle = connect(connection_string, wait_ready = True, baud = connection_baudrate, source_system = 1)
except:
print('Connection error! Retrying...')
if vehicle == None:
return False
else:
return True
def realsense_connect():
global pipe
# Declare RealSense pipeline, encapsulating the actual device and sensors
pipe = rs.pipeline()
# Build config object before requesting data
cfg = rs.config()
# Enable the stream we are interested in
cfg.enable_stream(rs.stream.pose) # Positional data
# Start streaming with requested config
pipe.start(cfg)
# Monitor user input from the terminal and update scale factor accordingly
def scale_update():
global scale_factor
while True:
scale_factor = float(input("INFO: Type in new scale as float number\n"))
print("INFO: New scale is ", scale_factor)
#######################################
# Main code starts here
#######################################
print("INFO: Connecting to Realsense camera.")
realsense_connect()
print("INFO: Realsense connected.")
print("INFO: Connecting to vehicle.")
while (not vehicle_connect()):
pass
print("INFO: Vehicle connected.")
# Listen to the mavlink messages that will be used as trigger to set EKF home automatically
vehicle.add_message_listener('STATUSTEXT', statustext_callback)
if compass_enabled == 1:
# Listen to the attitude data in aeronautical frame
vehicle.add_message_listener('ATTITUDE', att_msg_callback)
data = None
current_confidence = None
H_aeroRef_aeroBody = None
heading_north_yaw = None
# Send MAVlink messages in the background
#sched = BackgroundScheduler()
#sched.add_job(send_vision_position_message, 'interval', seconds = 1/vision_msg_hz)
#sched.add_job(send_confidence_level_dummy_message, 'interval', seconds = 1/confidence_msg_hz)
# For scale calibration, we will use a thread to monitor user input
if scale_calib_enable == True:
scale_update_thread = threading.Thread(target=scale_update)
scale_update_thread.daemon = True
scale_update_thread.start()
#sched.start()
if compass_enabled == 1:
# Wait a short while for yaw to be correctly initiated
time.sleep(1)
print("INFO: Sending VISION_POSITION_ESTIMATE messages to FCU.")
try:
last_vision_send = time.time()
last_confidence_send = time.time()
while True:
now = time.time()
# Wait for the next set of frames from the camera
frames = pipe.wait_for_frames()
# Fetch pose frame
pose = frames.get_pose_frame()
if now - last_confidence_send > (1/confidence_msg_hz_default):
send_confidence_level_dummy_message()
last_confidence_send = now;
if now - last_vision_send > (1/vision_msg_hz_default):
if pose:
# Store the timestamp for MAVLink messages
current_time = int(round(time.time() * 1000000))
# Pose data consists of translation and rotation
data = pose.get_pose_data()
# In transformations, Quaternions w+ix+jy+kz are represented as [w, x, y, z]!
H_T265Ref_T265body = tf.quaternion_matrix([data.rotation.w, data.rotation.x, data.rotation.y, data.rotation.z])
H_T265Ref_T265body[0][3] = data.translation.x * scale_factor
H_T265Ref_T265body[1][3] = data.translation.y * scale_factor
H_T265Ref_T265body[2][3] = data.translation.z * scale_factor
# Transform to aeronautic coordinates (body AND reference frame!)
H_aeroRef_aeroBody = H_aeroRef_T265Ref.dot( H_T265Ref_T265body.dot( H_T265body_aeroBody))
# Take offsets from body's center of gravity (or IMU) to camera's origin into account
if body_offset_enabled == 1:
H_body_camera = tf.euler_matrix(0, 0, 0, 'sxyz')
H_body_camera[0][3] = body_offset_x
H_body_camera[1][3] = body_offset_y
H_body_camera[2][3] = body_offset_z
H_camera_body = np.linalg.inv(H_body_camera)
H_aeroRef_aeroBody = H_body_camera.dot(H_aeroRef_aeroBody.dot(H_camera_body))
# Realign heading to face north using initial compass data
if compass_enabled == 1:
H_aeroRef_aeroBody = H_aeroRef_aeroBody.dot( tf.euler_matrix(0, 0, heading_north_yaw, 'sxyz'))
# Show debug messages here
if debug_enable == 1:
os.system('clear') # This helps in displaying the messages to be more readable
print("DEBUG: Raw RPY[deg]: {}".format( np.array( tf.euler_from_matrix( H_T265Ref_T265body, 'sxyz')) * 180 / m.pi))
print("DEBUG: NED RPY[deg]: {}".format( np.array( tf.euler_from_matrix( H_aeroRef_aeroBody, 'sxyz')) * 180 / m.pi))
print("DEBUG: Raw pos xyz : {}".format( np.array( [data.translation.x, data.translation.y, data.translation.z])))
print("DEBUG: NED pos xyz : {}".format( np.array( tf.translation_from_matrix( H_aeroRef_aeroBody))))
send_vision_position_message()
last_vision_send = now
except KeyboardInterrupt:
print("INFO: KeyboardInterrupt has been caught. Cleaning up...")
finally:
pipe.stop()
vehicle.close()
print("INFO: Realsense pipeline and vehicle object closed.")
sys.exit()