In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only Kaggle input tree and print the full path of every file in it.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        full_path = os.path.join(dirname, filename)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import mne
from mne import Epochs , pick_types

# Epoch window handed to mne.Epochs by make_epochs() as its tmin/tmax arguments
# (MNE interprets these as seconds relative to each event).
tmin, tmax = -1.0, 5

def make_epochs(edf_file, preload=True):
    """Read one EDF recording, band-pass filter it and cut it into epochs.

    Parameters
    ----------
    edf_file : str or path-like
        EDF file handed to ``mne.io.read_raw_edf``.
    preload : bool, default True
        Forwarded to ``mne.Epochs``: whether the epoch data is loaded into
        memory when the object is built. (The raw recording itself is always
        preloaded, see below.)

    Returns
    -------
    mne.Epochs
        Epochs over the module-level ``tmin``..``tmax`` window, restricted to
        EEG channels that are not marked bad, with no baseline correction.
        No explicit ``events`` are passed, so event selection is left to
        ``mne.Epochs``' own default handling of the recording's annotations.
    """
    # The raw data must be in memory for raw.filter() to modify it in place,
    # so this preload is intentionally independent of the ``preload`` argument.
    raw = mne.io.read_raw_edf(edf_file, preload=True)
    raw.filter(7.0, 40.0, fir_design="firwin", skip_by_annotation="edge")

    picks = pick_types(raw.info, meg=False, eeg=True, stim=False, eog=False, exclude="bads")

    return Epochs(
        raw,
        tmin=tmin,
        tmax=tmax,
        proj=True,
        picks=picks,
        baseline=None,
        preload=preload,
    )
    

In [None]:
import os
import mne
from pathlib import Path
from tqdm import tqdm

import warnings

# Keep MNE quiet (warnings and above only) and silence RuntimeWarnings while loading.
mne.set_log_level('WARNING')
warnings.filterwarnings("ignore", category=RuntimeWarning)


X = []
path = Path('/kaggle/input/eeg-open-source/files')
all_files = sorted(path.iterdir())
# Only the first half of the (sorted) entries is processed.
half_files = all_files[:len(all_files) // 2]

class_1 = []
class_2 = []
class_3 = []
class_4 = []

# File-name suffix (run number) -> list collecting the epochs of that task.
# The suffix groups are disjoint, so each recording lands in at most one list;
# any other file (including runs 01/02 and non-EDF files) matches nothing and is skipped.
run_groups = (
    (('03.edf', '07.edf', '11.edf'), class_1),  # (open and close left or right fist)
    (('04.edf', '08.edf', '12.edf'), class_2),  # (imagine opening and closing left or right fist)
    (('05.edf', '09.edf', '13.edf'), class_3),  # (open and close both fists or both feet)
    (('06.edf', '10.edf', '14.edf'), class_4),  # (imagine opening and closing both fists or both feet)
)

for file in tqdm(half_files, desc="Processing directories"):
    if not file.is_dir():
        continue

    for edf_file in sorted(file.iterdir()):
        for suffixes, bucket in run_groups:
            if edf_file.name.endswith(suffixes):
                data = make_epochs(edf_file=edf_file)
                bucket.append(data)
                    
                
                


In [None]:
# Sanity check: shape of the epoch array of the second recording collected in class_1
# (MNE returns epochs as (n_epochs, n_channels, n_times) — TODO confirm against the printed output).
print(class_1[1].get_data().shape)

In [None]:
import numpy as np
from tqdm import tqdm

def _stack_matching_epochs(epoch_objects, ref_shape):
    """Stack the epoch arrays of all recordings whose per-epoch shape equals ``ref_shape``.

    ``get_data()`` is called once per recording (it was previously called twice:
    once for the shape test and once for the data). The last epoch of every
    recording is dropped (``[:-1]``), exactly as before.
    NOTE(review): the reason for dropping the last epoch is not visible in this notebook.

    Raises ``ValueError`` (from ``np.vstack``) when no recording matches.
    """
    kept = []
    for epochs in epoch_objects:
        epoch_data = epochs.get_data()
        if epoch_data.shape[1:] == ref_shape:
            kept.append(epoch_data[:-1])
    return np.vstack(kept)


# Per-epoch shape of the first class_1 recording is the reference every
# other recording has to match to be kept.
ref_shape = class_1[0].get_data().shape[1:]
data_array = []

# Each class list is deleted right after it has been stacked so the Epochs
# objects can be freed before the next class is processed. This makes the cell
# single-use: re-running it requires re-running the loading cell first.
data_array.append(_stack_matching_epochs(class_1, ref_shape))
del class_1

data_array.append(_stack_matching_epochs(class_2, ref_shape))
del class_2

data_array.append(_stack_matching_epochs(class_3, ref_shape))
del class_3

data_array.append(_stack_matching_epochs(class_4, ref_shape))
del class_4

print(len(data_array))

In [None]:

count = 0
# One integer label per epoch: the label is the position of the class array in data_array.
y = [
    [class_index] * class_epochs.shape[0]
    for class_index, class_epochs in enumerate(data_array)
]
Y = np.array([label for per_class in y for label in per_class])

# Stack all classes, swap the last two axes and rescale the amplitudes by 1e7.
X = np.vstack(data_array).transpose(0, 2, 1) * 1e7



In [None]:
import numpy as np
from scipy import signal
from tqdm import tqdm

def setup_filters(sampling_rate):
    """Design the notch and band-pass filters used on every channel.

    Frequencies are normalised by the Nyquist frequency (``sampling_rate / 2``):
    a 50 Hz notch with quality factor 30, and a 4th-order Butterworth
    band-pass from 0.5 Hz to 30 Hz.

    Returns the tuple ``(b_notch, a_notch, b_bandpass, a_bandpass)``.
    """
    nyquist = 0.5 * sampling_rate
    notch_coefficients = signal.iirnotch(50.0 / nyquist, 30.0)
    band_edges = [0.5 / nyquist, 30.0 / nyquist]
    bandpass_coefficients = signal.butter(4, band_edges, 'band')
    return (*notch_coefficients, *bandpass_coefficients)

def process_eeg_data(data, b_notch, a_notch, b_bandpass, a_bandpass):
    """Apply the notch filter and then the band-pass filter to ``data``.

    Both stages use ``signal.filtfilt`` (forward-backward filtering);
    the filtered signal is returned.
    """
    filter_stages = ((b_notch, a_notch), (b_bandpass, a_bandpass))
    for numerator, denominator in filter_stages:
        data = signal.filtfilt(numerator, denominator, data)
    return data

def calculate_psd_features(segment, sampling_rate):
    """Band energies of ``segment`` from a single-window Welch PSD.

    The PSD is estimated with ``nperseg=len(segment)``. For each band the PSD
    values whose frequency lies inside the inclusive range are summed:
    alpha 8-13 Hz, beta 14-30 Hz, theta 4-7 Hz, delta 0.5-3 Hz.

    Returns a dict with keys ``E_alpha``, ``E_beta``, ``E_theta``, ``E_delta``
    (in that order) followed by ``alpha_beta_ratio``, which is 0 when the beta
    energy is not positive.
    """
    freqs, power = signal.welch(segment, fs=sampling_rate, nperseg=len(segment))

    band_limits = (
        ('alpha', 8, 13),
        ('beta', 14, 30),
        ('theta', 4, 7),
        ('delta', 0.5, 3),
    )

    features = {}
    for name, lower, upper in band_limits:
        in_band = np.flatnonzero((freqs >= lower) & (freqs <= upper))
        features[f'E_{name}'] = np.sum(power[in_band])

    beta_energy = features['E_beta']
    if beta_energy > 0:
        features['alpha_beta_ratio'] = features['E_alpha'] / beta_energy
    else:
        features['alpha_beta_ratio'] = 0
    return features

def calculate_additional_features(segment, sampling_rate):
    """Peak frequency, spectral centroid and log-log spectral slope of ``segment``.

    The PSD is a single-window Welch estimate (``nperseg=len(segment)``).

    Returns
    -------
    dict
        ``peak_frequency``    frequency of the largest PSD value.
        ``spectral_centroid`` power-weighted mean frequency; ``0.0`` when the
                              total power is not positive.
        ``spectral_slope``    slope of a degree-1 fit of log(PSD) against
                              log(frequency), skipping the first (DC) bin;
                              ``0.0`` when fewer than two bins have positive power.

    The ``0.0`` fallbacks mirror the zero guard on ``alpha_beta_ratio`` in
    ``calculate_psd_features``: a flat (zero-power) segment would otherwise
    produce ``0/0`` and ``log(0)``, putting NaN/inf into the feature table.
    For segments with positive power in every non-DC bin the result is
    unchanged.
    """
    f, psd = signal.welch(segment, fs=sampling_rate, nperseg=len(segment))

    peak_frequency = f[np.argmax(psd)]

    total_power = np.sum(psd)
    if total_power > 0:
        spectral_centroid = np.sum(f * psd) / total_power
    else:
        spectral_centroid = 0.0

    # Only bins with strictly positive power have a finite logarithm.
    freqs_no_dc, psd_no_dc = f[1:], psd[1:]
    usable = psd_no_dc > 0
    if np.count_nonzero(usable) >= 2:
        log_f = np.log(freqs_no_dc[usable])
        log_psd = np.log(psd_no_dc[usable])
        spectral_slope = np.polyfit(log_f, log_psd, 1)[0]
    else:
        spectral_slope = 0.0

    return {
        'peak_frequency': peak_frequency,
        'spectral_centroid': spectral_centroid,
        'spectral_slope': spectral_slope
    }

# Setup: filters are designed once for the recording sampling rate and reused for every channel.
sampling_rate = 160
b_notch, a_notch, b_bandpass, a_bandpass = setup_filters(sampling_rate)

# One dict of features per sample; keys are "ch<channel index>_<feature name>".
feature_rows = []

for sample in tqdm(X, desc="Extracting EEG features"):
    row = {}
    n_channels = sample.shape[1]  # channels are the second axis of each sample
    for ch in range(n_channels):
        filtered = process_eeg_data(sample[:, ch], b_notch, a_notch, b_bandpass, a_bandpass)

        # PSD band features first, then the additional spectral features,
        # so the column order within a channel is stable.
        per_channel = calculate_psd_features(filtered, sampling_rate)
        per_channel.update(calculate_additional_features(filtered, sampling_rate))

        for feature_name, value in per_channel.items():
            row[f'ch{ch}_{feature_name}'] = value

    feature_rows.append(row)

# Collect the per-sample dicts into a single feature table.
import pandas as pd
df = pd.DataFrame(feature_rows)


In [None]:
# Hyper-parameters shared by the dataset split, the model and the training loop.
config = {
    
    # TRAINING PARAMS
    "training_size"    : 0.80,  # fraction of the dataset used for the training split in modal()
    "val_size"         : 0.20,  # NOTE(review): not read by modal(); validation gets the remainder after the training split
    "learning_rate"    : 1e-4,  # lr of the Adam optimiser
    "batch_size"       : 64,    # batch size of both DataLoaders
    "training_epochs"  : 15000, # default number of epochs of train_model()
    "learning_step_size" : 25,       # step_size of the StepLR scheduler
    "learning_rate_decay": 1e-5,        # passed as StepLR gamma (multiplicative LR factor). NOTE(review): a factor of 1e-5 would all but zero the LR at the first step; the visible training loop never steps the scheduler, so it currently has no effect

    # TRANSFORMER PARAMS
    "maxlen"           : 1,  # (Seq_len of Transformer); stored on the model, not otherwise used in the visible code
    "num_features"     : 512,   # input width of the model's first Linear layer; must equal the number of feature columns (8 features per channel, i.e. 512 for 64 channels — TODO confirm channel count)
    "num_classes"      : 4,     # number of output logits
    "num_heads"        : 8,     # attention heads of the encoder layer
    "embed_dim"        : 64,   # d_model: num_features ----> embed_dim -----> transformer encoder
    "ff_dim"           : 64    # dim_feedforward of the encoder layer (hidden width of its feed-forward block)
}



In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split

class EEGDataset(Dataset):
    """Tensor-backed dataset of (features, label) pairs.

    ``data`` may be a pandas DataFrame and ``labels`` a pandas Series; both are
    unwrapped to their underlying arrays. Features are stored as ``float32``
    and labels as ``long`` tensors.
    """

    def __init__(self, data, labels):
        features = data.values if isinstance(data, pd.DataFrame) else data
        targets = labels.values if isinstance(labels, pd.Series) else labels

        self.data = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(targets, dtype=torch.long)

    def __len__(self):
        # Number of samples = length of the first axis of the feature tensor.
        return len(self.data)

    def __getitem__(self, idx):
        sample, target = self.data[idx], self.labels[idx]
        return sample, target



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.preprocessing import StandardScaler
import joblib
import torch.optim as optim
import math
import matplotlib.pyplot as plt

class EEGTransformerEncoderOnly(nn.Module):
    """Encoder-only Transformer classifier.

    Pipeline: linear projection of the last axis (``num_features`` ->
    ``embed_dim``) -> ``nn.TransformerEncoder`` (``batch_first=True``) ->
    mean over the sequence axis -> three fully connected layers producing
    ``num_classes`` logits.

    Input is ``(batch, seq_len, num_features)``; a 2-D ``(batch, num_features)``
    input is treated as a batch of length-1 sequences.
    """

    def __init__(self, maxlen, num_features, num_classes, embed_dim,
                 num_heads, ff_dim, dropout=0.1, num_encoder_layers=1):
        super().__init__()

        self.maxlen = maxlen
        self.num_features = num_features
        self.num_classes = num_classes
        self.embed_dim = embed_dim

        self.input_projection = nn.Linear(num_features, embed_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_encoder_layers
        )

        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)

        self.fc1 = nn.Linear(embed_dim, 256)
        self.fc2 = nn.Linear(256, 64)
        self.fc3 = nn.Linear(64, num_classes)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def positional_encoding(self, x):
        """Sinusoidal positional encoding with the same shape as ``x``.

        ``x`` is ``(batch, seq_len, d_model)``. Not applied in ``forward``
        (it never was); kept for callers that want to add it explicitly.
        """
        batch_size, seq_len, d_model = x.size()

        pe = torch.zeros(seq_len, d_model, device=x.device)
        position = torch.arange(0, seq_len, dtype=torch.float, device=x.device).unsqueeze(1)

        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float, device=x.device) *
                             -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        return pe.unsqueeze(0).expand(batch_size, -1, -1)

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes)``."""
        # A 2-D tensor would be interpreted by nn.TransformerEncoder as ONE
        # unbatched sequence, i.e. attention would mix the samples of a batch.
        # Give every sample its own length-1 sequence instead.
        if x.dim() == 2:
            x = x.unsqueeze(1)

        x = self.input_projection(x)          # (batch, seq, embed)
        x = self.transformer_encoder(x)       # (batch, seq, embed)

        # AdaptiveAvgPool1d pools the LAST axis, so move the sequence axis
        # there: (batch, embed, seq) -> (batch, embed, 1) -> (batch, embed).
        x = x.permute(0, 2, 1)
        x = self.global_avg_pool(x).squeeze(-1)

        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = self.fc3(x)

        return x

    @staticmethod
    def _resolve_device(model, device=None):
        """Return ``device`` if given, else the device of the model's first parameter (or None)."""
        if device is not None:
            return device
        first_param = next(model.parameters(), None)
        return None if first_param is None else first_param.device

    @staticmethod
    def _to_device(tensor, device):
        """Move ``tensor`` to ``device``; leave it untouched when ``device`` is None."""
        return tensor if device is None else tensor.to(device)

    @staticmethod
    def train_model(model, train_loader, val_loader, criterion, optimizer,
                    scheduler, device=None, epochs=None):
        """Train ``model`` and report train/validation loss and accuracy per epoch.

        Parameters
        ----------
        model, criterion, optimizer : the usual PyTorch objects.
        train_loader, val_loader : iterables of ``(inputs, targets)`` batches.
        scheduler : accepted for interface compatibility but NOT stepped.
            NOTE(review): the original loop never called ``scheduler.step()``.
            That is kept deliberately: the notebook configures StepLR with
            gamma=1e-5, which would drive the learning rate to ~0 at the first
            step. Fix the gamma before enabling stepping here.
        device : device to train on; when None the model's current device is used.
        epochs : number of epochs; when None, ``config['training_epochs']`` is
            read at call time (so edits to ``config`` take effect without
            re-running the class definition).

        Returns
        -------
        dict
            ``train_loss``, ``val_loss``, ``train_acc``, ``val_acc``: one value per epoch.
        """
        if epochs is None:
            epochs = config['training_epochs']

        if device is not None:
            model.to(device)
        device = EEGTransformerEncoderOnly._resolve_device(model, device)
        move = EEGTransformerEncoderOnly._to_device

        train_losses, val_losses = [], []
        train_accs, val_accs = [], []

        for epoch in range(epochs):
            model.train()
            total_loss, correct, total = 0, 0, 0

            for inputs, targets in train_loader:
                inputs, targets = move(inputs, device), move(targets, device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets)

                loss.backward()
                optimizer.step()

                # Weight the batch-mean loss by the batch size so the epoch
                # average is per sample even when the last batch is smaller.
                total_loss += loss.item() * inputs.size(0)
                correct += (outputs.argmax(1) == targets).sum().item()
                total += targets.size(0)

            train_loss = total_loss / total
            train_acc = correct / total
            val_loss, val_acc = EEGTransformerEncoderOnly.evaluate_model(
                model, val_loader, criterion, device=device)

            print(f"Epoch {epoch + 1}: Train Loss = {train_loss:.4f}, Train Acc = {train_acc:.4f}, "
                  f"Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")

            train_losses.append(train_loss)
            val_losses.append(val_loss)
            train_accs.append(train_acc)
            val_accs.append(val_acc)

        return {
            "train_loss": train_losses,
            "val_loss": val_losses,
            "train_acc": train_accs,
            "val_acc": val_accs,
        }

    @staticmethod
    def evaluate_model(model, dataloader, criterion=None, device=None):
        """Evaluate ``model`` on ``dataloader`` without gradient tracking.

        ``device`` defaults to the device the model's parameters live on, so
        the method no longer depends on a module-level ``device`` variable.

        Returns ``(mean_loss, accuracy)`` when ``criterion`` is given,
        otherwise just ``accuracy``.
        """
        device = EEGTransformerEncoderOnly._resolve_device(model, device)
        move = EEGTransformerEncoderOnly._to_device

        model.eval()
        total_loss, correct, total = 0, 0, 0
        with torch.no_grad():
            for inputs, targets in dataloader:
                inputs, targets = move(inputs, device), move(targets, device)
                outputs = model(inputs)
                if criterion:
                    loss = criterion(outputs, targets)
                    total_loss += loss.item() * inputs.size(0)
                correct += (outputs.argmax(1) == targets).sum().item()
                total += targets.size(0)
        if criterion:
            return total_loss / total, correct / total
        return correct / total
    
    
    
    


In [None]:
# Use the GPU when one is available, otherwise the CPU; modal() moves the model to
# this device and passes it on to training.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import torch.optim.lr_scheduler as lr_scheduler





def modal(X, Y):
    """Split (X, Y) into train/validation sets, build the model and train it.

    Parameters
    ----------
    X : features accepted by ``EEGDataset`` (e.g. the extracted feature DataFrame).
    Y : integer class labels, one per row of ``X``.

    Returns
    -------
    The trained model (wrapped in ``nn.DataParallel`` when more than one GPU
    is visible). Previously nothing was returned, so the trained weights were
    lost as soon as the function finished.

    Raises
    ------
    ValueError
        If the width of ``X`` differs from ``config["num_features"]``.
    """
    dataset = EEGDataset(X, Y)

    # Fail early with a readable message instead of a matmul shape error
    # from the model's first Linear layer.
    n_features = dataset.data.shape[-1]
    if n_features != config["num_features"]:
        raise ValueError(
            f'X has {n_features} features per sample but config["num_features"] '
            f'is {config["num_features"]}'
        )

    train_size = int(config["training_size"] * len(dataset))
    val_size = len(dataset) - train_size  # Use the rest for validation

    # NOTE(review): the split is unseeded, so it differs from run to run.
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config["batch_size"], shuffle=False)

    model = EEGTransformerEncoderOnly(
        maxlen=config["maxlen"],
        num_features=config["num_features"],
        num_classes=config["num_classes"],
        embed_dim=config["embed_dim"],
        num_heads=config["num_heads"],
        ff_dim=config["ff_dim"],
    )

    if torch.cuda.device_count() > 1:
        print(f"Using {torch.cuda.device_count()} GPUs")
        model = nn.DataParallel(model)

    # No model.eval() here: train_model() switches between train and eval
    # mode itself at the start of every epoch / evaluation.
    model.to(device)

    num_batches = len(train_loader)
    print(f"Number of batches: {num_batches}")

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])

    # StepLR multiplies the learning rate by ``gamma`` every ``step_size`` scheduler steps.
    # NOTE(review): config["learning_rate_decay"] is 1e-5, which as a
    # multiplicative factor would effectively zero the learning rate.
    scheduler = lr_scheduler.StepLR(
        optimizer,
        step_size=config["learning_step_size"],
        gamma=config["learning_rate_decay"],
    )

    EEGTransformerEncoderOnly.train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, device)

    # joblib.dump(model,"model.pkl")
    return model




In [None]:
# Train the classifier on the extracted feature table (df) and the per-epoch class labels (Y).
modal(df,Y)