In [11]:
############################################################################################################
# Training neural networks using activation matrices and EMG peak-to-peak values
############################################################################################################


import torch
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
import os
from sklearn.metrics import confusion_matrix
from scipy.io import loadmat

# Select the stimulation side (L or R)
side = 'L'

path = '.\\Data\\Neural activity 20230217\\'+side+'\\a_matric\\scale_factor_1_100_20230216\\5.7'
path_list = os.listdir(path)

# Check whether all the files are availible
pattern_range = []

file_range = range(2437)
for i in file_range:
    filename = 'a_matric_'+str(i)+'.npy'
    if filename in path_list:
        pattern_range.append(i)

# Load activation matrices
a_16 = np.zeros([len(pattern_range),320])
a_10 = np.zeros([len(pattern_range),320])
a_73 = np.zeros([len(pattern_range),320])
a_57 = np.zeros([len(pattern_range),320])

k = 0
for i in pattern_range:
    a_16[k,:] = np.load('.\\Data\\Neural activity 20230217\\'+side+'\\a_matric\\scale_factor_1_100_20230216\\16.0\\a_matric_'+str(i)+'.npy')
    a_10[k,:] = np.load('.\\Data\\Neural activity 20230217\\'+side+'\\a_matric\\scale_factor_1_100_20230216\\10.0\\a_matric_'+str(i)+'.npy')
    a_73[k,:] = np.load('.\\Data\\Neural activity 20230217\\'+side+'\\a_matric\\scale_factor_1_100_20230216\\7.3\\a_matric_'+str(i)+'.npy')
    a_57[k,:] = np.load('.\\Data\\Neural activity 20230217\\'+side+'\\a_matric\\scale_factor_1_100_20230216\\5.7\\a_matric_'+str(i)+'.npy')
    k += 1
    
zip_16 = np.zeros((len(pattern_range),32))
zip_10 = np.zeros((len(pattern_range),32))
zip_73 = np.zeros((len(pattern_range),32))
zip_57 = np.zeros((len(pattern_range),32))
for i in range(len(pattern_range)):
    for j in range(320):
        zip_16[i,int(j/10)] += a_16[i,j]
        zip_10[i,int(j/10)] += a_10[i,j]
        zip_73[i,int(j/10)] += a_73[i,j]
        zip_57[i,int(j/10)] += a_57[i,j]

X_DC = loadmat(r'.\\Data\\x_column.mat')['x']
x_dc = X_DC[:,0]
sort_index = np.argsort(x_dc)
R_index_dc = sort_index[0:int(sort_index.shape[0] / 2)]
L_index_dc = sort_index[int(sort_index.shape[0] / 2):]

D_list = [16.0,10.0,7.3,5.7]
dc_a =np.zeros((len(pattern_range),2,4))
n = 0
for k in pattern_range:
    for j in range(4):
        a_m = np.load('.\\Data\\Neural activity 20230217\\'+side+'\\column_a_matric\\scale_factor_1_100_20230216\\'+str(D_list[j])+'\\ca_matric_'+str(k)+'.npy')
        L_n = 0
        R_n = 0
        for i in range(a_m.shape[0]):
            if (a_m[i]==1) and (i in L_index_dc):
                L_n += 1
            elif (a_m[i]==1) and (i in R_index_dc):
                R_n += 1
        L_ratio = L_n/L_index_dc.shape[0]
        R_ratio = R_n/R_index_dc.shape[0]
        dc_a[n,0,j] = L_ratio
        dc_a[n,1,j] = R_ratio
    n = n + 1

print(dc_a.shape)
x = np.zeros((len(pattern_range),16+2,4))
x[:,0:16,0] = zip_16[:,0:16]/10
x[:,0:16,1] = zip_10[:,0:16]/10
x[:,0:16,2] = zip_73[:,0:16]/10
x[:,0:16,3] = zip_57[:,0:16]/10
x[:,16:18,:] = dc_a

# Load EMG peak-to-peak values
data_frame = pd.read_excel(r'.\\Data\\EMG data of P2\\P2_Mapping_20230217.xlsx',sheet_name=side,header=0,usecols=range(12))
EMG = np.array(data_frame)

# Nomarlization with the maximum of each channel
m1 = [2476.925299,2713.996826,655.707048,2761.23056,5883.62704,469.9733753,661.5074164,2377.081812,950.7308385,1726.09589,1635.000931,961.7593543]
m2 = [1582.94892,2481.605571,474.2016415,2529.903187,4396.750533,372.8770509,1524.564656,2915.445047,1343.855694,1847.565441,2173.126584,946.5696596]
m = []
for i in range(12):
    if m1[i] > m2[i]:
        m.append(m1[i])
    else:
        m.append(m2[i])

EMG = EMG/m
emg_range = []
for i in range(len(pattern_range)):
    emg_range.append(pattern_range[i]-1)


(2436, 2, 4)


In [None]:
# Define loss function, containing Ordinal Loss and MSE Loss
def lossfunc(x, y, predict, net):
    
    mse_loss = torch.nn.MSELoss()(y, predict)
    
    x_h = x.cpu().numpy()
    x_l = x.cpu().numpy()
    
    index1 = list(np.random.randint(0,x.shape[0],size=int(0.1*x.shape[0])))
    x_h = x_h[index1,:,:]
    x_l = x_l[index1,:,:]
    index2 = list(np.random.randint(0,18,size=np.random.randint(5,12)))
    x_h[:,index2,:] = x_h[:,index2,:] + 0.1
    x_l[:,index2,:] = x_l[:,index2,:] - 0.1
    x_h = np.minimum(x_h, 1)
    x_l = np.maximum(x_l, 0)

    sort_loss = 0
    mu = 1
    
    x_h = torch.from_numpy(x_h).cuda()
    x_l = torch.from_numpy(x_l).cuda()
    y_h = net(x_h)
    y_l = net(x_l)
    
    dif_h = (predict[index1] - y_h).detach().cpu().numpy()
    dif_l = (y_l - predict[index1]).detach().cpu().numpy()
    dif_h = np.maximum(dif_h, 0)
    dif_l = np.maximum(dif_l, 0)
    sort_loss = np.mean(dif_h) + np.mean(dif_l)
    
    loss = mse_loss + sort_loss
    
    return loss

# Define neural network
class Net(torch.nn.Module):
    def __init__(self):
        super(Net,self).__init__()
        self.hidden1 = torch.nn.Linear(4,18)
        self.hidden2 = torch.nn.Linear(18*18,6)
        self.predict = torch.nn.Linear(6,1)

    def forward(self,x):
        x = torch.relu(self.hidden1(x))
        x = x.reshape(x.size(0), -1)
        x = torch.sigmoid(self.hidden2(x))
        x = torch.sigmoid(self.predict(x))
        x = torch.flatten(x)
        return x

# Training with 5-fold strategy: save a best model for each fold
for channel in [0,1,2,3,4,5,6,7,8,9,10,11]:
    y = EMG[emg_range,channel]

    index_lose  = []
    for i in range(y.shape[0]):
        if y[i] < 0:
            index_lose.append(i) 

    y = np.delete(y,index_lose)
    x = np.delete(x,index_lose,axis=0)

    # print(y.shape)
    print('channel ' + str(channel) + ':',np.mean(y))

    max_y = max(y)
    y = y/max_y

    x_tt, x_v, y_tt, y_v = train_test_split(x, y, test_size=0.2, random_state=14)
    
    kf = KFold(n_splits=5, shuffle=True, random_state=14)

    net = Net()
    net = net.cuda()
    print(net)

    opt = torch.optim.SGD(net.parameters(),lr=0.5)

    lossfunc2 = torch.nn.MSELoss()
    lossfunc2 = lossfunc2.cuda()
    
    global_best_loss = 1
    v_pred = []
    for fold, (train_ids, test_ids) in enumerate(kf.split(x_tt)):
        x_train = x_tt[train_ids,:,:]
        y_train = y_tt[train_ids]
        x_test = x_tt[test_ids,:,:]
        y_test = y_tt[test_ids]
    
        x_train = torch.tensor(x_train)
        y_train = torch.tensor(y_train)
        x_test = torch.tensor(x_test)
        y_test = torch.tensor(y_test)
        x_v, y_v, x_tt, y_tt = torch.tensor(x_v),torch.tensor(y_v),torch.tensor(x_tt),torch.tensor(y_tt)

        x_train = x_train.to(torch.float32)
        y_train = y_train.to(torch.float32)
        x_test = x_test.to(torch.float32)
        y_test = y_test.to(torch.float32)
        x_v, y_v, x_tt, y_tt = x_v.to(torch.float32),y_v.to(torch.float32),x_tt.to(torch.float32),y_tt.to(torch.float32)
        
        x_train,y_train = Variable(x_train),Variable(y_train)
        x_test,y_test = Variable(x_test),Variable(y_test)
        x_v, y_v, x_tt, y_tt = Variable(x_v),Variable(y_v),Variable(x_tt),Variable(y_tt)
        
        x_train = x_train.cuda()
        y_train = y_train.cuda()
        x_test = x_test.cuda()
        y_test = y_test.cuda()
        x_v, y_v, x_tt, y_tt = x_v.cuda(),y_v.cuda(),x_tt.cuda(),y_tt.cuda()
###############################################################################################################################
        global_best_loss = 100
        min_test_loss = 100
        for i in range(5):
            net = Net()
            net = net.cuda()
            opt = torch.optim.SGD(net.parameters(),lr=0.5)
            min_test_loss = 100
            for t in range(30000):
                prediction = net(x_train)
                loss = lossfunc(x_train, y_train, prediction, net)
                loss = loss.cuda()
                test_prediction = net(x_test)
                test_loss = lossfunc2(y_test, test_prediction)
                if test_loss < min_test_loss:
                    min_test_loss = test_loss
                    torch.save(net, '.\\Data\\Torch models\\P2_20240716_'+side+'_channel'+str(channel)+'_fold'+str(fold)+'_round'+str(i)+'.pt')
                if test_loss <= global_best_loss:
                    global_best_loss = test_loss
                    torch.save(net, '.\\Data\\Torch models\\P2_20240716_'+side+'_channel'+str(channel)+'_fold'+str(fold)+'_best.pt')
                opt.zero_grad()
                loss.backward()
                opt.step()
            print(fold,i,"End!")
            print("test loss:",min_test_loss)
            v_prediction = net(x_v)
            v_pred.append(v_prediction)
            v_loss = lossfunc2(v_prediction, y_v)
            print("v_loss:",v_loss)
        temp_model = torch.load(r'.\\Data\\Torch models\\P2_20240716_'+side+'_channel'+str(channel)+'_fold'+str(fold)+'_best.pt')
#         temp_model = temp_model.cpu()
        v_prediction = temp_model(x_v)
        v_pred.append(v_prediction)
        v_loss = lossfunc2(v_prediction, y_v)
    mean_vp = (v_pred[0]+v_pred[1]+v_pred[2]+v_pred[3]+v_pred[4])/5
    mean_vloss = lossfunc2(mean_vp, y_v)
    print("mean_vloss:", mean_vloss)

channel 0: 0.1439083296516631
Net(
  (hidden1): Linear(in_features=4, out_features=18, bias=True)
  (hidden2): Linear(in_features=324, out_features=6, bias=True)
  (predict): Linear(in_features=6, out_features=1, bias=True)
)
0 0 End!
test loss: tensor(0.0099, device='cuda:0', grad_fn=<MseLossBackward>)
v_loss: tensor(0.0163, device='cuda:0', grad_fn=<MseLossBackward>)
0 1 End!
test loss: tensor(0.0099, device='cuda:0', grad_fn=<MseLossBackward>)
v_loss: tensor(0.0158, device='cuda:0', grad_fn=<MseLossBackward>)
0 2 End!
test loss: tensor(0.0106, device='cuda:0', grad_fn=<MseLossBackward>)
v_loss: tensor(0.0161, device='cuda:0', grad_fn=<MseLossBackward>)
