In [1]:
import torch
import torchvision
from torchvision import datasets, models, transforms
from torchvision.io import ImageReadMode
from torchvision.io import read_image
from torchvision.datasets import ImageFolder
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import pandas as pd
import torch 
import matplotlib.pyplot as plt
from scipy.io import loadmat
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torch.nn.functional as F
import os
from torchvision import transforms as T
import math
from sklearn.metrics import *

In [2]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
device

'cuda:0'

In [3]:
class ECGimage_DB(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir 
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0] +'.png')
        image = read_image(img_path, mode=ImageReadMode.RGB)
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image.float(), label-1

In [4]:
class ECGimage_DBv(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir 
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0] +'.png')
        image = read_image(img_path, mode=ImageReadMode.RGB)
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image.float(), label-1

In [5]:
class AlexNet(nn.Module):
    def __init__(self, num_classes=9):
        super(AlexNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=0),
            nn.BatchNorm2d(96),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 3, stride = 2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 3, stride = 2))
        self.layer3 = nn.Sequential(
            nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU())
        self.layer4 = nn.Sequential(
            nn.Conv2d(384, 384, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU())
        self.layer5 = nn.Sequential(
            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 3, stride = 2))
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(9216, 4096),
            nn.ReLU())
        self.fc1 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU()
            )
        self.fc2= nn.Sequential(
            nn.Linear(4096, num_classes))
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        out = self.fc1(out)
        out = self.fc2(out)
        return out

In [6]:
model = AlexNet().to(device)
learning_rate = 1e-3
epochs = 10
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay = 0.005, momentum = 0.9)
loss_fn = nn.CrossEntropyLoss()

In [7]:
path="ECGimg_ts"
train_db=ECGimage_DB('train_image.csv',path)
valid_db= ECGimage_DBv('val_image.csv',path)

train_dl=DataLoader(train_db, batch_size=16)
valid_dl=DataLoader(valid_db, batch_size=16)
# image = read_image(os.path.join(path,'A0001.png'),mode=ImageReadMode.RGB)

# print(image.size())
# flatten = nn.Flatten()
# flat_image = flatten(image).float()
# print(flat_image.size())

In [8]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader,0):
        X = X.to(device)
        y = y.to(device)
        # Backpropagation
        pred = model(X)
        optimizer.zero_grad()
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [9]:
def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0
    running_y, running_pred = [], []

    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)
            pred = model(X)
            pred1 = torch.argmax(pred,1)
            labels = list(y.cpu().numpy())
            
            preds = list(pred1.cpu().numpy())
            
            running_y.extend(labels)
            running_pred.extend(preds)
            test_loss += loss_fn(pred, y).item()
            # correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    f1 = f1_score(running_y,running_pred, average = 'macro')

    test_loss /= num_batches
    correct /= size
    # print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    print(f"Test Error: \n F1: {(f1):>0.1f}, Avg loss: {test_loss:>8f} \n")

In [10]:
epochs = 50
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dl, model, loss_fn, optimizer)
    test_loop(valid_dl, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 2.231246  [    0/ 6189]
loss: 2.081947  [ 1600/ 6189]
loss: 2.161009  [ 3200/ 6189]
loss: 2.060879  [ 4800/ 6189]
Test Error: 
 F1: 0.2, Avg loss: 1.850080 

Epoch 2
-------------------------------
loss: 2.134130  [    0/ 6189]
loss: 1.857631  [ 1600/ 6189]
loss: 1.729461  [ 3200/ 6189]
loss: 1.933644  [ 4800/ 6189]
Test Error: 
 F1: 0.3, Avg loss: 1.716331 

Epoch 3
-------------------------------
loss: 2.002716  [    0/ 6189]
loss: 1.736678  [ 1600/ 6189]
loss: 1.546056  [ 3200/ 6189]
loss: 1.864898  [ 4800/ 6189]
Test Error: 
 F1: 0.3, Avg loss: 1.678661 

Epoch 4
-------------------------------
loss: 1.981087  [    0/ 6189]
loss: 1.658165  [ 1600/ 6189]
loss: 1.524407  [ 3200/ 6189]
loss: 1.726446  [ 4800/ 6189]
Test Error: 
 F1: 0.3, Avg loss: 1.621677 

Epoch 5
-------------------------------
loss: 2.003408  [    0/ 6189]
loss: 1.487002  [ 1600/ 6189]
loss: 1.497447  [ 3200/ 6189]
loss: 1.629867  [ 4800/ 6189]
Test Error: 
 F1: 0.3, A