## Quantization of Image Classification Models

In [None]:
import os
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
from addict import Dict
from openvino.tools.pot.api import DataLoader, Metric
from openvino.tools.pot.engines.ie_engine import IEEngine
from openvino.tools.pot.graph import load_model, save_model
from openvino.tools.pot.graph.model_utils import compress_model_weights
from openvino.tools.pot.pipeline.initializer import create_pipeline
from openvino.runtime import Core
from torchvision import transforms
from torchvision.datasets import CIFAR10



In [None]:
# Working directories used by the rest of the notebook
DATA_DIR = 'data'
MODEL_DIR = 'model'

# Create both directories up front; an OS-level failure is printed, not raised
try:
    for directory in (DATA_DIR, MODEL_DIR):
        os.makedirs(directory, exist_ok=True)
except OSError as error:
    print(error)

### Prepare the Model

In [None]:
# Load the pretrained CIFAR10 MobileNetV2 from torch.hub and put it in eval mode
model = torch.hub.load("chenyaofo/pytorch-cifar-models", "cifar10_mobilenetv2_x1_0",  pretrained=True)
model.eval()

# Random example input of shape (1, 3, 32, 32) handed to the ONNX exporter
dummy_input = torch.randn(1, 3, 32, 32)

# Path of the exported ONNX file; the IR .xml/.bin paths reuse its name with a
# different suffix (presumably matching what `mo` writes into MODEL_DIR)
onnx_model_path = Path(MODEL_DIR)/ 'mobilenet_v2.onnx'
ir_model_xml = onnx_model_path.with_suffix('.xml')
ir_model_bin = onnx_model_path.with_suffix('.bin')

# Export the PyTorch model to ONNX (verbose=True prints the exported graph)
torch.onnx.export(model, dummy_input, onnx_model_path, verbose=True)

# Run the OpenVINO Model Optimizer to convert the ONNX file to an FP16 OpenVINO IR
# with a fixed [1,3,32,32] input shape, writing the result into MODEL_DIR
!mo --framework=onnx --data_type=FP16 --input_shape=[1,3,32,32] -m $onnx_model_path  --output_dir $MODEL_DIR

In [None]:
# Define the CIFAR10 test split (train=False) with tensor conversion followed by
# per-channel normalization; the data is downloaded into DATA_DIR if missing
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261)),
    ]
)
dataset = CIFAR10(
    root=DATA_DIR,
    train=False,
    transform=transform,
    download=True,
)

In [None]:
class CifarDataLoader(DataLoader):
    """Data loader that caches every sample of the module-level ``dataset``."""

    def __init__(self, config):
        """
        Store the config and load the whole dataset into memory.
        :param config: config with DATA_DIR path; anything that is not already
                       an addict ``Dict`` is wrapped into one.
        """
        config = config if isinstance(config, Dict) else Dict(config)
        super().__init__(config)
        loaded = self.load_data(dataset)
        self.indexes, self.pictures, self.labels = loaded

    def __len__(self):
        """Return the number of cached samples."""
        return len(self.labels)

    def __getitem__(self, index):
        """
        Fetch a single sample.
        :param index: position of the requested sample.
        :return: ``((index, label), picture)``, the picture converted with ``.numpy()``.
        :raises IndexError: when ``index`` is at or beyond the number of samples.
        """
        if index >= len(self):
            raise IndexError

        annotation = (self.indexes[index], self.labels[index])
        picture = self.pictures[index].numpy()
        return annotation, picture

    def load_data(self, dataset):
        """
        Split the dataset into parallel lists.
        :param dataset: iterable of samples where item 0 is the picture and
                        item 1 is the label.
        :return: tuple ``(indexes, pictures, labels)`` of equally long lists.
        """
        indexes, pictures, labels = [], [], []

        for position, sample in enumerate(dataset):
            indexes.append(position)
            pictures.append(sample[0])
            labels.append(sample[1])

        return indexes, pictures, labels

### Define Accuracy Metric Calculation

In [None]:
# Custom implementation of the classification accuracy metric

class Accuracy(Metric):
    """Top-k classification accuracy accumulated over successive model outputs."""

    # Required methods
    def __init__(self, top_k=1):
        """
        :param top_k: number of highest-scoring classes compared with the target
        """
        super().__init__()
        self._top_k = top_k
        self._name = f'accuracy@top{self._top_k}'
        # One list of per-sample 0.0/1.0 matches for every update() call
        self._matches = []

    @property
    def value(self):
        """Return the accuracy metric value for the last model output."""
        return {self._name: self._matches[-1]}

    @property
    def avg_value(self):
        """Return the accuracy metric value for all model outputs."""
        # Flatten explicitly instead of np.ravel(self._matches): batches may
        # differ in size (e.g. a smaller final batch), and NumPy cannot turn
        # ragged nested lists into a regular array.
        flat_matches = [hit for batch in self._matches for hit in batch]
        return {self._name: np.mean(flat_matches)}

    def update(self, output, target):
        """
        Update prediction matches
        :param output: model output (a sequence holding exactly one array of scores)
        :param target: annotations (a sequence of labels or a dict whose values are labels)
        :raises Exception: if the model produced more than one output
        """

        if len(output) > 1:
            raise Exception('The accuracy metric cannot be calculated for a model with multiple outputs')

        if isinstance(target, dict):
            target = list(target.values())
        # Indices of the top_k highest scores for every sample in the batch
        predictions = np.argsort(output[0], axis=1)[:, -self._top_k:]
        match = [float(t in predictions[i]) for i, t in enumerate(target)]

        self._matches.append(match)

    def reset(self):
        """Reset collected matches."""
        self._matches = []

    def get_attributes(self):
        """
        Returns a dictionary of metric attributes {metric_name:{attribute_name:value}}
        Required attributes: 'direction': 'higher-better' or 'higher-worse' 'type': metric type
        """

        return {self._name: {"direction": "higher-better", "type": "accuracy"}}



### Run Quantization Pipeline and compare the accuracy of the original and quantized models

In [None]:
# Model description: the FP16 IR files produced by the Model Optimizer
model_config = Dict({
    'model_name': 'mobilenet_v2',
    'model': ir_model_xml,
    'weights': ir_model_bin
})

# Engine settings. The request-count key for statistics collection is
# 'stat_requests_number' (it was misspelled as 'start_requests_number').
engine_config = Dict({
    'device': 'CPU',
    'stat_requests_number': 2,
    'eval_requests_number': 2
})

dataset_config = {
    'data_source': DATA_DIR
}

# Quantization algorithm settings. The subset-size key is 'stat_subset_size'
# (it was misspelled as 'start_subset_size'), so the intended 300-sample
# statistics subset presumably never took effect.
algorithms = [
    {
        'name': 'DefaultQuantization',
        'params': {
            'target_device': 'CPU',
            'preset': 'performance',
            'stat_subset_size': 300
        }
    }
]

# Step 1: load the model (note: this rebinds `model`, previously the PyTorch module)
model = load_model(model_config)

# Step 2: initialize the data loader
data_loader = CifarDataLoader(dataset_config)

# Step 3: initialize the metric
metric = Accuracy(top_k=1)

# Step 4: initialize the engine used for statistics collection and evaluation
engine = IEEngine(engine_config, data_loader, metric)

# Step 5: create the pipeline of compression algorithms
pipeline = create_pipeline(algorithms, engine)

# Step 6: run the pipeline to obtain the quantized model
compressed_model = pipeline.run(model)

# Step 7: compress the weights of the quantized model before saving it
compress_model_weights(compressed_model)

# Step 8: save the quantized model into MODEL_DIR
compressed_model_paths = save_model(model=compressed_model, save_path=MODEL_DIR, model_name="quantized_mobilenet_v2")

# Show the paths returned by save_model
print(compressed_model_paths)

# The .bin path is derived from the saved .xml path by swapping the suffix
compressed_model_xml = compressed_model_paths[0]["model"]
compressed_model_bin = Path(compressed_model_paths[0]["model"]).with_suffix(".bin")

# Show the path of the quantized model's .xml file
print(compressed_model_xml)

# Step 9: evaluate the original model
metric_results = pipeline.evaluate(model)

# Show the raw metric dictionary
print(metric_results)

if metric_results:
    for name, value in metric_results.items():
        print(f"Accuracy of the original model: {name}: {value}")

# Step 10: evaluate the quantized model with the same metric
metric_results = pipeline.evaluate(compressed_model)
if metric_results:
    for name, value in metric_results.items():
        print(f"Accuracy of the optimized model: {name}: {value}")

### Compare Performance of the Original and Quantized Models

In [None]:
# Benchmark the original FP16 model (IR) on CPU using the asynchronous API
!benchmark_app -m $ir_model_xml -d CPU -api async

In [None]:
# Benchmark the quantized INT8 model (IR) on CPU using the asynchronous API
!benchmark_app -m $compressed_model_xml -d CPU -api async

### Compare results on four pictures

In [None]:
ie = Core()

# Original model: read the IR files and compile them for CPU
float_model = ie.read_model(model=ir_model_xml, weights=ir_model_bin)
float_compiled_model = ie.compile_model(
    model=float_model,
    device_name="CPU",
)

# Quantized model: read the saved IR files and compile them for CPU
quantized_model = ie.read_model(model=compressed_model_xml, weights=compressed_model_bin)
quantized_compiled_model = ie.compile_model(
    model=quantized_model,
    device_name="CPU",
)

In [None]:
# CIFAR10 class names, looked up later by a sample's integer label
label_names = [
    "airplane", "automobile", "bird", "cat", "deer",
    "dog", "frog", "horse", "ship", "truck",
]
all_pictures = []
all_labels = []

# Walk the data loader once, keeping each picture and its label;
# every item has the form ((index, label), picture)
for batch in data_loader:
    annotation, picture = batch[0], batch[1]
    all_pictures.append(picture)
    all_labels.append(annotation[1])

In [None]:
from operator import index


def plot_pictures(indexes: list, all_pictures=all_pictures, all_labels=all_labels):
    """
    Plot 4 pictures side by side, each titled with the name of its label
    :param indexes: a list of exactly 4 indexes of images to be displayed
    :param all_pictures: pictures to choose from; each one is squeezed and its
                         first axis is moved to the end before being shown
    :param all_labels: labels of the pictures, used as indexes into label_names
    """

    num_pics = len(indexes)
    assert num_pics == 4, f'Expected exactly 4 indexes of pictures to be displayed, got {num_pics}'

    # Bound the indexes by the pictures actually passed in, not a fixed dataset size
    total = len(all_pictures)
    images, labels = [], []
    for idx in indexes:
        assert idx < total, f'Cannot get such index, there are only {total}'
        # Move the leading axis (presumably channels) last, as imshow expects
        images.append(np.rollaxis(all_pictures[idx].squeeze(), 0, 3))
        labels.append(label_names[all_labels[idx]])

    # One subplot per picture instead of four copy-pasted stanzas
    _, axarr = plt.subplots(1, num_pics)
    for axis, image, label in zip(axarr, images, labels):
        axis.imshow(image)
        axis.set_title(label)


In [None]:
def infer_on_pictures(model, indexes: list, all_pictures=all_pictures):
    """
    Run inference with the model on a few pictures
    :param model: callable model on which to do inference; must expose output(0)
    :param indexes: list of indexes of the pictures to classify
    :param all_pictures: pictures to choose from
    :return: list with the predicted label name for every index
    """

    output_key = model.output(0)
    # Bound the indexes by the pictures actually passed in, not a fixed dataset size
    total = len(all_pictures)
    predicted_labels = []
    for idx in indexes:
        assert idx < total, f'Cannot get such index, there are only {total}'
        # Add a leading batch axis before handing the picture to the model
        result = model([all_pictures[idx][None,]])[output_key]
        # The highest-scoring class of the first (only) batch item is the prediction
        predicted_labels.append(label_names[np.argmax(result[0])])
    return predicted_labels

In [None]:
# Exactly 4 indexes must be given for plotting
indexes_to_infer = [7, 12, 15, 20]

plot_pictures(indexes=indexes_to_infer)

# Classify the same pictures with the original and the quantized model
results_float = infer_on_pictures(
    model=float_compiled_model,
    indexes=indexes_to_infer,
)
results_quantized = infer_on_pictures(
    model=quantized_compiled_model,
    indexes=indexes_to_infer,
)

print(f"Labels for picture from float model: {results_float}")
print(f"Labels for picture from quantized model: {results_quantized}")