In [1]:
import pandas as pd

base_path = '/kaggle/input/mtcaic3'
train_df = pd.read_csv(f'{base_path}/train.csv')
val_df = pd.read_csv(f'{base_path}/validation.csv')

ssvep_df = pd.concat([
    train_df[train_df['task'] == 'SSVEP'],
    val_df[val_df['task'] == 'SSVEP']
], ignore_index=True)


In [2]:
label_map = {'Left': 0, 'Right': 1, 'Forward': 2, 'Backward': 3}
ssvep_df['label'] = ssvep_df['label'].map(label_map)

In [3]:
import numpy as np
import os

def load_ssvep_trial(row, dataset='train', base_path='./mtc-aic3_dataset'):
    subject_id = row['subject_id']
    trial_session = row['trial_session']
    trial_num = int(row['trial'])
    eeg_path = f"{base_path}/SSVEP/{dataset}/{subject_id}/{trial_session}/EEGdata.csv"
    
    eeg_data = pd.read_csv(eeg_path)
    samples_per_trial = 1750
    start_idx = (trial_num - 1) * samples_per_trial
    end_idx = start_idx + samples_per_trial
    trial_segment = eeg_data.iloc[start_idx:end_idx]
    
    selected_channels = ['OZ', 'PZ', 'CZ', 'PO8']
    return trial_segment[selected_channels].values.T.astype(np.float32)


In [4]:
def load_trial(row, dataset_type):
    eeg_path = f"{base_path}/SSVEP/{dataset_type}/{row['subject_id']}/{row['trial_session']}/EEGdata.csv"
    eeg = pd.read_csv(eeg_path)
    trial_num = int(row['trial'])
    start = (trial_num - 1) * 1750
    end = trial_num * 1750
    selected_channels = ['OZ', 'PZ', 'CZ', 'PO8']
    return eeg[selected_channels].iloc[start:end].values.T.astype(np.float32)

In [5]:
def extract_features(signal):
    features = []
    for ch in signal:
        # Time-domain
        features.extend([
            np.mean(ch), np.std(ch), np.min(ch), np.max(ch),
            np.sum(ch**2),  # Energy
            np.sqrt(np.mean(ch**2)),  # RMS
        ])
        # Frequency-domain
        fft = np.fft.rfft(ch)
        power = np.abs(fft)**2
        features.append(np.sum(power[7:10]))   # 7–10 Hz (Forward)
        features.append(np.sum(power[10:13]))  # 10–13 Hz (Left/Right)
    return np.array(features)

In [6]:
X = []
y = []

for _, row in ssvep_df.iterrows():
    dataset_type = 'train' if row['id'] <= 4800 else 'validation'
    signal = load_trial(row, dataset_type)
    feats = extract_features(signal)
    X.append(feats)
    y.append(row['label'])

X = np.array(X)
y = np.array(y)
print(f"✅ Extracted shape: {X.shape}, Labels: {y.shape}")


✅ Extracted shape: (2450, 32), Labels: (2450,)


In [None]:
# def extract_features(trial):  # shape: (4, 1750)
#     features = []
#     for ch in trial:
#         features.extend([
#             np.mean(ch),
#             np.std(ch),
#             np.max(ch),
#             np.min(ch),
#             np.sum(ch ** 2),  # energy
#         ])
#     return np.array(features)

# X_feat = np.array([extract_features(trial) for trial in X_all])  # shape: (N, 4×5 = 20)


In [9]:
!pip install lazypredict

[0m[31mERROR: Could not find a version that satisfies the requirement lazypredict (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for lazypredict[0m[31m
[0m

# Band power

In [6]:
bands = {
    "theta": (6, 8),     # Could capture 7 Hz (Forward)
    "alpha": (8, 10),    # Could capture 8 Hz (Backward)
    "mu":    (10, 12),   # Could capture 10 Hz (Left)
    "beta":  (12, 14),   # Could capture 13 Hz (Right)
}


In [7]:
from scipy.signal import welch

def bandpower(data, fs, band, window_sec=None):
    band = np.asarray(band)
    low, high = band
    if window_sec is None:
        nperseg = (2 / (high - low)) * fs  # Adaptive window
    else:
        nperseg = int(window_sec * fs)

    freqs, psd = welch(data, fs=fs, nperseg=nperseg)
    freq_res = freqs[1] - freqs[0]
    
    # Band power
    idx_band = np.logical_and(freqs >= low, freqs <= high)
    return np.sum(psd[idx_band]) * freq_res


In [8]:
def extract_bandpower_features(signal, fs=250):
    # signal: (4 channels, 1750 samples)
    bands = {
        "theta": (6, 8),
        "alpha": (8, 10),
        "mu":    (10, 12),
        "beta":  (12, 14),
    }
    
    features = []
    for ch in signal:
        for band_range in bands.values():
            bp = bandpower(ch, fs, band_range)
            features.append(bp)
    return np.array(features)


In [None]:
X = []
y = []

for _, row in ssvep_df.iterrows():
    dataset_type = 'train' if row['id'] <= 4800 else 'validation'
    signal = load_trial(row, dataset_type)  # shape: (4, 1750)
    features = extract_bandpower_features(signal, fs=250)
    X.append(features)
    y.append(row['label'])

X = np.array(X)
y = np.array(y)
print("✅ Band Power feature matrix:", X.shape)


# Wavelet

In [None]:
!pip install pywt


In [10]:
import pywt
import numpy as np

def extract_wavelet_energy(signal, wavelet='db4', level=4):
    # signal shape: (4, 1750) → channels × samples
    features = []

    for ch in signal:
        coeffs = pywt.wavedec(ch, wavelet=wavelet, level=level)
        energies = [np.sum(np.square(c)) for c in coeffs]
        features.extend(energies)

    return np.array(features)


In [None]:
X_wavelet = []
y_wavelet = []

for _, row in ssvep_df.iterrows():
    dataset_type = 'train' if row['id'] <= 4800 else 'validation'
    signal = load_trial(row, dataset_type)  # shape: (4, 1750)
    features = extract_wavelet_energy(signal)
    X_wavelet.append(features)
    y_wavelet.append(row['label'])

X_wavelet = np.array(X_wavelet)
y_wavelet = np.array(y_wavelet)
print(f"✅ Wavelet Energy features shape: {X_wavelet.shape}")


In [None]:
from lazypredict.Supervised import LazyClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Normalize
X_scaled = StandardScaler().fit_transform(X_wavelet)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_wavelet, test_size=0.2, random_state=42)

# Run LazyPredict
clf = LazyClassifier(verbose=0, ignore_warnings=True)
models, predictions = clf.fit(X_train, X_test, y_train, y_test)

print(models)


# combine band power and wavelet

In [41]:
import numpy as np

def compute_hjorth_parameters(signal):
    """
    signal: np.array of shape (channels, time)
    Returns: np.array of shape (channels × 3)
    """
    features = []

    for ch in signal:
        first_deriv = np.diff(ch)
        second_deriv = np.diff(first_deriv)

        activity = np.var(ch)
        mobility = np.sqrt(np.var(first_deriv) / activity) if activity != 0 else 0
        complexity = (
            np.sqrt(np.var(second_deriv) / np.var(first_deriv)) / mobility
            if mobility != 0 and np.var(first_deriv) != 0
            else 0
        )

        features.extend([activity, mobility, complexity])
    
    return np.array(features)


In [42]:
def extract_combined_features(signal, fs=250, wavelet='db4', level=4):
    bandpower_features = extract_bandpower_features(signal, fs)
    wavelet_features = extract_wavelet_energy(signal, wavelet, level)
    return np.concatenate([bandpower_features, wavelet_features])


In [None]:
X_combined = []
y_combined = []

for _, row in ssvep_df.iterrows():
    dataset_type = 'train' if row['id'] <= 4800 else 'validation'
    signal = load_trial(row, dataset_type)  # (4, 1750)
    features = extract_combined_features(signal)
    X_combined.append(features)
    y_combined.append(row['label'])

X_combined = np.array(X_combined)
y_combined = np.array(y_combined)

print("✅ Combined feature shape:", X_combined.shape)


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from lazypredict.Supervised import LazyClassifier

# Normalize
X_scaled = StandardScaler().fit_transform(X_combined)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_combined, test_size=0.2, random_state=42)

# LazyPredict
clf = LazyClassifier(verbose=0, ignore_warnings=True)
models, predictions = clf.fit(X_train, X_test, y_train, y_test)

print(models)


# Filter by gyro

In [12]:
def load_trial_with_gyro_filter(row, dataset_type, movement_threshold=0.1):
    eeg_path = f"{base_path}/SSVEP/{dataset_type}/{row['subject_id']}/{row['trial_session']}/EEGdata.csv"
    eeg = pd.read_csv(eeg_path)

    trial_num = int(row['trial'])
    start = (trial_num - 1) * 1750
    end = trial_num * 1750

    # Extract EEG + gyro
    eeg_segment = eeg[selected_channels].iloc[start:end].values.T.astype(np.float32)  # (4, 1750)
    gyro_signal = eeg[['Gyro1', 'Gyro2', 'Gyro3']].iloc[start:end].values  # (1750, 3)

    # Compute movement magnitude per sample
    gyro_movement = np.linalg.norm(gyro_signal, axis=1)
    gyro_std = np.std(gyro_movement)

    # Skip this trial if movement is too high
    if gyro_std > movement_threshold:
        return None, True  # second return value flags skipped trial
    return eeg_segment, False


In [13]:
def extract_gyro_features(gyro_signal):
    # gyro_signal: (3, 1750)
    features = []
    for g in gyro_signal:
        features.extend([
            np.mean(g), np.std(g), np.min(g), np.max(g),
            np.sum(g**2),  # Energy
            np.sqrt(np.mean(g**2))  # RMS
        ])
    return np.array(features)


In [14]:

from scipy.signal import butter, filtfilt

def bandpass_filter(data, lowcut=5, highcut=45, fs=250, order=4):
    """
    Apply a Butterworth bandpass filter to EEG data.
    
    Parameters:
    - data: np.ndarray, shape (n_channels, n_samples)
    - lowcut: float, low frequency cutoff
    - highcut: float, high frequency cutoff
    - fs: float, sampling rate
    - order: int, filter order
    
    Returns:
    - filtered_data: np.ndarray, shape (n_channels, n_samples)
    """
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist

    b, a = butter(order, [low, high], btype='band')
    filtered_data = filtfilt(b, a, data, axis=-1)  # filter each channel
    return filtered_data


In [15]:
def augment_gaussian_noise(eeg, noise_std=0.01):
    noise = np.random.normal(0, noise_std, eeg.shape)
    return eeg + noise


In [16]:
def augment_time_shift(eeg, shift_samples=10):
    return np.roll(eeg, shift=shift_samples, axis=1)


In [17]:
def augment_amplitude_scale(eeg, scale_range=(0.9, 1.1)):
    scale = np.random.uniform(*scale_range, (eeg.shape[0], 1))
    return eeg * scale


In [18]:
def augment_eeg(eeg, methods=['noise', 'shift', 'scale']):
    if 'noise' in methods:
        eeg = augment_gaussian_noise(eeg)
    if 'shift' in methods:
        eeg = augment_time_shift(eeg, shift_samples=np.random.randint(-10, 10))
    if 'scale' in methods:
        eeg = augment_amplitude_scale(eeg)
    return eeg


In [19]:
ssvep_df

Unnamed: 0,id,subject_id,task,trial_session,trial,label
0,2401,S1,SSVEP,1,1,2
1,2402,S1,SSVEP,1,2,0
2,2403,S1,SSVEP,1,3,3
3,2404,S1,SSVEP,1,4,0
4,2405,S1,SSVEP,1,5,0
...,...,...,...,...,...,...
2445,4896,S35,SSVEP,1,6,1
2446,4897,S35,SSVEP,1,7,0
2447,4898,S35,SSVEP,1,8,2
2448,4899,S35,SSVEP,1,9,3


In [20]:
 def sliding_window_augment(X, y, window_size=100, stride=100):
    aug_X = []
    aug_y = []
    for trial, label in zip(X, y):
        # print(trial.shape[0])
        for start in range(0, trial.shape[0] - window_size + 1, stride):
            
            window = trial[start:start + window_size]
            aug_X.append(window)
            aug_y.append(label)
    return np.array(aug_X), np.array(aug_y)

In [None]:
print(" Applying sliding window augmentation...")
aug_X_train, aug_y_train = sliding_window_augment(X_train, y_train, window_size=32, stride=4)
# aug_X_val_MI, aug_y_val_MI = sliding_window_augment(pre_X_val_MI, y_val_MI, window_size=200, stride=200)
print(f" Augmented MI training set shape: {aug_X_train.shape}, Labels: {aug_y_train.shape}")
# print(f" Augmented MI validate set shape: {aug_X_val_MI.shape}, Labels: {aug_y_val_MI.shape}")

In [23]:
def apply_car(eeg_signal):
    """
    Apply Common Average Referencing (CAR) to EEG data.
    
    eeg_signal: np.ndarray of shape (channels, samples)
    """
    average = np.mean(eeg_signal, axis=0)           # Shape: (samples,)
    eeg_car = eeg_signal - average                  # Subtract from each channel
    return eeg_car


In [21]:
selected_channels = ['OZ', 'PZ', 'CZ', 'PO8']

In [43]:
def apply_car(eeg_signal):
    """
    Apply Common Average Referencing (CAR) to EEG data.
    eeg_signal: shape (channels, time)
    """
    mean_across_channels = np.mean(eeg_signal, axis=0)  # shape: (time,)
    return eeg_signal - mean_across_channels

# Main data processing loop
X_combined = []
y_combined = []
aug_factor = 2

for _, row in ssvep_df.iterrows():
    dataset_type = 'train' if row['id'] <= 4800 else 'validation'

    # Load EEG file
    eeg_path = f"{base_path}/SSVEP/{dataset_type}/{row['subject_id']}/{row['trial_session']}/EEGdata.csv"
    eeg = pd.read_csv(eeg_path)

    trial_num = int(row['trial'])
    start = (trial_num - 1) * 1750
    end = trial_num * 1750

    # Select EEG and Gyro
    eeg_segment = eeg[selected_channels].iloc[start:end].values.T.astype(np.float32)
    gyro_segment = eeg[['Gyro1', 'Gyro2', 'Gyro3']].iloc[start:end].values.T.astype(np.float32)

    # Filter noisy trial using gyro
    gyro_movement = np.linalg.norm(gyro_segment.T, axis=1)
    if np.std(gyro_movement) > 3:
        continue  # Skip noisy trial

    # Apply CAR to EEG
    eeg_car = apply_car(eeg_segment)

    # Apply bandpass filter
    filtered = bandpass_filter(eeg_car, lowcut=5, highcut=45, fs=250)

    # Extract features
    eeg_features = extract_combined_features(filtered)  # Band Power + Wavelet
    gyro_features = extract_gyro_features(gyro_segment)

    full_features = np.concatenate([eeg_features, gyro_features])
    X_combined.append(full_features)
    y_combined.append(row['label'])

X_combined = np.array(X_combined)
y_combined = np.array(y_combined)

print("✅ Final feature shape (EEG + Gyro):", X_combined.shape)


✅ Final feature shape (EEG + Gyro): (2117, 54)


In [30]:
from sklearn.feature_selection import SelectKBest, f_classif
selector = SelectKBest(score_func=f_classif, k=40)
X_selected = selector.fit_transform(X_combined, y_combined)


In [26]:
!pip install lazypredict

Collecting lazypredict
  Downloading lazypredict-0.2.16-py2.py3-none-any.whl.metadata (13 kB)
Collecting pytest-runner (from lazypredict)
  Downloading pytest_runner-6.0.1-py3-none-any.whl.metadata (7.3 kB)
Collecting mlflow>=2.0.0 (from lazypredict)
  Downloading mlflow-3.1.1-py3-none-any.whl.metadata (29 kB)
Collecting mlflow-skinny==3.1.1 (from mlflow>=2.0.0->lazypredict)
  Downloading mlflow_skinny-3.1.1-py3-none-any.whl.metadata (30 kB)
Collecting graphene<4 (from mlflow>=2.0.0->lazypredict)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow>=2.0.0->lazypredict)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==3.1.1->mlflow>=2.0.0->lazypredict)
  Downloading databricks_sdk-0.57.0-py3-none-any.whl.metadata (39 kB)
Collecting fastapi<1 (from mlflow-skinny==3.1.1->mlflow>=2.0.0->lazypredict)
  Downloading fastapi-0.115.14-py3-none-any.whl.metadata (27 kB)
Col

In [44]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from lazypredict.Supervised import LazyClassifier

X_scaled = StandardScaler().fit_transform(X_combined)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_combined, test_size=0.2, random_state=42)

In [29]:
len(X_train)

1693

In [None]:
!pip install -U scikit-learn imbalanced-learn --quiet


In [None]:
!pip uninstall scikit-learn imbalanced-learn -y

In [None]:
!pip install scikit-learn==1.3.0 imbalanced-learn==0.10.1

In [45]:


clf = LazyClassifier(verbose=0, ignore_warnings=True)
models, predictions = clf.fit(X_train, X_test, y_train, y_test)
print(models)


  0%|          | 0/29 [00:00<?, ?it/s]

[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000559 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 13185
[LightGBM] [Info] Number of data points in the train set: 1693, number of used features: 54
[LightGBM] [Info] Start training from score -1.408391
[LightGBM] [Info] Start training from score -1.445296
[LightGBM] [Info] Start training from score -1.403572
[LightGBM] [Info] Start training from score -1.294373
                               Accuracy  Balanced Accuracy ROC AUC  F1 Score  \
Model                                                                          
XGBClassifier                      0.49               0.49    None      0.48   
RandomForestClassifier             0.48               0.48    None      0.47   
LGBMClassifier                     0.47               0.47    None      0.47   
AdaBoostClassifier                 0.46               0.46    None      0.46   
BaggingClassifi

In [None]:
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, classification_report

# Initialize the classifier
lgbm = LGBMClassifier(random_state=42)

# Train the model
lgbm.fit(X_aug, y_aug)

# Predict on test data
y_pred = lgbm.predict(X_test)

# Evaluate
acc = accuracy_score(y_test, y_pred)
print(f"✅ Accuracy on test set: {acc:.4f}")
print("\n📊 Classification Report:")
print(classification_report(y_test, y_pred))


# SSVEPformer

In [46]:
# Okba Bekhelifi dec 2024, <okba.bekhelifi@univ-usto.dz>
# Implements SSVEPFormer model from:
# Chen, J. et al. (2023) ‘A transformer-based deep neural network model for SSVEP classification’, 
# Neural Networks, 164, pp. 521–534. Available at: https://doi.org/10.1016/j.neunet.2023.04.045.
#
#

from torch import flatten
from torch import nn
import torch.nn.functional as F

class ChComb(nn.Module):
  def __init__(self, Chans=8, Samples=220, dropout=0.5):
    super().__init__()
    self.conv = nn.Conv1d(Chans // 2, Chans, 1, padding='same')
    self.ln   = nn.LayerNorm(Samples)
    self.act  = nn.GELU()
    self.do   = nn.Dropout(p=dropout)

  def forward(self, x):
    return self.do(self.act(self.ln(self.conv(x))))

class Encoder(nn.Module):
  def __init__(self, Chans=16, Samples=220, dropout=0.5):
    super().__init__()
    # CNN module
    self.channels = Chans
    self.ln1  = nn.LayerNorm(Samples)
    self.conv = nn.Conv1d(Chans, Chans, 31, padding='same')
    self.ln2  = nn.LayerNorm(Samples)
    self.act  = nn.GELU()
    self.do   = nn.Dropout(p=dropout)
    # MLP module
    self.ln3  = nn.LayerNorm(Samples)
    self.proj = nn.Linear(Chans, Samples)
    self.do2  = nn.Dropout(p=dropout)

  def forward(self, x):
    #
    shortcut1 = x
    x = self.conv(self.ln1(x))
    x = self.act(self.ln2(x))
    x = self.do(x) + shortcut1
    shortcut2 = x
    #
    x = self.ln3(x)
    output_channels = []
    for i in range(self.channels):
      c = self.proj(x[:,:,i])
      c = c.unsqueeze(1)
      output_channels.append(c)
    x = torch.cat(output_channels, 1)
    x = self.do(x) + shortcut2
    return x

class MlpHead(nn.Module):
  def __init__(self, Chans, Samples, n_classes, drop_rate=0.5):
    super().__init__()
    self.drop       = nn.Dropout(drop_rate)
    self.linear1    = nn.Linear(Chans * Samples, 6 * n_classes)
    self.norm       = nn.LayerNorm(6*n_classes)
    self.activation = nn.GELU()
    self.drop2      = nn.Dropout(drop_rate)
    self.linear2    = nn.Linear(6*n_classes, n_classes)

  def forward(self, x):
    x = flatten(x, 1)
    x = self.drop(x)
    x = self.linear1(x)
    x = self.norm(x)
    x = self.activation(x)
    x = self.drop2(x)
    x = self.linear2(x)
    return x

class SSVEPFormerTH(nn.Module):
  def __init__(self, Chans=8, n_classes=12, fs=256,
               band=[8, 64], resolution=0.25, 
               drop_rate=0.25):
    super().__init__()
    self.name = "SSVEPFORMER"
    self.fs = fs
    self.resolution = resolution
    self.nfft  = round(fs / resolution)
    self.fft_start = int(round(band[0] / self.resolution))
    self.fft_end   = int(round(band[1] / self.resolution)) + 1
    samples = (self.fft_end - self.fft_start) * 2
    filters = 2*Chans

    self.channel_comb = ChComb(filters,  samples, drop_rate)
    self.encoder1     = Encoder(filters, samples, drop_rate)
    self.encoder2     = Encoder(filters, samples, drop_rate)
    self.head         = MlpHead(filters, samples, n_classes, drop_rate)

    self.init_weights()

  def init_weights(self):
    for module in self.modules():
        if hasattr(module, 'weight'):
          cls_name = module.__class__.__name__
          if not("BatchNorm" in cls_name or "LayerNorm" in cls_name):
            nn.init.normal_(module.weight, mean=0.0, std=0.01)
          else:
            nn.init.constant_(module.weight, 1)
          if hasattr(module, "bias"):
            if module.bias is not None:
              nn.init.constant_(module.bias, 0)

  def forward(self, x):
    x = self.transform(x)
    x = self.channel_comb(x)
    x = self.encoder1(x)
    x = self.encoder2(x)
    x = self.head(x)
    return x

  def transform(self, x):
    with torch.no_grad():
      samples = x.shape[-1]
      x = torch.fft.fft(x, n=self.nfft) / samples
      real = x.real[:,:, self.fft_start:self.fft_end]
      imag = x.imag[:,:, self.fft_start:self.fft_end]
      x = torch.cat((real, imag), axis=-1)
    return x


class FBSSVEPFormer(nn.Module):
  def __init__(self, fs=256, n_subbands=3, models=None):
    super().__init__()
    self.name = "FB-SSVEPFORMER"
    self.fs = fs
    self.subbands = [[8*i, 80] for i in range(1, n_subbands+1)]
    self.subnets  = models
    self.conv     = nn.Conv1d(n_subbands, 1, 1, padding='same')
    self.init_weights()

  def init_weights(self):
    nn.init.normal_(self.conv.weight, mean=0.0, std=0.01)
    nn.init.constant_(self.conv.bias, 0)

  def forward(self, x):
    out = []
    for i, band in enumerate(self.subbands):
      c = self.filter_band(x, band)
      c = self.subnets[i](c)
      c = c.unsqueeze(1)
      out.append(c)
    #
    x = torch.cat(out, 1)
    x = self.conv(x)
    return x.squeeze(1)

  def filter_band(self, x, band):
    # x: batch, channels, samples
    device = x.device
    with torch.no_grad():
      x = x.cpu().numpy()
      B, A = butter(4, np.array(band) / (self.fs / 2), btype='bandpass')
      x = filtfilt(B, A, x, axis=-1)
      x = x.copy()
    return torch.tensor(x, dtype=torch.float, device=device)


In [47]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# 1. Reshape data to (samples, 1, timepoints)
X_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)
y_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# 2. Initialize model
model = SSVEPFormerTH(
    Chans=X_tensor.shape[1],  # Should be 1 after unsqueeze
    n_classes=len(torch.unique(y_tensor)),
    fs=250,
    band=[8, 64]
).to('cuda')

# 3. Train as before...
train_loader = DataLoader(TensorDataset(X_tensor, y_tensor), batch_size=128, shuffle=True)

In [48]:
val_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=128)

In [47]:
!pip uninstall optim -y

Found existing installation: optim 0.1.0
Uninstalling optim-0.1.0:
  Successfully uninstalled optim-0.1.0


In [49]:
import torch
import torch.nn as nn
import torch.optim as optim  # ✅ This is what you're missing

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
best_val_loss = float('inf')

for epoch in range(1, 201):
    model.train()
    train_loss = 0
    train_correct = 0

    for x_batch, y_batch in train_loader:
        x_batch, y_batch = x_batch.to('cuda'), y_batch.to('cuda')
        optimizer.zero_grad()
        output = model(x_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * x_batch.size(0)
        train_correct += (output.argmax(dim=1) == y_batch).sum().item()

    train_loss /= len(train_loader.dataset)
    train_acc = train_correct / len(train_loader.dataset)

    # Validation
    model.eval()
    val_loss = 0
    val_correct = 0

    with torch.no_grad():
        for x_val, y_val in val_loader:
            x_val, y_val = x_val.to('cuda'), y_val.to('cuda')
            output = model(x_val)
            loss = criterion(output, y_val)
            val_loss += loss.item() * x_val.size(0)
            val_correct += (output.argmax(dim=1) == y_val).sum().item()

    val_loss /= len(val_loader.dataset)
    val_acc = val_correct / len(val_loader.dataset)

    print(f"Epoch {epoch:03d} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

Epoch 001 | Train Loss: 1.3850 | Train Acc: 0.2658 | Val Loss: 1.3788 | Val Acc: 0.3090
Epoch 002 | Train Loss: 1.3741 | Train Acc: 0.3148 | Val Loss: 1.3676 | Val Acc: 0.3349
Epoch 003 | Train Loss: 1.3586 | Train Acc: 0.3757 | Val Loss: 1.3536 | Val Acc: 0.3726
Epoch 004 | Train Loss: 1.3450 | Train Acc: 0.3692 | Val Loss: 1.3395 | Val Acc: 0.3679
Epoch 005 | Train Loss: 1.3261 | Train Acc: 0.3940 | Val Loss: 1.3331 | Val Acc: 0.3703
Epoch 006 | Train Loss: 1.3089 | Train Acc: 0.4040 | Val Loss: 1.3199 | Val Acc: 0.3750
Epoch 007 | Train Loss: 1.3037 | Train Acc: 0.4087 | Val Loss: 1.3284 | Val Acc: 0.3915
Epoch 008 | Train Loss: 1.2902 | Train Acc: 0.4040 | Val Loss: 1.3087 | Val Acc: 0.4080
Epoch 009 | Train Loss: 1.2759 | Train Acc: 0.4300 | Val Loss: 1.3130 | Val Acc: 0.3774
Epoch 010 | Train Loss: 1.2728 | Train Acc: 0.4229 | Val Loss: 1.3042 | Val Acc: 0.3939
Epoch 011 | Train Loss: 1.2657 | Train Acc: 0.4294 | Val Loss: 1.3041 | Val Acc: 0.4175
Epoch 012 | Train Loss: 1.2612 |

In [None]:
!pip install torch

In [None]:
from lazypredict.Supervised import LazyClassifier
from sklearn.model_selection import train_test_split

# Split your features
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_combined, test_size=0.2, random_state=42)

# Run LazyClassifier
clf = LazyClassifier(verbose=0, ignore_warnings=True, custom_metric=None)
models, predictions = clf.fit(X_train, X_test, y_train, y_test)

# Show best model
print(models.head())


In [None]:
from sklearn.ensemble import RandomForestClassifier

best_model = RandomForestClassifier()
best_model.fit(aug_X_train, aug_y_train)



In [None]:
y_pred = best_model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"✅ Accuracy on test set: {acc:.4f}")
print("\n📊 Classification Report:")
print(classification_report(y_test, y_pred))

In [None]:
import pandas as pd
import numpy as np
import os

# Path to dataset
base_path = '/kaggle/input/mtcaic3'
selected_channels = ['OZ', 'PZ', 'CZ', 'PO8']

# Load test.csv
test_df = pd.read_csv(os.path.join(base_path, 'test.csv'))

# 🔍 Filter last 50 SSVEP entries
ssvep_df = test_df[test_df['task'] == 'SSVEP'].copy()
ssvep_df = ssvep_df.sort_values(by='id').tail(50)

# Prepare predictions
ids = []
predicted_labels = []

for _, row in ssvep_df.iterrows():
    subject_id = row['subject_id']
    session = row['trial_session']
    trial = row['trial']
    id_num = row['id']
    task = row['task']

    eeg_path = f"{base_path}/{task}/test/{subject_id}/{session}/EEGdata.csv"
    
    if not os.path.exists(eeg_path):
        print(f"⚠️ Missing file: {eeg_path}")
        continue

    eeg = pd.read_csv(eeg_path)
    samples_per_trial = 1750  # SSVEP only

    start = (trial - 1) * samples_per_trial
    end = trial * samples_per_trial

    eeg_segment = eeg[selected_channels].iloc[start:end].values.T.astype(np.float32)
    gyro_segment = eeg[['Gyro1', 'Gyro2', 'Gyro3']].iloc[start:end].values.T.astype(np.float32)

    eeg_features = extract_combined_features(eeg_segment)
    gyro_features = extract_gyro_features(gyro_segment)
    full_features = np.concatenate([eeg_features, gyro_features])

    try:
        if 'scaler' in globals() and hasattr(scaler, 'transform') and scaler.n_features_in_ == len(full_features):
            full_features_scaled = scaler.transform([full_features])
            prediction = lgbm.predict(full_features_scaled)[0]
        else:
            prediction = lgbm.predict([full_features])[0]
    except Exception as e:
        print(f" Prediction failed for id {id_num}: {e}")
        continue

    ids.append(id_num)
    predicted_labels.append(prediction)

# Save predictions
label_map = {0: 'Left', 1: 'Right', 2: 'Forward', 3: 'Backward'}  # Example
predicted_labels = [label_map[label] for label in predicted_labels]
submission = pd.DataFrame({'id': ids, 'label': predicted_labels})
submission.to_csv('ssvep_last50_submission.csv', index=False)

print("✅ ssvep_last50_submission.csv generated successfully!")


In [None]:
thresholds = [2.0, 3.0, 3.5, 4, 4.5]

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

best_acc = 0
best_threshold = None
results = {}

for threshold in thresholds:
    print(f"\n🔍 Trying threshold: {threshold}")
    X_combined, y_combined = [], []

    for _, row in ssvep_df.iterrows():
        dataset_type = 'train' if row['id'] <= 4800 else 'validation'
        eeg_path = f"{base_path}/SSVEP/{dataset_type}/{row['subject_id']}/{row['trial_session']}/EEGdata.csv"
        eeg = pd.read_csv(eeg_path)

        trial_num = int(row['trial'])
        start = (trial_num - 1) * 1750
        end = trial_num * 1750

        eeg_segment = eeg[selected_channels].iloc[start:end].values.T.astype(np.float32)
        gyro_segment = eeg[['Gyro1', 'Gyro2', 'Gyro3']].iloc[start:end].values.T.astype(np.float32)

        gyro_movement = np.linalg.norm(gyro_segment.T, axis=1)
        if np.std(gyro_movement) > threshold:
            continue

        eeg_features = extract_combined_features(eeg_segment)
        gyro_features = extract_gyro_features(gyro_segment)
        full_features = np.concatenate([eeg_features, gyro_features])

        X_combined.append(full_features)
        y_combined.append(row['label'])

    if len(X_combined) < 10:
        print("❌ Too few trials left, skipping this threshold")
        continue

    X_combined = np.array(X_combined)
    y_combined = np.array(y_combined)

    # Train/test split
    X_scaled = StandardScaler().fit_transform(X_combined)
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_combined, test_size=0.2, random_state=42)

    # Train classifier
    clf = RandomForestClassifier(random_state=42)
    clf.fit(X_train, y_train)
    acc = accuracy_score(y_test, clf.predict(X_test))

    print(f"✅ Accuracy at threshold {threshold}: {acc:.4f}")
    results[threshold] = acc

    if acc > best_acc:
        best_acc = acc
        best_threshold = threshold


In [None]:
from sklearn.decomposition import PCA


In [None]:
X_train, X_test, y_train, y_test = train_test_split(X_pca, y_combined, test_size=0.2, random_state=42)

from lazypredict.Supervised import LazyClassifier
clf = LazyClassifier(verbose=0, ignore_warnings=True)
models, predictions = clf.fit(X_train, X_test, y_train, y_test)

print(models)
