<a href="https://colab.research.google.com/github/AgnelFernando/BCI_UR3/blob/main/EEG_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install numpy scipy pandas scikit-learn


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/usr/local/opt/python@3.11/bin/python3.11 -m pip install --upgrade pip[0m


In [4]:
import os
import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

import matplotlib.pyplot as plt

ModuleNotFoundError: No module named 'numpy'

# Data Preparation

In [None]:
# Load one raw recording for a first visual inspection.
# NOTE(review): the variable is named `left_channels` but the path points at a "down" recording, and the
# trailing "18/06/34" looks like a timestamp whose separators became path separators — confirm the intended file.
left_channels = pd.read_csv("./data/down/down_2025-08-07_18/06/34.csv")

In [None]:
import matplotlib.pyplot as plt

# Plot the first 50 * 250 samples of the column at index 12 of the loaded recording
channel = 12
start, end = 0, 50 * 250  # 50 s worth of samples, assuming a 250 Hz sampling rate
plt.plot(left_channels.iloc[start:end, channel])
plt.title(f"Channel {channel} - Raw EEG (Time Domain)")
plt.xlabel("Sample")
plt.ylabel("Voltage (µV)")
plt.show()

In [None]:
# Per-column summary statistics of the loaded recording (quick check for flat or out-of-range columns).
print(left_channels.describe())

### Power Spectral Density (PSD) Analysis

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import welch

def plot_psd(df, fs=250, n_channels=16, nperseg=512, max_freq=50):
    """Plot Welch power spectral density estimates for the leading columns of ``df``.

    One curve is drawn per column on a shared log-scaled power axis.

    Args:
        df: DataFrame whose first ``n_channels`` columns are treated as channels.
        fs: Sampling rate in Hz, forwarded to ``scipy.signal.welch``.
        n_channels: Number of leading columns to plot. If ``df`` has fewer
            columns, all of them are plotted.
        nperseg: Segment length used by Welch's method.
        max_freq: Upper limit of the frequency axis in Hz.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    eeg_data = df.iloc[:, :n_channels]
    # The slice silently truncates when df is narrower than n_channels, so
    # report the number of columns actually plotted rather than the request.
    n_plotted = eeg_data.shape[1]

    plt.figure(figsize=(10, 6))
    for ch in range(n_plotted):
        f, Pxx = welch(eeg_data.iloc[:, ch], fs=fs, nperseg=nperseg)
        plt.semilogy(f, Pxx, label=f"Ch {ch}")

    plt.title(f"Power Spectral Density (All {n_plotted} Channels)")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Power Spectral Density")
    plt.xlim(0, max_freq)  # restrict the view to the low-frequency range of interest
    plt.grid(True)
    plt.legend(ncol=4, fontsize=8)
    plt.tight_layout()
    plt.show()


In [None]:
# Left channels

# Rebinds `left_channels` to a "left" recording and plots the PSD of its leading columns.
# NOTE(review): absolute Colab Drive path — presumably requires Google Drive mounted at /content/drive; confirm.
left_channels = pd.read_csv("/content/drive/MyDrive/OpenBCI/left/left_2025-07-15_13:20:42.csv")
plot_psd(left_channels)

In [None]:
# Right channels

# Loads a "right" recording and plots the PSD of its leading columns for comparison with the left one.
# NOTE(review): absolute Colab Drive path — presumably requires Google Drive mounted at /content/drive; confirm.
right_channels = pd.read_csv("/content/drive/MyDrive/OpenBCI/right/right_2025-07-15_13:52:29.csv")
plot_psd(right_channels)

In [None]:
# Preprocessing configuration read by preprocess_file below.
SAMPLING_RATE = 250  # passed as `fs` to the band-pass filter; presumably the recording rate in Hz — confirm
WINDOW_SIZE = SAMPLING_RATE * 1  # samples per window (one SAMPLING_RATE worth of samples)
STRIDE = SAMPLING_RATE // 2  # step between window starts: half a window, so consecutive windows overlap by 50%
BANDPASS_LOW = 1  # lower band-pass edge, in the same units as SAMPLING_RATE
BANDPASS_HIGH = 40  # upper band-pass edge, in the same units as SAMPLING_RATE

In [None]:
def bandpass_filter(data, low, high, fs):
    """Band-pass ``data`` along axis 0 with a 4th-order Butterworth design.

    The filter is applied forward and backward via ``filtfilt``.

    Args:
        data: Array-like signal; filtering runs along the first axis.
        low: Lower band edge, in the same units as ``fs``.
        high: Upper band edge, in the same units as ``fs``.
        fs: Sampling rate used to normalise the band edges.

    Returns:
        Filtered array with the same shape as ``data``.
    """
    nyquist = fs / 2
    band_edges = [low / nyquist, high / nyquist]  # butter expects edges as fractions of Nyquist
    numerator, denominator = butter(N=4, Wn=band_edges, btype='band')
    return filtfilt(numerator, denominator, data, axis=0)

def segment_windows(data, window_size, stride):
    """Cut ``data`` into fixed-length, possibly overlapping windows along axis 0.

    Args:
        data: Array-like whose first axis is the one being windowed.
        window_size: Number of rows per window (positive integer).
        stride: Step between consecutive window starts (positive integer).

    Returns:
        Array of shape ``(n_windows, window_size, *data.shape[1:])``. When
        ``data`` is shorter than one window the result has ``n_windows == 0``
        instead of failing inside ``np.stack``, so callers can still
        concatenate it with results from other recordings.

    Raises:
        ValueError: If ``window_size`` or ``stride`` is not positive.
    """
    if window_size <= 0 or stride <= 0:
        raise ValueError("window_size and stride must be positive integers")

    data = np.asarray(data)
    starts = range(0, data.shape[0] - window_size + 1, stride)
    if len(starts) == 0:
        # np.stack rejects an empty list; return a correctly shaped empty batch.
        return np.empty((0, window_size) + data.shape[1:], dtype=data.dtype)
    return np.stack([data[s:s + window_size] for s in starts])

def preprocess_file(file_path, label, n_channels=16, drop_channel=11):
    """Load one recording and convert it into filtered, standardised windows.

    Steps: read the CSV, keep the first ``n_channels`` columns, remove one
    column by position, band-pass filter the whole recording, standardise each
    remaining column over the whole recording, then cut it into windows of
    ``WINDOW_SIZE`` samples every ``STRIDE`` samples.

    Args:
        file_path: Path of the CSV recording.
        label: Class label assigned to every window of this file.
        n_channels: Number of leading CSV columns treated as channels.
        drop_channel: Positional index (within those columns) of the channel
            to discard, or ``None`` to keep them all.

    Returns:
        Tuple ``(windows, labels)`` where ``labels`` has one entry per window.
    """
    df = pd.read_csv(file_path)
    eeg_data = df.iloc[:, :n_channels].to_numpy()
    if drop_channel is not None:
        # Remove by position (not by column label) so duplicated header names
        # cannot drop extra columns, and so this matches preprocess_live_eeg,
        # which removes index 11 with np.delete.
        eeg_data = np.delete(eeg_data, drop_channel, axis=1)

    filtered = bandpass_filter(eeg_data, BANDPASS_LOW, BANDPASS_HIGH, SAMPLING_RATE)

    # Column-wise z-scoring with statistics taken over the entire recording.
    normalized = StandardScaler().fit_transform(filtered)

    windows = segment_windows(normalized, WINDOW_SIZE, STRIDE)
    labels = np.full((len(windows),), label)

    return windows, labels

# Maps each task sub-directory name under the dataset root to its integer class label
# (process_dataset looks up the directory name in this dict).
task_to_label = {'left': 0, 'right': 1, 'up': 2, 'down': 3}

def process_dataset(data_dir):
    """Preprocess every CSV recording found in the task folders of ``data_dir``.

    ``data_dir`` is expected to contain one sub-directory per task whose name
    is a key of ``task_to_label``; each ``.csv`` file inside is passed to
    ``preprocess_file`` with that task's label.

    Directories and files are visited in sorted order so the returned arrays
    (and therefore any seeded split of them) are reproducible across runs and
    file systems. Non-directory entries are ignored; directories whose name is
    not a known task are skipped with a warning instead of raising ``KeyError``.

    Args:
        data_dir: Root directory of the dataset.

    Returns:
        Tuple ``(X, y)``: all windows and all labels concatenated along axis 0.

    Raises:
        ValueError: If no CSV recording was found under ``data_dir``.
    """
    import warnings

    all_X, all_y = [], []
    for task in sorted(os.listdir(data_dir)):
        task_dir = os.path.join(data_dir, task)
        if not os.path.isdir(task_dir):
            continue  # stray file at the dataset root
        if task not in task_to_label:
            warnings.warn(f"Skipping unknown task directory: {task_dir}")
            continue

        label = task_to_label[task]
        for file in sorted(os.listdir(task_dir)):
            if not file.endswith('.csv'):
                continue
            X, y = preprocess_file(os.path.join(task_dir, file), label)
            all_X.append(X)
            all_y.append(y)

    if not all_X:
        # np.concatenate would fail with an opaque message on an empty list.
        raise ValueError(f"No CSV recordings found under {data_dir!r}")

    X = np.concatenate(all_X, axis=0)
    y = np.concatenate(all_y, axis=0)
    return X, y

In [None]:
# Build the full windowed dataset from every task folder under the dataset root.
# NOTE(review): absolute Colab Drive path — presumably requires Google Drive mounted at /content/drive; confirm.
data_dir = '/content/drive/MyDrive/OpenBCI'
X, y = process_dataset(data_dir)

# Sanity-check the array shapes before wrapping them in datasets.
print(f"Shape of X: {X.shape}")
print(f"Shape of y: {y.shape}")

In [None]:
class EEGDataset(Dataset):
    """Tensor-backed dataset that serves each sample transposed.

    Inputs are converted once to ``float32`` features and ``long`` labels.
    ``__getitem__`` returns ``X[idx].T``, i.e. the two axes of a single window
    are swapped before it is handed to the model.
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        targets = torch.tensor(y, dtype=torch.long)
        self.X = features
        self.y = targets

    def __len__(self):
        """Number of samples (length of the first axis of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(transposed window, label)`` for position ``idx``."""
        window = self.X[idx]
        target = self.y[idx]
        return window.T, target

In [None]:
class SimpleEEGNet(nn.Module):
    """Small 1-D CNN classifier: two conv/ReLU/max-pool stages and an MLP head.

    Input is ``(batch, in_channels, window_size)``; output is
    ``(batch, num_classes)`` raw logits.

    Args:
        num_classes: Number of output classes.
        in_channels: Number of input channels fed to the first convolution.
        window_size: Number of time samples per window. The flattened feature
            size is derived from it, so the network is no longer tied to
            250-sample windows. The defaults reproduce the original layer
            sizes (15 channels, ``64 * 62`` flattened features).

    Raises:
        ValueError: If ``window_size`` is too short to survive both poolings.
    """

    def __init__(self, num_classes=4, in_channels=15, window_size=250):
        super(SimpleEEGNet, self).__init__()
        # The convolutions keep the length (kernel 5, padding 2); each
        # MaxPool1d(2) halves it, rounding down.
        pooled_len = (window_size // 2) // 2
        if pooled_len < 1:
            raise ValueError("window_size must be at least 4 samples")

        self.net = nn.Sequential(
            nn.Conv1d(in_channels, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Flatten(),
            nn.Linear(64 * pooled_len, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Return class logits for a batch of windows."""
        return self.net(x)

In [None]:
# Hold out 25% of the windows for testing (fixed seed) and wrap both parts in DataLoaders.
# NOTE(review): windows are cut with a stride of half a window, and this split shuffles individual windows,
# so overlapping windows from the same recording can land on both sides of the split — the test accuracy may
# be optimistic. Consider splitting by recording instead; confirm.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

train_dataset = EEGDataset(X_train, y_train)
test_dataset = EEGDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)  # reshuffled every epoch
test_loader = DataLoader(test_dataset, batch_size=32)

In [None]:
# Use a CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Train the CNN for 10 epochs with Adam and cross-entropy loss.
model = SimpleEEGNet(num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()
    total_loss = 0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        optimizer.zero_grad()  # clear gradients left over from the previous step
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # total_loss is the sum of the per-batch losses, not an average over batches.
    print(f"Epoch {epoch+1} - Loss: {total_loss:.4f}")

In [None]:
# Evaluate the current `model` on the held-out loader and report accuracy.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():  # no gradients are needed for evaluation
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        outputs = model(batch_X)
        preds = torch.argmax(outputs, dim=1).cpu().numpy()  # predicted class = highest logit
        all_preds.extend(preds)
        all_labels.extend(batch_y.numpy())  # labels were never moved to the device

acc = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {acc:.2%}")

# LSTM

In [None]:
class EEG_LSTM(nn.Module):
    """Unidirectional stacked LSTM followed by a two-layer classification head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.

    The LSTM is built with ``batch_first=True``, so ``forward`` expects
    ``(batch, time, input_size)`` and returns ``(batch, num_classes)``.
    """

    def __init__(self, input_size=16, hidden_size=64, num_layers=2, num_classes=4):
        super().__init__()
        recurrent_cfg = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=False,
        )
        self.lstm = nn.LSTM(**recurrent_cfg)

        head_layers = [
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Classify each sequence from the top layer's output at the last time step."""
        sequence_out, _state = self.lstm(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)


In [None]:
class EEGDataset(Dataset):
    """Tensor-backed dataset that serves each sample as stored (no transpose).

    This redefines the earlier class of the same name; unlike that one,
    ``__getitem__`` returns ``X[idx]`` unchanged.
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        targets = torch.tensor(y, dtype=torch.long)
        self.X = features
        self.y = targets

    def __len__(self):
        """Number of samples (length of the first axis of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(window, label)`` for position ``idx``."""
        window = self.X[idx]
        target = self.y[idx]
        return window, target

In [None]:
# Same 75/25 seeded split as before, re-wrapped with the redefined (non-transposing) EEGDataset
# so the recurrent models receive each window in its stored layout.
# NOTE(review): windows overlap by half (stride is half a window) and are split at random, so overlapping
# windows from the same recording can appear in both train and test — confirm this is acceptable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

train_dataset = EEGDataset(X_train, y_train)
test_dataset = EEGDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)  # reshuffled every epoch
test_loader = DataLoader(test_dataset, batch_size=32)

In [None]:
# LSTM classifier with 15 input features per time step (overriding the class default of 16),
# trained with cross-entropy loss and Adam.
model = EEG_LSTM(input_size=15, hidden_size=128).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)

In [None]:
# Train the LSTM for 200 epochs, recording the summed batch loss of every epoch.
train_losses = []

for epoch in range(200):
    model.train()
    running_train_loss = 0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        optimizer.zero_grad()  # clear gradients left over from the previous step
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        running_train_loss += loss.item()

    # Sum of per-batch losses for this epoch (not averaged over batches).
    train_losses.append(running_train_loss)
    if epoch % 5 == 0:  # log every fifth epoch only
      print(f"Epoch {epoch+1:02d} | Train Loss: {running_train_loss:.4f}")

In [None]:
# Evaluate the current `model` on the held-out loader and report accuracy.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():  # no gradients are needed for evaluation
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        outputs = model(batch_X)
        preds = torch.argmax(outputs, dim=1).cpu().numpy()  # predicted class = highest logit
        all_preds.extend(preds)
        all_labels.extend(batch_y.numpy())  # labels were never moved to the device

acc = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {acc:.2%}")

# GRU

In [None]:
class EEG_GRU(nn.Module):
    """Unidirectional stacked GRU followed by a two-layer classification head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the GRU hidden state.
        num_layers: Number of stacked GRU layers.
        num_classes: Number of output logits.
        dropout: Dropout passed to ``nn.GRU``; forced to 0 when there is only
            one layer.

    The GRU is built with ``batch_first=True``, so ``forward`` expects
    ``(batch, time, input_size)`` and returns ``(batch, num_classes)``.
    """

    def __init__(self, input_size=16, hidden_size=64, num_layers=2, num_classes=4, dropout=0.3):
        super().__init__()
        if num_layers > 1:
            inter_layer_dropout = dropout
        else:
            inter_layer_dropout = 0

        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=inter_layer_dropout,
            batch_first=True,
            bidirectional=False,
        )

        head_layers = [
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Classify each sequence from the top layer's output at the last time step."""
        sequence_out, _hidden = self.gru(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)


In [None]:
# GRU classifier with 15 input features per time step (overriding the class default of 16),
# trained with cross-entropy loss and Adam. This rebinds `model`, replacing the previous network.
model = EEG_GRU(input_size=15, hidden_size=64, num_layers=2, num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train the GRU for 100 epochs on the `train_loader` created earlier in the notebook,
# recording the summed batch loss of every epoch.
train_losses = []

for epoch in range(100):
    model.train()
    running_train_loss = 0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        optimizer.zero_grad()  # clear gradients left over from the previous step
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        running_train_loss += loss.item()

    # Sum of per-batch losses for this epoch (not averaged over batches).
    train_losses.append(running_train_loss)
    if epoch % 5 == 0:  # log every fifth epoch only
      print(f"Epoch {epoch+1:02d} | Train Loss: {running_train_loss:.4f}")

In [None]:
# Evaluate the current `model` on the held-out loader and report accuracy.
model.eval()  # also disables the GRU's dropout
all_preds = []
all_labels = []

with torch.no_grad():  # no gradients are needed for evaluation
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        outputs = model(batch_X)
        preds = torch.argmax(outputs, dim=1).cpu().numpy()  # predicted class = highest logit
        all_preds.extend(preds)
        all_labels.extend(batch_y.numpy())  # labels were never moved to the device

acc = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {acc:.2%}")

In [None]:
# Persist the trained weights together with the constructor arguments needed to rebuild the model.
# The file holds a dict: the state dict is stored under 'model_state_dict', next to the hyper-parameters.
# NOTE(review): `inference` below defaults to model_path="eeg_gru_model.pth", which is a different filename
# from the one written here — confirm which name is intended.
torch.save({
    'model_state_dict': model.state_dict(),
    'input_size': 15,
    'hidden_size': 64,
    'num_layers': 2,
    'num_classes': 4
}, "eeg_gru_checkpoint.pth")

## Inference

In [None]:
import numpy as np
import torch
import torch.nn as nn
from scipy.signal import butter, filtfilt
from sklearn.preprocessing import StandardScaler

# Filter settings for the inference path; the values match the training configuration earlier in the notebook.
# Presumably re-declared so this section can run on its own — confirm.
SAMPLING_RATE = 250
BANDPASS_LOW = 1
BANDPASS_HIGH = 40

def bandpass_filter(data, low, high, fs):
    """Band-pass ``data`` along axis 0 with a 4th-order Butterworth design.

    The filter is applied forward and backward via ``filtfilt``.

    Args:
        data: Array-like signal; filtering runs along the first axis.
        low: Lower band edge, in the same units as ``fs``.
        high: Upper band edge, in the same units as ``fs``.
        fs: Sampling rate used to normalise the band edges.

    Returns:
        Filtered array with the same shape as ``data``.
    """
    nyquist = fs / 2
    band_edges = [low / nyquist, high / nyquist]  # butter expects edges as fractions of Nyquist
    numerator, denominator = butter(N=4, Wn=band_edges, btype='band')
    return filtfilt(numerator, denominator, data, axis=0)

def preprocess_live_eeg(raw_window):
    """Prepare a single raw window for the classifier.

    The column at index 11 is removed, the remaining columns are band-pass
    filtered, and each column is then standardised using statistics of this
    window alone.

    Args:
        raw_window: ``np.ndarray`` of shape ``(250, 16)``.

    Returns:
        ``np.ndarray`` of shape ``(250, 15)``.

    Raises:
        AssertionError: If ``raw_window`` does not have shape ``(250, 16)``.

    NOTE(review): the training path (``preprocess_file``) filters and
    standardises over a whole recording before windowing, whereas this
    function does both on one 250-sample window, so the inputs seen at
    inference may be scaled differently from the training data. Confirm this
    is intended.
    """
    assert raw_window.shape == (250, 16), "Expected raw EEG shape (250, 16)"

    kept_channels = np.delete(raw_window, 11, axis=1)  # (250, 16) -> (250, 15)
    band_limited = bandpass_filter(kept_channels, BANDPASS_LOW, BANDPASS_HIGH, SAMPLING_RATE)
    return StandardScaler().fit_transform(band_limited)


In [None]:
class EEG_GRU(nn.Module):
    """Stacked GRU followed by a two-layer classification head (inference copy).

    This redefines the earlier class of the same name; here ``input_size``
    defaults to 15.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the GRU hidden state.
        num_layers: Number of stacked GRU layers.
        num_classes: Number of output logits.
        dropout: Dropout passed to ``nn.GRU``; forced to 0 when there is only
            one layer.

    The GRU is built with ``batch_first=True``, so ``forward`` expects
    ``(batch, time, input_size)`` and returns ``(batch, num_classes)``.
    """

    def __init__(self, input_size=15, hidden_size=64, num_layers=2, num_classes=4, dropout=0.3):
        super().__init__()
        if num_layers > 1:
            inter_layer_dropout = dropout
        else:
            inter_layer_dropout = 0

        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=inter_layer_dropout,
            batch_first=True,
        )

        head_layers = [
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Classify each sequence from the top layer's output at the last time step."""
        sequence_out, _hidden = self.gru(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

In [None]:
def inference(raw_eeg_window: np.ndarray, model_path="eeg_gru_model.pth"):
    """Predict the class index for one raw EEG window.

    The file at ``model_path`` may hold either a bare ``state_dict`` or a
    checkpoint dict with the weights under ``'model_state_dict'`` plus
    optional ``'input_size'``, ``'hidden_size'``, ``'num_layers'`` and
    ``'num_classes'`` entries. The latter is the format written by this
    notebook's ``torch.save`` cell (to ``"eeg_gru_checkpoint.pth"``).
    Previously such a checkpoint was passed straight to ``load_state_dict``,
    which fails because the wrapper keys are not parameter names.

    Args:
        raw_eeg_window: Array of shape ``(250, 16)``, as required by
            ``preprocess_live_eeg``.
        model_path: Path of the saved weights or checkpoint.

    Returns:
        Integer index of the class with the highest logit.
    """
    processed = preprocess_live_eeg(raw_eeg_window)  # (250, 15)

    checkpoint = torch.load(model_path, map_location='cpu')
    if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
        # Rebuild the network with the hyper-parameters stored next to the
        # weights; anything missing falls back to the EEG_GRU defaults.
        hyper_keys = ('input_size', 'hidden_size', 'num_layers', 'num_classes')
        hyper_params = {key: checkpoint[key] for key in hyper_keys if key in checkpoint}
        model = EEG_GRU(**hyper_params)
        state_dict = checkpoint['model_state_dict']
    else:
        # Bare state_dict: keep the original behaviour.
        model = EEG_GRU()
        state_dict = checkpoint

    model.load_state_dict(state_dict)
    model.eval()

    with torch.no_grad():
        x = torch.tensor(processed, dtype=torch.float32).unsqueeze(0)  # (1, 250, 15)
        output = model(x)
        pred = torch.argmax(output, dim=1).item()

    return pred
