# 1. Build your own convolutional neural network using pytorch

Define the `NeuralNet` model, which is a customizable convolutional neural network designed for image classification tasks. 

It has a sequence of convolutional layers with optional pooling layers for feature extraction, followed by a series of fully connected layers for classification. We can adjust the number and configuration of hidden layers and pooling operations.

In [1]:
from typing import List

import torch
import torch.nn as nn

In [2]:
class NeuralNet(nn.Module):

    def __init__(
        self, 
        n_hiddens: List[int], 
        poolings: List[bool], 
        n_classes: int
    ):
        assert len(n_hiddens) == len(poolings)

        super().__init__()
        self.n_hiddens: List[int] = n_hiddens
        self.poolings: List[bool] = poolings
        self.n_classes: int = n_classes

        feature_extractor_modules: List[nn.Module] = []
        for n_hidden, pooling in zip(n_hiddens, poolings):
            feature_extractor_modules.extend([
                nn.LazyConv2d(out_channels=n_hidden, kernel_size=3, padding=1),
                nn.BatchNorm2d(num_features=n_hidden),
                nn.ReLU(),
            ])
            if pooling:
                feature_extractor_modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.feature_extractor = nn.Sequential(*feature_extractor_modules)

        # Classifier
        self.classifier = nn.Sequential(
            nn.Flatten(start_dim=1, end_dim=-1),
            nn.LazyLinear(out_features=1024),
            nn.ReLU(),
            nn.LazyLinear(out_features=512),
            nn.ReLU(),
            nn.LazyLinear(out_features=n_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        y = self.feature_extractor(x)
        y = self.classifier(y)
        return y

# 2. Train your model using dog heart dataset (you may need to use  Google Colab (or Kaggle) with GPU to train your code) 

### (1) use torchvision.datasets.ImageFolder for the training dataset
### (2) use custom dataloader for test dataset (return image tensor and file name)

### Utility classes:
Define `Accumulator` class to track performance metrics:

In [3]:
import os
import pathlib
import time
from typing import Optional, Dict, TextIO, Any
from collections import defaultdict
import datetime as dt

import torch
import torch.nn as nn

In [4]:
class Accumulator:
    """
    A utility class for accumulating values for multiple metrics.
    """

    def __init__(self) -> None:
        self.__records: defaultdict[str, float] = defaultdict(float)

    def add(self, **kwargs: Any) -> None:
        """
        Add values to the accumulator.

        Parameters:
            - **kwargs: named metric and the value is the amount to add.
        """
        metric: str
        value: float
        for metric, value in kwargs.items():
            # Each keyword argument represents a metric name and its value to be added
            self.__records[metric] += value
    
    def reset(self) -> None:
        """
        Reset the accumulator by clearing all recorded metrics.
        """
        self.__records.clear()

    def __getitem__(self, key: str) -> float:
        """
        Retrieve a record by key.

        Parameters:
            - key (str): The record key name.

        Returns:
            - float: The record value.
        """
        return self.__records[key]


Define `EarlyStopping` to early stop the training process given on some validation metrics:

In [5]:
class EarlyStopping:
    """
    A simple early stopping utility to terminate training when a monitored metric stops improving.

    Attributes:
        - patience (int): The number of epochs with no improvement after which training will be stopped.
        - tolerance (float): The minimum change in the monitored metric to qualify as an improvement,
        - considering the direction of the metric being monitored.
        - bestscore (float): The best score seen so far.
    """
    
    def __init__(self, patience: int, tolerance: float = 0.) -> None:
        """
        Initializes the EarlyStopping instance.
        
        Parameters:
            - patience (int): Number of epochs with no improvement after which training will be stopped.
            - tolerance (float): The minimum change in the monitored metric to qualify as an improvement. 
            Defaults to 0.
        """
        self.patience: int = patience
        self.tolerance: float = tolerance
        self.bestscore: float = float('inf')
        self.__counter: int = 0

    def __call__(self, value: float) -> None:
        """
        Update the state of the early stopping mechanism based on the new metric value.

        Parameters:
            - value (float): The latest value of the monitored metric.
        """
        # Improvement or within tolerance, reset counter
        if value <= self.bestscore + self.tolerance:
            self.bestscore: float = value
            self.__counter: int = 0

        # No improvement, increment counter
        else:
            self.__counter += 1

    def __bool__(self) -> bool:
        """
        Determine if the training process should be stopped early.

        Returns:
            - bool: True if training should be stopped (patience exceeded), otherwise False.
        """
        return self.__counter >= self.patience

Define `Logger` class to log the training process to file and console:

In [6]:
class Logger:

    """
    A class used to log the training process.

    This class provides methods to log messages to a file and the console. 
    """
    def __init__(
        self, 
        logfile: str = f".log/{dt.datetime.now().strftime('%Y%m%d%H%M%S')}"
    ) -> None:
    
        """
        Initialize the logger.

        Parameters:
            - logfile (str, optional): The path to the logfile. 
            Defaults to a file in the .log directory with the current timestamp.
        """
        self.logfile: pathlib.Path = pathlib.Path(logfile)
        os.makedirs(name=self.logfile.parent, exist_ok=True)
        self._file: TextIO = open(self.logfile, mode='w')

    def log(
        self, 
        epoch: int, 
        n_epochs: int, 
        batch: Optional[int] = None, 
        n_batches: Optional[int] = None, 
        took: Optional[float] = None, 
        **kwargs: Any,
    ) -> None:
        """
        Log a message to console and a log file

        Parameters:
            - epoch (int): The current epoch.
            - n_epochs (int): The total number of epochs.
            - batch (int, optional): The current batch. Defaults to None.
            - n_batches (int, optional): The total number of batches. Defaults to None.
            - took (float, optional): The time it took to process the batch or epoch. Defaults to None.
            - **kwargs: Additional metrics to log.
        """
        suffix: str = ', '.join([f'{metric}: {value:.3e}' for metric, value in kwargs.items()])
        prefix: str = f'Epoch {epoch}/{n_epochs} | '
        if batch is not None:
            prefix += f'Batch {batch}/{n_batches} | '
        if took is not None:
            prefix += f'Took {took:.2f}s | '
        logstring: str = prefix + suffix
        print(logstring)
        self._file.write(logstring + '\n')

    def __del__(self) -> None:
        """
        Close the logfile at garbage collected.
        """
        self._file.close()

Define `CheckPointSaver` to save model's checkpoints during training:

In [7]:
class CheckPointSaver:
    """
    A class used to save PyTorch model checkpoints.

    Attributes:
        - dirpath (pathlib.Path): The directory where the checkpoints are saved.
    """

    def __init__(self, dirpath: str) -> None:
        """
        Initialize the CheckPointSaver.

        Parameters:
            - dirpath (os.PathLike): The directory where the checkpoints are saved.
        """
        self.dirpath: pathlib.Path = pathlib.Path(dirpath)
        os.makedirs(name=self.dirpath, exist_ok=True)

    def save(self, model: nn.Module, filename: str) -> None:
        """
        Save checkpoint to a .pt file.

        Parameters:
            - model (nn.Module): The PyTorch model to save.
            - filename (str): the checkpoint file name
        """
        torch.save(obj=model, f=os.path.join(self.dirpath, filename))

### `Dataset` classes:

In [8]:
import os
from typing import List, Tuple

from PIL import Image

import torch
import torch.utils
import torchvision
from torch.utils.data import Dataset
from torchvision.datasets import ImageFolder

Define `DogHeartLabeledDataset` class for labeled dataset (training and validation), this class extends the `ImageFolder` from `torchvision`:

In [9]:
class DogHeartLabeledDataset(ImageFolder):

    #extend
    def __init__(self, data_root: str) -> None:
        self.transformation = torchvision.transforms.Compose([
            torchvision.transforms.Grayscale(),
            torchvision.transforms.Resize((128, 128)),
            torchvision.transforms.ToTensor(),
        ])
        super().__init__(root=data_root, transform=self.transformation)
        self.data_root: str = data_root

        self.filepaths: List[str] = [path for path, _ in self.samples]
        self.filenames: List[str] = [path.split('/')[-1] for path in self.filepaths]
        self.labels: List[int] = [label for _, label in self.samples]

    #extend
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int, str]:
        tensor: torch.Tensor; label: int
        tensor, label = super().__getitem__(idx)
        filename: str = self.filenames[idx]
        return tensor, label, filename

Define `DogHeartUnlabeledDataset` class for unlabeled dataset (testing):

In [10]:
class DogHearUnlabeledDataset(Dataset):

    def __init__(self, data_root: str) -> None:
        self.data_root: str = data_root
        self.transformation = torchvision.transforms.Compose([
            torchvision.transforms.Grayscale(),
            torchvision.transforms.Resize((128, 128)),
            torchvision.transforms.ToTensor(),
        ])
        self.filenames: List[str] = os.listdir(self.data_root)
    
    def __len__(self) -> int:
        return len(self.filenames)
    
    def __getitem__(self, idx) -> Tuple[torch.Tensor, str]:
        filename: str = self.filenames[idx]
        image: Image = Image.open(os.path.join(self.data_root, filename))
        tensor: torch.Tensor = self.transformation(image)
        return tensor, filename

Create dataloaders from labeled datasets:

In [11]:
train_dataset = DogHeartLabeledDataset(data_root='Dog_heart/Train')
valid_dataset = DogHeartLabeledDataset(data_root='Dog_heart/Valid')

Define the loss function:

In [12]:
from typing import List, Tuple, Optional

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Optimizer, Adam
from torch.utils.data import DataLoader

In [13]:
def loss_function(
    scores: torch.Tensor,
    gt_labels: torch.Tensor,
):
    return F.cross_entropy(input=scores, target=gt_labels, reduction='mean')

Specify the computing device:

In [14]:
device = torch.device('cuda')

Define the `evaluation` function to report the accuracy and data loss over a batched dataset (dataloader):

In [15]:
def evaluate(model: nn.Module, dataloader: DataLoader) -> Tuple[float, float]:
    model.eval()
    metrics = Accumulator()

    # Loop through each batch
    for batch, (batch_images, gt_labels, filenames) in enumerate(dataloader, start=1):
        batch_images = batch_images.to(device)
        gt_labels = gt_labels.to(device)
        scores: torch.Tensor = model(batch_images)
        pred_labels = scores.max(dim=1).indices
        n_corrects = (pred_labels == gt_labels).sum().item()
        n_predictions = pred_labels.numel()
        loss = loss_function(scores, gt_labels).mean()

        # Accumulate the metrics
        metrics.add(n_corrects=n_corrects, n_predictions=n_predictions, loss=loss.item())

    # Compute the aggregate metrics
    accuracy: float = metrics['n_corrects'] / metrics['n_predictions']
    loss: float = metrics['loss'] / batch
    return accuracy, loss

Define the `train` function that implements the training loop:

In [16]:
def train(
    model: nn.Module,
    train_dataloader: DataLoader,
    val_dataloader: DataLoader,
    optimizer: Optimizer,
    n_epochs: int,
    patience: int,
    tolerance: float,
    checkpoint_dir: Optional[str] = None,
) -> nn.Module:

    model.train()
    train_metrics = Accumulator()
    early_stopping = EarlyStopping(patience, tolerance)
    logger = Logger()
    checkpoint_saver = CheckPointSaver(dirpath=checkpoint_dir)

    # loop through each epoch
    for epoch in range(1, n_epochs + 1):
        # Loop through each batch
        for batch, (batch_images, gt_labels, filenames) in enumerate(train_dataloader, start=1):
            batch_images = batch_images.to(device)
            gt_labels = gt_labels.to(device)
            optimizer.zero_grad()
            scores: torch.Tensor = model(batch_images)
            pred_labels: torch.Tensor = scores.max(dim=1).indices
            # print(pred_labels.detach().cpu().numpy())
            # print(gt_labels.detach().cpu().numpy())
            n_corrects: int = (pred_labels == gt_labels).sum().item()
            n_predictions: int = pred_labels.numel()
            loss: torch.Tensor = loss_function(scores, gt_labels).mean()
            loss.backward()
            optimizer.step()
            
            # Accumulate the metrics
            train_metrics.add(n_correct=n_corrects, n_predictions=n_predictions, loss=loss.item())
            train_accuracy: float = train_metrics['n_correct'] / train_metrics['n_predictions']
            train_loss = train_metrics['loss'] / batch
            logger.log(
                epoch=epoch, n_epochs=n_epochs, batch=batch, n_batches=len(train_dataloader),
                train_accuracy=train_accuracy, train_loss=train_loss
            )

        # Save checkpoint
        if checkpoint_dir:
            checkpoint_saver.save(model, filename=f'epoch{epoch}.pt')

        # Reset metric records for next epoch
        train_metrics.reset()

        # Evaluate
        val_accuracy, val_loss = evaluate(model=model, dataloader=val_dataloader)
        logger.log(epoch=epoch, n_epochs=n_epochs, val_accuracy=val_accuracy, val_loss=val_loss)
        print('='*20)

        early_stopping(val_loss)
        if early_stopping:
            print('Early Stopped')
            break
    
    return model

Create an instance of `NeuralNet`, which is a Convolutional Neural Network. 

Its feature extractor consists of 9 convolutional layers with hidden dimensions progressively increasing from 512 to 2048. Pooling layers are applied after every third convolutional layer to reduce spatial dimensions. 

The feature extractor is followed by a classifier with fully connected layers predicting three output classes.

In [17]:
net = NeuralNet(
    n_hiddens=[
        512, 512, 512, 
        1024, 1024, 1024, 
        2048, 2048, 2048,
    ], 
    poolings=[
        True, False, False, 
        True, False, False, 
        True, False, False,
    ],
    n_classes=3,
).to(device)



Create an instance of Adam optimizer, which has the learning rate adaptively changing from `0.00001`

In [18]:
optimizer = Adam(params=net.parameters(), lr=0.00001)

Now we run the training loop on the maximum number of `100` epochs. 

After each epoch, the model is evaluated on the validation dataset. The early stopping is implemented to stop the training process if the validation loss does not improve after `10` consecutive epochs. The tolerance of the improvement is set to `0`. This early stopping helps avoid overfitting.

The checkpoints are saved after each epoch in the `.pt` format.

In [19]:
net = train(
    model=net,
    train_dataloader=DataLoader(dataset=train_dataset, batch_size=16, shuffle=True),
    val_dataloader=DataLoader(dataset=valid_dataset, batch_size=4, shuffle=False),
    optimizer=optimizer,
    n_epochs=100,
    patience=10,
    tolerance=0.,
    checkpoint_dir='.checkpoints',
)

Epoch 1/100 | Batch 1/88 | train_accuracy: 5.000e-01, train_loss: 1.077e+00
Epoch 1/100 | Batch 2/88 | train_accuracy: 5.000e-01, train_loss: 1.166e+00
Epoch 1/100 | Batch 3/88 | train_accuracy: 5.625e-01, train_loss: 1.014e+00
Epoch 1/100 | Batch 4/88 | train_accuracy: 4.844e-01, train_loss: 1.288e+00
Epoch 1/100 | Batch 5/88 | train_accuracy: 4.500e-01, train_loss: 1.378e+00
Epoch 1/100 | Batch 6/88 | train_accuracy: 4.167e-01, train_loss: 1.360e+00
Epoch 1/100 | Batch 7/88 | train_accuracy: 4.196e-01, train_loss: 1.319e+00
Epoch 1/100 | Batch 8/88 | train_accuracy: 3.984e-01, train_loss: 1.292e+00
Epoch 1/100 | Batch 9/88 | train_accuracy: 3.889e-01, train_loss: 1.280e+00
Epoch 1/100 | Batch 10/88 | train_accuracy: 3.812e-01, train_loss: 1.278e+00
Epoch 1/100 | Batch 11/88 | train_accuracy: 3.920e-01, train_loss: 1.255e+00
Epoch 1/100 | Batch 12/88 | train_accuracy: 3.802e-01, train_loss: 1.225e+00
Epoch 1/100 | Batch 13/88 | train_accuracy: 3.846e-01, train_loss: 1.210e+00
Epoch 1/

# 3. Evaluate your model using the developed software

Define the `predict` function to evaluate the model on test dataset:

In [20]:
import datetime as dt
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

In [21]:
def predict(model: nn.Module, dataloader: DataLoader) -> pd.DataFrame:
    model.eval()

    filenames = []
    predictions = []
    with torch.no_grad():
        for images, fnames in dataloader:
            images = images.to(next(model.parameters()).device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            filenames.extend(fnames)
            predictions.extend(predicted.cpu().numpy())

    prediction_table = pd.DataFrame(
        data={'image': filenames, 'label': predictions}
    )
    prediction_table.to_csv(
        f'{dt.datetime.now().strftime(r"%Y%m%d%H%M%S")}.csv', 
        header=False, 
        index=False
    )
    return prediction_table

Load the trained model:

In [22]:
last_checkpoint: str = '.checkpoints/epoch16.pt'

trained_model: NeuralNet = torch.load(last_checkpoint)

Evaluate the model on test dataset. A `.csv` file is output to load to the developed software:

In [23]:
test_dataset = DogHearUnlabeledDataset(data_root='Dog_heart/Test')
test_dataloader = DataLoader(
    dataset=test_dataset, batch_size=16, shuffle=False
)

In [24]:
predict(model=trained_model, dataloader=test_dataloader)

Unnamed: 0,image,label
0,1804.png,2
1,1810.png,0
2,1838.png,0
3,1623.png,2
4,1637.png,2
...,...,...
395,1955.png,0
396,1969.png,1
397,1835.png,1
398,1821.png,0


<img src="https://github.com/hiepdang-ml/dog_heart_classification/blob/master/test.png?raw=true" alt="PredictionImage" style="width:50%;"/>

# 4. Compare results with [RVT paper](https://www.nature.com/articles/s41598-023-50063-x). Requirement: performance is better than VGG16: 70%

We got `71%` accuracy on test dataset, which is better than `VGG16`

# 5. Write a four-page paper report using the shared LaTex template. Upload your paper to ResearchGate or Arxiv, and put your paper link and GitHub weight link here.

Paper: `...`

Source code: https://github.com/hiepdang-ml/dog_heart_classification

# 6. Grading rubric

(1). Code ------- 20 points (you also need to upload your final model as a pt file)

(2). Grammer ---- 20 points

(3). Introduction & related work --- 10 points


(4). Method  ---- 20 points

(5). Results ---- 20 points

     > = 70 % -->10 points
     < 50 % -->0 points
     >= 50 % & < 70% --> 0.5 point/percent
     

(6). Discussion - 10 points

---