In [1]:
import os

import numpy as np
import scipy as sp
import matplotlib.pyplot as plt

from osgeo import gdal
from scipy.io import loadmat
from scipy.fft import ifft, fft, fftshift, ifftshift, fftfreq
from scipy.fft import ifft2, fft2
from scipy.constants import pi, c

In [2]:
from s1_level0_structs import (
    PRIMARY_HEADER,
    PRIMARY_HEADER_FIELDS,
    SECONDARY_HEADER,
    SECONDARY_HEADER_FIELDS,
    SUB_COMMUTATIVE_DATA_SERVICE,
    SUB_COMM_KEY_POS,
    SUB_COMM_KEY_VAL,
)

In [3]:
def create_bit_string(bytes_string: str):
    bit_string = ''
    for byte in bytes_string:
        bit_string += '{:08b}'.format(byte)
    return bit_string


def read_and_pop(bit_string: str, bit_length: int):
    return bit_string[0:bit_length], bit_string[bit_length:]


def bit_len_map(brc: int):
    match brc:
        case 0:
            k = 4
        case 1:
            k = 5
        case 2:
            k = 7
        case 3:
            k = 10
        case 4:
            k = 16
        case other:
            raise ValueError(f'Invalid BRC Value Encountered in `bit_len_map`: {brc}')
    return k


# Avert your eyes.
def huffman_4(bit_string):
    k = bit_len_map(4)
    m_code = 0
    bit, bit_string = read_and_pop(bit_string, 1)
    bit_len = 1
    if bit == '0':  # 0 <-
        bit, bit_string = read_and_pop(bit_string, 1)
        bit_len += 1
        if bit == '0':  # 0 <- 0 <-
            return 0, bit_string, bit_len
        else:
            bit, bit_string = read_and_pop(bit_string, 1)
            bit_len += 1
            return (1, bit_string, bit_len) if bit == '0' else (2, bit_string, bit_len)  # 0 or 1 <- 1 <- 0
    else:
        bit, bit_string = read_and_pop(bit_string, 1)
        bit_len += 1
        if bit == '0':
            bit, bit_string = read_and_pop(bit_string, 1)
            bit_len += 1
            return (3, bit_string, bit_len) if bit == '0' else (4, bit_string, bit_len)  # 0 or 1 <- 1 <- 0
        else:
            bit, bit_string = read_and_pop(bit_string, 1)
            bit_len += 1
            if bit == '0':
                bit, bit_string = read_and_pop(bit_string, 1)
                bit_len += 1
                return (5, bit_string, bit_len) if bit == '0' else (6, bit_string, bit_len)  # 0 or 1 <- 1 <- 0
            else:
                bit, bit_string = read_and_pop(bit_string, 1)
                bit_len += 1
                if bit == '0':
                    return 7, bit_string, bit_len
                else:
                    bit, bit_string = read_and_pop(bit_string, 1)
                    bit_len += 1
                    if bit == '0':
                        return 8, bit_string, bit_len
                    else:
                        bit, bit_string = read_and_pop(bit_string, 1)
                        bit_len += 1
                        if bit == '0':
                            return 9, bit_string, bit_len
                        else:
                            bit, bit_string = read_and_pop(bit_string, 1)
                            bit_len += 1
                            if bit == '0':
                                bit, bit_string = read_and_pop(bit_string, 1)
                                bit_len += 1
                                return (10, bit_string, bit_len) if bit == '0' else (11, bit_string, bit_len)
                            else:
                                bit, bit_string = read_and_pop(bit_string, 1)
                                bit_len += 1
                                if bit == '0':
                                    bit, bit_string = read_and_pop(bit_string, 1)
                                    bit_len += 1
                                    return (12, bit_string, bit_len) if bit == '0' else (13, bit_string, bit_len)
                                else:
                                    bit, bit_string = read_and_pop(bit_string, 1)
                                    bit_len += 1
                                    return (14, bit_string, bit_len) if bit == '0' else (15, bit_string, bit_len)
    


def huffman_3(bit_string):
    k = bit_len_map(3)
    m_code = 0
    bit, bit_string = read_and_pop(bit_string, 1)
    bit_len = 1
    if bit == '0':
        bit, bit_string = read_and_pop(bit_string, 1)
        bit_len += 1
        if bit == '0':
            return 0, bit_string, bit_len
        else:
            return 1, bit_string, bit_len
    else:
        m_code = 2
        for i in range(k - 3):
            bit, bit_string = read_and_pop(bit_string, 1)
            bit_len += 1
            if bit == '0':
                break
            else:
                m_code += 1
        return m_code, bit_string, bit_len


def huffman(bit_string: str, brc: int):
    if brc == 3:
        return huffman_3(bit_string)
    elif brc == 4:
        return huffman_4(bit_string)
    else:
        bit_len = 0
        k = bit_len_map(brc)
        m_code = 0
        for i in range(k - 1):
            bit, bit_string = read_and_pop(bit_string, 1)
            bit_len += 1
            if bit == '0':
                break
            else:
                m_code += 1
        return m_code, bit_string, bit_len

In [4]:
class Packet:
    def __init__(
        self,
        primary_header,
        secondary_header,
        user_data_field
    ):
        self.primary_header   = primary_header
        self.secondary_header = secondary_header
        self.raw_user_data    = user_data_field

        self.version_number        = int(primary_header['packet_version_number'], 2)
        self.type                  = int(primary_header['packet_type'], 2)
        self.secondary_header_flag = int(primary_header['secondary_header_flag'], 2)
        self.process_id            = int(primary_header['process_id'], 2)
        self.process_category      = int(primary_header['process_category'], 2)
        self.sequence_flags        = int(primary_header['sequence_flags'])
        self.sequence_count        = int(primary_header['packet_sequence_count'], 2)
        self.packet_data_length    = int(primary_header['packet_data_length'], 2)
        self.user_data_length      = (self.packet_data_length + 1) - 62

        self.num_quads   = int(self.secondary_header['num_quadratures'], 2)
        self.num_samples = 2 * self.num_quads

        self.test_mode      = int(self.secondary_header['test_mode'], 2)
        self.baq_mode       = int(self.secondary_header['baq_mode'], 2)
        self.block_length   = int(self.secondary_header['baq_block_length'], 2)
        self.num_baq_blocks = int(np.ceil(2 * self.num_quads / 256))

        self._set_data_format()
        self._decode_user_data_field()


    def _set_num_words_type_c(self):
        num_words_ie = int(np.ceil(self.block_length * self.num_quads / 16))
        num_words_qe = int(np.ceil((self.block_length * self.num_quads + 8 * self.num_baq_blocks) / 16))
        self.num_words = (num_words_ie, num_words_qe)


    def _get_signs_and_m_codes(
        self,
        bit_string: str,
        brc: int,
        last_block: bool
    ) -> ([int], [int], str):
        signs = []
        m_codes = []
        s_code_count = 0
        total_bits = 0
        num_s_codes = 128 if not last_block else self.num_quads - (128 * (self.num_baq_blocks - 1))
        while s_code_count < num_s_codes:
            sign, bit_string = read_and_pop(bit_string, 1)
            m_code, bit_string, bit_len = huffman(bit_string, brc)
            signs.append(sign)
            m_codes.append(m_code)
            s_code_count += 1
            total_bits += (bit_len + 1)
        return  signs, m_codes, bit_string, total_bits
    

    def _decode_ie(self, bit_string):
        ie_bits = 0
        total_bits = 0
        for i in range(self.num_baq_blocks):
            brc, bit_string = read_and_pop(bit_string, 3)
            brc = int(brc, 2)
            if brc <= 4:
                last_block = i == self.num_baq_blocks - 1
                signs, m_codes, bit_string, total_bits = self._get_signs_and_m_codes(bit_string, brc, last_block)
                self.ie_signs.append(signs)
                self.ie_m_codes.append(m_codes)
                self.brc.append(brc)
            else:
                raise ValueError(f'Invalid BRC: {brc} at {i}')
            ie_bits += (total_bits + 3)
        bit_string, ie_offset = self._jump_to_next_word_boundary(bit_string, ie_bits)
        return bit_string, ie_bits + ie_offset


    def _decode_io(self, bit_string):
        io_bits = 0
        total_bits = 0
        for i in range(self.num_baq_blocks):
            brc = self.brc[i]
            last_block = i == self.num_baq_blocks - 1
            signs, m_codes, bit_string, total_bits = self._get_signs_and_m_codes(bit_string, brc, last_block)
            self.io_signs.append(signs)
            self.io_m_codes.append(m_codes)
            io_bits += total_bits
        bit_string, io_offset = self._jump_to_next_word_boundary(bit_string, io_bits)
        return bit_string, io_bits + io_offset


    def _decode_qe(self, bit_string):
        qe_bits = 0
        total_bits = 0
        for i in range(self.num_baq_blocks):
            brc = self.brc[i]
            threshold, bit_string = read_and_pop(bit_string, 8)
            last_block = i == self.num_baq_blocks - 1
            signs, m_codes, bit_string, total_bits = self._get_signs_and_m_codes(bit_string, brc, last_block)
            self.qe_signs.append(signs)
            self.qe_m_codes.append(m_codes)
            self.thresholds.append([threshold])
            qe_bits += (total_bits + 8)
        bit_string, qe_offset = self._jump_to_next_word_boundary(bit_string, qe_bits)
        return bit_string, qe_bits + qe_offset


    def _decode_qo(self, bit_string):
        qo_bits = 0
        total_bits = 0
        for i in range(self.num_baq_blocks):
            brc = self.brc[i]
            last_block = i == self.num_baq_blocks - 1
            signs, m_codes, bit_string, total_bits = self._get_signs_and_m_codes(bit_string, brc, last_block)
            self.qo_signs.append(signs)
            self.qo_m_codes.append(m_codes)
            qo_bits += total_bits
        bit_string, qo_offset = self._jump_to_next_word_boundary(bit_string, qo_bits)
        return bit_string, qo_bits + qo_offset


    def _jump_to_next_word_boundary(self, bit_string, num_bits):
        offset = num_bits % 16
        if offset == 0:
            return bit_string, offset
        next_word_boundary = 16 - offset
        filler, bit_string = read_and_pop(bit_string, next_word_boundary)
        return bit_string, next_word_boundary
    

    def _decode_type_c_data(self):    
        self._set_num_words_type_c()

        s_code_width = 0
        bit_string   = create_bit_string(self.raw_user_data)
        block_width  = (128 * self.block_length - 1) * self.num_baq_blocks

        self.brc        = []
        self.ie_signs   = []
        self.ie_m_codes = []
        self.io_signs   = []
        self.io_m_codes = []
        self.qe_signs   = []
        self.qe_m_codes = []
        self.qo_signs   = []
        self.qo_m_codes = []
        self.thresholds = []

        bit_string, ie_bits = self._decode_ie(bit_string)
        # print(f"Bit String Length after IE: {len(bit_string)}")
        bit_string, io_bits = self._decode_io(bit_string)
        # print(f"Bit String Length after IO: {len(bit_string)}")
        bit_string, qe_bits = self._decode_qe(bit_string)
        # print(f"Bit String Length after QE: {len(bit_string)}")
        bit_string, qo_bits = self._decode_qo(bit_string)
        # print(f"Bit String Length after QO: {len(bit_string)}")
        # print(f"Final Bit String: {bit_string}")

        self.num_ie_bits = ie_bits
        self.num_io_bits = io_bits
        self.num_qe_bits = qe_bits
        self.num_qo_bits = qo_bits

        self.remaining_user_data_bits = len(bit_string)


    def _decode_user_data_field(self):
        if self.data_format == 'A':
            pass
        elif self.data_format == 'B':
            pass
        elif self.data_format == 'C':
            pass
        elif self.data_format == 'D':
            self._decode_type_c_data()


    def _set_data_format(self):
        self.test_mode = int(self.secondary_header['test_mode'], 2)
        self.baq_mode  = int(self.secondary_header['baq_mode'], 2)
        if self.baq_mode == 0:
            if self.test_mode % 3 == 0:
                self.data_format = 'A'
            else:
                self.data_format = 'B'
        elif self.baq_mode in [3, 4, 5]:
            self.data_format = 'C'
        elif self.baq_mode in [12, 13, 14]:
            self.data_format = 'D'
        else:
            raise ValueError(f'No Data Format with BAQ Mode: {self.baq_mode} and Test Mode: {self.test_mode}')

In [5]:
def get_header_dict(header_bytes, header_bit_lengths, header_field_names):
    read_and_pop = lambda bit_string, bit_length: (bit_string[0:bit_length], bit_string[bit_length:])
    bit_string = ''
    for byte in header_bytes:
        bit_string += '{:08b}'.format(byte)

    header_value_array = []
    for bit_len in header_bit_lengths:
        header_field_value, bit_string = read_and_pop(bit_string, bit_len)
        header_value_array.append(header_field_value) 

    index = 0
    header_dict = {}
    for field in header_field_names:
        if index >= len(header_field_names) or index >= len(header_value_array):
            raise ValueError('The number of field names and header values does not match.')
        header_dict[field] = header_value_array[index]
        index += 1

    return header_dict


def packet_generator(raw_data):
    secondary_header_length = 62
    while raw_data:
        primary_header_bytes = raw_data.read(6)
        primary_header = get_header_dict(primary_header_bytes, PRIMARY_HEADER, PRIMARY_HEADER_FIELDS)
        secondary_header_bytes = raw_data.read(62)
        secondary_header = get_header_dict(secondary_header_bytes, SECONDARY_HEADER, SECONDARY_HEADER_FIELDS)
        packet_data_length = primary_header['packet_data_length']
        user_data = None
        if packet_data_length:
            packet_data_length = int(packet_data_length, 2)
            user_data_length = (packet_data_length + 1) - secondary_header_length
            if user_data_length > 0:
                # print(f"User Data Length (Octets): {user_data_length}")
                user_data = raw_data.read(user_data_length)
        yield Packet(
            primary_header = primary_header,
            secondary_header = secondary_header,
            user_data_field = user_data
        )

In [6]:
l0_data_file = 'sar_data/S1A_IW_RAW__0SDV_20240806T135224_20240806T135256_055093_06B68A_AE41.SAFE/s1a-iw-raw-s-vv-20240806t135224-20240806t135256-055093-06b68a.dat'

In [7]:
raw_data_file = open(l0_data_file, 'rb')

In [8]:
PacketGenerator = packet_generator(raw_data_file)

In [9]:
packet = next(PacketGenerator)

In [10]:
num_bytes = (packet.num_ie_bits + packet.num_io_bits + packet.num_qe_bits + packet.num_qo_bits) / 8
print(f"Total Octet Count: {num_bytes}")
user_data_bit_string = create_bit_string(packet.raw_user_data)
user_data_array_bytes = len(user_data_bit_string) / 8
print(f"User Data Length: {user_data_array_bytes}")

Total Octet Count: 18558.0
User Data Length: 18560.0


In [11]:
packet.secondary_header

{'coarse_time': '01010011110111001110110100101010',
 'fine_time': '0001111001001101',
 'sync_marker': '00110101001011101111100001010011',
 'data_take_id': '00001101011011010001010111000000',
 'ecc_number': '00001000',
 'na_1': '0',
 'test_mode': '000',
 'rx_channel_id': '0000',
 'instrument_configuration_id': '00000000000000000000000000000111',
 'sc_data_word_index': '00011011',
 'sc_data_word': '1011111011000011',
 'counter_service': '00000000000000111010101110010101',
 'pri_count': '00000000000000111011011010111000',
 'error_flag': '0',
 'na_2': '00',
 'baq_mode': '01100',
 'baq_block_length': '00011111',
 'na_3': '00000000',
 'range_decimation': '00001000',
 'rx_gain': '00001000',
 'tx_ramp_rate': '1000011001000101',
 'pulse_start_frequency': '0011000000101111',
 'pulse_length': '000000000000011110101111',
 'na_4': '000',
 'rank': '01001',
 'pri': '000000000101010101100011',
 'swst': '000000000000111001100001',
 'swl': '000000000011011010011011',
 'sas_ssb_message': '011111000110000

In [12]:
data_word_indicies = []
for i in range(10):
    packet = next(PacketGenerator)
    data_word_index = int(packet.secondary_header['sc_data_word_index'], 2)
    data_word_indicies.append(data_word_index)

In [13]:
data_word_indicies

[28, 29, 30, 31, 32, 33, 34, 35, 36, 37]