diff --git a/packet_helper_core/__init__.py b/packet_helper_core/__init__.py index 5fa4029..938190c 100644 --- a/packet_helper_core/__init__.py +++ b/packet_helper_core/__init__.py @@ -1,3 +1 @@ -from packet_helper_core.core import Core -from packet_helper_core.packet_data import PacketData -from packet_helper_core.packet_data_scapy import PacketDataScapy +from packet_helper_core.packethelper import PacketHelper diff --git a/packet_helper_core/checksum_status.py b/packet_helper_core/checksum_status.py deleted file mode 100644 index b039e71..0000000 --- a/packet_helper_core/checksum_status.py +++ /dev/null @@ -1,16 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class ChecksumStatus: - chksum: str = "" - chksum_calculated: str = "" - status: bool | None = None - - def __call__(self, *args, **kwargs): - def clean_chksum(element: str): - return element.replace("0x", "") - - if self.chksum == "" or self.chksum_calculated == "": - return - self.status = clean_chksum(self.chksum) == clean_chksum(self.chksum_calculated) diff --git a/packet_helper_core/core.py b/packet_helper_core/core.py deleted file mode 100644 index f1a85fc..0000000 --- a/packet_helper_core/core.py +++ /dev/null @@ -1,20 +0,0 @@ -from dataclasses import dataclass - -from packet_helper_core.packet_data import PacketData -from packet_helper_core.packet_data_scapy import PacketDataScapy -from packet_helper_core.utils.utils import decode_hex - - -@dataclass -class Core: - """ - Class Core is just a wrapper to create a handy-shortcut - for preparing a data from hex string - """ - - hex_string: str = "" - - def __post_init__(self): - self.hex_string = self.hex_string.replace(" ", "") - self.tshark_data = PacketData(str(decode_hex(self.hex_string))) - self.scapy_data = PacketDataScapy(self.hex_string, self.tshark_data) diff --git a/packet_helper_core/decoders/__init__.py b/packet_helper_core/decoders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packet_helper_core/decoders/decode_string.py b/packet_helper_core/decoders/decode_string.py new file mode 100644 index 0000000..4634be9 --- /dev/null +++ b/packet_helper_core/decoders/decode_string.py @@ -0,0 +1,30 @@ +import functools + +from pyshark import InMemCapture +from pyshark.packet.packet import Packet + + +@functools.cache +def decode_string(hex_str: str) -> Packet: + """ + Decode string (in string-hex format) using 'InMemCapture' using a tshark. + """ + _custom_params = [ + "-o", + "tcp.check_checksum:TRUE", + "-o", + "ip.check_checksum:TRUE", + "-o", + "stt.check_checksum:TRUE", + "-o", + "udp.check_checksum:TRUE", + "-o", + "wlan.check_checksum:TRUE", + ] + # only interested with the first packet + packet = InMemCapture(custom_parameters=_custom_params) + decoded_packet: Packet = packet.parse_packet( + bytes.fromhex(hex_str.replace(" ", "")) + ) + packet.close() + return decoded_packet diff --git a/packet_helper_core/decoders/scapy_data.py b/packet_helper_core/decoders/scapy_data.py new file mode 100644 index 0000000..3f1674c --- /dev/null +++ b/packet_helper_core/decoders/scapy_data.py @@ -0,0 +1,58 @@ +from dataclasses import dataclass + +from scapy.packet import Packet +from scapy_helper import get_hex + +from packet_helper_core.decoders.tshark_data import TSharkData +from packet_helper_core.models.scapy_response import ScapyResponse +from packet_helper_core.utils.scapy_reader import scapy_reader + + +@dataclass +class ScapyData: + raw: str + packet_data: TSharkData + + def __post_init__(self): + self.headers: list[str] = [x.replace("\r", "") for x in self.packet_data.header] + self.scapy_headers: list[Packet] = scapy_reader(self.raw) + + self.full_scapy_representation_headers: list[str] = [ + repr(x) for x in self.scapy_headers + ] + self.single_scapy_representation_headers = [ + f"{x.split(' |')[0]}>" for x in self.full_scapy_representation_headers + ] + + self.packet_structure = self.__make_structure() + + def __make_structure(self) -> list[ScapyResponse]: + scapy_responses: list[ScapyResponse] = [] + + for index, header in enumerate(self.scapy_headers): + scapy_header = self.scapy_headers[index].copy() + scapy_header.remove_payload() # payload is not necessary for our usage in this case + scapy_data_dict: ScapyResponse = ScapyResponse( + **{ + "name": header.name, + "bytes_record": str(header), + "hex_record": get_hex(header), + "hex_record_full": get_hex(scapy_header), + "length": len(header), + "length_unit": "B", + "representation": f"{repr(header).split(' |')[0]}>", + "representation_full": repr(header), + } + ) + + # RAW elements on the end are added to the last package as data! + try: + scapy_data_dict.tshark_name = self.packet_data.body2[index][0] + scapy_data_dict.tshark_raw_summary = self.packet_data.body2[index][1:] + except IndexError: + break + + scapy_data_dict.chksum_status = self.packet_data.chksum_list[index] + + scapy_responses.append(scapy_data_dict) + return scapy_responses diff --git a/packet_helper_core/packet_data.py b/packet_helper_core/decoders/tshark_data.py similarity index 66% rename from packet_helper_core/packet_data.py rename to packet_helper_core/decoders/tshark_data.py index 49a8b0d..a00caea 100644 --- a/packet_helper_core/packet_data.py +++ b/packet_helper_core/decoders/tshark_data.py @@ -1,31 +1,30 @@ -from dataclasses import dataclass, asdict, field -from typing import Any +from dataclasses import dataclass, field -from packet_helper_core.checksum_status import ChecksumStatus +from pyshark.packet.packet import Packet + +from packet_helper_core.models.checksum_status import ChecksumStatus @dataclass -class PacketData: - raw: str - chksum_list: list[Any] = field(default_factory=list) +class TSharkData: + decoded_packet: Packet + chksum_list: list[ChecksumStatus] = field(default_factory=list) _data_layer: list[str] = field(default_factory=list) def __post_init__(self): - self.raw_array = self.raw.split("\n") - self.length = self.raw_array[0].replace(")", "").split()[2] - self.array = self.raw_array[1:] + self.__pkt_information_array = str(self.decoded_packet).split("\n")[1:] - self.header = self.compose_header() - self.body = self.compose_body() - self.body2 = self.compose_body_list() + self.header: list[str] = self.__compose_header() + self.body: dict[str, list[str]] = self.__compose_body() + self.body2: list[list[str]] = self.__compose_body_list() - self.update_header() + self.__update_header() - def compose_header(self): + def __compose_header(self) -> list[str]: return [ a.replace("Layer", "").replace(":", "").replace(" ", "") - for a in self.array + for a in self.__pkt_information_array if a.startswith("Layer") ] @@ -36,10 +35,10 @@ def __is_data_element(self, layer_fragment: str) -> bool: return True return False - def compose_body(self) -> dict[str, list[str]]: + def __compose_body(self) -> dict[str, list[str]]: temp_body_dict: dict[str, list[str]] = {} actual_layer: str = "" - for x in self.array: + for x in self.__pkt_information_array: if x.startswith("Layer"): actual_layer = x.replace(":", "").split()[1] temp_body_dict[actual_layer] = [] @@ -52,14 +51,14 @@ def compose_body(self) -> dict[str, list[str]]: temp_body_dict[actual_layer].append(x) return temp_body_dict - def compose_body_list(self) -> list[list[str]]: + def __compose_body_list(self) -> list[list[str]]: temp_body_dict = [] line = [] ckhsum_flag = False data_found: list[str] = [ "RAW", ] - for arr in self.array: + for arr in self.__pkt_information_array: arr = arr.strip() if arr == "" and line: temp_body_dict.append(line) @@ -83,16 +82,16 @@ def compose_body_list(self) -> list[list[str]]: if ckhsum_flag: for y in temp_body_dict: - self.chksum_verification(y) + self.__chksum_verification(y) temp_body_dict.append(data_found) return temp_body_dict - def chksum_verification(self, element) -> None: - chksum_status = ChecksumStatus() + def __chksum_verification(self, element) -> None: + chksum_status: ChecksumStatus = ChecksumStatus() for x in element: x = x.lower() - if "header checksum" in x and "incorrect" in x: + if "headers checksum" in x and "incorrect" in x: chksum_status.chksum = x.split(":")[1].split()[0] continue if "bad checksum" in x and not chksum_status.chksum: @@ -102,10 +101,10 @@ def chksum_verification(self, element) -> None: if "calculated checksum" in x: chksum_status.chksum_calculated = x.split(":")[1].split()[0] else: - chksum_status() - self.chksum_list.append(asdict(chksum_status)) + chksum_status.verify() + self.chksum_list.append(chksum_status) - def update_header(self): - """Update header with data layer which is 'hidden' in the tshark output""" + def __update_header(self) -> None: + """Update headers with data layer which is 'hidden' in the tshark output""" if self._data_layer: self.header.append("RAW") diff --git a/packet_helper_core/models/__init__.py b/packet_helper_core/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packet_helper_core/models/checksum_status.py b/packet_helper_core/models/checksum_status.py new file mode 100644 index 0000000..19ee22d --- /dev/null +++ b/packet_helper_core/models/checksum_status.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel + + +class ChecksumStatus(BaseModel): + chksum: str = "" + chksum_calculated: str = "" + status: bool | None = None + + def verify(self) -> None: + def clean_chksum(element: str): + return element.replace("0x", "") + + if self.chksum and self.chksum_calculated: + self.status = clean_chksum(self.chksum) == clean_chksum( + self.chksum_calculated + ) + else: + self.status = None diff --git a/packet_helper_core/models/scapy_response.py b/packet_helper_core/models/scapy_response.py new file mode 100644 index 0000000..06a97c8 --- /dev/null +++ b/packet_helper_core/models/scapy_response.py @@ -0,0 +1,21 @@ +from typing import Literal + +from pydantic import BaseModel + +from packet_helper_core.models.checksum_status import ChecksumStatus + + +class ScapyResponse(BaseModel): + name: str + bytes_record: str # bytes + hex_record: str # hex + hex_record_full: str # hex_one + length: int + length_unit: Literal[ + "B", + ] # length_unit + representation: str # repr + representation_full: str # repr_full + tshark_name: str = "" + tshark_raw_summary: str = "" + chksum_status: ChecksumStatus | None = None diff --git a/packet_helper_core/packet_data_scapy.py b/packet_helper_core/packet_data_scapy.py deleted file mode 100644 index 41ad3d1..0000000 --- a/packet_helper_core/packet_data_scapy.py +++ /dev/null @@ -1,50 +0,0 @@ -from dataclasses import dataclass - -from scapy_helper import get_hex - -from packet_helper_core.packet_data import PacketData -from packet_helper_core.utils.scapy_reader import scapy_reader - - -@dataclass -class PacketDataScapy: - raw: str - packet_data: PacketData - - def __post_init__(self): - self.header = [x.replace("\r", "") for x in self.packet_data.header] - self.headers_scapy = scapy_reader(self.raw) - - self.headers_full = [repr(x) for x in self.headers_scapy] - self.headers_single = [f"{x.split(' |')[0]}>" for x in self.headers_full] - - self.structure = self.__make_structure() - - def __make_structure(self): - temp_structure = [] - - for e, h in enumerate(self.headers_scapy): - one_frame = self.headers_scapy[e].copy() - one_frame.remove_payload() - _dict = { - "name": h.name, - "bytes": str(h), - "hex": get_hex(h), - "hex_one": get_hex(one_frame), - "length": len(h), - "length_unit": "B", - "repr": f"{repr(h).split(' |')[0]}>", - "repr_full": repr(h), - } - - # RAW elements on the end are added to the last package as data! - try: - _dict["tshark_name"] = self.packet_data.body2[e][0] - _dict["tshark_raw_summary"] = self.packet_data.body2[e][1:] - except IndexError: - break - - _dict["chksum_status"] = self.packet_data.chksum_list[e] - - temp_structure.append(_dict) - return temp_structure diff --git a/packet_helper_core/packethelper.py b/packet_helper_core/packethelper.py new file mode 100644 index 0000000..ff5abb7 --- /dev/null +++ b/packet_helper_core/packethelper.py @@ -0,0 +1,30 @@ +from packet_helper_core.decoders.decode_string import decode_string +from packet_helper_core.decoders.tshark_data import TSharkData +from packet_helper_core.decoders.scapy_data import ScapyData + + +class PacketHelper: + """ + Class PacketHelper is just a wrapper to create a handy-shortcut + for preparing a data from hex string + """ + + def __init__(self) -> None: + self.hex_string, self.__decoded_by_tshark, self.__decoded_by_scapy = (None,) * 3 + + def decode(self, hex_string: str, extend_with_scapy: bool = True) -> None: + self.hex_string = hex_string.replace(" ", "") + decoded_string = decode_string(self.hex_string) + self.__decoded_by_tshark = TSharkData(decoded_packet=decoded_string) + if extend_with_scapy: + self.__decoded_by_scapy = ScapyData( + raw=self.hex_string, packet_data=self.__decoded_by_tshark + ) + + @property + def tshark_data(self) -> TSharkData | None: + return self.__decoded_by_tshark + + @property + def scapy_data(self) -> ScapyData | None: + return self.__decoded_by_scapy diff --git a/packet_helper_core/utils/conversion.py b/packet_helper_core/utils/conversion.py index fffaafb..11b8d92 100644 --- a/packet_helper_core/utils/conversion.py +++ b/packet_helper_core/utils/conversion.py @@ -1,17 +1,18 @@ import importlib +from scapy.base_classes import BasePacket -def from_sh_list(packet_list): + +def from_sh_list(packet_list) -> BasePacket: imported_all = importlib.import_module("scapy.all") def remove_none(): - return {k: v for k, v in _value.items() if v is not None} + return {k: v for k, v in layer.get(_key, {}).items() if v is not None} new_packet = None for layer in packet_list: if isinstance(layer, dict): _key = [x for x in layer.keys()][0] - _value = layer.get(_key) _value = remove_none() if _key == "Ethernet": _key = "Ether" diff --git a/packet_helper_core/utils/scapy_reader.py b/packet_helper_core/utils/scapy_reader.py index 5f9e3ce..314df1b 100644 --- a/packet_helper_core/utils/scapy_reader.py +++ b/packet_helper_core/utils/scapy_reader.py @@ -2,26 +2,25 @@ import logging import os from time import time -from typing import List from scapy.all import wrpcap, rdpcap from scapy.packet import Packet -def scapy_reader(hex_str: str) -> List[Packet]: - hex_str = binascii.unhexlify(hex_str) - if not isinstance(hex_str, bytes): +def scapy_reader(hex_str: str) -> list[Packet]: + bytes_from_hex = binascii.unhexlify(hex_str) + if not isinstance(bytes_from_hex, bytes): raise Exception("ERR:: hex_str must be in bytes!") temp_filename = f"pcap_{time()}" - wrpcap(temp_filename, hex_str) + wrpcap(temp_filename, bytes_from_hex) # type: ignore pcap_object = rdpcap(temp_filename) # try to clean after all try: os.remove(temp_filename) - except Exception: - logging.error(f"Cannot remove {temp_filename}") + except OSError as os_err: + logging.error(f"Cannot remove {temp_filename}\n{os_err}") converted_packets = [] current = pcap_object[0] diff --git a/packet_helper_core/utils/utils.py b/packet_helper_core/utils/utils.py deleted file mode 100644 index 47bd182..0000000 --- a/packet_helper_core/utils/utils.py +++ /dev/null @@ -1,55 +0,0 @@ -import pyshark -from pyshark.packet.packet import Packet -from scapy_helper import get_hex - - -def hex_str_operation(h_string, with_new_line: bool = False): - z = "" - tmp = [] - for e, x in enumerate(h_string.replace(" ", "")): - z += x - if e % 2: - tmp.append(z) - z = "" - if with_new_line: - temp_list = [] - for e, v in enumerate(tmp, 1): - if not e % 16: - temp_list.append(f"{v}\n") - continue - temp_list.append(f"{v} ") - return "".join(temp_list) - return " ".join(tmp) - - -def decode_hex(hex_str: str, use_json: bool = False) -> Packet: - frame_bytes: bytes = bytes.fromhex(hex_str) - _custom_params = [ - "-o", - "tcp.check_checksum:TRUE", - "-o", - "ip.check_checksum:TRUE", - "-o", - "stt.check_checksum:TRUE", - "-o", - "udp.check_checksum:TRUE", - "-o", - "wlan.check_checksum:TRUE", - ] - # only interested with the first packet - packet = pyshark.InMemCapture(custom_parameters=_custom_params) - return packet.parse_packet(frame_bytes) - - -def better_scapy_summary(scapy_summary) -> list: - list_ = [] - for frame in scapy_summary: - temp_frame = { - "name": frame.name, - "bytes": frame.raw_packet_cache, - "hex": get_hex(frame.raw_packet_cache), - "length": len(frame.raw_packet_cache), - "repr": f"{repr(frame).split(' |')[0]}>", - } - list_.append(temp_frame) - return list_ diff --git a/requirements.txt b/requirements.txt index 1db101e..3c4ed03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ pyshark==0.5.3 scapy~=2.4.5 scapy_helper==0.14.8 +pydantic~=1.10.1 diff --git a/setup.py b/setup.py index 212305d..888f3d5 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ setup( name="packet_helper_core", - description="Engine to decode raw string hex into packets", + description="Engine to decode decoded_packet string hex into packets", long_description=long_description, long_description_content_type="text/markdown", author="Nex Sabre", diff --git a/tests/decoders/__init__.py b/tests/decoders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/decoders/test_scapy_data.py b/tests/decoders/test_scapy_data.py new file mode 100644 index 0000000..496448d --- /dev/null +++ b/tests/decoders/test_scapy_data.py @@ -0,0 +1,25 @@ +import pytest +from scapy.all import IP, TCP, Ether # noqa +from scapy_helper import get_hex + +from packet_helper_core import PacketHelper +from packet_helper_core.decoders.scapy_data import ScapyData + + +@pytest.fixture +def decode_example_packet() -> PacketHelper: + ph = PacketHelper() + ph.decode(get_hex(Ether() / IP() / IP() / TCP())) + return ph + + +def test_scapy_data(decode_example_packet: PacketHelper) -> None: + scapy_data = ScapyData( + decode_example_packet.hex_string, decode_example_packet.tshark_data + ) + assert scapy_data + assert scapy_data.headers + assert scapy_data.scapy_headers + assert scapy_data.full_scapy_representation_headers + assert scapy_data.single_scapy_representation_headers + assert scapy_data.packet_structure diff --git a/tests/test_core.py b/tests/test_core.py deleted file mode 100644 index ec7ff6c..0000000 --- a/tests/test_core.py +++ /dev/null @@ -1,58 +0,0 @@ -from packet_helper_core import PacketData, PacketDataScapy -from packet_helper_core.core import Core -from scapy.all import IP, TCP, Ether # type: ignore -from scapy_helper import get_hex - - -class TestCore: - simple_ether_ip_tcp_hex_string = get_hex(Ether() / IP() / TCP()) - - def test_core_post_init(self): - core_results = Core(TestCore.simple_ether_ip_tcp_hex_string) - - assert isinstance(core_results.hex_string, str), "Should be String" - assert isinstance( - core_results.scapy_data, PacketDataScapy - ), "Should be PacketDataScapy" - assert isinstance(core_results.tshark_data, PacketData), "Should be PacketData" - - assert core_results.scapy_data.header == [ - "ETH", - "IP", - "TCP", - ], "Should be properly decoded" - assert core_results.tshark_data.header == [ - "ETH", - "IP", - "TCP", - ], "Should be properly decoded" - - def test_core_chksum_verification(self): - core_results = Core(get_hex(Ether() / IP() / IP() / TCP())) - assert core_results.tshark_data.chksum_list - - def test_negative_core_chksum_verification_with_wrong_chksum(self): - core_results2 = Core(get_hex(Ether() / IP() / IP(chksum=0) / TCP())) - assert core_results2.tshark_data.chksum_list[2]["chksum"] == "0x0000" - - def test_one_of_custom_problematic_cases(self): - core_results = Core( - "ffffffaaa9ff00000000001208004500003c0001000040047cbb7f0000017f" - "000001450000280001000040067ccd7f0000017f0000010014005000000000" - "0000000050022000917d0000" - ) - assert core_results.tshark_data.chksum_list[3]["chksum"] == "0x917d" - - def test_Ethernet_IP_UDP_DNS(self): - core_result = Core( - "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" - "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" - "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" - "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" - "C011C011000200010001438C0006036E7331C011" - ) - chksum_obj = core_result.tshark_data.chksum_list[2] - - assert chksum_obj["chksum"] == "0x2d2e" - assert chksum_obj["chksum_calculated"] == "0x2d2d" - assert chksum_obj["status"] is False diff --git a/tests/test_decoder.py b/tests/test_decoder.py new file mode 100644 index 0000000..27f7cf0 --- /dev/null +++ b/tests/test_decoder.py @@ -0,0 +1,71 @@ +import pytest + +from packet_helper_core.decoders.tshark_data import TSharkData +from packet_helper_core.decoders.scapy_data import ScapyData +from packet_helper_core.models.checksum_status import ChecksumStatus +from packet_helper_core.packethelper import PacketHelper +from scapy.all import IP, TCP, Ether # noqa +from scapy_helper import get_hex + + +def test_core_post_init(): + expected_headers = [ + "ETH", + "IP", + "TCP", + ] + ph = PacketHelper() + ph.decode(get_hex(Ether() / IP() / TCP())) + + assert isinstance(ph.hex_string, str), "Should be String" + assert isinstance(ph.scapy_data, ScapyData), "Should be Scapy Data" + assert isinstance(ph.tshark_data, TSharkData), "Should be TShark Data" + assert ph.scapy_data.headers == expected_headers, "Should be properly decoded" + assert ph.tshark_data.header == expected_headers, "Should be properly decoded" + + +def test_core_chksum_verification(): + decoder = PacketHelper() + decoder.decode(get_hex(Ether() / IP() / IP() / TCP())) + assert decoder.tshark_data.chksum_list + assert len(decoder.tshark_data.chksum_list) == 4 + + +@pytest.mark.parametrize( + "packet, position_to_check, expected_chksum_value", + ( + (get_hex(Ether() / IP() / IP(chksum=0) / TCP()), 2, "0x0000"), + ( + ( + "ffffffaaa9ff00000000001208004500003c0001000040047cbb7f0000017f" + "000001450000280001000040067ccd7f0000017f0000010014005000000000" + "0000000050022000917d0000" + ), + 3, + "0x917d", + ), + ), +) +def test_negative_core_chksum_verification_with_wrong_chksum( + packet: str, position_to_check: int, expected_chksum_value: str +): + ph = PacketHelper() + ph.decode(packet) + assert ph.tshark_data.chksum_list[position_to_check].chksum == expected_chksum_value + + +def test_ethernet_ip_udp_dns(): + packet = ( + "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" + "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" + "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" + "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" + "C011C011000200010001438C0006036E7331C011" + ) + ph = PacketHelper() + ph.decode(packet) + chksum_obj: ChecksumStatus = ph.tshark_data.chksum_list[2] + + assert chksum_obj.chksum == "0x2d2e" + assert chksum_obj.chksum_calculated == "0x2d2d" + assert chksum_obj.status is False diff --git a/tests/test_from_sh_list.py b/tests/test_from_sh_list.py index 4cd01d1..b8c8feb 100644 --- a/tests/test_from_sh_list.py +++ b/tests/test_from_sh_list.py @@ -1,4 +1,8 @@ +from typing import Any + import pytest +from scapy.base_classes import BasePacket + from packet_helper_core.utils.conversion import from_sh_list from scapy.packet import Packet from scapy_helper import get_hex, to_list @@ -10,8 +14,11 @@ "packet", (SIMPLE_IP_IN_IP_PACKET, SIMPLE_IP_IN_IP_PACKET / SIMPLE_IP_IN_IP_PACKET) ) def test_from_sh_list(packet: Packet) -> None: - packet_list = to_list(packet) - new_packet = from_sh_list(packet_list) + packet_list: list[dict[str, Any]] = to_list(packet) + packet_generated_from_scapy_helper = from_sh_list(packet_list) - assert get_hex(packet) == get_hex(new_packet) - assert packet_list == to_list(new_packet) + assert isinstance(packet_generated_from_scapy_helper, BasePacket) + assert get_hex(packet) == get_hex( + packet_generated_from_scapy_helper + ), "Packets should return same hex results" + assert packet_list == to_list(packet_generated_from_scapy_helper) diff --git a/tests/test_packet_data.py b/tests/test_packet_data.py index d5d2051..5641e66 100644 --- a/tests/test_packet_data.py +++ b/tests/test_packet_data.py @@ -1,17 +1,20 @@ -from packet_helper_core.packet_data import PacketData -from packet_helper_core.utils.utils import decode_hex -from scapy.layers.all import IP, TCP, Ether, IPv6 # type: ignore +from scapy.layers.all import IP, TCP, Ether, IPv6 # noqa from scapy_helper import get_hex +from packet_helper_core import PacketHelper +from packet_helper_core.decoders.decode_string import decode_string +from packet_helper_core.decoders.tshark_data import TSharkData from tests.utils.example_packets import EXAMPLE_ETHER, EXAMPLE_ETHER_IP_IPV6_GRE_DATA def test_packet_data(): - packet = decode_hex(EXAMPLE_ETHER) - assert packet.__getitem__("eth"), "Layer Ether should be available in decoded hex" + decoded_pkt = decode_string(EXAMPLE_ETHER) + assert decoded_pkt.__getitem__( + "eth" + ), "Layer Ether should be available in decoded hex" - pd = PacketData(raw=str(packet)) - assert "ETH" in pd.header, "Ether header should be found at packet" + pd = TSharkData(decoded_packet=decoded_pkt) + assert "ETH" in pd.header, "Ether headers should be found at decoded_pkt" def test_decode_hex__data_should_be_present_after_gre_packet(): @@ -20,8 +23,8 @@ def test_decode_hex__data_should_be_present_after_gre_packet(): "e672031313233343435393832373334393832373334323334" ) - packet = decode_hex(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) - pd = PacketData(raw=str(packet)) + decoded_pkt = decode_string(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) + pd = TSharkData(decoded_packet=decoded_pkt) packet_raw_data = pd.body.get("RAW", []) assert packet_raw_data, "RAW block should be available" extracted_data_from_raw = packet_raw_data[0].split()[-1] @@ -33,7 +36,7 @@ def test_decode_hex__data_should_be_present_after_gre_packet(): def test_custom_packet_data(): frame = Ether() / IP() / IPv6() / TCP() - packet = decode_hex(get_hex(frame)) + packet = decode_string(get_hex(frame)) list_of_expected_packets = ("ETH", "IP", "IPV6", "TCP") list_of_layers_from_packet = [x.layer_name.upper() for x in packet.layers] for expected_packet in list_of_expected_packets: diff --git a/tests/utils/example_packets.py b/tests/utils/example_packets.py index d25c3d5..e3be1b8 100644 --- a/tests/utils/example_packets.py +++ b/tests/utils/example_packets.py @@ -1,4 +1,4 @@ -from scapy.all import IP, TCP, Ether # type: ignore +from scapy.all import IP, TCP, Ether # noqa EXAMPLE_ETHER = "ff fd df af ff ff 00 00 00 00 00 12 08 00"