# Setting

In [None]:
# Standard library imports
import os
import math
import re
import random
import warnings

# Data manipulation and numeric libraries
import numpy as np
import pandas as pd
import scipy.stats as stats
import scipy.ndimage as ndimage
from scipy.interpolate import interp1d

# Machine learning and optimization libraries
import joblib
from joblib import dump, load
import optuna
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import (
    train_test_split, StratifiedKFold, GroupShuffleSplit, cross_val_score,
    RandomizedSearchCV, KFold
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import (
    balanced_accuracy_score, accuracy_score, confusion_matrix, precision_score,
    recall_score, f1_score, roc_auc_score, precision_recall_curve, auc, 
    make_scorer, average_precision_score, precision_recall_fscore_support, 
    roc_curve, ConfusionMatrixDisplay
)
from sklearn.utils import resample, class_weight
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler, MaxAbsScaler
from imblearn.over_sampling import SMOTE

# Deep learning libraries
import tensorflow as tf
from tensorflow.keras import Input, optimizers, backend as K
from tensorflow.keras.models import Sequential, model_from_json
from tensorflow.keras.layers import (
    GlobalAveragePooling1D, MaxPool1D, AvgPool1D, LeakyReLU, TimeDistributed,
    GlobalMaxPooling2D, Bidirectional, ReLU, Reshape, Activation, LSTM,
    MaxPool2D, Conv2D, MaxPooling1D, BatchNormalization, Dense, Dropout,
    Flatten, Conv1D, GlobalMaxPooling1D, GlobalAveragePooling2D, AvgPool2D,
    SimpleRNN
)
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import RMSprop, Adam, Nadam, SGD, Adamax
from keras import regularizers


# XGBoost library
import xgboost as xgb
from xgboost import XGBClassifier

# File and plotting libraries
from pathlib import Path
from matplotlib import pyplot as plt
from scipy.io import savemat
from tqdm import tqdm

# Suppress all warnings
warnings.filterwarnings("ignore")


In [None]:
def predict_score(model, x, y, print_results=True, deep_model=False, batch_size= 256):
    """
    Predicts the score for the given model and data.
    
    Args:
    - model: Trained model.
    - x: Input data.
    - y: True labels.
    - print_results: Whether to print the results.
    - deep_model: Indicates the type of model (True for deep learning models, False for others, -1 for special case).
    
    Returns:
    - probs: Predicted probabilities.
    - metrics: List of [tp, tn, fp, fn].
    - scores: List of [auc, auprc, acc, sens, sp, ppv, npv].
    """
    
    # Predict probabilities based on the model type
    if deep_model == True:
        probs = model.predict(x, verbose=0, batch_size = batch_size)[:, 0]
    elif deep_model == -1:
        reconstruction_error = np.square(x[:, 0] - model.predict_proba(x)[:, 1])
        probs = 1 - stats.norm.cdf(reconstruction_error)
    else:
        probs = model.predict_proba(x)[:, 1]
    
    # Convert probabilities to binary predictions
    preds = (probs > 0.5).astype('int32')
    
    # Compute confusion matrix values
    tp = np.sum((preds == 1) & (y == 1))
    tn = np.sum((preds == 0) & (y == 0))
    fp = np.sum((preds == 1) & (y == 0))
    fn = np.sum((preds == 0) & (y == 1))
    
    # Compute metrics
    auc = 1 if np.unique(y).size == 1 else roc_auc_score(y, probs)
    auprc = auprc_scoring(y, probs)
    npv = tn / (tn + fn)
    ppv = tp / (tp + fp) if (tp + fp) > 0 else 0
    sp = tn / (tn + fp) if (tn + fp) > 0 else 0
    sens = tp / (tp + fn) if (tp + fn) > 0 else 0
    acc = (sp + sens) / 2
    
    # Print results if required
    if print_results:
        print(f'AUC: {auc:.4f}, AUPRC: {auprc:.4f}, Accuracy: {acc:.4f}, Sensitivity: {sens:.4f}, Specificity: {sp:.4f}, NPV: {npv:.4f}, PPV: {ppv:.4f}')
    
    return probs, [tp, tn, fp, fn], [auc, auprc, acc, sens, sp, ppv, npv]

def auprc_scoring(y_true, y_pred_proba):
    precision, recall, thresholds = precision_recall_curve(y_true, y_pred_proba)
    return auc(recall, precision)

class data_sacler: # just for implement
    def __init__(self, prescale=False, scaler_opts='RobustScaler', datascale =False,scaleset=False):
        self.prescale = prescale
        self.scaler_opts = scaler_opts
        self.scaleset = scaleset
        self.datascale = datascale
        self.scaler1 = None
        self.scaler2 = None
        
    def scaler_set(self, scaler1, scaler2):
        self.scaler1 = scaler1
        self.scaler2 = scaler2
    
    def transform(self, x_data):
        if self.prescale == 'log': 
            x_data[:,[0,1]] = np.log(x_data[:,[0,1]]) 
        x_data_tmp = x_data[:,:2].shape
        if self.datascale:            
            x_data[:,:2] = self.scaler1.transform(x_data[:,:2].reshape(-1,1)).reshape(x_data_tmp)
        if self.scaleset:
             x_data[:,-1] = self.scaler2.transform(x_data[:,-1].reshape(-1,1)).reshape(-1)
        return x_data
    
    def fit_transform(self, x_data ):
        if self.prescale == 'log': 
            x_data[:,[0,1]] = np.log(x_data[:,[0,1]])
        
        if self.scaler_opts == 'RobustScaler':
            self.scaler1 = RobustScaler()
            self.scaler2 = RobustScaler()

        elif self.scaler_opts == 'MinMaxScaler':
            self.scaler1 = MinMaxScaler()
            self.scaler2 = MinMaxScaler()

        elif self.scaler_opts == 'StandardScaler':
            self.scaler1 = StandardScaler()
            self.scaler2 = StandardScaler()
            
        elif self.scaler_opts == 'MaxAbsScaler':
            self.scaler1 = MaxAbsScaler()
            self.scaler2 = MaxAbsScaler()
        else:
            self.scaler1=None
            self.scaler2=None
            
        x_data_tmp = x_data[:,:2].shape
        if self.datascale:
            x_data[:,:2] = self.scaler1.fit_transform(x_data[:,:2].reshape(-1,1)).reshape(x_data_tmp)
        if self.scaleset:
            x_data[:,-1] = self.scaler2.fit_transform(x_data[:,-1].reshape(-1,1)).reshape(-1)
        
        return x_data, self.scaler1, self.scaler2

def oversampling(X_train, y_train, smote= False):
    """
    Perform oversampling to address class imbalance in training data.
    
    Args:
    X_train (numpy.array): The training feature data.
    y_train (numpy.array): The corresponding training target labels.
    smote (bool or int): If True, use SMOTE for oversampling. If -1, no oversampling is performed.
                         Any other value triggers random oversampling of the minority class.
    
    Returns:
    numpy.array: The oversampled training feature data.
    numpy.array: The corresponding oversampled training target labels.
    """
    if smote:
        sm = SMOTE(random_state=42)
        X_train_upsampled, y_train_upsampled = sm.fit_resample(X_train,y_train)
    elif smote == -1:
        X_train_upsampled, y_train_upsampled = X_train, y_train
    else:
        X_train_1 = X_train[y_train == 1]
        y_train_1 = y_train[y_train == 1]

        n_samples = (y_train == 0).sum()  
        indices = np.random.choice(X_train_1.shape[0], n_samples, replace=True) 
        X_train_1_upsampled = X_train_1[indices]  
        y_train_1_upsampled = y_train_1[indices]  

        X_train_upsampled = np.vstack([X_train[y_train == 0], X_train_1_upsampled])
        y_train_upsampled = np.concatenate([y_train[y_train == 0], y_train_1_upsampled])
    return X_train_upsampled,y_train_upsampled

# models
def hp_dnn(trial, input_dim):
    num_layers = trial.suggest_int('num_layers', 2, 20)  
    activ_fcn = trial.suggest_categorical('dense_activation', ['relu', 'elu', 'gelu'])
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam'])
    lr = trial.suggest_categorical('lr', [0.001, 0.0005, 0.0001])
    input_tensor = tf.keras.layers.Input(shape=input_dim)
    x = input_tensor
    for i in range(num_layers):
        units = trial.suggest_int(name='units_' + str(i), low=32, high=512, step=32)  
        x = tf.keras.layers.Dense(units=units, activation=activ_fcn,
                                  kernel_initializer=trial.suggest_categorical('kernel_initializer_' + str(i), ['he_normal', 'glorot_uniform', 'lecun_normal']))(x)
        batch_norm = trial.suggest_categorical('batch_' + str(i), [True, False])
        if batch_norm:
            x = tf.keras.layers.BatchNormalization()(x)
        dropout = trial.suggest_categorical('dropout_' + str(i), [True, False])
        if dropout:
            dropout_rate = trial.suggest_float('dropout_rate_' + str(i), 0.1, 0.5, step=0.1)  # 드롭아웃 비율 조정
            x = tf.keras.layers.Dropout(dropout_rate)(x)
        l2_reg = trial.suggest_float('l2_reg_' + str(i), 1e-6, 1e-1, log=True)
        x = tf.keras.layers.Dense(units=units, activation=activ_fcn, 
                                  kernel_regularizer=tf.keras.regularizers.l2(l2_reg))(x)
    x = tf.keras.layers.Dense(1, activation='sigmoid')(x)
    model = tf.keras.Model(inputs=input_tensor, outputs=x)
    if optimizer_name == 'Adam':
        optimizer = Adam(learning_rate=lr)
    # elif optimizer_name == 'Nadam':
    #     optimizer = Nadam(learning_rate=lr)
    # else:  # RMSprop
    #     optimizer = RMSprop(learning_rate=lr)
    model.compile(loss='binary_crossentropy',
                  optimizer=optimizer,
                  metrics=[tf.keras.metrics.AUC(name='PR', curve='PR'),
                           tf.keras.metrics.BinaryAccuracy(name='bacc', threshold=0.5),
                           tf.keras.metrics.AUC(curve='ROC')])
    return model

def hp_xgb(trial):
    param = {
        'objective': 'binary:logistic',
        'booster': trial.suggest_categorical('booster', ['gbtree', 'dart']),
        'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
        'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
        'learning_rate': trial.suggest_categorical('lr',[0.001, 0.0005, 0.0001]), #1e-4
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'max_depth': trial.suggest_int('max_depth', 10, 1000),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 40),
        'n_estimators': trial.suggest_int('n_estimators', 10, 1000, step=10),
    }
    model = xgb.XGBClassifier(**param)
    return model

def hp_rf(trial):
    param = {
        'n_estimators': trial.suggest_int('n_estimators', 10, 1000, step=100),
        'max_depth': trial.suggest_int('max_depth', 1, 100, log=True),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 16, log=True),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 2, 10),
        'max_features': trial.suggest_categorical('max_features', ['auto', 'sqrt', 'log2']),
        'bootstrap': trial.suggest_categorical('bootstrap', [True, False]),
        'criterion': trial.suggest_categorical('criterion', ['gini', 'entropy']),
        'class_weight': 'balanced',
    }    
    model = RandomForestClassifier(**param)
    return model

def hp_lr(trial):
    param = {
#         'C': trial.suggest_loguniform('C',0.001, 100),  
        'penalty': trial.suggest_categorical('penalty', ['none','l2']),
#         'solver': trial.suggest_categorical('solver', ['liblinear', 'newton-cg', 'lbfgs', 'sag', 'saga']),
        'max_iter': trial.suggest_categorical('max_iter',[100,1000,2500,5000]),
        'class_weight': 'balanced',
    }

# research process function
def gen_dbset(x, n_splits=10, scale = True):
    """
    Generates training and testing datasets by splitting the input dataframe.
    
    Args:
    x (pandas.DataFrame): Input data frame that includes at least two columns for features.
    n_splits (int): Number of splits to perform on the data. Default is 10.
    scale (bool): Whether to apply scaling to the input features. Default is True.
    
    Returns:
    tuple: Two lists containing numpy arrays for training and testing datasets respectively.
    """
    
    tdf = x.copy()  
    tdf = tdf.dropna()  
    input_data = tdf.iloc[:,:2].to_numpy()
    # clinical_setting = tdf['clinical_setting'] # if exist

    #--------------------------------
    # Data must be preprocessed in the previous step unconditionally, but added for implementation and execution.
    if scale == True:
        tscaler = data_sacler('log','RobustScaler',True,False)
        scaled_input_data,_,_ = tscaler.fit_transform(input_data.copy())
    else:
        scaled_input_data = input_data
    #--------------------------------

    print('=====================')
    print('Raw db shape:',scaled_input_data.shape)
    
    # skf = StratifiedKFold(n_splits = n_splits)
    skf = KFold(n_splits=n_splits) 
    nx_train, nx_test = [],[]
    # for nest_idx, (x_train, x_test) in enumerate(skf.split(scaled_input_data,clinical_setting)):
    for nest_idx, (x_train, x_test) in enumerate(skf.split(scaled_input_data)):
        # print('------------------Nest :',nest_idx)
        nx_train.append(scaled_input_data[x_train])
        nx_test.append(scaled_input_data[x_test])
        # print('train : ',scaled_input_data[x_train].shape,'/','test :',scaled_input_data[x_test].shape)
    return nx_train, nx_test

def nested_train_model(x_train_all, model_str, n_trials =10, shuffle_ratio = 0.01, verbose_tr =0):
    """
    Trains models specified by `model_str` on multiple datasets using hyperparameter optimization.
    The function performs training within a nested structure to optimize hyperparameters for each dataset independently.

    Args:
    x_train_all (list of numpy.ndarray): List of training datasets.
    model_str (str): Specifies the type of model to train. Supported types are 'dnn' (Deep Neural Network),
                     'rnn' (Recurrent Neural Network), 'cnn' (Convolutional Neural Network),
                     'xgb' (XGBoost), 'rf' (Random Forest), and 'lr' (Logistic Regression).
    n_trials (int, optional): Number of trials for hyperparameter optimization (default is 10).
    shuffle_ratio (float, optional): Ratio of the data to shuffle to create training variability (default is 0.01).
    verbose_tr (int, optional): Verbosity mode for training output, 0 = silent, 1 = progress bar, 2 = one line per epoch (default is 0).

    Returns:
    tuple: 
        - list of Optuna study objects containing the results of the optimization.
        - list of the best models trained on each dataset.

    Each dataset in `x_train_all` is processed to shuffle, oversample, and then split into training and validation subsets.
    Model performance is optimized for balanced accuracy using Optuna.
    """
    study_all = []
    best_models = []

    for nest_idx,x_train in enumerate(x_train_all):
        tf.keras.backend.clear_session()       
        x_train, y_train = shuffle_data(x_train, shuffle_ratio=shuffle_ratio, random_state = 1111)
        x_train, y_train = oversampling(x_train, y_train)
        x_train, x_val, y_train, y_val = train_test_split(x_train,y_train, test_size=0.25, stratify=y_train)
        epoch = 20
        study=[]
        # class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
        # class_weights = {i: class_weights[i] for i in range(2)}
        early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=100, restore_best_weights=True)

        def objective(trial):
            model = []
            if model_str in ('dnn','rnn','cnn'):
                input_dim = x_train.shape[1]
                if model_str == 'dnn':
                    model = hp_dnn(trial, input_dim)
                # elif model_str == 'rnn':
                #     model = hp_rnn(trial, input_dim)
                # elif model_str == 'cnn':
                #     model = hp_cnn(trial, input_dim)
                model.fit(x_train, y_train, epochs=epoch, validation_data=(x_val, y_val),
                                    callbacks=[early_stopping_cb], verbose=verbose_tr, batch_size=256)
                preds = (model.predict(x_val)>0.5).astype(int)
                
            elif model_str == 'xgb':
                model = hp_xgb(trial)
                model.fit(x_train, y_train,
                        eval_set = [(x_train,y_train), (x_val,y_val)],
                        eval_metric = 'auc',
                        early_stopping_rounds=100, verbose = verbose_tr)
                preds = model.predict(x_val)

            elif model_str in ('rf','lr'):
                if model_str == 'rf':
                    model = hp_rf(trial)
                if model_str == 'lr':
                    model = hp_lr(trial)
                model.fit(x_train, y_train)
                preds = model.predict(x_val)
            return balanced_accuracy_score(y_val, preds)
            
        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials = n_trials, show_progress_bar=True)
        study_all.append(study)

        best_trial = study.best_trial
        if model_str in ('dnn', 'rnn', 'cnn'):
            input_dim = x_train.shape[1]
            if model_str == 'dnn':
                best_model = hp_dnn(best_trial, input_dim)
            # elif model_str == 'rnn':
            #     best_model = hp_rnn(best_trial, input_dim)
            # elif model_str == 'cnn':
            #     best_model = hp_cnn(best_trial, input_dim)
            best_model.fit(x_train, y_train, epochs=epoch, validation_data=(x_val, y_val),
                           callbacks=[early_stopping_cb], verbose=verbose_tr, batch_size=256)
        
        elif model_str == 'xgb':
            best_model = hp_xgb(best_trial)
            best_model.fit(x_train, y_train,
                           eval_set=[(x_train, y_train), (x_val, y_val)],
                           eval_metric='auc',
                           early_stopping_rounds=100, verbose=verbose_tr)
        
        elif model_str in ('rf', 'lr'):
            if model_str == 'rf':
                best_model = hp_rf(best_trial)
            if model_str == 'lr':
                best_model = hp_lr(best_trial)
            best_model.fit(x_train, y_train)
        
        best_models.append(best_model)
    return study_all, best_models  

def shuffle_data(x, shuffle_ratio, random_state=None):
    """
    Shuffles a portion of the data in the input array based on the specified split ratio.
    
    Args:
    x (numpy.array): The input data array to be shuffled.
    shuffle_ratio (float): The fraction of the array to shuffle (0 < shuffle_ratio <= 1).
    random_state (int, optional): A seed number to make the shuffle deterministic. Default is None.
    
    Returns:
    numpy.array: The shuffled array.
    numpy.array: An indicator array where shuffled indices are marked as 1 and others as 0.
    
    The function copies the input array, shuffles a portion of the first column of the array based on
    the split ratio, and returns the shuffled array along with an indicator array that marks which
    elements were shuffled.
    """

    # copy
    arr = x.copy()
    # Determine the indices that should be shuffled
    shuffle_idx = np.random.RandomState(random_state).permutation(arr.shape[0])[:int(arr.shape[0] * shuffle_ratio)]
    # Shuffle only the first column of the selected indices
    arr[shuffle_idx, 0] = np.random.RandomState(random_state).permutation(arr[:, 0][shuffle_idx])
    # Create an indicator array that marks shuffled indices
    shuffle_indicator = np.zeros(arr.shape[0])
    shuffle_indicator[shuffle_idx] = 1
    return arr, shuffle_indicator

def nested_evaluate_model(x_test_all, optimized_models, model_str, n_iter = 1000, test_shuffle_ratio=0.01):
    """
    Evaluate a list of optimized models on multiple shuffled test datasets to simulate real-world variability 
    and generate performance metrics.

    Args:
    x_test_all (list of numpy.ndarray): List of test datasets where each dataset corresponds to a model in `optimized_models`.
    optimized_models (list of models): List of trained and optimized models to be evaluated.
    model_str (str): Type of the models to determine the prediction approach; options include 'dnn' for deep neural networks and others.
    n_iter (int, optional): Number of iterations for generating test sets through permutation, default is 1000.
    test_shuffle_ratio (float, optional): Ratio of the test data to shuffle for each iteration to create variability, default is 0.01.

    Returns:
    dict: A dictionary containing lists of various performance metrics:
        - 'scores': List of overall performance scores for each model.
        - 'probabilities': List of predicted probabilities for each test dataset.
        - 'tpr': True Positive Rates for ROC curve analysis.
        - 'fpr': False Positive Rates for ROC curve analysis.
        - 'precision': Precision values for Precision-Recall curve analysis.
        - 'recall': Recall values for Precision-Recall curve analysis.

    Each metric list contains an element for each model evaluated.
    """

    scores_all, probs_all, tpr_all, fpr_all, prer_all, recall_all = [],[],[],[],[],[]
    for nest_idx, x_test in enumerate(x_test_all):
        x_test_whole, y_test_whole = [],[]
        # error set generation

        # Generating test sets with permutation test-based iterative simulation
        for i in tqdm(range(n_iter)):
            tx_te, ty_te = shuffle_data(x_test,shuffle_ratio=test_shuffle_ratio, random_state=i)
            x_test_whole.append(tx_te)
            y_test_whole.append(ty_te)
        x_test_whole = np.vstack(x_test_whole)
        y_test_whole = np.hstack(y_test_whole)

        tf.keras.backend.clear_session()
        best_model = []
        best_model = optimized_models[nest_idx]

        if model_str == 'dnn':
            probs,_,scores = predict_score(best_model,x_test_whole,y_test_whole, print_results=True, deep_model=True)
        else:
            probs,_,scores = predict_score(best_model,x_test_whole,y_test_whole, print_results=True, deep_model=False)

        scores_all.append(scores)
        probs_all.append(probs)

        fpr, tpr, thresholds = roc_curve(y_test_whole, probs)
        fpr,tpr=interpolate_data(fpr,tpr,inverse = False)
        tpr_all.append(tpr)
        fpr_all.append(fpr)    

        prer,recall,_ = precision_recall_curve(y_test_whole,probs)    
        recall,prer=interpolate_data(recall,prer,inverse = True)
        prer_all.append(prer)
        recall_all.append(recall) 

    return {
        'scores': scores_all,
        'probabilities': probs_all,
        'tpr': tpr_all,
        'fpr': fpr_all,
        'precision': prer_all,
        'recall': recall_all
    } 

def interpolate_data(x, y, num_points=100, err=0, inverse=False):
    """
    Interpolates or inversely interpolates x and y data points based on the original data and a boolean flag.
    Use to match TPR, FPR counts
    
    Args:
    x (numpy.array): Original x values.
    y (numpy.array): Original y values.
    num_points (int): The number of interpolated data points to generate. Default is 100.
    err (int): Error margin to add additional points for edge cases. Default is 0.
    inverse (bool): If True, perform inverse interpolation; otherwise, perform normal interpolation. Default is False.
    
    Returns:
    tuple: new_x, new_y containing the interpolated data points.
    
    Example usage:
    # For normal interpolation:
    new_x, new_y = interpolate_data(x, y, inverse=False)
    # For inverse interpolation:
    new_x, new_y = interpolate_data(x, y, inverse=True)
    """
    # Insert start and end values based on the inverse flag
    if inverse:
        start_end = (1, 0)  # Inverse interpolation
    else:
        start_end = (0, 1)  # Normal interpolation
    
    x = np.insert(x, 0, 0)  # Inserting 0 at the beginning of the x array
    x = np.append(x, 1)    # Appending 1 at the end of the x array
    y = np.insert(y, 0, start_end[0])  # Inserting the start value at the beginning of the y array
    y = np.append(y, start_end[1])    # Appending the end value at the end of the y array
    
    # Create an interpolator object and generate new points
    interpolator = interp1d(x, y, kind='linear', fill_value='extrapolate')
    new_x = np.linspace(0, 1, num=num_points + err)  # Generating new x values
    new_y = interpolator(new_x)  # Generating new y values using the interpolator

    return new_x, new_y

# Example of AI model

In [None]:
x_train_all, x_test_all, study_all = [],[],[]

# error simulation : shuffle ratio (DO NOT exceed 0.5)
train_shuffle_ratio = 0.1
test_shuffle_ratio = 0.01

# Load data
db=pd.read_pickle('../DB/sample_db.pkl')

# Generate dataset
x_train_all, x_test_all = gen_dbset(db, n_splits=5)

# Train and optimize model 
model_str = 'dnn' # Select model : 'dnn', 'xgb', 'rf', 'lr'
study_all, best_models = nested_train_model(x_train_all, model_str, n_trials =1, shuffle_ratio = train_shuffle_ratio)

# Evaluation model with permutation test
result_all=nested_evaluate_model(x_test_all, best_models, model_str, n_iter = 1000, test_shuffle_ratio=test_shuffle_ratio)
avg_scores=np.mean(result_all['scores'], axis=0)
metrics_names = ["AUC", "AUPRC", "Accuracy", "Sensitivity", "Specificity", "PPV", "NPV"]
for name, score in zip(metrics_names, avg_scores):
    print(f"{name}: {score:.6f}")

# External validation example

In [None]:
# Load external DB and Generate exteranl validation dataset
external_x_train_all, external_x_test_all = gen_dbset(external_db, n_splits=5)

# 1. external validation (use internally developed model)
result_all=nested_evaluate_model(external_x_test_all, best_models, model_str, n_iter = 1000, test_shuffle_ratio=shuffle_ratio)

# 2. Optimal performance model for external data
study_all, best_models = nested_train_model(external_x_train_all, model_str, n_trials =100, shuffle_ratio = shuffle_ratio)
result_all=nested_evaluate_model(external_x_test_all, best_models, model_str, n_iter = 1000, test_shuffle_ratio=shuffle_ratio)
