In [1]:
import re
import random
import copy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, Dataset
from sklearn.preprocessing import StandardScaler
import scipy.stats as stats
from pathlib import Path

In [None]:
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU plus all CUDA devices), then puts cuDNN into deterministic,
    non-benchmarking mode.

    Args:
        seed: Integer seed handed to each generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    # Disable cuDNN autotuning and request deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Seed the Python, NumPy and PyTorch generators once, before any data is split or any model is built.
set_seed(42)

In [None]:
def split_samples_random(base_folder, train_ratio=0.8, val_ratio=0.1, seed=42):
    """Randomly split ``sample_<id>`` folders into train/val/test CSV lists.

    The split is done at the folder (sample) level, so all CSV files of one
    sample end up in the same subset.

    Args:
        base_folder: Directory containing the ``sample_<id>`` sub-folders.
        train_ratio: Fraction of sample ids assigned to the training split.
        val_ratio: Fraction of sample ids assigned to the validation split.
            Whatever is left after train and val goes to the test split.
        seed: Seed for the shuffle; the same seed yields the same split.

    Returns:
        Tuple ``(train_files, val_files, test_files)`` of lists of ``Path``
        objects pointing at the ``*.csv`` files of each split.
    """
    base = Path(base_folder)

    # Remember the real directory behind every numeric id instead of
    # re-building its name later: a fixed "sample_{id:02d}" pattern silently
    # drops folders such as "sample_1" or "sample_003".
    dirs_by_id = {}
    for entry in sorted(base.iterdir()):
        if not (entry.is_dir() and entry.name.startswith("sample_")):
            continue
        suffix = entry.name.split("_")[-1]
        if not suffix.isdecimal():
            continue  # e.g. "sample_backup" is not a numbered sample folder
        dirs_by_id.setdefault(int(suffix), []).append(entry)

    # A private generator produces the same permutation as seeding the global
    # one, but leaves the notebook-wide `random` state untouched.
    sample_ids = sorted(dirs_by_id)
    random.Random(seed).shuffle(sample_ids)

    n = len(sample_ids)
    n_train = int(train_ratio * n)
    n_val = int(val_ratio * n)
    train_ids = sample_ids[:n_train]
    val_ids = sample_ids[n_train:n_train + n_val]
    test_ids = sample_ids[n_train + n_val:]

    def gather(ids):
        # Sorted so the file order does not depend on the filesystem.
        return [
            csv
            for sid in ids
            for folder in dirs_by_id[sid]
            for csv in sorted(folder.glob("*.csv"))
        ]

    return gather(train_ids), gather(val_ids), gather(test_ids)

In [None]:
# NOTE(review): absolute, machine-specific Windows path — the notebook only runs where this
# folder exists; consider deriving it from a configurable data directory.
base_folder = r"C:\Users\manub\OneDrive - Arizona State University\Research\Research_new\Param_est\newSimulated_data"
# Uses the function defaults: 80% train, 10% validation, remainder test, seed 42.
train_files, val_files, test_files = split_samples_random(base_folder)

In [None]:
# Inspect the training split (displays the full list of CSV paths).
train_files

In [None]:
# CSV holding the target coefficients; read by load_coefs, which indexes it by 'sample_id'.
# NOTE(review): absolute, machine-specific path — same portability concern as base_folder.
target_path = r"C:\Users\manub\OneDrive - Arizona State University\Research\Research_new\Param_est\newSimulated_data\constants.csv"

In [None]:
def load_coefs(coef_csv_path):
    """Read the coefficient table and keep only the columns that vary.

    Args:
        coef_csv_path: Path to a CSV with a ``sample_id`` column; every other
            column is treated as a coefficient.

    Returns:
        Tuple ``(coef_map, coef_names)`` where ``coef_map`` maps each integer
        sample id to a float array of its non-constant coefficients and
        ``coef_names`` lists the corresponding column names in order.
    """
    table = pd.read_csv(coef_csv_path).set_index('sample_id')

    # A column is kept when at least one row differs from the first row.
    first_row = table.iloc[0]
    varies = (table != first_row).any()
    varying = table.loc[:, varies]

    coef_map = {}
    for sample_id, row in varying.iterrows():
        coef_map[int(sample_id)] = row.values.astype(float)

    return coef_map, list(varying.columns)

In [None]:
# Load the targets once up front to see which coefficient columns vary across samples.
coef_map, coef_names = load_coefs(coef_csv_path=target_path)
coef_names

In [None]:
def load_data(file_list, constants_csv="constants.csv", n_steps=366):
    """Build input/target arrays from per-sample stepwise CSV files.

    Only files named ``sample_<id>_stepwise.csv`` are used. A file is skipped
    when its id is missing from the coefficient table or when its number of
    rows differs from ``n_steps``.

    Args:
        file_list: Iterable of paths (str or ``Path``) to candidate CSV files.
        constants_csv: Coefficient table passed to ``load_coefs``.
        n_steps: Required number of rows per file (default 366, the value that
            used to be hard-coded). Pass ``None`` to disable the check; all
            files must then still share one length for stacking to succeed.

    Returns:
        Tuple ``(X, y, coef_names)``: ``X`` has shape (n_files, rows, columns)
        with the ``Time`` column removed, ``y`` has shape
        (n_files, n_coefficients). When no file qualifies, ``X`` is an empty
        (0, 0, 0) array and ``y`` an empty (0, n_coefficients) array.
    """
    coef_map, coef_names = load_coefs(constants_csv)
    # The dot is escaped so that only a real ".csv" extension matches.
    pattern = re.compile(r"sample_(\d+)_stepwise\.csv$")

    X_list, y_list = [], []
    for f in file_list:
        f = Path(f)
        m = pattern.search(f.name)
        if not m:
            continue
        sid = int(m.group(1))
        if sid not in coef_map:
            continue

        df = pd.read_csv(f).drop(columns=['Time'], errors='ignore')
        if n_steps is not None and df.shape[0] != n_steps:
            continue  # unexpected sequence length

        X_list.append(df.values.astype(float))
        y_list.append(coef_map[sid])

    if not X_list:
        return np.empty((0, 0, 0)), np.empty((0, len(coef_names))), coef_names

    return np.stack(X_list, axis=0), np.stack(y_list, axis=0), coef_names

In [None]:
# Load inputs and targets for each split; load_data returns (X, y, coef_names).
# NOTE(review): `const_vals` receives the coefficient *names* (third return value), not values.
X_train, y_train, const_vals = load_data(train_files, constants_csv=target_path)
X_val,   y_val,   const_vals = load_data(val_files,   constants_csv=target_path)
X_test,  y_test,  _          = load_data(test_files,  constants_csv=target_path)

# Defensive: promote a 1-D target vector to a single-row 2-D array.
# NOTE(review): both return paths of load_data already yield 2-D targets, so these
# branches look unreachable — confirm before removing.
if y_train.ndim == 1:
    y_train = y_train.reshape(1, -1)
if y_test.ndim == 1:
    y_test = y_test.reshape(1, -1)
if y_val.ndim == 1:
    y_val = y_val.reshape(1, -1)

In [None]:
# Sanity check: X_train is stacked as (n_files, rows_per_file, columns_per_file).
X_train.shape
# y_val.shape
# y_test.shape

In [None]:
# Normalize data
def standardize_features(X_train, X_test, X_val):
    """Standardize every feature using statistics fitted on the training split.

    Each 3-D array is flattened to (samples * steps, features), scaled with a
    ``StandardScaler`` fitted on the training rows only, and reshaped back.

    Args:
        X_train: Training inputs, 3-D (samples, steps, features).
        X_test: Test inputs with the same steps/features layout.
        X_val: Validation inputs with the same steps/features layout.

    Returns:
        Tuple ``(X_train_scaled, X_test_scaled, X_val_scaled, scaler)`` —
        note that the test split comes before the validation split.
    """
    # Local names avoid `F`, which is the module-level alias of torch.nn.functional.
    n_train, n_steps, n_feats = X_train.shape
    n_test = X_test.shape[0]
    n_val = X_val.shape[0]

    def as_rows(arr):
        # One row per (sample, step) pair.
        return arr.reshape(-1, n_feats)

    def as_sequences(rows, n_seq):
        return rows.reshape(n_seq, n_steps, n_feats)

    scaler = StandardScaler()
    train_scaled = as_sequences(scaler.fit_transform(as_rows(X_train)), n_train)
    test_scaled = as_sequences(scaler.transform(as_rows(X_test)), n_test)
    val_scaled = as_sequences(scaler.transform(as_rows(X_val)), n_val)

    return train_scaled, test_scaled, val_scaled, scaler

# Fit the scaler on the training split and apply it to all three splits
# (return order is train, test, val).
# NOTE(review): the inputs are overwritten, so re-running this cell standardizes
# already-standardized data.
X_train, X_test, X_val, input_scaler = standardize_features(X_train, X_test, X_val)
# No-op self-assignment: the targets are left unscaled.
# NOTE(review): Conv1DRegressor ends in a sigmoid, so targets outside (0, 1) cannot be
# reached — confirm the coefficient ranges or scale the targets here.
y_train, y_test, y_val = y_train, y_test, y_val

In [None]:
class ResidualBlock(nn.Module):
    """Two length-preserving 1-D convolutions wrapped in a skip connection.

    Computes ``relu(dropout(conv2(dropout(relu(conv1(x))))) + x)`` on inputs
    shaped (batch, channels, length); the output has the same shape.

    Args:
        channels: Number of input and output channels of both convolutions.
        kernel_size: Convolution width. Must be odd: with ``kernel_size // 2``
            padding an even width changes the sequence length, which makes
            the residual addition fail.
        dropout: Dropout probability applied after each convolution.

    Raises:
        ValueError: If ``kernel_size`` is even.
    """

    def __init__(self, channels, kernel_size=5, dropout=0.2):
        super().__init__()
        if kernel_size % 2 == 0:
            raise ValueError(
                f"kernel_size must be odd so the sequence length is preserved, got {kernel_size}"
            )
        padding = kernel_size // 2
        self.conv1 = nn.Conv1d(channels, channels, kernel_size, padding=padding)
        self.conv2 = nn.Conv1d(channels, channels, kernel_size, padding=padding)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = self.dropout(F.relu(self.conv1(x)))
        out = self.dropout(self.conv2(out))
        # Out-of-place skip connection: never mutates a tensor autograd may
        # still need, and never aliases the caller's input.
        return F.relu(out + x)


class Conv1DRegressor(nn.Module):
    """Residual 1-D CNN that maps a multivariate sequence to a parameter vector.

    Pipeline: 1x1 convolution + ReLU, a stack of ``ResidualBlock`` layers,
    global average pooling over the sequence axis, ReLU, dropout, a linear
    head and a final sigmoid (so every output lies in (0, 1)).

    Args:
        input_dim: Number of features per time step.
        num_filters: Channel width used throughout the encoder.
        kernel_size: Kernel width of the residual blocks.
        num_blocks: Number of stacked residual blocks.
        output_dim: Number of regressed values.
        dropout: Dropout probability in the blocks and before the head.
    """

    def __init__(
        self,
        input_dim: int,
        num_filters: int = 128,
        kernel_size: int = 5,
        num_blocks: int = 3,
        output_dim: int = 13,
        dropout: float = 0.2
    ):
        super().__init__()

        # Lift the raw features to the encoder width with a pointwise convolution.
        self.input_proj = nn.Sequential(
            nn.Conv1d(input_dim, num_filters, kernel_size=1),
            nn.ReLU(),
        )

        # Encoder: identical residual blocks applied back to back.
        self.encoder = nn.Sequential(*[
            ResidualBlock(num_filters, kernel_size, dropout)
            for _ in range(num_blocks)
        ])

        # Collapse the sequence axis to one vector per sample.
        self.global_pool = nn.AdaptiveAvgPool1d(1)

        # Regression head.
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(num_filters, output_dim)

        self._init_weights()

    def _init_weights(self):
        """Kaiming-normal weights and zero biases for every conv/linear layer."""
        for layer in self.modules():
            if not isinstance(layer, (nn.Conv1d, nn.Linear)):
                continue
            nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map (batch, seq_len, input_dim) inputs to (batch, output_dim) outputs."""
        # Conv1d expects channels first: (batch, input_dim, seq_len).
        features = self.input_proj(x.permute(0, 2, 1))
        features = self.encoder(features)

        # (batch, num_filters, 1) -> (batch, num_filters)
        pooled = self.global_pool(features).squeeze(-1)
        pooled = self.dropout(F.relu(pooled))

        return torch.sigmoid(self.fc(pooled))

In [None]:
# Training hyperparameters (the model itself is constructed in a later cell)
batch_size = 8  # mini-batch size used by the train/val/test DataLoaders
lr = 1e-4  # learning rate passed to train_model
epochs = 50  # upper bound on training epochs passed to train_model

In [None]:
class NoisyTensorDataset(Dataset):
    """Dataset that adds fresh Gaussian noise to the inputs on every access.

    The stored arrays are never modified: each ``__getitem__`` works on a
    float32 copy of the requested sample. Noise is drawn from NumPy's global
    generator, so it follows ``np.random.seed``.

    Args:
        X: NumPy array of inputs, indexed along its first axis.
        y: NumPy array of targets with the same first-axis length as ``X``.
        noise_std: Standard deviation of the additive noise; values ``<= 0``
            disable the augmentation.
        noise_type: Noise distribution. Only ``"gaussian"`` is implemented;
            anything else is rejected instead of being silently ignored.

    Raises:
        ValueError: On an unsupported ``noise_type`` or when ``X`` and ``y``
            differ in length.
    """

    SUPPORTED_NOISE = ("gaussian",)

    def __init__(self, X, y, noise_std=0.05, noise_type="gaussian"):
        if noise_type not in self.SUPPORTED_NOISE:
            raise ValueError(
                f"Unsupported noise_type {noise_type!r}; expected one of {self.SUPPORTED_NOISE}"
            )
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same number of samples, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y
        self.s = noise_std
        self.kind = noise_type

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        # astype() already returns a new array, so the source data stays clean.
        x = self.X[idx].astype(np.float32)
        if self.s > 0:
            x += np.random.normal(0, self.s, size=x.shape)
        return torch.from_numpy(x), torch.from_numpy(self.y[idx].astype(np.float32))

In [None]:
# Training set: noise with std 0.05 is added to the (already standardized) inputs on every access.
train_ds = NoisyTensorDataset(
    X_train, y_train,
    noise_std=0.05,
    noise_type="gaussian"
)

# Validation and test sets are plain, noise-free float32 tensors.
val_ds = TensorDataset(torch.tensor(X_val, dtype=torch.float32),
                       torch.tensor(y_val, dtype=torch.float32))

test_ds = TensorDataset(torch.tensor(X_test, dtype=torch.float32),
                        torch.tensor(y_test, dtype=torch.float32))

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size)
test_loader = DataLoader(test_ds, batch_size=batch_size)

In [None]:
def train_model(
    model: nn.Module,
    train_loader: torch.utils.data.DataLoader,
    val_loader: torch.utils.data.DataLoader = None,
    epochs: int = 50,
    lr: float = 1e-4,
    device: torch.device = None,
    patience: int = 10           # Early stopping patience
):
    """Train ``model`` with Adam + MSE and optional early stopping.

    Epoch losses are sample-weighted means, so a short final batch does not
    count as much as a full one. When a validation loader is given, the
    weights of the best validation epoch are restored before returning and
    training stops after ``patience`` epochs without improvement. A
    train/validation loss curve is plotted at the end.

    Args:
        model: Network to optimize in place; it is moved to ``device``.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Optional iterable of validation batches.
        epochs: Maximum number of epochs.
        lr: Adam learning rate.
        device: Target device; defaults to CUDA when available, else CPU.
        patience: Epochs without validation improvement before stopping.

    Returns:
        Tuple ``(train_losses, val_losses)`` with one entry per completed
        epoch (``val_losses`` is empty when ``val_loader`` is ``None``).

    Raises:
        ValueError: If a loader yields no samples.
    """
    device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    train_losses = []
    val_losses = []

    best_val_loss = float("inf")
    best_model_state = None
    patience_counter = 0

    for epoch in range(1, epochs + 1):
        # Training
        model.train()
        avg_train_loss = _run_epoch(model, train_loader, loss_fn, device, optimizer)
        train_losses.append(avg_train_loss)

        if val_loader is None:
            continue

        # Validation
        model.eval()
        with torch.no_grad():
            avg_val_loss = _run_epoch(model, val_loader, loss_fn, device)
        val_losses.append(avg_val_loss)
        print(f"Epoch {epoch}/{epochs} ▶ train loss: {avg_train_loss:.4f} | val loss: {avg_val_loss:.4f}")

        # Early Stopping Check
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            # Deep copy: state_dict() returns references to the live tensors.
            best_model_state = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch} (val loss did not improve for {patience} epochs)")
                break

    # Restore best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    # Plot losses
    plt.figure(figsize=(6, 4))
    plt.plot(train_losses, label="Train")
    if val_loader is not None:
        plt.plot(val_losses, label="Val")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.title("Loss over epochs")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    return train_losses, val_losses


def _run_epoch(model, loader, loss_fn, device, optimizer=None):
    """Run one pass over ``loader`` and return the sample-weighted mean loss.

    Parameters are updated after every batch when ``optimizer`` is given;
    otherwise the pass is evaluation only (the caller handles ``no_grad``).
    """
    total_loss = 0.0
    n_samples = 0
    for xb, yb in loader:
        xb = xb.to(device).float()
        yb = yb.to(device).float()

        loss = loss_fn(model(xb), yb)

        if optimizer is not None:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Weight each batch mean by its size so the result is the dataset mean.
        batch_n = yb.size(0)
        total_loss += loss.item() * batch_n
        n_samples += batch_n

    if n_samples == 0:
        raise ValueError("Data loader produced no samples; cannot compute an epoch loss")
    return total_loss / n_samples

In [None]:

def evaluate_model(model, test_loader, n_mc_samples=1, param_names=None):
    """Evaluate ``model`` on ``test_loader``, optionally with MC dropout.

    With ``n_mc_samples > 1`` the model is put in train mode so dropout stays
    active, every batch is predicted ``n_mc_samples`` times, and the mean and
    standard deviation over the passes are reported. With a single pass the
    model runs in eval mode and all standard deviations are zero. The model's
    previous train/eval mode is restored afterwards.

    Side effects: prints a per-parameter metrics table, writes
    ``predicted_parameters.csv`` (single pass) or
    ``predicted_parameters_with_uncertainty.csv`` (MC dropout) to the current
    directory and, for MC dropout, draws the uncertainty plots.

    Args:
        model: Trained ``nn.Module``; batches are moved to its device.
        test_loader: Iterable of ``(inputs, targets)`` batches.
        n_mc_samples: Number of stochastic forward passes per batch (>= 1).
        param_names: Names of the output columns, one per model output.
            Defaults to the 13 names this notebook was written for.

    Raises:
        ValueError: If ``n_mc_samples < 1``, the loader is empty, or the
            number of names does not match the number of model outputs.
    """
    if n_mc_samples < 1:
        raise ValueError(f"n_mc_samples must be >= 1, got {n_mc_samples}")

    if param_names is None:
        param_names = ['lambda1_HM', 'lambda2_HM', 'lambda3_HM', 'gamma1_MH', 'gamma2_MH', 'gamma3_MH',
                       'delta1_HM', 'delta2_HM', 'delta3_HM', 'eta1_MH', 'eta2_MH', 'eta3_MH', 'p1_mortality']
    param_names = list(param_names)

    # train_model may have moved the model to CUDA; follow it instead of
    # assuming CPU tensors.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    was_training = model.training
    model.train(n_mc_samples > 1)  # train mode keeps dropout active for MC sampling

    all_means = []
    all_stds = []
    all_trues = []

    try:
        with torch.no_grad():
            for xb, yb in test_loader:
                xb = xb.to(device)
                # (n_mc_samples, batch_size, output_dim)
                preds = np.stack([model(xb).cpu().numpy() for _ in range(n_mc_samples)])
                mean_pred = preds.mean(axis=0)
                std_pred = preds.std(axis=0) if n_mc_samples > 1 else np.zeros_like(mean_pred)

                all_means.append(mean_pred)
                all_stds.append(std_pred)
                all_trues.append(yb.cpu().numpy())
    finally:
        # Do not leave the caller's model stuck in train mode.
        model.train(was_training)

    if not all_means:
        raise ValueError("test_loader produced no batches")

    all_means = np.concatenate(all_means, axis=0)
    all_stds = np.concatenate(all_stds, axis=0)
    all_trues = np.concatenate(all_trues, axis=0)

    if all_means.shape[1] != len(param_names):
        raise ValueError(
            f"Model produced {all_means.shape[1]} outputs but {len(param_names)} "
            f"parameter names were given"
        )

    metrics = []
    for i, name in enumerate(param_names):
        true_i = all_trues[:, i]
        pred_i = all_means[:, i]
        std_i = all_stds[:, i]

        mean_true = true_i.mean()
        mean_pred = pred_i.mean()
        bias = mean_pred - mean_true

        # MSE of the mean predictor
        mse = np.mean((pred_i - true_i) ** 2)
        rmse = np.sqrt(mse)

        # Average spread over the MC passes
        mean_std = np.mean(std_i)

        # Empirical coverage of the ±1σ band
        coverage = np.mean(np.abs(pred_i - true_i) <= std_i)

        metrics.append({
            'parameter': name,
            'Mean_True': mean_true,
            'Mean_Pred': mean_pred,
            'Bias':      bias,
            'MSE':       mse,
            'RMSE':       rmse,
            'Mean_STD':  mean_std,
            'Coverage_1σ': coverage
        })

    df_metrics = pd.DataFrame(metrics).set_index('parameter')
    print(df_metrics)

    # Per-sample predictions next to the ground truth
    df = pd.DataFrame(all_means, columns=[f"pred_{name}" for name in param_names])
    for i, name in enumerate(param_names):
        df[f"true_{name}"] = all_trues[:, i]
        if n_mc_samples > 1:
            df[f"std_{name}"] = all_stds[:, i]  # Only include uncertainty if MC Dropout is used

    # Save to appropriate file
    csv_name = "predicted_parameters_with_uncertainty.csv" if n_mc_samples > 1 else "predicted_parameters.csv"
    df.to_csv(csv_name, index=False)

    # Uncertainty plots
    if n_mc_samples > 1:
        for i, name in enumerate(param_names):
            true_vals = all_trues[:, i]
            mean_vals = all_means[:, i]
            std_vals = all_stds[:, i]

            plot_errorbar_chart(true_vals, mean_vals, std_vals, name)
            plot_true_vs_pred(true_vals, mean_vals, std_vals, name)
            plot_calibration_curve(true_vals, mean_vals, std_vals, name)

            # Print summary
            avg_std = np.mean(all_stds[:, i])
            print(f"[Uncertainty] {name}: Mean STD = {avg_std:.6f}")

        # The summary figures cover every parameter at once, so draw them a
        # single time rather than once per loop iteration.
        plot_all_mean_bars(all_trues, all_means, param_names)
        plot_all_box_true_vs_pred(all_trues, all_means, param_names)

def plot_errorbar_chart(true_vals, mean_vals, std_vals, param_name):
    """Plot per-sample predictions with ±1σ error bars next to the true values.

    Args:
        true_vals: Ground-truth values, one per sample.
        mean_vals: Predicted means, one per sample.
        std_vals: Predictive standard deviations used as error-bar half-lengths.
        param_name: Parameter name used for the y-axis label and the title.
    """
    sample_idx = np.arange(len(mean_vals))

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.errorbar(sample_idx, mean_vals, yerr=std_vals, fmt='o', alpha=0.6,
                label='Predicted ±1σ')
    ax.scatter(sample_idx, true_vals, color='red', marker='x', label='True')

    ax.set_xlabel("Sample Index")
    ax.set_ylabel(param_name)
    ax.set_title(f"{param_name}: estimates with ±1σ")
    ax.legend()
    ax.grid(True)

    fig.tight_layout()
    plt.show()


def plot_true_vs_pred(true_vals, mean_vals, std_vals, param_name):
    """Scatter true values against predicted means with horizontal ±1σ bars.

    A dashed red identity line spanning the joint value range marks perfect
    agreement.

    Args:
        true_vals: Ground-truth values (y-axis).
        mean_vals: Predicted means (x-axis).
        std_vals: Predictive standard deviations drawn as x error bars.
        param_name: Parameter name used in the title.
    """
    fig, ax = plt.subplots(figsize=(5, 5))
    ax.errorbar(mean_vals, true_vals, xerr=std_vals, fmt='o', alpha=0.6)

    # Identity line from the smallest to the largest value seen in either array.
    lower = min(true_vals.min(), mean_vals.min())
    upper = max(true_vals.max(), mean_vals.max())
    diagonal = [lower, upper]
    ax.plot(diagonal, diagonal, '--', color='red')

    ax.set_xlabel("Predicted mean")
    ax.set_ylabel("True value")
    ax.set_title(f"{param_name}: True vs Predicted")
    ax.grid(True)

    fig.tight_layout()
    plt.show()


def plot_calibration_curve(true_vals, mean_vals, std_vals, param_name):
    """Compare nominal and empirical coverage of the predictive intervals.

    For the nominal levels 50%, 68%, 90% and 95%, the matching two-sided
    normal quantile ``z`` is computed and the fraction of samples with
    ``|mean - true| <= z * std`` is plotted against the nominal level.

    Args:
        true_vals: Ground-truth values.
        mean_vals: Predicted means.
        std_vals: Predictive standard deviations.
        param_name: Parameter name used in the title.
    """
    # Nominal central-interval levels and their z thresholds.
    levels = np.array([0.5, 0.68, 0.9, 0.95])
    z_th = stats.norm.ppf((1 + levels) / 2)

    emp_cov = [
        np.mean(np.abs(mean_vals - true_vals) <= z * std_vals)
        for z in z_th
    ]

    fig, ax = plt.subplots(figsize=(5, 4))
    ax.plot(levels, emp_cov, 'o-', label='Empirical')
    ax.plot(levels, levels, '--', label='Ideal')

    ax.set_xlabel("Nominal coverage")
    ax.set_ylabel("Empirical coverage")
    ax.set_title(f"{param_name}: Calibration curve")
    ax.legend()
    ax.grid(True)

    fig.tight_layout()
    plt.show()

def plot_all_mean_bars(all_trues, all_means, param_names):
    """Grouped bar chart of true vs. predicted means for every parameter.

    Each bar shows the across-sample mean of one column, with the
    across-sample standard deviation as its error bar.

    Args:
        all_trues: Ground-truth array, one column per parameter.
        all_means: Predicted-mean array with the same layout.
        param_names: Tick labels, one per column.
    """
    centers = np.arange(len(param_names))
    width = 0.35

    fig, ax = plt.subplots(figsize=(12, 6))

    # (horizontal offset, values, legend label): true bars left, predictions right.
    series = (
        (-width / 2, all_trues, 'True'),
        (+width / 2, all_means, 'Predicted'),
    )
    for offset, values, label in series:
        ax.bar(centers + offset, values.mean(axis=0), width,
               yerr=values.std(axis=0), capsize=4, label=label)

    ax.set_xticks(centers)
    ax.set_xticklabels(param_names, rotation=45, ha='right')
    ax.set_ylabel("Parameter value")
    ax.set_title("True vs. Predicted Means ±1σ for all parameters")
    ax.legend()
    ax.grid(axis='y', linestyle='--', alpha=0.5)
    plt.tight_layout()
    plt.show()


def plot_all_box_true_vs_pred(all_trues, all_means, param_names):
    """Side-by-side notched box plots of true vs. predicted values per parameter.

    For every parameter two boxes are drawn around the same tick: the true
    distribution (sky blue, left) and the predicted distribution (salmon,
    right).

    Args:
        all_trues: Ground-truth array, one column per parameter.
        all_means: Predicted-mean array with the same layout.
        param_names: Tick labels, one per column.
    """
    N = len(param_names)
    width = 0.35

    # Interleave true/predicted columns and place them left/right of tick i.
    data = []
    positions = []
    for i in range(N):
        data.append(all_trues[:, i])
        data.append(all_means[:, i])
        positions.append(i - width / 2)
        positions.append(i + width / 2)

    fig, ax = plt.subplots(figsize=(14, 6))
    # No `labels=` argument: with manage_ticks=False the labels are never
    # drawn, and that keyword is deprecated in recent Matplotlib releases.
    bp = ax.boxplot(
        data,
        positions=positions,
        widths=width,
        notch=True,
        patch_artist=True,
        manage_ticks=False
    )

    true_color = 'skyblue'
    pred_color = 'salmon'

    # Boxes alternate true, predicted, true, ... in the order they were added.
    for j, box in enumerate(bp['boxes']):
        box.set(facecolor=true_color if (j % 2 == 0) else pred_color)

    # Make the medians stand out against the filled boxes.
    for med in bp['medians']:
        med.set(color='black', linewidth=1.5)

    ax.set_xticks(np.arange(N))
    ax.set_xticklabels(param_names, rotation=45, ha='right')
    ax.legend([bp["boxes"][0], bp["boxes"][1]], ['True', 'Predicted'])
    ax.set_ylabel("Value")
    ax.set_title("Distributions of True vs Predicted for All Parameters")
    ax.grid(axis='y', linestyle='--', alpha=0.5)
    plt.tight_layout()
    plt.show()

In [None]:
# Size the network from the data: features per time step in, coefficients per sample out.
model = Conv1DRegressor(input_dim=X_train.shape[2], output_dim=y_train.shape[1])
# Capture the loss histories instead of letting the cell echo both lists as its output.
train_losses, val_losses = train_model(model, train_loader, val_loader, epochs=epochs, lr=lr)

In [None]:
# Evaluate on the test split with 50 forward passes per batch (dropout kept active);
# prints the metrics table, writes predicted_parameters_with_uncertainty.csv and draws the plots.
evaluate_model(model, test_loader, n_mc_samples=50)