## Setup file

Import files and set up custom functions

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import glob
from scipy.signal import find_peaks
import pingouin as pg

import mne

# Function to update event codes
def update_events(raw):
    events, labels = mne.events_from_annotations(raw, verbose='Warning')

    target_onset_trigger = labels['Comment/Time 0/']
    trigger_1 = labels['Stimulus/S  1']
    trigger_2 = labels['Stimulus/S  2']
    trigger_3 = labels['Stimulus/S  3']
    trigger_5 = labels['Stimulus/S  5']
    trigger_6 = labels['Stimulus/S  6']
    trigger_7 = labels['Stimulus/S  7']

    num_trig = [trigger_1, trigger_2, trigger_3, trigger_5, trigger_6, trigger_7]

    for i in range(len(events)):
        if events[i, 2] == target_onset_trigger:
            j = i
            while True:
                j += 1
                if events[j, 2] in num_trig:
                    if events[j, 2] in [trigger_1, trigger_7]:
                        events[i, 2] = 300
                    elif events[j, 2] in [trigger_2, trigger_6]:
                        events[i, 2] = 200
                    elif events[j, 2] in [trigger_3, trigger_5]:
                        events[i, 2] = 100
                    break

    labels['Stimulus/target_close'] = 100
    labels['Stimulus/target_mid'] = 200
    labels['Stimulus/target_far'] = 300

    return events, labels



    # Function to detect local peaks using find_peaks
def detect_peaks(evoked, tmin_n1=None, tmax_n1=None, tmin_p2=None, tmax_p2=None, tmin_p3b=None, tmax_p3b=None, peak_to_detect=None):
    times = evoked.times
    data = evoked.data[0, :]
    
    if peak_to_detect == 'N1':
        # Detect local minima (N1) within the specified time range
        idx_n1_window = np.where((times >= tmin_n1) & (times <= tmax_n1))[0]
        data_n1_window = data[idx_n1_window]
        n1_peaks, _ = find_peaks(-data_n1_window)  # Negate data to find minima
        if n1_peaks.size > 0:
            n1_peak_idx = idx_n1_window[n1_peaks[0]]
            n1_latency = times[n1_peak_idx]
            n1_amplitude = data[n1_peak_idx]
        else:
            n1_latency, n1_amplitude = np.nan, np.nan

        return n1_latency, n1_amplitude

    elif peak_to_detect == 'P2':
        # Detect local maxima (P2) within the specified time range
        idx_p2_window = np.where((times >= tmin_p2) & (times <= tmax_p2))[0]
        data_p2_window = data[idx_p2_window]
        p2_peaks, _ = find_peaks(data_p2_window)
        if p2_peaks.size > 0:
            p2_peak_idx = idx_p2_window[p2_peaks[0]]
            p2_latency = times[p2_peak_idx]
            p2_amplitude = data[p2_peak_idx]
        else:
            p2_latency, p2_amplitude = np.nan, np.nan

        return p2_latency, p2_amplitude

    else:
        # Detect both N1 and P2 peaks
        # Detect local minima (N1) within the specified time range
        idx_n1_window = np.where((times >= tmin_n1) & (times <= tmax_n1))[0]
        data_n1_window = data[idx_n1_window]
        n1_peaks, _ = find_peaks(-data_n1_window)  # Negate data to find minima
        if n1_peaks.size > 0:
            n1_peak_idx = idx_n1_window[n1_peaks[0]]
            n1_latency = times[n1_peak_idx]
            n1_amplitude = data[n1_peak_idx]
        else:
            n1_latency, n1_amplitude = np.nan, np.nan

        # Detect local maxima (P2) within the specified time range
        idx_p2_window = np.where((times >= tmin_p2) & (times <= tmax_p2))[0]
        data_p2_window = data[idx_p2_window]
        p2_peaks, _ = find_peaks(data_p2_window)
        if p2_peaks.size > 0:
            p2_peak_idx = idx_p2_window[p2_peaks[0]]
            p2_latency = times[p2_peak_idx]
            p2_amplitude = data[p2_peak_idx]
        else:
            p2_latency, p2_amplitude = np.nan, np.nan

        # Detect local maxima (P2) within the specified time range
        idx_p3b_window = np.where((times >= tmin_p3b) & (times <= tmax_p3b))[0]
        data_p3b_window = data[idx_p3b_window]
        p3b_peaks, _ = find_peaks(data_p3b_window)
        if p3b_peaks.size > 0:
            p3b_peak_idx = idx_p3b_window[p3b_peaks[0]]
            p3b_latency = times[p3b_peak_idx]
            p3b_amplitude = data[p3b_peak_idx]
        else:
            p3b_latency, p3b_amplitude = np.nan, np.nan
        
        return n1_latency, n1_amplitude, p2_latency, p2_amplitude, p3b_latency, p3b_amplitude





## Extract Grand averages from ppts

In [None]:
# Load preprocessed EEG data for each subject and calculate grand averages
subject_files = glob.glob('clean_seg_updated/*.vhdr')  # File path for EEG data
skip = []  # List of files/participants to skip
run = []   # Optional list of participants to include

# Initialize lists to store participant IDs and evoked responses
PID = []
overall_evoked = []
close_evoked = []
mid_evoked = []
far_evoked = []
peaks_list = []
mean_amp_list = []
evoked_df = pd.DataFrame()

# Loop through each EEG file (participant)
for file in subject_files:

    # Skip files in the 'skip' list
    if file in skip:
        print(f'Skipping {file} as it was in the skip subject list')
        continue

    else:
        # Extract participant ID from the filename and store it
        id = file.replace('clean_seg_updated/', "").replace('_seg_corrected_raw.vhdr', "")
        PID.append(id)
        print(f'Processing participant: {id}')

        # Try reading the raw data up to 3 times in case of errors
        try_count = 0
        while try_count < 3:
            try:
                raw = mne.io.read_raw_brainvision(file, preload=True, verbose='Warning')
                try_count += 1
            except:
                print('Unable to read bva file, trying again')

        # Set montage (electrode positions) for the EEG data
        events, labels = update_events(raw)
        montage = mne.channels.make_standard_montage("easycap-M1")
        raw = raw.set_montage(montage, verbose='Warning')

        # Define event dictionary for different stimulus conditions
        event_dict = {
            'target_close': labels['Stimulus/target_close'], 
            'target_mid': labels['Stimulus/target_mid'], 
            'target_far': labels['Stimulus/target_far']
        }

        # Create epochs (segments) for the EEG data based on events
        epochs = mne.Epochs(raw, events, event_id=event_dict, tmin=-0.2, tmax=0.8, preload=True, baseline=(-0.2, 0), verbose='Warning')
        print(len(epochs))  # Print the number of epochs

        # Append the average evoked response for overall and each condition
        overall_evoked.append(epochs.average())
        close_evoked.append(epochs['target_close'].average())
        mid_evoked.append(epochs['target_mid'].average())
        far_evoked.append(epochs['target_far'].average())

        # Store evoked data in a dictionary for later use
        evoked_dict = dict(PID=id, close=epochs['target_close'], mid=epochs['target_mid'], far=epochs['target_far'])

# Calculate grand averages for overall and each condition
overall_grand_average = mne.grand_average(overall_evoked)
close_grand_average = mne.grand_average(close_evoked)
mid_grand_average = mne.grand_average(mid_evoked)
far_grand_average = mne.grand_average(far_evoked)


## ERP and Topo maps joint

In [None]:
overall_joint = overall_grand_average.plot_joint()

In [None]:
topo_grand_average = overall_grand_average.copy()
topo_grand_average.drop_channels(['Fp1', 'Fp2'])
overall_topo = topo_grand_average.plot_topomap([-0.2, 0, 0.15, 0.25, 0.375, 0.5])


### P3 and P4

In [None]:
# Define time windows for N1 and P2 components
tmin_n1 = 0.12  # N1 start time
tmax_n1 = 0.18  # N1 end time
tmin_p2 = 0.18  # P2 start time
tmax_p2 = 0.25  # P2 end time

# Enable inline plotting for Jupyter notebooks
%matplotlib inline

# Copy the grand average data for channels P3 and P4
p3_grand_average = overall_grand_average.copy()
p4_grand_average = overall_grand_average.copy()

# Extract time and amplitude data for P3 and P4 channels
p3_times = p3_grand_average.pick(['P3']).times
p3_data = p3_grand_average.pick(['P3']).data[0]
p4_times = p4_grand_average.pick(['P4']).times
p4_data = p4_grand_average.pick(['P4']).data[0]

# Plot the grand average waveforms for P3 and P4
plt.figure(figsize=(10, 5))
plt.plot(p3_times, p3_data, label='P3 Grand Average')
plt.plot(p4_times, p4_data, label='P4 Grand Average')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude (µV)')
plt.legend()

# Highlight the N1 and P2 regions with shaded areas
plt.axvspan(tmin_n1, tmax_n1, color='blue', alpha=0.1, label='N1 Region')
plt.axvspan(tmin_p2, tmax_p2, color='red', alpha=0.1, label='P2 Region')

# Display the plot
plt.show()


In [None]:
# Define time windows for N1 and P2 components
tmin_n1 = 0.13  # N1 start time
tmax_n1 = 0.19  # N1 end time
tmin_p2 = 0.19  # P2 start time
tmax_p2 = 0.25  # P2 end time

# Enable inline plotting for Jupyter notebooks
%matplotlib inline

# Copy the grand average data for channels P7 and P8
p7_grand_average = overall_grand_average.copy()
p8_grand_average = overall_grand_average.copy()

# Extract time and amplitude data for P7 and P8 channels
p7_times = p7_grand_average.pick(['P7']).times
p7_data = p7_grand_average.pick(['P7']).data[0]
p8_times = p8_grand_average.pick(['P8']).times
p8_data = p8_grand_average.pick(['P8']).data[0]

# Plot the grand average waveforms for P7 and P8
plt.figure(figsize=(10, 5))
plt.plot(p7_times, p7_data, label='P7 Grand Average')
plt.plot(p8_times, p8_data, label='P8 Grand Average')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude (µV)')
plt.legend()

# Highlight the N1 and P2 regions with shaded areas
plt.axvspan(tmin_n1, tmax_n1, color='blue', alpha=0.1, label='N1 Region')
plt.axvspan(tmin_p2, tmax_p2, color='red', alpha=0.1, label='P2 Region')

# Display the plot
plt.show()


## Global Field Power plots

In [None]:
overall_gfp = overall_grand_average.plot(gfp=True, spatial_colors=True, ylim=dict(eeg=[-12, 12]))

## Evoked plots for selcted elctrodes

In [None]:
left_picks = ["P3", "P7", "O1"]
right_picks = ["P4", "P8", "O2"]

overall_left_plot = overall_grand_average.plot(picks=left_picks)
overall_right_plot = overall_grand_average.plot(picks=right_picks)

## Detect peaks using a local minima/maxima from scipy

In [None]:
# Get the list of EEG files to process
subject_files = glob.glob('clean_seg_updated/*.vhdr')  # Update with actual file path
skip = []  # List of files/participants to skip
run = []   # Optional list of participants to include

# Define time windows for N1, P2, and P3b components
tmin_n1 = 0.12
tmax_n1 = 0.18
tmin_p2 = 0.18
tmax_p2 = 0.25
tmin_p3b = 0.25
tmax_p3b = 0.5

peaks_list = []  # List to store detected peak data

# Loop through each EEG file (participant)
for file in subject_files:

    # Skip files in the 'skip' list
    if file in skip:
        print(f'Skipping {file} as it was in the skip subject list')
        continue

    else:
        # Extract participant ID from the filename and store it
        id = file.replace('clean_seg_updated/', "").replace('_seg_corrected_raw.vhdr', "")
        PID.append(id)
        print(f'Processing participant: {id}')

        # Try reading the raw data up to 3 times in case of errors
        try_count = 0
        while try_count < 3:
            try:
                raw = mne.io.read_raw_brainvision(file, preload=True, verbose='Warning')
                try_count += 1
            except:
                print('Unable to read bva file, trying again')

        # Update the events and labels from the raw EEG data
        events, labels = update_events(raw)

        # Set the montage (electrode positions) for the EEG data
        montage = mne.channels.make_standard_montage("easycap-M1")
        raw.set_montage(montage, verbose='Warning')

        # Create an event dictionary for different stimulus conditions
        event_dict = {
            'target_close': labels['Stimulus/target_close'], 
            'target_mid': labels['Stimulus/target_mid'], 
            'target_far': labels['Stimulus/target_far']
        }

        # Create epochs (segments) for the EEG data based on events
        epochs = mne.Epochs(raw, events, event_id=event_dict, tmin=-0.2, tmax=0.7, preload=True, baseline=(None, 0), verbose='Warning')

        # Store evoked responses for each condition (close, mid, far)
        evoked_dict = dict(close=epochs['target_close'], mid=epochs['target_mid'], far=epochs['target_far'])

        # Loop through conditions and channels to detect peaks
        for condition, evoked in evoked_dict.items():
            for channel in ['P3', 'P4', 'P7', 'P8']:

                # Define time windows based on channel (different for P3/P4 vs P7/P8)
                if channel in ['P3', 'P4']:
                    tmin_n1 = 0.12
                    tmax_n1 = 0.18
                    tmin_p2 = 0.18
                    tmax_p2 = 0.25
                    tmin_p3b = 0.25
                    tmax_p3b = 0.5
                else:
                    tmin_n1 = 0.13
                    tmax_n1 = 0.19
                    tmin_p2 = 0.19
                    tmax_p2 = 0.25  
                    tmin_p3b = 0.25
                    tmax_p3b = 0.5

                # Detect peaks (N1, P2, P3b) in the evoked response for the channel
                evoked_channel = evoked.copy().average().pick([channel])
                n1_latency, n1_amplitude, p2_latency, p2_amplitude, p3b_latency, p3b_amplitude = detect_peaks(
                    evoked_channel, tmin_n1=tmin_n1, tmax_n1=tmax_n1, tmin_p2=tmin_p2, tmax_p2=tmax_p2, tmin_p3b=tmin_p3b, tmax_p3b=tmax_p3b)

                # Append detected peaks to the list
                peaks_list.append({
                    'PID': id,
                    'Condition': condition,
                    'Channel': channel,
                    'N1_Latency': n1_latency,
                    'N1_Amplitude': n1_amplitude,
                    'P2_Latency': p2_latency,
                    'P2_Amplitude': p2_amplitude,
                    'P3b_Latency': p3b_latency,
                    'P3b_Amplitude': p3b_amplitude
                })

# Convert the list of peaks into a DataFrame and save it to a CSV file
original_peaks_df = pd.DataFrame(peaks_list)
original_peaks_df.to_csv('all_subjects_peaks.csv')


## Run ANOVA on peaks for P3 and P4

In [None]:

peaks_df = original_peaks_df.copy()
peaks_df = peaks_df.loc[~peaks_df['PID'].isin(['A016', 'A025', 'A013', 'A014'])]
peaks_df = peaks_df.loc[peaks_df['Channel'].isin(['P3','P4'])]

# Add Hemisphere column based on Channel
peaks_df['Hemisphere'] = peaks_df['Channel'].map({'P3': 'Left', 'P4': 'Right'})
# means_df['Hemisphere'] = means_df['Channel'].map({'P3': 'Left', 'P4': 'Right'})

# Perform ANOVA for N1 Latency
anova_n1_latency = pg.rm_anova(data=peaks_df, dv='N1_Latency', within=['Condition', 'Hemisphere'], subject='PID')
print("ANOVA Results for N1 Latency:")
print(anova_n1_latency)

# Perform ANOVA for N1 Amplitude
anova_n1_amplitude = pg.rm_anova(dv='N1_Amplitude', within=['Condition', 'Hemisphere'], subject='PID', data=peaks_df)
print("ANOVA Results for N1 Amplitude:")
print(anova_n1_amplitude)

# Perform ANOVA for P2 Latency
anova_p2_latency = pg.rm_anova(dv='P2_Latency', within=['Condition', 'Hemisphere'], subject='PID', data=peaks_df)
print("ANOVA Results for P2 Latency:")
print(anova_p2_latency)

# Perform ANOVA for P2 Amplitude
anova_p2_amplitude = pg.rm_anova(dv='P2_Amplitude', within=['Condition', 'Hemisphere'], subject='PID', data=peaks_df)
print("ANOVA Results for P2 Amplitude:")
print(anova_p2_amplitude)

# Perform post-hoc tests using pairwise comparisons
post_hoc_results_N1 = pg.pairwise_ttests(dv='N1_Amplitude', 
                                      within=['Condition', 'Channel'], 
                                      subject='PID', 
                                      data=peaks_df, 
                                      padjust='bonferroni')
print(post_hoc_results_N1)

# Perform post-hoc tests using pairwise comparisons
post_hoc_results_N1_lat = pg.pairwise_ttests(dv='N1_Latency', 
                                      within=['Condition', 'Channel'], 
                                      subject='PID', 
                                      data=peaks_df, 
                                      padjust='bonferroni')
print(post_hoc_results_N1_lat)

# Perform post-hoc tests using pairwise comparisons
post_hoc_results_P2 = pg.pairwise_ttests(dv='P2_Amplitude', 
                                      within=['Condition', 'Channel'], 
                                      subject='PID', 
                                      data=peaks_df, 
                                      padjust='bonferroni')
print(post_hoc_results_P2)

anova_n1_latency['Measure'] = 'N1_lat'
anova_p2_latency['Measure'] = 'P2_lat'

anova_n1_amplitude['Measure'] = 'N1_amp'
anova_p2_amplitude['Measure'] = 'P2_amp'

anova_output = pd.concat([anova_n1_latency, anova_n1_amplitude, anova_p2_latency, anova_p2_amplitude])

first_column = anova_output.pop('Measure')
anova_output.insert(0, 'Measure', first_column) 

# anova_output.to_csv('anova_output.csv')



