diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da29a19..3dd19a0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: sudo apt-get install -y --allow-change-held-packages --force-yes tshark - name: Run pytest run: | - pytest + make test verify-code: name: Verify code w/Black&Flake8 runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index e80085c..6ab0cb8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ venv/ .mypy_cache/ .pytest_cache/ + +.DS_Store/ diff --git a/Makefile b/Makefile index 951a1b2..06f5ad0 100644 --- a/Makefile +++ b/Makefile @@ -25,3 +25,8 @@ build: @echo "Building..." python3 setup.py sdist bdist_wheel --universal @echo "Building... Done" + +.PHONY: test +test: + mkdir -p static && touch static/index.html + PYTHONPATH=${PWD} pytest tests/integration diff --git a/README.md b/README.md index 6d4698e..d5544af 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ sudo apt-get -y install wireshark sudo apt-get install -y --allow-change-held-packages --force-yes tshark ``` -Recommended *Python 3.8* (as minimal). +Recommended *Python 3.11* (as minimal). To verify that all works, try to run test using `pytest` (in root directory of this package): diff --git a/packet_helper_core/utils/__init__.py b/api/__init__.py similarity index 100% rename from packet_helper_core/utils/__init__.py rename to api/__init__.py diff --git a/api/main.py b/api/main.py new file mode 100644 index 0000000..62fa846 --- /dev/null +++ b/api/main.py @@ -0,0 +1,52 @@ +from fastapi import FastAPI, Request, status +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from starlette.templating import _TemplateResponse # noqa + +from api.routers.api.create import api as api_create +from api.routers.api.hex import api as api_hex +from api.routers.api.info import api as api_info +from api.routers.api.packets import api as api_packets +from api.routers.version import version + +app = FastAPI( + title="Packet Helper Next", + description="Packet Helper API helps you to decode hex into packets with description 🚀", + version="0.1", + license_info={ + "name": "GPL v2.0", + "url": "https://github.com/PacketHelper/packet-helper-next/blob/main/LICENSE", + }, +) +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) +app.mount("/static", StaticFiles(directory="static"), name="static") + +# rest api +# /api/* +router_api_config = {"prefix": "/api", "tags": ["api"]} +for api_router in (api_create, api_hex, api_info, api_packets): + app.include_router(api_router, **router_api_config) + +# /* +app.include_router(version) + +templates = Jinja2Templates(directory="static") + + +@app.get("/", include_in_schema=False, status_code=status.HTTP_200_OK) +def get_root(request: Request) -> _TemplateResponse: + """Return Vue single page""" + return templates.TemplateResponse("index.html", {"request": request}) + + +@app.get("/hex/{hex_string}", include_in_schema=False, status_code=status.HTTP_200_OK) +def get_hex(request: Request) -> _TemplateResponse: + """Return specific path for Vue single-page""" + return templates.TemplateResponse("index.html", {"request": request}) diff --git a/api/models/__init__.py b/api/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/models/creator_packets.py b/api/models/creator_packets.py new file mode 100644 index 0000000..fa5339c --- /dev/null +++ b/api/models/creator_packets.py @@ -0,0 +1,19 @@ +from typing import Any + +from pydantic import BaseModel + + +class CreatorPacketsRequest(BaseModel): + packets: list[Any] + + +class CreatorPacketsResponse(BaseModel): + packets: list[dict[str, Any]] | None + + +class CreatorPacketsObjectsRequest(BaseModel): + packets: list[dict[str, Any]] + + +class CreatorPacketsObjectsResponse(BaseModel): + builtpacket: dict[str, str] # FIXME rename => built_packet diff --git a/api/models/decoded_hex.py b/api/models/decoded_hex.py new file mode 100644 index 0000000..ad7564c --- /dev/null +++ b/api/models/decoded_hex.py @@ -0,0 +1,17 @@ +from typing import Literal + +from pydantic import BaseModel + +from core.models.scapy_response import ScapyResponse + + +class HexSummary(BaseModel): + length: int + length_unit: Literal["B", "b"] + hexdump: str + + +class DecodedHexResponse(BaseModel): + hex: str + summary: HexSummary + structure: list[ScapyResponse] diff --git a/api/models/info_response.py b/api/models/info_response.py new file mode 100644 index 0000000..b8d03c2 --- /dev/null +++ b/api/models/info_response.py @@ -0,0 +1,11 @@ +from pydantic import BaseModel + + +class InfoResponse(BaseModel): + version: str + revision: str | None + + +class VersionResponse(BaseModel): + packethelper: str # FIXME rename => packet_helper + framework: str diff --git a/api/routers/api/__init__.py b/api/routers/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/routers/api/create.py b/api/routers/api/create.py new file mode 100644 index 0000000..a7ccfd1 --- /dev/null +++ b/api/routers/api/create.py @@ -0,0 +1,25 @@ +from fastapi import APIRouter, HTTPException, status +from core.utils.conversion import from_sh_list +from scapy_helper import get_hex + +from api.models.creator_packets import ( + CreatorPacketsObjectsRequest, + CreatorPacketsObjectsResponse, +) + +api = APIRouter() + + +@api.post("/create", status_code=status.HTTP_201_CREATED, tags=["api"]) +def post_api_create( + request: CreatorPacketsObjectsRequest, +) -> CreatorPacketsObjectsResponse: + _hex = None + try: + _hex = get_hex(from_sh_list(request.packets)) + except AttributeError as error: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": f"Layer is not supported {str(error).split()[-1]}"}, + ) + return CreatorPacketsObjectsResponse(builtpacket={"hex": _hex}) diff --git a/api/routers/api/hex.py b/api/routers/api/hex.py new file mode 100644 index 0000000..14c5578 --- /dev/null +++ b/api/routers/api/hex.py @@ -0,0 +1,56 @@ +import functools + +import pydantic +from fastapi import APIRouter, HTTPException, status +from core.decoders.scapy_data import ScapyData +from core.decoders.tshark_data import TSharkData +from core.decoders.decode_string import decode_string +from scapy_helper import hexdump + +from api.models.decoded_hex import DecodedHexResponse +from core.models.scapy_response import ScapyResponse + +api = APIRouter() + + +@api.get("/hex/{hex_string}", status_code=status.HTTP_200_OK, tags=["api"]) +def get_api_hex(hex_string: str) -> DecodedHexResponse: + @functools.cache + def prepare_api_response(hex_to_decode: str) -> list[ScapyResponse]: + packet = decode_string(hex_to_decode) + packet_data = TSharkData(packet) + scapy_data = ScapyData(hex_to_decode, packet_data) + + return scapy_data.packet_structure + + h = " ".join( + [ + "".join([hex_string[e - 1], hex_string[e]]) + for e, _ in enumerate(hex_string) + if e % 2 + ] + ) + + try: + response = DecodedHexResponse( + hex=hex_string, + summary={ + "length": len(h.split()), + "length_unit": "B", + "hexdump": hexdump(h, dump=True), + }, + structure=prepare_api_response(hex_string), + ) + except IndexError: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + detail={ + "error": f"Hex <{hex_string}> is incorrect. Is packet length is correct?" + }, + ) + except pydantic.error_wrappers.ValidationError as ve: + raise HTTPException( + status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": f"Incorrect response from engine: <{ve}>"}, + ) + return response diff --git a/api/routers/api/info.py b/api/routers/api/info.py new file mode 100644 index 0000000..1b09e7c --- /dev/null +++ b/api/routers/api/info.py @@ -0,0 +1,18 @@ +from os import getenv + +from fastapi import APIRouter, status + +from api.models.info_response import InfoResponse + +api = APIRouter() + + +@api.get( + "/info", + description="Get information about packet helper version and revision", + status_code=status.HTTP_200_OK, + tags=["api"], +) +def get_info() -> InfoResponse: + ph_version = getenv("PH_VERSION", "v1.0.0:00000000").split(":") + return InfoResponse(version=ph_version[0], revision=ph_version[1]) diff --git a/api/routers/api/packets.py b/api/routers/api/packets.py new file mode 100644 index 0000000..82c4c6c --- /dev/null +++ b/api/routers/api/packets.py @@ -0,0 +1,28 @@ +import importlib + +from fastapi import APIRouter, HTTPException, status +from scapy_helper import to_list + +from api.models.creator_packets import CreatorPacketsRequest, CreatorPacketsResponse + +api = APIRouter() + + +@api.post("/packets", status_code=status.HTTP_201_CREATED, tags=["api"]) +def post_api_packets(request: CreatorPacketsRequest) -> CreatorPacketsResponse: + imported_all = importlib.import_module("scapy.all") + packet = None + try: + for protocol in request.packets: + new_layer = imported_all.__getattribute__(protocol) + if packet is None: + packet = new_layer() + continue + packet /= new_layer() + except AttributeError as error: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": f"Layer is not supported {str(error).split()[-1]}"}, + ) + + return CreatorPacketsResponse(packets=to_list(packet)) diff --git a/api/routers/version.py b/api/routers/version.py new file mode 100644 index 0000000..50b1f08 --- /dev/null +++ b/api/routers/version.py @@ -0,0 +1,14 @@ +from fastapi import APIRouter, status + + +from api.models.info_response import VersionResponse + +version = APIRouter() + + +@version.get( + "/version", status_code=status.HTTP_200_OK, include_in_schema=False, deprecated=True +) +def get_version() -> VersionResponse: + """Return information about version of the Packet Helper""" + return VersionResponse(packethelper="0.1", framework="fastapi") diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/decoders/__init__.py b/core/decoders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/decoders/decode_string.py b/core/decoders/decode_string.py new file mode 100644 index 0000000..4634be9 --- /dev/null +++ b/core/decoders/decode_string.py @@ -0,0 +1,30 @@ +import functools + +from pyshark import InMemCapture +from pyshark.packet.packet import Packet + + +@functools.cache +def decode_string(hex_str: str) -> Packet: + """ + Decode string (in string-hex format) using 'InMemCapture' using a tshark. + """ + _custom_params = [ + "-o", + "tcp.check_checksum:TRUE", + "-o", + "ip.check_checksum:TRUE", + "-o", + "stt.check_checksum:TRUE", + "-o", + "udp.check_checksum:TRUE", + "-o", + "wlan.check_checksum:TRUE", + ] + # only interested with the first packet + packet = InMemCapture(custom_parameters=_custom_params) + decoded_packet: Packet = packet.parse_packet( + bytes.fromhex(hex_str.replace(" ", "")) + ) + packet.close() + return decoded_packet diff --git a/core/decoders/scapy_data.py b/core/decoders/scapy_data.py new file mode 100644 index 0000000..d4942a8 --- /dev/null +++ b/core/decoders/scapy_data.py @@ -0,0 +1,58 @@ +from dataclasses import dataclass + +from scapy.packet import Packet +from scapy_helper import get_hex + +from core.decoders.tshark_data import TSharkData +from core.models.scapy_response import ScapyResponse +from core.utils.scapy_reader import scapy_reader + + +@dataclass +class ScapyData: + raw: str + packet_data: TSharkData + + def __post_init__(self): + self.headers: list[str] = [x.replace("\r", "") for x in self.packet_data.header] + self.scapy_headers: list[Packet] = scapy_reader(self.raw) + + self.full_scapy_representation_headers: list[str] = [ + repr(x) for x in self.scapy_headers + ] + self.single_scapy_representation_headers = [ + f"{x.split(' |')[0]}>" for x in self.full_scapy_representation_headers + ] + + self.packet_structure = self.__make_structure() + + def __make_structure(self) -> list[ScapyResponse]: + scapy_responses: list[ScapyResponse] = [] + + for index, header in enumerate(self.scapy_headers): + scapy_header = self.scapy_headers[index].copy() + scapy_header.remove_payload() # payload is not necessary for our usage in this case + scapy_data_dict: ScapyResponse = ScapyResponse( + **{ + "name": header.name, + "bytes_record": str(header), + "hex_record": get_hex(header), + "hex_record_full": get_hex(scapy_header), + "length": len(header), + "length_unit": "B", + "representation": f"{repr(header).split(' |')[0]}>", + "representation_full": repr(header), + } + ) + + # RAW elements on the end are added to the last package as data! + try: + scapy_data_dict.tshark_name = self.packet_data.body2[index][0] + scapy_data_dict.tshark_raw_summary = self.packet_data.body2[index][1:] + except IndexError: + break + + scapy_data_dict.chksum_status = self.packet_data.chksum_list[index] + + scapy_responses.append(scapy_data_dict) + return scapy_responses diff --git a/packet_helper_core/packet_data.py b/core/decoders/tshark_data.py similarity index 67% rename from packet_helper_core/packet_data.py rename to core/decoders/tshark_data.py index 49a8b0d..aa1df64 100644 --- a/packet_helper_core/packet_data.py +++ b/core/decoders/tshark_data.py @@ -1,31 +1,30 @@ -from dataclasses import dataclass, asdict, field -from typing import Any +from dataclasses import dataclass, field -from packet_helper_core.checksum_status import ChecksumStatus +from pyshark.packet.packet import Packet + +from core.models.checksum_status import ChecksumStatus @dataclass -class PacketData: - raw: str - chksum_list: list[Any] = field(default_factory=list) +class TSharkData: + decoded_packet: Packet + chksum_list: list[ChecksumStatus] = field(default_factory=list) _data_layer: list[str] = field(default_factory=list) def __post_init__(self): - self.raw_array = self.raw.split("\n") - self.length = self.raw_array[0].replace(")", "").split()[2] - self.array = self.raw_array[1:] + self.__pkt_information_array = str(self.decoded_packet).split("\n")[1:] - self.header = self.compose_header() - self.body = self.compose_body() - self.body2 = self.compose_body_list() + self.header: list[str] = self.__compose_header() + self.body: dict[str, list[str]] = self.__compose_body() + self.body2: list[list[str]] = self.__compose_body_list() - self.update_header() + self.__update_header() - def compose_header(self): + def __compose_header(self) -> list[str]: return [ a.replace("Layer", "").replace(":", "").replace(" ", "") - for a in self.array + for a in self.__pkt_information_array if a.startswith("Layer") ] @@ -36,10 +35,10 @@ def __is_data_element(self, layer_fragment: str) -> bool: return True return False - def compose_body(self) -> dict[str, list[str]]: + def __compose_body(self) -> dict[str, list[str]]: temp_body_dict: dict[str, list[str]] = {} actual_layer: str = "" - for x in self.array: + for x in self.__pkt_information_array: if x.startswith("Layer"): actual_layer = x.replace(":", "").split()[1] temp_body_dict[actual_layer] = [] @@ -52,14 +51,14 @@ def compose_body(self) -> dict[str, list[str]]: temp_body_dict[actual_layer].append(x) return temp_body_dict - def compose_body_list(self) -> list[list[str]]: + def __compose_body_list(self) -> list[list[str]]: temp_body_dict = [] line = [] ckhsum_flag = False data_found: list[str] = [ "RAW", ] - for arr in self.array: + for arr in self.__pkt_information_array: arr = arr.strip() if arr == "" and line: temp_body_dict.append(line) @@ -83,16 +82,16 @@ def compose_body_list(self) -> list[list[str]]: if ckhsum_flag: for y in temp_body_dict: - self.chksum_verification(y) + self.__chksum_verification(y) temp_body_dict.append(data_found) return temp_body_dict - def chksum_verification(self, element) -> None: - chksum_status = ChecksumStatus() + def __chksum_verification(self, element) -> None: + chksum_status: ChecksumStatus = ChecksumStatus() for x in element: x = x.lower() - if "header checksum" in x and "incorrect" in x: + if "headers checksum" in x and "incorrect" in x: chksum_status.chksum = x.split(":")[1].split()[0] continue if "bad checksum" in x and not chksum_status.chksum: @@ -102,10 +101,10 @@ def chksum_verification(self, element) -> None: if "calculated checksum" in x: chksum_status.chksum_calculated = x.split(":")[1].split()[0] else: - chksum_status() - self.chksum_list.append(asdict(chksum_status)) + chksum_status.verify() + self.chksum_list.append(chksum_status) - def update_header(self): - """Update header with data layer which is 'hidden' in the tshark output""" + def __update_header(self) -> None: + """Update headers with data layer which is 'hidden' in the tshark output""" if self._data_layer: self.header.append("RAW") diff --git a/core/models/__init__.py b/core/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/models/checksum_status.py b/core/models/checksum_status.py new file mode 100644 index 0000000..19ee22d --- /dev/null +++ b/core/models/checksum_status.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel + + +class ChecksumStatus(BaseModel): + chksum: str = "" + chksum_calculated: str = "" + status: bool | None = None + + def verify(self) -> None: + def clean_chksum(element: str): + return element.replace("0x", "") + + if self.chksum and self.chksum_calculated: + self.status = clean_chksum(self.chksum) == clean_chksum( + self.chksum_calculated + ) + else: + self.status = None diff --git a/core/models/scapy_response.py b/core/models/scapy_response.py new file mode 100644 index 0000000..b05a520 --- /dev/null +++ b/core/models/scapy_response.py @@ -0,0 +1,19 @@ +from typing import Literal + +from pydantic import BaseModel, Field + +from core.models.checksum_status import ChecksumStatus + + +class ScapyResponse(BaseModel): + name: str + bytes_record: str # bytes + hex_record: str # hex + hex_record_full: str # hex_one + length: int + length_unit: Literal["B", "b"] # length_unit + representation: str # repr + representation_full: str # repr_full + tshark_name: str = "" + tshark_raw_summary: list[str] = Field(default_factory=list) + chksum_status: ChecksumStatus | None = None diff --git a/core/packet_helper.py b/core/packet_helper.py new file mode 100644 index 0000000..29d1641 --- /dev/null +++ b/core/packet_helper.py @@ -0,0 +1,30 @@ +from core.decoders.decode_string import decode_string +from core.decoders.tshark_data import TSharkData +from core.decoders.scapy_data import ScapyData + + +class PacketHelper: + """ + Class PacketHelper is just a wrapper to create a handy-shortcut + for preparing a data from hex string + """ + + def __init__(self) -> None: + self.hex_string, self.__decoded_by_tshark, self.__decoded_by_scapy = (None,) * 3 + + def decode(self, hex_string: str, extend_with_scapy: bool = True) -> None: + self.hex_string = hex_string.replace(" ", "") + decoded_string = decode_string(self.hex_string) + self.__decoded_by_tshark = TSharkData(decoded_packet=decoded_string) + if extend_with_scapy: + self.__decoded_by_scapy = ScapyData( + raw=self.hex_string, packet_data=self.__decoded_by_tshark + ) + + @property + def tshark_data(self) -> TSharkData | None: + return self.__decoded_by_tshark + + @property + def scapy_data(self) -> ScapyData | None: + return self.__decoded_by_scapy diff --git a/core/utils/__init__.py b/core/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packet_helper_core/utils/conversion.py b/core/utils/conversion.py similarity index 77% rename from packet_helper_core/utils/conversion.py rename to core/utils/conversion.py index fffaafb..11b8d92 100644 --- a/packet_helper_core/utils/conversion.py +++ b/core/utils/conversion.py @@ -1,17 +1,18 @@ import importlib +from scapy.base_classes import BasePacket -def from_sh_list(packet_list): + +def from_sh_list(packet_list) -> BasePacket: imported_all = importlib.import_module("scapy.all") def remove_none(): - return {k: v for k, v in _value.items() if v is not None} + return {k: v for k, v in layer.get(_key, {}).items() if v is not None} new_packet = None for layer in packet_list: if isinstance(layer, dict): _key = [x for x in layer.keys()][0] - _value = layer.get(_key) _value = remove_none() if _key == "Ethernet": _key = "Ether" diff --git a/packet_helper_core/utils/scapy_reader.py b/core/utils/scapy_reader.py similarity index 65% rename from packet_helper_core/utils/scapy_reader.py rename to core/utils/scapy_reader.py index 5f9e3ce..314df1b 100644 --- a/packet_helper_core/utils/scapy_reader.py +++ b/core/utils/scapy_reader.py @@ -2,26 +2,25 @@ import logging import os from time import time -from typing import List from scapy.all import wrpcap, rdpcap from scapy.packet import Packet -def scapy_reader(hex_str: str) -> List[Packet]: - hex_str = binascii.unhexlify(hex_str) - if not isinstance(hex_str, bytes): +def scapy_reader(hex_str: str) -> list[Packet]: + bytes_from_hex = binascii.unhexlify(hex_str) + if not isinstance(bytes_from_hex, bytes): raise Exception("ERR:: hex_str must be in bytes!") temp_filename = f"pcap_{time()}" - wrpcap(temp_filename, hex_str) + wrpcap(temp_filename, bytes_from_hex) # type: ignore pcap_object = rdpcap(temp_filename) # try to clean after all try: os.remove(temp_filename) - except Exception: - logging.error(f"Cannot remove {temp_filename}") + except OSError as os_err: + logging.error(f"Cannot remove {temp_filename}\n{os_err}") converted_packets = [] current = pcap_object[0] diff --git a/packet_helper_core/__init__.py b/packet_helper_core/__init__.py deleted file mode 100644 index 5fa4029..0000000 --- a/packet_helper_core/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from packet_helper_core.core import Core -from packet_helper_core.packet_data import PacketData -from packet_helper_core.packet_data_scapy import PacketDataScapy diff --git a/packet_helper_core/checksum_status.py b/packet_helper_core/checksum_status.py deleted file mode 100644 index b039e71..0000000 --- a/packet_helper_core/checksum_status.py +++ /dev/null @@ -1,16 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class ChecksumStatus: - chksum: str = "" - chksum_calculated: str = "" - status: bool | None = None - - def __call__(self, *args, **kwargs): - def clean_chksum(element: str): - return element.replace("0x", "") - - if self.chksum == "" or self.chksum_calculated == "": - return - self.status = clean_chksum(self.chksum) == clean_chksum(self.chksum_calculated) diff --git a/packet_helper_core/core.py b/packet_helper_core/core.py deleted file mode 100644 index f1a85fc..0000000 --- a/packet_helper_core/core.py +++ /dev/null @@ -1,20 +0,0 @@ -from dataclasses import dataclass - -from packet_helper_core.packet_data import PacketData -from packet_helper_core.packet_data_scapy import PacketDataScapy -from packet_helper_core.utils.utils import decode_hex - - -@dataclass -class Core: - """ - Class Core is just a wrapper to create a handy-shortcut - for preparing a data from hex string - """ - - hex_string: str = "" - - def __post_init__(self): - self.hex_string = self.hex_string.replace(" ", "") - self.tshark_data = PacketData(str(decode_hex(self.hex_string))) - self.scapy_data = PacketDataScapy(self.hex_string, self.tshark_data) diff --git a/packet_helper_core/packet_data_scapy.py b/packet_helper_core/packet_data_scapy.py deleted file mode 100644 index 41ad3d1..0000000 --- a/packet_helper_core/packet_data_scapy.py +++ /dev/null @@ -1,50 +0,0 @@ -from dataclasses import dataclass - -from scapy_helper import get_hex - -from packet_helper_core.packet_data import PacketData -from packet_helper_core.utils.scapy_reader import scapy_reader - - -@dataclass -class PacketDataScapy: - raw: str - packet_data: PacketData - - def __post_init__(self): - self.header = [x.replace("\r", "") for x in self.packet_data.header] - self.headers_scapy = scapy_reader(self.raw) - - self.headers_full = [repr(x) for x in self.headers_scapy] - self.headers_single = [f"{x.split(' |')[0]}>" for x in self.headers_full] - - self.structure = self.__make_structure() - - def __make_structure(self): - temp_structure = [] - - for e, h in enumerate(self.headers_scapy): - one_frame = self.headers_scapy[e].copy() - one_frame.remove_payload() - _dict = { - "name": h.name, - "bytes": str(h), - "hex": get_hex(h), - "hex_one": get_hex(one_frame), - "length": len(h), - "length_unit": "B", - "repr": f"{repr(h).split(' |')[0]}>", - "repr_full": repr(h), - } - - # RAW elements on the end are added to the last package as data! - try: - _dict["tshark_name"] = self.packet_data.body2[e][0] - _dict["tshark_raw_summary"] = self.packet_data.body2[e][1:] - except IndexError: - break - - _dict["chksum_status"] = self.packet_data.chksum_list[e] - - temp_structure.append(_dict) - return temp_structure diff --git a/packet_helper_core/utils/utils.py b/packet_helper_core/utils/utils.py deleted file mode 100644 index 47bd182..0000000 --- a/packet_helper_core/utils/utils.py +++ /dev/null @@ -1,55 +0,0 @@ -import pyshark -from pyshark.packet.packet import Packet -from scapy_helper import get_hex - - -def hex_str_operation(h_string, with_new_line: bool = False): - z = "" - tmp = [] - for e, x in enumerate(h_string.replace(" ", "")): - z += x - if e % 2: - tmp.append(z) - z = "" - if with_new_line: - temp_list = [] - for e, v in enumerate(tmp, 1): - if not e % 16: - temp_list.append(f"{v}\n") - continue - temp_list.append(f"{v} ") - return "".join(temp_list) - return " ".join(tmp) - - -def decode_hex(hex_str: str, use_json: bool = False) -> Packet: - frame_bytes: bytes = bytes.fromhex(hex_str) - _custom_params = [ - "-o", - "tcp.check_checksum:TRUE", - "-o", - "ip.check_checksum:TRUE", - "-o", - "stt.check_checksum:TRUE", - "-o", - "udp.check_checksum:TRUE", - "-o", - "wlan.check_checksum:TRUE", - ] - # only interested with the first packet - packet = pyshark.InMemCapture(custom_parameters=_custom_params) - return packet.parse_packet(frame_bytes) - - -def better_scapy_summary(scapy_summary) -> list: - list_ = [] - for frame in scapy_summary: - temp_frame = { - "name": frame.name, - "bytes": frame.raw_packet_cache, - "hex": get_hex(frame.raw_packet_cache), - "length": len(frame.raw_packet_cache), - "repr": f"{repr(frame).split(' |')[0]}>", - } - list_.append(temp_frame) - return list_ diff --git a/requirements-dev.txt b/requirements-dev.txt index c12ea09..3286653 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -3,3 +3,5 @@ black==22.10.0 flake8~=5.0.4 pytest==7.2.0 +requests~=2.28.1 +isort~=5.10.1 diff --git a/requirements.txt b/requirements.txt index 1db101e..27322d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,7 @@ pyshark==0.5.3 scapy~=2.4.5 scapy_helper==0.14.8 +pydantic~=1.10.1 +fastapi~=0.85.0 +jinja2~=3.0.3 +uvicorn~=0.17.5 diff --git a/setup.py b/setup.py index 212305d..eb136af 100644 --- a/setup.py +++ b/setup.py @@ -15,8 +15,8 @@ ] setup( - name="packet_helper_core", - description="Engine to decode raw string hex into packets", + name="core", + description="Engine to decode decoded_packet string hex into packets", long_description=long_description, long_description_content_type="text/markdown", author="Nex Sabre", diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/api/__init__.py b/tests/integration/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/api/routers/__init__.py b/tests/integration/api/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/api/routers/api/__init__.py b/tests/integration/api/routers/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/api/routers/api/test_create.py b/tests/integration/api/routers/api/test_create.py new file mode 100644 index 0000000..0532622 --- /dev/null +++ b/tests/integration/api/routers/api/test_create.py @@ -0,0 +1,55 @@ +from fastapi import status +from fastapi.testclient import TestClient + +from api.main import app +from api.models.creator_packets import ( + CreatorPacketsObjectsRequest, + CreatorPacketsObjectsResponse, +) + +client = TestClient(app) + + +def test_post_api__success(): + response = client.post( + "/api/create", + json=CreatorPacketsObjectsRequest( + packets=[ + { + "Ethernet": { + "src": "ff:ff:ff:ff:ff:ff", + "dst": "ff:ff:ff:ff:ff:ff", + "type": 0, + } + }, + ] + ).dict(), + ) + assert response.status_code == status.HTTP_201_CREATED + creator_packets_response = CreatorPacketsObjectsResponse.parse_obj(response.json()) + assert ( + creator_packets_response.builtpacket.get("hex", "") + == "ff ff ff ff ff ff ff ff ff ff ff ff 00 00" + ) + + +def test_post_api_create__negative__packet_not_exists_in_scapy(): + response = client.post( + "/api/create", + json=CreatorPacketsObjectsRequest( + packets=[ + { + "NonExistingLayer": { + "src": "ff:ff:ff:ff:ff:ff", + "dst": "ff:ff:ff:ff:ff:ff", + "type": 0, + } + }, + ] + ).dict(), + ) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert ( + response.json()["detail"]["error"] + == "Layer is not supported 'NonExistingLayer'" + ) diff --git a/tests/integration/api/routers/api/test_hex.py b/tests/integration/api/routers/api/test_hex.py new file mode 100644 index 0000000..2ffa9a8 --- /dev/null +++ b/tests/integration/api/routers/api/test_hex.py @@ -0,0 +1,24 @@ +import pytest +from fastapi import status +from fastapi.testclient import TestClient + +from api.main import app +from api.models.decoded_hex import DecodedHexResponse + +client = TestClient(app) + + +@pytest.mark.parametrize( + "hex_to_decode", + ( + "ffffffaaa9ff00000000001208004500003c0001000040047cbb7f0000017f00000145" + "0000280001000040067ccd7f0000017f00000100140050000000000000000050022000" + "917c0000", + ), +) +def test_get_packet(hex_to_decode: str): + response = client.get(f"api/hex/{hex_to_decode}") + assert response.status_code == status.HTTP_200_OK + assert DecodedHexResponse.parse_obj( + response.json() + ), "Response should be parsed to the 'DecodedHexResponse' without problems" diff --git a/tests/integration/api/routers/api/test_packets.py b/tests/integration/api/routers/api/test_packets.py new file mode 100644 index 0000000..02b21d9 --- /dev/null +++ b/tests/integration/api/routers/api/test_packets.py @@ -0,0 +1,31 @@ +from fastapi import status +from fastapi.testclient import TestClient + +from api.main import app +from api.models.creator_packets import CreatorPacketsRequest, CreatorPacketsResponse + +client = TestClient(app) + + +def test_post_api_packets__success(): + response = client.post( + "/api/packets", + json=CreatorPacketsRequest(packets=["Ether"]).dict(), + ) + assert response.status_code == status.HTTP_201_CREATED + json_response = CreatorPacketsResponse(**response.json()) + assert len(json_response.packets) == 1 + assert json_response.packets[0]["Ethernet"] + assert len(json_response.packets[0]["Ethernet"]) == 3 + + +def test_post_api_packets__negative__packet_not_exists_in_scapy(): + response = client.post( + "/api/packets", + json=CreatorPacketsRequest(packets=["NonExistingPacket"]).dict(), + ) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert ( + response.json()["detail"]["error"] + == "Layer is not supported 'NonExistingPacket'" + ) diff --git a/tests/integration/api/test_main.py b/tests/integration/api/test_main.py new file mode 100644 index 0000000..e1844e2 --- /dev/null +++ b/tests/integration/api/test_main.py @@ -0,0 +1,15 @@ +from fastapi import status +from fastapi.testclient import TestClient + +from api.main import app +from api.models.info_response import VersionResponse + +client = TestClient(app) + + +def test_version(): + response = client.get("/version") + assert response.status_code == status.HTTP_200_OK + version_response = VersionResponse.parse_obj(response.json()) + assert version_response.packethelper == "0.1" + assert version_response.framework == "fastapi" diff --git a/tests/smoke/__init__.py b/tests/smoke/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/smoke/conftest.py b/tests/smoke/conftest.py new file mode 100644 index 0000000..c736a7d --- /dev/null +++ b/tests/smoke/conftest.py @@ -0,0 +1,13 @@ +import os + +import pytest + + +@pytest.fixture +def instance_uri() -> str: + return os.getenv("PACKET_HELPER_URI", "https://www.packethelper.com") + + +@pytest.fixture +def api_uri(instance_uri: str) -> str: + return f"{instance_uri}/api" diff --git a/tests/smoke/test_smoke_prod.py b/tests/smoke/test_smoke_prod.py new file mode 100644 index 0000000..7ebc15f --- /dev/null +++ b/tests/smoke/test_smoke_prod.py @@ -0,0 +1,12 @@ +from http import HTTPStatus + +import requests + + +def test_smoke(api_uri: str) -> None: + simple_packet = ( + "00001Cffffff0000000000000800450000340001000040047cc37f0000017f00000" + "14500002000010000402f7cac7f0000017f00000100000000" + ) # Ethernet / IP / IPv6 / GRE + response = requests.get(f"{api_uri}/hex/{simple_packet}") + assert response.status_code == HTTPStatus.OK diff --git a/tests/test_core.py b/tests/test_core.py deleted file mode 100644 index ec7ff6c..0000000 --- a/tests/test_core.py +++ /dev/null @@ -1,58 +0,0 @@ -from packet_helper_core import PacketData, PacketDataScapy -from packet_helper_core.core import Core -from scapy.all import IP, TCP, Ether # type: ignore -from scapy_helper import get_hex - - -class TestCore: - simple_ether_ip_tcp_hex_string = get_hex(Ether() / IP() / TCP()) - - def test_core_post_init(self): - core_results = Core(TestCore.simple_ether_ip_tcp_hex_string) - - assert isinstance(core_results.hex_string, str), "Should be String" - assert isinstance( - core_results.scapy_data, PacketDataScapy - ), "Should be PacketDataScapy" - assert isinstance(core_results.tshark_data, PacketData), "Should be PacketData" - - assert core_results.scapy_data.header == [ - "ETH", - "IP", - "TCP", - ], "Should be properly decoded" - assert core_results.tshark_data.header == [ - "ETH", - "IP", - "TCP", - ], "Should be properly decoded" - - def test_core_chksum_verification(self): - core_results = Core(get_hex(Ether() / IP() / IP() / TCP())) - assert core_results.tshark_data.chksum_list - - def test_negative_core_chksum_verification_with_wrong_chksum(self): - core_results2 = Core(get_hex(Ether() / IP() / IP(chksum=0) / TCP())) - assert core_results2.tshark_data.chksum_list[2]["chksum"] == "0x0000" - - def test_one_of_custom_problematic_cases(self): - core_results = Core( - "ffffffaaa9ff00000000001208004500003c0001000040047cbb7f0000017f" - "000001450000280001000040067ccd7f0000017f0000010014005000000000" - "0000000050022000917d0000" - ) - assert core_results.tshark_data.chksum_list[3]["chksum"] == "0x917d" - - def test_Ethernet_IP_UDP_DNS(self): - core_result = Core( - "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" - "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" - "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" - "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" - "C011C011000200010001438C0006036E7331C011" - ) - chksum_obj = core_result.tshark_data.chksum_list[2] - - assert chksum_obj["chksum"] == "0x2d2e" - assert chksum_obj["chksum_calculated"] == "0x2d2d" - assert chksum_obj["status"] is False diff --git a/tests/test_from_sh_list.py b/tests/test_from_sh_list.py deleted file mode 100644 index 4cd01d1..0000000 --- a/tests/test_from_sh_list.py +++ /dev/null @@ -1,17 +0,0 @@ -import pytest -from packet_helper_core.utils.conversion import from_sh_list -from scapy.packet import Packet -from scapy_helper import get_hex, to_list - -from tests.utils.example_packets import SIMPLE_IP_IN_IP_PACKET - - -@pytest.mark.parametrize( - "packet", (SIMPLE_IP_IN_IP_PACKET, SIMPLE_IP_IN_IP_PACKET / SIMPLE_IP_IN_IP_PACKET) -) -def test_from_sh_list(packet: Packet) -> None: - packet_list = to_list(packet) - new_packet = from_sh_list(packet_list) - - assert get_hex(packet) == get_hex(new_packet) - assert packet_list == to_list(new_packet) diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/core/__init__.py b/tests/unit/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/core/decoders/__init__.py b/tests/unit/core/decoders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/core/decoders/test_scapy_data.py b/tests/unit/core/decoders/test_scapy_data.py new file mode 100644 index 0000000..badf9f2 --- /dev/null +++ b/tests/unit/core/decoders/test_scapy_data.py @@ -0,0 +1,25 @@ +import pytest +from scapy.all import IP, TCP, Ether # noqa +from scapy_helper import get_hex + +from core.packet_helper import PacketHelper +from core.decoders.scapy_data import ScapyData + + +@pytest.fixture +def decode_example_packet() -> PacketHelper: + ph = PacketHelper() + ph.decode(get_hex(Ether() / IP() / IP() / TCP())) + return ph + + +def test_scapy_data(decode_example_packet: PacketHelper) -> None: + scapy_data = ScapyData( + decode_example_packet.hex_string, decode_example_packet.tshark_data + ) + assert scapy_data + assert scapy_data.headers + assert scapy_data.scapy_headers + assert scapy_data.full_scapy_representation_headers + assert scapy_data.single_scapy_representation_headers + assert scapy_data.packet_structure diff --git a/tests/unit/core/test_decoder.py b/tests/unit/core/test_decoder.py new file mode 100644 index 0000000..48c8ec1 --- /dev/null +++ b/tests/unit/core/test_decoder.py @@ -0,0 +1,71 @@ +import pytest + +from core.decoders.tshark_data import TSharkData +from core.decoders.scapy_data import ScapyData +from core.models.checksum_status import ChecksumStatus +from core.packet_helper import PacketHelper +from scapy.all import IP, TCP, Ether # noqa +from scapy_helper import get_hex + + +def test_core_post_init(): + expected_headers = [ + "ETH", + "IP", + "TCP", + ] + ph = PacketHelper() + ph.decode(get_hex(Ether() / IP() / TCP())) + + assert isinstance(ph.hex_string, str), "Should be String" + assert isinstance(ph.scapy_data, ScapyData), "Should be Scapy Data" + assert isinstance(ph.tshark_data, TSharkData), "Should be TShark Data" + assert ph.scapy_data.headers == expected_headers, "Should be properly decoded" + assert ph.tshark_data.header == expected_headers, "Should be properly decoded" + + +def test_core_chksum_verification(): + decoder = PacketHelper() + decoder.decode(get_hex(Ether() / IP() / IP() / TCP())) + assert decoder.tshark_data.chksum_list + assert len(decoder.tshark_data.chksum_list) == 4 + + +@pytest.mark.parametrize( + "packet, position_to_check, expected_chksum_value", + ( + (get_hex(Ether() / IP() / IP(chksum=0) / TCP()), 2, "0x0000"), + ( + ( + "ffffffaaa9ff00000000001208004500003c0001000040047cbb7f0000017f" + "000001450000280001000040067ccd7f0000017f0000010014005000000000" + "0000000050022000917d0000" + ), + 3, + "0x917d", + ), + ), +) +def test_negative_core_chksum_verification_with_wrong_chksum( + packet: str, position_to_check: int, expected_chksum_value: str +): + ph = PacketHelper() + ph.decode(packet) + assert ph.tshark_data.chksum_list[position_to_check].chksum == expected_chksum_value + + +def test_ethernet_ip_udp_dns(): + packet = ( + "00E01CCCCCC2001F33D9736108004500008000004000401124550A0A01010" + "A0A01040035DB66006C2D2E795681800001000200020000046D61696C0870" + "617472696F747302696E0000010001C00C0005000100002A4B0002C011C01" + "10001000100002A4C00044A358C99C011000200010001438C0006036E7332" + "C011C011000200010001438C0006036E7331C011" + ) + ph = PacketHelper() + ph.decode(packet) + chksum_obj: ChecksumStatus = ph.tshark_data.chksum_list[2] + + assert chksum_obj.chksum == "0x2d2e" + assert chksum_obj.chksum_calculated == "0x2d2d" + assert chksum_obj.status is False diff --git a/tests/unit/core/test_from_sh_list.py b/tests/unit/core/test_from_sh_list.py new file mode 100644 index 0000000..8649010 --- /dev/null +++ b/tests/unit/core/test_from_sh_list.py @@ -0,0 +1,24 @@ +from typing import Any + +import pytest +from scapy.base_classes import BasePacket + +from core.utils.conversion import from_sh_list +from scapy.packet import Packet +from scapy_helper import get_hex, to_list + +from tests.unit.core.utils.example_packets import SIMPLE_IP_IN_IP_PACKET + + +@pytest.mark.parametrize( + "packet", (SIMPLE_IP_IN_IP_PACKET, SIMPLE_IP_IN_IP_PACKET / SIMPLE_IP_IN_IP_PACKET) +) +def test_from_sh_list(packet: Packet) -> None: + packet_list: list[dict[str, Any]] = to_list(packet) + packet_generated_from_scapy_helper = from_sh_list(packet_list) + + assert isinstance(packet_generated_from_scapy_helper, BasePacket) + assert get_hex(packet) == get_hex( + packet_generated_from_scapy_helper + ), "Packets should return same hex results" + assert packet_list == to_list(packet_generated_from_scapy_helper) diff --git a/tests/test_packet_data.py b/tests/unit/core/test_packet_data.py similarity index 60% rename from tests/test_packet_data.py rename to tests/unit/core/test_packet_data.py index d5d2051..0496fd5 100644 --- a/tests/test_packet_data.py +++ b/tests/unit/core/test_packet_data.py @@ -1,17 +1,22 @@ -from packet_helper_core.packet_data import PacketData -from packet_helper_core.utils.utils import decode_hex -from scapy.layers.all import IP, TCP, Ether, IPv6 # type: ignore +from scapy.layers.all import IP, TCP, Ether, IPv6 # noqa from scapy_helper import get_hex -from tests.utils.example_packets import EXAMPLE_ETHER, EXAMPLE_ETHER_IP_IPV6_GRE_DATA +from core.decoders.decode_string import decode_string +from core.decoders.tshark_data import TSharkData +from tests.unit.core.utils.example_packets import ( + EXAMPLE_ETHER, + EXAMPLE_ETHER_IP_IPV6_GRE_DATA, +) def test_packet_data(): - packet = decode_hex(EXAMPLE_ETHER) - assert packet.__getitem__("eth"), "Layer Ether should be available in decoded hex" + decoded_pkt = decode_string(EXAMPLE_ETHER) + assert decoded_pkt.__getitem__( + "eth" + ), "Layer Ether should be available in decoded hex" - pd = PacketData(raw=str(packet)) - assert "ETH" in pd.header, "Ether header should be found at packet" + pd = TSharkData(decoded_packet=decoded_pkt) + assert "ETH" in pd.header, "Ether headers should be found at decoded_pkt" def test_decode_hex__data_should_be_present_after_gre_packet(): @@ -20,8 +25,8 @@ def test_decode_hex__data_should_be_present_after_gre_packet(): "e672031313233343435393832373334393832373334323334" ) - packet = decode_hex(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) - pd = PacketData(raw=str(packet)) + decoded_pkt = decode_string(EXAMPLE_ETHER_IP_IPV6_GRE_DATA) + pd = TSharkData(decoded_packet=decoded_pkt) packet_raw_data = pd.body.get("RAW", []) assert packet_raw_data, "RAW block should be available" extracted_data_from_raw = packet_raw_data[0].split()[-1] @@ -33,7 +38,7 @@ def test_decode_hex__data_should_be_present_after_gre_packet(): def test_custom_packet_data(): frame = Ether() / IP() / IPv6() / TCP() - packet = decode_hex(get_hex(frame)) + packet = decode_string(get_hex(frame)) list_of_expected_packets = ("ETH", "IP", "IPV6", "TCP") list_of_layers_from_packet = [x.layer_name.upper() for x in packet.layers] for expected_packet in list_of_expected_packets: diff --git a/tests/unit/core/utils/__init__.py b/tests/unit/core/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/utils/example_packets.py b/tests/unit/core/utils/example_packets.py similarity index 89% rename from tests/utils/example_packets.py rename to tests/unit/core/utils/example_packets.py index d25c3d5..e3be1b8 100644 --- a/tests/utils/example_packets.py +++ b/tests/unit/core/utils/example_packets.py @@ -1,4 +1,4 @@ -from scapy.all import IP, TCP, Ether # type: ignore +from scapy.all import IP, TCP, Ether # noqa EXAMPLE_ETHER = "ff fd df af ff ff 00 00 00 00 00 12 08 00"