# EEG Emotional Memory Classification Pipeline

## Complete End-to-End Implementation

This notebook implements a comprehensive pipeline for classifying EEG signals during sleep into emotional (negative) or neutral categories using:
- **Hilbert Transform** for instantaneous power extraction
- **Time-Resolved Classification** with LDA
- **Leave-One-Out Cross-Validation** for robust evaluation
- **Window-Based AUC Metric** optimization

### Key Features:
- Data loading from MATLAB .mat files
- Theta band filtering (4-8 Hz)
- Per-participant z-score standardization
- 16-channel EEG analysis
- Submission generation with validation

---

In [None]:
# Import Required Libraries
import numpy as np
import pandas as pd
import scipy.io as sio
from scipy import signal
from scipy.signal import hilbert
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.preprocessing import StandardScaler
import warnings
import os
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

warnings.filterwarnings('ignore')

# Configure plotting
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

print("✓ All libraries imported successfully")

## 1. Data Loading and Conversion from MATLAB Format

Load .mat files and convert MATLAB structures to Python dictionaries, ensuring proper data integrity and structure verification.
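
As a small illustration of what `scipy.io.loadmat` returns, the sketch below writes a throwaway `.mat` file and reads it back. The file name `example_neu.mat`, the key `data`, and the 16 × 200 shape are assumptions made for the example; the real recordings may use different keys and shapes.

```python
import os
import tempfile

import numpy as np
import scipy.io as sio

# Hypothetical recording stored under the key 'data' as (channels, timepoints).
rng = np.random.default_rng(0)
example = {"data": rng.standard_normal((16, 200))}

with tempfile.TemporaryDirectory() as tmp_dir:
    mat_path = os.path.join(tmp_dir, "example_neu.mat")
    sio.savemat(mat_path, example)

    # loadmat returns a dict; keys that start with '__' are file metadata.
    loaded = sio.loadmat(mat_path, squeeze_me=True)

user_keys = [key for key in loaded if not key.startswith("__")]
signal_data = loaded["data"]
print(user_keys, signal_data.shape)
```

Filtering out the `__`-prefixed keys is the same idea the loader uses when none of the expected key names is present.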

In [None]:
def convert_matlab_struct(matlab_dict):
    """
    Convert MATLAB structure to Python dictionary recursively.
    
    Args:
        matlab_dict: Dictionary loaded from .mat file
        
    Returns:
        Converted Python dictionary with numpy arrays
    """
    if isinstance(matlab_dict, sio.matlab.mat_struct):
        result = {}
        for key in matlab_dict._fieldnames:
            result[key] = convert_matlab_struct(getattr(matlab_dict, key))
        return result
    elif isinstance(matlab_dict, np.ndarray):
        if matlab_dict.dtype == object:
            return np.array([convert_matlab_struct(item) for item in matlab_dict])
        else:
            return matlab_dict
    else:
        return matlab_dict


def load_eeg_data(data_path, data_type='training'):
    """
    Load and parse EEG data from .mat files.
    
    Args:
        data_path: Path to the data directory
        data_type: 'training' or 'testing'
        
    Returns:
        Dictionary containing organized EEG data and labels
    """
    data_dir = Path(data_path) / data_type
    
    if not data_dir.exists():
        raise FileNotFoundError(f"Data directory not found: {data_dir}")
    
    # Find all .mat files
    mat_files = sorted(list(data_dir.glob('**/*.mat')))
    
    if not mat_files:
        raise FileNotFoundError(f"No .mat files found in {data_dir}")
    
    all_signals = []
    all_labels = []
    file_info = []
    
    print(f"Loading {len(mat_files)} files from {data_dir.name}/...")
    
    for mat_file in tqdm(mat_files, desc="Loading"):
        try:
            # Load MATLAB file
            raw_data = sio.loadmat(mat_file, squeeze_me=True)
            
            # Extract signal (common keys: 'data', 'EEG', 'signal')
            signal_data = None
            for key in ['data', 'EEG', 'signal', 'eeg']:
                if key in raw_data:
                    signal_data = raw_data[key]
                    break
            
            if signal_data is None:
                # Try first array that's not metadata
                for key, val in raw_data.items():
                    if not key.startswith('__') and isinstance(val, np.ndarray):
                        signal_data = val
                        break
            
            if signal_data is not None:
                # Ensure 2D shape (channels, timepoints)
                if signal_data.ndim == 1:
                    signal_data = signal_data[np.newaxis, :]
                elif signal_data.ndim == 3:
                    # If multiple trials, use first trial
                    signal_data = signal_data[0]
                
                # Determine label from filename or default
                if 'emo' in mat_file.name.lower():
                    label = 2  # Emotional
                elif 'neu' in mat_file.name.lower():
                    label = 1  # Neutral
                else:
                    label = 0  # Unknown
                
                all_signals.append(signal_data)
                all_labels.append(label)
                file_info.append({
                    'file': mat_file.name,
                    'shape': signal_data.shape,
                    'label': label
                })
        
        except Exception as e:
            print(f"Warning: Could not load {mat_file.name}: {e}")
            continue
    
    # Print summary
    print(f"\n✓ Loaded {len(all_signals)} files")
    print(f"  Neutral samples: {sum(1 for l in all_labels if l == 1)}")
    print(f"  Emotional samples: {sum(1 for l in all_labels if l == 2)}")
    print(f"  Unknown samples: {sum(1 for l in all_labels if l == 0)}")
    
    return {
        'signals': all_signals,
        'labels': np.array(all_labels),
        'file_info': file_info
    }


# Configuration
DATA_PATH = r'd:\Deep Learning & Time Series - predicting-emotions-using-brain-waves\EEG-Sleep-Emotion-Decoder\data'

# Load data (both training and testing)
try:
    train_data = load_eeg_data(DATA_PATH, 'training')
    test_data = load_eeg_data(DATA_PATH, 'testing')
    print("\n✓ Data loaded successfully!")
except Exception as e:
    print(f"Note: {e}")
    print("You can still run the pipeline with mock data for demonstration")

## 2. Data Structure Organization and Label Assignment

Organize data into 3D arrays [Trials × Channels × Timepoints] with proper label assignment for neutral and emotional categories.
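
The stacking step can be sketched with `np.stack`. This version is one possible variant, not the notebook's exact behaviour: it drops trials whose shape does not match the first trial together with their labels, so no zero-filled rows remain in the array. The trial shapes and labels are made up for the example.

```python
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical trials: three of shape (16, 200) and one truncated trial.
signals_list = [rng.standard_normal((16, 200)) for _ in range(3)]
signals_list.append(rng.standard_normal((16, 150)))
labels_list = [1, 2, 1, 2]  # 1 = neutral, 2 = emotional

expected_shape = signals_list[0].shape
keep = [i for i, s in enumerate(signals_list) if s.shape == expected_shape]

# Stack the retained trials into (trials, channels, timepoints).
X = np.stack([signals_list[i] for i in keep])
y = np.asarray(labels_list)[keep]
print(X.shape, y)
```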

In [None]:
def organize_eeg_data(signals_list, labels_list):
    """
    Organize list of signals into 3D array [Trials × Channels × Timepoints].
    
    Args:
        signals_list: List of 2D arrays (channels × timepoints)
        labels_list: List of labels
        
    Returns:
        Tuple of (organized_3D_array, labels, info_dict)
    """
    # Determine dimensions
    n_trials = len(signals_list)
    n_channels = signals_list[0].shape[0]
    n_timepoints = signals_list[0].shape[1]
    
    print(f"Data Organization:")
    print(f"  Trials: {n_trials}")
    print(f"  Channels: {n_channels}")
    print(f"  Timepoints: {n_timepoints}")
    
    # Create 3D array
    X = np.zeros((n_trials, n_channels, n_timepoints))
    
    for i, signal_data in enumerate(signals_list):
        if signal_data.shape[0] != n_channels or signal_data.shape[1] != n_timepoints:
            print(f"Warning: Trial {i} has unexpected shape {signal_data.shape}")
            # Reshape or skip
            continue
        X[i] = signal_data
    
    y = np.array(labels_list)
    
    # Summary statistics
    info = {
        'total_trials': n_trials,
        'neutral_trials': np.sum(y == 1),
        'emotional_trials': np.sum(y == 2),
        'n_channels': n_channels,
        'n_timepoints': n_timepoints,
        'sampling_rate': 200  # Hz
    }
    
    print(f"\n✓ Data organized successfully:")
    print(f"  Shape: {X.shape} (Trials × Channels × Timepoints)")
    print(f"  Neutral (label 1): {info['neutral_trials']}")
    print(f"  Emotional (label 2): {info['emotional_trials']}")
    
    return X, y, info


# Organize training data
if 'train_data' in locals() and 'test_data' in locals():
    X_train, y_train, train_info = organize_eeg_data(train_data['signals'], train_data['labels'])
    
    # Organize test data
    X_test, y_test, test_info = organize_eeg_data(test_data['signals'], test_data['labels'])
else:
    print("Creating mock data for demonstration...")
    # Create mock data for demonstration
    n_train_neutral = 10
    n_train_emotional = 8
    n_test = 3
    n_channels = 16
    n_timepoints = 200
    
    X_train = np.random.randn(n_train_neutral + n_train_emotional, n_channels, n_timepoints)
    y_train = np.array([1]*n_train_neutral + [2]*n_train_emotional)
    
    X_test = np.random.randn(n_test, n_channels, n_timepoints)
    y_test = np.array([1, 2, 1])  # Mixed labels for demo
    
    train_info = {
        'total_trials': len(y_train),
        'neutral_trials': np.sum(y_train == 1),
        'emotional_trials': np.sum(y_train == 2),
        'n_channels': n_channels,
        'n_timepoints': n_timepoints,
        'sampling_rate': 200
    }
    
    print(f"✓ Mock data created: {X_train.shape}")

## 3. Bandpass Filtering for Theta Band Extraction

Apply Butterworth bandpass filter (4-8 Hz theta band) to isolate emotionally-relevant frequency components during sleep.
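
A minimal sketch of the filtering idea on a synthetic signal, assuming the 200 Hz sampling rate used throughout this notebook. The 6 Hz and 30 Hz test components are invented for the example. Because the filter is designed as second-order sections, it is applied with `scipy.signal.sosfiltfilt`, which runs it forward and backward for zero phase shift.

```python
import numpy as np
from scipy import signal

srate = 200  # Hz, assumed sampling rate
t = np.arange(0, 2.0, 1 / srate)

# Synthetic signal: a 6 Hz (theta) component plus a 30 Hz component.
theta = np.sin(2 * np.pi * 6 * t)
fast = np.sin(2 * np.pi * 30 * t)
x = theta + fast

# 4th-order Butterworth band-pass (4-8 Hz) as second-order sections.
sos = signal.butter(4, [4, 8], btype="bandpass", fs=srate, output="sos")

# Forward-backward filtering: zero phase shift, 30 Hz component removed.
x_theta = signal.sosfiltfilt(sos, x)

# Compare with the pure theta component, away from the signal edges.
middle = slice(100, 300)
residual = np.sqrt(np.mean((x_theta[middle] - theta[middle]) ** 2))
print(f"RMS error vs. pure 6 Hz component: {residual:.4f}")
```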

In [None]:
def apply_bandpass_filter(X, low_freq=4, high_freq=8, srate=200, order=4):
    """
    Apply Butterworth bandpass filter to EEG data.
    
    Args:
        X: Input data (trials, channels, timepoints)
        low_freq: Low cutoff frequency (Hz)
        high_freq: High cutoff frequency (Hz)
        srate: Sampling rate (Hz)
        order: Filter order
        
    Returns:
        Filtered data with same shape as input
    """
    print(f"Applying bandpass filter: {low_freq}-{high_freq} Hz (order={order})")
    
    # Compute Nyquist frequency
    nyquist = srate / 2
    low = low_freq / nyquist
    high = high_freq / nyquist
    
    # Design filter using second-order sections for stability
    sos = signal.butter(order, [low, high], btype='band', output='sos')
    
    n_trials, n_channels, n_timepoints = X.shape
    X_filtered = np.zeros_like(X)
    
    # Apply filter to each trial and channel
    for trial in range(n_trials):
        for channel in range(n_channels):
            # Use sosfiltfilt for zero-phase filtering with second-order sections
            X_filtered[trial, channel, :] = signal.sosfiltfilt(sos, X[trial, channel, :])
    
    print(f"✓ Filter applied. Output shape: {X_filtered.shape}")
    return X_filtered


# Apply bandpass filter to training and test data
X_train_filtered = apply_bandpass_filter(X_train, low_freq=4, high_freq=8, srate=200)
X_test_filtered = apply_bandpass_filter(X_test, low_freq=4, high_freq=8, srate=200)

# Visualize filtered signal for one trial
fig, axes = plt.subplots(2, 1, figsize=(12, 6))

# Original signal (channel 0)
axes[0].plot(X_train[0, 0, :], alpha=0.7, label='Original')
axes[0].set_title('Original EEG Signal (Channel 0)')
axes[0].set_ylabel('Amplitude (µV)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Filtered signal
axes[1].plot(X_train_filtered[0, 0, :], alpha=0.7, color='orange', label='Filtered (4-8 Hz)')
axes[1].set_title('Bandpass Filtered Signal (Theta Band)')
axes[1].set_xlabel('Timepoint (200 Hz sampling)')
axes[1].set_ylabel('Amplitude (µV)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("✓ Filtering visualization complete")

## 4. Feature Extraction Using Hilbert Transform

Extract instantaneous power from filtered signals using the Hilbert transform to preserve temporal resolution.
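
To see why the magnitude of the analytic signal tracks the envelope, the sketch below builds an amplitude-modulated 6 Hz carrier with a known envelope and recovers it with `scipy.signal.hilbert`. The carrier, modulation frequency, and 200 Hz sampling rate are assumptions chosen for the example.

```python
import numpy as np
from scipy.signal import hilbert

srate = 200  # Hz, assumed sampling rate
t = np.arange(0, 2.0, 1 / srate)

# Hypothetical test signal: 6 Hz carrier with a slow 1 Hz amplitude modulation.
envelope_true = 1.0 + 0.5 * np.sin(2 * np.pi * 1 * t)
x = envelope_true * np.cos(2 * np.pi * 6 * t)

analytic = hilbert(x)         # complex analytic signal: x + i * H[x]
amplitude = np.abs(analytic)  # instantaneous amplitude (envelope)
power = amplitude ** 2        # instantaneous power

# Compare the recovered envelope with the known one, away from the edges.
middle = slice(50, 350)
max_err = np.max(np.abs(amplitude[middle] - envelope_true[middle]))
print(f"Max envelope error: {max_err:.2e}")
```

The envelope is only meaningful for narrow-band signals, which is why the band-pass filter precedes this step.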

In [None]:
def extract_hilbert_power(X_filtered):
    """
    Extract instantaneous power using Hilbert transform.
    
    The Hilbert transform converts the real signal into a complex analytic signal.
    The magnitude of this complex signal gives the instantaneous amplitude.
    Squaring gives the instantaneous power.
    
    Args:
        X_filtered: Bandpass filtered data (trials, channels, timepoints)
        
    Returns:
        Power features preserving temporal resolution (trials, channels, timepoints)
    """
    print("Extracting instantaneous power using Hilbert transform...")
    
    n_trials, n_channels, n_timepoints = X_filtered.shape
    power = np.zeros_like(X_filtered)
    
    for trial in range(n_trials):
        for channel in range(n_channels):
            # Compute analytic signal using Hilbert transform
            analytic_signal = hilbert(X_filtered[trial, channel, :])
            
            # Get instantaneous amplitude (magnitude)
            instantaneous_amplitude = np.abs(analytic_signal)
            
            # Square to get instantaneous power
            power[trial, channel, :] = instantaneous_amplitude ** 2
    
    print(f"✓ Power extracted. Shape: {power.shape}")
    return power


# Extract power features
power_train = extract_hilbert_power(X_train_filtered)
power_test = extract_hilbert_power(X_test_filtered)

# Visualize Hilbert transform and power extraction
fig, axes = plt.subplots(3, 1, figsize=(12, 8))

trial_idx = 0
channel_idx = 0
signal_orig = X_train_filtered[trial_idx, channel_idx, :]
analytic = hilbert(signal_orig)
amplitude = np.abs(analytic)
power_inst = power_train[trial_idx, channel_idx, :]

timepoints = np.arange(len(signal_orig)) / 200 * 1000  # Convert to ms

# Plot 1: Original filtered signal
axes[0].plot(timepoints, signal_orig, 'b-', alpha=0.7, linewidth=1)
axes[0].set_title('Filtered Signal (4-8 Hz Theta Band)')
axes[0].set_ylabel('Amplitude (µV)')
axes[0].grid(True, alpha=0.3)

# Plot 2: Instantaneous amplitude (Hilbert envelope)
axes[1].plot(timepoints, signal_orig, 'b-', alpha=0.3, linewidth=1, label='Filtered signal')
axes[1].plot(timepoints, amplitude, 'r-', linewidth=2, label='Hilbert envelope (amplitude)')
axes[1].plot(timepoints, -amplitude, 'r-', linewidth=2)
axes[1].set_title('Instantaneous Amplitude from Hilbert Transform')
axes[1].set_ylabel('Amplitude (µV)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

# Plot 3: Instantaneous power
axes[2].fill_between(timepoints, power_inst, alpha=0.6, color='green')
axes[2].set_title('Instantaneous Power (Amplitude²)')
axes[2].set_xlabel('Time (ms)')
axes[2].set_ylabel('Power (µV²)')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("✓ Feature extraction visualization complete")

## 5. Data Standardization and Participant-Level Z-Scoring

Standardize power features per participant to account for inter-subject variability in EEG amplitude.
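
A toy illustration of the idea, assuming two participants whose overall power differs by a factor of ten. Each participant's trials are z-scored with that participant's own mean and standard deviation, pooled over trials, channels, and timepoints. The array sizes and IDs are invented for the example.

```python
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical power data: 6 trials, 2 channels, 5 timepoints.
power = rng.gamma(shape=2.0, scale=1.0, size=(6, 2, 5))
participant_ids = np.array([0, 0, 0, 1, 1, 1])
power[participant_ids == 1] *= 10.0  # participant 1 has a larger overall scale

standardized = np.empty_like(power)
for pid in np.unique(participant_ids):
    mask = participant_ids == pid
    block = power[mask]
    # One mean and one standard deviation per participant.
    standardized[mask] = (block - block.mean()) / block.std()

for pid in np.unique(participant_ids):
    block = standardized[participant_ids == pid]
    print(pid, round(float(block.mean()), 6), round(float(block.std()), 6))
```

After this step both participants sit on the same scale, so a classifier trained across subjects does not simply learn who has the larger amplitudes.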

In [None]:
def standardize_per_participant(power_data, participant_ids=None):
    """
    Standardize power features per participant using z-score normalization.
    
    Args:
        power_data: Power features (trials, channels, timepoints)
        participant_ids: Participant ID for each trial (if None, treats all as one participant)
        
    Returns:
        Standardized power data
    """
    power_standardized = np.zeros_like(power_data)
    
    if participant_ids is None:
        # Treat all trials as one participant
        participant_ids = np.zeros(power_data.shape[0], dtype=int)
    
    unique_participants = np.unique(participant_ids)
    print(f"Standardizing data for {len(unique_participants)} participants...")
    
    for participant in unique_participants:
        # Get trials for this participant
        mask = participant_ids == participant
        trials_indices = np.where(mask)[0]
        
        # Compute mean and std for this participant across all their trials
        participant_data = power_data[mask]  # (n_trials, channels, timepoints)
        mean = np.mean(participant_data)
        std = np.std(participant_data)
        
        # Standardize
        if std > 0:
            power_standardized[mask] = (participant_data - mean) / std
        else:
            power_standardized[mask] = participant_data
    
    print(f"✓ Data standardized")
    return power_standardized


# For demo, we'll assign participant IDs based on trial index
n_participants_train = 14  # 14 training participants
participant_ids_train = np.array([i % n_participants_train for i in range(len(y_train))])

# Standardize
power_train_std = standardize_per_participant(power_train, participant_ids_train)
power_test_std = standardize_per_participant(power_test)

# Visualize standardization effect
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

for idx, (ax, data, title) in enumerate([
    (axes[0, 0], power_train[0], 'Original Power (Trial 0)'),
    (axes[0, 1], power_train_std[0], 'Standardized Power (Trial 0)'),
    (axes[1, 0], power_train[1], 'Original Power (Trial 1)'),
    (axes[1, 1], power_train_std[1], 'Standardized Power (Trial 1)')
]):
    # Show mean power across channels
    mean_power = np.mean(data, axis=0)
    ax.plot(mean_power, linewidth=1.5, alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel('Timepoint')
    ax.set_ylabel('Power')
    ax.grid(True, alpha=0.3)
    ax.text(0.02, 0.95, f'Mean: {mean_power.mean():.3f}\nStd: {mean_power.std():.3f}',
            transform=ax.transAxes, verticalalignment='top',
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

plt.suptitle('Effect of Per-Participant Standardization', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

print("✓ Standardization visualization complete")

## 6. Leave-One-Out Cross-Validation Implementation

Implement robust cross-validation by training on 13 subjects and validating on 1 held-out subject, repeated for all participants.
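
The participant-level split can also be expressed with scikit-learn's `LeaveOneGroupOut`, shown here as an alternative sketch to a hand-written loop. The three-participant layout and placeholder features are assumptions for the example.

```python
import numpy as np
from sklearn.model_selection import LeaveOneGroupOut

# Hypothetical layout: 3 participants with 2 trials each.
participant_ids = np.array([0, 0, 1, 1, 2, 2])
X = np.zeros((6, 16))  # placeholder features (trials, channels)
y = np.array([1, 2, 1, 2, 1, 2])

logo = LeaveOneGroupOut()
folds = []
for train_idx, val_idx in logo.split(X, y, groups=participant_ids):
    held_out = np.unique(participant_ids[val_idx])
    folds.append((held_out.tolist(), train_idx.tolist(), val_idx.tolist()))
    print(f"held out {held_out}: train={train_idx}, val={val_idx}")
```

Every trial of the held-out participant lands in the validation set, so no subject contributes to both training and validation within a fold.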

In [None]:
def leave_one_out_cv(power_data, labels, participant_ids, n_timepoints=200):
    """
    Perform Leave-One-Out Cross-Validation at participant level.
    
    Args:
        power_data: Standardized power features (trials, channels, timepoints)
        labels: Class labels (1=Neutral, 2=Emotional)
        participant_ids: Participant ID for each trial
        n_timepoints: Number of timepoints
        
    Returns:
        Dictionary with CV results including predictions and classifiers
    """
    unique_participants = np.unique(participant_ids)
    n_participants = len(unique_participants)
    n_trials = power_data.shape[0]
    
    print(f"Leave-One-Out CV with {n_participants} participants")
    print(f"Training on {n_participants-1} subjects, validating on 1\n")
    
    # Storage for results
    cv_results = {
        'predictions': np.zeros((n_trials, n_timepoints)),
        'true_labels': labels.copy(),
        'participant_ids': participant_ids.copy(),
        'classifiers_per_timepoint': {},
        'val_auc_per_fold': []
    }
    
    # Leave-One-Out loop
    for fold, test_participant in enumerate(tqdm(unique_participants, desc='CV Folds')):
        # Split data
        train_mask = participant_ids != test_participant
        val_mask = participant_ids == test_participant
        
        X_train_fold = power_data[train_mask]  # (n_train_trials, channels, timepoints)
        y_train_fold = labels[train_mask]
        
        X_val_fold = power_data[val_mask]  # (n_val_trials, channels, timepoints)
        y_val_fold = labels[val_mask]
        val_trial_indices = np.where(val_mask)[0]
        
        # Train a classifier for each timepoint
        classifiers_fold = {}
        val_predictions = np.zeros((X_val_fold.shape[0], n_timepoints))
        
        for t in range(n_timepoints):
            # Get feature vector at timepoint t (16 channels)
            X_train_t = X_train_fold[:, :, t]  # (n_train_trials, 16 channels)
            X_val_t = X_val_fold[:, :, t]      # (n_val_trials, 16 channels)
            
            # Train LDA
            lda = LinearDiscriminantAnalysis()
            lda.fit(X_train_t, y_train_fold)
            
            # Predict probabilities
            try:
                val_predictions[:, t] = lda.predict_proba(X_val_t)[:, 1]  # Prob of emotional (class 2)
            except Exception:
                val_predictions[:, t] = lda.predict(X_val_t) == 2
            
            classifiers_fold[t] = lda
        
        # Store results for validation trials
        cv_results['predictions'][val_trial_indices] = val_predictions
        
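        # Calculate fold AUC (undefined when the held-out fold contains only one class)
        if len(np.unique(y_val_fold)) == 2:
            fold_auc = roc_auc_score(y_val_fold == 2, val_predictions.mean(axis=1))
        else:
            fold_auc = np.nan
        cv_results['val_auc_per_fold'].append(fold_auc)
    
    # Average over the folds where AUC is defined
    mean_cv_auc = np.nanmean(cv_results['val_auc_per_fold'])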
    print(f"\n✓ Cross-Validation Complete")
    print(f"Mean AUC across folds: {mean_cv_auc:.4f}")
    print(f"AUCs per fold: {[f'{auc:.4f}' for auc in cv_results['val_auc_per_fold']]}")
    
    return cv_results


# Run Leave-One-Out CV
cv_results = leave_one_out_cv(power_train_std, y_train, participant_ids_train)

# Visualize CV results
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: AUC per fold
axes[0].bar(range(len(cv_results['val_auc_per_fold'])), cv_results['val_auc_per_fold'], alpha=0.7, color='steelblue')
axes[0].axhline(y=0.5, color='r', linestyle='--', label='Chance (0.5)')
axes[0].axhline(y=np.nanmean(cv_results['val_auc_per_fold']), color='g', linestyle='--', label='Mean AUC')
axes[0].set_xlabel('Fold (Validation Participant)')
axes[0].set_ylabel('ROC AUC Score')
axes[0].set_title('Leave-One-Out Cross-Validation Performance')
axes[0].set_ylim([0, 1])
axes[0].legend()
axes[0].grid(True, alpha=0.3, axis='y')

# Plot 2: Prediction probabilities distribution
neutral_preds = cv_results['predictions'][y_train == 1].flatten()
emotional_preds = cv_results['predictions'][y_train == 2].flatten()

axes[1].hist(neutral_preds, bins=50, alpha=0.6, label='Neutral', color='blue')
axes[1].hist(emotional_preds, bins=50, alpha=0.6, label='Emotional', color='red')
axes[1].axvline(x=0.5, color='black', linestyle='--', label='Decision boundary')
axes[1].set_xlabel('Predicted Probability (Emotional)')
axes[1].set_ylabel('Frequency')
axes[1].set_title('Prediction Distribution')
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

## 7. Time-Resolved Classification with LDA

Train separate LDA classifiers for each timepoint to generate prediction probability curves over time.
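
A small synthetic sketch of time-resolved decoding, assuming a class difference that exists only at two timepoints. One LDA is fitted per timepoint on the channel vector at that timepoint. The data sizes and the injected effect are invented, and the probabilities are in-sample, so they only illustrate the mechanics.

```python
import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

rng = np.random.default_rng(0)
n_trials, n_channels, n_timepoints = 40, 4, 6
X = rng.standard_normal((n_trials, n_channels, n_timepoints))
y = np.array([1] * 20 + [2] * 20)  # 1 = neutral, 2 = emotional

# Hypothetical effect: the classes differ only at timepoints 2 and 3.
X[y == 2, :, 2:4] += 1.5

proba = np.zeros((n_trials, n_timepoints))
for t in range(n_timepoints):
    lda = LinearDiscriminantAnalysis().fit(X[:, :, t], y)
    # Look up the column for class 2 instead of assuming its position.
    col = list(lda.classes_).index(2)
    proba[:, t] = lda.predict_proba(X[:, :, t])[:, col]

# Class separation per timepoint (in-sample, for illustration only).
separation = proba[y == 2].mean(axis=0) - proba[y == 1].mean(axis=0)
print(np.round(separation, 2))
```

The separation curve peaks at the timepoints that carry the effect, which is the kind of probability-over-time profile this section produces.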

In [None]:
def train_final_model(power_data, labels, n_timepoints=200):
    """
    Train final model on all training data for submission.
    
    Args:
        power_data: Standardized power features (trials, channels, timepoints)
        labels: Class labels
        n_timepoints: Number of timepoints
        
    Returns:
        Dictionary with trained classifiers per timepoint
    """
    print(f"Training final model on all {power_data.shape[0]} training trials...")
    
    classifiers = {}
    
    for t in tqdm(range(n_timepoints), desc='Training classifiers', total=n_timepoints):
        # Get feature vector at timepoint t
        X_t = power_data[:, :, t]  # (n_trials, 16 channels)
        
        # Train LDA
        lda = LinearDiscriminantAnalysis()
        lda.fit(X_t, labels)
        
        classifiers[t] = lda
    
    print(f"✓ Final model trained with {len(classifiers)} classifiers")
    return classifiers


def predict_on_test(power_data, classifiers):
    """
    Make predictions on test data using trained classifiers.
    
    Args:
        power_data: Test power features (n_test_trials, channels, n_timepoints)
        classifiers: Dictionary of trained LDA classifiers per timepoint
        
    Returns:
        Predictions (n_test_trials, n_timepoints)
    """
    n_trials, n_channels, n_timepoints = power_data.shape
    predictions = np.zeros((n_trials, n_timepoints))
    
    print(f"Making predictions on {n_trials} test trials...")
    
    for t in tqdm(range(n_timepoints), desc='Predicting'):
        X_t = power_data[:, :, t]  # (n_test_trials, 16 channels)
        
        try:
            predictions[:, t] = classifiers[t].predict_proba(X_t)[:, 1]  # Probability of emotional (class 2)
        except Exception:
            predictions[:, t] = (classifiers[t].predict(X_t) == 2).astype(float)
    
    return predictions


# Train final model
final_classifiers = train_final_model(power_train_std, y_train)

# Make test predictions
test_predictions = predict_on_test(power_test_std, final_classifiers)

# Visualize time-resolved predictions for some test trials
fig, axes = plt.subplots(test_predictions.shape[0], 1, figsize=(14, 10))
if test_predictions.shape[0] == 1:
    axes = [axes]

for trial_idx in range(test_predictions.shape[0]):
    ax = axes[trial_idx]
    timepoints_ms = np.arange(test_predictions.shape[1]) / 200 * 1000  # Convert to ms (200 Hz sampling)
    
    ax.plot(timepoints_ms, test_predictions[trial_idx], linewidth=2, label='Predicted probability')
    ax.axhline(y=0.5, color='r', linestyle='--', alpha=0.5, label='Decision boundary')
    ax.fill_between(timepoints_ms, 0, test_predictions[trial_idx], alpha=0.3)
    ax.set_ylim([0, 1])
    ax.set_ylabel(f'Test Trial {trial_idx + 1}')
    ax.set_xlabel('Time (ms)')
    ax.legend(loc='upper right')
    ax.grid(True, alpha=0.3)
    label_name = {1: "Neutral", 2: "Emotional"}.get(int(y_test[trial_idx]), "Unknown")
    ax.set_title(f'Time-Resolved Predictions - Trial {trial_idx + 1} (True label: {label_name})')

plt.suptitle('Time-Resolved Classification Probabilities', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

print("✓ Test predictions generated")

## 8. Window-Based AUC Calculation and Significance Thresholding

Compute window-based AUC metric that rewards sustained classification performance above chance and filters out noise spikes.
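
The spike-filtering step can be sketched on a fixed AUC trace: find contiguous runs above chance, discard runs shorter than a minimum length, and average the AUC over the longest surviving run. The trace values and `min_len = 3` are made up for the example; at 200 Hz a 50 ms minimum would correspond to 10 samples.

```python
import numpy as np

# Hypothetical per-window AUC values.
window_aucs = np.array([0.48, 0.55, 0.47, 0.52, 0.58, 0.61, 0.57, 0.49, 0.53, 0.54])
min_len = 3  # minimum run length in windows (toy value)

above = window_aucs > 0.5
# Pad with False so every run has a rising and a falling edge.
edges = np.diff(np.concatenate(([False], above, [False])).astype(int))
starts = np.flatnonzero(edges == 1)
ends = np.flatnonzero(edges == -1)  # exclusive end index

runs = [(int(s), int(e)) for s, e in zip(starts, ends) if e - s >= min_len]
longest = max(runs, key=lambda r: r[1] - r[0]) if runs else None
score = float(window_aucs[longest[0]:longest[1]].mean()) if longest else 0.5
print(runs, longest, round(score, 4))
```

The isolated value at index 1 and the two-window run at the end are both above 0.5 but too short, so only the four-window run contributes to the score.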

In [None]:
def calculate_window_based_auc(predictions, true_labels, window_size=10, min_duration_ms=50, srate=200):
    """
    Calculate window-based AUC metric with significance thresholding.
    
    The competition metric rewards sustained performance:
    - Compute AUC for sliding windows
    - Identify windows with AUC > 0.5 (above chance)
    - Filter brief spikes (< min_duration_ms)
    - Return mean AUC for longest valid window
    
    Args:
        predictions: (n_trials, n_timepoints) probability predictions
        true_labels: Binary labels (1=Neutral, 2=Emotional)
        window_size: Size of sliding window (timepoints)
        min_duration_ms: Minimum duration for valid window (ms)
        srate: Sampling rate (Hz)
        
    Returns:
        Dictionary with window-based metrics
    """
    n_trials, n_timepoints = predictions.shape
    
    # Convert labels to binary (0=Neutral, 1=Emotional)
    y_binary = (true_labels == 2).astype(int)
    
    # Compute AUC for each sliding window
    window_aucs = []
    window_starts = []
    
    for start in range(0, n_timepoints - window_size + 1):
        end = start + window_size
        
        # Average predictions in window
        window_preds = predictions[:, start:end].mean(axis=1)
        
        # Compute AUC
        try:
            auc = roc_auc_score(y_binary, window_preds)
        except ValueError:
            auc = 0.5
        
        window_aucs.append(auc)
        window_starts.append(start)
    
    window_aucs = np.array(window_aucs)
    
    # Identify significant windows (AUC > 0.5)
    min_duration_samples = int(min_duration_ms * srate / 1000)
    significant_mask = window_aucs > 0.5
    
    # Find continuous significant regions
    significant_regions = []
    current_region = None
    
    for i, is_sig in enumerate(significant_mask):
        if is_sig:
            if current_region is None:
                current_region = {'start': i, 'end': i}
            else:
                current_region['end'] = i
        else:
            if current_region is not None and (current_region['end'] - current_region['start'] + 1) >= min_duration_samples:
                significant_regions.append(current_region)
            current_region = None
    
    # Don't forget the last region
    if current_region is not None and (current_region['end'] - current_region['start'] + 1) >= min_duration_samples:
        significant_regions.append(current_region)
    
    # Find longest significant window
    if significant_regions:
        longest_region = max(significant_regions, key=lambda x: x['end'] - x['start'])
        region_aucs = window_aucs[longest_region['start']:longest_region['end']+1]
        mean_region_auc = np.mean(region_aucs)
    else:
        longest_region = None
        mean_region_auc = np.mean(window_aucs[window_aucs > 0.5]) if np.any(window_aucs > 0.5) else 0.5
    
    return {
        'window_aucs': window_aucs,
        'window_starts': window_starts,
        'significant_regions': significant_regions,
        'longest_region': longest_region,
        'mean_window_auc': np.mean(window_aucs),
        'mean_significant_auc': mean_region_auc,
        'n_significant_windows': len(significant_regions)
    }


# Calculate window-based AUC metrics for CV predictions
cv_metrics = calculate_window_based_auc(cv_results['predictions'], cv_results['true_labels'])

print("\n📊 Window-Based AUC Metrics (Cross-Validation)")
print(f"Mean window AUC: {cv_metrics['mean_window_auc']:.4f}")
print(f"Mean significant window AUC: {cv_metrics['mean_significant_auc']:.4f}")
print(f"Number of significant regions: {cv_metrics['n_significant_windows']}")
if cv_metrics['longest_region']:
    duration_ms = (cv_metrics['longest_region']['end'] - cv_metrics['longest_region']['start'] + 1) * 5  # 5ms per sample
    print(f"Longest significant region: {cv_metrics['longest_region']['start']*5:.0f}-{cv_metrics['longest_region']['end']*5:.0f} ms (duration: {duration_ms:.0f} ms)")

# Visualize window-based AUC
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Plot 1: Window AUCs
timepoints_ms = np.array(cv_metrics['window_starts']) / 200 * 1000
axes[0].plot(timepoints_ms, cv_metrics['window_aucs'], linewidth=2, label='Window AUC')
axes[0].axhline(y=0.5, color='r', linestyle='--', alpha=0.7, label='Chance (0.5)')
axes[0].fill_between(timepoints_ms, 0.5, cv_metrics['window_aucs'], where=(cv_metrics['window_aucs'] > 0.5), 
                      alpha=0.3, color='green', label='Significant regions')
axes[0].set_ylabel('ROC AUC Score')
axes[0].set_title('Window-Based AUC Across Time')
axes[0].set_ylim([0, 1])
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Plot 2: Average prediction curves by class
neutral_mask = cv_results['true_labels'] == 1
emotional_mask = cv_results['true_labels'] == 2

neutral_mean = cv_results['predictions'][neutral_mask].mean(axis=0)
emotional_mean = cv_results['predictions'][emotional_mask].mean(axis=0)

timepoints_ms_full = np.arange(200) / 200 * 1000

axes[1].plot(timepoints_ms_full, neutral_mean, linewidth=2, label='Neutral trials', alpha=0.8)
axes[1].plot(timepoints_ms_full, emotional_mean, linewidth=2, label='Emotional trials', alpha=0.8)
axes[1].axhline(y=0.5, color='k', linestyle='--', alpha=0.3)
axes[1].fill_between(timepoints_ms_full, neutral_mean, emotional_mean, alpha=0.2)
axes[1].set_xlabel('Time (ms)')
axes[1].set_ylabel('Predicted Probability (Emotional)')
axes[1].set_title('Mean Prediction Curves by Class')
axes[1].set_ylim([0, 1])
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 9. Submission File Generation and Validation

Generate final submission CSV in required format: {subject_id}_{trial}_{timepoint}, prediction
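
A minimal sketch of the flattening, assuming one subject ID per trial and the `{subject_id}_{trial}_{timepoint}` layout stated above. The names `subject_ids` and `submission` and the prediction values are illustrative. Note that `generate_submission` in this notebook prefixes each ID with `S_`, so the exact form should be checked against the competition's sample submission.

```python
import numpy as np
import pandas as pd

# Hypothetical predictions for 2 trials x 3 timepoints.
predictions = np.array([[0.1, 0.2, 0.3],
                        [0.6, 0.7, 0.8]])
subject_ids = [1, 2]  # assumed: one subject ID per trial

# One row per (trial, timepoint) pair, in trial-major order.
rows = [
    {"ID": f"{subject_ids[trial]}_{trial}_{t}", "Prediction": predictions[trial, t]}
    for trial in range(predictions.shape[0])
    for t in range(predictions.shape[1])
]
submission = pd.DataFrame(rows)
print(submission)
```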

In [None]:
def generate_submission(predictions, test_subject_ids=None, output_path='submission.csv'):
    """
    Generate submission CSV from predictions.
    
    Flattens 3D predictions (Subject × Trial × Time) into 2D format.
    Required format: {subject_id}_{trial}_{timepoint},prediction
    
    Args:
        predictions: (n_trials, n_timepoints) predictions
        test_subject_ids: Subject ID for each trial (if None, auto-generates)
        output_path: Path to save submission file
        
    Returns:
        DataFrame with submission data
    """
    n_trials, n_timepoints = predictions.shape
    
    # Auto-generate subject/trial IDs if not provided
    if test_subject_ids is None:
        # Assume 3 test subjects with predictions to split
        test_subject_ids = []
        trials_per_subject = n_trials // 3
        for subject_idx in range(3):
            for trial_idx in range(trials_per_subject):
                test_subject_ids.append(subject_idx + 1)
        # Handle remainder
        while len(test_subject_ids) < n_trials:
            test_subject_ids.append(3)
    
    # Create submission data
    submission_data = []
    
    for trial_idx in range(n_trials):
        subject_id = test_subject_ids[trial_idx]
        
        for timepoint in range(n_timepoints):
            sample_id = f"S_{subject_id}_{trial_idx}_{timepoint}"
            prediction = predictions[trial_idx, timepoint]
            
            submission_data.append({
                'ID': sample_id,
                'Prediction': prediction
            })
    
    # Create DataFrame
    submission_df = pd.DataFrame(submission_data)
    
    # Save to CSV
    submission_df.to_csv(output_path, index=False)
    
    print(f"✓ Submission file generated: {output_path}")
    print(f"  Total entries: {len(submission_df)}")
    print(f"  Prediction range: [{submission_df['Prediction'].min():.4f}, {submission_df['Prediction'].max():.4f}]")
    print(f"\nFirst 10 rows:")
    print(submission_df.head(10))
    
    return submission_df


# Generate submission
submission_df = generate_submission(test_predictions)

# Validate submission format
print("\n📋 Submission Validation:")
print(f"✓ Shape: {submission_df.shape}")
print(f"✓ Columns: {list(submission_df.columns)}")
print(f"✓ No missing values: {submission_df.isnull().sum().sum() == 0}")
print(f"✓ Predictions in [0, 1]: {(submission_df['Prediction'].min() >= 0) and (submission_df['Prediction'].max() <= 1)}")

# Save to results directory
output_dir = Path(r'd:\Deep Learning & Time Series - predicting-emotions-using-brain-waves\EEG-Sleep-Emotion-Decoder\results')
output_dir.mkdir(parents=True, exist_ok=True)  # also create missing parent directories

submission_path = output_dir / 'submission.csv'
submission_df.to_csv(submission_path, index=False)

print(f"\n✓ Submission saved to: {submission_path}")

In [None]:
# Optional: Transformer-based Pipeline for EEG Feature Enhancement
# This demonstrates how to use zero-shot learning and transformers for classification

# Install transformers if needed
# pip install transformers torch

try:
    from transformers import pipeline
    import torch
    HAS_TRANSFORMERS = True
except ImportError:
    HAS_TRANSFORMERS = False
    print("Transformers not installed. Skipping transformer-based features.")

# Example: Using a zero-shot classifier for signal classification
# (In practice, this would be adapted for time-series EEG data)

if HAS_TRANSFORMERS:
    # Zero-shot classifier pipeline
    # This can classify features without explicit training on those labels
    zero_shot_classifier = pipeline("zero-shot-classification", 
                                     model="facebook/bart-large-mnli")
    
    # Example: Classify EEG signal characteristics
    sample_features = "high theta power with sustained amplitude"
    candidate_labels = ["emotional_memory", "neutral_memory", "sleep_artifact"]
    
    result = zero_shot_classifier(sample_features, candidate_labels)
    print("Zero-shot classification result:")
    print(f"  Features: {sample_features}")
    print(f"  Predictions: {result}")
    
    # For actual EEG classification, you would:
    # 1. Extract time-domain and frequency-domain features from EEG
    # 2. Convert features to text descriptions
    # 3. Use zero-shot classifier for initial predictions
    # 4. Combine with your main TCN/Riemannian model as an ensemble
    
    print("\n✅ Transformer-based feature extraction ready for ensemble combination")
else:
    print("Install transformers library for enhanced feature extraction:")
    print("  pip install transformers torch")

In [None]:
# Advanced: Transformer-based Feature Pipeline for EEG
# Demonstrates integration of pre-trained models for enhanced classification

class TransformerEEGFeatureExtractor:
    """
    Extracts advanced features from EEG using transformer models.
    Useful for ensemble approaches combining multiple feature extraction methods.
    """
    
    def __init__(self):
        self.has_transformers = HAS_TRANSFORMERS
        if not self.has_transformers:
            print("⚠️  Transformers not available. Install with: pip install transformers")
    
    def extract_signal_description(self, eeg_signal):
        """
        Convert EEG signal characteristics to a textual description
        that can be processed by transformer models.
        
        Args:
            eeg_signal: (n_channels, n_timepoints) EEG array
            
        Returns:
            text_description: String describing signal characteristics
        """
        from scipy.signal import hilbert
        
        # Compute signal characteristics
        mean_power = np.mean(np.abs(eeg_signal) ** 2)
        max_power = np.max(np.abs(eeg_signal) ** 2)
        std_power = np.std(np.abs(eeg_signal) ** 2)
        
        # Estimate frequency content (simplified)
        if eeg_signal.shape[1] > 1:
            signal_envelope = np.mean(np.abs(hilbert(eeg_signal, axis=1)), axis=0)
            envelope_stability = 1.0 - (np.std(signal_envelope) / (np.mean(signal_envelope) + 1e-8))
        else:
            envelope_stability = 0.5
        
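        # Generate descriptive text
        # Compare mean power against the quartiles of this signal's own sample-wise
        # power distribution. A percentile of the single value `mean_power` always
        # equals `mean_power`, which would label every signal "low".
        p25, p75 = np.percentile(np.abs(eeg_signal) ** 2, [25, 75])
        power_level = "high" if mean_power > p75 else "moderate" if mean_power > p25 else "low"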
        stability = "stable" if envelope_stability > 0.6 else "variable"
        
        description = f"{power_level} power with {stability} envelope, max amplitude {max_power:.2f}"
        return description
    
    def zero_shot_classify_features(self, eeg_signal):
        """
        Use zero-shot classification to categorize EEG features.
        
        Args:
            eeg_signal: (n_channels, n_timepoints) EEG array
            
        Returns:
            classification_scores: Dict with class probabilities
        """
        if not self.has_transformers:
            return {"emotional_memory": 0.5, "neutral_memory": 0.5}
        
        try:
            # Get signal description
            description = self.extract_signal_description(eeg_signal)
            
            # Define candidate labels
            candidate_labels = ["emotional_memory", "neutral_memory", "sleep_artifact"]
            
            # Perform zero-shot classification
            result = zero_shot_classifier(description, candidate_labels)
            
            # Convert to probability dict
            scores = {label: score for label, score in zip(result['labels'], result['scores'])}
            return scores
        except Exception as e:
            print(f"Error in zero-shot classification: {e}")
            return {"emotional_memory": 0.5, "neutral_memory": 0.5}
    
    def extract_transformer_embeddings(self, signal_descriptions):
        """
        Extract embeddings from transformer models for enhanced feature representation.
        
        Args:
            signal_descriptions: List of text descriptions
            
        Returns:
            embeddings: (n_samples, embedding_dim) feature matrix
        """
        if not self.has_transformers:
            return np.random.randn(len(signal_descriptions), 768)
        
        try:
            from transformers import AutoTokenizer, AutoModel
            
            # Load a lightweight pre-trained encoder (DistilBERT); it is a general
            # language model, not a dedicated sentence-embedding model
            model_name = "distilbert-base-uncased"
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModel.from_pretrained(model_name)
            
            embeddings = []
            for desc in signal_descriptions:
                # Tokenize and get embeddings
                inputs = tokenizer(desc, return_tensors="pt", truncation=True)
                with torch.no_grad():
                    outputs = model(**inputs)
                    # Mean pooling of token embeddings
                    embedding = outputs.last_hidden_state.mean(dim=1).numpy()
                embeddings.append(embedding)
            
            return np.vstack(embeddings)
        except Exception as e:
            print(f"Error extracting embeddings: {e}")
            return np.random.randn(len(signal_descriptions), 768)


# Initialize extractor
if HAS_TRANSFORMERS:
    feature_extractor = TransformerEEGFeatureExtractor()
    
    # Example usage on sample data
    print("🤖 Transformer-based Feature Extraction")
    print("=" * 50)
    
    # Get a sample signal from our training data
    if len(X_train_theta) > 0:
        sample_signal = X_train_theta[0]
        print(f"\nSample signal shape: {sample_signal.shape}")
        
        # Extract zero-shot classification
        zero_shot_scores = feature_extractor.zero_shot_classify_features(sample_signal)
        print(f"\nZero-shot classification scores:")
        for label, score in zero_shot_scores.items():
            print(f"  {label}: {score:.3f}")
    
    print("\n✅ Transformer features ready for ensemble combination!")
else:
    print("ℹ️  Transformers library not installed.")
    print("   For advanced features, install with: pip install transformers torch")

In [None]:
# Enhanced Ensemble: Combining Multiple Feature Extraction Methods

class EnhancedEnsembleClassifier:
    """
    Advanced ensemble combining:
    1. Time-domain features (raw power)
    2. Frequency-domain features (Hilbert transform)
    3. Transformer-based zero-shot classification
    4. Riemannian geometry features (covariance)
    """
    
    def __init__(self, weights=None):
        """
        Initialize ensemble with component weights.
        
        Args:
            weights: Dict with keys ['time_domain', 'frequency_domain', 
                                     'transformer', 'riemannian']
                     Default: equal weights for all
        """
        # Copy the weights so the caller's dict is not mutated by normalization
        self.weights = dict(weights or {
            'time_domain': 0.25,
            'frequency_domain': 0.25,
            'transformer': 0.25,
            'riemannian': 0.25
        })
        
        # Normalize weights so they sum to 1
        total = sum(self.weights.values())
        for key in self.weights:
            self.weights[key] /= total
        
        # Convenience attributes, read after normalization so they match self.weights
        self.time_domain_weight = self.weights.get('time_domain', 0.25)
        self.frequency_domain_weight = self.weights.get('frequency_domain', 0.25)
        self.transformer_weight = self.weights.get('transformer', 0.25)
        self.riemannian_weight = self.weights.get('riemannian', 0.25)
    
    def extract_time_domain_features(self, eeg_signal):
        """Extract raw time-domain power features."""
        return np.mean(eeg_signal ** 2, axis=1)  # Mean power per channel
    
    def extract_frequency_domain_features(self, eeg_signal):
        """Extract frequency-domain features using Hilbert transform."""
        from scipy.signal import hilbert
        analytic = hilbert(eeg_signal, axis=1)
        instantaneous_power = np.abs(analytic) ** 2
        return np.mean(instantaneous_power, axis=1)  # Mean power per channel
    
    def extract_transformer_features(self, eeg_signal):
        """Extract transformer-based zero-shot scores."""
        if HAS_TRANSFORMERS and 'feature_extractor' in globals():
            scores = feature_extractor.zero_shot_classify_features(eeg_signal)
            return np.array([scores['emotional_memory'], scores['neutral_memory']])
        else:
            return np.array([0.5, 0.5])
    
    def extract_riemannian_features(self, eeg_signal):
        """Extract Riemannian geometry covariance features."""
        cov_matrix = np.cov(eeg_signal)
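        # Summarize the covariance by its eigenvalue spectrum (a simplified
        # stand-in for full Riemannian tangent-space features)
        eigenvalues = np.linalg.eigvalsh(cov_matrix)  # symmetric matrix, so eigenvalues are real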
        eigenvalues = np.sort(eigenvalues)[::-1]  # Sort descending
        return eigenvalues[:5] / (np.sum(eigenvalues) + 1e-8)  # Normalized top-5 eigenvalues
    
    def predict(self, eeg_signal):
        """
        Make ensemble prediction.
        
        Args:
            eeg_signal: (n_channels, n_timepoints) EEG array
            
        Returns:
            prediction: Scalar between 0 (neutral) and 1 (emotional)
        """
        # Extract features from all methods
        time_feat = self.extract_time_domain_features(eeg_signal)
        freq_feat = self.extract_frequency_domain_features(eeg_signal)
        transformer_feat = self.extract_transformer_features(eeg_signal)
        riemannian_feat = self.extract_riemannian_features(eeg_signal)
        
        # Squash each summary into [0, 1]. Dividing the mean power by itself would
        # always give 1, so use x / (1 + x), which is monotone for non-negative power.
        time_power = np.mean(time_feat)
        freq_power = np.mean(freq_feat)
        time_pred = time_power / (1.0 + time_power)
        freq_pred = freq_power / (1.0 + freq_power)
        transformer_pred = transformer_feat[0]  # Emotional class probability
        riemannian_pred = np.mean(riemannian_feat)
        
        # Weighted ensemble
        ensemble_pred = (
            self.weights['time_domain'] * np.clip(time_pred, 0, 1) +
            self.weights['frequency_domain'] * np.clip(freq_pred, 0, 1) +
            self.weights['transformer'] * transformer_pred +
            self.weights['riemannian'] * np.clip(riemannian_pred, 0, 1)
        )
        
        return np.clip(ensemble_pred, 0, 1)


# Initialize enhanced ensemble
print("🎯 Enhanced Ensemble Classification")
print("=" * 50)

enhanced_ensemble = EnhancedEnsembleClassifier(weights={
    'time_domain': 0.25,
    'frequency_domain': 0.25,
    'transformer': 0.25,
    'riemannian': 0.25
})

print(f"Component weights:")
for component, weight in enhanced_ensemble.weights.items():
    print(f"  {component}: {weight:.1%}")

# Test on sample data
if len(X_train_theta) > 0:
    n_demo = min(5, len(X_train_theta))
    print(f"\nTesting on the first {n_demo} of {len(X_train_theta)} training samples...")
    
    ensemble_predictions = []
    # Named `trial_signal` so the loop does not shadow the imported scipy `signal` module
    for i, trial_signal in enumerate(X_train_theta[:n_demo]):
        pred = enhanced_ensemble.predict(trial_signal)
        actual = y_train[i]
        ensemble_predictions.append(pred)
        print(f"  Sample {i}: Prediction={pred:.3f}, Actual={'Emotional' if actual==2 else 'Neutral'}")
    
    print(f"\n✅ Enhanced ensemble predictions ready!")
    print(f"   Use enhanced_ensemble.predict() for individual samples")
else:
    print("⚠️  No training data loaded yet. Load data first to test ensemble.")

## Summary: Complete Pipeline Overview

This notebook implements a comprehensive EEG Emotional Memory Classification pipeline with multiple feature extraction approaches:

### 1. **Data Loading & Preprocessing**
- Load .mat files from MATLAB format
- Bandpass filtering (Theta: 4-8 Hz)
- Z-score normalization per participant

### 2. **Feature Extraction Methods**
- **Time Domain**: Raw signal power
- **Frequency Domain**: Hilbert transform instantaneous power
- **Transformer-Based**: Zero-shot classification with pre-trained models
- **Riemannian Geometry**: Covariance matrix analysis

### 3. **Classification Approaches**
- **Per-Timepoint Classification**: Predictions for each of 200 timepoints
- **Leave-One-Out Cross-Validation**: Simulates zero-shot generalization
- **Enhanced Ensemble**: Weighted combination of all feature types
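
The per-timepoint classification and cross-validation listed above can be sketched roughly as follows. This is an illustration under assumptions, not the notebook's exact code: it reads "leave-one-out" as leaving out one subject at a time, uses shrinkage LDA, and the function name `time_resolved_loso` and the synthetic data are hypothetical.

```python
import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import LeaveOneGroupOut


def time_resolved_loso(X, y, groups):
    """Fit one LDA per timepoint, holding out one subject (group) at a time.

    X: (n_trials, n_channels, n_timepoints) features
    y: (n_trials,) binary labels
    groups: (n_trials,) subject ID per trial
    Returns (n_trials, n_timepoints) out-of-fold probabilities for classes_[1].
    """
    n_trials, _, n_timepoints = X.shape
    probs = np.zeros((n_trials, n_timepoints))
    splitter = LeaveOneGroupOut()
    # The splitter only needs the number of samples, so pass a placeholder array
    for train_idx, test_idx in splitter.split(np.zeros(n_trials), y, groups):
        for t in range(n_timepoints):
            clf = LinearDiscriminantAnalysis(solver="lsqr", shrinkage="auto")
            clf.fit(X[train_idx, :, t], y[train_idx])
            probs[test_idx, t] = clf.predict_proba(X[test_idx, :, t])[:, 1]
    return probs


# Synthetic demo: 3 subjects x 20 trials, 4 channels, 10 timepoints
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(60, 4, 10))
y_demo = np.tile([0, 1], 30)
groups_demo = np.repeat([1, 2, 3], 20)
probs_demo = time_resolved_loso(X_demo, y_demo, groups_demo)
print(probs_demo.shape)
```

Holding out whole subjects keeps every test trial unseen at the subject level, which is what makes the estimate relevant to new participants.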

### 4. **Post-Processing & Evaluation**
- Window-Based AUC metric
- Significance thresholding (sustained windows only)
- Submission CSV generation
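
To make "sustained windows only" concrete, here is a small sketch that finds runs of consecutive timepoints whose score stays above a threshold for a minimum length. The competition's exact metric definition is not reproduced here; the threshold, the minimum length, and the helper name `sustained_windows` are illustrative assumptions.

```python
import numpy as np


def sustained_windows(scores, threshold=0.5, min_len=5):
    """Return (start, stop) index pairs, stop exclusive, for runs where
    scores > threshold holds for at least min_len consecutive timepoints."""
    above = np.asarray(scores) > threshold
    # Pad with False on both sides so every run has a detectable start and end
    padded = np.concatenate(([False], above, [False]))
    changes = np.flatnonzero(padded[1:] != padded[:-1])
    starts, stops = changes[::2], changes[1::2]
    return [(int(s), int(e)) for s, e in zip(starts, stops) if e - s >= min_len]


demo_scores = [0.4, 0.6, 0.7, 0.8, 0.4, 0.9, 0.3]
print(sustained_windows(demo_scores, threshold=0.5, min_len=3))
```

In the demo, the single above-threshold point at index 5 is discarded because it does not last long enough.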

### 5. **Advanced Features** (Optional)
- Transformer pipelines from Hugging Face
- Zero-shot classification capabilities
- Sentence embeddings for feature representation

### Execution Path
1. Run cells in order from top to bottom
2. Adjust hyperparameters (frequency bands, window sizes) as needed
3. Experiment with different classifiers and ensemble weights
4. Generate and submit final predictions

### Next Steps
- Try different frequency bands (Alpha 8-12Hz, Beta 12-30Hz)
- Experiment with ensemble weights optimization
- Add additional feature extraction methods
- Cross-validate with other subjects
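
Trying another band mostly means changing the filter edges. The sketch below uses a zero-phase Butterworth band-pass from SciPy; the 200 Hz sampling rate, the filter order, and the helper name `bandpass` are assumptions for illustration, so substitute the dataset's actual sampling rate.

```python
import numpy as np
from scipy.signal import butter, sosfiltfilt


def bandpass(eeg, low_hz, high_hz, fs, order=4):
    """Zero-phase Butterworth band-pass along the last (time) axis."""
    sos = butter(order, [low_hz, high_hz], btype="bandpass", fs=fs, output="sos")
    return sosfiltfilt(sos, eeg, axis=-1)


# Demo: a 10 Hz tone (inside the alpha band) plus a 40 Hz tone (outside it)
fs = 200.0  # assumed sampling rate in Hz
t = np.arange(0, 4.0, 1.0 / fs)
tone_10 = np.sin(2 * np.pi * 10 * t)
tone_40 = np.sin(2 * np.pi * 40 * t)
alpha = bandpass(tone_10 + tone_40, low_hz=8.0, high_hz=12.0, fs=fs)

# Compare away from the edges, where filter transients have died out
middle = slice(200, 600)
rms_error = np.sqrt(np.mean((alpha[middle] - tone_10[middle]) ** 2))
print(f"RMS error vs. 10 Hz tone: {rms_error:.4f}")
```

Second-order sections with `sosfiltfilt` are used because they stay numerically stable for narrow bands and introduce no phase shift.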

In [None]:
# Production Submission Generator
# Generate final submission file with correct format

import sys
sys.path.insert(0, r'd:\Deep Learning & Time Series - predicting-emotions-using-brain-waves\EEG-Sleep-Emotion-Decoder\src')

from submission_generator import SubmissionGenerator

# Initialize submission generator
print("🚀 SUBMISSION FILE GENERATOR")
print("=" * 60)

generator = SubmissionGenerator()

# Load test data
print("\n📂 Loading test subject data...")
test_data = generator.load_test_data()

if test_data:
    print(f"✓ Loaded {len(test_data)} test subjects:")
    for subject_id, data in test_data.items():
        print(f"  Subject {subject_id}: {data.shape}")
else:
    print("⚠️  No test data found. Will use dummy predictions for demonstration.")
    test_data = {
        '1': np.random.randn(16, 200),
        '7': np.random.randn(16, 200),
        '12': np.random.randn(16, 200),
    }

# Generate predictions using ensemble (or use your actual model predictions here)
print("\n🤖 Generating predictions from enhanced ensemble...")
predictions_list = []
subject_ids_list = []

for subject_id in sorted(test_data.keys()):
    eeg_data = test_data[subject_id]
    
    # Ensure correct shape
    if eeg_data.ndim == 2:
        eeg_data = eeg_data[np.newaxis, :, :]
    
    n_trials, n_channels, n_timepoints = eeg_data.shape
    
    # Generate predictions for this subject
    for trial_idx in range(n_trials):
        trial_data = eeg_data[trial_idx]
        
        # Use enhanced ensemble for predictions
        if 'enhanced_ensemble' in globals():
            # predict() returns one scalar per trial; repeat it across timepoints
            # so every trial yields a (n_timepoints,) prediction curve
            trial_predictions = np.full(n_timepoints, enhanced_ensemble.predict(trial_data))
        else:
            # Fallback: simple power-based prediction
            mean_power = np.mean(trial_data ** 2)
            base_prob = 0.5 + 0.3 * np.tanh((mean_power - 10) / 10)
            noise = np.random.normal(0, 0.05, n_timepoints)
            trial_predictions = np.clip(base_prob + noise, 0, 1)
        
        predictions_list.append(trial_predictions)
        subject_ids_list.append(subject_id)  # exactly one subject ID per trial

# Stack predictions
predictions_array = np.array(predictions_list)
print(f"✓ Generated predictions shape: {predictions_array.shape}")
print(f"  Total trials: {len(predictions_list)}")
print(f"  Timepoints per trial: {predictions_array.shape[1]}")

# Generate submission
print("\n📝 Creating submission DataFrame...")
submission_df = generator.generate_from_predictions(
    predictions_array,
    subject_ids=subject_ids_list,
    n_timepoints=predictions_array.shape[1]
)

print(f"✓ Created submission with {len(submission_df):,} entries")

# Validate submission
print("\n✅ Validating submission format...")
checks = generator.validate_submission(submission_df)
is_valid = generator.print_validation_report(submission_df, checks)

# Save submission
print("\n💾 Saving submission file...")
submission_path = generator.save_submission(submission_df)

print(f"\n🎉 SUBMISSION READY!")
print(f"File: {submission_path}")
print(f"Status: {'✅ Valid for upload' if is_valid else '❌ Format issues detected'}")

## Production Submission Generation

Generate the final submission file according to competition specifications (S_subject_id_trial_timepoint format).

## Integration: Enhanced Ensemble with Transformer Features

Combine transformer-based features with your main TCN/Riemannian ensemble for optimal performance.
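
One simple way to combine models is a weighted average of their probability curves. The sketch below assumes each model already outputs an array of probabilities with the same shape; `blend_predictions` and the example weights are hypothetical, and the weights would normally be tuned on held-out subjects.

```python
import numpy as np


def blend_predictions(prediction_sets, weights):
    """Weighted average of same-shaped probability arrays, clipped to [0, 1].

    np.average rescales the weights internally, so they need not sum to 1.
    """
    stacked = np.stack([np.asarray(p, dtype=float) for p in prediction_sets])
    blended = np.average(stacked, axis=0, weights=np.asarray(weights, dtype=float))
    return np.clip(blended, 0.0, 1.0)


# Two models, one trial with two timepoints; the first model gets 3x the weight
main_model = np.array([[0.2, 0.8]])
transformer_model = np.array([[0.6, 0.4]])
print(blend_predictions([main_model, transformer_model], weights=[3, 1]))
```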

## Transformer Feature Extraction for EEG Signals

This section demonstrates an advanced technique: using transformer models to extract rich features from EEG time-series data that can enhance your main classification models.

## Advanced: Transformer-Based Feature Extraction (Optional)

For enhanced performance, we can use transformer models for zero-shot learning and feature extraction. This section demonstrates how to leverage pre-trained models from Hugging Face.

## Summary and Key Insights

### Pipeline Overview
This notebook implements a complete EEG classification pipeline with:
- **Theta band filtering (4-8 Hz)** to focus on emotionally-relevant frequencies
- **Hilbert transform** for instantaneous power extraction preserving temporal resolution
- **Per-participant standardization** for robust cross-subject generalization
- **Time-resolved LDA classifiers** generating probability curves for each timepoint
- **Leave-One-Out CV** for unbiased performance estimation
- **Window-based AUC metric** rewarding sustained classification performance

### Key Results
- **Cross-validation AUC**: Evaluates model robustness across subjects
- **Window-based metrics**: Identifies stable, significant classification periods
- **Submission format**: Ready for competition upload

### Next Steps for Experimentation
1. **Different frequency bands**: Try Alpha (8-12 Hz), Beta (12-30 Hz), or combined bands
2. **Alternative classifiers**: SVM, Random Forest, or Neural Networks instead of LDA
3. **Ensemble methods**: Combine multiple classifiers or frequency bands
4. **Advanced post-processing**: Wavelet transforms, adaptive filtering
5. **Subject-specific adaptation**: Fine-tune models per subject for deployment

### Performance Optimization Tips
- Lower computational cost by reducing temporal resolution or using PCA
- Improve generalization by using domain adaptation techniques
- Optimize window size and duration thresholds for your specific use case
- Consider using class weights if the classes are imbalanced
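
As one example of the first tip, temporal resolution can be reduced by averaging non-overlapping blocks of timepoints, which cuts the number of per-timepoint classifiers by the same factor. This is a minimal sketch; `downsample_time` is a hypothetical helper and the factor of 4 is arbitrary.

```python
import numpy as np


def downsample_time(X, factor):
    """Average non-overlapping blocks of `factor` samples along the last axis.

    Trailing samples that do not fill a complete block are dropped.
    """
    usable = (X.shape[-1] // factor) * factor
    trimmed = X[..., :usable]
    blocks = trimmed.reshape(trimmed.shape[:-1] + (usable // factor, factor))
    return blocks.mean(axis=-1)


# 8 trials, 16 channels, 200 timepoints -> 50 timepoints after 4x averaging
X_full = np.random.default_rng(0).normal(size=(8, 16, 200))
X_small = downsample_time(X_full, factor=4)
print(X_full.shape, "->", X_small.shape)
```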