diff --git a/api/README.md b/api/README.md index 8cd567a..8022364 100644 --- a/api/README.md +++ b/api/README.md @@ -21,7 +21,7 @@ - [Create ethernet frame](#create-ethernet-frame) - [TCP test](#tcp-test) - [Ping](#ping) - - [Get interface](#get-interface) + - [Get interfaces](#get-interfaces) ## Routes @@ -173,8 +173,8 @@ | ------ | ---------------- | ----------- | ---------- | ----- | | GET | /scapy/ping/{ip} | Ping | True | User | -#### Get interface +#### Get interfaces -| Method | URL | Description | Need token | Roles | -| ------ | --------------------- | ------------- | ---------- | ----- | -| GET | /scapy/interface/{ip} | Get interface | True | User | +| Method | URL | Description | Need token | Roles | +| ------ | ----------------- | -------------------------- | ---------- | ----- | +| GET | /scapy/interfaces | Get interfaces of the host | True | User | diff --git a/api/src/routes/scapy.py b/api/src/routes/scapy.py index e889744..9e25832 100644 --- a/api/src/routes/scapy.py +++ b/api/src/routes/scapy.py @@ -1,143 +1,107 @@ """Scapy routes modules.""" +import time + from fastapi import APIRouter, HTTPException -from scapy.config import conf -from scapy.layers.inet import ICMP, IP, TCP -from scapy.layers.l2 import Ether -from scapy.sendrecv import sr1 + +from utils.scapy import ethernet_frame, get_ip_from_dns, interfaces, ping, tcp router = APIRouter() +RE_IP = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$" + @router.get("/ethernet-frame/{dst_mac}/{src_mac}/{eth_type}",\ summary="Create an Ethernet frame") def create_ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> dict: - """Create an Ethernet frame. - - Args: - dst_mac (str): Destination MAC address. - src_mac (str): Source MAC address. - eth_type (str): Ethernet type. - - Returns: - dict: Ethernet frame details. - - """ - frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16)) + """Create an Ethernet frame.""" + frame = ethernet_frame(dst_mac, src_mac, eth_type) return { - "frame_summary": str(frame.summary()), - "frame_details": str(frame.show(dump=True)), + "frame": frame, } @router.get("/tcp-test/{target_ip}/{target_port}", summary="Test a TCP connection") -def get_tcp_test(target_ip:str, target_port:str) -> dict: - """Test a TCP connection. - - Args: - target_ip (str): Target IP. - target_port (str): Target port. - - Returns: - dict: Result of the TCP test. - - """ - if not isinstance(target_port, int) or not 1 <= target_port <= 65535: # noqa: PLR2004 - return {"error":\ - "Le champ 'target_port' doit être un entier entre 1 et 65535."}, 400 - - packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN - response = sr1(packet, timeout=2, verbose=0) - - if response and response.haslayer(TCP): - tcp_flags = response.getlayer(TCP).flags - if tcp_flags == "SA": - return { - "message": f"Connexion TCP réussie avec \ - {target_ip}:{target_port} (SYN-ACK reçu).", - "details": response.show(dump=True), - } - if tcp_flags == "RA": - return { - "message": f"Connexion TCP refusée par \ - {target_ip}:{target_port} (RESET reçu).", +def get_tcp_test(target_ip:str, target_port:int) -> dict: + """Test a TCP connection.""" + if RE_IP.match(target_ip) is None: + target_ip = get_ip_from_dns(target_ip) + start_time = time.time() + status, response, packet, tcp_flags = tcp(target_ip, target_port) + rtt = (time.time() - start_time) * 1000 # Convert to ms + + if status == -1: + raise HTTPException( + status_code=404, + detail={ + "message": f"No response from {target_ip}:{target_port}", + }, + ) + + if status == 0: + return { + "message":\ + f"Connexion TCP réussie avec {target_ip}:{target_port} (SYN-ACK reçu).", + "details": { + "rtt_ms": round(rtt, 2), + "packet_size": len(response), + "ttl": response.ttl, + "source": packet.src, + "destination": packet.dst, + }, } + if status == 1: return { - "message": f"Connexion TCP refusée par \ - {target_ip}:{target_port} (Flags : {tcp_flags}).", - } + "message":\ + f"Connexion TCP refusée par {target_ip}:{target_port} (RESET reçu).", + } return { - "message": "Pas de réponse de la cible.", + "message": \ + f"Connexion TCP refusée par {target_ip}:{target_port} (Flags : {tcp_flags}).", } @router.get("/ping/{ip}", summary="Ping a target IP") def get_ping(ip: str) -> dict: - """Ping a target IP. - - Args: - ip (str): Target IP address. - - Returns: - dict: Result of the ping. - - """ - packet = IP(dst=ip) / ICMP() - response = sr1(packet, timeout=3, verbose=0) - - if response: - return { - "message": f"Ping réussi : {response.summary()}", - "details": response.show(dump=True), - } - - return { - "message": "Pas de réponse de la cible.", - } - -def serialize_network_interface(iface: object) -> dict: - """Convert a network interface object to a dictionary. - - Args: - iface (object): Network interface object. - - Returns: - dict: Dictionary representation of the network interface. - - """ - return { - "name": str(iface.name) if hasattr(iface, "name") else None, - "ip": str(iface.ip) if hasattr(iface, "ip") else None, - "mac": str(iface.mac) if hasattr(iface, "mac") else None, - "mtu": int(iface.mtu) if hasattr(iface, "mtu") else None, - } - -@router.get("/interface/{ip}", summary="Ping a target IP") -def get_interface(ip: str) -> dict | None: - """Ping a target IP. - - Args: - ip (str): Target IP address. - - Returns: - dict: Result of the ping with detailed information. - - """ + """Ping a target IP.""" + if RE_IP.match(ip) is None: + ip = get_ip_from_dns(ip) try: - packet = IP(dst=ip) / ICMP() - response = sr1(packet, timeout=3, verbose=0) + start_time = time.time() + packet, response = ping(ip) + rtt = (time.time() - start_time) * 1000 # Convert to ms if response: return { - "interface": serialize_network_interface(conf.iface), - "source_ip": packet.src, - "destination_ip": packet.dst, + "rtt_ms": round(rtt, 2), + "packet_size": len(response), + "ttl": response.ttl, + "source": packet.src, + "destination": packet.dst, } - return { - "message": f"No response from {ip}", - "interface": serialize_network_interface(conf.iface), - } + raise HTTPException( + status_code=404, + detail={ + "message": f"No response from {ip}", + "source": packet.src, + "destination": ip, + }, + ) + + except (OSError, ValueError) as e: + raise HTTPException( + status_code=500, + detail=f"Ping failed: {e!s}", + ) from e + +@router.get("/interfaces", summary="Get the interfaces of the machine") +def get_interface() -> dict | None: + """Get the network interfaces of the host.""" + try: + return interfaces() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + + diff --git a/api/src/utils/scapy.py b/api/src/utils/scapy.py new file mode 100644 index 0000000..02b1b6b --- /dev/null +++ b/api/src/utils/scapy.py @@ -0,0 +1,102 @@ +"""Module for Scapy utilities.""" + +import socket + +from scapy.all import ( + ICMP, + IP, + TCP, + Ether, + get_if_addr, + get_if_hwaddr, + get_if_list, + hexdump, + sr1, +) + + +def ping(ipv4: str) -> tuple[IP, IP]: + """Ping an IPv4 address. + + Args: + ipv4 (str): The IPv4 address to ping. + + Returns: + str: Result of the ping. + + """ + packet = IP(dst=ipv4) / ICMP() + response = sr1(packet, timeout=3, verbose=0) + return packet, response + +def ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> hexdump: + """Create an Ethernet frame. + + Args: + dst_mac (str): Destination MAC address. + src_mac (str): Source MAC address. + eth_type (str): Ethernet type. + + Returns: + hexdump: Ethernet frame in hexa. + + """ + frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16)) + return hexdump(frame, dump=True) + +def interfaces() -> dict: + """Get network interfaces of the host. + + Returns: + dict: Network interfaces information. + + """ + interfaces = {} + + for iface in get_if_list(): + interfaces[iface] = { + "name": iface, + "ip": get_if_addr(iface), + "mac": get_if_hwaddr(iface), + } + + return {"interfaces": interfaces} + +def tcp(target_ip:str, target_port:int) -> tuple[int, IP | None, IP | None, str | None]: + """Test a TCP connection. + + Args: + target_ip (str): Target IP. + target_port (str): Target port. + + Returns: + tuple: Status of the TCP test. + + """ + packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN + response = sr1(packet, timeout=2, verbose=0) + + if response and response.haslayer(TCP): + tcp_flags = response.getlayer(TCP).flags + if tcp_flags == "SA": + return 0, response, packet, tcp_flags + if tcp_flags == "RA": + return 1, None, None, tcp_flags + + return 2, None, None, tcp_flags + return -1, None, None, None + +def get_ip_from_dns(dns:str) -> str | None: + """Get the IP address from a DNS name. + + Args: + dns (str): DNS name. + + Returns: + str | None: IP address. + + """ + try: + return socket.gethostbyname(dns) + except socket.gaierror: + return None