# Reading IQ Data

Only care about reading the raw I/Q samples. In the interest of time, other information are neglected since the development time could be overwhelming.

In [None]:
import os
import glob
import struct
import numpy as np

file = os.path.expanduser('~/Downloads/20200614_150015.256007-87-02.iqData.XXXX.AKITA.dat')

In [None]:
class Object(object):
    pass

# I/Q data
def read_iq_block(align_num, num, fid):
    if align_num:
        tmp = fid.read(align_num * 4)
        raw = np.frombuffer(tmp, np.int16)
        return np.array(raw[::2][:num] + 1j * raw[1::2][:num])
    else:
        return np.empty(0)

In [None]:
fid = open(file, 'rb')

In [None]:
# Seems to have 24-byte header
fid.seek(24, 0)

# PACK data header
tmp = fid.read(32 * 4)
n = struct.unpack('IIIIIHHI', tmp[:28])
pack_header = Object()
pack_header.pack_check_code_1 = n[0]
pack_header.pack_check_code_2 = n[1]
pack_header.pack_check_counter = n[2]
pack_header.num_cpi = n[3]
pack_header.pack_data_size = n[4]
pack_header.division_number = n[5]
pack_header.division_part_number = n[6]
pack_header.mode = n[7] & 0x2 >> 1

print('CPI check code 1: {:X}'.format(pack_header.pack_check_code_1))
print('CPI check code 2: {:X}'.format(pack_header.pack_check_code_2))
print('CPI check counter: {}'.format(pack_header.pack_check_counter))
print('Number of CPI: {}'.format(pack_header.num_cpi))
print('CPI data size: {}'.format(pack_header.num_cpi))
print('Pack data size: {}'.format(pack_header.pack_data_size))
print('Division number: {} / {}'.format(
    pack_header.division_number, pack_header.division_part_number))
print('Observation mode: {}'.format(pack_header.mode))

In [None]:
# CPI data header
tmp = fid.read(32 * 4)
n = struct.unpack('IIIIHHHHHHHHHHIII', tmp[:12*4])
cpi_header = Object()
cpi_header.check_code_1 = n[0]
cpi_header.check_code_2 = n[1]
cpi_header.check_counter = n[2]
cpi_header.data_size = n[3]
cpi_header.pls_code = n[4]
cpi_header.num_samples = n[5]
cpi_header.align_num_range_short_hi = n[6]
cpi_header.align_num_range_short_lo = n[7]
cpi_header.align_num_range_long_hi = n[8]
cpi_header.align_num_range_long_lo = n[9]
cpi_header.num_range_short_hi = n[10]
cpi_header.num_range_short_lo = n[11]
cpi_header.num_range_long_hi = n[12]
cpi_header.num_range_long_lo = n[13]

print('')
print('cpi_header.check_code_1: {:08X}'.format(cpi_header.check_code_1))
print('cpi_header.check_code_2: {:08X}'.format(cpi_header.check_code_2))
print('cpi_header.check_counter: {}'.format(cpi_header.check_counter))
print('cpi_header.data_size: {}'.format(cpi_header.data_size))
print('cpi_header.pls_code: {}'.format(cpi_header.pls_code))
print('cpi_header.num_samples: {}'.format(cpi_header.num_samples))
print('cpi_header.align_num_range_short_hi: {}'.format(cpi_header.align_num_range_short_hi))
print('cpi_header.align_num_range_short_lo: {}'.format(cpi_header.align_num_range_short_lo))
print('cpi_header.align_num_range_long_hi: {}'.format(cpi_header.align_num_range_long_hi))
print('cpi_header.align_num_range_long_lo: {}'.format(cpi_header.align_num_range_long_lo))
print('cpi_header.num_range_short_hi: {}'.format(cpi_header.num_range_short_hi))
print('cpi_header.num_range_short_lo: {}'.format(cpi_header.num_range_short_lo))
print('cpi_header.num_range_long_hi: {}'.format(cpi_header.num_range_long_hi))
print('cpi_header.num_range_long_lo: {}'.format(cpi_header.num_range_long_lo))

In [None]:
# PRI data
tmp = fid.read(32 * 4)

# PRI data header
n = struct.unpack('IIHHHH', tmp[:4*4])
pri_header = Object()
pri_header.check_code_1 = n[0]
pri_header.check_code_2 = n[1]
pri_header.reserved = n[2]
pri_header.check_counter = n[3]
pri_header.elevation = (n[4] & 0x3FFF) * 360 / 2 ** 14
pri_header.azimuth = (n[5] & 0x3FFF) * 180 / 2 ** 13

# H channel data
h_long_hi = read_iq_block(cpi_header.align_num_range_long_hi, cpi_header.num_range_long_hi, fid)
h_long_lo = read_iq_block(cpi_header.align_num_range_long_lo, cpi_header.num_range_long_lo, fid)
h_short_hi = read_iq_block(cpi_header.align_num_range_short_hi, cpi_header.num_range_short_hi, fid)
h_short_lo = read_iq_block(cpi_header.align_num_range_short_lo, cpi_header.num_range_short_lo, fid)

# V channel data
v_long_hi = read_iq_block(cpi_header.align_num_range_long_hi, cpi_header.num_range_long_hi, fid)
v_long_lo = read_iq_block(cpi_header.align_num_range_long_lo, cpi_header.num_range_long_lo, fid)
v_short_hi = read_iq_block(cpi_header.align_num_range_short_hi, cpi_header.num_range_short_hi, fid)
v_short_lo = read_iq_block(cpi_header.align_num_range_short_lo, cpi_header.num_range_short_lo, fid)

print('pri_header.check_code_1: {:08X}'.format(pri_header.check_code_1))
print('pri_header.check_code_2: {:08X}'.format(pri_header.check_code_2))
print('pri_header.check_counter: {}'.format(pri_header.check_counter))
print('pri_header.elevation: {}'.format(pri_header.elevation))
print('pri_header.azimuth: {}'.format(pri_header.azimuth))
print('')
print('len(h_long_hi) = {}'.format(len(h_long_hi)))
print('len(h_long_lo) = {}'.format(len(h_long_lo)))
print('len(h_short_hi) = {}'.format(len(h_short_hi)))
print('len(h_short_lo) = {}'.format(len(h_short_lo)))
print('')
print('len(v_long_hi) = {}'.format(len(v_long_hi)))
print('len(v_long_lo) = {}'.format(len(v_long_lo)))
print('len(v_short_hi) = {}'.format(len(v_short_hi)))
print('len(v_short_lo) = {}'.format(len(v_short_lo)))

In [None]:
fid.close()