In [3]:
# importing all the required Libraries
import mne
from mne_bids import BIDSPath, read_raw_bids
from mne import Epochs, find_events
from mne.preprocessing import ICA
from mne_icalabel import label_components
from mne.channels import make_standard_montage
from mne import find_events
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [4]:
#loading the BIDS data 
def loadData(sunId):
    bids_root = "/Users/sammy/Documents/SP EEG/pro_bids"
    #specify id in individual notebook
    subject_id = sunId 
    bids_path = BIDSPath(subject=subject_id,task="jacobsen",datatype='eeg', suffix='eeg',root=bids_root)
    raw = read_raw_bids(bids_path)
    raw.load_data()
    raw.set_montage('standard_1020',match_case=False, on_missing='ignore')
    #dropping because these channels are causing troubles in ICA.
    raw.drop_channels(['EXG1', 'EXG2', 'EXG3', 'EXG4','EXG5', 'EXG6', 'EXG7', 'EXG8'])
    return raw

In [5]:
#Filtering
def filterData(raw):
    #filtered at highpass 0.05Hz and Lowpass 50Hz
    raw.filter(l_freq = 0.05 , h_freq = 50)
    return raw

In [6]:
#channel information and channel names
def channelInfo(raw):
    print(raw.info['ch_names'])  # Lists all channel names
    print(raw.get_channel_types())  # Lists the types of channels

In [7]:
#for getting events
def eventName(raw):
    # Specify stimulus channel
    events = find_events(raw, stim_channel='Status', initial_event = True)
    print(events)

In [8]:
#epoching the data
def getEpochs(raw):
    events = find_events(raw, stim_channel='Status')  #stimuli channel
    event_id = {
        'Symmetrical': 1,  
        'Asymmetrical': 3
    }
    #epoching from -0.2s to 1s.
    epochs = Epochs(raw, events, event_id, tmin=-0.2, tmax=1.0, preload=True)
    return epochs

In [9]:
#applying baseline correction
def baselineCorrection(epochs):
    #from -0.2s to 0s
    epochs.apply_baseline(baseline=(-0.2,0))
    return epochs

In [10]:
#plotting epochs with events
def plotEpochs(epochs):
    symmetrical_epochs = epochs['Symmetrical']
    # Plotting symmetrical stimuli event
    symmetrical_epochs.plot()
    asymmetrical_epochs = epochs['Asymmetrical']
    # Plotting asymmetrical stimuli event
    asymmetrical_epochs.plot()

In [11]:
#ICA 
def performICA(raw):
    #intialising ICA
    ica = ICA(n_components=20, random_state=42, method='fastica')
    ica.fit(raw)
    
    # Using mne_icalabel to classify components
    ic_labels = label_components(raw, ica, method='iclabel')

    # Printing the classifications
    # print("ICLabel classifications:")
    # print(ic_labels)
    
    # Identify Eye, muscle artifacts and channel noise
    Artifact_comp = [i for i, label in enumerate(ic_labels['labels']) if label == 'eye blink' or 'channel noise' or 'muscle artifact']
    # print(f"Artifacts identified: {Artifact_comp}")
    
    # Excluding artifacts related components
    ica.exclude = Artifact_comp
    
    # Apply ICA cleaning to the raw data and plotting components
    raw_clean = ica.apply(raw)
    # ica.plot_components()
    x = range(0,20)
    # ica.plot_properties(raw,picks=x,psd_args={'fmax': 35.},reject=None);
    #labels of all components
    # print(ic_labels["labels"])

In [12]:
#plotting EEG with events as symmetrical or asymmetrical / Random patters 
def plotEvent(raw):
    event_id = {
        'Symmetrical': 1,  
        'Asymmetrical': 3
    }
    events = find_events(raw, stim_channel='Status')  # Specify your stim channel
    raw.plot(events=events, event_id=event_id)

In [13]:
#plotting EEG
def plotEEG(raw, start_time, end_time):
    data = raw.get_data()  # Shape: (n_channels, n_times)
    sfreq = raw.info['sfreq']  
    times = np.arange(data.shape[1]) / sfreq  # Time vector
    channel_names = raw.info['ch_names']  # List of channel names

    start, end = start_time, end_time  # Seconds
    start_idx, end_idx = int(start * sfreq), int(end * sfreq)

    for i, channel_name in enumerate(channel_names):
        plt.figure(figsize=(10, 4))
        plt.plot(times[start_idx:end_idx], data[i, start_idx:end_idx], label=channel_name, color='blue', linewidth=0.8)
        plt.title(f"EEG Signal - {channel_name} (Time {start}-{end}s)")
        plt.xlabel("Time (s)")
        plt.ylabel("Amplitude (µV)")
        plt.grid(True)
        plt.legend(loc="upper right")
        plt.tight_layout()
        plt.show()

In [14]:
#plotting power spectrum density
def plotPSD(raw):
    raw.plot_psd(fmin=0.05, fmax=50, spatial_colors=True, show=True);

In [15]:
#computing evoked resonse for event type and then averaging it
def calcEvoked(epochs):
    # for symmetrical
    evoked_symm = epochs['Symmetrical'].average()
    # print("PLOT FOR SYMMETRICAL STIMULI")
    # evoked_symm.plot()  # Basic waveform plot
    # evoked_symm.plot_joint();  # Enhanced plot with topography
    # for asymmetrical/random stimuli
    evoked_asymm = epochs['Asymmetrical'].average()
    # print("PLOT FOR ASYMMETRICAL / RANDOM STIMULI")
    # evoked_asymm.plot()  # Basic waveform plot
    # evoked_asymm.plot_joint();  # Enhanced plot with topography
    return (evoked_symm, evoked_asymm)

In [16]:
#finding peak and latency for P1 and N1
def findPeak(evoked):
    # Find P1 (positive peak) within 80-130 ms
    p1, p1_latency, p1_amplitude = evoked.get_peak(tmin=0.08, tmax=0.13, mode='pos', return_amplitude=True)
    # Find N1 (negative peak) within 140-200 ms
    n1, n1_latency, n1_amplitude = evoked.get_peak(tmin=0.14, tmax=0.2, mode='neg', return_amplitude=True)
    print(f"P1: Latency={p1_latency*1000:.1f} ms, Amplitude={p1_amplitude*1000000:.3f} µV")
    print(f"N1: Latency={n1_latency*1000:.1f} ms, Amplitude={n1_amplitude*1000000:.3f} µV")
    print("P1 at : ", p1, " ; N1 at : ", n1)
    evoked.plot_topomap(times=[p1_latency, n1_latency])
    P1 = [p1, p1_latency, p1_amplitude]
    N1 = [n1, n1_latency, n1_amplitude]
    return [P1,N1]

In [17]:
#plotting PO8 electrode events - Depicting P1 and N1
def drawEvokedEvents(evoked_symm, evoked_asymm, pick):
    mne.viz.plot_compare_evokeds([evoked_symm, evoked_asymm], picks=pick)

In [18]:
def getPicks(l1, l2):
    #to pick channels for P1 and N1
    lst = []
    lst.append(l1[0][0])
    lst.append(l1[1][0])
    lst.append(l2[0][0])
    lst.append(l2[1][0])
    lst.append('PO8')
    lst.append('PO7')
    s = set(lst)
    fl = list(s)
    return fl

In [19]:
#for getting Data in Dataframe form
def getData(evoked):
    #making copy so that original object stays intact
    x = evoked.copy()
    x.pick(['PO8','PO7']) #picking these 2 channels
    # Get the data and time points
    data = x.data  # Shape is (n_channels, n_times)
    times = x.times  # Time points in seconds
    
    # Convert data to a pandas DataFrame
    df = pd.DataFrame(data.T, columns = ['PO8','PO7'])  # Transpose to make time points as rows
    df['time'] = times  # Add time column
    return df

In [20]:
def loadEvoked(subId):
    raw = loadData(subId)
    raw = filterData(raw)
    performICA(raw)
    epochs = getEpochs(raw)
    epochs = baselineCorrection(epochs)
    evoked = calcEvoked(epochs)
    return evoked