## Basic Setup

In [1]:
import time
import os
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets, transforms, models
from torch.quantization import quantize_dynamic
from torch.ao.quantization import get_default_qconfig, QConfigMapping
from torch.ao.quantization.quantize_fx import prepare_fx, convert_fx
from torch.utils.data import DataLoader, Subset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"{device=}")

device=device(type='cuda')


## Get CIFAR-10 train and test sets

In [2]:
transform = transforms.Compose([
    transforms.Resize(32),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])


train_dataset = datasets.CIFAR10(root="./data", train=True, download=True, transform=transform)
train_loader = DataLoader(
    datasets.CIFAR10(root="./data", train=True, download=True, transform=transform),
    batch_size=128, shuffle=True
)

test_dataset = datasets.CIFAR10(root="./data", train=False, download=True, transform=transform)
test_loader = DataLoader(
    datasets.CIFAR10(root="./data", train=False, download=True, transform=transform),
    batch_size=128,
    shuffle=False,
    num_workers=2,
    drop_last=True,
)

calibration_dataset = Subset(train_dataset, range(256))
calibration_loader = DataLoader(calibration_dataset, batch_size=128, shuffle=False)

## Adjust ResNet18 network for CIFAR-10 dataset

In [3]:
def get_resnet18_for_cifar10():
    model = models.resnet18(weights=None, num_classes=10)
    model.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
    model.maxpool = nn.Identity()
    return model.to(device)

full_model = get_resnet18_for_cifar10()

## Define Train and Evaluate functions

In [4]:
def train(model, loader, epochs, lr=0.01, save_path="model.pth", silent=False):
    if os.path.exists(save_path):
        if not silent:
            print(f"Model already trained. Loading from {save_path}")
        model.load_state_dict(torch.load(save_path))
        return

    # no saved model found. training from given model state

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    model.train()

    for epoch in range(epochs):
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            loss = criterion(model(x), y)
            loss.backward()
            optimizer.step()
        if not silent:
            print(f"Epoch {epoch+1}: loss={loss.item():.4f}")

    if save_path:
        torch.save(model.state_dict(), save_path)
        if not silent:
            print(f"Training complete. Model saved to {save_path}")

In [5]:
def evaluate(model, device_str):
    model.eval()
    if device_str:
        device = torch.device(device_str)
        model.to(device)
    correct = total = 0
    with torch.no_grad():
        for x, y in test_loader:
            if device_str:
                x, y = x.to(device), y.to(device)
            preds = model(x).argmax(1)
            correct += (preds == y).sum().item()
            total += y.size(0)
    return correct / total

## Define helper functions to measure latency

In [6]:
class Timer:
    def __init__(self):
        self.use_cuda = torch.cuda.is_available()
        if self.use_cuda:
            self.starter = torch.cuda.Event(enable_timing=True)
            self.ender = torch.cuda.Event(enable_timing=True)

    def start(self):
        if self.use_cuda:
            self.starter.record()
        else:
            self.start_time = time.time()

    def stop(self):
        if self.use_cuda:
            self.ender.record()
            torch.cuda.synchronize()
            return self.starter.elapsed_time(self.ender)  # ms
        else:
            return (time.time() - self.start_time) * 1000  # ms

In [7]:
def estimate_latency(model, example_inputs, repetitions=50):
    timer = Timer()
    timings = np.zeros((repetitions, 1))

    # warm-up
    for _ in range(5):
        _ = model(example_inputs)

    with torch.no_grad():
        for rep in range(repetitions):
            timer.start()
            _ = model(example_inputs)
            elapsed = timer.stop()
            timings[rep] = elapsed

    return np.mean(timings), np.std(timings)

## Train full model

In [8]:
train(full_model, train_loader, epochs=10, save_path="full_model.pth")


Model already trained. Loading from full_model.pth


## Evaluate full model

In [9]:
# evaluate accuracy
accuracy_full = evaluate(full_model, 'cuda')
print(f"Accuracy (full): {accuracy_full*100:.2f}%")

# get model size
size_mb_full = os.path.getsize("full_model.pth") / 1e6
print(f"Size (full): {size_mb_full:.2f} MB")

# estimate latency on GPU
example_input = torch.rand(128, 3, 32, 32).cuda()
full_model.cuda()
latency_mu_full_gpu, latency_std_full_gpu = estimate_latency(full_model, example_input)
print(f"Latency (full, on gpu): {latency_mu_full_gpu:.2f} ± {latency_std_full_gpu:.2f} ms")

# estimate latency on CPU
example_input = torch.rand(128, 3, 32, 32).cpu()
full_model.cpu()
latency_mu_full_cpu, latency_std_full_cpu = estimate_latency(full_model, example_input)
print(f"Latency (full, on cpu): {latency_mu_full_cpu:.2f} ± {latency_std_full_cpu:.2f} ms")


Accuracy (full): 79.33%
Size (full): 44.78 MB
Latency (full, on gpu): 16.22 ± 0.03 ms
Latency (full, on cpu): 1088.15 ± 90.12 ms


## Apply Quantization for CPU inference

In [10]:
# define qconfig and prepare model for quantization
full_model.cpu()
full_model.eval()
qconfig = get_default_qconfig("fbgemm")
# qconfig = get_default_qconfig('qnnpack')
qconfig_mapping = QConfigMapping().set_global(qconfig)

example_inputs = torch.rand(128, 3, 32, 32).cuda() # Example input for calibration
# prepared_model = prepare_fx(full_model, {"": qconfig}, example_inputs=example_inputs)
prepared_model = prepare_fx(full_model, qconfig_mapping, example_inputs=example_inputs)


# calibrate model with real data
with torch.no_grad():
    for images, _ in calibration_loader:
        prepared_model(images)
        # calibration doesn't need targets, only forward pass

# convert to quantized model
ptq_model = convert_fx(prepared_model)

# save model
torch.save(ptq_model.state_dict(), "ptq_model.pth")



## Evaluate post-training-quantization (PTQ) model

In [11]:
# evaluate accuracy
ptq_model.cpu()
accuracy_ptq = evaluate(ptq_model, 'cpu')
print(f"Accuracy (PTQ): {accuracy_ptq*100:.2f}%")

# get model size
size_mb_ptq = os.path.getsize("ptq_model.pth") / 1e6
print(f"Size (PTQ): {size_mb_ptq:.2f} MB")

# estimate latency
example_input = torch.rand(128, 3, 32, 32)
latency_mu_ptq, latency_std_ptq = estimate_latency(ptq_model, example_input)
print(f"Latency (PTQ, on cpu): {latency_mu_ptq:.2f} ± {latency_std_ptq:.2f} ms")

Accuracy (PTQ): 79.10%
Size (PTQ): 11.30 MB
Latency (PTQ, on cpu): 695.72 ± 29.66 ms


# Quantization Aware Training (QAT) on CPU

In [12]:
# reload full model
full_model_qat = get_resnet18_for_cifar10()
train(full_model_qat, train_loader, epochs=10, save_path="full_model.pth")

Model already trained. Loading from full_model.pth


In [13]:
from torch.quantization import prepare_qat, convert, get_default_qconfig

# backend = "fbgemm"  # running on a x86 CPU. Use "qnnpack" if running on ARM.
backend = "qnnpack"
# qconfig = get_default_qconfig('qnnpack')

# torch.backends.quantized.engine = 'fbgemm'

# full_model_qat.eval()
# # Fuse the model in place rather manually.
# full_model_qat = torch.quantization.fuse_modules(full_model_qat, [["conv1", "bn1", "relu"]], inplace=True)
# for module_name, module in full_model_qat.named_children():
#     if "layer" in module_name:
#         for basic_block_name, basic_block in module.named_children():
#             torch.quantization.fuse_modules(basic_block, [["conv1", "bn1", "relu1"], ["conv2", "bn2"]], inplace=True)
#             for sub_block_name, sub_block in basic_block.named_children():
#                 if sub_block_name == "downsample":
#                     torch.quantization.fuse_modules(sub_block, [["0", "1"]], inplace=True)

# class QuantizedResNet18(nn.Module):
#     def __init__(self, model_fp32):
#         super(QuantizedResNet18, self).__init__()
#         # QuantStub converts tensors from floating point to quantized.
#         # This will only be used for inputs.
#         self.quant = torch.quantization.QuantStub()
#         # DeQuantStub converts tensors from quantized to floating point.
#         # This will only be used for outputs.
#         self.dequant = torch.quantization.DeQuantStub()
#         # FP32 model
#         self.model_fp32 = model_fp32

#     def forward(self, x):
#         # manually specify where tensors will be converted from floating
#         # point to quantized in the quantized model
#         x = self.quant(x)
#         x = self.model_fp32(x)
#         # manually specify where tensors will be converted from quantized
#         # to floating point in the quantized model
#         x = self.dequant(x)
#         return x

# quantized_model = QuantizedResNet18(model_fp32=full_model_qat)

"""Insert stubs"""
full_model_qat = nn.Sequential(torch.quantization.QuantStub(), 
                  full_model_qat, 
                  torch.quantization.DeQuantStub())


#adapt model
# full_model_qat.qconfig = get_default_qconfig("fbgemm")
# full_model_qat.qconfig = get_default_qconfig("qnnpack")
full_model_qat.train()
full_model_qat.qconfig = get_default_qconfig(backend)


# from torch.quantization import QConfig, default_observer, default_fake_quant, default_weight_fake_quant

# qat_qconfig_per_tensor = QConfig(
#     activation=default_fake_quant,
#     weight=default_weight_fake_quant 
# )

# full_model_qat.qconfig = qat_qconfig_per_tensor
prepare_qat(full_model_qat, inplace=True)

# consider changing to 3
train(full_model_qat, train_loader, epochs=1, save_path="")


Epoch 1: loss=0.0052
Training complete. Model saved to qat_model.pth


In [14]:
# Convert to quantized model
# full_model_qat.cpu()
full_model_qat.eval()
qat_model = convert(full_model_qat, inplace=True)
# qat_model = convert(full_model_qat, inplace=False)

In [15]:
# evaluate accuracy
accuracy_qat = evaluate(qat_model, 'cpu')
print(f"Accuracy (QAT): {accuracy_qat*100:.2f}%")

RuntimeError: getCudnnDataTypeFromScalarType() not supported for QUInt8

In [None]:
# Convert to quantized model
full_model_qat.eval()
qat_model = convert(full_model_qat, inplace=False)

# evaluate accuracy
accuracy_qat = evaluate(qat_model, '')
print(f"Accuracy (QAT): {accuracy_qat*100:.2f}%")

# get model size
size_mb_qat = os.path.getsize("qat_model.pth") / 1e6
print(f"Size (QAT): {size_mb_qat:.2f} MB")



In [None]:
print("Is quantized:", isinstance(qat_model.conv1, torch.nn.quantized.Conv2d))

x = torch.randn(1, 3, 224, 224).float()
print("Input dtype:", x.dtype, "device:", x.device)

torch.backends.quantized.engine

In [None]:
print(qat_model)
print(qat_model.conv1)  # or whatever your first conv layer is

x = torch.randn(1, 3, 224, 224)  # assuming input shape for ResNet
out = qat_model(x) 

In [None]:
# estimate latency
example_input = torch.rand(128, 3, 32, 32)
latency_mu_qat, latency_std_qat = estimate_latency(qat_model, example_input)
print(f"Latency (QAT, on cpu): {latency_mu_qat:.2f} ± {latency_std_qat:.2f} ms")

# Quantize using GPU

## Setup required packages for GPU Quantization

In [None]:
!pip install torch_tensorrt
!pip install nvidia-modelopt[all]
!pip install onnx
!pip install onnxruntime-gpu

## Required imports for GPU Quantization

In [None]:
import torch_tensorrt
import modelopt.torch.quantization as mtq
import onnx

In [None]:
# reload full model
full_model = get_resnet18_for_cifar10()
train(full_model, train_loader, epochs=10, save_path="full_model.pth")


In [None]:
# define calibration loop
def calibration_loop(model):
    for batch, _ in calibration_loader:
        model(batch.cuda())

In [None]:
# Select quantization config
config = mtq.INT8_SMOOTHQUANT_CFG

# Quantize the model and perform calibration (PTQ)
ptq_model_gpu = mtq.quantize(full_model, config, calibration_loop)

# save model
torch.save(ptq_model_gpu.state_dict(), "ptq_model_gpu.pth")

In [None]:
# save model
torch.save(ptq_model_gpu.state_dict(), "ptq_model_gpu.pth")

In [None]:
# evaluate accuracy
accuracy_ptq_gpu = evaluate(ptq_model_gpu, 'cuda')
print(f"Accuracy (PTQ): {accuracy_ptq_gpu*100:.2f}%")

# get model size
size_mb_ptq_gpu = os.path.getsize("ptq_model_gpu.pth") / 1e6
print(f"Size (PTQ): {size_mb_ptq_gpu:.2f} MB")

# estimate latency
example_input = torch.rand(128, 3, 32, 32).to(device)
latency_mu_ptq_gpu, latency_std_ptq_gpu = estimate_latency(ptq_model_gpu, example_input)
print(f"Latency (PTQ, on gpu): {latency_mu_ptq_gpu:.2f} ± {latency_std_ptq_gpu:.2f} ms")

In [None]:
# export to onnx
example_input = torch.rand(128, 3, 32, 32).cuda()
torch.onnx.export(ptq_model_gpu, example_input, "ptq_model_gpu.onnx")

In [None]:
# load onnx model
onnx_model = onnx.load("ptq_model_gpu.onnx")

# check that the model is well formed
onnx.checker.check_model(onnx_model)

In [None]:
import onnxruntime as ort

session = ort.InferenceSession(
    "ptq_model_gpu.onnx",
    providers=["CUDAExecutionProvider"]  # 👈 use GPU
)

In [None]:
# # evaluate accuracy
# accuracy_ptq_gpu_onnx = evaluate(onnx_model, 'cuda')
# print(f"Accuracy (PTQ): {accuracy_ptq_gpu_onnx*100:.2f}%")

# get model size
size_mb_ptq_gpu_onnx = os.path.getsize("ptq_model_gpu.onnx") / 1e6
print(f"Size (PTQ): {size_mb_ptq_gpu_onnx:.2f} MB")

# # estimate latency
# example_input = torch.rand(128, 3, 32, 32).to(device)
# latency_mu_ptq_gpu_onnx, latency_std_ptq_gpu_onnx = estimate_latency(onnx_model, example_input)
# print(f"Latency (PTQ, on gpu): {latency_mu_ptq_gpu_onnx:.2f} ± {latency_std_ptq_gpu_onnx:.2f} ms")

In [None]:
# Compress the model
mtq.compress(ptq_model_gpu)

In [None]:
from modelopt.torch.quantization.utils import export_torch_mode
import torch_tensorrt as torchtrt

example_input = torch.rand(128, 3, 32, 32).to(device)

with torch.no_grad():
    with export_torch_mode():
        # Compile the model with Torch-TensorRT Dynamo backend

        exp_program = torch.export.export(ptq_model, (example_input,), strict=False)
        enabled_precisions = {torch.int8}
        # enabled_precisions = {torch.float8_e4m3fn}
        trt_model = torchtrt.dynamo.compile(
            exp_program,
            inputs=[example_input],
            enabled_precisions=enabled_precisions,
            min_block_size=1,
            debug=False,
        )
        # You can also use torch compile path to compile the model with Torch-TensorRT:
        # trt_model = torch.compile(model, backend="tensorrt")



In [None]:
transform = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
])
train_set = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_set = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False)

## Load and adapt MobileNetV2

In [None]:
model_fp32 = models.mobilenet_v2(pretrained=True)
model_fp32.classifier[1] = nn.Linear(model_fp32.last_channel, 10)
model_fp32.eval()


## Apply dynamic quantization to Linear layers


In [None]:
model_int8 = quantize_dynamic(model_fp32, {nn.Linear}, dtype=torch.qint8)


## Evaluate

In [None]:
# Evaluate function
@torch.no_grad()
def evaluate(model, dataloader):
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    correct = 0
    total = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)
    return correct / total

# Run evaluation
acc_original = evaluate(model_fp32, test_loader)
print(f"Original Model Accuracy: {acc_original*100:.2f}%")

acc = evaluate(model_int8, test_loader)
print(f"PTQ Quantized Model Accuracy: {acc*100:.2f}%")

In [None]:
acc_original = evaluate(model_fp32, test_loader)
print(f"Original Model Accuracy: {acc_original:.4f}")


## Measure inference time and model size

In [None]:
def measure_latency(model, device, input_shape=(1, 3, 224, 224), runs=100):
    model.eval()
    dummy_input = torch.randn(input_shape).to(device)
    with torch.no_grad():
        start = time.time()
        for _ in range(runs):
            model(dummy_input)
        end = time.time()
    return (end - start) / runs

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


latency_org = measure_latency(model_fp32, device)
print(f"Average inference latency (org): {latency_org * 1000:.2f} ms")

latency = measure_latency(model_int8, device)
print(f"Average inference latency (PTQ): {latency * 1000:.2f} ms")




## Save and report model size


In [None]:
# Save and report model size
torch.save(model_int8.state_dict(), "ptq_model.pth")
size_mb = os.path.getsize("ptq_model.pth") / 1e6
print(f"Model size (PTQ): {size_mb:.2f} MB")


In [None]:
torch.save(model_fp32.state_dict(), "org_model.pth")
size_mb = os.path.getsize("org_model.pth") / 1e6
print(f"Model size (org): {size_mb:.2f} MB")