In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import sys
from scipy.stats import norm

# print("Python executable:", sys.executable)
# print("TensorFlow version:", tf.__version__)
# print("TensorFlow Probability version:", tfp.__version__)

In [None]:


# Load dataset
data = pd.read_csv(r"C:\Users\Private\Downloads\Assignment 4 2025.csv")
#data.head(10)

# Labels / features
data['event'] = 1 - data['censor']
numeric_cols = ['support_tickets', 'payment_delays', 'total_spend']
categorical_cols = ['industry_type', 'supplier_engagement']

X_numeric = data[numeric_cols].values
X_categorical = pd.get_dummies(data[categorical_cols], drop_first=True)
feature_names = numeric_cols + X_categorical.columns.tolist()
X = np.hstack((X_numeric, X_categorical.values)).astype('float32')

# Standardize
scaler = StandardScaler()
X = scaler.fit_transform(X).astype('float32')

T = data['duration'].values.astype('float32')
delta = data['event'].values.astype('float32')

# Split into train/val/test (60%/20%/20%)
X_temp, X_test, T_temp, T_test, delta_temp, delta_test = train_test_split(
    X, T, delta, test_size=0.2, random_state=42
)
X_train, X_val, T_train, T_val, delta_train, delta_val = train_test_split(
    X_temp, T_temp, delta_temp, test_size=0.25, random_state=42  # 0.25 of 80% = 20% val
)

print(f"Train size: {len(X_train)}, Val size: {len(X_val)}, Test size: {len(X_test)}")

# Convert to tf tensors for efficiency
X_train_tf = tf.convert_to_tensor(X_train)
X_val_tf = tf.convert_to_tensor(X_val)
X_test_tf = tf.convert_to_tensor(X_test)
y_train_tf = tf.convert_to_tensor(np.column_stack((T_train, delta_train)))
y_val_tf = tf.convert_to_tensor(np.column_stack((T_val, delta_val)))
y_test_tf = tf.convert_to_tensor(np.column_stack((T_test, delta_test)))

# Mixture Density Model 
def build_density_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), dtype=tf.float32),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(15)
    ])
    return model

# Shared model builder for eta(x)
def build_eta_model(input_shape):
    inputs = tf.keras.Input(shape=(input_shape,), dtype=tf.float32)
    eta = tf.keras.layers.Dense(1, use_bias=False)(inputs)
    return tf.keras.Model(inputs, eta)

# Numeric stability constants
EPS = tf.constant(1e-10, dtype=tf.float32)
EPS_SIG = tf.constant(1e-6, dtype=tf.float32)

# 1. Mixture AFT Model
print("\n--- Training Mixture AFT Model ---")
density_model = build_density_model()
model_mixture = build_eta_model(X.shape[1])
sigma_mixture = tf.Variable(tf.constant(1.0, dtype=tf.float32), trainable=True, name='sigma_mixture')

# All trainable variables for mixture
all_trainable_mixture = (model_mixture.trainable_variables + 
                         [sigma_mixture] + 
                         density_model.trainable_variables)

optimizer_mixture = tf.keras.optimizers.Adam(learning_rate=0.001, clipnorm=1.0)

def aft_loss_mixture(y_true, y_pred):
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    T_obs = y_true[:, 0]
    d_event = y_true[:, 1]
    T_obs = tf.maximum(T_obs, EPS)
    sig = tf.nn.softplus(sigma_mixture) + EPS
    z_scalar = (tf.math.log(T_obs) - y_pred[:, 0]) / sig
    z_scalar = tf.clip_by_value(z_scalar, -10.0, 10.0)  # Clip for stability
    params = density_model(tf.expand_dims(z_scalar, -1))
    pi_raw, mu, sigma_nn = tf.split(params, 3, axis=-1)
    pi = tf.nn.softmax(pi_raw, axis=-1)
    sigma_nn = tf.nn.softplus(sigma_nn) + EPS_SIG
    cat = tfp.distributions.Categorical(probs=pi)
    comps = tfp.distributions.Normal(loc=mu, scale=sigma_nn)
    dist = tfp.distributions.MixtureSameFamily(mixture_distribution=cat, components_distribution=comps)
    f_w = dist.prob(z_scalar)
    S_w = dist.survival_function(z_scalar)
    f_w = tf.maximum(f_w, EPS)
    S_w = tf.maximum(S_w, EPS)
    log_f = tf.math.log(f_w) - tf.math.log(sig) - tf.math.log(T_obs)
    log_S = tf.math.log(S_w)
    nll = - (d_event * log_f + (1.0 - d_event) * log_S)
    return tf.reduce_mean(nll)

@tf.function
def train_step_mixture(X_batch, y_batch):
    with tf.GradientTape() as tape:
        eta_batch = model_mixture(X_batch, training=True)
        loss = aft_loss_mixture(y_batch, eta_batch)
    grads = tape.gradient(loss, all_trainable_mixture)
    optimizer_mixture.apply_gradients(zip(grads, all_trainable_mixture))
    return loss

@tf.function
def compute_loss_mixture(X, y):
    eta = model_mixture(X, training=False)
    return aft_loss_mixture(y, eta)

# Training loop
batch_size = 32
train_losses_mixture = []
val_losses_mixture = []

for epoch in range(50):
    epoch_train_loss = 0.0
    num_batches = 0
    for i in range(0, len(X_train_tf), batch_size):
        end_i = min(i + batch_size, len(X_train_tf))
        X_batch = X_train_tf[i:end_i]
        y_batch = y_train_tf[i:end_i]
        loss = train_step_mixture(X_batch, y_batch)
        epoch_train_loss += float(loss)
        num_batches += 1
    avg_train_loss = epoch_train_loss / num_batches
    train_losses_mixture.append(avg_train_loss)
    
    val_loss = float(compute_loss_mixture(X_val_tf, y_val_tf))
    val_losses_mixture.append(val_loss)
    
    if (epoch + 1) % 10 == 0:
        print(f"Mixture Epoch {epoch+1}/50 - Train Loss: {avg_train_loss:.6f}, Val Loss: {val_loss:.6f}")

# Test loss for mixture
test_loss_mixture = float(compute_loss_mixture(X_test_tf, y_test_tf))
print(f"Mixture AFT Test NLL: {test_loss_mixture:.6f}")

# 2. Weibull AFT Model
print("\n--- Training Weibull AFT Model ---")
model_weibull = build_eta_model(X.shape[1])
rho_weibull = tf.Variable(tf.constant(1.0, dtype=tf.float32), trainable=True, name='rho_weibull')

# All trainable variables for Weibull
all_trainable_weibull = model_weibull.trainable_variables + [rho_weibull]

optimizer_weibull = tf.keras.optimizers.Adam(learning_rate=0.001, clipnorm=1.0)

def aft_loss_weibull(y_true, y_pred):
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    T_obs = y_true[:, 0]
    d_event = y_true[:, 1]
    T_obs = tf.maximum(T_obs, EPS)
    rho = tf.nn.softplus(rho_weibull) + EPS
    eta = y_pred[:, 0]
    log_t = tf.math.log(T_obs)
    z = log_t - eta
    z = tf.clip_by_value(z, -20.0, 20.0)  # Clip for stability
    term1 = tf.exp(rho * z)
    log_f = tf.math.log(rho) + (rho - 1.0) * log_t - eta - term1
    log_S = - term1
    nll = - (d_event * log_f + (1.0 - d_event) * log_S)
    return tf.reduce_mean(nll)

@tf.function
def train_step_weibull(X_batch, y_batch):
    with tf.GradientTape() as tape:
        eta_batch = model_weibull(X_batch, training=True)
        loss = aft_loss_weibull(y_batch, eta_batch)
    grads = tape.gradient(loss, all_trainable_weibull)
    optimizer_weibull.apply_gradients(zip(grads, all_trainable_weibull))
    return loss

@tf.function
def compute_loss_weibull(X, y):
    eta = model_weibull(X, training=False)
    return aft_loss_weibull(y, eta)

# Training loop
train_losses_weibull = []
val_losses_weibull = []

for epoch in range(50):
    epoch_train_loss = 0.0
    num_batches = 0
    for i in range(0, len(X_train_tf), batch_size):
        end_i = min(i + batch_size, len(X_train_tf))
        X_batch = X_train_tf[i:end_i]
        y_batch = y_train_tf[i:end_i]
        loss = train_step_weibull(X_batch, y_batch)
        epoch_train_loss += float(loss)
        num_batches += 1
    avg_train_loss = epoch_train_loss / num_batches
    train_losses_weibull.append(avg_train_loss)
    
    val_loss = float(compute_loss_weibull(X_val_tf, y_val_tf))
    val_losses_weibull.append(val_loss)
    
    if (epoch + 1) % 10 == 0:
        print(f"Weibull Epoch {epoch+1}/50 - Train Loss: {avg_train_loss:.6f}, Val Loss: {val_loss:.6f}")

# Test loss for Weibull
test_loss_weibull = float(compute_loss_weibull(X_test_tf, y_test_tf))
print(f"Weibull AFT Test NLL: {test_loss_weibull:.6f}")

# Function to compute stds using numerical Hessian (with fixes for stability)
def compute_stds(model, scale_var, loss_fn, X_data, y_data):
    n = X_data.shape[0]
    # Save current
    beta_save = model.layers[1].kernel.numpy()
    scale_save = scale_var.numpy()
    beta_current = beta_save.flatten()
    scale_current = scale_save
    theta_current = np.concatenate([beta_current, [scale_current]])
    num_params = len(theta_current)
    beta_shape = model.layers[1].kernel.shape  # e.g., (9, 1)

    def nll_func(theta):
        beta_new = theta[:-1].reshape(beta_shape).astype(np.float32)
        scale_new = np.array(theta[-1], dtype=np.float32)
        model.layers[1].kernel.assign(beta_new)
        scale_var.assign(scale_new)
        eta = model(X_data, training=False)
        loss_value = loss_fn(y_data, eta)
        # Handle potential inf/nan
        loss_value = tf.where(tf.math.is_finite(loss_value), loss_value, tf.constant(10000.0, dtype=tf.float32))
        return float(loss_value.numpy()) if hasattr(loss_value, 'numpy') else float(loss_value)

    # Numerical Hessian of mean NLL
    eps = 1e-6  # Smaller eps for better accuracy
    hessian = np.zeros((num_params, num_params))
    for j in range(num_params):
        theta_plus_j = theta_current.copy()
        theta_plus_j[j] += eps
        score_plus = np.zeros(num_params)
        for k in range(num_params):
            theta_pk = theta_plus_j.copy()
            theta_pk[k] += eps
            nll_pk = nll_func(theta_pk)
            theta_pk[k] -= 2 * eps
            nll_pm = nll_func(theta_pk)
            score_plus[k] = (nll_pk - nll_pm) / (2 * eps)
        theta_minus_j = theta_current.copy()
        theta_minus_j[j] -= eps
        score_minus = np.zeros(num_params)
        for k in range(num_params):
            theta_mk = theta_minus_j.copy()
            theta_mk[k] += eps
            nll_mk = nll_func(theta_mk)
            theta_mk[k] -= 2 * eps
            nll_mm = nll_func(theta_mk)
            score_minus[k] = (nll_mk - nll_mm) / (2 * eps)
        for k in range(num_params):
            hessian[k, j] = (score_plus[k] - score_minus[k]) / (2 * eps)

    # Ensure hessian is symmetric and positive semi-definite
    hessian = (hessian + hessian.T) / 2  # Symmetrize
    eigvals, _ = np.linalg.eigh(hessian)
    hessian = np.maximum(hessian, 0)  # Clip negative parts roughly

    # Covariance matrix with pinv and ridge
    ridge = 1e-5 * np.trace(np.abs(hessian)) / num_params * np.eye(num_params)
    info_matrix = n * (hessian + ridge)
    cov_matrix = np.linalg.pinv(info_matrix)

    # Clip diag to non-negative
    diag_cov = np.diag(cov_matrix)
    diag_cov = np.maximum(diag_cov, 0)
    stds = np.sqrt(diag_cov)

    # Restore parameters
    model.layers[1].kernel.assign(beta_save)
    scale_var.assign(scale_save)

    return stds

# Inference for Mixture AFT
print("\n--- Mixture AFT Inference ---")
stds_mixture = compute_stds(model_mixture, sigma_mixture, aft_loss_mixture, X_train_tf, y_train_tf)

beta_mixture_flat = model_mixture.layers[1].kernel.numpy().flatten()
raw_sigma_mixture = sigma_mixture.numpy()
soft_sigma_mixture = float(tf.nn.softplus(raw_sigma_mixture))
stds_beta_mixture = stds_mixture[:-1]
std_raw_sigma_mixture = stds_mixture[-1]
deriv_softplus = float(tf.nn.sigmoid(raw_sigma_mixture))
std_soft_sigma_mixture = std_raw_sigma_mixture * deriv_softplus

print("Mixture AFT Estimated beta coefficients:")
for i, name in enumerate(feature_names):
    coef = beta_mixture_flat[i]
    se = stds_beta_mixture[i]
    if se <= 0:
        z = np.inf if coef > 0 else -np.inf
        p = 0.0
    else:
        z = coef / se
        p = 2 * (1 - norm.cdf(np.abs(z)))
    low = coef - 1.96 * se
    high = coef + 1.96 * se
    print(f"{name}: {coef:.6f} (SE={se:.6f}, z={z:.6f}, p={p:.6f}, 95% CI [{low:.6f}, {high:.6f}])")

if std_soft_sigma_mixture <= 0:
    z_sig = np.inf
    p_sig = 0.0
else:
    z_sig = soft_sigma_mixture / std_soft_sigma_mixture
    p_sig = 2 * (1 - norm.cdf(np.abs(z_sig)))
low_sig = soft_sigma_mixture - 1.96 * std_soft_sigma_mixture
high_sig = soft_sigma_mixture + 1.96 * std_soft_sigma_mixture
print(f"Mixture AFT Estimated sigma (softplus): {soft_sigma_mixture:.6f} (SE={std_soft_sigma_mixture:.6f}, z={z_sig:.6f}, p={p_sig:.6f}, 95% CI [{low_sig:.6f}, {high_sig:.6f}])")

# Inference for Weibull AFT
print("\n--- Weibull AFT Inference ---")
stds_weibull = compute_stds(model_weibull, rho_weibull, aft_loss_weibull, X_train_tf, y_train_tf)

beta_weibull_flat = model_weibull.layers[1].kernel.numpy().flatten()
raw_rho_weibull = rho_weibull.numpy()
soft_rho_weibull = float(tf.nn.softplus(raw_rho_weibull))
stds_beta_weibull = stds_weibull[:-1]
std_raw_rho_weibull = stds_weibull[-1]
deriv_softplus_rho = float(tf.nn.sigmoid(raw_rho_weibull))
std_soft_rho_weibull = std_raw_rho_weibull * deriv_softplus_rho

print("Weibull AFT Estimated beta coefficients:")
for i, name in enumerate(feature_names):
    coef = beta_weibull_flat[i]
    se = stds_beta_weibull[i]
    if se <= 0:
        z = np.inf if coef > 0 else -np.inf
        p = 0.0
    else:
        z = coef / se
        p = 2 * (1 - norm.cdf(np.abs(z)))
    low = coef - 1.96 * se
    high = coef + 1.96 * se
    print(f"{name}: {coef:.6f} (SE={se:.6f}, z={z:.6f}, p={p:.6f}, 95% CI [{low:.6f}, {high:.6f}])")

if std_soft_rho_weibull <= 0:
    z_rho = np.inf
    p_rho = 0.0
else:
    z_rho = soft_rho_weibull / std_soft_rho_weibull
    p_rho = 2 * (1 - norm.cdf(np.abs(z_rho)))
low_rho = soft_rho_weibull - 1.96 * std_soft_rho_weibull
high_rho = soft_rho_weibull + 1.96 * std_soft_rho_weibull
print(f"Weibull AFT Estimated rho (softplus): {soft_rho_weibull:.6f} (SE={std_soft_rho_weibull:.6f}, z={z_rho:.6f}, p={p_rho:.6f}, 95% CI [{low_rho:.6f}, {high_rho:.6f}])")

# Comparison
print("\n--- Model Comparison ---")
print(f"Mixture AFT Test NLL: {test_loss_mixture:.6f}")
print(f"Weibull AFT Test NLL: {test_loss_weibull:.6f}")
if test_loss_mixture < test_loss_weibull:
    print("Mixture AFT has lower test NLL (better fit).")
else:
    print("Weibull AFT has lower test NLL (better fit).")

# Plot training curves for both
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

ax1.plot(train_losses_mixture, label='Training Loss')
ax1.plot(val_losses_mixture, label='Validation Loss')
ax1.set_title('Mixture AFT Training Curves')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Negative Log-Likelihood')
ax1.legend()

ax2.plot(train_losses_weibull, label='Training Loss')
ax2.plot(val_losses_weibull, label='Validation Loss')
ax2.set_title('Weibull AFT Training Curves')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Negative Log-Likelihood')
ax2.legend()

plt.tight_layout()
plt.show()

# Predict median survival time (approx) on test set
# For mixture (symmetric zero-median baseline): median(T|x) ≈ exp(eta)
# For Weibull: median(T|x) = exp(eta / rho) * (log(2))^{1/rho}
eta_test_mixture = model_mixture.predict(X_test, verbose=0).astype('float32').squeeze()
T_pred_median_mixture = np.exp(eta_test_mixture)

rho_val = soft_rho_weibull
eta_test_weibull = model_weibull.predict(X_test, verbose=0).astype('float32').squeeze()
scale_factor = np.log(2) ** (1.0 / rho_val)
T_pred_median_weibull = np.exp(eta_test_weibull / rho_val) * scale_factor

print(f"\nTest set median survival predictions:")
print(f"Mixture AFT - Mean: {np.mean(T_pred_median_mixture):.6f}, Median: {np.median(T_pred_median_mixture):.6f}")
print(f"Weibull AFT - Mean: {np.mean(T_pred_median_weibull):.6f}, Median: {np.median(T_pred_median_weibull):.6f}")

**HARELLS'S C TEST**

C-index measures discrimination. The Mixture AFT has better(higher) discrimination $[0.5, 0.688]$ than the WEIBULL AFT $[0.45, 0.66]$

In [None]:
#MIXTURE AFT
# Bootstrap 95% CI for C-index (Harrell's C) on Test

# Assuming c_index is defined from previous code (e.g., concordance index function)
# If not, define it here or import from lifelines.utils or similar
from lifelines.utils import concordance_index  # Add this if needed

rng = np.random.default_rng(123)
B = 300
c_vals = []
for _ in range(B):
    idx = rng.integers(0, len(T_test), size=len(T_test))
    c_vals.append(concordance_index(T_test[idx], -eta_test_mixture[idx], delta_test[idx]))
c_vals = np.array(c_vals)
c_lo, c_hi = np.percentile(c_vals, [2.5, 97.5])
print(f"C-index 95% CI (bootstrap): [{c_lo:.3f}, {c_hi:.3f}]")

In [None]:
#WEIBULL AFT
#  Bootstrap 95% CI for C-index (Harrell's C) on Test

from lifelines.utils import concordance_index  

rng = np.random.default_rng(123)
B_W = 300
c_vals_W = []
for _ in range(B_W):
    idx = rng.integers(0, len(T_test), size=len(T_test))
    c_vals_W.append(concordance_index(T_test[idx], -eta_test_weibull[idx], delta_test[idx]))
c_vals = np.array(c_vals_W)
c_lo_W, c_hi_W = np.percentile(c_vals_W, [2.5, 97.5])
print(f"C-index 95% CI WEIBULL (bootstrap): [{c_lo_W:.3f}, {c_hi_W:.3f}]")

HELPER FUNCTIONS

In [None]:
tf_eps = tf.constant(1e-10, tf.float32)

@tf.function
def mixture_params_of_z(z_scalar):
    params = density_model(tf.expand_dims(z_scalar, -1))  # (batch, 15)
    pi_raw, mu, sigK = tf.split(params, 3, axis=-1)       # each (batch, K)
    pi   = tf.nn.softmax(pi_raw, axis=-1)
    sigK = tf.nn.softplus(sigK) + tf.constant(1e-6, tf.float32)
    return pi, mu, sigK

@tf.function
def survival_Z(z_scalar):
    pi, mu, sigK = mixture_params_of_z(z_scalar)
    cat  = tfp.distributions.Categorical(probs=pi)
    comps= tfp.distributions.Normal(loc=mu, scale=sigK)
    dist = tfp.distributions.MixtureSameFamily(mixture_distribution=cat,
                                               components_distribution=comps)
    return dist.survival_function(z_scalar)

@tf.function
def cdf_Z(z_scalar):
    pi, mu, sigK = mixture_params_of_z(z_scalar)
    cat  = tfp.distributions.Categorical(probs=pi)
    comps= tfp.distributions.Normal(loc=mu, scale=sigK)
    dist = tfp.distributions.MixtureSameFamily(mixture_distribution=cat,
                                               components_distribution=comps)
    return dist.cdf(z_scalar)

def S_T_vec(t_vec, eta_vec, sig_scalar):
    t_tf   = tf.constant(t_vec.astype('float32'))
    eta_tf = tf.constant(eta_vec.astype('float32'))
    sig_tf = tf.constant(sig_scalar, tf.float32)
    Z = (tf.math.log(t_tf)[tf.newaxis,:] - eta_tf[:,tf.newaxis]) / sig_tf
    S = survival_Z(tf.reshape(Z, [-1]))
    return tf.reshape(S, [tf.shape(eta_tf)[0], tf.shape(t_tf)[0]]).numpy()

def F_T_vec(t_vec, eta_vec, sig_scalar):
    t_tf   = tf.constant(t_vec.astype('float32'))
    eta_tf = tf.constant(eta_vec.astype('float32'))
    sig_tf = tf.constant(sig_scalar, tf.float32)
    Z = (tf.math.log(t_tf)[tf.newaxis,:] - eta_tf[:,tf.newaxis]) / sig_tf
    F = cdf_Z(tf.reshape(Z, [-1]))
    return tf.reshape(F, [tf.shape(eta_tf)[0], tf.shape(t_tf)[0]]).numpy()

eta_test = model_mixture.predict(X_test, verbose=0).squeeze()
sig_hat  = float(tf.nn.softplus(sigma_mixture).numpy())

# Time grid (focus on central region)
t_grid = np.linspace(np.percentile(T_test, 5), np.percentile(T_test, 95), 120)
S_pred = S_T_vec(t_grid, eta_test_mixture, sigma_mixture)                # shape (n_test, M)
F_pred = 1.0 - S_pred




**DYNAMIC AUC CURVE**

In [None]:
#HYBRID AFT
def dynamic_auc_at_time(t0, T, d, risk):
    # Comparable pairs: one event before t0 vs one still at risk at t0
    case = (T <= t0) & (d == 1)
    ctrl = (T > t0)
    if case.sum()==0 or ctrl.sum()==0:
        return np.nan
    r_case = risk[case][:,None]
    r_ctrl = risk[ctrl][None,:]
    cmp = (r_case > r_ctrl).mean() + 0.5*(r_case == r_ctrl).mean()
    return float(cmp)

# (12) Time-dependent AUC curve (dense horizons)
dense_times = np.linspace(np.percentile(T_test[delta_test==1], 10),
                          np.percentile(T_test[delta_test==1], 90), 15)
auc_curve = [dynamic_auc_at_time(t0, T_test, delta_test, -eta_test_mixture) for t0 in dense_times]
plt.figure()
plt.plot(dense_times, auc_curve, '-o')
plt.xlabel("Time horizon t")
plt.ylabel("Dynamic AUC")
plt.title("Dynamic AUC across time")
plt.tight_layout()

# Save
print(f"Dynamic AUC curve plot saved to: {save_path}")
plt.show()

In [None]:
#WEIBULL AFT
def dynamic_auc_at_time(t0, T, d, risk):
    # Comparable pairs: one event before t0 vs one still at risk at t0
    case = (T <= t0) & (d == 1)
    ctrl = (T > t0)
    if case.sum()==0 or ctrl.sum()==0:
        return np.nan
    r_case = risk[case][:,None]
    r_ctrl = risk[ctrl][None,:]
    cmp = (r_case > r_ctrl).mean() + 0.5*(r_case == r_ctrl).mean()
    return float(cmp)

# (12) Time-dependent AUC curve (dense horizons)
dense_times_w = np.linspace(np.percentile(T_test[delta_test==1], 10),
                          np.percentile(T_test[delta_test==1], 90), 15)
auc_curve_w = [dynamic_auc_at_time(t0, T_test, delta_test, -eta_test_weibull) for t0 in dense_times]
plt.figure()
plt.plot(dense_times_w, auc_curve_w, '-o')
plt.xlabel("Time horizon t")
plt.ylabel("Dynamic AUC")
plt.title("WEIBULL Dynamic AUC across time")
plt.tight_layout()

# Save
print(f"Dynamic AUC curve plot saved to: {save_path}")
plt.show()

COX SNELL RESIDUAL

In [None]:
# 4) Cox–Snell residuals r_i = -log S(T_i|x_i)
idx_T = np.searchsorted(t_grid, np.clip(T_test, t_grid[0], t_grid[-1]), side='left')
S_obs = S_pred[np.arange(len(T_test)), np.clip(idx_T, 0, len(t_grid)-1)]
r_cs  = -np.log(np.clip(S_obs, 1e-10, 1.0))
# KM of r_cs should follow exp(-r) if well-specified
r_grid = np.linspace(np.percentile(r_cs, 1), np.percentile(r_cs, 99), 120)
def km_curve(times, events, grid):
    order = np.argsort(times)
    t = times[order]; e = events[order].astype(bool)
    at_risk = len(t); S = 1.0; out=[]
    j=0
    for tg in grid:
        while j < len(t) and t[j] <= tg:
            S *= (at_risk - (1 if e[j] else 0)) / at_risk
            at_risk -= 1; j += 1
        out.append(S)
    return np.array(out)

S_cs = km_curve(r_cs, delta_test, r_grid)
plt.figure()
plt.plot(r_grid, S_cs, label="KM of Cox–Snell")
plt.plot(r_grid, np.exp(-r_grid), '--', label="Exp(-r) ideal")
plt.xlabel("r (Cox–Snell)"); plt.ylabel("S(r)")
plt.title("Cox–Snell residual check")
plt.legend(); plt.tight_layout()

# Save

print(f"Cox-Snell residuals plot saved to: {save_path}")
plt.show()


DECISION CURVE ANALYSIS

In [None]:
# (13) Decision Curve Analysis (Net Benefit) at horizon t*
#      Net Benefit = TP/N - FP/N * (pt/(1-pt))
#      Predicted event prob by t*: p = 1 - S(t*|x)
t_star = np.percentile(T_test[delta_test==1], 60) if (delta_test==1).sum()>=5 else np.median(T_test)
p_pred = 1.0 - S_T_vec(np.array([t_star]), eta_test_mixture, sigma_mixture)[:,0]  # per-sample prob of event by t*
pt_grid = np.linspace(0.05, 0.5, 10)
NB_model, NB_all, NB_none = [], [], []
N = len(T_test)
y_event_by_t = ((T_test <= t_star) & (delta_test==1)).astype(float)
rate = y_event_by_t.mean()
for pt in pt_grid:
    treat = (p_pred >= pt).astype(float)
    TP = ((treat==1) & (y_event_by_t==1)).sum()
    FP = ((treat==1) & (y_event_by_t==0)).sum()
    NB_model.append(TP/N - FP/N * (pt/(1-pt)))
    # Treat-all and treat-none references
    NB_all.append(rate - (1-rate) * (pt/(1-pt)))
    NB_none.append(0.0)
plt.figure()
plt.plot(pt_grid, NB_model, '-o', label='Model')
plt.plot(pt_grid, NB_all, '--', label='Treat-all')
plt.plot(pt_grid, NB_none, ':', label='Treat-none')
plt.xlabel("Threshold probability pt")
plt.ylabel("Net Benefit")
plt.title(f"Decision Curve at t*={t_star:.1f}")
plt.legend(); plt.tight_layout()

# Save

print(f"Decision curve analysis plot saved to: {save_path}")
plt.show()


PERMUTATION IMPORTANCE

In [None]:
# 7) Permutation importance (Δ Test NLL)
def dataset_nll(XA, TA, dA, batch=1024):
    n = XA.shape[0]; tot = 0.0
    for i in range(0, n, batch):
        sl = slice(i, min(i+batch, n))
        yA = np.column_stack((TA[sl], dA[sl])).astype('float32')
        y_pred = model_mixture(XA[sl], training=False)
        tot += float(aft_loss_mixture(yA, y_pred).numpy()) * (sl.stop - sl.start)
    return tot / n

base_nll = dataset_nll(X_test, T_test, delta_test)
impacts = []
rng = np.random.default_rng(42)
for j, name in enumerate(feature_names):
    Xp = X_test.copy()
    rng.shuffle(Xp[:, j])
    imp = dataset_nll(Xp, T_test, delta_test) - base_nll
    impacts.append((name, imp))
impacts.sort(key=lambda x: x[1], reverse=True)
print("\nPermutation importance (Δ Test NLL, higher = more important):")
for name, imp in impacts:
    print(f"{name:30s} {imp:+.6f}")

# Bar graph for permutation importance
plt.figure(figsize=(10, 8))
names, imps = zip(*impacts)
y_pos = np.arange(len(names))
colors = plt.cm.Blues(np.linspace(0.3, 1.0, len(names)))  # Gradient blue
bars = plt.barh(y_pos, imps, color=colors, edgecolor='navy', alpha=0.8)
plt.yticks(y_pos, names)
plt.xlabel('Δ Test NLL (higher = more important)', fontsize=12)
plt.title('Permutation Feature Importance', fontsize=14, fontweight='bold')
plt.gca().invert_yaxis()  # Highest on top
# Add value labels on bars
for i, (bar, imp) in enumerate(zip(bars, imps)):
    plt.text(bar.get_width() + max(0, imp)*0.01, bar.get_y() + bar.get_height()/2, f'{imp:.4f}', 
             ha='left' if imp > 0 else 'right', va='center', fontsize=10)
plt.grid(axis='x', linestyle='--', alpha=0.7)
plt.tight_layout()

# Save
print(f"Permutation importance plot saved to: {save_path}")
plt.show()


PARTIAL DEPENDENCE

In [None]:
# 6) Interpretability: Partial Dependence (feature effects)
#    Vary one feature, keep others at their dataset mean (test set), and plot median(T|x)
def pdp_numeric_feature(X_ref, feat_idx, grid, use_exp_eta=True):
    """
    X_ref: reference matrix (N,D) to compute an average profile (mean of others)
    feat_idx: index of numeric feature in X
    grid: values to evaluate (already on standardized scale if X is standardized)
    use_exp_eta: median(T|x) ≈ exp(eta) under zero-median baseline
    returns grid, effect
    """
    x_bar = X_ref.mean(axis=0, keepdims=True)  # (1,D)
    X_eval = np.repeat(x_bar, len(grid), axis=0)
    X_eval[:, feat_idx] = grid
    eta_eval = model_mixture.predict(X_eval, verbose=0).squeeze()
    if use_exp_eta:
        return grid, np.exp(eta_eval)
    else:
        # Full numerical median via 0.5 survival is possible, but slower.
        # You could replace with a root-finder using survival_T_at_times.
        return grid, np.exp(eta_eval)

# Build grids in standardized space for your 3 numeric features
num_idx = [feature_names.index(c) for c in ['support_tickets','payment_delays','total_spend']]
grids = []
for i in num_idx:
    v = X_test[:, i]
    g = np.linspace(np.percentile(v,5), np.percentile(v,95), 40)
    grids.append(g)

plt.figure(figsize=(10,6))
for i, (fi, g) in enumerate(zip(num_idx, grids), 1):
    gx, gy = pdp_numeric_feature(X_test, fi, g)
    plt.plot(gx, gy, label=f"PDP: {feature_names[fi]}")
plt.xlabel("Standardized feature value")
plt.ylabel("Predicted median(T|x) ≈ exp(η)")
plt.title("Partial Dependence (median time) for numeric features")
plt.legend()
plt.tight_layout()
plt.show()
