<a href="https://colab.research.google.com/github/Kovacs37/Kovacs37/blob/main/mavlogparse.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MAVLink Log Parser

## Relevant Setup (Installs/Imports)

In [15]:
!pip install pymavlink

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [16]:
import json
from pathlib import Path
from fnmatch import fnmatch
from pymavlink import mavutil
from google.colab import files
from inspect import getfullargspec

import ipywidgets as widgets
from IPython.display import display

def upload_one_file():
    return next(iter(files.upload().values()))

## Parsing Code

In [17]:
class Telemetry:
    DEFAULT_FIELDS = {
        'VFR_HUD'          : ['heading', 'alt', 'climb'],
        'VIBRATION'        : [f'vibration_{axis}' for axis in 'xyz'],
        'SCALED_IMU2'      : [s for axis in 'xyz'
                              for s in (axis+'acc', axis+'gyro')],
        'ATTITUDE'         : [s for rot in ('roll', 'pitch', 'yaw')
                              for s in (rot, rot+'speed')],
        'SCALED_PRESSURE2' : ['temperature'],
    }
    def __init__(self, log_file, fields=DEFAULT_FIELDS,
                 dialect='ardupilotmega'):
        ''' Creates a tlog parser on 'log_file', extracting 'fields'.
        'log_file' can be a string filename/path, or pathlib.Path instance.
        'fields' is either a dictionary in the form {'TYPE': ['attr']/None},
            or a string filename/path to a json/text file with the same
            structure. Specifying None (null if in a file) instead of an
            attribute list gets all the attributes for that type.
            Defaults to Telemetry.DEFAULT_FIELDS.
        'dialect' is a string specifying the mavlink parsing dialect to use.
            Default 'ardupilotmega'.
        '''
        self.log_file = str(log_file) # mavutil doesn't use Path 
        self.mlog = mavutil.mavlink_connection(self.log_file, dialect=dialect)

        self._init_fields(fields)

    def _init_fields(self, fields):
        ''' Determine CSV fields and populate None attribute lists. '''
        if isinstance(fields, (str, Path)):
            with open(fields) as field_file:
                fields = json.load(field_file)

        self.csv_fields = ['timestamp']
        nan = float('nan') # start with non-number data values
        self.data = [nan]
        self.offsets = {}
        for type_, field in fields.items():
            if field is None:
                type_class = f'MAVLink_{type_.lower()}_message'
                fields[type_] = \
                    getfullargspec(getattr(mavutil.mavlink,
                                           type_class).__init__).args[1:]
            self.offsets[type_] = offset = len(self.csv_fields)
            self.csv_fields.extend(f'{type_}.{attr}' for attr in fields[type_])
            self.data.extend(nan for _ in range(len(self.csv_fields) - offset))

        self.fields = fields
        self.type_set = set(fields) # put major fields in a set

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.mlog.close()

    def __iter__(self):
        while "messages available":
            msg = self.mlog.recv_match(type=self.type_set)
            if not msg:
                break
            m_type = msg.get_type()

            if m_type == 'BAD_DATA':
                print('bad data recorded')
                continue
            elif not self.match_types(m_type, self.type_set):
                # keep going if the message is the wrong type
                # NOTE: specifically relevant because recv_match internal
                #       'skip_to_type' function auto-includes HEARTBEAT and
                #       PARAM_VALUE messages
                continue

            yield msg

    @staticmethod
    def match_types(m_type, patterns):
        ''' Return True if m_type matches one of patterns.
        'patterns' are types but case-insensitive on Windows, and with support
            for unix-style wildcards:
                *      -> match everything
                ?      -> match a single character
                [seq]  -> match any character in seq
                [!seq] -> match any character not in seq
        '''
        return any(fnmatch(m_type, p) for p in patterns)

    def to_csv(self, output=None, csv_sep=',', verbose=True):
        '''
        NOTE: opens output in append mode
               -> can create files, but WILL NOT overwrite existing files
                  (adds to the end instead).
        '''
        if output is None:
            output = Path(self.log_file).with_suffix('.csv')
        if verbose:
            print(f'Processing {self.log_file}\n  -> Saving to {output}')

        last_timestamp = None
        adding = Path(output).is_file()
        # TODO enable stdout output for printing to terminal?
        with self as mavlink, open(output, 'a') as out_file:
            def write_line(data):
                print(csv_sep.join(data), file=out_file)
            # convert to suitable for csv output
            self.data = [str(val) for val in self.data]

            if not adding:
                write_line(self.csv_fields) # field headings

            for msg in mavlink:
                wrote_last = False
                timestamp = getattr(msg, '_timestamp', 0.0)
                data = msg.to_dict()
                if last_timestamp is not None and timestamp != last_timestamp:
                    # new timestamp, so write latest data and timestamp
                    self.data[0] = f'{last_timestamp:.8f}'
                    write_line(self.data)
                    wrote_last = True

                self._update(msg.get_type(), data, convert=str)
                last_timestamp = timestamp

            try:
                if not wrote_last: # handle last message
                    self.data[0] = f'{last_timestamp:.8f}'
                    write_line(self.data)
            except UnboundLocalError:
                print('No desired messages found in file')

    def data_parser(self):
        last_timestamp = None
        with self as mavlink:
            for msg in mavlink:
                yielded_last = False
                timestamp = getattr(msg, '_timestamp', 0.0)
                data = msg.to_dict()
                if last_timestamp is not None and timestamp != last_timestamp:
                    # new timestamp, so yield latest data and timestamp
                    self.data[0] = last_timestamp
                    yield self.data
                    yielded_last = True

                self._update(msg.get_type(), data)
                last_timestamp = timestamp

            try:
                if not yielded_last: # handle last message
                    self.data[0] = last_timestamp
                    yield self.data
            except UnboundLocalError:
                print('No desired messages found in file')

    def _update(self, type_, data, convert=lambda d: d):
        ''' Update with the latest data for 'type_'. '''
        offset = self.offsets[type_]
        for index, desired_attr in enumerate(self.fields[type_]):
            self.data[offset+index] = convert(data[desired_attr])

    @classmethod
    def logs_to_csv(cls, output, logs, fields=DEFAULT_FIELDS, csv_sep=',',
                    dialect='ardupilotmega', verbose=True):
        for log in logs:
            cls(log, fields, dialect).to_csv(output, csv_sep, verbose)

    @staticmethod
    def csv_to_df(filename, timestamp='timestamp',
                  timezone='Australia/Melbourne', **kwargs):
        ''' Returns a pandas dataframe of a csv-log, indexed by timestamp.
        'filename' is the path to a csv-file, as output by Telemetry.to_csv
        'timestamp' is the name of the timestamp column. Defaults to
            "timestamp".
        'timezone' is the location where the data was collected. Data is
            assumed to be in UTC, and is converted to the specified timezone.
            Defaults to 'Australia/Melbourne'.
        'kwargs' are additional key-word arguments to pass to pandas read_csv.
            Mostly useful for 'usecols', if not all columns are required.
        '''
        import pandas as pd

        def parser(utc_epoch_seconds):
            return (pd.to_datetime(utc_epoch_seconds, unit='s')
                    .tz_localize('utc').tz_convert(timezone))

        return pd.read_csv(filename, index_col=timestamp,
                           parse_dates=[timestamp], date_parser=parser,
                           **kwargs)

    @classmethod
    def get_useful_fields(cls, tlogs, out='useful.json', fields=None,
                          dialect='ardupilotmega', verbose=True):
        ''' Returns a {type: [fields]} dictionary of all non-constant fields.
        'tlogs' should be an iterable of one or more string/Path filepaths.
        'out' is the filename to save to. If set to None does not save.
        'fields' a json file for a subset of fields to parse with. If left as
            None checks all fields in the file.
        'dialect' is the mavlink dialect in use. Default 'ardupilotmega'.
        'verbose' a boolean determining if progress updates should be printed.
        '''
        mavutil.set_dialect(dialect) # set dialect so field tracker is accurate
        fields = cls.__create_field_tracker(fields)
        # determine fields dictionary to initialise with
        init_fields = {type_: list(fields_)
                       for type_, fields_ in fields.items()}

        useful_types = {}

        for tlog in tlogs:
            if verbose:
                print(f'Extracting useful fields from {tlog!r}')
            with cls(tlog, init_fields, dialect) as mavlink:
                for msg in mavlink:
                    cls.__process(msg, mavlink, fields, init_fields,
                                  useful_types)

        useful_types = {t: useful_types[t] for t in sorted(useful_types)}

        if out:
            with open(out, 'w') as output:
                json.dump(useful_types, output, indent=4)
            if verbose:
                print(f'  -> Saving to {output}')

        return useful_types

    @staticmethod
    def __create_field_tracker(fields):
        ''' Create a dictionary of {type: {field: None}} for specified fields.
        If 'fields' is None gets all the types and fields from mavutil.mavlink.
        If 'fields' is a string/Path json file, reads in and replaces any
            None fields with all the valid fields for that type.
        '''
        def get_fields(type_):
            ''' Return a dict of {field: None} for all fields of type_. '''
            return {field: None for field in
                    getfullargspec(getattr(mavutil.mavlink,
                                           type_).__init__).args[1:]}

        if fields is None:
            fields = {t[8:-8].upper(): get_fields(t)
                      for t in dir(mavutil.mavlink)
                      if t.startswith('MAVLink_') and t.endswith('_message')}
        elif isinstance(fields, (str, Path)):
            fmt = 'MAVLink_{}_message'
            with open(fields) as in_file:
                fields = {type_: ({field: None for field in fields_} if fields_
                                  else get_fields(fmt.format(type_.lower())))
                          for type_, fields_ in json.load(in_file).items()}

        return fields

    @staticmethod
    def __process(msg, mavlink, fields, init_fields, useful_types):
        msg_type = msg.get_type()

        to_remove = []
        for field, data in fields[msg_type].items():
            msg_data = getattr(msg, field)
            if data is None:
                fields[msg_type][field] = msg_data
            elif msg_data != data:
                # data changes -> useful field
                if msg_type not in useful_types:
                    useful_types[msg_type] = []
                useful_types[msg_type].append(field)
                to_remove.append(field)
                if not fields[msg_type]:
                    # all fields useful -> stop checking type
                    mavlink.type_set.pop(msg_type)
                    init_fields.pop(msg_type)

        for field in to_remove:
            fields[msg_type].pop(field)


# Your Telemetry File(s)

Upload one or more `.tlog` files to analyse

> **Note:** this runs in Google's cloud servers, so don't upload sensitive data.

If uploading takes too long (e.g. due to large files), you may be better off [running locally](https://gist.github.com/ES-Alexander/1498c87ad6c07623f6776c552869c805#:~:text=Running%20Locally%20(on%20your%20computer)) instead.

In [18]:
tlogs = files.upload()

Saving 2022-07-05 12-20-52.tlog to 2022-07-05 12-20-52 (1).tlog


## Desired Fields

In [29]:
#@markdown ### Select Fields:
field_selection_method = 'Extract available fields' #@param ['Use default fields', 'Extract available fields', 'Upload json fields file']
if field_selection_method == 'Use default fields':
    fields = Telemetry.DEFAULT_FIELDS
    print(fields)
elif field_selection_method == 'Extract available fields':
    Telemetry.get_useful_fields(tlogs)
    print('\nModify useful.json in the file explorer from the sidebar to only')
    print('include the fields you want. (Double-click to view/edit)\n')
    button = widgets.Button(description="Done")
    def submit(b):
        global fields
        with open('useful.json') as file:
            fields = json.load(file)
        print(fields)
        print('\nFields successfully loaded')
    button.on_click(submit)
    display(button)
else:
    print('Upload json file of desired fields:')
    fields = json.loads(upload_one_file())

Extracting useful fields from '2022-07-05 12-20-52.tlog'
  -> Saving to <_io.TextIOWrapper name='useful.json' mode='w' encoding='UTF-8'>

Modify useful.json in the file explorer from the sidebar to only
include the fields you want. (Double-click to view/edit)



Button(description='Done', style=ButtonStyle())

{'AHRS': ['omegaIx', 'omegaIy', 'omegaIz', 'error_rp', 'error_yaw'], 'AHRS2': ['roll', 'pitch', 'yaw', 'altitude', 'lat', 'lng'], 'ATTITUDE': ['time_boot_ms', 'roll', 'pitch', 'yaw', 'rollspeed', 'pitchspeed', 'yawspeed'], 'COMMAND_ACK': ['command', 'result', 'target_component'], 'COMMAND_LONG': ['command', 'param1', 'param2'], 'DISTANCE_SENSOR': ['time_boot_ms'], 'EKF_STATUS_REPORT': ['pos_horiz_variance', 'pos_vert_variance', 'compass_variance', 'flags', 'velocity_variance'], 'FILE_TRANSFER_PROTOCOL': ['payload'], 'GLOBAL_POSITION_INT': ['time_boot_ms', 'hdg', 'relative_alt', 'vz', 'vx', 'vy', 'alt', 'lon', 'lat'], 'GPS_RAW_INT': ['time_usec', 'cog', 'fix_type', 'lat', 'lon', 'alt', 'eph', 'epv', 'satellites_visible', 'h_acc', 'v_acc', 'vel_acc'], 'HEARTBEAT': ['type', 'autopilot', 'base_mode', 'system_status'], 'HOME_POSITION': ['time_usec'], 'LOCAL_POSITION_NED': ['time_boot_ms', 'x', 'y', 'z', 'vx', 'vy', 'vz'], 'LOG_ENTRY': ['id'], 'LOG_REQUEST_LIST': ['start', 'end'], 'MISSION_A

## Convert Log(s) to CSV(s)

In [30]:
#@markdown ### Combine Output CSVs?
combined_CSV_output_filename = "Don't Combine" #@param ["Don't Combine"] {allow-input: true}
if combined_CSV_output_filename == "Don't Combine":
    combined_CSV_output_filename = None
print(fields)
Telemetry.logs_to_csv(combined_CSV_output_filename, tlogs, fields)
print('\nRight click output csv(s) to download (from sidebar at left)')

Processing 2022-07-05 12-20-52.tlog
  -> Saving to 2022-07-05 12-20-52.csv

Right click output csv(s) to download (from sidebar at left)


# Analysis

## Imports/Setup

In [31]:
import plotly.express as px
import pandas as pd

## Select File & Timezone

In [32]:
#@markdown ### Specify Timezone:
timezone = 'Australia/Melbourne' #@param ['Australia/Melbourne', 'US/Eastern', 'Etc/GMT+3', 'Etc/GMT-4'] {allow-input: true}

In [37]:
upload = 'Upload New'
dropdown = widgets.Dropdown(options=(*Path().glob('*.csv'), upload),
                            description="Select CSV:")
display(dropdown)

Dropdown(description='Select CSV:', options=(PosixPath('2022-07-05 12-20-52.csv'), 'Upload New'), value=PosixP…

In [38]:
filename = dropdown.value
if filename == upload:
    filename = upload_one_file()

## Convert to DataFrame








In [39]:
df = Telemetry.csv_to_df(filename, timezone=timezone)

ParserError: ignored

## Plot things

In [None]:
# example interactive Plotly scatter plot of depth (altitude) over time
px.line(df, x=df.index, y='VFR_HUD.alt')