In [1]:
import os, json, time
import numpy as np

from scipy.optimize import minimize

from qiskit.quantum_info import SparsePauliOp

from qiskit_aer import AerSimulator
from qiskit_aer.primitives import EstimatorV2 as AerEstimator


from susy_qm import calculate_Hamiltonian, ansatze

import threading
from typing import List, Callable

In [2]:
class MultiSeedScheduler:
    """Coordinates multiple optimizers so they share a single batched quantum evaluation."""

    def __init__(self, num_seeds: int, batch_evaluator: Callable[[List[np.ndarray]], List[float]]):
        """
        num_seeds: number of optimizers / seeds (e.g. 10)
        batch_evaluator: function that takes [x_0, ..., x_{S-1}] and returns [E_0, ..., E_{S-1}]
        """
        self.num_seeds = num_seeds
        self.batch_evaluator = batch_evaluator

        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)

        self.params: List[np.ndarray] = [None] * num_seeds   # latest x_k
        self.pending: List[bool] = [False] * num_seeds       # has seed k requested in this round?
        self.energies: List[float] = [None] * num_seeds      # last energy result for seed k

        self.active: List[bool] = [True] * num_seeds         # which seeds are still running
        self.round_id: int = 0                               # increments each completed batch

    def _all_active_pending(self) -> bool:
        return all((not self.active[k]) or self.pending[k]
                   for k in range(self.num_seeds))

    def _run_batch(self):
        """Run one batched quantum evaluation for all active seeds."""
        # Build list of active seeds and their params
        active_indices = [k for k in range(self.num_seeds) if self.active[k]]
        if not active_indices:
            return

        params_list = [self.params[k] for k in active_indices]

        # Call the quantum backend (no need to release lock for Aer testing)
        energies_list = self.batch_evaluator(params_list)

        # Store results and reset pending flags
        for idx, E in zip(active_indices, energies_list):
            self.energies[idx] = float(E)
            self.pending[idx] = False

        self.round_id += 1

    def request_eval(self, k: int, x_k: np.ndarray) -> float:
        """
        Called by the cost function for seed k.
        Blocks until all active seeds have requested an evaluation for this round,
        then returns the energy for seed k.
        """
        with self.lock:
            if not self.active[k]:
                raise RuntimeError(f"Seed {k} is inactive")

            self.params[k] = np.asarray(x_k, dtype=float)
            self.pending[k] = True
            my_round = self.round_id

            # If all active seeds are pending, this thread becomes the leader
            if self._all_active_pending():
                self._run_batch()
                self.cond.notify_all()
            else:
                # Wait until the round advances (batch finished)
                while self.round_id == my_round:
                    self.cond.wait()

            return self.energies[k]

    def deactivate_seed(self, k: int):
        """Mark seed k as finished so it stops participating in future batches."""
        with self.lock:
            self.active[k] = False
            self.pending[k] = True   # treat as satisfied for this round
            if self._all_active_pending():
                self._run_batch()
                self.cond.notify_all()


In [3]:
def run_multi_vqe_aer(
    H,
    ansatz,
    run_info,
    num_seeds: int = 10,
    max_iter: int = 200,
    initial_tr_radius: float = 0.3,
    final_tr_radius: float = 1e-8,
    lam: float = 15.0,
    p: int = 2,
    eps: float = 0.0,
):
    """
    Run multiple VQE seeds in parallel on Aer using a MultiSeedScheduler + COBYQA.
    """

    backend = AerSimulator(method="statevector")
    observable = SparsePauliOp.from_operator(H)

    num_params = run_info["num_params"]
    num_qubits = run_info["num_qubits"]

    # Build Qiskit ansatz circuit once
    qc = ansatze.pl_to_qiskit(ansatz, num_qubits=num_qubits, reverse_bits=True)

    # Simple AerEstimator (no noise) for now
    estimator = AerEstimator(
        options={
            "backend_options": {
                "method": "automatic",
                "seed_simulator": 1234,
            },
            "run_options": {
                "shots": run_info["shots"],
            },
        }
    )

    # --- 3.1 Batched quantum evaluator (no multi-copy yet, just classical batching) ---
    def batch_evaluator(params_list: List[np.ndarray]) -> List[float]:
        """
        params_list: [theta^(0), ..., theta^(S-1)],
        returns [E_0, ..., E_{S-1}].
        """
        pubs = []
        for params in params_list:
            pubs.append((qc, [observable], list(params)))

        job = estimator.run(pubs=pubs)
        results = job.result()  # list-like of results

        energies = [res.data.evs[0] for res in results]
        return energies

    # --- 3.2 Scheduler + per-seed optimizer threads ---

    scheduler = MultiSeedScheduler(num_seeds=num_seeds, batch_evaluator=batch_evaluator)

    opt_results = [None] * num_seeds
    threads = []

    def run_single_seed(k: int, x0_k: np.ndarray):
        """Target function for each optimizer thread."""

        def cost_k(x):
            # Get raw energy from shared quantum evaluation
            energy = scheduler.request_eval(k, x)
            # Apply your penalty for small negatives
            neg = max(0.0, -(energy + eps))
            return energy + lam * (neg ** p)

        res = minimize(
            cost_k,
            x0_k,
            method="COBYQA",
            options={
                "maxiter": max_iter,
                "maxfev": 2 * max_iter,
                "initial_tr_radius": initial_tr_radius,
                "final_tr_radius": final_tr_radius,
                "scale": True,
                "disp": False,
            },
        )

        opt_results[k] = res
        scheduler.deactivate_seed(k)

    # Initialise random seeds and start all optimizer threads
    for k in range(num_seeds):
        # you can make these seeds reproducible if you want
        np.random.seed((os.getpid() * int(time.time()) + k) % 123456789)
        x0_k = np.random.random(size=num_params) * 2 * np.pi

        t = threading.Thread(target=run_single_seed, args=(k, x0_k))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    # Collect results into a nice dict
    all_energies = [res.fun for res in opt_results]
    all_params = [res.x.tolist() for res in opt_results]
    all_success = [bool(res.success) for res in opt_results]
    all_nfev = [int(res.nfev) for res in opt_results]

    summary = {
        "num_seeds": num_seeds,
        "energies": all_energies,
        "params": all_params,
        "success": all_success,
        "nfev": all_nfev,
        "min_energy": float(np.min(all_energies)),
        "median_energy": float(np.median(all_energies)),
    }

    return summary


In [None]:
if __name__ == "__main__":

    log_enabled = True

    backend_name = "Aer"  # for this test
    use_noise_model = 0
    shots = 4096
    optimization_level = 3
    resilience_level = 2

    potential = "QHO"
    cutoff = 2

    ansatze_type = "exact"

    if potential == "QHO":
        ansatz_name = f"CQAVQE_QHO_{ansatze_type}"
    elif (potential != "QHO") and (cutoff <= 64):
        ansatz_name = f"CQAVQE_{potential}{cutoff}_{ansatze_type}"
    else:
        ansatz_name = f"CQAVQE_{potential}16_{ansatze_type}"

    ansatz = ansatze.get(ansatz_name)

    max_iter = 200
    initial_tr_radius = 0.3
    final_tr_radius = 1e-8
    lam = 15
    p = 2

    if potential == "AHO":
        i = np.log2(cutoff)
        factor = 2 ** (((i - 1) * i) / 2)
        eps = 0.5 / factor
    else:
        eps = 0.0

    H = calculate_Hamiltonian(cutoff, potential)
    eigenvalues = np.sort(np.linalg.eigvals(H))[:4]
    num_qubits = int(1 + np.log2(cutoff))

    if ansatz_name == "real_amplitudes":
        num_params = 2 * num_qubits
    else:
        num_params = ansatz.n_params

    run_info = {
        "backend": backend_name,
        "use_noise_model": use_noise_model,
        "Potential": potential,
        "cutoff": cutoff,
        "num_qubits": num_qubits,
        "num_params": num_params,
        "shots": shots,
        "optimization_level": optimization_level,
        "resilience_level": resilience_level,
        "lam": lam,
        "p": p,
        "eps": eps,
        "ansatz_name": ansatz_name,
    }

    print("Running multi-seed VQE on Aer for testing...")
    summary = run_multi_vqe_aer(
        H,
        ansatz,
        run_info,
        num_seeds=10,
        max_iter=max_iter,
        initial_tr_radius=initial_tr_radius,
        final_tr_radius=final_tr_radius,
        lam=lam,
        p=p,
        eps=eps,
    )

    print("Multi-seed summary:")
    print(json.dumps(summary, indent=4))
    print("Done")


Running multi-seed VQE on Aer for testing...
