In [None]:
!pip install botorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import os
import math
import torch

SEED = 8
torch.manual_seed(SEED)

<torch._C.Generator at 0x7f20860c2110>

In [None]:
from botorch.utils.transforms import standardize
bounds = torch.stack([torch.zeros(2), torch.ones(2)])
train_X = bounds[0] + (bounds[1] - bounds[0]) * torch.rand(20, 2)
train_Y = torch.sin(2 * math.pi * train_X[:, [0]]) * torch.cos(2 * math.pi * train_X[:, [1]])
train_Y = standardize(train_Y + 0.05 * torch.randn_like(train_Y))

In [None]:
from botorch.models import SingleTaskGP
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.fit import fit_gpytorch_model
model = SingleTaskGP(train_X, train_Y)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_model(mll);

In [None]:
from botorch.acquisition import qKnowledgeGradient

NUM_FANTASIES = 128
qKG = qKnowledgeGradient(model, num_fantasies=NUM_FANTASIES)

In [None]:
from torch.quasirandom import SobolEngine
sobol_engine = SobolEngine(dimension=2, scramble=True, seed=8)
sobol_samples = sobol_engine.draw(NUM_FANTASIES)

In [None]:
qKG(torch.cat((train_X[0].view(1,-1),sobol_samples),0)).item()

torch.linalg.solve_triangular has its arguments reversed and does not return a copy of one of the inputs.
X = torch.triangular_solve(B, A).solution
should be replaced with
X = torch.linalg.solve_triangular(A, B). (Triggered internally at  ../aten/src/ATen/native/BatchLinearAlgebra.cpp:2183.)
  Linv = torch.triangular_solve(Eye, L, upper=False).solution


0.19309169054031372

In [None]:
# inner workings, not to be run
# sampler = SobolQMCNormalSampler(
#                 num_samples=num_fantasies, resample=False, collapse_batch_dims=True
#             )

# def _split_fantasy_points(X: Tensor, n_f: int) -> Tuple[Tensor, Tensor]:
#     if n_f > X.size(-2):
#         raise ValueError(
#             f"n_f ({n_f}) must be less than the q-batch dimension of X ({X.size(-2)})"
#         )
#     split_sizes = [X.size(-2) - n_f, n_f]
#     X_actual, X_fantasies = torch.split(X, split_sizes, dim=-2)
#     X_fantasies = X_fantasies.permute(-2, *range(X_fantasies.dim() - 2), -1)
#     X_fantasies = X_fantasies.unsqueeze(dim=-2)
#     return X_actual, X_fantasies

# def _get_value_function(
#     model: Model,
#     objective: Optional[MCAcquisitionObjective] = None,
#     posterior_transform: Optional[PosteriorTransform] = None,
#     sampler: Optional[MCSampler] = None,
#     project: Optional[Callable[[Tensor], Tensor]] = None,
#     valfunc_cls: Optional[Type[AcquisitionFunction]] = None,
#     valfunc_argfac: Optional[Callable[[Model, Dict[str, Any]]]] = None,
# ) -> AcquisitionFunction:
#     r"""Construct value function (i.e. inner acquisition function)."""
#     if valfunc_cls is not None:
#         common_kwargs: Dict[str, Any] = {
#             "model": model,
#             "posterior_transform": posterior_transform,
#         }
#         if issubclass(valfunc_cls, MCAcquisitionFunction):
#             common_kwargs["sampler"] = sampler
#             common_kwargs["objective"] = objective
#         kwargs = valfunc_argfac(model=model) if valfunc_argfac is not None else {}
#         base_value_function = valfunc_cls(**common_kwargs, **kwargs)
#     else:
#         if objective is not None:
#             base_value_function = qSimpleRegret(
#                 model=model,
#                 sampler=sampler,
#                 objective=objective,
#                 posterior_transform=posterior_transform,
#             )
#         else:
#             base_value_function = PosteriorMean(
#                 model=model, posterior_transform=posterior_transform
#             )

#     if project is None:
#         return base_value_function
#     else:
#         return ProjectedAcquisitionFunction(
#             base_value_function=base_value_function,
#             project=project,
#         )


# def forward(self, X: Tensor) -> Tensor:
#     # split fantasy location from the actual location under evaluation
#     X_actual, X_fantasies = _split_fantasy_points(X=X, n_f=self.num_fantasies)

#     # only concatenate X_pending into the X part after splitting
#     if self.X_pending is not None:
#         X_actual = torch.cat(
#             [X_actual, match_batch_shape(self.X_pending, X_actual)], dim=-2
#         )

#     # construct the fantasy model of shape `num_fantasies x b`
#     fantasy_model = self.model.fantasize(
#         X=X_actual, sampler=self.sampler, observation_noise=True
#     )

#     # get the value function
#     value_function = _get_value_function(
#         model=fantasy_model,
#         objective=self.objective,
#         posterior_transform=self.posterior_transform,
#         sampler=self.inner_sampler,
#     )

#     # make sure to propagate gradients to the fantasy model train inputs
#     with settings.propagate_grads(True):
#         values = value_function(X=X_fantasies)  # num_fantasies x b

    if self.current_value is not None:
        values = values - self.current_value

#     # return average over the fantasy samples
#     return values.mean(dim=0)


In [None]:
from botorch.sampling import SobolQMCNormalSampler
sampler = SobolQMCNormalSampler(num_samples=5, seed=1234)
fantasy_model = model.fantasize(train_X[0].view(1,-1), sampler, observation_noise=True)

from botorch.acquisition.analytic import PosteriorMean
PosteriorMean(model)(train_X[0].view(1,-1))

tensor([-0.3386], grad_fn=<ViewBackward0>)

In [None]:
PosteriorMean(model)(train_X[0].view(1,-1)).item()

-0.3386051654815674

In [None]:
PosteriorMean(fantasy_model)(train_X[0].view(1,-1))

tensor([-0.4586, -0.2904, -0.2234, -0.3893, -0.3752], grad_fn=<ViewBackward0>)

In [None]:
# def gen_one_shot_kg_initial_conditions(
#     acq_function: qKnowledgeGradient,
#     bounds: Tensor,
#     q: int,
#     num_restarts: int,
#     raw_samples: int,
#     fixed_features: Optional[Dict[int, float]] = None,
#     options: Optional[Dict[str, Union[bool, float, int]]] = None,
#     inequality_constraints: Optional[List[Tuple[Tensor, Tensor, float]]] = None,
#     equality_constraints: Optional[List[Tuple[Tensor, Tensor, float]]] = None,
# ) -> Optional[Tensor]:
#     options = options or {}
#     frac_random: float = options.get("frac_random", 0.1)
#     if not 0 < frac_random < 1:
#         raise ValueError(
#             f"frac_random must take on values in (0,1). Value: {frac_random}"
#         )
#     q_aug = acq_function.get_augmented_q_batch_size(q=q)

#     ics = gen_batch_initial_conditions(
#         acq_function=acq_function,
#         bounds=bounds,
#         q=q_aug,
#         num_restarts=num_restarts,
#         raw_samples=raw_samples,
#         fixed_features=fixed_features,
#         options=options,
#         inequality_constraints=inequality_constraints,
#         equality_constraints=equality_constraints,
#     )

#     # compute maximizer of the value function
#     value_function = _get_value_function(
#         model=acq_function.model,
#         objective=acq_function.objective,
#         posterior_transform=acq_function.posterior_transform,
#         sampler=acq_function.inner_sampler,
#         project=getattr(acq_function, "project", None),
#     )
#     from botorch.optim.optimize import optimize_acqf

#     fantasy_cands, fantasy_vals = optimize_acqf(
#         acq_function=value_function,
#         bounds=bounds,
#         q=1,
#         num_restarts=options.get("num_inner_restarts", 20),
#         raw_samples=options.get("raw_inner_samples", 1024),
#         fixed_features=fixed_features,
#         return_best_only=False,
#         inequality_constraints=inequality_constraints,
#         equality_constraints=equality_constraints,
#     )

#     # sampling from the optimizers
#     n_value = int((1 - frac_random) * (q_aug - q))  # number of non-random ICs
#     eta = options.get("eta", 2.0)
#     weights = torch.exp(eta * standardize(fantasy_vals))
#     idx = torch.multinomial(weights, num_restarts * n_value, replacement=True)

#     # set the respective initial conditions to the sampled optimizers
#     ics[..., -n_value:, :] = fantasy_cands[idx, 0].view(num_restarts, n_value, -1)
#     return ics


# def get_augmented_q_batch_size(self, q: int) -> int:
#     r"""Get augmented q batch size for one-shot optimization.

#     Args:
#         q: The number of candidates to consider jointly.

#     Returns:
#         The augmented size for one-shot optimization (including variables
#         parameterizing the fantasy solutions).
#     """
#     return q + self.num_fantasies

In [None]:
from botorch.optim.initializers import gen_one_shot_kg_initial_conditions

Xinit = gen_one_shot_kg_initial_conditions(qKG, bounds, q=2, num_restarts=10, 
                                           raw_samples=512, options={"frac_random": 0.25})
Xinit.size()

torch.Size([10, 130, 2])

In [None]:
from botorch.optim import optimize_acqf
from botorch.utils.sampling import manual_seed

NUM_RESTARTS = 10
RAW_SAMPLES = 512

with manual_seed(1234):
    candidates, acq_value = optimize_acqf(
        acq_function=qKG, 
        bounds=bounds,
        q=2,
        num_restarts=NUM_RESTARTS,
        raw_samples=RAW_SAMPLES,
    )

In [None]:
candidates

tensor([[0.3940, 1.0000],
        [0.0950, 0.0000]])

In [None]:
acq_value

tensor(2.0358)

In [None]:
from botorch.acquisition import PosteriorMean

NUM_RESTARTS = 20
RAW_SAMPLES = 2048

argmax_pmean, max_pmean = optimize_acqf(
    acq_function=PosteriorMean(model), 
    bounds=bounds,
    q=1,
    num_restarts=NUM_RESTARTS,
    raw_samples=RAW_SAMPLES
)

In [None]:
max_pmean

tensor(1.9548)

In [None]:
qKG_proper = qKnowledgeGradient(
    model,
    num_fantasies=NUM_FANTASIES,
    sampler=qKG.sampler,
    current_value=max_pmean,
)

with manual_seed(1234):
    candidates_proper, acq_value_proper = optimize_acqf(
        acq_function=qKG_proper, 
        bounds=bounds,
        q=2,
        num_restarts=NUM_RESTARTS,
        raw_samples=RAW_SAMPLES,
    )

Trying again with a new set of initial conditions.


In [None]:
candidates_proper

tensor([[0.2070, 1.0000],
        [0.0874, 0.0122]])

In [None]:
acq_value_proper

tensor(0.0107)