In [1]:
!pip install pennylane

Collecting pennylane
  Downloading PennyLane-0.41.1-py3-none-any.whl.metadata (10 kB)
Collecting rustworkx>=0.14.0 (from pennylane)
  Downloading rustworkx-0.16.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting tomlkit (from pennylane)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Collecting appdirs (from pennylane)
  Downloading appdirs-1.4.4-py2.py3-none-any.whl.metadata (9.0 kB)
Collecting autoray>=0.6.11 (from pennylane)
  Downloading autoray-0.7.1-py3-none-any.whl.metadata (5.8 kB)
Collecting pennylane-lightning>=0.41 (from pennylane)
  Downloading pennylane_lightning-0.41.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (12 kB)
Collecting diastatic-malt (from pennylane)
  Downloading diastatic_malt-2.15.2-py3-none-any.whl.metadata (2.6 kB)
Collecting scipy-openblas32>=0.3.26 (from pennylane-lightning>=0.41->pennylane)
  Downloading scipy_openblas32-0.3.29.0.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5

In [3]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
import pennylane as qml

In [4]:
# Fix random seeds for reproducibility
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# Log library versions
print(f"PyTorch version: {torch.__version__}")
import pennylane as qml
print(f"PennyLane version: {qml.version()}")
import shap
print(f"SHAP version: {shap.__version__}")

PyTorch version: 2.6.0+cu124
PennyLane version: 0.41.1
SHAP version: 0.47.2


In [5]:
# Load training and test datasets
train_df = pd.read_csv("DIA_trainingset_RDKit_descriptors.csv")
test_df  = pd.read_csv("DIA_testset_RDKit_descriptors.csv")

# Separate features and labels
X_train_full = train_df.drop(columns=["Label", "SMILES"])
y_train_full = train_df["Label"].values  # binary labels (0 or 1)
X_test_full  = test_df.drop(columns=["Label", "SMILES"])
y_test       = test_df["Label"].values

# Standardize features (fit on training data, apply to train/val/test)
scaler = StandardScaler()
X_train_full_scaled = scaler.fit_transform(X_train_full)
X_test_scaled = scaler.transform(X_test_full)

# Split training data into train and validation subsets
X_train_scaled, X_val_scaled, y_train, y_val = train_test_split(
    X_train_full_scaled, y_train_full, test_size=0.15, stratify=y_train_full, random_state=seed
)

# Apply PCA to reduce to 5 components
pca = PCA(n_components=5, random_state=seed)
X_train = pca.fit_transform(X_train_scaled)
X_val   = pca.transform(X_val_scaled)
X_test  = pca.transform(X_test_scaled)

print("Training samples:", X_train.shape[0], "| Features (PCA components):", X_train.shape[1])
print("Validation samples:", X_val.shape[0], "| Test samples:", X_test.shape[0])


Training samples: 405 | Features (PCA components): 5
Validation samples: 72 | Test samples: 120


In [6]:
# Set up a quantum device with 5 qubits
n_qubits = 5
dev = qml.device("default.qubit", wires=n_qubits)

# Define the variational quantum circuit (QNode) with trainable weights
@qml.qnode(dev, interface="torch", diff_method="parameter-shift")
def qnode(inputs, weights):
    # inputs: 1D tensor of length 5 (features), weights: trainable tensor of shape (n_layers, n_qubits, 3)
    qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation="Y")         # embed features as Y-rotations
    for layer in range(weights.shape[0]):                                   # variational layers
        for q in range(n_qubits):
            qml.Rot(weights[layer, q, 0], weights[layer, q, 1], weights[layer, q, 2], wires=q)
        # Entangle qubits in a ring topology
        for q in range(n_qubits - 1):
            qml.CNOT(wires=[q, q+1])
        qml.CNOT(wires=[n_qubits-1, 0])
    return qml.expval(qml.PauliZ(0))  # expectation value on first qubit (output between -1 and 1)

In [7]:
# Initialize trainable weight shapes for 4 layers
n_layers = 4
weight_shapes = {"weights": (n_layers, n_qubits, 3)}

# Wrap the QNode in a torch.nn.Module for easy integration
qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)

# Define the hybrid quantum-classical model (in this case, the model is just the QNN layer itself)
model = qlayer  # model will output a logit in [-1, 1]

# Define loss function (binary cross-entropy with logits) with class weighting for imbalance
neg_count = (y_train == 0).sum()
pos_count = (y_train == 1).sum()
pos_weight = torch.tensor([neg_count / float(pos_count)], dtype=torch.float32)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

# Define optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

In [8]:
# Convert datasets to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)  # shape (N_train,)
X_val_tensor   = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor   = torch.tensor(y_val, dtype=torch.float32)
X_test_tensor  = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor  = torch.tensor(y_test, dtype=torch.float32)


In [16]:
class HybridQNN(nn.Module):
    def __init__(self, qlayer):
        super().__init__()
        self.qlayer = qlayer
        self.head = nn.Sequential(
            nn.Linear(1, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
        )

    def forward(self, x):
        # If x is 1D (n_features,), treat it as a batch of size 1
        if x.ndim == 1:
            x = x.unsqueeze(0)        # (n_features,) → (1, n_features)

        # Now x is (batch_size, n_features)
        q_out = self.qlayer(x)       # yields shape (batch_size,)
        q_out = q_out.unsqueeze(1)   # → (batch_size, 1)
        out   = self.head(q_out)     # → (batch_size, 1)
        return out.squeeze(1)        # → (batch_size,)

# Re-instantiate your model
model = HybridQNN(qlayer)


In [11]:
# 2) Optimizer + scheduler
optimizer = torch.optim.Adam(
    model.parameters(), lr=1e-3, weight_decay=1e-4
)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.5, patience=5, min_lr=1e-5, verbose=True
)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

# 3) DataLoader for batching
from torch.utils.data import TensorDataset, DataLoader
train_ds = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)

# 4) Training loop with gradient clipping and scheduler
num_epochs = 50
for epoch in range(1, num_epochs+1):
    model.train()
    total_loss = 0.0
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        logits = model(batch_X)
        loss   = criterion(logits, batch_y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item() * batch_X.shape[0]
    avg_loss = total_loss / len(train_loader.dataset)

    # Validation
    model.eval()
    with torch.no_grad():
        val_logits = model(X_val_tensor)
        val_loss   = criterion(val_logits, y_val_tensor).item()
        val_preds  = (torch.sigmoid(val_logits) >= 0.5).float()
        val_acc    = (val_preds == y_val_tensor).float().mean().item()

    print(f"Epoch {epoch:2d} | Train loss {avg_loss:.4f} | Val loss {val_loss:.4f} | Val acc {val_acc:.3f}")
    scheduler.step(val_loss)




Epoch  1 | Train loss 1.0932 | Val loss 1.0812 | Val acc 0.750
Epoch  2 | Train loss 1.0777 | Val loss 1.0720 | Val acc 0.750
Epoch  3 | Train loss 1.0649 | Val loss 1.0644 | Val acc 0.750
Epoch  4 | Train loss 1.0548 | Val loss 1.0586 | Val acc 0.750
Epoch  5 | Train loss 1.0455 | Val loss 1.0550 | Val acc 0.750
Epoch  6 | Train loss 1.0385 | Val loss 1.0526 | Val acc 0.750
Epoch  7 | Train loss 1.0333 | Val loss 1.0501 | Val acc 0.750
Epoch  8 | Train loss 1.0275 | Val loss 1.0485 | Val acc 0.750
Epoch  9 | Train loss 1.0228 | Val loss 1.0467 | Val acc 0.722
Epoch 10 | Train loss 1.0171 | Val loss 1.0458 | Val acc 0.736
Epoch 11 | Train loss 1.0120 | Val loss 1.0446 | Val acc 0.667
Epoch 12 | Train loss 1.0070 | Val loss 1.0429 | Val acc 0.653
Epoch 13 | Train loss 1.0006 | Val loss 1.0421 | Val acc 0.639
Epoch 14 | Train loss 0.9948 | Val loss 1.0407 | Val acc 0.653
Epoch 15 | Train loss 0.9882 | Val loss 1.0395 | Val acc 0.639
Epoch 16 | Train loss 0.9818 | Val loss 1.0390 | Val ac

In [12]:
# Train a logistic regression baseline
logistic_model = LogisticRegression(solver='lbfgs', max_iter=1000, random_state=seed)
logistic_model.fit(X_train, y_train)

# Train a random forest baseline
rf_model = RandomForestClassifier(n_estimators=100, random_state=seed)
rf_model.fit(X_train, y_train)


In [13]:
# Get predictions on test set for each model
# Quantum model: use sigmoid on the logit output to get probability of class 1
model.eval()
with torch.no_grad():
    q_logits = model(X_test_tensor).squeeze()
q_probs = torch.sigmoid(q_logits).numpy()               # predicted probabilities for class 1
q_preds = (q_probs >= 0.5).astype(int)                  # predicted labels

# Logistic regression predictions
lr_probs = logistic_model.predict_proba(X_test)[:, 1]
lr_preds = logistic_model.predict(X_test)

# Random forest predictions
rf_probs = rf_model.predict_proba(X_test)[:, 1]
rf_preds = rf_model.predict(X_test)

# Compute metrics
acc_q  = accuracy_score(y_test, q_preds)
prec_q = precision_score(y_test, q_preds)
rec_q  = recall_score(y_test, q_preds)
f1_q   = f1_score(y_test, q_preds)
auc_q  = roc_auc_score(y_test, q_probs)

acc_lr  = accuracy_score(y_test, lr_preds)
prec_lr = precision_score(y_test, lr_preds)
rec_lr  = recall_score(y_test, lr_preds)
f1_lr   = f1_score(y_test, lr_preds)
auc_lr  = roc_auc_score(y_test, lr_probs)

acc_rf  = accuracy_score(y_test, rf_preds)
prec_rf = precision_score(y_test, rf_preds)
rec_rf  = recall_score(y_test, rf_preds)
f1_rf   = f1_score(y_test, rf_preds)
auc_rf  = roc_auc_score(y_test, rf_probs)

print("Test Metrics:")
print(f" Quantum VQA - Accuracy: {acc_q:.3f}, Precision: {prec_q:.3f}, Recall: {rec_q:.3f}, F1: {f1_q:.3f}, AUC: {auc_q:.3f}")
print(f" Logistic Reg - Accuracy: {acc_lr:.3f}, Precision: {prec_lr:.3f}, Recall: {rec_lr:.3f}, F1: {f1_lr:.3f}, AUC: {auc_lr:.3f}")
print(f" RandomForest - Accuracy: {acc_rf:.3f}, Precision: {prec_rf:.3f}, Recall: {rec_rf:.3f}, F1: {f1_rf:.3f}, AUC: {auc_rf:.3f}")


Test Metrics:
 Quantum VQA - Accuracy: 0.592, Precision: 0.268, Recall: 0.367, F1: 0.310, AUC: 0.478
 Logistic Reg - Accuracy: 0.733, Precision: 0.000, Recall: 0.000, F1: 0.000, AUC: 0.515
 RandomForest - Accuracy: 0.725, Precision: 0.286, Recall: 0.067, F1: 0.108, AUC: 0.698


In [14]:
# Use Kernel SHAP on the quantum model (since it's not a standard neural network architecture).
# We'll use a subset of training data as background for the KernelExplainer to estimate expected values.
explainer = shap.KernelExplainer(
    lambda X: torch.sigmoid(model(torch.tensor(X, dtype=torch.float32))).detach().numpy(),
    X_train[:50]  # using 50 samples from training as background
)
# Compute SHAP values for the test set (this may be slow, so we use a subset or the whole test if small)
shap_values = explainer.shap_values(X_test[:50], nsamples=100)  # shap values for up to 50 test samples
shap_values = np.array(shap_values)  # shape (n_samples, n_features)
# Calculate mean absolute SHAP value for each feature
mean_abs_shap = np.abs(shap_values).mean(axis=0)
for i, val in enumerate(mean_abs_shap):
    print(f"PCA{i+1} mean(|SHAP|): {val:.4f}")


  0%|          | 0/50 [00:00<?, ?it/s]

PCA1 mean(|SHAP|): 0.0369
PCA2 mean(|SHAP|): 0.0304
PCA3 mean(|SHAP|): 0.0278
PCA4 mean(|SHAP|): 0.0490
PCA5 mean(|SHAP|): 0.0242


In [17]:
# Choose an index of a test sample to explain (e.g., 0)
sample_idx = 0
sample = X_test[sample_idx]
sample_true_label = y_test[sample_idx]
sample_logit = model(torch.tensor(sample, dtype=torch.float32)).item()
sample_prob = torch.sigmoid(torch.tensor(sample_logit)).item()

print(f"Test sample {sample_idx}: true label={sample_true_label}, model probability={sample_prob:.3f}")
# Get SHAP values for this sample (if not already computed above)
shap_values_single = explainer.shap_values(sample.reshape(1, -1))
shap_values_single = np.array(shap_values_single).flatten()
for i, val in enumerate(shap_values_single):
    print(f"PCA{i+1} contribution: {val:+.3f}")


Test sample 0: true label=0, model probability=0.527


  0%|          | 0/1 [00:00<?, ?it/s]

PCA1 contribution: +0.014
PCA2 contribution: +0.016
PCA3 contribution: +0.018
PCA4 contribution: +0.010
PCA5 contribution: +0.016


In [20]:
# Assume we have a molecular weight feature in the original data:
molwts = test_df["MolWt"].values  # molecular weight of each test compound
median_molwt = np.median(molwts)
heavy_mask = molwts > median_molwt   # boolean mask for heavy group
light_mask = molwts <= median_molwt  # mask for light group

# Predictions from quantum model (already computed as q_preds for test set)
# Calculate positive prediction rate in each group
pos_rate_heavy = q_preds[heavy_mask].mean()
pos_rate_light = q_preds[light_mask].mean()
disparate_impact = min(pos_rate_heavy, pos_rate_light) / max(pos_rate_heavy, pos_rate_light)

# Calculate error rates in each group
y_test = np.array(y_test)  # ensure numpy array
error_rate_heavy = 1 - accuracy_score(y_test[heavy_mask], q_preds[heavy_mask])
error_rate_light = 1 - accuracy_score(y_test[light_mask], q_preds[light_mask])

print(f"Positive prediction rate (heavy): {pos_rate_heavy:.3f}, (light): {pos_rate_light:.3f}")
print(f"Disparate Impact Ratio: {disparate_impact:.3f}")
print(f"Error rate (heavy): {error_rate_heavy:.3f}, (light): {error_rate_light:.3f}")


Positive prediction rate (heavy): 0.333, (light): 0.350
Disparate Impact Ratio: 0.952
Error rate (heavy): 0.417, (light): 0.400


In [21]:
# Get the predicted probabilities for the positive class from the quantum model
probs = q_probs  # (already computed via sigmoid on q_logits)
# Identify low-confidence predictions (between 0.4 and 0.6 probability)
flag_mask = (probs >= 0.4) & (probs <= 0.6)
num_flagged = flag_mask.sum()
flagged_idx = np.where(flag_mask)[0]

# Determine how many flagged were actually misclassified by the model
misclassified_flagged = ((q_preds != y_test) & flag_mask).sum()
# Compute baseline accuracy and accuracy if all flagged are corrected by an expert
base_accuracy = accuracy_score(y_test, q_preds)
corrected_preds = q_preds.copy()
# For flagged instances, set predictions to the true label (simulate perfect correction)
corrected_preds[flagged_idx] = y_test[flagged_idx]
corrected_accuracy = accuracy_score(y_test, corrected_preds)

print(f"Flagged for review: {num_flagged} out of {len(y_test)} test samples")
print(f"Model accuracy before = {base_accuracy:.3f}, after expert correction = {corrected_accuracy:.3f}")


Flagged for review: 53 out of 120 test samples
Model accuracy before = 0.592, after expert correction = 0.783
