<a href="https://colab.research.google.com/github/DAG00DAG33/ML-Languages/blob/master/Pytorch/PytorchCheatSheet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:


import torch #
import torch.nn as nn #
import torch.optim as optim #

import numpy as np

from PIL import Image #
import matplotlib.pyplot as plt #

import torchvision
import torchvision.transforms as transforms #


# Prefer the GPU when PyTorch can see one; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Load Data


## Load image

In [0]:
# Size, in pixels, handed to transforms.Resize.
imsize = 512

# Preprocessing applied to every loaded image: resize, then convert to a tensor.
# Optional extra step: transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
loader = transforms.Compose(
    [
        transforms.Resize(imsize),
        transforms.ToTensor(),
    ]
)


def image_loader(image_name):
    """Load an image file as a float tensor with a leading batch dimension.

    Args:
        image_name: path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        The image after the module-level ``loader`` pipeline, with a batch
        dimension of size 1 prepended, moved to ``device`` as ``torch.float``.
    """
    # The context manager releases the file handle as soon as the pixels are read.
    with Image.open(image_name) as source:
        # Force three channels so RGBA, palette or greyscale files do not
        # produce a channel count the 3-channel networks in this notebook reject.
        image = source.convert("RGB")
    # fake batch dimension required to fit network's input dimensions
    batch = loader(image).unsqueeze(0)
    return batch.to(device, torch.float)

## Data pipeline

### De archivo: CIFAR10

In [0]:
# Convert every image to a tensor, then normalise each of the three channels
# with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# `torchvision.datasets` is reached through the top-level package, so the
# notebook must `import torchvision` itself: `import torchvision.transforms as
# transforms` only binds the name `transforms`.

# Training split: downloaded into ./data on first use, shuffled on every pass.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=4,
                                          shuffle=True, num_workers=2)

# Test split: same preprocessing, fixed order.
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4,
                                         shuffle=False, num_workers=2)

### A partir de un array de NumPy

TODO: hay que ver cómo unirlo al transformer para hacer data augmentation.

In [0]:
# Reorder the axes to channels-first, e.g. (-1, 3, 64, 64): axis 3 moves to position 1.
X_train, X_test = (
    np.moveaxis(split, source=3, destination=1) for split in (X_train, X_test)
)

In [0]:
def _to_float_tensor(array):
    """Copy an array into a float32 tensor with a single vectorised call."""
    # Replaces building one torch.Tensor per sample in a Python loop and
    # stacking them; the result has the same dtype (float32).
    return torch.tensor(array, dtype=torch.float32)


# Training split: shuffled so each epoch visits the samples in a new order.
tensor_x_train = _to_float_tensor(X_train)
tensor_y_train = _to_float_tensor(Y_train)

my_train_dataset = torch.utils.data.TensorDataset(tensor_x_train, tensor_y_train)
trainloader = torch.utils.data.DataLoader(my_train_dataset, batch_size=BATCH_SIZE, shuffle=True)


# Test split: kept in a fixed order, like the CIFAR10 test loader, so that
# evaluation runs are repeatable.
tensor_x_test = _to_float_tensor(X_test)
tensor_y_test = _to_float_tensor(Y_test)

my_test_dataset = torch.utils.data.TensorDataset(tensor_x_test, tensor_y_test)
testloader = torch.utils.data.DataLoader(my_test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [0]:
# Pull a single batch out of the training loader for inspection.
dataiter = iter(trainloader)
# Use the builtin next(): the iterator's .next() method is not available in
# current PyTorch releases, while next() works on every version.
images, labels = next(dataiter)

# Show image

In [0]:
# Converts a tensor back into a PIL image.
unloader = transforms.ToPILImage()


def imshow(tensor, title=None):
    """Draw a tensor as an image in a new matplotlib figure.

    Args:
        tensor: tensor to display; when it has four dimensions, dimension 0
            is squeezed away before conversion.
        title: optional figure title; nothing is set when it is ``None``.
    """
    plt.figure()
    # Work on a CPU copy so the caller's tensor is never modified.
    picture = tensor.cpu().clone()
    if picture.dim() == 4:
        # remove the fake batch dimension if needed
        picture = picture.squeeze(0)
    plt.imshow(unloader(picture))
    if title is None:
        return
    plt.title(title)



# Display the first element of the `images` batch.
imshow(images[0])

# Import VGG model

In [0]:
import torchvision.models as models

# Keep only the `features` sub-module of VGG19 (built with pretrained=True),
# move it to `device` and switch it to eval mode.
cnn = models.vgg19(pretrained=True).features.to(device).eval()

# Author's note on eval() versus torch.no_grad(). As the last expression of the
# cell, this string is also what the cell echoes as its output.
"""Hi, These two have different goals: model.eval() will notify all your layers that you are in eval mode, that way, batchnorm or dropout layers will work in 
eval mode instead of training mode. torch.no_grad() impacts the autograd engine and deactivate it. It will reduce memory usage and speed up"""

'Hi, These two have different goals: model.eval() will notify all your layers that you are in eval mode, that way, batchnorm or dropout layers will work in \neval mode instead of training mode. torch.no_grad() impacts the autograd engine and deactivate it. It will reduce memory usage and speed up'

# Model

In [0]:
class Flatten(nn.Module):
    """Collapse every dimension after the first into a single one."""

    def forward(self, input):
        """Return ``input`` viewed as ``(input.size(0), -1)``."""
        batch_size = input.shape[0]
        return input.view(batch_size, -1)

In [0]:
class Reshape(nn.Module):
    """Module that views its input with a shape fixed at construction time."""

    def __init__(self, *dims):
        """Store the positional arguments as the target shape tuple."""
        super().__init__()
        self.shape = dims

    def forward(self, x):
        """Return ``x`` viewed with the stored shape."""
        target_shape = self.shape
        return x.view(target_shape)

# Usage example:
# Reshape(-1, 240, 4, 4)

In [0]:
class Net(nn.Module):
    """Small convolutional classifier sized for 3-channel 64x64 inputs.

    Args:
        num_classes: number of output classes (width of the last linear layer).
        dropout_prob: probability handed to the ``nn.Dropout`` layer.

    NOTE(review): the pipeline ends in ``nn.Softmax``; ``nn.CrossEntropyLoss``
    normally takes unnormalised scores, so confirm the softmax is wanted when
    that criterion is used.
    """

    def __init__(self, num_classes, dropout_prob=0.3):
        super(Net, self).__init__()
        self.pipe = nn.Sequential(
            nn.BatchNorm2d(3),
            # 64x64 -> 31x31 : floor((64 + 2*1 - 5) / 2) + 1
            nn.Conv2d(in_channels=3, out_channels=8,
                      kernel_size=5, stride=2, padding=1),
            nn.ReLU(),
            # 31x31 -> 15x15 : floor((31 + 2*1 - 5) / 2) + 1
            nn.Conv2d(in_channels=8, out_channels=16,
                      kernel_size=5, stride=2, padding=1),
            Flatten(),
            nn.ReLU(),
            # 16 channels * 15 * 15 = 3600 flattened features; the batch-norm
            # width has to match the input width of the linear layer below.
            nn.BatchNorm1d(3600),
            nn.Linear(3600, 20),
            nn.ReLU(),
            nn.Linear(20, num_classes),
            nn.Dropout(p=dropout_prob),
            #nn.Sigmoid(),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        """Run ``x`` through the sequential pipeline and return its output."""
        return self.pipe(x)

# Build a 6-class network, place it on `device`, and push the `images` batch
# through it as a quick shape check (the result is the cell's output).
net = Net(6)
net = net.to(device)
net(images.to(device))


## summary

In [0]:
from torchsummary  import summary
# Print the torchsummary report for `net` with a (3, 64, 64) input shape.
summary(net, (3,64,64))  # call pattern: summary(model, input_shape)

# Load model

In [0]:
CHK_PATH = '/content/drive/My Drive/ML/Lenguajes/Pytorch/checkpoints/'
checkpoint_file = CHK_PATH + 'checkpoint.pth.tar'
cuda = torch.cuda.is_available()

# With a GPU the tensors are restored as saved; without one they are remapped
# onto the CPU. Both cases read the same file (the CPU branch used to read an
# undefined `resume_weights` name).
# NOTE(review): torch.load unpickles the file, so only load trusted checkpoints.
checkpoint = torch.load(checkpoint_file, map_location=None if cuda else 'cpu')

# Restore the training bookkeeping and both sets of model weights.
start_epoch = checkpoint['epoch']
best_epochLoss = checkpoint['best_epochLoss']
encoder.load_state_dict(checkpoint['encoder_dict'])
decoder.load_state_dict(checkpoint['decoder_dict'])
losses = checkpoint['losses']
# Report the file that was actually loaded instead of a placeholder.
print("=> loaded checkpoint '{}' (trained for {} epochs)".format(checkpoint_file, checkpoint['epoch']))

# Train

## Basic training

In [0]:

"""CrossEntropyLoss does not expect a one-hot encoded vector as the target, but class indices:

The input is expected to contain scores for each class.
input has to be a 2D Tensor of size (minibatch, C).
This criterion expects a class index (0 to C-1) as the target for each value of a 1D tensor of size minibatch."""

# Criterion read by train() through the module-level name `lossFunc`.
lossFunc = nn.CrossEntropyLoss()
# Alternative criteria:
#lossFunc = nn.BCELoss()
#lossFunc = nn.MSELoss()
# SGD with momentum over all parameters of `net`.
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
# Alternative optimizer:
#optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)


def train(model, trainloader, optimizer, epochs, device):
    """Train ``model`` with the module-level ``lossFunc`` criterion.

    Args:
        model: network to optimise; it is switched to training mode here.
        trainloader: iterable of batches whose items 0 and 1 are the inputs
            and the targets.
        optimizer: optimizer that updates the parameters of ``model``.
        epochs: number of passes over ``trainloader``.
        device: device every batch is moved to before the forward pass.

    Returns:
        List with one reported loss value per ``n`` batches.

    Relies on the module-level names ``lossFunc`` and ``BATCH_SIZE``.
    """
    # test() puts the model in eval mode; switch back so dropout and
    # batch-norm behave as training layers again.
    model.train()
    n = 3  # report the running loss every n batches
    losses = []
    for epoch in range(epochs):
        # Start every epoch from zero so batches left over at the end of one
        # epoch do not inflate the first report of the next.
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # get the inputs
            images = data[0].to(device)
            labels = data[1].to(device)
            # If the criterion needs integer class indices, cast with:
            # labels = labels.to(device=device, dtype=torch.int64)

            optimizer.zero_grad()

            outputs = model(images)
            loss = lossFunc(outputs, labels)

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            if i % n == n - 1:
                # Parenthesised divisor: `running_loss / BATCH_SIZE*n` multiplied
                # by n instead of dividing. Same scale as the long-training loop.
                # NOTE(review): if the criterion already averages over the batch,
                # dividing by n alone gives the mean per-batch loss -- confirm.
                reported_loss = running_loss / (BATCH_SIZE * n)
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, (i + 1)*BATCH_SIZE, reported_loss))
                losses.append(reported_loss)
                running_loss = 0.0
    return losses


# Train `net` for 10 epochs and keep the reported losses.
losses = train(net,trainloader, optimizer, 10, device)

## Long training

In [0]:
# Criterion read by train() through the module-level name `lossFunc`.
lossFunc = nn.MSELoss()
# Alternatives:
#lossFunc = nn.BCELoss()
#optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# One Adam optimizer per sub-network, both with the same learning rate.
enc_optimizer, dec_optimizer = (
    torch.optim.Adam(module.parameters(), lr=1e-4) for module in (encoder, decoder)
)


def train(encoder, decoder, enc_optimizer, dec_optimazer, trainloader, epochs, device, losses=None, start_epoch=0, best_epochLoss=99999):
    """Train an encoder/decoder pair to reconstruct its own inputs.

    This definition shares its name with the basic ``train`` function, so
    running both cells leaves this one bound to ``train``.

    Args:
        encoder: module applied to each input batch.
        decoder: module applied to the encoder output.
        enc_optimizer: optimizer for the encoder parameters.
        dec_optimazer: optimizer for the decoder parameters (the misspelled
            name is kept so existing keyword callers keep working).
        trainloader: iterable of batches whose item 0 is the input tensor.
        epochs: number of epochs to run in this call.
        device: device every batch is moved to.
        losses: list the reported losses are appended to; a new list is
            created when omitted.
        start_epoch: index of the first epoch, used when resuming.
        best_epochLoss: best epoch loss seen before this call.

    Returns:
        Tuple ``(epochs + start_epoch, best_epochLoss, losses)``.

    Relies on the module-level names ``lossFunc`` and ``BATCH_SIZE``.
    """
    # Use the optimizer that was passed in; the body used to read a global
    # `dec_optimizer` and silently ignore this argument.
    dec_optimizer = dec_optimazer
    # A fresh list per call: a `[]` default would be shared between calls.
    if losses is None:
        losses = []
    # Make sure dropout / batch-norm layers run in training mode.
    encoder.train()
    decoder.train()
    n = 4  # report the running loss every n batches
    for epoch in range(start_epoch, epochs + start_epoch):
        epochLoss = 0
        # Reset per epoch so tail batches do not leak into the next epoch.
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # get the inputs
            images = data[0].to(device)
            enc_optimizer.zero_grad()
            dec_optimizer.zero_grad()

            enc = encoder(images)
            pre = decoder(enc)

            # Criteria take (prediction, target). MSE is symmetric, but the
            # order matters for criteria such as BCELoss.
            loss = lossFunc(pre, images)

            loss.backward()

            enc_optimizer.step()
            dec_optimizer.step()

            batch_loss = loss.item()
            running_loss += batch_loss
            # Accumulate every batch; adding only at report time dropped the
            # batches after the last report of the epoch.
            epochLoss += batch_loss
            if i % n == n-1:
                print('[%d, %5d] loss: %.7f' %
                      (epoch + 1, (i + 1)*BATCH_SIZE, running_loss / (BATCH_SIZE*n)))
                losses.append(running_loss / (BATCH_SIZE*n))
                running_loss = 0.0

        # Epoch 0 always sets the baseline; later epochs must improve on it.
        if epoch == 0 or epochLoss < best_epochLoss:
            best_epochLoss = epochLoss
    return epochs + start_epoch, best_epochLoss, losses


# Run 50 more epochs, continuing from the current start_epoch / best_epochLoss /
# losses values and storing the updated ones back under the same names.
start_epoch, best_epochLoss, losses = train(encoder, decoder, enc_optimizer, dec_optimizer, trainloader, 50, 
               device,losses=losses, start_epoch=start_epoch, best_epochLoss=best_epochLoss)

# Save Model

In [0]:
CHK_PATH = '/content/drive/My Drive/ML/Lenguajes/Pytorch/checkpoints/'


def save_checkpoint(state, is_best, filename=CHK_PATH + 'checkpoint.pth.tar'):
    """Write ``state`` to ``filename`` only when ``is_best`` is truthy.

    Args:
        state: object handed to ``torch.save``.
        is_best: whether the current result is a new best.
        filename: destination path; defaults to ``checkpoint.pth.tar`` under
            ``CHK_PATH``.
    """
    if not is_best:
        print ("=> Loss did not improve")
        return
    print ("=> Saving a new best")
    torch.save(state, filename)  # save checkpoint

Dentro de la función de entrenamiento, por ejemplo al final de cada epoch:

In [0]:

      # Decide whether this epoch is a new best BEFORE updating the record.
      # Epoch 0 always counts, so the first checkpoint is actually written;
      # overwriting best_epochLoss first made the comparison always False there.
      is_best = bool(epoch == 0 or epochLoss < best_epochLoss)
      if is_best:
          best_epochLoss = epochLoss
      save_checkpoint({
          # The training loop iterates `epoch` from start_epoch, so the number
          # of completed epochs is epoch + 1; adding start_epoch again would
          # count it twice.
          'epoch': epoch + 1,
          'encoder_dict': encoder.state_dict(),
          'decoder_dict': decoder.state_dict(),
          'best_epochLoss': best_epochLoss,
          'losses': losses
      }, is_best)
      print("[" + str(epochLoss) + "," + str(best_epochLoss) + "]")

# Test

In [0]:
def test(model, testloader, device, display=False):
  model.eval()
  correct = 0
  total = 0
  with torch.no_grad():
      for data in testloader:
          images, labels = data
          outputs = model(images.to(device))
          _, predicted = torch.max(outputs.data, 1)
          _, correctLabel = torch.max(labels.to(device), 1)
          total += labels.size(0)
          correct += (predicted == correctLabel).sum().item()
  if display:
    print('Accuracy of the network on the test images: %d %%' % (
        100 * correct / total))
  return correct/total
  
# Evaluate `net` on the test loader and print the accuracy.
test(net, testloader, device, display=True)