# LightGBM model
9 combinations (3 sex groups × 3 feature sets)  
Hyperparameter tuning with Optuna's LightGBMTunerCV  

lightgbm==3.3.5  
optuna==2.8.0

In [None]:
import numpy as np
import pandas as pd
import os, re, glob, itertools
import optuna
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
from tqdm import tqdm
import logging
from sklearn.model_selection import KFold
from sklearn.metrics import make_scorer

In [None]:
# Use Type 42 (TrueType) font embedding for PDF output.
plt.rcParams['pdf.fonttype'] = 42
# Render all figure text in Arial.
plt.rcParams['font.family'] = 'Arial'

In [None]:
# Module-level logger used by the null-importance and feature-selection helpers.
LOGGER = logging.getLogger(__name__)

# NOTE: `lgb` is Optuna's LightGBM integration module, not the `lightgbm` package itself.
from optuna.integration import lightgbm as lgb
from optuna.integration.lightgbm import LightGBMTunerCV
# Only emit Optuna log messages at WARN level.
optuna.logging.set_verbosity(optuna.logging.WARN)

# Define callback class

In [None]:
class ModelExtractionCallback(object):
    """Callback that captures the trained models produced by lightgbm.cv().

    Pass an instance in the ``callbacks`` list; each time it is invoked it
    stores ``env.model`` so that the models can be read back afterwards.

    NOTE: Depends on the non-public class '_CVBooster', so it may stop
    working in future versions of LightGBM.
    """

    def __init__(self):
        # Nothing has been captured until the callback is invoked once.
        self._model = None

    def __call__(self, env):
        # Remember the model object handed over by the training loop.
        self._model = env.model

    def _assert_called_cb(self):
        """Raise RuntimeError unless the callback has already been invoked."""
        if self._model is not None:
            return
        raise RuntimeError('callback has not been called yet')

    @property
    def boosters_proxy(self):
        """The captured model object itself (proxy to the Boosters)."""
        self._assert_called_cb()
        return self._model

    @property
    def raw_boosters(self):
        """The ``boosters`` attribute of the captured model."""
        self._assert_called_cb()
        captured = self._model
        return captured.boosters

    @property
    def best_iteration(self):
        """The ``best_iteration`` attribute of the captured model."""
        self._assert_called_cb()
        captured = self._model
        return captured.best_iteration


# Define Functions

## Define evaluation metrics

In [None]:
def MAPE(true_y, pred_y):
    """Return the mean absolute percentage error of pred_y against true_y, in percent."""
    relative_error = (true_y - pred_y) / true_y
    absolute_relative_error = np.abs(relative_error)
    return np.mean(absolute_relative_error) * 100

# scikit-learn scorer wrapping MAPE; greater_is_better=False because a lower MAPE is better.
loss = make_scorer(MAPE, greater_is_better=False)
# Metric name; used as one component of the output directory path.
LOSS = "MAPE"

## Hyperparameter tuning

### Cross-validation (LightGBM)

In [None]:
def _cross_validate(X, y, folds, params):
    """Run lgb.cv on (X, y) with the given folds and return its "cvbooster" entry.

    Training uses at most 1000 boosting rounds together with an
    early-stopping callback configured with 100 rounds.
    """
    cv_output = lgb.cv(
        params,
        lgb.Dataset(X, y),
        folds=folds,
        num_boost_round=1000,
        return_cvbooster=True,
        callbacks=[lgb.log_evaluation(10000), lgb.early_stopping(100)],
    )
    return cv_output["cvbooster"]

def cv_mean_test_score(X, y, folds, params):
    """Cross-validate with `params` and return the MAPE of the Out-of-Fold predictions.

    The score is computed once over all Out-of-Fold predictions together,
    not as an average of per-fold scores.
    """
    trained_cv = _cross_validate(X, y, folds, params)
    return MAPE(y, _predict_oof(trained_cv, X, y, folds))

## OOF predictions

In [None]:
def _predict_oof(cv_booster, X, y, folds):
    """Collect Out-of-Fold predictions from the per-fold models.

    The i-th booster in ``cv_booster.boosters`` predicts the rows listed in
    ``folds[i][1]``; the predictions are written to those positions of a
    float array shaped like ``y.squeeze()``.
    """
    oof_y_preds = np.zeros_like(y.squeeze(), dtype=float)
    for fold_no, fold_model in enumerate(cv_booster.boosters):
        # Positional indices of the held-out rows of this fold.
        holdout_rows = folds[fold_no][1]
        holdout_x = X.iloc[holdout_rows]
        oof_y_preds[holdout_rows] = fold_model.predict(
            holdout_x,
            num_iteration=cv_booster.best_iteration,
        )
    return oof_y_preds

## Feature Importance Calculation

In [None]:
def cv_mean_feature_importance(X, y, folds, params):
    """Return the per-feature 'gain' importance averaged over the cross-validated models."""
    fold_models = _cross_validate(X, y, folds, params).boosters
    gain_per_model = []
    for fold_model in fold_models:
        gain_per_model.append(fold_model.feature_importance(importance_type='gain'))
    return np.mean(gain_per_model, axis=0)

## Null importance analysis

In [None]:
def analyse_null_importance(X, y, folds, params, percentile=50, trials=20):
    """
    Score every feature by comparing its real importance with its null importance.

    Steps:
    * compute the feature importance on the real target [base importance]
    * compute the feature importance on a randomly permuted target, `trials` times [null importances]
    * take the `percentile`-th percentile of the null importances per feature

    Returns:
        null_imp_score: base importance divided by that percentile
            (1e-6 is added to the denominator to avoid division by zero).
        sorted_indices: feature indices ordered from highest to lowest null_imp_score.
    """
    LOGGER.info('Starting base importance calculation')
    real_importance = cv_mean_feature_importance(X, y, folds, params)

    LOGGER.info('Starting null importance calculation')
    # One importance vector per permutation of the target.
    shuffled_runs = np.array([
        cv_mean_feature_importance(X, np.random.permutation(y).flatten(), folds, params)
        for _ in tqdm(range(trials))
    ])

    null_reference = np.percentile(shuffled_runs, percentile, axis=0)
    null_imp_score = real_importance / (null_reference + 1e-6)
    descending_order = np.argsort(null_imp_score)[::-1]

    return null_imp_score, descending_order

## Feature selection by null importance

In [None]:
def select_features_by_percentage(X, y, folds, params, scoring, null_imp_score, sorted_indices, DIR, use_percentages):
    """Evaluate the CV test score when keeping only the top N% of the ranked features.

    Columns of X are ranked by `sorted_indices`. For each entry of
    `use_percentages` the leading share of ranked columns is kept and scored
    with cv_mean_test_score. Percentages that select zero features are
    skipped, so the returned list can be shorter than `use_percentages`.

    `scoring`, `null_imp_score` and `DIR` are accepted but not used here.

    Returns:
        list of test scores, in the order the percentages were evaluated.
    """
    ranked_columns = X.columns[sorted_indices]
    total_features = len(ranked_columns)
    mean_test_scores = []

    for percentage in tqdm(use_percentages):
        keep_n = int(total_features * percentage / 100)
        if keep_n != 0:
            top_columns = ranked_columns[:keep_n]
            reduced_X = X[top_columns]
            LOGGER.info(f'Null Importance score TOP {percentage}%')
            LOGGER.info(f'Selected features: {list(top_columns)}')
            score = cv_mean_test_score(reduced_X, y, folds, params)
            LOGGER.info(f'Mean test_score: {score}')
            mean_test_scores.append(score)

    return mean_test_scores

def select_features_by_num_features(X, y, folds, params, scoring, null_imp_score, sorted_indices, DIR, num_features_range):
    """Evaluate the CV test score for each "top n features" subset and keep the best one.

    Columns of X are ranked by `sorted_indices`. For every n in
    `num_features_range` (n == 0 is skipped) the first n ranked columns are
    scored with cv_mean_test_score. The subset with the lowest score is
    dumped as a list to "selected_features_by_null_importance.pkl" in DIR.

    `scoring` and `null_imp_score` are accepted but not used here.

    Returns:
        mean_test_scores: list of scores, one per evaluated n.
        selected_features: the column subset with the lowest score.
        selected_features_list: all evaluated column subsets.
    """
    ranked_columns = X.columns[sorted_indices]
    mean_test_scores = []
    selected_features_list = []

    for num_features in tqdm(num_features_range):
        if num_features != 0:
            top_columns = ranked_columns[:num_features]
            reduced_X = X[top_columns]
            LOGGER.info(f'Null Importance score TOP {num_features} features')
            LOGGER.info(f'Selected features: {list(top_columns)}')
            score = cv_mean_test_score(reduced_X, y, folds, params)
            LOGGER.info(f'Mean test_score: {score}')
            mean_test_scores.append(score)
            selected_features_list.append(top_columns)

    # Lower score is better, so the winner is the argmin.
    best_position = np.argmin(mean_test_scores)
    selected_features = selected_features_list[best_position]
    dump_path = os.path.join(DIR, "selected_features_by_null_importance.pkl")
    joblib.dump(list(selected_features), dump_path)

    return mean_test_scores, selected_features, selected_features_list


def plot_feature_selection_results(mean_test_scores, DIR):
    """Plot the CV test score against the number of top-ranked features and save it as a PDF.

    Args:
        mean_test_scores: sequence of test scores; entry k is plotted at
            x = k + 1 (i.e. "top k + 1 features").
        DIR: directory in which "nullimportance_featureselection.pdf" is written.

    The minimum score is marked with a red dot and annotated with its
    position and value.
    """
    fig, ax1 = plt.subplots(figsize=(8, 4))
    ax1.plot(range(1, len(mean_test_scores) + 1), mean_test_scores, color='b', label='Mean test score')
    ax1.set_xlabel('Importance TOP n features')
    # The scores are produced by cv_mean_test_score(), which computes MAPE (not RMSE).
    ax1.set_ylabel('Mean test score (MAPE)')

    min_index = np.argmin(mean_test_scores)
    min_value = mean_test_scores[min_index]

    ax1.plot(min_index+1, min_value, 'ro')
    ax1.text(min_index+1, min_value, f'features: {min_index+1}, Min: {min_value:.3f}', color='red')

    ax1.legend()
    ax1.grid()
    fig.savefig(os.path.join(DIR, "nullimportance_featureselection.pdf"), bbox_inches="tight")
    plt.close(fig)

## Parameter tuning by inner CV

In [None]:
def param_optim_loop(X, y, folds, best_params_before_featureselection, DIRinloop):
    """
    Tune LightGBM hyperparameters with LightGBMTunerCV, then retrain with lgb.cv.

    Args:
        X (pd.DataFrame): Training features.
        y (pd.Series): Training labels.
        folds (list): List of fold indices for cross-validation.
        best_params_before_featureselection (dict): Base parameters handed to
            the tuner as its starting parameter set.
        DIRinloop (str): Directory in which all outputs are written.

    Returns:
        boosters (list): List of trained LightGBM booster models.
        best_iteration (int): The best iteration number.

    Side effects:
        Writes "best_params.pkl", "best_iterNO.pkl" and one "model_{p}.txt"
        per trained booster into DIRinloop.
    """
    train_set = lgb.Dataset(X, label=y)

    tuner_callbacks = [
        lgb.early_stopping(100),
        lgb.log_evaluation(10_000),
    ]
    # Initialize LightGBM tuner for cross-validation
    tunercv = LightGBMTunerCV(best_params_before_featureselection,
                        train_set,
                        num_boost_round=2000,
                        verbose_eval=False,
                        folds=folds,
                        callbacks=tuner_callbacks)

    # Run the tuner
    tunercv.run()
    best_params = tunercv.best_params

    # Save the best parameters to a file
    joblib.dump(best_params, os.path.join(DIRinloop, "best_params.pkl"))

    # Prepare callback for extracting trained models
    extraction_cb = ModelExtractionCallback()
    cv_callbacks = [
        extraction_cb,
        lgb.log_evaluation(10_000),
    ]

    # Train the model with the best parameters; the trained models are read
    # back through the extraction callback, so the return value is not needed.
    lgb.cv(params=best_params,num_boost_round=100_000,
           train_set=train_set,
           early_stopping_rounds=50,
           folds=folds,
           callbacks=cv_callbacks,
           return_cvbooster=True,)

    # Extract trained models from the callback
    boosters = extraction_cb.raw_boosters
    best_iteration = extraction_cb.best_iteration

    # Save the best iteration number and models.
    # The file name must be relative: a leading "/" would make os.path.join
    # discard DIRinloop and write to the filesystem root instead.
    joblib.dump(best_iteration, os.path.join(DIRinloop, "best_iterNO.pkl"))
    for p, booster in enumerate(boosters):
        booster.save_model(os.path.join(DIRinloop, f"model_{p}.txt"))

    return boosters, best_iteration

In [None]:
def plot_regression_results(test_y, y_pred_proba_avg, DIR):
    """Scatter predicted against chronological age, annotate the MAPE, and save "pred_vs_true.pdf" in DIR."""
    observed = test_y.values
    finalMAPE = np.mean(np.abs(observed - y_pred_proba_avg) / observed) * 100

    plt.figure()
    plt.scatter(observed, y_pred_proba_avg, label='test_samples', s=1)

    plt.xlabel("Chronological Age")
    plt.ylabel("Predicted Age")
    plt.grid(True)

    # Place the annotation at the left edge, 7 units below the largest prediction.
    annotation = '$MAPE =$' + str(finalMAPE.round(3))
    text_x = np.min(test_y)
    text_y = np.max(y_pred_proba_avg) - 7
    plt.text(text_x, text_y, annotation)

    plt.savefig(os.path.join(DIR, "pred_vs_true.pdf"))
    plt.close()

# Data Import

In [None]:
# Directory holding the processed inputs (y.csv, X_scaled.csv, fold indices, feature lists).
ipt_DIR = "../../../0_data_processing/processed_data/"

In [None]:
# Model label; used as one component of the output directory path.
modeltype = "LGBM"

In [None]:
# Suffixes and prefixes that are combined into the "prev" feature names (<prefix>_<suffix>).
list_out = ['BOS', 'BOT', 'RR', 'FR', 'FAI', 'ATI']
list_in = ['ONH_T', 'ONH_A', 'ONH_V', 'Choroid']
prev_features = [
    f"{prefix}_{suffix}"
    for prefix, suffix in itertools.product(list_in, list_out)
]

In [None]:
# Value(s) of `r`; selects which "tsfresh_features_var_0_r_{r}.pkl" feature list is loaded.
R = ["0.99"]
# Subject subsets to model.
SEX = ["male", "female", "both"]
# Feature sets to model.
FTYPE = ["prev", "tsfresh", "both"]
# NOTE: itertools.product returns a one-shot iterator; it is exhausted once the main loop has run.
all_iter = itertools.product(R, SEX, FTYPE)

# Percentages of top-ranked features tried in the coarse feature-selection sweep (descending order).
use_feature_importance_top_percentages = [100, 75, 50, 40, 30, 25, 20, 15, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
# Splitter that generates the inner folds of the nested cross-validation.
kf = KFold(n_splits=5,random_state=2022,shuffle=True)

In [None]:
# Starting LightGBM parameters: used for the null-importance analysis and as
# the base parameter set of the first LightGBMTunerCV run.
initial_params = {
    'objective': 'regression',
    'metric': "mape",
    'verbosity': -1,
    "max_depth": -1,
    'feature_pre_filter': False,
    'lambda_l1': 1e-06,
    'lambda_l2': 1e-07,
    'num_leaves': 255,
    'feature_fraction': 0.6,
    "seed": 42,
}

## Model Training and Evaluation for All Sex-Feature Combinations

In [None]:
# Iterate over all (r, sex, feature set) combinations. For each one: load the
# data, select features by null importance, run nested CV with hyperparameter
# tuning, and save predictions, models and feature-importance plots.
for r, sextype, featuretype in tqdm(all_iter, total=len(R)*len(SEX)*len(FTYPE)):
    # Load data (reloaded every iteration because X, y and folds_out are modified below)
    y = pd.read_csv(os.path.join(ipt_DIR, "y.csv"), index_col="group.cmp")
    X = pd.read_csv(os.path.join(ipt_DIR, "X_scaled.csv"), index_col="group.cmp")
    folds_out = joblib.load(os.path.join(ipt_DIR, "indices_5folds.pkl"))
    both_features = joblib.load(os.path.join(ipt_DIR, f"tsfresh_features_var_0_r_{r}.pkl"))
    outDIR = os.path.join("../out", LOSS, r, modeltype, sextype, featuretype)
    os.makedirs(outDIR, exist_ok=True)
    
    # Select features based on feature type
    if featuretype == "prev":
        used_features = prev_features
    elif featuretype == "tsfresh":
        # Everything in the loaded feature list except the "prev" features
        used_features = list(set(both_features) - set(prev_features))
    elif featuretype == "both":
        used_features = both_features
    else:
        raise ValueError("Invalid value for featuretype: {}".format(featuretype))
    
    X = X[used_features]
    # Move "group.cmp" from the index into a column; rows get a 0..n-1 index
    y = y.reset_index()
    X = X.reset_index()
    
    # Create a mapping from group.cmp to numeric indices
    str_to_num_mapping = dict(zip(y["group.cmp"], y.index))
    # Update fold indices with numeric indices
    # (assumes the stored folds contain group.cmp labels -- TODO confirm)
    for i in range(len(folds_out)):
        folds_out[i] = (
            [str_to_num_mapping[idx] for idx in folds_out[i][0]],
            [str_to_num_mapping[idx] for idx in folds_out[i][1]]
        )
    
    y = y.drop(columns=["group.cmp"])
    X = X.drop(columns=["group.cmp"])
    
    # Filter data by sex type    
    if sextype != "both":
        if sextype == "male":
            ind = np.array(y[y["SEX.男1.女0"] == 1].index)
        elif sextype == "female":
            ind = np.array(y[y["SEX.男1.女0"] == 0].index)
        else:
            raise ValueError("Invalid value for sextype: {}".format(sextype))
        
        y = y.loc[ind]
        X = X.loc[ind].reset_index()
        # Map the old row index (kept in the "index" column) to the new 0..m-1 position
        index_mapping = dict(zip(list(X["index"]), list(X.index)))
        X = X.drop(["index"], axis=1)
        y = y.loc[ind].reset_index().drop(["index"], axis=1)
        # Update fold indices with new mapping; rows of the other sex are dropped from each fold
        new_folds_out = []
        for train_idx, test_idx in folds_out:
            new_train_idx = [index_mapping[idx] for idx in train_idx if idx in index_mapping]
            new_test_idx = [index_mapping[idx] for idx in test_idx if idx in index_mapping]
            new_folds_out.append((new_train_idx, new_test_idx))
        folds_out = new_folds_out
                
    os.makedirs(os.path.join(outDIR, "optuna"), exist_ok=True)
    
    # Analyse null importance with initial parameters
    # NOTE(review): feature ranking and selection below use all rows with the outer
    # folds, i.e. including the rows later used as outer test sets -- confirm this is intended.
    null_imp_score, sorted_indices = analyse_null_importance(X, y["Age"], folds_out, initial_params)
    # First feature selection to reduce calculation:
    # keep only features whose score (real importance / null-importance percentile) exceeds 1
    num_selected_features = sum(null_imp_score > 1)
    # Select columns based on sorted indices and number of selected features
    sorted_columns = X.columns[sorted_indices]
    X = X[sorted_columns].iloc[:, :num_selected_features]
    # X's columns are now already in ranked order, so the ranking is the identity
    sorted_indices = np.arange(X.shape[1])
    
    # Hyper parameter optimization for feature selection from remained features
    train_set = lgb.Dataset(X, label=y["Age"])
    callbacks=[lgb.early_stopping(100),lgb.log_evaluation(10000),]
    tunercv_first = LightGBMTunerCV(initial_params,
                        train_set,
                        num_boost_round=2000,
                        verbose_eval=False,
                        folds=folds_out,
                        callbacks=callbacks)    
    tunercv_first.run()
    
    # Save the best parameters before feature selection to a file
    best_params_before_featureselection = tunercv_first.best_params
    best_params_before_filepath = os.path.join(outDIR, "best_params_before_featureselection.pkl")
    joblib.dump(best_params_before_featureselection, best_params_before_filepath)

    # Feature selection: score "top n ranked features" subsets and keep the best subset
    if featuretype == "prev":
        # Try every possible number of features
        mean_test_scores, selected_features, selected_features_list = select_features_by_num_features(
            X, y["Age"], folds_out, best_params_before_featureselection, loss, null_imp_score, sorted_indices, outDIR, num_features_range=range(1, len(sorted_indices) + 1))
    else:
        # Rough survey using use_feature_importance_top_percentages
        mean_test_scores = select_features_by_percentage(
            X, y["Age"], folds_out, best_params_before_featureselection, loss, null_imp_score, sorted_indices, outDIR, use_percentages=use_feature_importance_top_percentages)
    
        # Detailed surveys: search every feature count up to the percentage one step
        # larger than the best rough percentage (the percentage list is descending)
        best_index = np.argmin(mean_test_scores)
        detailed_percentages_upto = use_feature_importance_top_percentages[max(0, best_index-1)]
        num_features_for_detailed = int(np.ceil(len(sorted_indices) * detailed_percentages_upto / 100))
            
        mean_test_scores, selected_features, selected_features_list = select_features_by_num_features(
            X, y["Age"], folds_out, best_params_before_featureselection, loss, null_imp_score, sorted_indices, outDIR, num_features_range=range(1, num_features_for_detailed + 1))
    
    plot_feature_selection_results(mean_test_scores, outDIR)

    # nested CV: outer folds give the test predictions, inner folds (kf) tune the hyperparameters
    pred_y=np.zeros(y.shape[0])
    for i, f in enumerate(folds_out):
        y_pred_proba_list = []
        tr_ind, te_ind = f[0], f[1]
        tr_x = X.iloc[tr_ind, :].loc[:, selected_features]
        te_x = X.iloc[te_ind, :].loc[:, selected_features]
        tr_y = y.iloc[tr_ind, :]["Age"]
        te_y = y.iloc[te_ind, :]["Age"]
        folds_in = list(kf.split(tr_x, tr_y))
        DIRinloop = os.path.join(outDIR, "optuna", f"outer_{i}")
        os.makedirs(DIRinloop, exist_ok=True)
        
        # `proxy` receives the list of trained boosters returned by param_optim_loop
        proxy, best_iteration = param_optim_loop(tr_x,tr_y,folds_in, best_params_before_featureselection, DIRinloop)
    
        # Average the predictions of all inner-fold models on the outer test rows
        for best_model in proxy:
            tmp_y_pred = best_model.predict(te_x, num_iteration=best_iteration)
            y_pred_proba_list.append(tmp_y_pred)
        y_pred_proba_avg = np.array(y_pred_proba_list).mean(axis=0)
        pred_y[te_ind] = y_pred_proba_avg
               
    # y was already restricted to the chosen sex above, so these masks select the rows of y as-is
    if sextype == "male":
        pred_y = pred_y[np.array(y[y["SEX.男1.女0"] == 1].index)]
        true_y = y[y["SEX.男1.女0"] == 1].copy()
    elif sextype == "female":
        pred_y = pred_y[np.array(y[y["SEX.男1.女0"] == 0].index)]
        true_y = y[y["SEX.男1.女0"] == 0].copy()
    elif sextype == "both":
        true_y = y.copy()
    
    plot_regression_results(true_y["Age"], pred_y, outDIR)
    true_y["Predicted_age"] = pred_y
    true_y.to_csv(os.path.join(outDIR, "pred_vs_true.csv"))

    # plot feature importances of the saved models
    # NOTE(review): the glob also matches model files left in outDIR by earlier runs
    # (directories are created with exist_ok=True) -- confirm outDIR is clean before rerunning.
    cvbooster_from_file=lgb.CVBooster()
    for bestDIR in glob.glob(outDIR+"/optuna/**/*/model_*.txt", recursive=True):
        tmp_booster = lgb.Booster(model_file = bestDIR)
        cvbooster_from_file.boosters.append(tmp_booster)
    # One row of gain importances per loaded model
    raw_importances = cvbooster_from_file.feature_importance(importance_type='gain')
    feature_name = cvbooster_from_file.boosters[0].feature_name()
    importance_df = pd.DataFrame(data=raw_importances,
                                 columns=feature_name)
    
    # Order the columns by mean importance, highest first
    sorted_indices = importance_df.mean(axis=0).sort_values(ascending=False).index
    sorted_importance_df = importance_df.loc[:, sorted_indices]

    # One box plot per "top N" cut-off; figure height scales with N
    tops = [10, 20, 50, 200]
    for i in tops:
        PLOT_TOP_N = i
        plot_cols = sorted_importance_df.columns[:PLOT_TOP_N]
        _, ax = plt.subplots(figsize=(8, i*0.3))
        ax.grid()
        ax.set_xscale('symlog')
        ax.set_ylabel('Feature')
        ax.set_xlabel('Importance')
        sns.boxplot(data=sorted_importance_df[plot_cols], boxprops=dict(alpha=.3),
                    orient='h',
                    ax=ax)
        # Save the plot as a PDF file
        plot_filename = os.path.join(outDIR, f'feature_importance_top{PLOT_TOP_N}.pdf')
        plt.savefig(plot_filename, bbox_inches='tight')
        plt.close()
    
    # Save the sorted importance dataframe to a CSV file
    importance_csv_filepath = os.path.join(outDIR, 'feature_importance.csv')
    sorted_importance_df.to_csv(importance_csv_filepath)