In [1]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import accuracy_score

import random

In [2]:
# Reload the local helper module so that edits made to utils.data_processing
# are picked up without restarting the kernel.
import importlib
import utils.data_processing as data_processing
importlib.reload(data_processing)
# Imported after the reload so these names bind to the reloaded definitions.
from utils.data_processing import set_seed, save_output, get_df

In [3]:
# Fix the seed through the project helper
# (presumably seeds the RNGs used below — confirm in utils.data_processing).
set_seed(42)

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

Using device: cuda


# data processing

In [4]:
# Load the dataset through the project helper. The cells below read the columns
# r0..r3, c0..c4, gameLength, uc and subject from the returned frame.
df = get_df()

In [5]:


# Build the model inputs: select the feature columns, one-hot encode the subject,
# split train/test by subject, standardise the numeric columns and report shapes.

# --- Define feature groups ---
seq_features = ['r0', 'c0', 'r1', 'c1', 'r2', 'c2', 'r3', 'c3']
static_features = ['gameLength', 'uc']
target = 'c4'

# --- One-hot encode subjects ---
# One indicator column per distinct subject value, named subj_<value>.
subj_onehot = pd.get_dummies(df['subject'], prefix='subj')
df_static = pd.concat([df[static_features], subj_onehot], axis=1)

# --- Combine all features ---
# Column order of X: seq_features, then static_features, then the subj_* indicators.
X = pd.concat([df[seq_features], df_static], axis=1)
y = df[target]

# --- Split by subjects (to ensure unseen subjects in test) ---
# The split is over the unique subject ids, so every row of a given subject
# ends up entirely in train or entirely in test.
# NOTE(review): because the split is subject-disjoint, the subj_* column that is
# active for a test row is all-False in every training row, so the model never
# sees that indicator switched on during training — confirm this is intended.
subjects = df['subject'].unique()
train_subj, test_subj = train_test_split(subjects, test_size=0.2, random_state=42)

train_mask = df['subject'].isin(train_subj)
test_mask  = df['subject'].isin(test_subj)

X_train = X.loc[train_mask]
X_test  = X.loc[test_mask]
y_train = y.loc[train_mask]
y_test  = y.loc[test_mask]

# --- Separate numeric vs one-hot for scaling ---
# Only the numeric columns are standardised; the indicator columns are passed through.
onehot_cols = [c for c in X.columns if c.startswith('subj_')]
numeric_cols = seq_features + static_features

# The scaler is fitted on the training rows only and then applied to the test rows.
scaler = StandardScaler()
X_train_num = scaler.fit_transform(X_train[numeric_cols])
X_test_num  = scaler.transform(X_test[numeric_cols])

X_train_oh = X_train[onehot_cols].to_numpy()
X_test_oh  = X_test[onehot_cols].to_numpy()

# --- Combine back numeric + one-hot ---
# From here on X_train / X_test are numpy arrays (no longer DataFrames), with the
# scaled numeric columns first and the indicator columns after them.
X_train = np.hstack([X_train_num, X_train_oh])
X_test  = np.hstack([X_test_num, X_test_oh])

# --- Optional: track horizon masks for evaluation ---
# Taken from the unscaled frame X, so gameLength is compared against its raw
# values; the masks cover the test rows in the same order as X_test.
h1_mask = X.loc[test_mask, 'gameLength'] == 1
h6_mask = X.loc[test_mask, 'gameLength'] == 6

# Preview of the unscaled features/target and the final split sizes.
print("--- Feature Data (X) ---")
print(X.head())
print("\n--- Target Data (y) ---")
print(y.head())

print(f"\nTrain set: {X_train.shape}, Test set: {X_test.shape}")
print(f"H1 samples: {h1_mask.sum()}, H6 samples: {h6_mask.sum()}")


--- Feature Data (X) ---
     r0   c0    r1   c1    r2   c2    r3   c3  gameLength  uc  ...  subj_990  \
0  42.0  0.0  45.0  1.0  42.0  1.0  18.0  1.0           6   1  ...     False   
1  67.0  0.0  57.0  1.0  56.0  0.0  50.0  1.0           6   0  ...     False   
2  37.0  1.0  48.0  0.0  23.0  0.0  39.0  1.0           6   0  ...     False   
3  58.0  1.0  51.0  0.0  28.0  0.0  47.0  1.0           1   0  ...     False   
4   4.0  1.0  30.0  0.0  11.0  1.0  37.0  0.0           1   0  ...     False   

   subj_991  subj_992  subj_993  subj_994  subj_995  subj_996  subj_997  \
0     False     False     False     False     False     False     False   
1     False     False     False     False     False     False     False   
2     False     False     False     False     False     False     False   
3     False     False     False     False     False     False     False   
4     False     False     False     False     False     False     False   

   subj_998  subj_999  
0     False     Fal

Convert the data to tensors for PyTorch

In [6]:
# Sanity check: show the container types of the features and labels, one per line.
print(type(X_train), type(y_train), sep="\n")

<class 'numpy.ndarray'>
<class 'pandas.core.series.Series'>


In [7]:
# Features: the numpy arrays are cast to float32 when the tensors are built.
X_train_tensor, X_test_tensor = (
    torch.tensor(features, dtype=torch.float32) for features in (X_train, X_test)
)

# Labels: pandas Series -> numpy array -> int64 tensor.
y_train_tensor, y_test_tensor = (
    torch.tensor(labels.to_numpy(), dtype=torch.long) for labels in (y_train, y_test)
)

In [8]:
# Mini-batches of 32: the training loader reshuffles every epoch, the test loader
# keeps the original row order.
train_loader = DataLoader(
    TensorDataset(X_train_tensor, y_train_tensor),
    batch_size=32,
    shuffle=True,
)
test_loader = DataLoader(
    TensorDataset(X_test_tensor, y_test_tensor),
    batch_size=32,
    shuffle=False,
)

In [9]:
# Build separate evaluation sets for the gameLength == 1 (H1) and
# gameLength == 6 (H6) test rows, using the already-scaled test tensors.
# Each pandas boolean Series goes Series -> numpy array -> torch bool tensor
# so that it can index the tensors directly.
h1_mask_bool, h6_mask_bool = (
    torch.tensor(mask.to_numpy(), dtype=torch.bool) for mask in (h1_mask, h6_mask)
)

X_test_h1, y_test_h1 = X_test_tensor[h1_mask_bool], y_test_tensor[h1_mask_bool]
X_test_h6, y_test_h6 = X_test_tensor[h6_mask_bool], y_test_tensor[h6_mask_bool]

test_loader_h1 = DataLoader(
    TensorDataset(X_test_h1, y_test_h1), batch_size=32, shuffle=False
)
test_loader_h6 = DataLoader(
    TensorDataset(X_test_h6, y_test_h6), batch_size=32, shuffle=False
)

# Model

In [10]:
from utils.models import (
    MLP
)

# train / eval

In [11]:
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: network to optimise; it is switched to training mode.
        train_loader: iterable yielding ``(inputs, labels)`` batches.
        criterion: loss called as ``criterion(logits, labels)``.
        optimizer: optimizer that updates the parameters of ``model``.
        device: device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(accuracy, avg_loss)``, both averaged per sample over the
        samples seen during this pass.
    """
    model.train()
    train_loss = 0.0
    correct = 0
    # Count samples, not batches: the loss below is weighted by batch size and
    # ``correct`` counts individual predictions, so both must be divided by the
    # number of samples (len(train_loader) would be the number of batches).
    total = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        logits = model(inputs)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        preds = logits.argmax(dim=1)
        # Undo the per-batch averaging so that a smaller last batch is weighted correctly.
        train_loss += loss.item() * batch_size
        correct += (preds == labels).sum().item()
        total += batch_size

    avg_loss = train_loss / total
    accuracy = correct / total
    return accuracy, avg_loss


def test(model, test_loader, criterion, device):
    model.eval()
    correct = 0
    total_loss = 0
    total = len(test_loader.dataset)

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            logits = model(inputs)
            loss = criterion(logits, labels)
            preds = logits.argmax(dim=1)

            correct += (preds == labels).sum().item()
            total_loss += loss.item() * labels.size(0)
            
    accuracy = correct / total
    avg_loss = total_loss / total
    return accuracy, avg_loss

# running experiment

In [16]:
# Layer sizes come from the training data: one input per feature column and
# one output per distinct label value present in y_train.
input_size = X_train.shape[1]
output_size = np.unique(y_train).size  # usually 2 for left/right choice


In [17]:
# Registry of the models to train, keyed by display name; each model is moved
# to the selected device when it is created.
model_dict = {}
model_dict["MLP"] = MLP(input_size=input_size, output_size=output_size).to(device)

In [18]:

def train_and_evaluate(model, train_loader, test_loaders, criterion, optimizer, device, epochs, patience=5):
    """Train ``model`` for up to ``epochs`` epochs, evaluating after every epoch.

    Args:
        model: network to train.
        train_loader: loader passed to ``train`` each epoch.
        test_loaders: 3-tuple ``(test_loader, test_loader_h1, test_loader_h6)``
            passed to ``test`` each epoch.
        criterion: loss function forwarded to ``train`` and ``test``.
        optimizer: optimizer forwarded to ``train``.
        device: device forwarded to ``train`` and ``test``.
        epochs: maximum number of epochs.
        patience: training stops once the overall test loss has failed to
            improve for more than ``patience`` consecutive epochs.

    Returns:
        Dict with one list per tracked metric (``train_loss_prog``,
        ``train_acc_prog``, ``test_loss_prog``, ``test_acc_prog``,
        ``test_acc_h1_prog``, ``test_loss_h1_prog``, ``test_acc_h6_prog``,
        ``test_loss_h6_prog``), one value per completed epoch, plus
        ``final_epoch``: the 1-based epoch at which early stopping fired, or
        ``epochs`` if it never did.

    Note:
        Early stopping monitors the loss on ``test_loader``, i.e. the same
        data whose metrics are reported. The model is left in its last-epoch
        state; the best-loss weights are not restored.
    """
    test_loader, test_loader_h1, test_loader_h6 = test_loaders

    # One list per metric, keyed exactly as in the returned dict.
    history = {
        "train_loss_prog": [],
        "train_acc_prog": [],
        "test_loss_prog": [],
        "test_acc_prog": [],
        "test_acc_h1_prog": [],
        "test_loss_h1_prog": [],
        "test_acc_h6_prog": [],
        "test_loss_h6_prog": [],
    }

    epochs_without_improvement = 0  # for early stopping
    best_loss = float('inf')
    final_epoch = epochs

    for epoch in range(epochs):
        train_acc, train_loss = train(model, train_loader, criterion, optimizer, device)

        test_acc, test_loss = test(model, test_loader, criterion, device)
        test_acc_h1, test_loss_h1 = test(model, test_loader_h1, criterion, device)
        test_acc_h6, test_loss_h6 = test(model, test_loader_h6, criterion, device)

        # Progress line on the first epoch and then every tenth epoch.
        if (epoch+1) % 10 == 0 or epoch == 0:
            print(f"Epoch {epoch+1}: Loss: {test_loss:.4f} | overall: {test_acc:.4f} | H1 {test_acc_h1:.4f} | H6 {test_acc_h6:.4f}")

        history["train_acc_prog"].append(train_acc)
        history["train_loss_prog"].append(train_loss)
        history["test_loss_prog"].append(test_loss)
        history["test_acc_prog"].append(test_acc)
        history["test_acc_h1_prog"].append(test_acc_h1)
        history["test_loss_h1_prog"].append(test_loss_h1)
        history["test_acc_h6_prog"].append(test_acc_h6)
        history["test_loss_h6_prog"].append(test_loss_h6)

        # Early stopping: only a strict improvement of the overall test loss
        # resets the counter.
        if test_loss < best_loss:
            best_loss = test_loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1

        if epochs_without_improvement > patience:
            print(f"Early stopping triggered: epoch {epoch+1} best_loss {best_loss:.4f}")
            final_epoch = epoch+1
            break

    history["final_epoch"] = final_epoch
    return history

In [19]:
# Train every registered model with a fresh loss/optimizer and store its history.
# NOTE: each model_dict value (a model) is replaced by a dict holding the model
# and its history, so this cell cannot be re-run without first re-running the
# cell that builds model_dict.
epochs = 100
test_loaders = (test_loader, test_loader_h1, test_loader_h6)
# Iterate over a snapshot because the entries are overwritten inside the loop.
for model_name, model in list(model_dict.items()):
    print(f"\nTraining model: {model_name}")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    history = train_and_evaluate(
        model,
        train_loader,
        test_loaders,
        criterion,
        optimizer,
        device,
        epochs=epochs,
    )
    # "model" goes first so the stored key order is: model, then the history keys.
    entry = {"model": model}
    entry.update(history)
    model_dict[model_name] = entry



Training model: MLP
Epoch 1: Loss: 0.5078 | overall: 0.7719 | H1 0.8126 | H6 0.7312
Early stopping triggered: epoch 8 best_loss 0.5062


In [20]:

# Persist the per-model results through the project helper; the cell output
# reports the written file as output_mlp_individual.json.
save_output(model_dict, "output_mlp_individual")

Saved model results to output_mlp_individual.json
