# MobileNetV1 Quantization
Notebook to quantize MobileNetV1.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import transforms

import copy
import os
import time
import numpy as np
import random

from pytorchcv.models.mobilenet import mobilenet_w1

In [None]:
def print_size_of_model(model):
    """
    Print the serialized size of the model's ``state_dict``.

    The state dict is written to a scratch file named "temp.p" in the current
    working directory, its size is read, and the scratch file is removed again
    even when saving or measuring raises.

    Args:
        model: Object exposing ``state_dict()`` (e.g. a ``torch.nn.Module``).

    The size is printed in KB when it is below 1048576 bytes, otherwise in MB,
    always with three decimals.
    """
    scratch_path = "temp.p"
    try:
        torch.save(model.state_dict(), scratch_path)
        size_in_bytes = os.path.getsize(scratch_path)
    finally:
        # Clean up unconditionally so a failed save does not leave the file behind.
        if os.path.exists(scratch_path):
            os.remove(scratch_path)

    if size_in_bytes < 1048576:
        size_in_kb = size_in_bytes / 1024
        print("{:.3f} KB".format(size_in_kb))
    else:
        size_in_mb = size_in_bytes / 1048576
        print("{:.3f} MB".format(size_in_mb))


def measure_inference_latency(model, input_shape, device = None, repetitions=100, warmup_it = 10):
    """
    Measures the inference time of the provided neural network model.

    The model is moved to ``device`` and switched to evaluation mode as a side
    effect. A single random sample (batch size 1) is used for every forward pass.

    Args:
        model: The neural network model to evaluate.
        input_shape: The shape of one input sample, without the batch dimension.
        device: Device to run on (string or ``torch.device``). When ``None``, the
            device of the model's first parameter is used; a model without
            parameters falls back to the CPU.
        repetitions: Number of timed forward passes.
        warmup_it: Number of untimed forward passes executed before timing starts.

    Returns:
        tuple: A tuple containing the mean and standard deviation of the inference time
               measured in milliseconds.
    """
    if device is None:
        # Models without parameters would otherwise raise StopIteration here.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else "cpu"
    # Normalise so that "cuda", "cuda:0" and torch.device objects are all recognised.
    device = torch.device(device)
    on_cuda = device.type == "cuda"

    dummy_input = torch.randn(1, *input_shape, dtype=torch.float).to(device)

    # Set model to evaluation mode
    model.to(device)
    model.eval()

    timings = []
    with torch.no_grad():
        # Warm-up runs are inference too, so they also run without autograd.
        for _ in range(warmup_it):
            _ = model(dummy_input)

        if on_cuda:
            # Make sure pending warm-up kernels are not billed to the first timed run.
            torch.cuda.synchronize()
            starter, ender = torch.cuda.Event(enable_timing=True), torch.cuda.Event(enable_timing=True)
            for _ in range(repetitions):
                starter.record()
                _ = model(dummy_input)
                ender.record()
                # Wait for the recorded events before reading the elapsed time.
                torch.cuda.synchronize()
                timings.append(starter.elapsed_time(ender))
        else:  # CPU
            for _ in range(repetitions):
                # perf_counter is a monotonic clock intended for interval timing.
                start_time = time.perf_counter()
                _ = model(dummy_input)
                elapsed_time = (time.perf_counter() - start_time) * 1000.0  # Convert to milliseconds
                timings.append(elapsed_time)

    # Calculate mean and std
    mean_time = np.mean(timings)
    std_time = np.std(timings)

    return mean_time, std_time


def model_equivalence(model_1, model_2, device, rtol=1e-05, atol=1e-08, num_tests=100, input_size=(1,3,32,32), verbose=False):
    """
    Tests whether two models are equivalent by comparing their outputs on random inputs.

    Both models are moved to ``device`` as a side effect. Their training/eval mode
    is not changed here, so callers should set it before calling.

    Args:
        model_1: First model.
        model_2: Second model.
        device: Device on which both models and the inputs are placed.
        rtol: Relative tolerance passed to ``np.allclose``.
        atol: Absolute tolerance passed to ``np.allclose``.
        num_tests: Number of random inputs to try.
        input_size: Full shape of each random input, including the batch dimension.
        verbose: When True, print progress and the maximum absolute difference per test.

    Returns:
        bool: False as soon as one input yields outputs that are not close,
              True if every input passes.
    """

    model_1.to(device)
    model_2.to(device)

    # Only the outputs are compared, so autograd bookkeeping is unnecessary.
    with torch.no_grad():
        for i in range(num_tests):
            if verbose:
                print(f"Running test {i+1}/{num_tests}")
            x = torch.rand(size=input_size).to(device)
            y1 = model_1(x).detach().cpu().numpy()
            y2 = model_2(x).detach().cpu().numpy()
            if verbose:
                print("Difference: ", np.max(np.abs(y1-y2)))
            if not np.allclose(a=y1, b=y2, rtol=rtol, atol=atol, equal_nan=False):
                print("Model equivalence test sample failed: ")
                print(y1)
                print(y2)
                return False
    print("Model equivalence test passed!")
    return True

def set_random_seeds(random_seed=0):
    """
    Seed the Python, NumPy and PyTorch random generators with the same value
    and configure cuDNN flags (deterministic on, benchmark off) for reproducibility.

    Args:
        random_seed: Seed value applied to every generator.
    """
    # Pure-Python and NumPy generators.
    random.seed(random_seed)
    np.random.seed(random_seed)

    # PyTorch generator.
    torch.manual_seed(random_seed)

    # cuDNN: disable auto-tuning and request deterministic algorithms.
    cudnn_backend = torch.backends.cudnn
    cudnn_backend.benchmark = False
    cudnn_backend.deterministic = True

# Training

In [None]:
# Training Function
def train(model, train_loader, device, learning_rate=1e-1, num_epochs=200, save_dir="saved_models"):
    """
    Train ``model`` with Adam and cross-entropy loss, saving a checkpoint per epoch.

    Args:
        model: Network to train; it is moved to ``device`` and updated in place.
        train_loader: Iterable of ``(images, labels)`` batches that also supports ``len()``.
        device: Device to train on (string or ``torch.device``).
        learning_rate: Learning rate handed to ``optim.Adam``.
        num_epochs: Number of passes over ``train_loader``.
        save_dir: Directory that receives ``model_epoch_<k>.pt`` state dicts;
            created if it does not exist.

    Returns:
        tuple: ``(model, tot_exp_time)`` where ``tot_exp_time`` is the summed
               per-epoch training time in milliseconds.
    """
    # Create the save directory if it doesn't exist
    os.makedirs(save_dir, exist_ok=True)

    # CUDA events only exist on CUDA devices; CPU runs use a wall-clock timer.
    use_cuda_timer = torch.device(device).type == "cuda"

    # Move the model first so the optimizer is built over the on-device parameters.
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    tot_exp_time = 0

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0

        # Start recording the epoch time
        if use_cuda_timer:
            starter, ender = torch.cuda.Event(enable_timing=True), torch.cuda.Event(enable_timing=True)
            starter.record()
        else:
            start_time = time.perf_counter()

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            # Softmax is monotonic, so the arg-max of the logits is the predicted class.
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        if use_cuda_timer:
            ender.record()
            # Wait for the recorded events before reading the elapsed time.
            torch.cuda.synchronize()
            curr_time = starter.elapsed_time(ender)
        else:
            curr_time = (time.perf_counter() - start_time) * 1000.0  # milliseconds, like the CUDA path

        # Save the model state dictionary at the end of each epoch
        model_save_path = os.path.join(save_dir, f"model_epoch_{epoch+1}.pt")
        torch.save(model.state_dict(), model_save_path)

        # Statistics reported for this epoch
        tot_exp_time += curr_time
        accuracy = 100.0 * correct / total
        average_loss = train_loss / len(train_loader)

        print(f"Epoch {epoch+1}/{num_epochs} : Train accuracy {accuracy:.2f}%, Train loss {average_loss:.4f}, Training Time: {curr_time/1000:.3f} s")

    return model, tot_exp_time

def test(model, test_loader, device, criterion= nn.CrossEntropyLoss()):

    model.eval()
    model.to(device)

    test_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            test_loss += loss.item()
            prob = nn.functional.softmax(outputs, dim=1)
            _, predicted = prob.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    accuracy = 100.0 * correct / total
    average_loss = test_loss / len(test_loader)

    return accuracy, average_loss

def calibrate_model(model, loader, device=torch.device("cpu")):
    """
    Run every batch of ``loader`` through ``model`` so that any observers attached
    to the model can see the data.

    The model is moved to ``device`` and put in evaluation mode as a side effect.
    Outputs are discarded and gradients are not tracked.

    Args:
        model: Model to feed.
        loader: Iterable of ``(inputs, labels)`` batches; the labels are ignored.
        device: Device on which the model and the inputs are placed.
    """
    model.to(device)
    model.eval()

    # Forward passes only: no autograd graph is needed.
    with torch.no_grad():
        for inputs, _ in loader:
            _ = model(inputs.to(device))

Define the CIFAR10 data loaders (the training split is downloaded on first use):

In [None]:
def prepare_dataloader(num_workers=1, train_batch_size=128, eval_batch_size=256):
    """
    Build the CIFAR10 train and test data loaders.

    Both splits share one preprocessing pipeline: conversion to tensor, resize to
    224x224 and per-channel normalization. The training split is downloaded into
    ``./data`` if needed; it is sampled randomly, the test split sequentially.

    Args:
        num_workers: Worker count for both loaders.
        train_batch_size: Batch size of the training loader.
        eval_batch_size: Batch size of the test loader.

    Returns:
        tuple: ``(train_loader, test_loader)``.
    """
    # NOTE(review): the mean/std values look like the usual ImageNet statistics — confirm.
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
    ])

    data_root = './data'
    cifar10 = torchvision.datasets.CIFAR10
    train_set = cifar10(root=data_root, train=True, transform=preprocessing, download=True)
    test_set = cifar10(root=data_root, train=False, transform=preprocessing)

    train_loader = torch.utils.data.DataLoader(
        dataset=train_set,
        batch_size=train_batch_size,
        sampler=torch.utils.data.RandomSampler(train_set),
        num_workers=num_workers,
    )
    test_loader = torch.utils.data.DataLoader(
        dataset=test_set,
        batch_size=eval_batch_size,
        sampler=torch.utils.data.SequentialSampler(test_set),
        num_workers=num_workers,
    )

    return train_loader, test_loader

Create the function to fuse the MobileNetV1 layers:

In [None]:
from pytorchcv.models.common import ConvBlock

def fuse_mobilenet(model):
    """
    Fuse the ``conv``/``bn``/``activ`` submodules of every ``ConvBlock`` found
    under ``model``, modifying the model in place.

    Children that are not ``ConvBlock`` instances are searched recursively.
    Nothing is returned.

    Args:
        model: Module whose children are inspected.
    """
    for child in model.children():
        if not isinstance(child, ConvBlock):
            # Not a ConvBlock itself: look for ConvBlocks among its children.
            fuse_mobilenet(child)
            continue
        torch.ao.quantization.fuse_modules(child, [["conv", "bn", "activ"]], inplace=True)


Training of MobileNetV1 on CIFAR10:

In [None]:
# Build the FP32 MobileNetV1 baseline (pretrained=True presumably loads
# pretrained weights through pytorchcv — confirm) and show its structure.
model_cifar_fp32 = mobilenet_w1(pretrained=True)
print(model_cifar_fp32)

# CIFAR10 loaders, batch size 256 for both training and evaluation.
train_loader, test_loader = prepare_dataloader(num_workers=1, train_batch_size=256, eval_batch_size=256)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train for 10 epochs; train() saves one checkpoint per epoch into save_dir.
# NOTE(review): "orignal_model" looks like a typo for "original_model"; it is
# left unchanged here because it is the checkpoint directory name.
model_cifar_fp32, train_time_model_cifar_fp32 = train(model_cifar_fp32, train_loader, device, learning_rate=0.001, num_epochs=10, save_dir="orignal_model")

# Accuracy of the trained FP32 model on the test split (the loss is discarded).
acc_model_cifar_fp32, _ = test(model_cifar_fp32, test_loader, device)

# The training time is divided by 1000 to print seconds.
print(f"Training Time: {train_time_model_cifar_fp32/1000:.3f} s")
print(f"Test Accuracy: {acc_model_cifar_fp32:.2f}%")

Static quantization of MobileNetV1:

In [None]:
class ModelQuant(nn.Module):
    """
    Wrap a floating-point model between a QuantStub and a DeQuantStub.

    The stubs mark where tensors are converted from floating point to quantized
    (on the input) and from quantized back to floating point (on the output)
    in the quantized model.
    """

    def __init__(self, model_fp32):
        """Store ``model_fp32`` between the input and output conversion stubs."""
        super().__init__()
        # Input side: floating point -> quantized.
        self.quant = torch.ao.quantization.QuantStub()
        # The wrapped FP32 network.
        self.model_fp32 = model_fp32
        # Output side: quantized -> floating point.
        self.dequant = torch.ao.quantization.DeQuantStub()

    def forward(self, x):
        """Apply quant stub, wrapped model and dequant stub in that order."""
        quantized_in = self.quant(x)
        quantized_out = self.model_fp32(quantized_in)
        return self.dequant(quantized_out)

In [None]:
# Fix the random seeds so the random inputs of the equivalence check are repeatable.
set_random_seeds(random_seed=1)

# Copy the trained model for static quantization and keep the copy on the CPU
model_st_cifar_fp32 = copy.deepcopy(model_cifar_fp32)
model_st_cifar_fp32.to("cpu")

# Create a second copy that will be fused; the first one stays as the reference
model_st_cifar_fp32_fused = copy.deepcopy(model_st_cifar_fp32)

# Set the models to eval mode (important for fusion)
model_st_cifar_fp32.eval()
model_st_cifar_fp32_fused.eval()

# Fuse Conv, BN, ReLU modules in the MobileNetV1 model (in place)
fuse_mobilenet(model_st_cifar_fp32_fused)

print(f"model_fp32:\n {model_st_cifar_fp32}\n")
print(f"model_fp32_fused:\n {model_st_cifar_fp32_fused}\n")

# Sanity check: the fused copy must match the reference on 100 random 224x224 inputs.
assert model_equivalence(model_1=model_st_cifar_fp32, model_2=model_st_cifar_fp32_fused, device="cpu", rtol=1e-01, atol=1e-04, num_tests=100, input_size=(1,3,224,224)), "Fused model is not equivalent to the original model!"

# Insert quant/dequant stubs around the fused model
model_st_cifar_int8 = ModelQuant(model_st_cifar_fp32_fused)

# Default quantization configuration for the "x86" backend
quantization_config = torch.ao.quantization.get_default_qconfig("x86")

model_st_cifar_int8.qconfig = quantization_config

# Prepare the model for static quantization (in place).
torch.ao.quantization.prepare(model_st_cifar_int8, inplace=True)

# NOTE(review): for post-training static quantization prepare() presumably inserts
# observers rather than fake-quantization modules — confirm the wording of this message.
print(f"After preparation, note fake-quantization modules:\n {model_st_cifar_int8}\n")

# Calibrate the model by running the whole training loader through it on the CPU
calibrate_model(model_st_cifar_int8, train_loader, device="cpu")

# Convert the observed model to a quantized model.
model_st_cifar_int8 = torch.ao.quantization.convert(model_st_cifar_int8, inplace=True)

print(f"model_int8:\n {model_st_cifar_int8}\n")

# Evaluate the quantized model on the CPU (the loss is discarded)
acc_model_st_cifar_int8, _ = test(model_st_cifar_int8, test_loader, "cpu")

print(f"Test Accuracy: {acc_model_st_cifar_int8:.2f}%")

Compare the size of the FP32 model and the INT8 model:

In [None]:
# Serialized state_dict size of the FP32 baseline.
print("FP32 size:")
print_size_of_model(model_cifar_fp32)

# Serialized state_dict size of the statically quantized model.
print("INT8 size:")
print_size_of_model(model_st_cifar_int8)

Compare results on **CIFAR10**:
- Original FP32 model;
- Statically quantized INT8 model.

In [None]:
# Latency of the FP32 baseline on the CPU (single 3x224x224 input).
model_fp32_cpu_inference_latency  = measure_inference_latency(model_cifar_fp32, device="cpu", input_shape=(3, 224, 224))

# The GPU measurement only makes sense when CUDA is present; the notebook already
# falls back to the CPU for training, so do the same here instead of crashing.
if torch.cuda.is_available():
    model_fp32_gpu_inference_latency  = measure_inference_latency(model_cifar_fp32, device="cuda", input_shape=(3, 224, 224))
else:
    model_fp32_gpu_inference_latency = None

# Latency of the statically quantized INT8 model on the CPU.
model_st_int8_cpu_inference_latency  = measure_inference_latency(model_st_cifar_int8, device="cpu", input_shape=(3, 224, 224))

# Each latency result is a (mean, std) tuple in milliseconds; only the mean is printed.
print("FP32 model CPU Inference Latency: {:.3f} ms".format(model_fp32_cpu_inference_latency[0]))
if model_fp32_gpu_inference_latency is not None:
    print("FP32 model GPU Inference Latency: {:.3f} ms".format(model_fp32_gpu_inference_latency[0]))
else:
    print("FP32 model GPU Inference Latency: n/a (CUDA not available)")
print("FP32 model test accuracy: {:.2f}%".format(acc_model_cifar_fp32))
print("FP32 model training time: {:.3f} s\n".format(train_time_model_cifar_fp32/1000))


print("INT8 model static quant CPU Inference Latency: {:.3f} ms".format(model_st_int8_cpu_inference_latency[0]))
print("INT8 model static quant test accuracy: {:.2f}%\n".format(acc_model_st_cifar_int8))
