Nested CV XGBoost classifier.

In [None]:
import pandas as pd
import numpy as np
import optuna
import pickle
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score, precision_recall_curve, average_precision_score
from xgboost import XGBClassifier, DMatrix, plot_importance
from xgboost.callback import EarlyStopping
from sklearn.metrics import auc as aucfunc
from copy import deepcopy
import glob
import os

from optuna.trial import Trial
from typing import Any, Dict

from sklearn.metrics import PrecisionRecallDisplay

In [None]:
###USER INPUT
optuna__n_trials = 300
optuna__n_jobs = 2
nthread = 4
k_fold = 8
run = "Name"
if not os.path.isdir(f"Results_XGB/{run}"):
    os.mkdir(f"Results_XGB/{run}")
scoring_str = "AUC_PR"

early_stopping_rounds = 50

space = {"gamma":optuna.distributions.UniformDistribution(0.1, 6.0),
         "learning_rate":optuna.distributions.LogUniformDistribution(1e-4, 2),
         "max_depth":optuna.distributions.IntUniformDistribution(3,12),
         "subsample":optuna.distributions.DiscreteUniformDistribution(0.6, 1, 0.1)}

#### load NACC
X_val_full = pd.read_csv("", index_col=0)
y_val = pd.read_csv("", index_col=0)
y_val = y_val[~y_val.index.duplicated()] -1
X_val_full = X_val_full[~X_val_full.index.duplicated()]
# Only use NACC pats with labels
y_val = X_val_full.join(y_val, on="NACCID", how="inner")["0"].copy()
X_val = X_val_full.loc[y_val.index]
#exclude MRI
#X_val = X_val[X_val.columns[~X_val.columns.str.contains("^ST")]]
cols = X_val.columns
# add APOE as dummy variables
X_val = X_val.join(pd.get_dummies(X_val["APOE4"], prefix="APOE"))
X_val.drop("APOE4", axis=1, inplace=True)

###### load ADNI data and reduce to cols
lables = pd.read_csv(glob.glob("")[0], index_col=0)
X_train = pd.read_csv("", index_col=0)
X_train = X_train[cols]

# add APOE as dummy variables
X_train = X_train.join(pd.get_dummies(X_train["APOE4"], prefix="APOE"))
X_train.drop("APOE4", axis=1, inplace=True)

print(run, "n_trials:", optuna__n_trials)

In [None]:
assert X_train.shape[0] == y_train.shape[0]
assert X_val.shape[0] == y_val.shape[0]
assert (X_train.columns == X_val.columns).all()

#### Functions

In [None]:
# define early stopping for xgb
early_stop = EarlyStopping(rounds=early_stopping_rounds, save_best=True)

In [None]:
# utility functions to logg results in optuna study object
def store_scores(trial:Trial, scores:dict) -> None:
        """store scores in trial object, compute mean and std
        the array holds the values across folds"""
        
        for name, array in scores.items():
            
            if len(array) > 0:
                for i, score in enumerate(array):
                    trial.set_user_attr(f"split{i}_{name}", score)
            
            # creates the 'mean_val_score' that will be optimized.
            trial.set_user_attr(f"mean_{name}", np.nanmean(array))
            trial.set_user_attr(f"std_{name}", np.nanstd(array))

            
def store_metrics(trial:Trial, metrics:dict) -> None:
    """store metrics in trial object, compute mean and std. 
    the array holds the values across folds"""

    for name, array in metrics.items():
        
        if len(array) > 0:
            for i, metric in enumerate(array):
                    trial.set_user_attr(f"split{i}_{name}", metric)

            trial.set_user_attr(f"mean_{name}", np.nanmean(array))
            trial.set_user_attr(f"std_{name}", np.nanstd(array))

            if name in ["epochs","n_estimators"]:

                trial.set_user_attr(f"median_{name}", np.nanmedian(array))
                iqr = np.nanquantile(array, 0.75) - np.nanquantile(array,0.25) 
                trial.set_user_attr(f"iqr_{name}", iqr)

In [None]:
def calc_aucPR(y_true, y_pred):
    """Calculate the area under the precision recall curve."""
    
    precision, recall, thresholds = precision_recall_curve(y_true, y_pred)
    return aucfunc(recall, precision)

#### Inner fold

In [None]:
class ObjectiveCV_Gbm(object):
    
    def __init__(
        self,
        param_distributions: dict,
        X,
        y,
        scoring : str = "AUC",
        cv: int = 5
        ) -> None:

        self.X = X
        self.y = y
        self.cv = cv
        self.fit_params = None
        self.param_distributions = param_distributions
        self.scoring = scoring

    def __call__(self, trial: Trial) -> float:
        """callable for optuna search"""
        
        fit_params = self._get_params(trial) # sample suggested hyperparams
        cv = StratifiedKFold(n_splits = self.cv)
        
        # containers 
        metrics = {
            "fit_time" : [],
            "n_estimators" : []
            }
        
        scores = {
            # train
           "train_AUC" : [],
           "train_AUC_PR" : [],
           # val
           "val_AUC_PR" : [],
           "val_AUC" : []
           }
        
        # get cross-validated estimate for one particular hyperparameter set
        for fold, (train_idx, val_idx) in enumerate(cv.split(self.X, self.y)):
        
            X_train = self.X.iloc[train_idx]
            y_train = self.y.iloc[train_idx]
            X_val = self.X.iloc[val_idx]    
            y_val = self.y.iloc[val_idx]
            
            # not needed
            #dtrain = DMatrix(X_train, label=y_train)
            #dtest = DMatrix(X_test, label=y_test)
            
            # initialize model
            estimator = XGBClassifier(use_label_encoder=False, eval_metric="logloss", 
                                      nthread=nthread, callbacks=[deepcopy(early_stop)])
            estimator.set_params(**fit_params)
            
            # fit with early stopping on validation set and keep best model
            estimator.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
            
            # predict
            y_train_pred = estimator.predict_proba(X_train)[:,1]
            y_val_pred = estimator.predict_proba(X_val)[:,1]
            
            
            # evaluate model performance
            ## train
            AUC_train = roc_auc_score(y_train, y_train_pred)
            scores["train_AUC"].append(AUC_train)
            scores["train_AUC_PR"].append(calc_aucPR(y_train, y_train_pred))

            ## val
            AUC_val = roc_auc_score(y_val, y_val_pred)
            scores["val_AUC"].append(AUC_val)
            scores["val_AUC_PR"].append(calc_aucPR(y_val, y_val_pred))

            ## choose scoring to return for optuna to optimize
            if self.scoring == "AUC":
                scores["train_score"] = scores["train_AUC"]
                scores["val_score"] = scores["val_AUC"]

            elif self.scoring == "AUC_PR":
                scores["train_score"] = scores["train_AUC_PR"]
                scores["val_score"] = scores["val_AUC_PR"]
            
            # metrics
            metrics["n_estimators"].append(estimator.n_estimators)
            
        # log scores and metrics
        store_scores(trial, scores)
        store_metrics(trial, metrics)
        
        return trial.user_attrs["mean_val_score"]
    
    def _get_params(self, trial: Trial) -> Dict[str, Any]:
        """estimate/sample fit params using trial history"""

        return {name:trial._suggest(name, distribution) 
                for name, distribution in self.param_distributions.items()}

#### Outer fold

In [None]:
n_estimators = 5024

# Nested CV
results = []
results_pr = []
results_train = []
results_pr_train = []
pr_scores = []
pr_scores_train = []
cur_fold = 0
all_predictions = pd.DataFrame(columns=["Score", "Fold"], index=X_train.index)

    
objective = ObjectiveCV_Gbm(space,
                            X_train,
                            y_train,
                            scoring = scoring_str,
                            cv=k_fold,)

# create study
study = optuna.create_study(study_name='xGBoost',
                            storage=f'sqlite:///Results_XGB/{run}/optuna__xgb_{cur_fold}.db', 
                            direction = "maximize", 
                            load_if_exists = True)

# optimize
study.optimize(objective, 
                n_trials = optuna__n_trials,
                n_jobs = optuna__n_jobs)

#### outer loop predictions
best_params = study.best_params

model = XGBClassifier(use_label_encoder=False, eval_metric="logloss", 
                      nthread=nthread, callbacks=[deepcopy(early_stop)])
model.set_params(**best_params)

model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

# training score
pred_train = model.predict_proba(X_train)[:,1]
auc_train = roc_auc_score(y_train, pred_train)
results_train.append(auc_train)

auc_pr_train = calc_aucPR(y_train, pred_train)
results_pr_train.append(auc_pr_train)
pr_score_train = average_precision_score(y_train, pred_train)
pr_scores_train.append(pr_score_train)

# Get feature importance per outer fold
pd.DataFrame(model.feature_importances_, index=X_train.columns, 
             columns=[f"{run.split('_')[1]}_{cur_fold}"]).to_csv(f"Results_XGB/{run}/feat_importance_{cur_fold}.csv")

## Test

In [None]:
# calculate scores
pred = model.predict_proba(X_val)[:,1]
auc = roc_auc_score(y_val, pred)
results.append(auc)

auc_pr = calc_aucPR(y_val, pred)
results_pr.append(auc_pr)
pr_score = average_precision_score(y_val, pred)
pr_scores.append(pr_score)

print(f'XGB:   outer_fold:{auc=}, {auc_pr=}, {pr_score=}, hyps:{study.best_params}')

In [None]:
# plot PR curve
PrecisionRecallDisplay.from_predictions(y_val, pred, name="XGB")
plt.ylim(0, 1)
plt.savefig(f"Results_XGB/{run}/PR_curve.png", dpi=200)

#save results
print(f"{run}. Avg AUC: {np.mean(results)}, {np.std(results)}. Avg PR_score: {np.mean(pr_scores)}, \
     Avg AUC_PR: {np.mean(results_pr)}, {np.std(results_pr)}. \
    \nTraining performance: {np.mean(results_train)=}, {np.mean(results_pr_train)=}, {np.mean(pr_score_train)=}")

with open(f"Results_XGB/{run}/results.txt", "w") as f:
    f.write(f"{run}. Avg AUC: {np.mean(results)}, {np.std(results)}. Avg PR_score: {np.mean(pr_scores)}, \
     Avg AUC_PR: {np.mean(results_pr)}, {np.std(results_pr)}. \
    \nTraining performance: {np.mean(results_train)=}, {np.mean(results_pr_train)=}, {np.mean(pr_score_train)=}")

In [None]:
# save predictions of current repeat for plotting
pred = pd.Series(pred, index=y_val.index)
pd.concat([pd.Series(pred), y_val], axis=1).to_csv(f"Results_XGB/{run}/enrichment.csv")