##  Load and Preprocess EEG Data

### 1. Load .mat files


In [1]:
import numpy as np
import scipy.io
from scipy import signal
import os
from pathlib import Path
import pandas as pd
import mne
from mne.preprocessing import ICA

def load_mat_files(directory):
    """Load all .mat files from a directory into a dictionary"""
    data_dict = {}
    if not os.path.exists(directory):
        print(f"Directory {directory} not found")
        return data_dict
        
    for file in os.listdir(directory):
        if file.endswith('.mat'):
            file_path = os.path.join(directory, file)
            try:
                # Load the .mat file
                mat_data = scipy.io.loadmat(file_path)
                # Get the variable name from the file name 
                var_name = file.split('.')[0]
                # Extract the EEG data using the variable name as key
                eeg_data = mat_data[var_name]
                # Store in dictionary with filename as key
                data_dict[file] = eeg_data
                print(f"Successfully loaded {file} with shape {eeg_data.shape}")
            except Exception as e:
                print(f"Error loading {file}: {str(e)}")
    return data_dict


## 2. Preprocess EEG Data
1. Bandpass filter (1-50 Hz)
2. ICA for artifact removal 
3. Epoching

In [3]:

def preprocess_eeg(raw_data, sfreq=128):
    """
    Preprocess single subject EEG data with:
    1. Bandpass filter (1-50 Hz)
    2. ICA for artifact removal
    3. Epoching
    """
    # Create MNE Raw object
    ch_names = [f'EEG{i+1}' for i in range(raw_data.shape[1])]
    ch_types = ['eeg'] * raw_data.shape[1]
    info = mne.create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types)
    raw = mne.io.RawArray(raw_data.T, info)
    
    try:
        # Bandpass filter (1-50 Hz)
        raw_filtered = raw.copy().filter(l_freq=1, h_freq=50)
        
        # ICA for artifact removal
        ica = ICA(
            n_components=min(raw_data.shape[1] - 1, 15),
            random_state=42,
            max_iter=800
        )
        ica.fit(raw_filtered)
        raw_clean = raw_filtered.copy()
        ica.apply(raw_clean)
        
        #Create epochs (2-second epochs with 50% overlap)
        events = mne.make_fixed_length_events(
            raw_clean,
            duration=2.0,
            overlap=1.0
        )
        
        epochs = mne.Epochs(
            raw_clean,
            events,
            tmin=0,
            tmax=2,
            baseline=None,
            preload=True
        )
        
        epoched_data = epochs.get_data()
        print(f"Successfully preprocessed data: {epoched_data.shape}")
        return epoched_data
        
    except Exception as e:
        print(f"Error during preprocessing: {str(e)}")
        return None


## 3. Feature Extraction 
Extract 3 features 
1. PSD 
(Delta (1-4 Hz)
Theta (4-8 Hz)
Alpha (8-13 Hz)
Beta (13-30 Hz)
Gamma (30-45 Hz))
2. Theta/Beta Ratio
3. Higuchi Fractal Dimension


In [5]:


def extract_features(epoched_data, sfreq=128):
    """
    Extract PSD, Theta/Beta Ratio, and Fractal Dimension features from epoched data
    
    Parameters:
    -----------
    epoched_data : numpy.ndarray
        Epoched data of shape (n_epochs, n_channels, n_timepoints)
    sfreq : float
        Sampling frequency
    
    Returns:
    --------
    list
        List of dictionaries containing features for all epochs
    """
    n_epochs, n_channels, n_timepoints = epoched_data.shape
    all_features = []
    
    # Process each epoch
    for epoch in range(n_epochs):
        epoch_features = {}
        
        # Process each channel
        for channel in range(n_channels):
            # Get channel data for current epoch
            data = epoched_data[epoch, channel]
            
            #Calculate PSD using Welch's method
            freqs, psd = signal.welch(data, fs=sfreq, nperseg=min(256, n_timepoints))
            
            # Calculate power in different frequency bands
            bands = {
                'delta': (1, 4),
                'theta': (4, 8),
                'alpha': (8, 13),
                'beta': (13, 30),
                'gamma': (30, 45)
            }
            
            for band_name, (fmin, fmax) in bands.items():
                # Find frequencies within band
                mask = (freqs >= fmin) & (freqs <= fmax)
                # Calculate average power in band
                power = np.mean(psd[mask])
                epoch_features[f'{band_name}_power_ch{channel}'] = power
            
            # Calculate Theta/Beta Ratio
            theta_mask = (freqs >= 4) & (freqs <= 8)
            beta_mask = (freqs >= 13) & (freqs <= 30)
            theta_power = np.mean(psd[theta_mask])
            beta_power = np.mean(psd[beta_mask])
            theta_beta_ratio = theta_power / beta_power if beta_power != 0 else 0
            epoch_features[f'theta_beta_ratio_ch{channel}'] = theta_beta_ratio
            
            # Calculate Higuchi Fractal Dimension
            def calculate_hfd(signal_data, kmax=10):
                N = len(signal_data)
                L = []
                x = []
                
                for k in range(1, kmax + 1):
                    Lk = 0
                    for m in range(k):
                        # Calculate length Lm(k)
                        Lmk = 0
                        for i in range(1, int((N - m) / k)):
                            Lmk += abs(signal_data[m + i * k] - signal_data[m + (i - 1) * k])
                        Lmk = Lmk * (N - 1) / (((N - m) / k) * k)
                        Lk += Lmk
                    L.append(np.log(Lk / k))
                    x.append(np.log(1.0 / k))
                
                # Fit line and get slope
                p = np.polyfit(x, L, 1)
                return -p[0]  # Fractal dimension is the negative of the slope
            
            hfd = calculate_hfd(data)
            epoch_features[f'fractal_dim_ch{channel}'] = hfd
            
        all_features.append(epoch_features)
    
    return all_features


In [6]:

def main():
    adhd_part1_path = Path('ADHD_part1')
    adhd_part2_path = Path('ADHD_part2')
    control_part1_path = Path('Control_part1')
    control_part2_path = Path('Control_part2')
    
    # Load all data
    print("Loading ADHD group data...")
    adhd_data_part1 = load_mat_files(adhd_part1_path)
    adhd_data_part2 = load_mat_files(adhd_part2_path)

    print("\nLoading Control group data...")
    control_data_part1 = load_mat_files(control_part1_path)
    control_data_part2 = load_mat_files(control_part2_path)
    
    # Initialize feature storage
    processed_features = {
        'adhd': [],
        'control': []
    }
    
    # Process ADHD data
    print("\nProcessing ADHD data...")
    for data_dict in [adhd_data_part1, adhd_data_part2]:
        for file_name, eeg_data in data_dict.items():
            try:
                # First preprocess the data
                processed = preprocess_eeg(eeg_data)
                if processed is not None:
                    # Then extract features
                    features = extract_features(processed)
                    processed_features['adhd'].extend(features)
                    print(f"Successfully processed and extracted features from {file_name}")
            except Exception as e:
                print(f"Error processing {file_name}: {str(e)}")
    
    # Process Control data
    print("\nProcessing Control data...")
    for data_dict in [control_data_part1, control_data_part2]:
        for file_name, eeg_data in data_dict.items():
            try:
                # First preprocess the data
                processed = preprocess_eeg(eeg_data)
                if processed is not None:
                    # Then extract features
                    features = extract_features(processed)
                    processed_features['control'].extend(features)
                    print(f"Successfully processed and extracted features from {file_name}")
            except Exception as e:
                print(f"Error processing {file_name}: {str(e)}")
    
    # Convert to DataFrame
    adhd_df = pd.DataFrame(processed_features['adhd'])
    control_df = pd.DataFrame(processed_features['control'])
    
    # Add labels
    adhd_df['label'] = 1
    control_df['label'] = 0
    
    # Combine all features
    all_features = pd.concat([adhd_df, control_df], axis=0)
    all_features = all_features.reset_index(drop=True)
    
    # Save features
    all_features.to_csv('eeg_features.csv', index=False)
    
    print("\nFeature extraction complete!")
    print(f"Total samples: {len(all_features)}")
    print(f"Number of features: {len(all_features.columns) - 1}")  # Excluding label column
    print("Features saved to 'eeg_features.csv'")
    
    return all_features

if __name__ == "__main__":
    features_df = main()

Loading ADHD group data...
Directory ADHD_part1 not found
Directory ADHD_part2 not found

Loading Control group data...
Directory Control_part1 not found
Directory Control_part2 not found

Processing ADHD data...

Processing Control data...

Feature extraction complete!
Total samples: 0
Number of features: 0
Features saved to 'eeg_features.csv'


## 4. SVM Classifier 
1. ~92.5% accuracy 
2. Evaluatio?n: cross valication, confusion matrix, performance metrics.

In [12]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

def train_svm_classifier():
    """
    Train SVM classifier on the EEG features
    """
    # Load the features
    print("Loading features from eeg_features.csv...")
    features_df = pd.read_csv('eeg_features.csv')
    
    # Separate features and labels
    X = features_df.drop('label', axis=1)
    y = features_df['label']
    
    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    # Scale the features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Initialize and train SVM
    print("\nTraining SVM classifier...")
    svm = SVC(kernel='rbf', random_state=42)
    svm.fit(X_train_scaled, y_train)
    
    # Evaluate using cross-validation
    print("\nPerforming cross-validation...")
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = cross_val_score(svm, X_train_scaled, y_train, cv=cv)
    print(f"Cross-validation scores: {cv_scores}")
    print(f"Average CV accuracy: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
    
    # Evaluate on test set
    print("\nEvaluating on test set...")
    y_pred = svm.predict(X_test_scaled)
    
    # Print classification report
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, target_names=['Control', 'ADHD']))
    
    # Create confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    
    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Control', 'ADHD'],
                yticklabels=['Control', 'ADHD'])
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()
    
    # Feature importance analysis
    print("\nAnalyzing feature importance...")
    if svm.kernel == 'linear':
        # For linear kernel, we can directly get feature importance
        importance = np.abs(svm.coef_[0])
        feature_importance = pd.DataFrame({
            'feature': X.columns,
            'importance': importance
        })
        feature_importance = feature_importance.sort_values('importance', ascending=False)
        
        # Plot top 20 most important features
        plt.figure(figsize=(12, 6))
        sns.barplot(data=feature_importance.head(20), x='importance', y='feature')
        plt.title('Top 20 Most Important Features')
        plt.xlabel('Absolute Coefficient Value')
        plt.tight_layout()
        plt.show()
    
    return {
        'model': svm,
        'scaler': scaler,
        'cv_scores': cv_scores,
        'test_accuracy': svm.score(X_test_scaled, y_test),
        'classification_report': classification_report(y_test, y_pred),
        'confusion_matrix': cm
    }

def main():
    # Train and evaluate the classifier
    results = train_svm_classifier()
    
    # Print summary
    print("\nSummary:")
    print(f"Average CV accuracy: {results['cv_scores'].mean():.3f}")
    print(f"Test set accuracy: {results['test_accuracy']:.3f}")
    
    # Save the model and scaler
    import joblib
    joblib.dump(results['model'], 'svm_model.joblib')
    joblib.dump(results['scaler'], 'scaler.joblib')
    print("\nModel and scaler saved to 'svm_model.joblib' and 'scaler.joblib'")

if __name__ == "__main__":
    main()

Loading features from eeg_features.csv...


ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.