In [None]:
# Filter test
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
import scipy.io
from sklearn.decomposition import PCA
import umap
 
def uniform(data):				# UMAP dimension reduction and visualize
  um=umap.UMAP(n_components=2,n_neighbors=15,min_dist=0.1)
  data=um.fit_transform(data)
  return data
 
def principal(data):				# PCA dimension reduction
  pca=PCA()
  data=pca.fit_transform(data) 
  detect=np.cumsum(pca.explained_variance_ratio_)
  detect=np.where(detect>=0.9)  # extract the main component which can reflect 90% correctness
  data=data[:,detect[0]]
  return data
 
def pre_process(data,label):			# Introduce .mat dataset 
  data = data['data']
  label = group['stim']
  b,a = signal.butter(4, (20,20.5),'bandpass',fs=1000)	# BETA bandpass filter
  data = signal.filtfilt(b,a,data.T)
  data = data.T
  index,_=np.where(label<=0)			    # data includes rest and experiment state
  data = np.delete(data, index, 0)		# we will delete those unwanted state
  label = np.delete(label,index,0)		# leave fingerflex we only want
  label = label-1
  label = label[:,0]
  return data,label
 
data = scipy.io.loadmat('drive/MyDrive/train/bp_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/bp_stim.mat')
bp,bplabel=pre_process(data,group)
 
# dimension reduction 
bp=principal(bp)
bp=uniform(bp)
# visualize
plt.figure(1,figsize=(15,15))
scatter=plt.scatter(bp[:,0],bp[:,1],s=0.1,c=bplabel)
plt.legend(handles=scatter.legend_elements()[0],labels=['thumb','index','middle','ring','little'])
plt.title('After the bandpass filter')


In [None]:
# Visualize the Waveforms from different patients
def data_process(data,group):
  data=data['data']
  group=group['stim']
  b,a = signal.butter(4, (20.1,20.5),'bandpass',fs=1000)
  data = signal.filtfilt(b,a,data.T)
  data = data.T
  index,_=np.where(group==1) # Only plot thumb for this case
  train = data[index,:]
  trainlabel=group[index,:]
  train=principal(train)
  return train,trainlabel

# Load data and Preprocess 
data = scipy.io.loadmat('drive/MyDrive/train/bp_data.mat')	
group = scipy.io.loadmat('drive/MyDrive/train/bp_stim.mat')
bp,_=data_process(data,group)
data = scipy.io.loadmat('drive/MyDrive/train/ht_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/ht_stim.mat')
ht,_=data_process(data,group)
# plot
timestep=np.arange(15000)		
plt.figure(1,figsize=(15,3))
plt.plot(timestep,bp[timestep,0])
plt.xlabel('timestep')
plt.ylabel('voltage')
plt.title('Patient A thumb waveform')
plt.legend(['Principal Channel one'])
plt.figure(2,figsize=(15,3))
plt.plot(timestep,ht[timestep,0])
plt.xlabel('timestep')
plt.ylabel('voltage')
plt.title('Patient B thumb waveform')
plt.legend(['Principal Channel one'])

In [None]:
# CNN LSTM UMAP-Linear Classifier
import scipy.io
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from torch.utils.data import DataLoader, sampler, TensorDataset 
 
def uniform(data):				# UMAP dimension reduction
  um=umap.UMAP(n_components=30,n_neighbors=15,min_dist=0.1)
  data=um.fit_transform(data)
  return data
 
def principal(data):				# PCA dimension reduction
  pca=PCA(n_components=30)  # Using main 30 channels 
  data=pca.fit_transform(data) 
  return data

def data_process(data,group):
  data=data['data']
  group=group['stim']
  b,a = signal.butter(4, (20.1,20.5),'bandpass',fs=1000)
  data = signal.filtfilt(b,a,data.T)
  data = data.T
  index,_=np.where(group>0)
  train = data[index,:]
  trainlabel=group[index,0]
  # UMAP-Linear model 
  # train=uniform(train)
  train=principal(train)
  trainlabel=trainlabel-1  # Five class for 1,2,3,4,5 but pytorch classification require 0-4
  train,trainlabel=torch.from_numpy(train).float(),torch.from_numpy(trainlabel).long()
  return train,trainlabel
 
class myNetwork(nn.Module):
    def __init__(self, channel ,hidden_size, num_states):
        super().__init__()
        # CNN 
        '''
        self.conv1= nn.Conv1d(channel,32,20,stride=1)
        self.conv2= nn.Conv1d(32,64,10)
        self.conv3= nn.Conv1d(64,128,8)
        self.flat= nn.Flatten()
        '''
        # LSTM
        '''
        self.rnn=nn.LSTM(input_size=channel,
                         hidden_size=hidden_size,
                         num_layers=1,
                         batch_first=True,
                         )
        '''
        # linear layer
        self.bn1 = nn.BatchNorm1d(channel)
        self.fc1 = nn.Linear(channel,hidden_size)
        self.bn2 = nn.BatchNorm1d(hidden_size)
        self.fc2 = nn.Linear(hidden_size,hidden_size)
        self.bn3 = nn.BatchNorm1d(hidden_size)
        self.fc3 = nn.Linear(hidden_size,hidden_size)
        self.bn4 = nn.BatchNorm1d(hidden_size)
        self.fc4 = nn.Linear(hidden_size,num_states)
 
    def forward(self, x):
        # CNN
        '''
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.flat(x)
        '''
        # RNN
        '''
        rout,(h_n,h_c)=self.rnn(x,None)
        x = self.bn2(rout[:,-1,:])
        '''
        x = self.bn1(x)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.bn2(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.bn3(x)
        x = self.fc3(x)
        x = F.relu(x)
        x = self.bn4(x)
        scores = self.fc4(x)
        return scores
 
def init_normal(m): #initial linear layer parameters with normal distribution
    if type(m) == nn.Linear:
        nn.init.kaiming_normal_(m.weight,nonlinearity='relu')
 
# Load data and Preprocess 
data = scipy.io.loadmat('drive/MyDrive/train/bp_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/bp_stim.mat')
bp,bplabel=data_process(data,group)
data = scipy.io.loadmat('drive/MyDrive/train/ht_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/ht_stim.mat')
ht,htlabel=data_process(data,group)
data = scipy.io.loadmat('drive/MyDrive/train/jp_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/jp_stim.mat')
jp,jplabel=data_process(data,group)
data = scipy.io.loadmat('drive/MyDrive/train/wc_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/wc_stim.mat')
wc,wclabel=data_process(data,group)
data = scipy.io.loadmat('drive/MyDrive/train/zt_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/zt_stim.mat')
zt,ztlabel=data_process(data,group)
data = scipy.io.loadmat('drive/MyDrive/train/jc_data.mat')
group = scipy.io.loadmat('drive/MyDrive/train/jc_stim.mat')
jc,jclabel=data_process(data,group)
data = scipy.io.loadmat('drive/MyDrive/test/mv_data.mat')
group = scipy.io.loadmat('drive/MyDrive/test/mv_stim.mat')
test,testlabel=data_process(data,group)

# Stack all nerual data
train=torch.vstack((bp,ht,jp,wc,zt,jc)) 
trainlabel=torch.hstack((bplabel,htlabel,jplabel,wclabel,ztlabel,jclabel))
channel=30

# LSTM random path assignment
'''
path = 5
for recurrent in range(path):
  pick=np.random.choice(channel,channel,replace='False') 
  train=torch.hstack((train,train[:,pick]))
  test=torch.hstack((test,test[:,pick]))
train=train.view(train.shape[0],path+1,-1)
test=test.view(test.shape[0],path+1,-1)
'''
# CNN reshape 
'''
train=train.view(train.shape[0],channel,-1)
test=test.view(test.shape[0],channel,-1)
# Previous data
column=train
col=test
for i in range(19):   # 20ms window so we need past 19ms data as extra input
  column=torch.roll(column,1, 0)
  column[0,:,0]=0
  col=torch.roll(col,1, 0)
  col[0,:,0]=0
  train=torch.dstack((train,column))
  test=torch.dstack((test,col))
'''
 
# Create Dataset and dataloader
test_ds= TensorDataset(test, testlabel)
train_ds = TensorDataset(train, trainlabel)
train_dl = DataLoader(train_ds, batch_size=400000, shuffle=True, drop_last=True)
test_dl =  DataLoader(test_ds, batch_size=40000, shuffle=False, drop_last=True)
 
# parameter & loss function & optimize
learning_rate = 0.01
decay=0.001
hidden=400
outlayer=5
 
theNetwork=myNetwork(channel,hidden,outlayer)
theNetwork.apply(init_normal)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(theNetwork.parameters(),lr=learning_rate,weight_decay=decay)
epoch=1 # we will stop whenever we want
# start training
while epoch!=0:
  for x,y in train_dl: # batch of training points
    optimizer.zero_grad()
    predict=theNetwork(x)
    loss=criterion(predict,y)
    loss.backward()
    optimizer.step()
  # valid per epoch
  with torch.no_grad():
    accu=0
    length=0
    for x,y in train_dl: #current train error
      output=theNetwork(x)
      _,predict=torch.max(output.data,1)
      accu=accu+(predict==y).sum()
      length=length+len(y)
    traincorrect=accu/length
    accu=0
    length=0
    for xval,yval in test_dl:# current validation error
      output=theNetwork(xval)
      _,predict=torch.max(output.data,1)
      accu=accu+(predict==yval).sum()
      length=length+len(yval)
    testcorrect=accu/length
    print(epoch,traincorrect,testcorrect)
    epoch=epoch+1