In [1]:
# testing implementation of n-dimensional inputs for turbo acquisition function

In [2]:
# imports
from dataclasses import dataclass
from typing import Any
import torch
import math
from botorch.generation import MaxPosteriorSampling
from botorch.models.gp_regression import ExactGP
from torch.quasirandom import SobolEngine
from botorch.acquisition.monte_carlo import qExpectedImprovement
from botorch.optim import optimize_acqf

In [3]:
# setup
dtype = torch.double
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import warnings
from botorch.exceptions import BadInitialCandidatesWarning
warnings.filterwarnings('ignore', category=BadInitialCandidatesWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning)

In [16]:
# objects
@dataclass
class NewTurboState:
    """Class to maintain the trust region for TuRBO"""
    dim: int
    batch_size: int
    center: torch.Tensor
    lb: torch.Tensor
    ub: torch.Tensor
    train_x: Any = None
    train_y: Any = None
    length: float = 0.3
    length_min: float = 0.25 ** 3
    length_max: float = 0.5  # modifying for [0,1] interval from 1.6 to 0.5
    failure_counter: int = 0
    failure_tolerance: int = float("nan")  # post initialized
    success_counter: int = 0
    success_tolerance: int = 4  # original paper uses 3
    best_value: float = -float("inf")
    restart_triggered: bool = False
    domain_constraints: torch.Tensor = None

    def __post_init__(self):
        self.failure_tolerance = math.ceil(
            max([4.0 / self.batch_size, float(self.dim) / self.batch_size]))

    def get_training_samples_in_region(self):
        """query the training data and get the samples in the trust region - note this is for 1-d case"""
        # get the updated geometry of the trust region
        self.center = self.train_x[self.train_y.argmax(), :].clone()
        self.lb = torch.clamp(self.center - self.length / 2.0, 0.0, 1.0)
        self.ub = torch.clamp(self.center + self.length / 2.0, 0.0, 1.0)
        # get the indices of the evaluated data points in the trust region
        idx_below_ub = torch.where(self.train_x <= self.ub, True, False)
        idx_above_lb = torch.where(self.train_x >= self.lb, True, False)
        idx_in_tr = idx_below_ub & idx_above_lb
        # get the training points to use that are in the trust region
        train_x_tr = self.train_x[idx_in_tr].unsqueeze(-1)
        train_y_tr = self.train_y[idx_in_tr].unsqueeze(-1)
        # return the training points to use
        if len(train_x_tr) < 1:
            print("\nNot enough points in tr, using global data to fit model\n")
            return self.train_x.unsqueeze(-1), self.train_y.unsqueeze(-1)
            # depending on which {x, y} data are in current state, may need to revisit this
        else:
            return train_x_tr, train_y_tr


def new_update_state(state: NewTurboState, x_train, y_train, y_next):
    """Update the state of the trust region each iteration"""
    # check if the last iteration was successful and update attributes
    if torch.max(y_next) > state.best_value + 1e-3 * math.fabs(state.best_value):
        state.success_counter += 1
        state.failure_counter = 0
    else:
        state.success_counter = 0
        state.failure_counter += 1
    # modify trust region geometry based on success or failure of last step
    if state.success_counter == state.success_tolerance:  # expand trust region
        state.length = min(2.0 * state.length, state.length_max)
        state.success_counter = 0
    elif state.failure_counter == state.failure_tolerance:  # shrink the trust region
        state.length /= 2.0
        state.failure_counter = 0
    # update the best value seen
    state.best_value = max(state.best_value, torch.max(y_next).item())
    # check if the trust region needs to restart
    if state.length < state.length_min:
        state.restart_triggered = True
        state.length = 0.5  # assumes x in [0, 1]
        print("\nTuRBO restart triggered")
    # update training data set
    state.train_x = x_train
    state.train_y = y_train
    return state


def turbo_region_bounds(model: ExactGP, x_center, length, dim):
    """Get the bounds for the turbo trust region"""
    # scale the trust region to be proportional to the length scales
    weights = model.covar_module.base_kernel.lengthscale.squeeze().detach()
    weights = weights / weights.mean()
    if dim > 1:
        weights = weights / torch.prod(weights.pow(1.0 / len(weights)))
    tr_lb = torch.clamp(x_center - weights * length / 2.0, 0.0, 1.0)
    tr_ub = torch.clamp(x_center + weights * length / 2.0, 0.0, 1.0)
    return tr_lb, tr_ub


def turbo_bounds_no_scaling(x_center, length, dim):
    """get the bounds of the turbo trust region without scaling them by the lengthscales,
    this means that the region is a hyper-square, all sides the same length"""
    tr_lb = torch.clamp(x_center - length / 2.0, 0.0, 1.0)
    tr_ub = torch.clamp(x_center + length / 2.0, 0.0, 1.0)
    return tr_lb, tr_ub


def turbo_thompson_sampling(state: NewTurboState, n_candidates, tr_lb, tr_ub, x_center, model, batch_size):
    """Convert candidates and trust region geometry to next sample point"""
    # thompson sampling
    dim = state.train_x.shape[-1]
    sobol = SobolEngine(dim, scramble=True)
    pert = sobol.draw(n_candidates).to(dtype=dtype, device=device)
    pert = tr_lb + (tr_ub - tr_lb) * pert
    # create perturbation mask
    prob_perturb = min(20.0 / dim, 1.0)
    mask = (
            torch.rand(n_candidates, dim, dtype=dtype, device=device)
            <= prob_perturb
    )
    ind = torch.where(mask.sum(dim=1) == 0)[0]
    if dim == 1:
        mask[ind, torch.randint(0, 1, size=(len(ind),), device=device)] = 1
    else:
        mask[ind, torch.randint(0, dim - 1, size=(len(ind),), device=device)] = 1
    # create candidate points from the perturbations and the mask
    x_cand = x_center.expand(n_candidates, dim).clone()
    x_cand[mask] = pert[mask]

    # sample the candidate points
    thompson_sampling = MaxPosteriorSampling(model=model, replacement=False)
    with torch.no_grad():
        return thompson_sampling(x_cand, num_samples=batch_size)


def generate_batch(state: NewTurboState,  # trust region state
                   model: ExactGP,  # GP model
                   dim,  # number of input dimensions
                   x,  # evaluated points on [0,1] for each dimension
                   y,  # evaluated function values corresponding to x
                   batch_size,
                   n_candidates=None,
                   num_restarts=10,
                   raw_samples=512,
                   acqf='ts',  # 'ei' or 'ts'
                   ):
    """Acquisition function for TuRBO, wraps Thompson sampling or Expected Improvement constrained to
    trust region boundaries"""
    assert acqf in ("ts", "ei")
    assert x.min() >= 0.0 and x.max() <= 1.0 and torch.all(torch.isfinite(y))
    if n_candidates is None:
        n_candidates = min(5000, max(2000, 200 * x.shape[-1]))

    # # scale the trust region to be proportional to the length scales
    x_center = x[y.argmax(), :].clone()
    tr_lb, tr_ub = turbo_region_bounds(model, x_center, state.length, dim=dim)

    if acqf == 'ts':
        x_next = turbo_thompson_sampling(state, n_candidates, tr_lb, tr_ub, x_center, model, batch_size)

    elif acqf == 'ei':
        ei = qExpectedImprovement(model, y.max(), maximize=True)
        x_next, acq_value = optimize_acqf(
            ei,
            bounds=torch.stack([tr_lb, tr_ub]),
            q=batch_size,
            num_restarts=num_restarts,
            raw_samples=raw_samples,
        )

    return x_next

In [5]:
# n-d test problem
from botorch.models import SingleTaskGP
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch import fit_gpytorch_mll

train_X = torch.rand(20, 2, dtype=dtype)
train_Y = torch.sin(train_X).sum(dim=1, keepdim=True)
model_example = SingleTaskGP(train_X, train_Y)
mll_example = ExactMarginalLogLikelihood(model_example.likelihood, model_example)
fitting = fit_gpytorch_mll(mll_example)

In [9]:
train_Y.argmax()

tensor(15)

In [20]:
test_lb, test_ub = turbo_region_bounds(model_example, train_X[5, :].clone(), 0.25, 2)
print(test_lb)
print(test_ub)

tensor([0.7134, 0.7153], dtype=torch.float64)
tensor([0.9666, 0.9621], dtype=torch.float64)


In [14]:
model_example.covar_module.base_kernel.lengthscale.squeeze().detach()

tensor([1.3511, 1.3173], dtype=torch.float64)

In [19]:
test_lb, test_ub = turbo_bounds_no_scaling(train_X[5, :].clone(), 0.25, 2)
print(test_lb)
print(test_ub)

tensor([0.7150, 0.7137], dtype=torch.float64)
tensor([0.9650, 0.9637], dtype=torch.float64)
