<a href="https://colab.research.google.com/github/Ajinkya-18/NeuroVision/blob/main/notebooks/neurovision_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# NeuroVision

## Dataset Preparation

In [1]:
# Make sure your Google Drive is mounted
from google.colab import drive
drive.mount('/content/drive')

# --- This is the key step ---
# Use 'cp -r' to recursively copy the entire folder.
# This will be SLOW, as it's copying thousands of individual files over the network.
# Let it run until it's finished.

print("Starting to copy dataset folder from Drive to local storage...")
print("This may take a significant amount of time, please be patient.")

# !cp -r "/content/drive/MyDrive/NeuroVision/data/Classes_Regrouped_Dataset" "/content/eeg_dataset"

print("Copying complete!")

Mounted at /content/drive
Starting to copy dataset folder from Drive to local storage...
This may take a significant amount of time, please be patient.
Copying complete!


In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import shutil

In [3]:
# function to read the dir contents of dataset folder and segregate them
# into n separate classes.
def create_dataset_folders(metadata_file:str, csv_dir:str, output_dir:str):
    class_id_to_folder = {}

    with open(metadata_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split('\t')

            if len(parts) < 3:
                continue

            label_str, _, class_id = parts
            # print(label_str, class_id)
            first_label = label_str.split(',')[0].strip()
            # print(first_label)
            class_id_to_folder[class_id] = first_label

        count = 0
        for filename in os.listdir(csv_dir):
            if not filename.endswith('.csv'):
                continue

            class_id = filename.split('_')[3]

            folder_name = class_id_to_folder.get(class_id)
            print(folder_name)

            if not folder_name:
                print(f'Unknown class id: {class_id}')
                continue

            safe_folder = folder_name.replace('/', '_').replace('\\', '_').strip()

            dest_folder = os.path.join(output_dir, safe_folder)
            os.makedirs(dest_folder, exist_ok=True)

            src_path = os.path.join(csv_dir, filename)
            dst_path = os.path.join(dest_folder, filename)

            # print(f"Move: {src_path} to {dst_path}")
            count+=1
            print(count)
            shutil.copy(src_path, dst_path)


In [4]:
# create_dataset_folders('../data/WordReport-v1.04.txt',
#                        '../data/MindBigData-Imagenet',
#                        '../data/Segregated_Dataset')

In [5]:
import shutil
import json
import os

def reorganize_dataset(mapping_file, src_root, dst_root, move=False):
    with open(mapping_file, 'r') as f:
        mapping = json.load(f)

    os.makedirs(os.path.dirname(dst_root), exist_ok=True)
    # src_root = os.path.dirname(src_root)

    for super_class, sub_classes in mapping.items():
        super_cls_dir = os.path.join(dst_root, super_class)
        os.makedirs(super_cls_dir, exist_ok=True)

        for sub_class in sub_classes:
            sub_cls_dir = os.path.join(src_root, sub_class)
            if not os.path.exists(sub_cls_dir):
                print(f"[Warning] Sub-class folder not found: {sub_cls_dir}")
                continue

            for file_name in os.listdir(sub_cls_dir):
                src_file = os.path.join(sub_cls_dir, file_name)
                dst_file = os.path.join(super_cls_dir, file_name)

                if move:
                    shutil.move(src_file, dst_file)

                else:
                    shutil.copy2(src_file, dst_file)

            print(f"[OK] {'Moved' if move else 'Copied'} {sub_class} -> {super_class}")
    print("Dataset reorganization complete!")

In [6]:
# reorganize_dataset(mapping_file='../content/drive/MyDrive/NeuroVision/class_mapping_v4.json',
#                    src_root='../content/drive/MyDrive/NeuroVision/Segregated_Dataset',
#                    dst_root='../content/drive/MyDrive/NeuroVision/data/Classes_Regrouped_Dataset',
#                    move=False)

## Dataset Processing for PyTorch

In [7]:
import torch
import os
import pandas as pd
from torch.utils.data import DataLoader, Dataset, Subset
from sklearn.model_selection import train_test_split

In [132]:
class EEGDataset(Dataset):
    def __init__(self, root_dir, samples, num_channels=5, transform=None):
        self.root_dir = root_dir
        self.samples = samples
        self.transform = transform
        self.num_channels = num_channels
        self.class_to_idx = list(set([label for _, label in self.samples]))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]

        df = pd.read_csv(file_path, header=None, index_col=0)
        eeg_data = torch.tensor(df.values, dtype=torch.float32)

        if eeg_data.shape[1] != self.num_channels:
          if eeg_data.shape[0] == self.num_channels:
            eeg_data = eeg_data.T

          else:
            raise ValueError(f"File {file_path} has invalid shape: {eeg_data.shape}")

        if self.transform:
            eeg_data = self.transform(eeg_data)

        return eeg_data, label


In [133]:
def make_datasets(root_dir, val_ratio=0.25, random_state=42):
    class_names = os.listdir(root_dir)
    class_to_idx = {cls:idx for idx, cls in enumerate(class_names)}

    all_samples = []
    all_labels = []

    for cls in class_names:
        cls_dir = os.path.join(root_dir, cls)

        for fname in os.listdir(cls_dir):
            if fname.endswith('.csv'):
                path = os.path.join(cls_dir, fname)
                all_samples.append((path, class_to_idx[cls]))
                all_labels.append(class_to_idx[cls])

    train_idx, val_idx = train_test_split(
        list(range(len(all_samples))),
        test_size=val_ratio,
        random_state=random_state,
        stratify=all_labels
    )

    train_samples = [all_samples[i] for i in train_idx]
    val_samples = [all_samples[i] for i in val_idx]

    train_dataset = EEGDataset(root_dir, train_samples)
    val_dataset = EEGDataset(root_dir, val_samples)

    return train_dataset, val_dataset

In [134]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    sequences, labels = zip(*batch)

    lengths = torch.tensor([seq.size(0) for seq in sequences], dtype=torch.long)
    padded_seqs = pad_sequence(sequences, batch_first=True)

    return padded_seqs, torch.tensor(labels), lengths

In [135]:
def create_sampler(dataset):
  from collections import Counter

  all_labels = [label for _, label in dataset.samples]

  class_counts = Counter(all_labels)

  num_classes = len(dataset.class_to_idx)
  class_weights = torch.zeros(num_classes)

  for class_idx, count in class_counts.items():
    if count > 0:
      class_weights[class_idx] = 1.0 / count

  sample_weights = [class_weights[label] for label in all_labels]

  sampler = WeightedRandomSampler(
      weights=sample_weights,
      num_samples=len(dataset.samples),
      replacement=True
  )

  return sampler

In [136]:
root_dir = '../content/drive/MyDrive/NeuroVision/data/Classes_Regrouped_Dataset'

In [137]:
train_dataset, val_dataset = make_datasets(root_dir)

In [138]:
len(train_dataset), len(val_dataset)

(10288, 3430)

In [139]:
train_sampler = create_sampler(train_dataset)

In [141]:
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=False, sampler=train_sampler,
                          collate_fn=collate_fn, num_workers=2, pin_memory=False,
                          persistent_workers=False, prefetch_factor=2)

val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False, collate_fn=collate_fn,
                        num_workers=2, pin_memory=False, persistent_workers=False, prefetch_factor=4)

## Model Architecture

In [142]:
import torch
import torch.nn as nn

In [80]:
class EegLstm(nn.Module):
    def __init__(self, input_dims=5, hidden_dims=128, num_layers=3, dropout=0.3 , num_classes=len(os.listdir(root_dir))):
        super(EegLstm, self).__init__()

        self.lstm = nn.LSTM(
            input_size=input_dims,
            hidden_size=hidden_dims,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers >= 2 else 0,
            bidirectional=True
        )

        self.conv_stack = nn.Sequential(
            nn.Conv2d(hidden_dims*2)
        )

        self.fc = nn.Sequential(
            nn.Linear(hidden_dims*2, hidden_dims),
            nn.BatchNorm1d(hidden_dims),
            nn.SELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dims, hidden_dims),
            nn.BatchNorm1d(hidden_dims),
            nn.SELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dims, hidden_dims//2),
            nn.BatchNorm1d(hidden_dims//2),
            nn.SELU(),
            nn.Linear(hidden_dims, num_classes)
        )

    def forward(self, x, lengths=None):
        if lengths is not None:
            packed = nn.utils.rnn.pack_padded_sequence(
                x, lengths.cpu(), batch_first=True, enforce_sorted=False
            )

            packed_out, (h_n, c_n) = self.lstm(packed)

        else:
            out, (h_n, c_n) = self.lstm(x)

        last_hidden_backward, last_hidden_forward = h_n[-1], h_n[-2]
        logits=self.fc(torch.cat((last_hidden_backward, last_hidden_forward), dim=1))

        return logits

In [81]:
class HybridExtractor(nn.Module):
    def __init__(self, input_dims=5, cnn_out_channels=64, kernel_size=50, lstm_hidden_dims=128, num_layers=3, dropout=0.3 , num_classes=len(os.listdir(root_dir))):
        super(HybridExtractor, self).__init__()

        # CNN block
        self.cnn_stack = nn.Sequential(
            nn.Conv1d(input_dims, 32, kernel_size=kernel_size, stride=1, padding='same'),
            nn.BatchNorm1d(32),
            nn.ELU(),
            nn.AvgPool1d(kernel_size=2, stride=2),

            nn.Conv1d(32, cnn_out_channels, kernel_size=kernel_size//2, stride=1, padding='same'),
            nn.BatchNorm1d(cnn_out_channels),
            nn.ELU(),
            nn.AvgPool1d(kernel_size=2, stride=2)
        )

        self.lstm = nn.LSTM(
            input_size=cnn_out_channels,
            hidden_size=lstm_hidden_dims,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )

        self.fc = nn.Sequential(
            nn.Linear(lstm_hidden_dims*2, lstm_hidden_dims),
            nn.BatchNorm1d(lstm_hidden_dims),
            nn.SELU(),
            nn.Dropout(dropout),
            nn.Linear(lstm_hidden_dims, lstm_hidden_dims),
            nn.BatchNorm1d(lstm_hidden_dims),
            nn.SELU(),
            nn.Dropout(dropout),
            nn.Linear(lstm_hidden_dims, num_classes)
        )

    def forward(self, x, lengths=None):
      x = x.permute(0, 2, 1)

      cnn_out = self.cnn_stack(x)

      lstm_input = cnn_out.permute(0, 2, 1)


      if lengths is not None:
        new_lengths = (lengths//4).long()

        packed = nn.utils.rnn.pack_padded_sequence(
                lstm_input, new_lengths.cpu(), batch_first=True, enforce_sorted=False
            )

        packed_out, (h_n, c_n) = self.lstm(packed)

      else:
          out, (h_n, c_n) = self.lstm(lstm_input)

      last_hidden_backward, last_hidden_forward = h_n[-1, :, :], h_n[-2, :, :]
      logits=self.fc(torch.cat((last_hidden_backward, last_hidden_forward), dim=1))

      return logits

In [143]:
def weights_init(m):
  if isinstance(m, (nn.Conv1d, nn.Linear)):
    nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='selu')

    if m.bias is not None:
      nn.init.constant_(m.bias, 0)

In [144]:
from tqdm import tqdm
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

In [145]:
class EarlyStopping(object):
    def __init__(self, model, save_path='../content/drive/MyDrive/eeg_classifier.pt', patience=4, tol=1e-3):
        self.model = model
        self.save_path = save_path
        self.patience = patience
        self.counter = 0
        self.tol = tol
        self.best_val_loss = float('inf')
        self.early_stop = False

    def __call__(self, batch_val_loss):
        if batch_val_loss < self.best_val_loss - self.tol:
            torch.save(self.model.state_dict(), self.save_path)
            self.best_val_loss = batch_val_loss
            self.counter = 0
            print(f'Validation Loss improved -> model saved to {self.save_path}')

        else:
            if self.counter < self.patience:
                self.counter += 1
                print(f'No improvement in Val Loss. Counter: {self.counter}/{self.patience}')

            else:
                self.early_stop = True
                print(f"Early Stopping triggered!")


In [146]:
def train_model(model, model_name, train_loader, val_loader, epochs=15, lr=1e-3, device='cpu'):
    log_dir = f'../content/drive/MyDrive/NeuroVision/runs/{model_name}'
    save_path = f'../content/drive/MyDrive/NeuroVision/models/{model_name}_v2_best.pth'
    os.makedirs(os.path.dirname(log_dir), exist_ok=True)
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    criterion = nn.CrossEntropyLoss(weight=torch.Tensor(train_cls_wts).to(device))
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    writer = SummaryWriter(log_dir=log_dir)
    # scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=lr,
                                              total_steps=len(train_loader)*epochs,
                                              pct_start=0.2)
    early_stopping = EarlyStopping(model, save_path=save_path, patience=6)
    model.to(device)

    for epoch in range(epochs):
        model.train()
        train_loss, train_correct, train_total = 0.0, 0, 0
        train_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs} [Train Pass]', leave=True)

        for batch_x, batch_y, lengths in train_bar:
            batch_x, batch_y, lengths = batch_x.to(device), batch_y.to(device), lengths.to(device)

            optimizer.zero_grad()
            y_preds = model(batch_x, lengths)

            loss = criterion(y_preds, batch_y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            scheduler.step()

            train_loss += loss.item() * batch_x.size(0)
            _, preds = torch.max(y_preds, 1)
            train_correct += (preds == batch_y).sum().item()
            train_total += batch_y.size(0)

            train_bar.set_postfix(loss=loss.item())

        train_acc = train_correct / train_total
        train_loss /= train_total


        model.eval()
        val_loss, val_correct, val_total = 0.0, 0, 0

        val_bar = tqdm(val_loader, desc=f"Epoch{epoch+1}/{epochs} [Val Pass]", leave=True)

        with torch.no_grad():
            for batch_x, batch_y, lengths in val_bar:
                batch_x, batch_y, lengths = batch_x.to(device), batch_y.to(device), lengths.to(device)

                y_preds = model(batch_x, lengths)
                loss = criterion(y_preds, batch_y)

                val_loss += loss.item() * batch_x.size(0)
                _, preds = torch.max(y_preds, 1)
                print(f"Sample Predictions: {preds.cpu().numpy()}")
                val_correct += (preds == batch_y).sum().item()
                val_total += batch_y.size(0)

                val_bar.set_postfix(loss=loss.item())

        val_acc = val_correct / val_total
        val_loss /= val_total

        # scheduler.step(val_loss)

        early_stopping(val_loss)
        if early_stopping.early_stop:
            break


        # logging
        writer.add_scalar('Loss/train', train_loss, epoch)
        writer.add_scalar('Loss/val', val_loss, epoch)
        writer.add_scalar('Accuracy/train', train_acc, epoch)
        writer.add_scalar('Accuracy/val', val_acc, epoch)

        print(f"Epoch {epoch+1}/{epochs}:\nTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f} %\nVal Loss: {val_loss:.3f} | Val Acc: {val_acc*100:.2f} %")

    writer.close()

In [147]:
def model_summary(model):
    print('========================================= Model Summary ==============================================\n')
    print(f"\n{'='*55}")
    print(f"{'| Parameter Name':31}|| Number of Parameters|")
    print(f"{'='*55}")

    total_params = 0

    for name, param in model.named_parameters():
        print(f'| {name:30}|{param.numel():20} |')
        print(f"{'-'*55}")
        total_params += param.numel()

    print(f"\nTotal Parameters: {total_params:,}")

## Model Training

In [98]:
# lstm_model = EegLstm(input_dims=5, hidden_dims=256, num_layers=4, dropout=0.4)
# hybrid_model = HybridExtractor(input_dims=5, cnn_out_channels=64, lstm_hidden_dims=256, num_layers=3, dropout=0.5)

In [148]:
mesonet = EEG_MesoNet(5, len(os.listdir(root_dir)), 0.5)

In [149]:
mesonet.apply(weights_init)
# hybrid_model.load_state_dict(torch.load('../content/drive/MyDrive/NeuroVision/models/EEG_LSTM_v1_best.pth'))

EEG_MesoNet(
  (branch_fine): ConvBranch(
    (conv): Conv1d(5, 16, kernel_size=(10,), stride=(1,), padding=same, bias=False)
    (bn): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (pool): AvgPool1d(kernel_size=(4,), stride=(4,), padding=(0,))
  )
  (branch_medium): ConvBranch(
    (conv): Conv1d(5, 16, kernel_size=(50,), stride=(1,), padding=same, bias=False)
    (bn): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (pool): AvgPool1d(kernel_size=(4,), stride=(4,), padding=(0,))
  )
  (branch_coarse): ConvBranch(
    (conv): Conv1d(5, 16, kernel_size=(150,), stride=(1,), padding=same, bias=False)
    (bn): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (pool): AvgPool1d(kernel_size=(4,), stride=(4,), padding=(0,))
  )
  (fc): Sequential(
    (0): Linear(in_features=48, out_features=256, bias=True)
    (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_s

In [150]:
model_summary(mesonet)



| Parameter Name               || Number of Parameters|
| branch_fine.conv.weight       |                 800 |
-------------------------------------------------------
| branch_fine.bn.weight         |                  16 |
-------------------------------------------------------
| branch_fine.bn.bias           |                  16 |
-------------------------------------------------------
| branch_medium.conv.weight     |                4000 |
-------------------------------------------------------
| branch_medium.bn.weight       |                  16 |
-------------------------------------------------------
| branch_medium.bn.bias         |                  16 |
-------------------------------------------------------
| branch_coarse.conv.weight     |               12000 |
-------------------------------------------------------
| branch_coarse.bn.weight       |                  16 |
-------------------------------------------------------
| branch_coarse.bn.bias         |             

In [152]:
!nvidia-smi

Thu Aug 28 20:53:30 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   77C    P0             34W /   70W |    7556MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [151]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
train_model(mesonet, 'mesonet', train_loader, val_loader, 30, 1e-4, device)

Epoch 1/30 [Train Pass]:   5%|▍         | 4/81 [03:40<1:10:44, 55.13s/it, loss=5.07]


KeyboardInterrupt: 

## Model Testing