In [1]:
!pip install lightning datasets torchmetrics --quiet
!pip install -U --quiet wandb

In [2]:
import os
import sys
import gc

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW, SGD
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import CosineAnnealingLR

import torchvision
import torchvision.transforms as T

import lightning as L
from lightning import seed_everything
from lightning.pytorch.callbacks import TQDMProgressBar
from pytorch_lightning.loggers import WandbLogger

import torchmetrics
from torchmetrics.functional import accuracy, f1_score

import transformers
from transformers import AutoProcessor, AutoModel, AutoModelForImageClassification
from transformers import get_linear_schedule_with_warmup

from datasets import Dataset, DatasetDict, load_dataset

import matplotlib as  mpl
import matplotlib.pyplot as plt
import seaborn as sns

import PIL
from PIL import Image

In [3]:
import google.colab
google.colab.drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/datasets/tiny-imagenet')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
seed_everything(42)

INFO: Seed set to 42
INFO:lightning.fabric.utilities.seed:Seed set to 42


42

In [5]:
processor = AutoProcessor.from_pretrained('google/vit-base-patch16-224')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [6]:
def process(x):
    img_tensor = x['image']
    img_tensor = T.functional.to_tensor(img_tensor)
    if img_tensor.shape[0] == 1:
        img_tensor = img_tensor.repeat(3, 1, 1)
    processed_img = processor(img_tensor, do_rescale=False)['pixel_values'][0]
    return {'pixel_values': processed_img, 'label': x['label']}

In [7]:
if not os.path.exists('./tiny-imagenet'):
    ds = load_dataset('zh-plus/tiny-imagenet')
    ds = ds.map(process)
    ds = ds.remove_columns(['image'])
    ds.save_to_disk('./tiny-imagenet')
else:
    ds = DatasetDict.load_from_disk('./tiny-imagenet')

In [8]:
teacher_dataloader = DataLoader(ds['train'].shuffle()
                                           .select(range(10_000))
                                           .with_format('torch'),
                                batch_size=64,
                                shuffle=True)
train_dataloader = DataLoader(ds['train'].shuffle()
                                         .select(range(50_000))
                                         .with_format('torch'),
                              batch_size=32,
                              shuffle=True)
test_dataloader = DataLoader(ds['valid'].shuffle()
                                        .with_format('torch'),
                             batch_size=128,
                             shuffle=True)

In [9]:
model = AutoModelForImageClassification.from_pretrained('google/vit-base-patch16-224')
model

ViTForImageClassification(
  (vit): ViTModel(
    (embeddings): ViTEmbeddings(
      (patch_embeddings): ViTPatchEmbeddings(
        (projection): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
      )
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): ViTEncoder(
      (layer): ModuleList(
        (0-11): 12 x ViTLayer(
          (attention): ViTAttention(
            (attention): ViTSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): ViTSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): ViTIntermediate(
            (dense): Linear(in_features=7

In [10]:
class TeacherViT(L.LightningModule):
    def __init__(self, ckpt='google/vit-base-patch16-224'):
        super().__init__()
        self.model = AutoModelForImageClassification.from_pretrained(ckpt)
        self.model.classifier = nn.Linear(768, 200)
        self.model.vit.requires_grad = False
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x = batch['pixel_values']
        y = batch['label']
        y_hat = F.softmax(self(x).logits)
        loss = self.criterion(y_hat, y)
        self.log('teacher/train/loss', loss, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        x = batch['pixel_values']
        y = batch['label']
        y_hat = F.softmax(self(x).logits)
        loss = self.criterion(y_hat, y)
        self.log('teacher/test/loss', loss)
        self.log('teacher/test/acc', accuracy(y_hat, y, task="multiclass", num_classes=200))
        self.log('teacher/test/f1', f1_score(y_hat, y, task="multiclass", num_classes=200), prog_bar=True)

    def configure_optimizers(self):
        optimizer = AdamW(self.model.parameters(), lr=0.002)
        scheduler = CosineAnnealingLR(optimizer, T_max=10_000 // 32 + 1)
        scheduler = {
            'scheduler': scheduler,
            'interval': 'step',
            'frequency': 1
        }
        return [optimizer], [scheduler]

In [11]:
class DistilViT(L.LightningModule):
    def __init__(self, teacher_model, ckpt='google/vit-base-patch16-224'):
        super().__init__()
        self.teacher_model = teacher_model
        self.model = AutoModelForImageClassification.from_pretrained(ckpt)
        self.model.vit.encoder.layer = self.model.vit.encoder.layer[::2]
        self.model.classifier = nn.Linear(768, 200)
        self.teacher_model.requires_grad = False

        self.kl_criterion = nn.KLDivLoss(reduction='batchmean')
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x = batch['pixel_values']
        y = batch['label']

        student_y_hat = self(x)
        with torch.no_grad():
            teacher_y_hat = self.teacher_model(x)
        kl_loss = self.kl_criterion(F.log_softmax(student_y_hat.logits, dim=1),
                                    F.log_softmax(teacher_y_hat.logits, dim=1))
        gt_loss = self.criterion(student_y_hat.logits, y)
        loss = kl_loss + gt_loss

        self.log('distil/train/kl_loss',kl_loss, on_step=True, prog_bar=True)
        self.log('distil/train/gt_loss', gt_loss, on_step=True, prog_bar=True)
        self.log('distil/train/loss', loss, on_step=True, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        x = batch['pixel_values']
        y = batch['label']
        y_hat = F.softmax(self(x).logits)
        loss = self.criterion(y_hat, y)
        self.log('distil/test/loss', loss)
        self.log('distil/test/acc', accuracy(y_hat, y, task="multiclass", num_classes=200))
        self.log('distil/test/f1', f1_score(y_hat, y, task="multiclass", num_classes=200))

    def configure_optimizers(self):
        optimizer = AdamW(self.model.parameters(), lr=0.002)
        scheduler = CosineAnnealingLR(optimizer, T_max=50_000 // 32 + 1)
        scheduler = {
            'scheduler': scheduler,
            'interval': 'step',
            'frequency': 1
        }
        return [optimizer], [scheduler]

In [12]:
teacher_model = TeacherViT()

In [13]:
teacher_logger = WandbLogger(log_model="all")

In [14]:
teacher_trainer = L.Trainer(max_epochs=1, callbacks=[TQDMProgressBar(refresh_rate=1)], logger=teacher_logger)
teacher_trainer.fit(teacher_model, train_dataloaders=teacher_dataloader)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
[34m[1mwandb[0m: Currently logged in as: [33msol1[0m ([33msol-sqad[0m). Use [1m`wandb login --relogin`[0m to force relogin


INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO: 
  | Name      | Type                      | Params
--------------------------------------------------------
0 | model     | ViTForImageClassification | 86.0 M
1 | criterion | CrossEntropyLoss          | 0     
--------------------------------------------------------
86.0 M    Trainable params
0         Non-trainable params
86.0 M    Total params
343.810   Total estimated model params size (MB)
INFO:lightning.pytorch.callbacks.model_summary:
  | Name      | Type                      | Params
--------------------------------------------------------
0 | model     | ViTForImageClassification | 86.0 M
1 | criterion | CrossEntropyLoss          | 0     
--------------------------------------------------------
86.0 M    Trainable params
0         Non-trainable params
86.0 M    Total params
343.810   Total estimated model params size (MB)
/usr/local/lib/pytho

Training: |          | 0/? [00:00<?, ?it/s]

  y_hat = F.softmax(self(x).logits)
INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=1` reached.


In [15]:
teacher_trainer.test(teacher_model, test_dataloader)

INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
/usr/local/lib/python3.10/dist-packages/lightning/pytorch/trainer/connectors/data_connector.py:492: Your `test_dataloader`'s sampler has shuffling enabled, it is strongly recommended that you turn shuffling off for val/test dataloaders.
/usr/local/lib/python3.10/dist-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=1` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

  y_hat = F.softmax(self(x).logits)
/usr/local/lib/python3.10/dist-packages/lightning/pytorch/trainer/call.py:54: Detected KeyboardInterrupt, attempting graceful shutdown...


In [16]:
gc.collect()
torch.cuda.empty_cache()

In [17]:
distil_logger = WandbLogger(log_model="all")

In [18]:
distil_model = DistilViT(teacher_model)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [19]:
trainer = L.Trainer(max_epochs=1, callbacks=[TQDMProgressBar(refresh_rate=10)], logger=distil_logger)
trainer.fit(distil_model, train_dataloaders=train_dataloader)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
/usr/local/lib/python3.10/dist-packages/pytorch_lightning/loggers/wandb.py:389: There is a wandb run already in progress and newly created instances of `WandbLogger` will reuse this run. If this is not desired, call `wandb.finish()` before instantiating `WandbLogger`.
/usr/local/lib/python3.10/dist-packages/lightning/pytorch/callbacks/model_checkpoint.py:639: Checkpoint directory ./lightning_logs/y20k4lgs/checkpoints exists and is not empty.
INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO: 
  | Name          | Type                      | Params
-------------------------------------

Training: |          | 0/? [00:00<?, ?it/s]

In [20]:
trainer.test(distil_model, test_dataloader)

INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: |          | 0/? [00:00<?, ?it/s]