From 532f4ee35e39052d96fb479761fdfa491a73e4b8 Mon Sep 17 00:00:00 2001 From: vadim Date: Sat, 10 Oct 2020 14:58:24 +0300 Subject: [PATCH] Switched from brute-force escaping to COBS for serial protocol serialization --- pyuavcan/transport/serial/_frame.py | 96 +++++++++++---------- pyuavcan/transport/serial/_stream_parser.py | 37 ++++---- requirements.txt | 3 + 3 files changed, 71 insertions(+), 65 deletions(-) diff --git a/pyuavcan/transport/serial/_frame.py b/pyuavcan/transport/serial/_frame.py index 2ff445c06..4651410e3 100644 --- a/pyuavcan/transport/serial/_frame.py +++ b/pyuavcan/transport/serial/_frame.py @@ -10,6 +10,7 @@ import itertools import dataclasses import pyuavcan +from cobs import cobs _VERSION = 0 @@ -28,21 +29,21 @@ @dataclasses.dataclass(frozen=True) class SerialFrame(pyuavcan.transport.commons.high_overhead_transport.Frame): - NODE_ID_MASK = 4095 + NODE_ID_MASK = 4095 TRANSFER_ID_MASK = 2 ** 64 - 1 - INDEX_MASK = 2 ** 31 - 1 + INDEX_MASK = 2 ** 31 - 1 NODE_ID_RANGE = range(NODE_ID_MASK + 1) - FRAME_DELIMITER_BYTE = 0x9E - ESCAPE_PREFIX_BYTE = 0x8E + FRAME_DELIMITER_BYTE = 0x00 + # ESCAPE_PREFIX_BYTE = 0x8E NUM_OVERHEAD_BYTES_EXCEPT_DELIMITERS_AND_ESCAPING = _HEADER_SIZE + _CRC_SIZE_BYTES - source_node_id: typing.Optional[int] + source_node_id: typing.Optional[int] destination_node_id: typing.Optional[int] - data_specifier: pyuavcan.transport.DataSpecifier - data_type_hash: int + data_specifier: pyuavcan.transport.DataSpecifier + data_type_hash: int def __post_init__(self) -> None: if not isinstance(self.priority, pyuavcan.transport.Priority): @@ -105,16 +106,14 @@ def compile_into(self, out_buffer: bytearray) -> memoryview: payload_crc_bytes = pyuavcan.transport.commons.crc.CRC32C.new(self.payload).value_as_bytes - escapees = self.FRAME_DELIMITER_BYTE, self.ESCAPE_PREFIX_BYTE out_buffer[0] = self.FRAME_DELIMITER_BYTE next_byte_index = 1 - for nb in itertools.chain(header, self.payload, payload_crc_bytes): - if nb in escapees: - out_buffer[next_byte_index] = self.ESCAPE_PREFIX_BYTE - next_byte_index += 1 - nb ^= 0xFF - out_buffer[next_byte_index] = nb - next_byte_index += 1 + + packet_bytes = header + self.payload + payload_crc_bytes + encoded_image = cobs.encode(packet_bytes) + # place in the buffer and update next_byte_index: + out_buffer[next_byte_index:next_byte_index] = encoded_image + next_byte_index += len(encoded_image) out_buffer[next_byte_index] = self.FRAME_DELIMITER_BYTE next_byte_index += 1 @@ -122,6 +121,18 @@ def compile_into(self, out_buffer: bytearray) -> memoryview: assert (next_byte_index - 2) >= (len(header) + len(self.payload) + len(payload_crc_bytes)) return memoryview(out_buffer)[:next_byte_index] + @staticmethod + def parse_from_cobs_image(header_payload_crc_image: memoryview, + timestamp: pyuavcan.transport.Timestamp) -> typing.Optional[SerialFrame]: + """ + :returns: Frame or None if the image is invalid. + """ + try: + unescaped_image = cobs.decode(bytearray(header_payload_crc_image)) + except cobs.DecodeError: + return None + return SerialFrame.parse_from_unescaped_image(memoryview(unescaped_image), timestamp) + @staticmethod def parse_from_unescaped_image(header_payload_crc_image: memoryview, timestamp: pyuavcan.transport.Timestamp) -> typing.Optional[SerialFrame]: @@ -184,43 +195,39 @@ def _unittest_frame_compile_message() -> None: f = SerialFrame(timestamp=Timestamp.now(), priority=Priority.HIGH, source_node_id=SerialFrame.FRAME_DELIMITER_BYTE, - destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE, + destination_node_id=SerialFrame.FRAME_DELIMITER_BYTE, data_specifier=MessageDataSpecifier(12345), data_type_hash=0xdead_beef_bad_c0ffe, transfer_id=1234567890123456789, index=1234567, end_of_transfer=True, - payload=memoryview(b'abcd\x9Eef\x8E')) + payload=memoryview(b'abcd\x00ef\x00')) buffer = bytearray(0 for _ in range(1000)) mv = f.compile_into(buffer) assert mv[0] == SerialFrame.FRAME_DELIMITER_BYTE assert mv[-1] == SerialFrame.FRAME_DELIMITER_BYTE - segment = bytes(mv[1:-1]) - assert SerialFrame.FRAME_DELIMITER_BYTE not in segment + + segment_cobs = bytes(mv[1:-1]) + assert SerialFrame.FRAME_DELIMITER_BYTE not in segment_cobs + + segment = cobs.decode(segment_cobs) # Header validation assert segment[0] == _VERSION assert segment[1] == int(Priority.HIGH) - assert segment[2] == SerialFrame.ESCAPE_PREFIX_BYTE - assert (segment[3], segment[4]) == (SerialFrame.FRAME_DELIMITER_BYTE ^ 0xFF, 0) - assert segment[5] == SerialFrame.ESCAPE_PREFIX_BYTE - assert (segment[6], segment[7]) == (SerialFrame.ESCAPE_PREFIX_BYTE ^ 0xFF, 0) - assert segment[8:10] == 12345 .to_bytes(2, 'little') - assert segment[10:18] == 0xdead_beef_bad_c0ffe .to_bytes(8, 'little') - assert segment[18:26] == 1234567890123456789 .to_bytes(8, 'little') - assert segment[26:30] == (1234567 + 0x8000_0000).to_bytes(4, 'little') + assert (segment[2], segment[3]) == (SerialFrame.FRAME_DELIMITER_BYTE, 0) + assert (segment[4], segment[5]) == (SerialFrame.FRAME_DELIMITER_BYTE, 0) + assert segment[6:8] == 12345.to_bytes(2, 'little') + assert segment[8:16] == 0xdead_beef_bad_c0ffe.to_bytes(8, 'little') + assert segment[16:24] == 1234567890123456789.to_bytes(8, 'little') + assert segment[24:28] == (1234567 + 0x8000_0000).to_bytes(4, 'little') # Header CRC here # Payload validation - assert segment[34:38] == b'abcd' - assert segment[38] == SerialFrame.ESCAPE_PREFIX_BYTE - assert segment[39] == 0x9E ^ 0xFF - assert segment[40:42] == b'ef' - assert segment[42] == SerialFrame.ESCAPE_PREFIX_BYTE - assert segment[43] == 0x8E ^ 0xFF - assert segment[44:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes + assert segment[32:40] == b'abcd\x00ef\x00' + assert segment[40:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes def _unittest_frame_compile_service() -> None: @@ -241,23 +248,24 @@ def _unittest_frame_compile_service() -> None: mv = f.compile_into(buffer) assert mv[0] == mv[-1] == SerialFrame.FRAME_DELIMITER_BYTE - segment = bytes(mv[1:-1]) - assert SerialFrame.FRAME_DELIMITER_BYTE not in segment + segment_cobs = bytes(mv[1:-1]) + assert SerialFrame.FRAME_DELIMITER_BYTE not in segment_cobs + + segment = cobs.decode(segment_cobs) # Header validation assert segment[0] == _VERSION assert segment[1] == int(Priority.FAST) - assert segment[2] == SerialFrame.ESCAPE_PREFIX_BYTE - assert (segment[3], segment[4]) == (SerialFrame.FRAME_DELIMITER_BYTE ^ 0xFF, 0) - assert (segment[5], segment[6]) == (0xFF, 0xFF) - assert segment[7:9] == ((1 << 15) | (1 << 14) | 123) .to_bytes(2, 'little') - assert segment[9:17] == 0xdead_beef_bad_c0ffe .to_bytes(8, 'little') - assert segment[17:25] == 1234567890123456789 .to_bytes(8, 'little') - assert segment[25:29] == 1234567 .to_bytes(4, 'little') + assert (segment[2], segment[3]) == (SerialFrame.FRAME_DELIMITER_BYTE, 0) + assert (segment[4], segment[5]) == (0xFF, 0xFF) + assert segment[6:8] == ((1 << 15) | (1 << 14) | 123).to_bytes(2, 'little') + assert segment[8:16] == 0xdead_beef_bad_c0ffe.to_bytes(8, 'little') + assert segment[16:24] == 1234567890123456789.to_bytes(8, 'little') + assert segment[24:28] == 1234567.to_bytes(4, 'little') # Header CRC here # CRC validation - assert segment[33:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes + assert segment[32:] == pyuavcan.transport.commons.crc.CRC32C.new(f.payload).value_as_bytes def _unittest_frame_parse() -> None: diff --git a/pyuavcan/transport/serial/_stream_parser.py b/pyuavcan/transport/serial/_stream_parser.py index 61f5c97ba..cd94c7d3c 100644 --- a/pyuavcan/transport/serial/_stream_parser.py +++ b/pyuavcan/transport/serial/_stream_parser.py @@ -6,6 +6,7 @@ import typing import pyuavcan +import math from ._frame import SerialFrame @@ -33,7 +34,7 @@ def __init__(self, :param max_payload_size_bytes: Frames containing more that this many bytes of payload (after escaping and not including the header and CRC) will be considered invalid. """ - max_payload_size_bytes = int(max_payload_size_bytes) + max_payload_size_bytes = int(math.ceil(max_payload_size_bytes * 255.0 / 254.0)) # COBS stuffing overhead if not (callable(callback) and max_payload_size_bytes > 0): raise ValueError('Invalid parameters') @@ -64,13 +65,6 @@ def _process_byte(self, b: int, timestamp: pyuavcan.transport.Timestamp) -> None return # Unescaping is done only if we're inside a frame currently. - if self._is_inside_frame(): - if b == SerialFrame.ESCAPE_PREFIX_BYTE: - self._unescape_next = True - return - if self._unescape_next: - self._unescape_next = False - b ^= 0xFF # Appending to the buffer always, regardless of whether we're in a frame or not. # We may find out that the data does not belong to the protocol only much later; can't look ahead. @@ -83,15 +77,15 @@ def _finalize(self, known_invalid: bool) -> None: try: mv = memoryview(self._frame_buffer) parsed: typing.Optional[SerialFrame] = None - if (not known_invalid) and len(mv) <= self._max_frame_size_bytes: + if (not known_invalid) and len(mv) <= self._max_frame_size_bytes: # and assert self._current_frame_timestamp is not None - parsed = SerialFrame.parse_from_unescaped_image(mv, self._current_frame_timestamp) + parsed = SerialFrame.parse_from_cobs_image(mv, self._current_frame_timestamp) if parsed: self._callback(parsed) elif mv: self._callback(mv) else: - pass # Empty - nothing to report. + pass # Empty - nothing to report. finally: self._unescape_next = False self._current_frame_timestamp = None @@ -123,14 +117,14 @@ def proc(b: typing.Union[bytes, memoryview]) -> typing.Sequence[typing.Union[Ser assert [] == proc(b'') # The frame is well-delimited, but the content is invalid. Notice the unescaping in action. - assert [] == proc(b'\x9E\x8E\x61') - assert [memoryview(b'\x9E\x8E')] == proc(b'\x8E\x71\x9E') + # assert [] == proc(b'\x9E\x8E\x61') + # assert [memoryview(b'\x9E\x8E')] == proc(b'\x8E\x71\x9E') # Valid frame. f1 = SerialFrame(timestamp=ts, priority=Priority.HIGH, source_node_id=SerialFrame.FRAME_DELIMITER_BYTE, - destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE, + destination_node_id=SerialFrame.FRAME_DELIMITER_BYTE, data_specifier=MessageDataSpecifier(12345), data_type_hash=0xdead_beef_bad_c0ffe, transfer_id=1234567890123456789, @@ -146,23 +140,24 @@ def proc(b: typing.Union[bytes, memoryview]) -> typing.Sequence[typing.Union[Ser f2 = SerialFrame(timestamp=ts, priority=Priority.HIGH, source_node_id=SerialFrame.FRAME_DELIMITER_BYTE, - destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE, + destination_node_id=SerialFrame.FRAME_DELIMITER_BYTE, data_specifier=MessageDataSpecifier(12345), data_type_hash=0xdead_beef_bad_c0ffe, transfer_id=1234567890123456789, index=1234567, end_of_transfer=True, payload=f1.compile_into(bytearray(1000))) - assert len(f2.payload) == 46 + 2 # The extra two are escapes. + assert len(f2.payload) == 43 # Cobs escaping + result = proc(f2.compile_into(bytearray(1000))) assert len(result) == 1 - assert isinstance(result[0], memoryview) + assert isinstance(result[0], memoryview) # no message size enforcement yet # Create new instance with much larger frame size limit; feed both frames but let the first one be incomplete. - sp = StreamParser(outputs.append, 10**6) - assert [] == proc(f1.compile_into(bytearray(100))[:-2]) # First one is ended abruptly. - result = proc(f2.compile_into(bytearray(100))) # Then the second frame begins. - assert len(result) == 2 # Make sure the second one is retrieved correctly. + sp = StreamParser(outputs.append, 10 ** 6) + assert [] == proc(f1.compile_into(bytearray(100))[:-2]) # First one is ended abruptly. + result = proc(f2.compile_into(bytearray(100))) # Then the second frame begins. + assert len(result) == 2 # Make sure the second one is retrieved correctly. assert isinstance(result[0], memoryview) assert isinstance(result[1], SerialFrame) assert SerialFrame.__eq__(f2, result) diff --git a/requirements.txt b/requirements.txt index bd5f8a2c6..8668bb03b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,3 +31,6 @@ sphinx-computron >= 0.1.0 setuptools >= 46.0 wheel >= 0.34, < 2.0 twine ~= 3.2 + +# Serialization for serial +cobs ~= 1.1.4