In [None]:
import os
import torch
import random
import numpy as np
import pandas as pd
import torchvision
import matplotlib.pyplot as plt
import time
import copy

from torchvision.transforms import v2
from torchvision import models
from sklearn.metrics import accuracy_score, f1_score

## Make dataloaders

In [None]:
def get_dataloaders(
    joint_type_and_param: str,
    batch_size: int = 64,
    num_workers: int = 8,
) -> "tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader, int]":
    '''
    Make 2 DataLoaders (using torchvision.datasets.ImageFolder) and count number of classes.

    Images are read from
    dataset/custom_split_inv_clahe/<joint_type_and_param>/train and
    dataset/custom_split_inv_clahe/<joint_type_and_param>/test.

    Args:
        joint_type_and_param (str): f"{joint_type}_{param}", for example "ulna_erosion"
        batch_size (int): number of images per batch for both loaders
        num_workers (int): number of worker processes per loader

    Return:
        train_dataloader: shuffled, with random rotation / flip augmentations
        val_dataloader: not shuffled, without random augmentations
        n_classes (int): number of classes
    '''
    # Same per-channel statistics that show_input uses to undo the normalization.
    mean_list, std_list = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

    train_transform = v2.Compose([
        v2.Resize((224, 224)),
        v2.RandomRotation(15),
        v2.RandomHorizontalFlip(p = 0.3),
        v2.RandomVerticalFlip(p = 0.3),
        v2.ToTensor(),
        v2.Normalize(mean_list, std_list)
    ])

    # Evaluation must be deterministic: a random rotation here would make the
    # validation loss (used to pick the best model) and the reported test
    # metrics change from run to run.
    val_transform = v2.Compose([
        v2.Resize((224, 224)),
        v2.ToTensor(),
        v2.Normalize(mean_list, std_list)
    ])

    data_root = os.path.join('dataset', 'custom_split_inv_clahe')
    train = os.path.join(data_root, joint_type_and_param, 'train')
    val = os.path.join(data_root, joint_type_and_param, 'test')

    train_dataset = torchvision.datasets.ImageFolder(train, train_transform)
    val_dataset = torchvision.datasets.ImageFolder(val, val_transform)

    # Count the classes ImageFolder actually discovered instead of every
    # directory entry, so stray files do not inflate the number.
    n_classes = max(len(train_dataset.classes), len(val_dataset.classes))

    train_dataloader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    val_dataloader = torch.utils.data.DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    # number of batches in each loader
    print(len(train_dataloader), len(val_dataloader))
    return train_dataloader, val_dataloader, n_classes

### Visualization of images in a batch
These helpers are currently not used.

In [None]:
# visualize all images in a batch

def show_input(input_tensor, title=''):
    '''
    Display one normalized image tensor with matplotlib.

    Args:
        input_tensor (torch.Tensor): image tensor; its first axis is moved last
            before plotting (assumes a (C, H, W) layout - TODO confirm)
        title: text shown above the image
    '''
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # put the first axis last so the array can be passed to imshow
    pixels = input_tensor.permute(1, 2, 0).numpy()
    # undo the normalization: x * std + mean
    restored = channel_std * pixels + channel_mean

    plt.imshow(restored.clip(0, 1))
    plt.title(title)
    plt.show()
    plt.pause(0.001)

def visualise_batch(train_dataloader):
    '''
    Show every image of the first batch of a dataloader, titled with its label.

    Args:
        train_dataloader: iterable yielding (images, labels) batches
    '''
    X_batch, y_batch = next(iter(train_dataloader))

    for x_item, y_item in zip(X_batch, y_batch):
        # A 0-dim tensor is unwrapped to a plain number; the title must be a
        # string (a set literal such as {y_item} renders as "{tensor(3)}").
        label = y_item.item() if hasattr(y_item, 'item') else y_item
        show_input(x_item, title=str(label))

## Definition of model training and prediction functions
Also defines the `test_time_augmentations` function.

In [None]:
def train_model(model, train_dataloader, val_dataloader, loss, optimizer, num_epochs, device, output_transform = None, unfreeze_epoch = 20):
    '''
    Model training loop (num_epochs), every epoch has 2 phases: train and val.
    Loss criterion: ordered cross entropy (the criterion value is weighted by
    |predicted class - true class| + 1 before backpropagation).
    Backpropagation is only done on train phase.

    Epochs before `unfreeze_epoch` train only the parameters that already have
    requires_grad=True; at `unfreeze_epoch` every model parameter is set to
    require gradient.

    The model with the smallest (unweighted) criterion value on val data is
    snapshotted and returned.

    Args:
        model
        train_dataloader
        val_dataloader
        loss (Criterion)
        optimizer (torch.optim.Adam)
        num_epochs (int)
        device (torch.device) - 'cuda:0' or 'cpu'
        output_transform = None (str): 'bin' or 'minor'
        unfreeze_epoch = 20 (int): 0-based epoch at which all parameters become trainable

    Return:
        best_model: deep copy of the model at its best validation epoch
            (the model itself if no epoch produced a finite validation loss,
            e.g. when num_epochs == 0)
    '''
    # Progress bar is optional: fall back to a plain iterator when tqdm is
    # not installed instead of failing with a NameError.
    try:
        from tqdm.auto import tqdm
    except ImportError:
        def tqdm(iterable, *args, **kwargs):
            return iterable

    # Start with a defined "best" so the function always has something to
    # return, and so the very first epoch can be selected as the best one.
    best_loss = float('inf')
    best_model = model

    for epoch in tqdm(range(num_epochs)):
        if epoch == unfreeze_epoch:
            for param in model.parameters():
                param.requires_grad = True

        val_loss = float('inf')
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                dataloader = train_dataloader
                model.train()
            else:
                dataloader = val_dataloader
                model.eval()

            running_loss = 0.

            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    preds = model(inputs)
                    if output_transform:
                        # NOTE(review): out_transform is not defined in the visible
                        # source - confirm it is provided before using this option.
                        labels = out_transform(labels, output_transform)

                    # calculate ordered CrossEntropy
                    loss_value = loss(preds, labels)
                    distance_weight = torch.abs(preds.argmax(1) - labels) + 1
                    ordinal_ce_loss = torch.mean(distance_weight * loss_value)

                    if is_train:
                        # backpropagation only in train phase
                        ordinal_ce_loss.backward()
                        optimizer.step()

                # .mean() keeps this working for criteria with reduction='none'
                running_loss += loss_value.mean().item()

            if not is_train:
                # average over batches; max() avoids dividing by zero on an empty loader
                val_loss = running_loss / max(len(dataloader), 1)

        # keep a snapshot of the model with the smallest loss on val data
        if val_loss < best_loss:
            best_loss = val_loss
            best_model = copy.deepcopy(model)

    return best_model

In [None]:
def test_time_augmentations(preds, inputs, model, angle = 10):
    '''
    Test time augmentations to increase accuracy on test data.
    Used augmentations: rotation by +angle and -angle degrees,
        horizontal flip of the original image and same rotations of flipped image.

    Args:
        preds (torch.Tensor): model predictions for the original image (or batch of images);
            it is not modified
        inputs (torch.Tensor): original image (or batch of images)
        model
        angle = 10: rotation angle in degrees

    Return:
        preds (torch.Tensor): new tensor, sum of `preds` and the predictions for all augmentations
    '''
    rotated_plus = v2.functional.rotate(inputs, angle)
    rotated_minus = v2.functional.rotate(inputs, -angle)

    augmented_views = (
        v2.functional.horizontal_flip(inputs),
        v2.functional.horizontal_flip(rotated_plus),
        v2.functional.horizontal_flip(rotated_minus),
        rotated_plus,
        rotated_minus,
    )

    total = preds
    with torch.no_grad():
        for view in augmented_views:
            # out-of-place add: the caller's `preds` tensor stays untouched
            total = total + model(view)
    return total

def predict_test(model, test_dataloader, device):
    '''
    Predict test class names.
    Returns accuracy and f1-score using predictions and predictions with test time augmentations.

    Accuracy is computed over all samples, so a smaller last batch does not
    get the same weight as a full one.

    Args:
        model
        test_dataloader
        device (torch.device): 'cuda' or 'cpu'

    Return:
        acc (float): accuracy, ranging from 0 to 1
        acc_tta (float): accuracy (with test time augmentations), ranging from 0 to 1
        f1 (float): f1-score, ranging from 0 to 1
        f1_tta (float): f1-score (with test time augmentations), ranging from 0 to 1
    '''
    model.eval()

    y_preds, y_preds_tta, y_trues = [], [], []
    for inputs, labels in test_dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        y_trues.extend(labels.detach().cpu().tolist())
        with torch.no_grad():
            preds = model(inputs)

        y_pred = preds.argmax(dim=1)
        y_preds.extend(y_pred.detach().cpu().tolist())

        # hand over a copy so the plain predictions cannot be altered by the
        # accumulation done inside test_time_augmentations
        preds_tta = test_time_augmentations(preds.clone(), inputs, model)
        y_pred_tta = preds_tta.argmax(dim=1)
        y_preds_tta.extend(y_pred_tta.detach().cpu().tolist())

    acc = float(accuracy_score(y_trues, y_preds))
    acc_tta = float(accuracy_score(y_trues, y_preds_tta))
    f1 = f1_score(y_trues, y_preds, average='micro')
    f1_tta = f1_score(y_trues, y_preds_tta, average='micro')
    print('Test accuracy = {:.4f}, f1 = {:.4f}'.format(acc, f1))
    print('Test accuracy (with tta) = {:.4f}, f1 = {:.4f}'.format(acc_tta, f1_tta))
    return acc, acc_tta, f1, f1_tta

In [None]:
def get_model(n_classes):
    '''
    Build the classifier together with its loss and optimizer:
        1. Load EfficientNet b4 with its default pretrained weights.
        2. Turn off gradients for every loaded parameter.
        3. Replace the classifier with a new head
           (Dropout -> Linear -> Dropout -> LeakyReLU -> Linear)
           whose output size is equal to number of classes.
        4. Pick the device ('cuda:0' if available, else 'cpu') and move the model to it.

    Args:
        n_classes (int): number of classes.

    Return:
        model
        loss (torch.nn.CrossEntropyLoss)
        optimizer (torch.optim.Adam): created over all model parameters, frozen ones included
        device (torch.device): 'cuda:0' or 'cpu'
    '''
    net = models.efficientnet_b4(weights = models.EfficientNet_B4_Weights.DEFAULT)

    for weight in net.parameters():
        weight.requires_grad = False

    # input width of the stock classifier's linear layer
    in_features = net.classifier[1].in_features
    hidden_features = in_features // 2

    # the freshly created layers keep requires_grad=True
    head_layers = [
        torch.nn.Dropout(p = 0.2, inplace=True),
        torch.nn.Linear(in_features, hidden_features),
        torch.nn.Dropout(p = 0.2, inplace=True),
        torch.nn.LeakyReLU(),
        torch.nn.Linear(hidden_features, n_classes),
    ]
    net.classifier = torch.nn.Sequential(*head_layers)

    if torch.cuda.is_available():
        device = torch.device("cuda:0")
    else:
        device = torch.device("cpu")
    print(f'Using {device}')
    net = net.to(device)

    criterion = torch.nn.CrossEntropyLoss()
    adam = torch.optim.Adam(net.parameters(), lr=3.0e-4)

    return net, criterion, adam, device

## Find which joint/parameter datasets are balanced and train a model for each of them

In [None]:
def count_disbalance(df, param):
    '''
    Label every column of a counts table as balanced or dominated by one row.

    A column is dominated when a single value is strictly greater than half
    of the column sum.

    Args:
        df (pd.DataFrame): table of counts, one column per key
        param (str): suffix appended to each column name in the result keys

    Return:
        dict: f'{column}_{param}' -> position (as str) of the first dominating
            row, or 'balanced' when no row dominates
    '''
    result = {}
    for column in df.columns:
        counts = df[column].to_list()
        threshold = sum(counts) * .5
        # first position holding more than half of the total, if any
        verdict = next(
            (str(position) for position, count in enumerate(counts) if count > threshold),
            'balanced',
        )
        result[f'{column}_{param}'] = verdict
    return result

# Per-class count tables; the first CSV column becomes the index.
erosion_df = pd.read_csv(
    os.path.join('dataset', 'non-sorted', 'erosion_data_counts.csv'),
    header=0,
    index_col=0,
)
jsn_df = pd.read_csv(
    os.path.join('dataset', 'non-sorted', 'jsn_data_counts.csv'),
    header=0,
    index_col=0,
)

# Map every "<column>_<param>" key to 'balanced' or to the dominating position.
er_d = count_disbalance(erosion_df, 'erosion')
jsn_d = count_disbalance(jsn_df, 'jsn')
print(er_d)
print(jsn_d)

In [None]:
# Erosion and JSN verdicts merged into one mapping (JSN wins on a duplicate key).
full_dict = {**er_d, **jsn_d}

# torch.save does not create missing directories; make sure the target exists
# before spending a full training run on a model that could not be written.
os.makedirs('models', exist_ok=True)

for key, status in full_dict.items():
    # only the datasets labelled 'balanced' get a model
    if status != 'balanced':
        continue

    print(f'\n {key}')
    train_dataloader, val_dataloader, n_classes = get_dataloaders(key)
    model, loss, optimizer, device = get_model(n_classes)
    model = train_model(model, train_dataloader, val_dataloader, loss, optimizer, num_epochs = 100, device = device)

    # NOTE(review): the metrics are computed on the same loader that train_model
    # used to pick the best epoch, so they are optimistic.
    test_acc, test_acc_tta, f1, f1_tta = predict_test(model, val_dataloader, device)

    # NOTE(review): torch.save writes a pickled model, not JSON; the '.json'
    # suffix is kept unchanged so existing file names still resolve.
    model_name = 'effNetb4_{}_{:.3f}_tta_{:.3f}.json'.format(key, test_acc, test_acc_tta)
    torch.save(model, os.path.join('models', model_name))