In [1]:
import warnings
warnings.filterwarnings('ignore')
from typing import Literal

import pandas as pd
import numpy as np
from custom_ml_toolkit.preprocessor.encoder import SupportMissingDatasetEncoder
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import classification_report
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

import optuna
from functools import partial

## Data Preparation

In [2]:
random_state = 77
data_df = pd.read_csv('example_data/titanic.csv')
data_df['Deck'] = data_df['Cabin'].str[0]

numerical_cols = ['Age', 'SibSp', 'Parch', 'Fare']
norminal_cols = ['Sex', 'Embarked']
ordinal_cols = ['Pclass', 'Deck']
target_col = 'Survived'

train_data_df, test_data_df = train_test_split(
    data_df,
    test_size=0.8,
    random_state=random_state,
    stratify=data_df['Survived']
)

de = SupportMissingDatasetEncoder(
    numerical_cols=numerical_cols,
    norminal_cols=norminal_cols,
    ordinal_cols=ordinal_cols,
    target_col=target_col,
    drop_binary=True,
    oe_unknown_value=np.nan,
    oe_missing_value=np.nan,
    encode_target=True
)

de.fit(train_data_df)
encoded_train_data_df = de.transform(train_data_df)
encoded_test_data_df = de.transform(test_data_df)

X_train = encoded_train_data_df.drop(columns=['Survived'])
y_train = encoded_train_data_df['Survived']

X_test = encoded_test_data_df.drop(columns=['Survived'])
y_test = encoded_test_data_df['Survived']

In [3]:
def generate_class_weight(
        trial,
        class_names: list
    ):
    class_names = sorted(class_names)
    class_weights_dict = dict()

    if len(class_names) > 2:
        for class_name in class_names:
            class_weights_dict[class_name] = trial.suggest_float(class_name, 0.01, 1)
    else:
        first_class_weight = trial.suggest_float(class_names[0], 0.01, 1)
        second_calss_weight = 1 - first_class_weight
        class_weights_dict[class_names[0]] = first_class_weight
        class_weights_dict[class_names[1]] = second_calss_weight

    return class_weights_dict

def generate_hyper_params(
        trial,
        model: Literal['xgb', 'lgbm'] = 'xgb'
    ):
    if model == 'xgb':
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 50, 900),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 1, log=True), # or eta def=0.3 [0,1]
            'min_split_loss': trial.suggest_float('min_split_loss', 0, 10), # or gamma def=0 [0, inf]
            'max_depth': trial.suggest_int('max_depth', 1, 10), # def=6 [0, inf]
            'max_leaves': trial.suggest_int('max_leaves', 0, 10), # def=0
            'max_bin': trial.suggest_int('max_bin', 128, 256), # def=256
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 10), # def=1 [0, inf]
            'max_delta_step': trial.suggest_int('max_delta_step', 1, 10), # def=1 [0, inf]
            'subsample': trial.suggest_float('subsample', 0.01, 1.0, log=True), # def=1 (0, inf]
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.01, 1, log=True), # def=1 (0, 1]
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 2.0, log=True), # or lambda def=1
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 2.0, log=True), # or alpha def=0
            ## 'grow_policy': trial.suggest_categorical('grow_policy', ['depthwise', 'lossguide']),
            ## 'booster': trial.suggest_categorical('booster', ['gbtree', 'gblinear']), # def=gbtree
            ## 'sampling_method' trial.suggest_categorical('sampling_method', ['uniform', 'gradient_based'] ) # def=uniform only supported when tree_method is set to gpu_hist
            ##'tree_method': trial.suggest_categorical('tree_method', ['exact', 'approx', 'hist']), # def=auto
        }
    elif model == 'lgbm':
        params = {
        }
    return params

def train_model(
        X_train,
        y_train,
        params: dict = None,
        class_weights: dict = None,
        model: Literal['xgb', 'lgbm'] = 'xgb'
    ):

    if params is None:
        params = dict()

    if class_weights is None:
        sample_weight = None
    else:
        sample_weight = y_train\
            .map(class_weights)\
            .to_numpy()

    if model == 'xgb':
        clf = XGBClassifier(
            **params,
            random_state=random_state,
            n_jobs=-1,
            missing=np.nan
        )
    elif model == 'lgbm':
        clf = LGBMClassifier(
            **params,
            random_state=random_state,
            n_jobs=-1,
            verbose=-1
        )

    clf.fit(
        X_train,
        y_train,
        sample_weight=sample_weight
    )

    return clf

def eval_model(clf, X_test, y_test):
    y_test_pred = clf.predict(X_test)

    eval_dict = classification_report(
        y_true=y_test,
        y_pred=y_test_pred,
        output_dict=True
    )
    score = eval_dict['macro avg']['f1-score']
    return score

def objective(
        trial,
        X_train,
        y_train,
        X_test=None,
        y_test=None,
        k=3,
        model: Literal['xgb', 'lgbm'] = 'xgb',
        mode: Literal['class_weight', 'params'] = 'params',
        class_weights:dict = None
    ):

    if mode == 'class_weight':
        class_weights = generate_class_weight(trial, class_names=list(y_train.unique()))
        params = None
    elif mode == 'params':
        params = generate_hyper_params(trial, model=model)

    if (X_test is None) or (y_test is None):
        kf = KFold(n_splits=k, shuffle=False)
        cv_score_list = list()
        for train_index, test_index in kf.split(X=X_train, y=y_train):
            X_train_cv = X_train.iloc[train_index].copy()
            y_train_cv =  y_train.iloc[train_index].copy()
            X_test_cv = X_train.iloc[test_index].copy()
            y_test_cv = y_train.iloc[test_index].copy()

            clf = train_model(
                    X_train_cv,
                    y_train_cv,
                    params=params,
                    class_weights=class_weights,
                    model=model
            )
            cv_score = eval_model(clf, X_test_cv, y_test_cv)

            cv_score_list.append(cv_score)
        score = sum(cv_score_list)/len(cv_score_list)
    else:
        clf = train_model(
                X_train,
                y_train,
                params=params,
                class_weights=class_weights,
                model=model
        )
        score = eval_model(clf, X_test, y_test)
    return score


## Tune Class Weight

In [4]:
objective_func = partial(
    objective,
    X_train=X_train,
    y_train=y_train,
    # X_test=X_test,
    # y_test=y_test,
    k=3,
    model='xgb',
    mode='class_weight'
)

study = optuna.create_study(direction = 'maximize')
study.optimize(objective_func, n_trials = 10)
trial = study.best_trial
print('Best Score: ', trial.value)
print('Best Params: ')
print(trial.params)
optuna.visualization.plot_contour(study)
optuna.visualization.plot_param_importances(study)

[I 2024-09-17 18:45:27,015] A new study created in memory with name: no-name-ff5db418-aaa5-46bd-a9e2-ec3535af7e99
[I 2024-09-17 18:45:27,221] Trial 0 finished with value: 0.69223788765024 and parameters: {0: 0.7169069231694983}. Best is trial 0 with value: 0.69223788765024.
[I 2024-09-17 18:45:27,395] Trial 1 finished with value: 0.5995265497503349 and parameters: {0: 0.28772406957720625}. Best is trial 0 with value: 0.69223788765024.
[I 2024-09-17 18:45:27,562] Trial 2 finished with value: 0.6689543195753798 and parameters: {0: 0.6829458607525221}. Best is trial 0 with value: 0.69223788765024.
[I 2024-09-17 18:45:27,729] Trial 3 finished with value: 0.6453801288448789 and parameters: {0: 0.5421046644088138}. Best is trial 0 with value: 0.69223788765024.
[I 2024-09-17 18:45:27,886] Trial 4 finished with value: 0.5869936326477152 and parameters: {0: 0.17516539801811765}. Best is trial 0 with value: 0.69223788765024.
[I 2024-09-17 18:45:28,023] Trial 5 finished with value: 0.469209179950

Best Score:  0.69223788765024
Best Params: 
{0: 0.7169069231694983}


In [5]:
objective_func = partial(
    objective,
    X_train=X_train,
    y_train=y_train,
    # X_test=X_test,
    # y_test=y_test,
    k=3,
    model='xgb',
    mode='params',
    class_weights={0: 0.6807888256513457, 1: 1-0.6807888256513457}
)

study = optuna.create_study(direction = 'maximize')
study.optimize(objective_func, n_trials = 100)
trial = study.best_trial
print('Best Score: ', trial.value)
print('Best Params: ')
print(trial.params)
optuna.visualization.plot_contour(study)
optuna.visualization.plot_param_importances(study)

[I 2024-09-17 18:45:29,771] A new study created in memory with name: no-name-9cfc89f1-7177-4b39-9be6-2f3e6680814b
[I 2024-09-17 18:45:29,903] Trial 0 finished with value: 0.38146997929606624 and parameters: {'n_estimators': 71, 'learning_rate': 0.28842586843678486, 'min_split_loss': 8.984763037879581, 'max_depth': 8, 'max_leaves': 8, 'max_bin': 141, 'min_child_weight': 4, 'max_delta_step': 10, 'subsample': 0.6266441490966028, 'colsample_bytree': 0.024786313897839758, 'reg_lambda': 3.174353179534399e-08, 'reg_alpha': 0.009403822054904563}. Best is trial 0 with value: 0.38146997929606624.
[I 2024-09-17 18:45:30,278] Trial 1 finished with value: 0.38146997929606624 and parameters: {'n_estimators': 604, 'learning_rate': 0.05735535115096553, 'min_split_loss': 2.220597561817087, 'max_depth': 6, 'max_leaves': 3, 'max_bin': 225, 'min_child_weight': 5, 'max_delta_step': 10, 'subsample': 0.01750841548515223, 'colsample_bytree': 0.07973545366620943, 'reg_lambda': 1.8119863495948314e-07, 'reg_alph

Best Score:  0.688200801500695
Best Params: 
{'n_estimators': 177, 'learning_rate': 0.5683748930635135, 'min_split_loss': 1.08453056948852, 'max_depth': 7, 'max_leaves': 5, 'max_bin': 187, 'min_child_weight': 1, 'max_delta_step': 6, 'subsample': 0.6087200131106071, 'colsample_bytree': 0.07317213219278966, 'reg_lambda': 1.8854197021232464, 'reg_alpha': 0.09696958363421158}
