<a href="https://colab.research.google.com/github/hangsheng0625/Deep_Learning/blob/main/Week11_Kaggle_ViT_ModelFineTuning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# <span style="color:#0b486b">  FIT3181/5215: In-class Kaggle Competition</span>
***
*CE/Lecturer (Clayton):*  **Dr Trung Le** | trunglm@monash.edu <br/>
*Lecturer (Clayton):* **Prof Dinh Phung** | dinh.phung@monash.edu <br/>
*Lecturer (Malaysia):*  **Dr Arghya Pal** | arghya.pal@monash.edu <br/> <br/>
 <br/>
Faculty of Information Technology, Monash University, Australia
***

# <span style="color:#0b486b"> Kaggle week 11: ViT Transfer learning, Model Fine-Tuning with prompts and Adapters</span>

**Your roles:**
- Fine-tune a ViT for an image classification task.
- You can adopt any one of three approaches: ViT transfer learning, model fine-tuning with prompts, or fine-tuning with adapters.
- The dataset consists of 5640 images across 47 categories, with 1880 images each for training, validation, and testing.
- Predict the test set of 1880 images and submit your solution to Kaggle.

# Setup



In [None]:
!gdown https://drive.google.com/file/d/1BTSDJiLTG6wlrTtIGOhy8sZl73xJSynf/view?usp=sharing --fuzzy # comment out for the second run
# backup
# https://drive.google.com/file/d/1srQLZo461jmYJhtG-P-gxNxEH7LbxaES/view?usp=sharing
# https://drive.google.com/file/d/1NKSuoOTxiT4LQA4Oa4PRbrt0tBmsq6aC/view?usp=sharing
# https://drive.google.com/file/d/1tDF8dN-rZyQ2_urp2WSAMfffgrfeht9-/view?usp=sharing

Downloading...
From (original): https://drive.google.com/uc?id=1BTSDJiLTG6wlrTtIGOhy8sZl73xJSynf
From (redirected): https://drive.google.com/uc?id=1BTSDJiLTG6wlrTtIGOhy8sZl73xJSynf&confirm=t&uuid=4beb237d-c09f-479b-9c84-6261f645a893
To: /content/Kaggle_Week10.zip
100% 626M/626M [00:09<00:00, 66.0MB/s]


In [None]:
!unzip -q -o Kaggle_Week10.zip # comment out for the second run

Download the pre-trained ViT-B_16 weights (ImageNet-21k) and store them in the Colab working directory.

In [None]:
# Download the ImageNet-21k pre-trained weights for ViT-B_16
!wget https://storage.googleapis.com/vit_models/imagenet21k/ViT-B_16.npz # comment out for the second run

--2024-09-29 11:01:36--  https://storage.googleapis.com/vit_models/imagenet21k/ViT-B_16.npz
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.130.207, 74.125.68.207, 64.233.170.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.130.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 412815506 (394M) [application/octet-stream]
Saving to: ‘ViT-B_16.npz.2’


2024-09-29 11:02:13 (11.3 MB/s) - ‘ViT-B_16.npz.2’ saved [412815506/412815506]



In [None]:
!pip install ml_collections



We import the necessary libraries and packages.

In [None]:
import warnings
warnings.simplefilter("ignore", UserWarning)

import copy
import logging
import math
from tqdm import tqdm
import ml_collections

from os.path import join as pjoin

import torch
import torch.nn as nn
import numpy as np

from torch.nn import CrossEntropyLoss, Dropout, Softmax, Linear, Conv2d, LayerNorm
from torch.nn.modules.utils import _pair
from scipy import ndimage
import torchvision
import torchvision.transforms as transforms


In [None]:
from Kaggle_Week10.vit import Embeddings, Mlp, Attention, Transformer, VisionTransformer
from Kaggle_Week10.training import predict_and_save, load_test_images #, train_epoch_vit

# Prepare the dataset

In [None]:
import os
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

data_dir = './Kaggle_Week10/dataset'

batch_size = 32
img_size = 224
# Data augmentation for training set
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(img_size),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# test transform
test_transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Load datasets
train_dataset = datasets.ImageFolder(root=os.path.join(data_dir, 'train'), transform=train_transform)
val_dataset = datasets.ImageFolder(root=os.path.join(data_dir, 'val'), transform=test_transform)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = load_test_images(os.path.join(data_dir, 'test'))

class_names = train_dataset.classes

# Usage

print(f"Number of classes: {len(class_names)}")
# Print some information about the datasets
print(f"Number of training samples: {len(train_loader.dataset)}")
print(f"Number of validation samples: {len(val_loader.dataset)}")
print(f"Number of test samples: {len(test_loader.dataset)}")
# Fetch a batch of training data
images, labels = next(iter(train_loader))
print(f"Batch shape: {images.shape}")
print(f"Labels shape: {labels.shape}")

Loaded 1000 images
Number of classes: 47
Number of training samples: 1880
Number of validation samples: 1880
Number of test samples: 1880
Batch shape: torch.Size([32, 3, 224, 224])
Labels shape: torch.Size([32])


# Prepare the backbone

This code snippet defines constants and utility functions for loading the Vision Transformer (ViT) checkpoint. ViT is a Transformer-based image classifier that processes an image as a sequence of patches rather than individual pixels, reusing the attention mechanism traditionally used in NLP. The constants are the names under which each encoder block's parameters are stored in the pre-trained checkpoint: the multi-head attention query (`ATTENTION_Q`), key (`ATTENTION_K`), value (`ATTENTION_V`), and output (`ATTENTION_OUT`) projections, the two fully connected layers (`FC_0` and `FC_1`) of the multi-layer perceptron (MLP) block, and the normalization layers (`ATTENTION_NORM` and `MLP_NORM`).

The utility function `np2th` converts a NumPy array of weights into a PyTorch tensor; with `conv=True` it first transposes a convolution kernel from the HWIO layout (height, width, input channels, output channels) to the OIHW layout that PyTorch expects. The activation function dictionary `ACT2FN` maps activation names (`gelu`, `relu`, and `swish`) to their PyTorch implementations, so the activation can be selected by name.

In [None]:
# VIT default config
ATTENTION_Q = "MultiHeadDotProductAttention_1/query"
ATTENTION_K = "MultiHeadDotProductAttention_1/key"
ATTENTION_V = "MultiHeadDotProductAttention_1/value"
ATTENTION_OUT = "MultiHeadDotProductAttention_1/out"
FC_0 = "MlpBlock_3/Dense_0"
FC_1 = "MlpBlock_3/Dense_1"
ATTENTION_NORM = "LayerNorm_0"
MLP_NORM = "LayerNorm_2"

def np2th(weights, conv=False):
    """Possibly convert HWIO to OIHW."""
    if conv:
        weights = weights.transpose([3, 2, 0, 1])
    return torch.from_numpy(weights)

def swish(x):
    return x * torch.sigmoid(x)


ACT2FN = {"gelu": torch.nn.functional.gelu, "relu": torch.nn.functional.relu, "swish": swish}
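
As a quick check of the layout conversion, the sketch below applies `np2th` to a dummy convolution kernel. The shape `(16, 16, 3, 768)` is an assumption chosen to resemble a ViT-B/16 patch-projection kernel, and the array is zero-filled rather than real checkpoint data.

```python
import numpy as np
import torch

def np2th(weights, conv=False):
    """Convert a NumPy array to a tensor, transposing HWIO -> OIHW for conv kernels."""
    if conv:
        weights = weights.transpose([3, 2, 0, 1])
    return torch.from_numpy(weights)

# Dummy kernel in HWIO layout: height, width, in-channels, out-channels (assumed shape).
hwio_kernel = np.zeros((16, 16, 3, 768), dtype=np.float32)
oihw_kernel = np2th(hwio_kernel, conv=True)

print(tuple(oihw_kernel.shape))  # (768, 3, 16, 16)
```

Without `conv=True` the array is converted as-is, which is what the dense and normalization weights need.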

We create a ViT_B_16 model and then load the weights of the pre-trained ViT_B_16 model into our declared ViT_B_16 architecture.

The `get_b16_config` function returns the configuration of the base Vision Transformer with 16x16-pixel patches (ViT-B/16): a hidden size of 768, 12 transformer layers with 12 attention heads each, an MLP dimension of 3072, and the dropout rates. The architectural values must match the pre-trained checkpoint so that its weights can be loaded into the model.

In [None]:
def get_b16_config():
    """Returns the ViT-B/16 configuration."""
    config = ml_collections.ConfigDict()
    config.patches = ml_collections.ConfigDict({'size': (16, 16)})  # Set patch size to 16x16
    config.hidden_size = 768  # Define hidden size for the transformer
    config.transformer = ml_collections.ConfigDict()
    config.transformer.mlp_dim = 3072  # Set the dimension of the MLP in the transformer
    config.transformer.num_heads = 12  # Define the number of attention heads
    config.transformer.num_layers = 12  # Set the number of transformer layers
    config.transformer.attention_dropout_rate = 0.0  # Set dropout rate for attention
    config.transformer.dropout_rate = 0.1  # Set general dropout rate
    config.classifier = 'token'  # Specify classifier type
    config.representation_size = None  # Set representation size (None for default)
    return config
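
The numbers in this configuration determine the token sequence the encoder sees. The arithmetic below is a small sketch of that bookkeeping for the 224x224 inputs used in this notebook; it assumes that the `'token'` classifier adds exactly one class token in front of the patch tokens.

```python
img_size = 224      # input resolution used in this notebook
patch_size = 16     # from config.patches.size
hidden_size = 768   # from config.hidden_size
num_heads = 12      # from config.transformer.num_heads

patches_per_side = img_size // patch_size   # patches along each image side
num_patches = patches_per_side ** 2         # patch tokens per image
seq_len = num_patches + 1                   # plus one class token (assumption)
head_dim = hidden_size // num_heads         # width of each attention head

print(patches_per_side, num_patches, seq_len, head_dim)  # 14 196 197 64
```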

In [None]:
config = get_b16_config()
pretrained_dir = "./ViT-B_16.npz"

# Finetuning ViT models

In [None]:
def train_epoch_vit(model, optimizer, train_loader, test_loader, scheduler, e):
  running_loss = 0.
  running_acc = 0
  num_data = 0
  model.train()
  lr = scheduler.get_last_lr()[0]  # current learning rate, used only for display
  loss_fct = CrossEntropyLoss(reduction='mean')
  with tqdm(total=len(train_loader), desc="Epoch {}".format(e)) as tepoch:
    for i, data in enumerate(train_loader):
        tepoch.update(1)
        # Every data instance is an input + label pair
        inputs, labels = [d_i.cuda() for d_i in data]
        # Zero your gradients for every batch!
        optimizer.zero_grad()

        # Make predictions for this batch
        outputs = model(inputs)
        loss = loss_fct(outputs, labels)
        acc = 100.*(torch.argmax(outputs, 1) == labels).sum()
        loss.backward()

        # Adjust learning weights
        optimizer.step()

        # Gather data and report
        running_loss += len(labels)*loss.cpu().item()
        running_acc += acc.cpu().item()
        num_data += len(labels)
        tepoch.set_postfix({"Loss": "={:.3f}, Acc={:.2f}, Lr={:.4f}".format(running_loss/num_data, running_acc/num_data, lr)})
    tepoch.close()
  scheduler.step()
  if e%2==0: # validate every second epoch; increase the period to speed up training
    running_loss = 0.
    running_acc = 0
    num_data = 0
    model.eval()
    with torch.no_grad():
      with tqdm(total=len(test_loader), desc="\tTesting") as tepoch:
        for i, data in enumerate(test_loader):
          tepoch.update(1)
          inputs, labels = [d_i.cuda() for d_i in data]
          outputs = model(inputs)
          loss = loss_fct(outputs, labels)
          acc = 100.*(torch.argmax(outputs, 1) == labels).sum()
          # Gather data and report
          running_loss += len(labels)*loss.item()
          running_acc += acc.item()
          num_data += len(labels)
          tepoch.set_postfix({"Loss": "={:.3f}, Acc={:.2f}".format(running_loss/num_data, running_acc/num_data)})
      tepoch.close()
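
The function above multiplies each batch's mean loss by the batch size before accumulating it. The sketch below illustrates why, using the 1880-sample splits and batch size 32 from this notebook; the per-batch loss values are made up for illustration.

```python
import math

num_samples, batch_size = 1880, 32
num_batches = math.ceil(num_samples / batch_size)           # batches per epoch
last_batch = num_samples - (num_batches - 1) * batch_size   # size of the final, shorter batch

# Hypothetical per-batch mean losses: 1.0 for full batches, 2.0 for the short one.
batch_sizes = [batch_size] * (num_batches - 1) + [last_batch]
batch_losses = [1.0] * (num_batches - 1) + [2.0]

# Size-weighted accumulation, as in train_epoch_vit.
running_loss = sum(n * loss for n, loss in zip(batch_sizes, batch_losses))
weighted_mean = running_loss / sum(batch_sizes)   # true per-sample average
naive_mean = sum(batch_losses) / num_batches      # over-weights the short batch

print(num_batches, last_batch)  # 59 24
print(weighted_mean, naive_mean)
```

The 59 batches match the `59/59` shown in the progress bars of this notebook.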

# I) Transfer learning and Finetuning ViT models

A Vision Transformer model is initialized with the specified configuration and image size, and the pre-trained weights are loaded from the downloaded `.npz` file. The model is then moved to the GPU. An SGD optimizer with a learning rate of 0.01 and a momentum of 0.9 updates all of the model's parameters during training. A cosine annealing scheduler decays the learning rate gradually over the `epochs` training epochs (10 here, set through `T_max=epochs`), which can help training stability and performance.

In [None]:
epochs = 10
num_classes = 47
model = VisionTransformer(config, img_size, zero_head=True, num_classes=num_classes)
model.load_from(np.load(pretrained_dir))
model.cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
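
To see what the scheduler does to the learning rate, the sketch below evaluates the closed-form cosine annealing curve for the values used in the cell above. It is plain Python rather than a call into PyTorch, and `eta_min = 0` is assumed because the cell leaves it at the PyTorch default.

```python
import math

base_lr, eta_min, t_max = 0.01, 0.0, 10  # lr and T_max from the cell above; eta_min is the default

def cosine_lr(epoch):
    """Closed-form cosine annealing from base_lr down to eta_min over t_max epochs."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + math.cos(math.pi * epoch / t_max))

for epoch in range(t_max + 1):
    print(epoch, f"{cosine_lr(epoch):.5f}")
```

The rate starts at 0.01, reaches half of that at epoch 5, and falls to zero at epoch 10.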

#### Training ViT models

In [None]:
for e in range(epochs):
  train_epoch_vit(model, optimizer, train_loader, val_loader, scheduler, e)

Epoch 0: 100%|██████████| 59/59 [01:02<00:00,  1.06s/it, Loss==3.670, Acc=37.87, Lr=0.0100]
	Testing: 100%|██████████| 59/59 [00:23<00:00,  2.47it/s, Loss==3.383, Acc=58.40]


## Submit to Kaggle for VIT transfer learning

In [None]:
## create a csv file for fine-tuning for ViTs, and you can submit solution_normal_vit_finetuning.csv to Kaggle
predict_and_save(model, test_loader, device="cuda", submission_path="solution_normal_vit_finetuning.csv")

# II) Model Fine-Tuning with Prompts

The `PromptedTransformer` class extends the base `Transformer` class by inserting learnable prompt tokens into the input sequence, between the class token and the image-patch embeddings. The prompt configuration specifies the number of prompt tokens, their dropout rate, an optional projection layer, and the initialization scheme; the constructor asserts that only prepended, randomly initialized prompts are used. The `forward` method either adds the prompts once at the input (shallow prompting) or, when `DEEP` is enabled, replaces the prompt tokens with a separate set of learned prompts before each subsequent encoder layer. The overridden `train` method keeps the encoder and the embeddings in evaluation mode while the prompts are tuned.
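
Two quantities in this class are easy to check by hand: the bound of the uniform prompt initialization and the length of the prompted sequence. The sketch below reproduces both, assuming the values set in this notebook's configuration cells (16x16 patches, hidden size 768, `NUM_TOKENS = 5`, `PROJECT = -1`, 224x224 inputs).

```python
import math
from functools import reduce
from operator import mul

patch_size = (16, 16)            # config.patches.size
prompt_dim = 768                 # equals hidden_size because PROJECT = -1
num_tokens = 5                   # NUM_TOKENS in the prompt configuration
num_patches = (224 // 16) ** 2   # patch tokens for a 224x224 image

# Bound of the uniform initialization used for the prompt embeddings.
val = math.sqrt(6.0 / float(3 * reduce(mul, patch_size, 1) + prompt_dim))

# Sequence seen by the encoder: class token + prompts + patches.
seq_len = 1 + num_tokens + num_patches

print(val, seq_len)  # 0.0625 202
```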

In [None]:
from functools import reduce
from operator import mul
class PromptedTransformer(Transformer):
    def __init__(self, prompt_config, config, img_size, vis):
        # Ensure the prompt configuration is set to prepend, indicating
        # that the prompt tokens will be added at the beginning of the input.
        assert prompt_config.LOCATION == "prepend"
        # Check that the prompt tokens are to be initialized randomly.
        assert prompt_config.INITIATION == "random"
        # Ensure that no deep layers are specified for the prompt.
        assert prompt_config.NUM_DEEP_LAYERS is None
        # Confirm that deep sharing of prompts is not enabled.
        assert not prompt_config.DEEP_SHARED
        # Initialize the parent Transformer class with the provided configurations.
        super(PromptedTransformer, self).__init__(config, img_size, vis)
        # Store the provided prompt configuration for later use.
        self.prompt_config = prompt_config
        # Store the Vision Transformer configuration for layer setups.
        self.vit_config = config
        # Convert the image size and patch size to a standardized format.
        img_size = _pair(img_size)
        patch_size = _pair(config.patches["size"])
        # Get the number of prompt tokens from the prompt configuration.
        num_tokens = self.prompt_config.NUM_TOKENS
        self.num_tokens = num_tokens  # Store the number of prompted tokens.
        # Initialize a dropout layer for the prompt embeddings.
        self.prompt_dropout = Dropout(self.prompt_config.DROPOUT)
        # Check if prompt embeddings need to be projected to a different dimensionality.
        if self.prompt_config.PROJECT > -1:
            # Set the prompt dimension to the specified project size.
            prompt_dim = self.prompt_config.PROJECT
            # Create a linear layer to project the prompt embeddings to the hidden size.
            self.prompt_proj = nn.Linear(prompt_dim, config.hidden_size)
            # Initialize the weights of the projection layer using Kaiming normal initialization.
            nn.init.kaiming_normal_(self.prompt_proj.weight, a=0, mode='fan_out')
        else:
            # If no projection is required, set the projection layer to be an identity function.
            prompt_dim = config.hidden_size
            self.prompt_proj = nn.Identity()
        # Initialize prompt embeddings based on the specified initiation method.
        if self.prompt_config.INITIATION == "random":
            # Calculate a value for uniform initialization based on the patch size and prompt dimension.
            val = math.sqrt(6. / float(3 * reduce(mul, patch_size, 1) + prompt_dim))  # noqa
            # Create a parameter for prompt embeddings, initialized to zeros.
            self.prompt_embeddings = nn.Parameter(torch.zeros(1, num_tokens, prompt_dim))
            # Use uniform initialization for the prompt embeddings.
            nn.init.uniform_(self.prompt_embeddings.data, -val, val)
            # If deep prompting is enabled, create additional prompt embeddings for deep layers.
            if self.prompt_config.DEEP:  # noqa
                total_d_layer = config.transformer["num_layers"] - 1
                # Create parameters for deep prompt embeddings initialized to zeros.
                self.deep_prompt_embeddings = nn.Parameter(torch.zeros(total_d_layer, num_tokens, prompt_dim))
                # Use uniform initialization for the deep prompt embeddings.
                nn.init.uniform_(self.deep_prompt_embeddings.data, -val, val)
        else:
            # Raise an error if an unsupported initiation scheme is provided.
            raise ValueError("Other initiation scheme is not supported")

    def incorporate_prompt(self, x):
        # combine prompt embeddings with image-patch embeddings
        B = x.shape[0]
        # prompts go after the CLS token and before all image patches
        x = self.embeddings(x)  # (batch_size, 1 + n_patches, hidden_dim)
        x = torch.cat((
                x[:, :1, :],
                self.prompt_dropout(self.prompt_proj(self.prompt_embeddings).expand(B, -1, -1)),
                x[:, 1:, :]
            ), dim=1)
        # (batch_size, cls_token + n_prompt + n_patches, hidden_dim)

        return x

    def train(self, mode=True):
        # Override nn.Module.train so that the pre-trained backbone stays in
        # evaluation mode even while the prompts are being trained.
        if mode:
            # eval() only changes layer behavior (e.g. it disables dropout in
            # the encoder and embeddings); it does NOT freeze parameters.
            # Freezing is done separately by setting requires_grad = False
            # on the backbone parameters before building the optimizer.
            self.encoder.eval()
            self.embeddings.eval()

            # Keep the prompt projection and the prompt dropout in training
            # mode, so dropout is applied to the prompt embeddings at the
            # configured rate.
            self.prompt_proj.train()
            self.prompt_dropout.train()
        else:
            # Evaluation: put every child module into evaluation mode.
            for module in self.children():
                module.train(mode)
        # Follow the nn.Module.train convention of returning self.
        return self

    def forward_deep_prompt(self, embedding_output):
        # Initialize a list to store attention weights for visualization.
        attn_weights = []

        # Initialize hidden_states and weights to None.
        # hidden_states will store the output of each transformer layer,
        # while weights will capture attention weights.
        hidden_states = None
        weights = None

        # Get the batch size from the input embedding output.
        B = embedding_output.shape[0]

        # Retrieve the number of transformer layers from the configuration.
        num_layers = self.vit_config.transformer["num_layers"]

        # Loop through each transformer layer.
        for i in range(num_layers):
            if i == 0:
                # For the first layer, directly pass the embedding output
                # through the encoder's first layer, capturing the output
                # hidden states and attention weights.
                hidden_states, weights = self.encoder.layer[i](embedding_output)
            else:
                # For subsequent layers, check if deep prompt embeddings are being used.
                if i <= self.deep_prompt_embeddings.shape[0]:
                    # Apply dropout and projection to the deep prompt embeddings for the current layer.
                    deep_prompt_emb = self.prompt_dropout(self.prompt_proj(
                        self.deep_prompt_embeddings[i-1]).expand(B, -1, -1))

                    # Concatenate the CLS token, the processed deep prompt embeddings,
                    # and the remaining hidden states from the previous layer.
                    hidden_states = torch.cat((
                        hidden_states[:, :1, :],  # CLS token
                        deep_prompt_emb,          # Deep prompt embeddings
                        hidden_states[:, (1+self.num_tokens):, :]  # Remaining tokens
                    ), dim=1)

                # Pass the updated hidden states through the current transformer layer,
                # capturing the new hidden states and attention weights.
                hidden_states, weights = self.encoder.layer[i](hidden_states)

            # If visualization is enabled, append the attention weights for the current layer.
            if self.encoder.vis:
                attn_weights.append(weights)

        # Apply layer normalization to the final hidden states before returning.
        encoded = self.encoder.encoder_norm(hidden_states)

        # Return the encoded representations and the list of attention weights.
        return encoded, attn_weights

    def forward(self, x):
        # This is the default forward pass for the PromptedTransformer.
        # First, incorporate prompt embeddings into the input tensor x.
        embedding_output = self.incorporate_prompt(x)

        # Check if deep prompting is enabled in the configuration.
        if self.prompt_config.DEEP:
            # If deep prompting is enabled, pass the embedding output
            # through the deep prompt forward function, which processes
            # the embeddings layer by layer and returns the encoded
            # output along with attention weights for visualization.
            encoded, attn_weights = self.forward_deep_prompt(embedding_output)
        else:
            # If deep prompting is not enabled, simply pass the embedding output
            # through the encoder, which computes the encoded representation
            # and also returns attention weights.
            encoded, attn_weights = self.encoder(embedding_output)

        # Return the encoded representations and the attention weights.
        return encoded, attn_weights
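
The token bookkeeping in `incorporate_prompt` and `forward_deep_prompt` can be checked with stand-in tensors. The sketch below uses random and constant tensors in place of the real embeddings and learned prompts; the batch size of 2 and the constant fill values are arbitrary choices for illustration.

```python
import torch

batch, num_patches, num_tokens, hidden = 2, 196, 5, 768

# Stand-in for self.embeddings(x): class token followed by patch embeddings.
embedded = torch.randn(batch, 1 + num_patches, hidden)
# Stand-in for the learnable prompt parameter of shape (1, num_tokens, hidden).
prompts = torch.zeros(1, num_tokens, hidden)

# Shallow prompting: insert the prompts between the class token and the patches.
with_prompts = torch.cat(
    (embedded[:, :1, :], prompts.expand(batch, -1, -1), embedded[:, 1:, :]), dim=1
)

# Deep prompting: before a later layer, overwrite the prompt slots with that
# layer's own prompts and keep the class token and patch tokens unchanged.
deep_prompts = torch.ones(num_tokens, hidden)
replaced = torch.cat(
    (
        with_prompts[:, :1, :],
        deep_prompts.expand(batch, -1, -1),
        with_prompts[:, 1 + num_tokens:, :],
    ),
    dim=1,
)

print(tuple(with_prompts.shape), tuple(replaced.shape))
```

Both tensors have 202 tokens per image, so deep prompting changes the content of the prompt slots but not the sequence length.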


The `PromptedVisionTransformer` class extends the functionality of the `VisionTransformer` class by incorporating prompting mechanisms. In its constructor, it initializes the base class with various parameters while asserting that the prompt configuration's pooling type is set to "original." It raises an error if the `prompt_cfg` is `None`. The class instantiates a `PromptedTransformer` object, which manages the incorporation of prompts into the transformer architecture. In the `forward` method, it processes the input `x` through the transformer to obtain attention weights and embeddings. It then extracts the first token (typically the class token) from the transformer output, passes it through a classification head to produce logits, and returns the logits, along with the attention weights if visualization is requested.

In [None]:
class PromptedVisionTransformer(VisionTransformer):
    def __init__(
        self, prompt_cfg, model_type=None,
        img_size=224, num_classes=21843, vis=False, vit_cfg=None, zero_head=False
    ):
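        # Validate prompt_cfg first: reading an attribute of None would fail
        # before the explicit check could raise its clearer error.
        if prompt_cfg is None:
            raise ValueError("prompt_cfg cannot be None if using PromptedVisionTransformer")
        assert prompt_cfg.VIT_POOL_TYPE == "original"
        super(PromptedVisionTransformer, self).__init__(
            vit_cfg, img_size, num_classes, zero_head, vis)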
        self.prompt_cfg = prompt_cfg
        # vit_cfg = CONFIGS[model_type]
        self.transformer = PromptedTransformer(prompt_cfg, vit_cfg, img_size, vis)

    def forward(self, x, vis=False):
        # Pass the input x through the transformer to get the output and attention weights.
        x, attn_weights = self.transformer(x)

        # Extract the first token from the transformer output, which is typically used for classification.
        x = x[:, 0]

        # Pass the extracted token through the classification head to obtain the logits.
        logits = self.head(x)

        # If visualization is not requested, return only the logits.
        if not vis:
            return logits

        # If visualization is requested, return both the logits and the attention weights for further analysis.
        return logits, attn_weights

The `get_prompt_config` function defines and returns the configuration used for prompt tuning. It sets the number of prompt tokens, their location (`"prepend"`), and the initialization method; the comments list other schemes, but `PromptedTransformer` accepts only `"random"`. It also holds the deep-prompting settings (whether to apply it, the number of deep layers, and whether prompts are shared across layers; the latter two must stay at their defaults here), the pooling used for the classification head, the prompt dropout rate, and a flag for saving the model state after each epoch.

In [None]:
def get_prompt_config():
    """Returns the prompt-tuning configuration."""
    config = ml_collections.ConfigDict()
    config.NUM_TOKENS = 5
    config.LOCATION = "prepend"
    # prompt initialization:
        # (1) default "random"
        # (2) "final-cls" use aggregated final [cls] embeddings from training dataset
        # (3) "cls-nolastl": use first 12 cls embeddings (exclude the final output) for deep prompt
        # (4) "cls-nofirstl": use last 12 cls embeddings (exclude the input to first layer)
    config.INITIATION = "random"  # "final-cls", "cls-first12"
    config.CLSEMB_FOLDER = ""
    config.CLSEMB_PATH = ""
    config.PROJECT = -1  # "projection mlp hidden dim"
    config.DEEP = False # "whether do deep prompt or not, only for prepend location"


    config.NUM_DEEP_LAYERS = None  # if set to be an int, then do partial-deep prompt tuning
    config.REVERSE_DEEP = False  # if to only update last n layers, not the input layer
    config.DEEP_SHARED = False  # if true, all deep layers will use the same prompt emb
    config.FORWARD_DEEP_NOEXPAND = False  # if true, will not expand input sequence for layers without prompt
    # how to get the output emb for cls head:
        # original: follow the original backbone choice,
        # img_pool: image patch pool only
        # prompt_pool: prompt embd pool only
        # imgprompt_pool: pool everything but the cls token
    config.VIT_POOL_TYPE = "original"
    config.DROPOUT = 0.0
    config.SAVE_FOR_EACH_EPOCH = False
    return config

In [None]:
prompt_config = get_prompt_config()
vit_config = get_b16_config()

In [None]:
# model_type is unused by the class, so it is left at its default (None)
prompt_model = PromptedVisionTransformer(prompt_config,
        img_size=img_size, num_classes=num_classes, vis=False, vit_cfg=vit_config, zero_head=True)

In [None]:
#load_pretrain:
prompt_model.load_from(np.load(pretrained_dir))

In [None]:
prompt_model.cuda()
epochs = 10
lr = 0.01
weight_decay=0.0001
prompt_model.train()
trainable_params = []
print_trainable_param = True
for p_name, param in prompt_model.named_parameters():
  if "prompt" not in p_name and "head" not in p_name and "cls_token" not in p_name:
    # Freeze the pre-trained backbone
    param.requires_grad = False
  else:
    # Prompts, the classification head, and the class token stay trainable
    if print_trainable_param:
      print("\t{}, {}, {}".format(p_name, param.numel(), param.shape))
    trainable_params.append({"params": [param]})
prompt_optimizer = torch.optim.SGD(trainable_params, lr=lr, momentum=0.9, weight_decay=weight_decay)
# You may consider using a warm_up schedule for this task
prompt_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(prompt_optimizer, T_max=epochs)
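
The name-based freezing rule in the cell above can be tried on a miniature model. `ToyPromptModel` below is hypothetical: only its parameter names mimic the ones the rule looks for, and its sizes are kept tiny so the counts are easy to verify.

```python
import torch
import torch.nn as nn

class ToyPromptModel(nn.Module):
    """Hypothetical miniature model; only the parameter names mimic the notebook's."""
    def __init__(self, hidden=8, num_tokens=5, num_classes=47):
        super().__init__()
        self.cls_token = nn.Parameter(torch.zeros(1, 1, hidden))
        self.prompt_embeddings = nn.Parameter(torch.zeros(1, num_tokens, hidden))
        self.encoder = nn.Linear(hidden, hidden)   # stands in for the frozen backbone
        self.head = nn.Linear(hidden, num_classes)

toy = ToyPromptModel()
trainable_names = []
for name, param in toy.named_parameters():
    # Same name-based rule: keep prompts, the head, and the class token trainable.
    param.requires_grad = any(key in name for key in ("prompt", "head", "cls_token"))
    if param.requires_grad:
        trainable_names.append(name)

num_trainable = sum(p.numel() for p in toy.parameters() if p.requires_grad)
num_total = sum(p.numel() for p in toy.parameters())
print(trainable_names)
print(num_trainable, num_total)
```

Passing only the trainable parameters to the optimizer, as the notebook does, keeps momentum and weight decay away from the frozen backbone.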

#### Training model fine-tuning with prompts

In [None]:
for e in range(epochs):
  train_epoch_vit(prompt_model, prompt_optimizer, train_loader, val_loader, prompt_scheduler, e)

## Submit to Kaggle for fine-tuning with prompts

In [None]:
## Create a csv file for fine-tuning with prompts, and submit solution_prompt_finetuning.csv to Kaggle
predict_and_save(prompt_model, test_loader, device="cuda", submission_path="solution_prompt_finetuning.csv")

# III) Model Fine-Tuning with Adapters

The `ADPT_Block` class is a modified version of a standard Vision Transformer (ViT) block that adds an Adapter module. It sets up layer normalization for the attention and feed-forward sub-layers, the attention mechanism, and the feed-forward network. For the only supported adapter style ("Pfeiffer"), it also creates a down-projection and an up-projection linear layer that reduce the hidden size by the configured reduction factor and then restore it. In the `forward` method, the input passes through the attention sub-layer and then the feed-forward network; the adapter is applied to the feed-forward output, with its own skip connection, before the block's second residual connection. The `load_from` method copies the attention, feed-forward, and normalization weights of the corresponding block from the pre-trained checkpoint; the adapter layers have no counterpart in that checkpoint and keep their zero initialization. This structure lets the model reuse pre-trained knowledge while adding only a small number of new parameters.
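
The adapter bottleneck can be sketched in isolation. The snippet below builds the two linear layers with the same zero initialization that `ADPT_Block` uses and applies them to a random stand-in for the feed-forward output; the reduction factor of 8 is an assumed value, since the adapter configuration is not shown at this point.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

hidden_size, reduction_factor = 768, 8   # reduction_factor is an assumed value

down = nn.Linear(hidden_size, hidden_size // reduction_factor)
up = nn.Linear(hidden_size // reduction_factor, hidden_size)
for layer in (down, up):
    nn.init.zeros_(layer.weight)   # zero initialization, as in ADPT_Block
    nn.init.zeros_(layer.bias)

ffn_out = torch.randn(2, 197, hidden_size)       # stand-in for the feed-forward output
adapted = ffn_out + up(F.gelu(down(ffn_out)))    # bottleneck plus skip connection

extra_params = sum(p.numel() for layer in (down, up) for p in layer.parameters())
print(torch.equal(adapted, ffn_out), extra_params)
```

With all adapter weights at zero the adapter contributes nothing, so the block initially behaves like the pre-trained one.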

In [None]:
# Re-define the ViT model with Adapter
# The only major difference is that ADPT_Block replaces Block.
# ADPT_Block uses additional Adapter blocks

class ADPT_Block(nn.Module):
    def __init__(self, config, vis, adapter_config):
        super(ADPT_Block, self).__init__()
        self.hidden_size = config.hidden_size
        self.attention_norm = LayerNorm(config.hidden_size, eps=1e-6)
        self.ffn_norm = LayerNorm(config.hidden_size, eps=1e-6)
        self.ffn = Mlp(config)
        self.attn = Attention(config, vis)

        self.adapter_config = adapter_config

        if adapter_config.STYLE == "Pfeiffer":
            self.adapter_downsample = nn.Linear(
                config.hidden_size,
                config.hidden_size // adapter_config.REDUCATION_FACTOR
            )
            self.adapter_upsample = nn.Linear(
                config.hidden_size // adapter_config.REDUCATION_FACTOR,
                config.hidden_size
            )
            self.adapter_act_fn = ACT2FN["gelu"]

            nn.init.zeros_(self.adapter_downsample.weight)
            nn.init.zeros_(self.adapter_downsample.bias)

            nn.init.zeros_(self.adapter_upsample.weight)
            nn.init.zeros_(self.adapter_upsample.bias)
        else:
            raise ValueError("Other adapter styles are not supported.")

    def forward(self, x):
        # Check if the adapter configuration style is "Pfeiffer"
        if self.adapter_config.STYLE == "Pfeiffer":
            # Store the original input x for the first residual connection
            h = x

            # Apply layer normalization to the input x
            x = self.attention_norm(x)

            # Pass the normalized input through the attention layer
            # 'weights' stores the attention weights for analysis
            x, weights = self.attn(x)

            # Add the original input (h) to the attention output (residual connection)
            x = x + h

            # Store the output after attention for the second residual connection
            h = x

            # Apply layer normalization to the output of the attention block
            x = self.ffn_norm(x)

            # Pass the normalized output through the feed-forward network
            x = self.ffn(x)

            # Adapter, applied after the feed-forward network:
            # down-project, apply the activation (GELU), then up-project back to the hidden size
            adpt = self.adapter_downsample(x)
            adpt = self.adapter_act_fn(adpt)
            adpt = self.adapter_upsample(adpt)

            # Add the adapter output back to the feed-forward output (x)
            x = adpt + x

            # Add the output of the attention block (h) (residual connection)
            x = x + h

            # Return the final output and the attention weights
            return x, weights

    def load_from(self, weights, n_block):
        ROOT = f"Transformer/encoderblock_{n_block}"
        with torch.no_grad():
            query_weight = np2th(weights[pjoin(ROOT, ATTENTION_Q, "kernel")]).view(self.hidden_size, self.hidden_size).t()
            key_weight = np2th(weights[pjoin(ROOT, ATTENTION_K, "kernel")]).view(self.hidden_size, self.hidden_size).t()
            value_weight = np2th(weights[pjoin(ROOT, ATTENTION_V, "kernel")]).view(self.hidden_size, self.hidden_size).t()
            out_weight = np2th(weights[pjoin(ROOT, ATTENTION_OUT, "kernel")]).view(self.hidden_size, self.hidden_size).t()

            query_bias = np2th(weights[pjoin(ROOT, ATTENTION_Q, "bias")]).view(-1)
            key_bias = np2th(weights[pjoin(ROOT, ATTENTION_K, "bias")]).view(-1)
            value_bias = np2th(weights[pjoin(ROOT, ATTENTION_V, "bias")]).view(-1)
            out_bias = np2th(weights[pjoin(ROOT, ATTENTION_OUT, "bias")]).view(-1)

            self.attn.query.weight.copy_(query_weight)
            self.attn.key.weight.copy_(key_weight)
            self.attn.value.weight.copy_(value_weight)
            self.attn.out.weight.copy_(out_weight)
            self.attn.query.bias.copy_(query_bias)
            self.attn.key.bias.copy_(key_bias)
            self.attn.value.bias.copy_(value_bias)
            self.attn.out.bias.copy_(out_bias)

            mlp_weight_0 = np2th(weights[pjoin(ROOT, FC_0, "kernel")]).t()
            mlp_weight_1 = np2th(weights[pjoin(ROOT, FC_1, "kernel")]).t()
            mlp_bias_0 = np2th(weights[pjoin(ROOT, FC_0, "bias")]).t()
            mlp_bias_1 = np2th(weights[pjoin(ROOT, FC_1, "bias")]).t()

            self.ffn.fc1.weight.copy_(mlp_weight_0)
            self.ffn.fc2.weight.copy_(mlp_weight_1)
            self.ffn.fc1.bias.copy_(mlp_bias_0)
            self.ffn.fc2.bias.copy_(mlp_bias_1)

            self.attention_norm.weight.copy_(np2th(weights[pjoin(ROOT, ATTENTION_NORM, "scale")]))
            self.attention_norm.bias.copy_(np2th(weights[pjoin(ROOT, ATTENTION_NORM, "bias")]))
            self.ffn_norm.weight.copy_(np2th(weights[pjoin(ROOT, MLP_NORM, "scale")]))
            self.ffn_norm.bias.copy_(np2th(weights[pjoin(ROOT, MLP_NORM, "bias")]))
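
One consequence of the all-zero adapter initialization in `ADPT_Block` can be checked by hand. The sketch below is a scalar stand-in for the adapter branch: one weight per projection, a squared-error loss, and hand-derived gradients. All names (`adapter_grads`, `zero_init`, `mixed_init`) are hypothetical and not part of the notebook. Under those assumptions, when every adapter parameter is zero only the up-projection bias receives a non-zero gradient, because GELU(0) = 0 zeroes the up-weight gradient and a zero up-weight blocks the gradient into the down-projection.

```python
import math

def gelu(z):
    # Exact GELU: z * Phi(z), with Phi the standard normal CDF.
    return 0.5 * z * (1.0 + math.erf(z / math.sqrt(2.0)))

def gelu_grad(z):
    # d/dz [z * Phi(z)] = Phi(z) + z * phi(z)
    cdf = 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))
    pdf = math.exp(-0.5 * z * z) / math.sqrt(2.0 * math.pi)
    return cdf + z * pdf

def adapter_grads(x, target, w_down, b_down, w_up, b_up):
    """Gradients of 0.5 * (adapter(x) - target)**2 for a scalar adapter."""
    pre = w_down * x + b_down
    hidden = gelu(pre)
    out = w_up * hidden + b_up
    g_out = out - target                   # dLoss/dout
    g_pre = g_out * w_up * gelu_grad(pre)  # gradient reaching the down-projection
    return {
        "w_down": g_pre * x,
        "b_down": g_pre,
        "w_up": g_out * hidden,
        "b_up": g_out,
    }

# All-zero initialization, mirroring the nn.init.zeros_ calls in ADPT_Block.
zero_init = adapter_grads(x=2.0, target=1.0, w_down=0.0, b_down=0.0, w_up=0.0, b_up=0.0)
# Hypothetical alternative: small non-zero down-projection, zero up-projection.
mixed_init = adapter_grads(x=2.0, target=1.0, w_down=0.1, b_down=0.0, w_up=0.0, b_up=0.0)

print(zero_init)
print(mixed_init)
```

In this scalar sketch the all-zero case leaves three of the four gradients at zero. The alternative still produces a zero adapter output at initialization but lets the up-projection weight learn. Whether this matters for a given run is best checked on the actual model, for example by inspecting `param.grad` for the adapter parameters after one backward pass.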

The `ADPT_Encoder` class defines the encoder of the adapter-based vision transformer. On initialization it builds a stack of `ADPT_Block` layers, one per transformer layer in the configuration (`config.transformer["num_layers"]`). Each block is constructed separately and deep-copied, so no parameters are shared between layers. The `forward` method passes the hidden states through each block in sequence and collects the attention weights if visualization (`vis`) is enabled. After the last block, the hidden states go through a final layer normalization, and both the normalized states and the list of attention weights are returned.

In [None]:
class ADPT_Encoder(nn.Module):
    def __init__(self, config, vis, adapter_cfg):
        super(ADPT_Encoder, self).__init__()
        self.vis = vis
        self.layer = nn.ModuleList()
        self.encoder_norm = LayerNorm(config.hidden_size, eps=1e-6)

        self.num_layers = config.transformer["num_layers"]
        for _ in range(self.num_layers):
            layer = ADPT_Block(config, vis, adapter_cfg)
            self.layer.append(copy.deepcopy(layer))

    def forward(self, hidden_states):
        attn_weights = []
        for layer_block in self.layer:
            hidden_states, weights = layer_block(hidden_states)
            if self.vis:
                attn_weights.append(weights)
        encoded = self.encoder_norm(hidden_states)
        return encoded, attn_weights

The `ADPT_Transformer` class combines the embedding layer with the adapter-based encoder. On initialization it creates an `Embeddings` instance for the given configuration and image size, and an `ADPT_Encoder` to process the embedded tokens. In the `forward` method, the input images (named `input_ids` in the code) are converted to embeddings, which are then passed through the encoder. The method returns the encoded representations together with the attention weights.

In [None]:
class ADPT_Transformer(nn.Module):
    def __init__(self, config, img_size, vis, adapter_cfg):
        super(ADPT_Transformer, self).__init__()
        self.embeddings = Embeddings(config, img_size=img_size)
        self.encoder = ADPT_Encoder(config, vis, adapter_cfg)

    def forward(self, input_ids):
        embedding_output = self.embeddings(input_ids)

        encoded, attn_weights = self.encoder(embedding_output)
        return encoded, attn_weights

The `ADPT_VisionTransformer` class is the full adapter-based model. On initialization it builds an `ADPT_Transformer` and a linear classification head (or an identity mapping when `num_classes` is not positive). The `forward` method passes the input images through the transformer and computes logits from the output of the first (class) token. When called with `vis=True` it also returns the attention weights, which are only collected if the model was constructed with `vis=True`. The `load_from` method copies pre-trained weights into the model: it zeroes the head when `zero_head` is set, resizes the position embeddings by interpolation when the token count differs from the checkpoint, and delegates to each block's own `load_from` for the encoder layers.

In [None]:
class ADPT_VisionTransformer(nn.Module):
    def __init__(
        self, model_type=None,
        img_size=224, num_classes=21843, vis=False, adapter_cfg=None, vit_cfg=None, zero_head=False
    ):
        super(ADPT_VisionTransformer, self).__init__()
        self.num_classes = num_classes
        self.zero_head = zero_head
        # Use the ViT config passed to the constructor rather than a global `config`
        self.classifier = vit_cfg.classifier

        self.transformer = ADPT_Transformer(vit_cfg, img_size, vis, adapter_cfg)
        self.head = Linear(vit_cfg.hidden_size, num_classes) if num_classes > 0 else nn.Identity()

    def forward(self, x, label=None, vis=False):
        x, attn_weights = self.transformer(x)
        logits = self.head(x[:, 0])

        if not vis:
            return logits
        return logits, attn_weights

    def load_from(self, weights):
        with torch.no_grad():
            if self.zero_head:
                nn.init.zeros_(self.head.weight)
                nn.init.zeros_(self.head.bias)
            else:
                self.head.weight.copy_(np2th(weights["head/kernel"]).t())
                self.head.bias.copy_(np2th(weights["head/bias"]).t())

            self.transformer.embeddings.patch_embeddings.weight.copy_(np2th(weights["embedding/kernel"], conv=True))
            self.transformer.embeddings.patch_embeddings.bias.copy_(np2th(weights["embedding/bias"]))
            self.transformer.embeddings.cls_token.copy_(np2th(weights["cls"]))
            self.transformer.encoder.encoder_norm.weight.copy_(np2th(weights["Transformer/encoder_norm/scale"]))
            self.transformer.encoder.encoder_norm.bias.copy_(np2th(weights["Transformer/encoder_norm/bias"]))

            posemb = np2th(weights["Transformer/posembed_input/pos_embedding"])
            posemb_new = self.transformer.embeddings.position_embeddings
            if posemb.size() == posemb_new.size():
                self.transformer.embeddings.position_embeddings.copy_(posemb)
            else:
                print("load_pretrained: resized variant: %s to %s" % (posemb.size(), posemb_new.size()))
                ntok_new = posemb_new.size(1)

                if self.classifier == "token":
                    posemb_tok, posemb_grid = posemb[:, :1], posemb[0, 1:]
                    ntok_new -= 1
                else:
                    posemb_tok, posemb_grid = posemb[:, :0], posemb[0]

                gs_old = int(np.sqrt(len(posemb_grid)))
                gs_new = int(np.sqrt(ntok_new))
                print('load_pretrained: grid-size from %s to %s' % (gs_old, gs_new))
                posemb_grid = posemb_grid.reshape(gs_old, gs_old, -1)

                zoom = (gs_new / gs_old, gs_new / gs_old, 1)
                posemb_grid = ndimage.zoom(posemb_grid, zoom, order=1)
                posemb_grid = posemb_grid.reshape(1, gs_new * gs_new, -1)
                posemb = np.concatenate([posemb_tok, posemb_grid], axis=1)
                self.transformer.embeddings.position_embeddings.copy_(np2th(posemb))

            for bname, block in self.transformer.encoder.named_children():
                for uname, unit in block.named_children():
                    unit.load_from(weights, n_block=uname)

            if self.transformer.embeddings.hybrid:
                self.transformer.embeddings.hybrid_model.root.conv.weight.copy_(np2th(weights["conv_root/kernel"], conv=True))
                gn_weight = np2th(weights["gn_root/scale"]).view(-1)
                gn_bias = np2th(weights["gn_root/bias"]).view(-1)
                self.transformer.embeddings.hybrid_model.root.gn.weight.copy_(gn_weight)
                self.transformer.embeddings.hybrid_model.root.gn.bias.copy_(gn_bias)

                for bname, block in self.transformer.embeddings.hybrid_model.body.named_children():
                    for uname, unit in block.named_children():
                        unit.load_from(weights, n_block=bname, n_unit=uname)
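
The position-embedding resizing branch in `load_from` rests on a little grid arithmetic. As a rough sketch, assuming square images, a patch size of 16, and a single class token (none of which are stated in this cell), the numbers work out as follows. `num_tokens` and `grid_resize_plan` are hypothetical helpers, not part of the notebook.

```python
import math

def num_tokens(img_size, patch_size=16, has_cls_token=True):
    """Number of position embeddings for a square image."""
    side = img_size // patch_size
    return side * side + (1 if has_cls_token else 0)

def grid_resize_plan(old_tokens, new_tokens, has_cls_token=True):
    """Return (old grid side, new grid side, zoom factor per spatial axis)."""
    extra = 1 if has_cls_token else 0
    gs_old = math.isqrt(old_tokens - extra)
    gs_new = math.isqrt(new_tokens - extra)
    return gs_old, gs_new, gs_new / gs_old

# Checkpoint pre-trained at 224x224, model built for 384x384 (illustrative sizes).
old_tokens = num_tokens(224)
new_tokens = num_tokens(384)
gs_old, gs_new, zoom = grid_resize_plan(old_tokens, new_tokens)
print(old_tokens, new_tokens, gs_old, gs_new, zoom)
```

When the two token counts match, as they would for a 224x224 model under these assumptions, the zoom factor is 1 and `load_from` copies the embeddings directly instead of interpolating.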

In [None]:
def get_adapter_config():
  config = ml_collections.ConfigDict()
  config.REDUCATION_FACTOR = 8
  config.STYLE = "Pfeiffer"
  return config
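
To get a feel for how small the adapters are, the sketch below counts their parameters. It assumes a hidden size of 768 and 12 encoder blocks, the standard ViT-B/16 values; `get_b16_config` is not shown in this section, so treat both numbers as assumptions. `adapter_param_count` is a hypothetical helper.

```python
def adapter_param_count(hidden_size, reduction_factor):
    """Weights plus biases of one down-projection / up-projection pair."""
    bottleneck = hidden_size // reduction_factor
    down = hidden_size * bottleneck + bottleneck  # nn.Linear(hidden, bottleneck)
    up = bottleneck * hidden_size + hidden_size   # nn.Linear(bottleneck, hidden)
    return down + up

# Assumed ViT-B/16 values: hidden size 768, 12 encoder blocks.
per_block = adapter_param_count(hidden_size=768, reduction_factor=8)
total = 12 * per_block
print(per_block, total)
```

Under these assumptions each block adds 148,320 adapter parameters, about 1.78 million in total, which is a small fraction of the full backbone. A larger reduction factor shrinks the bottleneck and the parameter count further.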

In [None]:
adapter_config = get_adapter_config()
vit_config = get_b16_config()

adapter_model = ADPT_VisionTransformer(adapter_cfg=adapter_config, vit_cfg=vit_config, img_size=img_size, zero_head=True, num_classes=num_classes)
adapter_model.load_from(np.load(pretrained_dir))
adapter_model = adapter_model.cuda()



#### Hyperparameters for fine-tuning with adapters



In [None]:
epochs = 10
lr = 0.01
weight_decay = 0.0001

adapter_model.train()

# Freeze the pre-trained backbone; train only the adapters, the classification head, and the class token
adapter_trainable_params = []
print("Trainable params:")
for p_name, param in adapter_model.named_parameters():
    if "adapter" not in p_name and "head" not in p_name and "cls_token" not in p_name:
        param.requires_grad = False
    else:
        print("\t{}, {}, {}".format(p_name, param.numel(), param.shape))
        adapter_trainable_params.append({"params": [param]})

adapter_optimizer = torch.optim.SGD(adapter_trainable_params, lr=lr, momentum=0.9, weight_decay=weight_decay)
# You may consider using a warm_up schedule for this task
adapter_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(adapter_optimizer, T_max=epochs)
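
The comment above suggests a warm-up schedule. One possible form, sketched here as an assumption rather than the course's reference solution, is a linear warm-up followed by cosine decay, written as a multiplier on the base learning rate. `warmup_cosine` and `warmup_epochs` are hypothetical names, and the two warm-up epochs are an arbitrary illustrative choice.

```python
import math

def warmup_cosine(epoch, total_epochs=10, warmup_epochs=2):
    """Multiplier applied to the base learning rate at a given epoch."""
    if epoch < warmup_epochs:
        # Linear ramp: 1/warmup_epochs, 2/warmup_epochs, ..., 1.0
        return (epoch + 1) / warmup_epochs
    # Cosine decay from 1.0 towards 0 over the remaining epochs.
    progress = (epoch - warmup_epochs) / max(1, total_epochs - warmup_epochs)
    return 0.5 * (1.0 + math.cos(math.pi * progress))

base_lr = 0.01
schedule = [base_lr * warmup_cosine(e) for e in range(10)]
print([round(v, 5) for v in schedule])
```

A function like this could be passed as `lr_lambda` to `torch.optim.lr_scheduler.LambdaLR` in place of `CosineAnnealingLR`, assuming the scheduler is stepped once per epoch.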

#### Training model fine-tuning with adapters

In [None]:
for e in range(epochs):
  train_epoch_vit(adapter_model, adapter_optimizer, train_loader, test_loader, adapter_scheduler, e)

## Submit to Kaggle for fine-tuning with Adapters

In [None]:
## Create a csv file for fine-tuning with adapters and submit solution_adapters_finetuning.csv to Kaggle
predict_and_save(adapter_model, test_loader, device="cuda", submission_path="solution_adapters_finetuning.csv")

# Notes

Download the `solution_normal_vit_finetuning.csv`, `solution_prompt_finetuning.csv`, or `solution_adapters_finetuning.csv` file from Colab and upload it to the provided Kaggle competition URL.

Remember to change your display name on the Leaderboard to:
 \<**Your team name**\>



Tutorial 4: https://www.kaggle.com/t/5eed50209f4b4b40a23a8343117eebdd

Tutorial 3: https://www.kaggle.com/t/115e016843df4f0f8060701a39a8b339