In [2]:
import torch
import torch.nn as nn
from torcheeg import transforms
import torch.optim as optim
from torch.utils.data import DataLoader
from torcheeg.datasets import SEEDDataset
from torcheeg.models import DGCNN
from dgcnn_gat_a import DGCNN
from torch.utils.data import random_split
from torch.utils.data import DataLoader, Subset

#TODO: dataset接受收集的数据
dataset = SEEDDataset(io_path='/Users/hanlin/Desktop/vr_locomotion/seed',  # 设置为之前保存数据的路径
                      offline_transform=transforms.BandDifferentialEntropy(band_dict={
                          "delta": [1, 4],
                          "theta": [4, 8],
                          "alpha": [8, 14],
                          "beta": [14, 31],
                          "gamma": [31, 49]
                      }),
                      online_transform=transforms.Compose([
                          transforms.ToTensor()
                      ]),
                      label_transform=transforms.Compose([
                          transforms.Select('emotion'),
                          transforms.Lambda(lambda x: x + 1)
                      ]))

  from .autonotebook import tqdm as notebook_tqdm
[2024-05-10 23:02:53] INFO (torcheeg/MainThread) 🔍 | Detected cached processing results, reading cache from /Users/hanlin/Desktop/vr_locomotion/seed.


In [3]:
# The data format the model receives
sample, label = dataset[0]
print(sample.shape)
print(label)

torch.Size([62, 5])
2


In [None]:
# TODO: Change here to use our data
dataset_using = dataset

# split
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

seed = 42
torch.manual_seed(seed)
train_size = int(train_ratio * len(dataset_using))
val_size = int(val_ratio * len(dataset_using))
test_size = len(dataset_using) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset_using, [train_size, val_size, test_size])

# TODO: hyper params
batch_size = 32
num_epochs = 65
# l1_reg = 0
# l2_reg = 0
learning_rate = 0.0001

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

train_losses = []
val_losses = []

model = DGCNN(in_channels=5, num_electrodes=62, hid_channels=32, num_classes=3)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training
for epoch in range(num_epochs):
    running_loss = 0.0
    for batch_data, batch_labels in train_loader:
        outputs = model(batch_data)
        
        ce_loss = criterion(outputs, batch_labels)
        
        # l1_reg_loss = 0
        # for param in model.parameters():
        #     l1_reg_loss += torch.sum(torch.abs(param))
        # loss = ce_loss + l1_reg * l1_reg_loss

        # l2_reg_loss = 0
        # for param in model.parameters():
        #     l2_reg_loss += torch.sum(torch.pow(param, 2))
        # loss = ce_loss + l2_reg * l2_reg_loss
        
        loss = ce_loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    # Evaluation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_data, batch_labels in val_loader:
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            val_loss += loss.item()
    
    # Compute train and val loss
    train_loss = running_loss / len(train_loader)
    val_loss = val_loss / len(val_loader)
    
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

import matplotlib.pyplot as plt
epochs = range(1, num_epochs + 1)
plt.plot(epochs, train_losses, 'b', label='Training Loss')
plt.plot(epochs, val_losses, 'r', label='Validation Loss')
# plt.title('Training and Validation Loss, l2_reg: ' + str(l2_reg))
plt.title('Training and Validation Loss, lr: ' + str(learning_rate))
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# eval on test
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for batch_data, batch_labels in test_loader:
        outputs = model(batch_data)
        _, predicted = torch.max(outputs.data, 1)
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()
    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")

In [None]:
# subject-independent experiement
import numpy as np
from sklearn.model_selection import LeaveOneOut

num_subjects = 15

# get all subject ids
subject_ids = sorted(list(set(dataset.info["subject_id"])))

loo = LeaveOneOut()

train_test_splits = []

for train_index, test_index in loo.split(subject_ids):
    train_subjects = [subject_ids[i] for i in train_index]
    test_subject = subject_ids[test_index[0]]
    
    train_indices = [i for i in range(len(dataset)) if dataset.info["subject_id"][i] in train_subjects]
    test_indices = [i for i in range(len(dataset)) if dataset.info["subject_id"][i] == test_subject]
    
    train_test_splits.append((train_indices, test_indices))

for fold, (train_indices, test_indices) in enumerate(train_test_splits):
    print(f"Fold {fold+1}/{len(train_test_splits)}")
    
    train_dataset = Subset(dataset, train_indices)
    test_dataset = Subset(dataset, test_indices)
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    # model = ...