In [None]:
# Library imports
%matplotlib inline
import math
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import pennylane as qml

# Pytorch imports
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# Set the random seed for reproducibility
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
device = torch.device('cpu')

In [4]:
#Prep Training Data (Breast Cancer Wisconsin)

#Load the data file
df = pd.read_csv("data/winequality-red.csv", sep = ';', header=0)

Y = (df['quality'] >= 6).astype(int).values.reshape(-1,1)
X = df.drop(['quality'], axis=1).values


mean = X.mean(axis=0)
std = X.std(axis=0)
X = (X - mean) / std

#Convert to torch tensors
X_tensor = torch.tensor(X, dtype=torch.float32)
Y_tensor = torch.tensor(Y, dtype=torch.float32)
y = Y_tensor.view(-1).long()     # flatten to shape [N] and cast to int
counts = torch.bincount(y)       # bincount over 0,1
print(f"Zeros (class 0): {counts[0].item()}")
print(f"Ones  (class 1): {counts[1].item()}")

#Manual train/test split (80/20)
num_samples = X_tensor.shape[0]
indices = torch.randperm(num_samples)

split_idx = int(num_samples * 0.8)
train_indices = indices[:split_idx]
test_indices = indices[split_idx:]
X_train = X_tensor[train_indices]
Y_train = Y_tensor[train_indices]
X_test = X_tensor[test_indices]
Y_test = Y_tensor[test_indices]
print(X.shape, Y.shape)

Zeros (class 0): 744
Ones  (class 1): 855
(1599, 11) (1599, 1)


The base idea for this model is a quanvolutional neural network that uses multiple quantum kernels for quantum feature extraction. Here I use 3 1D filter kernels of size 3, allowing us to map a 30 feature datapoint to a (9,10) dimension tensor. The output of this is passed into a classical convolutional layer. One can use a quantum convolutional neural network instead, but due to time and resource constraints, I opted for a classical CNN.

In [63]:

quanv_dev1 = qml.device('lightning.qubit', wires = 3)
@qml.qnode(quanv_dev1, interface='torch', diff_method = 'best')
def quanv_circ1(input_vals, params):
    qml.IQPEmbedding(input_vals, list(range(len(input_vals))))
    for j in range(3):
        qml.U3(params[3*j], 
                params[3*j + 1], 
                params[3*j + 2],
                wires = j)
    qml.CNOT(wires = [1,2])
    qml.IsingXX(params[3*3], wires = [0,1])
    qml.IsingYY(params[3*3 + 1], wires = [0,1])
    qml.IsingZZ(params[3*3 + 2], wires = [0,1])
    qml.CNOT(wires = [1,2])
    for j in range(3):
        qml.U3(params[3*j + 12], 
                params[3*j + 1 + 12], 
                params[3*j + 2 + 12],
                wires = j)
    
    return [qml.expval(qml.PauliZ(i)) for i in [0,1,2]]

qsvt_dev = qml.device('default.mixed', wires = 4)
@qml.qnode(qsvt_dev, interface='torch', diff_method = 'best')
def qsvt_circ(input_vals):
    A = torch.diag(input_vals)
    angles = [0,0]
    qml.qsvt(A, angles, wires = list(range(4)))
    return qml.probs(wires = list(range(4)))

def quanv(data, window_size, params, num_qubits):
    num_qubits = 3
    dim = data.size(1)
    out_cols = dim - window_size + 1
    out = torch.zeros((num_qubits, out_cols))
    for idx, start in enumerate(range(0, out_cols)):
        seg1 = data[0, start:start+window_size]
        q1 = quanv_circ1(seg1, params[:21])
        for i in range(num_qubits):
            out[i, idx] = q1[i]
    return out


Here, you can see the architecture of the model, including the classical layers.

In [None]:
class HybridModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.q_params = nn.ParameterList(
            [
                nn.Parameter(torch.randn((46))*np.pi, requires_grad=True)
            ]
        )
        self.classical = nn.Sequential(
            nn.Conv1d(in_channels=3, out_channels=16, kernel_size=3, padding=1),
            nn.BatchNorm1d(16),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(in_channels = 16, out_channels=32, kernel_size=3, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        self.output = nn.Sequential(
            nn.Linear(2*32, 32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )
    def forward(self, x):
        quanv_output = None
        for q_param in self.q_params:
            quanv_output = quanv(x, 3, q_param, 3).view(1,3,9)
        convolution_output = self.classical(quanv_output).view(1, 32*2)
        output = self.output(convolution_output)
        return output
    # quanv_output = None
    #     qsvt_output = torch.zeros((3, 2**4))
    #     for q_param in self.q_params:
    #         quanv_output = quanv(x, 4, q_param, 3).view(3,8)
    #     for i, q_output in enumerate(quanv_output):
    #         qsvt_output[i, :] = qsvt_circ(q_output)
    #     qsvt_output = qsvt_output.view(1, 3, 16)
    #     convolution_output = self.classical(qsvt_output).view(1, 128)
    #     output = self.output(convolution_output)
    #     return output


Methods to evaluate robustness

In [72]:
import torch
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import accuracy_score

def fgsm_attack(model, x, y, epsilon, device):
    x_adv = x.clone().detach().to(device).requires_grad_(True)
    out = model(x_adv)

    if out.dim()==2 and out.size(1)==1:
        out = out.view(-1)

    loss = F.binary_cross_entropy_with_logits(out.unsqueeze(0), y.float().to(device))
    model.zero_grad()
    loss.backward()
    
    x_adv = x_adv + epsilon * x_adv.grad.sign()
    return x_adv.detach()

Training loop. I used a regularization loss term since I was finding that this model fits easily to the training dataset. Compared to a simple classical CNN classifier, unfortunately we acquire lower test accuracy, F-1 score, precision, and recall. Nonetheless, I thought this model performed quite well on this particular dataset, where it appeared difficult to acquire any comparable performance to a classical classifier.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import numpy as np

# — configs — 
device       = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
k_folds      = 5
n_epochs     = 30
batch_size   = 1
lr           = 0.0002
betas        = (0.5, 0.999)
weight_decay = 0.005

# Your existing train/test split:
X_tr = X_train.cpu()          # we'll .to(device) inside loop
Y_tr = Y_train.view(-1).cpu().long()
X_te = X_test.to(device)
Y_te = Y_test.view(-1).to(device).long()

ensemble_models = []

# Stratify only on the training set—ignore the val indices
skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)
for fold, (train_idx, _) in enumerate(skf.split(X_tr, Y_tr), 1):
    print(f"\n→ Fold {fold}/{k_folds}")

    # Build fold‐specific loader
    X_fold = X_tr[train_idx].to(device)
    Y_fold = Y_tr[train_idx].to(device)
    train_loader = DataLoader(
        TensorDataset(X_fold, Y_fold),
        batch_size=batch_size,
        shuffle=True
    )
    # Compute pos_weight = (#neg / #pos) on this fold
    n_pos = int((Y_fold==1).sum().item())
    n_neg = int((Y_fold==0).sum().item())
    pos_weight = n_neg / n_pos
    print(f"  pos/neg = {n_pos}/{n_neg} → pos_weight = {pos_weight:.2f}")

    # Instantiate & train a fresh model
    model_f = HybridModel().to(device)
    optimizer_f = torch.optim.Adam(
        model_f.parameters(),
        lr=lr, betas=betas, weight_decay=weight_decay
    )
    for epoch in range(1, n_epochs+1):
        model_f.train()
        running_loss = 0.0
        for xb, yb in train_loader:
            optimizer_f.zero_grad()
            pred = model_f(xb).view(-1)
            weights = torch.where(
                yb==1,
                torch.tensor(pos_weight, device=device),
                torch.tensor(1.0,        device=device)
            )
            loss = F.binary_cross_entropy(pred, yb.float(), weight=weights)
            loss.backward()
            optimizer_f.step()
            running_loss += loss.item() * xb.size(0)
        print(f"    Epoch {epoch:02d} — loss {running_loss/len(train_loader.dataset):.4f}")

    model_f.eval()
    ensemble_models.append(model_f)

# — Ensemble on the hold-out test set —
all_probs = []
with torch.no_grad():
    for m in ensemble_models:
        probs = []
        for x in X_te:
            p = m(x.unsqueeze(0)).view(-1)[0].item()
            probs.append(p)
        all_probs.append(probs)

avg_probs  = np.mean(np.vstack(all_probs), axis=0)
y_pred_ens = (avg_probs >= 0.5).astype(int)
y_true_te  = Y_te.cpu().numpy()

print("Ensemble Test Accuracy:  ", accuracy_score(y_true_te, y_pred_ens))
print("Ensemble Test Precision: ", precision_score(y_true_te, y_pred_ens))
print("Ensemble Test Recall:    ", recall_score(y_true_te, y_pred_ens))
print("Ensemble Test F1-score:  ", f1_score(y_true_te, y_pred_ens))



→ Fold 1/5
  pos/neg = 550/473 → pos_weight = 0.86
    Epoch 01 — loss 0.6275
    Epoch 02 — loss 0.6044
    Epoch 03 — loss 0.5669
    Epoch 04 — loss 0.5432
    Epoch 05 — loss 0.5264
    Epoch 06 — loss 0.4985
    Epoch 07 — loss 0.4862
    Epoch 08 — loss 0.4614
    Epoch 09 — loss 0.4383
    Epoch 10 — loss 0.4189
    Epoch 11 — loss 0.4030
    Epoch 12 — loss 0.3748
    Epoch 13 — loss 0.3500
    Epoch 14 — loss 0.3324
    Epoch 15 — loss 0.3032
    Epoch 16 — loss 0.2869
    Epoch 17 — loss 0.2785
    Epoch 18 — loss 0.2433
    Epoch 19 — loss 0.2300
    Epoch 20 — loss 0.2134
    Epoch 21 — loss 0.1961
    Epoch 22 — loss 0.1811
    Epoch 23 — loss 0.1644
    Epoch 24 — loss 0.1577
    Epoch 25 — loss 0.1442
    Epoch 26 — loss 0.1212
    Epoch 27 — loss 0.1158
    Epoch 28 — loss 0.1108
    Epoch 29 — loss 0.0944
    Epoch 30 — loss 0.0863

→ Fold 2/5
  pos/neg = 550/473 → pos_weight = 0.86
    Epoch 01 — loss 0.6351
    Epoch 02 — loss 0.6017
    Epoch 03 — loss 0.5745
    E

In [67]:
print("Ensemble Test Accuracy:  ", accuracy_score(y_true_te, y_pred_ens)*100)
print("Ensemble Test Precision: ", precision_score(y_true_te, y_pred_ens))
print("Ensemble Test Recall:    ", recall_score(y_true_te, y_pred_ens))
print("Ensemble Test F1-score:  ", f1_score(y_true_te, y_pred_ens))

Ensemble Test Accuracy:   68.125
Ensemble Test Precision:  0.7171052631578947
Ensemble Test Recall:     0.6488095238095238
Ensemble Test F1-score:   0.68125


In [75]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score

def evaluate_ensemble_robustness(ensemble_models, device, X_test, Y_test, epsilons):
    """
    Evaluate ensemble accuracy under FGSM attack for various epsilons.
    For each epsilon:
      - For each model in the ensemble:
          • Generate adversarial examples on X_test (per-sample FGSM).
          • Collect model probabilities on those adversarial inputs.
      - Average probabilities across models.
      - Threshold at 0.5 and compute accuracy.
    Returns a dict mapping epsilon -> ensemble accuracy.
    """
    ensemble_robustness = {}

    for eps in epsilons:
        # Collect per-model probability lists
        per_model_probs = []

        for model in ensemble_models:
            model.eval()
            probs = []
            for i in range(X_test.size(0)):
                x = X_test[i].unsqueeze(0).to(device)
                y = Y_test[i].unsqueeze(0).long().to(device)
                # generate adversarial or keep clean
                if eps > 0.0:
                    x_adv = fgsm_attack(model, x, y, eps, device)
                else:
                    x_adv = x
                with torch.no_grad():
                    p = model(x_adv).view(-1)[0].item()  # assumes sigmoid final
                probs.append(p)
            per_model_probs.append(probs)

        # average probabilities across models: shape = [n_test]
        avg_probs = np.mean(np.array(per_model_probs), axis=0)
        y_pred = (avg_probs >= 0.5).astype(int)
        y_true = Y_test.view(-1).cpu().numpy().astype(int)

        acc = accuracy_score(y_true, y_pred)
        ensemble_robustness[eps] = acc

    return ensemble_robustness

# — USAGE —
epsilons = [0.0, 0.01, 0.05, 0.1, 0.2]
ensemble_robustness = evaluate_ensemble_robustness(
    ensemble_models, device, X_test, Y_test, epsilons
)
print("Ensemble Accuracy vs ε:", ensemble_robustness)


Ensemble Accuracy vs ε: {0.0: 0.68125, 0.01: 0.634375, 0.05: 0.4125, 0.1: 0.23125, 0.2: 0.109375}


In [73]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
num_params = 0
for expert in ensemble_models:
    num_params += count_parameters(expert)
num_params

19915

In [None]:
#ensemble params
# ensemble_state = {
#     f'fold_{i}': m.state_dict()
#     for i, m in enumerate(ensemble_models, start=1)
# }
# torch.save(ensemble_state, 'wine_ensemble_models.pth')

In [None]:
#loading in ensemble
checkpoint = torch.load('wine_ensemble_models.pth', map_location=device)
loaded_models = []
for key, state in checkpoint.items():
    m = HybridModel().to(device)
    m.load_state_dict(state)
    m.eval()
    loaded_models.append(m)