# HTTPS Proxy Interception Testing Prototype

This notebook implements a basic prototype for testing HTTPS proxy interception with LLM-based security analysis. We'll test:

1. Basic mitmproxy setup and interception
2. Simple LLM analyzer service
3. Token detection and redaction
4. Event logging

In [67]:
import json
import re
import sqlite3
import uuid
from datetime import datetime
import requests
from flask import Flask, request, jsonify

# Token pattern matching
# TOKEN_RE matches either "sk-" followed by 16+ characters from [A-Za-z0-9-_],
# or three dot-separated segments of 24, 6 and 27 characters.
# NOTE(review): the second alternative is a subset of the third ([\w-] already
# covers [A-Za-z0-9_]), so it looks redundant - confirm before removing.
TOKEN_RE = re.compile(r'(sk-[A-Za-z0-9-_]{16,}|[A-Za-z0-9_]{24}\.[A-Za-z0-9_]{6}\.[A-Za-z0-9_-]{27}|[\w-]{24}\.[\w-]{6}\.[\w-]{27})')
# APIKEY_RE matches "AKIA" followed by 16 characters from [0-9A-Z], or ANY run
# of 32+ alphanumeric characters (hashes and long ids will match this as well).
APIKEY_RE = re.compile(r'(AKIA[0-9A-Z]{16})|([A-Za-z0-9]{32,})')

# Configuration
# Default SQLite file opened by init_db() and store_event().
DB_PATH = "events.db"
LLM_ANALYZER_URL = "http://127.0.0.1:5001/analyze"  # We'll implement this later
# Severity cut-offs used by analyze_payload(); each is compared with ">=" and
# the highest matching one decides between "alert", "redact" and "block".
ALERT_THRESHOLD = 6
REDACT_THRESHOLD = 8
BLOCK_THRESHOLD = 10

In [68]:
def init_db(db_path=None):
    """Create the ``events`` table used for security events if it does not exist.

    Args:
        db_path: Optional path of the SQLite database file. When omitted the
            module-level ``DB_PATH`` is used, which is the original behaviour.

    The connection is closed even when creating the table fails.
    """
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        # "with conn" commits on success and rolls back on error; it does not
        # close the connection, hence the explicit close() in "finally".
        with conn:
            conn.execute('''CREATE TABLE IF NOT EXISTS events
                   (ts REAL, id TEXT, host TEXT, path TEXT, direction TEXT, 
                    severity INTEGER, tags TEXT, decision TEXT, reason TEXT)''')
    finally:
        conn.close()

# Initialize the database: create the events table up front so that
# store_event() has a table to insert into.
init_db()

In [69]:
def store_event(event, db_path=None):
    """Insert one security event into the ``events`` table.

    Args:
        event: Mapping with the keys ``ts``, ``id``, ``host``, ``path``,
            ``direction``, ``severity``, ``tags``, ``decision`` and ``reason``.
            ``tags`` is serialised with ``json.dumps`` before it is stored.
        db_path: Optional path of the SQLite database file. When omitted the
            module-level ``DB_PATH`` is used, which is the original behaviour.

    Raises:
        KeyError: If ``event`` lacks one of the keys listed above.
        sqlite3.Error: If the insert fails (for example, the table is missing).
    """
    # Build the row first so that a malformed event fails before a connection
    # is opened.
    row = (
        event["ts"], event["id"], event["host"], event["path"],
        event["direction"], event["severity"], json.dumps(event["tags"]),
        event["decision"], event["reason"],
    )
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        # Commit on success / roll back on error; the connection itself is
        # closed in "finally" so it never leaks when the insert raises.
        with conn:
            conn.execute("INSERT INTO events VALUES (?,?,?,?,?,?,?,?,?)", row)
    finally:
        conn.close()

def redact_text(text, tags):
    """Return ``text`` with tokens, API keys and (optionally) e-mails masked.

    Tokens and API keys are always replaced. E-mail addresses are replaced
    only when ``tags`` contains "email_in_message" or "email_sensitive".
    """
    without_tokens = TOKEN_RE.sub("[REDACTED_TOKEN]", text)
    scrubbed = APIKEY_RE.sub("[REDACTED_KEY]", without_tokens)

    email_tag_present = any(
        tag_name in tags for tag_name in ("email_in_message", "email_sensitive")
    )
    if not email_tag_present:
        return scrubbed
    return re.sub(r'[\w\.-]+@[\w\.-]+', "[REDACTED_EMAIL]", scrubbed)

In [70]:
def analyze_payload(payload):
    """
    Simple heuristic analyzer (to be replaced with LLM later).

    Args:
        payload: Mapping that may contain ``"body"`` (text, bytes or ``None``)
            and ``"headers"`` (mapping of header name to value). Missing or
            ``None`` entries are treated as empty.

    Returns:
        dict with the keys ``severity`` (int clamped to 0..10), ``tags``
        (list of str), ``decision`` ("block", "redact", "alert" or "allow"),
        ``reason`` and ``explain``.
    """
    # A captured packet may carry no body at all (None); the regex searches
    # below need a string, so normalise first instead of raising TypeError.
    body = payload.get("body") or ""
    if isinstance(body, (bytes, bytearray)):
        body = bytes(body).decode("utf-8", errors="replace")
    elif not isinstance(body, str):
        body = str(body)
    headers = payload.get("headers") or {}

    # HTTP header names are case-insensitive, so "Authorization" and
    # "authorization" must both be found.
    auth_header = ""
    for header_name, header_value in headers.items():
        if str(header_name).lower() == "authorization" and header_value is not None:
            auth_header = str(header_value)
            break

    tags = []
    severity = 0

    # Check for tokens in body
    if TOKEN_RE.search(body):
        tags.append("token_in_message")
        severity += 8

    # Check for tokens in headers
    if TOKEN_RE.search(auth_header):
        tags.append("token_in_header")
        severity += 6

    # Check for API keys
    if APIKEY_RE.search(body):
        tags.append("api_key_pattern")
        severity += 5

    # Check for emails
    if re.search(r'[\w\.-]+@[\w\.-]+', body):
        tags.append("email_in_message")
        severity += 2

    # Clamp severity: several findings together can exceed the 0..10 scale.
    severity = min(max(severity, 0), 10)

    # Determine decision: the highest threshold that is reached wins.
    if severity >= BLOCK_THRESHOLD:
        decision = "block"
    elif severity >= REDACT_THRESHOLD:
        decision = "redact"
    elif severity >= ALERT_THRESHOLD:
        decision = "alert"
    else:
        decision = "allow"

    reason = f"Found {len(tags)} potential issues. Severity: {severity}"

    return {
        "severity": severity,
        "tags": tags,
        "decision": decision,
        "reason": reason,
        "explain": f"Analysis found {', '.join(tags)} with severity {severity}"
    }

In [71]:
import os
from dotenv import load_dotenv
import google.generativeai as genai

# Load environment variables and configure Gemini
# os.getenv() returns None when GOOGLE_API_KEY is unset, and that None is passed
# straight to genai.configure() here without a check.
load_dotenv()
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
# Module-level model handle used by analyze_with_llm().
# NOTE(review): confirm 'gemini-pro' is still a model name the installed SDK accepts.
model = genai.GenerativeModel('gemini-pro')

# Keys every analysis dict must carry; callers index them directly.
_LLM_REQUIRED_KEYS = (
    "severity",
    "risk_level",
    "findings",
    "recommendations",
    "sensitive_data_detected",
    "explanation",
)


def _llm_fallback_analysis():
    """Return the 'manual review' result used when the reply is unusable."""
    return {
        "severity": 5,
        "risk_level": "medium",
        "findings": ["Error parsing LLM response"],
        "recommendations": ["Manual review required"],
        "sensitive_data_detected": False,
        "explanation": "Failed to parse LLM analysis"
    }


def _parse_llm_analysis(raw_text):
    """Turn the model's reply text into an analysis dict with all required keys."""
    # Replies are often wrapped in markdown code fences or surrounded by prose,
    # so parse only the span from the first "{" to the last "}".
    start = raw_text.find("{")
    end = raw_text.rfind("}")
    if start == -1 or end < start:
        return _llm_fallback_analysis()
    try:
        analysis = json.loads(raw_text[start:end + 1])
    except json.JSONDecodeError:
        return _llm_fallback_analysis()
    # A reply that omits a field would raise KeyError in the callers; treat it
    # the same way as an unparsable reply.
    if any(key not in analysis for key in _LLM_REQUIRED_KEYS):
        return _llm_fallback_analysis()
    # Callers compare risk_level against lower-case literals.
    if isinstance(analysis["risk_level"], str):
        analysis["risk_level"] = analysis["risk_level"].strip().lower()
    return analysis


async def analyze_with_llm(http_packet):
    """
    Analyze HTTP packet content using Gemini LLM

    NOTE(review): the prompt embeds the packet's headers and body verbatim and
    hands them to the Gemini client. Captured content is untrusted input (it
    can contain text aimed at the model) and may itself hold the secrets this
    tool is looking for - confirm this is acceptable for the networks monitored.

    Args:
        http_packet (HTTPPacket): The packet to analyze. Reads ``method``,
            ``uri``, ``headers``, ``body`` and ``metadata.src_ip`` / ``dst_ip``.
    Returns:
        dict: Analysis with the keys ``severity``, ``risk_level``, ``findings``,
        ``recommendations``, ``sensitive_data_detected`` and ``explanation``.
        A fixed "manual review" dict is returned when the reply cannot be
        parsed or lacks a key. ``None`` is returned when the request itself
        fails; the error is printed.
    """
    try:
        # Construct a detailed prompt for the LLM
        # default=str keeps json.dumps from failing on header values that are
        # not plain JSON types.
        prompt = f"""Analyze this HTTP packet for security implications and data leakage.
        
HTTP Request Details:
Method: {http_packet.method}
URI: {http_packet.uri}
Source IP: {http_packet.metadata.src_ip}
Destination IP: {http_packet.metadata.dst_ip}
Headers: {json.dumps(http_packet.headers, indent=2, default=str)}
Body: {http_packet.body if http_packet.body else 'No body content'}

Analyze for:
1. Sensitive data exposure (API keys, tokens, credentials)
2. Security vulnerabilities
3. Suspicious patterns or behavior
4. Data privacy concerns

Return your analysis in this JSON format:
{{
    "severity": <number 0-10>,
    "risk_level": <"low"|"medium"|"high"|"critical">,
    "findings": [<list of specific security findings>],
    "recommendations": [<list of recommendations>],
    "sensitive_data_detected": <boolean>,
    "explanation": <detailed explanation>
}}"""

        # Get LLM analysis. This is a coroutine, so use the client's async
        # call rather than blocking the event loop with generate_content().
        response = await model.generate_content_async(
            prompt,
            generation_config={
                "temperature": 0.3,
                "top_p": 0.8,
                "top_k": 40,
                "max_output_tokens": 1024,
            }
        )

        # Parse response
        return _parse_llm_analysis(response.text)

    except Exception as e:
        print(f"Error in LLM analysis: {str(e)}")
        return None

In [72]:
# Live capture entry point: banner, capture, final statistics
async def start_live_capture(interface="any", duration=None):
    """
    Print a configuration banner, run an EnhancedPacketAnalyzer capture and
    always finish by printing its statistics.

    Args:
        interface (str): Network interface to capture on
        duration (int): Capture duration in seconds (None for continuous)
    """
    packet_analyzer = EnhancedPacketAnalyzer(interface=interface)

    if duration is None:
        duration_label = 'Continuous'
    else:
        duration_label = f'{duration} seconds'

    banner_lines = (
        f"üöÄ Starting live packet capture on interface: {interface}",
        f"‚è±Ô∏è  Duration: {duration_label}",
        "\nüìù Analysis Configuration:",
        "- Capturing: HTTP traffic",
        "- Analysis: Real-time LLM-powered inspection",
        "- Detection: Sensitive data and security risks",
        "- Logging: High-risk events to SQLite",
        "\n‚ö° Live Capture Feed:",
    )
    for banner_line in banner_lines:
        print(banner_line)

    try:
        await packet_analyzer.start_capture(duration=duration)
    except KeyboardInterrupt:
        print("\n\n‚õî Capture stopped by user")
    except Exception as capture_error:
        print(f"\n‚ùå Error during capture: {str(capture_error)}")
    finally:
        # Statistics are reported whether the capture ended normally or not.
        print("\nüìä Final Statistics:")
        packet_analyzer.print_statistics()

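# To start capturing from a notebook cell (Jupyter already runs an event loop,
# so use top-level await there):
# await start_live_capture(duration=60)  # Capture for 60 seconds
# or, from a standalone script:
# asyncio.run(start_live_capture())  # Capture continuously until Ctrl+C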

# Live HTTP Packet Analysis

This section implements real-time HTTP packet capture and analysis. The system will:
1. Capture live HTTP traffic on the specified network interface
2. Extract and analyze packet payloads
3. Detect sensitive information in real-time
4. Log suspicious activities to the database

Important Notes:
- Requires administrator/root privileges for packet capture
- Wireshark/TShark must be installed
- Use responsibly and only on networks you own/have permission to monitor

In [73]:
# Example usage
async def run_enhanced_capture(duration=60):
    """Capture for ``duration`` seconds, then report statistics and the last
    five packets that the heuristic analyzer did not classify as "allow"."""
    enhanced = EnhancedPacketAnalyzer()

    try:
        # Start capture
        print(f"Starting enhanced packet capture for {duration} seconds...")
        await enhanced.start_capture(duration=duration)

    except Exception as capture_error:
        print(f"Error during capture: {str(capture_error)}")
    finally:
        # Print statistics
        enhanced.print_statistics()

        # Re-check every captured packet that has a body with analyze_payload
        flagged = []
        for candidate in enhanced.packets:
            if not candidate.body:
                continue
            verdict = analyze_payload({
                "body": candidate.body,
                "headers": candidate.headers
            })
            if verdict["decision"] != "allow":
                flagged.append(candidate)

        # Print detailed info for last 5 suspicious packets
        if flagged:
            print("\nüîç Last Suspicious Packets Details:")
            for flagged_packet in flagged[-5:]:
                print("\nPacket Details:")
                pprint(flagged_packet.to_dict())
                print("-" * 50)

# To run the capture from a notebook cell:
# await run_enhanced_capture()
# or, from a standalone script:
# asyncio.run(run_enhanced_capture())

In [74]:
class EnhancedPacketAnalyzer:
    """Capture HTTP packets with pyshark and run each one through the LLM analyzer.

    Attributes:
        interface: Interface name handed to ``pyshark.LiveCapture``.
        capture: The ``LiveCapture`` object, or ``None`` before capturing starts.
        packets: Every ``HTTPPacket`` built so far. The list is never trimmed,
            so it grows for as long as a continuous capture runs.
        stats: Running counters printed by ``print_statistics``.
    """

    def __init__(self, interface="any"):
        self.interface = interface
        self.capture = None
        self.packets = []
        self.stats = {
            "total_packets": 0,
            "http_packets": 0,
            "get_requests": 0,
            "post_requests": 0,
            "high_risk_packets": 0,
            "sensitive_data_detected": 0
        }

    async def start_capture(self, duration=None):
        """Start capturing packets with optional duration.

        Args:
            duration: Seconds to capture for; ``None`` (or 0) captures until
                interrupted.
        """
        # Create capture with custom display filter for HTTP
        self.capture = pyshark.LiveCapture(
            interface=self.interface,
            display_filter='http'  # Focus on HTTP traffic
        )

        print(f"Starting packet capture on interface: {self.interface}")
        print(f"Duration: {'‚àû' if duration is None else f'{duration}s'}")

        try:
            if duration:
                # sniff() returns once the timeout has elapsed; the packets it
                # collected are then read back by iterating the capture object.
                self.capture.sniff(timeout=duration)
                packet_source = self.capture
            else:
                # sniff_continuously() only hands back an iterator; it has to
                # be iterated to deliver packets, so loop over it directly
                # rather than calling it and discarding the result.
                packet_source = self.capture.sniff_continuously()

            for packet in packet_source:
                await self.process_packet(packet)

        except KeyboardInterrupt:
            print("\n‚õî Stopping packet capture...")
        finally:
            if self.capture:
                self.capture.close()

    async def process_packet(self, packet):
        """Process and analyze a single packet.

        Builds an ``HTTPPacket``, updates the counters and, when the packet
        has a body or headers, asks ``analyze_with_llm`` for a verdict. Errors
        are printed and swallowed so one bad packet does not end the capture.
        """
        try:
            # Create HTTPPacket object
            http_packet = HTTPPacket(packet)
            self.packets.append(http_packet)

            # Update basic statistics
            self.stats["total_packets"] += 1
            if http_packet.http:
                self.stats["http_packets"] += 1
                if http_packet.method == "GET":
                    self.stats["get_requests"] += 1
                elif http_packet.method == "POST":
                    self.stats["post_requests"] += 1

            # Perform LLM analysis
            if http_packet.body or http_packet.headers:
                llm_analysis = await analyze_with_llm(http_packet)
                if llm_analysis:
                    # .get(): a reply that omits a field must not abort the
                    # handling of this packet with a KeyError.
                    if llm_analysis.get("risk_level") in ("high", "critical"):
                        self.stats["high_risk_packets"] += 1
                    if llm_analysis.get("sensitive_data_detected"):
                        self.stats["sensitive_data_detected"] += 1
                    await self.handle_analysis_results(http_packet, llm_analysis)

        except Exception as e:
            print(f"Error processing packet: {str(e)}")

    # The annotation is a string because HTTPPacket is defined in a later cell;
    # a bare name would be evaluated (and fail) while this class is defined.
    async def handle_analysis_results(self, packet: "HTTPPacket", analysis: dict):
        """Print an alert and store an event for high/critical risk results.

        Results with any other risk level are ignored.
        """
        risk_level = analysis.get("risk_level")
        if risk_level not in ("high", "critical"):
            return

        severity = analysis.get("severity")
        findings = analysis.get("findings") or []
        recommendations = analysis.get("recommendations") or []

        print("\nüö® High Risk Packet Detected!")
        print("=" * 60)
        print(f"Time: {packet.metadata.timestamp}")
        print(f"Risk Level: {risk_level.upper()}")
        print(f"Severity Score: {severity}/10")
        print(f"\nSource: {packet.metadata.src_ip}:{packet.metadata.src_port}")
        print(f"Destination: {packet.metadata.dst_ip}:{packet.metadata.dst_port}")
        print(f"Method: {packet.method}")
        print(f"URI: {packet.uri}")
        print("\nFindings:")
        for finding in findings:
            print(f"‚Ä¢ {finding}")
        print("\nRecommendations:")
        for rec in recommendations:
            print(f"‚Ä¢ {rec}")
        print("=" * 60)

        # Store event in database
        event = {
            "ts": packet.metadata.timestamp.timestamp(),
            "id": str(uuid.uuid4()),
            "host": packet.metadata.dst_ip,
            "path": packet.uri,
            "direction": "request" if packet.method else "response",
            "severity": severity,
            # store_event() JSON-encodes the tags itself; encoding them here as
            # well would store a doubly encoded string.
            "tags": findings,
            "decision": risk_level,
            "reason": analysis.get("explanation", "")
        }
        store_event(event)

    def print_statistics(self):
        """Print capture statistics"""
        print("\nüìä Capture Statistics")
        print("=" * 40)
        print(f"Total Packets: {self.stats['total_packets']}")
        print(f"HTTP Packets: {self.stats['http_packets']}")
        print(f"GET Requests: {self.stats['get_requests']}")
        print(f"POST Requests: {self.stats['post_requests']}")
        print(f"High Risk Packets: {self.stats['high_risk_packets']}")
        print(f"Sensitive Data Detected: {self.stats['sensitive_data_detected']}")
        print("=" * 40)

In [75]:
from typing import Dict, Any
import pyshark
import asyncio
from datetime import datetime
import json
from pprint import pprint

class PacketMetadata:
    """Addressing and timing details extracted from one captured packet.

    Attributes:
        timestamp: ``datetime`` built from ``packet.sniff_timestamp``.
        src_ip / dst_ip: Addresses from the ``ip`` layer, falling back to the
            ``ipv6`` layer; ``None`` when the packet has neither.
        protocol: ``packet.highest_layer``.
        length: ``packet.length``, stored as received.
        src_port / dst_port: Ports from the transport layer; ``None`` when the
            packet reports no transport layer.
    """

    def __init__(self, packet):
        self.timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))

        # IPv6 packets carry their addresses in an "ipv6" layer instead of "ip".
        ip_layer = getattr(packet, 'ip', None)
        if ip_layer is None:
            ip_layer = getattr(packet, 'ipv6', None)
        self.src_ip = ip_layer.src if ip_layer is not None else None
        self.dst_ip = ip_layer.dst if ip_layer is not None else None

        self.protocol = packet.highest_layer
        self.length = packet.length

        # The attribute can exist and still be None; checking only for its
        # presence would go on to index the packet with None.
        transport = getattr(packet, 'transport_layer', None)
        if transport:
            self.src_port = packet[transport].srcport
            self.dst_port = packet[transport].dstport
        else:
            self.src_port = None
            self.dst_port = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the metadata as a plain dict; the timestamp is ISO-formatted."""
        return {
            "timestamp": self.timestamp.isoformat(),
            "src_ip": self.src_ip,
            "dst_ip": self.dst_ip,
            "protocol": self.protocol,
            "length": self.length,
            "src_port": self.src_port,
            "dst_port": self.dst_port
        }

class HTTPPacket:
    """Plain-Python view of the HTTP layer of one captured packet.

    ``headers`` collects every attribute of the HTTP layer whose name starts
    with ``request_`` or ``response_`` (with those substrings removed from the
    key); ``method``, ``uri``, ``response_code`` and ``body`` stay ``None``
    when the layer does not provide them.
    """

    def __init__(self, packet):
        self.metadata = PacketMetadata(packet)
        self.http = getattr(packet, 'http', None)
        self.headers = {}
        self.body = None
        self.method = None
        self.uri = None
        self.response_code = None
        self._parse_http_layer(packet)

    def _parse_http_layer(self, packet):
        """Fill headers, method, uri, response_code and body from ``packet.http``."""
        if self.http:
            layer = packet.http
            wanted_prefixes = ('request_', 'response_')

            # Collect the request_* / response_* fields as "headers"
            for attr_name in dir(layer):
                if not attr_name.startswith(wanted_prefixes):
                    continue
                key = attr_name.replace('request_', '').replace('response_', '')
                self.headers[key] = getattr(layer, attr_name)

            # Method, URI and response code default to None when absent
            self.method = getattr(layer, "request_method", None)
            self.uri = getattr(layer, "request_uri", None)
            self.response_code = getattr(layer, "response_code", None)

            # The body is only present when the layer exposes file_data
            if hasattr(layer, 'file_data'):
                self.body = layer.file_data

    def to_dict(self) -> Dict[str, Any]:
        """Return metadata and HTTP details as nested plain dicts."""
        http_info = {
            "method": self.method,
            "uri": self.uri,
            "response_code": self.response_code,
            "headers": self.headers,
            "body": self.body
        }
        return {"metadata": self.metadata.to_dict(), "http_info": http_info}

In [76]:
# Test packet capture
async def run_packet_capture(duration=30):
    """Run packet capture for specified duration (seconds).

    Starts ``PacketAnalyzer.start_capture`` as a background task, waits
    ``duration`` seconds, then closes the capture and cancels the task. The
    cancelled task is awaited so that its cleanup has finished, and any error
    it raised is reported, before this coroutine returns.

    NOTE(review): ``PacketAnalyzer.start_capture`` iterates the capture with a
    plain ``for`` loop inside a coroutine; confirm it yields to the event loop,
    otherwise the sleep below cannot fire while packets are being read.
    """
    analyzer = None
    capture_task = None

    try:
        analyzer = PacketAnalyzer()

        # Create task for packet capture
        capture_task = asyncio.create_task(analyzer.start_capture())

        # Run for specified duration
        print(f"Running packet capture for {duration} seconds...")
        await asyncio.sleep(duration)

    except asyncio.CancelledError:
        print("Packet capture stopped")
    except Exception as e:
        print(f"Error during packet capture: {str(e)}")
    finally:
        # Stop capture - done here so it also happens when the wait above is
        # interrupted or fails.
        if analyzer is not None and analyzer.capture:
            analyzer.capture.close()
        if capture_task is not None:
            capture_task.cancel()
            try:
                # Without this await the task is still pending on return and
                # an exception raised inside it is never retrieved.
                await capture_task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                print(f"Error during packet capture: {str(e)}")
# Run the capture (uncomment to test)
# asyncio.run(run_packet_capture())

In [77]:
import pyshark
import asyncio
from datetime import datetime

class PacketAnalyzer:
    """Capture HTTP/TLS packets and check each one with ``analyze_payload``.

    Attributes:
        interface: Interface name handed to ``pyshark.LiveCapture``.
        capture: The ``LiveCapture`` object, or ``None`` before capturing starts.
    """

    def __init__(self, interface="any"):
        self.interface = interface
        self.capture = None

    async def start_capture(self):
        """Start capturing packets on specified interface.

        Runs until interrupted; the capture is closed on the way out.
        """
        # Create capture with custom display filter
        self.capture = pyshark.LiveCapture(
            interface=self.interface,
            display_filter='http or tls'  # Only capture HTTP/HTTPS traffic
        )

        print(f"Starting packet capture on interface: {self.interface}")
        print("Analyzing packets for sensitive information...")

        try:
            # Process each packet as it arrives
            for packet in self.capture.sniff_continuously():
                await self.analyze_packet(packet)
        except KeyboardInterrupt:
            print("\nStopping packet capture...")
        finally:
            if self.capture:
                self.capture.close()

    async def analyze_packet(self, packet):
        """Analyze a single packet for sensitive information.

        Builds a payload dict from the packet, runs ``analyze_payload`` on it
        and, unless the decision is "allow", stores an event and prints an
        alert. Errors are printed and swallowed so one bad packet does not
        stop the capture.
        """
        try:
            # Extract basic packet info
            timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))

            # IPv6 packets have an "ipv6" layer rather than "ip"; reading
            # packet.ip unconditionally would raise and drop the packet.
            ip_layer = getattr(packet, 'ip', None)
            if ip_layer is None:
                ip_layer = getattr(packet, 'ipv6', None)
            dst_addr = ip_layer.dst if ip_layer is not None else None

            # Initialize payload data
            payload = {
                "id": str(uuid.uuid4()),
                "ts": timestamp.timestamp(),
                "direction": "request",
                "host": dst_addr,
                "path": "",
                "method": "",
                "headers": {},
                "body": "",
                "source": "packet_capture"
            }

            # Extract HTTP-specific information if available
            http_layer = getattr(packet, 'http', None)
            if http_layer is not None:
                payload["method"] = getattr(http_layer, "request_method", "")
                payload["path"] = getattr(http_layer, "request_uri", "")

                # An HTTP packet without a request method is treated as a
                # response, matching how EnhancedPacketAnalyzer labels events.
                if not payload["method"]:
                    payload["direction"] = "response"

                # Extract headers
                for field in dir(http_layer):
                    if field.startswith(('request_', 'response_')):
                        header_name = field.replace('request_', '').replace('response_', '')
                        payload["headers"][header_name] = getattr(http_layer, field)

                # Get HTTP payload if available
                if hasattr(http_layer, 'file_data'):
                    payload["body"] = http_layer.file_data

            # Analyze the payload using our existing analyzer
            result = analyze_payload(payload)

            # Log if suspicious
            if result["decision"] != "allow":
                event = {
                    "ts": payload["ts"],
                    "id": payload["id"],
                    "host": payload["host"],
                    "path": payload["path"],
                    "direction": payload["direction"],
                    "severity": result["severity"],
                    "tags": result["tags"],
                    "decision": result["decision"],
                    "reason": result["reason"]
                }
                store_event(event)
                print(f"\nAlert - {result['decision'].upper()}: {result['reason']}")
                print(f"Host: {payload['host']}")
                print(f"Path: {payload['path']}")
                print(f"Severity: {result['severity']}")
                print("-" * 80)

        except Exception as e:
            print(f"Error analyzing packet: {str(e)}")

# Network Packet Capture and Analysis

Let's implement real-time packet capture and analysis using pyshark. This will allow us to:
1. Capture HTTP/HTTPS packets
2. Extract payload data
3. Analyze packets for sensitive information
4. Log suspicious packets to our database

In [80]:
# Run the complete HTTP packet analyzer
# This cell only performs setup (database + API key check) and prints how to
# start a capture; it does not start capturing itself.
print("üîÑ Initializing HTTP Packet Analyzer...")

# Initialize database
print("üì¶ Setting up database...")
init_db()

# Configure environment
print("üîë Loading configuration...")
load_dotenv()
# Without an API key the LLM is left unconfigured and only a hint is printed.
if not os.getenv("GOOGLE_API_KEY"):
    print("‚ùå GOOGLE_API_KEY not found in .env file")
    print("Please create a .env file with: GOOGLE_API_KEY=your-api-key")
else:
    # Configure Gemini
    print("ü§ñ Initializing LLM...")
    genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
    
    print("\n‚úÖ Setup complete!")
    print("üìù To start packet capture, run:")
    print("   await start_live_capture()")
    print("   or")
    print("   asyncio.run(start_live_capture())")
    print("\n‚ÑπÔ∏è  Make sure to run with administrator privileges!")
    print("‚ÑπÔ∏è  Wireshark/TShark must be installed for packet capture to work")

üîÑ Initializing HTTP Packet Analyzer...
üì¶ Setting up database...
üîë Loading configuration...
ü§ñ Initializing LLM...

‚úÖ Setup complete!
üìù To start packet capture, run:
   await start_live_capture()
   or
   asyncio.run(start_live_capture())

‚ÑπÔ∏è  Make sure to run with administrator privileges!
‚ÑπÔ∏è  Wireshark/TShark must be installed for packet capture to work


# Running with Administrator Privileges

Since packet capture requires admin privileges, here are your options:

## Option 1: Run Jupyter as Administrator
1. **Close this notebook**
2. **Right-click on Command Prompt** → "Run as administrator" 
3. **Navigate to your project folder**: `cd C:\Users\Shiva\Code\Hack36`
4. **Start Jupyter**: `jupyter notebook` or `jupyter lab`
5. **Reopen this notebook**

## Option 2: Use a Python Script Instead
Save the code to a `.py` file and run it as admin:

```bash
# Right-click Command Prompt → Run as administrator
cd C:\Users\Shiva\Code\Hack36
python packet_analyzer.py
```

## Option 3: Test Without Real Packets
For development/testing, you can simulate packet analysis without actual capture.

In [81]:
# Convert notebook to Python script for admin execution
import subprocess
import os

def convert_notebook_to_script(notebook_path='initial_testing.ipynb',
                               output_name='packet_analyzer'):
    """Convert a notebook to a standalone Python script with ``jupyter nbconvert``.

    Args:
        notebook_path: Notebook file to convert. Defaults to the file name the
            original code had hard-coded.
        output_name: Value passed to nbconvert's ``--output`` option; the
            printed instructions refer to ``<output_name>.py``.

    Returns:
        bool: ``True`` when nbconvert exited with status 0, ``False`` when it
        failed, when the ``jupyter`` command was not found, or on any other
        error. Every outcome is also reported with ``print``.
    """
    try:
        # Convert notebook to Python script
        result = subprocess.run([
            'jupyter', 'nbconvert', 
            '--to', 'python', 
            '--output', output_name,
            notebook_path
        ], capture_output=True, text=True, cwd=os.getcwd())

        if result.returncode == 0:
            print(f"‚úÖ Successfully converted notebook to {output_name}.py")
            print("\nüìù To run with admin privileges:")
            print("1. Right-click Command Prompt ‚Üí 'Run as administrator'")
            # Show the directory the conversion actually ran in instead of one
            # developer's absolute path.
            print(f"2. cd {os.getcwd()}")
            print(f"3. python {output_name}.py")
            print("\nüí° The script will automatically run the packet capture!")
            return True

        print("‚ùå Conversion failed:")
        print(result.stderr)
        return False

    except FileNotFoundError:
        print("‚ùå jupyter command not found. Make sure Jupyter is installed.")
        return False
    except Exception as e:
        print(f"‚ùå Error: {str(e)}")
        return False

# Run the conversion now; the outcome is reported through the printed messages
convert_notebook_to_script()

‚úÖ Successfully converted notebook to packet_analyzer.py

üìù To run with admin privileges:
1. Right-click Command Prompt ‚Üí 'Run as administrator'
2. cd C:\Users\Shiva\Code\Hack36
3. python packet_analyzer.py

üí° The script will automatically run the packet capture!
