# Imports

In [None]:
# run as shell instead of python

!pip install --quiet optuna optuna-integration

In [None]:
import warnings, random
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer

import xgboost as xgb

import optuna
from optuna.integration import XGBoostPruningCallback

# Seed for Python's and NumPy's global RNGs; also reused further down as the
# `random_state` of the train/test split and of the XGBoost parameters.
RANDOM_STATE = 42
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)

# Blanket suppression of every warning category for the rest of the session.
# NOTE(review): this also hides deprecation and Optuna warnings - consider
# narrowing the filter once the notebook is stable.
warnings.simplefilter("ignore")


---
# Dataset

In [None]:
# Selects the data source:
#   "california" -> scikit-learn's California housing regression dataset
#   "custo"      -> a Kaggle dataset whose target column is named "custo"
loaded_dataset = "custo"

if loaded_dataset == "california":
    from sklearn.datasets import fetch_california_housing

    # as_frame=True returns a DataFrame / Series instead of bare arrays; the
    # preprocessing cell relies on `X.columns` and per-column dtypes.
    data = fetch_california_housing(as_frame=True)
    X = data.data
    y = data.target
elif loaded_dataset == "custo":
    # Imported lazily so the "california" path does not require kagglehub.
    import kagglehub
    from kagglehub import KaggleDatasetAdapter

    # TODO: fill in both placeholders before running this branch.
    file_path = ""     # passed as the third argument (file inside the dataset)
    dataset_path = ""  # passed as the second argument (dataset handle)
    df = kagglehub.load_dataset(
        KaggleDatasetAdapter.PANDAS,
        dataset_path,
        file_path,
    )
    # Normalise the target column name so the rest of the notebook is
    # independent of the dataset's own naming.
    df = df.rename(columns={'custo': 'Target'})

    X = df.drop(columns=["Target"])
    y = df["Target"]
else:
    # Fail here rather than with a NameError on X / y in a later cell.
    raise ValueError(
        f"Unknown loaded_dataset {loaded_dataset!r}; expected 'california' or 'custo'"
    )

## Preprocessing

In [None]:
# Work on pandas objects regardless of how the dataset cell produced X / y
# (plain arrays are wrapped; existing DataFrame / Series are used as-is).
X_frame = X if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
y_series = y if isinstance(y, pd.Series) else pd.Series(y)

# Columns to one-hot encode: object, pandas string and categorical dtypes.
# Everything else is passed through unchanged.
cat_cols = [
    col
    for col, dtype in X_frame.dtypes.items()
    if isinstance(dtype, pd.CategoricalDtype)
    or pd.api.types.is_object_dtype(dtype)
    or pd.api.types.is_string_dtype(dtype)
]
num_cols = [col for col in X_frame.columns if col not in cat_cols]

# Stratification for regression: bin the continuous target into quantiles and
# stratify the split on the bin labels. duplicates='drop' merges bins whose
# edges coincide, so fewer than n_bins bins may result.
n_bins = 10
y_binned = pd.qcut(y_series, q=n_bins, duplicates='drop')

X_train, X_test, y_train, y_test = train_test_split(
    X_frame, y_series, test_size=0.1, random_state=RANDOM_STATE, stratify=y_binned
)

if cat_cols:
    onehot_trees = OneHotEncoder(sparse_output=False, handle_unknown="ignore")

    # One-hot columns come first in the output, followed by the passthrough
    # (non-categorical) columns.
    preprocess_for_trees = ColumnTransformer(
        transformers=[("onehot_trees", onehot_trees, cat_cols)],
        remainder="passthrough",
    )

    # Fit on the training split only so no test information leaks into the encoder.
    X_train = preprocess_for_trees.fit_transform(X_train)
    X_test = preprocess_for_trees.transform(X_test)
else:
    # Nothing to encode; still hand NumPy arrays downstream so positional
    # indexing with fold indices (X_train[idx]) selects rows, not columns.
    X_train = X_train.to_numpy()
    X_test = X_test.to_numpy()

# Targets are always converted: indexing a Series with fold indices would be
# label-based and the labels are shuffled by the split.
y_train = y_train.to_numpy()
y_test = y_test.to_numpy()


---
# Optuna - Hyperparameter optimization

## Study Parameters

In [None]:
# Keyword arguments unpacked into `study.optimize(...)` further down.
# The run is bounded by wall-clock time; a fixed trial count is left disabled.
study_params = dict(
    show_progress_bar=True,
    # n_trials=50,
    timeout=7 * 60 * 60,  # 7 hours, expressed in seconds
)

def get_params(trial: optuna.Trial) -> dict:
    """Assemble one XGBoost parameter set for ``trial``.

    The result combines fixed settings (objective, metric, seed, threading,
    tree method) with hyperparameters sampled from ``trial``. Sampling happens
    in the order the keys are listed below. Trailing ``alt:`` comments record
    alternative search ranges that are currently disabled.

    Args:
        trial: The Optuna trial used to draw hyperparameter values.

    Returns:
        A dict with the fixed keys followed by ``learning_rate``,
        ``n_estimators``, ``max_depth``, ``min_child_weight``, ``gamma``,
        ``reg_lambda``, ``reg_alpha``, ``subsample`` and ``colsample_bytree``.
    """
    static_part = {
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "random_state": RANDOM_STATE,
        "n_jobs": -1,
        "tree_method": "hist",
    }

    draw_float = trial.suggest_float
    draw_int = trial.suggest_int

    tuned_part = {
        # Step size and boosting budget.
        "learning_rate": draw_float("learning_rate", 0.01, 0.08, log=True),  # alt: 0.02-0.15 (log)
        "n_estimators": draw_int("n_estimators", 300, 1200),  # alt: 300-2000
        # Tree complexity.
        "max_depth": draw_int("max_depth", 3, 6),  # alt: 4-10
        "min_child_weight": draw_float("min_child_weight", 3.0, 15.0),  # alt: 1.0-10.0
        # Regularisation.
        "gamma": draw_float("gamma", 0.5, 5.0),  # alt: 0.0-5.0
        "reg_lambda": draw_float("reg_lambda", 5.0, 30.0),  # alt: 1.0-10.0
        "reg_alpha": draw_float("reg_alpha", 0.5, 10.0),  # alt: 0.0-5.0
        # Row / column subsampling.
        "subsample": draw_float("subsample", 0.6, 0.9),  # alt: 0.7-1.0
        "colsample_bytree": draw_float("colsample_bytree", 0.6, 0.9),  # alt: 0.7-1.0
    }

    return {**static_part, **tuned_part}

## Objective and Optimization Run

In [None]:
study_name = 'xgboost tuning'


def objective(trial: optuna.Trial) -> float:
    """Score one hyperparameter set with 5-fold cross-validated RMSE.

    For every fold a booster is trained with early stopping on the fold's
    validation split, and the RMSE is computed from the best iteration.

    Args:
        trial: Optuna trial; used to sample parameters via ``get_params`` and
            to receive intermediate values for pruning.

    Returns:
        Mean validation RMSE across the folds (lower is better).

    Raises:
        optuna.TrialPruned: Raised by the pruning callback when the pruner
            decides to stop the trial.

    Side effects:
        Stores the per-fold best boosting round counts on the trial as the
        user attribute ``"best_rounds"``.
    """
    params = get_params(trial)

    # "n_estimators" is only the round budget here; xgb.train takes it through
    # num_boost_round and would otherwise warn about an unused parameter.
    num_boost_round = params["n_estimators"]
    train_params = {k: v for k, v in params.items() if k != "n_estimators"}

    # Positional fold indices require array semantics (row selection).
    X_all = np.asarray(X_train)
    y_all = np.asarray(y_train)

    kf = KFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    fold_rmse = []
    best_rounds = []

    for fold_i, (train_idx, valid_idx) in enumerate(kf.split(X_all), 1):
        dtrain = xgb.DMatrix(X_all[train_idx], label=y_all[train_idx])
        dvalid = xgb.DMatrix(X_all[valid_idx], label=y_all[valid_idx])

        # Report intermediate values from the first fold only: every fold
        # would report at the same boosting-round steps, and Optuna ignores a
        # value reported for a step that was already recorded.
        callbacks = []
        if fold_i == 1:
            callbacks.append(XGBoostPruningCallback(trial, "validation-rmse"))

        booster = xgb.train(
            train_params,
            dtrain,
            num_boost_round=num_boost_round,
            evals=[(dvalid, "validation")],
            early_stopping_rounds=100,
            callbacks=callbacks,
            verbose_eval=False,
        )

        # The native API returns the model from the last round, not the best
        # one, so restrict prediction to the best early-stopping iteration.
        best_round = booster.best_iteration + 1
        preds = booster.predict(dvalid, iteration_range=(0, best_round))

        fold_rmse.append(np.sqrt(mean_squared_error(y_all[valid_idx], preds)))
        best_rounds.append(int(best_round))

    # Keep the rounds actually used; the sampled n_estimators is an upper bound.
    trial.set_user_attr("best_rounds", best_rounds)

    return float(np.mean(fold_rmse))


# Minimise the cross-validated RMSE returned by `objective`.
# The sampler is seeded so the sequence of suggested parameters is repeatable;
# without it the search differs on every run even though RANDOM_STATE is set.
# NOTE: with a timeout-based budget the number of completed trials can still
# vary between runs.
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=RANDOM_STATE),
    pruner=optuna.pruners.MedianPruner(n_warmup_steps=3),
    study_name=study_name,
)
study.optimize(objective, **study_params)

print("Best RMSE:", study.best_value)
print("Best params:", study.best_params)