# Imports

In [2]:
import gdown
import matplotlib.pyplot as plt
import numpy as np
import shutil
import os
import pandas as pd
import random
import matplotlib.pyplot as plt
import seaborn as sns
import timm
import torch
import torch.nn.functional as F

from collections import defaultdict
from PIL import Image, ImageOps
from tqdm import tqdm
from torch import nn
from torch import optim
from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler, ConcatDataset, random_split
from torchvision import models, transforms
from torchvision.datasets import ImageFolder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.


# Functions

In [3]:
# Plots 9 random images from the dataset

def plot_random_images(dataset, gray=False):
    random_idx = np.random.randint(0, len(dataset), 9)

    plt.figure(figsize=(10, 10))
    for i, img_index in enumerate(random_idx):
        plt.subplot(3,3,i+1)
        plt.grid(False)
        image, label = dataset[img_index]
        plt.title(label)
        if gray:
          plt.imshow(image.permute(1,2,0), cmap='gray')
        else:
          plt.imshow(image.permute(1,2,0))
    
    plt.show()

In [4]:
# Plot confusion matrix

def plot_confusion_matrix(cm, class_names=[0, 1]):
    plt.figure(figsize=(6,5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

In [None]:
# Auxiliary function for train_classification()

def _train_step_classification(classifier_model, dataloader, criterion, optimizer, scheduler, device):
    classifier_model.train()

    total_loss = 0
    all_preds = []
    all_targets = []

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        y_pred = classifier_model(X)
        loss = criterion(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * X.size(0)

        y_pred_labels = torch.argmax(y_pred, axis=1)
        all_preds.extend(y_pred_labels.detach().cpu().numpy())
        all_targets.extend(y.detach().cpu().numpy())

    if scheduler:
        scheduler.step()

    mean_loss = total_loss / len(dataloader.dataset)
    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average='binary', zero_division=0)
    recall = recall_score(all_targets, all_preds, average='binary', zero_division=0)
    f1 = f1_score(all_targets, all_preds, average='binary', zero_division=0)

    tn, fp, fn, tp = confusion_matrix(all_targets, all_preds).ravel()
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0.0

    return mean_loss, accuracy, precision, recall, f1, sensitivity, specificity


In [None]:
# Auxiliary function for train_classification()

def _test_step_classification(classifier_model, dataloader, criterion, device):
    classifier_model.eval()

    total_loss = 0
    all_preds = []
    all_targets = []

    with torch.inference_mode():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            y_pred = classifier_model(X)
            loss = criterion(y_pred, y)

            total_loss += loss.item() * X.size(0)

            y_pred_labels = torch.argmax(y_pred, axis=1)
            all_preds.extend(y_pred_labels.detach().cpu().numpy())
            all_targets.extend(y.detach().cpu().numpy())

    mean_loss = total_loss / len(dataloader.dataset)
    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average='binary', zero_division=0)
    recall = recall_score(all_targets, all_preds, average='binary', zero_division=0)
    f1 = f1_score(all_targets, all_preds, average='binary', zero_division=0)
    cm = confusion_matrix(all_targets, all_preds)

    tn, fp, fn, tp = confusion_matrix(all_targets, all_preds).ravel()
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0.0

    return mean_loss, accuracy, precision, recall, f1, sensitivity, specificity, cm

In [None]:
# Trains classification model

def train_classification(classifier_model, train_dataloader, test_dataloader, criterion, epochs, optimizers, schedulers, device='cpu', verbose=True):
    metrics = {'Train loss': [], 'Train accuracy': [], 'Train sensitivity': [], 'Train specificity': [], 'Train precision': [], 'Train recall': [], 'Train F1': [],
               'Test loss': [], 'Test accuracy': [], 'Test sensitivity': [], 'Test specificity': [], 'Test precision': [], 'Test recall': [], 'Test F1': []}

    current_optimizer_number = 0
    for optimizer, scheduler, num_epochs in zip(optimizers, schedulers, epochs):
        current_optimizer_number += 1
        for epoch in tqdm(range(1, num_epochs+1), desc=f'Training with optimizer {current_optimizer_number}'):
            current_lr = optimizer.param_groups[0]["lr"]
            train_loss, train_accuracy, train_precision, train_recall, train_f1, train_sensitivity, train_specificity = _train_step_classification(classifier_model, train_dataloader, criterion, optimizer, scheduler, device)
            test_loss, test_accuracy, test_precision, test_recall, test_f1, test_sensitivity, test_specificity, cm = _test_step_classification(classifier_model, test_dataloader, criterion, device)

            metrics['Train loss'].append(train_loss)
            metrics['Train accuracy'].append(train_accuracy)
            metrics['Train sensitivity'].append(train_sensitivity)
            metrics['Train specificity'].append(train_specificity)
            metrics['Train precision'].append(train_precision)
            metrics['Train recall'].append(train_recall)
            metrics['Train F1'].append(train_f1)
            metrics['Test loss'].append(test_loss)
            metrics['Test accuracy'].append(test_accuracy)
            metrics['Test sensitivity'].append(test_sensitivity)
            metrics['Test specificity'].append(test_specificity)
            metrics['Test precision'].append(test_precision)
            metrics['Test recall'].append(test_recall)
            metrics['Test F1'].append(test_f1)

            if verbose:
                print(f'\nEPOCH {epoch} | Current learning rate: {current_lr:.8f}\n'
                      f'Train loss: {train_loss:.4f} | Train accuracy: {train_accuracy:.4f} | Train sensitivity: {train_sensitivity:.4f} | Train specificity: {train_specificity:.4f} | Train precision: {train_precision:.4f} | Train recall: {train_recall:.4f} | Train F1: {train_f1:.4f}\n'
                      f'Test loss: {test_loss:.4f} | Test accuracy: {test_accuracy:.4f} | Test sensitivity: {test_sensitivity:.4f} | Test specificity: {test_specificity:.4f} | Test precision: {test_precision:.4f} | Test recall: {test_recall:.4f} | Test F1: {test_f1:.4f}\n')

    return metrics

In [None]:
def test_classification(classifier_model, test_dataloader, criterion, device='cpu', verbose=True):
    metrics = {'Test loss': [], 'Test accuracy': [], 'Test sensitivity': [], 'Test specificity': [], 'Test precision': [], 'Test recall': [], 'Test F1': [], 'Confusion matrix': None}

    test_loss, test_accuracy, test_precision, test_recall, test_f1, test_sensitivity, test_specificity, cm = _test_step_classification(
        classifier_model, test_dataloader, criterion, device
    )

    metrics['Test loss'].append(test_loss)
    metrics['Test accuracy'].append(test_accuracy)
    metrics['Test sensitivity'].append(test_sensitivity)
    metrics['Test specificity'].append(test_specificity)
    metrics['Test precision'].append(test_precision)
    metrics['Test recall'].append(test_recall)
    metrics['Test F1'].append(test_f1)
    metrics['Confusion matrix'] = cm

    if verbose:
        print(f'Test loss: {test_loss:.4f} | Test accuracy: {test_accuracy:.4f} | '
              f'Test sensitivity: {test_sensitivity:.4f} | Test specificity: {test_specificity:.4f} |'
              f'Test precision: {test_precision:.4f} | Test recall: {test_recall:.4f} | '
              f'Test F1: {test_f1:.4f}\n')

    return metrics

In [None]:
# Returns confusion matrix

def get_conf_matrix(classifier_model, dataloader, device):
    classifier_model.eval()

    all_preds = []
    all_targets = []

    with torch.inference_mode():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            y_pred = classifier_model(X).argmax(dim=1).cpu()

            all_preds.extend(y_pred.cpu())
            all_targets.extend(y.cpu())

    conf_matrix = confusion_matrix(all_targets, all_preds)

    return conf_matrix

In [None]:
#Plots metrics as graphs
def plot_metrics(metrics):
    epochs = range(1, len(metrics['train_loss']) + 1)

    plt.figure(figsize=(15, 5))

    # --- Loss ---
    plt.subplot(1, 3, 1)
    plt.plot(epochs, metrics['train_loss'], label='Training')
    plt.plot(epochs, metrics['val_loss'], label='Validation')
    plt.title("Loss")
    plt.xlabel("Ephocs")
    plt.ylabel("Loss")
    plt.legend()

    # --- Accuracy ---
    plt.subplot(1, 3, 2)
    plt.plot(epochs, metrics['train_accuracy'], label='Training')
    plt.plot(epochs, metrics['val_accuracy'], label='Validation')
    plt.title("Accuracy")
    plt.xlabel("Ephocs")
    plt.ylabel("Accuracy")
    plt.legend()

    # --- F1 ---
    plt.subplot(1, 3, 3)
    plt.plot(epochs, metrics['train_f1'], label='Training')
    plt.plot(epochs, metrics['val_f1'], label='Validation')
    plt.title("F1")
    plt.xlabel("Ephocs")
    plt.ylabel("F1")
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
#Plots metrics as indivual graphs in .svg
import matplotlib.pyplot as plt

def save_separate_metrics_plots_svg(metrics):
    epochs = range(1, len(metrics['train_loss']) + 1)
    
    FIGSIZE = (5, 5) 
    
    # --- Loss ---
    plt.figure(figsize=FIGSIZE)
    plt.plot(epochs, metrics['train_loss'], label='Training')
    plt.plot(epochs, metrics['val_loss'], label='Validation')
    plt.title("Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('metrics_loss_plot_proportional.svg', format='svg') 
    plt.close()
    
    # --- Accuracy ---
    plt.figure(figsize=FIGSIZE)
    plt.plot(epochs, metrics['train_accuracy'], label='Training')
    plt.plot(epochs, metrics['val_accuracy'], label='Validation')
    plt.title("Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('metrics_accuracy_plot_proportional.svg', format='svg')
    plt.close()

    # --- F1 ---
    plt.figure(figsize=FIGSIZE)
    plt.plot(epochs, metrics['train_f1'], label='Training')
    plt.plot(epochs, metrics['val_f1'], label='Validation')
    plt.title("F1 Score")
    plt.xlabel("Epochs")
    plt.ylabel("F1 Score")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('metrics_f1_plot_proportional.svg', format='svg')
    plt.close()
    
    return ['metrics_loss_plot_proportional.svg', 'metrics_accuracy_plot_proportional.svg', 'metrics_f1_plot_proportional.svg']

In [None]:
# Wrapper to apply transform on subset
class TransformedDataset(Dataset):
    def __init__(self, subset, transform=None):
        self.subset = subset   
        self.transform = transform

    def __getitem__(self, idx):
        x, y = self.subset[idx] 
        if self.transform:
            x = self.transform(x)
        return x, y

    def __len__(self):
        return len(self.subset)

# Managing the Dataset

 **Adjusting labels, applying augmentations, preprocessing, splitting into the correct sizes for the respective sets**

In [None]:
# Setting path and renaming labels

import shutil, os

# Source path
src = "/kaggle/input/c-nmc-2019-dataset/C-NMC 2019 (PKG)/C-NMC_training_data"

# Destiny path
dst = "/kaggle/working/C-NMC_training_data"

# Copies the entire folder for working
if not os.path.exists(dst):
    shutil.copytree(src, dst)

# We can now rename the folders inside each fold
for fold in ["fold_0", "fold_1", "fold_2"]:
    fold_path = os.path.join(dst, fold)

    # Rename ALL → 1
    all_path = os.path.join(fold_path, "all")
    if os.path.exists(all_path):
        os.rename(all_path, os.path.join(fold_path, "1"))

    # Rename HEM → 0
    hem_path = os.path.join(fold_path, "hem")
    if os.path.exists(hem_path):
        os.rename(hem_path, os.path.join(fold_path, "0"))

In [None]:
# Defining Augmentations

train_transform = transforms.Compose([
    transforms.CenterCrop(224),               # necessary for the swin input
    transforms.RandomHorizontalFlip(p=0.5),      # only training
    transforms.RandomVerticalFlip(p=0.5),        # only training
    transforms.RandomAffine(degrees=30, translate=(0.00, 0.00), scale=(1.00, 1.00), shear=5), # only training
    transforms.ToTensor(),
])

val_transform = transforms.Compose([
    transforms.CenterCrop(224),  # necessary for the swin input
    transforms.ToTensor(),
])

test_transform = transforms.Compose([
    transforms.CenterCrop(224),  # necessary for the swin input
    transforms.ToTensor(),
])

In [None]:
# Concatanating folders

fold_0 = ImageFolder(os.path.join(dst, "fold_0"))
fold_1 = ImageFolder(os.path.join(dst, "fold_1"))
fold_2 = ImageFolder(os.path.join(dst, "fold_2"))

dataset = ConcatDataset([fold_0, fold_1, fold_2])

In [None]:
# Splitting sets for train, validation and test (70%, 20%, 10%)

seed = 42
g = torch.Generator().manual_seed(seed)

# Sizes
n_total = len(dataset)
n_test = int(0.10 * n_total)
n_val  = int(0.20 * n_total)
n_train = n_total - n_val - n_test

# Split
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [n_train, n_val, n_test], generator=g
)

print(f"Train: {len(train_dataset)} | Val: {len(val_dataset)} | Test: {len(test_dataset)}")

In [None]:
# Apply the respective transformations

train_dataset = TransformedDataset(train_dataset, transform=train_transform)
val_dataset   = TransformedDataset(val_dataset,   transform=val_transform)
test_dataset  = TransformedDataset(test_dataset,  transform=test_transform)

# WeightedRandomSampler

In [None]:
# Recalculate weights for the WeightedRandomSampler with the new training set

print("\nRecalculating weights for the sampler in the new train set...")
targets = [y for _, y in train_dataset]

class_sample_counts = np.bincount(targets)
weights = 1. / class_sample_counts
sample_weights = [weights[t] for t in targets]

# Balanced sampler
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=30000, # Samples the same number of images as the train dataset
    replacement=True
)
print("Weight successfully recalculated.")

# Let's see some of the dataset images

In [None]:
plot_random_images(train_dataset)

# Defining the Swin Transformer

In [None]:
class SwinTransformerTiny(nn.Module):
    def __init__(self, drop_path_rate=0, drop_rate=0, attn_drop_rate=0):
        super().__init__()
        self.model = timm.create_model(
            'swin_tiny_patch4_window7_224',
            pretrained=True,
            num_classes=2,
            drop_path_rate=drop_path_rate,
            drop_rate=drop_rate,
            attn_drop_rate=attn_drop_rate
        )


    def forward(self, x):
        return self.model(x)

# Removing the attention mechanism (in case of an Ablation Study)

In [None]:
# If you wish to remove the attention mechanism for the ablation study
import torch.nn as nn

'''
def remove_attention(model):

    for name, module in model.named_children():

        # If the module is an attention layer
        if "attn" in name.lower():
            setattr(model, name, nn.Identity())
        
        # If its a SwinTransformerBlock (with function _attn)
        elif hasattr(module, "_attn"):
            def no_attn(x, mask=None):
                return x
            module._attn = no_attn

        # Recursively replaces inside the submodules
        else:
            remove_attention(module)
'''

# Defining the ResNet18 (experimental purposes, only)

In [None]:
# If you wish to use Resnet18 for testing or etc

"""class ResNet18(nn.Module):
    def __init__(self, drop_path_rate=0, drop_rate=0):
        super().__init__()
        self.model = timm.create_model(
            'resnet18',
            pretrained=True,
            num_classes=2,
            drop_path_rate=drop_path_rate,
            drop_rate=drop_rate
        )

    def forward(self, x):
        return self.model(x)"""

# Forcing to use dedicated GPU

In [None]:
# CPU or GPU device

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

# Dataloaders for train, validation and test

In [None]:
train_dataloader = DataLoader(train_dataset, batch_size=128, sampler=sampler, num_workers=4, prefetch_factor=4, pin_memory=True, persistent_workers=True)
val_dataloader = DataLoader(val_dataset, batch_size=128, shuffle=False, num_workers=4, prefetch_factor=4, pin_memory=True, persistent_workers=True)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=4, prefetch_factor=4, pin_memory=True, persistent_workers=True)

# Instantiating the model

In [None]:


classifier_model = SwinTransformerTiny(drop_path_rate=0.15, drop_rate=0.1, attn_drop_rate=0.0).to(device)

# Removes attention
#remove_attention(classifier_model)

#Resnet18 model
#classifier_model = ResNet18(drop_path_rate=0.15, drop_rate=0.1).to(device)


criterion = nn.CrossEntropyLoss().to(device)

optimizer = optim.AdamW(classifier_model.parameters(), lr=1e-4, weight_decay=5e-2)
scheduler = optim.lr_scheduler.StepLR(optimizer, 1, 0.9)

optimizers = [optimizer]
schedulers = [scheduler]
epochs = [50]



# Downloading and loading parameters (if necessary)

In [None]:
# Download parameters from google drive (for retraining)

gdown.download('https://drive.google.com/file/d/1b4C0jr9QEnslJketPI9IkWPZVXivjLqr/view?usp=sharing', 'parameters.pth', fuzzy=True, quiet=False)

In [None]:
# Load parameters

classifier_model.load_state_dict(torch.load('/kaggle/working/parameters.pth'))

# Running the model

In [None]:
# Training
metrics = train_classification(classifier_model, train_dataloader, val_dataloader, criterion, epochs, optimizers, schedulers, device)

In [None]:
#Validation
val_metrics = test_classification(classifier_model, val_dataloader, criterion, device)

In [None]:
#Test
test_metrics = test_classification(classifier_model, test_dataloader, criterion, device)

# Let's plot some results

In [None]:
#Plotting metrics

plot_metrics(metrics)
# or
#save_separate_metrics_plots_svg(metrics)

In [None]:
# Plotting confusion matrix
plot_confusion_matrix(test_metrics['Confusion matrix'], class_names=[0, 1])

# Last but not least, save parameters

In [None]:
#Save parameters
torch.save(classifier_model.state_dict(), 'parameters.pth')