# Step 1: Select Task/Dataset
I chose Tiny ImageNet, which contains 100000 images of 200 classes (500 for each class) resized to 64x64 color images.

In [1]:
# from datasets import load_dataset
# from classes import i2d
# import json

In [2]:
# dataset = load_dataset('Maysee/tiny-imagenet', split='train')

In [3]:
# dataset[0]

In [4]:
# with open("dataset_infos.json") as file:
#     dataset_infos = json.load(file)

In [5]:
# class_names = dataset_infos["Maysee--tiny-imagenet"]["features"]["label"]["names"]
# idx2class = {i: class_names[i] for i in range(len(class_names))}

# Step 2: Get to know the data
The dataset is well balanced and has 500 images for each class

In [6]:
# from collections import defaultdict

# class_counts = defaultdict(int)
# for instance in dataset:
#     label = instance['label']
#     class_counts[label] += 1

# for label, count in class_counts.items():
#     print(f"Class {label}: {count} instances")


# Step 3: Structure Modeling

### Step 3.1 Determine how (with which metrics) you want to evaluate your model. Also, consider the error in estimating the metrics.
We will use accuracy and F1 macro (precision, recall) to evaluate our model.

### Step 3.2 Implement basic functionality to train models and evaluate them against each other. It is recommended to use a suitable MLOps platform (e.g. W&B)

In [7]:
from tin import TinyImageNetDataset
from torch.utils.data import DataLoader
from torchvision import transforms, models
from sklearn.metrics import precision_score, recall_score, f1_score

  from tqdm.autonotebook import tqdm


In [8]:
# Define a custom Dataset class because the dataset from load_dataset() is useless
train_data = TinyImageNetDataset(root_dir="./data/tiny-imagenet-200", mode="train")
val_data = TinyImageNetDataset(root_dir="./data/tiny-imagenet-200", mode="val")
test_data = TinyImageNetDataset(root_dir="./data/tiny-imagenet-200", mode="test")

Preloading train data...:   0%|          | 0/100000 [00:00<?, ?it/s]

Preloading val data...:   0%|          | 0/10000 [00:00<?, ?it/s]

Preloading test data...:   0%|          | 0/10000 [00:00<?, ?it/s]

In [9]:
import torch
# reduce the size of train_data by x
train_data = torch.utils.data.Subset(train_data, range(0, len(train_data), 1))
print(f"train_data size: {len(train_data)}")
val_data = torch.utils.data.Subset(val_data, range(0, len(val_data), 1))
print(f"val_data size: {len(val_data)}")
test_data = torch.utils.data.Subset(test_data, range(0, len(test_data), 1))
print(f"test_data size: {len(test_data)}")

train_data size: 100000
val_data size: 10000
test_data size: 10000


In [10]:
BATCH_SIZE = 64
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=True)
print(f"There are {len(train_loader)} batches in the training set")
print(f"There are {len(val_loader)} batches in the validation set")

There are 1563 batches in the training set
There are 157 batches in the validation set


In [11]:
device = None
if torch.cuda.is_available():
    device = torch.device("cuda:0")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: mps


## Step 3.2.1: Model

In [22]:
import torch.nn as nn
import utils
from typing import List

class CNN_MLP(nn.Module):
    def __init__(
            self,
            dim: int,
            num_classes: int,
            layers: list,
            confs: List[dict],
            in_channels: int,
            out_channels: int,
            dropout=0.5,
            weight_init=None):
        super(CNN_MLP, self).__init__()

        self.net = nn.ModuleList()

        for layer, conf in zip(layers, confs):
            if layer == "C":
                self.net.append(
                    nn.Conv2d(
                        in_channels,
                        out_channels,
                        kernel_size=conf["kernel"],
                        stride=conf["stride"],
                        padding=conf["padding"]
                    )
                )
                # self.net.append(nn.BatchNorm2d(out_channels))
                self.net.append(nn.ReLU())
                self.net.append(nn.BatchNorm2d(out_channels))
                in_channels = out_channels
                out_channels = conf["channels"]
            elif layer == "P":
                self.net.append(
                    nn.MaxPool2d(kernel_size=conf["kernel"])
                )
        
        self.flatten = nn.Flatten()
        self.in_channels = in_channels
        self.dim = utils.get_dim_after_conv_and_pool(
            dim_init=dim,
            layers=layers,
            confs=confs
        )
        print(f"self.dim: {self.dim},\nself.in_channels: {self.in_channels}")
        self.fc1 = nn.Linear(self.dim * self.dim * self.in_channels, 500)
        # self.fc2 = nn.Linear(784, 196)
        self.fc3 = nn.Linear(500, num_classes)
        self.dropout = nn.Dropout(dropout)
        # self.dropout2d = nn.Dropout2d(dropout)
        # Weight Initialization
        # self.initialize_weights(weight_init)
        # if weight_init == "random_uniform":
        #     assert (self.conv1.weight >= 0).all() and (self.conv1.weight <= 1).all()
        #     assert (self.fc1.weight >= 0).all() and (self.fc1.weight <= 1).all()
        # elif weight_init == "random_normal":
        #     assert torch.isclose(self.conv1.weight.mean(), torch.tensor(0.), atol=1e-2).item()
        #     assert torch.isclose(self.conv1.weight.std(), torch.tensor(1.), atol=1e-2).item()


    def forward(self, x):
        N = x.shape[0]
        C, H, W = x.shape[3], x.shape[1], x.shape[2]
        x = x.permute(0, 3, 1, 2)  # From (batch_size, H, W, C) to (batch_size, C, H, W)
        assert x.shape == (N, C, H, W)
        
        for layer in self.net:
            x = layer(x)

        # x = self.dropout2d(x)
        x = self.flatten(x)
        x = self.dropout(x)
        x = nn.ReLU()(self.fc1(x))
        x = self.dropout(x)
        # x = nn.ReLU()(self.fc2(x))
        # TODO add batch norm to fc layers
        x = self.fc3(x)

        return x


    # def initialize_weights(self, kind):
    #     if kind == "random_uniform":
    #         for m in self.modules():
    #             if isinstance(m, nn.Conv2d):
    #                 nn.init.uniform_(m.weight)
    #                 if m.bias is not None:
    #                     nn.init.uniform_(m.bias)
    #             elif isinstance(m, nn.Linear):
    #                 nn.init.uniform_(m.weight)
    #                 nn.init.uniform_(m.bias)
    #     elif kind == "random_normal":
    #         for m in self.modules():
    #             if isinstance(m, nn.Conv2d):
    #                 nn.init.normal_(m.weight)
    #                 if m.bias is not None:
    #                     nn.init.normal_(m.bias)
    #             elif isinstance(m, nn.Linear):
    #                 nn.init.normal_(m.weight)
    #                 nn.init.normal_(m.bias)
    #     elif kind == "xavier":
    #         pass
    #     elif kind == "he":
    #         pass
    #     else:
    #         raise ValueError("Invalid weight initialization kind!")

## Hyperparameters

In [23]:
EPOCHS = 40
BATCH_SIZE = BATCH_SIZE
LEARNING_RATE = 0.001
SEED = 42
WEIGHT_DECAY = None
DROPOUT = 0.5
BATCH_NORM = True
OPTIMIZER = "Adam"

## Step 3.2.2: Model Init

In [25]:
import copy
from torchsummary import summary
import torch.optim as optim
torch.manual_seed(SEED)

layers = ["C", "C", "P", "C", "C", "P", "C", "C", "P"]
confs = [
    {"kernel": 3, "stride": 1, "padding": 1, "channels": 16},
    {"kernel": 3, "stride": 1, "padding": 1, "channels": 32},
    {"kernel": 2},
    {"kernel": 3, "stride": 1, "padding": 1, "channels": 64},
    {"kernel": 3, "stride": 1, "padding": 1, "channels": 64},
    {"kernel": 2},
    {"kernel": 3, "stride": 1, "padding": 1, "channels": 128},
    {"kernel": 3, "stride": 1, "padding": 1, "channels": 128},
    {"kernel": 2},
]

model = CNN_MLP(
    dim=64,
    num_classes=200,
    layers=layers,
    confs=confs,
    in_channels=3,
    out_channels=16,
    dropout=DROPOUT
)

# test_model = copy.deepcopy(model)
# x = torch.randn(BATCH_SIZE, 64, 64, 3)
# output = test_model(x)
# print(f"Shape of dummy tensor: {output.shape}")

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()
print(f"Epochs: {EPOCHS}\nBatch size: {BATCH_SIZE}\n\
Learning rate: {LEARNING_RATE}\nSeed: {SEED}\nWeight Decay: {WEIGHT_DECAY}\n\
Dropout: {DROPOUT}\nBatch Norm: {BATCH_NORM}\nOptimizer: {OPTIMIZER}")
print(model)
summary(model)

self.dim: 8,
self.in_channels: 128
Epochs: 40
Batch size: 64
Learning rate: 0.001
Seed: 42
Weight Decay: None
Dropout: 0.5
Batch Norm: True
Optimizer: Adam
CNN_MLP(
  (net): ModuleList(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU()
    (9): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU()
    (12): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13)

Layer (type:depth-idx)                   Param #
├─ModuleList: 1-1                        --
|    └─Conv2d: 2-1                       448
|    └─ReLU: 2-2                         --
|    └─BatchNorm2d: 2-3                  32
|    └─Conv2d: 2-4                       2,320
|    └─ReLU: 2-5                         --
|    └─BatchNorm2d: 2-6                  32
|    └─MaxPool2d: 2-7                    --
|    └─Conv2d: 2-8                       4,640
|    └─ReLU: 2-9                         --
|    └─BatchNorm2d: 2-10                 64
|    └─Conv2d: 2-11                      18,496
|    └─ReLU: 2-12                        --
|    └─BatchNorm2d: 2-13                 128
|    └─MaxPool2d: 2-14                   --
|    └─Conv2d: 2-15                      36,928
|    └─ReLU: 2-16                        --
|    └─BatchNorm2d: 2-17                 128
|    └─Conv2d: 2-18                      73,856
|    └─ReLU: 2-19                        --
|    └─BatchNorm2d: 2-20                 256
|    

## Step 3.2.3: Model Train

In [26]:
import wandb
# %env WANDB_LOG_MODEL="end"
%env WANDB_SILENT=true
%env PYTORCH_ENABLE_MPS_FALLBACK=1
try:
    wandb.login()
    wandb.init(project="del",
               entity="hariveliki")
    wandb.define_metric("epoch")
    wandb.define_metric("loss_train", step_metric="epoch")
    wandb.define_metric("loss_eval", step_metric="epoch")
    wandb.define_metric("train_accuracy", step_metric="epoch")
    wandb.define_metric("eval_accuracy", step_metric="epoch")
    wandb.define_metric("f1_macro", step_metric="epoch")
    wandb.define_metric("precision", step_metric="epoch")
    wandb.define_metric("recall", step_metric="epoch")

    # conv1_grads = []
    # def save_grad(module, grad_input, grad_output):
    #     conv1_grads.append(grad_output[0])
    # hook_handle = model.conv1.register_full_backward_hook(save_grad)

    model.to(device)
    for epoch in range(1, EPOCHS+1):
        print(f"|---------------------------| Start Epoch {epoch}: |---------------------------|")
        loss_train = 0
        total = 0
        correct = 0
        model.train()
        for n, batch in enumerate(train_loader):
            imgs = batch["image"]
            imgs = imgs.to(device)
            labels = batch["label"]
            labels = labels.to(device)

            # Forward pass
            labels = labels.long()
            logits = model(imgs)
            preds = nn.functional.softmax(logits, dim=1)
            loss = criterion(logits, labels)
            loss_train += loss.item()
            predicted = preds.argmax(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        loss_train /= len(train_loader)
        train_accuracy = 100 * correct / total
        print(f"Train Loss: {loss_train}")
        print(f"Train Accuracy: {train_accuracy}")

        correct = 0
        total = 0
        loss_eval = 0
        # all_labels = []
        # all_outputs = []
        model.eval()
        for n, batch in enumerate(val_loader):
            imgs = batch["image"]
            imgs = imgs.to(device)
            labels = batch["label"]
            labels = labels.to(device)

            # Forward pass
            labels = labels.long()
            logits = model(imgs)
            preds = nn.functional.softmax(logits, dim=1)
            loss_eval += criterion(logits, labels).item()
            predicted = preds.argmax(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # correct += (preds.argmax(1) == labels).sum().item()
            # all_labels.extend(preds.argmax(1).cpu().detach().numpy().tolist())
            # all_outputs.extend(labels.cpu().detach().numpy().tolist())

        loss_eval /= len(val_loader)
        # correct /= len(val_loader)
        eval_accuracy = 100 * correct / total
        # f1_macro = f1_score(all_labels, all_outputs, average='macro')
        # precision = precision_score(all_labels, all_outputs, average='macro', zero_division=0)
        # recall = recall_score(all_labels, all_outputs, average='macro', zero_division=0)
        print(f"Eval Loss: {loss_eval}")
        print(f"Eval Accuracy: {eval_accuracy}")
        # print(f"F1 Macro: {f1_macro}")
        # print(f"Precision: {precision}")
        # print(f"Recall: {recall}")
        wandb.log(
            {
                "epoch": epoch,
                "loss_train": loss_train,
                "loss_eval": loss_eval,
                "train_accuracy": train_accuracy,
                "eval_accuracy": eval_accuracy
            }
        )

    # hook_handle.remove()
    wandb.finish()

except Exception as e:
    wandb.finish()
    raise e

except KeyboardInterrupt:
    wandb.finish()
    raise KeyboardInterrupt

env: WANDB_SILENT=true
env: PYTORCH_ENABLE_MPS_FALLBACK=1
|---------------------------| Start Epoch 1: |---------------------------|
Train Loss: 5.221913453637219
Train Accuracy: 1.096
Eval Loss: 5.042434437259747
Eval Accuracy: 1.9
|---------------------------| Start Epoch 2: |---------------------------|
Train Loss: 5.102122197605751
Train Accuracy: 1.419
Eval Loss: 4.910566876648338
Eval Accuracy: 2.88
|---------------------------| Start Epoch 3: |---------------------------|
Train Loss: 5.010724197651283
Train Accuracy: 1.905
Eval Loss: 4.786122136814579
Eval Accuracy: 3.99
|---------------------------| Start Epoch 4: |---------------------------|
Train Loss: 4.919468417811379
Train Accuracy: 2.563
Eval Loss: 4.629715615776694
Eval Accuracy: 5.46
|---------------------------| Start Epoch 5: |---------------------------|
Train Loss: 4.787711892155448
Train Accuracy: 3.487
Eval Loss: 4.484924143287027
Eval Accuracy: 6.94
|---------------------------| Start Epoch 6: |-----------------



In [16]:
import matplotlib.pyplot as plt

def plot_conv_layer_gradients(batch_idx, conv_grads):
    """
    Plots the gradients of a convolutional layer for a given batch index.

    Args:
    - batch_idx (int): The index of the batch to plot the gradients for.
    - conv_grads (list): A list of the gradients of the convolutional layer.

    Returns:
    - None
    """
    # Get the gradient for that batch
    batch_grad = conv_grads[batch_idx]

    # Convert the PyTorch tensor to a NumPy array
    batch_grad_np = batch_grad.cpu().numpy()

    # Create a plot
    plt.figure(figsize=(10, 10))

    # Assuming the weight tensor is of shape (out_channels, in_channels, kernel_size, kernel_size)
    out_channels = batch_grad_np.shape[0]
    for i in range(out_channels):
        plt.subplot(4, 4, i + 1)
        plt.imshow(batch_grad_np[i, 0], cmap='viridis')
        plt.title(f'Output channel {i}')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# plot_conv_layer_gradients(0, conv1_grads)