# Advanced Port Scan Detector

**Author:** Raiza Rosas

**Date:** 2026-02-11

**Description:** Analyzes PCAPs and detects multiple scan types with IOC extraction

In [4]:
%pip install scapy
from scapy.all import *
from collections import defaultdict, Counter
import json
from datetime import datetime
import sys

class ScanDetector:
    def __init__(self, pcap_file):
        self.pcap_file = pcap_file
        self.scan_types = {
            'SYN': 0,
            'FIN': 0,
            'XMAS': 0,
            'NULL': 0,
            'ACK': 0,
            'UNKNOWN': 0
        }
        self.source_ips = defaultdict(lambda: {
            'ports_contacted': set(),
            'packet_count': 0,
            'scan_types': [],
            'first_seen': None,
            'last_seen': None,
            'avg_packet_rate': 0
        })
        self.total_packets = 0
        
    def classify_scan_type(self, flags):
        """
        Classify scan based on TCP flags
        Returns: Scan type as string
        """
        # TCP flags as integer
        F = 0x01  # FIN
        S = 0x02  # SYN
        R = 0x04  # RST
        P = 0x08  # PSH
        A = 0x10  # ACK
        U = 0x20  # URG
        
        if flags == S:
            return 'SYN'
        elif flags == F:
            return 'FIN'
        elif flags == (F | P | U):  # 0x29
            return 'XMAS'
        elif flags == 0:
            return 'NULL'
        elif flags == A:
            return 'ACK'
        else:
            return 'UNKNOWN'
        
    def analyze_packet(self, pkt):
        """Analyze individual packet"""
        if not pkt.haslayer(TCP) or not pkt.haslayer(IP):
            return
            
        self.total_packets += 1
        
        src_ip = pkt[IP].src
        dst_port = pkt[TCP].dport
        flags = int(pkt[TCP].flags)
        timestamp = float(pkt.time)
        
        # Initialize timestamps
        if self.source_ips[src_ip]['first_seen'] is None:
            self.source_ips[src_ip]['first_seen'] = timestamp
        self.source_ips[src_ip]['last_seen'] = timestamp
        
        # Update counters
        self.source_ips[src_ip]['packet_count'] += 1
        self.source_ips[src_ip]['ports_contacted'].add(dst_port)
        
        # Classify and track scan type
        scan_type = self.classify_scan_type(flags)
        self.scan_types[scan_type] += 1
        self.source_ips[src_ip]['scan_types'].append(scan_type)
    
    def calculate_metrics(self):
        """Calculate detection metrics for each IP"""
        for src_ip, data in self.source_ips.items():
            if data['first_seen'] and data['last_seen']:
                duration = data['last_seen'] - data['first_seen']
                if duration > 0:
                    data['avg_packet_rate'] = data['packet_count'] / duration
                else:
                    data['avg_packet_rate'] = data['packet_count']
    
    def detect_scans(self):
        """Main detection logic"""
        print("\n" + "="*70)
        print("ADVANCED PORT SCAN DETECTOR")
        print("="*70)
        print(f"\n[*] Loading PCAP: {self.pcap_file}")
        
        try:
            packets = rdpcap(self.pcap_file)
        except Exception as e:
            print(f"[!] Error loading PCAP: {e}")
            sys.exit(1)
        
        print(f"[*] Total packets in PCAP: {len(packets)}")
        print(f"[*] Analyzing packets...\n")
        
        # Analyze all packets
        for pkt in packets:
            self.analyze_packet(pkt)
        
        # Calculate metrics
        self.calculate_metrics()
        
        # Display results
        self.display_results()
        
        # Save IOCs
        self.save_iocs()
    
    def display_results(self):
        """Display detection results"""
        print("="*70)
        print("SCAN TYPE DISTRIBUTION")
        print("="*70)
        
        for scan_type, count in sorted(self.scan_types.items(), key=lambda x: x[1], reverse=True):
            if count > 0:
                percentage = (count / self.total_packets) * 100
                print(f"  {scan_type:8s} → {count:5d} packets ({percentage:5.2f}%)")
        
        print("\n" + "="*70)
        print("SUSPICIOUS SOURCE IPs")
        print("="*70)
        
        # Sort by number of ports contacted
        sorted_ips = sorted(self.source_ips.items(), 
                          key=lambda x: len(x[1]['ports_contacted']), 
                          reverse=True)
        
        for src_ip, data in sorted_ips:
            ports_count = len(data['ports_contacted'])
            
            # Threshold: >20 ports = suspicious
            if ports_count > 20:
                # Determine severity
                if ports_count > 100:
                    severity = "CRITICAL"
                elif ports_count > 50:
                    severity = "HIGH"
                else:
                    severity = "MEDIUM"
                
                # Get most common scan type
                scan_type_counts = Counter(data['scan_types'])
                most_common_scan = scan_type_counts.most_common(1)[0][0] if scan_type_counts else "UNKNOWN"
                
                # Calculate scan duration
                duration = data['last_seen'] - data['first_seen'] if data['last_seen'] and data['first_seen'] else 0
                
                print(f"\n{severity} {src_ip}")
                print(f"  ├─ Ports Contacted: {ports_count}")
                print(f"  ├─ Total Packets: {data['packet_count']}")
                print(f"  ├─ Primary Scan Type: {most_common_scan}")
                print(f"  ├─ Scan Duration: {duration:.2f} seconds")
                print(f"  ├─ Packet Rate: {data['avg_packet_rate']:.2f} packets/sec")
                
                # Detect slow scan
                if data['avg_packet_rate'] < 1.0 and ports_count > 20:
                    print(f"  └─ SLOW SCAN DETECTED (likely evasion attempt)")
                else:
                    print(f"  └─ Severity: {severity.split()[1]}")
    
    def save_iocs(self):
        """Save Indicators of Compromise to JSON"""
        iocs = {
            'metadata': {
                'timestamp': datetime.now().isoformat(),
                'pcap_analyzed': self.pcap_file,
                'total_packets': self.total_packets,
                'analyst': '[Your Name]'
            },
            'scan_summary': self.scan_types,
            'malicious_ips': []
        }
        
        for src_ip, data in self.source_ips.items():
            ports_count = len(data['ports_contacted'])
            
            if ports_count > 20:
                # Determine severity
                if ports_count > 100:
                    severity = "CRITICAL"
                elif ports_count > 50:
                    severity = "HIGH"
                else:
                    severity = "MEDIUM"
                
                # Get unique scan types
                unique_scans = list(set(data['scan_types']))
                
                iocs['malicious_ips'].append({
                    'ip_address': src_ip,
                    'ports_scanned': ports_count,
                    'packet_count': data['packet_count'],
                    'scan_types': unique_scans,
                    'severity': severity,
                    'avg_packet_rate': round(data['avg_packet_rate'], 2),
                    'scan_duration': round(data['last_seen'] - data['first_seen'], 2) if data['last_seen'] and data['first_seen'] else 0,
                    'first_seen': datetime.fromtimestamp(data['first_seen']).isoformat() if data['first_seen'] else None,
                    'last_seen': datetime.fromtimestamp(data['last_seen']).isoformat() if data['last_seen'] else None
                })
        
        # Save to file
        output_file = 'scan_iocs.json'
        with open(output_file, 'w') as f:
            json.dump(iocs, f, indent=4)
        
        print(f"\n{'='*70}")
        print(f"[+] IOCs saved to: {output_file}")
        print(f"[+] Total malicious IPs identified: {len(iocs['malicious_ips'])}")
        print("="*70 + "\n")

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python3 scan_analyzer.py <pcap_file>")
        sys.exit(1)
    
    pcap_file = sys.argv[1]
    
    detector = ScanDetector(pcap_file)
    detector.detect_scans()

Collecting scapy
  Downloading scapy-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading scapy-2.7.0-py3-none-any.whl (2.6 MB)
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
   ---- ----------------------------------- 0.3/2.6 MB ? eta -:--:--
   ---- ----------------------------------- 0.3/2.6 MB ? eta -:--:--
   -------- ------------------------------- 0.5/2.6 MB 837.5 kB/s eta 0:00:03
   -------- ------------------------------- 0.5/2.6 MB 837.5 kB/s eta 0:00:03
   -------- ------------------------------- 0.5/2.6 MB 837.5 kB/s eta 0:00:03
   ------------ --------------------------- 0.8/2.6 MB 500.8 kB/s eta 0:00:04
   -----




ADVANCED PORT SCAN DETECTOR

[*] Loading PCAP: --f=c:\Users\raiza\AppData\Roaming\jupyter\runtime\kernel-v30c68cf03644a0feb3bd8e74774999a5cad057028.json
[!] Error loading PCAP: [Errno 22] Invalid argument: '--f=c:\\Users\\raiza\\AppData\\Roaming\\jupyter\\runtime\\kernel-v30c68cf03644a0feb3bd8e74774999a5cad057028.json'


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
