<a href="https://colab.research.google.com/github/davidemichelon11/DL_Assignment/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from os import makedirs
from os import listdir
from tqdm import tqdm
from google.colab import drive
import tensorflow as tf
import shutil

In [None]:
drive.mount('/content/gdrive')

In [None]:
!mkdir dataset
!cp "gdrive/My Drive/Adaptiope.zip" dataset/
!ls dataset
!unzip dataset/Adaptiope.zip
!rm -rf adaptiope_small

In [None]:
!mkdir adaptiope_small

In [None]:
classes = listdir("Adaptiope/product_images")
classes = ["/backpack", "/bookcase", "/car jack", "/comb", "/crown", "/file cabinet", "/flat iron", "/game controller", "/glasses",
           "/helicopter", "/ice skates", "/letter tray", "/monitor", "/mug", "/network switch", "/over-ear headphones", "/pen",
           "/purse", "/stand mixer", "/stroller"]
for d, td in zip(["Adaptiope/product_images", "Adaptiope/real_life"], ["adaptiope_small/product_images", "adaptiope_small/real_life"]):
  makedirs(td)
  for c in tqdm(classes):
    c_path = ''.join((d, c))
    c_target = ''.join((td, c))
    shutil.copytree(c_path, c_target)

In [None]:
import torch
import torchvision
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.tensorboard import SummaryWriter

In [None]:
def get_data(batch_size, img_root):
  # resizing and cropping
  # prepare data transformations for the train loader
  transform = list()
  transform.append(T.Resize((256, 256)))                      # resize each PIL image to 256 x 256
  transform.append(T.RandomCrop((224, 224)))                 # randomly crop a 224 x 224 patch
  transform.append(T.ToTensor())                              # convert Numpy to Pytorch Tensor
  transform.append(T.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225]))    # normalize with ImageNet mean
  transform = T.Compose(transform)                            # compose the above transformations into one
    
  # load data
  dataset = torchvision.datasets.ImageFolder(root=img_root, transform=transform)
  
  # create train and test splits (80/20)
  num_samples = len(dataset)
  training_samples = int(num_samples * 0.8 + 1)
  test_samples = num_samples - training_samples

  training_data, test_data = torch.utils.data.random_split(dataset, 
                                                           [training_samples, test_samples])

  # initialize dataloaders
  train_loader = torch.utils.data.DataLoader(training_data, batch_size, shuffle=True)
  test_loader = torch.utils.data.DataLoader(test_data, batch_size, shuffle=False)
  
  return train_loader, test_loader

In [None]:
def initialize_resnet(num_classes):

  # load the pre-trained Alexnet
  resnet = torchvision.models.resnet50(pretrained=True)
  
  # get the number of neurons in the second last layer
  in_features = resnet.fc.in_features
  
  # re-initalize the output layer
  resnet.fc = torch.nn.Linear(in_features=in_features, 
                                          out_features=num_classes)
  
  return resnet

In [None]:
def get_cost_function():
  cost_function = torch.nn.CrossEntropyLoss()
  return cost_function

In [None]:
def get_optimizer(model, lr, wd, momentum):
  
  # we will create two groups of weights, one for the newly initialized layer
  # and the other for rest of the layers of the network
  
  final_layer_weights = []
  rest_of_the_net_weights = []
  
  # iterate through the layers of the network
  for name, param in model.named_parameters():
    if name.startswith('fc'):
      final_layer_weights.append(param)
    else:
      rest_of_the_net_weights.append(param)
  
  # assign the distinct learning rates to each group of parameters
  optimizer = torch.optim.SGD([
      {'params': rest_of_the_net_weights},
      {'params': final_layer_weights, 'lr': lr}
  ], lr=lr / 10, weight_decay=wd, momentum=momentum)
  
  return optimizer

In [None]:
get_optimizer(initialize_resnet(20), 0.001, 0.1, 0.1)

In [None]:
def training_step(net, data_loader, optimizer, cost_function, device='cuda'):

  samples = 0.
  cumulative_loss = 0.
  cumulative_accuracy = 0.

  # set the network to training mode: particularly important when using dropout!
  net.train() 

  # iterate over the training set
  for batch_idx, (inputs, targets) in enumerate(data_loader):

    # load data into GPU
    inputs = inputs.to(device)
    targets = targets.to(device)
      
    # forward pass
    outputs = net(inputs)

    # loss computation
    loss = cost_function(outputs,targets)

    # backward pass
    loss.backward()
    
    # parameters update
    optimizer.step()
    
    # gradients reset
    optimizer.zero_grad()

    # fetch prediction and loss value
    samples += inputs.shape[0]
    cumulative_loss += loss.item()
    _, predicted = outputs.max(dim=1) # max() returns (maximum_value, index_of_maximum_value)

    # compute training accuracy
    cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_loss/samples, cumulative_accuracy/samples*100

def test_step(net, data_loader, cost_function, device='cuda'):

  samples = 0.
  cumulative_loss = 0.
  cumulative_accuracy = 0.

  # set the network to evaluation mode
  net.eval() 

  # disable gradient computation (we are only testing, we do not want our model to be modified in this step!)
  with torch.no_grad():

    # iterate over the test set
    for batch_idx, (inputs, targets) in enumerate(data_loader):
      
      # load data into GPU
      inputs = inputs.to(device)
      targets = targets.to(device)
        
      # forward pass
      outputs = net(inputs)

      # loss computation
      loss = cost_function(outputs, targets)

      # fetch prediction and loss value
      samples+=inputs.shape[0]
      cumulative_loss += loss.item() # Note: the .item() is needed to extract scalars from tensors
      _, predicted = outputs.max(1)

      # compute accuracy
      cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_loss/samples, cumulative_accuracy/samples*100

In [None]:
'''
Input arguments
  batch_size: Size of a mini-batch
  device: GPU where you want to train your network
  weight_decay: Weight decay co-efficient for regularization of weights
  momentum: Momentum for SGD optimizer
  epochs: Number of epochs for training the network
  num_classes: Number of classes in your dataset
  visualization_name: Name of the visualization folder
  img_root: The root folder of images
'''

def main(batch_size=30, 
         device='cuda:0', 
         learning_rate=0.001, 
         weight_decay=0.000001, 
         momentum=0.9, 
         epochs=50, 
         num_classes=65, 
         visualization_name='resnet_sgd', 
         img_root='adaptiope_small/product_images'):
  
  writer = SummaryWriter(log_dir="runs/exp1")

  # instantiates dataloaders
  train_loader, test_loader = get_data(batch_size=batch_size, img_root=img_root)
  
  # instantiates the model
  net = initialize_resnet(20).to(device)
  
  # instantiates the optimizer
  optimizer = get_optimizer(net, learning_rate, weight_decay, momentum)
  
  # instantiates the cost function
  cost_function = get_cost_function()

  # perform a preliminar step
  print('Before training:')
  train_loss, train_accuracy = test_step(net, train_loader, cost_function)
  test_loss, test_accuracy = test_step(net, test_loader, cost_function)

  print('\t Training loss {:.5f}, Training accuracy {:.2f}'.format(train_loss, train_accuracy))
  print('\t Test loss {:.5f}, Test accuracy {:.2f}'.format(test_loss, test_accuracy))
  print('-----------------------------------------------------')
  
  # add values to logger
  writer.add_scalar('Loss/train_loss', train_loss, 0)
  writer.add_scalar('Loss/test_loss', test_loss, 0)
  writer.add_scalar('Accuracy/train_accuracy', train_accuracy, 0)
  writer.add_scalar('Accuracy/test_accuracy', test_accuracy, 0)

  # range over the number of epochs
  for e in range(epochs):
    train_loss, train_accuracy = training_step(net, train_loader, optimizer, cost_function)
    test_loss, test_accuracy = test_step(net, test_loader, cost_function)
    print('Epoch: {:d}'.format(e+1))
    print('\t Training loss {:.5f}, Training accuracy {:.2f}'.format(train_loss, train_accuracy))
    print('\t Test loss {:.5f}, Test accuracy {:.2f}'.format(test_loss, test_accuracy))
    print('-----------------------------------------------------')
    
    # add values to logger
    writer.add_scalar('Loss/train_loss', train_loss, e + 1)
    writer.add_scalar('Loss/test_loss', test_loss, e + 1)
    writer.add_scalar('Accuracy/train_accuracy', train_accuracy, e + 1)
    writer.add_scalar('Accuracy/test_accuracy', test_accuracy, e + 1)

  # perform final test step and print the final metrics
  print('After training:')
  train_loss, train_accuracy = test_step(net, train_loader, optimizer, cost_function)
  test_loss, test_accuracy = test_step(net, test_loader, cost_function)

  print('\t Training loss {:.5f}, Training accuracy {:.2f}'.format(train_loss, train_accuracy))
  print('\t Test loss {:.5f}, Test accuracy {:.2f}'.format(test_loss, test_accuracy))
  print('-----------------------------------------------------')

  # close the logger
  writer.close()

In [None]:
main()