![logo](../../picture/license_header_logo.png)
**Copyright (c) 2020-2021 CertifAI Sdn. Bhd.**

This program is part of OSRFramework. You can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see http://www.gnu.org/licenses/.

Authored by: [Jacklyn Lim](mailto:jacklyn.lim@certifai.ai)

### Import Libraries

In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.quantization import QuantStub, DeQuantStub
from utils import download_model, download_dataset, load_model_state_dict, load_dataset, load_image, compare_performance

### Download Model and Dataset

In [2]:
# Model download settings: remote URL, local directory, and local file name.
# MODEL_STATE_DICT_PATH keeps its trailing slash because a later cell builds the
# full path with plain string concatenation (MODEL_STATE_DICT_PATH + MODEL_FILENAME).
# NOTE(review): the state dict is fetched from a `.pt` URL but stored under the same
# `.zip` file name as the dataset archive (in a different directory). This looks like
# a copy-paste leftover — confirm before renaming, since the load cell reuses it.
MODEL_DOWNLOAD_PATH = 'https://s3.eu-central-1.wasabisys.com/certifai/deployment-training-labs/models/fruit_classifier_state_dict.pt'
MODEL_STATE_DICT_PATH = '../../resources/model/'
MODEL_FILENAME = 'fruits_image_classification.zip'

# Dataset download settings: remote archive URL, local directory, and archive name.
DATA_DOWNLOAD_PATH = "https://s3.eu-central-1.wasabisys.com/certifai/deployment-training-labs/fruits_image_classification-20210604T123547Z-001.zip"
DATA_SAVE_PATH = "../../resources/data/"
DATA_ZIP_FILENAME = "fruits_image_classification.zip"

# Fetch the model weights via the project helper from `utils`
# (the recorded output shows it skips the download when the file already exists).
download_model(MODEL_DOWNLOAD_PATH, MODEL_STATE_DICT_PATH, MODEL_FILENAME)

# Fetch the dataset archive via the project helper from `utils`
# (the recorded output shows it skips the download when the data already exists).
download_dataset(DATA_DOWNLOAD_PATH, DATA_SAVE_PATH, DATA_ZIP_FILENAME)

model already exists, skipping download
data already exists, skipping download


### Load Original Model

In [3]:
class Net(nn.Module):
    """Small CNN classifier: two conv/ReLU/pool stages followed by three linear layers.

    The network emits 3 raw scores (logits) per input image. Attribute names and
    their registration order are part of the contract: they define the state_dict
    keys and are the names used when selecting layers for fusion.
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: 3 -> 6 -> 16 channels with 5x5 kernels and no padding.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Each activation site owns its own ReLU module so that every
        # (layer, relu) pair can be fused independently.
        self.relu1 = nn.ReLU()
        self.relu2 = nn.ReLU()
        self.relu3 = nn.ReLU()
        self.relu4 = nn.ReLU()
        # The flattened feature size depends on the input resolution:
        # 18496 = 16 * 34 * 34, which is what 150x150 images produce
        # (150 -> 146 -> 73 -> 69 -> 34 through conv/pool/conv/pool).
        self.fc1 = nn.Linear(in_features=18496, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=3)

    def forward(self, x):
        """Compute class scores for a batch of images shaped (N, 3, H, W)."""
        # Convolutional stages: conv -> ReLU -> 2x2 max-pool, twice.
        features = self.pool(self.relu1(self.conv1(x)))
        features = self.pool(self.relu2(self.conv2(features)))
        # Collapse everything except the batch dimension for the linear head.
        flat = torch.flatten(features, 1)
        # Classifier head; the final layer returns raw scores without activation.
        hidden = self.relu3(self.fc1(flat))
        hidden = self.relu4(self.fc2(hidden))
        return self.fc3(hidden)

In [4]:
# Build the architecture, then populate it with the downloaded weights.
# The path is formed by string concatenation, so MODEL_STATE_DICT_PATH must end with "/".
model_fp32 = Net()
model_fp32 = load_model_state_dict(model_fp32, MODEL_STATE_DICT_PATH + MODEL_FILENAME)
# Switch to inference mode before any quantization or evaluation step.
model_fp32.eval()

# Print the original model; the escape codes render the heading in bold.
print("\033[1mFP32 Model: \033[0m")
print(model_fp32)
print("\n")

[1mFP32 Model: [0m
Net(
  (conv1): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (relu1): ReLU()
  (relu2): ReLU()
  (relu3): ReLU()
  (relu4): ReLU()
  (fc1): Linear(in_features=18496, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=3, bias=True)
)




### Dynamic Quantization

In [5]:
def dynamic_quantization(model_fp32):
    """
    Apply post-training dynamic quantization to the linear layers of a model.

    Weights of every ``nn.Linear`` are converted to int8 ahead of time, while
    activations are quantized on the fly at inference. Other layer types
    (convolutions, pooling, ReLU) are left in floating point.

    Args:
        model_fp32 (nn.Module): Trained floating-point model.

    Returns:
        A new, dynamically quantized model; ``model_fp32`` itself is not modified.
    """
    # inplace defaults to False, so the FP32 model stays usable as the baseline
    # in later comparisons.
    return torch.quantization.quantize_dynamic(
        model_fp32,
        {nn.Linear},
        dtype=torch.qint8,
    )

In [6]:
# Quantize the model dynamically.
# Note: the name `model_int8` is rebound later by the static-quantization section.
model_int8 = dynamic_quantization(model_fp32)

# Print the quantized model; the escape codes render the heading in bold.
print("\033[1mINT8 Model: \033[0m")
print(model_int8)
print("\n")

[1mINT8 Model: [0m
Net(
  (conv1): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (relu1): ReLU()
  (relu2): ReLU()
  (relu3): ReLU()
  (relu4): ReLU()
  (fc1): DynamicQuantizedLinear(in_features=18496, out_features=120, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
  (fc2): DynamicQuantizedLinear(in_features=120, out_features=84, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
  (fc3): DynamicQuantizedLinear(in_features=84, out_features=3, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
)




#### Perform Inference and Compare Performance

In [7]:
# Single image used for the latency comparison, and the test split used for accuracy.
INFERENCE_IMAGE_PATH = "../../resources/data/fruits_image_classification/test/apple/image1.jpg"
TEST_DATASET_ROOTDIR = "../../resources/data/fruits_image_classification/test"

# Load the image via the project helper from `utils`.
inference_image = load_image(INFERENCE_IMAGE_PATH)

# Load the test dataset via the project helper from `utils`.
test_dataloader = load_dataset(TEST_DATASET_ROOTDIR)

# Compare the original model with the dynamically quantized model
# (the recorded output reports size, latency, and accuracy).
compare_performance(model_fp32, model_int8, "model_fp32", "model_int8", inference_image, test_dataloader)

Comparing size of models
model:  model_fp32  	 Size (KB): 8935.167
model:  model_int8  	 Size (KB): 2247.663
3.98 times smaller

Comparing latency of models
model:  model_fp32  	 prediction time: 0.004000425338745117s
model:  model_int8  	 prediction time: 0.00299835205078125s

Comparing accuracy of models
model:  model_fp32  	 Test Accuracy: 0.74
model:  model_int8  	 Test Accuracy: 0.74


### Static Quantization

#### Fuse Layers

In [8]:
def fuse_layers(model, fusion_layers_list):
    """
    A function to fuse specified layers in the model.

    Fusing combines each listed group (e.g. conv + relu) into a single module.
    The first module of a group is replaced by the fused module and the
    remaining ones become ``nn.Identity``.

    Args:
        model (Net): model to be quantized; expected to be in eval mode
        fusion_layers_list (list): a list of layer-name groups to be fused,
            e.g. ``[['conv1', 'relu1'], ['fc1', 'relu3']]``

    Returns:
        A fused copy of the model. The input model is left untouched so it can
        still serve as the reference in the equivalence check.
    """
    # inplace=False makes fuse_modules work on a deep copy of `model`.
    return torch.quantization.fuse_modules(model, fusion_layers_list, inplace=False)

In [9]:
# Fuse each conv/linear layer with the ReLU that directly follows it in Net.forward.
# fc3 has no trailing ReLU, so it is not part of any group.
model_fp32_fused = fuse_layers(model_fp32, [['conv1', 'relu1'], ['conv2', 'relu2'], ['fc1', 'relu3'], ['fc2', 'relu4']])

##### Check if Fused Model Outputs the Same as the Original Model

In [10]:
def check_model_equivalence(model_fp32, model_fp32_fused, device, rtol=1e-05, atol=1e-08, num_tests=100, input_size=(1,3,32,32)):
    """
    Check if the fused model has approximately the same output as the original model.

    Both models are fed the same uniformly random inputs and their outputs are
    compared with ``np.allclose``. The check stops at the first mismatch.

    Args:
        model_fp32 (Net): Original model
        model_fp32_fused (Net): Fused model
        device (String): Inference device (CPU)
        rtol (float): The relative tolerance parameter (see numpy documentation for np.allclose)
        atol (float): The absolute tolerance parameter (see numpy documentation for np.allclose)
        num_tests (int): Number of random inputs used to test the equivalence of both models
        input_size (tuple): Shape of each random input batch, e.g. (N, C, H, W)

    Returns:
        True if the outputs matched within tolerance for every random input,
        otherwise False (the mismatching outputs are printed).
    """
    model_fp32.to(device)
    model_fp32_fused.to(device)

    # Pure inference: skip autograd bookkeeping for the forward passes.
    with torch.no_grad():
        for _ in range(num_tests):
            x = torch.rand(size=input_size).to(device)
            y_model_fp32 = model_fp32(x).detach().cpu().numpy()
            y_model_fp32_fused = model_fp32_fused(x).detach().cpu().numpy()

            # NaNs are never treated as equal, so a NaN output fails the check.
            if not np.allclose(a=y_model_fp32, b=y_model_fp32_fused, rtol=rtol, atol=atol, equal_nan=False):
                print("Model equivalence test sample failed: ")
                print(y_model_fp32)
                print(y_model_fp32_fused)
                return False

    return True

In [11]:
# Check that the fused model outputs the same as the original model.
# The input size matches what Net's first linear layer expects (150x150 RGB images),
# and the tolerances are looser than the function defaults.
assert check_model_equivalence(model_fp32=model_fp32,
                               model_fp32_fused=model_fp32_fused,
                               device="cpu", rtol=1e-03, atol=1e-06, num_tests=100,
                               input_size=(1, 3, 150, 150)), "Fused model is not equivalent to the original model!"

#### Define Quantized Model Architecture


In [12]:
class QuantizedNet(nn.Module):
    """
    Wrapper that marks where tensors enter and leave the quantized domain.

    ``QuantStub`` and ``DeQuantStub`` are pass-through modules until the model
    is converted. After conversion they quantize the float input and
    dequantize the output, so callers keep working with float tensors.
    """

    def __init__(self, model_fp32):
        """
        Args:
            model_fp32 (nn.Module): Floating-point (ideally already fused) model
                to wrap. It is stored by reference, not copied.
        """
        super().__init__()
        # Float -> quantized boundary.
        self.quant = QuantStub()
        # The wrapped network that will run on quantized tensors.
        self.model_fp32 = model_fp32
        # Quantized -> float boundary.
        self.dequant = DeQuantStub()

    def forward(self, x):
        """Quantize the input, run the wrapped model, and dequantize the result."""
        x = self.quant(x)
        x = self.model_fp32(x)
        x = self.dequant(x)
        return x

Create a quantized model after fusing layers

Note: this step usually has to come after layer fusion when the model contains a BatchNorm layer, because there is no quantized implementation of a standalone batch normalization layer.

In [13]:
# Wrap the fused FP32 model in QuantizedNet, which is the object that gets
# configured, calibrated, and converted in the following steps.
quantized_model = QuantizedNet(model_fp32=model_fp32_fused)

#### Set Backend to Run Quantized Operators

In [14]:
def set_operatoring_backend(model):
    """
    A function to set backend to run the quantized operators

    ``fbgemm`` (x86 CPUs) is preferred; ``qnnpack`` (ARM/mobile CPUs) is used
    when ``fbgemm`` is not available in the installed PyTorch build.

    Args:
        model (Net): model to be quantized

    Returns:
        Model with set qconfig (engine used for quantized computations)

    Raises:
        RuntimeError: if neither backend is supported by this PyTorch build.
    """
    supported_engines = torch.backends.quantized.supported_engines
    for backend in ("fbgemm", "qnnpack"):
        if backend in supported_engines:
            break
    else:
        raise RuntimeError(
            "No supported quantization backend found; available engines: "
            + str(supported_engines)
        )

    # The global engine and the model's qconfig must agree, otherwise the
    # converted operators may run with observers tuned for a different backend.
    torch.backends.quantized.engine = backend
    model.qconfig = torch.quantization.get_default_qconfig(backend)
    return model

# Set the backend; the name is rebound to the function's return value.
quantized_model = set_operatoring_backend(quantized_model)

#### Calibration With a Representative Dataset

In [15]:
def calibration(model, dataloader, device="cpu"):
    """
    Run a representative dataset through a prepared model to calibrate it.

    The forward passes let the observers inserted by
    ``torch.quantization.prepare`` record activation statistics. No weights
    are updated.

    Args:
        model (nn.Module): Model prepared for static quantization
        dataloader (iterable): Yields either input tensors or
            ``(inputs, labels, ...)`` batches; only the inputs are used
        device (str): Device to run the forward passes on

    Returns:
        The same model instance, calibrated and in eval mode
    """
    model.to(device)
    # Eval mode keeps layers such as dropout/batch-norm in inference behaviour.
    model.eval()

    with torch.no_grad():
        for batch in dataloader:
            # Labels are irrelevant for calibration; keep only the inputs.
            inputs = batch[0] if isinstance(batch, (list, tuple)) else batch
            model(inputs.to(device))

    return model

In [16]:
# The training split serves as the representative dataset for calibration.
CALIBRATION_DATASET_ROOTDIR = "../../resources/data/fruits_image_classification/train"

# Load the calibration dataset via the project helper from `utils`.
calibration_dataloader = load_dataset(CALIBRATION_DATASET_ROOTDIR)

# Prepare the model for the calibration step. With inplace=True the model is
# modified directly (no copy is made), so the returned object is `quantized_model` itself.
quantized_model = torch.quantization.prepare(quantized_model, inplace=True)

# Run the calibration data through the prepared model.
calibrated_model = calibration(quantized_model, calibration_dataloader)

#### Convert Calibrated Model to a Quantized Model

In [17]:
def convert_to_quantized_model(calibrated_model):
    """
    Convert a calibrated model into a quantized int8 model.

    Observed modules are swapped for their quantized counterparts, using the
    statistics collected during calibration to fix scales and zero points.

    Args:
        calibrated_model (nn.Module): Model that has been prepared with
            ``torch.quantization.prepare`` and run on calibration data

    Returns:
        A new quantized int8 model; the calibrated model is not converted in place.
    """
    # Conversion is meant for inference-mode models.
    calibrated_model.eval()
    # inplace=False converts a deep copy, keeping the calibrated model intact.
    return torch.quantization.convert(calibrated_model, inplace=False)

In [18]:
# Convert to a quantized model.
# Note: this rebinds `model_int8`, replacing the dynamically quantized model from earlier.
model_int8 = convert_to_quantized_model(calibrated_model)
model_int8.eval()

# Print the quantized model; the escape codes render the heading in bold.
print("\033[1mINT8 Model: \033[0m")
print(model_int8)
print("\n")

[1mINT8 Model: [0m
QuantizedNet(
  (quant): Quantize(scale=tensor([0.0186]), zero_point=tensor([114]), dtype=torch.quint8)
  (model_fp32): Net(
    (conv1): QuantizedConvReLU2d(3, 6, kernel_size=(5, 5), stride=(1, 1), scale=0.05027412995696068, zero_point=0)
    (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2): QuantizedConvReLU2d(6, 16, kernel_size=(5, 5), stride=(1, 1), scale=0.09794529527425766, zero_point=0)
    (relu1): Identity()
    (relu2): Identity()
    (relu3): Identity()
    (relu4): Identity()
    (fc1): QuantizedLinearReLU(in_features=18496, out_features=120, scale=0.5772956609725952, zero_point=0, qscheme=torch.per_tensor_affine)
    (fc2): QuantizedLinearReLU(in_features=120, out_features=84, scale=0.28039851784706116, zero_point=0, qscheme=torch.per_tensor_affine)
    (fc3): QuantizedLinear(in_features=84, out_features=3, scale=0.545403003692627, zero_point=160, qscheme=torch.per_tensor_affine)
  )
  (dequant): DeQuantize

#### Compare Model Performance

In [19]:
# Same image and test split as in the dynamic-quantization comparison;
# the constants are redefined here with identical values.
INFERENCE_IMAGE_PATH = "../../resources/data/fruits_image_classification/test/apple/image1.jpg"
TEST_DATASET_ROOTDIR = "../../resources/data/fruits_image_classification/test"

# COMPARING PERFORMANCE (the escape codes render the heading in bold)
print("\033[1mCOMPARING PERFORMANCE... \033[0m")
# Load the image via the project helper from `utils`.
inference_image = load_image(INFERENCE_IMAGE_PATH)

# Load the test dataset via the project helper from `utils`.
test_dataloader = load_dataset(TEST_DATASET_ROOTDIR)

# Compare the original model with the statically quantized model
# (`model_int8` now refers to the converted model from the previous step).
compare_performance(model_fp32, model_int8, "model_fp32", "model_int8", inference_image, test_dataloader)

[1mCOMPARING PERFORMANCE... [0m
Comparing size of models
model:  model_fp32  	 Size (KB): 8935.167
model:  model_int8  	 Size (KB): 2241.183
3.99 times smaller

Comparing latency of models
model:  model_fp32  	 prediction time: 0.0029985904693603516s
model:  model_int8  	 prediction time: 0.006000041961669922s

Comparing accuracy of models
model:  model_fp32  	 Test Accuracy: 0.74
model:  model_int8  	 Test Accuracy: 0.74


#### Save Torchscript Model

In [20]:
def save_torchscript_model(model, model_dir, model_filename):
    """
    Compile a model to TorchScript and save it to disk.

    Args:
        model (nn.Module): Model to serialize (e.g. the quantized int8 model)
        model_dir (str): Directory to save into; created if it does not exist
        model_filename (str): File name of the saved TorchScript archive

    Returns:
        None
    """
    # Create the target directory up front so a fresh checkout does not fail.
    os.makedirs(model_dir, exist_ok=True)
    model_filepath = os.path.join(model_dir, model_filename)
    # Script the model so the saved file can be loaded with torch.jit.load
    # without needing the Python class definitions.
    torch.jit.save(torch.jit.script(model), model_filepath)

In [21]:
# Output location (relative to this notebook) and file name for the exported model.
MODEL_SAVE_PATH = "../generated_model"
TORCHSCRIPT_MODEL_FILENAME = "eagermode_static_quantized_model.pt"

# Export the statically quantized model.
save_torchscript_model(model_int8, MODEL_SAVE_PATH, TORCHSCRIPT_MODEL_FILENAME)