# Dynamic, Static, and Quantization-Aware Training Quantization Methods

This notebook shows how to perform dynamic quantization, static quantization, and quantization-aware training (QAT).

The notebook and materials are part of the LinkedIn Learning course [AI Model Compression Techniques: Building Cheaper, Faster, and Greener AI](https://www.linkedin.com/learning/ai-model-compression-techniques-building-cheaper-faster-and-greener-ai).

----

## Load Modules

Some helpful comments on the modules:

* `import torch` # torch will allow us to create tensors.

* `import torch.nn as nn` # torch.nn allows us to create a neural network.

* `torch.nn.functional` # nn.functional gives us access to the activation and loss functions.

* `import torch.optim as optim`  # optim contains many optimizers. This time I am using Adam

* `import torch.quantization` # provides tools and functionality for quantizing deep learning models.

* `from torch.utils.data import DataLoader, Subset` # needed for the training data
    * **DataLoader** loads data efficiently in batches.
    * **Subset** creates a subset of an existing PyTorch Dataset. It takes an original Dataset and a list of indices as input, effectively creating a new "dataset" that only contains the samples corresponding to those indices.
    
* `from torchvision import datasets, transforms` 
    * **datasets**: This module provides access to a collection of popular datasets commonly used in computer vision for developing and testing machine learning models. It is where we get the CIFAR-10 dataset for this notebook.
    * **transforms**: This module offers common image transformations and data augmentation techniques that are crucial for training and evaluating computer vision models.


In [None]:
import os
import random
import time
import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.quantization
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms

### Set Random Seeds for reproducibility


In [None]:
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

### Do an initial check if Compute Unified Device Architecture (CUDA) is available

Checking if CUDA is available is a crucial step in applications, particularly in deep learning and high-performance computing, for several reasons: 

* Enabling GPU Acceleration:

    CUDA (Compute Unified Device Architecture) is NVIDIA's parallel computing platform and API that allows software to leverage the power of NVIDIA GPUs for general-purpose computing. Checking for its availability determines whether your program can offload computationally intensive tasks to the GPU, leading to significant speedups compared to CPU-only execution.

* Conditional Code Execution:

    By checking for CUDA availability, you can write code that dynamically adapts to the hardware environment. If a CUDA-enabled GPU is present, your program can utilize GPU-specific operations and data structures. If not, it can gracefully fall back to CPU implementations or inform the user about the lack of GPU support. This prevents errors and ensures your application can run on various systems.

* Resource Management:

    Knowing if CUDA is available allows you to manage resources effectively. If a GPU is present, you can allocate memory on the device and perform computations there. If not, you avoid attempting to access non-existent GPU resources, which would lead to errors.


* Error Prevention and Debugging:

    Explicitly checking for CUDA availability helps in identifying and preventing issues related to missing or improperly configured CUDA installations or incompatible GPU drivers. If the check fails, it provides an immediate indication that GPU acceleration is not possible, guiding troubleshooting efforts.

* Optimized Performance:

    Many deep learning frameworks and libraries are designed to leverage CUDA for optimal performance. Verifying CUDA availability ensures that these frameworks can utilize the intended hardware acceleration, leading to faster training times and inference speeds for machine learning models.

In [None]:
cuda_available = torch.cuda.is_available()
train_device = torch.device("cuda:0" if cuda_available else "cpu")
print(f"Using device for training: {train_device}")


---

# Building the Model: Model Definition

---

I will be building a Convolutional Neural Network (CNN) for the CIFAR-10 dataset.

#### Why CNN?

CNNs are preferred for CIFAR-10 image classification because they excel at identifying spatial hierarchies and patterns, which are crucial for recognizing objects within images. Their convolutional layers effectively extract features like edges, shapes, and textures, and they typically outperform plain fully connected networks on this task.

The CIFAR-10 dataset consists of small, color images (32x32 pixels) with ten object categories. CNNs are well-suited to the spatial characteristics of these images and can effectively learn to classify them.

A CNN has a form similar to the figure below.

<p>
  <img alt="CNN for CIFAR-10" src="cnn.png" width="500" height="250"/>
</p>

[img source](https://towardsdatascience.com/deep-learning-with-cifar-10-image-classification-64ab92110d79/)


### Understanding the Key Components of the CNN Class Below

* **class CIFAR10CNN(nn.Module):**

    - This line declares a Python class **CIFAR10CNN** that inherits from nn.Module. Inheriting from nn.Module is fundamental in PyTorch for building neural networks, as it provides core functionalities like tracking parameters, moving models to different devices, and managing submodules.

* **def __init__(self):**

    - This method is the constructor method where the layers of the neural network are defined and initialized.

        - **super(CIFAR10CNN, self).__init__():** This calls the constructor of the parent class nn.Module, which is crucial for proper initialization of the PyTorch module.

        - **self.features = nn.Sequential(...):** This defines the "feature extraction" part of the CNN using nn.Sequential. nn.Sequential allows chaining multiple layers together, where the output of one layer becomes the input of the next.

            - **Convolutional Blocks:** The _features_ section consists of three blocks, each containing:
                - **nn.Conv2d:** A 2D convolutional layer that extracts features from the input image. The parameters define input channels (3 for RGB images), output channels (64, 128, 256), kernel size (3x3), and padding (1 to maintain spatial dimensions).

                - **nn.ReLU(inplace=True):** A _Rectified Linear Unit (ReLU)_ activation function, which introduces non-linearity. _inplace=True_ modifies the input directly, saving memory.

                - **nn.MaxPool2d(kernel_size=2):** A 2D max pooling layer that downsamples the feature maps, reducing spatial dimensions and making the network more robust to small shifts in the input.

        - **self.classifier = nn.Sequential(...):** This defines the "classification" part of the CNN.
            - **nn.Flatten():** This layer flattens the multi-dimensional output of the features part into a 1D vector per sample, preparing it for the fully connected layers. Each of the three max pooling layers halves the spatial size (32 to 16 to 8 to 4), so the feature maps end up 4x4 with 256 channels, which gives the 256 * 4 * 4 input dimension of the next layer.

            - **nn.Linear(256 * 4 * 4, 1024):** A fully connected (linear) layer that takes the flattened features as input and outputs a 1024-dimensional vector.

            - **nn.ReLU(inplace=True):** Another ReLU activation function.

            - **nn.Linear(1024, 10):** The final fully connected layer that maps the 1024-dimensional vector to 10 output values, corresponding to the 10 classes in the CIFAR-10 dataset.

* **def forward(self, x):**

    - This method defines the forward pass of the network, specifying how input data x flows through the layers.

        - **x = self.features(x):** The input x first passes through the feature extraction layers.
        - **x = self.classifier(x):** The output of the feature extraction is then passed through the classification layers.
        - **return x:** The final output of the network, representing the raw scores (logits) for each of the 10 classes, is returned.
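
The 256 * 4 * 4 input size of the first linear layer can be checked by hand. The sketch below is plain Python and does not use PyTorch; the helper names `conv2d_out` and `maxpool2d_out` are made up for this illustration, and they assume square inputs, dilation 1, and a pooling stride equal to the kernel size.

```python
def conv2d_out(size, kernel_size, padding=0, stride=1):
    """Output height/width of a square convolution (dilation 1)."""
    return (size + 2 * padding - kernel_size) // stride + 1


def maxpool2d_out(size, kernel_size):
    """Output height/width of max pooling when stride equals kernel_size."""
    return size // kernel_size


size = 32      # CIFAR-10 images are 32x32
channels = 3   # RGB input
for out_channels in (64, 128, 256):
    size = conv2d_out(size, kernel_size=3, padding=1)  # padding=1 keeps the size
    size = maxpool2d_out(size, kernel_size=2)          # pooling halves it
    channels = out_channels
    print(f"{channels} channels, {size}x{size} feature maps")

flattened = channels * size * size
print(flattened)  # 4096, i.e. 256 * 4 * 4
```

If the input resolution or the number of pooling layers changes, the first `nn.Linear` input size has to change with it.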

In [None]:
# Define a CNN model for CIFAR10
class CIFAR10CNN(nn.Module):
   def __init__(self):
       super(CIFAR10CNN, self).__init__()
       self.features = nn.Sequential(
           nn.Conv2d(3, 64, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),


           nn.Conv2d(64, 128, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),


           nn.Conv2d(128, 256, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),
       )


       self.classifier = nn.Sequential(
           nn.Flatten(),
           nn.Linear(256 * 4 * 4, 1024),
           nn.ReLU(inplace=True),
           nn.Linear(1024, 10)
       )


   def forward(self, x):
       x = self.features(x)
       x = self.classifier(x)
       return x

### Loading the CIFAR-10 Dataset

The code below defines a function, load_data, that loads and preprocesses the CIFAR-10 dataset in PyTorch and returns data loaders for training and testing. A second, separate test data loader is then created for the CPU-only quantization steps.

### Understanding the Key Components of Loading the CIFAR-10 Dataset

1. **load_data Function:**

    - **Data Transformations:**

        - **transform_train:** A sequence of transformations applied to the training data. This includes **RandomCrop** and **RandomHorizontalFlip** for data augmentation, **ToTensor** to convert images to PyTorch tensors, and **Normalize** to scale pixel values to a range of -1 to 1.

        - **transform_test:** A sequence of transformations for the test data, applying **ToTensor** and **Normalize.**

    - **Dataset Loading:**
        - **train_dataset** and **test_dataset**: Loads the CIFAR-10 dataset, downloading it if not already present. The respective transformations (**transform_train** and **transform_test**) are applied.

    - **Training Data Subset:**
        - For faster demonstration, a subset of 10,000 samples is randomly selected from the **train_dataset** to create **train_subset**.

    - **Data Loaders:**
        - **train_loader:** Creates a **DataLoader** for the **train_subset** with a specified **batch_size**, shuffling the data, and using 2 worker processes for parallel data loading.

        - **test_loader:** Creates a **DataLoader** for the **test_dataset** with the same **batch_size**, but without shuffling.

    - **Return Values:**
        - The function returns **train_loader** and **test_loader**.


2. **Data Loading and CPU Dataloader:**
    - **Loading Data:**
        - The **load_data** function is called with **batch_size = 128** to obtain **train_loader** and **test_loader**.

    - **CPU Dataloader for Quantization:**
        - **test_loader_cpu:** A separate **DataLoader** for the test set, used for the quantization steps later in the notebook. PyTorch's quantized models run on the CPU, so they are evaluated with this loader. It applies the same **ToTensor** and **Normalize** transformations as **transform_test** and uses the default single-process loading (no **num_workers** argument).
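
Two numbers in the list above are easy to verify by hand: the -1 to 1 range produced by **Normalize** with mean 0.5 and standard deviation 0.5, and the number of batches per epoch for the 10,000-sample subset. The sketch below is plain Python; the `normalize` helper is a stand-in written for this illustration, not the torchvision implementation.

```python
import math


def normalize(pixel, mean=0.5, std=0.5):
    """Per-channel formula used by Normalize: (pixel - mean) / std."""
    return (pixel - mean) / std


# ToTensor scales 8-bit pixel values into [0, 1] before Normalize runs
low, middle, high = normalize(0.0), normalize(0.5), normalize(1.0)
print(low, middle, high)  # -1.0 0.0 1.0

subset_size = 10_000
batch_size = 128
# The last batch is partial because the DataLoader keeps it by default
batches_per_epoch = math.ceil(subset_size / batch_size)
print(batches_per_epoch)  # 79
```

The batch count matters later: any logging or early-exit condition keyed to a batch index of 99 or higher never fires within a single epoch on this subset.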

In [None]:
# Data loading and preprocessing
def load_data(batch_size=128):
   transform_train = transforms.Compose([
       transforms.RandomCrop(32, padding=4),
       transforms.RandomHorizontalFlip(),
       transforms.ToTensor(),
       transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
   ])


   transform_test = transforms.Compose([
       transforms.ToTensor(),
       transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
   ])


   train_dataset = datasets.CIFAR10('./data', train=True, download=True, transform=transform_train)
   test_dataset = datasets.CIFAR10('./data', train=False, download=True, transform=transform_test)


   # For faster demonstration, use a subset of training data
   train_indices = list(range(len(train_dataset)))
   random.shuffle(train_indices)
   train_indices = train_indices[:10000]  # Use 10,000 samples for training


   train_subset = Subset(train_dataset, train_indices)


   train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=2)
   test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)


   return train_loader, test_loader


# Load data
batch_size = 128
train_loader, test_loader = load_data(batch_size)


# CPU dataloader for quantization operations
test_loader_cpu = DataLoader(
   datasets.CIFAR10('./data', train=False, download=True,
                    transform=transforms.Compose([
                        transforms.ToTensor(),
                        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                    ])),
   batch_size=batch_size, shuffle=False)

## Create Utilities for Evaluation

In [None]:
# Utility functions
def get_model_size(model):
   """Calculate the model size in MB"""
   torch.save(model.state_dict(), "temp.p")
   size = os.path.getsize("temp.p")/1e6
   os.remove('temp.p')
   return size


def evaluate_model(model, dataloader, device, num_batches=None):
   """Evaluate model accuracy"""
   model.eval()
   correct = 0
   total = 0


   with torch.no_grad():
       for i, (data, target) in enumerate(dataloader):
           if num_batches is not None and i >= num_batches:
               break


           data, target = data.to(device), target.to(device)
           outputs = model(data)
           _, predicted = torch.max(outputs.data, 1)
           total += target.size(0)
           correct += (predicted == target).sum().item()


   return correct / total


def measure_inference_time(model, dataloader, device, num_runs=100):
   """Measure inference time"""
   model.eval()


   # Get a batch for testing
   data_iter = iter(dataloader)
   batch, _ = next(data_iter)
   batch = batch.to(device)


   # Warmup
   with torch.no_grad():
       for _ in range(10):
           _ = model(batch)


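   # Measure inference time; CUDA kernels run asynchronously, so synchronize
   # before reading the clock to avoid timing only the kernel launches
   on_cuda = torch.device(device).type == "cuda"
   if on_cuda:
       torch.cuda.synchronize()
   start_time = time.perf_counter()
   with torch.no_grad():
       for _ in range(num_runs):
           _ = model(batch)
   if on_cuda:
       torch.cuda.synchronize()
   end_time = time.perf_counter()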


   avg_time = (end_time - start_time) / num_runs
   return avg_time




### Now Let's Train the Baseline Model


Training uses `Cross-Entropy Loss` as the classification loss function.

Cross-entropy loss, also known as log loss, is a function used in machine learning to measure the difference between two probability distributions. In classification tasks, it quantifies how well a model's predicted probabilities align with the actual (true) labels. It's particularly useful when dealing with multiple classes, and it penalizes confident misclassifications more heavily than uncertain ones. 

* If a model predicts a high probability for the correct class, the loss is low. 

* If a model predicts a high probability for an incorrect class, the loss is high. 

* The loss is the negative logarithm of the probability assigned to the true class, so the penalty for confident wrong predictions is much greater than for incorrect predictions with low confidence. 


##### Example:

Let's say you have a binary classification problem (e.g., cat vs. not cat). If the true label is "cat" (1), and the model predicts a probability of 0.9 for "cat", the cross-entropy loss will be relatively low. However, if the model predicts a probability of only 0.2 for "cat", the loss will be significantly higher, reflecting the low probability the model assigned to the correct class. 
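
The two cases in this example can be worked out directly. The sketch below uses only the standard library; `binary_cross_entropy` is a small helper written for this illustration and handles a single sample.

```python
import math


def binary_cross_entropy(p_cat, is_cat):
    """Loss for one sample: -log of the probability given to the true class."""
    p_true = p_cat if is_cat else 1.0 - p_cat
    return -math.log(p_true)


confident_right = binary_cross_entropy(0.9, is_cat=True)
mostly_wrong = binary_cross_entropy(0.2, is_cat=True)

print(f"{confident_right:.3f}")  # 0.105
print(f"{mostly_wrong:.3f}")     # 1.609
```

The second loss is roughly 15 times the first, even though the predicted probability dropped by less than a factor of five.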

##### Why use it?
Cross-entropy loss is a popular choice for classification problems because it:

* **Encourages confident predictions:** It pushes the model to make strong predictions, reducing uncertainty. 

* **Penalizes misclassifications effectively:** It penalizes confident errors more severely than uncertain errors. 

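* **Works well with _softmax activation_:** It's often used in conjunction with softmax in the output layer of neural networks. In PyTorch, `nn.CrossEntropyLoss` applies log-softmax internally, which is why the model above returns raw logits rather than probabilities. 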
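
For the multi-class case, softmax and cross-entropy are usually computed together from raw logits. The following is a minimal standard-library sketch of that combination for one sample; the function name and the three-class logits are made up for illustration, and it is not the PyTorch implementation.

```python
import math


def cross_entropy_from_logits(logits, target):
    """Log-softmax followed by negative log-likelihood for one sample."""
    largest = max(logits)  # subtract the max for numerical stability
    log_sum_exp = largest + math.log(sum(math.exp(z - largest) for z in logits))
    return log_sum_exp - logits[target]


logits = [2.0, 0.5, -1.0]  # hypothetical scores for three classes

loss_if_class_0 = cross_entropy_from_logits(logits, target=0)
loss_if_class_2 = cross_entropy_from_logits(logits, target=2)

print(f"true class has the highest logit: {loss_if_class_0:.4f}")
print(f"true class has the lowest logit:  {loss_if_class_2:.4f}")
```

The two losses differ by exactly the gap between the two logits (3.0 here), because the log-sum-exp term is shared.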


In [None]:
def train_model(model, train_loader, optimizer, criterion, device, epochs=5):
   """Train the model"""
   model.train()
   training_losses = []


   for epoch in range(epochs):
       running_loss = 0.0
       for i, (inputs, labels) in enumerate(train_loader):
           inputs, labels = inputs.to(device), labels.to(device)


           # Zero the parameter gradients
           optimizer.zero_grad()


           # Forward + backward + optimize
           outputs = model(inputs)
           loss = criterion(outputs, labels)
           loss.backward()
           optimizer.step()


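           # Accumulate the loss for the per-epoch report
           running_loss += loss.item()


       # Report once per epoch: the 10,000-sample subset yields only 79 batches
       # at batch_size=128, so an "every 100 mini-batches" check would never fire
       epoch_loss = running_loss / len(train_loader)
       print(f'Epoch {epoch+1}: Loss = {epoch_loss:.4f}')
       training_losses.append(epoch_loss)


   return training_losses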

# Train and evaluate regular model (baseline)
print("\n" + "="*50)
print("BASELINE MODEL TRAINING AND EVALUATION")
print("="*50)


model_fp32 = CIFAR10CNN().to(train_device)
optimizer = optim.Adam(model_fp32.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()


print("Training baseline model...")
train_losses = train_model(
   model_fp32, train_loader, optimizer, criterion, train_device, epochs=3
)


# Evaluate on the same device used for training
print("Evaluating baseline model...")
fp32_accuracy = evaluate_model(model_fp32, test_loader, train_device)
fp32_inference_time = measure_inference_time(model_fp32, test_loader, train_device)
fp32_size = get_model_size(model_fp32)
print(f"FP32 Model Accuracy: {fp32_accuracy:.4f}")
print(f"FP32 Inference Time per batch: {fp32_inference_time*1000:.2f} ms")
print(f"FP32 Model Size: {fp32_size:.2f} MB")


# Save the model state dict for later use
torch.save(model_fp32.state_dict(), "fp32_model.pth")


# Move model to CPU for quantization operations
model_fp32_cpu = CIFAR10CNN()
model_fp32_cpu.load_state_dict(model_fp32.state_dict())
model_fp32_cpu.eval()


# For fair comparison, also measure FP32 performance on CPU
fp32_cpu_inference_time = measure_inference_time(model_fp32_cpu, test_loader_cpu, "cpu")
print(f"FP32 Inference Time on CPU: {fp32_cpu_inference_time*1000:.2f} ms")

---
---

# Dynamic Quantization

---
---

This section assumes you've trained the baseline model and have the following variables available:

- model_fp32_cpu: The baseline CPU model
- test_loader_cpu: DataLoader for CPU testing
- fp32_accuracy: Baseline accuracy
- fp32_size: Baseline model size
- fp32_cpu_inference_time: Baseline CPU inference time
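
As a rough sanity check on the size numbers this section reports, the parameter count of `CIFAR10CNN` can be tallied by hand. The sketch below is plain Python and rests on two stated assumptions: only the linear-layer weights are stored as 8-bit integers, and file-format overhead, scales, and zero-points are ignored. The measured file sizes will therefore differ somewhat.

```python
# (name, weight_count, bias_count) for each layer of CIFAR10CNN
layers = [
    ("conv1", 3 * 64 * 3 * 3, 64),
    ("conv2", 64 * 128 * 3 * 3, 128),
    ("conv3", 128 * 256 * 3 * 3, 256),
    ("fc1", 256 * 4 * 4 * 1024, 1024),
    ("fc2", 1024 * 10, 10),
]

total_params = sum(weights + biases for _, weights, biases in layers)
fp32_bytes = total_params * 4  # 4 bytes per float32 value

# Assumption: each linear weight shrinks from 4 bytes to 1 byte
linear_weights = sum(w for name, w, _ in layers if name.startswith("fc"))
int8_bytes = fp32_bytes - linear_weights * 3

print(total_params)                                                # 4576394
print(f"{fp32_bytes / 1e6:.2f} MB -> {int8_bytes / 1e6:.2f} MB")   # 18.31 MB -> 5.69 MB
print(f"{(1 - int8_bytes / fp32_bytes) * 100:.1f}% smaller")       # 68.9% smaller
```

Most of the parameters sit in the first fully connected layer, so quantizing the linear layers alone already accounts for most of the possible size reduction.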


#### Special note for macOS users
Ensure PyTorch is built with quantization backends: verify that your PyTorch installation includes the necessary quantization backends. For CPU quantization, FBGEMM (for x86) and QNNPACK (for ARM) are common. If PyTorch was installed from source, ensure these backends were enabled during compilation. If installed via pip or conda, confirm you are using a build that includes them.

To list the engines available in your build, use:
```python
# Check which quantization engines (e.g. fbgemm, qnnpack) this build supports
import torch
print(torch.backends.quantized.supported_engines)
```

Explicitly set the quantization backend: although dynamic quantization often selects a backend automatically, setting it explicitly can sometimes fix this error:

`RuntimeError: Didn't find engine for operation quantized::linear_prepack NoQEngine`


#### Example: Set qnnpack as the default engine

```python
torch.backends.quantized.engine = 'qnnpack'
```
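
The FBGEMM-for-x86, QNNPACK-for-ARM rule of thumb can be turned into a small selection helper. The sketch below is a hypothetical helper written for this note, not part of PyTorch; it takes the list of supported engines as an argument, so it runs without PyTorch installed.

```python
import platform


def choose_quantized_engine(supported_engines, machine=None):
    """Prefer qnnpack on ARM and fbgemm elsewhere, if the build supports them.

    Hypothetical helper: `supported_engines` is expected to be the list from
    torch.backends.quantized.supported_engines.
    """
    machine = (machine or platform.machine()).lower()
    is_arm = machine.startswith(("arm", "aarch64"))
    preferred = ["qnnpack", "fbgemm"] if is_arm else ["fbgemm", "qnnpack"]
    for engine in preferred:
        if engine in supported_engines:
            return engine
    raise RuntimeError(f"No CPU quantization engine found in {supported_engines}")


arm_choice = choose_quantized_engine(["none", "qnnpack"], machine="arm64")
x86_choice = choose_quantized_engine(["none", "fbgemm", "qnnpack"], machine="x86_64")
print(arm_choice)  # qnnpack
print(x86_choice)  # fbgemm
```

In the notebook, the result could be assigned with `torch.backends.quantized.engine = choose_quantized_engine(torch.backends.quantized.supported_engines)`.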

In [None]:
# Post-Training Dynamic Quantization
print("\n" + "="*50)
print("POST-TRAINING DYNAMIC QUANTIZATION (CPU)")
print("="*50)


# Apply dynamic quantization on CPU model
print("Applying dynamic quantization...")
torch.backends.quantized.engine = "qnnpack"  # for x86 use "fbgemm"
model_dynamic = torch.quantization.quantize_dynamic(
   model_fp32_cpu,  # Use CPU model
   # nn.Conv2d is not in PyTorch's default dynamic quantization mapping,
   # so only the linear layers are quantized; conv layers stay FP32
   {nn.Linear},
   dtype=torch.qint8  # Use 8-bit integers
)


# Evaluate dynamically quantized model (always on CPU)
print("Evaluating dynamically quantized model on CPU...")
dynamic_accuracy = evaluate_model(model_dynamic, test_loader_cpu, "cpu")
dynamic_inference_time = measure_inference_time(model_dynamic, test_loader_cpu, "cpu")
dynamic_size = get_model_size(model_dynamic)


print(f"Dynamic Quantized Model Accuracy: {dynamic_accuracy:.4f}")
print(f"Accuracy Change: {(dynamic_accuracy - fp32_accuracy)*100:.2f}%")
print(f"Dynamic Quantized Inference Time (CPU): {dynamic_inference_time*1000:.2f} ms")
print(f"FP32 Inference Time (CPU): {fp32_cpu_inference_time*1000:.2f} ms")
print(f"Speedup vs FP32 on CPU: {fp32_cpu_inference_time/dynamic_inference_time:.2f}x")
print(f"Dynamic Quantized Model Size: {dynamic_size:.2f} MB")
print(f"Size Reduction: {(1 - dynamic_size/fp32_size)*100:.2f}%")


# Visualization for Dynamic Quantization vs FP32
plt.figure(figsize=(15, 5))


# Accuracy comparison
plt.subplot(1, 3, 1)
models = ['FP32', 'Dynamic Quantization']
accuracies = [fp32_accuracy, dynamic_accuracy]
plt.bar(models, accuracies, color=['blue', 'green'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.ylim(min(accuracies) - 0.05, 1.0)


# Size comparison
plt.subplot(1, 3, 2)
sizes = [fp32_size, dynamic_size]
bars = plt.bar(models, sizes, color=['blue', 'green'])
plt.title('Model Size (MB)')
plt.ylabel('Size (MB)')
# Add percentage reduction label
reduction = (1 - dynamic_size/fp32_size) * 100
plt.text(bars[1].get_x() + bars[1].get_width()/2, dynamic_size + 0.5,
        f"{reduction:.1f}% reduction",
        ha='center', va='bottom')


# Inference time comparison (all on CPU for fair comparison)
plt.subplot(1, 3, 3)
times = [fp32_cpu_inference_time*1000, dynamic_inference_time*1000]
bars = plt.bar(models, times, color=['blue', 'green'])
plt.title('Inference Time on CPU (ms)')
plt.ylabel('Time (ms)')
# Add speedup label
speedup = fp32_cpu_inference_time/dynamic_inference_time
plt.text(bars[1].get_x() + bars[1].get_width()/2, dynamic_inference_time*1000 + 1,
        f"{speedup:.1f}x faster",
        ha='center', va='bottom')


plt.tight_layout()
plt.savefig('dynamic_quantization_comparison.png')
plt.show()


---
---

# Static Quantization

---
---

This section assumes you've trained the baseline model and have the following variables available:

- model_fp32_cpu: The baseline CPU model
- test_loader_cpu: DataLoader for CPU testing
- fp32_accuracy: Baseline accuracy
- fp32_size: Baseline model size
- fp32_cpu_inference_time: Baseline CPU inference time
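
Static quantization fixes the activation scale and zero-point ahead of time from calibration data. The sketch below illustrates that idea in plain Python with a simple min/max observer and the usual affine mapping. It is a simplified stand-in for PyTorch's observers, and the helper names and sample activations are made up.

```python
def calibrate_min_max(batches):
    """Track the running min and max over calibration batches."""
    lo, hi = float("inf"), float("-inf")
    for batch in batches:
        lo, hi = min(lo, min(batch)), max(hi, max(batch))
    return lo, hi


def affine_qparams(lo, hi, qmin=0, qmax=255):
    """Scale and zero-point that map [lo, hi] onto the integers [qmin, qmax]."""
    lo, hi = min(lo, 0.0), max(hi, 0.0)  # the range must contain 0.0
    scale = (hi - lo) / (qmax - qmin)
    zero_point = round(qmin - lo / scale)
    return scale, zero_point


def quantize(x, scale, zero_point, qmin=0, qmax=255):
    """Round to the nearest grid point and clamp to the integer range."""
    return max(qmin, min(qmax, round(x / scale) + zero_point))


def dequantize(q, scale, zero_point):
    return (q - zero_point) * scale


batches = [[-0.5, 0.2, 1.0], [0.1, 2.0, -0.3]]  # hypothetical activations
lo, hi = calibrate_min_max(batches)
scale, zero_point = affine_qparams(lo, hi)

q = quantize(1.0, scale, zero_point)
restored = dequantize(q, scale, zero_point)
print(lo, hi, zero_point, q)
print(f"scale={scale:.5f}, restored={restored:.5f}")
```

Values outside the calibrated range are clamped, which is why the calibration batches should resemble the data seen at inference time.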


In [None]:
# Static Quantization

# Define a PTQ-ready model with proper quantization support
class QuantizablePTQCIFAR10CNN(nn.Module):
   def __init__(self):
       super(QuantizablePTQCIFAR10CNN, self).__init__()
       # Quantization stubs
       self.quant = torch.quantization.QuantStub()
       self.dequant = torch.quantization.DeQuantStub()


       # Feature extraction layers
       self.features = nn.Sequential(
           nn.Conv2d(3, 64, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),


           nn.Conv2d(64, 128, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),


           nn.Conv2d(128, 256, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),
       )


       # Classification layers
       self.classifier = nn.Sequential(
           nn.Flatten(),
           nn.Linear(256 * 4 * 4, 1024),
           nn.ReLU(inplace=True),
           nn.Linear(1024, 10)
       )


   def forward(self, x):
       x = self.quant(x)
       x = self.features(x)
       x = self.classifier(x)
       x = self.dequant(x)
       return x

print("\n" + "="*50)
print("POST-TRAINING STATIC QUANTIZATION (CPU)")
print("="*50)


# Create quantizable model on CPU
model_static = QuantizablePTQCIFAR10CNN()
# Load state dict from the trained model
model_static.load_state_dict(model_fp32_cpu.state_dict())
model_static.eval()


# Configure static quantization
model_static.qconfig = torch.quantization.get_default_qconfig("qnnpack") # for x86 use "fbgemm"


# Prepare for static quantization
print("Preparing model for static quantization...")
model_static_prepared = torch.quantization.prepare(model_static)


# Calibration function to determine optimal quantization parameters
def calibrate(model, data_loader, num_batches=10):
   model.eval()
   with torch.no_grad():
       for i, (data, _) in enumerate(data_loader):
           if i >= num_batches:
               break
           # No need to explicitly move to CPU since model is on CPU
           _ = model(data)


# Calibrate with sample data
print("Calibrating with test data...")
calibrate(model_static_prepared, test_loader_cpu)


# Convert to fully quantized model
print("Converting to quantized model...")
model_static_quantized = torch.quantization.convert(model_static_prepared)

# Evaluate statically quantized model on CPU
print("Evaluating statically quantized model on CPU...")
static_accuracy = evaluate_model(model_static_quantized, test_loader_cpu, "cpu")
static_inference_time = measure_inference_time(model_static_quantized, test_loader_cpu, "cpu")
static_size = get_model_size(model_static_quantized)


print(f"Static Quantized Model Accuracy: {static_accuracy:.4f}")
print(f"Accuracy Change: {(static_accuracy - fp32_accuracy)*100:.2f}%")
print(f"Static Quantized Inference Time (CPU): {static_inference_time*1000:.2f} ms")
print(f"Speedup vs FP32 on CPU: {fp32_cpu_inference_time/static_inference_time:.2f}x")
print(f"Static Quantized Model Size: {static_size:.2f} MB")
print(f"Size Reduction: {(1 - static_size/fp32_size)*100:.2f}%")


# Visualization for Static Quantization vs FP32
plt.figure(figsize=(15, 5))


# Accuracy comparison
plt.subplot(1, 3, 1)
models = ['FP32', 'Static Quantization']
accuracies = [fp32_accuracy, static_accuracy]
plt.bar(models, accuracies, color=['blue', 'orange'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.ylim(min(accuracies) - 0.05, 1.0)


# Size comparison
plt.subplot(1, 3, 2)
sizes = [fp32_size, static_size]
bars = plt.bar(models, sizes, color=['blue', 'orange'])
plt.title('Model Size (MB)')
plt.ylabel('Size (MB)')
# Add percentage reduction label
reduction = (1 - static_size/fp32_size) * 100
plt.text(bars[1].get_x() + bars[1].get_width()/2, static_size + 0.5,
        f"{reduction:.1f}% reduction",
        ha='center', va='bottom')


# Inference time comparison
plt.subplot(1, 3, 3)
times = [fp32_cpu_inference_time*1000, static_inference_time*1000]
bars = plt.bar(models, times, color=['blue', 'orange'])
plt.title('Inference Time on CPU (ms)')
plt.ylabel('Time (ms)')
# Add speedup label
speedup = fp32_cpu_inference_time/static_inference_time
plt.text(bars[1].get_x() + bars[1].get_width()/2, static_inference_time*1000 + 1,
        f"{speedup:.1f}x faster",
        ha='center', va='bottom')


plt.tight_layout()
plt.savefig('static_quantization_comparison.png')
plt.show()

---
---

# Quantization-Aware Training

---
---

This section assumes you've trained the baseline model and have the following variables available:

- model_fp32_cpu: The baseline CPU model
- test_loader_cpu: DataLoader for CPU testing
- fp32_accuracy: Baseline accuracy
- fp32_size: Baseline model size
- fp32_cpu_inference_time: Baseline CPU inference time
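
During QAT, fake-quantization steps are inserted so that the forward pass sees the rounding and clamping of 8-bit arithmetic while the weights themselves stay in floating point. The sketch below shows that quantize-then-dequantize round trip in plain Python; the scale, zero-point, and weights are hypothetical values, and this is not PyTorch's fake-quantize module.

```python
def fake_quantize(x, scale, zero_point, qmin=-128, qmax=127):
    """Snap x to the int8 grid, then map it straight back to float."""
    q = max(qmin, min(qmax, round(x / scale) + zero_point))
    return (q - zero_point) * scale


scale, zero_point = 0.05, 0  # hypothetical quantization parameters
weights = [0.012, -0.26, 0.731, 9.0]

snapped = [fake_quantize(w, scale, zero_point) for w in weights]
errors = [abs(w - s) for w, s in zip(weights, snapped)]

for w, s, e in zip(weights, snapped, errors):
    print(f"{w:>7.3f} -> {s:>7.3f} (error {e:.3f})")
```

In-range values move by at most half a step (`scale / 2`), while the out-of-range value 9.0 is clamped to `127 * scale`. Training with this noise in the loop gives the network a chance to adapt to it before the final conversion.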


In [None]:
# Define a QAT-ready model with proper quantization support
class QuantizableCIFAR10CNN(nn.Module):
   def __init__(self):
       super(QuantizableCIFAR10CNN, self).__init__()
       # Quantization stubs
       self.quant = torch.quantization.QuantStub()
       self.dequant = torch.quantization.DeQuantStub()


       # Feature extraction layers
       self.features = nn.Sequential(
           nn.Conv2d(3, 64, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),


           nn.Conv2d(64, 128, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),


           nn.Conv2d(128, 256, kernel_size=3, padding=1),
           nn.ReLU(inplace=True),
           nn.MaxPool2d(kernel_size=2),
       )


       # Classification layers
       self.classifier = nn.Sequential(
           nn.Flatten(),
           nn.Linear(256 * 4 * 4, 1024),
           nn.ReLU(inplace=True),
           nn.Linear(1024, 10)
       )


   def forward(self, x):
       x = self.quant(x)
       x = self.features(x)
       x = self.classifier(x)
       x = self.dequant(x)
       return x



# Quantization-Aware Training
print("\n" + "="*50)
print("QUANTIZATION-AWARE TRAINING")
print("="*50)


# Create new model for QAT
model_qat = QuantizableCIFAR10CNN().to(train_device)
model_qat.load_state_dict(model_fp32.state_dict())


# Configure optimizer with lower learning rate for fine-tuning
optimizer_qat = optim.Adam(model_qat.parameters(), lr=0.0001)
criterion_qat = nn.CrossEntropyLoss()


# Initial training in FP32
print("Initial training phase (FP32)...")
model_qat.train()
for i, (inputs, labels) in enumerate(train_loader):
   if i >= 200:  # Batch cap; the 10,000-sample subset has only 79 batches, so this runs one full pass
       break


   inputs, labels = inputs.to(train_device), labels.to(train_device)
   optimizer_qat.zero_grad()
   outputs = model_qat(inputs)
   loss = criterion_qat(outputs, labels)
   loss.backward()
   optimizer_qat.step()


# Move to CPU for QAT preparation
model_qat = model_qat.cpu()


# Prepare for QAT
print("Preparing for QAT...")
# Match the qconfig to the active backend instead of hard-coding 'fbgemm'
model_qat.qconfig = torch.quantization.get_default_qat_qconfig(torch.backends.quantized.engine)
torch.quantization.prepare_qat(model_qat, inplace=True)


# Move back to training device
model_qat = model_qat.to(train_device)


# QAT training
print("QAT training phase...")
model_qat.train()
qat_losses = []
for epoch in range(2):  # 2 epochs of QAT
   running_loss = 0.0
   for i, (inputs, labels) in enumerate(train_loader):
       inputs, labels = inputs.to(train_device), labels.to(train_device)


       optimizer_qat.zero_grad()
       outputs = model_qat(inputs)
       loss = criterion_qat(outputs, labels)
       loss.backward()
       optimizer_qat.step()


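       running_loss += loss.item()


       if i >= 300:  # Limit batches for demonstration
           break


   # Report once per epoch: with only 79 batches per epoch, a check on
   # every 100th mini-batch would never fire and qat_losses would stay empty
   epoch_loss = running_loss / (i + 1)
   print(f'QAT Epoch {epoch+1}: Loss = {epoch_loss:.4f}')
   qat_losses.append(epoch_loss)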


# Evaluate QAT model before final conversion
print("Evaluating QAT model before conversion...")
qat_fp32_accuracy = evaluate_model(model_qat, test_loader, train_device)
print(f"QAT Model Accuracy (before conversion): {qat_fp32_accuracy:.4f}")


# Convert QAT model to fully quantized model (on CPU)
print("Converting QAT model to deployable quantized model...")
model_qat = model_qat.cpu()
model_qat_quantized = torch.quantization.convert(model_qat)


# Evaluate final QAT model
print("Evaluating final QAT model on CPU...")
qat_accuracy = evaluate_model(model_qat_quantized, test_loader_cpu, "cpu")
qat_inference_time = measure_inference_time(model_qat_quantized, test_loader_cpu, "cpu")
qat_size = get_model_size(model_qat_quantized)


print(f"QAT Model Accuracy: {qat_accuracy:.4f}")
print(f"Accuracy Change vs FP32: {(qat_accuracy - fp32_accuracy)*100:.2f}%")
print(f"QAT Model Inference Time (CPU): {qat_inference_time*1000:.2f} ms")
print(f"Speedup vs FP32 on CPU: {fp32_cpu_inference_time/qat_inference_time:.2f}x")
print(f"QAT Model Size: {qat_size:.2f} MB")
print(f"Size Reduction vs FP32: {(1 - qat_size/fp32_size)*100:.2f}%")


# Visualization for QAT vs FP32
plt.figure(figsize=(15, 5))


# Accuracy comparison
plt.subplot(1, 3, 1)
models = ['FP32', 'QAT']
accuracies = [fp32_accuracy, qat_accuracy]
plt.bar(models, accuracies, color=['blue', 'red'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.ylim(min(accuracies) - 0.05, 1.0)


# Size comparison
plt.subplot(1, 3, 2)
sizes = [fp32_size, qat_size]
bars = plt.bar(models, sizes, color=['blue', 'red'])
plt.title('Model Size (MB)')
plt.ylabel('Size (MB)')
# Add percentage reduction label
reduction = (1 - qat_size/fp32_size) * 100
plt.text(bars[1].get_x() + bars[1].get_width()/2, qat_size + 0.5,
        f"{reduction:.1f}% reduction",
        ha='center', va='bottom')


# Inference time comparison
plt.subplot(1, 3, 3)
times = [fp32_cpu_inference_time*1000, qat_inference_time*1000]
bars = plt.bar(models, times, color=['blue', 'red'])
plt.title('Inference Time on CPU (ms)')
plt.ylabel('Time (ms)')
# Add speedup label
speedup = fp32_cpu_inference_time/qat_inference_time
plt.text(bars[1].get_x() + bars[1].get_width()/2, qat_inference_time*1000 + 1,
        f"{speedup:.1f}x faster",
        ha='center', va='bottom')


plt.tight_layout()
plt.savefig('qat_comparison.png')
plt.show()

---
---

# Comparing Quantization Techniques

---
---

This section assumes you have run all previous parts of the notebook, so that these variables are defined:

- `fp32_accuracy`, `fp32_size`, `fp32_cpu_inference_time` (FP32 baseline)
- `dynamic_accuracy`, `dynamic_size`, `dynamic_inference_time` (dynamic quantization)
- `static_accuracy`, `static_size`, `static_inference_time` (static quantization)
- `qat_accuracy`, `qat_size`, `qat_inference_time` (QAT)
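
If you are unsure whether every earlier part has been run, a quick check like the one below can report what is missing before the comparison cell fails with a `NameError`. This is a minimal sketch: the helper `find_missing` and the list `REQUIRED_RESULTS` are names introduced here for illustration and are not part of the original notebook.

```python
# Names the comparison cell expects to find in the notebook's namespace
REQUIRED_RESULTS = [
    "fp32_accuracy", "fp32_size", "fp32_cpu_inference_time",
    "dynamic_accuracy", "dynamic_size", "dynamic_inference_time",
    "static_accuracy", "static_size", "static_inference_time",
    "qat_accuracy", "qat_size", "qat_inference_time",
]


def find_missing(namespace, names=REQUIRED_RESULTS):
    """Return the required names that are not defined in `namespace`."""
    return [name for name in names if name not in namespace]


missing = find_missing(globals())
if missing:
    print("Run the earlier sections first; missing:", ", ".join(missing))
else:
    print("All quantization results are available.")
```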


In [None]:
# Results Comparison
print("\n" + "="*80)
print("QUANTIZATION COMPARISON SUMMARY")
print("="*80)
print(f"{'Model Type':<25} {'Size (MB)':<15} {'Accuracy':<15} {'Inference (ms)':<15} {'Speedup':<10}")
print(f"{'-'*80}")
# For fair comparison, use CPU times for all models
rows = [('Original FP32 (CPU)', fp32_size, fp32_accuracy, fp32_cpu_inference_time),
        ('Dynamic Quantization', dynamic_size, dynamic_accuracy, dynamic_inference_time),
        ('Static Quantization', static_size, static_accuracy, static_inference_time),
        ('QAT', qat_size, qat_accuracy, qat_inference_time)]
for name, size, acc, seconds in rows:
   # Format the speedup with its unit first so the "x" stays attached when padded
   speedup = f"{fp32_cpu_inference_time/seconds:.2f}x"
   print(f"{name:<25} {size:<15.2f} {acc:<15.4f} {seconds*1000:<15.2f} {speedup:<10}")
print("="*80)


# Create a comprehensive visualization
plt.figure(figsize=(20, 15))


# Model Size Comparison
plt.subplot(2, 2, 1)
models = ['FP32', 'Dynamic', 'Static', 'QAT']
colors = ['blue', 'green', 'orange', 'red']
sizes = [fp32_size, dynamic_size, static_size, qat_size]
bars = plt.bar(models, sizes, color=colors)
plt.title('Model Size (MB)', fontsize=14)
plt.ylabel('Size (MB)', fontsize=12)


# Add reduction percentages
for i, bar in enumerate(bars[1:], 1):
   reduction = (1 - sizes[i]/sizes[0]) * 100
   plt.text(bar.get_x() + bar.get_width()/2, sizes[i] + 0.5,
            f"{reduction:.1f}%\nreduction",
            ha='center', va='bottom', fontsize=11)


# Accuracy Comparison
plt.subplot(2, 2, 2)
accuracies = [fp32_accuracy, dynamic_accuracy, static_accuracy, qat_accuracy]
bars = plt.bar(models, accuracies, color=colors)
plt.title('Model Accuracy', fontsize=14)
plt.ylabel('Accuracy', fontsize=12)
plt.ylim(min(accuracies) - 0.05, 1.0)


# Add accuracy change labels (in percentage points relative to FP32)
for i, bar in enumerate(bars[1:], 1):
   change = (accuracies[i] - accuracies[0]) * 100
   color = 'green' if change >= 0 else 'red'
   prefix = '+' if change >= 0 else ''
   plt.text(bar.get_x() + bar.get_width()/2, accuracies[i] + 0.01,
            f"{prefix}{change:.2f} pp",
            ha='center', va='bottom', fontsize=11, color=color)


# Inference Time Comparison
plt.subplot(2, 2, 3)
times = [fp32_cpu_inference_time*1000, dynamic_inference_time*1000,
        static_inference_time*1000, qat_inference_time*1000]
bars = plt.bar(models, times, color=colors)
plt.title('Inference Time on CPU (ms)', fontsize=14)
plt.ylabel('Time (ms)', fontsize=12)


# Add speedup labels (FP32 time divided by each quantized model's time)
for i, bar in enumerate(bars[1:], 1):
   speedup = times[0] / times[i]
   plt.text(bar.get_x() + bar.get_width()/2, times[i]/2,
            f"{speedup:.2f}x\nfaster",
            ha='center', va='center', fontsize=11, color='white', weight='bold')


# Size-Speed-Accuracy Trade-off
plt.subplot(2, 2, 4)
plt.scatter(sizes, times, s=np.array(accuracies)*500, c=colors, alpha=0.7)


# Add labels for each point
for i, model in enumerate(models):
   plt.annotate(model,
              (sizes[i], times[i]),
              textcoords="offset points",
              xytext=(0,10),
              ha='center', fontsize=12)


plt.title('Size vs. Speed vs. Accuracy Trade-off', fontsize=14)
plt.xlabel('Model Size (MB)', fontsize=12)
plt.ylabel('Inference Time (ms)', fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)


# Add a legend for bubble size
plt.figtext(0.85, 0.25, "Bubble size represents accuracy", fontsize=12)


plt.tight_layout()
plt.savefig('quantization_methods_comparison.png')
plt.show()
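
The three derived numbers shown in the table and charts above (size reduction, accuracy change, and speedup) all come from simple ratios against the FP32 baseline. The sketch below gathers that arithmetic in one place. The function `summarize`, the dictionary keys, and the sample numbers are hypothetical and chosen only to make the arithmetic easy to follow; they are not results from this notebook.

```python
def summarize(baseline, candidate):
    """Compare a quantized model against the FP32 baseline.

    Each argument is a dict with 'size_mb', 'accuracy' (0 to 1) and 'time_s'.
    """
    return {
        # Fraction of the baseline size that was removed, as a percentage
        "size_reduction_pct": (1 - candidate["size_mb"] / baseline["size_mb"]) * 100,
        # Difference in accuracy, expressed in percentage points
        "accuracy_change_pp": (candidate["accuracy"] - baseline["accuracy"]) * 100,
        # How many times faster the candidate is than the baseline
        "speedup": baseline["time_s"] / candidate["time_s"],
    }


# Hypothetical numbers for illustration only
fp32 = {"size_mb": 40.0, "accuracy": 0.80, "time_s": 0.020}
int8 = {"size_mb": 10.0, "accuracy": 0.79, "time_s": 0.010}
print(summarize(fp32, int8))
```

A speedup below 1.0 means the quantized model is slower than the baseline, which can happen on hardware without fast int8 kernels, so it is worth reading that column before assuming quantization helped.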
