# Library import

In [None]:
""" os """
import os

""" torch """
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, random_split, SubsetRandomSampler, WeightedRandomSampler
from torch.utils.data.sampler import SubsetRandomSampler


"""tensor board"""
import torchvision


"""glob"""
from glob import glob

""" tqdm """
import time
from tqdm import tqdm

"""Pandas"""
import pandas as pd

""" numpy """
import numpy as np
from numpy import argmax
from PIL import Image

"""JSON"""
import json

"""sklearn"""
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
from sklearn.metrics import auc
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import mean_absolute_error, r2_score

"""seaborn"""
import seaborn as sns

"""scipy"""
from scipy import io
from scipy import signal
from scipy.fft import fft, ifft,fftfreq
from scipy import stats

"""SUMMARY"""
from torchsummary import summary

"""time"""
import time


"""PIL"""
from PIL import Image

"""pingouin"""
import pingouin as pg

import re
import shutil
import random
import matplotlib.pyplot as plt
import scipy

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt

import cv2
from torch.autograd import Function

from scipy.interpolate import interp1d

# Path init

In [None]:
os.listdir()

In [None]:
cur_path = "path"
os.chdir(cur_path)
os.listdir()

In [None]:
experiment_num = 
fig_save_path = 'path'

# Hyper parameters

In [None]:
seed = 1

#validation ratio
validation_ratio = 0.1

#learning rate
lr = 0.001
 

momentum = 0.5


batch_size = 512    
test_batch_size = 512


epochs = 50
no_cuda = False

log_interval = 5

# Set the seed and GPU

In [None]:
torch.manual_seed(seed)

use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

kwargs = {'num_workers':0,'pin_memory':True} if use_cuda else {}

# Model

In [None]:
class ECG_CNN(nn.Module):
    def __init__(self, num_classes=2):
        super(ECG_CNN, self).__init__()
        
        self.conv1 = nn.Sequential(
            nn.Conv1d(12, 48, kernel_size=5, stride=1 ),
            nn.BatchNorm1d(48),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        
        self.conv2 = nn.Sequential(
            nn.Conv1d(48, 96, kernel_size=5, stride=1 ),
            nn.BatchNorm1d(96),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        
        self.conv3 = nn.Sequential(
            nn.Conv1d(96, 192, kernel_size=5, stride=1 ),
            nn.BatchNorm1d(192),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        
        """Sigmoid"""
        self.sigmoid = nn.Sigmoid()
        
        
        """ Global Max, Average Pooling"""
        self.globalavrpool1 = nn.AvgPool1d(kernel_size = 621, stride=1)
        
        """Drop out"""
        self.dp = nn.Dropout(p=0.5)
        
        
        """Fully Connected """ 
        self.fc1 = nn.Linear(1*192, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, num_classes) 
        
    def forward(self, x):
        
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.globalavrpool1(x)
        
        """Fully Connected"""
        x = x.view(-1, 1*192*1) 
        
        # Dense
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x

# Data Path(12-Lead)

In [None]:
with open('path', 'r') as f:
    label_data = json.load(f)

In [None]:
len(label_data)

In [None]:
data_paths_infant,data_paths_10s,data_paths_20s,data_paths_30s,data_paths_40s,data_paths_50s,data_paths_60s  = [],[],[],[],[],[],[]
data_paths_70s, data_paths_80s, data_paths_90s = [],[],[]

for i in range(0,len(label_data),1):
    
    """get label"""
    tempt = label_data[i]['age']
    
    if tempt<18:
        data_paths_infant.append(i)
    elif 18 <= tempt < 20:
        data_paths_10s.append(i)
    elif 20 <= tempt < 30:
        data_paths_20s.append(i)
    elif 30 <= tempt < 40:
        data_paths_30s.append(i)
    elif 40 <= tempt < 50:
        data_paths_40s.append(i)
    elif 50 <= tempt < 60:
        data_paths_50s.append(i)
    elif 60 <= tempt < 70:
        data_paths_60s.append(i)
    elif 70 <= tempt < 80:
        data_paths_70s.append(i)
    elif 80 <= tempt < 90:
        data_paths_80s.append(i)
    else:
        data_paths_90s.append(i)

# train, validatin, test set

In [None]:
def Make_train_validation_test_paths(data_paths, train_ratio, valid_ratio):
    
    """count number of data paths"""
    num_datapaths = len(data_paths)
    indices = list(range(num_datapaths))
    
    train_split = int(np.floor(train_ratio * num_datapaths))
    valid_split = int(np.floor((train_ratio+valid_ratio) * num_datapaths))

    """set the seed and shuffle"""
    np.random.seed(seed)
    np.random.shuffle(indices)
    
    """spllit data paths"""
    train_data_paths      = []
    validation_data_paths = []
    test_data_pahts       = []
    
    
    train_idx, valid_idx, test_idx = indices[:train_split], indices[train_split:valid_split], indices[valid_split:]
    

    for i in range(0,len(train_idx),1):
        train_data_paths.append( data_paths[train_idx[i]])
        
    for i in range(0,len(valid_idx),1):
        validation_data_paths.append( data_paths[valid_idx[i]])
        
    for i in range(0,len(test_idx),1):
        test_data_pahts.append( data_paths[test_idx[i]])
    
    return train_data_paths, validation_data_paths, test_data_pahts

In [None]:
train_paths_10s, validation_paths_10s, test_paths_10s = Make_train_validation_test_paths(data_paths_10s,0.4,0.3)
train_paths_20s, validation_paths_20s, test_paths_20s = Make_train_validation_test_paths(data_paths_20s,0.4,0.3)
train_paths_30s, validation_paths_30s, test_paths_30s = Make_train_validation_test_paths(data_paths_30s,0.4,0.3)
train_paths_40s, validation_paths_40s, test_paths_40s = Make_train_validation_test_paths(data_paths_40s,0.4,0.3)
train_paths_50s, validation_paths_50s, test_paths_50s = Make_train_validation_test_paths(data_paths_50s,0.4,0.3)
train_paths_60s, validation_paths_60s, test_paths_60s = Make_train_validation_test_paths(data_paths_60s,0.4,0.3)
train_paths_70s, validation_paths_70s, test_paths_70s = Make_train_validation_test_paths(data_paths_70s,0.4,0.3)
train_paths_80s, validation_paths_80s, test_paths_80s = Make_train_validation_test_paths(data_paths_80s,0.4,0.3)
train_paths_90s, validation_paths_90s, test_paths_90s = Make_train_validation_test_paths(data_paths_90s,0.4,0.3)

In [None]:
train_paths      = train_paths_10s + train_paths_20s + train_paths_30s + train_paths_40s + train_paths_50s + train_paths_60s + train_paths_70s + train_paths_80s + train_paths_90s
validation_paths = validation_paths_10s + validation_paths_20s + validation_paths_30s + validation_paths_40s + validation_paths_50s + validation_paths_60s + validation_paths_70s + validation_paths_80s + validation_paths_90s
test_paths       = test_paths_10s + test_paths_20s + test_paths_30s + test_paths_40s + test_paths_50s + test_paths_60s + test_paths_70s + test_paths_80s + test_paths_90s
len(train_paths), len(validation_paths), len(test_paths)

# Custom Dataset

In [None]:
class CustomDataset(Dataset):

    def __init__(self, data_paths, transform=None):
        self.data_paths = data_paths
        self.transform = transform

    def __getitem__(self, idx):
          
        """get data"""
        tempt = self.data_paths[idx] 
        path  = label_data[tempt]['data_path']
        data = np.load(file = path)
        
        
        """get label"""
        
        if label_data[tempt]['age'] <= 40:
            label = 0
        else:
            label = 1
        

        return data,label
    
    
    def __len__(self):
        return len(self.data_paths)

In [None]:
train_dataset      = CustomDataset(train_paths,transforms.Compose([transforms.ToTensor()]))
validation_dataset = CustomDataset(validation_paths,transforms.Compose([transforms.ToTensor()]))
test_dataset       = CustomDataset(test_paths,transforms.Compose([transforms.ToTensor()]))

print(len(train_dataset),len(validation_dataset), len(test_dataset))

# Data Loader

In [None]:
"""Train"""
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size = batch_size,
    shuffle = True,  
    **kwargs
)

"""Validation"""
validation_loader = torch.utils.data.DataLoader(
    dataset=validation_dataset,
    batch_size = batch_size,
    shuffle = True,
    **kwargs
)

"""Test"""
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size = test_batch_size,
    shuffle = True,
    **kwargs
)


In [None]:
print("Length of the train_loader:", len(train_loader))
print("Length of the val_loader:", len(validation_loader))
print("Length of the test_loader:", len(test_loader))

# Optimizer

In [None]:
model = ECG_CNN().to(device)

optimizer = optim.Adam(model.parameters(), lr=lr)

# Train

## init

In [None]:
"""Train"""
train_losses        = []
avg_train_losses    = []
Train_baths_ACC     = [] 
Train_ACC           = [] 


"""Validaion"""
valid_losses        = []
avg_valid_losses    = []
Validation_ACC      = []
Valid_ACC_per_Class = []


In [None]:
num_classes = 2
criterion = nn.CrossEntropyLoss()

In [None]:
"""save best model"""
best_acc = 0
best_model_save_path = 'path'
best_model_save_path = best_model_save_path + str(experiment_num)

# Model save and Load

In [None]:
# 모델 저장 경로
save_path = 'path'

In [None]:
save_path = save_path + str(experiment_num)

In [None]:
torch.save(model, save_path)

# Test

In [None]:
print(best_acc)

# CAM

In [None]:
class FeatureExtractor():
    """ Class for extracting activations and 
    registering gradients from targetted intermediate layers """

    def __init__(self, model):
        self.model = model
        self.gradients = []

    def save_gradient(self, grad):
        self.gradients.append(grad)

    def __call__(self, x):
        outputs = []
        self.gradients = []
        for name, module in self.model._modules.items():

            if name == '3':
                x = module(x)
               
            else:
                x = module(x)
                x.register_hook(self.save_gradient)
                outputs += [x]
        return outputs, x


class ModelOutputs():
    """ Class for making a forward pass, and getting:
    1. The network output.
    2. Activations from intermeddiate targetted layers.
    3. Gradients from intermeddiate targetted layers. """

    def __init__(self, model, feature_module):
        self.model = model
        self.feature_module = feature_module
        self.feature_extractor = FeatureExtractor(self.feature_module)

    def get_gradients(self):
        return self.feature_extractor.gradients

    def __call__(self, x):
        target_activations = []
        for name, module in self.model._modules.items():
            if module == self.feature_module:
                target_activations, x = self.feature_extractor(x)
            elif "globalavrpool1" in name.lower():
                x = module(x)
                x = x.view(-1, 192)
            elif name == 'fc3':
                x = F.softmax(module(x))
            elif name == 'fc1' or name == 'fc2':
                x = F.relu(module(x))
            else:
                x = module(x)
        
        return target_activations, x



def interpolation(mask,shape):
    x = np.linspace(0,10,shape)
    y = mask
    fq = interp1d(x,y,kind = 'quadratic')

    xint = np.linspace(x.min(), x.max(), 5000)
    yintq = fq(xint)
    yintq = yintq - np.min(yintq)
    yintq = yintq / (np.max(yintq) +0.00001)
    return yintq

class GradCam:
    def __init__(self, model, feature_module, use_cuda):
        self.model = model
        self.feature_module = feature_module
        self.model.eval()
        self.cuda = use_cuda
        if self.cuda:
            self.model = model.cuda()

        self.extractor = ModelOutputs(self.model, self.feature_module)

    def forward(self, input):
        return self.model(input)

    def __call__(self, input, index=None):
        if self.cuda:
            features, output = self.extractor(input.cuda())
            
        else:
            features, output = self.extractor(input)

        if index == None:
            index = np.argmax(output.cpu().data.numpy())
        
        one_hot = np.zeros((1, output.size()[-1]), dtype=np.float32)
        one_hot[0][index] = 1
        one_hot = torch.from_numpy(one_hot).requires_grad_(True)
        

        if self.cuda:
            one_hot = torch.sum(one_hot.cuda() * output)
        else:
            one_hot = torch.sum(one_hot * output)

        self.feature_module.zero_grad()
        self.model.zero_grad()
        
        one_hot.backward(retain_graph=True)
        grads_val = self.extractor.get_gradients()[-1].cpu().data.numpy()
        target = features[-1]
        target = target.cpu().data.numpy()[0, :]


        weights = np.mean(grads_val, axis=(2))[0, :]
        cam = np.zeros(target.shape[1:], dtype=np.float32)

        for i, w in enumerate(weights):

            cam += w * target[i, :]
            
        cam  = torch.from_numpy(cam)
        cam = F.relu(cam)
        cam =  cam.numpy()
           
        
        cam = cam - np.min(cam)
        cam = cam / (np.max(cam) +0.00001)
        return cam


# Load best model

In [None]:
best_model_save_path = 'path'
best_model_save_path = best_model_save_path + str(experiment_num)

best_model_save_path

In [None]:
del model

In [None]:
model = torch.load(best_model_save_path)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
cam_path = 'path'

In [None]:
grad_cam = GradCam(model=model, feature_module=model.conv3, \
                use_cuda=True)
model.eval()
index = 0
for dnn_inputs, dnn_target in test_loader:

        
    for i in range(len(dnn_inputs)):
        print(i)
        target_index = dnn_target[i]
        grad_cam_input = dnn_inputs[i].unsqueeze(0).cuda()
        plot_cam_input = dnn_inputs[i]
        
        output = model(grad_cam_input).cpu()
        if output[0][1] > 0.5:
            predicted_index = 1
        else:
            predicted_index = 0

        index = index + 1
        if target_index == predicted_index:
            mask = grad_cam(grad_cam_input, target_index)
            if mask.sum() == 0:
                pass
            else:

                interpolted_mask = interpolation(mask,mask.shape[0])
                a = np.append(np.expand_dims(interpolted_mask, axis=0), np.expand_dims(interpolted_mask, axis=0), axis=0)
                for j in range(50):
                    a = np.append(a,np.expand_dims(interpolted_mask, axis=0),axis = 0)
                heatmap = cv2.applyColorMap(np.uint8(a*255), cv2.COLORMAP_JET)
                heatmap = np.float32(heatmap)

                f, axes = plt.subplots(12, 1, figsize=(12, 20))
                x = np.arange(0,10,0.002)
                for j in range(12):
                    axes[j].plot(x, plot_cam_input[j],color = 'black')
                    axes[j].set_xlim([0, 10])
                    axes[j].axis('off')

                f.tight_layout()
                plt.savefig('./signal.jpg')
                img = cv2.imread('./signal.jpg')
                heatmap = cv2.resize(heatmap , dsize=(img.shape[1],img.shape[0]), interpolation=cv2.INTER_AREA)
                
                
                #overlay
                cam = cv2.addWeighted(np.float32(img),0.5,heatmap,0.75,0)
                
                # normalize
                cam2 = cv2.normalize(cam, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
                # BGR 2 RGB
                cam2 = cv2.cvtColor(cam2, cv2.COLOR_BGR2RGB)
                
                
                plt.imshow(cam2)
                plt.axis('off')
                
                if target_index == 1:
                     plt.imsave(cam_path+'/'+'1/'+str(index)+'.png', cam2, dpi=600)
                    
                else:
                     plt.imsave(cam_path+'/'+'0/'+str(index)+'.png', cam2, dpi=600)
                
                
                del f, axes
        else:
            pass
