Skip to content

Commit

Permalink
Switched from brute-force escaping to COBS for serial protocol serial…
Browse files Browse the repository at this point in the history
…ization
  • Loading branch information
anonymouze committed Oct 10, 2020
1 parent 89fec1e commit 532f4ee
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 65 deletions.
96 changes: 52 additions & 44 deletions pyuavcan/transport/serial/_frame.py
Expand Up @@ -10,6 +10,7 @@
import itertools
import dataclasses
import pyuavcan
from cobs import cobs

_VERSION = 0

Expand All @@ -28,21 +29,21 @@

@dataclasses.dataclass(frozen=True)
class SerialFrame(pyuavcan.transport.commons.high_overhead_transport.Frame):
NODE_ID_MASK = 4095
NODE_ID_MASK = 4095
TRANSFER_ID_MASK = 2 ** 64 - 1
INDEX_MASK = 2 ** 31 - 1
INDEX_MASK = 2 ** 31 - 1

NODE_ID_RANGE = range(NODE_ID_MASK + 1)

FRAME_DELIMITER_BYTE = 0x9E
ESCAPE_PREFIX_BYTE = 0x8E
FRAME_DELIMITER_BYTE = 0x00
# ESCAPE_PREFIX_BYTE = 0x8E

NUM_OVERHEAD_BYTES_EXCEPT_DELIMITERS_AND_ESCAPING = _HEADER_SIZE + _CRC_SIZE_BYTES

source_node_id: typing.Optional[int]
source_node_id: typing.Optional[int]
destination_node_id: typing.Optional[int]
data_specifier: pyuavcan.transport.DataSpecifier
data_type_hash: int
data_specifier: pyuavcan.transport.DataSpecifier
data_type_hash: int

def __post_init__(self) -> None:
if not isinstance(self.priority, pyuavcan.transport.Priority):
Expand Down Expand Up @@ -105,23 +106,33 @@ def compile_into(self, out_buffer: bytearray) -> memoryview:

payload_crc_bytes = pyuavcan.transport.commons.crc.CRC32C.new(self.payload).value_as_bytes

escapees = self.FRAME_DELIMITER_BYTE, self.ESCAPE_PREFIX_BYTE
out_buffer[0] = self.FRAME_DELIMITER_BYTE
next_byte_index = 1
for nb in itertools.chain(header, self.payload, payload_crc_bytes):
if nb in escapees:
out_buffer[next_byte_index] = self.ESCAPE_PREFIX_BYTE
next_byte_index += 1
nb ^= 0xFF
out_buffer[next_byte_index] = nb
next_byte_index += 1

packet_bytes = header + self.payload + payload_crc_bytes
encoded_image = cobs.encode(packet_bytes)
# place in the buffer and update next_byte_index:
out_buffer[next_byte_index:next_byte_index] = encoded_image
next_byte_index += len(encoded_image)

out_buffer[next_byte_index] = self.FRAME_DELIMITER_BYTE
next_byte_index += 1

assert (next_byte_index - 2) >= (len(header) + len(self.payload) + len(payload_crc_bytes))
return memoryview(out_buffer)[:next_byte_index]

@staticmethod
def parse_from_cobs_image(header_payload_crc_image: memoryview,
timestamp: pyuavcan.transport.Timestamp) -> typing.Optional[SerialFrame]:
"""
:returns: Frame or None if the image is invalid.
"""
try:
unescaped_image = cobs.decode(bytearray(header_payload_crc_image))
except cobs.DecodeError:
return None
return SerialFrame.parse_from_unescaped_image(memoryview(unescaped_image), timestamp)

@staticmethod
def parse_from_unescaped_image(header_payload_crc_image: memoryview,
timestamp: pyuavcan.transport.Timestamp) -> typing.Optional[SerialFrame]:
Expand Down Expand Up @@ -184,43 +195,39 @@ def _unittest_frame_compile_message() -> None:
f = SerialFrame(timestamp=Timestamp.now(),
priority=Priority.HIGH,
source_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE,
destination_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=True,
payload=memoryview(b'abcd\x9Eef\x8E'))
payload=memoryview(b'abcd\x00ef\x00'))

buffer = bytearray(0 for _ in range(1000))
mv = f.compile_into(buffer)

assert mv[0] == SerialFrame.FRAME_DELIMITER_BYTE
assert mv[-1] == SerialFrame.FRAME_DELIMITER_BYTE
segment = bytes(mv[1:-1])
assert SerialFrame.FRAME_DELIMITER_BYTE not in segment

segment_cobs = bytes(mv[1:-1])
assert SerialFrame.FRAME_DELIMITER_BYTE not in segment_cobs

segment = cobs.decode(segment_cobs)

# Header validation
assert segment[0] == _VERSION
assert segment[1] == int(Priority.HIGH)
assert segment[2] == SerialFrame.ESCAPE_PREFIX_BYTE
assert (segment[3], segment[4]) == (SerialFrame.FRAME_DELIMITER_BYTE ^ 0xFF, 0)
assert segment[5] == SerialFrame.ESCAPE_PREFIX_BYTE
assert (segment[6], segment[7]) == (SerialFrame.ESCAPE_PREFIX_BYTE ^ 0xFF, 0)
assert segment[8:10] == 12345 .to_bytes(2, 'little')
assert segment[10:18] == 0xdead_beef_bad_c0ffe .to_bytes(8, 'little')
assert segment[18:26] == 1234567890123456789 .to_bytes(8, 'little')
assert segment[26:30] == (1234567 + 0x8000_0000).to_bytes(4, 'little')
assert (segment[2], segment[3]) == (SerialFrame.FRAME_DELIMITER_BYTE, 0)
assert (segment[4], segment[5]) == (SerialFrame.FRAME_DELIMITER_BYTE, 0)
assert segment[6:8] == 12345.to_bytes(2, 'little')
assert segment[8:16] == 0xdead_beef_bad_c0ffe.to_bytes(8, 'little')
assert segment[16:24] == 1234567890123456789.to_bytes(8, 'little')
assert segment[24:28] == (1234567 + 0x8000_0000).to_bytes(4, 'little')
# Header CRC here

# Payload validation
assert segment[34:38] == b'abcd'
assert segment[38] == SerialFrame.ESCAPE_PREFIX_BYTE
assert segment[39] == 0x9E ^ 0xFF
assert segment[40:42] == b'ef'
assert segment[42] == SerialFrame.ESCAPE_PREFIX_BYTE
assert segment[43] == 0x8E ^ 0xFF
assert segment[44:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes
assert segment[32:40] == b'abcd\x00ef\x00'
assert segment[40:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes


def _unittest_frame_compile_service() -> None:
Expand All @@ -241,23 +248,24 @@ def _unittest_frame_compile_service() -> None:
mv = f.compile_into(buffer)

assert mv[0] == mv[-1] == SerialFrame.FRAME_DELIMITER_BYTE
segment = bytes(mv[1:-1])
assert SerialFrame.FRAME_DELIMITER_BYTE not in segment
segment_cobs = bytes(mv[1:-1])
assert SerialFrame.FRAME_DELIMITER_BYTE not in segment_cobs

segment = cobs.decode(segment_cobs)

# Header validation
assert segment[0] == _VERSION
assert segment[1] == int(Priority.FAST)
assert segment[2] == SerialFrame.ESCAPE_PREFIX_BYTE
assert (segment[3], segment[4]) == (SerialFrame.FRAME_DELIMITER_BYTE ^ 0xFF, 0)
assert (segment[5], segment[6]) == (0xFF, 0xFF)
assert segment[7:9] == ((1 << 15) | (1 << 14) | 123) .to_bytes(2, 'little')
assert segment[9:17] == 0xdead_beef_bad_c0ffe .to_bytes(8, 'little')
assert segment[17:25] == 1234567890123456789 .to_bytes(8, 'little')
assert segment[25:29] == 1234567 .to_bytes(4, 'little')
assert (segment[2], segment[3]) == (SerialFrame.FRAME_DELIMITER_BYTE, 0)
assert (segment[4], segment[5]) == (0xFF, 0xFF)
assert segment[6:8] == ((1 << 15) | (1 << 14) | 123).to_bytes(2, 'little')
assert segment[8:16] == 0xdead_beef_bad_c0ffe.to_bytes(8, 'little')
assert segment[16:24] == 1234567890123456789.to_bytes(8, 'little')
assert segment[24:28] == 1234567.to_bytes(4, 'little')
# Header CRC here

# CRC validation
assert segment[33:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes
assert segment[32:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes


def _unittest_frame_parse() -> None:
Expand Down
37 changes: 16 additions & 21 deletions pyuavcan/transport/serial/_stream_parser.py
Expand Up @@ -6,6 +6,7 @@

import typing
import pyuavcan
import math
from ._frame import SerialFrame


Expand Down Expand Up @@ -33,7 +34,7 @@ def __init__(self,
:param max_payload_size_bytes: Frames containing more that this many bytes of payload (after escaping and
not including the header and CRC) will be considered invalid.
"""
max_payload_size_bytes = int(max_payload_size_bytes)
max_payload_size_bytes = int(math.ceil(max_payload_size_bytes * 255.0 / 254.0)) # COBS stuffing overhead
if not (callable(callback) and max_payload_size_bytes > 0):
raise ValueError('Invalid parameters')

Expand Down Expand Up @@ -64,13 +65,6 @@ def _process_byte(self, b: int, timestamp: pyuavcan.transport.Timestamp) -> None
return

# Unescaping is done only if we're inside a frame currently.
if self._is_inside_frame():
if b == SerialFrame.ESCAPE_PREFIX_BYTE:
self._unescape_next = True
return
if self._unescape_next:
self._unescape_next = False
b ^= 0xFF

# Appending to the buffer always, regardless of whether we're in a frame or not.
# We may find out that the data does not belong to the protocol only much later; can't look ahead.
Expand All @@ -83,15 +77,15 @@ def _finalize(self, known_invalid: bool) -> None:
try:
mv = memoryview(self._frame_buffer)
parsed: typing.Optional[SerialFrame] = None
if (not known_invalid) and len(mv) <= self._max_frame_size_bytes:
if (not known_invalid) and len(mv) <= self._max_frame_size_bytes: # and
assert self._current_frame_timestamp is not None
parsed = SerialFrame.parse_from_unescaped_image(mv, self._current_frame_timestamp)
parsed = SerialFrame.parse_from_cobs_image(mv, self._current_frame_timestamp)
if parsed:
self._callback(parsed)
elif mv:
self._callback(mv)
else:
pass # Empty - nothing to report.
pass # Empty - nothing to report.
finally:
self._unescape_next = False
self._current_frame_timestamp = None
Expand Down Expand Up @@ -123,14 +117,14 @@ def proc(b: typing.Union[bytes, memoryview]) -> typing.Sequence[typing.Union[Ser
assert [] == proc(b'')

# The frame is well-delimited, but the content is invalid. Notice the unescaping in action.
assert [] == proc(b'\x9E\x8E\x61')
assert [memoryview(b'\x9E\x8E')] == proc(b'\x8E\x71\x9E')
# assert [] == proc(b'\x9E\x8E\x61')
# assert [memoryview(b'\x9E\x8E')] == proc(b'\x8E\x71\x9E')

# Valid frame.
f1 = SerialFrame(timestamp=ts,
priority=Priority.HIGH,
source_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE,
destination_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
Expand All @@ -146,23 +140,24 @@ def proc(b: typing.Union[bytes, memoryview]) -> typing.Sequence[typing.Union[Ser
f2 = SerialFrame(timestamp=ts,
priority=Priority.HIGH,
source_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE,
destination_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=True,
payload=f1.compile_into(bytearray(1000)))
assert len(f2.payload) == 46 + 2 # The extra two are escapes.
assert len(f2.payload) == 43 # Cobs escaping

result = proc(f2.compile_into(bytearray(1000)))
assert len(result) == 1
assert isinstance(result[0], memoryview)
assert isinstance(result[0], memoryview) # no message size enforcement yet

# Create new instance with much larger frame size limit; feed both frames but let the first one be incomplete.
sp = StreamParser(outputs.append, 10**6)
assert [] == proc(f1.compile_into(bytearray(100))[:-2]) # First one is ended abruptly.
result = proc(f2.compile_into(bytearray(100))) # Then the second frame begins.
assert len(result) == 2 # Make sure the second one is retrieved correctly.
sp = StreamParser(outputs.append, 10 ** 6)
assert [] == proc(f1.compile_into(bytearray(100))[:-2]) # First one is ended abruptly.
result = proc(f2.compile_into(bytearray(100))) # Then the second frame begins.
assert len(result) == 2 # Make sure the second one is retrieved correctly.
assert isinstance(result[0], memoryview)
assert isinstance(result[1], SerialFrame)
assert SerialFrame.__eq__(f2, result)
3 changes: 3 additions & 0 deletions requirements.txt
Expand Up @@ -31,3 +31,6 @@ sphinx-computron >= 0.1.0
setuptools >= 46.0
wheel >= 0.34, < 2.0
twine ~= 3.2

# Serialization for serial
cobs ~= 1.1.4

0 comments on commit 532f4ee

Please sign in to comment.