# Model Evaluation

Import libraries/packages + preprocessed data + models

In [1]:
import sys
import os
sys.path.append(os.path.abspath('../'))
from src.build_dnn_model import build_nn_model
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import joblib

from sklearn import metrics, calibration
import tensorflow as tf

import os
from openpyxl import load_workbook

from src.config import SEED

## Data
# Outcome labels: only the 'ORN' column of each split workbook is kept.
y_train = pd.read_excel('../data/raw/split/Raw_y_train.xlsx')['ORN']
y_test = pd.read_excel('../data/raw/split/Raw_y_test.xlsx')['ORN']

# Feature matrices: one train/test pair for the ML models (ml_*) and a
# separate pair for the nomogram model (nomo_*).
ml_X_train = pd.read_parquet('../data/processed/ml_train_transformed.parquet')
ml_X_test = pd.read_parquet('../data/processed/ml_test_transformed.parquet')
nomo_X_train = pd.read_parquet('../data/processed/nomo_train_transformed.parquet')
nomo_X_test = pd.read_parquet('../data/processed/nomo_test_transformed.parquet')

## Models
# Previously saved model objects, restored with joblib.
# NOTE(review): joblib.load unpickles arbitrary objects; only load trusted files.
lightgbm_clf = joblib.load('../models/LightGBM.joblib')
svc_clf = joblib.load('../models/SVC.joblib')
knn_clf = joblib.load('../models/KNN.joblib')
dnn_clf = joblib.load('../models/DNN.joblib')
stack_clf = joblib.load('../models/stack.joblib')
nomo_clf = joblib.load('../models/NLR.joblib')

Evaluation functions (split for readability)

In [None]:
# Module-level results accumulator (a plain dict despite the name):
# evaluate() stores one dict of formatted metric strings per model name,
# and the driver cell later turns it into a DataFrame for export.
class_report_df = {}

def get_cm(model_name, y_true, y_pred, data_type, save_plot = False):
    """Show the confusion matrix of one model/split as an annotated heatmap.

    Args:
        model_name: Label used in the title and in the saved file name.
        y_true: True class labels.
        y_pred: Hard (already thresholded) predicted labels.
        data_type: Split name shown in the title; only 'Testing' figures
            are ever written to disk.
        save_plot: When truthy (and data_type is 'Testing'), save the figure
            as a PDF under ../results/figures/CM/.
    """
    confusion = tf.math.confusion_matrix(y_true, y_pred)
    # Only the test-set matrix is persisted; other splits are display-only.
    should_save = bool(save_plot) and data_type == 'Testing'

    plt.figure(figsize=(10,7))
    sns.heatmap(confusion, annot=True, fmt ='d')
    plt.xlabel('Predicted')
    plt.ylabel('Truth')
    plt.title(f'{model_name}: {data_type}')
    if should_save:
        plt.savefig(f'../results/figures/CM/{model_name}_CM.pdf', bbox_inches='tight')
    plt.show()

def get_auc_CI(y_true, y_pred, curve_type, n_bootstraps=3000, seed=SEED):
    """Bootstrap a 95% percentile interval for an area-under-curve score.

    Each replicate resamples row indices with replacement separately within
    every class of y_true, drawing as many rows as the class contains, so
    the class counts of the original sample are preserved.

    Args:
        y_true: True class labels.
        y_pred: Predicted scores/probabilities aligned with y_true.
        curve_type: 'roc' for the ROC AUC, or 'prc' for the area under the
            precision-recall curve.
        n_bootstraps: Number of bootstrap replicates.
        seed: Seed for NumPy's global random generator.

    Returns:
        Tuple (lower, upper): the 2.5th and 97.5th percentiles of the
        bootstrapped scores.

    Raises:
        ValueError: If curve_type is neither 'roc' nor 'prc'. Previously an
            unknown value silently produced no scores at all.
    """
    # Resolve the scoring function once instead of re-branching per replicate.
    if curve_type == 'roc':
        score_fn = metrics.roc_auc_score
    elif curve_type == 'prc':
        def score_fn(labels, scores_):
            precision, recall, _ = metrics.precision_recall_curve(labels, scores_)
            return metrics.auc(recall, precision)
    else:
        raise ValueError(f"curve_type must be 'roc' or 'prc', got {curve_type!r}")

    # NOTE: this reseeds NumPy's *global* RNG, as the sibling bootstrap
    # helpers do, so results are reproducible for a given seed.
    np.random.seed(seed)
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    classes, counts = np.unique(y_true, return_counts=True)
    class_indices = [np.where(y_true == cl)[0] for cl in classes]

    scores = []
    for _ in range(n_bootstraps):
        sample_indices = np.concatenate([
            np.random.choice(idx, size=count, replace=True)
            for idx, count in zip(class_indices, counts)])
        scores.append(score_fn(y_true[sample_indices], y_pred[sample_indices]))

    return (
        np.percentile(scores, 2.5),
        np.percentile(scores, 97.5)
        )

def get_class_report_CI(y_true, y_pred, n_bootstraps=3000, seed=SEED):
    """Format each discrimination metric as 'value (lower-upper)'.

    The point value comes from the full sample; the bounds are the 2.5th and
    97.5th percentiles over bootstrap replicates. Replicates resample row
    indices with replacement within each class of y_true, keeping the
    original class counts.

    Args:
        y_true: True class labels.
        y_pred: Hard predicted labels aligned with y_true.
        n_bootstraps: Number of bootstrap replicates.
        seed: Seed for NumPy's global random generator (reseeded here).

    Returns:
        Dict mapping 'Accuracy', 'Precision', 'F1-Score', 'Sensitivity',
        'Specificity' and 'MCC' to strings with three decimals.
    """
    # Column order of the bootstrap matrix and key order of the result.
    metric_names = ('Accuracy', 'Precision', 'F1-Score',
                    'Sensitivity', 'Specificity', 'MCC')

    np.random.seed(seed)
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    labels, label_counts = np.unique(y_true, return_counts=True)
    rows_per_class = [np.where(y_true == label)[0] for label in labels]

    replicate_rows = []
    for _ in range(n_bootstraps):
        picked = np.concatenate([
            np.random.choice(rows, size=n_rows, replace=True)
            for rows, n_rows in zip(rows_per_class, label_counts)])
        replicate = get_discrimination_metrics(y_true[picked], y_pred[picked])
        replicate_rows.append(tuple(replicate[name] for name in metric_names))

    replicate_matrix = np.array(replicate_rows)
    lower = np.percentile(replicate_matrix, 2.5, axis=0)
    upper = np.percentile(replicate_matrix, 97.5, axis=0)
    point = get_discrimination_metrics(y_true, y_pred)

    report = {}
    for column, name in enumerate(metric_names):
        report[name] = f'{point[name]:.3f} ({lower[column]:.3f}-{upper[column]:.3f})'
    return report
  
def get_callibration_CI(y_pred_proba, y_true, n_bootstraps=3000, seed = SEED):
    """Bootstrap 95% percentile intervals for the Brier score and the 'ICI'.

    In this file the 'ICI' is the unweighted mean absolute gap between the
    observed positive fraction and the mean predicted probability over a
    3-bin, uniform-width calibration curve. Replicates resample row indices
    with replacement within each class of y_true, keeping the class counts.

    Args:
        y_pred_proba: Predicted positive-class probabilities.
        y_true: True class labels aligned with y_pred_proba.
        n_bootstraps: Number of bootstrap replicates.
        seed: Seed for NumPy's global random generator (reseeded here).

    Returns:
        Two lists, (brier_ci, ici_ci), each [2.5th pct, 97.5th pct].
    """
    def percentile_interval(samples):
        return [
            np.percentile(samples, 2.5),
            np.percentile(samples, 97.5),
            ]

    np.random.seed(seed)
    y_true = np.array(y_true)
    y_pred_proba = np.array(y_pred_proba)
    labels, label_counts = np.unique(y_true, return_counts=True)
    rows_per_class = [np.where(y_true == label)[0] for label in labels]

    brier_samples, ici_samples = [], []
    for _ in range(n_bootstraps):
        picked = np.concatenate([
            np.random.choice(rows, size=n_rows, replace=True)
            for rows, n_rows in zip(rows_per_class, label_counts)
            ])
        boot_true, boot_proba = y_true[picked], y_pred_proba[picked]

        # Brier score of this replicate
        brier_samples.append(metrics.brier_score_loss(boot_true, boot_proba))

        # Binned calibration gap of this replicate
        observed_frac, mean_predicted = calibration.calibration_curve(
            boot_true, boot_proba, n_bins=3, strategy='uniform')
        ici_samples.append(np.mean(np.abs(observed_frac - mean_predicted)))

    return percentile_interval(brier_samples), percentile_interval(ici_samples)

def get_bin_stats(model, model_name, X_test, y_test, save_plot, low_thr = 0.33, high_thr = 0.66):
    """Print and plot observed vs predicted event rates per risk bin.

    Predicted positive-class probabilities are cut into three bins:
    'low' [0, low_thr], 'medium' (low_thr, high_thr] and 'high'
    (high_thr, 1.0]. For each bin the table reports the number and percent
    of rows, the observed event rate, the mean predicted probability, the
    number of positives, and the share of all positives in that bin.

    Args:
        model: Object exposing predict_proba(X) whose column 1 is used as the
            positive-class probability.
        model_name: Label used in the plot title and saved file name.
        X_test: Features passed to model.predict_proba.
        y_test: True labels aligned with X_test (their mean/sum are taken,
            so they are treated as 0/1 values).
        save_plot: When truthy, save the bar chart as a PDF under
            ../results/figures/calibration/bars/.
        low_thr: Upper edge of the 'low' bin.
        high_thr: Upper edge of the 'medium' bin.
    """
    y_pred_proba = model.predict_proba(X_test)[:, 1]
    df = pd.DataFrame({'y_true': y_test, 'y_pred_proba': y_pred_proba})
    bins = [0.0, low_thr, high_thr, 1.0]
    bin_labels = ['low', 'medium', 'high']
    df['risk_bin'] = pd.cut(df['y_pred_proba'], bins=bins, labels=bin_labels, include_lowest=True)

    # observed=False is passed explicitly: relying on the default for a
    # categorical grouper is deprecated, and empty bins must stay in the
    # table so all three risk levels are always reported.
    bin_stats = df.groupby('risk_bin', observed=False).agg(
        N_patients = ('y_true', 'size'),
        Percent = ('y_true', lambda x: 100 * len(x) / len(df)),
        Observed_Event_Rate = ('y_true', 'mean'),
        Avg_Pred_Prob = ('y_pred_proba', 'mean'),
        Positives_in_bin = ('y_true', 'sum')
    )
    total_positives = df['y_true'].sum()
    bin_stats['Percent_of_All_Positives'] = 100 * bin_stats['Positives_in_bin'] / total_positives
    print(bin_stats)

    plt.bar(bin_stats.index, bin_stats['Observed_Event_Rate'], alpha=0.7, label='Observed')
    plt.plot(bin_stats.index, bin_stats['Avg_Pred_Prob'], marker='o', color='red', label='Predicted')
    plt.ylabel('Event Rate')
    plt.xlabel('Risk Bin')
    plt.title(f'Proportions of Average Observed vs Predicted ORN Occurrence: {model_name}')
    plt.legend()
    plt.ylim(0, 1.0)
    if save_plot:
        plt.savefig(f'../results/figures/calibration/bars/{model_name}_CalBar.pdf', bbox_inches='tight')
    plt.show()

def get_discrimination_metrics(y_true, y_pred):
    """Compute binary discrimination metrics from hard 0/1 predictions.

    Args:
        y_true: True labels, encoded as 0 (negative) and 1 (positive).
        y_pred: Predicted labels with the same encoding.

    Returns:
        Dict with keys 'Accuracy', 'Precision', 'F1-Score', 'Sensitivity',
        'Specificity' and 'MCC'. Sensitivity or specificity is NaN when the
        corresponding class is absent from y_true.
    """
    # Pin the label set so the matrix is always 2x2. Without it, inputs in
    # which only one class appears give a 1x1 matrix and the four-way
    # unpacking below raises.
    tn, fp, fn, tp = metrics.confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    accuracy = metrics.accuracy_score(y_true, y_pred)
    sensitivity = tp / (tp + fn)
    specificity = tn / (tn + fp)
    precision = metrics.precision_score(y_true, y_pred)
    f1 = metrics.f1_score(y_true, y_pred)
    mcc = metrics.matthews_corrcoef(y_true, y_pred)

    return {
        'Accuracy': accuracy,
        'Precision': precision,
        'F1-Score': f1,
        'Sensitivity': sensitivity,
        'Specificity': specificity,
        'MCC': mcc
    }

def plot_ROC(y_true, y_prob, data_type, n_bootstraps=3000, seed=SEED):
    """Add one ROC curve to the current figure and pick a decision threshold.

    Args:
        y_true: True class labels.
        y_prob: Predicted positive-class probabilities.
        data_type: Split name used as the legend prefix.
        n_bootstraps: Replicates for the AUROC confidence interval.
        seed: Seed forwarded to get_auc_CI.

    Returns:
        Tuple (auroc_string, optimal_threshold): the AUROC formatted as
        'value (lower-upper)', and the ROC threshold at which tpr - fpr
        (Youden's J) is largest.
    """
    fpr, tpr, cutoffs = metrics.roc_curve(y_true, y_prob)
    ci_low, ci_high = get_auc_CI(y_true, y_prob, 'roc', n_bootstraps, seed)
    roc_auc = metrics.auc(fpr, tpr)

    auroc_string = f'{roc_auc:.3f} ({ci_low:.3f}-{ci_high:.3f})'
    model_score = f'AUROC = {auroc_string}'
    plt.plot(fpr, tpr, lw=4, label=f'{data_type} {model_score}')

    # First cutoff that maximizes the gap between true- and false-positive rate.
    optimal_threshold = cutoffs[np.argmax(tpr - fpr)]
    return auroc_string, optimal_threshold

def plot_calibration(y_pred_proba, y_true, data_type = 'Train', n_bootstraps = 3000, seed = SEED):
    """Add one 3-bin calibration curve to the current figure.

    The legend entry reports the Brier score and the binned calibration gap
    (labelled 'ICI'), each with its bootstrapped 95% interval.

    Args:
        y_pred_proba: Predicted positive-class probabilities.
        y_true: True class labels aligned with y_pred_proba.
        data_type: Legend prefix. Exactly "Train" selects the round marker
            and an empty return value; anything else selects the square
            marker and a populated return value.
        n_bootstraps: Replicates for the confidence intervals.
        seed: Seed forwarded to get_callibration_CI.

    Returns:
        {} for data_type "Train"; otherwise a dict with keys 'Brier',
        'Brier_low_CI', 'Brier_high_CI', 'ICI', 'ICI_low_CI', 'ICI_high_CI'.
    """
    brier = metrics.brier_score_loss(y_true, y_pred_proba)
    prob_true, prob_pred = calibration.calibration_curve(y_true, y_pred_proba, n_bins=3, strategy='uniform')
    ici = np.mean(np.abs(prob_true - prob_pred))

    brier_bounds, ici_bounds = get_callibration_CI(y_pred_proba, y_true, n_bootstraps, seed)
    brier_lower_ci, brier_upper_ci = brier_bounds[0], brier_bounds[1]
    ici_lower_ci, ici_upper_ci = ici_bounds[0], ici_bounds[1]

    is_train = data_type == "Train"
    marker = 'o' if is_train else 's'
    # Only non-training calls report their numbers back to the caller.
    cal_info = {} if is_train else {
        'Brier': brier,
        'Brier_low_CI': brier_lower_ci,
        'Brier_high_CI': brier_upper_ci,
        'ICI': ici,
        'ICI_low_CI': ici_lower_ci,
        'ICI_high_CI': ici_upper_ci,
    }

    plt.plot(prob_pred, prob_true, marker=marker, 
             label=f'{data_type} Brier = {brier:.3f} ({brier_lower_ci:.3f}-{brier_upper_ci:.3f}) & ICI = {ici:.3f} ({ici_lower_ci:.3f}-{ici_upper_ci:.3f})')
    return cal_info

def evaluate(model, model_name, X_train, y_train, X_test, y_test, 
             include_training = True, include_test = True, save_results = False):   
    """Run the full discrimination + calibration evaluation for one model.

    Draws the ROC figure, confusion matrices, calibration figure and (for the
    test set) the risk-bin bar chart, prints a classification report, and
    stores the formatted test-set metrics in the module-level
    class_report_df under model_name.

    The hard-prediction threshold is the one returned by plot_ROC for the
    training data and is applied to both splits. When include_training is
    False no training ROC exists, so a fixed 0.5 cutoff is used instead
    (previously this case crashed because the training curves were computed
    unconditionally).

    Args:
        model: Object exposing predict_proba(X); column 1 is used as the
            positive-class probability.
        model_name: Label for titles, file names and the results dict key.
        X_train, y_train: Training features and labels.
        X_test, y_test: Test features and labels.
        include_training: Whether to evaluate/plot the training split.
        include_test: Whether to evaluate/plot the test split.
        save_results: When truthy, figures are also saved as PDFs under
            ../results/figures/.
    """
    class_report_df[model_name] = {}
    ########## Get prediction probabilities and hard predictions based on threshold ########## 
    ## Probabilities
    y_pred_proba_test = model.predict_proba(X_test)[:, 1] if include_test else None
    y_pred_proba_train = model.predict_proba(X_train)[:, 1] if include_training else None

    ##################### ROC PLOT AND AUROC #########################
    plt.figure(figsize=(12, 8))
    plt.plot([0, 1], [0, 1], color='gray', linestyle='--', label='Random Classifier')
    ##Training##
    if include_training:
        _, threshold = plot_ROC(y_train, y_pred_proba_train, 'Training')
    else:
        # No training ROC to derive a cutoff from; fall back to 0.5 rather
        # than tuning the threshold on the test set.
        threshold = 0.5

    ##Testing##
    if include_test:
        auroc_string, _ = plot_ROC(y_test, y_pred_proba_test, 'Testing')

    ####### Plot graphs ######
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate', fontsize = 21, fontweight = 550)
    plt.ylabel('True Positive Rate', fontsize = 21, fontweight = 550)
    plt.tick_params(axis = 'both', which = 'major', labelsize=15)
    plt.title(f'{model_name} ROC', fontweight='semibold', fontsize = 25)
    plt.legend(loc="lower right", prop = {'size': 19, 'weight': 550})
    if save_results:
        plt.savefig(f'../results/figures/ROC/{model_name}_ROC.pdf', bbox_inches='tight')
    plt.show() 

    #### Hard predictions + CM + classification report ####
    if include_training: 
        y_pred_train = (y_pred_proba_train >= threshold).astype('int')
        get_cm(model_name, y_train, y_pred_train, 'Training', save_results)
    if include_test: 
        #Get CM
        y_pred_test = (y_pred_proba_test >= threshold).astype('int')
        get_cm(model_name, y_test, y_pred_test, 'Testing', save_results)
        ##Classification report just to print out
        class_report = metrics.classification_report(y_test, y_pred_test) 
        print(class_report)
        ##Further reporting to add to class report
        boot_class_report = get_class_report_CI(y_test, y_pred_test)
        class_report_df[model_name] = boot_class_report
        #Add AUROC to class_report
        class_report_df[model_name]['AUROC (95% CI)'] = auroc_string

    ##################### CALIBRATION #########################
    plt.figure(figsize=(12, 8))
    plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfect Calibration')
    if include_training:
        plot_calibration(y_pred_proba_train, y_train, data_type='Train')
    if include_test:
        calibration_info = plot_calibration(y_pred_proba_test, y_test, data_type='Test')
        class_report_df[model_name]['Brier Score (95% CI)'] = f'{calibration_info["Brier"]:.3f} ({calibration_info["Brier_low_CI"]:.3f}-{calibration_info["Brier_high_CI"]:.3f})'
        class_report_df[model_name]['Integrated Calibration Index (ICI) (95% CI)'] = f'{calibration_info["ICI"]:.3f} ({calibration_info["ICI_low_CI"]:.3f}-{calibration_info["ICI_high_CI"]:.3f})'
        class_report_df[model_name]['Threshold'] = np.round(threshold, 3)

    plt.xlabel('Predicted Probability')
    plt.ylabel('Fraction of Positives')
    plt.title(f'{model_name}')
    plt.legend(loc='upper left', prop = {'size': 15, 'weight': 550})
    plt.grid(True)
    if save_results:
        plt.savefig(f'../results/figures/calibration/curves/{model_name}_CalCurve.pdf', bbox_inches='tight')
    plt.show()
    if include_test:
        get_bin_stats(model, model_name, X_test, y_test, save_results, low_thr = 0.33, high_thr = 0.66)
    print(f'Threshold used for hard predictions: {threshold:.3f}')
    

Get ROC curves, confusion matrices, and discrimination and calibration metrics for each model

In [None]:
# Models that share the ML feature matrices, keyed by their display name.
base_models = {
    'LightGBM': lightgbm_clf,
    'SVC': svc_clf,
    'KNN': knn_clf,
    'DNN': dnn_clf,
    'Stack': stack_clf
}
for model_name, model in base_models.items():
    evaluate(model, model_name,
             X_train=ml_X_train, y_train=y_train,
             X_test=ml_X_test, y_test=y_test,
             save_results=True)

# The nomogram model uses its own feature matrices.
# NOTE(review): save_results is left at its default (False) here, so the
# nomogram figures are shown but not written to disk - confirm that is intended.
evaluate(nomo_clf, 'LR-Nomogram',
         X_train=nomo_X_train, y_train=y_train,
         X_test=nomo_X_test, y_test=y_test)

# One row per model, one column per reported metric.
class_report_df_saved = pd.DataFrame(class_report_df).T
display(class_report_df_saved)
class_report_df_saved.to_excel('../results/tables/class_report.xlsx')

# AUROC P-values

In [None]:
# Point rpy2 at the R installation before loading the extension.
# NOTE(review): this path is a macOS framework location and overrides any
# R_HOME already set in the environment - adjust on other machines.
os.environ['R_HOME'] = '/Library/Frameworks/R.framework/Resources'
%reload_ext rpy2.ipython

## True test labels, as a plain Python list, pushed into the R session
y_true = y_test.to_list()
%R -i y_true

## NOTE: predict_proba() returns a 2-D array; column 1 is taken as the
## predicted probability of ORN occurrence. Each vector is pushed into R
## under the same variable name.
# LightGBM
y_proba_light = lightgbm_clf.predict_proba(ml_X_test)[:,1]
%R -i y_proba_light
# SVC
y_proba_svc = svc_clf.predict_proba(ml_X_test)[:,1]
%R -i y_proba_svc
# KNN
y_proba_knn = knn_clf.predict_proba(ml_X_test)[:,1]
%R -i y_proba_knn
# DNN
y_proba_nn = dnn_clf.predict_proba(ml_X_test)[:,1]
%R -i y_proba_nn
# Stack
y_proba_stack = stack_clf.predict_proba(ml_X_test)[:,1]
%R -i y_proba_stack

# Nomogram (LR): scored on its own feature matrix
y_proba_nomo = nomo_clf.predict_proba(nomo_X_test)[:,1]
%R -i y_proba_nomo

In [None]:
%%R
library(pROC)
# Compare one model's ROC curve against the nomogram baseline with DeLong's test.
#   y_true        true test labels
#   y_proba_nomo  baseline (nomogram) predicted probabilities
#   model_pred    predicted probabilities of the model being compared
# Returns the roc.test result object (model curve first, baseline second).
get_pval <- function(y_true, y_proba_nomo, model_pred) {
  roc_baseline <- roc(y_true, y_proba_nomo)
  roc_model <- roc(y_true, model_pred)
  test_result <- roc.test(roc_model, roc_baseline, method = 'delong')
  return(test_result)
}
# One test per ML model, each against the same nomogram baseline.
light_pval = get_pval(y_true, y_proba_nomo, y_proba_light)
svc_pval = get_pval(y_true, y_proba_nomo, y_proba_svc)
knn_pval = get_pval(y_true, y_proba_nomo, y_proba_knn)
nn_pval = get_pval(y_true, y_proba_nomo, y_proba_nn)
stack_pval = get_pval(y_true, y_proba_nomo, y_proba_stack)

In [None]:
%%R -o roc_sig_df
# Helper function to extract elements from roc.test objects.
# Returns a named list of values rounded to 4 decimals. estimate[1] and
# estimate[2] are read as the model and baseline AUCs, matching the argument
# order used in get_pval.
# NOTE(review): conf.int is assumed to be present on the test object (it
# should describe the AUC difference) - confirm for the installed pROC version.
extract_pval_stats <- function(test_obj) {
  list(
    P_Value = round(test_obj$p.value, 4),
    Z_Statistic = round(test_obj$statistic, 4),
    AUC_Model = round(test_obj$estimate[1], 4),
    AUC_Baseline = round(test_obj$estimate[2], 4),
    CI_Lower = round(test_obj$conf.int[1], 4),
    CI_Upper = round(test_obj$conf.int[2], 4)
  )
}

# List of all test objects (order must match the model names assigned below)
pval_objects <- list(light_pval, svc_pval, knn_pval, nn_pval, stack_pval)

# Apply extraction function to all objects
extracted_data <- lapply(pval_objects, extract_pval_stats)

# Convert to data frame (one row per model) and add model names
roc_sig_df <- do.call(rbind, lapply(extracted_data, data.frame))
roc_sig_df$Model <- c("LightGBM", "SVC", "KNN", "Deep Neural Network", "Stack")

# Reorder columns for clarity
roc_sig_df <- roc_sig_df[, c("Model", "P_Value", "Z_Statistic",
                            "AUC_Model", "AUC_Baseline",
                            "CI_Lower", "CI_Upper")]

In [None]:
# roc_sig_df is the table exported from the R cell (via `%%R -o roc_sig_df`);
# show it and write it to Excel without the index column.
display(roc_sig_df)
roc_sig_df.to_excel('../results/tables/roc_sig_table.xlsx', index = False)

# Bin Information

In [None]:
excel_path = '../results/tables/all_models_bin_info.xlsx'
# Start from a clean workbook so the append-mode writes below never collide
# with sheets left over from a previous run.
if os.path.exists(excel_path):
    os.remove(excel_path)

# Test-set positive-class probabilities per model. The nomogram is scored with
# nomo_clf on its own feature matrix; the other models share ml_X_test.
y_scores = {'LR-Nomogram': nomo_clf.predict_proba(nomo_X_test)[:, 1]}
for model_name, model in base_models.items():
    y_scores[model_name] = model.predict_proba(ml_X_test)[:, 1]

# The bin layout is identical for every model, so build it once:
# three equal-width probability bins over [0, 1].
bin_edges = np.linspace(0, 1,  4)
bin_labels = range(len(bin_edges)-1)
# Human-readable ranges: the first bin is closed on the left, later bins are open.
bin_range_labels = [
    f"[{left:.2f}-{right:.2f}]" if i == 0 else f"({left:.2f}-{right:.2f}]"
    for i, (left, right) in enumerate(zip(bin_edges[:-1], bin_edges[1:]))
]

for model_name, y_pred_proba in y_scores.items():
    print(f'--------------------- {model_name} ---------------------')

    ## Put predictions into bins
    df = pd.DataFrame({'y_pred_proba': y_pred_proba, 'y_true': y_test})
    df = df.sort_values(by='y_pred_proba').reset_index(drop=True)
    df['bin'] = pd.cut(df['y_pred_proba'], bins=bin_edges, include_lowest=True, labels=bin_labels)
    df['bin'] = pd.Categorical(df['bin'], categories=bin_labels)

    ## Create table (observed=False keeps bins that received no patients)
    calibration_table = df.groupby('bin', observed=False).agg(
        num_patients=('y_true', 'size'),
        mean_predicted_prob= ('y_pred_proba', 'mean'),
        actual_positive_rate=('y_true', 'mean')
    ).reset_index()

    ## Round, then fill NA for models w/ bins w/o any allocated patients
    for column in ('mean_predicted_prob', 'actual_positive_rate'):
        calibration_table[column] = calibration_table[column].round(3).fillna("None assigned")

    # Assign the appropriate bin range to each row (by bin index)
    calibration_table['bin_range'] = calibration_table['bin'].apply(lambda i: bin_range_labels[i])
    display(calibration_table)

    ## Export: the first model creates the workbook, later ones append a sheet
    with pd.ExcelWriter(excel_path, engine='openpyxl', mode='a' if os.path.exists(excel_path) else 'w') as writer:
        calibration_table.to_excel(writer, sheet_name=model_name, index=False)