In [1]:
import mne
import pyedflib
import numpy as np
import os
from scipy.interpolate import griddata
import matplotlib.pyplot as plt
import h5py

mne.set_log_level('WARNING')

In [2]:
# Validation
# maxshape=(None,32,32,3) maxshape=(None,1)
# Validation samples
f = h5py.File("X_validation_linear.h5","a")
dset_1 = f.create_dataset('validation', shape=(1800,32,32,3), maxshape=(1800,32,32,3), compression="gzip", compression_opts=9)

# Testing labels
f = h5py.File("Y_validation_linear.h5","a")
dset_2 = f.create_dataset('val_labels', shape=(1800,1), maxshape=(1800,1), compression="gzip", compression_opts=9)

image_index = 0

round_labels = np.zeros((300,1))
label_index = 0

In [3]:
# Electrodes coordinates

# Get example file to extract electrode coordinates
path = "../../Dataset/S001"                                  # Read specific subject dataset
file_name = os.path.join(path, 'S001R04.edf')             # Read specific round of the subject

round_data = mne.io.read_raw_edf(file_name)
round_data.load_data()
round_data.rename_channels(lambda s: s.strip("."))

montage_kind = "standard_1020"
montage = mne.channels.make_standard_montage(montage_kind)

round_data.set_montage(montage, match_case=False)
round_data.set_eeg_reference("average")                    # ADD THIS IN THE LOOP AS WELL

montage_image = round_data.get_montage()
a = getattr(montage_image,'dig')

# Get coordinates of electrodes in a single array
elec_coord = np.zeros((67,2))

# Information about electrodes
elec_info = getattr(montage_image,'dig')

# Save x and y coordinates of LPA(index 0), nasion(index 1), RPA(index 2) and all electrodes (indeces 3-67) in the array
for i in range(0,67):
    xyz = elec_info[i].get('r')
    elec_coord[i] = [xyz[1],xyz[0]]

# Normalize coordinates
max_y = np.amax(elec_coord[:,0])
min_y = np.amin(elec_coord[:,0])

max_x = np.amax(elec_coord[:,1])
min_x = np.amin(elec_coord[:,1])

for i in range(0,67):
    
    # Normalize y coordinates
    # Positive
    if (elec_coord[i][0] > 0):
        elec_coord[i][0] = abs(round(16*abs(elec_coord[i][0]/max_y))-16)
    # Negative
    else:
        elec_coord[i][0] = round(15*abs(elec_coord[i][0]/(min_y)))+16
    
    # Normanize x coordinates
    # Positive
    if (elec_coord[i][1] > 0):
        elec_coord[i][1] = round(15*abs(elec_coord[i][1]/max_x))+16
    # Negative
    else:
        elec_coord[i][1] = abs(round(16*abs(elec_coord[i][1]/min_x))-16)

print(elec_coord)
montage_image.save("montage.fif")

# Electrode coordinates used to assign values and interpolate later
electrodes = np.zeros((64,2))

for k in range(0,64):
    electrodes[k] = elec_coord[k+3]

image_x, image_y = np.mgrid[0:32, 0:32]
interpolation = np.zeros((10,32,32,3))

[[16.  0.]
 [ 1. 16.]
 [16. 31.]
 [10.  2.]
 [ 9.  5.]
 [ 8. 10.]
 [ 8. 16.]
 [ 8. 22.]
 [ 9. 27.]
 [ 9. 30.]
 [14.  1.]
 [14.  4.]
 [13.  9.]
 [13. 16.]
 [13. 23.]
 [13. 28.]
 [14. 31.]
 [19.  1.]
 [19.  4.]
 [19.  9.]
 [19. 16.]
 [19. 23.]
 [19. 28.]
 [19. 31.]
 [ 1. 11.]
 [ 0. 16.]
 [ 1. 21.]
 [ 3.  6.]
 [ 1. 10.]
 [ 1. 16.]
 [ 1. 22.]
 [ 3. 26.]
 [ 7.  3.]
 [ 6.  4.]
 [ 5.  7.]
 [ 4. 11.]
 [ 4. 16.]
 [ 4. 21.]
 [ 4. 25.]
 [ 5. 28.]
 [ 6. 29.]
 [11.  1.]
 [11. 30.]
 [15.  0.]
 [15. 31.]
 [15.  0.]
 [15. 31.]
 [20.  0.]
 [19. 31.]
 [24.  3.]
 [24.  3.]
 [24.  6.]
 [24. 11.]
 [24. 16.]
 [24. 22.]
 [24. 26.]
 [24. 28.]
 [24. 29.]
 [27.  6.]
 [28.  9.]
 [28. 16.]
 [28. 22.]
 [27. 26.]
 [30. 11.]
 [30. 16.]
 [30. 21.]
 [31. 16.]]


In [4]:
# Make an array for windows to select for each task
windows_a = np.zeros((30,10)) # For subjects of category a

windows_a = [[0,2,4,6,8,10,12,14,16,18],
     [21,23,25,27,29,31,33,35,37,39],
     [41,43,45,47,49,51,53,55,57,59],
     [62,64,66,68,70,72,74,76,78,80],
     [82,84,86,88,90,92,94,96,98,100],
     [103,105,107,109,111,113,115,117,119,121],
     [123,125,127,129,131,133,135,137,139,141],
     [144,146,148,150,152,154,156,158,160,162],
     [164,166,168,170,172,174,176,178,180,182],
     [185,187,189,191,193,195,197,199,201,203],
     [205,207,209,211,213,215,217,219,221,223],
     [226,228,230,232,234,236,238,240,242,244],
     [246,248,250,252,254,256,258,260,262,264],
     [267,269,271,273,275,277,279,281,283,285],
     [287,289,291,293,295,297,299,301,303,305],
     [308,310,312,314,316,318,320,322,324,326],
     [328,330,332,334,336,338,340,342,344,346],
     [349,351,353,355,357,359,361,363,365,367],
     [369,371,373,375,377,379,381,383,385,387],
     [390,392,394,396,398,400,402,404,406,408],
     [410,412,414,416,418,420,422,424,426,428],
     [431,433,435,437,439,441,443,445,447,449],
     [451,453,455,457,459,461,463,465,467,469],
     [472,474,476,478,480,482,484,486,488,490],
     [492,494,496,498,500,502,504,506,508,510],
     [513,515,517,519,521,523,525,527,529,531],
     [533,535,537,539,541,543,545,547,549,551],
     [554,556,558,560,562,564,566,568,570,572],
     [574,576,578,580,582,584,586,588,590,592],
     [595,597,599,601,603,605,607,609,611,613]]

In [5]:
# Loop through all the files, pre-process data, extract images, extract labels and save them in ndarrays

path = "../../Dataset"                                  # Read specific subject dataset

# For subjects of group a (T0 duration = 4.1 s)
#for subject_id in subj_a:
for subject_id in range(60,61):             # Testing Dataset from Subject 60
        
    for round_id in range(4,15,2):
        
        file_name = os.path.join(path, './S{:003d}/S{:003d}R{:02d}.edf'.format(subject_id,subject_id,round_id))             # Read specific round of the subject
        print('./S{:003d}/S{:003d}R{:02d}.edf'.format(subject_id,subject_id,round_id))
        round_data = mne.io.read_raw_edf(file_name)
        round_data.load_data()
        round_data.rename_channels(lambda s: s.strip("."))
        array_rnd_data = round_data.get_data()
        
        # Extract labels for specific round
        # Extract events from annotations and use dictionary to specify required label - IT DEPENDS ON THE ROUND!
        # rest: 0 , left fist: 1, right fist: 2, both fists: 3, both feet: 4
        if (round_id == 4 or round_id == 8 or round_id == 12):
            dictionary = {'T0':0, 'T1':1, 'T2':2}
        
        elif (round_id == 6 or round_id == 10 or round_id == 14):
            dictionary = {'T0':0, 'T1':3, 'T2':4}
        
        ev_fr_ann, event_id = mne.events_from_annotations(round_data, event_id=dictionary)
        
        for i in range(0,30):
            round_labels[(10*i):(10*i)+10] = ev_fr_ann[i][2]   # 10 times the label because of 10 windows when extracting images of 0.4s
            # Save labels in the file to be used as training and testing labels
            with h5py.File('./../Y_validation_linear.h5','a') as hf:
                hf["val_labels"][label_index:(label_index+10)] = round_labels[(10*i):(10*i)+10]
                label_index = label_index + 10    

                    
        # TO DO: Maybe add band-pass filtering here for Delta, Mu and Beta ranges
        
        fft = mne.time_frequency.stft(array_rnd_data,64)
        
        for task in range(0,30):
            
            index = 0
            pixels = np.zeros((10,64,3))    # pixels(windows, electrodes, (R,G,B)) and each (R,G,B) corresponds to one electrode
            max_value = np.zeros(3)
            
            for window in windows_a[task]:    # Windows for each task/imagined movement
                for i in range(0,64):

                    # Extract value for Delta Band (0.5 - 4 Hz)
                    # Calculate sum of squares of absolute values
                    # all electrodes / freq_ranges 1,2,3 / window 1
                    pixels[index][i][0] = abs(fft[i][0][window])**2 + abs(fft[i][1][window])**2 + abs(fft[i][2][window])**2

                    # Extract value for Mu Band (8 - 13 Hz)
                    # Calculate sum of squares of absolute values
                    # electrode 1 / freq_ranges 4,5,6 / window 1 
                    pixels[index][i][1] = abs(fft[i][3][window])**2 + abs(fft[i][4][window])**2 + abs(fft[i][5][window])**2

                    # Extract value for Beta Band (13 - 30 Hz)
                    # Calculate sum of squares of absolute values
                    # electrode 1 / freq_ranges 7,8,9,10,11,12,13 / window 1 
                    pixels[index][i][2] = abs(fft[i][6][window])**2 + abs(fft[i][7][window])**2 + abs(fft[i][8][window])**2 + abs(fft[i][9][window])**2 + abs(fft[i][10][window])**2 + abs(fft[i][11][window])**2 + abs(fft[i][12][window])**2


                # Normalize R,G,B channels to fit the (0,1) range with the max value of each band    
                #print(np.amax(pixels))
                max_value[0] = np.amax(pixels[index,:,0]) # max value of Delta band
                max_value[1] = np.amax(pixels[index,:,1]) # max value of Mu band
                max_value[2] = np.amax(pixels[index,:,2]) # max value of Beta band

                for j in range(0,64):
                    for k in range(0,3):
                        pixels[index][j][k] = pixels[index][j][k]/max_value[k]
                        
                index = index+1
            
            
            # Assign pixel values to electrode coordinates and interpolate to create the image
            for index in range(0,10):
                interpolation[index] = griddata(electrodes,pixels[index], (image_x, image_y), method ='linear')
                
                # Save interpolated images in the file to be used as training and testing samples
                with h5py.File('./../X_validation_linear.h5','a') as hf:
                    hf["validation"][image_index] = interpolation[index]
                    image_index = image_index + 1
                
f.close()
print("Done.")

./S060/S060R04.edf
./S060/S060R06.edf
./S060/S060R08.edf
./S060/S060R10.edf
./S060/S060R12.edf
./S060/S060R14.edf
Done.
