# Efficient Machine Learning - Fine-grained and channel pruning
This notebook covers neural network pruning. It implements and tests fine-grained pruning and channel pruning, then compares the performance gains and trade-offs of the two approaches.

## Model and setup
We conduct the experiments on a VGG network (the `vgg11_bn` variant from torchvision). The architecture is dated by today's standards, but it is easy to dissect and various pretrained variants are available, which makes it suitable for this notebook.

In [2]:
%pip install torchprofile

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
import copy
import math
import random
import time
from collections import OrderedDict, defaultdict
from typing import Union, List

import matplotlib.pyplot as plt  # needed by the plotting helper in test_fine_grained_prune
import numpy as np
import torch
import torch.nn as nn
from torch.optim import *
from torch.optim.lr_scheduler import *
from torch.utils.data import DataLoader
from torchprofile import profile_macs
from torchvision.datasets import *
from torchvision.transforms import *
from torchvision.models import vgg16
from tqdm.auto import tqdm

# Ensure CUDA support
assert torch.cuda.is_available(), "The runtime has no CUDA support"



AssertionError: The runtime has no CUDA support
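
The assertion above stops the notebook on any machine without a working CUDA runtime, as the recorded output shows. A minimal sketch of a device-agnostic alternative follows, assuming a (much slower) CPU fallback is acceptable. `pick_device` is a hypothetical helper name and is not part of the original notebook.

```python
import torch


def pick_device() -> torch.device:
    """Return the CUDA device when it is usable, otherwise the CPU."""
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")


device = pick_device()
# Tensors and modules are then moved with .to(device) instead of .cuda().
x = torch.zeros(2, 3).to(device)
print(device.type, x.device.type)
```

With this approach, the `.cuda()` calls in `train` and `evaluate` would need to become `.to(device)` as well.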

In [None]:
# Set seeds for reproducibility
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x768c483451f0>

The model architecture is meant to match `vgg11_bn` in torchvision, which lets us use the weights pretrained on ImageNet-1k. The class definition below exists only to give the layers explicit names.

## Edit
This model class is no longer needed. Instead, we rename the layers of the pretrained PyTorch model.

In [3]:
class VGG(nn.Module):
  ARCH = [64, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M']
  # torchvision's vgg11 layout (cfg 'A'), which is the layout shown in the printout below:
  # ARCH = [64, "M", 128, "M", 256, 256, "M", 512, 512, "M", 512, 512, "M"]

  def __init__(self) -> None:
    super().__init__()

    layers = []
    counts = defaultdict(int)

    def add(name: str, layer: nn.Module) -> None:
      layers.append((f"{name}{counts[name]}", layer))
      counts[name] += 1

    in_channels = 3
    for x in self.ARCH:
      if x != 'M':
        # conv-bn-relu
        add("conv", nn.Conv2d(in_channels, x, 3, padding=1, bias=False))
        add("bn", nn.BatchNorm2d(x))
        add("relu", nn.ReLU(True))
        in_channels = x
      else:
        # maxpool
        add("pool", nn.MaxPool2d(2))

    self.backbone = nn.Sequential(OrderedDict(layers))
    self.classifier = nn.Linear(512, 10)

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    # backbone: [N, 3, 32, 32] => [N, 512, 2, 2]
    x = self.backbone(x)

    # avgpool: [N, 512, 2, 2] => [N, 512]
    x = x.mean([2, 3])

    # classifier: [N, 512] => [N, 10]
    x = self.classifier(x)
    return x

In [4]:
model = VGG().cuda()

RuntimeError: CUDA unknown error - this may be due to an incorrectly set up environment, e.g. changing env variable CUDA_VISIBLE_DEVICES after program start. Setting the available devices to be zero.

In [None]:
model

VGG(
  (backbone): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu1): ReLU(inplace=True)
    (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu2): ReLU(inplace=True)
    (conv3): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, a

In [5]:
for name, param in model.named_parameters():
    print(name, param.size())

NameError: name 'model' is not defined

Load the pretrained model and transfer the weights.

## Pretrained model from PyTorch

In [6]:
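from torchvision.models import vgg11_bn, VGG11_BN_Weights
# `pretrained=True` is deprecated in recent torchvision releases; pass the weights enum instead
model = vgg11_bn(weights=VGG11_BN_Weights.IMAGENET1K_V1)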

print(model)



VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU(inplace=True)
    (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): ReLU(inplace=True)
    (14): MaxPool2d(ke

## Change layer names for further visualization

In [7]:
class KV_Getter:
    def __init__(self):
        self.it = 0

    def get_kv(self, k, v):
        if isinstance(v, torch.nn.modules.conv.Conv2d):
            return (f'conv{self.it}', v)
        elif isinstance(v, torch.nn.modules.batchnorm.BatchNorm2d):
            return (f'batchnorm{self.it}', v)
        elif isinstance(v, torch.nn.modules.activation.ReLU):
            self.it += 1
            return (f'relu{self.it-1}', v)
        elif isinstance(v, torch.nn.modules.pooling.MaxPool2d):
            return (f'maxpool{self.it-1}', v)
        else:
            return (k, v)

    def get_kv_classifier(self, k, v):
        if isinstance(v, torch.nn.Linear):
            return (f'linear{self.it}', v)
        elif isinstance(v, torch.nn.modules.activation.ReLU):
            return (f'relu{self.it}', v)
        elif isinstance(v, torch.nn.Dropout):
            self.it += 1
            # Dropout layers were previously mislabeled as 'maxpool'
            return (f'dropout{self.it-1}', v)
        else:
            return (k, v)
            
kvg = KV_Getter()
new_od = OrderedDict([kvg.get_kv(k,v) for k, v in model._modules['features']._modules.items()])

kvg = KV_Getter()
new_od_classifier = OrderedDict([kvg.get_kv_classifier(k,v) for k, v in model._modules['classifier']._modules.items()])

        
model._modules['features']._modules = new_od
model._modules['classifier']._modules = new_od_classifier

print(model)

VGG(
  (features): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (batchnorm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (maxpool0): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (batchnorm1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu1): ReLU(inplace=True)
    (maxpool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (batchnorm2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu2): ReLU(inplace=True)
    (conv3): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (batchnorm3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, tr
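
The cell above renames layers by overwriting the private `_modules` dictionaries. As a hedged alternative, the same naming scheme can be sketched with public APIs only, by rebuilding an `nn.Sequential` from an `OrderedDict`. `rename_sequential` and the toy network are hypothetical and only illustrate the idea; the module objects are reused, so the weights stay the same.

```python
from collections import OrderedDict

import torch
import torch.nn as nn


def rename_sequential(seq: nn.Sequential) -> nn.Sequential:
    """Rebuild `seq` with names such as conv0, batchnorm0, relu0, maxpool0."""
    prefixes = {nn.Conv2d: "conv", nn.BatchNorm2d: "batchnorm",
                nn.ReLU: "relu", nn.MaxPool2d: "maxpool"}
    named, block = [], 0
    for key, module in seq.named_children():
        prefix = prefixes.get(type(module))
        if prefix is None:
            named.append((key, module))  # keep unknown layers under their old key
            continue
        # A pooling layer belongs to the block that the preceding ReLU closed.
        index = block - 1 if prefix == "maxpool" else block
        named.append((f"{prefix}{index}", module))
        if prefix == "relu":
            block += 1
    return nn.Sequential(OrderedDict(named))


toy = nn.Sequential(nn.Conv2d(3, 8, 3, padding=1), nn.BatchNorm2d(8),
                    nn.ReLU(), nn.MaxPool2d(2),
                    nn.Conv2d(8, 16, 3, padding=1), nn.BatchNorm2d(16),
                    nn.ReLU())
renamed = rename_sequential(toy)
names = [name for name, _ in renamed.named_children()]
print(names)
```

Because the parameter names change along with the layer names, any `state_dict` saved before renaming would no longer load by key without a mapping.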

In [8]:
for name, param in model.named_parameters():
    print(name)

features.conv0.weight
features.conv0.bias
features.batchnorm0.weight
features.batchnorm0.bias
features.conv1.weight
features.conv1.bias
features.batchnorm1.weight
features.batchnorm1.bias
features.conv2.weight
features.conv2.bias
features.batchnorm2.weight
features.batchnorm2.bias
features.conv3.weight
features.conv3.bias
features.batchnorm3.weight
features.batchnorm3.bias
features.conv4.weight
features.conv4.bias
features.batchnorm4.weight
features.batchnorm4.bias
features.conv5.weight
features.conv5.bias
features.batchnorm5.weight
features.batchnorm5.bias
features.conv6.weight
features.conv6.bias
features.batchnorm6.weight
features.batchnorm6.bias
features.conv7.weight
features.conv7.bias
features.batchnorm7.weight
features.batchnorm7.bias
classifier.linear0.weight
classifier.linear0.bias
classifier.linear1.weight
classifier.linear1.bias
classifier.linear2.weight
classifier.linear2.bias


## Utility functions for training, inference, etc.

In [9]:
def train(
  model: nn.Module,
  dataloader: DataLoader,
  criterion: nn.Module,
  optimizer: Optimizer,
  scheduler: LambdaLR,
  callbacks = None
) -> None:
  model.train()

  for inputs, targets in tqdm(dataloader, desc='train', leave=False):
    # Move the data from CPU to GPU
    inputs = inputs.cuda()
    targets = targets.cuda()

    # Reset the gradients (from the last iteration)
    optimizer.zero_grad()

    # Forward inference
    outputs = model(inputs)
    loss = criterion(outputs, targets)

    # Backward propagation
    loss.backward()

    # Update optimizer and LR scheduler
    optimizer.step()
    scheduler.step()

    if callbacks is not None:
        for callback in callbacks:
            callback()

In [10]:
@torch.inference_mode()
def evaluate(
  model: nn.Module,
  dataloader: DataLoader,
  verbose=True,
) -> float:
  model.eval()

  num_samples = 0
  num_correct = 0

  for inputs, targets in tqdm(dataloader, desc="eval", leave=False,
                              disable=not verbose):
    # Move the data from CPU to GPU
    inputs = inputs.cuda()
    targets = targets.cuda()

    # Inference
    outputs = model(inputs)

    # Convert logits to class indices
    outputs = outputs.argmax(dim=1)

    # Update metrics
    num_samples += targets.size(0)
    num_correct += (outputs == targets).sum()

  return (num_correct / num_samples * 100).item()
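
To make the metric in `evaluate` concrete, here is a small worked example of the same argmax-and-compare computation on made-up logits. The numbers are illustrative only and do not come from the notebook's model.

```python
import torch

# Toy logits for 4 samples and 3 classes (made-up numbers).
logits = torch.tensor([[2.0, 0.1, 0.3],
                       [0.2, 1.5, 0.1],
                       [0.9, 0.8, 0.1],
                       [0.1, 0.2, 3.0]])
targets = torch.tensor([0, 1, 1, 2])

predictions = logits.argmax(dim=1)            # predicted class index per sample
num_correct = (predictions == targets).sum()  # 0-dim integer tensor
accuracy = (num_correct / targets.size(0) * 100).item()
print(predictions.tolist(), accuracy)  # [0, 1, 0, 2] 75.0
```

The third sample is misclassified (class 0 predicted, class 1 expected), so three of four predictions are correct.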

Helper functions (MACs, model size calculation, etc.)

In [11]:
def get_model_macs(model, inputs) -> int:
    return profile_macs(model, inputs)


def get_sparsity(tensor: torch.Tensor) -> float:
    """
    calculate the sparsity of the given tensor
        sparsity = #zeros / #elements = 1 - #nonzeros / #elements
    """
    return 1 - float(tensor.count_nonzero()) / tensor.numel()


def get_model_sparsity(model: nn.Module) -> float:
    """
    calculate the sparsity of the given model
        sparsity = #zeros / #elements = 1 - #nonzeros / #elements
    """
    num_nonzeros, num_elements = 0, 0
    for param in model.parameters():
        num_nonzeros += param.count_nonzero()
        num_elements += param.numel()
    return 1 - float(num_nonzeros) / num_elements

def get_num_parameters(model: nn.Module, count_nonzero_only=False) -> int:
    """
    calculate the total number of parameters of model
    :param count_nonzero_only: only count nonzero weights
    """
    num_counted_elements = 0
    for param in model.parameters():
        if count_nonzero_only:
            # count_nonzero() returns a tensor; convert so the function returns an int
            num_counted_elements += int(param.count_nonzero())
        else:
            num_counted_elements += param.numel()
    return num_counted_elements


def get_model_size(model: nn.Module, data_width=32, count_nonzero_only=False) -> int:
    """
    calculate the model size in bits
    :param data_width: #bits per element
    :param count_nonzero_only: only count nonzero weights
    """
    return get_num_parameters(model, count_nonzero_only) * data_width

Byte = 8
KiB = 1024 * Byte
MiB = 1024 * KiB
GiB = 1024 * MiB
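
As a quick sanity check of the definitions above, the sparsity formula and the bit-based size units can be worked through on a tiny tensor. The tensor values are made up for illustration.

```python
import torch

Byte = 8            # bits per byte
KiB = 1024 * Byte   # bits per KiB
MiB = 1024 * KiB    # bits per MiB

weights = torch.tensor([[0.0, 0.5, 0.0, -1.2],
                        [0.0, 0.0, 0.3, 0.0]])

num_elements = weights.numel()               # 8 entries in total
num_nonzeros = int(weights.count_nonzero())  # 3 entries are nonzero
sparsity = 1 - num_nonzeros / num_elements   # 1 - 3/8 = 0.625

dense_bits = num_elements * 32      # size with every entry stored as 32-bit
nonzero_bits = num_nonzeros * 32    # size when only nonzeros are counted
print(sparsity, dense_bits, nonzero_bits, dense_bits / KiB)
```

Note that counting only nonzero entries ignores the index overhead a real sparse storage format would add, so it is a lower bound on the stored size.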

Define misc functions for verification.

In [12]:
def test_fine_grained_prune(
    test_tensor=torch.tensor([[-0.46, -0.40, 0.39, 0.19, 0.37],
                              [0.00, 0.40, 0.17, -0.15, 0.16],
                              [-0.20, -0.23, 0.36, 0.25, 0.03],
                              [0.24, 0.41, 0.07, 0.13, -0.15],
                              [0.48, -0.09, -0.36, 0.12, 0.45]]),
    test_mask=torch.tensor([[True, True, False, False, False],
                            [False, True, False, False, False],
                            [False, False, False, False, False],
                            [False, True, False, False, False],
                            [True, False, False, False, True]]),
    target_sparsity=0.75, target_nonzeros=None):
    def plot_matrix(tensor, ax, title):
        ax.imshow(tensor.cpu().numpy() == 0, vmin=0, vmax=1, cmap='tab20c')
        ax.set_title(title)
        ax.set_yticklabels([])
        ax.set_xticklabels([])
        for i in range(tensor.shape[0]):      # rows
            for j in range(tensor.shape[1]):  # columns
                ax.text(j, i, f'{tensor[i, j].item():.2f}',
                        ha="center", va="center", color="k")

    test_tensor = test_tensor.clone()
    fig, axes = plt.subplots(1,2, figsize=(6, 10))
    ax_left, ax_right = axes.ravel()
    plot_matrix(test_tensor, ax_left, 'dense tensor')

    sparsity_before_pruning = get_sparsity(test_tensor)
    mask = fine_grained_prune(test_tensor, target_sparsity)
    sparsity_after_pruning = get_sparsity(test_tensor)
    sparsity_of_mask = get_sparsity(mask)

    plot_matrix(test_tensor, ax_right, 'sparse tensor')
    fig.tight_layout()
    plt.show()

    print('* Test fine_grained_prune()')
    print(f'    target sparsity: {target_sparsity:.2f}')
    print(f'        sparsity before pruning: {sparsity_before_pruning:.2f}')
    print(f'        sparsity after pruning: {sparsity_after_pruning:.2f}')
    print(f'        sparsity of pruning mask: {sparsity_of_mask:.2f}')

    if target_nonzeros is None:
        if test_mask.equal(mask):
            print('* Test passed.')
        else:
            print('* Test failed.')
    else:
        if mask.count_nonzero() == target_nonzeros:
            print('* Test passed.')
        else:
            print('* Test failed.')
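
The test above calls `fine_grained_prune`, which is not defined in this part of the notebook. The following is a minimal sketch of one common choice, magnitude-based pruning, and is an assumption rather than the notebook's own definition: it zeroes the entries with the smallest absolute values in place and returns a boolean mask of the entries that were kept.

```python
import torch


@torch.no_grad()
def fine_grained_prune(tensor: torch.Tensor, sparsity: float) -> torch.Tensor:
    """Zero the smallest-magnitude entries of `tensor` in place; return the keep-mask."""
    sparsity = min(max(0.0, sparsity), 1.0)
    num_zeros = round(tensor.numel() * sparsity)
    if num_zeros == 0:
        return torch.ones_like(tensor, dtype=torch.bool)
    if num_zeros == tensor.numel():
        tensor.zero_()
        return torch.zeros_like(tensor, dtype=torch.bool)
    importance = tensor.abs()
    # The k-th smallest magnitude is the pruning threshold.
    threshold = importance.flatten().kthvalue(num_zeros).values
    mask = importance > threshold  # entries tied with the threshold are pruned too
    tensor.mul_(mask)
    return mask


weights = torch.tensor([[-0.46, -0.40, 0.39, 0.19, 0.37],
                        [0.00, 0.40, 0.17, -0.15, 0.16],
                        [-0.20, -0.23, 0.36, 0.25, 0.03],
                        [0.24, 0.41, 0.07, 0.13, -0.15],
                        [0.48, -0.09, -0.36, 0.12, 0.45]])
mask = fine_grained_prune(weights, 0.75)
print(int(mask.sum()), int(weights.count_nonzero()))  # 6 6
```

With 25 entries and a target sparsity of 0.75, `round(18.75)` gives 19 pruned entries, which leaves the 6 largest magnitudes and matches the default `test_mask` in the test above.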

In [14]:
%rm data -r

In [15]:
image_size = 160
transforms = {
    "train": Compose([
        RandomCrop(image_size, padding=4),
        RandomHorizontalFlip(),
        ToTensor(),
    ]),
    "val": Compose([
      # Deterministic crop so that repeated evaluations give the same accuracy
      CenterCrop(image_size),
      ToTensor()
    ])
}
dataset = {}
for split in ["train", "val"]:
  dataset[split] = Imagenette(
    root=f"data/imagenette/{split}",
    split=split,
    size='160px',
    download=True,
    transform=transforms[split],
  )
dataloader = {}
for split in ['train', 'val']:
  dataloader[split] = DataLoader(
    dataset[split],
    batch_size=512,
    shuffle=(split == 'train'),
    num_workers=0,
    pin_memory=True,
  )

Downloading https://s3.amazonaws.com/fast-ai-imageclas/imagenette2-160.tgz to data/imagenette/train/imagenette2-160.tgz
Extracting data/imagenette/train/imagenette2-160.tgz to data/imagenette/train
Downloading https://s3.amazonaws.com/fast-ai-imageclas/imagenette2-160.tgz to data/imagenette/val/imagenette2-160.tgz
Extracting data/imagenette/val/imagenette2-160.tgz to data/imagenette/val


## Accuracy and Model Size of Dense Model

In [16]:
dense_model_accuracy = evaluate(model, dataloader['val'])
dense_model_size = get_model_size(model)
print(f"dense model has accuracy={dense_model_accuracy:.2f}%")
print(f"dense model has size={dense_model_size/MiB:.2f} MiB")

                                                                                                                    

RuntimeError: CUDA unknown error - this may be due to an incorrectly set up environment, e.g. changing env variable CUDA_VISIBLE_DEVICES after program start. Setting the available devices to be zero.
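
Since the cell above failed before printing a size, the dense model size can be estimated by hand. The sketch below assumes the standard torchvision `vgg11_bn` layout with 3x3 convolutions (as in the printout earlier) and a 1000-class head with hidden width 4096; the linear layer shapes are an assumption, because the printed model is truncated before the classifier.

```python
# Layer shapes of the assumed torchvision vgg11_bn layout (1000-class head).
conv_channels = [(3, 64), (64, 128), (128, 256), (256, 256),
                 (256, 512), (512, 512), (512, 512), (512, 512)]
linear_shapes = [(512 * 7 * 7, 4096), (4096, 4096), (4096, 1000)]

num_params = 0
for c_in, c_out in conv_channels:
    num_params += c_in * c_out * 3 * 3 + c_out  # 3x3 conv weight + bias
    num_params += 2 * c_out                     # batch-norm weight + bias
for f_in, f_out in linear_shapes:
    num_params += f_in * f_out + f_out          # linear weight + bias

MiB = 8 * 1024 * 1024  # bits per MiB, matching the notebook's definition
size_mib = num_params * 32 / MiB  # 32-bit weights
print(num_params, f"{size_mib:.2f} MiB")  # 132868840 506.85 MiB
```

Only learnable parameters are counted here, which mirrors `get_num_parameters`; batch-norm running statistics are buffers and are left out. Most of the size comes from the first linear layer.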

In [20]:
torch.cuda.is_available()

False