In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
rootpath = '/content/drive/MyDrive/EEG Data/'

In [None]:
import torch
import scipy.io as sio
import numpy as np
from sklearn.metrics import accuracy_score
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt
from scipy.stats import zscore
import torch.optim as optim
import os
import torch.functional as F

In [None]:
torch.cuda.empty_cache()
torch.manual_seed(0)

<torch._C.Generator at 0x7eff1e984230>

In [None]:
class CNNLSTM(torch.nn.Module):
    """
    The codes implement the CNN model proposed in the paper "Subject-Independent Drowsiness Recognition from Single-Channel EEG with an Interpretable CNN-LSTM model".
    The network is designed to classify 1D drowsy and alert EEG signals for the purposed of driver drowsiness recognition.
      
    """
    def __init__(self):
        super(CNNLSTM, self).__init__()
        self.feature=32
        # self.padding= torch.nn.ReplicationPad2d((31,32,0,0))
        self.conv = torch.nn.Conv2d(1,self.feature,(1,64))#,padding=(0,32),padding_mode='replicate')     
        self.batch = Batchlayer(self.feature)          
        self.avgpool = torch.nn.AvgPool2d((1,8))
        self.fc = torch.nn.Linear(32, 2)        
        self.softmax=torch.nn.LogSoftmax(dim=1)
        self.softmax1=torch.nn.Softmax(dim=1)        
        self.lstm=torch.nn.LSTM(32, 2)
        # self.conv_new = torch.nn.Conv2d(30,1)
        
    def forward(self, source): 
        # source = self.padding(source)
        # source = self.conv_new(source)
        source = self.conv(source)
        source = F.relu(source)
        source = self.batch(source)
        
        source = torch.nn.ELU()(source) 
        source=self.avgpool(source)        
        source =source.squeeze()
        source=source.permute(2, 0, 1)
        source = self.lstm(source)
        source=source[1][0].squeeze()
        source = self.softmax(source)   

        return source 

"""
We use the batch normalization layer implemented by ourselves for this model instead using the one provided by the Pytorch library.
In this implementation, we do not use momentum and initialize the gamma and beta values in the range (-0.1,0.1). 
We have got slightly increased accuracy using our implementation of the batch normalization layer.
"""
def normalizelayer(data):
    eps=1e-05
    a_mean=data-torch.mean(data, [0,2,3],True).expand(int(data.size(0)), int(data.size(1)), int(data.size(2)),int(data.size(3)))
    b=torch.div(a_mean,torch.sqrt(torch.mean((a_mean)**2, [0,2,3],True)+eps).expand(int(data.size(0)), int(data.size(1)), int(data.size(2)),int(data.size(3))))
    
    return b

class Batchlayer(torch.nn.Module):
    def __init__(self, dim):
        super(Batchlayer, self).__init__()
        self.gamma=torch.nn.Parameter(torch.Tensor(1,dim,1,1))
        self.beta=torch.nn.Parameter(torch.Tensor(1,dim,1,1))
        self.gamma.data.uniform_(-0.1, 0.1)
        self.beta.data.uniform_(-0.1, 0.1)
        
    def forward(self, input):
        data=normalizelayer(input)
        gammamatrix=self.gamma.expand(int(data.size(0)), int(data.size(1)), int(data.size(2)),int(data.size(3)))
        betamatrix = self.beta.expand(int(data.size(0)), int(data.size(1)), int(data.size(2)),int(data.size(3)))
        
        return data*gammamatrix+betamatrix

In [None]:
def bp_filter(data, f_lo, f_hi, fs):
    """ Digital band pass filter (6-th order Butterworth)
    Args:
        data: numpy.array, time along axis 0
        (f_lo, f_hi): frequency band to extract [Hz]
        fs: sampling frequency [Hz]
    Returns:
        data_filt: band-pass filtered data, same shape as data """
    data_filt = np.zeros_like(data)
    f_ny = fs / 2.  # Nyquist frequency
    b_lo = f_lo / f_ny  # normalized frequency [0..1]
    b_hi = f_hi / f_ny  # normalized frequency [0..1]
    # band-pass filter parameters
    p_lp = {"N":6, "Wn":b_hi, "btype":"lowpass", "analog":False, "output":"ba"}
    p_hp = {"N":6, "Wn":b_lo, "btype":"highpass", "analog":False, "output":"ba"}
    bp_b1, bp_a1 = butter(**p_lp)
    bp_b2, bp_a2 = butter(**p_hp)
    data_filt = filtfilt(bp_b1, bp_a1, data, axis=0)
    data_filt = filtfilt(bp_b2, bp_a2, data_filt, axis=0)
    return data_filt

def get_EEG_data(data_root, filename):
    # Extract the data from one of these files.
    hz = 128
    #filename = 'eeg_record30.mat'
    mat = sio.loadmat(data_root + filename)
    data = pd.DataFrame.from_dict(mat["o"]["data"][0,0])

    # Limit the data to the 7 valid EEG leads.
    dat = data.filter(list(range(3, 17)))
    dat.columns = list(range(1, 15))
    dat = dat.filter([2, 3, 6, 7, 8, 9, 14], axis=1)
    labels = ['F7', 'F3', 'T5', 'O1', 'O2', 'T6', 'FP2']  # FP2 should really be AF4
    dat.columns = labels

    # Filter the data, high pass 2 Hz, low pass 40 Hz.
    lo, hi = 0.5, 40
    # Do the filtering.  get 0.5-40hz
    datf = bp_filter(dat.to_numpy(), lo, hi, hz)

    # Convert back to a dataframe.
    dat = pd.DataFrame({c: datf[:, i] for i,c in enumerate(labels)})

    # Z-transform each column
    dat = dat.apply(zscore)
    
    return dat

def plotAll(dat, leads, start, seconds, hz):
    plt.figure(figsize=(30,10))
    for i, lead in enumerate(leads):   
        length = dat.loc[start: start + seconds * hz, lead].shape
        print(length)
        plt.plot(dat.loc[start: start + seconds * hz, lead],label=lead)   
    plt.title('multi-channel EEG signal')
    plt.legend()
    plt.show()

def saveAll(dat,leads, start, seconds, hz):
    plt.figure(figsize=(30,10))
    for i, lead in enumerate(leads):   
        print(type(dat.loc[start: start + seconds * hz, lead]))
        dat.loc[start: start + seconds * hz, lead]
        
        
    # plt.title('multi-channel EEG signal')
    # plt.legend()
    # plt.show()

def signal_append(data,front,behind):
  # change data set shape to (length-2,front+dat.shape[1]+behind)
  # dat = dataset
  # front : the length of data append in the front of the data slice
  # behind : the length of data append behind the data slice
  # return dataset as numpy
    o_shape = data.shape # original data set shape
    n_shape = (data.shape[0]-2,front+data.shape[1]+behind)
    data_set = np.zeros((n_shape[0],n_shape[1]))
    # print(data_set.shape)
    for i,item in enumerate(data):
      if i == 0:
        continue
      if i == o_shape[0]-1:
        break
      data_pre = data[i-1]
      data_now = data[i]
      data_next = data[i+1]
      # print(i)
      data_set[i-1] = np.concatenate((data_pre[-1*front:],data_now,data_next[0:behind]),axis = 0)
    return data_set

def signal_slice(dat,channel, start, seconds, hz, time_len, front,behind):  
  # time_len : data time after slice in seconds
      data_shape = dat.shape
      new_set = []
      for i in range(start,start+time_len*hz,seconds*hz):
          new_set.append(dat.loc[i: i + seconds * hz-1, channel])
      new_set = np.array(new_set)
      new_set = signal_append(new_set,front, behind)
      return new_set

# def create_label():


In [None]:
filelist = os.listdir(rootpath)
print(filelist[0:-1])

['eeg_record1.mat', 'eeg_record10.mat', 'eeg_record12.mat', 'eeg_record11.mat', 'eeg_record13.mat', 'eeg_record14.mat', 'eeg_record16.mat', 'eeg_record15.mat', 'eeg_record18.mat', 'eeg_record17.mat', 'eeg_record2.mat', 'eeg_record19.mat', 'eeg_record20.mat', 'eeg_record21.mat', 'eeg_record22.mat', 'eeg_record23.mat', 'eeg_record25.mat', 'eeg_record24.mat', 'eeg_record27.mat', 'eeg_record26.mat', 'eeg_record29.mat', 'eeg_record28.mat', 'eeg_record30.mat', 'eeg_record3.mat', 'eeg_record31.mat', 'eeg_record33.mat', 'eeg_record32.mat', 'eeg_record34.mat', 'eeg_record5.mat', 'eeg_record4.mat', 'eeg_record7.mat', 'eeg_record6.mat', 'eeg_record8.mat', 'eeg_record9.mat']


In [None]:
from logging import root
channels = ['F7', 'F3', 'T5', 'O1', 'O2', 'T6', 'FP2']
start = 0
seconds = 3
hz = 128


# def gen_dataset(rootpath,channel):
#   data = pd.DataFrame()
#   label = pd.DataFrame()
#   for i,f in enumerate(filelist[0:-1]):
#     name = f[0:-4]
#       # print(name)  
#     dat = get_EEG_data(rootpath, f)
#     label[str(i)] = np.concatenate((np.zeros(199),np.ones(199)),axis=0).tolist()   
#     data[str(i)] = signal_slice(dat,channel,0,3,128,1200,31,32).tolist()
#   return data,label
 

def gen_dataset(rootpath,channel):
  data = []
  label = []
  for i,f in enumerate(filelist[0:-1]):
    name = f[0:-4]
      # print(name)  
    dat = get_EEG_data(rootpath, f)
    label.append(np.concatenate((np.zeros(199),np.ones(199)),axis=0))
    data.append(signal_slice(dat,channel,0,3,128,1200,31,32))
  return np.array(data),np.array(label)


# data.shape
dataset, labelset = gen_dataset(rootpath,'O1')



In [None]:
print(labelset.shape)

(34, 398)


In [None]:
def channel_data_per_subject(dataset,labelset):
  subject_data = pd.DataFrame()
  subject_label = pd.DataFrame()
  subject_num = 5
  # namelist = [f[0:-4] for f in filelist]
  # channels_t = ['O1','O2']
  s = 1
  data = []
  label = []
  for i in range(len(filelist)-1):
    data = np.concatenate((data,dataset[str(i)]),axis=0)
    label = np.concatenate((label,labelset[str(i)]),axis=0)
    if i%7 == 6 or i == 33:
      subject_data[str(s)] = pd.Series(data)
      subject_label[str(s)] = pd.Series(label)
      data = []
      label = []
      s = s + 1
  return subject_data,subject_label

def channel_data_per_subject(dataset,labelset):
  subject_data = np.zeros(5)
  subject_data[0]
  return subject_data,subject_label

In [None]:
# subject_data = np.zeros(5)
# subject_data[0] = np.concatenate((dataset[0],dataset[1],dataset[2],dataset[3],dataset[4],dataset[5],dataset[6]),axis=0)

ValueError: ignored

In [None]:
# xdata= dataset.reshape(dataset.shape[0]*dataset.shape[1],dataset.shape[2])
# ydata = labelset.reshape(labelset.shape[0]*labelset.shape[1])

# from sklearn.model_selection import train_test_split


# X_train,X_test,y_train,y_test = train_test_split(xdata,ydata,test_size=0.2, random_state=42)


In [None]:
# my_net = CNNLSTM().double().cuda()
# optimizer = optim.Adam(my_net.parameters(), lr=lr)    
# loss_class = torch.nn.NLLLoss().cuda()

TypeError: ignored

In [None]:
def run(data,label):

#    load data from the file
    # filename = r'dataset.mat' 
    
    # tmp = sio.loadmat(filename)
    # xdata=np.array(tmp['EEGsample'])
    # label=np.array(tmp['substate'])
    # subIdx=np.array(tmp['subindex'])

    # label.astype(int)
    # subIdx.astype(int)
    
    # samplenum=label.shape[0]
    
#   there are 11 subjects in the dataset. Each sample is 3-seconds data from 30 channels with sampling rate of 128Hz. 
    # channelnum=30
    # subjnum=11
    # samplelength=3
    # sf=128
    
#   define the learning rate, batch size and epoches
    # lr=1e-2 
    # batch_size = 50
    # n_epoch =15 
    
#   ydata contains the label of samples
    # xdata =    
    ydata=np.zeros(samplenum,dtype=np.longlong)
    
    for i in range(samplenum):
        ydata[i]=label[i]

#   only channel 28 is used, which corresponds to the Oz channel
#     selectedchan=[28]
#     selectedchen=[28]
    
#   update the xdata and channel number
#     print(xdata)
#     xdata=xdata[:,selectedchan,:]
#     print(xdata.head)
#     channelnum=len(selectedchan)
    
#   the result stores accuracies of every subject     
    results=np.zeros(subjnum)
  
    
#   it performs leave-one-subject-out training and classfication 
#   for each iteration, the subject i is the testing subject while all the other subjects are the training subjects.      
    for i in range(1,subjnum+1):

#       form the training data        
        trainindx=np.where(subIdx != i)[0] 
        xtrain=xdata[trainindx]   
        x_train = xtrain.reshape(xtrain.shape[0],1,channelnum, samplelength*sf)
        y_train=ydata[trainindx]
        
#       form the testing data         
        testindx=np.where(subIdx == i)[0]    
        xtest=xdata[testindx]
        x_test = xtest.reshape(xtest.shape[0], 1,channelnum, samplelength*sf)
        y_test=ydata[testindx]
    

        train = torch.utils.data.TensorDataset(torch.from_numpy(x_train), torch.from_numpy(y_train))
        train_loader = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True)

    #       load the CNN model to deal with 1D EEG signals
        my_net = CNNLSTM().double().cuda()

   
        optimizer = optim.Adam(my_net.parameters(), lr=lr)    
        loss_class = torch.nn.NLLLoss().cuda()

        for p in my_net.parameters():
            p.requires_grad = True    
  
    #        train the classifier 
        for epoch in range(n_epoch):   
            for j, data in enumerate(train_loader, 0):
                inputs, labels = data                
                
                input_data = inputs.cuda()
                class_label = labels.cuda()              

                my_net.zero_grad()               
                my_net.train()          
   
                class_output= my_net(input_data) 
                err_s_label = loss_class(class_output, class_label)
                err = err_s_label 
             
                err.backward()
                optimizer.step()

    #       test the results
        my_net.train(False)
        with torch.no_grad():
            x_test =  torch.DoubleTensor(x_test).cuda()
            answer = my_net(x_test)
            probs= answer.cpu().numpy()
            preds = probs.argmax(axis = -1)  
            acc=accuracy_score(y_test, preds)

            print(acc)
            results[i-1]=acc
            
            
    print('mean accuracy:',np.mean(results))

if __name__ == '__main__':
    run()

TypeError: __init__() takes at least 4 arguments (3 given)

In [None]:
print(type(np.array(subject_data['1'][0])))

<class 'numpy.ndarray'>
