In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedGroupKFold, GroupShuffleSplit
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay



# Load Preprocessed Data
df = pd.read_csv(r"C:\Users\KIIT\OneDrive\Desktop\Minor_datasheet\processed_labeled_data.csv")

# Extract Features and Labels
X = df.iloc[:, 3:].values  
y_upper = df['upperbody_label'].values  
y_lower = df['lowerbody_label'].values  
groups = df['subject'].values  

# Standardize Features
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Convert to Tensors
y_upper = torch.tensor(y_upper, dtype=torch.long)
y_lower = torch.tensor(y_lower, dtype=torch.long)
X = torch.tensor(X, dtype=torch.float32)

# Split Off 20% Final Test Set
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, final_test_idx = next(gss.split(X, y_upper, groups))

X_final_test, y_upper_final_test, y_lower_final_test = (
    X[final_test_idx], y_upper[final_test_idx], y_lower[final_test_idx]
)
groups_final_test = groups[final_test_idx]

# Use Remaining 80% for Cross-Validation
X, y_upper, y_lower, groups = (
    X[train_idx], y_upper[train_idx], y_lower[train_idx], groups[train_idx]
)

# Define Dataset
dataset = TensorDataset(X, y_upper, y_lower)

# Define Model
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_classes_upper, num_classes_lower):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc_upper = nn.Linear(hidden_dim, num_classes_upper)  
        self.fc_lower = nn.Linear(hidden_dim, num_classes_lower)  
        self.relu = nn.ReLU()
    
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.relu(self.fc3(x))
        return self.fc_upper(x), self.fc_lower(x)

# Model Parameters
input_dim = X.shape[1]
hidden_dim = 128
num_classes_upper = len(np.unique(y_upper))  
num_classes_lower = len(np.unique(y_lower))  

# Initialize Model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MLP(input_dim, hidden_dim, num_classes_upper, num_classes_lower).to(device)

# Define Loss & Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

# Lists to Track Metrics
train_losses, val_losses = [], []
train_accs, val_accs = [], []

# Stratified Group K-Fold Cross-Validation Training
num_splits = 5  # You can adjust the number of splits
sgkf = StratifiedGroupKFold(n_splits=num_splits, shuffle=True, random_state=42)

for fold, (train_idx, test_idx) in enumerate(sgkf.split(X, y_upper, groups)):

    print(f"Training Fold {fold+1}...")
    print("Unique subjects in training set:", np.unique(groups[train_idx]))
    print("Unique subjects in test set:", np.unique(groups[test_idx]))
    print("-" * 50)
    
    # Train-Test Split
    X_train, X_test = X[train_idx], X[test_idx]
    y_upper_train, y_upper_test = y_upper[train_idx], y_upper[test_idx]
    y_lower_train, y_lower_test = y_lower[train_idx], y_lower[test_idx]

    train_dataset = TensorDataset(X_train, y_upper_train, y_lower_train)
    test_dataset = TensorDataset(X_test, y_upper_test, y_lower_test)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Train Model
    for epoch in range(10):
        model.train()
        total_loss, correct_upper, correct_lower, total = 0.0, 0, 0, 0

        for batch in train_loader:
          
            x_batch, y_u_batch, y_l_batch = [b.to(device) for b in batch]

            optimizer.zero_grad()
            output_upper, output_lower = model(x_batch)

            loss_upper = criterion(output_upper, y_u_batch)
            loss_lower = criterion(output_lower, y_l_batch)
            loss = loss_upper + loss_lower  

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

            # Compute training accuracy
            _, preds_upper = torch.max(output_upper, 1)
            _, preds_lower = torch.max(output_lower, 1)

            correct_upper += (preds_upper == y_u_batch).sum().item()
            correct_lower += (preds_lower == y_l_batch).sum().item()
            total += y_u_batch.size(0)

        scheduler.step()  
        print(f"Epoch {epoch+1} ")

    print(f"Fold {fold+1} - Upper Body Accuracy: {val_acc_upper:.2f}%, Lower Body Accuracy: {val_acc_lower:.2f}%")

print("Training Complete!")

# Save the trained model
model_path = r"C:\Users\KIIT\OneDrive\Desktop\Minor_datasheet\trained_model6.pth"
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict()
}, model_path)
print("Model saved successfully!")
# === PLOT LEARNING CURVES ===
epochs = range(1, len(train_losses) + 1)
plt.figure(figsize=(10, 4))

# #  Loss Curve
# plt.subplot(1, 2, 1)
# plt.plot(epochs, train_losses, label='Training Loss', marker='o')
# plt.plot(epochs, val_losses, label='Validation Loss', marker='o')
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.title('Training vs Validation Loss')
# plt.legend()
# plt.grid(True)

# #  Accuracy Curve
# plt.subplot(1, 2, 2)
# plt.plot(epochs, train_accs, label='Training Accuracy', marker='o')
# plt.plot(epochs, val_accs, label='Validation Accuracy', marker='o')
# plt.xlabel('Epochs')
# plt.ylabel('Accuracy')
# plt.title('Training vs Validation Accuracy')
# plt.legend()
# plt.grid(True)
# plt.show()

def load_model(model_path):
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    model.eval()
    print("Model loaded successfully!")

# Create a test DataLoader for evaluation
test_dataset = TensorDataset(X, y_upper, y_lower)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Initialize metric lists
train_losses, val_losses, train_accs, val_accs = [], [], [], []

# === AUC-ROC Curve ===
y_true_upper, y_scores_upper = [], []
y_true_lower, y_scores_lower = [], []


model.eval()
with torch.no_grad():
    for batch in test_loader:
        x_batch, y_u_batch, y_l_batch = [b.to(device) for b in batch]
        output_upper, output_lower = model(x_batch)
        
        y_true_upper.extend(y_u_batch.cpu().numpy())
        y_true_lower.extend(y_l_batch.cpu().numpy())
        
        y_scores_upper.extend(torch.softmax(output_upper, dim=1).cpu().numpy())
        y_scores_lower.extend(torch.softmax(output_lower, dim=1).cpu().numpy())

# Binarize labels
y_true_upper_bin = label_binarize(y_true_upper, classes=np.arange(num_classes_upper))
y_true_lower_bin = label_binarize(y_true_lower, classes=np.arange(num_classes_lower))

# Compute AUC-ROC for each class
plt.figure(figsize=(10, 5))
for i in range(num_classes_upper):
    fpr, tpr, _ = roc_curve(y_true_upper_bin[:, i], np.array(y_scores_upper)[:, i])
    plt.plot(fpr, tpr, label=f'Upper Body Class {i} AUC = {auc(fpr, tpr):.2f}')
for i in range(num_classes_lower):
    fpr, tpr, _ = roc_curve(y_true_lower_bin[:, i], np.array(y_scores_lower)[:, i])
    plt.plot(fpr, tpr, label=f'Lower Body Class {i} AUC = {auc(fpr, tpr):.2f}')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('AUC-ROC Curve')
plt.legend()
plt.grid(True)
plt.show()

# # === Confusion Matrix ===
# y_pred_upper = np.argmax(y_scores_upper, axis=1)
# y_pred_lower = np.argmax(y_scores_lower, axis=1)

# cm_upper = confusion_matrix(y_true_upper, y_pred_upper)
# cm_lower = confusion_matrix(y_true_lower, y_pred_lower)

# fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# ConfusionMatrixDisplay(cm_upper).plot(ax=axes[0], cmap='Blues')
# axes[0].set_title("Upper Body Confusion Matrix")
# ConfusionMatrixDisplay(cm_lower).plot(ax=axes[1], cmap='Blues')
# axes[1].set_title("Lower Body Confusion Matrix")
# plt.show()

# Final Holdout Test Evaluation
final_test_dataset = TensorDataset(X_final_test, y_upper_final_test, y_lower_final_test)
final_test_loader = DataLoader(final_test_dataset, batch_size=32, shuffle=False)

true_upper_labels = []
pred_upper_labels = []
true_lower_labels = []
pred_lower_labels = []

# Evaluate the model on the final test set
model.eval()
with torch.no_grad():
    for x_batch, y_u_batch, y_l_batch in final_test_loader:
        x_batch = x_batch.to(device)
        y_u_batch = y_u_batch.to(device)
        y_l_batch = y_l_batch.to(device)

        output_upper, output_lower = model(x_batch)

        _, preds_upper = torch.max(output_upper, 1)
        _, preds_lower = torch.max(output_lower, 1)

        true_upper_labels.extend(y_u_batch.cpu().numpy())
        pred_upper_labels.extend(preds_upper.cpu().numpy())
        true_lower_labels.extend(y_l_batch.cpu().numpy())
        pred_lower_labels.extend(preds_lower.cpu().numpy())

# Compute confusion matrices
cm_upper = confusion_matrix(true_upper_labels, pred_upper_labels)
cm_lower = confusion_matrix(true_lower_labels, pred_lower_labels)

# Plot confusion matrices
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Upper body confusion matrix
disp_upper = ConfusionMatrixDisplay(confusion_matrix=cm_upper)
disp_upper.plot(ax=axes[0], cmap='Blues')
axes[0].set_title("Upper Body Confusion Matrix")

# Lower body confusion matrix
disp_lower = ConfusionMatrixDisplay(confusion_matrix=cm_lower)
disp_lower.plot(ax=axes[1], cmap='Blues')
axes[1].set_title("Lower Body Confusion Matrix")

plt.show()
# === PLOT LEARNING CURVES ===
epochs = range(1, len(train_losses) + 1)
plt.figure(figsize=(10, 4))

#  Loss Curve
plt.subplot(1, 2, 1)
plt.plot(epochs, train_losses, label='Training Loss', marker='o')
plt.plot(epochs, val_losses, label='Validation Loss', marker='o')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training vs Validation Loss')
plt.legend()
plt.grid(True)

#  Accuracy Curve
plt.subplot(1, 2, 2)
plt.plot(epochs, train_accs, label='Training Accuracy', marker='o')
plt.plot(epochs, val_accs, label='Validation Accuracy', marker='o')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Training vs Validation Accuracy')
plt.legend()
plt.grid(True)
plt.show()

def load_model(model_path):
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    model.eval()
    print("Model loaded successfully!")

# Create a test DataLoader for evaluation
test_dataset = TensorDataset(X, y_upper, y_lower)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Initialize metric lists
train_losses, val_losses, train_accs, val_accs = [], [], [], []

# === AUC-ROC Curve ===
y_true_upper, y_scores_upper = [], []
y_true_lower, y_scores_lower = [], []


model.eval()
with torch.no_grad():
    for batch in test_loader:
        x_batch, y_u_batch, y_l_batch = [b.to(device) for b in batch]
        output_upper, output_lower = model(x_batch)
        
        y_true_upper.extend(y_u_batch.cpu().numpy())
        y_true_lower.extend(y_l_batch.cpu().numpy())
        
        y_scores_upper.extend(torch.softmax(output_upper, dim=1).cpu().numpy())
        y_scores_lower.extend(torch.softmax(output_lower, dim=1).cpu().numpy())

# Binarize labels
y_true_upper_bin = label_binarize(y_true_upper, classes=np.arange(num_classes_upper))
y_true_lower_bin = label_binarize(y_true_lower, classes=np.arange(num_classes_lower))

# Compute AUC-ROC for each class
plt.figure(figsize=(10, 5))
for i in range(num_classes_upper):
    fpr, tpr, _ = roc_curve(y_true_upper_bin[:, i], np.array(y_scores_upper)[:, i])
    plt.plot(fpr, tpr, label=f'Upper Body Class {i} AUC = {auc(fpr, tpr):.2f}')
for i in range(num_classes_lower):
    fpr, tpr, _ = roc_curve(y_true_lower_bin[:, i], np.array(y_scores_lower)[:, i])
    plt.plot(fpr, tpr, label=f'Lower Body Class {i} AUC = {auc(fpr, tpr):.2f}')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('AUC-ROC Curve')
plt.legend()
plt.grid(True)
plt.show()

# === Confusion Matrix ===
y_pred_upper = np.argmax(y_scores_upper, axis=1)
y_pred_lower = np.argmax(y_scores_lower, axis=1)

cm_upper = confusion_matrix(y_true_upper, y_pred_upper)
cm_lower = confusion_matrix(y_true_lower, y_pred_lower)

fig, axes = plt.subplots(1, 2, figsize=(12, 5))
ConfusionMatrixDisplay(cm_upper).plot(ax=axes[0], cmap='Blues')
axes[0].set_title("Upper Body Confusion Matrix")
ConfusionMatrixDisplay(cm_lower).plot(ax=axes[1], cmap='Blues')
axes[1].set_title("Lower Body Confusion Matrix")
plt.show()

# Final Holdout Test Evaluation
final_test_dataset = TensorDataset(X_final_test, y_upper_final_test, y_lower_final_test)
final_test_loader = DataLoader(final_test_dataset, batch_size=32, shuffle=False)

true_upper_labels = []
pred_upper_labels = []
true_lower_labels = []
pred_lower_labels = []

# Evaluate the model on the final test set
model.eval()
with torch.no_grad():
    for x_batch, y_u_batch, y_l_batch in final_test_loader:
        x_batch = x_batch.to(device)
        y_u_batch = y_u_batch.to(device)
        y_l_batch = y_l_batch.to(device)

        output_upper, output_lower = model(x_batch)

        _, preds_upper = torch.max(output_upper, 1)
        _, preds_lower = torch.max(output_lower, 1)

        true_upper_labels.extend(y_u_batch.cpu().numpy())
        pred_upper_labels.extend(preds_upper.cpu().numpy())
        true_lower_labels.extend(y_l_batch.cpu().numpy())
        pred_lower_labels.extend(preds_lower.cpu().numpy())

# Compute confusion matrices
cm_upper = confusion_matrix(true_upper_labels, pred_upper_labels)
cm_lower = confusion_matrix(true_lower_labels, pred_lower_labels)

# Plot confusion matrices
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Upper body confusion matrix
disp_upper = ConfusionMatrixDisplay(confusion_matrix=cm_upper)
disp_upper.plot(ax=axes[0], cmap='Blues')
axes[0].set_title("Upper Body Confusion Matrix")

# Lower body confusion matrix
disp_lower = ConfusionMatrixDisplay(confusion_matrix=cm_lower)
disp_lower.plot(ax=axes[1], cmap='Blues')
axes[1].set_title("Lower Body Confusion Matrix")

plt.show()