# Diagnosis based on ECG data

- - - 

* Diagnosis using collected ECG data
* Currently total 42 subjects
* 3 classes (DEP, SUI, NOR)

- - -

In [None]:
# importing required components 
import os
import time
import json
import urllib
import random
import torch
import torchvision
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
from PIL import Image
from pdf2image import convert_from_path

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

## Basic data check

In [None]:
ecg_df = pd.read_csv('E:/RESEARCH/Datasets/wearable/AI_coded_1.csv', sep=',')

* 1: depression, 2: suicidality, 3: normal

In [None]:
ecg_df['class'] = ecg_df['class'].astype("category")
ecg_df['sub'] = ecg_df['sub'].astype("category")

In [None]:
depression = ecg_df[ecg_df['class']==1]
print((depression.index)+1)

In [None]:
suicidal = ecg_df[ecg_df['class']==2]
print((suicidal.index)+1)

In [None]:
normal = ecg_df[ecg_df['class']==3]
print((normal.index)+1)

- - -

* Data handling

In [None]:
## Checking file path and names
file_path = "E:/RESEARCH/Datasets/wearable/ECG/test_0420/dep/"
file_names = os.listdir(file_path)

In [None]:
## Changing file names to 1, 2, ...
i = 1
for name in file_names:
    src = os.path.join(file_path, name)
    dst = str(i) + '.png'
    dst = os.path.join(file_path, dst)
    os.rename(src, dst)
    i += 1

In [None]:
## Converting pdf file into png file
for name in file_names:
    pages = convert_from_path(file_path + name, poppler_path="E:/RESEARCH/Datasets/wearable/ECG/poppler/Library/bin")
    
    for page in pages:
        page.save(file_path + name + '.png', "PNG")

In [None]:
## Data Crop (deleting patients' information)
# original png size = 2200 x 1700
# leaving the important part only (only the ecg data part)
left = 100
top = 450
right = 2100
bottom = 1300

for name in file_names:
    im = Image.open(file_path + name)
    imc = im.crop((left, top, right, bottom))
    imc.save(file_path+name+'.png')

- - -

## With simple image classification

* Our dataset numbers (dep: 244, nor: 402, sui: 105)

In [None]:
class Args:
    # arugments
    epochs=30
    bs=8
    lr=0.001
    momentum=0.9
    num_channels=3 
    num_classes=3
    verbose='store_true'
    seed=710674

args = Args()    

np.random.seed(args.seed)
random.seed(args.seed)
torch.manual_seed(args.seed)

In [None]:
#Setting torch environment

if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
    
print('Using PyTorch version:', torch.__version__, ' Device: ', DEVICE)

In [None]:
# model_res = models.resnet18(num_classes=2, pretrained=True)
# model_eff3 = EfficientNet.from_pretrained('efficientnet-b3', num_classes=args.num_classes)
model_resnet18 = models.resnet18(pretrained=True)
# model_mobnetv2 = models.mobilenet_v2(pretrained=True)

In [None]:
## resnet 구조는 마지막 fc layer의 out_features 를 바꿔주면 되고.
# model_resnet18.fc = nn.Linear(in_features = 512, out_features = args.num_classes)

In [None]:
# model = model_res.to(DEVICE)
# model = model_eff3.to(DEVICE)
# model = model_mobnetv2.to(DEVICE)
model = model_resnet18.to(DEVICE)

In [None]:
# Data Transformation
data_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(256),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(contrast=(0.3, 1), saturation=(0.3, 1)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456,0.406], [0.229, 0.224, 0.225])
])

In [None]:
# Uploading image data
ecg_data = datasets.ImageFolder(root = 'E:/RESEARCH/Datasets/wearable/ECG/test_0420/train',
                                transform = data_transforms)

In [None]:
train_size = int(0.8 * len(ecg_Data))
test_size = len(ecg_Data)-train_size
print(train_size)
print(test_size)

In [None]:
train_dataset, test_dataset = torch.utils.data.random_split(ecg_data, [train_size, test_size])

In [None]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.bs, shuffle=True, num_workers=4)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.bs, shuffle=False, num_workers=4)

In [None]:
dataiter = iter(train_loader)
images, labels = dataiter.next()
print(labels)

In [None]:
# Setting Optimizer and Objective Function

optimizer = torch.optim.Adam(model.parameters(), lr = args.lr)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01, total_steps=30, anneal_strategy='cos')
criterion = nn.CrossEntropyLoss() ## setup the loss function

# print(model)

In [None]:
# Function for checking model performance during CNN model

def train(model, train_loader, optimizer, log_interval):
    model.train()
    print(optimizer.param_groups[0]['lr'])
    
    for batch_idx, (image, label) in enumerate(train_loader):
        image = image.to(DEVICE)
        label = label.to(DEVICE)
        optimizer.zero_grad()
        output = model(image)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        if batch_idx % log_interval == 0:
            print("Train Epoch: {} [{}/{} ({:.0f}%)]\tTrain Loss: {:.6f}".format(
                epoch, batch_idx * len(image), 
                len(train_loader.dataset), 100. * batch_idx / len(train_loader), 
                loss.item()))

    scheduler.step() #for learning rate scheduler

In [None]:
# Function for checking model performance during the learning process

def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    validation =[]

    with torch.no_grad():
        for image, label in test_loader:
            image = image.to(DEVICE)
            label = label.to(DEVICE)
            output = model(image)
            test_loss += criterion(output, label).item()
            prediction = output.max(1, keepdim = True)[1]
            correct += prediction.eq(label.view_as(prediction)).sum().item()
    
    test_loss /= (len(test_loader)) 
    validation_accuracy = 100. * correct / len(test_loader.dataset)
    validation.append(validation_accuracy)
    
    return test_loss, validation_accuracy

In [None]:
# Checking train, val loss and accuracy

total = []

for epoch in range(1, args.epochs):
    train(model, train_loader, optimizer, log_interval = 200)
    test_loss, validation_accuracy = evaluate(model, test_loader)
    print("\n[EPOCH: {}], \tTest Loss: {:.4f}, \tValidation Accuracy: {:.2f} % \n".format(
        epoch, test_loss, validation_accuracy))
    
    total.append((test_loss, validation_accuracy))