# DATA LOAD

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupKFold
from sklearn.metrics import accuracy_score, mean_squared_error, confusion_matrix
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns

all_participants_data = load_all_participants()




# FUNCTIONS

In [None]:
def prepare_data_for_pytorch(all_participants_data):

    X = all_participants_data.drop(
        columns=["perclos", "quantized_perclos", "Participant Name"]
    ).values
    
    y_binary = (all_participants_data["perclos"] > 0.5).astype(int).values
    y_ternary = all_participants_data["quantized_perclos"].values
    y_continuous = all_participants_data["perclos"].values
    
    groups = all_participants_data["Participant Name"].values
    
    return X, y_binary, y_ternary, y_continuous, groups

def create_torch_dataset(X, y, task_type):
    X_tensor = torch.FloatTensor(X)

    if task_type == 'ternary':
        y_tensor = torch.LongTensor(y)
    elif task_type == 'binary':
        y_tensor = torch.FloatTensor(y).view(-1, 1)
    else:  # continuous
        y_tensor = torch.FloatTensor(y).view(-1, 1)
        
    return torch.utils.data.TensorDataset(X_tensor, y_tensor)

def train_epoch(model, train_loader, criterion, optimizer, device, task_type):
    model.train()
    running_loss = 0.0
    
    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    return running_loss / len(train_loader)

def evaluate_model(model, test_loader, criterion, device, task_type):
    model.eval()
    running_loss = 0.0
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, targets)  
            running_loss += loss.item()

            if task_type == 'binary':
                preds = (outputs > 0.5).float().cpu().numpy()
                all_predictions.extend(preds)
                all_targets.extend(targets.cpu().numpy())
            elif task_type == 'ternary':
                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                all_predictions.extend(preds)
                all_targets.extend(targets.cpu().numpy())
            else:  # continuous
                all_predictions.extend(outputs.cpu().numpy())
                all_targets.extend(targets.cpu().numpy())
    

    if task_type == 'binary' or task_type == 'ternary':
        accuracy = accuracy_score(np.array(all_targets).reshape(-1), 
                                 np.array(all_predictions).reshape(-1))
        return running_loss / len(test_loader), accuracy
    else:  
        rmse = np.sqrt(mean_squared_error(np.array(all_targets).reshape(-1), 
                                         np.array(all_predictions).reshape(-1)))
        return running_loss / len(test_loader), rmse

def plot_confusion_matrix(all_targets, all_predictions, task_type, model_type):
    plt.figure(figsize=(8, 6))
    cm = confusion_matrix(all_targets, all_predictions)
    if task_type == 'binary':
        labels = ["Class 0", "Class 1"]
        cmap = "Blues"
        title = f"{model_type.upper()} Binary Classification Confusion Matrix"
    else:
        labels = ["Class 0", "Class 1", "Class 2"]
        cmap = "Oranges"
        title = f"{model_type.upper()} Ternary Classification Confusion Matrix"
    sns.heatmap(cm, annot=True, fmt="d", cmap=cmap, xticklabels=labels, yticklabels=labels)
    plt.title(title)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.tight_layout()
    plt.show()

def plot_regression_results(all_targets, all_predictions, model_type):
    overall_rmse = np.sqrt(mean_squared_error(all_targets, all_predictions))
    print(f"\nOverall Continuous Regression RMSE: {overall_rmse:.4f}")
    plt.figure(figsize=(8, 6))
    plt.scatter(all_targets, all_predictions, alpha=0.5)
    min_val = min(np.min(all_targets), np.min(all_predictions))
    max_val = max(np.max(all_targets), np.max(all_predictions))
    plt.plot([min_val, max_val], [min_val, max_val], 'r--')
    plt.title(f"{model_type.upper()} Regression: Predicted vs Actual PERCLOS")
    plt.xlabel("Actual PERCLOS")
    plt.ylabel("Predicted PERCLOS")
    plt.tight_layout()
    plt.show()
    return overall_rmse

# MAIN

In [None]:
def run_single_model(all_participants_data, model_type='cnn', task_type='binary', 
                     batch_size=32, learning_rate=0.001, num_epochs=20):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    print(f"Running {model_type.upper()} model for {task_type} task")
    
    X, y_binary, y_ternary, y_continuous, groups = prepare_data_for_pytorch(all_participants_data)
    
    if task_type == 'binary':
        y = y_binary
    elif task_type == 'ternary':
        y = y_ternary
    else:
        y = y_continuous
        task_type = 'continuous'
    
    unique_groups = np.unique(groups)
    cv_loso = GroupKFold(n_splits=len(unique_groups))
    
    all_predictions = []
    all_targets = []
    fold_metrics = []
    
    for fold, (train_idx, test_idx) in enumerate(cv_loso.split(X, y, groups)):
        test_subject = unique_groups[np.where(np.isin(unique_groups, groups[test_idx]))[0][0]]
        print(f"Fold {fold+1}/{len(unique_groups)} - Testing on subject: {test_subject}")
        
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        
        scaler = StandardScaler()
        scaler.fit(X_train)
        X_train = scaler.transform(X_train)
        X_test = scaler.transform(X_test)
        
        train_dataset = create_torch_dataset(X_train, y_train, task_type)
        test_dataset = create_torch_dataset(X_test, y_test, task_type)
        
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        
        if task_type == 'binary':
            output_size = 1
            criterion = nn.BCELoss()
        elif task_type == 'ternary':
            output_size = 3
            criterion = nn.CrossEntropyLoss()
        else:
            output_size = 1
            criterion = nn.MSELoss()
        
        if model_type.lower() == 'cnn':
            model = CNNModel(X.shape[1], output_size, task_type).to(device)
        else:
            model = TransformerModel(X.shape[1], output_size, task_type).to(device)
        
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        
        for epoch in range(num_epochs):
            train_loss = train_epoch(model, train_loader, criterion, optimizer, device, task_type)
            if (epoch + 1) % 5 == 0 or epoch == 0:
                val_loss, metric = evaluate_model(model, test_loader, criterion, device, task_type)
                metric_name = "Accuracy" if task_type in ['binary', 'ternary'] else "RMSE"
                print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, "
                      f"Val Loss: {val_loss:.4f}, {metric_name}: {metric:.4f}")
        
        model.eval()
        fold_preds = []
        fold_targets = []
        
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs = inputs.to(device)
                outputs = model(inputs)
                if task_type == 'binary':
                    preds = (outputs > 0.5).float().cpu().numpy()
                elif task_type == 'ternary':
                    preds = torch.argmax(outputs, dim=1).cpu().numpy()
                else:
                    preds = outputs.cpu().numpy()
                fold_preds.extend(preds)
                fold_targets.extend(targets.cpu().numpy())
            
            fold_preds_np = np.array(fold_preds).reshape(-1)
            fold_targets_np = np.array(fold_targets).reshape(-1)
            
            if task_type in ['binary', 'ternary']:
                if task_type == 'binary':
                    fold_preds_np = np.round(fold_preds_np)
                fold_acc = accuracy_score(fold_targets_np, fold_preds_np)
                fold_metrics.append((test_subject, fold_acc))
                print(f"Subject {test_subject} - Accuracy: {fold_acc:.4f}")
            else:
                fold_rmse = np.sqrt(mean_squared_error(fold_targets_np, fold_preds_np))
                fold_metrics.append((test_subject, fold_rmse))
                print(f"Subject {test_subject} - RMSE: {fold_rmse:.4f}")
        
        all_predictions.extend(fold_preds)
        all_targets.extend(fold_targets)
    
    all_predictions = np.array(all_predictions).reshape(-1)
    all_targets = np.array(all_targets).reshape(-1)
    
    if task_type in ['binary', 'ternary']:
        if task_type == 'binary':
            all_predictions = np.round(all_predictions)
        overall_acc = accuracy_score(all_targets, all_predictions)
        print(f"\nOverall {task_type.capitalize()} Classification Accuracy: {overall_acc:.4f}")
        plot_confusion_matrix(all_targets, all_predictions, task_type, model_type)
        results = {
            'model_type': model_type,
            'task_type': task_type,
            'overall_accuracy': overall_acc,
            'fold_metrics': fold_metrics,
            'predictions': all_predictions,
            'targets': all_targets,
            'confusion_matrix': confusion_matrix(all_targets, all_predictions)
        }
    else:
        overall_rmse = plot_regression_results(all_targets, all_predictions, model_type)
        results = {
            'model_type': model_type,
            'task_type': task_type,
            'overall_rmse': overall_rmse,
            'fold_metrics': fold_metrics,
            'predictions': all_predictions,
            'targets': all_targets
        }
    
    return results




# USE

In [None]:
binary_results = run_single_model(
    all_participants_data, 
    model_type='transformer',  # 'cnn' or 'transformer'
    task_type='binary',  # 'binary', 'ternary', or 'continuous'
    batch_size=64,
    learning_rate=0.0001,
    num_epochs=3
)

In [None]:
ternary_results = run_single_model(
    all_participants_data, 
    model_type='transformer',  # 'cnn' or 'transformer'
    task_type='ternary',  # 'binary', 'ternary', or 'continuous'
    batch_size=64,
    learning_rate=0.0001,
    num_epochs=3
)

In [None]:
continuous_results = run_single_model(
    all_participants_data, 
    model_type='transformer',  # 'cnn' or 'transformer'
    task_type='continuous',  # 'binary', 'ternary', or 'continuous'
    batch_size=64,
    learning_rate=0.0001,
    num_epochs=3
)