In [1]:
!pip3 install mne



In [2]:
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [47]:
import mne
import numpy as np
import pywt
import numpy as np
from scipy.ndimage import zoom
import torch
from torch.utils.data import TensorDataset, DataLoader
from torch import nn, optim
import os
import torchvision.models as models

# ---------------- EEG Preprocessor ----------------
import mne
import numpy as np

class EEGPreprocessor:
    def __init__(self):
        print("[INIT] EEGPreprocessor initialized")

    @staticmethod
    def load_data(gdf_file, eeg_channels=1, trial_duration_sec=8):
        # Load raw data
        raw = mne.io.read_raw_gdf(gdf_file, preload=True)
        sfreq = raw.info['sfreq']

        # Extract events
        events, event_id = mne.events_from_annotations(raw)
        mi_event_codes = [event_id['769'], event_id['770'], event_id['771'], event_id['772']]
        artifact_code = event_id.get('1023', None)

        samples_per_trial = int(trial_duration_sec * sfreq)

        trials = []
        labels = []

        for sample_idx, _, code in events:
            if code not in mi_event_codes or code == artifact_code:
                continue

            data = raw.get_data(start=sample_idx, stop=sample_idx + samples_per_trial)[:eeg_channels, :]
            if data.shape[1] != samples_per_trial or np.isnan(data).any():
                continue

            trials.append(data[:, 500:1501])  # full trial
            labels.append(mi_event_codes.index(code))

        trials = np.array(trials)
        labels = np.array(labels)

        print(f"[LOAD] Loaded {trials.shape[0]} trials with shape {trials.shape[1:]}")
        return trials, labels


    @staticmethod
    def split_into_windows(trials_2_4s, labels, n_windows=5, window_samples=200):
        n_trials, n_channels, n_samples = trials_2_4s.shape

        if n_samples < n_windows * window_samples:
            raise ValueError(f"Not enough samples per trial ({n_samples}) for {n_windows} windows of {window_samples} samples each.")

        windowed_data = []
        windowed_labels = []

        for trial_idx in range(n_trials):
            data = []
            for i in range(n_windows):
                start = i * window_samples
                end = start + window_samples
                data.append(trials_2_4s[trial_idx, :, start:end])
            windowed_data.append(np.array(data))
            windowed_labels.append(labels[trial_idx])

        print(f"[WINDOW] Split into {n_windows} windows per trial.")
        print(f"[WINDOW] Windowed data shape: {len(windowed_data)}, Windowed labels shape: {len(windowed_labels)}")

        return windowed_data, windowed_labels




# ---------------- Feature Extractor ----------------
class FeatureExtractor:
    def __init__(self):
        print("[INIT] Loading frozen ResNet50 as feature extractor...")
        self.model = models.resnet50(pretrained=True)
        # Remove classification head
        self.model = nn.Sequential(*list(self.model.children())[:-1])
        for param in self.model.parameters():
            param.requires_grad = False
        self.model.eval()
        print("[INIT] ResNet50 feature extractor ready")

    @staticmethod
    def compute_scalogram_short(eeg_signal, sfreq=100, freqs=np.linspace(8, 30, 40), n_cycles=3, output_size=(224, 224)):
        """
        Convert a 1-channel short EEG signal (~200 samples) to a Morlet wavelet scalogram.
        """
        if eeg_signal.ndim != 2 or eeg_signal.shape[0] != 1:
            raise ValueError("eeg_signal must have shape (1, n_samples)")

        eeg_batch = eeg_signal[np.newaxis, :, :]  # shape (1, 1, n_samples)

        tfr = mne.time_frequency.tfr_array_morlet(
            eeg_batch, sfreq=sfreq, freqs=freqs, n_cycles=n_cycles, output='power'
        )  # shape (1, 1, n_freqs, n_times)

        scalogram = tfr[0, 0]  # shape (n_freqs, n_times)

        # Normalize to 0-1
        scalogram -= scalogram.min()
        scalogram /= scalogram.max() + 1e-6

        # Resize to desired output size
        from skimage.transform import resize
        scalogram_resized = resize(scalogram, output_size, mode='reflect', anti_aliasing=True)

        return scalogram_resized

    def extract_features(self, scalogram):
        """
        Instance method: extract features using the frozen ResNet50.
        """
        img = torch.tensor(scalogram, dtype=torch.float32).unsqueeze(0).repeat(3, 1, 1)  # 3 channels
        img = img.unsqueeze(0)  # add batch dimension

        with torch.no_grad():
            feat = self.model(img)

        return feat.flatten().cpu().numpy()



In [76]:
def get_data(path):
    pre = EEGPreprocessor()
    data, labels = pre.load_data(path)
    windowed_data, windowed_labels = pre.split_into_windows(data, labels, n_windows=5, window_samples=200)
    fe = FeatureExtractor()
    dataset = []
    true_labels = []

    for i, window in enumerate(windowed_data):
        channel_feats = []

        for ch in range(5):
            epoch = window[ch]
            scalogram = fe.compute_scalogram_short(epoch)
            feature = fe.extract_features(scalogram)
            channel_feats.append(feature)

        # Concatenate features from all channels → (n_channels * 2048,)
        window_features = np.concatenate(channel_feats)

        dataset.append(window_features)

        if i % 50 == 0:
            print(f"Number: {i}")

    dataset = np.array(dataset)
    true_labels = np.array(true_labels)
    dataset_reshaped = dataset.reshape(287, 5, 2048)

    print("Final dataset shape:", dataset.shape)
    print("Final labels shape:", true_labels.shape)

    return dataset_reshaped, true_labels

train_data, train_labels = get_data('/content/drive/MyDrive/BCICIV_2a_gdf/A01T.gdf')
test_data, test_labels = get_data('/content/drive/MyDrive/BCICIV_2a_gdf/A02T.gdf')

[INIT] EEGPreprocessor initialized
Extracting EDF parameters from /content/drive/MyDrive/BCICIV_2a_gdf/A01T.gdf...
GDF file detected
Setting channel info structure...
Could not determine channel type of the following channels, they will be set as EEG:
EEG-Fz, EEG, EEG, EEG, EEG, EEG, EEG, EEG-C3, EEG, EEG-Cz, EEG, EEG-C4, EEG, EEG, EEG, EEG, EEG, EEG, EEG, EEG-Pz, EEG, EEG, EOG-left, EOG-central, EOG-right
Creating raw.info structure...
Reading 0 ... 672527  =      0.000 ...  2690.108 secs...


  next(self.gen)


Used Annotations descriptions: [np.str_('1023'), np.str_('1072'), np.str_('276'), np.str_('277'), np.str_('32766'), np.str_('768'), np.str_('769'), np.str_('770'), np.str_('771'), np.str_('772')]
[LOAD] Loaded 287 trials with shape (1, 1001)
[WINDOW] Split into 5 windows per trial.
[WINDOW] Windowed data shape: 287, Windowed labels shape: 287
[INIT] Loading frozen ResNet50 as feature extractor...
[INIT] ResNet50 feature extractor ready
Number: 0
Number: 50
Number: 100
Number: 150
Number: 200
Number: 250
Final dataset shape: (287, 10240)
Final labels shape: (0,)
[INIT] EEGPreprocessor initialized
Extracting EDF parameters from /content/drive/MyDrive/BCICIV_2a_gdf/A02T.gdf...
GDF file detected
Setting channel info structure...
Could not determine channel type of the following channels, they will be set as EEG:
EEG-Fz, EEG, EEG, EEG, EEG, EEG, EEG, EEG-C3, EEG, EEG-Cz, EEG, EEG-C4, EEG, EEG, EEG, EEG, EEG, EEG, EEG, EEG-Pz, EEG, EEG, EOG-left, EOG-central, EOG-right
Creating raw.info stru

  next(self.gen)


Used Annotations descriptions: [np.str_('1023'), np.str_('1072'), np.str_('276'), np.str_('277'), np.str_('32766'), np.str_('768'), np.str_('769'), np.str_('770'), np.str_('771'), np.str_('772')]
[LOAD] Loaded 287 trials with shape (1, 1001)
[WINDOW] Split into 5 windows per trial.
[WINDOW] Windowed data shape: 287, Windowed labels shape: 287
[INIT] Loading frozen ResNet50 as feature extractor...
[INIT] ResNet50 feature extractor ready
Number: 0
Number: 50
Number: 100
Number: 150
Number: 200
Number: 250
Final dataset shape: (287, 10240)
Final labels shape: (0,)


In [82]:
all_data = np.concatenate((train_data, test_data), axis=0)
all_labels = np.concatenate((train_labels, test_labels), axis=0)

print("Dataset shape: ", all_data.shape)
print("Labels shape: ", all_labels.shape)

Dataset shape:  (574, 5, 2048)
Labels shape:  (574,)


In [83]:
class EEGModel(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EEGModel, self).__init__()
        print("[MODEL INIT] Initializing LSTM model...")

        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_size, 100)   # First FC layer with 100 neurons
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(100, 4)             # Final output layer with 4 neurons
        self.softmax = nn.Softmax(dim=1)         # Softmax across the class dimension

        print("[MODEL INIT] LSTM model ready")

    def forward(self, x):
        out, _ = self.lstm(x)
        out = out[:, -1, :]       # Last time-step output
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.softmax(out)   # Convert logits to probabilities
        return out

In [None]:
class InceptionBlock1D(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(InceptionBlock1D, self).__init__()
        assert out_channels % 4 == 0, "out_channels must be divisible by 4"
        branch_channels = out_channels // 4
        
        # 1x1 convolution branch
        self.branch1 = nn.Conv1d(in_channels, branch_channels, kernel_size=1)
        
        # 1x1 -> 3x1 convolution branch
        self.branch2 = nn.Sequential(
            nn.Conv1d(in_channels, branch_channels, kernel_size=1),
            nn.Conv1d(branch_channels, branch_channels, kernel_size=3, padding=1)
        )
        
        # 1x1 -> 5x1 convolution branch
        self.branch3 = nn.Sequential(
            nn.Conv1d(in_channels, branch_channels, kernel_size=1),
            nn.Conv1d(branch_channels, branch_channels, kernel_size=5, padding=2)
        )
        
        # MaxPool -> 1x1 convolution branch
        self.branch4 = nn.Sequential(
            nn.MaxPool1d(kernel_size=3, stride=1, padding=1),
            nn.Conv1d(in_channels, branch_channels, kernel_size=1)
        )

    def forward(self, x):
        b1 = self.branch1(x)
        b2 = self.branch2(x)
        b3 = self.branch3(x)
        b4 = self.branch4(x)
        out = torch.cat([b1, b2, b3, b4], dim=1)  # Concatenate along channel dimension
        return out


class EEGInceptionModel(nn.Module):
    def __init__(self, in_channels, num_classes=4):
        super(EEGInceptionModel, self).__init__()
        print("[MODEL INIT] Initializing Inception EEG model...")

        self.inception1 = InceptionBlock1D(in_channels, 64)
        self.inception2 = InceptionBlock1D(64, 128)
        self.pool = nn.AdaptiveAvgPool1d(1)  # Global average pooling
        self.fc = nn.Linear(128, num_classes)
        self.softmax = nn.Softmax(dim=1)

        print("[MODEL INIT] Inception EEG model ready")

    def forward(self, x):
        # Expect x of shape (batch, channels, time)
        x = self.inception1(x)
        x = self.inception2(x)
        x = self.pool(x).squeeze(-1)  # shape: (batch, channels)
        x = self.fc(x)
        x = self.softmax(x)
        return x

In [None]:
class SimpleEEGCNN(nn.Module):
    def __init__(self, num_classes=4):
        super(SimpleEEGCNN, self).__init__()

        # ---- Layer 1 ----
        self.conv1 = nn.Conv2d(
            in_channels=1,   # grayscale EEG image
            out_channels=5,  # 5 kernels
            kernel_size=5
        )
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # ---- Layer 2 ----
        self.conv2 = nn.Conv2d(
            in_channels=5,
            out_channels=10,  # doubled from 5 → 10
            kernel_size=5
        )
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # ---- Fully Connected ----
        # *You must update the in_features based on image size*
        self.fc1 = nn.Linear(10 * 53 * 53, 128)  
        self.fc2 = nn.Linear(128, num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)

        x = F.relu(self.conv2(x))
        x = self.pool2(x)

        x = x.view(x.size(0), -1)  # flatten
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = self.softmax(x)
        return x


In [84]:
class EEGTrainer:
    def __init__(self, model, lr=0.001, epochs=10):
        print(f"[TRAINER INIT] lr={lr}, epochs={epochs}")
        self.model = model
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(model.parameters(), lr=lr)
        self.epochs = epochs

    def train(self, train_loader):
        print("[TRAIN] Starting training...")
        self.model.train()
        for epoch in range(self.epochs):
            total_loss = 0
            for X, y in train_loader:
                self.optimizer.zero_grad()
                outputs = self.model(X)
                loss = self.criterion(outputs, y)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()
            print(f"[EPOCH {epoch+1}/{self.epochs}] Loss: {total_loss/len(train_loader):.4f}")

    def validate(self, val_loader):
        print("[VALIDATE] Running validation...")
        self.model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for X, y in val_loader:
                outputs = self.model(X)
                _, predicted = torch.max(outputs, 1)
                total += y.size(0)
                correct += (predicted == y).sum().item()
        print(f"[VALIDATE] Accuracy: {100 * correct / total:.2f}%")

    def save_model(self, path):
        torch.save(self.model.state_dict(), path)
        print(f"[SAVE] Model saved to {path}")


In [95]:
from sklearn.model_selection import train_test_split

# ==============================
# DATA PREPARATION
# ==============================


X = torch.tensor(all_data, dtype=torch.float32)
y = torch.tensor(all_labels, dtype=torch.long)
print(f"[DATA] Combined data shape: {X.shape}, Labels shape: {y.shape}")

# Split into train/test sets (80% train, 20% test)
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y  # stratify keeps class balance
)

# Create TensorDatasets
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# ==============================
# MODEL TRAINING
# ==============================
model = EEGModel(input_size=2048, hidden_size=64)
trainer = EEGTrainer(model, lr=0.001, epochs=100)
trainer.train(train_loader)
trainer.validate(val_loader)

[DATA] Combined data shape: torch.Size([574, 5, 2048]), Labels shape: torch.Size([574])
[MODEL INIT] Initializing LSTM model...
[MODEL INIT] LSTM model ready
[TRAINER INIT] lr=0.001, epochs=100
[TRAIN] Starting training...
[EPOCH 1/100] Loss: 1.3899
[EPOCH 2/100] Loss: 1.3869
[EPOCH 3/100] Loss: 1.3869
[EPOCH 4/100] Loss: 1.3870
[EPOCH 5/100] Loss: 1.3864
[EPOCH 6/100] Loss: 1.3857
[EPOCH 7/100] Loss: 1.3849
[EPOCH 8/100] Loss: 1.3839
[EPOCH 9/100] Loss: 1.3831
[EPOCH 10/100] Loss: 1.3831
[EPOCH 11/100] Loss: 1.3822
[EPOCH 12/100] Loss: 1.3822
[EPOCH 13/100] Loss: 1.3826
[EPOCH 14/100] Loss: 1.3824
[EPOCH 15/100] Loss: 1.3825
[EPOCH 16/100] Loss: 1.3820
[EPOCH 17/100] Loss: 1.3818
[EPOCH 18/100] Loss: 1.3813
[EPOCH 19/100] Loss: 1.3819
[EPOCH 20/100] Loss: 1.3820
[EPOCH 21/100] Loss: 1.3820
[EPOCH 22/100] Loss: 1.3818
[EPOCH 23/100] Loss: 1.3818
[EPOCH 24/100] Loss: 1.3818
[EPOCH 25/100] Loss: 1.3817
[EPOCH 26/100] Loss: 1.3818
[EPOCH 27/100] Loss: 1.3818
[EPOCH 28/100] Loss: 1.3817
[E

In [96]:
model_path="LSTM_MI_RES.pth"

print(f"[PIPELINE COMPLETE] Model saved to {model_path}")

[PIPELINE COMPLETE] Model saved to LSTM_MI_RES.pth
