# Neural Network Quantization


In this assignment, the goal is to reduce the model size of a deep neural network. This action will result on lighter and faster model. We will rely on the PyTorch functionalities for quantizing neural networks. The autonomous driving models from the previous assignments will be used for that purpose. 

In our context the process of quantization will convert the floating point parameters (32-bit, single precision) to integer parameters.

Note that all scripts should be self-contained and executed on *any* machine that has required libraries installed.

**Important**: There is a helpful tutorial on quantization at [https://pytorch.org/tutorials/advanced/static_quantization_tutorial.html](https://pytorch.org/tutorials/advanced/static_quantization_tutorial.html).



## 1. Evaluation Metrics

You will make use of the feed-forward neural network from the autonomous driving assignment. The goal is to analyze is in terms of of inference time, [FLOPS](https://pypi.org/project/thop/) (floating operations), [model size](https://discuss.pytorch.org/t/finding-model-size/130275) (in MB) and accuracy (classification problem). This task does not require training. A pre-trained model from the previous assignments can be employed.

*Task Output*: The feed-forward model from the autonomous driving assignment should be used in order to compute the execution time, FLOPS, model size and accuracy. For that reason, one function for each metric should be created. The same functions will be later used for evaluating the quantized model.

*Important*: The scripts should be **self-contained**.

In [1]:
# datasetclass with augmentations for CNN model

import torch
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import random
from torchvision import transforms
from PIL import Image

class CarRacingDataset(Dataset):
    def __init__(self, npz_file, augmentations=False, horizontal_flip=False,
                 random_rotation=False, vertical_flip=False, street_color_change=False):
        """
        Args:
            npz_file (str): Path to the .npz file containing 'frames' and 'actions'.
            augmentations (bool): Whether to apply augmentations.
        """
        data = np.load(npz_file)
        self.frames = data['frames']
        self.actions = data['actions']
        self.augmentations = augmentations
        self.horizontal_flip = horizontal_flip
        self.random_rotation = random_rotation
        self.vertical_flip = vertical_flip
        self.street_color_change = street_color_change
        self.to_pil = transforms.ToPILImage()
        self.to_tensor = transforms.ToTensor()

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        frame = self.frames[idx]
        action = self.actions[idx].copy()  # Prevent in-place edit

        image = self.to_pil(frame)

        # Horizontal flip: must flip steering direction
        if random.random() < 0.2 and self.horizontal_flip and self.augmentations:
            image = image.transpose(Image.FLIP_LEFT_RIGHT)
            action[0] = -action[0]  # Flip steering

        # Random rotation (small angles)
        if random.random() < 0.2 and self.random_rotation and self.augmentations:
            angle = random.uniform(-20, 20)
            image = image.rotate(angle)

        # Vertical flip (no action change)
        if random.random() < 0.2 and self.vertical_flip and self.augmentations:
            image = image.transpose(Image.FLIP_TOP_BOTTOM)

        # Street color change
        if random.random() < 0.2 and self.street_color_change and self.augmentations:
            gray_min = np.array([100, 100, 100])
            gray_max = np.array([150, 150, 150])
            brown_min = np.array([90, 60, 30])
            brown_max = np.array([150, 100, 60])
            img_np = np.array(image)
            mask = np.all((img_np >= gray_min) & (img_np <= gray_max), axis=2)
            brown_color = np.array([random.randint(brown_min[i], brown_max[i]) for i in range(3)], dtype=np.uint8)
            img_np[mask] = brown_color
            image = Image.fromarray(img_np)

        image = self.to_tensor(image)
        action = torch.tensor(action, dtype=torch.float32)
        return image, action

In [2]:
#define CNN network architecture used as before

import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import numpy as np

# Define CNN model for 96x96 images and 3 value o/p
class CarCNN(nn.Module):
    def __init__(self):
        super(CarCNN, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, 5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv2d(64, 128, 5, stride=2, padding=2),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(128 * 12 * 12, 128),
            nn.ReLU(),
            nn.Linear(128, 3),      # Output: [steer, gas, brake]
            nn.Tanh()               # Ensure outputs in [-1, 1]
        )

    def forward(self, x):
        return self.net(x)

In [3]:
import torch
import os
import time
from thop import profile

def measure_inference_time(model, dataloader, device, num_batches=10):
    total_time = 0
    with torch.no_grad():
        for i, (inputs, _) in enumerate(dataloader):
            if i >= num_batches:
                break
            inputs = inputs.to(device)
            start = time.time()
            _ = model(inputs)
            end = time.time()
            total_time += (end - start)
    avg_time = total_time / num_batches
    return avg_time

# Function 2: Compute FLOPs and parameters
def compute_flops(model, input_size, device):
    dummy_input = torch.randn(*input_size).to(device)
    flops, params = profile(model, inputs=(dummy_input,), verbose=False)
    return flops, params

# Function 3: Compute model size in MB
def compute_model_size(model_path):
    size_bytes = os.path.getsize(model_path)
    size_mb = size_bytes / (1024 * 1024)
    return size_mb

# Function 4: Compute regression MSE instead of classification accuracy
def compute_regression_mse(model, dataloader, device):
    model.eval()
    total_mse = 0
    count = 0
    criterion = nn.MSELoss()
    
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            total_mse += loss.item() * inputs.size(0)
            count += inputs.size(0)
    
    return total_mse / count


In [4]:
#calculate the metrics for the CNN model
from torch.utils.data import DataLoader

#load the model
model_path = "car_cnn_final_augmented.pth"
device = torch.device("cuda" if torch.cuda.is_available else "CPU")
model = CarCNN().to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

npz_file_path = "continuous_15k_dataset.npz"
# create the Dataset
dataset = CarRacingDataset(
    npz_file=npz_file_path,
    augmentations=True, #making augmentations to false here renders the other parameters useless
    horizontal_flip=False,
    vertical_flip=False,
    street_color_change=True
)

testLoader = DataLoader(dataset, batch_size=32, shuffle=False)
# Measure inference time
inference_time = measure_inference_time(model, testLoader, device)
print(f"Inference time per batch: {inference_time:.4f} seconds")
# Compute FLOPs
input_size = (1, 3, 96, 96)  # Batch size 1, 3 channels, 96x96 image
flops, _ = compute_flops(model, input_size, device)
print(f"FLOPs: {flops / 1e6:.2f} Mega FLOPs")  # Convert to GFLOPs
# Compute model size
model_size = compute_model_size(model_path)
print(f"Model size: {model_size:.2f} MB")
# Compute accuracy on test set
mse = compute_regression_mse(model, testLoader, device)
print(f"Mean Squared Error for regression: {mse:.4f}")


  model.load_state_dict(torch.load(model_path, map_location=device))


Inference time per batch: 0.0170 seconds
FLOPs: 66.87 Mega FLOPs
Model size: 9.99 MB
Mean Squared Error for regression: 0.0312


In [5]:
# datasetclass with augmentations for CNN + RNN model

import torch
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import random
from torchvision import transforms
from PIL import Image

class CarRacingEpisodeDataset(Dataset):
    def __init__(self, npz_path, sequence_length=10,
                 augmentations=True, horizontal_flip=True,
                 random_rotation=True, vertical_flip=False,
                 street_color_change=False):
        data = np.load(npz_path, allow_pickle=True)
        raw_episodes = data["episodes"]

        # Unpack all episodes into frame-action pairs with episode index
        self.sequences = []  # Each entry: (episode_idx, start_idx)
        self.episodes = []

        for episode_idx, (frames, actions) in enumerate(raw_episodes):
            frames = np.array(frames)
            actions = np.array(actions)
            self.episodes.append((frames, actions))
            if len(frames) >= sequence_length:
                for i in range(len(frames) - sequence_length + 1):
                    self.sequences.append((episode_idx, i))

        self.sequence_length = sequence_length
        self.augmentations = augmentations
        self.horizontal_flip = horizontal_flip
        self.random_rotation = random_rotation
        self.vertical_flip = vertical_flip
        self.street_color_change = street_color_change

        self.to_pil = transforms.ToPILImage()
        self.to_tensor = transforms.ToTensor()

    def __len__(self):
        return len(self.sequences)

    def _apply_horizontal_flip(self, image, action):
        if random.random() < 0.2 and self.horizontal_flip and self.augmentations:
            image = image.transpose(Image.FLIP_LEFT_RIGHT)
            action[0] = -action[0]  # flip steering
        return image, action

    def _apply_random_rotation(self, image):
        if random.random() < 0.2 and self.random_rotation and self.augmentations:
            angle = random.uniform(-20, 20)
            return image.rotate(angle)
        return image

    def _apply_vertical_flip(self, image):
        if random.random() < 0.2 and self.vertical_flip and self.augmentations:
            return image.transpose(Image.FLIP_TOP_BOTTOM)
        return image

    def _apply_street_color_change(self, image):
        if random.random() < 0.2 and self.street_color_change and self.augmentations:
            gray_min = np.array([100, 100, 100])
            gray_max = np.array([150, 150, 150])
            brown_min = np.array([90, 60, 30])
            brown_max = np.array([150, 100, 60])
            img_np = np.array(image)
            mask = np.all((img_np >= gray_min) & (img_np <= gray_max), axis=2)
            brown_color = np.array([random.randint(brown_min[i], brown_max[i]) for i in range(3)], dtype=np.uint8)
            img_np[mask] = brown_color
            return Image.fromarray(img_np)
        return image

    def __getitem__(self, idx):
        episode_idx, start_idx = self.sequences[idx]
        frames, actions = self.episodes[episode_idx]
        frames_seq = []
        actions_seq = []

        for i in range(self.sequence_length):
            frame = frames[start_idx + i]
            action = actions[start_idx + i].copy()

            image = self.to_pil(frame)
            image, action = self._apply_horizontal_flip(image, action)
            image = self._apply_random_rotation(image)
            image = self._apply_vertical_flip(image)
            image = self._apply_street_color_change(image)

            image = self.to_tensor(image)
            frames_seq.append(image)
            actions_seq.append(torch.tensor(action, dtype=torch.float32))

        return torch.stack(frames_seq), torch.stack(actions_seq)  # [T, C, H, W], [T, 3]


In [6]:
#define RNN network architecture used as before

import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Subset
import numpy as np

class CarCNN_RNN(nn.Module):
    def __init__(self, hidden_size=128, num_layers=1):
        super(CarCNN_RNN, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2, padding=2),  # 96x96 -> 48x48
            nn.ReLU(),
            nn.Conv2d(32, 64, 5, stride=2, padding=2),  # 48x48 -> 24x24
            nn.ReLU(),
            nn.Conv2d(64, 128, 5, stride=2, padding=2),  # 24x24 -> 12x12
            nn.ReLU()
        )
        self.feature_size = 128 * 12 * 12
        self.rnn = nn.GRU(self.feature_size, hidden_size, num_layers, batch_first=True)
        # Output: [steer, gas, brake]
        # It's often better to have separate heads for different action types
        self.fc_steer = nn.Linear(hidden_size, 1)
        self.fc_gas = nn.Linear(hidden_size, 1)
        self.fc_brake = nn.Linear(hidden_size, 1)

    def forward(self, x):
        B, T, C, H, W = x.size()
        cnn_features = []
        for t in range(T):
            out = self.cnn(x[:, t])  # [B, 128, 12, 12]
            out = out.reshape(B, -1) # [B, 18432]
            cnn_features.append(out)
        cnn_features = torch.stack(cnn_features, dim=1)  # [B, T, feature_size]
        rnn_out, _ = self.rnn(cnn_features)              # [B, T, hidden_size]

        # Apply activations per action type
        steer = torch.tanh(self.fc_steer(rnn_out))  # [-1, 1]
        gas = torch.sigmoid(self.fc_gas(rnn_out))    # [0, 1]
        brake = torch.sigmoid(self.fc_brake(rnn_out)) # [0, 1]

        # Concatenate outputs
        out = torch.cat((steer, gas, brake), dim=-1) # [B, T, 3]
        return out

In [7]:
#calculate the metrics for the RNN model

from torch.utils.data import DataLoader

#load the model
model_path = "unfiltered_IP_cnn_rnn_final_continuous.pth"
device = torch.device("cuda" if torch.cuda.is_available else "CPU")
model = CarCNN_RNN().to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

npz_file_path = "Unfiltered_IP_CNN_RNN_continuous_15k_dataset.npz"
# create the Dataset
dataset = CarRacingEpisodeDataset(npz_path=npz_file_path)

testLoader = DataLoader(dataset, batch_size=32, shuffle=False)
# Measure inference time
inference_time = measure_inference_time(model, testLoader, device)
print(f"Inference time per batch: {inference_time:.4f} seconds")
# Compute FLOPs
input_size = (1, 10, 3, 96, 96)  # Batch size 1, sequence length 10, 3 channels, 96x96 image
flops, _ = compute_flops(model, input_size, device)
print(f"FLOPs: {flops / 1e6:.2f} Mega FLOPs")  # Convert to GFLOPs
# Compute model size
model_size = compute_model_size(model_path)
print(f"Model size: {model_size:.2f} MB")
# Compute accuracy on test set
mse = compute_regression_mse(model, testLoader, device)
print(f"Mean Squared Error for regression: {mse:.4f}")

  model.load_state_dict(torch.load(model_path, map_location=device))


Inference time per batch: 0.0089 seconds
FLOPs: 716.41 Mega FLOPs
Model size: 28.18 MB
Mean Squared Error for regression: 0.2729


## 2. Static Quantization

In this task, the parameters of the feed forward model will be quantized. To reach this goal, experimental functions of PyTorch will be used such as: `torch.quantization`. The quantization is static and thus it does not include any training process. 

Check PyTorch tutorial on [quantization](https://pytorch.org/docs/stable/quantization.html#post-training-static-quantization).

*Task Output*: The weights of the model are float variables. They should be converted to int. Then the execution time, FLOPS, model size and accuracy should be computed and compared to the original model.

*Important*: Quantization is possible on the eager mode of PyTorch. This requires to install another version of PyTorch.

*Important*: The scripts should be **self-contained**.

In [8]:
#define quantised CNN network architecture with quantisation stubs

import torch
import torch.nn as nn
import torch.optim as optim
import torch.quantization
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import numpy as np

# Define quantised CNN model for 96x96 images and 3 value o/p
class QuantizedCarCNN(nn.Module):
    def __init__(self):
        super(QuantizedCarCNN, self).__init__()
        self.quant = torch.quantization.QuantStub()
        self.dequant = torch.quantization.DeQuantStub()
        self.net = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, 5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv2d(64, 128, 5, stride=2, padding=2),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(128 * 12 * 12, 128),
            nn.ReLU(),
            nn.Linear(128, 3),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.quant(x)
        x = self.net(x)
        x = self.dequant(x)
        return x

In [9]:
#calculate the metrics for the quantised CNN model
from torch.utils.data import DataLoader

#load the model
model_path = "car_cnn_final_augmented.pth"
device_cpu = torch.device('cpu')

model_fp32  = QuantizedCarCNN().to(device_cpu)
model_fp32 .load_state_dict(torch.load(model_path, map_location=device_cpu, weights_only=True))
#move the model to cpu for quantization
model_fp32 .eval()
model_fp32 .to('cpu')

# Set quantization config
model_fp32.qconfig = torch.quantization.get_default_qconfig('fbgemm')

# Prepare model for static quantization
torch.quantization.prepare(model_fp32, inplace=True)

npz_file_path = "continuous_15k_dataset.npz"
# create the Dataset
dataset = CarRacingDataset(
    npz_file=npz_file_path,
    augmentations=True, #making augmentations to false here renders the other parameters useless
    horizontal_flip=False,
    vertical_flip=False,
    street_color_change=True
)

testLoader = DataLoader(dataset, batch_size=32, shuffle=False)

# Calibration with representative data
with torch.no_grad():
    for images, _ in testLoader:
        model_fp32(images)  # no labels needed
        break  # Only a small batch is needed for calibration

# Convert to quantized model
model_quantized = torch.quantization.convert(model_fp32, inplace=False)

# Move quantized model to CPU
model_quantized.to(device_cpu)

# Measure inference time
inference_time = measure_inference_time(model_quantized, testLoader, device_cpu)
print(f"[Quantized] Inference time per batch: {inference_time:.4f} seconds")
# Compute FLOPs
input_size = (1, 3, 96, 96)  # Batch size 1, 3 channels, 96x96 image
flops, _ = compute_flops(model_quantized, input_size, device_cpu)
print(f"[Quantized] FLOPs: {flops / 1e6:.2f} Mega FLOPs")  # Convert to GFLOPs
# Compute model size
quantized_model_path = "car_cnn_quantized.pth"
torch.save(model_quantized.state_dict(), quantized_model_path)
model_size = compute_model_size(quantized_model_path)
print(f"[Quantized] Model size: {model_size:.2f} MB")
# Compute accuracy on test set
mse = compute_regression_mse(model_quantized, testLoader, device_cpu)
print(f"[Quantized] Mean Squared Error for regression: {mse:.4f}")




[Quantized] Inference time per batch: 0.0140 seconds
[Quantized] FLOPs: 0.00 Mega FLOPs
[Quantized] Model size: 2.52 MB
[Quantized] Mean Squared Error for regression: 0.0607


## 3. Quantization-Aware Training

In this task, the parameters of the feed forward model will be quantized and trained at the same time. To reach this goal, experimental functions of PyTorch will be used such as: `torch.quantization`. This type of training is called Quantization-aware training (QAT). 

Check PyTorch tutorial on [quantization](https://pytorch.org/docs/stable/quantization.html#post-training-static-quantization)

*Task Output*: The already quantized model from the previous will be use to conduct the training process. The model should be trained until convergence. Then, then the execution time, FLOPS, model size and accuracy should be computed and compared to the static quantization and the original model.

*Important*: Quantization is possible on the eager mode of PyTorch. This requires to install another version of PyTorch.

*Important*: The scripts should be **self-contained**.

In [10]:
device = torch.device("cpu")  # QAT requires CPU execution

QATmodel = QuantizedCarCNN().to(device)

# Load original FP32 weights
quantised_weights = torch.load("car_cnn_final_augmented.pth", map_location=device)
QATmodel.load_state_dict(quantised_weights)

# Set QAT config
QATmodel.qconfig = torch.quantization.get_default_qat_qconfig("fbgemm")
torch.quantization.prepare_qat(QATmodel, inplace=True)

  quantised_weights = torch.load("car_cnn_final_augmented.pth", map_location=device)


QuantizedCarCNN(
  (quant): QuantStub(
    (activation_post_process): FusedMovingAvgObsFakeQuantize(
      fake_quant_enabled=tensor([1]), observer_enabled=tensor([1]), scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32), dtype=torch.quint8, quant_min=0, quant_max=127, qscheme=torch.per_tensor_affine, reduce_range=True
      (activation_post_process): MovingAverageMinMaxObserver(min_val=inf, max_val=-inf)
    )
  )
  (dequant): DeQuantStub()
  (net): Sequential(
    (0): Conv2d(
      3, 32, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2)
      (weight_fake_quant): FusedMovingAvgObsFakeQuantize(
        fake_quant_enabled=tensor([1]), observer_enabled=tensor([1]), scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32), dtype=torch.qint8, quant_min=-128, quant_max=127, qscheme=torch.per_channel_symmetric, reduce_range=False
        (activation_post_process): MovingAveragePerChannelMinMaxObserver(min_val=tensor([]), max_val=tensor([]))
      )
      (activation_post_pr

In [11]:
#training of the QAT model

# Dataset
dataset = CarRacingDataset(
    npz_file="continuous_15k_dataset.npz",
    augmentations=False
)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# Optimizer & Loss
optimizer = optim.Adam(QATmodel.parameters(), lr=1e-4)
criterion = nn.MSELoss()

# QAT Training Loop
QATmodel.train()
num_epochs = 10  # Increase until convergence
for epoch in range(num_epochs):
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = QATmodel(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"[Epoch {epoch+1}] Loss: {running_loss/len(train_loader):.4f}")

[Epoch 1] Loss: 0.0171
[Epoch 2] Loss: 0.0163
[Epoch 3] Loss: 0.0158
[Epoch 4] Loss: 0.0154
[Epoch 5] Loss: 0.0151
[Epoch 6] Loss: 0.0149
[Epoch 7] Loss: 0.0144
[Epoch 8] Loss: 0.0143
[Epoch 9] Loss: 0.0139
[Epoch 10] Loss: 0.0136


In [12]:
# calculate the metrics for the QAT model

#convert model to quantized version
QATmodel.eval()
QATmodel_quantized = torch.quantization.convert(QATmodel.eval(), inplace=False)

# Time
inference_time = measure_inference_time(QATmodel_quantized, train_loader, device)
print(f"[QAT] Inference time per batch: {inference_time:.4f} seconds")

# FLOPs
input_size = (1, 3, 96, 96)
flops, _ = compute_flops(QATmodel_quantized, input_size, device)
print(f"[QAT] FLOPs: {flops / 1e6:.2f} Mega FLOPs")

# Size
qat_model_path = "car_cnn_qat.pth"
torch.save(QATmodel_quantized.state_dict(), qat_model_path)
model_size = compute_model_size(qat_model_path)
print(f"[QAT] Model size: {model_size:.2f} MB")

# Accuracy (MSE)
mse = compute_regression_mse(QATmodel_quantized, train_loader, device)
print(f"[QAT] Mean Squared Error: {mse:.4f}")

[QAT] Inference time per batch: 0.0102 seconds
[QAT] FLOPs: 0.00 Mega FLOPs
[QAT] Model size: 2.52 MB
[QAT] Mean Squared Error: 0.0134
