In [1]:
import numpy as np
from typing import Tuple


class SpeedReducerProblem:

    def __init__(self, enforce_integer_teeth=True):
        self.enforce_integer_teeth = enforce_integer_teeth
        self.lb = np.array([2.6, 0.7, 17.0, 7.3, 7.3, 2.9, 5.0], dtype=np.float64)
        self.ub = np.array([3.6, 0.8, 28.0, 8.3, 8.3, 3.9, 5.5], dtype=np.float64)
        self.dim = 7
        self.n_constraints = 11
        self.f_star_ref = 2996.3482

    def coerce_shape(self, X):
        X = np.asarray(X, dtype=np.float64)
        was_1d = (X.ndim == 1)
        if was_1d:
            X = X[None, :]
        assert X.shape[-1] == 7, "Expected last dimension to be 7."
        return X, was_1d

    def project_to_bounds(self, X):
        return np.clip(X, self.lb, self.ub)

    def evaluate(self, X):
        X, was_1d = self.coerce_shape(X)
        X = self.project_to_bounds(X).copy()
        if self.enforce_integer_teeth:
            X[:, 2] = np.rint(X[:, 2])
        x1, x2, x3, x4, x5, x6, x7 = [X[:, i] for i in range(7)]
        f = (
                0.7854 * x1 * x2 ** 2 * (3.3333 * x3 ** 2 + 14.9334 * x3 - 43.0934)
                - 1.508 * x1 * (x6 ** 2 + x7 ** 2)
                + 7.4777 * (x6 ** 3 + x7 ** 3)
                + 0.7854 * (x4 * x6 ** 2 + x5 * x7 ** 2)
        )
        g = np.empty((X.shape[0], 11), dtype=np.float64)
        g[:, 0] = 27.0 / (x1 * x2 ** 2 * x3) - 1.0
        g[:, 1] = 397.5 / (x1 * x2 ** 2 * x3 ** 2) - 1.0
        g[:, 2] = 1.93 * x4 ** 3 / (x2 * x3 * x6 ** 4) - 1.0
        g[:, 3] = 1.93 * x5 ** 3 / (x2 * x3 * x7 ** 4) - 1.0
        g[:, 4] = np.sqrt((745.0 * x4 / (x2 * x3)) ** 2 + 16.9e6) / (0.1 * x6 ** 3) - 1100.0
        g[:, 5] = np.sqrt((745.0 * x5 / (x2 * x3)) ** 2 + 157.5e6) / (0.1 * x7 ** 3) - 850.0
        g[:, 6] = x2 * x3 - 40.0
        g[:, 7] = 5.0 - x1 / x2
        g[:, 8] = x1 / x2 - 12.0
        g[:, 9] = 1.5 * x6 + 1.9 - x4
        g[:, 10] = 1.1 * x7 + 1.9 - x5

        if was_1d:
            return f[0], g[0]
        return f, g

    def is_feasible(self, X):
        _, g = self.evaluate(X)
        return np.all(g <= 0.0, axis=-1)

    def sample_lhs(self, n, rng=None):
        if rng is None:
            rng = np.random.default_rng()
        d = self.dim
        cut = np.linspace(0, 1, n + 1)
        a = cut[:-1]
        b = cut[1:]
        u = rng.random((n, d))
        pts = a[:, None] + (b - a)[:, None] * u
        X = np.empty_like(pts)
        for j in range(d):
            X[:, j] = pts[rng.permutation(n), j]
        X = self.lb + X * (self.ub - self.lb)
        if self.enforce_integer_teeth:
            X[:, 2] = np.rint(X[:, 2])
        return X


In [None]:
import torch
import gpytorch
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class ExactGP(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super().__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        # ARD RBF kernel is a good default for BO
        self.covar_module = gpytorch.kernels.ScaleKernel(
            gpytorch.kernels.RBFKernel(ard_num_dims=train_x.shape[-1])
        )

    def forward(self, x):
        mean = self.mean_module(x)
        cov  = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean, cov)

def fit_gp(train_x, train_y, iters=75, lr=0.1):
    """Fit a single GP with Adam on negative marginal log-likelihood."""
    train_x = train_x.to(device)
    train_y = train_y.to(device)
    likelihood = gpytorch.likelihoods.GaussianLikelihood().to(device)
    model = ExactGP(train_x, train_y, likelihood).to(device)

    model.train(); likelihood.train()
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)

    for _ in range(iters):
        opt.zero_grad()
        out = model(train_x)
        loss = -mll(out, train_y)
        loss.backward()
        opt.step()

    model.eval(); likelihood.eval()
    return model, likelihood

# ---- constrained Thompson sampling over candidate set ----
@torch.no_grad()
def constrained_thompson_step(model_f, models_c, lb, ub, n_candidates=1000, enforce_integer_teeth=True):
    """
    Draw one next x via constrained TS:
      1) sample f~posterior, c_i~posterior for i in constraints
      2) map feasibility by c_i(x) <= 0
      3) if any feasible: choose argmin f; else choose min total violation
    """
    d = lb.shape[0]
    # sample candidates uniformly in the box (we’ll add TR later)
    cand = torch.rand(n_candidates, d, device=device) * (ub - lb) + lb

    # If you want to enforce integer teeth in candidates as well:
    if enforce_integer_teeth:
        cand[:, 2] = torch.round(cand[:, 2])

    # sample objective
    f_dist = model_f(cand)
    f_samp = f_dist.sample()  # (n_candidates,)

    # sample constraints (stack as columns)
    c_samps = []
    for mc in models_c:
        c_samps.append(mc(cand).sample().unsqueeze(-1))
    C = torch.cat(c_samps, dim=-1)  # (n_candidates, G_or_g)

    # feasibility test in ORIGINAL G-space:
    # (for SCBO baseline, models_c already represent original constraints)
    feas = torch.all(C <= 0.0, dim=-1)

    if feas.any():
        idx = torch.argmin(f_samp[feas])
        best_idx = torch.nonzero(feas, as_tuple=False).squeeze(1)[idx]
    else:
        # sum of positive parts (total violation)
        viol = torch.clamp(C, min=0.0).sum(dim=-1)
        best_idx = torch.argmin(viol)

    return cand[best_idx].detach()

# ---- small utility to build training tensors from evaluated data ----
def numpy_to_torch(X_np, y_np):
    X = torch.as_tensor(X_np, dtype=torch.float32, device=device)
    y = torch.as_tensor(y_np, dtype=torch.float32, device=device).flatten()
    return X, y

# ---- quick driver for ONE SCBO iteration (no trust-region yet) ----
def scbo_build_models(X_hist, f_hist, C_hist):
    """
    Build one objective GP and G constraint GPs in ORIGINAL space (SCBO baseline).
    X_hist: (N,d)  f_hist: (N,)   C_hist: (N,G)
    """
    X_t, f_t = numpy_to_torch(X_hist, f_hist)
    model_f, lik_f = fit_gp(X_t, f_t)

    models_c = []
    G = C_hist.shape[1]
    for i in range(G):
        _, ci_t = numpy_to_torch(X_hist, C_hist[:, i:i+1])
        m_i, _ = fit_gp(X_t, ci_t.squeeze(-1))
        models_c.append(m_i)
    return model_f, models_c

def scbo_one_step(problem, X_hist, f_hist, C_hist, n_candidates=1000):
    """Fit GPs on history, do one constrained-TS pick, evaluate, append."""
    # bounds as torch
    lb = torch.tensor(problem.lb, dtype=torch.float32, device=device)
    ub = torch.tensor(problem.ub, dtype=torch.float32, device=device)

    # build models
    model_f, models_c = scbo_build_models(X_hist, f_hist, C_hist)

    # pick next x
    x_next_t = constrained_thompson_step(
        model_f, models_c, lb, ub,
        n_candidates=n_candidates,
        enforce_integer_teeth=problem.enforce_integer_teeth
    )
    x_next = x_next_t.cpu().numpy()

    # evaluate true f,g
    f_next, g_next = problem.evaluate(x_next)

    # append to history
    X_new = np.vstack([X_hist, x_next])
    f_new = np.concatenate([f_hist, np.atleast_1d(f_next)])
    C_new = np.vstack([C_hist, g_next])
    return X_new, f_new, C_new, x_next, f_next, g_next


In [None]:
# Cell 4 — Trust region (lite) and an SCBO run loop

import numpy as np
import torch

class TrustRegion:
    """
    Very small box trust region:
      - centered at incumbent (best feasible if any, else least-violation)
      - expands on 'success', shrinks on 'failure'
    """
    def __init__(self, lb, ub, init_frac=0.6, min_frac=0.05, max_frac=1.0,
                 grow=1.6, shrink=0.5, succ_tol=3, fail_tol=3, rng=None):
        self.lb = lb.astype(np.float32)
        self.ub = ub.astype(np.float32)
        self.center = (self.lb + self.ub) / 2.0
        self.frac = init_frac
        self.min_frac = min_frac
        self.max_frac = max_frac
        self.grow = grow
        self.shrink = shrink
        self.succ_tol = succ_tol
        self.fail_tol = fail_tol
        self.succ = 0
        self.fail = 0
        self.rng = np.random.default_rng() if rng is None else rng

    def set_center(self, x_c):
        self.center = x_c.astype(np.float32)

    def sample(self, n_cand):
        halfspan = 0.5 * self.frac * (self.ub - self.lb)
        low  = np.maximum(self.center - halfspan, self.lb)
        high = np.minimum(self.center + halfspan, self.ub)
        U = self.rng.random((n_cand, len(self.lb))).astype(np.float32)
        return low + U * (high - low)

    def step(self, success):
        if success:
            self.succ += 1; self.fail = 0
            if self.succ >= self.succ_tol:
                self.frac = min(self.max_frac, self.frac * self.grow)
                self.succ = 0
        else:
            self.fail += 1; self.succ = 0
            if self.fail >= self.fail_tol:
                self.frac = max(self.min_frac, self.frac * self.shrink)
                self.fail = 0

def best_feasible_value(f_hist, C_hist):
    feas = np.all(C_hist <= 0.0, axis=1)
    return np.min(f_hist[feas]) if np.any(feas) else np.nan

def least_violation_index(C_hist):
    viol = np.clip(C_hist, 0.0, None).sum(axis=1)
    return int(np.argmin(viol))

def scbo_run(problem, n_init=20, iters=100, n_cand=1024, verbose=True):
    """
    Full SCBO loop:
      1) LHS DoE
      2) repeat:
         - fit GPs on f and each constraint (original space)
         - sample candidates in trust region
         - constrained Thompson pick
         - evaluate, update TR, update histories
    Returns histories for plotting.
    """
    # 1) initial DoE
    X = problem.sample_lhs(n_init)
    f_vals = []
    C_vals = []
    for x in X:
        f_i, g_i = problem.evaluate(x)
        f_vals.append(f_i); C_vals.append(g_i)
    f_vals = np.array(f_vals, dtype=np.float64)
    C_vals = np.array(C_vals, dtype=np.float64)

    # init TR center: incumbent (feasible best else least-violation)
    feas = np.all(C_vals <= 0.0, axis=1)
    if np.any(feas):
        inc_idx = np.argmin(f_vals[feas])
        inc = X[feas][inc_idx]
    else:
        inc = X[least_violation_index(C_vals)]
    tr = TrustRegion(problem.lb, problem.ub)
    tr.set_center(inc.astype(np.float32))

    best_hist = []
    for t in range(iters):
        if verbose and (t % 10 == 0):
            print(f"SCBO iter {t+1}/{iters} | TR frac={tr.frac:.3f}")

        # ---- fit models on history
        model_f, models_c = scbo_build_models(X, f_vals, C_vals)

        # ---- sample candidates in TR (on device)
        cand_np = tr.sample(n_cand)
        cand_t = torch.tensor(cand_np, dtype=torch.float32, device=device)

        # enforce integer teeth for candidates too (optional, keeps consistency)
        if problem.enforce_integer_teeth:
            cand_t[:, 2] = torch.round(cand_t[:, 2])

        # ---- TS pick
        lb_t = torch.tensor(problem.lb, dtype=torch.float32, device=device)
        ub_t = torch.tensor(problem.ub, dtype=torch.float32, device=device)
        with torch.no_grad():
            f_dist = model_f(cand_t)
            f_samp = f_dist.sample()
            c_cols = []
            for mc in models_c:
                c_cols.append(mc(cand_t).sample().unsqueeze(-1))
            C_samp = torch.cat(c_cols, dim=-1)
            feas_mask = torch.all(C_samp <= 0.0, dim=-1)
            if feas_mask.any():
                idx = torch.argmin(f_samp[feas_mask])
                best_idx = torch.nonzero(feas_mask, as_tuple=False).squeeze(1)[idx]
            else:
                viol = torch.clamp(C_samp, min=0.0).sum(dim=-1)
                best_idx = torch.argmin(viol)
            x_next = cand_t[best_idx].cpu().numpy()

        # ---- evaluate true black box
        f_next, g_next = problem.evaluate(x_next)

        # ---- update histories
        X = np.vstack([X, x_next])
        f_vals = np.append(f_vals, f_next)
        C_vals = np.vstack([C_vals, g_next])

        # ---- update TR center and size
        prev_best = best_feasible_value(f_vals[:-1], C_vals[:-1])
        curr_best = best_feasible_value(f_vals, C_vals)
        success = (not np.isnan(curr_best)) and (np.isnan(prev_best) or curr_best < prev_best - 1e-12)
        tr.step(success)

        feas = np.all(C_vals <= 0.0, axis=1)
        if np.any(feas):
            inc_idx = np.argmin(f_vals[feas])
            inc = X[feas][inc_idx]
        else:
            inc = X[least_violation_index(C_vals)]
        tr.set_center(inc.astype(np.float32))

        best_hist.append(curr_best)

    return {
        "X": X, "f": f_vals, "C": C_vals,
        "best_hist": np.array(best_hist, dtype=float),
    }

# Quick smoke test (optional):
# prob = SpeedReducerProblem(enforce_integer_teeth=True)
# out = scbo_run(prob, n_init=20, iters=10, n_cand=512, verbose=True)
# print("last best feasible:", out["best_hist"][-1])
