In [None]:
# Remember to mount your Google Drive before proceeding!
!pip install -i https://test.pypi.org/simple/ resnet-simple -q
!pip install nvidia-ml-py -q

In [None]:
%load_ext tensorboard
%matplotlib inline
import os
import random
from typing import Optional, Callable

import matplotlib.pyplot as plt
import pynvml
import torch
from sklearn import metrics
from torch import optim
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets
from torchvision import transforms
from tqdm.auto import tqdm

from resnet_simple import ResNet50, ResNetPredictor

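# Directory where model checkpoints are saved to and loaded from; replace it with your own folder (e.g. one inside your mounted Google Drive) and keep the trailing slash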
MODEL_FOLDER = "Models/"

def calculate_gpu_utilization(gpu_index: int = 0) -> None:
    """Print how much memory NVML reports as used on one GPU.

    Args:
        gpu_index: NVML device index of the GPU to query (defaults to 0).

    Returns:
        None. The result is printed as whole MB (integer division by 1024 ** 2).

    Raises:
        Whatever `pynvml` raises when NVML cannot be initialised or the device
        index is invalid; NVML is still shut down before the error propagates.
    """
    pynvml.nvmlInit()
    try:
        handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        print(f"GPU memory occupied: {info.used // 1024 ** 2} MB.")
    finally:
        # Each nvmlInit() has to be paired with an nvmlShutdown(); without it every
        # call to this function would leave another NVML initialisation outstanding
        pynvml.nvmlShutdown()

# Train ResNet

In [None]:
# Another method to consider (which should yield much better results) would be having a second classification head for the coarse labels
class ModifiedCIFAR100(datasets.CIFAR100):
    """CIFAR-100 variant whose targets tag every image with two labels.

    The 20 superclass names are appended after the fine class names in
    `self.classes`, and `self.targets` becomes a multi-hot matrix with one row per
    image: one entry set for the fine label and one for its superclass. This is the
    "lazy" multi-label formulation; a dedicated second classification head for the
    superclasses would be the cleaner design.

    Attributes set here (in addition to those of `datasets.CIFAR100`):
        superclasses: list of the 20 superclass names.
        fine_to_coarse_map: tensor mapping a fine label index to the index of its
            superclass inside the extended `self.classes` list.
    """

    def __init__(self, root: str, train: bool = True, transform: Optional[Callable] = None, target_transform: Optional[Callable] = None, download: bool = False):
        super().__init__(root, train, transform, target_transform, download)

        # Number of fine classes, captured before the class list is extended below
        num_fine_classes = len(self.classes)

        # Superclass names, ordered by coarse label index
        # (the dataset's own metadata also holds them under "coarse_label_names")
        self.superclasses = [
            "aquatic_mammals",
            "fish",
            "flowers",
            "food_containers",
            "fruit_and_vegetables",
            "household_electrical_devices",
            "household_furniture",
            "insects",
            "large_carnivores",
            "large_man-made_outdoor_things",
            "large_natural_outdoor_scenes",
            "large_omnivores_and_herbivores",
            "medium_mammals",
            "non-insect_invertebrates",
            "people",
            "reptiles",
            "small_mammals",
            "trees",
            "vehicles_1",
            "vehicles_2"
        ]

        # Coarse label of every fine label, indexed by fine label
        # (equivalent to the "coarse_labels" entries of the raw dataset files)
        coarse_of_fine = torch.tensor([
            4, 1, 14, 8, 0, 6, 7, 7, 18, 3, 3, 14, 9, 18, 7, 11, 3, 9, 7, 11, 6,
            11, 5, 10, 7, 6, 13, 15, 3, 15, 0, 11, 1, 10, 12, 14, 16, 9, 11, 5, 5,
            19, 8, 8, 15, 13, 14, 17, 18, 10, 16, 4, 17, 4, 2, 0, 17, 4, 18, 17,
            10, 3, 2, 12, 12, 16, 12, 1, 9, 19, 2, 10, 0, 1, 16, 12, 9, 13, 15, 13,
            16, 19, 2, 4, 6, 19, 5, 5, 8, 19, 18, 1, 2, 15, 6, 0, 17, 8, 14, 13
        ])

        # Superclasses live after the fine classes in the extended class list, so
        # their indices are shifted by the number of fine classes
        self.fine_to_coarse_map = coarse_of_fine + num_fine_classes
        self.classes.extend(self.superclasses)

        # Multi-hot targets: start from the one-hot fine label, then switch on the
        # matching superclass column of every row.
        # Author's note: evaluation is meant to use a sigmoid rather than a softmax,
        # so each output lies in (0, 1) and can be thresholded (typically at 0.5).
        fine_labels = torch.tensor(self.targets)
        multi_hot = torch.nn.functional.one_hot(fine_labels, num_classes=len(self.classes))
        row_index = torch.arange(multi_hot.shape[0])
        multi_hot[row_index, self.fine_to_coarse_map[fine_labels]] = 1
        self.targets = multi_hot

In [None]:
# Per-channel normalisation statistics applied to every image
mean = [0.50707516, 0.48654887, 0.44091784]
std = [0.26733429, 0.25643846, 0.27615047]
batch_size = 256

# Evaluation pipeline: tensor conversion and normalisation only
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

# Training pipeline: flip / padded crop / RandAugment before the same conversion
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomCrop(32, padding=4),
    transforms.RandAugment(num_ops=2, magnitude=9),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

# Both splits are downloaded (if missing) into the same folder
cifar_root = "sample_data/cifar"

train = ModifiedCIFAR100(cifar_root, train=True, transform=transform_train, download=True)
trainloader = DataLoader(train, batch_size=batch_size, shuffle=True)

test = ModifiedCIFAR100(cifar_root, train=False, transform=transform_test, download=True)
testloader = DataLoader(test, batch_size=batch_size)

# Report GPU memory in use once the data pipeline is set up
calculate_gpu_utilization(0)

In [None]:
# Most hyperparameters follow NVIDIA's reference ResNet recipe:
# https://catalog.ngc.nvidia.com/orgs/nvidia/teams/dle/resources/resnet_pyt

# Number of iterations of the training loop; also the period of the cosine schedule
episodes = 150

resnet = ResNet50()

# The learning rate scales linearly with the batch size (https://arxiv.org/abs/1706.02677)
optimizer = optim.SGD(
    resnet.parameters(),
    lr=batch_size / 1000,
    momentum=0.875,
    weight_decay=1 / 32768,
)
lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=episodes)

# Settings for the multi-label predictor; one output per entry of train.classes
predictor_settings = {
    "optimizer": optimizer,
    "lr_scheduler": lr_scheduler,
    "mode": "multi_label_classification",
    "num_classes": len(train.classes),
    "dropout": 0.2,
    "optimize_predictor": True,
}
resnet_classifier = ResNetPredictor(resnet, **predictor_settings)

In [None]:
writer = SummaryWriter()
episode_bar = tqdm(range(episodes))
for episode in episode_bar:
    print(f"[Episode {episode + 1}]")
    # Train model using train data
    y_true, y_pred, loss = resnet_classifier.step(trainloader, training = True)
    f1_score = metrics.f1_score(y_true, y_pred, average = "macro") * 100
    print(f"Train Loss: {loss}")
    print(f"Train F1-Score: {f1_score}%")
    episode_bar.set_description(f"Train Loss: {loss}")
    writer.add_scalar("Train Loss (with Dropout)", loss, episode)
    writer.add_scalar("Train F1-Score (with Dropout)", f1_score, episode)

    # Evaluate every 5 epochs
    if (episode + 1) % 5 == 0:
        # Evaluate model using test data
        y_true, y_pred, loss = resnet_classifier.step(testloader, training = False)
        f1_score = metrics.f1_score(y_true, y_pred, average = "macro") * 100
        print(f"Test Loss: {loss}")
        print(f"Test F1-Score: {f1_score}%")
        writer.add_scalar("Test Loss (without Dropout)", loss, episode)
        writer.add_scalar("Test F1-Score (without Dropout)", f1_score, episode)

    # Write last learning rate to log
    writer.add_scalar("Learning Rate", lr_scheduler.get_last_lr()[0], episode)

    # Save model every 10 epochs
    if (episode + 1) % 10 == 0:
        resnet_classifier.save(MODEL_FOLDER + f"resnet50_cifar100_episode{episode + 1}.safetensors")

calculate_gpu_utilization(0)
writer.flush()
writer.close()
%tensorboard --logdir=runs

# Evaluate ResNet

In [None]:
# Same normalisation statistics and batch size as used for training
batch_size = 256
mean = [0.50707516, 0.48654887, 0.44091784]
std = [0.26733429, 0.25643846, 0.27615047]

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

# Undo the normalisation (x * std + mean, written as another Normalize) and turn the
# tensor back into a PIL image for plotting
inverse_mean = [-m / s for m, s in zip(mean, std)]
inverse_std = [1 / s for s in std]
inverse_transform = transforms.Compose([
    transforms.Normalize(inverse_mean, inverse_std),
    transforms.ToPILImage(),
])

test = ModifiedCIFAR100(root="sample_data/cifar", download=True, train=False, transform=transform_test)
testloader = DataLoader(test, batch_size=batch_size)

# Fresh network wrapped in a predictor without dropout; weights are loaded below
resnet = ResNet50()
resnet_classifier = ResNetPredictor(
    resnet,
    mode="multi_label_classification",
    num_classes=len(test.classes),
    dropout=0.0,
)

# Load and test model from episode 150
checkpoint_path = MODEL_FOLDER + "resnet50_cifar100_episode150.safetensors"
resnet_classifier.load(checkpoint_path)
y_true, y_pred, loss = resnet_classifier.step(testloader, training=False)

# Macro-averaged F1 over all labels, reported as a percentage
f1_score = 100 * metrics.f1_score(y_true, y_pred, average="macro")
print(f"Test Loss: {loss}")
print(f"Test F1-Score: {f1_score}%")

In [None]:
# Show a grid of test images, each titled with its ground-truth and predicted labels
rows, cols, scale = 15, 2, 8
num_images = rows * cols
figure = plt.figure(figsize=(cols * scale, rows * scale))
plt.rc("font", size=6)

# Only the first batch of the test loader is visualised
inputs, labels = next(iter(testloader))
with torch.no_grad():
    inputs = inputs[:num_images].to(resnet_classifier.device)
    labels = labels[:num_images].to(resnet_classifier.device)
    logits, loss = resnet_classifier(inputs, labels)
    # Column indices of the non-zero target entries; every image carries one fine
    # and one coarse label, hence two indices per row
    labels = torch.nonzero(labels)[:, 1].view(-1, 2)
    # Keep the 2 highest-scoring classes per image for a simple visualisation
    _, predictions = torch.topk(logits, 2, dim=1)

for position in range(num_images):
    truth_names = [test.classes[labels[position][k]] for k in range(2)]
    predicted_names = [test.classes[predictions[position][k]] for k in range(2)]
    ax = figure.add_subplot(rows, cols, position + 1)
    ax.set_title(f"Ground truth: {truth_names}, Prediction: {predicted_names}")
    ax.axis("off")
    ax.imshow(inverse_transform(inputs[position]))
plt.show()