# VQC Inference and Analysis - NSL-KDD

This notebook loads trained Variational Quantum Classifier models and performs comprehensive evaluation and analysis.

**Tasks:**
- Load trained VQC models
- Evaluate on NSL-KDD test set
- Generate detailed performance metrics
- Compare with classical baselines
- Analyze model behavior

## 1. Setup and Imports

In [None]:
import sys
import os
from pathlib import Path

# Add project root to path.
# The root is taken to be two directory levels above the current working
# directory, so this only resolves correctly when the notebook is launched
# from its own folder.
project_root = Path.cwd().parent.parent
# Prepend (rather than append) so the `src` package imported further down is
# looked up in this checkout before any other entry on sys.path.
sys.path.insert(0, str(project_root))

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    classification_report, confusion_matrix, 
    roc_auc_score, roc_curve, precision_recall_curve,
    accuracy_score, precision_score, recall_score, f1_score
)
import json
import pickle
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# PyTorch
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

# PennyLane
try:
    import pennylane as qml
    PENNYLANE_AVAILABLE = True
except ImportError:
    print("WARNING: PennyLane not installed")
    PENNYLANE_AVAILABLE = False

# Import custom models
from src.models.quantum.pennylane_models import HybridQuantumClassifier

# Set style
sns.set_style('whitegrid')
# Default figure size; cells that pass an explicit figsize override it.
plt.rcParams['figure.figsize'] = (12, 6)

# Marker that the setup cell ran to completion.
print("All imports successful!")

In [None]:
# Pick the compute device: CUDA when PyTorch can see a GPU, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

## 2. Configuration and Paths

In [None]:
# Paths (all resolved relative to the project root)
DATA_PATH = project_root / 'data' / 'raw' / 'NSL-KDD' / 'NSL-KDD Dataset'
MODELS_PATH = project_root / 'results' / 'models' / 'quantum'
LOGS_PATH = project_root / 'results' / 'logs'
FIGURES_PATH = project_root / 'results' / 'figures'

# Later cells write figures and a JSON report into these folders. Create them
# up front so plt.savefig() / open(..., 'w') do not fail on a fresh checkout.
for output_dir in (LOGS_PATH, FIGURES_PATH):
    output_dir.mkdir(parents=True, exist_ok=True)

# List available models (sorted by filename, so the listing order is stable)
print("Available VQC models:")
model_files = sorted(MODELS_PATH.glob('vqc_hybrid_nsl_kdd_*.pt'))
for position, model_file in enumerate(model_files, start=1):
    print(f"  {position}. {model_file.name}")

if not model_files:
    print("  No VQC models found. Please run 01_vqc_nsl_kdd.ipynb first.")

In [None]:
# Select the most recent model (or specify index).
# model_files is sorted by filename and the filenames end in a timestamp, so
# the last entry is treated as the newest checkpoint.
if not model_files:
    # Every later cell needs MODEL_FILE / PREPROCESSING_FILE. Stop here with an
    # actionable message instead of a NameError in the next cell.
    raise FileNotFoundError(
        f"No VQC checkpoints matching 'vqc_hybrid_nsl_kdd_*.pt' in {MODELS_PATH}. "
        "Please run 01_vqc_nsl_kdd.ipynb first."
    )

MODEL_FILE = model_files[-1]  # Most recent
print(f"\nUsing model: {MODEL_FILE.name}")

# Extract timestamp from filename: the last two underscore-separated fields
# of the stem, re-joined with an underscore.
timestamp = '_'.join(MODEL_FILE.stem.rsplit('_', 2)[-2:])

# Find corresponding preprocessing file (same timestamp suffix)
PREPROCESSING_FILE = MODELS_PATH / f'vqc_preprocessing_{timestamp}.pkl'

if PREPROCESSING_FILE.exists():
    print(f"Preprocessing file: {PREPROCESSING_FILE.name}")
else:
    print(f"WARNING: Preprocessing file not found: {PREPROCESSING_FILE.name}")

## 3. Load Preprocessing Pipeline

In [None]:
# Restore the preprocessing artefacts stored next to the model checkpoint.
# NOTE(review): pickle can execute arbitrary code while loading - only open
# files produced by this project's own training run.
with PREPROCESSING_FILE.open('rb') as pipeline_file:
    preprocessing = pickle.load(pipeline_file)

# Pull the four entries the rest of the notebook relies on.
scaler, pca, feature_cols, n_qubits = (
    preprocessing[entry] for entry in ('scaler', 'pca', 'feature_cols', 'n_qubits')
)

explained_variance = pca.explained_variance_ratio_.sum()
for summary_line in (
    f"Preprocessing pipeline loaded:",
    f"  Original features: {len(feature_cols)}",
    f"  PCA components: {n_qubits}",
    f"  Explained variance: {explained_variance:.4f}",
):
    print(summary_line)

## 4. Load Model

In [None]:
# Read the checkpoint, remapping any stored tensors onto the active device.
# NOTE(review): torch.load unpickles the file - only load trusted checkpoints.
checkpoint = torch.load(MODEL_FILE, map_location=device)

# Configuration blocks saved with the weights; test_metrics is optional.
model_config = checkpoint['model_config']
training_config = checkpoint['training_config']
test_metrics = checkpoint.get('test_metrics', {})

# Print both configuration dicts with the same "  key: value" layout.
for heading, settings in (
    ("Model Configuration:", model_config),
    ("\nTraining Info:", training_config),
):
    print(heading)
    for setting_name, setting_value in settings.items():
        print(f"  {setting_name}: {setting_value}")

In [None]:
# Rebuild the network from its saved constructor arguments, then restore the
# trained weights.
model = HybridQuantumClassifier(**model_config)
model.load_state_dict(checkpoint['model_state_dict'])

# Move to the active device and switch to evaluation mode for inference.
model = model.to(device)
model.eval()

total_params = sum(param.numel() for param in model.parameters())
print("Model loaded successfully!")
print(f"Total parameters: {total_params:,}")

## 5. Load and Preprocess Test Data

In [None]:
# Column names for the header-less NSL-KDD file; the last two entries are the
# class label and the difficulty column.
COLUMNS = [
    'duration', 'protocol_type', 'service', 'flag',
    'src_bytes', 'dst_bytes', 'land', 'wrong_fragment',
    'urgent', 'hot', 'num_failed_logins', 'logged_in',
    'num_compromised', 'root_shell', 'su_attempted', 'num_root',
    'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds',
    'is_host_login', 'is_guest_login', 'count', 'srv_count',
    'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate',
    'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count',
    'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate',
    'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate',
    'dst_host_serror_rate', 'dst_host_srv_serror_rate',
    'dst_host_rerror_rate', 'dst_host_srv_rerror_rate',
    'label', 'difficulty',
]

# Read the test split and collapse the label to binary:
# 0 = 'normal', 1 = any other label.
test_file = DATA_PATH / 'KDDTest+.txt'
test_df = pd.read_csv(test_file, header=None, names=COLUMNS)
test_df['binary_label'] = test_df['label'].ne('normal').astype(int)

label_counts = test_df['binary_label'].value_counts()
print(f"Test samples: {len(test_df):,}")
print("\nTest distribution:")
print(label_counts)

In [None]:
# Preprocess test data: one-hot encode the categorical columns.
categorical_cols = ['protocol_type', 'service', 'flag']
test_encoded = pd.get_dummies(test_df, columns=categorical_cols)

# Ensure all feature columns are present. Columns in the saved feature list
# that did not appear in the test split are added in a single reindex
# (zero-filled) instead of one DataFrame insertion per column.
present_cols = set(test_encoded.columns)
missing_cols = list(dict.fromkeys(
    col for col in feature_cols if col not in present_cols
))
if missing_cols:
    test_encoded = test_encoded.reindex(
        columns=[*test_encoded.columns, *missing_cols], fill_value=0
    )

# Extract features in the saved column order. The explicit float64
# conversion keeps the matrix numeric even when the dummy columns are bool,
# which would otherwise yield an object-dtype array.
X_test = test_encoded[feature_cols].to_numpy(dtype=np.float64)
y_test = test_df['binary_label'].values

# Apply preprocessing pipeline with the already-fitted scaler and PCA
# (transform only, no refitting on test data).
X_test_scaled = scaler.transform(X_test)
X_test_pca = pca.transform(X_test_scaled)

print(f"Test data shape after preprocessing: {X_test_pca.shape}")

In [None]:
# Wrap the preprocessed arrays as tensors: float32 features and int64 labels.
X_test_tensor = torch.tensor(X_test_pca, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.int64)

# Fixed order (shuffle=False) so predictions line up with y_test row by row.
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

print(f"Test batches: {len(test_loader)}")

## 6. Generate Predictions

In [None]:
from tqdm.notebook import tqdm
import time

# Generate predictions for the whole test set, timing only the forward pass.
print("Generating predictions...")
all_preds = []
all_labels = []
all_probs = []
inference_times = []

# CUDA kernels are launched asynchronously: without a synchronize on both
# sides of the forward pass the timer would only capture launch overhead.
sync_cuda = device.type == 'cuda'

model.eval()
with torch.no_grad():
    for X_batch, y_batch in tqdm(test_loader, desc='Inference'):
        X_batch = X_batch.to(device)

        if sync_cuda:
            torch.cuda.synchronize()
        # perf_counter is monotonic and high-resolution, unlike time.time().
        start_time = time.perf_counter()
        outputs = model(X_batch)
        if sync_cuda:
            torch.cuda.synchronize()
        inference_times.append(time.perf_counter() - start_time)

        # Class probabilities from the raw outputs; prediction = arg-max class.
        probs = torch.softmax(outputs, dim=1)
        _, predicted = outputs.max(1)

        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.numpy())
        all_probs.extend(probs.cpu().numpy())

all_preds = np.array(all_preds)
all_labels = np.array(all_labels)
all_probs = np.array(all_probs)

# Inference statistics (forward-pass time only; data transfer is excluded)
total_inference_time = sum(inference_times)
avg_time_per_batch = np.mean(inference_times)
avg_time_per_sample = total_inference_time / len(all_preds)
throughput = len(all_preds) / total_inference_time

print(f"\nInference Statistics:")
print(f"Total inference time: {total_inference_time:.2f} seconds")
print(f"Average time per batch: {avg_time_per_batch:.4f} seconds")
print(f"Average time per sample: {avg_time_per_sample*1000:.2f} ms")
print(f"Throughput: {throughput:.2f} samples/second")

## 7. Performance Metrics

In [None]:
# Headline metrics. Precision, recall and F1 use scikit-learn's default
# positive class (label 1); ROC-AUC is computed from the class-1 probability
# column.
attack_scores = all_probs[:, 1]
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
roc_auc = roc_auc_score(all_labels, attack_scores)

report_lines = [
    "\n" + "="*60,
    "HYBRID VQC - TEST SET PERFORMANCE",
    "="*60,
    f"Accuracy:  {accuracy:.4f} ({accuracy*100:.2f}%)",
    f"Precision: {precision:.4f}",
    f"Recall:    {recall:.4f}",
    f"F1-Score:  {f1:.4f}",
    f"ROC-AUC:   {roc_auc:.4f}",
    "="*60,
]
for report_line in report_lines:
    print(report_line)

In [None]:
# Per-class precision / recall / F1 table, framed by a banner.
for banner_line in ("\n" + "="*60, "CLASSIFICATION REPORT", "="*60):
    print(banner_line)

report_text = classification_report(
    all_labels,
    all_preds,
    target_names=['Normal', 'Attack'],
    digits=4,
)
print(report_text)

In [None]:
# Confusion matrix: rows are true labels, columns are predicted labels.
cm = confusion_matrix(all_labels, all_preds)
print("\nConfusion Matrix:")
print(cm)

# Unpack the 2x2 matrix into its four named cells.
(tn_count, fp_count), (fn_count, tp_count) = cm
print(f"\nTrue Negatives:  {tn_count:,}")
print(f"False Positives: {fp_count:,}")
print(f"False Negatives: {fn_count:,}")
print(f"True Positives:  {tp_count:,}")

## 8. Visualizations

In [None]:
# Confusion matrix as an annotated heatmap, drawn on an explicit Axes.
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=['Normal', 'Attack'],
    yticklabels=['Normal', 'Attack'],
    cbar_kws={'label': 'Count'},
    ax=ax,
)
ax.set_xlabel('Predicted Label', fontsize=12)
ax.set_ylabel('True Label', fontsize=12)
ax.set_title('Confusion Matrix - Hybrid VQC (Inference)', fontsize=14, fontweight='bold')
fig.tight_layout()
fig.savefig(FIGURES_PATH / 'vqc_inference_confusion_matrix.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# ROC curve from the class-1 probability, with the chance diagonal for reference.
fpr, tpr, thresholds = roc_curve(all_labels, all_probs[:, 1])

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(fpr, tpr, linewidth=2, label=f'VQC (AUC = {roc_auc:.4f})')
ax.plot([0, 1], [0, 1], 'k--', linewidth=1, label='Random Classifier')
ax.set_xlabel('False Positive Rate', fontsize=12)
ax.set_ylabel('True Positive Rate', fontsize=12)
ax.set_title('ROC Curve - Hybrid VQC', fontsize=14, fontweight='bold')
ax.legend(fontsize=11, loc='lower right')
ax.grid(True, alpha=0.3)
fig.tight_layout()
fig.savefig(FIGURES_PATH / 'vqc_inference_roc_curve.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Precision-recall trade-off across thresholds on the class-1 probability.
precision_curve, recall_curve, pr_thresholds = precision_recall_curve(
    all_labels, all_probs[:, 1]
)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(recall_curve, precision_curve, linewidth=2, label='VQC')
ax.set_xlabel('Recall', fontsize=12)
ax.set_ylabel('Precision', fontsize=12)
ax.set_title('Precision-Recall Curve', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
fig.tight_layout()
fig.savefig(FIGURES_PATH / 'vqc_inference_pr_curve.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Distribution of the probability assigned to the TRUE class, one panel per class.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Probability of "Normal" for truly normal rows, and of "Attack" for attacks.
normal_probs = all_probs[all_labels == 0, 0]
attack_probs = all_probs[all_labels == 1, 1]

# (axes, values, bar colour, threshold-line colour, x label, title)
confidence_panels = [
    (ax1, normal_probs, 'blue', 'red',
     'Confidence (Probability of Normal)',
     'Prediction Confidence for Normal Samples'),
    (ax2, attack_probs, 'red', 'blue',
     'Confidence (Probability of Attack)',
     'Prediction Confidence for Attack Samples'),
]
for panel, values, bar_color, line_color, x_label, panel_title in confidence_panels:
    panel.hist(values, bins=50, alpha=0.7, color=bar_color, edgecolor='black')
    # 0.5 is where the two-class arg-max decision flips.
    panel.axvline(0.5, color=line_color, linestyle='--', linewidth=2, label='Decision Threshold')
    panel.set_xlabel(x_label, fontsize=12)
    panel.set_ylabel('Count', fontsize=12)
    panel.set_title(panel_title, fontsize=13, fontweight='bold')
    panel.legend()
    panel.grid(True, alpha=0.3)

fig.tight_layout()
fig.savefig(FIGURES_PATH / 'vqc_inference_confidence.png', dpi=150, bbox_inches='tight')
plt.show()

## 9. Error Analysis

In [None]:
# Boolean masks over the test set for the two kinds of mistake.
false_positives = np.logical_and(all_labels == 0, all_preds == 1)
false_negatives = np.logical_and(all_labels == 1, all_preds == 0)

fp_total = false_positives.sum()
fn_total = false_negatives.sum()
# Rates are relative to the number of samples in the true class.
fp_rate = fp_total / (all_labels == 0).sum()
fn_rate = fn_total / (all_labels == 1).sum()

print("Error Analysis:")
print(f"False Positives: {fp_total:,} (Normal classified as Attack)")
print(f"False Negatives: {fn_total:,} (Attack classified as Normal)")
print(f"\nFalse Positive Rate: {fp_rate:.4f}")
print(f"False Negative Rate: {fn_rate:.4f}")

In [None]:
# How confident was the model in its WRONG predictions?
fp_confidences = all_probs[false_positives, 1]  # Confidence in Attack class
fn_confidences = all_probs[false_negatives, 0]  # Confidence in Normal class

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# (axes, confidences, bar colour, x label, title); a panel is left empty
# when there are no errors of that kind.
error_panels = [
    (ax1, fp_confidences, 'orange',
     'Confidence in Attack Prediction',
     'False Positives - Prediction Confidence'),
    (ax2, fn_confidences, 'purple',
     'Confidence in Normal Prediction',
     'False Negatives - Prediction Confidence'),
]
for panel, confidences, bar_color, x_label, panel_title in error_panels:
    if len(confidences) == 0:
        continue
    panel.hist(confidences, bins=30, alpha=0.7, color=bar_color, edgecolor='black')
    panel.axvline(confidences.mean(), color='red', linestyle='--', linewidth=2,
                  label=f'Mean: {confidences.mean():.3f}')
    panel.set_xlabel(x_label, fontsize=12)
    panel.set_ylabel('Count', fontsize=12)
    panel.set_title(panel_title, fontsize=13, fontweight='bold')
    panel.legend()
    panel.grid(True, alpha=0.3)

fig.tight_layout()
fig.savefig(FIGURES_PATH / 'vqc_inference_error_analysis.png', dpi=150, bbox_inches='tight')
plt.show()

## 10. Comparison with Classical Models

Load results from classical models (if available) for comparison.

In [None]:
# Try to load classical model results for comparison.
# Maps display name -> test accuracy in percent.
classical_results = {}

# Display name -> glob pattern of that model's metrics logs. One table-driven
# loop replaces three copy-pasted load blocks, so adding a baseline is a
# one-line change.
baseline_log_patterns = {
    'CNN': 'cnn_*_metrics.json',
    'LSTM': 'lstm_*_metrics.json',
    'Transformer': 'transformer_*_metrics.json',
}

for baseline_name, log_pattern in baseline_log_patterns.items():
    metrics_files = sorted(LOGS_PATH.glob(log_pattern))
    if not metrics_files:
        continue  # baseline was never run; leave it out of the comparison

    # Use the last file in sorted filename order.
    with open(metrics_files[-1], 'r') as f:
        baseline_data = json.load(f)

    if 'test_performance' in baseline_data:
        # Stored accuracy is a fraction; convert to percent.
        # NOTE(review): a missing 'accuracy' key is reported as 0%.
        classical_results[baseline_name] = (
            baseline_data['test_performance'].get('accuracy', 0) * 100
        )

# Add VQC results last so the bar chart can highlight the final entry.
classical_results['VQC (Quantum)'] = accuracy * 100

print("Model Comparison (Test Accuracy):")
for model_name, acc in classical_results.items():
    print(f"  {model_name:20s}: {acc:.2f}%")

In [None]:
# Bar chart of test accuracy per model; needs at least one baseline besides the VQC.
if len(classical_results) <= 1:
    print("Not enough models for comparison. Run classical models first.")
else:
    fig, ax = plt.subplots(figsize=(10, 6))

    models = list(classical_results)
    accuracies = [classical_results[name] for name in models]

    # The VQC entry was inserted last, so the final bar gets the highlight colour.
    colors = ['steelblue' for _ in models[:-1]] + ['orange']  # Highlight VQC
    bars = ax.bar(models, accuracies, color=colors, edgecolor='black', linewidth=1.5)

    # Print each bar's value just above its top edge.
    for rect in bars:
        bar_top = rect.get_height()
        bar_centre = rect.get_x() + rect.get_width()/2.
        ax.text(bar_centre, bar_top, f'{bar_top:.2f}%',
                ha='center', va='bottom', fontsize=11, fontweight='bold')

    ax.set_ylabel('Test Accuracy (%)', fontsize=12)
    ax.set_title('Model Comparison - NSL-KDD Binary Classification', fontsize=14, fontweight='bold')
    ax.set_ylim(0, 100)
    ax.grid(True, alpha=0.3, axis='y')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.savefig(FIGURES_PATH / 'vqc_model_comparison.png', dpi=150, bbox_inches='tight')
    plt.show()

## 11. Save Inference Results

In [None]:
# Assemble the JSON report section by section; NumPy scalars are cast to
# plain Python numbers so json can serialise them.
metrics_section = {
    'accuracy': float(accuracy),
    'precision': float(precision),
    'recall': float(recall),
    'f1_score': float(f1),
    'roc_auc': float(roc_auc),
}

error_section = {
    'false_positives': int(false_positives.sum()),
    'false_negatives': int(false_negatives.sum()),
    'false_positive_rate': float(false_positives.sum() / (all_labels == 0).sum()),
    'false_negative_rate': float(false_negatives.sum() / (all_labels == 1).sum()),
}

speed_section = {
    'total_time_seconds': float(total_inference_time),
    'avg_time_per_sample_ms': float(avg_time_per_sample * 1000),
    'throughput_samples_per_second': float(len(all_preds) / total_inference_time),
}

inference_results = {
    'model_file': MODEL_FILE.name,
    'timestamp': datetime.now().isoformat(),
    'test_samples': int(len(all_labels)),
    'metrics': metrics_section,
    'confusion_matrix': cm.tolist(),
    'error_analysis': error_section,
    'inference_performance': speed_section,
    'model_comparison': classical_results,
}

# One report file per run, named by the wall-clock time of the save.
run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
results_file = LOGS_PATH / f'vqc_inference_results_{run_stamp}.json'
with results_file.open('w') as report_handle:
    json.dump(inference_results, report_handle, indent=2)

print(f"Inference results saved to: {results_file}")

## 12. Summary

In [None]:
# Final recap of the run: model, metrics, speed and error counts.
summary_lines = [
    "\n" + "="*70,
    "VQC INFERENCE SUMMARY",
    "="*70,
    f"\nModel: {MODEL_FILE.name}",
    f"Test Samples: {len(all_labels):,}",
    f"\nPerformance Metrics:",
    f"  Accuracy:  {accuracy:.4f} ({accuracy*100:.2f}%)",
    f"  Precision: {precision:.4f}",
    f"  Recall:    {recall:.4f}",
    f"  F1-Score:  {f1:.4f}",
    f"  ROC-AUC:   {roc_auc:.4f}",
    f"\nInference Speed:",
    f"  Total time: {total_inference_time:.2f} seconds",
    f"  Per sample: {avg_time_per_sample*1000:.2f} ms",
    f"  Throughput: {len(all_preds)/total_inference_time:.2f} samples/sec",
    f"\nErrors:",
    f"  False Positives: {false_positives.sum():,}",
    f"  False Negatives: {false_negatives.sum():,}",
    "="*70,
]
for summary_line in summary_lines:
    print(summary_line)