## Importing libraries

In [1]:
import time
from copy import deepcopy

import numpy as np
import torch
from sklearn.metrics import (
    accuracy_score,
    cohen_kappa_score,
    confusion_matrix,
    mean_absolute_error,
)
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.utils import class_weight
from torch import cuda, nn
from torch.optim import Adam
from torch.utils.data import DataLoader, Subset
from torchvision import models
from torchvision.transforms import Compose, ToTensor
from tqdm import tqdm

from dlordinal.datasets import FGNet
from dlordinal.losses import TriangularLoss
from dlordinal.output_layers import StickBreakingLayer

## Loading and preprocessing the FGNet dataset

First, we set the configuration parameters for the experiment and the number of workers for the *DataLoader*, that is, the number of subprocesses used to load the images.

In [2]:
optimiser_params = {"lr": 1e-3, "bs": 200, "epochs": 5, "s": 2, "c": 0.2, "beta": 0.5}

workers = 3

Now we use the *FGNet* dataset class to download and preprocess the images. From the training data, we hold out a validation partition comprising 15% of the samples using *StratifiedShuffleSplit*, which keeps the class proportions of the original set. Finally, we wrap each partition in a *DataLoader* to load the images in batches.
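To illustrate what the stratified split does, the sketch below applies the same *StratifiedShuffleSplit* settings to synthetic, imbalanced labels and compares the class proportions of the full set with those of the held-out part. The label counts are made up for the example and are not FGNet's.

```python
import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit

# Synthetic, imbalanced labels (hypothetical counts, not taken from FGNet)
y = np.repeat([0, 1, 2, 3], [400, 200, 120, 80])

sss = StratifiedShuffleSplit(n_splits=1, test_size=0.15, random_state=0)
# Only the labels drive the stratification, so a dummy X is enough
train_idx, val_idx = next(sss.split(np.zeros(len(y)), y))

full_props = np.bincount(y) / len(y)
val_props = np.bincount(y[val_idx], minlength=4) / len(val_idx)

print(f"train size: {len(train_idx)}, validation size: {len(val_idx)}")
print("class proportions (full):      ", np.round(full_props, 3))
print("class proportions (validation):", np.round(val_props, 3))
```

The two proportion vectors should be nearly identical, which is the property that matters for an imbalanced ordinal dataset.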

In [3]:
fgnet_trainval = FGNet(
    root="./datasets",
    download=True,
    train=True,
    target_transform=np.array,
    transform=Compose([ToTensor()]),
)

test_data = FGNet(
    root="./datasets",
    download=True,
    train=False,
    target_transform=np.array,
    transform=Compose([ToTensor()]),
)

num_classes = len(fgnet_trainval.classes)
classes = fgnet_trainval.classes
targets = fgnet_trainval.targets

# Create a validation split
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.15, random_state=0)
sss_splits = list(sss.split(X=np.zeros(len(fgnet_trainval)), y=fgnet_trainval.targets))
train_idx, val_idx = sss_splits[0]

# Create subsets for training and validation
train_data = Subset(fgnet_trainval, train_idx)
val_data = Subset(fgnet_trainval, val_idx)

# Get CUDA device
device = "cuda" if cuda.is_available() else "cpu"
print(f"Using {device} device")

# Create dataloaders
train_dataloader = DataLoader(
    train_data, batch_size=optimiser_params["bs"], shuffle=True, num_workers=workers
)
val_dataloader = DataLoader(
    val_data, batch_size=optimiser_params["bs"], shuffle=False, num_workers=workers
)
test_dataloader = DataLoader(
    test_data, batch_size=optimiser_params["bs"], shuffle=False, num_workers=workers
)

# Get image shape
img_shape = None
for X, _ in train_dataloader:
    img_shape = list(X.shape[1:])
    break
print(f"Detected image shape: {img_shape}")

# Define class weights for imbalanced datasets
classes_array = np.array([int(c) for c in classes])

class_weights = class_weight.compute_class_weight(
    "balanced", classes=classes_array, y=targets
)
print(f"{class_weights=}")
class_weights = (
    torch.from_numpy(class_weights).float().to(device)
)  # Transform to Tensor

Files already downloaded and verified
Files already processed and verified
Files already split and verified
Files already downloaded and verified
Files already processed and verified
Files already split and verified
Using cuda device
Detected image shape: [3, 128, 128]
class_weights=array([1.53448276, 0.55165289, 1.01908397, 0.79464286, 1.13135593,
       2.42727273])
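The printed weights follow scikit-learn's "balanced" heuristic, `n_samples / (n_classes * count_per_class)`. The sketch below reproduces them by hand. The per-class counts are back-calculated from the printed weights for illustration; they are an inference, not values read from the dataset.

```python
import numpy as np

# Per-class sample counts inferred from the printed weights
# (an assumption for illustration, not read from the dataset itself)
counts = np.array([87, 242, 131, 168, 118, 55])

n_samples = counts.sum()
n_classes = len(counts)

# "balanced" heuristic: rarer classes receive proportionally larger weights
weights = n_samples / (n_classes * counts)
print(np.round(weights, 8))
```

Note that in this notebook the resulting `class_weights` tensor is computed but not passed to the loss function defined later.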


## Model

We use a *ResNet-18* model pretrained on ImageNet and replace its last fully connected layer with a *stick-breaking* output layer.

The stick breaking approach considers the problem of breaking a stick of length 1 into J segments. This methodology is related to non-parametric Bayesian methods and can be considered a subset of the random allocation processes [1].
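The idea can be sketched in a few lines of plain Python. This is a generic stick-breaking construction, assuming each of the J-1 raw outputs is passed through a sigmoid to decide which fraction of the remaining stick goes to the current class; the function name is hypothetical and `StickBreakingLayer` may differ in its details.

```python
import math


def stick_breaking_probs(logits):
    """Map J-1 real-valued logits to J probabilities that sum to 1."""
    remaining = 1.0  # length of the stick not yet assigned to a class
    probs = []
    for z in logits:
        v = 1.0 / (1.0 + math.exp(-z))  # fraction of the remaining stick to break off
        probs.append(remaining * v)
        remaining *= 1.0 - v
    probs.append(remaining)  # the last class receives whatever is left
    return probs


probs = stick_breaking_probs([0.0, 0.0, 0.0])
print(probs, sum(probs))
```

With all logits at zero, each break takes half of what is left, so the segments shrink geometrically while always summing to one.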

Finally, we define the *Adam* optimiser, which updates the network's weights to minimise the loss function.

[1]: Vargas, Víctor Manuel et al. (2022). *Unimodal regularisation based on beta distribution for deep ordinal regression.* Pattern Recognition, 122, 108310. Elsevier. doi.org/10.1016/j.patcog.2021.108310

In [4]:
model = models.resnet18(weights="IMAGENET1K_V1")
model.fc = StickBreakingLayer(model.fc.in_features, num_classes)
model = model.to(device)

# Optimizer
optimizer = Adam(model.parameters(), lr=optimiser_params["lr"])

## Loss Function

$$
\begin{align*}
f(x; a_j, b_j, c_j) &= \begin{cases}
   0, & x < a_j, \\
   \frac{2(x - a_j)}{(b_j - a_j)(c_j - a_j)}, & a_j \leq x < c_j, \\
   \frac{2(b_j - x)}{(b_j - a_j)(b_j - c_j)}, & c_j \leq x < b_j, \\
   0, & b_j \leq x,
\end{cases}
\end{align*}
$$


The triangular distribution [1] can be determined using three parameters a, b and c, which define the lower limit, upper limit, and mode, respectively. These parameters also determine the x coordinate of each of the vertices of the triangle.

The distributions employed for the extreme classes should differ from those utilized for the intermediate ones. Consequently, the distributions for the initial and final classes should allocate their probabilities just in one direction: positively for the first class and negatively for the last one.
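As a small check of the density above, the following sketch implements it directly and verifies numerically that it integrates to one, both for a two-sided triangle and for the one-sided shapes obtained when the mode coincides with a limit. The parameter values are illustrative only and are not the ones `TriangularLoss` uses internally.

```python
def triangular_pdf(x: float, a: float, b: float, c: float) -> float:
    """Density of a triangular distribution with lower limit a, upper limit b, mode c."""
    if x < a or x >= b:
        return 0.0
    if x < c:
        return 2.0 * (x - a) / ((b - a) * (c - a))
    return 2.0 * (b - x) / ((b - a) * (b - c))


def midpoint_integral(a: float, b: float, c: float, steps: int = 10_000) -> float:
    """Midpoint-rule approximation of the area under the density on [a, b]."""
    h = (b - a) / steps
    return sum(triangular_pdf(a + (i + 0.5) * h, a, b, c) for i in range(steps)) * h


# Illustrative parameters (assumptions for the example)
area_inner = midpoint_integral(0.0, 1.0, 0.25)  # two-sided triangle
area_first = midpoint_integral(0.0, 1.0, 0.0)   # mode at the lower limit
area_last = midpoint_integral(0.0, 1.0, 1.0)    # mode at the upper limit
peak = triangular_pdf(0.25, 0.0, 1.0, 0.25)     # height at the mode is 2 / (b - a)
print(area_inner, area_first, area_last, peak)
```

When `c == a` or `c == b`, one of the two branches is never reached, which yields the one-directional distributions described for the first and last classes.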

[1]: Víctor Manuel Vargas, Pedro Antonio Gutiérrez, Javier Barbero-Gómez, and César Hervás-Martínez (2023). *Soft Labelling Based on Triangular Distributions for Ordinal Classification.* Information Fusion, 93, 258--267. doi.org/10.1016/j.inffus.2023.01.003


In [5]:
loss_fn = TriangularLoss(base_loss=nn.CrossEntropyLoss(), num_classes=num_classes).to(
    device
)

## Metrics

In [6]:
# Metrics computation


def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray, num_classes: int):

    if len(y_true.shape) > 1:
        y_true = np.argmax(y_true, axis=1)

    if len(y_pred.shape) > 1:
        y_pred = np.argmax(y_pred, axis=1)

    labels = range(0, num_classes)

    # Metrics calculation
    qwk = cohen_kappa_score(y_true, y_pred, weights="quadratic", labels=labels)
    ms = minimum_sensitivity(y_true, y_pred, labels=labels)
    mae = mean_absolute_error(y_true, y_pred)
    acc = accuracy_score(y_true, y_pred)
    off1 = accuracy_off1(y_true, y_pred, labels=labels)
    conf_mat = confusion_matrix(y_true, y_pred, labels=labels)

    metrics = {
        "QWK": qwk,
        "MS": ms,
        "MAE": mae,
        "CCR": acc,
        "1-off": off1,
        "Confusion matrix": conf_mat,
    }

    return metrics


def _compute_sensitivities(y_true, y_pred, labels=None):
    if len(y_true.shape) > 1:
        y_true = np.argmax(y_true, axis=1)
    if len(y_pred.shape) > 1:
        y_pred = np.argmax(y_pred, axis=1)

    conf_mat = confusion_matrix(y_true, y_pred, labels=labels)

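    # Per-class recall: correctly classified samples over samples of each true class
    row_totals = np.sum(conf_mat, axis=1)
    mask = np.eye(conf_mat.shape[0], conf_mat.shape[1])
    correct = np.sum(conf_mat * mask, axis=1)
    sensitivities = correct / row_totals

    # Classes with no true samples give 0/0 = NaN; drop them
    sensitivities = sensitivities[~np.isnan(sensitivities)]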

    return sensitivities


def minimum_sensitivity(y_true, y_pred, labels=None):
    return np.min(_compute_sensitivities(y_true, y_pred, labels=labels))


def accuracy_off1(y_true, y_pred, labels=None):
    if len(y_true.shape) > 1:
        y_true = np.argmax(y_true, axis=1)
    if len(y_pred.shape) > 1:
        y_pred = np.argmax(y_pred, axis=1)

    conf_mat = confusion_matrix(y_true, y_pred, labels=labels)
    n = conf_mat.shape[0]
    # Tridiagonal mask: exact hits plus predictions one class away
    mask = np.eye(n, n) + np.eye(n, n, k=1) + np.eye(n, n, k=-1)
    correct = mask * conf_mat

    return 1.0 * np.sum(correct) / np.sum(conf_mat)


def print_metrics(metrics):
    print("")
    print("Confusion matrix :\n{}".format(metrics["Confusion matrix"]))
    print("")
    print("MS: {:.4f}".format(metrics["MS"]))
    print("")
    print("QWK: {:.4f}".format(metrics["QWK"]))
    print("")
    print("MAE: {:.4f}".format(metrics["MAE"]))
    print("")
    print("CCR: {:.4f}".format(metrics["CCR"]))
    print("")
    print("1-off: {:.4f}".format(metrics["1-off"]))

## Training Process

In [7]:
def train(
    dataloader: torch.utils.data.DataLoader,
    model: torch.nn.Module,
    loss_fn: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    device: torch.device,
    H: dict,
    num_classes: int,
):
    num_batches = len(dataloader)
    size = len(dataloader.dataset)
    progress_bar = tqdm(total=num_batches, ncols=100, position=0, desc="Train progress")
    model.train()
    mean_loss, accuracy = 0, 0
    y_pred, y_true = None, None

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)  # Inputs and labels to device

        # Compute prediction error and accuracy of the training process
        pred = model(X)
        loss = loss_fn(pred, y)

        mean_loss += loss.item()  # accumulate a float so the graph is not retained
        accuracy += (pred.argmax(1) == y).type(torch.float).sum().item()

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Stack predictions and true labels to determine the confusion matrix
        pred_np = pred.argmax(1).cpu().detach().numpy()
        true_np = y.cpu().detach().numpy()
        if y_pred is None:
            y_pred = pred_np
        else:
            y_pred = np.concatenate((y_pred, pred_np))

        if y_true is None:
            y_true = true_np
        else:
            y_true = np.concatenate((y_true, true_np))

        # Update progress bar
        progress_bar.set_postfix(loss=loss.item(), accuracy=accuracy)
        progress_bar.update(1)

    accuracy /= size
    mean_loss /= num_batches

    H["train_loss"].append(float(mean_loss))  # epoch mean, consistent with val_loss
    H["train_acc"].append(accuracy)

    # Confusion matrix for training
    labels = range(0, num_classes)
    conf_mat = confusion_matrix(y_true, y_pred, labels=labels)
    print("")
    print("Train Confusion matrix :\n{}".format(conf_mat))
    print("")

    return accuracy, mean_loss

## Test Process

In [8]:
def test(
    test_dataloader: torch.utils.data.DataLoader,
    model: torch.nn.Module,
    loss_fn: torch.nn.Module,
    device: torch.device,
    num_classes: int,
):
    num_batches = len(test_dataloader)
    progress_bar = tqdm(total=num_batches, ncols=100, position=0, desc="Test progress")
    model.eval()
    test_loss = 0
    y_pred, y_true = None, None

    with torch.no_grad():
        for batch, (X, y) in enumerate(test_dataloader):
            X, y = X.to(device), y.to(device)  # inputs and labels to device
            pred = model(X)
            test_loss += loss_fn(pred, y).item()

            # Stack predictions and true labels
            pred_np = pred.argmax(1).cpu().detach().numpy()
            true_np = y.cpu().detach().numpy()
            if y_pred is None:
                y_pred = pred_np
            else:
                y_pred = np.concatenate((y_pred, pred_np))

            if y_true is None:
                y_true = true_np
            else:
                y_true = np.concatenate((y_true, true_np))

            # Update progress bar
            progress_bar.set_postfix(loss=test_loss / (batch + 1))
            progress_bar.update(1)

    test_loss /= num_batches
    metrics = compute_metrics(y_true, y_pred, num_classes)
    print_metrics(metrics)

    return metrics, test_loss

## Validation Process

In [9]:
def validate(
    dataloader: torch.utils.data.DataLoader,
    model: torch.nn.Module,
    loss_fn: torch.nn.Module,
    device: torch.device,
    H: dict,
    num_classes: int,
):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    loss, accuracy = 0, 0
    y_pred, y_true = None, None

    with torch.no_grad():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss += loss_fn(pred, y)
            accuracy += (pred.argmax(1) == y).type(torch.float).sum().item()

            pred_np = pred.argmax(1).cpu().detach().numpy()
            true_np = y.cpu().detach().numpy()
            if y_pred is None:
                y_pred = pred_np
            else:
                y_pred = np.concatenate((y_pred, pred_np))

            if y_true is None:
                y_true = true_np
            else:
                y_true = np.concatenate((y_true, true_np))

    accuracy /= size
    loss /= num_batches

    H["val_loss"].append(loss.cpu().detach().numpy())
    H["val_acc"].append(accuracy)

    metrics = compute_metrics(y_true, y_pred, num_classes)

    return metrics, accuracy, loss

## Results

In [10]:
H = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

# To store validation metrics
validation_metrics = {}

# Snapshot of the best model weights (deep copy, so training does not modify it)
best_model_weights = deepcopy(model.state_dict())
best_qwk = 0.0

# Start time
start_time = time.time()

for e in range(optimiser_params["epochs"]):
    train_acc, train_loss = train(
        train_dataloader, model, loss_fn, optimizer, device, H, num_classes=num_classes
    )
    validation_metrics, val_acc, val_loss = validate(
        val_dataloader, model, loss_fn, device, H, num_classes=num_classes
    )

    if validation_metrics["QWK"] >= best_qwk:
        best_qwk = validation_metrics["QWK"]
        best_model_weights = deepcopy(model.state_dict())

    print("[INFO] EPOCH: {}/{}".format(e + 1, optimiser_params["epochs"]))
    print("Train loss: {:.6f}, Train accuracy: {:.4f}".format(train_loss, train_acc))
    print("Val loss: {:.6f}, Val accuracy: {:.4f}\n".format(val_loss, val_acc))

# Store last train error
train_error = H["train_loss"][-1]

# Restore best weights
model.load_state_dict(best_model_weights)

# Start evaluation
print("[INFO] Network evaluation ...")

test_metrics, test_loss = test(
    test_dataloader, model, loss_fn, device, num_classes=num_classes
)

# End time
end_time = time.time()
print("\n[INFO] Total training time: {:.2f}s".format(end_time - start_time))

Train progress: 100%|████████████████████████| 4/4 [00:01<00:00,  2.62it/s, accuracy=125, loss=1.67]


Train Confusion matrix :
[[ 73   1   0   0   0   0]
 [168  24   7   5   1   0]
 [ 79  12   2  15   3   0]
 [110  11   2  19   1   0]
 [ 71  11   1  12   5   0]
 [ 31   3   0   5   6   2]]






[INFO] EPOCH: 1/5
Train loss: 2.032689, Train accuracy: 0.1838
Val loss: 2.108031, Val accuracy: 0.1405



Train progress: 100%|████████████████████████| 4/4 [00:00<00:00,  5.15it/s, accuracy=363, loss=1.31]


Train Confusion matrix :
[[ 64   8   2   0   0   0]
 [ 38 112  26  18   9   2]
 [  7  24  37  30  13   0]
 [  6   8  27  85  17   0]
 [ 13   5   3  27  43   9]
 [  5   1   1  10   8  22]]






[INFO] EPOCH: 2/5
Train loss: 1.358173, Train accuracy: 0.5338
Val loss: 2.558786, Val accuracy: 0.1322



Train progress: 100%|████████████████████████| 4/4 [00:00<00:00,  5.32it/s, accuracy=475, loss=1.18]


Train Confusion matrix :
[[ 63  11   0   0   0   0]
 [ 32 154   8   8   3   0]
 [  1  34  48  22   6   0]
 [  2   5  13 113  10   0]
 [  0   7   2  17  61  13]
 [  1   0   0   0  10  36]]






[INFO] EPOCH: 3/5
Train loss: 1.206461, Train accuracy: 0.6985
Val loss: 1.459632, Val accuracy: 0.4793



Train progress: 100%|████████████████████████| 4/4 [00:00<00:00,  4.96it/s, accuracy=525, loss=1.11]


Train Confusion matrix :
[[ 59  14   1   0   0   0]
 [  7 186   9   3   0   0]
 [  0  36  51  18   6   0]
 [  0   2  15 116  10   0]
 [  0   6   0  17  74   3]
 [  0   2   1   0   5  39]]






[INFO] EPOCH: 4/5
Train loss: 1.108122, Train accuracy: 0.7721
Val loss: 1.435940, Val accuracy: 0.5455



Train progress: 100%|████████████████████████| 4/4 [00:00<00:00,  4.70it/s, accuracy=568, loss=1.03]


Train Confusion matrix :
[[ 62  12   0   0   0   0]
 [  6 193   5   1   0   0]
 [  0  15  77  16   3   0]
 [  0   0  13 124   6   0]
 [  0   1   0  23  72   4]
 [  0   0   0   0   7  40]]






[INFO] EPOCH: 5/5
Train loss: 1.033004, Train accuracy: 0.8353
Val loss: 1.374854, Val accuracy: 0.5372

[INFO] Network evaluation ...


Test progress: 100%|███████████████████████████████████████| 2/2 [00:00<00:00,  3.30it/s, loss=1.42]


Confusion matrix :
[[ 9 12  0  1  0  0]
 [ 3 43  8  3  2  1]
 [ 0  8  8 10  2  5]
 [ 0  5  7 17  4  9]
 [ 0  2  2  4  8 14]
 [ 0  2  1  3  2  6]]

MS: 0.2424

QWK: 0.6677

MAE: 0.8209

CCR: 0.4527

1-off: 0.8109

[INFO] Total training time: 7.36s
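As a sanity check, the reported test metrics can be recomputed directly from the confusion matrix printed above. The sketch below uses plain NumPy rather than the notebook's helper functions, so the variable names are specific to this example.

```python
import numpy as np

# Test confusion matrix copied from the output above (rows: true, columns: predicted)
cm = np.array([
    [9, 12, 0, 1, 0, 0],
    [3, 43, 8, 3, 2, 1],
    [0, 8, 8, 10, 2, 5],
    [0, 5, 7, 17, 4, 9],
    [0, 2, 2, 4, 8, 14],
    [0, 2, 1, 3, 2, 6],
])
total = cm.sum()
i, j = np.indices(cm.shape)
dist = np.abs(i - j)  # ordinal distance between true and predicted class

ccr = np.trace(cm) / total                  # correct classification rate
ms = np.min(np.diag(cm) / cm.sum(axis=1))   # minimum per-class sensitivity
mae = (dist * cm).sum() / total             # mean absolute error in class steps
off1 = cm[dist <= 1].sum() / total          # predictions at most one class away

# Quadratic weighted kappa: observed vs. chance-expected squared disagreement
expected = np.outer(cm.sum(axis=1), cm.sum(axis=0)) / total
qwk = 1.0 - (dist**2 * cm).sum() / (dist**2 * expected).sum()

print(f"MS={ms:.4f} QWK={qwk:.4f} MAE={mae:.4f} CCR={ccr:.4f} 1-off={off1:.4f}")
```

The values agree with the ones printed by `print_metrics`, which also shows how far the 1-off accuracy sits above the exact accuracy: most errors fall on a neighbouring age group.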



