In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from rich.pretty import pprint
from torcheeg.datasets import DREAMERDataset
from torcheeg.datasets.constants.emotion_recognition.dreamer import DREAMER_CHANNEL_LOCATION_DICT
from torcheeg import transforms
from torch import nn
import torch
import random

In [2]:
# Directory under which the torcheeg IO cache is created, and the raw DREAMER
# MATLAB file that the dataset is built from.
base_path = "./"
dataset_path = "./DREAMER.mat"

# Shared torcheeg ToTensor transform instance, kept available for later cells.
toTensor = transforms.ToTensor()

In [3]:
# Signal-side preprocessing passed as `offline_transform`
# (presumably applied once, when the cache under io_path is generated — confirm
# against the torcheeg documentation).
offline_pipeline = transforms.Compose([
    transforms.BaselineRemoval(),
    transforms.MeanStdNormalize(),
    transforms.To2d(),
])

# Label-side preprocessing: pick the 'valence' rating and binarise it with
# a threshold of 3.0.
label_pipeline = transforms.Compose([
    transforms.Select('valence'),
    transforms.Binary(3.0),
])

# 7808 samples correspond to 61 s at the 128 Hz rate used by the feature
# extraction later in this notebook; the same length is used for the single
# baseline chunk. No online_transform is configured.
dataset = DREAMERDataset(
    io_path=base_path + 'dreamer61sec',
    mat_path=dataset_path,
    offline_transform=offline_pipeline,
    label_transform=label_pipeline,
    chunk_size=7808,
    baseline_chunk_size=7808,
    num_baseline=1,
)

The target folder already exists, if you need to regenerate the database IO, please delete the path ./dreamer61sec.


In [4]:
def get_tf_feature(eeg, sr, n_channels = 14):
    """Build a stacked magnitude-spectrogram feature map from a multi-channel signal.

    Each of the first ``n_channels`` entries of ``eeg`` is converted into a
    one-sided magnitude spectrogram, and the per-channel spectrograms are
    stacked along axis 0 (the frequency axis of each spectrogram).

    Args:
        eeg: Indexable of per-channel signals; ``eeg[i].squeeze()`` must yield
            the 1-D signal of channel ``i``.
        sr: Sampling rate in Hz. The analysis window is ``int(0.5 * sr)``
            samples and the hop is ``int(0.025 * sr)`` samples.
        n_channels: Number of leading channels to process.

    Returns:
        ``np.ndarray`` of shape ``(n_channels * n_freqs, n_frames)``, or
        ``None`` when ``n_channels`` is not positive.
    """
    win_length = int(0.5 * sr)   # 0.5 s window, e.g. 64 samples at sr=128
    hop = int(0.025 * sr)        # 0.025 s hop, e.g. 3 samples at sr=128

    # The spectrogram settings do not depend on the channel, so build them once.
    spec_kwargs = dict(
        nperseg=win_length,
        noverlap=win_length - hop,
        return_onesided=True,
        mode='magnitude',
    )

    # Collect the per-channel results and concatenate once, instead of
    # re-allocating the growing feature array for every channel.
    per_channel = []
    for ch in range(n_channels):
        _, _, magnitude = signal.spectrogram(x=eeg[ch].squeeze(), fs=sr, **spec_kwargs)
        per_channel.append(magnitude)

    if not per_channel:
        return None
    return np.concatenate(per_channel, axis=0)

In [16]:
# The run is pinned to the CPU (the auto-detected device used to be overwritten
# immediately). Set force_cpu = False to use CUDA whenever it is available.
force_cpu = True
device = "cuda" if (not force_cpu and torch.cuda.is_available()) else "cpu"
print(f"Using {device} device")

Using cpu device


In [17]:
def convert_data_to_tensor(data):
    """Convert a 2-D NumPy array into a float32 tensor with a leading singleton axis.

    Args:
        data: NumPy array whose first two dimensions are ``(rows, cols)``.

    Returns:
        ``torch.Tensor`` of dtype ``float32`` and shape ``(1, rows, cols)``.
    """
    rows, cols = data.shape[0], data.shape[1]
    as_float = data.astype("float32")
    return torch.from_numpy(as_float.reshape(1, rows, cols))

In [18]:
class NeuralNetwork(nn.Module):
    """Convolutional classifier producing two logits per spectrogram feature map.

    Three strided ``Conv2d`` layers feed a four-layer fully connected head.
    The head's input width of 178560 equals ``128 * 15 * 93``, which is what
    the convolution stack yields for a ``(1, 462, 2582)`` input — the shape
    ``get_tf_feature`` produces for 14 channels of 7808 samples at ``sr=128``.
    Inputs of a different spatial size will not match the first ``Linear``.

    NOTE(review): there is no activation between the convolution layers, so the
    convolutional stage is a composition of linear maps — confirm this is intended.
    """

    def __init__(self):
        super().__init__()
        self.conv2D_1 = nn.Sequential(
            nn.Conv2d(1, 1024, 11, stride=3),
            nn.Conv2d(1024, 512, 7, stride=3),
            nn.Conv2d(512, 128, 7, stride=3),
        )
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(178560, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 2),
        )

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Either a single unbatched sample of shape ``(1, H, W)`` or a
                batch of shape ``(N, 1, H, W)``.

        Returns:
            Logits of shape ``(N, 2)``; ``N`` is 1 for an unbatched sample.
        """
        # Promote an unbatched sample to a batch of one so the per-sample
        # flatten below is valid for both input layouts.
        if x.dim() == 3:
            x = x.unsqueeze(0)
        x = self.conv2D_1(x)
        # Flatten everything except the batch axis (previously hard-coded to a
        # batch of exactly one via view(1, -1)).
        x = self.flatten(x)
        return self.linear_relu_stack(x)

In [19]:
random.seed(42)

# Number of samples the split is drawn from.
# NOTE(review): presumably equal to len(dataset) — confirm before changing the data.
n_samples = 1265
test_size = 250

# Reproducible random hold-out set; order is whatever random.sample returns.
test_index = random.sample(range(0, n_samples), test_size)

# Every remaining index, in ascending order, is used for training. A set makes
# the membership check constant-time instead of scanning the hold-out list.
held_out = set(test_index)
train_index = [i for i in range(n_samples) if i not in held_out]


In [20]:
def train_loop(dataset,  model, loss_fn, optimizer, log_every=1):
    """Run one training epoch over the samples listed in ``train_index``.

    Samples are processed one at a time: each is converted to a spectrogram
    feature map, passed through the model, and followed by an optimizer step.
    Relies on the module-level ``train_index``, ``device``, ``get_tf_feature``
    and ``convert_data_to_tensor``.

    Args:
        dataset: Indexable where ``dataset[i][0][0]`` is the signal handed to
            ``get_tf_feature`` and ``dataset[i][1]`` is the label.
        model: Network to train; switched to training mode.
        loss_fn: Callable ``loss_fn(pred, target)`` returning a scalar tensor.
        optimizer: Optimizer updating ``model``'s parameters.
        log_every: Print the loss every ``log_every`` samples (must be a
            positive integer; the default of 1 prints every sample).
    """
    model.train()
    sample_size = len(train_index)
    for step, idx in enumerate(train_index):
        optimizer.zero_grad()

        # Index the dataset once per sample; the previous code called
        # dataset[i] twice, invoking __getitem__ twice for the same item.
        sample = dataset[idx]
        X, y = sample[0][0], sample[1]
        X = convert_data_to_tensor(get_tf_feature(X, sr=128))

        # Collapse the label to class 0 / class 1 inside a length-1 target.
        y = torch.tensor([0 if y == 0 else 1])
        X, y = X.to(device), y.to(device)

        # Forward pass, loss, backpropagation and parameter update.
        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()

        if step % log_every == 0:
            print(f"loss: {loss.item():>7f}  [{step + 1:>5d}/{sample_size:>5d}]")

In [21]:

# Lowest average evaluation loss seen so far, and a snapshot of the model
# weights that achieved it. Both are updated by test_loop.
val_error = float("inf")
best_model_parameter = None

def test_loop(dataset, model, loss_fn):
    """Evaluate ``model`` on the samples in ``test_index`` and track the best weights.

    Prints the accuracy and average loss, appends the same summary to
    ``log.txt``, and — when the average loss improves on ``val_error`` —
    updates the module-level ``val_error`` and stores a copy of the model's
    state dict in ``best_model_parameter``. Relies on the module-level
    ``test_index``, ``device``, ``get_tf_feature`` and
    ``convert_data_to_tensor``.

    Args:
        dataset: Indexable where ``dataset[i][0][0]`` is the signal handed to
            ``get_tf_feature`` and ``dataset[i][1]`` is the label.
        model: Network to evaluate; switched to evaluation mode.
        loss_fn: Callable ``loss_fn(pred, target)`` returning a scalar tensor.
    """
    # Without this declaration the assignments below would make both names
    # local and the comparison would raise UnboundLocalError.
    global val_error, best_model_parameter

    test_loss, correct = 0, 0
    sample_size = len(test_index)

    model.eval()
    with torch.no_grad():
        for idx in test_index:
            # Index the dataset once per sample rather than twice.
            sample = dataset[idx]
            X, y = sample[0][0], sample[1]
            X = convert_data_to_tensor(get_tf_feature(X, sr=128))

            # Collapse the label to class 0 / class 1 inside a length-1 target.
            y = torch.tensor([0 if y == 0 else 1])
            X, y = X.to(device), y.to(device)

            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= sample_size
    correct /= sample_size

    if val_error > test_loss:
        val_error = test_loss
        # state_dict() hands back references to the live parameters, so clone
        # them; otherwise later training steps would overwrite the snapshot.
        best_model_parameter = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }

    summary = (f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    print(summary)
    with open("log.txt", "a") as f:
        f.write(summary)

In [22]:
# Step size handed to torch.optim.SGD when the optimizer is created.
learning_rate = 0.003

In [23]:
# Fresh model, loss and optimizer for this training run.
model = NeuralNetwork().to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

epochs = 100
for t in range(epochs):
    # Named `epoch_header` rather than `str`, so the builtin `str` is not
    # shadowed for the rest of the kernel session.
    epoch_header = (f"Epoch {t+1}\n-------------------------------\n")
    print(epoch_header)
    with open("log.txt", "a") as f:
        f.write(epoch_header)
    # One pass over the training indices, then an evaluation pass that also
    # records the best weights seen so far.
    train_loop(dataset, model, loss_fn, optimizer)
    test_loop(dataset, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------

loss: 0.631244  [    1/ 1015]
loss: 0.628938  [    2/ 1015]
loss: 0.626724  [    3/ 1015]
loss: 0.624499  [    4/ 1015]
loss: 0.622207  [    5/ 1015]


KeyboardInterrupt: 

In [None]:
# Export the weights recorded in best_model_parameter by test_loop. The assert
# stops the cell when no evaluation pass has stored a snapshot yet.
assert best_model_parameter is not None, "No best model"

checkpoint_path = "eeg_model.pth"
best_model = NeuralNetwork()
best_model = best_model.to(device)
best_model.load_state_dict(best_model_parameter)
torch.save(best_model.state_dict(), checkpoint_path)