From 7f0777e5858312616fe9dc4293c26b2e31676651 Mon Sep 17 00:00:00 2001 From: Eliott-B Date: Thu, 19 Dec 2024 10:35:27 +0100 Subject: [PATCH 1/6] Refactor Scapy routes. Move all process in utils file. Change the output of the route. TODO: tcp-test route --- api/src/routes/scapy.py | 172 ++++++++++++++++++++++------------------ api/src/utils/scapy.py | 47 +++++++++++ 2 files changed, 140 insertions(+), 79 deletions(-) create mode 100644 api/src/utils/scapy.py diff --git a/api/src/routes/scapy.py b/api/src/routes/scapy.py index e889744..094db36 100644 --- a/api/src/routes/scapy.py +++ b/api/src/routes/scapy.py @@ -1,10 +1,10 @@ """Scapy routes modules.""" +import time + from fastapi import APIRouter, HTTPException -from scapy.config import conf -from scapy.layers.inet import ICMP, IP, TCP -from scapy.layers.l2 import Ether -from scapy.sendrecv import sr1 + +from utils.scapy import ethernet_frame, interface, ping router = APIRouter() @@ -22,54 +22,53 @@ def create_ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> dict: dict: Ethernet frame details. """ - frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16)) + frame = ethernet_frame(dst_mac, src_mac, eth_type) return { - "frame_summary": str(frame.summary()), - "frame_details": str(frame.show(dump=True)), + "frame": frame, } -@router.get("/tcp-test/{target_ip}/{target_port}", summary="Test a TCP connection") -def get_tcp_test(target_ip:str, target_port:str) -> dict: - """Test a TCP connection. - - Args: - target_ip (str): Target IP. - target_port (str): Target port. - - Returns: - dict: Result of the TCP test. - - """ - if not isinstance(target_port, int) or not 1 <= target_port <= 65535: # noqa: PLR2004 - return {"error":\ - "Le champ 'target_port' doit être un entier entre 1 et 65535."}, 400 - - packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN - response = sr1(packet, timeout=2, verbose=0) - - if response and response.haslayer(TCP): - tcp_flags = response.getlayer(TCP).flags - if tcp_flags == "SA": - return { - "message": f"Connexion TCP réussie avec \ - {target_ip}:{target_port} (SYN-ACK reçu).", - "details": response.show(dump=True), - } - if tcp_flags == "RA": - return { - "message": f"Connexion TCP refusée par \ - {target_ip}:{target_port} (RESET reçu).", - } - - return { - "message": f"Connexion TCP refusée par \ - {target_ip}:{target_port} (Flags : {tcp_flags}).", - } - - return { - "message": "Pas de réponse de la cible.", - } +# @router.get("/tcp-test/{target_ip}/{target_port}", summary="Test a TCP connection") +# def get_tcp_test(target_ip:str, target_port:str) -> dict: +# """Test a TCP connection. + +# Args: +# target_ip (str): Target IP. +# target_port (str): Target port. + +# Returns: +# dict: Result of the TCP test. + +# """ +# if not isinstance(target_port, int) or not 1 <= target_port <= 65535: # noqa: PLR2004 +# return {"error":\ +# "Le champ 'target_port' doit être un entier entre 1 et 65535."}, 400 + +# packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN +# response = sr1(packet, timeout=2, verbose=0) + +# if response and response.haslayer(TCP): +# tcp_flags = response.getlayer(TCP).flags +# if tcp_flags == "SA": +# return { +# "message": f"Connexion TCP réussie avec \ +# {target_ip}:{target_port} (SYN-ACK reçu).", +# "details": response.show(dump=True), +# } +# if tcp_flags == "RA": +# return { +# "message": f"Connexion TCP refusée par \ +# {target_ip}:{target_port} (RESET reçu).", +# } + +# return { +# "message": f"Connexion TCP refusée par \ +# {target_ip}:{target_port} (Flags : {tcp_flags}).", +# } + +# return { +# "message": "Pas de réponse de la cible.", +# } @router.get("/ping/{ip}", summary="Ping a target IP") def get_ping(ip: str) -> dict: @@ -82,35 +81,34 @@ def get_ping(ip: str) -> dict: dict: Result of the ping. """ - packet = IP(dst=ip) / ICMP() - response = sr1(packet, timeout=3, verbose=0) - - if response: - return { - "message": f"Ping réussi : {response.summary()}", - "details": response.show(dump=True), - } - - return { - "message": "Pas de réponse de la cible.", - } - -def serialize_network_interface(iface: object) -> dict: - """Convert a network interface object to a dictionary. - - Args: - iface (object): Network interface object. + try: + start_time = time.time() + packet, response = ping(ip) + rtt = (time.time() - start_time) * 1000 # Convert to ms - Returns: - dict: Dictionary representation of the network interface. + if response: + return { + "rtt_ms": round(rtt, 2), + "packet_size": len(response), + "ttl": response.ttl, + "source": packet.src, + "destination": packet.dst, + } - """ - return { - "name": str(iface.name) if hasattr(iface, "name") else None, - "ip": str(iface.ip) if hasattr(iface, "ip") else None, - "mac": str(iface.mac) if hasattr(iface, "mac") else None, - "mtu": int(iface.mtu) if hasattr(iface, "mtu") else None, - } + raise HTTPException( + status_code=404, + detail={ + "message": f"No response from {ip}", + "source": packet.src, + "destination": ip, + }, + ) + + except (OSError, ValueError) as e: + raise HTTPException( + status_code=500, + detail=f"Ping failed: {e!s}", + ) from e @router.get("/interface/{ip}", summary="Ping a target IP") def get_interface(ip: str) -> dict | None: @@ -124,20 +122,36 @@ def get_interface(ip: str) -> dict | None: """ try: - packet = IP(dst=ip) / ICMP() - response = sr1(packet, timeout=3, verbose=0) + iface, response, packet = interface(ip) if response: return { - "interface": serialize_network_interface(conf.iface), + "interface": serialize_network_interface(iface), "source_ip": packet.src, "destination_ip": packet.dst, } return { "message": f"No response from {ip}", - "interface": serialize_network_interface(conf.iface), + "interface": serialize_network_interface(iface), } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + +def serialize_network_interface(iface: object) -> dict: + """Convert a network interface object to a dictionary. + + Args: + iface (object): Network interface object. + + Returns: + dict: Dictionary representation of the network interface. + + """ + return { + "name": str(iface.name) if hasattr(iface, "name") else None, + "ip": str(iface.ip) if hasattr(iface, "ip") else None, + "mac": str(iface.mac) if hasattr(iface, "mac") else None, + "mtu": int(iface.mtu) if hasattr(iface, "mtu") else None, + } diff --git a/api/src/utils/scapy.py b/api/src/utils/scapy.py new file mode 100644 index 0000000..9e02709 --- /dev/null +++ b/api/src/utils/scapy.py @@ -0,0 +1,47 @@ +"""Module for Scapy utilities.""" + +from scapy.all import ICMP, IP, Ether, conf, hexdump, sr1 + + +def ping(ipv4: str) -> tuple[IP, IP]: + """Ping an IPv4 address. + + Args: + ipv4 (str): The IPv4 address to ping. + + Returns: + str: Result of the ping. + + """ + packet = IP(dst=ipv4) / ICMP() + response = sr1(packet, timeout=3, verbose=0) + return packet, response + +def ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> hexdump: + """Create an Ethernet frame. + + Args: + dst_mac (str): Destination MAC address. + src_mac (str): Source MAC address. + eth_type (str): Ethernet type. + + Returns: + hexdump: Ethernet frame in hexa. + + """ + frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16)) + return hexdump(frame, dump=True) + +def interface(ipv4: str) -> tuple[object, IP, IP]: + """Get the network interface of an IPv4 address. + + Args: + ipv4 (str): The IPv4 address. + + Returns: + tuple: Network interface and IP object. + + """ + iface = conf.iface + packet, response = ping(ipv4) + return iface, response, packet From defab13974f70bc5aa709660a1d0e471fd58acd1 Mon Sep 17 00:00:00 2001 From: Eliott-B Date: Thu, 19 Dec 2024 11:01:19 +0100 Subject: [PATCH 2/6] Add test_tcp route --- api/src/routes/scapy.py | 90 ++++++++++++++++++++++------------------- api/src/utils/scapy.py | 26 +++++++++++- 2 files changed, 73 insertions(+), 43 deletions(-) diff --git a/api/src/routes/scapy.py b/api/src/routes/scapy.py index 094db36..db85097 100644 --- a/api/src/routes/scapy.py +++ b/api/src/routes/scapy.py @@ -4,7 +4,7 @@ from fastapi import APIRouter, HTTPException -from utils.scapy import ethernet_frame, interface, ping +from utils.scapy import ethernet_frame, interface, ping, tcp router = APIRouter() @@ -28,47 +28,53 @@ def create_ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> dict: "frame": frame, } -# @router.get("/tcp-test/{target_ip}/{target_port}", summary="Test a TCP connection") -# def get_tcp_test(target_ip:str, target_port:str) -> dict: -# """Test a TCP connection. - -# Args: -# target_ip (str): Target IP. -# target_port (str): Target port. - -# Returns: -# dict: Result of the TCP test. - -# """ -# if not isinstance(target_port, int) or not 1 <= target_port <= 65535: # noqa: PLR2004 -# return {"error":\ -# "Le champ 'target_port' doit être un entier entre 1 et 65535."}, 400 - -# packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN -# response = sr1(packet, timeout=2, verbose=0) - -# if response and response.haslayer(TCP): -# tcp_flags = response.getlayer(TCP).flags -# if tcp_flags == "SA": -# return { -# "message": f"Connexion TCP réussie avec \ -# {target_ip}:{target_port} (SYN-ACK reçu).", -# "details": response.show(dump=True), -# } -# if tcp_flags == "RA": -# return { -# "message": f"Connexion TCP refusée par \ -# {target_ip}:{target_port} (RESET reçu).", -# } - -# return { -# "message": f"Connexion TCP refusée par \ -# {target_ip}:{target_port} (Flags : {tcp_flags}).", -# } - -# return { -# "message": "Pas de réponse de la cible.", -# } +@router.get("/tcp-test/{target_ip}/{target_port}", summary="Test a TCP connection") +def get_tcp_test(target_ip:str, target_port:int) -> dict: + """Test a TCP connection. + + Args: + target_ip (str): Target IP. + target_port (str): Target port. + + Returns: + dict: Result of the TCP test. + + """ + start_time = time.time() + status, response, packet, tcp_flags = tcp(target_ip, target_port) + rtt = (time.time() - start_time) * 1000 # Convert to ms + + if status == -1: + raise HTTPException( + status_code=404, + detail={ + "message": f"No response from {target_ip}:{target_port}", + }, + ) + + if status == 0: + return { + "message":\ + f"Connexion TCP réussie avec {target_ip}:{target_port} (SYN-ACK reçu).", + "details": { + "rtt_ms": round(rtt, 2), + "packet_size": len(response), + "ttl": response.ttl, + "source": packet.src, + "destination": packet.dst, + }, + } + + if status == 1: + return { + "message":\ + f"Connexion TCP refusée par {target_ip}:{target_port} (RESET reçu).", + } + + return { + "message": \ + f"Connexion TCP refusée par {target_ip}:{target_port} (Flags : {tcp_flags}).", + } @router.get("/ping/{ip}", summary="Ping a target IP") def get_ping(ip: str) -> dict: diff --git a/api/src/utils/scapy.py b/api/src/utils/scapy.py index 9e02709..ff71186 100644 --- a/api/src/utils/scapy.py +++ b/api/src/utils/scapy.py @@ -1,6 +1,6 @@ """Module for Scapy utilities.""" -from scapy.all import ICMP, IP, Ether, conf, hexdump, sr1 +from scapy.all import ICMP, IP, TCP, Ether, conf, hexdump, sr1 def ping(ipv4: str) -> tuple[IP, IP]: @@ -45,3 +45,27 @@ def interface(ipv4: str) -> tuple[object, IP, IP]: iface = conf.iface packet, response = ping(ipv4) return iface, response, packet + +def tcp(target_ip:str, target_port:int) -> tuple[int, IP | None, IP | None, str | None]: + """Test a TCP connection. + + Args: + target_ip (str): Target IP. + target_port (str): Target port. + + Returns: + tuple: Status of the TCP test. + + """ + packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN + response = sr1(packet, timeout=2, verbose=0) + + if response and response.haslayer(TCP): + tcp_flags = response.getlayer(TCP).flags + if tcp_flags == "SA": + return 0, response, packet, tcp_flags + if tcp_flags == "RA": + return 1, None, None, tcp_flags + + return 2, None, None, tcp_flags + return -1, None, None, None From a4ecdfbde44b005d210b86f0a89944b3a6a6151e Mon Sep 17 00:00:00 2001 From: Eliott-B Date: Thu, 19 Dec 2024 11:07:35 +0100 Subject: [PATCH 3/6] Transforms the DNS name into an IP if the IP does not satisfy the IP regex. --- api/src/routes/scapy.py | 10 +++++++++- api/src/utils/scapy.py | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/api/src/routes/scapy.py b/api/src/routes/scapy.py index db85097..9eaaed0 100644 --- a/api/src/routes/scapy.py +++ b/api/src/routes/scapy.py @@ -4,10 +4,12 @@ from fastapi import APIRouter, HTTPException -from utils.scapy import ethernet_frame, interface, ping, tcp +from utils.scapy import ethernet_frame, get_ip_from_dns, interface, ping, tcp router = APIRouter() +RE_IP = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$" + @router.get("/ethernet-frame/{dst_mac}/{src_mac}/{eth_type}",\ summary="Create an Ethernet frame") def create_ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> dict: @@ -40,6 +42,8 @@ def get_tcp_test(target_ip:str, target_port:int) -> dict: dict: Result of the TCP test. """ + if RE_IP.match(target_ip) is None: + target_ip = get_ip_from_dns(target_ip) start_time = time.time() status, response, packet, tcp_flags = tcp(target_ip, target_port) rtt = (time.time() - start_time) * 1000 # Convert to ms @@ -87,6 +91,8 @@ def get_ping(ip: str) -> dict: dict: Result of the ping. """ + if RE_IP.match(ip) is None: + ip = get_ip_from_dns(ip) try: start_time = time.time() packet, response = ping(ip) @@ -127,6 +133,8 @@ def get_interface(ip: str) -> dict | None: dict: Result of the ping with detailed information. """ + if RE_IP.match(ip) is None: + ip = get_ip_from_dns(ip) try: iface, response, packet = interface(ip) diff --git a/api/src/utils/scapy.py b/api/src/utils/scapy.py index ff71186..07de226 100644 --- a/api/src/utils/scapy.py +++ b/api/src/utils/scapy.py @@ -1,5 +1,7 @@ """Module for Scapy utilities.""" +import socket + from scapy.all import ICMP, IP, TCP, Ether, conf, hexdump, sr1 @@ -69,3 +71,18 @@ def tcp(target_ip:str, target_port:int) -> tuple[int, IP | None, IP | None, str return 2, None, None, tcp_flags return -1, None, None, None + +def get_ip_from_dns(dns:str) -> str | None: + """Get the IP address from a DNS name. + + Args: + dns (str): DNS name. + + Returns: + str | None: IP address. + + """ + try: + return socket.gethostbyname(dns) + except socket.gaierror: + return None From 8c63f90ea0bfe49a6155fbcba9d53d555775645f Mon Sep 17 00:00:00 2001 From: Eliott-B Date: Thu, 19 Dec 2024 11:19:19 +0100 Subject: [PATCH 4/6] Fix interface route, get the interface of the host --- api/README.md | 6 +++--- api/src/routes/scapy.py | 45 ++++------------------------------------- api/src/utils/scapy.py | 22 ++++++++++++++++---- 3 files changed, 25 insertions(+), 48 deletions(-) diff --git a/api/README.md b/api/README.md index 8cd567a..2376f6a 100644 --- a/api/README.md +++ b/api/README.md @@ -175,6 +175,6 @@ #### Get interface -| Method | URL | Description | Need token | Roles | -| ------ | --------------------- | ------------- | ---------- | ----- | -| GET | /scapy/interface/{ip} | Get interface | True | User | +| Method | URL | Description | Need token | Roles | +| ------ | ---------------- | ------------------------- | ---------- | ----- | +| GET | /scapy/interface | Get interface of the host | True | User | diff --git a/api/src/routes/scapy.py b/api/src/routes/scapy.py index 9eaaed0..0f10fec 100644 --- a/api/src/routes/scapy.py +++ b/api/src/routes/scapy.py @@ -122,50 +122,13 @@ def get_ping(ip: str) -> dict: detail=f"Ping failed: {e!s}", ) from e -@router.get("/interface/{ip}", summary="Ping a target IP") -def get_interface(ip: str) -> dict | None: - """Ping a target IP. - - Args: - ip (str): Target IP address. - - Returns: - dict: Result of the ping with detailed information. - - """ - if RE_IP.match(ip) is None: - ip = get_ip_from_dns(ip) +@router.get("/interface", summary="Get the interface of the machine") +def get_interface() -> dict | None: + """Get the network interface of the host.""" try: - iface, response, packet = interface(ip) - - if response: - return { - "interface": serialize_network_interface(iface), - "source_ip": packet.src, - "destination_ip": packet.dst, - } - - return { - "message": f"No response from {ip}", - "interface": serialize_network_interface(iface), - } + return interface() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e -def serialize_network_interface(iface: object) -> dict: - """Convert a network interface object to a dictionary. - - Args: - iface (object): Network interface object. - Returns: - dict: Dictionary representation of the network interface. - - """ - return { - "name": str(iface.name) if hasattr(iface, "name") else None, - "ip": str(iface.ip) if hasattr(iface, "ip") else None, - "mac": str(iface.mac) if hasattr(iface, "mac") else None, - "mtu": int(iface.mtu) if hasattr(iface, "mtu") else None, - } diff --git a/api/src/utils/scapy.py b/api/src/utils/scapy.py index 07de226..d9d9012 100644 --- a/api/src/utils/scapy.py +++ b/api/src/utils/scapy.py @@ -34,7 +34,7 @@ def ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> hexdump: frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16)) return hexdump(frame, dump=True) -def interface(ipv4: str) -> tuple[object, IP, IP]: +def interface() -> tuple[object]: """Get the network interface of an IPv4 address. Args: @@ -44,9 +44,23 @@ def interface(ipv4: str) -> tuple[object, IP, IP]: tuple: Network interface and IP object. """ - iface = conf.iface - packet, response = ping(ipv4) - return iface, response, packet + return serialize_network_interface(conf.iface) + +def serialize_network_interface(iface: object) -> dict: + """Convert a network interface object to a dictionary. + + Args: + iface (object): Network interface object. + + Returns: + dict: Dictionary representation of the network interface. + + """ + return { + "name": str(iface.name) if hasattr(iface, "name") else None, + "ip": str(iface.ip) if hasattr(iface, "ip") else None, + "mac": str(iface.mac) if hasattr(iface, "mac") else None, + } def tcp(target_ip:str, target_port:int) -> tuple[int, IP | None, IP | None, str | None]: """Test a TCP connection. From 105276bf94c9231230157c225b581fb460539140 Mon Sep 17 00:00:00 2001 From: Eliott-B Date: Thu, 19 Dec 2024 11:20:21 +0100 Subject: [PATCH 5/6] Fix docstring of scapy routes --- api/src/routes/scapy.py | 33 +++------------------------------ 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/api/src/routes/scapy.py b/api/src/routes/scapy.py index 0f10fec..bde7bfc 100644 --- a/api/src/routes/scapy.py +++ b/api/src/routes/scapy.py @@ -13,17 +13,7 @@ @router.get("/ethernet-frame/{dst_mac}/{src_mac}/{eth_type}",\ summary="Create an Ethernet frame") def create_ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> dict: - """Create an Ethernet frame. - - Args: - dst_mac (str): Destination MAC address. - src_mac (str): Source MAC address. - eth_type (str): Ethernet type. - - Returns: - dict: Ethernet frame details. - - """ + """Create an Ethernet frame.""" frame = ethernet_frame(dst_mac, src_mac, eth_type) return { @@ -32,16 +22,7 @@ def create_ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> dict: @router.get("/tcp-test/{target_ip}/{target_port}", summary="Test a TCP connection") def get_tcp_test(target_ip:str, target_port:int) -> dict: - """Test a TCP connection. - - Args: - target_ip (str): Target IP. - target_port (str): Target port. - - Returns: - dict: Result of the TCP test. - - """ + """Test a TCP connection.""" if RE_IP.match(target_ip) is None: target_ip = get_ip_from_dns(target_ip) start_time = time.time() @@ -82,15 +63,7 @@ def get_tcp_test(target_ip:str, target_port:int) -> dict: @router.get("/ping/{ip}", summary="Ping a target IP") def get_ping(ip: str) -> dict: - """Ping a target IP. - - Args: - ip (str): Target IP address. - - Returns: - dict: Result of the ping. - - """ + """Ping a target IP.""" if RE_IP.match(ip) is None: ip = get_ip_from_dns(ip) try: From f059e0ad1f23afaeafafd40589aef394e2143018 Mon Sep 17 00:00:00 2001 From: Eliott-B Date: Thu, 19 Dec 2024 11:54:45 +0100 Subject: [PATCH 6/6] Get all interfaces of the host --- api/README.md | 10 +++++----- api/src/routes/scapy.py | 8 ++++---- api/src/utils/scapy.py | 44 ++++++++++++++++++++--------------------- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/api/README.md b/api/README.md index 2376f6a..8022364 100644 --- a/api/README.md +++ b/api/README.md @@ -21,7 +21,7 @@ - [Create ethernet frame](#create-ethernet-frame) - [TCP test](#tcp-test) - [Ping](#ping) - - [Get interface](#get-interface) + - [Get interfaces](#get-interfaces) ## Routes @@ -173,8 +173,8 @@ | ------ | ---------------- | ----------- | ---------- | ----- | | GET | /scapy/ping/{ip} | Ping | True | User | -#### Get interface +#### Get interfaces -| Method | URL | Description | Need token | Roles | -| ------ | ---------------- | ------------------------- | ---------- | ----- | -| GET | /scapy/interface | Get interface of the host | True | User | +| Method | URL | Description | Need token | Roles | +| ------ | ----------------- | -------------------------- | ---------- | ----- | +| GET | /scapy/interfaces | Get interfaces of the host | True | User | diff --git a/api/src/routes/scapy.py b/api/src/routes/scapy.py index bde7bfc..9e25832 100644 --- a/api/src/routes/scapy.py +++ b/api/src/routes/scapy.py @@ -4,7 +4,7 @@ from fastapi import APIRouter, HTTPException -from utils.scapy import ethernet_frame, get_ip_from_dns, interface, ping, tcp +from utils.scapy import ethernet_frame, get_ip_from_dns, interfaces, ping, tcp router = APIRouter() @@ -95,11 +95,11 @@ def get_ping(ip: str) -> dict: detail=f"Ping failed: {e!s}", ) from e -@router.get("/interface", summary="Get the interface of the machine") +@router.get("/interfaces", summary="Get the interfaces of the machine") def get_interface() -> dict | None: - """Get the network interface of the host.""" + """Get the network interfaces of the host.""" try: - return interface() + return interfaces() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e diff --git a/api/src/utils/scapy.py b/api/src/utils/scapy.py index d9d9012..02b1b6b 100644 --- a/api/src/utils/scapy.py +++ b/api/src/utils/scapy.py @@ -2,7 +2,17 @@ import socket -from scapy.all import ICMP, IP, TCP, Ether, conf, hexdump, sr1 +from scapy.all import ( + ICMP, + IP, + TCP, + Ether, + get_if_addr, + get_if_hwaddr, + get_if_list, + hexdump, + sr1, +) def ping(ipv4: str) -> tuple[IP, IP]: @@ -34,33 +44,23 @@ def ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> hexdump: frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16)) return hexdump(frame, dump=True) -def interface() -> tuple[object]: - """Get the network interface of an IPv4 address. - - Args: - ipv4 (str): The IPv4 address. +def interfaces() -> dict: + """Get network interfaces of the host. Returns: - tuple: Network interface and IP object. + dict: Network interfaces information. """ - return serialize_network_interface(conf.iface) + interfaces = {} -def serialize_network_interface(iface: object) -> dict: - """Convert a network interface object to a dictionary. - - Args: - iface (object): Network interface object. + for iface in get_if_list(): + interfaces[iface] = { + "name": iface, + "ip": get_if_addr(iface), + "mac": get_if_hwaddr(iface), + } - Returns: - dict: Dictionary representation of the network interface. - - """ - return { - "name": str(iface.name) if hasattr(iface, "name") else None, - "ip": str(iface.ip) if hasattr(iface, "ip") else None, - "mac": str(iface.mac) if hasattr(iface, "mac") else None, - } + return {"interfaces": interfaces} def tcp(target_ip:str, target_port:int) -> tuple[int, IP | None, IP | None, str | None]: """Test a TCP connection.