In [None]:
import os
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold
from sklearn.metrics import roc_auc_score

import pennylane as qml
from pennylane import numpy as pnp

import time


In [None]:
rskf = RepeatedStratifiedKFold(n_splits=2, n_repeats=30, random_state=42)  # 先10次验证流程，后面可加到30/50

auc_scores = []
skipped = 0

In [None]:
PROJECT_DIR = r"C:\Users\93539\qml_project"  # <-- 改成你的 qml_project 路径
os.chdir(PROJECT_DIR)
print("CWD:", os.getcwd())

Z = np.load(r"C:\Users\93539\qml_project\D32\Z_latent_D32.npy")   # (590000, 16)
y = np.load(r"C:\Users\93539\qml_project\D32\y_labels_D32.npy")       # (590000,)
print("Z shape:", Z.shape, "y shape:", y.shape, "pos rate:", y.mean())

In [None]:
# =========================
# 1) 抽样：N=100，保证至少 k 个正例
# =========================
def stratified_subsample_fixed_pos(Z, y, N=100, min_pos=5, seed=42):
    rng = np.random.default_rng(seed)
    pos_idx = np.where(y == 1)[0]
    neg_idx = np.where(y == 0)[0]

    # 目标正例数：至少 min_pos，同时不超过 N/2（避免太夸张）
    target_pos = max(min_pos, int
                     (round(N * y.mean())))
    target_pos = min(target_pos, N // 2)
    target_pos = min(target_pos, len(pos_idx))
    target_neg = N - target_pos

    sel_pos = rng.choice(pos_idx, size=target_pos, replace=False)
    sel_neg = rng.choice(neg_idx, size=target_neg, replace=False)
    sel = np.concatenate([sel_pos, sel_neg])
    rng.shuffle(sel)

    return Z[sel], y[sel]

X_small, y_small = stratified_subsample_fixed_pos(Z, y, N=100, min_pos=5, seed=42)
print("N:", len(y_small), "pos:", int(y_small.sum()), "pos_rate:", y_small.mean())

In [None]:
#Save these samples

np.savez(
    "Xy_small_N100_seed42_d16.npz",
    X=X_small,
    y=y_small
)

print("Saved:", X_small.shape, y_small.shape)


#To read, use the following code
data = np.load("Xy_small_N100_seed42_d16.npz")
X_small = data["X"]
y_small = data["y"]

print("Loaded:", X_small.shape, y_small.shape)
print("pos:", int(y_small.sum()), "pos_rate:", y_small.mean())

In [None]:
#To read, use the following code
data = np.load("Xy_small_N100D32_seed42.npz")
X_small = data["X"]
y_small = data["y"]

print("Loaded:", X_small.shape, y_small.shape)
print("pos:", int(y_small.sum()), "pos_rate:", y_small.mean())

In [None]:
# ====== 3) Draw a small sample N (stratified, ensuring the positive case proportion approximates the population)======
N = 200  
rng = np.random.default_rng(42)

pos_idx = np.where(y == 1)[0]
neg_idx = np.where(y == 0)[0]

# Sample positive and negative examples proportionally (ensuring at least 2 positive examples to prevent training failure)
target_pos = max(2, int(round(N * y.mean())))
target_pos = min(target_pos, len(pos_idx))
target_neg = N - target_pos

sel_pos = rng.choice(pos_idx, size=target_pos, replace=False)
sel_neg = rng.choice(neg_idx, size=target_neg, replace=False)
sel = np.concatenate([sel_pos, sel_neg])
rng.shuffle(sel)

X_small = Z[sel]
y_small = y[sel]
print(f"N={len(y_small)}, pos={int(y_small.sum())}, pos_rate={y_small.mean():.4f}")

In [None]:
# =========================
# 2) Preprocessing: Scale inputs to [-π, π] for angle encoding
# (using training set statistics; performed within each fold)
# =========================
def scale_to_pi(X_train, X_test, eps=1e-8):
    # 标准化 -> tanh 压缩 -> 映射到 [-pi, pi]
    mu = X_train.mean(axis=0, keepdims=True)
    std = X_train.std(axis=0, keepdims=True) + eps
    Xtr = (X_train - mu) / std
    Xte = (X_test - mu) / std

    Xtr = np.tanh(Xtr) * np.pi
    Xte = np.tanh(Xte) * np.pi
    return Xtr, Xte


In [None]:
# -------------------------
Initialize the shape of weights based on variants
# -------------------------
def init_weights(seed=42):
    rng = np.random.default_rng(seed)
    if "A2" in VARIANT:
        shape = (n_layers, n_qubits, 1)
    elif "A1" in VARIANT:
        shape = (n_layers, n_qubits, 3)
    else:
        shape = (n_layers, n_qubits, 2)
    w0 = 0.01 * rng.standard_normal(size=shape)
    return pnp.array(w0, requires_grad=True)

In [None]:
def predict_proba(X, weights, bias, w_read=None):
    probs = []
    for i in range(X.shape[0]):
        out = circuit(X[i], weights)

        if isinstance(out, (tuple, list)):
            z = pnp.array(out)  # shape (n_qubits,)

            if (w_read is not None):   # ✅ D2：可训练加权读出
                out_scalar = pnp.dot(w_read, z)
            else:                       # ✅ D1：固定平均
                out_scalar = pnp.mean(z)
        else:
            out_scalar = out            # baseline 等：标量

        logit = 2.0 * out_scalar + bias
        probs.append(sigmoid(logit))
    return pnp.array(probs)

def weighted_bce_loss(y_true, y_prob, pos_weight=10.0, eps=1e-8):
    # y_true ∈ {0,1}
    y_true = pnp.array(y_true)
    y_prob = pnp.clip(y_prob, eps, 1 - eps)
    loss = -(pos_weight * y_true * pnp.log(y_prob) + (1 - y_true) * pnp.log(1 - y_prob))
    return pnp.mean(loss)

def train_vqc(X_train, y_train, steps=50, lr=0.05, seed=42):
    if "D2" in VARIANT:
        w_read = pnp.array(np.zeros(n_qubits), requires_grad=True)  # 初始全0
    else:
        w_read = None
        
    rng = np.random.default_rng(seed)
    weights = init_weights(seed)
    bias = pnp.array(0.0, requires_grad=True)

    # 正例权重（建议用 neg/pos）
    pos = int(np.sum(y_train))
    neg = len(y_train) - pos
    pos_weight = (neg / max(pos, 1))  # 经典做法
    pos_weight = float(pos_weight)

    opt = qml.AdamOptimizer(stepsize=lr)

    def objective(w, b, wr):
        probs = predict_proba(X_train, w, b, wr)
        return weighted_bce_loss(y_train, probs, pos_weight=pos_weight)

    for _ in range(steps):
        if "D2" in VARIANT:
            weights, bias, w_read = opt.step(objective, weights, bias, w_read)
        else:
            weights, bias = opt.step(lambda w,b: objective(w,b,None), weights, bias)



    return weights, bias, w_read

In [None]:
# =========================
# 3) Modify the VQC circuit structure: 4 qubits + data re-uploading (4 dimensions injected each time, 4 times total = 16 dimensions)
# =========================
n_qubits = 4
n_layers = 1  
dev = qml.device("lightning.qubit", wires=n_qubits)

# -------------------------
# 纠缠结构：ring / chain
# -------------------------
def entangle_ring():
    for i in range(n_qubits):
        qml.CNOT(wires=[i, (i + 1) % n_qubits])

def entangle_chain():
    qml.CNOT(wires=[0, 1])
    qml.CNOT(wires=[1, 2])
    qml.CNOT(wires=[2, 3])

def entangle():
    if "B1" in VARIANT:
        entangle_chain()
    else:
        entangle_ring()

# -------------------------
# 数据注入（embedding）
# -------------------------
def embed_block(x4):
    # baseline: 只 RY
    if "C1" in VARIANT:
        # C1：更强的 embedding：RY + RZ（相位也注入）
        for q in range(n_qubits):
            qml.RY(x4[q], wires=q)
            qml.RZ(x4[q], wires=q)
    else:
        for q in range(n_qubits):
            qml.RY(x4[q], wires=q)

# -------------------------
# 可训练块（trainable block）
# weights shape 根据变体不同
# -------------------------
def trainable_block(weights, l):
    if "A2" in VARIANT:
        # A2：更强正则：只用 RY（参数更少）
        # weights: (n_layers, n_qubits, 1)
        for q in range(n_qubits):
            qml.RY(weights[l, q, 0], wires=q)

    elif  "A1" in VARIANT:
        # A1：更强表达：Rot（三参数）
        # weights: (n_layers, n_qubits, 3)
        for q in range(n_qubits):
            qml.Rot(weights[l, q, 0], weights[l, q, 1], weights[l, q, 2], wires=q)

    else:
        # baseline / B1 / C1 / D1：RY + RZ（两参数）
        # weights: (n_layers, n_qubits, 2)
        for q in range(n_qubits):
            qml.RY(weights[l, q, 0], wires=q)
            qml.RZ(weights[l, q, 1], wires=q)

# -------------------------
# 读出（readout）
# -------------------------
def readout():
    # D1 或 D2 都返回 tuple，外面再处理
    if ("D1" in VARIANT) or ("D2" in VARIANT):
        return tuple(qml.expval(qml.PauliZ(i)) for i in range(n_qubits))
    else:
        return qml.expval(qml.PauliZ(0))

# -------------------------
# 电路本体：16维 re-uploading（4次×4维）
# -------------------------
@qml.qnode(dev, interface="autograd")
def circuit(x, weights):
    num_chunks = x.shape[0] // n_qubits  # 16->4, 32->8
    for l in range(n_layers):
        for chunk in range(num_chunks):
            xq = x[chunk * n_qubits:(chunk + 1) * n_qubits]
            embed_block(xq)

            # ✅ B2-weak：每层只纠缠一次
            if ("B2" in VARIANT) and (chunk != 0):
                pass
            else:
                entangle()

            trainable_block(weights, l)
    return readout()
def sigmoid(z):
    return 1.0 / (1.0 + pnp.exp(-z))


In [None]:
#A2 (Stronger Regular Expressions / Weaker Injection): RY Injection Only
def feature_map_A2(x16):
    for chunk in range(4):
        x4 = x16[chunk*4:(chunk+1)*4]
        for q in range(n_qubits):
            qml.RY(x4[q], wires=q)
        entangle_ring()

In [None]:
#A1 (Stronger Expression): Make injection more “Rot-like”
def feature_map_A1(x16):
    for chunk in range(4):
        x4 = x16[chunk*4:(chunk+1)*4]
        for q in range(n_qubits):
            a = x4[q]
            qml.Rot(a, a, a*a, wires=q)
        entangle_ring()

In [None]:
#B1 (Chain entanglement): Change ring to chain
def entangle_chainB1():
    qml.CNOT(wires=[0,1])
    qml.CNOT(wires=[1,2])
    qml.CNOT(wires=[2,3])

def feature_map_B1(x16):
    for chunk in range(4):
        x4 = x16[chunk*4:(chunk+1)*4]
        for q in range(n_qubits):
            qml.RY(x4[q], wires=q)
            qml.RZ(x4[q], wires=q)
        entangle_chainB1()

def feature_map_A1_B1(x16):
    for chunk in range(4):
        x4 = x16[chunk*4:(chunk+1)*4]
        for q in range(n_qubits):
            a = x4[q]
            qml.Rot(a, a, a*a, wires=q)
        entangle_chainB1()

In [None]:
#C1 (Enhanced Data Injection): Adding “Quadratic Term Injection” to the baseline
def feature_map_C1(x16):
    for chunk in range(4):
        x4 = x16[chunk*4:(chunk+1)*4]
        for q in range(n_qubits):
            a = x4[q]
            qml.RY(a, wires=q)
            qml.RZ(a, wires=q)
            qml.RY(a*a, wires=q)   
        entangle_ring()

In [None]:
#D1 (mean(Zi) readout): QSVM lacks a dedicated “readout head,” employing “multi-bit overlap” for approximation.
def quantum_kernel_D1(x1, x2):
    probs = kernel_circuit(x1, x2)  # length 2^n_qubits
    # parity-even states: indices with even number of 1s
    total = 0.0
    for idx, p in enumerate(probs):
        if bin(idx).count("1") % 2 == 0:
            total += p
    return float(total)


In [None]:
# ========= 你要跑的结构 ========="baseline" "A2" "A1" "B1" "C1" "D1" "B1+C1", "B1+D1", "B1+C1+D1", "B1+D1+B2", "B1+D2", "B1+D2+B2"
variants = [ "baseline"]


# ========= 你要跑的 CV seed =========
cv_seeds = [42]

# ========= 结果容器 =========
all_rows = []

t_start = time.time()

for VARIANT in variants:
    print("\n==============================")
    print("Running VARIANT:", VARIANT)
    print("==============================")

    for cv_seed in cv_seeds:
        print(f"\n--- CV seed = {cv_seed} ---")

        rskf = RepeatedStratifiedKFold(
            n_splits=2,
            n_repeats=30,
            random_state=cv_seed
        )

        auc_scores = []
        skipped = 0

        for fold_id, (train_idx, test_idx) in enumerate(rskf.split(X_small, y_small), 1):
            X_train_raw, X_test_raw = X_small[train_idx], X_small[test_idx]
            y_train, y_test = y_small[train_idx], y_small[test_idx]

            # 防止单类fold（你这边一般不会发生，但保留更稳）
            if len(np.unique(y_train)) < 2 or len(np.unique(y_test)) < 2:
                skipped += 1
                continue

            # fold 内缩放到 [-pi, pi]
            X_train, X_test = scale_to_pi(X_train_raw, X_test_raw)

            # 训练：注意 train_vqc 必须返回 (weights, bias, w_read)
            # seed 用 fold_id 做变化，保证每个fold初始化不同但可复现
            weights, bias, w_read = train_vqc(
                X_train, y_train,
                steps=30,
                lr=0.05,
                seed=1000 * cv_seed + fold_id  # ✅ 推荐这样写，确保不同cv_seed下初始化也不同
            )

            # 预测并算 AUC
            y_prob = np.array(predict_proba(X_test, weights, bias, w_read), dtype=float)
            auc_scores.append(roc_auc_score(y_test, y_prob))

        mean_auc = float(np.mean(auc_scores)) if len(auc_scores) > 0 else np.nan
        std_auc  = float(np.std(auc_scores))  if len(auc_scores) > 0 else np.nan

        print(f"[{VARIANT}] seed={cv_seed}  AUC={mean_auc:.4f} ± {std_auc:.4f}  folds={len(auc_scores)}  skipped={skipped}")

        all_rows.append({
            "VARIANT": VARIANT,
            "cv_seed": cv_seed,
            "n_splits": 2,
            "n_repeats": 30,
            "folds_used": len(auc_scores),
            "skipped": skipped,
            "steps": 30,
            "lr": 0.05,
            "AUC_mean": mean_auc,
            "AUC_std": std_auc
        })

        # ========= 输出汇总表 =========
df_results = pd.DataFrame(all_rows)

print("\n======== SUMMARY (by VARIANT) ========")
print(df_results.groupby("VARIANT")[["AUC_mean", "AUC_std"]].mean())

# ========= 保存到文件（强烈建议）=========
#df_results.to_csv("vqc_variants_cvseeds_results.csv", index=False)
#print("\nSaved -> vqc_variants_cvseeds_results.csv")

print(f"\nTotal elapsed: {(time.time() - t_start)/60:.1f} minutes")
print(f"\nlayers:{n_layers}")
print("pos:", int(y_small.sum()), "pos_rate:", y_small.mean())