## Adversarial Attacks Evaluation

Evaluate the robustness of the dense baseline and the sparse lottery tickets under adversarial attacks (FGSM and PGD).

In [None]:
import sys
sys.path.append('..')

import numpy as np
import matplotlib.pyplot as plt
import torch
import json
from pathlib import Path

from src.utils.config import load_config, get_device
from src.models.deep_hedging import create_model
from src.models.losses import create_loss_function
from src.data.heston import get_or_generate_dataset
from src.data.preprocessor import create_dataloaders, compute_features
from src.attacks.fgsm import create_fgsm_attack
from src.attacks.pgd import create_pgd_attack
from src.evaluation.metrics import compute_all_metrics, compute_robustness_metrics

### Setup

In [None]:
# Load the experiment configuration and select the compute device
config = load_config('../configs/config.yaml')
device = get_device(config)
print(f"Using device: {device}")

# Strike, horizon and time grid shared by the cells below
data_config = config['data']
heston_config = data_config['heston']
K = heston_config['K']
T = data_config['T']
n_steps = data_config['n_steps']
dt = T / n_steps

# Fetch the test split (cache directory comes from the config, default 'cache')
caching_config = config.get('caching', {})
cache_dir = caching_config.get('directory', 'cache')
S_test, v_test, Z_test = get_or_generate_dataset(config, 'test', cache_dir)

# Batch size falls back to 256 when the config has no training.batch_size
training_config = config.get('training', {})
batch_size = training_config.get('batch_size', 256)

# create_dataloaders takes train/val/test splits, but only the test loader is
# kept here, so the first 1000 test paths stand in as dummy train/val data.
test_split = (S_test, v_test, Z_test)
dummy_train = tuple(arr[:1000] for arr in test_split)
dummy_val = tuple(arr[:1000] for arr in test_split)
_, _, test_loader = create_dataloaders(
    *dummy_train,
    *dummy_val,
    *test_split,
    batch_size=batch_size
)

print(f"Test set: {S_test.shape[0]} paths")
print(f"K={K}, T={T}, n_steps={n_steps}, dt={dt:.6f}")

### Helper Functions

In [None]:
def evaluate_robustness(model, loss_fn, test_loader, config, device):
    """
    Compare hedging PnL on clean inputs with PnL under FGSM and PGD attacks.

    For every batch the model is evaluated three times: on the clean
    features, on FGSM-perturbed features and on PGD-perturbed features.
    The per-batch PnLs are concatenated and summarised with
    ``compute_all_metrics``.

    Args:
        model: Network called as ``model(features, S)``; it is put into
            eval mode by this function.
        loss_fn: Object exposing ``compute_pnl(deltas, S, Z, dt)``; also
            passed to the attack factories.
        test_loader: Iterable yielding ``(S, v, Z)`` batches.
        config: Configuration providing ``data.heston.K``, ``data.T`` and
            ``data.n_steps``; also passed to the attack factories.
        device: Device every batch is moved to before evaluation.

    Returns:
        Dictionary with the metric dictionaries under 'clean', 'fgsm' and
        'pgd', plus 'fgsm_gap' / 'pgd_gap' (clean minus attacked
        'pnl_mean') and 'fgsm_cvar_gap' / 'pgd_cvar_gap' (clean minus
        attacked 'cvar_05').
    """
    data_cfg = config['data']
    K = data_cfg['heston']['K']
    T = data_cfg['T']
    dt = T / data_cfg['n_steps']

    model.eval()

    # Insertion order fixes the per-batch attack order: FGSM first, then PGD
    attacks = {
        'fgsm': create_fgsm_attack(model, loss_fn, config),
        'pgd': create_pgd_attack(model, loss_fn, config),
    }
    pnl_batches = {'clean': [], 'fgsm': [], 'pgd': []}

    def record_pnl(tag, inputs, S, Z):
        # Gradients are only needed while crafting the attack, not for scoring
        with torch.no_grad():
            deltas, _ = model(inputs, S)
            pnl_batches[tag].append(loss_fn.compute_pnl(deltas, S, Z, dt).cpu())

    for S, v, Z in test_loader:
        S, v, Z = S.to(device), v.to(device), Z.to(device)
        features = compute_features(S, v, K, T, dt)

        record_pnl('clean', features, S, Z)

        for tag, attack in attacks.items():
            # The second value returned by attack() is not used here
            perturbed_features, _ = attack.attack(features, S, Z, dt)
            record_pnl(tag, perturbed_features, S, Z)

    # One flat PnL array per scenario, then the summary metrics
    metrics = {
        tag: compute_all_metrics(torch.cat(chunks).numpy())
        for tag, chunks in pnl_batches.items()
    }
    clean, fgsm, pgd = metrics['clean'], metrics['fgsm'], metrics['pgd']

    return {
        'clean': clean,
        'fgsm': fgsm,
        'pgd': pgd,
        'fgsm_gap': clean['pnl_mean'] - fgsm['pnl_mean'],
        'pgd_gap': clean['pnl_mean'] - pgd['pnl_mean'],
        'fgsm_cvar_gap': clean['cvar_05'] - fgsm['cvar_05'],
        'pgd_cvar_gap': clean['cvar_05'] - pgd['cvar_05']
    }

### Load Models

In [None]:
# Registry of every model to attack, keyed by the display name used in the
# tables and plots below.
models_to_test = {}
loss_fn = create_loss_function(config)


def _load_model(checkpoint_path):
    """Build a fresh model and load the weights saved at ``checkpoint_path``.

    The checkpoint must be a mapping that stores the weights under the
    'model_state_dict' key. The model is returned after being moved to
    ``device``.
    """
    model = create_model(config)
    # NOTE: torch.load can unpickle arbitrary objects (depending on the
    # PyTorch version / weights_only setting); only load trusted checkpoints.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    return model.to(device)


# Baseline dense model
baseline_path = Path('../experiments/baseline/checkpoints/best.pt')
if baseline_path.exists():
    model_dense = _load_model(baseline_path)
    models_to_test['dense_baseline'] = model_dense
    print(f"Loaded dense baseline from {baseline_path}")
else:
    print(f"Dense baseline not found at {baseline_path}")

# Sparse tickets at different sparsity levels; missing checkpoints are
# skipped so the notebook still runs on a partial set of experiments.
for sparsity in [50, 60, 70, 80, 90, 95]:
    ticket_path = Path(f'../experiments/pruning/sparsity_{sparsity}/checkpoints/best.pt')
    if not ticket_path.exists():
        print(f"Ticket {sparsity}% not found, skipping")
        continue
    models_to_test[f'ticket_{sparsity}%'] = _load_model(ticket_path)
    print(f"Loaded ticket {sparsity}% from {ticket_path}")

print(f"\nTotal models loaded: {len(models_to_test)}")

### Evaluate Robustness

In [None]:
# Attack every loaded model and collect its metrics under the model's name
results_attacks = {}

for model_name, model in models_to_test.items():
    print(f"\nEvaluating {model_name}...")

    results = evaluate_robustness(model, loss_fn, test_loader, config, device)
    results_attacks[model_name] = results

    # Short per-model report: CVaR in each scenario plus the PGD degradation
    clean_cvar = results['clean']['cvar_05']
    fgsm_cvar = results['fgsm']['cvar_05']
    pgd_cvar = results['pgd']['cvar_05']
    pgd_cvar_gap = results['pgd_cvar_gap']
    print(f"  Clean CVaR:      {clean_cvar:.6f}")
    print(f"  FGSM CVaR:       {fgsm_cvar:.6f}")
    print(f"  PGD CVaR:        {pgd_cvar:.6f}")
    print(f"  PGD CVaR Gap:    {pgd_cvar_gap:.6f}")

# Persist all results as JSON next to the other experiment outputs
output_dir = Path('../experiments/adversarial')
output_dir.mkdir(parents=True, exist_ok=True)
results_file = output_dir / 'attack_results.json'

with open(results_file, 'w') as f:
    # default=float is applied to any value json cannot serialise natively
    # (presumably NumPy scalars coming from the metrics)
    json.dump(results_attacks, f, indent=2, default=float)

print(f"\nResults saved to {results_file}")

### Results Summary

In [None]:
# Fixed-width table: one row per model with the 5% CVaR in each scenario
# and the clean-minus-PGD CVaR gap.
rule = "=" * 90

print("\n" + rule)
print("ADVERSARIAL ROBUSTNESS SUMMARY")
print(rule)
print(f"{'Model':<20} {'Clean CVaR':<15} {'FGSM CVaR':<15} {'PGD CVaR':<15} {'PGD Gap':<15}")
print("-" * 90)

for model_name, results in results_attacks.items():
    row_values = (
        results['clean']['cvar_05'],
        results['fgsm']['cvar_05'],
        results['pgd']['cvar_05'],
        results['pgd_cvar_gap'],
    )
    # 20-character name column followed by four 15-character numeric columns
    cells = [f"{model_name:<20}"] + [f"{value:<15.4f}" for value in row_values]
    print(" ".join(cells))

print(rule)

### Visualization

In [None]:
# Grouped bar chart: for each model, the 5% CVaR on clean inputs, under FGSM
# and under PGD, drawn side by side.
if results_attacks:
    model_names = list(results_attacks.keys())
    clean_cvars = [results_attacks[m]['clean']['cvar_05'] for m in model_names]
    fgsm_cvars = [results_attacks[m]['fgsm']['cvar_05'] for m in model_names]
    pgd_cvars = [results_attacks[m]['pgd']['cvar_05'] for m in model_names]

    x = np.arange(len(model_names))
    width = 0.25  # three bars per model, shifted by one bar width each

    fig, ax = plt.subplots(figsize=(14, 6))

    ax.bar(x - width, clean_cvars, width, label='Clean', color='#2563eb', alpha=0.8)
    ax.bar(x, fgsm_cvars, width, label='FGSM Attack', color='#f59e0b', alpha=0.8)
    ax.bar(x + width, pgd_cvars, width, label='PGD Attack', color='#dc2626', alpha=0.8)

    ax.set_xlabel('Model')
    ax.set_ylabel('CVaR 5%')
    ax.set_title('Adversarial Robustness: Clean vs FGSM vs PGD')
    ax.set_xticks(x)
    ax.set_xticklabels(model_names, rotation=45, ha='right')
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')

    plt.tight_layout()

    # savefig does not create missing directories, so make sure the target
    # folder exists (same approach as for the JSON results directory).
    figures_dir = Path('../figures')
    figures_dir.mkdir(parents=True, exist_ok=True)
    plt.savefig(figures_dir / 'robustness_comparison.pdf', dpi=300)
    plt.show()
else:
    print("No models to visualize")

### Robustness Gap vs Sparsity

In [None]:
# Robustness gap vs sparsity: PGD CVaR gap (clean minus attacked) of every
# ticket, plotted against its sparsity level.
if len(results_attacks) > 1:
    # Extract sparsity levels from ticket names of the form 'ticket_<N>%'
    sparsities = []
    pgd_gaps = []

    for model_name, results in results_attacks.items():
        if 'ticket' in model_name:
            sparsity = int(model_name.split('_')[1].replace('%', ''))
            sparsities.append(sparsity)
            pgd_gaps.append(results['pgd_cvar_gap'])

    if sparsities:
        fig, ax = plt.subplots(figsize=(10, 6))

        ax.plot(sparsities, pgd_gaps, 'o-', linewidth=2, markersize=8, color='#dc2626')

        # Add baseline gap if available
        if 'dense_baseline' in results_attacks:
            baseline_gap = results_attacks['dense_baseline']['pgd_cvar_gap']
            ax.axhline(baseline_gap, color='#6b7280', linestyle='--',
                       label=f'Dense Baseline ({baseline_gap:.4f})')
            # The baseline line is the only labelled artist; calling legend()
            # without it would only produce an empty-legend warning.
            ax.legend()

        ax.set_xlabel('Sparsity (%)')
        ax.set_ylabel('PGD CVaR Gap')
        ax.set_title('Robustness Degradation vs Sparsity')
        ax.grid(True, alpha=0.3)

        plt.tight_layout()

        # savefig does not create missing directories, so make sure the
        # target folder exists before writing the figure.
        figures_dir = Path('../figures')
        figures_dir.mkdir(parents=True, exist_ok=True)
        plt.savefig(figures_dir / 'robustness_vs_sparsity.pdf', dpi=300)
        plt.show()

### Trade-off Analysis: Clean Performance vs Robustness

In [None]:
# =============================================================================
# FIGURE: Trade-off Curve (Clean Performance vs Robustness - Pareto Front)
#   left  - scatter of clean CVaR vs PGD CVaR, one point per model
#   right - radar chart of five metrics, min-max normalised across models
# =============================================================================

if len(results_attacks) > 1:
    # The radar chart needs a polar axis, so the two panels are created
    # individually instead of through plt.subplots.
    fig = plt.figure(figsize=(14, 5))
    ax1 = fig.add_subplot(1, 2, 1)
    ax2 = fig.add_subplot(1, 2, 2, projection='polar')

    # Extract data
    model_names = list(results_attacks.keys())
    clean_cvars = [results_attacks[m]['clean']['cvar_05'] for m in model_names]
    pgd_cvars = [results_attacks[m]['pgd']['cvar_05'] for m in model_names]
    pgd_gaps = [results_attacks[m]['pgd_cvar_gap'] for m in model_names]

    # Determine colors based on model type (matched on the model name)
    colors = []
    for name in model_names:
        if 'dense' in name.lower() or 'baseline' in name.lower():
            colors.append('#2563eb')  # Blue for dense
        elif 'adv' in name.lower() or 'robust' in name.lower():
            colors.append('#16a34a')  # Green for adversarially trained
        else:
            colors.append('#dc2626')  # Red for standard tickets

    # --- Plot 1: Clean CVaR vs PGD CVaR ---
    ax1.scatter(clean_cvars, pgd_cvars, c=colors, s=150, alpha=0.8, edgecolors='black', linewidth=1)

    # Add diagonal line (no degradation): points on it have equal clean and
    # PGD CVaR. The 0.5 margin keeps the outermost points off the line ends.
    lims = [min(min(clean_cvars), min(pgd_cvars)) - 0.5, max(max(clean_cvars), max(pgd_cvars)) + 0.5]
    ax1.plot(lims, lims, 'k--', alpha=0.3, label='No degradation')

    # Annotate points with shortened model names
    for i, name in enumerate(model_names):
        short_name = name.replace('_baseline', '').replace('ticket_', 't')
        ax1.annotate(short_name, (clean_cvars[i], pgd_cvars[i]),
                     textcoords="offset points", xytext=(5, 5), fontsize=9)

    ax1.set_xlabel('Clean CVaR 5%', fontsize=11)
    ax1.set_ylabel('PGD Attack CVaR 5%', fontsize=11)
    ax1.set_title('Clean vs Adversarial Performance', fontsize=12, fontweight='bold')
    ax1.grid(True, alpha=0.3)
    ax1.legend(loc='lower right')

    # --- Plot 2: Radar/Spider Plot ---

    # Select models for radar (max 5 for readability)
    radar_models = model_names[:5]

    # Metrics for radar
    metrics_names = ['Clean CVaR', 'FGSM CVaR', 'PGD CVaR', 'Clean Sharpe', 'Robustness']

    def normalize(values, higher_is_better=True):
        """Min-max scale ``values`` to [0, 1]; identical values all map to 0.5."""
        min_v, max_v = min(values), max(values)
        if max_v == min_v:
            return [0.5] * len(values)
        norm = [(v - min_v) / (max_v - min_v) for v in values]
        return norm if higher_is_better else [1 - n for n in norm]

    # NOTE(review): the CVaR entries are negated so that, after
    # normalisation, a lower 'cvar_05' lands further out. The gap entry below
    # treats a smaller (clean - attacked) gap as better, which implies a
    # higher 'cvar_05' is better. These two conventions look inconsistent;
    # confirm the sign convention of 'cvar_05' in compute_all_metrics.
    radar_data = {}
    for m in radar_models:
        r = results_attacks[m]
        radar_data[m] = [
            -r['clean']['cvar_05'],
            -r['fgsm']['cvar_05'],
            -r['pgd']['cvar_05'],
            r['clean'].get('sharpe_ratio', 0),  # 0 when the metric is absent
            -r['pgd_cvar_gap']  # Smaller gap = more robust = better
        ]

    # Normalize each metric across models
    for i in range(len(metrics_names)):
        values = [radar_data[m][i] for m in radar_models]
        norm_values = normalize(values)
        for j, m in enumerate(radar_models):
            radar_data[m][i] = norm_values[j]

    # One spoke per metric; the first angle is repeated to close the polygon
    angles = np.linspace(0, 2 * np.pi, len(metrics_names), endpoint=False).tolist()
    angles += angles[:1]

    radar_colors = ['#2563eb', '#dc2626', '#16a34a', '#9333ea', '#f59e0b']

    for idx, m in enumerate(radar_models):
        values = radar_data[m] + radar_data[m][:1]
        color = radar_colors[idx % len(radar_colors)]
        ax2.plot(angles, values, 'o-', linewidth=2, label=m, color=color)
        ax2.fill(angles, values, alpha=0.15, color=color)

    ax2.set_xticks(angles[:-1])
    ax2.set_xticklabels(metrics_names, fontsize=9)
    ax2.set_ylim(0, 1)
    ax2.set_title('Multi-Metric Comparison\n(Normalized, outer=better)', fontsize=11, fontweight='bold')
    ax2.legend(loc='upper right', bbox_to_anchor=(1.35, 1), fontsize=8)

    plt.tight_layout()

    # savefig does not create missing directories, so make sure the target
    # folder exists before writing the figure.
    figures_dir = Path('../figures')
    figures_dir.mkdir(parents=True, exist_ok=True)
    plt.savefig(figures_dir / 'adversarial_tradeoff_analysis.pdf', dpi=300, bbox_inches='tight')
    plt.show()
else:
    print("Need multiple models for trade-off analysis")

### Summary

Key findings:
- Standard lottery tickets are vulnerable to adversarial attacks
- Robustness gap tends to increase with sparsity
- PGD attacks are more effective than FGSM
- This motivates the need for adversarial training of sparse networks