Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions packet_helper_core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
from packet_helper_core.core import Core
from packet_helper_core.packet_data import PacketData
from packet_helper_core.packet_data_scapy import PacketDataScapy
from packet_helper_core.packethelper import PacketHelper
16 changes: 0 additions & 16 deletions packet_helper_core/checksum_status.py

This file was deleted.

20 changes: 0 additions & 20 deletions packet_helper_core/core.py

This file was deleted.

Empty file.
30 changes: 30 additions & 0 deletions packet_helper_core/decoders/decode_string.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import functools

from pyshark import InMemCapture
from pyshark.packet.packet import Packet


@functools.cache
def decode_string(hex_str: str) -> Packet:
"""
Decode string (in string-hex format) using 'InMemCapture' using a tshark.
"""
_custom_params = [
"-o",
"tcp.check_checksum:TRUE",
"-o",
"ip.check_checksum:TRUE",
"-o",
"stt.check_checksum:TRUE",
"-o",
"udp.check_checksum:TRUE",
"-o",
"wlan.check_checksum:TRUE",
]
# only interested with the first packet
packet = InMemCapture(custom_parameters=_custom_params)
decoded_packet: Packet = packet.parse_packet(
bytes.fromhex(hex_str.replace(" ", ""))
)
packet.close()
return decoded_packet
58 changes: 58 additions & 0 deletions packet_helper_core/decoders/scapy_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from dataclasses import dataclass

from scapy.packet import Packet
from scapy_helper import get_hex

from packet_helper_core.decoders.tshark_data import TSharkData
from packet_helper_core.models.scapy_response import ScapyResponse
from packet_helper_core.utils.scapy_reader import scapy_reader


@dataclass
class ScapyData:
raw: str
packet_data: TSharkData

def __post_init__(self):
self.headers: list[str] = [x.replace("\r", "") for x in self.packet_data.header]
self.scapy_headers: list[Packet] = scapy_reader(self.raw)

self.full_scapy_representation_headers: list[str] = [
repr(x) for x in self.scapy_headers
]
self.single_scapy_representation_headers = [
f"{x.split(' |')[0]}>" for x in self.full_scapy_representation_headers
]

self.packet_structure = self.__make_structure()

def __make_structure(self) -> list[ScapyResponse]:
scapy_responses: list[ScapyResponse] = []

for index, header in enumerate(self.scapy_headers):
scapy_header = self.scapy_headers[index].copy()
scapy_header.remove_payload() # payload is not necessary for our usage in this case
scapy_data_dict: ScapyResponse = ScapyResponse(
**{
"name": header.name,
"bytes_record": str(header),
"hex_record": get_hex(header),
"hex_record_full": get_hex(scapy_header),
"length": len(header),
"length_unit": "B",
"representation": f"{repr(header).split(' |')[0]}>",
"representation_full": repr(header),
}
)

# RAW elements on the end are added to the last package as data!
try:
scapy_data_dict.tshark_name = self.packet_data.body2[index][0]
scapy_data_dict.tshark_raw_summary = self.packet_data.body2[index][1:]
except IndexError:
break

scapy_data_dict.chksum_status = self.packet_data.chksum_list[index]

scapy_responses.append(scapy_data_dict)
return scapy_responses
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
from dataclasses import dataclass, asdict, field
from typing import Any
from dataclasses import dataclass, field

from packet_helper_core.checksum_status import ChecksumStatus
from pyshark.packet.packet import Packet

from packet_helper_core.models.checksum_status import ChecksumStatus


@dataclass
class PacketData:
raw: str
chksum_list: list[Any] = field(default_factory=list)
class TSharkData:
decoded_packet: Packet
chksum_list: list[ChecksumStatus] = field(default_factory=list)

_data_layer: list[str] = field(default_factory=list)

def __post_init__(self):
self.raw_array = self.raw.split("\n")
self.length = self.raw_array[0].replace(")", "").split()[2]
self.array = self.raw_array[1:]
self.__pkt_information_array = str(self.decoded_packet).split("\n")[1:]

self.header = self.compose_header()
self.body = self.compose_body()
self.body2 = self.compose_body_list()
self.header: list[str] = self.__compose_header()
self.body: dict[str, list[str]] = self.__compose_body()
self.body2: list[list[str]] = self.__compose_body_list()

self.update_header()
self.__update_header()

def compose_header(self):
def __compose_header(self) -> list[str]:
return [
a.replace("Layer", "").replace(":", "").replace(" ", "")
for a in self.array
for a in self.__pkt_information_array
if a.startswith("Layer")
]

Expand All @@ -36,10 +35,10 @@ def __is_data_element(self, layer_fragment: str) -> bool:
return True
return False

def compose_body(self) -> dict[str, list[str]]:
def __compose_body(self) -> dict[str, list[str]]:
temp_body_dict: dict[str, list[str]] = {}
actual_layer: str = ""
for x in self.array:
for x in self.__pkt_information_array:
if x.startswith("Layer"):
actual_layer = x.replace(":", "").split()[1]
temp_body_dict[actual_layer] = []
Expand All @@ -52,14 +51,14 @@ def compose_body(self) -> dict[str, list[str]]:
temp_body_dict[actual_layer].append(x)
return temp_body_dict

def compose_body_list(self) -> list[list[str]]:
def __compose_body_list(self) -> list[list[str]]:
temp_body_dict = []
line = []
ckhsum_flag = False
data_found: list[str] = [
"RAW",
]
for arr in self.array:
for arr in self.__pkt_information_array:
arr = arr.strip()
if arr == "" and line:
temp_body_dict.append(line)
Expand All @@ -83,16 +82,16 @@ def compose_body_list(self) -> list[list[str]]:

if ckhsum_flag:
for y in temp_body_dict:
self.chksum_verification(y)
self.__chksum_verification(y)

temp_body_dict.append(data_found)
return temp_body_dict

def chksum_verification(self, element) -> None:
chksum_status = ChecksumStatus()
def __chksum_verification(self, element) -> None:
chksum_status: ChecksumStatus = ChecksumStatus()
for x in element:
x = x.lower()
if "header checksum" in x and "incorrect" in x:
if "headers checksum" in x and "incorrect" in x:
chksum_status.chksum = x.split(":")[1].split()[0]
continue
if "bad checksum" in x and not chksum_status.chksum:
Expand All @@ -102,10 +101,10 @@ def chksum_verification(self, element) -> None:
if "calculated checksum" in x:
chksum_status.chksum_calculated = x.split(":")[1].split()[0]
else:
chksum_status()
self.chksum_list.append(asdict(chksum_status))
chksum_status.verify()
self.chksum_list.append(chksum_status)

def update_header(self):
"""Update header with data layer which is 'hidden' in the tshark output"""
def __update_header(self) -> None:
"""Update headers with data layer which is 'hidden' in the tshark output"""
if self._data_layer:
self.header.append("RAW")
Empty file.
18 changes: 18 additions & 0 deletions packet_helper_core/models/checksum_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pydantic import BaseModel


class ChecksumStatus(BaseModel):
chksum: str = ""
chksum_calculated: str = ""
status: bool | None = None

def verify(self) -> None:
def clean_chksum(element: str):
return element.replace("0x", "")

if self.chksum and self.chksum_calculated:
self.status = clean_chksum(self.chksum) == clean_chksum(
self.chksum_calculated
)
else:
self.status = None
21 changes: 21 additions & 0 deletions packet_helper_core/models/scapy_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Literal

from pydantic import BaseModel

from packet_helper_core.models.checksum_status import ChecksumStatus


class ScapyResponse(BaseModel):
name: str
bytes_record: str # bytes
hex_record: str # hex
hex_record_full: str # hex_one
length: int
length_unit: Literal[
"B",
] # length_unit
representation: str # repr
representation_full: str # repr_full
tshark_name: str = ""
tshark_raw_summary: str = ""
chksum_status: ChecksumStatus | None = None
50 changes: 0 additions & 50 deletions packet_helper_core/packet_data_scapy.py

This file was deleted.

30 changes: 30 additions & 0 deletions packet_helper_core/packethelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from packet_helper_core.decoders.decode_string import decode_string
from packet_helper_core.decoders.tshark_data import TSharkData
from packet_helper_core.decoders.scapy_data import ScapyData


class PacketHelper:
"""
Class PacketHelper is just a wrapper to create a handy-shortcut
for preparing a data from hex string
"""

def __init__(self) -> None:
self.hex_string, self.__decoded_by_tshark, self.__decoded_by_scapy = (None,) * 3

def decode(self, hex_string: str, extend_with_scapy: bool = True) -> None:
self.hex_string = hex_string.replace(" ", "")
decoded_string = decode_string(self.hex_string)
self.__decoded_by_tshark = TSharkData(decoded_packet=decoded_string)
if extend_with_scapy:
self.__decoded_by_scapy = ScapyData(
raw=self.hex_string, packet_data=self.__decoded_by_tshark
)

@property
def tshark_data(self) -> TSharkData | None:
return self.__decoded_by_tshark

@property
def scapy_data(self) -> ScapyData | None:
return self.__decoded_by_scapy
7 changes: 4 additions & 3 deletions packet_helper_core/utils/conversion.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import importlib

from scapy.base_classes import BasePacket

def from_sh_list(packet_list):

def from_sh_list(packet_list) -> BasePacket:
imported_all = importlib.import_module("scapy.all")

def remove_none():
return {k: v for k, v in _value.items() if v is not None}
return {k: v for k, v in layer.get(_key, {}).items() if v is not None}

new_packet = None
for layer in packet_list:
if isinstance(layer, dict):
_key = [x for x in layer.keys()][0]
_value = layer.get(_key)
_value = remove_none()
if _key == "Ethernet":
_key = "Ether"
Expand Down
Loading