In [None]:
%load_ext autoreload
%autoreload 1
%aimport ecg_get_data
%aimport Models
%aimport train_test_validat
%aimport self_attention
%aimport ECGplot
import Models 
from train_test_validat import *
from self_attention import *
from  ecg_get_data import *
import matplotlib.pyplot as plt

import torch
import torch.utils.data as Data
import random

import time
random_seed = 1
torch.manual_seed(random_seed)    # reproducible
torch.cuda.manual_seed_all(random_seed)
random.seed(random_seed)
np.random.seed(random_seed)

time_str = time.strftime("%Y%m%d_%H%M%S", time.localtime()) 

In [None]:

data_path =  './npy_ECG/' #路径
lable_path = './label.npy'
model_path = './model/'+time_str
log_path = './log/'+  time_str


EcgChannles_num = 12
EcgLength_num = 5000
DEVICE = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
print(DEVICE)

In [None]:
x = load_data(data_path,EcgChannles_num=EcgChannles_num,EcgLength_num=EcgLength_num)
y = load_label(lable_path)

In [None]:
from sklearn.model_selection import train_test_split
train_x,test_x,train_y,test_y = train_test_split(x,y,train_size=0.9,random_state = random_seed,shuffle = True,stratify=y)
print("         HTN     NHTN ")
print("train: %5d   %5d" % (train_y.sum(),len(train_y)-train_y.sum()))
print("test : %5d   %5d" % (test_y.sum(),len(test_y)-test_y.sum()))

In [None]:
#train_dataset,valid_dataset = load_numpy_dataset_to_tensor_dataset(x,y,random_seed=random_seed,train_rate = 0.8)

In [None]:
BATCH_SIZE = 128
FOLDS = 1
EPOCHS = 2000  
PATIENCE = 64
LR = 0.00001

In [None]:
from torch.utils.tensorboard import SummaryWriter   
os.makedirs(model_path, exist_ok=True)
writer = SummaryWriter(log_path)
from torchsummary import summary

In [None]:
NET = [Models.CNN_ATT7() for i in range(FOLDS)]
print(summary(NET[0], (EcgChannles_num,EcgLength_num), device="cpu"))
writer.add_graph(NET[0],torch.rand([1,EcgChannles_num,EcgLength_num]))

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR

for fold in range(FOLDS):
    
    early_stopping = EarlyStopping(PATIENCE, verbose=True, model_path=model_path, delta=0)
    train_dataset,valid_dataset = get_k_fold_dataset(fold=int(fold+1),x = train_x,y=train_y,k=FOLDS,random_seed = random_seed)
    train_dataloader = Data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    valid_dataloader = Data.DataLoader(dataset=valid_dataset, batch_size=BATCH_SIZE, shuffle=True)

    NET[fold].to(DEVICE)
    optimizer  = torch.optim.Adam(NET[fold].parameters(), lr=LR)  
    criterion = torch.nn.CrossEntropyLoss()   
    #等间隔调整学习率
    #scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer,T_max = 32)
    best_loss = 3
    for epoch in range(1,EPOCHS):
        time_all=0
        start_time = time.time()
        train_loss,train_acc = train_model(train_dataloader, NET[fold], criterion, optimizer,DEVICE) # 训练模型
        #scheduler.step() # 学习率迭代

        time_all = time.time()-start_time
        validate_loss,validate_acc = test_model(train_dataloader,criterion,NET[fold],DEVICE) # 测试模型
        writer.add_scalars(main_tag=str(fold)+'_Loss',tag_scalar_dict={'train': train_loss,'validate': validate_loss},global_step=epoch)
        writer.add_scalars(main_tag=str(fold)+'_Accuracy',tag_scalar_dict={'train': train_acc,'validate': validate_acc},global_step=epoch)
        writer.add_scalars(main_tag=str(fold)+'_LearningRate',tag_scalar_dict={'LR': optimizer.state_dict()['param_groups'][0]['lr']},global_step=epoch)

        print('- Epoch: %d - Train_loss: %.5f - Train_acc: %.5f - Val_loss: %.5f - Val_acc: %5f - T_Time: %.3f' %(epoch,train_loss,train_acc,validate_loss,validate_acc,time_all))
        print('当前学习率：%f' %optimizer.state_dict()['param_groups'][0]['lr'])

        if validate_loss < best_loss:
            best_loss = validate_loss
            print('Find better model in Epoch {0}, saving model.'.format(epoch))
            torch.save(NET[fold],  model_path+'/best_model_' + str(fold) + '.pt')  # 保存最优模型
        #是否满足早停法条件
        if(early_stopping(validate_loss,NET[fold])):
            print("Early stopping")
            break

    print('Fold %d Training Finished' %(fold+1))
    torch.cuda.empty_cache()# 清空显卡cuda
print('Training Finished')

# ------Display the attention value-----

# -----For Leads -----

In [None]:
test_model_path = "./model/20220907_091538/best_model_0.pt"
test_model = torch.load(test_model_path).to(DEVICE)
test_x = x[0:1698]
test_y = x[0:1698]
test_x = MAX_MIN_normalization_by_feactures(test_x)
test_x = torch.FloatTensor(test_x)  #turn numpy to tensor
test_y = torch.LongTensor(test_y)
test_dataset = Data.TensorDataset(test_x, test_y)
test_dataloader = Data.DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)
test_model.eval()

test_model.eval()
attention_value_timestep = np.zeros(EcgChannles_num,)
for i,data in enumerate(test_dataloader,0):
    inputs,labels = data[0].to(DEVICE),data[1].to(DEVICE)
    outputs = test_model(inputs)
    #print(outputs)
    _,pred = outputs.max(1) # 求概率最大值对应的标签
    #print("the label :{labels},pred is {pred}".format(labels=labels[0],pred=pred[0]))
    attention_value = test_model.attention_value
    attention_value_timestep += (((attention_value.to('cpu'))[0]).detach().numpy()).sum(axis=0)
plt.figure(figsize=(5,5))
plt.stem(np.arange(0,len(attention_value_timestep)), attention_value_timestep)
plt.xticks(np.arange(0,len(attention_value_timestep)),['I', 'II', 'III', 'aVR', 'aVL', 'aVF', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6'])
plt.ylabel("Sum of attention value")
plt.show()

In [None]:
print(summary(Models.CNN_ATT7(), (EcgChannles_num,EcgLength_num), device="cpu"))