In [4]:
class Packet:
    def __init__(self, magic_word, header, tlvs):
        self.magic_word = magic_word
        self.header = header
        self.tlvs = tlvs

class Header:
    def __init__(self, version, total_packet_len, platform, frame_number, time_cpu_cycles, num_detected_obj, num_tlvs, sub_frame_number):
        self.version = version
        self.total_packet_len = total_packet_len
        self.platform = platform
        self.frame_number = frame_number
        self.time_cpu_cycles = time_cpu_cycles
        self.num_detected_obj = num_detected_obj
        self.num_tlvs = num_tlvs
        self.sub_frame_number = sub_frame_number


class TLV:
    def __init__(self, tlv_type, length):
        self.tlv_type = tlv_type
        self.length = length

class DetectedObjectsTLV(TLV):
    def __init__(self, tlv_type, length, object_coordinates):
        super().__init__(tlv_type, length)
        self.object_coordinates = object_coordinates

class StatsTLV(TLV):
    def __init__(self, tlv_type, length, inter_process, transmit_out, frame_margin, chirp_margin, active_cpu_load, inter_cpu_load):
        super().__init__(tlv_type, length)
        self.inter_process = inter_process
        self.transmit_out = transmit_out
        self.frame_margin = frame_margin
        self.chirp_margin = chirp_margin
        self.active_cpu_load = active_cpu_load
        self.inter_cpu_load = inter_cpu_load

class StatsTLV(TLV):
    def __init__(self, tlv_type, length, inter_frame_processing_time, transmit_output_time, inter_frame_processing_margin, inter_chirp_processing_margin, active_frame_cpu_load, inter_frame_cpu_load):
        super().__init__(tlv_type, length)
        self.inter_frame_processing_time = inter_frame_processing_time
        self.transmit_output_time = transmit_output_time
        self.inter_frame_processing_margin = inter_frame_processing_margin
        self.inter_chirp_processing_margin = inter_chirp_processing_margin
        self.active_frame_cpu_load = active_frame_cpu_load
        self.inter_frame_cpu_load = inter_frame_cpu_load

class RangeProfileTLV(TLV):
    # Add attributes as needed
    pass


In [2]:
import struct
import sys
from decimal import Decimal
import os
def parse_packet(data):
    # Parse magic word
    magic_word = data[:8]
    data = data[8:]

    # Parse header
    header_data = struct.unpack('4H4I', data[:32])
    header = Header(*header_data[4:])
    data = data[32:]

    # For the optional subframe number
    if header.version > 0x01000005:
        sub_frame_num = struct.unpack('I', data[:4])[0]
        header.sub_frame_number = sub_frame_num
        data = data[4:]

    tlvs = []
    for _ in range(header.num_tlvs):
        (tlv_type, tlv_length) = struct.unpack('2I', data[:8])
        data = data[8:]

        if tlv_type == 1:  # MMWDEMO_OUTPUT_MSG_DETECTED_POINTS
            object_coordinates = parse_detected_objects(data, tlv_length)
            tlvs.append(DetectedObjectsTLV(tlv_type, tlv_length, object_coordinates))
        elif tlv_type == 6:  # MMWDEMO_OUTPUT_MSG_STATS
            stats_data = struct.unpack('6I', data[:24])
            tlvs.append(StatsTLV(tlv_type, tlv_length, *stats_data))
        data = data[tlv_length:]

    return Packet(magic_word, header, tlvs)



In [3]:
import struct

def process_file(filename):
    # Read the binary data from the file
    with open(filename, "rb") as rawDataFile:
        rawData = rawDataFile.read()

    # Find the offset for the magic word
    magic = b'\x02\x01\x04\x03\x06\x05\x08\x07'
    offset = rawData.find(magic)
    if offset == -1:
        print("Magic sequence not found in the file!")
        return

    # Shift rawData to the start of the magic word
    rawData = rawData[offset:]

    # Parse the packets
    packets = []
    while rawData:
        # Parse the current packet
        packet = parse_packet(rawData)

        # Append the packet to the list
        packets.append(packet)

        # Shift rawData to the start of the next packet
        rawData = rawData[packet.header.length:]

    return packets

if __name__ == '__main__':
    filename = "D:/REPOS/mmw/slope-diff-data/1/xwr16xx_processed_stream_2023_09_14T16_41_23_134.dat"  # Adjust this to your filename
    packets = process_file(filename)

    # Example: Print details from parsed packets
    for packet in packets:
        print(f"Frame number: {packet.header.frame_num}")
        for tlv in packet.tlvs:
            if isinstance(tlv, DetectedObjectsTLV):
                print("Detected Objects:", tlv.object_coordinates)
            elif isinstance(tlv, StatsTLV):
                print(f"CPU Load: {tlv.active_cpu_load}")
                # ... and so on for other TLV types ...

error: unpack requires a buffer of 8 bytes