In [1]:
print("hi")

hi


In [2]:
!mkdir -p /kaggle/working/project/data
!mkdir -p /kaggle/working/project/model
!mkdir -p /kaggle/working/project/results


In [3]:
%%writefile /kaggle/working/project/data/data_loader.py


import numpy as np
import torch
from torch.utils.data import Dataset
import mne
import scipy.io as sio


class BCIV2aDataset(Dataset):
    def __init__(self, gdf_path, mat_path=None):

        print(f"Loading {gdf_path}")

        raw = mne.io.read_raw_gdf(gdf_path, preload=True, verbose=False)
        # raw.pick("eeg")
        # Keep only first 22 EEG channels (exclude EOG) 
        eeg_channels = raw.ch_names[:22]
        raw.pick_channels(eeg_channels)

        raw.filter(8., 30., fir_design="firwin", verbose=False)

        events, event_dict = mne.events_from_annotations(raw)

        # -------- TRAINING SESSION --------
        if mat_path is None:

            mi_event_names = ['769', '770', '771', '772']

            valid_event_id = {
                key: event_dict[key]
                for key in mi_event_names
                if key in event_dict
            }

            epochs = mne.Epochs(
                raw,
                events,
                event_id=valid_event_id,
                tmin=0.5,
                tmax=2.5,
                baseline=None,
                preload=True,
                verbose=False
            )

            self.data = epochs.get_data()
            # self.labels = epochs.events[:, -1] - 1
            # Map event IDs to 0â€“3
            label_map = {
            valid_event_id['769']: 0,
            valid_event_id['770']: 1,
            valid_event_id['771']: 2,
            valid_event_id['772']: 3
            }

            self.labels = np.array([label_map[e] for e in epochs.events[:, -1]])


        # -------- EVALUATION SESSION --------
        else:

            # Use event 783 for evaluation session
            if '783' not in event_dict:
                raise ValueError("Event 783 not found in E file.")

            eval_event_id = {'783': event_dict['783']}

            epochs = mne.Epochs(
                raw,
                events,
                event_id=eval_event_id,
                tmin=0.5,
                tmax=2.5,
                baseline=None,
                preload=True,
                verbose=False
            )

            self.data = epochs.get_data()

            # Load true labels from .mat
            mat = sio.loadmat(mat_path)
            self.labels = mat["classlabel"].squeeze().astype(int) - 1

        print(f"Loaded {len(self.data)} trials")

    def __len__(self):
        return len(self.data)

    # def __getitem__(self, idx):
    #     x = torch.tensor(self.data[idx], dtype=torch.float32)
    #     y = torch.tensor(self.labels[idx], dtype=torch.long)
    #     return x, y
    def __getitem__(self, idx):
          x = self.data[idx]  # (C, T)

          # Trial-wise normalization
          x = (x - np.mean(x, axis=1, keepdims=True)) / (np.std(x, axis=1, keepdims=True) + 1e-6)

          x = torch.tensor(x, dtype=torch.float32)
          y = torch.tensor(self.labels[idx], dtype=torch.long)

          return x, y









Writing /kaggle/working/project/data/data_loader.py


In [4]:
!cp -r /kaggle/input/datasets/bhaskarkumar01/bciciv-2a /kaggle/working/project/data/BCICIV_2a_gdf


In [5]:
!python /kaggle/working/project/data/data_loader.py


In [6]:
%%writefile /kaggle/working/project/data/utils.py
import torch
import numpy as np
from scipy.signal import butter, lfilter

def accuracy(preds, labels):
    _, pred = torch.max(preds, 1)
    return (pred == labels).float().mean().item()

def bandpass_filter(data, low=8, high=30, fs=250, order=4):
    nyq = 0.5 * fs
    low /= nyq
    high /= nyq
    b, a = butter(order, [low, high], btype="band")
    return lfilter(b, a, data, axis=-1)

def normalize(trial):
    mean = trial.mean(axis=-1, keepdims=True)
    std = trial.std(axis=-1, keepdims=True) + 1e-6
    return (trial - mean) / std



Writing /kaggle/working/project/data/utils.py


In [7]:
%%writefile /kaggle/working/project/model/multiscale_cnn.py
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiScaleCNN(nn.Module):
    def __init__(self, in_channels):
        super().__init__()

        self.conv3 = nn.Conv2d(in_channels, 32, kernel_size=(1,3), padding=(0,1))
        self.conv5 = nn.Conv2d(in_channels, 32, kernel_size=(1,5), padding=(0,2))
        self.conv7 = nn.Conv2d(in_channels, 32, kernel_size=(1,7), padding=(0,3))

        self.bn = nn.BatchNorm2d(96)

    def forward(self, x):
        # x: (B, C, 1, T)
        f1 = F.relu(self.conv3(x))
        f2 = F.relu(self.conv5(x))
        f3 = F.relu(self.conv7(x))

        out = torch.cat([f1, f2, f3], dim=1)
        out = self.bn(out)

        return out



Writing /kaggle/working/project/model/multiscale_cnn.py


In [8]:
%%writefile /kaggle/working/project/model/transformer.py
import torch.nn as nn

class TransformerEncoder(nn.Module):
    def __init__(self, embed_dim):
        super().__init__()

        layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=4,
            batch_first=True
        )

        self.encoder = nn.TransformerEncoder(layer, num_layers=1)

    def forward(self, x):
        # x: (B, T, D)
        return self.encoder(x)



Writing /kaggle/working/project/model/transformer.py


In [9]:
%%writefile /kaggle/working/project/model/cl_mst.py
import torch
import torch.nn as nn
from model.multiscale_cnn import MultiScaleCNN
from model.transformer import TransformerEncoder

class CL_MST(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()

        self.mscnn = MultiScaleCNN(in_channels)
        self.transformer = TransformerEncoder(embed_dim=96)

        self.classifier = nn.Linear(96, num_classes)

        self.projection_head = nn.Sequential(
            nn.Linear(96, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )

    def forward(self, x):
        # x: (B, C, T)
        x = x.unsqueeze(2)               # (B, C, 1, T)
        feats = self.mscnn(x)            # (B, 96, 1, T)
        feats = feats.squeeze(2)         # (B, 96, T)
        feats = feats.permute(0, 2, 1)   # (B, T, 96)

        encoded = self.transformer(feats)
        pooled = encoded.mean(dim=1)

        logits = self.classifier(pooled)
        proj = self.projection_head(pooled)

        return logits, proj



Writing /kaggle/working/project/model/cl_mst.py


In [10]:
%%writefile /kaggle/working/project/train.py



# train.py

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, ConcatDataset
from sklearn.metrics import accuracy_score, cohen_kappa_score, confusion_matrix
import numpy as np

from data.data_loader  import BCIV2aDataset
from model.cl_mst import CL_MST  # your model

from loss import supervised_contrastive_loss



DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DATA_DIR = "/kaggle/input/datasets/bhaskarkumar01/bciciv-2a"


def load_all_subjects():
    train_datasets = []
    test_datasets = []

    for i in range(1, 10):
        subject = f"A0{i}"

        train_gdf = os.path.join(DATA_DIR, f"{subject}T.gdf")
        test_gdf = os.path.join(DATA_DIR, f"{subject}E.gdf")
        test_mat = os.path.join(DATA_DIR, f"{subject}E.mat")

        train_datasets.append(BCIV2aDataset(train_gdf))
        test_datasets.append(BCIV2aDataset(test_gdf, test_mat))

    train_dataset = ConcatDataset(train_datasets)
    test_dataset = ConcatDataset(test_datasets)

    return train_dataset, test_dataset


# def train_model(model, train_loader, criterion, optimizer):
#     model.train()
#     total_loss = 0

#     for x, y in train_loader:
#         x, y = x.to(DEVICE), y.to(DEVICE)

#         optimizer.zero_grad()
#         # outputs = model(x)
#         # loss = criterion(outputs, y)
#         logits, _ = model(x)
#         loss = criterion(logits, y)

#         loss.backward()
#         optimizer.step()

#         total_loss += loss.item()

#     return total_loss / len(train_loader)

def train_model(model, train_loader, ce_loss, optimizer, lambda_c=0.2):
    model.train()
    total_loss = 0

    for x, y in train_loader:
        x, y = x.to(DEVICE), y.to(DEVICE)

        optimizer.zero_grad()

        logits, proj = model(x)

        loss_ce = ce_loss(logits, y)
        loss_con = supervised_contrastive_loss(proj, y)

        loss = loss_ce + lambda_c * loss_con

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(train_loader)



def evaluate_model(model, test_loader):
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for x, y in test_loader:
            x = x.to(DEVICE)

            # outputs = model(x)
            # preds = torch.argmax(outputs, dim=1).cpu().numpy()
            logits, _ = model(x)
            preds = torch.argmax(logits, dim=1).cpu().numpy()



            all_preds.extend(preds)
            all_labels.extend(y.numpy())

    acc = accuracy_score(all_labels, all_preds)
    kappa = cohen_kappa_score(all_labels, all_preds)
    cm = confusion_matrix(all_labels, all_preds)

    return acc, kappa, cm


def main():
    print("Loading datasets...")

    train_dataset, test_dataset = load_all_subjects()

    print("Train samples:", len(train_dataset))
    print("Test samples:", len(test_dataset))

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # model = CL_MST().to(DEVICE)
    model = CL_MST(in_channels=22, num_classes=4).to(DEVICE)


    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    EPOCHS = 50

    for epoch in range(EPOCHS):
        # loss = train_model(model, train_loader, criterion, optimizer)
        loss = train_model(model, train_loader, criterion, optimizer, lambda_c=0.2)
        print(f"Epoch [{epoch+1}/{EPOCHS}] Loss: {loss:.4f}")

    print("\nEvaluating on E session...")

    acc, kappa, cm = evaluate_model(model, test_loader)

    print("\n===== FINAL RESULTS =====")
    print(f"Accuracy: {acc*100:.2f}%")
    print(f"Cohen Kappa: {kappa:.4f}")
    print("Confusion Matrix:")
    print(cm)


if __name__ == "__main__":
    main()



Writing /kaggle/working/project/train.py


In [11]:
%%writefile /kaggle/working/project/loss.py
# import torch.nn.functional as F

# def contrastive_loss(z1, z2):
#     z1 = F.normalize(z1, dim=1)
#     z2 = F.normalize(z2, dim=1)
#     return 1 - F.cosine_similarity(z1, z2).mean()



# loss.py

import torch
import torch.nn.functional as F

def supervised_contrastive_loss(features, labels, temperature=0.07):
    """
    features: (B, D)
    labels: (B,)
    """

    device = features.device
    features = F.normalize(features, dim=1)

    similarity_matrix = torch.matmul(features, features.T) / temperature

    labels = labels.contiguous().view(-1, 1)
    mask = torch.eq(labels, labels.T).float().to(device)

    logits_mask = torch.ones_like(mask) - torch.eye(mask.shape[0]).to(device)
    mask = mask * logits_mask

    exp_sim = torch.exp(similarity_matrix) * logits_mask

    log_prob = similarity_matrix - torch.log(exp_sim.sum(dim=1, keepdim=True) + 1e-8)

    mean_log_prob_pos = (mask * log_prob).sum(dim=1) / (mask.sum(dim=1) + 1e-8)

    loss = -mean_log_prob_pos.mean()

    return loss


Writing /kaggle/working/project/loss.py


In [12]:
%%writefile /kaggle/working/project/utils.py
import torch
import numpy as np
from scipy.signal import butter, lfilter

def accuracy(preds, labels):
    _, pred = torch.max(preds, 1)
    return (pred == labels).float().mean().item()

def bandpass_filter(data, low=8, high=30, fs=250, order=4):
    nyq = 0.5 * fs
    low /= nyq
    high /= nyq
    b, a = butter(order, [low, high], btype="band")
    return lfilter(b, a, data, axis=-1)

def normalize(trial):
    mean = trial.mean(axis=-1, keepdims=True)
    std = trial.std(axis=-1, keepdims=True) + 1e-6
    return (trial - mean) / std


Writing /kaggle/working/project/utils.py


In [13]:
import sys
sys.path.append("/kaggle/working/project")


In [14]:
!python  /kaggle/working/project/train.py

Loading datasets...
Loading /kaggle/input/datasets/bhaskarkumar01/bciciv-2a/A01T.gdf
  next(self.gen)
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
Used Annotations descriptions: [np.str_('1023'), np.str_('1072'), np.str_('276'), np.str_('277'), np.str_('32766'), np.str_('768'), np.str_('769'), np.str_('770'), np.str_('771'), np.str_('772')]
Loaded 288 trials
Loading /kaggle/input/datasets/bhaskarkumar01/bciciv-2a/A01E.gdf
  next(self.gen)
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
Used Annotations descriptions: [np.str_('1023'), np.str_('1072'), np.str_('276'), np.str_('277'), np.str_('32766'), np.str_('768'), np.str_('783')]
Loaded 288 trials
Loading /kaggle/input/datasets/bhaskarkumar01/bciciv-2a/A02T.gdf
  next(self.gen)
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
Used Annotations descriptions: [np.str_('1023'), np.str_('1072'), np.str_('276'), np.str_('277'), np