In [35]:
import obspy
import struct
import numpy as np
import math
import array
import matplotlib
import matplotlib.pyplot as plt
from pathlib import Path
print('Previous matplotlib backend: ' + matplotlib.get_backend())
%matplotlib qt5
print('Current matplotlib backend: ' + matplotlib.get_backend())


def _bcd(byte):
    """Decode 1-byte binary code decimals."""

    if isinstance(byte, str):
        try:
            byte = ord(byte)
        except TypeError:
            raise ValueError('not a byte')
    elif isinstance(byte, int):
        if byte > 255:
            raise ValueError('not a byte')

    else:
        raise ValueError('not a byte')
    v1 = byte >> 4
    v2 = byte & 0xF
    return v1, v2

def _decode_bcd(bytes_in):
    """Decode arbitrary length binary code decimals."""
    v = 0
    if isinstance(bytes_in, int):
        bytes_in = bytes([bytes_in])
        disp('bytes in')
    n = len(bytes_in)
    n = n*2 - 1  # 2 values per byte
    for byte in bytes_in:
        v1, v2 = _bcd(byte)
        v += v1*10**n + v2*10**(n-1)
        n -= 2
    return v

def _decode_bin(bytes_in):
    """Decode unsigned ints."""
    if isinstance(bytes_in, int):
        bytes_in = bytes([bytes_in])
    ll = len(bytes_in)
    # zero-pad to 4 bytes
    b = (chr(0)*(4-ll)).encode()
    b += bytes_in
    return struct.unpack('>I', b)[0]

def _decode_flt(bytes_in):
    """Decode single-precision floats."""
    if isinstance(bytes_in, int):
        bytes_in = bytes([bytes_in])
    ll = len(bytes_in)
    # zero-pad to 4 bytes
    b = (chr(0)*(4-ll)).encode()
    b += bytes_in
    f = struct.unpack('>f', b)[0]
    if math.isnan(f):
        f = None
    return f

def _decode_fraction(bytes_in):
    """Decode positive binary fractions."""
    bytes_ord = bytes_in
    bit = ''.join('{:08b}'.format(b) for b in bytes_ord)
    return sum(int(x) * 2**-n for n, x in enumerate(bit, 1))

Previous matplotlib backend: Qt5Agg
Current matplotlib backend: Qt5Agg


In [100]:
smpl_file = '.\data\STRYDE_SEGDv3_Read_Pack_v1.4continuous_data_example_1trace.rsamp'
# smpl_file = ".\data\7.166_15_00_00_SN_2300202892_0.rsamp.segd"
smpl_file = Path(r'data\7.166_15_00_00_SN_2300202892_0.rsamp.segd')
# smpl_file = Path(r'data\STRYDE_SEGDv3_Read_Pack_v1.4continuous_data_example_1trace.rsamp')
# with open(smpl_file, mode='rb') as file: # b is important -> binary
#     fileContent = file.read()


In [101]:
fp = open(smpl_file, 'rb')
buf_header1 = fp.read(32)
buf_header2 = fp.read(32)
buf_header3 = fp.read(32)
fp.close()

In [102]:
general_header1 = dict()
if _decode_bcd(buf_header1[0:2]) > 9999:
    general_header1['file number'] = '{0:0{1}x}'.format(buf_header1[0], 2) + '{0:0{1}x}'.format(buf_header1[1], 2)
else:
    general_header1['file number'] = _decode_bcd(buf_header1[0:2])

general_header1['format code'] = _decode_bcd(buf_header1[2:4])

_year = _decode_bcd(buf_header1[10:11]) + 2000
_nblocks, _jday = _bcd(buf_header1[11])
general_header1['n_additional_blocks'] = _nblocks   # important
_jday *= 100
_jday += _decode_bcd(buf_header1[12:13])
_hour = _decode_bcd(buf_header1[13:14])
_min = _decode_bcd(buf_header1[14:15])
_sec = _decode_bcd(buf_header1[15:16])
general_header1['time'] = obspy.UTCDateTime(year=_year, julday=_jday,
                               hour=_hour, minute=_min, second=_sec)

# 여기 사이에 추가로 더 들어가야 함.
general_header1['scan_type_per_record'] = _decode_bcd(buf_header1[27:28])
general_header1['n_channel_sets_per_scan_type'] = _decode_bcd(buf_header1[28:29])
general_header1['n_sample_skew_32bit_extensions'] = _decode_bcd(buf_header1[29:30])
if _decode_bcd(buf_header1[30:31]) > 99:
    general_header1['extended_header_length'] = 'ff'
else:
    general_header1['extended_header_length'] = _decode_bcd(buf_header1[30:31])
general_header1['external_header_length'] = _decode_bcd(buf_header1[31:32])



In [103]:
general_header1

{'file number': 'ffff',
 'format code': 8036,
 'n_additional_blocks': 2,
 'time': 2021-08-17T15:00:00.000000Z,
 'scan_type_per_record': 1,
 'n_channel_sets_per_scan_type': 5,
 'n_sample_skew_32bit_extensions': 0,
 'extended_header_length': 'ff',
 'external_header_length': 0}

In [104]:
# 이거는 거의 완성
general_header2 = dict()

general_header2['extended_file_number'] = _decode_bin(buf_header2[0:3])
general_header2['extended_channel_sets_per_scan_types'] = _decode_bin(buf_header2[3:5])
general_header2['extended_header_blocks'] = _decode_bin(buf_header2[5:8])
general_header2['external_header_blocks'] = _decode_bin(buf_header2[8:10]) # or extended skew blocks

_rev = ord(buf_header2[10:11])
_rev += ord(buf_header2[11:12])/10.
general_header2['segd_revision_number'] = _rev
general_header2['no_blocks_of_general_trailer'] = _decode_bin(buf_header2[12:16])
general_header2['extended_record_length_in_ms'] = _decode_bin(buf_header2[16:20])
general_header2['record_set_number'] = _decode_bin(buf_header2[20:22])
general_header2['n_extended_additional_blocks'] = _decode_bin(buf_header2[22:24])
general_header2['dominant_sampling_interval_in_us'] = _decode_bin(buf_header2[24:27])
general_header2['external_header_blocks'] = _decode_bin(buf_header2[27:30])
general_header2['header_block_type'] = _decode_bin(buf_header2[31:32])

In [105]:
general_header2

{'extended_file_number': 10003,
 'extended_channel_sets_per_scan_types': 5,
 'extended_header_blocks': 839,
 'external_header_blocks': 0,
 'segd_revision_number': 3.0,
 'no_blocks_of_general_trailer': 0,
 'extended_record_length_in_ms': 3688000000,
 'record_set_number': 0,
 'n_extended_additional_blocks': 2,
 'dominant_sampling_interval_in_us': 2000,
 'header_block_type': 2}

In [106]:
# channel import

buf_channel = []
fp = open(smpl_file, 'rb')
fp.seek((general_header1['n_additional_blocks'] + 1) * 32)
for i in range(general_header1['n_channel_sets_per_scan_type']):
    # buf_channel[str(i+1)] = fp.read(96)
    buf_channel.append(fp.read(96))

fp.close()

In [107]:
def read_channel_set_header(buf_channel):
    ls_channel_set_header = []

    for item in buf_channel:
        channel = dict()
        channel['scan_type'] = _decode_bcd(item[0:1])
        channel['channel_set_number'] = _decode_bin(item[1:3])
        channel['channel_type_identification'] = '{0:0{1}x}'.format(item[3], 2)   # hex
        channel['channel_set_start_time'] = _decode_bcd(item[4:8])
        channel['channel_set_end_time'] = _decode_bcd(item[8:12])
        channel['n_samples_in_each_trac_of_this_channel_set'] = _decode_bin(item[12:16])
        channel['sample_descale_multiplication_factor'] = round(_decode_flt(item[16:20]), 5) # Careful this
        channel['n_channels_in_this_channel_set'] = _decode_bin(item[20:23])
        channel['sampling_interval_in_us'] = _decode_bin(item[23:26])
        ls_channel_set_header.append(channel)

    return ls_channel_set_header

In [108]:
ls_channel_set_header = read_channel_set_header(buf_channel)

# Extended header

In [88]:

fp = open(smpl_file, 'rb')
fp.seek((general_header1['n_additional_blocks'] + 1) * 32 +
        general_header1['n_channel_sets_per_scan_type'] * 96)

test = fp.read(general_header2['extended_header_blocks'] * 32)

fp.close()
test

b'{\n "sync": {\n  "syncEventInfo": [\n   {\n    "syncEvent": "eSYNC_EVENT_OPTICAL_LINK",\n    "localTimeInTicks": "5119000000000",\n    "globalTime": {\n     "tvSec": "1271514446",\n     "tvNsec": 875002000,\n     "timeType": "eTIME_TYPE_GPS"\n    },\n    "opticalSyncSource": "eNODE_TYPE_NEST_NODE"\n   },\n   {\n    "syncEvent": "eSYNC_EVENT_OPTICAL_LINK",\n    "localTimeInTicks": "5119999000000",\n    "globalTime": {\n     "tvSec": "1271516398",\n     "tvNsec": 46873000,\n     "timeType": "eTIME_TYPE_GPS"\n    },\n    "opticalSyncSource": "eNODE_TYPE_MOC"\n   },\n   {\n    "syncEvent": "eSYNC_EVENT_INTERNAL_GNSS",\n    "localTimeInTicks": "5120003584000",\n    "globalTime": {\n     "timeType": "eTIME_TYPE_GPS"\n    },\n    "NmeaString": "@UBX BIN::hdr: 0xb5 62 class: 0x1 id: 0x20 length: 16\\n\\t ::   GPS:: iTOW: 226807000 fTOW: 0 week: 2102 leapS: 18 valid: 7 tACC: 0",\n    "ubxMsgBytes": "tWIBIBAA2MyEDQAAAAA2CBIHAAAAAL1g"\n   },\n   {\n    "syncEvent": "eSYNC_EVENT_INTERNAL_GNSS",\

# Data

In [None]:
#

In [109]:
# demux_trace header안에 trace header가 있는데, demux trace header의 수는
# channel set header에서 channel set * number of channel 곱해서 유추해야 함.

fp = open(smpl_file, 'rb')
fp.seek((general_header1['n_additional_blocks'] + 1) * 32 +
        general_header1['n_channel_sets_per_scan_type'] * 96 +
        general_header2['extended_header_blocks'] * 32)

buf_demux_trace_header = fp.read(20)
# test = fp.read(32)

fp.close()


In [110]:
demux_trace_header = dict()
if _decode_bcd(buf_demux_trace_header[0:2]) > 9999:
    demux_trace_header['file_number'] = '{0:0{1}x}'.format(buf_demux_trace_header[0], 2) + '{0:0{1}x}'.format(buf_demux_trace_header[1], 2)
else:
    demux_trace_header['file_number'] = _decode_bcd(buf_demux_trace_header[0:2])

demux_trace_header['scan_type_number'] = _decode_bcd(buf_demux_trace_header[2:3])
demux_trace_header['channel_set_number'] = _decode_bcd(buf_demux_trace_header[3:4])
demux_trace_header['trace_number'] = _decode_bcd(buf_demux_trace_header[4:6])
demux_trace_header['first_timing_word'] = _decode_bcd(buf_demux_trace_header[6:9])
demux_trace_header['trace_header_extensions'] = _decode_bin(buf_demux_trace_header[9:10])
demux_trace_header['sample_skew'] = _decode_fraction(buf_demux_trace_header[10:11])
demux_trace_header['trace_edit'] = '{0:0{1}x}'.format(buf_demux_trace_header[11], 2)    # Hex



demux_trace_header

{'file_number': 'ffff',
 'scan_type_number': 1,
 'channel_set_number': 1,
 'trace_number': 1,
 'first_timing_word': 0,
 'trace_header_extensions': 11,
 'sample_skew': 0.0,
 'trace_edit': '00'}

352

In [111]:
fp = open(smpl_file, 'rb')
fp.seek((general_header1['n_additional_blocks'] + 1) * 32 +
        general_header1['n_channel_sets_per_scan_type'] * 96 +
        general_header2['extended_header_blocks'] * 32 +
        20 +
        demux_trace_header['trace_header_extensions'] * 32)

test = fp.read(ls_channel_set_header[0]['n_samples_in_each_trac_of_this_channel_set'] * 3)
# test = fp.read(32)

fp.close()

In [112]:
len(test)
ls_num1 = []
ls_num2 = []
for i in range(0, len(test), 3):
    # print(test[i:i+3])
    ls_num1.append(int.from_bytes(test[i:i+3], byteorder='big', signed=True))
    # ls_num2.append(int.from_bytes(test[i:i+3], byteorder='big', signed=True))

In [73]:
test_arr = np.linspace(0, 100, 101)
for i in range(0, 100, 2):
    print(test_arr[i:i+2])

[0. 1.]
[2. 3.]
[4. 5.]
[6. 7.]
[8. 9.]
[10. 11.]
[12. 13.]
[14. 15.]
[16. 17.]
[18. 19.]
[20. 21.]
[22. 23.]
[24. 25.]
[26. 27.]
[28. 29.]
[30. 31.]
[32. 33.]
[34. 35.]
[36. 37.]
[38. 39.]
[40. 41.]
[42. 43.]
[44. 45.]
[46. 47.]
[48. 49.]
[50. 51.]
[52. 53.]
[54. 55.]
[56. 57.]
[58. 59.]
[60. 61.]
[62. 63.]
[64. 65.]
[66. 67.]
[68. 69.]
[70. 71.]
[72. 73.]
[74. 75.]
[76. 77.]
[78. 79.]
[80. 81.]
[82. 83.]
[84. 85.]
[86. 87.]
[88. 89.]
[90. 91.]
[92. 93.]
[94. 95.]
[96. 97.]
[98. 99.]


In [113]:
ls_num1 = np.array(ls_num1)
ls_num1.size

1844000

In [76]:
ls_num1[0:10]

array([-1356, -1411, -1372, -1347, -1381, -1454, -1527, -1499, -1402,
       -1380])

In [65]:
ls_num2[0:10]

[-1356,
 -346881,
 -4915206,
 -1411,
 -360961,
 8257530,
 -1372,
 -350977,
 -5963782,
 -1347]

In [99]:

fig = plt.figure(figsize=plt.figaspect(0.5))
ax = fig.add_subplot(1, 1, 1)
ax.plot(ls_num1[0:50000])
# ax.set_xlabel('Space')
# ax.set_ylabel('Time')
# ax.view_init(35, -120)

[<matplotlib.lines.Line2D at 0x18a55c92b20>]

In [98]:
ls_num1

array([-5151926,   931145,  6557715, ...,        0,        0,        0])

In [114]:
my_geo = dict()
my_geo['data'] = ls_num1
from scipy.io import savemat
savemat('geo_file2.mat', my_geo)

In [20]:
tr = obspy.Trace(ls_num)

In [21]:
tr

... | 1970-01-01T00:00:00.000000Z - 1970-03-05T12:34:59.000000Z | 1.0 Hz, 5488500 samples

In [82]:
num_first = test[3:6]
num_first
num_first

b'\x0e5I'

In [66]:
def sext24(d):
    if ord(d[2]) & 0x80:
        return d+'\xff'
    else:
        return d+'\x00'

In [75]:
def twos_comp(val, bits=16):
        """compute the 2's complement of int value """
        if bits == 0:      # Use as many bits needed for the value.
            bits = val.bit_length()
        return ((val & (2 ** bits) - 1) - (2 ** bits)) * -1

In [77]:
    value = 6752
    print(f'{value:05d} = 0x{value:04x} = 0b{value:016b}')

    value = twos_comp(value)
    print(f'{value:05d} = 0x{value:04x} = 0b{value:016b}')

    value = twos_comp(value)
    print(f'{value:05d} = 0x{value:04x} = 0b{value:016b}')

06752 = 0x1a60 = 0b0001101001100000
58784 = 0xe5a0 = 0b1110010110100000
06752 = 0x1a60 = 0b0001101001100000


In [92]:
my_value = -5151926
my_value = 931145
print(f'ob{my_value:024b}')
print(f'0x{my_value:04x}')

ob000011100011010101001001
0xe3549


In [98]:
my_value.to_bytes(3, 'big')

b'\x0e5I'

In [107]:
num_first.decode()
# num_first.from_bytes()
int.from_bytes(num_first, byteorder='big', signed=True)
# num_first
# num_first
# num_first

TypeError: 'bytes' object cannot be interpreted as an integer

In [105]:
ls_bytes = []
ls_bytes.append(num_first)
ls_bytes.append(num_first)

In [106]:
ls_bytes

[b'\x0e5I', b'\x0e5I']

In [28]:
fp = open(smpl_file, 'rb')

test = fp.read(100)
# test = fp.read(32)

fp.close()

In [29]:
test

b'\xff\xff\x806\x00\x00Hp\x00\x00 1\x12\x15\x00\x00"`\x02\x00\x00\x00\xff\x00\x00\x1f\xff\x01\x05\x00\xff\x00\x00\'\x12\x00\x05\x00\x01v\x00\x00\x03\x00\x00\x00\x00\x00\xda\x17\xe8\xc0\x00\x00\x00\x03\x00\x07\xd0\x00\x00\x00\x00\x02\x00\x04\x84o\xfck\xc4\x80\x00\x00\x00\x00\x00T94\x00\x00\x00\x00\x00T94\x00\x001 \x00\x00\x00\x03\x00\x05\xa3\xce'

In [90]:
buf = array.array('i')
buf.frombytes(test)
buf.byteswap()
buf = np.array(buf, dtype=np.int32)

In [91]:
buf

array([-1318893042,   894002192,   327155711, ...,           0,
                 0,           0])

In [129]:
(general_header1['n_additional_blocks'] + 1) * 32

128

In [102]:
_decode_bin(buf_header2[12:16])

0

In [60]:
_ehl = _decode_bcd(buf_header1[31:32])
_ehl == 0xFF

False

In [62]:
buf_header1[1] == 0xFF

True

In [48]:
_bsi /= 10.

In [52]:
_bsi
_rec_type, _rec_len = _bcd(buf_header1[25])

In [53]:
_rec_type

1

In [54]:
_rec_len

15

In [18]:
# buf_header1[0:2]
buf_header1[0]

255

In [26]:
a = '{0:0{1}x}'.format(buf_header1[0], 2) + '{0:0{1}x}'.format(buf_header1[1], 2)
# '{0:0{1}x}'.format(buf_header1[1], 2)
a

'ffff'