In [352]:
import numpy as np
import pyxdf
import mne
from mne.datasets import misc
import matplotlib
import matplotlib.pyplot as plt

matplotlib.use('TkAgg')

Read XDF file

In [353]:
#Load xdf file
xdf_file_path = "data/CurrentStudy/sub-P001/ses-S200/eeg/sub-P001_ses-S200_task-Default_run-001_eeg.xdf"
#xdf_file_path = "data/CurrentStudy/sub-P001/ses-S201_FaceDelay/eeg/sub-P001_ses-S201_FaceDelay_task-Default_run-001_eeg.xdf"
streams, header = pyxdf.load_xdf(xdf_file_path)

Stream 2: Calculated effective sampling rate 8.8342 Hz is different from specified rate 500.0000 Hz.


We need to create an mne object in order to use filter and epoch functions. Extract the elements from the dictiornaries.
streams[0] contains info about markers and streams[1] contains info about EEG data

**Creates RAW data object**

In [354]:
eeg_data_raw = streams[1]['time_series']
sampling_rate = streams[1]['info']['nominal_srate'][0]
eeg_data = mne.filter.filter_data(eeg_data_raw.T.astype(np.float64),sampling_rate,2,16)
eeg_timestamps = streams[1]['time_stamps']

#Extract channel names
number_channels = np.shape(streams[1]['info']['desc'][0]['channels'][0]['channel'])[0]
ch_names = []
ch_pos = {}
for i in range(number_channels):
    channel_label = streams[1]['info']['desc'][0]['channels'][0]['channel'][i]['label'][0]
    pos_x = np.float64(streams[1]['info']['desc'][0]['channels'][0]['channel'][i]['location'][0]['X'][0])#Get X channel position
    pos_y = np.float64(streams[1]['info']['desc'][0]['channels'][0]['channel'][i]['location'][0]['Y'][0])#Get Y channel position
    pos_z = np.float64(streams[1]['info']['desc'][0]['channels'][0]['channel'][i]['location'][0]['Z'][0])#Get Z channel position
    
    ch_names.append(channel_label)
    ch_pos[channel_label] = [pos_x,pos_y,pos_z]
   
ch_types = ['eeg'] * len(ch_names)


Setting up band-pass filter from 2 - 16 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 2.00
- Lower transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 1.00 Hz)
- Upper passband edge: 16.00 Hz
- Upper transition bandwidth: 4.00 Hz (-6 dB cutoff frequency: 18.00 Hz)
- Filter length: 825 samples (1.650 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


**Create events to use for the epoching phase**

In [355]:
#https://mne.tools/stable/generated/mne.Epochs.html
marker_id = np.squeeze(streams[0]['time_series'])
marker_timestamps = streams[0]['time_stamps']
marker_timestamps =  marker_timestamps - eeg_timestamps[0]

#Creates event array, for this we need event_sample and event_id. Our event sample will be int(marker_timestamps[i]) and event_id as define below
events = []
for i,marker in enumerate(marker_id):
    event_id = {'S10': 1, 'S11': 2}[marker] #Iterates over marker_id and every time it finds an S10 it assigns a 1 to event_id, 
                                                  #every time it finds an S11 it assigns a 2 to event_id list and then append the value to
                                                  #events
    #events.append([int((marker_timestamps[i]-6.889)*1000), 0, event_id])
    events.append([marker_timestamps[i], 0, event_id])
                                          
#Why do we use numeric values ​​instead of directly using S10 or S11? 
# Some functions can expect number values ​​as parameters, so it is more convenient to use a dictionary and assign 
# S10 and S11 a numeric representation. In case we need to use a string, it is easier to pass this number value 
# to a string or use the dictionary to index its key (S10 or S11)

In [356]:
duplicate_events = []
timestamps = [event[0] for event in events]

for i in range(len(events)):
    if timestamps.count(events[i][0]) > 1:
        duplicate_events.append(i)

print("Events with the same timestamps:")
for index in duplicate_events:
    print(events[index])

Events with the same timestamps:


In [357]:
# Create info object
info = mne.create_info(ch_names, sfreq=sampling_rate, ch_types=ch_types)
#Set channel positions
montage = mne.channels.make_dig_montage(ch_pos)
info.set_montage(montage)

raw = mne.io.RawArray(eeg_data, info)

Creating RawArray with float64 data, n_channels=24, n_times=208787
    Range : 0 ... 208786 =      0.000 ...   417.572 secs
Ready.


  info.set_montage(montage)


Plot EEG signal

In [360]:
raw.plot(events=np.asarray(events),scalings=dict(eeg=1e1),show_options=True)

<MNEBrowseFigure size 800x800 with 4 Axes>

Channels marked as bad:
none


Plot PSD

In [359]:
raw.compute_psd(fmax = 30).plot()

Effective window size : 4.096 (s)


  raw.compute_psd(fmax = 30).plot()


<MNELineFigure size 1000x350 with 2 Axes>

**Implement functions for epoching**

In [201]:
def find_nearest_indices(arr1, arr2):
    indices = []
    for value in arr1:
        abs_diff = np.abs(arr2 - value)
        nearest_index = np.argmin(abs_diff) #The minimum absolute value between the arrays is calculated. For example if arr2 has elements 454 and 456 and value is 455, 
                                            #the subtraction abs(454-455) = 1 and abs(456-455) = 1, how then select only one of them? np.argmin() returns only the index of 
                                            #the first minimum found in case there are several minimum values ​​in the array.
        indices.append(nearest_index)
    return indices

def epoch(raw,marker_timestamps,marker_id,marker,tmin,tmax,sampling_rate):
    #Let's go and find how many samples tmin and tmax are equal to. Remember: Samples are integers/discrete values, thats why we have to cast to int
    tmin_samples = np.abs(int(tmin*int(sampling_rate)))
    tmax_samples = np.abs(int(tmax*int(sampling_rate)))
    time_points = np.round(marker_timestamps[np.where(marker_id == marker)],3)
    indices = find_nearest_indices(time_points,raw.times)
    epochs = []
    for trial in range(len(indices)):
        epochs.append(np.array(raw.get_data())[:,indices[trial] - tmin_samples:indices[trial] + tmax_samples])
        
    epochs = np.array(epochs)#Convert to numpy array
    return epochs

tmin = -0.5
tmax = 1
event_epochs = epoch(raw,marker_timestamps,marker_id,'S10',tmin,tmax,sampling_rate)
non_event_epochs = epoch(raw,marker_timestamps,marker_id,'S11',tmin,tmax,sampling_rate)

**Visualization**

In [368]:
time = np.arange(tmin,tmax,1/int(sampling_rate)) #This is not the general time but an snapshop from -0.500 to 1.000 using the sample sampling frequency used in data collection
event_avg = np.mean(event_epochs,axis = 0)
non_event_avg = np.mean(non_event_epochs,axis = 0)

fig, axs = plt.subplots(int(len(ch_names)/4), int(len(ch_names)/6))

#Select a window to look for maximum
lower_idx = np.where(time >= 0.250)[0][0]
upper_idx = np.where(time <= 0.305)[0][-1]

plot_next_row = 0
plot_next_column = 0
for ch in range(int(len(ch_names))):
    if ch%6 == 0 and ch != 0:
        plot_next_row = 0
        plot_next_column = plot_next_column + 1 
    max_idx = np.where(event_avg == max(event_avg[ch,lower_idx:upper_idx]))[1][0]
    axs[plot_next_row, plot_next_column].plot(time,event_avg[ch,:], color = 'lightgreen',label="Event")
    axs[plot_next_row, plot_next_column].plot(time[max_idx], event_avg[ch,max_idx], marker='x', color='red')
    axs[plot_next_row, plot_next_column].plot(time,non_event_avg[ch,:], color = 'blue',label="Non Event")
    axs[plot_next_row, plot_next_column].plot(time[max_idx], non_event_avg[ch,max_idx], marker='x', color='red')
    axs[plot_next_row, plot_next_column].set_title('Ch ' + str(ch + 1) + ' :' + ch_names[ch])
    axs[plot_next_row, plot_next_column].text(time[max_idx], 0, str(np.round(time[max_idx],3)), fontsize=9, color='black', ha='center', va='bottom')
    axs[plot_next_row, plot_next_column].grid(True)
    
    plot_next_row = plot_next_row + 1


fig.subplots_adjust(hspace=0.9)
plt.show()

**Feature extraction**

In [None]:
time = np.arange(tmin,tmax,1/int(sampling_rate))
event_avg = np.mean(event_epochs,axis = 0)
non_event_avg = np.mean(non_event_epochs,axis = 0)

fig, axs = plt.subplots(int(len(ch_names)/4), int(len(ch_names)/6))

#Select a window to look for maximum
lower_idx = np.where(time >= 0.250)[0][0]
upper_idx = np.where(time <= 0.305)[0][-1]

plot_next_row = 0
plot_next_column = 0
for ch in range(int(len(ch_names))):
    if ch%6 == 0 and ch != 0:
        plot_next_row = 0
        plot_next_column = plot_next_column + 1 
    max_idx = np.where(event_avg == max(event_avg[ch,lower_idx:upper_idx]))[1][0]
    axs[plot_next_row, plot_next_column].plot(time,event_avg[ch,:], color = 'lightgreen',label="Event")
    axs[plot_next_row, plot_next_column].plot(time[max_idx], event_avg[ch,max_idx], marker='x', color='red')
    axs[plot_next_row, plot_next_column].plot(time,non_event_avg[ch,:], color = 'blue',label="Non Event")
    axs[plot_next_row, plot_next_column].plot(time[max_idx], non_event_avg[ch,max_idx], marker='x', color='red')
    axs[plot_next_row, plot_next_column].set_title('Ch ' + str(ch + 1) + ' :' + ch_names[ch])
    axs[plot_next_row, plot_next_column].text(time[max_idx], 0, str(np.round(time[max_idx],3)), fontsize=9, color='black', ha='center', va='bottom')
    axs[plot_next_row, plot_next_column].grid(True)
    
    plot_next_row = plot_next_row + 1


fig.subplots_adjust(hspace=0.9)
plt.show()