<a href="https://colab.research.google.com/github/felixsimard/comp551-p4/blob/main/MNIST_Quantization_experiments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports and environmment setup

N.b. Requires runtime restart on Google Colab


In [1]:
!pip install regex requests hydra-core omegaconf bitarray



In [2]:
!pip install fairseq



In [3]:
import torch
import torch.nn as nn
from torchvision import models
from torchsummary import summary
import torch.nn.functional as F
import numpy as np


from fairseq.modules.quantization.pq import quantize_model_, SizeTracker
from fairseq.modules.quant_noise import quant_noise

from operator import attrgetter, itemgetter
import re

In [4]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load MNIST dataset

In [5]:
from torchvision import datasets
import torchvision.transforms as transforms

# number of subprocesses to use for data loading
num_workers = 0
# how many samples per batch to load
batch_size = 20

# convert data to torch.FloatTensor
transform = transforms.ToTensor()

# choose the training and test datasets
train_data = datasets.MNIST(root='data', train=True,
                                   download=True, transform=transform)
test_data = datasets.MNIST(root='data', train=False,
                                  download=True, transform=transform)

# prepare data loaders
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size,
    num_workers=num_workers)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, 
    num_workers=num_workers)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to data/MNIST/raw/train-images-idx3-ubyte.gz


  0%|          | 0/9912422 [00:00<?, ?it/s]

Extracting data/MNIST/raw/train-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to data/MNIST/raw/train-labels-idx1-ubyte.gz


  0%|          | 0/28881 [00:00<?, ?it/s]

Extracting data/MNIST/raw/train-labels-idx1-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to data/MNIST/raw/t10k-images-idx3-ubyte.gz


  0%|          | 0/1648877 [00:00<?, ?it/s]

Extracting data/MNIST/raw/t10k-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to data/MNIST/raw/t10k-labels-idx1-ubyte.gz


  0%|          | 0/4542 [00:00<?, ?it/s]

Extracting data/MNIST/raw/t10k-labels-idx1-ubyte.gz to data/MNIST/raw



# Models definition

In [56]:
class MLP(nn.Module):
    """Standard MLP with no quant noise"""

    def __init__(self):
        super().__init__()

        self.linear1 = nn.Linear(784, 100)
        self.linear2 = nn.Linear(100, 100)
        self.linear4 = nn.Linear(100, 10)
    
    def forward(self, x):
        x = x.view(x.size()[0], -1)
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)
        x = F.relu(x)
        x = self.linear4(x)
        x = F.relu(x)
        return x

In [64]:
class MLPQuantNoise(nn.Module):
    """MLP with quant noise wrapper"""
    def __init__(self):
        super().__init__()
        p = 0.02
        block_size = 4
        self.linear1 = quant_noise(nn.Linear(784, 20), p, block_size)
        self.linear2 = quant_noise(nn.Linear(20, 20), p, block_size)
        self.linear4 = quant_noise(nn.Linear(20, 10), p, block_size)
    
    def forward(self, x):
        x = x.view(x.size()[0], -1)
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)
        x = F.relu(x)
        x = self.linear4(x)
        x = F.relu(x)
        return x
    

In [66]:
class ConvModel(nn.Module):
    """Standard convolutional model without quant noise"""
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 4, 3, padding='same')
        self.conv2 = nn.Conv2d(4, 8, 3, padding='same')
        self.linear = nn.Linear(8 * 28 * 28, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = x.view(x.size()[0], -1)
        x = self.linear(x)
        x = F.relu(x)
        return x

In [22]:
class QuantConvModel(nn.Module):
    """Convolutional model with quant noise"""
    def __init__(self):
        super().__init__()
        p = 0.2
        block_size = 3
        self.conv1 = quant_noise(nn.Conv2d(1, 4, 3, padding='same'), p, block_size)
        self.conv2 = quant_noise(nn.Conv2d(4, 8, 3, padding='same'), p, block_size)
        self.linear = quant_noise(nn.Linear(8 * 28 * 28, 10), p, 4)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = x.view(x.size()[0], -1)
        x = self.linear(x)
        x = F.relu(x)
        return x

# Train / Eval implementation


In [7]:
def train(model, train_loader, n_epochs=5):
    N = len(train_loader.dataset)
    model.train()

    for epoch in range(n_epochs):
        train_loss = 0.0

        for data, label in train_loader:
            data = data.to(device)
            label = label.to(device)
            
            optimizer.zero_grad()

            output = model(data)
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()*data.size(0)

        train_loss = train_loss/N

        print(file='Epoch: {} \tTraining Loss: {:.6f}'.format(epoch+1, train_loss))

In [8]:
def eval_model(model, test_loader):
    test_loss = 0.0
    class_correct = list(0. for i in range(10))
    class_total = list(0. for i in range(10))

    model.eval() # prep model for *evaluation*

    for data, target in test_loader:
        data = data.to(device)
        target = target.to(device)
        
        output = model(data)
        loss = criterion(output, target)
        test_loss += loss.item()*data.size(0)
        _, pred = torch.max(output, 1)
        correct = np.squeeze(pred.eq(target.data.view_as(pred)))
        for i in range(batch_size):
            label = target.data[i]
            class_correct[label] += correct[i].item()
            class_total[label] += 1

    test_loss = test_loss/len(test_loader.dataset)
    print('Test Loss: {:.6f}\n'.format(test_loss))

    for i in range(10):
        if class_total[i] > 0:
            print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % (
                str(i), 100 * class_correct[i] / class_total[i],
                np.sum(class_correct[i]), np.sum(class_total[i])))
        else:
            print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))

    print('\nTest Accuracy (Overall): %2d%% (%2d/%2d)' % (
        100. * np.sum(class_correct) / np.sum(class_total),
        np.sum(class_correct), np.sum(class_total)))

# Quantization configs

In [None]:
config = {
    'n_centroids': {
              'Conv2d': ('kernel_size', {'*': 256}),
              'Linear': ('in_features', {'*': 256})
          },
    'block_sizes': {
              'Conv2d': ('kernel_size', {'(3, 3)': 3, '(1, 1)': 4}), # '(3, 3)': 9
              'Linear': ('in_features', {'*': 4})
          },
    'layers_to_quantize': [".*?"]
}

n_centroids_config = config['n_centroids']
block_sizes_config = config['block_sizes']
layers_to_quantize = config['layers_to_quantize']

# Scalar Quantization

## Comparison of the following approaches for homemade Linear and Convolutional models

*   No quant noise and no quantization
*   No quant noise with quantization
*   Quant noise and quantization

## Vanilla MLP

In [57]:
# plain model
model = MLP()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
train(model, train_loader, n_epochs=10)
eval_model(model, test_loader)

Epoch: 1 	Training Loss: 1.101991
Epoch: 2 	Training Loss: 0.333103
Epoch: 3 	Training Loss: 0.267461
Epoch: 4 	Training Loss: 0.223788
Epoch: 5 	Training Loss: 0.191529
Epoch: 6 	Training Loss: 0.166880
Epoch: 7 	Training Loss: 0.147326
Epoch: 8 	Training Loss: 0.131392
Epoch: 9 	Training Loss: 0.118261
Epoch: 10 	Training Loss: 0.107277
Test Loss: 0.119093

Test Accuracy of     0: 98% (967/980)
Test Accuracy of     1: 98% (1122/1135)
Test Accuracy of     2: 95% (986/1032)
Test Accuracy of     3: 96% (976/1010)
Test Accuracy of     4: 96% (948/982)
Test Accuracy of     5: 97% (873/892)
Test Accuracy of     6: 95% (914/958)
Test Accuracy of     7: 95% (977/1028)
Test Accuracy of     8: 94% (916/974)
Test Accuracy of     9: 95% (962/1009)

Test Accuracy (Overall): 96% (9641/10000)


In [58]:
# use previous model trained without quant noise
sizetracker = SizeTracker(model)
quantized_layers = quantize_model_(model, sizetracker, layers_to_quantize, block_sizes_config, n_centroids_config)
print('Quantized layers:', quantized_layers)

print('Test accuracy after quantization')
eval_model(model, test_loader)
sizetracker

Quantized layers: ['linear1', 'linear2', 'linear4']
Test accuracy after quantization
Test Loss: 0.162468

Test Accuracy of     0: 98% (967/980)
Test Accuracy of     1: 98% (1122/1135)
Test Accuracy of     2: 96% (995/1032)
Test Accuracy of     3: 96% (975/1010)
Test Accuracy of     4: 97% (960/982)
Test Accuracy of     5: 98% (879/892)
Test Accuracy of     6: 90% (870/958)
Test Accuracy of     7: 94% (973/1028)
Test Accuracy of     8: 89% (867/974)
Test Accuracy of     9: 89% (908/1009)

Test Accuracy (Overall): 95% (9516/10000)


Non-compressed model size: 0.38 MB. After quantizing 3 layers, size (indexing + centroids + other): 0.02 MB + 0.01 MB + 0.04 MB = 0.07 MB, compression ratio: 5.72x

In [65]:
quantnoise_model = MLPQuantNoise()
sizetracker = SizeTracker(quantnoise_model)
quantized_layers = quantize_model_(quantnoise_model, sizetracker, layers_to_quantize, block_sizes_config, n_centroids_config)
print('Quantized layers:', quantized_layers)
quantnoise_model = quantnoise_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(quantnoise_model.parameters(), lr=1e-2)
train(quantnoise_model, train_loader, n_epochs=10)
eval_model(quantnoise_model, test_loader)

Quantized layers: ['linear1', 'linear2', 'linear4']
Epoch: 1 	Training Loss: 1.812365
Epoch: 2 	Training Loss: 1.332222
Epoch: 3 	Training Loss: 1.183771
Epoch: 4 	Training Loss: 1.155922
Epoch: 5 	Training Loss: 1.139379
Epoch: 6 	Training Loss: 1.127996
Epoch: 7 	Training Loss: 1.119579
Epoch: 8 	Training Loss: 1.112899
Epoch: 9 	Training Loss: 1.106999
Epoch: 10 	Training Loss: 1.102016
Test Loss: 1.099139

Test Accuracy of     0: 99% (974/980)
Test Accuracy of     1:  0% ( 0/1135)
Test Accuracy of     2: 91% (940/1032)
Test Accuracy of     3: 93% (945/1010)
Test Accuracy of     4: 94% (927/982)
Test Accuracy of     5:  0% ( 0/892)
Test Accuracy of     6:  0% ( 0/958)
Test Accuracy of     7: 90% (927/1028)
Test Accuracy of     8:  0% ( 0/974)
Test Accuracy of     9: 92% (930/1009)

Test Accuracy (Overall): 56% (5643/10000)


In [67]:
sizetracker

Non-compressed model size: 0.06 MB. After quantizing 3 layers, size (indexing + centroids + other): 0.00 MB + 0.01 MB + 0.00 MB = 0.01 MB, compression ratio: 6.27x

## Conv model



In [68]:
# plain model
model = ConvModel()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
train(model, train_loader, n_epochs=10)
eval_model(model, test_loader)

Epoch: 1 	Training Loss: 0.869171
Epoch: 2 	Training Loss: 0.393527
Epoch: 3 	Training Loss: 0.108878
Epoch: 4 	Training Loss: 0.084117
Epoch: 5 	Training Loss: 0.071380
Epoch: 6 	Training Loss: 0.062963
Epoch: 7 	Training Loss: 0.056677
Epoch: 8 	Training Loss: 0.051595
Epoch: 9 	Training Loss: 0.047350
Epoch: 10 	Training Loss: 0.043739
Test Loss: 0.070240

Test Accuracy of     0: 99% (973/980)
Test Accuracy of     1: 99% (1125/1135)
Test Accuracy of     2: 98% (1012/1032)
Test Accuracy of     3: 98% (991/1010)
Test Accuracy of     4: 98% (968/982)
Test Accuracy of     5: 98% (876/892)
Test Accuracy of     6: 94% (902/958)
Test Accuracy of     7: 96% (993/1028)
Test Accuracy of     8: 96% (944/974)
Test Accuracy of     9: 97% (986/1009)

Test Accuracy (Overall): 97% (9770/10000)


In [70]:
# use previous model trained without quant noise
sizetracker = SizeTracker(model)
quantized_layers = quantize_model_(model, sizetracker, layers_to_quantize, block_sizes_config, n_centroids_config)
print('Quantized layers:', quantized_layers)

print('Test accuracy after quantization')
eval_model(model, test_loader)
sizetracker

Quantized layers: ['conv1', 'conv2', 'linear']
Test accuracy after quantization


TypeError: ignored

In [71]:
quantnoise_model = QuantConvModel()
sizetracker = SizeTracker(quantnoise_model)
quantized_layers = quantize_model_(quantnoise_model, sizetracker, layers_to_quantize, block_sizes_config, n_centroids_config)
print('Quantized layers:', quantized_layers)
quantnoise_model = quantnoise_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(quantnoise_model.parameters(), lr=1e-2)
train(quantnoise_model, train_loader, n_epochs=10)
eval_model(quantnoise_model, test_loader)

Quantized layers: ['conv1', 'conv2', 'linear']


TypeError: ignored

# Iterative Product Quantization (iPQ)

## Comparison of the following approaches for homemade Linear and Convolutional models

*   No quant noise and no quantization
*   No quant noise with quantization
*   Quant noise and quantization

In [74]:
def iPQ(model):
    size_tracker = SizeTracker(model)

    # Quantize model by stages
    for step in range(len(layers_to_quantize)):

        # quantize model in-place
        quantized_layers = quantize_model_(
            model,
            size_tracker,
            layers_to_quantize,
            block_sizes_config,
            n_centroids_config,
            step=step,
        )
        print(f"Finetuning stage {step}, quantized layers: {quantized_layers}")
        print(f"{size_tracker}")

        # Don't forget to re-create/update trainer/optimizer since model parameters have changed
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)

        # Finetune the centroids with your usual training loop for a few epochs
        train(model, train_loader, n_epochs=5)

        # eval model
        eval(q)

## Vanilla MLP

In [77]:
# plain model
model = MLP()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
train(model, train_loader, n_epochs=10)
eval_model(model, test_loader)

Epoch: 1 	Training Loss: 1.580700
Epoch: 2 	Training Loss: 0.991750
Epoch: 3 	Training Loss: 0.933765
Epoch: 4 	Training Loss: 0.895316
Epoch: 5 	Training Loss: 0.863501
Epoch: 6 	Training Loss: 0.839098
Epoch: 7 	Training Loss: 0.821392
Epoch: 8 	Training Loss: 0.807972
Epoch: 9 	Training Loss: 0.797247
Epoch: 10 	Training Loss: 0.788072
Test Loss: 0.791380

Test Accuracy of     0: 93% (920/980)
Test Accuracy of     1: 99% (1128/1135)
Test Accuracy of     2:  0% ( 0/1032)
Test Accuracy of     3: 97% (989/1010)
Test Accuracy of     4: 95% (942/982)
Test Accuracy of     5: 97% (867/892)
Test Accuracy of     6: 97% (936/958)
Test Accuracy of     7:  0% ( 0/1028)
Test Accuracy of     8: 94% (925/974)
Test Accuracy of     9: 95% (965/1009)

Test Accuracy (Overall): 76% (7672/10000)


In [79]:
# use previous model trained without quant noise
iPQ(model)

KeyError: ignored

In [80]:
quantnoise_model = MLPQuantNoise()
sizetracker = SizeTracker(quantnoise_model)
quantnoise_model = quantnoise_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(quantnoise_model.parameters(), lr=1e-2)
train(quantnoise_model, train_loader, n_epochs=10)
print("Test accuracy before iPQ")
eval_model(quantnoise_model, test_loader)

iPQ(model)

Epoch: 1 	Training Loss: 2.382372
Epoch: 2 	Training Loss: 2.301521
Epoch: 3 	Training Loss: 2.301628
Epoch: 4 	Training Loss: 2.301530
Epoch: 5 	Training Loss: 2.301521
Epoch: 6 	Training Loss: 2.301531
Epoch: 7 	Training Loss: 2.301636
Epoch: 8 	Training Loss: 2.301578
Epoch: 9 	Training Loss: 2.301498
Epoch: 10 	Training Loss: 2.301500
Test accuracy before iPQ
Test Loss: 2.301452

Test Accuracy of     0:  0% ( 0/980)
Test Accuracy of     1: 100% (1135/1135)
Test Accuracy of     2:  0% ( 0/1032)
Test Accuracy of     3:  0% ( 0/1010)
Test Accuracy of     4:  0% ( 0/982)
Test Accuracy of     5:  0% ( 0/892)
Test Accuracy of     6:  0% ( 0/958)
Test Accuracy of     7:  0% ( 0/1028)
Test Accuracy of     8:  0% ( 0/974)
Test Accuracy of     9:  0% ( 0/1009)

Test Accuracy (Overall): 11% (1135/10000)


KeyError: ignored

In [None]:
sizetracker

Non-compressed model size: 0.06 MB. After quantizing 3 layers, size (indexing + centroids + other): 0.00 MB + 0.01 MB + 0.00 MB = 0.01 MB, compression ratio: 6.27x

## Conv model



In [81]:
# plain model
model = ConvModel()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
train(model, train_loader, n_epochs=10)
eval_model(model, test_loader)

Epoch: 1 	Training Loss: 0.453799
Epoch: 2 	Training Loss: 0.220119
Epoch: 3 	Training Loss: 0.129491
Epoch: 4 	Training Loss: 0.096502
Epoch: 5 	Training Loss: 0.081060
Epoch: 6 	Training Loss: 0.071181
Epoch: 7 	Training Loss: 0.064252
Epoch: 8 	Training Loss: 0.058944
Epoch: 9 	Training Loss: 0.054583
Epoch: 10 	Training Loss: 0.050833
Test Loss: 0.069242

Test Accuracy of     0: 99% (975/980)
Test Accuracy of     1: 99% (1127/1135)
Test Accuracy of     2: 98% (1014/1032)
Test Accuracy of     3: 99% (1000/1010)
Test Accuracy of     4: 98% (966/982)
Test Accuracy of     5: 97% (869/892)
Test Accuracy of     6: 96% (929/958)
Test Accuracy of     7: 96% (987/1028)
Test Accuracy of     8: 96% (944/974)
Test Accuracy of     9: 97% (981/1009)

Test Accuracy (Overall): 97% (9792/10000)


In [None]:
# use previous model trained without quant noise
iPQ(model)

Quantized layers: ['conv1', 'conv2', 'linear']
Test accuracy after quantization


TypeError: ignored

In [82]:
quantnoise_model = QuantConvModel()
sizetracker = SizeTracker(quantnoise_model)
quantnoise_model = quantnoise_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(quantnoise_model.parameters(), lr=1e-2)
train(quantnoise_model, train_loader, n_epochs=10)
print("Test accuracy before iPQ")
eval_model(quantnoise_model, test_loader)

iPQ(model)

Epoch: 1 	Training Loss: 2.303102
Epoch: 2 	Training Loss: 2.302585
Epoch: 3 	Training Loss: 2.302585
Epoch: 4 	Training Loss: 2.302585
Epoch: 5 	Training Loss: 2.302585
Epoch: 6 	Training Loss: 2.302585
Epoch: 7 	Training Loss: 2.302585
Epoch: 8 	Training Loss: 2.302585
Epoch: 9 	Training Loss: 2.302585
Epoch: 10 	Training Loss: 2.302585
Test accuracy before iPQ
Test Loss: 2.302585

Test Accuracy of     0: 100% (980/980)
Test Accuracy of     1:  0% ( 0/1135)
Test Accuracy of     2:  0% ( 0/1032)
Test Accuracy of     3:  0% ( 0/1010)
Test Accuracy of     4:  0% ( 0/982)
Test Accuracy of     5:  0% ( 0/892)
Test Accuracy of     6:  0% ( 0/958)
Test Accuracy of     7:  0% ( 0/1028)
Test Accuracy of     8:  0% ( 0/974)
Test Accuracy of     9:  0% ( 0/1009)

Test Accuracy (Overall):  9% (980/10000)


KeyError: ignored