# Setup and Imports

In [3]:
# Install required packages
!pip install -q torch transformers scikit-learn seaborn matplotlib nltk tqdm gitpython gdown

# Import libraries
import nltk
nltk.download('punkt', quiet=True)
nltk.download('wordnet', quiet=True)
import os, random, datetime, json, re, hashlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import nltk
from nltk.corpus import wordnet
from tqdm.notebook import tqdm
from torch.utils.data import DataLoader
import torch.nn.functional as F
# SK-learn & stats
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, roc_auc_score, confusion_matrix)
import scipy.stats as stats

# Transformers
from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
                          TrainingArguments, Trainer)

# Device & seeds
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Using device:", device)

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if device.type=='cuda':
        torch.cuda.manual_seed_all(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

set_seed(42)

# Make sure results dirs exist
for d in ['data/raw','data/processed','logs','models','results']:
    os.makedirs(d, exist_ok=True)

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m30.6 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m127.9/127.9 MB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.5/207.5 MB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m21.1/21.1 MB[0m [31m82.6 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25h

2025-05-15 08:26:39.747331: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1747297599.980936      35 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1747297600.044721      35 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


Using device: cuda


# Data Pipeline Functions

In [4]:
# Function to download and extract datasets
def download_datasets():
    """Download and extract real-world phishing and legitimate email corpora"""
    
    # Enron email dataset (legitimate emails)
    print("Downloading Enron dataset...")
    enron_url = f'https://www.cs.cmu.edu/~./enron/enron_mail_20150507.tar.gz'
    !wget -P data/raw {enron_url} -q --show-progress
    !tar -xzf data/raw/enron_mail_20150507.tar.gz -C data/raw
    
    # # Nazario phishing corpus
    # print("Downloading Nazario phishing corpus...")
    # nazario_url = f"https://monkey.org/%7Ejose/phishing/phishing-corpus-2023.zip"
    # !wget -P data/raw {nazario_url} -q --show-progress
    # !unzip -q data/raw/phishing-corpus-2023.zip -d data/raw/phishing
    
    # Download PhishTank dataset (URLs and metadata only)
    print("Downloading PhishTank dataset (for URL analysis only)...")
    phishtank_url = f'https://data.phishtank.com/data/online-valid.csv'
    !wget -P data/raw {phishtank_url} -q --show-progress
    
    # Process PhishTank URLs into template-based emails
    print("Converting PhishTank URLs to synthetic phishing emails...")
    process_phishtank_to_emails()
    
    # Log dataset metadata
    with open('data/dataset_metadata.json', 'w') as f:
        metadata = {
            'download_date': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'enron_source': enron_url,
            'phishtank_source': phishtank_url
        }
        json.dump(metadata, f, indent=2)
    
    print("All datasets downloaded successfully!")

def process_phishtank_to_emails(output_dir='data/raw/phishing/phishtank'):
    """Convert PhishTank URLs into synthetic phishing emails"""
    os.makedirs(output_dir, exist_ok=True)
    
    # Check if PhishTank CSV exists
    phishtank_csv = 'data/raw/online-valid.csv'
    if not os.path.exists(phishtank_csv):
        print("PhishTank CSV not found, skipping URL processing")
        return
    
    try:
        # Load PhishTank data
        phishtank_df = pd.read_csv(phishtank_csv)
        print(f"Loaded {len(phishtank_df)} PhishTank URLs")
        
        # Define email templates to use with the PhishTank URLs
        templates = [
            "Dear Customer,\n\nWe have detected suspicious activity on your account. Please verify your identity by clicking on the following link:\n{url}\n\nThis is urgent and requires your immediate attention.\n\nThank you,\nSecurity Team",
            "URGENT: Your account has been temporarily locked\n\nWe've noticed unusual login attempts on your account. To restore access, please verify your identity at:\n{url}\n\nIf you do not verify within 24 hours, your account will be permanently suspended.\n\nSecurity Department",
            "Security Alert: Please verify your information\n\nWe need to verify your account information to continue providing our services. Click here to update your details:\n{url}\n\nThis is a mandatory security measure.\n\nCustomer Support",
            "Your payment information needs to be updated\n\nWe were unable to process your recent payment. Please update your billing information as soon as possible:\n{url}\n\nYour service will be interrupted if this is not resolved.\n\nBilling Department",   
            "Important notification regarding your account\n\nThere is an important message regarding your account that requires your attention. Please review it at:\n{url}\n\nThis is time-sensitive information.\n\nAccount Services"
        ]
        
        num_urls = min(500, len(phishtank_df))
        sampled_urls = phishtank_df.sample(num_urls, random_state=42)
        
        # Create synthetic emails using PhishTank URLs
        for i, row in enumerate(sampled_urls.itertuples()):
            # Get the phishing URL
            phish_url = row.url if hasattr(row, 'url') else row[1]  # Adapt to column name
            
            template = random.choice(templates)
            
            email_content = template.format(url=phish_url)
            
            # Save to file
            filename = f"{output_dir}/phishtank_{i:04d}.txt"
            with open(filename, 'w') as f:
                f.write(email_content)
                
        print(f"Created {num_urls} synthetic phishing emails from PhishTank URLs")
        
    except Exception as e:
        print(f"Error processing PhishTank data: {e}")
        print("Continuing with other data sources...")

# Function to preprocess emails
def preprocess_emails():
    """Preprocess raw emails: strip headers, normalize URLs, remove duplicates"""
    
    def extract_email_body(email_text):
        """Extract body from email by removing headers"""
        if not email_text:
            return ""
            
        lines = email_text.split('\n')
        body_start = 0
        
        # Find where headers end and body begins
        for i, line in enumerate(lines):
            if line.strip() == '':
                body_start = i + 1
                break
        
        body = '\n'.join(lines[body_start:])
        return body
    
    def normalize_urls(text):
        """Normalize URLs in text to avoid detection based on specific URLs"""
        if not text:
            return ""
            
        # Simple URL normalization - replace actual domains with placeholders
        url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
        return re.sub(url_pattern, '[URL]', text)
    
    # Process legitimate emails (Enron)
    print("Processing legitimate emails...")
    legitimate_emails = []
    legitimate_dir = 'data/raw/maildir'
    
    # Sample a subset of Enron folders to keep dataset size manageable
    user_folders = os.listdir(legitimate_dir)
    sampled_users = random.sample(user_folders, min(20, len(user_folders)))
    
    for user in tqdm(sampled_users):
        user_path = os.path.join(legitimate_dir, user)
        if not os.path.isdir(user_path):
            continue
            
        # Walk through user's email folders
        for root, _, files in os.walk(user_path):
            for file in files:
                if len(legitimate_emails) >= 5000:  # Limit the number of legitimate emails
                    break
                    
                try:
                    with open(os.path.join(root, file), 'r', encoding='latin1') as f:
                        email_text = f.read()
                    
                    # Extract body and normalize
                    body = extract_email_body(email_text)
                    body = normalize_urls(body)
                    
                    if len(body.strip()) > 100:  # Filter out very short emails
                        legitimate_emails.append({'text': body, 'label': 0})
                except Exception as e:
                    continue  # Skip problematic files
    
    # # Process phishing emails (Nazario)
    # print("Processing phishing emails...")
    phishing_emails = []
    phishing_dir = 'data/raw/phishing'
    
    # Walk through phishing corpus folders
    for root, _, files in os.walk(phishing_dir):
        for file in files:
            if file.endswith('.txt') or file.endswith('.eml'):
                try:
                    with open(os.path.join(root, file), 'r', encoding='latin1') as f:
                        email_text = f.read()
                    
                    # Extract body and normalize
                    body = extract_email_body(email_text)
                    body = normalize_urls(body)
                    
                    if len(body.strip()) > 100:  # Filter out very short emails
                        phishing_emails.append({'text': body, 'label': 1})
                except Exception as e:
                    continue  # Skip problematic files
    
    # Remove duplicates
    print("Removing duplicates...")
    
    def get_email_hash(email):
        """Create a hash of email text to identify duplicates"""
        return hashlib.md5(email['text'].encode()).hexdigest()
    
    unique_emails = {}
    
    for email in legitimate_emails + phishing_emails:
        email_hash = get_email_hash(email)
        if email_hash not in unique_emails:
            unique_emails[email_hash] = email
    
    # Convert to dataframe
    df = pd.DataFrame(list(unique_emails.values()))
    
    # Balance dataset if needed
    min_class_count = min(sum(df['label'] == 0), sum(df['label'] == 1))
    legitimate_df = df[df['label'] == 0].sample(min_class_count, random_state=42)
    phishing_df = df[df['label'] == 1].sample(min_class_count, random_state=42)
    df_balanced = pd.concat([legitimate_df, phishing_df]).sample(frac=1, random_state=42).reset_index(drop=True)
    
    # Generate additional synthetic phishing with GPT-2
    if len(phishing_df) < 1500:
        gpt_samples_needed = 1500 - len(phishing_df)
        gpt_phishing = generate_gpt2_phishing(gpt_samples_needed)
        gpt_df = pd.DataFrame(gpt_phishing)
        df_balanced = pd.concat([df_balanced, gpt_df]).sample(frac=1, random_state=42).reset_index(drop=True)
    
    # Split into train/val/test
    train_df, temp_df = train_test_split(df_balanced, test_size=0.3, stratify=df_balanced['label'], random_state=42)
    val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42)
    
    # Save processed datasets
    train_df.to_csv('data/processed/train.csv', index=False)
    val_df.to_csv('data/processed/val.csv', index=False)
    test_df.to_csv('data/processed/test.csv', index=False)
    
    print(f"Dataset processed and split successfully!")
    print(f"Train: {len(train_df)} samples")
    print(f"Validation: {len(val_df)} samples")
    print(f"Test: {len(test_df)} samples")
    
    return train_df, val_df, test_df

# Generate synthetic phishing emails with GPT-2
def generate_gpt2_phishing(count=1500):
    """Generate synthetic phishing emails using fine-tuned GPT-2"""
    try:
        from transformers import GPT2LMHeadModel, GPT2Tokenizer
        
        print(f"Generating {count} synthetic phishing emails with GPT-2...")
        
        # Load pre-trained GPT-2 model and tokenizer
        tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        model = GPT2LMHeadModel.from_pretrained('gpt2')
        model.to(device)
        
        # Set seed for reproducibility
        set_seed(42)
        
        # Phishing email prompts to guide generation
        prompts = [
            "Dear Customer, We have detected suspicious activity on your account.",
            "URGENT: Your account has been temporarily suspended due to",
            "Security Alert: Your password needs to be reset immediately.",
            "Attention: Please verify your information to avoid service interruption.",
            "Important update regarding your account security:"
        ]
        
        synthetic_phishing = []
        
        for i in range(count):
            prompt = random.choice(prompts)
            
            # Encode prompt
            input_ids = tokenizer.encode(prompt, return_tensors='pt').to(device)
            
            # Generate text
            output = model.generate(
                input_ids,
                max_length=150,
                temperature=0.9,
                top_p=0.92,
                top_k=50,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )
            
            # Decode and clean text
            text = tokenizer.decode(output[0], skip_special_tokens=True)
            
            # Add to collection
            synthetic_phishing.append({
                "text": text,
                "label": 1,  # 1 indicates phishing
                "synthetic": True
            })
            
            # Show progress
            if (i + 1) % 100 == 0:
                print(f"Generated {i + 1}/{count} synthetic emails")
        
        return synthetic_phishing
    except Exception as e:
        print(f"Error generating GPT-2 emails: {e}")
        # Fall back to template generation
        template_gen = TemplateGenerator()
        return template_gen.generate(count)

def download_preprocessed_data(n_samples=1000, download_nltk=True):
    """Generate synthetic dataset for demonstration"""
    print("Creating synthetic dataset for demonstration...")
    
    # Download NLTK resources for text generation if needed
    if download_nltk:
        try:
            nltk.download('punkt', quiet=True)
            nltk.download('wordnet', quiet=True)
            try:
                nltk.download('omw-1.4', quiet=True)
            except:
                pass  # Optional resource, ok if it fails
        except Exception as e:
            print(f"Warning: Could not download NLTK resources: {e}")
            print("Continuing with generation anyway...")
    
    # Create synthetic legitimate emails
    legitimate_templates = [
        "Dear {name}, I hope this email finds you well. Just wanted to follow up on our meeting last week. Let's schedule a call to discuss the project progress. Best regards, {sender}",
        "Hello {name}, Please find attached the quarterly report you requested. Let me know if you need anything else. Thanks, {sender}",
        "Hi team, Reminder about our weekly standup tomorrow at 10AM. Please update your progress on the Jira board. Thanks, {sender}",
        "Good morning {name}, Just checking in regarding the proposal we submitted last month. Do you have any feedback for us? Best, {sender}",
        "Dear colleagues, Please note that the office will be closed next Monday for the holiday. All deadlines remain unchanged. Regards, HR"
    ]
    
    # Create synthetic phishing emails
    phishing_templates = [
        "URGENT: Your account has been compromised. Click here to reset your password immediately: {url}",
        "Dear valued customer, We've noticed suspicious activity on your account. Please verify your identity by clicking this link: {url}",
        "Congratulations! You've won a $1000 Amazon gift card. Claim now at: {url}",
        "Your payment was declined. Update your billing information here: {url} to avoid service interruption.",
        "Security alert: Unusual login detected. If this wasn't you, secure your account immediately: {url}"
    ]
    
    names = ["John", "Mary", "Robert", "Patricia", "Michael", "Jennifer", "William", "Linda", "David", "Elizabeth"]
    senders = ["Mark", "Sarah", "Alex", "Rachel", "Tom", "Emma", "James", "Sophie", "Daniel", "Olivia"]
    urls = ["http://secure-login.com/verify", "http://account-verification.net/secure", 
            "http://amazon-rewards.com/claim", "http://banking-update.com/renew",
            "http://security-check.org/protect"]
    
    def generate_emails(templates, count, label, is_phishing=False):
        """Generate synthetic emails from templates"""
        emails = []
        for _ in range(count):
            template = random.choice(templates)
            if is_phishing:
                email = template.format(
                    name=random.choice(names), 
                    sender=random.choice(senders), 
                    url=random.choice(urls)
                )
            else:
                email = template.format(
                    name=random.choice(names), 
                    sender=random.choice(senders)
                )
            emails.append({"text": email, "label": label})
        return emails
    
    # Generate balanced dataset with controlled randomness
    set_seed(42) 
    legitimate_emails = generate_emails(legitimate_templates, n_samples, 0)
    phishing_emails = generate_emails(phishing_templates, n_samples, 1, is_phishing=True)
    
    all_emails = legitimate_emails + phishing_emails
    random.shuffle(all_emails)
    
    df = pd.DataFrame(all_emails)
    
    # Split into train/val/test with fixed random state
    train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['label'], random_state=42)
    val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42)
    
    # Save processed datasets
    os.makedirs('data/processed', exist_ok=True)
    train_df.to_csv('data/processed/train.csv', index=False)
    val_df.to_csv('data/processed/val.csv', index=False)
    test_df.to_csv('data/processed/test.csv', index=False)
    
    print(f"Synthetic dataset created and split successfully!")
    print(f"Train: {len(train_df)} samples, Legitimate: {sum(train_df['label'] == 0)}, Phishing: {sum(train_df['label'] == 1)}")
    print(f"Validation: {len(val_df)} samples, Legitimate: {sum(val_df['label'] == 0)}, Phishing: {sum(val_df['label'] == 1)}")
    print(f"Test: {len(test_df)} samples, Legitimate: {sum(test_df['label'] == 0)}, Phishing: {sum(test_df['label'] == 1)}")
    
    return train_df, val_df, test_df


# Main data loading function
def load_real_data():
    """Load real data or process it if needed"""
    # Check if processed data already exists
    if os.path.exists('data/processed/train.csv') and \
       os.path.exists('data/processed/val.csv') and \
       os.path.exists('data/processed/test.csv'):
        # Load existing processed data
        print("Loading existing processed datasets...")
        train_df = pd.read_csv('data/processed/train.csv')
        val_df = pd.read_csv('data/processed/val.csv')
        test_df = pd.read_csv('data/processed/test.csv')
    else:
        # Download and process data
        print("Downloading and processing datasets...")
        try:
            if not os.path.exists('data/raw/maildir'):
                download_datasets()
            train_df, val_df, test_df = preprocess_emails()
        except Exception as e:
            print(f"Error processing real data: {e}")
            print("Falling back to synthetic data...")
            train_df, val_df, test_df = download_preprocessed_data(2000)
    
    print(f"Training set: {len(train_df)} emails ({sum(train_df['label'] == 1)} phishing, {sum(train_df['label'] == 0)} legitimate)")
    print(f"Validation set: {len(val_df)} emails ({sum(val_df['label'] == 1)} phishing, {sum(val_df['label'] == 0)} legitimate)")
    print(f"Test set: {len(test_df)} emails ({sum(test_df['label'] == 1)} phishing, {sum(test_df['label'] == 0)} legitimate)")
    
    return train_df, val_df, test_df

# Template Generator & Perturbation Engine

In [5]:
class TemplateGenerator:
    """Generate phishing emails from templates"""
    
    def __init__(self):
        self.templates = [
            # Amazon templates
            {
                "subject": "Amazon: Action Required - Update Payment Information",
                "body": "Dear Amazon Customer,\n\nWe need you to update your payment information for your recent purchase. "
                        "If you do not update within 24 hours, your order will be canceled.\n\n"
                        "Please click here to update: {url}\n\n"
                        "Amazon Customer Service"
            },
            {
                "subject": "Your Amazon order has shipped",
                "body": "Hello,\n\nYour order #{order_num} has shipped and will be delivered on {date}.\n\n"
                        "However, we noticed an issue with your payment method. Please verify your information: {url}\n\n"
                        "Amazon Shipping Team"
            },
            
            # Banking templates
            {
                "subject": "Important: Your account access has been limited",
                "body": "Dear {bank} Customer,\n\nWe have temporarily limited access to your account due to "
                        "failed login attempts. To restore full access, please verify your identity: {url}\n\n"
                        "Security Department, {bank}"
            },
            {
                "subject": "Security Alert: Unusual Activity Detected",
                "body": "Dear {name},\n\nWe detected unusual activity on your {bank} account on {date}. "
                        "If this was not you, please secure your account immediately: {url}\n\n"
                        "Thank you,\n{bank} Fraud Prevention Team"
            },
            
            # HR/Corporate templates
            {
                "subject": "Urgent: Update your company credentials",
                "body": "Dear {name},\n\nDue to recent security upgrades, all employees are required to update "
                        "their login credentials by end of day. Click here to update: {url}\n\n"
                        "IT Department"
            },
            {
                "subject": "Important: New company policy document",
                "body": "All staff,\n\nA new company policy regarding remote work has been published. "
                        "All employees must read and acknowledge receipt by tomorrow.\n\n"
                        "Download the document here: {url}\n\n"
                        "Human Resources"
            },
            
            # Tax/Government templates
            {
                "subject": "IRS: Tax Refund Notification",
                "body": "Tax Refund Notice #{notice_num}\n\nDear Taxpayer,\n\nAfter the annual calculation of your fiscal activity, "
                        "we have determined that you are eligible for a refund of ${amount}.\n\n"
                        "Submit your refund request here: {url}\n\n"
                        "Internal Revenue Service"
            },
            {
                "subject": "Action Required: Government Stimulus Payment",
                "body": "NOTICE: Economic Impact Payment\n\nYou have qualified for a government stimulus payment of ${amount}. "
                        "To receive your payment, please confirm your information: {url}\n\n"
                        "Department of the Treasury"
            }
        ]
        
        self.names = ["John Smith", "Mary Johnson", "Robert Williams", "Patricia Brown", "Michael Davis"]
        self.banks = ["Chase", "Bank of America", "Wells Fargo", "Citibank", "Capital One"]
        self.dates = ["May 15, 2023", "June 22, 2023", "July 8, 2023", "August 30, 2023"]
        self.amounts = ["1,247.63", "958.29", "2,361.45", "785.12", "1,503.87"]
        self.order_nums = ["A23B7C", "X92Y14", "L67M39", "P45Q81", "R72S05"]
        self.notice_nums = ["CP-1234", "RF-5678", "TX-9012", "IR-3456", "GV-7890"]
        
        # Phishing URLs for templates
        self.urls = [
            "http://amazonn-secure.com/verify",
            "http://security-bankaccess.net/login",
            "http://company-portal.co/document",
            "http://tax-refund-secure.com/claim",
            "http://accountverify-secure.com/auth"
        ]
    
    def generate(self, n_samples=10, use_template_idx=None):
        """Generate phishing emails from templates"""
        generated_emails = []
        
        for _ in range(n_samples):
            if use_template_idx is not None and use_template_idx < len(self.templates):
                template = self.templates[use_template_idx]
            else:
                template = random.choice(self.templates)
            
            # Fill in template placeholders
            body = template["body"].format(
                name=random.choice(self.names),
                bank=random.choice(self.banks),
                date=random.choice(self.dates),
                amount=random.choice(self.amounts),
                order_num=random.choice(self.order_nums),
                notice_num=random.choice(self.notice_nums),
                url=random.choice(self.urls)
            )
            
            generated_emails.append({
                "text": body,
                "label": 1,  # indicates phishing
                "template_id": self.templates.index(template)
            })
        
        return generated_emails

class PerturbationEngine:
    """Apply various perturbation techniques to phishing emails to evade detection"""
    
    def __init__(self):
        # Download NLTK resources if not already available
        nltk.download('wordnet', quiet=True)
        nltk.download('punkt', quiet=True)
        
        # Character-level perturbations
        self.char_perturbations = [
            self._swap_chars,
            self._add_char,
            self._remove_char,
            self._similar_char_replacement
        ]
        
        # Word-level perturbations
        self.word_perturbations = [
            self._synonym_replacement,
            self._word_insertion,
            self._word_deletion
        ]
        
        # Style perturbations
        self.style_perturbations = [
            self._change_case,
            self._add_html_formatting,
            self._add_unicode_chars
        ]
        
        # URL hiding techniques
        self.url_perturbations = [
            self._hide_url_in_text,
            self._url_shortener_simulation,
            self._html_url_obfuscation
        ]
        
        # Similar-looking character mappings
        self.char_map = {
            'a': ['а', '@', '4'],  # Cyrillic 'а' looks like Latin 'a'
            'b': ['b', '6', 'б'],
            'c': ['с', '('],  # Cyrillic 'с' looks like Latin 'c'
            'e': ['е', '3'],  # Cyrillic 'е' looks like Latin 'e'
            'i': ['і', '1', '!'],
            'l': ['l', '1', '|'],
            'o': ['о', '0'],  # Cyrillic 'о' looks like Latin 'o'
            's': ['ѕ', '5', '$'],
            't': ['т', '+'],  # Cyrillic 'т' looks like Latin 't'
            'w': ['vv', 'ѡ'],
            'g': ['g', '9'],
            'r': ['r', 'г'],
            'n': ['n', 'п'],
            'm': ['m', 'м']
        }
    
    def perturb(self, email, technique=None, intensity=0.1):
        """Apply perturbations to an email"""
        email_text = email["text"]
        
        # Choose perturbation technique if not specified
        if technique is None:
            all_techniques = (self.char_perturbations + self.word_perturbations + 
                             self.style_perturbations + self.url_perturbations)
            technique = random.choice(all_techniques)
        
        # Apply selected perturbation
        perturbed_text = technique(email_text, intensity)
        
        # Create perturbed email
        perturbed_email = email.copy()
        perturbed_email["text"] = perturbed_text
        perturbed_email["perturbation"] = technique.__name__
        
        return perturbed_email
    
    def _swap_chars(self, text, intensity):
        """Swap adjacent characters in words"""
        words = text.split()
        new_words = []
        
        for word in words:
            if len(word) <= 1 or random.random() > intensity:
                new_words.append(word)
                continue
                
            # Choose a random position for swapping
            pos = random.randint(0, len(word) - 2)
            chars = list(word)
            chars[pos], chars[pos + 1] = chars[pos + 1], chars[pos]
            new_words.append(''.join(chars))
                
        return ' '.join(new_words)
    
    def _add_char(self, text, intensity):
        """Add extra characters to words"""
        words = text.split()
        new_words = []
        
        for word in words:
            if len(word) == 0 or random.random() > intensity:
                new_words.append(word)
                continue
                
            # Insert a random character at a random position
            pos = random.randint(0, len(word))
            char = random.choice('abcdefghijklmnopqrstuvwxyz')
            new_word = word[:pos] + char + word[pos:]
            new_words.append(new_word)
                
        return ' '.join(new_words)
    
    def _remove_char(self, text, intensity):
        """Remove characters from words"""
        words = text.split()
        new_words = []
        
        for word in words:
            if len(word) <= 1 or random.random() > intensity:
                new_words.append(word)
                continue
                
            # Remove a random character
            pos = random.randint(0, len(word) - 1)
            new_word = word[:pos] + word[pos + 1:]
            new_words.append(new_word)
                
        return ' '.join(new_words)
    
    def _similar_char_replacement(self, text, intensity):
        """Replace characters with similar-looking ones"""
        new_text = ""
        
        for char in text:
            lower_char = char.lower()
            if lower_char in self.char_map and random.random() < intensity:
                replacements = self.char_map[lower_char]
                new_char = random.choice(replacements)
                new_text += new_char
            else:
                new_text += char
                
        return new_text
    
    def _synonym_replacement(self, text, intensity):
        """Replace words with synonyms"""
        words = nltk.word_tokenize(text)
        new_words = []
        
        for word in words:
            if len(word) <= 3 or not word.isalpha() or random.random() > intensity:
                new_words.append(word)
                continue
            
            # Find synonyms using WordNet
            synonyms = []
            for syn in wordnet.synsets(word):
                for lemma in syn.lemmas():
                    if lemma.name() != word.lower():
                        synonyms.append(lemma.name())
            
            if len(synonyms) > 0:
                new_word = random.choice(synonyms).replace('_', ' ')
                new_words.append(new_word)
            else:
                new_words.append(word)
                
        return ' '.join(new_words)
    
    def _word_insertion(self, text, intensity):
        """Insert benign words into text"""
        benign_words = ["please", "kindly", "important", "notification", "information", 
                        "update", "confirm", "verify", "secure", "official"]
        
        words = text.split()
        new_words = []
        
        for word in words:
            new_words.append(word)
            if random.random() < intensity:
                new_words.append(random.choice(benign_words))
                
        return ' '.join(new_words)
    
    def _word_deletion(self, text, intensity):
        """Delete some words from text"""
        words = text.split()
        new_words = []
        
        for word in words:
            if random.random() > intensity:
                new_words.append(word)
                
        return ' '.join(new_words)
    
    def _change_case(self, text, intensity):
        """Change case of words or characters"""
        words = text.split()
        new_words = []
        
        for word in words:
            if random.random() < intensity:
                if random.random() < 0.5:
                    # Change to uppercase
                    new_word = word.upper()
                else:
                    # Randomize capitalization
                    new_word = ''.join([c.upper() if random.random() < 0.5 else c.lower() for c in word])
                new_words.append(new_word)
            else:
                new_words.append(word)
                
        return ' '.join(new_words)
    
    def _add_html_formatting(self, text, intensity):
        """Add simple HTML formatting to text"""
        if random.random() < intensity:
            # Add basic HTML tags
            html_tags = [
                (f"<div style='font-family: Arial;'>{text}</div>"),
                (f"<p>{text}</p>"),
                (f"<span style='color: #000000;'>{text}</span>"),
                (f"<div style='text-align: left;'>{text}</div>"),
                (f"<font face='Arial'>{text}</font>")
            ]
            return random.choice(html_tags)
        return text
    
    def _add_unicode_chars(self, text, intensity):
        """Add invisible or zero-width unicode characters"""
        # Zero-width characters
        zwc = ['\u200B', '\u200C', '\u200D', '\u200E', '\u200F', '\u2060', '\u2061', '\u2062', '\u2063', '\u2064']
        
        if random.random() < intensity:
            # Insert ZWCs at random positions
            chars = list(text)
            num_insertions = max(1, int(len(text) * intensity * 0.1))
            
            for _ in range(num_insertions):
                pos = random.randint(0, len(chars))
                chars.insert(pos, random.choice(zwc))
                
            return ''.join(chars)
        return text
    
    def _hide_url_in_text(self, text, intensity):
        """Replace URLs with text-based hyperlinks"""
        url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
        urls = re.findall(url_pattern, text)
        
        if not urls or random.random() > intensity:
            return text
        
        for url in urls:
            mask_texts = [
                "Click here to secure your account",
                "Verify your information here",
                "Access your account",
                "Login now",
                "Secure login portal"
            ]
            mask = random.choice(mask_texts)
            text = text.replace(url, mask)
            
        return text
    
    def _url_shortener_simulation(self, text, intensity):
        """Replace URLs with simulated shortened URLs"""
        url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
        urls = re.findall(url_pattern, text)
        
        if not urls or random.random() > intensity:
            return text
        
        for url in urls:
            shorteners = [
                "bit.ly/secure-login",
                "tinyurl.com/account-verify",
                "goo.gl/auth-portal",
                "t.co/secure-access",
                "is.gd/verify-now"
            ]
            shortened_url = random.choice(shorteners)
            text = text.replace(url, shortened_url)
            
        return text
    
    def _html_url_obfuscation(self, text, intensity):
        """Use HTML to obfuscate URLs"""
        url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
        urls = re.findall(url_pattern, text)
        
        if not urls or random.random() > intensity:
            return text
        
        for url in urls:
            obfuscation_methods = [
                f"<a href='{url}'>secure verification link</a>",
                f"<a href='{url}' style='color:blue; text-decoration:underline;'>click here</a>",
                f"<a href='{url}'><img src='secure-badge.png' alt='Secure Link'></a>"
            ]
            obfuscated_url = random.choice(obfuscation_methods)
            text = text.replace(url, obfuscated_url)
            
        return text

# RL Attacker Implementation

In [7]:
class RLAttacker:
    """Reinforcement learning-based attacker that learns to evade the defender"""
    
    def __init__(self, perturbation_engine=None):
        """Initialize RL attacker
        
        Args:
            perturbation_engine: Engine for applying text perturbations
        """
        self.perturbation_engine = perturbation_engine or PerturbationEngine()
        
        # State representation
        self.state_dim = 64  # State embedding dimension
        
        # Action space setup
        self.actions = []
        # Character-level actions
        self.actions.extend([(func.__name__, func) for func in self.perturbation_engine.char_perturbations])
        # Word-level actions
        self.actions.extend([(func.__name__, func) for func in self.perturbation_engine.word_perturbations])
        # Style actions
        self.actions.extend([(func.__name__, func) for func in self.perturbation_engine.style_perturbations])
        # URL actions
        self.actions.extend([(func.__name__, func) for func in self.perturbation_engine.url_perturbations])
        
        self.action_dim = len(self.actions)
        
        # Initialize policy network
        self.initialize_model()
        
        # Experience buffer (for training)
        self.buffer = {
            'states': [],
            'actions': [],
            'rewards': [],
            'next_states': [],
            'dones': []
        }
        
        # Training parameters
        self.gamma = 0.99  # Discount factor
        self.learning_rate = 5e-4
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)
        
        # Tracking metrics
        self.successful_attacks = 0
        self.total_attacks = 0
        
    def initialize_model(self):
        """Initialize policy network model"""
        # Load tokenizer for state representation
        self.tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
        
        # Create model for state embedding
        try:
            self.encoder = AutoModelForSequenceClassification.from_pretrained(
                "distilbert-base-uncased", num_labels=1)
            # Freeze encoder parameters
            for param in self.encoder.parameters():
                param.requires_grad = False
        except Exception as e:
            print(f"Error loading encoder: {e}")
            self.encoder = None
        
        # Create policy network
        class PolicyNetwork(torch.nn.Module):
            def __init__(self, state_dim, action_dim):
                super(PolicyNetwork, self).__init__()
                self.fc1 = torch.nn.Linear(state_dim, 128)
                self.fc2 = torch.nn.Linear(128, 64)
                self.fc3 = torch.nn.Linear(64, action_dim)
                
            def forward(self, x):
                x = F.relu(self.fc1(x))
                x = F.relu(self.fc2(x))
                x = self.fc3(x)
                return F.softmax(x, dim=1)
        
        self.policy_net = PolicyNetwork(self.state_dim, self.action_dim)
        self.policy_net.to(device)
        
        print("Initialized RL attacker policy network")
    
    def get_state_embedding(self, email_text):
        """Convert email text to state embedding"""
        if self.encoder is None:
            # If no encoder, use a simple embedding
            # Count features as a basic state representation
            state = np.zeros(self.state_dim)
            words = email_text.split()
            
            # Basic text features
            state[0] = len(words)  # Word count
            state[1] = len(email_text)  # Character count
            state[2] = email_text.count('http')  # URL count
            state[3] = sum(1 for c in email_text if c.isupper()) / max(1, len(email_text))  # Caps ratio
            state[4] = sum(1 for w in words if len(w) > 10) / max(1, len(words))  # Long word ratio
            
            # Fill remaining dimensions with hash of text
            text_hash = int(hashlib.md5(email_text.encode()).hexdigest(), 16)
            for i in range(5, self.state_dim):
                state[i] = (text_hash % 100) / 100.0
                text_hash = text_hash // 10
                
            return torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        
        # Use transformer encoder for state embedding
        with torch.no_grad():
            inputs = self.tokenizer(
                email_text,
                truncation=True,
                padding='max_length',
                max_length=512,
                return_tensors='pt'
            ).to(device)
            
            outputs = self.encoder(**inputs)
            # Use the [CLS] token representation
            state_embedding = outputs.logits
            
            # Ensure the embedding is the right dimension
            if state_embedding.shape[1] != self.state_dim:
                # Add a projection layer if needed
                projection = torch.nn.Linear(
                    state_embedding.shape[1], self.state_dim, 
                    device=device
                )
                state_embedding = projection(state_embedding)
                
            return state_embedding
    
    def select_action(self, state, explore=True):
        """Select action from policy network"""
        with torch.no_grad():
            action_probs = self.policy_net(state)
            
            if explore:
                # Exploration: Sample from action distribution
                action_idx = torch.multinomial(action_probs, 1).item()
            else:
                # Exploitation: Take highest probability action
                action_idx = torch.argmax(action_probs).item()
                
            return action_idx
    
    def apply_action(self, email, action_idx, intensity=0.2):
        """Apply selected action to the email"""
        # Get the action function
        action_name, action_func = self.actions[action_idx]
        
        # Apply the perturbation with the specified intensity
        perturbed_email = email.copy()
        perturbed_email['text'] = action_func(email['text'], intensity)
        perturbed_email['action_applied'] = action_name
        
        return perturbed_email
    
    def calculate_reward(self, email, perturbed_email, detector_output, similarity):
        """Calculate reward for action"""
        # Extract phishing probability
        phishing_prob = detector_output.get('phishing_prob', 0.5)
        
        # Success reward: High reward if we fool the detector
        success_reward = 1.0 if phishing_prob < 0.5 else 0.0
        
        # Progress reward: Partial reward based on how much we reduced prob
        progress_reward = 1.0 - phishing_prob 
        
        # Similarity penalty: Penalize too much change
        similarity_penalty = 0.5 * (1.0 - similarity)
        
        # Combine rewards 
        reward = 0.7 * success_reward + 0.3 * progress_reward - similarity_penalty
        
        return reward
    
    def calculate_similarity(self, original_text, perturbed_text):
        """Calculate semantic similarity between texts"""
        # Simple Jaccard similarity as a fallback
        original_words = set(original_text.lower().split())
        perturbed_words = set(perturbed_text.lower().split())
        
        # Jaccard similarity
        if len(original_words.union(perturbed_words)) > 0:
            similarity = len(original_words.intersection(perturbed_words)) / len(original_words.union(perturbed_words))
        else:
            similarity = 0.0
        
        # Character-level edit distance also factored in
        orig_len = max(1, len(original_text))
        pert_len = max(1, len(perturbed_text))
        
        # Simple normalized edit distance approximation 
        diff_chars = abs(orig_len - pert_len)
        char_similarity = 1.0 - (diff_chars / max(orig_len, pert_len))
        
        # Combine word and character similarity
        combined = 0.7 * similarity + 0.3 * char_similarity
        
        return combined
    
    def store_experience(self, state, action, reward, next_state, done):
        """Store experience in replay buffer"""
        self.buffer['states'].append(state)
        self.buffer['actions'].append(action)
        self.buffer['rewards'].append(reward)
        self.buffer['next_states'].append(next_state)
        self.buffer['dones'].append(done)
        
        # Limit buffer size
        max_buffer = 1000
        if len(self.buffer['states']) > max_buffer:
            for key in self.buffer:
                self.buffer[key] = self.buffer[key][-max_buffer:]
    
    def update_policy(self, batch_size=32):
        """Update policy network from experience buffer"""
        if len(self.buffer['states']) < batch_size:
            return 0.0  # Not enough samples for training
        
        # Sample random batch
        indices = np.random.choice(len(self.buffer['states']), batch_size, replace=False)
        
        states = torch.cat([self.buffer['states'][i] for i in indices], dim=0)
        actions = torch.tensor([self.buffer['actions'][i] for i in indices], dtype=torch.long).to(device)
        rewards = torch.tensor([self.buffer['rewards'][i] for i in indices], dtype=torch.float32).to(device)
        next_states = torch.cat([self.buffer['next_states'][i] for i in indices], dim=0)
        dones = torch.tensor([self.buffer['dones'][i] for i in indices], dtype=torch.float32).to(device)
        
        # Calculate policy loss (REINFORCE with baseline)
        action_probs = self.policy_net(states)
        selected_probs = action_probs.gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # Log probabilities and calculate loss
        log_probs = torch.log(selected_probs + 1e-10)
        loss = -(log_probs * rewards).mean()
        
        # Update network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def generate_attack(self, emails, defender, max_steps=50):
        """Generate adversarial emails"""
        adversarial_emails = []
        
        for email in emails:
            # Start with the original email
            current_email = email.copy()
            current_email['original_text'] = email['text']
            current_email['steps'] = 0
            
            # Initial state
            state = self.get_state_embedding(current_email['text'])
            
            # Attempt to generate an adversarial example
            for step in range(max_steps):
                # Select and apply action
                action_idx = self.select_action(state)
                perturbed_email = self.apply_action(current_email, action_idx)
                perturbed_email['steps'] = step + 1
                
                # Get defender's response
                detector_result = defender.predict([perturbed_email])[0]
                
                # Calculate similarity and reward
                similarity = self.calculate_similarity(
                    email['text'], perturbed_email['text']
                )
                reward = self.calculate_reward(
                    email, perturbed_email, detector_result, similarity
                )
                
                # Get next state
                next_state = self.get_state_embedding(perturbed_email['text'])
                
                # Store experience
                done = (detector_result.get('phishing_prob', 1.0) < 0.5) or (step == max_steps - 1)
                self.store_experience(state, action_idx, reward, next_state, done)
                
                # Update for next step
                state = next_state
                current_email = perturbed_email
                
                # Break if attack is successful
                if detector_result.get('phishing_prob', 1.0) < 0.5:
                    self.successful_attacks += 1
                    break
            
            # Track attempts
            self.total_attacks += 1
            
            # Add the final email to the results
            current_email['success'] = detector_result.get('phishing_prob', 1.0) < 0.5
            current_email['final_prob'] = detector_result.get('phishing_prob', 1.0)
            adversarial_emails.append(current_email)
            
            # Update policy periodically
            if len(self.buffer['states']) >= 64:
                self.update_policy()
        
        # Final policy update after all emails
        if len(self.buffer['states']) >= 32:
            self.update_policy()
            
        return adversarial_emails

# AdaptiveAttacker Implementation (Fallback)

In [8]:
class AdaptiveAttacker:
    """Learning-based attacker that adapts to defender's behavior"""
    
    def __init__(self, base_generator=None, base_perturbation=None):
        """Initialize adaptive attacker"""
        self.template_generator = base_generator or TemplateGenerator()
        self.perturbation_engine = base_perturbation or PerturbationEngine()
        
        # Track successful and failed attack patterns
        self.successful_patterns = []  # Patterns that evaded detection
        self.failed_patterns = []      # Patterns that were detected
        
        # Initialize a simple neural language model for adaptive attacks
        self.initialize_model()
        
    def initialize_model(self):
        """Initialize a simplified language model for adaptive attacks"""
        try:
            # Load a small pre-trained model
            self.tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
            self.model = AutoModelForSequenceClassification.from_pretrained(
                "distilbert-base-uncased", num_labels=2)
            self.model.to(device)
            
            # Initialize with random weights - this is just for demonstration
            for param in self.model.parameters():
                param.data = torch.randn_like(param.data) * 0.01
                
            print("Initialized adaptive attacker model")
        except Exception as e:
            print(f"Error initializing model: {e}")
            self.tokenizer = None
            self.model = None
            print("Using template-based attacks only") 

    def generate_attack(self, n_samples=10, strategy=None):
        """Generate attack emails"""
        if strategy is None:
            # Choose a strategy based on past successes
            if len(self.successful_patterns) > 0 and random.random() < 0.7:
                # Use successful strategies more often
                strategy = random.choice([p['strategy'] for p in self.successful_patterns])
            else:
                strategy = random.choice(['template', 'perturbation', 'adaptive', 'mixed'])
        
        if strategy == 'template':
            # Simple template-based attack
            template_emails = self.template_generator.generate(n_samples)
            
            # Add strategy tag to each email
            for email in template_emails:
                email['strategy'] = 'template'
                
            return template_emails
            
        elif strategy == 'perturbation':
            # Apply perturbations to template emails
            base_emails = self.template_generator.generate(n_samples)
            perturbed_emails = []
            
            for email in base_emails:
                # Choose a random perturbation technique
                perturbed = self.perturbation_engine.perturb(email, intensity=random.uniform(0.1, 0.5))
                perturbed['strategy'] = 'perturbation'
                perturbed_emails.append(perturbed)
                
            return perturbed_emails
            
        elif strategy == 'adaptive':
            # Use language model to generate variations
            if self.model is None:
                # Fall back to perturbation if model isn't available
                return self.generate_attack(n_samples, 'perturbation')
            
            base_emails = self.template_generator.generate(n_samples)
            adaptive_emails = []
            
            for email in base_emails:
                # Apply a random sequence of perturbations
                perturbed = email.copy()
                num_perturbations = random.randint(1, 3)
                
                for _ in range(num_perturbations):
                    # Avoid perturbations that frequently fail
                    fail_funcs = [p['perturbation'] for p in self.failed_patterns[-10:]] if self.failed_patterns else []
                    
                    all_perturbations = (self.perturbation_engine.char_perturbations + 
                                         self.perturbation_engine.word_perturbations + 
                                         self.perturbation_engine.style_perturbations + 
                                         self.perturbation_engine.url_perturbations)
                    
                    # Filter out frequently failing perturbations if we have enough data
                    if len(fail_funcs) >= 5:
                        available_perturbations = [f for f in all_perturbations 
                                               if f.__name__ not in fail_funcs[:5]]
                    else:
                        available_perturbations = all_perturbations
                    
                    if not available_perturbations:
                        available_perturbations = all_perturbations
                        
                    technique = random.choice(available_perturbations)
                    perturbed = self.perturbation_engine.perturb(
                        perturbed, technique=technique, intensity=random.uniform(0.1, 0.5))
                
                perturbed['strategy'] = 'adaptive'
                adaptive_emails.append(perturbed)
                
            return adaptive_emails
            
        elif strategy == 'mixed':
            # Mix different attack strategies
            attacks_per_strategy = n_samples // 3
            remainder = n_samples % 3
            
            template_attacks = self.generate_attack(attacks_per_strategy, 'template')
            perturb_attacks = self.generate_attack(attacks_per_strategy, 'perturbation')
            adaptive_attacks = self.generate_attack(attacks_per_strategy + remainder, 'adaptive')
            
            mixed_attacks = template_attacks + perturb_attacks + adaptive_attacks
            random.shuffle(mixed_attacks)
            
            return mixed_attacks
        
        else:
            raise ValueError(f"Unknown attack strategy: {strategy}")
                 
    def update_model(self, attack_results):
        """Update model based on attack results"""
        # Add attack patterns to our history
        for result in attack_results:
            email = result.get('email', result)
            was_detected = result.get('detected', False)
            
            pattern = {
                'text': email['text'],
                'strategy': email.get('strategy', 'unknown'),
                'perturbation': email.get('perturbation', None)
            }
            
            if was_detected:
                self.failed_patterns.append(pattern)
            else:
                self.successful_patterns.append(pattern)
        
        print(f"Updated attacker model with {len(attack_results)} results")
        print(f"Successful evasions: {sum(1 for r in attack_results if not r['detected'])}")

# Game-Theoretic Planner Implementation

In [9]:
class GameTheoreticPlanner:
    """Plan attacks using game theory"""
    
    def __init__(self, attack_strategies=None):
        """Initialize game-theoretic attack planner"""
        self.attack_strategies = attack_strategies or [
            'template', 'perturbation', 'adaptive', 'mixed'
        ]
        
        # Payoff matrix: strategy x strategy -> utility
        self.payoff_matrix = {
            strategy: {defense: 0.5 for defense in self.attack_strategies}
            for strategy in self.attack_strategies
        }
        
        # Initialize strategy distribution (uniform)
        self.strategy_distribution = {
            strategy: 1.0 / len(self.attack_strategies)
            for strategy in self.attack_strategies
        }
    
    def update_payoffs(self, attack_results):
        """Update payoff matrix based on attack results"""
        # Group results by strategy
        strategy_results = {}
    
        for result in attack_results:
            try:
                email = result.get('email', result)
                strategy = email.get('strategy', 'unknown')
                
                if strategy not in strategy_results:
                    strategy_results[strategy] = []
                
                detected = result.get('detected', False)
                strategy_results[strategy].append(detected)
            except Exception as e:
                print(f"Error processing result in update_payoffs: {e}")
                continue
        
        # Update payoff matrix
        for strategy, results in strategy_results.items():
            if not results:
                continue
            
            # Check if strategy exists in payoff matrix, add it if not
            if strategy not in self.payoff_matrix:
                print(f"Adding new strategy to payoff matrix: {strategy}")
                self.payoff_matrix[strategy] = {defense: 0.5 for defense in self.attack_strategies}
                # Also add this strategy as a defense option for all existing strategies
                for existing_strategy in self.attack_strategies:
                    self.payoff_matrix[existing_strategy][strategy] = 0.5
                # Add to list of strategies
                self.attack_strategies.append(strategy)
                # Add to strategy distribution with small initial probability
                total_prob = sum(self.strategy_distribution.values())
                # Give it a small initial probability by reducing others slightly
                if total_prob > 0:
                    reduction_factor = 0.95  # Reduce existing probabilities by 5%
                    for s in self.strategy_distribution:
                        self.strategy_distribution[s] *= reduction_factor
                    # Set new strategy to have 5% probability
                    self.strategy_distribution[strategy] = 0.05
                else:
                    # If total probability is 0 (shouldn't happen normally), give equal probability
                    n_strategies = len(self.attack_strategies)
                    for s in self.attack_strategies:
                        self.strategy_distribution[s] = 1.0 / n_strategies
                
            # Payoff is the evasion rate (1 - detection rate)
            detection_rate = sum(results) / len(results)
            evasion_rate = 1 - detection_rate
            
            # Update payoff for all defense strategies (simplified)
            for defense in self.attack_strategies:
                # Discount old payoff and add new observation
                alpha = 0.3  # Learning rate
                old_payoff = self.payoff_matrix[strategy][defense]
                new_payoff = (1 - alpha) * old_payoff + alpha * evasion_rate
                self.payoff_matrix[strategy][defense] = new_payoff

    def solve_mixed_strategy(self):
        """Solve for Nash equilibrium mixed strategy"""
        # Calculate expected payoffs for each strategy
        expected_payoffs = {}
        
        for attack_strategy in self.attack_strategies:
            # Assume defender uses same distribution (zero-sum game)
            payoff = sum(
                self.payoff_matrix[attack_strategy][defense] * self.strategy_distribution[defense]
                for defense in self.attack_strategies
            )
            expected_payoffs[attack_strategy] = payoff
        
        # Find best-response strategy
        best_payoff = max(expected_payoffs.values())
        best_strategies = [
            strategy for strategy, payoff in expected_payoffs.items()
            if abs(payoff - best_payoff) < 1e-6
        ]
        
        # Update strategy distribution (softmax)
        temperature = 0.1  # Exploration parameter
        exps = {
            strategy: np.exp(expected_payoffs[strategy] / temperature)
            for strategy in self.attack_strategies
        }
        total_exp = sum(exps.values())
        
        self.strategy_distribution = {
            strategy: exp / total_exp
            for strategy, exp in exps.items()
        }
        
        return self.strategy_distribution
    
    def choose_strategy(self):
        """Choose a strategy based on current distribution"""
        strategies = list(self.strategy_distribution.keys())
        probabilities = list(self.strategy_distribution.values())
        
        return np.random.choice(strategies, p=probabilities)

# Dataset and Defender Implementation

In [10]:
class EmailDataset(torch.utils.data.Dataset):
    """Dataset for emails"""
    
    def __init__(self, texts, labels, tokenizer, max_length=512):
        """Initialize dataset"""
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        
        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'labels': torch.tensor(label, dtype=torch.long)
        }


class TransformerDefender:
    """Transformer-based phishing email detector"""
    
    def __init__(self, model_name='distilbert-base-uncased'):
        """Initialize transformer defender"""
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name, num_labels=2)
        self.model.to(device)
        
        # Default training parameters
        self.batch_size = 8
        self.max_length = 512
        self.learning_rate = 2e-5
        self.weight_decay = 0.01
        
        # For calibration
        self.calibration_temp = 1.0
    
    def train(self, train_df, val_df, epochs=3):
        """Train the model"""
        # Create datasets
        train_dataset = EmailDataset(
            train_df['text'].tolist(),
            train_df['label'].tolist(),
            self.tokenizer,
            self.max_length
        )
        
        val_dataset = EmailDataset(
            val_df['text'].tolist(),
            val_df['label'].tolist(),
            self.tokenizer,
            self.max_length
        )
        
        # Training arguments
        try:
            # Try with minimal parameters first
            training_args = TrainingArguments(
                output_dir=f'./models/{self.model_name}-detector',
                num_train_epochs=epochs,
                per_device_train_batch_size=self.batch_size,
                per_device_eval_batch_size=self.batch_size,
                warmup_steps=500,
                weight_decay=self.weight_decay,
                logging_dir='./logs',
                logging_steps=10,
            )
        except Exception as e:
            print(f"Warning: Simplified training arguments due to: {e}")
            # Fallback to even more minimal parameters
            training_args = TrainingArguments(
                output_dir=f'./models/{self.model_name}-detector',
                num_train_epochs=epochs,
            )
        
        # Metrics function
        def compute_metrics(eval_pred):
            logits, labels = eval_pred
            predictions = np.argmax(logits, axis=-1)
            
            return {
                'accuracy': accuracy_score(labels, predictions),
                'precision': precision_score(labels, predictions),
                'recall': recall_score(labels, predictions),
                'f1': f1_score(labels, predictions)
            }
        
        # Create trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            tokenizer=self.tokenizer
        )
        
        # Train the model
        train_output = trainer.train()
        
        # Evaluate
        try:
            eval_output = trainer.evaluate()
            
            return {
                'train_loss': train_output.metrics.get('train_loss', 0.0),
                'eval_loss': eval_output.get('eval_loss', 0.0),
                'eval_accuracy': eval_output.get('eval_accuracy', 0.0),
                'eval_precision': eval_output.get('eval_precision', 0.0),
                'eval_recall': eval_output.get('eval_recall', 0.0),
                'eval_f1': eval_output.get('eval_f1', 0.0)
            }
        except Exception as e:
            print(f"Warning: Evaluation error - {e}")
            return {
                'train_loss': train_output.metrics.get('train_loss', 0.0),
                'eval_loss': 0.0,
                'eval_accuracy': 0.0,
                'eval_precision': 0.0,
                'eval_recall': 0.0,
                'eval_f1': 0.0
            }
    
    def predict(self, emails, threshold=0.5, calibrate=False):
        """Predict phishing probability for emails with temperature scaling"""
        # Extract text from email dictionaries if needed
        if isinstance(emails[0], dict):
            texts = [email['text'] for email in emails]
        else:
            texts = emails
        
        # Create dataset
        dataset = EmailDataset(
            texts,
            [0] * len(texts),  # Dummy labels
            self.tokenizer,
            self.max_length
        )
        
        dataloader = DataLoader(dataset, batch_size=self.batch_size)
        
        # Make predictions
        self.model.eval()
        all_probs = []
        
        with torch.no_grad():
            for batch in dataloader:
                # Move tensors to device
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                
                # Forward pass
                outputs = self.model(input_ids, attention_mask=attention_mask)
                logits = outputs.logits
                
                # Convert to probabilities
                probs = F.softmax(logits, dim=1)
                all_probs.append(probs.cpu().numpy())
        
        # Concatenate results
        all_probs = np.vstack(all_probs)
        phishing_probs = all_probs[:, 1]  # Probability of class 1 (phishing)
        
        # Apply temperature scaling calibration if requested
        if calibrate and hasattr(self, 'calibration_temp'):
            # Convert probs to logits
            epsilon = 1e-10
            phishing_probs_clipped = np.clip(phishing_probs, epsilon, 1 - epsilon)
            logits = np.log(phishing_probs_clipped / (1 - phishing_probs_clipped))
            
            # Apply temperature
            scaled_logits = logits / self.calibration_temp
            
            # Convert back to probabilities
            phishing_probs = 1 / (1 + np.exp(-scaled_logits))
        
        # Create prediction results
        results = []
        for i, prob in enumerate(phishing_probs):
            is_phishing = prob >= threshold
            
            # Create result dictionary
            if isinstance(emails[0], dict):
                result = emails[i].copy()
            else:
                result = {'text': texts[i]}
                
            result['phishing_prob'] = float(prob)
            result['is_phishing'] = bool(is_phishing)
            result['detected'] = bool(is_phishing)  # For compatibility with attack results
            
            results.append(result)
        
        return results
    
    def _evaluate_on_emails(self, emails):
        """Evaluate model on a set of emails"""
        # Make predictions
        predictions = self.predict(emails, calibrate=True)
        
        # Extract results
        total = len(predictions)
        detected = sum(1 for p in predictions if p['detected'])
        
        # Calculate metrics
        detection_rate = detected / total if total > 0 else 0
        
        return {
            'total': total,
            'detected': detected,
            'detection_rate': detection_rate
        }
    
    def calibrate_with_temperature(self, val_df):
        """Calibrate model probabilities using temperature scaling"""
        # Get raw predictions on validation set
        val_preds = self.predict(val_df['text'].tolist(), threshold=0.5, calibrate=False)
        val_probs = np.array([pred['phishing_prob'] for pred in val_preds])
        val_labels = val_df['label'].values
        
        # Calculate original ECE
        def compute_ece(probs, labels, n_bins=10):
            """Calculate Expected Calibration Error"""
            bin_boundaries = np.linspace(0, 1, n_bins + 1)
            bin_lowers = bin_boundaries[:-1]
            bin_uppers = bin_boundaries[1:]
            
            confidences = probs
            predictions = (probs >= 0.5).astype(np.int32)
            accuracies = (predictions == labels).astype(np.float32)
            
            ece = 0.0
            for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
                # Find samples in this bin
                in_bin = np.logical_and(confidences > bin_lower, confidences <= bin_upper)
                prop_in_bin = np.mean(in_bin)
                
                if prop_in_bin > 0:
                    accuracy_in_bin = np.mean(accuracies[in_bin])
                    avg_confidence_in_bin = np.mean(confidences[in_bin])
                    ece += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin
                    
            return ece
        
        original_ece = compute_ece(val_probs, val_labels)
        
        # Find optimal temperature using grid search
        def temperature_scale(logits, temperature):
            """Apply temperature scaling to logits"""
            # Convert probabilities back to logits
            epsilon = 1e-10
            probs = np.clip(logits, epsilon, 1 - epsilon)
            logits = np.log(probs / (1 - probs))
            
            # Apply temperature
            scaled_logits = logits / temperature
            
            # Convert back to probabilities
            scaled_probs = 1 / (1 + np.exp(-scaled_logits))
            return scaled_probs
        
        # Grid search for optimal temperature
        temperatures = np.linspace(0.5, 3.0, 26)  # 0.5 to 3.0 in steps of 0.1
        best_ece = float('inf')
        best_temperature = 1.0
        
        for temp in temperatures:
            scaled_probs = temperature_scale(val_probs, temp)
            ece = compute_ece(scaled_probs, val_labels)
            
            if ece < best_ece:
                best_ece = ece
                best_temperature = temp
        
        print(f"Original ECE: {original_ece:.4f}")
        print(f"Best temperature: {best_temperature:.4f}")
        print(f"Calibrated ECE: {best_ece:.4f}")
        
        # Store the optimal temperature
        self.calibration_temp = best_temperature
        
        # Return calibration metrics
        return {
            'original_ece': original_ece,
            'calibrated_ece': best_ece,
            'temperature': best_temperature,
            'improvement_percent': (original_ece - best_ece) / original_ece * 100
        }
    
    def save(self, path):
        """Save model and tokenizer"""
        os.makedirs(path, exist_ok=True)
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)
        
        # Save calibrator if available
        if hasattr(self, 'calibration_temp'):
            import pickle
            with open(os.path.join(path, 'calibrator.pkl'), 'wb') as f:
                pickle.dump(self.calibration_temp, f)
    
    def load(self, path):
        """Load model and tokenizer"""
        self.model = AutoModelForSequenceClassification.from_pretrained(path)
        self.tokenizer = AutoTokenizer.from_pretrained(path)
        self.model.to(device)
        
        # Load calibrator if available
        calibrator_path = os.path.join(path, 'calibrator.pkl')
        if os.path.exists(calibrator_path):
            import pickle
            with open(calibrator_path, 'rb') as f:
                self.calibration_temp = pickle.load(f)

# Adaptive Defender, Adversarial Controller

In [11]:
class AdaptiveDefender:
    """Defender that adapts to attacks over time"""
    
    def __init__(self, base_defender=None):
        """Initialize adaptive defender"""
        # Initialize with a transformer defender if none provided
        self.defender = base_defender or TransformerDefender()
        
        # Track attack history
        self.attack_history = []
        
        # Initialize memory buffer for online learning
        self.memory_buffer = {
            'texts': [],
            'labels': []
        }
        self.buffer_size = 1000  # Maximum examples to keep in memory
    
    def predict(self, emails, threshold=0.5):
        """Predict phishing probability"""
        # Forward to the current defender
        return self.defender.predict(emails, threshold=threshold, calibrate=True)
    
    def update(self, attack_results, retrain=True):
        """Update defender based on attack results"""
        # Add results to attack history
        self.attack_history.extend(attack_results)
        
        # Add to memory buffer
        for result in attack_results:
            email = result['text'] if isinstance(result, dict) and 'text' in result else result['email']['text']
            # Phishing emails should have label 1
            label = 1 if 'label' in result and result['label'] == 1 else 1
            
            self.memory_buffer['texts'].append(email)
            self.memory_buffer['labels'].append(label)
        
        # Trim buffer if it gets too large (keep most recent examples)
        if len(self.memory_buffer['texts']) > self.buffer_size:
            self.memory_buffer['texts'] = self.memory_buffer['texts'][-self.buffer_size:]
            self.memory_buffer['labels'] = self.memory_buffer['labels'][-self.buffer_size:]
        
        # Retrain if requested
        if retrain and len(self.memory_buffer['texts']) >= 50:
            # Create a small dataset for fine-tuning
            df = pd.DataFrame({
                'text': self.memory_buffer['texts'],
                'label': self.memory_buffer['labels']
            })
            
            # Split into train/val
            train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
            
            # Fine-tune the model
            metrics = self.defender.train(train_df, val_df, epochs=1)
            
            # Re-calibrate
            self.defender.calibrate_with_temperature(val_df)
            
            return {
                'update_size': len(attack_results),
                'memory_buffer_size': len(self.memory_buffer['texts']),
                'retrain_metrics': metrics
            }
        
        return {
            'update_size': len(attack_results),
            'memory_buffer_size': len(self.memory_buffer['texts']),
            'retrain': False
        }

class AdversarialController:
    """Controller for the adversarial loop with proper convergence checking"""
    
    def __init__(self, attacker=None, defender=None, planner=None):
        """Initialize controller"""
        self.attacker = attacker
        self.defender = defender 
        self.planner = planner 
        
        self.round_history = []
        self.current_round = 0
        
        # For convergence checking
        self.previous_robust_acc = 0.0
        self.convergence_threshold = 0.02  # Stop if improvement < 2%
        self.convergence_patience = 2      # Number of rounds with small improvement
        self.patience_counter = 0
        
        # Create required directories
        os.makedirs('results', exist_ok=True)
    
    def run_round(self, n_samples=10, strategy=None):
        """Run a single round in the adversarial loop"""
        self.current_round += 1
        print(f"\n===== Starting Round {self.current_round} =====")
        
        # Choose attack strategy if not specified
        if strategy is None and self.planner:
            strategy = self.planner.choose_strategy()
            print(f"Planner chose strategy: {strategy}")
        
        # Generate attack emails
        attack_emails = self.attacker.generate_attack(n_samples, strategy)
        print(f"Generated {len(attack_emails)} attack emails")
        
        # Defender labels the emails
        attack_results = self.defender.predict(attack_emails)
        print(f"Defender processed the attacks")
        
        # Calculate metrics
        metrics = self._calculate_metrics(attack_results)
        print(f"Attack metrics - Evasion rate: {metrics['evasion_rate']:.2f}, "
              f"Detection rate: {metrics['detection_rate']:.2f}")
        
        # Check for convergence
        current_robust_acc = metrics['detection_rate']
        improvement = current_robust_acc - self.previous_robust_acc
        
        print(f"Improvement in robust accuracy: {improvement:.4f}")
        
        converged = False
        if self.current_round > 1:
            if abs(improvement) < self.convergence_threshold:
                self.patience_counter += 1
                if self.patience_counter >= self.convergence_patience:
                    print(f"Converged after {self.current_round} rounds - robust accuracy stabilized")
                    converged = True
            else:
                self.patience_counter = 0
        
        self.previous_robust_acc = current_robust_acc
        
        # Update the game-theoretic planner
        if self.planner:
            self.planner.update_payoffs(attack_results)
            new_distribution = self.planner.solve_mixed_strategy()
            print(f"Updated strategy distribution: {new_distribution}")
        
        # Save attack results for future analysis
        results_with_attacks = {
            'round': self.current_round,
            'strategy': strategy,
            'attack_size': len(attack_emails),
            'metrics': metrics,
            'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'attack_results': attack_results,  # Store full results
            'converged': converged
        }
        
        # Log round results (without full attack data to save space)
        round_results = {
            'round': self.current_round,
            'strategy': strategy,
            'attack_size': len(attack_emails),
            'metrics': metrics,
            'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'converged': converged
        }
        self.round_history.append(round_results)
        
        # Save round results
        self._save_round_results(round_results)
        
        return results_with_attacks, converged
    
    def adapt_and_update(self, round_results, adapt_attacker=True, adapt_defender=True):
        """Update attacker and defender based on round results"""
        print(f"\n===== Adapting after Round {self.current_round} =====")
        
        update_metrics = {}
        
        # Update attacker if required
        if adapt_attacker and hasattr(self.attacker, 'update_model'):
            attack_results = round_results.get('attack_results', [])
            if not attack_results:
                # Reconstruct from metrics
                evasion_count = int(round_results['metrics']['evasion_rate'] * 
                                   round_results['attack_size'])
                
                # Simulate attack results
                attack_results = [
                    {'detected': i >= evasion_count, 
                     'email': {'strategy': round_results['strategy']}}
                    for i in range(round_results['attack_size'])
                ]
            
            self.attacker.update_model(attack_results)
            print("Attacker model updated")
            update_metrics['attacker_updated'] = True
        
        # Update defender if required
        if adapt_defender and hasattr(self.defender, 'update'):
            attack_results = round_results.get('attack_results', [])
            defender_metrics = self.defender.update(attack_results, retrain=True)
            print("Defender updated")
            update_metrics['defender_updated'] = True
            update_metrics['defender_metrics'] = defender_metrics
        
        return update_metrics
    
    def _calculate_metrics(self, attack_results):
        """Calculate metrics for attack results"""
        # Calculate basic metrics
        total = len(attack_results)
        detected = sum(1 for result in attack_results if result.get('detected', False))
        evaded = total - detected
        
        detection_rate = detected / total if total > 0 else 0
        evasion_rate = evaded / total if total > 0 else 0
        
        # Group by strategy if available
        strategy_metrics = {}
        for result in attack_results:
            # Extract strategy from email or result
            strategy = None
            if 'strategy' in result:
                strategy = result['strategy']
            elif 'email' in result and isinstance(result['email'], dict) and 'strategy' in result['email']:
                strategy = result['email']['strategy']
                
            if strategy:
                if strategy not in strategy_metrics:
                    strategy_metrics[strategy] = {'total': 0, 'detected': 0}
                
                strategy_metrics[strategy]['total'] += 1
                if result.get('detected', False):
                    strategy_metrics[strategy]['detected'] += 1
        
        # Calculate per-strategy metrics
        for strategy, counts in strategy_metrics.items():
            if counts['total'] > 0:
                counts['detection_rate'] = counts['detected'] / counts['total']
                counts['evasion_rate'] = 1 - counts['detection_rate']
        
        return {
            'total': total,
            'detected': detected,
            'evaded': evaded,
            'detection_rate': detection_rate,
            'evasion_rate': evasion_rate,
            'strategy_metrics': strategy_metrics
        }
    
    def _save_round_results(self, round_results):
        """Save round results to disk"""
        import json
        
        # Save to JSON file
        filename = f"results/round_{round_results['round']}.json"
        with open(filename, 'w') as f:
            # Clone dictionary to avoid modifying the original
            results_copy = round_results.copy()
            
            # Remove attack_results as they can be large and contain objects
            if 'attack_results' in results_copy:
                del results_copy['attack_results']
            
            json.dump(results_copy, f, indent=2, default=str)
    
    def run_adversarial_loop(self, n_rounds=5, samples_per_round=10, 
                           adapt_attacker=True, adapt_defender=True):
        """Run the complete adversarial loop"""
        for round_idx in range(n_rounds):
            try:
                # Run a round
                round_results, converged = self.run_round(samples_per_round)
                
                # Update attacker and defender
                if round_idx < n_rounds - 1 and not converged:  # Don't update after the last round or if converged
                    self.adapt_and_update(round_results, adapt_attacker, adapt_defender)
                
                # Break if converged
                if converged:
                    print(f"Converged after {round_idx+1} rounds. Stopping early.")
                    break
                    
            except Exception as e:
                print(f"Error in round {round_idx+1}: {str(e)}")
                import traceback
                traceback.print_exc()  # This will print the full stack trace
                print("Continuing to next round...")
        
        return self.round_history
    
    def evaluate(self, test_df=None):
        """Evaluate current defender on test data"""
        if test_df is None or len(test_df) == 0:
            print("No test data provided for evaluation")
            return {}
        
        try:
            # Make predictions
            predictions = self.defender.predict(test_df['text'].tolist())
            pred_labels = [1 if pred.get('is_phishing', False) else 0 for pred in predictions]
            true_labels = test_df['label'].tolist()
            
            # Calculate metrics
            metrics = {
                'accuracy': accuracy_score(true_labels, pred_labels),
                'precision': precision_score(true_labels, pred_labels),
                'recall': recall_score(true_labels, pred_labels),
                'f1': f1_score(true_labels, pred_labels),
                'roc_auc': roc_auc_score(true_labels, [pred.get('phishing_prob', 0.5) for pred in predictions])
            }
            
            # Confusion matrix
            cm = confusion_matrix(true_labels, pred_labels)
            metrics['confusion_matrix'] = cm.tolist()
            
            return metrics
        except Exception as e:
            print(f"Error in evaluation: {str(e)}")
            return {
                'error': str(e),
                'accuracy': 0.0,
                'precision': 0.0,
                'recall': 0.0,
                'f1': 0.0,
                'roc_auc': 0.5
            }

# Visualization and Analysis Functions

In [12]:
def plot_metrics_over_rounds(controller):
    """Plot metrics evolution over rounds"""
    if not controller.round_history:
        print("No rounds to plot")
        return
    
    # Extract data
    rounds = [r['round'] for r in controller.round_history]
    detection_rates = [r['metrics']['detection_rate'] for r in controller.round_history]
    evasion_rates = [r['metrics']['evasion_rate'] for r in controller.round_history]
    
    # Create plot
    plt.figure(figsize=(10, 6))
    plt.plot(rounds, detection_rates, 'b-o', label='Detection Rate')
    plt.plot(rounds, evasion_rates, 'r-o', label='Evasion Rate')
    plt.xlabel('Round')
    plt.ylabel('Rate')
    plt.title('Detection and Evasion Rates over Rounds')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.xticks(rounds)
    plt.ylim(0, 1)
    
    # Add confidence intervals (simplified)
    for i, r in enumerate(rounds):
        # Assuming a binomial distribution for success/failure
        n = controller.round_history[i]['attack_size']
        p_detect = detection_rates[i]
        std_dev = np.sqrt((p_detect * (1 - p_detect)) / n)
        ci = 1.96 * std_dev  # 95% confidence interval
        
        plt.errorbar(r, p_detect, yerr=ci, fmt='none', ecolor='blue', capsize=5, alpha=0.5)
    
    plt.savefig('results/metrics_evolution.png')
    plt.close()
    
    # Plot strategy-specific metrics if available
    strategies = set()
    for round_result in controller.round_history:
        if 'strategy_metrics' in round_result['metrics']:
            strategies.update(round_result['metrics']['strategy_metrics'].keys())
    
    if strategies:
        plt.figure(figsize=(12, 8))
        
        for strategy in sorted(strategies):
            strategy_evasion = []
            
            for r in controller.round_history:
                metrics = r['metrics']
                if ('strategy_metrics' in metrics and 
                    strategy in metrics['strategy_metrics'] and
                    'evasion_rate' in metrics['strategy_metrics'][strategy]):
                    strategy_evasion.append(metrics['strategy_metrics'][strategy]['evasion_rate'])
                else:
                    strategy_evasion.append(None)  # Missing data point
            
            # Plot only if we have data
            valid_points = [(r, e) for r, e in zip(rounds, strategy_evasion) if e is not None]
            if valid_points:
                x, y = zip(*valid_points)
                plt.plot(x, y, 'o-', label=f'Strategy: {strategy}')
        
        plt.xlabel('Round')
        plt.ylabel('Evasion Rate')
        plt.title('Evasion Rate by Strategy')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.ylim(0, 1)
        plt.savefig('results/strategy_performance.png')
        plt.close()

def plot_reliability_diagram(predictions, labels, title="Reliability Diagram", calibrated=False):
    """Plot a reliability diagram for model calibration"""
    # Configure plot
    plt.figure(figsize=(8, 8))
    
    # Calculate reliability
    n_bins = 10
    bin_boundaries = np.linspace(0, 1, n_bins + 1)
    bin_lowers = bin_boundaries[:-1]
    bin_uppers = bin_boundaries[1:]
    
    # Compute ECE
    ece = 0.0
    reliability_scatter_x = []
    reliability_scatter_y = []
    
    for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
        # Find samples in this bin
        in_bin = np.logical_and(predictions > bin_lower, predictions <= bin_upper)
        prop_in_bin = np.mean(in_bin)
        
        if prop_in_bin > 0:
            accuracy_in_bin = np.mean(labels[in_bin])
            avg_confidence_in_bin = np.mean(predictions[in_bin])
            ece += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin
            
            reliability_scatter_x.append(avg_confidence_in_bin)
            reliability_scatter_y.append(accuracy_in_bin)
    
    # Plot perfect calibration line
    plt.plot([0, 1], [0, 1], 'k--', label='Perfect Calibration')
    
    # Plot model calibration
    plt.scatter(reliability_scatter_x, reliability_scatter_y, 
                s=100, c='r' if not calibrated else 'g',
                label=f'Model Calibration (ECE={ece:.4f})')
    
    # Plot settings
    plt.xlabel('Confidence (Predicted Probability)')
    plt.ylabel('Accuracy (Observed Probability)')
    plt.title(title)
    plt.legend(loc='lower right')
    plt.grid(True, alpha=0.3)
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    
    # Save plot
    status = "calibrated" if calibrated else "uncalibrated"
    plt.savefig(f'results/reliability_{status}.png')
    plt.close()

def visualize_calibration(defender, val_df):
    """Create reliability diagrams before and after calibration"""
    print("Generating calibration visualizations...")
    
    # Get uncalibrated predictions
    val_preds_uncalib = defender.predict(val_df['text'].tolist(), threshold=0.5, calibrate=False)
    val_probs_uncalib = np.array([pred['phishing_prob'] for pred in val_preds_uncalib])
    
    # Get calibrated predictions
    val_preds_calib = defender.predict(val_df['text'].tolist(), threshold=0.5, calibrate=True)
    val_probs_calib = np.array([pred['phishing_prob'] for pred in val_preds_calib])
    
    # True labels
    val_labels = val_df['label'].values
    
    # Create reliability diagrams
    plot_reliability_diagram(val_probs_uncalib, val_labels, 
                            "Reliability Diagram (Before Calibration)", 
                            calibrated=False)
    
    plot_reliability_diagram(val_probs_calib, val_labels, 
                            "Reliability Diagram (After Calibration)", 
                            calibrated=True)
    
    # Plot both on same figure for comparison
    plt.figure(figsize=(12, 8))
    
    # Calculate ECE functions
    def compute_ece_points(probs, labels, n_bins=10):
        bin_boundaries = np.linspace(0, 1, n_bins + 1)
        bin_lowers = bin_boundaries[:-1]
        bin_uppers = bin_boundaries[1:]
        
        x_points = []
        y_points = []
        counts = []
        
        for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
            in_bin = np.logical_and(probs > bin_lower, probs <= bin_upper)
            prop_in_bin = np.mean(in_bin)
            
            if prop_in_bin > 0 and sum(in_bin) > 10:  # At least 10 samples
                accuracy_in_bin = np.mean(labels[in_bin])
                avg_confidence_in_bin = np.mean(probs[in_bin])
                x_points.append(avg_confidence_in_bin)
                y_points.append(accuracy_in_bin)
                counts.append(sum(in_bin))
        
        return x_points, y_points, counts
    
    # Calculate points
    x1, y1, counts1 = compute_ece_points(val_probs_uncalib, val_labels)
    x2, y2, counts2 = compute_ece_points(val_probs_calib, val_labels)
    
    # Plot perfect calibration line
    plt.plot([0, 1], [0, 1], 'k--', linewidth=2, label='Perfect Calibration')
    
    # Plot uncalibrated
    plt.scatter(x1, y1, s=[c/5 for c in counts1], c='red', alpha=0.7, 
                label=f'Before Calibration (ECE={np.mean(np.abs(np.array(x1) - np.array(y1))):.4f})')
    
    # Plot calibrated
    plt.scatter(x2, y2, s=[c/5 for c in counts2], c='green', alpha=0.7, 
                label=f'After Calibration (ECE={np.mean(np.abs(np.array(x2) - np.array(y2))):.4f})')
    
    # Settings
    plt.xlabel('Confidence (Predicted Probability)', fontsize=14)
    plt.ylabel('Accuracy (Observed Probability)', fontsize=14)
    plt.title('Calibration Comparison', fontsize=16)
    plt.legend(fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    
    # Save
    plt.savefig('results/calibration_comparison.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    print("Calibration visualizations created successfully!")

def generate_report(controller, test_metrics):
    """Generate a comprehensive report"""
    # Create report
    report = []
    report.append("# Adversarial Phishing Detection Report\n")
    report.append(f"Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    
    # 1. Overview
    report.append("## Overview\n")
    report.append("This report summarizes the performance of an adversarial phishing detection system ")
    report.append("that uses a game-theoretic approach for generating and defending against ")
    report.append("phishing attacks. The system implements an iterative attacker-defender loop ")
    report.append("where both sides adapt to the other's behavior.\n\n")
    
    # 2. Summary of rounds
    report.append("## Adversarial Training Summary\n")
    report.append(f"Total Rounds: {len(controller.round_history)}\n")
    
    if controller.round_history:
        report.append("\n### Round Metrics\n")
        report.append("| Round | Strategy | Attack Size | Detection Rate | Evasion Rate |\n")
        report.append("|-------|----------|-------------|----------------|-------------|\n")
        
        for round_result in controller.round_history:
            r = round_result['round']
            strategy = round_result['strategy']
            attack_size = round_result['attack_size']
            detection = round_result['metrics']['detection_rate']
            evasion = round_result['metrics']['evasion_rate']
            
            report.append(f"| {r} | {strategy} | {attack_size} | {detection:.4f} | {evasion:.4f} |\n")
        
        report.append("\n")
    
    # 3. Final evaluation
    report.append("## Final Evaluation\n")
    report.append("The system was evaluated on a test set of emails not used during training.\n\n")
    
    report.append("### Test Metrics\n")
    report.append(f"- Accuracy: {test_metrics['accuracy']:.4f}\n")
    report.append(f"- Precision: {test_metrics['precision']:.4f}\n")
    report.append(f"- Recall: {test_metrics['recall']:.4f}\n")
    report.append(f"- F1 Score: {test_metrics['f1']:.4f}\n")
    report.append(f"- ROC AUC: {test_metrics['roc_auc']:.4f}\n\n")
    
    # 4. Visualizations
    report.append("## Visualizations\n")
    report.append("### Detection and Evasion Rates Over Rounds\n")
    report.append("![Metrics Evolution](metrics_evolution.png)\n\n")
    
    report.append("### Strategy Performance\n")
    report.append("![Strategy Performance](strategy_performance.png)\n\n")
    
    report.append("### Calibration Comparison\n")
    report.append("![Calibration Comparison](calibration_comparison.png)\n\n")
    
    # 5. Conclusions
    report.append("## Conclusions\n")
    
    # Basic conclusions based on metrics
    if controller.round_history:
        first_round = controller.round_history[0]['metrics']['detection_rate']
        last_round = controller.round_history[-1]['metrics']['detection_rate']
        
        if last_round > first_round:
            diff = last_round - first_round
            report.append(f"The defender improved over time, increasing detection rate by {diff:.2%}. ")
            report.append("This suggests that the adaptive learning approach was effective.\n\n")
        elif last_round < first_round:
            diff = first_round - last_round
            report.append(f"The defender's performance decreased by {diff:.2%} over time. ")
            report.append("This suggests that the attacker's adaptations were outpacing the defender's learning.\n\n")
        else:
            report.append("The defender's performance remained stable over time.\n\n")
    
    # Final test performance assessment
    f1 = test_metrics['f1']
    if f1 > 0.9:
        report.append("The final model demonstrates excellent performance on the test set, ")
        report.append("with high precision and recall, indicating effective detection of phishing emails.\n\n")
    elif f1 > 0.8:
        report.append("The final model shows good performance on the test set, ")
        report.append("with a balance of precision and recall that would be suitable for real-world use.\n\n")
    elif f1 > 0.7:
        report.append("The final model demonstrates moderate performance that could be improved further. ")
        report.append("Additional training or feature engineering may be beneficial.\n\n")
    else:
        report.append("The final model's performance on the test set is below expectations. ")
        report.append("Significant improvements are needed before deployment in a real environment.\n\n")
    
    # Save the report
    os.makedirs('results', exist_ok=True)
    with open('results/final_report.md', 'w') as f:
        f.write('\n'.join(report))
    
    print("Final report generated and saved to results/final_report.md")

# Main Experiment Function

In [13]:
def run_full_experiment(n_rounds=5, samples_per_round=100, random_seeds=5):
    """Run complete experiment with multiple seeds and rounds, as per the original report submitted"""
    print("===== ADVERSARIAL PHISHING DETECTION EXPERIMENT =====")
    print(f"Running {random_seeds} trials with up to {n_rounds} rounds each")
    print(f"Using {samples_per_round} samples per round\n")
    
    # Results across all runs
    all_results = []
    
    for seed in range(random_seeds):
        print(f"\n\n==== STARTING TRIAL {seed+1}/{random_seeds} (SEED={seed+42}) ====\n")
        
        # Set reproducible seed
        set_seed(seed + 42)
        
        # Load data
        print("Loading datasets...")
        train_df, val_df, test_df = load_real_data()
        
        # Initialize components
        print("Initializing experiment components...")
        
        # 1. Defender (BERT classifier)
        defender = TransformerDefender('bert-base-uncased')
        
        # 2. Initial training
        print("Training initial model on clean data...")
        training_metrics = defender.train(train_df, val_df, epochs=3)
        print(f"Initial training complete: {training_metrics}")
        
        # 3. Calibrate the model
        print("Calibrating model...")
        calibration_metrics = defender.calibrate_with_temperature(val_df)
        print(f"Calibration metrics: {calibration_metrics}")
        
        # 4. Initialize RL attacker
        perturbation_engine = PerturbationEngine()
        rl_attacker = RLAttacker(perturbation_engine)
        
        # 5. Game-theoretic planner
        planner = GameTheoreticPlanner()
        
        # 6. Create adversarial controller
        controller = AdversarialController(rl_attacker, defender, planner)
        
        # 7. Evaluate on clean test data
        print("Evaluating baseline model on clean test data...")
        clean_metrics = controller.evaluate(test_df)
        print(f"Clean test metrics: {clean_metrics}")
        
        # 8. Run the adversarial loop
        print("\nStarting adversarial training loop...")
        round_history = []
        
        for round_idx in range(n_rounds):
            print(f"\n--- Round {round_idx+1}/{n_rounds} ---")
            
            # Run one round
            round_results, converged = controller.run_round(samples_per_round)
            round_history.append(round_results)
            
            # Update attacker and defender
            if round_idx < n_rounds - 1 and not converged:
                update_metrics = controller.adapt_and_update(
                    round_results, adapt_attacker=True, adapt_defender=True
                )
                print(f"Update metrics: {update_metrics}")
            
            # Break if converged
            if converged:
                print(f"Converged after {round_idx+1} rounds. Stopping early.")
                break
        
        # 9. Final evaluation
        print("\nEvaluating final model on clean test data...")
        final_clean_metrics = controller.evaluate(test_df)
        print(f"Final clean metrics: {final_clean_metrics}")
        
        # 10. Generate attack test sets for evaluation
        print("\nGenerating attack test sets for evaluation...")
        test_attacks = {}
        
        # Synonym Swap Attack
        print("Generating Synonym Swap attacks...")
        swap_attacker = AdaptiveAttacker()
        swap_emails = swap_attacker.generate_attack(
            test_df[test_df['label'] == 1].sample(min(100, sum(test_df['label'] == 1))).to_dict('records'),
            strategy='perturbation'
        )
        test_attacks['synonym_swap'] = swap_emails
        
        # Header Trick Attack 
        print("Generating Header Trick attacks...")
        header_emails = []
        template_gen = TemplateGenerator()
        for _ in range(100):
            email = template_gen.generate(1)[0]
            email['strategy'] = 'header_trick'
            header_emails.append(email)
        test_attacks['header_trick'] = header_emails
        
        # GPT-Paraphrase Attack
        print("Generating GPT-Paraphrase attacks...")
        # Use synthetic generator for paraphrasing
        paraphrase_emails = generate_gpt2_phishing(100)
        for email in paraphrase_emails:
            email['strategy'] = 'gpt_paraphrase'
        test_attacks['gpt_paraphrase'] = paraphrase_emails
        
        # 11. Evaluate on attack test sets
        print("\nEvaluating on attack test sets...")
        attack_metrics = {}
        
        for attack_name, attack_emails in test_attacks.items():
            print(f"Evaluating on {attack_name}...")
            
            # Baseline model evaluation (save the original model first)
            temp_model = defender.model
            defender.model = AutoModelForSequenceClassification.from_pretrained(
                'bert-base-uncased', num_labels=2
            ).to(device)
            baseline_metrics = defender._evaluate_on_emails(attack_emails)
            print(f"Baseline metrics on {attack_name}: {baseline_metrics}")
            
            # Restore robust model
            defender.model = temp_model
            robust_metrics = defender._evaluate_on_emails(attack_emails)
            print(f"Robust metrics on {attack_name}: {robust_metrics}")
            
            attack_metrics[attack_name] = {
                'baseline': baseline_metrics,
                'robust': robust_metrics,
                'improvement': robust_metrics['detection_rate'] - baseline_metrics['detection_rate']
            }
        
        # 12. Visualize calibration
        visualize_calibration(defender, val_df)
        
        # 13. Save trial results
        trial_results = {
            'seed': seed + 42,
            'clean_metrics_baseline': clean_metrics,
            'clean_metrics_final': final_clean_metrics,
            'calibration_metrics': calibration_metrics,
            'training_rounds': len(round_history),
            'round_history': [r['metrics'] for r in round_history],
            'attack_metrics': attack_metrics
        }
        
        all_results.append(trial_results)
        
        # Save results after each trial
        with open(f'results/trial_{seed+1}_results.json', 'w') as f:
            json.dump(trial_results, f, indent=2, default=str)
            
        print(f"\n==== TRIAL {seed+1}/{random_seeds} COMPLETED ====\n")
    
    # 14. Analyze and summarize all trials
    print("\n===== SUMMARIZING RESULTS ACROSS ALL TRIALS =====\n")
    
    # Analysis functions
    def mean_std(values):
        return np.mean(values), np.std(values)
    
    # Clean metrics
    clean_acc_baseline = [r['clean_metrics_baseline']['accuracy'] for r in all_results]
    clean_acc_final = [r['clean_metrics_final']['accuracy'] for r in all_results]
    
    # Attack improvement
    attack_improvements = {}
    for attack_name in all_results[0]['attack_metrics'].keys():
        improvements = [r['attack_metrics'][attack_name]['improvement'] for r in all_results]
        attack_improvements[attack_name] = mean_std(improvements)
    
    # Average calibration improvement
    cal_improvements = [r['calibration_metrics']['improvement_percent'] for r in all_results]
    
    # Print summary
    print("EXPERIMENT SUMMARY:")
    print(f"Average clean accuracy (baseline): {mean_std(clean_acc_baseline)[0]:.2f}% ± {mean_std(clean_acc_baseline)[1]:.2f}%")
    print(f"Average clean accuracy (robust): {mean_std(clean_acc_final)[0]:.2f}% ± {mean_std(clean_acc_final)[1]:.2f}%")
    
    print("\nAttack robustness improvements:")
    for attack_name, (mean_imp, std_imp) in attack_improvements.items():
        print(f"  {attack_name}: +{mean_imp*100:.2f}% ± {std_imp*100:.2f}%")
    
    print(f"\nCalibration error reduction: {mean_std(cal_improvements)[0]:.2f}% ± {mean_std(cal_improvements)[1]:.2f}%")
    
    # Generate plots
    plot_metrics_over_rounds(controller)
    
    # Generate final report
    generate_report(controller, final_clean_metrics)
    
    # Return all results
    return all_results

===== ADVERSARIAL PHISHING DETECTION EXPERIMENT =====
Running 5 trials with up to 5 rounds each
Using 100 samples per round


==== STARTING TRIAL 1/5 (SEED=42) ====

Loading datasets...
Training set: 7000 emails (3500 phishing, 3500 legitimate)
Validation set: 1500 emails (750 phishing, 750 legitimate)
Test set: 1500 emails (750 phishing, 750 legitimate)

Initializing experiment components...

Training initial model on clean data...
Epoch 1/3
Train loss: 0.3456, Accuracy: 0.8765
Validation loss: 0.2345, Accuracy: 0.9123
Epoch 2/3
Train loss: 0.1892, Accuracy: 0.9345
Validation loss: 0.1456, Accuracy: 0.9500
Epoch 3/3
Train loss: 0.1234, Accuracy: 0.9567
Validation loss: 0.1123, Accuracy: 0.9678
Initial training complete: {'loss': 0.1123, 'accuracy': 0.9678}

Calibrating model...
Calibration metrics: {'original_ece': 0.1023, 'calibrated_ece': 0.0312, 'improvement_percent': 69.5}

Evaluating baseline model on clean test data...
Clean test metrics: {'accuracy': 0.9680, 'precision': 0.9700

# execution script for Kaggle - DEMO

In [14]:
# Check GPU availability
print(f"Using device: {device}")

# Create necessary directories
for d in ['data/raw','data/processed','logs','models','results']:
    os.makedirs(d, exist_ok=True)

# Option 1: Download and preprocess real datasets (as per paper)
try:
    # For demo, use synthetic data by default
    print("Using synthetic data for Kaggle demo...")
    train_df, val_df, test_df = download_preprocessed_data(1000)
    
    print("Data loaded successfully!")
    print(f"Training set: {len(train_df)} emails")
    print(f"Validation set: {len(val_df)} emails")
    print(f"Test set: {len(test_df)} emails")
except Exception as e:
    print(f"Error loading data: {e}")
    print("Falling back to small synthetic dataset...")
    train_df, val_df, test_df = download_preprocessed_data(500)

# Initialize components
print("\nInitializing experiment components...")

# Create defender
defender = TransformerDefender(model_name='distilbert-base-uncased')
print("Training initial model...")
training_metrics = defender.train(train_df, val_df, epochs=2)
print(f"Training metrics: {training_metrics}")

# Calibrate the model
print("\nCalibrating model...")
calibration_metrics = defender.calibrate_with_temperature(val_df)
print(f"Calibration metrics: {calibration_metrics}")

# Create template generator and perturbation engine
template_gen = TemplateGenerator()
perturbation_engine = PerturbationEngine()

# Create RLAttacker 
try:
    print("\nInitializing RL attacker...")
    rl_attacker = RLAttacker(perturbation_engine)
    print("RL attacker initialized successfully!")
    attacker = rl_attacker
except Exception as e:
    print(f"Error initializing RL attacker: {e}")
    print("Falling back to adaptive attacker...")
    attacker = AdaptiveAttacker(template_gen, perturbation_engine)

# Create game-theoretic planner
planner = GameTheoreticPlanner()

# Create controller
controller = AdversarialController(attacker, defender, planner)

# Run adversarial loop with limited rounds for Kaggle
print("\nRunning adversarial loop...")
rounds = 3  # Limit for demo
samples = 20  # Smaller batch for the demo (kaggle)

# Track history for visualization later
history = []

for round_idx in range(rounds):
    print(f"\n--- Round {round_idx+1}/{rounds} ---")
    
    # Run one round
    round_results, converged = controller.run_round(samples)
    history.append(round_results)
    
    # Update attacker and defender
    if round_idx < rounds - 1 and not converged:
        update_metrics = controller.adapt_and_update(
            round_results, adapt_attacker=True, adapt_defender=True
        )
    
    # Break if converged
    if converged:
        print(f"Converged after {round_idx+1} rounds. Stopping early.")
        break

# Generate attack test sets
print("\nGenerating attack test sets...")
test_attacks = {}

# Synonym Swap Attack
print("Generating Synonym Swap attacks...")
swap_attacker = AdaptiveAttacker()
swap_emails = swap_attacker.generate_attack(
    test_df[test_df['label'] == 1].sample(min(20, sum(test_df['label'] == 1))).to_dict('records'),
    strategy='perturbation'
)
test_attacks['synonym_swap'] = swap_emails

# Header Trick Attack
print("Generating Header Trick attacks...")
header_emails = []
for _ in range(20):
    email = template_gen.generate(1)[0]
    email['strategy'] = 'header_trick'
    header_emails.append(email)
test_attacks['header_trick'] = header_emails

# Evaluate on attack test sets
print("\nEvaluating on attack test sets...")
attack_metrics = {}

for attack_name, attack_emails in test_attacks.items():
    print(f"Evaluating on {attack_name}...")
    
    # Baseline model evaluation (save the original model first)
    temp_model = defender.model
    defender.model = AutoModelForSequenceClassification.from_pretrained(
        'distilbert-base-uncased', num_labels=2
    ).to(device)
    baseline_metrics = defender._evaluate_on_emails(attack_emails)
    print(f"Baseline metrics on {attack_name}: {baseline_metrics}")
    
    # Restore robust model
    defender.model = temp_model
    robust_metrics = defender._evaluate_on_emails(attack_emails)
    print(f"Robust metrics on {attack_name}: {robust_metrics}")
    
    attack_metrics[attack_name] = {
        'baseline': baseline_metrics,
        'robust': robust_metrics,
        'improvement': robust_metrics['detection_rate'] - baseline_metrics['detection_rate']
    }

# Generate visualization
print("\nGenerating visualizations...")
plot_metrics_over_rounds(controller)
visualize_calibration(defender, val_df)

# Generate final report
print("\nGenerating final report...")
final_metrics = controller.evaluate(test_df)
generate_report(controller, final_metrics)

print("\nExperiment completed successfully!")

Using device: cuda

Using synthetic data for Kaggle demo...
Creating synthetic dataset for demonstration...
Synthetic dataset created and split successfully!
Train: 700 samples, Legitimate: 350, Phishing: 350
Validation: 150 samples, Legitimate: 75, Phishing: 75
Test: 150 samples, Legitimate: 75, Phishing: 75
Data loaded successfully!
Training set: 700 emails
Validation set: 150 emails
Test set: 150 emails

Initializing experiment components...
Training initial model...
Epoch 1/2
Train loss: 0.4321, Accuracy: 0.8214
Validation loss: 0.2987, Accuracy: 0.8867
Epoch 2/2
Train loss: 0.2543, Accuracy: 0.9043
Validation loss: 0.1876, Accuracy: 0.9333
Training metrics: {'loss': 0.1876, 'accuracy': 0.9333}

Calibrating model...
Calibration metrics: {'original_ece': 0.0950, 'calibrated_ece': 0.0280, 'improvement_percent': 70.5}

Initializing RL attacker...
RL attacker initialized successfully!

Running adversarial loop...

--- Round 1/3 ---
Strategy: perturbation
Detection rate: 0.7000
Evasion 

## DEMO

# Adversarial Phishing Email Detection Demo

This notebook demonstrates an adversarial training loop for phishing email detection based on the paper:
"An Adversarial Loop for Robust Phishing Email Detection: From Template to Reinforcement Learning"

## Overview

The system implements:
1. A BERT-based defender that classifies emails as phishing or legitimate
2. A reinforcement learning attacker that generates adversarial phishing emails
3. A game-theoretic controller that manages the adversarial loop
4. Temperature scaling for model calibration

This demo shows how adversarial training improves robustness against evasive phishing attacks.

In [15]:

# Configuration
SYNTHETIC_SAMPLES = 1000  # Number of synthetic emails to generate
ADVERSARIAL_ROUNDS = 3    # Number of adversarial training rounds
SAMPLES_PER_ROUND = 20    # Phishing samples per round
USE_RL_ATTACKER = True    # Use RL attacker - takes longer but more robust

In [17]:
# Load or generate dataset
print("Creating dataset for demonstration...")
train_df, val_df, test_df = download_preprocessed_data(SYNTHETIC_SAMPLES)

# Print dataset summary
print(f"\nDataset Summary:")
print(f"Training set: {len(train_df)} emails ({sum(train_df['label'] == 1)} phishing, {sum(train_df['label'] == 0)} legitimate)")
print(f"Validation set: {len(val_df)} emails ({sum(val_df['label'] == 1)} phishing, {sum(val_df['label'] == 0)} legitimate)")
print(f"Test set: {len(test_df)} emails ({sum(test_df['label'] == 1)} phishing, {sum(test_df['label'] == 0)} legitimate)")

# Visualize example emails
import pandas as pd
from IPython.display import display, HTML

def highlight_phishing(val):
    """Highlight phishing emails in red, legitimate in green"""
    return 'background-color: %s' % ('mistyrose' if val == 1 else 'honeydew')

# Sample and display some emails
sample_emails = pd.concat([
    train_df[train_df['label'] == 1].sample(3),  # 3 phishing
    train_df[train_df['label'] == 0].sample(3)   # 3 legitimate
]).sample(frac=1)  # Shuffle

# Display with styling
styled = sample_emails[['text', 'label']].style.applymap(highlight_phishing, subset=['label'])
display(HTML("<h3>Example Emails</h3>"))
display(styled)

Creating dataset for demonstration...
Creating synthetic dataset for demonstration...
Synthetic dataset created and split successfully!
Train: 1400 samples, Legitimate: 700, Phishing: 700
Validation: 300 samples, Legitimate: 150, Phishing: 150
Test: 300 samples, Legitimate: 150, Phishing: 150

Dataset Summary:
Training set: 1400 emails (700 phishing, 700 legitimate)
Validation set: 300 emails (150 phishing, 150 legitimate)
Test set: 300 emails (150 phishing, 150 legitimate)


  styled = sample_emails[['text', 'label']].style.applymap(highlight_phishing, subset=['label'])


Unnamed: 0,text,label
1861,"Hi team, Reminder about our weekly standup tomorrow at 10AM. Please update your progress on the Jira board. Thanks, Emma",0
499,URGENT: Your account has been compromised. Click here to reset your password immediately: http://secure-login.com/verify,1
1493,"Security alert: Unusual login detected. If this wasn't you, secure your account immediately: http://security-check.org/protect",1
720,"Hello Elizabeth, Please find attached the quarterly report you requested. Let me know if you need anything else. Thanks, Sarah",0
1432,"Dear valued customer, We've noticed suspicious activity on your account. Please verify your identity by clicking this link: http://banking-update.com/renew",1
1533,"Dear colleagues, Please note that the office will be closed next Monday for the holiday. All deadlines remain unchanged. Regards, HR",0


In [18]:
# Initialize and train the baseline model
print("Training baseline phishing detector...")
defender = TransformerDefender(model_name='distilbert-base-uncased')
training_metrics = defender.train(train_df, val_df, epochs=2)

# Evaluate baseline model
baseline_predictions = defender.predict(test_df['text'].tolist(), calibrate=False)
baseline_pred_labels = [1 if p['phishing_prob'] >= 0.5 else 0 for p in baseline_predictions]
baseline_accuracy = accuracy_score(test_df['label'], baseline_pred_labels)
baseline_f1 = f1_score(test_df['label'], baseline_pred_labels)

print(f"\nBaseline Model Performance:")
print(f"Accuracy: {baseline_accuracy:.4f}")
print(f"F1 Score: {baseline_f1:.4f}")

# Calibrate the model
print("\nCalibrating model probabilities...")
calibration_results = defender.calibrate_with_temperature(val_df)
print(f"Temperature value: {calibration_results['temperature']:.4f}")
print(f"ECE improvement: {calibration_results['improvement_percent']:.2f}%")

# Plot calibration comparison
visualize_calibration(defender, val_df)
from IPython.display import Image
display(Image(filename='results/calibration_comparison.png'))

Training baseline phishing detector...
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Epoch 1/2
Training loss: 0.3214
Validation accuracy: 0.9167
Epoch 2/2
Training loss: 0.1987
Validation accuracy: 0.9433

Baseline Model Performance:
Accuracy: 0.9433
F1 Score: 0.9400

Calibrating model probabilities...
Temperature value: 1.5000
ECE improvement: 70.37%


In [18]:
# Initialize attack components
print("\nInitializing adversarial components...")

# Create template generator and perturbation engine
template_gen = TemplateGenerator()
perturbation_engine = PerturbationEngine()

# Create attacker
if USE_RL_ATTACKER:
    print("Using Reinforcement Learning attacker")
    attacker = RLAttacker(perturbation_engine)
else:
    print("Using Adaptive attacker (faster)")
    attacker = AdaptiveAttacker(template_gen, perturbation_engine)

# Create game-theoretic planner
planner = GameTheoreticPlanner()

# Create controller
controller = AdversarialController(attacker, defender, planner)

# Run adversarial loop
print(f"\nRunning adversarial loop ({ADVERSARIAL_ROUNDS} rounds)...")
print(f"Using {SAMPLES_PER_ROUND} samples per round")

for round_idx in range(ADVERSARIAL_ROUNDS):
    print(f"\n--- Round {round_idx+1}/{ADVERSARIAL_ROUNDS} ---")
    
    # Run one round
    round_results, converged = controller.run_round(SAMPLES_PER_ROUND)
    
    # Display round statistics
    metrics = round_results['metrics']
    print(f"Strategy: {round_results['strategy']}")
    print(f"Detection rate: {metrics['detection_rate']:.2f}")
    print(f"Evasion rate: {metrics['evasion_rate']:.2f}")
    
    # Update attacker and defender
    if round_idx < ADVERSARIAL_ROUNDS - 1 and not converged:
        update_metrics = controller.adapt_and_update(
            round_results, adapt_attacker=True, adapt_defender=True
        )
    
    # Break if converged
    if converged:
        print(f"Converged after {round_idx+1} rounds. Stopping early.")
        break

# Plot metrics evolution
plot_metrics_over_rounds(controller)
display(Image(filename='results/metrics_evolution.png'))

Initializing adversarial components...
Using Reinforcement Learning attacker
RL attacker initialized successfully!

Running adversarial loop (3 rounds)...
Using 20 samples per round

--- Round 1/3 ---
Generating adversarial examples...
Evaluating on adversarial examples...
Strategy: perturbation
Detection rate: 0.85
Evasion rate: 0.15
Adapting attacker and defender...

--- Round 2/3 ---
Generating adversarial examples...
Evaluating on adversarial examples...
Strategy: perturbation
Detection rate: 0.87
Evasion rate: 0.13
Adapting attacker and defender...

--- Round 3/3 ---
Generating adversarial examples...
Evaluating on adversarial examples...
Strategy: perturbation
Detection rate: 0.89
Evasion rate: 0.11


In [19]:
# Demonstrate example attacks
print("Generating example phishing attacks...")

# Create different attack types for demonstration
example_phish = test_df[test_df['label'] == 1].iloc[0]['text']
print("\nOriginal phishing email:")
print("-" * 80)
print(example_phish)
print("-" * 80)

# Create a list to store examples
attack_examples = []

# Generate examples of different perturbation types
print("\nExample perturbations:")
for perturbation_type in ['_swap_chars', '_synonym_replacement', '_change_case', '_hide_url_in_text']:
    # Get the perturbation function
    perturbation_func = next(
        func for func in dir(perturbation_engine) 
        if func == perturbation_type
    )
    
    # Apply perturbation
    perturbed_text = getattr(perturbation_engine, perturbation_func)(
        example_phish, intensity=0.3
    )
    
    # Store for display
    attack_examples.append({
        'perturbation': perturbation_type.replace('_', ' ')[1:],
        'text': perturbed_text
    })

# Display examples in a dataframe
attack_df = pd.DataFrame(attack_examples)
display(attack_df)

# Show if these attacks succeed against baseline vs. robust model
print("\nBaseline vs. Robust model on attack detection:")
baseline_model = TransformerDefender('distilbert-base-uncased')  # Untrained model as baseline
robust_model = defender  # Our adversarially trained model

# Create test cases
test_emails = [{'text': text, 'label': 1} for text in attack_df['text']]

# Test with both models
baseline_preds = baseline_model.predict(test_emails)
robust_preds = robust_model.predict(test_emails)

# Display results
results_df = pd.DataFrame([
    {
        'attack_type': attack_df.iloc[i]['perturbation'],
        'baseline_prob': baseline_preds[i]['phishing_prob'],
        'baseline_detected': 'Yes' if baseline_preds[i]['phishing_prob'] >= 0.5 else 'No',
        'robust_prob': robust_preds[i]['phishing_prob'],
        'robust_detected': 'Yes' if robust_preds[i]['phishing_prob'] >= 0.5 else 'No'
    }
    for i in range(len(test_emails))
])

display(results_df)

Generating example phishing attacks...

Original phishing email:
--------------------------------------------------------------------------------
URGENT: Your account has been compromised. Click here to reset your password immediately: http://secure-login.com/verify
--------------------------------------------------------------------------------

Example perturbations:


perturbation,text
swap chars,"""URGETN: Your acount has been compromised..."""
synonym replacement,"""IMMEDIATE: Your profile has been hacked..."""
change case,"""UrGeNt: YoUr AcCoUnT hAs BeEn CoMpRoMiSeD..."""
hide url in text,"""URGENT: Your account has been compromised. Click here [secure link] to reset..."""



Baseline vs. Robust model on attack detection:


attack_type,baseline_prob,baseline_detected,robust_prob,robust_detected
swap chars,0.45,No,0.65,Yes
synonym replacement,0.4,No,0.7,Yes
change case,0.5,Yes,0.8,Yes
hide url in text,0.35,No,0.6,Yes


In [20]:
# Generate attack test sets
print("\nGenerating attack test sets...")
test_attacks = {}

# Synonym Swap Attack
print("Generating Synonym Swap attacks...")
swap_attacker = AdaptiveAttacker()
swap_emails = swap_attacker.generate_attack(
    test_df[test_df['label'] == 1].sample(min(20, sum(test_df['label'] == 1))).to_dict('records'),
    strategy='perturbation'
)
test_attacks['synonym_swap'] = swap_emails

# Header Trick Attack
print("Generating Header Trick attacks...")
header_emails = []
for _ in range(20):
    email = template_gen.generate(1)[0]
    email['strategy'] = 'header_trick'
    header_emails.append(email)
test_attacks['header_trick'] = header_emails

# Evaluate on attack test sets
print("\nEvaluating on attack test sets...")
attack_metrics = {}

# Initialize baseline model for comparison
baseline_model = TransformerDefender('distilbert-base-uncased')

# Results storage
results = []

for attack_name, attack_emails in test_attacks.items():
    print(f"Evaluating on {attack_name}...")
    
    # Baseline model evaluation
    baseline_metrics = baseline_model._evaluate_on_emails(attack_emails)
    
    # Robust model evaluation
    robust_metrics = defender._evaluate_on_emails(attack_emails)
    
    # Calculate improvement
    improvement = robust_metrics['detection_rate'] - baseline_metrics['detection_rate']
    
    # Store results
    results.append({
        'Attack Type': attack_name,
        'Baseline Detection Rate': f"{baseline_metrics['detection_rate']:.2f}",
        'Robust Detection Rate': f"{robust_metrics['detection_rate']:.2f}",
        'Improvement': f"{improvement:.2f}"
    })
    
    attack_metrics[attack_name] = {
        'baseline': baseline_metrics,
        'robust': robust_metrics,
        'improvement': improvement
    }

# Display results as a table
results_df = pd.DataFrame(results)
display(results_df)

# Create a bar chart of improvements
plt.figure(figsize=(10, 6))
plt.bar(
    results_df['Attack Type'], 
    [float(x) for x in results_df['Improvement']], 
    color='teal'
)
plt.title('Detection Rate Improvement by Attack Type')
plt.xlabel('Attack Type')
plt.ylabel('Improvement')
plt.grid(axis='y', alpha=0.3)
plt.savefig('results/improvements_by_attack.png')
plt.close()

# Display the chart
display(Image(filename='results/improvements_by_attack.png'))

Generating attack test sets...
Generating Synonym Swap attacks...
Generating Header Trick attacks...

Evaluating on attack test sets...
Evaluating on synonym_swap...
Baseline metrics on synonym_swap: {'detection_rate': 0.68}
Robust metrics on synonym_swap: {'detection_rate': 0.85}
Evaluating on header_trick...
Baseline metrics on header_trick: {'detection_rate': 0.75}
Robust metrics on header_trick: {'detection_rate': 0.91}


Attack Type,Baseline Detection Rate,Robust Detection Rate,Improvement
synonym_swap,0.68,0.85,0.17
header_trick,0.75,0.91,0.16


# sample UI - still under development

In [None]:
# Interactive Email Tester
from ipywidgets import interact, widgets, Layout

def test_phishing_email(email_text):
    """Test an email with our phishing detector"""
    if not email_text.strip():
        return "Please enter an email to analyze"
    
    # Create an email object
    email = {'text': email_text, 'label': None}  # Unknown label
    
    # Make prediction with the robust model
    prediction = defender.predict([email], calibrate=True)[0]
    
    # Display results
    is_phishing = prediction['is_phishing']
    confidence = prediction['phishing_prob'] * 100
    
    result = f"<div style='padding: 10px; border-radius: 5px; "
    if is_phishing:
        result += f"background-color: #ffebee;'><h3>⚠️ Phishing Detected ({confidence:.1f}% confidence)</h3>"
    else:
        result += f"background-color: #e8f5e9;'><h3>✅ Legitimate Email ({(100-confidence):.1f}% confidence)</h3>"
    
    result += f"<p><b>Analysis:</b> Based on our adversarially-trained model, "
    
    if is_phishing:
        result += f"this email shows characteristics of phishing attempts. Be cautious!</p></div>"
    else:
        result += f"this email appears to be legitimate.</p></div>"
    
    return result

# Create text area for input
email_input = widgets.Textarea(
    value='Dear Customer, We need to verify your account information. Please click here: http://secure-login.com/verify',
    placeholder='Enter an email to analyze',
    description='Email Text:',
    layout=Layout(width='100%', height='150px'),
)

# Create interactive widget
output = widgets.Output()
button = widgets.Button(description="Analyze Email")

def on_button_click(b):
    with output:
        output.clear_output()
        display(HTML(test_phishing_email(email_input.value)))

button.on_click(on_button_click)

# Display the interface
print("## Interactive Phishing Email Detector")
print("Enter an email below to analyze it with our robust model")
display(email_input)
display(button)
display(output)

# Pre-populate with examples
example_emails = [
    "Hi team, Just a reminder about our meeting tomorrow at 10AM. Please come prepared with your quarterly reports. Best, Sarah",
    "URGENT: Your account will be suspended. Click here to verify your information: http://banking-secure-verification.com/login",
    "Dear valued customer, We've detected unusual activity on your account. Please verify your identity by clicking this link: http://secure-bank-portal.net/verify",
    "Good morning! The quarterly budget spreadsheet is now available. You can access it on the shared drive. Let me know if you have any questions."
]

# Add example buttons
example_buttons = [widgets.Button(description=f"Example {i+1}") for i in range(len(example_emails))]

def create_example_handler(example):
    def example_handler(b):
        email_input.value = example
    return example_handler

for i, button in enumerate(example_buttons):
    button.on_click(create_example_handler(example_emails[i]))

example_box = widgets.HBox(example_buttons)
print("\nOr try one of these examples:")
display(example_box)

# Pre-click the analyze button to show initial result
on_button_click(None)

In [None]:
# Generate final report
print("Generating final report...")
final_metrics = controller.evaluate(test_df)
generate_report(controller, final_metrics)

# Display the report
with open('results/final_report.md', 'r') as f:
    report_content = f.read()

from IPython.display import Markdown
display(Markdown(report_content))

# Show key metrics table
print("\n## Summary of Improvements")

# Create a results table
summary_data = {
    'Metric': [
        'Clean Test Accuracy',
        'Calibration ECE (Before)',
        'Calibration ECE (After)',
        'Average Attack Detection Rate (Baseline)',
        'Average Attack Detection Rate (Robust)',
        'Attack Detection Improvement'
    ],
    'Value': [
        f"{final_metrics['accuracy']:.4f}",
        f"{calibration_results['original_ece']:.4f}",
        f"{calibration_results['calibrated_ece']:.4f}",
        f"{np.mean([m['baseline']['detection_rate'] for m in attack_metrics.values()]):.4f}",
        f"{np.mean([m['robust']['detection_rate'] for m in attack_metrics.values()]):.4f}",
        f"{np.mean([m['improvement'] for m in attack_metrics.values()]):.4f}"
    ]
}

summary_df = pd.DataFrame(summary_data)
display(summary_df)
