In [16]:
# ==== QCNN sweep: self-contained (no external defs needed) ====
import json, time
import numpy as np
import matplotlib.pyplot as plt

from qiskit import QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import ZFeatureMap
from qiskit.quantum_info import SparsePauliOp
from qiskit.primitives import StatevectorEstimator as Estimator

from qiskit_machine_learning.utils import algorithm_globals
from qiskit_machine_learning.neural_networks import EstimatorQNN
from qiskit_machine_learning.algorithms.classifiers import NeuralNetworkClassifier
from qiskit_machine_learning.optimizers import COBYLA

from sklearn.model_selection import train_test_split

# -------------------------
# Building blocks (conv/pool)
# -------------------------
def conv_circuit(params):
    target = QuantumCircuit(2)
    target.rz(-np.pi / 2, 1)
    target.cx(1, 0)
    target.rz(params[0], 0)
    target.ry(params[1], 1)
    target.cx(0, 1)
    target.ry(params[2], 1)
    target.cx(1, 0)
    target.rz(np.pi / 2, 0)
    return target

def conv_layer(num_qubits, param_prefix):
    qc = QuantumCircuit(num_qubits, name="Convolutional Layer")
    qubits = list(range(num_qubits))
    param_index = 0
    params = ParameterVector(param_prefix, length=num_qubits * 3)

    for q1, q2 in zip(qubits[0::2], qubits[1::2]):
        qc = qc.compose(conv_circuit(params[param_index : param_index + 3]), [q1, q2])
        qc.barrier()
        param_index += 3

    for q1, q2 in zip(qubits[1::2], qubits[2::2] + [0]):
        qc = qc.compose(conv_circuit(params[param_index : param_index + 3]), [q1, q2])
        qc.barrier()
        param_index += 3

    qc_inst = qc.to_instruction()
    qc2 = QuantumCircuit(num_qubits)
    qc2.append(qc_inst, qubits)
    return qc2

def pool_circuit(params):
    target = QuantumCircuit(2)
    target.rz(-np.pi / 2, 1)
    target.cx(1, 0)
    target.rz(params[0], 0)
    target.ry(params[1], 1)
    target.cx(0, 1)
    target.ry(params[2], 1)
    return target

def pool_layer(sources, sinks, param_prefix):
    num_qubits = len(sources) + len(sinks)
    qc = QuantumCircuit(num_qubits, name="Pooling Layer")
    param_index = 0
    params = ParameterVector(param_prefix, length=(num_qubits // 2) * 3)

    for source, sink in zip(sources, sinks):
        qc = qc.compose(pool_circuit(params[param_index : param_index + 3]), [source, sink])
        qc.barrier()
        param_index += 3

    qc_inst = qc.to_instruction()
    qc2 = QuantumCircuit(num_qubits)
    qc2.append(qc_inst, range(num_qubits))
    return qc2

# -------------------------
# Data generation (same as your earlier function)
# -------------------------
def generate_dataset(num_images):
    images, labels = [], []
    hor_array = np.zeros((6, 8))
    ver_array = np.zeros((4, 8))

    j = 0
    for i in range(0, 7):
        if i != 3:
            hor_array[j][i] = np.pi / 2
            hor_array[j][i + 1] = np.pi / 2
            j += 1

    j = 0
    for i in range(0, 4):
        ver_array[j][i] = np.pi / 2
        ver_array[j][i + 4] = np.pi / 2
        j += 1

    for _ in range(num_images):
        rng = algorithm_globals.random.integers(0, 2)
        if rng == 0:
            labels.append(-1)
            random_image = algorithm_globals.random.integers(0, 6)
            images.append(np.array(hor_array[random_image]))
        else:
            labels.append(1)
            random_image = algorithm_globals.random.integers(0, 4)
            images.append(np.array(ver_array[random_image]))
        # add noise
        for k in range(8):
            if images[-1][k] == 0:
                images[-1][k] = algorithm_globals.random.uniform(0, np.pi / 4)
    return images, labels

# -------------------------
# Build the QCNN (measures qubit 7)
# -------------------------
def build_qcnn(estimator):
    feature_map = ZFeatureMap(8)

    ansatz = QuantumCircuit(8, name="Ansatz")
    ansatz.compose(conv_layer(8, "c1"), range(8), inplace=True)
    ansatz.compose(pool_layer([0,1,2,3], [4,5,6,7], "p1"), range(8), inplace=True)
    ansatz.compose(conv_layer(4, "c2"), range(4,8), inplace=True)
    ansatz.compose(pool_layer([0,1], [2,3], "p2"), range(4,8), inplace=True)
    ansatz.compose(conv_layer(2, "c3"), range(6,8), inplace=True)
    ansatz.compose(pool_layer([0], [1], "p3"), range(6,8), inplace=True)

    circuit = QuantumCircuit(8)
    circuit.compose(feature_map, range(8), inplace=True)
    circuit.compose(ansatz, range(8), inplace=True)

    observable = SparsePauliOp.from_list([("I"*7 + "Z", 1)])  # readout on qubit 7

    qnn = EstimatorQNN(
        circuit=circuit.decompose(),
        observables=observable,
        input_params=feature_map.parameters,
        weight_params=ansatz.parameters,
        estimator=estimator,
    )
    return qnn

# -------------------------
# One trial
# -------------------------
def run_trial(seed, num_images=200, maxiter=200, estimator=None, dataset=None,
              split_random_state=None, optimizer_kind="COBYLA"):
    algorithm_globals.random_seed = int(seed)
    rng = np.random.default_rng(int(seed))

    # dataset (fresh or fixed)
    if dataset is None:
        images, labels = generate_dataset(num_images)
    else:
        images, labels = dataset

    rs = split_random_state if split_random_state is not None else int(seed)
    x_tr, x_te, y_tr, y_te = train_test_split(
        images, labels, test_size=0.30, random_state=rs, stratify=labels
    )

    qnn = build_qcnn(estimator or Estimator())
    init = rng.uniform(-np.pi, np.pi, qnn.num_weights)

    # pick optimizer
    if optimizer_kind == "COBYLA":
        optimizer = COBYLA(maxiter=maxiter)
    else:
        try:
            from qiskit_algorithms.optimizers import SPSA, L_BFGS_B
        except Exception:
            from qiskit.algorithms.optimizers import SPSA, L_BFGS_B
        optimizer = {"SPSA": SPSA(maxiter=maxiter),
                     "L_BFGS_B": L_BFGS_B(maxiter=maxiter)}.get(optimizer_kind, COBYLA(maxiter=maxiter))

    clf = NeuralNetworkClassifier(qnn, optimizer=optimizer, initial_point=init, callback=None)

    xtr, ytr = np.asarray(x_tr), np.asarray(y_tr)
    clf.fit(xtr, ytr)

    train_acc = float(clf.score(xtr, ytr))
    xte, yte = np.asarray(x_te), np.asarray(y_te)
    test_acc  = float(clf.score(xte, yte))

    return {"seed": int(seed), "train_acc": train_acc, "test_acc": test_acc}

# -------------------------
# Many trials + histogram
# -------------------------
def run_many(n_trials=10, num_images=200, maxiter=200, vary_dataset=True, vary_split=True,
             optimizer_kind="COBYLA", save_csv_path=None, show_hist=True):
    import time, csv
    from qiskit.primitives import StatevectorEstimator as Estimator

    est = Estimator()
    results = []

    fixed_dataset = None
    if not vary_dataset:
        fixed_dataset = generate_dataset(num_images)

    t0 = time.time()
    for i in range(n_trials):
        seed = 10000 + i
        dataset = None if vary_dataset else fixed_dataset
        split_rs = (seed if vary_split else 246)
        res = run_trial(seed, num_images, maxiter, est, dataset, split_rs, optimizer_kind)
        results.append(res)
        print(f"[{i+1}/{n_trials}] seed={seed} train={res['train_acc']:.3f} test={res['test_acc']:.3f}")

    test_accs = np.array([r["test_acc"] for r in results])
    train_accs = np.array([r["train_acc"] for r in results])

    print(f"\nTrials: {n_trials} | elapsed: {time.time()-t0:.1f}s")
    print(f"Test acc  mean={test_accs.mean():.3f}  std={test_accs.std(ddof=1):.3f}  "
          f"min={test_accs.min():.3f}  max={test_accs.max():.3f}")
    print(f"Train acc mean={train_accs.mean():.3f}  std={train_accs.std(ddof=1):.3f}  "
          f"min={train_accs.min():.3f}  max={train_accs.max():.3f}")

    if show_hist:
        plt.figure(figsize=(6,4))
        plt.hist(test_accs, bins=10)
        plt.xlabel("Test Accuracy")
        plt.ylabel("Count")
        plt.title(f"Distribution of Test Accuracy (n={n_trials})")
        plt.show()

    # --- CSV save instead of JSON ---
    if save_csv_path:
        fieldnames = ["trial", "seed", "train_acc", "test_acc",
                      "num_images", "maxiter", "vary_dataset", "vary_split", "optimizer"]
        with open(save_csv_path, "w", newline="") as f:
            w = csv.DictWriter(f, fieldnames=fieldnames)
            w.writeheader()
            for i, r in enumerate(results, start=1):
                w.writerow({
                    "trial": i,
                    "seed": r["seed"],
                    "train_acc": float(r["train_acc"]),
                    "test_acc": float(r["test_acc"]),
                    "num_images": int(num_images),
                    "maxiter": int(maxiter),
                    "vary_dataset": bool(vary_dataset),
                    "vary_split": bool(vary_split),
                    "optimizer": str(optimizer_kind),
                })
        print(f"Saved results to {save_csv_path}")

    return results


In [17]:
results = run_many(n_trials=50, num_images=50, maxiter=200, vary_dataset=True, vary_split=True, save_csv_path="bc_hi.csv")


No gradient function provided, creating a gradient function. If your Estimator requires transpilation, please provide a pass manager.


KeyboardInterrupt: 

In [None]:
res_cobyla = run_many(20, 200, 200, True, True, optimizer_kind="COBYLA", show_hist=False)
res_spsa   = run_many(20, 200, 200, True, True, optimizer_kind="SPSA", show_hist=False)


In [None]:
_ = run_many(30, 200, 200, True, True, save_path="qcnn_sweep_results.json")


In [None]:
#Do 3 more test with different trail, images, maxiter