# Imports

In [None]:
import gdown
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import random
import seaborn as sns
import timm
import torch
import torch.nn.functional as F
import cv2

from collections import defaultdict
from PIL import Image, ImageOps
from tqdm import tqdm
from torch import nn
from torch import optim
from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler, ConcatDataset, random_split
from torchvision import models, transforms
from torchvision.datasets import ImageFolder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Functions

In [None]:
# Plotar imagens aleatórias

def plot_random_images(dataset, gray=False):
    """Show a 3x3 grid of nine randomly chosen samples from ``dataset``.

    Args:
        dataset: Indexable object with a length whose items are
            ``(image, label)`` pairs. Each image is permuted with
            ``(1, 2, 0)`` before plotting, so it is assumed to be a
            channels-first tensor -- TODO confirm for other datasets.
        gray: When True, the image is drawn with the ``'gray'`` colormap.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # Indices are drawn with replacement, so a sample may show up twice.
    random_idx = np.random.randint(0, len(dataset), 9)

    plt.figure(figsize=(10, 10))
    for i, img_index in enumerate(random_idx):
        plt.subplot(3, 3, i + 1)
        plt.grid(False)
        image, label = dataset[img_index]
        plt.title(label)

        image_hwc = image.permute(1, 2, 0)
        # imshow accepts (H, W), (H, W, 3) or (H, W, 4) but not (H, W, 1):
        # drop a singleton channel axis so single-channel images can be drawn.
        if image_hwc.shape[-1] == 1:
            image_hwc = image_hwc.squeeze(-1)

        if gray:
            # The colormap only affects 2-D (single-channel) data.
            plt.imshow(image_hwc, cmap='gray')
        else:
            plt.imshow(image_hwc)

    plt.show()

In [None]:
# Plotar matriz de confusão

def plot_confusion_matrix_with_diagonal(
    cm,
    labels,
    title_matrix='Matriz de Confusão Normalizada',
    title_diagonal='Class Accuracies',
    extra_diagonals=None,
    extra_names=None
):
    """Plot a row-normalised confusion matrix plus a strip of per-class accuracies.

    Classes are reordered by decreasing row sum (number of true samples) before
    plotting. The upper heatmap shows the row-normalised matrix; the lower one
    shows its diagonal and, optionally, additional rows for comparison.

    Args:
        cm: Square confusion matrix (rows = true class, columns = predicted).
        labels: Class names, indexed like the rows/columns of ``cm``.
        title_matrix: Title of the upper heatmap.
        title_diagonal: Row label for this matrix's diagonal in the lower heatmap.
        extra_diagonals: Optional sequence of additional accuracy rows.
            NOTE(review): these rows are plotted as given, against the
            *sorted* class labels -- confirm callers pass them in that order.
        extra_names: Optional row labels for ``extra_diagonals``; defaults to
            ``'Modelo 1'``, ``'Modelo 2'``, ...

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    cm = np.asarray(cm)

    # Most frequent true class first.
    sorted_indices = np.argsort(cm.sum(axis=1))[::-1]
    cm_sorted = cm[sorted_indices][:, sorted_indices]
    labels_sorted = [labels[i] for i in sorted_indices]

    # A class with no true samples has an all-zero row; dividing by 1 instead
    # of 0 keeps that row at 0.0 rather than producing NaN and a warning.
    row_totals = cm_sorted.sum(axis=1, keepdims=True)
    safe_totals = np.where(row_totals == 0, 1, row_totals)
    cm_normalized = cm_sorted.astype('float') / safe_totals
    diagonal = np.diag(cm_normalized)

    accuracy_rows = [diagonal]
    accuracy_labels = [title_diagonal]

    # len() instead of truthiness so that numpy arrays are accepted too.
    has_extra_rows = extra_diagonals is not None and len(extra_diagonals) > 0
    if extra_diagonals is not None:
        accuracy_rows.extend(extra_diagonals)
    if extra_names is not None:
        accuracy_labels.extend(extra_names)
    elif has_extra_rows:
        accuracy_labels.extend(f'Modelo {i+1}' for i in range(len(extra_diagonals)))

    # Note: this changes seaborn's global theme, not just this figure.
    sns.set(font_scale=1.0)

    plt.figure(figsize=(14, 16))

    # Upper panel (4/6 of the height): the normalised confusion matrix.
    ax1 = plt.subplot2grid((6, 1), (0, 0), rowspan=4)
    sns.heatmap(cm_normalized, annot=np.round(cm_normalized, 2), fmt=".2f", cmap='flare',
                xticklabels=labels_sorted, yticklabels=labels_sorted, linewidths=0.5,
                cbar=True, annot_kws={"size": 8}, ax=ax1)
    ax1.set_title(title_matrix)
    ax1.set_xlabel('Previsão')
    ax1.set_ylabel('Real')
    ax1.tick_params(axis='x', rotation=90)
    ax1.tick_params(axis='y', rotation=0)

    # Lower panel (2/6 of the height): one row per set of class accuracies.
    ax2 = plt.subplot2grid((6, 1), (4, 0), rowspan=2)
    sns.heatmap(accuracy_rows, annot=np.round(accuracy_rows, 2), fmt=".2f", cmap='flare',
                xticklabels=labels_sorted, yticklabels=accuracy_labels, cbar=True,
                linewidths=0.5, ax=ax2)
    ax2.tick_params(axis='x', rotation=90)
    ax2.tick_params(axis='y', rotation=0)

    plt.tight_layout()
    plt.show()


In [None]:
# Função auxiliar da train_classification()

def _train_step_classification(classifier_model, dataloader, criterion, optimizer, scheduler, device):
    """Train ``classifier_model`` for one epoch over ``dataloader``.

    Args:
        classifier_model: Model called as ``classifier_model(X)``; its output
            is arg-maxed over dimension 1 to obtain predicted labels.
        dataloader: Iterable of ``(X, y)`` batches.
        criterion: Loss called as ``criterion(y_pred, y)``.
        optimizer: Optimizer stepped once per batch.
        scheduler: Optional LR scheduler, stepped once at the end of the epoch.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy, precision, recall, f1)`` for the epoch.
        Precision, recall and F1 use ``average='binary'``, i.e. they assume a
        two-class problem.
    """
    classifier_model.train()

    total_loss = 0.0
    n_samples = 0
    all_preds = []
    all_targets = []

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        y_pred = classifier_model(X)
        loss = criterion(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so a smaller last batch does not
        # skew the average (assumes the criterion returns a per-batch mean).
        batch_size = X.size(0)
        total_loss += loss.item() * batch_size
        n_samples += batch_size

        y_pred_labels = torch.argmax(y_pred, dim=1)
        all_preds.extend(y_pred_labels.detach().cpu().tolist())
        all_targets.extend(y.detach().cpu().tolist())

    if scheduler is not None:
        scheduler.step()

    # Divide by the samples actually seen, not len(dataloader.dataset): the two
    # differ when the loader uses a sampler with its own num_samples or drop_last.
    mean_loss = total_loss / max(n_samples, 1)
    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average='binary', zero_division=0)
    recall = recall_score(all_targets, all_preds, average='binary', zero_division=0)
    f1 = f1_score(all_targets, all_preds, average='binary', zero_division=0)

    return mean_loss, accuracy, precision, recall, f1

In [None]:
# Função auxiliar da train_classification()

def _test_step_classification(classifier_model, dataloader, criterion, device):
    """Evaluate ``classifier_model`` over ``dataloader`` without updating it.

    Args:
        classifier_model: Model called as ``classifier_model(X)``; its output
            is arg-maxed over dimension 1 to obtain predicted labels.
        dataloader: Iterable of ``(X, y)`` batches.
        criterion: Loss called as ``criterion(y_pred, y)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy, precision, recall, f1)``. Precision,
        recall and F1 use ``average='binary'``, i.e. they assume a two-class
        problem.
    """
    classifier_model.eval()

    total_loss = 0.0
    n_samples = 0
    all_preds = []
    all_targets = []

    with torch.inference_mode():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            y_pred = classifier_model(X)
            loss = criterion(y_pred, y)

            # Weight by batch size (assumes the criterion returns a batch mean).
            batch_size = X.size(0)
            total_loss += loss.item() * batch_size
            n_samples += batch_size

            y_pred_labels = torch.argmax(y_pred, dim=1)
            all_preds.extend(y_pred_labels.cpu().tolist())
            all_targets.extend(y.cpu().tolist())

    # Divide by the samples actually seen, not len(dataloader.dataset): the two
    # differ when the loader uses a sampler with its own num_samples or drop_last.
    mean_loss = total_loss / max(n_samples, 1)
    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average='binary', zero_division=0)
    recall = recall_score(all_targets, all_preds, average='binary', zero_division=0)
    f1 = f1_score(all_targets, all_preds, average='binary', zero_division=0)

    return mean_loss, accuracy, precision, recall, f1

In [None]:
# Treina rede de classificação

def train_classification(classifier_model, train_dataloader, test_dataloader, criterion, epochs, optimizers, schedulers, device='cpu', verbose=True):
    """Train a classifier in one or more consecutive stages and log metrics.

    ``optimizers``, ``schedulers`` and ``epochs`` are walked in lock-step: stage
    ``k`` trains for ``epochs[k]`` epochs with ``optimizers[k]`` and
    ``schedulers[k]``. After every epoch the model is evaluated on
    ``test_dataloader`` and both sets of metrics are recorded.

    Args:
        classifier_model: Model to train.
        train_dataloader: Batches used for training.
        test_dataloader: Batches used for the per-epoch evaluation.
        criterion: Loss function.
        epochs: Number of epochs for each stage.
        optimizers: One optimizer per stage.
        schedulers: One scheduler per stage.
        device: Device passed on to the train/eval helpers.
        verbose: When True, print the metrics after every epoch.

    Returns:
        Dict mapping each metric name (``'Train loss'`` ... ``'Val F1'``) to
        the list of its per-epoch values, across all stages.
    """
    train_keys = ('Train loss', 'Train accuracy', 'Train precision', 'Train recall', 'Train F1')
    val_keys = ('Val loss', 'Val accuracy', 'Val precision', 'Val recall', 'Val F1')
    metrics = {key: [] for key in train_keys + val_keys}

    stages = zip(optimizers, schedulers, epochs)
    for stage_number, (optimizer, scheduler, num_epochs) in enumerate(stages, start=1):
        progress = tqdm(range(1, num_epochs+1), desc=f'Training with optimizer {stage_number}')
        for epoch in progress:
            # Read before the epoch runs, so this is the rate the epoch trains with.
            current_lr = optimizer.param_groups[0]["lr"]

            train_loss, train_accuracy, train_precision, train_recall, train_f1 = _train_step_classification(
                classifier_model, train_dataloader, criterion, optimizer, scheduler, device
            )
            test_loss, test_accuracy, test_precision, test_recall, test_f1 = _test_step_classification(
                classifier_model, test_dataloader, criterion, device
            )

            train_values = (train_loss, train_accuracy, train_precision, train_recall, train_f1)
            val_values = (test_loss, test_accuracy, test_precision, test_recall, test_f1)
            for key, value in zip(train_keys + val_keys, train_values + val_values):
                metrics[key].append(value)

            if not verbose:
                continue

            print(f'\nEPOCH {epoch} | Current learning rate: {current_lr:.8f}\n'
                  f'Train loss: {train_loss:.4f} | Train accuracy: {train_accuracy:.4f} | Train precision: {train_precision:.4f} | Train recall: {train_recall:.4f} | Train F1: {train_f1:.4f}\n'
                  f'Val loss: {test_loss:.4f} | Val accuracy: {test_accuracy:.4f} | Val precision: {test_precision:.4f} | Val recall: {test_recall:.4f} | Val F1: {test_f1:.4f}\n')

    return metrics

In [None]:
# Retorna matriz de confusão

def get_conf_matrix(classifier_model, dataloader, device):
    """Compute the confusion matrix of ``classifier_model`` over ``dataloader``.

    Args:
        classifier_model: Model called as ``classifier_model(X)``; predictions
            are the arg-max of its output over dimension 1.
        dataloader: Iterable of ``(X, y)`` batches.
        device: Device the inputs are moved to for the forward pass.

    Returns:
        The result of ``confusion_matrix(targets, predictions)``. No explicit
        ``labels`` are passed, so the matrix only covers classes that occur in
        the targets or the predictions.
    """
    classifier_model.eval()

    all_preds = []
    all_targets = []

    with torch.inference_mode():
        for X, y in dataloader:
            # Only the inputs need to be on the device; targets are just collected.
            y_pred = classifier_model(X.to(device)).argmax(dim=1)

            # Store plain Python ints rather than 0-d tensors for sklearn.
            all_preds.extend(y_pred.cpu().tolist())
            all_targets.extend(y.tolist())

    conf_matrix = confusion_matrix(all_targets, all_preds)

    return conf_matrix

In [None]:
#Wrapper pra receber o subset do dataset e aplicar o transform

from torch.utils.data import Dataset

# Wrapper que aplica transform no subset
class TransformedDataset(Dataset):
    """Dataset view that applies a transform to the samples of another dataset.

    Lets each split produced from one shared dataset (e.g. by ``random_split``)
    have its own transform.

    Args:
        subset: Indexable object with a length whose items are
            ``(sample, label)`` pairs.
        transform: Optional callable applied to the sample only; the label is
            returned unchanged.
    """

    def __init__(self, subset, transform=None):
        self.subset, self.transform = subset, transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, idx):
        sample, target = self.subset[idx]
        if not self.transform:
            # No (truthy) transform configured: hand the sample back untouched.
            return sample, target
        return self.transform(sample), target

# Dataset

In [None]:
# Training-time preprocessing: centre crop, random geometric augmentation,
# then conversion to a tensor.
# NOTE(review): Normalize is commented out in all three pipelines, so the model
# receives the raw ToTensor output. The timm checkpoint used later is
# pretrained and presumably expects ImageNet normalisation -- confirm that
# disabling it is intentional.
train_transform = transforms.Compose([
    transforms.CenterCrop(224),              # crop the central 224x224 region
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),        # both flips are applied independently
    # Rotation of up to 30 degrees and shear of up to 5 degrees; translation
    # and scaling are explicitly disabled.
    transforms.RandomAffine(degrees=30, translate=(0.00, 0.00), scale=(1.00, 1.00), shear=5),
    transforms.ToTensor(),
    #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation preprocessing: deterministic (no augmentation).
val_transform = transforms.Compose([
    transforms.CenterCrop(224), # same crop as in training
    transforms.ToTensor(),
    #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Test preprocessing: currently identical to the validation pipeline.
test_transform = transforms.Compose([
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
# Folder holding the three training folds; defined once so that moving the
# data only requires editing a single line.
TRAINING_DATA_DIR = "/kaggle/input/c-nmc-2019-dataset/C-NMC 2019 (PKG)/C-NMC_training_data"

# No transform is attached here: the images stay as loaded and each split gets
# its own transform later on.
fold_0 = ImageFolder(f"{TRAINING_DATA_DIR}/fold_0")
fold_1 = ImageFolder(f"{TRAINING_DATA_DIR}/fold_1")
fold_2 = ImageFolder(f"{TRAINING_DATA_DIR}/fold_2")

# ImageFolder derives label indices from each folder's own sub-directories.
# If the folds disagreed, the concatenated dataset would silently mix label
# meanings, and class names are later taken from fold_0 only.
assert fold_0.class_to_idx == fold_1.class_to_idx == fold_2.class_to_idx, (
    "The folds do not share the same class -> index mapping"
)

# Single dataset containing every sample of the three folds, in fold order.
dataset = ConcatDataset([fold_0, fold_1, fold_2])

In [None]:
# Dedicated generator so the split below is the same on every run.
# NOTE(review): only this generator is seeded; numpy, random and torch's global
# RNG are not, so augmentations and random previews will vary between runs.
seed = 42
g = torch.Generator().manual_seed(seed)

# Split sizes: 10% test, 20% validation, and the remainder (about 70%) for
# training. int() truncates, so any rounding leftover goes to the training set.
n_total = len(dataset)
n_test = int(0.10 * n_total)
n_val  = int(0.20 * n_total)
n_train = n_total - n_val - n_test

# Random, sample-level split of the concatenated folds.
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [n_train, n_val, n_test], generator=g
)

print(f"Train: {len(train_dataset)} | Val: {len(val_dataset)} | Test: {len(test_dataset)}")

In [None]:
# Attach the split-specific transform to each subset.
# NOTE(review): the names are rebound in place, so this cell is not idempotent:
# running it a second time wraps the already-wrapped datasets and applies the
# transforms twice. Re-run the split cell first if this cell must be repeated.
train_dataset = TransformedDataset(train_dataset, transform=train_transform)
val_dataset   = TransformedDataset(val_dataset,   transform=val_transform)
test_dataset  = TransformedDataset(test_dataset,  transform=test_transform)

In [None]:
# Conta quantos exemplos existem de cada classe
# Number of samples the balanced sampler draws per pass over it.
NUM_SAMPLES_PER_EPOCH = 30000

# Collect the label of every training sample.
# Fast path: read the labels from the ImageFolder metadata through the split
# indices, instead of loading and augmenting every image just to get its label.
_train_subset = getattr(train_dataset, "subset", None)
if hasattr(_train_subset, "indices") and getattr(_train_subset, "dataset", None) is dataset:
    # ConcatDataset indexes its members back to back, in order.
    _all_targets = np.concatenate([np.asarray(fold.targets) for fold in dataset.datasets])
    targets = _all_targets[_train_subset.indices]
else:
    # Fallback for any other dataset layout: iterate over the samples.
    targets = np.asarray([y for _, y in train_dataset])

class_sample_counts = np.bincount(targets)  # [count_class0, count_class1]
weights = 1. / class_sample_counts          # inverse class frequency

# One weight per training sample, so that each class is drawn equally often.
sample_weights = weights[targets].tolist()

# Balanced sampler
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=NUM_SAMPLES_PER_EPOCH,   # total number of draws
    replacement=True                     # samples may repeat, which is what allows balancing
)

In [None]:
# Preview a few augmented training samples.
plot_random_images(train_dataset) # original TODO: undo the normalisation before plotting so images display normally (NOTE(review): Normalize is currently commented out in the transforms, so this looks stale)

# Model

In [None]:
class SwinTransformerBase(nn.Module):
    """Two-class classifier wrapping timm's ``swin_base_patch4_window7_224``.

    The backbone is created with ``pretrained=True`` and ``num_classes=2``.

    Args:
        drop_path_rate: Forwarded to ``timm.create_model`` as ``drop_path_rate``.
        drop_rate: Forwarded to ``timm.create_model`` as ``drop_rate``.
        attn_drop_rate: Forwarded to ``timm.create_model`` as ``attn_drop_rate``.
    """

    def __init__(self, drop_path_rate=0, drop_rate=0, attn_drop_rate=0):
        super().__init__()
        regularisation = {
            'drop_path_rate': drop_path_rate,
            'drop_rate': drop_rate,
            'attn_drop_rate': attn_drop_rate,
        }
        self.model = timm.create_model(
            'swin_base_patch4_window7_224',
            pretrained=True,
            num_classes=2,
            **regularisation,
        )

    def forward(self, x):
        """Return the wrapped model's output for the batch ``x``."""
        output = self.model(x)
        return output

# Training

In [None]:
# Dispositivo (CPU ou GPU)

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Bare expression so the notebook displays the chosen device.
device

In [None]:
# Settings shared by the three loaders.
loader_kwargs = dict(
    batch_size=32,
    num_workers=4,
    prefetch_factor=4,
    pin_memory=True,
    persistent_workers=True,
)

# Training data is reshuffled every epoch; without it the model would see the
# exact same batches, in the same order, in every epoch.
# NOTE(review): the balanced `sampler` built earlier in the notebook is not
# passed to any loader. To use it, replace `shuffle=True` with
# `sampler=sampler` (the two options are mutually exclusive) and check that the
# per-epoch loss averaging accounts for the sampler's num_samples.
train_dataloader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)

# Evaluation loaders keep a fixed order.
val_dataloader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
test_dataloader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

## Classification

In [None]:
# NOTE(review): CosineAnnealingLR is imported but not used in this cell; the
# scheduler actually created below is StepLR.
from torch.optim.lr_scheduler import CosineAnnealingLR

# Model with stochastic depth (drop_path) and dropout enabled, attention
# dropout disabled.
classifier_model = SwinTransformerBase(drop_path_rate=0.2, drop_rate=0.1, attn_drop_rate=0.0).to(device)
criterion = nn.CrossEntropyLoss().to(device)

optimizer = optim.AdamW(classifier_model.parameters(), lr=1e-4, weight_decay=5e-2) # all parameters are trained


# Positional arguments are step_size=1 and gamma=0.9: the learning rate is
# multiplied by 0.9 on every scheduler step.
scheduler = optim.lr_scheduler.StepLR(optimizer, 1, 0.9)

# train_classification() takes parallel lists, one entry per training stage;
# here there is a single stage of 50 epochs.
optimizers = [optimizer]
schedulers = [scheduler]
epochs = [50]

In [None]:
#plot_results(results)

In [None]:
# Baixando parâmetros do drive (se necessário)
#gdown.download('https://drive.google.com/file/d/1xowiBw8I_GhCwx2sTwM7ttyqrGkawpc7/view?usp=drive_link', 'parameters.pth', fuzzy=True, quiet=False)

In [None]:
# Carregando parâmetros
#classifier_model.load_state_dict(torch.load('/kaggle/working/parameters.pth'))

In [None]:
# Training. The validation loader is passed as the `test_dataloader` argument,
# so the "Val ..." metrics recorded per epoch come from the validation split.
metrics = train_classification(classifier_model, train_dataloader, val_dataloader, criterion, epochs, optimizers, schedulers, device)

In [None]:
# --- Final evaluation on the test set ---

print("Avaliando o modelo final no conjunto de teste...")

# Reuse the evaluation helper on the held-out test split.
test_loss, test_accuracy, test_precision, test_recall, test_f1 = _test_step_classification(
    classifier_model,
    test_dataloader,
    criterion,
    device
)

print("\n--- Resultados Finais no Conjunto de Teste ---")
print(f'Test loss: {test_loss:.4f}')
print(f'Test accuracy: {test_accuracy:.4f}')
print(f'Test precision: {test_precision:.4f}')
print(f'Test recall: {test_recall:.4f}')
print(f'Test F1: {test_f1:.4f}')


# This runs a second full pass over the test set to collect the predictions.
print("\nGerando a Matriz de Confusão...")
conf_matrix = get_conf_matrix(classifier_model, test_dataloader, device)

class_names = fold_0.classes # class names are taken from the first fold only

plot_confusion_matrix_with_diagonal(conf_matrix, labels=class_names, title_matrix='Matriz de Confusão (Conjunto de Teste)')

In [None]:
# Save only the model weights (state dict) to the current working directory.
torch.save(classifier_model.state_dict(), 'parameters.pth')