## **Imports**

In [1]:
from bayes_opt import BayesianOptimization
import numpy as np
import pandas as pd
from sklearn import preprocessing
from sklearn.ensemble import RandomForestRegressor
import math
from scipy.stats import norm
import random

In [2]:
import os
global PATH
PATH = os.getcwd()
PATH = PATH.split(os.path.sep)
if "src" in PATH:
    PATH.remove("src")
PATH = os.path.sep.join(PATH)

data_df = pd.read_csv(os.path.join(PATH, "datasets", "AgNP_dataset.csv"))

features = list(data_df.columns)[:-1]
obj_metric = list(data_df.columns)[-1]
data_df = data_df.groupby(features)[obj_metric].agg(lambda x: x.unique().mean())
data_df = (data_df.to_frame()).reset_index()

In [3]:
bounds = dict()
for col in list(data_df.columns)[:-1]:
    smallest = data_df[col].min()
    largest = data_df[col].max()
    remove = {
        '(': '',
        ')': '',
        '%': '',
        'uL/min': ''
    }
    for key,val in remove.items():
        col = col.replace(key, val)
    bounds[col] = (smallest, largest)

def return_closest_objective_metric(QAgNO3, Qpva, Qtsc, Qseed, Qtot):
    loss, dis = None, float('inf')
    features = data_df.values
    
    for i in features:
        val = np.exp2(QAgNO3 - i[0]) + np.exp2(Qpva - i[1]) + np.exp2(Qtsc - i[2]) + np.exp2(Qseed - i[3]) + np.exp2(Qtot - i[4])
        val = np.sqrt(val)
        
        if val < dis:
            dis = val
            loss = i[5]
    
    return loss

In [4]:
import warnings
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize

"""
class UtilityFunction(object):


    def __init__(self, kind='ucb', kappa=2.576, xi=0, kappa_decay=1, kappa_decay_delay=0):

        self.kappa = kappa
        self._kappa_decay = kappa_decay
        self._kappa_decay_delay = kappa_decay_delay

        self.xi = xi

        self._iters_counter = 0

        if kind not in ['ucb', 'ei', 'poi']:
            err = "The utility function " \
                  "{} has not been implemented, " \
                  "please choose one of ucb, ei, or poi.".format(kind)
            raise NotImplementedError(err)
        else:
            self.kind = kind

    def update_params(self):
        self._iters_counter += 1

        if self._kappa_decay < 1 and self._iters_counter > self._kappa_decay_delay:
            self.kappa *= self._kappa_decay

    def utility(self, x, gp, y_max):
        if self.kind == 'ucb':
            return self._ucb(x, gp, self.kappa)
        if self.kind == 'ei':
            return self._ei(x, gp, y_max, self.xi)
        if self.kind == 'poi':
            return self._poi(x, gp, y_max, self.xi)

    @staticmethod
    def _ucb(x, gp, kappa):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            mean, std = gp.predict(x, return_std=True)
        return mean + kappa * std

    @staticmethod
    def _ei(x, gp, y_max, xi):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            mean, std = gp.predict(x, return_std=True)

        a = (mean - y_max - xi)
        z = a / std
        return a * norm.cdf(z) + std * norm.pdf(z)
    
    @staticmethod
    def _poi(x, gp, y_max, xi):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            mean, std = gp.predict(x, return_std=True)

        z = (mean - y_max - xi)/std
        return norm.cdf(z)
"""



In [6]:
import warnings
from bayes_opt import UtilityFunction
class Custom_Acquisition(UtilityFunction):
    def __init__(self, kind, kappa):
        super().__init__(kind, kappa)
        self.apple = 1000
        
    def utility(self, x, gp, y_max):
        if self.kind == 'ucb':
            return self._ucb(x, gp, self.kappa)
        if self.kind == 'ei':
            return self._ei(x, gp, y_max, self.xi)
        if self.kind == 'poi':
            return self._poi(x, gp, y_max, self.xi)
        
    @staticmethod
    def _ucb(x, gp, kappa):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            mean, std = gp.predict(x, return_std=True)
        val = mean + kappa * std
        return mean + kappa * std
        
    @staticmethod
    def _ei(x, gp, y_max, xi):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            mean, std = gp.predict(x, return_std=True)

        a = (mean - y_max - xi)
        z = a / std
        return a * norm.cdf(z) + std * norm.pdf(z)
    
    @staticmethod
    def _poi(x, gp, y_max, xi):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            mean, std = gp.predict(x, return_std=True)

        z = (mean - y_max - xi)/std
        return norm.cdf(z)
    

evaluated_points = []
evaluated_values = []

optimizer = BayesianOptimization(
    f=return_closest_objective_metric,
    pbounds=bounds,
    random_state=42  # Optional, for reproducibility
)

#u = UtilityFunction(kind="ucb", kappa=2.5)
ca = Custom_Acquisition(kind="ucb", kappa=2.5)
for _ in range(20):
    next_point = optimizer.suggest(utility_function=ca)
    value = return_closest_objective_metric(**next_point)
    optimizer.register(params=next_point, target=value)
    
    evaluated_points.append(next_point)
    evaluated_values.append(value)

# Retrieve the best parameters and their value found so far
best_params = optimizer.max['params']
best_value = optimizer.max['target']

print("Best parameters found:", best_params)
print("Corresponding value:", best_value)
print(data_df[obj_metric].max())

Best parameters found: {'QAgNO3': 4.53, 'Qpva': 9.999518096, 'Qseed': 19.5, 'Qtot': 351.84927370700353, 'Qtsc': 16.776930909004168}
Corresponding value: 0.9052872549444443
0.9070041334166667
