# DL Project 2023/24

## Introduction

Description of the method choosen and the work done

In [1]:
# import modules
import torch
import torchvision
from torchvision.models import resnet50, ResNet50_Weights
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Reading Data

In [3]:
import tarfile
import os
import shutil
import re

tar_file = "./drive/MyDrive/DL_project/imagenet-a.tar"
data_folder = "imagenet-a"

# function to untar the dataset and store it in a new folder
def extract_dataset(compress_file, destination_folder):
  # function to change dir names to their words description
  def change_folders_names(readme_file, dataset_root):
    with open(readme_file, 'r') as f:
        lines = f.readlines()
        for line in lines:
            # Match lines containing WordNet IDs and descriptions
            match = re.match(r'n\d+ (.+)', line)
            if match:
                # Split the line into WordNet ID and description
                parts = match.group(0).split()
                wordnet_id = parts[0]
                description = ' '.join(parts[1:])
                os.rename(os.path.join(dataset_root, wordnet_id),
                            os.path.join(dataset_root, description))

  if not os.path.exists(compress_file):
    print("Compress file doesn't exist.")
    return

  if os.path.exists(destination_folder):
    # remove the folder if already exists one
    shutil.rmtree(destination_folder)

  # extract content from the .tar file
  with tarfile.open(compress_file, 'r') as tar_ref:
    tar_ref.extractall("./")
  print("All the data is extracted.")

  change_folders_names(destination_folder+"/README.txt", destination_folder)

extract_dataset(tar_file, data_folder)

All the data is extracted.


In [4]:
ids_list = os.listdir(data_folder)
print(ids_list)
len(ids_list) # 200 folders + 1 readme

['goldfinch', 'README.txt', 'parking meter', 'balloon', 'cottontail rabbit', 'pufferfish', 'salt shaker', 'green iguana', 'fountain', 'sulphur-crested cockatoo', 'weevil', 'mosque', 'monarch butterfly', 'saxophone', 'goblet', 'German Shepherd Dog', 'cello', 'go-kart', 'baseball player', 'ocarina', 'bell pepper', 'Rottweiler', 'ant', 'unicycle', 'Chihuahua', 'billiard table', 'stingray', 'manhole cover', 'junco', 'flagpole', 'syringe', 'wine bottle', 'hair dryer', 'jellyfish', 'academic gown', 'Persian cat', 'fly', 'barn', 'hummingbird', 'rapeseed', 'accordion', 'snowmobile', 'white-headed capuchin', 'schooner', 'cheeseburger', 'box turtle', 'dumbbell', 'broom', 'centipede', 'dragonfly', 'banjo', 'racket', 'agama', 'reel', 'basketball', 'apron', 'goose', 'breastplate', 'chain', 'sleeping bag', 'volleyball', 'jay', 'stick insect', 'cockroach', 'leafhopper', 'mongoose', 'golf cart', 'nail', 'suspension bridge', 'acoustic guitar', 'maraca', 'pretzel', 'tricycle', 'sea lion', 'great egret',

201

In [5]:
from PIL import Image
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler

# function that returns a DataLoader for the dataset
def get_data(batch_size, dataset_path, transform):

  data = torchvision.datasets.ImageFolder(root=dataset_path, transform=transform)

  class_labels = data.classes
  print(f"The dataset contains {len(data)} images.")
  print(f"The dataset contains {len(class_labels)} labels.")

  test_loader = torch.utils.data.DataLoader(data, batch_size, shuffle=False, num_workers=8)

  return test_loader, class_labels

In [6]:
import matplotlib.pyplot as plt
import numpy as np
import torchvision.transforms.functional as TF

# function to display images from the DataLoader
def show_images(dataloader, num_images=5):
  # get a batch of data
  data_iter = iter(dataloader)
  images, labels = next(data_iter)

  # convert images to numpy array
  images = images.numpy()

  # display images
  fig, axes = plt.subplots(1, num_images, figsize=(15, 3))
  for i in range(num_images):
      image = np.transpose(images[i], (1, 2, 0))  # move channels in last position
      image = np.clip(image, 0, 1)
      axes[i].imshow(image)
      axes[i].axis('off')
      axes[i].set_title(dataloader.dataset.classes[labels[i]])
  plt.show()

## MEMO

In [7]:
from PIL import ImageOps, ImageEnhance

# define some image augmentations

def vertical_flip(img):
    img = TF.to_pil_image(img)
    res = img.transpose(Image.FLIP_TOP_BOTTOM)
    return TF.to_tensor(res)

def brightness(img, factor_range=(0.5, 1.5)):
  img = TF.to_pil_image(img)
  factor = np.random.uniform(factor_range[0], factor_range[1])
  enhancer = ImageEnhance.Brightness(img)
  res = enhancer.enhance(factor)
  return TF.to_tensor(res)

'''
def rotation(img, angle_range=(-45, 45)):
  angle = np.random.uniform(angle_range[0], angle_range[1])
  return img.rotate(angle)
'''

def color(img, factor_range=(0.5, 1.5)):
  img = TF.to_pil_image(img)
  factor = np.random.uniform(factor_range[0], factor_range[1])
  enhancer = ImageEnhance.Color(img)
  res = enhancer.enhance(factor)
  return TF.to_tensor(res)

def sharpness(img, factor_range=(0.5, 1.5)):
  img = TF.to_pil_image(img)
  factor = np.random.uniform(factor_range[0], factor_range[1])
  enhancer = ImageEnhance.Sharpness(img)
  res = enhancer.enhance(factor)
  return TF.to_tensor(res)

augmentations = [vertical_flip, brightness, color, sharpness]

In [8]:
import random

# functon that apply B augmentations to the original image and return M+1 images
def augment_image(img, augmentations, B=15):
  assert len(augmentations) > 0, "There are not augmentations provided."

  images = [img]
  for _ in range(B):
    # randomly choose an augmentation in the augmentation functions
    index = random.randrange(0, len(augmentations))
    augmentation = augmentations[index]
    # apply the augmentation to the original image
    augmented_img = augmentation(img)
    # add the augmented image to the list of images I want to evaluate
    images.append(augmented_img)
  return images

In [9]:
# define the cost function used to evaluate the model output
def get_cost_function():
  cost_function = torch.nn.CrossEntropyLoss()
  return cost_function

In [10]:
# define the optimizer
def get_optimizer(net, lr, wd, momentum):
    optimizer = torch.optim.SGD(net.parameters(), lr=lr, weight_decay=wd, momentum=momentum)
    return optimizer

In [11]:
# compute the marginal output distribution
def marginal_distribution(images, model, transforms, device):
  # collect the prediction for every image in input
  img_results = []
  for img in images:
    single_batch = transforms(img).unsqueeze(0).to(device)
    prediction = model(single_batch).squeeze(0).softmax(0)
    img_results.append(prediction)

  # sum all the resulting tensors
  sum_results = torch.sum(torch.stack(img_results), dim=0).to(device)
  # divide each element by B to obtain the marginal output distribution
  num_images = len(images)
  res = torch.div(sum_results, num_images).to(device)
  return res

In [17]:
# compute the marginal cross entropy
def marginal_cross_entropy(marginal_dist, labels, cost_function):
  entropy = 0.0
  # sum all entropies for the different labels since I don't know the real one
  for label in labels:
    entropy += cost_function(marginal_dist, label)
  return entropy

In [18]:
import copy

# test time robustness via MEMO algorithm
def ttr_MEMO(model, test_sample, labels, B, cost_function, optimizer, transforms, device):
  # save the original model weights
  original_params = copy.deepcopy(model.state_dict())

  with torch.enable_grad():
    # get the B + 1 images
    augmented_images = augment_image(test_sample, augmentations, B)

    # get the marginal output distribution
    marginal_dist = marginal_distribution(augmented_images, model, transforms, device)

    # update the model weights
    loss = marginal_cross_entropy(marginal_dist, labels, cost_function)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

  test_sample = transforms(test_sample).unsqueeze(0).to(device)
  output = model(test_sample).squeeze(0).softmax(0)

  # reapply original weights to the model
  model.load_state_dict(original_params)

  return output

## Test Procedure

In [14]:
def test(model, data_loader, B, cost_function, optimizer, transforms, device="cuda"):
  samples = 0.0
  cumulative_loss = 0.0
  cumulative_accuracy = 0.0

  # set the network to evaluation mode
  model.eval()

  # disable gradient computation for testing mode
  with torch.no_grad():
    # iterate over the test set
    for batch_idx, (inputs, targets) in enumerate(data_loader):
      # Load data into GPU
      inputs = inputs.to(device)
      targets = targets.to(device)

      # forward pass
      batch_size = inputs.size(0)
      num_labels = 1000 # 1000 is the ImageNet number of labels
      empty_tensor = torch.empty(num_labels).to(device)

      # apply MEMO to each test point in the batch
      intermediate_outputs = []
      for input in inputs:
        output = ttr_MEMO(model, input, targets, B, cost_function, optimizer, transforms, device)
        intermediate_outputs.append(output)

      outputs = torch.stack(intermediate_outputs).to(device)

      # outputs_no_MEMO = model(inputs)

      # loss computation
      loss = cost_function(outputs, targets)

      # fetch prediction and loss value
      samples+=inputs.shape[0]
      cumulative_loss += loss.item() # Note: the .item() is needed to extract scalars from tensors
      _, predicted = outputs.max(1)

      # compute accuracy
      cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_loss / samples, cumulative_accuracy / samples * 100

## Put all together

In [19]:
def main(
    run_name,
    batch_size = 32,
    device = "cuda",
    learning_rate=0.001,
    weight_decay=0.000001,
    momentum=0.9,
    num_augmentations = 15
):
  # writer = SummaryWriter(log_dir=f"runs/{run_name}")
  device = device

  # itialize the ResNet model
  weights = ResNet50_Weights.DEFAULT
  model = resnet50(weights=weights).to(device)

  # initialize the inference transforms
  preprocess = weights.transforms()
  preprocess

  # initialize the test dataloader
  test_loader, _ = get_data(batch_size, data_folder, preprocess)

  # initialize the optimizer
  optimizer = get_optimizer(model, learning_rate, weight_decay, momentum)

  # initialize the cost function
  cost_function = get_cost_function()

  test_loss, test_accuracy = test(model, test_loader, num_augmentations, cost_function, optimizer, preprocess, device)
  print(f"\tTest loss {test_loss:.5f}, Test accuracy {test_accuracy:.2f}")

In [20]:
main("resnet_MEMO")

The dataset contains 7500 images.
The dataset contains 200 labels.
	Test loss 0.21649, Test accuracy 0.04
