In [None]:
%reset -f

In [3]:
# ~~~~~~~~~~~~~~ Libraries
import sys, os
import mne # Python package for processing and analyzing electrophysiological data
import numpy as np
from glob import glob # look for all the pathnames matching a specified pattern according to the rules
import matplotlib.pyplot as plt
from mne.preprocessing import ICA # ICA (Independent Component Analysis) algorithm, which is for artifact removal
import json
import re

from sklearn.model_selection import StratifiedKFold
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from scipy.stats import permutation_test
import matplotlib.pyplot as plt
from itertools import combinations

import pandas as pd
import seaborn as sns
import pickle

# Average time series for one channel

In [None]:

# Set the working directory
path = "/u/kazma/MINT/data/interim/visual"
sub_folders = [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f))]
sub_folders_sorted = sorted(sub_folders, key=lambda x: int(re.search(r'\d+', x).group())) # Sort the folders based on the numeric part after "sub-"



for sub_loop in sub_folders_sorted:

    # subject folder 
    sub_filename = os.path.join(path, sub_loop, 'RDM_epochs-epo.fif')

    # Load epochs
    epochs = mne.read_epochs(sub_filename, preload=True)
    print(f"{sub_loop} is analyzed")

    eeg_channels = epochs.pick_types(eeg=True).ch_names
    print("EEG Channels:", eeg_channels)
    print("Number of EEG Channels:", len(eeg_channels))
    
    # Get the condition names
    condition_names = list(epochs.event_id.keys())
    evoked_list = []

    # Compute the averaged epoch for each condition and each channel
    for condition in condition_names:
        evoked = epochs[condition].average()  # Average the epochs for the condition
        evoked_list.append(evoked)

    # Store 6 vectors, representing averaged vector for each condition
    averaged_data = {}

    # Specify the channel to plot (e.g., channel name 'Cz')
    channel_name = 'Fz'  # Replace with the channel of your choice

    # Figure
    plt.figure(figsize=(8, 4))

    # Define colors for the conditions
    colors = plt.cm.tab10(np.linspace(0, 1, 6))

    for i, evoked in enumerate(evoked_list):
        # Extract data for the specified channel
        channel_index = evoked.ch_names.index(channel_name)  # Get index of the channel
        channel_data = evoked.data[channel_index]  # Get the data for the specific channel
        times = evoked.times * 1000  # Convert time to milliseconds

        # Store the data
        averaged_data[condition_names[i]] = channel_data
        
        # Plot the channel data
        plt.plot(times, channel_data, linewidth=3, alpha=0.5) 
        plt.plot(times, channel_data, label=condition_names[i], linewidth=2)  # Plot time series

    # Customize the plot
    plt.title('Average Time Series for Channel: ' + channel_name)
    plt.xlabel('Time (ms)')
    plt.ylabel('Amplitude (µV)')
    plt.axhline(0, color='black', linestyle='--', linewidth=0.5)  # Add a horizontal line at y=0
    plt.axvline(0, color='black', linestyle='-', linewidth=1)  # Add a vertical line at t=0
    plt.xlim(-20, 900)  # Set x-axis limits
    # plt.ylim(-2.0e-8, 2.0e-8)  # Set y-axis limits
    # plt.legend(ncol=3, loc='lower left', framealpha=1)

    # Set the box line color to gray
    ax = plt.gca()
    ax.spines['top'].set_color('gray')
    ax.spines['right'].set_color('gray')
    ax.spines['bottom'].set_color('gray')
    ax.spines['left'].set_color('gray')

    plt.tight_layout()
    plt.show()



# RDM

In [2]:
# Set the working directory
path = "/u/kazma/MINT/data/interim/visual"
sub_folders = [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f))]
sub_folders_sorted = sorted(sub_folders, key=lambda x: int(re.search(r'\d+', x).group())) # Sort the folders based on the numeric part after "sub-"


In [None]:
# SUB_LOOP
# for subject in sub_folders_sorted

# subject folder 
sub_filename = os.path.join(path, 'sub-01_ses-01', 'RDM_epochs-epo.fif')

# Load epochs
epochs = mne.read_epochs(sub_filename, preload=True)

# Get the information about the data
conditions = list(epochs.event_id.keys())
n_conditions = len(conditions)
# n_trials = len(epochs)
n_channels = epochs.get_data().shape[1]


n_samples = epochs.get_data().shape[2]
min_time = epochs.times[0]*1000   # First time point in milli seconds
max_time = epochs.times[-1]*1000    # Last time point in milli seconds


# number of samples for each window
window_size = 5 # 1 sample = 2ms, 5 samples = 10 ms

df_dict = {}
for condition in conditions:
    x = epochs[condition]
    x = x.get_data()
    x = x[:,:,0:5]
    x = x.reshape(x.shape[0], -1)
    df_dict[condition] = x


# Define the order for conditions
condition_order = {'singledot': 0, 'totaldot': 1, 'circum': 2}
# Sort dictionary by first number and then by condition order
sorted_condition_dict = dict(
    sorted(
        df_dict.items(),
        key=lambda item: (
            int(item[0].split('_')[0]),
            condition_order.get(item[0].split('_')[2], 99),
            int(item[0].split('_')[-1].split('.')[0])  # Sort by condition order, 99 as default for unmatched
        )
    )
)

pairwise_dissimilar_dict = {}

# CONDITION_LOOP
for i in range(n_conditions):
    for j in range(i + 1, n_conditions):
        
        cond1 = list(sorted_condition_dict.keys())[i]
        cond2 = list(sorted_condition_dict.keys())[j]
        data_cond1 = sorted_condition_dict[cond1]
        data_cond2 = sorted_condition_dict[cond2]
        # Correlation
        corr_matrix = np.corrcoef(data_cond1, data_cond2)
        pairwise_dissimilar_dict[(cond1, cond2)] = 1 - np.abs(corr_matrix[0, 1])
        print(f"Correlation for {cond1} vs {cond2} in time points {window_size}: {np.abs(corr_matrix[0, 1]):.2f}")

In [None]:



""" # Window loop start
for start in range(0, n_samples, window_size): # Loop over the epoch in steps of `window_size` to extract each 5-sample window
    # Check if there are enough samples left for a full window
    if start + window_size <= n_samples:

        # Initialize the decoding dictionary with each condition containing a list of flattened sample windows
        decoding_dict = {}

        for condition in conditions:
            x = epochs[condition] 
            x = x.get_data() # the shape: (n_trials, n_channels, n_times)
            x = x[:,:,start:start + window_size] # 5 sample is a window size
            x = x.reshape(x.shape[0], -1)
            decoding_dict[condition] = x


        # Dictionary to store decoding results for each pair
        pairwise_decoding_accuracies = {}

        for i in range(n_conditions):
            for j in range(i + 1, n_conditions):

                cond1 = conditions[i]
                cond2 = conditions[j]
                    
                # Classification
                data_cond1 = decoding_dict[cond1]
                data_cond2 = decoding_dict[cond2]
                data = np.vstack((data_cond1, data_cond2))
                labels = np.hstack((np.zeros(len(data_cond1)), np.ones(len(data_cond2))))

                # Set cross-validation
                cv = StratifiedKFold(n_splits=n_splits) # StratifiedKFold ensures that each fold has a proportional representation of both classes, so each fold maintains a 50:50 balance of numerosity 1 and numerosity 2 trials.

                # Time-resolved decoding storage
                decoding_accuracies = []

                # Cross-validation for this time point
                for train_idx, test_idx in cv.split(data, labels):
                    X_train, X_test = data[train_idx], data[test_idx] # the EEG data for training and testing.
                    y_train, y_test = labels[train_idx], labels[test_idx] # corresponding labels for the training and testing data.
                    
                    clf = SVC(kernel='linear') # initializes a support vector machine (SVM) classifier with a linear kernel.
                    clf.fit(X_train, y_train) # trains the classifier on the training data (X_train) with labels (y_train).
                    y_pred = clf.predict(X_test) 
                    decoding_accuracies.append(accuracy_score(y_test, y_pred)) # calculates the classification accuracy by comparing the true labels (y_test) with the predicted labels (y_pred).

                avg_accuracy = np.mean(decoding_accuracies)
                # Store the average accuracy for the condition pair
                pairwise_decoding_accuracies[(cond1, cond2)] = avg_accuracy

                print(f"Average accuracy for {cond1} vs {cond2} in time points {start + window_size}: {avg_accuracy:.2f}")


    # Average accuracy for this time point across CV folds
    all_decoding_accuracy.append(pairwise_decoding_accuracies)
    print(f"Time point {start + window_size} is done") """