In [None]:
# Fetch the archive with gdown (the argument is presumably a Google Drive file id — confirm).
!gdown 1oHoYT7J4-xKfNu6cfBOMKHyO0QBqdYsI
# Unpack the downloaded zip into /content/calibration_data.
!unzip /content/calibration_data2.zip -d /content/calibration_data

In [None]:
# Show the current working directory of the notebook's shell.
!pwd

/content


In [None]:
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.quantization.qconfig import QConfig
from torch.quantization.observer import MinMaxObserver, MovingAverageMinMaxObserver, MovingAveragePerChannelMinMaxObserver, PerChannelMinMaxObserver
import yaml
from torch.ao.quantization.fake_quantize import FakeQuantize
import torch
import torchvision
import os
import torch.nn as nn
import torch.ao.quantization.quantize_fx as quantize_fx
import copy

In [None]:
  # Dummy input batch (1 x 3 x 224 x 224); passed to prepare_fx further down.
  example_inputs = (torch.randn(1, 3, 224, 224),)

  # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1 is the
  # checkpoint that flag used to load.
  model_fp = torchvision.models.resnet18(
      weights=torchvision.models.ResNet18_Weights.IMAGENET1K_V1
  )

  # Eval-mode copy that will be quantized; model_fp stays as the float baseline.
  # No bare fuse_fx() call here: it returns a new module instead of fusing in
  # place, so calling it without keeping the result changes nothing.
  # NOTE(review): prepare_fx is expected to perform fusion itself — confirm
  # against the installed PyTorch version.
  model_to_quantize = copy.deepcopy(model_fp).eval()


  def _imagenet_eval_transform():
      """Build the resize -> center-crop -> tensor -> normalize pipeline."""
      return transforms.Compose([
          transforms.Resize(256),
          transforms.CenterCrop(224),
          transforms.ToTensor(),  # Convert the image to a PyTorch tensor
          transforms.Normalize(
              mean=[0.485, 0.456, 0.406],  # ImageNet dataset mean
              std=[0.229, 0.224, 0.225],  # ImageNet dataset standard deviation
          ),
      ])


  # Calibration and validation use the same preprocessing; each gets its own
  # pipeline object.
  transform_cali = _imagenet_eval_transform()
  transform_val = _imagenet_eval_transform()




In [None]:
from PIL import Image
from torch.utils.data import Dataset
import torchvision.transforms as transforms
import random
import os

class MyDataset(Dataset):
    """Image dataset described by a text file of ``<image_path> <label>`` lines.

    Args:
        txt_file: Path to the list file. Each non-blank line holds an image
            path and an integer label separated by whitespace.
        transform: Optional callable applied to the loaded RGB image.
        dir: Optional directory joined in front of every image path.

    Raises:
        ValueError: If a non-blank line has fewer than two fields or the label
            is not an integer.
    """

    def __init__(self, txt_file, transform=None, dir=None):
        self.data = []
        with open(txt_file, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                # split() with no argument tolerates tabs, repeated spaces and
                # trailing newlines, and yields [] for blank lines.
                fields = line.split()
                if not fields:
                    continue
                if len(fields) < 2:
                    raise ValueError(
                        f"{txt_file}:{line_no}: expected '<image_path> <label>', got {line!r}"
                    )
                self.data.append((fields[0], int(fields[1])))
        self.transform = transform
        self.dir = dir

    def __len__(self):
        """Return the number of (path, label) entries read from the list file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is converted to RGB and passed through ``transform`` if one
        was given.
        """
        image_path, label = self.data[idx]
        if self.dir:
            image_path = os.path.join(self.dir, image_path)
        # The context manager closes the underlying file once the converted
        # copy has been produced.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label


class CustomDataset(Dataset):
    """Fixed-length wrapper that serves a random sample of another dataset on each access.

    Args:
        original_dataset: Indexable, sized dataset to draw samples from.
        samples_per_epoch: Value reported by ``len()``.
    """

    def __init__(self, original_dataset, samples_per_epoch):
        self.samples_per_epoch = samples_per_epoch
        self.original_dataset = original_dataset

    def __len__(self):
        # The reported length is the requested epoch size, not the wrapped dataset's size.
        return self.samples_per_epoch

    def __getitem__(self, index):
        # `index` is ignored: a position is drawn uniformly from the wrapped
        # dataset using the global `random` generator.
        last_position = len(self.original_dataset) - 1
        chosen = random.randint(0, last_position)
        return self.original_dataset[chosen]

In [None]:
# Every sample listed in samples.txt, preprocessed with the validation transform.
# No `dir` is given, so the image paths in the list file are opened as written.
val_dataset = MyDataset(
    txt_file='/content/calibration_data/samples.txt',
    transform=transform_val,
)

In [None]:
# Sanity check: number of samples read from the list file.
print(len(val_dataset))

896


In [None]:
# Activations: per-tensor affine fake-quantization over the range 0..255,
# with ranges tracked by a plain min/max observer.
activation_fake_quant = FakeQuantize.with_args(
    observer=MinMaxObserver,
    quant_min=0,
    quant_max=255,
    qscheme=torch.per_tensor_affine,
    reduce_range=False,
)

# Weights: per-channel symmetric qint8 over -128..127, one range per slice along
# axis 0 (the alternative scheme would be torch.per_tensor_symmetric).
weight_fake_quant = FakeQuantize.with_args(
    observer=PerChannelMinMaxObserver,
    quant_min=-128,
    quant_max=127,
    dtype=torch.qint8,
    qscheme=torch.per_channel_symmetric,
    reduce_range=False,
    ch_axis=0,
)

qconfig = QConfig(activation=activation_fake_quant, weight=weight_fake_quant)

In [None]:
from torch.ao.quantization import (
  get_default_qconfig_mapping,
  get_default_qat_qconfig_mapping,
  QConfigMapping,
)
# Use the single qconfig defined above as the global (default) entry of the mapping.
qconfig_mapping = QConfigMapping().set_global(qconfig)

In [None]:
# Prepare the eval-mode copy for quantization using the global qconfig and the dummy input.
# NOTE(review): prepare_fx is expected to insert the observer/fake-quant modules described
# by qconfig_mapping — confirm against the PyTorch docs for the installed version.
model_prepared = quantize_fx.prepare_fx(model_to_quantize, qconfig_mapping, example_inputs)

In [None]:
# Classification loss with default settings; passed to evaluate_model for every evaluation below.
criterion = nn.CrossEntropyLoss()

In [None]:
def evaluate_model(model, validation_dataloader, criterion, device):
    """Run ``model`` over a dataloader and report loss and top-1 accuracy.

    The model is moved to ``device`` and switched to eval mode as a side
    effect. Gradients are disabled for the whole pass.

    Args:
        model: Module mapping an input batch to per-class scores.
        validation_dataloader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss callable ``criterion(outputs, labels)``. It is assumed
            to return the mean loss over the batch (the default reduction of
            ``nn.CrossEntropyLoss``), so each batch is weighted by its size.
        device: Device the model and every batch are moved to.

    Returns:
        ``(avg_loss, accuracy)``: the per-sample mean loss and the accuracy in
        percent.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    model.to(device)
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in validation_dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            batch_size = labels.size(0)

            # Weight the batch-mean loss by the batch size so a smaller final
            # batch does not skew the overall average.
            loss_sum += criterion(outputs, labels).item() * batch_size

            # Predicted class is the index of the highest score.
            _, predicted = torch.max(outputs, 1)
            total += batch_size
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("validation_dataloader yielded no samples")

    avg_loss = loss_sum / total
    accuracy = 100 * correct / total

    return avg_loss, accuracy

In [None]:
# Batches of 100 in file order. drop_last=False keeps the final partial batch, so every
# sample counts toward the reported metrics instead of being silently discarded.
val_loader = DataLoader(val_dataset, batch_size=100, shuffle=False, num_workers=8, drop_last=False)



In [None]:
# Run the prepared model over val_loader on the GPU.
# NOTE(review): presumably this forward pass also feeds the observers inserted by
# prepare_fx, i.e. it doubles as calibration — confirm.
loss, acc = evaluate_model(model_prepared, val_loader, criterion,'cuda')

In [None]:
# Accuracy (percent) of the prepared model from the previous cell.
acc

69.75

In [None]:
# Move the prepared model back to the CPU ahead of the conversion in the next cell.
model_prepared = model_prepared.to('cpu')

In [None]:
# Convert the prepared (calibrated) model into its quantized counterpart.
model_quantized = quantize_fx.convert_fx(model_prepared)

In [None]:
# Evaluate the converted model on the CPU with the same loader and loss.
loss, acc = evaluate_model(model_quantized, val_loader, criterion,'cpu')

In [None]:
# Accuracy (percent) of the converted model from the previous cell.
acc

66.625

In [None]:
# Display the prepared model's structure for inspection.
model_prepared

# TASK:
1. Split the 1000 samples into two equal groups (50%/50%): name one `calibration_dataset` and the other `validation_dataset`. Use the calibration dataset to calibrate the scale and zero point before performing the actual quantization.
2. Perform PTQ (post-training quantization) for MobileNetV2, using PerChannelMinMax quantization for weights and PerTensorMinMax quantization, separately.
3. Apply per-channel MovingAverageMinMax quantization for weights and per-tensor MovingAverageMinMax quantization for activations. Compare with the results from step 2 and check which one is better. Compare some scales and zero points from the observers. Explain why one of the solutions is better.

In [None]:
import copy
import random

# Fixed seed so the calibration/validation split is identical on every run.
SPLIT_SEED = 0

# Shuffle a copy of the sample list with a dedicated generator; val_dataset itself and the
# global `random` state are left untouched.
shuffled_samples = list(val_dataset.data)
random.Random(SPLIT_SEED).shuffle(shuffled_samples)
half = len(shuffled_samples) // 2

# First half -> validation, second half -> calibration. Each split is an independent copy
# of val_dataset whose sample list is replaced.
validation_dataset = copy.deepcopy(val_dataset)
validation_dataset.data = shuffled_samples[:half]
calibration_dataset = copy.deepcopy(val_dataset)
calibration_dataset.data = shuffled_samples[half:]

# drop_last=False keeps the final partial batch, so all samples of each split are used.
calibration_loader = DataLoader(calibration_dataset, batch_size=100, shuffle=False, num_workers=8, drop_last=False)
validation_loader = DataLoader(validation_dataset, batch_size=100, shuffle=False, num_workers=8, drop_last=False)


In [None]:
# Sanity check: sizes of the validation and calibration splits.
print(len(validation_dataset), len(calibration_dataset))

448 448


In [None]:
# Prepare a fresh model from model_to_quantize, replacing the previously prepared one.
model_prepared = quantize_fx.prepare_fx(model_to_quantize, qconfig_mapping, example_inputs)
# Forward the calibration split through the prepared model on the GPU.
# NOTE(review): presumably this pass is what calibrates the observers — confirm.
loss, acc = evaluate_model(model_prepared, calibration_loader, criterion,'cuda')



In [None]:
# Loss and accuracy (percent) measured on the calibration split.
print(loss, acc)

1.343494862318039 69.75


In [None]:
# Move the calibrated model to the CPU and convert it to its quantized form.
model_prepared = model_prepared.to('cpu')
model_quantized = quantize_fx.convert_fx(model_prepared)
# Evaluate the converted model (not the prepared one) on the held-out validation split.
# It runs on the CPU, like the earlier evaluation of model_quantized in this notebook.
loss, acc = evaluate_model(model_quantized, validation_loader, criterion, 'cpu')

In [None]:
print(loss, acc)
# With a separate calibration split the accuracy appears higher than before.
# NOTE(review): on a few hundred samples a gap of one or two points may be noise — confirm.
# Next, compare against the model without quantization.

1.2071868181228638 71.25


In [None]:
# Baseline: the original float model (model_fp) on the same validation split, on the GPU.
loss, acc = evaluate_model(model_fp, validation_loader, criterion,'cuda')

In [None]:
print(loss, acc)
# Curiously, the quantization run scores slightly higher than the float baseline.
# NOTE(review): with this few validation samples the gap is likely within noise — confirm
# before concluding that quantization improves accuracy.

1.2008384466171265 70.5


# MobileNetV2

In [None]:
  
# Float MobileNetV2 baseline. `weights=` replaces the deprecated `pretrained=True` and
# selects the ImageNet checkpoint that flag loaded for the non-quantized model.
model_fp = torchvision.models.quantization.mobilenet_v2(
    weights=torchvision.models.MobileNet_V2_Weights.IMAGENET1K_V1
)
# Eval-mode copy to be quantized; model_fp stays untouched as the float reference.
# No bare fuse_fx() call: it returns a new module instead of fusing in place, so calling it
# without keeping the result fuses nothing and only dumps the module repr as cell output.
# NOTE(review): prepare_fx is expected to perform fusion itself — confirm against the
# installed PyTorch version.
model_to_quantize = copy.deepcopy(model_fp).eval()


In [None]:
# Activations: per-tensor affine fake-quantization over 0..255 with a min/max observer.
act_fake_quant = FakeQuantize.with_args(
    observer=MinMaxObserver,
    quant_min=0,
    quant_max=255,
    qscheme=torch.per_tensor_affine,
    reduce_range=False,
)

# Weights: per-channel symmetric qint8 over -128..127 along axis 0
# (the alternative scheme would be torch.per_tensor_symmetric).
wgt_fake_quant = FakeQuantize.with_args(
    observer=PerChannelMinMaxObserver,
    quant_min=-128,
    quant_max=127,
    dtype=torch.qint8,
    qscheme=torch.per_channel_symmetric,
    reduce_range=False,
    ch_axis=0,
)

qconfig = QConfig(activation=act_fake_quant, weight=wgt_fake_quant)

# Use this qconfig globally and prepare the MobileNetV2 copy with the dummy input.
qconfig_mapping = QConfigMapping().set_global(qconfig)
model_prepared = quantize_fx.prepare_fx(
    model_to_quantize,
    qconfig_mapping,
    example_inputs,
)

In [None]:
# Forward the calibration split through the prepared MobileNetV2 on the GPU.
# NOTE(review): presumably this pass is what calibrates the observers — confirm.
loss, acc = evaluate_model(model_prepared, calibration_loader, criterion,'cuda')

In [None]:
# Loss and accuracy (percent) from the calibration pass.
print(loss, acc)
# Move the calibrated model to the CPU and convert it to its quantized form.
model_prepared = model_prepared.to('cpu')
model_quantized = quantize_fx.convert_fx(model_prepared)

1.3331595063209534 66.75


In [None]:
# Evaluate the converted model (not the prepared one) on the held-out validation split.
# It runs on the CPU, like the earlier evaluation of model_quantized in this notebook.
loss, acc = evaluate_model(model_quantized, validation_loader, criterion, 'cpu')

In [None]:
# Loss and accuracy (percent) on the validation split.
print(loss, acc)

1.1564173102378845 72.5
