Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
- [Create ethernet frame](#create-ethernet-frame)
- [TCP test](#tcp-test)
- [Ping](#ping)
- [Get interface](#get-interface)
- [Get interfaces](#get-interfaces)

## Routes

Expand Down Expand Up @@ -173,8 +173,8 @@
| ------ | ---------------- | ----------- | ---------- | ----- |
| GET | /scapy/ping/{ip} | Ping | True | User |

#### Get interface
#### Get interfaces

| Method | URL | Description | Need token | Roles |
| ------ | --------------------- | ------------- | ---------- | ----- |
| GET | /scapy/interface/{ip} | Get interface | True | User |
| Method | URL | Description | Need token | Roles |
| ------ | ----------------- | -------------------------- | ---------- | ----- |
| GET | /scapy/interfaces | Get interfaces of the host | True | User |
186 changes: 75 additions & 111 deletions api/src/routes/scapy.py
Original file line number Diff line number Diff line change
@@ -1,143 +1,107 @@
"""Scapy routes modules."""

import time

from fastapi import APIRouter, HTTPException
from scapy.config import conf
from scapy.layers.inet import ICMP, IP, TCP
from scapy.layers.l2 import Ether
from scapy.sendrecv import sr1

from utils.scapy import ethernet_frame, get_ip_from_dns, interfaces, ping, tcp

router = APIRouter()

RE_IP = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$"

@router.get("/ethernet-frame/{dst_mac}/{src_mac}/{eth_type}",\
summary="Create an Ethernet frame")
def create_ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> dict:
"""Create an Ethernet frame.

Args:
dst_mac (str): Destination MAC address.
src_mac (str): Source MAC address.
eth_type (str): Ethernet type.

Returns:
dict: Ethernet frame details.

"""
frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16))
"""Create an Ethernet frame."""
frame = ethernet_frame(dst_mac, src_mac, eth_type)

return {
"frame_summary": str(frame.summary()),
"frame_details": str(frame.show(dump=True)),
"frame": frame,
}

@router.get("/tcp-test/{target_ip}/{target_port}", summary="Test a TCP connection")
def get_tcp_test(target_ip:str, target_port:str) -> dict:
"""Test a TCP connection.

Args:
target_ip (str): Target IP.
target_port (str): Target port.

Returns:
dict: Result of the TCP test.

"""
if not isinstance(target_port, int) or not 1 <= target_port <= 65535: # noqa: PLR2004
return {"error":\
"Le champ 'target_port' doit être un entier entre 1 et 65535."}, 400

packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN
response = sr1(packet, timeout=2, verbose=0)

if response and response.haslayer(TCP):
tcp_flags = response.getlayer(TCP).flags
if tcp_flags == "SA":
return {
"message": f"Connexion TCP réussie avec \
{target_ip}:{target_port} (SYN-ACK reçu).",
"details": response.show(dump=True),
}
if tcp_flags == "RA":
return {
"message": f"Connexion TCP refusée par \
{target_ip}:{target_port} (RESET reçu).",
def get_tcp_test(target_ip:str, target_port:int) -> dict:
"""Test a TCP connection."""
if RE_IP.match(target_ip) is None:
target_ip = get_ip_from_dns(target_ip)
start_time = time.time()
status, response, packet, tcp_flags = tcp(target_ip, target_port)
rtt = (time.time() - start_time) * 1000 # Convert to ms

if status == -1:
raise HTTPException(
status_code=404,
detail={
"message": f"No response from {target_ip}:{target_port}",
},
)

if status == 0:
return {
"message":\
f"Connexion TCP réussie avec {target_ip}:{target_port} (SYN-ACK reçu).",
"details": {
"rtt_ms": round(rtt, 2),
"packet_size": len(response),
"ttl": response.ttl,
"source": packet.src,
"destination": packet.dst,
},
}

if status == 1:
return {
"message": f"Connexion TCP refusée par \
{target_ip}:{target_port} (Flags : {tcp_flags}).",
}
"message":\
f"Connexion TCP refusée par {target_ip}:{target_port} (RESET reçu).",
}

return {
"message": "Pas de réponse de la cible.",
"message": \
f"Connexion TCP refusée par {target_ip}:{target_port} (Flags : {tcp_flags}).",
}

@router.get("/ping/{ip}", summary="Ping a target IP")
def get_ping(ip: str) -> dict:
"""Ping a target IP.

Args:
ip (str): Target IP address.

Returns:
dict: Result of the ping.

"""
packet = IP(dst=ip) / ICMP()
response = sr1(packet, timeout=3, verbose=0)

if response:
return {
"message": f"Ping réussi : {response.summary()}",
"details": response.show(dump=True),
}

return {
"message": "Pas de réponse de la cible.",
}

def serialize_network_interface(iface: object) -> dict:
"""Convert a network interface object to a dictionary.

Args:
iface (object): Network interface object.

Returns:
dict: Dictionary representation of the network interface.

"""
return {
"name": str(iface.name) if hasattr(iface, "name") else None,
"ip": str(iface.ip) if hasattr(iface, "ip") else None,
"mac": str(iface.mac) if hasattr(iface, "mac") else None,
"mtu": int(iface.mtu) if hasattr(iface, "mtu") else None,
}

@router.get("/interface/{ip}", summary="Ping a target IP")
def get_interface(ip: str) -> dict | None:
"""Ping a target IP.

Args:
ip (str): Target IP address.

Returns:
dict: Result of the ping with detailed information.

"""
"""Ping a target IP."""
if RE_IP.match(ip) is None:
ip = get_ip_from_dns(ip)
try:
packet = IP(dst=ip) / ICMP()
response = sr1(packet, timeout=3, verbose=0)
start_time = time.time()
packet, response = ping(ip)
rtt = (time.time() - start_time) * 1000 # Convert to ms

if response:
return {
"interface": serialize_network_interface(conf.iface),
"source_ip": packet.src,
"destination_ip": packet.dst,
"rtt_ms": round(rtt, 2),
"packet_size": len(response),
"ttl": response.ttl,
"source": packet.src,
"destination": packet.dst,
}

return {
"message": f"No response from {ip}",
"interface": serialize_network_interface(conf.iface),
}
raise HTTPException(
status_code=404,
detail={
"message": f"No response from {ip}",
"source": packet.src,
"destination": ip,
},
)

except (OSError, ValueError) as e:
raise HTTPException(
status_code=500,
detail=f"Ping failed: {e!s}",
) from e

@router.get("/interfaces", summary="Get the interfaces of the machine")
def get_interface() -> dict | None:
"""Get the network interfaces of the host."""
try:
return interfaces()

except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e


102 changes: 102 additions & 0 deletions api/src/utils/scapy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Module for Scapy utilities."""

import socket

from scapy.all import (
ICMP,
IP,
TCP,
Ether,
get_if_addr,
get_if_hwaddr,
get_if_list,
hexdump,
sr1,
)


def ping(ipv4: str) -> tuple[IP, IP]:
"""Ping an IPv4 address.

Args:
ipv4 (str): The IPv4 address to ping.

Returns:
str: Result of the ping.

"""
packet = IP(dst=ipv4) / ICMP()
response = sr1(packet, timeout=3, verbose=0)
return packet, response

def ethernet_frame(dst_mac:str, src_mac:str, eth_type:str) -> hexdump:
"""Create an Ethernet frame.

Args:
dst_mac (str): Destination MAC address.
src_mac (str): Source MAC address.
eth_type (str): Ethernet type.

Returns:
hexdump: Ethernet frame in hexa.

"""
frame = Ether(dst=dst_mac, src=src_mac, type=int(eth_type, 16))
return hexdump(frame, dump=True)

def interfaces() -> dict:
"""Get network interfaces of the host.

Returns:
dict: Network interfaces information.

"""
interfaces = {}

for iface in get_if_list():
interfaces[iface] = {
"name": iface,
"ip": get_if_addr(iface),
"mac": get_if_hwaddr(iface),
}

return {"interfaces": interfaces}

def tcp(target_ip:str, target_port:int) -> tuple[int, IP | None, IP | None, str | None]:
"""Test a TCP connection.

Args:
target_ip (str): Target IP.
target_port (str): Target port.

Returns:
tuple: Status of the TCP test.

"""
packet = IP(dst=target_ip) / TCP(dport=target_port, flags="S") # Paquet SYN
response = sr1(packet, timeout=2, verbose=0)

if response and response.haslayer(TCP):
tcp_flags = response.getlayer(TCP).flags
if tcp_flags == "SA":
return 0, response, packet, tcp_flags
if tcp_flags == "RA":
return 1, None, None, tcp_flags

return 2, None, None, tcp_flags
return -1, None, None, None

def get_ip_from_dns(dns:str) -> str | None:
"""Get the IP address from a DNS name.

Args:
dns (str): DNS name.

Returns:
str | None: IP address.

"""
try:
return socket.gethostbyname(dns)
except socket.gaierror:
return None