In [52]:
import torch
from torch.utils.data import DataLoader, Dataset
import numpy as np
import os
from sklearn.model_selection import train_test_split

## Dataset

In [3]:
# Prefer the GPU when one is visible, but fall back to the CPU so the notebook
# still runs end-to-end on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed PyTorch's global RNG so weight initialisation and DataLoader shuffling
# are repeatable after a kernel restart.
SEED = 42
torch.manual_seed(SEED)

# Mini-batch size used for training and evaluation.
BATCH_SIZE = 64

In [4]:
def prepare_data(data_path='EEG_Data', sample_rate=2):
    """Load a subsample of the stored spectrograms and split it 70/15/15.

    ``data_path`` is expected to contain one sub-directory per class whose
    name is the integer class label. Selected files are read with ``np.load``.

    Args:
        data_path: Root directory holding the per-class sub-directories.
        sample_rate: Number of files skipped before each file that is loaded,
            i.e. one file out of every ``sample_rate + 1`` is kept (``0``
            keeps everything). The skip counter is not reset between class
            directories.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)`` as NumPy arrays.

    Raises:
        ValueError: If no file was selected, or if a selected file lives in a
            sub-directory whose name is not an integer.
    """
    features = []
    labels = []

    skipped = 0
    # os.listdir() returns entries in arbitrary order; sorting makes the
    # subsample (and therefore the seeded splits below) reproducible.
    for class_name in sorted(os.listdir(data_path)):
        class_dir = os.path.join(data_path, class_name)
        if not os.path.isdir(class_dir):
            # Stray files next to the class folders are not data.
            continue
        for file_name in sorted(os.listdir(class_dir)):
            if skipped != sample_rate:
                skipped += 1
                continue
            features.append(np.load(os.path.join(class_dir, file_name)))
            labels.append(int(class_name))
            skipped = 0

    if not features:
        raise ValueError(
            f"No spectrograms were selected from {data_path!r} "
            f"with sample_rate={sample_rate}"
        )

    X = np.array(features)
    Y = np.array(labels)

    # 70 % train, then the remaining 30 % is halved into validation and test.
    X_train, X_temp, y_train, y_temp = train_test_split(X, Y, test_size=0.3, random_state=42)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

    return X_train, X_val, X_test, y_train, y_val, y_test

In [30]:
class EEG_Dataset(Dataset):
    """In-memory dataset pairing spectrograms with integer class labels.

    Args:
        X: Array-like of spectrograms, one sample per entry along the first axis.
        Y: Array-like of integer class labels, one per sample in ``X``.

    Raises:
        ValueError: If ``X`` and ``Y`` do not hold the same number of samples.
    """

    def __init__(self, X, Y):
        # torch.as_tensor with an explicit dtype replaces the legacy
        # FloatTensor / LongTensor constructors.
        self.X = torch.as_tensor(X, dtype=torch.float32)
        self.Y = torch.as_tensor(Y, dtype=torch.long)
        if len(self.X) != len(self.Y):
            raise ValueError(
                f"X and Y must have the same length, got {len(self.X)} and {len(self.Y)}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(spectrogram, label)`` for sample ``idx``, both on ``device``."""
        # Drop every size-1 axis, then add one leading channel axis.
        # Assumes each stored sample is 2-D once singleton axes are removed - TODO confirm.
        x = self.X[idx].squeeze().unsqueeze(0)
        # NOTE(review): items are moved to `device` one at a time here. Code
        # that indexes the dataset directly relies on this, but it prevents
        # using DataLoader worker processes together with CUDA.
        return x.to(device), self.Y[idx].to(device)

In [31]:
# Read the spectrograms from disk with the default folder and subsampling, and
# unpack the train / validation / test arrays returned by prepare_data().
(X_train, X_val, X_test,
 y_train, y_val, y_test) = prepare_data()

In [32]:
# Wrap each (features, labels) split in an EEG_Dataset, in train / val / test order.
train_dataset, val_dataset, test_dataset = (
    EEG_Dataset(split_features, split_labels)
    for split_features, split_labels in (
        (X_train, y_train),
        (X_val, y_val),
        (X_test, y_test),
    )
)

In [33]:
# Only the training split is shuffled; validation and test are iterated in a
# fixed order. The batch size comes from the BATCH_SIZE constant rather than a
# repeated literal, so it can be tuned in one place.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Model

In [57]:
import torch.nn as nn

class SpectrogramCNN(nn.Module):
    """Three-stage convolutional classifier for single-channel spectrograms.

    Each stage is Conv2d(3x3) -> BatchNorm2d -> ReLU -> MaxPool2d(2). The
    stages are followed by a 512-unit fully connected layer with batch norm,
    ReLU and dropout, and a final linear layer producing one logit per class.

    Args:
        num_classes: Number of output logits.
        feature_map_size: ``(height, width)`` of the feature map after the
            third pooling stage; it sizes the first fully connected layer.
            The default ``(6, 7)`` corresponds to inputs whose height is in
            50..57 and whose width is in 58..65 (the first convolution is
            unpadded and each pooling floors the halved size).
    """

    def __init__(self, num_classes=5, feature_map_size=(6, 7)):
        super().__init__()
        # Layers are created in the same order and under the same attribute
        # names as before, so seeded initialisation and state_dict keys match.
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool = nn.MaxPool2d(2, 2)

        map_height, map_width = feature_map_size
        self.fc1 = nn.Linear(64 * map_height * map_width, 512)
        self.bn4 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, num_classes)

        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Map a ``(batch, 1, H, W)`` tensor to ``(batch, num_classes)`` logits."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, bn in stages:
            x = self.pool(torch.relu(bn(conv(x))))

        # Flatten per sample. Unlike view(-1, n), this cannot silently fold a
        # wrongly sized feature map into extra batch rows; fc1 raises instead.
        x = torch.flatten(x, 1)
        x = self.dropout(torch.relu(self.bn4(self.fc1(x))))

        return self.fc2(x)

In [58]:
model = SpectrogramCNN().to(device)

# Hand-build a two-sample batch from the first two training items. Each item
# is fetched once, because every train_dataset[i] lookup copies the sample to
# `device` again.
first_item, second_item = train_dataset[0], train_dataset[1]
xx1 = first_item[0].unsqueeze(0)
xx2 = second_item[0].unsqueeze(0)
xx = torch.cat((xx1, xx2), dim=0)                  # inputs, batch axis first
yy = torch.stack((first_item[1], second_item[1]))  # matching labels

In [61]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
n_epochs = 1000

# Sanity check: repeatedly fit the fixed two-sample batch (xx, yy). Each
# "epoch" below is a single optimizer step on that same batch, not a pass over
# train_loader.
# NOTE(review): a loss near zero here only shows the network can memorise two
# samples; it says nothing about performance on the validation or test split.
model.train()  # training mode: dropout active, batch norm uses batch statistics
for epoch in range(n_epochs):
    optimizer.zero_grad()
    outputs = model(xx)
    loss = criterion(outputs, yy)
    loss.backward()
    optimizer.step()

    # Report every 100th step.
    if epoch % 100 == 0:
        print(f'Training Loss: {loss.item():.8f}')

Training Loss: 0.00000012
Training Loss: 0.00000000
Training Loss: 0.00000000
Training Loss: 0.00000000
Training Loss: 0.00000000
Training Loss: 0.00000000
Training Loss: 0.00000000
Training Loss: 0.00000000
Training Loss: 0.00000000
Training Loss: 0.00000000
