In [166]:
# import cudaq
import numpy as np
import sys
import os

# Make helper modules that live next to the tutorial notebook importable.
# The directory is prepended only once, and only when it actually exists.
notebook_dir = '/workspace/tutorial_notebook'
if notebook_dir not in sys.path and os.path.exists(notebook_dir):
    sys.path.insert(0, notebook_dir)


In [167]:
class LabsEnergyCounter:
    """Callable LABS energy evaluator that counts how many times it's been called."""
    def __init__(self, warn_at: int | None = 200_000):
        self.count = 0
        self.warn_at = warn_at

    def reset(self) -> None:
        self.count = 0

    def __call__(self, s):
        """
        Compute the LABS energy function E(s) = sum_{k=1}^{N-1} C_k^2
        where C_k = sum_{i=1}^{N-k} s_i * s_{i+k}

        Args:
            s: Binary sequence with values in {-1, +1}

        Returns:
            Energy value (lower is better)
        """
        N = len(s)
        energy = 0
        for k in range(1, N):
            C_k = sum(s[i] * s[i + k] for i in range(N - k))
            energy += C_k ** 2

        self.count += 1
        if self.warn_at is not None and self.count == self.warn_at:
            print(f"Reached {self.warn_at:,} energy evaluations, exiting.")
        return energy

labs_energy = LabsEnergyCounter(warn_at=200_000)


In [168]:
def merit_factor(s, E) -> float:
    """
    Merit factor N^2 / (2 * E) of a sequence.

    Args:
        s: Sequence (only its length is used)
        E: Energy of the sequence

    Returns:
        The merit factor, or infinity when E is zero
    """
    length = len(s)
    return float('inf') if E == 0 else length * length / (2 * E)

In [169]:

def combine(p1, p2):
    """
    Single-point crossover of two parent sequences (Algorithm 3 from paper).

    A cut point k is drawn uniformly from {1, ..., N-1}, where N = len(p1),
    and the child takes p1[0:k] followed by p2[k:].

    Args:
        p1: First parent sequence
        p2: Second parent sequence

    Returns:
        Child sequence combining p1[0:k] and p2[k:N]. When p1 has fewer than
        two elements there is no interior cut point, so a copy of p1 is
        returned (as a NumPy array) without consuming a random number.
    """
    N = len(p1)
    if N < 2:
        # np.random.randint(1, N) would raise ValueError here (empty range).
        return np.array(p1, copy=True)
    k = np.random.randint(1, N)  # cut point in {1, ..., N-1}
    return np.concatenate([p1[:k], p2[k:]])


In [170]:
def mutate(s, p_mut):
    """
    Independent per-position sign flips (Algorithm 3 from paper).

    Args:
        s: Input sequence (left unmodified)
        p_mut: Probability of flipping each position

    Returns:
        A copy of the sequence with the selected positions negated
    """
    mutated = s.copy()
    # One random draw per position, in index order.
    for pos, value in enumerate(s):
        flip = np.random.random() < p_mut
        if flip:
            mutated[pos] = -value  # flip +1 <-> -1
    return mutated


In [171]:
def tabu_search(s, max_iter=100, tabu_tenure=7):
    """
    Tabu search over single-element sign flips.

    Each iteration evaluates every one-flip neighbor with `labs_energy`,
    moves to the lowest-energy admissible one, and then marks the flipped
    position as tabu. A tabu move is still admissible when it beats the best
    energy found so far (aspiration criterion).

    Args:
        s: Starting sequence (not modified)
        max_iter: Maximum number of iterations
        tabu_tenure: Offset added to the current iteration to obtain the
            iteration at which a flipped position stops being tabu

    Returns:
        best_s: Best sequence found
        best_energy: Energy of best sequence
    """
    current = s.copy()
    best_s = current.copy()
    best_energy = labs_energy(current)
    tabu_until = {}  # position -> iteration from which it is allowed again

    for step in range(max_iter):
        chosen = None
        chosen_energy = float('inf')
        chosen_pos = -1

        for pos in range(len(current)):
            # Build and score the neighbor obtained by flipping `pos`.
            candidate = current.copy()
            candidate[pos] = -candidate[pos]
            candidate_energy = labs_energy(candidate)

            # Skip tabu moves unless they improve on the overall best.
            forbidden = tabu_until.get(pos, 0) > step
            if forbidden and not (candidate_energy < best_energy):
                continue

            # Strict comparison keeps the first of equally good neighbors.
            if candidate_energy < chosen_energy:
                chosen = candidate
                chosen_energy = candidate_energy
                chosen_pos = pos

        # No admissible neighbor at all: stop early.
        if chosen is None:
            break

        # Take the move (even if it is uphill) and make its position tabu.
        current = chosen
        tabu_until[chosen_pos] = step + tabu_tenure

        if chosen_energy < best_energy:
            best_s = current.copy()
            best_energy = chosen_energy

    return best_s, best_energy

In [188]:
def memetic_tabu_search(N, pop_size=10, max_generations=100,
                        p_mut=0.1, p_combine=0.5, target_energy=None,
                        tabu_max_iter=100, tabu_tenure=7, verbose=False,
                        initial_population=None):
    """
    Memetic Tabu Search (MTS) for the LABS problem.

    Each generation builds a child (crossover of two members, or a copy of
    one member), mutates it, refines it with `tabu_search`, and lets the
    result replace the worst population member when it is better.

    Energy evaluations go through the module-level `labs_energy` counter.
    When that counter has a `warn_at` budget and its count has reached it
    after a tabu run, the search stops; with `warn_at=None` no budget applies.

    Args:
        N: Sequence length
        pop_size: Population size
        max_generations: Maximum number of generations
        p_mut: Mutation probability per bit
        p_combine: Probability of combining vs sampling
        target_energy: Stop if this energy is reached (optional)
        tabu_max_iter: Max iterations for tabu search
        tabu_tenure: Tabu tenure for tabu search
        verbose: Print progress if True
        initial_population: Optional list of initial sequences

    Returns:
        best_s: Best sequence found
        best_energy: Energy of best sequence
        population: Final population
        energies: Energies of final population
    """
    # Initialize population with values in {-1, +1}
    if initial_population is not None:
        population = [np.array(s) for s in initial_population[:pop_size]]
        # Fill remaining slots with random if needed
        while len(population) < pop_size:
            population.append(np.random.choice([-1, 1], size=N))
    else:
        population = [np.random.choice([-1, 1], size=N) for _ in range(pop_size)]

    energies = [labs_energy(s) for s in population]

    # Find best solution in initial population
    best_idx = np.argmin(energies)
    best_s = population[best_idx].copy()
    best_energy = energies[best_idx]

    if verbose:
        print(f"Initial best energy: {best_energy}")

    for gen in range(max_generations):
        # Check stopping criterion
        if target_energy is not None and best_energy <= target_energy:
            if verbose:
                print(f"Target energy reached at generation {gen}")
            break

        # Make child: combine or sample
        if np.random.random() < p_combine and pop_size >= 2:
            # Combine two distinct parents
            idx1, idx2 = np.random.choice(pop_size, 2, replace=False)
            child = combine(population[idx1], population[idx2])
        else:
            # Sample from population
            idx = np.random.randint(pop_size)
            child = population[idx].copy()

        # Mutate child
        child = mutate(child, p_mut)

        # Run tabu search
        result, result_energy = tabu_search(child, max_iter=tabu_max_iter,
                                             tabu_tenure=tabu_tenure)

        # Evaluation budget. `warn_at` may be None (no budget); comparing an
        # int against None would raise TypeError, so guard it explicitly.
        # Note: this check runs before the best/population update, so the
        # result of the tabu run that crossed the budget is not used.
        eval_budget = labs_energy.warn_at
        if eval_budget is not None and labs_energy.count >= eval_budget:
            if verbose:
                print("Reached maximum allowed energy evaluations, stopping.")
            break

        # Update global best if improved
        better = result_energy < best_energy
        if better:
            best_s = result.copy()
            best_energy = result_energy

        # Progress line: printed on every improvement, and on every
        # generation when verbose is set.
        if better or verbose:
            print(
                f"Generation {gen}: New best energy = {best_energy} "
                f"New merit factor = {merit_factor(best_s, best_energy)}, "
                f"labs_count= {labs_energy.count}",
                flush=True
            )

        # Add result to population (replace worst member if result is better)
        worst_idx = np.argmax(energies)
        if result_energy < energies[worst_idx]:
            population[worst_idx] = result
            energies[worst_idx] = result_energy

    return best_s, best_energy, population, energies

In [207]:
# Parallelize across N using process-based parallelism (fork) for reproducibility & isolation

# Sequence lengths to benchmark: multiples of 5 up to 80, plus 66 and 82.
Ns = [
    5, 10, 15, 20, 25, 30, 35, 40, 45,
    50, 55, 60, 65, 66, 70, 75, 80, 82,
]

# Collects one result dict per (seed, N) run.
results = []

# Problem parameters
pop_size = 20           # members kept in the population
max_generations = 1000  # upper bound on generations per run

# Alternative, longer seed list: 42, 7, 67, 58, 7841, 1234, 5678, 91011
seeds = [1234, 5678, 91011]


def _run_one_N(N: int, seed: int):
    """
    Run one memetic tabu search for a single (N, seed) pair.

    Seeds NumPy's global RNG, resets the shared `labs_energy` counter, runs
    `memetic_tabu_search` with the module-level `pop_size` and
    `max_generations`, and packs the outcome into a plain dict.

    Args:
        N: Sequence length
        seed: Seed for NumPy's global random state

    Returns:
        Dict with keys seed, N, best_energy, merit_factor, energy_evals,
        best_s, final_population and final_energies
    """
    length, seed_value = int(N), int(seed)

    np.random.seed(seed_value)
    labs_energy.reset()  # so energy_evals counts this run only

    outcome = memetic_tabu_search(
        N=length,
        pop_size=pop_size,
        max_generations=max_generations,
        p_mut=0.1,
        p_combine=0.5,
        tabu_max_iter=100,
        tabu_tenure=7,
        verbose=False
    )
    best_s, best_energy, population, energies = outcome

    record = {"seed": seed_value, "N": length}
    record["best_energy"] = int(best_energy)
    record["merit_factor"] = float(merit_factor(best_s, best_energy))
    record["energy_evals"] = int(labs_energy.count)
    record["best_s"] = best_s.copy()
    record["final_population"] = population
    record["final_energies"] = energies
    return record


import multiprocessing as mp
import concurrent.futures as cf

print("Running Memetic Tabu Search for LABS (parallel over N, loop over seeds)")
print(f"Population size: {pop_size}, Max generations: {max_generations}")
print(f"Seeds: {seeds}")
print("-" * 50)

# Worker processes are forked; never start more workers than there are tasks.
_ctx = mp.get_context("fork")
max_workers = min(len(Ns), (os.cpu_count() or 1))

for seed in seeds:
    # One pool per seed: every N is submitted as its own task.
    with cf.ProcessPoolExecutor(max_workers=max_workers, mp_context=_ctx) as ex:
        futures = {ex.submit(_run_one_N, int(N), int(seed)): int(N) for N in Ns}
        seed_results = [fut.result() for fut in cf.as_completed(futures)]

    # Completion order is arbitrary; restore the order given in Ns.
    seed_results = sorted(seed_results, key=lambda r: Ns.index(r["N"]))
    results.extend(seed_results)

    print(f"\nSeed={seed} Summary (best_energy, merit_factor, evals):")
    for r in seed_results:
        print(f"N={r['N']}: E={r['best_energy']}, F={r['merit_factor']:.6f}, evals={r['energy_evals']}")



Seed=38273 Summary (best_energy, merit_factor, evals):
N=5: E=2, F=6.250000, evals=31660
N=10: E=13, F=3.846154, evals=200220
N=15: E=15, F=7.500000, evals=201154
N=20: E=26, F=7.692308, evals=200120
N=25: E=36, F=8.680556, evals=200100
N=30: E=75, F=6.000000, evals=201087
N=35: E=81, F=7.561728, evals=203078
N=40: E=136, F=5.882353, evals=200070
N=45: E=170, F=5.955882, evals=202565
N=50: E=225, F=5.555556, evals=200060
N=55: E=279, F=5.421147, evals=203557
N=60: E=338, F=5.325444, evals=204054
N=65: E=360, F=5.868056, evals=201551
N=66: E=377, F=5.777188, evals=204651
N=70: E=431, F=5.684455, evals=203049
N=75: E=521, F=5.398273, evals=202547
N=80: E=608, F=5.263158, evals=200045
N=82: E=689, F=4.879536, evals=205045


In [208]:
# Export the current `results` list to a CSV file containing N, best_energy,
# merit_factor, energy_evals, best_s and seed.
# `results` holds one row per (seed, N) run, so the seed is written as well;
# without it, rows for the same N from different seeds cannot be told apart.
# The seed column is appended last so the existing column order is unchanged.

import csv

out_path = "labs_mts_results2.csv"

with open(out_path, "w", newline="") as f:
    writer = csv.DictWriter(
        f,
        fieldnames=["N", "best_energy", "merit_factor", "energy_evals", "best_s", "seed"],
    )
    writer.writeheader()
    for r in results:
        best_s_arr = np.asarray(r.get("best_s", []))
        writer.writerow(
            {
                "N": int(r["N"]),
                "best_energy": int(r["best_energy"]),
                "merit_factor": float(r["merit_factor"]),
                "energy_evals": int(r["energy_evals"]),
                # Sequence stored as space-separated values in a single cell.
                "best_s": " ".join(map(str, best_s_arr.tolist())),
                # Left blank for result dicts that carry no seed.
                "seed": r.get("seed", ""),
            }
        )

print(f"Wrote {len(results)} rows to {out_path}")


Wrote 36 rows to labs_mts_results2.csv
