In [None]:
# Install necessary libraries
!pip install qiskit medmnist



In [None]:
# prompt: load bloodmnist from the python module 'medmnist' and load prepare the data for torch ml later

import medmnist
import torch
import torchvision.transforms as transforms

# Download and load the dataset using MedMNIST's API
# Specify download=True to download the dataset if it's not present
DOWNLOAD = True

# Define transformations for the dataset
# Convert numpy arrays to PyTorch tensors and normalize
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[.5], std=[.5])
])

# Load the BloodMNIST dataset for training
train = medmnist.BreastMNIST(split='train', download=DOWNLOAD, transform=data_transform)

# Load the BloodMNIST dataset for validation (or testing)
# You can choose 'val' or 'test' depending on your need
test = medmnist.BreastMNIST(split='test', download=DOWNLOAD, transform=data_transform)

print("Training dataset loaded:", train)
print("Test dataset loaded:", test)

# You can now create PyTorch DataLoaders for batch processing
# batch_size = 64
# train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# The datasets (train_dataset and test_dataset) are now ready for use with PyTorch models.
# Each item in the dataset will be a tuple (image, label) where image is a PyTorch tensor.

Training dataset loaded: Dataset BreastMNIST of size 28 (breastmnist)
    Number of datapoints: 546
    Root location: /root/.medmnist
    Split: train
    Task: binary-class
    Number of channels: 1
    Meaning of labels: {'0': 'malignant', '1': 'normal, benign'}
    Number of samples: {'train': 546, 'val': 78, 'test': 156}
    Description: The BreastMNIST is based on a dataset of 780 breast ultrasound images. It is categorized into 3 classes: normal, benign, and malignant. As we use low-resolution images, we simplify the task into binary classification by combining normal and benign as positive and classifying them against malignant as negative. We split the source dataset with a ratio of 7:1:2 into training, validation and test set. The source images of 1×500×500 are resized into 1×28×28.
    License: CC BY 4.0
Test dataset loaded: Dataset BreastMNIST of size 28 (breastmnist)
    Number of datapoints: 156
    Root location: /root/.medmnist
    Split: test
    Task: binary-class
   

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Function
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import pandas as pd
from qiskit import QuantumCircuit
#ParameterVector was redudant
from qiskit.primitives import StatevectorEstimator
from qiskit.quantum_info import SparsePauliOp

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix

#To control the random trainable parameters during development:
torch.manual_seed(42)

#VQA Circuit
n_qubits = 4
#Params were redudant

def create_vqa_circuit(input_data, weights):
    qc = QuantumCircuit(n_qubits)
    shift = 0  # index into weights array

    # 1) Triple data re‑uploading with CZ entanglement
    for _ in range(3):
        for i in range(n_qubits):
            qc.ry(input_data[i % len(input_data)], i)
        for i in range(n_qubits - 1):
            qc.cz(i, i + 1)
        qc.barrier()

    # 2) Two variational layers: mixed-axis rotations + ring CNOTs
    for _ in range(2):
        # each layer has 2 rotations per qubit
        for i in range(n_qubits):
            qc.rx(weights[shift], i); shift += 1
            qc.ry(weights[shift], i); shift += 1
        # ring entanglement
        for i in range(n_qubits):
            qc.cx(i, (i + 1) % n_qubits)
        qc.barrier()

    # 3) Final fine‑tune layer: one Ry per qubit
    for i in range(n_qubits):
        qc.ry(weights[shift], i)
        shift += 1

    return qc


#Qiskit StatevectorEstimator primitive
estimator = StatevectorEstimator()
#observables = [ SparsePauliOp("Z" + "I" * (n_qubits-1)) ] #Single multi-qubit gate
observables = [
    SparsePauliOp("".join("Z" if j == i else "I" for j in range(n_qubits)))
    for i in range(n_qubits)
] #A weighted sum will be applied using trainable reeadout parameters

#PyTorch Custom Autograd Function For VQA Layer
class VQALayerFunction(Function):
    @staticmethod
    def forward(ctx, input_tensor, weights):
        input_vals = input_tensor.detach().numpy()
        weight_vals = weights.detach().numpy()
        ctx.save_for_backward(input_tensor, weights)

        qc = create_vqa_circuit(input_vals, weight_vals)
        job = estimator.run([(qc, observables)])
        expval = job.result()[0].data.evs

        return torch.tensor([expval], dtype=torch.float32)

    @staticmethod
    def backward(ctx, grad_output):
        input_tensor, weights = ctx.saved_tensors
        input_vals = input_tensor.detach().numpy()
        weight_vals = weights.detach().numpy()
        shift = np.pi / 2
        grads = []

        for i in range(len(weight_vals)): # Grad is calculated using parameter shift rule
            #print("Make changes here")
            #w abbreviates weight_vals
            w_plus, w_minus = weight_vals.copy(), weight_vals.copy()
            w_plus[i]  += shift
            w_minus[i] -= shift

            circuit_plus  = create_vqa_circuit(input_vals, w_plus)
            circuit_minus = create_vqa_circuit(input_vals, w_minus)

            EV_plus  = estimator.run([(circuit_plus, observables)]).result()[0].data.evs[0]
            EV_minus = estimator.run([(circuit_minus, observables)]).result()[0].data.evs[0]

            grads.append( 0.5 * (EV_plus - EV_minus) )


        grads_tensor = torch.tensor(grads, dtype=torch.float32)
        return None, (grad_output.view(-1)[0] * grads_tensor).view(-1)

#Quantum Layer as PyTorch Module
class VQALayer(nn.Module):
    def __init__(self):
        super().__init__()
        self.weights = nn.Parameter(torch.randn(5 * n_qubits)) #trainable parameters for the circuit
        self.best_weights = self.weights.detach().clone() #to capture the best weights during training
        self.readout_w = nn.Parameter(torch.ones(n_qubits) / n_qubits) #trainable parameters for the a weighted Pauli sum observable

    def forward(self, x):
        #run quantum circuit on each sample
        expvals = torch.stack([
            VQALayerFunction.apply(x[i], self.weights)
            for i in range(x.size(0))
        ], dim=0).squeeze(1)  #tensor shape [batch, n_qubits]

        return expvals #Returns expectation for each qubit

        #return torch.stack([VQALayerFunction.apply(x[i], self.weights) for i in range(x.size(0))]).view(-1, 1)

#Full Hybrid Model
class HybridModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.classical = nn.Linear(train[0][0].numel(), n_qubits)
        self.quantum = VQALayer()
        self.readout_w = nn.Parameter(torch.ones(n_qubits))

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = self.classical(x)
        x = torch.tanh(x)
        x = self.quantum(x)
        x = (x * self.readout_w).sum(dim=1, keepdim=True)
        return torch.sigmoid(x)


model = HybridModel()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.BCELoss()

#Training Loop
batch_size = 64
train_loader  = DataLoader(train, batch_size=batch_size, shuffle=True)

best_rocauc = 0
epochs = 10

for epoch in range(epochs):
    total_loss = 0.0

    #Iterate over batches
    for Xb, Yb in train_loader:
        optimizer.zero_grad()
        preds = model(Xb)
        loss = loss_fn(preds, Yb.float())
        loss.backward()

        #Print gradient norms for all parameters
        # for name, param in model.named_parameters():
        #     if param.grad is not None:
        #         print(f"{name} grad_norm = {param.grad.norm():.4f}")

        optimizer.step()
        total_loss += loss.item()

    # compute epoch-level metrics via the DataLoader
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for Xb, Yb in train_loader:
            preds = model(Xb).view(-1)
            all_preds.append(preds.cpu())
            all_labels.append(Yb.view(-1).cpu())

    y_scores = torch.cat(all_preds).numpy()
    y_true   = torch.cat(all_labels).numpy().astype(int)
    acc_full = ( (y_scores > 0.5) == y_true ).mean()
    rocauc   = roc_auc_score(y_true, y_scores)

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1} | Avg Loss: {avg_loss:.4f} | Accuracy: {acc_full*100:.2f}% | ROC AUC: {rocauc:.4f}")

    #Record best params
    rocauc = roc_auc_score(y_true, y_scores)

    if rocauc > best_rocauc:
        model.quantum.best_weights = model.quantum.weights.detach().clone()
        best_rocauc = rocauc

    if epoch == epochs-1:
        print(f"Best weights: {model.quantum.best_weights}\n Best ROCAUC: {best_rocauc}")

    model.train()


Epoch 1 | Avg Loss: 0.7541 | Accuracy: 42.67% | ROC AUC: 0.5944
Epoch 2 | Avg Loss: 0.6889 | Accuracy: 67.03% | ROC AUC: 0.5666
Epoch 3 | Avg Loss: 0.6508 | Accuracy: 71.79% | ROC AUC: 0.5486
Epoch 4 | Avg Loss: 0.6312 | Accuracy: 72.16% | ROC AUC: 0.5520
Epoch 5 | Avg Loss: 0.6121 | Accuracy: 72.53% | ROC AUC: 0.5653
Epoch 6 | Avg Loss: 0.5993 | Accuracy: 72.53% | ROC AUC: 0.5758
Epoch 7 | Avg Loss: 0.5902 | Accuracy: 72.71% | ROC AUC: 0.5852
Epoch 8 | Avg Loss: 0.5810 | Accuracy: 72.89% | ROC AUC: 0.5966
Epoch 9 | Avg Loss: 0.5840 | Accuracy: 72.89% | ROC AUC: 0.5963
Epoch 10 | Avg Loss: 0.5678 | Accuracy: 72.89% | ROC AUC: 0.6111
Best weights: tensor([-0.0115,  1.0955, -0.1100, -0.1695,  0.9276, -0.4935,  0.7547,  1.4807,
         1.0255,  2.1790, -0.5769, -0.0102, -1.4060,  2.1734,  0.3246, -0.9725,
        -0.1712,  0.2488, -0.0217,  0.2170])
 Best ROCAUC: 0.6110855369716808


In [None]:
#Implement Testing Loop:
model.quantum.weights.data.copy_(model.quantum.best_weights.detach().clone())

test_loader  = DataLoader(test, batch_size=batch_size, shuffle=True)

model.eval()
all_scores, all_labels = [], []
with torch.no_grad():
    for Xb, Yb in test_loader:
        probs = model(Xb).view(-1).cpu().numpy()
        all_scores.append(probs)
        all_labels.append(Yb.view(-1).cpu().numpy())
y_scores = np.concatenate(all_scores)
y_true   = np.concatenate(all_labels).astype(int)

thresholds = np.linspace(0, 1, 101)
best_t, best_acc = 0.5, 0.0
for t in thresholds:
    preds = (y_scores >= t).astype(int)
    acc   = (preds == y_true).mean()
    if acc > best_acc:
        best_acc, best_t = acc, t
print(f"Optimal threshold = {best_t:.2f}, accuracy = {best_acc:.4f}\n")

preds_best = (y_scores >= best_t).astype(int)

acc    = accuracy_score(y_true, preds_best)
prec   = precision_score(y_true, preds_best)
rec    = recall_score(y_true, preds_best)
f1     = f1_score(y_true, preds_best)
rocauc = roc_auc_score(y_true, y_scores) # Use y_scores for ROC AUC
cm     = confusion_matrix(y_true, preds_best)

# Print results
print("--- Test Set Metrics ---")
print(f"Test Accuracy:  {acc:.4f}")
print(f"Precision:      {prec:.4f}")
print(f"Recall:         {rec:.4f}")
print(f"F1-Score:       {f1:.4f}")
print(f"ROC AUC:        {rocauc:.4f}")
print("Confusion Matrix:")
print(cm)

Optimal threshold = 0.57, accuracy = 0.7372

--- Test Set Metrics ---
Test Accuracy:  0.7372
Precision:      0.7355
Recall:         1.0000
F1-Score:       0.8476
ROC AUC:        0.6368
Confusion Matrix:
[[  1  41]
 [  0 114]]
