In [1]:
import os
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader, random_split
import pytorch_lightning as pl
from torchvision.models import vgg11_bn, VGG11_BN_Weights
from torchmetrics import Accuracy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torchvision
from pytorch_lightning.loggers import TensorBoardLogger
from torch.optim.lr_scheduler import StepLR

torch.set_float32_matmul_precision("medium")


In [2]:
transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4915, 0.4823, 0.4468),
                         (0.2470, 0.2435, 0.2616))
])

In [3]:
batch_size=128
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=8)

Files already downloaded and verified


In [4]:
class RCNN(nn.Module):
    def __init__(self, num_classes=10):
        super(RCNN, self).__init__()
        # Load the pre-trained ResNet-50 model
        self.m = vgg11_bn(weights=VGG11_BN_Weights.DEFAULT)
        # Remove the last layer
        del self.m.classifier[-1]
        # Add a custom fully connected layer
        fc_layer_1 = nn.Linear(4096, 1000)
        classifier = nn.Linear(1000, num_classes)
        relu_activation = nn.ReLU(inplace=True)
        self.m.classifier.add_module("fc_1_rcnn_added", fc_layer_1)
        self.m.classifier.add_module("relu_added", relu_activation)
        self.m.classifier.add_module("classifier_rcnn_added", classifier)
    def forward(self, x):
        x = self.m(x)
        x = torch.nn.functional.log_softmax(x, dim = 1)
        return x

In [5]:
class LightningRCNN(pl.LightningModule):
    def __init__(self, num_classes:int) -> None:
        super().__init__()
        self.model = RCNN(num_classes=num_classes)
        self.loss_module = nn.CrossEntropyLoss()
        self.save_hyperparameters()
        self.accuracy = Accuracy(task="multiclass", num_classes=num_classes)
        
        
    def forward(self, x):
        y = self.model(x)
        return y
    
    def training_step(self, batch, batch_idx):
       imgs, labels = batch
       preds = self.model(imgs)
       loss = self.loss_module(preds, labels)
       acc = self.accuracy(preds, labels)
       self.log_dict({'train_loss': loss, 'train_accuracy': acc}, on_step=False, on_epoch=True, prog_bar=True, logger=True)
       return loss
    
    def backward(self, loss):
        loss.backward()
    
    def validation_step(self, batch, batch_idx):
        imgs, labels = batch
        preds = self.model(imgs)

        loss = self.loss_module(preds, labels)
        acc = self.accuracy(preds, labels)
        self.log_dict({'validation_loss': loss, 'validation_accuracy': acc}, on_step=True, on_epoch=True, prog_bar=False, logger=True)
        return {'validation_loss': loss, 'validation_accuracy': acc}

    def test_step(self, batch, batch_idx):
        imgs, labels = batch
        preds = self.model(imgs)
        acc = self.accuracy(preds, labels)
        loss = self.loss_module(preds, labels)
        self.log_dict({'test_loss': loss, 'test_accuracy': acc}, on_step=True, on_epoch=True, prog_bar=False, logger=True)
        return {'test_loss': loss, 'test_accuracy': acc}
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        lr_scheduler = StepLR(optimizer, step_size=100, gamma=0.1)  # Define your scheduler
        return [optimizer], [lr_scheduler]

# load finetuned model

In [12]:
model = LightningRCNN.load_from_checkpoint("model_v1_vgg/epoch=8-step=2817.ckpt")

In [13]:
model

LightningRCNN(
  (model): RCNN(
    (m): VGG(
      (features): Sequential(
        (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
        (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (6): ReLU(inplace=True)
        (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
        (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (9): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (10): ReLU(inplace=True)
        (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (12): BatchNorm2d(256, eps=1e-05, momentu

# without pruning

In [8]:
trainer = pl.Trainer(accelerator="gpu")

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [9]:
test_result = trainer.test(model, dataloaders=test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

# pruning at 60%

In [10]:
from torch.nn.utils import prune

In [14]:
import copy

In [15]:
temp_copy_model = copy.deepcopy(model)

In [16]:
for name, module in temp_copy_model.named_modules():
    if isinstance(module, torch.nn.Conv2d):
         prune.l1_unstructured(module, name='weight', amount=0.6)

In [17]:
print(dict(temp_copy_model.named_buffers()).keys())

dict_keys(['model.m.features.0.weight_mask', 'model.m.features.1.running_mean', 'model.m.features.1.running_var', 'model.m.features.1.num_batches_tracked', 'model.m.features.4.weight_mask', 'model.m.features.5.running_mean', 'model.m.features.5.running_var', 'model.m.features.5.num_batches_tracked', 'model.m.features.8.weight_mask', 'model.m.features.9.running_mean', 'model.m.features.9.running_var', 'model.m.features.9.num_batches_tracked', 'model.m.features.11.weight_mask', 'model.m.features.12.running_mean', 'model.m.features.12.running_var', 'model.m.features.12.num_batches_tracked', 'model.m.features.15.weight_mask', 'model.m.features.16.running_mean', 'model.m.features.16.running_var', 'model.m.features.16.num_batches_tracked', 'model.m.features.18.weight_mask', 'model.m.features.19.running_mean', 'model.m.features.19.running_var', 'model.m.features.19.num_batches_tracked', 'model.m.features.22.weight_mask', 'model.m.features.23.running_mean', 'model.m.features.23.running_var', '

In [19]:
prune.is_pruned(temp_copy_model)

True

In [21]:
test_result = trainer.test(temp_copy_model, dataloaders=test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

In [22]:
trainer.save_checkpoint("model_v1_vgg/pruned_60%.ckpt")

# pruning at 50%

In [23]:
del temp_copy_model

temp_copy_model = copy.deepcopy(model)

for name, module in temp_copy_model.named_modules():
    if isinstance(module, torch.nn.Conv2d):
         prune.l1_unstructured(module, name='weight', amount=0.5)

print(dict(temp_copy_model.named_buffers()).keys())

dict_keys(['model.m.features.0.weight_mask', 'model.m.features.1.running_mean', 'model.m.features.1.running_var', 'model.m.features.1.num_batches_tracked', 'model.m.features.4.weight_mask', 'model.m.features.5.running_mean', 'model.m.features.5.running_var', 'model.m.features.5.num_batches_tracked', 'model.m.features.8.weight_mask', 'model.m.features.9.running_mean', 'model.m.features.9.running_var', 'model.m.features.9.num_batches_tracked', 'model.m.features.11.weight_mask', 'model.m.features.12.running_mean', 'model.m.features.12.running_var', 'model.m.features.12.num_batches_tracked', 'model.m.features.15.weight_mask', 'model.m.features.16.running_mean', 'model.m.features.16.running_var', 'model.m.features.16.num_batches_tracked', 'model.m.features.18.weight_mask', 'model.m.features.19.running_mean', 'model.m.features.19.running_var', 'model.m.features.19.num_batches_tracked', 'model.m.features.22.weight_mask', 'model.m.features.23.running_mean', 'model.m.features.23.running_var', '

In [24]:
prune.is_pruned(temp_copy_model)

True

In [25]:
test_result = trainer.test(temp_copy_model, dataloaders=test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

In [26]:
trainer.save_checkpoint("model_v1_vgg/pruned_50%.ckpt")

# pruning at 70%

In [27]:
del temp_copy_model

temp_copy_model = copy.deepcopy(model)

for name, module in temp_copy_model.named_modules():
    if isinstance(module, torch.nn.Conv2d):
         prune.l1_unstructured(module, name='weight', amount=0.7)

print(dict(temp_copy_model.named_buffers()).keys())

dict_keys(['model.m.features.0.weight_mask', 'model.m.features.1.running_mean', 'model.m.features.1.running_var', 'model.m.features.1.num_batches_tracked', 'model.m.features.4.weight_mask', 'model.m.features.5.running_mean', 'model.m.features.5.running_var', 'model.m.features.5.num_batches_tracked', 'model.m.features.8.weight_mask', 'model.m.features.9.running_mean', 'model.m.features.9.running_var', 'model.m.features.9.num_batches_tracked', 'model.m.features.11.weight_mask', 'model.m.features.12.running_mean', 'model.m.features.12.running_var', 'model.m.features.12.num_batches_tracked', 'model.m.features.15.weight_mask', 'model.m.features.16.running_mean', 'model.m.features.16.running_var', 'model.m.features.16.num_batches_tracked', 'model.m.features.18.weight_mask', 'model.m.features.19.running_mean', 'model.m.features.19.running_var', 'model.m.features.19.num_batches_tracked', 'model.m.features.22.weight_mask', 'model.m.features.23.running_mean', 'model.m.features.23.running_var', '

In [28]:
prune.is_pruned(temp_copy_model)

True

In [29]:
test_result = trainer.test(temp_copy_model, dataloaders=test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

In [30]:
trainer.save_checkpoint("model_v1_vgg/pruned_70%.ckpt")

# pruning at 90%

In [32]:
del temp_copy_model

temp_copy_model = copy.deepcopy(model)

for name, module in temp_copy_model.named_modules():
    if isinstance(module, torch.nn.Conv2d):
         prune.l1_unstructured(module, name='weight', amount=0.9)

print(dict(temp_copy_model.named_buffers()).keys())

dict_keys(['model.m.features.0.weight_mask', 'model.m.features.1.running_mean', 'model.m.features.1.running_var', 'model.m.features.1.num_batches_tracked', 'model.m.features.4.weight_mask', 'model.m.features.5.running_mean', 'model.m.features.5.running_var', 'model.m.features.5.num_batches_tracked', 'model.m.features.8.weight_mask', 'model.m.features.9.running_mean', 'model.m.features.9.running_var', 'model.m.features.9.num_batches_tracked', 'model.m.features.11.weight_mask', 'model.m.features.12.running_mean', 'model.m.features.12.running_var', 'model.m.features.12.num_batches_tracked', 'model.m.features.15.weight_mask', 'model.m.features.16.running_mean', 'model.m.features.16.running_var', 'model.m.features.16.num_batches_tracked', 'model.m.features.18.weight_mask', 'model.m.features.19.running_mean', 'model.m.features.19.running_var', 'model.m.features.19.num_batches_tracked', 'model.m.features.22.weight_mask', 'model.m.features.23.running_mean', 'model.m.features.23.running_var', '

In [33]:
prune.is_pruned(temp_copy_model)

True

In [34]:
test_result = trainer.test(temp_copy_model, dataloaders=test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

In [35]:
trainer.save_checkpoint("model_v1_vgg/pruned_90%.ckpt")

# Report
1. base finetuned model have accuracy of 84% and model size is around 1.5 gb
2. <ins>50% pruning has given 84% accuracy with model size of 542 mb </ins> (Is the best model)
3. 60% pruning has given 82% accuracy with model size of 542 mb
4. 70% pruning has given 79% accuracy with model size of 542 mb
5. 90% pruning has given 12% accuracy with model size of 542 mb