Skip to content

Commit

Permalink
Debugging analog parsing.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcferrill committed Mar 24, 2021
1 parent bf6ceaa commit a079f9a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 33 deletions.
86 changes: 54 additions & 32 deletions chapter10/analog.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,53 @@ class AnalogF1(packet.Packet):
"""

# u4 factor
# u1 same
# p3

# u6 length
# u2 mode

# u8 subchannel_count

# u8 subchannel
csdw_format = BitFormat('''
u6 length
u2 mode
u8 subchannel
u8 subchannel_count
p3
u1 same
u4< factor
u8 one
u8 two
u8 three
u8 four
''')

class Message(packet.Message):
def __repr__(self):
return '<Analog Sample %s bytes>' % len(self.data)

@classmethod
def from_packet(cls, packet):
subchannel = packet._subchannel
if packet.same:
csdw = packet.subchannels[0]
else:
csdw = packet.subchannels[subchannel]

# Find the sample size in bytes (16 bit aligned).
length = csdw['length'] or 64 # Length 0 = 64 bits
length = length // 8 + (length % 16 and 1 or 0)

data = packet.buffer.read(length)
if len(data) < length:
raise EOFError

# Account for filler byte when length is odd.
if length % 2:
packet.buffer.seek(1, 1)

packet._subchannel += 1
if packet._subchannel >= len(packet.subchannels):
packet._subchannel = 0

return packet.Message(data, parent=packet, **csdw)

def __init__(self, *args, **kwargs):
packet.Packet.__init__(self, *args, **kwargs)

Expand All @@ -66,31 +99,20 @@ def __init__(self, *args, **kwargs):
'same')
self.subchannels = [{k: getattr(self, k) for k in keys}]

# Read CSDWs for subchannels if applicable.
# Read CSDWs for additional subchannels if applicable.
if not self.same:
for i in range(self.subchannel_count):
raw = self.buffer.read(4)
print('parse: ', raw)
# assert 0
self.subchannels.append(
self.csdw_format.unpack(self.buffer.read(4)))

def __next__(self):
subchannel = self._subchannel
if self.same:
csdw = self.subchannels[0]
else:
csdw = self.subchannels[subchannel]

# Find the sample size in bytes (16 bit aligned).
length = csdw['length'] or 64 # Length 0 = 64 bits
length = length // 8 + (length % 16 and 1 or 0)

data = self.buffer.read(length)
if len(data) < length:
raise StopIteration

# Account for filler byte when length is odd.
if length % 2:
self.packet.file.seek(1, 1)

self._subchannel += 1

return self.Message(data, parent=self, **csdw)
self.csdw_format.unpack(raw))

def _raw_body(self):
raw = bytes()
for channel in self.subchannels:
print(channel)
csdw = self.csdw_format.pack(channel)
raw += csdw
raw += b''.join([bytes(msg) for msg in list(self)])
return raw
4 changes: 3 additions & 1 deletion chapter10/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ def __init__(self, file=None, parent=None, **kwargs):
self.validate()

# Read channel specific data word (CSDW)
self.__dict__.update(self.csdw_format.unpack(self.buffer.read(4)))
csdw = self.csdw_format.unpack(self.buffer.read(4))
print(csdw)
self.__dict__.update(csdw)

def get_time(self):
"""Return a timestamp for this packet. Depends on parent C10 object."""
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/test_analog.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ def test_next(packet):
assert sample.data == b'\xfc\xfc'
if i >= 2:
break


def test_bytes(packet):
assert packet.buffer.getvalue() == bytes(packet)

0 comments on commit a079f9a

Please sign in to comment.