In [None]:
import mne
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import GridSearchCV, LeaveOneOut
from sklearn.preprocessing import PowerTransformer, RobustScaler, StandardScaler, MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
import pandas as pd
import numpy as np
import mne
from collections import defaultdict
%matplotlib inline

# Plotting Language PSDs

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import mne


# Define frequency bands.
frequency_bands = {
    "delta": (1, 4),
    "theta": (4, 8),
    "alpha": (8, 12),
    "beta": (12, 30),
    "gamma_low": (30, 48),
    "gamma_high": (48, 68)
}


# Function to extract power in each band using PSD.
def band_power_psd(epochs, freq_bands):
    psds, freqs = epochs.compute_psd().get_data(return_freqs=True)
    band_powers = {}
    for band, (fmin, fmax) in freq_bands.items():
        freq_mask = (freqs >= fmin) & (freqs <= fmax)
        band_powers[band] = np.mean(psds[:, :, freq_mask], axis=2)
    return band_powers


subjects = ["s1", "s2", "s3", "s4", "s5", "s6"]


# Initialize lists to store the averaged band powers across all subjects.
dutch_avg_band_powers = {band: [] for band in frequency_bands.keys()}
english_avg_band_powers = {band: [] for band in frequency_bands.keys()}

for subject in subjects:
    EEG_data_path = f'./Subject Data/{subject.upper()}/'
    epochs = mne.read_epochs(EEG_data_path + f"{subject}_ar_epo.fif", preload=True)
    dutch_epochs = epochs['21', '22', '23']
    english_epochs = epochs['11', '12', '13']
    
    # Extract power spectral density for each band and language.
    dutch_psds = band_power_psd(dutch_epochs, frequency_bands)
    english_psds = band_power_psd(english_epochs, frequency_bands)
    
    # For each band and language, calculate average power, then store those.
    for band in frequency_bands.keys():
        dutch_avg_band_powers[band].append(np.mean(dutch_psds[band]))
        english_avg_band_powers[band].append(np.mean(english_psds[band]))

        
# Calculate the across-subjects mean power for each frequency band.
dutch_avg_band_powers = {band: np.mean(powers) for band, powers in dutch_avg_band_powers.items()}
english_avg_band_powers = {band: np.mean(powers) for band, powers in english_avg_band_powers.items()}


# Plot the data.
fig, ax = plt.subplots(figsize=(12, 6))
bar_width = 0.35
index = np.arange(len(frequency_bands))

dutch_bar = ax.bar(index, dutch_avg_band_powers.values(), bar_width, label='Dutch')
english_bar = ax.bar(index + bar_width, english_avg_band_powers.values(), bar_width, label='English')

ax.set_title('Average Power by Frequency Band for Dutch and English Epochs')
ax.set_xlabel('Frequency Band')
ax.set_ylabel('Power')
ax.set_xticks(index + bar_width / 2)
ax.set_xticklabels(frequency_bands.keys())
ax.legend()

plt.tight_layout()
plt.show()


# Plotting Valence PSDs (Dutch-only)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import mne


# Define frequency bands.
frequency_bands = {
    "delta": (1, 4),
    "theta": (4, 8),
    "alpha": (8, 12),
    "beta": (12, 30),
    "gamma_low": (30, 48),
    "gamma_high": (48, 68)
}


# Function to extract power in each band using PSD.
def band_power_psd(epochs, freq_bands):
    psds, freqs = epochs.compute_psd().get_data(return_freqs=True)
    band_powers = {}
    for band, (fmin, fmax) in freq_bands.items():
        freq_mask = (freqs >= fmin) & (freqs <= fmax)
        band_powers[band] = np.mean(psds[:, :, freq_mask], axis=2)
    return band_powers


# Subjects list
subjects = ["s1", "s2", "s3", "s4", "s5", "s6"]


# Initialize lists to store the averaged band powers across all subjects.
dutch_pos_psds_avg_band_powers = {band: [] for band in frequency_bands.keys()}
dutch_neu_psds_avg_band_powers = {band: [] for band in frequency_bands.keys()}
dutch_neg_psds_avg_band_powers = {band: [] for band in frequency_bands.keys()}

for subject in subjects:
    EEG_data_path = f'./Subject Data/{subject.upper()}/'
    epochs = mne.read_epochs(EEG_data_path + f"{subject}_ar_epo.fif", preload=True)
    dutch_epochs_pos = epochs['21']
    dutch_epochs_neu = epochs['22']
    dutch_epochs_neg = epochs['23']
    
    # Extract power spectral density for each band and valence (Dutch-only).
    dutch_pos_psds = band_power_psd(dutch_epochs_pos, frequency_bands)
    dutch_neu_psds = band_power_psd(dutch_epochs_neu, frequency_bands)
    dutch_neg_psds = band_power_psd(dutch_epochs_neg, frequency_bands)
    
    
    # For each band and valence, calculate average power, then store those (Dutch-only).
    for band in frequency_bands.keys():
        dutch_pos_psds_avg_band_powers[band].append(np.mean(dutch_pos_psds[band]))
        dutch_neu_psds_avg_band_powers[band].append(np.mean(dutch_neu_psds[band]))
        dutch_neg_psds_avg_band_powers[band].append(np.mean(dutch_neg_psds[band]))

        
# Calculate the across-subjects mean power for each frequency band.
dutch_pos_psds_avg_band_powers = {band: np.mean(powers) for band, powers in dutch_pos_psds_avg_band_powers.items()}
dutch_neu_psds_avg_band_powers = {band: np.mean(powers) for band, powers in dutch_neu_psds_avg_band_powers.items()}
dutch_neg_psds_avg_band_powers = {band: np.mean(powers) for band, powers in dutch_neg_psds_avg_band_powers.items()}

# Plot the data.
fig, ax = plt.subplots(figsize=(12, 6))
bar_width = 0.33
index = np.arange(len(frequency_bands))

pos_bar = ax.bar(index, dutch_pos_psds_avg_band_powers.values(), bar_width, label='Positive')
neu_bar = ax.bar(index + bar_width, dutch_neu_psds_avg_band_powers.values(), bar_width, label='Neutral')
neg_bar = ax.bar(index + bar_width, dutch_neg_psds_avg_band_powers.values(), bar_width, label='Negative')

ax.set_title('Average Power by Frequency Band for Pos/Neu/Neg in Dutch')
ax.set_xlabel('Frequency Band')
ax.set_ylabel('Power')
ax.set_xticks(index + bar_width / 2)
ax.set_xticklabels(frequency_bands.keys())
ax.legend()

plt.tight_layout()
plt.show()


# Language Classification (Within-subject)

In [None]:
import mne
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
import pywt
from scipy.signal import stft, hilbert
iir_params = dict(order=4, ftype='butter')


# Define frequency bands.
frequency_bands = {
    "delta": (1, 4),
    "theta": (4, 8),
    "alpha": (8, 12),
    "beta": (12, 30),
    "gamma_low": (30, 48),
    "gamma_high": (48, 68)
}


# Function to extract power in each band using PSD.
def band_power_psd(epochs, freq_bands):
    psds, freqs = epochs.compute_psd().get_data(return_freqs=True)
    band_powers = {}
    for band, (fmin, fmax) in freq_bands.items():
        freq_mask = (freqs >= fmin) & (freqs <= fmax)
        band_powers[band] = np.mean(psds[:, :, freq_mask], axis=2)
    return band_powers


# Function to extract wavelet features for each band.
def extract_wavelet_features(epochs, freq_bands, wavelet='sym8'):
    wavelet_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        wavelet_features = []
        for epoch in band_data:
            coeffs = pywt.wavedec(epoch, wavelet, level=3)
            features = np.hstack([np.mean(c, axis=1) for c in coeffs])
            wavelet_features.append(features)
        wavelet_band_features[band] = np.array(wavelet_features)
    return wavelet_band_features


# Function to extract STFT features for each band.
def extract_stft_features(epochs, freq_bands, fs=256, nperseg=128):
    stft_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        stft_features = []
        for epoch in band_data:
            _, _, Zxx = stft(epoch, fs=fs, nperseg=nperseg)
            features = np.abs(Zxx).mean(axis=1).flatten()
            stft_features.append(features)
        stft_band_features[band] = np.array(stft_features)
    return stft_band_features


# Function to extract Hilbert features for each band.
def extract_hilbert_features(epochs, freq_bands):
    hilbert_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        hilbert_features = []
        for epoch in band_data:
            analytic_signal = hilbert(epoch)
            amplitude_envelope = np.abs(analytic_signal)
            features = amplitude_envelope.mean(axis=1)
            hilbert_features.append(features)
        hilbert_band_features[band] = np.array(hilbert_features)
    return hilbert_band_features


# Load the EEG data and run the loop for each subject.
subjects = ["s1", "s2", "s3", "s4", "s5", "s6"]
for subject in subjects: 
    EEG_data_path = f'./Subject Data/{subject.upper()}/'
    epochs = mne.read_epochs(EEG_data_path + f"{subject}_ar_epo.fif", preload=True)
    epochs = epochs[::4]
    
    # Label the data.
    english_epochs = epochs['11', '12', '13']
    dutch_epochs = epochs['21', '22', '23']
    Y_english = np.zeros(len(english_epochs))
    Y_dutch = np.ones(len(dutch_epochs))
    Y = np.concatenate([Y_english, Y_dutch])
    
    # Define the pipeline and parameter grid.
    pipeline = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
    param_grid = {
        'svc__kernel': ['rbf'],
        'svc__gamma': [0.01],
        'svc__C': [1]
    }

    # Method names and their corresponding feature extraction functions.
    methods = {
        "PSD": band_power_psd,
        "Wavelet": extract_wavelet_features,
        "STFT": extract_stft_features,
        "Hilbert": extract_hilbert_features
    }

    # Loop through each method and evaluate the performance.
    results = []

    for method_name, method_func in methods.items():
        print(f"Evaluating method: {method_name}")

        # Extract features using the method from both English and Dutch epochs.
        english_features = method_func(english_epochs, frequency_bands)
        dutch_features = method_func(dutch_epochs, frequency_bands)
        for band_name in frequency_bands.keys():
            print(f"Evaluating band: {band_name}")

            # Combine English and Dutch features for the current band into one.
            features_english = english_features[band_name]
            features_dutch = dutch_features[band_name]
            X = np.concatenate([features_english, features_dutch], axis=0)
            X = X.reshape(X.shape[0], -1) # to 2D.

            # Split the data, perform Grid Search.
            X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.25)
            grid_search = GridSearchCV(pipeline, param_grid, cv=3, scoring='accuracy', n_jobs=-1)
            grid_search.fit(X_train, y_train)

            # Evaluate the model.
            y_pred = grid_search.predict(X_test)
            report = classification_report(y_test, y_pred, output_dict=True)
            accuracy = accuracy_score(y_test, y_pred)

            results.append({
                "subject": subject,
                "method": method_name,
                "band": band_name,
                "accuracy": accuracy,
                "best_params": grid_search.best_params_,
                "classification_report": report
            })

            print(f"Method: {method_name}, Band: {band_name}")
            print(f"Accuracy: {accuracy}")
            print(classification_report(y_test, y_pred))
            print(f"Best Parameters: {grid_search.best_params_}")
            print("\n")

            
    # Convert results to DataFrame and save to CSV.
    results_df = pd.DataFrame(results)
    results_df.to_csv(f"{subject}_results_lang.csv", index=False)
    print(f"Results for subject {subject} saved to {subject}_results.csv")


# Emotional Valence (Dutch-only, within-subject)

In [None]:
import mne
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
import pywt
from scipy.signal import stft, hilbert


# Load the EEG data
subject_name = "s1"
EEG_data_path = f'./Subject Data/{subject_name.upper()}/'
epochs = mne.read_epochs(EEG_data_path + f"{subject_name}_ar_epo.fif", preload=True)
epochs = epochs[::4]
dutch_epochs = epochs['21', '22', '23']
iir_params = dict(order=4, ftype='butter')


# Define frequency bands.
frequency_bands = {
    "delta": (1, 4),
    "theta": (4, 8),
    "alpha": (8, 12),
    "beta": (12, 30),
    "gamma_low": (30, 48),
    "gamma_high": (48, 68)
}


# Function to extract power in each band using PSD.
def band_power_psd(epochs, freq_bands):
    psds, freqs = epochs.compute_psd().get_data(return_freqs=True)
    band_powers = {}
    for band, (fmin, fmax) in freq_bands.items():
        freq_mask = (freqs >= fmin) & (freqs <= fmax)
        band_powers[band] = np.mean(psds[:, :, freq_mask], axis=2)
    return band_powers


# Function to extract wavelet features for each band.
def extract_wavelet_features(epochs, freq_bands, wavelet='sym8'):
    wavelet_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        wavelet_features = []
        for epoch in band_data:
            coeffs = pywt.wavedec(epoch, wavelet, level=3)
            features = np.hstack([np.mean(c, axis=1) for c in coeffs])
            wavelet_features.append(features)
        wavelet_band_features[band] = np.array(wavelet_features)
    return wavelet_band_features


# Function to extract STFT features for each band.
def extract_stft_features(epochs, freq_bands, fs=256, nperseg=128):
    stft_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        stft_features = []
        for epoch in band_data:
            _, _, Zxx = stft(epoch, fs=fs, nperseg=nperseg)
            features = np.abs(Zxx).mean(axis=1).flatten()
            stft_features.append(features)
        stft_band_features[band] = np.array(stft_features)
    return stft_band_features


# Function to extract Hilbert features for each band.
def extract_hilbert_features(epochs, freq_bands):
    hilbert_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        hilbert_features = []
        for epoch in band_data:
            analytic_signal = hilbert(epoch)
            amplitude_envelope = np.abs(analytic_signal)
            features = amplitude_envelope.mean(axis=1)
            hilbert_features.append(features)
        hilbert_band_features[band] = np.array(hilbert_features)
    return hilbert_band_features


# Load the EEG data.
subjects = ["s1","s2","s3","s4","s5","s6"]
for subject in subjects: 
    EEG_data_path = f'./Subject Data/{subject.upper()}/'
    epochs = mne.read_epochs(EEG_data_path + f"{subject}_ar_epo.fif", preload=True)
    epochs = epochs[3::4]
    dutch_epochs = epochs['21', '22', '23']
    
    # Label the data (Dutch-only).
    dutch_epochs_21 = dutch_epochs['21'].get_data()
    dutch_epochs_22 = dutch_epochs['22'].get_data()
    dutch_epochs_23 = dutch_epochs['23'].get_data()
    Y = np.concatenate([np.zeros(len(dutch_epochs_21)), np.ones(len(dutch_epochs_22)), np.full(len(dutch_epochs_23), 2)])
    
    # Define the pipeline and parameter grid.
    pipeline = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
    param_grid = {
        'svc__kernel': ['rbf'],
        'svc__gamma': [0.01],
        'svc__C': [1]
    }
    
    # Method names and their corresponding feature extraction functions.
    methods = {
        "PSD": band_power_psd,
        "Wavelet": extract_wavelet_features,
        "STFT": extract_stft_features,
        "Hilbert": extract_hilbert_features
    }

    # Loop through each method and evaluate the performance.
    results = []

    for method_name, method_func in methods.items():
        print(f"Evaluating method: {method_name}")

        # Extract features using the current method in the loop.
        band_features = method_func(dutch_epochs, frequency_bands)

        for band_name, features in band_features.items():
            print(f"Evaluating band: {band_name}")
            X = features.reshape(features.shape[0], -1) # to 2D.

            # Split the data and perform the grid search.
            X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.25)
            grid_search = GridSearchCV(pipeline, param_grid, cv=3, scoring='accuracy', n_jobs=-1)
            grid_search.fit(X_train, y_train)

            # Evaluate the model.
            y_pred = grid_search.predict(X_test)
            report = classification_report(y_test, y_pred, output_dict=True)
            accuracy = accuracy_score(y_test, y_pred)

            results.append({
                "subject": subject,
                "method": method_name,
                "band": band_name,
                "accuracy": accuracy,
                "best_params": grid_search.best_params_,
                "classification_report": report
            })

            print(f"Method: {method_name}, Band: {band_name}")
            print(f"Accuracy: {accuracy}")
            print(classification_report(y_test, y_pred))
            print(f"Best Parameters: {grid_search.best_params_}")
            print("\n")

            
    # Convert results to DataFrame and save to CSV.
    results_df = pd.DataFrame(results)
    results_df.to_csv(f"{subject}_results_valence_nl.csv", index=False)
    print(f"Results for subject {subject} saved to {subject}_results.csv")

# Emotional Valence (English-only, within-subject)

In [None]:
import mne
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
import pywt
from scipy.signal import stft, hilbert


# Load the EEG data
subject_name = "s1"
EEG_data_path = f'./Subject Data/{subject_name.upper()}/'
epochs = mne.read_epochs(EEG_data_path + f"{subject_name}_ar_epo.fif", preload=True)
english_epochs = epochs['21', '22', '23']
iir_params = dict(order=4, ftype='butter')


# Define frequency bands.
frequency_bands = {
    "delta": (1, 4),
    "theta": (4, 8),
    "alpha": (8, 12),
    "beta": (12, 30),
    "gamma_low": (30, 48),
    "gamma_high": (48, 68)
}


# Function to extract power in each band using PSD.
def band_power_psd(epochs, freq_bands):
    psds, freqs = epochs.compute_psd().get_data(return_freqs=True)
    band_powers = {}
    for band, (fmin, fmax) in freq_bands.items():
        freq_mask = (freqs >= fmin) & (freqs <= fmax)
        band_powers[band] = np.mean(psds[:, :, freq_mask], axis=2)
    return band_powers


# Function to extract wavelet features for each band.
def extract_wavelet_features(epochs, freq_bands, wavelet='sym8'):
    wavelet_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        wavelet_features = []
        for epoch in band_data:
            coeffs = pywt.wavedec(epoch, wavelet, level=3)
            features = np.hstack([np.mean(c, axis=1) for c in coeffs])
            wavelet_features.append(features)
        wavelet_band_features[band] = np.array(wavelet_features)
    return wavelet_band_features


# Function to extract STFT features for each band.
def extract_stft_features(epochs, freq_bands, fs=256, nperseg=256):
    stft_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        stft_features = []
        for epoch in band_data:
            _, _, Zxx = stft(epoch, fs=fs, nperseg=nperseg)
            features = np.abs(Zxx).mean(axis=1).flatten()
            stft_features.append(features)
        stft_band_features[band] = np.array(stft_features)
    return stft_band_features


# Function to extract Hilbert features for each band.
def extract_hilbert_features(epochs, freq_bands):
    hilbert_band_features = {}
    for band, (fmin, fmax) in freq_bands.items():
        band_data = epochs.copy().filter(fmin, fmax, method='iir', iir_params=iir_params).get_data()
        hilbert_features = []
        for epoch in band_data:
            analytic_signal = hilbert(epoch)
            amplitude_envelope = np.abs(analytic_signal)
            features = amplitude_envelope.mean(axis=1)
            hilbert_features.append(features)
        hilbert_band_features[band] = np.array(hilbert_features)
    return hilbert_band_features


# Load the EEG data.
subjects = ["s1","s2","s3","s4","s5","s6"]
for subject in subjects: 
    EEG_data_path = f'./Subject Data/{subject.upper()}/'
    epochs = mne.read_epochs(EEG_data_path + f"{subject}_ar_epo.fif", preload=True)
    epochs = epochs[3::4]
    english_epochs = epochs['11', '12', '13']
    
    # Label the data (Dutch-only).
    english_epochs_11 = english_epochs['11'].get_data()
    english_epochs_12 = english_epochs['12'].get_data()
    english_epochs_13 = english_epochs['13'].get_data()
    Y = np.concatenate([np.zeros(len(english_epochs_11)), np.ones(len(english_epochs_12)), np.full(len(english_epochs_13), 2)])
    
    # Define the pipeline and parameter grid.
    pipeline = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
    param_grid = {
        'svc__kernel': ['rbf'],
        'svc__gamma': [0.01],
        'svc__C': [1]
    }
    
    # Method names and their corresponding feature extraction functions.
    methods = {
        "PSD": band_power_psd,
        "Wavelet": extract_wavelet_features,
        "STFT": extract_stft_features,
        "Hilbert": extract_hilbert_features
    }
    
    # Loop through each method and evaluate the performance.
    results = []

    for method_name, method_func in methods.items():
        print(f"Evaluating method: {method_name}")

        # Extract features using the current method in the loop.
        band_features = method_func(english_epochs, frequency_bands)

        for band_name, features in band_features.items():
            print(f"Evaluating band: {band_name}")
            X = features.reshape(features.shape[0], -1) # to 2D.

            # Split the data and perform the grid search.
            X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.25)
            grid_search = GridSearchCV(pipeline, param_grid, cv=3, scoring='accuracy', n_jobs=-1)
            grid_search.fit(X_train, y_train)
            
            # Evaluate the model.
            y_pred = grid_search.predict(X_test)
            report = classification_report(y_test, y_pred, output_dict=True)
            accuracy = accuracy_score(y_test, y_pred)

            results.append({
                "subject": subject,
                "method": method_name,
                "band": band_name,
                "accuracy": accuracy,
                "best_params": grid_search.best_params_,
                "classification_report": report
            })

            print(f"Method: {method_name}, Band: {band_name}")
            print(f"Accuracy: {accuracy}")
            print(classification_report(y_test, y_pred))
            print(f"Best Parameters: {grid_search.best_params_}")
            print("\n")

            
    # Convert results to DataFrame and save to CSV.
    results_df = pd.DataFrame(results)
    results_df.to_csv(f"{subject}_results_valence_en.csv", index=False)
    print(f"Results for subject {subject} saved to {subject}_results.csv")