## Classification Notebook
### Deep Learning project - Task #1
*Sebastiano Chiari - Francesco Ferrini - Wamiq Raza*

### Setup

Import section

In [None]:
import zipfile
import os
from tqdm.notebook import tqdm

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as tt
from torchvision.io import read_image
from torchvision.utils import make_grid
from torchsummary import summary

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn import preprocessing
from skimage import io, transform

from torch.utils.tensorboard import SummaryWriter

import warnings
warnings.filterwarnings("ignore")

Global variables definition

In [None]:
# Root folder the dataset archive is extracted into, and file the best model is saved to.
DATASET_PATH = '/content/dataset/'
MODEL_PATH = '/content/classification_model.pt'

# Raw attribute annotations read by preprocessing().
ANNOTATIONS_CSV_PATH = os.path.join(DATASET_PATH, 'annotations_train.csv')

# Image folders.
TRAIN_FOLDER = os.path.join(DATASET_PATH, 'train/')
TEST_FOLDER = os.path.join(DATASET_PATH, 'test/')

# CSV files generated by this notebook (train / validation split and test file list).
TRAIN_CSV_PATH = os.path.join(DATASET_PATH, 'train_annotation.csv')
VALIDATION_CSV_PATH = os.path.join(DATASET_PATH, 'validation_annotations.csv')
TEST_CSV_PATH = os.path.join(DATASET_PATH, 'tests.csv')

In [None]:
# Share (in percent) of the annotated ids assigned to the training split by preprocessing();
# the remaining ids form the validation split.
TRAIN_PERCENTAGE = 70
# Default batch size of the dataloaders built by get_data().
BATCH_SIZE = 64
# NOTE(review): main() declares its own `epochs=100` default and does not read this
# constant anywhere in the visible code — confirm whether it is still needed.
EPOCHS = 100

Device

In [None]:
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

___

## Dataset

Import the dataset and extract the zip file

In [None]:
# Colab-only: mount Google Drive so the dataset archive stored there is reachable.
from google.colab import drive
drive.mount('/content/drive')
# Extract the archive into the relative folder `dataset`.
# NOTE(review): this matches DATASET_PATH ('/content/dataset/') only when the working
# directory is /content — confirm for the runtime in use.
!unzip "/content/drive/MyDrive/dataset.zip" -d dataset

Preprocessing (create two new csv files, one for training images and the other for validation images)

In [None]:
def preprocessing(annotations_path=ANNOTATIONS_CSV_PATH, train_path=TRAIN_FOLDER, seed=None):
  """Build the train / validation annotation CSV files.

  The annotation file has one row per id; every image in `train_path` is named
  `<id>_...`. The two are joined so that each image gets the attribute row of its
  id, then the *ids* (not the images) are split so the two sets share no id.

  Args:
    annotations_path: CSV with the id in column 0 followed by the attribute columns.
    train_path: folder containing the training images.
    seed: optional integer making the split reproducible. When None the global
      NumPy random state is used, as before.

  Side effects:
    Writes TRAIN_CSV_PATH and VALIDATION_CSV_PATH. Both files keep the DataFrame
    index as first column, so the filename ends up in column 1 and the attributes
    in columns 2..28, which is the layout MarketDataset indexes into.
  """
  # load the per-id annotations
  annotation_csv = pd.read_csv(annotations_path)

  # pair every image file with the id encoded before the first '_' of its name;
  # sorting makes the generated CSV files independent of the directory order
  train_imgs_data = []
  for filename in sorted(os.listdir(train_path)):
    try:
      train_imgs_data.append([int(filename.split('_')[0]), filename])
    except ValueError:
      # not an "<id>_*" image (e.g. a thumbnail cache file): it cannot match any
      # annotation row, so skip it instead of aborting the whole preprocessing
      continue

  # dataframe for image id and filename
  train_imgs_df = pd.DataFrame(train_imgs_data, columns=['id', 'filename'])
  # merge on id so that there is one row per image instead of one row per id
  annotations_train = pd.merge(annotation_csv, train_imgs_df, on='id')
  # shift all 27 attribute columns (age included) down by one
  # NOTE(review): presumably the raw labels are 1-based and this makes them start at 0,
  # as required by the cross-entropy targets — confirm against the annotation file
  annotations_train.iloc[:, 1:28] = annotations_train.iloc[:, 1:28] - 1
  # move the filename column to second position
  column_to_move = annotations_train.pop("filename")
  annotations_train.insert(1, "filename", column_to_move)

  # shuffle a copy of the ids: shuffling the array returned by to_numpy() in place
  # could write into (or fail on a read-only view of) the DataFrame's own buffer
  ids = annotation_csv.iloc[:, 0].to_numpy()
  rng = np.random if seed is None else np.random.RandomState(seed)
  ids = rng.permutation(ids)

  # split by count from the front; the former `ids[:-val_samples]` slicing returned
  # an EMPTY training set whenever the validation share rounded down to zero
  n_train = int((TRAIN_PERCENTAGE / 100) * len(ids))
  train_ids = ids[:n_train]
  val_ids = ids[n_train:]

  # keep the rows of each split and drop the id column
  train_df = annotations_train.loc[annotations_train['id'].isin(train_ids)].iloc[:, 1:]
  val_df = annotations_train.loc[annotations_train['id'].isin(val_ids)].iloc[:, 1:]

  # export CSV files (index kept on purpose, see docstring)
  train_df.to_csv(TRAIN_CSV_PATH)
  val_df.to_csv(VALIDATION_CSV_PATH)

Train dataset class

In [None]:
class MarketDataset(Dataset):
  """Images plus their 27 attribute labels, read from an annotation CSV.

  The CSV is addressed by position: column 1 holds the image filename and
  columns 2..28 hold the attributes in the order listed in `_ATTRIBUTES`.
  """

  # attribute names in CSV column order; the first one lives in column 2
  _ATTRIBUTES = (
      'age', 'backpack', 'bag', 'handbag', 'clothes', 'down', 'up', 'hair', 'hat',
      'gender', 'upblack', 'upwhite', 'upred', 'uppurple', 'upyellow', 'upgray',
      'upblue', 'upgreen', 'downblack', 'downwhite', 'downpink', 'downpurple',
      'downyellow', 'downgray', 'downblue', 'downgreen', 'downbrown',
  )
  _FIRST_ATTRIBUTE_COLUMN = 2

  def __init__(self, csv_file, root_dir, transform=None):
    """
    Args:
        csv_file (string): Path to the csv file with annotations.
        root_dir (string): Directory with all the images.
        transform (callable, optional): Optional transform applied to the image.
    """
    self.annotations_frame = pd.read_csv(csv_file)
    self.root_dir = root_dir
    self.transform = transform

  def __len__(self):
    """Number of rows (images) in the annotation CSV."""
    return len(self.annotations_frame)

  def __getitem__(self, idx):
    """Return {'image': ..., 'annotations': {name: float array of length 1}} for row `idx`."""
    if torch.is_tensor(idx):
      idx = idx.tolist()

    frame = self.annotations_frame
    image = io.imread(os.path.join(self.root_dir, str(frame.iloc[idx, 1])))

    # one single-element float array per attribute, keyed by attribute name
    annotations = {
        name: np.array([frame.iloc[idx, column]]).astype('float')
        for column, name in enumerate(self._ATTRIBUTES, start=self._FIRST_ATTRIBUTE_COLUMN)
    }

    if self.transform:
      image = self.transform(image)

    return {'image': image, 'annotations': annotations}

Data augmentation

In [None]:
# Per-channel (mean, std) passed to tt.Normalize; named after the ImageNet statistics.
imagenet_stats = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

# Random augmentation pipeline used by get_data().
image_transform = tt.Compose([
    # the dataset yields arrays read from disk; the PIL-based transforms below need a PIL image
    tt.ToPILImage(),
    # reflect-pad by 8 px on each side, then take a random 128x64 crop
    tt.RandomCrop((128, 64), padding=8, padding_mode='reflect'),
    tt.RandomHorizontalFlip(p=0.5), 
    # random rotation in the range [-10, +10] degrees
    tt.RandomRotation(10),
    tt.ToTensor(), 
    tt.Normalize(*imagenet_stats,inplace=True), 
    # erases a random rectangle of the (already normalized) tensor with probability 0.5
    tt.RandomErasing(p=0.5, inplace=True)
])

## Dataloaders

In [None]:
def get_data(train_csv=TRAIN_CSV_PATH, val_csv=VALIDATION_CSV_PATH, train_path=TRAIN_FOLDER, batch_size=BATCH_SIZE, val_transform=None):
  """Create the training and validation dataloaders.

  Args:
    train_csv: annotation CSV of the training split.
    val_csv: annotation CSV of the validation split.
    train_path: folder holding the images of both splits.
    batch_size: number of samples per batch for both loaders.
    val_transform: transform applied to validation images. Defaults to a
      deterministic pipeline (centre crop to 128x64, tensor conversion,
      normalization) so that the validation loss driving early stopping and the
      learning-rate scheduler is not perturbed by random augmentation.

  Returns:
    (train_dataloader, val_dataloader)

  NOTE: preprocessing() always writes to TRAIN_CSV_PATH / VALIDATION_CSV_PATH, so
  custom `train_csv` / `val_csv` values must point to files that already exist.
  """
  # (re)generate the split CSV files from the images actually found in `train_path`
  preprocessing(train_path=train_path)

  if val_transform is None:
    # no random crop / flip / rotation / erasing here: validation must be repeatable.
    # The centre crop is the identity for 128x64 images and only guarantees that
    # differently sized images can still be stacked into one batch.
    val_transform = tt.Compose([
        tt.ToPILImage(),
        tt.CenterCrop((128, 64)),
        tt.ToTensor(),
        tt.Normalize(*imagenet_stats),
    ])

  # create the two dataset classes (augmentation is applied to training images only)
  train_dataset = MarketDataset(train_csv, train_path, transform=image_transform)
  val_dataset = MarketDataset(val_csv, train_path, transform=val_transform)

  # create dataloaders; only the training loader is shuffled
  train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
  val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, num_workers=2, pin_memory=True)

  return train_dataloader, val_dataloader

## Network definition

Custom ResNet50 pretrained

In [None]:
class CustomResNet50(nn.Module):
  """Pretrained ResNet50 backbone feeding 27 independent classifier heads.

  The backbone's own final layer is dropped; its input width (`fc_inputs`) is the
  feature size consumed by every head. `forward` returns a dict of raw scores,
  one entry per attribute.
  """

  # (key in the forward() output, attribute holding the head) of every head built
  # with `other_classes` outputs, in registration order. The 'backpack' head is
  # stored under the attribute name `bagpack`; that spelling is kept as it is
  # part of the module's parameter names.
  _BINARY_HEADS = (
      ('backpack', 'bagpack'), ('bag', 'bag'), ('handbag', 'handbag'),
      ('clothes', 'clothes'), ('down', 'down'), ('up', 'up'), ('hair', 'hair'),
      ('hat', 'hat'), ('gender', 'gender'), ('upblack', 'upblack'),
      ('upwhite', 'upwhite'), ('upred', 'upred'), ('uppurple', 'uppurple'),
      ('upyellow', 'upyellow'), ('upgray', 'upgray'), ('upblue', 'upblue'),
      ('upgreen', 'upgreen'), ('downblack', 'downblack'), ('downwhite', 'downwhite'),
      ('downpink', 'downpink'), ('downpurple', 'downpurple'),
      ('downyellow', 'downyellow'), ('downgray', 'downgray'),
      ('downblue', 'downblue'), ('downgreen', 'downgreen'), ('downbrown', 'downbrown'),
  )

  def __init__(self, n_age_classes=4, other_classes=2, dropout_p=0.4):
    """
    Args:
        n_age_classes (int): number of outputs of the age head.
        other_classes (int): number of outputs of every other head.
        dropout_p (float): dropout probability used inside each head.
    """
    super().__init__()

    # pretrained backbone without its last child (the original classifier)
    backbone = torchvision.models.resnet50(pretrained=True)
    self.fc_inputs = backbone.fc.in_features
    self.feature_extractor = nn.Sequential(*list(backbone.children())[:-1])

    # one classifier per attribute, registered in a fixed order
    self.age = self._make_head(n_age_classes, dropout_p)
    for _, attribute in self._BINARY_HEADS:
      setattr(self, attribute, self._make_head(other_classes, dropout_p))

  def _make_head(self, n_classes, dropout_p):
    """Build one head: dropout -> linear(fc_inputs, 1000) -> ReLU -> dropout -> linear(1000, n_classes)."""
    return nn.Sequential(
        nn.Dropout(p=dropout_p),
        nn.Linear(self.fc_inputs, 1000),
        nn.ReLU(),
        nn.Dropout(p=dropout_p),
        nn.Linear(1000, n_classes),
    )

  def forward(self, x):
    """Return {attribute name: raw scores of shape (batch, n_classes)}."""
    features = self.feature_extractor(x)
    # flatten everything but the batch dimension
    features = features.view(features.size(0), -1)

    predictions = {'age': self.age(features)}
    for key, attribute in self._BINARY_HEADS:
      predictions[key] = getattr(self, attribute)(features)
    return predictions

## Early Stopping

In [None]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path=MODEL_PATH, trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: MODEL_PATH
            trace_func (function): trace print function.
                            Default: print            
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # `np.inf`, not `np.Inf`: the capitalised alias was removed in NumPy 2.0
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func

    def __call__(self, val_loss, model):
        """Record the validation loss of one epoch.

        Saves `model` when the loss improved, otherwise increases the patience
        counter and sets `early_stop` once it reaches `patience`.

        Args:
            val_loss: validation loss as a Python number or a one-element tensor.
            model: model to checkpoint on improvement.
        """
        # work on a plain float so that no tensor (possibly living on the GPU) is
        # kept alive between epochs and comparisons yield ordinary booleans
        val_loss = float(val_loss)

        # higher score == lower loss
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        # the whole model object is pickled (not just its state_dict)
        torch.save(model, self.path)
        self.val_loss_min = val_loss

## Loss / cost function

In [None]:
def get_cost_function():
  """Return the criterion applied to each classifier head: cross-entropy with default settings."""
  criterion = nn.CrossEntropyLoss()
  return criterion

## Accuracy

In [None]:
def get_loss_and_accuracies(truth_annotations, outputs, cost_func, device):
  """Compute the summed loss and the per-attribute number of correct predictions.

  Args:
    truth_annotations: dict {attribute: tensor of shape (batch, 1)} with the labels.
    outputs: dict {attribute: tensor of shape (batch, n_classes)} with the model scores.
    cost_func: criterion called as cost_func(scores, integer targets of shape (batch,)).
    device: device the targets are moved to before the criterion is evaluated.

  Returns:
    (loss, acc) where `loss` is the sum of the losses of all attributes and `acc`
    maps every attribute to the value returned by accuracy() for this batch.
  """
  loss = 0.0
  acc = {}
  for key, value in truth_annotations.items():
    # `.long()` converts on the tensor's current device; the former
    # `.type(torch.LongTensor)` copied every label tensor to the CPU first
    target = value.long().squeeze(1).to(device)
    loss += cost_func(outputs[key], target) # calculate the loss
    acc[key] = accuracy(value, outputs[key])
  return loss, acc

def accuracy(truth_labels, outputs):
  """Count the rows of `outputs` whose highest score sits at the index given in `truth_labels`.

  `truth_labels` has shape (batch, 1); the result is a plain Python number (a
  count, not a percentage).
  """
  predicted = outputs.max(dim=1).indices
  matches = predicted.eq(truth_labels.squeeze(1))
  # .item() extracts the scalar from the one-element tensor
  return matches.sum().item()

def calculate_total_accuracy(total_accuracy, current_accuracies):
  """Add the per-attribute counts of one batch to the running totals.

  When `total_accuracy` is still empty, `current_accuracies` itself becomes the
  running total (it is returned as is, not copied); otherwise `total_accuracy`
  is updated in place and returned.
  """
  if not total_accuracy:
    return current_accuracies

  for key in total_accuracy:
    total_accuracy[key] += current_accuracies[key]
  return total_accuracy

## Optimizer

In [None]:
def get_optimizer(net, lr, wd, momentum):
  """Create the SGD optimizer for all parameters of `net`.

  Args:
    net: model whose parameters are optimized.
    lr: learning rate.
    wd: weight decay (L2 penalty).
    momentum: momentum factor.

  The hyper-parameters are passed by keyword: positionally, SGD expects
  (params, lr, momentum, dampening, weight_decay), so the former call
  `SGD(params, lr, wd, momentum)` used `wd` as momentum, `momentum` as
  dampening and never applied any weight decay.
  """
  return torch.optim.SGD(net.parameters(), lr=lr, momentum=momentum, weight_decay=wd)

## Adaptive Learning Rate

In [None]:
def get_scheduler(optimizer):
  """Return a ReduceLROnPlateau scheduler in 'min' mode with a patience of 3 for `optimizer`."""
  return torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3)

## Train step

In [None]:
def training_model(model, batch_size, data_loader, cost_func, optimizer, device, n_outputs=27):
  """Train `model` for one epoch.

  Args:
    model: network returning a dict {attribute: scores} for a batch of images.
    batch_size: unused; kept so existing callers keep working.
    data_loader: iterable of batches {'image': tensor, 'annotations': {attribute: tensor}}.
    cost_func: criterion forwarded to get_loss_and_accuracies().
    optimizer: optimizer updating the parameters of `model`.
    device: device the inputs and labels are moved to.
    n_outputs: number of classifier heads; the summed loss of a batch is divided by it.

  Returns:
    (batch_loss, total_accuracy): the mean over batches of the per-head loss, as a
    tensor detached from the autograd graph, and a dict {attribute: accuracy in percent}.

  Raises:
    ZeroDivisionError: if `data_loader` yields no batch.
  """
  model.train()

  # running sums over the epoch
  total_train_loss = 0.0
  total_accuracy = {}
  batch_counter = 0
  tot_samples = 0

  for input_data in tqdm(data_loader, desc="Training", leave=False):
    # push images and labels to the target device
    input_img = input_data['image'].to(device)
    truth_annotations = {key: value.to(device) for key, value in input_data['annotations'].items()}

    # start every step from clean gradients, so that gradients left over from an
    # earlier backward pass can never be folded into this update
    optimizer.zero_grad()

    # forward pass: the outputs already live on the device of the model
    outputs = model(input_img.float())

    # sum of the losses of all classifiers + number of correct predictions per attribute
    loss, current_accuracies = get_loss_and_accuracies(truth_annotations, outputs, cost_func, device)

    # backward pass and weight update
    loss.backward()
    optimizer.step()

    # detach before accumulating: adding the live loss would chain the autograd
    # history of every batch of the epoch onto the running total
    total_train_loss += loss.detach() / n_outputs

    total_accuracy = calculate_total_accuracy(total_accuracy, current_accuracies)

    tot_samples += input_img.shape[0] # actual size of this batch
    batch_counter += 1

  # turn the counts of correct predictions into percentages
  for key, value in total_accuracy.items():
    total_accuracy[key] = value / tot_samples*100

  batch_loss = total_train_loss/batch_counter

  return batch_loss, total_accuracy

## Validation step

In [None]:
def validate_model(model, batch_size, data_loader, cost_func, device, n_outputs=27):
  """Evaluate `model` on every batch of `data_loader` without updating it.

  Args:
    model: network returning a dict {attribute: scores} for a batch of images.
    batch_size: unused by this function.
    data_loader: iterable of batches {'image': tensor, 'annotations': {attribute: tensor}}.
    cost_func: criterion forwarded to get_loss_and_accuracies().
    device: device the inputs, labels and outputs are moved to.
    n_outputs: number of classifier heads; the summed loss of a batch is divided by it.

  Returns:
    (mean over batches of the per-head loss, dict {attribute: accuracy in percent}).
  """
  model.eval()

  # running sums over the whole loader
  loss_sum = 0.0
  correct_per_attribute = {}
  n_batches = 0
  n_samples = 0

  # gradients are not needed for evaluation
  with torch.no_grad():
    for batch in tqdm(data_loader, desc="Validation", leave=False):
      # move the images and, in place, every label tensor to the target device
      images = batch['image'].to(device)
      labels = batch['annotations']
      for name in labels:
        labels[name] = labels[name].to(device)

      # forward pass, then make sure every head output is on the target device
      predictions = model(images.float())
      for name in predictions:
        predictions[name] = predictions[name].to(device)

      # summed loss of all classifiers + correct predictions per attribute
      batch_loss, batch_correct = get_loss_and_accuracies(labels, predictions, cost_func, device)

      loss_sum += batch_loss/n_outputs
      correct_per_attribute = calculate_total_accuracy(correct_per_attribute, batch_correct)

      n_samples += images.shape[0] # actual size of this batch
      n_batches += 1

    # counts of correct predictions -> percentages
    for name, correct in correct_per_attribute.items():
      correct_per_attribute[name] = correct / n_samples*100

    return loss_sum/n_batches, correct_per_attribute

## Main

In [None]:
def log_values(writer, step, loss, accuracy, prefix):
  """Log `loss` and `accuracy` at `step` under the tags '<prefix>/loss' and '<prefix>/accuracy'."""
  loss_tag = f"{prefix}/loss"
  accuracy_tag = f"{prefix}/accuracy"
  writer.add_scalar(loss_tag, loss, step)
  writer.add_scalar(accuracy_tag, accuracy, step)

In [None]:
def main(batch_size=64,
         device='cuda:0',
         learning_rate=0.01,
         weight_decay=0.01,
         momentum=0.9,
         epochs=100,
         ):
  """Train the classifier, logging to TensorBoard and checkpointing the best model.

  Args:
    batch_size: batch size of the train / validation dataloaders.
    device: device to train on; a CUDA device falls back to the CPU when CUDA
      is not available.
    learning_rate, weight_decay, momentum: optimizer hyper-parameters.
    epochs: maximum number of epochs (early stopping may end training sooner).

  Side effects:
    Regenerates the split CSV files (through get_data), writes TensorBoard
    events to "runs/exp" and lets EarlyStopping save the best model.
  """
  # do not crash on CPU-only runtimes because of the hard-coded CUDA default
  if str(device).startswith('cuda') and not torch.cuda.is_available():
    print('CUDA is not available, training on the CPU instead')
    device = 'cpu'

  def _report(label, loss, accuracies):
    # print the one-line summary of an epoch phase and return the mean accuracy
    average = sum(accuracies.values()) / len(accuracies)
    line = '| {}: {:.3f}'.format(label, loss)
    for key, value in accuracies.items():
      line += ' | ' + key + ': {:.3f}'.format(value)
    line += ' | avg_acc: {:.3f}'.format(average)
    print(line)
    return average

  # instantiate dataloaders
  train_loader, valid_loader = get_data(TRAIN_CSV_PATH, VALIDATION_CSV_PATH, TRAIN_FOLDER, batch_size)

  # instantiate network
  net = CustomResNet50().to(device)

  # instantiate optimizer
  optimizer = get_optimizer(net, learning_rate, weight_decay, momentum)

  # instantiate scheduler for adaptive learning rate
  scheduler = get_scheduler(optimizer)

  # instantiate cost function
  cost_function = get_cost_function()

  # instantiate early stopping
  early_stopping = EarlyStopping(verbose=True)

  writer = SummaryWriter(log_dir="runs/exp")
  try:
    for epoch in range(epochs):
      print('-' * 220)
      print('| epoch {:3d}/{:3d}'.format(epoch +1, epochs))

      train_loss, train_accuracy = training_model(net, batch_size, train_loader, cost_function, optimizer, device)
      # plain floats from here on: nothing below needs the tensor
      train_loss = float(train_loss)
      avg_train_acc = _report('train_loss', train_loss, train_accuracy)

      val_loss, val_accuracy = validate_model(net, batch_size, valid_loader, cost_function, device)
      val_loss = float(val_loss)
      avg_valid_acc = _report('valid_loss', val_loss, val_accuracy)

      # lower the learning rate when the validation loss stops improving
      scheduler.step(val_loss)

      # log to tensorboard
      log_values(writer, epoch, train_loss, avg_train_acc, "Train")
      log_values(writer, epoch, val_loss, avg_valid_acc, "Validation")

      early_stopping(val_loss, net)

      if early_stopping.early_stop:
        # NOTE: `net` itself is not reloaded here; the best model is the one
        # saved by EarlyStopping
        print("Early stopping has occurred. Reverting to latest save model")
        break
  finally:
    # close the logger even when training is interrupted or fails
    writer.close()

In [None]:
# Load the TensorBoard notebook extension and open the dashboard on the `runs`
# folder, which contains the "runs/exp" log directory written by main().
%load_ext tensorboard
%tensorboard --logdir=runs

In [None]:
# Run the training with the default hyper-parameters declared in main()'s signature.
main()

## Prediction Step

Data preprocessing

In [None]:
def test_preprocessing(test_folder=TEST_FOLDER, test_csv_path=TEST_CSV_PATH):
  """List the files of the test folder into a CSV file.

  Args:
    test_folder: folder containing the test images.
    test_csv_path: destination CSV. The DataFrame index is written as first
      column, so the filename ends up in column 1, where TestMarketDataset
      reads it.
  """
  # os.listdir() returns entries in arbitrary order: sort them so that the CSV
  # (and therefore the order of the exported predictions) is reproducible
  test_imgs = sorted(os.listdir(test_folder))
  test_df = pd.DataFrame(test_imgs, columns=['filename'])

  # convert dataframe to csv
  test_df.to_csv(test_csv_path)

Test dataset class

In [None]:
class TestMarketDataset(Dataset):
  """ Market test dataset """

  def __init__(self, csv_file, root_dir, transform=None):
    """
    Args:
        csv_file (string): Path to the csv file listing the image filenames
            (the filename is read from column 1).
        root_dir (string): Directory with all the images.
        transform (callable, optional): Optional transform to be applied
            on the image of a sample.
    """
    self.annotations_frame = pd.read_csv(csv_file)
    self.root_dir = root_dir
    self.transform = transform

  def __len__(self):
    """Number of rows (images) in the CSV file."""
    return len(self.annotations_frame)

  def __getitem__(self, idx):
    """Return {'image', 'image_filename', 'image_path'} for row `idx`."""
    if torch.is_tensor(idx):
      idx = idx.tolist()

    filename = self.annotations_frame.iloc[idx, 1]
    img_path = os.path.join(self.root_dir, str(filename))
    image = io.imread(img_path)

    if self.transform:
      image = self.transform(image)

    return {
        'image': image,
        'image_filename': filename,
        'image_path': img_path,
    }

Test-time transform (tensor conversion and normalization only, no random augmentation)

In [None]:
# Per-channel (mean, std) passed to tt.Normalize; same values as the `imagenet_stats`
# declared in the training-augmentation cell.
imagenet_stats = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

# Deterministic pipeline for test images: tensor conversion followed by
# normalization, without any random transform.
test_transform = tt.Compose([
    tt.ToTensor(), 
    tt.Normalize(*imagenet_stats,inplace=True)
])

Dataloader

In [None]:
def get_test_data(test_csv=TEST_CSV_PATH, test_folder=TEST_FOLDER, transform=test_transform):
  """Write the test file list and return a dataloader over the test images.

  Args:
    test_csv: CSV file created by test_preprocessing() and read by the dataset.
    test_folder: folder containing the test images.
    transform: transform applied to every image.

  Returns:
    A dataloader yielding one image per batch.
  """
  # (re)create the CSV listing the files found in the test folder
  test_preprocessing(test_folder, test_csv)

  dataset = TestMarketDataset(csv_file=test_csv, root_dir=test_folder, transform=transform)

  # batch_size=1: downstream code reads a single filename per batch
  return torch.utils.data.DataLoader(dataset, batch_size=1, num_workers=2, pin_memory=True)

Test function

In [None]:
def get_prediction(img_id, outputs):
  """Turn the head outputs computed for ONE image into a flat prediction row.

  Args:
    img_id: identifier stored as first element of the row.
    outputs: dict {attribute: scores of shape (1, n_classes)}; its iteration
      order defines the order of the predicted classes in the row.

  Returns:
    [img_id, class_of_first_attribute, class_of_second_attribute, ...] where
    every class is a plain Python int.
  """
  result = [img_id]
  for values in outputs.values():
    # .item() reads the single predicted index directly; the former
    # int(<1-element ndarray>) conversion is deprecated since NumPy 1.25
    result.append(values.argmax(dim=1).item())
  return result

In [None]:
def test_model(model, data_loader, device):
  model.eval()

  # pedictions
  predictions = []

  print('Testing has started ..')

  with torch.no_grad():
    for batch_idx, input_data in enumerate(tqdm(data_loader)):

      # push input data to GPU
      input_img = input_data['image'].to(device)
      input_img_id = str(input_data['image_filename'][0])

      # get outputs (predictions) from the model
      outputs = model(input_img.float())

      # push output to GPU
      for key, value in outputs.items():
        outputs[key] = outputs[key].to(device)

      # get final predictions
      final_result = get_prediction(input_img_id, outputs)

      predictions.append(final_result)
    
    print('Testing has finished!!')
    return predictions

Export predictions

In [None]:
def export_predictions(predictions_csv, model_path=MODEL_PATH, device='cuda:0'):
  """Run the saved model on the test images and write its predictions to a CSV file.

  Args:
    predictions_csv: destination CSV (one row per test image, no index column).
    model_path: file holding the full model object saved with torch.save().
    device: device used for inference; a CUDA device falls back to the CPU when
      CUDA is not available.
  """
  # do not crash on CPU-only runtimes because of the hard-coded CUDA default
  if str(device).startswith('cuda') and not torch.cuda.is_available():
    device = 'cpu'

  # get data
  test_loader = get_test_data()

  # load model from `model_path` (the argument used to be ignored in favour of the
  # global MODEL_PATH). `map_location` lets a checkpoint saved on a GPU be opened
  # on whatever device is used now.
  # SECURITY: the file is a pickle of the whole model object, so loading it executes
  # arbitrary code — only load checkpoints produced by this notebook. Recent PyTorch
  # versions refuse such files unless weights_only=False is passed explicitly.
  try:
    model = torch.load(model_path, map_location=device, weights_only=False)
  except TypeError:
    # older PyTorch versions do not know the `weights_only` argument
    model = torch.load(model_path, map_location=device)
  model = model.to(device)

  # run test model on test images
  predictions = test_model(model, test_loader, device)

  # export classifier outcome to dataframe; the column order follows the key order
  # of the dict returned by the model
  columns = [
      'filename', 'age', 'backpack', 'bag', 'handbag', 'clothes', 'down', 'up', 'hair',
      'hat', 'gender', 'upblack', 'upwhite', 'upred', 'uppurple', 'upyellow', 'upgray',
      'upblue', 'upgreen', 'downblack', 'downwhite', 'downpink', 'downpurple',
      'downyellow', 'downgray', 'downblue', 'downgreen', 'downbrown',
  ]
  predictions_df = pd.DataFrame(predictions, columns=columns)

  # save final predictions to csv
  predictions_df.to_csv(predictions_csv, index=False)

In [None]:
# Write the predictions for the test images next to the dataset, using the model
# stored at MODEL_PATH (default of export_predictions).
predictions_csv_path = DATASET_PATH + 'classification_test.csv'
export_predictions(predictions_csv_path)