In [1]:
import scipy.io
import numpy as np

from sklearn.model_selection import train_test_split

import torch 
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

In [2]:
# Load the .mat file
mat_file = scipy.io.loadmat('/Users/chrisdollo/Documents/coding_projects/EMG/Data/resized for 1500/processed_gestures_subject3.mat')
cell_array = mat_file['processed_data']

X = []
Y = []

for gesture_type in range(cell_array.shape[1]):
    for row_idx in range(cell_array.shape[0]):

        cell = cell_array[row_idx, gesture_type]

        # Convert to numpy array explicitly and ensure shape (24, 18)
        cell = np.array(cell)

        if(gesture_type == 0 and row_idx == 0):
            print(cell.shape)
        # if cell.shape != (24, 18):
        #     raise ValueError(f"Unexpected shape: {cell.shape} at row {row_idx}, gesture {gesture_type}")

        # Add a new axis for channel (CNN expects 4D input: samples, height, width, channels)
        X.append(cell.astype(np.float32))

        # we add 1 to the label so that they range from 1-7 insteat of 0-6
        Y.append(gesture_type + 1)

# Convert list to NumPy arrays
# X = np.array(X, dtype=np.float32)  # shape: (samples, 24, 18)
# X = X[..., np.newaxis]             # shape: (samples, 24, 18, 1)
# Y = np.array(Y, dtype=np.int32)

X = np.array(X)
Y = np.array(Y)

# # # Train/test split
# # X_train, X_test, y_train, y_test = train_test_split(
# #     X, Y, test_size=0.2, stratify=Y, random_state=42
# # )

print("X shape:", X.shape)
print("y shape:", Y.shape)

(1500, 24)
X shape: (196, 1500, 24)
y shape: (196,)


Let's build the model.

For this task we will be using a Convolutional Neural Network

In [3]:
class EEGNet(nn.Module):
    def __init__(self, num_classes=4, num_channels=22, input_samples=1125, dropout_rate=0.5, kernel_length=64, F1=8, D=2, F2=16):
        super(EEGNet, self).__init__()
        
        self.num_classes = 4
        
        self.firstConv = nn.Sequential(
            nn.Conv2d(1, F1, kernel_size=(1,kernel_length), padding=(0, kernel_length//2), bias=False), 
            nn.BatchNorm2d(F1)
        )
        
        self.depthwiseConv = nn.Sequential(
            nn.Conv2d(F1, F1*D, kernel_size=(num_channels, 1), groups=F1, bias=False),
            nn.BatchNorm2d(F1*D),
            nn.ELU(),
            nn.AvgPool2d(kernel_size=(1,4)),
            nn.Dropout(p=dropout_rate)
        )
        
        self.separableConv = nn.Sequential(
            nn.Conv2d(F1*D, F2, kernel_size=(1,16), padding=(0,8), bias=False),
            nn.BatchNorm2d(F2),
            nn.ELU(),
            nn.AvgPool2d(kernel_size=(1,8)),
            nn.Dropout(p=dropout_rate)
        )
        
        with torch.no_grad():
            dummy = torch.zeros(1, 1, num_channels, input_samples)
            x = self.firstConv(dummy)
            x = self.depthwiseConv(x)
            x = self.separableConv(x)
            flattened = x.shape[1] * x.shape[2] * x.shape[3]
        

        self.classify = nn.Sequential(
            nn.Flatten(),
            nn.Linear(736, num_classes)
        )
        
    def forward(self, x):
        x = self.firstConv(x)
        x = self.depthwiseConv(x)
        x = self.separableConv(x)
        x = self.classify(x)
        return x

Dimension 
 X_train: (230, 1, 22, 1000) 	 y_train: (230,) 
 X_test: (58, 1, 22, 1000) 	 y_test: (58,)

 The model takes in the input dimensions as (trials, 1, channels, samples). Please keep it in these dimenions to make it work

In [4]:
# DataLoader

class BCIDataset(Dataset):
    def __init__(self, X, y):
        print("crashes here")
        self.X = torch.from_numpy(X)
        print("crashes here")
        self.y = torch.from_numpy(y-1)
        
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    

# class BCIDataset(Dataset):
#     def __init__(self, X, y):
#         self.X = X  # Keep as NumPy array
#         self.y = y - 1  # Apply label adjustment, if necessary

#     def __len__(self):
#         return len(self.X)

#     def __getitem__(self, idx):
#         x_tensor = torch.tensor(self.X[idx], dtype=torch.float32)
#         y_tensor = torch.tensor(self.y[idx], dtype=torch.long)
#         return x_tensor, y_tensor

In [5]:
# Train and evaluate
def train(model, train_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    
    for X,y in train_loader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(X)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)
 
def evaluate(model, test_loader, device):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for X, y in test_loader:
            X,y = X.to(device), y.to(device)
            out = model(X)
            pred = out.argmax(dim=1)
            correct += (pred == y).sum().item()
            total += y.size(0)
            
    return correct / total

In [6]:
X = X[:, np.newaxis, :, :]      # we reshape the seize of ther input to fit the model 
Y = Y - 1                       # we substract 1 from the labels 

X = np.transpose(X, (0, 1, 3, 2))
Y= Y.squeeze()

X_train, X_val, y_train, y_val = train_test_split(
    X, Y, test_size=0.2, stratify=Y, random_state=42
)

In [7]:
train_ds = BCIDataset(X_train, y_train)

val_ds = BCIDataset(X_val, y_val)

train_loader = DataLoader(train_ds, batch_size=16, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=16, shuffle=False)

crashes here
crashes here
crashes here
crashes here


In [8]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Using device:", device)

Using device: cpu


In [9]:
model = EEGNet(num_classes=7, num_channels=24, input_samples=1500) # Modify these parameters


In [10]:
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [11]:
# mx_acc = 0
for epoch in range(50):
    loss = train(model, train_loader, criterion, optimizer, device)
    # acc = evaluate(model, val_loader, device)
    # if acc > mx_acc:
    #     mx_acc = acc
    print(f"Epoch {epoch}: Loss= {loss:.4f}, Accuracy: {acc*100:.2f}%, Max Acc: {mx_acc*100:.2f}")

: 

: 