From 7b0c0c12fce839e0b1243b53ef65c6863ba2f40b Mon Sep 17 00:00:00 2001 From: Nex Sabre Date: Wed, 26 Oct 2022 21:28:02 +0200 Subject: [PATCH 1/6] Code improvements --- packet_helper_core/checksum_status.py | 16 +-- packet_helper_core/core.py | 2 +- packet_helper_core/models/__init__.py | 0 packet_helper_core/models/scapy_data.py | 21 ++++ packet_helper_core/packet_data.py | 11 +-- packet_helper_core/packet_data_scapy.py | 41 ++++---- packet_helper_core/utils/conversion.py | 4 +- packet_helper_core/utils/scapy_reader.py | 13 ++- packet_helper_core/utils/utils.py | 4 +- tests/test_core.py | 118 +++++++++++++---------- tests/test_packet_data.py | 2 +- tests/utils/example_packets.py | 2 +- 12 files changed, 136 insertions(+), 98 deletions(-) create mode 100644 packet_helper_core/models/__init__.py create mode 100644 packet_helper_core/models/scapy_data.py diff --git a/packet_helper_core/checksum_status.py b/packet_helper_core/checksum_status.py index b039e71..19ee22d 100644 --- a/packet_helper_core/checksum_status.py +++ b/packet_helper_core/checksum_status.py @@ -1,16 +1,18 @@ -from dataclasses import dataclass +from pydantic import BaseModel -@dataclass -class ChecksumStatus: +class ChecksumStatus(BaseModel): chksum: str = "" chksum_calculated: str = "" status: bool | None = None - def __call__(self, *args, **kwargs): + def verify(self) -> None: def clean_chksum(element: str): return element.replace("0x", "") - if self.chksum == "" or self.chksum_calculated == "": - return - self.status = clean_chksum(self.chksum) == clean_chksum(self.chksum_calculated) + if self.chksum and self.chksum_calculated: + self.status = clean_chksum(self.chksum) == clean_chksum( + self.chksum_calculated + ) + else: + self.status = None diff --git a/packet_helper_core/core.py b/packet_helper_core/core.py index f1a85fc..17f5fc9 100644 --- a/packet_helper_core/core.py +++ b/packet_helper_core/core.py @@ -16,5 +16,5 @@ class Core: def __post_init__(self): self.hex_string = self.hex_string.replace(" ", "") - self.tshark_data = PacketData(str(decode_hex(self.hex_string))) + self.tshark_data = PacketData(raw=str(decode_hex(self.hex_string))) self.scapy_data = PacketDataScapy(self.hex_string, self.tshark_data) diff --git a/packet_helper_core/models/__init__.py b/packet_helper_core/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packet_helper_core/models/scapy_data.py b/packet_helper_core/models/scapy_data.py new file mode 100644 index 0000000..f156660 --- /dev/null +++ b/packet_helper_core/models/scapy_data.py @@ -0,0 +1,21 @@ +from typing import Literal + +from pydantic import BaseModel + +from packet_helper_core.checksum_status import ChecksumStatus + + +class ScapyData(BaseModel): + name: str + bytes_record: str # bytes + hex_record: str # hex + hex_record_full: str # hex_one + length: int + length_unit: Literal[ + "B", + ] # length_unit + representation: str # repr + representation_full: str # repr_full + tshark_name: str = "" + tshark_raw_summary: str = "" + chksum_status: ChecksumStatus | None = None diff --git a/packet_helper_core/packet_data.py b/packet_helper_core/packet_data.py index 49a8b0d..9f48794 100644 --- a/packet_helper_core/packet_data.py +++ b/packet_helper_core/packet_data.py @@ -1,5 +1,4 @@ -from dataclasses import dataclass, asdict, field -from typing import Any +from dataclasses import dataclass, field from packet_helper_core.checksum_status import ChecksumStatus @@ -7,7 +6,7 @@ @dataclass class PacketData: raw: str - chksum_list: list[Any] = field(default_factory=list) + chksum_list: list[ChecksumStatus] = field(default_factory=list) _data_layer: list[str] = field(default_factory=list) @@ -89,7 +88,7 @@ def compose_body_list(self) -> list[list[str]]: return temp_body_dict def chksum_verification(self, element) -> None: - chksum_status = ChecksumStatus() + chksum_status: ChecksumStatus = ChecksumStatus() for x in element: x = x.lower() if "header checksum" in x and "incorrect" in x: @@ -102,8 +101,8 @@ def chksum_verification(self, element) -> None: if "calculated checksum" in x: chksum_status.chksum_calculated = x.split(":")[1].split()[0] else: - chksum_status() - self.chksum_list.append(asdict(chksum_status)) + chksum_status.verify() + self.chksum_list.append(chksum_status) def update_header(self): """Update header with data layer which is 'hidden' in the tshark output""" diff --git a/packet_helper_core/packet_data_scapy.py b/packet_helper_core/packet_data_scapy.py index 41ad3d1..47cd72d 100644 --- a/packet_helper_core/packet_data_scapy.py +++ b/packet_helper_core/packet_data_scapy.py @@ -2,6 +2,7 @@ from scapy_helper import get_hex +from packet_helper_core.models.scapy_data import ScapyData from packet_helper_core.packet_data import PacketData from packet_helper_core.utils.scapy_reader import scapy_reader @@ -21,30 +22,32 @@ def __post_init__(self): self.structure = self.__make_structure() def __make_structure(self): - temp_structure = [] - - for e, h in enumerate(self.headers_scapy): - one_frame = self.headers_scapy[e].copy() - one_frame.remove_payload() - _dict = { - "name": h.name, - "bytes": str(h), - "hex": get_hex(h), - "hex_one": get_hex(one_frame), - "length": len(h), - "length_unit": "B", - "repr": f"{repr(h).split(' |')[0]}>", - "repr_full": repr(h), - } + temp_structure: list[ScapyData] = [] + + for index, header in enumerate(self.headers_scapy): + scapy_header = self.headers_scapy[index].copy() + scapy_header.remove_payload() + scapy_data_dict: ScapyData = ScapyData( + **{ + "name": header.name, + "bytes_record": str(header), + "hex_record": get_hex(header), + "hex_record_full": get_hex(scapy_header), + "length": len(header), + "length_unit": "B", + "representation": f"{repr(header).split(' |')[0]}>", + "representation_full": repr(header), + } + ) # RAW elements on the end are added to the last package as data! try: - _dict["tshark_name"] = self.packet_data.body2[e][0] - _dict["tshark_raw_summary"] = self.packet_data.body2[e][1:] + scapy_data_dict.tshark_name = self.packet_data.body2[index][0] + scapy_data_dict.tshark_raw_summary = self.packet_data.body2[index][1:] except IndexError: break - _dict["chksum_status"] = self.packet_data.chksum_list[e] + scapy_data_dict.chksum_status = self.packet_data.chksum_list[index] - temp_structure.append(_dict) + temp_structure.append(scapy_data_dict) return temp_structure diff --git a/packet_helper_core/utils/conversion.py b/packet_helper_core/utils/conversion.py index fffaafb..1bd2d6c 100644 --- a/packet_helper_core/utils/conversion.py +++ b/packet_helper_core/utils/conversion.py @@ -1,7 +1,9 @@ import importlib +from scapy.base_classes import BasePacket -def from_sh_list(packet_list): + +def from_sh_list(packet_list) -> BasePacket: imported_all = importlib.import_module("scapy.all") def remove_none(): diff --git a/packet_helper_core/utils/scapy_reader.py b/packet_helper_core/utils/scapy_reader.py index 5f9e3ce..314df1b 100644 --- a/packet_helper_core/utils/scapy_reader.py +++ b/packet_helper_core/utils/scapy_reader.py @@ -2,26 +2,25 @@ import logging import os from time import time -from typing import List from scapy.all import wrpcap, rdpcap from scapy.packet import Packet -def scapy_reader(hex_str: str) -> List[Packet]: - hex_str = binascii.unhexlify(hex_str) - if not isinstance(hex_str, bytes): +def scapy_reader(hex_str: str) -> list[Packet]: + bytes_from_hex = binascii.unhexlify(hex_str) + if not isinstance(bytes_from_hex, bytes): raise Exception("ERR:: hex_str must be in bytes!") temp_filename = f"pcap_{time()}" - wrpcap(temp_filename, hex_str) + wrpcap(temp_filename, bytes_from_hex) # type: ignore pcap_object = rdpcap(temp_filename) # try to clean after all try: os.remove(temp_filename) - except Exception: - logging.error(f"Cannot remove {temp_filename}") + except OSError as os_err: + logging.error(f"Cannot remove {temp_filename}\n{os_err}") converted_packets = [] current = pcap_object[0] diff --git a/packet_helper_core/utils/utils.py b/packet_helper_core/utils/utils.py index 47bd182..62694c8 100644 --- a/packet_helper_core/utils/utils.py +++ b/packet_helper_core/utils/utils.py @@ -3,7 +3,7 @@ from scapy_helper import get_hex -def hex_str_operation(h_string, with_new_line: bool = False): +def hex_str_operation(h_string, with_new_line: bool = False) -> str: z = "" tmp = [] for e, x in enumerate(h_string.replace(" ", "")): @@ -22,7 +22,7 @@ def hex_str_operation(h_string, with_new_line: bool = False): return " ".join(tmp) -def decode_hex(hex_str: str, use_json: bool = False) -> Packet: +def decode_hex(hex_str: str) -> Packet: frame_bytes: bytes = bytes.fromhex(hex_str) _custom_params = [ "-o", diff --git a/tests/test_core.py b/tests/test_core.py index ec7ff6c..6fb6a1b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,58 +1,70 @@ +import pytest + from packet_helper_core import PacketData, PacketDataScapy +from packet_helper_core.checksum_status import ChecksumStatus from packet_helper_core.core import Core -from scapy.all import IP, TCP, Ether # type: ignore +from scapy.all import IP, TCP, Ether # noqa from scapy_helper import get_hex -class TestCore: - simple_ether_ip_tcp_hex_string = get_hex(Ether() / IP() / TCP()) - - def test_core_post_init(self): - core_results = Core(TestCore.simple_ether_ip_tcp_hex_string) - - assert isinstance(core_results.hex_string, str), "Should be String" - assert isinstance( - core_results.scapy_data, PacketDataScapy - ), "Should be PacketDataScapy" - assert isinstance(core_results.tshark_data, PacketData), "Should be PacketData" - - assert core_results.scapy_data.header == [ - "ETH", - "IP", - "TCP", - ], "Should be properly decoded" - assert core_results.tshark_data.header == [ - "ETH", - "IP", - "TCP", - ], "Should be properly decoded" - - def test_core_chksum_verification(self): - core_results = Core(get_hex(Ether() / IP() / IP() / TCP())) - assert core_results.tshark_data.chksum_list - - def test_negative_core_chksum_verification_with_wrong_chksum(self): - core_results2 = Core(get_hex(Ether() / IP() / IP(chksum=0) / TCP())) - assert core_results2.tshark_data.chksum_list[2]["chksum"] == "0x0000" - - def test_one_of_custom_problematic_cases(self): - core_results = Core( - "ffffffaaa9ff00000000001208004500003c0001000040047cbb7f0000017f" - "000001450000280001000040067ccd7f0000017f0000010014005000000000" - "0000000050022000917d0000" - ) - assert core_results.tshark_data.chksum_list[3]["chksum"] == "0x917d" - - def test_Ethernet_IP_UDP_DNS(self): - core_result = Core( - "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" - "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" - "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" - "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" - "C011C011000200010001438C0006036E7331C011" - ) - chksum_obj = core_result.tshark_data.chksum_list[2] - - assert chksum_obj["chksum"] == "0x2d2e" - assert chksum_obj["chksum_calculated"] == "0x2d2d" - assert chksum_obj["status"] is False +def test_core_post_init(): + expected_headers = [ + "ETH", + "IP", + "TCP", + ] + core_results = Core(get_hex(Ether() / IP() / TCP())) + + assert isinstance(core_results.hex_string, str), "Should be String" + assert isinstance( + core_results.scapy_data, PacketDataScapy + ), "Should be PacketDataScapy" + assert isinstance(core_results.tshark_data, PacketData), "Should be PacketData" + assert core_results.scapy_data.header == expected_headers, "Should be properly decoded" + assert core_results.tshark_data.header == expected_headers, "Should be properly decoded" + + +def test_core_chksum_verification(): + core_results = Core(get_hex(Ether() / IP() / IP() / TCP())) + assert core_results.tshark_data.chksum_list + assert len(core_results.tshark_data.chksum_list) == 4 + + +@pytest.mark.parametrize( + "packet, position_to_check, expected_chksum_value", + ( + (get_hex(Ether() / IP() / IP(chksum=0) / TCP()), 2, "0x0000"), + ( + ( + "ffffffaaa9ff00000000001208004500003c0001000040047cbb7f0000017f" + "000001450000280001000040067ccd7f0000017f0000010014005000000000" + "0000000050022000917d0000" + ), + 3, + "0x917d", + ), + ), +) +def test_negative_core_chksum_verification_with_wrong_chksum( + packet: str, position_to_check: int, expected_chksum_value: str +): + core_results2 = Core(packet) + assert ( + core_results2.tshark_data.chksum_list[position_to_check].chksum + == expected_chksum_value + ) + + +def test_ethernet_ip_udp_dns(): + core_result = Core( + "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" + "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" + "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" + "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" + "C011C011000200010001438C0006036E7331C011" + ) + chksum_obj: ChecksumStatus = core_result.tshark_data.chksum_list[2] + + assert chksum_obj.chksum == "0x2d2e" + assert chksum_obj.chksum_calculated == "0x2d2d" + assert chksum_obj.status is False diff --git a/tests/test_packet_data.py b/tests/test_packet_data.py index d5d2051..f712d77 100644 --- a/tests/test_packet_data.py +++ b/tests/test_packet_data.py @@ -1,6 +1,6 @@ from packet_helper_core.packet_data import PacketData from packet_helper_core.utils.utils import decode_hex -from scapy.layers.all import IP, TCP, Ether, IPv6 # type: ignore +from scapy.layers.all import IP, TCP, Ether, IPv6 # noqa from scapy_helper import get_hex from tests.utils.example_packets import EXAMPLE_ETHER, EXAMPLE_ETHER_IP_IPV6_GRE_DATA diff --git a/tests/utils/example_packets.py b/tests/utils/example_packets.py index d25c3d5..e3be1b8 100644 --- a/tests/utils/example_packets.py +++ b/tests/utils/example_packets.py @@ -1,4 +1,4 @@ -from scapy.all import IP, TCP, Ether # type: ignore +from scapy.all import IP, TCP, Ether # noqa EXAMPLE_ETHER = "ff fd df af ff ff 00 00 00 00 00 12 08 00" From 214269552fc4cceb11f0923974c865bc6319c5f9 Mon Sep 17 00:00:00 2001 From: Nex Sabre Date: Wed, 26 Oct 2022 21:32:22 +0200 Subject: [PATCH 2/6] Add pydantic to requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 1db101e..3c4ed03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ pyshark==0.5.3 scapy~=2.4.5 scapy_helper==0.14.8 +pydantic~=1.10.1 From 6410e50b57a46d1073a47c8b60aa5fac39be95f6 Mon Sep 17 00:00:00 2001 From: Nex Sabre Date: Wed, 26 Oct 2022 22:15:18 +0200 Subject: [PATCH 3/6] Some improvements --- packet_helper_core/__init__.py | 2 +- packet_helper_core/core.py | 20 --------------- packet_helper_core/decoder.py | 30 ++++++++++++++++++++++ packet_helper_core/packet_data.py | 24 ++++++++--------- packet_helper_core/utils/conversion.py | 3 +-- tests/{test_core.py => test_decoder.py} | 34 +++++++++++++------------ tests/test_from_sh_list.py | 15 ++++++++--- 7 files changed, 72 insertions(+), 56 deletions(-) delete mode 100644 packet_helper_core/core.py create mode 100644 packet_helper_core/decoder.py rename tests/{test_core.py => test_decoder.py} (62%) diff --git a/packet_helper_core/__init__.py b/packet_helper_core/__init__.py index 5fa4029..0561cf0 100644 --- a/packet_helper_core/__init__.py +++ b/packet_helper_core/__init__.py @@ -1,3 +1,3 @@ -from packet_helper_core.core import Core +from packet_helper_core.decoder import Decoder from packet_helper_core.packet_data import PacketData from packet_helper_core.packet_data_scapy import PacketDataScapy diff --git a/packet_helper_core/core.py b/packet_helper_core/core.py deleted file mode 100644 index 17f5fc9..0000000 --- a/packet_helper_core/core.py +++ /dev/null @@ -1,20 +0,0 @@ -from dataclasses import dataclass - -from packet_helper_core.packet_data import PacketData -from packet_helper_core.packet_data_scapy import PacketDataScapy -from packet_helper_core.utils.utils import decode_hex - - -@dataclass -class Core: - """ - Class Core is just a wrapper to create a handy-shortcut - for preparing a data from hex string - """ - - hex_string: str = "" - - def __post_init__(self): - self.hex_string = self.hex_string.replace(" ", "") - self.tshark_data = PacketData(raw=str(decode_hex(self.hex_string))) - self.scapy_data = PacketDataScapy(self.hex_string, self.tshark_data) diff --git a/packet_helper_core/decoder.py b/packet_helper_core/decoder.py new file mode 100644 index 0000000..90ff872 --- /dev/null +++ b/packet_helper_core/decoder.py @@ -0,0 +1,30 @@ +from packet_helper_core.packet_data import PacketData +from packet_helper_core.packet_data_scapy import PacketDataScapy +from packet_helper_core.utils.utils import decode_hex + + +class Decoder: + """ + Class Decoder is just a wrapper to create a handy-shortcut + for preparing a data from hex string + """ + + def __init__(self, hex_string: str) -> None: + self.hex_string = hex_string.replace(" ", "") + + self.__decoded_by_tshark, self.__decoded_by_scapy = (None,) * 2 + + def run(self, extend_with_scapy: bool = True) -> None: + self.__decoded_by_tshark = PacketData(raw=str(decode_hex(self.hex_string))) + if extend_with_scapy: + self.__decoded_by_scapy = PacketDataScapy( + raw=self.hex_string, packet_data=self.tshark_data + ) + + @property + def tshark_data(self) -> PacketData | None: + return self.__decoded_by_tshark + + @property + def scapy_data(self) -> PacketDataScapy | None: + return self.__decoded_by_scapy diff --git a/packet_helper_core/packet_data.py b/packet_helper_core/packet_data.py index 9f48794..c811fc1 100644 --- a/packet_helper_core/packet_data.py +++ b/packet_helper_core/packet_data.py @@ -11,17 +11,15 @@ class PacketData: _data_layer: list[str] = field(default_factory=list) def __post_init__(self): - self.raw_array = self.raw.split("\n") - self.length = self.raw_array[0].replace(")", "").split()[2] - self.array = self.raw_array[1:] + self.array = self.raw.split("\n")[1:] - self.header = self.compose_header() - self.body = self.compose_body() - self.body2 = self.compose_body_list() + self.header = self.__compose_header() + self.body = self.__compose_body() + self.body2 = self.__compose_body_list() - self.update_header() + self.__update_header() - def compose_header(self): + def __compose_header(self): return [ a.replace("Layer", "").replace(":", "").replace(" ", "") for a in self.array @@ -35,7 +33,7 @@ def __is_data_element(self, layer_fragment: str) -> bool: return True return False - def compose_body(self) -> dict[str, list[str]]: + def __compose_body(self) -> dict[str, list[str]]: temp_body_dict: dict[str, list[str]] = {} actual_layer: str = "" for x in self.array: @@ -51,7 +49,7 @@ def compose_body(self) -> dict[str, list[str]]: temp_body_dict[actual_layer].append(x) return temp_body_dict - def compose_body_list(self) -> list[list[str]]: + def __compose_body_list(self) -> list[list[str]]: temp_body_dict = [] line = [] ckhsum_flag = False @@ -82,12 +80,12 @@ def compose_body_list(self) -> list[list[str]]: if ckhsum_flag: for y in temp_body_dict: - self.chksum_verification(y) + self.__chksum_verification(y) temp_body_dict.append(data_found) return temp_body_dict - def chksum_verification(self, element) -> None: + def __chksum_verification(self, element) -> None: chksum_status: ChecksumStatus = ChecksumStatus() for x in element: x = x.lower() @@ -104,7 +102,7 @@ def chksum_verification(self, element) -> None: chksum_status.verify() self.chksum_list.append(chksum_status) - def update_header(self): + def __update_header(self) -> None: """Update header with data layer which is 'hidden' in the tshark output""" if self._data_layer: self.header.append("RAW") diff --git a/packet_helper_core/utils/conversion.py b/packet_helper_core/utils/conversion.py index 1bd2d6c..11b8d92 100644 --- a/packet_helper_core/utils/conversion.py +++ b/packet_helper_core/utils/conversion.py @@ -7,13 +7,12 @@ def from_sh_list(packet_list) -> BasePacket: imported_all = importlib.import_module("scapy.all") def remove_none(): - return {k: v for k, v in _value.items() if v is not None} + return {k: v for k, v in layer.get(_key, {}).items() if v is not None} new_packet = None for layer in packet_list: if isinstance(layer, dict): _key = [x for x in layer.keys()][0] - _value = layer.get(_key) _value = remove_none() if _key == "Ethernet": _key = "Ether" diff --git a/tests/test_core.py b/tests/test_decoder.py similarity index 62% rename from tests/test_core.py rename to tests/test_decoder.py index 6fb6a1b..41e12b1 100644 --- a/tests/test_core.py +++ b/tests/test_decoder.py @@ -2,7 +2,7 @@ from packet_helper_core import PacketData, PacketDataScapy from packet_helper_core.checksum_status import ChecksumStatus -from packet_helper_core.core import Core +from packet_helper_core.decoder import Decoder from scapy.all import IP, TCP, Ether # noqa from scapy_helper import get_hex @@ -13,21 +13,21 @@ def test_core_post_init(): "IP", "TCP", ] - core_results = Core(get_hex(Ether() / IP() / TCP())) + decoder = Decoder(get_hex(Ether() / IP() / TCP())) + decoder.run() - assert isinstance(core_results.hex_string, str), "Should be String" - assert isinstance( - core_results.scapy_data, PacketDataScapy - ), "Should be PacketDataScapy" - assert isinstance(core_results.tshark_data, PacketData), "Should be PacketData" - assert core_results.scapy_data.header == expected_headers, "Should be properly decoded" - assert core_results.tshark_data.header == expected_headers, "Should be properly decoded" + assert isinstance(decoder.hex_string, str), "Should be String" + assert isinstance(decoder.scapy_data, PacketDataScapy), "Should be PacketDataScapy" + assert isinstance(decoder.tshark_data, PacketData), "Should be PacketData" + assert decoder.scapy_data.header == expected_headers, "Should be properly decoded" + assert decoder.tshark_data.header == expected_headers, "Should be properly decoded" def test_core_chksum_verification(): - core_results = Core(get_hex(Ether() / IP() / IP() / TCP())) - assert core_results.tshark_data.chksum_list - assert len(core_results.tshark_data.chksum_list) == 4 + decoder = Decoder(get_hex(Ether() / IP() / IP() / TCP())) + decoder.run() + assert decoder.tshark_data.chksum_list + assert len(decoder.tshark_data.chksum_list) == 4 @pytest.mark.parametrize( @@ -48,22 +48,24 @@ def test_core_chksum_verification(): def test_negative_core_chksum_verification_with_wrong_chksum( packet: str, position_to_check: int, expected_chksum_value: str ): - core_results2 = Core(packet) + decoder = Decoder(packet) + decoder.run() assert ( - core_results2.tshark_data.chksum_list[position_to_check].chksum + decoder.tshark_data.chksum_list[position_to_check].chksum == expected_chksum_value ) def test_ethernet_ip_udp_dns(): - core_result = Core( + decoder = Decoder( "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" "C011C011000200010001438C0006036E7331C011" ) - chksum_obj: ChecksumStatus = core_result.tshark_data.chksum_list[2] + decoder.run() + chksum_obj: ChecksumStatus = decoder.tshark_data.chksum_list[2] assert chksum_obj.chksum == "0x2d2e" assert chksum_obj.chksum_calculated == "0x2d2d" diff --git a/tests/test_from_sh_list.py b/tests/test_from_sh_list.py index 4cd01d1..b8c8feb 100644 --- a/tests/test_from_sh_list.py +++ b/tests/test_from_sh_list.py @@ -1,4 +1,8 @@ +from typing import Any + import pytest +from scapy.base_classes import BasePacket + from packet_helper_core.utils.conversion import from_sh_list from scapy.packet import Packet from scapy_helper import get_hex, to_list @@ -10,8 +14,11 @@ "packet", (SIMPLE_IP_IN_IP_PACKET, SIMPLE_IP_IN_IP_PACKET / SIMPLE_IP_IN_IP_PACKET) ) def test_from_sh_list(packet: Packet) -> None: - packet_list = to_list(packet) - new_packet = from_sh_list(packet_list) + packet_list: list[dict[str, Any]] = to_list(packet) + packet_generated_from_scapy_helper = from_sh_list(packet_list) - assert get_hex(packet) == get_hex(new_packet) - assert packet_list == to_list(new_packet) + assert isinstance(packet_generated_from_scapy_helper, BasePacket) + assert get_hex(packet) == get_hex( + packet_generated_from_scapy_helper + ), "Packets should return same hex results" + assert packet_list == to_list(packet_generated_from_scapy_helper) From 1ce5c6c5f84495c2290f6f84fb7c4e48088bf247 Mon Sep 17 00:00:00 2001 From: Nex Sabre Date: Mon, 7 Nov 2022 17:27:46 +0100 Subject: [PATCH 4/6] Some improvements --- packet_helper_core/__init__.py | 4 +- packet_helper_core/decoder.py | 30 ---------- packet_helper_core/decoders/__init__.py | 0 packet_helper_core/decoders/decode_string.py | 30 ++++++++++ .../scapy_data.py} | 7 +-- .../tshark_data.py} | 6 +- .../{ => models}/checksum_status.py | 0 packet_helper_core/models/scapy_data.py | 21 ------- packet_helper_core/packethelper.py | 30 ++++++++++ packet_helper_core/utils/utils.py | 55 ------------------- tests/test_decoder.py | 40 +++++++------- tests/test_packet_data.py | 19 ++++--- 12 files changed, 100 insertions(+), 142 deletions(-) delete mode 100644 packet_helper_core/decoder.py create mode 100644 packet_helper_core/decoders/__init__.py create mode 100644 packet_helper_core/decoders/decode_string.py rename packet_helper_core/{packet_data_scapy.py => decoders/scapy_data.py} (91%) rename packet_helper_core/{packet_data.py => decoders/tshark_data.py} (96%) rename packet_helper_core/{ => models}/checksum_status.py (100%) delete mode 100644 packet_helper_core/models/scapy_data.py create mode 100644 packet_helper_core/packethelper.py delete mode 100644 packet_helper_core/utils/utils.py diff --git a/packet_helper_core/__init__.py b/packet_helper_core/__init__.py index 0561cf0..938190c 100644 --- a/packet_helper_core/__init__.py +++ b/packet_helper_core/__init__.py @@ -1,3 +1 @@ -from packet_helper_core.decoder import Decoder -from packet_helper_core.packet_data import PacketData -from packet_helper_core.packet_data_scapy import PacketDataScapy +from packet_helper_core.packethelper import PacketHelper diff --git a/packet_helper_core/decoder.py b/packet_helper_core/decoder.py deleted file mode 100644 index 90ff872..0000000 --- a/packet_helper_core/decoder.py +++ /dev/null @@ -1,30 +0,0 @@ -from packet_helper_core.packet_data import PacketData -from packet_helper_core.packet_data_scapy import PacketDataScapy -from packet_helper_core.utils.utils import decode_hex - - -class Decoder: - """ - Class Decoder is just a wrapper to create a handy-shortcut - for preparing a data from hex string - """ - - def __init__(self, hex_string: str) -> None: - self.hex_string = hex_string.replace(" ", "") - - self.__decoded_by_tshark, self.__decoded_by_scapy = (None,) * 2 - - def run(self, extend_with_scapy: bool = True) -> None: - self.__decoded_by_tshark = PacketData(raw=str(decode_hex(self.hex_string))) - if extend_with_scapy: - self.__decoded_by_scapy = PacketDataScapy( - raw=self.hex_string, packet_data=self.tshark_data - ) - - @property - def tshark_data(self) -> PacketData | None: - return self.__decoded_by_tshark - - @property - def scapy_data(self) -> PacketDataScapy | None: - return self.__decoded_by_scapy diff --git a/packet_helper_core/decoders/__init__.py b/packet_helper_core/decoders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packet_helper_core/decoders/decode_string.py b/packet_helper_core/decoders/decode_string.py new file mode 100644 index 0000000..4634be9 --- /dev/null +++ b/packet_helper_core/decoders/decode_string.py @@ -0,0 +1,30 @@ +import functools + +from pyshark import InMemCapture +from pyshark.packet.packet import Packet + + +@functools.cache +def decode_string(hex_str: str) -> Packet: + """ + Decode string (in string-hex format) using 'InMemCapture' using a tshark. + """ + _custom_params = [ + "-o", + "tcp.check_checksum:TRUE", + "-o", + "ip.check_checksum:TRUE", + "-o", + "stt.check_checksum:TRUE", + "-o", + "udp.check_checksum:TRUE", + "-o", + "wlan.check_checksum:TRUE", + ] + # only interested with the first packet + packet = InMemCapture(custom_parameters=_custom_params) + decoded_packet: Packet = packet.parse_packet( + bytes.fromhex(hex_str.replace(" ", "")) + ) + packet.close() + return decoded_packet diff --git a/packet_helper_core/packet_data_scapy.py b/packet_helper_core/decoders/scapy_data.py similarity index 91% rename from packet_helper_core/packet_data_scapy.py rename to packet_helper_core/decoders/scapy_data.py index 47cd72d..27ca056 100644 --- a/packet_helper_core/packet_data_scapy.py +++ b/packet_helper_core/decoders/scapy_data.py @@ -2,15 +2,14 @@ from scapy_helper import get_hex -from packet_helper_core.models.scapy_data import ScapyData -from packet_helper_core.packet_data import PacketData +from packet_helper_core.decoders.tshark_data import TSharkData from packet_helper_core.utils.scapy_reader import scapy_reader @dataclass -class PacketDataScapy: +class ScapyData: raw: str - packet_data: PacketData + packet_data: TSharkData def __post_init__(self): self.header = [x.replace("\r", "") for x in self.packet_data.header] diff --git a/packet_helper_core/packet_data.py b/packet_helper_core/decoders/tshark_data.py similarity index 96% rename from packet_helper_core/packet_data.py rename to packet_helper_core/decoders/tshark_data.py index c811fc1..5b6e945 100644 --- a/packet_helper_core/packet_data.py +++ b/packet_helper_core/decoders/tshark_data.py @@ -1,10 +1,10 @@ from dataclasses import dataclass, field -from packet_helper_core.checksum_status import ChecksumStatus +from packet_helper_core.models.checksum_status import ChecksumStatus @dataclass -class PacketData: +class TSharkData: raw: str chksum_list: list[ChecksumStatus] = field(default_factory=list) @@ -19,7 +19,7 @@ def __post_init__(self): self.__update_header() - def __compose_header(self): + def __compose_header(self) -> list[str]: return [ a.replace("Layer", "").replace(":", "").replace(" ", "") for a in self.array diff --git a/packet_helper_core/checksum_status.py b/packet_helper_core/models/checksum_status.py similarity index 100% rename from packet_helper_core/checksum_status.py rename to packet_helper_core/models/checksum_status.py diff --git a/packet_helper_core/models/scapy_data.py b/packet_helper_core/models/scapy_data.py deleted file mode 100644 index f156660..0000000 --- a/packet_helper_core/models/scapy_data.py +++ /dev/null @@ -1,21 +0,0 @@ -from typing import Literal - -from pydantic import BaseModel - -from packet_helper_core.checksum_status import ChecksumStatus - - -class ScapyData(BaseModel): - name: str - bytes_record: str # bytes - hex_record: str # hex - hex_record_full: str # hex_one - length: int - length_unit: Literal[ - "B", - ] # length_unit - representation: str # repr - representation_full: str # repr_full - tshark_name: str = "" - tshark_raw_summary: str = "" - chksum_status: ChecksumStatus | None = None diff --git a/packet_helper_core/packethelper.py b/packet_helper_core/packethelper.py new file mode 100644 index 0000000..c6be9d6 --- /dev/null +++ b/packet_helper_core/packethelper.py @@ -0,0 +1,30 @@ +from packet_helper_core.decoders.decode_string import decode_string +from packet_helper_core.decoders.tshark_data import TSharkData +from packet_helper_core.decoders.scapy_data import ScapyData + + +class PacketHelper: + """ + Class PacketHelper is just a wrapper to create a handy-shortcut + for preparing a data from hex string + """ + + def __init__(self) -> None: + self.hex_string, self.__decoded_by_tshark, self.__decoded_by_scapy = (None,) * 3 + + def decode(self, hex_string: str, extend_with_scapy: bool = True) -> None: + self.hex_string = hex_string.replace(" ", "") + _decoded_string = decode_string(hex_string) + self.__decoded_by_tshark = TSharkData(raw=str(_decoded_string)) + if extend_with_scapy: + self.__decoded_by_scapy = ScapyData( + raw=hex_string, packet_data=self.__decoded_by_tshark + ) + + @property + def tshark_data(self) -> TSharkData | None: + return self.__decoded_by_tshark + + @property + def scapy_data(self) -> ScapyData | None: + return self.__decoded_by_scapy diff --git a/packet_helper_core/utils/utils.py b/packet_helper_core/utils/utils.py deleted file mode 100644 index 62694c8..0000000 --- a/packet_helper_core/utils/utils.py +++ /dev/null @@ -1,55 +0,0 @@ -import pyshark -from pyshark.packet.packet import Packet -from scapy_helper import get_hex - - -def hex_str_operation(h_string, with_new_line: bool = False) -> str: - z = "" - tmp = [] - for e, x in enumerate(h_string.replace(" ", "")): - z += x - if e % 2: - tmp.append(z) - z = "" - if with_new_line: - temp_list = [] - for e, v in enumerate(tmp, 1): - if not e % 16: - temp_list.append(f"{v}\n") - continue - temp_list.append(f"{v} ") - return "".join(temp_list) - return " ".join(tmp) - - -def decode_hex(hex_str: str) -> Packet: - frame_bytes: bytes = bytes.fromhex(hex_str) - _custom_params = [ - "-o", - "tcp.check_checksum:TRUE", - "-o", - "ip.check_checksum:TRUE", - "-o", - "stt.check_checksum:TRUE", - "-o", - "udp.check_checksum:TRUE", - "-o", - "wlan.check_checksum:TRUE", - ] - # only interested with the first packet - packet = pyshark.InMemCapture(custom_parameters=_custom_params) - return packet.parse_packet(frame_bytes) - - -def better_scapy_summary(scapy_summary) -> list: - list_ = [] - for frame in scapy_summary: - temp_frame = { - "name": frame.name, - "bytes": frame.raw_packet_cache, - "hex": get_hex(frame.raw_packet_cache), - "length": len(frame.raw_packet_cache), - "repr": f"{repr(frame).split(' |')[0]}>", - } - list_.append(temp_frame) - return list_ diff --git a/tests/test_decoder.py b/tests/test_decoder.py index 41e12b1..b969637 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -1,8 +1,9 @@ import pytest -from packet_helper_core import PacketData, PacketDataScapy -from packet_helper_core.checksum_status import ChecksumStatus -from packet_helper_core.decoder import Decoder +from packet_helper_core.decoders.tshark_data import TSharkData +from packet_helper_core.decoders.scapy_data import ScapyData +from packet_helper_core.models.checksum_status import ChecksumStatus +from packet_helper_core.packethelper import PacketHelper from scapy.all import IP, TCP, Ether # noqa from scapy_helper import get_hex @@ -13,19 +14,19 @@ def test_core_post_init(): "IP", "TCP", ] - decoder = Decoder(get_hex(Ether() / IP() / TCP())) - decoder.run() + ph = PacketHelper() + ph.decode(get_hex(Ether() / IP() / TCP())) - assert isinstance(decoder.hex_string, str), "Should be String" - assert isinstance(decoder.scapy_data, PacketDataScapy), "Should be PacketDataScapy" - assert isinstance(decoder.tshark_data, PacketData), "Should be PacketData" - assert decoder.scapy_data.header == expected_headers, "Should be properly decoded" - assert decoder.tshark_data.header == expected_headers, "Should be properly decoded" + assert isinstance(ph.hex_string, str), "Should be String" + assert isinstance(ph.scapy_data, ScapyData), "Should be Scapy Data" + assert isinstance(ph.tshark_data, TSharkData), "Should be TShark Data" + assert ph.scapy_data.header == expected_headers, "Should be properly decoded" + assert ph.tshark_data.header == expected_headers, "Should be properly decoded" def test_core_chksum_verification(): - decoder = Decoder(get_hex(Ether() / IP() / IP() / TCP())) - decoder.run() + decoder = PacketHelper() + decoder.decode(get_hex(Ether() / IP() / IP() / TCP())) assert decoder.tshark_data.chksum_list assert len(decoder.tshark_data.chksum_list) == 4 @@ -48,24 +49,25 @@ def test_core_chksum_verification(): def test_negative_core_chksum_verification_with_wrong_chksum( packet: str, position_to_check: int, expected_chksum_value: str ): - decoder = Decoder(packet) - decoder.run() + ph = PacketHelper() + ph.decode(packet) assert ( - decoder.tshark_data.chksum_list[position_to_check].chksum - == expected_chksum_value + ph.tshark_data.chksum_list[position_to_check].chksum + == expected_chksum_value ) def test_ethernet_ip_udp_dns(): - decoder = Decoder( + packet = ( "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" "C011C011000200010001438C0006036E7331C011" ) - decoder.run() - chksum_obj: ChecksumStatus = decoder.tshark_data.chksum_list[2] + ph = PacketHelper() + ph.decode(packet) + chksum_obj: ChecksumStatus = ph.tshark_data.chksum_list[2] assert chksum_obj.chksum == "0x2d2e" assert chksum_obj.chksum_calculated == "0x2d2d" diff --git a/tests/test_packet_data.py b/tests/test_packet_data.py index f712d77..665d849 100644 --- a/tests/test_packet_data.py +++ b/tests/test_packet_data.py @@ -1,27 +1,32 @@ -from packet_helper_core.packet_data import PacketData -from packet_helper_core.utils.utils import decode_hex from scapy.layers.all import IP, TCP, Ether, IPv6 # noqa from scapy_helper import get_hex +from packet_helper_core import PacketHelper, TSharkData +from packet_helper_core.decoders.decode_string import decode_string from tests.utils.example_packets import EXAMPLE_ETHER, EXAMPLE_ETHER_IP_IPV6_GRE_DATA def test_packet_data(): - packet = decode_hex(EXAMPLE_ETHER) + packet = decode_string(EXAMPLE_ETHER) assert packet.__getitem__("eth"), "Layer Ether should be available in decoded hex" - pd = PacketData(raw=str(packet)) + pd = TSharkData(raw=str(packet)) assert "ETH" in pd.header, "Ether header should be found at packet" +def test_decode(): + packet = PacketHelper(hex_string=EXAMPLE_ETHER).decode() + print(packet) + + def test_decode_hex__data_should_be_present_after_gre_packet(): expected_data = ( "0035003500310000736f6d652072616e646f6d20737472696" "e672031313233343435393832373334393832373334323334" ) - packet = decode_hex(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) - pd = PacketData(raw=str(packet)) + packet = decode_string(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) + pd = TSharkData(raw=str(packet)) packet_raw_data = pd.body.get("RAW", []) assert packet_raw_data, "RAW block should be available" extracted_data_from_raw = packet_raw_data[0].split()[-1] @@ -33,7 +38,7 @@ def test_decode_hex__data_should_be_present_after_gre_packet(): def test_custom_packet_data(): frame = Ether() / IP() / IPv6() / TCP() - packet = decode_hex(get_hex(frame)) + packet = decode_string(get_hex(frame)) list_of_expected_packets = ("ETH", "IP", "IPV6", "TCP") list_of_layers_from_packet = [x.layer_name.upper() for x in packet.layers] for expected_packet in list_of_expected_packets: From 5c769ca6dc85a35d093f17ba884c35eef10ccffa Mon Sep 17 00:00:00 2001 From: Nex Sabre Date: Mon, 7 Nov 2022 19:39:05 +0100 Subject: [PATCH 5/6] Some improvements --- packet_helper_core/decoders/scapy_data.py | 5 +++-- packet_helper_core/decoders/tshark_data.py | 18 +++++++++-------- packet_helper_core/models/scapy_response.py | 21 ++++++++++++++++++++ packet_helper_core/packethelper.py | 6 +++--- setup.py | 2 +- tests/test_decoder.py | 5 +---- tests/test_packet_data.py | 22 ++++++++++----------- 7 files changed, 49 insertions(+), 30 deletions(-) create mode 100644 packet_helper_core/models/scapy_response.py diff --git a/packet_helper_core/decoders/scapy_data.py b/packet_helper_core/decoders/scapy_data.py index 27ca056..506c9fd 100644 --- a/packet_helper_core/decoders/scapy_data.py +++ b/packet_helper_core/decoders/scapy_data.py @@ -3,6 +3,7 @@ from scapy_helper import get_hex from packet_helper_core.decoders.tshark_data import TSharkData +from packet_helper_core.models.scapy_response import ScapyResponse from packet_helper_core.utils.scapy_reader import scapy_reader @@ -21,12 +22,12 @@ def __post_init__(self): self.structure = self.__make_structure() def __make_structure(self): - temp_structure: list[ScapyData] = [] + temp_structure: list[ScapyResponse] = [] for index, header in enumerate(self.headers_scapy): scapy_header = self.headers_scapy[index].copy() scapy_header.remove_payload() - scapy_data_dict: ScapyData = ScapyData( + scapy_data_dict: ScapyResponse = ScapyResponse( **{ "name": header.name, "bytes_record": str(header), diff --git a/packet_helper_core/decoders/tshark_data.py b/packet_helper_core/decoders/tshark_data.py index 5b6e945..f031f71 100644 --- a/packet_helper_core/decoders/tshark_data.py +++ b/packet_helper_core/decoders/tshark_data.py @@ -1,28 +1,30 @@ from dataclasses import dataclass, field +from pyshark.packet.packet import Packet + from packet_helper_core.models.checksum_status import ChecksumStatus @dataclass class TSharkData: - raw: str + decoded_packet: Packet chksum_list: list[ChecksumStatus] = field(default_factory=list) _data_layer: list[str] = field(default_factory=list) def __post_init__(self): - self.array = self.raw.split("\n")[1:] + self.__pkt_information_array = str(self.decoded_packet).split("\n")[1:] - self.header = self.__compose_header() - self.body = self.__compose_body() - self.body2 = self.__compose_body_list() + self.header: list[str] = self.__compose_header() + self.body: dict[str, list[str]] = self.__compose_body() + self.body2: list[list[str]] = self.__compose_body_list() self.__update_header() def __compose_header(self) -> list[str]: return [ a.replace("Layer", "").replace(":", "").replace(" ", "") - for a in self.array + for a in self.__pkt_information_array if a.startswith("Layer") ] @@ -36,7 +38,7 @@ def __is_data_element(self, layer_fragment: str) -> bool: def __compose_body(self) -> dict[str, list[str]]: temp_body_dict: dict[str, list[str]] = {} actual_layer: str = "" - for x in self.array: + for x in self.__pkt_information_array: if x.startswith("Layer"): actual_layer = x.replace(":", "").split()[1] temp_body_dict[actual_layer] = [] @@ -56,7 +58,7 @@ def __compose_body_list(self) -> list[list[str]]: data_found: list[str] = [ "RAW", ] - for arr in self.array: + for arr in self.__pkt_information_array: arr = arr.strip() if arr == "" and line: temp_body_dict.append(line) diff --git a/packet_helper_core/models/scapy_response.py b/packet_helper_core/models/scapy_response.py new file mode 100644 index 0000000..06a97c8 --- /dev/null +++ b/packet_helper_core/models/scapy_response.py @@ -0,0 +1,21 @@ +from typing import Literal + +from pydantic import BaseModel + +from packet_helper_core.models.checksum_status import ChecksumStatus + + +class ScapyResponse(BaseModel): + name: str + bytes_record: str # bytes + hex_record: str # hex + hex_record_full: str # hex_one + length: int + length_unit: Literal[ + "B", + ] # length_unit + representation: str # repr + representation_full: str # repr_full + tshark_name: str = "" + tshark_raw_summary: str = "" + chksum_status: ChecksumStatus | None = None diff --git a/packet_helper_core/packethelper.py b/packet_helper_core/packethelper.py index c6be9d6..ff5abb7 100644 --- a/packet_helper_core/packethelper.py +++ b/packet_helper_core/packethelper.py @@ -14,11 +14,11 @@ def __init__(self) -> None: def decode(self, hex_string: str, extend_with_scapy: bool = True) -> None: self.hex_string = hex_string.replace(" ", "") - _decoded_string = decode_string(hex_string) - self.__decoded_by_tshark = TSharkData(raw=str(_decoded_string)) + decoded_string = decode_string(self.hex_string) + self.__decoded_by_tshark = TSharkData(decoded_packet=decoded_string) if extend_with_scapy: self.__decoded_by_scapy = ScapyData( - raw=hex_string, packet_data=self.__decoded_by_tshark + raw=self.hex_string, packet_data=self.__decoded_by_tshark ) @property diff --git a/setup.py b/setup.py index 212305d..888f3d5 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ setup( name="packet_helper_core", - description="Engine to decode raw string hex into packets", + description="Engine to decode decoded_packet string hex into packets", long_description=long_description, long_description_content_type="text/markdown", author="Nex Sabre", diff --git a/tests/test_decoder.py b/tests/test_decoder.py index b969637..8522035 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -51,10 +51,7 @@ def test_negative_core_chksum_verification_with_wrong_chksum( ): ph = PacketHelper() ph.decode(packet) - assert ( - ph.tshark_data.chksum_list[position_to_check].chksum - == expected_chksum_value - ) + assert ph.tshark_data.chksum_list[position_to_check].chksum == expected_chksum_value def test_ethernet_ip_udp_dns(): diff --git a/tests/test_packet_data.py b/tests/test_packet_data.py index 665d849..30537b0 100644 --- a/tests/test_packet_data.py +++ b/tests/test_packet_data.py @@ -1,22 +1,20 @@ from scapy.layers.all import IP, TCP, Ether, IPv6 # noqa from scapy_helper import get_hex -from packet_helper_core import PacketHelper, TSharkData +from packet_helper_core import PacketHelper from packet_helper_core.decoders.decode_string import decode_string +from packet_helper_core.decoders.tshark_data import TSharkData from tests.utils.example_packets import EXAMPLE_ETHER, EXAMPLE_ETHER_IP_IPV6_GRE_DATA def test_packet_data(): - packet = decode_string(EXAMPLE_ETHER) - assert packet.__getitem__("eth"), "Layer Ether should be available in decoded hex" + decoded_pkt = decode_string(EXAMPLE_ETHER) + assert decoded_pkt.__getitem__( + "eth" + ), "Layer Ether should be available in decoded hex" - pd = TSharkData(raw=str(packet)) - assert "ETH" in pd.header, "Ether header should be found at packet" - - -def test_decode(): - packet = PacketHelper(hex_string=EXAMPLE_ETHER).decode() - print(packet) + pd = TSharkData(decoded_packet=decoded_pkt) + assert "ETH" in pd.header, "Ether header should be found at decoded_pkt" def test_decode_hex__data_should_be_present_after_gre_packet(): @@ -25,8 +23,8 @@ def test_decode_hex__data_should_be_present_after_gre_packet(): "e672031313233343435393832373334393832373334323334" ) - packet = decode_string(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) - pd = TSharkData(raw=str(packet)) + decoded_pkt = decode_string(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) + pd = TSharkData(decoded_packet=decoded_pkt) packet_raw_data = pd.body.get("RAW", []) assert packet_raw_data, "RAW block should be available" extracted_data_from_raw = packet_raw_data[0].split()[-1] From 7e848c5ee703b5f7a20904618a7fb13c46a6898e Mon Sep 17 00:00:00 2001 From: Nex Sabre Date: Mon, 7 Nov 2022 20:06:16 +0100 Subject: [PATCH 6/6] Some improvements --- packet_helper_core/decoders/scapy_data.py | 29 +++++++++++++--------- packet_helper_core/decoders/tshark_data.py | 4 +-- tests/decoders/__init__.py | 0 tests/decoders/test_scapy_data.py | 25 +++++++++++++++++++ tests/test_decoder.py | 2 +- tests/test_packet_data.py | 2 +- 6 files changed, 46 insertions(+), 16 deletions(-) create mode 100644 tests/decoders/__init__.py create mode 100644 tests/decoders/test_scapy_data.py diff --git a/packet_helper_core/decoders/scapy_data.py b/packet_helper_core/decoders/scapy_data.py index 506c9fd..3f1674c 100644 --- a/packet_helper_core/decoders/scapy_data.py +++ b/packet_helper_core/decoders/scapy_data.py @@ -1,5 +1,6 @@ from dataclasses import dataclass +from scapy.packet import Packet from scapy_helper import get_hex from packet_helper_core.decoders.tshark_data import TSharkData @@ -13,20 +14,24 @@ class ScapyData: packet_data: TSharkData def __post_init__(self): - self.header = [x.replace("\r", "") for x in self.packet_data.header] - self.headers_scapy = scapy_reader(self.raw) + self.headers: list[str] = [x.replace("\r", "") for x in self.packet_data.header] + self.scapy_headers: list[Packet] = scapy_reader(self.raw) - self.headers_full = [repr(x) for x in self.headers_scapy] - self.headers_single = [f"{x.split(' |')[0]}>" for x in self.headers_full] + self.full_scapy_representation_headers: list[str] = [ + repr(x) for x in self.scapy_headers + ] + self.single_scapy_representation_headers = [ + f"{x.split(' |')[0]}>" for x in self.full_scapy_representation_headers + ] - self.structure = self.__make_structure() + self.packet_structure = self.__make_structure() - def __make_structure(self): - temp_structure: list[ScapyResponse] = [] + def __make_structure(self) -> list[ScapyResponse]: + scapy_responses: list[ScapyResponse] = [] - for index, header in enumerate(self.headers_scapy): - scapy_header = self.headers_scapy[index].copy() - scapy_header.remove_payload() + for index, header in enumerate(self.scapy_headers): + scapy_header = self.scapy_headers[index].copy() + scapy_header.remove_payload() # payload is not necessary for our usage in this case scapy_data_dict: ScapyResponse = ScapyResponse( **{ "name": header.name, @@ -49,5 +54,5 @@ def __make_structure(self): scapy_data_dict.chksum_status = self.packet_data.chksum_list[index] - temp_structure.append(scapy_data_dict) - return temp_structure + scapy_responses.append(scapy_data_dict) + return scapy_responses diff --git a/packet_helper_core/decoders/tshark_data.py b/packet_helper_core/decoders/tshark_data.py index f031f71..a00caea 100644 --- a/packet_helper_core/decoders/tshark_data.py +++ b/packet_helper_core/decoders/tshark_data.py @@ -91,7 +91,7 @@ def __chksum_verification(self, element) -> None: chksum_status: ChecksumStatus = ChecksumStatus() for x in element: x = x.lower() - if "header checksum" in x and "incorrect" in x: + if "headers checksum" in x and "incorrect" in x: chksum_status.chksum = x.split(":")[1].split()[0] continue if "bad checksum" in x and not chksum_status.chksum: @@ -105,6 +105,6 @@ def __chksum_verification(self, element) -> None: self.chksum_list.append(chksum_status) def __update_header(self) -> None: - """Update header with data layer which is 'hidden' in the tshark output""" + """Update headers with data layer which is 'hidden' in the tshark output""" if self._data_layer: self.header.append("RAW") diff --git a/tests/decoders/__init__.py b/tests/decoders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/decoders/test_scapy_data.py b/tests/decoders/test_scapy_data.py new file mode 100644 index 0000000..496448d --- /dev/null +++ b/tests/decoders/test_scapy_data.py @@ -0,0 +1,25 @@ +import pytest +from scapy.all import IP, TCP, Ether # noqa +from scapy_helper import get_hex + +from packet_helper_core import PacketHelper +from packet_helper_core.decoders.scapy_data import ScapyData + + +@pytest.fixture +def decode_example_packet() -> PacketHelper: + ph = PacketHelper() + ph.decode(get_hex(Ether() / IP() / IP() / TCP())) + return ph + + +def test_scapy_data(decode_example_packet: PacketHelper) -> None: + scapy_data = ScapyData( + decode_example_packet.hex_string, decode_example_packet.tshark_data + ) + assert scapy_data + assert scapy_data.headers + assert scapy_data.scapy_headers + assert scapy_data.full_scapy_representation_headers + assert scapy_data.single_scapy_representation_headers + assert scapy_data.packet_structure diff --git a/tests/test_decoder.py b/tests/test_decoder.py index 8522035..27f7cf0 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -20,7 +20,7 @@ def test_core_post_init(): assert isinstance(ph.hex_string, str), "Should be String" assert isinstance(ph.scapy_data, ScapyData), "Should be Scapy Data" assert isinstance(ph.tshark_data, TSharkData), "Should be TShark Data" - assert ph.scapy_data.header == expected_headers, "Should be properly decoded" + assert ph.scapy_data.headers == expected_headers, "Should be properly decoded" assert ph.tshark_data.header == expected_headers, "Should be properly decoded" diff --git a/tests/test_packet_data.py b/tests/test_packet_data.py index 30537b0..5641e66 100644 --- a/tests/test_packet_data.py +++ b/tests/test_packet_data.py @@ -14,7 +14,7 @@ def test_packet_data(): ), "Layer Ether should be available in decoded hex" pd = TSharkData(decoded_packet=decoded_pkt) - assert "ETH" in pd.header, "Ether header should be found at decoded_pkt" + assert "ETH" in pd.header, "Ether headers should be found at decoded_pkt" def test_decode_hex__data_should_be_present_after_gre_packet():