<a href="https://colab.research.google.com/github/adyab1/Detection-of-Parkinson-s-Disease-Using-Short-Time-Fourier-Transform-and-Attention-Mechanism-Model/blob/main/notebooks/PD_EEG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


#Current




In [None]:
!pip install mne

Collecting mne
  Downloading mne-1.7.1-py3-none-any.whl.metadata (13 kB)
Downloading mne-1.7.1-py3-none-any.whl (7.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m36.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mne
Successfully installed mne-1.7.1


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import mne


# Load the sample dataset
sample_data_path = mne.datasets.sample.data_path()
raw = str(sample_data_path) + '/MEG/sample/sample_audvis_raw.fif'
raw = mne.io.read_raw_fif(raw, preload=True)

Using default location ~/mne_data for sample...


Downloading file 'MNE-sample-data-processed.tar.gz' from 'https://osf.io/86qa2/download?version=6' to '/root/mne_data'.

  0%|                                              | 0.00/1.65G [00:00<?, ?B/s][A
  0%|                                      | 312k/1.65G [00:00<08:51, 3.11MB/s][A
  0%|                                      | 814k/1.65G [00:00<07:30, 3.67MB/s][A
  0%|                                     | 1.33M/1.65G [00:00<06:44, 4.08MB/s][A
  0%|                                     | 1.86M/1.65G [00:00<06:52, 4.01MB/s][A
  0%|                                     | 2.38M/1.65G [00:00<07:12, 3.81MB/s][A
  0%|                                     | 2.90M/1.65G [00:00<06:59, 3.93MB/s][A
  0%|                                     | 3.42M/1.65G [00:00<06:47, 4.05MB/s][A
  0%|                                     | 3.95M/1.65G [00:00<06:34, 4.18MB/s][A
  0%|                                     | 4.46M/1.65G [00:01<06:22, 4.31MB/s][A
  0%|                                     | 4.99M

Attempting to create new mne-python configuration file:
/root/.mne/mne-python.json
Download complete in 08m29s (1576.2 MB)
Opening raw data file /root/mne_data/MNE-sample-data/MEG/sample/sample_audvis_raw.fif...
    Read a total of 3 projection items:
        PCA-v1 (1 x 102)  idle
        PCA-v2 (1 x 102)  idle
        PCA-v3 (1 x 102)  idle
    Range : 25800 ... 192599 =     42.956 ...   320.670 secs
Ready.
Reading 0 ... 166799  =      0.000 ...   277.714 secs...


In [None]:
# Pick EEG channels
raw.pick_types(meg=False, eeg=True, eog=False, stim=False)

# Filter the data
raw.filter(l_freq=0.5, h_freq=50)

# Choose an EEG channel to analyze
channel = 'EEG 001'
channel_idx = raw.ch_names.index(channel)

# Extract the raw data and sampling frequency
sfreq = int(raw.info['sfreq'])
data, times = raw.get_data(return_times=True)
eeg_data = data[channel_idx]


In [None]:
from scipy.signal import stft

# Define STFT parameters
window_size = 256
overlap = int(window_size * 0.5)

# Compute STFT
frequencies, time_points, Zxx = stft(eeg_data, fs=sfreq, nperseg=window_size, noverlap=overlap)

# Calculate power spectral density (PSD)
psd = np.abs(Zxx) ** 2

# Plot the time-frequency representation
plt.figure(figsize=(12, 6))
plt.pcolormesh(time_points, frequencies, 10 * np.log10(psd), shading='gouraud')
plt.xlabel('Time (s)')
plt.ylabel('Frequency (Hz)')
plt.title(f'Time-Frequency Representation of {channel} Channel')
plt.ylim([0, 50])
plt.colorbar().set_label('Power Spectral Density (dB)')
plt.show()


#Tests

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
!pip install mne


In [None]:
import os
import numpy as np
import pandas as pd
import mne
from scipy.signal import stft, get_window

In [None]:
def perform_stft(raw, window='hann', nperseg=256, noverlap=None):
    data = raw.get_data()
    sampling_rate = raw.info['sfreq']
    window_function = get_window(window, nperseg)

    freqs, times, stft_data = stft(data, fs=sampling_rate, window=window_function, nperseg=nperseg, noverlap=noverlap)

    return freqs, times, stft_data


In [None]:
def spectral_centroid(magnitude, freqs):
    normalized_magnitude = magnitude / np.sum(magnitude, axis=0)
    centroid = np.sum(freqs[:, np.newaxis] * normalized_magnitude, axis=0)
    return centroid

def spectral_bandwidth(magnitude, freqs):
    normalized_magnitude = magnitude / np.sum(magnitude, axis=0)
    centroid = spectral_centroid(magnitude, freqs)
    second_moment = np.sum((freqs[:, np.newaxis] - centroid) ** 2 * normalized_magnitude, axis=0)
    bandwidth = np.sqrt(second_moment)
    return bandwidth

def extract_features(magnitude, freqs):
    # Spectral peaks: spectral centroid (weighted average of frequencies)
    centroid = spectral_centroid(magnitude, freqs)

    # Energy distribution: spectral bandwidth (square root of the second central moment)
    bandwidth = spectral_bandwidth(magnitude, freqs)

    features = {'centroid': centroid, 'bandwidth': bandwidth}
    return features


In [None]:
import mne

def load_eeg_data(subject_folder):
    eeg_files = [file for file in os.listdir(subject_folder) if file.endswith(".bdf")]

    if len(eeg_files) == 0:
        return None

    eeg_file = os.path.join(subject_folder, eeg_files[0])
    channels_file = eeg_file.replace("_task-rest_eeg.bdf", "_task-rest_channels.tsv")

    raw = mne.io.read_raw_bdf(eeg_file, preload=True)
    channels_info = pd.read_csv(channels_file, delimiter='\t')

    channel_names = channels_info['name'].tolist()
    raw.rename_channels({old: new for old, new in zip(raw.ch_names, channel_names)})

    return raw



In [None]:
import matplotlib.pyplot as plt

features_all = []
data_path = "/content/gdrive/MyDrive/UCSD"
# Iterate through subjects
for subject_folder in os.listdir(data_path):
    if not subject_folder.startswith('sub-'):
        continue

    subject_path = os.path.join(data_path, subject_folder)

    # Iterate through sessions (hc, on, off)
    for session_folder in os.listdir(subject_path):
        session_path = os.path.join(subject_path, session_folder)
        eeg_folder = os.path.join(session_path, "eeg")

        # Load the EEG data
        raw = load_eeg_data(eeg_folder)

        if raw is None:
            print(f"No EEG data found for {subject_folder}, {session_folder}")
            continue

        # Perform STFT
        freqs, times, stft_data = perform_stft(raw)

        # Compute the magnitude
        magnitude = np.abs(stft_data)

        # Extract features
        features = extract_features(magnitude, freqs)

        features_all.append((subject_folder, session_folder, features))


In [None]:
subject = 'sub-hc8'
session = 'ses-hc'

# Get the features for the selected subject and session
features = next((f for (s, ss, f) in features_all if s == subject and ss == session), None)

if features is not None:
    # Plot spectral centroid
    plt.figure(figsize=(12, 4))
    plt.imshow(features['centroid'], aspect='auto', origin='lower', extent=[times[0], times[-1], 0, len(raw.ch_names)], cmap='viridis')
    plt.colorbar(label='Frequency (Hz)')
    plt.title(f'Spectral Centroid for {subject}, {session}')
    plt.xlabel('Time (s)')
    plt.ylabel('Channels')
    plt.show()
else:
    print(f"Features not found for subject {subject} and session {session}")


In [None]:
!pip install tensorflow


In [None]:
!pip install tensorflow
!pip install scikit-learn
!pip install Keras-Preprocessing

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Bidirectional, LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical


In [None]:
def label_function(subject, session):
    if subject.startswith('sub-hc'):
        return 0
    elif session == 'ses-on':
        return 1
    else:
        return 2

# Prepare the data
X_data = [np.column_stack((features['centroid'], features['bandwidth'])) for subject, session, features in features_all]
y_data = [label_function(subject, session) for subject, session, features in features_all]

# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_data, y_data, test_size=0.2, random_state=42)

# Convert the labels to one-hot encoding
y_train = to_categorical(y_train, num_classes=3)
y_val = to_categorical(y_val, num_classes=3)


In [None]:
class AttentionLayer(Layer):
    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.W = self.add_weight(shape=(input_shape[-1], input_shape[-1]), initializer='random_normal', trainable=True, name='W')
        self.b = self.add_weight(shape=(input_shape[-1],), initializer='zeros', trainable=True, name='b')
        self.u = self.add_weight(shape=(input_shape[-1], 1), initializer='random_normal', trainable=True, name='u')
        super(AttentionLayer, self).build(input_shape)

    def call(self, inputs):
        q = tf.nn.tanh(tf.tensordot(inputs, self.W, axes=1) + self.b)
        a = tf.nn.softmax(tf.tensordot(q, self.u, axes=1), axis=1)
        return tf.reduce_sum(inputs * a, axis=1)

def create_model(input_shape):
    input_layer = Input(shape=input_shape)
    lstm_layer = Bidirectional(LSTM(128, return_sequences=True))(input_layer)
    attention_layer = AttentionLayer()(lstm_layer)
    output_layer = Dense(3, activation='softmax')(attention_layer)

    model = Model(inputs=input_layer, outputs=output_layer)
    return model


In [None]:
def pad_sequences_2d(sequences, maxlen, num_features, dtype='float32', value=0.0):
    num_samples = len(sequences)
    padded_sequences = np.full((num_samples, maxlen, num_features), value, dtype=dtype)

    for idx, sequence in enumerate(sequences):
        if sequence.shape[0] > maxlen:
            sequence = sequence[:maxlen]
        padded_sequences[idx, :sequence.shape[0], :sequence.shape[1]] = sequence

    return padded_sequences


def scale_sequence(sequence, scalers):
    scaled_sequence = sequence.copy()
    for idx, feature_scaler in enumerate(scalers):
        if idx < sequence.shape[1]:
            scaled_sequence[:, idx] = feature_scaler.transform(sequence[:, idx].reshape(-1, 1)).reshape(-1)
    return scaled_sequence

# Find the maximum sequence length in the training data
max_length = max([len(sequence) for sequence in X_train])

# Fit the scaler
max_features = max([sequence.shape[1] for sequence in X_train])
scaler = [MinMaxScaler() for _ in range(max_features)]

for idx, sequence in enumerate(X_train):
    for feature_idx in range(max_features):
        if feature_idx < sequence.shape[1]:
            feature_data = sequence[:, feature_idx].reshape(-1, 1)
            scaler[feature_idx].partial_fit(feature_data)

# Scale the sequences
X_train_scaled = [scale_sequence(sequence, scaler) for sequence in X_train]
X_val_scaled = [scale_sequence(sequence, scaler) for sequence in X_val]

# Find the maximum number of features in all the data
max_features = max([sequence.shape[1] for sequence in X_train + X_val])

# Pad the sequences
X_train_padded = pad_sequences_2d(X_train_scaled, maxlen=max_length, dtype='float32', num_features=max_features)
X_val_padded = pad_sequences_2d(X_val_scaled, maxlen=max_length, dtype='float32', num_features=max_features)




In [None]:
# Create the model
model = create_model(X_train_padded[0].shape)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

print("X_train_padded shape:", X_train_padded.shape)
print("X_val_padded shape:", X_val_padded.shape)


# Train the model
batch_size = 2
history = model.fit(X_train_padded, y_train, validation_data=(X_val_padded, y_val), epochs=50, batch_size=batch_size)


In [None]:
!pip install matplotlib


In [None]:
import matplotlib.pyplot as plt

def plot_history(history):
    plt.figure(figsize=(12, 4))

    # Plot loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Loss')

    # Plot accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Accuracy')

    plt.show()

plot_history(history)


In [None]:
from sklearn.metrics import classification_report, confusion_matrix


In [None]:
y_val_pred = model.predict(X_val_padded)
y_val_pred_classes = np.argmax(y_val_pred, axis=1)
y_val_true_classes = np.argmax(y_val, axis=1)


In [None]:
print("Classification Report:")
print(classification_report(y_val_true_classes, y_val_pred_classes))


In [None]:
def specificity_score(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    tn, fp, fn, tp = cm.ravel()
    specificity = tn / (tn + fp)
    return specificity

specificity = specificity_score(y_val_true_classes, y_val_pred_classes)
print("Specificity:", specificity)


In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import plot_confusion_matrix

def plot_cm(y_true, y_pred, class_names):
    cm = confusion_matrix(y_true, y_pred)
    fig, ax = plt.subplots(figsize=(6, 6))
    im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=class_names, yticklabels=class_names,
           title="Confusion Matrix",
           ylabel='True label',
           xlabel='Predicted label')
    plt.show()

class_names = ['Healthy', 'On-Med', 'Off-Med']
plot_cm(y_val_true_classes, y_val_pred_classes, class_names)


In [None]:
!pip install tensorflow
!pip install scikit-learn
!pip install Keras-Preprocessing

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Bidirectional, LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

#New Stuff

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV

def label_function(subject, session):
    if subject.startswith('sub-hc'):
        return 0
    elif session == 'ses-on':
        return 1
    else:
        return 2

def pad_sequences_2d(sequences, maxlen, num_features, dtype='float32', value=0.0):
    num_samples = len(sequences)
    padded_sequences = np.full((num_samples, maxlen, num_features), value, dtype=dtype)

    for idx, sequence in enumerate(sequences):
        if sequence.shape[0] > maxlen:
            sequence = sequence[:maxlen]
        padded_sequences[idx, :sequence.shape[0], :sequence.shape[1]] = sequence

    return padded_sequences


def scale_sequence(sequence, scalers):
    scaled_sequence = sequence.copy()
    for idx, feature_scaler in enumerate(scalers):
        if idx < sequence.shape[1]:
            scaled_sequence[:, idx] = feature_scaler.transform(sequence[:, idx].reshape(-1, 1)).reshape(-1)
    return scaled_sequence


In [None]:

# Prepare the data
X_data = [np.column_stack((features['centroid'], features['bandwidth'])) for subject, session, features in features_all]
y_data = [label_function(subject, session) for subject, session, features in features_all]

# Split the data into training and validation sets
X_train_rf, X_val_rf, y_train_rf, y_val_rf = train_test_split(X_data, y_data, test_size=0.2, random_state=42)

# Pad the sequences
max_length_rf = max([len(sequence) for sequence in X_data])
max_features_rf = max([sequence.shape[1] for sequence in X_data])

X_train_rf_padded = pad_sequences_2d(X_train_rf, maxlen=max_length_rf, dtype='float32', num_features=max_features_rf)
X_val_rf_padded = pad_sequences_2d(X_val_rf, maxlen=max_length_rf, dtype='float32', num_features=max_features_rf)

# Flatten the data for RandomForest
X_train_rf_flat = np.array([x.flatten() for x in X_train_rf_padded])
X_val_rf_flat = np.array([x.flatten() for x in X_val_rf_padded])

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# Create a RandomForest Classifier
rf = RandomForestClassifier()

# Set the hyperparameters for grid search
param_grid = {
    'n_estimators': [10, 50, 100, 200],
    'max_depth': [None, 10, 20, 30, 50],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
}

# Perform grid search for hyperparameter tuning
grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train_rf_flat, y_train_rf)

# Get the best estimator
best_rf = grid_search.best_estimator_

# Check the performance on the validation set
y_val_rf_pred = best_rf.predict(X_val_rf_flat)
accuracy = np.sum(y_val_rf_pred == y_val_rf) / len(y_val_rf)
print("Random Forest Validation Accuracy:", accuracy)



In [None]:
def spectral_contrast(magnitude, freqs, n_bands=6):
    freq_res = freqs[1] - freqs[0]
    octave_frequencies = np.arange(1, n_bands + 1) * freq_res
    contrast = np.zeros((n_bands, magnitude.shape[1]))

    for k in range(magnitude.shape[1]):
        for i in range(n_bands):
            if i == 0:
                l = 0
                u = octave_frequencies[i]
            else:
                l = u
                u = octave_frequencies[i]

            freq_band = np.where((freqs >= l) & (freqs <= u))
            contrast[i, k] = 10 * np.log10(np.sum(magnitude[freq_band, k] ** 2) / np.sum(magnitude[:, k] ** 2))

    return contrast

def extract_features(magnitude, freqs):
    # Spectral centroid
    centroid = np.sum(magnitude * freqs[:, np.newaxis], axis=0) / np.sum(magnitude, axis=0)

    # Spectral bandwidth
    bandwidth = np.sqrt(np.sum(((freqs[:, np.newaxis] - centroid) ** 2) * magnitude, axis=0) / np.sum(magnitude, axis=0))

    # Spectral contrast
    contrast = np.max(magnitude, axis=0) / np.min(magnitude, axis=0)

    # Average the contrast feature along the time axis
    contrast_avg = np.mean(contrast, axis=1)

    return {'centroid': centroid, 'bandwidth': bandwidth, 'contrast': contrast_avg}




In [None]:
features_all = []

data_path = "/content/gdrive/MyDrive/UCSD"
# Iterate through subjects
for subject_folder in os.listdir(data_path):
    if not subject_folder.startswith('sub-'):
        continue

    subject_path = os.path.join(data_path, subject_folder)

    # Iterate through sessions (hc, on, off)
    for session_folder in os.listdir(subject_path):
        session_path = os.path.join(subject_path, session_folder)
        eeg_folder = os.path.join(session_path, "eeg")

        # Load the EEG data
        raw = load_eeg_data(eeg_folder)

        if raw is None:
            print(f"No EEG data found for {subject_folder}, {session_folder}")
            continue

        # Perform STFT
        freqs, times, stft_data = perform_stft(raw)

        # Compute the magnitude
        magnitude = np.abs(stft_data)

        # Extract features
        features = extract_features(magnitude, freqs)

        features_all.append((subject_folder, session_folder, features))


In [None]:
# Prepare the data
X_data = [np.column_stack((features['centroid'], features['bandwidth'], features['contrast'])) for subject, session, features in features_all]
y_data = [label_function(subject, session) for subject, session, features in features_all]

# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_data, y_data, test_size=0.2, random_state=42)

# Pad the sequences
max_length_rf = max([len(sequence) for sequence in X_data])
max_features_rf = max([sequence.shape[1] for sequence in X_data])

X_train_rf_padded = pad_sequences_2d(X_train_rf, maxlen=max_length_rf, dtype='float32', num_features=max_features_rf)
X_val_rf_padded = pad_sequences_2d(X_val_rf, maxlen=max_length_rf, dtype='float32', num_features=max_features_rf)

# Flatten the data for RandomForest
X_train_rf_flat = np.array([x.flatten() for x in X_train_rf_padded])
X_val_rf_flat = np.array([x.flatten() for x in X_val_rf_padded])

In [None]:
# Create and train the model using the best parameters found
best_params = {'max_depth': None, 'min_samples_leaf': 4, 'min_samples_split': 2, 'n_estimators': 10}
rf_classifier = RandomForestClassifier(**best_params)
rf_classifier.fit(X_train_rf_flat, y_train)  # <-- change here

# Make predictions and calculate accuracy
y_pred = rf_classifier.predict(X_val_rf_flat)  # <-- change here
accuracy = accuracy_score(y_val, y_pred)
print("RandomForest Accuracy:", accuracy)


#latest


In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

!pip install mne


In [None]:
import os
import numpy as np
import pandas as pd
import mne
from scipy.signal import stft, get_window
from sklearn.preprocessing import StandardScaler

In [None]:
def perform_stft(raw, window='hann', nperseg=256, noverlap=None):
    data = raw.get_data()
    sampling_rate = raw.info['sfreq']
    window_function = get_window(window, nperseg)

    freqs, times, stft_data = stft(data, fs=sampling_rate, window=window_function, nperseg=nperseg, noverlap=noverlap)

    return freqs, times, stft_data

def spectral_centroid(magnitude, freqs):
    normalized_magnitude = magnitude / np.sum(magnitude, axis=0)
    centroid = np.sum(freqs[:, np.newaxis] * normalized_magnitude, axis=0)
    return centroid

def spectral_bandwidth(magnitude, freqs):
    normalized_magnitude = magnitude / np.sum(magnitude, axis=0)
    centroid = spectral_centroid(magnitude, freqs)
    second_moment = np.sum((freqs[:, np.newaxis] - centroid) ** 2 * normalized_magnitude, axis=0)
    bandwidth = np.sqrt(second_moment)
    return bandwidth

def extract_features(magnitude, freqs):
    # Spectral peaks: spectral centroid (weighted average of frequencies)
    centroid = spectral_centroid(magnitude, freqs)

    # Energy distribution: spectral bandwidth (square root of the second central moment)
    bandwidth = spectral_bandwidth(magnitude, freqs)

    features = {'centroid': centroid, 'bandwidth': bandwidth}
    return features


In [None]:

import mne

def load_eeg_data(subject_folder):
    eeg_files = [file for file in os.listdir(subject_folder) if file.endswith(".bdf")]

    if len(eeg_files) == 0:
        return None

    eeg_file = os.path.join(subject_folder, eeg_files[0])
    channels_file = eeg_file.replace("_task-rest_eeg.bdf", "_task-rest_channels.tsv")

    raw = mne.io.read_raw_bdf(eeg_file, preload=True)
    channels_info = pd.read_csv(channels_file, delimiter='\t')

    channel_names = channels_info['name'].tolist()
    raw.rename_channels({old: new for old, new in zip(raw.ch_names, channel_names)})

    return raw


In [None]:

import matplotlib.pyplot as plt

features_all = []
data_path = "/content/gdrive/MyDrive/UCSD"
# Iterate through subjects
for subject_folder in os.listdir(data_path):
    if not subject_folder.startswith('sub-'):
        continue

    subject_path = os.path.join(data_path, subject_folder)

    # Iterate through sessions (hc, on, off)
    for session_folder in os.listdir(subject_path):
        session_path = os.path.join(subject_path, session_folder)
        eeg_folder = os.path.join(session_path, "eeg")

        # Load the EEG data
        raw = load_eeg_data(eeg_folder)

        if raw is None:
            print(f"No EEG data found for {subject_folder}, {session_folder}")
            continue

        # Perform STFT
        freqs, times, stft_data = perform_stft(raw)

        # Compute the magnitude
        magnitude = np.abs(stft_data)

        # Extract features
        features = extract_features(magnitude, freqs)

        features_all.append((subject_folder, session_folder, features))

In [None]:
subject = 'sub-hc8'
session = 'ses-hc'

# Get the features for the selected subject and session
features = next((f for (s, ss, f) in features_all if s == subject and ss == session), None)

if features is not None:
    # Plot spectral centroid
    plt.figure(figsize=(12, 4))
    plt.imshow(features['centroid'], aspect='auto', origin='lower', extent=[times[0], times[-1], 0, len(raw.ch_names)], cmap='viridis')
    plt.colorbar(label='Frequency (Hz)')
    plt.title(f'Spectral Centroid for {subject}, {session}')
    plt.xlabel('Time (s)')
    plt.ylabel('Channels')
    plt.show()
else:
    print(f"Features not found for subject {subject} and session {session}")

In [None]:

!pip install tensorflow

!pip install tensorflow
!pip install scikit-learn
!pip install Keras-Preprocessing

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Bidirectional, LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:

def label_function(subject, session):
    if subject.startswith('sub-hc'):
        return 0
    elif session == 'ses-on':
        return 1
    else:
        return 2



In [None]:
from sklearn.preprocessing import MinMaxScaler

def pad_sequences_columns(sequences, max_columns, dtype='float32'):
    padded_sequences = []
    for sequence in sequences:
        n_rows, n_cols = sequence.shape
        padding = np.zeros((n_rows, max_columns - n_cols), dtype=dtype)
        padded_sequence = np.hstack((sequence, padding))
        padded_sequences.append(padded_sequence)
    return padded_sequences

# Prepare the data
X_data = [np.column_stack((features['centroid'], features['bandwidth'])) for subject, session, features in features_all]
y_data = [label_function(subject, session) for subject, session, features in features_all]

# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_data, y_data, test_size=0.2, random_state=42)

# Convert the labels to one-hot encoding
y_train = to_categorical(y_train, num_classes=3)
y_val = to_categorical(y_val, num_classes=3)

# Find the maximum number of columns across all sequences
max_columns = max([sequence.shape[1] for sequence in X_train])

# Pad sequences with columns of zeros
X_train_padded_columns = pad_sequences_columns(X_train, max_columns)
X_val_padded_columns = pad_sequences_columns(X_val, max_columns)

# Concatenate all training sequences
X_train_concatenated = np.vstack(X_train_padded_columns)

# Fit the MinMaxScaler on the entire training dataset
scaler = MinMaxScaler()
scaler.fit(X_train_concatenated)

# Apply the transformation to each sequence in the training and validation sets
X_train_scaled = [scaler.transform(sequence) for sequence in X_train_padded_columns]
X_val_scaled = [scaler.transform(sequence) for sequence in X_val_padded_columns]



In [None]:
from keras.layers import Masking
from keras_preprocessing.sequence import pad_sequences
from keras.layers import Attention

from keras.layers import GlobalMaxPooling1D

def create_model_with_dropout(input_shape):
    input_layer = Input(shape=input_shape)
    masking_layer = Masking(mask_value=0.0)(input_layer)
    lstm_layer = Bidirectional(LSTM(128, return_sequences=True, dropout=0.5, recurrent_dropout=0.5))(masking_layer)
    attention_layer = Attention()([lstm_layer, lstm_layer])
    pooling_layer = GlobalMaxPooling1D()(attention_layer)
    output_layer = Dense(3, activation='softmax')(pooling_layer)

    model = Model(inputs=input_layer, outputs=output_layer)
    return model

# Find the maximum sequence length in the training data
max_length = max([len(sequence) for sequence in X_train])

# Pad the sequences
X_train_padded = pad_sequences(X_train_scaled, maxlen=max_length, dtype='float32', padding='post', truncating='post')
X_val_padded = pad_sequences(X_val_scaled, maxlen=max_length, dtype='float32', padding='post', truncating='post')

# Create and compile the model
model = create_model_with_dropout(X_train_padded[0].shape)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

print("X_train_padded shape:", X_train_padded.shape)
print("X_val_padded shape:", X_val_padded.shape)

In [None]:
import matplotlib.pyplot as plt

def plot_history(history):
    # Plot loss and accuracy
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()

plot_history(history)


In [None]:
# Train the model with an increased batch size
batch_size = 8
history = model.fit(X_train_padded, y_train, validation_data=(X_val_padded, y_val), epochs=50, batch_size=batch_size)

# Plot the training history
plot_history(history)


In [None]:
from sklearn.metrics import classification_report


In [None]:
# Evaluate the model
y_val_pred = model.predict(X_val_padded)
y_val_pred_classes = np.argmax(y_val_pred, axis=1)
y_val_true_classes = np.argmax(y_val, axis=1)

print("Classification Report:")
print(classification_report(y_val_true_classes, y_val_pred_classes))


In [None]:
from sklearn.metrics import confusion_matrix


def specificity_score(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    specificity = cm[0, 0] / (cm[0, 0] + cm[0, 1])
    return specificity


In [None]:
specificity = specificity_score(y_val_true_classes, y_val_pred_classes)
print("Specificity:", specificity)

