<a href="https://colab.research.google.com/github/cikidano/bm/blob/master/analyze_wifi_pcap_using_scapy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
!pip install scapy
from scapy.all import *
!pip install prettytable
from prettytable import PrettyTable



In [15]:
def filter_packets(packets):
    tcp_packets = packets.filter(lambda p: TCP in p)
    udp_packets = packets.filter(lambda p: UDP in p)
    icmp_packets = packets.filter(lambda p: ICMP in p)
    other_packets = packets.filter(lambda p: not (TCP in p or UDP in p or ICMP in p))
    return tcp_packets, udp_packets, icmp_packets, other_packets

def filter_packets_by_source_ip(packets, source_ip):
    filtered_packets = packets.filter(lambda p: IP in p and p[IP].src == source_ip)
    return filtered_packets

def filter_packets_by_destination_port(packets, dest_port):
    filtered_packets = packets.filter(lambda p: TCP in p and p[TCP].dport == dest_port)
    return filtered_packets

#def filter_packets_by_mac_address(packets, mac_address):
 #   filtered_packets = packets.filter(lambda p: Ether in p and (p.src == mac_address or p.dst == mac_address))
  #  return filtered_packets

def filter_packets_by_mac_address(packets, mac_address):
    def match(p):
        if Dot11 not in p:
            return False
        d = p[Dot11]
        return mac_address in [d.addr1, d.addr2, d.addr3, getattr(d, "addr4", None)]
    return packets.filter(match)

def filter_packets_by_ip_range(packets, start_ip, end_ip):
    filtered_packets = packets.filter(lambda p: IP in p and start_ip <= p[IP].src <= end_ip)
    return filtered_packets

def filter_packets_by_protocol(packets, protocol):
    filtered_packets = packets.filter(lambda p: p.haslayer(protocol))
    return filtered_packets

def parse_dns_packets(packets):
    dns_packets = packets.filter(lambda p: DNS in p)
    domain_info = []
    for packet in dns_packets:
        domain_name = packet[DNS].qd.qname.decode('utf-8')
        domain_info.append(domain_name)
    return domain_info

def filter_suspicious_packets(packets, max_payload_size):
    suspicious_packets = packets.filter(lambda p: TCP in p and len(p[TCP].payload) > max_payload_size)
    return suspicious_packets

def filter_packets_by_tcp_flags(packets, flags):
    filtered_packets = packets.filter(lambda p: TCP in p and p[TCP].flags & flags)
    return filtered_packets

def parse_beacon_frames(packets):
    beacon_frames = packets.filter(lambda p: p.haslayer(Dot11Beacon))
    channel_info = {}
    for beacon in beacon_frames:
        ssid = beacon.info.decode('utf-8', 'ignore')
        bssid = beacon.addr3
        channel = ord(beacon[Dot11Elt:3].info)
        power_constraint = get_power_constraint(beacon)
        if channel not in channel_info:
            channel_info[channel] = []
        channel_info[channel].append((ssid, bssid, power_constraint))
    return channel_info

def get_power_constraint(beacon):
    power_constraint_elem = beacon.getlayer(Dot11Elt, ID=32)
    if power_constraint_elem:
        power_constraint = ord(power_constraint_elem.info)
        return power_constraint
    return None

def parse_management_frames(packets):
    management_frames = packets.filter(lambda p: p.haslayer(Dot11) and p.type == 0)
    frame_info = []
    for frame in management_frames:
        frame_type = frame.subtype
        source_mac = frame.addr2
        destination_mac = frame.addr1
        frame_info.append((frame_type, source_mac, destination_mac))
    return frame_info

def get_frame_type_explanation(frame_type):
    frame_types = {
        0: "Association Request",
        1: "Association Response",
        2: "Reassociation Request",
        3: "Reassociation Response",
        4: "Probe Request",
        5: "Probe Response",
        8: "Beacon",
        9: "ATIM",
        10: "Disassociation",
        11: "Authentication",
        12: "Deauthentication",
        13: "Action"
    }
    return frame_types.get(frame_type, "Unknown")

def summarize_findings(tcp_packets, udp_packets, icmp_packets, other_packets, source_ip_packets, dest_port_packets,
                       mac_address_packets, ip_range_packets, http_packets, suspicious_packets, syn_packets,
                       channel_info, management_frame_info, domain_info):

    table = PrettyTable()
    table.field_names = ["Finding", "Count/Value"]
    table.align["Finding"] = "l"
    table.align["Count/Value"] = "c"

    table.add_row(["TCP Packets", len(tcp_packets)])
    table.add_row(["UDP Packets", len(udp_packets)])
    table.add_row(["ICMP Packets", len(icmp_packets)])
    table.add_row(["Other Packets", len(other_packets)])
    table.add_row(["Packets from Source IP 192.168.0.1", len(source_ip_packets)])
    table.add_row(["Packets to Destination Port 80", len(dest_port_packets)])
    table.add_row(["Packets from/to MAC c0:7a:d6:9c:91:1f", len(mac_address_packets)])
    table.add_row(["Packets in IP Range 192.168.0.1 - 192.168.0.100", len(ip_range_packets)])
    table.add_row(["HTTP Packets", len(http_packets)])
    table.add_row(["Suspicious Packets (Payload > 1024)", len(suspicious_packets)])
    table.add_row(["SYN Packets", len(syn_packets)])

    beacon_channels = ", ".join(str(channel) for channel in channel_info.keys())
    table.add_row(["Beacon Frame Channels", beacon_channels])

    for frame_type, source_mac, destination_mac in management_frame_info:
        frame_type_explanation = get_frame_type_explanation(frame_type)
        table.add_row([f"Management Frame: {frame_type_explanation}", ""])

    dns_domains = "\n".join(domain_info)
    table.add_row(["DNS Domains", dns_domains])

    return table

def analyze_pcap(pcap_file):
    """
    Analyzes a pcap file and performs various packet filtering and parsing operations.

    Args:
        pcap_file: The path to the pcap file to analyze.
    """
    target_mac = "c0:7a:d6:9c:91:1f"

    packets = rdpcap(pcap_file)
    tcp_packets, udp_packets, icmp_packets, other_packets = filter_packets(packets)
    source_ip_packets = filter_packets_by_source_ip(packets, "192.168.0.1")
    dest_port_packets = filter_packets_by_destination_port(packets, 80)
    mac_address_packets = filter_packets_by_mac_address(packets, target_mac)
    ip_range_packets = filter_packets_by_ip_range(packets, "192.168.0.1", "192.168.0.100")
    http_packets = filter_packets_by_protocol(packets, TCP)
    domain_info = parse_dns_packets(packets)
    suspicious_packets = filter_suspicious_packets(packets, 1024)
    syn_packets = filter_packets_by_tcp_flags(packets, 0x02)  # SYN flag
    channel_info = parse_beacon_frames(packets)
    management_frame_info = parse_management_frames(packets)

    print("Detailed Output:")
    print(f"Number of TCP packets: {len(tcp_packets)}")
    print(f"Number of UDP packets: {len(udp_packets)}")
    print(f"Number of ICMP packets: {len(icmp_packets)}")
    print(f"Number of other packets: {len(other_packets)}")
    print(f"Packets from source IP 192.168.0.1: {len(source_ip_packets)}")
    print(f"Packets to destination port 80: {len(dest_port_packets)}")
    print(f"Packets from/to MAC address {target_mac}: {len(mac_address_packets)}")
    print(f"Packets in IP range 192.168.0.1 - 192.168.0.100: {len(ip_range_packets)}")
    print(f"HTTP packets: {len(http_packets)}")
    print(f"Suspicious packets (payload size > 1024): {len(suspicious_packets)}")
    print(f"SYN packets: {len(syn_packets)}")

    print("\nBeacon frame information by channel:")
    for channel, beacons in channel_info.items():
        print(f"Channel {channel}:")
        for ssid, bssid, power_constraint in beacons:
            print(f"  SSID: {ssid}, BSSID: {bssid}, Power Constraint: {power_constraint} dBm")

    print("\nManagement frame information:")
    for frame_type, source_mac, destination_mac in management_frame_info:
        frame_type_explanation = get_frame_type_explanation(frame_type)
        print(f"  Frame Type: {frame_type} ({frame_type_explanation}), Source MAC: {source_mac}, Destination MAC: {destination_mac}")

    print("\nDomain information from DNS packets:")
    for domain_name in domain_info:
        print(f"  {domain_name}")

    summary_table = summarize_findings(
        tcp_packets, udp_packets, icmp_packets, other_packets,
        source_ip_packets, dest_port_packets, mac_address_packets,
        ip_range_packets, http_packets, suspicious_packets, syn_packets,
        channel_info, management_frame_info, domain_info
    )

    print("\nSummary of Findings:")
    print(summary_table)


from scapy.all import rdpcap, Dot11, RadioTap
from collections import defaultdict
import numpy as np

def extract_device_features(pcap_file):
    packets = rdpcap(pcap_file)

    per_dev_sizes = defaultdict(list)
    per_dev_times = defaultdict(list)
    per_dev_signals = defaultdict(list)
    per_dev_types = defaultdict(list)

    for pkt in packets:
        if Dot11 not in pkt:
            continue

        mac = pkt.addr2
        if not mac:
            continue

        # Zeitstempel
        try:
            ts = float(pkt.time)
        except (TypeError, ValueError):
            continue

        # Paketgröße
        try:
            size = int(len(pkt))
        except (TypeError, ValueError):
            continue

        # Frame-Typ
        ftype = getattr(pkt, "type", None)
        if ftype is None:
            continue

        per_dev_sizes[mac].append(size)
        per_dev_times[mac].append(ts)
        per_dev_types[mac].append(int(ftype))

        # Signalstärke
        if RadioTap in pkt and hasattr(pkt[RadioTap], "dBm_AntSignal"):
            sig = pkt[RadioTap].dBm_AntSignal
            # Manche Implementierungen liefern None oder nicht-numerisch
            try:
                sig = float(sig)
                per_dev_signals[mac].append(sig)
            except (TypeError, ValueError):
                pass

    features = {}

    for mac in per_dev_sizes.keys():
        sizes = np.array(per_dev_sizes[mac], dtype=float)
        times = np.array(per_dev_times[mac], dtype=float)
        types = np.array(per_dev_types[mac], dtype=int)
        sigs = np.array(per_dev_signals.get(mac, []), dtype=float)

        if len(sizes) == 0 or len(times) == 0:
            continue

        # Inter-Arrival Times
        if len(times) > 1:
            sorted_times = np.sort(times)
            iat = np.diff(sorted_times)
            iat = iat[iat >= 0]  # negative oder kaputte Zeiten entfernen
            if len(iat) == 0:
                iat = np.array([0.0])
        else:
            iat = np.array([0.0])

        total = len(types)
        if total == 0:
            continue

        pct_mgmt = np.sum(types == 0) / total
        pct_ctrl = np.sum(types == 1) / total
        pct_data = np.sum(types == 2) / total

        dev_feat = {
            "packet_count": int(total),

            "avg_size": float(np.mean(sizes)),
            "std_size": float(np.std(sizes)),
            "min_size": float(np.min(sizes)),
            "max_size": float(np.max(sizes)),

            "avg_iat": float(np.mean(iat)),
            "std_iat": float(np.std(iat)),
            "min_iat": float(np.min(iat)),
            "max_iat": float(np.max(iat)),
            "cv_iat": float(np.std(iat) / (np.mean(iat) + 1e-6)),

            "pct_mgmt": float(pct_mgmt),
            "pct_ctrl": float(pct_ctrl),
            "pct_data": float(pct_data),
        }

        if len(sigs) > 0:
            dev_feat.update({
                "avg_signal": float(np.mean(sigs)),
                "std_signal": float(np.std(sigs)),
                "range_signal": float(np.max(sigs) - np.min(sigs)),
            })
        else:
            dev_feat.update({
                "avg_signal": 0.0,
                "std_signal": 0.0,
                "range_signal": 0.0,
            })

        features[mac] = dev_feat

    return features

if __name__ == "__main__":
    feats = extract_device_features("pluto_capture.pcap")
    for mac, f in feats.items():
        print(f"\n=== Gerät {mac} ===")
        for k, v in f.items():
            print(f"{k}: {v}")






def main():
    pcap_file = "pluto_capture.pcap"
    analyze_pcap(pcap_file)




=== Gerät c0:7a:d6:9c:91:1f ===
packet_count: 981
avg_size: 44.84913353720693
std_size: 0.76204013121133
min_size: 41.0
max_size: 45.0
avg_iat: 0.045986886657014184
std_iat: 0.08484827798653669
min_iat: 6.699562072753906e-05
max_iat: 2.0961358547210693
cv_iat: 1.845013636294057
pct_mgmt: 0.0
pct_ctrl: 0.9622833843017329
pct_data: 0.03771661569826707
avg_signal: 34.993883792048926
std_signal: 5.150137642802244
range_signal: 40.0

=== Gerät 94:83:c4:03:a8:38 ===
packet_count: 706
avg_size: 45.0
std_size: 0.0
min_size: 45.0
max_size: 45.0
avg_iat: 0.06535817612992956
std_iat: 0.12314110613578291
min_iat: 0.00011801719665527344
max_iat: 2.110898017883301
cv_iat: 1.8840676004083472
pct_mgmt: 0.0
pct_ctrl: 1.0
pct_data: 0.0
avg_signal: 33.71529745042493
std_signal: 2.416824523118281
range_signal: 15.0

=== Gerät 4c:34:88:55:59:e7 ===
packet_count: 5
avg_size: 45.0
std_size: 0.0
min_size: 45.0
max_size: 45.0
avg_iat: 6.746363043785095
std_iat: 5.830341042514428
min_iat: 0.1395130157470703
ma

In [12]:
if __name__ == "__main__":
    main()

Detailed Output:
Number of TCP packets: 0
Number of UDP packets: 0
Number of ICMP packets: 0
Number of other packets: 1897
Packets from source IP 192.168.0.1: 0
Packets to destination port 80: 0
Packets from/to MAC address c0:7a:d6:9c:91:1f: 1716
Packets in IP range 192.168.0.1 - 192.168.0.100: 0
HTTP packets: 0
Suspicious packets (payload size > 1024): 0
SYN packets: 0

Beacon frame information by channel:

Management frame information:

Domain information from DNS packets:

Summary of Findings:
+-------------------------------------------------+-------------+
| Finding                                         | Count/Value |
+-------------------------------------------------+-------------+
| TCP Packets                                     |      0      |
| UDP Packets                                     |      0      |
| ICMP Packets                                    |      0      |
| Other Packets                                   |     1897    |
| Packets from Source IP 192.168.0.1

In [None]:
from google.colab import drive
drive.mount('/content/drive')