In [1]:
import sys
sys.path.append("..")

In [2]:
import typing

In [3]:
import bpack

In [4]:
try:
    from bpack.typing import type_params_to_str
except ImportError:
    from bpack import EByteOrder
    from bpack.typing import TypeParams

    def type_params_to_str(params: TypeParams) -> str:
        """Convert type parameters into a string describing a data type.

        The returned ``typestr`` is a string describing a data type.

        The *typestr* string format consists of 3 parts:

        * an (optional) character describing the byte order of the data

        - ``<``: little-endian,
        - ``>``: big-endian,
        - ``|``: not-relevant

        * a character code giving the basic type of the array, and
        * an integer providing the number of bytes the type uses

        The basic type character codes are:

        * ``i``: sighed integer
        * ``u``: unsigned integer
        * ``f``: float
        * ``c``: complex
        * ``S``: bytes (string)

        .. note:: *typestr* the format described above is a sub-set of the
        one used in the numpy "array interface".

        .. seealso:: https://numpy.org/doc/stable/reference/arrays.dtypes.html
        and https://numpy.org/doc/stable/reference/arrays.interface.html
        """
        byteorder = params.byteorder
        byteorder = "" if byteorder is None else EByteOrder(byteorder).value

        if params.type is int:
            if params.signed:
                typestr = "i"
            else:
                typestr = "u"
        elif params.type is float:
            typestr = "f"
        elif params.type is complex:
            typestr = "c"
        # elif params.type is datetime.timedelta:
        #     typestr = "m"
        # elif params.type is datetime.datetime:
        #     typestr = "M"
        # elif params.type is str:
        #     typestr = "U"
        elif params.type is bytes:
            typestr = "S"
            # typestr = "V"
        else:
            raise TypeError(
                f"data type '{params.type}' is not suported in bpack"
            )

        return f"{byteorder}{typestr}{params.size}"

In [5]:
try:
    from bpack.descriptors import flat_fields_iterator
except ImportError:
    import copy
    from typing import Iterator
    from bpack.descriptors import (
        Field,
        get_field_descriptor,
        set_field_descriptor,
    )

    def flat_fields_iterator(desctiptor, offset: int = 0) -> Iterator[Field]:
        """Recursively iterate on fields of a descriptor.

        The behaviour of this function is similar to the one of
        :func:`bpack.descriptors.fileds` if the input descriptor do not
        contain fileds that are desctipors (nested).
        The main difference is that this one is an iterator while
        :func:`bpack.descriptors.fileds` returns a tuple.

        If the input desctiptor is nested (i.e. has fields that are
        descriptors), then a the it is visited recursively to return oll
        the fields belonging to the main decriptor and to the nested ones.

        The nested descriptors are replaced by their fields and the
        returned sequence of fields is *flat*.

        .. note:: please note that in case of nested descriptors, the
        returned fields are copy of the original ones, with the `offset`
        attribute adjusted to the relative to the beginning of the root
        desctiptor.
        """
        for field_ in bpack.fields(desctiptor):
            fd = get_field_descriptor(field_)
            fd.offset += offset

            if bpack.is_descriptor(field_.type):
                yield from flat_fields_iterator(field_.type, offset=fd.offset)
            else:
                field_ = copy.copy(field_)
                set_field_descriptor(field_, fd)
                yield field_

In [6]:
import typing


def annotated_to_str(ta: typing.Annotated):
    _, params = typing.get_args(ta)
    return type_params_to_str(params)

In [7]:
import enum


def get_default_str(value):
    if isinstance(value, enum.Enum):
        return f"{value.__class__.__name__}.{value.name}"
    else:
        return str(value)

In [8]:
import bpack


def iter_descriptors(descriptor, recursive: bool = True):
    for f in bpack.fields(descriptor):
        if bpack.is_descriptor(f.type):
            yield f.type
            if recursive:
                yield from iter_descriptors(f.type)

In [9]:
import inspect


def method_or_property(x):
    return inspect.isfunction(x) or inspect.isdatadescriptor(x)

In [10]:
import io
import enum
import inspect
import textwrap
from typing import Optional
from dataclasses import MISSING
from bpack.descriptors import METADATA_KEY
from bpack.codecs import has_codec, get_codec_type, Codec


def gen_flat_descriptor_code(
    descriptor,name: Optional[str] = None, indent = "    "
):
    out = io.StringIO()

    if has_codec(descriptor):
        backend = get_codec_type(descriptor).__module__
        codec_type = "codec" if has_codec(descriptor, Codec) else "decoder"
        print(f"@{backend}.{codec_type}", file=out)
    print(
        f"@bpack.descriptor(baseunits={bpack.baseunits(descriptor).name}, "
        f"byteorder={bpack.byteorder(descriptor).name})",
        file=out
    )
    print(f"class {descriptor.__name__ if name is None else name}:", file=out)
    print(f'{indent}"""{descriptor.__doc__}"""', file=out)
    print(file=out)
    preoffset = 0
    for f in flat_fields_iterator(descriptor):
        if bpack.typing.is_annotated(f.type):
            typestr = f'T["{annotated_to_str(f.type)}"]'
        elif f.type is bool:
            typestr = "bool"
        elif issubclass(f.type, enum.Enum):
            typestr = f.type.__name__
        else:
            raise TypeError(f"unsupported filed type: {f.type!r}")

        metadata = f.metadata[METADATA_KEY]
        size = metadata["size"]
        offset = metadata["offset"]

        annot_or_bool = bpack.typing.is_annotated(f.type) or f.type is bool

        size_str = f"size={size}" if not annot_or_bool else ""
        offset_str = f"offset={offset}" if offset != preoffset else ""
        if f.default_factory is not MISSING:
            default_str = f"default_factory={f.default_factory}"
        elif f.default is not MISSING:
            default_str = f"default={get_default_str(f.default)}"
        else:
            default_str = ""

        if any([size_str, offset_str]) or f.default_factory is not MISSING:
            args = [
                item for item in [size_str, offset_str, default_str] if item
            ]
            field_str = f" = bpack.field({', '.join(args)})"
        elif annot_or_bool and f.default is not MISSING:
            field_str = f" = {get_default_str(f.default)}"
        else:
            field_str = ""

        print(f"{indent}{f.name}: {typestr}{field_str}", file=out)
        preoffset = offset + size

    print(file=out)
    for klass in iter_descriptors(descriptor):
        targets = {
            k: v for k, v in inspect.getmembers(klass, method_or_property)
            if not k.startswith("_")
        }
        targets.pop("tobytes", None)
        targets.pop("frombytes", None)
        if not targets:
            continue
        print(f"{indent}# === {klass.__name__} ===", file=out)

        for m in targets.values():
            print(textwrap.indent(inspect.getsource(m), ""), file=out)
    
    return out.getvalue()

In [11]:
from s1isp.descriptors import SecondaryHeader

code = gen_flat_descriptor_code(SecondaryHeader)

In [12]:
from s1isp.constants import SYNC_MARKER

code = code.replace(str(SYNC_MARKER), "SYNC_MARKER")

In [13]:
import black

mode = black.Mode(target_versions={black.TargetVersion.PY311}, line_length=79)
formatted_code = black.format_str(code, mode=mode)
print(formatted_code)

@bpack.bs.codec
@bpack.descriptor(baseunits=BITS, byteorder=BE)
class SecondaryHeader:
    """Packet Secondary Header (S1-IF-ASD-PL-0007, section 3.2)."""

    coarse_time: T["u32"] = 0
    fine_time: T["u16"] = 0
    sync_marker: T["u32"] = SYNC_MARKER
    data_take_id: T["u32"] = 0
    ecc_num: EEccNumber = bpack.field(size=8, default=EEccNumber.not_set)
    test_mode: ETestMode = bpack.field(
        size=3, offset=121, default=ETestMode.default
    )
    rx_channel_id: ERxChannelId = bpack.field(size=4, default=ERxChannelId.rxv)
    instrument_configuration_id: T["u32"] = 0
    data_word_index: T["u8"] = 0
    data_word: T["S16"] = 0
    space_packet_count: T["u32"] = 0
    pri_count: T["u32"] = 0
    error_flag: bool = False
    baq_mode: EBaqMode = bpack.field(
        size=5, offset=251, default=EBaqMode.BYPASS
    )
    baq_block_length: T["u8"] = 0
    range_decimation: ERangeDecimation = bpack.field(
        size=8, offset=272, default=0
    )
    rx_gain: T["u8"] = 0
    tx_

In [14]:
preamble = '''"""Flat version of the SencondaryHeader."""

import math
from typing import Union

import bpack
import bpack.bs
from bpack import T

from s1isp.luts import (
    lookup_d_value,
    lookup_range_decimation_info,
    lookup_filter_output_offset,
)
from s1isp.enums import (
    EEccNumber,
    ETestMode,
    ERxChannelId,
    EBaqMode,
    ERangeDecimation,
    EPolarization,
    ETemperatureCompensation,
    ESignalType,
    ECalMode,
    ECalType,
    ESasTestMode,
)
from s1isp.constants import SYNC_MARKER, REF_FREQ
from s1isp.descriptors import (
    RangeDecimationInfo,
    SasImgData,
    SasCalData,
    SasData,
    SesData,
    DatationService,
    FixedAncillaryDataService,
    SubCommutatedAncillaryDataService,
    CountersService,
    RadarConfigurationSupportService,
    RadarSampleCountService,
)


BITS = bpack.EBaseUnits.BITS
BE = bpack.EByteOrder.BE


'''

postamble = '''
    # === Services ===
    def get_datation(self) -> DatationService:
        """Return a DatationService instance."""
        return DatationService(self.coarse_time, self.fine_time)

    def get_fixed_ancillary_data(self) -> FixedAncillaryDataService:
        """Return a FixedAncillaryDataService instance."""
        return FixedAncillaryDataService(
            self.sync_marker,
            self.data_take_id,
            self.ecc_num,
            self.test_mode,
            self.rx_channel_id,
            self.instrument_configuration_id,
        )

    def get_subcom_ancillary_data(self) -> SubCommutatedAncillaryDataService:
        """Return a SubCommutatedAncillaryDataService instance."""
        return SubCommutatedAncillaryDataService(
            self.data_word_index,
            self.data_word,
        )

    def get_counters(self) -> CountersService:
        """Return a CountersService instance."""
        return CountersService(self.space_packet_count, self.pri_count)

    def get_sas(self) -> SasData:
        """Return a SasData instance."""
        return SasData(
            self.ssb_flag,
            self.polarization,
            self.temperature_compensation,
            self._dynamic_data,
            self._beam_address,
        )

    def get_ses(self) -> SesData:
        """Return a SesData instance."""
        return SesData(
            self.cal_mode,
            self.tx_pulse_number,
            self.signal_type,
            self.swap,
            self.swath_number,
        )

    def get_radar_configuration_support(
        self
    ) -> RadarConfigurationSupportService:
        """Return a RadarConfigurationSupportService instance."""
        return RadarConfigurationSupportService(
            self.error_flag,
            self.baq_mode,
            self.baq_block_length,
            self.range_decimation,
            self.rx_gain,
            self.tx_ramp_rate,
            self.tx_pulse_start_freq,
            self.tx_pulse_length,
            self.rank,
            self.pri,
            self.swst,
            self.swl,
            self.get_sas(),
            self.get_ses(),
        )

    def get_radar_sample_count(self) -> RadarSampleCountService:
        """Return a RadarSampleCountService instance."""
        return RadarSampleCountService(self.number_of_quads)
'''

if True:
    with open("flat_descriptors.py", "w") as fd:
        fd.write(preamble)
        fd.write(formatted_code)
        fd.write(postamble)

In [15]:
from flat_descriptors import SecondaryHeader as FlatSecondaryHeader

In [16]:
with open("../tests/data/000408-echo.dat", "rb") as fd:
    data = fd.read()

from s1isp.constants import (
    PRIMARY_HEADER_SIZE as PHSIZE,
    SECONDARY_HEADER_SIZE as SHSIZE,
)
data = data[PHSIZE:PHSIZE+SHSIZE]

In [17]:
sh = SecondaryHeader.frombytes(data)

In [18]:

fsh = FlatSecondaryHeader.frombytes(data)

In [19]:
sh

SecondaryHeader(datation=DatationService(coarse_time=1276273467, fine_time=61863), fixed_ancillary_data=FixedAncillaryDataService(sync_marker=892270675, data_take_id=87747936, ecc_num=<EEccNumber.s3_no_ical: 13>, test_mode=<ETestMode.default: 0>, rx_channel_id=<ERxChannelId.rxv: 0>, instrument_configuration_id=1), subcom_ancillary_data=SubCommutatedAncillaryDataService(data_word_index=25, data_word=b'\xbe\xa3'), counters=CountersService(space_packet_count=408, pri_count=4427), radar_configuration_support=RadarConfigurationSupportService(error_flag=False, baq_mode=<EBaqMode.FDBAQ_MODE_0: 12>, baq_block_length=31, range_decimation=<ERangeDecimation.x4_on_9: 4>, rx_gain=12, tx_ramp_rate=34770, tx_pulse_start_freq=12970, tx_pulse_length=1658, rank=10, pri=19499, swst=5271, swl=12178, sas=SasData(ssb_flag=False, polarization=<EPolarization.v_vh: 7>, temperature_compensation=<ETemperatureCompensation.fe_on_ta_on: 3>, _dynamic_data=2, _beam_address=0), ses=SesData(cal_mode=<ECalMode.pcc2_ical

In [20]:
fsh

SecondaryHeader(coarse_time=1276273467, fine_time=61863, sync_marker=892270675, data_take_id=87747936, ecc_num=<EEccNumber.s3_no_ical: 13>, test_mode=<ETestMode.default: 0>, rx_channel_id=<ERxChannelId.rxv: 0>, instrument_configuration_id=1, data_word_index=25, data_word=b'\xbe\xa3', space_packet_count=408, pri_count=4427, error_flag=False, baq_mode=<EBaqMode.FDBAQ_MODE_0: 12>, baq_block_length=31, range_decimation=<ERangeDecimation.x4_on_9: 4>, rx_gain=12, tx_ramp_rate=34770, tx_pulse_start_freq=12970, tx_pulse_length=1658, rank=10, pri=19499, swst=5271, swl=12178, ssb_flag=False, polarization=<EPolarization.v_vh: 7>, temperature_compensation=<ETemperatureCompensation.fe_on_ta_on: 3>, _dynamic_data=2, _beam_address=0, cal_mode=<ECalMode.pcc2_ical_interleaved: 0>, tx_pulse_number=2, signal_type=<ESignalType.echo: 0>, swap=False, swath_number=2, number_of_quads=10779)

In [21]:
assert sh.datation.coarse_time == fsh.coarse_time
assert sh.datation.fine_time == fsh.fine_time

In [22]:
assert sh.fixed_ancillary_data.sync_marker == fsh.sync_marker
assert sh.fixed_ancillary_data.data_take_id == fsh.data_take_id
assert sh.fixed_ancillary_data.ecc_num == fsh.ecc_num
assert sh.fixed_ancillary_data.test_mode == fsh.test_mode
assert sh.fixed_ancillary_data.rx_channel_id == fsh.rx_channel_id
assert sh.fixed_ancillary_data.instrument_configuration_id == fsh.instrument_configuration_id

In [23]:
assert sh.subcom_ancillary_data.data_word_index == fsh.data_word_index
assert sh.subcom_ancillary_data.data_word == fsh.data_word

In [24]:
assert sh.counters.space_packet_count == fsh.space_packet_count
assert sh.counters.pri_count == fsh.pri_count

In [25]:
assert sh.radar_configuration_support.error_flag == fsh.error_flag
assert sh.radar_configuration_support.baq_mode == fsh.baq_mode
assert sh.radar_configuration_support.baq_block_length == fsh.baq_block_length
assert sh.radar_configuration_support.range_decimation == fsh.range_decimation
assert sh.radar_configuration_support.rx_gain == fsh.rx_gain
assert sh.radar_configuration_support.tx_ramp_rate == fsh.tx_ramp_rate
assert sh.radar_configuration_support.tx_pulse_start_freq == fsh.tx_pulse_start_freq
assert sh.radar_configuration_support.tx_pulse_length == fsh.tx_pulse_length
assert sh.radar_configuration_support.rank == fsh.rank
assert sh.radar_configuration_support.pri == fsh.pri
assert sh.radar_configuration_support.swst == fsh.swst
assert sh.radar_configuration_support.swl == fsh.swl

In [26]:
assert sh.radar_configuration_support.sas.ssb_flag == fsh.ssb_flag
assert sh.radar_configuration_support.sas.polarization == fsh.polarization
assert sh.radar_configuration_support.sas.temperature_compensation == fsh.temperature_compensation
assert sh.radar_configuration_support.sas._dynamic_data == fsh._dynamic_data
assert sh.radar_configuration_support.sas._beam_address == fsh._beam_address

In [27]:
assert sh.radar_configuration_support.ses.cal_mode == fsh.cal_mode
assert sh.radar_configuration_support.ses.tx_pulse_number == fsh.tx_pulse_number
assert sh.radar_configuration_support.ses.signal_type == fsh.signal_type
assert sh.radar_configuration_support.ses.swap == fsh.swap
assert sh.radar_configuration_support.ses.swath_number == fsh.swath_number

In [28]:
assert sh.radar_sample_count.number_of_quads == fsh.number_of_quads

In [29]:
assert sh.radar_configuration_support.sas == fsh.get_sas()
assert sh.radar_configuration_support.ses == fsh.get_ses()

assert sh.datation == fsh.get_datation()
assert sh.fixed_ancillary_data == fsh.get_fixed_ancillary_data()
assert sh.subcom_ancillary_data == fsh.get_subcom_ancillary_data()
assert sh.counters == fsh.get_counters()
assert sh.radar_configuration_support == fsh.get_radar_configuration_support()
assert sh.radar_sample_count == fsh.get_radar_sample_count()