## Customize Simulation with Acquisition-guided sampling

In [1]:
## load modules needed

from main_sim import *
import pickle

import math
import torch
from torch import Tensor
from typing import Optional, Union, Tuple

## modules for customized test function
from botorch.test_functions.synthetic import SyntheticTestFunction
from botorch.test_functions.base import BaseTestProblem

from botorch.models.model import Model
from botorch.utils import t_batch_mode_transform

from torch.distributions import Normal

## modules for customized acquisition function
from botorch.acquisition import AnalyticAcquisitionFunction
from botorch.acquisition.objective import PosteriorTransform


## specify the directory for saving resutls 
output_dir = os.path.join(os.getcwd(), "results")

## specify the device to make sure only `cuda` or `cpu` is used, for consistency
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



### Specify the settings 

Some attributes are listed as following: 

* `opt_type`: the approach of getting the next suggestion, can be chosen from (`random`, `opt`, `sample`)

* `threshold_value`: the pre-specified threshold of interest

* `n_sugs`: the number of suggestions proposed each time

* `M`: the number of steps for HMC

* `L`: the number of leapfrog steps in HMC

* `epsilon`: the stepsize used in HMC

* `gfn`: the transformation function, can be chosen from (`exp`, `identity`)

* `hmc_type`: the type of HMC used

* `x0_type`: the approach of proposing the initial points 

* `select_criteria`: the criteria used for pre-screening

* `sel_options`: the criteria used for selection

* `screenXY`: the criteria used for subset selection

In [2]:
## specify the settings 

options = {"opt_type": "sample",
           "threshold_value": 0.,
           "beta": 1.,
           "n_sugs": 1,
           "return_best_only": True,
           "ts_sampler": "cholesky",
           "n_candidates": 2000,
           "alpha": 0.1,
           "batch_limit": 5,
           "init_batch_limit": 5,
           "nonnegative": True,
           "gfn": "exp",
           "M": 100,
           "L": 20,
           "epsilon": 1e-4,
           "burnin": 20,
           "n_gap": 10,
           "hmc_type": "hmc_wc",
           "x0_type": "qmc",
           "use_keops": False,
           "seed": 1025,
           "select_criteria": {"bws_type": "quantile", "bws_param": 0.5, "bwe_type": "quantile", "bwe_param": 0.5, "bwe_threshold": 0.5},
           "sel_options": {"type": "mu"},
           "standardized_Y": False,
           "normal_X": True,
           "screenXY": {"type": "sample", "N": 10, "gamma": 0.6},
           }

### Specify the function of interest

You can specify the function of interest as follows. 

In [None]:
class CustomizedFunction(SyntheticTestFunction):
    r"""Forrester test function.

    1-dimensional function (evluated on `[0, 1]`):

        f(x) = (6x - 2)^2 x sin(12x - 4)

    f has two minimizers for its global minimum at 0.7572727 with f(x) = -6.02074
    """

    dim =                        # the dimension of the function of interest
    _bounds =                    # a list of size dim, each entry is a tuple (lb, ub), e.g. [(lb_1, ub_1), (lb_2, ub_2), ...]
    _optimal_value =             # the optimal value of the function 
    _optimizers =                # the locations provides the optimal value [(x_11, x_12, ...), (x_21, x_22, ...), ...]

    def evaluate_true(self, X: Tensor) -> Tensor:
        '''
        specify the function here, using `torch`. 
        '''
        return ...

We can also create new acquisition function to be used. 

In [None]:
## probability ratio (PR)
class CustomizeAcquisition(AnalyticAcquisitionFunction):
    r"""Single-outcome Customized Acquisition Function.

    Description for this newly designed acquisition function if needed. 
    `Acq(x) = P(y >= best_f) / P(y < best_f), y ~ f(x)`

    Example:
        >>> train_X, train_Y, test_X, test_Y = # generate data as you like and satisfy the data format
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> newAcq = CustomizeAcquisition(model, best_f=0.1)
        >>> acq_val = newAcq(test_X)
    """

    def __init__(
            self,
            model: Model,
            best_f: Union[float, Tensor],
            maximize: bool = True,
            ...
    ) -> None:
        r""" 

        Args:
            model: A fitted single-outcome model
            best_f:  Either a scalar or a `b`-dim Tensor (batch mode) representing
                the best function value observed so far (assumed noiseless).
            maximize: If True, consider the problem a maximization problem.
            and other attributes if needed. 
        """
        super(AnalyticAcquisitionFunction, self).__init__(model=model)
        self.maximize = maximize
        if not torch.is_tensor(best_f):
            best_f = torch.tensor(best_f)
        self.register_buffer("best_f", best_f)

    @t_batch_mode_transform(expected_q=1)
    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate the acquisition on the candidate set X.

        Args: 
            X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
                points each.
        Returns:
            A `(b)`-dim tensor of Probability of Improvement values at the given
            design points `X`.
        
        """
        ## specify the acquisition formula here 
        return ...

In [3]:
### Specify the test functions and acquisition functions of interest
dfn = "forrester"
acq = "ei"

### Update the options setting

It's easy to change the settings specified in options through `options.update({key: new_value})`. 

In [None]:
options.update({"epsilon": 1e-5})
options.update({"sel_options": {"type": "grad_mu"}})

### Run the simulation

In [5]:
## Set up the initial dataset and budget
n_init = 3
options.update({'threshold_value': -0.5})
budget = 10

# specify the number of replicates
M = 1

# specify the batch size
batch_size = 1

# number of suggestions
n_sugs = 1

res_acq = []

# specify if we want to do maximization or minimization
negate = True

post_processing_func = post_process_func
acqf = options.get("acqf", acq)
standardized_Y = options.get("standardized_Y", True)
normal_X = options.get("normal_X", True)
num_restarts = 20
raw_samples = 512

for j in range(M):
    obj, X, Y = initialize_func_data(n_init, dfn, noise_err = None, negate = negate)
    Y = Y.view(-1, 1)
    print(f"Iteration {j}")
    options.update({"bounds": obj.bounds})
        
    for i in range(n_init):
        print(f"Iteration: {0}: X = {X[i].numpy()} | y = {- Y[i].numpy()}")
        
    state = TurboState(obj.dim, batch_size=batch_size)
    screen_criteria = options.get("screenXY", None)
    options.update({"state": state})
        
    def eval_obj(x):
        if normal_X:
            return obj(unnormalize(x, obj.bounds))
        else:
            return obj(x)
            
    N_get = 0
    seed = 1025
    options.update({"seed": seed * j + 1992})
    torch.manual_seed(seed * j + 1992)
    while N_get < budget:
        if screen_criteria is None:
            train_X = X
            train_Y = Y
        else:
            train_X, train_Y = ScreenXY(X, Y, screen_criteria)
            
        if normal_X:
            train_X = normalize(train_X, obj.bounds)
            
        best_observed_value = Y.max().item()

        # get the GP model
        model = SingleTaskGP(train_X, train_Y)
        mll = ExactMarginalLogLikelihood(model.likelihood, model)
        _ = fit_gpytorch_model(mll)

        # specify the selection function
        options.update({"X_keep": train_X})
        sel_options = options.get("sel_options")
        def sel_func(x):
            sel_options.update({"best_f": best_observed_value})
            sel_options.update({"threshold_value": options.get("threshold_value")})
            sel_options.update({"X_keep": train_X})
            return selection_func(x, model, sel_options)
            
        options.update({"sel_func": sel_func})

        # get the next proposal
        opt_type = options.get("opt_type")
        if opt_type == "random":
            if normal_X:
                X_next = torch.rand(n_sugs, obj.dim)
            else:
                X_next = unnormalize(torch.rand(n_sugs, obj.dim), obj.bounds)
        else:
            try:
                X_next = generate_batch(N_get, model, train_X, train_Y, batch_size, acqf, options, num_restarts, raw_samples, post_processing_func)
            except:
                with gpytorch.settings.cholesky_jitter(1e-3):
                    X_next = generate_batch(N_get, model, train_X, train_Y, batch_size, acqf, options, num_restarts, raw_samples, post_processing_func)
            
        X_next = X_next.view(-1, obj.dim)
        Y_next = torch.tensor([eval_obj(x) for x in X_next], dtype = dtype, device = device).unsqueeze(-1)
            
        # get back the original space
        if normal_X:
            ori_X_next = unnormalize(X_next, obj.bounds)
        else:
            ori_X_next = X_next

        # concatenate to the X dimension
        X = torch.cat((X, ori_X_next), dim = 0)
        Y = torch.cat((Y, Y_next), dim = 0)

        # print the result for each steps
        if negate:
            for k in range(n_sugs):
                print(f"Iteration: {N_get + k + 1}: X = {ori_X_next[k].numpy()} | y = {- Y_next[k].numpy()}")
        else:
            for k in range(n_sugs):
                print(f"Iteration: {N_get + k + 1}: X = {ori_X_next[k].numpy()} | y = {Y_next[k].numpy()}")
            
        N_get += n_sugs
            
    # save to numpy
    resX_np = X.cpu().detach().numpy()
    resY_np = Y.cpu().detach().numpy()
    if negate:
        res_np = np.hstack([resX_np, -resY_np])
    else:
        res_np = np.hstack([resX_np, resY_np])
        
    # append to the result list
    res_acq.append(res_np)

# save to a file
# outputfile = os.path.join(output_dir, options.get("opt_type") + "_" + dfn + "_" + acq + "_" + options.get("gfn") + "_seed_" + str(options.get("seed")) + "_rep_" + str(M) + ".pickle")
# with open(outputfile, 'wb') as f:
#     pickle.dump(res_acq, f)

Iteration 0
Iteration: 0: X = [1.] | y = [15.829732]
Iteration: 0: X = [0.998999] | y = [15.80897]
Iteration: 0: X = [0.997998] | y = [15.785848]
Iteration: 1: X = [0.27973783] | y = [-0.06201614]
Iteration: 2: X = [0.395464] | y = [0.09427444]
Iteration: 3: X = [0.30001187] | y = [-0.01556039]
Iteration: 4: X = [0.1676335] | y = [-0.90349013]
Iteration: 5: X = [0.0923351] | y = [-0.51651108]
Iteration: 6: X = [0.1699805] | y = [-0.88870013]
Iteration: 7: X = [0.14754355] | y = [-0.98268187]
Iteration: 8: X = [0.15876794] | y = [-0.94984275]
Iteration: 9: X = [0.14840773] | y = [-0.98132318]
Iteration: 10: X = [0.32375342] | y = [-0.00037898]


In [None]:
sample_benchmark(dfn, acq, options, output_dir, n_sugs = 1, M = 1)

forrester
ei
Iteration 0
Iteration: 0: X = [1.] | y = [15.829732]
Iteration: 0: X = [0.998999] | y = [15.80897]
Iteration: 0: X = [0.997998] | y = [15.785848]
Iteration: 1: X = [0.27973783] | y = [-0.06201614]
Iteration: 2: X = [0.395464] | y = [0.09427444]
Iteration: 3: X = [0.30001187] | y = [-0.01556039]
Iteration: 4: X = [0.1676335] | y = [-0.90349013]
Iteration: 5: X = [0.0923351] | y = [-0.51651108]
Iteration: 6: X = [0.1699805] | y = [-0.88870013]
Iteration: 7: X = [0.14754355] | y = [-0.98268187]
Iteration: 8: X = [0.15876794] | y = [-0.94984275]
Iteration: 9: X = [0.14840773] | y = [-0.98132318]
Iteration: 10: X = [0.32375342] | y = [-0.00037898]
