In [1]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
dtype = torch.double

In [3]:
from botorch.test_functions import Hartmann


nega_hartmann6 = Hartmann(negate=True)


def outcome_constraint(X):
    return X.sum(dim=-1) - 3


def weighted_obj(X):
    return neg_hartmann6(X) * (outcome_constraint(X) <= 0).type_as(X)

In [5]:
from botorch.models import FixedNoiseGP, ModelListGP
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood


NOISE_SE = 0.5
y_train_var = torch.tensor(NOISE_SE ** 2, device=device, dtype=dtype)


def generate_initial_data(n=10):
    x_train = torch.rand(n, 6, device=device, dtype=dtype)
    
    exact_obj = neg_hartmann6(x_train).unsqueeze(-1)
    exact_con = outcome_constraint(x_train).unsqueeze(-1)
    
    train_obj = exact_obj + NOISE_SE * torch.randn_like(exact_obj)
    train_con = exact_con + NOISE_SE * torch.randn_like(train_con)
    
    best_observed = weighted_obj(x_train).max().item()
    
    return x_train, train_obj, train_con, best_observed


def initialize_model(x_train, train_obj, train_con, state_dict=None):
    # Define model
    model_obj = FixedNoiseGP(x_train, train_obj, y_train_var.expand_as(train_obj)).to(x_train)
    model_con = FixedNosieGP(x_train, train_con, y_train_var.expand_as(train_con)).to(x_train)
    
    # Combine into a multi-output GP model
    model = ModelListGP(model_obj, model_con)
    mll = SumMarginalLogLikelihood(model.likelihood, model)
    
    # Load state dict if passed
    if state_dict is not None:
        model.load_state_dict(state_dict)
    
    return mll, model

In [6]:
from botorch.acquisition.objective import ConstrainedMCObjective


def obj_callable(Z):
    return Z[..., 0]


def constraint_callable(Z):
    return Z[..., 1]


# Define a feasibility-weighted objective for optimization
constrained_obj = ConstrainedMCObjective(
    objective=obj_callable,
    constraints=[constraint_callable]
)

In [7]:
from botorch.optim import optimize_acqf


BATCH_SIZE = 3
bounds = torch.tensor([[0.0] * 6, [1.0] * 6], device=device, dtype=dtype)


def optimize_acqf_and_get_observation(acq_fn):
    '''
    Optimizes the acquisition function and returns a new candidate with a noisy observation.
    '''
    # Optimize
    candidates, _ = optimize_acqf(
        acq_function=acq_fn,
        bounds=bounds,
        q=BATCH_SIZE,
        num_starts=10,
        raw_samples=500,  # used for initialization heuristic
        options={
            'batch_limit': 5,
            'max_iter': 200
        }
    )
    
    # Observe new values
    new_x = candidates.detach()
    
    exact_obj = neg_hartmann6(new_x).unsqueeze(-1)
    exact_con = outcome_constraint(new_x).unsqueeze(-1)
    
    new_obj = exact_obj + NOISE_SE * torch.randn_like(exact_obj)
    new_con = exact_con + NOISE_SE * torch.randn_like(exact_con)
    
    return new_x, new_obj, new_con


def update_random_observations(best_random):
    '''
    Simulates a random policy by taking the current list of best observations,
    drawing a new random point, observing its value, and updating the list.
    '''
    rand_x = torch.rand(BATCH_SIZE, 6)
    next_random_best = weighted_obj(rand_x).max().item()
    best_random.append(max(best_random[-1], next_random_best))
    
    return best_random

In [None]:
from botorch import fit_gpytorch_model
from botorch.acquisition.monto_carlo import qExpectedImprovement, qNoisyExpectedImprovement
from botorch.sampling.samplers import SobolQMCNormalSampler
from botorch.exceptions import BadInitialCandidatesWarning
import time

import warnings
warnings.filterwarnings('ignore', category=BadInitialCandidatesWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning)


N_TRIALS = 3
N_BATCH = 20
MC_SAMPLES = 500

verbose = True


best_observed_all_ei, best_observed_all_nei, best_random_all = [], [], []

# Average over multiple trials
for trial in range(1, N_TRIALS + 1):
    t0 = time.time()
    
    # Fit the model
    fit_gpytorch_model(mll_ei)
    fit_gpytorch_model(mll_nei)
    
    # Define the qEI and qNEI acquisition moddules using a QMC sampler
    qmc_sampler = SobolQMCNormalSampler(num_samples=MC_SAMPLES)
    
    # For best_f, use the best observed noisy values as an approximation
    qEI = qExpectedImprovement(
        model=model_ei,
        best_f=(train_obj_ei * (train_con_ei <= 0).to(train_obj_ei)).max(),
        sampler=pmc_sampler,
        objective=constrained_obj
    )
    
    qNEI = qNoisyExpectedImprovement(
        model=model_nei,
        X_baseline=x_train_nei,
        sampler=qmc_sampler,
        objective=constrained_obj
    )
    
    # Optimize and get new observations
    new_x_ei, new_obj_ei, new_con_ei = optimize_acqf_and_get_observation(qEI)
    new_x_nei, new_obj_nei, new_con_nei = optimize_acqf_and_get_observation(qNEI)
    
    # Update training points
    x_train_ei = torch.cat([x_train_ei, new_x_ei])
    train_obj_ei = torch.cat([train_obj_ei, new_obj_ei])
    train_con_ei = torch.cat([train_con_ei, new_con_ei])
    
    x_train_nei = torch.cat([x_train_nei, new_x_nei])
    train_obj_nei = torch.cat([train_obj_nei, new_obj_nei])
    train_con_nei = torch.cat([train_con_nei, new_con_nei])
    
    # Update progress
    best_random = update_random_observations(best_random)
    best_value_ei = weighted_obj(x_train_ei).max().item()
    best_value_nei = weighted_obj(x_train_nei).max().item()
    best_observed_ei.append(best_value_ei)
    best_observed_nei.append(best_value_nei)
    
    # Reinitialize the models
    mll_ei, model_ei = initialize_model(
        x_train_ei,
        train_obj_ei,
        train_con_ei,
        model_ei.state_dict()
    )
    mll_nei, model_nei = initialize_model(
        x_train_nei,
        train_obj_nei,
        train_con_nei,
        model_nei.state_dict()
    )
    
    t1 = time.time()
    
    if verbose:
        print(f'Batch {iteration}')