In [105]:
import math
from botorch.models.model import Model
from botorch.utils import t_batch_mode_transform
from torch import Tensor
from botorch.models.model_list_gp_regression import ModelListGP
import torch
from botorch.models import  FixedNoiseGP
from botorch.fit import fit_gpytorch_mll
from botorch.models import SingleTaskGP
from botorch.utils import standardize
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood
from botorch.acquisition import AnalyticAcquisitionFunction
from botorch.acquisition import MCAcquisitionFunction
from botorch.optim.optimize import optimize_acqf
from botorch.optim.initializers import gen_batch_initial_conditions

define the acquisition function

In [106]:
from botorch.acquisition import AnalyticAcquisitionFunction
import torch

class HyperVolumeScalarizedUCB(AnalyticAcquisitionFunction):
    def __init__(
        self,
        model,
        beta: float,
        theta: torch.Tensor,
        ref: torch.Tensor,
        maximize: bool = True,
    ) -> None:
        """
        Initializes the HyperVolume Scalarized Upper Confidence Bound Acquisition Function.

        Args:
            model: A BoTorch model representing the posterior distribution of the objectives.
            beta (Tensor of shape [1] or [o]): The exploration-exploitation trade-off parameter(s).
            theta (Tensor of shape [o]): The weights used for scalarizing the upper bounds, where `o` is the number of objectives.
            maximize (bool): Whether to maximize or minimize the scalarized objective. Defaults to True (maximize).
        """
        super(AnalyticAcquisitionFunction, self).__init__(model)
        self.maximize = maximize
        self.register_buffer("beta", torch.as_tensor(beta))
        self.register_buffer("theta", torch.as_tensor(theta))
        self.register_buffer("ref", torch.as_tensor(ref))
    @t_batch_mode_transform(expected_q=1)
    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """
        Evaluate the scalarized Upper Confidence Bound on the candidate set X.

        Args:
            X (Tensor of shape [b, d]): A tensor containing `(b)` batches of `d`-dimensional design points.

        Returns:
            Tensor of shape [b]: A tensor containing the scalarized Upper Confidence Bound values for each batch.
        """
        self.beta = self.beta.to(X)
        self.theta = self.theta.to(X)
        self.ref = self.ref.to(X)
        posterior = self.model.posterior(X)
        means = posterior.mean.squeeze(dim=-2)  # b x o
        std_devs = posterior.variance.squeeze(dim=-2).sqrt()  # b x o
        m = means.shape[1]
        # Calculate upper confidence bounds for each objective
        u_t = means + (self.beta.expand_as(means) * std_devs) - self.ref # b x o

        # Apply the scalarization function to the upper bounds
        scalarized_ut = torch.min(torch.max(torch.zeros_like(u_t), u_t / self.theta) ** m, dim=-1)[0]  # b

        return scalarized_ut


define the auxiliary acquisition

In [107]:
class AuxiliaryAcq(MCAcquisitionFunction):
    def __init__(
        self,
        model,
        beta: float,
        theta: torch.Tensor,
        ref: torch.Tensor,
        maximize: bool = True,
    ) -> None:
        """
        An auxiliary acquisition defined in Algo.2

        Args:
            model: A BoTorch model representing the posterior distribution of the objectives.
            beta (Tensor of shape [1] or [o]): The exploration-exploitation trade-off parameter(s).
            theta (Tensor of shape [o]): The weights used for scalarizing the upper bounds, where `o` is the number of objectives.
            maximize (bool): Whether to maximize or minimize the scalarized objective. Defaults to True (maximize).
        """
        super(MCAcquisitionFunction, self).__init__(model)
        self.maximize = maximize
        self.register_buffer("beta", torch.as_tensor(beta))
        self.register_buffer("theta", torch.as_tensor(theta))
        self.register_buffer("ref", torch.as_tensor(ref))
    @t_batch_mode_transform()
    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """
        Evaluate the scalarized Upper Confidence Bound on the candidate set X.

        Args:
            X (Tensor of shape [b, d]): A tensor containing `(b)` batches of `d`-dimensional design points.

        Returns:
            Tensor of shape [b]: A tensor containing the scalarized Upper Confidence Bound values for each batch.
        """
        self.beta = self.beta.to(X)
        self.theta = self.theta.to(X)
        self.ref = self.ref.to(X)
        posterior = self.model.posterior(X)
        #print(posterior.mean.shape)
        means = posterior.mean  # b x q x o
        std_devs = posterior.variance.sqrt()  # b x q x o
        # Calculate upper confidence bounds for each objective
        u_t = means + (self.beta.expand_as(means) * std_devs) - self.ref # b x qx o
        #print('233', u_t.shape)

        # Apply the scalarization function to the upper bounds
        scalarized_ut = torch.max(torch.min(u_t, dim=-1)[0], dim=-1)[0]  # b
        print('22', scalarized_ut.shape)

        return scalarized_ut


check feasibility

- optimize the posterior calculation

In [108]:
def create_ucb_constraints(model, beta: float, thresholds: torch.Tensor):
    """
    Creates a list of non-linear inequality constraints for a multi-output GP model, ensuring that the upper confidence
    bounds of the model's outputs are greater than or equal to the specified thresholds.

    Args:
        model (MultiTaskGP): A multi-output Gaussian Process model.
        beta (float): The scalar coefficient for the variance component of the UCB.
        thresholds (torch.Tensor): A tensor of thresholds for each output dimension.

    Returns:
        List[Tuple[Callable, bool]]: A list of tuples, each containing a callable constraint and a boolean indicating
                                      whether the constraint is intra-point (True) or inter-point (False). Each callable
                                      takes a tensor `X` of shape [q, d] (where `d` is the dimension of the input space
                                      and `q` can be 1 or more representing different design points) and returns a scalar
                                      that should be non-negative if the constraint is satisfied.
    """

    def make_constraint(i, threshold):
        """
        Creates a constraint function for the i-th objective.

        Args:
            i (int): The index of the output dimension for which to create the constraint.
            threshold (float): The threshold value that the UCB of the i-th output should meet.

        Returns:
            Callable: A function that evaluates the constraint across a batch of design points.
        """
        def constraint(X):
            X = X.unsqueeze(0)
            # Compute posterior at X
            posterior = model.posterior(X)
            mean = posterior.mean[:, i]  # Extract the mean for the i-th output
            variance = posterior.variance[:, i]  # Extract the variance for the i-th output
            ucb = mean + beta * variance.sqrt()  # Compute the UCB

            # Minimum across all points in the batch to satisfy the constraint for any single design point
            return ucb - threshold

        return constraint

    # Create a list of constraints for each output dimension, all set as intra-point since they evaluate individually
    constraints = [(make_constraint(i, thresholds[i]), True) for i in range(thresholds.size(0))]

    return constraints

sphere point generator

In [109]:
def get_random_sample_on_n_sphere(N, R):
    # Return a single sample of a vector of dimension N
    # with a uniform distribution on the (N-1)-Sphere surface of radius R.
    # RATIONALE: https://mathworld.wolfram.com/HyperspherePointPicking.html
    
    # Generate a normally distributed point
    X = torch.randn(N)

    # Normalize this point to the surface of the sphere, then scale by radius R
    return R * X / torch.norm(X)

complete BO - loop

test a test function



maximizing task:

let test function to be: $f_1: x,y  \rightarrow \frac{1}{x}+y; f_2: x, y \rightarrow x+y^2$

both taking bounds [1,2]

In [110]:
bounds = torch.tensor([[1.0]*2,[2.0]*2])
dtype =torch.float64
def f_1(x):
    r = 1/x[:,0]+ x[:,1]
    return(-r.unsqueeze(1))

def f_2(x):
    r = x[:,0]+ x[:,1]**2
    return(-r.unsqueeze(1))


x = torch.rand(5,2, dtype=dtype)+1
y_1 = f_1(x)
y_2 = f_2(x)



In [111]:
import warnings
warnings.filterwarnings('ignore')
beta =4.4
a = -1.8
b=  -2.5
thresholds = torch.tensor([a,b])
for batch in range(1):
    #not written in loop for now
    model_list = []
    m = 2
    current_model_1 = SingleTaskGP(train_X= x, train_Y= y_1)
    model_list.append(current_model_1)
    current_model_2 = SingleTaskGP(train_X= x, train_Y= y_2)
    model_list.append(current_model_2)
    model = ModelListGP(*model_list)

    #sample theta from distribution
    theta = get_random_sample_on_n_sphere(m,1).abs()

    #create auxiliary acquisition
    AuxAcq = AuxiliaryAcq(model= model, beta= torch.tensor(beta), theta = theta, ref= thresholds)

    #optimize auxiliary acquisition
    initializer, acq_value = optimize_acqf(
        acq_function = AuxAcq,
        q = 5,
        num_restarts = 10,
        raw_samples = 20,
        bounds = bounds
    )
    #declare
    if acq_value < 0: 
        print(f'decalre infeasibility in {batch+1} rounds')
        break

    #create acquisition function
    HVUCB = HyperVolumeScalarizedUCB(model= model, beta= torch.tensor(beta), theta = theta, ref= thresholds)
    #optimize constraint function
    candidate, _ = optimize_acqf(
        acq_function = HVUCB,
        q = 1,
        num_restarts = 10,
        raw_samples = 20,
        nonlinear_inequality_constraints = create_ucb_constraints(beta=beta, model= model, thresholds= thresholds),
        #ic_generator = gen_batch_initial_conditions,
        
        batch_initial_conditions = initializer.view([-1,1,m]),
        bounds = bounds,
        sequential = True
    )
    #update data
    x = torch.cat([x, candidate],dim=0)
    y_1 = torch.cat([y_1, f_1(candidate)], dim = 0)
    y_2 = torch.cat([y_2, f_2(candidate)], dim = 0)

22 torch.Size([20])
22 torch.Size([10])
22 torch.Size([10])
22 torch.Size([10])
22 torch.Size([10])
22 torch.Size([10])
22 torch.Size([10])
22 torch.Size([10])
22 torch.Size([10])


In [112]:
initializer

tensor([[2.0000, 2.0000],
        [1.7023, 1.6359],
        [1.9485, 1.1150],
        [1.6057, 1.2732],
        [1.6862, 1.2601]])

In [113]:
initializer.view([5,-1,2])

tensor([[[2.0000, 2.0000]],

        [[1.7023, 1.6359]],

        [[1.9485, 1.1150]],

        [[1.6057, 1.2732]],

        [[1.6862, 1.2601]]])