## Load Tiny ImageNet Data

In [None]:
# Mount the user's Google Drive at /content/drive (only works inside Colab,
# because it relies on the google.colab package).
# force_remount=True is passed so the mount is redone even if one already exists.
from google.colab import drive
drive.mount( '/content/drive', force_remount=True)

In [None]:
# Download the Tiny ImageNet archive into the current working directory, then
# unpack it quietly (-q) and list the extracted folder as a quick sanity check.
# NOTE(review): this unpacks to ./tiny-imagenet-200, while the cells below read
# from IMagenet/tiny-imagenet-200/ - confirm the working directory / move step.
!wget http://cs231n.stanford.edu/tiny-imagenet-200.zip
!unzip -q tiny-imagenet-200.zip && ls tiny-imagenet-200

## Format validation data


In [None]:
import os

def create_val_img_folder(path_to_val):
    '''
    Sort the flat validation images into one sub folder per class.

    Reads ``<path_to_val>/val/val_annotations.txt`` (tab separated; the first
    column is the image file name, the second one the class id) and moves every
    listed image from ``<path_to_val>/val/images`` to
    ``<path_to_val>/val_new/<class id>/images/``.

    Images that are no longer present in the source folder are skipped, so the
    function can safely be called again after a partial or complete run.

    Args:
        path_to_val: dataset root directory, i.e. the folder containing ``val``.

    Raises:
        FileNotFoundError: if the annotation file does not exist.
    '''
    dataset_dir = os.path.join(path_to_val)
    val_dir = os.path.join(dataset_dir, 'val')
    img_dir = os.path.join(val_dir, 'images')

    # image file name -> class id
    val_img_dict = {}
    with open(os.path.join(val_dir, 'val_annotations.txt'), 'r') as fp:
        for line in fp:
            words = line.split('\t')
            # Blank or malformed lines carry no (image, class) pair.
            if len(words) < 2:
                continue
            val_img_dict[words[0]] = words[1].strip()

    # The target folder is derived from the dataset root instead of being
    # hard-coded, so the function works for any dataset location.
    val_image_dir = os.path.join(dataset_dir, 'val_new')
    for img, folder in val_img_dict.items():
        newpath = os.path.join(val_image_dir, folder, 'images')
        os.makedirs(newpath, exist_ok=True)
        source = os.path.join(img_dir, img)
        if os.path.exists(source):
            os.rename(source, os.path.join(newpath, img))

# Regroup the validation images by class under val_new/ (images already moved are skipped).
create_val_img_folder('IMagenet/tiny-imagenet-200/')

In [None]:
# Sanity check: list the images that were moved into one class folder of val_new.
! cd IMagenet/tiny-imagenet-200/val_new/n01443537/images && ls

## Get name for each label



In [None]:
def Get_label_name(path):
    """
    Build a lookup from class id to label text out of ``<path>/words.txt``.

    Each non-empty line is split on whitespace: the first token is used as the
    key and the remaining tokens, re-joined with single spaces, as the value.
    Blank lines are ignored.

    Args:
        path: directory that contains ``words.txt``.

    Returns:
        dict mapping the first token of every line to the rest of that line.

    Raises:
        FileNotFoundError: if ``words.txt`` does not exist in ``path``.
    """
    dict_labelsAll = {}
    with open(os.path.join(path, 'words.txt'), 'r') as fp:
        for line in fp:
            tokens = line.split()
            if not tokens:
                continue
            dict_labelsAll[tokens[0]] = " ".join(tokens[1:])

    return dict_labelsAll

# Lookup from every id listed in words.txt to its label text.
dict_all_labels = Get_label_name('IMagenet/tiny-imagenet-200/')

In [None]:
import os
def Get_200label_name(path, dict_labelsAll):
    """
    Collect the class ids used in ``<path>/val_annotations.txt`` and name them.

    The second whitespace-separated token of every annotation line is taken as
    the class id. The distinct ids are sorted, printed, and then numbered
    0..N-1 in that sorted order.

    Args:
        path: directory that contains ``val_annotations.txt``.
        dict_labelsAll: mapping from class id to label text.

    Returns:
        Tuple ``(dict_labels200, label_list)`` where ``label_list`` is the
        sorted list of distinct class ids and ``dict_labels200`` maps the
        position of each id in that list to its label text.

    Raises:
        FileNotFoundError: if the annotation file does not exist.
        KeyError: if a class id is missing from ``dict_labelsAll``.
    """
    distinct_ids = set()
    with open(os.path.join(path, 'val_annotations.txt'), 'r') as fp:
        for line in fp:
            tokens = line.split()
            # Blank or truncated lines have no class id column.
            if len(tokens) < 2:
                continue
            distinct_ids.add(tokens[1])

    label_list = sorted(distinct_ids)
    print(label_list)

    dict_labels200 = {
        index: dict_labelsAll[label] for index, label in enumerate(label_list)
    }

    return dict_labels200, label_list

# labels_200: sorted class ids found in the validation annotations;
# dict_200_labels: position in that sorted list -> label text.
dict_200_labels, labels_200 = Get_200label_name('IMagenet/tiny-imagenet-200/val', dict_all_labels)

## Prepare test data

In [None]:
import os

def create_test_folder(path, labels_200, test_img_size=50):
    """
    Carve a held-out test split out of the training images.

    For every class id in ``labels_200`` files are moved from
    ``<path>/train/<id>/images`` to ``<path>/test_images/<id>/images`` until the
    test folder of that class holds ``test_img_size`` files.

    * Files are taken in sorted name order, so the split is reproducible
      (``os.listdir`` order is arbitrary).
    * Files already present in the test folder count towards the quota, so
      re-running the function does not drain further images from ``train``.
    * Files whose name contains ``boxes`` are never moved.

    Args:
        path: dataset root directory (contains ``train``).
        labels_200: iterable of class ids (sub folder names of ``train``).
        test_img_size: number of test images wanted per class.

    Raises:
        FileNotFoundError: if a class folder is missing below ``train`` while
            images still have to be moved for it.
    """
    train_dir = os.path.join(path, 'train')
    test_image_dir = os.path.join(path, 'test_images')

    for label in labels_200:
        label_directory = os.path.join(train_dir, label, 'images')
        test_label = os.path.join(test_image_dir, label, 'images')

        already_moved = len(os.listdir(test_label)) if os.path.isdir(test_label) else 0
        still_needed = test_img_size - already_moved
        if still_needed <= 0:
            continue

        candidates = [
            name for name in sorted(os.listdir(label_directory))
            if 'boxes' not in name
        ]
        selected = candidates[:still_needed]
        if not selected:
            # Do not leave an empty class folder behind.
            continue

        os.makedirs(test_label, exist_ok=True)
        for name in selected:
            os.rename(os.path.join(label_directory, name),
                      os.path.join(test_label, name))

# Move training images of each of the listed classes into test_images/ to form a test split.
create_test_folder('IMagenet/tiny-imagenet-200/', labels_200)

In [None]:
# Sanity check: count the files that ended up in the test split for one class.
! cd IMagenet/tiny-imagenet-200/test_images/n01768244/images && ls | wc -l

In [None]:
# Sanity check: count the files that remain in the training folder of the same class.
! cd IMagenet/tiny-imagenet-200/train/n01768244/images && ls | wc -l

## Run PyTorch on the GPU

In [None]:
import torch
# Seed PyTorch's random number generator so runs are repeatable.
torch.manual_seed(0)
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Report which device was picked. get_device_name() raises without a GPU, so it
# is only queried when CUDA is actually available.
if device.type == "cuda":
    print(f'GPU: {torch.cuda.get_device_name(0)}')
else:
    print('GPU: none available, running on CPU (change the Colab runtime to get a GPU)')

## Define Model

In [None]:
import math
from tqdm.autonotebook import tqdm
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms

# Dataset root directory; Data uses it to locate the train/, val_new/ and
# test_images/ folders (note the trailing slash).
path = "IMagenet/tiny-imagenet-200/"

class Data():
    """
        Builds the train / validation / test DataLoaders.

        Attributes:
            transform: preprocessing applied to every image of every split.
            train_loader: shuffled loader over ``<root>/train``.
            val_loader: unshuffled loader over ``<root>/val_new``.
            test_loader: unshuffled loader over ``<root>/test_images``.
    """

    def __init__(self, root=None, batch_size=100):
        """
            Args:
                root: dataset root directory; defaults to the module-level
                    ``path`` so that ``Data()`` keeps working unchanged.
                batch_size: batch size used for all three loaders.
        """
        if root is None:
            root = path

        # TODO: ADD MORE IMAGE TRANSFORMATIONS?
        # (e.g. transforms.RandomRotation(20), transforms.RandomHorizontalFlip(0.5))
        # NOTE(review): this transform is shared by all splits, so random
        # augmentations added here would also hit validation and test data.
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            # Per-channel mean/std; presumably statistics of the training
            # images - TODO confirm.
            transforms.Normalize(mean=[0.4802, 0.4481, 0.3975], std=[0.2302, 0.2265, 0.2262]),
        ])

        # Init data loaders
        self.train_loader = self.load_data(os.path.join(root, 'train'), batchsize=batch_size, shuffle=True)
        self.val_loader = self.load_data(os.path.join(root, 'val_new'), batchsize=batch_size, shuffle=False)
        self.test_loader = self.load_data(os.path.join(root, 'test_images'), batchsize=batch_size, shuffle=False)

    def load_data(self, ImagefolderPath:str, batchsize:int, shuffle=True) -> torch.utils.data.DataLoader:
        """
            Wrap an image directory in a PyTorch DataLoader.

            OBS: Requires the folder layout expected by
            torchvision.datasets.ImageFolder.

            Args:
                ImagefolderPath: root folder of the split to load.
                batchsize: number of samples per batch.
                shuffle: whether the loader shuffles the samples.

            Returns:
                DataLoader over the folder, with ``self.transform`` applied
                and pinned host memory enabled.
        """
        imagenet_data = torchvision.datasets.ImageFolder(ImagefolderPath, transform=self.transform)
        return torch.utils.data.DataLoader(
            imagenet_data,
            batch_size=batchsize,
            shuffle=shuffle,
            pin_memory=True,
        )

In [None]:
class Block(nn.Module):
    """
        Residual block: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, plus a skip
        connection, followed by a final ReLU.

        Optional ``Dropout2d(p=0.5)`` layers are applied after each ReLU.
        Whether they exist is decided once, at construction time, and stored in
        ``self.use_dropout``; ``forward`` follows that stored flag, so it can
        never disagree with the layers that were actually built.
    """

    def __init__(self, input_dim, output_dim, stride=1, downsampling=False, dropout=None):
        """
            Args:
                input_dim: number of input channels.
                output_dim: number of output channels.
                stride: stride of the first 3x3 convolution.
                downsampling: if True, the skip connection goes through a
                    1x1 convolution + BN so its shape matches the main path.
                dropout: enable the dropout layers. ``None`` (default) falls
                    back to the module-level ``isDropout`` switch, or to
                    False when that switch is not defined.
        """
        super().__init__()
        if dropout is None:
            dropout = globals().get('isDropout', False)
        self.use_dropout = bool(dropout)

        # Dropout is applied out of place: the preceding ReLU works in place
        # and its output must stay intact for the backward pass.
        if self.use_dropout:
            self.dropout1 = nn.Dropout2d(p=0.5)
        self.conv1 = nn.Conv2d(input_dim, output_dim, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(output_dim)
        self.relu = nn.ReLU(inplace=True)
        if self.use_dropout:
            self.dropout2 = nn.Dropout2d(p=0.5)
        self.conv2 = nn.Conv2d(output_dim, output_dim, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(output_dim)

        self.downsampling = downsampling

        if downsampling:
            # The shortcut uses the same stride as conv1 so that both paths
            # produce the same spatial size for any stride.
            self.downsample = nn.Sequential(
                nn.Conv2d(input_dim, output_dim, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(output_dim),
            )

    def forward(self, x):
        """
            Apply the block to ``x`` and return the activated sum of the main
            path and the (optionally projected) skip connection.
        """
        identity = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        if self.use_dropout:
            x = self.dropout1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        if self.downsampling:
            identity = self.downsample(identity)
        x += identity
        x = self.relu(x)
        if self.use_dropout:
            x = self.dropout2(x)
        return x

In [None]:
class Resnet(nn.Module):
    """
        18-layer ResNet architecture

        Layout: 7x7 stride-2 convolution -> BN -> ReLU -> 3x3 stride-2 max
        pooling -> four stages of two ``Block``s each (64, 128, 256 and 512
        channels; stages 2-4 halve the resolution) -> global average pooling
        -> one fully connected classification layer.
    """

    def __init__(self, out_classes=200):
        """
            Args:
                out_classes: number of classes, i.e. size of the output layer.
        """
        super().__init__()

        self.relu = nn.ReLU(inplace=True)
        # padding=1 as in the torchvision reference ResNet, so the pooling
        # layer halves the resolution exactly.
        self.maxpooling = nn.MaxPool2d(3, stride=2, padding=1)

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # 3x3 conv stage - dimension 64
        self.conv2 = nn.Sequential(
            Block(input_dim=64, output_dim=64, stride=1),
            Block(input_dim=64, output_dim=64, stride=1),
        )

        # 3x3 conv stage - dimension 128
        self.conv3 = nn.Sequential(
            Block(input_dim=64, output_dim=128, stride=2, downsampling=True),
            Block(input_dim=128, output_dim=128, stride=1),
        )

        # 3x3 conv stage - dimension 256
        self.conv4 = nn.Sequential(
            Block(input_dim=128, output_dim=256, stride=2, downsampling=True),
            Block(input_dim=256, output_dim=256, stride=1),
        )

        # 3x3 conv stage - dimension 512
        self.conv5 = nn.Sequential(
            Block(input_dim=256, output_dim=512, stride=2, downsampling=True),
            Block(input_dim=512, output_dim=512, stride=1),
        )

        # average pooling => averages each feature map down to a single value
        self.avg_pool = nn.AdaptiveAvgPool2d(output_size=(1,1))
        # fully connected layer => classification layer
        self.fc1 = nn.Linear(512, out_classes, bias=True)

    def forward(self, x):
        """
            Forward step: returns the raw class scores (logits) for ``x``.
        """
        x = self.conv1(x)
        # Stem normalisation and activation (bn1 / relu are created in
        # __init__ for exactly this purpose).
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpooling(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.avg_pool(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        return x

    def fit(self, epochs=10, verbose=True, validation=True, train_loader=None, val_loader=None):
        """
            Train the network with SGD and cross-entropy loss.

            Args:
                epochs: number of passes over the training data.
                verbose: print training loss/accuracy after every epoch.
                validation: evaluate (and print) validation loss/accuracy
                    after every epoch via ``validation_scores``.
                train_loader: training DataLoader; defaults to the
                    module-level ``data.train_loader``.
                val_loader: validation DataLoader; defaults to the
                    module-level ``data.val_loader``.

            Returns:
                Tuple ``(training_losses, training_accuracy,
                validation_accuracy, validation_losses, epochs)``. The four
                lists hold one value per epoch (the validation lists stay
                empty when ``validation`` is False); accuracies are in percent.
        """
        # NOTE: CrossEntropyLoss applies log-softmax itself, so forward()
        # returns raw scores.
        criterion = nn.CrossEntropyLoss()

        # Hyper-parameters "as used in the paper" (per the original author).
        # Other variants that were tried: SGD without weight decay (dropout
        # runs) and Adadelta(lr=0.1, weight_decay=0.0001).
        optimizer = optim.SGD(self.parameters(), lr=0.1, momentum=0.9, weight_decay=0.0001)

        # Dataloaders
        if train_loader is None:
            train_loader = data.train_loader
        if validation and val_loader is None:
            val_loader = data.val_loader

        # Feed the batches to whatever device the model lives on.
        model_device = next(self.parameters()).device

        training_losses = []
        training_accuracy = []
        validation_losses = []
        validation_accuracy = []

        for epoch in range(epochs):
            # training mode (validation switches the model to eval mode)
            self.train()
            training_loss = 0
            correct = 0
            total = 0
            for x, y in tqdm(train_loader, desc="Epoch {}".format(epoch + 1)):
                optimizer.zero_grad()

                x = x.to(model_device)
                y = y.to(model_device)

                # Forward propagation
                outputs = self(x)
                loss = criterion(outputs, y)
                # Backward propagation
                loss.backward()
                optimizer.step()

                training_loss += loss.item()

                # Running training accuracy
                predicted = outputs.argmax(dim=1)
                total += y.size(0)
                correct += (predicted == y).sum().item()

            # Accuracy in percent and mean loss per batch
            train_acc = 100 * (correct / total)
            training_loss /= len(train_loader)

            # Print validation scores
            if validation:
                val_loss, val_acc = validation_scores(self, val_loader, criterion)
                print(f'Validation Loss: {val_loss}')
                print(f'Validation Accuracy: {val_acc}')
                validation_losses.append(val_loss)
                validation_accuracy.append(val_acc)

            # Print training scores
            if verbose:
                print(f'Training Loss: {training_loss}')
                print(f'Training Accuracy: {train_acc}')

            # Store training scores
            training_losses.append(training_loss)
            training_accuracy.append(train_acc)

        return training_losses, training_accuracy, validation_accuracy, validation_losses, epochs

In [None]:
def validation_scores(net, val_loader, loss_function):
    """
        Calculates validation loss and accuracy

        The network is switched to evaluation mode (and left in it) and is
        run without gradient tracking.

        Args:
            net: the model to evaluate.
            val_loader: iterable of ``(inputs, targets)`` batches that
                supports ``len()``.
            loss_function: callable ``(outputs, targets) -> loss tensor``.

        Returns:
            Tuple ``(mean loss per batch, accuracy in percent)``.
    """
    correct = 0
    total = 0
    loss = 0

    # Run the batches on the device the model lives on.
    first_param = next(net.parameters(), None)
    net_device = first_param.device if first_param is not None else torch.device('cpu')

    # Evaluation mode: dropout off, batch norm uses its running statistics.
    net.eval()
    # since we're not training, we don't need to calculate the gradients for our outputs
    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(net_device)
            y = y.to(net_device)
            # Forward propagation
            outputs = net(x)
            loss += loss_function(outputs, y).item()
            # Calculate accuracy
            predicted = outputs.argmax(dim=1)
            total += y.size(0)
            correct += (predicted == y).sum().item()

    # Validation Accuracy
    acc = 100 * (correct / total)
    return loss/len(val_loader), acc

In [None]:
def test_acc(net, test_loader):#, scheduler):
  """
      Calculates validation loss and accuracy
  """
  correct = 0
  total = 0

  # since we're not training, we don't need to calculate the gradients for our outputs
  with torch.no_grad():
      for x, y in test_loader:  
          x = x.to(device)
          y = y.to(device)
          # Forward propagation
          isDropout = False
          outputs = net(x)

          # Calculate accuracy
          _, predicted = torch.max(outputs.data, 1)
          total += y.size(0)
          correct += (predicted == y).sum().item()
          
      
  # Validation Accuracy
  acc =  100 * (correct / total)
  return acc

## Train and store model (To Drive)

In [None]:
# Global switch read by Block: dropout layers are only created and applied when it is True.
isDropout = False
# Build the network (200 output classes by default).
resnet = Resnet()
# Build the train/val/test loaders; fit() accesses them through the global name `data`.
data = Data()
# Move the model to the selected device before training.
resnet.to(device)
# Train for 20 epochs and keep the per-epoch histories for the plots below.
# NOTE(review): the section title mentions storing the model to Drive, but
# nothing in this cell saves it - confirm whether a save step is missing.
training_losses, training_accuracy, validation_accuracy, validation_losses, epochs = resnet.fit(epochs=20)

## Visualization

In [None]:
import matplotlib.pyplot as plt

def plot_losses(training_loss: list, validation_loss: list, epochs: list,
                title: str = 'Training/Validation loss without dropout') -> None:
  """
      Plot the training and validation loss per epoch.

      Args:
          training_loss: one training loss value per epoch.
          validation_loss: one validation loss value per epoch.
          epochs: x-axis values, one per epoch (any sequence, e.g. a range).
          title: plot title; override it for runs with dropout enabled.
  """
  fig, ax = plt.subplots()
  # Plain tick labels: no offset and no scientific notation.
  ax.ticklabel_format(useOffset=False, style='plain')
  ax.plot(epochs, training_loss, label='Training loss')
  ax.plot(epochs, validation_loss, label='Validation loss')
  ax.set_title(title)
  ax.set_xlabel('Epochs')
  ax.set_ylabel('Loss')
  ax.legend()
  plt.show()
  # Release the figure instead of leaving a cleared one behind.
  plt.close(fig)


# x-axis: epoch indices 0..epochs-1 (`epochs` is the count returned by fit()).
plot_losses(training_losses, validation_losses, range(epochs))

In [None]:
import matplotlib.pyplot as plt

def plot_accuracy(training_accuracy: list, validation_accuracy: list, epochs: list,
                  title: str = 'Training/Validation Accuracy without dropout') -> None:
  """
      Plot the training and validation accuracy per epoch.

      Args:
          training_accuracy: one training accuracy value per epoch.
          validation_accuracy: one validation accuracy value per epoch.
          epochs: x-axis values, one per epoch (any sequence, e.g. a range).
          title: plot title; override it for runs with dropout enabled.
  """
  fig, ax = plt.subplots()
  # Plain tick labels: no offset and no scientific notation.
  ax.ticklabel_format(useOffset=False, style='plain')
  ax.plot(epochs, training_accuracy, label='Training Accuracy')
  ax.plot(epochs, validation_accuracy, label='Validation Accuracy')
  ax.set_title(title)
  ax.set_xlabel('Epochs')
  ax.set_ylabel('Accuracy')
  ax.legend()
  plt.show()
  # Release the figure instead of leaving a cleared one behind.
  plt.close(fig)


# x-axis: epoch indices 0..epochs-1 (`epochs` is the count returned by fit()).
plot_accuracy(training_accuracy, validation_accuracy, range(epochs))

## Calculate the mean and std for the model runs

In [None]:
import numpy as np
# Scores of the individual model runs (hand-entered values).
run_accuracies = [31.785, 34, 31.76]
# Mean over the runs, followed by their standard deviation (NumPy's default,
# i.e. the population standard deviation).
print(np.mean(run_accuracies))
print(np.std(run_accuracies))