# Libirary import

In [None]:
""" os """
import os

""" torch """
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torchvision.transforms import Compose, Resize, ToTensor
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader, random_split, SubsetRandomSampler, WeightedRandomSampler
from torch.utils.data.sampler import SubsetRandomSampler
from torch import Tensor

"""tensor board"""
#from torch.utils.tensorboard import SummaryWriter

"""glob"""
from glob import glob

""" tqdm """
import time
from tqdm import tqdm

"""Pandas"""
import pandas as pd

""" numpy """
import numpy as np
from numpy import argmax
from PIL import Image

"""JSON"""
import json

"""sklearn"""
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
from sklearn.metrics import auc
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix

"""seaborn"""
import seaborn as sns

"""scipy"""
from scipy import io
from scipy import signal
from scipy.fft import fft, ifft,fftfreq

"""SUMMARY"""
from torchsummary import summary as summary_

"""time"""
import time

"""PIL"""
from PIL import Image

"""einops"""
from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange, Reduce

import re
import shutil
import random
import matplotlib.pyplot as plt
import scipy

from IPython.display import Image

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache() 

In [None]:
def model_performance(labels, preds):
    tn, fp, fn, tp = confusion_matrix(labels,preds).ravel()
    
    sen =  (tp/(tp+fn))*100
    spe =  (tn/(fp+tn))*100
    
    ppv =  (tp/(tp+fp))*100
    npv =  (tn/(fn+tn))*100
    
    return tn, fp, fn, tp, sen,spe,ppv,npv

In [None]:
import sys
print(sys.version)

# Path init

In [None]:
os.listdir()

## Male Save Path

In [None]:
experiment_num = 
result_save_path = ''

gender = 'male'
pos_class = 494
neg_class = 7143

## Female Save Path

In [None]:
experiment_num = 
result_save_path = ''

gender = 'female'
pos_class = 1455
neg_class = 4655

## Total Save Path

In [None]:
experiment_num = 
result_save_path = ''

pos_class = 494+1455
neg_class = 7143+4655

In [None]:
folder_path = result_save_path + str(experiment_num)

try:
    if not(os.path.isdir(folder_path)):
        os.makedirs(os.path.join(folder_path))
except OSError as e:
    if e.errno != errno.EEXIST:
        print("Failed to create directory!!!!!")
        raise

In [None]:
folder_path2 = result_save_path + str(experiment_num)+'/prob and auroc figure per epoch'

try:
    if not(os.path.isdir(folder_path2)):
        os.makedirs(os.path.join(folder_path2))
except OSError as e:
    if e.errno != errno.EEXIST:
        print("Failed to create directory!!!!!")
        raise

In [None]:
"""Result init"""
Result = {}
Result['Data status']   = 'Normal, Mildly vs  Moderately, Severely + Male cut off: 132, Female cut off: 109'
Result['Data Balanced'] = 'Unbalanced'
Result['class weight']  = 'True'
Result['preprocess']  = '132,109'
Result['Model']  = 'coatmixer'

# Hyper parameters

In [None]:
seed = 1

#validation ratio
validation_ratio = 0.1

#learning rate
lr = 0.001

momentum = 0.5

batch_size = 32
test_batch_size = 32


epochs = 100
no_cuda = False

log_interval = 20

# Set the seed and GPU

In [None]:
torch.manual_seed(seed)
use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

kwargs = {'num_workers':0,'pin_memory':True} if use_cuda else {}

!nvcc --version
#https://pytorch.org/get-started/previous-versions/
print('--------------------------------------')
print('현재 torch 버전:',torch.__version__)
print('학습을 진행하는 기기:',device)
print('cuda index:', torch.cuda.current_device())
print('gpu 개수:', torch.cuda.device_count())
print('graphic name:', torch.cuda.get_device_name())

# CoAtMixer

In [None]:
import sys
sys.path.append('')

In [None]:
from coatmixer import coatmixer_0

In [None]:
input = torch.randn(1,1,12,4000)


model = coatmixer_0()

output = model.forward(input)
output.shape

# ResNet-CBAM

In [None]:
from resnet_cbam_1d import ResNetCBAM

In [None]:
input = torch.randn(1,1,12)

layer = ResNetCBAM()

output = layer(input)
output.shape

# Ensemble Model

In [None]:
class Ensemble_CNN(nn.Module):
    
    def __init__(self, oup1:int=768, oup2:int=32, class_n:int=1):
        super(Ensemble_CNN,self).__init__()
        
        """ECG processing model"""
        self.FE_Global    = coatmixer_0()
        
        self.seq1 = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten()
        )
        
        """Features processing model"""
        self.seq2 = ResNetCBAM()
        
        
        """FC"""
        self.seq3 = nn.Sequential(
            nn.Linear(oup1+oup2, int((oup1+oup2)/8)),
            nn.Dropout(p=0.5),
            nn.Linear(int((oup1+oup2)/8), int((oup1+oup2)/32) ),
            nn.Dropout(p=0.5),
            nn.Linear(int((oup1+oup2)/32), class_n)
            )
        
        """CAM"""
        self.gradients_global = None

        
    def activations_hook_global(self, grad):
        self.gradients_global = grad

        
    def forward(self,x,y):
        
        """intput"""
        global_features = self.FE_Global(x)
        # hook for grad cam
        if global_features.requires_grad:
            h = global_features.register_hook(self.activations_hook_global)
        global_features = self.seq1(global_features)
        
        """intput"""
        ECG_features = self.seq2(y)
        
        """concat"""
        out = torch.cat((global_features, ECG_features), dim= -1)
        out = self.seq3(out)
        return out
    
    def get_activations_gradient_global(self):
        # method for the gradient extraction
        return self.gradients_global
    
    def get_activations_global(self, x):
        # method for the activation exctraction
        return self.FE_Global(x)

In [None]:
input = torch.randn(1,1,12,4000)
#additional_features = torch.randn(1,12)
additional_features = torch.randn(1, 1,12)
 


model = Ensemble_CNN()
output = model.forward(input, additional_features)
output.shape

# Data Path

In [None]:
"""LVH"""
with open('', 'r') as f:
    train = json.load(f)
    
with open('', 'r') as f:
    validation = json.load(f)
    
with open('', 'r') as f:
    test = json.load(f)

In [None]:
len(train),len(validation),len(test)

In [None]:
def gender_selection(json_list, gender):
    
    label_data = json_list
    if gender == 'male':
        sex = '1'
    elif gender == 'female':
        sex = '2'
        
    new_label_data = []
    
    for i in range(0,len(label_data),1):
        tempt = label_data[i]
        if tempt['sex'] == sex:
            new_label_data.append(tempt)
            
    return new_label_data

In [None]:
train = gender_selection(train, gender)
validation = gender_selection(validation, gender)
test = gender_selection(test, gender)

len(train),len(validation),len(test)

In [None]:
train_paths       = train
validation_paths  = validation
test_paths        = test

len(train_paths), len(validation_paths), len(test_paths)

# Custom Dataset

In [None]:
class CustomDataset(Dataset):

    def __init__(self, data_paths, transform=None):
        self.data_paths = data_paths
        self.transform = transform

    def __getitem__(self, idx):
          
        """get data"""
        tempt = self.data_paths[idx] 
        path = tempt['data_path']
        data = np.load(file = path)
        
        LeadI   = data[0][500:-500]
        LeadII  = data[1][500:-500]
        LeadIII = data[2][500:-500]
        V1      = data[3][500:-500]
        V2      = data[4][500:-500]
        V3      = data[5][500:-500]
        V4      = data[6][500:-500]
        V5      = data[7][500:-500]
        V6      = data[8][500:-500]
        aVL     = data[9][500:-500]
        aVR     = data[10][500:-500]
        aVF     = data[11][500:-500]

        data = np.vstack([LeadI, LeadII, LeadIII,aVR, aVL, aVF, V1, V2, V3, V4, V5, V6])
        data = np.expand_dims(data, axis=0)
        
        
        sex = float(tempt['sex'])
        age = float(tempt['age'])
        
        """ecg features"""
        VentricularRate = float(tempt['VentricularRate'])
        AtrialRate      = float(tempt['AtrialRate'])
        PRInterval      = float(tempt['PRInterval']) 
        QRSDuration     = float(tempt['QRSDuration'])
        QTInterval      = float(tempt['QTInterval'])
        QTCorrected     = float(tempt['QTCorrected'])
        PAxis           = float(tempt['PAxis'])
        RAxis           = float(tempt['RAxis'])
        TAxis           = float(tempt['TAxis'])
        AFIB_AFL        = float(tempt['fibrillation or flutter'])
        
        
        features = np.array([sex,age,VentricularRate,AtrialRate,PRInterval,QRSDuration,
                             QTInterval,QTCorrected,PAxis,RAxis,TAxis,AFIB_AFL])
        
        
        features = np.expand_dims(features, axis=0)
        

        """get label"""
        if sex == 1.0:
            if float(tempt['LV mass index']) < 132:  
                label = 0
            else:
                label = 1
                
        else:
            if float(tempt['LV mass index']) < 109: 
                label = 0
            else:
                label = 1
               
        
        return data,features,label
    
    
    def __len__(self):
        return len(self.data_paths)

In [None]:
train_dataset      = CustomDataset(train_paths,transforms.Compose([transforms.ToTensor()]))
validation_dataset = CustomDataset(validation_paths,transforms.Compose([transforms.ToTensor()]))
test_dataset       = CustomDataset(test_paths,transforms.Compose([transforms.ToTensor()]))

print(len(train_dataset),len(validation_dataset), len(test_dataset))

# Data Loader

In [None]:
"""Train"""
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size = batch_size,
    shuffle = True, 
    **kwargs
)

"""Validation"""
validation_loader = torch.utils.data.DataLoader(
    dataset=validation_dataset,
    batch_size = batch_size,
    shuffle = True,
    **kwargs
)

"""Test"""
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size = test_batch_size,
    shuffle = False,
    **kwargs
)

print("Length of the train_loader:", len(train_loader))
print("Length of the val_loader:", len(validation_loader))
print("Length of the test_loader:", len(test_loader))

# Calculate pos_weight for unbalanced class

In [None]:
pos_class = 0
neg_class = 0

for batch_idx,(data,_,target) in enumerate(train_loader):
    
    pos_class += len(target[np.where(target == 1)])
    neg_class += len(target[np.where(target == 0)])

In [None]:
#pos_weight = num_neg / num_pos
pos_weight = torch.tensor([neg_class/pos_class], dtype = torch.float32)
print(pos_weight)

In [None]:
Result['pos_weight']  = pos_weight

# Optimizer

In [None]:
model = Ensemble_CNN().to(device)

optimizer = optim.Adam(model.parameters(), lr=lr)
Result['optimizer']   = 'Adam'

# Train

In [None]:
"""Train"""
train_losses        = []
avg_train_losses    = []
Train_baths_ACC     = [] 
Train_ACC           = [] 
Train_AUROC         = []


"""Validaion"""
valid_losses        = []
avg_valid_losses    = []
Validation_ACC      = []
Valid_ACC_per_Class = []
Validation_AUROC    = []


Validation_Sen      = []
Validation_Spe      = []

In [None]:
"""class num and loss"""
criterion = nn.BCEWithLogitsLoss(pos_weight = pos_weight.cuda())

"""save best model"""
best_acc = 0
best_epoch = 0
best_auroc = 0
train_acc_cul = 0

best_model_save_path = folder_path +'/'+ 'best model of experiment ' + str(experiment_num)

In [None]:
for epoch in range(1, epochs + 1):
    
    """Train"""
    
    train_prob_auroc_save_path = folder_path2 + '/epoch = ' + str(epoch) + '/train'

    try:
        if not(os.path.isdir(train_prob_auroc_save_path)):
            os.makedirs(os.path.join(train_prob_auroc_save_path))
    except OSError as e:
        if e.errno != errno.EEXIST:
            print("Failed to create directory!!!!!")
            raise
    
    model.train()
    
    train_loss = 0
    Train_baths_ACC = []
    
    true_labels  = np.array([]) # ideal label
    target_score = np.array([]) # ouput score
    prob_score   = np.array([]) # ouput prob
    
    for batch_idx, (data,features,target) in enumerate(train_loader):
    
        data, target = data.to(device, dtype=torch.float), target.to(device, dtype=torch.float)
        features = features.to(device, dtype=torch.float)
        
        optimizer.zero_grad()
        output = model(data,features)
         
        """pred(BCE loss)"""
        pred = torch.round(torch.sigmoid(output))
        
        """loss"""
        loss = criterion(output.view_as(target), target)
        
        """update and save loss"""
        train_loss += loss.item()
        loss.backward() # 가중치 갱신.
        optimizer.step()
        
        
        correct = 0
        total = target.size(0)
        correct += pred.eq(target.view_as(pred)).sum().item()
        accuracy = 100. * correct / total
        Train_baths_ACC.append(accuracy)
        
        """auroc inputs"""

        true_labels  = np.append(true_labels,target.detach().cpu().numpy())
        target_score = np.append(target_score,output.detach().cpu().numpy())
        prob_score   = np.append(prob_score,torch.sigmoid(output).detach().cpu().numpy())
        
        
        if batch_idx % log_interval == 0:
            #1.
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

            #2.
            print('Train set:  batch loss: {:.4f}, Accuracy: {:.0f}% '.format(
                loss.item() ,accuracy))
            
    train_auroc = roc_auc_score(true_labels, prob_score)
    Train_AUROC.append(train_auroc)
    
    """label and target score save"""
    np.save(train_prob_auroc_save_path +'/true_labels.npy',true_labels)
    np.save(train_prob_auroc_save_path +'/target_score.npy',target_score)
    np.save(train_prob_auroc_save_path +'/target_prob.npy',prob_score)

    
    """Validation"""
    
    valid_prob_auroc_save_path = folder_path2 + '/epoch = ' + str(epoch) + '/validation'

    try:
        if not(os.path.isdir(valid_prob_auroc_save_path)):
            os.makedirs(os.path.join(valid_prob_auroc_save_path))
    except OSError as e:
        if e.errno != errno.EEXIST:
            print("Failed to create directory!!!!!")
            raise
    
    model.eval()
    
    valid_loss = 0
    correct = 0
    total = len(validation_loader.dataset)
    
    true_labels  = np.array([]) # ideal label
    target_score = np.array([]) # ouput score
    prob_score   = np.array([]) # ouput prob
    
    pred_labels   = np.array([]) # model prediction

    with torch.no_grad():
        for batch_idx, (data,features, target) in enumerate(validation_loader):
            data, target = data.to(device, dtype=torch.float), target.to(device, dtype=torch.float)
            features = features.to(device, dtype=torch.float)
            
            output = model(data,features)

            """pred(BCE loss)"""
            pred = torch.round(torch.sigmoid(output))
            
            """loss"""
            loss = criterion(output.view_as(target), target)
            valid_loss += loss.item()
            correct += pred.eq(target.view_as(pred)).sum().item()
            
            """auroc inputs"""
            true_labels  = np.append(true_labels,np.array(target.cpu()))
            target_score = np.append(target_score,np.array(output.cpu()))
            prob_score   = np.append(prob_score,np.array(torch.sigmoid(output).cpu()))
            
            pred_labels   = np.append(pred_labels,np.array(torch.round(torch.sigmoid(output)).cpu()))
            
            
        
        valid_auroc = roc_auc_score(true_labels, prob_score)
        Validation_AUROC.append(valid_auroc)
        
        """valid auroc figure save"""
        auc_save_path = valid_prob_auroc_save_path +'/ROC Curve.png'

        FPR, TPR, thresholds = roc_curve(true_labels, prob_score)

        """label and target score save"""
        np.save(valid_prob_auroc_save_path +'/true_labels.npy',true_labels)
        np.save(valid_prob_auroc_save_path +'/target_score.npy',target_score)
        np.save(valid_prob_auroc_save_path +'/target_prob.npy',prob_score)
        
        """calculate sen & spe"""
        _, _, _, _, sen,spe,_,_ = model_performance(true_labels,pred_labels)
        Validation_Sen.append(sen)
        Validation_Spe.append(spe)
                                      
        

    """Loss and ACC """
    train_loss /= len(train_loader)
    valid_loss /= len(validation_loader)
    avg_train_losses.append(train_loss)
    avg_valid_losses.append(valid_loss)
    
    Train_ACC.append(sum(Train_baths_ACC)/len(Train_baths_ACC))
    valid_accuracy = 100. * correct / total
    Validation_ACC.append(valid_accuracy)
    
    print('------------------------------------------')
    print('Valid set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        valid_loss, correct, total, valid_accuracy))
    print('Valid set: AUROC: {:.4f}'.format(valid_auroc))
    print('Valid set: Sensitivity: {:.4f}'.format(sen))
    print('Valid set: Specificity: {:.4f}'.format(spe))
    print('-------------------------------------------')
    
    
    """Save best model"""
    
    if valid_auroc > best_auroc:
        torch.save(model, best_model_save_path)
        print("save model")
        print('-------------------------------------------')
        best_acc = valid_accuracy
        best_epoch = epoch
        best_auroc = valid_auroc
        #best_sen = sen
        train_acc_cul = sum(Train_baths_ACC)/len(Train_baths_ACC)#best model save시 현재 epoch의 train acc save.


# Model save and Load

In [None]:
save_path = folder_path +'/'+ 'experiment '+ str(experiment_num)
torch.save(model, save_path)

# Figure and Save Path init

In [None]:
fig_save_path = result_save_path

In [None]:
loss_save_path = fig_save_path + str(experiment_num) +'/Loss.png'

fig = plt.figure(figsize=(10,8))
plt.plot(range(1,len(avg_train_losses)+1),avg_train_losses, label='Training Loss')
plt.plot(range(1,len(avg_valid_losses)+1),avg_valid_losses, label='Validation Loss')


plt.xlabel('epochs')
plt.ylabel('loss')

plt.xlim(0, len(avg_train_losses)+1) 
plt.ylim(0, 2) 

plt.grid(True)
plt.legend()
plt.tight_layout()
plt.savefig(loss_save_path, dpi = 200) 
plt.show()

In [None]:
acc_save_path = fig_save_path + str(experiment_num) +'/Acc.png'

fig = plt.figure(figsize=(10,8))

plt.plot (range(1,len(Train_ACC)+1),Train_ACC, label='Train ACC' )
plt.plot (range(1,len(Validation_ACC)+1),Validation_ACC, label='Validation ACC')


plt.xlabel('epochs')
plt.ylabel('acc')

plt.xlim(0, len(Train_ACC)+1)
plt.ylim(0, 100)

plt.grid(True)
plt.legend(loc = 'lower right', fancybox = False, edgecolor = 'k', framealpha = 1.0)
plt.tight_layout()
plt.savefig(acc_save_path, dpi = 200) 
plt.show()

In [None]:
acc_save_path = fig_save_path + str(experiment_num) +'/Acc and AUROC.png'

fig = plt.figure(figsize=(10,8))

plt.plot (range(1,len(Train_ACC)+1),np.array(Train_ACC)/100, label='Train ACC',alpha = 0.2 )
plt.plot (range(1,len(Validation_ACC)+1),np.array(Validation_ACC)/100, label='Validation ACC',alpha = 0.2)
plt.plot (range(1,len(Train_AUROC)+1),np.array(Train_AUROC), label='Train AUROC')
plt.plot (range(1,len(Validation_AUROC)+1),np.array(Validation_AUROC), label='Validation AUROC')

plt.vlines(best_epoch,0.1,1.0,'r','--')
plt.text(best_epoch, 0.1,' best model',fontsize = 15)

plt.xlabel('epochs')
plt.ylabel('acc and auroc')

plt.xlim(0, len(Train_ACC)+1)
plt.ylim(0, 1)

plt.grid(True)
plt.legend(loc = 'lower right', fancybox = False, edgecolor = 'k', framealpha = 1.0,fontsize = 15)
plt.tight_layout()
plt.savefig(acc_save_path, dpi = 200) 
plt.show()

In [None]:
sen_spe_save_path = fig_save_path + str(experiment_num) +'/Validatin Sen and Spe.png'

fig = plt.figure(figsize=(10,8))
plt.plot(range(1,len(Validation_Sen)+1),np.array(Validation_Sen)*0.01, color='blue', label='Validation Sensitivity')
plt.plot(range(1,len(Validation_Spe)+1),np.array(Validation_Spe)*0.01, color='green', label='Validation Specificity')
plt.plot (range(1,len(Train_AUROC)+1),np.array(Train_AUROC), label='Train AUROC' ,alpha = 0.3)
plt.plot (range(1,len(Validation_AUROC)+1),np.array(Validation_AUROC), label='Validation AUROC',alpha = 0.3)

plt.vlines(best_epoch,0.1,1.0,'r','--')
plt.text(best_epoch, 0.1,' best model',fontsize = 15)

plt.xlabel('epochs')
plt.ylabel('metrics')

plt.xlim(0, len(Validation_Sen)+1) 
plt.ylim(0, 1) 

plt.grid(True)
plt.legend(loc = 'lower right', fancybox = False, edgecolor = 'k', framealpha = 1.0,fontsize = 15)
plt.tight_layout()
plt.savefig(sen_spe_save_path, dpi = 200) 
plt.show()

# Result

In [None]:
Result['loss'] = 'BCE'

Result['epochs'] = epochs
Result['learning rate'] = lr
Result['batch size'] = batch_size
Result['test batch size'] = test_batch_size

Result['best_epoch'] = best_epoch
Result['train_acc_at_last_epoch'] = Train_ACC[-1]
Result['train_acc_at_best_epoch'] = train_acc_cul

Result['validation_acc_at_last_epoch'] = Validation_ACC[-1]
Result['validation_acc_at_best_epoch'] = best_acc
Result['validation_auroc_at_best_epoch'] = best_auroc

# Load best model

In [None]:
del model

experiment_num

In [None]:
model = torch.load(best_model_save_path)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=lr)

# Test without Youden index

In [None]:
true_labels  = np.array([]) 
target_score = np.array([]) 
prob_score   = np.array([]) # ouput prob
pred_labels  = np.array([]) 

"""test"""
model.eval()
    
test_loss = 0
correct = 0
total = len(test_loader.dataset)



with torch.no_grad():
    for batch_idx, (data,features, target) in enumerate(test_loader):
        data, target = data.to(device, dtype=torch.float), target.to(device, dtype=torch.float)
        features = features.to(device, dtype=torch.float)
        
        """pred(BCE loss)"""
        pred = torch.round(torch.sigmoid(output))
        
        
        """loss"""
        loss = criterion(output.view_as(target), target)
        test_loss += loss.item()
        correct += pred.eq(target.view_as(pred)).sum().item()
        
        
        true_labels = np.append(true_labels,np.array(target.cpu())) 
        target_score = np.append(target_score,np.array(output.cpu()))
        prob_score   = np.append(prob_score,np.array(torch.sigmoid(output).cpu()))
        pred_labels = np.append(pred_labels,np.array(pred.cpu())) 
        
        print('test: [{}/{}] '.format(batch_idx,len(test_loader)-1))
        
        
test_loss /= len(test_loader)
test_accuracy = 100. * correct / total

print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
    test_loss, correct, total, test_accuracy))

In [None]:
Result['test_acc'] = test_accuracy
Result['test_auroc'] = roc_auc_score(true_labels, prob_score)


test_path = folder_path +'/test'

try:
    if not(os.path.isdir(test_path)):
        os.makedirs(os.path.join(test_path))
except OSError as e:
    if e.errno != errno.EEXIST:
        print("Failed to create directory!!!!!")
        raise


performance = [roc_curve(true_labels, target_score)]
np.save(test_path +'/test performance.npy',performance)

np.save(test_path +'/true_labels.npy',true_labels)
np.save(test_path +'/target_score.npy',target_score)
np.save(test_path +'/target_prob.npy',prob_score)

In [None]:
auc_save_path = fig_save_path + str(experiment_num) +'/Testset ROC Curve.png'

FPR, TPR, thresholds = roc_curve(true_labels, prob_score)

fig = plt.figure(figsize=(10,10))

plt.plot(FPR, TPR, label=r'model performance (AUROC %0.4f)' % (roc_auc_score(true_labels, prob_score)),lw=2, alpha=.8)
plt.plot([0,1],[0,1],'--', color='red') # 대각선
plt.title('ROC Curve', fontsize=18)
plt.xlabel('1-specificity', fontsize=18)
plt.ylabel('sensitivity', fontsize=18)

plt.legend(loc = 'lower right', fontsize=18)
plt.xticks(fontsize=15)
plt.yticks(fontsize=15)

plt.savefig(auc_save_path, dpi = 200)
plt.show()
plt.clf()

# Performance

In [None]:
tn, fp, fn, tp, sen,spe,ppv,npv = model_performance(true_labels,pred_labels)

print("모델의 Sensitivity:",sen)
print("모델의 Specificity:",spe)
print("모델의 PPV:",ppv)
print("모델의 NPV:",npv)

In [None]:
Result['TN'] = tn
Result['FP'] = fp
Result['FN'] = fn
Result['TP'] = tp

Result['Sensitivity'] = sen
Result['Specificity'] = spe
Result['PPV'] = ppv
Result['NPV'] = npv

# Test with Youden index

## calculate youden index

In [None]:
true_labels  = np.array([]) 
target_score = np.array([]) 
prob_score   = np.array([]) 
pred_labels  = np.array([]) 

"""Validation"""
model.eval()
    
valid_loss = 0
correct = 0
total = len(validation_loader.dataset)


with torch.no_grad():
        for batch_idx, (data,features, target) in enumerate(validation_loader):
            data, target = data.to(device, dtype=torch.float), target.to(device, dtype=torch.float)
            features = features.to(device, dtype=torch.float)
            
            output = model(data,features)
            
            """pred(BCE loss)"""
            pred = torch.round(torch.sigmoid(output))
            
            """결과값 누적."""
            true_labels = np.append(true_labels,np.array(target.cpu())) 
            target_score = np.append(target_score,np.array(output.cpu()))
            prob_score   = np.append(prob_score,np.array(torch.sigmoid(output).cpu()))
            pred_labels = np.append(pred_labels,np.array(pred.cpu()))  
    
            print('validation: [{}/{}] '.format(batch_idx,len(validation_loader)-1))
            

In [None]:
"""Youden’s J statistic. / J = Sensitivity + Specificity – 1"""

# calculate roc curves
#FPR, TPR, thresholds = roc_curve(true_labels, target_score)
FPR, TPR, thresholds = roc_curve(true_labels, prob_score)

# get the best threshold
J = TPR - FPR
idx = argmax(J)
best_thresh = thresholds[idx]

print('Best Threshold=%f, sensitivity = %.3f, specificity = %.3f, J=%.3f' % (best_thresh, TPR[idx], 1-FPR[idx], J[idx]))

In [None]:
auc_save_path = fig_save_path + str(experiment_num) +'/Validset ROC Curve.png'

fig = plt.figure(figsize=(10,10))

plt.plot(FPR, TPR, label=r'model performance (AUROC %0.4f)' % (roc_auc_score(true_labels, prob_score)),lw=2, alpha=.8)
plt.scatter(FPR[idx], TPR[idx], marker='+', s=100, color='r', 
            label='Best threshold = %.3f, \nSensitivity = %.3f, \nSpecificity = %.3f' % (best_thresh, TPR[idx], 1-FPR[idx]))

plt.plot([0,1],[0,1],'--', color='red') 
plt.title('ROC Curve', fontsize=18)
plt.xlabel('1-specificity', fontsize=18)
plt.ylabel('sensitivity', fontsize=18)

plt.legend(loc = 'lower right', fontsize=18)
plt.xticks(fontsize=15)
plt.yticks(fontsize=15)

plt.savefig(auc_save_path, dpi = 200) 
plt.show()
plt.clf()

In [None]:
Result['best threshold'] = best_thresh
Result['sensitivity_in_best threshold'] =  TPR[idx]
Result['specificity_in_best threshold'] =  1-FPR[idx]
Result['J_best threshold'] =  J[idx]

In [None]:
true_labels  = np.array([]) 
target_score = np.array([]) 
prob_score   = np.array([]) 
pred_labels  = np.array([]) 

"""test"""
model.eval()
    
test_loss = 0
correct = 0
total = len(test_loader.dataset)



with torch.no_grad():
    for batch_idx, (data,features, target) in enumerate(test_loader):
        data, target = data.to(device, dtype=torch.float), target.to(device, dtype=torch.float)
        features = features.to(device, dtype=torch.float)

        
        """pred(BCE loss)"""
        #pred = torch.round(torch.sigmoid(output))
        pred = (torch.sigmoid(output) > best_thresh).int() 
        
        
        """loss"""
        loss = criterion(output.view_as(target), target)
        test_loss += loss.item()
        correct += pred.eq(target.view_as(pred)).sum().item()
        
        
        true_labels = np.append(true_labels,np.array(target.cpu())) # 실제 라벨
        target_score = np.append(target_score,np.array(output.cpu()))
        prob_score   = np.append(prob_score,np.array(torch.sigmoid(output).cpu()))
        pred_labels = np.append(pred_labels,np.array(pred.cpu()))  # 예측 결과
        
        print('test: [{}/{}] '.format(batch_idx,len(test_loader)-1))
        
        
test_loss /= len(test_loader)
test_accuracy = 100. * correct / total

print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
    test_loss, correct, total, test_accuracy))

In [None]:
Result['test_acc_youden_index'] = test_accuracy
Result['test_auroc_youden_index'] = roc_auc_score(true_labels, prob_score)


test_path = folder_path +'/test_youden_index'

try:
    if not(os.path.isdir(test_path)):
        os.makedirs(os.path.join(test_path))
except OSError as e:
    if e.errno != errno.EEXIST:
        print("Failed to create directory!!!!!")
        raise


performance = [roc_curve(true_labels, prob_score)]
np.save(test_path +'/test performance_youden_index.npy',performance)

np.save(test_path +'/true_labels.npy',true_labels)
np.save(test_path +'/target_score.npy',target_score)
np.save(test_path +'/target_prob.npy',prob_score)


In [None]:
auc_save_path = fig_save_path + str(experiment_num) +'/Testset ROC Curve with Youden index.png'

fig = plt.figure(figsize=(10,10))

plt.plot(FPR, TPR, label=r'model performance (AUROC %0.4f)' % (roc_auc_score(true_labels, prob_score)),lw=2, alpha=.8)
plt.plot([0,1],[0,1],'--', color='red') 
plt.title('ROC Curve', fontsize=18)
plt.xlabel('1-specificity', fontsize=18)
plt.ylabel('sensitivity', fontsize=18)

plt.legend(loc = 'lower right', fontsize=18)
plt.xticks(fontsize=15)
plt.yticks(fontsize=15)

plt.savefig(auc_save_path, dpi = 200) 
plt.show()
plt.clf()

In [None]:
tn, fp, fn, tp, sen,spe,ppv,npv = model_performance(true_labels,pred_labels)

print("Sensitivity:",sen)
print("Specificity:",spe)
print("PPV:",ppv)
print("NPV:",npv)

In [None]:
Result['TN_youden_index'] = tn
Result['FP_youden_index'] = fp
Result['FN_youden_index'] = fn
Result['TP_youden_index'] = tp

Result['Sensitivity_youden_index'] = sen
Result['Specificity_youden_index'] = spe
Result['PPV_youden_index'] = ppv
Result['NPV_youden_index'] = npv

# Result save

In [None]:
Result

In [None]:
df = pd.DataFrame(Result,index = [0])
df.to_excel(folder_path +'/Result.xlsx',index=False)