In [None]:
# Assignment 2. Quantization for CNN

In [None]:
## Goals
본 실습에서는 CNN(Convolutional Neural Network)을 **양자화**(Quantization)하여 모델의 크기와 실행 시간을 줄이는 방법을 실습합니다.

In [None]:
## Contents
1. **Uniform Quantization**
   - **Linear quantization**을 구현하고 적용합니다.
   - **Linear quantization**을 위한 **Integer-only inference**를 구현하고 적용합니다.
2. **Non-uniform Quantization**
   - **K-means quantization**을 구현하고 적용합니다.
3. **Quantization with PyTorch API**
   - **Post-Training Quantization** (PTQ)
   - **Quantization-Aware Training** (QAT)

In [None]:
import subprocess
import sys

# Install the extra packages into the environment of the *running kernel*.
# Going through `sys.executable -m pip` guarantees that; a bare `pip` is looked
# up on PATH and may belong to a different Python installation, in which case
# the imports in the following cells would still fail.
print('Installing torchprofile...')
subprocess.check_call([sys.executable, "-m", "pip", "install", "torchprofile"])
print('Installing fast-pytorch-kmeans...')
subprocess.check_call([sys.executable, "-m", "pip", "install", "fast-pytorch-kmeans"])
print('All required packages have been successfully installed!')

In [None]:
import copy
import math
import random
from collections import OrderedDict, defaultdict

from matplotlib import pyplot as plt
from matplotlib.colors import ListedColormap
import numpy as np
from tqdm.auto import tqdm

import torch
from torch import nn
from torch.optim import *
from torch.optim.lr_scheduler import *
from torch.utils.data import DataLoader
from torchprofile import profile_macs
from torchvision.datasets import *
from torchvision.transforms import *

In [None]:
# Seed every random number generator this notebook touches (Python's `random`,
# NumPy's global RNG and PyTorch) with the same fixed value for reproducibility.
for _seed_rng in (random.seed, np.random.seed):
    _seed_rng(0)
torch.manual_seed(0)

In [None]:
class VGG(nn.Module):
  """VGG-style CNN with a 10-way linear classifier.

  `ARCH` lists the backbone stages in order: an integer is the output channel
  count of a 3x3 conv -> batch-norm -> ReLU stage, and 'M' is a 2x2 max-pool.
  A final 2x2 average pool is appended to the backbone. Sub-modules are named
  `<kind><index>` (conv0, bn0, relu0, ..., pool0, ..., avgpool0); these names
  determine the state_dict keys.
  """
  ARCH = [64, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M']

  def __init__(self) -> None:
    super().__init__()

    backbone = nn.Sequential()
    next_index = {}  # per-kind counter used to build unique module names

    def register(kind: str, module: nn.Module) -> None:
      idx = next_index.get(kind, 0)
      backbone.add_module(f"{kind}{idx}", module)
      next_index[kind] = idx + 1

    channels = 3  # RGB input
    for spec in self.ARCH:
      if spec == 'M':
        register("pool", nn.MaxPool2d(2))
        continue
      # conv-bn-relu stage; the conv has no bias because batch-norm follows it
      register("conv", nn.Conv2d(channels, spec, 3, padding=1, bias=False))
      register("bn", nn.BatchNorm2d(spec))
      register("relu", nn.ReLU(True))
      channels = spec
    register("avgpool", nn.AvgPool2d(2))

    self.backbone = backbone
    self.classifier = nn.Linear(512, 10)

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Map a batch of images to class logits.

    For 32x32 inputs the four max-pools and the final average pool reduce the
    spatial size to 1x1: [N, 3, 32, 32] => [N, 512, 1, 1] => [N, 512] => [N, 10].
    """
    features = self.backbone(x)

    # Flatten everything but the batch dimension; the classifier expects 512 values.
    flat = features.view(features.shape[0], -1)

    return self.classifier(flat)

In [None]:
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Load CIFAR-10 with per-channel normalization and wrap both splits in loaders.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2023, 0.1994, 0.2010],
    ),
])

# Train split first, then test split; both are downloaded into the same root if missing.
trainset, testset = (
    torchvision.datasets.CIFAR10(
        root="D:\\data", train=is_train, download=True, transform=transform
    )
    for is_train in (True, False)
)

# Only the training loader shuffles.
trainloader = DataLoader(dataset=trainset, batch_size=1024, shuffle=True)
testloader = DataLoader(dataset=testset, batch_size=100, shuffle=False)

# Training routine
def train_model(model, trainloader, epochs=5):
    """Train `model` in place with SGD (lr=0.01, momentum=0.9) and cross-entropy loss.

    Batches are moved to the device of the model's first parameter, or of its
    first buffer if it has no parameters, or to the CPU if it has neither.
    The mean batch loss is printed after every epoch.

    :param model: module to train; it is switched to training mode and left there
    :param trainloader: sized iterable yielding (inputs, labels) batches
    :param epochs: number of passes over `trainloader`
    :return: the same `model` object
    """
    anchor = next(model.parameters(), None)
    if anchor is None:
        anchor = next(model.buffers(), None)
    device = torch.device("cpu") if anchor is None else anchor.device

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in trainloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(trainloader):.4f}")
    return model

# Evaluation routine
def evaluate_model(model, testloader):
    """Return the top-1 accuracy of `model` on `testloader`, in percent.

    Batches are moved to the device of the model's first parameter, or of its
    first buffer if it has no parameters, or to the CPU if it has neither.
    The model is switched to eval mode and gradients are disabled.

    :param model: module to evaluate
    :param testloader: iterable yielding (inputs, labels) batches
    :return: 100 * correct / total as a Python float
    """
    anchor = next(model.parameters(), None)
    if anchor is None:
        anchor = next(model.buffers(), None)
    device = anchor.device if anchor is not None else torch.device("cpu")

    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, labels in testloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # Index of the largest logit along the class dimension.
            predicted = torch.max(model(inputs), 1).indices
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

In [None]:
@torch.inference_mode()
def evaluate(
  model: nn.Module,
  dataloader: DataLoader,
  extra_preprocess = None
) -> float:
  """Return the top-1 accuracy of `model` on `dataloader`, in percent.

  Each batch is moved to the device the model lives on (device of its first
  parameter, else of its first buffer). Only when the model has neither is the
  batch moved to CUDA whenever a GPU is available. Moving to CUDA unconditionally
  would break evaluation of a CPU-resident model on a machine that has a GPU.

  :param model: module to evaluate; it is switched to eval mode
  :param dataloader: iterable yielding (inputs, targets) batches
  :param extra_preprocess: optional iterable of callables applied to `inputs`,
      in order, after the device transfer
  :return: accuracy in percent as a Python float
  """
  model.eval()

  # Decide once where the batches have to go.
  anchor = next(model.parameters(), None)
  if anchor is None:
    anchor = next(model.buffers(), None)
  if anchor is not None:
    device = anchor.device
  else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  num_samples = 0
  num_correct = 0

  for inputs, targets in tqdm(dataloader, desc="eval", leave=False):
    # Move the data to the model's device
    inputs = inputs.to(device)
    targets = targets.to(device)
    if extra_preprocess is not None:
      for preprocess in extra_preprocess:
        inputs = preprocess(inputs)

    # Inference
    outputs = model(inputs)

    # Convert logits to class indices
    outputs = outputs.argmax(dim=1)

    # Update metrics
    num_samples += targets.size(0)
    num_correct += (outputs == targets).sum()

  return (num_correct / num_samples * 100).item()

In [None]:
def get_model_flops(model, inputs):
    """Profile `model` on `inputs` and return the result of `profile_macs`.

    Note that, despite the function's name, the value is passed through
    unchanged from `profile_macs`; no conversion is applied here.
    """
    return profile_macs(model, inputs)

In [None]:
def get_model_size(model: nn.Module, data_width=32):
    """
    Return the size of the model's parameters in bits.

    Only tensors yielded by `model.parameters()` are counted.

    :param model: module whose parameters are counted
    :param data_width: #bits per element
    :return: total number of parameter elements multiplied by `data_width`
    """
    return sum(weight.numel() for weight in model.parameters()) * data_width

# Unit sizes expressed in bits, for converting bit counts: bits / MiB -> mebibytes.
Byte = 8
KiB = Byte << 10   # 1024 bytes
MiB = KiB << 10    # 1024 KiB
GiB = MiB << 10    # 1024 MiB

In [None]:
!curl -L "https://www.dropbox.com/scl/fi/ui1fkdvwlhd55fncto8fa/vgg_cifar10_pretrained.pth?rlkey=gu58eq42mo9riot1mexijw79k&st=gak0oq04&dl=1" -o "D:\\data\\vgg_cifar10_pretrained.pth"
checkpoint = torch.load('D:\\data\\vgg_cifar10_pretrained.pth', map_location="cpu")
model = VGG().cuda()
print(f"=> loading checkpoint 'vgg_cifar10_pretrained.pth'")
model.load_state_dict(checkpoint['state_dict'])
recover_model = lambda : model.load_state_dict(checkpoint['state_dict'])

#TORCH_HUB_REPO = "SKKU-ESLAB/pytorch-models"
#MODEL_NAME = "cifar10_vgg9_bn" # cifar10_resnet20, cifar10_vgg11_bn
#
#model = torch.hub.load(TORCH_HUB_REPO, MODEL_NAME, pretrained=True)
#if torch.cuda.is_available():
#    model = model.cuda()

In [None]:
from torch.utils.data import Subset
import numpy as np

# Build the CIFAR-10 datasets/loaders stored in the `dataset` and `dataloader`
# dicts (keys "train" and "test"). The test split is reduced to a small
# class-balanced subset.
image_size = 32
# Per-split preprocessing: random crop + horizontal flip for training, plain
# tensor conversion for test. No normalization is applied here.
# NOTE(review): this rebinds the name `transforms`, which an earlier cell bound
# to the `torchvision.transforms` module; after this cell runs, module-style
# uses such as `transforms.Compose` fail until that import cell is re-run.
transforms = {
    "train": Compose([
        RandomCrop(image_size, padding=4),
        RandomHorizontalFlip(),
        ToTensor(),
    ]),
    "test": ToTensor(),
}

dataset = {}
for split in ["train", "test"]:
    dataset[split] = CIFAR10(
        root="D:\\data\\cifar10",
        train=(split == "train"),
        download=True,
        transform=transforms[split],
    )


num_classes = 10
targets = np.array(dataset['test'].targets)

# Stratified subsample of the test split: for every class keep
# len(class_indices) // 100 randomly chosen samples (without replacement).
# The draw uses NumPy's global RNG, so the selection depends on the seed and on
# every earlier draw from that RNG.
indices = []
for class_idx in range(num_classes):
    class_indices = np.where(targets == class_idx)[0]
    selected_indices = np.random.choice(class_indices, len(class_indices) // 100, replace=False)
    indices.extend(selected_indices)

# From here on dataset['test'] is the reduced subset, not the full test split.
dataset['test'] = Subset(dataset['test'], indices)

dataloader = {}
for split in ['train', 'test']:
    dataloader[split] = DataLoader(
        dataset[split],
        batch_size=512,
        shuffle=(split == 'train'),  # shuffle only the training data
        num_workers=0,
        pin_memory=True,
    )

In [None]:
def qconfig_printer(qconfig):
    """Print the observer settings of a quantization config.

    `qconfig.weight` and `qconfig.activation` are called to create one observer
    instance each; for both, the observer's class name, dtype, qscheme,
    quant_min and quant_max are printed (weight first, then activation).
    """
    def _summarize(observer):
        # (class name, dtype, qscheme, quant_min, quant_max) of an observer instance
        return (
            observer.__class__.__name__,
            observer.dtype,
            observer.qscheme,
            observer.quant_min,
            observer.quant_max,
        )

    # Instantiate and inspect both observers before printing anything.
    (weight_observer, weight_dtype, weight_qscheme,
     weight_quant_min, weight_quant_max) = _summarize(qconfig.weight())
    (activation_observer, activation_dtype, activation_qscheme,
     activation_quant_min, activation_quant_max) = _summarize(qconfig.activation())

    # Report
    print(f"Weight Observer: {weight_observer}")
    print(f"Weight dtype: {weight_dtype}")
    print(f"Weight qscheme: {weight_qscheme}")
    print(f"Weight quant_min: {weight_quant_min}")
    print(f"Weight quant_max: {weight_quant_max}")
    print("----------------------------------------------")
    print(f"Activation Observer: {activation_observer}")
    print(f"Activation dtype: {activation_dtype}")
    print(f"Activation qscheme: {activation_qscheme}")
    print(f"Activation quant_min: {activation_quant_min}")
    print(f"Activation quant_max: {activation_quant_max}")

In [None]:
# Baseline numbers for the unquantized fp32 model: parameter size (default
# 32 bits per element) and accuracy on the test loader.
fp32_model_size = get_model_size(model)
fp32_model_accuracy = evaluate(model, dataloader['test'])

print(f"fp32 model의 정확도={fp32_model_accuracy:.2f}%")
print(f"fp32 model의 크기={fp32_model_size/MiB:.2f} MiB")

In [None]:
def get_quantized_range(bitwidth):
    """Return (min, max) of a signed two's-complement integer with `bitwidth` bits.

    For example, 8 bits gives (-128, 127).
    """
    half_span = 1 << (bitwidth - 1)
    return -half_span, half_span - 1

In [None]:
# ## [Exercise 1] Implement linear quantization
# The text below is Markdown kept in a string. It is a raw string because it
# contains LaTeX commands such as \mathrm, which are invalid escape sequences
# in a normal string literal. Each Markdown link is kept on one line, since a
# newline between `[text]` and `(url)` breaks the link.
r"""
다음 linear quantization 함수를 완성해 주세요.

**Hint**:
*   $r=S(q-Z)$ 로 부터, $q = r/S + Z$ 를 도출할 수 있습니다.
*   $r$ 과 $S$ 는 모두 부동 소수점이므로, 정수 $Z$ 을 부동 소수점 $r/S$ 에 직접 더할 수 없습니다. 그러므로 $q = \mathrm{int}(\mathrm{round}(r/S)) + Z$ 와 같이 계산해야 합니다.
*   [`torch.FloatTensor`](https://pytorch.org/docs/stable/tensors.html) 에서 [`torch.IntTensor`](https://pytorch.org/docs/stable/tensors.html) 로 변환하려면, 먼저 [`torch.round()`](https://pytorch.org/docs/stable/generated/torch.round.html#torch.round), [`torch.Tensor.round()`](https://pytorch.org/docs/stable/generated/torch.Tensor.round.html#torch.Tensor.round), 또는 [`torch.Tensor.round_()`](https://pytorch.org/docs/stable/generated/torch.Tensor.round_.html#torch.Tensor.round_)을 사용하여 모든 값을 부동 소수점 정수로 반올림 한 후, [`torch.Tensor.to(torch.int8)`](https://pytorch.org/docs/stable/generated/torch.Tensor.to.html#torch.Tensor.to) 를 사용하여 데이터 유형을 [`torch.float`](https://pytorch.org/docs/stable/tensors.html) 에서 [`torch.int8`](https://pytorch.org/docs/stable/tensors.html)로 변환해야 합니다.
"""
