## Perez Llera Leonardo
## 6BM1

In [None]:
# Importamos paquetes
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

import matplotlib.pyplot as plt
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch import optim
from torchvision import datasets, transforms
from torch.autograd import Variable

In [None]:
# Select the compute device: use CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

In [None]:
# Hyperparameters
batchSize, epochs, learningRate = (
    64,     # batch size handed to the data loaders
    25,     # number of passes over the training loader
    0.001,  # learning rate handed to the optimizer
)

# Bookkeeping for the training loop
steps = runningLoss = 0  # batches processed so far / loss accumulated since the last report
printEvery = 40          # a validation report is printed every `printEvery` steps

In [None]:
# Preprocessing pipeline applied to every image
transform = transforms.Compose([
    transforms.Resize(32),               # resize to 32
    transforms.ToTensor(),               # convert to a tensor
    transforms.Normalize([0.5], [0.5]),  # normalize with mean 0.5 and std 0.5
])

# Training split: download it if needed and wrap it in a shuffling DataLoader
trainset = datasets.FashionMNIST(
    'F_MNIST_data/', download=True, train=True, transform=transform
)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batchSize, shuffle=True
)

# Validation split, loaded the same way
testset = datasets.FashionMNIST(
    'F_MNIST_data/', download=True, train=False, transform=transform
)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batchSize, shuffle=True
)

# Report the size of each split
print('Train data, number of images: ', len(trainset))
print('Test data, number of images: ', len(testset))

# Human-readable class names, positioned by label index
classes = [
    'T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot',
]

# Pull a single batch of training examples
dataiter = iter(trainloader)
images, labels = next(dataiter)
images = images.numpy()

displaySize = 10

# Plot the first `displaySize` examples, each titled with its class name
fig = plt.figure(figsize=(25, 4))
for idx in range(displaySize):
    ax = fig.add_subplot(2, displaySize, idx + 1, xticks=[], yticks=[])
    ax.imshow(images[idx].squeeze(), cmap='gray')  # drop the singleton channel axis
    ax.set_title(classes[labels[idx]])

## Bloque residual
<img src="https://miro.medium.com/v2/resize:fit:786/format:webp/1*foG-iCktwwuQPfepfKyvUA.png" alt="Diagrama de un bloque residual">

In [None]:
def convK3(inChannel, outChannel, stride):
    """Build a 3x3 convolution with padding 1.

    Args:
        inChannel: number of input channels.
        outChannel: number of output channels.
        stride: stride of the convolution.

    Returns:
        A new ``nn.Conv2d`` with ``kernel_size=3`` and ``padding=1``.
    """
    return nn.Conv2d(inChannel, outChannel, kernel_size=3, stride=stride, padding=1)

# Residual block implementation
class residualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a skip connection added before the final ReLU.

    Args:
        inChannel: number of channels of the input.
        outChannel: number of channels produced by the block.
        stride: stride of the first convolution (and of the shortcut
            projection, when there is one).
        changeSize: when True the shortcut is a 1x1 convolution followed by
            batch norm; when False the input itself is used as the shortcut.
    """

    def __init__(self, inChannel, outChannel, stride=1, changeSize=True):
        super().__init__()

        # Main path: conv -> bn -> (relu) -> conv -> bn
        self.conv1 = convK3(inChannel, outChannel, stride)
        self.bn1 = nn.BatchNorm2d(outChannel)
        self.conv2 = convK3(outChannel, outChannel, 1)
        self.bn2 = nn.BatchNorm2d(outChannel)

        # Shortcut path: only built when the input has to be projected
        self.changeSize = changeSize
        if changeSize:
            projection = nn.Conv2d(inChannel, outChannel, kernel_size=1, stride=stride)
            self.residual = nn.Sequential(projection, nn.BatchNorm2d(outChannel))

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = self.residual(x) if self.changeSize else x

        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += shortcut

        return F.relu(out)

# ResNet56 implementation
class ResNet56(nn.Module):
    """Residual network built from three stages of ``n`` residual blocks each.

    The layout is one 3x3 convolution, three stages with 16, 32 and 64
    channels (the second and third stages start with a stride-2 block),
    global average pooling and a linear classifier. Every block holds two
    convolutions, so the depth is ``6 * n + 2``; the default ``n=9`` gives
    56 layers.

    Args:
        n: number of residual blocks per stage. Each stage always contains
            at least its first block.
        nClases: number of output classes.
    """

    def __init__(self, n=9, nClases=10):
        super(ResNet56, self).__init__()

        self.conv1 = convK3(1, 16, stride=1)
        self.bn1 = nn.BatchNorm2d(16)
        # Honour the requested number of blocks per stage instead of a hard-coded 9.
        self.block1 = self.createBlock(n=n, inChannel=16, outChannel=16, stride=1, changeSize=False)
        self.block2 = self.createBlock(n=n, inChannel=16, outChannel=32, stride=2)
        self.block3 = self.createBlock(n=n, inChannel=32, outChannel=64, stride=2)

        self.fc = nn.Linear(64, nClases)

    def createBlock(self, n, inChannel, outChannel, stride, changeSize=True):
        """Build one stage: a first block that may change size, then ``n - 1`` identity blocks.

        Args:
            n: total number of residual blocks in the stage.
            inChannel: channels entering the stage.
            outChannel: channels produced by every block of the stage.
            stride: stride of the first block; the remaining blocks use stride 1.
            changeSize: whether the first block projects its shortcut.

        Returns:
            An ``nn.Sequential`` holding the blocks in order.
        """
        block = [residualBlock(inChannel, outChannel, stride, changeSize=changeSize)]
        block.extend(
            residualBlock(outChannel, outChannel, stride=1, changeSize=False)
            for _ in range(n - 1)
        )

        return nn.Sequential(*block)

    def forward(self, x):
        """Forward pass; returns per-class log-probabilities (``log_softmax`` over dim 1)."""
        y = F.relu(self.bn1(self.conv1(x)))
        y = self.block1(y)
        y = self.block2(y)
        y = self.block3(y)
        y = F.adaptive_avg_pool2d(y, 1)     # global average pooling to 1x1
        y = self.fc(y.view(y.size(0), -1))  # flatten to (batch, features) for the classifier

        return F.log_softmax(y, dim=1)

In [None]:
model = ResNet56()  # instantiate the network
model = model.to(device=device)

# The network's forward pass already returns log-probabilities (log_softmax),
# so the matching criterion is NLLLoss. CrossEntropyLoss would apply
# log_softmax a second time, which is redundant.
criterio = loss_fn = nn.NLLLoss()

optimizer = optim.Adam(model.parameters(), lr=learningRate)  # instantiate the optimizer
# Alternative optimizer:
# optimizer = optim.SGD(model.parameters(), lr=learningRate, momentum=0.95, weight_decay=1e-4)

print(model)

In [None]:
# Evaluation helper
def validation(model, testloader, criterio):
    """Accumulate loss and accuracy of ``model`` over every batch of ``testloader``.

    Batches are moved to the device that holds the model's parameters, so
    the function does not rely on any module-level state. Gradients are
    disabled while evaluating. The model's train/eval mode is NOT changed
    here; the caller is expected to switch to ``model.eval()`` beforehand.

    Args:
        model: network whose output is one score per class along dim 1.
        testloader: iterable yielding ``(images, labels)`` batches.
        criterio: loss function called as ``criterio(output, labels)``.

    Returns:
        A ``(testLoss, accuracy)`` tuple of Python floats. Both are SUMS over
        batches (sum of batch losses, sum of per-batch mean accuracies);
        divide by ``len(testloader)`` to get averages.
    """
    # Follow the model's own device; a model without parameters leaves the
    # batches where they are.
    firstParam = next(model.parameters(), None)
    target = firstParam.device if firstParam is not None else None

    testLoss = 0.0
    accuracy = 0.0
    with torch.no_grad():
        for images, labels in testloader:
            if target is not None:
                images = images.to(device=target)
                labels = labels.to(device=target)

            # Call the module itself (not .forward) so registered hooks run.
            output = model(images)
            testLoss += criterio(output, labels).item()

            # The arg-max of the scores is the predicted class; exponentiating
            # first would not change which index is largest.
            predicted = output.argmax(dim=1)
            accuracy += (predicted == labels).float().mean().item()

    return testLoss, accuracy

In [None]:
# Before gradient descent and training, check the accuracy the network
# reaches without having been trained.

# Evaluate in eval mode with gradients disabled, so the BatchNorm running
# statistics are not updated with test data and no autograd graph is built.
# The previous mode is restored afterwards.
wasTraining = model.training
model.eval()

correct = 0
total = 0
with torch.no_grad():
    for images, labels in testloader:
        images = images.to(device=device)
        labels = labels.to(device=device)

        outputs = model(images)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

model.train(wasTraining)
accuracy = 100 * correct / total

print("Porcentaje de exactitud antes de entrenar:", accuracy)

In [None]:
# Gradient descent
for epoch in range(epochs):
    model.train()  # switch to training mode
    for images, labels in trainloader:
        steps += 1
        optimizer.zero_grad()

        # Tensors take part in autograd directly; the deprecated Variable
        # wrapper is not needed.
        images = images.to(device=device)
        labels = labels.to(device=device)

        # Call the module itself (not .forward) so registered hooks run.
        output = model(images)
        loss = criterio(output, labels)
        loss.backward()   # backpropagation
        optimizer.step()  # parameter update

        runningLoss += loss.item()

        if steps % printEvery == 0:
            model.eval()  # switch to evaluation mode

            # Disable gradients: saves memory and computation
            with torch.no_grad():
                testLoss, accuracy = validation(model, testloader, criterio)

            print("Epoch: {}/{}.. ".format(epoch+1, epochs),
                  "Training Loss: {:.3f}.. ".format(runningLoss/printEvery),
                  "Test Loss: {:.3f}.. ".format(testLoss/len(testloader)),
                  "Test Accuracy: {:.3f}".format(accuracy/len(testloader)))

            runningLoss = 0
            model.train()  # back to training mode

In [None]:
dataiter = iter(testloader)
images, labels = next(dataiter)

images = images.to(device=device)
labels = labels.to(device=device)

# Get predictions. Inference runs in eval mode with gradients disabled:
# `Variable(..., volatile=True)` no longer has any effect, and the training
# loop leaves the model in training mode.
model.eval()
with torch.no_grad():
    preds = model(images).argmax(dim=1).cpu().numpy()

# Bring everything back to host memory as NumPy arrays for plotting and
# for comparing predictions against the true labels.
images = images.cpu().numpy()
labels = labels.cpu().numpy()

# Plot the images in the batch, along with predicted and true labels
fig = plt.figure(figsize=(25, 4))
for idx in range(displaySize):
    ax = fig.add_subplot(2, displaySize, idx+1, xticks=[], yticks=[])
    ax.imshow(np.squeeze(images[idx]), cmap='gray')
    ax.set_title("{} ({})".format(classes[preds[idx]], classes[labels[idx]]),
                 color=("green" if preds[idx]==labels[idx] else "red"))