# Imports and Environment Setup
This first cell imports all the necessary libraries for data manipulation, signal processing, and machine learning. It also sets up the environment by suppressing warnings for a cleaner output and defining the base paths for the competition datasets.


In [1]:
import pandas as pd
import numpy as np
import os
import warnings
import pickle

# Signal processing libraries
from scipy.signal import cheby1, filtfilt, welch, hilbert, butter
from scipy.linalg import eigh

# Machine learning and preprocessing libraries
from sklearn.cross_decomposition import CCA
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.svm import SVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import xgboost as xgb
from sklearn.ensemble import VotingClassifier

# --- Configuration ---
# Silence every warning category so the notebook output stays readable
warnings.simplefilter('ignore')

# --- Paths ---
# Kaggle input locations: the competition data, the supplementary imitation
# recordings, and the CSV listing trials to exclude as outliers
BASE_PATH = '/kaggle/input/mtcaic3-phase-ii'
IMITATION_BASE_PATH = '/kaggle/input/imitation/SSVEP'
OUTLIER_CSV_PATH = '/kaggle/input/outliers-list-for-ssvep/outliers_list.csv'

# Global Constants and Parameters
Here, we define all the critical constants and parameters for the SSVEP paradigm. These values are derived from the competition's dataset description and are crucial for ensuring consistency across all processing steps.



In [2]:
# --- Trial Timing Parameters ---
TRIAL_TOTAL_DURATION = 7.0  # seconds occupied by one complete trial
SKIP_DURATION = 2.0         # seconds discarded at the start of each trial (marker/preparation)
DATA_DURATION = 4.0         # seconds of SSVEP data kept for analysis
SAMPLING_RATE = 250         # EEG sampling rate in Hz

# --- EEG and SSVEP Configuration ---
EEG_CHANNELS = ['FZ', 'C3', 'CZ', 'C4', 'PZ', 'PO7', 'OZ', 'PO8']
SSVEP_FREQUENCIES_MAP = {'Forward': 7, 'Backward': 8, 'Left': 10, 'Right': 13}
# Labels and frequencies are kept in the map's insertion order so they stay aligned
CLASS_LABELS = [label for label in SSVEP_FREQUENCIES_MAP]
SSVEP_FREQUENCIES_LIST = [SSVEP_FREQUENCIES_MAP[label] for label in CLASS_LABELS]
NUM_HARMONICS = 5           # harmonics per target frequency used by the frequency-based features

# --- Sample Calculation ---
# Each duration in seconds becomes a whole number of samples
SAMPLES_PER_TRIAL_FULL, SAMPLES_TO_SKIP, SAMPLES_PER_SSVEP_TRIAL = (
    int(seconds * SAMPLING_RATE)
    for seconds in (TRIAL_TOTAL_DURATION, SKIP_DURATION, DATA_DURATION)
)

# Data Loading and Preprocessing Utilities
This section contains the utility functions responsible for loading and preparing the raw EEG data. The load_trial_data function is the cornerstone, capable of locating and extracting the precise 4-second EEG segment for any given trial. It also applies Common Average Referencing (CAR), a standard preprocessing step to reduce noise common to all channels.



In [3]:
def apply_car_preprocessing(eeg_data):
    """Re-reference EEG data by subtracting, at every row, the mean taken along axis 1.

    In this pipeline rows are samples and columns are channels, so each sample has
    the across-channel average removed (Common Average Referencing).
    """
    channel_average = np.mean(eeg_data, axis=1, keepdims=True)
    return eeg_data - channel_average

def load_trial_data(row, base_path, imitation_base_path):
    """
    Read one trial's EEG segment from disk and return it after CAR.

    The recording file is located from the row: imitation-subject rows resolve
    under `imitation_base_path/<session>`, all other rows under
    `base_path/<task>/<split>/<subject>/<session>`, where the split is chosen
    from the numeric `id`. The segment kept starts SAMPLES_TO_SKIP samples into
    the trial and is SAMPLES_PER_SSVEP_TRIAL samples long.

    Returns None (after printing a message) when the file does not exist or a
    required row field / channel column is missing.
    """
    eeg_path = ''
    try:
        if row['subject_id'] == 'IMITATION_S1':
            eeg_path = os.path.join(imitation_base_path, str(row['trial_session']), 'EEGdata.csv')
        else:
            # The id range decides which split directory holds the recording
            trial_id = row['id']
            if trial_id > 4900:
                split_name = 'test'
            elif trial_id > 4800:
                split_name = 'validation'
            else:
                split_name = 'train'
            eeg_path = os.path.join(
                base_path, row['task'], split_name, row['subject_id'],
                str(row['trial_session']), 'EEGdata.csv',
            )

        recording = pd.read_csv(eeg_path)

        # Trials are stored back to back; skip the preparation period of this one
        first_sample = (int(row['trial']) - 1) * SAMPLES_PER_TRIAL_FULL + SAMPLES_TO_SKIP
        last_sample = first_sample + SAMPLES_PER_SSVEP_TRIAL - 1

        # .loc slices by label and includes the end label, hence last_sample is inclusive
        segment = recording.loc[first_sample:last_sample, EEG_CHANNELS].values
        return apply_car_preprocessing(segment)

    except (FileNotFoundError, KeyError) as e:
        print(f"Error loading trial {row.get('id', 'N/A')}: {e} for path {eeg_path}")
        return None

# Signal Processing & Filterbank Utilities
These functions build the necessary tools for frequency-domain analysis. We define a refined filterbank for FBCCA and a function to generate the synthetic sine-cosine reference signals required for CCA.

In [4]:
def get_refined_filterbank(sampling_rate=None, target_frequencies=None):
    """
    Create the Chebyshev type-I band-pass filters used for FBCCA.

    The bank consists of five broad 8 Hz-wide bands starting at 6 Hz, followed by
    one 2 Hz-wide band centred on each target frequency.

    Args:
        sampling_rate: sampling rate in Hz; defaults to SAMPLING_RATE.
        target_frequencies: frequencies that get a narrow band; defaults to
            SSVEP_FREQUENCIES_LIST.

    Returns:
        List of (b, a) coefficient pairs, one per band that is realisable at the
        given sampling rate.
    """
    if sampling_rate is None:
        sampling_rate = SAMPLING_RATE
    if target_frequencies is None:
        target_frequencies = SSVEP_FREQUENCIES_LIST
    nyquist = 0.5 * sampling_rate

    # Broad frequency bands
    filter_bands = [[6 + i * 8, 14 + i * 8] for i in range(5)]
    # Narrow bands centered around target frequencies
    filter_bands.extend([freq - 1, freq + 1] for freq in target_frequencies)

    # cheby1 needs normalized edges strictly inside (0, 1): a band touching 0 Hz
    # or the Nyquist frequency is skipped rather than allowed to raise.
    return [
        cheby1(5, 0.1, [low / nyquist, high / nyquist], btype='band')
        for low, high in filter_bands
        if 0 < low < high < nyquist
    ]

def apply_filterbank(eeg_data, filters):
    """Run zero-phase filtering along axis 0 once per (b, a) pair and stack the results.

    The returned array has one leading entry per filter, in the order given.
    """
    band_outputs = []
    for numerator, denominator in filters:
        band_outputs.append(filtfilt(numerator, denominator, eeg_data, axis=0))
    return np.array(band_outputs)

def get_reference_signals(duration_samples, frequencies):
    """Build the sine/cosine reference matrix for every requested frequency.

    Returns a dict mapping each frequency to an array with one row per sample.
    Columns come in pairs, sine then cosine, for harmonics 1..NUM_HARMONICS.
    """
    t = np.arange(duration_samples) / SAMPLING_RATE
    references = {}
    for freq in frequencies:
        components = []
        for h in range(1, NUM_HARMONICS + 1):
            phase = 2 * np.pi * freq * h * t
            components.append(np.sin(phase))
            components.append(np.cos(phase))
        # Components are collected as rows; transpose so that time runs along axis 0
        references[freq] = np.array(components).T
    return references

# Feature Engineering Functions
This is the core of the feature extraction pipeline. We define a diverse set of functions to capture different aspects of the SSVEP response from the EEG signal. The features range from spatial filtering (TRCA) and frequency correlation (FBCCA) to phase synchronization (PLV) and power analysis (PSD, SNR). A key innovation is the use of subject-specific templates, which capture the unique neural response patterns of each individual.

In [5]:
def extract_subject_specific_templates(X_train, y_train, subjects_train):
    """Average training trials into per-class templates, globally and per subject.

    Returns (subject_templates, global_templates):
      * global_templates[label] is the mean over all trials of that class, or an
        all-zero (SAMPLES_PER_SSVEP_TRIAL, n_channels) array if the class has no trials.
      * subject_templates[subject][label] is the mean over that subject's trials of
        the class when there are at least two, otherwise the global template.
    """
    # Class-wide averages, also used as the fallback for sparse subjects
    global_templates = {}
    for class_label in CLASS_LABELS:
        class_trials = X_train[y_train == class_label]
        if len(class_trials) > 0:
            global_templates[class_label] = np.mean(class_trials, axis=0)
        else:
            global_templates[class_label] = np.zeros((SAMPLES_PER_SSVEP_TRIAL, len(EEG_CHANNELS)))

    subject_templates = {}
    for subject in np.unique(subjects_train):
        is_subject = subjects_train == subject
        per_class = {}
        for class_label in CLASS_LABELS:
            mask = is_subject & (y_train == class_label)
            if np.sum(mask) >= 2:
                per_class[class_label] = np.mean(X_train[mask], axis=0)
            else:
                # A single trial is not averaged; fall back to the class-wide template
                per_class[class_label] = global_templates[class_label]
        subject_templates[subject] = per_class

    return subject_templates, global_templates

def get_template_correlation_features(eeg_trial, subject_templates, global_templates, subject_id):
    """Compare a trial with each class template; three values per class.

    For every label in CLASS_LABELS the output holds, in this order:
      1. the NaN-ignoring mean of the per-channel Pearson correlations,
      2. the correlation between the per-channel mean-power profiles,
      3. the channel-averaged magnitude of the mean phase-difference phasor
         (phases taken from the Hilbert transform).
    Templates of `subject_id` are used when available, the global ones otherwise.
    NaNs in the result are replaced by 0.
    """
    templates_to_use = subject_templates.get(subject_id, global_templates)
    n_channels = eeg_trial.shape[1]

    # Trial-side quantities do not depend on the class being compared
    trial_power = np.mean(eeg_trial**2, axis=0)
    trial_phase = np.angle(hilbert(eeg_trial, axis=0))

    features = []
    for class_label in CLASS_LABELS:
        template = templates_to_use.get(class_label, global_templates[class_label])

        channel_corrs = []
        for ch in range(n_channels):
            channel_corrs.append(np.corrcoef(eeg_trial[:, ch], template[:, ch])[0, 1])

        template_power = np.mean(template**2, axis=0)
        power_corr = np.corrcoef(trial_power, template_power)[0, 1]

        template_phase = np.angle(hilbert(template, axis=0))
        phase_diff = trial_phase - template_phase
        phase_coherence = np.mean(np.abs(np.mean(np.exp(1j * phase_diff), axis=0)))

        features += [np.nanmean(channel_corrs), power_corr, phase_coherence]
    return np.nan_to_num(np.array(features))

def get_plv_features(eeg_trial):
    """Phase-Locking Value for every unordered channel pair at every target frequency.

    Each channel is band-passed within +/-1 Hz of the target (4th-order Butterworth,
    zero-phase) and its instantaneous phase taken from the Hilbert transform. Values
    are ordered by target frequency, then by pair (i, j) with i < j.
    """
    nyquist = 0.5 * SAMPLING_RATE
    n_channels = eeg_trial.shape[1]

    plv_features = []
    for target_freq in SSVEP_FREQUENCIES_LIST:
        low = max(1, target_freq - 1)
        high = min(125, target_freq + 1)
        b, a = butter(4, [low / nyquist, high / nyquist], btype='band')

        phases = []
        for ch in range(n_channels):
            narrowband = filtfilt(b, a, eeg_trial[:, ch])
            phases.append(np.angle(hilbert(narrowband)))

        for i, phase_i in enumerate(phases):
            for phase_j in phases[i + 1:]:
                plv_features.append(np.abs(np.mean(np.exp(1j * (phase_i - phase_j)))))
    return np.array(plv_features)

def get_enhanced_cca_features(filtered_eeg_bank, reference_signals):
    """First canonical correlation between each filtered signal and each reference.

    A one-component CCA is fitted for every (filterbank entry, target frequency)
    pair; the feature is the correlation of the two canonical variates. Values are
    ordered by filterbank entry, then by SSVEP_FREQUENCIES_LIST. NaNs become 0.
    """
    correlations = []
    for band_eeg in filtered_eeg_bank:
        for freq in SSVEP_FREQUENCIES_LIST:
            reference = reference_signals[freq]
            model = CCA(n_components=1).fit(band_eeg, reference)
            eeg_variate, ref_variate = model.transform(band_eeg, reference)
            correlations.append(np.corrcoef(eeg_variate.T, ref_variate.T)[0, 1])
    return np.nan_to_num(np.array(correlations))

def get_optimized_spatial_filters(X_train, y_train, labels):
    """Compute one spatial filter per class from a generalized eigenproblem (TRCA-like).

    For each label, Q is the channel covariance of the class-average trial and S the
    sum of the single-trial channel covariances (lightly regularised). The filter is
    the eigenvector of (Q, S) with the largest eigenvalue. Classes with fewer than two
    trials, or for which the decomposition fails, get uniform channel weights.

    Returns a dict mapping label -> weight vector of length X_train.shape[2].
    """
    def uniform_weights():
        n_channels = X_train.shape[2]
        return np.ones(n_channels) / n_channels

    spatial_filters = {}
    for label in labels:
        class_trials = X_train[y_train == label]
        if len(class_trials) >= 2:
            S = np.sum([np.cov(t.T) for t in class_trials], axis=0)
            Q = np.cov(np.mean(class_trials, axis=0).T)
            reg = 1e-5 * np.eye(S.shape[0])
            try:
                # eigh returns eigenvalues in ascending order, so the last column wins
                _, evecs = eigh(Q, S + reg)
            except np.linalg.LinAlgError:
                spatial_filters[label] = uniform_weights()
            else:
                spatial_filters[label] = evecs[:, -1]
        else:
            spatial_filters[label] = uniform_weights()
    return spatial_filters

def extract_trca_features(eeg_trial, spatial_filters, templates=None):
    """
    Score a trial against each class by correlating spatially filtered signals.

    For every class label in `spatial_filters`, the trial and that class's template
    are projected through the class filter, and the Pearson correlation between the
    two projected time courses is the feature.

    Args:
        eeg_trial: 2-D array, samples along axis 0 and channels along axis 1.
        spatial_filters: dict mapping class label -> per-channel weight vector.
        templates: optional dict mapping class label -> template array laid out
            like `eeg_trial` (for example the class-average training trials).

    Returns:
        1-D array with one value per entry of `spatial_filters`, in dict order.
        A feature is 0.0 when no template is available for the label or when
        either projected signal is (numerically) constant.

    Note:
        Called without `templates` there is no reference time course to compare
        with. A constant reference such as the trial's own time-average has zero
        variance, so its correlation is undefined; every feature is therefore 0.0
        in that case. Pass `templates` to obtain informative features.
    """
    features = []
    for label, w in spatial_filters.items():
        template = None if templates is None else templates.get(label)
        if template is None:
            features.append(0.0)
            continue

        proj_trial = eeg_trial @ w
        proj_template = template @ w

        # Correlation is undefined for a flat signal; report 0.0 instead of NaN
        if np.std(proj_trial) > 1e-10 and np.std(proj_template) > 1e-10:
            features.append(np.corrcoef(proj_trial, proj_template)[0, 1])
        else:
            features.append(0.0)
    return np.nan_to_num(np.array(features))

def get_psd_features(eeg_trial):
    """Channel-averaged PSD at each target frequency and its harmonics.

    The spectrum is estimated with Welch's method using one segment as long as the
    trial. For every target frequency and harmonic 1..NUM_HARMONICS the feature is
    the PSD at the nearest frequency bin, averaged over channels; harmonics above
    the highest available frequency contribute 0.
    """
    freqs, psd = welch(eeg_trial, fs=SAMPLING_RATE, nperseg=eeg_trial.shape[0], axis=0)
    highest_freq = freqs[-1]

    psd_features = []
    for target_freq in SSVEP_FREQUENCIES_LIST:
        for h in range(1, NUM_HARMONICS + 1):
            harmonic_freq = target_freq * h
            if harmonic_freq > highest_freq:
                psd_features.append(0)
                continue
            # Single nearest bin (not a band), averaged across all channels
            nearest_bin = np.argmin(np.abs(freqs - harmonic_freq))
            psd_features.append(np.mean(psd[nearest_bin, :]))
    return np.array(psd_features)

def get_snr_features(eeg_trial):
    """Signal-to-noise ratio of the PSD at each target frequency.

    Signal is the channel-averaged PSD at the bin nearest the target; noise is the
    average over the two bins on either side (all channels). A target whose bin is
    too close to either end of the spectrum yields 0.
    """
    freqs, psd = welch(eeg_trial, fs=SAMPLING_RATE, nperseg=eeg_trial.shape[0], axis=0)
    n_bins = len(psd)

    snr_features = []
    for target_freq in SSVEP_FREQUENCIES_LIST:
        idx = np.argmin(np.abs(freqs - target_freq))
        if idx <= 2 or idx >= n_bins - 2:
            snr_features.append(0)
            continue
        signal_power = np.mean(psd[idx, :])
        neighbours = [idx - 2, idx - 1, idx + 1, idx + 2]
        noise_power = np.mean(psd[neighbours, :])
        # Small constant keeps the ratio finite when the noise estimate is zero
        snr_features.append(signal_power / (noise_power + 1e-10))
    return np.array(snr_features)

# Master Feature Extraction Pipeline
This function acts as an orchestrator. For each EEG trial, it calls all the individual feature engineering functions defined above and concatenates their outputs into a single, comprehensive feature vector. This vector will be the input to our machine learning model.

In [6]:
def extract_all_features(eeg_trial, filters, reference_signals, spatial_filters, subject_templates, global_templates, subject_id):
    """
    Build the full feature vector for one trial.

    The groups are concatenated in this fixed order: TRCA, filterbank CCA, PLV,
    PSD, SNR, template correlation.
    """
    feature_groups = [
        # NOTE(review): called with only the trial and the filters, extract_trca_features
        # has no class template to compare against, so these values appear to be a
        # constant 0.0 — confirm whether class templates should be supplied.
        extract_trca_features(eeg_trial, spatial_filters),
        # FBCCA works on the band-passed copies of the trial
        get_enhanced_cca_features(apply_filterbank(eeg_trial, filters), reference_signals),
        get_plv_features(eeg_trial),
        get_psd_features(eeg_trial),
        get_snr_features(eeg_trial),
        get_template_correlation_features(eeg_trial, subject_templates, global_templates, subject_id),
    ]
    return np.concatenate(feature_groups)

# Main - Data Loading and Preparation
The main execution begins here. This cell handles the initial data loading and preparation steps:

1. Loads the outlier list from the provided CSV.

1. Loads the main `train.csv`, `validation.csv`, and `test.csv` files.

1. Filters out the identified outlier trials from the training set; the validation set is used unfiltered. (A separate outlier analysis generated the file specifying which trials are too extreme and need to be removed.)

1. Loads the supplementary imitation dataset to augment our training data.

1. Combines all data sources into a single `full_train_df` for model training.

In [7]:
# [STEP 1 & 2] Read the outlier list, then the competition metadata tables
print("[STEP 1 & 2] Loading outliers and main dataset...")
try:
    outliers_df = pd.read_csv(OUTLIER_CSV_PATH)
except FileNotFoundError:
    # Without the list nothing is excluded; the empty frame disables STEP 3 below
    print(f"⚠️ Warning: Outlier file not found. Proceeding without excluding trials.")
    outliers_df = pd.DataFrame()
else:
    print(f"✅ Success: Found {len(outliers_df)} trials marked for exclusion.")

train_df, validation_df, test_df = (
    pd.read_csv(os.path.join(BASE_PATH, csv_name))
    for csv_name in ('train.csv', 'validation.csv', 'test.csv')
)

# Keep only the SSVEP rows of each split
train_ssvep = train_df.loc[train_df['task'] == 'SSVEP'].copy()
validation_ssvep = validation_df.loc[validation_df['task'] == 'SSVEP'].copy()
test_ssvep = test_df.loc[test_df['task'] == 'SSVEP'].reset_index(drop=True)
print("✅ Success: Main dataset loaded.")

# [STEP 3] Drop the training trials that appear in the outlier list
# NOTE(review): only train_ssvep is filtered; validation_ssvep is concatenated
# below without outlier removal — confirm this is intended.
if not outliers_df.empty:
    print("\n[STEP 3] Filtering outlier trials from the dataset...")
    # Both sides are keyed as "<subject number>_<session>_<trial>"
    outliers_df['outlier_key'] = (
        outliers_df['subject_number'].astype(str) + '_'
        + outliers_df['session_number'].astype(str) + '_'
        + outliers_df['trial_number'].astype(str)
    )
    subject_number = train_ssvep['subject_id'].str.replace('S', '').astype(int)
    trial_key = (
        subject_number.astype(str) + '_'
        + train_ssvep['trial_session'].astype(str) + '_'
        + train_ssvep['trial'].astype(str)
    )

    initial_train_count = len(train_ssvep)
    is_outlier = trial_key.isin(outliers_df['outlier_key'])
    train_ssvep = train_ssvep[~is_outlier]
    print(f"Removed {initial_train_count - len(train_ssvep)} trials from the training set.")
    print("✅ Success: Filtering complete.")

# [STEP 4] Read the imitation sessions and give them the main dataset's columns
print("\n[STEP 4] Loading and combining supplementary imitation dataset...")
imitation_dfs = []
for session in [1, 2]:
    label_path = os.path.join(IMITATION_BASE_PATH, str(session), 'trial_labels.csv')
    temp_df = pd.read_csv(label_path).rename(columns={'direction': 'label'})
    temp_df = temp_df.assign(
        subject_id='IMITATION_S1',
        trial_session=session,
        task='SSVEP',
        id=20000 + (session * 100) + temp_df.index,  # Unique ID
    )
    imitation_dfs.append(temp_df)
imitation_df = pd.concat(imitation_dfs, ignore_index=True)
print(f"✅ Success: Loaded {len(imitation_df)} supplementary trials.")

# Training pool = filtered train + validation + imitation trials
full_train_df = pd.concat([train_ssvep, validation_ssvep, imitation_df], ignore_index=True)
print(f"\nTotal training data after filtering and augmentation: {len(full_train_df)} trials")

[STEP 1 & 2] Loading outliers and main dataset...
✅ Success: Found 86 trials marked for exclusion.
✅ Success: Main dataset loaded.

[STEP 3] Filtering outlier trials from the dataset...
Removed 65 trials from the training set.
✅ Success: Filtering complete.

[STEP 4] Loading and combining supplementary imitation dataset...
✅ Success: Loaded 20 supplementary trials.

Total training data after filtering and augmentation: 2405 trials


# Model Training Pipeline
This cell executes the core model training process.

1. Loads all raw EEG trials into memory.
   
1. Generates the necessary assets for feature extraction (templates, spatial filters, reference signals).

1. Iterates through the training data, applying the `extract_all_features` function to each trial.

1. Defines the ensemble model: a `VotingClassifier` combining XGBoost, SVC, and LDA.

1. Scales the features using `StandardScaler`.

1. Trains the final ensemble model on the complete, scaled feature set.

In [8]:
print("\n[STEP 5] Pre-computing features and training model...")

# Assets that depend only on the global configuration
reference_signals = get_reference_signals(SAMPLES_PER_SSVEP_TRIAL, SSVEP_FREQUENCIES_LIST)
filters = get_refined_filterbank()

# Read every training trial; trials that fail to load or have the wrong shape are skipped
expected_shape = (SAMPLES_PER_SSVEP_TRIAL, len(EEG_CHANNELS))
X_train_raw, y_train, subjects_train = [], [], []
for _, row in full_train_df.iterrows():
    trial_data = load_trial_data(row, BASE_PATH, IMITATION_BASE_PATH)
    if trial_data is None or trial_data.shape != expected_shape:
        continue
    X_train_raw.append(trial_data)
    y_train.append(row['label'])
    subjects_train.append(row['subject_id'])

X_train = np.array(X_train_raw)
y_train = np.array(y_train)
subjects_train = np.array(subjects_train)

# Integer-encode the string labels for the classifiers
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)

# Templates and spatial filters are learned from the loaded training trials
subject_templates, global_templates = extract_subject_specific_templates(X_train, y_train, subjects_train)
spatial_filters = get_optimized_spatial_filters(X_train, y_train, CLASS_LABELS)

# One feature vector per training trial
X_train_features = []
for trial, sub_id in zip(X_train, subjects_train):
    X_train_features.append(
        extract_all_features(trial, filters, reference_signals, spatial_filters,
                             subject_templates, global_templates, sub_id)
    )
X_train_features = np.array(X_train_features)

# Soft-voting ensemble of XGBoost, an RBF SVM and shrinkage LDA
clf1 = xgb.XGBClassifier(
    random_state=42, use_label_encoder=False, eval_metric='mlogloss',
    learning_rate=0.1, max_depth=4, n_estimators=150,
)
clf2 = SVC(probability=True, random_state=42, C=100, kernel='rbf', gamma='scale')
clf3 = LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto')
ensemble_model = VotingClassifier(
    estimators=[('xgb', clf1), ('svc', clf2), ('lda', clf3)],
    voting='soft',
    weights=[0.5, 0.3, 0.2],
)

# Standardise the features, then fit the ensemble on the encoded labels
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_features)
ensemble_model.fit(X_train_scaled, y_train_encoded)

print("✅ Success: Model training complete.")


[STEP 5] Pre-computing features and training model...
✅ Success: Model training complete.


# Checkpointing, Prediction, and Submission

In the final cells, we save our entire trained pipeline and generate the final submission file.

1. **Checkpointing**: All trained components—the model, scaler, label encoder, and feature extraction assets—are saved into a single pickle file. This allows for easy reloading and inference without retraining.

1. **Prediction**: We loop through the test set, load each trial, extract its features using the pre-computed assets, and predict the label with our trained model.

1. **Submission**: The predictions are formatted into the required submission.csv file.

In [9]:
# [STEP 6] Persist the fitted model together with every asset needed for inference
print("\n[STEP 6] Saving trained model and assets to checkpoint...")

checkpoint = dict(
    model=ensemble_model,
    scaler=scaler,
    label_encoder=le,
    spatial_filters=spatial_filters,
    subject_templates=subject_templates,
    global_templates=global_templates,
    filter_bank=filters,
    ref_signals=reference_signals,
)

# Everything goes into one pickle file in the working directory
checkpoint_filename = 'ssvep_checkpoint.pkl'
with open(checkpoint_filename, 'wb') as f:
    pickle.dump(checkpoint, f)

print(f"✅ Success: Checkpoint saved to '{checkpoint_filename}'")


[STEP 6] Saving trained model and assets to checkpoint...
✅ Success: Checkpoint saved to 'ssvep_checkpoint.pkl'


In [10]:
# [STEP 7] Generate submission file for the test set
print("\n[STEP 7] Generating submission file for the test set...")

# Most frequent training label, used when a test trial cannot be loaded.
# It does not change between iterations, so it is computed once up front.
fallback_label = pd.Series(y_train).mode()[0]
expected_shape = (SAMPLES_PER_SSVEP_TRIAL, len(EEG_CHANNELS))

test_predictions = []
fallback_ids = []  # ids of test rows that received the fallback label
for _, row in test_ssvep.iterrows():
    trial_data = load_trial_data(row, BASE_PATH, IMITATION_BASE_PATH)
    if trial_data is None or trial_data.shape != expected_shape:
        # Missing or truncated recording: keep the submission complete with the fallback
        fallback_ids.append(row['id'])
        test_predictions.append(fallback_label)
        continue

    # Extract features for the test trial
    test_features = extract_all_features(
        trial_data, filters, reference_signals, spatial_filters,
        subject_templates, global_templates, row['subject_id'],
    ).reshape(1, -1)
    # Scale and predict
    test_features_scaled = scaler.transform(test_features)
    prediction_encoded = ensemble_model.predict(test_features_scaled)[0]
    # Decode the prediction back to the original label
    test_predictions.append(le.inverse_transform([prediction_encoded])[0])

# Make fallback substitutions visible instead of silently mixing them into the predictions
if fallback_ids:
    print(f"⚠️ Warning: {len(fallback_ids)} test trial(s) could not be loaded and "
          f"were assigned the fallback label '{fallback_label}': {fallback_ids}")

# Create the submission DataFrame (test_ssvep has a fresh 0..n-1 index, matching the list order)
submission_df = pd.DataFrame({'id': test_ssvep['id'], 'label': test_predictions})
submission_df.to_csv('submission.csv', index=False)

print(f"\n✅ All steps complete. Submission file 'submission.csv' created successfully!")
print("\nPredictions summary:\n", submission_df['label'].value_counts())


[STEP 7] Generating submission file for the test set...

✅ All steps complete. Submission file 'submission.csv' created successfully!

Predictions summary:
 label
Left        33
Backward    28
Forward     23
Right       16
Name: count, dtype: int64
