In [1]:
import sys
import os
import torch
import matplotlib.pyplot as plt
import math
import numpy as np
from gpytorch.kernels import MaternKernel, ScaleKernel
from botorch.models import SingleTaskGP
from botorch.models.transforms import Standardize
from botorch.fit import fit_gpytorch_mll
from botorch.optim import optimize_acqf
from botorch.acquisition import ExpectedImprovement, LogExpectedImprovement, PosteriorMean, UpperConfidenceBound
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.utils.transforms import unnormalize, normalize
from pprint import pprint
from typing import Optional
from src.scheffe_generator import ScheffeGenerator

# 設定設備與型別
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.double

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def set_single_task_gp(train_x:torch.Tensor, train_y:torch.Tensor, kernel_type:str='RBF') -> SingleTaskGP:
    '''
    This function sets up a SingleTaskGP surrogate model with the specified kernel type (RBF or Matern).
    Parameters:
    train_x: Training input data as a torch.Tensor.
    train_y: Training output data as a torch.Tensor.
    kernel_type: The type of kernel to use for the GP model ('RBF' or 'Matern').
    Returns:
    surrogate_model: The configured SingleTaskGP model.
    '''
    if kernel_type == 'RBF':
        surrogate_model = SingleTaskGP(
            train_x,
            train_y,
            outcome_transform=Standardize(m=train_y.shape[-1])
        )
    elif kernel_type == 'Matern':
        covar_module = ScaleKernel(
            MaternKernel(nu=2.5)
        )
        surrogate_model = SingleTaskGP(
            train_x,
            train_y,
            covar_module=covar_module,
            outcome_transform=Standardize(m=train_y.shape[-1])
        )
    else:
        raise ValueError('Unsupported kernel type, kernel_type should be "RBF" or "Matern".')

    return surrogate_model

def one_bo_step_with_gp_ei(
    train_x:torch.Tensor,
    train_y:torch.Tensor,
    kernel_type:str,
    best_f:torch.Tensor,
    bounds:torch.Tensor,
    constraints=None,
    acquisition_type:str='LogEI'
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, SingleTaskGP]:
    '''
    This function performs one step of Bayesian Optimization using a Gaussian Process surrogate model and an acquisition function (LogEI or UCB).
    The output of this function is:
    candidate: The next evaluation point suggested by the acquisition function.
    acq_value: The acquisition function value at the candidate point.
    best_predicted_x: The point that the surrogate model predicts to be the best.
    surrogate_model: The trained surrogate model (Gaussian Process).
    '''
    # build surrogate model with GP
    surrogate_model = set_single_task_gp(train_x, train_y, kernel_type=kernel_type)

    # Find Surrogate Model Best Point
    post_mean_func = PosteriorMean(model=surrogate_model)
    if constraints is not None:
        best_predicted_x, _ = optimize_acqf(
            acq_function=post_mean_func,
            bounds=bounds,
            q=1,
            num_restarts=20,
            raw_samples=256,
            equality_constraints=constraints,
        )
    else:
        best_predicted_x, _ = optimize_acqf(
            acq_function=post_mean_func,
            bounds=bounds,
            q=1,
            num_restarts=20,
            raw_samples=256
        )

    mll = ExactMarginalLogLikelihood(surrogate_model.likelihood, surrogate_model)
    fit_gpytorch_mll(mll)

    # 定義獲取函數
    if acquisition_type == 'LogEI':
        acqu_fun = LogExpectedImprovement(model=surrogate_model, best_f=best_f)
    elif acquisition_type == 'UCB':
        acqu_fun = UpperConfidenceBound(model=surrogate_model)
    else:
        raise ValueError('Unsupported acquisition type, the acquisition_type should be "LogEI" or "UCB".')
    
    # 最適化獲取函數以找到下一個評估點
    if constraints is not None:
        candidate, acq_value = optimize_acqf(
            acq_function=acqu_fun,
            bounds=bounds,
            q=1,
            num_restarts=20,
            raw_samples=256,
            equality_constraints=constraints,
        )
    else:
        candidate, acq_value = optimize_acqf(
            acq_function=acqu_fun,
            bounds=bounds,
            q=1,
            num_restarts=20,
            raw_samples=256,
        )
    
    return candidate, acq_value, best_predicted_x, surrogate_model

In [3]:
def optimization_loop(
    train_x:torch.Tensor,
    train_obj:torch.Tensor,
    gt_x:torch.Tensor,
    gt_y:float,
    gt_func,
    bounds:torch.Tensor,
    constraints:Optional[list]=None,
    n_iterations:int=20,
    kernel_type:str='RBF',
    acquisition_type:str='LogEI',
    gt_func_noiseless: bool = False
):
    '''
    This function performs the Bayesian Optimization loop for a specified number of iterations.
    Parameters:
    train_x: Initial training input data as a torch.Tensor.
    train_obj: Initial training output data as a torch.Tensor.
    gt_x: Ground truth optimal input data as a torch.Tensor.
    gt_y: Ground truth optimal output value as a float.
    gt_func: The oracle function to evaluate.
    bounds: The bounds for the input space as a torch.Tensor.
    constraints: Optional list of constraints for the optimization.
    n_iterations: Number of BO iterations to perform.
    kernel_type: The type of kernel to use for the GP model ('RBF' or 'Matern').
    acquisition_type: The type of acquisition function to use ('LogEI' or 'UCB').
    '''
    best_f = train_obj.max()

    inference_regrets = [] # 用來存 inference regret 的數值
    simple_regrets = [] # 用來儲存 simple regret 的數值

    for iteration in range(n_iterations):
        candidate, acq_value, best_predicted_x, surrogate_model = one_bo_step_with_gp_ei(
            train_x, train_obj, kernel_type, best_f, bounds, constraints, acquisition_type
        )

        # Evaluate the oracle function at the candidate point
        
        new_y = gt_func( X=candidate.cpu().numpy(), noiseless=gt_func_noiseless)
        new_y = torch.tensor(new_y, device=device).unsqueeze(0)

        # Update training data
        train_x = torch.cat([train_x, candidate], dim=0)
        train_obj = torch.cat([train_obj, new_y], dim=0)

        # Update best observed value
        if new_y > best_f:
            best_f = new_y

        # Calculate regrets
        inferred_y = gt_func(best_predicted_x.cpu().numpy(), noiseless=gt_func_noiseless)
        inference_regret = float(gt_y - inferred_y[0])
        simple_regret = float(gt_y - best_f.cpu().item())
        inference_regrets.append(inference_regret)
        simple_regrets.append(simple_regret)
        print(f'Iter {iteration+1}/{n_iterations}' + '-'*50)
        print(f'Ground True: {gt_y:.3f} New Value: {new_y.item():.3f}, Best Value: {best_f.item():.3f}')
        print(f'Ground True X: {np.array2string(gt_x.cpu().numpy(), precision=3, suppress_small=True)}')
        print(f'New Candidate: {np.array2string(candidate.cpu().numpy(), precision=3, suppress_small=True)}')

    return train_x, train_obj, surrogate_model, inference_regrets, simple_regrets

In [5]:
# Set Number of Optimization Step
n_interations = 20
ninit = 10
dataset_path = '/workspaces/BO_EXPERIMENTS/src/datasets/D=10_N=1_K=5/oracle_data_D10_A_000.pt'
dataset = torch.load(dataset_path)

In [7]:
dataset['ground_truth'].keys()

dict_keys(['x_star', 'f_star', 'generator_seed'])

In [None]:
train_x = dataset['initial_data']['X'].to(device)

In [None]:
train_x[:ninit, :].shape

In [None]:


# Set Oracle Function
D = dataset['config']['D']
variant = dataset['config']['variant']
k_active = dataset['config']['k_active']
seed = dataset['config']['seed']
gen = ScheffeGenerator( D=D, k_active=k_active, variant=variant, seed=seed )
gt_func = gen.oracle

# initial dataset
train_x = dataset['initial_data']['X'].to(device)
train_obj = dataset['initial_data']['Y'].to(device)
gt_x = dataset['ground_truth']['x_star'].to(device)
gt_y = dataset['ground_truth']['f_star']

# Set X bound
bounds = torch.stack([torch.zeros(D), torch.ones(D)]).to(device, dtype=dtype)

# Set constraints
constraints = [
    (
        torch.arange(D, device=device), # indices: X 的哪些維度要參與計算
        torch.ones(D, dtype=dtype, device=device), # coefficients: 這些維度的係數
        1.0 # rhs: 等號右邊的值 (Sum = 1.0)
    )
]

# Run Optimization Loop
final_train_x, final_train_obj, final_surrogate_model, inference_regrets, simple_regrets = optimization_loop(
    train_x=train_x,
    train_obj=train_obj,
    gt_x=gt_x,
    gt_y=gt_y,  
    gt_func=gt_func,
    bounds=bounds,
    constraints=constraints,
    n_iterations=n_interations,
    kernel_type='RBF',
    acquisition_type='LogEI',
    gt_func_noiseless=True
)

# 出現以下 warning 代表優化器在滿足 等式約束 (Equality Constraints) 的超平面上移動時，遇到了嚴重的數值梯度不穩定。
# /home/appuser/.local/lib/python3.12/site-packages/botorch/optim/optimize.py:789: RuntimeWarning: Optimization failed in `gen_candidates_scipy` with the following warning(s):
#[OptimizationWarning('Optimization failed within `scipy.optimize.minimize` with status 8 and message Positive directional derivative for linesearch.')]
#Trying again with a new set of initial conditions.
#  return _optimize_acqf_batch(opt_inputs=opt_inputs)

In [None]:
# inference_regrets = [] # 用來存 inference regret 的數值
# simple_regrets = [] # 用來儲存 simple regret 的數值

# for i in range(5):
#     # Fit GP surrogate model
#     gp = SingleTaskGP(train_x, train_obj)
#     mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
#     fit_gpytorch_mll(mll)

#     # Find Surrogate Model Best Point
#     post_mean_func = PosteriorMean(model=gp)
#     best_predicted_x, _ = optimize_acqf(
#         acq_function=post_mean_func,
#         bounds=bounds,
#         q=1,
#         num_restarts=10,
#         raw_samples=100,
#         equality_constraints=constraints,
#     )

#     # Get New Observe Data
#     new_y = gt_func( X=best_predicted_x.cpu().numpy(), noiseless=True)

#     # Set Acquisition Function
#     best_f = train_obj.max().item()
#     EI = LogExpectedImprovement(model=gp, best_f=best_f)

#     # Optimize
#     candidate, acq_value = optimize_acqf(
#         acq_function=EI,
#         bounds=bounds,
#         q=1,                     # 每次推薦 1 個點
#         num_restarts=10,         # 隨機重啟次數（類似 SLSQP 的重啟）
#         raw_samples=100,         # 初始採樣點數量
#         equality_constraints=constraints,
#     )

#     # Combine Old and New Observe Data
#     train_x = torch.cat([train_x, candidate])
#     train_obj = torch.cat([train_obj, torch.tensor(new_y, device=device).unsqueeze(0) ])
    
#     # Compute Simple Regret
#     max_train_obj = train_obj.max().item()
#     simple_regret = gt_y - max_train_obj
#     simple_regrets.append(simple_regret)

#     # Compute Inference Regrets
#     infer_regret = float((gt_y - new_y)[0])
#     inference_regrets.append(infer_regret)
    
#     print(f'Epoch {i+1}: Real Best Value: {gt_y:.2f}, Max Train Obj = {max_train_obj:.2f}, Current Train Obj = {float(new_y[0]):.2f}, Simple Regret = {simple_regret:.2f}, Infer Regret = {infer_regret:.2f}, SumX = {candidate.sum().item():.1f}')

# output = {
#     'inference_regrets': inference_regrets,
#     'simple_regrets': simple_regrets,
#     'opt_x': candidate.cpu().numpy().tolist(),
#     'opt_y': float(new_y[0])
# }

In [None]:
# for name, param in gp.named_parameters():
#     print(f"參數名: {name} | 數值: {param.data}")

In [None]:
# # 測試預測點
# test_tensor = torch.tensor([[100,100,100]], dtype=torch.double).to(device)
# with torch.no_grad():
#     # 取得後驗分佈
#     posterior = gp.posterior(test_tensor)
    
#     # 取得預測平均值 (Mean)
#     mean = posterior.mean
    
#     # 取得預測變異數 (Variance) 或 標準差 (Stddev)
#     variance = posterior.variance
#     stddev = torch.sqrt(variance)

# print(mean, stddev)

# # 測試是否有通過所有的點
# with torch.no_grad():
#     # 取得後驗分佈
#     posterior = gp.posterior(train_x)
    
#     # 取得預測平均值 (Mean)
#     mean = posterior.mean
    
#     # 取得預測變異數 (Variance) 或 標準差 (Stddev)
#     variance = posterior.variance
#     stddev = torch.sqrt(variance)

# # 3. 計算誤差 (RMSE)
# rmse = torch.sqrt(torch.mean((mean - train_obj)**2))#  
# print(f"訓練集 RMSE: {rmse.item()}")

In [None]:
train_obj