In [1]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from scipy.signal import butter, lfilter, iirnotch, filtfilt
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score
from tqdm import tqdm
import joblib

In [2]:
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/mtcaic3/sample_submission.csv
/kaggle/input/mtcaic3/README.md
/kaggle/input/mtcaic3/validation.csv
/kaggle/input/mtcaic3/train.csv
/kaggle/input/mtcaic3/test.csv
/kaggle/input/mtcaic3/SSVEP/validation/S32/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/validation/S33/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/validation/S31/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/validation/S35/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/validation/S34/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/test/S39/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/test/S40/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/test/S37/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/test/S38/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/test/S36/1/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/train/S26/7/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/train/S26/2/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/train/S26/5/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/train/S26/8/EEGdata.csv
/kaggle/input/mtcaic3/SSVEP/train/S26/3/EEGdata.csv
/kaggle/input/mtcaic3/SS

In [3]:
def butter_bandpass(lowcut, highcut, fs, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a

def notch_filter(data, fs=250, freq=50.0, Q=30.0):
    b, a = iirnotch(freq / (0.5 * fs), Q)
    return filtfilt(b, a, data, axis=0)

def bandpass_filter(data, lowcut=7, highcut=30, fs=250):
    b, a = butter_bandpass(lowcut, highcut, fs)
    data = lfilter(b, a, data, axis=0)
    return notch_filter(data, fs=fs)
def add_noise(eeg, noise_std=0.01):
    noise = np.random.normal(0, noise_std, eeg.shape)
    return eeg + noise

def normalize_per_trial(eeg):
    mean = np.mean(eeg, axis=1, keepdims=True)
    std = np.std(eeg, axis=1, keepdims=True) + 1e-6
    return (eeg - mean) / std

def standardize_global(eeg, mean, std):
    return (eeg - mean) / (std + 1e-6)
   
def mixup_data(x, y, alpha=0.4):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1.0

    batch_size = x.size(0)
    index = torch.randperm(batch_size).to(x.device)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

In [4]:
class EEGDataset(Dataset):
    def __init__(self, task, split, base_path='./kaggle/input/mtcaic3', apply_noise=True, apply_trial_norm=True, apply_global_std=False, global_mean=None, global_std=None):
        self.task = task
        self.split = split
        self.base_path = base_path
        self.apply_noise = apply_noise
        self.apply_trial_norm = apply_trial_norm
        self.apply_global_std = apply_global_std
        self.global_mean = global_mean
        self.global_std = global_std

        self.meta_df = pd.read_csv(os.path.join(base_path, f'{split}.csv'))
        self.meta_df = self.meta_df[self.meta_df['task'] == task]

        self.label_encoder = LabelEncoder()
        if split != 'test':
            self.meta_df['label_enc'] = self.label_encoder.fit_transform(self.meta_df['label'])

    def __len__(self):
        return len(self.meta_df)

    def __getitem__(self, idx):
        row = self.meta_df.iloc[idx]
        eeg_path = os.path.join(
            self.base_path,
            row['task'],
            self.split,
            row['subject_id'],
            str(row['trial_session']),
            'EEGdata.csv'
        )

        df = pd.read_csv(eeg_path)

        samples_per_trial = 2250 if row['task'] == 'MI' else 1750
        trial_num = int(row['trial'])
        start_idx = (trial_num - 1) * samples_per_trial
        end_idx = start_idx + samples_per_trial

        df = df.iloc[start_idx:end_idx]
        eeg = df[['FZ', 'C3', 'CZ', 'C4', 'PZ', 'PO7', 'OZ', 'PO8']].values
        #eeg = df[['C3', 'CZ', 'C4']].values

        eeg = bandpass_filter(eeg)

        # Apply per-trial z-score normalization
        if self.apply_trial_norm:
            eeg = normalize_per_trial(eeg)

        # Apply global standardization
        if self.apply_global_std and self.global_mean is not None and self.global_std is not None:
            eeg = standardize_global(eeg, self.global_mean, self.global_std)

        # Apply Gaussian noise
        if self.apply_noise and self.split == 'train':
            eeg = add_noise(eeg)

        eeg = eeg.T  # Shape: (channels, time)
        eeg = torch.tensor(eeg.copy(), dtype=torch.float32)  # .copy() to avoid negative strides

        if self.split != 'test':
            label = torch.tensor(row['label_enc'], dtype=torch.long)
            return eeg, label
        else:
            return eeg

In [5]:
import torch
import torch.nn as nn
import math

# --- Positional Encoding ---
class PositionalEncoding(nn.Module):
    def __init__(self, dim, max_len=500):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, dim, 2).float() * (-math.log(10000.0) / dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # Shape: [1, max_len, dim]
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

# --- Squeeze-and-Excitation Block ---
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=8):
        super(SEBlock, self).__init__()
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channels, channels // reduction, bias=False),
            nn.ReLU(),
            nn.Linear(channels // reduction, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        w = self.pool(x).view(b, c)
        w = self.fc(w).view(b, c, 1, 1)
        return x * w

# --- EEGNet + Transformer ---
class EEGNetV4Transformer(nn.Module):
    def __init__(self, num_classes, channels=8, samples=2250, transformer_dim=32, num_heads=4, num_layers=1):
        super(EEGNetV4Transformer, self).__init__()

        self.firstconv = nn.Sequential(
            nn.Conv2d(1, 16, (1, 32), padding=(0, 16), bias=False),
            nn.BatchNorm2d(16)
        )

        self.depthwiseConv = nn.Sequential(
            nn.Conv2d(16, 32, (channels, 1), groups=16, bias=False),
            nn.BatchNorm2d(32),
            nn.ELU(),
            nn.AvgPool2d((1, 4)),
            nn.Dropout(0.5)
        )

        self.separableConv = nn.Sequential(
            nn.Conv2d(32, 32, (1, 16), padding=(0, 8), bias=False),
            nn.BatchNorm2d(32),
            nn.ELU(),
            nn.AvgPool2d((1, 8)),
            nn.Dropout(0.5)
        )

        self.attention = SEBlock(32)

        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=transformer_dim, nhead=num_heads, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.pos_encoder = PositionalEncoding(dim=transformer_dim)

        self.classify = nn.Sequential(
    nn.Linear(transformer_dim, 64),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(64, num_classes)
)

    def forward(self, x):
        x = x.unsqueeze(1)  # [B, 1, C, T]
        x = self.firstconv(x)
        x = self.depthwiseConv(x)
        x = self.separableConv(x)
        x = self.attention(x)

        # x shape: [B, 32, 1, T'] → squeeze → [B, 32, T']
        x = x.squeeze(2)  # [B, 32, T']
        x = x.permute(0, 2, 1)  # [B, T', 32]

        x = self.pos_encoder(x)
        x = self.transformer(x)  # [B, T', 32]

        # Global average pooling across T'
        x = x.mean(dim=1)  # [B, 32]

        return self.classify(x)


In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Swish activation
class Swish(nn.Module):
    def forward(self, x):
        return x * torch.sigmoid(x)

# CBAM Block (Channel & Spatial Attention)
class CBAM(nn.Module):
    def __init__(self, channel, reduction=16):
        super(CBAM, self).__init__()
        self.channel_att = SEBlock(channel, reduction)
        self.spatial_att = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size=7, padding=3, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        x_out = self.channel_att(x)
        avg_out = torch.mean(x_out, dim=1, keepdim=True)
        max_out, _ = torch.max(x_out, dim=1, keepdim=True)
        concat = torch.cat([avg_out, max_out], dim=1)
        scale = self.spatial_att(concat)
        return x_out * scale

# Same SEBlock, configurable reduction
class SEBlock(nn.Module):
    def __init__(self, channel, reduction=4):
        super(SEBlock, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

# Tiny Transformer Encoder for temporal modeling
class TemporalTransformer(nn.Module):
    def __init__(self, dim, heads=2, layers=1):
        super(TemporalTransformer, self).__init__()
        encoder_layer = nn.TransformerEncoderLayer(d_model=dim, nhead=heads, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=layers)

    def forward(self, x):
        # x: [B, C, 1, T] -> flatten spatial dims -> [B, T, C]
        b, c, _, t = x.size()
        x = x.squeeze(2).permute(0, 2, 1)
        x = self.transformer(x)
        # [B, T, C] -> [B, C, 1, T]
        x = x.permute(0, 2, 1).unsqueeze(2)
        return x

class EEGNetPro(nn.Module):
    def __init__(self, num_classes, channels=8, samples=2250):
        super(EEGNetPro, self).__init__()
        self.swish = Swish()

        self.firstconv = nn.Sequential(
            nn.Conv2d(1, 16, (1, 32), padding=(0, 32), bias=False),
            nn.BatchNorm2d(16)
        )

        self.depthwiseConv = nn.Sequential(
            nn.Conv2d(16, 32, (channels, 1), groups=16, bias=False),
            nn.BatchNorm2d(32),
            self.swish,
            nn.BatchNorm2d(32),
            nn.AvgPool2d((1, 4)),
            nn.Dropout(0.5)
        )
        self.se_block1 = SEBlock(32, reduction=4)

        self.separableConv = nn.Sequential(
            nn.Conv2d(32, 32, (1, 16), padding=(0, 8), bias=False),
            nn.BatchNorm2d(32),
            self.swish,
            nn.BatchNorm2d(32),
            nn.AvgPool2d((1, 8)),
            nn.Dropout(0.5)
        )

        self.cbam = CBAM(32)

        self.temporal_transformer = TemporalTransformer(dim=32, heads=2, layers=1)

        self.flatten_dim = self._get_flattened_size(channels, samples)
        self.classify = nn.Linear(self.flatten_dim, num_classes)

    def _get_flattened_size(self, channels, samples):
        with torch.no_grad():
            x = torch.zeros(1, 1, channels, samples)
            x = self.firstconv(x)
            x = self.depthwiseConv(x)
            x = self.se_block1(x)
            x = self.separableConv(x)
            x = self.cbam(x)
            x = self.temporal_transformer(x)
            return x.reshape(1, -1).shape[1]

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.firstconv(x)
        x = self.depthwiseConv(x)
        x = self.se_block1(x)
        x = self.separableConv(x)
        x = self.cbam(x)
        x = self.temporal_transformer(x)
        x = x.reshape(x.size(0), -1)
        return self.classify(x)

In [19]:
model_mi = EEGNetV4Transformer(num_classes = 2, samples=2250).to(device)
model_mi.attention = SEBlock(channel=32, reduction=8)  
model_mi.load_state_dict(torch.load('/kaggle/working/best_eegnet_model.pt', map_location='cpu'))  # or 'cuda' if available
model_mi.eval()

model_ssvep = EEGNetPro(num_classes = 4, samples=1750).to(device)
model_ssvep.load_state_dict(torch.load('/kaggle/input/best_ssvep/pytorch/default/1/best_ssvep_model.pt', map_location='cpu'))  # or 'cuda' if available
model_ssvep.eval()

EEGNetPro(
  (swish): Swish()
  (firstconv): Sequential(
    (0): Conv2d(1, 16, kernel_size=(1, 32), stride=(1, 1), padding=(0, 32), bias=False)
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (depthwiseConv): Sequential(
    (0): Conv2d(16, 32, kernel_size=(8, 1), stride=(1, 1), groups=16, bias=False)
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Swish()
    (3): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (4): AvgPool2d(kernel_size=(1, 4), stride=(1, 4), padding=0)
    (5): Dropout(p=0.5, inplace=False)
  )
  (se_block1): SEBlock(
    (avg_pool): AdaptiveAvgPool2d(output_size=1)
    (fc): Sequential(
      (0): Linear(in_features=32, out_features=8, bias=False)
      (1): ReLU(inplace=True)
      (2): Linear(in_features=8, out_features=32, bias=False)
      (3): Sigmoid()
    )
  )
  (separableConv): Sequential(
    (0): Conv2d(32, 32, kernel_size

In [20]:
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
import pandas as pd

def test_and_generate_csv(model, dataset_class, task, base_path, output_csv='submission.csv', batch_size=32):
    # Load test set
    test_dataset = dataset_class(task=task, split='test', base_path=base_path, apply_noise=False)
    
    # Load train set just to fit label encoder
    train_dataset = dataset_class(task=task, split='train', base_path=base_path, apply_noise=False)
    test_dataset.label_encoder = train_dataset.label_encoder  # assign fitted encoder

    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model.eval()
    model.to('cuda' if torch.cuda.is_available() else 'cpu')
    
    all_preds = []
    all_ids = []

    with torch.no_grad():
        for X in tqdm(test_loader, desc=f'Testing [{task}]'):
            X = X.to('cuda' if torch.cuda.is_available() else 'cpu')
            outputs = model(X)
            preds = outputs.argmax(dim=1).cpu().numpy()
            all_preds.extend(preds)

    # Get the test metadata CSV
    test_meta = pd.read_csv(f'{base_path}/test.csv')
    test_meta = test_meta[test_meta['task'] == task].reset_index(drop=True)

    # Decode numeric predictions to labels
    if hasattr(test_dataset, 'label_encoder') and hasattr(test_dataset.label_encoder, 'inverse_transform'):
        all_preds = test_dataset.label_encoder.inverse_transform(all_preds)

    output_df = pd.DataFrame({
        'id': test_meta['id'],
        'label': all_preds
    })
    output_df.to_csv(output_csv, index=False)
    print(f" Saved predictions to {output_csv}")


In [21]:
test_and_generate_csv(model_mi, EEGDataset, task='MI', base_path='/kaggle/input/mtcaic3', output_csv='submission_mi.csv')

Testing [MI]: 100%|██████████| 2/2 [00:03<00:00,  1.54s/it]

 Saved predictions to submission_mi.csv





In [22]:
test_and_generate_csv(model_ssvep, EEGDataset, task='SSVEP', base_path='/kaggle/input/mtcaic3', output_csv='submission_ssvep.csv')

Testing [SSVEP]: 100%|██████████| 2/2 [00:02<00:00,  1.12s/it]

 Saved predictions to submission_ssvep.csv



