In [1]:
import math
import re
import torch
from torch import nn
from torch.nn import functional as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

## 1.导入数据

In [2]:
data = pd.read_csv(r'基准数据集.csv')  # 导入数据
sequences = data.Sequence  # 序列
labels = data.Label.values  # 标签

## 2.预处理：每个字母为单个字符串

In [3]:
pat = re.compile('[AGCTagct]')

def pre_process(text):
    text = pat.findall(text)
    text = [each.lower() for each in text]
    return text

x = sequences.apply(pre_process)

## 3.创建词典：word_index

In [4]:
word_set = set()  # 初始化

for lst in x:
    for word in lst:
        word_set.add(word)

word_list = list(word_set)
word_index = dict([(each, word_list.index(each) + 1) for each in word_list])

## 4.将序列中的字母转换为数字

In [5]:
text = x.apply(lambda x: [word_index.get(word, 0) for word in x])

## 5.固定序列的长度，进行截断或填充操作

In [6]:
text_len = 1200  # 长度固定为1200

pad_text = [l + (text_len - len(l)) * [0] if len(l) < text_len else l[:text_len] for l in text]  # 用0填充或者截断

pad_text = np.array(pad_text)  # 转为数组

## 6.划分训练集和测试集

In [7]:
x_train, x_test, y_train, y_test = train_test_split(pad_text, labels, test_size=0.3)  # 默认采用分层抽样构建训练集

## 7.构建数据迭代器

In [8]:
class Mydataset(torch.utils.data.Dataset):
    def __init__(self, text_list, label_list):  # 将序列和标签保存在类属性中
        self.text_list = text_list
        self.label_list = label_list
    
    def __getitem__(self,index):  # 索引序列和标签
        text = torch.LongTensor(self.text_list[index])  # Tensor中的元素转换为64位整型
        label = self.label_list[index]
        return text, label
    
    def __len__(self):
        return len(self.text_list)  # 返回序列数

train_ds = Mydataset(x_train, y_train)  # 将训练集单独存储
test_ds = Mydataset(x_test, y_test)  # 将测试集单独存储

batch_size = 32  # 批量大小

train_dl = torch.utils.data.DataLoader(train_ds, batch_size=batch_size, shuffle=True)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=batch_size, shuffle=False)

## 8.定义网络架构

In [9]:
embed_dim = 24  # 嵌入维度
hidden_size = 20  # 隐藏层单元数

class Net(nn.Module):
    def __init__(self, word_list, embed_dim, hidden_size):
        super(Net, self).__init__()
        self.em = nn.Embedding(len(word_list) + 1, embed_dim)   
        self.rnn = nn.LSTM(embed_dim, hidden_size,bidirectional=True)     
        self.fc1 = nn.Linear(2*hidden_size, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, inputs):
        bz = inputs.shape[1]
        h0 = torch.zeros((2, bz, hidden_size)).to('cuda')
        c0 = torch.zeros((2, bz, hidden_size)).to('cuda')
        x = self.em(inputs)
        r_o, _ = self.rnn(x, (h0, c0))
        r_o = r_o[-1]
        x = F.dropout(F.relu(self.fc1(r_o)))
        x = self.fc2(x)
        
        return x

## 9.定义交叉熵损失函数和Adam优化器

In [10]:
model = Net(word_list, embed_dim, hidden_size)  # 实例化
model = model.to('cuda')  # 将模型放到gpu上

loss = nn.CrossEntropyLoss()  # 默认求每个batch下的平均损失
loss = loss.to('cuda')

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [11]:
epoch_tr_y=[]
epoch_tr_y_pre=[]
epoch_tr_AUC=[]
epoch_te_AUC=[]

## 10.定义训练函数，并计算评价指标(1个epoch)

In [12]:
def fit(model, optimizer, train_dl, test_dl):
    
    tr_correct = 0  # 预测正确的个数
    tr_total = 0  # 总样本数
    tr_loss = 0
    tr_TP = 0
    tr_TN = 0
    tr_FP = 0
    tr_FN = 0
    
    model.train()  # 训练模式
    for x, y in train_dl:
        x = x.permute(1, 0)
        x, y = x.to('cuda'), y.to('cuda')
        y_pred = model(x)
        loss_value = loss(y_pred, y)
        #flood=(loss_value - 0.002).abs() + 0.002  # 洪泛函数：防止过拟合
        optimizer.zero_grad()
        loss_value.backward()
        optimizer.step()
        
        with torch.no_grad():
            y_pred = torch.argmax(y_pred, dim=1)
            tr_correct += (y_pred == y).sum().item()
            tr_TP += ((y_pred == y) & (y == 1)).sum().item()
            tr_FN += ((y_pred != y) & (y == 1)).sum().item()
            tr_FP += ((y_pred != y) & (y == 0)).sum().item()
            tr_TN += ((y_pred == y) & (y == 0)).sum().item()
            tr_total += len(y)
            tr_loss += loss_value.item()  # 最后的loss还要除以batch数
            
    """1个epoch训练结束后，计算训练集的各个指标"""
    epoch_tr_loss = tr_loss / len(train_dl)
    epoch_tr_accuracy = tr_correct / tr_total
    epoch_tr_MCC = (tr_TP * tr_TN - tr_TP * tr_FN) / (math.sqrt((tr_TP + tr_FP) * (tr_TP + tr_FN) * (tr_TN + tr_FP) * (tr_TN + tr_FN)))
    epoch_tr_SE=tr_TP/(tr_TP+tr_FN)
    epoch_tr_SPC = tr_TN / (tr_TN + tr_FP)
    epoch_tr_PPV= tr_TP / (tr_TP + tr_FP)
    epoch_tr_NPV= tr_TN / (tr_TN + tr_FN)
    epoch_tr_recall = tr_TP / (tr_TP + tr_FN)
    epoch_tr_precision = tr_TP / (tr_TP + tr_FP)
    epoch_tr_F1 = (2 * epoch_tr_precision * epoch_tr_recall) / (epoch_tr_precision + epoch_tr_recall)
    
    
    te_correct = 0  # 预测正确的个数
    te_total = 0  # 总样本数
    te_loss = 0
    te_TP = 0
    te_TN = 0
    te_FP = 0
    te_FN = 0
    
    model.eval()  # 评估模式
    with torch.no_grad():
        for x, y in test_dl:
            x = x.permute(1, 0)
            x, y = x.to('cuda'), y.to('cuda')
            y_pred = model(x)
            loss_value = loss(y_pred, y)
            y_pred = torch.argmax(y_pred, dim=1)
            te_correct += (y_pred == y).sum().item()
            te_TP += ((y_pred == y) & (y == 1)).sum().item()
            te_FN += ((y_pred != y) & (y == 1)).sum().item()
            te_FP += ((y_pred != y) & (y == 0)).sum().item()
            te_TN += ((y_pred == y) & (y == 0)).sum().item()
            te_total += len(y)
            te_loss += loss_value.item()
        
    """1个epoch训练结束后，计算测试集的各个指标"""
    epoch_te_loss = te_loss / len(test_dl)
    epoch_te_accuracy = te_correct / te_total
    epoch_te_MCC = (te_TP * te_TN - te_TP * te_FN) / (math.sqrt((te_TP + te_FP) * (te_TP + te_FN) * (te_TN + te_FP) * (te_TN + te_FN)))
    epoch_te_SE=te_TP/(te_TP+te_FN)
    epoch_te_SPC = te_TN / (te_TN + te_FP)
    epoch_te_PPV= te_TP / (te_TP + te_FP)
    epoch_te_NPV= te_TN / (te_TN + te_FN)
    epoch_te_recall = te_TP / (te_TP + te_FN)
    epoch_te_precision = te_TP / (te_TP + te_FP)
    epoch_te_F1 = (2 * epoch_te_precision * epoch_te_recall) / (epoch_te_precision + epoch_te_recall)


    return epoch_tr_loss, epoch_tr_accuracy, epoch_tr_MCC, epoch_tr_SE,epoch_tr_F1, epoch_tr_SPC,epoch_tr_PPV, epoch_tr_NPV,epoch_te_loss, epoch_te_accuracy, epoch_te_MCC, epoch_te_SE, epoch_te_SPC,epoch_te_PPV,epoch_te_F1,epoch_te_NPV 

## 11.初始化，用于存储各指标

In [13]:
tr_loss = []
tr_accuracy = []
tr_MCC = []
tr_SE = []
tr_SPC = []
tr_PPV=[]
tr_NPV=[]
tr_AUC=[]
tr_F1=[]

te_loss = []
te_accuracy = []
te_MCC = []
te_SE = []
te_SPC = []
te_PPV=[]
te_NPV=[]
te_AUC=[]
te_F1=[]
from sklearn import metrics

## 12.开始训练

In [15]:
epochs = 100

for epoch in range(epochs): 
    print(f'{epoch} : ',end = '')
    epoch_tr_loss, epoch_tr_accuracy, epoch_tr_MCC, epoch_tr_SE,epoch_tr_F1, epoch_tr_SPC,epoch_tr_PPV, epoch_tr_NPV,epoch_te_loss, epoch_te_accuracy, epoch_te_MCC, epoch_te_SE, epoch_te_SPC,epoch_te_PPV,epoch_te_F1,epoch_te_NPV= fit(model, optimizer, train_dl, test_dl)
    tr_loss.append(epoch_tr_loss)
    tr_accuracy.append(epoch_tr_accuracy)
    tr_MCC.append(epoch_tr_MCC)
    tr_SE.append(epoch_tr_SE)
    tr_SPC.append(epoch_tr_SPC)
    tr_PPV.append(epoch_tr_PPV)
    tr_NPV.append(epoch_tr_NPV)
    tr_F1.append(epoch_tr_F1)
    "tr_AUC.append(epoch_tr_AUC)"
    
    te_loss.append(epoch_te_loss)
    te_accuracy.append(epoch_te_accuracy)
    print(epoch_te_accuracy)
    te_MCC.append(epoch_te_MCC)
    te_SE.append(epoch_te_SE)
    te_SPC.append(epoch_te_SPC)
    te_PPV.append(epoch_te_PPV)
    te_NPV.append(epoch_te_NPV)
    te_F1.append(epoch_te_F1)
    "te_AUC.append(epoch_te_AUC)"  

0 : 0.6845833333333333
1 : 0.6822916666666666
2 : 0.6854166666666667
3 : 0.6858333333333333
4 : 0.6910416666666667
5 : 0.6879166666666666
6 : 0.6952083333333333
7 : 0.68125
8 : 0.6775
9 : 0.681875
10 : 0.6785416666666667
11 : 0.67875
12 : 0.6791666666666667
13 : 0.679375
14 : 0.679375
15 : 0.679375
16 : 0.6795833333333333
17 : 0.6802083333333333
18 : 0.6804166666666667
19 : 0.680625
20 : 0.6802083333333333
21 : 0.6814583333333334
22 : 0.6802083333333333
23 : 0.6852083333333333
24 : 0.6820833333333334
25 : 0.685625
26 : 0.6827083333333334
27 : 0.6835416666666667
28 : 0.6845833333333333
29 : 0.69
30 : 0.689375
31 : 0.6954166666666667
32 : 0.6752083333333333
33 : 0.669375
34 : 0.6835416666666667
35 : 0.6795833333333333
36 : 0.6825
37 : 0.69375
38 : 0.6666666666666666
39 : 0.68125
40 : 0.688125
41 : 0.6797916666666667
42 : 0.6864583333333333
43 : 0.6966666666666667
44 : 0.6902083333333333
45 : 0.6841666666666667
46 : 0.68
47 : 0.6577083333333333
48 : 0.68125
49 : 0.6814583333333334
50 : 0.

## 13.整理训练结果

In [20]:
column_name = ['loss', 'accuracy', 'MCC', 'SE', 'SPC', 'PPV', 'NPV','F1']

tr_loss = pd.Series(tr_loss)
tr_accuracy = pd.Series(tr_accuracy)
tr_MCC = pd.Series(tr_MCC)
tr_SE = pd.Series(tr_SE)
tr_SPC = pd.Series(tr_SPC)
tr_PPV = pd.Series(tr_PPV)
tr_NPV = pd.Series(tr_NPV)
tr_F1 = pd.Series(tr_F1)

tr_result = pd.concat([tr_loss, tr_accuracy, tr_MCC, tr_SE, tr_SPC, tr_PPV, tr_NPV,tr_F1], axis=1)
tr_result.columns = column_name

## 14.整理测试结果

In [21]:
column_name = ['loss', 'accuracy', 'MCC', 'SE', 'SPC', 'PPV', 'NPV','F1']

te_loss = pd.Series(te_loss)
te_accuracy = pd.Series(te_accuracy)
te_MCC = pd.Series(te_MCC)
te_SE = pd.Series(te_SE)
te_SPC = pd.Series(te_SPC)
te_PPV = pd.Series(te_PPV)
te_NPV = pd.Series(te_NPV)
te_F1= pd.Series(te_F1)

te_result = pd.concat([te_loss, te_accuracy, te_MCC, te_SE, te_SPC, te_PPV, te_NPV,te_F1], axis=1)
te_result.columns = column_name
# te_result.index = [*range(1, epochs + 1)]

## 15.输出结果

In [22]:
max(te_result.accuracy)

0.6966666666666667

In [19]:
# save
tr_result.to_csv(r'./调参前后/Word_embedding_LSTMBI_train_result.csv')
te_result.to_csv(r'./调参前后/Word_embedding_LSTMBI_valid_result.csv')