In [1]:
# labs_energy.py
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Tuple

In [2]:
def bits01_to_pm1(bits01: List[int]):   
    return [1 if b == 0 else -1 for b in bits01]
def labs_energy_pm1(s: List[int]) -> int:
    """LABS objective E(s)=sum_{k=1}^{N-1} C_k^2 with s_i in {+1,-1}."""
    n = len(s)
    e = 0
    for k in range(1, n):
        ck = 0
        for i in range(n - k):
            ck += s[i] * s[i + k]
        e += ck * ck
    return e

cong = [1,0,0,1]
print(labs_energy_pm1(bits01_to_pm1(cong)))

6


In [3]:
@dataclass(frozen=True)
class PauliTerm:
    word: str      # length N string of I/X/Y/Z
    coeff: float   # coefficient in H (we'll treat H as sum coeff * Z...Z)

def labs_ising_terms_Z(N: int) -> List[PauliTerm]:
    """
    Build Z-only Pauli terms whose classical energies reproduce LABS energy
    up to an additive constant / scaling.

    Implementation note:
    - The exact coefficient pattern depends on your chosen mapping convention.
    - For Phase 2 validation, the most important part is INTERNAL CONSISTENCY:
      (i) your classical energy function
      (ii) your Hamiltonian energy evaluation built from these terms
      match on all bitstrings for small N.
    """

    terms: List[PauliTerm] = []
    for k in range(1, N):
        pairs = [(i, i + k) for i in range(N - k)]
        # i<j over pairs
        for a in range(len(pairs)):
            for b in range(a + 1, len(pairs)):
                (i1, i2) = pairs[a]
                (j1, j2) = pairs[b]
                word = ["I"] * N
                for idx in (i1, i2, j1, j2):
                    word[idx] = "Z"
                terms.append(PauliTerm("".join(word), coeff=2.0))

    return terms

def ham_energy_from_terms_pm1(s_pm1: List[int], terms: List[PauliTerm]) -> float:
    """Evaluate sum coeff * product_{Z sites} s_i for Z-only terms."""
    n = len(s_pm1)
    e = 0.0
    for t in terms:
        assert len(t.word) == n
        prod = 1
        for i, p in enumerate(t.word):
            if p == "Z":
                prod *= s_pm1[i]
            elif p != "I":
                raise ValueError("This evaluator supports Z-only terms.")
        e += t.coeff * prod
    return e

In [5]:
from __future__ import annotations
from typing import List, Tuple, Dict
import math
import cudaq  # CUDA-Q Python
from labs_energy import PauliTerm, bits01_to_pm1, labs_energy_pm1


6


In [48]:
# qa_anneal.py
from __future__ import annotations
from typing import List, Tuple
import cudaq  # CUDA-Q Python
from labs_energy import PauliTerm, bits01_to_pm1, labs_energy_pm1


def linear_schedule(m: int, M: int) -> Tuple[float, float]:
    """Return (A, B) at step m in [0..M-1]."""
    s = (m + 1) / M
    A = 1.0 - s
    B = s
    return A, B


@cudaq.kernel
def qa_anneal_kernel(num_qubits: int,
                     terms_words: List[cudaq.pauli_word],
                     terms_coeffs: List[float],
                     total_time: float,
                     steps: int):

    q = cudaq.qvector(num_qubits)

    # Start in |+>^N
    for i in range(num_qubits):
        h(q[i])

    dt = total_time / steps

    # Trotterized annealing evolution
    for m in range(steps):
        # linear schedule
        s = (m + 1) / steps
        A = 1.0 - s
        B = s

        # Mixer: exp(-i dt A sum X_i) = Π_i Rx(2 dt A)
        theta_x = 2.0 * dt * A
        for i in range(num_qubits):
            rx(theta_x, q[i])

        # Problem: exp(-i dt B * Σ_j coeff_j * P_j)
        # CUDA-Q exp_pauli implements exp(i angle * P), so use angle = -dt * B * coeff
        for j in range(len(terms_words)):
            angle = -dt * B * terms_coeffs[j]
            exp_pauli(angle, q, terms_words[j])   # ✅ correct usage, no lists in kernel


def sample_anneal_best(N: int,
                       terms: List[PauliTerm],
                       shots: int = 2000,
                       total_time: float = 1.5,
                       steps: int = 60,
                       ) -> Tuple[str, int]:
    """
    Run QA-like sampling and return (best_bitstring, best_LABS_energy).
    """
    #cudaq.set_target(target)   # e.g. "qpp" (CPU) or "nvidia" (GPU)

    # Convert terms to CUDA-Q pauli_word objects, e.g. "IZZI...Z"
    words = [cudaq.pauli_word(t.word) for t in terms]
    coeffs = [float(t.coeff) for t in terms]

    counts = cudaq.sample(
        qa_anneal_kernel,
        N, words, coeffs, total_time, steps,
        shots_count=shots
    )

    best_e = None
    best_b = None
    for b, _c in counts.items():
        bits01 = [int(ch) for ch in b]
        s_pm1 = bits01_to_pm1(bits01)
        e = labs_energy_pm1(s_pm1)
        if best_e is None or e < best_e:
            best_e, best_b = e, b

    return best_b, int(best_e)


In [58]:
def anneal_population(N: int,
                      terms,
                      pop_size: int = 32,
                      shots: int = 5000,
                      total_time: float = 1.0,
                      steps: int = 30,
                      target: str = "qpp") -> List[Tuple[str, int]]:
    """
    Return a population list [(bitstring, labs_energy), ...] of size pop_size
    using ONE annealing run and selecting the best unique outcomes.
    """
    import cudaq
    #cudaq.set_target(target)

    words = [cudaq.pauli_word(t.word) for t in terms]
    coeffs = [float(t.coeff) for t in terms]

    counts = cudaq.sample(
        qa_anneal_kernel,
        N, words, coeffs, total_time, steps,
        shots_count=shots
    )

    scored = []
    for b, _c in counts.items():
        s_pm1 = bits01_to_pm1([int(ch) for ch in b])
        e = labs_energy_pm1(s_pm1)
        scored.append((b, int(e)))

    scored.sort(key=lambda x: x[1])  # low energy first

    # keep best UNIQUE
    pop = []
    seen = set()
    for b, e in scored:
        if b in seen:
            continue
        pop.append((b, e))
        seen.add(b)
        if len(pop) >= pop_size:
            break

    return pop

In [59]:
N = 10
terms = labs_ising_terms_Z(N)

pop = anneal_population(N, terms, pop_size=32, shots=5, total_time=1.0, steps=30)
for b, e in pop[:10]:
    print(b, e)


1111001110 37
1010101100 45
1111000111 85
0011001100 125
1101010101 141


In [65]:
import numpy as np
import cupy as cp

def pop_to_cupy_population(pop, n_bits: int, n_agents: int, fill_random: bool = True) -> cp.ndarray:
    """
    pop: List[Tuple[str, int]] or List[str]
         bitstrings like "0101..."
    Returns: cp.ndarray shape (n_agents, n_bits), dtype int8, values 0/1.
    """
    # accept either [(b,e), ...] or [b, ...]
    bitstrings = [x[0] if isinstance(x, (tuple, list)) else x for x in pop]

    # take up to n_agents seeds
    k = min(len(bitstrings), n_agents)

    seeds_cpu = np.zeros((n_agents, n_bits), dtype=np.int8)

    # fill seeds
    for i in range(k):
        b = bitstrings[i]
        if len(b) != n_bits:
            raise ValueError(f"Bitstring length {len(b)} != n_bits {n_bits}: {b}")
        seeds_cpu[i, :] = np.fromiter((1 if ch == "1" else 0 for ch in b), dtype=np.int8, count=n_bits)

    # fill remaining agents randomly (optional)
    if fill_random and k < n_agents:
        seeds_cpu[k:, :] = np.random.randint(0, 2, size=(n_agents - k, n_bits), dtype=np.int8)

    return cp.asarray(seeds_cpu)


In [63]:
class LabsMTS_GPU_Opt:
    def __init__(self, n_bits, n_agents, max_iter):
        self.n_bits = n_bits
        self.n_agents = n_agents
        self.max_iter = max_iter
        self.energy_kernel = cp.RawKernel(labs_energy_kernel_code, 'labs_energy')

    # ... keep calculate_labs_energy_batch and get_neighbor_batch unchanged ...

    def run(self, initial_population: cp.ndarray | None = None):
        print(f"Starting Optimized MTS (Custom CUDA) for LABS (N={self.n_bits}) with {self.n_agents} agents...")

        
        if initial_population is None:
            population = cp.random.randint(2, size=(self.n_agents, self.n_bits), dtype=cp.int8)
        else:
            if not isinstance(initial_population, cp.ndarray):
                raise TypeError("initial_population must be a cupy.ndarray")
            if initial_population.shape != (self.n_agents, self.n_bits):
                raise ValueError(f"initial_population shape must be ({self.n_agents}, {self.n_bits})")
            if initial_population.dtype != cp.int8:
                initial_population = initial_population.astype(cp.int8)
            population = initial_population.copy()

        energies = self.calculate_labs_energy_batch(population)

        best_idx = cp.argmin(energies)
        global_best_solution = population[best_idx].copy()
        global_best_energy = energies[best_idx]

        history_best_energy = []
        current_global_best_cpu = float(global_best_energy)
        print(f"Initial Best Energy: {current_global_best_cpu}")

        start_time = time.time()
        strength_2 = max(2, int(self.n_bits * 0.05))

        for it in range(self.max_iter):
            candidate_1 = self.get_neighbor_batch(population, perturbation_strength=1)
            e1 = self.calculate_labs_energy_batch(candidate_1)

            improved_mask = e1 < energies
            population[improved_mask] = candidate_1[improved_mask]
            energies[improved_mask] = e1[improved_mask]

            not_improved_mask = ~improved_mask
            if cp.any(not_improved_mask):
                candidate_2 = self.get_neighbor_batch(population, perturbation_strength=strength_2)
                e2 = self.calculate_labs_energy_batch(candidate_2)

                improved_s2_mask = (e2 < energies) & not_improved_mask
                population[improved_s2_mask] = candidate_2[improved_s2_mask]
                energies[improved_s2_mask] = e2[improved_s2_mask]

            current_best_idx = cp.argmin(energies)
            current_min_energy = energies[current_best_idx]

            if current_min_energy < global_best_energy:
                global_best_energy = current_min_energy
                global_best_solution = population[current_best_idx].copy()
                current_global_best_cpu = float(global_best_energy)
                print(f"Iter {it}: New Best: {current_global_best_cpu}")

            history_best_energy.append(current_global_best_cpu)

        cp.cuda.Stream.null.synchronize()
        total_time = time.time() - start_time
        print(f"Optimization finished in {total_time:.2f} seconds. ({(self.max_iter/total_time):.1f} it/s)")

        return (cp.asnumpy(global_best_solution),
                float(global_best_energy),
                cp.asnumpy(energies),
                history_best_energy)


In [None]:
# quantum side
N = 10
terms = labs_ising_terms_Z(N)

pop = anneal_population(
    N, terms,
    pop_size=64,
    shots=50,
    total_time=1.0,
    steps=30,   # or "nvidia" when ready
)

# convert pop -> initial population on GPU
N_AGENTS = 1000
init_pop_gpu = pop_to_cupy_population(pop, n_bits=N, n_agents=N_AGENTS, fill_random=True)

# classical MTS warm-started by QA
MAX_ITER = 10
mts_opt = LabsMTS_GPU_Opt(N, N_AGENTS, MAX_ITER)
best_sol, best_energy, final_energies, history = mts_opt.run(initial_population=init_pop_gpu)

print("Final Best Energy:", best_energy)
print("Best solution bits:", best_sol.astype(int))
