## Measure inference time and evaluate the accuracies for different types of model states (FP32 | INT) and on different devices (CPU | GPU)
###### Author: Jan Klhufek (iklhufek@fit.vut.cz)

In [None]:
import time
import copy
import torch
import torch.nn as nn
from typing import Tuple

from data.data_loaders import ImagenetLoader, Imagenet100Loader, Cifar10Loader
from utils.utils import *
from models.mobilenet_v2 import mobilenetv2
from models.mobilenet_v1 import mobilenetv1
import eval

In [None]:
def measure_inference_latency(model: nn.Module, device: str, input_size: Tuple[int, int, int, int] = (1, 3, 32, 32), num_samples: int = 100, num_warmups: int = 10, half_tensor: bool = False) -> float:
    """
    Measures the average inference latency of a neural network model on a given device.
    The model is first run `num_warmups` times (untimed) and then `num_samples` times on one fixed random input;
    the wall-clock time of the timed runs is averaged.

    Code adapted and modified from: https://leimao.github.io/blog/PyTorch-Quantization-Aware-Training/

    Args:
        model (nn.Module): The neural network model to be evaluated. It is moved to `device` and switched to eval mode.
        device (str): The device (CPU or CUDA) on which to perform the inference.
        input_size (Tuple[int, int, int, int]): The size of the input tensor. Defaults to (1, 3, 32, 32).
        num_samples (int): The number of timed forward passes. Must be at least 1. Defaults to 100.
        num_warmups (int): The number of warm-up iterations to stabilize performance measurements. Defaults to 10.
        half_tensor (bool): Whether to cast the input tensor to half precision. The model itself is not cast here. Defaults to False.

    Returns:
        float: The average time taken per forward pass of one `input_size` tensor, in seconds.

    Raises:
        ValueError: If `num_samples` is smaller than 1.
    """
    if num_samples < 1:
        raise ValueError(f"num_samples must be at least 1, got {num_samples}")

    model.to(device)
    model.eval()

    # Synchronization is only meaningful (and only available) when running on a CUDA device;
    # calling torch.cuda.synchronize() on a CPU-only setup would raise.
    uses_cuda = torch.device(device).type == "cuda"

    x = torch.rand(size=input_size).to(device)
    x = x.half() if half_tensor else x

    with torch.no_grad():
        for _ in range(num_warmups):
            _ = model(x)
        if uses_cuda:
            # Make sure all warm-up work has finished before the timer starts
            torch.cuda.synchronize()

        # perf_counter is monotonic and has the highest available resolution for interval timing
        start_time = time.perf_counter()
        for _ in range(num_samples):
            _ = model(x)
            if uses_cuda:
                # Wait for the GPU to finish so that the measured time covers the whole forward pass
                torch.cuda.synchronize()
        end_time = time.perf_counter()

    elapsed_time = end_time - start_time
    elapsed_time_ave = elapsed_time / num_samples

    return elapsed_time_ave

In [None]:
def model_equivalence(model_1: nn.Module, model_2: nn.Module, device: str, rtol: float = 1e-05, atol: float = 1e-08, num_tests: int = 100, input_size: Tuple[int, int, int, int] = (1,3,224,224)) -> bool:
    """
    Checks the equivalence of two models (BEFORE and AFTER fusing, not conversion to int) by comparing their outputs on randomly generated inputs.
    The equivalence is determined based on the relative tolerance (rtol) and absolute tolerance (atol) of the outputs.
    Both models are moved to `device` and switched to eval mode.

    Code adapted and modified from: https://leimao.github.io/blog/PyTorch-Quantization-Aware-Training/

    Args:
        model_1 (nn.Module): The first neural network model for comparison.
        model_2 (nn.Module): The second neural network model for comparison.
        device (str): The device (CPU or CUDA) on which to perform the tests.
        rtol (float): The relative tolerance parameter for torch.allclose. Defaults to 1e-05.
        atol (float): The absolute tolerance parameter for torch.allclose. Defaults to 1e-08.
        num_tests (int): The number of random tests to run for comparison. Defaults to 100.
        input_size (Tuple[int, int, int, int]): The size of the input tensor. Defaults to (1, 3, 224, 224).

    Returns:
        bool: True if the models are equivalent within the specified tolerances, False otherwise.
    """
    model_1.to(device)
    model_2.to(device)
    model_1.eval()
    model_2.eval()

    # Only forward outputs are compared, so gradient tracking is not needed
    with torch.no_grad():
        for _ in range(num_tests):
            x = torch.rand(size=input_size).to(device)
            y1 = model_1(x).detach().cpu()
            y2 = model_2(x).detach().cpu()
            # torch.allclose avoids depending on a NumPy name that this file never imports explicitly
            if not torch.allclose(y1, y2, rtol=rtol, atol=atol, equal_nan=False):
                print("Model equivalence test sample failed: ")
                # Print as NumPy arrays to keep the diagnostic output format unchanged
                print(y1.numpy())
                print(y2.numpy())
                return False
    return True

In [None]:
# FP32-trained model: compare accuracy and inference latency on CPU, on GPU, and on GPU with half precision
fp32_data = "checkpoints/no_QAT/mobilenetv2_cifar10_noqat_150epochs/model_best.pth.tar"

# Build one model instance per configuration, all from the same checkpoint (mobilenetv2 for example)
mobilenet2_kwargs = dict(num_classes=10, pretrained=True, checkpoint_path=fp32_data)
cpu_mobilenet2 = mobilenetv2(**mobilenet2_kwargs).to("cpu")
gpu_mobilenet2 = mobilenetv2(**mobilenet2_kwargs).to("cuda")
fp16_gpu_mobilenet2 = mobilenetv2(half_tensor=True, **mobilenet2_kwargs).to("cuda")

# Validation data loaders: the GPU variant uses pinned memory, the CPU variant does not
cifar10_loader = Cifar10Loader()
val_loader_kwargs = dict(batch_size=512, num_workers=4)
cpu_val_loader = cifar10_loader.load_validation_data(pin_memory=False, **val_loader_kwargs)
gpu_val_loader = cifar10_loader.load_validation_data(pin_memory=True, **val_loader_kwargs)

# Accuracy of each configuration on the validation set (the first returned value is not used)
criterion = nn.CrossEntropyLoss()  # Could also be None if we do not care about loss
_, cpu_fp32_avg_top1, cpu_fp32_avg_top5 = eval.test(
    model=cpu_mobilenet2,
    val_loader=cpu_val_loader,
    criterion=criterion,
    device="cpu",
)
_, gpu_fp32_avg_top1, gpu_fp32_avg_top5 = eval.test(
    model=gpu_mobilenet2,
    val_loader=gpu_val_loader,
    criterion=criterion,
    device="cuda",
)
_, gpu_fp16_avg_top1, gpu_fp16_avg_top5 = eval.test(
    model=fp16_gpu_mobilenet2,
    val_loader=gpu_val_loader,
    criterion=criterion,
    device="cuda",
    half_tensor=True,
)

# Latency averaged over 100 forward passes of a random batch shaped like a validation batch
latency_kwargs = dict(input_size=(512, 3, 32, 32), num_samples=100)
fp32_cpu_inference_latency = measure_inference_latency(model=cpu_mobilenet2, device="cpu", **latency_kwargs)
fp32_gpu_inference_latency = measure_inference_latency(model=gpu_mobilenet2, device="cuda", **latency_kwargs)
fp16_gpu_inference_latency = measure_inference_latency(
    model=fp16_gpu_mobilenet2,
    device="cuda",
    half_tensor=True,
    **latency_kwargs,
)

# Report: CPU FP32
print(f"\nFP32 CPU Inference TOP1 Acc: {cpu_fp32_avg_top1}")
print(f"FP32 CPU Inference TOP5 Acc: {cpu_fp32_avg_top5}")
print(f"FP32 CPU Inference Latency: {(fp32_cpu_inference_latency * 1000):.2f} ms / sample \n")

# Report: GPU FP32
print(f"FP32 GPU Inference TOP1 Acc: {gpu_fp32_avg_top1}")
print(f"FP32 GPU Inference TOP5 Acc: {gpu_fp32_avg_top5}")
print(f"FP32 GPU Inference Latency: {(fp32_gpu_inference_latency * 1000):.2f} ms / sample \n")

# Report: GPU FP16
print(f"FP16 GPU Inference TOP1 Acc: {gpu_fp16_avg_top1}")
print(f"FP16 GPU Inference TOP5 Acc: {gpu_fp16_avg_top5}")
print(f"FP16 GPU Inference Latency: {(fp16_gpu_inference_latency * 1000):.2f} ms / sample")


In [None]:
# Evaluate accuracies and measure the inference time of a model trained using QAT and then converted into integer precision
print("Check equivalence of model before and after layer fusion (which precedes QAT)")
test_model = mobilenetv1(num_classes=10).to("cpu")

# Manually set up a copy of the model for QAT: attach quant/dequant stubs, configure, fuse layers and prepare
fused_test_model = copy.deepcopy(test_model)
fused_test_model._qat = True
fused_test_model.quant = torch.ao.quantization.QuantStub()
fused_test_model.dequant = torch.ao.quantization.DeQuantStub()
fused_test_model._quant_config = {}
fused_test_model._set_qat_config()
fused_test_model.eval()  # fusion is performed in eval mode
fused_test_model.fuse_model()
fused_test_model.train()  # QAT preparation is performed in train mode
# Use the torch.ao.quantization namespace consistently with the stubs above
torch.ao.quantization.prepare_qat(fused_test_model, inplace=True)

# Check that models are equivalent
# NOTE(review): the check runs after prepare_qat, not directly after fusion — confirm this is intended
assert model_equivalence(model_1=test_model, model_2=fused_test_model, device="cpu", rtol=1e-03, atol=1e-06, num_tests=100, input_size=(64,3,32,32)), "Quantized model deviates from the original model too much!"
print("Models are functionaly equivalent!")

# Test accuracies and inference times
# Checkpoints: model state before conversion, JIT-saved converted model, and converted model
mn1_qat_pre_conversion_sym_chkpt = "checkpoints/QAT_sym/mobilenetv1_cifar10_qat_150epochs/model_best.pth.tar"
mn1_qat_jit_converted_sym_chkpt = "checkpoints/QAT_sym/mobilenetv1_cifar10_qat_150epochs/jit_model_after_qat.pth.tar"
mn1_qat_converted_sym_chkpt = "checkpoints/QAT_sym/mobilenetv1_cifar10_qat_150epochs/model_after_qat.pth.tar"
orig_mobilenet1 = mobilenetv1(num_classes=10, pretrained=True, checkpoint_path=mn1_qat_pre_conversion_sym_chkpt, qat=True, symmetric_quant=True, act_function=nn.ReLU).to("cpu")
quant_mobilenet1 = mobilenetv1(num_classes=10, pretrained=True, checkpoint_path=mn1_qat_converted_sym_chkpt, qat=True, symmetric_quant=True, load_quantized=True, act_function=nn.ReLU).to("cpu")
quant_jit_mobilenet1 = torch.jit.load(mn1_qat_jit_converted_sym_chkpt, map_location="cpu")

# Print original model size and size after quantization
orig_model_size = get_model_size(orig_mobilenet1)
quant_model_size = get_model_size(quant_mobilenet1)
print(f"Original model Size: {orig_model_size:.2f} MB")
print(f"Quantized model Size: {quant_model_size:.2f} MB\n")

# FP32 PRECISION (USING GPU)
print("Evaluate floating-point model on GPU before conversion to integer.") # Could also be tested on CPU, but for larger models/datasets, that is not possible in most cases
cifar10_loader = Cifar10Loader()
cpu_val_loader = cifar10_loader.load_validation_data(batch_size=512, num_workers=4, pin_memory=False)
gpu_val_loader = cifar10_loader.load_validation_data(batch_size=512, num_workers=4, pin_memory=True)
orig_mobilenet1.to("cuda")

criterion = nn.CrossEntropyLoss()  # Could also be None if we do not care about loss
_, gpu_fp32_avg_top1, gpu_fp32_avg_top5 = eval.test(model=orig_mobilenet1, val_loader=gpu_val_loader, criterion=criterion, device="cuda")
# Measure inference time over 100 random samples of batches of data
fp32_gpu_inference_latency = measure_inference_latency(model=orig_mobilenet1, device="cuda", input_size=(512,3,32,32), num_samples=100)

# orig_mobilenet1.to("cpu")
# _, cpu_fp32_avg_top1, cpu_fp32_avg_top5 = eval.test(model=orig_mobilenet1, val_loader=cpu_val_loader, criterion=criterion, device="cpu")
# Measure inference time over 100 random samples of batches of data
# fp32_cpu_inference_latency = measure_inference_latency(model=orig_mobilenet1, device="cpu", input_size=(512,3,32,32), num_samples=100)

# GPU FP32
print(f"FP32 GPU Inference TOP1 Acc: {gpu_fp32_avg_top1}")
print(f"FP32 GPU Inference TOP5 Acc: {gpu_fp32_avg_top5}")
print(f"FP32 GPU Inference Latency: {(fp32_gpu_inference_latency * 1000):.2f} ms / sample \n")
# CPU FP32 – optional (but good for comparison with quantized inference time)
# print(f"FP32 CPU Inference TOP1 Acc: {cpu_fp32_avg_top1}")
# print(f"FP32 CPU Inference TOP5 Acc: {cpu_fp32_avg_top5}")
# print(f"FP32 CPU Inference Latency: {(fp32_cpu_inference_latency * 1000):.2f} ms / sample \n")

# CONVERT MODEL INPLACE (if desired)
# print("Converting the FP32 model (its state dict after QAT) into INT")
# orig_mobilenet1.to("cpu")  # NOTE: Quantization operations in PyTorch are optimized for CPU backend inference (i.e. utilization of vectorization, etc.).
# orig_mobilenet1.eval()
# torch.ao.quantization.convert(orig_mobilenet1, inplace=True)

# Measure int inference
print("Evaluate integer (loaded) model.")
_, cpu_load_int8_avg_top1, cpu_load_int8_avg_top5 = eval.test(model=quant_mobilenet1, val_loader=cpu_val_loader, criterion=criterion, device="cpu")

# Measure inference time over 100 random samples of batches of data for the converted model
# NOTE(review): accuracy above is evaluated on quant_mobilenet1, while latency is measured on the
# JIT-loaded quant_jit_mobilenet1 — presumably both hold the same converted network; confirm
int8_cpu_load_inference_latency = measure_inference_latency(model=quant_jit_mobilenet1, device="cpu", input_size=(512,3,32,32), num_samples=100)

# CPU After QAT INT8
print(f"\nLoaded Model INT8 CPU Inference TOP1 Acc: {cpu_load_int8_avg_top1}")
print(f"Loaded Model INT8 CPU Inference TOP5 Acc: {cpu_load_int8_avg_top5}")
print(f"Loaded Model INT8 CPU Inference Latency: {(int8_cpu_load_inference_latency * 1000):.2f} ms / sample \n")