In [None]:
import copy
import math
import torch
import pyro
import numpy as np
import matplotlib.pyplot as plt

import torch.optim as optim
import torch.autograd as autograd
from torch.distributions import constraints, transform_to

import pyro.distributions as dist
import pyro.contrib.gp as gp

In [None]:
# Seed used for the first part of the notebook.
seed_number = 333


def set_random_seed(seed):
    """Seed the NumPy and PyTorch random number generators with ``seed``.

    The CUDA generators are seeded as well when CUDA is available.
    """
    torch.manual_seed(seed)
    np.random.seed(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)


set_random_seed(seed_number)

# Objective function

In [None]:
a = -5
b = 0
c = 0.5

def test_function(X):
    return a * torch.exp(-1.0 * torch.pow((X - b), 2) / (2*c*c))

const_x1_min = -5.0
const_x1_max = 5.0

x_ = torch.linspace(const_x1_min, const_x1_max, 200)

# Training data

In [None]:
# Number of initial design points.
train_cnt = 3
# Draw the initial inputs uniformly over the search interval and evaluate the
# objective at them.
X_train = torch.tensor(
    list(np.random.uniform(low=const_x1_min, high=const_x1_max, size=train_cnt))
)
Y_train = test_function(X_train)

# Visualising objective function and training data

In [None]:
# Objective evaluated on the dense grid.
plt.plot(x_, test_function(x_))
# Initial training points, drawn as large star markers on top of the curve.
plt.plot(X_train, Y_train, "*", markersize=20)

# GP model

In [None]:
def create_gp_model(X, Y, plot_opti=False, num_steps=5000, lr=0.005):
    """Build a 1-D GP regression model on ``(X, Y)`` and fit it by minimising the ELBO.

    Args:
        X: training inputs passed to ``gp.models.GPRegression``.
        Y: training targets passed to ``gp.models.GPRegression``.
        plot_opti: when true, plot the recorded loss values with
            ``plt.semilogy`` after training.
        num_steps: number of Adam steps to run (default 5000).
        lr: Adam learning rate (default 0.005).

    Returns:
        The fitted ``GPRegression`` model.
    """
    D = 1  # input dimensionality
    kernel = gp.kernels.Matern52(input_dim=D, lengthscale=torch.ones(D))
    gp_model = gp.models.GPRegression(X, Y, kernel=kernel)

    ###############
    # Set priors
    ###############
    gp_model.kernel.set_prior("lengthscale", dist.LogNormal(0.0, 1.0).expand((D,)).to_event(1))
    gp_model.kernel.set_prior("variance", dist.LogNormal(0.0, 1.0))
    # The observation noise is not fixed to zero: it gets a Uniform(0, 1) prior
    # and is inferred together with the kernel hyperparameters.
    gp_model.set_prior("noise", dist.Uniform(0.0, 1.0))

    ###############
    # Set guides
    ###############
    gp_model.kernel.autoguide("lengthscale", dist.Normal)
    gp_model.kernel.autoguide("variance", dist.Normal)
    gp_model.autoguide("noise", dist.Normal)

    # Optimiser and loss: Adam on the differentiable Trace_ELBO loss.
    gp_opt = torch.optim.Adam(gp_model.parameters(), lr=lr)
    loss_fn = pyro.infer.Trace_ELBO().differentiable_loss

    losses = []
    for _ in range(num_steps):
        gp_opt.zero_grad()

        loss = loss_fn(gp_model.model, gp_model.guide)
        loss.backward()

        gp_opt.step()

        losses.append(loss.item())

    if plot_opti:
        plt.semilogy(losses)

    return gp_model

In [None]:
# Fit the GP to the initial training points and plot the recorded training loss.
gp_model = create_gp_model(X_train, Y_train, plot_opti=True)

In [None]:
# Display the fitted model's named buffers in sorted order.
sorted(list(gp_model.named_buffers()))

In [None]:
# Query the fitted model on the dense grid. The result is indexed below as
# [0] (plotted as the curve) and [1] (passed as the error-bar size).
predict_result = gp_model(x_)

In [None]:
# GP prediction on the grid. The second output of the GP call is a variance,
# so take its square root (clamped at zero for numerical safety) to draw
# one-standard-deviation error bars in the units of y.
plt.errorbar(x_, predict_result[0].detach().numpy(),
             yerr=predict_result[1].detach().clamp_min(0).sqrt().numpy(), color="#1f77b4")
# True objective and the training points for comparison.
plt.plot(x_, test_function(x_), color="orange")
plt.plot(X_train, Y_train, "*", markersize=20, color="green")

# Expected Improvement

In [None]:
def normal_phi(x):
    """Standard normal probability density, evaluated element-wise on a tensor."""
    return torch.exp(-x.pow(2) / 2) / math.sqrt(2 * math.pi)


def normal_Phi(x):
    """Standard normal cumulative distribution, evaluated element-wise on a tensor."""
    return (1 + torch.erf(x / math.sqrt(2))) / 2


def expected_improvement(gpmodel, x):
    """Negated Expected Improvement (for minimisation) of ``gpmodel`` at ``x``.

    With ``delta = y_min - mu`` and ``z = delta / sigma`` the closed form is

        EI = delta * Phi(z) + sigma * phi(z)

    where ``y_min`` is the smallest observed target stored on the model and
    ``mu``/``sigma`` are the model's predictive mean and standard deviation
    (requested with ``full_cov=False, noiseless=False``).

    Args:
        gpmodel: object exposing ``.y`` and callable as
            ``gpmodel(x, full_cov=False, noiseless=False) -> (mean, variance)``.
        x: tensor of query locations.

    Returns:
        ``-EI`` as a tensor, so that the acquisition can be *minimised*.
    """
    y_min = gpmodel.y.min()

    mu, variance = gpmodel(x, full_cov=False, noiseless=False)

    # Guard against a zero variance, which would otherwise produce 0/0 = NaN
    # (and an infinite gradient through sqrt).
    sigma = variance.clamp_min(1e-12).sqrt()

    delta = y_min - mu
    z = delta / sigma

    # Standard closed form; valid for both signs of delta.
    EI = delta * normal_Phi(z) + sigma * normal_phi(z)

    return -EI

In [None]:
# Evaluate the acquisition (which returns the negated EI) on the dense grid.
ei_result = expected_improvement(gp_model, x_)

In [None]:
plt.figure(figsize=(9,6))
# GP prediction on the grid. The second output of the GP call is a variance,
# so take its square root (clamped at zero for numerical safety) to draw
# one-standard-deviation error bars in the units of y.
plt.errorbar(x_,
             predict_result[0].detach().numpy(),
             yerr=predict_result[1].detach().clamp_min(0).sqrt().numpy(),
             color="#1f77b4", label="GP model")

# True objective, training points and the acquisition curve on the same axes.
plt.plot(x_, test_function(x_), color="orange", label="Target")
plt.plot(X_train, Y_train, "*", markersize=20, color="green", label="Train")
plt.plot(x_, ei_result.detach().numpy(), color="red", label="EI")
plt.legend();

# AVG Expected Improvement

In [None]:
def avg_expected_improvement(gpmodel, x, no_samples=10):
    """Average ``expected_improvement(gpmodel, x)`` over ``no_samples`` calls.

    The accumulator has one entry per element along the first axis of ``x``.
    Averaging only changes the result if repeated calls differ.
    NOTE(review): presumably each model call draws fresh hyperparameters;
    confirm against the Pyro version in use.

    Returns:
        Tensor holding the mean of the (negated) EI values.
    """
    running_total = torch.zeros((x.shape[0]))

    for _ in range(no_samples):
        # In-place accumulation keeps the accumulator's dtype.
        running_total.add_(expected_improvement(gpmodel, x))

    running_total.div_(no_samples)

    return running_total

In [None]:
# Evaluate the averaged acquisition on the dense grid using 100 repetitions.
avg_ei_result = avg_expected_improvement(gp_model, x_, no_samples=100)

In [None]:
plt.figure(figsize=(9,6))
# GP prediction on the grid. The second output of the GP call is a variance,
# so take its square root (clamped at zero for numerical safety) to draw
# one-standard-deviation error bars in the units of y.
plt.errorbar(x_,
             predict_result[0].detach().numpy(),
             yerr=predict_result[1].detach().clamp_min(0).sqrt().numpy(),
             color="#1f77b4", label="GP model")

# True objective and training points.
plt.plot(x_, test_function(x_), color="orange", label="Target")
plt.plot(X_train, Y_train, "*", markersize=20, color="green", label="Train")

# Averaged acquisition curve.
plt.plot(x_, avg_ei_result.detach().numpy(), color="red", label="AVG EI")
plt.legend();

# Comparing EI and avg EI

In [None]:
# Overlay the single-evaluation and the averaged acquisition curves.
plt.figure(figsize=(9,6))
plt.plot(x_, ei_result.detach().numpy(), color="blue", label="EI")
plt.plot(x_, avg_ei_result.detach().numpy(), color="red", label="AVG EI")
# Trailing semicolon suppresses the legend object's repr in the cell output.
plt.legend();

# Comparing optimisation between EI and AVG EI

In [None]:
def optimise(acquisition_func, x_st, lr=1.0):
    """Run one L-BFGS ``step`` that minimises ``acquisition_func`` over ``x_st``.

    Args:
        acquisition_func: callable mapping ``x_st`` to the value to minimise.
        x_st: tensor with ``requires_grad=True``; it is updated in place.
        lr: learning rate handed to ``optim.LBFGS``.

    Returns:
        The ``optim.LBFGS`` instance after the step.
    """
    # No bounds are imposed on x_st.
    lbfgs = optim.LBFGS([x_st], lr=lr)

    def _evaluate():
        # Reset gradients left over from the previous evaluation.
        lbfgs.zero_grad()

        value = acquisition_func(x_st)

        # Differentiate with respect to x_st only, then store the result in
        # x_st.grad for the optimiser to consume.
        grad_wrt_x = autograd.grad(value, x_st)
        autograd.backward(x_st, grad_wrt_x)

        # Trace every evaluation made by the optimiser.
        print("x_st", x_st, value)

        return value

    lbfgs.step(_evaluate)

    return lbfgs

In [None]:
# Minimise the plain EI acquisition starting from x = -1.5.
x_st = torch.tensor([-1.5], requires_grad=True)
optimise(lambda x: expected_improvement(gp_model, x), x_st, lr=1);

In [None]:
# Minimise the averaged EI acquisition from the same starting point, x = -1.5.
x_st = torch.tensor([-1.5], requires_grad=True)
opti_resul = optimise(lambda x: avg_expected_improvement(gp_model, x), x_st, lr=1)

# Performing BO with EI and avg EI

### Minimalistic BO approach

In [None]:
def find_a_candidate(acquisition_func, x_init, x_min, x_max):
    """Minimise ``acquisition_func`` inside ``[x_min, x_max]`` starting at ``x_init``.

    The search variable is mapped to an unconstrained space through the
    interval transform, optimised there with a single L-BFGS ``step``, and
    mapped back to the interval.

    Returns:
        The candidate location as a detached tensor.
    """
    # Bijection between the unconstrained space and the interval.
    to_interval = transform_to(constraints.interval(x_min, x_max))

    # Starting point expressed in unconstrained coordinates, as a fresh leaf.
    x_uncon = to_interval.inv(x_init).clone().detach().requires_grad_(True)

    minimizer = optim.LBFGS([x_uncon])

    def closure():
        minimizer.zero_grad()
        y = acquisition_func(to_interval(x_uncon))
        # Gradient with respect to x_uncon only, stored in x_uncon.grad.
        autograd.backward(x_uncon, autograd.grad(y, x_uncon))
        return y

    minimizer.step(closure)

    # Map the optimum back to the original (constrained) domain.
    return to_interval(x_uncon).detach()

def next_x(acquisition_func, gp_model, x_init, x_min, x_max, num_candidates=5):
    """Pick the next evaluation point by multi-start minimisation of the acquisition.

    ``num_candidates`` local searches are run with ``find_a_candidate``: the
    first starts at ``x_init``, the rest at points drawn uniformly from
    ``[x_min, x_max]``. The candidate with the smallest *acquisition* value is
    returned.

    Candidates are ranked with the acquisition function and not with the true
    objective. Evaluating the objective at every candidate would spend hidden
    objective evaluations and make the choice of acquisition irrelevant.

    Args:
        acquisition_func: callable ``(gp_model, x) -> tensor`` to be minimised.
        gp_model: model forwarded to ``acquisition_func``.
        x_init: starting point for the first local search.
        x_min, x_max: bounds of the search interval.
        num_candidates: number of local searches to run.

    Returns:
        The selected candidate as returned by ``find_a_candidate``.
    """

    def acquisition(x):
        return acquisition_func(gp_model, x)

    candidates = []
    values = []

    for _ in range(num_candidates):

        x_can = find_a_candidate(acquisition, x_init, x_min, x_max)

        # Score the candidate with the acquisition itself; no gradient needed.
        y_can = acquisition(x_can).detach()

        candidates.append(x_can)
        values.append(y_can)

        # Random restart location for the next local search.
        x_init = torch.tensor([x for x in np.random.uniform(low=x_min, high=x_max, size=1)])

    argmin = torch.min(torch.cat(values), dim=0)[1].item()
    candidate = candidates[argmin]

    return candidate

def update_posterior(gpmodel, x_new, y_new):
    """Append ``(x_new, y_new)`` to the model's data and retrain it.

    The model is modified in place: its data is replaced through ``set_data``
    and it is then trained with ``gp.util.train`` using Adam (lr=0.005).

    Returns:
        Tuple ``(X, y)`` with the extended inputs and targets.
    """
    # Extend the stored observations with the new evaluation.
    inputs_all = torch.cat([gpmodel.X, x_new])
    targets_all = torch.cat([gpmodel.y, y_new])
    gpmodel.set_data(inputs_all, targets_all)

    # Refit on the enlarged data set.
    adam = torch.optim.Adam(gpmodel.parameters(), lr=0.005)
    gp.util.train(gpmodel, adam)

    return inputs_all, targets_all

def BO(gp_model, acquisition_function, X, Y, bo_steps=5, num_candidates=5):
    """Run ``bo_steps`` iterations of Bayesian optimisation on ``test_function``.

    Each iteration starts the candidate search from the most recently added
    input, evaluates the objective at the chosen point, prints a progress
    line, and refits ``gp_model`` through ``update_posterior``. The search
    bounds are the notebook-level ``const_x1_min`` / ``const_x1_max``.

    Args:
        gp_model: model passed to ``next_x`` and ``update_posterior``.
        acquisition_function: callable ``(gp_model, x) -> tensor`` to minimise.
        X, Y: initial inputs and targets; copied, not modified.
        bo_steps: number of BO iterations.
        num_candidates: number of local searches per iteration.

    Returns:
        None.
    """
    inputs_so_far = copy.copy(X)
    targets_so_far = copy.copy(Y)

    for step in range(1, bo_steps + 1):

        # Start from the latest evaluated input.
        x_init = torch.Tensor([inputs_so_far[-1]]).detach().requires_grad_(True)

        x_new = next_x(acquisition_function, gp_model, x_init,
                       const_x1_min, const_x1_max, num_candidates=num_candidates)

        y_new = test_function(x_new)

        print("BO STEP: ", step,
              "ini = ", x_init.detach().numpy()[0],
              "fin = ", x_new.detach().numpy()[0],
              "value = ", y_new.detach().numpy()[0]
        )

        inputs_so_far = torch.cat([inputs_so_far, x_new])
        targets_so_far = torch.cat([targets_so_far, y_new])

        update_posterior(gp_model, x_new, y_new)
    

# BO AVG EI

In [None]:
# Re-seed, then fit a fresh GP on the initial training data.
set_random_seed(444)
gp_model = create_gp_model(X_train, Y_train)

# Run BO with the averaged EI acquisition.
BO(gp_model, avg_expected_improvement, X_train, Y_train)

# BO EI

In [None]:
# Re-seed, then fit a fresh GP on the initial training data.
set_random_seed(444)
gp_model = create_gp_model(X_train, Y_train)

# Run BO with the plain EI acquisition.
BO(gp_model, expected_improvement, X_train, Y_train)