**Use on-demand test objective functions (e.g. bimodal) to help calibrate surrogate functions and acquisition functions**

In [None]:
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
from scipy import optimize
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RationalQuadratic, RBF, Matern, DotProduct, WhiteKernel
from mbbo.output import format_for_submission, output_results_csv, ScenarioResult, TestResults #!pip install -e .
from mbbo.test_gaussian import one_d_test, two_d_test, n_d_test
import csv

In [None]:
random.seed(51)
np.random.seed(51)

week1_out = [0.0, -0.03634716524130564, -0.13995571712281177, -11.512791229057324, 351.7115420928652, -0.5971511450896173, 0.2910786825809617, 8.618272750952901]
week1_in = [np.array([0.00367, 0.9999 ]), 
            np.array([0.851999, 0.973204]), 
            np.array([0.747032, 0.28413 , 0.226329]), 
            np.array([0.169128, 0.756136, 0.275457, 0.528761]), 
            np.array([0.439601, 0.772709, 0.376277, 0.933269]), 
            np.array([0.232204, 0.132714, 0.53824 , 0.760706, 0.075595]), 
            np.array([0.476821, 0.248196, 0.242816, 0.576157, 0.162416, 0.290926]), 
            np.array([0.221603, 0.703755, 0.674607, 0.130295, 0.376739, 0.669444, 0.136655, 0.061316])]    

week2_out = [-1.2075460499722905e-18, 0.17608630702211278, -0.17239781799687137, -31.982880235497266, 1236.8846557000643, -2.451406055102475, 0.00010805707939840242, 5.178959940699899]
week2_in = [np.array([0.476035, 0.572563]), 
            np.array([0.641846, 0.498841]), 
            np.array([0., 0., 0.]), 
            np.array([0.953433, 0.895217, 0.812477, 0.618719]), 
            np.array([0.987523, 0.470227, 0.946409, 0.105412]), 
            np.array([3.40696e-01, 4.94179e-01, 2.10000e-05, 3.08050e-02, 9.39958e-01]), 
            np.array([0.88314 , 0.756642, 0.      , 0.      , 0.9     , 0.942719]), 
            np.array([0.993634, 0.968223, 0.979285, 0.397318, 0.965856, 0.955218, 0.006078, 0.024001])]

week3_out = [-2.118633970077695e-95, -0.1068943462941895, -0.005838531351604155, -2.6718044713157307, 32.0025, -1.4580645404618957, 0.4892165178828796, 9.9417237968706]

week3_in = [np.array([0.127849, 0.198491]), 
            np.array([0.246077, 0.656597]), 
            np.array([0.492581, 0.611593, 0.5     ]), 
            np.array([0.510358, 0.521985, 0.383995, 0.445439]), 
            np.array([0.5, 0.5, 0.5, 0.5]), 
            np.array([0.66336 , 0.      , 0.999999, 0.332984, 0.      ]), 
            np.array([0.      , 0.165185, 0.28681 , 0.      , 0.318109, 0.999999]), 
            np.array([0.119265, 0.254466, 0.117275, 0.24563 , 0.548426, 0.553172,  0.230111, 0.516062])]

week4_out = [-8.306597721001677e-27, 0.715790799340666, -0.00506242600241439, -3.2126105576284227, 31.94090504001378, -0.9205277885179105, 0.3911680928412005, 9.6899612812574]

week4_in = [np.array([0.24001 , 0.357107]),
                np.array([0.5, 0.5]), 
                np.array([0.5, 0.5, 0.5]), 
                np.array([0.549669, 0.508442, 0.413776, 0.413008]), 
                np.array([0.500102, 0.500102, 0.500102, 0.500102]), 
                np.array([0.563405, 0.      , 0.83134 , 0.999999, 0.      ]), 
                np.array([0.      , 0.626234, 0.280125, 0.      , 0.36777 , 0.451863]), 
                np.array([0.275027, 0.304704, 0.160147, 0.328388, 0.419169, 0.578759, 0.436166, 0.614079])]

week5_out = [1.517648729565899e-192, 0.509599138595138, -0.025681820315624142, -4.078914281244423, 629.9338529410855, -1.747233852094004, 0.39256467139392903, 9.7675674964181]

week5_in = [np.array([0.999999, 0.999999]), np.array([0.666698, 0.666698]), np.array([0.558875, 0.558874, 0.558875]), np.array([0.523385, 0.494608, 0.22783 , 0.357468]), np.array([0.932544, 0.415248, 0.89143 , 0.050433]), np.array([0.      , 0.687353, 0.      , 0.999999, 0.      ]), np.array([0.      , 0.56895 , 0.354465, 0.290165, 0.482077, 0.989692]), np.array([0.      , 0.047944, 0.315163, 0.115808, 0.571106, 0.59513 ,
       0.376754, 0.548807])]

def get_function_data(function_number):
    ary_in = np.load(f'../data/raw/initial_data/function_{function_number}/initial_inputs.npy')
    ary_out = np.load(f'../data/raw/initial_data/function_{function_number}/initial_outputs.npy')

    

    ary_out=np.append(ary_out, week1_out[function_number-1])
    ary_out=np.append(ary_out, week2_out[function_number-1])
    ary_out=np.append(ary_out, week3_out[function_number-1])
    ary_out=np.append(ary_out, week4_out[function_number-1])
    ary_out=np.append(ary_out, week5_out[function_number-1])
    ary_in=np.vstack((ary_in, week1_in[function_number-1]))
    ary_in=np.vstack((ary_in, week2_in[function_number-1]))
    ary_in=np.vstack((ary_in, week3_in[function_number-1]))
    ary_in=np.vstack((ary_in, week4_in[function_number-1]))
    ary_in=np.vstack((ary_in, week5_in[function_number-1]))
    
    return ary_in, ary_out

class FunctionInfo():
    rbf_lengthscales = [0.8, 0.001, 0.016, 0.337, 0.0162, 0.68, 0.644, 1.0 ] # functions 2 and 8 didn't find optimal lengthscale in training
    ucb_kappas = [0.1, 1.1, 0.00001, 0.8, 0.4, 0.8, 0.8, 0.8] # only got successful calibration for first 3. Default to 0.8 for the rest (high because still exploring) 
                                                              # - except function 5 which is unimodal so can exploit more.
                                                              # Function 3 - aim is to reduce bad side effects of drug combination - have a maximum around 0.5,0.5,0.5 so from week 5 on, exploiting that
    perturb_max_starts = [0,0,0,0,-0.055,0,0,0] #having trouble getting function 5 to explore a little more away from its maximum - nudge
    kernel_params_list=[{"class": "RationalQuadratic", "alpha": 0.4},
                   {"class": "RationalQuadratic", "alpha": 1.0},
                   {"class": "RationalQuadratic", "alpha": 1.0},
                   {"class": "RationalQuadratic", "alpha": 1.0}, #f4 - all NaN
                   {"class": "RationalQuadratic", "alpha": 1.4}, #f5 = mainly NaN
                   {"class": "Matern52", "nu":2.5}, #f6 - mainly NaN
                   {"class": "RationalQuadratic", "alpha": 1.0},
                   {"class": "RationalQuadratic", "alpha": 1.0}]

    def __init__(self, function_number):
        self.function_number = function_number
        function_ix = function_number - 1
        self.kernel_lengthscale = self.rbf_lengthscales[function_ix]
        self.ucb_kappa = self.ucb_kappas[function_ix]
        self.kernel_params = self.kernel_params_list[function_ix]
        self.perturb_max_start = self.perturb_max_starts[function_ix]


In [None]:
#check progress so far

responses = [week1_out, week2_out, week3_out, week4_out, week5_out]

for i in range(1,9):
    initial_data = np.load(f'../data/raw/initial_data/function_{i}/initial_outputs.npy')
    for week in range(1, 4):
        response = responses[week-1][i-1]
        if response > max(initial_data):
            print(f"Function {i} - week {week} improved on initial values - {response} over {max(initial_data)}")


In [None]:
#Function 3 - "(hint: one of the variables may not cause any effects on the person)."
X, y = get_function_data(3)

#1. Simple correlations
correlations = []
for i in range(X.shape[1]):
    corr_matrix = np.corrcoef(X[:, i], y)
    corr = corr_matrix[0, 1]  # correlation between dimension i and y
    correlations.append(corr)
print("Simple correlations:")
for i, corr in enumerate(correlations):
    print(f"Dimension {i}: correlation with output = {corr:.4f}")

#2 Coefficients from linear regression
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

coeffs = model.coef_
intercept = model.intercept_
print("Linear regression:")
print(f"Coefficients: {coeffs}")
print(f"Intercept: {intercept}")
 #3 Features importance from random forest
from sklearn.ensemble import RandomForestRegressor
rf = RandomForestRegressor(random_state=51)
rf.fit(X, y)

importances = rf.feature_importances_
print("Random forest:")
for i, imp in enumerate(importances):
    print(f"Dimension {i}: importance = {imp:.4f}")

In [27]:
#function 1 - values mostly very close to 0
def scale_f1(values):
    # Step 1: Replace zeros with a small positive value to avoid log(0)
    epsilon = 1e-200  # A very small number
    values_safe = np.where(values == 0, epsilon, values)

    # Step 2: Take the logarithm of absolute values
    log_values = np.log10(np.abs(values_safe))

    # Step 3: Normalize to the range [-1, 1]
    log_min, log_max = np.min(log_values), np.max(log_values)
    scaled_values = 2 * (log_values - log_min) / (log_max - log_min) - 1

    # Step 4: Restore signs
    return np.sign(values) * np.abs(scaled_values)

f1_in, f1_out = get_function_data(1)
f1_out_scaled = scale_f1(f1_out) 
for original, scaled in zip(f1_out ,f1_out_scaled):
    print(f"Original: {original:.2E} -> Scaled: {scaled:.4f}")

Original: 1.32E-79 -> Scaled: 0.2262
Original: 1.03E-46 -> Scaled: 0.5592
Original: 7.71E-16 -> Scaled: 0.8717
Original: 3.34E-124 -> Scaled: 0.2253
Original: -3.61E-03 -> Scaled: -1.0000
Original: -2.16E-54 -> Scaled: -0.4814
Original: -2.09E-91 -> Scaled: -0.1067
Original: 2.54E-40 -> Scaled: 0.6239
Original: 3.61E-81 -> Scaled: 0.2104
Original: 6.23E-48 -> Scaled: 0.5468
Original: 0.00E+00 -> Scaled: 0.0000
Original: -1.21E-18 -> Scaled: -0.8433
Original: -2.12E-95 -> Scaled: -0.0663
Original: -8.31E-27 -> Scaled: -0.7607
Original: 1.52E-192 -> Scaled: 0.9172


In [None]:
def plot_twod_objective(twod):

    N = 100
    x0_vals = np.linspace(0, 1, N)
    x1_vals = np.linspace(0, 1, N)
    X0, X1 = np.meshgrid(x0_vals, x1_vals)        # shape (N, N) each
    X_grid = np.column_stack((X0.ravel(), X1.ravel()))  # shape (N*N, 2)

    y_grid  = twod.call_function(X_grid)
    # Reshape back to (N, N) for plotting
    y_2d = y_grid.reshape(N, N)
    
    # Heatmap
    plt.figure(figsize=(6, 5))
    plt.imshow(y_2d, origin='lower', extent=(0,1,0,1), cmap='viridis', aspect='equal')
    plt.colorbar(label='f(x, y)')
    plt.title("Bimodal 2D Function (Heatmap)")
    plt.xlabel("x0")
    plt.ylabel("x1")
    plt.show()

    # 3D surface
    #fig = plt.figure(figsize=(8,6))
    #ax = fig.add_subplot(111, projection='3d')
    #ax.plot_surface(X0, X1, y_2d, cmap='viridis', edgecolor='none')
    #ax.set_title("Bimodal 2D Function (Surface Plot)")
    #ax.set_xlabel("x0")
    #ax.set_ylabel("x1")
    #ax.set_zlabel("f(x, y)")
    #plt.show()

    print("Max from grid search:", np.max(y_2d))



testmu1 = [random.random(),random.random()]
testmu2 = [random.random(),random.random()]
testalpha2 = random.uniform(0.3, 0.9)

twod = two_d_test(mu1=testmu1, mu2=testmu2, alpha2=testalpha2)
plot_twod_objective(twod)


In [None]:
#week 4
#pilot test n_d by using it to plot one_d
x = np.linspace(0, 1, 500)
testmu1 = 0.3
testmu2 = 0.7
testalpha1 = 1
testalpha2 = 0.5
testsigma1 = 0.1
testsigma2 = 0.2
oned = one_d_test(sigma1 = testsigma1, sigma2 = testsigma2, mu1=testmu1, mu2=testmu2, alpha1 =testalpha1, alpha2 = testalpha2)
y = oned.call_function(x)

# Plot the function
plt.figure(figsize=(8, 5))
plt.plot(x, y, label="Bimodal Function")
plt.title("Bimodal Function")
plt.xlabel("x")
plt.ylabel("f(x)")
plt.grid(True)
plt.legend()
plt.show()

x = np.linspace(0, 1, 500)
nd = n_d_test(sigma=[testsigma1, testsigma2], mu=[[[testmu1]], [[testmu2]]], alpha=[testalpha1, testalpha2])
y = nd.call_function(x)

# Plot the function
plt.figure(figsize=(8, 5))
plt.plot(x, y, label="Bimodal Function")
plt.title("Bimodal Function")
plt.xlabel("x")
plt.ylabel("f(x)")
plt.grid(True)
plt.legend()
plt.show()

x2 = np.array([0.46])
onedy2 = oned.call_function(x2)
print(onedy2)
ndy2 = nd.call_function(x2)
print(ndy2)




In [None]:
#week 4
#pilot test n_d by using it to plot two_d
testmu1 = [random.random(),random.random()]
testmu2 = [random.random(),random.random()]
testalpha2 = random.uniform(0.3, 0.9)
testalpha1 = 1
twod = two_d_test(mu1=testmu1, mu2=testmu2, alpha1 = 1, alpha2=testalpha2)
plot_twod_objective(twod)

nd = n_d_test(mu = [testmu1, testmu2], sigma=[0.1,0.1], alpha = [testalpha1, testalpha2] )
plot_twod_objective(nd)

# try 5 peaks
mu5 = np.random.rand(5,2)
sigma5 = (0.2, 0.1, 0.2, 0.1, 0.2)
nd5 = n_d_test(mu=mu5, alpha=(1, 0.8,0.6, 0.6, 0.6), sigma = sigma5)
plot_twod_objective(nd5)

In [None]:
# Test acquisition functions on the 1d function

noise_assumption = 1e-10 # noise assumption, a hyper-parameter

input_bounds = [(0, 1)] # bounds for the input space

#Experiment with lengthscale 0.1 to 0.5 in steps of 0.05
# and kappa 0.1 to 3 in steps of 0.1

def input_bounds_for_dim(dimensions):
    return [(0, 1) for _ in range(dimensions)]

def bounds_midpoint(input_bounds):
    return np.array([(low + high) / 2.0 for low, high in input_bounds])

def acquisition_UCB(x, model, ucb_kappa):
    mean, std = model.predict(x.reshape(1, -1), return_std=True)
    return mean + ucb_kappa * std

def test_on_oned(rbf_lengthscale, ucb_acquisition_kappa, test_oned_function, max_iterations):
    
    objective_x = np.linspace(0, 1, 500)
    objective_y = test_oned_function.call_function(objective_x)
    objective_y_max = max(objective_y)

    #quick plot objective function
    #plt.figure(figsize=(3, 5))
    #plt.plot(objective_x, objective_y, label="Objective")
    #plt.grid(True)
    #plt.legend()
    #plt.show()

    kernel = RBF(length_scale=rbf_lengthscale, length_scale_bounds='fixed')
    model = GaussianProcessRegressor(kernel = kernel, alpha=noise_assumption)

    X = []
    Y = []
    # First point
    x0 = bounds_midpoint(input_bounds)  # array([0.5])
    X.append(x0)
    Y.append(test_oned_function.call_function(x0))
    i=0
    #print("Objective max:", objective_y_max)
    while abs(max(Y) - objective_y_max) > 0.01:
        i+=1
        if i > max_iterations:
            print("Max iterations reached")
            return 0
        #print("Iteration", i, " Max found:", max(Y))
        # print(abs(max(Y) - objective_y_max))
        # fit the model
        model.fit(X, Y)
            
        # optimize the acquisition function
        result = optimize.minimize(lambda x: -acquisition_UCB(x, model, ucb_acquisition_kappa), x0=bounds_midpoint(input_bounds), bounds=input_bounds)
        x_new = result.x
        y_new = test_oned_function.call_function(x_new)
        
        # add the new observation to the training set
        X.append(x_new) #assumes 1d
        Y.append(y_new)

    return i

def plot_oned_objective(oned):
    objective_x = np.linspace(0, 1, 500)
    objective_y = oned.call_function(objective_x)
    plt.figure(figsize=(3, 3))
    plt.plot(objective_x, objective_y, label="Objective")
    plt.grid(True)
    plt.show()



In [None]:
#ucb_kappa = 3
#rbf_lengthscale = 0.2 # lengthscale parameter
test_functions = []
for i in range(3):
    testmu1 = random.random()
    testmu2 = random.random()
    testalpha2 = random.uniform(0.3, 0.9)
    oned = one_d_test(mu1=testmu1, mu2=testmu2, alpha2=testalpha2)
    test_functions.append(oned)
    plot_oned_objective(oned) # to see what we're dealing with

results = []
scenario_results = []
for rbf_lengthscale in np.arange(0.05, 0.25, 0.1):
    print("Lengthscale:", rbf_lengthscale)
    for kappa in np.arange(0.1, 2.00, 0.5):
        ucb_kappa = round(kappa, 2)
        print("Kappa:", ucb_kappa)
        fn = 0
        fn_result = []
        fn_result.append(rbf_lengthscale)
        fn_result.append(ucb_kappa)
        
        scenario_description = f"ls={rbf_lengthscale} - k={ucb_kappa}"
        result_list = []
        for oned in test_functions:
            iterations_required = test_on_oned(rbf_lengthscale, ucb_kappa, oned, 20)
            fn_result.append(iterations_required)
            #print("Lengthscale:", rbf_lengthscale, "Kappa:", ucb_kappa, "Function:", fn, "Needed:", iterations_required)
            result_list.append(iterations_required)
            fn+=1
        results.append(fn_result)
        
        scenario_result = ScenarioResult(scenario_description, result_list)
        scenario_results.append(scenario_result)
test_results = TestResults(scenario_results)

output_results_csv(test_results)

In [None]:
#test on 2d functions
def test_on_twod(rbf_lengthscale, ucb_acquisition_kappa, test_twod_function, max_iterations):
    
    twod_input_bounds = [(0, 1), (0,1)]

    N = 100
    x0_vals = np.linspace(0, 1, N)
    x1_vals = np.linspace(0, 1, N)
    X0, X1 = np.meshgrid(x0_vals, x1_vals)        # shape (N, N) each
    objective_x = np.column_stack((X0.ravel(), X1.ravel()))  # shape (N*N, 2)
    objective_y = test_twod_function.call_function(objective_x)
    objective_y_max = max(objective_y)

    kernel = RBF(length_scale=rbf_lengthscale)
    model = GaussianProcessRegressor(kernel = kernel, alpha=noise_assumption)

    X = []
    Y = []
    # First point
    initial_x = bounds_midpoint(twod_input_bounds)  # array([0.5])
    X.append(initial_x)
    Y.append(test_twod_function.call_function(initial_x))
    for i in range(5):
        starting_x = [random.random(), random.random()]
        starting_y = test_twod_function.call_function(starting_x)
        X.append(starting_x)
        Y.append(starting_y)
    
    i=0

    #print("Objective max:", objective_y_max)
    while abs(max(Y) - objective_y_max) > 0.01:
        i+=1
        # print("Max y:", max(Y))
        if i > max_iterations:
            print("Max iterations reached")
            return 0
        #print("Iteration", i, " Max found:", max(Y))
        # print(abs(max(Y) - objective_y_max))
        # fit the model
        model.fit(X, Y)
            
        # optimize the acquisition function
        result = optimize.minimize(lambda x: -acquisition_UCB(x, model, ucb_acquisition_kappa), x0=initial_x, bounds=twod_input_bounds)
        x_new = result.x
        y_new = test_twod_function.call_function(x_new)
        
        # add the new observation to the training set
        X.append(x_new)
        Y.append(y_new)

    return i

test_functions = []
for i in range(3): #5
    testmu1 = [random.random(),random.random()]
    testmu2 = [random.random(),random.random()]
    testalpha2 = random.uniform(0.3, 0.9)
    twod = two_d_test(mu1=testmu1, mu2=testmu2, alpha2=testalpha2)
    test_functions.append(twod)
    plot_twod_objective(twod) # to see what we're dealing with

#test_on_twod(0.15, 0.4, test_functions[0], 20)

results = []
for rbf_lengthscale in np.arange(0.05, 0.5, 0.1):
    print("Lengthscale:", rbf_lengthscale)
    for kappa in np.arange(0.1, 2.00, 0.5):
        ucb_kappa = round(kappa, 2)
        print("Kappa:", ucb_kappa)
        scenario_description = f"ls={rbf_lengthscale} - k={ucb_kappa}"
        fn = 0
        fn_result = []
        fn_result.append(rbf_lengthscale)
        fn_result.append(ucb_kappa)
        result_list = []
        for twod in test_functions:
            iterations_required = test_on_twod(rbf_lengthscale, ucb_kappa, twod, 20)
            fn_result.append(iterations_required)
            result_list.append(iterations_required)
            print("Lengthscale:", rbf_lengthscale, "Kappa:", ucb_kappa, "Function:", fn, "Needed:", iterations_required)
            fn+=1
        results.append(fn_result)

        scenario_result = ScenarioResult(scenario_description, result_list)
        scenario_results.append(scenario_result)
test_results = TestResults(scenario_results)

output_results_csv(test_results)


from the output, lengthscale 0.25 kappa 0.3 was about best for 2d

for 1d, lengthscale 0.15 kappa 0.4


In [None]:
# week 4 - set up test profiles to test the higher dimensions

#Generate a random data point between 0 and 1 for the number of dimensions
def random_point(dimensions):
    return np.random.rand(dimensions)

class TestProfile():
    def __init__(self, dimensions, start_samples, maxima, std = 0.1):
        self.dimensions = dimensions
        self.start_samples = start_samples # number of samples to start with
        self.maxima = maxima
        self.std = std

    def CreateFunctions(self, number):
        profile_functions = []
        for i in range(number):
            mu = []
            # assume 0.1 std for all gaussians
            # assume alpha (relative height) = 1 for first peak and 0.5 for others
            sigma = [self.std] * self.maxima
            alpha = [1]

            for m in range(self.maxima):
                mu.append(random_point(self.dimensions))
                if(m > 0):
                    alpha.append(0.5)
            ndt = n_d_test(mu=mu, sigma=sigma, alpha=alpha)
            profile_functions.append (ndt)
        return profile_functions

def get_test_profile(function_number):
    X, Y = get_function_data(function_number)
    # Return a test profile matching what we know about the objective function
    # Function 1 (2d): 2 maxima
    # Function 2 (2d): "a lot of local optima" - use small standard deviation (0.05) and 10 peaks
    # Function 4 (4d): "a lot of local optima" - use small standard deviation (0.05) and 20 peaks
    # Function 5 (4d): 1 maximum
    # For other functions - use the number of dimensions as the number of peaks
    dimensions = X.shape[1]
    start_samples = X.shape[0]
    maxima = dimensions
    std = 0.15
    if(function_number == 1):
        maxima = 2
    elif(function_number ==2):
        std = 0.1
        maxima=10
    elif(function_number == 4):
        std = 0.05
        maxima=20
    elif(function_number==5):
        maxima=1

    return TestProfile(dimensions, start_samples, maxima, std)


def test_on_n_d(test_profile: TestProfile, kernel, ucb_acquisition_kappa, test_function: n_d_test, max_iterations, n_grid = 20):
    
    # 1. Create a list of coordinate arrays, one for each dimension.
    coords_1d = [np.linspace(0, 1, n_grid) for _ in range(test_profile.dimensions)]

    # 2. Create the D-dimensional mesh.
    #    Each element of `mesh` will be an array of shape (N, N, ..., N) [D times].
    mesh = np.meshgrid(*coords_1d, indexing='ij')

    # 3. Flatten each dimension, then stack them to get shape (N^D, D).
    #    .ravel() flattens the array, and column_stack collects them into columns.
    objective_x = np.column_stack([m.ravel() for m in mesh])

    objective_y = test_function.call_function(objective_x)
    objective_y_max = max(objective_y)

    model = GaussianProcessRegressor(kernel = kernel, alpha=noise_assumption)

    X = []
    Y = []
    # First point
    bounds = input_bounds_for_dim(test_profile.dimensions)
    initial_x = bounds_midpoint(bounds)  
    #X.append(initial_x)
    #Y.append(test_function.call_function(initial_x))
    for i in range(test_profile.start_samples):
        starting_x = random_point(test_profile.dimensions)
        starting_y = test_function.call_function([starting_x])
        X.append(starting_x)
        Y.append(starting_y[0])
    
    i=0
    #print("Objective max:", objective_y_max)
    while abs(max(Y) - objective_y_max) > 0.01:
        i+=1
        # print("Max y:", max(Y))
        if i > max_iterations:
            #print("Max iterations reached")
            return np.nan, np.nan
        #print("Iteration", i, " Max found:", max(Y))
        # print(abs(max(Y) - objective_y_max))
        # fit the model
        model.fit(X, Y)
            
        # optimize the acquisition function
        result = optimize.minimize(lambda x: -acquisition_UCB(x, model, ucb_acquisition_kappa), x0=initial_x, bounds=bounds)
        x_new = result.x
        y_new = test_function.call_function([x_new])
        
        # add the new observation to the training set
        X.append(x_new)
        Y.append(y_new[0])
    trained_length_scale = 0
    if hasattr(model, "kernel_"):
        if(hasattr(model.kernel_, "length_scale")):
            trained_length_scale = model.kernel_.length_scale
    # print (trained_length_scale)
    return i, trained_length_scale



In [None]:
class KernelWithParams():
    def __init__(self, kernel, name, params):
        self.kernel = kernel
        self.name = name
        self.params = params

In [20]:

maternNu = 2.5

def make_kernels_for_tests(lengthscale):
    linearKernel = DotProduct(sigma_0=1.0)
    linearWithParams = KernelWithParams(linearKernel, "Linear (no noise)", {})
    rbfKernel = RBF(length_scale=lengthscale, length_scale_bounds=(0.001, 1))
    rbfWithParams = KernelWithParams(rbfKernel, "RBF", {})
    matern52Kernel = Matern(nu = maternNu, length_scale=lengthscale, length_scale_bounds=(0.001, 1))
    matern52WithParams = KernelWithParams(matern52Kernel, "Matern52", {"nu": maternNu})
    kernels = [linearWithParams, rbfWithParams, matern52WithParams]
    
    for alpha in np.arange(0.2, 2, 0.2):
        rqKernel = RationalQuadratic(alpha = alpha, length_scale=lengthscale, length_scale_bounds=(0.001, 1))
        kernels.append(KernelWithParams(rqKernel, "RationalQuadratic", {"alpha": alpha}))


    return kernels

def make_kernel_for_function(function_info):
    kernel = None
    match function_info.kernel_params["class"]:
        case "Linear (no noise)":
            kernel = DotProduct(sigma_0=1.0)
        case "RBF":
            kernel = RBF(length_scale=function_info.kernel_lengthscale, length_scale_bounds=(0.001, 1))
        case "Matern52":
            kernel = Matern(nu = function_info.kernel_params["nu"], length_scale=function_info.kernel_lengthscale, length_scale_bounds=(0.001, 1))
        case "RationalQuadratic":
            kernel = RationalQuadratic(alpha = function_info.kernel_params["alpha"], length_scale=function_info.kernel_lengthscale, length_scale_bounds=(0.001, 1))
        case _:
            raise Exception (f"Kernel class {function_info.kernel_params["class"]} not matched")

    return KernelWithParams(kernel, function_info.kernel_params["class"], function_info.kernel_params)


In [None]:

#tp1 = get_test_profile(1)
#tf1 = tp1.CreateFunctions(1)[0]
#iterations1 = test_on_n_d(tp1, 0.3, 1.00, tf1, 10)

#tp3 = get_test_profile(3)
#tf3 = tp3.CreateFunctions(1)[0]
#iterations3 = test_on_n_d(tp3, 0.3, 1.00, tf3, 10)

for fn in range(2, 9): # for each competition function
    test_profile = get_test_profile(fn) #create the test profile
    test_functions = []
    test_functions = test_profile.CreateFunctions(4) #create gaussian process simulations
    results = []
    kernel_lengthscale = FunctionInfo(fn).kernel_lengthscale # previously trained lengthscales from competition function data
    print("Lengthscale:", kernel_lengthscale)
    
    kernels = make_kernels_for_tests(kernel_lengthscale)
    scenario_results = []
    for kernel in kernels:
        for kappa in np.arange(0.1, 1.5, 0.3): # for kappa in range...
            ucb_kappa = round(kappa, 2)
            #print("Kappa:", ucb_kappa)
            result_line = []
            kernelTypeName = kernel.name
            if(len(kernel.params) > 0):
                kernelTypeName += " " + str(kernel.params)
            result_line.append(kernelTypeName)
            result_line.append(kernel_lengthscale)
            result_line.append(ucb_kappa)
            result_list = []
            scenario_description = f"{kernelTypeName} - ls={kernel_lengthscale} - k={ucb_kappa}"
            print(scenario_description)
            for test_function in test_functions: # Test the acquisition function configuration on each simulation
                (iterations_required, trained_length_scale) = test_on_n_d(test_profile, kernel.kernel, ucb_kappa, test_function, 30, n_grid=20)
                result_list.append(iterations_required)
                result_line.append(iterations_required)
                #result_line.append(trained_length_scale)
                #print("Lengthscale:", rbf_lengthscale, "Kappa:", ucb_kappa, "Function:", fn, "Needed:", iterations_required)
            results.append(result_line)
            
            scenario_result = ScenarioResult(scenario_description, result_list)
            scenario_results.append(scenario_result)
    test_results = TestResults(scenario_results)
    #print(results)
    output_results_csv(test_results, f"Function_{fn}")

In [28]:
# cutdown version that uses sample data instead of the initial random 5 and calls to the objective function, and suggests next point to explore.
# Use that to do week 3 submissions.
# Then, next week, start fitting on functions that are closer to the real ones in terms of local maxima/variance.

def suggest_next(kernel, ucb_acquisition_kappa, function_num, initial_x = None):
    model = GaussianProcessRegressor(kernel = kernel, alpha=noise_assumption)
    X, Y = get_function_data(function_num)
    dimensions = X.shape[1]
    bounds = input_bounds_for_dim(dimensions)
    if(initial_x is None):
        initial_x = bounds_midpoint(bounds)
    
    model.fit(X, Y)

    result = optimize.minimize(lambda x: -acquisition_UCB(x, model, ucb_acquisition_kappa), x0=initial_x, bounds=bounds)
    return result.x, model.kernel_.length_scale


for i in range(1,9):
    X,y = get_function_data(i)
    if(i==1):
        y = scale_f1(y) #logarithmic scaling for f1
    X_max = X[np.argmax(y)]
    
    print("Function", i)
    info = FunctionInfo(i)
    if(info.perturb_max_start != 0):
        print("perturb_max_start = ", info.perturb_max_start)
        X_max += info.perturb_max_start
    kernelWithParams = make_kernel_for_function(info)
    sn, ls = suggest_next(kernelWithParams.kernel, info.ucb_kappa, i, initial_x=X_max)
    formatted = format_for_submission(sn)
    print(formatted)
    #print("Trained length scale", ls)

Function 1
0.999999-0.784908
Function 2




0.500257-0.500039
Function 3
0.500001-0.500004-0.499999
Function 4
0.502787-0.488480-0.355693-0.387261
Function 5
perturb_max_start =  -0.055
0.987554-0.470163-0.946567-0.105327
Function 6




0.463126-0.317874-0.508172-0.723817-0.144808
Function 7
0.055316-0.488299-0.249433-0.216093-0.410181-0.731049
Function 8
0.108893-0.287056-0.194225-0.299992-0.537696-0.356217-0.306504-0.371320


In [None]:
f=5
info_f = FunctionInfo(f)
X,y = get_function_data(f)
X_max = X[np.argmax(y)]
X_max -= 0.055
sn, ls = suggest_next(info_f.rbf_lengthscale, 30, f, initial_x=X_max)
formatted = format_for_submission(sn)
print(formatted)