diff --git a/rflx/pyrflx/typevalue.py b/rflx/pyrflx/typevalue.py index 9b264e390..7ae2c9139 100644 --- a/rflx/pyrflx/typevalue.py +++ b/rflx/pyrflx/typevalue.py @@ -92,7 +92,7 @@ def assign(self, value: Any, check: bool = True) -> None: raise NotImplementedError @abstractmethod - def parse(self, value: Union[Bitstring, bytes]) -> None: + def parse(self, value: Union[Bitstring, bytes], check: bool = True) -> None: raise NotImplementedError @property @@ -180,10 +180,10 @@ def assign(self, value: int, check: bool = True) -> None: raise ValueError(f"value {value} not in type range {self._first} .. {self._last}") self._value = value - def parse(self, value: Union[Bitstring, bytes]) -> None: + def parse(self, value: Union[Bitstring, bytes], check: bool = True) -> None: if isinstance(value, bytes): value = Bitstring.from_bytes(value) - self.assign(int(value)) + self.assign(int(value), check) @property def expr(self) -> Number: @@ -253,7 +253,7 @@ def assign(self, value: str, check: bool = True) -> None: self._type.literals[prefixed_value.name], ) - def parse(self, value: Union[Bitstring, bytes]) -> None: + def parse(self, value: Union[Bitstring, bytes], check: bool = True) -> None: if isinstance(value, bytes): value = Bitstring.from_bytes(value) value_as_number = Number(int(value)) @@ -345,14 +345,15 @@ def __init__(self, vtype: Opaque) -> None: self._all_refinements: Sequence[Refinement] = [] def assign(self, value: bytes, check: bool = True) -> None: - self.parse(value) + self.parse(value, check) - def parse(self, value: Union[Bitstring, bytes]) -> None: - self._check_length_of_assigned_value(value) + def parse(self, value: Union[Bitstring, bytes], check: bool = True) -> None: + if check: + self._check_length_of_assigned_value(value) if self._refinement_message is not None: nested_msg = MessageValue(self._refinement_message, self._all_refinements) try: - nested_msg.parse(value) + nested_msg.parse(value, check) except (IndexError, ValueError, KeyError) as e: raise ValueError( f"Error while parsing nested message " @@ -437,8 +438,9 @@ def assign(self, value: List[TypeValue], check: bool = True) -> None: self._value = value - def parse(self, value: Union[Bitstring, bytes]) -> None: - self._check_length_of_assigned_value(value) + def parse(self, value: Union[Bitstring, bytes], check: bool = True) -> None: + if check: + self._check_length_of_assigned_value(value) if isinstance(value, bytes): value = Bitstring.from_bytes(value) if self._is_message_array: @@ -447,7 +449,7 @@ def parse(self, value: Union[Bitstring, bytes]) -> None: nested_message = TypeValue.construct(self._element_type) assert isinstance(nested_message, MessageValue) try: - nested_message.parse(value) + nested_message.parse(value, check) except (IndexError, ValueError, KeyError) as e: raise ValueError( f"cannot parse nested messages in array of type " @@ -467,7 +469,7 @@ def parse(self, value: Union[Bitstring, bytes]) -> None: nested_value = TypeValue.construct( self._element_type, imported=self._element_type.package != self._type.package ) - nested_value.parse(Bitstring(value_str[:type_size_int])) + nested_value.parse(Bitstring(value_str[:type_size_int]), check) new_value.append(nested_value) value_str = value_str[type_size_int:] @@ -545,7 +547,6 @@ def __init__( initial = self._fields[INITIAL.name] initial.first = Number(0) initial.typeval.assign(bytes()) - self._last_field: str = self._next_field(INITIAL.name) self._simplified_mapping: Dict[Name, Expr] = dict.fromkeys( [initial.name_length, initial.name_last, initial.name_first], Number(0) ) @@ -622,7 +623,7 @@ def _get_length(self, fld: str) -> Optional[Number]: if ( self._fields[l.source.name].set and l.length != UNDEFINED - and self.__simplified(l.condition) == TRUE + and (self._skip_verification or self.__simplified(l.condition) == TRUE) ): length = self.__simplified(l.length) return length if isinstance(length, Number) else None @@ -658,7 +659,7 @@ def size(self) -> Number: def assign(self, value: bytes, check: bool = True) -> None: raise NotImplementedError - def parse(self, value: Union[Bitstring, bytes]) -> None: + def parse(self, value: Union[Bitstring, bytes], check: bool = True) -> None: if isinstance(value, bytes): value = Bitstring.from_bytes(value) current_field_name = self._next_field(INITIAL.name) @@ -785,7 +786,7 @@ def check_outgoing_condition_satisfied() -> None: set_refinement(field, field_name) try: if isinstance(value, Bitstring): - field.typeval.parse(value) + field.typeval.parse(value, not self._skip_verification) elif isinstance(value, field.typeval.accepted_type): field.typeval.assign(value, not self._skip_verification) else: @@ -813,6 +814,9 @@ def check_outgoing_condition_satisfied() -> None: def _preset_fields(self, fld: str) -> None: nxt = self._next_field(fld) fields: List[str] = [] + if self._skip_verification and nxt == FINAL.name: + self._fields[fld].next = nxt + return while nxt and nxt != FINAL.name: if self._skip_verification: self._fields[nxt].prev = fld @@ -831,6 +835,8 @@ def _preset_fields(self, fld: str) -> None: else length is not None ): fields.append(nxt) + if self._skip_verification and nxt != FINAL.name: + self._fields[fld].next = nxt if length is None: break @@ -845,7 +851,6 @@ def _preset_fields(self, fld: str) -> None: break self._last_field = nxt if self._skip_verification: - self._fields[fld].next = nxt break nxt = self._next_field(nxt) try: diff --git a/tests/test_pyrflx.py b/tests/test_pyrflx.py index 0ec399987..4f1627fa2 100644 --- a/tests/test_pyrflx.py +++ b/tests/test_pyrflx.py @@ -1620,3 +1620,33 @@ def test_no_verification_ethernet(frame: MessageValue) -> None: frame_unv.set("Payload", payload) assert frame_unv.valid_message assert frame.bytestring == frame_unv.bytestring + + +def test_no_verification_array_nested_messages( + array_message_package: Package, array_message: MessageValue +) -> None: + array_message_one = array_message_package["Foo"] + array_message_one.set("Byte", 5) + array_message_two = array_message_package["Foo"] + array_message_two.set("Byte", 6) + foos: List[TypeValue] = [array_message_one, array_message_two] + array_message.set("Length", 2) + array_message.set("Bar", foos) + assert array_message.valid_message + + pyrflx = PyRFLX([f"{TESTDIR}/array_message.rflx"], skip_verification=True) + array_message_package_unv = pyrflx["Array_Message"] + array_message_unv = array_message_package_unv["Message"] + array_message_one_unv = array_message_package_unv["Foo"] + array_message_one_unv.set("Byte", 5) + array_message_two_unv = array_message_package_unv["Foo"] + array_message_two_unv.set("Byte", 6) + foos_unv: List[TypeValue] = [array_message_one_unv, array_message_two_unv] + array_message_unv.set("Length", 2) + array_message_unv.set("Bar", foos_unv) + assert array_message_unv.valid_message + assert array_message.bytestring == array_message_unv.bytestring + + array_message_unv = array_message_package_unv["Message"] + array_message_unv.parse(b"\x02\x05\x06") + assert array_message_unv.bytestring == b"\x02\x05\x06"