In [1]:
import sys
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
import glob
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import random_split

from utils import *

# shareDir = os.path.abspath('../..')
# sys.path.append(shareDir)
# from bciBASE.server import EEG_data

# Resolve the project folders relative to the directory the notebook is run from.
scriptsDir = os.path.abspath(os.curdir)
# Sibling folders of the scripts directory (one level up, then down again).
presentationsDir = os.path.abspath('/'.join((scriptsDir, os.pardir, 'presentation')))
mannualDataDir = os.path.abspath('/'.join((scriptsDir, os.pardir, 'manualData')))

# Manually labelled fist / rest data

In [2]:
paradigm = 'fist'
subject = 'JMF'

# Take the first .mat recording (sorted, so the choice is deterministic) for this
# paradigm/subject and fail with a readable message when none exists.
fistMatCandidates = sorted(glob.glob(rf'{mannualDataDir}/{paradigm}/{subject}/*.mat'))
if not fistMatCandidates:
    raise FileNotFoundError(rf'no .mat file found in {mannualDataDir}/{paradigm}/{subject}')
fistDataMATFile = fistMatCandidates[0]

# The companion .fdt and .set files share the recording's base name. Both names
# are derived from the .mat path: deriving the .set name from the .fdt path
# (as before) left the name unchanged, so the .set file was never checked.
fistDataStem = os.path.splitext(fistDataMATFile)[0]
fistDataFDTFile = fistDataStem + '.fdt'
if not os.path.exists(fistDataFDTFile):
    raise FileNotFoundError(fistDataFDTFile)
fistDataSETFile = fistDataStem + '.set'
if not os.path.exists(fistDataSETFile):
    raise FileNotFoundError(fistDataSETFile)

# Look up the metadata row describing this subject/paradigm.
infoFile = rf'{presentationsDir}/info.xlsx'
infoDf = pd.read_excel(infoFile)
info = infoDf[(infoDf['subject']==subject)&(infoDf['paradigm']==paradigm)]
if info.empty:
    raise ValueError(f'no row for subject={subject!r}, paradigm={paradigm!r} in {infoFile}')
# NOTE(review): eval() executes whatever text is stored in the spreadsheet cell.
# If the cell always holds a plain list literal, ast.literal_eval would be safer.
channel = eval(info['channel'].values[0])

# Drop the last `invalidChannelNum` entries of the channel list and keep that
# many leading rows of the recording.
invalidChannelNum = 7
validChannel = channel[:-invalidChannelNum]
validChannelNum = len(validChannel)

fistData = read_file(fistDataMATFile)[:validChannelNum,:]

print(fistDataMATFile)
print(info)
print(fistData.shape)

h:\BCIteam_Allrelated\SharedSource\ecog\manualData/fist/JMF\60s70s68s.mat
  date   
7   -1  \

  info((hypothesis that all operations are for left tumor and all actions are on right))   
7                                  唤醒下左额顶开颅术：连续握拳1分钟                                      \

  predictedtime paradigm subject subjectid  age  sexual               tumor   
7           60s     fist     JMF    sub006   53  female  left top forehead'  \

   epilepsy      data time  freq   
7         1  32x35000  70s   500  \

                                             channel  
7  ['s6c1','s6c2','s6e1','s6e2','s6e3','s6e4','s6...  
(2, 34248)


In [3]:
paradigm = 'rest'
subject = 'JMF'

# Take the first .mat recording (sorted, so the choice is deterministic) for this
# paradigm/subject and fail with a readable message when none exists.
restMatCandidates = sorted(glob.glob(rf'{mannualDataDir}/{paradigm}/{subject}/*.mat'))
if not restMatCandidates:
    raise FileNotFoundError(rf'no .mat file found in {mannualDataDir}/{paradigm}/{subject}')
restDataMATFile = restMatCandidates[0]

# The companion .fdt and .set files share the recording's base name. Both names
# are derived from the .mat path: deriving the .set name from the .fdt path
# (as before) left the name unchanged, so the .set file was never checked.
restDataStem = os.path.splitext(restDataMATFile)[0]
restDataFDTFile = restDataStem + '.fdt'
if not os.path.exists(restDataFDTFile):
    raise FileNotFoundError(restDataFDTFile)
restDataSETFile = restDataStem + '.set'
if not os.path.exists(restDataSETFile):
    raise FileNotFoundError(restDataSETFile)

# Look up the metadata row describing this subject/paradigm.
infoFile = rf'{presentationsDir}/info.xlsx'
infoDf = pd.read_excel(infoFile)
info = infoDf[(infoDf['subject']==subject)&(infoDf['paradigm']==paradigm)]
if info.empty:
    raise ValueError(f'no row for subject={subject!r}, paradigm={paradigm!r} in {infoFile}')
# NOTE(review): eval() executes whatever text is stored in the spreadsheet cell.
# If the cell always holds a plain list literal, ast.literal_eval would be safer.
channel = eval(info['channel'].values[0])

# Drop the last `invalidChannelNum` entries of the channel list and keep that
# many leading rows of the recording.
invalidChannelNum = 7
validChannel = channel[:-invalidChannelNum]
validChannelNum = len(validChannel)

restData = read_file(restDataMATFile)[:validChannelNum,:]

print(restDataMATFile)
print(info)
print(restData.shape)

h:\BCIteam_Allrelated\SharedSource\ecog\manualData/rest/JMF\180s234s224s.mat
  date   
6   -1  \

  info((hypothesis that all operations are for left tumor and all actions are on right))   
6                                    唤醒下左额顶开颅术：静息3分钟                                      \

  predictedtime paradigm subject subjectid  age  sexual               tumor   
6          180s     rest     JMF    sub006   53  female  left top forehead'  \

   epilepsy       data  time  freq   
6         1  32x117000  234s   500  \

                                             channel  
6  ['s6c1','s6c2','s6e1','s6e2','s6e3','s6e4','s6...  
(2, 111924)


# Model to classify whether a time-step epoch is rest or fist

In [None]:
class CNN(nn.Module):
    """Small two-stage 2-D convolutional classifier.

    The network is ``conv1 -> conv2 -> flatten -> linear``. Each convolutional
    stage is Conv2d, Dropout(0.2), BatchNorm2d, ELU and a 2x2 max-pool, so both
    spatial dimensions are floor-halved twice before the linear layer.

    Args:
        p: Number of input features of the final linear layer, i.e. the
            flattened size of the second stage's output.
        classes: Number of output logits.
        ifprint: When true, ``forward`` prints the tensor size at every stage.
    """

    def __init__(self, p, classes, ifprint=False):
        super().__init__()
        self.p, self.classes, self.ifprint = p, classes, ifprint

        pool_window = (2, 2)

        # Stage 1: 1x7 kernel, padded so the last dimension keeps its length.
        first_conv = nn.Conv2d(in_channels=1, out_channels=4,
                               kernel_size=(1, 7), stride=(1, 1), padding=(0, 3))
        self.conv1 = nn.Sequential(
            first_conv,
            nn.Dropout(0.2),
            nn.BatchNorm2d(num_features=4),
            nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_window, stride=pool_window),
        )

        # Stage 2: 3x3 kernel with "same" padding, doubling the feature maps.
        second_conv = nn.Conv2d(in_channels=4, out_channels=8,
                                kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv2 = nn.Sequential(
            second_conv,
            nn.Dropout(0.2),
            nn.BatchNorm2d(num_features=8),
            nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_window, stride=pool_window),
        )

        # Classification head: flatten, then a single linear layer to the logits.
        self.out = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.p, self.classes, bias=True),
        )

    def _report(self, tensor):
        """Print ``tensor``'s size when size tracing is switched on."""
        if self.ifprint:
            print(tensor.size())

    def forward(self, x):
        """Return the class logits for ``x`` (the first conv expects 1 input channel)."""
        self._report(x)
        stage1 = self.conv1(x)
        self._report(stage1)
        stage2 = self.conv2(stage1)
        self._report(stage2)
        # Collapse everything but the batch dimension before the linear head.
        flat = stage2.reshape(stage2.size(0), -1)
        self._report(flat)
        logits = self.out(flat)
        self._report(logits)
        return logits

def torch_data(data):
    """Convert array-like data to a float32 torch tensor.

    Args:
        data: A numpy array, a pandas DataFrame/Series, or a torch tensor.

    Returns:
        A ``torch.float32`` tensor holding the same values.

    Raises:
        TypeError: If ``data`` is of any other type. Previously such input
            silently produced ``None``, which only failed later on.
    """
    if isinstance(data, torch.Tensor):
        return data.float()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data = data.values
    # isinstance (rather than an exact type comparison) also accepts subclasses.
    if isinstance(data, np.ndarray):
        return torch.from_numpy(data).float()
    raise TypeError(f'torch_data does not support input of type {type(data).__name__}')
        
class classifyDataset(Dataset):
    """Sliding-window dataset over two recordings for binary classification.

    Every window of ``timeStep`` consecutive columns of ``data0`` is a sample
    with label 0, and every such window of ``data1`` is a sample with label 1.
    Samples are indexed with all ``data0`` windows first, then all ``data1``
    windows.

    Args:
        data0: 2-D array-like indexed as ``[:, start:stop]``; source of label 0.
        data1: 2-D array-like indexed as ``[:, start:stop]``; source of label 1.
        timeStep: Window length in columns.
        normalize: When true, each window is passed through ``normalize``.
        normalizeBy: Forwarded to ``normalize`` as ``by``.
        normalizeMethod: Forwarded to ``normalize`` as ``method``.
        flatten: When true, each window is passed through ``flatten_data``.
    """

    def __init__(self, data0, data1, timeStep,
                 normalize=False, normalizeBy='', normalizeMethod='',
                 flatten=False):
        self.data0 = data0
        self.data1 = data1
        self.timeStep = timeStep
        self.lenData0 = data0.shape[1]
        # A recording shorter than one window contributes no samples.
        self.numSample0 = max(self.lenData0 - self.timeStep + 1, 0)
        self.lenData1 = data1.shape[1]
        self.numSample1 = max(self.lenData1 - self.timeStep + 1, 0)
        self.numSample = self.numSample0 + self.numSample1

        self.normalize = normalize
        self.normalizeBy = normalizeBy
        self.normalizeMethod = normalizeMethod

        self.flatten = flatten

    def __len__(self):
        """Return the total number of windows over both recordings."""
        return self.numSample

    def _locate(self, idx):
        """Map a flat sample index to ``(label, window start column)``.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        idx = int(idx)
        if idx < 0:
            idx += self.numSample
        if not 0 <= idx < self.numSample:
            raise IndexError(f'sample index out of range for dataset of size {self.numSample}')
        # The boundary between the two sources is the number of data0 windows.
        # (Using numSample1 here produced truncated windows and skipped part
        # of data1 whenever the two recordings differ in length.)
        if idx < self.numSample0:
            return 0, idx
        return 1, idx - self.numSample0

    def __getitem__(self, idx):
        """Return ``(window tensor, label tensor)`` for sample ``idx``."""
        label, start = self._locate(idx)
        source = self.data1 if label else self.data0
        data = source[:, start : start + self.timeStep]
        if self.normalize: data = normalize(data,by=self.normalizeBy,method=self.normalizeMethod)
        if self.flatten: data = flatten_data(data)
        return torch_data(data), torch.tensor(label)

# Build the sliding-window dataset: label 0 windows come from the fist
# recording, label 1 windows from the rest recording.
data0 = fistData
data1 = restData
timeStep = 500
classifydataset = classifyDataset(data0, data1, timeStep)

# 80/20 train/test split. random_split must receive the dataset *instance*;
# passing the classifyDataset class itself fails because a class has no length.
train_size = int(len(classifydataset) * 0.8)
test_size = len(classifydataset) - train_size
train_dataset, test_dataset = random_split(classifydataset, [train_size, test_size])
# Ten training batches per epoch; the whole test split is one batch.
# max(1, ...) keeps the batch size valid for very small datasets.
train_loader = DataLoader(train_dataset, batch_size=max(1, len(train_dataset) // 10), shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=max(1, len(test_dataset)), shuffle=False)

if torch.cuda.is_available():
    torch.cuda.empty_cache()
    device = torch.device('cuda')
    print('device is gpu')
else:
    device = torch.device('cpu')
    print('device is cpu')

# Flattened feature count after the CNN: 8 feature maps, with both spatial
# dimensions floor-halved by each of the two 2x2 max-pools.
# NOTE(review): this assumes data0/data1 have validChannelNum rows; the shapes
# printed by the loading cells suggest that may not hold. Confirm.
cnn = CNN(p=8 * (validChannelNum // 2 // 2) * (timeStep // 2 // 2),
        classes=2,
        ifprint=False).to(device)
optimizer=torch.optim.Adam(cnn.parameters(), lr=0.003, weight_decay=1e-5)
loss_fn=nn.CrossEntropyLoss()

# One record per (training step, test batch); turned into a DataFrame once at
# the end instead of growing the frame row by row.
modelRecords = []
test_accuracy_best = 0
epoch = 0
# Train for at least 5 epochs, and up to 20 while the best test accuracy is below 0.5.
while (epoch<5) or (test_accuracy_best<0.5 and epoch<20):
    for step, (batch_data,batch_label) in enumerate(train_loader):
        cnn.train()  # re-enable dropout / batch-norm updates after evaluation
        batch_data = torch.unsqueeze(batch_data,1).to(device)  # add the single input-channel dim
        batch_label = batch_label.to(device)
        output = cnn(batch_data)
        loss = loss_fn(output, batch_label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # argmax keeps the batch dimension, so a batch of one sample also works.
        x_pred = output.argmax(dim=1)
        train_accuracy = (x_pred == batch_label).sum().item() / batch_label.size(0)

        # Evaluate after every training step, with dropout disabled and no
        # autograd graph, so the test pass neither adds noise nor holds memory.
        cnn.eval()
        with torch.no_grad():
            for test_data, test_label in test_loader:
                test_data = torch.unsqueeze(test_data,1).to(device)
                test_label = test_label.to(device)
                test_output = cnn(test_data)
                y_pred = test_output.argmax(dim=1)
                test_accuracy = (y_pred == test_label).sum().item() / test_label.size(0)
                test_accuracy_best = max(test_accuracy_best, test_accuracy)
                modelRecords.append({
                    'epoch': epoch+1,
                    'step': step+1,
                    'loss': round(loss.item(),4),
                    'trainAcc': round(train_accuracy,4),
                    'testAcc': round(test_accuracy,4),
                })
                print(f'epoch: {epoch+1} | step: {step+1} | loss: {round(loss.item(),4)} | train_acc: {round(train_accuracy,4)} | test_acc: {round(test_accuracy,4)}')
    epoch += 1

ModelDf = pd.DataFrame(modelRecords, columns=['epoch','step','loss','trainAcc','testAcc'])
ModelDf

# For deciding on a reasonable separation timepoint

In [None]:
class SegementationDataset(Dataset):
    """Dataset over every pairing of a ``data0`` window with a ``data1`` window.

    Each recording yields ``length - step + 1`` windows of ``step`` columns,
    and the dataset has one sample per (data0 window, data1 window) pair.

    Args:
        data0: 2-D array-like indexed as ``[:, start:stop]``.
        data1: 2-D array-like indexed as ``[:, start:stop]``.
        step: Window length in columns.
        normalize: When true, the sample is passed through ``normalize``.
        normalizeBy: Forwarded to ``normalize`` as ``by``.
        normalizeMethod: Forwarded to ``normalize`` as ``method``.
        flatten: When true, the sample is passed through ``flatten_data``.
    """

    def __init__(self, data0, data1, step,
                 normalize=False, normalizeBy='', normalizeMethod='',
                 flatten=False):
        # Keep the recordings; __getitem__ slices windows out of them.
        # (They were previously never stored, so indexing raised AttributeError.)
        self.data0 = data0
        self.data1 = data1
        self.step = step
        self.lenData0 = data0.shape[1]
        self.numSample0 = self.lenData0 - self.step + 1
        self.lenData1 = data1.shape[1]
        self.numSample1 = self.lenData1 - self.step + 1
        self.numSample = self.numSample0 * self.numSample1

        self.normalize = normalize
        self.normalizeBy = normalizeBy
        self.normalizeMethod = normalizeMethod

        self.flatten = flatten

    def __len__(self):
        """Return the number of (data0 window, data1 window) pairs."""
        return self.numSample

    def _windows(self, idx):
        """Return the ``(data0 window, data1 window)`` pair for flat index ``idx``."""
        # Row-major pairing: idx0 selects the data0 window, idx1 the data1 window.
        idx0, idx1 = divmod(idx, self.numSample1)
        idata0 = self.data0[:, idx0:idx0 + self.step]
        # The data1 window ends at idx1 + step (it previously used idx0, which
        # gave windows of the wrong length).
        idata1 = self.data1[:, idx1:idx1 + self.step]
        return idata0, idata1

    def __getitem__(self, idx):
        """Return ``(sample tensor, label tensor)`` for sample ``idx``."""
        idata0, idata1 = self._windows(idx)
        # NOTE(review): `data` is never built from idata0/idata1 in the visible
        # code, so the lines below fail until the intended combination of the
        # two windows is implemented. TODO confirm the intended sample layout.
        if self.normalize: data = normalize(data,by=self.normalizeBy,method=self.normalizeMethod)
        if self.flatten: data = flatten_data(data)
        # NOTE(review): `self.labels` is not assigned anywhere in this class.
        # TODO define how labels are derived.
        label = self.labels[idx]
        return torch_data(data), torch.tensor(label)