In [None]:
# Imports
import pandas as pd
import numpy as np
import torch
import random
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

In [None]:
# Hyperparameters
batch_size = 16
n_epochs = 300
learning_rate = 0.05
seed = 42

# Seed every RNG the notebook touches. Without this only the data split is
# reproducible: weight initialisation and DataLoader shuffling draw from
# torch's global generator and would differ on every Restart & Run All.
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

In [None]:
# Custom train/validation/test split with fixed index patterns, for reproducibility
def train_val_test_split(X, y, training_size, val_every=10, test_every=10, test_offset=5, seed=seed):
    """
    Custom data split with absolute training size and fixed patterns for validation/test set.

    Validation and test membership follow fixed index patterns and therefore do
    not depend on ``seed``; only the choice of training samples is randomised.

    Args:
        X, y: torch tensors of equal length (indexed along their first dimension)
        training_size (int): number of training samples to include; capped at the
            number of samples left once validation and test samples are removed
        val_every (int): every nth sample, starting at index 0, goes to validation set
        test_every (int): every nth sample goes to test set
        test_offset (int): offset to start test selection (e.g. every 10th starting at index 5)
        seed (int): random seed for the shuffle that picks the training samples

    Returns:
        X_train, y_train, X_val, y_val, X_test, y_test (torch tensors)

    Raises:
        ValueError: if X and y differ in length, or training_size is negative.

    Note:
        The validation and test patterns are not checked against each other: an
        index matched by both ends up in both sets.
    """
    n = len(X)
    if len(y) != n:
        raise ValueError(f"X and y must have the same length, got {n} and {len(y)}")
    # A negative size would silently slice from the end of the shuffled list.
    if training_size < 0:
        raise ValueError(f"training_size must be non-negative, got {training_size}")

    # Validation set: every nth sample starting at index 0
    val_indices = list(range(0, n, val_every))
    # Test set: every nth sample starting at offset
    test_indices = list(range(test_offset, n, test_every))

    # Remaining samples are potential training candidates.
    # A set keeps the membership test O(1) per index.
    held_out = set(val_indices)
    held_out.update(test_indices)
    remaining_indices = [i for i in range(n) if i not in held_out]

    # Shuffle with a private generator: same permutation as seeding the global
    # `random` module with `seed`, but without resetting global RNG state.
    random.Random(seed).shuffle(remaining_indices)

    # Cap training size to available data
    training_size = min(training_size, len(remaining_indices))
    train_indices = remaining_indices[:training_size]

    # Helper to slice tensors by indices
    def select(tensor, idxs):
        return tensor[idxs]

    X_train, y_train = select(X, train_indices), select(y, train_indices)
    X_val, y_val = select(X, val_indices), select(y, val_indices)
    X_test, y_test = select(X, test_indices), select(y, test_indices)

    return X_train, y_train, X_val, y_val, X_test, y_test


In [None]:
# Read the raw dataset from disk and preview its first rows
csv_path = '../dataset/dummy_data.csv'
df = pd.read_csv(csv_path)
df.head()

In [None]:
# Names of the input columns and of the regression target columns
feature_cols = ["Fem_Fle(+)Ext(-)", "Fem_Var(+)Val(-)", "Fem_Int(+)Ext(-)"]
target_cols = ["EI", "VV", "LM", "AP", "PD", "PCL", "dMCL", "sMCL", "LCL", "ALL", "OPL", "PC"]

# Pull both groups out of the frame as NumPy arrays (one row per sample)
X = df[feature_cols].values
y = df[target_cols].values

In [None]:
# Convert to float32 tensors. as_tensor accepts both the NumPy arrays produced
# by the previous cell and tensors left over from an earlier run of this cell,
# so re-running it does not trigger torch.tensor's copy-construct warning.
X = torch.as_tensor(X, dtype=torch.float32)
y = torch.as_tensor(y, dtype=torch.float32)

# Split into train/val/test
X_train, y_train, X_val, y_val, X_test, y_test = train_val_test_split(X, y, training_size=80)

# Normalize y per target, using statistics of the training split only so that
# no information from validation/test leaks into the scaling.
y_mean = y_train.mean(dim=0)   # one value per target column
y_std = y_train.std(dim=0)     # one value per target column
# A constant target (std == 0) or a single training sample (std is NaN) would
# turn the divisions below into NaN/inf; use a scale of 1 for such columns.
y_std = torch.where(y_std > 0, y_std, torch.ones_like(y_std))
y_train = (y_train - y_mean) / y_std
y_val   = (y_val   - y_mean) / y_std
y_test  = (y_test  - y_mean) / y_std

In [None]:
# Wrap each split in a TensorDataset (features paired with normalized targets)
train_ds, val_ds, test_ds = (
    TensorDataset(features, targets)
    for features, targets in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# Only the training loader reshuffles; validation and test keep a fixed order
train_loader = DataLoader(train_ds, shuffle=True, batch_size=batch_size)
val_loader, test_loader = (
    DataLoader(split_ds, shuffle=False, batch_size=batch_size)
    for split_ds in (val_ds, test_ds)
)

In [None]:
# Multi-task MLP model
class MultiTaskMLP(nn.Module):
    """
    MLP with a shared trunk and one independent output head per target.

    Args:
        input_size (int): number of input features
        shared_hidden_sizes (iterable of int): widths of the shared Linear layers,
            in order; may be empty, in which case the heads read the raw input
        task_hidden_size (int): width of the hidden layer inside every head
        target_size (int): number of heads, i.e. number of predicted targets
    """

    def __init__(self, input_size, shared_hidden_sizes, task_hidden_size, target_size):
        super().__init__()

        # Trunk: Linear -> LeakyReLU for each consecutive pair of widths.
        widths = [input_size, *shared_hidden_sizes]
        trunk = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            trunk.append(nn.Linear(fan_in, fan_out))
            trunk.append(nn.LeakyReLU())
        self.shared = nn.Sequential(*trunk)

        # Heads: each maps the trunk output to a single scalar prediction.
        trunk_out = widths[-1]
        self.heads = nn.ModuleList()
        for _ in range(target_size):
            self.heads.append(
                nn.Sequential(
                    nn.Linear(trunk_out, task_hidden_size),
                    nn.LeakyReLU(),
                    nn.Linear(task_hidden_size, 1),
                )
            )

    def forward(self, x):
        """Run the trunk once, then every head; returns the head outputs joined along dim 1."""
        shared_features = self.shared(x)
        per_task = tuple(head(shared_features) for head in self.heads)
        return torch.cat(per_task, dim=1)

In [None]:
# Initialize model, loss, optimizer.
# Input and output widths are read from the prepared tensors instead of being
# hard-coded, so they cannot drift from the columns selected for X and y.
model = MultiTaskMLP(
    input_size=X_train.shape[1],
    shared_hidden_sizes=[30, 20],
    task_hidden_size=20,
    target_size=y_train.shape[1]
)
# Mean squared error over all targets (computed on the normalized scale)
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Training loop
for epoch in range(n_epochs):
    model.train()
    train_loss_sum = 0.0
    n_train_seen = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        y_pred = model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        # loss_fn returns a per-batch mean; weight it by the batch size so a
        # shorter final batch does not count as much as a full one.
        train_loss_sum += loss.item() * X_batch.size(0)
        n_train_seen += X_batch.size(0)
    # NaN rather than ZeroDivisionError if the loader yielded nothing
    train_loss = train_loss_sum / n_train_seen if n_train_seen else float("nan")

    # Leave the model in eval mode at the end of every epoch
    model.eval()

    # Validation: report on the first epoch and then every 10th
    if (epoch + 1) % 10 == 0 or epoch == 0:
        val_loss_sum = 0.0
        n_val_seen = 0
        with torch.no_grad():
            for Xb, yb in val_loader:
                val_loss_sum += loss_fn(model(Xb), yb).item() * Xb.size(0)
                n_val_seen += Xb.size(0)
        val_loss = val_loss_sum / n_val_seen if n_val_seen else float("nan")
        print(f"Epoch {epoch+1:3d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

print("Training complete.")

In [None]:
# Test evaluation
model.eval()
y_true_all = []
y_pred_all = []

# Collect targets and predictions batch by batch, without tracking gradients
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        y_pred = model(X_batch)
        y_true_all.append(y_batch)
        y_pred_all.append(y_pred)

# Denormalize: undo the (y - y_mean) / y_std scaling so metrics are in original units
y_true_all = torch.cat(y_true_all, dim=0)
y_pred_all = torch.cat(y_pred_all, dim=0)
y_true_all = y_true_all * y_std + y_mean
y_pred_all = y_pred_all * y_std + y_mean

# Define which are kinematics and which are ligaments
kinematic_cols = ["EI","VV","LM","AP","PD"]
ligament_cols  = ["PCL","dMCL","sMCL","LCL","ALL","OPL","PC"]

# Single source for the printed names: kinematics first, then ligaments,
# which is the column order used when y was built.
target_names = kinematic_cols + ligament_cols
assert len(target_names) == y_true_all.shape[1], "target names do not match the number of target columns"

# Compute metrics per target (mean over samples, one value per column)
mse_per_target = ((y_true_all - y_pred_all) ** 2).mean(dim=0)
mae_per_target = (y_true_all - y_pred_all).abs().mean(dim=0)
print("Metrics per target:")
for i, col in enumerate(target_names):
    print(f"  {col:>5s} | MSE: {mse_per_target[i]:.6f} | MAE: {mae_per_target[i]:.6f}")

# Compute weighted total for each sample
def compute_f(df_tensor, ligament_weight=40, n_kinematic=None):
    """
    Collapse each row into one weighted total: sum of the leading (kinematic)
    columns plus ``ligament_weight`` times the sum of the remaining (ligament) columns.

    Args:
        df_tensor: 2-D torch tensor, one row per sample, kinematic columns first
        ligament_weight: factor applied to the ligament sum (default 40)
        n_kinematic (int, optional): number of leading kinematic columns;
            defaults to ``len(kinematic_cols)`` looked up at call time

    Returns:
        1-D NumPy array with one weighted total per row
    """
    if n_kinematic is None:
        n_kinematic = len(kinematic_cols)
    # detach()/cpu() make the conversion work for tensors that require grad or
    # live on another device; plain .numpy() raises for those.
    values = df_tensor.detach().cpu().numpy()
    kin_sum = values[:, :n_kinematic].sum(axis=1)
    lig_sum = values[:, n_kinematic:].sum(axis=1)
    return kin_sum + ligament_weight * lig_sum

# Error statistics of the weighted totals (one total per test sample)
f_true, f_pred = compute_f(y_true_all), compute_f(y_pred_all)
f_err = f_true - f_pred
mse_f = np.square(f_err).mean()
mae_f = np.abs(f_err).mean()

print("Metrics on weighted sums:", f"MSE: {mse_f:.6f}", f"MAE: {mae_f:.6f}", sep="\n")