# Neural network quantization

Today we will deal with neural network quantization!

Remember that our goal is to reduce network size while keeping the accuracy high!

For this purpose we will use Intel's OpenVINO and Neural Network Compression Framework (NNCF). Be aware that there are other frameworks to choose from: built-in PyTorch quantization, Brevitas from Xilinx, TensorRT and others.

Use this link for OpenVINO reference and documentation: https://docs.openvino.ai/2023.0/home.html.

First, install and import necessary libraries.

In [None]:
# !pip3 install openvino
# !pip3 install nncf

In [None]:
import torch
import nncf
import openvino as ov
import time
import numpy as np
import tqdm

from nncf import NNCFConfig
from nncf.torch import create_compressed_model, register_default_init_args
from openvino.runtime.ie_api import CompiledModel
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, RandomRotation
from typing import Union, List, Tuple, Any
from abc import ABC, abstractmethod


Let's start with...

## Quantizing Models Post-training

Post-training model optimization is the process of applying special methods that transform the model into a more hardware-friendly representation without retraining or fine-tuning. The most popular and widespread method here is 8-bit post-training quantization because:

- It is easy-to-use.
- It does not hurt accuracy a lot.
- It provides significant performance improvement.
- It suits many hardware devices available in stock since most of them support 8-bit computation natively.

8-bit integer quantization lowers the precision of weights and activations to 8 bits, which leads to significant reduction in the model footprint and significant improvements in inference speed.
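
To make "lowering the precision to 8 bits" concrete, here is a minimal pure-Python sketch of affine (scale plus zero-point) quantization. The function names `quantize_int8` and `dequantize_int8` are made up for illustration, and the scheme is a generic textbook one rather than the exact procedure NNCF applies.

```python
from typing import List, Tuple


def quantize_int8(values: List[float]) -> Tuple[List[int], float, int]:
    """Map floats to signed 8-bit integers with an affine (scale + zero-point) scheme."""
    qmin, qmax = -128, 127
    # The representable range must contain 0.0 so that zero maps to an exact integer.
    lo, hi = min(min(values), 0.0), max(max(values), 0.0)
    scale = (hi - lo) / (qmax - qmin) or 1.0  # guard against an all-zero input
    zero_point = round(qmin - lo / scale)
    q = [max(qmin, min(qmax, round(v / scale) + zero_point)) for v in values]
    return q, scale, zero_point


def dequantize_int8(q: List[int], scale: float, zero_point: int) -> List[float]:
    """Recover approximate floats from the 8-bit codes."""
    return [scale * (code - zero_point) for code in q]


weights = [-1.0, -0.25, 0.0, 0.5, 1.5]  # illustrative values only
codes, scale, zero_point = quantize_int8(weights)
restored = dequantize_int8(codes, scale, zero_point)
max_error = max(abs(w - r) for w, r in zip(weights, restored))
print(codes, scale, zero_point, max_error)
```

Each value is stored in one byte instead of four, and the rounding error stays within about half a quantization step, which is why accuracy usually suffers only slightly.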

Source: https://docs.openvino.ai/2023.0/ptq_introduction.html.

So, first, we need a model to quantize.
Reuse the CNN model from Laboratory 1 (along with training loops, metrics, optimizers and loss function). You can also use the CNN defined below.

Train it for 5 epochs on the MNIST dataset. You should get around 90% accuracy.

Name the final trained model `CNN_MNIST`.

In [None]:
device = torch.device("cuda:0" if (torch.cuda.is_available()) else "cpu")

In [None]:
from typing import Tuple
import tqdm
import torch

def train_or_test(model, data_generator, criterion, metric, mode='test', optimizer=None, update_period=None, device=device) -> Tuple[torch.nn.Module, float, float]:
    # Change model mode to train or test
    if mode == 'train':
        model.train()
    elif mode == 'test':
        model.eval()
    else:
        raise RuntimeError("Unsupported mode.")

    # Move model to the specified device
    model.to(device)

    total_loss = 0.0
    total_accuracy = 0.0
    samples_num = 0

    for i, (X, y) in tqdm.tqdm(enumerate(data_generator)):
        # Convert tensors to the specified device
        X, y = X.to(device), y.to(device)

        # Forward pass and loss; gradients are tracked only in training mode
        with torch.set_grad_enabled(mode == 'train'):
            y_pred = model(X)
            loss = criterion(y_pred, y)

        # Reset gradients and perform backpropagation if in training mode
        if mode == 'train':
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()

        # Calculate accuracy
        accuracy = metric(y_pred, y)

        total_loss += loss.item() * y_pred.size(0)
        total_accuracy += accuracy.item() * y_pred.size(0)
        samples_num += y_pred.size(0)

    if samples_num == 0:
        return model, 0.0, 0.0

    return model, total_loss / samples_num, total_accuracy / samples_num


In [None]:
import matplotlib.pyplot as plt

def train_and_display(model, train_loader, test_loader, criterion, metric, optimizer, device, num_epochs):
    train_losses = []
    test_losses = []
    train_accuracies = []
    test_accuracies = []

    for epoch in range(num_epochs):
        # Training phase
        model, train_loss, train_accuracy = train_or_test(
            model, train_loader, criterion, metric, mode='train', optimizer=optimizer, device=device)
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # Testing phase
        model, test_loss, test_accuracy = train_or_test(
            model, test_loader, criterion, metric, mode='test', device=device)
        test_losses.append(test_loss)
        test_accuracies.append(test_accuracy)

        print(f"Epoch {epoch + 1}/{num_epochs}:")
        print(f"  Train Loss: {train_loss:.4f}  Train Accuracy: {train_accuracy:.4f}")
        print(f"  Test Loss: {test_loss:.4f}  Test Accuracy: {test_accuracy:.4f}")

    # # Plot training history
    # plt.figure(figsize=(12, 5))
    # plt.subplot(1, 2, 1)
    # plt.plot(train_losses, label='Train Loss')
    # plt.plot(test_losses, label='Test Loss')
    # plt.xlabel('Epoch')
    # plt.ylabel('Loss')
    # plt.legend()
    # plt.title('Loss History')

    # plt.subplot(1, 2, 2)
    # plt.plot(train_accuracies, label='Train Accuracy')
    # plt.plot(test_accuracies, label='Test Accuracy')
    # plt.xlabel('Epoch')
    # plt.ylabel('Accuracy')
    # plt.legend()
    # plt.title('Accuracy History')

    # plt.show()
    return model


In [None]:
from abc import ABC, abstractmethod
from typing import Any


class BaseMetric(ABC):

    @abstractmethod
    def __call__(self, y_pred, y_ref) -> Any:
        raise NotImplementedError()



class AccuracyMetric(BaseMetric):

    def __init__(self) -> None:
        pass

    @torch.no_grad()
    def __call__(self, y_pred: torch.Tensor, y_ref: torch.Tensor) -> torch.Tensor:
        """
        :param y_pred: tensor of shape (batch_size, num_of_classes) type float
        :param y_ref: tensor with shape (batch_size,) and type Long
        :return: scalar tensor with accuracy metric for batch
        """
        # Get the predicted class (index with the highest probability) for each sample
        predicted_classes = y_pred.argmax(dim=1)

        # Compare the predicted classes to the reference labels
        correct_predictions = (predicted_classes == y_ref).sum().item()

        # Calculate the accuracy
        accuracy = correct_predictions / y_ref.size(0)  # Divide by the batch size

        return torch.tensor(accuracy)

metric = AccuracyMetric()

In [None]:
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Define the transformations
train_transform = transforms.Compose([
    transforms.RandomRotation(10),  # Random rotation up to 10 degrees
    transforms.RandomCrop(28, padding=4),  # Random crop with padding
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # Random translation
    transforms.ToTensor()  # Convert to a PyTorch tensor
])

test_transform = transforms.Compose([
    transforms.ToTensor()
])

train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=train_transform, download=True)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, transform=test_transform, download=True)

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch.optim as optim
class CNN(torch.nn.Module):
    def __init__(self, input_shape, num_of_cls) -> None:
        super().__init__()
        ch_in = input_shape[0]
        self.CNN = nn.Sequential(
            # input shape = [1,28,28]
            nn.Conv2d(ch_in, 32, 3, padding=(1, 1)),  # shape [32,28,28]
            nn.BatchNorm2d(32),  # shape [32,28,28]
            nn.ReLU(),  # shape [32,28,28]
            nn.MaxPool2d(2, 2),  # shape [32,14,14]
            nn.Conv2d(32, 64, 3, padding=(1, 1)),  # shape [64,14,14]
            nn.BatchNorm2d(64),  # shape [64,14,14]
            nn.ReLU(),  # shape [64,14,14]
            nn.MaxPool2d(2, 2),  # shape [64,7,7]
            nn.Conv2d(64, 128, 3),  # shape [128,5,5]
            nn.BatchNorm2d(128),  # shape [128,5,5]
            nn.ReLU(),  # shape [128,5,5]
        )

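        # Note: nn.CrossEntropyLoss (used below) expects raw logits and applies log-softmax itself,
        # so keeping this Softmax typically slows convergence; consider removing it.
        self.classification_head = nn.Sequential(
            nn.Flatten(), nn.Linear(128 * 5 * 5, num_of_cls), nn.Softmax(dim=1)
        )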

    def forward(self, x):
        x = self.CNN(x)
        y = self.classification_head(x)
        return y

input_shape = (1, 28, 28)  # Assuming 28x28 images with 1 channel
num_of_cls = 10  # Adjust the output size for your specific classification problem
net = CNN(input_shape, num_of_cls)
net = net.to(device)

num_epochs = 1  # note: the exercise text above asks for 5 epochs
learning_rate = 0.001
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=learning_rate)


model = train_and_display(
    net, train_loader, test_loader, loss_fcn, metric, optimizer, device, num_epochs
)
CNN_MNIST = model  # alias matching the name used in the exercise text

# You can alternatively load .pth file created during Lab1 with torch.load.
# Use following code to upload files to colab:
# from google.colab import files
# files.upload()

Now we will quantize this model to INT8.

NNCF enables post-training quantization (PTQ) by adding the quantization layers into the model graph and then using a subset of the training dataset to initialize the parameters of these additional quantization layers.

By default PTQ uses an unannotated dataset to perform quantization. It uses representative dataset items to estimate the range of activation values in a network and then quantizes the network.
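
The range estimation described above can be sketched as a running minimum and maximum over calibration batches. This is a simplified illustration under the assumption of plain min/max statistics; the helper `estimate_range` and the sample activations are hypothetical, and NNCF's own statistic collection may be more elaborate.

```python
from typing import Iterable, List, Tuple


def estimate_range(batches: Iterable[List[float]]) -> Tuple[float, float]:
    """Track the smallest and largest activation seen across calibration batches."""
    lo, hi = float("inf"), float("-inf")
    for batch in batches:
        lo = min(lo, min(batch))
        hi = max(hi, max(batch))
    return lo, hi


# Hypothetical activations from three unlabeled calibration batches.
calibration_batches = [[0.1, 0.9, 0.4], [0.0, 2.3, 1.1], [0.7, 0.2, 1.8]]
act_min, act_max = estimate_range(calibration_batches)
# With the range known, the step size of an 8-bit grid follows directly.
scale = (act_max - act_min) / 255
print(act_min, act_max, scale)
```

No labels are needed at any point, which is why an unannotated dataset is enough for calibration.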

Create an instance of `nncf.Dataset` class by passing two parameters:
- data_source (PyTorch loader containing training samples)
- transform_fn (to make data suitable for API).

Call this instance `calibration_dataset`.

Then, quantize `CNN_MNIST` model with `nncf.quantize()` function, which takes as input two parameters - the model and `calibration_dataset`. Call it `quantized_model`.

In [None]:
print(model)

In [None]:
def transform_fn(data_item):
    images, _ = data_item
    return images

calibration_dataset = nncf.Dataset(train_loader, transform_fn)
quantized_model = nncf.quantize(model, calibration_dataset)

Finally, we will convert models to OpenVINO Intermediate Representation (IR) format.

OpenVINO IR is the proprietary model format of OpenVINO. It is produced after converting a model with model conversion API. It translates the frequently used deep learning operations to their respective similar representation in OpenVINO and tunes them with the associated weights and biases from the trained model. The resulting IR contains two files:
- `xml` - Describes the model topology.
- `bin` - Contains the weights and binary data.
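
Since the `xml` file is plain XML, the topology can be inspected with the standard library. The sketch below counts operation types; the XML string is a hand-written, heavily simplified stand-in (an assumption about the general `net`/`layers`/`layer` layout, not real converter output), and `count_layer_types` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET
from collections import Counter

# Hand-written, simplified stand-in for an IR topology file (not real converter output).
SAMPLE_IR_XML = """
<net name="toy" version="11">
  <layers>
    <layer id="0" name="input" type="Parameter" version="opset1"/>
    <layer id="1" name="conv1" type="Convolution" version="opset1"/>
    <layer id="2" name="conv2" type="Convolution" version="opset1"/>
    <layer id="3" name="output" type="Result" version="opset1"/>
  </layers>
</net>
"""


def count_layer_types(xml_text: str) -> Counter:
    """Count how often each operation type appears in the topology description."""
    root = ET.fromstring(xml_text.strip())
    return Counter(layer.get("type") for layer in root.iter("layer"))


layer_counts = count_layer_types(SAMPLE_IR_XML)
print(layer_counts.most_common())
```

Running the same function on the text of a saved FP32 file and its INT8 counterpart is one way to see which operations quantization added.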

To do that, we'll need `dummy_input` filled with random values and of size:

`[batch_size, channel_number, image_shape[0], image_shape[1]]`

Create `MNIST_fp32_ir` model with `ov.convert_model` that takes three parameters: the model, the dummy input and input size. Use `CNN_MNIST` model.

Then, create `MNIST_int8_ir` model in the same way using `quantized_model`.

Save both models to files (named `MNIST_fp32_ir.xml` and `MNIST_int8_ir.xml` respectively). Use `ov.save_model()` function.

Finally, compile both models with the `core.compile_model` function and use the `validate` function to calculate both models' accuracy.

In [None]:
device

In [None]:
core = ov.Core()
devices = core.available_devices

In [None]:
devices[0]

In [17]:
import torch
import openvino as ov
from openvino import CompiledModel
from torchvision import datasets, transforms
from typing import Union
import tqdm
import time


core = ov.Core()
dummy_input = torch.randn(1, 1, 28, 28)

MNIST_fp32_ir = ov.convert_model(model, example_input=dummy_input, input=[-1, 1, 28, 28])

MNIST_int8_ir = ov.convert_model(quantized_model, example_input=dummy_input, input=[-1, 1, 28, 28])

ov.save_model(MNIST_fp32_ir, "MNIST_fp32_ir.xml")  # file name requested in the exercise text
ov.save_model(MNIST_int8_ir, "MNIST_int8_ir.xml")  # file name requested in the exercise text

fp32_compiled_model = core.compile_model(MNIST_fp32_ir, "CPU")
int8_compiled_model = core.compile_model(MNIST_int8_ir, "CPU")

def validate(val_loader: torch.utils.data.DataLoader, model: Union[torch.nn.Module, CompiledModel], metric: BaseMetric):
    # Switch to evaluate mode.
    if not isinstance(model, CompiledModel):
        model.eval()
        model.to(device)

    total_accuracy = 0
    samples_num = 0

    with torch.no_grad():
        for i, (images, target) in tqdm.tqdm(enumerate(val_loader)):
            if isinstance(model, CompiledModel):
                # OpenVINO works on host memory: pass a NumPy array and keep the target on the CPU.
                output_layer = model.output(0)
                output = torch.from_numpy(model(images.cpu().numpy())[output_layer])
            else:
                images, target = images.to(device), target.to(device)
                output = model(images)

            # Accumulate the batch accuracy weighted by the batch size.
            accuracy = metric(output, target)
            total_accuracy += accuracy.item() * target.shape[0]
            samples_num += target.shape[0]

    return total_accuracy / samples_num


# Validate both compiled models on the MNIST test set
acc1 = validate(val_loader=test_loader, model=fp32_compiled_model, metric=AccuracyMetric())
print(f'FP 32 model acc={acc1:.4f}')

acc2 = validate(val_loader=test_loader, model=int8_compiled_model, metric=AccuracyMetric())
print(f'INT 8 model acc={acc2:.4f}')


313it [00:03, 94.01it/s] 


FP 32 model acc=0.6372


313it [00:02, 146.47it/s]

INT 8 model acc=0.6374





Is INT8 model accuracy similar to FP32 model accuracy? We should hope so!

But let's verify what we have saved in terms of memory resources and network throughput!

First, check the size of OpenVINO IR binary files. You saved both of them on your drive. Is the INT8 model smaller?

We have 244 kB (FP32) vs 120 kB (INT8).
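
The size check can also be done from Python instead of the file browser. This is a small sketch using only the standard library; `ir_sizes_kb` is a hypothetical helper, and the file names are the ones requested in the exercise text, so adjust them if you saved under different names.

```python
from pathlib import Path
from typing import Dict


def ir_sizes_kb(xml_path: str) -> Dict[str, float]:
    """Return the size in kB of an IR .xml file and its sibling .bin file, if present."""
    xml_file = Path(xml_path)
    sizes = {}
    for path in (xml_file, xml_file.with_suffix(".bin")):
        if path.exists():
            sizes[path.name] = path.stat().st_size / 1000
    return sizes


# Missing files simply produce an empty dictionary.
for name in ("MNIST_fp32_ir.xml", "MNIST_int8_ir.xml"):
    print(name, ir_sizes_kb(name))
```

The `.bin` entry is the one to compare, since that file holds the weights.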

Then, use the following code to benchmark both models. Is INT8 model faster?
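
One hedged way to compare throughput is a plain timing loop. The helper `measure_throughput` below is a hypothetical sketch, not an OpenVINO utility; it times any callable, and a trivial stand-in workload is used so that the cell runs on its own.

```python
import time
from typing import Any, Callable


def measure_throughput(infer: Callable[[Any], Any], sample: Any,
                       seconds: float = 1.0, warmup: int = 5) -> float:
    """Call `infer(sample)` repeatedly for about `seconds` and return calls per second."""
    for _ in range(warmup):  # let caches and lazy initialisation settle first
        infer(sample)
    calls = 0
    start = time.perf_counter()
    while time.perf_counter() - start < seconds:
        infer(sample)
        calls += 1
    elapsed = time.perf_counter() - start
    return calls / elapsed


# Stand-in workload; in the lab, pass a compiled model (for example
# `fp32_compiled_model`) and a NumPy input of shape [1, 1, 28, 28] instead.
fps = measure_throughput(lambda x: sum(x), list(range(1000)), seconds=0.2)
print(f"{fps:.1f} calls/s")
```

Timing the same input on both compiled models gives directly comparable numbers, without data-loading overhead mixed in.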

The tqdm rates from the validation runs above give a rough answer: about 94 it/s for FP32 and about 146 it/s for INT8, so INT8 is faster!

Note that we used a very small network on a very simple task. For bigger models and more complicated networks the performance and size differences can be even more significant!

**EXTENSION EXERCISE**

Read about `Quantizing with Accuracy Control` and try to use it for some pretrained network. Use `nncf.quantize_with_accuracy_control`. You can find pretrained networks with `torchvision.models`.

## Quantization-aware Training (QAT)

Training-time model compression improves model performance by applying optimizations (such as quantization) during the training. The training process minimizes the loss associated with the lower-precision optimizations, so it is able to maintain the model’s accuracy while reducing its latency and memory footprint. Generally, training-time model optimization results in better model performance and accuracy than post-training optimization, but it can require more effort to set up.

Quantization-aware Training is a popular method that allows quantizing a model and applying fine-tuning to restore accuracy degradation caused by quantization. In fact, this is the most accurate quantization method.
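
QAT implementations commonly rely on "fake quantization": values are rounded to the integer grid and immediately converted back to float, so the forward pass sees the rounding error, while the backward pass uses a straight-through estimator. The sketch below illustrates this generic idea in pure Python; the function names and the step size are assumptions for illustration, and NNCF's internals may differ.

```python
def fake_quantize(x: float, scale: float, qmin: int = -128, qmax: int = 127) -> float:
    """Quantize then immediately dequantize, so the value stays float but carries rounding error."""
    code = max(qmin, min(qmax, round(x / scale)))
    return code * scale


def ste_gradient(x: float, scale: float, qmin: int = -128, qmax: int = 127) -> float:
    """Straight-through estimator: treat rounding as identity inside the range, zero outside."""
    return 1.0 if qmin * scale <= x <= qmax * scale else 0.0


scale = 0.1  # illustrative step size, not taken from NNCF
inside = fake_quantize(0.234, scale)   # snapped to the nearest grid point
clipped = fake_quantize(50.0, scale)   # saturates at the top of the int8 range
print(inside, clipped, ste_gradient(0.234, scale), ste_gradient(50.0, scale))
```

Because the loss is computed on the rounded values, fine-tuning can move the weights to positions where rounding hurts less.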

For this part, let's use a slightly harder dataset. For MNIST, the PTQ method was enough, right?

Train your CNN model on the CIFAR10 dataset for 10-20 epochs (google it!). Use the same training loops, metrics, optimizers and loss function.

Name the final trained model `CNN_CIFAR`, convert it to OpenVINO IR and save to an xml file.

We start our QAT process with creating compressed models. Just use the following code (fill in the gaps).

In [None]:
train_dataset = ...
test_dataset = ...
train_loader = ...
test_loader = ...
CNN_CIFAR, history = training( ... )

# SAVE floating point model converted to OpenVINO IR
dummy_input = torch.randn(...) # Create dummy_input
CIFAR_fp32_ir = ov.convert_model(...)
ov.save_model(CIFAR_fp32_ir, ...) #TODO - FILL THE FILENAME

# Compress model
nncf_config_dict = {
    "input_info": {"sample_size": [1, ..., ..., ...]}, #Put number of channels, image_size[0] and image_size[1]
    "compression": {
        "algorithm": "quantization",
    },
}
nncf_config = NNCFConfig.from_dict(nncf_config_dict)
nncf_config = register_default_init_args(nncf_config, train_loader)
compression_ctrl, CNN_CIFAR_int8 = create_compressed_model(CNN_CIFAR, nncf_config)

We have our CIFAR CNN model ready for QAT. So... Just train it!

Use your `training` function to train `CNN_CIFAR_int8` model for one more epoch!

Thanks to the NNCF API, once the compressed model is created all we need to do is continue training the INT8 model :) We call this process fine-tuning. It is applied to further improve the quantized model's accuracy! Normally, several epochs of tuning are required with a small learning rate, the same that is usually used at the end of the training of the original model. No other changes in the training pipeline are required.

In [None]:
CNN_CIFAR_int8_finetuned, history = training( ... ) # just one epoch

Convert fine-tuned model to OpenVINO IR, save it to xml and verify both `CIFAR_fp32_ir` and `CIFAR_int8_ir` sizes.

Is the INT8 network smaller?

In [None]:
core = ov.Core()
devices = core.available_devices
dummy_input = ...
CIFAR_int8_ir = ov.convert_model( ... )
ov.save_model( ... )

In [None]:
!ls -lh ...

Finally - compile models, validate and benchmark them.

Did accuracy decrease?
Is the INT8 model faster?

In [None]:
fp32_cifar_compiled_model = core.compile_model( ... )
int8_cifar_compiled_model = core.compile_model( ... )
acc1 = validate( ... )
print( ... )
acc2 = validate( ... )
print( ... )

In [None]:
def parse_benchmark_output(benchmark_output: List[str]) -> None:
    """Print only the lines of the benchmark_app output that report FPS."""
    parsed_output = [line for line in benchmark_output if 'FPS' in line]
    print(*parsed_output, sep='\n')


print('Benchmark FP32 model on CPU')
benchmark_output = ! benchmark_app -m CIFAR_fp32_ir.xml -d CPU -api async -t 15 -shape "[1, 3, 32, 32]"
parse_benchmark_output(benchmark_output)

print('Benchmark INT8 model on CPU')
benchmark_output = ! benchmark_app -m CIFAR_int8_ir.xml -d CPU -api async -t 15 -shape "[1, 3, 32, 32]"
parse_benchmark_output(benchmark_output)
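
To turn the printed lines into a number you can compare, a small regular expression is enough. The sketch below assumes the throughput is reported as a number directly followed by `FPS`; `extract_fps` is a hypothetical helper, and the sample lines and values are made-up placeholders, not measured results.

```python
import re
from typing import Iterable, Optional


def extract_fps(lines: Iterable[str]) -> Optional[float]:
    """Return the first number that directly precedes 'FPS' in the given lines, or None."""
    pattern = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*FPS")
    for line in lines:
        match = pattern.search(line)
        if match:
            return float(match.group(1))
    return None


# Made-up lines in the style of benchmark output; the numbers are placeholders.
sample_fp32 = ["[ INFO ] Latency:", "[ INFO ] Throughput:   1000.00 FPS"]
sample_int8 = ["[ INFO ] Throughput:   1500.00 FPS"]
speedup = extract_fps(sample_int8) / extract_fps(sample_fp32)
print(f"INT8 speed-up: {speedup:.2f}x")
```

In the notebook, pass the `benchmark_output` captured for each model instead of the sample lists.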

**EXTENSION EXERCISE**

Compare PTQ and QAT. Create CNN model and:
- train it for 20 epochs and save as `CNN_long.pth`
- train it for 15 epochs and save as `CNN_short.pth`

Then, apply PTQ on `CNN_long.pth` model and QAT (for 5 epochs) on `CNN_short.pth`. Compare the resulting models (in terms of accuracy, size and FPS).