# 04_quantum_vqc.ipynb — Variational Quantum Classifier (shallow, re-uploading)

# Cell 0 — perf env

In [None]:
# Pin the thread-count variables read by the numerical backends so that timings are
# comparable between runs. setdefault only fills in a value when the variable is not
# already exported, so an explicit setting in the shell still wins.
import os
for _thread_var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"):
    os.environ.setdefault(_thread_var, "8")
# Left as the final bare expression so the cell still displays the effective value.
os.environ.setdefault("NUMEXPR_NUM_THREADS", "8")

'8'

# Cell 1 — imports & data

In [None]:
# Load k-mer encodings; reduce dimension with PCA; scale features; prepare train/val/test splits
from pathlib import Path
import json, warnings, numpy as np, pandas as pd
import pennylane as qml
from pennylane import numpy as pnp
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_recall_fscore_support, roc_auc_score, confusion_matrix)

# NOTE(review): this silences every warning category for the rest of the session,
# not only the noisy ones — confirm that is intended.
warnings.filterwarnings("ignore")

# Project layout, relative to the notebook's working directory.
ROOT = Path(".")
PROCESSED = ROOT / "data/processed"
RESULTS = ROOT / "results"
(RESULTS / "metrics").mkdir(parents=True, exist_ok=True)

# Seed both the plain NumPy and the pennylane.numpy random modules.
np.random.seed(11)
pnp.random.seed(11)

# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can run
# arbitrary code — only load archives from a trusted source.
data = np.load(PROCESSED / "encodings.npz", allow_pickle=True)
with open(PROCESSED / "splits.json") as f:
    SPL = json.load(f)
y = data["y"]
X_kmer = data["kmer"].astype(np.float32)
tr_idx, va_idx, te_idx = (np.array(SPL[name]) for name in ("train", "val", "test"))

# PCA is fitted on the training rows only; validation and test rows are projected
# with the fitted components.
D = 6
pca = PCA(n_components=D, random_state=11)
X_tr = pca.fit_transform(X_kmer[tr_idx])
X_va, X_te = (pca.transform(X_kmer[rows]) for rows in (va_idx, te_idx))

# Standardize the components with statistics estimated on the training split.
scaler = StandardScaler(with_mean=True, with_std=True)
Xtr = scaler.fit_transform(X_tr).astype(np.float32)
Xva, Xte = (scaler.transform(part).astype(np.float32) for part in (X_va, X_te))
ytr, yva, yte = (y[rows].astype(int) for rows in (tr_idx, va_idx, te_idx))

# Cell output: shape of the training matrix and mean of the training labels.
Xtr.shape, ytr.mean()

((894, 6), np.float64(0.9228187919463087))

# Cell 2 — device + circuit

In [None]:
# Define shallow variational circuit (re-uploading style) and helper prediction routine
def make_device(n_wires, shots=None, use_mixed=False):
    """Create the PennyLane device the circuit is executed on.

    Args:
        n_wires: number of wires to allocate on the device.
        shots: forwarded unchanged to ``qml.device``.
        use_mixed: if True request the ``"default.mixed"`` backend, otherwise
            ``"lightning.qubit"``.

    Returns:
        The requested device, or a ``"default.qubit"`` device when the requested
        backend could not be created.
    """
    backend = "default.mixed" if use_mixed else "lightning.qubit"
    try:
        return qml.device(backend, wires=n_wires, shots=shots)
    except Exception as err:
        # The best-effort fallback is kept, but it is no longer silent: a different
        # backend changes timings, so the reader should see that it happened.
        # print() is used instead of warnings.warn because this notebook installs a
        # blanket warnings.filterwarnings("ignore").
        # NOTE(review): if use_mixed=True was requested, confirm that "default.qubit"
        # can run the noise channels used by `layer`.
        print(
            f"[make_device] could not create '{backend}' "
            f"({type(err).__name__}: {err}); falling back to 'default.qubit'."
        )
        return qml.device("default.qubit", wires=n_wires, shots=shots)

# Circuit size: one wire per PCA component, and L repetitions of the layer.
n_wires, L = D, 2
dev = make_device(n_wires, shots=None, use_mixed=False)

# Resolve the entangler template: use the attribute on the top-level package when it
# exists, otherwise import it from the templates module.
try:
    BasicEntanglerLayers = qml.BasicEntanglerLayers
except AttributeError:
    from pennylane.templates.layers import BasicEntanglerLayers

# Trainable parameters, shape (L, n_wires), drawn from a normal with std 0.15.
weights = pnp.random.normal(
    scale=0.15,
    size=(L, n_wires),
    requires_grad=True,
)

def layer(x, w, p_bitflip=0.0, p_depol=0.0):
    """Queue one circuit layer: embed the features, optional noise, then entangle.

    Args:
        x: feature vector passed to ``qml.AngleEmbedding`` (rotation "Y") over all
            ``n_wires`` wires.
        w: parameter row for this layer; it is given a leading axis before being
            handed to ``BasicEntanglerLayers``.
        p_bitflip: when > 0, a ``qml.BitFlip`` with this probability is applied to
            every wire after the embedding.
        p_depol: when > 0, a ``qml.DepolarizingChannel`` with this probability is
            applied to every wire after the bit-flip stage.
    """
    wires = range(n_wires)
    qml.AngleEmbedding(x, wires=wires, rotation="Y")

    # Noise stages are skipped entirely at probability 0 so no channel is queued.
    if p_bitflip > 0:
        for wire in wires:
            qml.BitFlip(p_bitflip, wires=wire)
    if p_depol > 0:
        for wire in wires:
            qml.DepolarizingChannel(p_depol, wires=wire)

    BasicEntanglerLayers(w[None, :], wires=wires)

@qml.qnode(dev, interface="autograd")
def vqc(x, w):
    """QNode: apply ``layer`` L times with the same input ``x`` and return <Z> on wire 0.

    Args:
        x: feature vector, re-embedded at every layer.
        w: parameters indexed by layer; row ``w[depth]`` feeds layer ``depth``.

    Returns:
        Expectation value of PauliZ on wire 0.
    """
    for depth in range(L):
        layer(x, w[depth])
    return qml.expval(qml.PauliZ(0))

def predict_proba(X, w, as_numpy=False):
    """Evaluate the circuit on every row of ``X`` and return clipped probabilities.

    Args:
        X: iterable of feature vectors; ``vqc`` is called once per row.
        w: circuit parameters passed to ``vqc``.
        as_numpy: if True the result is converted with ``np.asarray``; leave False
            when the result feeds a loss that must be differentiated.

    Returns:
        One value per row, computed as ``(1 + <Z>) / 2`` and clipped to
        ``[1e-6, 1 - 1e-6]``.
    """
    # (1 + m) / 2 maps an expectation value in [-1, 1] onto [0, 1]; m may be an
    # autograd box while a gradient is being traced.
    per_sample = [(1 + vqc(sample, w)) / 2 for sample in X]
    proba = pnp.clip(pnp.stack(per_sample), 1e-6, 1 - 1e-6)
    return np.asarray(proba) if as_numpy else proba

# Cell 3 — train (Adam + early stopping)

In [None]:
# Optimize VQC parameters with mini-batch stochastic Adam + early stopping on validation loss
# Optimizer and schedule.
opt = qml.AdamOptimizer(stepsize=0.05)
batch_size, max_epochs, patience = 64, 60, 6

# Early-stopping bookkeeping: best validation loss so far, the parameters that
# produced it, and the number of consecutive epochs without improvement.
best_va = float("inf")
best_w = pnp.array(weights, requires_grad=True)
no_improve = 0

# One dict per epoch; written to CSV after training.
history = []

def bce_loss(y_true, p_hat):
    """Mean binary cross-entropy between labels and predicted probabilities.

    Args:
        y_true: array of 0/1 targets.
        p_hat: predicted probability of class 1; no clipping is done here, so
            values must lie strictly between 0 and 1 for the logs to stay finite.

    Returns:
        Scalar ``-mean(y*log(p) + (1-y)*log(1-p))``.
    """
    positive_term = y_true * pnp.log(p_hat)
    negative_term = (1 - y_true) * pnp.log(1 - p_hat)
    return -pnp.mean(positive_term + negative_term)

def iterate_minibatches(X, y, bs, shuffle=True):
    """Yield ``(X_batch, y_batch)`` pairs covering the data once.

    Args:
        X: indexable feature array; rows are selected with an integer index array.
        y: label array; its length defines the number of samples.
        bs: batch size; the final batch is shorter when it does not divide evenly.
        shuffle: if True the sample order is permuted in place with the global
            ``np.random`` generator before batching.

    Yields:
        Tuples ``(X[rows], y[rows])`` for consecutive chunks of the sample order.
    """
    n_samples = len(y)
    order = np.arange(n_samples)
    if shuffle:
        np.random.shuffle(order)

    for start in range(0, n_samples, bs):
        rows = order[start:start + bs]
        yield X[rows], y[rows]

# Make sure the parameters are flagged trainable before optimizing. This cell ends by
# rebinding `weights` to `best_w`, which is stored with requires_grad=False; PennyLane
# optimizers only update arguments flagged requires_grad=True, so re-running the cell
# without this line would leave the parameters untouched (and the library's warning
# about it is hidden by the notebook-wide warnings filter). On a fresh kernel this is
# simply a copy of the initial weights; on a re-run training resumes from the
# previously selected weights.
weights = pnp.array(weights, requires_grad=True)

for epoch in range(1, max_epochs+1):
    # --- one pass over the training set in shuffled mini-batches ---
    for Xb, yb in iterate_minibatches(Xtr, ytr, batch_size):
        def cost(w):
            # Closure over the current batch; only `w` is differentiated.
            y_true = pnp.array(yb, dtype=float)
            p_hat  = predict_proba(Xb, w, as_numpy=False)
            return bce_loss(y_true, p_hat)

        w_new = opt.step(cost, weights)
        weights = pnp.clip(w_new, -1.5, 1.5)  # Simple parameter clipping to stabilize training

    # --- end-of-epoch evaluation on the full train and validation splits ---
    p_tr = predict_proba(Xtr, weights)
    p_va = predict_proba(Xva, weights)
    loss_tr = float(bce_loss(ytr, p_tr))
    loss_va = float(bce_loss(yva, p_va))
    history.append({"epoch":epoch, "loss_tr":loss_tr, "loss_va":loss_va})
    print(f"epoch {epoch:02d} | loss_tr={loss_tr:.4f} | loss_va={loss_va:.4f}")

    # --- early stopping: require an improvement of more than 1e-4 in validation loss ---
    if loss_va + 1e-4 < best_va:
        best_va = float(loss_va)
        # Snapshot is stored non-trainable; it is only used for inference and export.
        best_w = pnp.array(weights, requires_grad=False)
        no_improve = 0
    else:
        no_improve += 1
        if no_improve >= patience:
            print("early stopping."); break

# Continue with the best validation checkpoint and persist the curve and parameters.
weights = best_w
pd.DataFrame(history).to_csv(RESULTS/"metrics/vqc_train_curve.csv", index=False)
np.save(RESULTS/"vqc_weights.npy", np.array(weights, dtype=float))

epoch 01 | loss_tr=0.6535 | loss_va=0.6475
epoch 02 | loss_tr=0.6472 | loss_va=0.6375
epoch 03 | loss_tr=0.6284 | loss_va=0.6157
epoch 04 | loss_tr=0.5795 | loss_va=0.5643
epoch 05 | loss_tr=0.5471 | loss_va=0.5406
epoch 06 | loss_tr=0.5471 | loss_va=0.5418
epoch 07 | loss_tr=0.5461 | loss_va=0.5398
epoch 08 | loss_tr=0.5454 | loss_va=0.5399
epoch 09 | loss_tr=0.5452 | loss_va=0.5407
epoch 10 | loss_tr=0.5449 | loss_va=0.5400
epoch 11 | loss_tr=0.5447 | loss_va=0.5403
epoch 12 | loss_tr=0.5448 | loss_va=0.5407
epoch 13 | loss_tr=0.5448 | loss_va=0.5406
early stopping.


# Cell 4 — metrics (val-optimal threshold)

In [None]:
# Determine decision threshold maximizing validation F1; report metrics for all splits
from sklearn.metrics import f1_score
# Candidate decision thresholds: 33 evenly spaced values from 0.1 to 0.9.
thr_grid = np.linspace(0.1, 0.9, 33)

# Class-1 probabilities for every split, as plain NumPy arrays.
p_tr, p_va, p_te = (
    predict_proba(X_split, weights, as_numpy=True) for X_split in (Xtr, Xva, Xte)
)

# Validation F1 at each candidate; argmax returns the first maximum, so ties resolve
# to the lowest threshold.
# NOTE(review): in the recorded run about 92% of training labels are positive and the
# selected threshold (0.1) predicts every test sample positive (test AUC ~0.51), so
# positive-class F1 may not be an informative selection criterion here — consider
# confirming against a class-balanced metric.
val_f1 = [
    float(f1_score(yva, (p_va >= cand).astype(int), zero_division=0))
    for cand in thr_grid
]
best_idx = int(np.argmax(val_f1))
best_thr, best_f1 = float(thr_grid[best_idx]), val_f1[best_idx]

def pack(y_true, p, split, thr=None):
    """Compute thresholded classification metrics for one data split.

    Args:
        y_true: array of 0/1 ground-truth labels.
        p: NumPy array of predicted class-1 probabilities, same length as ``y_true``.
        split: label stored in the ``split`` field of the result (e.g. "train").
        thr: decision threshold; when None the notebook-level ``best_thr`` is used,
            which is the original behavior.

    Returns:
        ``(metrics, cm)`` where ``metrics`` is a dict with keys model, split, acc,
        prec, rec, f1, auc, thr and ``cm`` is the 2x2 confusion matrix with rows and
        columns ordered as labels [0, 1].
    """
    threshold = best_thr if thr is None else thr
    yhat = (p >= threshold).astype(int)
    acc = accuracy_score(y_true, yhat)
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, yhat, average="binary", zero_division=0
    )
    try:
        auc = roc_auc_score(y_true, p)
    except ValueError:
        # roc_auc_score rejects inputs it cannot score (e.g. only one class present in
        # y_true); report NaN for that split instead of aborting the whole export.
        auc = float("nan")
    # Fix the label set so the matrix stays 2x2 even when one class is absent from
    # both y_true and yhat.
    cm = confusion_matrix(y_true, yhat, labels=[0, 1])
    metrics = dict(model="VQC", split=split, acc=acc, prec=prec, rec=rec, f1=f1, auc=auc, thr=threshold)
    return metrics, cm

# Score every split at the validation-selected threshold; only the test split's
# confusion matrix is kept.
m_tr = pack(ytr, p_tr, "train")[0]
m_va = pack(yva, p_va, "val")[0]
m_te, cm_te = pack(yte, p_te, "test")

# One CSV row per split, in train / val / test order.
pd.DataFrame([m_tr, m_va, m_te]).to_csv(RESULTS / "metrics/vqc.csv", index=False)

# Cell output: test metrics and test confusion matrix.
m_te, cm_te

({'model': 'VQC',
  'split': 'test',
  'acc': 0.9228187919463087,
  'prec': 0.9228187919463087,
  'rec': 1.0,
  'f1': 0.9598603839441536,
  'auc': 0.5109881422924901,
  'thr': 0.1},
 array([[  0,  23],
        [  0, 275]]))