# BOTorch tutorial
Adapted of https://www.youtube.com/watch?v=BQ4kVn-Rt84

First we install BOTorch

In [None]:
!pip install botorch

Import libraries

In [None]:
import os
import torch
import numpy as np
import plotly

Objective function: Sphere 1D very steep. Obviously, we could use other methods here as it is convex optimization, but it is just a toy problem.

$f(x) = 1000000 x^2$

In [None]:
def target_function(individuals):
  result = []
  for x in individuals:
    result.append(1e7*x[0]**2)
  return torch.tensor(result)

Print objective function that we want to maximize.

In [None]:
import plotly.graph_objects as go

lower_bound = -0.1
upper_bound = 0.1
x = np.linspace(lower_bound, upper_bound, 100)
x_new = x.reshape((100,-1))
z = target_function(x_new)

data = go.Scatter(x=x, y=z, line_color="#FE73FF")

fig = go.Figure(data=data)
fig.update_layout(title="Objective function", xaxis_title="input", yaxis_title="output")
fig.show()

Generate some data. Example of 10 random points from the input space, normalized into the range.

In [None]:
train_x = torch.rand(10000, 1) / 5 - upper_bound
print(train_x.min())
print(train_x.max())

Then we compute the latent function $f(x)$. The true evaluation would be contaminated. $y = f(x) + \epsilon \quad s.t. \quad \epsilon \approx N(0, \sigma)$

In [None]:
exact_obj = target_function(train_x).unsqueeze(-1)
exact_obj

Let us see which is the best observed value so far. Assuming minimization.

In [None]:
best_observed_value = exact_obj.min().item()
best_observed_value

We wrap all of the previous code into a function to be used freely

In [None]:
def generate_initial_data(n=10):
  train_x = torch.rand(n, 1, dtype=torch.double) / 5 - upper_bound
  exact_obj = target_function(train_x).unsqueeze(-1)
  best_observed_value = exact_obj.min().item()
  return train_x, exact_obj, best_observed_value

In [None]:
generate_initial_data(20)

Let us now invoke this function to start the BO iteration and set the bounds of the 1-D $f(x) : x \in [-0.1,0.1]$

In [None]:
init_x, init_y, best_init_y = generate_initial_data(20)
bounds = torch.tensor([[lower_bound], [upper_bound]]) #bounds for 2D: torch.tensor([[0., 1.], [10.,2.]]) 

We set which model and which likelihood will we use. In our case we will use a classic Gaussian process and compute its hyper-parameters using the exact marginal log likelihood (which can produce overfitting when points are reduced but well...). We will not normalize the inputs here nor standardize the outputs, so the hyper-parameters of the GP kernel set by default are not going to be a good prior for this function, and this will significantly hurt the optimization, as we will see. 

In [None]:
from botorch.models import SingleTaskGP, ModelListGP
from gpytorch.mlls.exact_marginal_log_likelihood import ExactMarginalLogLikelihood

single_model = SingleTaskGP(init_x, init_y)
mll = ExactMarginalLogLikelihood(single_model.likelihood, single_model)

Now that our model is declared, we fit the previous points with the Gaussian process setting its hyperparameters via Exact Marginal log likelihood of the points. The output shows the default covariance function used by the GP and its hyper-hyperparameters. It also shows the Gaussian likelihood used and the homoskedastic noise added to the Matern Kernel to capture the noise of the data. 

In [None]:
from botorch import fit_gpytorch_model
fit_gpytorch_model(mll)

Now we declare the acquisition function that is going to be computed using the predictive distribution of the previous Gaussian process in all the input space. We will use the expected improvement

In [None]:
from botorch.acquisition.analytic import ExpectedImprovement #use the noisy version if the problem has noise

EI = ExpectedImprovement(model=single_model, best_f=best_init_y)

We will now optimize the acquisition function, all the hyper parameters here are a good heuristic default to try and find the global optima of the acquisition function



In [None]:
from botorch.optim import optimize_acqf

candidates, _ = optimize_acqf(acq_function=EI, bounds=bounds, q=1, num_restarts=200, raw_samples=512, options={"batch_limit": 5, "maxiter": 200})

candidates

We now have all the code of an iteration so we just put it in a loop. To do so: We just wrap previous code into a function.

In [None]:
def get_next_points(init_x, init_y, best_init_y, bounds, n_points=1):
  single_model = SingleTaskGP(init_x, init_y)
  mll = ExactMarginalLogLikelihood(single_model.likelihood, single_model)
  fit_gpytorch_model(mll)

  EI = ExpectedImprovement(model=single_model, best_f=best_init_y, maximize=False)
  candidates, _ = optimize_acqf(acq_function=EI, bounds=bounds, q=n_points, num_restarts=200, raw_samples=512, options={"batch_limit": 5, "maxiter": 200})

  return candidates


We test the function

In [None]:
get_next_points(init_x, init_y, best_init_y, bounds, n_points=1)

Finally, we embed the previous code into the Bayesian optimization loop

In [None]:
n_iterations=2

init_x, init_y, best_init_y = generate_initial_data(20)
bounds = torch.tensor([[lower_bound], [upper_bound]])

for i in range(n_iterations):
  print(f"Number of iterations done: {i}")
  new_candidates = get_next_points(init_x, init_y, best_init_y, bounds, 1)
  new_results = target_function(new_candidates).unsqueeze(-1)

  print(f"New candidates are: {new_candidates}")
  init_x = torch.cat([init_x, new_candidates])
  init_y = torch.cat([init_y, new_results])

  best_init_y = init_y.max().item()
  print(f"Best point performs this way: {best_init_y}")


Get the best observed result of the optimization. We can see in the previous figure how the result is exactly the maximum. The optimization has been successful.

In [None]:
print(f"Best observed result: {best_init_y}")
best_candidate = init_x[((init_y == best_init_y).nonzero(as_tuple=True)[0])][0][0]
print(f"Best location of observed result: {best_candidate}")

In [None]:
def print_objective_function(best_candidate, iteration, l_bound=0, h_bound=1):
  x = np.linspace(l_bound, h_bound, 100)
  x_new = x.reshape((100,-1))
  z = target_function(x_new)

  data = go.Scatter(x=x, y=z, line_color="#FE73FF")

  fig = go.Figure(data=data)
  fig.update_layout(title="Objective function. Iteration " + str(iteration), xaxis_title="input", yaxis_title="output")
  fig.add_vline(x=best_candidate, line_width=3, line_color="red")
  fig.show()


print_objective_function(best_candidate, 1, l_bound=lower_bound, h_bound=upper_bound)

In [None]:
x = torch.linspace(lower_bound, upper_bound, steps=100)
x_test = torch.tensor([x[0]]).unsqueeze(-1)
EI = qExpectedImprovement(model=single_model, best_f=best_init_y)
EI(x_test)

We can also plot the acquisition function, with its maximum, which is the point suggested to be evaluated in the next iteration

In [None]:
def compute_acquisition_function(single_model, best_init_y, l_bound=-2., h_bound=10., resolution=1000):
  linspace = torch.linspace(l_bound, h_bound, steps=resolution)
  x_test = torch.tensor([linspace[0]]).unsqueeze(-1)
  EI = ExpectedImprovement(model=single_model, best_f=best_init_y, maximize=False)
  result = []
  for x in linspace:
    x_test = torch.tensor([x]).unsqueeze(-1)
    result.append(EI(x_test))
  return torch.tensor(result)

In [None]:
def print_acquisition_function(acq_fun, iteration, l_bound=-2., h_bound=10., resolution=1000, suggested=None):
  x = torch.linspace(l_bound, h_bound, steps=resolution).detach().numpy()
  x_new = x.reshape((resolution,-1))
  z = acq_fun
  max_acq_fun = x[((acq_fun == acq_fun.max().item()).nonzero(as_tuple=True)[0])]
  data = go.Scatter(x=x, y=z, line_color="yellow")

  fig = go.Figure(data=data)
  fig.update_layout(title="Expected Improvement acquisition function. Iteration " + str(iteration), xaxis_title="input", yaxis_title="output")
  if(suggested==None):
    fig.add_vline(x=max_acq_fun, line_width=3, line_color="red")
  else:
    fig.add_vline(x=float(suggested[0][0]), line_width=3, line_color="red")
  fig.show()
  

In [None]:
acq_fun = compute_acquisition_function(single_model, best_init_y, l_bound=lower_bound, h_bound=upper_bound)
print_acquisition_function(acq_fun, 1)

We can as well plot the GP predictive mean and standard deviation, its predictive distribution, for all the input space.

In [None]:
def compute_predictive_distribution(single_model, best_init_y, l_bound=-2., h_bound=10., resolution=1000):
  linspace = torch.linspace(l_bound, h_bound, steps=resolution)
  x_test = torch.tensor([linspace[0]]).unsqueeze(-1)
  result = []
  variances = []
  for x in linspace:
    x_test = torch.tensor([x]).unsqueeze(-1)
    result.append(single_model.posterior(x_test).mean)
    variances.append(single_model.posterior(x_test).variance)
  return torch.tensor(result), torch.tensor(variances)

In [None]:
def print_predictive_mean(predictive_mean, predictive_variance, iteration, l_bound=-2., h_bound=10., resolution=1000, suggested=None, old_obs=[], old_values=[]):
  x = torch.linspace(l_bound, h_bound, steps=resolution).detach().numpy()
  x_new = x.reshape((resolution,-1))
  z = predictive_mean
  max_predictive_mean = x[((predictive_mean == predictive_mean.max().item()).nonzero(as_tuple=True)[0])]

  fig = go.Figure()

  fig.add_trace(go.Scatter(x=x, y= predictive_mean + np.sqrt(predictive_variance),
                                     mode='lines',
                                     line=dict(color="#19D3F3",width =0.1),
                                     name='upper bound'))
  fig.add_trace(go.Scatter(x=x, y= predictive_mean,
                         mode='lines',
                         line=dict(color="blue"),
                         fill='tonexty',
                         name='predictive mean'))
  fig.add_trace(go.Scatter(x=x, y= predictive_mean - np.sqrt(predictive_variance),
                         mode='lines',
                         line=dict(color="blue", width =0.1),
                         fill='tonexty',
                         name='lower bound'))
  
  
  
  fig.update_layout(title="GP Predictive distribution. Iteration " + str(iteration), xaxis_title="input", yaxis_title="output", showlegend=False)

  if(suggested==None):
    fig.add_vline(x=max_predictive_mean, line_width=3, line_color="red")
  else:
    fig.add_vline(x=float(suggested[0][0]), line_width=3, line_color="red")  

  if(len(old_obs)>0):
    fig.add_trace(go.Scatter(x=old_obs, y=old_values, mode = 'markers', marker_color="black", marker_size=10))

  fig.show()

In [None]:
predictive_mean, predictive_variance = compute_predictive_distribution(single_model, best_init_y, l_bound=lower_bound, h_bound=upper_bound)
print_predictive_mean(predictive_mean, predictive_variance, 1, l_bound=lower_bound, h_bound=upper_bound)

We can embed all this logic into the BO loop to have visualizations of the objective function, GP predictive distribution and acquisition function in every iteration.

In [None]:
def visualize_functions(single_model, best_init_y, best_candidate, candidate_acq_fun, iteration, previous_observations, previous_values, bounds):
  predictive_mean, predictive_variance = compute_predictive_distribution(single_model, best_init_y, l_bound=bounds[0][0], h_bound=bounds[1][0])
  print_predictive_mean(predictive_mean, predictive_variance, iteration, suggested=candidate_acq_fun, old_obs=previous_observations, old_values=previous_values, l_bound=bounds[0][0], h_bound=bounds[1][0])
  acq_fun = compute_acquisition_function(single_model, best_init_y, l_bound=bounds[0][0], h_bound=bounds[1][0])
  print_acquisition_function(acq_fun, iteration, suggested=candidate_acq_fun, l_bound=bounds[0][0], h_bound=bounds[1][0])
  print_objective_function(best_candidate, iteration, l_bound=bounds[0][0], h_bound=bounds[1][0])

def get_next_points_and_visualize(init_x, init_y, best_init_y, bounds, iteration, previous_observations, previous_values, n_points=1):
  single_model = SingleTaskGP(init_x, init_y)
  mll = ExactMarginalLogLikelihood(single_model.likelihood, single_model)
  fit_gpytorch_model(mll)

  EI = ExpectedImprovement(model=single_model, best_f=best_init_y, maximize=False)
  
  candidates, _ = optimize_acqf(acq_function=EI, bounds=bounds, q=n_points, num_restarts=200, raw_samples=512, options={"batch_limit": 5, "maxiter": 200})
  best_candidate = init_x[((init_y == best_init_y).nonzero(as_tuple=True)[0])][0][0]

  visualize_functions(single_model, best_init_y, best_candidate, candidates, iteration, previous_observations, previous_values, bounds)

  return candidates

In [None]:
n_iterations=10

init_x, init_y, best_init_y = generate_initial_data(20)
bounds = torch.tensor([[lower_bound], [upper_bound]])

candidates=[]
results=[]
for i in range(n_iterations):
  print(f"Number of iterations done: {i}")
  new_candidates = get_next_points_and_visualize(init_x, init_y, best_init_y, bounds, i, candidates, results, 1)
  new_results = target_function(new_candidates).unsqueeze(-1)

  print(f"New candidates are: {new_candidates}")
  init_x = torch.cat([init_x, new_candidates])
  init_y = torch.cat([init_y, new_results])

  best_init_y = init_y.min().item()
  print(f"Best point performs this way: {best_init_y}")
  candidates.append(float(new_candidates[0][0]))
  results.append(float(new_results[0][0]))

We are now going to fix the previous behaviour normalizing and standardizing both inputs and outputs into the unit cube to avoid the previously seen weird behaviour of the GP and, consequently, of the acquisition function. We import both utilities from botorch.

In [None]:
from botorch.utils.transforms import standardize, normalize, unnormalize

We now test the normalize function, that basically does

$\mathbb{R}^1 \in [-0.1 , 0.1] \to \mathbb{R}^1 \in [0 , 1]$ 

Easy stuff. Bounds are set before, but would need to be set for another problem according to its particular details.

In [None]:
inputs = generate_initial_data(20)[0]
print("Inputs without normalization: " + str(inputs))
print("Normalized inputs: " + str(normalize(inputs, bounds=bounds)))

Let us standardize (mean=0, std=1) the outputs. Watch out! Standardizing is not the same as normalizing. BOTorch assumes that observations are standardized and that inputs are normalized. Let us make BOTorch happy. Here goes the code for standarization, how well does it do it!!!

In [None]:
outputs = generate_initial_data(20)[1]
print("Outputs without standarization: " + str(outputs))
print("Standarized outputs: " + str(standardize(outputs)))

Let us mix everything, as in BO would be used.

In [None]:
data = generate_initial_data(20)
inputs = data[0]
outputs = data[1]

In [None]:
normalized_inputs = normalize(inputs, bounds=bounds)
standardized_outputs = standardize(outputs)
print("Normalized inputs: " + str(normalized_inputs))
print("Standarized outputs: " + str(standardized_outputs))

We can now embed this code into the previous BO loop. Watch out! We know the real bounds but we need to "lie" to the Gaussian process and make it think that the bounds are $[0,1]$ for the inputs and a Z(0,1) for the outputs. Let us cheat a bit the happy ignorant GP using the function unnormalize to evaluate the objective function there, while the GP thinks that is normalized into the $[0,1]$ cube: 

In [None]:
def visualize_functions(single_model, best_init_y, best_candidate, candidate_acq_fun, iteration, previous_observations, previous_values, bounds, best_candidate_normalized):
  predictive_mean, predictive_variance = compute_predictive_distribution(single_model, best_init_y, l_bound=0, h_bound=1)
  print_predictive_mean(predictive_mean, predictive_variance, iteration, suggested=candidate_acq_fun, old_obs=previous_observations, old_values=previous_values, l_bound=bounds[0][0], h_bound=bounds[1][0])
  acq_fun = compute_acquisition_function(single_model, best_init_y, l_bound=0, h_bound=1)
  print_acquisition_function(acq_fun, iteration, suggested=candidate_acq_fun, l_bound=bounds[0][0], h_bound=bounds[1][0])
  print_objective_function(best_candidate, iteration, l_bound=bounds[0][0], h_bound=bounds[1][0])

def get_next_points_and_visualize_norm(init_x, init_y, best_init_y, normalized_bounds, iteration, previous_observations, previous_values, bounds, n_points=1):
  single_model = SingleTaskGP(init_x, init_y)
  mll = ExactMarginalLogLikelihood(single_model.likelihood, single_model)
  fit_gpytorch_model(mll)

  EI = ExpectedImprovement(model=single_model, best_f=best_init_y, maximize=False)
  
  candidates, _ = optimize_acqf(acq_function=EI, bounds=normalized_bounds, q=n_points, num_restarts=200, raw_samples=512, options={"batch_limit": 5, "maxiter": 200})
  best_candidate = unnormalize(init_x[((init_y == best_init_y).nonzero(as_tuple=True)[0])][0][0], bounds=normalized_bounds)
  best_candidate_normalized = init_x[((init_y == best_init_y).nonzero(as_tuple=True)[0])][0][0]

  visualize_functions(single_model, best_init_y, best_candidate, unnormalize(candidates, bounds=bounds), iteration, previous_observations, previous_values, bounds, best_candidate_normalized)

  return candidates

First the version without visualizations. First 5 points are chosen at random. Just to see whether everything is OK. Watch out!!! The bounds must be also normalized now... easy to not have that into account.

In [None]:
n_iterations=10

init_x, init_y, best_init_y = generate_initial_data(5)
bounds = torch.tensor([[lower_bound], [upper_bound]])
normalized_bounds = torch.tensor([[0.0], [1.0]])
init_x_normalized = normalize(init_x, bounds=bounds)
init_y_standardized = standardize(init_y)
best_init_y_standardized = init_y_standardized.min().item()


candidates=[]
results=[]
for i in range(n_iterations):
  print(f"Number of iterations done: {i}")
  normalized_new_candidates = get_next_points(init_x_normalized, init_y_standardized, best_init_y_standardized, normalized_bounds, 1)
  new_candidates = unnormalize(normalized_new_candidates, bounds=bounds)
  new_results = target_function(new_candidates).unsqueeze(-1)

  print(f"New candidates are: {new_candidates}")
  init_x = torch.cat([init_x, new_candidates])
  init_y = torch.cat([init_y, new_results])
  init_x_normalized = normalize(init_x, bounds=bounds)
  init_y_standardized = standardize(init_y)

  best_init_y = init_y.min().item()
  best_init_y_standardized = init_y_standardized.min().item()
  print(f"Best point performs this way: {best_init_y}")
  candidates.append(float(normalized_new_candidates[0][0]))
  results.append(float(standardize(new_results[0][0])))

Seems OK! Bayesian optimization is exploring the center of our 1D sphere, just as we want it. So the GP now works!!! Now the version with visualizations, some little tricks must be done, as specifing normalized bounds for optimization but bounds for visualization, unnormalize the best candidate and so on...

In [None]:
n_iterations=10

init_x, init_y, best_init_y = generate_initial_data(5)
bounds = torch.tensor([[lower_bound], [upper_bound]])
normalized_bounds = torch.tensor([[0.0], [1.0]])
init_x_normalized = normalize(init_x, bounds=bounds)
init_y_standardized = standardize(init_y)
best_init_y_standardized = init_y_standardized.min().item()

candidates=[]
results=[]
for i in range(n_iterations):
  print(f"Number of iterations done: {i}")
  normalized_new_candidates = get_next_points_and_visualize_norm(init_x_normalized, init_y_standardized, best_init_y_standardized, normalized_bounds, i, init_x, init_y, bounds, 1)
  new_candidates = unnormalize(normalized_new_candidates, bounds=bounds)
  new_results = target_function(new_candidates).unsqueeze(-1)

  print(f"New candidates are: {new_candidates}")
  init_x = torch.cat([init_x, new_candidates])
  init_y = torch.cat([init_y, new_results])
  init_x_normalized = normalize(init_x, bounds=bounds)
  init_y_standardized = standardize(init_y)

  best_init_y = init_y.min().item()
  best_init_y_standardized = init_y_standardized.min().item()
  print(f"Best point performs this way: {best_init_y}")
  candidates.append(float(normalized_new_candidates[0][0]))
  results.append(float(standardize(new_results[0][0])))