In [None]:
#Importar las librerias necesarias
import helper
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from timeit import default_timer as timer
from tqdm.auto import tqdm

import torch 
from torch import nn
from torchvision import datasets, transforms, utils
from torch.utils.data import Dataset, DataLoader
from ignite.handlers.param_scheduler import create_lr_scheduler_with_warmup
from torch.optim.lr_scheduler import CosineAnnealingLR


In [None]:
# Pick the compute device: the first CUDA GPU when one is available, else the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# **PREPARACIÓN DATASETS**

In [None]:
# Input preprocessing for models 1 and 2: convert to tensor, then Normalize(mean=0, std=1)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0,), std=(1,)),
])

# Download (if needed) the FashionMNIST train and test splits
train_set = datasets.FashionMNIST(root='~/.pytorch/F_MNIST_data/', train=True, download=True, transform=transform)

test_set = datasets.FashionMNIST(root='~/.pytorch/F_MNIST_data/', train=False, download=True, transform=transform)


# Build the DataLoaders (both are shuffled)
batch_size = 32

trainloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
testloader = DataLoader(dataset=test_set, batch_size=batch_size, shuffle=True)

# **EXPLORACIÓN DE LOS DATASETS**

In [None]:
# Dataset overview

# Pull one mini-batch from each loader so the tensor shapes can be inspected
train_dataiter = iter(trainloader)
train_images, train_labels = next(train_dataiter)

test_dataiter = iter(testloader)
test_images, test_labels = next(test_dataiter)

# Training split summary followed by the shapes of one mini-batch
print("Training set", "", train_set, "", "Dimensiones mini-batch", sep="\n")
print(train_images.shape)
print(train_labels.shape)

# Two blank lines separate the train and test summaries
print("\n")
print("Test set", "", test_set, "", "Dimensiones mini-batch", sep="\n")
print(test_images.shape)
print(test_labels.shape)

# Number of batches per loader (label spelling fixed: "lenght" -> "length")
print("")
print("Dataloaders length:")
print(f"trainloader: {len(trainloader)} batches")
print(f"testloader: {len(testloader)} batches")

# Class metadata: name -> index mapping and the ordered list of class names
print("")
print("------------CLASES------------")
classes = train_set.class_to_idx
classNames = train_set.classes
classes

In [None]:
# Show a 2x2 grid of randomly chosen training images with their class name and label
figure = plt.figure(figsize=(8, 8))
cols, rows = 2, 2
for i in range(1, cols * rows + 1):
    # Draw one random index into the training set
    sample_idx = int(torch.randint(0, len(train_set), size=[1]))
    img, label = train_set[sample_idx]
    figure.add_subplot(rows, cols, i)
    plt.imshow(img.squeeze(), cmap="gray")
    plt.axis("off")
    plt.title(classNames[label] + f", label: {label}")
plt.show()

In [None]:
# Accuracy helper

def accuracy_fn(y_true, y_pred):
    """Return the percentage (0-100) of positions where `y_pred` equals `y_true`.

    Args:
        y_true: tensor of reference labels.
        y_pred: tensor of predicted labels, compared element-wise with `y_true`.

    Returns:
        float: matching elements divided by ``len(y_pred)``, times 100.
    """
    n_correct = (y_true == y_pred).sum().item()
    return (n_correct / len(y_pred)) * 100

In [None]:
# TRAIN AND TEST STEP (one pass over a dataloader each)

def train_step(model, dataloader, optimizer, loss_fn, accuracy_fn, device, scheduler):
    """Train `model` for one full pass over `dataloader`.

    Args:
        model: module to train; it is moved to `device` and put in training mode.
        dataloader: iterable of ``(X, y)`` batches; must support ``len()`` and
            expose a ``.dataset`` attribute (used for the progress message).
        optimizer: optimizer updating the model parameters.
        loss_fn: callable ``loss_fn(y_pred, y)`` returning a scalar tensor.
        accuracy_fn: callable ``accuracy_fn(y_true=..., y_pred=...)`` returning a number.
        device: device the model and each batch are moved to.
        scheduler: ``None``, or a callable invoked once as ``scheduler(None)``
            after the pass to update the learning rate.

    Returns:
        tuple: ``(train_loss, train_acc)`` averaged over the number of batches.
        ``train_loss`` is a tensor detached from the autograd graph.
    """
    train_loss, train_acc = 0, 0

    model.to(device)
    # test_step() switches the model to eval mode; re-enable training mode here,
    # otherwise every epoch after the first would train with Dropout/BatchNorm
    # layers still in evaluation behaviour.
    model.train()

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Forward pass
        y_pred = model(X)

        # Per-batch loss and accuracy
        loss = loss_fn(y_pred, y)
        # Accumulate a detached value so the running sum does not carry autograd history
        train_loss += loss.detach()
        train_acc += accuracy_fn(y_true=y, y_pred=y_pred.argmax(dim=1))

        # Reset accumulated gradients, backpropagate, update parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 256 == 0:
            print(f"Revisados {batch*len(X)}/{len(dataloader.dataset)} samples")

    # Average loss and accuracy per batch
    train_loss /= len(dataloader)
    train_acc /= len(dataloader)

    print("")
    print(f"Train Loss {train_loss:.5f} | Train Acc: {train_acc:.5f}%")

    if scheduler is not None:
        print("LR antes: " + str(optimizer.param_groups[0]['lr']))
        scheduler(None)  # update the learning rate
        print("LR despues: " + str(optimizer.param_groups[0]['lr']))

    return train_loss, train_acc
    
def test_step(model, dataloader,loss_fn, accuracy_fn, device):
    
    test_loss, test_acc = 0,0
    
    model.to(device)
    
    model.eval() #poner el modelo en modo evaluacion
    
    #context manager
    with torch.inference_mode():
        for X,y in dataloader:
            X, y = X.to(device), y.to(device)
            
            #forward pass
            test_pred = model(X)
            
            #perdida y precision
            test_loss += loss_fn(test_pred, y)
            test_acc += accuracy_fn(y_true = y, y_pred=test_pred.argmax(dim=1))
            
        #perdida y precision promedio por batch
        test_loss /= len(dataloader)
        test_acc /= len(dataloader)
        
        print("")
        print(f"Test Loss {test_loss:.5f} | Test Acc: {test_acc:.5f}%")
        
    return test_loss,test_acc

In [None]:
# Training driver
def run_model(model, trainloader, testloader, loss_function, optimizer, accuracy_fn, device, epochs, scheduler):
    """Run `epochs` rounds of train_step followed by test_step and collect the metrics.

    Prints the epoch number before each round and the total wall-clock time at
    the end.

    Returns:
        tuple of four lists, one entry per epoch, in this order:
        train losses, train accuracies, test losses, test accuracies.
    """
    epoch_train_loss, epoch_train_acc = [], []
    epoch_test_loss, epoch_test_acc = [], []

    started_at = timer()  # start of the timed section

    for epoch in tqdm(range(epochs)):
        print(f"Epoch: {epoch+1}\n")

        train_metrics = train_step(model=model,
                                   dataloader=trainloader,
                                   loss_fn=loss_function,
                                   optimizer=optimizer,
                                   accuracy_fn=accuracy_fn,
                                   device=device,
                                   scheduler=scheduler)
        epoch_train_loss.append(train_metrics[0])
        epoch_train_acc.append(train_metrics[1])

        test_metrics = test_step(model=model,
                                 dataloader=testloader,
                                 loss_fn=loss_function,
                                 accuracy_fn=accuracy_fn,
                                 device=device)
        epoch_test_loss.append(test_metrics[0])
        epoch_test_acc.append(test_metrics[1])

    total_time = timer() - started_at  # elapsed seconds
    print(f"Tiempo de entrenamiento en {str(device)}: {total_time:.3f} segundos")

    return epoch_train_loss, epoch_train_acc, epoch_test_loss, epoch_test_acc

In [None]:
# MODEL EVALUATION
def evaluate_model(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, loss_function: torch.nn.Module, accuracy_fn, device=device):
    """Compute the average loss and accuracy of `model` over `data_loader`.

    The model is switched to evaluation mode and run without gradient tracking.
    Batches are moved to `device`; the model itself is not moved here.

    Returns:
        dict with keys ``"model_name"`` (the model's class name),
        ``"model_loss"`` (Python float) and ``"model_acc"``, the latter two
        averaged over the number of batches.
    """
    model.eval()
    loss_sum, acc_sum = 0, 0

    with torch.inference_mode():
        for inputs, targets in tqdm(data_loader):
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)

            loss_sum += loss_function(outputs, targets)
            acc_sum += accuracy_fn(targets, outputs.argmax(dim=1))

        # Average per batch
        n_batches = len(data_loader)
        loss = loss_sum / n_batches
        acc = acc_sum / n_batches

    summary = {
        "model_name": model.__class__.__name__,
        "model_loss": loss.item(),
        "model_acc": acc,
    }
    return summary


In [None]:
# Loss and accuracy plots

def plot_loss(epoch_train_loss, epoch_test_loss):
    """Plot per-epoch training (red) and test (blue) loss curves.

    Each entry of both sequences must be a tensor: it is moved to the CPU,
    detached and converted to NumPy before plotting.
    """
    def _to_numpy(losses):
        # Tensor -> CPU -> detached -> NumPy value, one per epoch
        return [value.cpu().detach().numpy() for value in losses]

    train_curve = _to_numpy(epoch_train_loss)
    test_curve = _to_numpy(epoch_test_loss)

    for curve, fmt, label in ((train_curve, 'r', "Training loss"),
                              (test_curve, 'b', "Test loss")):
        plt.plot(np.arange(len(curve)), curve, fmt, label=label)

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
    
def plot_acc(epoch_train_acc, epoch_test_acc):
    """Plot per-epoch training (red) and test (blue) accuracy curves."""
    series = (
        (epoch_train_acc, 'r', "Training accuracy"),
        (epoch_test_acc, 'b', "Test accuracy"),
    )
    for values, fmt, label in series:
        plt.plot(np.arange(len(values)), values, fmt, label=label)

    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

# **ARQUITECTURA 1**

In [None]:
# MODEL 1

class ModelV1(nn.Module):
    """Plain CNN: three conv blocks (3x3 conv + ReLU, 2x2 max-pool) and a linear classifier.

    Args:
        input_shape: number of input channels.
        hidden_units: number of channels produced by every convolution.
        output_shape: number of outputs of the classifier.

    ``forward`` returns log-probabilities (LogSoftmax over dim 1).
    """

    def __init__(self, input_shape, hidden_units, output_shape):
        super().__init__()

        def conv3x3(in_channels):
            # 3x3 convolution, stride 1, padding 1, always emitting `hidden_units` channels
            return nn.Conv2d(in_channels, hidden_units, kernel_size=3, stride=1, padding=1)

        # Layers are created in the same order as before so parameter
        # initialisation consumes the random generator identically.
        self.conv_block_1 = nn.Sequential(
            conv3x3(input_shape),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )

        self.conv_block_2 = nn.Sequential(
            conv3x3(hidden_units),
            nn.ReLU(),
            conv3x3(hidden_units),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )

        self.conv_block_3 = nn.Sequential(
            conv3x3(hidden_units),
            nn.ReLU(),
            conv3x3(hidden_units),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )

        # Fully connected head. The factor 9 presumably corresponds to a 3x3
        # feature map (28x28 input after three 2x2 pools) - confirm if the
        # input size changes.
        self.classifier = nn.Sequential(
            nn.Flatten(),  # collapse the feature maps into one vector per sample
            nn.Linear(in_features=hidden_units * 9, out_features=output_shape),
            nn.LogSoftmax(dim=1),
        )

    def forward(self, x):
        """Run the three conv blocks in order, then the classifier."""
        for stage in (self.conv_block_1, self.conv_block_2, self.conv_block_3):
            x = stage(x)
        return self.classifier(x)
        

In [None]:
# Model 1 hyperparameters (the "88.75" in the original note is presumably the accuracy reached - confirm)
model_1 = ModelV1(input_shape=1, hidden_units=15, output_shape=len(classNames))

loss_function = nn.NLLLoss()
optimizer = torch.optim.SGD(model_1.parameters(), lr=0.01, momentum=0.9)
epochs = 6

# Train the model (no learning-rate scheduler) and keep the per-epoch metrics
epoch_train_loss, epoch_train_acc, epoch_test_loss, epoch_test_acc = run_model(
    model=model_1,
    trainloader=trainloader,
    testloader=testloader,
    loss_function=loss_function,
    optimizer=optimizer,
    accuracy_fn=accuracy_fn,
    device=device,
    epochs=epochs,
    scheduler=None,
)

In [None]:
# Plot the loss and accuracy curves of the run above
plot_loss(epoch_train_loss=epoch_train_loss, epoch_test_loss=epoch_test_loss)

plot_acc(epoch_train_acc=epoch_train_acc, epoch_test_acc=epoch_test_acc)

In [None]:
# Final test-set metrics for model 1 (name / loss / accuracy dict)
model_1_results = evaluate_model(
    model=model_1,
    data_loader=testloader,
    loss_function=loss_function,
    accuracy_fn=accuracy_fn,
    device=device,
)

# **ARQUITECTURA 2**

In [None]:
# Xavier (normal) initialisation
def weights_init(m):
    """Initialise a Conv2d module: Xavier-normal weights and zero bias.

    Intended to be passed to ``nn.Module.apply``; modules that are not
    ``nn.Conv2d`` are left untouched.

    Args:
        m: any ``nn.Module``.
    """
    if isinstance(m, nn.Conv2d):
        torch.nn.init.xavier_normal_(m.weight)
        # Convolutions built with bias=False have m.bias set to None
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)
        
        
# Rebuild the DataLoaders for architecture 2 with a mini-batch of 64
batch_size = 64

trainloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
testloader = DataLoader(dataset=test_set, batch_size=batch_size, shuffle=True)


In [None]:
# MODEL 2
class ModelV2(nn.Module):
    """CNN with BatchNorm, LeakyReLU and Dropout2d in three conv blocks plus a linear head.

    Args:
        input_shape: number of input channels.
        hidden_units: number of channels produced by every convolution.
        output_shape: number of classes emitted by the classifier.

    ``forward`` returns log-probabilities (LogSoftmax over dim 1) with
    ``output_shape`` columns.
    """
    def __init__(self, input_shape, hidden_units, output_shape):
        super().__init__()

        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels = input_shape,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size = 2),
            nn.Dropout2d(p=0.5)
            )


        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(in_channels = hidden_units,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels = hidden_units,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size = 2),
            nn.Dropout2d(p=0.5)
            )

        self.conv_block_3 = nn.Sequential(
            nn.Conv2d(in_channels = hidden_units,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels = hidden_units,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size = 2),
            nn.Dropout2d(p=0.5)
            )


        # Fully connected head. The factor 9 presumably corresponds to a 3x3
        # feature map (28x28 input after three 2x2 pools) - confirm if the
        # input size changes.
        self.classifier = nn.Sequential(
            nn.Flatten(),  # collapse the feature maps into one vector per sample
            # One output per class: the head previously emitted `hidden_units`
            # values and ignored `output_shape`.
            nn.Linear(in_features=hidden_units*9,
                    out_features = output_shape),
            nn.BatchNorm1d(output_shape),
            nn.LogSoftmax(1)
            )

        # Initialise weights only after every layer has been registered:
        # Module.apply visits the submodules that exist at call time, so calling
        # it right after super().__init__() touched no convolution at all.
        self.apply(weights_init)


    def forward(self, x):
        """Run the three conv blocks in order, then the classifier."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        x = self.classifier(x)
        return x

In [None]:
# Model 2 hyperparameters (the "91,28%" in the original note is presumably the accuracy reached - confirm)
model_2 = ModelV2(input_shape=1, hidden_units=15, output_shape=len(classNames))

loss_function = nn.NLLLoss()
epochs = 10

# Learning-rate warm-up settings; train_step calls the scheduler once per
# pass, so these durations are counted in epochs
initial_lr = 0.01
warmup_iteration = 5
warmup_initial_lr = 1e-5

optimizer = torch.optim.Adam(model_2.parameters(), lr=initial_lr, betas=(0.9, 0.999))

# Warm-up from warmup_initial_lr to initial_lr, followed by cosine annealing
lr_scheduler = create_lr_scheduler_with_warmup(
    CosineAnnealingLR(optimizer, T_max=epochs - warmup_iteration),
    warmup_start_value=warmup_initial_lr,
    warmup_end_value=initial_lr,
    warmup_duration=warmup_iteration,
)

# Train the model and keep the per-epoch metrics
epoch_train_loss, epoch_train_acc, epoch_test_loss, epoch_test_acc = run_model(
    model=model_2,
    trainloader=trainloader,
    testloader=testloader,
    loss_function=loss_function,
    optimizer=optimizer,
    accuracy_fn=accuracy_fn,
    device=device,
    epochs=epochs,
    scheduler=lr_scheduler,
)

In [None]:
# Plot the loss and accuracy curves of the run above
plot_loss(epoch_train_loss=epoch_train_loss, epoch_test_loss=epoch_test_loss)

plot_acc(epoch_train_acc=epoch_train_acc, epoch_test_acc=epoch_test_acc)

In [None]:
# Final test-set metrics for model 2 (name / loss / accuracy dict)
model_2_results = evaluate_model(
    model=model_2,
    data_loader=testloader,
    loss_function=loss_function,
    accuracy_fn=accuracy_fn,
    device=device,
)

# **ARQUITECTURA 3**

In [None]:
# #ALTERACIÓN DE INPUTS 

# #PREPARACIÓN PARA DATA AUGMENTATION
# transform1 = transforms.Compose([transforms.ToTensor(),    
#                                 transforms.ColorJitter(brightness=(0.6,1.4), hue=0.2, saturation=(0.6,1.4) ),
#                                 transforms.RandomHorizontalFlip(p=0.5),
#                                 transforms.Normalize((0,), (1,))])
# #NORMALIZACIÓN INPUTS
# transform2 = transforms.Compose([transforms.ToTensor(),
#                                 transforms.Normalize((0,), (1,))])


# #DESCARGAR DATASETS
# train_norm = datasets.FashionMNIST('~/.pytorch/F_MNIST_data/', download=True, train=True, transform = transform2)
# train_augm = datasets.FashionMNIST('~/.pytorch/F_MNIST_data/', download=True, train=True, transform = transform1)
# train_set = torch.utils.data.ConcatDataset([train_norm,train_augm]) #Incrementar el tamaño del trainset al doble

# test_set = datasets.FashionMNIST('~/.pytorch/F_MNIST_data/', download=True, train=False, transform = transform2)



In [None]:
# Xavier (normal) initialisation
def weights_init(m):
    """Initialise a Conv2d module: Xavier-normal weights and zero bias.

    Intended to be passed to ``nn.Module.apply``; modules that are not
    ``nn.Conv2d`` are left untouched.

    Args:
        m: any ``nn.Module``.
    """
    if isinstance(m, nn.Conv2d):
        torch.nn.init.xavier_normal_(m.weight)
        # Convolutions built with bias=False have m.bias set to None
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)
        
        
# Rebuild the DataLoaders for architecture 3 with a mini-batch of 128
batch_size = 128

trainloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
testloader = DataLoader(dataset=test_set, batch_size=batch_size, shuffle=True)


In [None]:
# Model 3
class ModelV3(nn.Module):
    """CNN with two conv blocks (BatchNorm, LeakyReLU, Dropout2d) and a two-stage linear head.

    Args:
        input_shape: number of input channels.
        hidden_units: number of channels produced by every convolution and
            width of the hidden linear layer.
        output_shape: number of classes emitted by the final layer.

    ``forward`` returns log-probabilities (LogSoftmax over dim 1) with
    ``output_shape`` columns.
    """
    def __init__(self, input_shape, hidden_units, output_shape):
        super().__init__()

        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels = input_shape,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size = 2),
            nn.Dropout2d(p=0.6)
            )


        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(in_channels = hidden_units,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels = hidden_units,
                     out_channels = hidden_units,
                     kernel_size=3,
                     stride=1,
                     padding=1),
            nn.BatchNorm2d(hidden_units),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size = 2),
            nn.Dropout2d(p=0.6)
            )


        # Fully connected head
        self.classifier1 = nn.Sequential(
            nn.Flatten(),  # collapse the feature maps into one vector per sample
            # hidden_units channels on a 7x7 map (assumes 28x28 inputs after two
            # 2x2 pools - confirm). Equals the former `hidden_units+960` only
            # when hidden_units == 20, which was the single value that worked.
            nn.Linear(in_features=hidden_units*7*7,
                    out_features = hidden_units),
            nn.BatchNorm1d(hidden_units),
            # Element-wise dropout: Dropout1d reads a 2-D (batch, features)
            # tensor as an unbatched (channels, length) input and would zero
            # whole samples instead of individual features.
            nn.Dropout(p=0.7),
            nn.LeakyReLU()
            )
        self.classifier2 = nn.Sequential(
            # One output per class: this layer previously emitted `hidden_units`
            # values and ignored `output_shape`.
            nn.Linear(in_features=hidden_units,
                    out_features = output_shape),
            nn.LogSoftmax(1)
            )

        # Initialise weights only after every layer has been registered:
        # Module.apply visits the submodules that exist at call time, so calling
        # it right after super().__init__() touched no convolution at all.
        self.apply(weights_init)


    def forward(self, x):
        """Run both conv blocks, then the two classifier stages."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.classifier1(x)
        x = self.classifier2(x)
        return x


In [None]:
# Model 3 hyperparameters (no accuracy was recorded in the original note)
model_3 = ModelV3(input_shape=1, hidden_units=20, output_shape=len(classNames))

loss_function = nn.NLLLoss()
epochs = 15

# Learning-rate warm-up settings; train_step calls the scheduler once per
# pass, so these durations are counted in epochs
initial_lr = 0.01
warmup_iteration = 7
warmup_initial_lr = 1e-5

optimizer = torch.optim.Adam(model_3.parameters(), lr=initial_lr, betas=(0.9, 0.999))

# Warm-up from warmup_initial_lr to initial_lr, followed by cosine annealing
lr_scheduler = create_lr_scheduler_with_warmup(
    CosineAnnealingLR(optimizer, T_max=epochs - warmup_iteration),
    warmup_start_value=warmup_initial_lr,
    warmup_end_value=initial_lr,
    warmup_duration=warmup_iteration,
)

# Train the model and keep the per-epoch metrics
epoch_train_loss, epoch_train_acc, epoch_test_loss, epoch_test_acc = run_model(
    model=model_3,
    trainloader=trainloader,
    testloader=testloader,
    loss_function=loss_function,
    optimizer=optimizer,
    accuracy_fn=accuracy_fn,
    device=device,
    epochs=epochs,
    scheduler=lr_scheduler,
)

In [None]:
# Plot the loss and accuracy curves of the run above
plot_loss(epoch_train_loss=epoch_train_loss, epoch_test_loss=epoch_test_loss)

plot_acc(epoch_train_acc=epoch_train_acc, epoch_test_acc=epoch_test_acc)

In [None]:
# Final test-set metrics for model 3 (name / loss / accuracy dict)
model_3_results = evaluate_model(
    model=model_3,
    data_loader=testloader,
    loss_function=loss_function,
    accuracy_fn=accuracy_fn,
    device=device,
)

# **COMPARACIÓN DE LOS MODELOS**

In [None]:
# One row per model: model_name / model_loss / model_acc
compare_results = pd.DataFrame([model_1_results, model_2_results, model_3_results])

# Horizontal bar chart of test accuracy per model; labels are set on the
# returned Axes rather than through the implicit pyplot state
graph = compare_results.set_index("model_name")["model_acc"].plot(kind="barh")
graph.set_xlabel("Precisión (%)")  # label spelling fixed (was "Presición")
graph.set_ylabel("Modelo")

# Accuracy is a percentage, so pin the axis to a linear 0-100 range
graph.set_xscale('linear')
graph.set_xlim([0, 100])