In [None]:
import os
import random
import time
import datetime
import copy
import itertools

import numpy as np
import matplotlib.pyplot as plt

from sklearn import metrics
from sklearn.model_selection import KFold

import torch
from torch import nn, optim

from torchvision import transforms, models, datasets

In [None]:
def initialize_model(model_name, num_classes):
    # Initialize these variables which will be set in this if statement. Each of these
    #   variables is model specific.
    model_ft = None
    input_size = 0

    if model_name == "resnet":
        """ Resnet18
        """
        model_ft = models.resnet18(weights='DEFAULT')
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "alexnet":
        """ Alexnet
        """
        model_ft = models.alexnet(weights='DEFAULT')
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "vgg":
        """ VGG11_bn
        """
        model_ft = models.vgg11_bn(weights='DEFAULT')
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "squeezenet":
        """ Squeezenet
        """
        model_ft = models.squeezenet1_0(weights='DEFAULT')
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
        model_ft.num_classes = num_classes
        input_size = 224

    elif model_name == "densenet":
        """ Densenet
        """
        model_ft = models.densenet121(weights='DEFAULT')
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "inception":
        """ Inception v3
        Be careful, expects (299,299) sized images and has auxiliary output
        """
        model_ft = models.inception_v3(weights='DEFAULT')
        # Handle the auxilary net
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        # Handle the primary net
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs,num_classes)
        input_size = 299

    else:
        print("Invalid model name, exiting...")
        exit()

    return model_ft, input_size

In [None]:
def evaluate_model(model, dataloader, device):
    y_true = []
    y_pred = []
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            y_true += labels.tolist()
            y_pred += predicted.tolist()

    return y_true, y_pred

In [None]:
def plot_confusion_matrix(cm, classes, title='Confusion matrix', cmap=plt.cm.Blues):
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
def plot_loss_accuracy(train_losses, val_losses, train_accs, val_accs, model_name, fold, save_dir):
    """
    Plota gráfico de loss e accuracy por epoch para conjunto de treinamento e validação
    
    Args:
        train_losses (list): Lista com os valores de loss do conjunto de treinamento por epoch
        val_losses (list): Lista com os valores de loss do conjunto de validação por epoch
        train_accs (list): Lista com os valores de accuracy do conjunto de treinamento por epoch
        val_accs (list): Lista com os valores de accuracy do conjunto de validação por epoch
    
    Returns:
        None
    """
    # Define o número de epochs
    epochs = len(train_losses)
    
    # Define o eixo x do gráfico como o número de epochs
    x = range(1, epochs + 1)
    
    # Plota os gráficos de loss e accuracy para conjunto de treinamento e validação
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(x, train_losses, c='magenta' ,ls='--', label='Train loss', fillstyle='none')
    plt.plot(x, val_losses, c='green' ,ls='--', label='Val. loss', fillstyle='none')
    plt.title('Loss por epoch')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(x, [acc.cpu() for acc in train_accs], c='magenta' ,ls='-', label='Train acuracy', fillstyle='none')
    plt.plot(x, [acc.cpu() for acc in val_accs], c='green' ,ls='-', label='Val. accuracy', fillstyle='none')
    plt.title('Accuracy por epoch')
    plt.legend()
    
    plt.savefig(f'{save_dir}/{model_name}_fold_{fold}_loss_acc.pdf')
    plt.show()

In [None]:
def generate_classification_report(model, dataloader,class_names, device='cpu'):
    # Define o modelo para o dispositivo correto (CPU ou GPU)
    model = model.to(device)
    model.eval()
    
    # Inicializa as variáveis de predições e rótulos verdadeiros
    all_preds = torch.tensor([], dtype=torch.long, device=device)
    all_labels = torch.tensor([], dtype=torch.long, device=device)

    # Realiza a predição para cada lote de dados no dataloader
    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

        # Adiciona as predições e rótulos verdadeiros às variáveis criadas anteriormente
        all_preds = torch.cat((all_preds, preds), dim=0)
        all_labels = torch.cat((all_labels, labels), dim=0)
    
    # Gera o classification report com base nas predições e rótulos verdadeiros
    report = metrics.classification_report(all_labels.cpu().numpy(), all_preds.cpu().numpy(),target_names=class_names,
                                           digits=4, zero_division=0)
    
    return report

In [None]:
def train_model(model, dataloaders, optimizer, basic_parameters,fold, date_now, device='cpu'):
    
    # Tempo total do treinamento (treinamento e validação)
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = float('inf')

    train_loss_list = []
    train_acc_list = []
    val_loss_list = []
    val_acc_list = []
    
    model_name = basic_parameters.get('model_name')
    num_epochs = basic_parameters.get('epochs')
    batch_size = basic_parameters.get('batch_size')

    # Cria a pasta com o nome do modelo
    output_dir = r'outputs/' + model_name
    os.makedirs(output_dir, exist_ok=True)

    # Cria a pasta para separar os testes
    result_dir = output_dir + '\\' + model_name +'_' + date_now
    os.makedirs(result_dir, exist_ok=True)

    # Abre o arquivo para salvar o resultado
    f = open(f'{result_dir}/{model_name}_fold_{fold}.txt', 'w')
    
    for epoch in range(num_epochs):
        f.write(f'Epoch {epoch}/{num_epochs - 1}\n')
        f.write('-' * 10 + '\n')

        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            # Inicia contagem de tempo da época
            time_epoch_start = time.time()

            if phase == 'train':
                model.train()
            else:
                model.eval()

            # Perda (loss) nesta época
            running_loss = 0.0
            # Amostras classificadas corretamente nesta época
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    # Get model outputs and calculate loss
                    # Special case for inception because in training it has an auxiliary output. In train
                    #   mode we calculate the loss by summing the final output and the auxiliary output
                    #   but in testing we only consider the final output.
                    if model_name == 'inception' and phase == 'train':
                        # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                        outputs, aux_outputs = model(inputs)
                        loss1 = basic_parameters.get('criterion')(outputs, labels)
                        loss2 = basic_parameters.get('criterion')(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = basic_parameters.get('criterion')(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Atualiza a perda da época
                running_loss += loss.item() * inputs.size(0)
                # Atualiza o número de amostras classificadas corretamente na época.
                running_corrects += torch.sum(preds == labels.data)
            # Perda desta época
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            # Acurácia desta época
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            # Tempo total desta época
            time_epoch = time.time() - time_epoch_start

            f.write(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} ({time_epoch:.4f} seconds) \n')

            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} ({time_epoch:.4f} seconds)')

            if phase == 'train':
                train_loss_list.append(epoch_loss)
                train_acc_list.append(epoch_acc)
            else:
                val_loss_list.append(epoch_loss)
                val_acc_list.append(epoch_acc)

            if phase == 'val' and epoch_loss < best_loss:
                best_loss = epoch_loss
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        time_epoch = time.time() - since

        f.write(f'Time: {time_epoch:.0f}s\n')
        f.write('\n')

        print(f'Time: {time_epoch:.0f}s')
        print('\n')

    time_elapsed = time.time() - since
    f.write(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s\n')
    f.write(f'Number of epochs: {num_epochs}. Batch size: {batch_size}\n')
    f.write(f'Best val loss: {best_loss:.4f} Best val acc: {best_acc:.4f}\n')

    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val loss: {best_loss:.4f} Best val acc: {best_acc:.4f}')

    # Save the confusion matrix
    y_true, y_pred = evaluate_model(model, dataloaders['val'], device=device)
    # Confusion matrix
    conf_mat_val = metrics.confusion_matrix(y_true, y_pred)
    f.write(f'\nConfusion Matrix:\n{conf_mat_val}\n')

    # print(f'Confusion Matrix:\n{conf_mat_val}')

    # Classification report 
    class_rep_val = generate_classification_report(model, dataloaders['val'],basic_parameters.get('class_names'), device)    
    f.write(f'\nClassification report:\n{class_rep_val}\n')


    f.close()

    # Save the plot
    plt.figure()
    plot_confusion_matrix(conf_mat_val, classes=basic_parameters.get('class_names'))
    plt.savefig(f'{result_dir}/{model_name}_fold_{fold}_cf_mat.pdf')

    #Plota gráfico de loss e accuracy por epoch
    plot_loss_accuracy(train_loss_list, val_loss_list, train_acc_list, val_acc_list, model_name, fold, result_dir)

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Verifica se a GPU está disponível
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# print('\nDevice: {0}'.format(device))
# print(torch.cuda.get_device_name(0))
# !nvidia-smi

SEED = 42

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = True

# Define o caminho base do diretório de imagens divididas 
base_dir = r'data/all_classes/'

basic_parameters = {
    'num_classes' : 5,
    'class_names': ['healthy', 'rust_level_1','rust_level_2','rust_level_3','rust_level_4'],
    'batch_size' : 32,
    'lr' : 0.001, # Taxa de aprendizado
    'mm' : 0.9, # Mommentum
    'epochs' : 3,
    'model_name' : "alexnet", # Models to choose from [resnet, alexnet, vgg, squeezenet, densenet, inception]
    'criterion' : nn.CrossEntropyLoss() # Função de perda
}



model_ft, input_size = initialize_model(basic_parameters.get('model_name'), basic_parameters.get('num_classes'))

data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(30),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

image_datasets = datasets.ImageFolder(base_dir, data_transforms['train'])

# Pretrainned
model_ft = model_ft.to(device)  

date_now = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

kf = KFold(n_splits=2, shuffle=True, random_state=SEED)
folds = kf.split(image_datasets)

for fold, (train_idx, val_idx) in enumerate(folds):
    

    print(f'FOLD {fold}')

    train_dataset = torch.utils.data.Subset(image_datasets, train_idx)
    val_dataset = torch.utils.data.Subset(image_datasets, val_idx)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=basic_parameters.get('batch_size'), shuffle=True, num_workers=4)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=basic_parameters.get('batch_size'), shuffle=True, num_workers=4)

    dataloaders_dict = {'train': train_loader, 'val': val_loader}

    # Reiniciar o modelo em cada fold
    model_ft, input_size = initialize_model(basic_parameters.get('model_name'), basic_parameters.get('num_classes'))
    model_ft = model_ft.to(device)
    # Imprime o modelo
    # print(f'Model: {str(model_ft)}')
    
    # Otimizador
    optimizer = optim.SGD(model_ft.parameters(), lr=basic_parameters.get('lr'), momentum=basic_parameters.get('mm'))

    model_ft = train_model(model_ft, dataloaders_dict, optimizer, basic_parameters, fold,date_now, device)
    
    