# Notebook 8: Quantum Machine Learning (QML) Models

**Purpose**: Train and evaluate QML models using PennyLane.

**Models**:
1. Variational Quantum Classifier (VQC)
2. Hybrid Quantum Neural Network (QNN)
3. Quantum Kernel SVM (QSVM)
4. Quantum Autoencoder
5. Quantum GAN

---

In [None]:
import sys
sys.path.append('..')

import numpy as np
import pandas as pd
import json
import time
from pathlib import Path
import matplotlib.pyplot as plt

import pennylane as qml
from pennylane import numpy as pnp
from pennylane.templates import AngleEmbedding, StronglyEntanglingLayers

import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.svm import SVC
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, roc_curve, confusion_matrix
)
import warnings
warnings.filterwarnings('ignore')

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

BASE_DIR = Path('.').resolve().parent
FEATURES_DIR = BASE_DIR / 'data' / 'features'
RESULTS_DIR = BASE_DIR / 'results'
MODELS_DIR = BASE_DIR / 'models'
FIGURES_DIR = BASE_DIR / 'figures'

TARGET_COLUMN = 'Class'

print(f"PennyLane version: {qml.__version__}")

In [None]:
def safe_normalize(arr):
    arr = np.asarray(arr, dtype=float)
    min_val, max_val = arr.min(), arr.max()
    if max_val - min_val < 1e-10:
        return np.full_like(arr, 0.5)
    return (arr - min_val) / (max_val - min_val)

## 1. Load Data

In [None]:
train_df = pd.read_csv(FEATURES_DIR / 'pca_train.csv')
test_df = pd.read_csv(FEATURES_DIR / 'pca_test.csv')

X_train_full = train_df.drop(columns=[TARGET_COLUMN]).values.astype(np.float64)
y_train = train_df[TARGET_COLUMN].values.astype(np.int32)

X_test_full = test_df.drop(columns=[TARGET_COLUMN]).values.astype(np.float64)
y_test = test_df[TARGET_COLUMN].values.astype(np.int32)

print(f"Training: {X_train_full.shape}, Test: {X_test_full.shape}")

In [None]:
N_QUBITS = 4
N_SHOTS = 1024

X_train = X_train_full[:, :N_QUBITS]
X_test = X_test_full[:, :N_QUBITS]

X_train_scaled = (X_train - X_train.min()) / (X_train.max() - X_train.min() + 1e-8) * 2 * np.pi
X_test_scaled = (X_test - X_test.min()) / (X_test.max() - X_test.min() + 1e-8) * 2 * np.pi

print(f"QML input shape: {X_train.shape}")
print(f"Number of qubits: {N_QUBITS}")

In [None]:
from sklearn.model_selection import train_test_split

MAX_TRAIN_SAMPLES = 200
MAX_TEST_SAMPLES = 100

if len(X_train) > MAX_TRAIN_SAMPLES:
    X_train_qml, _, y_train_qml, _ = train_test_split(
        X_train_scaled, y_train, train_size=MAX_TRAIN_SAMPLES, 
        stratify=y_train, random_state=RANDOM_SEED
    )
else:
    X_train_qml, y_train_qml = X_train_scaled, y_train

if len(X_test) > MAX_TEST_SAMPLES:
    X_test_qml, _, y_test_qml, _ = train_test_split(
        X_test_scaled, y_test, train_size=MAX_TEST_SAMPLES, 
        stratify=y_test, random_state=RANDOM_SEED
    )
else:
    X_test_qml, y_test_qml = X_test_scaled, y_test

print(f"QML Training samples: {len(X_train_qml)}")
print(f"QML Test samples: {len(X_test_qml)}")

In [None]:
dev = qml.device('default.qubit', wires=N_QUBITS)
print(f"Using device: {dev.name}")

In [None]:
def compute_metrics(y_true, y_pred, y_prob=None):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred, zero_division=0),
        'recall': recall_score(y_true, y_pred, zero_division=0),
        'f1_score': f1_score(y_true, y_pred, zero_division=0),
        'fpr': fp / (fp + tn) if (fp + tn) > 0 else 0,
        'tpr': tp / (tp + fn) if (tp + fn) > 0 else 0,
    }
    if y_prob is not None and not np.any(np.isnan(y_prob)):
        try:
            metrics['roc_auc'] = roc_auc_score(y_true, y_prob)
        except:
            metrics['roc_auc'] = 0.5
    else:
        metrics['roc_auc'] = 0.5
    return metrics

## 2. Model 1: Variational Quantum Classifier (VQC)

In [None]:
n_layers = 2

@qml.qnode(dev)
def vqc_circuit(weights, x):
    AngleEmbedding(x, wires=range(N_QUBITS))
    StronglyEntanglingLayers(weights, wires=range(N_QUBITS))
    return qml.expval(qml.PauliZ(0))

weight_shape = StronglyEntanglingLayers.shape(n_layers=n_layers, n_wires=N_QUBITS)
weights_init = pnp.random.randn(*weight_shape, requires_grad=True) * 0.1

print(f"VQC weight shape: {weight_shape}")

In [None]:
print("Training VQC...")

def vqc_cost(weights, X, y):
    predictions = pnp.array([vqc_circuit(weights, x) for x in X])
    predictions = (predictions + 1) / 2  # Map [-1, 1] to [0, 1]
    # MSE loss (simpler and autograd-compatible)
    loss = pnp.mean((predictions - y) ** 2)
    return loss

opt = qml.GradientDescentOptimizer(stepsize=0.1)
weights_vqc = weights_init.copy()

epochs = 30
history_vqc = []

start_time = time.time()
for epoch in range(epochs):
    weights_vqc, cost = opt.step_and_cost(lambda w: vqc_cost(w, X_train_qml, y_train_qml), weights_vqc)
    history_vqc.append(float(cost))
    if (epoch + 1) % 10 == 0:
        print(f"  Epoch {epoch+1}/{epochs}, Loss: {cost:.4f}")
train_time_vqc = time.time() - start_time

In [None]:
start_time = time.time()
y_prob_vqc = np.array([(float(vqc_circuit(weights_vqc, x)) + 1) / 2 for x in X_test_qml])
y_pred_vqc = (y_prob_vqc > 0.5).astype(int)
inference_time_vqc = time.time() - start_time

metrics_vqc = compute_metrics(y_test_qml, y_pred_vqc, y_prob_vqc)
metrics_vqc.update({
    'model': 'VQC', 'n_qubits': N_QUBITS, 'circuit_depth': n_layers,
    'n_shots': N_SHOTS, 'train_time': train_time_vqc, 'inference_time': inference_time_vqc
})
print(f"VQC - F1: {metrics_vqc['f1_score']:.4f}, ROC-AUC: {metrics_vqc['roc_auc']:.4f}")

## 3. Model 2: Hybrid Quantum Neural Network

In [None]:
@qml.qnode(dev, interface='torch')
def hybrid_qnn_circuit(inputs, weights):
    AngleEmbedding(inputs, wires=range(N_QUBITS))
    StronglyEntanglingLayers(weights, wires=range(N_QUBITS))
    return [qml.expval(qml.PauliZ(i)) for i in range(N_QUBITS)]

class HybridQNN(nn.Module):
    def __init__(self, n_qubits, n_layers):
        super().__init__()
        weight_shape = StronglyEntanglingLayers.shape(n_layers=n_layers, n_wires=n_qubits)
        self.q_weights = nn.Parameter(torch.randn(*weight_shape) * 0.1)
        self.post_net = nn.Sequential(
            nn.Linear(n_qubits, 8), nn.ReLU(), nn.Linear(8, 1), nn.Sigmoid()
        )
    
    def forward(self, x):
        q_out = hybrid_qnn_circuit(x, self.q_weights)
        q_out = torch.stack(q_out, dim=-1)
        return self.post_net(q_out)

In [None]:
print("\nTraining Hybrid QNN...")

model_hqnn = HybridQNN(N_QUBITS, n_layers=2)
optimizer = optim.Adam(model_hqnn.parameters(), lr=0.01)
criterion = nn.BCELoss()

X_train_t = torch.tensor(X_train_qml, dtype=torch.float32)
y_train_t = torch.tensor(y_train_qml, dtype=torch.float32)

history_hqnn = []
epochs = 20

start_time = time.time()
for epoch in range(epochs):
    model_hqnn.train()
    epoch_loss = 0
    for i in range(len(X_train_t)):
        optimizer.zero_grad()
        output = model_hqnn(X_train_t[i]).squeeze()
        loss = criterion(output, y_train_t[i])
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    history_hqnn.append(epoch_loss / len(X_train_t))
    if (epoch + 1) % 5 == 0:
        print(f"  Epoch {epoch+1}/{epochs}, Loss: {history_hqnn[-1]:.4f}")
train_time_hqnn = time.time() - start_time

In [None]:
model_hqnn.eval()
X_test_t = torch.tensor(X_test_qml, dtype=torch.float32)

start_time = time.time()
with torch.no_grad():
    y_prob_hqnn = np.array([model_hqnn(x).item() for x in X_test_t])
y_pred_hqnn = (y_prob_hqnn > 0.5).astype(int)
inference_time_hqnn = time.time() - start_time

metrics_hqnn = compute_metrics(y_test_qml, y_pred_hqnn, y_prob_hqnn)
metrics_hqnn.update({
    'model': 'Hybrid_QNN', 'n_qubits': N_QUBITS, 'circuit_depth': 2,
    'n_shots': N_SHOTS, 'train_time': train_time_hqnn, 'inference_time': inference_time_hqnn
})
print(f"Hybrid QNN - F1: {metrics_hqnn['f1_score']:.4f}, ROC-AUC: {metrics_hqnn['roc_auc']:.4f}")

## 4. Model 3: Quantum Kernel SVM (QSVM)

In [None]:
@qml.qnode(dev)
def quantum_kernel_circuit(x1, x2):
    AngleEmbedding(x1, wires=range(N_QUBITS))
    qml.adjoint(AngleEmbedding)(x2, wires=range(N_QUBITS))
    return qml.probs(wires=range(N_QUBITS))

def quantum_kernel(x1, x2):
    probs = quantum_kernel_circuit(x1, x2)
    return float(probs[0])

In [None]:
print("\nComputing Quantum Kernel Matrix...")

n_train_qsvm = min(50, len(X_train_qml))
n_test_qsvm = min(50, len(X_test_qml))

X_train_qsvm = X_train_qml[:n_train_qsvm]
y_train_qsvm = y_train_qml[:n_train_qsvm]
X_test_qsvm = X_test_qml[:n_test_qsvm]
y_test_qsvm = y_test_qml[:n_test_qsvm]

start_time = time.time()
K_train = np.zeros((n_train_qsvm, n_train_qsvm))
for i in range(n_train_qsvm):
    for j in range(i, n_train_qsvm):
        k_val = quantum_kernel(X_train_qsvm[i], X_train_qsvm[j])
        K_train[i, j] = k_val
        K_train[j, i] = k_val
    if (i + 1) % 10 == 0:
        print(f"  Training kernel: {i+1}/{n_train_qsvm}")

K_test = np.zeros((n_test_qsvm, n_train_qsvm))
for i in range(n_test_qsvm):
    for j in range(n_train_qsvm):
        K_test[i, j] = quantum_kernel(X_test_qsvm[i], X_train_qsvm[j])

kernel_time = time.time() - start_time
print(f"Kernel time: {kernel_time:.2f}s")

In [None]:
print("Training QSVM...")

start_time = time.time()
qsvm = SVC(kernel='precomputed', probability=True, random_state=RANDOM_SEED)
qsvm.fit(K_train, y_train_qsvm)
train_time_qsvm = time.time() - start_time + kernel_time

start_time = time.time()
y_pred_qsvm = qsvm.predict(K_test)
y_prob_qsvm = qsvm.predict_proba(K_test)[:, 1]
inference_time_qsvm = time.time() - start_time

metrics_qsvm = compute_metrics(y_test_qsvm, y_pred_qsvm, y_prob_qsvm)
metrics_qsvm.update({
    'model': 'QSVM', 'n_qubits': N_QUBITS, 'circuit_depth': 1,
    'n_shots': N_SHOTS, 'train_time': train_time_qsvm, 'inference_time': inference_time_qsvm
})
print(f"QSVM - F1: {metrics_qsvm['f1_score']:.4f}, ROC-AUC: {metrics_qsvm['roc_auc']:.4f}")

## 5. Model 4: Quantum Autoencoder

In [None]:
n_latent = 2
n_trash = N_QUBITS - n_latent

@qml.qnode(dev)
def qae_circuit(weights, x):
    AngleEmbedding(x, wires=range(N_QUBITS))
    for i in range(N_QUBITS):
        qml.RY(weights[0, i], wires=i)
        qml.RZ(weights[1, i], wires=i)
    for i in range(N_QUBITS - 1):
        qml.CNOT(wires=[i, i+1])
    return [qml.expval(qml.PauliZ(i)) for i in range(n_trash)]

In [None]:
print("\nTraining Quantum Autoencoder...")

X_train_normal_qml = X_train_qml[y_train_qml == 0][:50]
weights_qae = pnp.random.randn(2, N_QUBITS, requires_grad=True) * 0.1
opt = qml.GradientDescentOptimizer(stepsize=0.1)

def qae_cost(weights):
    total_cost = 0.0
    for x in X_train_normal_qml:
        trash_exp = qae_circuit(weights, x)
        total_cost += sum((1 - e) ** 2 for e in trash_exp)
    return total_cost / len(X_train_normal_qml)

history_qae = []
epochs = 20

start_time = time.time()
for epoch in range(epochs):
    weights_qae, cost = opt.step_and_cost(qae_cost, weights_qae)
    history_qae.append(float(cost))
    if (epoch + 1) % 5 == 0:
        print(f"  Epoch {epoch+1}/{epochs}, Loss: {cost:.4f}")
train_time_qae = time.time() - start_time

In [None]:
print("Evaluating Quantum Autoencoder...")

start_time = time.time()
train_losses = []
for x in X_train_normal_qml:
    trash_exp = qae_circuit(weights_qae, x)
    train_losses.append(float(sum((1 - e) ** 2 for e in trash_exp)))
threshold = np.percentile(train_losses, 95)

test_losses = []
for x in X_test_qml:
    trash_exp = qae_circuit(weights_qae, x)
    test_losses.append(float(sum((1 - e) ** 2 for e in trash_exp)))
    
test_losses = np.array(test_losses)
y_pred_qae = (test_losses > threshold).astype(int)
y_prob_qae = safe_normalize(test_losses)
inference_time_qae = time.time() - start_time

metrics_qae = compute_metrics(y_test_qml, y_pred_qae, y_prob_qae)
metrics_qae.update({
    'model': 'Quantum_Autoencoder', 'n_qubits': N_QUBITS, 'circuit_depth': 2,
    'n_shots': N_SHOTS, 'train_time': train_time_qae, 'inference_time': inference_time_qae
})
print(f"Quantum AE - F1: {metrics_qae['f1_score']:.4f}, ROC-AUC: {metrics_qae['roc_auc']:.4f}")

## 6. Model 5: Quantum GAN

In [None]:
n_qubits_gan = 2
dev_gan = qml.device('default.qubit', wires=n_qubits_gan)

@qml.qnode(dev_gan)
def qgan_discriminator(weights, x):
    qml.RY(x[0], wires=0)
    qml.RY(x[1], wires=1)
    qml.RY(weights[0], wires=0)
    qml.RY(weights[1], wires=1)
    qml.CNOT(wires=[0, 1])
    qml.RY(weights[2], wires=0)
    qml.RY(weights[3], wires=1)
    return qml.expval(qml.PauliZ(0))

In [None]:
print("\nTraining QGAN Discriminator...")

weights_qgan = pnp.random.randn(4, requires_grad=True) * 0.1
opt = qml.GradientDescentOptimizer(stepsize=0.05)

X_train_gan = X_train_qml[:, :2]
X_test_gan = X_test_qml[:, :2]

def qgan_cost(weights):
    loss = 0.0
    for x, y in zip(X_train_gan[:50], y_train_qml[:50]):
        pred = (qgan_discriminator(weights, x) + 1) / 2
        loss += (pred - y) ** 2
    return loss / 50

history_qgan = []
epochs = 15

start_time = time.time()
for epoch in range(epochs):
    weights_qgan, cost = opt.step_and_cost(qgan_cost, weights_qgan)
    history_qgan.append(float(cost))
    if (epoch + 1) % 5 == 0:
        print(f"  Epoch {epoch+1}/{epochs}, Loss: {cost:.4f}")
train_time_qgan = time.time() - start_time

In [None]:
start_time = time.time()
y_prob_qgan = np.array([(float(qgan_discriminator(weights_qgan, x)) + 1) / 2 for x in X_test_gan])
y_pred_qgan = (y_prob_qgan > 0.5).astype(int)
inference_time_qgan = time.time() - start_time

metrics_qgan = compute_metrics(y_test_qml, y_pred_qgan, y_prob_qgan)
metrics_qgan.update({
    'model': 'QGAN', 'n_qubits': n_qubits_gan, 'circuit_depth': 1,
    'n_shots': N_SHOTS, 'train_time': train_time_qgan, 'inference_time': inference_time_qgan
})
print(f"QGAN - F1: {metrics_qgan['f1_score']:.4f}, ROC-AUC: {metrics_qgan['roc_auc']:.4f}")

## 7. Save Results

In [None]:
all_metrics = [metrics_vqc, metrics_hqnn, metrics_qsvm, metrics_qae, metrics_qgan]
metrics_df = pd.DataFrame(all_metrics)

col_order = ['model', 'accuracy', 'precision', 'recall', 'f1_score', 'roc_auc', 
             'fpr', 'tpr', 'n_qubits', 'circuit_depth', 'n_shots', 'train_time', 'inference_time']
metrics_df = metrics_df[[c for c in col_order if c in metrics_df.columns]]

metrics_path = RESULTS_DIR / 'qml_metrics.csv'
metrics_df.to_csv(metrics_path, index=False)

print(f"✅ Saved QML metrics to: {metrics_path}")
print("\nQML Results:")
print(metrics_df.to_string(index=False))

In [None]:
# ROC Curves (with NaN handling)
plt.figure(figsize=(10, 8))

all_probs = {
    'VQC': (y_prob_vqc, y_test_qml),
    'Hybrid_QNN': (y_prob_hqnn, y_test_qml),
    'QSVM': (y_prob_qsvm, y_test_qsvm),
    'Quantum_Autoencoder': (y_prob_qae, y_test_qml),
    'QGAN': (y_prob_qgan, y_test_qml)
}

for model_name, (y_prob, y_true) in all_probs.items():
    if y_prob is not None and not np.any(np.isnan(y_prob)):
        try:
            fpr_vals, tpr_vals, _ = roc_curve(y_true, y_prob)
            auc_val = metrics_df[metrics_df['model'] == model_name]['roc_auc'].values[0]
            plt.plot(fpr_vals, tpr_vals, label=f'{model_name} (AUC={auc_val:.3f})')
        except Exception as e:
            print(f"Skipping {model_name}: {e}")

plt.plot([0, 1], [0, 1], 'k--', label='Random')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves - QML Models')
plt.legend(loc='lower right')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(FIGURES_DIR / 'roc_curves_qml.png', dpi=150)
plt.show()

In [None]:
print("\n" + "="*50)
print("QML MODELS SUMMARY")
print("="*50)
print(f"Total models trained: {len(all_metrics)}")
print(f"\nBest by F1 Score: {metrics_df.loc[metrics_df['f1_score'].idxmax(), 'model']} ({metrics_df['f1_score'].max():.4f})")
print(f"Best by ROC-AUC: {metrics_df.loc[metrics_df['roc_auc'].idxmax(), 'model']} ({metrics_df['roc_auc'].max():.4f})")
print("\n✅ Notebook 8 Complete!")