# Exercícios de Normalização

## Carregando dados do Google Drive

In [None]:
# Mount Google Drive into the Colab runtime so the dataset archive can be read from it.
from google.colab import drive
# NOTE(review): sets a private `_DEBUG` attribute on the mount function; presumably
# this silences debug output — confirm against the google.colab source.
drive.mount._DEBUG = False
# force_remount=True is passed so the mount is redone even when a previous mount
# exists — TODO confirm exact semantics in the Colab documentation.
drive.mount('/content/drive', force_remount=True)

In [None]:
#Carregue os dados PKLotSegmented.tar.gz para seu Google Drive antes de executar
#Ajuste o path para apontar para o local do arquivo

!mkdir /content/datasets

!tar -xf "/content/drive/MyDrive/PKLotSegmented.tar.gz" -C "/content/datasets"
#Ou, se o .tar.gz já estiver carregado no seu ambiente:
!tar -xf "/content/PKLotSegmented.tar.gz" -C "/content/datasets"

!mv /content/datasets/Treino-UFPR04 /content/datasets/treino
!mv /content/datasets/Validação-UFPR05 /content/datasets/validacao
!mv /content/datasets/Teste-PUC /content/datasets/teste

In [None]:
import math
import copy

import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader

## Dataloaders

In [None]:
img_dim = 32
b_size = 32

sets = ["treino", "validacao", "teste"]
data_dir = "/content/datasets"

# Note: PyTorch's Normalize takes the mean and the standard deviation, whereas
# in class we used the mean and the variance. In practice the normalization
# done here is equivalent to the one seen in class.
# Every split goes through the same pipeline: grayscale -> resize -> tensor.
data_transforms = {
    split: transforms.Compose([
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((img_dim, img_dim)),
        transforms.ToTensor()
        #,transforms.Normalize(mean, std)
    ])
    for split in sets
}

# One ImageFolder per split, rooted at <data_dir>/<split>.
image_datasets = {
    split: datasets.ImageFolder(
        root=f'{data_dir}/{split}',
        transform=data_transforms[split],
    )
    for split in sets
}

# One shuffled DataLoader per split.
dataloaders = {
    split: DataLoader(folder, batch_size=b_size, shuffle=True, num_workers=2)
    for split, folder in image_datasets.items()
}

# Number of images per split and the class names found in the training folder.
dataset_sizes = {split: len(folder) for split, folder in image_datasets.items()}
class_names = image_datasets['treino'].classes

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

print("Conectado em um ambiente com", device)

## Função para calcular média e desvio padrão do batch
**Atenção: da forma como está implementada, a função só funciona para imagens em escala de cinza (um único canal).**

In [None]:
# full_batch_size is kept only for backward compatibility; it is no longer needed
def calcularMediaDesvio(loader, full_batch_size=None):
    """Compute the global mean and standard deviation of every pixel in `loader`.

    The statistics are accumulated as a running sum and sum of squares over all
    pixels, so batches of different sizes (e.g. a shorter last batch) are
    weighted correctly and the variation between batches is included.
    A single scalar mean/std is produced over all values, which is what a
    one-channel (grayscale) `Normalize` needs.

    Args:
        loader: iterable yielding `(images, labels)` pairs, where `images` is a
            tensor; labels are ignored.
        full_batch_size: unused; accepted so existing calls that pass the
            batch size keep working.

    Returns:
        Tuple `(mean, std)` of 0-dim float32 tensors. `std` is the population
        standard deviation (divides by N, not N - 1).

    Raises:
        ValueError: if the loader yields no pixels.
    """
    total = 0.0
    total_sq = 0.0
    count = 0
    for images, _ in loader:
        # Accumulate in float64 as Python floats: avoids float32 round-off over
        # many batches and works regardless of the device the batch lives on.
        pixels = images.to(torch.float64)
        total += pixels.sum().item()
        total_sq += (pixels * pixels).sum().item()
        count += pixels.numel()

    if count == 0:
        raise ValueError("calcularMediaDesvio: loader produced no data")

    mean = total / count
    # Var[x] = E[x^2] - E[x]^2; clamp at 0 to absorb tiny negative round-off.
    variance = max(total_sq / count - mean * mean, 0.0)
    std = variance ** 0.5

    return (torch.tensor(mean, dtype=torch.float32),
            torch.tensor(std, dtype=torch.float32))


## Definindo arquitetura da rede

In [None]:
import torch.nn.functional as f
import torch.nn as nn
import importlib
import torch


# The images in the dataset are grayscale (a single channel)
INPUT_SHAPE = (1, 32, 32)
NUM_CLASSES = 2

class custom_3_layer(nn.Module):
    """Small CNN: three conv layers (two followed by max-pooling) and two linear layers.

    Input is a batch shaped `(N, *INPUT_SHAPE)`; output is `(N, NUM_CLASSES)`
    raw scores (no softmax is applied here).
    """

    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels=INPUT_SHAPE[0], out_channels=32, kernel_size=3, stride=1, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=0)

        self.flatten = nn.Flatten()

        # Derive fc1's input width from INPUT_SHAPE with a dummy forward pass
        # instead of hard-coding it, so changing INPUT_SHAPE does not silently
        # break the first linear layer. For (1, 32, 32) this yields 64*5*5 = 1600.
        with torch.no_grad():
            dummy = torch.zeros(1, *INPUT_SHAPE)
            n_features = self.convs(dummy).flatten(1).size(1)

        self.fc1 = nn.Linear(n_features, 64)
        self.fc2 = nn.Linear(64, NUM_CLASSES)

    def convs(self, x):
        """Run the convolutional feature extractor and return the feature maps."""
        x = f.relu(self.conv1(x))
        x = self.pool1(x)
        x = f.relu(self.conv2(x))
        x = self.pool2(x)
        x = f.relu(self.conv3(x))

        return x


    def forward(self, x):
        """Return class scores of shape `(N, NUM_CLASSES)` for the input batch `x`."""
        x = self.convs(x)
        x = self.flatten(x)
        x = f.relu(self.fc1(x))
        x = self.fc2(x)

        return x

## Função de treino

In [None]:
# Training loop with early stopping controlled by a grace period
def train_model(model, criterion, optimizer, max_epochs=10, grace_period=3):
    """Train `model` in place, keeping the weights with the lowest validation loss.

    Each epoch runs a "treino" pass (with gradient updates) followed by a
    "validacao" pass, both read from the module-level `dataloaders`, and prints
    the loss and accuracy of each. When the validation loss fails to improve
    for `grace_period` consecutive epochs, training stops early. In every case
    the best weights seen are loaded back into `model` before returning.

    Args:
        model: network to train; expected to already be on `device`.
        criterion: loss function called as `criterion(outputs, labels)`.
        optimizer: optimizer bound to the model's parameters.
        max_epochs: maximum number of epochs to run.
        grace_period: consecutive non-improving validation epochs tolerated.

    Returns:
        None.
    """
    lowest_loss = math.inf
    stale_epochs = 0
    best_weights = copy.deepcopy(model.state_dict())
    rule = '-' * 10

    for epoch_idx in range(max_epochs):
        print(f"Época {epoch_idx+1}/{max_epochs}")
        print(rule)

        stop_now = False
        for phase in ("treino", "validacao"):
            is_training = phase == "treino"
            if is_training:
                model.train()  # training mode
            else:
                model.eval()   # evaluation mode

            loss_sum = 0.0
            hits = 0

            for batch, targets in dataloaders[phase]:
                batch = batch.to(device)
                targets = targets.to(device)

                optimizer.zero_grad()

                # Gradients are tracked only during the training phase.
                with torch.set_grad_enabled(is_training):
                    scores = model(batch)
                    preds = torch.max(scores, 1)[1]
                    loss = criterion(scores, targets)

                    if is_training:
                        loss.backward()
                        optimizer.step()

                # The criterion returns a batch mean; scale it back to a sum.
                loss_sum += loss.item() * batch.size(0)
                hits += torch.sum(preds == targets.data)

            n_samples = dataset_sizes[phase]
            epoch_loss = loss_sum / n_samples
            epoch_acc = hits.double() / n_samples

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            print()

            # Early-stopping bookkeeping applies to the validation phase only.
            if is_training:
                continue

            if epoch_loss < lowest_loss:
                lowest_loss = epoch_loss
                stale_epochs = 0
                best_weights = copy.deepcopy(model.state_dict())
                continue

            stale_epochs += 1
            stop_now = stale_epochs >= grace_period

        if stop_now:
            print("Early stopping")
            break

    # Single exit point: always restore the best weights found.
    model.load_state_dict(best_weights)
    return

# Faça você mesmo #1

Teste o treinamento e a acurácia da rede no conjunto de testes, com e sem normalização dos dados.

In [None]:
import torch.optim as optim

# Training
model = custom_3_layer()
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
# Alternative optimizer:
#optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
optimizer_ft = optim.Adam(model.parameters(), lr=0.005)

# Train the model (the best validation weights are restored by train_model)
print("Treinando")
train_model(model, criterion, optimizer_ft, max_epochs=15, grace_period=5)

# Evaluate the model on the test set
print("Avaliando no conjunto de testes")
model.eval()
test_corrects = 0

# Inference only: disable autograd so no computation graph is built or kept
# in memory for each test batch.
with torch.no_grad():
    for inputs, labels in dataloaders['teste']:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        test_corrects += torch.sum(preds == labels)

test_acc = test_corrects.double() / dataset_sizes['teste']
print(f'Test Acc: {test_acc:.4f}')