In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold
from scipy import interp
from sklearn.metrics import roc_curve,auc
from sklearn.metrics import confusion_matrix
import warnings
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import label_binarize
from torch.autograd import Variable
from torch.utils.data import Dataset,DataLoader,TensorDataset
from sklearn.model_selection import GridSearchCV
from sklearn import metrics

In [None]:
class Net(nn.Module):
    def __init__(self,in_dim ,n_hidden_1,n_hidden_2,n_hidden_3,n_hidden_4,out_dim):  
        super(Net,self).__init__()
        self.w1=torch.nn.Parameter(torch.randn(n_hidden_1,in_dim, requires_grad =True))  
        self.b1=torch.nn.Parameter(torch.zeros(n_hidden_1,requires_grad =True))
        self.fc1=nn.Sequential(nn.ReLU(True),nn.Dropout(0.5))
        self.fc2=nn.Sequential(nn.Linear(n_hidden_1,n_hidden_2),nn.ReLU(True),nn.Dropout(0.1))
        self.fc3=nn.Sequential(nn.Linear(n_hidden_2,n_hidden_3),nn.ReLU(True),nn.Dropout(0.1))
        self.fc4=nn.Sequential(nn.Linear(n_hidden_3,n_hidden_4),nn.ReLU(True),nn.Dropout(0.1))
        self.fc5=nn.Sequential(nn.Linear(n_hidden_4,out_dim))
    def forward(self,x,partition):
        x=torch.add(x.matmul(self.w1.mul(partition).permute(1,0)),self.b1)
       
        x=self.fc1(x)                            
        x=self.fc2(x)
        x=self.fc3(x)
        x=self.fc4(x)
        out=self.fc5(x)
        return out

In [None]:
import numpy as np
file1 = '表达矩阵.csv'
file2 = 'TOM.csv'
expression = np.loadtxt(file1, dtype=float, delimiter=",")
partition = np.loadtxt(file2, dtype=float, delimiter=",")
partition =torch.from_numpy(partition) 
partition.requires_grad=False
labels = np.array(expression[:,-1], dtype=int)
labels -=1
label_vec = torch.from_numpy(labels)
expression = torch.from_numpy(np.array(expression[:,:-1]))
in_dim = expression.size()[1]
n_hidden_1 = in_dim

In [None]:
def preparation():
    weight_p, bias_p = [],[]
    for name, p in net.named_parameters():
        if 'bias' in name:
            bias_p += [p]
        elif 'b1' in name:
            bias_p += [p]
        else:
            weight_p += [p]
    optimizer =torch.optim.Adam([{'params': weight_p, 'weight_decay':0.01},   
                      {'params': bias_p, 'weight_decay':0}],lr=0.001)
    cost = nn.CrossEntropyLoss()   
    return optimizer,cost

In [None]:
def preprocess(x_train,x_test,y_train,y_test):
    dataset1 = TensorDataset(x_train,y_train)
    train_loader = DataLoader(dataset=dataset1,batch_size=8,shuffle=False)
    dataset2 = TensorDataset(x_test,y_test)
    test_loader =DataLoader(dataset=dataset2,batch_size=x_test.size()[0],shuffle=False)
    return dataset1,dataset2,train_loader,test_loader

In [None]:
def adjust_learning_rate2(optimizer, epoch, lr): 
    if epoch <=20:
        lr =lr
    
    else:
        lr = lr*(0.1 ** ((epoch-20) // 5+1))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

In [None]:
def cnf(true_value, prediction_value): 
    cnf_matrix = confusion_matrix(true_value, prediction_value)  

    FP = cnf_matrix.sum(axis=0) - np.diag(cnf_matrix)    
    FN = cnf_matrix.sum(axis=1) - np.diag(cnf_matrix)   
    TP = np.diag(cnf_matrix)  
    TN = cnf_matrix.sum() - (FP + FN + TP)
    FP = FP.astype(float)
    FN = FN.astype(float) 
    TP = TP.astype(float)
    TN = TN.astype(float)

    ACC = accuracy_score(true_value, prediction_value)

    TPR = metrics.recall_score(true_value, prediction_value, average='macro')  

    TNR = np.mean(TN / (TN + FP))

    F1_score = metrics.f1_score(true_value, prediction_value, average='macro')
    return ACC,TPR,TNR,F1_score,cnf_matrix 

In [None]:
class RunningStats1: 

    def __init__(self):
        self.n = 0
        self.old_m = torch.zeros(n_hidden_1,in_dim)
        self.new_m = torch.zeros_like(self.old_m)
        self.old_s = torch.zeros_like(self.old_m)
        self.new_s = torch.zeros_like(self.old_m)
        self.liwai = torch.zeros_like(self.old_m)

    def clear(self):
        self.n = 0

    def push(self, x):
        self.n += 1

        if self.n == 1:
            self.old_m = self.new_m = x
            self.old_s = 0.
        else:
            self.new_m = self.old_m + (x - self.old_m) / self.n
            self.new_s = self.old_s + (x - self.old_m) * (x - self.new_m)

            self.old_m = self.new_m
            self.old_s = self.new_s

    def mean(self):
        return self.new_m if self.n else self.liwai

    def variance(self):
        return self.new_s / (self.n - 1) if self.n > 1 else self.liwai

    def standard_deviation(self):
      
        return self.variance().sqrt()

In [None]:
class RunningStats2:

    def __init__(self):
        self.n = 0
        self.old_m = torch.zeros(128,n_hidden_1)
        self.new_m = torch.zeros_like(self.old_m)
        self.old_s = torch.zeros_like(self.old_m)
        self.new_s = torch.zeros_like(self.old_m)
        self.liwai = torch.zeros_like(self.old_m)
    def clear(self):
        self.n = 0

    def push(self, x):
        self.n += 1

        if self.n == 1:
            self.old_m = self.new_m = x
            self.old_s = 0.
        else:
            self.new_m = self.old_m + (x - self.old_m) / self.n
            self.new_s = self.old_s + (x - self.old_m) * (x - self.new_m)

            self.old_m = self.new_m
            self.old_s = self.new_s

    def mean(self):
        return self.new_m if self.n else self.liwai

    def variance(self):
        return self.new_s / (self.n - 1) if self.n > 1 else self.liwai

    def standard_deviation(self):
      #  return math.sqrt(self.variance())
        return self.variance().sqrt()

In [None]:
def setup_seed(seed):     
    np.random.seed(seed)
    torch.manual_seed(seed) 
    torch.cuda.manual_seed_all(seed)  
    torch.backends.cudnn.deterministic = True 
    torch.backends.cudnn.benchmark = True  

In [None]:
warnings.filterwarnings("ignore")
skf = StratifiedKFold(n_splits=10,random_state=0)
fold = 0
ACC = []
SPE = []
TPR = []
F1_score = []
tprs = []
aucs = []
mean_fpr = np.linspace(0, 1, 100)
for train_index,test_index in skf.split(expression,label_vec):
    print('fold[{}/{}]'.format(fold+1,10))
    x_train,x_test = expression[train_index] ,expression[test_index]
    y_train,y_test = label_vec[train_index] , label_vec[test_index]
    dataset1,dataset2,train_loader,test_loader = preprocess(x_train,x_test,y_train,y_test)
    net =Net( in_dim,n_hidden_1,n_hidden_2=128,n_hidden_3=32,n_hidden_4=16,out_dim=6)
    #optimizer,cost,lr_init = preparation()
    optimizer,cost = preparation()
    lr_init = optimizer.param_groups[0]['lr']
    setup_seed(120)
    def weigth_init(m):
        if isinstance(m, nn.Linear):
            m.weight.data.normal_(0,0.1)
            m.bias.data.zero_()
    net.apply(weigth_init)  #完成权重和偏置的初始化
    num_epo =50
    train_losses = []
    train_acces = []
    
    rs_1 = RunningStats1()
    rs_2 = RunningStats2()
    rs_3 = RunningStats3()
    
    
    for epoch in range(num_epo):
        print('epoch[{}/{}]'.format(epoch+1,num_epo))
        train_loss = 0.
        train_acc = 0.
        adjust_learning_rate(optimizer, epoch, lr_init)
        terminate_w1 = list(net.parameters())[0]*1000
        terminate_w1 = terminate_w1.detach()   
        terminate_w2 = list(net.parameters())[2]*1000
        terminate_w2 = terminate_w2.detach()
        rs_1.push(terminate_w1)
        rs_2.push(terminate_w2)
        
        for batch_x,batch_y in train_loader:
            batch_x = batch_x.float()
            partition = partition.float()
            batch_y = batch_y.long()  
            batch_x, batch_y = Variable(batch_x), Variable(batch_y)
            out = net(batch_x,partition)
            loss =cost(out,batch_y)   
            train_loss += loss.item()
            pred = torch.max(out,1)[1]
            train_correct=(pred==batch_y).sum()
            train_acc+=train_correct.item()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
        train_losses.append(train_loss / len(dataset1))
        train_acces.append(train_acc/len(dataset1))
        print('Train Loss: {:.6f}, Acc: {:.6f}'.format(train_loss / (len(
        dataset1)), train_acc / (len(dataset1))))
                
    with torch.no_grad():
        lastweight_1 =list(net.parameters())[0]
        lastweight_2 =list(net.parameters())[2]
        stdev_1 = rs_1.standard_deviation()
        stdev_2 = rs_2.standard_deviation()
        score_1 = torch.sum(torch.mul(stdev_1,torch.abs(torch.mul(lastweight_1,partition))),1)
        score_1 = (score_1-torch.min(score_1))/(torch.max(score_1)-torch.min(score_1))
        score_2 = torch.sum(torch.mul(stdev_2,torch.abs(lastweight_2)),0)
        score = score_1+score_2
        finalscore =(score-torch.min(score))/(torch.max(score)-torch.min(score))
          
    rs_3.push(finalscore)
    
    
    with torch.no_grad():
        net.eval()
        eval_loss = 0.
        eval_acc = 0.
        for batch_x, batch_y in test_loader:
            batch_x=batch_x.float()
            batch_y=batch_y.long()
            batch_x, batch_y = Variable(batch_x), Variable(batch_y)
            out = net(batch_x,partition)
            loss = cost(out, batch_y)
            eval_loss += loss.item()
            pred = torch.max(out, 1)[1]
            num_correct = (pred == batch_y).sum()
            eval_acc += num_correct.item()
        batch_y = batch_y
        ACC_val, TPR_val, SPE_val, F1_score_val, cnf_matrix = cnf(batch_y, pred)
        ACC.append(ACC_val)
        SPE.append(SPE_val)
        TPR.append(TPR_val)
        F1_score.append(F1_score_val)
        out = out.numpy()
        y_one_hot = label_binarize(batch_y, np.arange(6)) 
        
        fpr, tpr, thresholds = metrics.roc_curve(y_one_hot.ravel(), out.ravel())
        tprs.append(interp(mean_fpr, fpr, tpr))
        tprs[-1][0] = 0.0          
        roc_auc = auc(fpr, tpr)   
        aucs.append(roc_auc)    
        print('Test Loss: {:.6f}, Acc: {:.6f}'.format(eval_loss / (len(
        dataset2)), eval_acc / (len(dataset2))))
   
    break
    
    fold +=1

mean_tpr = np.mean(tprs, axis=0)   
mean_tpr[-1] = 1.0  
mean_auc = auc(mean_fpr, mean_tpr)  

importance1 = np.array(rs_3.mean())

outfile_1 ='xx.csv'

np.savetxt(outfile_1, importance1, delimiter=",")

print( 'ACC:', np.mean(ACC), 'TPR:', np.mean(TPR),
          'SPE', np.mean(SPE), 'F1_score:', np.mean(F1_score),'AUC:',mean_auc)
                