In [None]:
!curl -o test_set.zip https://zenodo.org/records/1327317/files/test_set.zip?download=1 -o test_set_pixel_size.csv https://zenodo.org/records/1327317/files/test_set_pixel_size.csv?download=1 -o training_set.zip https://zenodo.org/records/1327317/files/training_set.zip?download=1 -o training_set_pixel_size_and_HC.csv https://zenodo.org/records/1327317/files/training_set_pixel_size_and_HC.csv?download=1

In [11]:
from ultralytics import YOLO
import os
from torch.utils.data import TensorDataset, DataLoader, Dataset
import tqdm
import cv2
import re
import torch
from torch import nn
import torchvision
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor, Lambda, Compose
import zipfile
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random
from timeit import default_timer as timer
from pathlib import Path
from typing import Tuple, Dict, List
import torch.nn.functional as f
from timeit import default_timer as timer

In [7]:
# Directory to extract the contents to
# NOTE(review): machine-specific absolute path; adjust when running on another machine.
extract_to_dir = r'C:/Users\Asus/.vscode/VSC/ML in Med/'
# Path to the test-set zip file
zip_file_path = r'C:/Users/Asus/.vscode/VSC/ML in Med/test_set.zip'
# The training cell further down reads images from `training_set/`, so the
# training archive has to be unpacked as well.
training_zip_file_path = r'C:/Users/Asus/.vscode/VSC/ML in Med/training_set.zip'

# Create the directory if it doesn't exist (no-op when it already does)
os.makedirs(extract_to_dir, exist_ok=True)

for archive_path in (zip_file_path, training_zip_file_path):
    # The context manager closes the archive even if extraction fails
    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
        # Extract all contents into the specified directory
        zip_ref.extractall(extract_to_dir)

print("Extraction completed.")

Extraction completed.


In [16]:
class CustomDataset(Dataset):
    """Grayscale images paired with filled-ellipse masks, read from one directory.

    Sample ``index`` is loaded from ``{index:03d}_HC.png`` (image) and
    ``{index:03d}_HC_Annotation.png`` (annotation) inside ``link``.

    Args:
        link: Directory that contains the PNG files.
        num_samples: Value reported by ``len()``; valid indices are
            ``0 .. num_samples - 1``.
        target_size: ``(height, width)`` the image and annotation are cropped
            and/or zero-padded to.
        mask_size: ``(height, width)`` of the returned mask canvas.
            NOTE(review): the default is larger than ``target_size`` and the
            ellipse is drawn in annotation coordinates anchored at the
            top-left corner, so the mask is not centred on the image —
            confirm this matches how the network output is aligned.
    """

    def __init__(self, link, num_samples=806, target_size=(540, 800), mask_size=(548, 804)):
        self.link = link
        self.num_samples = num_samples
        self.target_size = target_size
        self.mask_size = mask_size

    @staticmethod
    def _fit_to_size(array, target_height, target_width):
        """Crop ``array`` to at most the target size, then zero-pad bottom/right up to it."""
        cropped = array[:target_height, :target_width]
        if cropped.shape[0] == target_height and cropped.shape[1] == target_width:
            return cropped
        canvas = np.zeros((target_height, target_width), dtype=array.dtype)
        canvas[:cropped.shape[0], :cropped.shape[1]] = cropped
        return canvas

    def __getitem__(self, index):
        """Return ``(img, mask)`` as float tensors of shape ``(1, H, W)``.

        Raises:
            IndexError: if ``index`` is outside ``0 .. len(self) - 1``.
            FileNotFoundError: if the image or annotation cannot be read.
            ValueError: if no usable contour is found in the annotation.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"index {index} is out of range for {len(self)} samples")

        stem = f"{index:03d}"
        img_path = os.path.join(self.link, stem + "_HC.png")
        anno_path = os.path.join(self.link, stem + "_HC_Annotation.png")
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        anno = cv2.imread(anno_path, cv2.IMREAD_GRAYSCALE)
        # cv2.imread reports an unreadable file by returning None instead of raising
        if img is None:
            raise FileNotFoundError(f"could not read image: {img_path}")
        if anno is None:
            raise FileNotFoundError(f"could not read annotation: {anno_path}")

        # Image and annotation get the same crop/pad so they stay aligned
        target_height, target_width = self.target_size
        img = self._fit_to_size(img, target_height, target_width)
        anno = self._fit_to_size(anno, target_height, target_width)

        img = torch.from_numpy(np.ascontiguousarray(img, dtype=np.float32)).unsqueeze(0)

        anno = np.ascontiguousarray(anno, dtype=np.uint8)
        contours, _ = cv2.findContours(anno, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        # fitEllipse needs at least five points on the contour
        if len(contours) == 0 or len(contours[0]) < 5:
            raise ValueError(f"no usable contour found in annotation: {anno_path}")
        ellipse = cv2.fitEllipse(contours[0])

        # Draw the fitted ellipse filled (thickness -1) and convert 0/255 to 0/1
        mask = np.zeros(self.mask_size)
        mask = cv2.ellipse(mask, ellipse, (255, 255, 255), -1)
        mask = (mask / 255).astype(int).astype(np.float32)
        mask = torch.from_numpy(mask).unsqueeze(0)

        return img, mask

    def __len__(self):
        """Number of samples, as configured at construction time."""
        return self.num_samples

In [14]:
class Unet(nn.Module):
    """U-Net style encoder/decoder with unpadded 3x3 convolutions.

    Takes a single-channel image batch ``(N, 1, H, W)`` and returns a
    single-channel map ``(N, 1, H', W')`` passed through a sigmoid. Only the
    first convolution pads its input (96 rows / 94 columns per side); every
    other 3x3 convolution shrinks the feature map by 2 per dimension, so the
    output size differs from the input size. With these layers a 540x800
    input yields a 548x804 output.
    """

    def __init__(self):
        super().__init__()

        # Encoder: each block is (optional 2x2 max-pool) + two 3x3 conv/ReLU pairs
        self.in_block_1 = nn.Sequential(
            nn.Conv2d(1, 64, 3, padding=(96, 94)),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3),
            nn.ReLU()
        )

        self.in_block_2 = nn.Sequential(
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, 3),
            nn.ReLU(),
            nn.Conv2d(128, 128, 3),
            nn.ReLU()
        )

        self.in_block_3 = nn.Sequential(
            nn.MaxPool2d(2, 2),
            nn.Conv2d(128, 256, 3),
            nn.ReLU(),
            nn.Conv2d(256, 256, 3),
            nn.ReLU()
        )

        self.in_block_4 = nn.Sequential(
            nn.MaxPool2d(2, 2),
            nn.Conv2d(256, 512, 3),
            nn.ReLU(),
            nn.Conv2d(512, 512, 3),
            nn.ReLU()
        )

        # Bottleneck ends with the first 2x up-convolution
        self.in_bottle_neck = nn.Sequential(
            nn.MaxPool2d(2, 2),
            nn.Conv2d(512, 1024, 3),
            nn.ReLU(),
            nn.Conv2d(1024, 1024, 3),
            nn.ReLU(),
            nn.ConvTranspose2d(1024, 512, 2, 2)
        )

        # Decoder: input channels are doubled by the skip concatenation
        self.out_block_4 = nn.Sequential(
            nn.Conv2d(1024, 512, 3),
            nn.ReLU(),
            nn.Conv2d(512, 512, 3),
            nn.ReLU(),
            nn.ConvTranspose2d(512, 256, 2, 2)
        )

        self.out_block_3 = nn.Sequential(
            nn.Conv2d(512, 256, 3),
            nn.ReLU(),
            nn.Conv2d(256, 256, 3),
            nn.ReLU(),
            nn.ConvTranspose2d(256, 128, 2, 2)
        )

        self.out_block_2 = nn.Sequential(
            nn.Conv2d(256, 128, 3),
            nn.ReLU(),
            nn.Conv2d(128, 128, 3),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 2, 2)
        )

        # A single 1x1 convolution maps the 64 features to one output channel.
        # (A second Conv2d(2, 1, 1) used to follow it; its 2 expected input
        # channels never matched the 1 channel produced here.)
        self.out_block_1 = nn.Sequential(
            nn.Conv2d(128, 64, 3),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3),
            nn.ReLU(),
            nn.Conv2d(64, 1, 1),
            nn.Sigmoid()
        )

    @staticmethod
    def _center_crop(skip, height, width):
        """Cut a ``height`` x ``width`` window around the centre of ``skip``'s last two dims."""
        top = skip.shape[-2] // 2 - height // 2
        left = skip.shape[-1] // 2 - width // 2
        return skip[:, :, top:top + height, left:left + width]

    def _merge_skip(self, skip, upsampled):
        """Concatenate the centre-cropped encoder features with the decoder features on the channel axis."""
        cropped = self._center_crop(skip, upsampled.shape[-2], upsampled.shape[-1])
        return torch.cat([cropped, upsampled], dim=1)

    def forward(self, data):
        """Run the encoder, bottleneck and decoder; returns sigmoid activations."""
        in_1 = self.in_block_1(data)
        in_2 = self.in_block_2(in_1)
        in_3 = self.in_block_3(in_2)
        in_4 = self.in_block_4(in_3)

        # Encoder maps are larger than the upsampled decoder maps (unpadded
        # convolutions), hence the centre crop before every concatenation.
        out_4 = self._merge_skip(in_4, self.in_bottle_neck(in_4))
        out_3 = self._merge_skip(in_3, self.out_block_4(out_4))
        out_2 = self._merge_skip(in_2, self.out_block_3(out_3))
        out_1 = self._merge_skip(in_1, self.out_block_2(out_2))

        output = self.out_block_1(out_1)

        return output

In [6]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [17]:
# Dataset rooted at the extracted training images.
training_set = CustomDataset(link=r'C:/Users/Asus/.vscode/VSC/ML in Med/training_set/')
# Shuffled mini-batches of 32 samples for the training loop.
training_loader = DataLoader(dataset=training_set, shuffle=True, batch_size=32)

In [18]:
# Instantiate the network, then move its parameters to the selected device.
model = Unet()
model = model.to(device)
# Display the device of the first parameter as a sanity check of the placement.
next(iter(model.parameters())).device

device(type='cpu')

In [9]:
# Binary cross-entropy loss and an Adam optimizer over all model parameters.
loss_fn = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

In [10]:
def accuracy_fn(y_pred, y_true):
    """Return the percentage (0-100) of elements where ``y_pred`` equals ``y_true``.

    The count of matching elements is divided by the total number of compared
    elements, so the result stays within 0-100 for tensors of any rank
    (for 1-D inputs this equals dividing by ``len(y_pred)``).

    Args:
        y_pred: Tensor of predicted labels.
        y_true: Tensor of target labels, comparable element-wise with ``y_pred``.

    Returns:
        Accuracy as a float percentage; ``0.0`` when there is nothing to compare.
    """
    matches = torch.eq(y_true, y_pred)
    total = matches.numel()
    if total == 0:
        return 0.0
    correct = matches.sum().item()
    acc = (correct / total) * 100
    return acc

In [12]:
def train_step(data_loader: torch.utils.data.DataLoader, model: torch.nn.Module,
               loss_fn: torch.nn.Module, optimizer: torch.optim.Optimizer,
               accuracy_fn, device: torch.device = device):
    """Train ``model`` for one pass over ``data_loader``.

    Two output layouts are handled:

    * single-channel output (``y_pred.shape[1] == 1``): the target is reshaped
      to the prediction's shape and predicted labels come from thresholding
      at 0.5 (assumes the model emits probabilities, e.g. after a sigmoid);
    * multi-channel output: the target's channel axis is squeezed away and
      predicted labels come from ``argmax`` over dim 1.

    Args:
        data_loader: Yields ``(X, y)`` batches.
        model: Network to optimise; it is moved to ``device``.
        loss_fn: Called as ``loss_fn(y_pred, y)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        accuracy_fn: Called as ``accuracy_fn(y_true=..., y_pred=...)``.
        device: Device the model and batches are moved to.

    Returns:
        ``(train_loss, train_acc)``: mean loss and mean accuracy over the
        batches, both as plain Python floats.

    Raises:
        ValueError: if ``data_loader`` yields no batches.
    """
    num_batches = len(data_loader)
    if num_batches == 0:
        raise ValueError("data_loader yields no batches; nothing to train on")

    model.to(device)
    model.train()
    train_loss, train_acc = 0.0, 0.0
    for X, y in data_loader:
        # Send data to GPU
        X, y = X.to(device), y.to(device)

        y_pred = model(X)

        if y_pred.dim() >= 2 and y_pred.shape[1] == 1:
            # Binary case: the loss needs prediction and target in the same shape
            y = y.reshape(y_pred.shape)
            y_pred_labels = (y_pred >= 0.5).to(y.dtype)
        else:
            y = y.squeeze(1)
            y_pred_labels = y_pred.argmax(dim=1)

        loss = loss_fn(y_pred, y)
        # .item() keeps the running total free of the autograd graph
        train_loss += loss.item()
        train_acc += accuracy_fn(y_true=y, y_pred=y_pred_labels)

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()

    train_loss = train_loss / num_batches
    train_acc = train_acc / num_batches
    print(f"Train loss: {train_loss:.5f} | Train accuracy: {train_acc:.2f}%")
    return train_loss, train_acc

In [None]:
def train(model: torch.nn.Module, train_loader: torch.utils.data.DataLoader, test_loader: torch.utils.data.DataLoader, optimizer: torch.optim.Optimizer,loss_fn: torch.nn.Module = nn.CrossEntropyLoss(), epochs: int = 6):
    """Train ``model`` for ``epochs`` epochs and collect per-epoch metrics.

    Uses the notebook-level ``train_step``, ``eval_model``, ``accuracy_fn``
    and ``device``.

    Args:
        model: Network to train.
        train_loader: Batches used for optimisation in every epoch.
        test_loader: Batches evaluated after every epoch; pass ``None`` to
            skip evaluation (the test lists then stay empty).
        optimizer: Optimizer stepping ``model``'s parameters.
        loss_fn: Loss passed to ``train_step`` / ``eval_model``.
        epochs: Number of passes over ``train_loader``.

    Returns:
        Dict with the lists ``train_loss``, ``train_acc``, ``test_loss`` and
        ``test_acc`` (one float per epoch).
    """
    results = {
        "train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
        }
    for epoch in range(epochs):

        train_loss, train_acc = train_step(data_loader = train_loader,
                                           model = model, loss_fn = loss_fn, optimizer = optimizer, accuracy_fn = accuracy_fn, device = device)
        # Store plain floats so the history never holds on to autograd graphs
        train_loss, train_acc = float(train_loss), float(train_acc)

        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
        )

        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)

        if test_loader is not None:
            test_metrics = eval_model(model = model, data_loader = test_loader, loss_fn = loss_fn,
                                      accuracy_fn = accuracy_fn, device = device)
            results["test_loss"].append(float(test_metrics["model_loss"]))
            results["test_acc"].append(float(test_metrics["model_acc"]))
    return results

In [None]:
def print_train_time(start: float, end: float, device: torch.device = None):
    """Print and return the time elapsed between two timer readings.

    Args:
        start: Timer reading taken before training.
        end: Timer reading taken after training.
        device: Label inserted into the printed message.

    Returns:
        ``end - start``.
    """
    total_time = end - start
    message = f"Train time on {device}: {total_time:.3f} seconds"
    print(message)
    return total_time

In [None]:
def eval_model(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, loss_fn: torch.nn.Module, accuracy_fn, device: torch.device = device):
    """Evaluate ``model`` on ``data_loader`` without tracking gradients.

    Single-channel outputs (``y_pred.shape[1] == 1``) are compared against a
    target reshaped to the prediction's shape, with labels obtained by
    thresholding at 0.5 (assumes the model emits probabilities). Otherwise
    the target's channel axis is squeezed and labels come from ``argmax``
    over dim 1.

    Args:
        model: Network to evaluate; it is moved to ``device``.
        data_loader: Yields ``(X, y)`` batches.
        loss_fn: Called as ``loss_fn(y_pred, y)``.
        accuracy_fn: Called as ``accuracy_fn(y_true=..., y_pred=...)``.
        device: Device the model and batches are moved to.

    Returns:
        Dict with ``model_name`` (class name), ``model_loss`` and
        ``model_acc`` (means over the batches, as floats).

    Raises:
        ValueError: if ``data_loader`` yields no batches.
    """
    num_batches = len(data_loader)
    if num_batches == 0:
        raise ValueError("data_loader yields no batches; nothing to evaluate")

    loss, acc = 0.0, 0.0
    model.to(device)
    model.eval()
    with torch.inference_mode():
        for X, y in data_loader:
            X, y = X.to(device), y.to(device)
            y_pred = model(X)

            if y_pred.dim() >= 2 and y_pred.shape[1] == 1:
                # Binary case: the loss needs prediction and target in the same shape
                y = y.reshape(y_pred.shape)
                y_pred_labels = (y_pred >= 0.5).to(y.dtype)
            else:
                y = y.squeeze(1)
                y_pred_labels = y_pred.argmax(dim = 1)

            loss += loss_fn(y_pred, y).item()
            acc += accuracy_fn(y_true = y, y_pred = y_pred_labels)

    loss /= num_batches
    acc /= num_batches

    return {"model_name": model.__class__.__name__,
            "model_loss": loss,
            "model_acc": acc}

In [None]:
def plot_loss_curves(results: Dict[str, List[float]]):
    """Plot loss and accuracy curves side by side.

    Args:
        results: Dict with the lists ``train_loss`` and ``train_acc`` and,
            optionally, ``test_loss`` and ``test_acc``. Entries may be floats
            or zero-dimensional tensors; they are converted with ``float()``.
            Test curves are drawn only when their list is non-empty.
    """
    # float() accepts plain numbers as well as 0-dim tensors
    loss = [float(value) for value in results['train_loss']]
    accuracy = [float(value) for value in results['train_acc']]
    test_loss = [float(value) for value in results.get('test_loss', [])]
    test_accuracy = [float(value) for value in results.get('test_acc', [])]

    plt.figure(figsize=(15, 7))

    # Plot loss
    plt.subplot(1, 2, 1)
    plt.plot(range(len(loss)), loss, label='train_loss')
    if test_loss:
        # Each series uses its own x-range so a length mismatch cannot raise
        plt.plot(range(len(test_loss)), test_loss, label='test_loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    # Plot accuracy
    plt.subplot(1, 2, 2)
    plt.plot(range(len(accuracy)), accuracy, label='train_accuracy')
    if test_accuracy:
        plt.plot(range(len(test_accuracy)), test_accuracy, label='test_accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epochs')
    plt.legend()

In [19]:
train_time_start_model = timer()

# Per-epoch history under the keys plot_loss_curves reads; the name matches
# the one the plotting cell at the end of the notebook passes in. The test
# lists stay empty because no evaluation runs inside this loop.
model_1_results = {"train_loss": [], "train_acc": [], "test_loss": [], "test_acc": []}

for epoch in range(10):
    epoch_loss, epoch_acc = train_step(data_loader=training_loader, model=model, loss_fn=loss_fn, optimizer=optimizer, accuracy_fn=accuracy_fn, device=device)
    # float() detaches the values from any autograd graph before storing them
    model_1_results["train_loss"].append(float(epoch_loss))
    model_1_results["train_acc"].append(float(epoch_acc))

train_time_end_model = timer()
total_train_time_model = print_train_time(start=train_time_start_model, end=train_time_end_model, device=device)

In [None]:
# Evaluate the network trained above. `model_1` and `mitbih_test_dataloader` are
# not created anywhere in this notebook, so the objects it does define are used.
# NOTE(review): this scores on the training data — swap in a held-out, annotated loader when one exists.
eval_model(model = model, data_loader = training_loader, loss_fn = loss_fn, accuracy_fn = accuracy_fn, device = device)

In [None]:
# Draw the recorded curves. `model_1_results` must be a dict holding the lists
# 'train_loss', 'train_acc', 'test_loss' and 'test_acc' that plot_loss_curves reads.
plot_loss_curves(results=model_1_results)