In [93]:
import math
from typing import Optional

import torch
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.objective import PosteriorTransform
from botorch.exceptions import UnsupportedError
from botorch.models.gp_regression import FixedNoiseGP
from botorch.models.gpytorch import GPyTorchModel
from botorch.models.model import Model
from botorch.utils.constants import get_constants_like
from botorch.utils.probability import MVNXPB
from botorch.utils.probability.utils import (
    log_ndtr as log_Phi,
    log_phi,
    log_prob_normal_in,
    ndtr as Phi,
    phi,
)
from botorch.utils.safe_math import log1mexp, logmeanexp
from botorch.utils.transforms import convert_to_target_pre_hook, t_batch_mode_transform
from torch import Tensor
from torch.nn.functional import pad
from botorch.test_functions import Hartmann, Ackley, Beale
from botorch.optim import optimize_acqf
from botorch.acquisition import ExpectedImprovement
from botorch.acquisition.analytic import AnalyticAcquisitionFunction
from botorch.fit import fit_gpytorch_mll
from botorch.models import SingleTaskGP
from botorch.test_functions import Hartmann
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition import qKnowledgeGradient

In [2]:
class DiverseAcquisitionFunction(AnalyticAcquisitionFunction):
    def __init__(self, model, lambda_, epsilon_, best_f):
        super().__init__(model=model)
        self.register_buffer("lambda_", torch.as_tensor(lambda_))
        self.register_buffer("epsilon_", torch.as_tensor(epsilon_))
        self.register_buffer("best_f", torch.as_tensor(best_f))

    @t_batch_mode_transform(expected_q = 1)
    def forward(self, X: Tensor) -> Tensor:
        mean, sigma = self._mean_and_sigma(X)
        factor = (1 + self.epsilon_ ) if self.best_f > 0 else (1 - epsilon_)
        
        ei_portion = Phi((self.best_f - mean)/sigma) * (self.best_f - mean)
        dei_portion = phi((self.best_f - mean)/sigma) + self.lambda_ * Phi((factor * self.best_f - mean)/sigma)
        
        return ei_portion + dei_portion * sigma

In [3]:
import warnings
warnings.filterwarnings("ignore")
from botorch.test_functions import Hartmann, Ackley, Beale

In [22]:
#Hartmann Function
num_start = 100
bounds = torch.tensor([[0.0] * 6, [1.0] * 6])
func = Hartmann(dim=6, negate=False)

In [102]:
#Ackley Function
dim = 2
num_start = 100
bounds = torch.tensor([[-32.0] * 2, [32.0] * 2])
func = Ackley(dim = 2)

In [86]:
#Beale Function
num_start = 100
bounds = torch.tensor([[-32.0] * 2, [32.0] * 2])
func = Beale()

In [95]:
import warnings
warnings.filterwarnings('ignore')

lambda_ = 0.5
epsilon_ = 0.05
DEI_param_settings = {"lambda_":lambda_, "epsilon_":epsilon_}
EI_param_settings = {"maximize":False}
know_param_settings = {"num_fantasies":50}

In [103]:
def scalarize_input(train_x, bounds):
    train_x_i = torch.clone(train_x)
    for dim in range(bounds.shape[1]):
        bound = bounds[:, dim]
        train_x_i[:, dim] -= bound[0]
        train_x_i[:, dim] /= ((bound[1] - bound[0]))
    return train_x_i

def revert_input(train_x_i, bounds):
    train_x = torch.clone(train_x_i)
    for dim in range(bounds.shape[1]):
        bound = bounds[:, dim]
        train_x[:, dim] *= ((bound[1] - bound[0]))
        train_x[:, dim] += bound[0]
    return train_x
    
def scalarize_output(train_obj):
    mean = train_obj.mean()
    std = train_obj.std()
    return (train_obj - mean)/std, mean, std

def revert_output(train_obj, mean, std):
    return train_obj * std + mean
    
def bayes_opt_loop(opt_func, bounds, num_start, num_sim, acf_func, param_settings = {}):
    dim = bounds.shape[1]
    
    #actual observed 
    obs_x = revert_input(torch.rand(num_start, dim), bounds)
    obs_obj = opt_func(obs_x).unsqueeze(-1)
    
    #regularized inputs to model
    model_bounds = torch.tensor([[0.0] * dim, [1.0] * dim])
    model_input = scalarize_input(obs_x, bounds)
    model_output, mean, sigma = scalarize_output(obs_obj)
    var = torch.zeros(model_output.shape) + 10**(-6)

    model = FixedNoiseGP(train_X = model_input, train_Y = model_output, train_Yvar = var)
    mll = ExactMarginalLogLikelihood(model.likelihood, model)
    fit_gpytorch_mll(mll)

    acf_func = acf_func(model = model, best_f = model_output.min(), **param_settings)

    for i in range(num_sim):
        best_f = model_output.min()
        acf_func.model = model
        acf_func.best_f = best_f
        
        new_point, _ = optimize_acqf(acf_func, bounds=model_bounds, q = 1, num_restarts = 5, raw_samples = 100)
        eval_point = revert_input(new_point, bounds)
        eval_result = opt_func(eval_point).expand(1, 1)
        reg_eval_result = (eval_result - mean)/sigma
        
        model_input = torch.cat((model_input, new_point), 0)
        model_output = torch.cat((model_output, reg_eval_result), 0)
        var = torch.cat((var, torch.as_tensor(10**-6).expand(1,1)), 0)
        
        #rechange parameters for output
        if i % 10 == 0:
            model_output = revert_output(model_output, mean, sigma)
            model_output, mean, sigma = scalarize_output(model_output)
        
        model = FixedNoiseGP(train_X = model_input, train_Y = model_output, train_Yvar = var)
        mll = ExactMarginalLogLikelihood(model.likelihood, model)
        fit_gpytorch_mll(mll)
    
    obs_x = revert_input(model_input, bounds)
    obs_obj = revert_output(model_output, mean, sigma)
    return obs_x, obs_obj

In [None]:
x, y = bayes_opt_loop(func, bounds, 100, 1000, ExpectedImprovement, EI_param_settings)

In [132]:
x, y = bayes_opt_loop(func, bounds, 100, 1000, DiverseAcquisitionFunction, DEI_param_settings)

In [98]:
#x, y = bayes_opt_loop(func, bounds, 100, 10, qKnowledgeGradient, know_param_settings)

In [128]:
def evaluate(true_min, train_obj, train_x, epsilon_):
    factor = (1 + epsilon_) if true_min > 0 else (1 - epsilon_)

    torch_ind = train_obj < factor * true_min
    ind = []
    i = 0
    for val in torch_ind:
        if val:
            ind.append(i)
        i += 1

    feasible_sol, feasible_x = train_obj[ind], train_x[ind]
    max_dist = 0 

    for i in range(feasible_sol.shape[0]):
        for j in range(i + 1, feasible_sol.shape[0]):
            val1, val2 = feasible_x[i], feasible_x[j]
            max_dist = max(max_dist, torch.norm(val1 - val2))

    return feasible_sol, feasible_x, max_dist.item()

In [None]:
evaluate(0.4, y, x, 10)