In [1]:
# Imports
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [2]:
# Files load
eda_file = '../data/eda/swell/combined/classification/combined-swell-classification-eda-dataset.csv'
hrv_file = '../data/hrv/swell/combined/classification/combined-swell-classification-hrv-dataset.csv'

eda_data = pd.read_csv(eda_file, nrows=60000)
hrv_data = pd.read_csv(hrv_file, nrows=60000)

merged_data = pd.concat([eda_data, hrv_data], axis=1)

In [3]:
# Preprocess
merged_data_cleaned = merged_data.dropna()

X = merged_data_cleaned.select_dtypes(include=[float, int]).drop(columns=['Condition Label', 'NasaTLX Label'])
y = merged_data_cleaned['Condition Label']

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
y_tensor = torch.tensor(y.values, dtype=torch.long)

X_train, X_test, y_train, y_test = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=42)

X_train_cnn = X_train.unsqueeze(1)
X_test_cnn = X_test.unsqueeze(1)

class EDAHRVDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = EDAHRVDataset(X_train_cnn, y_train)
test_dataset = EDAHRVDataset(X_test_cnn, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [4]:
class EDAHRVCNN(nn.Module):
    def __init__(self, input_size, num_classes):
        super(EDAHRVCNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)
        
        self.fc1 = nn.Linear(64 * (input_size // 2 // 2), 128)
        self.fc2 = nn.Linear(128, num_classes)
        
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)
        x = self.dropout(self.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

input_size = X_train.shape[1]  
num_classes = len(torch.unique(y_tensor))  
model = EDAHRVCNN(input_size=input_size, num_classes=num_classes)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [5]:
# Training Loop
num_epochs = 10  
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        
        outputs = model(inputs)  
        
        if labels.dim() > 1:  
            labels = labels.argmax(dim=1) 
        
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

Epoch [1/10], Loss: 0.4668
Epoch [2/10], Loss: 0.3367
Epoch [3/10], Loss: 0.2836
Epoch [4/10], Loss: 0.2473
Epoch [5/10], Loss: 0.2231
Epoch [6/10], Loss: 0.2007
Epoch [7/10], Loss: 0.1862
Epoch [8/10], Loss: 0.1688
Epoch [9/10], Loss: 0.1619
Epoch [10/10], Loss: 0.1497


In [6]:
# Evaluation Loop
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        
        if labels.dim() > 1:  
            labels = labels.argmax(dim=1)
        
        _, predicted = torch.max(outputs, 1)  
        
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}%')

Test Accuracy: 94.18%
