In this notebook, we illustrate how to implement a **Multi-Fidelity Bayesian Optimization** (MFBO) method.

We set up first a PyTorch environment for use with the Botorch library for *Bayesian optimization* research.

In [1]:
import os, torch, botorch, gpytorch
torch.set_default_dtype(torch.double)

SMOKE_TEST = os.environ.get("SMOKE_TEST")

# Get cpu or gpu device for training
device = "cuda" if torch.cuda.is_available() else "cpu" 
print(f'Running on PyTorch {torch.__version__}, Botorch {botorch.__version__}, using {device} device')

Running on PyTorch 1.12.1, Botorch 0.8.1, using cpu device


### Problem setup
Load and preprocess data from a CSV file for use in the model. 

In [2]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler

data = 'input_data/Case3_finer.csv'
# Data loading
def load_data(data=data):
    df = pd.read_csv(data)
    X_data  = np.vstack([df["alpha0"].to_numpy() ,
                df["alpha1"].to_numpy() ,
                df["alpha2"].to_numpy() ,
                df["alpha3"].to_numpy() ])
    pts = X_data.T.copy()
    pts = StandardScaler().fit_transform(pts)
    '''COMPUTE DIFFERENT CASES AND PLOT THEM'''
    Cx_data = df['SumCx'].to_numpy() 


    #Cx_data = np.vstack([df['SumCx'].to_numpy() ,
    #                     df['SumCy'].to_numpy() ])

    
    if Cx_data.ndim < 2:
       obs = Cx_data.reshape(-1,1).copy() 
    else:
       obs = Cx_data.T.copy()

    obs = StandardScaler().fit_transform(obs)
    return pts, obs

Perform a PCA transformation on the given dataset

In [3]:
from sklearn.decomposition import PCA
   
# Compute PCA 
def PCA_transformation(pts_original):
    pca = PCA(n_components=4, svd_solver='full').fit(pts_original)
    pts_transformed = pca.transform(pts_original)
    return pts_transformed

Resample data using a Latin hypercube sample and a nearest neighbor interpolation

In [4]:
from pyDOE import lhs
from scipy.interpolate import NearestNDInterpolator

# Sample RSMs
def data_resample(pts, obs):
    dim = pts.shape[1]
    N = 10000
    lb = np.min( pts,axis=0)
    ub = np.max( pts,axis=0)
    bounds = {'lb': lb, 'ub': ub}
    # Generate latin-hypercube
    new_pts = lb + (ub - lb) * lhs(dim, N) 
    # pts are not in convex hull of pts (LD interpolator does not extrapolate) 
    r =  NearestNDInterpolator( pts, obs)
    # pts has to be inside region of interpolation .
    valuesTrasf = r(new_pts) 
    #valuesTrasf.reshape(-1,1).T
    return new_pts,valuesTrasf

Define a PyTorch Dataset for the test case, which will be used for training and testing the model

In [5]:
from torch.utils.data import TensorDataset, DataLoader, Dataset
from sklearn.model_selection import train_test_split

pts, obs = load_data()
pts = PCA_transformation(pts)
new_pts,new_obs = data_resample(pts, obs)

class TestcaseDataset(Dataset):
    def __init__(self,new_pts,new_obs,dim):
        data = np.hstack([new_pts,new_obs]) 
        hifi, lofi = train_test_split(data, test_size=1e-3, shuffle=True)
        hifi, test = train_test_split(hifi, test_size=1e-1)
        size=dim-data.shape[1]
        # Cast them
        self.X_hifi = torch.Tensor(hifi[:,:size]) 
        self.X_lofi = torch.Tensor(lofi[:,:size]) 
        self.X_test = torch.Tensor(test[:,:size]) 
        self.Y_hifi = torch.Tensor(hifi[:,size:])#.unsqueeze(-1)
        self.Y_lofi = torch.Tensor(lofi[:,size:])#.unsqueeze(-1)
        self.Y_test = torch.Tensor(test[:,size:])#.unsqueeze(-1)

        self.hifi_dataset = TensorDataset(self.X_hifi, self.Y_hifi)
        self.test_dataset = TensorDataset(self.X_test, self.Y_test)
    def __call__(self):
        return (self.hifi_dataset, 
                self.test_dataset, 
                self.X_hifi, 
                self.X_lofi, 
                self.X_test, 
                self.Y_hifi, 
                self.Y_lofi, 
                self.Y_test, 
                )

Initialize the data 

In [7]:
from botorch.models.transforms.outcome import Standardize
from botorch.utils.transforms import unnormalize, normalize
dataset = TestcaseDataset(new_pts,new_obs,pts.shape[1])

X_lofi = dataset.X_lofi#normalize(dataset.X_lofi,bounds=bounds)
X_hifi = dataset.X_hifi#normalize(dataset.X_hifi,bounds=bounds)
X_test = dataset.X_test#normalize(dataset.X_test,bounds=bounds)
Y_lofi = dataset.Y_lofi
Y_hifi = dataset.Y_hifi
Y_test = dataset.Y_test

hifi_dataset = dataset.hifi_dataset
test_dataset = dataset.test_dataset

#bounds = torch.stack([-2.5 * torch.ones(pts.shape[-1]), 2.5 * torch.ones(pts.shape[-1])])
bounds = torch.Tensor([[pts[:,0].min(), pts[:,1].min(), pts[:,2].min(), pts[:,3].min()], [pts[:,0].max(), pts[:,1].max(), pts[:,2].max(), pts[:,3].max()]])
'''print("Shape of low fidelity  X and y: ",X_lofi.shape, Y_lofi.shape)
print("Shape of high fidelity X and y: ",X_hifi.shape, Y_hifi.shape)
print("Shape of test set      X and y: ",X_test.shape, Y_test.shape)
print("Buonds shape: ", bounds.shape)
print('points shape',pts.shape)'''

# Create data loaders
hifi_dataloader = DataLoader(hifi_dataset, batch_size = 32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size = 32, shuffle=True)
best_obs_value = Y_hifi.numpy().max()

NOISE_LEVEL = 1e-1
noise = NOISE_LEVEL*torch.ones(Y_lofi.shape[0])

Display the data points

In [None]:
import matplotlib.pyplot as plt
plt.style.use('seaborn')

fig = plt.figure(figsize=(21, 9))
ax = fig.add_subplot(projection='3d')
ax.scatter(X_hifi[:, 0],X_hifi[:, 1],X_hifi[:, 2],label = 'high fidelity')
ax.scatter(X_test[:, 0],X_test[:, 1],X_test[:, 2],label = 'test')
ax.scatter(X_lofi[:, 0],X_lofi[:, 1],X_lofi[:, 2],label = 'low fidelity')
plt.legend()
plt.show()


## High Fidelity Model

Initialize a `high fidelity neural network` model, which uses the *Mean Square Error* (MSE) loss and the *Adam Optimizer* for training. 

In [8]:
from torch import nn # Neural Network Module
# Define hyperparameters
n_samples, n_features = X_hifi.shape
input_size = n_features
hidden_size = 128
output_size = Y_hifi.shape[1]
learning_rate = 1e-2
# Define the neural network model (input, hidden, output size)
class NeuralNet(nn.Module):
    def __init__(self,input_size,hidden_size,output_size):
        super(NeuralNet, self).__init__()
        # Define layers
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.LeakyReLU() # activation function nn.LeakyReLU()
        self.linear2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()
    def forward(self, x):
        out = self.linear1(x)
        out = self.relu(out)
        out = self.linear2(out)
        # Uncomment the following line if your task requires a sigmoid activation on the output
        # out = self.sigmoid(out)
        return out

#### variant

In [None]:
# Define hyperparameters
n_samples, n_features = X_hifi.shape
input_size = n_features
hidden_size = 128
output_size = Y_hifi.shape[1]
learning_rate = 1e-2

# Define the neural network model
class NeuralNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(NeuralNet, self).__init__()
        # Define layers
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.LeakyReLU()  # Activation function
        self.linear2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.linear1(x)
        out = self.relu(out)
        out = self.linear2(out)

        return out

# Instantiate the model
model = NeuralNet(input_size, hidden_size, output_size)

# Print the model architecture
print(model)

#### variant 2

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers):
        super(Encoder, self).__init__()
        layers = []
        for i in range(num_layers):
            layers.append(nn.Conv1d(in_channels=input_dim if i == 0 else hidden_dim, 
                                    out_channels=hidden_dim, 
                                    kernel_size=kernel_size, 
                                    padding=kernel_size // 2))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(hidden_dim))
        self.encoder = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.encoder(x)

class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim, kernel_size, num_layers):
        super(Decoder, self).__init__()
        layers = []
        for i in range(num_layers):
            layers.append(nn.Conv1d(in_channels=hidden_dim, 
                                    out_channels=hidden_dim, 
                                    kernel_size=kernel_size, 
                                    padding=kernel_size // 2))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(hidden_dim))
        layers.append(nn.Conv1d(in_channels=hidden_dim, 
                                out_channels=output_dim, 
                                kernel_size=kernel_size, 
                                padding=kernel_size // 2))
        self.decoder = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.decoder(x)

class NeuralNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, kernel_size=3, num_layers=1):
        super(NeuralNet, self).__init__()
        self.encoder = Encoder(input_dim, hidden_dim, kernel_size, num_layers)
        self.decoder = Decoder(output_dim, hidden_dim, kernel_size, num_layers)
    
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

# Example usage
input_dim = 1  # Number of input features
output_dim = 1  # Number of output features
hidden_dim = 64  # Number of features in the hidden layers
kernel_size = 3  # Size of the convolutional kernel
num_layers = 3  # Number of layers in the encoder and decoder

model = NeuralNet(input_dim, hidden_dim, output_dim, kernel_size=3, num_layers=1)
print(model)

# Assuming x is your input tensor with shape (batch_size, input_dim, sequence_length)
x = torch.randn(32, input_dim, 100)  # Example input
output = model(x)
print(output.shape)


We train the model for a specified number of epochs on the training data. The data is fed to the model in batches. The model makes predictions on the training data and backpropagates the prediction error to update the model parameters. The loss is printed after every 10 epochs.

It is possible to save the state of the neural network to a file after training. 

It is possible to import the state of a previously trained neural network model. The `NeuralNet` class, defined earlier, takes the same *input size*, *hidden size*, and *output size* as the saved model. The `load_state_dict` method is used to load the saved state of the neural network from a file. In this case, the file is located in the `model_weights` directory. Once the model is loaded, it is set to evaluation mode using the `eval()` method.

In [9]:
model_file = "trained_model_weights/testcase_nn_PCA_3000_singlobj.pt"
path = os.path.join(os.getcwd(), model_file)

if os.path.isfile(path):
    # Load model weights from the work directory
    print('State dict of the neural network model imported')
    nn_model = NeuralNet(input_size,hidden_size,output_size).to(device)
    nn_model.load_state_dict(torch.load(path))
    nn_model.eval()
else:
    print('Training the neural network model')
    # Define a neural network model with input size, hidden size, and output size, then move it to the device (CPU or GPU)
    nn_model = NeuralNet(input_size,hidden_size,output_size).to(device)
    print(nn_model)

    ''' Construct loss and optmizer '''
    # Define a loss function for the neural network to minimize. Mean Square Error Loss is used in this case.
    criterion = nn.MSELoss() # loss_fn = nn.CrossEntropyLoss()
    # Define an optimizer to update the parameters of the neural network during training. Adam optimizer is used in this case.
    optimizer = torch.optim.Adam(nn_model.parameters(), lr=learning_rate) 

    ''' In a single training epoch, the model makes predictions on the training dataset (fed to it in batches)
    and backpropagates the prediction error to update the model’s parameters '''

    # Set the number of epochs for training the neural network. In each epoch, the neural network is trained on all training samples in batches.
    NUM_EPOCHS = 3000 if not SMOKE_TEST else 5 # epoch -> forward and backward of ALL training samples
    # 3000 iter, single obj --> 17.58min
    for epoch in range(NUM_EPOCHS):
        size = len(hifi_dataloader.dataset)
        nn_model.train()
        for batch, (X, y) in enumerate(hifi_dataloader):
            X, y = X.to(device), y.to(device)

            ''' Compute prediction error '''
            # Forward pass the input data through the neural network to compute the predicted output.
            pred = nn_model(X)         
            # Compute the loss between the predicted output and the true output.
            loss = criterion(pred, y) 

            ''' Backpropagation '''
            # Clear the gradients from the previous iteration.
            optimizer.zero_grad()   
            # Compute the gradients for all the parameters in the neural network using backpropagation.
            loss.backward()          
            # Update the parameters of the neural network using the computed gradients.
            optimizer.step()         

        # Print the loss after every 10 epochs.   
        if (epoch+1) % 10 == 0:
                print(f"Epoch {epoch+1} / {NUM_EPOCHS}, step [{batch+1}/{len(hifi_dataloader)}] loss: {loss:.4f}")

    ''' SAVE THE MODEL'''
    # Save the state of the neural network to a file.
    cwd = os.getcwd()
    model_file = "trained_model_weights/testcase_nn.pt" # STATE THE MODEL SPECIFICATIONS IN THE FILE NAME 
    torch.save(nn_model.state_dict(), os.path.join(cwd, model_file))
    print('State dict of the neural network model saved')


    ''' Test '''    
    # Set the neural network to evaluation mode to disable the gradient calculation for test dataset.
    nn_model.eval()
    size = len(test_dataloader.dataset)
    num_batches = len(test_dataloader)
    test_loss, correct, num_samples = 0, 0, 0
    with torch.no_grad():
        for X, y in test_dataloader:
            X, y = X.to(device), y.to(device)
            # Forward pass the input data through the neural network to compute the predicted output.
            pred = nn_model(X)
            # Compute the loss between the predicted output and the true output.
            test_loss += criterion(pred, y).item()
            #num_samples += y.shape[0]
            #correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    # Compute the average test loss.
    test_loss /= num_batches
    #correct /= size
    #acc = 100.0 * correct / num_samples
    # Print the test loss.
    print(f"Test Error -> Test loss: {test_loss:>8f} \n") # Acc: {acc:>8f}

Import the state of the neural network model saved


Visualizes the neural network prediction compared to the test data. First we load the saved state of the neural network model using `load_state_dict` and set the model to evaluation mode using `eval()`. Then, using the `torch.no_grad()` context manager, the model predicts the output *Y_pred* for the test data *X_test*. Finally, we create a plot to compare the predicted output to the true output for each target variable in *Y_test*. The plot shows a scatter plot of the test data and the predicted data on the same axes for each feature of *X_test*.

In [None]:
# Prediction vs the test data
with torch.no_grad():
    Y_pred = nn_model(X_test)

for i in range(Y_test.shape[-1]):
    fig = plt.figure(figsize=(21, 9))
    plt.title('Testcase dataset pca resampled')
    for alpha in range(X_test.shape[-1]):
        ax = fig.add_subplot(2, 2, alpha+1)
        ax.scatter(X_test[:,alpha].numpy(), Y_test[:,i].numpy(),label = 'test')
        ax.scatter(X_test[:,alpha].numpy(), Y_pred[:,i].numpy(),label = 'predicted')
        plt.legend()


## Surrogate Model
We initialize a surrogate model using *Gaussian Processes* (GP) to predict the output values given a set of input features. We use the botorch library to create a `SingleTaskGP` model for each output dimension in *Y* and then combine them into a `ModelListGP`. The *likelihood* of the GP is set to Gaussian with a specified noise level. We also define a function to initialize the GP model by taking input features *X*, output values *Y*, and *noise level* as inputs. The function returns the marginal log-likelihood and the initialized GP model. We also add the option to load a saved state dictionary of the GP model.

In [13]:
from botorch.models import SingleTaskGP, FixedNoiseGP, ModelListGP
from gpytorch.mlls.exact_marginal_log_likelihood import ExactMarginalLogLikelihood
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood

# define a function to initialize the Gaussian Process model
def initialize_model(X, Y, noise, state_dict=None):
  # move X and Y to the device
  train_x, train_obj = X.to(device), Y.to(device)
  # create an empty list of models
  models = []
  # define a Gaussian likelihood with specified noise level
  likelihood = gpytorch.likelihoods.GaussianLikelihood(noise=noise)#, learn_additional_noise=False)
  
  # loop over each output dimension in Y
  for i in range(train_obj.shape[-1]):
      train_y = train_obj[..., i : i + 1]
      # create a single task Gaussian Process for each output dimension
      models.append(
          SingleTaskGP(
              train_x, train_y
          )
      )
  # create a model list GP by passing all the created SingleTaskGP models
  model = ModelListGP(*models)
  # define the marginal log-likelihood as sum of marginal log-likelihoods of all the models
  mll = SumMarginalLogLikelihood(model.likelihood, model)

  #mll = ExactMarginalLogLikelihood(likelihood, single_model) # OTHER LIKELIHOOD ?
  ''' load state dict if it is passed '''
  if state_dict is not None:
    model.load_state_dict(state_dict)
  return mll, model

### Define a helper function that performs the essential BO step
We define a helper function for *Bayesian Optimization* (BO) by performing the essential BO step. The function uses the botorch library for optimizing the acquisition function. 

The function takes an acquisition function as an argument and returns a new candidate point that maximizes the acquisition function. 

The acquisition function is optimized using multiple restarts and raw samples, and the function is constrained to a set of bounds. 

Finally, the predicted optimal objective value is obtained by evaluating the neural network model at the candidate point. 

The parameters `NUM_POINTS`, `NUM_RESTARTS`, `RAW_SAMPLES`, and `NUM_INITIAL_POINTS` are defined at the beginning of the code to control the optimization parameters.

In [14]:
from botorch.optim import optimize_acqf
from botorch.utils.sampling import manual_seed

NUM_POINTS = 1 if not SMOKE_TEST else 1
NUM_RESTARTS = 10 if not SMOKE_TEST else 2
RAW_SAMPLES = 512 if not SMOKE_TEST else 32
NUM_INITIAL_POINTS = X_lofi.shape[0]

def opt_acq_and_get_next_obj(acqf):
  """Optimizes the acquisition function, and returns a new candidate."""
  with manual_seed(1234):
    candidates, acq_value = optimize_acqf(
        acq_function=acqf, 
        bounds=bounds,
        q=NUM_POINTS,
        num_restarts=NUM_RESTARTS,
        raw_samples=RAW_SAMPLES,
        options={"batch_limit": 10, "maxiter": 200},
    )
  ''' get a new obj '''
  with torch.no_grad():
    predicted_optimal_obj = nn_model(candidates)
  return candidates, predicted_optimal_obj

### Perform a few steps of MFBO
We performs a few steps of *Multi-Fidelity Bayesian optimization*. The implementation involves several packages such as `botorch.acquisition`, `botorch.fit`, `botorch.sampling`, etc. The initialize_model function initializes the *Gaussian Process* (GP) model for each acquisition function to optimize. The function also applies the noise model to the input and output, i.e., *X_lofi* and *Y_lofi*.

The objective of this implementation is to iterate until it converges on accuracy test with test data (NN). It includes different acquisition functions such as *Upper Confidence Bound* (UCB), *Probability of Improvement* (PI), *Expected Improvement* (EI), *q-Expected Improvement* (qEI), *q-Noisy Expected Improvement* (qNEI), and *q-Knowledge Gradient* (qKG).

The implementation fits the models, defines the qEI and qNEI acquisition modules using a *QMC sampler*, and uses the best observed noisy values as an approximation. The implementation also uses `fit_gpytorch_mll` function to optimize the *marginal likelihood* of the GP model.

For each acquisition function, the implementation applies its corresponding acquisition module to maximize the acquisition function. It uses optimize_acqf function to optimize the acquisition function. The implementation sets the bounds and the number of iterations to perform the optimization. Finally, the acquisition function with the maximum value is used as the next candidate for the high-fidelity input.

In [38]:
from botorch.acquisition import (UpperConfidenceBound,
                                 ProbabilityOfImprovement,
                                 NoisyExpectedImprovement,
                                 ExpectedImprovement,
                                 qKnowledgeGradient, # too much computational expensive wrt the provided performance/accuracy
                                 PosteriorMean)
from botorch.acquisition.monte_carlo import (qExpectedImprovement, 
                                             qNoisyExpectedImprovement)
from botorch.acquisition.objective import ScalarizedPosteriorTransform
from botorch.fit import fit_gpytorch_mll
from botorch.sampling import SobolQMCNormalSampler
import warnings
#warnings.filterwarnings("ignore", category=NumericalWarning)
# single obj does not display NumericalWarning

N_ITER = 32 if not SMOKE_TEST else 1 # 32 iterations --> 4.12mis
NUM_FANTASIES = 128 if not SMOKE_TEST else 4
MC_SAMPLES = 256 if not SMOKE_TEST else 32

X_lofi = X_lofi[0:NUM_INITIAL_POINTS]
Y_lofi = Y_lofi[0:NUM_INITIAL_POINTS]
''' ITERATE UNTIL CONVERGE ON ACCURACY TEST WITH TEST DATA (NN)'''

X_lofi_UCB,  Y_lofi_UCB  = X_lofi, Y_lofi
X_lofi_PI,   Y_lofi_PI   = X_lofi, Y_lofi
X_lofi_EI,   Y_lofi_EI   = X_lofi, Y_lofi
X_lofi_qEI,  Y_lofi_qEI  = X_lofi, Y_lofi
X_lofi_qNEI, Y_lofi_qNEI = X_lofi, Y_lofi
'''X_lofi_qKG,  qKG_Y_lofi  = X_lofi, Y_lofi
X_lofi_qKGp, qKGp_Y_lofi = X_lofi, Y_lofi'''

mll,      gp_model      = initialize_model(X_lofi,      Y_lofi,      noise)
mll_UCB,  gp_model_UCB  = initialize_model(X_lofi_UCB,  Y_lofi_UCB,  noise)
mll_PI,   gp_model_PI   = initialize_model(X_lofi_PI,   Y_lofi_PI,   noise)
mll_EI,   gp_model_EI   = initialize_model(X_lofi_EI,   Y_lofi_EI,   noise)
mll_qEI,  gp_model_qEI  = initialize_model(X_lofi_qEI,  Y_lofi_qEI,  noise)
mll_qNEI, gp_model_qNEI = initialize_model(X_lofi_qNEI, Y_lofi_qNEI, noise)
'''mll_qKG,  gp_model_qKG  = initialize_model(X_lofi,  Y_lofi,  noise)
mll_qKGp, gp_model_qKGp = initialize_model(X_lofi, Y_lofi, noise)'''

for iter in range(N_ITER):

     ''' fit the models '''
     fit_gpytorch_mll(mll)
     fit_gpytorch_mll(mll_UCB)
     fit_gpytorch_mll(mll_PI)
     fit_gpytorch_mll(mll_EI)
     fit_gpytorch_mll(mll_qEI)
     fit_gpytorch_mll(mll_qNEI)

     ''' define the qEI and qNEI acquisition modules using a QMC sampler '''
     sampler = SobolQMCNormalSampler(sample_shape=torch.Size([MC_SAMPLES]))
     
     pt = ScalarizedPosteriorTransform(weights=torch.ones(Y_lofi.shape[-1])*(1/Y_lofi.shape[-1]))

     ''' for best_f, we use the best observed noisy values as an approximation '''
     UCB  = UpperConfidenceBound(
          model = gp_model_UCB, 
          beta=0.3,
          posterior_transform=pt,
     )
     PI  = ProbabilityOfImprovement( 
          model = gp_model_PI, 
          best_f = Y_lofi_PI.max(),
          posterior_transform=pt,
          )
     EI  = ExpectedImprovement( 
          model = gp_model_EI, 
          best_f = Y_lofi_EI.max() ,
          posterior_transform=pt,
     )
     qEI  = qExpectedImprovement( 
          model = gp_model_qEI, 
          best_f = Y_lofi_qEI.max(), 
          sampler=sampler,
          posterior_transform=pt,
     )
     qNEI  = qNoisyExpectedImprovement(
          model = gp_model_qNEI, 
          X_baseline = X_lofi_qNEI, 
          sampler=sampler,
          posterior_transform=pt,
     )
     '''qKG  = qKnowledgeGradient(
          model = gp_model_qKG, 
          num_fantasies=NUM_FANTASIES
     )
     qKG_proper  = qKnowledgeGradient(
          model = gp_model_qKGp, 
          num_fantasies=NUM_FANTASIES
     )
     argmax_pmean, max_pmean = optimize_acqf(
      acq_function=PosteriorMean(single_model), 
      bounds=bounds,
      q=NUM_POINTS,
      num_restarts=10 if not SMOKE_TEST else 2,
      raw_samples=1024 if not SMOKE_TEST else 4,
     )
     acqf_name = qKnowledgeGradient(
      single_model,
      num_fantasies=NUM_FANTASIES,
      sampler=qKG.sampler,
      current_value=max_pmean,
    )'''
     ''' optimize and get new observation '''
     new_X_UCB,  predicted_optimal_obj_UCB  = opt_acq_and_get_next_obj(UCB)  
     new_X_PI,   predicted_optimal_obj_PI   = opt_acq_and_get_next_obj(PI)  
     new_X_EI,   predicted_optimal_obj_EI   = opt_acq_and_get_next_obj(EI)  
     new_X_qEI,  predicted_optimal_obj_qEI  = opt_acq_and_get_next_obj(qEI)  
     new_X_qNEI, predicted_optimal_obj_qNEI = opt_acq_and_get_next_obj(qNEI)  
     new_X = (bounds[1] - bounds[0]) * torch.rand(new_X_qNEI.shape) + bounds[0]
     with torch.no_grad(): predicted_obj = nn_model(new_X) + NOISE_LEVEL * torch.randn(1)

     print(f"Iter {iter+1}/{N_ITER}")# - next points: {new_X}\n pred objectives: {predicted_optimal_obj}")

     ''' update training points '''
     X_lofi      = torch.cat([X_lofi,      new_X ])
     X_lofi_UCB  = torch.cat([X_lofi_UCB,  new_X_UCB ])
     X_lofi_PI   = torch.cat([X_lofi_PI,   new_X_PI ])
     X_lofi_EI   = torch.cat([X_lofi_EI,   new_X_EI ])
     X_lofi_qEI  = torch.cat([X_lofi_qEI,  new_X_qEI ])
     X_lofi_qNEI = torch.cat([X_lofi_qNEI, new_X_qNEI ])

     Y_lofi      = torch.cat([Y_lofi,      predicted_obj])
     Y_lofi_UCB  = torch.cat([Y_lofi_UCB,  predicted_optimal_obj_UCB])
     Y_lofi_PI   = torch.cat([Y_lofi_PI,   predicted_optimal_obj_PI])
     Y_lofi_EI   = torch.cat([Y_lofi_EI,   predicted_optimal_obj_EI])
     Y_lofi_qEI  = torch.cat([Y_lofi_qEI,  predicted_optimal_obj_qEI])
     Y_lofi_qNEI = torch.cat([Y_lofi_qNEI, predicted_optimal_obj_qNEI])

     noise  = torch.cat([noise, 1e-3*torch.ones(Y_lofi_qNEI.shape[0]) ])

     mll,      gp_model      = initialize_model(X_lofi,      Y_lofi,      noise, gp_model.state_dict())
     mll_UCB,  gp_model_UCB  = initialize_model(X_lofi_UCB,  Y_lofi_UCB,  noise, gp_model_UCB.state_dict())
     mll_PI,   gp_model_PI   = initialize_model(X_lofi_PI,   Y_lofi_PI,   noise, gp_model_PI.state_dict())
     mll_EI,   gp_model_EI   = initialize_model(X_lofi_EI,   Y_lofi_EI,   noise, gp_model_EI.state_dict())
     mll_qEI,  gp_model_qEI  = initialize_model(X_lofi_qEI,  Y_lofi_qEI,  noise, gp_model_qEI.state_dict())
     mll_qNEI, gp_model_qNEI = initialize_model(X_lofi_qNEI, Y_lofi_qNEI, noise, gp_model_qNEI.state_dict())
     '''mll_qKG,  gp_model_qKG  = initialize_model(qKG_X_lofi,  qKG_Y_lofi,  noise)
     mll_qKGp, gp_model_qKGp = initialize_model(qKGp_X_lofi, qKGp_Y_lofi, noise)'''

# NumericalWarning for which acqf ?

Iter 1/32
Iter 2/32
Iter 3/32
Iter 4/32
Iter 5/32
Iter 6/32
Iter 7/32
Iter 8/32
Iter 9/32
Iter 10/32
Iter 11/32
Iter 12/32
Iter 13/32
Iter 14/32
Iter 15/32
Iter 16/32
Iter 17/32
Iter 18/32
Iter 19/32
Iter 20/32
Iter 21/32
Iter 22/32
Iter 23/32
Iter 24/32
Iter 25/32
Iter 26/32
Iter 27/32
Iter 28/32
Iter 29/32
Iter 30/32
Iter 31/32
Iter 32/32


Display the optimal points

In [114]:
def plot_mfbo(acqf=''):
    x = X_lofi.shape[-1]
    y = Y_lofi.shape[-1]
    if acqf != '': _acqf = f'_{acqf}'
    else: _acqf = acqf
    fig = plt.figure(figsize=(21, 9))
    plt.title(f'{acqf}')
    for i in range(y):
        for alpha in range(x):
            ax = fig.add_subplot(int(y*x/2), int(x/2), alpha+x*i+1)
            ax.scatter(X_hifi[:,alpha].numpy(), Y_hifi[:,i].numpy(),label = 'target')
            #ax.scatter(X_lofi[:,alpha].numpy(), Y_lofi[:,i].numpy(),label = 'predicted')
            #ax.scatter(X_lofi_UCB[:,alpha].numpy(), Y_lofi_UCB[:,i].numpy(),label = 'pred UCB')
            #ax.scatter(X_lofi_PI[ :,alpha].numpy(), Y_lofi_PI[ :,i].numpy(),label = 'pred PI')
            #ax.scatter(X_lofi_EI[ :,alpha].numpy(), Y_lofi_EI[ :,i].numpy(),label = 'pred EI')
            #ax.scatter(X_lofi_qEI[:,alpha].numpy(), Y_lofi_qEI[:,i].numpy(),label = 'pred qEI')
            ax.scatter(eval(f'X_lofi{_acqf}')[:,alpha].numpy(), eval(f'Y_lofi{_acqf}')[:,i].numpy(),label = f'pred {acqf}')
            plt.legend()
            plt.xlabel(f"PCA-Wing{alpha+1}")
            plt.ylabel(f"SumCX{i+1}")
    plt.show()

In [None]:
plot_mfbo('qNEI')
plot_mfbo('UCB')
plot_mfbo('PI')
plot_mfbo('EI')
plot_mfbo('qEI')
plot_mfbo()

Build a GP regression model on the optimal points

In [40]:
# Define the Kernel of Gaussian Process
class ExactGPModel(gpytorch.models.ExactGP):
    def __init__(self,X_train, Y_train, likelihood=gpytorch.likelihoods.GaussianLikelihood()):
        super(ExactGPModel, self).__init__(X_train, Y_train, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel()) 
       
    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x,covar_x)

Helper function for training the GP regression model

In [41]:
def gp_visualization(x_train,y_train,alpha=0,iter=1000,acqf='acqf'):
    likelihood = gpytorch.likelihoods.GaussianLikelihood()
    train_multi_yvar = noise**2*torch.ones(x_train.unsqueeze(-1).shape[0],1, device=device)
    ''' Define the model '''
    #gp_model = FixedNoiseGP(x_train.unsqueeze(-1), y_train, train_multi_yvar)
    #gp_model = SingleTaskGP(x_train.unsqueeze(-1), y_train, likelihood = likelihood  )
    gp_model = ExactGPModel(x_train, y_train.squeeze(), likelihood)
    ''' Find optimal model hyperparameters '''
    gp_model.train()
    likelihood.train()
    ''' Use the adam optimizer '''  # Includes GaussianLikelihood parameters
    optimizer = torch.optim.Adam(gp_model.parameters(),lr=1e-3)
    ''' "Loss" for GPs - the marginal log likelihood '''
    mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, gp_model)
    training_iter = iter
    for i in range(training_iter):
        ''' Zero gradients from previous iteration '''
        optimizer.zero_grad()    
        ''' Output from model '''
        output = gp_model(x_train)
        ''' compute loss and backprop gradients '''
        loss = -mll(output, y_train.squeeze())
        loss.backward()
        if (i+1) % 100 == 0:
            print(f'Wing{alpha+1}: {acqf} - Iter {i + 1}/{training_iter}')# - Loss: {loss.item():.5f} LenghtParam {gp_model.covar_module.base_kernel.lengthscale.detach().numpy()[0,0]:.5f}')
        optimizer.step()
    print('\n')
    return gp_model.eval(), likelihood.eval()

Train the GP regression  model

In [42]:
TRAIN_ITER = 1000 if not SMOKE_TEST else 4
likelihood_list,      gp_model_list      = [],[]
likelihood_UCB_list,  gp_model_UCB_list  = [],[]
likelihood_PI_list,   gp_model_PI_list   = [],[]
likelihood_EI_list,   gp_model_EI_list   = [],[]
likelihood_qEI_list,  gp_model_qEI_list  = [],[]
likelihood_qNEI_list, gp_model_qNEI_list = [],[]
for alpha in range(2):#X_lofi.shape[-1]):
    ''' Fit the model '''
    likelihood,      gp_model      = gp_visualization(X_lofi[NUM_INITIAL_POINTS:,alpha],     Y_lofi[NUM_INITIAL_POINTS:,0],     alpha,iter=TRAIN_ITER, acqf='rand')
    likelihood_UCB,  gp_model_UCB  = gp_visualization(X_lofi_UCB[NUM_INITIAL_POINTS:,alpha], Y_lofi_UCB[NUM_INITIAL_POINTS:,0], alpha,iter=TRAIN_ITER, acqf='UCB')
    likelihood_PI,   gp_model_PI   = gp_visualization(X_lofi_PI[NUM_INITIAL_POINTS:,alpha],  Y_lofi_PI[NUM_INITIAL_POINTS:,0],  alpha,iter=TRAIN_ITER, acqf='PI')
    likelihood_EI,   gp_model_EI   = gp_visualization(X_lofi_EI[NUM_INITIAL_POINTS:,alpha],  Y_lofi_EI[NUM_INITIAL_POINTS:,0],  alpha,iter=TRAIN_ITER, acqf='EI')
    likelihood_qEI,  gp_model_qEI  = gp_visualization(X_lofi_qEI[NUM_INITIAL_POINTS:,alpha], Y_lofi_qEI[NUM_INITIAL_POINTS:,0], alpha,iter=TRAIN_ITER, acqf='qEI')
    likelihood_qNEI, gp_model_qNEI = gp_visualization(X_lofi_qNEI[NUM_INITIAL_POINTS:,alpha],Y_lofi_qNEI[NUM_INITIAL_POINTS:,0],alpha,iter=TRAIN_ITER, acqf='qNEI')
    likelihood_list.append(likelihood),           gp_model_list.append(gp_model)
    likelihood_UCB_list.append(likelihood_UCB),   gp_model_UCB_list.append(gp_model_UCB)
    likelihood_PI_list.append(likelihood_PI),     gp_model_PI_list.append(gp_model_PI)
    likelihood_EI_list.append(likelihood_EI),     gp_model_EI_list.append(gp_model_EI)
    likelihood_qEI_list.append(likelihood_qEI),   gp_model_qEI_list.append(gp_model_qEI)
    likelihood_qNEI_list.append(likelihood_qNEI), gp_model_qNEI_list.append(gp_model_qNEI)

Wing1: rand - Iter 100/1000
Wing1: rand - Iter 200/1000
Wing1: rand - Iter 300/1000
Wing1: rand - Iter 400/1000
Wing1: rand - Iter 500/1000
Wing1: rand - Iter 600/1000
Wing1: rand - Iter 700/1000
Wing1: rand - Iter 800/1000
Wing1: rand - Iter 900/1000
Wing1: rand - Iter 1000/1000


Wing1: UCB - Iter 100/1000
Wing1: UCB - Iter 200/1000
Wing1: UCB - Iter 300/1000
Wing1: UCB - Iter 400/1000
Wing1: UCB - Iter 500/1000
Wing1: UCB - Iter 600/1000
Wing1: UCB - Iter 700/1000
Wing1: UCB - Iter 800/1000
Wing1: UCB - Iter 900/1000
Wing1: UCB - Iter 1000/1000


Wing1: PI - Iter 100/1000
Wing1: PI - Iter 200/1000
Wing1: PI - Iter 300/1000
Wing1: PI - Iter 400/1000
Wing1: PI - Iter 500/1000
Wing1: PI - Iter 600/1000
Wing1: PI - Iter 700/1000
Wing1: PI - Iter 800/1000
Wing1: PI - Iter 900/1000
Wing1: PI - Iter 1000/1000


Wing1: EI - Iter 100/1000
Wing1: EI - Iter 200/1000
Wing1: EI - Iter 300/1000
Wing1: EI - Iter 400/1000
Wing1: EI - Iter 500/1000
Wing1: EI - Iter 600/1000
Wing1: EI - Iter 700/1000

Helper function to display the GP regression model

In [117]:
def plot_gp(gp_model_acqf,acqf=''):
    x = X_lofi.shape[-1]
    y = Y_lofi.shape[-1]
    if acqf != '': 
        _acqf = f'_{acqf}'
    else: 
        _acqf, acqf = acqf, 'rand'
    fig = plt.figure(figsize=(21, 9))
    plt.title(f'{acqf}')
    for i in range(y):
        for alpha in range(x):

            with torch.no_grad(), gpytorch.settings.fast_pred_var():
                test_x = torch.linspace(X_hifi[:,alpha].min(), X_hifi[:,alpha].max(), 100)
                # Make predictions by feeding model through likelihood / gp_model
                observed_pred_acqf = gp_model_acqf((test_x ))  
                # Initialize plot 
                ax = fig.add_subplot(int(y*x/2), int(x/2), alpha+x*i+1)     
                ax.scatter(X_test[:,alpha].numpy(), Y_test[:,-1].numpy(), alpha=0.8, label = 'Test')
                
                # Get upper and lower confidence bounds # Plot predictive means as blue line
                lower, upper = observed_pred_acqf.confidence_region() 
                ax.plot(test_x.numpy(), observed_pred_acqf.mean.numpy(), label = f'Mean {acqf}')    
                ax.fill_between(test_x.numpy(), lower.numpy(), upper.numpy(), alpha=0.5, label = f'Confidence {acqf}')  
                ax.scatter(eval(f'X_lofi{_acqf}')[:,alpha].numpy(), eval(f'Y_lofi{_acqf}')[:,-1].numpy(), s=50,label = f'Observed Data {acqf}')
                ax.set_ylim([-4,3])
                plt.xlabel(f"PCA-Wing{alpha+1}")
                ax.legend()

Display the GP regression model results

In [None]:
plot_gp(gp_model)
plot_gp(gp_model_UCB,'UCB')
plot_gp(gp_model_PI,'PI')
plot_gp(gp_model_EI,'EI')
plot_gp(gp_model_qEI,'qEI')
plot_gp(gp_model_qNEI,'qNEI')


plt.show()
# ADD NON-PCA DATA (ORIGINAL DATA) PLOT

In [None]:
for alpha in range(X_lofi.shape[-1]):
    likelihood, gp_model = gp_visualization(X_test[:,alpha], Y_test[:,-1].unsqueeze(0),alpha,iter=300, acqf='')


acqf=''
gp_model_acqf = likelihood
x = X_lofi.shape[-1]
y = Y_lofi.shape[-1]
if acqf != '': 
    _acqf = f'_{acqf}'
else: 
    _acqf, acqf = acqf, 'rand'
fig = plt.figure(figsize=(21, 9))
plt.title(f'{acqf}')
for i in range(y):
    for alpha in range(x):

        with torch.no_grad(), gpytorch.settings.fast_pred_var():
            test_x = torch.linspace(X_hifi[:,alpha].min(), X_hifi[:,alpha].max(), 100)
            # Make predictions by feeding model through likelihood / gp_model
            observed_pred_acqf = gp_model_acqf((test_x ))  
            # Initialize plot 
            ax = fig.add_subplot(int(y*x/2), int(x/2), alpha+x*i+1)     
            ax.scatter(X_test[:,alpha].numpy(), Y_test[:,-1].numpy(), alpha=0.8, label = 'Test')
            
            # Get upper and lower confidence bounds # Plot predictive means as blue line
            lower, upper = observed_pred_acqf.confidence_region() 
            ax.plot(test_x.numpy(), observed_pred_acqf.mean.numpy(), label = f'Mean {acqf}')    
            ax.fill_between(test_x.numpy(), lower.numpy(), upper.numpy(), alpha=0.5, label = f'Confidence {acqf}')  
            
            ax.set_ylim([-4,3])
            plt.xlabel(f"PCA-Wing{alpha+1}")
            ax.legend()

In [None]:
from gp import raw_gp
raw_gp(X_test[:,0],Y_test[:,0])

## Plotly
We define some helper functions for creating and updating a Plotly graph in the interactive dashboard. The update_layout_of_graph function updates the layout of the graph with the specified title, x-axis and y-axis labels, legend orientation and position, and axis line properties. The uncertainty_area_scatter function creates a scatter plot with an area filled between the upper and lower bounds. The line_scatter function creates a line plot from the specified x and y values. The test_scatter function creates a scatter plot of test data points. The dot_scatter function creates a scatter plot of observed data points. All functions return a Plotly scatter plot object.

In [119]:
import plotly.graph_objs as go
from visualize import *

In [None]:
for alpha in range(2):
    test_x = torch.linspace(X_hifi[:,alpha].min(), X_hifi[:,alpha].max(), 100)
    plot      = plot_GPR(data_x=X_lofi[NUM_INITIAL_POINTS:,alpha],      
                         data_y=Y_lofi[NUM_INITIAL_POINTS:,-1],      
                         x=test_x, model=likelihood_list[alpha],     legend_group="rand"
                         )
    plot_UCB  = plot_GPR(data_x=X_lofi_UCB[NUM_INITIAL_POINTS:,alpha],  
                         data_y=Y_lofi_UCB[NUM_INITIAL_POINTS:,0],  
                         x=test_x, model=likelihood_UCB_list[alpha], legend_group="UCB",  color = 'coral'
                         )
    plot_PI   = plot_GPR(data_x=X_lofi_PI[NUM_INITIAL_POINTS:,alpha],   
                         data_y=Y_lofi_PI[NUM_INITIAL_POINTS:,0],   
                         x=test_x, model=likelihood_PI_list[alpha],  legend_group="PI",   color = 'cornflowerblue'
                         )
    plot_EI   = plot_GPR(data_x=X_lofi_EI[NUM_INITIAL_POINTS:,alpha],   
                         data_y=Y_lofi_EI[NUM_INITIAL_POINTS:,0],   
                         x=test_x, model=likelihood_EI_list[alpha],  legend_group="EI",   color = 'crimson'
                         )
    plot_qEI  = plot_GPR(data_x=X_lofi_qEI[NUM_INITIAL_POINTS:,alpha],  
                         data_y=Y_lofi_qEI[NUM_INITIAL_POINTS:,0],  
                         x=test_x, model=likelihood_qEI_list[alpha], legend_group="qEI",  color = 'darkblue'
                         )
    plot_qNEI = plot_GPR(data_x=X_lofi_qNEI[NUM_INITIAL_POINTS:,alpha], 
                         data_y=Y_lofi_qNEI[NUM_INITIAL_POINTS:,0], 
                         x=test_x, model=likelihood_qNEI_list[alpha],legend_group="qNEI", color = 'darkgoldenrod'
                         )
    fig4 = go.Figure()
    fig4.add_trace(go.Scatter(
        x=X_test[:,alpha],
        y=Y_test[:,0],
        legendgroup="Test",  
        name="Test",
        mode="markers",
        marker=dict(color="burlywood", size=7)
    ))
    fig4.add_traces(data=plot )
    fig4.add_traces(data=plot_UCB )
    fig4.add_traces(data=plot_PI )
    fig4.add_traces(data=plot_EI )
    fig4.add_traces(data=plot_qEI )
    fig4.add_traces(data=plot_qNEI)
    fig4.update_layout(
            width=1500,
            height=800,
            autosize=False,
            plot_bgcolor='rgba(0,0,0,0)',
            title='WingPCA'+str(alpha+1)+' - GPR on Opt Points vs Test Points')
    fig4 = update_layout_of_graph(fig=fig4,
                              title='GPR on Opt Points vs Test Points')

    fig4.show()