In [1]:
!pip install pennylane scikit-learn numpy pandas joblib



In [3]:
import os
import time
import numpy as np
import pandas as pd
import joblib

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, classification_report
from sklearn.svm import SVC
from sklearn.preprocessing import MinMaxScaler

# Pennylane
try:
    import pennylane as qml
    from pennylane import numpy as qnp
except Exception as e:
    raise ImportError("Install pennylane: pip install pennylane") from e


In [5]:
# -----------------------------
# USER TUNABLE FLAGS (speed vs accuracy)
# -----------------------------
FAST_MODE = True  # set False if you want more accuracy and don't mind time
VERBOSE = True    # when True, the pipeline functions print progress and metrics

# Base random seed: reused for the data splits, PCA and SVM (random_state=RND)
# and for NumPy's global RNG, which drives weight init and batch shuffling.
RND = 42
np.random.seed(RND)
try:
    qnp.random.seed(RND)
except Exception as exc:
    # Seeding PennyLane's NumPy wrapper stays best-effort, but a failure is
    # reported instead of being silently swallowed, since it affects
    # reproducibility.
    print(f"Warning: could not seed pennylane.numpy RNG: {exc!r}")

# Artifacts dir: fitted transformers and trained models are saved here.
ARTIFACTS = "artifacts"
os.makedirs(ARTIFACTS, exist_ok=True)

In [7]:

# -----------------------------
# Hyperparameters (fast defaults)
# -----------------------------
# Two presets keyed by the truthiness of FAST_MODE; the selected one is
# unpacked into the module-level constants the rest of the notebook reads.
_PRESETS = {
    True: {
        "TFIDF_MAX_FEATURES": 500,  # smaller TF-IDF -> fewer PCA comps needed
        "N_QUBITS": 4,              # fewer qubits -> huge speedup
        "N_LAYERS": 1,              # smaller ansatz
        "EPOCHS": 20,               # small but with larger stepsize and bigger batches
        "BATCH_SIZE": 128,
        "STEPSIZE": 0.12,           # larger step
        "EARLY_STOPPING": 4,        # stop earlier
    },
    False: {
        "TFIDF_MAX_FEATURES": 2000,
        "N_QUBITS": 6,
        "N_LAYERS": 3,
        "EPOCHS": 80,
        "BATCH_SIZE": 64,
        "STEPSIZE": 0.05,
        "EARLY_STOPPING": 10,
    },
}
_active_preset = _PRESETS[bool(FAST_MODE)]

TFIDF_MAX_FEATURES = _active_preset["TFIDF_MAX_FEATURES"]
N_QUBITS = _active_preset["N_QUBITS"]
N_LAYERS = _active_preset["N_LAYERS"]
EPOCHS = _active_preset["EPOCHS"]
BATCH_SIZE = _active_preset["BATCH_SIZE"]
STEPSIZE = _active_preset["STEPSIZE"]
EARLY_STOPPING = _active_preset["EARLY_STOPPING"]

# One PCA component per qubit: each component becomes one embedding angle.
PCA_COMPONENTS = N_QUBITS
DEVICE_NAME = "default.qubit"  # statevector simulator
DATA_FILES = ["spam_ham_dataset.csv", "spam.csv"]

In [9]:
# -----------------------------
# Data utilities
# -----------------------------
def load_and_merge(paths, verbose=None):
    """Load every available CSV in ``paths`` and merge them into one labelled frame.

    Each file is read as UTF-8, falling back to latin-1 when decoding fails.
    Three column layouts are recognised, checked in this order:

    * ``text`` with ``label_num`` (used as is) or, failing that, ``label``
      (1 when the value starts with "spam", case-insensitively, else 0);
    * ``v1`` / ``v2``: ``v2`` is the text, ``v1`` the label (1 only when it is
      exactly "spam" after stripping and lower-casing);
    * anything else with at least two columns: the first column is taken as the
      text and the second as the label (cast to int, or mapped by the "spam"
      prefix rule when the cast fails).

    Rows with a missing text or label are dropped *before* the text is cast to
    ``str``, so missing texts do not turn into the literal document ``"nan"``.
    Texts are then stripped, empty texts removed and duplicates (by text) dropped.

    Parameters
    ----------
    paths : iterable of str
        Candidate CSV paths; paths that do not exist are skipped.
    verbose : bool or None, optional
        Whether to print progress. ``None`` (default) uses the module-level
        ``VERBOSE`` flag.

    Returns
    -------
    pandas.DataFrame
        Columns ``text`` (str) and ``label`` (int).

    Raises
    ------
    FileNotFoundError
        If no path yielded a usable frame.
    """
    if verbose is None:
        verbose = VERBOSE

    frames = []
    for p in paths:
        if not os.path.exists(p):
            if verbose:
                print(f"Info: {p} not found, skipping.")
            continue
        try:
            df = pd.read_csv(p, encoding="utf-8", low_memory=False)
        except UnicodeDecodeError:
            # Only a decoding failure justifies retrying with another encoding.
            df = pd.read_csv(p, encoding="latin-1", low_memory=False)

        if "text" in df.columns and ("label" in df.columns or "label_num" in df.columns):
            if "label_num" in df.columns:
                labels = df["label_num"]
            else:
                labels = df["label"].map(lambda s: 1 if str(s).strip().lower().startswith("spam") else 0)
            frames.append(pd.DataFrame({"text": df["text"], "label": labels}))
        elif "v2" in df.columns and "v1" in df.columns:
            frames.append(pd.DataFrame({
                # Keep missing texts as NaN here so they are dropped below.
                "text": df["v2"],
                "label": df["v1"].apply(lambda s: 1 if str(s).strip().lower() == "spam" else 0)
            }))
        else:
            cols = list(df.columns)
            if len(cols) >= 2:
                tmp = df[[cols[0], cols[1]]].rename(columns={cols[0]: "text", cols[1]: "label"})
                try:
                    tmp["label"] = tmp["label"].astype(int)
                except Exception:
                    tmp["label"] = tmp["label"].apply(lambda s: 1 if str(s).strip().lower().startswith("spam") else 0)
                frames.append(tmp)
    if not frames:
        raise FileNotFoundError(f"No valid dataset found among {paths}")

    df_all = pd.concat(frames, ignore_index=True)
    # Drop missing values first: astype(str) would turn NaN into "nan", and
    # de-duplicating first could keep an unlabelled copy over a labelled one.
    df_all = df_all.dropna(subset=["text", "label"])
    df_all = df_all.assign(text=df_all["text"].astype(str).str.strip())
    df_all = df_all[df_all["text"] != ""]
    df_all = df_all.drop_duplicates(subset=["text"])
    df_all = df_all.assign(label=df_all["label"].astype(int))
    if verbose:
        print("Merged dataset shape:", df_all.shape)
    return df_all



In [11]:
def prepare_features(df):
    """Split ``df`` into train/val/test and turn the texts into dense TF-IDF matrices.

    The split is stratified on the label and seeded with ``RND``. The TF-IDF
    vocabulary is fitted on the training texts only, and the fitted vectorizer
    is saved as ``tfidf.joblib`` under ``ARTIFACTS``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide ``text`` and ``label`` columns.

    Returns
    -------
    tuple
        ``(X_train, X_val, X_test, y_train, y_val, y_test)`` where the ``X``
        entries are dense arrays.
    """
    corpus = df["text"].values
    targets = df["label"].values

    # standard split: 20% of all rows are held out for testing ...
    txt_rest, txt_test, y_rest, y_test = train_test_split(
        corpus, targets, test_size=0.20, stratify=targets, random_state=RND
    )
    # ... then 0.1765 of the remaining 80% (about 14% of all rows) for validation.
    txt_train, txt_val, y_train, y_val = train_test_split(
        txt_rest, y_rest, test_size=0.1765, stratify=y_rest, random_state=RND
    )

    # TF-IDF: fit on train only, then reuse the vocabulary for val/test.
    vectorizer = TfidfVectorizer(max_features=TFIDF_MAX_FEATURES, stop_words="english")
    train_mat = vectorizer.fit_transform(txt_train).toarray()
    val_mat, test_mat = (vectorizer.transform(part).toarray() for part in (txt_val, txt_test))

    joblib.dump(vectorizer, os.path.join(ARTIFACTS, "tfidf.joblib"))
    if VERBOSE:
        print("TF-IDF shapes:", train_mat.shape, val_mat.shape, test_mat.shape)
    return train_mat, val_mat, test_mat, y_train, y_val, y_test


In [13]:
# -----------------------------
# Quantum model (compact)
# -----------------------------
dev = qml.device(DEVICE_NAME, wires=N_QUBITS)


@qml.qnode(dev, interface="autograd")
def qnode(q_weights, x):
    """Data re-uploading circuit returning the PauliZ expectation of wire 0.

    For every slice along the first axis of ``q_weights`` the features ``x`` are
    angle-embedded again (X rotations on all ``N_QUBITS`` wires), followed by a
    single StronglyEntanglingLayers layer parameterised by that slice.

    Parameters
    ----------
    q_weights : array
        Circuit weights; the first axis indexes the re-uploading blocks.
    x : array
        Feature vector passed to AngleEmbedding.
    """
    all_wires = range(N_QUBITS)
    n_blocks = q_weights.shape[0]
    for block in range(n_blocks):
        # re-upload the data, then apply this block's trainable layer
        qml.templates.AngleEmbedding(x, wires=all_wires, rotation="X")
        # slicing (rather than indexing) keeps the leading "layers" axis of size 1
        block_weights = q_weights[block:block + 1]
        qml.templates.StronglyEntanglingLayers(block_weights, wires=all_wires)
    # a single readout on the first qubit (faster than all expvals)
    return qml.expval(qml.PauliZ(0))


In [15]:

# Note: reading out a single qubit reduces runtime and is often enough for binary tasks.
# For higher expressivity, return every qubit's expval from qnode; unpack_flat and
# batch_predict must then be adapted to use one classical weight per readout.

# -----------------------------
# Flatten helpers and prediction
# -----------------------------
def pack_flat(q_weights, w_class, b):
    """Concatenate all trainable parameters into a single flat vector.

    Layout: flattened ``q_weights``, then flattened ``w_class``, then the
    scalar bias ``b`` as the last entry.
    """
    pieces = [
        q_weights.reshape(-1),
        qnp.array(w_class).reshape(-1),
        qnp.array([b]),
    ]
    return qnp.concatenate(pieces)

def unpack_flat(flat, n_layers=None, n_qubits=None):
    """Split a flat parameter vector into circuit weights, classical weight and bias.

    Expected layout: ``n_layers * n_qubits * 3`` circuit weights, followed by
    one classical weight and one bias.

    Parameters
    ----------
    flat : array
        1-D parameter vector.
    n_layers, n_qubits : int or None, optional
        Circuit dimensions. ``None`` (default) uses the module-level
        ``N_LAYERS`` / ``N_QUBITS``.

    Returns
    -------
    tuple
        ``(q_weights, w_class, b)`` with ``q_weights`` of shape
        ``(n_layers, n_qubits, 3)``, ``w_class`` of length 1 and ``b`` a scalar.

    Raises
    ------
    ValueError
        If ``flat`` does not hold exactly the expected number of entries.
    """
    n_layers = N_LAYERS if n_layers is None else n_layers
    n_qubits = N_QUBITS if n_qubits is None else n_qubits

    q_cnt = n_layers * n_qubits * 3
    expected = q_cnt + 2  # circuit weights + one classical weight + bias
    if len(flat) != expected:
        # Fail loudly on stale or mismatched parameters (e.g. saved with another
        # preset) instead of silently ignoring trailing entries.
        raise ValueError(
            f"flat has {len(flat)} entries, expected {expected} "
            f"for n_layers={n_layers}, n_qubits={n_qubits}"
        )

    q_weights = flat[:q_cnt].reshape((n_layers, n_qubits, 3))
    w_class = flat[q_cnt:q_cnt + 1]  # single weight because qnode returns one scalar
    b = flat[q_cnt + 1]
    return q_weights, w_class, b

def sigmoid(x):
    """Logistic function ``1 / (1 + exp(-x))`` evaluated with ``qnp``."""
    denominator = 1.0 + qnp.exp(-x)
    return 1.0 / denominator

def batch_predict(flat_params, X):
    """Return the predicted probability for every row of ``X``.

    Each sample is run through ``qnode`` once; the scalar readout is passed
    through a one-weight affine map and a sigmoid.
    """
    q_weights, w_class, b = unpack_flat(flat_params)
    probabilities = [
        sigmoid(w_class[0] * qnode(q_weights, sample) + b)  # qnode returns a scalar
        for sample in X
    ]
    return qnp.array(probabilities, dtype=float)

def batch_loss(flat_params, X, y):
    """Mean binary cross-entropy of the model's predictions on ``(X, y)``.

    A small epsilon is added inside both logarithms to avoid ``log(0)``.
    """
    eps = 1e-7
    probs = batch_predict(flat_params, X)
    targets = qnp.array(y, dtype=float)
    log_likelihood = targets * qnp.log(probs + eps) + (1 - targets) * qnp.log(1 - probs + eps)
    return -qnp.mean(log_likelihood)


In [23]:
# -----------------------------
# Training loop (optimized)
# -----------------------------
def train_vqc(X_train, y_train, X_val, y_val, X_test, y_test):
    """Train the variational classifier with Adam and early stopping.

    Mini-batches are reshuffled every epoch. After each epoch the validation
    accuracy (threshold 0.5) is computed; the parameters with the best
    validation accuracy are kept, and training stops once ``EARLY_STOPPING``
    consecutive epochs bring no improvement. The best parameters are evaluated
    on the test split and saved as ``vqc_fast.joblib`` under ``ARTIFACTS``.

    Returns
    -------
    tuple
        ``(test_acc, best_flat)``: test accuracy and the best flat parameter vector.
    """

    def hard_labels(params, X):
        # probabilities -> plain NumPy floats -> 0/1 predictions at threshold 0.5
        probs = np.array(batch_predict(params, X).tolist(), dtype=float)
        return (probs >= 0.5).astype(int)

    # init smaller q_weights and 1 classical weight + bias
    flat_params = pack_flat(
        qnp.array(0.01 * np.random.randn(N_LAYERS, N_QUBITS, 3)),
        np.array([0.1]),  # single weight
        0.0,
    )

    opt = qml.AdamOptimizer(stepsize=STEPSIZE)
    best_val, best_flat = -1.0, flat_params
    stale_epochs = 0
    n_samples = X_train.shape[0]

    if VERBOSE:
        print("Start VQC training | epochs:", EPOCHS, "batch:", BATCH_SIZE, "stepsize:", STEPSIZE)

    for ep in range(EPOCHS):
        order = np.random.permutation(n_samples)
        X_shuffled, y_shuffled = X_train[order], y_train[order]
        for start in range(0, n_samples, BATCH_SIZE):
            stop = start + BATCH_SIZE
            Xb, yb = X_shuffled[start:stop], y_shuffled[start:stop]
            flat_params = opt.step(lambda p: batch_loss(p, Xb, yb), flat_params)

        # validation
        val_acc = accuracy_score(y_val, hard_labels(flat_params, X_val))

        if VERBOSE:
            print(f"Epoch {ep+1}/{EPOCHS} - Val acc: {val_acc:.4f}")

        if val_acc > best_val + 1e-6:
            best_val, best_flat = val_acc, flat_params
            stale_epochs = 0
        else:
            stale_epochs += 1

        if stale_epochs >= EARLY_STOPPING:
            if VERBOSE:
                print("Early stopping at epoch", ep+1)
            break

    # test eval with the best-on-validation parameters
    y_test_pred = hard_labels(best_flat, X_test)
    test_acc = accuracy_score(y_test, y_test_pred)
    if VERBOSE:
        print("VQC test acc:", test_acc)
        print(classification_report(y_test, y_test_pred))

    artifact_path = os.path.join(ARTIFACTS, "vqc_fast.joblib")
    joblib.dump({"flat_params": np.array(best_flat)}, artifact_path)
    return test_acc, best_flat

In [25]:
# -----------------------------
# Classical baseline (fast)
# -----------------------------
def train_classical_svm(Xtr, ytr, Xval, yval, Xtest, ytest):
    """Fit a linear SVM baseline, report its accuracy and save it.

    The model is created with ``probability=True`` so ``predict_proba`` is
    available to callers. It is saved as ``svm_fast.joblib`` under ``ARTIFACTS``.

    Returns
    -------
    sklearn.svm.SVC
        The fitted classifier.
    """
    model = SVC(kernel="linear", C=1.0, probability=True, random_state=RND)
    model.fit(Xtr, ytr)

    if VERBOSE:
        val_accuracy = accuracy_score(yval, model.predict(Xval))
        print("Classical SVM val acc:", val_accuracy)
        test_accuracy = accuracy_score(ytest, model.predict(Xtest))
        print("Classical SVM test acc:", test_accuracy)

    artifact_path = os.path.join(ARTIFACTS, "svm_fast.joblib")
    joblib.dump(model, artifact_path)
    return model

In [35]:

# -----------------------------
# Main pipeline
# -----------------------------
def main():
    """Run the whole pipeline: load data, train the SVM baseline and the VQC, then ensemble.

    Steps:
    1. Load and merge ``DATA_FILES`` and build TF-IDF features.
    2. Train the classical SVM baseline on the TF-IDF features.
    3. Reduce the TF-IDF features to ``PCA_COMPONENTS`` dimensions and rescale
       them to ``[-pi, pi]`` (both transformers are fitted on the training split
       only and saved under ``ARTIFACTS``).
    4. Train the VQC on the reduced features.
    5. Average the VQC and SVM test probabilities and report the accuracies.
    """
    t0 = time.time()
    df = load_and_merge(DATA_FILES)
    Xtr_tfidf, Xv_tfidf, Xt_tfidf, ytr, yv, yt = prepare_features(df)

    # baseline
    svc = train_classical_svm(Xtr_tfidf, ytr, Xv_tfidf, yv, Xt_tfidf, yt)

    # PCA -> n_qubits
    pca = PCA(n_components=PCA_COMPONENTS, random_state=RND)
    Xtr_pca = pca.fit_transform(Xtr_tfidf)
    Xv_pca = pca.transform(Xv_tfidf)
    Xt_pca = pca.transform(Xt_tfidf)
    joblib.dump(pca, os.path.join(ARTIFACTS, "pca_fast.joblib"))

    # scale to [-pi, pi] for AngleEmbedding
    # NOTE(review): X rotations by -pi and +pi differ only by a global phase, so
    # the two extremes of each feature are encoded identically, and val/test
    # values can fall outside the fitted range. Consider a narrower range such
    # as [0, pi]; confirm the effect on accuracy before changing it.
    scaler = MinMaxScaler(feature_range=(-np.pi, np.pi))
    Xtr_pca = scaler.fit_transform(Xtr_pca)
    Xv_pca = scaler.transform(Xv_pca)
    Xt_pca = scaler.transform(Xt_pca)
    joblib.dump(scaler, os.path.join(ARTIFACTS, "scaler_fast.joblib"))

    # Convert arrays to dtype float for qnode
    Xtr_pca = np.array(Xtr_pca, dtype=float)
    Xv_pca = np.array(Xv_pca, dtype=float)
    Xt_pca = np.array(Xt_pca, dtype=float)

    # Train VQC
    vqc_acc, best = train_vqc(Xtr_pca, ytr, Xv_pca, yv, Xt_pca, yt)
    elapsed = time.time() - t0
    if VERBOSE:
        # Covers everything up to here; the ensemble evaluation below is not included.
        print(f"Total elapsed: {elapsed:.1f}s")

    # Ensemble: unweighted mean of the two models' spam probabilities
    vqc_probs = np.array(batch_predict(qnp.array(best), Xt_pca).tolist(), dtype=float)
    svm_probs = svc.predict_proba(Xt_tfidf)[:, 1]
    ensemble_probs = 0.5 * (vqc_probs + svm_probs)
    ensemble_pred = (ensemble_probs >= 0.5).astype(int)
    ensemble_acc = accuracy_score(yt, ensemble_pred)
    if VERBOSE:
        print(f"VQC test acc: {vqc_acc:.4f}, SVM test acc: {accuracy_score(yt, svc.predict(Xt_tfidf)):.4f}, Ensemble acc: {ensemble_acc:.4f}")

if __name__ == "__main__":
    main()


Merged dataset shape: (10151, 2)
TF-IDF shapes: (6686, 500) (1434, 500) (2031, 500)
Classical SVM val acc: 0.9504881450488145
Classical SVM test acc: 0.9492860659773511
Start VQC training | epochs: 20 batch: 128 stepsize: 0.12
Epoch 1/20 - Val acc: 0.8501
Epoch 2/20 - Val acc: 0.8668
Epoch 3/20 - Val acc: 0.8591
Epoch 4/20 - Val acc: 0.8668
Epoch 5/20 - Val acc: 0.8640
Epoch 6/20 - Val acc: 0.8480
Early stopping at epoch 6
VQC test acc: 0.8414574101427869
              precision    recall  f1-score   support

           0       0.89      0.91      0.90      1610
           1       0.63      0.58      0.60       421

    accuracy                           0.84      2031
   macro avg       0.76      0.75      0.75      2031
weighted avg       0.84      0.84      0.84      2031

Total elapsed: 1264.4s
VQC test acc: 0.8415, SVM test acc: 0.9493, Ensemble acc: 0.9424
