Skip to content

Commit

Permalink
DFReader: Create DFMetaData class to handle logger metadata
Browse files Browse the repository at this point in the history
DFMetaData added as metadata attribute of DFReader class
download static method to download XML into .pymavlink/LogMessages
When needed, reads <vehicle>.xml file from ~/.pymavlink/LogMessages
Method for printing help text for MAVExplorer 'logmessage help' command
Method for returning log message description
Applied code update suggestions and performed flake8 check on new code

Co-authored-by: Peter Barker <pb-gh@barker.dropbear.id.au>
  • Loading branch information
shancock884 and peterbarker committed Apr 2, 2024
1 parent a71c7b7 commit 2f6e31e
Showing 1 changed file with 152 additions and 1 deletion.
153 changes: 152 additions & 1 deletion DFReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import time

import struct
import sys
import gzip
import io

from . import mavutil

try:
Expand Down Expand Up @@ -555,6 +557,154 @@ def set_message_timestamp(self, m):
m._timestamp = self.timebase + count/rate


class DFMetaData(object):
'''handle dataflash messages metadata'''
def __init__(self, parent):
self.parent = parent
self.data = None
self.metadata_load_attempted = False

def reset(self):
'''clear cached data'''
self.data = None
self.metadata_load_attempted = False

@staticmethod
def dot_pymavlink(*args):
'''return a path to store pymavlink data'''
if 'HOME' not in os.environ:
dir = os.path.join(os.environ['LOCALAPPDATA'], '.pymavlink')
else:
dir = os.path.join(os.environ['HOME'], '.pymavlink')
if len(args) == 0:
return dir
return os.path.join(dir, *args)

@staticmethod
def download_url(url):
'''download a URL and return the content'''
if sys.version_info.major < 3:
from urllib2 import urlopen as url_open
from urllib2 import URLError as url_error
else:
from urllib.request import urlopen as url_open
from urllib.error import URLError as url_error
try:
resp = url_open(url)
except url_error as e:
print('Error downloading %s : %s' % (url, e))
return None
return resp.read()

@staticmethod
def download():
# Make sure the folder to store XML in has been created
os.makedirs(DFMetaData.dot_pymavlink('LogMessages'), exist_ok=True)
# Loop through vehicles to download
for vehicle in ['Rover', 'Copter', 'Plane', 'Tracker', 'Blimp', 'Sub']:
url = 'http://autotest.ardupilot.org/LogMessages/%s/LogMessages.xml.gz' % vehicle
file = DFMetaData.dot_pymavlink('LogMessages', "%s.xml" % vehicle)
print("Downloading %s as %s" % (url, file))
data = DFMetaData.download_url(url)
if data is None:
continue
# decompress it...
with gzip.GzipFile(fileobj=io.BytesIO(data)) as gz:
data = gz.read()
try:
open(file, mode='wb').write(data)
except Exception as e:
print("Failed to save to %s : %s" % (file, e))

def metadata_tree(self, verbose=False):
''' return a map between a log message and its metadata. May return
None if data is not available '''
# If we've already tried loading data, use it if we have it
# This avoid repeated attempts, when the file is not there
if self.metadata_load_attempted:
return self.data
self.metadata_load_attempted = True
# Get file name, based on vehicle type
mapping = {mavutil.mavlink.MAV_TYPE_GROUND_ROVER : "Rover",
mavutil.mavlink.MAV_TYPE_FIXED_WING : "Plane",
mavutil.mavlink.MAV_TYPE_QUADROTOR : "Copter",
mavutil.mavlink.MAV_TYPE_HELICOPTER : "Copter",
mavutil.mavlink.MAV_TYPE_ANTENNA_TRACKER : "Tracker",
mavutil.mavlink.MAV_TYPE_SUBMARINE : "Sub",
mavutil.mavlink.MAV_TYPE_AIRSHIP : "Blimp",
}
if self.parent.mav_type not in mapping:
return None
path = DFMetaData.dot_pymavlink("LogMessages", "%s.xml" % mapping[self.parent.mav_type])
# Does the file exist?
if not os.path.exists(path):
if verbose:
print("Can't find '%s'" % path)
print("Please run 'logmessage download' from MAVExplorer, or call")
print("DFMetaData.download() from Python.")
return None
# Read in the XML
xml = open(path, 'rb').read()
from lxml import objectify
objectify.enable_recursive_str()
tree = objectify.fromstring(xml)
data = {}
for p in tree.logformat:
n = p.get('name')
data[n] = p
# Cache and return data
self.data = data
return self.data

def print_help(self, msg):
'''print help for a log message'''
data = self.metadata_tree(verbose=True)
if data is None:
return
if msg not in data:
print("No help found for message: %s" % msg)
return
node = data[msg]
# Message name and description
print("Log Message: %s\n%s\n" % (msg, node.description.text))
# Protect against replay messages which dont list their fields
if not hasattr(node.fields, 'field'):
return
namelist = []
unitlist = []
# Loop through fields to build list of name/units
for f in node.fields.field:
namelist.append(f.get('name'))
units = f.get('units')
dtype = f.get('type')
if units:
unitlist.append("[%s] " % units)
elif 'char' in dtype:
unitlist.append("[%s] " % dtype)
elif hasattr(f, 'enum'):
unitlist.append("[enum] ")
elif hasattr(f, 'bitmask'):
unitlist.append("[bitmask] ")
else:
unitlist.append("")
# Now get the max string length from each list
namelen = len(max(namelist, key=len))
unitlen = len(max(unitlist, key=len))
# Loop through fields again to do the actual printing
for i in range(0, len(namelist)):
desc = node.fields.field[i].description.text
print("%-*s %-*s: %s" % (namelen, namelist[i], unitlen, unitlist[i], desc))

def get_description(self, msg):
'''get the description of a log message'''
data = self.metadata_tree()
if data is None:
return None
if msg in data:
return data[msg].description.text
return ""


class DFReader(object):
'''parse a generic dataflash file'''
def __init__(self):
Expand All @@ -572,6 +722,7 @@ def __init__(self):
self.percent = 0
self.unit_lookup = {} # lookup table of units defined by UNIT messages
self.mult_lookup = {} # lookup table of multipliers defined by MULT messages
self.metadata = DFMetaData(self)

def _rewind(self):
'''reset state on rewind'''
Expand Down

0 comments on commit 2f6e31e

Please sign in to comment.