Skip to content

Commit

Permalink
Check checksum when parsing
Browse files Browse the repository at this point in the history
ref #240
  • Loading branch information
jklmnn committed Sep 1, 2020
1 parent 136ad04 commit 851d1f5
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 21 deletions.
51 changes: 32 additions & 19 deletions rflx/pyrflx/typevalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,10 @@ def check_outgoing_condition_satisfied() -> None:
if checksum_calculation:
self._preset_fields(field_name)
for checksum in self._checksums.values():
if self._is_checksum_settable(checksum):
self._calculate_checksum(checksum)
if not self._fields[checksum.field_name].set and self._is_checksum_settable(
checksum
):
self._set_checksum(checksum)

def _preset_fields(self, fld: str) -> None:
assert not self._skip_verification
Expand Down Expand Up @@ -914,7 +916,7 @@ def valid_path(value_range: ValueRange) -> bool:
upper_field_name = "Final"
while field != upper_field_name:
field = self._next_field(field)
if field == "Final":
if field == "Final" or field in self._checksums:
continue
if field == "" or not self._fields[field].set:
break
Expand All @@ -929,8 +931,8 @@ def valid_path(value_range: ValueRange) -> bool:
isinstance(expr_tuple.evaluated_expression, ValueRange)
and isinstance(expr_tuple.expression, ValueRange)
and (
not isinstance(expr_tuple.evaluated_expression.lower, Number)
or not isinstance(expr_tuple.evaluated_expression.upper, Number)
not isinstance(expr_tuple.evaluated_expression.lower.simplified(), Number)
or not isinstance(expr_tuple.evaluated_expression.upper.simplified(), Number)
or not valid_path(expr_tuple.expression)
)
):
Expand All @@ -947,7 +949,14 @@ def valid_path(value_range: ValueRange) -> bool:
return False
return True

def _calculate_checksum(self, checksum: "MessageValue.Checksum") -> None:
def _set_checksum(self, checksum: "MessageValue.Checksum") -> None:
self._fields[checksum.field_name].typeval.assign(0)
checksum_value = self._calculate_checksum(checksum)
self.set(checksum.field_name, checksum_value, False)

def _calculate_checksum(
self, checksum: "MessageValue.Checksum"
) -> Union[bytes, int, str, Sequence[TypeValue], Bitstring]:
if not checksum.function:
raise AttributeError(
f"cannot calculate checksum for {checksum.field_name}: "
Expand Down Expand Up @@ -975,12 +984,7 @@ def _calculate_checksum(self, checksum: "MessageValue.Checksum") -> None:
else:
assert isinstance(expr_tuple.evaluated_expression, Number)
arguments[str(expr_tuple.expression)] = expr_tuple.evaluated_expression.value

self.set(
checksum.field_name,
checksum.function(self.bytestring, **arguments),
checksum_calculation=False,
)
return checksum.function(self.bytestring, **arguments)

def get(self, field_name: str) -> Union["MessageValue", Sequence[TypeValue], int, str, bytes]:
if field_name not in self.valid_fields:
Expand Down Expand Up @@ -1068,7 +1072,15 @@ def required_fields(self) -> List[str]:

@property
def valid_message(self) -> bool:
return bool(self.valid_fields) and self._next_field(self.valid_fields[-1]) == FINAL.name
return (
bool(self.valid_fields)
and self._next_field(self.valid_fields[-1]) == FINAL.name
and all(
self._is_checksum_settable(checksum)
and self._calculate_checksum(checksum) == self.get(checksum.field_name)
for checksum in self._checksums.values()
)
)

def __update_simplified_mapping(self, field: Optional[Field] = None) -> None:
if field:
Expand All @@ -1081,13 +1093,14 @@ def __update_simplified_mapping(self, field: Optional[Field] = None) -> None:

self._simplified_mapping = {}
for v in self._fields.values():
if not v.set:
continue
if isinstance(v.typeval, ScalarValue):
if isinstance(v.typeval, ScalarValue) and v.set:
self._simplified_mapping[v.name_variable] = v.typeval.expr
self._simplified_mapping[v.name_length] = v.typeval.size
self._simplified_mapping[v.name_first] = v.first
self._simplified_mapping[v.name_last] = v.last
if isinstance(v.typeval, ScalarValue) or v.set:
self._simplified_mapping[v.name_length] = v.typeval.size
if isinstance(v.first, Number):
self._simplified_mapping[v.name_first] = v.first
if isinstance(v.last, Number):
self._simplified_mapping[v.name_last] = v.last

# ISSUE: Componolit/RecordFlux#240
self._simplified_mapping.update({ValidChecksum(f): TRUE for f in self._checksums})
Expand Down
44 changes: 42 additions & 2 deletions tests/test_pyrflx.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,6 @@ def test_checksum_field_not_defined(icmp_checksum: MessageValue) -> None:
def test_checksum_function_not_set(icmp_checksum: MessageValue) -> None:
icmp_checksum.set("Tag", "Echo_Request")
icmp_checksum.set("Code_Zero", 0)
icmp_checksum.set("Checksum", 1234)
icmp_checksum.set("Identifier", 5)
icmp_checksum.set("Sequence_Number", 1)
with pytest.raises(
Expand All @@ -1543,7 +1542,7 @@ def test_checksum_function_not_set(icmp_checksum: MessageValue) -> None:
icmp_checksum.set("Data", b"\x00")


def test_checksum_icmp(icmp_checksum: MessageValue) -> None:
def test_checksum_manual_icmp(icmp_checksum: MessageValue) -> None:
test_data = (
b"\x47\xb4\x67\x5e\x00\x00\x00\x00"
b"\x4a\xfc\x0d\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17"
Expand All @@ -1557,10 +1556,51 @@ def test_checksum_icmp(icmp_checksum: MessageValue) -> None:
icmp_checksum.set("Identifier", 5)
icmp_checksum.set("Sequence_Number", 1)
icmp_checksum.set("Data", test_data)
assert icmp_checksum.get("Checksum") == 1234
assert icmp_checksum.bytestring == b"\x08\x00\x04\xd2\x00\x05\x00\x01" + test_data


def test_checksum_auto_icmp(icmp_checksum: MessageValue) -> None:
test_data = (
b"\x47\xb4\x67\x5e\x00\x00\x00\x00"
b"\x4a\xfc\x0d\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17"
b"\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20\x21\x22\x23\x24\x25\x26\x27"
b"\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f\x30\x31\x32\x33\x34\x35\x36\x37"
)
icmp_checksum.set_checksum_function({"Checksum": icmp_checksum_function})
icmp_checksum.set("Tag", "Echo_Request")
icmp_checksum.set("Code_Zero", 0)
icmp_checksum.set("Identifier", 5)
icmp_checksum.set("Sequence_Number", 1)
icmp_checksum.set("Data", test_data)
assert icmp_checksum.get("Checksum") == 12824
assert icmp_checksum.bytestring == b"\x08\x00\x32\x18\x00\x05\x00\x01" + test_data


def test_checksum_parse_icmp(icmp_checksum: MessageValue) -> None:
test_data = (
b"\x08\x00\x32\x18\x00\x05\x00\x01\x47\xb4\x67\x5e\x00\x00\x00\x00"
b"\x4a\xfc\x0d\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17"
b"\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20\x21\x22\x23\x24\x25\x26\x27"
b"\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f\x30\x31\x32\x33\x34\x35\x36\x37"
)
icmp_checksum.set_checksum_function({"Checksum": icmp_checksum_function})
icmp_checksum.parse(test_data)
assert icmp_checksum.valid_message


def test_invalid_checksum_parse_icmp(icmp_checksum: MessageValue) -> None:
test_data = (
b"\x08\x00\x35\x19\x00\x05\x00\x01\x47\xb4\x67\x5e\x00\x00\x00\x00"
b"\x4a\xfc\x0d\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17"
b"\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20\x21\x22\x23\x24\x25\x26\x27"
b"\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f\x30\x31\x32\x33\x34\x35\x36\x37"
)
icmp_checksum.set_checksum_function({"Checksum": icmp_checksum_function})
icmp_checksum.parse(test_data)
assert not icmp_checksum.valid_message


def test_checksum_is_checksum_settable(tlv_checksum_type: Message) -> None:
# pylint: disable=protected-access
tlv_msg = MessageValue(tlv_checksum_type)
Expand Down

0 comments on commit 851d1f5

Please sign in to comment.