In [None]:
import csv

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

import xgboost as xgb
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import AdaBoostRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import DotProduct, WhiteKernel

from sklearn.model_selection import train_test_split, GridSearchCV, KFold, cross_val_score
from sklearn.metrics import mean_squared_error, r2_score

from scipy.optimize import minimize, differential_evolution
from scipy.stats import norm
from sklearn.utils import resample
from typing import Tuple, List, Optional, Dict, Any

import optuna
import joblib

In [74]:
data=pd.read_csv('Oxidation_reaction.csv')

In [None]:
# calculate correlation
def calculate_corr(data):
    plt.rcParams['font.sans-serif'] = ['Times New Roman']
    plt.rcParams['axes.unicode_minus']=False

    corr1 = data.corr(method='pearson')
    print("pearson",corr1)
    fig, ax1 = plt.subplots(1, 1, figsize=(8, 8))
    sns.heatmap(corr1, ax=ax1, cmap="plasma", annot=True)
    ax1.set_title("Correlation (pearson) Heatmap of Variables in LHS Sampled Data")

    corr2 = data.corr(method='spearman')
    print("spearman",corr2)
    fig, ax1 = plt.subplots(1, 1, figsize=(8, 8))
    sns.heatmap(corr2, ax=ax1, cmap="plasma", annot=True)
    ax1.set_title("Correlation (spearman) Heatmap of Variables in LHS Sampled Data")

    corr3 = data.corr(method='kendall')
    print("kendall",corr3)
    fig, ax1 = plt.subplots(1, 1, figsize=(8, 8))
    sns.heatmap(corr3, ax=ax1, cmap="plasma", annot=True)
    ax1.set_title("Correlation (kendall) Heatmap of Variables in LHS Sampled Data")
calculate_corr(data)

In [None]:
# model filter
def model_filter(data):
    X=data.drop(['Residual of 17a','Yield'],axis=1)
    y=data['Yield']
    models=[xgb.XGBRegressor(objective='reg:squarederror'),
       RandomForestRegressor(),
        ExtraTreesRegressor(),
        GradientBoostingRegressor(),
        AdaBoostRegressor(),
        DecisionTreeRegressor(),
        GaussianProcessRegressor(kernel = DotProduct() + WhiteKernel())
       ]
    models_name=['XGBRegressor','RandomForestRegressor','ExtraTreesRegressor','GradientBoostingRegressor','AdaBoostRegressor','DecisionTreeRegressor','GaussianProcessRegressor']
    all_rmse_test=[]
    all_rmse_train=[]

    for model in models:
        kf = KFold(n_splits=4,shuffle=True, random_state=42)
        rmse_test_list=[]
        rmse_train_list=[]
        for train_index, test_index in kf.split(X, y):
            X_train=X.iloc[train_index]
            y_train=y.iloc[train_index]
            X_test=X.iloc[test_index]
            y_test=y.iloc[test_index]
            
            model.fit(X_train,y_train)
            y_pred_test = model.predict(X_test)
            y_pred_train=model.predict(X_train)

            rmse_test_list.append(np.sqrt(mean_squared_error(y_test, y_pred_test)))
            rmse_train_list.append(np.sqrt(mean_squared_error(y_train, y_pred_train)))

        all_rmse_test.append(np.mean(rmse_test_list))
        all_rmse_train.append(np.mean(rmse_train_list))

    print("rmse_test:")
    for i in range(len(models_name)):
        print(f"{models_name[i]}: {all_rmse_test[i]}")
    print("rmse_train:")
    for i in range(len(models_name)):
        print(f"{models_name[i]}: {all_rmse_train[i]}")
model_filter(data)

In [None]:
X=data.drop(['Residual of 17a','Yield'],axis=1)
y=data['Yield']

lower_rmse = np.inf
lower_number = 0

def model_optimize():
    def objective(trial):

        params = {
            'objective': 'reg:squarederror', 
            'booster': 'gbtree', 
            'random_state': 42,
            
            'n_estimators': trial.suggest_int('n_estimators', 100, 3000, step=100),
            'max_depth': trial.suggest_int('max_depth', 3, 9),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
        }

        model = xgb.XGBRegressor(**params)

        kfold = KFold(n_splits=4, shuffle=True, random_state=42)
        rmse_scores = []
        for train_index, test_index in kfold.split(X, y):
            X_train, y_train = X.iloc[train_index], y.iloc[train_index]
            X_test, y_test = X.iloc[test_index], y.iloc[test_index]
            model = xgb.XGBRegressor(**params)
            model.fit(X_train, y_train)

            y_pred_test = model.predict(X_test)
            rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
            rmse_scores.append(rmse)
        
        avg_rmse = np.mean(rmse_scores)
        
        return avg_rmse

    class EarlyStopCallback:
        def __init__(self, patience, min_delta):
            self.patience = patience
            self.min_delta = min_delta
            self.best_score = None
            self.wait = 0
            self.stop_now = False

        def __call__(self, study, trial):
            current_score = study.best_value
            
            if self.best_score is None:
                self.best_score = current_score
            else:
                
                if abs(current_score - self.best_score)/self.best_score > self.min_delta:
                    self.best_score = current_score
                    self.wait = 0
                else:
                    self.wait += 1
                

                if self.wait >= self.patience:
                    self.stop_now = True
                    study.stop()

    early_stopping_callback = EarlyStopCallback(patience=50, min_delta=0.01)
    
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=500, callbacks=[early_stopping_callback], show_progress_bar=True)
    

    best_params = study.best_params
    best_score = study.best_value 
    
    final_best_params = {
        'objective': 'reg:squarederror',
        'booster': 'gbtree'
    }
    final_best_params.update(best_params)

    print("best_param:", final_best_params)
    print("best_score:", best_score)

    return final_best_params, best_score
params, score = model_optimize()

In [None]:
class XGBoostBayesianOptimizer:
    def __init__(self, 
                 X_train: np.ndarray, 
                 y_train: np.ndarray, 
                 xgb_params: Dict[str, Any],
                 n_estimators: int = 10, 
                 random_state: int = 42):
        self.X_train = X_train
        self.y_train = y_train
        self.xgb_params = xgb_params
        self.n_estimators = n_estimators
        self.random_state = random_state
        self.models = []
        
        self.best_y = np.max(y_train) 
        
        self._train_bootstrap_models()

    def _train_bootstrap_models(self):
        self.models = []
        
        for i in range(self.n_estimators):
            X_sample, y_sample = resample(self.X_train, self.y_train, 
                                          replace=True, 
                                          n_samples=len(self.X_train), 
                                          random_state=range(self.n_estimators)[i])
            
            model = xgb.XGBRegressor(**self.xgb_params, random_state=range(self.n_estimators)[i])
            model.fit(X_sample, y_sample, eval_set=[(X_sample, y_sample)], verbose=False)
            
            self.models.append(model)
        
        print(f"finished {len(self.models)} Bootstrap models")

    def predict_with_uncertainty(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        X = np.atleast_2d(X)
        predictions = np.array([model.predict(X) for model in self.models])
        
        mean = np.mean(predictions, axis=0)
        std = np.std(predictions, axis=0)
        
        std = np.maximum(std, 1e-6)
        
        return mean, std

    def _acquisition_function_ei(self, X: np.ndarray, xi: float = 0.01) -> np.ndarray:
        X = np.atleast_2d(X)
        mu, sigma = self.predict_with_uncertainty(X)
        
        mu = mu.flatten()
        sigma = sigma.flatten()
        
        with np.errstate(divide='ignore'):
            imp = mu - self.best_y - xi
            Z = imp / sigma
            
            ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
            ei[sigma == 0.0] = 0.0

        print(mu,sigma,ei)

        return -ei

    def suggest_next_sample(self, 
                            bounds: List[Tuple[float, float]], 
                            n_restarts: int = 20, 
                            xi: float = 0.01) -> np.ndarray:

        bounds = np.array(bounds)
        n_dims = len(bounds)
        
        best_x = None
        max_ei_value = -np.inf
        
        for i in range(n_restarts):
            x0 = np.random.uniform(bounds[:, 0], bounds[:, 1])
            
            def objective(x):
                return self._acquisition_function_ei(x.reshape(1, -1), xi)[0]
            
            try:
                res = minimize(
                    fun=objective,
                    x0=x0,
                    bounds=bounds,
                    method='L-BFGS-B',
                    options={'ftol': 1e-6, 'maxiter': 200}
                )
                
                if res.success and res.fun < max_ei_value: 
                    pass 
                    
            except Exception as e:
                continue
                
        candidates = []
        for i in range(n_restarts):
            x0 = np.random.uniform(bounds[:, 0], bounds[:, 1])
            def objective(x):
                return self._acquisition_function_ei(x.reshape(1, -1), xi)[0]
            
            res = minimize(objective, x0, bounds=bounds, method='L-BFGS-B', options={'ftol': 1e-6})
            if res.success:
                candidates.append((res.x, -res.fun))
        
        print("All sampled data:",candidates)
        candidates.sort(key=lambda k: k[1], reverse=True)
        best_x_1, best_ei_1 = candidates[0]
        best_x_2, best_ei_2 = candidates[1]
        print(f"New sample point 1. (EI={best_ei_1:.4f}): {best_x_1}")
        print(f"New sample point 2. (EI={best_ei_2:.4f}): {best_x_2}")


my_xgb_params = params

optimizer = XGBoostBayesianOptimizer(
    X_train=X, 
    y_train=y, 
    xgb_params=my_xgb_params,
    n_estimators=10  
)


search_bounds = [(0.5, 1.75),(1,1.75),(0.8,2.0),(1.1,1.5),(70,90),(10,60)] 

optimizer.suggest_next_sample(
    bounds=search_bounds, 
    n_restarts=20,
    xi=0.01        
)