# Imports:

In [2]:
import os
import cv2
from tqdm import tqdm
import numpy as np
import torch
import torchvision
from torch import nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torch.optim import lr_scheduler
from torchvision import transforms
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from PIL import Image
import pywt

# True when PyTorch can see a CUDA device; the cells below use this flag to
# decide whether the model and the batches are moved to the GPU.
cuda = torch.cuda.is_available()

In [3]:
class BlurDataset(Dataset):
    """Dataset of grayscale images read lazily from disk.

    Args:
        images: sequence of image file paths.
        labels: sequence of labels, aligned index-by-index with ``images``.
        train: flag stored on the instance; it is not used by this class itself.
        transform: optional callable applied to the grayscale ``numpy`` image
            returned by ``cv2.imread``. When omitted, the raw image is
            converted to a ``float32`` tensor.

    Attributes:
        classes: sorted list of the distinct labels (deterministic order).

    Raises:
        ValueError: if ``images`` and ``labels`` differ in length.
    """

    def __init__(self, images, labels, train=True, transform=None):
        if len(images) != len(labels):
            raise ValueError(
                "images and labels must have the same length "
                "({} != {})".format(len(images), len(labels))
            )

        self.train = train
        self.labels = labels
        self.images = images
        # sorted() keeps the class order stable between runs; a bare set has
        # an arbitrary iteration order.
        self.classes = sorted(set(labels))
        self.transform = transform

    def __getitem__(self, index: int):
        """Return ``(tensor, label)``; the tensor gets a leading channel axis."""
        img_path = self.images[index]
        label = self.labels[index]

        original = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if original is None:
            # cv2.imread signals a missing/unreadable file by returning None.
            raise FileNotFoundError("Could not read image: {!r}".format(img_path))

        sample = original if self.transform is None else self.transform(original)
        if not torch.is_tensor(sample):
            sample = torch.as_tensor(np.asarray(sample), dtype=torch.float32)
        return sample.unsqueeze(0), label

    def __len__(self):
        return len(self.images)

# Transformada de Fourier:

In [4]:
def aplicar_janela_hann(imagem):
    """Multiply an image by a separable 2-D Hann window.

    The window is the outer product of a Hann window over the rows and one
    over the columns. Images with extra trailing axes (e.g. ``(H, W, C)``)
    are supported: the same window is applied to every channel.

    Args:
        imagem: array whose first two axes are (rows, columns).

    Returns:
        The windowed image, same shape as the input.
    """
    imagem = np.asarray(imagem)
    linhas, colunas = imagem.shape[:2]
    janela_2d = np.outer(np.hanning(linhas), np.hanning(colunas))
    if imagem.ndim > 2:
        # Add singleton axes so the window broadcasts over the channel axes.
        janela_2d = janela_2d.reshape(janela_2d.shape + (1,) * (imagem.ndim - 2))
    return imagem * janela_2d

def aplicar_transformada_fourier(imagem):
    """Return the log-magnitude Fourier spectrum of an image as a 120x120 array.

    Steps: resize to 224x224, apply the Hann window, take the 2-D FFT, shift
    the zero-frequency component to the centre, compute ``log(|F| + 1)`` and
    resize the result to 120x120.
    """
    janelada = aplicar_janela_hann(cv2.resize(imagem, dsize=(224, 224)))
    # fftshift moves the low-frequency component to the centre of the spectrum.
    espectro_centrado = np.fft.fftshift(np.fft.fft2(janelada))
    # The +1 keeps the logarithm finite where the magnitude is zero.
    log_magnitude = np.log(np.abs(espectro_centrado) + 1)
    return cv2.resize(log_magnitude, dsize=(120, 120))

# Transformada de Wavelets:

In [5]:
def aplicar_transformada_haar_merged(imagem):
    """Return the merged level-2 Haar detail coefficients as a 120x120 array.

    A two-level 2-D Haar decomposition is computed; the level-2 horizontal
    and vertical detail bands are added with ``cv2.add`` and the sum is
    resized to 120x120. The approximation band, the diagonal band and all
    level-1 bands are not used.
    """
    coeffs = pywt.wavedec2(imagem, 'haar', level=2)
    # wavedec2 returns [cA2, (cH2, cV2, cD2), (cH1, cV1, cD1)].
    # The previous version also ran waverec2 on every image and threw the
    # reconstruction away; that dead work has been removed.
    cH2, cV2, _ = coeffs[1]
    merged = cv2.add(cH2, cV2)
    return cv2.resize(merged, dsize=(120, 120))

---
# Classifier:

In [7]:
class CNN(nn.Module):
    """Small convolutional classifier: four 3x3 conv layers and three linear layers.

    The convolutional part halves the spatial size three times (three
    ``MaxPool2d(2, 2)`` layers) and ends with 128 channels, so the flattened
    feature size is ``128 * (H // 8) * (W // 8)``. With the defaults this is
    28800, i.e. the layer layout and state-dict keys of the original
    hard-coded model are preserved.

    Args:
        in_channels: number of channels of the input images (default 1).
        num_classes: number of output logits (default 3).
        input_size: ``(height, width)`` of the input images (default 120x120).

    Raises:
        ValueError: if ``input_size`` is too small to survive the three poolings.

    Note:
        The head contains ``BatchNorm1d`` layers, which PyTorch refuses to run
        in training mode on a batch holding a single sample.
    """

    def __init__(self, in_channels=1, num_classes=3, input_size=(120, 120)):
        super().__init__()
        self.convnet = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3, padding=1), nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Flatten()
        )

        height, width = input_size
        # padding=1 convs keep the size; each pooling floor-divides it by 2.
        flat_features = 128 * (height // 8) * (width // 8)
        if flat_features <= 0:
            raise ValueError(
                "input_size {} is too small for three 2x2 poolings".format(tuple(input_size))
            )

        self.fc = nn.Sequential(
            nn.Linear(flat_features, 1024),
            nn.ReLU(),
            nn.Dropout(),
            nn.BatchNorm1d(1024),

            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(),
            nn.BatchNorm1d(512),

            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Map a batch ``(N, in_channels, H, W)`` to logits ``(N, num_classes)``."""
        return self.fc(self.convnet(x))

    def get_embedding(self, x):
        """Return the same output as :meth:`forward`."""
        return self.forward(x)

In [8]:
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, RocCurveDisplay


def get_metrics(y_targets, y_preds, y_probs=None):
    """Compute accuracy, macro precision, macro recall and, optionally, ROC-AUC.

    Args:
        y_targets: sequence of true class ids (0-d tensors, NumPy scalars or ints).
        y_preds: sequence of predicted class ids, aligned with ``y_targets``.
        y_probs: optional per-sample class scores, one row per sample and one
            column per class (list of 1-D tensors/arrays or a 2-D array).

    Returns:
        ``(acc, prec, rec, auc)``. ``auc`` is the micro-averaged ROC-AUC
        computed against the one-hot encoded targets, or ``None`` when
        ``y_probs`` is not given.
    """
    acc = accuracy_score(y_targets, y_preds)
    prec = precision_score(y_targets, y_preds, average='macro', zero_division=0.0)
    rec = recall_score(y_targets, y_preds, average='macro', zero_division=0.0)
    # Identity check: ``y_probs == None`` is element-wise (and ambiguous) for arrays.
    if y_probs is None:
        return acc, prec, rec, None

    scores = np.asarray([np.asarray(row) for row in y_probs])
    targets = np.asarray([int(tg) for tg in y_targets])
    # The number of classes comes from the score matrix instead of being fixed at 3.
    n_classes = scores.shape[1]
    y_onehot_test = (targets[:, np.newaxis] == np.arange(n_classes)).astype(int)
    micro_roc_auc_ovr = roc_auc_score(
        y_onehot_test,
        scores,
        multi_class="ovr",
        average="micro",
    )
    return acc, prec, rec, micro_roc_auc_ovr


def fit(train_loader, val_loader, model, loss_fn, optimizer, scheduler, n_epochs, cuda, log_interval, start_epoch=0):
    """Train ``model`` for epochs ``start_epoch .. n_epochs - 1`` and record metrics.

    Each epoch runs ``train_epoch``, optionally evaluates on ``val_loader``
    with ``test_epoch``, prints a summary and steps the scheduler.

    Args:
        train_loader: iterable of training batches.
        val_loader: iterable of validation batches, or ``None`` to skip validation.
        model, loss_fn, optimizer: forwarded to ``train_epoch`` / ``test_epoch``.
        scheduler: object whose ``step()`` is called once per epoch.
        n_epochs: exclusive upper bound of the epoch range.
        cuda: forwarded to ``train_epoch`` / ``test_epoch``.
        log_interval: forwarded unchanged to ``train_epoch``.
        start_epoch: first (0-based) epoch index.

    Returns:
        dict with one list per metric (``train_*`` and ``val_*`` keys); the
        ``val_*`` lists stay empty when ``val_loader`` is ``None``.
    """
    metric_names = ('loss', 'acc', 'prec', 'rec')
    history = {'{}_{}'.format(split, name): []
               for split in ('train', 'val') for name in metric_names}

    for epoch in range(start_epoch, n_epochs):
        # 1-based, so the progress-bar label agrees with the printed summary.
        epoch_str = "[{} / {} epochs]".format(epoch + 1, n_epochs)

        train_loss, y_targets, y_preds = train_epoch(train_loader, model, loss_fn, optimizer, cuda, log_interval, epoch_str)
        train_acc, train_prec, train_rec, _ = get_metrics(y_targets, y_preds)
        for name, value in zip(metric_names, (train_loss, train_acc, train_prec, train_rec)):
            history['train_' + name].append(value)
        message = 'Epoch: {}/{}. train - avgloss: {:.4f} | acc: {:.4f} | precision: {:.4f} | recall: {:.4f} '.format(
            epoch + 1, n_epochs, train_loss, train_acc, train_prec, train_rec)

        if val_loader is not None:
            val_loss, y_targets, y_preds, _ = test_epoch(val_loader, model, loss_fn, cuda)
            val_acc, val_prec, val_rec, _ = get_metrics(y_targets, y_preds)
            for name, value in zip(metric_names, (val_loss, val_acc, val_prec, val_rec)):
                history['val_' + name].append(value)
            message += '\nEpoch: {}/{}. valid - avgloss: {:.4f} | acc: {:.4f} | precision: {:.4f} | recall: {:.4f} '.format(
                epoch + 1, n_epochs, val_loss, val_acc, val_prec, val_rec)

        print(message)
        scheduler.step()
    return history

         
def train_epoch(train_loader, model, loss_fn, optimizer, cuda, log_interval, epoch_str):
    """Run one optimisation pass over ``train_loader``.

    Args:
        train_loader: iterable yielding ``(data, target)`` batches. ``data``
            may be a tensor or a tuple/list of tensors; an empty ``target``
            is treated as "no labels".
        model: module called as ``model(*data)``.
        loss_fn: called with the model outputs followed by the target (if any).
        optimizer: optimiser stepped once per batch.
        cuda: when true, data and targets are moved to the GPU.
        log_interval: accepted for interface compatibility; not used here.
        epoch_str: prefix shown in the progress-bar description.

    Returns:
        ``(mean_batch_loss, y_targets, y_preds)`` where the two lists hold one
        0-d tensor per sample. ``y_targets`` only receives entries for batches
        that carry a target.

    Raises:
        ValueError: if ``train_loader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    n_batches = 0

    y_preds = []
    y_targets = []

    pbar = tqdm(train_loader, desc="{} Training - avg_loss: ".format(epoch_str))
    for data, target in pbar:
        target = target if len(target) > 0 else None
        if not isinstance(data, (tuple, list)):
            data = (data,)
        if cuda:
            data = tuple(d.cuda() for d in data)
            if target is not None:
                target = target.cuda()

        optimizer.zero_grad()
        outputs = model(*data)
        if not isinstance(outputs, (tuple, list)):
            outputs = (outputs,)

        # Fresh tuple: a list-valued model output must not be extended in place.
        loss_inputs = tuple(outputs)
        if target is not None:
            loss_inputs += (target,)

        loss_outputs = loss_fn(*loss_inputs)
        loss = loss_outputs[0] if isinstance(loss_outputs, (tuple, list)) else loss_outputs
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        n_batches += 1

        _, predicted = outputs[0].max(1)
        y_preds.extend(predicted.cpu())
        # Unlabelled batches used to crash here on ``target[0]``.
        if target is not None:
            y_targets.extend(target.cpu())

        # Running mean from the accumulated total instead of re-averaging a list.
        pbar.set_description("{} Training - avg_loss: {:.4f}".format(epoch_str, total_loss / n_batches))

    if n_batches == 0:
        raise ValueError("train_loader yielded no batches")
    return total_loss / n_batches, y_targets, y_preds


def test_epoch(val_loader, model, loss_fn, cuda):
    with torch.no_grad():
        model.eval()
        val_loss = 0
        
        y_probs = []
        y_preds = []
        y_targets = []
        
        for batch_idx, (data, target) in enumerate(val_loader):
            target = target if len(target) > 0 else None
            if not type(data) in (tuple, list):
                data = (data,)
            if cuda:
                data = tuple(d.cuda() for d in data)
                if target is not None:
                    target = target.cuda()

            outputs = model(*data)

            if type(outputs) not in (tuple, list):
                outputs = (outputs,)
            loss_inputs = outputs
            if target is not None:
                target = (target,)
                loss_inputs += target

            loss_outputs = loss_fn(*loss_inputs)
            loss = loss_outputs[0] if type(loss_outputs) in (tuple, list) else loss_outputs
            val_loss += loss.item()
            
            _, predicted = outputs[0].max(1)
            y_probs += outputs[0].cpu()
            y_preds += predicted.cpu()
            y_targets += target[0].cpu()
                
    val_loss /= len(val_loader)
    return val_loss, y_targets, y_preds, y_probs

---
# Simple Training:
### Prepare Dataset:

In [None]:
# Dataset root directory. The next cell lists its entries as class names and
# the files inside each entry as images.
# NOTE(review): this looks like a placeholder path — set it before running.
root = "path/to/dataset/"

In [None]:
# Mini-batch size used by the train/test/validation DataLoaders built below.
BATCH_SIZE = 32

def toTensor(image):
    """Copy ``image`` into a new ``float32`` torch tensor."""
    as_float32 = torch.tensor(image, dtype=torch.float32)
    return as_float32

# Pre-processing applied to every grayscale image before it reaches the model.
transform = transforms.Compose([
    aplicar_transformada_haar_merged,  # alternative: aplicar_transformada_fourier
    toTensor,
])

# One sub-directory of `root` per class. Entries are sorted because
# os.listdir returns them in arbitrary order, which would make the seeded
# splits below differ between machines; non-directories (stray files in
# `root`) are skipped instead of breaking the inner listdir.
CLASSES = sorted(
    entry for entry in os.listdir(root)
    if os.path.isdir(os.path.join(root, entry))
)
anns = [
    (os.path.join(root, classe, img), classe)
    for classe in CLASSES
    for img in sorted(os.listdir(os.path.join(root, classe)))
]
X = [ann[0] for ann in anns]
y = [ann[1] for ann in anns]

# 85/15 train+val/test split, then 85/15 train/val split of the remainder.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.15, random_state=42)

from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()

# The encoder is fitted on the training labels only and reused for the others.
y_train = le.fit_transform(y_train)
y_test = le.transform(y_test)
y_val = le.transform(y_val)

kwargs = {'num_workers': 1, 'pin_memory': True} if cuda else {}
train_dataset = BlurDataset(X_train, y_train, True, transform)
test_dataset = BlurDataset(X_test, y_test, False, transform)
val_dataset = BlurDataset(X_val, y_val, False, transform)
print(len(X_train), len(X_test), len(X_val), le.classes_)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, **kwargs)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, **kwargs)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, **kwargs)

In [None]:
# Show sample:
import random
# randrange excludes the upper bound; randint(0, len) could return len itself
# and raise IndexError.
r1 = random.randrange(len(train_dataset))
img, label = train_dataset[r1]
print(img.shape, label)

plt.subplot(121)
plt.imshow(img.squeeze(), cmap='gray')
plt.title('Input')
plt.xticks([])
plt.yticks([]);

### Train Model:

In [11]:
# Hyper-parameters of the simple (single split) training run.
EPOCHS = 30  # upper bound of the epoch range passed to fit()
LR = 1e-3  # learning rate handed to the Adam optimiser
LOSS_FN = nn.CrossEntropyLoss()  # loss used by both fit() and test_epoch()

In [None]:
# Train:
model = CNN()
if cuda:
    model = model.cuda()  # .cuda() returns the same module

optimizer = optim.Adam(model.parameters(), lr=LR)
# Multiply the learning rate by 0.1 every 8 scheduler steps (one step per epoch).
scheduler = lr_scheduler.StepLR(optimizer, step_size=8, gamma=0.1, last_epoch=-1)
history = fit(
    train_loader, val_loader, model, LOSS_FN, optimizer, scheduler,
    n_epochs=EPOCHS, cuda=cuda, log_interval=0,
)

In [None]:
# Save Model:
# Only the parameters (state dict) are stored, not the module object itself.
save_path = './model.pth'
torch.save(model.state_dict(), save_path)

### Test Model:

In [None]:
# Evaluate on the held-out test split and report the metrics, including AUC.
test_loss, y_targets, y_preds, y_probs = test_epoch(test_loader, model, LOSS_FN, cuda)
test_metrics = get_metrics(y_targets, y_preds, y_probs)
test_acc, test_prec, test_rec, test_auc = test_metrics
message = 'Test set - acc: {:.4f} | precision: {:.4f} | recall: {:.4f} | AUC: {:.4f}'.format(*test_metrics)
print(message)

---
# Cross-Validation:

### Prepare Dataset:

In [None]:
# Dataset root for the cross-validation section.
# NOTE(review): the next cell assigns `root` again with a concrete path, so
# this value is overridden before it is used — confirm which one is intended.
root = "path/to/dataset/"

In [9]:
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelEncoder

def toTensor(image):
    """Copy ``image`` into a new ``float32`` torch tensor."""
    as_float32 = torch.tensor(image, dtype=torch.float32)
    return as_float32

# Pre-processing applied to every grayscale image before it reaches the model.
transform = transforms.Compose([
    aplicar_transformada_fourier,  # alternative: aplicar_transformada_haar_merged
    toTensor,
])

# NOTE(review): this replaces the `root` assigned in the previous cell —
# keep a single definition once the intended dataset path is confirmed.
root = "./blur_datasets/mixed_blur_dataset_2/"

# One sub-directory of `root` per class. Entries are sorted because
# os.listdir returns them in arbitrary order, which would make the sample
# indices (and therefore the folds) differ between machines; stray files in
# `root` are skipped instead of breaking the inner listdir.
CLASSES = sorted(
    entry for entry in os.listdir(root)
    if os.path.isdir(os.path.join(root, entry))
)
anns = [
    (os.path.join(root, classe, img), classe)
    for classe in CLASSES
    for img in sorted(os.listdir(os.path.join(root, classe)))
]
X = [ann[0] for ann in anns]
y = [ann[1] for ann in anns]

le = LabelEncoder()
y_encoded = le.fit_transform(y)
all_data = BlurDataset(X, y_encoded, True, transform)

In [None]:
# Show sample:
import random
# randrange excludes the upper bound; randint(0, len) could return len itself
# and raise IndexError.
r1 = random.randrange(len(all_data))
img, label = all_data[r1]
print(img.shape, label)

plt.subplot(121)
plt.imshow(img.squeeze(), cmap='gray')
plt.title('Input')
plt.xticks([])
plt.yticks([]);

### Start CV process:

In [11]:
# Hyper-parameters of the cross-validation run.
LR = 1e-3
EPOCHS = 30
K_FOLDS = 10
BATCH_SIZE = 32
LOSS_FN = nn.CrossEntropyLoss()
results = {}  # fold index -> metrics, scores and training history

# A fixed seed makes the fold assignment reproducible between runs.
kfold = KFold(n_splits=K_FOLDS, shuffle=True, random_state=42)
# Materialised as a list: kfold.split returns a one-shot generator, so the
# fold loop would silently do nothing if its cell were executed a second time.
split_folds = list(kfold.split(np.arange(len(all_data))))

In [None]:
def reset_weights(m):
    """Re-initialise every direct child of ``m`` that defines ``reset_parameters``.

    Each reset is announced with a printed line. Children without a
    ``reset_parameters`` method are left untouched.
    """
    resettable = (child for child in m.children() if hasattr(child, 'reset_parameters'))
    for child in resettable:
        print(f'Reset trainable parameters of layer = {child}')
        child.reset_parameters()

# Train a fresh model on each fold's training indices and evaluate it on the
# held-out indices; per-fold metrics and scores are collected in `results`.
for fold, (train_ids, test_ids) in enumerate(split_folds):
    print(f'FOLD {fold+1}')
    print('---------------------------------------------------------------------------')
    # Both loaders wrap the full dataset; the samplers restrict each one to
    # the indices of this fold.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)

    train_loader = torch.utils.data.DataLoader(
                      all_data,
                      batch_size=BATCH_SIZE, sampler=train_subsampler)
    test_loader = torch.utils.data.DataLoader(
                      all_data,
                      batch_size=BATCH_SIZE, sampler=test_subsampler)

    # NOTE(review): the model is newly constructed for every fold, so the
    # explicit reset below is presumably redundant — confirm before removing.
    model = CNN()
    model.apply(reset_weights)
    if cuda:
        model.cuda()
    optimizer = optim.Adam(model.parameters(), lr=LR)
    scheduler = lr_scheduler.StepLR(optimizer, 8, gamma=0.1, last_epoch=-1)
    # val_loader is None: fit() skips the per-epoch validation pass.
    history = fit(train_loader, None, model, LOSS_FN, optimizer, scheduler, EPOCHS, cuda, 0)

    # print('Training process has finished. Saving trained model.')
    # save_path = f'./blur-mix-fourier-fold-{fold}.pth'
    # torch.save(model.state_dict(), save_path)
    # Load models:
    # model = CNN().to("cuda")
    # model.load_state_dict(torch.load(f'./blur-coco-model-fold-{fold}.pth', map_location=torch.device('cuda')))

    print('Starting testing...')
    test_loss, y_targets, y_preds, y_probs = test_epoch(test_loader, model, LOSS_FN, cuda)
    test_acc, test_prec, test_rec, test_auc = get_metrics(y_targets, y_preds, y_probs)
    # Scores and one-hot targets are kept so the ROC curves can be drawn later.
    # The one-hot encoding below is hard-coded for exactly three classes (0, 1, 2).
    y_probs = np.array(y_probs)
    y_onehot_test = np.array([[int(tg.item()==0), int(tg.item()==1), int(tg.item()==2)] for tg in y_targets])
    results[fold] = {
        "acc": test_acc,
        "prec": test_prec,
        "rec": test_rec,
        "auc": test_auc,
        "probs": y_probs,
        "y_onehot": y_onehot_test,
        "history": history,
    }

    message = 'Test set - acc: {:.4f} | precision: {:.4f} | recall: {:.4f} | auc: {:.4f}'.format(
        test_acc, test_prec, test_rec, test_auc)
    print(message)
    print('---------------------------------------------------------------------------')

### Show Results:

In [None]:
# Print fold results
print(f'K-FOLD CROSS VALIDATION RESULTS FOR {K_FOLDS} FOLDS')
print('-------------------------------------------')

fig, ax = plt.subplots(figsize=(6, 6))

avg_acc = 0.0
avg_prec = 0.0
avg_rec = 0.0
avg_auc = 0.0

colors = ["darkgrey", "red", "peru", "darkorange", "gold",
          "yellowgreen", "turquoise", "blue", "violet", "hotpink"]

# Iterate over the folds that actually finished, so a partial run can still
# be inspected instead of failing with KeyError.
for fold in sorted(results):
    fold_result = results[fold]
    message = 'Test set - acc: {:.4f} | precision: {:.4f} | recall: {:.4f} | auc: {:.4f}'.format(
        fold_result["acc"], fold_result["prec"], fold_result["rec"], fold_result["auc"])
    print(f'Fold {fold+1}-', message)
    avg_acc += fold_result["acc"]
    avg_prec += fold_result["prec"]
    avg_rec += fold_result["rec"]
    avg_auc += fold_result["auc"]

    # Flattening the one-hot targets and the scores yields the micro-averaged
    # one-vs-rest curve for this fold.
    RocCurveDisplay.from_predictions(
        fold_result["y_onehot"].ravel(),
        fold_result["probs"].ravel(),
        name=f'micro-average OvR: Fold-{fold+1}',
        # Wrap around the palette so more than 10 folds do not raise IndexError.
        color=colors[fold % len(colors)],
        ax=ax
    )

# max(..., 1) avoids a ZeroDivisionError when no fold has been evaluated yet.
n_folds = max(len(results), 1)
print('\n\nAverage Acc: {:.2f} %'.format(100*avg_acc/n_folds))
print('Average Prec: {:.2f} %'.format(100*avg_prec/n_folds))
print('Average Rec: {:.2f} %'.format(100*avg_rec/n_folds))
print('Average AUC: {:.2f} %'.format(100*avg_auc/n_folds))

_ = ax.set(
    xlabel="False Positive Rate",
    ylabel="True Positive Rate",
    title="Micro-averaged One-vs-Rest\nReceiver Operating Characteristic",
)