In [12]:
import numpy as np
import mne
from mne.decoding import CSP
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPClassifier  # Neural Network Classifier
from sklearn.model_selection import StratifiedKFold, cross_val_score, cross_val_predict, GridSearchCV
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from mne.filter import filter_data
import matplotlib.pyplot as plt
# Import the required Python libraries
import os
import re
from pathlib import Path
import pandas as pd
import numpy as np
import mne

In [13]:
# Define the path to your raw_data directory
data_dir = Path('../data/raw data')  # Adjust this path as needed

# Verify the directory exists
if not data_dir.exists():
    raise FileNotFoundError(f"The specified data directory does not exist: {data_dir}")

# Define the specific runs to process, including both two-digit and three-digit run numbers
desired_runs = ['04', '08', '12']

# Compile a regular expression pattern to match filenames like S001R04.edf, S002R08.edf, S001R012.edf, etc.
# This pattern ensures that only R04, R08, R12 are matched
edf_pattern = re.compile(r'^S\d{3}R0?(04|08|12)\.edf$', re.IGNORECASE)

In [14]:
# Initialize a list to store file information
file_info = []

# Counter for processed subject directories
processed_subjects = 0

# Iterate through each subject folder in raw_data
for subject_dir in data_dir.iterdir():
    if subject_dir.is_dir() and re.match(r'^S\d{3}$', subject_dir.name, re.IGNORECASE):
        processed_subjects += 1
        # Iterate through each file in the subject directory
        for file in subject_dir.iterdir():
            if file.is_file() and edf_pattern.match(file.name):
                # Derive the corresponding event file name
                base_name = file.stem  # e.g., S001R04
                event_file = f"{base_name}.edf.event"
                event_path = subject_dir / event_file
                
                # Check if the event file exists
                event_exists = event_path.exists()
                
                # Append the information to the list
                file_info.append({
                    'subject': subject_dir.name,
                    'edf_file': file.name,
                    'event_file': event_file,
                    'event_exists': event_exists,
                    'edf_path': str(file.resolve()),
                    'event_path': str(event_path.resolve()) if event_exists else None
                })

# Convert the list to a DataFrame for better visualization
df_files = pd.DataFrame(file_info)

# Display summary of processed subject directories
print(f"Total subject directories processed: {processed_subjects}")

Total subject directories processed: 109


In [15]:
# Total number of matched .edf files
total_edf = df_files.shape[0]

# Number of event files present
total_events = df_files['event_exists'].sum()

# Number of missing event files
missing_events = total_edf - total_events

print(f"Total matched .edf files: {total_edf}")
print(f"Number of corresponding event files found: {total_events}")
print(f"Number of missing event files: {missing_events}")

Total matched .edf files: 327
Number of corresponding event files found: 327
Number of missing event files: 0


In [16]:
# Filter the DataFrame for missing event files
df_missing_events = df_files[~df_files['event_exists']]

# Display the list of .edf files without corresponding event files
if not df_missing_events.empty:
    print("\nFiles missing corresponding event files:")
    display(df_missing_events[['subject', 'edf_file', 'event_file']])
else:
    print("\nAll matched .edf files have corresponding event files.")


All matched .edf files have corresponding event files.


In [17]:
# Display the entire DataFrame of matched files
print("List of All Matched .edf Files:")
display(df_files[['subject', 'edf_file', 'event_file', 'event_exists']])

List of All Matched .edf Files:


Unnamed: 0,subject,edf_file,event_file,event_exists
0,S038,S038R12.edf,S038R12.edf.event,True
1,S038,S038R04.edf,S038R04.edf.event,True
2,S038,S038R08.edf,S038R08.edf.event,True
3,S007,S007R08.edf,S007R08.edf.event,True
4,S007,S007R04.edf,S007R04.edf.event,True
...,...,...,...,...
322,S106,S106R04.edf,S106R04.edf.event,True
323,S106,S106R08.edf,S106R08.edf.event,True
324,S108,S108R04.edf,S108R04.edf.event,True
325,S108,S108R12.edf,S108R12.edf.event,True


In [32]:
# Initialize lists to collect data and labels from all files
X_all = []
y_all = []


In [33]:
# Assuming df_files is already created as per your initial code









# Define your desired event types
desired_event_types = ['Rest', 'Imagining Left Hand', 'Imagining Right Hand']

# Iterate through each matched EDF file
for idx, row in df_files.iterrows():
    edf_path = row['edf_path']
    event_exists = row['event_exists']
    subject = row['subject']
    edf_file = row['edf_file']
    
    print(f"\nProcessing file: {edf_file} for subject: {subject}")
    
    if not event_exists:
        print(f"Skipping {edf_file} as it lacks an event file.")
        continue
    
    try:
        # Load the raw EEG data

        raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)
        # Convert annotations to events and event_id
        events, event_id = mne.events_from_annotations(raw) # This line is missing in your code

        # Define event dictionary
        event_dict = {
            'Rest': 1,                # T0
            'Imagining Left Hand': 2,  # T1
            'Imagining Right Hand': 3  # T2
        }



        # Update event_id with descriptive labels
        event_id = {
            'Rest': 1,
            'Imagining Left Hand': 2,
            'Imagining Right Hand': 3
        }

        
        # Access annotations
        annotations = raw.annotations
        
        # Define epoch parameters
        tmin = -0.2  # 200 ms before the event
        tmax = 3.8   # 3800 ms after the event
        baseline = (None, 0)  # Baseline correction
        
        # Convert annotations to events
        events, _ = mne.events_from_annotations(raw, event_id=event_dict)
        
        if events.size == 0:
            print(f"No events found in {edf_file}. Skipping.")
            continue
        
        # Create epochs
        epochs = Epochs(raw, events, event_id=event_dict, tmin=tmin, tmax=tmax, baseline=baseline, preload=True, verbose=False)
        
        # Select only 'Imagining Left Hand' and 'Imagining Right Hand' epochs
        motor_epochs = epochs[['Imagining Left Hand', 'Imagining Right Hand']]
        
        if len(motor_epochs) == 0:
            print(f"No motor-related epochs found in {edf_file}. Skipping.")
            continue
        
        # Extract labels: 0 for Left, 1 for Right
        labels = motor_epochs.events[:, -1] - event_dict['Imagining Left Hand']
        
        # Get the data as a NumPy array: shape (n_epochs, n_channels, n_times)
        X = motor_epochs.get_data()
        y = labels
        
        # Append to the aggregate lists
        X_all.append(X)
        y_all.append(y)
        
        print(f"Loaded {X.shape[0]} epochs from {edf_file}.")
        
    except Exception as e:
        print(f"Failed to process {edf_file}. Error: {e}")


'''
# Define the path to the EDF file
edf_file = '../data/raw data/S001/S001R04.edf'

# Load the raw EEG data
raw = mne.io.read_raw_edf(edf_file, preload=True)

# Display basic information about the loaded data
print(raw.info)

# Plot raw EEG data interactively (optional)
# raw.plot(duration=30, n_channels=30, scalings='auto', title='Raw EEG Signals', show=True)

# Load the raw EEG data with annotations
raw = mne.io.read_raw_edf(edf_file, preload=True)

# Access annotations
annotations = raw.annotations

# Display annotations
print(annotations)

# Define event dictionary
event_dict = {
    'Rest': 1,                # T0
    'Imagining Left Hand': 2,  # T1
    'Imagining Right Hand': 3  # T2
}

# Convert annotations to events and event_id
events, event_id = mne.events_from_annotations(raw)

# Update event_id with descriptive labels
event_id = {
    'Rest': 1,
    'Imagining Left Hand': 2,
    'Imagining Right Hand': 3
}

print(event_id)

from mne import Epochs

# Define epoch parameters
tmin = -0.2  # 200 ms before the event
tmax = 3.8   # 3800 ms after the event
baseline = (None, 0)  # Baseline correction

# Create epochs
epochs = Epochs(raw, events, event_id=event_id, tmin=tmin, tmax=tmax, baseline=baseline, preload=True)

# Display epoch information
print(epochs)

# Plot an average epoch for 'Imagining Left Hand' (optional)
# fig = epochs['Imagining Left Hand'].average().plot()
# fig.suptitle('Imagining Left Hand')
# plt.show()

# Plot an average epoch for 'Imagining Right Hand' (optional)
# fig = epochs['Imagining Right Hand'].average().plot()
# fig.suptitle('Imagining Right Hand')
# plt.show()

# Plot an average epoch for 'Rest' (optional)
# fig = epochs['Rest'].average().plot()
# fig.suptitle('Rest')
# plt.show()

# Select only 'Imagining Left Hand' and 'Imagining Right Hand' epochs
motor_epochs = epochs[['Imagining Left Hand', 'Imagining Right Hand']]

# Extract labels: 0 for Left, 1 for Right
labels = motor_epochs.events[:, -1] - 2  # 'Imagining Left Hand' = 2, 'Imagining Right Hand' = 3

# Verify the distribution of classes
print("Class distribution:", np.bincount(labels))

# Get the data as a NumPy array: shape (n_epochs, n_channels, n_times)
X = motor_epochs.get_data()
y = labels

print(f"Shape of X: {X.shape}")
print(f"Shape of y: {y.shape}")
'''


Processing file: S038R12.edf for subject: S038
Used Annotations descriptions: [np.str_('T0'), np.str_('T1'), np.str_('T2')]
Failed to process S038R12.edf. Error: Could not find any of the events you specified.

Processing file: S038R04.edf for subject: S038
Used Annotations descriptions: [np.str_('T0'), np.str_('T1'), np.str_('T2')]
Failed to process S038R04.edf. Error: Could not find any of the events you specified.

Processing file: S038R08.edf for subject: S038
Used Annotations descriptions: [np.str_('T0'), np.str_('T1'), np.str_('T2')]
Failed to process S038R08.edf. Error: Could not find any of the events you specified.

Processing file: S007R08.edf for subject: S007
Used Annotations descriptions: [np.str_('T0'), np.str_('T1'), np.str_('T2')]
Failed to process S007R08.edf. Error: Could not find any of the events you specified.

Processing file: S007R04.edf for subject: S007
Used Annotations descriptions: [np.str_('T0'), np.str_('T1'), np.str_('T2')]
Failed to process S007R04.edf. 

  raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)
  raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)
  raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)


'\n# Define the path to the EDF file\nedf_file = \'../data/raw data/S001/S001R04.edf\'\n\n# Load the raw EEG data\nraw = mne.io.read_raw_edf(edf_file, preload=True)\n\n# Display basic information about the loaded data\nprint(raw.info)\n\n# Plot raw EEG data interactively (optional)\n# raw.plot(duration=30, n_channels=30, scalings=\'auto\', title=\'Raw EEG Signals\', show=True)\n\n# Load the raw EEG data with annotations\nraw = mne.io.read_raw_edf(edf_file, preload=True)\n\n# Access annotations\nannotations = raw.annotations\n\n# Display annotations\nprint(annotations)\n\n# Define event dictionary\nevent_dict = {\n    \'Rest\': 1,                # T0\n    \'Imagining Left Hand\': 2,  # T1\n    \'Imagining Right Hand\': 3  # T2\n}\n\n# Convert annotations to events and event_id\nevents, event_id = mne.events_from_annotations(raw)\n\n# Update event_id with descriptive labels\nevent_id = {\n    \'Rest\': 1,\n    \'Imagining Left Hand\': 2,\n    \'Imagining Right Hand\': 3\n}\n\nprint(event

In [34]:
# Concatenate all epochs and labels
if X_all and y_all:
    X_all = np.concatenate(X_all, axis=0)
    y_all = np.concatenate(y_all, axis=0)
    
    print(f"\nTotal epochs collected: {X_all.shape[0]}")
    print(f"Shape of X_all: {X_all.shape}")
    print(f"Shape of y_all: {y_all.shape}")
else:
    raise ValueError("No epochs were loaded. Please check your EDF files and event annotations.")

ValueError: No epochs were loaded. Please check your EDF files and event annotations.

In [19]:
# Define frequency bands (in Hz)
freq_bands = [
    (8, 12),    # Alpha
    (13, 20),   # Beta 1
    (21, 30),   # Beta 2
    (31, 40),   # Low Gamma
    (41, 50),   # High Gamma
    (51, 60),   # Very High Gamma
    (61, 70),   # Ultra High Gamma
    (71, 79)    # Max Gamma
]

In [20]:
class FBCSP(BaseEstimator, TransformerMixin):
    def __init__(self, freq_bands, n_csp=4, sfreq=160):
        """
        Initialize the FBCSP transformer.
        
        Parameters:
        - freq_bands: List of tuples defining frequency bands.
        - n_csp: Number of CSP components per band.
        - sfreq: Sampling frequency of the EEG data.
        """
        self.freq_bands = freq_bands
        self.n_csp = n_csp
        self.sfreq = sfreq
        self.csp_list_ = []  # To store CSP models for each band

    def fit(self, X, y):
        """
        Fit CSP models for each frequency band.
        
        Parameters:
        - X: EEG data, shape (n_trials, n_channels, n_times)
        - y: Labels, shape (n_trials,)
        
        Returns:
        - self
        """
        self.csp_list_ = []  # Reset in case of multiple fits
        for band in self.freq_bands:
            print(f'Fitting CSP for band: {band[0]}-{band[1]} Hz')
            # Band-pass filter the data for the current frequency band
            X_filtered = np.array([
                filter_data(trial, sfreq=self.sfreq, l_freq=band[0], h_freq=band[1], method='iir', verbose=False)
                for trial in X
            ])
            
            # Initialize and fit CSP
            csp = CSP(n_components=self.n_csp, reg=None, log=True, norm_trace=False)
            csp.fit(X_filtered, y)
            self.csp_list_.append(csp)
        return self

    def transform(self, X):
        """
        Transform EEG data using the fitted CSP models.
        
        Parameters:
        - X: EEG data, shape (n_trials, n_channels, n_times)
        
        Returns:
        - X_features: Extracted features, shape (n_trials, n_bands * n_csp)
        """
        features = []
        for idx, band in enumerate(self.freq_bands):
            print(f'Transforming data for band: {band[0]}-{band[1]} Hz')
            csp = self.csp_list_[idx]
            # Band-pass filter the data for the current frequency band
            X_filtered = np.array([
                filter_data(trial, sfreq=self.sfreq, l_freq=band[0], h_freq=band[1], method='iir', verbose=False)
                for trial in X
            ])
            
            # Apply CSP transformation
            X_csp = csp.transform(X_filtered)
            
            # Extract log-variance features
            X_logvar = np.log(np.var(X_csp, axis=1))
            
            # Reshape X_logvar to (n_trials, 1) to make it 2D
            X_logvar = X_logvar.reshape(-1, 1)
            
            # Append the reshaped feature
            features.append(X_logvar)
        
        # Concatenate features from all bands along axis=1
       
        X_features = np.column_stack(features)
        return X_features

In [10]:
# Initialize FBCSP transformer
fbcsp = FBCSP(freq_bands=freq_bands, n_csp=4, sfreq=160)  # Replace sfreq with your actual sampling frequency

# Initialize Neural Network classifier
mlp = MLPClassifier(hidden_layer_sizes=(100,), max_iter=500, random_state=42)

# Define the classification pipeline
pipeline = Pipeline([
    ('fbcsp', fbcsp),
    ('classifier', mlp)
])

In [11]:
# Define cross-validation strategy
n_splits = 5  # Adjust based on your dataset size
cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

# Perform cross-validation
print("Starting Cross-Validation...")
scores = cross_val_score(pipeline, X, y, cv=cv, scoring='accuracy')

print(f"\nCross-Validation Accuracy Scores: {scores}")
print(f"Mean Accuracy: {scores.mean() * 100:.2f}%")
print(f"Standard Deviation: {scores.std() * 100:.2f}%")

Starting Cross-Validation...
Fitting CSP for band: 8-12 Hz
Computing rank from data with rank=None
    Using tolerance 8.6e-05 (2.2e-16 eps * 64 dim * 6.1e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting CSP for band: 13-20 Hz
Computing rank from data with rank=None
    Using tolerance 8.2e-05 (2.2e-16 eps * 64 dim * 5.8e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting CSP for band: 21-30 Hz
Computing rank from data with rank=None
    Using tolerance 5.5e-05 (2.2e-16 eps * 64 dim * 3.9e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 



Transforming data for band: 31-40 Hz
Transforming data for band: 41-50 Hz
Transforming data for band: 51-60 Hz
Transforming data for band: 61-70 Hz
Transforming data for band: 71-79 Hz
Fitting CSP for band: 8-12 Hz
Computing rank from data with rank=None
    Using tolerance 8.6e-05 (2.2e-16 eps * 64 dim * 6.1e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting CSP for band: 13-20 Hz
Computing rank from data with rank=None
    Using tolerance 8.1e-05 (2.2e-16 eps * 64 dim * 5.7e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting CSP for band: 21-30 Hz
Computing ran



Transforming data for band: 31-40 Hz
Transforming data for band: 41-50 Hz
Transforming data for band: 51-60 Hz
Transforming data for band: 61-70 Hz
Transforming data for band: 71-79 Hz
Fitting CSP for band: 8-12 Hz
Computing rank from data with rank=None
    Using tolerance 8.6e-05 (2.2e-16 eps * 64 dim * 6e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting CSP for band: 13-20 Hz
Computing rank from data with rank=None
    Using tolerance 7.9e-05 (2.2e-16 eps * 64 dim * 5.5e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting CSP for band: 21-30 Hz
Computing rank 



Transforming data for band: 21-30 Hz
Transforming data for band: 31-40 Hz
Transforming data for band: 41-50 Hz
Transforming data for band: 51-60 Hz
Transforming data for band: 61-70 Hz
Transforming data for band: 71-79 Hz
Fitting CSP for band: 8-12 Hz
Computing rank from data with rank=None
    Using tolerance 8.4e-05 (2.2e-16 eps * 64 dim * 5.9e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting CSP for band: 13-20 Hz
Computing rank from data with rank=None
    Using tolerance 8.1e-05 (2.2e-16 eps * 64 dim * 5.7e+09  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
Fitting

