In [19]:
import numpy
from matplotlib import pyplot as matpyplot

import pickle
from collections import Counter

In [20]:
with open(
    '../data/20240416/'
    'gnss_test_raw_data_1713259484.0477698.pickle', 'rb'
) as handle:    
    ubxmsg_data = pickle.load(handle)

print(f'Quantity of messages: {len(ubxmsg_data)}')

Quantity of messages: 6803


# Describing the content of the file

- Types of messages
- Quantity of messages per type
- Duration of the recording (inspection of timestamps)

In [21]:
def inspect_ubxmsg_data(ubxmsg_data_record):
    MSGS_IDs =  []
    MSGS_IDs_count = Counter()

    init_ts = None
    final_ts = None

    for ubx_msg_ts in ubxmsg_data_record:
        ubxmsg = ubx_msg_ts[0]

        if init_ts is None:
            init_ts = ubx_msg_ts[1]
        final_ts = ubx_msg_ts[1]

        if ubxmsg:
            if not ubxmsg.identity in MSGS_IDs:
                print(str(ubxmsg.identity))
                MSGS_IDs.append(ubxmsg.identity)
        
            MSGS_IDs_count.update({f"{ubxmsg.identity}": 1})

    total_recording_time = final_ts - init_ts

    print(len(MSGS_IDs))
    print(MSGS_IDs_count.items())
    print(f'Data recording started at: {init_ts}')
    print(f'Data recording ended at: {final_ts}')
    print(
        f'Data recording duration: {total_recording_time} sec'
        f' OR {(total_recording_time) / 60} min'
    )

In [22]:
# Second closer data
inspect_ubxmsg_data(ubxmsg_data)

GNVTG
GNGGA
GNGSA
GPGSV
GLGSV
GAGSV
GBGSV
NAV-SAT
NAV-SIG
NAV-DOP
RXM-MEASX
GNRMC
GNTXT
13
dict_items([('GNVTG', 401), ('GNGGA', 401), ('GNGSA', 1604), ('GPGSV', 1799), ('GLGSV', 1033), ('GAGSV', 109), ('GBGSV', 49), ('NAV-SAT', 43), ('NAV-SIG', 41), ('NAV-DOP', 51), ('RXM-MEASX', 400), ('GNRMC', 400), ('GNTXT', 72)])
Data recording started at: 1713259484.04777
Data recording ended at: 1713259484.04777
Data recording duration: 0.0 sec OR 0.0 min


# Analyzing Messages batch

Go through the first message and capture the first identity - then repeat until the same id appears.

In [23]:
for n in range(0, 20):
    print(ubxmsg_data[n])

(NMEAMessage('GN','VTG', 0, payload=['26.62', 'T', '', 'M', '0.885', 'N', '1.639', 'K', 'A']), 1713259484.04777)
(NMEAMessage('GN','GGA', 0, payload=['092444.00', '6026.69592', 'N', '02218.89658', 'E', '1', '07', '1.78', '64.7', 'M', '18.7', 'M', '', '']), 1713259484.04777)
(NMEAMessage('GN','GSA', 0, payload=['A', '3', '29', '12', '31', '20', '25', '', '', '', '', '', '', '', '3.69', '1.78', '3.24', '1']), 1713259484.04777)
(NMEAMessage('GN','GSA', 0, payload=['A', '3', '73', '67', '', '', '', '', '', '', '', '', '', '', '3.69', '1.78', '3.24', '2']), 1713259484.04777)
(NMEAMessage('GN','GSA', 0, payload=['A', '3', '', '', '', '', '', '', '', '', '', '', '', '', '3.69', '1.78', '3.24', '3']), 1713259484.04777)
(NMEAMessage('GN','GSA', 0, payload=['A', '3', '', '', '', '', '', '', '', '', '', '', '', '', '3.69', '1.78', '3.24', '4']), 1713259484.04777)
(NMEAMessage('GP','GSV', 0, payload=['2', '1', '08', '04', '14', '342', '08', '05', '10', '117', '10', '12', '26', '124', '22', '20', '

In [24]:
def building_batches_ubxmsg_data(
        ubxmsg_data_record,
        skip_msgs_list=[],
):
    # This block of code captures the batch of messages in the whole list.
    # Whenever the cycle of messages repeats itself the batch of messages is stored
    starting_id = None

    MSGS_BATCH = []
    ALL_MSGS = []
    for i, ubxmsg in enumerate(ubxmsg_data_record):
        ubxmsg_info = ubxmsg[0]

        if ubxmsg_info:
            if ubxmsg_info.identity in skip_msgs_list:
                continue

            if starting_id is None:
                starting_id = ubxmsg_info.identity

            if ubxmsg_info.identity == starting_id:
                if MSGS_BATCH:
                    ALL_MSGS.append(MSGS_BATCH)
                MSGS_BATCH = [ubxmsg_info]
            else:
                MSGS_BATCH.append(ubxmsg_info)
    ALL_MSGS.append(MSGS_BATCH)

    print(f'Batch of messages captured: {len(ALL_MSGS)}')
    return ALL_MSGS

In [25]:
ALL_MSGS_CLOSER = building_batches_ubxmsg_data(ubxmsg_data)

Batch of messages captured: 401


# Extracting the features at each batch

## Features description

- Carrier to Noise Ratio
- Doppler Measurement
- Pseudorange Residual
- PDOP, HDOP, VDOP, TDOP
- Elevation
- Azymuth
- N of visible satellites
- Multipath Indicator
- Pseudorange RMS error
- Standard Deviation of Latitute Error
- Standard Deviation of Longitude Error
- Range RMS
- Noise Level (by GPS Core)

## Where to get the features?

- UBX-NAV-SAT: Carrier Noise Ratio, Pseudorange Residual, Elevation, Azymuth, #VS
- UBX-RXM-MEASX: Carrier Noise Ratio, Doppler Measurement, #VS, Multipath Indicator, PR RMS error
- UBX-NAV-DOP: HDOP, VDOP, PDOP, TDOP
- NMEA-ST-GST: Standard Deviation of Latitude and Longitude Errors, Range RMS
- UBX-MON-RF: Noise Level (GPS Core)

### Code

#### Batch Message Structure

A batch message contains all the features and relevant info to store in one
sample. A batch is collected when the receiver outputs all different messages
before repeating them again.

In [26]:
class Satellite:
    def __init__(
            self, gnss_id=None, sat_id=None, cn_ratio=0, pse_res=0,
            doppler_meas=0, hdop=0, vdop=0, pdop=0, tdop=0,
            elevation=0, azymuth=0, n_vis_sat=0, batch_number=0,
            multipath_ind=None, pr_rms=0, std_lat_err=0, std_lon_err=0,
            range_rms=0, noise_lvl_gpscore=0
        ) -> None:

        self.gnss_id = gnss_id
        self.sat_id = sat_id
        self.cn_ratio = cn_ratio
        self.pse_res = pse_res
        self.doppler_meas = doppler_meas
        self.hdop=hdop
        self.vdop=vdop
        self.pdop=pdop
        self.tdop=tdop
        self.elevation = elevation
        self.azymuth = azymuth
        self.n_vis_sat = n_vis_sat
        self.multipath_ind = multipath_ind
        self.pr_rms = pr_rms
        self.std_lat_err = std_lat_err
        self.std_lon_err = std_lon_err
        self.range_rms = range_rms
        self.noise_lvl_gpscore = noise_lvl_gpscore

        self.batch_number = batch_number

        self.features = []

        self.full_sat_id = self.build_full_gnss_sat_id(
            self.gnss_id, self.sat_id
        )
    
    def convert_sat_data_to_list(
            self, dops,
            n_vis_sat,
            std_lat_err,
            std_lon_err,
            range_rms,
            noise_lvl_gpscore
        ):
        # Update DOPs to ensure they have the general value - if it exists
        if dops:
            self.hdop = dops[0]
            self.vdop = dops[1]
            self.pdop = dops[2]
            self.tdop = dops[3]

        self.n_vis_sat = n_vis_sat
        self.std_lat_err = std_lat_err
        self.std_lon_err = std_lon_err
        self.noise_lvl_gpscore = noise_lvl_gpscore
        self.range_rms = range_rms

        return [
            self.batch_number,
            self.full_sat_id,
            self.gnss_id,
            self.sat_id,
            self.cn_ratio,
            self.pse_res,
            self.doppler_meas,
            self.hdop,
            self.vdop,
            self.pdop,
            self.tdop,
            self.elevation,
            self.azymuth,
            self.n_vis_sat,
            self.multipath_ind,
            self.pr_rms,
            self.std_lat_err,
            self.std_lon_err,
            self.range_rms,
            self.noise_lvl_gpscore,
        ]

    def build_full_gnss_sat_id(self, gnss_id, sat_id):
        return str(gnss_id) + str(sat_id)

Procedure to get the features

1. Extract the features for each satellite in a batch and store it as a row
indicating the satellite full ID and the batch number.

The current code extracts the features following this order:

- Batch, Full sat id, gnss id, sat id, CNo, PR, DM, HD, VD, PD, TD, El, Az, NSV, Multipath, PR RMS, Std LatErr, Std LonErr, Range RMS, Noise LVL

In [27]:
# Name of the message attributes where the features are located

# UBX-RXM-MEASX
FEATURES_AS_MEASX_SAT_ATTRS = [
    'gnssId',
    'svId',
    'cNo',
    'dopplerHz',
    'pseuRangeRMSErr',
    'mpathIndic',
]

# UBX-NAV-SAT
FEATURES_AS_NAVSAT_SAT_ATTRS = [
    'gnssId',
    'svId',
    'cno',
    'elev',
    'azim',
    'prRes',
]

# UBX-NAV-DOP
FEATURES_AS_NAVDOP_REC_ATTRS = [
    'pDOP',
    'tDOP',
    'vDOP',
    'hDOP',
]

# UBX-MON-RF
FEATURES_AS_MON_RF_ATTS = [
    'noisePerMS'
]

# NMEA-ST_GST
FEATURES_AS_GST_ATTS = [
    'stdLat',
    'stdLong',
    'rangeRMS',
]

In [28]:
def building_nsat_id(number_of_ind_Sv: int):
    """
    This function takes the satellite ID and converts it to a str to add the
    leading zeros it needs for the attribute getter.

    Parameter:
        - number_of_ind_Sv: An int representing the individual satellite ID

    Returns:
        - nsat_name: The satellite ID as str with leading zeros as it is placed
        in the attributes.
    """
    Sv_str = str(number_of_ind_Sv)
    nsat_name = Sv_str.zfill(2)

    return nsat_name

In [29]:
def extract_list_attrs_from_msg(
        attrs_list,
        ubxmsg,
        sat_id: int
    ):

    """
    Goes over the attributes list and tries to find them in the ubxmsg using
    also the satellite ID. Stores all the attributes in a list following the
    same order than in the attributes list.

    Parameter:
        - attrs_list: A list containing the attributes from the message to
        extract in a predetermined order.
        - ubxmsg: The dictionary containing the complete message from the
        receiver.
        - sat_id: The number assigned to the satellite in the message.
    
    Returns:
        - A list containing the attributes from the message according to the
        predetermined list order.
    """

    sat_attrs_list = []
    for meas_attr in attrs_list:
        sat_attr_data = getattr(ubxmsg, f'{meas_attr}_{sat_id}', None)
        # print(f'{meas_attr} is: {sat_attr_data}')
        sat_attrs_list.append(sat_attr_data)
    
    return sat_attrs_list

### Extracting features functions

In [30]:
def extracting_features_ubxmsgs(ubx_allmsgs_data):
    # SATID, Batch + Features composition (columns)
    # Batch, Full sat id, gnss id, sat id, CNo, PR, DM, HD, VD, PD, TD, El, Az, NSV, Multipath
    ALL_MSGS_FEATURES = []

    for n_batch, gnssmsgs_batch in enumerate(ubx_allmsgs_data):
        # For every batch
        SAT_BATCH_DATA = {}
        std_lat_err = 0
        std_lon_err = 0
        range_rms = 0
        curr_noise_lvl_gpscore = 0

        sat_n_sat = 0

        if len(gnssmsgs_batch) < 2:
            continue

        for data_msg in gnssmsgs_batch:
            # Go through every msg and extract info

            # DOP list to update all the satellites later (we know they are four)
            dops = []
            
            if data_msg.identity == 'RXM-MEASX':
                meas_n_sat = data_msg.numSv

                for sat in range(1, meas_n_sat + 1):
                    curr_sat_id = building_nsat_id(sat)
                    curr_sat_attrs = extract_list_attrs_from_msg(
                        attrs_list=FEATURES_AS_MEASX_SAT_ATTRS,
                        ubxmsg=data_msg,
                        sat_id=curr_sat_id
                    )

                    curr_sat = Satellite(
                        gnss_id=curr_sat_attrs[0],
                        sat_id=curr_sat_attrs[1],
                        cn_ratio=curr_sat_attrs[2],
                        doppler_meas=curr_sat_attrs[3],
                        pr_rms=curr_sat_attrs[4],
                        multipath_ind=curr_sat_attrs[5],
                        batch_number=n_batch,
                    )

                    if curr_sat.full_sat_id in SAT_BATCH_DATA:
                        # Save only the new features
                        existing_sat = SAT_BATCH_DATA.get(curr_sat.full_sat_id)
                        existing_sat.cn_ratio = curr_sat.cn_ratio
                        existing_sat.doppler_meas = curr_sat.doppler_meas
                    else:
                        # Save a new satellite
                        SAT_BATCH_DATA[curr_sat.full_sat_id] = curr_sat
            
            elif data_msg.identity == 'NAV-SAT':
                sat_n_sat = data_msg.numSvs

                for sat in range(1, sat_n_sat + 1):
                    curr_sat_id = building_nsat_id(sat)
                    curr_sat_attrs = extract_list_attrs_from_msg(
                        attrs_list=FEATURES_AS_NAVSAT_SAT_ATTRS,
                        ubxmsg=data_msg,
                        sat_id=curr_sat_id
                    )

                    curr_sat = Satellite(
                        gnss_id=curr_sat_attrs[0],
                        sat_id=curr_sat_attrs[1],
                        cn_ratio=curr_sat_attrs[2],
                        elevation=curr_sat_attrs[3],
                        azymuth=curr_sat_attrs[4],
                        pse_res=curr_sat_attrs[5],
                        batch_number=n_batch
                    )

                    if curr_sat.full_sat_id in SAT_BATCH_DATA:
                        # Save only the new features
                        existing_sat = SAT_BATCH_DATA.get(curr_sat.full_sat_id)
                        existing_sat.elevation = curr_sat.elevation
                        existing_sat.azymuth = curr_sat.azymuth
                        existing_sat.pse_res = curr_sat.pse_res
                    else:
                        # Save a new satellite
                        SAT_BATCH_DATA[curr_sat.full_sat_id] = curr_sat
            
            elif data_msg.identity == 'NAV-DOP':
                # Update the values in the DOP list
                dops.extend(
                    [data_msg.hDOP, data_msg.vDOP, data_msg.pDOP, data_msg.tDOP]
                )
            
            elif data_msg.identity == 'GNGST':
                std_lat_err = data_msg.stdLat
                std_lon_err = data_msg.stdLong
                range_rms = data_msg.rangeRms
            
            elif data_msg.identity == 'MON-RF':
                rf_num_blocks = data_msg.nBlocks
                rf_block_features = numpy.zeros_like(
                    rf_num_blocks, len(FEATURES_AS_MON_RF_ATTS)
                )
                for rf_block in range(1, rf_num_blocks + 1):
                    curr_rf_block_id = building_nsat_id(rf_block)
                    curr_rf_block_attrs = extract_list_attrs_from_msg(
                        attrs_list=FEATURES_AS_MON_RF_ATTS,
                        ubxmsg=data_msg,
                        sat_id=curr_rf_block_id
                    )
                    rf_block_features[rf_block-1] = curr_rf_block_attrs
                
                curr_noise_lvl_gpscore = rf_block_features[:, 0].mean()

        n_vis_sat = meas_n_sat if meas_n_sat > sat_n_sat else sat_n_sat
        ALL_MSGS_FEATURES.extend(
            [
                satellite.convert_sat_data_to_list(
                    dops, n_vis_sat,
                    std_lat_err, std_lon_err, range_rms,
                    curr_noise_lvl_gpscore
                )
                for satellite in SAT_BATCH_DATA.values()
            ]
        )

    return ALL_MSGS_FEATURES

In [31]:
allmsgs_features_closer = extracting_features_ubxmsgs(ALL_MSGS_CLOSER)

In [32]:
print(len(allmsgs_features_closer))
print(allmsgs_features_closer[0])

4670
[0, '04', 0, 4, 8, -100.5, 1073.4, 0, 0, 0, 0, 14, 342, 19, None, 0, 0, 0, 0, 0]


#### Saving data for later inspection

In [33]:
with open(
    f'./gnss_test_raw_data_1713259484.0477698_features.pickle',
    'wb'
) as handle:
    pickle.dump(
        allmsgs_features_closer,
        handle,
        protocol=pickle.HIGHEST_PROTOCOL
    )

print("\nProcessing complete")


Processing complete


# Inspecting Noise Level

Example to extract only one specific type of message

In [34]:
def building_batches_single_ubxmsg_data(
        ubxmsg_data_record,
        msg_to_inspect=None,
):
    # This block of code captures the batch of messages in the whole list.

    MSGS_BATCH = []
    
    for __, ubxmsg in enumerate(ubxmsg_data_record):
        ubxmsg_info = ubxmsg[0]

        if ubxmsg_info:
            if ubxmsg_info.identity == msg_to_inspect:
                rf_num_blocks = ubxmsg_info.nBlocks
                rf_block_features = numpy.zeros_like(
                    (rf_num_blocks, 1)
                )
                for rf_block in range(1, rf_num_blocks + 1):
                    curr_rf_block_id = building_nsat_id(rf_block)
                    curr_rf_block_attrs = extract_list_attrs_from_msg(
                        attrs_list=FEATURES_AS_MON_RF_ATTS,
                        ubxmsg=ubxmsg_info,
                        sat_id=curr_rf_block_id
                    )
                    rf_block_features[rf_block-1] = curr_rf_block_attrs[0]
                
                curr_noise_lvl_gpscore = rf_block_features[:].mean()
                MSGS_BATCH.append(curr_noise_lvl_gpscore)

    print(f'Batch of messages captured: {len(MSGS_BATCH)}')
    return numpy.array(MSGS_BATCH)