In [1]:
# Qiskit Iris QNN with Amplitude Embedding + Data Reuploading (3-class)
# Uses assign_parameters with a safe fallback for older versions.

import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report

from qiskit import QuantumCircuit
from qiskit.circuit.library import StatePreparation, TwoLocal
from qiskit.quantum_info import SparsePauliOp

# Prefer AerEstimator if available (fast statevector); otherwise default Estimator
try:
    from qiskit_aer.primitives import Estimator as AerEstimator
    EstimatorImpl = AerEstimator
    print("Using AerEstimator (statevector).")
except Exception:
    from qiskit.primitives import Estimator
    EstimatorImpl = Estimator
    print("Using default Estimator.")

# -----------------------------
# 1) Load & preprocess Iris data
# -----------------------------
X, y = load_iris(return_X_y=True)              # X: (150,4), y: {0,1,2}
scaler = StandardScaler()
X = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)

def one_hot(y, K=3):
    oh = np.zeros((len(y), K), dtype=float)
    oh[np.arange(len(y)), y.astype(int)] = 1.0
    return oh

Y_train = one_hot(y_train, 3)
Y_test  = one_hot(y_test, 3)

# -----------------------------------------
# 2) Quantum model: amplitude + reuploading
# -----------------------------------------
n_qubits = 2                 # 2^2 = 4 amplitudes ← 4 Iris features
D = 2**n_qubits              # 4
L = 5                        # reuploading depth

Z0 = SparsePauliOp.from_list([("ZI", 1.0)])
Z1 = SparsePauliOp.from_list([("IZ", 1.0)])
ZZ = SparsePauliOp.from_list([("ZZ", 1.0)])
OBSERVABLES = [Z0, Z1, ZZ]   # 3 logits → 3 classes after softmax

def l2_normalize_4(v):
    v = np.asarray(v, dtype=float).reshape(-1)
    assert v.shape[0] == 4, "This demo expects 4 features."
    n = np.linalg.norm(v)
    return v / (n if n > 0 else 1.0)

def make_layer_template():
    # One TwoLocal block with ry/rz + CZ entanglement, reps=1
    return TwoLocal(
        num_qubits=n_qubits,
        rotation_blocks=["ry", "rz"],
        entanglement="cz",
        reps=1,
        parameter_prefix="θ"
    )

_LAYER_TEMPLATE = make_layer_template()
N_LAYER_PARAMS = len(_LAYER_TEMPLATE.parameters)

# ---- helper: robust parameter assignment across Qiskit versions ----
def assign_or_bind(op, mapping):
    """Try assign_parameters; fall back to bind_parameters if needed."""
    try:
        return op.assign_parameters(mapping)
    except AttributeError:
        # older objects may only support bind_parameters
        return op.bind_parameters(mapping)

def make_reupload_circuit(x_norm, theta):
    """
    x_norm: normalized 4-vector (||x||=1) for amplitude embedding
    theta:  length L * N_LAYER_PARAMS (concatenated layer params)
    """
    assert len(theta) == L * N_LAYER_PARAMS
    qc = QuantumCircuit(n_qubits)
    idx = 0

    for _ in range(L):
        # Uϕ(x): amplitude embedding each reupload
        qc.append(StatePreparation(x_norm, normalize=False), range(n_qubits))

        # V(ω): one variational layer (fresh copy of the template)
        layer = _LAYER_TEMPLATE.copy()

        params_slice = list(layer.parameters)
        bind_vals = theta[idx: idx + N_LAYER_PARAMS]
        param_map = {p: float(v) for p, v in zip(params_slice, bind_vals)}

        layer_assigned = assign_or_bind(layer, param_map)
        qc = qc.compose(layer_assigned)

        idx += N_LAYER_PARAMS

    return qc

# -----------------------
# 3) Loss & train routine
# -----------------------
def softmax(logits):
    z = logits - np.max(logits, axis=1, keepdims=True)
    e = np.exp(z)
    return e / np.sum(e, axis=1, keepdims=True)

def cross_entropy(probs, targets, eps=1e-9):
    return -np.mean(np.sum(targets * np.log(probs + eps), axis=1))

# Evaluate model logits for a batch using Estimator
def logits_batch(estimator, Xb, theta):
    circuits = []
    for x in Xb:
        x_norm = l2_normalize_4(x)
        circuits.append(make_reupload_circuit(x_norm, theta))

    # Evaluate each observable separately; stack into (B, 3)
    outs = []
    for obs in OBSERVABLES:
        job = estimator.run(circuits=circuits, observables=[obs] * len(circuits))
        vals = np.asarray(job.result().values, dtype=float)
        outs.append(vals)
    return np.stack(outs, axis=1)

def predict_proba(estimator, Xb, theta):
    return softmax(logits_batch(estimator, Xb, theta))

def batch_loss(estimator, Xb, Yb, theta):
    probs = predict_proba(estimator, Xb, theta)
    return cross_entropy(probs, Yb)

# -----------------------
# 4) SPSA optimizer (simple)
# -----------------------
class SPSA:
    def __init__(self, a=0.2, c=0.2, alpha=0.602, gamma=0.101, seed=1):
        self.a0, self.c0, self.alpha, self.gamma = a, c, alpha, gamma
        self.k = 0
        self.rng = np.random.default_rng(seed)

    def step(self, theta, loss_fn):
        k = self.k + 1
        ak = self.a0 / (k ** self.alpha)
        ck = self.c0 / (k ** self.gamma)
        delta = self.rng.choice([-1.0, 1.0], size=theta.shape)
        thetap = theta + ck * delta
        thetam = theta - ck * delta
        lp = loss_fn(thetap)
        lm = loss_fn(thetam)
        ghat = (lp - lm) / (2.0 * ck) * delta
        theta_new = theta - ak * ghat
        self.k = k
        return theta_new, float(lp), float(lm)

# -----------------------
# 5) Train
# -----------------------
estimator = EstimatorImpl()

rng = np.random.default_rng(0)
theta = 0.01 * rng.standard_normal(L * N_LAYER_PARAMS)

opt = SPSA(a=0.2, c=0.2, alpha=0.602, gamma=0.101, seed=1)

epochs = 50
batch_size = 20

def iterate_minibatches(Xd, Yd, bs, seed=0):
    rng = np.random.default_rng(seed)
    idx = rng.permutation(len(Xd))
    for start in range(0, len(Xd), bs):
        j = idx[start:start+bs]
        yield Xd[j], Yd[j]

for ep in range(1, epochs + 1):
    for Xb, Yb in iterate_minibatches(X_train, Y_train, batch_size, seed=ep):
        loss_fn = lambda th: batch_loss(estimator, Xb, Yb, th)
        theta, lp, lm = opt.step(theta, loss_fn)

    train_probs = predict_proba(estimator, X_train, theta)
    train_pred  = np.argmax(train_probs, axis=1)
    train_acc   = accuracy_score(y_train, train_pred)
    train_loss  = cross_entropy(train_probs, Y_train)

    if ep == 1 or ep % 5 == 0:
        print(f"Epoch {ep:2d} | loss={train_loss:.4f} | acc={train_acc:.3f}")

# -----------------------
# 6) Evaluate
# -----------------------
test_probs = predict_proba(estimator, X_test, theta)
test_pred  = np.argmax(test_probs, axis=1)
test_acc   = accuracy_score(y_test, test_pred)
print("\nTest accuracy:", test_acc)
print("\nClassification report:\n", classification_report(y_test, test_pred, digits=3))


Using AerEstimator (statevector).


  return TwoLocal(


Epoch  1 | loss=1.0921 | acc=0.420


KeyboardInterrupt: 

In [None]:
# Qiskit Iris QNN — Amplitude Embedding + Data Reuploading
# With stronger ansatz, wider readout, trainable classical head, SPSA training.

import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report

from qiskit import QuantumCircuit
from qiskit.circuit.library import StatePreparation, TwoLocal
from qiskit.quantum_info import SparsePauliOp

# Prefer AerEstimator if available (fast statevector); else default Estimator
try:
    from qiskit_aer.primitives import Estimator as AerEstimator
    EstimatorImpl = AerEstimator
    print("Using AerEstimator (statevector).")
except Exception:
    from qiskit.primitives import Estimator
    EstimatorImpl = Estimator
    print("Using default Estimator.")

# -----------------------------
# 1) Load & preprocess Iris data
# -----------------------------
X, y = load_iris(return_X_y=True)              # X: (150,4), y: {0,1,2}

# Standardize features (good for optimization)
scaler = StandardScaler()
X = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)

def one_hot_smooth(y, K=3, eps=0.05):
    y = y.astype(int)
    oh = np.full((len(y), K), eps/(K-1), dtype=float)
    oh[np.arange(len(y)), y] = 1.0 - eps
    return oh

Y_train = one_hot_smooth(y_train, 3, eps=0.05)
Y_test  = one_hot_smooth(y_test,  3, eps=0.05)

# -----------------------------------------
# 2) Quantum model: amplitude + reuploading
# -----------------------------------------
n_qubits = 2                 # 2^2 = 4 amplitudes → matches 4 Iris features
D = 2**n_qubits              # 4
L = 5                        # reuploading depth (↑ capacity)

# Richer readout: 7 observables → feature vector (NUM_OBS = 7)
OBSERVABLES = [
    SparsePauliOp.from_list([("ZI", 1.0)]),
    SparsePauliOp.from_list([("IZ", 1.0)]),
    SparsePauliOp.from_list([("ZZ", 1.0)]),
    SparsePauliOp.from_list([("XX", 1.0)]),
    SparsePauliOp.from_list([("YY", 1.0)]),
    SparsePauliOp.from_list([("XZ", 1.0)]),
    SparsePauliOp.from_list([("ZX", 1.0)]),
]
NUM_OBS = len(OBSERVABLES)
NUM_CLASSES = 3

def l2_normalize_4(v):
    v = np.asarray(v, dtype=float).reshape(-1)
    assert v.shape[0] == 4, "This demo expects 4 features."
    n = np.linalg.norm(v)
    return v / (n if n > 0 else 1.0)

# TwoLocal template with stronger expressivity; robust to version quirks
def make_layer_template():
    try:
        # Try a beefy template
        return TwoLocal(
            num_qubits=n_qubits,
            rotation_blocks=["rx", "ry", "rz"],
            entanglement="full",      # full entanglement if supported
            reps=2,                   # deeper per layer
            parameter_prefix="θ"
        )
    except Exception:
        # Fallback if "full" not supported in your version → use CZ ring
        return TwoLocal(
            num_qubits=n_qubits,
            rotation_blocks=["rx", "ry", "rz"],
            entanglement="cz",
            reps=2,
            parameter_prefix="θ"
        )

_LAYER_TEMPLATE = make_layer_template()
N_LAYER_PARAMS = len(_LAYER_TEMPLATE.parameters)

# ---- helper: robust parameter assignment across Qiskit versions ----
def assign_or_bind(op, mapping):
    """Try assign_parameters; fall back to bind_parameters if needed."""
    try:
        return op.assign_parameters(mapping)
    except AttributeError:
        return op.bind_parameters(mapping)

def make_reupload_circuit(x_norm, theta_q):
    """
    x_norm: normalized 4-vector (||x||=1) for amplitude embedding
    theta_q:  quantum parameter vector of length L * N_LAYER_PARAMS
    """
    assert len(theta_q) == L * N_LAYER_PARAMS
    qc = QuantumCircuit(n_qubits)
    idx = 0
    for _ in range(L):
        # Uϕ(x): amplitude re-embedding each block (data reuploading)
        qc.append(StatePreparation(x_norm, normalize=False), range(n_qubits))

        # V(ω): one variational layer (fresh template per block)
        layer = _LAYER_TEMPLATE.copy()
        params_slice = list(layer.parameters)
        bind_vals = theta_q[idx: idx + N_LAYER_PARAMS]
        param_map = {p: float(v) for p, v in zip(params_slice, bind_vals)}
        layer_assigned = assign_or_bind(layer, param_map)
        qc = qc.compose(layer_assigned)
        idx += N_LAYER_PARAMS
    return qc

# -----------------------
# 3) Classical head + utils
# -----------------------
TEMPERATURE = 1.5  # soften logits → smoother gradients

def softmax(logits):
    z = logits / TEMPERATURE
    z = z - np.max(z, axis=1, keepdims=True)
    e = np.exp(z)
    return e / np.sum(e, axis=1, keepdims=True)

def cross_entropy(probs, targets, eps=1e-9):
    return -np.mean(np.sum(targets * np.log(probs + eps), axis=1))

# Parameter vector packing:
# [ quantum θ_q | classical Wc(flat) | classical bc ]
def split_theta(theta):
    q_end = L * N_LAYER_PARAMS
    c_end = q_end + NUM_CLASSES * NUM_OBS
    theta_q = theta[:q_end]
    Wc = theta[q_end:c_end].reshape(NUM_CLASSES, NUM_OBS)
    bc = theta[c_end:]
    return theta_q, Wc, bc

# Evaluate logits for a batch: expectations → classical linear head
def logits_batch(estimator, Xb, theta):
    theta_q, Wc, bc = split_theta(theta)

    circuits = []
    for x in Xb:
        x_norm = l2_normalize_4(x)
        circuits.append(make_reupload_circuit(x_norm, theta_q))

    # Collect expectation features (B, NUM_OBS)
    feats = []
    for obs in OBSERVABLES:
        job = estimator.run(circuits=circuits, observables=[obs] * len(circuits))
        vals = np.asarray(job.result().values, dtype=float)  # (B,)
        feats.append(vals)
    F = np.stack(feats, axis=1)

    # Classical linear head → logits (B, NUM_CLASSES)
    return F @ Wc.T + bc[np.newaxis, :]

def predict_proba(estimator, Xb, theta):
    return softmax(logits_batch(estimator, Xb, theta))

def batch_loss(estimator, Xb, Yb, theta):
    probs = predict_proba(estimator, Xb, theta)
    return cross_entropy(probs, Yb)

# -----------------------
# 4) SPSA optimizer (hardware-friendly)
# -----------------------
class SPSA:
    def __init__(self, a=0.15, c=0.15, alpha=0.602, gamma=0.2, seed=1):
        self.a0, self.c0, self.alpha, self.gamma = a, c, alpha, gamma
        self.k = 0
        self.rng = np.random.default_rng(seed)

    def step(self, theta, loss_fn):
        k = self.k + 1
        ak = self.a0 / (k ** self.alpha)
        ck = self.c0 / (k ** self.gamma)
        delta = self.rng.choice([-1.0, 1.0], size=theta.shape)
        thetap = theta + ck * delta
        thetam = theta - ck * delta
        lp = loss_fn(thetap)
        lm = loss_fn(thetam)
        ghat = (lp - lm) / (2.0 * ck) * delta
        theta_new = theta - ak * ghat
        self.k = k
        return theta_new, float(lp), float(lm)

# -----------------------
# 5) Train
# -----------------------
estimator = EstimatorImpl()

rng = np.random.default_rng(0)
# Quantum params
theta_q0 = 0.01 * rng.standard_normal(L * N_LAYER_PARAMS)
# Classical head params (Wc, bc)
Wc0 = 0.01 * rng.standard_normal(NUM_CLASSES * NUM_OBS)
bc0 = 0.01 * rng.standard_normal(NUM_CLASSES)

theta = np.concatenate([theta_q0, Wc0, bc0])

opt = SPSA(a=0.15, c=0.15, alpha=0.602, gamma=0.2, seed=1)

# Longer training, smaller batches, multiple steps per batch
epochs = 120
batch_size = 8
steps_per_batch = 3

def iterate_minibatches(Xd, Yd, bs, seed=0):
    rng = np.random.default_rng(seed)
    idx = rng.permutation(len(Xd))
    for start in range(0, len(Xd), bs):
        j = idx[start:start+bs]
        yield Xd[j], Yd[j]

best_loss, patience, wait = float('inf'), 15, 0
best_theta = theta.copy()

for ep in range(1, epochs + 1):
    for Xb, Yb in iterate_minibatches(X_train, Y_train, batch_size, seed=ep):
        loss_fn = lambda th: batch_loss(estimator, Xb, Yb, th)
        for _ in range(steps_per_batch):
            theta, _, _ = opt.step(theta, loss_fn)

    # Train metrics each epoch
    train_probs = predict_proba(estimator, X_train, theta)
    train_pred  = np.argmax(train_probs, axis=1)
    train_acc   = accuracy_score(y_train, train_pred)
    train_loss  = cross_entropy(train_probs, Y_train)

    # Early stopping on training loss (you can split a val set if you prefer)
    if train_loss + 1e-4 < best_loss:
        best_loss, wait = train_loss, 0
        best_theta = theta.copy()
    else:
        wait += 1

    if ep == 1 or ep % 10 == 0:
        print(f"Epoch {ep:3d} | loss={train_loss:.4f} | acc={train_acc:.3f} | wait={wait}")

    if wait >= patience:
        print(f"Early stop at epoch {ep}.")
        theta = best_theta
        break

# -----------------------
# 6) Evaluate
# -----------------------
test_probs = predict_proba(estimator, X_test, theta)
test_pred  = np.argmax(test_probs, axis=1)
test_acc   = accuracy_score(y_test, test_pred)
print("\nTest accuracy:", test_acc)
print("\nClassification report:\n", classification_report(y_test, test_pred, digits=3))


Using AerEstimator (statevector).


  return TwoLocal(


In [3]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report

from qiskit import QuantumCircuit
from qiskit.quantum_info import SparsePauliOp

# Prefer AerEstimator if available
try:
    from qiskit_aer.primitives import Estimator as AerEstimator
    EstimatorImpl = AerEstimator
    print("Using AerEstimator (statevector).")
except Exception:
    from qiskit.primitives import Estimator
    EstimatorImpl = Estimator
    print("Using default Estimator.")

# -----------------------------
# 1) Load & preprocess Iris data
# -----------------------------
X, y = load_iris(return_X_y=True)              # X: (150,4), y: {0,1,2}
scaler = StandardScaler()
X = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)

NUM_CLASSES = 3

def one_hot(y, K=3):
    oh = np.zeros((len(y), K), dtype=float)
    oh[np.arange(len(y)), y.astype(int)] = 1.0
    return oh

Y_train = one_hot(y_train, NUM_CLASSES)
Y_test  = one_hot(y_test,  NUM_CLASSES)

# -----------------------------------------
# 2) Model: amp embed once + angle reupload
# -----------------------------------------
n_qubits = 2
L = 3  # number of data-reupload + variational blocks

def l2_normalize_4(v):
    v = np.asarray(v, dtype=float).reshape(-1)
    assert v.shape[0] == 4, "This demo expects 4 features."
    n = np.linalg.norm(v)
    return v / (n if n > 0 else 1.0)

# Observables → 7D feature vector
OBSERVABLES = [
    SparsePauliOp.from_list([("ZI", 1.0)]),
    SparsePauliOp.from_list([("IZ", 1.0)]),
    SparsePauliOp.from_list([("ZZ", 1.0)]),
    SparsePauliOp.from_list([("XX", 1.0)]),
    SparsePauliOp.from_list([("YY", 1.0)]),
    SparsePauliOp.from_list([("XZ", 1.0)]),
    SparsePauliOp.from_list([("ZX", 1.0)]),
]
NUM_OBS = len(OBSERVABLES)

# Parameter layout:
# [ theta_q (4*L) | Wr (4*4) | Wc (NUM_CLASSES * NUM_OBS) | bc (NUM_CLASSES) ]
N_Q = 4 * L
N_R = 4 * 4
N_W = NUM_CLASSES * NUM_OBS
N_B = NUM_CLASSES
TOTAL_PARAMS = N_Q + N_R + N_W + N_B

def split_theta(theta):
    theta = np.asarray(theta, dtype=float)
    assert theta.shape[0] == TOTAL_PARAMS

    q = theta[:N_Q]
    r = theta[N_Q:N_Q+N_R].reshape(4, 4)
    w = theta[N_Q+N_R:N_Q+N_R+N_W].reshape(NUM_CLASSES, NUM_OBS)
    b = theta[N_Q+N_R+N_W:]
    return q, r, w, b

def make_amp_angle_circuit(x_raw, theta_q, Wr):
    """
    x_raw: (4,) standardized features (not normalized)
    theta_q: (N_Q,) quantum variational params
    Wr: (4,4) reupload weight matrix
    """
    x_raw = np.asarray(x_raw, dtype=float).reshape(4)
    x_norm = l2_normalize_4(x_raw)

    qc = QuantumCircuit(n_qubits)

    # Amplitude embedding ONCE
    qc.prepare_state(x_norm, range(n_qubits))

    # Apply L blocks of: data reupload via angles + variational block
    idx = 0
    for _ in range(L):
        # ---- data reupload via learnable linear combos ----
        # alpha = Wr @ x_raw  (shape (4,))
        alpha = Wr @ x_raw        # 4 reupload angles

        # qubit 0: RY, RZ; qubit 1: RY, RZ
        qc.ry(float(alpha[0]), 0)
        qc.rz(float(alpha[1]), 0)
        qc.ry(float(alpha[2]), 1)
        qc.rz(float(alpha[3]), 1)

        # ---- variational block with 4 params ----
        p0, p1, p2, p3 = theta_q[idx:idx+4]
        idx += 4

        qc.ry(float(p0), 0)
        qc.rz(float(p1), 0)
        qc.ry(float(p2), 1)
        qc.rz(float(p3), 1)

        # Entangling gate
        qc.cx(0, 1)

    return qc

# -----------------------
# 3) Loss & helpers
# -----------------------
def softmax(logits):
    z = logits - np.max(logits, axis=1, keepdims=True)
    e = np.exp(z)
    return e / np.sum(e, axis=1, keepdims=True)

def cross_entropy(probs, targets, eps=1e-9):
    return -np.mean(np.sum(targets * np.log(probs + eps), axis=1))

def features_batch(estimator, Xb, theta):
    theta_q, Wr, Wc, bc = split_theta(theta)
    circuits = [make_amp_angle_circuit(x, theta_q, Wr) for x in Xb]

    feats = []
    for obs in OBSERVABLES:
        job = estimator.run(circuits=circuits, observables=[obs] * len(circuits))
        vals = np.asarray(job.result().values, dtype=float)  # (B,)
        feats.append(vals)
    F = np.stack(feats, axis=1)  # (B, NUM_OBS)
    return F, Wc, bc

def logits_batch(estimator, Xb, theta):
    F, Wc, bc = features_batch(estimator, Xb, theta)
    return F @ Wc.T + bc[np.newaxis, :]

def predict_proba(estimator, Xb, theta):
    return softmax(logits_batch(estimator, Xb, theta))

def batch_loss(estimator, Xb, Yb, theta):
    probs = predict_proba(estimator, Xb, theta)
    return cross_entropy(probs, Yb)

# -----------------------
# 4) SPSA optimizer
# -----------------------
class SPSA:
    def __init__(self, a=0.15, c=0.1, alpha=0.602, gamma=0.101, seed=1):
        self.a0, self.c0, self.alpha, self.gamma = a, c, alpha, gamma
        self.k = 0
        self.rng = np.random.default_rng(seed)

    def step(self, theta, loss_fn):
        k = self.k + 1
        ak = self.a0 / (k ** self.alpha)
        ck = self.c0 / (k ** self.gamma)
        delta = self.rng.choice([-1.0, 1.0], size=theta.shape)
        thetap = theta + ck * delta
        thetam = theta - ck * delta
        lp = loss_fn(thetap)
        lm = loss_fn(thetam)
        ghat = (lp - lm) / (2.0 * ck) * delta
        theta_new = theta - ak * ghat
        self.k = k
        return theta_new, float(lp), float(lm)

# -----------------------
# 5) Train
# -----------------------
estimator = EstimatorImpl()

rng = np.random.default_rng(0)
theta_init = 0.05 * rng.standard_normal(TOTAL_PARAMS)
theta = theta_init.copy()

opt = SPSA(a=0.15, c=0.1, alpha=0.602, gamma=0.101, seed=1)

epochs = 80
batch_size = 8
steps_per_batch = 1

def iterate_minibatches(Xd, Yd, bs, seed=0):
    rng = np.random.default_rng(seed)
    idx = rng.permutation(len(Xd))
    for start in range(0, len(Xd), bs):
        j = idx[start:start+bs]
        yield Xd[j], Yd[j]

best_loss, best_theta = float('inf'), theta.copy()

for ep in range(1, epochs + 1):
    for Xb, Yb in iterate_minibatches(X_train, Y_train, batch_size, seed=ep):
        loss_fn = lambda th: batch_loss(estimator, Xb, Yb, th)
        for _ in range(steps_per_batch):
            theta, _, _ = opt.step(theta, loss_fn)

    train_probs = predict_proba(estimator, X_train, theta)
    train_pred  = np.argmax(train_probs, axis=1)
    train_acc   = accuracy_score(y_train, train_pred)
    train_loss  = cross_entropy(train_probs, Y_train)

    if train_loss < best_loss:
        best_loss = train_loss
        best_theta = theta.copy()

    if ep == 1 or ep % 10 == 0:
        print(f"Epoch {ep:2d} | loss={train_loss:.4f} | acc={train_acc:.3f}")

theta = best_theta  # use best params

# -----------------------
# 6) Evaluate
# -----------------------
test_probs = predict_proba(estimator, X_test, theta)
test_pred  = np.argmax(test_probs, axis=1)
test_acc   = accuracy_score(y_test, test_pred)
print("\nTest accuracy:", test_acc)
print("\nClassification report:\n", classification_report(y_test, test_pred, digits=3))


Using AerEstimator (statevector).
Epoch  1 | loss=0.9195 | acc=0.732
Epoch 10 | loss=0.6682 | acc=0.804
Epoch 20 | loss=0.5852 | acc=0.804
Epoch 30 | loss=0.5572 | acc=0.821
Epoch 40 | loss=0.5307 | acc=0.812
Epoch 50 | loss=0.5108 | acc=0.848
Epoch 60 | loss=0.4949 | acc=0.848
Epoch 70 | loss=0.4922 | acc=0.839
Epoch 80 | loss=0.4770 | acc=0.839

Test accuracy: 0.9210526315789473

Classification report:
               precision    recall  f1-score   support

           0      1.000     1.000     1.000        13
           1      0.917     0.846     0.880        13
           2      0.846     0.917     0.880        12

    accuracy                          0.921        38
   macro avg      0.921     0.921     0.920        38
weighted avg      0.923     0.921     0.921        38



In [5]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report

from qiskit import QuantumCircuit
from qiskit.quantum_info import SparsePauliOp

# Prefer statevector simulator
try:
    from qiskit_aer.primitives import Estimator as AerEstimator
    EstimatorImpl = AerEstimator
    print("Using AerEstimator.")
except:
    from qiskit.primitives import Estimator
    EstimatorImpl = Estimator
    print("Using default Estimator.")


# -----------------------------
# 1) Load & preprocess Iris data
# -----------------------------
X, y = load_iris(return_X_y=True)
X = StandardScaler().fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)

def one_hot(y, K=3):
    oh = np.zeros((len(y), K))
    oh[np.arange(len(y)), y] = 1
    return oh

Y_train = one_hot(y_train, 3)
Y_test  = one_hot(y_test,  3)


# -----------------------------
# 2) Model config
# -----------------------------
n_qubits = 2
L = 3                                   # reupload depth
OBS = [
    SparsePauliOp.from_list([("ZI",1)]),
    SparsePauliOp.from_list([("IZ",1)]),
    SparsePauliOp.from_list([("ZZ",1)]),
    SparsePauliOp.from_list([("XX",1)]),
    SparsePauliOp.from_list([("YY",1)]),
    SparsePauliOp.from_list([("XZ",1)]),
    SparsePauliOp.from_list([("ZX",1)]),
]
NUM_OBS = len(OBS)
NUM_CLASSES = 3

# total parameters:
# Q = 4*L variational
# R = 4x4 reupload matrix
# Wc, bc = classical head
N_Q = 4*L
N_R = 16
N_W = NUM_CLASSES*NUM_OBS
N_B = NUM_CLASSES
TOTAL = N_Q + N_R + N_W + N_B


def split_params(theta):
    q = theta[:N_Q]
    r = theta[N_Q:N_Q+N_R].reshape(4,4)
    w = theta[N_Q+N_R:N_Q+N_R+N_W].reshape(NUM_CLASSES, NUM_OBS)
    b = theta[N_Q+N_R+N_W:]
    return q, r, w, b


def l2norm(x):
    n = np.linalg.norm(x)
    return x/n if n>0 else x


def make_circuit(x_raw, qparams, Wr):
    x_raw = np.asarray(x_raw)
    x_norm = l2norm(x_raw)

    qc = QuantumCircuit(n_qubits)

    # amplitude embed ONCE
    qc.prepare_state(x_norm, range(n_qubits))

    idx = 0
    for _ in range(L):
        # --- reupload ---
        alpha = Wr @ x_raw
        qc.ry(alpha[0], 0)
        qc.rz(alpha[1], 0)
        qc.ry(alpha[2], 1)
        qc.rz(alpha[3], 1)

        # --- variational ---
        p0, p1, p2, p3 = qparams[idx:idx+4]
        idx += 4

        qc.ry(p0, 0)
        qc.rz(p1, 0)
        qc.ry(p2, 1)
        qc.rz(p3, 1)
        qc.cx(0,1)

    return qc


# -----------------------------
# 3) Forward pass
# -----------------------------
estimator = EstimatorImpl()

def features_batch(Xb, theta):
    qparams, Wr, Wc, bc = split_params(theta)
    circuits = [make_circuit(x, qparams, Wr) for x in Xb]

    feats = []
    for obs in OBS:
        job = estimator.run(circuits=circuits, observables=[obs]*len(circuits))
        vals = np.asarray(job.result().values)
        feats.append(vals)
    return np.stack(feats, axis=1), Wc, bc


def softmax(z):
    z = z - np.max(z,axis=1,keepdims=True)
    e = np.exp(z)
    return e/np.sum(e,axis=1,keepdims=True)


def predict_prob(Xb, theta):
    F, Wc, bc = features_batch(Xb, theta)
    logits = F @ Wc.T + bc
    return softmax(logits)


def loss_fn(Xb, Yb, theta):
    p = predict_prob(Xb, theta)
    return -np.mean(np.sum(Yb*np.log(p+1e-9),axis=1))


# -----------------------------
# 4) ADAM optimizer!!
# -----------------------------
def adam_train(theta, X, Y, epochs=60, batch_size=12, lr=0.015):
    m = np.zeros_like(theta)
    v = np.zeros_like(theta)
    b1, b2, eps = 0.9, 0.999, 1e-8

    def grad_est(theta, Xb, Yb, h=1e-3):
        g = np.zeros_like(theta)
        for i in range(len(theta)):
            tp = theta.copy(); tp[i]+=h
            tm = theta.copy(); tm[i]-=h
            g[i] = (loss_fn(Xb, Yb, tp) - loss_fn(Xb, Yb, tm))/(2*h)
        return g

    for ep in range(1, epochs+1):
        idx = np.random.permutation(len(X))
        for k in range(0, len(X), batch_size):
            j = idx[k:k+batch_size]
            Xb, Yb = X[j], Y[j]

            g = grad_est(theta, Xb, Yb)

            m = b1*m + (1-b1)*g
            v = b2*v + (1-b2)*(g*g)

            mhat = m/(1-b1**ep)
            vhat = v/(1-b2**ep)

            theta -= lr*mhat/(np.sqrt(vhat)+eps)

        if ep % 10 == 0:
            train_p = predict_prob(X, theta)
            acc = accuracy_score(y_train, np.argmax(train_p,axis=1))
            L = loss_fn(X, Y, theta)
            print(f"Epoch {ep:2d} | Loss: {L:.4f} | Acc: {acc:.3f}")

    return theta


# -----------------------------
# 5) Train
# -----------------------------
rng = np.random.default_rng(0)
theta0 = 0.05 * rng.standard_normal(TOTAL)

theta_opt = adam_train(theta0, X_train, Y_train, epochs=60, batch_size=12)


# -----------------------------
# 6) Evaluate
# -----------------------------
test_probs = predict_prob(X_test, theta_opt)
test_pred  = np.argmax(test_probs, axis=1)

print("\nTest accuracy:", accuracy_score(y_test, test_pred))
print("\nClassification report:\n", classification_report(y_test, test_pred, digits=3))


Using AerEstimator.


KeyboardInterrupt: 

In [None]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report

from qiskit import QuantumCircuit
from qiskit.quantum_info import SparsePauliOp

try:
    from qiskit_aer.primitives import Estimator as AerEstimator
    EstimatorImpl = AerEstimator
except:
    from qiskit.primitives import Estimator
    EstimatorImpl = Estimator


# -----------------------------
# Data
# -----------------------------
X, y = load_iris(return_X_y=True)
X = StandardScaler().fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)

NUM_CLASSES = 3

def one_hot(y):
    oh = np.zeros((len(y),NUM_CLASSES))
    oh[np.arange(len(y)),y] = 1
    return oh

Y_train = one_hot(y_train)
Y_test  = one_hot(y_test)


# -----------------------------
# Model config
# -----------------------------
n_qubits = 2
L = 2                                    # amplitude reupload depth
OBS = [
    SparsePauliOp.from_list([("ZI",1)]),
    SparsePauliOp.from_list([("IZ",1)]),
    SparsePauliOp.from_list([("ZZ",1)]),
    SparsePauliOp.from_list([("XX",1)]),
    SparsePauliOp.from_list([("YY",1)]),
    SparsePauliOp.from_list([("XZ",1)]),
    SparsePauliOp.from_list([("ZX",1)])
]
NUM_OBS = len(OBS)


N_Q = 4*L
N_W = NUM_CLASSES*NUM_OBS
N_B = NUM_CLASSES
TOTAL = N_Q + N_W + N_B


def split(theta):
    q = theta[:N_Q]
    w = theta[N_Q:N_Q+N_W].reshape(NUM_CLASSES, NUM_OBS)
    b = theta[N_Q+N_W:]
    return q, w, b


def l2norm(x):
    n = np.linalg.norm(x)
    return x/n if n>0 else x


def make_amp_reupload(xvec, qparams):
    xnorm = l2norm(xvec)
    qc = QuantumCircuit(n_qubits)
    idx = 0

    for _ in range(L):
        qc.prepare_state(xnorm, range(n_qubits))

        p0,p1,p2,p3 = qparams[idx:idx+4]
        idx += 4

        qc.ry(p0,0); qc.rz(p1,0)
        qc.ry(p2,1); qc.rz(p3,1)
        qc.cx(0,1)

    return qc


# -----------------------------
# Forward pass
# -----------------------------
estimator = EstimatorImpl()

def features_batch(Xb, theta):
    qparams, Wc, bc = split(theta)
    circuits = [make_amp_reupload(x, qparams) for x in Xb]

    feats = []
    for obs in OBS:
        job = estimator.run(circuits=circuits, observables=[obs]*len(circuits))
        vals = np.asarray(job.result().values)
        feats.append(vals)
    return np.stack(feats,axis=1), Wc, bc


def softmax(z):
    z = z - np.max(z,axis=1,keepdims=True)
    e = np.exp(z)
    return e/np.sum(e,axis=1,keepdims=True)


def predict_prob(Xb, theta):
    F, Wc, bc = features_batch(Xb, theta)
    logits = F @ Wc.T + bc
    return softmax(logits)


def loss_fn(Xb, Yb, theta):
    p = predict_prob(Xb, theta)
    return -np.mean(np.sum(Yb*np.log(p+1e-9),axis=1))


# -----------------------------
# Adam optimizer
# -----------------------------
def adam(theta, X, Y, epochs=60, batch=12, lr=0.02):
    m = np.zeros_like(theta)
    v = np.zeros_like(theta)
    b1, b2, eps = 0.9, 0.999, 1e-8

    def grad(theta, Xb, Yb, h=1e-3):
        g = np.zeros_like(theta)
        for i in range(len(theta)):
            tp = theta.copy(); tp[i]+=h
            tm = theta.copy(); tm[i]-=h
            g[i] = (loss_fn(Xb,Yb,tp) - loss_fn(Xb,Yb,tm))/(2*h)
        return g

    for ep in range(1,epochs+1):
        idx = np.random.permutation(len(X))
        for k in range(0,len(X),batch):
            j = idx[k:k+batch]
            Xb,Yb = X[j],Y[j]

            g = grad(theta, Xb, Yb)

            m = b1*m + (1-b1)*g
            v = b2*v + (1-b2)*(g*g)

            mhat = m/(1-b1**ep)
            vhat = v/(1-b2**ep)

            theta -= lr*mhat/(np.sqrt(vhat)+eps)

        if ep%10==0:
            L = loss_fn(X,Y,theta)
            acc = accuracy_score(y_train, np.argmax(predict_prob(X,theta),axis=1))
            print(f"[B2] Epoch {ep} | Loss={L:.4f} | Acc={acc:.3f}")

    return theta


# -----------------------------
# Train
# -----------------------------
rng = np.random.default_rng(0)
theta0 = 0.05*rng.standard_normal(TOTAL)
theta_opt = adam(theta0, X_train, Y_train, epochs=60, batch=12)


# -----------------------------
# Evaluate
# -----------------------------
test_probs = predict_prob(X_test, theta_opt)
test_pred  = np.argmax(test_probs,axis=1)

print("\n[B2] Test accuracy:", accuracy_score(y_test,test_pred))
print("\n[B2] Classification report:\n", classification_report(y_test,test_pred,digits=3))
