In [None]:
import time
from statistics import mode
from typing import Any, Callable, List, Tuple

import numpy as np
import pandas as pd
from sklearn import datasets, preprocessing
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
from qiskit.quantum_info import Statevector
from qiskit_aer.primitives import SamplerV2

In [None]:
# ============================================================
# 1. Classical data handling
# ============================================================

def load_and_normalize_iris() -> Tuple[np.ndarray, np.ndarray]:
    iris = datasets.load_iris()
    X = iris.data
    y = iris.target

    X_petal = X[:, 2:4]  # only petal length and width

    scaler = preprocessing.MinMaxScaler((0.0, 1.0))
    X_norm = scaler.fit_transform(X_petal)
    return X_norm, y


# ============================================================
# 2. Encoding helpers
# ============================================================

def encode_features_to_angles(features: np.ndarray) -> np.ndarray:
    f = np.clip(features.astype(float), 0.0, 1.0)
    return np.pi * f


def apply_angle_encoding(qc: QuantumCircuit, qubits, angles):
    for i, a in enumerate(angles):
        qc.ry(float(a), qubits[i])


def encode_features_to_amplitude_state(features: np.ndarray) -> np.ndarray:
    state = None
    for x_j in features:
        x_j = float(np.clip(x_j, 0.0, 1.0))
        v = np.array([np.sqrt(x_j), np.sqrt(1 - x_j)], dtype=np.complex128)
        state = v if state is None else np.kron(state, v)

    if not Statevector(state).is_valid():
        raise ValueError("Encoded statevector invalid.")
    return state


In [None]:
# ============================================================
# 3. SWAP test circuits
# ============================================================

def build_swap_test_angle(test_angles, train_angles):
    n = len(test_angles)

    anc = QuantumRegister(1, "anc")
    qt = QuantumRegister(n, "test")
    qr = QuantumRegister(n, "train")
    c = ClassicalRegister(1, "c")

    qc = QuantumCircuit(anc, qt, qr, c)

    apply_angle_encoding(qc, qt, test_angles)
    apply_angle_encoding(qc, qr, train_angles)

    qc.h(anc[0])
    for t, r in zip(qt, qr):
        qc.cswap(anc[0], t, r)
    qc.h(anc[0])
    qc.measure(anc[0], c[0])

    return qc


def estimate_similarity_angle(test_angles, train_angles, shots, sampler):
    qc = build_swap_test_angle(test_angles, train_angles)
    out = sampler.run([qc], shots=shots).result()
    counts = out[0].data.c.get_counts()
    p1 = counts.get("1", 0) / shots
    sim = max(0.0, min(1.0, 1 - 2 * p1))
    return sim


def build_swap_test_amplitude(test_state, train_state):
    n = int(np.log2(len(test_state)))

    anc = QuantumRegister(1, "anc")
    qt = QuantumRegister(n, "test")
    qr = QuantumRegister(n, "train")
    c = ClassicalRegister(1, "c")

    qc = QuantumCircuit(anc, qt, qr, c)
    qc.initialize(test_state, qt)
    qc.initialize(train_state, qr)

    qc.h(anc[0])
    for t, r in zip(qt, qr):
        qc.cswap(anc[0], t, r)
    qc.h(anc[0])
    qc.measure(anc[0], c[0])

    return qc


def estimate_similarity_amplitude(test_state, train_state, shots, sampler):
    qc = build_swap_test_amplitude(test_state, train_state)
    out = sampler.run([qc], shots=shots).result()
    counts = out[0].data.c.get_counts()
    p1 = counts.get("1", 0) / shots
    sim = max(0.0, min(1.0, 1 - 2 * p1))
    return sim

In [None]:
# ============================================================
# 4. Core experiment driver (NO PRINTS)
# ============================================================

def run_qknn_core(
    encode_fn_train: Callable[[np.ndarray], Any],
    encode_fn_test: Callable[[np.ndarray], Any],
    similarity_fn: Callable[[Any, Any, int, SamplerV2], float],
    train_size: int,
    k_neighbors: int,
    shots: int,
    prefix: str,
):
    X, y = load_and_normalize_iris()

    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        train_size=train_size,
        stratify=y,
        random_state=42,
    )

    num_features = X_train.shape[1]
    num_train = len(X_train)
    num_test = len(X_test)

    sampler = SamplerV2()

    # Encode train set once
    train_enc_start = time.perf_counter()
    train_repr = [encode_fn_train(x) for x in X_train]
    train_enc_end = time.perf_counter()

    encoding_time_list = []
    classification_time_list = []
    preds = []

    # Main loop
    for i in range(num_test):
        x_test = X_test[i]

        # Encode test
        t0 = time.perf_counter()
        test_repr = encode_fn_test(x_test)
        t1 = time.perf_counter()
        encoding_time_list.append((t1 - t0) / num_features)

        # Classify
        t2 = time.perf_counter()
        sims = [
            similarity_fn(test_repr, train_repr[j], shots, sampler)
            for j in range(num_train)
        ]

        sims = np.array(sims)
        topk = sims.argsort()[::-1][:k_neighbors]
        lbls = [y_train[j] for j in topk]
        preds.append(mode(lbls))
        t3 = time.perf_counter()

        classification_time_list.append(t3 - t2)

    # Confusion matrix
    cm = confusion_matrix(y_test, preds)
    cm_df = pd.DataFrame(cm)

    # Save results
    cm_df.to_csv(f"{prefix}_confusion_matrix.csv", index=False)
    pd.DataFrame({"encoding_time": encoding_time_list}).to_csv(
        f"{prefix}_encoding_times.csv", index=False
    )
    pd.DataFrame({"classification_time": classification_time_list}).to_csv(
        f"{prefix}_classification_times.csv", index=False
    )
    # Training encode summary
    pd.DataFrame({
        "total_train_encoding_time": [train_enc_end - train_enc_start],
        "per_train_sample": [(train_enc_end - train_enc_start) / num_train],
        "per_train_feature": [(train_enc_end - train_enc_start) / (num_train * num_features)],
    }).to_csv(f"{prefix}_train_encoding_summary.csv", index=False)


# ============================================================
# 5. Public wrappers
# ============================================================

def run_angle(
    train_size=100,
    k_neighbors=10,
    shots=2048,
    prefix="qknn_angle",
):
    run_qknn_core(
        encode_fn_train=encode_features_to_angles,
        encode_fn_test=encode_features_to_angles,
        similarity_fn=estimate_similarity_angle,
        train_size=train_size,
        k_neighbors=k_neighbors,
        shots=shots,
        prefix=prefix,
    )


def run_amplitude(
    train_size=100,
    k_neighbors=10,
    shots=2048,
    prefix="qknn_amplitude",
):
    run_qknn_core(
        encode_fn_train=encode_features_to_amplitude_state,
        encode_fn_test=encode_features_to_amplitude_state,
        similarity_fn=estimate_similarity_amplitude,
        train_size=train_size,
        k_neighbors=k_neighbors,
        shots=shots,
        prefix=prefix,
    )


# ============================================================
# 6. Script entry
# ============================================================

if __name__ == "__main__":
    run_angle()
    run_amplitude()
