Library

In [None]:

from typing import List, Dict, Tuple
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sweetviz as sv
import lightgbm as lgb
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.metrics import log_loss, confusion_matrix
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostClassifier, Pool
from sklearn.ensemble import RandomForestClassifier
from sklearn.cluster import KMeans
import optuna
from tabpfn import TabPFNClassifier

Data

In [39]:
data_domain = pd.read_csv("../data/dataset.csv")
data_img = pd.read_csv("../data/features(1).csv")
data_domain_valid = pd.read_csv("../data/dataset_valid.csv")
data_img_valid = pd.read_csv("../data/features_valid.csv")

In [40]:
data = pd.merge(data_domain, data_img, on="id", how="inner")
data_valid = pd.merge(data_domain_valid, data_img_valid, on="id", how="inner")

In [41]:
data_valid.columns

Index(['id', 'latitude_min', 'longitude_min', 'latitude_max', 'longitude_max',
       'sand', 'coral_algae', 'rock', 'seagrass', 'microalgal_mats', 'rubble',
       'sand_rate', 'coral_algae_rate', 'rock_rate', 'seagrass_rate',
       'microalgal_mats_rate', 'rubble_rate', 'seagrass_overlap', 'r_sum',
       'r_mean', 'r_var', 'g_sum', 'g_mean', 'g_var', 'b_sum', 'b_mean',
       'b_var', 'hog_sum', 'hog_mean', 'hog_var', 'sift_sum', 'sift_mean',
       'sift_var'],
      dtype='object')

In [42]:
columns_location = ['latitude_min', 'longitude_min', 'latitude_max', 'longitude_max']
columns_base = ['sand', 'coral_algae', 'rock', 'seagrass', 'microalgal_mats', 'rubble']
columns_rate = ['sand_rate', 'coral_algae_rate', 'rock_rate', 'seagrass_rate', 'microalgal_mats_rate', 'rubble_rate']
columns_rgb = ['r_sum', 'r_mean', 'r_var', 'g_sum', 'g_mean', 'g_var', 'b_sum', 'b_mean', 'b_var', 'hog_sum']
columns_hog = ['hog_sum', 'hog_mean', 'hog_var']
columns_sift = ['sift_sum', 'sift_mean', 'sift_var']
target = "seagrass_overlap"
target_binary = "target_binary"

In [43]:
data['target_binary'] = data[target].apply(lambda x: 0 if x == 0 else 1)
data_valid['target_binary'] = data_valid[target].apply(lambda x: 0 if x == 0 else 1)

Feature

In [44]:
# カテゴリカル作成
for col in ['sand', 'coral_algae', 'rock', 'seagrass', 'microalgal_mats', 'rubble']:
    data[col + '_onehot'] = data[col].apply(lambda x: 1 if x == 1 else 0)
for col in ['sand', 'coral_algae', 'rock', 'seagrass', 'microalgal_mats', 'rubble']:
    data_valid[col + '_onehot'] = data_valid[col].apply(lambda x: 1 if x == 1 else 0)

In [45]:
# 位置情報×ドメイン作成
categories = ['seagrass', 'seagrass_rate', 'coral_algae', 'rock', 'microalgal_mats', 'rubble']
for category in categories:
    data[f'latitude×{category}'] = ((data['latitude_min'] + data['latitude_max']) / 2) * data[category]
    data[f'longitude×{category}'] = ((data['longitude_min'] + data['longitude_max']) / 2) * data[category]
for category in categories:
    data_valid[f'latitude×{category}'] = ((data_valid['latitude_min'] + data_valid['latitude_max']) / 2) * data_valid[category]
    data_valid[f'longitude×{category}'] = ((data_valid['longitude_min'] + data_valid['longitude_max']) / 2) * data_valid[category]

In [46]:
# seagrass関係をexponentialize
data['seagrass_exp'] = np.exp(data['seagrass'])
data['seagrass_rate_exp'] = np.exp(data['seagrass_rate'])
data_valid['seagrass_exp'] = np.exp(data_valid['seagrass'])
data_valid['seagrass_rate_exp'] = np.exp(data_valid['seagrass_rate'])

In [47]:
# 位置情報×画像
features = ['r_mean', 'g_mean', 'b_mean', 'hog_mean', 'sift_mean']

data['longitude_avg'] = (data['longitude_min'] + data['longitude_max']) / 2
data['latitude_avg'] = (data['latitude_min'] + data['latitude_max']) / 2

for feature in features:
    data[f'longitude_avg×{feature}'] = data['longitude_avg'] * data[feature]
    data[f'latitude_avg×{feature}'] = data['latitude_avg'] * data[feature]

data_valid['longitude_avg'] = (data_valid['longitude_min'] + data_valid['longitude_max']) / 2
data_valid['latitude_avg'] = (data_valid['latitude_min'] + data_valid['latitude_max']) / 2

for feature in features:
    data_valid[f'longitude_avg×{feature}'] = data_valid['longitude_avg'] * data_valid[feature]
    data_valid[f'latitude_avg×{feature}'] = data_valid['latitude_avg'] * data_valid[feature]

In [48]:
# ドメイン×画像
features = ['r_mean', 'g_mean', 'b_mean', 'hog_mean', 'sift_mean']
categories = ['sand', 'coral_algae', 'rock', 'seagrass', 'microalgal_mats', 'rubble']

for feature in features:
    for category in categories:
        new_column_name = f'{feature}×{category}'
        data[new_column_name] = data[feature] * data[category]

for feature in features:
    for category in categories:
        new_column_name = f'{feature}×{category}'
        data_valid[new_column_name] = data_valid[feature] * data_valid[category]

In [49]:
X_data_base = data[columns_base]
X_data_base_valid = data_valid[columns_base]

X_data_rate_img = data[columns_rate + columns_rgb + columns_hog + columns_sift]
X_data_rate_img_valid = data_valid[columns_rate + columns_rgb + columns_hog + columns_sift]

In [50]:
# 教師無しクラスタリング

kmeans_3 = KMeans(n_clusters=3, random_state=0).fit(X_data_base)
data['base_3'] = kmeans_3.labels_
kmeans_3_valid = KMeans(n_clusters=3, random_state=0).fit(X_data_base_valid)
data_valid['base_3'] = kmeans_3_valid.predict(X_data_base_valid)

kmeans_5 = KMeans(n_clusters=5, random_state=0).fit(X_data_base)
data['base_5'] = kmeans_5.labels_
kmeans_5_valid = KMeans(n_clusters=5, random_state=0).fit(X_data_base_valid)
data_valid['base_5'] = kmeans_5_valid.predict(X_data_base_valid)

kmeans_5_ = KMeans(n_clusters=5, random_state=0).fit(X_data_rate_img)
data['rate_img__5'] = kmeans_5_.labels_
kmeans_5_valid_ = KMeans(n_clusters=5, random_state=0).fit(X_data_rate_img_valid)
data_valid['rate_img_5'] = kmeans_5_valid_.predict(X_data_rate_img_valid)

kmeans_7 = KMeans(n_clusters=7, random_state=0).fit(X_data_rate_img)
data['rate_img_7'] = kmeans_7.labels_
kmeans_7_valid = KMeans(n_clusters=7, random_state=0).fit(X_data_rate_img_valid)
data_valid['rate_img_7'] = kmeans_7_valid.predict(X_data_rate_img_valid)

In [54]:
columns_location = ['latitude_min', 'longitude_min', 'latitude_max', 'longitude_max']
columns_base = ['sand', 'coral_algae', 'rock', 'seagrass', 'microalgal_mats', 'rubble']
columns_rate = ['sand_rate', 'coral_algae_rate', 'rock_rate', 'seagrass_rate', 'microalgal_mats_rate', 'rubble_rate']
columns_rgb = ['r_sum', 'r_mean', 'r_var', 'g_sum', 'g_mean', 'g_var', 'b_sum', 'b_mean', 'b_var', 'hog_sum']
columns_hog = ['hog_sum', 'hog_mean', 'hog_var']
columns_sift = ['sift_sum', 'sift_mean', 'sift_var']
columns_onehot = ['sand_onehot', 'coral_algae_onehot', 'rock_onehot', 'seagrass_onehot', 'microalgal_mats_onehot', 'rubble_onehot']
columns_loc_base = ['latitude×seagrass', 'longitude×seagrass', 'latitude×seagrass_rate', 'longitude×seagrass_rate', 'latitude×coral_algae', 'longitude×coral_algae', 'latitude×rock', 'longitude×rock', 'latitude×microalgal_mats', 'longitude×microalgal_mats', 'latitude×rubble', 'longitude×rubble']
columns_exp = ['seagrass_exp', 'seagrass_rate_exp']
columns_loc_img = ['longitude_avg×r_mean', 'latitude_avg×r_mean', 'longitude_avg×g_mean', 'latitude_avg×g_mean', 'longitude_avg×b_mean', 'latitude_avg×b_mean', 'longitude_avg×hog_mean', 'latitude_avg×hog_mean', 'longitude_avg×sift_mean', 'latitude_avg×sift_mean']
columns_kmeans = ['base_3', 'base_5', 'rate_img_5', 'rate_img_7']
target = "seagrass_overlap"
target_binary = "target_binary"

In [None]:
columns_location = ['latitude_min', 'longitude_min', 'latitude_max', 'longitude_max']
columns_base = ['sand', 'coral_algae', 'rock', 'seagrass', 'microalgal_mats', 'rubble']
columns_rate = ['sand_rate', 'coral_algae_rate', 'rock_rate', 'seagrass_rate', 'microalgal_mats_rate', 'rubble_rate']
columns_rgb = ['r_sum', 'r_mean', 'r_var', 'g_sum', 'g_mean', 'g_var', 'b_sum', 'b_mean', 'b_var']
columns_hog = ['hog_sum', 'hog_mean', 'hog_var']
columns_sift = ['sift_sum', 'sift_mean', 'sift_var']
columns_onehot = ['sand_onehot', 'coral_algae_onehot', 'rock_onehot', 'seagrass_onehot', 'microalgal_mats_onehot', 'rubble_onehot']
columns_loc_base = ['latitude×seagrass', 'longitude×seagrass', 'latitude×seagrass_rate', 'longitude×seagrass_rate', 'latitude×coral_algae', 'longitude×coral_algae', 'latitude×rock', 'longitude×rock', 'latitude×microalgal_mats', 'longitude×microalgal_mats', 'latitude×rubble', 'longitude×rubble']
columns_exp = ['seagrass_exp', 'seagrass_rate_exp']
columns_loc_img = ['longitude_avg×r_mean', 'latitude_avg×r_mean', 'longitude_avg×g_mean', 'latitude_avg×g_mean', 'longitude_avg×b_mean', 'latitude_avg×b_mean', 'longitude_avg×hog_mean', 'latitude_avg×hog_mean', 'longitude_avg×sift_mean', 'latitude_avg×sift_mean']
columns_base_img =['r_mean×sand',
       'r_mean×coral_algae', 'r_mean×rock', 'r_mean×seagrass',
       'r_mean×microalgal_mats', 'r_mean×rubble', 'g_mean×sand',
       'g_mean×coral_algae', 'g_mean×rock', 'g_mean×seagrass',
       'g_mean×microalgal_mats', 'g_mean×rubble', 'b_mean×sand',
       'b_mean×coral_algae', 'b_mean×rock', 'b_mean×seagrass',
       'b_mean×microalgal_mats', 'b_mean×rubble', 'hog_mean×sand',
       'hog_mean×coral_algae', 'hog_mean×rock', 'hog_mean×seagrass',
       'hog_mean×microalgal_mats', 'hog_mean×rubble', 'sift_mean×sand',
       'sift_mean×coral_algae', 'sift_mean×rock', 'sift_mean×seagrass',
       'sift_mean×microalgal_mats', 'sift_mean×rubble']
columns_kmeans = ['base_3', 'base_5', 'rate_img_5', 'rate_img_7']
target = "seagrass_overlap"
target_binary = "target_binary"

In [None]:
columns_a = columns_location + columns_base + columns_rate + columns_rgb + columns_hog + columns_sift + columns_onehot + columns_kmeans
columns_b = columns_base + columns_rate + columns_rgb + columns_hog + columns_sift + columns_onehot + columns_kmeans
columns_c = columns_base + columns_rgb + columns_hog + columns_sift
columns_d = columns_loc_base + columns_onehot + columns_rgb + columns_hog + columns_sift
columns_e = columns_exp + columns_base_img + columns_kmeans
columns_f = columns_loc_img + columns_rate
columns_g = columns_base + columns_rate + columns_rgb + columns_hog + columns_sift

Metrics and Bins

In [68]:
def generate_confusion_matrix(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    threshold: float,
) -> np.ndarray:
    """
    予測値と実際の値から混同行列を生成する関数。

    引数:
    - y_true: np.ndarray, テストデータセットの実際のクラスラベル。
    - y_pred: np.ndarray, テストデータセットに対する予測確率。
    - threshold: float, 予測確率をクラスラベルに変換するための閾値。

    戻り値:
    - np.ndarray: 生成された混同行列。
    """
    y_pred_label = (y_pred >= threshold).astype(int)
    
    cm = confusion_matrix(y_true, y_pred_label)
    
    return cm

LightGBM

In [17]:

def train_lightgbm_stratified_kfold(
    X: pd.DataFrame,
    y: pd.Series,
    columns_feature: List[str],
    target_binary: List[str],  # この実装では使用しないが、型情報を含める
    param: Dict[str, any],
    n_splits: int = 5,   
) -> Tuple[List[lgb.Booster], List[float]]:
    """
    Stratified k-foldクロスバリデーションを使用してLightGBMモデルを訓練する関数。
    非均衡データを考慮して、各クラスの割合を保持する。

    引数:
    - X: pandas DataFrame, 特徴量データ。
    - y: pandas Series, 目的変数データ。
    - columns_feature: list, 特徴量のカラム名のリスト。
    - target_binary: list, 目的変数のカラム名のリスト（この関数では使用しないが、一貫性のために残す）。
    - param: dict, LightGBMモデルのパラメータ。
    - n_splits: int, クロスバリデーションの分割数。

    戻り値:
    - Tuple[List[lgb.Booster], List[float]]: 訓練済みLightGBMモデルのリストと各foldのlog lossスコアのリスト。
    """
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    models: List[lgb.Booster] = []
    scores: List[float] = []

    for train_index, test_index in skf.split(X, y):
        X_train, X_test = (
            X.iloc[train_index][columns_feature],
            X.iloc[test_index][columns_feature],
        )
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        lgb_train = lgb.Dataset(X_train, y_train)
        lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)

        gbm: lgb.Booster = lgb.train(
            param,
            lgb_train,
            valid_sets=[lgb_train, lgb_eval],
        )

        y_pred = gbm.predict(X_test, num_iteration=gbm.best_iteration)
        score: float = log_loss(y_test, y_pred)
        scores.append(score)
        models.append(gbm)

    return models, scores

def predict_with_lightgbm(
    models: List[lgb.Booster],
    X_test: pd.DataFrame,
    columns_feature: List[str]
) -> np.ndarray:
    """
    訓練済みLightGBMモデルのリストを使用してテストデータセットの予測を行う関数。

    引数:
    - models: List[lgb.Booster], 訓練済みLightGBMモデルのリスト。
    - X_test: pandas DataFrame, テストデータセット。
    - columns_feature: list, 特徴量のカラム名のリスト。

    戻り値:
    - np.ndarray: テストデータセットの予測値の平均値。
    """
    predictions = []
    
    for model in models:
        y_pred = model.predict(X_test[columns_feature], num_iteration=model.best_iteration)
        predictions.append(y_pred)
    
    predictions_mean = np.mean(predictions, axis=0)
    
    return predictions_mean

In [18]:
def objective_lightgbm(trial, X, y, columns_feature, n_splits=5):
    """
    Optunaの試行に対する目的関数。LightGBMのハイパーパラメータを探索する。

    引数:
    - trial: optuna.trial.Trial オブジェクト
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_splits: クロスバリデーションの分割数

    戻り値:
    - 試行の平均log lossスコア
    """
    param = {
        "objective": "binary",
        "metric": "binary_logloss",
        "verbosity": -1,
        "boosting_type": "gbdt",
        "lambda_l1": trial.suggest_loguniform("lambda_l1", 1e-8, 10.0),
        "lambda_l2": trial.suggest_loguniform("lambda_l2", 1e-8, 10.0),
        "num_leaves": trial.suggest_int("num_leaves", 2, 256),
        "feature_fraction": trial.suggest_uniform("feature_fraction", 0.4, 1.0),
        "bagging_fraction": trial.suggest_uniform("bagging_fraction", 0.4, 1.0),
        "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
        "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
    }

    _, scores = train_lightgbm_stratified_kfold(
        X, y, columns_feature, [], param, n_splits
    )
    return np.mean(scores)


def optimize_hyperparameters_lightgbm(
    X, y, columns_feature, n_trials=100, n_splits=5, timeout=300
):
    """
    Optunaを使用してLightGBMモデルのハイパーパラメータを最適化する。

    引数:
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_trials: 試行回数の上限
    - n_splits: クロスバリデーションの分割数
    - timeout: 探索にかける時間の上限（秒）

    戻り値:
    - 最適なハイパーパラメータの辞書
    """
    study = optuna.create_study(direction="minimize")
    study.optimize(
        lambda trial: objective_lightgbm(trial, X, y, columns_feature, n_splits),
        n_trials=n_trials,
        timeout=timeout,
    )

    print(f"Best trial: {study.best_trial.value}")
    print(f"Best params: {study.best_trial.params}")

    return study.best_trial.params

In [None]:
optimize_hyperparameters_lightgbm(data_valid[columns_a], data_valid[target_binary], columns_a)

In [36]:
models_lightgbm, scores_lightgbm = train_lightgbm_stratified_kfold(data[columns_a], data[target_binary], columns_base, target_binary, param)
#NOTE: Define param

In [69]:
y_pred_lightgbm = predict_with_lightgbm(models_lightgbm, data_valid[columns_a], columns_a)

In [70]:
generate_confusion_matrix(data_valid[target_binary], y_pred_lightgbm, 0.5)

array([[1762,    3],
       [ 137,    0]])

Xgboost

In [59]:
def train_xgboost_stratified_kfold(
    X: pd.DataFrame,
    y: pd.Series,
    columns_feature: List[str],
    target_binary: List[str],  # この実装では使用しないが、型情報を含める
    param: Dict[str, any],
    n_splits: int = 5,
) -> Tuple[List[xgb.Booster], List[float]]:
    """
    Stratified k-foldクロスバリデーションを使用してXGBoostモデルを訓練する関数。
    非均衡データを考慮して、各クラスの割合を保持する。

    引数:
    - X: pandas DataFrame, 特徴量データ。
    - y: pandas Series, 目的変数データ。
    - columns_feature: list, 特徴量のカラム名のリスト。
    - target_binary: list, 目的変数のカラム名のリスト（この関数では使用しないが、一貫性のために残す）。
    - param: dict, XGBoostモデルのパラメータ。
    - n_splits: int, クロスバリデーションの分割数。

    戻り値:
    - Tuple[List[xgb.Booster], List[float]]: 訓練済みXGBoostモデルのリストと各foldのlog lossスコアのリスト。
    """
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    models: List[xgb.Booster] = []
    scores: List[float] = []

    for train_index, test_index in skf.split(X, y):
        X_train, X_test = (
            X.iloc[train_index][columns_feature],
            X.iloc[test_index][columns_feature],
        )
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        dtrain = xgb.DMatrix(X_train, label=y_train)
        dtest = xgb.DMatrix(X_test, label=y_test)

        bst: xgb.Booster = xgb.train(
            param,
            dtrain,
            num_boost_round=param.get("num_boost_round", 100),
            evals=[(dtrain, "train"), (dtest, "eval")],
            # early_stopping_rounds=param.get("early_stopping_rounds", 10),
            # verbose_eval=param.get("verbose_eval", 50),
        )

        y_pred = bst.predict(dtest)
        score: float = log_loss(y_test, y_pred)
        scores.append(score)
        models.append(bst)

    return models, scores


def predict_with_xgboost(models: List[xgb.Booster], X_test: pd.DataFrame, columns_feature: List[str]) -> np.ndarray:
    """
    訓練済みXGBoostモデルのリストを使用してテストデータセットの予測を行う関数。

    引数:
    - models: List[xgb.Booster], 訓練済みXGBoostモデルのリスト。
    - X_test: pandas DataFrame, テストデータセット。
    - columns_feature: list, 特徴量のカラム名のリスト。

    戻り値:
    - np.ndarray: テストデータセットの予測値の平均値。
    """
    # 各モデルからの予測値を格納するリスト
    predictions = []
    
    for model in models:
        dtest = xgb.DMatrix(X_test[columns_feature])
        y_pred = model.predict(dtest)
        predictions.append(y_pred)
    
    # 予測値の平均を計算
    predictions_mean = np.mean(predictions, axis=0)
    
    return predictions_mean

In [60]:
def objective_xgboost(trial, X, y, columns_feature, n_splits=5):
    """
    Optunaの試行に対する目的関数。XGBoostのハイパーパラメータを探索する。

    引数:
    - trial: optuna.trial.Trial オブジェクト
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_splits: クロスバリデーションの分割数

    戻り値:
    - 試行の平均log lossスコア
    """
    param = {
        "verbosity": 0,
        "objective": "binary:logistic",
        "eval_metric": "logloss",
        "booster": "gbtree",
        "eta": trial.suggest_loguniform("eta", 1e-3, 0.1),
        "max_depth": trial.suggest_int("max_depth", 3, 9),
        "subsample": trial.suggest_uniform("subsample", 0.6, 1.0),
        "colsample_bytree": trial.suggest_uniform("colsample_bytree", 0.6, 1.0),
        "min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
        "lambda": trial.suggest_loguniform("lambda", 1e-3, 10.0),
        "alpha": trial.suggest_loguniform("alpha", 1e-3, 10.0),
    }

    _, scores = train_xgboost_stratified_kfold(
        X, y, columns_feature, [], param, n_splits
    )
    return np.mean(scores)


def optimize_hyperparameters_xgboost(
    X, y, columns_feature, n_trials=100, n_splits=5, timeout=300
):
    """
    Optunaを使用してXGBoostモデルのハイパーパラメータを最適化する。

    引数:
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_trials: 試行回数の上限
    - n_splits: クロスバリデーションの分割数
    - timeout: 探索にかける時間の上限（秒）

    戻り値:
    - 最適なハイパーパラメータの辞書
    """
    study = optuna.create_study(direction="minimize")
    study.optimize(
        lambda trial: objective_xgboost(trial, X, y, columns_feature, n_splits),
        n_trials=n_trials,
        timeout=timeout,
    )

    print(f"Best trial: {study.best_trial.value}")
    print(f"Best params: {study.best_trial.params}")

    return study.best_trial.params

Catboost

In [62]:
def train_catboost_stratified_kfold(
    X: pd.DataFrame,
    y: pd.Series,
    columns_feature: List[str],
    target_binary: List[str],  # この実装では使用しないが、型情報を含める
    param: Dict[str, any],
    n_splits: int = 5,
) -> Tuple[List[CatBoostClassifier], List[float]]:
    """
    Stratified k-foldクロスバリデーションを使用してCatBoostモデルを訓練する関数。
    非均衡データを考慮して、各クラスの割合を保持する。

    引数:
    - X: pandas DataFrame, 特徴量データ。
    - y: pandas Series, 目的変数データ。
    - columns_feature: list, 特徴量のカラム名のリスト。
    - target_binary: list, 目的変数のカラム名のリスト（この関数では使用しないが、一貫性のために残す）。
    - param: dict, CatBoostモデルのパラメータ。
    - n_splits: int, クロスバリデーションの分割数。

    戻り値:
    - Tuple[List[CatBoostClassifier], List[float]]: 訓練済みCatBoostモデルのリストと各foldのlog lossスコアのリスト。
    """
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    models: List[CatBoostClassifier] = []
    scores: List[float] = []

    for train_index, test_index in skf.split(X, y):
        X_train, X_test = (
            X.iloc[train_index][columns_feature],
            X.iloc[test_index][columns_feature],
        )
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        train_pool = Pool(X_train, y_train)
        test_pool = Pool(X_test, y_test)

        model = CatBoostClassifier(**param)
        model.fit(
            train_pool,
            eval_set=test_pool,
            # verbose=50,
            # early_stopping_rounds=100,
            use_best_model=True,
        )

        y_pred = model.predict_proba(X_test)[:, 1]
        score = log_loss(y_test, y_pred)
        scores.append(score)
        models.append(model)

    return models, scores


def predict_with_catboost(
    models: List[CatBoostClassifier], X_test: pd.DataFrame, columns_feature: List[str]
) -> np.ndarray:
    """
    訓練済みCatBoostモデルのリストを使用してテストデータセットの予測を行う関数。

    引数:
    - models: List[CatBoostClassifier], 訓練済みCatBoostモデルのリスト。
    - X_test: pandas DataFrame, テストデータセット。
    - columns_feature: list, 特徴量のカラム名のリスト。

    戻り値:
    - np.ndarray: テストデータセットの予測値の平均値。
    """
    predictions = []

    for model in models:
        y_pred = model.predict_proba(X_test[columns_feature])[:, 1]
        predictions.append(y_pred)

    predictions_mean = np.mean(predictions, axis=0)

    return predictions_mean

In [63]:
def objective_catboost(trial, X, y, columns_feature, n_splits=5):
    """
    Optunaの試行に対する目的関数。CatBoostのハイパーパラメータを探索する。

    引数:
    - trial: optuna.trial.Trial オブジェクト
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_splits: クロスバリデーションの分割数

    戻り値:
    - 試行の平均log lossスコア
    """
    param = {
        "iterations": trial.suggest_int("iterations", 100, 1000),
        "depth": trial.suggest_int("depth", 4, 10),
        "learning_rate": trial.suggest_loguniform("learning_rate", 0.01, 0.3),
        "random_strength": trial.suggest_int("random_strength", 1, 20),
        "bagging_temperature": trial.suggest_loguniform(
            "bagging_temperature", 0.01, 1.0
        ),
        "l2_leaf_reg": trial.suggest_loguniform("l2_leaf_reg", 1e-8, 10),
        "border_count": trial.suggest_int("border_count", 1, 255),
        "loss_function": "Logloss",
        "eval_metric": "Logloss",
        "verbose": False,
    }

    _, scores = train_catboost_stratified_kfold(
        X, y, columns_feature, [], param, n_splits
    )
    return np.mean(scores)


def optimize_hyperparameters_catboost(
    X, y, columns_feature, n_trials=100, n_splits=5, timeout=300
):
    """
    Optunaを使用してCatBoostモデルのハイパーパラメータを最適化する。

    引数:
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_trials: 試行回数の上限
    - n_splits: クロスバリデーションの分割数
    - timeout: 探索にかける時間の上限（秒）

    戻り値:
    - 最適なハイパーパラメータの辞書
    """
    study = optuna.create_study(direction="minimize")
    study.optimize(
        lambda trial: objective_catboost(trial, X, y, columns_feature, n_splits),
        n_trials=n_trials,
        timeout=timeout,
    )

    print(f"Best trial: {study.best_trial.value}")
    print(f"Best params: {study.best_trial.params}")

    return study.best_trial.params

RandomForest

In [65]:
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import log_loss
from sklearn.ensemble import RandomForestClassifier
from typing import List, Dict, Tuple


def train_random_forest_stratified_kfold(
    X: pd.DataFrame,
    y: pd.Series,
    columns_feature: List[str],
    target_binary: List[str],  # この実装では使用しないが、型情報を含める
    param: Dict[str, any],
    n_splits: int = 5,
) -> Tuple[List[RandomForestClassifier], List[float]]:
    """
    Stratified k-foldクロスバリデーションを使用してRandom Forestモデルを訓練する関数。
    非均衡データを考慮して、各クラスの割合を保持する。

    引数:
    - X: pandas DataFrame, 特徴量データ。
    - y: pandas Series, 目的変数データ。
    - columns_feature: list, 特徴量のカラム名のリスト。
    - target_binary: list, 目的変数のカラム名のリスト（この関数では使用しないが、一貫性のために残す）。
    - param: dict, Random Forestモデルのパラメータ。
    - n_splits: int, クロスバリデーションの分割数。

    戻り値:
    - Tuple[List[RandomForestClassifier], List[float]]: 訓練済みRandom Forestモデルのリストと各foldのlog lossスコアのリスト。
    """
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    models: List[RandomForestClassifier] = []
    scores: List[float] = []

    for train_index, test_index in skf.split(X, y):
        X_train, X_test = (
            X.iloc[train_index][columns_feature],
            X.iloc[test_index][columns_feature],
        )
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        model = RandomForestClassifier(**param)
        model.fit(X_train, y_train)

        y_pred = model.predict_proba(X_test)[:, 1]
        score = log_loss(y_test, y_pred)
        scores.append(score)
        models.append(model)

    return models, scores


def predict_with_randomforest(
    models: List[RandomForestClassifier],
    X_test: pd.DataFrame,
    columns_feature: List[str],
) -> np.ndarray:
    """
    訓練済みRandom Forestモデルのリストを使用してテストデータセットの予測を行う関数。

    引数:
    - models: List[RandomForestClassifier], 訓練済みRandom Forestモデルのリスト。
    - X_test: pandas DataFrame, テストデータセット。
    - columns_feature: list, 特徴量のカラム名のリスト。

    戻り値:
    - np.ndarray: テストデータセットの予測値の平均値。
    """
    predictions = []

    for model in models:
        y_pred = model.predict_proba(X_test[columns_feature])[:, 1]
        predictions.append(y_pred)

    predictions_mean = np.mean(predictions, axis=0)

    return predictions_mean

In [66]:
def objective_random_forest(trial, X, y, columns_feature, n_splits=5):
    """
    Optunaの試行に対する目的関数。Random Forestのハイパーパラメータを探索する。

    引数:
    - trial: optuna.trial.Trial オブジェクト
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_splits: クロスバリデーションの分割数

    戻り値:
    - 試行の平均log lossスコア
    """
    param = {
        "n_estimators": trial.suggest_int("n_estimators", 100, 1000),
        "max_depth": trial.suggest_int("max_depth", 6, 30),
        "min_samples_split": trial.suggest_int("min_samples_split", 2, 150),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 60),
        "max_features": trial.suggest_uniform("max_features", 0.1, 1.0),
    }

    _, scores = train_random_forest_stratified_kfold(
        X, y, columns_feature, [], param, n_splits
    )
    return np.mean(scores)


def optimize_hyperparameters_random_forest(
    X, y, columns_feature, n_trials=100, n_splits=5, timeout=300
):
    """
    Optunaを使用してRandom Forestモデルのハイパーパラメータを最適化する。

    引数:
    - X, y: 訓練データ
    - columns_feature: 特徴量のカラム名のリスト
    - n_trials: 試行回数の上限
    - n_splits: クロスバリデーションの分割数
    - timeout: 探索にかける時間の上限（秒）

    戻り値:
    - 最適なハイパーパラメータの辞書
    """
    study = optuna.create_study(direction="minimize")
    study.optimize(
        lambda trial: objective_random_forest(trial, X, y, columns_feature, n_splits),
        n_trials=n_trials,
        timeout=timeout,
    )

    print(f"Best trial: {study.best_trial.value}")
    print(f"Best params: {study.best_trial.params}")

    return study.best_trial.params

TabPFN

In [None]:
def train_tabpfn_stratified_kfold(
    X: pd.DataFrame,
    y: pd.Series,
    columns_feature: List[str],
    target_binary: List[str],  # この実装では使用しないが、型情報を含める
    n_splits: int = 5,
) -> Tuple[List[TabPFNClassifier], List[float]]:
    """
    Stratified k-foldクロスバリデーションを使用してTabPFNモデルを訓練する関数。

    引数:
    - X: pandas DataFrame, 特徴量データ。
    - y: pandas Series, 目的変数データ。
    - columns_feature: list, 特徴量のカラム名のリスト。
    - target_binary: list, 目的変数のカラム名のリスト（この関数では使用しないが、一貫性のために残す）。
    - n_splits: int, クロスバリデーションの分割数。

    戻り値:
    - Tuple[List[TabPFNClassifier], List[float]]: 訓練済みTabPFNモデルのリストと各foldのlog lossスコアのリスト。
    """
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    models: List[TabPFNClassifier] = []
    scores: List[float] = []

    for train_index, test_index in skf.split(X, y):
        X_train, X_test = X.iloc[train_index][columns_feature], X.iloc[test_index][columns_feature]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        classifier = TabPFNClassifier(device='cpu', N_ensemble_configurations=8)
        classifier.fit(X_train, y_train, overwrite_warning=True)
        y_pred = classifier.predict_proba(X_test)[:, 1]  # 二値分類の場合

        score: float = log_loss(y_test, y_pred)
        scores.append(score)
        models.append(classifier)

    return models, scores

  
def predict_with_tabpfn(
    models: List[TabPFNClassifier],
    X_test: pd.DataFrame,
    columns_feature: List[str]
) -> np.ndarray:
    """
    訓練済みTabPFNClassifierモデルのリストを使用してテストデータセットの予測を行う関数。

    引数:
    - models: List[TabPFNClassifier], 訓練済みTabPFNClassifierモデルのリスト。
    - X_test: pandas DataFrame, テストデータセット。
    - columns_feature: list, 特徴量のカラム名のリスト。

    戻り値:
    - np.ndarray: テストデータセットの予測確率値の平均値。
    """
    predictions = []

    for model in models:
        # TabPFNClassifierのpredict_probaメソッドを使用して確率を予測
        # ここでは、すべてのクラスに対する確率を取得します
        y_pred = model.predict_proba(X_test[columns_feature])
        predictions.append(y_pred)

    # axis=0に沿って予測の平均を計算
    predictions_mean = np.mean(predictions, axis=0)

    return predictions_mean