In [None]:
import os
import pandas as pd
import mne
import numpy as np

# Step 1: Load Metadata from Participants.tsv
participants_file = '/Users/pro/Desktop/BigData/ProjetEEG/ds004504-1.0.7/participants.tsv'
participants_df = pd.read_csv(participants_file, sep='\t')

# Keep only relevant columns: participant_id and group
participants_df = participants_df[['participant_id', 'Group']]

# Create a dictionary to map participant_id to group
group_mapping = participants_df.set_index('participant_id')['Group'].to_dict()

# Step 2: Load and Segment EEG Signals with Overlap
# Directory containing the EEG data (one sub-folder per participant)
data_dir = '/Users/pro/Desktop/BigData/ProjetEEG/ds004504-1.0.7/derivatives'

# Parameters for segmentation
segment_length_sec = 5  # length of each segment in seconds
sampling_rate = 500  # Hz, rate every recording is expected (or resampled) to have
segment_length = segment_length_sec * sampling_rate  # Number of data points per segment

# Parameters for overlap
overlap_ratio = 0.5  # 50% overlap
overlap_step = int(segment_length * (1 - overlap_ratio))  # Step size considering overlap
if overlap_step < 1:
    # A zero (or negative) step would never advance the sliding window.
    raise ValueError(
        f"overlap_ratio={overlap_ratio} leaves no forward step between segments; "
        "it must be strictly below 1."
    )

data_segments = []
group_labels = []
# Participant each segment was cut from, kept aligned with data_segments so that
# later steps can group or split segments by subject.
segment_participant_ids = []

# Iterate over each subject folder to load EEG data.
# os.listdir returns entries in arbitrary order; sorting makes the segment order
# (and everything derived from it) reproducible between runs and machines.
directories = sorted(
    d for d in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, d)) and d != '.DS_Store'
)
print("Directories in data_dir (Total):", len(directories))
print("Directories in data_dir:", directories)

for participant_id in directories:
    participant_folder = os.path.join(data_dir, participant_id)

    # Folders that are not listed in participants.tsv have no label: skip them.
    if participant_id not in group_mapping:
        continue

    eeg_file = os.path.join(participant_folder, 'eeg', f'{participant_id}_task-eyesclosed_eeg.set')
    print("Looking for EEG file:", eeg_file)

    if not os.path.exists(eeg_file):
        print(f"EEG file not found for participant: {participant_id}")
        continue

    print(f"Loading EEG data for {participant_id}...")
    raw = mne.io.read_raw_eeglab(eeg_file, preload=True)

    # segment_length is expressed in samples at `sampling_rate`; a recording with
    # another rate would silently yield segments of a different duration.
    recorded_rate = raw.info['sfreq']
    if recorded_rate != sampling_rate:
        print(f"Resampling {participant_id} from {recorded_rate} Hz to {sampling_rate} Hz")
        raw.resample(sampling_rate)

    # Get the EEG data as a numpy array
    data = raw.get_data()  # Shape: (n_channels, n_samples)
    n_channels, n_samples = data.shape

    # Slide a fixed-size window over the recording; the trailing samples that do
    # not fill a whole window are dropped.
    for start in range(0, n_samples - segment_length + 1, overlap_step):
        data_segments.append(data[:, start:start + segment_length])
        group_labels.append(group_mapping[participant_id])
        segment_participant_ids.append(participant_id)

if not data_segments:
    raise RuntimeError(
        f"No EEG segments were extracted from {data_dir}; "
        "check the data path and the participant folders."
    )

# Convert lists to numpy arrays
data_segments = np.array(data_segments)  # Shape: (n_segments, n_channels, segment_length)
group_labels = np.array(group_labels)
segment_participant_ids = np.array(segment_participant_ids)

# Print the shapes of the segmented data and labels
print("Data Segments Shape (with overlap):", data_segments.shape)
print("Group Labels Shape:", group_labels.shape)


In [None]:
from sklearn.preprocessing import StandardScaler

# Step 3: Standardize Each Segment
scaler = StandardScaler()
standardized_segments = []

for segment in data_segments:
    # Standardize each channel independently.
    # StandardScaler works column-wise (one mean/std per column), while a segment
    # is laid out as (n_channels, n_samples). Transposing makes every channel a
    # column, so each channel is z-scored over time; the result is transposed
    # back to the original (n_channels, n_samples) layout.
    standardized_segment = scaler.fit_transform(segment.T).T
    standardized_segments.append(standardized_segment)

# Convert the standardized segments list to a numpy array
standardized_segments = np.array(standardized_segments)  # (n_segments, n_channels, n_samples)

print("Standardized Segments Shape:", standardized_segments.shape)


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model

# Step 4: Define and Train the Autoencoder
# Fully-connected autoencoder: each segment is flattened, squeezed through a
# `latent_dim`-wide bottleneck and reconstructed back to its original shape.
input_shape = standardized_segments.shape[1:]  # Shape: (n_channels, segment_length)
latent_dim = 32

# --- Encoder: input -> flatten -> 128 -> latent_dim ---
input_layer = Input(shape=input_shape)
flat_input = tf.keras.layers.Flatten()(input_layer)
encoder_hidden = Dense(128, activation='relu')(flat_input)
encoded = Dense(latent_dim, activation='relu')(encoder_hidden)

# --- Decoder: latent_dim -> 128 -> flat reconstruction -> original shape ---
decoder_hidden = Dense(128, activation='relu')(encoded)
flat_reconstruction = Dense(np.prod(input_shape), activation='linear')(decoder_hidden)
decoded = tf.keras.layers.Reshape(input_shape)(flat_reconstruction)

# Full model, trained to reproduce its own input (mean squared error)
autoencoder = Model(inputs=input_layer, outputs=decoded)
autoencoder.compile(optimizer='adam', loss='mse')

# The segments serve as both input and target
autoencoder.fit(
    x=standardized_segments,
    y=standardized_segments,
    epochs=20,
    batch_size=32,
    shuffle=True,
    validation_split=0.2,
)

# The encoder shares the trained layers and stops at the bottleneck
encoder = Model(inputs=input_layer, outputs=encoded)

# One latent vector of size `latent_dim` per segment
encoded_segments = encoder.predict(standardized_segments)

# Print the shape of the encoded segments
print("Encoded Segments Shape:", encoded_segments.shape)


In [None]:
import numpy as np
import scipy.signal as signal
import scipy.integrate as integrate

# Frequency bands (gamma included), each given as (low, high) bounds in Hz.
# The insertion order fixes the column order of the band-power features.
frequency_bands = dict(
    delta=(0.5, 4),
    theta=(4, 8),
    alpha=(8, 13),
    beta=(13, 25),
    gamma=(25, 45),
)

# Sampling rate (in Hz) handed to the band-power extraction.
# NOTE(review): this rebinds `sampling_rate`, which the segmentation step sets
# to 500 - confirm which value is intended here.
sampling_rate = 128

def extract_band_power(segments, frequency_bands, sampling_rate):
    """
    Compute the power contained in each frequency band for every segment.

    The power spectral density (PSD) of each segment is estimated with Welch's
    method, using a single window as long as the segment, and is then
    integrated over each band with Simpson's rule.

    Args:
        segments (np.array): Signals of shape (N_segments, N_features). The PSD
            is computed along the last axis; any leading axes are preserved.
        frequency_bands (dict): Maps a band name to its (low_freq, high_freq)
            bounds in Hz, both inclusive. The dict order gives the column order.
        sampling_rate (int): Sampling rate of the signals, in Hz.

    Returns:
        np.array: Band powers of shape (N_segments, N_bands). A band covering
        fewer than two frequency bins has zero integration width and yields 0.
    """
    # `simps` was deprecated in favour of `simpson` and later removed from
    # SciPy; fall back to the old name only on releases that lack the new one.
    simpson = getattr(integrate, 'simpson', None) or integrate.simps

    segments = np.asarray(segments)
    n_bands = len(frequency_bands)
    if segments.size == 0:
        # Nothing to analyse: keep a well-formed (0, N_bands)-like result.
        leading_shape = segments.shape[:-1] if segments.ndim >= 2 else (0,)
        return np.empty(leading_shape + (n_bands,))

    # One vectorised Welch call for all segments instead of a Python loop.
    freqs, psd = signal.welch(
        segments, fs=sampling_rate, nperseg=segments.shape[-1], axis=-1
    )

    band_columns = []
    for low_freq, high_freq in frequency_bands.values():
        idx_band = np.logical_and(freqs >= low_freq, freqs <= high_freq)
        if np.count_nonzero(idx_band) < 2:
            # Zero or one bin: there is no interval to integrate over.
            band_columns.append(np.zeros(psd.shape[:-1]))
        else:
            band_columns.append(
                simpson(psd[..., idx_band], x=freqs[idx_band], axis=-1)
            )

    if not band_columns:
        return np.empty(psd.shape[:-1] + (0,))
    return np.stack(band_columns, axis=-1)

# Band powers computed on the encoded segments.
# NOTE(review): the input here is the autoencoder's latent code (one vector of
# `latent_dim` values per segment), not a sampled EEG time series - confirm
# that a frequency analysis of these vectors is what is intended.
band_powers = extract_band_power(
    segments=encoded_segments,
    frequency_bands=frequency_bands,
    sampling_rate=sampling_rate,
)

print("Band Powers Shape:", band_powers.shape)


In [None]:
from scipy.stats import entropy

# Spectral entropy computed from the band powers
def calculate_spectral_entropy(band_powers):
    """
    Compute the spectral entropy of each segment from its band powers.

    Each row is normalised to sum to 1 so it can be read as a probability
    distribution over the bands; its Shannon entropy (natural logarithm) is
    then returned.

    Arguments:
    band_powers -- numpy array of shape (n_samples, n_bands)

    Returns:
    spectral_entropy -- numpy array of shape (n_samples,). Rows whose total
    power is zero have no defined distribution; they are given an entropy of
    0 instead of the NaN that a 0/0 normalisation would produce.
    """
    powers = np.asarray(band_powers, dtype=float)
    totals = np.sum(powers, axis=1, keepdims=True)

    # Only rows with a non-zero total can be normalised. NaN totals compare
    # unequal to 0, so they stay in the mask and propagate as NaN.
    has_power = totals[:, 0] != 0

    spectral_entropy = np.zeros(powers.shape[0])
    if has_power.any():
        normalized_powers = powers[has_power] / totals[has_power]
        spectral_entropy[has_power] = entropy(normalized_powers, axis=1)

    return spectral_entropy

# Spectral entropy of every segment, derived from the extracted band powers
spectral_entropy = calculate_spectral_entropy(band_powers=band_powers)

print("Spectral Entropy Shape:", spectral_entropy.shape)


In [None]:
# Combine the features: band powers + spectral entropy
# Resulting shape: (n_segments, n_bands + 1)
combined_features = np.hstack((band_powers, spectral_entropy.reshape(-1, 1)))
print("Combined Features Shape:", combined_features.shape)

# Features and labels are indexed together below, so they must line up.
if len(combined_features) != len(group_labels):
    raise ValueError(
        f"Got {len(combined_features)} feature rows but {len(group_labels)} labels."
    )

# Prepare the sequences for the LSTM
sequence_length = 10  # Number of consecutive segments per sequence
num_features = combined_features.shape[1]

sequences = []
labels = []

# "+ 1" so that the last full window (ending on the final segment) is kept.
n_windows = len(combined_features) - sequence_length + 1
for i in range(max(n_windows, 0)):
    window_labels = group_labels[i:i + sequence_length]

    # Segments from all recordings are stored back to back, so a window can
    # straddle two recordings. Windows that mix groups have no single valid
    # label and are skipped.
    # NOTE(review): two adjacent recordings from the same group are still
    # joined; separating them needs a per-segment participant id.
    if np.any(window_labels != window_labels[0]):
        continue

    sequences.append(combined_features[i:i + sequence_length])
    labels.append(window_labels[-1])

# The reshape keeps a (0, sequence_length, num_features) shape when no window fits.
sequences = np.array(sequences).reshape(-1, sequence_length, num_features)
labels = np.array(labels)

print("Sequences Shape:", sequences.shape)
print("Labels Shape:", labels.shape)


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization, Bidirectional
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Step 1: Encode labels into integers
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)

# Step 2: One-hot encode the labels for categorical classification
categorical_labels = to_categorical(encoded_labels)

# Step 3: Split the data into training, validation and test sets.
# The test set is held out completely; a separate validation set drives early
# stopping and the learning-rate schedule, so the reported test score is not
# tuned on the data it is measured on.
# NOTE(review): sequences are overlapping windows over overlapping segments, so
# a random split puts near-duplicate samples on both sides; a split by
# participant would give a more trustworthy estimate.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    sequences,
    categorical_labels,
    test_size=0.2,
    random_state=42,
    stratify=encoded_labels
)
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval,
    y_trainval,
    test_size=0.2,
    random_state=42,
    stratify=np.argmax(y_trainval, axis=1)
)

# Step 4: Compute class weights to handle class imbalance.
# Weights are derived from the training labels only, the data they apply to.
train_label_ids = np.argmax(y_train, axis=1)
train_classes = np.unique(train_label_ids)
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=train_classes,
    y=train_label_ids
)
# Keras expects {class index: weight} with plain Python numbers.
class_weights_dict = dict(zip(train_classes.tolist(), class_weights.tolist()))

# Step 5: Define the optimized LSTM model
model_optimized = Sequential([
    Bidirectional(LSTM(128, return_sequences=True, activation='tanh'), input_shape=(sequence_length, num_features)),
    Dropout(0.3),
    BatchNormalization(),
    LSTM(64, return_sequences=False, activation='tanh'),
    Dropout(0.3),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(categorical_labels.shape[1], activation='softmax')  # Output layer with softmax for classification
])

model_optimized.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Step 6: Add EarlyStopping and ReduceLROnPlateau callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)

# Step 7: Train the model with class weights
history = model_optimized.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),  # validation set, not the test set
    epochs=100,  # Train for more epochs with EarlyStopping
    batch_size=32,
    class_weight=class_weights_dict,  # Handle class imbalance
    shuffle=True,
    callbacks=[early_stopping, lr_scheduler]
)

# Step 8: Evaluate the optimized model on the held-out test set
test_loss, test_accuracy = model_optimized.evaluate(X_test, y_test)
print(f"Test Loss (Optimized): {test_loss}")
print(f"Test Accuracy (Optimized): {test_accuracy}")

# Step 9: Save the trained optimized model
model_path = "/Users/pro/Desktop/BigData/ProjetEEG/model/eeg_model_for5s.h5"
# Saving fails if the target folder is missing, so create it first.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
model_optimized.save(model_path)
print(f"Optimized Model saved as '{model_path}'")


# Step 10: Use the optimized model for predictions
predictions = model_optimized.predict(X_test)
predicted_labels = label_encoder.inverse_transform(np.argmax(predictions, axis=1))
true_labels = label_encoder.inverse_transform(np.argmax(y_test, axis=1))

# Print a few predictions and true labels for verification
# (bounded by the test-set size so a small test set does not raise IndexError)
for i in range(min(10, len(predicted_labels))):
    print(f"Predicted (Optimized): {predicted_labels[i]}, True: {true_labels[i]}")
