In [3]:
!pip install pybloom-live
!pip install scapy


Collecting scapy
  Using cached scapy-2.6.1-py3-none-any.whl.metadata (5.6 kB)
Using cached scapy-2.6.1-py3-none-any.whl (2.4 MB)
Installing collected packages: scapy
Successfully installed scapy-2.6.1


In [None]:
import csv
import requests
import time
import threading
import os
from queue import Queue
from scapy.all import sniff, IP
from pybloom_live import BloomFilter  # Use pybloom-live for Bloom Filter
import msvcrt  # Used to detect key presses in Windows

# Get Desktop path for the current user
# NOTE(review): assumes a "Desktop" folder exists under the home directory — confirm
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")

# Ask the user for a file name for the output CSV
# (this prompt runs as soon as the cell/module is executed, before main())
output_file_name = input("Enter the output file name (without extension): ")
log_file_path = os.path.join(desktop_path, f"{output_file_name}.csv")

# Queues chaining the pipeline stages:
#   packet_queue   - packets put by packet_capture, consumed by ip_extraction
#   ip_queue       - (src_ip, dst_ip) tuples put by ip_extraction, consumed by threat_analysis
#   analysis_queue - (ip, reputation, flags) tuples put by threat_analysis, consumed by result_logging
packet_queue = Queue()
ip_queue = Queue()
analysis_queue = Queue()

# Local Bloom Filter (or you can use a set) holding blocklisted IPs.
# It is filled by process_blocklist() and queried by threat_analysis().
# Membership tests on a Bloom filter can return false positives (error_rate below).
blocklist = BloomFilter(capacity=1000000, error_rate=0.001)

# Step 1: Download Blocklists
def download_blocklists():
    """Download every configured blocklist and feed its text to ``process_blocklist``.

    Each URL is fetched independently on a best-effort basis: a failure
    (network error, timeout, non-200 status, processing error) is reported
    on stdout and the remaining URLs are still attempted.
    """
    # List of blocklist URLs
    # NOTE(review): the provider labels below come from the original author —
    # confirm each URL still exists and serves the expected feed.
    blocklist_urls = [
        'https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt',  # Example (Emerging Threats)
        'https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/ipblocks.txt',  # FireHOL
        'https://raw.githubusercontent.com/abuse-ch/abuseipdb/master/abuseipdb-blocklist.txt'  # AbuseIPDB
    ]

    # Without a timeout a stalled connection would block the updater thread forever.
    request_timeout_seconds = 30

    print("[*] Starting to download blocklists...")
    for url in blocklist_urls:
        try:
            response = requests.get(url, timeout=request_timeout_seconds)
            if response.status_code == 200:
                print(f"[+] Downloaded blocklist from {url}")
                process_blocklist(response.text)
            else:
                # Previously ignored silently, which hid dead or moved feeds.
                print(f"[-] Skipping blocklist from {url}: HTTP {response.status_code}")
        except Exception as e:
            # Deliberately broad: one bad feed must not stop the others.
            print(f"[-] Error downloading blocklist from {url}: {e}")

# Step 2: Process the Blocklist Data
def process_blocklist(data):
    """Add every valid IPv4 address found in ``data`` to the global ``blocklist``.

    Args:
        data: Blocklist text, one entry per line. The IP is expected to be the
            first whitespace-separated token of a line; anything after it
            (e.g. a hit count column) and anything after a ``#`` is ignored.

    Lines that are blank, comment-only, or whose first token is not a plain
    dotted-quad address (e.g. CIDR ranges) are skipped.
    """
    for line in data.splitlines():
        # Strip trailing/whole-line comments, then split off extra columns.
        # Validating the whole line would reject rows such as "1.2.3.4\t5".
        tokens = line.split('#', 1)[0].split()
        if tokens and is_valid_ip(tokens[0]):
            blocklist.add(tokens[0])

def is_valid_ip(ip):
    """Return True if ``ip`` is a dotted-quad IPv4 address string.

    Args:
        ip: Candidate value; anything that is not a ``str`` yields False.

    Returns:
        True when ``ip`` consists of exactly four ``.``-separated parts, each
        made only of ASCII digits and with a numeric value in 0..255.
    """
    if not isinstance(ip, str):
        return False

    parts = ip.split('.')
    if len(parts) != 4:
        return False

    for part in parts:
        # int() alone is too lenient: it accepts signs, surrounding
        # whitespace and underscores ("+1", " 1", "1_0"), so check the
        # characters explicitly before converting.
        if not part or any(ch not in '0123456789' for ch in part):
            return False
        if int(part) > 255:
            return False
    return True

# Step 3: Packet Capture and IP Extraction
def packet_capture(packet_queue):
    """Sniff traffic and enqueue every packet that carries an IP layer.

    Args:
        packet_queue: Queue that receives the captured packets.
    """
    def _enqueue_if_ip(pkt):
        # Guard clause: packets without an IP layer are dropped.
        if not pkt.haslayer(IP):
            return
        packet_queue.put(pkt)

    print("[*] Starting packet capture...")
    # store=False: packets are handed to the callback only, not kept by sniff().
    sniff(filter="ip", prn=_enqueue_if_ip, store=False)

def ip_extraction(packet_queue, ip_queue):
    """Forever turn packets from ``packet_queue`` into address pairs on ``ip_queue``.

    Args:
        packet_queue: Queue of captured packets (each is indexed with ``IP``).
        ip_queue: Queue that receives one ``(src_ip, dst_ip)`` tuple per packet.
    """
    print("[*] Starting IP extraction...")
    while True:
        captured = packet_queue.get()
        endpoints = (captured[IP].src, captured[IP].dst)
        ip_queue.put(endpoints)
        # Tell the queue this packet has been fully handled.
        packet_queue.task_done()

# Step 4: Analyze Extracted IPs with Multiple Threat Analysis Methods
def threat_analysis(ip_queue, analysis_queue):
    """Forever classify the IP pairs from ``ip_queue`` and publish per-IP results.

    For every ``(src_ip, dst_ip)`` pair the detection methods are tried in
    order (Bloom filter, Google Safe Browsing, reputation list). As soon as
    one method flags either address, the later methods are skipped for that
    pair. Two tuples ``(ip, reputation, flags)`` are then put on
    ``analysis_queue``, one for the source and one for the destination.

    ``reputation`` is computed per address from that address's own flags, so
    a clean peer of a malicious host is reported as "Clean" rather than
    inheriting "Malicious" alongside all-"Clean" flags.

    Args:
        ip_queue: Queue yielding ``(src_ip, dst_ip)`` tuples.
        analysis_queue: Queue receiving ``(ip, reputation, flags)`` tuples,
            where ``flags`` maps method name to 'Clean' or 'Malicious'.
    """
    # Ordered detection chain: (key in the flags dict, name used in the
    # console message, predicate). Lambdas keep the lookups of the module-level
    # helpers lazy, exactly as direct calls would be.
    detection_chain = (
        ('Bloom Filter', 'Bloom Filter', lambda ip: ip in blocklist),
        ('Google API', 'Google Safe Browsing API', lambda ip: analyze_ip_with_google(ip)),
        ('Reputation Check', 'Reputation Check', lambda ip: check_ip_reputation(ip)),
    )

    print("[*] Starting threat analysis...")
    while True:
        src_ip, dst_ip = ip_queue.get()

        # One flags dict per distinct address. Keying by address also handles
        # src_ip == dst_ip, where both rows must carry the same flags.
        ip_flags = {
            ip: {flag_key: 'Clean' for flag_key, _, _ in detection_chain}
            for ip in (src_ip, dst_ip)
        }

        is_malicious = False
        for flag_key, method_name, is_flagged in detection_chain:
            if is_malicious:
                # An earlier method already produced a hit for this pair.
                break
            for ip in ip_flags:
                if is_flagged(ip):
                    print(f"[+] Malicious IP detected by {method_name}: {ip}")
                    ip_flags[ip][flag_key] = 'Malicious'
                    is_malicious = True

        # Log the flags for both source and destination IP
        for ip in (src_ip, dst_ip):
            flags = dict(ip_flags[ip])  # independent copy per logged row
            reputation = "Malicious" if 'Malicious' in flags.values() else "Clean"
            analysis_queue.put((ip, reputation, flags))
        ip_queue.task_done()

# Check IP Reputation (Example method, can be extended to use a real API)
def check_ip_reputation(ip):
    """Return True if ``ip`` appears in the built-in test list of flagged addresses.

    This is a stand-in for a real reputation service: the list is hardcoded
    and intentionally contains private, documentation and well-known public
    addresses so that detections can be triggered while testing.
    """
    flagged_for_testing = (
        # Private-network addresses flagged as malicious for testing
        '192.168.1.1',
        '10.0.0.1',
        '172.16.0.10',
        # Example / test addresses
        '203.0.113.5',
        '198.51.100.23',
        '192.0.2.123',
        '198.18.0.1',
        '104.244.42.129',
        # Public DNS resolvers (Google, Cloudflare), flagged here for testing only
        '8.8.8.8',
        '1.1.1.1',
    )
    found = ip in flagged_for_testing
    return found


# Analyze IP with Google Safe Browsing API
def analyze_ip_with_google(ip):
    """Ask the Google Safe Browsing v4 API whether ``http://<ip>`` is a known threat.

    The API key is read from the ``GOOGLE_SAFE_BROWSING_API_KEY`` environment
    variable. When it is unset (or still the placeholder value) no request is
    sent and the function returns False, instead of issuing a request per
    packet that can only be rejected.

    Args:
        ip: IP address string to look up.

    Returns:
        True if the API response contains at least one match, otherwise False
        (including on any network, HTTP or response-parsing error).
    """
    placeholder_key = "your_google_safe_browsing_api_key"
    api_key = os.environ.get("GOOGLE_SAFE_BROWSING_API_KEY", placeholder_key)
    if not api_key or api_key == placeholder_key:
        return False

    url = "https://safebrowsing.googleapis.com/v4/threatMatches:find"
    payload = {
        "client": {"clientId": "your-client-id", "clientVersion": "1.0"},
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING"],
            "platformTypes": ["WINDOWS"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": f"http://{ip}"}],
        },
    }
    try:
        # The timeout keeps a stalled lookup from blocking the analysis thread.
        response = requests.post(url, params={"key": api_key}, json=payload, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data.get("matches"):
                return True
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a 200 response whose body is not valid JSON.
        print(f"[-] Error checking IP {ip} with Google Safe Browsing: {e}")
    return False

# Step 5: Log Results to CSV (Create file and load live output)
def result_logging(analysis_queue):
    """Forever append analysis results from ``analysis_queue`` to the CSV log.

    The CSV at ``log_file_path`` is created with a header row if it does not
    exist yet. Each ``(ip, reputation, flags)`` item is written as one row
    together with the current time, and echoed to stdout.

    Args:
        analysis_queue: Queue yielding ``(ip, reputation, flags)`` tuples.
    """
    # Create CSV file and write headers if it doesn't exist
    if not os.path.exists(log_file_path):
        with open(log_file_path, "w", newline='') as csvfile:
            csv.writer(csvfile).writerow(["IP", "Reputation", "Method Flags", "Time"])

    print("[*] Logging results to CSV...")
    with open(log_file_path, "a", newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        while True:
            ip, reputation, flags = analysis_queue.get()
            try:
                log_entry = [ip, reputation, str(flags), time.ctime()]
                csv_writer.writerow(log_entry)
                # This loop never exits normally, so the file is never closed;
                # flush every row so buffered data is not lost when the thread
                # is terminated with the process.
                csvfile.flush()
                print(f"[+] Logged: {log_entry}")
            finally:
                # Always balance get() so a failed write cannot wedge queue.join().
                analysis_queue.task_done()

# Periodically update the blocklist
def periodic_update_blocklist():
    """Refresh the blocklists immediately, then again once every 24 hours, forever."""
    refresh_interval_seconds = 24 * 60 * 60  # one day
    while True:
        download_blocklists()
        time.sleep(refresh_interval_seconds)

# Listen for the 'S' key to stop the program
def listen_for_stop():
    """Block until the user presses the 'S' key (either case) in the console.

    Uses ``msvcrt`` and therefore only works on Windows with an attached
    console. Returns None once the key has been seen.
    """
    poll_interval_seconds = 0.05

    print("[*] Press 'S' to stop the program...")
    while True:
        if msvcrt.kbhit():
            # Accept 's' as well as 'S' so the user need not hold Shift.
            if msvcrt.getch().upper() == b'S':  # 'S' key to stop
                print("[*] 'S' key pressed. Stopping the program.")
                break
        else:
            # Sleep between polls instead of spinning a CPU core at 100%.
            time.sleep(poll_interval_seconds)

# Orchestrating Threads with Timer for 1 Minute
def main():
    """Start the pipeline threads and run until 'S' is pressed or 1 minute elapses.

    All worker threads are daemon threads, so they end when the interpreter
    process exits after this function returns.
    NOTE(review): inside a long-lived process (e.g. a notebook kernel) the
    daemon threads keep running after main() returns — confirm whether an
    explicit shutdown is needed.
    """
    run_limit_seconds = 60

    # Start blocklist update thread
    threading.Thread(target=periodic_update_blocklist, daemon=True).start()

    # Start packet capture, IP extraction, threat analysis, and logging threads
    threads = [
        threading.Thread(target=packet_capture, args=(packet_queue,), daemon=True),
        threading.Thread(target=ip_extraction, args=(packet_queue, ip_queue), daemon=True),
        threading.Thread(target=threat_analysis, args=(ip_queue, analysis_queue), daemon=True),
        threading.Thread(target=result_logging, args=(analysis_queue,), daemon=True)
    ]

    for thread in threads:
        thread.start()

    # listen_for_stop() blocks until the key is pressed, so calling it directly
    # here would make the time limit unreachable. Run it in a helper thread and
    # wait on an event with a timeout instead.
    stop_requested = threading.Event()

    def _await_stop_key():
        listen_for_stop()
        stop_requested.set()

    threading.Thread(target=_await_stop_key, daemon=True).start()

    # wait() returns True if the key was pressed, False if the limit expired.
    if not stop_requested.wait(timeout=run_limit_seconds):
        print("[*] 1 minute is up. Stopping the program.")

# Run the program
# Entry-point guard: start the pipeline only when this code is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
