In [1]:
import torch
import torch.nn as nn
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from torchvision.models import vit_b_16, ViT_B_16_Weights
from torch.utils.tensorboard import SummaryWriter
import time
import copy
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from torchvision.models import vit_b_16, ViT_B_16_Weights

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader
from torchvision.models import vit_b_16, ViT_B_16_Weights
from typing import Dict
import math
from typing import Optional, List

import time
import copy
import numpy as np
import matplotlib.pyplot as plt

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Import f1_score for F1 calculation
from sklearn.metrics import f1_score

# Hyperparameters
# These module-level constants are read by the helper functions and by the
# experiment loop further down in this cell.

# Number of passes over the training split; also part of the horizon used by
# the linear learning-rate decay in `lr_lambda`.
EPOCHS = 25
# Mini-batch size shared by the train, validation and test DataLoaders.
batch_size = 64
# Initial AdamW learning rate, scaled per step by the `lr_lambda` schedule.
BASE_LR = 1e-3
# Weight-decay coefficient handed to AdamW.
WEIGHT_DECAY = 0.03
# Dropout probability placed in front of the new classification head.
DROPOUT = 0.1
# Numerator of the LoRA scaling factor (`lora_alpha / r`).
LORA_ALPHA = 32
# Dropout probability on the adapter input; 0 selects a pass-through.
LORA_DROPOUT = 0
R_LORA_VALUES = [8]  # LoRA ranks to evaluate


class LoRALayer():
    """Mixin that stores the LoRA hyperparameters and the merge bookkeeping.

    The class is a plain Python object, not an ``nn.Module``; a concrete layer
    combines it with a module type and invokes ``LoRALayer.__init__`` itself.

    Attributes set here:
        r: rank of the low-rank update.
        lora_alpha: numerator of the scaling applied to the update.
        lora_dropout: callable applied to the adapter input (``nn.Dropout`` or
            a pass-through when the probability is not positive).
        merged: whether the update is currently folded into the base weight.
        merge_weights: whether the owning layer may fold the update at all.
    """

    def __init__(
        self,
        r: int,
        lora_alpha: int,
        lora_dropout: float,
        merge_weights: bool,
    ):
        self.r, self.lora_alpha = r, lora_alpha

        # A pass-through callable stands in for dropout when it is disabled,
        # so the forward pass never has to branch on the probability.
        self.lora_dropout = (
            nn.Dropout(p=lora_dropout) if lora_dropout > 0. else (lambda x: x)
        )

        # A freshly built layer always starts with the update kept separate.
        self.merge_weights = merge_weights
        self.merged = False


class xLinear(nn.Linear, LoRALayer):
    """``nn.Linear`` with an additive low-rank (LoRA) update.

    When ``r > 0`` the base ``weight`` is frozen and two trainable matrices are
    added: ``lora_A`` of shape ``(r, in_features)`` and ``lora_B`` of shape
    ``(out_features, r)``. Their product, scaled by ``lora_alpha / r``, is
    either applied as a separate branch in ``forward`` (unmerged) or folded
    into ``weight`` (merged). Folding happens when the layer is switched to
    eval mode with ``merge_weights=True`` and is undone when it is switched
    back to train mode.

    Args:
        in_features: input width, forwarded to ``nn.Linear``.
        out_features: output width, forwarded to ``nn.Linear``.
        r: rank of the update; ``0`` disables the adapter entirely.
        lora_alpha: numerator of the scaling factor.
        lora_dropout: dropout probability applied to the adapter input.
        fan_in_fan_out: when true, ``weight`` is stored transposed and is
            transposed back wherever it is used.
        merge_weights: allow folding the update into ``weight`` in eval mode.
        pretrained_weights: optional tensor assigned to ``weight.data``.
        pretrained_bias: optional tensor assigned to ``bias.data``.
        **kwargs: remaining keyword arguments for ``nn.Linear`` (e.g. ``bias``).
    """

    def __init__(
        self,
        in_features: int,
        out_features: int,
        r: int = 0,
        lora_alpha: int = 32,
        lora_dropout: float = 0.0,
        fan_in_fan_out: bool = False,
        merge_weights: bool = True,
        pretrained_weights=None,  # Added to accept pretrained weights
        pretrained_bias=None,     # Added to accept pretrained bias
        **kwargs
    ):
        super().__init__(in_features, out_features, **kwargs)
        LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout,
                           merge_weights=merge_weights)

        self.fan_in_fan_out = fan_in_fan_out
        # Replace the freshly initialised base parameters with the supplied ones.
        if pretrained_weights is not None:
            self.weight.data = pretrained_weights
        if pretrained_bias is not None:
            self.bias.data = pretrained_bias

        # Actual trainable parameters
        if r > 0:
            # Created after the pretrained weight is in place so that the
            # adapter matrices inherit its dtype and device via ``new_zeros``.
            self.lora_A = nn.Parameter(self.weight.new_zeros((r, in_features)))
            self.lora_B = nn.Parameter(self.weight.new_zeros((out_features, r)))
            self.scaling = self.lora_alpha / self.r
            self.weight.requires_grad = False
        self._initialize_lora_parameters()  # Only initialize LoRA parameters
        if fan_in_fan_out:
            self.weight.data = self.weight.data.transpose(0, 1)

    def _initialize_lora_parameters(self):
        """
        Initialize only the LoRA-specific parameters (lora_A and lora_B).
        Avoid reinitializing self.weight or self.bias to preserve pretrained values.
        """
        if hasattr(self, 'lora_A'):
            # lora_B starts at zero, so the update is zero before any training.
            nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
            nn.init.zeros_(self.lora_B)

    def _transpose_if_needed(self, w):
        """Return ``w`` transposed when the layer stores its weight as fan-in/fan-out."""
        return w.transpose(0, 1) if self.fan_in_fan_out else w

    def train(self, mode: bool = True):
        """Switch train/eval mode, unmerging or merging the update as needed.

        Returns:
            ``self``, matching the ``nn.Module.train`` contract so that
            ``layer.train()`` and ``layer.eval()`` can be chained.
        """
        nn.Linear.train(self, mode)
        if mode:
            if self.merge_weights and self.merged:
                # Make sure that the weights are not merged
                if self.r > 0:
                    self.weight.data -= self._transpose_if_needed(self.lora_B @ self.lora_A) * self.scaling
                self.merged = False
        else:
            if self.merge_weights and not self.merged:
                # Merge the weights and mark it
                if self.r > 0:
                    self.weight.data += self._transpose_if_needed(self.lora_B @ self.lora_A) * self.scaling
                self.merged = True
        return self

    def forward(self, x: torch.Tensor):
        """Apply the base projection plus, while unmerged, the scaled low-rank branch."""
        result = F.linear(x, self._transpose_if_needed(self.weight), bias=self.bias)
        if self.r > 0 and not self.merged:
            # x -> A^T -> B^T is the low-rank path; dropout only touches its input.
            result += (self.lora_dropout(x) @ self.lora_A.transpose(0, 1) @ self.lora_B.transpose(0, 1)) * self.scaling
        return result

def replace_linear_with_lora(
    module: nn.Module,
    parent_name='',
    skip_substring='heads.head',
    r: Optional[int] = None,
    lora_alpha: Optional[int] = None,
    lora_dropout: Optional[float] = None,
):
    """
    Recursively replace every nn.Linear submodule with an xLinear that carries
    the same weight and bias, skipping submodules whose dotted path contains
    `skip_substring`.

    Args:
        module: root module, modified in place.
        parent_name: dotted path of `module` (used while recursing).
        skip_substring: paths containing this text are left untouched.
        r: LoRA rank; defaults to the module-level `R_LORA` when omitted.
        lora_alpha: LoRA scaling numerator; defaults to `LORA_ALPHA`.
        lora_dropout: adapter dropout probability; defaults to `LORA_DROPOUT`.

    Layers that are already xLinear are not wrapped again, so calling the
    function twice does not discard existing adapters.

    NOTE(review): torch.nn.MultiheadAttention appears to read
    `out_proj.weight` / `out_proj.bias` directly instead of calling
    `out_proj.forward`, so an adapter placed on that projection would not take
    part in the training forward pass -- confirm before relying on it.
    """
    # Resolve the defaults once at the top so the recursion passes concrete values.
    if r is None:
        r = R_LORA
    if lora_alpha is None:
        lora_alpha = LORA_ALPHA
    if lora_dropout is None:
        lora_dropout = LORA_DROPOUT

    for name, child in list(module.named_children()):
        # Form the fully qualified name (like 'encoder.layer1.linear')
        module_path = f"{parent_name}.{name}" if parent_name else name

        # Recursively apply to child modules first
        replace_linear_with_lora(
            child,
            parent_name=module_path,
            skip_substring=skip_substring,
            r=r,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
        )

        # xLinear subclasses nn.Linear; without this guard a second call would
        # rebuild the layer and throw away its adapter matrices.
        if isinstance(child, xLinear):
            continue

        if isinstance(child, nn.Linear) and skip_substring not in module_path:
            # Extract pretrained weights and bias
            pretrained_weights = child.weight.data.clone()
            pretrained_bias = child.bias.data.clone() if child.bias is not None else None

            # Replace the nn.Linear with LoRA-wrapped Linear
            lora_linear = xLinear(
                in_features=child.in_features,
                out_features=child.out_features,
                r=r,
                lora_alpha=lora_alpha,
                lora_dropout=lora_dropout,
                pretrained_weights=pretrained_weights,
                pretrained_bias=pretrained_bias,
                # Keep a bias-free layer bias-free instead of adding a random bias.
                bias=child.bias is not None,
            )
            setattr(module, name, lora_linear)

def count_trainable_parameters(model):
    """
    Return the total number of scalar elements held by the parameters of
    `model` whose `requires_grad` flag is set.
    """
    element_count = 0
    for tensor in model.parameters():
        if tensor.requires_grad:
            element_count += tensor.numel()
    return element_count

def mark_lora_and_head_as_trainable(model: nn.Module, head_substring="heads.head", bias='none'):
    """
    Set `requires_grad` on every parameter of `model`: True for parameters
    whose name contains 'lora_' or `head_substring`, False for all others.

    `bias` optionally re-enables bias terms afterwards:
      - 'all': every parameter whose name contains 'bias';
      - 'lora_only': the bias of each module that is a LoRALayer;
      - any other value: no bias is re-enabled.
    """
    for param_name, tensor in model.named_parameters():
        is_adapter = 'lora_' in param_name
        # Adapter names take precedence, so the head message is only printed
        # for parameters that are not LoRA tensors.
        is_head = (not is_adapter) and head_substring in param_name
        if is_head:
            print("head_substring came:", param_name)
        tensor.requires_grad = is_adapter or is_head

    # Optional second pass that switches selected bias terms back on.
    if bias == 'all':
        for param_name, tensor in model.named_parameters():
            if 'bias' in param_name:
                tensor.requires_grad = True
    elif bias == 'lora_only':
        for submodule in model.modules():
            if not isinstance(submodule, LoRALayer):
                continue
            layer_bias = getattr(submodule, 'bias', None)
            if layer_bias is not None:
                layer_bias.requires_grad = True

def lr_lambda(current_step: int, total_steps: Optional[int] = None):
    """
    Linear decay multiplier for the learning rate.

    At step 0 the multiplier is 1.0; it falls linearly to 0.0 at
    `total_steps` and stays at 0.0 afterwards.

    Args:
        current_step: number of scheduler steps taken so far.
        total_steps: length of the decay. When omitted it is computed from the
            module-level `EPOCHS` and `train_loader` as
            `EPOCHS * len(train_loader)`, which is the behaviour the scheduler
            relies on when it calls this function with a single argument.

    Returns:
        The multiplier as a float in [0.0, 1.0] for non-negative steps.
    """
    if total_steps is None:
        # Looked up at call time so the notebook globals only need to exist
        # once the scheduler is actually built.
        total_steps = EPOCHS * len(train_loader)
    progress = float(current_step) / float(total_steps)
    return max(0.0, 1.0 - progress)

def compare_encoder_weights_consistency_with_xlinear(encoder_before, encoder_after):
    """
    Walk two encoders in parallel and report where an nn.Linear in
    `encoder_before` and the xLinear at the same position in `encoder_after`
    disagree on weight, bias, or on whether a bias exists at all.

    Only mismatches are printed; matching layers produce no output. The two
    module sequences are paired with `zip`, so the walk stops at the end of
    the shorter one.
    """
    print("Comparing nn.Linear weights and biases between original encoder and modified encoder...")

    paired_modules = zip(encoder_before.named_modules(), encoder_after.named_modules())
    for (name_before, module_before), (_, module_after) in paired_modules:
        # Only (nn.Linear, xLinear) pairs are of interest.
        if not isinstance(module_before, nn.Linear) or not isinstance(module_after, xLinear):
            continue

        if not torch.equal(module_before.weight.data, module_after.weight.data):
            print(f"[MISMATCH] {name_before}: Weights differ.")

        before_has_bias = module_before.bias is not None
        after_has_bias = module_after.bias is not None
        if before_has_bias and after_has_bias:
            if not torch.equal(module_before.bias.data, module_after.bias.data):
                print(f"[MISMATCH] {name_before}: Biases differ.")
        elif before_has_bias != after_has_bias:
            print(f"[MISMATCH] {name_before}: One layer has bias while the other does not.")

    print("Comparison complete.")

def evaluate(model, dataloader, criterion, device):
    """
    Run `model` over `dataloader` without gradients and summarise the results.

    Args:
        model: module called as `model(images)`; it is put into eval mode and
            left there, so the caller must switch back before training again.
        dataloader: iterable yielding `(images, labels)` batches.
        criterion: loss called as `criterion(outputs, labels)`.
            Assumes it returns the batch-mean loss -- TODO confirm for
            criteria other than the default `nn.CrossEntropyLoss()`.
        device: device the batches are moved to.

    Returns:
        `(avg_loss, accuracy, f1)`: the per-sample average loss, the accuracy
        in percent, and the macro-averaged F1 score.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            batch_count = labels.size(0)
            # Re-weight the batch loss by its size so the final average is per sample.
            total_loss += loss.item() * batch_count
            _, predicted = torch.max(outputs, 1)
            total += batch_count
            correct += (predicted == labels).sum().item()
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Normalise by the samples actually seen, the same denominator as the
    # accuracy, so a sampler or `drop_last=True` cannot skew the loss.
    avg_loss = total_loss / total
    accuracy = 100.0 * correct / total
    f1 = f1_score(all_labels, all_preds, average='macro')
    return avg_loss, accuracy, f1

# Dataset Preparation (CIFAR100)
# Every split uses the preprocessing pipeline that ships with the pretrained weights.
weights = ViT_B_16_Weights.IMAGENET1K_V1
preprocess = weights.transforms()

train_dataset = datasets.CIFAR100(
    root='./data',
    train=True,
    download=True,
    transform=preprocess
)
test_dataset = datasets.CIFAR100(
    root='./data',
    train=False,
    download=True,
    transform=preprocess
)

# Hold out a fixed number of training images for validation. The train size is
# derived from the dataset length so the two parts always add up, and the split
# is drawn from a seeded generator so it is identical on every run.
SPLIT_SEED = 42
val_size = 5000
train_size = len(train_dataset) - val_size
train_data, val_data = torch.utils.data.random_split(
    train_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Only the training loader shuffles; evaluation order is kept fixed.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Device setup: use the GPU when CUDA is available, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Experiment loop for different LoRA ranks
# For each rank: build a fresh pretrained ViT-B/16, swap in LoRA layers, train
# for EPOCHS epochs while logging train/val/test metrics to TensorBoard, then
# report parameter counts and peak GPU memory.
for R_LORA in R_LORA_VALUES:
    print(f"\n{'='*50}")
    print(f"Running experiment with R_LORA = {R_LORA}")
    print(f"{'='*50}")

    # The CUDA memory helpers are only meaningful (and only safe to call)
    # when the run actually uses a GPU.
    use_cuda = device.type == "cuda"

    # Initialize TensorBoard writer
    writer = SummaryWriter(f'logs/rank_{R_LORA}')
    try:
        # Model Preparation
        model = vit_b_16(weights=ViT_B_16_Weights.IMAGENET1K_V1)
        num_features = model.heads.head.in_features
        model.heads.head = nn.Sequential(
            nn.Dropout(DROPOUT),
            nn.Linear(num_features, 100)
        )

        # Apply LoRA modifications
        replace_linear_with_lora(model)
        mark_lora_and_head_as_trainable(model, head_substring="heads.head", bias="none")

        # Memory footprint calculation: move model to device and reset memory stats
        model.to(device)
        if use_cuda:
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()

        # Optimizer and scheduler
        trainable_params = filter(lambda p: p.requires_grad, model.parameters())
        optimizer = torch.optim.AdamW(trainable_params, lr=BASE_LR, weight_decay=WEIGHT_DECAY)
        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

        # Training loop
        criterion = nn.CrossEntropyLoss()
        for epoch in range(EPOCHS):
            model.train()

            for step, (images, labels) in enumerate(train_loader):
                images = images.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                # The schedule is defined per optimisation step, not per epoch.
                scheduler.step()

                if step % 100 == 0:
                    current_lr = scheduler.get_last_lr()[0]
                    print(f"[Epoch {epoch+1}/{EPOCHS} - Step {step}] Loss: {loss.item():.4f}, LR: {current_lr:.6f}")

            # Evaluation on train, validation, and test sets (now with F1 score)
            train_loss, train_acc, train_f1 = evaluate(model, train_loader, criterion, device)
            val_loss, val_acc, val_f1 = evaluate(model, val_loader, criterion, device)
            test_loss, test_acc, test_f1 = evaluate(model, test_loader, criterion, device)

            # Logging to TensorBoard: Loss, Accuracy, and F1 score
            writer.add_scalar(f'Rank_{R_LORA}/Train Loss vs epoch', train_loss, epoch)
            writer.add_scalar(f'Rank_{R_LORA}/Train Acc vs epoch', train_acc, epoch)
            writer.add_scalar(f'Rank_{R_LORA}/Train F1 vs epoch', train_f1, epoch)

            writer.add_scalar(f'Rank_{R_LORA}/Val Loss vs epoch', val_loss, epoch)
            writer.add_scalar(f'Rank_{R_LORA}/Val Acc vs epoch', val_acc, epoch)
            writer.add_scalar(f'Rank_{R_LORA}/Val F1 vs epoch', val_f1, epoch)

            writer.add_scalar(f'Rank_{R_LORA}/Test Loss vs epoch', test_loss, epoch)
            writer.add_scalar(f'Rank_{R_LORA}/Test Acc vs epoch', test_acc, epoch)
            writer.add_scalar(f'Rank_{R_LORA}/Test F1 vs epoch', test_f1, epoch)

            print(f"Epoch [{epoch+1}/{EPOCHS}]")
            print(f"Train Loss: {train_loss:.4f} | Acc: {train_acc:.2f}% | F1: {train_f1:.2f}")
            print(f"Val Loss: {val_loss:.4f} | Acc: {val_acc:.2f}% | F1: {val_f1:.2f}")
            print(f"Test Loss: {test_loss:.4f} | Acc: {test_acc:.2f}% | F1: {test_f1:.2f}\n")

        # Count trainable parameters: the total includes the classification
        # head, the second figure covers the LoRA matrices alone.
        lora_params = sum(p.numel() for n, p in model.named_parameters() if 'lora_' in n)
        print(f"Number of trainable parameters: {count_trainable_parameters(model)}")
        print(f"Number of LoRA parameters: {lora_params}")

        # Estimate of the adapter-related memory and the measured peak usage
        optimizer_memory = (3 * lora_params * 4) / (1024 ** 2)  # 4 bytes per float32, 3x for (param + moments)
        if use_cuda:
            memory_footprint = torch.cuda.max_memory_allocated() / (1024 ** 2)  # In MB
        else:
            memory_footprint = 0.0  # no CUDA allocator statistics on CPU

        print(f"\nMemory Breakdown:")
        print(f"Memory footprint for fine-tuning: {memory_footprint:.2f} MB")
        print(f"LoRA params contribution: {optimizer_memory:.2f} MB")

        # Clean up for next experiment. The optimizer and scheduler hold
        # references to the parameters, so they are dropped together with
        # the model before the cache is emptied.
        del model, optimizer, scheduler
        if use_cuda:
            torch.cuda.empty_cache()
    finally:
        # Flush and close the event file even if the run is interrupted.
        writer.close()

Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to ./data/cifar-100-python.tar.gz


100%|██████████| 169M/169M [00:05<00:00, 29.2MB/s] 


Extracting ./data/cifar-100-python.tar.gz to ./data
Files already downloaded and verified

Running experiment with R_LORA = 8


Downloading: "https://download.pytorch.org/models/vit_b_16-c867db91.pth" to /root/.cache/torch/hub/checkpoints/vit_b_16-c867db91.pth
100%|██████████| 330M/330M [00:02<00:00, 156MB/s]  


head_substring came: heads.head.1.weight
head_substring came: heads.head.1.bias
[Epoch 1/25 - Step 0] Loss: 4.6992, LR: 0.001000
[Epoch 1/25 - Step 100] Loss: 0.6738, LR: 0.000994
[Epoch 1/25 - Step 200] Loss: 0.6440, LR: 0.000989
[Epoch 1/25 - Step 300] Loss: 0.5887, LR: 0.000983
[Epoch 1/25 - Step 400] Loss: 0.6283, LR: 0.000977
[Epoch 1/25 - Step 500] Loss: 0.8398, LR: 0.000972
[Epoch 1/25 - Step 600] Loss: 0.8178, LR: 0.000966
[Epoch 1/25 - Step 700] Loss: 0.5722, LR: 0.000960
Epoch [1/25]
Train Loss: 0.3890 | Acc: 87.89% | F1: 0.88
Val Loss: 0.5481 | Acc: 83.18% | F1: 0.83
Test Loss: 0.5651 | Acc: 82.54% | F1: 0.82

[Epoch 2/25 - Step 0] Loss: 0.6796, LR: 0.000960
[Epoch 2/25 - Step 100] Loss: 0.3613, LR: 0.000954
[Epoch 2/25 - Step 200] Loss: 0.1117, LR: 0.000949
[Epoch 2/25 - Step 300] Loss: 0.3467, LR: 0.000943
[Epoch 2/25 - Step 400] Loss: 0.5321, LR: 0.000937
[Epoch 2/25 - Step 500] Loss: 0.5572, LR: 0.000932
[Epoch 2/25 - Step 600] Loss: 0.5065, LR: 0.000926
[Epoch 2/25 - St

In [5]:
import shutil

# Define the folder to zip and the output zip file name
folder_to_zip = '/kaggle/working/logs'
output_zip = '/kaggle/working/logs_cifar100_rank8.zip'

# make_archive appends the ".zip" extension itself, so it is given the target
# path without it. Only a trailing ".zip" is removed; any other occurrence of
# that text inside the path is left untouched.
archive_base = output_zip[:-len('.zip')] if output_zip.endswith('.zip') else output_zip

# Create a zip file; the return value is the path that was actually written.
archive_path = shutil.make_archive(archive_base, 'zip', folder_to_zip)

print(f"Zipped folder is saved as {archive_path}")


Zipped folder is saved as /kaggle/working/logs_cifar100_rank8.zip


In [None]:
         b               