In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.quantization import fuse_modules
from torch.nn.quantized import FloatFunctional
from torch import Tensor
import torchvision
from torchvision import datasets, transforms

import os
import random
from tqdm import tqdm
import time
import copy
import numpy as np
from resnet import resnet18

# Set up warnings
# First filter: ignore DeprecationWarning from every module (regex '.*').
import warnings
warnings.filterwarnings(
    action='ignore',
    category=DeprecationWarning,
    module=r'.*'
)
# Second filter: use the 'default' action for warnings whose originating
# module name matches torch.ao.quantization. filterwarnings() inserts new
# entries at the front of the filter list, so this rule is consulted before
# the blanket 'ignore' rule above.
warnings.filterwarnings(
    action='default',
    module=r'torch.ao.quantization'
)

In [2]:
def set_random_seeds(random_seed=0):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch,
    and configures cuDNN for deterministic behaviour (deterministic kernels
    on, auto-tuning benchmark off).

    Args:
        random_seed: Integer seed shared by all generators.
    """
    # The three generators are independent, so the seeding order is irrelevant.
    random.seed(random_seed)
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)

    # Trade cuDNN speed for run-to-run reproducibility.
    cudnn_backend = torch.backends.cudnn
    cudnn_backend.benchmark = False
    cudnn_backend.deterministic = True

In [3]:
def prepare_dataloader(num_workers=8, train_batch_size=128, eval_batch_size=256):
    """Create the CIFAR-10 training and evaluation data loaders.

    The training pipeline applies a padded random crop and a random
    horizontal flip before tensor conversion and normalisation; the
    evaluation pipeline only converts and normalises. Both datasets are
    stored under ``data/`` and downloaded when missing.

    Args:
        num_workers: Worker process count passed to both loaders.
        train_batch_size: Batch size of the training loader.
        eval_batch_size: Batch size of the evaluation loader.

    Returns:
        Tuple ``(train_loader, test_loader)``. The training loader draws
        samples through a ``RandomSampler``; the test loader iterates
        through a ``SequentialSampler``.
    """
    # Normalisation shared by both pipelines.
    # Alternative statistics kept from the original notebook:
    # transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
    normalize = transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))

    augmenting_pipeline = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
    plain_pipeline = transforms.Compose([transforms.ToTensor(), normalize])

    cifar_train = torchvision.datasets.CIFAR10(
        root="data", train=True, download=True, transform=augmenting_pipeline)
    # We will use test set for validation and test in this project.
    # Do not use test set for validation in practice!
    cifar_test = torchvision.datasets.CIFAR10(
        root="data", train=False, download=True, transform=plain_pipeline)

    data_utils = torch.utils.data
    train_loader = data_utils.DataLoader(
        dataset=cifar_train,
        batch_size=train_batch_size,
        sampler=data_utils.RandomSampler(cifar_train),
        num_workers=num_workers)
    test_loader = data_utils.DataLoader(
        dataset=cifar_test,
        batch_size=eval_batch_size,
        sampler=data_utils.SequentialSampler(cifar_test),
        num_workers=num_workers)

    return train_loader, test_loader

In [4]:
def evaluate_model(model, test_loader, device, criterion=None):
    """Compute the average loss and the accuracy of ``model`` on ``test_loader``.

    The model is switched to evaluation mode and moved to ``device``. The
    whole pass runs under ``torch.no_grad()`` so that no autograd graph is
    recorded for the forward passes.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        test_loader: Iterable of ``(inputs, labels)`` batches that exposes a
            ``dataset`` attribute supporting ``len()``.
        device: Device on which the evaluation is executed.
        criterion: Optional loss callable ``criterion(outputs, labels)``.
            When ``None`` the reported loss is 0.

    Returns:
        Tuple ``(eval_loss, eval_accuracy)``. ``eval_loss`` is the
        sample-weighted mean loss as a Python number. ``eval_accuracy`` is
        the fraction of correct predictions; it is a 0-dim tensor whenever
        at least one batch was processed.
    """
    model.eval()
    model.to(device)

    running_loss = 0
    running_corrects = 0

    # Inference only: gradients are never needed here.
    with torch.no_grad():
        for inputs, labels in test_loader:

            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            # Predicted class = index of the largest score along dim 1.
            _, preds = torch.max(outputs, 1)

            if criterion is not None:
                loss = criterion(outputs, labels).item()
            else:
                loss = 0

            # Weight the batch loss by the batch size so that a smaller
            # final batch does not skew the average.
            running_loss += loss * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

    num_samples = len(test_loader.dataset)
    eval_loss = running_loss / num_samples
    eval_accuracy = running_corrects / num_samples

    return eval_loss, eval_accuracy

In [5]:
def train_model(model, train_loader, test_loader, device, learning_rate=1e-1, num_epochs=200):
    """Train ``model`` with SGD and report train/eval metrics after every epoch.

    Uses cross-entropy loss, SGD (momentum 0.9, weight decay 1e-4) and a
    ``MultiStepLR`` schedule that multiplies the learning rate by 0.1 at
    epochs 100 and 150. The model is evaluated once before training starts
    (reported as epoch -1) and once after every epoch.

    Args:
        model: Module to optimise; it is moved to ``device``.
        train_loader: Loader of ``(inputs, labels)`` training batches.
        test_loader: Loader handed to ``evaluate_model`` for evaluation.
        device: Device on which training and evaluation run.
        learning_rate: Initial SGD learning rate.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        The same ``model`` object after training.
    """
    # The training configurations were not carefully selected.
    loss_fn = nn.CrossEntropyLoss()

    model.to(device)

    # It seems that SGD optimizer is better than Adam optimizer for ResNet18 training on CIFAR10.
    sgd = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-4)
    lr_schedule = torch.optim.lr_scheduler.MultiStepLR(sgd, milestones=[100, 150], gamma=0.1, last_epoch=-1)

    # Baseline metrics before any weight update.
    model.eval()
    eval_loss, eval_accuracy = evaluate_model(model=model, test_loader=test_loader, device=device, criterion=loss_fn)
    print("Epoch: {:02d} Eval Loss: {:.3f} Eval Acc: {:.3f}".format(-1, eval_loss, eval_accuracy))

    for epoch in range(num_epochs):

        model.train()
        loss_total, correct_total = 0, 0

        for batch_inputs, batch_labels in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            # Reset gradients, then forward + backward + parameter update.
            sgd.zero_grad()
            logits = model(batch_inputs)
            batch_loss = loss_fn(logits, batch_labels)
            batch_loss.backward()
            sgd.step()

            # Accumulate sample-weighted loss and the number of correct predictions.
            predictions = torch.max(logits, 1)[1]
            loss_total += batch_loss.item() * batch_inputs.size(0)
            correct_total += torch.sum(predictions == batch_labels.data)

        sample_count = len(train_loader.dataset)
        train_loss = loss_total / sample_count
        train_accuracy = correct_total / sample_count

        # Per-epoch evaluation.
        model.eval()
        eval_loss, eval_accuracy = evaluate_model(model=model, test_loader=test_loader, device=device, criterion=loss_fn)

        # Advance the learning-rate schedule once per epoch.
        lr_schedule.step()

        print("Epoch: {:03d} Train Loss: {:.3f} Train Acc: {:.3f} Eval Loss: {:.3f} Eval Acc: {:.3f}".format(epoch, train_loss, train_accuracy, eval_loss, eval_accuracy))

    return model

In [6]:
def calibrate_model(model, loader, device=torch.device("cpu:0")):
    """Feed every batch of ``loader`` through ``model`` for calibration.

    The model is moved to ``device`` and put in evaluation mode. Only the
    inputs of each ``(inputs, labels)`` pair are used; the outputs are
    discarded. The forward passes run under ``torch.no_grad()`` because no
    gradients are needed.

    Args:
        model: Module to run.
        loader: Iterable yielding ``(inputs, labels)`` pairs.
        device: Device on which the forward passes are executed.
    """
    model.to(device)
    model.eval()

    with torch.no_grad():
        # Labels are ignored, so they are never transferred to the device.
        for inputs, _ in loader:
            _ = model(inputs.to(device))

def measure_inference_latency(model,
                              device,
                              input_size=(1, 3, 32, 32),
                              num_samples=100,
                              num_warmups=10):
    """Measure the average wall-clock time of one forward pass.

    A random input of shape ``input_size`` is created once and reused. After
    ``num_warmups`` untimed passes, ``num_samples`` passes are timed. CUDA
    work is synchronised before and inside the timed loop, but only when
    ``device`` is a CUDA device, so the function also works on machines
    without CUDA.

    Args:
        model: Module to benchmark; it is moved to ``device`` and set to
            evaluation mode.
        device: ``torch.device`` (or device string) to run on.
        input_size: Shape of the random input tensor.
        num_samples: Number of timed forward passes; must be at least 1.
        num_warmups: Number of untimed warm-up passes.

    Returns:
        Average latency of a single forward pass, in seconds.

    Raises:
        ValueError: If ``num_samples`` is smaller than 1.
    """
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1")

    model.to(device)
    model.eval()

    x = torch.rand(size=input_size).to(device)

    # torch.cuda.synchronize() raises when CUDA is unavailable, so it must
    # only be called for CUDA measurements.
    on_cuda = torch.device(device).type == "cuda"

    def _synchronize():
        # Wait for queued CUDA kernels; no-op for CPU measurements.
        if on_cuda:
            torch.cuda.synchronize()

    with torch.no_grad():
        for _ in range(num_warmups):
            _ = model(x)
    _synchronize()

    with torch.no_grad():
        # perf_counter is monotonic and has the best available resolution.
        start_time = time.perf_counter()
        for _ in range(num_samples):
            _ = model(x)
            _synchronize()
        end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    elapsed_time_ave = elapsed_time / num_samples

    return elapsed_time_ave

In [7]:
def save_model(model, model_dir, model_filename):
    """Write the ``state_dict`` of ``model`` to ``model_dir/model_filename``.

    Args:
        model: Module whose parameters and buffers are saved.
        model_dir: Target directory; created (including parents) if missing.
        model_filename: File name inside ``model_dir``.
    """
    # exist_ok=True avoids the check-then-create race of a separate
    # os.path.exists() test.
    os.makedirs(model_dir, exist_ok=True)
    model_filepath = os.path.join(model_dir, model_filename)
    torch.save(model.state_dict(), model_filepath)

def load_model(model, model_filepath, device):
    """Load a saved ``state_dict`` into ``model`` and return the model.

    Args:
        model: Module whose structure matches the saved ``state_dict``.
        model_filepath: Path of the file written by ``torch.save``.
        device: ``map_location`` used when deserialising the tensors.

    Returns:
        The same ``model`` object, now holding the loaded weights.
    """
    saved_state = torch.load(model_filepath, map_location=device)
    model.load_state_dict(saved_state)
    return model

In [8]:
def save_torchscript_model(model, model_dir, model_filename):
    """Script ``model`` with TorchScript and save it to ``model_dir/model_filename``.

    Args:
        model: Module to compile with ``torch.jit.script``.
        model_dir: Target directory; created (including parents) if missing.
        model_filename: File name inside ``model_dir``.
    """
    # exist_ok=True avoids the check-then-create race of a separate
    # os.path.exists() test.
    os.makedirs(model_dir, exist_ok=True)
    model_filepath = os.path.join(model_dir, model_filename)
    torch.jit.save(torch.jit.script(model), model_filepath)
    
def load_torchscript_model(model_filepath, device):
    """Load a TorchScript module from disk.

    Args:
        model_filepath: Path of a file written by ``torch.jit.save``.
        device: ``map_location`` used when deserialising the module.

    Returns:
        The module returned by ``torch.jit.load``.
    """
    return torch.jit.load(model_filepath, map_location=device)

In [9]:
def create_model(num_classes=10):
    """Instantiate ``resnet18`` without pretrained weights.

    Args:
        num_classes: Forwarded to ``resnet18`` as ``num_classes``.

    Returns:
        The model object returned by ``resnet18``.
    """
    return resnet18(num_classes=num_classes, pretrained=False)

In [10]:
class QuantizedResNet18(nn.Module):
    """Wrap a floating-point model between a QuantStub and a DeQuantStub.

    The stubs mark where tensors enter and leave the quantized region: the
    input passes through ``quant``, then the wrapped model, then ``dequant``.
    """

    def __init__(self, model_fp32):
        """Store ``model_fp32`` and create the two conversion stubs.

        Args:
            model_fp32: Floating-point module to wrap.
        """
        super().__init__()
        # Entry point of the quantized region (float -> quantized), inputs only.
        self.quant = torch.ao.quantization.QuantStub()
        # Exit point of the quantized region (quantized -> float), outputs only.
        self.dequant = torch.ao.quantization.DeQuantStub()
        # The wrapped FP32 model.
        self.model_fp32 = model_fp32

    def model_fuse(self):
        """Fuse conv/bn pairs of the wrapped model in place (step 3: layer fusion).

        Fuses the top-level ``conv1``/``bn1`` pair. Then, for every child of
        the wrapped model whose name contains ``"layer"``, fuses
        ``conv1``/``bn1`` and ``conv2``/``bn2`` of each block, plus
        sub-modules ``"0"`` and ``"1"`` of a block child named
        ``downsample`` when one is present.
        """
        fuse_qat = torch.ao.quantization.fuse_modules_qat

        # Stem of the network.
        self.model_fp32 = fuse_qat(self.model_fp32, [["conv1", "bn1"]], inplace=True)

        for stage_name, stage in self.model_fp32.named_children():
            if "layer" not in stage_name:
                continue
            for _, block in stage.named_children():
                fuse_qat(block, [["conv1", "bn1"], ["conv2", "bn2"]], inplace=True)
                for child_name, child in block.named_children():
                    if child_name != "downsample":
                        continue
                    # The downsample path is addressed by position: "0" and "1".
                    fuse_qat(child, [["0", "1"]], inplace=True)

    def forward(self, x):
        """Run ``quant`` -> wrapped model -> ``dequant`` on ``x``."""
        quantized_input = self.quant(x)
        features = self.model_fp32(quantized_input)
        return self.dequant(features)

In [11]:
def model_equivalence(model_1, model_2, device, rtol=1e-05, atol=1e-08, num_tests=100, input_size=(1,3,32,32)):
    """Check that two models produce numerically close outputs on random inputs.

    Both models are moved to ``device``. For each of ``num_tests`` trials a
    uniform random tensor of shape ``input_size`` is fed to both models and
    the outputs are compared with ``np.allclose``. The first mismatch is
    printed and ends the check.

    Args:
        model_1: First model.
        model_2: Second model.
        device: Device on which both models are run.
        rtol: Relative tolerance passed to ``np.allclose``.
        atol: Absolute tolerance passed to ``np.allclose``.
        num_tests: Number of random inputs to try.
        input_size: Shape of each random input.

    Returns:
        ``True`` if every trial matched, ``False`` at the first mismatch.
    """
    model_1.to(device)
    model_2.to(device)

    for _ in range(num_tests):
        sample = torch.rand(size=input_size).to(device)
        # Evaluate model_1 first, then model_2, and compare as NumPy arrays.
        out_1, out_2 = [net(sample).detach().cpu().numpy() for net in (model_1, model_2)]

        if not np.allclose(a=out_1, b=out_2, rtol=rtol, atol=atol, equal_nan=False):
            print("Model equivalence test sample failed: ")
            print(out_1)
            print(out_2)
            return False

    return True

In [12]:
# Global configuration shared by the cells below.
random_seed = 0
num_classes = 10
# Device handles: CUDA for training, CPU for fusion, conversion and INT8 inference.
cuda_device = torch.device("cuda:0")
cpu_device = torch.device("cpu:0")
# Checkpoint locations: FP32 state_dict and the TorchScript-serialised quantized model.
model_dir = "saved_models"
model_filename = "resnet18_cifar10.pt"
quantized_model_filename = "resnet18_quantized_cifar10.pt"
model_filepath = os.path.join(model_dir, model_filename)
quantized_model_filepath = os.path.join(model_dir, quantized_model_filename)
# Seed all RNGs before any data loader or model is created.
set_random_seeds(random_seed=random_seed)

In [13]:
# Build the train and test data loaders (8 workers, batch sizes 128 / 256).
train_loader, test_loader = prepare_dataloader(num_workers=8, train_batch_size=128, eval_batch_size=256)

Files already downloaded and verified
Files already downloaded and verified


In [14]:
# Create an untrained model.
model = create_model(num_classes=num_classes)

print("Training Model...")
# NOTE(review): the train_model call below is commented out, so despite the
# message above no training happens in this cell.
# model = train_model(model=model, train_loader=train_loader, test_loader=test_loader, device=cuda_device, learning_rate=1e-1, num_epochs=1)

# Save model.
# NOTE(review): with training disabled, the checkpoint written here holds the
# untrained weights returned by create_model.
save_model(model=model, model_dir=model_dir, model_filename=model_filename)

Training Model...


In [15]:
# Reload the FP32 checkpoint into a fresh model instance.
model = load_model(model=create_model(num_classes=num_classes), model_filepath=model_filepath, device=cuda_device)

# Step 2: keep the model on the CPU and switch it to training mode (model.train()).
# The model.train() call itself is made in the next cell.
model.to(cpu_device)

# Make a copy of the model for layer fusion
# NOTE(review): QuantizableResNet18 is neither defined nor imported in the
# visible notebook cells; it presumably comes from the project's ResNet2D
# module referenced in the recorded tracebacks - confirm and import it
# explicitly so this cell works on a fresh kernel.
fused_model = QuantizableResNet18(10)
fused_model.load_state_dict(model.state_dict())
fused_model.fuse_model()


In [16]:
# The model has to be switched to training mode before any layer fusion.
# Otherwise the quantization aware training will not work correctly.
# NOTE(review): fused_model.fuse_model() was already called in the previous
# cell, i.e. before these train() calls.
model.train()
fused_model.train()

QuantizableResNet(
  (conv1): ConvBnReLU2d(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (bn1): Identity()
  (relu): Identity()
  (layer1): Sequential(
    (0): QuantizableBasicBlock(
      (conv1): ConvBn2d(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (bn1): Identity()
      (conv2): ConvBn2d(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (bn2): Identity()
      (shortcut): Sequential()
      (float_functional): FloatFunctional(
        (activation_post_process): Identity()
      )
    )
    (1): QuantizableBasicBlock(
      (conv1): C

In [17]:
# Step 4: switch both models to evaluation mode (model.eval()) and check that
# layer fusion was applied correctly; afterwards switch back to training mode.
# Model and fused model should be equivalent.
model.eval()
fused_model.eval()
# NOTE(review): the equivalence assertion is disabled, so the fusion result is
# not actually verified in this cell.
# assert model_equivalence(model_1=model, model_2=fused_model, device=cpu_device, rtol=1e-03, atol=1e-06, num_tests=100, input_size=(1,3,32,32)), "Fused model is not equivalent to the original model!"

QuantizableResNet(
  (conv1): ConvBnReLU2d(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (bn1): Identity()
  (relu): Identity()
  (layer1): Sequential(
    (0): QuantizableBasicBlock(
      (conv1): ConvBn2d(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (bn1): Identity()
      (conv2): ConvBn2d(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (bn2): Identity()
      (shortcut): Sequential()
      (float_functional): FloatFunctional(
        (activation_post_process): Identity()
      )
    )
    (1): QuantizableBasicBlock(
      (conv1): C

In [18]:
# Wrap the fused model with QuantStub / DeQuantStub so that inputs are
# quantized on entry and outputs are dequantized on exit.
quantized_model = QuantizedResNet18(model_fp32=fused_model)
quantized_model.eval()

QuantizedResNet18(
  (quant): QuantStub()
  (dequant): DeQuantStub()
  (model_fp32): QuantizableResNet(
    (conv1): ConvBnReLU2d(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (bn1): Identity()
    (relu): Identity()
    (layer1): Sequential(
      (0): QuantizableBasicBlock(
        (conv1): ConvBn2d(
          (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
        (bn1): Identity()
        (conv2): ConvBn2d(
          (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
        (bn2): Identity()
        (shortcut): Sequential()
        (float_functional)

In [19]:
# Quantization-aware training needs a QAT qconfig: its activation and weight
# entries are FakeQuantize modules, which simulate quantization during
# training. get_default_qconfig() only provides observers (intended for
# post-training static quantization), so prepare_qat would insert no
# fake-quantization at all.
quantized_model.qconfig = torch.ao.quantization.get_default_qat_qconfig("fbgemm")

In [20]:
# https://pytorch.org/docs/stable/_modules/torch/quantization/quantize.html#prepare_qat
# prepare_qat is applied with the model in training mode.
quantized_model.train()
quantized_model = torch.ao.quantization.prepare_qat(quantized_model)

# Step 8: move the model back to CUDA and run the QAT training on the GPU.
print("Training QAT Model...")
# NOTE(review): the train_model call below is commented out, so no QAT
# training is performed in this cell.
# quantized_model = train_model(model=quantized_model, train_loader=train_loader, test_loader=test_loader, device=cuda_device, learning_rate=1e-3, num_epochs=1)

Training QAT Model...




In [21]:
# Step 9: move the model back to the CPU and convert the QAT floating-point
# model into a quantized integer model.
quantized_model.to(cpu_device)

# Conversion is done in evaluation mode.
quantized_model.eval()
quantized_model = torch.ao.quantization.convert(quantized_model)

# Step 11: save the quantized integer model (scripted with TorchScript).
save_torchscript_model(model=quantized_model, model_dir=model_dir, model_filename=quantized_model_filename)



In [22]:
# Reload the TorchScript-serialised INT8 model for CPU inference.
quantized_jit_model = load_torchscript_model(model_filepath=quantized_model_filepath, device=cpu_device)

# FP32 baseline accuracy on the CPU. The final report cell prints
# fp32_eval_accuracy, so this evaluation must run or that cell fails with a
# NameError.
_, fp32_eval_accuracy = evaluate_model(model=model, test_loader=test_loader, device=cpu_device, criterion=None)

In [23]:
# Show the module hierarchy of the reloaded TorchScript model.
print(quantized_jit_model)

RecursiveScriptModule(
  original_name=QuantizedResNet18
  (quant): RecursiveScriptModule(original_name=Quantize)
  (dequant): RecursiveScriptModule(original_name=DeQuantize)
  (model_fp32): RecursiveScriptModule(
    original_name=QuantizableResNet
    (conv1): RecursiveScriptModule(original_name=ConvReLU2d)
    (bn1): RecursiveScriptModule(original_name=Identity)
    (relu): RecursiveScriptModule(original_name=Identity)
    (layer1): RecursiveScriptModule(
      original_name=Sequential
      (0): RecursiveScriptModule(
        original_name=QuantizableBasicBlock
        (conv1): RecursiveScriptModule(original_name=Conv2d)
        (bn1): RecursiveScriptModule(original_name=Identity)
        (conv2): RecursiveScriptModule(original_name=Conv2d)
        (bn2): RecursiveScriptModule(original_name=Identity)
        (shortcut): RecursiveScriptModule(original_name=Sequential)
        (float_functional): RecursiveScriptModule(
          original_name=QFunctional
          (activation_post_pr

In [24]:
# Accuracy of the TorchScript INT8 model on the CPU.
# NOTE(review): in the recorded run this call raised a RuntimeError from
# quantized.batch_norm2d ("Cannot input a tensor of dimension other than 0 as
# a scalar argument"), so int8_eval_accuracy was never assigned.
_, int8_eval_accuracy = evaluate_model(model=quantized_jit_model, test_loader=test_loader, device=cpu_device, criterion=None)

[256, 64, 32, 32] [256, 64, 32, 32]
[256, 64, 32, 32] [256, 64, 32, 32]


RuntimeError: The following operation failed in the TorchScript interpreter.
Traceback of TorchScript, serialized code (most recent call last):
  File "code/__torch__.py", line 14, in forward
    x0 = (quant).forward(x, )
    model_fp32 = self.model_fp32
    x1 = (model_fp32).forward(x0, )
          ~~~~~~~~~~~~~~~~~~~ <--- HERE
    dequant = self.dequant
    return (dequant).forward(x1, )
  File "code/__torch__/ResNet2D.py", line 25, in forward
    out0 = (layer1).forward(out, )
    layer2 = self.layer2
    out1 = (layer2).forward(out0, )
            ~~~~~~~~~~~~~~~ <--- HERE
    layer3 = self.layer3
    out2 = (layer3).forward(out1, )
  File "code/__torch__/torch/nn/modules/container/___torch_mangle_3.py", line 12, in forward
    _0 = getattr(self, "0")
    _1 = getattr(self, "1")
    input0 = (_0).forward(input, )
              ~~~~~~~~~~~ <--- HERE
    return (_1).forward(input0, )
  def __len__(self: __torch__.torch.nn.modules.container.___torch_mangle_3.Sequential) -> int:
  File "code/__torch__/ResNet2D/___torch_mangle_2.py", line 22, in forward
    out0 = (bn2).forward((conv2).forward(out, ), )
    shortcut = self.shortcut
    tmp = (shortcut).forward(x, )
           ~~~~~~~~~~~~~~~~~ <--- HERE
    print(torch.size(out0), torch.size(tmp))
    float_functional = self.float_functional
  File "code/__torch__/torch/nn/modules/container/___torch_mangle_1.py", line 13, in forward
    _1 = getattr(self, "1")
    input0 = (_0).forward(input, )
    return (_1).forward(input0, )
            ~~~~~~~~~~~ <--- HERE
  def __len__(self: __torch__.torch.nn.modules.container.___torch_mangle_1.Sequential) -> int:
    return 2
  File "code/__torch__/torch/ao/nn/quantized/modules/batchnorm.py", line 26, in forward
    scale = self.scale
    zero_point = self.zero_point
    _0 = ops.quantized.batch_norm2d(input, weight, bias, running_mean, running_var, 1.0000000000000001e-05, annotate(float, scale), annotate(int, zero_point))
                                                                                                            ~~~~~~~~~~~~~~~~~~~~~ <--- HERE
    return _0

Traceback of TorchScript, original code (most recent call last):
  File "C:\Users\devLupin\AppData\Local\Temp\ipykernel_3416\3070739321.py", line 29, in forward
        # point to quantized in the quantized model
        x = self.quant(x)
        x = self.model_fp32(x)
            ~~~~~~~~~~~~~~~ <--- HERE
        # manually specify where tensors will be converted from quantized
        # to floating point in the quantized model
  File "c:\Users\devLupin\Desktop\torch-quantization\ResNet2D.py", line 116, in forward
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
              ~~~~~~~~~~~ <--- HERE
        out = self.layer3(out)
        out = self.layer4(out)
  File "c:\Users\devLupin\Miniconda3\envs\qat\lib\site-packages\torch\nn\modules\container.py", line 204, in forward
    def forward(self, input):
        for module in self:
            input = module(input)
                    ~~~~~~ <--- HERE
        return input
  File "c:\Users\devLupin\Desktop\torch-quantization\ResNet2D.py", line 86, in forward
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        tmp = self.shortcut(x)
              ~~~~~~~~~~~~~ <--- HERE
        print(out.shape, tmp.shape)
    
  File "c:\Users\devLupin\Miniconda3\envs\qat\lib\site-packages\torch\nn\modules\container.py", line 204, in forward
    def forward(self, input):
        for module in self:
            input = module(input)
                    ~~~~~~ <--- HERE
        return input
  File "c:\Users\devLupin\Miniconda3\envs\qat\lib\site-packages\torch\ao\nn\quantized\modules\batchnorm.py", line 65, in forward
        # disabling this since this is not symbolically traceable
        # self._check_input_dim(input)
        return torch.ops.quantized.batch_norm2d(
               ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ <--- HERE
            input, self.weight, self.bias, self.running_mean,
            self.running_var, self.eps, self.scale, self.zero_point)
RuntimeError: Cannot input a tensor of dimension other than 0 as a scalar argument


In [None]:
# Skip this assertion since the values might deviate a lot.
# assert model_equivalence(model_1=model, model_2=quantized_jit_model, device=cpu_device, rtol=1e-01, atol=1e-02, num_tests=100, input_size=(1,3,32,32)), "Quantized model deviates from the original model too much!"

# NOTE(review): both accuracies come from earlier evaluate_model cells;
# fp32_eval_accuracy is only assigned by the FP32 evaluate_model call next to
# the quantized_jit_model load, so that call must be enabled and have run.
print("FP32 evaluation accuracy: {:.3f}".format(fp32_eval_accuracy))
print("INT8 evaluation accuracy: {:.3f}".format(int8_eval_accuracy))

# Average single-sample latency (batch size 1, 100 timed passes) per model/device.
# The GPU measurement is taken last because measure_inference_latency moves
# the model it receives to the requested device.
fp32_cpu_inference_latency = measure_inference_latency(model=model, device=cpu_device, input_size=(1,3,32,32), num_samples=100)
int8_cpu_inference_latency = measure_inference_latency(model=quantized_model, device=cpu_device, input_size=(1,3,32,32), num_samples=100)
int8_jit_cpu_inference_latency = measure_inference_latency(model=quantized_jit_model, device=cpu_device, input_size=(1,3,32,32), num_samples=100)
fp32_gpu_inference_latency = measure_inference_latency(model=model, device=cuda_device, input_size=(1,3,32,32), num_samples=100)

# Latencies are returned in seconds; multiply by 1000 to report milliseconds.
print("FP32 CPU Inference Latency: {:.2f} ms / sample".format(fp32_cpu_inference_latency * 1000))
print("FP32 CUDA Inference Latency: {:.2f} ms / sample".format(fp32_gpu_inference_latency * 1000))
print("INT8 CPU Inference Latency: {:.2f} ms / sample".format(int8_cpu_inference_latency * 1000))
print("INT8 JIT CPU Inference Latency: {:.2f} ms / sample".format(int8_jit_cpu_inference_latency * 1000))

## library version issue

- https://discuss.pytorch.org/t/torch-tensor-returns-error-while-creating-a-copy/103510
- https://github.com/pytorch/pytorch/issues/76726
- 해당 버그는 수정되지 않을 예정이라고 함.
  - PyTorch는 현재 새로운 컴파일러 스택으로 이동하고 있으며, TorchScript는 유지 보수(maintenance) 상태임.