In [1]:
import struct
import uuid
import binascii
import enum
import datetime as dt
import numpy as np

In [2]:
# An empty hex string decodes to b''. Nothing in the cells shown here reads
# `packet`; presumably it is a scratch slot for pasting a captured packet's
# hex dump -- TODO confirm.
packet = bytes.fromhex("")

In [3]:
class binaryPacket:
    """Generic framed binary packet.

    Wire layout produced by ``to_bytes`` (header integers are little-endian,
    the trailing checksum is big-endian):

        0x00  sync bytes 0xE4, 0xEB
        0x02  source UUID (16 bytes)
        0x12  destination UUID (16 bytes)
        0x22  packet class (uint8)
        0x23  packet ID (uint8)
        0x24  payload length (uint16)
        0x26  payload
        end   binascii.crc_hqx(everything above, 0xFFFF), 2 bytes

    Because the checksum is appended big-endian, running ``crc_hqx`` with the
    same seed over a complete, intact frame yields 0; ``parseHeader`` relies
    on that.
    """

    # Size of the fixed header and of the trailing checksum, in bytes.
    _HEADER_LEN = 0x26
    _CKSUM_LEN = 2

    def __init__(self, payload:bytes, packetClass:int, packetID:int, sourceUUID:uuid.UUID, destUUID:uuid.UUID):
        """Store the payload and header fields; nothing is serialized yet."""
        self._payload = payload
        self._class = packetClass
        self._id = packetID
        self._source = sourceUUID
        self._dest = destUUID

    def to_bytes(self):
        """Serialize the packet (header + payload + checksum) to ``bytes``.

        Raises:
            struct.error: if class/ID do not fit in one byte each or the
                payload length does not fit in an unsigned 16-bit field.
        """
        payloadLen = len(self._payload)
        header = struct.pack("<BB", 0xE4, 0xEB) + \
                             self._source.bytes + \
                             self._dest.bytes + \
                             struct.pack("<BBH", self._class, self._id, payloadLen)
        msg = header + self._payload
        cksum = binascii.crc_hqx(msg, 0xFFFF).to_bytes(2, "big")
        return msg + cksum

    def getClassIDCode(self):
        """Return the 16-bit code ``(class << 8) | id``."""
        return (self._class << 8) | self._id

    def __str__(self):
        """Return the serialized packet as upper-case hex in 4-digit groups."""
        string = self.to_bytes().hex().upper()
        length = 4
        return '0x%s' % ' '.join(string[i:i + length] for i in range(0, len(string), length))

    def __repr__(self):
        return str(self)

    def __eq__(self, packet):
        """Two packets are equal when their serialized bytes are equal."""
        # Returning NotImplemented (rather than calling .to_bytes() on an
        # arbitrary object) lets `packet == 5` evaluate to False instead of
        # raising.
        if not isinstance(packet, binaryPacket):
            return NotImplemented
        return self.to_bytes() == packet.to_bytes()

    @classmethod
    def from_bytes(cls, packet:bytes):
        """Parse a complete frame into a generic ``binaryPacket``.

        Raises:
            RuntimeError: propagated from ``parseHeader`` for malformed input.
        """
        srcUUID, destUUID, pcls, pid, payload = cls.parseHeader(packet)
        # Always builds the base type: subclasses have different constructor
        # signatures and provide their own from_bytes.
        return binaryPacket(payload, pcls, pid, uuid.UUID(bytes=srcUUID), uuid.UUID(bytes=destUUID))

    @classmethod
    def parseHeader(cls, packet:bytes):
        """Validate a complete frame and split it into its header fields.

        Args:
            packet: bytes-like object holding exactly one frame, including the
                two checksum bytes.

        Returns:
            Tuple ``(srcUUID, destUUID, packetClass, packetID, payload)`` where
            the UUIDs are 16-byte ``bytes`` and ``payload`` is ``bytes``.

        Raises:
            RuntimeError: if the frame is shorter than header + checksum, the
                checksum does not verify, the sync bytes are wrong, or the
                header's payload length disagrees with the payload received.
        """
        # Check the size first so a truncated frame is reported as such rather
        # than as a checksum failure.
        if len(packet) < cls._HEADER_LEN + cls._CKSUM_LEN:
            raise RuntimeError("Packet too short!")

        if binascii.crc_hqx(packet, 0xFFFF) != 0:
            raise RuntimeError("Checksum verification failed")

        s1, s2, pcls, pid, payloadLen = struct.unpack("<BB16x16xBBH", packet[0:cls._HEADER_LEN])
        if s1 != 0xE4 or s2 != 0xEB:
            raise RuntimeError("Not a packet!")

        srcUUID = bytes(packet[0x0002:0x0012])
        destUUID = bytes(packet[0x0012:0x0022])
        # Copy to immutable bytes so the result does not depend on whether the
        # caller passed bytes or a bytearray.
        payload = bytes(packet[cls._HEADER_LEN:-cls._CKSUM_LEN])
        if len(payload) != payloadLen:
            raise RuntimeError("Payload length mismatch")
        return srcUUID, destUUID, pcls, pid, payload

    @classmethod
    def matches(cls, packetClass:int, packetID:int):
        """The generic packet accepts every class/ID pair."""
        return True

In [4]:
class E4E_Data_IMU(binaryPacket):
    """IMU sample packet (class 0x04, ID 0x00).

    Payload layout (little-endian): version byte, one byte written as 0x00
    and ignored on parse, uint64 timestamp in milliseconds, then nine float32
    values in the order accelerometer X/Y/Z, gyroscope X/Y/Z, magnetometer
    X/Y/Z.
    """
    __class = 0x04
    __id = 0x00
    __VERSION = 0x01
    # struct format shared by the constructor and from_bytes.
    __PAYLOAD_FMT = "<BBQ3f3f3f"

    def __init__(self, src:uuid.UUID, dest:uuid.UUID, accX, accY, accZ, gyroX, gyroY, gyroZ, magX, magY, magZ, timestamp:dt.datetime = None):
        """Build the packet and its serialized payload.

        Args:
            src, dest: source and destination UUIDs for the header.
            accX..magZ: the nine sensor values; stored as float32 on the wire.
            timestamp: sample time; defaults to ``dt.datetime.now()``.
        """
        self.acc = [accX, accY, accZ]
        self.gyro = [gyroX, gyroY, gyroZ]
        self.mag = [magX, magY, magZ]
        if timestamp is None:
            timestamp = dt.datetime.now()
        self.timestamp = timestamp

        payload = struct.pack(self.__PAYLOAD_FMT,
                              self.__VERSION,
                              0x00,
                              int(timestamp.timestamp() * 1e3),
                              *self.acc,
                              *self.gyro,
                              *self.mag)
        super(E4E_Data_IMU, self).__init__(payload, self.__class, self.__id, src, dest)

    @classmethod
    def matches(cls, packetClass:int, packetID:int):
        """Return True when the class/ID pair identifies an IMU packet."""
        # Must go through `cls`: there is no `self` in a classmethod.
        return packetClass == cls.__class and packetID == cls.__id

    @classmethod
    def from_bytes(cls, packet:bytes):
        """Parse a complete frame into an ``E4E_Data_IMU``.

        Raises:
            RuntimeError: if the frame fails header validation or carries an
                unknown payload version.
            struct.error: if the payload size does not match the IMU layout.
        """
        srcUUID, destUUID, pcls, pid, payload = cls.parseHeader(packet)
        (ver, _, timestamp_ms,
         accX, accY, accZ,
         gyroX, gyroY, gyroZ,
         magX, magY, magZ) = struct.unpack(cls.__PAYLOAD_FMT, payload)
        if ver != cls.__VERSION:
            # Raise (not return) so callers never receive an exception object
            # in place of a packet.
            raise RuntimeError("Unknown packet version!")

        timestamp = dt.datetime.fromtimestamp(timestamp_ms / 1e3)
        return E4E_Data_IMU(src=uuid.UUID(bytes=srcUUID),
                            dest=uuid.UUID(bytes=destUUID),
                            accX=accX,
                            accY=accY,
                            accZ=accZ,
                            gyroX=gyroX,
                            gyroY=gyroY,
                            gyroZ=gyroZ,
                            magX=magX,
                            magY=magY,
                            magZ=magZ,
                            timestamp=timestamp)

In [5]:
class E4E_Data_Audio_raw8(binaryPacket):
    """Raw 8-bit audio packet (class 0x04, ID 0x01).

    Payload layout (little-endian): version (uint8), channel count (uint8),
    samples per channel (uint16), timestamp in milliseconds (uint64), then
    every sample as one unsigned byte, all of channel 0 first, then channel 1,
    and so on.
    """
    __class = 0x04
    __id = 0x01
    __VERSION = 0x01

    def __init__(self, audioData:np.ndarray, src:uuid.UUID, dest:uuid.UUID, timestamp:dt.datetime=None):
        """Build the packet and its serialized payload.

        Args:
            audioData: 2-D array indexed ``[channel, sample]``; each element is
                converted with ``int()`` and must fit in an unsigned byte.
            src, dest: source and destination UUIDs for the header.
            timestamp: capture time; defaults to ``dt.datetime.now()``.

        Raises:
            struct.error: if a sample, the channel count or the sample count
                does not fit its wire field.
        """
        assert(len(audioData.shape) == 2)
        nChannels, nSamples = audioData.shape
        if timestamp is None:
            timestamp = dt.datetime.now()

        self.audioData = audioData
        self.timestamp = timestamp
        # Row-major flatten gives channel-by-channel order; packing everything
        # in one call avoids rebuilding the bytes object once per sample.
        samples = [int(sample) for sample in np.asarray(audioData).reshape(-1)]
        payload = struct.pack("<BBHQ%dB" % len(samples),
                              self.__VERSION,
                              nChannels,
                              nSamples,
                              int(timestamp.timestamp() * 1e3),
                              *samples)
        super(E4E_Data_Audio_raw8, self).__init__(payload, self.__class, self.__id, src, dest)

    @classmethod
    def matches(cls, packetClass:int, packetID:int):
        """Return True when the class/ID pair identifies a raw8 audio packet."""
        # Must go through `cls`: there is no `self` in a classmethod.
        return packetClass == cls.__class and packetID == cls.__id

    @classmethod
    def from_bytes(cls, packet:bytes):
        """Parse a complete frame into an ``E4E_Data_Audio_raw8``.

        The returned packet's ``audioData`` is a float64 array of shape
        ``(nChannels, nSamples)``.

        Raises:
            RuntimeError: if the frame fails header validation, carries an
                unknown payload version, or its sample data does not match the
                declared channel/sample counts.
        """
        srcUUID, destUUID, pcls, pid, payload = cls.parseHeader(packet)
        ver, nChannels, nSamples, timestamp_ms = struct.unpack("<BBHQ", payload[0:0x0C])
        if ver != cls.__VERSION:
            raise RuntimeError("Unknown packet version!")
        audioBytes = payload[0x0C:]
        # Explicit check instead of assert: this validates received data and
        # must not disappear when Python runs with -O.
        if len(audioBytes) != nChannels * nSamples:
            raise RuntimeError("Audio payload length mismatch")
        audioData = np.array(list(audioBytes), dtype=np.float64).reshape((nChannels, nSamples))
        timestamp = dt.datetime.fromtimestamp(timestamp_ms / 1e3)
        return E4E_Data_Audio_raw8(audioData, uuid.UUID(bytes=srcUUID), uuid.UUID(bytes=destUUID), timestamp)

In [6]:
class E4E_Data_Audio_raw16(binaryPacket):
    """Raw 16-bit audio packet (class 0x04, ID 0x02).

    Payload layout (little-endian): version (uint8), channel count (uint8),
    samples per channel (uint16), timestamp in milliseconds (uint64), then
    every sample as an unsigned 16-bit integer, all of channel 0 first, then
    channel 1, and so on.
    """
    __class = 0x04
    __id = 0x02
    __VERSION = 0x01

    def __init__(self, audioData:np.ndarray, src:uuid.UUID, dest:uuid.UUID, timestamp:dt.datetime=None):
        """Build the packet and its serialized payload.

        Args:
            audioData: 2-D array indexed ``[channel, sample]``; each element is
                converted with ``int()`` and must fit in an unsigned 16-bit
                field.
            src, dest: source and destination UUIDs for the header.
            timestamp: capture time; defaults to ``dt.datetime.now()``.

        Raises:
            struct.error: if a sample, the channel count or the sample count
                does not fit its wire field.
        """
        assert(len(audioData.shape) == 2)
        nChannels, nSamples = audioData.shape
        if timestamp is None:
            timestamp = dt.datetime.now()

        self.audioData = audioData
        self.timestamp = timestamp
        # Row-major flatten gives channel-by-channel order; packing everything
        # in one call avoids rebuilding the bytes object once per sample.
        samples = [int(sample) for sample in np.asarray(audioData).reshape(-1)]
        payload = struct.pack("<BBHQ%dH" % len(samples),
                              self.__VERSION,
                              nChannels,
                              nSamples,
                              int(timestamp.timestamp() * 1e3),
                              *samples)
        super(E4E_Data_Audio_raw16, self).__init__(payload, self.__class, self.__id, src, dest)

    @classmethod
    def matches(cls, packetClass:int, packetID:int):
        """Return True when the class/ID pair identifies a raw16 audio packet."""
        # Must go through `cls`: there is no `self` in a classmethod.
        return packetClass == cls.__class and packetID == cls.__id

    @classmethod
    def from_bytes(cls, packet:bytes):
        """Parse a complete frame into an ``E4E_Data_Audio_raw16``.

        The returned packet's ``audioData`` is a float64 array of shape
        ``(nChannels, nSamples)``.

        Raises:
            RuntimeError: if the frame fails header validation, carries an
                unknown payload version, or its sample data does not match the
                declared channel/sample counts.
        """
        srcUUID, destUUID, pcls, pid, payload = cls.parseHeader(packet)
        ver, nChannels, nSamples, timestamp_ms = struct.unpack("<BBHQ", payload[0:0x0C])
        if ver != cls.__VERSION:
            raise RuntimeError("Unknown packet version!")
        audioBytes = payload[0x0C:]
        sampleCount = nChannels * nSamples
        # Explicit check instead of assert: this validates received data and
        # must not disappear when Python runs with -O.
        if len(audioBytes) != sampleCount * 2:
            raise RuntimeError("Audio payload length mismatch")
        samples = struct.unpack("<%dH" % sampleCount, audioBytes)
        audioData = np.array(samples, dtype=np.float64).reshape((nChannels, nSamples))
        timestamp = dt.datetime.fromtimestamp(timestamp_ms / 1e3)
        return E4E_Data_Audio_raw16(audioData, uuid.UUID(bytes=srcUUID), uuid.UUID(bytes=destUUID), timestamp)

In [7]:
class binaryPacketParser:
    """Byte-at-a-time parser that extracts framed packets from a stream.

    Bytes are fed through ``parseByte``/``parseBytes``. The parser hunts for
    the 0xE4 0xEB sync pair, collects the fixed-size header, reads the payload
    length from the header's last two bytes, collects payload and checksum,
    and then hands the complete frame to the packet type registered in
    ``packetMap`` (falling back to ``binaryPacket``).
    """

    class State(enum.Enum):
        FIND_SYNC1 = 0   # waiting for 0xE4
        FIND_SYNC2 = 1   # got 0xE4, waiting for 0xEB
        HEADER = 2       # collecting the rest of the fixed header
        PAYLOAD = 3      # collecting payload bytes
        CKSUM = 4        # waiting for the first checksum byte
        VALIDATE = 5     # waiting for the second checksum byte

    # Maps the big-endian (class, id) byte pair to the packet type to build.
    packetMap = {
        0x0400: E4E_Data_IMU,
        0x0401: E4E_Data_Audio_raw8,
        0x0402: E4E_Data_Audio_raw16
    }

    HEADER_LEN = 0x0026

    def __init__(self):
        self.__state = self.State.FIND_SYNC1
        self.__payloadLen = 0
        self.__buffer = None

    def parseByte(self, data:int):
        """Consume one byte of the stream.

        Args:
            data: the next byte as an int in 0..255.

        Returns:
            A packet object when ``data`` completes a valid frame, otherwise
            ``None``.

        Raises:
            RuntimeError: if a completed frame fails its checksum, or if the
                packet type's ``from_bytes`` rejects it. The parser is already
                back in the sync-search state when this is raised, so the
                caller may keep feeding bytes.
        """
        if self.__state is self.State.FIND_SYNC1:
            if data == 0xE4:
                self.__state = self.State.FIND_SYNC2
                self.__buffer = bytearray()
                self.__buffer.append(data)
            return None
        elif self.__state is self.State.FIND_SYNC2:
            if data == 0xEB:
                self.__state = self.State.HEADER
                self.__buffer.append(data)
            elif data != 0xE4:
                self.__state = self.State.FIND_SYNC1
            # A repeated 0xE4 may itself be the first sync byte; the buffer
            # already holds exactly one 0xE4, so just keep waiting for 0xEB.
            return None
        elif self.__state is self.State.HEADER:
            self.__buffer.append(data)
            if len(self.__buffer) == self.HEADER_LEN:
                self.__payloadLen, = struct.unpack('<H', self.__buffer[self.HEADER_LEN - 2:self.HEADER_LEN])
                # With an empty payload the next byte is already checksum;
                # entering PAYLOAD would overshoot the length test forever.
                if self.__payloadLen == 0:
                    self.__state = self.State.CKSUM
                else:
                    self.__state = self.State.PAYLOAD
            return None
        elif self.__state is self.State.PAYLOAD:
            self.__buffer.append(data)
            if len(self.__buffer) == self.__payloadLen + self.HEADER_LEN:
                self.__state = self.State.CKSUM
            return None
        elif self.__state is self.State.CKSUM:
            self.__buffer.append(data)
            self.__state = self.State.VALIDATE
            return None
        elif self.__state is self.State.VALIDATE:
            self.__buffer.append(data)
            frame = bytes(self.__buffer)
            # Reset before validating so that any error below leaves the
            # parser ready to resynchronize instead of stuck in VALIDATE.
            self.__state = self.State.FIND_SYNC1
            self.__buffer = None
            if binascii.crc_hqx(frame, 0xFFFF) != 0:
                raise RuntimeError("Checksum verification failed")
            classIDCode, = struct.unpack('>H', frame[0x0022:0x0024])
            packetType = self.packetMap.get(classIDCode, binaryPacket)
            return packetType.from_bytes(frame)

    def parseBytes(self, data:bytes):
        """Feed every byte of ``data`` to ``parseByte``.

        Returns:
            List of the packets completed while consuming ``data``, in order.

        Raises:
            RuntimeError: propagated from ``parseByte``; packets completed
                earlier in the same call are not returned in that case.
        """
        packets = []
        for byte in data:
            retval = self.parseByte(byte)
            if retval is not None:
                packets.append(retval)
        return packets

In [8]:
# IMU round trip: build a packet with known values, print its hex dump and
# total length, then push the serialized bytes through the stream parser and
# print what comes back along with its type.
imuPacket = E4E_Data_IMU(
    src=uuid.uuid4(),
    dest=uuid.uuid4(),
    accX=1.0, accY=2.0, accZ=3.0,
    gyroX=4.0, gyroY=5.0, gyroZ=6.0,
    magX=7.0, magY=8.0, magZ=9.0,
    timestamp=dt.datetime(2020, 12, 24, 0, 13, 13),
)
print(imuPacket)
print(hex(len(imuPacket.to_bytes())))

# `parser` is reused by the audio cells further down.
parser = binaryPacketParser()
packets = parser.parseBytes(imuPacket.to_bytes())
print(packets)
print(type(packets[0]))

0xE4EB 8C24 0255 B2B6 4784 80A1 1FE0 DD13 EB6C 9B3E EFC0 C80D 42CA 9F56 F5DA 1382 8CD3 0400 2E00 0100 A81D CF93 7601 0000 0000 803F 0000 0040 0000 4040 0000 8040 0000 A040 0000 C040 0000 E040 0000 0041 0000 1041 3172
0x56
[0xE4EB 8C24 0255 B2B6 4784 80A1 1FE0 DD13 EB6C 9B3E EFC0 C80D 42CA 9F56 F5DA 1382 8CD3 0400 2E00 0100 A81D CF93 7601 0000 0000 803F 0000 0040 0000 4040 0000 8040 0000 A040 0000 C040 0000 E040 0000 0041 0000 1041 3172]
<class '__main__.E4E_Data_IMU'>


In [9]:
# 8-bit audio round trip: two channels of 256 random samples drawn from
# [0, 255), serialized and fed back through the parser created earlier.
rng = np.random.default_rng()
audioData = rng.integers(0, 255, size=(2, 256))
audioPacket = E4E_Data_Audio_raw8(
    audioData=audioData,
    src=uuid.uuid4(),
    dest=uuid.uuid4(),
    timestamp=dt.datetime(2020, 12, 24, 20, 12, 8),
)
print(audioPacket)
print(hex(len(audioPacket.to_bytes())))

packets = parser.parseBytes(audioPacket.to_bytes())
print(packets)
print(type(packets[0]))

0xE4EB AC08 F72E 31E1 43CA A2BC 362B 7347 7E5E A898 BCAF 7CC8 45E2 A8C9 324F F6F0 0535 0401 0C02 0102 0001 C0C1 1898 7601 0000 C420 0BD9 00A8 A7AC A26E FCE4 2D2E 5A1F 50E7 1745 9141 66FD 508F 5192 3105 F82B 2A29 B0CA D529 54B7 5367 4066 C055 A35A 6A29 67DD 41E2 8727 CE3A E6C5 4EAB 6ED0 1BD4 AF41 ED50 22DC CEE0 E56E 33F8 5895 E7A7 3F6A 5618 4573 522C 5744 3EB1 3BEF CF60 558F 8B95 8288 C72B BF94 E65D 1701 1863 0A7F C5B1 052F A51D 5F5F A4C0 BE9F A4EF A74A 231B 32C9 1CC5 E251 AA3E 9EB6 2C0D EA25 E089 1E7D 2D39 4731 F479 803D 05CD 41BF 2860 74D6 48A0 8253 0048 886F 9CE4 D011 5812 D0A8 AD8C 9CD2 15AF F88F 7A0F 62E1 4615 4462 2E65 48E8 7089 F60A FD8E BF8B D435 AAE3 42D4 BA5D 8863 EFBE 04E6 85E7 9019 A57C 5F53 3AC6 6B2C 8504 3088 C9C8 CFD7 A626 B105 44BE 2756 E3A5 C4C5 8589 B413 7471 E7F2 6362 41DC 118A CB61 ECA7 09AD 7610 58EB EA49 A9B2 2315 C70B D01A AAEE 3158 A238 ADBD 5C7F 7750 4B0F 7843 107F D069 08E5 BAA0 1EE2 EE29 48CA F458 2CC7 7E8B 3F6B 6433 16D6 A1D9 F680 0A37 B3BD AB3A F76B 08BA FCA

In [10]:
# 16-bit audio round trip: two channels of 256 random samples drawn from
# [0, 32767), serialized and fed back through the parser created earlier.
rng = np.random.default_rng()
audioData = rng.integers(0, 32767, size=(2, 256))
audioPacket = E4E_Data_Audio_raw16(
    audioData=audioData,
    src=uuid.uuid4(),
    dest=uuid.uuid4(),
    timestamp=dt.datetime(2020, 12, 24, 20, 12, 8),
)
print(audioPacket)
print(hex(len(audioPacket.to_bytes())))

packets = parser.parseBytes(audioPacket.to_bytes())
print(packets)
print(type(packets[0]))

0xE4EB E699 9299 31BB 4357 AAD9 F3CF 691D 0A68 C6EE 24D9 E346 4B36 9F36 F41B 83E2 209E 0402 0C04 0102 0001 C0C1 1898 7601 0000 795F 6139 4610 832E 307B 0C56 CC6A BA7D F70F 205B E04A E85A F161 C044 5F6C 4A27 F94A 106D 726F 451F 0A7B 534D 7929 CC06 BC52 0170 0D66 E040 AC6B 5C2D 8824 AC34 1B00 DD23 3071 AF58 0D49 8E1C 587F 970E 5E7C C30D 292F 1648 3109 7E4C 0B2B 0548 1311 4237 621C F169 D16F C126 DA53 3977 AD35 E815 681A 2728 C729 446A 1A4D 1717 CF27 B137 9117 5A7A 5970 DB38 965A FD14 E03B 566B 6A0D 9D2C D243 2476 C502 0E79 3F2E 6E1D 9B07 5A2B 9301 010F 5D25 1D5A D718 9A47 932C A769 9F7D 9572 FF5B 076A 1309 9909 357F DE05 0760 FA47 8560 C37C 0257 AA6E 062B 1F63 F744 B41B 0E1D EC3C C000 3935 A25E 8924 E73F EC33 E074 C22A 4774 1235 0D67 4D41 8208 380B 195B 5A0E 9B28 9C6D 1A74 C173 8E67 9E11 5E21 0159 B261 5A24 791E B324 BB46 2568 C33B 6416 DC58 E067 DA46 5A0C 8267 C15F 0762 E77D 7F0D FC1E 9D31 1274 E077 F677 4146 EA75 623C 2802 7470 6E62 B079 E761 322D DA27 FB25 5B58 2042 050D 0D12 6A32 107