requirements

In [30]:
%pip install dimod
%pip install dwave-neal
%pip install protes==0.3.12



DynamicMatrix

In [31]:
import numpy as np
def select_k_from_n(self,n: int, k: int,rng: np.random.Generator) -> list[int]:
        selected = []
        remaining_slots = k
        for i in range(n):
            prob_select = remaining_slots / (n - i)
            if rng.random() < prob_select:
                selected.append(i)
                remaining_slots -= 1

                if remaining_slots == 0:
                    break
        return selected
class DynamicMatrix:
    def __init__(self, initial_n=1):
        self.n = initial_n
        self.matrix = [[0] * self.n for _ in range(self.n)]

    def increment(self, x=1):
        self.n += x
        # Add new columns to existing rows
        for row in self.matrix:
            row.extend([0] * x)
        # Add new rows
        for _ in range(x):
            self.matrix.append([0] * self.n)

    def set_value(self, row, col, value):
        if 0 <= row < self.n and 0 <= col < self.n:
            self.matrix[row][col] = value
        else:
            raise IndexError("Index out of bounds")

    def get_value(self, row, col):
        if 0 <= row < self.n and 0 <= col < self.n:
            return self.matrix[row][col]
        raise IndexError("Index out of bounds")

    def __getitem__(self, idx):
        row, col = idx
        return self.get_value(row, col)

    def __setitem__(self, idx, value):
        row, col = idx
        self.set_value(row, col, value)

    def get_qubo(self):
        return  np.array(self.matrix,dtype=np.float64)

    def __str__(self):
        return '\n'.join(' '.join(str(x) for x in row) for row in self.matrix)

def add_equality(Q:DynamicMatrix,V:list,I:list,C,Panelty=1000):
    #sum V[i]*I[i] =C
    for ii,i in enumerate(I):
        for jj,j in enumerate(I):
            if j==i:
                Q[i,i]+=(V[ii]*V[ii]-2*V[ii]*C)*Panelty
                continue
            Q[i,j]+=V[ii]*V[jj]*Panelty

def make_symetric(Q:DynamicMatrix):
    for i in range(Q.n):
        for j in range(i+1,Q.n):
            Q[i,j]=Q[j,i]=(Q[i,j]+Q[j,i])/2

constraints.py

In [32]:
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple
import numpy as np

class Constraint:
    """Base class for constraints."""

    def is_feasible(self, x: np.ndarray) -> bool:
        raise NotImplementedError

    def violation(self, x: np.ndarray) -> float:
        """Nonnegative violation magnitude; 0 if feasible."""
        raise NotImplementedError


@dataclass
class CardinalityConstraint(Constraint):
    """Enforce sum(x) == K."""

    K: int

    def is_feasible(self, x: np.ndarray) -> bool:
        return int(np.sum(x)) == int(self.K)

    def violation(self, x: np.ndarray) -> float:
        s = float(np.sum(x))
        return (s - float(self.K)) ** 2


@dataclass
class LinearInequalityConstraint(Constraint):
    """Enforce A x <= b (componentwise)."""

    A: np.ndarray  # (m, d)
    b: np.ndarray  # (m,)

    def __post_init__(self):
        self.A = np.asarray(self.A, dtype=np.float64)
        self.b = np.asarray(self.b, dtype=np.float64)

    def is_feasible(self, x: np.ndarray) -> bool:
        Ax = self.A @ x.astype(np.float64)
        return bool(np.all(Ax <= self.b + 1e-12))

    def violation(self, x: np.ndarray) -> float:
        Ax = self.A @ x.astype(np.float64)
        v = np.maximum(0.0, Ax - self.b)**2
        return float(np.sum(v))


@dataclass
class CompositeConstraint(Constraint):
    """Logical AND of multiple constraints."""

    constraints: List[Constraint]

    def is_feasible(self, x: np.ndarray) -> bool:
        return all(c.is_feasible(x) for c in self.constraints)

    def violation(self, x: np.ndarray) -> float:
        return float(sum(c.violation(x) for c in self.constraints))



@dataclass
class OneHotGroupsConstraint(Constraint):
    """Enforce one-hot in each group: for every G, sum_{i in G} x_i == 1."""

    groups: List[np.ndarray]  # each group is a 1D array of indices

    def __post_init__(self):
        norm_groups: List[np.ndarray] = []
        for g in self.groups:
            gi = np.asarray(g, dtype=np.int64).reshape(-1)
            if gi.size == 0:
                raise ValueError("OneHotGroupsConstraint: empty group is not allowed")
            norm_groups.append(gi)
        self.groups = norm_groups

    def is_feasible(self, x: np.ndarray) -> bool:
        x = np.asarray(x, dtype=np.int8)
        for g in self.groups:
            if int(np.sum(x[g])) != 1:
                return False
        return True

    def violation(self, x: np.ndarray) -> float:
        x = np.asarray(x, dtype=np.int8)
        v = 0.0
        for g in self.groups:
            s = float(np.sum(x[g]))
            v += (s - 1.0) ** 2
        return float(v)


def batch_is_feasible(X: np.ndarray, cons: Optional[Constraint]) -> np.ndarray:
    if cons is None:
        return np.ones((len(X),), dtype=bool)
    return np.array([cons.is_feasible(x) for x in X], dtype=bool)


def batch_violation(X: np.ndarray, cons: Optional[Constraint]) -> np.ndarray:
    if cons is None:
        return np.zeros((len(X),), dtype=np.float64)
    return np.array([cons.violation(x) for x in X], dtype=np.float64)


qubo.py

In [33]:
from __future__ import annotations
from typing import Tuple
import numpy as np

def qubo_energy(X: np.ndarray, Q: np.ndarray, const: float = 0.0) -> np.ndarray:
    """Compute y = const + sum_{i<=j} Q[i,j] x_i x_j for a batch X.

    Assumes Q is upper-triangular (diagonal contains linear terms).
    X: (B, d) with 0/1.
    Returns (B,) float.
    """
    if X.ndim != 2:
        raise ValueError("X must be 2D")
    B, d = X.shape
    if Q.shape != (d, d):
        raise ValueError("Q shape mismatch")

    # Efficient: compute (X @ Q) elementwise times X, but must avoid double counting
    # since Q is upper-triangular for i<=j exactly.
    # We'll do explicit upper triangle multiplication for clarity in template.
    y = np.full((B,), float(const), dtype=np.float64)
    # diagonal
    y += np.sum(X * np.diag(Q)[None, :], axis=1)
    # off diagonal
    iu = np.triu_indices(d, k=1)
    if len(iu[0]) > 0:
        y += np.sum((X[:, iu[0]] * X[:, iu[1]]) * Q[iu][None, :], axis=1)
    return y


def symmetrize_upper(Q_upper: np.ndarray) -> np.ndarray:
    """Return a symmetric matrix equivalent to the upper-triangular QUBO."""
    Q = np.array(Q_upper, copy=True)
    Q = Q + np.triu(Q, 1).T
    return Q


data.py

In [34]:
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

import numpy as np


@dataclass
class DataBuffer:
    """Stores feasible (X,y) for regression and (X,label) for feasibility classification."""

    X_reg: List[np.ndarray] = field(default_factory=list)
    y_reg: List[float] = field(default_factory=list)

    X_clf: List[np.ndarray] = field(default_factory=list)
    y_clf: List[int] = field(default_factory=list)  # 1 feasible, 0 infeasible

    def add_reg(self, x: np.ndarray, y: float) -> None:
        self.X_reg.append(np.asarray(x, dtype=np.int8))
        self.y_reg.append(float(y))

    def add_clf(self, x: np.ndarray, label: int) -> None:
        self.X_clf.append(np.asarray(x, dtype=np.int8))
        self.y_clf.append(int(label))

    def add_point(self, x: np.ndarray, feasible: bool, y: Optional[float] = None) -> None:
        self.add_clf(x, 1 if feasible else 0)
        if feasible:
            if y is None:
                raise ValueError("Feasible point requires y")
            self.add_reg(x, float(y))

    def get_reg_arrays(self) -> Tuple[np.ndarray, np.ndarray]:
        if not self.X_reg:
            return np.zeros((0, 0), dtype=np.int8), np.zeros((0,), dtype=float)
        X = np.stack(self.X_reg, axis=0)
        y = np.asarray(self.y_reg, dtype=float)
        return X, y

    def get_clf_arrays(self) -> Tuple[np.ndarray, np.ndarray]:
        if not self.X_clf:
            return np.zeros((0, 0), dtype=np.int8), np.zeros((0,), dtype=np.int64)
        X = np.stack(self.X_clf, axis=0)
        y = np.asarray(self.y_clf, dtype=np.int64)
        return X, y

    def size_reg(self) -> int:
        return len(self.y_reg)

    def size_clf(self) -> int:
        return len(self.y_clf)

    def summary(self) -> Dict[str, int]:
        return {"n_reg": self.size_reg(), "n_clf": self.size_clf()}

fm.py

In [35]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np
import torch
from torch import nn


class FactorizationMachine(nn.Module):
    """Second-order Factorization Machine for binary/categorical (one-hot) features.

    Prediction:
        y = w0 + sum_i w_i x_i + sum_{i<j} <v_i, v_j> x_i x_j

    This is naturally a QUBO when x is binary.
    """

    def __init__(self, d: int, k: int):
        super().__init__()
        self.d = int(d)
        self.k = int(k)

        self.w0 = nn.Parameter(torch.zeros(1))
        self.w = nn.Parameter(torch.zeros(d))
        self.V = nn.Parameter(torch.randn(d, k) * 0.01)

        # Optional output scaling for regression:
        self.y_mean = 0.0
        self.y_std = 1.0

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        # X: (B, d), float32
        linear = self.w0 + X @ self.w
        # FM trick:
        XV = X @ self.V                # (B, k)
        XV2 = XV * XV                  # (B, k)
        X2V2 = (X * X) @ (self.V * self.V)  # (B, k)
        interactions = 0.5 * torch.sum(XV2 - X2V2, dim=1)  # (B,)
        return linear + interactions


@dataclass
class FMTrainConfig:
    k: int = 16
    lr: float = 2e-2
    weight_decay: float = 1e-4
    epochs: int = 200
    batch_size: int = 256
    patience: int = 20
    device: str = "cpu"


def _batch_iter(X: np.ndarray, y: np.ndarray, batch_size: int, rng: np.random.Generator):
    n = len(X)
    idx = np.arange(n)
    rng.shuffle(idx)
    for s in range(0, n, batch_size):
        j = idx[s : s + batch_size]
        yield X[j], y[j]


def train_fm_regression(
    X: np.ndarray,
    y: np.ndarray,
    cfg: FMTrainConfig,
    seed: int = 0,
) -> Tuple[FactorizationMachine, Dict[str, float]]:
    """Train FM with MSE loss. Returns trained model and final losses."""
    if X.ndim != 2:
        raise ValueError("X must be 2D")
    if len(X) != len(y):
        raise ValueError("X and y length mismatch")
    if len(X) < 2:
        raise ValueError("Need at least 2 training points for regression")

    rng = np.random.default_rng(seed)

    d = X.shape[1]
    model = FactorizationMachine(d=d, k=cfg.k)
    device = torch.device(cfg.device)
    model.to(device)

    # Standardize targets for stability:
    y_mean = float(np.mean(y))
    y_std = float(np.std(y) + 1e-8)
    model.y_mean = y_mean
    model.y_std = y_std
    y_s = (y - y_mean) / y_std

    X_t = torch.tensor(X, dtype=torch.float32, device=device)
    y_t = torch.tensor(y_s, dtype=torch.float32, device=device)

    opt = torch.optim.Adam(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
    loss_fn = nn.MSELoss()

    best_loss = float("inf")
    best_state = None
    bad = 0

    for epoch in range(cfg.epochs):
        model.train()
        epoch_loss = 0.0
        for xb, yb in _batch_iter(X, y_s, cfg.batch_size, rng):
            xb_t = torch.tensor(xb, dtype=torch.float32, device=device)
            yb_t = torch.tensor(yb, dtype=torch.float32, device=device)

            opt.zero_grad(set_to_none=True)
            pred = model(xb_t)
            loss = loss_fn(pred, yb_t)
            loss.backward()
            opt.step()

            epoch_loss += float(loss.item()) * len(xb)

        epoch_loss /= len(X)

        # Simple early stopping on training loss (template-friendly)
        if epoch_loss < best_loss - 1e-6:
            best_loss = epoch_loss
            best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
            bad = 0
        else:
            bad += 1
            if bad >= cfg.patience:
                break

    if best_state is not None:
        model.load_state_dict(best_state)

    return model, {"train_mse": best_loss}


def train_fm_classifier(
    X: np.ndarray,
    y01: np.ndarray,
    cfg: FMTrainConfig,
    seed: int = 0,
) -> Tuple[FactorizationMachine, Dict[str, float]]:
    """Train FM as a binary classifier with BCEWithLogits loss."""
    if X.ndim != 2:
        raise ValueError("X must be 2D")
    if len(X) != len(y01):
        raise ValueError("X and y length mismatch")
    if len(X) < 10:
        raise ValueError("Need at least 10 points for classifier (template default)")

    rng = np.random.default_rng(seed)

    d = X.shape[1]
    model = FactorizationMachine(d=d, k=cfg.k)
    device = torch.device(cfg.device)
    model.to(device)

    X_t = torch.tensor(X, dtype=torch.float32, device=device)
    y_t = torch.tensor(y01.astype(np.float32), dtype=torch.float32, device=device)

    opt = torch.optim.Adam(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
    loss_fn = nn.BCEWithLogitsLoss()

    best_loss = float("inf")
    best_state = None
    bad = 0

    for epoch in range(cfg.epochs):
        model.train()
        epoch_loss = 0.0
        for xb, yb in _batch_iter(X, y01.astype(np.float32), cfg.batch_size, rng):
            xb_t = torch.tensor(xb, dtype=torch.float32, device=device)
            yb_t = torch.tensor(yb, dtype=torch.float32, device=device)

            opt.zero_grad(set_to_none=True)
            logits = model(xb_t)
            loss = loss_fn(logits, yb_t)
            loss.backward()
            opt.step()

            epoch_loss += float(loss.item()) * len(xb)

        epoch_loss /= len(X)

        if epoch_loss < best_loss - 1e-6:
            best_loss = epoch_loss
            best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
            bad = 0
        else:
            bad += 1
            if bad >= cfg.patience:
                break

    if best_state is not None:
        model.load_state_dict(best_state)

    return model, {"train_bce": best_loss}


def fm_predict_reg(model: FactorizationMachine, X: np.ndarray) -> np.ndarray:
    """Predict regression output in the original y scale."""
    device = next(model.parameters()).device
    model.eval()
    with torch.no_grad():
        xt = torch.tensor(X, dtype=torch.float32, device=device)
        y_s = model(xt).detach().cpu().numpy()
    return model.y_mean + model.y_std * y_s


def fm_predict_proba(model: FactorizationMachine, X: np.ndarray) -> np.ndarray:
    """Predict feasibility probability (sigmoid of logits)."""
    device = next(model.parameters()).device
    model.eval()
    with torch.no_grad():
        xt = torch.tensor(X, dtype=torch.float32, device=device)
        logits = model(xt).detach().cpu().numpy()
    return 1.0 / (1.0 + np.exp(-logits))


def fm_to_qubo(model: FactorizationMachine) -> Tuple[np.ndarray, float]:
    """Convert a trained FM (regression) into an upper-triangular QUBO and constant.

    We output Q such that:
        y_hat(x) = const + sum_{i<=j} Q[i,j] x_i x_j
    for x in {0,1}^d.

    Note: linear terms are placed on the diagonal because x_i^2 = x_i for binary x.
    """
    w0 = float(model.w0.detach().cpu().numpy().reshape(()))
    w = model.w.detach().cpu().numpy().astype(np.float64)
    V = model.V.detach().cpu().numpy().astype(np.float64)

    d = len(w)
    Q = np.zeros((d, d), dtype=np.float64)

    # diagonal = linear terms
    Q[np.arange(d), np.arange(d)] = w

    # off-diagonal = dot(v_i, v_j)
    G = V @ V.T  # (d, d), where G[i,j] = <v_i, v_j>
    for i in range(d):
        for j in range(i + 1, d):
            Q[i, j] = G[i, j]

    # undo y standardization: y = mean + std * y_std
    const = model.y_mean + model.y_std * w0
    Q = model.y_std * Q

    return Q, float(const)

utils.py

In [36]:
from __future__ import annotations

import json
import os
import random
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

import numpy as np


def set_global_seed(seed: int) -> None:
    """Seed Python + NumPy (and optionally Torch) for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch

        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    except Exception:
        pass


def ensure_dir(path: str | Path) -> Path:
    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    return p


def save_json(path: str | Path, obj: Any) -> None:
    Path(path).write_text(json.dumps(obj, indent=2, sort_keys=True), encoding="utf-8")


def load_json(path: str | Path) -> Any:
    return json.loads(Path(path).read_text(encoding="utf-8"))


def unique_rows(X: np.ndarray) -> np.ndarray:
    """Return unique rows of a 2D numpy array (preserving order)."""
    if X.ndim != 2:
        raise ValueError("X must be 2D")
    seen = set()
    out = []
    for row in X:
        key = row.tobytes()
        if key not in seen:
            seen.add(key)
            out.append(row)
    return np.array(out, dtype=X.dtype)


def hamming_distance(a: np.ndarray, b: np.ndarray) -> int:
    return int(np.sum(a != b))


def topk_unique(
    X: np.ndarray,
    y: np.ndarray,
    k: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """Pick top-k (smallest y), ensure unique rows."""
    if len(X) == 0:
        return X, y
    order = np.argsort(y)
    Xs = X[order]
    ys = y[order]
    picked = []
    picked_y = []
    seen = set()
    for xi, yi in zip(Xs, ys):
        key = xi.tobytes()
        if key in seen:
            continue
        seen.add(key)
        picked.append(xi)
        picked_y.append(float(yi))
        if len(picked) >= k:
            break
    return np.array(picked, dtype=X.dtype), np.array(picked_y, dtype=float)


def _rankdata_average_ties(x: np.ndarray) -> np.ndarray:
    """Ranks with average rank for ties (1..n)."""
    x = np.asarray(x)
    order = np.argsort(x, kind="mergesort")
    ranks = np.empty_like(order, dtype=np.float64)

    xs = x[order]
    n = len(xs)
    i = 0
    while i < n:
        j = i + 1
        while j < n and xs[j] == xs[i]:
            j += 1
        # average rank for [i, j)
        r = 0.5 * ((i + 1) + j)  # 1-indexed
        ranks[order[i:j]] = r
        i = j
    return ranks


def spearmanr_np(a: np.ndarray, b: np.ndarray) -> float:
    """Spearman rank correlation (robust to ties via average ranks)."""
    a = np.asarray(a, dtype=np.float64).reshape(-1)
    b = np.asarray(b, dtype=np.float64).reshape(-1)
    if a.size != b.size:
        raise ValueError("spearmanr_np: size mismatch")
    if a.size < 2:
        return float("nan")

    ra = _rankdata_average_ties(a)
    rb = _rankdata_average_ties(b)
    ra = ra - np.mean(ra)
    rb = rb - np.mean(rb)

    denom = float(np.sqrt(np.sum(ra * ra) * np.sum(rb * rb)) + 1e-12)
    return float(np.sum(ra * rb) / denom)


def select_diverse_topk(
    X: np.ndarray,
    y: np.ndarray,
    k: int,
    min_hamming: int = 0,
) -> Tuple[np.ndarray, np.ndarray]:
    """Pick up to k smallest-y rows, enforcing min Hamming distance greedily."""
    if len(X) == 0 or k <= 0:
        return np.zeros((0, X.shape[1] if X.ndim == 2 else 0), dtype=getattr(X, "dtype", np.int8)), np.zeros((0,), dtype=float)

    X = np.asarray(X)
    y = np.asarray(y, dtype=np.float64)
    order = np.argsort(y)
    Xs = X[order]
    ys = y[order]

    picked_X: List[np.ndarray] = []
    picked_y: List[float] = []

    for xi, yi in zip(Xs, ys):
        if not picked_X:
            picked_X.append(xi)
            picked_y.append(float(yi))
        else:
            if min_hamming > 0:
                ok = True
                for xj in picked_X:
                    if hamming_distance(xi, xj) < min_hamming:
                        ok = False
                        break
                if not ok:
                    continue
            picked_X.append(xi)
            picked_y.append(float(yi))

        if len(picked_X) >= k:
            break

    return np.array(picked_X, dtype=X.dtype), np.array(picked_y, dtype=np.float64)

surrogate.py

In [37]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

import numpy as np

#from .constraints import Constraint, batch_violation
#from .qubo import qubo_energy


@dataclass
class SurrogateObjective:
    """Surrogate energy used by the solver.

    E(x) = QUBO(x) + rho * violation(x) - alpha * log(p_feasible(x)+eps)

    All terms are optional except QUBO.
    """

    Q: np.ndarray
    const: float
    constraint: Optional[Constraint] = None
    rho: float = 0.0
    p_feasible: Optional[callable] = None  # function X -> prob in (0,1)
    alpha: float = 0.0
    eps: float = 1e-6

    def __call__(self, X: np.ndarray) -> np.ndarray:
        X = np.asarray(X, dtype=np.int8)
        e = qubo_energy(X, self.Q, self.const)

        if self.constraint is not None and self.rho != 0.0:
            v = batch_violation(X, self.constraint)
            e = e + float(self.rho) * v

        if self.p_feasible is not None and self.alpha != 0.0:
            p = np.clip(self.p_feasible(X), self.eps, 1.0 - self.eps)
            e = e - float(self.alpha) * np.log(p)

        return e.astype(np.float64)

Benchmarks

In [38]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np

In [39]:
class Benchmark:
    """A black-box objective with constraints over binary x."""

    name: str

    def n_vars(self) -> int:
        raise NotImplementedError

    def constraint(self) -> Optional[Constraint]:
        raise NotImplementedError

    def oracle(self, x: np.ndarray) -> float:
        """Objective to minimize. Only meaningful on feasible points."""
        raise NotImplementedError

    def sample_feasible(self, rng: np.random.Generator, n: int) -> np.ndarray:
        """Return (n, d) feasible samples."""
        raise NotImplementedError

    def info(self) -> Dict:
        return {"name": getattr(self, "name", self.__class__.__name__), "d": self.n_vars()}


knapsack.py

In [40]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np

#from ..constraints import Constraint, LinearInequalityConstraint


@dataclass
class KnapsackBenchmark:
    """0-1 knapsack (minimization version).

    Items i=1..d with weights w_i and values v_i.
    Constraint: sum w_i x_i <= capacity.
    Objective (minimize): - sum v_i x_i  (i.e., maximize value)
    """

    d: int
    seed: int = 0
    capacity_ratio: float = 0.35

    def __post_init__(self):
        rng = np.random.default_rng(self.seed)
        self.weights = rng.integers(low=1, high=50, size=self.d).astype(np.float64)
        self.values = rng.integers(low=1, high=100, size=self.d).astype(np.float64)
        self.capacity = float(np.sum(self.weights) * self.capacity_ratio)
        self.name = f"knapsack_d{self.d}_seed{self.seed}_cap{self.capacity_ratio:.2f}"

    def n_vars(self) -> int:
        return self.d

    def constraint(self) -> Optional[Constraint]:
        A = self.weights.reshape(1, -1)
        b = np.array([self.capacity], dtype=np.float64)
        return LinearInequalityConstraint(A=A, b=b)

    def oracle(self, x: np.ndarray) -> float:
        return -float(np.dot(self.values, x.astype(np.float64)))

    def sample_feasible(self, rng: np.random.Generator, n: int) -> np.ndarray:
        # Simple greedy+random feasible generator
        X = np.zeros((n, self.d), dtype=np.int8)
        for t in range(n):
            idx = rng.permutation(self.d)
            wsum = 0.0
            x = np.zeros(self.d, dtype=np.int8)
            for i in idx:
                if wsum + self.weights[i] <= self.capacity and rng.random() < 0.5:
                    x[i] = 1
                    wsum += self.weights[i]
            X[t] = x
        return X

    def print_results(self,x):
        W,V=0,0
        for i in range(self.d):
            W+=x[i]*self.weights[i]
            V+=x[i]*self.values[i]
        print('capacity:',self.capacity,'Weight:',W,'Value:',V)

    def get_capacity(self):
        return self.capacity

    def info(self) -> Dict:
        return {"name": self.name, "d": self.d, "seed": self.seed, "capacity": self.capacity, "capacity_ratio": self.capacity_ratio}

def get_knapsack_qubo(P:KnapsackBenchmark):
    n,Weight,Value,capacity=P.d,P.weights,P.values,P.capacity
    Q=DynamicMatrix(n)
    #add objective function
    for i in range(n):
        Q[i,i]-=Value[i]
    #Constraint: sum w_i x_i <= capacity.
    b=int(np.log2(capacity+1)+1)
    Q.increment(b)
    #(sum w_i x_i +b_i*2**i-capacity)**2
    add_equality(Q,[Weight[i] for i in range(n)]+[2**i for i in range(b)],[i for i in range(Q.n)],capacity)
    return Q.get_qubo()

maxcut_cardinality.py

In [41]:
from __future__ import annotations

from dataclasses import dataclass
from itertools import combinations
from typing import Dict, Optional, Tuple

import numpy as np

#from ..constraints import CardinalityConstraint, Constraint


@dataclass
class MaxCutCardinalityBenchmark:
    """Constrained MaxCut variant: choose exactly K vertices in a subset S.

    Decision x in {0,1}^d indicates membership in S.
    Constraint: sum(x) == K.

    Objective (minimize): negative cut weight
        f(x) = - sum_{i<j} w_ij * [x_i != x_j]
    """

    d: int
    K: int
    seed: int = 0
    weight_scale: float = 1.0

    def __post_init__(self):
        rng = np.random.default_rng(self.seed)
        W = rng.random((self.d, self.d)) * self.weight_scale
        W = np.triu(W, 1)
        W = W + W.T
        np.fill_diagonal(W, 0.0)
        self.W = W
        self.name = f"maxcut_cardinality_d{self.d}_K{self.K}_seed{self.seed}"

    def n_vars(self) -> int:
        return self.d

    def constraint(self) -> Optional[Constraint]:
        return CardinalityConstraint(K=self.K)

    def cut_weight(self, x: np.ndarray) -> float:
        x = x.astype(np.int8)
        # Compute sum_{i<j} w_ij * (x_i xor x_j)
        # xor = x_i + x_j - 2 x_i x_j for binary
        # Use vectorized formula:
        # For upper triangle: sum w_ij * (x_i + x_j - 2 x_i x_j)
        iu = np.triu_indices(self.d, 1)
        xi = x[iu[0]].astype(np.float64)
        xj = x[iu[1]].astype(np.float64)
        xor = xi + xj - 2.0 * xi * xj
        return float(np.sum(self.W[iu] * xor))

    def oracle(self, x: np.ndarray) -> float:
        # minimize negative cut weight (maximize cut)
        return -self.cut_weight(x)

    def sample_feasible(self, rng: np.random.Generator, n: int) -> np.ndarray:
        X = np.zeros((n, self.d), dtype=np.int8)
        for i in range(n):
            idx = rng.choice(self.d, size=self.K, replace=False)
            X[i, idx] = 1
        return X

    def info(self) -> Dict:
        return {"name": self.name, "d": self.d, "K": self.K, "seed": self.seed, "weight_scale": self.weight_scale}
    def print_results(self,x):
        W=0
        cnt=0
        for i in range(self.d):
            cnt+=x[i]
            for j in range(i+1,self.d):
                if x[i]==x[j]:
                    continue
                W+=self.W[i][j]
        print('Weight:',W,'Vertices Count:',cnt)
    def brute_force_optimum(self, max_d: int = 24) -> Optional[Tuple[np.ndarray, float]]:
        """Compute exact optimum by enumerating all C(d,K) subsets if small."""
        if self.d > max_d:
            return None
        best_x = None
        best_y = float("inf")
        for comb in combinations(range(self.d), self.K):
            x = np.zeros(self.d, dtype=np.int8)
            x[list(comb)] = 1
            y = self.oracle(x)
            if y < best_y:
                best_y = y
                best_x = x
        if best_x is None:
            return None
        return best_x, float(best_y)

def get_maxcut_qubo(P:MaxCutCardinalityBenchmark):
    n=P.d
    W=P.W
    K=P.K
    Q=DynamicMatrix(n)
    for i in range(n):
        for j in range(i+1,n):
            W[i][j]=1
            #-(x(i)*(1-x(j))+x(j)*(1-x(i)))*W[i][j]
            Q[i,j]+=2*W[i][j]
            Q[i,i]-=W[i][j]
            Q[j,j]-=W[i][j]
    add_equality(Q,np.ones(n),[i for i in range(n)],K)
    make_symetric(Q)
    return Q.get_qubo()

onehot_qubo.py

In [42]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence

import numpy as np

#from ..constraints import Constraint, OneHotGroupsConstraint
#from ..qubo import qubo_energy


def _build_groups(group_sizes: Sequence[int]) -> List[np.ndarray]:
    groups: List[np.ndarray] = []
    off = 0
    for s in group_sizes:
        s = int(s)
        if s <= 0:
            raise ValueError("group_sizes must be positive")
        groups.append(np.arange(off, off + s, dtype=np.int64))
        off += s
    return groups


@dataclass
class OneHotQUBOBenchmark:
    """Synthetic constrained QUBO with one-hot groups.

    Variables are partitioned into groups G1..Gm, constraint is sum_{i in Gk} x_i == 1.

    Oracle (minimize): random QUBO energy. Optionally "plant" a feasible solution by
    adding linear biases favoring one chosen index per group.
    """

    group_sizes: Sequence[int]
    seed: int = 0
    interaction_scale: float = 1.0
    planted_bias: float = 2.0  # >0 encourages planted indices (via negative linear terms)

    def __post_init__(self):
        self.groups = _build_groups(self.group_sizes)
        self.d = int(sum(int(s) for s in self.group_sizes))

        rng = np.random.default_rng(self.seed)

        # random upper-triangular Q (diagonal = linear, off-diagonal = pairwise)
        Q = rng.normal(loc=0.0, scale=float(self.interaction_scale), size=(self.d, self.d)).astype(np.float64)
        Q = np.triu(Q, 0)

        # optional planted feasible x*: pick one index per group and bias it lower
        x_star = np.zeros((self.d,), dtype=np.int8)
        for g in self.groups:
            j = int(rng.choice(g))
            x_star[j] = 1
            Q[j, j] += -abs(float(self.planted_bias))  # lower energy when x_j=1

        self.Q = Q
        self.const = 0.0
        self.x_star = x_star
        self.name = f"onehot_qubo_groups{len(self.groups)}_d{self.d}_seed{self.seed}"

    def n_vars(self) -> int:
        return self.d

    def constraint(self) -> Optional[Constraint]:
        return OneHotGroupsConstraint(groups=self.groups)

    def oracle(self, x: np.ndarray) -> float:
        x = np.asarray(x, dtype=np.int8).reshape(1, -1)
        return float(qubo_energy(x, self.Q, self.const)[0])

    def sample_feasible(self, rng: np.random.Generator, n: int) -> np.ndarray:
        X = np.zeros((int(n), self.d), dtype=np.int8)
        for t in range(int(n)):
            for g in self.groups:
                j = int(rng.choice(g))
                X[t, j] = 1
        return X

    def info(self) -> Dict:
        return {
            "name": self.name,
            "d": self.d,
            "seed": self.seed,
            "group_sizes": [int(s) for s in self.group_sizes],
            "interaction_scale": float(self.interaction_scale),
            "planted_bias": float(self.planted_bias),
        }
    def print_results(self,x):
        cnt=0
        for g in self.groups:
            s=0
            for i in g:
                s+=x[int(i)]
            if(s!=1):
                cnt+=1
        print('violated:',cnt,'/',len(self.groups))
def get_onehot_qubo(P:OneHotQUBOBenchmark):
    groups=P.groups
    n=P.d
    Q=DynamicMatrix(n)

    for g in groups:
        add_equality(Q,np.ones(len(g)),g,1)
    return Q.get_qubo()


portfolio.py

In [43]:
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Optional
import numpy as np

#from constraints import Constraint, CardinalityConstraint


@dataclass
class PortfolioBenchmark:
    """Portfolio optimization: Select K assets maximizing return, minimizing risk with diversification."""

    d: int = 50
    K: int = 10
    seed: int = 0
    return_scale: float = 0.1
    risk_scale: float = 0.3
    diversity_weight: float = 0.1
    risk_weight: float = 5.0

    def __post_init__(self):
        rng = np.random.default_rng(self.seed)

        self.returns = rng.normal(loc=0.05, scale=self.return_scale, size=self.d)

        self.risks = rng.uniform(low=0.1, high=self.risk_scale, size=self.d)

        self.correlation = self._generate_correlation_matrix(rng, self.d)
        self.covariance = np.outer(self.risks, self.risks) * self.correlation

        self.name = f"portfolio_d{self.d}_K{self.K}_seed{self.seed}"

    def _generate_correlation_matrix(self, rng: np.random.Generator, size: int) -> np.ndarray:

        correlation = rng.uniform(low=-1, high=1, size=(size, size))
        correlation = (correlation + correlation.T) / 2
        np.fill_diagonal(correlation, 1.0)

        return correlation

    def n_vars(self) -> int:
        return self.d

    def constraint(self) -> Optional[Constraint]:
        return CardinalityConstraint(K=self.K)

    def portfolio_return(self, x: np.ndarray) -> float:
        return float(np.dot(self.returns, x.astype(np.float64)))

    def portfolio_risk(self, x: np.ndarray) -> float:
        x_float = x.astype(np.float64)
        return float(x_float.T @ self.covariance @ x_float)

    def portfolio_diversification(self, x: np.ndarray) -> float:
        n_selected = np.sum(x)
        if n_selected <= 1:
            return 0.0
        #simple 1-1/n rule as we are putting same amount of money in each asset
        return float(1.0 - 1.0 / n_selected)

    def oracle(self, x: np.ndarray) -> float:

        x = np.asarray(x, dtype=np.int8)

        ret = self.portfolio_return(x)
        risk = self.portfolio_risk(x)
        div = self.portfolio_diversification(x)
        return -ret + self.risk_weight * risk - self.diversity_weight * div

    def sample_feasible(self, rng: np.random.Generator, n: int) -> np.ndarray:
        """Generate n feasible portfolios (exactly K assets selected)."""
        if self.K > self.d or self.K < 0:
            raise ValueError(f"Cannot select K={self.K} assets from d={self.d}")

        X = np.zeros((int(n), self.d), dtype=np.int8)
        for i in range(int(n)):
            indices = select_k_from_n(self.d,self.K,rng)
            X[i, indices] = 1
        return X
    def info(self) -> Dict:
        return {
            "name": self.name,
            "d": self.d,
            "K": self.K,
            "seed": self.seed,
            "avg_return": float(np.mean(self.returns)),
            "avg_risk": float(np.mean(self.risks)),
            "return_scale": self.return_scale,
            "risk_scale": self.risk_scale,
            "diversity_weight": self.diversity_weight,
            "risk_weight": self.risk_weight,
        }

loop.py

In [44]:
from __future__ import annotations

import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import numpy as np
import pandas as pd
import yaml

#from .benchmarks import KnapsackBenchmark, MaxCutCardinalityBenchmark, OneHotQUBOBenchmark
#from .constraints import Constraint, batch_is_feasible
#from .data import DataBuffer
#from .fm import FMTrainConfig, fm_predict_proba, fm_predict_reg, fm_to_qubo, train_fm_classifier, train_fm_regression
#from .surrogate import SurrogateObjective
#from .utils import ensure_dir, save_json, set_global_seed, topk_unique, select_diverse_topk, spearmanr_np
#from .solvers import CEMSolver, ProtesSolver, RandomSolver, has_protes
#from .solvers.sa_solver import SASolver, has_sa
#from .solvers.exact_enum_solver import ExactEnumSolver
#from .solvers.tabu_solver import TabuSolver, has_tabu
#from .solvers.qbsolv_solver import QBSolvSolver, QBSolvConfig, has_qbsolv


def build_benchmark(cfg: Dict[str, Any]):
    kind = cfg["kind"].lower()
    if kind == "maxcut_cardinality":
        return MaxCutCardinalityBenchmark(
            d=int(cfg.get("d", 60)),
            K=int(cfg.get("K", 20)),
            seed=int(cfg.get("seed", 0)),
            weight_scale=float(cfg.get("weight_scale", 1.0)),
        )
    if kind == "knapsack":
        return KnapsackBenchmark(
            d=int(cfg.get("d", 200)),
            seed=int(cfg.get("seed", 0)),
            capacity_ratio=float(cfg.get("capacity_ratio", 0.35)),
        )
    if kind == "onehot_qubo":
        return OneHotQUBOBenchmark(
            group_sizes=cfg.get("group_sizes", [5, 5, 5, 5]),
            seed=int(cfg.get("seed", 0)),
            interaction_scale=float(cfg.get("interaction_scale", 1.0)),
            planted_bias=float(cfg.get("planted_bias", 2.0)),
        )
    if kind == "portfolio":
        return PortfolioBenchmark(
            d=int(cfg.get("d", 50)),
            K=int(cfg.get("K", 10)),
            seed=int(cfg.get("seed", 0)),
            return_scale=float(cfg.get("return_scale", 0.1)),
            risk_scale=float(cfg.get("risk_scale", 0.3)),
            diversity_weight=float(cfg.get("diversity_weight", 0.1)),
            risk_weight=float(cfg.get("risk_weight", 5.0)),
        )
    raise ValueError(f"Unknown benchmark kind: {kind}")


def build_solver(cfg: Dict[str, Any], *, bench=None, cons=None):
    kind = cfg["kind"].lower()

    if kind == "protes":
        if not has_protes():
            print("[warn] solver.kind=protes but protes is not installed; falling back to CEM")
            kind = "cem"
        else:
            return ProtesSolver(
                batch_size=int(cfg.get("batch_size", 256)),
                elite_size=int(cfg.get("elite_size", 20)),
                k_gd=int(cfg.get("k_gd", 1)),
                lr=float(cfg.get("lr", 5e-2)),
                r=int(cfg.get("r", 5)),
                log=bool(cfg.get("log", False)),
            )

    if kind == "cem":
        return CEMSolver(
            batch_size=int(cfg.get("batch_size", 512)),
            elite_frac=float(cfg.get("elite_frac", 0.1)),
            n_iters=int(cfg.get("n_iters", 50)),
            lr=float(cfg.get("lr", 0.7)),
            init_p=float(cfg.get("init_p", 0.5)),
        )

    if kind == "random":
        return RandomSolver(p_one=float(cfg.get("p_one", 0.5)))

    if kind == "exact_enum":
        return ExactEnumSolver(
            max_d=int(cfg.get("max_d", 24)),
            batch_eval=int(cfg.get("batch_eval", 8192)),
        )

    if kind == "sa":
        if not has_sa():
            raise RuntimeError("solver.kind=sa requires 'dimod' and 'dwave-neal'. Install: pip install dimod dwave-neal")
        return SASolver(
            num_sweeps=int(cfg.get("num_sweeps", 2000)),
            beta_range=tuple(cfg["beta_range"]) if "beta_range" in cfg and cfg["beta_range"] is not None else None,
        )

    if kind == "tabu":
        if not has_tabu():
            raise RuntimeError("solver.kind=tabu requires 'dimod' and 'dwave-tabu'. Install: pip install dimod dwave-tabu")
        return TabuSolver(
            timeout=int(cfg.get("timeout", 1000)),
            tenure=int(cfg["tenure"]) if "tenure" in cfg and cfg["tenure"] is not None else None,
        )

    if kind == "qbsolv":
        if not has_qbsolv():
            raise RuntimeError("solver.kind=qbsolv requires 'dimod' and 'dwave-samplers'. Install: pip install dimod dwave-samplers")
        qbcfg = QBSolvConfig(
            subproblem_size=int(cfg.get("subproblem_size", 400)),
            max_outer_iters=int(cfg.get("max_outer_iters", 50)),
            max_no_improve=int(cfg.get("max_no_improve", 10)),
            tol=float(cfg.get("tol", 0.0)),
            sub_num_reads=int(cfg.get("sub_num_reads", 200)),
            sub_num_sweeps=int(cfg.get("sub_num_sweeps", 1000)),
            sub_beta_range=tuple(cfg["sub_beta_range"]) if cfg.get("sub_beta_range") is not None else None,
            polish_with_steepest_descent=bool(cfg.get("polish_with_steepest_descent", True)),
            candidates_per_subproblem=int(cfg.get("candidates_per_subproblem", 25)),
        )
        return QBSolvSolver(cfg=qbcfg)

    raise ValueError(f"Unknown solver kind: {kind}")


def init_dataset(
    bench,
    cons: Optional[Constraint],
    cfg: Dict[str, Any],
    rng: np.random.Generator,
) -> DataBuffer:
    data = DataBuffer()

    mode = cfg.get("mode", "generate").lower()
    if mode == "generate":
        n0 = int(cfg.get("n_feasible", 200))
        X0 = bench.sample_feasible(rng, n0)
        #comm
        for x in X0:
            y = bench.oracle(x)
            data.add_point(x, feasible=True, y=y)

        # Add some infeasible samples for the classifier (optional but recommended)
        n_neg = int(cfg.get("n_infeasible", 250))#EDIT
        d = bench.n_vars()
        for _ in range(n_neg):
            x = (rng.random(d) < 0.5).astype(np.int8)
            feasible = True if cons is None else cons.is_feasible(x)
            data.add_clf(x, 1 if feasible else 0)

    elif mode == "load_npz":
        path = Path(cfg["path"])
        arr = np.load(path)
        X = arr["X"].astype(np.int8)
        y = arr["y"].astype(np.float64)
        if X.ndim != 2:
            raise ValueError("X in npz must be 2D")
        if len(X) != len(y):
            raise ValueError("X and y sizes mismatch in npz")
        for xi, yi in zip(X, y):
            # assume loaded points are feasible
            data.add_point(xi, feasible=True, y=float(yi))

        # Optionally add negatives
        n_neg = int(cfg.get("n_infeasible", 0))
        d = X.shape[1]
        for _ in range(n_neg):
            x = (rng.random(d) < 0.5).astype(np.int8)
            feasible = True if cons is None else cons.is_feasible(x)
            data.add_clf(x, 1 if feasible else 0)

    else:
        raise ValueError(f"Unknown init_dataset.mode: {mode}")

    return data


def run_experiment(config: Dict[str, Any], out_dir: str | Path) -> Path:
    out_dir = ensure_dir(out_dir)

    seed = int(config.get("seed", 0))
    set_global_seed(seed)
    rng = np.random.default_rng(seed)

    print("building benchmark")
    bench = build_benchmark(config["benchmark"])

    print("building constraints")
    cons = bench.constraint()

    # Save config + benchmark info
    print("save benchmark info to",out_dir,"/benchmark.json")
    save_json(out_dir / "benchmark.json", bench.info())

    # Initial data
    print("init dataset")
    data = init_dataset(bench, cons, config.get("init_dataset", {}), rng)

    d = bench.n_vars()

    # Training configs
    fm_reg_cfg = FMTrainConfig(**config.get("fm_reg", {}))
    fm_clf_cfg = FMTrainConfig(**config.get("fm_clf", {}))

    # Loop configs
    n_iters = int(config.get("n_iters", 30))
    top_k = int(config.get("top_k", 20))
    solver_budget = int(config.get("solver_budget", 5000))
    candidate_pool_k = int(config.get("candidate_pool_k", 500))  # top unique from solver pool

    rho = float(config.get("penalty", {}).get("rho", 0.0))
    penalty_cfg = config.get("penalty", {})
    rho_adapt = bool(penalty_cfg.get("adaptive", False))
    rho_target = float(penalty_cfg.get("target_feasible_rate", 0.3))
    rho_grow = float(penalty_cfg.get("grow", 2.0))
    rho_shrink = float(penalty_cfg.get("shrink", 0.9))
    rho_min = float(penalty_cfg.get("min_rho", 0.0))
    rho_max = float(penalty_cfg.get("max_rho", 1e9))

    alpha = float(config.get("feasibility_term", {}).get("alpha", 0.0))
    use_clf = bool(config.get("feasibility_term", {}).get("enabled", False))
    min_clf_points = int(config.get("feasibility_term", {}).get("min_points", 200))

    cand_cfg = config.get("candidate_selection", {})
    min_hamming = int(cand_cfg.get("min_hamming", 0))

    print("building solver:",config["solver"])
    solver = build_solver(config["solver"], bench=bench, cons=cons)

    # Tracking best
    best_y = float("inf")
    best_x = None
    oracle_calls = 0

    rows = []
    t00=time.time()
    for it in range(n_iters):
        print("iteration:",it,"/",n_iters)
        t0 = time.time()

        # --- Train FM regression (feasible only)
        X_reg, y_reg = data.get_reg_arrays()
        if X_reg.shape[0] < 2:
            raise RuntimeError("Not enough feasible data to train regression FM")
        print("training FM!")
        fm_reg, reg_info = train_fm_regression(
            X_reg, y_reg, fm_reg_cfg, seed=seed + 1000 + it
        )
        print("getting qubo!")
        Q, const = fm_to_qubo(fm_reg)

        # Surrogate quality (cheap diagnostics on training set)
        try:
            y_hat = fm_predict_reg(fm_reg, X_reg).astype(np.float64)
            ss_res = float(np.sum((y_reg - y_hat) ** 2))
            ss_tot = float(np.sum((y_reg - float(np.mean(y_reg))) ** 2) + 1e-12)
            reg_info = {
                **reg_info,
                "surrogate_r2_train": float(1.0 - ss_res / ss_tot),
                "surrogate_spearman_train": float(spearmanr_np(y_reg, y_hat)),
            }
        except Exception:
            pass

        # --- Train feasibility classifier (optional)
        p_feasible_fn = None
        clf_info = {}
        if use_clf and data.size_clf() >= min_clf_points:
            X_clf, y_clf = data.get_clf_arrays()
            fm_clf, clf_info = train_fm_classifier(
                X_clf, y_clf, fm_clf_cfg, seed=seed + 2000 + it
            )
            p_feasible_fn = lambda X: fm_predict_proba(fm_clf, X)

        # --- Build surrogate objective for solver
        surrogate = SurrogateObjective(
            Q=Q,
            const=const,
            constraint=cons,
            rho=rho,
            p_feasible=p_feasible_fn,
            alpha=alpha,
        )

        # --- Solve surrogate with PROTES (or chosen solver)
        print("solving...!")
        result = solver.solve(
            objective=surrogate,
            d=d,
            budget=solver_budget,
            pool_size=candidate_pool_k,
            seed=seed + 3000 + it,
        )

        # Choose top candidates from evaluated pool (unique)
        X_pool, y_pool = result.X_pool, result.y_pool

        if len(X_pool) == 0:
            # fallback: at least evaluate returned best
            X_pool = result.X_best.reshape(1, -1)
            y_pool = surrogate(X_pool)

        X_top, y_top = topk_unique(X_pool, y_pool, k=candidate_pool_k)

        # Diverse selection for oracle queries (optional)
        if min_hamming > 0:
            X_query, _ = select_diverse_topk(X_top, y_top, k=top_k, min_hamming=min_hamming)
        else:
            X_query = X_top[:top_k] if len(X_top) >= top_k else X_top

        # --- Query oracle (feasible only), always log feasibility
        feas_mask = batch_is_feasible(X_query, cons)
        n_feas = int(np.sum(feas_mask))
        n_infeas = int(len(X_query) - n_feas)
        feas_rate = float(n_feas / max(1, len(X_query)))

        # Optional adaptive penalty update (based on observed feasibility)
        if rho_adapt and cons is not None:
            if feas_rate < rho_target:
                rho = min(rho_max, max(rho_min, rho * rho_grow))
            else:
                rho = min(rho_max, max(rho_min, rho * rho_shrink))

        # Add to classifier dataset
        for x, ok in zip(X_query, feas_mask):
            data.add_clf(x, 1 if bool(ok) else 0)

        print("evaluating oracles!")
        # Evaluate oracle on feasible only
        y_oracle_new = []
        for x, ok in zip(X_query, feas_mask):
            if not bool(ok):
                #print(sum(x[0:60]),sum(x[60:160]),sum(x[160:170]),sum(x))
                continue

            y = bench.oracle(x)
            oracle_calls += 1
            data.add_reg(x, y)
            y_oracle_new.append(float(y))
            #print('got this!')
            if y < best_y:
                best_y = float(y)
                best_x = x.copy()

        dt = time.time() - t0

        row = {
            "iter": it,
            "n_reg": data.size_reg(),
            "n_clf": data.size_clf(),
            "oracle_calls": oracle_calls,
            "query_k": int(len(X_query)),
            "query_feasible": n_feas,
            "query_infeasible": n_infeas,
            "query_feasible_rate": float(feas_rate),
            "rho": float(rho),
            "best_y": best_y,
            "time_sec": dt,
            **reg_info,
            **clf_info,
            **result.info,
        }
        rows.append(row)

        print(
            f"[iter {it:03d}] best_y={best_y:.6g} | oracle_calls={oracle_calls} | "
            f"feasible {n_feas}/{len(X_query)} | reg_n={data.size_reg()} | {solver.name}"
        )
    print('total time taken by fm_protes is:',time.time()-t00)
    print('save outputs!')
    # Save outputs
    hist = pd.DataFrame(rows)
    hist.to_csv(out_dir / "history.csv", index=False)

    if best_x is None:
        best_x = np.zeros((d,), dtype=np.int8)

    save_json(out_dir / "best.json", {"best_y": best_y, "best_x": best_x.tolist(), "oracle_calls": oracle_calls})
    # save config copy as yaml
    (out_dir / "config.yaml").write_text(yaml.safe_dump(config, sort_keys=False), encoding="utf-8")
    print('loop ans')
    print(best_x.tolist())
    x=best_x.tolist()
    bench.print_results(x)
    return out_dir,bench


QuboSolvers

In [45]:
import numpy as np
import time
#https://github.com/dwavesystems/dwave-samplers
from dwave.samplers import SimulatedAnnealingSampler
def DWaveQuboSolver(Q,num_reads=10000):
    t0=time.time()
    sampler = SimulatedAnnealingSampler()
    sampleset = sampler.sample_qubo(Q,num_of_samples=num_reads)
    print('total time taken:',time.time()-t0)
    return sampleset

**Solvers**

In [46]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Dict, Optional

import numpy as np


ObjectiveFn = Callable[[np.ndarray], np.ndarray]

@dataclass
class SolverResult:
    X_pool: np.ndarray          # evaluated candidates (may include infeasible)
    y_pool: np.ndarray          # surrogate energies for X_pool
    X_best: np.ndarray          # best candidate (surrogate)
    y_best: float
    info: Dict[str, float]


class Solver:
    name: str = "base"

    def solve(
        self,
        objective: ObjectiveFn,
        d: int,
        budget: int,
        pool_size: int,
        seed: int,
    ) -> SolverResult:
        raise NotImplementedError

cem_solver.py

In [47]:
from __future__ import annotations

from dataclasses import dataclass

import numpy as np

#from .base import ObjectiveFn, Solver, SolverResult
#from ..utils import topk_unique


@dataclass
class CEMSolver(Solver):
    """Cross-Entropy Method baseline (product Bernoulli distribution).

    This is NOT TT-based, but is a strong simple baseline and a good fallback
    when PROTES is not installed.
    """

    name: str = "cem"
    batch_size: int = 512
    elite_frac: float = 0.1
    n_iters: int = 50
    lr: float = 0.7
    init_p: float = 0.5
    min_p: float = 0.01
    max_p: float = 0.99

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        rng = np.random.default_rng(seed)
        p = np.full((d,), self.init_p, dtype=np.float64)

        # bounded pool (best unique)
        X_keep = np.zeros((0, d), dtype=np.int8)
        y_keep = np.zeros((0,), dtype=np.float64)

        evals = 0
        it = 0
        X_best = np.zeros((d,), dtype=np.int8)
        y_best = float("inf")

        while evals < budget and it < self.n_iters:
            B = min(self.batch_size, budget - evals)
            X = (rng.random((B, d)) < p[None, :]).astype(np.int8)
            y = objective(X).astype(np.float64)
            evals += int(B)

            # track best over all evaluations (even if pool is pruned later)
            j = int(np.argmin(y)) if len(y) else 0
            if len(y) and float(y[j]) < y_best:
                y_best = float(y[j])
                X_best = X[j].copy()

            # merge into pool + prune
            if len(X_keep) == 0:
                X_keep, y_keep = X, y
            else:
                X_keep = np.concatenate([X_keep, X], axis=0)
                y_keep = np.concatenate([y_keep, y], axis=0)

            if pool_size is not None and int(pool_size) > 0:
                cap = int(pool_size)
                if len(X_keep) > 2 * cap:
                    X_keep, y_keep = topk_unique(X_keep, y_keep, k=cap)

            # elite update
            elite_n = max(1, int(self.elite_frac * B))
            elite_idx = np.argsort(y)[:elite_n]
            elite = X[elite_idx]
            p_new = np.mean(elite, axis=0)
            p = (1.0 - self.lr) * p + self.lr * p_new
            p = np.clip(p, self.min_p, self.max_p)

            it += 1

        # final prune
        if pool_size is not None and int(pool_size) > 0 and len(X_keep) > int(pool_size):
            X_pool, y_pool = topk_unique(X_keep, y_keep, k=int(pool_size))
        else:
            X_pool, y_pool = X_keep, y_keep

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=float(y_best),
            info={"evaluations": float(evals), "iters": float(it)},
        )


exact_enum_solver.py

In [48]:
from __future__ import annotations

from dataclasses import dataclass

import numpy as np

#from .base import ObjectiveFn, Solver, SolverResult
#from ..utils import topk_unique


def _enumerate_binary(d: int) -> np.ndarray:
    """Return all binary vectors of length d as (2^d, d) uint8."""
    n = 1 << int(d)
    I = np.arange(n, dtype=np.uint32)[:, None]
    bits = (I >> np.arange(d, dtype=np.uint32)[None, :]) & 1
    return bits.astype(np.int8)


@dataclass
class ExactEnumSolver(Solver):
    """Exact solver by full enumeration (validation / debugging only).

    Evaluates all 2^d candidates; only feasible for small d.
    """

    name: str = "exact_enum"
    max_d: int = 24
    batch_eval: int = 8192  # objective batch size to avoid huge temporary allocations

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        d = int(d)
        if d > int(self.max_d):
            raise RuntimeError(f"ExactEnumSolver: d={d} exceeds max_d={self.max_d} (2^d too large).")

        X_all = _enumerate_binary(d)
        n = int(len(X_all))

        # budget is ignored by design (this is exact); report true eval count
        y_all = np.empty((n,), dtype=np.float64)
        for s in range(0, n, int(self.batch_eval)):
            xb = X_all[s : s + int(self.batch_eval)]
            y_all[s : s + len(xb)] = objective(xb).astype(np.float64)

        idx = int(np.argmin(y_all)) if n else 0
        X_best = X_all[idx].copy() if n else np.zeros((d,), dtype=np.int8)
        y_best = float(y_all[idx]) if n else float("inf")

        if pool_size is not None and int(pool_size) > 0 and n > int(pool_size):
            X_pool, y_pool = topk_unique(X_all, y_all, k=int(pool_size))
        else:
            X_pool, y_pool = X_all, y_all

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=y_best,
            info={"evaluations": float(n), "ignored_budget": float(budget)},
        )


protes_solver.py

In [49]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List

import numpy as np

#from .base import ObjectiveFn, Solver, SolverResult
#from ..utils import topk_unique

try:
    from protes import protes as _protes
    _HAS_PROTES = True
except Exception:
    _HAS_PROTES = False
    _protes = None


def has_protes() -> bool:
    return _HAS_PROTES


@dataclass
class ProtesSolver(Solver):
    """PROTES solver wrapper.

    Requires:
        pip install protes==0.3.12

    API reference:
        https://pypi.org/project/protes/
        https://github.com/anabatsh/PROTES
    """

    name: str = "protes"
    batch_size: int = 256   # k
    elite_size: int = 20    # k_top
    k_gd: int = 1
    lr: float = 5e-2
    r: int = 5
    log: bool = False

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        if not _HAS_PROTES:
            raise RuntimeError(
                "PROTES is not installed. Install with: pip install protes==0.3.12 "
                "(or switch solver.kind to 'cem')."
            )

        info: Dict[str, Any] = {}

        # Keep a bounded pool (best unique) to avoid storing all samples when budgets get large.
        X_keep = np.zeros((0, d), dtype=np.int8)
        y_keep = np.zeros((0,), dtype=np.float64)
        evals_total = 0

        def f_batch(I):
            nonlocal X_keep, y_keep, evals_total
            X = np.array(I, dtype=np.int8)
            y = objective(X).astype(np.float64)
            evals_total += int(len(X))

            if pool_size is not None and int(pool_size) > 0:
                # merge + prune (keep a bit more before pruning to amortize)
                if len(X_keep) == 0:
                    X_keep, y_keep = X, y
                else:
                    X_keep = np.concatenate([X_keep, X], axis=0)
                    y_keep = np.concatenate([y_keep, y], axis=0)

                cap = int(pool_size)
                if len(X_keep) > 2 * cap:
                    X_keep, y_keep = topk_unique(X_keep, y_keep, k=cap)
            else:
                # store everything (original behavior)
                X_keep = np.concatenate([X_keep, X], axis=0) if len(X_keep) else X
                y_keep = np.concatenate([y_keep, y], axis=0) if len(y_keep) else y

            return y

        i_opt, y_opt = _protes(
            f=f_batch,
            d=int(d),
            n=2,
            m=int(budget),
            k=int(self.batch_size),
            k_top=int(self.elite_size),
            k_gd=int(self.k_gd),
            lr=float(self.lr),
            r=int(self.r),
            seed=int(seed),
            log=bool(self.log),
            info=info,
        )

        # Final prune to pool_size (if requested)
        if pool_size is not None and int(pool_size) > 0 and len(X_keep) > int(pool_size):
            X_pool, y_pool = topk_unique(X_keep, y_keep, k=int(pool_size))
        else:
            X_pool, y_pool = X_keep, y_keep

        X_best = np.array(i_opt, dtype=np.int8).reshape((d,))
        y_best = float(y_opt)

        out_info = {"evaluations": float(evals_total), "returned_y": float(y_opt)}
        for k, v in info.items():
            if isinstance(v, (int, float, np.number)):
                out_info[f"protes_{k}"] = float(v)

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=y_best,
            info=out_info,
        )


qbsolv_solver.py

In [50]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np

#from .base import ObjectiveFn, Solver, SolverResult
#from ..constraints import CardinalityConstraint
#from ..utils import topk_unique

try:
    import dimod  # noqa: F401
    from dwave.samplers import SimulatedAnnealingSampler, SteepestDescentSolver
    _HAS_QBSOLV = True
except Exception:
    _HAS_QBSOLV = False
    SimulatedAnnealingSampler = None  # type: ignore
    SteepestDescentSolver = None  # type: ignore


def has_qbsolv() -> bool:
    return _HAS_QBSOLV


def _qubo_from_upper(Q: np.ndarray) -> Dict[Tuple[int, int], float]:
    Q = np.asarray(Q, dtype=np.float64)
    d = int(Q.shape[0])
    qubo: Dict[Tuple[int, int], float] = {}
    for i in range(d):
        v = float(Q[i, i])
        if v:
            qubo[(i, i)] = v
    for i in range(d):
        for j in range(i + 1, d):
            v = float(Q[i, j])
            if v:
                qubo[(i, j)] = v
    return qubo


def _add_cardinality_penalty_inplace(
    qubo: Dict[Tuple[int, int], float],
    *,
    d: int,
    K: int,
    rho: float,
) -> float:
    # rho*(sum x - K)^2, where (sum x)^2 = sum x_i + 2*sum_{i<j} x_i x_j (binary)
    lin = float(rho) * (1.0 - 2.0 * float(K))
    for i in range(int(d)):
        qubo[(i, i)] = float(qubo.get((i, i), 0.0) + lin)

    quad = 2.0 * float(rho)
    if quad:
        for i in range(int(d)):
            for j in range(i + 1, int(d)):
                qubo[(i, j)] = float(qubo.get((i, j), 0.0) + quad)

    return float(rho) * float(K) * float(K)


@dataclass(frozen=True)
class QBSolvConfig:
    subproblem_size: int = 400
    max_outer_iters: int = 50
    max_no_improve: int = 10
    tol: float = 0.0

    # subQUBO optimizer = Simulated Annealing
    sub_num_reads: int = 200
    sub_num_sweeps: int = 1000
    sub_beta_range: Optional[Tuple[float, float]] = None

    # Optional polish
    polish_with_steepest_descent: bool = True

    # How many candidate full solutions to keep per subproblem (limits memory)
    candidates_per_subproblem: int = 25


@dataclass
class QBSolvSolver(Solver):
    """QBSOLV-style decomposition baseline (NO tabu), subsolver = SA.

    Works on a QUBO/BQM; in this repo it supports:
      - pure FM QUBO
      - + cardinality penalty rho*(sum-K)^2
    Not supported:
      - feasibility classifier term (-alpha*log p)
      - non-quadratic violations (e.g., knapsack hinge)
    """

    name: str = "qbsolv"
    cfg: QBSolvConfig = QBSolvConfig()

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        if not _HAS_QBSOLV:
            raise RuntimeError("QBSolvSolver requires 'dimod' and 'dwave-samplers'. Install: pip install dimod dwave-samplers")

        # Expect SurrogateObjective-like object
        Q = getattr(objective, "Q", None)
        const = float(getattr(objective, "const", 0.0))
        constraint = getattr(objective, "constraint", None)
        rho = float(getattr(objective, "rho", 0.0))
        alpha = float(getattr(objective, "alpha", 0.0))
        p_feasible = getattr(objective, "p_feasible", None)

        if Q is None:
            raise ValueError("QBSolvSolver requires objective.Q (expected SurrogateObjective).")
        if p_feasible is not None and alpha != 0.0:
            raise RuntimeError("QBSolvSolver cannot include -alpha*log(p_feasible); disable feasibility_term for qbsolv.")
        if constraint is not None and rho != 0.0 and not isinstance(constraint, CardinalityConstraint):
            raise RuntimeError(
                "QBSolvSolver only supports quadratic penalties in this repo. "
                "Supported: CardinalityConstraint with rho*(sum-K)^2."
            )

        import dimod  # local import

        qubo = _qubo_from_upper(np.asarray(Q, dtype=np.float64))
        offset = float(const)
        if isinstance(constraint, CardinalityConstraint) and rho != 0.0:
            offset += _add_cardinality_penalty_inplace(qubo, d=int(d), K=int(constraint.K), rho=float(rho))

        bqm = dimod.BinaryQuadraticModel.from_qubo(qubo, offset=offset)

        rng = np.random.default_rng(int(seed))
        n = int(d)

        # caches for fast energy + impact ordering
        linear = np.zeros(n, dtype=np.float64)
        for v, bias in bqm.linear.items():
            linear[int(v)] = float(bias)

        adj: list[list[tuple[int, float]]] = [[] for _ in range(n)]
        for (u, v), w in bqm.quadratic.items():
            iu = int(u); iv = int(v)
            ww = float(w)
            adj[iu].append((iv, ww))
            adj[iv].append((iu, ww))

        def energy_full(x: np.ndarray) -> float:
            e = float(bqm.offset) + float(np.dot(linear, x.astype(np.float64)))
            for i in range(n):
                xi = int(x[i])
                if not xi:
                    continue
                for j, w in adj[i]:
                    if j > i and x[j]:
                        e += float(w)
            return float(e)

        def order_by_impact(x: np.ndarray) -> np.ndarray:
            field = linear.copy()
            # field_i = h_i + sum_j J_ij x_j
            for i in range(n):
                if not x[i]:
                    continue
                for j, w in adj[i]:
                    field[j] += float(w)
            delta = (1.0 - 2.0 * x.astype(np.float64)) * field
            return np.argsort(-np.abs(delta))

        def decompose_subbqm(subset: np.ndarray, xbest_fixed: np.ndarray) -> "dimod.BinaryQuadraticModel":
            in_subset = np.zeros(n, dtype=bool)
            in_subset[subset] = True

            lin: Dict[int, float] = {}
            quad: Dict[Tuple[int, int], float] = {}

            for iu in subset.astype(int):
                bias_u = float(linear[iu])
                for iv, w in adj[iu]:
                    if in_subset[iv]:
                        if iu < iv:
                            quad[(iu, iv)] = quad.get((iu, iv), 0.0) + float(w)
                    else:
                        if xbest_fixed[iv]:
                            bias_u += float(w)
                lin[iu] = bias_u

            return dimod.BinaryQuadraticModel(lin, quad, 0.0, dimod.BINARY)

        sa = SimulatedAnnealingSampler()
        sd = SteepestDescentSolver() if bool(self.cfg.polish_with_steepest_descent) else None

        # Initialize Xbest
        Xbest = rng.integers(0, 2, size=n, dtype=np.int8)
        best_energy = energy_full(Xbest)

        index = order_by_impact(Xbest)

        X_pool = np.zeros((0, n), dtype=np.int8)
        y_pool = np.zeros((0,), dtype=np.float64)

        no_improve = 0
        outer_done = 0

        for outer in range(1, int(self.cfg.max_outer_iters) + 1):
            outer_done = outer
            X = Xbest.copy()
            E = float(best_energy)

            for start in range(0, n, int(self.cfg.subproblem_size)):
                subset = index[start : start + int(self.cfg.subproblem_size)]
                if subset.size == 0:
                    continue

                sub_bqm = decompose_subbqm(subset, Xbest)

                # seed initial state from Xbest restricted to sub vars
                init = {int(v): int(Xbest[int(v)]) for v in sub_bqm.variables}
                init_ss = dimod.SampleSet.from_samples(init, vartype=dimod.BINARY, energy=0.0)

                sa_kwargs = dict(
                    num_reads=int(self.cfg.sub_num_reads),
                    num_sweeps=int(self.cfg.sub_num_sweeps),
                    seed=int(seed),
                    initial_states=init_ss,
                    initial_states_generator="tile",
                )
                if self.cfg.sub_beta_range is not None:
                    sa_kwargs["beta_range"] = list(self.cfg.sub_beta_range)

                ss = sa.sample(sub_bqm, **sa_kwargs)
                if sd is not None:
                    ss = sd.sample(sub_bqm, initial_states=ss)

                # take a few best candidates from this subproblem and lift to full X for the pool
                R = int(min(int(self.cfg.candidates_per_subproblem), len(ss)))
                if R > 0 and pool_size is not None and int(pool_size) > 0:
                    # ss is already sorted by energy (lowest first) for these samplers
                    for r in range(R):
                        x_cand = X.copy()
                        sub_sample = ss.record.sample[r]
                        # ss.variables is the variable order in record.sample columns
                        for col, v in enumerate(ss.variables):
                            x_cand[int(v)] = int(sub_sample[col])
                        e_cand = energy_full(x_cand)
                        X_pool = np.concatenate([X_pool, x_cand.reshape(1, -1)], axis=0)
                        y_pool = np.concatenate([y_pool, np.array([e_cand], dtype=np.float64)], axis=0)
                    # prune pool periodically
                    cap = int(pool_size)
                    if len(X_pool) > 2 * cap:
                        X_pool, y_pool = topk_unique(X_pool, y_pool, k=cap)

                # best sub-solution -> apply to current X
                best_sub = ss.first.sample
                for v, val in best_sub.items():
                    X[int(v)] = 1 if int(val) else 0
                E = energy_full(X)

            index = order_by_impact(X)

            if E < best_energy - float(self.cfg.tol):
                Xbest = X
                best_energy = float(E)
                no_improve = 0
            else:
                no_improve += 1
                if no_improve >= int(self.cfg.max_no_improve):
                    break

        # finalize pool
        if pool_size is not None and int(pool_size) > 0 and len(X_pool) > int(pool_size):
            X_pool, y_pool = topk_unique(X_pool, y_pool, k=int(pool_size))

        # ensure non-empty pool (loop expects some candidates)
        if len(X_pool) == 0:
            X_pool = Xbest.reshape(1, -1).copy()
            y_pool = np.array([float(best_energy)], dtype=np.float64)

        idx = int(np.argmin(y_pool)) if len(y_pool) else 0
        X_best = X_pool[idx].copy()
        y_best = float(y_pool[idx])

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=y_best,
            info={
                "evaluations": float(len(X_pool)),  # surrogate energies computed into the returned pool
                "qbsolv_outer_iters": float(outer_done),
            },
        )


random_solver.py

In [51]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional

import numpy as np

#from .base import ObjectiveFn, Solver, SolverResult
#from ..utils import topk_unique


@dataclass
class RandomSolver(Solver):
    """Random search baseline (unconstrained sampling; feasibility handled by surrogate penalty / clf term)."""

    name: str = "random"
    p_one: float = 0.5

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        rng = np.random.default_rng(seed)
        B = int(budget)  # must respect evaluation budget
        X = (rng.random((B, d)) < self.p_one).astype(np.int8)
        y = objective(X).astype(np.float64)

        idx = int(np.argmin(y)) if len(y) else 0
        X_best = X[idx].copy() if len(y) else np.zeros((d,), dtype=np.int8)
        y_best = float(y[idx]) if len(y) else float("inf")

        # bounded pool
        if pool_size is not None and int(pool_size) > 0 and len(X) > int(pool_size):
            X_pool, y_pool = topk_unique(X, y, k=int(pool_size))
        else:
            X_pool, y_pool = X, y

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=y_best,
            info={"evaluations": float(B)},
        )


@dataclass
class RandomFeasibleSolver(Solver):
    """Random feasible sampling baseline (matches guide baseline: 'Random feasible sampling').

    Requires a feasible sampler (typically Benchmark.sample_feasible).
    """

    name: str = "random_feasible"
    sample_feasible: Callable[[np.random.Generator, int], np.ndarray] = None  # (rng, n) -> (n,d) feasible

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        if self.sample_feasible is None:
            raise RuntimeError("RandomFeasibleSolver requires sample_feasible(rng, n).")

        rng = np.random.default_rng(seed)
        B = int(budget)
        X = np.asarray(self.sample_feasible(rng, B), dtype=np.int8)
        if X.ndim != 2 or X.shape[1] != int(d):
            raise ValueError(f"sample_feasible must return (B,d); got {X.shape}")

        y = objective(X).astype(np.float64)

        idx = int(np.argmin(y)) if len(y) else 0
        X_best = X[idx].copy() if len(y) else np.zeros((d,), dtype=np.int8)
        y_best = float(y[idx]) if len(y) else float("inf")

        if pool_size is not None and int(pool_size) > 0 and len(X) > int(pool_size):
            X_pool, y_pool = topk_unique(X, y, k=int(pool_size))
        else:
            X_pool, y_pool = X, y

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=y_best,
            info={"evaluations": float(B)},
        )


sa_solver.py

In [52]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

import numpy as np

#from .base import ObjectiveFn, Solver, SolverResult
#from ..constraints import CardinalityConstraint
#from ..utils import topk_unique

try:
    import dimod  # noqa: F401
    import neal
    _HAS_SA = True
except Exception:
    _HAS_SA = False
    neal = None


def has_sa() -> bool:
    return _HAS_SA


def _qubo_from_upper(Q: np.ndarray) -> Dict[Tuple[int, int], float]:
    """Convert upper-triangular dense Q into a dimod QUBO dict."""
    Q = np.asarray(Q, dtype=np.float64)
    d = Q.shape[0]
    qubo: Dict[Tuple[int, int], float] = {}
    for i in range(d):
        v = float(Q[i, i])
        if v != 0.0:
            qubo[(i, i)] = v
    for i in range(d):
        for j in range(i + 1, d):
            v = float(Q[i, j])
            if v != 0.0:
                qubo[(i, j)] = v
    return qubo


def _add_cardinality_penalty_inplace(
    qubo: Dict[Tuple[int, int], float],
    *,
    d: int,
    K: int,
    rho: float,
) -> float:
    """Add rho*(sum x - K)^2 to QUBO dict in-place. Returns constant offset added."""
    # (sum x - K)^2 = (sum x)^2 - 2K sum x + K^2
    # (sum x)^2 = sum x_i + 2 * sum_{i<j} x_i x_j   for binary.
    # linear: rho*(1 - 2K) per variable
    lin = float(rho) * (1.0 - 2.0 * float(K))
    for i in range(d):
        qubo[(i, i)] = float(qubo.get((i, i), 0.0) + lin)

    # quadratic: 2*rho for each i<j
    quad = 2.0 * float(rho)
    if quad != 0.0:
        for i in range(d):
            for j in range(i + 1, d):
                qubo[(i, j)] = float(qubo.get((i, j), 0.0) + quad)

    # constant offset
    return float(rho) * float(K) * float(K)


@dataclass
class SASolver(Solver):
    """Simulated annealing baseline using dwave-neal.

    Notes:
    - Requires: dimod + dwave-neal
    - Operates on a QUBO/BQM, so it can only handle objectives that are (or can be made) quadratic.
    """

    name: str = "sa"
    num_sweeps: int = 2000
    beta_range: Optional[Tuple[float, float]] = None  # e.g., (0.1, 10.0); None lets neal choose

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        if not _HAS_SA:
            raise RuntimeError("SA solver requires 'dimod' and 'dwave-neal'. Install: pip install dimod dwave-neal")

        # Expect SurrogateObjective-like object (from this repo) so we can build a QUBO
        Q = getattr(objective, "Q", None)
        const = getattr(objective, "const", 0.0)
        constraint = getattr(objective, "constraint", None)
        rho = float(getattr(objective, "rho", 0.0))
        alpha = float(getattr(objective, "alpha", 0.0))
        p_feasible = getattr(objective, "p_feasible", None)

        if Q is None:
            raise ValueError("SASolver requires an objective with attribute 'Q' (expected SurrogateObjective).")

        if p_feasible is not None and alpha != 0.0:
            raise RuntimeError("SASolver cannot include -alpha*log(p_feasible); disable feasibility_term or use another solver.")

        qubo = _qubo_from_upper(np.asarray(Q, dtype=np.float64))
        offset = float(const)

        # Only support constraint penalty if it stays quadratic (cardinality)
        if constraint is not None and rho != 0.0:
            if isinstance(constraint, CardinalityConstraint):
                offset += _add_cardinality_penalty_inplace(qubo, d=int(d), K=int(constraint.K), rho=float(rho))
            else:
                raise RuntimeError(
                    "SASolver only supports penalty constraints that are quadratic. "
                    "Supported: CardinalityConstraint with rho*(sum-K)^2. "
                    "Use CEM/PROTES for non-quadratic violations."
                )

        import dimod  # local import after dependency check

        bqm = dimod.BinaryQuadraticModel.from_qubo(qubo, offset=offset)

        sampler = neal.SimulatedAnnealingSampler()

        num_reads = int(max(1, budget))  # interpret budget as number of returned SA samples
        ss = sampler.sample(
            bqm,
            num_reads=num_reads,
            num_sweeps=int(self.num_sweeps),
            beta_range=self.beta_range,
            seed=int(seed),
        )

        # Variables are integer-labeled 0..d-1, so SampleSet order is deterministic.
        X = ss.record.sample.astype(np.int8, copy=False)
        y = ss.record.energy.astype(np.float64, copy=False)

        # pool pruning
        if pool_size is not None and int(pool_size) > 0 and len(X) > int(pool_size):
            X_pool, y_pool = topk_unique(X, y, k=int(pool_size))
        else:
            X_pool, y_pool = X, y

        idx = int(np.argmin(y_pool)) if len(y_pool) else 0
        X_best = X_pool[idx].copy() if len(y_pool) else np.zeros((int(d),), dtype=np.int8)
        y_best = float(y_pool[idx]) if len(y_pool) else float("inf")

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=y_best,
            info={
                "evaluations": float(num_reads),
                "sa_num_sweeps": float(self.num_sweeps),
            },
        )


tabu_solver.py

In [53]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np

#from .base import ObjectiveFn, Solver, SolverResult
#from ..constraints import CardinalityConstraint
#from ..utils import topk_unique

try:
    import dimod
    from tabu import TabuSampler
    _HAS_TABU = True
except ImportError:
    _HAS_TABU = False
    TabuSampler = None


def has_tabu() -> bool:
    return _HAS_TABU


def _qubo_from_upper(Q: np.ndarray) -> Dict[Tuple[int, int], float]:
    Q = np.asarray(Q, dtype=np.float64)
    d = Q.shape[0]
    qubo: Dict[Tuple[int, int], float] = {}
    # diag
    for i in range(d):
        v = float(Q[i, i])
        if v != 0.0:
            qubo[(i, i)] = v
    # upper off-diag
    for i in range(d):
        for j in range(i + 1, d):
            v = float(Q[i, j])
            if v != 0.0:
                qubo[(i, j)] = v
    return qubo


def _add_cardinality_penalty_inplace(
    qubo: Dict[Tuple[int, int], float],
    *,
    d: int,
    K: int,
    rho: float,
) -> float:
    # rho*(sum x - K)^2 where (sum x)^2 = sum x_i + 2*sum_{i<j} x_i x_j for binary
    lin = float(rho) * (1.0 - 2.0 * float(K))
    for i in range(int(d)):
        qubo[(i, i)] = float(qubo.get((i, i), 0.0) + lin)

    quad = 2.0 * float(rho)
    if quad != 0.0:
        for i in range(int(d)):
            for j in range(i + 1, int(d)):
                qubo[(i, j)] = float(qubo.get((i, j), 0.0) + quad)

    return float(rho) * float(K) * float(K)


@dataclass
class TabuSolver(Solver):
    """Tabu search baseline using dwave-tabu.

    Notes:
    - Requires: dimod + dwave-tabu
    - QUBO-only: cannot include non-quadratic violations or -alpha*log(p_feasible).
    """

    name: str = "tabu"
    timeout: int = 1000  # milliseconds per run (dwave-tabu)
    tenure: Optional[int] = None  # optional tabu tenure; None lets sampler decide

    def solve(self, objective: ObjectiveFn, d: int, budget: int, pool_size: int, seed: int) -> SolverResult:
        if not _HAS_TABU:
            raise RuntimeError("Tabu solver requires 'dimod' and 'dwave-tabu'. Install: pip install dimod dwave-tabu")

        Q = getattr(objective, "Q", None)
        const = getattr(objective, "const", 0.0)
        constraint = getattr(objective, "constraint", None)
        rho = float(getattr(objective, "rho", 0.0))
        alpha = float(getattr(objective, "alpha", 0.0))
        p_feasible = getattr(objective, "p_feasible", None)

        if Q is None:
            raise ValueError("TabuSolver requires an objective with attribute 'Q' (expected SurrogateObjective).")

        if p_feasible is not None and alpha != 0.0:
            raise RuntimeError("TabuSolver cannot include -alpha*log(p_feasible); disable feasibility_term or use another solver.")

        qubo = _qubo_from_upper(np.asarray(Q, dtype=np.float64))
        offset = float(const)

        if constraint is not None and rho != 0.0:
            if isinstance(constraint, CardinalityConstraint):
                offset += _add_cardinality_penalty_inplace(qubo, d=int(d), K=int(constraint.K), rho=float(rho))
            else:
                raise RuntimeError(
                    "TabuSolver only supports quadratic penalty constraints. "
                    "Supported: CardinalityConstraint with rho*(sum-K)^2. "
                    "Use CEM/PROTES for non-quadratic violations."
                )

        import dimod  # local import after dependency check

        bqm = dimod.BinaryQuadraticModel.from_qubo(qubo, offset=offset)
        sampler = TabuSampler()

        num_reads = int(max(1, budget))  # interpret budget as number of returned samples
        kwargs = {"num_reads": num_reads, "timeout": int(self.timeout)}
        if self.tenure is not None:
            kwargs["tenure"] = int(self.tenure)

        ss = sampler.sample(bqm, **kwargs)

        X = ss.record.sample.astype(np.int8, copy=False)
        y = ss.record.energy.astype(np.float64, copy=False)

        if pool_size is not None and int(pool_size) > 0 and len(X) > int(pool_size):
            X_pool, y_pool = topk_unique(X, y, k=int(pool_size))
        else:
            X_pool, y_pool = X, y

        idx = int(np.argmin(y_pool)) if len(y_pool) else 0
        X_best = X_pool[idx].copy() if len(y_pool) else np.zeros((int(d),), dtype=np.int8)
        y_best = float(y_pool[idx]) if len(y_pool) else float("inf")

        return SolverResult(
            X_pool=X_pool,
            y_pool=y_pool,
            X_best=X_best,
            y_best=y_best,
            info={
                "evaluations": float(num_reads),
                "tabu_timeout_ms": float(self.timeout),
            },
        )


Scripts


aggregate_sweep.py

In [54]:
#!/usr/bin/env python
from __future__ import annotations

import argparse
from pathlib import Path

import pandas as pd
import matplotlib.pyplot as plt


def _load_one_history(out_dir: str | Path) -> pd.DataFrame:
    out_dir = Path(out_dir)
    hist_path = out_dir / "history.csv"
    if not hist_path.exists():
        raise FileNotFoundError(f"Missing history.csv at: {hist_path}")
    hist = pd.read_csv(hist_path)
    # required columns in this repo
    need = {"oracle_calls", "best_y"}
    missing = need - set(hist.columns)
    if missing:
        raise ValueError(f"{hist_path} missing columns: {sorted(missing)}")
    return hist[["oracle_calls", "best_y"]].copy()


def aggregate_sweep_main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument("--summary", required=True, help="Path to sweep_summary_*.csv")
    ap.add_argument("--solver", default=None, help="Filter by solver kind (optional)")
    ap.add_argument("--out", default=None, help="Output CSV path (default: alongside summary)")
    ap.add_argument("--plot", action="store_true", help="Also save a PNG plot (median + IQR)")
    args = ap.parse_args()

    summary_path = Path(args.summary)
    df = pd.read_csv(summary_path)

    # expected columns from scripts/run_sweep.py
    need = {"out_dir", "seed", "solver"}
    missing = need - set(df.columns)
    if missing:
        raise ValueError(f"{summary_path} missing columns: {sorted(missing)}")

    if args.solver is not None:
        df = df[df["solver"].astype(str) == str(args.solver)].copy()

    if df.empty:
        raise ValueError("No runs matched (check --solver filter / summary file).")

    rows = []
    for _, r in df.iterrows():
        hist = _load_one_history(r["out_dir"])
        hist["seed"] = int(r["seed"])
        hist["solver"] = str(r["solver"])
        rows.append(hist)

    all_hist = pd.concat(rows, axis=0, ignore_index=True)

    # aggregate per solver (even if only one)
    agg = (
        all_hist.groupby(["solver", "oracle_calls"])["best_y"]
        .agg(
            median="median",
            p25=lambda s: s.quantile(0.25),
            p75=lambda s: s.quantile(0.75),
            n="count",
        )
        .reset_index()
        .sort_values(["solver", "oracle_calls"])
    )

    out_csv = Path(args.out) if args.out else summary_path.with_suffix("").with_name(summary_path.stem + "_agg.csv")
    agg.to_csv(out_csv, index=False)
    print(f"saved: {out_csv}")

    if args.plot:
        # one plot per solver (simple, avoids clutter)
        for solver_kind, g in agg.groupby("solver"):
            x = g["oracle_calls"].to_numpy()
            med = g["median"].to_numpy()
            p25 = g["p25"].to_numpy()
            p75 = g["p75"].to_numpy()

            plt.figure()
            plt.plot(x, med, label=f"{solver_kind} median")
            plt.fill_between(x, p25, p75, alpha=0.2, label="IQR (p25..p75)")
            plt.xlabel("oracle_calls")
            plt.ylabel("best_y (lower is better)")
            plt.title(f"Best feasible objective vs oracle calls ({solver_kind})")
            plt.grid(True)
            plt.legend()

            out_png = out_csv.with_suffix("").with_name(out_csv.stem + f"_{solver_kind}_plot.png")
            plt.savefig(out_png, dpi=150, bbox_inches="tight")
            plt.close()
            print(f"saved: {out_png}")


plot_history.py


In [55]:
#!/usr/bin/env python
from __future__ import annotations

import argparse
from pathlib import Path

import pandas as pd
import matplotlib.pyplot as plt


def plot_history_main(history_file_path:str):
    #ap = argparse.ArgumentParser()
    #ap.add_argument("--history", required=True, help="Path to history.csv")
    #ap.add_argument("--x", default="oracle_calls", choices=["iter", "oracle_calls"], help="x-axis")
    #args = ap.parse_args()

    hist = pd.read_csv(history_file_path)

    x = hist["oracle_calls"].values
    y = hist["best_y"].values

    plt.figure()
    plt.plot(x, y, marker="o")
    plt.xlabel("oracle_calls")
    plt.ylabel("best_y (lower is better)")
    plt.title("Best feasible objective over time")
    plt.grid(True)
    out = Path(history_file_path).with_suffix("").as_posix() + f"_plot_{"oracle_calls"}.png"
    plt.savefig(out, dpi=150, bbox_inches="tight")
    print(f"saved plot to: {out}")


run_sweep.py

In [56]:
#!/usr/bin/env python
from __future__ import annotations

import argparse
import copy
import datetime as dt
from pathlib import Path

import pandas as pd
import yaml

import sys
#sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))

#from fm_protes.loop import run_experiment
#from fm_protes.utils import ensure_dir


def run_sweep_main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--config", required=True, help="Base YAML config")
    ap.add_argument("--seeds", default="0,1,2,3,4", help="Comma-separated seeds")
    ap.add_argument("--solvers", default="protes,cem,random", help="Comma-separated solver kinds")
    ap.add_argument("--out_root", default="results", help="Output root directory")
    args = ap.parse_args()

    base_cfg = yaml.safe_load(Path(args.config).read_text(encoding="utf-8"))
    seeds = [int(s.strip()) for s in args.seeds.split(",") if s.strip()]
    solvers = [s.strip() for s in args.solvers.split(",") if s.strip()]

    out_root = Path(args.out_root)
    ensure_dir(out_root)

    runs = []
    for s in seeds:
        for solver_kind in solvers:
            cfg = copy.deepcopy(base_cfg)
            cfg["seed"] = s
            cfg["solver"]["kind"] = solver_kind
            stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
            run_name = f"{cfg['benchmark']['kind']}_{solver_kind}_seed{s}_{stamp}"
            out_dir = out_root / run_name
            ensure_dir(out_dir)
            run_experiment(cfg, out_dir)

            hist = pd.read_csv(out_dir / "history.csv")
            best_y = float(hist["best_y"].iloc[-1])
            oracle_calls = int(hist["oracle_calls"].iloc[-1])

            runs.append(
                {
                    "run_name": run_name,
                    "solver": solver_kind,
                    "seed": s,
                    "best_y": best_y,
                    "oracle_calls": oracle_calls,
                    "out_dir": str(out_dir),
                }
            )

    df = pd.DataFrame(runs)
    summary_path = out_root / f"sweep_summary_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    df.to_csv(summary_path, index=False)
    print(f"Saved sweep summary: {summary_path}")


run_experiment.py

In [57]:
#!/usr/bin/env python
from __future__ import annotations

import argparse
import datetime as dt
from pathlib import Path

import yaml

# Allow running without installing package:
import sys
#sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))

#from fm_protes.loop import run_experiment
#from fm_protes.utils import ensure_dir
#from fm_protes.solvers import has_protes, has_sa, has_tabu


def run_experiment_main(config_file=None,solver_kind=None,n_iters=None,out=None):
    #ap = argparse.ArgumentParser()
    #ap.add_argument("--config", required=False, help="Path to YAML config")
    #ap.add_argument("--out", default=None, help="Output directory (default: results/<run_name>)")
    #ap.add_argument("--check_deps", action="store_true", help="Print optional dependency status and exit")

    # solver smoke-test helpers
    #ap.add_argument("--solver_kind", default=None, help="Override solver.kind (e.g. sa, tabu, exact_enum)")
    #ap.add_argument("--no_clf", action="store_true", help="Disable feasibility_term (required for sa/tabu here)")
    #ap.add_argument("--n_iters", type=int, default=None, help="Override n_iters (for multi-point histories)")

    #args = ap.parse_args()


    config="_quick.yaml"
    if config_file is not None:
        config=config_file
    print("run experimant",config)

    cfg_path = Path(config)
    cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))

    if solver_kind is not None:
        cfg.setdefault("solver", {})
        cfg["solver"]["kind"] = str(solver_kind).lower()

    if 0: #no_clf:
        cfg["feasibility_term"] = {"enabled": False, "alpha": 0.0, "min_points": 0}

    if n_iters is not None:
        cfg["n_iters"] = int(n_iters)

    run_name = cfg.get("run_name")
    if not run_name:
        stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
        run_name = f"{cfg['benchmark']['kind']}_{stamp}"

    out_dir = Path(out) if out else Path("results") / run_name
    ensure_dir(out_dir)

    print(f"[run] config={cfg_path} -> out={out_dir}")
    return run_experiment(cfg, out_dir)

Main

In [58]:
solvers=["protes", "cem", "random", "exact_enum", "sa", "tabu", "qbsolv"]
problems=["maxcut_cardinality", "knapsack", "onehot_qubo","portfolio"]

out,bench=run_experiment_main("onehot_qubo.yaml")
plot_history_main(out/"history.csv")

Q=get_onehot_qubo(bench)
ans=DWaveQuboSolver(Q)
conf = ans.record.sample[0].astype(np.int8)
E=ans.first.energy
print('qubo solver ans')
bench.print_results(conf)
#HOME


run experimant onehot_qubo.yaml
[run] config=onehot_qubo.yaml -> out=results/onehot_qubo_largegroups_hard_onehot
building benchmark
building constraints
save benchmark info to results/onehot_qubo_largegroups_hard_onehot /benchmark.json
init dataset
building solver: {'kind': 'protes', 'tt_hard_mask': True, 'batch_size': 256, 'elite_size': 20, 'k_gd': 1, 'lr': 0.05, 'r': 5, 'log': False}
iteration: 0 / 25
training FM!
getting qubo!
solving...!
evaluating oracles!
[iter 000] best_y=inf | oracle_calls=0 | feasible 0/25 | reg_n=300 | protes
iteration: 1 / 25
training FM!
getting qubo!
solving...!
evaluating oracles!
[iter 001] best_y=inf | oracle_calls=0 | feasible 0/25 | reg_n=300 | protes
iteration: 2 / 25
training FM!
getting qubo!
solving...!
evaluating oracles!
[iter 002] best_y=inf | oracle_calls=0 | feasible 0/25 | reg_n=300 | protes
iteration: 3 / 25
training FM!
getting qubo!
solving...!
evaluating oracles!
[iter 003] best_y=inf | oracle_calls=0 | feasible 0/25 | reg_n=300 | protes

KeyboardInterrupt: 