Libraries

In [10]:
# External Libraries
import numpy as np
import pandas as pd
from math import ceil, pi, sin
from itertools import combinations
import warnings
warnings.filterwarnings("ignore")
import os
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import joblib
import json
from pathlib import Path

from qiskit import QuantumCircuit, transpile
from qiskit.quantum_info import SparsePauliOp, Pauli, Statevector
from qiskit.circuit.library import PauliEvolutionGate
from qiskit.synthesis import SuzukiTrotter
from qiskit_aer import AerSimulator
from qiskit_ibm_runtime import EstimatorV2
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score, f1_score, balanced_accuracy_score, precision_score, recall_score


Global parameters

In [28]:
# --- Annealing-schedule / circuit parameters --------------------------------
# Circuits in this notebook are sized from each sample's feature count
# (len(x_std)); N_QUBITS is passed separately to the Estimator-based extractor,
# so it is expected to equal the number of feature columns.
N_QUBITS = 13            # Number of qubits
TAU = 30e-9              # Total evolution time: 30 ns (seconds)
A0 = 2 * pi * 15e9       # Driver-term amplitude scale, rad/s
B0 = 2 * pi * 11e9       # Problem-term amplitude scale, rad/s
# Alternative (much cheaper) settings kept for quick runs:
#REPS = 4
#M = 4
REPS = 64                # `reps` handed to SuzukiTrotter for every time slice
# NOTE(review): 2000 looks like a target for the total step count M * REPS — confirm.
M = ceil(2000 / REPS)    # Number of time slices -> 32 slices when REPS=64
DELTA_T = TAU / M        # Duration of one time slice (seconds)
ORDER = 2                # Suzuki-Trotter order
RANDOM_STATE = 42        # Seed shared by the CV splitter and the classifier
N_SPLITS = 5             # Number of stratified CV folds

# added by Jiri
OPTIMIZATION_LEVEL = 0   # Transpiler optimization level used by the feature extractors
SHOTS = 1024             # `shots` value forwarded to backend.run by the Aer-based extractors
BATCH_SIZE = 8           # Default number of circuits per Aer batch


Loading dataset

In [12]:
dataset_save = "Dataset/Toxicity-13F.csv"

def load_dataset(filename):
    """Read the CSV at ``filename`` and return its contents as a pandas DataFrame.

    ``filename`` is handed to ``pd.read_csv`` unchanged, so anything that
    function accepts (path string, file-like object, ...) works here too.
    """
    return pd.read_csv(filename)

# Load the raw table and print the class balance while labels are still text.
df = pd.read_csv(dataset_save)
print(df['Class'].value_counts())
# Last expression of the cell: renders the first rows as the cell output.
df.head()

Class
NonToxic    115
Toxic        56
Name: count, dtype: int64


Unnamed: 0,MDEC-23,MATS2v,ATSC8s,VE3_Dt,CrippenMR,SpMax7_Bhe,SpMin1_Bhs,C1SP2,GATS8e,GATS8s,SpMax5_Bhv,VE3_Dzi,VPC-4,Class
0,60.1757,-0.0231,-0.6667,-167.1241,0.0,3.4009,2.3109,4,1.0229,1.0575,3.5545,-15.594,4.1692,NonToxic
1,44.5031,-0.1236,-16.5096,-16.208,172.2,3.3611,2.1117,2,1.7155,1.7013,3.6066,-14.3317,2.0821,NonToxic
2,37.5488,0.0662,19.3467,-159.1796,173.4028,3.2705,2.0198,8,0.6992,0.7828,3.6441,-25.4493,2.873,NonToxic
3,40.5929,0.0714,-9.5672,-21.4416,177.2726,3.2748,2.0191,6,0.9951,1.0298,3.6564,-19.6376,3.0444,NonToxic
4,52.7343,-0.0861,-11.8892,-2.078,171.1315,3.4094,2.1664,2,0.7363,0.7427,3.5216,-8.2157,2.9469,NonToxic


In [13]:
# Map 'NonToxic' → 1, 'Toxic' → 0
# The dtype guard keeps this cell idempotent: once the column is numeric,
# applying the string rule again would map every label (1 and 0 alike) to 0,
# because neither "1" nor "0" starts with "non".
if not pd.api.types.is_numeric_dtype(df['Class']):
    df['Class'] = df['Class'].apply(lambda v: 1 if str(v).strip().lower().startswith('non') else 0)
print(df['Class'].value_counts())
df.head()

Class
1    115
0     56
Name: count, dtype: int64


Unnamed: 0,MDEC-23,MATS2v,ATSC8s,VE3_Dt,CrippenMR,SpMax7_Bhe,SpMin1_Bhs,C1SP2,GATS8e,GATS8s,SpMax5_Bhv,VE3_Dzi,VPC-4,Class
0,60.1757,-0.0231,-0.6667,-167.1241,0.0,3.4009,2.3109,4,1.0229,1.0575,3.5545,-15.594,4.1692,1
1,44.5031,-0.1236,-16.5096,-16.208,172.2,3.3611,2.1117,2,1.7155,1.7013,3.6066,-14.3317,2.0821,1
2,37.5488,0.0662,19.3467,-159.1796,173.4028,3.2705,2.0198,8,0.6992,0.7828,3.6441,-25.4493,2.873,1
3,40.5929,0.0714,-9.5672,-21.4416,177.2726,3.2748,2.0191,6,0.9951,1.0298,3.6564,-19.6376,3.0444,1
4,52.7343,-0.0861,-11.8892,-2.078,171.1315,3.4094,2.1664,2,0.7363,0.7427,3.5216,-8.2157,2.9469,1


QFMs

In [26]:
# -------------------------
# Helper building functions for Hamiltonians
# -------------------------

def print_circuit_specs(circuit):
    """Print a short, human-readable summary of ``circuit``.

    The report lists the circuit name, its depth, the number of instructions
    (``len(circuit)``), the number of non-local gates and a per-operation
    count with upper-cased operation names. Nothing is returned.
    """
    label = circuit.name
    depth = circuit.depth()
    total_gates = len(circuit)
    nonlocal_gates = circuit.num_nonlocal_gates()
    per_gate = [f"{gate.upper()}: {count}" for gate, count in circuit.count_ops().items()]
    breakdown = ", ".join(per_gate)
    print(
        f"""
    Quantum circuit {label} specifications
    -----------------------------

                    Depth: {depth}
                Gate count: {total_gates}
        Nonlocal gate count: {nonlocal_gates}
            Gate breakdown: {breakdown}
    """
    )

def s_of_t(t, tau=TAU):
    """Schedule function s(t) = sin^2( (pi/2) * sin^2(pi t / (2 tau)) ).

    Evaluates to 0 at t = 0 and to 1 at t = tau.

    Args:
        t: Time at which to evaluate the schedule (same unit as ``tau``).
        tau: Total schedule duration; defaults to the module-level TAU.
    """
    inner_phase = pi * t / (2 * tau)
    inner_sq = sin(inner_phase) ** 2
    outer_phase = (pi / 2) * inner_sq
    return sin(outer_phase) ** 2

def A_of_t(t):
    """Driver-term coefficient A(t) = A0 * (1 - s(t)), using the default schedule duration."""
    schedule = s_of_t(t)
    return A0 * (1.0 - schedule)

def B_of_t(t):
    """Problem-term coefficient B(t) = B0 * s(t), using the default schedule duration."""
    schedule = s_of_t(t)
    return B0 * schedule

def build_HD(n):
    """Build the driver Hamiltonian H_D = - sum_i X_i as a SparsePauliOp.

    Args:
        n: Number of qubits (length of every Pauli label).

    Returns:
        SparsePauliOp with ``n`` terms; term ``i`` has 'X' at string position
        ``i`` of its label, 'I' elsewhere, and coefficient -1.0.
    """
    x_labels = ['I' * pos + 'X' + 'I' * (n - 1 - pos) for pos in range(n)]
    x_terms = [Pauli(lbl) for lbl in x_labels]
    minus_ones = [-1.0] * n
    return SparsePauliOp(x_terms, minus_ones)

def build_HP_from_sample(x_std, Jij):
    """Build the problem Hamiltonian HP(x) = sum_i h_i Z_i + sum_{i<j} J_ij Z_i Z_j.

    Args:
        x_std: Sequence of local fields; h_i = x_std[i].
        Jij: 2-D array indexed as ``Jij[i, j]``; only entries with i < j are read.

    Returns:
        SparsePauliOp whose first ``len(x_std)`` terms are the single-Z fields
        (label position i carries 'Z') followed by the ZZ couplings in
        ``itertools.combinations`` order.
    """
    n = len(x_std)

    def _z_label(*positions):
        # n-character label with 'Z' at the given string positions, 'I' elsewhere.
        chars = ['I'] * n
        for pos in positions:
            chars[pos] = 'Z'
        return ''.join(chars)

    pairs = list(combinations(range(n), 2))

    # Local fields first, then pairwise ZZ couplings.
    labels = [_z_label(i) for i in range(n)]
    labels += [_z_label(i, j) for i, j in pairs]

    coeffs = [float(x_std[i]) for i in range(n)]
    coeffs += [float(Jij[i, j]) for i, j in pairs]

    return SparsePauliOp([Pauli(lbl) for lbl in labels], coeffs)

# -------------------------
# Quantum feature extraction
# -------------------------
def make_evolution_circuit(n_qubits, HP_op_time_dep_fn):
    """Build the piecewise-constant evolution circuit.

    The circuit starts with a Hadamard on every qubit and then appends M
    ``PauliEvolutionGate`` slices. For slice k the Hamiltonian is frozen at
    the midpoint t = (k + 0.5) * DELTA_T:

        H_slice = A(t) * H_D + B(t) * HP_op_time_dep_fn(k)

    Each slice evolves for DELTA_T and is synthesised with
    ``SuzukiTrotter(order=ORDER, reps=REPS)``.

    Args:
        n_qubits: Width of the circuit.
        HP_op_time_dep_fn: Callable mapping the slice index k to the problem
            Hamiltonian operator used for that slice.

    Returns:
        The assembled QuantumCircuit (no measurements, no save instructions).
    """
    circuit = QuantumCircuit(n_qubits)

    # Hadamard on every wire.
    for wire in range(n_qubits):
        circuit.h(wire)

    # The driver term does not depend on the slice, so build it once.
    driver = build_HD(n_qubits)

    for slice_index in range(M):
        midpoint = (slice_index + 0.5) * DELTA_T
        driver_weight = A_of_t(midpoint)
        problem_weight = B_of_t(midpoint)
        problem = HP_op_time_dep_fn(slice_index)

        frozen_hamiltonian = (driver_weight * driver) + (problem_weight * problem)
        slice_gate = PauliEvolutionGate(
            frozen_hamiltonian,
            time=DELTA_T,
            synthesis=SuzukiTrotter(order=ORDER, reps=REPS),
        )
        circuit.append(slice_gate, circuit.qubits)

    return circuit

def statevector_expectation_z(statevec, qubit_index, n_qubits):
    """
    Compute <Z_i> = sum_{basis states} |amp|^2 * (-1)^{bit_i}.

    Assumes standard integer index ordering where basis index 0 -> |00...0>.
    ``qubit_index`` is the left-to-right position used in the Pauli label
    strings built in this file; position 0 is mapped to the most-significant
    bit of the basis index:
        bit = (index >> (n_qubits - 1 - qubit_index)) & 1

    Args:
        statevec: Object exposing the amplitudes as ``statevec.data``.
        qubit_index: Label position in ``range(n_qubits)``.
        n_qubits: Number of qubits described by the state.

    Returns:
        The expectation value as a Python float.

    Raises:
        ValueError: If ``qubit_index`` is outside ``range(n_qubits)``.
    """
    if not 0 <= qubit_index < n_qubits:
        raise ValueError(
            f"qubit_index must be in range({n_qubits}), got {qubit_index}"
        )

    probs = np.abs(np.asarray(statevec.data)) ** 2
    shift = n_qubits - 1 - qubit_index
    # Bit value of the selected qubit for every basis index, computed in one shot.
    bits = (np.arange(probs.size) >> shift) & 1
    # bit 0 contributes +p, bit 1 contributes -p.
    return float(np.dot(probs, 1.0 - 2.0 * bits))

def build_circuit_for_sample(x_std, Jij):
    """Return the evolution circuit for one standardized sample.

    The problem Hamiltonian is built once from ``x_std`` and ``Jij`` and the
    same operator is reused for every time slice; the circuit width is
    ``len(x_std)``.
    """
    width = len(x_std)
    problem_op = build_HP_from_sample(x_std, Jij)
    return make_evolution_circuit(width, lambda _slice_index: problem_op)

def _has_saved_statevector(qc):
    for instr, _, _ in qc.data:
        # instruction name can be 'save_statevector' or similar depending on version
        name = getattr(instr, "name", "")
        if name == "save_statevector" or "save_statevector" in name:
            return True
    return False

def compute_quantum_features_aer_batch(X_std, Jij, batch_size=BATCH_SIZE, shots=SHOTS, optimization_level=OPTIMIZATION_LEVEL):
    """Compute <Z> features for every sample by simulating circuits in batches.

    One evolution circuit is built per row of ``X_std``; circuits are
    transpiled and run on ``AerSimulator(method="statevector")`` in groups of
    ``batch_size``. The saved statevector of each experiment is converted to
    probabilities and, for every label position q, reduced to
    <Z_q> = P(bit_q = 0) - P(bit_q = 1), where position 0 corresponds to the
    most-significant bit of the basis index.

    Args:
        X_std: 2-D array, one standardized sample per row; the column count
            is used as the number of qubits.
        Jij: Coupling matrix forwarded to ``build_circuit_for_sample``.
        batch_size: Number of circuits submitted per backend run.
        shots: Forwarded to ``backend.run``.
        optimization_level: Forwarded to ``transpile``.

    Returns:
        Array of shape (n_samples, n_qubits) with the expectation values.

    Raises:
        RuntimeError: If the per-experiment data cannot be recovered from the
            job result, or no statevector can be extracted for an experiment.
    """
    n_samples = X_std.shape[0]
    n_qubits = X_std.shape[1]
    features = np.zeros((n_samples, n_qubits), dtype=float)

    # Build circuits (do NOT call save_statevector here)
    circuits = [build_circuit_for_sample(X_std[i], Jij) for i in range(n_samples)]

    backend = AerSimulator(method="statevector")

    n_batches = (n_samples + batch_size - 1) // batch_size
    with tqdm(total=n_samples, desc="Aer batch features", unit="sample", dynamic_ncols=True) as pbar:
        for b in range(n_batches):
            start = b * batch_size
            end = min(start + batch_size, n_samples)
            batch_circuits = []

            # For this batch, ensure each circuit has exactly one save_statevector instruction
            for i in range(start, end):
                qc = circuits[i]
                if not _has_saved_statevector(qc):
                    qc.save_statevector()
                batch_circuits.append(qc)

            # Transpile and run this batch
            transpiled = transpile(batch_circuits, backend=backend, optimization_level=optimization_level)
            job = backend.run(transpiled, shots=shots)
            result = job.result()

            # Collect one data payload per experiment. Each payload is bound
            # eagerly, so experiment j can never pick up another experiment's data.
            raw_results = getattr(result, "results", None)
            if raw_results is None:
                try:
                    payloads = [result.data(j) for j in range(len(transpiled))]
                except Exception as err:
                    raise RuntimeError("Could not recover result list from Aer job result.") from err
            else:
                payloads = [getattr(res_exp, "data", None) for res_exp in raw_results]

            # Extract the saved statevector of every experiment in this batch
            for local_j, payload in enumerate(payloads):
                global_i = start + local_j

                amps = None
                # 1) payload (or payload.to_dict()) is a dict with 'statevector'
                try:
                    d = payload.to_dict() if hasattr(payload, "to_dict") else payload
                    if isinstance(d, dict) and "statevector" in d:
                        amps = np.asarray(d["statevector"])
                except Exception:
                    amps = None

                # 2) result.get_statevector(local_j)
                if amps is None:
                    try:
                        sv = result.get_statevector(local_j)
                        amps = np.asarray(sv.data) if hasattr(sv, "data") else np.asarray(sv)
                    except Exception:
                        amps = None

                if amps is None:
                    raise RuntimeError(f"Could not extract statevector for batch {b} sample {local_j} (global {global_i})")

                probs = np.abs(amps) ** 2

                # View the probabilities as a (2, 2, ..., 2) tensor: axis q then
                # indexes bit (n_qubits - 1 - q) of the basis index, so summing
                # over all other axes gives the marginal of label position q.
                prob_tensor = probs.reshape((2,) * n_qubits)
                for q in range(n_qubits):
                    other_axes = tuple(ax for ax in range(n_qubits) if ax != q)
                    marginal = prob_tensor.sum(axis=other_axes)
                    features[global_i, q] = marginal[0] - marginal[1]

                pbar.update(1)

    return features

def compute_quantum_features_aer(X_std, Jij, shots=SHOTS, optimization_level=OPTIMIZATION_LEVEL):
    """Compute <Z> features one sample at a time on the Aer statevector simulator.

    For each row of ``X_std`` the evolution circuit is built, a
    save-statevector instruction is attached, and the circuit is transpiled
    and run. The resulting probabilities are reduced, per label position q, to
    <Z_q> = P(bit_q = 0) - P(bit_q = 1), where position 0 corresponds to the
    most-significant bit of the basis index. The specs of the first transpiled
    circuit are printed.

    Args:
        X_std: 2-D array, one standardized sample per row; the column count
            is used as the number of qubits.
        Jij: Coupling matrix forwarded to ``build_circuit_for_sample``.
        shots: Forwarded to ``backend.run``.
        optimization_level: Forwarded to ``transpile``.

    Returns:
        Array of shape (n_samples, n_qubits) with the expectation values.

    Raises:
        RuntimeError: If the result holds no statevector for a sample.
    """
    n_samples = X_std.shape[0]
    n_qubits = X_std.shape[1]
    features = np.zeros((n_samples, n_qubits), dtype=float)

    backend = AerSimulator(method="statevector")

    with tqdm(total=n_samples, desc="Aer features", unit="sample", dynamic_ncols=True) as pbar:
        for i in range(n_samples):
            qc = build_circuit_for_sample(X_std[i], Jij)
            if not _has_saved_statevector(qc):
                qc.save_statevector()

            transpiled = transpile(qc, backend=backend, optimization_level=optimization_level)
            if i == 0:
                print_circuit_specs(transpiled)
            job = backend.run(transpiled, shots=shots)
            result = job.result()

            # Extract statevector
            try:
                sv = result.get_statevector()
                amps = np.asarray(sv.data) if hasattr(sv, "data") else np.asarray(sv)
            except Exception:
                saved = result.data(0).get("statevector")
                if saved is None:
                    # Fail loudly instead of feeding None into the numeric code below.
                    raise RuntimeError(f"Could not extract statevector for sample {i}")
                amps = np.asarray(saved)

            # Compute <Z> expectation per qubit from marginal probabilities.
            # Reshaped to (2, ..., 2), axis q indexes bit (n_qubits - 1 - q)
            # of the basis index.
            probs = np.abs(amps) ** 2
            prob_tensor = probs.reshape((2,) * n_qubits)
            for q in range(n_qubits):
                other_axes = tuple(ax for ax in range(n_qubits) if ax != q)
                marginal = prob_tensor.sum(axis=other_axes)
                features[i, q] = marginal[0] - marginal[1]

            pbar.update(1)

    return features

def run_quantum_feature_extraction(X_std, Jij, n_qubits=N_QUBITS, shots=SHOTS, optimization_level=OPTIMIZATION_LEVEL):
    """Compute <Z> features per sample with an Estimator on the Aer simulator.

    For every row of ``X_std`` the evolution circuit is built, run through a
    preset pass manager and submitted to ``EstimatorV2`` together with one
    single-qubit Z observable per label position. The returned expectation
    values fill the corresponding feature row.

    Args:
        X_std: 2-D array, one standardized sample per row.
        Jij: Coupling matrix forwarded to ``build_circuit_for_sample``.
        n_qubits: Number of Z observables / feature columns. Circuits are
            sized from the sample length, so this should match
            ``X_std.shape[1]``.
        shots: Accepted for signature compatibility with the other
            extractors; not used by this function.
        optimization_level: Forwarded to ``generate_preset_pass_manager``.

    Returns:
        Array of shape (n_samples, n_qubits) with the expectation values.
    """
    n_samples = X_std.shape[0]
    features = np.zeros((n_samples, n_qubits), dtype=float)

    simulator = AerSimulator(method="statevector")
    estimator = EstimatorV2(mode=simulator)
    pass_manager = generate_preset_pass_manager(backend=simulator, optimization_level=optimization_level)

    # Prebuild single-qubit Z observables (SparsePauliOp) for expectation readout
    z_observables = []
    for q in range(n_qubits):
        s = ['I'] * n_qubits
        s[q] = 'Z'
        z_observables.append(SparsePauliOp([Pauli(''.join(s))], [1.0]))

    with tqdm(total=n_samples, desc="Aer features", unit="sample", dynamic_ncols=True) as pbar:
        for idx in range(n_samples):
            # Index with the sample counter. Reusing the observable-loop
            # variable here would build the same circuit for every sample.
            qc = build_circuit_for_sample(X_std[idx], Jij)
            qc_transpiled = pass_manager.run(qc)

            # Estimate <Z_i> for each qubit i
            job = estimator.run([(qc_transpiled, z_observables)])
            pub_result = job.result()[0]
            features[idx, :] = pub_result.data.evs

            pbar.update(1)

    return features

# ------------------------- 
# Main experiment: CV loop 
# ------------------------- 
def run_experiment(X, y):
    """Run the stratified cross-validation experiment and print median metrics.

    Per fold: median imputation and standardization are fit on the training
    split only; the coupling matrix Jij is the training-set Pearson
    correlation matrix with a zeroed diagonal; quantum features are computed
    for train and test samples and appended to the standardized classical
    features; a GradientBoostingClassifier is trained and evaluated.

    Args:
        X: Feature matrix (array-like, one sample per row).
        y: Binary labels (array-like of 0/1).

    Returns:
        Dict mapping each metric name ('auc', 'f1', 'bal_acc', 'prec_0',
        'rec_0', 'prec_1', 'rec_1') to the list of per-fold values.
    """
    # Positional indexing with fold indices below requires plain arrays.
    X = np.asarray(X)
    y = np.asarray(y)

    skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_STATE)

    metrics = { 'auc': [], 'f1': [], 'bal_acc': [], 'prec_0': [], 'rec_0': [], 'prec_1': [], 'rec_1': [] }

    for fold_idx, (train_idx, test_idx) in enumerate(skf.split(X, y), start=1):
        Xtrain, Xtest = X[train_idx], X[test_idx]
        ytrain, ytest = y[train_idx], y[test_idx]

        # preprocessing (fit on the training split only)
        imputer = SimpleImputer(strategy='median')
        imputer.fit(Xtrain)
        Xtrain_im = imputer.transform(Xtrain)
        Xtest_im = imputer.transform(Xtest)

        scaler = StandardScaler()
        scaler.fit(Xtrain_im)
        Xtrain_std = scaler.transform(Xtrain_im)
        Xtest_std = scaler.transform(Xtest_im)

        # Jij computed from training set Pearson correlations, diag set to 0.
        # A feature that is constant within the fold yields NaN correlations;
        # treat those as "no coupling" so NaN never reaches the Hamiltonian.
        rho = np.nan_to_num(np.corrcoef(Xtrain_std, rowvar=False), nan=0.0)
        np.fill_diagonal(rho, 0.0)
        Jij = rho

        # Quantum features for all training and test samples.
        # compute_quantum_features_aer / compute_quantum_features_aer_batch are
        # drop-in statevector-based alternatives to the extractor used here.
        print(f"Fold {fold_idx}: computing quantum features for {len(Xtrain_std)} train + {len(Xtest_std)} test samples...")
        Xtilde_train = run_quantum_feature_extraction(Xtrain_std, Jij, n_qubits=N_QUBITS, shots=SHOTS, optimization_level=OPTIMIZATION_LEVEL)
        Xtilde_test = run_quantum_feature_extraction(Xtest_std, Jij, n_qubits=N_QUBITS, shots=SHOTS, optimization_level=OPTIMIZATION_LEVEL)

        # Augment classical features with quantum features (gamma_q = 1)
        Xaug_train = np.hstack([Xtrain_std, Xtilde_train])
        Xaug_test = np.hstack([Xtest_std, Xtilde_test])

        # classifier
        clf = GradientBoostingClassifier(random_state=RANDOM_STATE)
        clf.fit(Xaug_train, ytrain)
        ypred = clf.predict(Xaug_test)
        yproba = clf.predict_proba(Xaug_test)[:, 1]

        # metrics
        metrics['auc'].append(roc_auc_score(ytest, yproba))
        metrics['f1'].append(f1_score(ytest, ypred, zero_division=0))
        metrics['bal_acc'].append(balanced_accuracy_score(ytest, ypred))
        metrics['prec_0'].append(precision_score(ytest, ypred, pos_label=0, zero_division=0))
        metrics['rec_0'].append(recall_score(ytest, ypred, pos_label=0, zero_division=0))
        metrics['prec_1'].append(precision_score(ytest, ypred, pos_label=1, zero_division=0))
        metrics['rec_1'].append(recall_score(ytest, ypred, pos_label=1, zero_division=0))

    # Print median metrics (matching how the PDF aggregated by median across folds)
    print(f"\n=== Median metrics over {N_SPLITS} folds ===")
    for k, v in metrics.items():
        print(f"{k:8s}: {np.median(v):.4f}")

    return metrics

def _make_meta():
    """Snapshot the module-level experiment parameters as a JSON-serializable dict."""
    return dict(
        REPS=REPS,
        M=M,
        ORDER=ORDER,
        DELTA_T=DELTA_T,
        A0=A0,
        B0=B0,
        TAU=TAU,
        RANDOM_STATE=RANDOM_STATE,
    )

def run_experiment_and_save(X, y, outdir="saved_models", save_per_fold=True, save_final_model=True, aer_batch_kwargs=None):
    """Run the CV experiment and persist models, preprocessing and metrics.

    Per fold the pipeline is: median imputation and standardization fit on the
    training split, Jij from training-set Pearson correlations (zero
    diagonal), quantum features via ``compute_quantum_features_aer_batch``,
    and a GradientBoostingClassifier on the augmented features.

    Args:
        X: Feature matrix (array-like, one sample per row).
        y: Binary labels (array-like of 0/1).
        outdir: Directory that receives all artifacts (created if missing).
        save_per_fold: If True, write model, imputer, scaler, Jij, meta,
            metrics and predictions into ``outdir/fold_<k>/``.
        save_final_model: If True, refit the whole pipeline on the full
            dataset and write it to ``outdir/final/``.
        aer_batch_kwargs: Extra keyword arguments forwarded to
            ``compute_quantum_features_aer_batch``.

    Returns:
        Dict with "metrics" ({"per_fold": ..., "median": ...}) and
        "artifact_paths" (paths of every file written).
    """
    if aer_batch_kwargs is None:
        aer_batch_kwargs = {}

    # Positional indexing with fold indices below requires plain arrays.
    X = np.asarray(X)
    y = np.asarray(y)

    def _json_safe(value):
        # JSON has no NaN literal; store missing metrics as null.
        value = float(value)
        return None if np.isnan(value) else value

    os.makedirs(outdir, exist_ok=True)

    skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_STATE)
    metrics = { 'auc': [], 'f1': [], 'bal_acc': [], 'prec_0': [], 'rec_0': [], 'prec_1': [], 'rec_1': [] }
    artifact_paths = {"folds": {}, "final": {}}

    fold_idx = 0
    for train_idx, test_idx in skf.split(X, y):
        fold_idx += 1
        Xtrain, Xtest = X[train_idx], X[test_idx]
        ytrain, ytest = y[train_idx], y[test_idx]

        # Preprocessing fit on train only
        imputer = SimpleImputer(strategy='median')
        imputer.fit(Xtrain)
        Xtrain_im = imputer.transform(Xtrain)
        Xtest_im = imputer.transform(Xtest)

        scaler = StandardScaler()
        scaler.fit(Xtrain_im)
        Xtrain_std = scaler.transform(Xtrain_im)
        Xtest_std = scaler.transform(Xtest_im)

        # Jij from training set. A feature that is constant within the fold
        # yields NaN correlations; treat those as "no coupling".
        rho = np.nan_to_num(np.corrcoef(Xtrain_std, rowvar=False), nan=0.0)
        np.fill_diagonal(rho, 0.0)
        Jij = rho

        # Quantum features (uses compute_quantum_features_aer_batch)
        print(f"[Fold {fold_idx}] computing quantum features for {len(Xtrain_std)} train + {len(Xtest_std)} test samples...")
        Xtilde_train = compute_quantum_features_aer_batch(Xtrain_std, Jij, **aer_batch_kwargs)
        Xtilde_test  = compute_quantum_features_aer_batch(Xtest_std,  Jij, **aer_batch_kwargs)

        # Augment and train
        Xaug_train = np.hstack([Xtrain_std, Xtilde_train])
        Xaug_test  = np.hstack([Xtest_std,  Xtilde_test])

        clf = GradientBoostingClassifier(random_state=RANDOM_STATE)
        clf.fit(Xaug_train, ytrain)

        ypred = clf.predict(Xaug_test)
        # try to get probabilities; if unavailable, AUC is recorded as NaN
        try:
            yproba = clf.predict_proba(Xaug_test)[:, 1]
        except Exception:
            yproba = None

        # Collect metrics
        # only compute AUC if probabilities are available
        if yproba is not None:
            metrics['auc'].append(roc_auc_score(ytest, yproba))
        else:
            metrics['auc'].append(np.nan)

        metrics['f1'].append(f1_score(ytest, ypred, zero_division=0))
        metrics['bal_acc'].append(balanced_accuracy_score(ytest, ypred))
        metrics['prec_0'].append(precision_score(ytest, ypred, pos_label=0, zero_division=0))
        metrics['rec_0'].append(recall_score(ytest, ypred, pos_label=0, zero_division=0))
        metrics['prec_1'].append(precision_score(ytest, ypred, pos_label=1, zero_division=0))
        metrics['rec_1'].append(recall_score(ytest, ypred, pos_label=1, zero_division=0))

        # Save per-fold artifacts and per-fold metrics/predictions
        fold_art = {}
        if save_per_fold:
            fold_dir = Path(outdir) / f"fold_{fold_idx}"
            fold_dir.mkdir(parents=True, exist_ok=True)

            model_path   = str(fold_dir / "model.joblib")
            imputer_path = str(fold_dir / "imputer.joblib")
            scaler_path  = str(fold_dir / "scaler.joblib")
            Jij_path     = str(fold_dir / "Jij.npy")
            meta_path    = str(fold_dir / "meta.json")
            metrics_path = str(fold_dir / "metrics.json")
            preds_path   = str(fold_dir / "predictions.npz")

            joblib.dump(clf, model_path)
            joblib.dump(imputer, imputer_path)
            joblib.dump(scaler, scaler_path)
            np.save(Jij_path, Jij)
            with open(meta_path, "w") as fh:
                json.dump(_make_meta(), fh, indent=2)

            # per-fold metrics dictionary (turn NaN into null in JSON)
            fold_metrics = {
                "auc": _json_safe(metrics['auc'][-1]),
                "f1": float(metrics['f1'][-1]),
                "bal_acc": float(metrics['bal_acc'][-1]),
                "prec_0": float(metrics['prec_0'][-1]),
                "rec_0": float(metrics['rec_0'][-1]),
                "prec_1": float(metrics['prec_1'][-1]),
                "rec_1": float(metrics['rec_1'][-1]),
                "n_train": int(len(Xtrain)),
                "n_test": int(len(Xtest))
            }
            # save fold metrics JSON
            with open(metrics_path, "w") as fh:
                json.dump(fold_metrics, fh, indent=2)

            # save predictions & test indices for reproducibility
            # yproba may be None; if so store an array of NaNs
            if yproba is None:
                yproba_arr = np.full_like(ypred, np.nan, dtype=float)
            else:
                yproba_arr = np.asarray(yproba, dtype=float)

            np.savez_compressed(preds_path,
                                y_test=np.asarray(ytest),
                                y_pred=np.asarray(ypred),
                                y_proba=yproba_arr,
                                test_indices=np.asarray(test_idx))

            fold_art = {
                "model": model_path,
                "imputer": imputer_path,
                "scaler": scaler_path,
                "Jij": Jij_path,
                "meta": meta_path,
                "metrics": metrics_path,
                "predictions": preds_path
            }
            artifact_paths["folds"][fold_idx] = fold_art
            print(f"[Fold {fold_idx}] saved artifacts & metrics to {fold_dir}")

    # Aggregate & save metrics (medians as in the PDF)
    metrics_median = {k: float(np.nanmedian(v)) for k, v in metrics.items()}
    metrics_out = {"per_fold": metrics, "median": metrics_median}
    # The on-disk copy maps NaN to null, consistent with the per-fold files;
    # the returned dict keeps the raw float values.
    metrics_json = {
        "per_fold": {k: [_json_safe(x) for x in v] for k, v in metrics.items()},
        "median": {k: _json_safe(v) for k, v in metrics_median.items()},
    }
    metrics_path_all = str(Path(outdir) / "metrics.json")
    with open(metrics_path_all, "w") as fh:
        json.dump(metrics_json, fh, indent=2)
    artifact_paths["metrics"] = metrics_path_all

    print(f"\n=== Median metrics over {N_SPLITS} folds ===")
    for k, v in metrics_median.items():
        print(f"{k:8s}: {v:.4f}")

    # Train & save final model on full dataset if requested
    if save_final_model:
        print("Training final model on full dataset (will compute Jij from full data)...")
        # Preprocessing on full data
        imputer_full = SimpleImputer(strategy='median')
        imputer_full.fit(X)
        X_im_full = imputer_full.transform(X)

        scaler_full = StandardScaler()
        scaler_full.fit(X_im_full)
        X_std_full = scaler_full.transform(X_im_full)

        Jij_full = np.nan_to_num(np.corrcoef(X_std_full, rowvar=False), nan=0.0)
        np.fill_diagonal(Jij_full, 0.0)

        # quantum features for full dataset (may be slow)
        Xtilde_full = compute_quantum_features_aer_batch(X_std_full, Jij_full, **aer_batch_kwargs)

        Xaug_full = np.hstack([X_std_full, Xtilde_full])

        final_clf = GradientBoostingClassifier(random_state=RANDOM_STATE)
        final_clf.fit(Xaug_full, y)

        # Save final artifacts
        final_dir = Path(outdir) / "final"
        final_dir.mkdir(parents=True, exist_ok=True)
        final_model_path = str(final_dir / "final_model.joblib")
        imputer_full_path = str(final_dir / "imputer_full.joblib")
        scaler_full_path = str(final_dir / "scaler_full.joblib")
        Jij_full_path = str(final_dir / "Jij_full.npy")
        meta_full_path = str(final_dir / "meta_full.json")

        joblib.dump(final_clf, final_model_path)
        joblib.dump(imputer_full, imputer_full_path)
        joblib.dump(scaler_full, scaler_full_path)
        np.save(Jij_full_path, Jij_full)
        with open(meta_full_path, "w") as fh:
            json.dump(_make_meta(), fh, indent=2)

        artifact_paths["final"] = {
            "model": final_model_path,
            "imputer": imputer_full_path,
            "scaler": scaler_full_path,
            "Jij": Jij_full_path,
            "meta": meta_full_path
        }
        print(f"Saved final model+artifacts to {final_dir}")

    return {"metrics": metrics_out, "artifact_paths": artifact_paths}



Train and save model

In [None]:
if __name__ == "__main__":
    # Reload from disk so this cell does not depend on earlier cells having
    # (or not having) remapped the label column in place.
    df = load_dataset(dataset_save)

    # Locate the label column; fail with a clear message instead of a
    # NameError further down when it is missing.
    label_col = next((c for c in ('Class', 'class') if c in df.columns), None)
    if label_col is None:
        raise KeyError(f"Expected a 'Class' or 'class' column in {dataset_save}; found {list(df.columns)}")
    y_raw = df[label_col].values
    X = df.drop(columns=[label_col]).values

    # Map textual labels to binary per PDF: NonToxic -> 1, Toxic -> 0
    # If labels are already numeric {0,1}, keep them
    if y_raw.dtype.kind in 'OU':  # object / strings
        y = np.array([1 if str(v).strip().lower().startswith('non') else 0 for v in y_raw], dtype=int)
    else:
        y = np.array(y_raw, dtype=int)

    print(f"Dataset shape: X={X.shape}, y={y.shape}, positive fraction={y.mean():.3f}")

    # Run experiment
    metrics = run_experiment(X,y)
    #metrics = run_experiment_and_save(X, y, outdir="saved_models_32_32", save_per_fold=True, save_final_model=True, aer_batch_kwargs={"batch_size": 16, "shots": 1024, "optimization_level": 1})

Dataset shape: X=(171, 13), y=(171,), positive fraction=0.673
Fold 1: computing quantum features for 136 train + 35 test samples...


Aer features:   1%|          | 1/136 [00:24<54:31, 24.23s/sample]

Predict with trained model

In [None]:
def predict_with_saved_model(X_new, model_dir="saved_models/final", aer_batch_kwargs=None):
    """Predict labels for new samples with a model saved by ``run_experiment_and_save``.

    Loads the classifier, imputer, scaler, Jij matrix and circuit metadata from
    ``model_dir``, applies the same preprocessing, computes the quantum
    features with ``compute_quantum_features_aer_batch`` and predicts on the
    augmented feature matrix.

    Quantum features are computed with the CURRENT module-level circuit
    parameters. If these differ from the parameters stored next to the model,
    a warning is printed, because the features would not match what the
    classifier was trained on.

    Args:
        X_new: 2-D array of raw samples with the same columns as the training data.
        model_dir: Directory holding the final artifacts.
        aer_batch_kwargs: Extra keyword arguments forwarded to
            ``compute_quantum_features_aer_batch``.

    Returns:
        Dict with "y_pred" (0/1 labels from thresholding at 0.5), "y_proba"
        (second column of ``predict_proba``) and "X_aug" (augmented features).
    """
    if aer_batch_kwargs is None:
        aer_batch_kwargs = {}

    model_dir = Path(model_dir)
    model_path = model_dir / "final_model.joblib"
    imputer_path = model_dir / "imputer_full.joblib"
    scaler_path  = model_dir / "scaler_full.joblib"
    Jij_path     = model_dir / "Jij_full.npy"
    meta_path    = model_dir / "meta_full.json"

    # --- Load all artifacts ---
    # SECURITY NOTE: joblib.load unpickles arbitrary objects; only point
    # model_dir at artifacts you produced or otherwise trust.
    clf = joblib.load(model_path)
    imputer = joblib.load(imputer_path)
    scaler = joblib.load(scaler_path)
    Jij = np.load(Jij_path)
    with open(meta_path, "r") as fh:
        meta = json.load(fh)

    print("Loaded model and preprocessing artifacts from", model_dir)
    print("Quantum circuit params:", meta)

    # Compare saved circuit parameters with the live configuration.
    current_meta = _make_meta()
    circuit_keys = ("REPS", "M", "ORDER", "DELTA_T", "A0", "B0", "TAU")
    mismatched = {
        key: (meta[key], current_meta[key])
        for key in circuit_keys
        if key in meta and key in current_meta and meta[key] != current_meta[key]
    }
    if mismatched:
        print("WARNING: current circuit parameters differ from those the model was trained with "
              "(saved, current):", mismatched)

    # --- Preprocessing ---
    X_im = imputer.transform(X_new)
    X_std = scaler.transform(X_im)

    # --- Quantum features ---
    print(f"Computing quantum features for {len(X_std)} new samples...")
    X_tilde = compute_quantum_features_aer_batch(X_std, Jij, **aer_batch_kwargs)

    # --- Augment and predict ---
    X_aug = np.hstack([X_std, X_tilde])
    y_proba = clf.predict_proba(X_aug)[:, 1]
    y_pred = (y_proba >= 0.5).astype(int)

    return {"y_pred": y_pred, "y_proba": y_proba, "X_aug": X_aug}


# Suppose you have a few test samples (same number of features as training)
# NOTE: `X` is not defined in this cell; it must already exist in the kernel
# (it is assigned by the training cell above), so run that cell first.
X_new = X[:5]  # or load from another CSV

# Predict using the saved final model
# NOTE(review): the model directory must contain artifacts written by
# run_experiment_and_save(..., save_final_model=True) — confirm the path exists.
result = predict_with_saved_model(X_new, model_dir="saved_models_4_4/final", aer_batch_kwargs={'batch_size': 4})

print("Predicted labels:", result['y_pred'])
print("Predicted probabilities:", result['y_proba'])



Loaded model and preprocessing artifacts from saved_models_4_4\final
Quantum circuit params: {'REPS': 4, 'M': 4, 'ORDER': 2, 'DELTA_T': 7.5e-09, 'A0': 94247779607.69379, 'B0': 69115038378.97545, 'TAU': 3e-08, 'RANDOM_STATE': 42}
Computing quantum features for 5 new samples...


Aer batch features:   0%|          | 0/5 [00:00<?, ?sample/s]

Aer batch features: 100%|██████████| 5/5 [00:01<00:00,  4.53sample/s]

Predicted labels: [1 1 1 1 1]
Predicted probabilities: [0.98395874 0.95508129 0.99005912 0.97341944 0.98635758]



