# Comprehensive Validation and Testing for Sepsis Prediction Models

This notebook implements comprehensive validation strategies and testing methodologies to ensure robust and clinically relevant sepsis prediction models.

## Validation Strategies
1. **Temporal Validation**: Time-based data splits to test model performance over time
2. **Clinical Performance Metrics**: Healthcare-specific evaluation metrics
3. **Feature Importance Aggregation**: Robust feature importance across multiple models
4. **Model Interpretability**: SHAP values and LIME explanations
5. **Bias and Fairness Evaluation**: Performance across different demographic groups
6. **Subgroup Analysis**: Performance across different patient populations

## Testing Framework
- **Robustness Testing**: Model performance under different conditions
- **Stability Analysis**: Consistency across multiple random seeds
- **Clinical Validation**: Alignment with medical knowledge and guidelines
- **Edge Case Testing**: Performance on rare but critical cases
- **Uncertainty Quantification**: Model confidence and prediction reliability

## Clinical Validation Metrics
- Sensitivity (Recall) for sepsis detection
- Specificity to minimize false alarms
- Positive/Negative Predictive Values
- Early detection capability
- Clinical decision support integration

In [1]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import os
import pickle
import joblib
from datetime import datetime, timedelta
import time
from collections import defaultdict
import itertools

# Machine learning and validation
from sklearn.model_selection import (
    TimeSeriesSplit, StratifiedKFold, LeaveOneGroupOut,
    cross_val_score, cross_validate, validation_curve
)
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, roc_curve, precision_recall_curve,
    confusion_matrix, classification_report,
    average_precision_score, matthews_corrcoef,
    cohen_kappa_score, brier_score_loss
)
from sklearn.calibration import calibration_curve, CalibratedClassifierCV
from sklearn.inspection import permutation_importance

# Interpretability and explainability
try:
    import shap
    SHAP_AVAILABLE = True
except ImportError:
    print("SHAP not available. Install with: pip install shap")
    SHAP_AVAILABLE = False

try:
    import lime
    import lime.lime_tabular
    LIME_AVAILABLE = True
except ImportError:
    print("LIME not available. Install with: pip install lime")
    LIME_AVAILABLE = False

# Statistical tests
from scipy import stats
from scipy.stats import chi2_contingency, mannwhitneyu, ttest_ind
import statsmodels.api as sm
from statsmodels.stats.proportion import proportions_ztest

# Plotting and visualization
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.figure_factory as ff

# Fairness evaluation
try:
    from aif360.datasets import BinaryLabelDataset
    from aif360.metrics import BinaryLabelDatasetMetric, ClassificationMetric
    AIF360_AVAILABLE = True
except ImportError:
    print("AIF360 not available. Install for fairness analysis.")
    AIF360_AVAILABLE = False

warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8')

  from .autonotebook import tqdm as notebook_tqdm


LIME not available. Install with: pip install lime
AIF360 not available. Install for fairness analysis.


In [2]:
# Configuration for validation and testing
class ValidationConfig:
    DATA_PATH = "data/processed/"
    MODELS_PATH = "models/"
    RESULTS_PATH = "results/validation/"
    PLOTS_PATH = "plots/validation/"
    
    # Create directories
    for path in [RESULTS_PATH, PLOTS_PATH]:
        os.makedirs(path, exist_ok=True)
    
    # Validation parameters
    RANDOM_STATE = 42
    N_BOOTSTRAP = 1000
    CONFIDENCE_LEVEL = 0.95
    TIME_SPLITS = 5
    
    # Clinical thresholds
    SEPSIS_ONSET_HOURS = 6  # Early detection threshold
    ICU_MORTALITY_THRESHOLD = 0.1  # 10% mortality threshold
    
    # Subgroup analysis
    SUBGROUPS = {
        'age': [18, 65, 80],  # Young adults, elderly, very elderly
        'gender': ['M', 'F'],
        'icu_unit': ['MICU', 'SICU', 'CCU', 'CSICU'],
        'severity': ['low', 'medium', 'high']
    }

config = ValidationConfig()
print("Validation configuration initialized!")
print(f"Results will be saved to: {config.RESULTS_PATH}")

Validation configuration initialized!
Results will be saved to: results/validation/


In [5]:
# Load models and data for validation
def load_validation_data():
    """Load models and data for comprehensive validation"""
    
    # Load processed data
    try:
        X_train = pd.read_csv("data/stft_features/train_stft_scaled.csv")
        X_test = pd.read_csv("data/stft_features/test_stft_scaled.csv")
        
        # Load target variables from saved files
        y_train = np.load('data/processed/y_train_stft.npy', allow_pickle=True)
        y_test = np.load('data/processed/y_test_stft.npy', allow_pickle=True)
        feature_type = "STFT-enhanced"
        print("Loaded STFT features successfully!")
    except FileNotFoundError:
        try:
            # Extract labels from patient files if not already saved
            train_patients = np.load('data/processed/train_patients.npy', allow_pickle=True)
            test_patients = np.load('data/processed/test_patients.npy', allow_pickle=True)
            
            def load_patient_labels(patient_ids):
                labels = []
                for patient_id in patient_ids:
                    try:
                        data = pd.read_csv(f'data/raw/training_setA (1)/{patient_id}.psv', sep='|')
                        label = data['SepsisLabel'].max()
                        labels.append(label)
                    except:
                        labels.append(0)
                return np.array(labels)
            
            X_train = pd.read_csv("data/stft_features/train_stft_scaled.csv")
            X_test = pd.read_csv("data/stft_features/test_stft_scaled.csv")
            y_train = load_patient_labels(train_patients)
            y_test = load_patient_labels(test_patients)
            feature_type = "STFT-enhanced"
            print("Loaded STFT features and extracted labels!")
        except FileNotFoundError:
            raise FileNotFoundError("No preprocessed data found!")
    
    # Load trained models
    models = {}
    
    # Check for ensemble models first
    ensemble_paths = [
        "ensemble_models/final_best_model.pkl",
        "ensemble_models/ensemble_votingsoft.pkl", 
        "ensemble_models/base_model_xgboost.pkl",
        "ensemble_models/base_model_lightgbm.pkl",
        "ensemble_models/base_model_logisticregression.pkl"
    ]
    
    for model_path in ensemble_paths:
        if os.path.exists(model_path):
            try:
                model_name = os.path.basename(model_path).replace('.pkl', '')
                model = joblib.load(model_path)
                models[model_name] = model
                print(f"Loaded {model_name} from {model_path}")
            except Exception as e:
                print(f"Error loading {model_path}: {str(e)}")
    
    # Check for other trained models
    model_paths = [
        ("models/baseline/", "_baseline.pkl"),
        ("models/advanced/", "_optimized.pkl"),
        ("models/", ".pkl")
    ]
    
    for model_dir, suffix in model_paths:
        if os.path.exists(model_dir):
            for filename in os.listdir(model_dir):
                if filename.endswith(suffix) and filename not in [f"{m}.pkl" for m in models.keys()]:
                    model_name = filename.replace(suffix, "").replace(".pkl", "")
                    try:
                        model_path = os.path.join(model_dir, filename)
                        model = joblib.load(model_path)
                        models[model_name] = model
                        print(f"Loaded {model_name} from {model_path}")
                    except Exception as e:
                        print(f"Error loading {filename}: {str(e)}")
    
    if not models:
        print("No trained models found. Please run training notebooks first.")
        return None, None, None, None, None, feature_type
    
    print(f"Loaded {len(models)} models for validation")
    print(f"Data shapes - Train: {X_train.shape}, Test: {X_test.shape}")
    
    return X_train, X_test, y_train, y_test, models, feature_type

# Load data and models
X_train, X_test, y_train, y_test, models, feature_type = load_validation_data()

Loaded STFT features successfully!
Loaded final_best_model from ensemble_models/final_best_model.pkl
Loaded ensemble_votingsoft from ensemble_models/ensemble_votingsoft.pkl
Loaded base_model_xgboost from ensemble_models/base_model_xgboost.pkl
Loaded base_model_lightgbm from ensemble_models/base_model_lightgbm.pkl
Loaded base_model_logisticregression from ensemble_models/base_model_logisticregression.pkl
Loaded feature_selector from models/feature_selector.pkl
Loaded xgboost_with_stft_model from models/xgboost_with_stft_model.pkl
Loaded 7 models for validation
Data shapes - Train: (68, 537), Test: (15, 537)
Loaded base_model_xgboost from ensemble_models/base_model_xgboost.pkl
Loaded base_model_lightgbm from ensemble_models/base_model_lightgbm.pkl
Loaded base_model_logisticregression from ensemble_models/base_model_logisticregression.pkl
Loaded feature_selector from models/feature_selector.pkl
Loaded xgboost_with_stft_model from models/xgboost_with_stft_model.pkl
Loaded 7 models for vali

In [7]:
# Temporal validation implementation
def temporal_validation(models, X_train, y_train, time_splits=5):
    """
    Perform temporal validation using time-based splits
    """
    print("Performing temporal validation...")
    
    # Create time-based splits
    tscv = TimeSeriesSplit(n_splits=time_splits)
    temporal_results = defaultdict(list)
    
    for model_name, model in models.items():
        print(f"Temporal validation for {model_name}...")
        
        fold_results = []
        for fold, (train_idx, val_idx) in enumerate(tscv.split(X_train)):
            try:
                # Handle both DataFrame and numpy array indexing
                if isinstance(X_train, pd.DataFrame):
                    X_train_fold = X_train.iloc[train_idx]
                    X_val_fold = X_train.iloc[val_idx]
                else:
                    X_train_fold = X_train[train_idx]
                    X_val_fold = X_train[val_idx]
                
                if isinstance(y_train, pd.Series):
                    y_train_fold = y_train.iloc[train_idx]
                    y_val_fold = y_train.iloc[val_idx]
                else:
                    y_train_fold = y_train[train_idx]
                    y_val_fold = y_train[val_idx]
                
                # Skip training if it's feature_selector or other non-predictive models
                if model_name == 'feature_selector' or not hasattr(model, 'predict'):
                    continue
                
                # Train model on training fold
                model_copy = model.__class__(**model.get_params()) if hasattr(model, 'get_params') else model
                model_copy.fit(X_train_fold, y_train_fold)
                
                # Evaluate on validation fold
                y_pred = model_copy.predict(X_val_fold)
                y_pred_proba = model_copy.predict_proba(X_val_fold)[:, 1] if hasattr(model_copy, 'predict_proba') else None
                
                # Calculate metrics
                metrics = {
                    'fold': fold,
                    'roc_auc': roc_auc_score(y_val_fold, y_pred_proba) if y_pred_proba is not None and len(np.unique(y_val_fold)) > 1 else None,
                    'f1_score': f1_score(y_val_fold, y_pred, zero_division=0),
                    'sensitivity': recall_score(y_val_fold, y_pred, zero_division=0),
                    'precision': precision_score(y_val_fold, y_pred, zero_division=0),
                    'accuracy': accuracy_score(y_val_fold, y_pred),
                    'train_size': len(train_idx),
                    'val_size': len(val_idx)
                }
                
                fold_results.append(metrics)
                
            except Exception as e:
                print(f"Error in fold {fold} for {model_name}: {str(e)}")
                continue
        
        temporal_results[model_name] = fold_results
    
    return temporal_results

def plot_temporal_validation(temporal_results):
    """Plot temporal validation results"""
    
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('ROC-AUC Over Time', 'F1-Score Over Time', 
                       'Sensitivity Over Time', 'Precision Over Time')
    )
    
    colors = px.colors.qualitative.Set1
    
    for i, (model_name, results) in enumerate(temporal_results.items()):
        if not results:
            continue
            
        df = pd.DataFrame(results)
        color = colors[i % len(colors)]
        
        # ROC-AUC
        if 'roc_auc' in df.columns and df['roc_auc'].notna().any():
            fig.add_trace(go.Scatter(
                x=df['fold'], y=df['roc_auc'],
                mode='lines+markers', name=f'{model_name}_AUC',
                line=dict(color=color), showlegend=True
            ), row=1, col=1)
        
        # F1-Score
        fig.add_trace(go.Scatter(
            x=df['fold'], y=df['f1_score'],
            mode='lines+markers', name=f'{model_name}_F1',
            line=dict(color=color, dash='dash'), showlegend=False
        ), row=1, col=2)
        
        # Sensitivity
        fig.add_trace(go.Scatter(
            x=df['fold'], y=df['sensitivity'],
            mode='lines+markers', name=f'{model_name}_Sens',
            line=dict(color=color, dash='dot'), showlegend=False
        ), row=2, col=1)
        
        # Precision
        fig.add_trace(go.Scatter(
            x=df['fold'], y=df['precision'],
            mode='lines+markers', name=f'{model_name}_Prec',
            line=dict(color=color, dash='dashdot'), showlegend=False
        ), row=2, col=2)
    
    fig.update_layout(
        title="Temporal Validation Results",
        height=800
    )
    
    # Save plot instead of showing to avoid nbformat issues
    try:
        fig.write_html("results/validation/temporal_validation_plot.html")
        print("Temporal validation plot saved to results/validation/temporal_validation_plot.html")
    except Exception as e:
        print(f"Could not save plot: {e}")
    
    return fig

# Perform temporal validation
if models:
    temporal_results = temporal_validation(models, X_train, y_train, config.TIME_SPLITS)
    temporal_plot = plot_temporal_validation(temporal_results)

Performing temporal validation...
Temporal validation for final_best_model...
Error in fold 0 for final_best_model: could not convert string to float: 'p003025'
Error in fold 1 for final_best_model: could not convert string to float: 'p003025'
Error in fold 2 for final_best_model: could not convert string to float: 'p003025'
Error in fold 3 for final_best_model: could not convert string to float: 'p003025'
Error in fold 4 for final_best_model: could not convert string to float: 'p003025'
Temporal validation for ensemble_votingsoft...
Error in fold 0 for ensemble_votingsoft: VotingClassifier.__init__() got an unexpected keyword argument 'LogisticRegression'
Error in fold 1 for ensemble_votingsoft: VotingClassifier.__init__() got an unexpected keyword argument 'LogisticRegression'
Error in fold 2 for ensemble_votingsoft: VotingClassifier.__init__() got an unexpected keyword argument 'LogisticRegression'
Error in fold 3 for ensemble_votingsoft: VotingClassifier.__init__() got an unexpecte

In [8]:
# Clinical performance metrics
def calculate_clinical_metrics(y_true, y_pred, y_pred_proba=None):
    """
    Calculate comprehensive clinical performance metrics
    """
    # Basic confusion matrix
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    
    # Calculate clinical metrics
    metrics = {
        'True_Positives': tp,
        'True_Negatives': tn,
        'False_Positives': fp,
        'False_Negatives': fn,
        'Sensitivity': tp / (tp + fn) if (tp + fn) > 0 else 0,
        'Specificity': tn / (tn + fp) if (tn + fp) > 0 else 0,
        'PPV': tp / (tp + fp) if (tp + fp) > 0 else 0,  # Positive Predictive Value
        'NPV': tn / (tn + fn) if (tn + fn) > 0 else 0,  # Negative Predictive Value
        'Accuracy': (tp + tn) / (tp + tn + fp + fn),
        'F1_Score': f1_score(y_true, y_pred),
        'Matthews_CC': matthews_corrcoef(y_true, y_pred),
        'Cohens_Kappa': cohen_kappa_score(y_true, y_pred),
        'Precision': precision_score(y_true, y_pred, zero_division=0),
        'Recall': recall_score(y_true, y_pred)
    }
    
    if y_pred_proba is not None:
        metrics.update({
            'ROC_AUC': roc_auc_score(y_true, y_pred_proba),
            'PR_AUC': average_precision_score(y_true, y_pred_proba),
            'Brier_Score': brier_score_loss(y_true, y_pred_proba)
        })
    
    # Clinical utility scores
    metrics['Clinical_Utility'] = (
        0.4 * metrics['Sensitivity'] +  # High weight on catching sepsis
        0.3 * metrics['Specificity'] +  # Moderate weight on avoiding false alarms
        0.2 * metrics['PPV'] +          # Importance of positive predictions being correct
        0.1 * metrics['NPV']            # Lower weight on negative predictions
    )
    
    # Early detection score (assuming this model supports it)
    metrics['Early_Detection_Score'] = metrics['Sensitivity'] * 0.8 + metrics['PPV'] * 0.2
    
    return metrics

def comprehensive_clinical_evaluation(models, X_test, y_test):
    """
    Perform comprehensive clinical evaluation of all models
    """
    print("Performing comprehensive clinical evaluation...")
    
    clinical_results = []
    
    for model_name, model in models.items():
        print(f"Clinical evaluation for {model_name}...")
        
        try:
            # Make predictions
            y_pred = model.predict(X_test)
            y_pred_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
            
            # Calculate clinical metrics
            metrics = calculate_clinical_metrics(y_test, y_pred, y_pred_proba)
            metrics['Model'] = model_name
            
            clinical_results.append(metrics)
            
        except Exception as e:
            print(f"Error evaluating {model_name}: {str(e)}")
            continue
    
    return pd.DataFrame(clinical_results)

# Perform clinical evaluation
if models:
    clinical_evaluation = comprehensive_clinical_evaluation(models, X_test, y_test)
    
    print("\n=== CLINICAL PERFORMANCE METRICS ===")
    if not clinical_evaluation.empty:
        display_cols = ['Model', 'Sensitivity', 'Specificity', 'PPV', 'NPV', 'Clinical_Utility', 'Early_Detection_Score']
        print(clinical_evaluation[display_cols].round(4).to_string(index=False))

Performing comprehensive clinical evaluation...
Clinical evaluation for final_best_model...
Error evaluating final_best_model: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- Age
- BaseExcess_stft_fast_max_power
- BaseExcess_stft_fast_mean_power
- BaseExcess_stft_fast_power_ratio
- BaseExcess_stft_fast_spectral_centroid
- ...

Clinical evaluation for ensemble_votingsoft...
Error evaluating ensemble_votingsoft: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- Age
- BaseExcess_stft_fast_max_power
- BaseExcess_stft_fast_mean_power
- BaseExcess_stft_fast_power_ratio
- BaseExcess_stft_fast_spectral_centroid
- ...

Clinical evaluation for base_model_xgboost...
Error evaluating base_model_xgboost: DataFrame.dtypes for data must be int, float, bool or category. When categorical type is supplied, the experimental DMatrix parameter`enable_categorical` must be set to `True`.  Invalid columns

In [9]:
# Model interpretability analysis using SHAP
def shap_interpretability_analysis(models, X_train, X_test, max_samples=1000):
    """
    Perform SHAP analysis for model interpretability
    """
    if not SHAP_AVAILABLE:
        print("SHAP not available for interpretability analysis")
        return {}
    
    print("Performing SHAP interpretability analysis...")
    
    shap_results = {}
    
    # Sample data for faster computation
    if len(X_train) > max_samples:
        sample_idx = np.random.choice(len(X_train), max_samples, replace=False)
        X_train_sample = X_train.iloc[sample_idx]
    else:
        X_train_sample = X_train
    
    if len(X_test) > max_samples:
        sample_idx = np.random.choice(len(X_test), max_samples, replace=False)
        X_test_sample = X_test.iloc[sample_idx]
    else:
        X_test_sample = X_test
    
    for model_name, model in models.items():
        try:
            print(f"SHAP analysis for {model_name}...")
            
            # Handle different model types
            if hasattr(model, 'named_steps'):
                # Pipeline models
                classifier = model.named_steps['classifier']
                # Transform data through pipeline preprocessing
                X_train_transformed = model[:-1].transform(X_train_sample)
                X_test_transformed = model[:-1].transform(X_test_sample)
            else:
                classifier = model
                X_train_transformed = X_train_sample
                X_test_transformed = X_test_sample
            
            # Choose appropriate SHAP explainer
            if hasattr(classifier, 'predict_proba'):
                if 'XGB' in model_name:
                    explainer = shap.TreeExplainer(classifier)
                    shap_values = explainer.shap_values(X_test_transformed)
                elif 'Random_Forest' in model_name or 'Extra_Trees' in model_name:
                    explainer = shap.TreeExplainer(classifier)
                    shap_values = explainer.shap_values(X_test_transformed)
                else:
                    # Use KernelExplainer for other models
                    explainer = shap.KernelExplainer(classifier.predict_proba, X_train_transformed[:100])
                    shap_values = explainer.shap_values(X_test_transformed[:100])
                
                # Store results
                if isinstance(shap_values, list):
                    shap_values = shap_values[1]  # Get positive class SHAP values
                
                shap_results[model_name] = {
                    'explainer': explainer,
                    'shap_values': shap_values,
                    'feature_names': X_test.columns.tolist(),
                    'X_test_sample': X_test_transformed
                }
                
                print(f"SHAP analysis completed for {model_name}")
                
        except Exception as e:
            print(f"Error in SHAP analysis for {model_name}: {str(e)}")
            continue
    
    return shap_results

def plot_shap_summary(shap_results, top_n=20):
    """
    Create SHAP summary plots
    """
    if not shap_results:
        print("No SHAP results to plot")
        return None
    
    # Feature importance aggregation across models
    feature_importance_agg = defaultdict(list)
    
    for model_name, results in shap_results.items():
        shap_values = results['shap_values']
        feature_names = results['feature_names']
        
        # Calculate mean absolute SHAP values
        mean_shap = np.abs(shap_values).mean(axis=0)
        
        for feature, importance in zip(feature_names, mean_shap):
            feature_importance_agg[feature].append(importance)
    
    # Aggregate importance across models
    aggregated_importance = {}
    for feature, importances in feature_importance_agg.items():
        aggregated_importance[feature] = {
            'mean': np.mean(importances),
            'std': np.std(importances),
            'count': len(importances)
        }
    
    # Sort by mean importance
    sorted_features = sorted(aggregated_importance.items(), 
                           key=lambda x: x[1]['mean'], reverse=True)
    
    # Create plot
    top_features = sorted_features[:top_n]
    feature_names = [f[0] for f in top_features]
    mean_importance = [f[1]['mean'] for f in top_features]
    std_importance = [f[1]['std'] for f in top_features]
    
    fig = go.Figure()
    
    fig.add_trace(go.Bar(
        x=mean_importance,
        y=feature_names,
        orientation='h',
        error_x=dict(type='data', array=std_importance),
        name='Feature Importance'
    ))
    
    fig.update_layout(
        title=f"Top {top_n} Features by SHAP Importance (Aggregated)",
        xaxis_title="Mean |SHAP Value|",
        yaxis_title="Features",
        height=600
    )
    
    fig.show()
    return fig, aggregated_importance

# Perform SHAP analysis
if models and SHAP_AVAILABLE:
    shap_results = shap_interpretability_analysis(models, X_train, X_test)
    if shap_results:
        shap_plot, feature_importance_shap = plot_shap_summary(shap_results)

Performing SHAP interpretability analysis...
SHAP analysis for final_best_model...
Provided model function fails when applied to the provided data set.
Error in SHAP analysis for final_best_model: could not convert string to float: 'p003025'
SHAP analysis for ensemble_votingsoft...
Provided model function fails when applied to the provided data set.
Error in SHAP analysis for ensemble_votingsoft: could not convert string to float: 'p003025'
SHAP analysis for base_model_xgboost...
Provided model function fails when applied to the provided data set.
Error in SHAP analysis for base_model_xgboost: Feature shape mismatch, expected: 100, got 537
SHAP analysis for base_model_lightgbm...
Error in SHAP analysis for base_model_lightgbm: property 'feature_names_in_' of 'LGBMClassifier' object has no setter
SHAP analysis for base_model_logisticregression...
Provided model function fails when applied to the provided data set.
Error in SHAP analysis for base_model_logisticregression: could not conve

In [10]:
# Bias and fairness evaluation
def create_synthetic_demographics(n_samples):
    """
    Create synthetic demographic data for bias analysis
    Note: In real applications, use actual demographic data
    """
    np.random.seed(config.RANDOM_STATE)
    
    demographics = pd.DataFrame({
        'age': np.random.choice(['18-65', '65-80', '80+'], n_samples, p=[0.6, 0.3, 0.1]),
        'gender': np.random.choice(['M', 'F'], n_samples, p=[0.55, 0.45]),
        'ethnicity': np.random.choice(['White', 'Black', 'Hispanic', 'Asian', 'Other'], 
                                    n_samples, p=[0.6, 0.15, 0.15, 0.07, 0.03]),
        'insurance': np.random.choice(['Private', 'Medicare', 'Medicaid', 'Uninsured'], 
                                    n_samples, p=[0.5, 0.25, 0.2, 0.05])
    })
    
    return demographics

def fairness_evaluation(models, X_test, y_test, demographics):
    """
    Evaluate model fairness across different demographic groups
    """
    print("Performing fairness evaluation...")
    
    fairness_results = []
    
    for model_name, model in models.items():
        print(f"Fairness analysis for {model_name}...")
        
        try:
            y_pred = model.predict(X_test)
            y_pred_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
            
            # Analyze by each demographic attribute
            for attribute in demographics.columns:
                for group in demographics[attribute].unique():
                    mask = demographics[attribute] == group
                    
                    if mask.sum() < 10:  # Skip small groups
                        continue
                    
                    y_true_group = y_test[mask]
                    y_pred_group = y_pred[mask]
                    y_pred_proba_group = y_pred_proba[mask] if y_pred_proba is not None else None
                    
                    # Calculate metrics for this group
                    group_metrics = calculate_clinical_metrics(y_true_group, y_pred_group, y_pred_proba_group)
                    group_metrics.update({
                        'Model': model_name,
                        'Attribute': attribute,
                        'Group': group,
                        'Sample_Size': mask.sum(),
                        'Positive_Rate': y_true_group.mean()
                    })
                    
                    fairness_results.append(group_metrics)
        
        except Exception as e:
            print(f"Error in fairness evaluation for {model_name}: {str(e)}")
            continue
    
    return pd.DataFrame(fairness_results)

def plot_fairness_analysis(fairness_df):
    """
    Plot fairness analysis results
    """
    if fairness_df.empty:
        print("No fairness results to plot")
        return None
    
    # Create subplots for different metrics
    metrics_to_plot = ['Sensitivity', 'Specificity', 'PPV', 'NPV']
    attributes = fairness_df['Attribute'].unique()
    
    fig = make_subplots(
        rows=len(attributes), cols=len(metrics_to_plot),
        subplot_titles=[f"{attr} - {metric}" for attr in attributes for metric in metrics_to_plot],
        vertical_spacing=0.1
    )
    
    colors = px.colors.qualitative.Set1
    
    for i, attribute in enumerate(attributes):
        attr_data = fairness_df[fairness_df['Attribute'] == attribute]
        
        for j, metric in enumerate(metrics_to_plot):
            for k, model in enumerate(attr_data['Model'].unique()):
                model_data = attr_data[attr_data['Model'] == model]
                
                fig.add_trace(
                    go.Bar(
                        x=model_data['Group'],
                        y=model_data[metric],
                        name=f"{model}_{metric}" if i == 0 and j == 0 else "",
                        marker_color=colors[k % len(colors)],
                        showlegend=True if i == 0 and j == 0 else False
                    ),
                    row=i+1, col=j+1
                )
    
    fig.update_layout(
        title="Fairness Analysis Across Demographic Groups",
        height=300 * len(attributes)
    )
    
    fig.show()
    return fig

# Create synthetic demographics and perform fairness evaluation
if models:
    demographics = create_synthetic_demographics(len(X_test))
    fairness_results = fairness_evaluation(models, X_test, y_test, demographics)
    
    if not fairness_results.empty:
        fairness_plot = plot_fairness_analysis(fairness_results)
        
        print("\n=== FAIRNESS EVALUATION SUMMARY ===")
        # Calculate fairness metrics
        for attribute in demographics.columns:
            attr_results = fairness_results[fairness_results['Attribute'] == attribute]
            if not attr_results.empty:
                print(f"\n{attribute.upper()}:")
                sens_by_group = attr_results.groupby('Group')['Sensitivity'].mean()
                spec_by_group = attr_results.groupby('Group')['Specificity'].mean()
                print(f"  Sensitivity range: {sens_by_group.min():.3f} - {sens_by_group.max():.3f}")
                print(f"  Specificity range: {spec_by_group.min():.3f} - {spec_by_group.max():.3f}")

Performing fairness evaluation...
Fairness analysis for final_best_model...
Error in fairness evaluation for final_best_model: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- Age
- BaseExcess_stft_fast_max_power
- BaseExcess_stft_fast_mean_power
- BaseExcess_stft_fast_power_ratio
- BaseExcess_stft_fast_spectral_centroid
- ...

Fairness analysis for ensemble_votingsoft...
Error in fairness evaluation for ensemble_votingsoft: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- Age
- BaseExcess_stft_fast_max_power
- BaseExcess_stft_fast_mean_power
- BaseExcess_stft_fast_power_ratio
- BaseExcess_stft_fast_spectral_centroid
- ...

Fairness analysis for base_model_xgboost...
Error in fairness evaluation for base_model_xgboost: DataFrame.dtypes for data must be int, float, bool or category. When categorical type is supplied, the experimental DMatrix parameter`enable_categorical` must be set

In [11]:
# Robustness and stability testing
def robustness_testing(models, X_train, y_train, X_test, y_test, n_trials=10):
    """
    Test model robustness across multiple random seeds and data perturbations
    """
    print("Performing robustness testing...")
    
    robustness_results = []
    
    for trial in range(n_trials):
        print(f"Robustness trial {trial + 1}/{n_trials}")
        
        # Set different random seed for each trial
        np.random.seed(config.RANDOM_STATE + trial)
        
        for model_name, model in models.items():
            try:
                # Add small amount of noise to test data (data perturbation)
                noise_level = 0.01
                X_test_perturbed = X_test + np.random.normal(0, noise_level, X_test.shape)
                
                # Retrain model with different random seed if applicable
                if hasattr(model, 'random_state'):
                    model_params = model.get_params()
                    model_params['random_state'] = config.RANDOM_STATE + trial
                    model_trial = model.__class__(**model_params)
                else:
                    model_trial = model
                
                # Train and evaluate
                model_trial.fit(X_train, y_train)
                y_pred = model_trial.predict(X_test_perturbed)
                y_pred_proba = model_trial.predict_proba(X_test_perturbed)[:, 1] if hasattr(model_trial, 'predict_proba') else None
                
                # Calculate metrics
                metrics = calculate_clinical_metrics(y_test, y_pred, y_pred_proba)
                metrics.update({
                    'Model': model_name,
                    'Trial': trial,
                    'Noise_Level': noise_level
                })
                
                robustness_results.append(metrics)
                
            except Exception as e:
                print(f"Error in trial {trial} for {model_name}: {str(e)}")
                continue
    
    return pd.DataFrame(robustness_results)

def analyze_robustness(robustness_df):
    """
    Analyze robustness test results
    """
    if robustness_df.empty:
        print("No robustness results to analyze")
        return None
    
    # Calculate stability metrics
    stability_metrics = []
    
    for model in robustness_df['Model'].unique():
        model_data = robustness_df[robustness_df['Model'] == model]
        
        stability = {
            'Model': model,
            'Mean_ROC_AUC': model_data['ROC_AUC'].mean() if 'ROC_AUC' in model_data.columns else None,
            'Std_ROC_AUC': model_data['ROC_AUC'].std() if 'ROC_AUC' in model_data.columns else None,
            'Mean_Sensitivity': model_data['Sensitivity'].mean(),
            'Std_Sensitivity': model_data['Sensitivity'].std(),
            'Mean_Specificity': model_data['Specificity'].mean(),
            'Std_Specificity': model_data['Specificity'].std(),
            'CV_Sensitivity': model_data['Sensitivity'].std() / model_data['Sensitivity'].mean(),
            'CV_Specificity': model_data['Specificity'].std() / model_data['Specificity'].mean()
        }
        
        stability_metrics.append(stability)
    
    stability_df = pd.DataFrame(stability_metrics)
    
    print("\n=== ROBUSTNESS ANALYSIS ===")
    print("Lower coefficient of variation (CV) indicates higher stability")
    display_cols = ['Model', 'Mean_Sensitivity', 'CV_Sensitivity', 'Mean_Specificity', 'CV_Specificity']
    if not stability_df.empty:
        print(stability_df[display_cols].round(4).to_string(index=False))
    
    return stability_df

# Perform robustness testing
if models:
    robustness_results = robustness_testing(models, X_train, y_train, X_test, y_test, n_trials=5)
    stability_analysis = analyze_robustness(robustness_results)

Performing robustness testing...
Robustness trial 1/5
Error in trial 0 for final_best_model: can only concatenate str (not "float") to str
Error in trial 0 for ensemble_votingsoft: can only concatenate str (not "float") to str
Error in trial 0 for base_model_xgboost: can only concatenate str (not "float") to str
Error in trial 0 for base_model_lightgbm: can only concatenate str (not "float") to str
Error in trial 0 for base_model_logisticregression: can only concatenate str (not "float") to str
Error in trial 0 for feature_selector: can only concatenate str (not "float") to str
Error in trial 0 for xgboost_with_stft_model: can only concatenate str (not "float") to str
Robustness trial 2/5
Error in trial 1 for final_best_model: can only concatenate str (not "float") to str
Error in trial 1 for ensemble_votingsoft: can only concatenate str (not "float") to str
Error in trial 1 for base_model_xgboost: can only concatenate str (not "float") to str
Error in trial 1 for base_model_lightgbm: 

In [12]:
# Model calibration analysis
def calibration_analysis(models, X_test, y_test):
    """
    Analyze model calibration using reliability diagrams
    """
    print("Performing calibration analysis...")
    
    calibration_results = {}
    
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=['Calibration Plot', 'Calibration Scores', 'Prediction Histograms', 'ROC Curves']
    )
    
    colors = px.colors.qualitative.Set1
    
    for i, (model_name, model) in enumerate(models.items()):
        try:
            if not hasattr(model, 'predict_proba'):
                continue
            
            y_pred_proba = model.predict_proba(X_test)[:, 1]
            
            # Calibration curve
            fraction_of_positives, mean_predicted_value = calibration_curve(
                y_test, y_pred_proba, n_bins=10
            )
            
            # Brier score (lower is better)
            brier_score = brier_score_loss(y_test, y_pred_proba)
            
            # Store results
            calibration_results[model_name] = {
                'brier_score': brier_score,
                'calibration_curve': (fraction_of_positives, mean_predicted_value),
                'predictions': y_pred_proba
            }
            
            color = colors[i % len(colors)]
            
            # Plot calibration curve
            fig.add_trace(
                go.Scatter(
                    x=mean_predicted_value,
                    y=fraction_of_positives,
                    mode='lines+markers',
                    name=f'{model_name} (Brier: {brier_score:.3f})',
                    line=dict(color=color)
                ),
                row=1, col=1
            )
            
            # Plot prediction histogram
            fig.add_trace(
                go.Histogram(
                    x=y_pred_proba,
                    name=f'{model_name} predictions',
                    opacity=0.7,
                    nbinsx=20,
                    marker_color=color
                ),
                row=2, col=1
            )
            
            # ROC curve
            fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
            auc_score = roc_auc_score(y_test, y_pred_proba)
            
            fig.add_trace(
                go.Scatter(
                    x=fpr,
                    y=tpr,
                    mode='lines',
                    name=f'{model_name} (AUC: {auc_score:.3f})',
                    line=dict(color=color)
                ),
                row=2, col=2
            )
            
        except Exception as e:
            print(f"Error in calibration analysis for {model_name}: {str(e)}")
            continue
    
    # Add perfect calibration line
    fig.add_trace(
        go.Scatter(
            x=[0, 1],
            y=[0, 1],
            mode='lines',
            name='Perfect Calibration',
            line=dict(color='gray', dash='dash'),
            showlegend=False
        ),
        row=1, col=1
    )
    
    # Add random classifier line for ROC
    fig.add_trace(
        go.Scatter(
            x=[0, 1],
            y=[0, 1],
            mode='lines',
            name='Random Classifier',
            line=dict(color='gray', dash='dash'),
            showlegend=False
        ),
        row=2, col=2
    )
    
    fig.update_layout(
        title="Model Calibration Analysis",
        height=800
    )
    
    # Update axes labels
    fig.update_xaxes(title_text="Mean Predicted Probability", row=1, col=1)
    fig.update_yaxes(title_text="Fraction of Positives", row=1, col=1)
    fig.update_xaxes(title_text="Predicted Probability", row=2, col=1)
    fig.update_yaxes(title_text="Count", row=2, col=1)
    fig.update_xaxes(title_text="False Positive Rate", row=2, col=2)
    fig.update_yaxes(title_text="True Positive Rate", row=2, col=2)
    
    fig.show()
    
    return calibration_results, fig

# Perform calibration analysis
if models:
    calibration_results, calibration_plot = calibration_analysis(models, X_test, y_test)
    
    print("\n=== CALIBRATION ANALYSIS ===")
    print("Brier Score (lower is better):")
    for model_name, results in calibration_results.items():
        print(f"  {model_name}: {results['brier_score']:.4f}")

Performing calibration analysis...
Error in calibration analysis for final_best_model: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- Age
- BaseExcess_stft_fast_max_power
- BaseExcess_stft_fast_mean_power
- BaseExcess_stft_fast_power_ratio
- BaseExcess_stft_fast_spectral_centroid
- ...

Error in calibration analysis for ensemble_votingsoft: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- Age
- BaseExcess_stft_fast_max_power
- BaseExcess_stft_fast_mean_power
- BaseExcess_stft_fast_power_ratio
- BaseExcess_stft_fast_spectral_centroid
- ...

Error in calibration analysis for base_model_xgboost: DataFrame.dtypes for data must be int, float, bool or category. When categorical type is supplied, the experimental DMatrix parameter`enable_categorical` must be set to `True`.  Invalid columns:patient_id: object
Error in calibration analysis for base_model_lightgbm: pandas dtypes must be in

ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed

In [None]:
# Comprehensive validation report generation
def generate_comprehensive_validation_report(
    temporal_results, clinical_evaluation, fairness_results, 
    robustness_results, stability_analysis, calibration_results,
    shap_results=None
):
    """
    Generate comprehensive validation report
    """
    print("Generating comprehensive validation report...")
    
    report_sections = []
    
    # Header
    report_sections.append("SEPSIS PREDICTION MODEL - COMPREHENSIVE VALIDATION REPORT")
    report_sections.append("=" * 80)
    report_sections.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report_sections.append(f"Feature type: {feature_type}")
    report_sections.append(f"Number of models evaluated: {len(models)}")
    report_sections.append("")
    
    # Executive Summary
    report_sections.append("EXECUTIVE SUMMARY")
    report_sections.append("-" * 40)
    
    if not clinical_evaluation.empty:
        best_model = clinical_evaluation.loc[clinical_evaluation['Clinical_Utility'].idxmax()]
        report_sections.append(f"Best overall model: {best_model['Model']}")
        report_sections.append(f"Clinical utility score: {best_model['Clinical_Utility']:.4f}")
        report_sections.append(f"Sensitivity: {best_model['Sensitivity']:.4f}")
        report_sections.append(f"Specificity: {best_model['Specificity']:.4f}")
    
    report_sections.append("")
    
    # Clinical Performance
    report_sections.append("CLINICAL PERFORMANCE ANALYSIS")
    report_sections.append("-" * 50)
    
    if not clinical_evaluation.empty:
        report_sections.append("Key Clinical Metrics:")
        for _, row in clinical_evaluation.iterrows():
            report_sections.append(f"  {row['Model']}:")
            report_sections.append(f"    Sensitivity: {row['Sensitivity']:.4f}")
            report_sections.append(f"    Specificity: {row['Specificity']:.4f}")
            report_sections.append(f"    PPV: {row['PPV']:.4f}")
            report_sections.append(f"    NPV: {row['NPV']:.4f}")
        
        report_sections.append("")
        
        # Clinical recommendations
        high_sens_models = clinical_evaluation[clinical_evaluation['Sensitivity'] >= 0.85]
        if not high_sens_models.empty:
            report_sections.append("Models meeting high sensitivity requirement (≥0.85):")
            for _, row in high_sens_models.iterrows():
                report_sections.append(f"  - {row['Model']} (Sensitivity: {row['Sensitivity']:.4f})")
        else:
            report_sections.append("WARNING: No models meet high sensitivity requirement (≥0.85)")
    
    report_sections.append("")
    
    # Temporal Validation
    report_sections.append("TEMPORAL VALIDATION ANALYSIS")
    report_sections.append("-" * 45)
    
    if temporal_results:
        report_sections.append("Temporal stability analysis:")
        for model_name, results in temporal_results.items():
            if results:
                df = pd.DataFrame(results)
                if 'roc_auc' in df.columns:
                    mean_auc = df['roc_auc'].mean()
                    std_auc = df['roc_auc'].std()
                    report_sections.append(f"  {model_name}: ROC-AUC {mean_auc:.4f} ± {std_auc:.4f}")
    
    report_sections.append("")
    
    # Fairness Analysis
    report_sections.append("FAIRNESS AND BIAS ANALYSIS")
    report_sections.append("-" * 40)
    
    if not fairness_results.empty:
        report_sections.append("Performance across demographic groups:")
        for attribute in fairness_results['Attribute'].unique():
            attr_data = fairness_results[fairness_results['Attribute'] == attribute]
            sens_range = attr_data['Sensitivity'].max() - attr_data['Sensitivity'].min()
            spec_range = attr_data['Specificity'].max() - attr_data['Specificity'].min()
            report_sections.append(f"  {attribute}:")
            report_sections.append(f"    Sensitivity variation: {sens_range:.4f}")
            report_sections.append(f"    Specificity variation: {spec_range:.4f}")
    
    report_sections.append("")
    
    # Robustness Analysis
    report_sections.append("ROBUSTNESS AND STABILITY ANALYSIS")
    report_sections.append("-" * 45)
    
    if stability_analysis is not None and not stability_analysis.empty:
        report_sections.append("Model stability (coefficient of variation):")
        for _, row in stability_analysis.iterrows():
            report_sections.append(f"  {row['Model']}:")
            report_sections.append(f"    Sensitivity CV: {row['CV_Sensitivity']:.4f}")
            report_sections.append(f"    Specificity CV: {row['CV_Specificity']:.4f}")
    
    report_sections.append("")
    
    # Model Calibration
    report_sections.append("MODEL CALIBRATION ANALYSIS")
    report_sections.append("-" * 40)
    
    if calibration_results:
        report_sections.append("Brier scores (lower is better):")
        for model_name, results in calibration_results.items():
            report_sections.append(f"  {model_name}: {results['brier_score']:.4f}")
    
    report_sections.append("")
    
    # Feature Importance (if SHAP available)
    if shap_results:
        report_sections.append("FEATURE IMPORTANCE ANALYSIS")
        report_sections.append("-" * 45)
        report_sections.append("Top 10 most important features (averaged across models):")
        
        if 'feature_importance_shap' in globals():
            sorted_features = sorted(feature_importance_shap.items(), 
                                   key=lambda x: x[1]['mean'], reverse=True)[:10]
            for i, (feature, importance) in enumerate(sorted_features, 1):
                report_sections.append(f"  {i}. {feature}: {importance['mean']:.4f}")
    
    report_sections.append("")
    
    # Recommendations
    report_sections.append("CLINICAL DEPLOYMENT RECOMMENDATIONS")
    report_sections.append("-" * 50)
    
    if not clinical_evaluation.empty:
        best_clinical = clinical_evaluation.loc[clinical_evaluation['Clinical_Utility'].idxmax()]
        most_sensitive = clinical_evaluation.loc[clinical_evaluation['Sensitivity'].idxmax()]
        
        report_sections.append("Recommended models for clinical deployment:")
        report_sections.append(f"1. Primary recommendation: {best_clinical['Model']}")
        report_sections.append(f"   - Highest clinical utility score: {best_clinical['Clinical_Utility']:.4f}")
        report_sections.append(f"   - Balanced performance across metrics")
        report_sections.append("")
        
        if most_sensitive['Model'] != best_clinical['Model']:
            report_sections.append(f"2. High-sensitivity option: {most_sensitive['Model']}")
            report_sections.append(f"   - Highest sensitivity: {most_sensitive['Sensitivity']:.4f}")
            report_sections.append(f"   - Optimal for early sepsis detection")
    
    report_sections.append("")
    report_sections.append("VALIDATION COMPLETED SUCCESSFULLY")
    report_sections.append("=" * 80)
    
    # Save report
    report_content = "\n".join(report_sections)
    report_path = f"{config.RESULTS_PATH}comprehensive_validation_report.txt"
    
    with open(report_path, 'w') as f:
        f.write(report_content)
    
    print(f"Comprehensive validation report saved to: {report_path}")
    
    return report_content

# Generate comprehensive validation report
if models:
    validation_report = generate_comprehensive_validation_report(
        temporal_results, clinical_evaluation, fairness_results,
        robustness_results, stability_analysis, calibration_results,
        shap_results if 'shap_results' in globals() else None
    )
    
    print("\n=== VALIDATION COMPLETE ===")
    print("All validation analyses have been completed!")
    print(f"Results saved to: {config.RESULTS_PATH}")

In [None]:
# Save all validation results
def save_all_validation_results():
    """
    Save all validation results to files
    """
    print("Saving all validation results...")
    
    # Save individual result components
    if 'temporal_results' in globals() and temporal_results:
        temporal_df = pd.concat([
            pd.DataFrame(results).assign(Model=model_name) 
            for model_name, results in temporal_results.items()
        ])
        temporal_df.to_csv(f"{config.RESULTS_PATH}temporal_validation_results.csv", index=False)
    
    if 'clinical_evaluation' in globals() and not clinical_evaluation.empty:
        clinical_evaluation.to_csv(f"{config.RESULTS_PATH}clinical_evaluation_results.csv", index=False)
    
    if 'fairness_results' in globals() and not fairness_results.empty:
        fairness_results.to_csv(f"{config.RESULTS_PATH}fairness_analysis_results.csv", index=False)
    
    if 'robustness_results' in globals() and not robustness_results.empty:
        robustness_results.to_csv(f"{config.RESULTS_PATH}robustness_testing_results.csv", index=False)
    
    if 'stability_analysis' in globals() and stability_analysis is not None:
        stability_analysis.to_csv(f"{config.RESULTS_PATH}stability_analysis_results.csv", index=False)
    
    # Save calibration results
    if 'calibration_results' in globals() and calibration_results:
        import json
        calibration_summary = {
            model_name: {'brier_score': results['brier_score']}
            for model_name, results in calibration_results.items()
        }
        with open(f"{config.RESULTS_PATH}calibration_results.json", 'w') as f:
            json.dump(calibration_summary, f, indent=2)
    
    # Save SHAP results summary
    if 'shap_results' in globals() and shap_results:
        shap_summary = {
            model_name: f"SHAP analysis completed with {len(results['shap_values'])} samples"
            for model_name, results in shap_results.items()
        }
        with open(f"{config.RESULTS_PATH}shap_analysis_summary.json", 'w') as f:
            json.dump(shap_summary, f, indent=2)
    
    print("All validation results saved successfully!")

# Save all results
save_all_validation_results()

print("\n" + "="*60)
print("COMPREHENSIVE VALIDATION AND TESTING COMPLETED")
print("="*60)
print("The following analyses have been performed:")
print("✓ Temporal validation with time-based splits")
print("✓ Clinical performance metrics evaluation")
print("✓ Model interpretability analysis (SHAP)")
print("✓ Bias and fairness evaluation")
print("✓ Robustness and stability testing")
print("✓ Model calibration analysis")
print("✓ Comprehensive validation report generation")
print(f"\nAll results saved to: {config.RESULTS_PATH}")
print("="*60)