# SBN Reader - Locosys SiRF Binary

Created by Michael George (AKA Logiqx)

Website: https://logiqx.github.io/gps-wizard/

In [27]:
import os
import sys

from datetime import datetime, timezone

import numpy as np

import unittest

from file_reader import FileReader

## Main Class

In [28]:
class SbnReader(FileReader):
    '''SBN file - Locosys SiRF Binary'''

    def __init__(self, filename):
        '''Basic init just records the filename'''

        super().__init__(filename)

        self.header = {}
        self.data = {}
        
        self.numRecords = 0


    def load(self):
        '''Load file into memory'''

        with open(self.filename, "rb") as f:
            self.buffer = f.read()
            self.bufferPtr = 0

            self.readHeader()
            self.readData()
            self.consumeData()


    def readUnsigned16(self):
        '''Read unsigned 16-bit integer from the buffer - big endian'''
        
        value = self.buffer[self.bufferPtr] << 8 | self.buffer[self.bufferPtr + 1]
        self.bufferPtr += 2
        
        return value
        

    def readBytes(self, numBytes):
        '''Read fixed number of bytes from the buffer'''
        
        value = self.buffer[self.bufferPtr : self.bufferPtr + numBytes]
        self.bufferPtr += numBytes
        
        return value
        

    def readMessage(self):
        '''Read message from buffer'''

        # Check the start token is a0a2
        startToken = self.readUnsigned16()
        if startToken != 0xa0a2:
            raise ValueError('Unexpected start token {:x} - expected {:x}'.format(startToken, 0xa0a2))

        # Internal length is sometimes big endian, sometimes little endian!
        length = self.readUnsigned16()
        if length > 255:
            length = length >> 8
            
        # Main message content
        payload = self.readBytes(length)

        # Evaluate checksum
        checksum = self.readUnsigned16()
        if checksum != sum(payload):
            raise ValueError('Checksum difference - {}'.format(checksum - sum(payload)))

        # Check the end token is b0b3
        endToken = self.readUnsigned16()
        if endToken != 0xb0b3:
            raise ValueError('Unexpected end token {:x} - expected {:x}'.format(endToken, 0xb0b3))

        return payload


    def readHeader(self):
        '''Load header into memory'''

        # Main header content
        message = self.readMessage()

        # Parse ASCII data, ignoring payload[0] which is the mid-file ID
        splitFields = message[1:].decode('ascii').split(',')
        self.header['username'] = splitFields[0]
        self.header['serial'] = int(splitFields[1])
        self.header['frequency'] = 5 if splitFields[2] == '2' else 1
        self.header['firmware'] = splitFields[3]
        

    def readData(self):
        '''Read data into memory'''

        fileSize = len(self.buffer)
     
        leanBuffer = bytearray()

        while self.bufferPtr < fileSize:
            message = self.readMessage()
                       
            # Note: Typical message IDs are 41 (Geodetic Navigation Data) and 13 (Visible List)
            messageId = message[0]

            # Geodetic Navigation Data – Message ID 41
            if messageId == 41:
                leanBuffer.extend(message)
                self.numRecords += 1

        if self.bufferPtr != fileSize:
            raise ValueError('File appears to be truncated')

        messageLen = len(message)
        if len(leanBuffer) / self.numRecords != messageLen:
            raise ValueError('File appears to have varying message lengths')

        # Standard SiRF SBN format
        dtype = [
            ('message_id', 'u1'),
            ('nav_valid', '>u2'),
            ('nav_type', '>u2'),
            ('extended_week_no', '>u2'),
            ('time_of_week', '>u4'),
            ('utc_year', '>u2'),
            ('utc_month', 'u1'),
            ('utc_day', 'u1'),
            ('utc_hour', 'u1'),
            ('utc_minute', 'u1'),
            ('utc_second', '>u2'),
            ('sv_ids', '>u4'),
            ('latitude', '>i4'),
            ('longitude', '>i4'),
            ('altitude_ellipsoid', '>i4'),
            ('altitude_msl', '>i4'),
            ('map_datum', 'u1'),
            ('sog', '>u2'),
            ('cog', '>u2'),
            ('magnetic_variation', '>i2'),
            ('climb_rate', '>i2'),
            ('heading_rate', '>i2'),
            ('ehpe', '>u4'),
            ('evpe', '>u4'),
            ('ete', '>u4'),
            ('ehve', '>u2'),
            ('clock_bias', '>i4'),
            ('clock_bias_error', '>u4'),
            ('clock_drift', '>i4'),
            ('clock_drift_error', '>u4'),
            ('distance', '>u4'),
            ('distance_error', '>u2'),
            ('heading_error', '>u2'),
            ('sv_count', 'u1'),
            ('hdop', 'u1'),
            ('additional_mode_info', 'u1')
        ]

        # Locosys extensions (GT-11)
        if messageLen >= 95:
            dtype.extend([
                ('unfiltered_sog', '>u2'),
                ('unfiltered_cog', '>u2')
            ])

        # Locosys extensions (GT-31 onwards)
        if messageLen >= 97:
            dtype.extend([
                ('sdop', 'u1'),
                ('vsdop', 'u1')
            ])

        # Future-proofing - e.g. messages over 97 bytes in length
        for i in range(messageLen - 97):
            dtype.extend([
                ('unknown_{:02}'.format(i), 'u1')
            ])
        
        self.rawData = np.frombuffer(leanBuffer, dtype=np.dtype(dtype), count=self.numRecords)


    def consumeData(self):
        '''Consume data - convert to standard data structure'''

        # Convert HDOP from integer to decimal
        self.data['hdop'] = self.rawData['hdop'] / 5

        # Satellite count is already an integer
        self.data['sat'] = self.rawData['sv_count']

        # Convert date/time to regular timestamp (seconds)
        vfunc = np.vectorize(getDateTime)
        timestampSecs = vfunc(  self.rawData['utc_year'], self.rawData['utc_month'], self.rawData['utc_day'],
                                self.rawData['utc_hour'], self.rawData['utc_minute'], self.rawData['utc_second'] // 1000)

        # Convert timestamps to milliseconds
        self.data['timestamp'] = timestampSecs + (self.rawData['utc_second'] % 1000) / 1000

        # Copy satellite IDs
        self.data['sv_ids'] = self.rawData['sv_ids']

        # Convert latitude and longitude to decimal
        self.data['lat'] = self.rawData['latitude'] / 10000000
        self.data['lon'] = self.rawData['longitude'] / 10000000

        # Convert elevation and SOG from cm/s to m/s
        self.data['ele'] = self.rawData['altitude_msl'] / 100
        self.data['sog'] = self.rawData['sog'] / 100

        # Convert COG from integer to decimal
        self.data['cog'] = self.rawData['cog'] / 100

        # Convert rate of climb from cm/s to m/s
        self.data['roc'] = self.rawData['climb_rate'] / 100

        # Convert SDOP from cm/s to m/s - original SBN format was without SDOP
        if 'sdop' in self.rawData.dtype.names:
            self.data['sdop'] = self.rawData['sdop'] / 100

        # Convert VSDOP from cm/s to m/s - original SBN format was without VSDOP
        if 'vsdop' in self.rawData.dtype.names:
            self.data['vsdop'] = self.rawData['vsdop'] / 100


def getDateTime(year, month, day, hour, minute, second):
    '''Decode date/time'''

    return datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc).timestamp()

## Unit Tests

In [29]:
class TestHeader(unittest.TestCase):
    '''Class to test header was correctly loaded'''

    def testUsername(self):
        '''Test the username is as expected'''

        self.assertEqual(sbnReader.header['username'], 'GEORG30MICHA')


    def testSerial(self):
        '''Test the serial is as expected'''

        self.assertEqual(sbnReader.header['serial'], 932000175)


    def testFrequency(self):
        '''Test the frequency is as expected'''

        self.assertEqual(sbnReader.header['frequency'], 1)


    def testFirmware(self):
        '''Test the firmware is as expected'''

        self.assertEqual(sbnReader.header['firmware'], 'V1.4(B0803T)')

In [30]:
class TestData(unittest.TestCase):
    '''Class to test data was correctly loaded'''

    def testNumRecords(self):
        '''Test the number of records is as expected'''

        self.assertEqual(sbnReader.numRecords, 4161)

        for fieldName in sbnReader.data:
            self.assertEqual(sbnReader.data[fieldName].size, 4161)


    def testHdop(self):
        '''Test the horizontal dilution of precision is as expected'''

        self.assertEqual(sbnReader.data['hdop'].min(), 0.8)
        self.assertEqual(sbnReader.data['hdop'].max(), 2.0)


    def testSat(self):
        '''Test the satellite count is as expected'''

        self.assertEqual(sbnReader.data['sat'].min(), 4)
        self.assertEqual(sbnReader.data['sat'].max(), 10)


    def testTimestamp(self):
        '''Test the timestamp is as expected'''

        self.assertEqual(sbnReader.data['timestamp'].min(), 1649672182)
        self.assertEqual(sbnReader.data['timestamp'].max(), 1649678792)


    def testSatelliteIds(self):
        '''Test the satellite IDs are as expected'''

        self.assertEqual(sbnReader.data['sv_ids'].min(), 302123008)
        self.assertEqual(sbnReader.data['sv_ids'].max(), 3542222850)


    def testLatitude(self):
        '''Test the latitude is as expected'''

        self.assertEqual(sbnReader.data['lat'].min(), 50.571016)
        self.assertEqual(sbnReader.data['lat'].max(), 50.5833319)


    def testLongitude(self):
        '''Test the longitude is as expected'''

        self.assertEqual(sbnReader.data['lon'].min(), -2.4620455)
        self.assertEqual(sbnReader.data['lon'].max(), -2.4563038)


    def testElevation(self):
        '''Test the elevation is as expected'''

        self.assertEqual(sbnReader.data['ele'].min(), -3.02)
        self.assertEqual(sbnReader.data['ele'].max(), 11.93)


    def testSpeed(self):
        '''Test the speed over ground is as expected'''

        self.assertEqual(sbnReader.data['sog'].min(), 0.13)
        self.assertEqual(sbnReader.data['sog'].max(), 16.83)


    def testCog(self):
        '''Test the course over ground is as expected'''

        self.assertEqual(sbnReader.data['cog'].min(), 0.01)
        self.assertEqual(sbnReader.data['cog'].max(), 359.92)


    def testRoc(self):
        '''Test the rate of climb is as expected'''

        self.assertEqual(sbnReader.data['roc'].min(), -0.56)
        self.assertEqual(sbnReader.data['roc'].max(), 0.35)


    def testSdop(self):
        '''Test the speed dilution of precision is as expected'''

        self.assertEqual(sbnReader.data['sdop'].min(), 0.09)
        self.assertEqual(sbnReader.data['sdop'].max(), 2.1)


    def testVsdop(self):
        '''Test the vertical speed dilution of precision is as expected'''

        self.assertEqual(sbnReader.data['vsdop'].min(), 0.1)
        self.assertEqual(sbnReader.data['vsdop'].max(), 0.39)

In [31]:
if __name__ == '__main__':
    # Determine whether session is interactive or batch to facilitate unittest.main(..., exit=testExit)
    import __main__ as main
    testExit = hasattr(main, '__file__')

    projdir = os.path.realpath(os.path.join(sys.path[0], "..", ".."))

    filename = os.path.join(projdir, 'sessions', '20220411', 'GT31_1Hz_GEORG30MICHA_932000175_20220411_111600.SBN')
    sbnReader = SbnReader(filename)
    sbnReader.load()
    
    unittest.main(argv=['first-arg-is-ignored'], exit=testExit)

.................
----------------------------------------------------------------------
Ran 17 tests in 0.009s

OK
