In [7]:
import numpy as np
import pickle
from datetime import datetime
import os
from qiskit import QuantumCircuit
from mqss.qiskit_adapter import MQSSQiskitAdapter
from qiskit import compiler
from qiskit.circuit import Parameter, ParameterVector
from qiskit_algorithms.optimizers import SPSA
from qiskit_algorithms.utils import algorithm_globals
# NOTE(review): this flag is not referenced in any of the visible cells —
# confirm it is still needed.
_USE_MINIMIZE = True

# Change the working directory to the parent folder, presumably so that the
# local `utils` module imported below can be found — TODO confirm.
# NOTE(review): this is not idempotent: re-running the cell moves up one more
# level, and relative paths used later resolve against the new directory.
%cd ..
from utils import scaled_data

C:\Ausbildung\Uni\BA_code


In [8]:
# Read the MQSS API token from the MQSS_TOKEN environment variable so the
# secret never has to be pasted into (and saved with) the notebook. When the
# variable is unset this falls back to the empty string that was hard-coded
# here before, so existing behaviour is unchanged.
adapter = MQSSQiskitAdapter(token=os.environ.get("MQSS_TOKEN", ""))

# List the backends that are currently online.
all_online_backends = adapter.backends(online=True)
print("Available backends:", all_online_backends)

Available backends: [<mqss.qiskit_adapter.backend.MQSSQiskitBackend object at 0x0000024D6CB91820>, <mqss.qiskit_adapter.backend.MQSSQiskitBackend object at 0x0000024D6D18AFC0>, <mqss.qiskit_adapter.backend.MQSSQiskitBackend object at 0x0000024D6D35B7A0>]


In [9]:
# Select the backend named "QExa20"; every later cell submits its circuits to
# this `backend` object.
backend = adapter.get_backend("QExa20")
# Show how many jobs the backend reports as pending before submitting work.
print("Number of pending jobs:", backend.num_pending_jobs)

Number of pending jobs: 0


In [10]:
def basic_rz_crx_ry(n_qubits, layers_per_w, scaling=1):
    """Build the circuit W(theta) -> RX data encoding -> W(theta) -> measure.

    Each trainable block ``W`` repeats ``layers_per_w`` times:
    an RZ on every qubit, a ring of CRX gates (qubit ``q`` controls qubit
    ``(q + 1) % n_qubits``, including last -> first), and an RY on every qubit.
    Between the two blocks, qubit ``w`` receives ``RX(scaling**w * x)``.
    Finally every qubit is measured into the classical bit of the same index.

    Args:
        n_qubits: Number of qubits and classical bits. Must be at least 2,
            because the CRX ring needs distinct control and target qubits.
        layers_per_w: Number of RZ/CRX/RY layers inside each ``W`` block.
        scaling: Base of the per-qubit factor multiplying ``x`` in the encoding.

    Returns:
        ``(qc, theta, x)``: the circuit, the ParameterVector holding the
        ``2 * layers_per_w * 3 * n_qubits`` trainable angles (in the order they
        are consumed by the gates), and the data Parameter ``x``.

    Raises:
        ValueError: If ``n_qubits`` is smaller than 2.
    """
    if n_qubits < 2:
        # With a single qubit the ring would ask for a CRX whose control and
        # target coincide; fail early with a clear message instead.
        raise ValueError(
            "n_qubits must be >= 2: the CRX ring needs distinct control and target qubits"
        )

    qc = QuantumCircuit(n_qubits, n_qubits)
    x = Parameter("x")

    # params per W layer: n RZ + n CRX + n RY = 3n
    # per W block: layers_per_w * 3n
    # full circuit has 2 W blocks => 2 * layers_per_w * 3n
    n_params = 2 * layers_per_w * 3 * n_qubits
    # The vector name was a mis-encoded "θ" (UTF-8 bytes read as Latin-1).
    theta = ParameterVector("θ", n_params)

    t = 0  # index of the next unused entry of `theta`

    def W():
        """Append one trainable block, consuming entries of `theta` in order."""
        nonlocal t
        for _ in range(layers_per_w):
            # RZ on all qubits
            for q in range(n_qubits):
                qc.rz(theta[t], q)
                t += 1

            # CRX ring (q -> q+1 mod n), includes last -> first
            for q in range(n_qubits):
                qc.crx(theta[t], q, (q + 1) % n_qubits)
                t += 1

            # RY on all qubits
            for q in range(n_qubits):
                qc.ry(theta[t], q)
                t += 1

    W()
    # Data encoding: qubit w sees the input scaled by scaling**w.
    for w in range(n_qubits):
        qc.rx((scaling ** w) * x, w)
    W()
    qc.measure(range(n_qubits), range(n_qubits))

    # Sanity check: every trainable angle was used exactly once.
    assert t == n_params
    return qc, theta, x

In [11]:
def expval_pauliZ_from_counts(counts: dict[str, int], measured_clbit: int = 0) -> float:
    """
    <Z> from Qiskit counts for one classical bit.

    By default, Qiskit bitstrings have classical bit 0 on the RIGHT. Keys that
    come from circuits with several classical registers separate the registers
    with spaces (e.g. "01 10"); those separators are removed before indexing so
    that ``measured_clbit`` always counts actual bits.

    Args:
        counts: Mapping from measured bitstring to number of occurrences.
        measured_clbit: Index of the classical bit to evaluate, counted from
            the right end of the bitstring.

    Returns:
        The average of +1 (bit 0) / -1 (bit 1) weighted by the counts, or 0.0
        when ``counts`` holds no shots.

    Raises:
        IndexError: If ``measured_clbit`` is outside the bitstring.
        ValueError: If the selected character is not a digit.
    """
    shots = sum(counts.values())
    if shots == 0:
        return 0.0

    z = 0.0
    for bitstr, c in counts.items():
        bits = bitstr.replace(" ", "")  # drop register separators
        b = int(bits[-1 - measured_clbit])  # clbit index from right
        z += c * (1.0 if b == 0 else -1.0)
    return z / shots

In [12]:
def make_spsa_schedules(maxiter, a=0.02, c=0.15, A=50.0, alpha=0.602, gamma=0.101):
    """Return the SPSA gain sequences as two arrays of length ``maxiter``.

    For iteration index k = 1, ..., maxiter:
        learning_rate[k - 1] = a / (k + A) ** alpha
        perturbation[k - 1]  = c / k ** gamma

    Returns:
        ``(learning_rate, perturbation)`` as float NumPy arrays.
    """
    steps = np.arange(1, maxiter + 1, dtype=float)
    step_decay = (steps + A) ** alpha
    perturbation_decay = steps ** gamma
    return a / step_decay, c / perturbation_decay

In [13]:
def build_objective_full_data(
    backend,
    qc_template,
    theta_params,
    x_param,
    x_train,
    y_train,
    shots,
):
    """Create the full-batch loss function used as the optimizer objective.

    The returned ``objective(theta_vec)`` binds ``theta_vec`` and each training
    input to ``qc_template``, submits all resulting circuits to ``backend`` in
    one ``run`` call, turns the counts of classical bit 0 into a <Z> prediction
    per data point and returns ``0.5 * mean((y_train - preds) ** 2)``.

    Args:
        backend: Object providing ``run(circuits, shots=...)`` whose job result
            offers ``get_counts(i)``.
        qc_template: Parameterized circuit providing ``assign_parameters``.
        theta_params: Iterable of the trainable circuit parameters, in the
            order matching the entries of ``theta_vec``.
        x_param: Circuit parameter that receives the data value.
        x_train: Training inputs; one circuit is built per entry.
        y_train: Training targets compared against the predictions.
        shots: Number of shots per circuit.

    Returns:
        A callable mapping a parameter vector to the scalar loss (float).
    """
    x_train = np.asarray(x_train, dtype=float)
    y_train = np.asarray(y_train, dtype=float)

    theta_list = list(theta_params)

    def objective(theta_vec):
        """Loss of ``theta_vec`` over the whole training set.

        Raises:
            ValueError: If ``theta_vec`` does not hold exactly one value per
                trainable parameter.
        """
        theta_vec = np.asarray(theta_vec, dtype=float)
        if theta_vec.shape != (len(theta_list),):
            # Fail before submitting anything to the backend: a silent
            # truncation would pair angles with the wrong parameters.
            raise ValueError(
                f"expected {len(theta_list)} parameter values, got shape {theta_vec.shape}"
            )

        # The theta binding is the same for every data point, so build it once
        # instead of once per sample.
        theta_bind = {p: float(v) for p, v in zip(theta_list, theta_vec)}

        # build one circuit per data point
        circuits = [
            qc_template.assign_parameters({**theta_bind, x_param: float(x)}, inplace=False)
            for x in x_train
        ]

        job = backend.run(circuits, shots=shots)
        res = job.result()

        preds = np.array(
            [
                expval_pauliZ_from_counts(res.get_counts(i), measured_clbit=0)
                for i in range(len(circuits))
            ],
            dtype=float,
        )

        return 0.5 * float(np.mean((y_train - preds) ** 2))

    return objective

In [14]:
def eval_loss_full_data(backend, qc_template, theta_params, x_param, theta_vec, x_data, y_data, shots=500):
    """Compute 0.5 * mean((y - pred)^2) over the full dataset.

    One circuit is built per entry of ``x_data`` by binding ``theta_vec`` and
    the data value to ``qc_template``; all circuits are submitted to ``backend``
    in a single ``run`` call and the prediction for each point is the <Z>
    estimate of classical bit 0.

    Args:
        backend: Object providing ``run(circuits, shots=...)`` whose job result
            offers ``get_counts(i)``.
        qc_template: Parameterized circuit providing ``assign_parameters``.
        theta_params: Iterable of the trainable circuit parameters, in the
            order matching the entries of ``theta_vec``.
        x_param: Circuit parameter that receives the data value.
        theta_vec: Values for the trainable parameters.
        x_data: Inputs to evaluate.
        y_data: Targets compared against the predictions.
        shots: Number of shots per circuit.

    Returns:
        The scalar loss as a float.

    Raises:
        ValueError: If ``theta_vec`` does not hold exactly one value per
            trainable parameter.
    """
    x_data = np.asarray(x_data, dtype=float)
    y_data = np.asarray(y_data, dtype=float)
    theta_vec = np.asarray(theta_vec, dtype=float)

    theta_list = list(theta_params)
    if theta_vec.shape != (len(theta_list),):
        # Fail before submitting anything to the backend: a silent truncation
        # would pair angles with the wrong parameters.
        raise ValueError(
            f"expected {len(theta_list)} parameter values, got shape {theta_vec.shape}"
        )

    # The theta binding is identical for every data point, so build it once.
    theta_bind = {p: float(v) for p, v in zip(theta_list, theta_vec)}

    circuits = [
        qc_template.assign_parameters({**theta_bind, x_param: float(x)}, inplace=False)
        for x in x_data
    ]

    job = backend.run(circuits, shots=shots)
    res = job.result()

    preds = np.array(
        [
            expval_pauliZ_from_counts(res.get_counts(i), measured_clbit=0)
            for i in range(len(circuits))
        ],
        dtype=float,
    )

    return 0.5 * float(np.mean((y_data - preds) ** 2))

In [15]:
def save_checkpoint(path: str, data=None):
    """Pickle the run state to ``path`` via a temporary file.

    The object is first dumped to ``path + ".tmp"`` and only moved over ``path``
    with ``os.replace`` once the dump has finished, so a failed dump never
    overwrites an existing checkpoint. If anything goes wrong, the temporary
    file is removed again and the original exception is re-raised.

    Args:
        path: Destination file of the checkpoint.
        data: Object to pickle. Defaults to the module-level ``state`` dict,
            which is what callers passing only ``path`` get.
    """
    payload = state if data is None else data
    tmp = path + ".tmp"
    try:
        with open(tmp, "wb") as f:
            pickle.dump(payload, f)
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup of the partial temp file; never mask the
        # original error with a cleanup failure.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise

# Timestamp taken when this cell runs; it tags the run and the checkpoint file.
RUN_ID = datetime.now().strftime("%Y%m%d_%H%M%S")
# The callback stores parameters and writes a checkpoint every `saving_period` calls.
saving_period = 10

# Mutable run record shared by the callback and save_checkpoint.
state = {
    "theta_last": None,          # parameters passed to the most recent callback call (for resume)
    "loss_history": [],          # objective value passed to each callback call
    "param_checkpoints": [],     # list of (iter, theta), appended every `saving_period` calls
    "run_id": RUN_ID,
}
# Number of callback invocations so far; incremented inside spsa_callback.
_iter = 0
# Relative path, so the file is written to the current working directory.
# NOTE(review): there is no separator between "QExa20" and RUN_ID, so the name
# reads like "...QExa20<YYYYmmdd>_<HHMMSS>.pkl" — confirm this is intended.
pkl_path = f"basic_rz_crx_ry_fourier_QExa20{RUN_ID}.pkl"

def spsa_callback(nfev, params, value, stepsize, accepted):
    """Record optimizer progress in ``state`` and checkpoint it periodically.

    On every call the module-level counter ``_iter`` is incremented, the
    parameters and objective value are stored, and one progress line is
    printed. Every ``saving_period`` calls the parameters are appended to
    ``state["param_checkpoints"]`` and the whole state is written to
    ``pkl_path``.

    Stored parameter arrays are copies, so entries kept in ``state`` cannot be
    changed by later in-place modifications of the array handed in.

    The ``accepted`` flag is recorded as well:
    ``state["accepted_history"]`` gets one bool per call and
    ``state["theta_last_accepted"]`` holds the parameters of the latest call
    with ``accepted`` true. ``state["theta_last"]`` keeps its meaning of
    "parameters of the most recent call", whatever the flag says.
    NOTE(review): presumably, with ``blocking=True`` the optimizer also reports
    rejected candidates through this callback — confirm against the SPSA
    documentation before resuming from ``theta_last``.

    Args:
        nfev: Number of function evaluations reported by the optimizer (unused).
        params: Parameter vector reported by the optimizer.
        value: Objective value reported for ``params``.
        stepsize: Step size reported by the optimizer (unused).
        accepted: Whether the optimizer reports the step as accepted.
    """
    global _iter
    _iter += 1

    theta = np.array(params, dtype=float)  # private copy, see docstring
    state["theta_last"] = theta
    state["loss_history"].append(value)
    # setdefault keeps this working with a `state` dict that lacks the key.
    state.setdefault("accepted_history", []).append(bool(accepted))
    if accepted:
        state["theta_last_accepted"] = theta

    print(f"iter={_iter:5d}  loss={float(value):.8f}")

    if _iter % saving_period == 0:
        state["param_checkpoints"].append((_iter, theta))
        save_checkpoint(pkl_path)


In [None]:
# --- Data -------------------------------------------------------------------
n_points = 150
seed = 42
x_train, x_test, y_train, y_test = scaled_data(n_points=n_points, seed=seed)

# Make sure all four splits are float NumPy arrays
# (per the original note, scaled_data returns JAX arrays).
x_train = np.asarray(x_train, dtype=float)
x_test  = np.asarray(x_test, dtype=float)
y_train = np.asarray(y_train, dtype=float)
y_test  = np.asarray(y_test, dtype=float)

# --- Model ------------------------------------------------------------------
# 10 qubits with 3 layers per trainable block; `scaling` keeps its default of 1.
qc, theta_params, x_param = basic_rz_crx_ry(
    n_qubits=10,
    layers_per_w=3,
)

# --- Objective ----------------------------------------------------------------
# Every call of `objective` builds one circuit per training point and submits
# them all to `backend` in a single run call.
train_shots = 1000
objective = build_objective_full_data(
    backend=backend,
    qc_template=qc,
    theta_params=theta_params,
    x_param=x_param,
    x_train=x_train,
    y_train=y_train,
    shots=train_shots,
)

# --- Optimizer ----------------------------------------------------------------
iterations = 200

# One learning-rate and one perturbation value per iteration.
lr, pt = make_spsa_schedules(
    maxiter=iterations,
    a=0.02,   # step size scale
    c=0.15,   # perturbation scale
    A=50.0,
    alpha=0.602,
    gamma=0.101,
)

# Seed qiskit_algorithms' global RNG before the optimizer is created.
algorithm_globals.random_seed = seed

spsa = SPSA(
    maxiter=iterations,
    learning_rate=lr,
    perturbation=pt,
    blocking=True,
    last_avg=5,
)

# NOTE(review): `_iter` and `state` live at module level and are not reset
# here, so re-running this cell without re-running the cell that defines them
# continues the iteration counter and appends to the existing history.
spsa.callback = spsa_callback
# Initialization: small Gaussian values (std 0.01) from a separately seeded generator.
rng = np.random.default_rng(seed)
x0 = rng.normal(loc=0.0, scale=0.01, size=len(theta_params))

# --- Training -----------------------------------------------------------------
result = spsa.minimize(objective, x0=x0)
theta_opt = np.array(result.x, dtype=float)

# Store the optimizer's final parameters and write a last checkpoint.
state["theta_last"] = theta_opt
save_checkpoint(pkl_path)

print("Done.")
print("Final objective loss:", float(result.fun))

# --- Final evaluation -----------------------------------------------------------
# Both evaluations submit fresh jobs to the backend; the test loss is measured
# with `train_shots` shots as well.
final_train_loss = eval_loss_full_data(
    backend, qc, theta_params, x_param, theta_opt, x_train, y_train, shots=train_shots
)
final_test_loss = eval_loss_full_data(
    backend, qc, theta_params, x_param, theta_opt, x_test, y_test, shots=train_shots
)

print("Final train loss:", final_train_loss)
print("Final test  loss:", final_test_loss)