# üìÖ Day 6: Robustness Analysis + Adversarial Training
## Noise Perturbation, Feature Perturbation, Adversarial Retraining ‚Äî GPU

---

**Steps:**
1. Load best models from Day 3-5
2. Gaussian noise perturbation test (0.01, 0.05, 0.1, 0.2, 0.5)
3. Feature perturbation (adversarial evasion simulation)
4. Adversarial retraining (add noisy samples)
5. Before vs After comparison

---

In [None]:
import os
os.add_dll_directory(r'C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v13.1\bin\x64')

import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.metrics import accuracy_score, f1_score
from sklearn.utils.class_weight import compute_sample_weight
import matplotlib.pyplot as plt
import seaborn as sns
import time
import gc
import json
from datetime import datetime

plt.style.use('dark_background')
plt.rcParams['figure.figsize'] = (14, 6)
plt.rcParams['font.size'] = 12

os.makedirs('figures', exist_ok=True)

print(f"‚úÖ Ready | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [None]:
# Load data
print("üì• Loading data...")
X_train = np.load('processed/X_train.npy')
X_test = np.load('processed/X_test.npy')

y_binary_train = np.load('processed/y_binary_train.npy')
y_binary_test = np.load('processed/y_binary_test.npy')
y_family_train = np.load('processed/y_family_train.npy')
y_family_test = np.load('processed/y_family_test.npy')
y_subtype_train = np.load('processed/y_subtype_train.npy')
y_subtype_test = np.load('processed/y_subtype_test.npy')

with open('processed/preprocessing_metadata.json', 'r') as f:
    meta = json.load(f)
feature_names = meta['feature_names']

# Load best models
print("üì¶ Loading trained models...")
bst_binary = xgb.Booster()
bst_binary.load_model('models/binary_xgb_gpu.json')

bst_family = xgb.Booster()
bst_family.load_model('models/family_best_xgb_gpu.json')

bst_subtype = xgb.Booster()
bst_subtype.load_model('models/subtype_xgb_gpu.json')

print("‚úÖ All loaded")

In [None]:
def evaluate_model(bst, X, y, level='binary'):
    """Evaluate a model and return F1 scores."""
    dmat = xgb.DMatrix(X, feature_names=feature_names)
    y_prob = bst.predict(dmat)
    
    if level == 'binary':
        y_pred = (y_prob > 0.5).astype(int)
        f1_mac = f1_score(y, y_pred, average='macro')
    else:
        y_pred = y_prob.argmax(axis=1)
        f1_mac = f1_score(y, y_pred, average='macro')
    
    acc = accuracy_score(y, y_pred)
    return acc, f1_mac

## üîä Step 1: Gaussian Noise Perturbation Test

In [None]:
noise_levels = [0.0, 0.01, 0.05, 0.1, 0.2, 0.5]
levels_config = [
    ('Binary', bst_binary, y_binary_test, 'binary'),
    ('8-Class', bst_family, y_family_test, 'multi'),
    ('34-Class', bst_subtype, y_subtype_test, 'multi')
]

noise_results = {level_name: {'accuracy': [], 'f1_macro': []} for level_name, _, _, _ in levels_config}

print("üîä Testing noise robustness...")
print("="*70)

for noise in noise_levels:
    print(f"\n   Noise level: {noise}")
    
    if noise == 0:
        X_noisy = X_test
    else:
        np.random.seed(42)
        noise_matrix = np.random.normal(0, noise, X_test.shape).astype(np.float32)
        X_noisy = X_test + noise_matrix
        del noise_matrix
    
    for level_name, bst, y, level_type in levels_config:
        acc, f1_mac = evaluate_model(bst, X_noisy, y, level_type)
        noise_results[level_name]['accuracy'].append(acc)
        noise_results[level_name]['f1_macro'].append(f1_mac)
        print(f"      {level_name:<10s}: Acc={acc*100:.2f}% | F1-Macro={f1_mac*100:.2f}%")
    
    if noise != 0:
        del X_noisy
        gc.collect()

In [None]:
# üìä Plot Noise vs F1
fig, axes = plt.subplots(1, 2, figsize=(18, 7))

colors = ['#00D4AA', '#FF6B6B', '#45B7D1']
markers = ['o', 's', '^']

for ax, metric_name in zip(axes, ['accuracy', 'f1_macro']):
    for i, (level_name, _, _, _) in enumerate(levels_config):
        vals = [v * 100 for v in noise_results[level_name][metric_name]]
        ax.plot(noise_levels, vals, color=colors[i], marker=markers[i],
                linewidth=2.5, markersize=10, label=level_name)
    
    ax.set_xlabel('Noise Level (œÉ)', fontsize=13)
    ax.set_ylabel(f'{metric_name.replace("_", " ").title()} (%)', fontsize=13)
    ax.set_title(f'{metric_name.replace("_", " ").title()} vs Noise Level', fontsize=14, fontweight='bold', color='white')
    ax.legend(fontsize=12)
    ax.grid(True, alpha=0.2)

plt.suptitle('üîä Robustness Test ‚Äî Gaussian Noise Perturbation', fontsize=16, fontweight='bold', color='#00D4AA', y=1.02)
plt.tight_layout()
plt.savefig('figures/robustness_noise_test.png', dpi=150, bbox_inches='tight', facecolor='#1a1a2e')
plt.show()
print("üíæ Saved to figures/robustness_noise_test.png")

## üéØ Step 2: Feature Perturbation (Adversarial Evasion)

In [None]:
# Get top-5 features from the binary model
importance = bst_binary.get_score(importance_type='weight')
sorted_imp = sorted(importance.items(), key=lambda x: x[1], reverse=True)[:5]

top5_indices = []
top5_names = []
for fname, score in sorted_imp:
    if fname.startswith('f'):
        try:
            idx = int(fname[1:])
            top5_indices.append(idx)
            top5_names.append(feature_names[idx] if idx < len(feature_names) else fname)
        except ValueError:
            pass
    elif fname in feature_names:
        idx = feature_names.index(fname)
        top5_indices.append(idx)
        top5_names.append(fname)

print(f"üéØ Top-5 features for perturbation:")
for i, (name, idx) in enumerate(zip(top5_names, top5_indices)):
    print(f"   {i+1}. {name} (index {idx})")

# Test perturbation of only top-5 features
perturbation_levels = [0.0, 0.05, 0.1, 0.2, 0.5, 1.0]
feature_perturb_results = {'accuracy': [], 'f1_macro': []}

print(f"\nüîä Perturbing top-5 features only:")
for noise in perturbation_levels:
    X_perturbed = X_test.copy()
    if noise > 0:
        np.random.seed(42)
        for idx in top5_indices:
            X_perturbed[:, idx] += np.random.normal(0, noise, X_test.shape[0]).astype(np.float32)
    
    acc, f1_mac = evaluate_model(bst_binary, X_perturbed, y_binary_test, 'binary')
    feature_perturb_results['accuracy'].append(acc)
    feature_perturb_results['f1_macro'].append(f1_mac)
    print(f"   œÉ={noise:.2f}: Acc={acc*100:.2f}% | F1={f1_mac*100:.2f}%")
    del X_perturbed

In [None]:
# üìä Plot Feature Perturbation
fig, ax = plt.subplots(figsize=(10, 6))

ax.plot(perturbation_levels, [v*100 for v in feature_perturb_results['f1_macro']],
        color='#FF4C61', marker='o', linewidth=2.5, markersize=10, label='Top-5 Features Only')

# Compare with full noise from earlier
full_noise_levels = [0.0, 0.05, 0.1, 0.2, 0.5]
full_noise_f1 = [v*100 for v in noise_results['Binary']['f1_macro'][:-1]]  # exclude 0.5 if missing
ax.plot(full_noise_levels[:len(full_noise_f1)], full_noise_f1,
        color='#00D4AA', marker='s', linewidth=2.5, markersize=10, label='All Features', linestyle='--')

ax.set_xlabel('Noise Level (œÉ)', fontsize=13)
ax.set_ylabel('F1-Macro (%)', fontsize=13)
ax.set_title('üéØ Adversarial Evasion: Top-5 Feature Perturbation vs All Features', fontsize=14, fontweight='bold', color='#00D4AA')
ax.legend(fontsize=12)
ax.grid(True, alpha=0.2)

plt.tight_layout()
plt.savefig('figures/robustness_feature_perturbation.png', dpi=150, bbox_inches='tight', facecolor='#1a1a2e')
plt.show()

## üõ°Ô∏è Step 3: Adversarial Retraining

In [None]:
# Generate noisy training data (noise=0.1)
print("üõ°Ô∏è Adversarial Retraining ‚Äî Adding noisy samples to training data...")
np.random.seed(42)
noise_level = 0.1
X_train_noisy = X_train + np.random.normal(0, noise_level, X_train.shape).astype(np.float32)

# Combine clean + noisy
X_train_combined = np.vstack([X_train, X_train_noisy])
y_binary_combined = np.concatenate([y_binary_train, y_binary_train])
y_family_combined = np.concatenate([y_family_train, y_family_train])
y_subtype_combined = np.concatenate([y_subtype_train, y_subtype_train])

del X_train_noisy
gc.collect()

print(f"   Original: {X_train.shape[0]:,} | Combined: {X_train_combined.shape[0]:,}")

In [None]:
# Retrain Binary model on GPU
print(f"\n{'='*60}")
print(f"üéÆ Adversarial Retraining ‚Äî Binary (GPU)")
print(f"{'='*60}")

n_benign = (y_binary_combined == 0).sum()
n_attack = (y_binary_combined == 1).sum()
spw = n_benign / n_attack if n_attack > 0 else 1.0

dtrain_adv = xgb.DMatrix(X_train_combined, label=y_binary_combined, feature_names=feature_names)
dtest_bin = xgb.DMatrix(X_test, label=y_binary_test, feature_names=feature_names)

xgb_params_bin = {
    'tree_method': 'hist', 'device': 'cuda', 'objective': 'binary:logistic',
    'eval_metric': ['logloss', 'error'], 'max_depth': 8, 'learning_rate': 0.1,
    'scale_pos_weight': spw, 'subsample': 0.8, 'colsample_bytree': 0.8,
    'verbosity': 1, 'seed': 42
}

t0 = time.time()
bst_adv_binary = xgb.train(
    xgb_params_bin, dtrain_adv,
    num_boost_round=300,
    evals=[(dtest_bin, 'test')],
    early_stopping_rounds=20,
    verbose_eval=50
)
print(f"   üéÆ GPU | ‚è±Ô∏è {time.time()-t0:.1f}s")

bst_adv_binary.save_model('models/binary_xgb_adversarial.json')
del dtrain_adv; gc.collect()

In [None]:
# Compare before vs after adversarial training
print("\n" + "="*80)
print("üìä BEFORE vs AFTER Adversarial Training")
print("="*80)

comparison_noise = [0.0, 0.01, 0.05, 0.1, 0.2, 0.5]
before_f1 = []
after_f1 = []

for noise in comparison_noise:
    if noise == 0:
        X_noisy = X_test
    else:
        np.random.seed(42)
        X_noisy = X_test + np.random.normal(0, noise, X_test.shape).astype(np.float32)
    
    _, f1_before = evaluate_model(bst_binary, X_noisy, y_binary_test, 'binary')
    _, f1_after = evaluate_model(bst_adv_binary, X_noisy, y_binary_test, 'binary')
    
    before_f1.append(f1_before)
    after_f1.append(f1_after)
    
    improvement = (f1_after - f1_before) * 100
    print(f"   œÉ={noise:.2f}: Before={f1_before*100:.2f}% | After={f1_after*100:.2f}% | Œî={improvement:+.2f}%")
    
    if noise != 0:
        del X_noisy

In [None]:
# üìä Before vs After plot
fig, ax = plt.subplots(figsize=(12, 7))

ax.plot(comparison_noise, [v*100 for v in before_f1], color='#FF4C61', marker='o',
        linewidth=2.5, markersize=10, label='Before Adv. Training')
ax.plot(comparison_noise, [v*100 for v in after_f1], color='#00D4AA', marker='s',
        linewidth=2.5, markersize=10, label='After Adv. Training')

# Fill the improvement area
ax.fill_between(comparison_noise, [v*100 for v in before_f1], [v*100 for v in after_f1],
                alpha=0.2, color='#00D4AA', label='Improvement')

ax.set_xlabel('Noise Level (œÉ)', fontsize=13)
ax.set_ylabel('F1-Macro (%)', fontsize=13)
ax.set_title('üõ°Ô∏è Adversarial Retraining ‚Äî Before vs After', fontsize=16, fontweight='bold', color='#00D4AA')
ax.legend(fontsize=12)
ax.grid(True, alpha=0.2)

plt.tight_layout()
plt.savefig('figures/robustness_adversarial_comparison.png', dpi=150, bbox_inches='tight', facecolor='#1a1a2e')
plt.show()
print("üíæ Saved to figures/robustness_adversarial_comparison.png")

In [None]:
# üìä Robustness Results Table
robustness_table = pd.DataFrame({
    'Noise Level': comparison_noise,
    'Before F1 (%)': [f*100 for f in before_f1],
    'After F1 (%)': [f*100 for f in after_f1],
    'Improvement': [(a-b)*100 for a, b in zip(after_f1, before_f1)]
})
print(robustness_table.to_string(index=False, float_format=lambda x: f'{x:.4f}'))

# Save
robust_results = {
    'timestamp': datetime.now().isoformat(),
    'device': 'GPU (CUDA)',
    'noise_test': noise_results,
    'feature_perturbation': feature_perturb_results,
    'adversarial_comparison': {
        'noise_levels': comparison_noise,
        'before_f1': before_f1,
        'after_f1': after_f1
    }
}
with open('models/robustness_results.json', 'w') as f:
    json.dump(robust_results, f, indent=2, default=str)

del X_train_combined, y_binary_combined, y_family_combined, y_subtype_combined
gc.collect()

print("\nüèÜ" * 20)
print(f"  ‚úÖ ROBUSTNESS ANALYSIS COMPLETE!")
print("üèÜ" * 20)