In [1]:
from __future__ import annotations

from abc import ABC
from copy import deepcopy
from typing import Dict, Optional, Tuple, Union

import torch
from botorch.acquisition.acquisition import AcquisitionFunction
#from botorch.acquisition.objective import ScalarizedObjective
from botorch.exceptions import UnsupportedError
from botorch.models.gp_regression import FixedNoiseGP
from botorch.models.gpytorch import GPyTorchModel
from botorch.models.model import Model
from botorch.posteriors.posterior import Posterior
#from botorch.sampling.samplers import SobolQMCNormalSampler
from botorch.utils.sampling import draw_sobol_samples
from botorch.utils.transforms import convert_to_target_pre_hook, t_batch_mode_transform
from torch import Tensor
from torch.distributions import Normal


from __future__ import annotations

import math

from abc import ABC

from contextlib import nullcontext
from copy import deepcopy

from typing import Dict, Optional, Tuple, Union

import torch
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.objective import PosteriorTransform
from botorch.exceptions import UnsupportedError
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.gpytorch import GPyTorchModel
from botorch.models.model import Model
from botorch.utils.constants import get_constants_like
from botorch.utils.probability import MVNXPB
from botorch.utils.probability.utils import (
    log_ndtr as log_Phi,
    log_phi,
    log_prob_normal_in,
    ndtr as Phi,
    phi,
)
from botorch.utils.safe_math import log1mexp, logmeanexp
from botorch.utils.transforms import convert_to_target_pre_hook, t_batch_mode_transform
from gpytorch.likelihoods.gaussian_likelihood import FixedNoiseGaussianLikelihood
from torch import Tensor
from torch.nn.functional import pad

# the following two numbers are needed for _log_ei_helper
_neg_inv_sqrt2 = -(2**-0.5)
_log_sqrt_pi_div_2 = math.log(math.pi / 2) / 2


class AnalyticAcquisitionFunction(AcquisitionFunction, ABC):
    r"""
    Base class for analytic acquisition functions.

    :meta private:
    """

    def __init__(
        self,
        model: Model,
        posterior_transform: Optional[PosteriorTransform] = None,
    ) -> None:
        r"""Base constructor for analytic acquisition functions.

        Args:
            model: A fitted single-outcome model.
            posterior_transform: A PosteriorTransform. If using a multi-output model,
                a PosteriorTransform that transforms the multi-output posterior into a
                single-output posterior is required.
        """
        super().__init__(model=model)
        if posterior_transform is None:
            if model.num_outputs != 1:
                raise UnsupportedError(
                    "Must specify a posterior transform when using a "
                    "multi-output model."
                )
        else:
            if not isinstance(posterior_transform, PosteriorTransform):
                raise UnsupportedError(
                    "AnalyticAcquisitionFunctions only support PosteriorTransforms."
                )
        self.posterior_transform = posterior_transform

    def set_X_pending(self, X_pending: Optional[Tensor] = None) -> None:
        raise UnsupportedError(
            "Analytic acquisition functions do not account for X_pending yet."
        )

    def _mean_and_sigma(
        self, X: Tensor, compute_sigma: bool = True, min_var: float = 1e-12
    ) -> Tuple[Tensor, Optional[Tensor]]:
        """Computes the first and second moments of the model posterior.

        Args:
            X: `batch_shape x q x d`-dim Tensor of model inputs.
            compute_sigma: Boolean indicating whether or not to compute the second
                moment (default: True).
            min_var: The minimum value the variance is clamped too. Should be positive.

        Returns:
            A tuple of tensors containing the first and second moments of the model
            posterior. Removes the last two dimensions if they have size one. Only
            returns a single tensor of means if compute_sigma is True.
        """
        self.to(device=X.device)  # ensures buffers / parameters are on the same device
        posterior = self.model.posterior(
            X=X, posterior_transform=self.posterior_transform
        )
        mean = posterior.mean.squeeze(-2).squeeze(-1)  # removing redundant dimensions
        if not compute_sigma:
            return mean, None
        sigma = posterior.variance.clamp_min(min_var).sqrt().view(mean.shape)
        return mean, sigma

class DiscreteKnowledgeGradient(AnalyticAcquisitionFunction):
    r"""Knowledge Gradient using a fixed discretisation in the Design Space "X"."""

    def __init__(
        self,
        model: Model,
        bounds: Optional[Tensor] = None,
        num_discrete_points: Optional[int] = None,
        X_discretisation: Optional[Tensor] = None,
    ) -> None:
        r"""
        Discrete Knowledge Gradient
        Args:
            model: A fitted model.
            bounds: A `2 x d` tensor of lower and upper bounds for each column
            num_discrete_points: (int) The number of discrete points to use for input (X) space. More discrete
                points result in a better approximation, at the expense of
                memory and wall time.
            discretisation: A `k x d`-dim Tensor of `k` design points that will approximate the
                continuous space with a discretisation.
        """

        if X_discretisation is None:
            if num_discrete_points is None:
                raise ValueError(
                    "Must specify `num_discrete_points` for random discretisation if no `discretisation` is provided."
                )

            X_discretisation = draw_sobol_samples(
                bounds=bounds, n=num_discrete_points, q=1
            )

        super(AnalyticAcquisitionFunction, self).__init__(model=model)

        self.num_input_dimensions = bounds.shape[1]
        self.X_discretisation = X_discretisation

    @t_batch_mode_transform(expected_q=1, assert_output_shape=False)
    def forward(self, X: Tensor) -> Tensor:
        kgvals = torch.zeros(X.shape[0], dtype=torch.double)
        for xnew_idx, xnew in enumerate(X):
            xnew = xnew.unsqueeze(0)
            kgvals[xnew_idx] = self.compute_discrete_kg(
                xnew=xnew, optimal_discretisation=self.X_discretisation
            )
        return kgvals

    def compute_discrete_kg(
        self, xnew: Tensor, optimal_discretisation: Tensor
    ) -> Tensor:
        """

        Args:
        xnew: A `1 x 1 x d` Tensor with `1` acquisition function evaluations of
            `d` dimensions.
            optimal_discretisation: num_fantasies x d Tensor. Optimal X values for each z in zvalues.

        """
        # Augment the discretisation with the designs.
        concatenated_xnew_discretisation = torch.cat(
            [xnew, optimal_discretisation], dim=0
        ).squeeze()  # (m + num_X_disc, d)

        # Compute posterior mean, variance, and covariance.
        full_posterior = self.model.posterior(
            concatenated_xnew_discretisation, observation_noise=False
        )
        noise_variance = torch.unique(self.model.likelihood.noise_covar.noise)
        full_posterior_mean = full_posterior.mean  # (1 + num_X_disc , 1)

        # Compute full Covariante Cov(Xnew, X_discretised), select [Xnew X_discretised] submatrix, and subvectors.
        full_posterior_covariance = (
            full_posterior.mvn.covariance_matrix
        )  # (1 + num_X_disc , 1 + num_X_disc )
        posterior_cov_xnew_opt_disc = full_posterior_covariance[
            : len(xnew), :
        ].squeeze()  # ( 1 + num_X_disc,)
        full_posterior_variance = (
            full_posterior.variance.squeeze()
        )  # (1 + num_X_disc, )

        full_predictive_covariance = (
            posterior_cov_xnew_opt_disc
            / (full_posterior_variance + noise_variance).sqrt()
        )
        # initialise empty kgvals torch.tensor
        kgval = self.kgcb(a=full_posterior_mean, b=full_predictive_covariance)

        return kgval

    @staticmethod
    def kgcb(a: Tensor, b: Tensor) -> Tensor:
        r"""
        Calculates the linear epigraph, i.e. the boundary of the set of points
        in 2D lying above a collection of straight lines y=a+bx.
        Parameters
        ----------
        a
            Vector of intercepts describing a set of straight lines
        b
            Vector of slopes describing a set of straight lines
        Returns
        -------
        KGCB
            average height of the epigraph
        """

        a = a.squeeze()
        b = b.squeeze()
        assert len(a) > 0, "must provide slopes"
        assert len(a) == len(b), f"#intercepts != #slopes, {len(a)}, {len(b)}"

        maxa = torch.max(a)

        if torch.all(torch.abs(b) < 0.000000001):
            return torch.Tensor([0])  # , np.zeros(a.shape), np.zeros(b.shape)

        # Order by ascending b and descending a. There should be an easier way to do this
        # but it seems that pytorch sorts everything as a 1D Tensor

        ab_tensor = torch.vstack([-a, b]).T
        ab_tensor_sort_a = ab_tensor[ab_tensor[:, 0].sort()[1]]
        ab_tensor_sort_b = ab_tensor_sort_a[ab_tensor_sort_a[:, 1].sort()[1]]
        a = -ab_tensor_sort_b[:, 0]
        b = ab_tensor_sort_b[:, 1]

        # exclude duplicated b (or super duper similar b)
        threshold = (b[-1] - b[0]) * 0.00001
        diff_b = b[1:] - b[:-1]
        keep = diff_b > threshold
        keep = torch.cat([torch.Tensor([True]), keep])
        keep[torch.argmax(a)] = True
        keep = keep.bool()  # making sure 0 1's are transformed to booleans

        a = a[keep]
        b = b[keep]

        # initialize
        idz = [0]
        i_last = 0
        x = [-torch.inf]

        n_lines = len(a)
        # main loop TODO describe logic
        # TODO not pruning properly, e.g. a=[0,1,2], b=[-1,0,1]
        # returns x=[-inf, -1, -1, inf], shouldn't affect kgcb
        while i_last < n_lines - 1:
            i_mask = torch.arange(i_last + 1, n_lines)
            x_mask = -(a[i_last] - a[i_mask]) / (b[i_last] - b[i_mask])

            best_pos = torch.argmin(x_mask)
            idz.append(i_mask[best_pos])
            x.append(x_mask[best_pos])

            i_last = idz[-1]

        x.append(torch.inf)

        x = torch.Tensor(x)
        idz = torch.LongTensor(idz)
        # found the epigraph, now compute the expectation
        a = a[idz]
        b = b[idz]

        normal = Normal(torch.zeros_like(x), torch.ones_like(x))

        pdf = torch.exp(normal.log_prob(x))
        cdf = normal.cdf(x)

        kg = torch.sum(a * (cdf[1:] - cdf[:-1]) + b * (pdf[:-1] - pdf[1:]))
        kg -= maxa
        return kg

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import botorch
from known_boundary.acquisition_function import EI_acquisition_opt,MES_acquisition_opt,LCB_acquisition_opt,ERM_acquisition_opt,SLogTEI_acquisition_opt,SLogEI_acquisition_opt
from known_boundary.utlis import  get_initial_points,transform,opt_model_MLE,opt_model_MAP
import numpy as np
import GPy
import torch
from botorch.test_functions import Ackley,Beale,Branin,Rosenbrock,SixHumpCamel,Hartmann,Powell,DixonPrice,Levy,StyblinskiTang,Griewank
import obj_functions.push_problems
from botorch.utils.transforms import unnormalize,normalize
from known_boundary.SLogGP import SLogGP
import scipy 


from botorch.models import SingleTaskGP,FixedNoiseGP
from botorch.acquisition import ExpectedImprovement,PosteriorMean,qKnowledgeGradient
from botorch.optim import optimize_acqf
from torch.quasirandom import SobolEngine
from gpytorch.kernels import MaternKernel, RBFKernel, IndexKernel
from gpytorch.kernels.scale_kernel import ScaleKernel
from gpytorch.means import ZeroMean

import warnings
warnings.filterwarnings("ignore")
import logging
logging.getLogger('lengthscale').disabled = True
logging.getLogger('variance').disabled = True
logging.getLogger('psi').disabled = True


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.double


function_information = []


temp={}
temp['name']='Branin2D' 
temp['function'] = Branin(negate=True)
temp['fstar'] =  0.397887 
function_information.append(temp)

In [5]:
for information in function_information:

    fun = information['function']
    dim = fun.dim
    bounds = fun.bounds
    standard_bounds=np.array([0.,1.]*dim).reshape(-1,2)
    
    n_init = 4*dim

    
    fstar = information['fstar']
    
    print('fstar is: ',fstar)
    
    if dim <=3:
        step_size = 2
        iter_num = 50
        N = 10
    elif dim<=5:
        step_size = 3
        iter_num = 100
        N = 100
    else:
        step_size = 3
        iter_num = 150
        N = 15
        
    lengthscale_range = [0.001,2]
    variance_range = [0.001**2,20]
    noise = 1e-6
    
    print(information['name'])
        
    
    ############################# GP+EI ###################################
    BO_KG = []
    noise = 1e-6

    for exp in range(N):
        
        print(exp)
        
        seed = exp

        X_BO = get_initial_points(bounds, n_init,device,dtype,seed=seed)
        Y_BO = torch.tensor(
            [fun(x) for x in X_BO], dtype=dtype, device=device
        ).reshape(-1,1)

        best_record = [Y_BO.max().item()]
        np.random.seed(1234)

        for i in range(iter_num):

                print(i)
                
                if i%step_size == 0:
                    Y_mean =  Y_BO.mean()
                    Y_std = Y_BO.std()
            
                train_Y = (Y_BO -Y_mean) / Y_std
                train_X = normalize(X_BO, bounds)
                
                minimal = train_Y.min().item()
                
                train_Y = train_Y.numpy()
                train_X = train_X.numpy()
                
                # train the GP
                if i%step_size == 0:
                    
                    parameters = opt_model_MLE(train_X,train_Y,dim,'GP',noise=noise,seed=i,lengthscale_range=lengthscale_range,variance_range=variance_range)
                        
                    lengthscale = parameters[0]
                    variance = parameters[1]
                    
                    print('lengthscale: ',lengthscale)
                    print('variance: ',variance)
                
                covar_module =  ScaleKernel(RBFKernel())
                train_yvar = torch.tensor(noise, device=device, dtype=dtype)

                torch.manual_seed(exp+iter_num)
                model = FixedNoiseGP(torch.tensor(train_X), torch.tensor(train_Y), train_yvar.expand_as(torch.tensor(train_Y)),
                                     mean_module=ZeroMean(),covar_module=covar_module).to(device)
                
                model.covar_module.outputscale = variance
                covar_module.base_kernel.lengthscale = lengthscale
                
                model.eval()
                
                AF = DiscreteKnowledgeGradient(model=model, bounds=torch.tensor(standard_bounds.T),num_discrete_points=100) .to(device)

                standard_next_X, _ = optimize_acqf(
                    acq_function=AF,
                    bounds=torch.tensor(standard_bounds.T) .to(device),
                    q=1,
                    num_restarts=3*dim,
                    raw_samples=100,
                    options={},
                )

                
                # EI = ExpectedImprovement(model=model, best_f=np.max(train_Y)) .to(device)

                # standard_next_X, _ = optimize_acqf(
                #     acq_function=EI,
                #     bounds=torch.tensor(standard_bounds.T) .to(device),
                #     q=1,
                #     num_restarts=3*dim,
                #     raw_samples=30*dim,
                #     options={},
                # )
                
                
                
                # kernel = GPy.kern.RBF(input_dim=dim,lengthscale=lengthscale,variance=variance)
                # m = GPy.models.GPRegression(train_X.reshape(-1,dim), train_Y.reshape(-1,1),kernel)
                # m.Gaussian_noise.fix(noise)

                # np.random.seed(i)
                # standard_next_X = EI_acquisition_opt(m,bounds=standard_bounds,f_best=minimal)
                
                
                X_next = unnormalize(torch.tensor(standard_next_X), bounds).reshape(-1,dim)            
                Y_next = fun(X_next).reshape(-1,1)

                # Append data
                X_BO = torch.cat((X_BO, X_next), dim=0)
                Y_BO = torch.cat((Y_BO, Y_next), dim=0)
                
                
                
                # final choice is EI
                kernel = GPy.kern.RBF(input_dim=dim,lengthscale=lengthscale,variance=variance)
                m = GPy.models.GPRegression(train_X.reshape(-1,dim), train_Y.reshape(-1,1),kernel)
                m.Gaussian_noise.fix(noise)

                np.random.seed(i)
                standard_next_X_final = EI_acquisition_opt(m,bounds=standard_bounds,f_best=minimal)
                
                X_next_final = unnormalize(torch.tensor(standard_next_X_final), bounds).reshape(-1,dim)            
                Y_next_final = fun(X_next_final).reshape(-1,1)

                # Append data
                X_BO_final = torch.cat((X_BO, X_next_final), dim=0)
                Y_BO_final = torch.cat((Y_BO, Y_next_final), dim=0)
                
                best_record.append(Y_BO_final.max().item())
                
                print(best_record[-1])
                
                noise = variance*10**(-5)   #adaptive noise
                noise = np.round(noise, -int(np.floor(np.log10(noise))))
                print('noise: ',noise)
                
        best_record = np.array(best_record) 
        BO_KG.append(best_record)
        

fstar is:  0.397887
Branin2D
0
0
lengthscale:  0.25472596777010337
variance:  0.9502835652673391
-3.5451968551073154
noise:  1e-05
1
-3.5451968551073154
noise:  1e-05
2
lengthscale:  0.26427288507641644
variance:  0.8397149780705103
-1.9857158270411155
noise:  8e-06
3
-1.9857158270411155
noise:  8e-06
4
lengthscale:  0.2873438169759484
variance:  1.115755989042731
-1.9857158270411155
noise:  1e-05
5
-1.9857158270411155
noise:  1e-05
6
lengthscale:  0.3451401182139733
variance:  1.359889269642337
-1.9857158270411155
noise:  1e-05
7
-1.9857158270411155
noise:  1e-05
8
lengthscale:  0.34948060373411033
variance:  2.0152997900044616
-1.9857158270411155
noise:  2e-05
9
-1.9857158270411155
noise:  2e-05
10
lengthscale:  0.38904901362863364
variance:  2.793129397859841
-1.9857158270411155
noise:  3e-05
11
-1.9857158270411155
noise:  3e-05
12
lengthscale:  0.288197487780583
variance:  1.4219478171201851
-1.9857158270411155
noise:  1e-05
13
-1.9857158270411155
noise:  1e-05
14
lengthscale:  0.2

KeyboardInterrupt: 

In [10]:
standard_bounds.T

array([[0., 0.],
       [1., 1.]])