In [None]:
# -*- coding: utf-8 -*-
"""
Ordinal Deep Learning Model for T2D-Dementia Fluctuating_Decreasing Cluster
(NOTE: DATA_PATH and the output paths below currently point at the Gradually_Increasing cluster)
- Uses grid search to find optimal architecture and hyperparameters
- Applies SMOTE oversampling to handle class imbalance
- Implements custom ordinal regression loss with monotonic threshold constraints
- Evaluates using classification report and MAE
"""

import os
import tensorflow as tf
import shap
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import SMOTE
from sklearn.metrics import classification_report, mean_absolute_error
from scipy.stats import bootstrap
import itertools

# ============================= Configuration =============================
DATA_PATH = "./data/extracted_Gradually_Increasing.csv" # (example uses GI dataset)
RANDOM_STATE = 42
BATCH_SIZE = 64
EPOCHS = 30
LEARNING_RATE = 1e-5

# Seed the global NumPy / TensorFlow generators so weight initialisation,
# dropout and any later np.random-based step repeat on Restart & Run All.
np.random.seed(RANDOM_STATE)
tf.random.set_seed(RANDOM_STATE)

# ============================= 1. Data Preparation =============================
print("\n1. Loading and preprocessing data...")

# Load dataset. Columns are addressed by position: column 1 is the target,
# columns 2-8 are the covariates and columns 9-75 are the protein features.
df = pd.read_csv(DATA_PATH)
protein_names = df.columns[9:76].tolist()
covariates_names = df.columns[2:9].tolist()
num_proteins = len(protein_names)

# Extract raw features and target. Proteins are stacked first, so the first
# `num_proteins` columns of every feature matrix below are the proteins.
y = df.iloc[:, 1].values.astype(np.int32)
X_protein = df.iloc[:, 9:76].values.astype(np.float32)
X_covariates = df.iloc[:, 2:9].values.astype(np.float32)
X_raw = np.hstack([X_protein, X_covariates])

# Train/test split comes FIRST: imputation means and scaling statistics must
# be estimated from training rows only, otherwise test information leaks in.
X_train_raw, X_test_raw, y_train_real, y_test = train_test_split(
    X_raw, y, test_size=0.2, random_state=RANDOM_STATE
)

# Mean imputation and standardisation are both column-wise, so one imputer
# and one scaler over the stacked matrix are equivalent to separate ones.
imputer = SimpleImputer(strategy='mean')
scaler = StandardScaler()
X_train_real = scaler.fit_transform(imputer.fit_transform(X_train_raw))
X_test = scaler.transform(imputer.transform(X_test_raw))

# Full preprocessed matrix (train-fitted transforms), kept under the original
# names for any downstream cell that still refers to them.
X_combined = scaler.transform(imputer.transform(X_raw))
X_protein_scaled = X_combined[:, :num_proteins]
X_covariates_scaled = X_combined[:, num_proteins:]

# Hold out a validation set of REAL samples for the grid search before any
# oversampling, so no synthetic point in sub-train is interpolated from a
# validation neighbour.
X_subtrain_real, X_val, y_subtrain_real, y_val = train_test_split(
    X_train_real, y_train_real, test_size=0.3, random_state=RANDOM_STATE
)

# Apply SMOTE oversampling to training partitions only.
# NOTE: SMOTE needs more samples per class than its k_neighbors setting; a very
# rare class in the sub-train partition will make fit_resample raise.
sm = SMOTE(random_state=RANDOM_STATE)
X_subtrain, y_subtrain = sm.fit_resample(X_subtrain_real, y_subtrain_real)
X_train, y_train = sm.fit_resample(X_train_real, y_train_real)

# Define grid search space
layer_configs = [[1024, 512, 256, 128], [512, 256, 128], [256, 128]]
dropout_rates = [0.2, 0.3, 0.4]
l2_lambdas = [0.001, 0.01, 0.1]
learning_rates = [1e-5, 5e-5, 1e-4]

# Custom monotonic constraint for thresholds
class MonotonicConstraint(tf.keras.constraints.Constraint):
    """Project a 1-D threshold vector onto the set of strictly increasing vectors.

    The first threshold is kept as-is; every following threshold is pushed up,
    if necessary, so that it exceeds its predecessor by at least `min_gap`.

    Unlike `cumsum(elu(w) + eps)`, this mapping
      * always yields an increasing vector (ELU can be negative, so its
        cumulative sum is not guaranteed to increase), and
      * is idempotent: a vector that already satisfies the constraint is
        returned unchanged, so re-applying it after every optimizer step does
        not keep re-accumulating the thresholds.

    Args:
        min_gap: Minimum difference enforced between consecutive thresholds.
    """

    def __init__(self, min_gap=1e-6):
        self.min_gap = min_gap

    def __call__(self, w):
        first = w[:1]
        # Consecutive differences, floored at min_gap so order is preserved.
        gaps = tf.maximum(w[1:] - w[:-1], self.min_gap)
        return tf.concat([first, first + tf.cumsum(gaps)], axis=0)

    def get_config(self):
        return {"min_gap": self.min_gap}

# Model builder with configurable hyperparameters
def build_model(input_dim, num_classes, layer_sizes, dropout_rate, l2_lambda, learning_rate):
    """Build and compile a dense ordinal-regression model.

    Args:
        input_dim: Number of input features. It is forwarded to the model
            constructor but not otherwise used inside this function.
        num_classes: Number of ordinal classes; the network emits
            `num_classes - 1` logits, one per class boundary.
        layer_sizes: Iterable of hidden-layer widths. One ReLU `Dense` layer
            followed by one `Dropout` layer is added per entry.
        dropout_rate: Rate passed to every `Dropout` layer.
        l2_lambda: L2 kernel-regularisation factor used by every `Dense`
            layer, including the output layer.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        A compiled `DeepOrdinal` model (Adam with `clipvalue=1.0`, the model's
        own `custom_loss`, and the `'accuracy'` metric).
    """
    class DeepOrdinal(tf.keras.Model):
        """Dense backbone producing `num_classes - 1` logits plus a trainable threshold vector."""
        def __init__(self, input_dim, num_classes):
            super().__init__()
            self.num_classes = num_classes
            
            # Feature extraction backbone
            # (layer_sizes, dropout_rate and l2_lambda come from the enclosing build_model call)
            self.dense_stack = tf.keras.Sequential()
            for size in layer_sizes:
                self.dense_stack.add(tf.keras.layers.Dense(
                    size, activation='relu',
                    kernel_regularizer=tf.keras.regularizers.l2(l2_lambda)
                ))
                self.dense_stack.add(tf.keras.layers.Dropout(dropout_rate))
            
            # Output layer: logits before thresholds
            self.output_layer = tf.keras.layers.Dense(
                num_classes - 1,
                kernel_regularizer=tf.keras.regularizers.l2(l2_lambda)
            )
            
            # Trainable monotonic thresholds
            # Initialised to num_classes - 1 evenly spaced values in [-1, 1]; linspace is
            # already ascending, so the tf.sort call does not change the values.
            # NOTE(review): whether the constraint is actually applied after each update
            # depends on the optimizer honouring `Variable.constraint` — confirm for the
            # Keras version in use.
            self.thresholds = tf.Variable(
                initial_value=tf.sort(tf.linspace(-1.0, 1.0, num_classes - 1)),
                trainable=True,
                constraint=MonotonicConstraint(),
                name="ordinal_thresholds"
            )

        def call(self, inputs):
            """Run the dense stack and return the raw logits (thresholds are not subtracted here)."""
            x = self.dense_stack(inputs)
            return self.output_layer(x)

        def custom_loss(self, y_true, y_pred):
            """Ordinal loss: sum over class boundaries of a binary cross-entropy.

            For each boundary k in 0..num_classes-2 the binary target is
            `y_true > k` and the logit is `y_pred[:, k] - thresholds[k]`. The
            per-boundary sigmoid cross-entropy is averaged over the batch and
            the averages are summed.
            """
            # Labels become a float column vector so they broadcast against (batch, 1) logits.
            y_true = tf.cast(tf.reshape(y_true, (-1, 1)), tf.float32)
            cum_loss = 0.0
            
            for k in range(self.num_classes - 1):
                target = tf.cast(y_true > k, tf.float32)
                logit = y_pred[:, k] - self.thresholds[k]
                
                # Numerical stability
                # (clip_by_value passes no gradient for logits outside [-10, 10])
                logit = tf.clip_by_value(logit, -10.0, 10.0)
                logit = tf.reshape(logit, (-1, 1))
                
                loss = tf.nn.sigmoid_cross_entropy_with_logits(target, logit)
                cum_loss += tf.reduce_mean(loss)
                
            return cum_loss

    model = DeepOrdinal(input_dim=input_dim, num_classes=num_classes)
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=learning_rate,
        clipvalue=1.0  # Gradient clipping
    )
    # NOTE(review): 'accuracy' is evaluated by Keras on the num_classes - 1 boundary
    # logits, which presumably does not correspond to ordinal class accuracy —
    # confirm before reporting this metric.
    model.compile(
        optimizer=optimizer,
        loss=model.custom_loss,
        metrics=['accuracy']
    )
    return model

# Grid search loop
NUM_CLASSES = 5  # Number of ordinal classes passed to build_model
GRID_RESULTS_PATH = './plot/Gradually_Increasing/grid_search_results.csv'


def cumulative_probs_to_classes(cum_probs):
    """Decode cumulative probabilities P(y > k) into ordinal class labels.

    Args:
        cum_probs: Array/tensor of shape (n_samples, num_classes - 1) where
            column k holds the estimated probability that the label exceeds k.

    Returns:
        1-D NumPy int array with one label in 0..num_classes-1 per sample:
        the number of boundaries whose probability is above 0.5. (Taking the
        argmax over the columns instead can never produce the top class and
        does not correspond to any ordinal decision rule.)
    """
    passed = tf.cast(cum_probs > 0.5, tf.int32)
    return tf.reduce_sum(passed, axis=1).numpy()


best_score = float('inf')  # Minimize validation MAE
best_params = None
results = []  # Track all configurations

for layer_sizes, dropout_rate, l2_lambda, lr in itertools.product(
    layer_configs, dropout_rates, l2_lambdas, learning_rates
):
    print(f"Evaluating config: layers={layer_sizes}, dropout={dropout_rate}, "
          f"l2={l2_lambda}, lr={lr}")

    # Release the graph/state of the previous candidate; otherwise memory
    # grows with every model built in this loop.
    tf.keras.backend.clear_session()

    model = build_model(
        input_dim=X_subtrain.shape[1],
        num_classes=NUM_CLASSES,
        layer_sizes=layer_sizes,
        dropout_rate=dropout_rate,
        l2_lambda=l2_lambda,
        learning_rate=lr
    )

    # Train model
    history = model.fit(
        X_subtrain, y_subtrain,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        validation_data=(X_val, y_val),
        verbose=0
    )

    # Predict on validation set and compute MAE
    y_val_pred = model.predict(X_val, verbose=0)  # logits
    # Column k of cum_probs estimates P(y > k); the class is the number of
    # boundaries passed.
    cum_probs = tf.sigmoid(y_val_pred - model.thresholds)
    y_val_pred_classes = cumulative_probs_to_classes(cum_probs)

    val_mae = mean_absolute_error(y_val, y_val_pred_classes)

    results.append({
        'layers': layer_sizes,
        'dropout': dropout_rate,
        'l2': l2_lambda,
        'lr': lr,
        'val_mae': val_mae
    })

    if val_mae < best_score:
        best_score = val_mae
        best_params = {
            'layers': layer_sizes,
            'dropout': dropout_rate,
            'l2': l2_lambda,
            'lr': lr
        }

# Save grid search results (create the folder first so a long search is not
# lost to a missing directory).
os.makedirs(os.path.dirname(GRID_RESULTS_PATH), exist_ok=True)
pd.DataFrame(results).to_csv(GRID_RESULTS_PATH, index=False)

if best_params is None:
    # Happens when every validation MAE is NaN or the grid is empty.
    raise RuntimeError("Grid search did not produce a valid configuration.")

print(f"Best configuration: {best_params} with validation MAE: {best_score:.4f}")

# ============================= 2. Final Model Initialization =============================
print("\n2. Training final model with best configuration...")

final_model = build_model(
    input_dim=X_train.shape[1],
    num_classes=NUM_CLASSES,
    layer_sizes=best_params['layers'],
    dropout_rate=best_params['dropout'],
    l2_lambda=best_params['l2'],
    learning_rate=best_params['lr']
)

# ============================= 3. Final Model Training =============================
# The test set is passed as validation_data only to log its loss per epoch;
# nothing in this cell selects a model or stops training based on it.
history = final_model.fit(
    X_train, y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_test, y_test),
    verbose=2
)

# ============================= 4. Model Evaluation =============================
print("\n3. Evaluating final model...")

# Get predictions (logits → cumulative probabilities → class)
val_logits = final_model.predict(X_test)
cum_probs = tf.sigmoid(val_logits - final_model.thresholds)
y_pred = cumulative_probs_to_classes(cum_probs)

print("\nClassification Report:")
print(classification_report(y_test, y_pred, digits=4))

print("\nMean Absolute Error (MAE):", mean_absolute_error(y_test, y_pred))

In [None]:
# ===================== 5. Gradient-based Feature Importance (Protein Features Only) =====================
print("\n5. Computing gradient-based importance...")

@tf.function
def compute_gradients(inputs):
    """Return the gradient of the final model's output with respect to `inputs`.

    Args:
        inputs: Float tensor of model inputs, one row per sample.

    Returns:
        Tensor with the same shape as `inputs`. The model output is not a
        scalar, so `tape.gradient` differentiates the sum of all output
        logits; each entry is d(sum of logits) / d(input feature).
    """
    with tf.GradientTape() as tape:
        tape.watch(inputs)  # Explicitly watch inputs (they are not variables)
        # Use the trained `final_model`, not `model` (which is merely the last
        # grid-search candidate), and run in inference mode so dropout is off.
        preds = final_model(inputs, training=False)
    return tape.gradient(preds, inputs)

# Differentiate the model output with respect to every training sample
X_train_tensor = tf.convert_to_tensor(X_train, dtype=tf.float32)
gradients = compute_gradients(X_train_tensor)

# tape.gradient yields None when the output does not depend on the inputs
if gradients is None:
    raise ValueError("Gradient computation failed. Please check model input-output dependency.")

# Gradient magnitudes as a NumPy array; the leading num_proteins columns are
# the protein features, the remaining ones are covariates.
abs_gradients = np.abs(gradients.numpy())
protein_abs_gradients = abs_gradients[:, :num_proteins]

# Statistic handed to the bootstrap: per-feature average importance
def bootstrap_mean(data):
    """Average `data` over its first axis (the sample axis)."""
    averaged = np.mean(data, axis=0)
    return averaged

# Perform bootstrapping (95% CI, 1000 resamples)
# Samples are resampled along axis 0 (bootstrap's default), one statistic per protein.
boot_result = bootstrap(
    (protein_abs_gradients,),
    bootstrap_mean,
    n_resamples=1000,
    random_state=RANDOM_STATE,
    method='percentile'
)

# Extract bootstrap statistics (the last axis of the distribution indexes resamples)
gradient_importance_mean = boot_result.bootstrap_distribution.mean(axis=-1)
gradient_importance_se   = boot_result.standard_error
gradient_importance_ci_low  = boot_result.confidence_interval.low
gradient_importance_ci_high = boot_result.confidence_interval.high

# Diagnostic prints (optional — can be removed after verification)
print("Number of protein features:", len(protein_names))
print("Shape of gradient_importance_mean:", gradient_importance_mean.shape)
print("Shape of gradient_importance_se:", gradient_importance_se.shape)
print("Shape of gradient_importance_ci_low:", gradient_importance_ci_low.shape)
print("Shape of gradient_importance_ci_high:", gradient_importance_ci_high.shape)

# Single source of truth for the output location, so the file written and the
# path reported below cannot drift apart.
GRADIENT_CI_PATH = './plot/Gradually_Increasing/gradient_importance_dementia_with_ci.csv'
os.makedirs(os.path.dirname(GRADIENT_CI_PATH), exist_ok=True)

# Save results to CSV with confidence intervals
pd.DataFrame({
    'Protein': protein_names,
    'Gradient Importance Mean': gradient_importance_mean,
    'Gradient Importance SE': gradient_importance_se,
    'Gradient Importance CI Lower (95%)': gradient_importance_ci_low,
    'Gradient Importance CI Upper (95%)': gradient_importance_ci_high
}).to_csv(GRADIENT_CI_PATH, index=False)

print(f"Gradient importance results (with 95% CI) saved to: {GRADIENT_CI_PATH}")

In [None]:
from scipy.stats import percentileofscore  # no longer used in this cell; kept for any later cell relying on it

# Permutation Test Parameters
n_permutations = 1000  # Adjust for computational feasibility

# Function to compute mean absolute gradients
def compute_mean_abs_gradients(inputs):
    """Return the per-protein mean absolute input gradient for `inputs`.

    Args:
        inputs: 2-D array of model inputs whose first `num_proteins` columns
            are the protein features.

    Returns:
        1-D NumPy array of length `num_proteins`.
    """
    grads = compute_gradients(tf.convert_to_tensor(inputs, dtype=tf.float32))
    return np.mean(np.abs(grads.numpy()), axis=0)[:num_proteins]  # Protein part only

# Original mean importance
original_mean = np.mean(protein_abs_gradients, axis=0)

# Dedicated, seeded generator: the null distribution is reproducible and does
# not depend on (or disturb) the global NumPy random state.
perm_rng = np.random.default_rng(RANDOM_STATE)

# Generate null distribution
# NOTE(review): the null is built by shuffling input columns, not labels —
# confirm this is the intended null hypothesis for gradient magnitudes.
null_distribution = np.zeros((n_permutations, num_proteins))
for i in range(n_permutations):
    # Permute each protein column independently; covariate columns stay intact
    permuted_X = X_train.copy()
    for j in range(num_proteins):
        permuted_X[:, j] = perm_rng.permutation(permuted_X[:, j])
    null_distribution[i] = compute_mean_abs_gradients(permuted_X)

# One-tailed p-values: share of null means >= the observed mean. The "+1" in
# numerator and denominator counts the observed statistic as one draw from the
# null, so a p-value can never be exactly 0.
exceed_counts = np.sum(null_distribution >= original_mean, axis=0)
p_values_perm = (exceed_counts + 1) / (n_permutations + 1)

# Save results
PERMUTATION_RESULTS_PATH = './plot/Gradually_Increasing/gradient_importance_permutation_test.csv'
os.makedirs(os.path.dirname(PERMUTATION_RESULTS_PATH), exist_ok=True)
pd.DataFrame({
    'Protein': protein_names,
    'Gradient Importance Mean': original_mean,
    'P-value (Permutation)': p_values_perm
}).to_csv(PERMUTATION_RESULTS_PATH, index=False)

In [None]:
# Simplified; insert after model training
from scipy.stats import norm

# Compute mean importance
original_mean = np.mean(protein_abs_gradients, axis=0)

# Approximate pivotal statistic: mean divided by its standard error.
# ddof=1 gives the sample standard deviation, the usual estimator for a
# standard error of the mean.
# NOTE(review): the inputs are absolute gradients, so the mean is >= 0 by
# construction; a test against 0 will presumably flag almost every protein —
# confirm this is the intended reference value.
n_samples = protein_abs_gradients.shape[0]
std_gradients = np.std(protein_abs_gradients, axis=0, ddof=1)
pivotal_stats = original_mean / (std_gradients / np.sqrt(n_samples))

# Two-tailed p-value via the survival function: `1 - norm.cdf(x)` rounds to
# exactly 0 for large x, whereas `norm.sf(x)` keeps the tail probability.
p_values_pivotal = 2 * norm.sf(np.abs(pivotal_stats))

# Save results
PIVOTAL_RESULTS_PATH = './plot/Gradually_Increasing/gradient_importance_pivotal_test.csv'
os.makedirs(os.path.dirname(PIVOTAL_RESULTS_PATH), exist_ok=True)
pd.DataFrame({
    'Protein': protein_names,
    'Pivotal Statistic': pivotal_stats,
    'P-value (Pivotal)': p_values_pivotal
}).to_csv(PIVOTAL_RESULTS_PATH, index=False)