In [11]:
from construct import Struct, Int32ul, Int32sl, Byte, this, Computed, If, RepeatUntil

format_word = Struct(
    "daq_major" / Byte,
    "daq_minor" / Byte,
    "data_format_major" / Byte,
    "data_format_minor" / Byte
)

two_word_file_header = Struct(
    "endian_indicator" / Int32ul,
    "data_format" / format_word
)

detector_config_header = Struct(
    "header_number" / Int32ul,
    "config_record_len" / Int32ul,
    "repeat_value" / Computed(
        lambda this: (this.config_record_len // 72) + (this.config_record_len // 144)
    )
)

charge_config_header = Struct(
    "charge_config_len" / Int32ul,
    "detector_code" / Int32sl,
    "tower_number" / Int32sl,
    "channel_post_amp" / Int32sl,
    "channel_bias" / Int32sl,
    "rtf_offset" / Int32sl,
    "delta_t" / Int32sl,
    "trigger_time" / Int32sl,
    "trace_len" / Int32sl
)

phonon_config_header = Struct(
    "phonon_channel_config_record_len" / Int32ul,
    "detector_code" / Int32sl,
    "tower_number" / Int32sl,
    "post_amp_gain" / Int32sl,
    "qet_bias" / Int32sl,
    "squid_bias" / Int32sl,
    "squid_lockpoint" / Int32sl,
    "rtf_offset" / Int32sl,
    "variable_gain" / Int32sl,
    "delta_t" / Int32sl,
    "trigger_time" / Int32sl,
    "trace_len" / Int32sl
)

header_list = Struct(
    "header_number" / Int32ul,
    "charge_config" / If(
        lambda this: this.header_number == 0x10002,
        charge_config_header
    ),
    "phonon_config" / If(
        lambda this: this.header_number == 0x10001,
        phonon_config_header
    )
)

event_header = Struct(
    "event_header_word" / Int32ul,
    "event_size" / Int32ul,
    "event_identifier" / Computed(
        lambda this: (this.event_header_word >> 16) & 0xFFFF
    ),
    "event_class" / Computed(
        lambda this: (this.event_header_word >> 8) & 0xF
    ),
    "event_category" / Computed(
        lambda this: (this.event_header_word >> 12) & 0xF
    ),
    "event_type" / Computed(
        lambda this: (this.event_header_word & 0xFF)
    )
)

logical_record_header = Struct(
    "admin_header" / Int32ul,
    "admin_len" / Int32ul,
    "series_number_1" / Int32ul,
    "series_number_2" / Int32ul,
    "event_number_in_series" / Int32ul,
    "seconds_from_epoch" / Int32ul,
    "time_from_last_event" / Int32ul,
    "live_time_from_last_event" / Int32ul
)

trace_record = Struct(
    "trace_header" / Int32ul,
    "trace_len" / Int32ul,
    "trace_bookkeeping_header" / Int32ul,
    "bookkeeping_len" / Int32ul,
    "digitizer_base_address" / Int32ul,
    "digitizer_channel" / Int32ul,
    "detector_code" / Int32ul,
    "timebase_header" / Int32ul,
    "timebase_len" / Int32ul,
    "t0_in_ns" / Int32ul,
    "delta_t_ns" / Int32ul,
    "num_of_points" / Int32ul,
    "second_trace_header" / Int32ul,
    "num_samples" / Int32ul
)

data_sample = Struct(
    "data_selection" / Int32ul,
    "sample_a" / Computed(
        lambda this: (this.data_selection >> 16) & 0xFFFF
    ),
    "sample_b" / Computed(
        lambda this: (this.data_selection & 0xFFFF)
    )
)

trace_data = Struct(
    "trace_rcrd" / trace_record,
    "sample_data" / RepeatUntil(
        lambda ctx, sample_list: len(sample_list) >= ctx.trace_rcrd.num_samples,
        data_sample
    )
)

events = Struct(
    "event_hdr" / event_header,
    "logic_rcrd" / logical_record_header,
    "unknown" / Byte[76],
    "data" / RepeatUntil(
        lambda ctx, data_list: len(data_list) >= ctx._root.detector_header.repeat_value,
        trace_data
    ),
    "unknown2" / Byte[lambda ctx: ctx.event_hdr.event_size - 747468]
)

# The documentation makes it unclear if the following will be part of the events struct or not
soudan_history_buffer = Struct(
    
)

trigger_record = Struct(
    
)

tlb_trigger_mask_record = Struct(
    
)

gps_data = Struct(
    
)

detector_trigger_threshold_data = Struct(
    
)

detector_trigger_rates = Struct(
    
)

veto_trigger_rates = Struct(
    
)

In [12]:
soudan = Struct(
    "file_hdr" / two_word_file_header,
    "detector_hdr" / detector_config_header,
    "hdrs" / RepeatUntil(
        lambda ctx, hdr_list: len(hdr_list) >= ctx.detector_hdr.repeat_value,
        header_list
    ),
    "repeat_events" / RepeatUntil(
        lambda ctx, event_list: len(event_list) >= ctx.detector_hdr.repeat_value,
        events
    )
    
)

In [None]:
def parse_file(input_path, output_path):
    with open(input_path, 'rb') as f:
        raw_data = f.read()
        parsed_data = soudan.parse(raw_data)

    with open(output_path, 'wb') as f:
        f.write(parsed_data)

