In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from brevitas.nn import QuantConv2d, QuantLinear, QuantReLU, QuantIdentity
from brevitas.quant import IntBias
from brevitas.core.restrict_val import RestrictValueType
from brevitas.quant import Int8ActPerTensorFloat, Uint8ActPerTensorFloat, Int32Bias
from brevitas.quant import Int8WeightPerTensorFloat, Int8WeightPerChannelFloat
from brevitas.quant.scaled_int import Int32Bias
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, roc_auc_score
import numpy as np
import time
import warnings

In [3]:
FIRST_LAYER_BIT_WIDTH = 8
MIDDLE_LAYER_BIT_WIDTH = 4

In [4]:

# For 8-bit weights
class Common8bitWeightPerTensorQuant(Int8WeightPerTensorFloat):
    scaling_min_val = 2e-16

class Common8bitWeightPerChannelQuant(Int8WeightPerChannelFloat):
    scaling_per_output_channel = True

# For 4-bit weights
class Common4bitWeightPerTensorQuant(Common8bitWeightPerTensorQuant):
    bit_width = 4

class Common4bitWeightPerChannelQuant(Common8bitWeightPerChannelQuant):
    bit_width = 4

# For 8-bit activations
class Common8bitActQuant(Int8ActPerTensorFloat):
    scaling_min_val = 2e-16
    restrict_scaling_type = RestrictValueType.LOG_FP

class Common8bitUintActQuant(Uint8ActPerTensorFloat):
    scaling_min_val = 2e-16
    restrict_scaling_type = RestrictValueType.LOG_FP

# For 4-bit activations
class Common4bitActQuant(Common8bitActQuant):
    bit_width = 4

class Common4bitUintActQuant(Common8bitUintActQuant):
    bit_width = 4

In [5]:
class QuantizedLeNet5(nn.Module):
    def __init__(self):
        super(QuantizedLeNet5, self).__init__()
        
        # Input quantization (8-bit)
        self.quant_inp = QuantIdentity(
            bit_width=8,
            return_quant_tensor=True,
            act_quant=Common8bitActQuant)
        
        # First convolutional layer (8-bit)
        self.conv1 = QuantConv2d(
            1, 6, kernel_size=5, stride=1, padding=2,
            weight_bit_width=8,
            bias=True,
            weight_quant=Common8bitWeightPerChannelQuant,
            input_quant=Common8bitActQuant,
            output_quant=Common8bitActQuant,
            return_quant_tensor=True)
        self.relu1 = QuantReLU(
            bit_width=8,
            act_quant=Common8bitUintActQuant,
            return_quant_tensor=True)
        
        # First average pooling layer
        self.avg_pool1 = nn.AvgPool2d(kernel_size=2, stride=2)
        
        # Second convolutional layer (4-bit)
        self.conv2 = QuantConv2d(
            6, 16, kernel_size=5, stride=1, padding=0,
            weight_bit_width=4,
            bias=True,
            weight_quant=Common4bitWeightPerChannelQuant,
            input_quant=Common4bitUintActQuant,
            output_quant=Common4bitActQuant,
            return_quant_tensor=True)
        self.relu2 = QuantReLU(
            bit_width=4,
            act_quant=Common4bitUintActQuant,
            return_quant_tensor=True)
        
        # Second average pooling layer
        self.avg_pool2 = nn.AvgPool2d(kernel_size=2, stride=2)
        
        # First fully connected layer (4-bit)
        self.fc1 = QuantLinear(
            16 * 5 * 5, 120,
            bias=True,
            weight_bit_width=4,
            weight_quant=Common4bitWeightPerTensorQuant,
            input_quant=Common4bitUintActQuant,
            output_quant=Common4bitActQuant,
            return_quant_tensor=True)
        self.relu3 = QuantReLU(
            bit_width=4,
            act_quant=Common4bitUintActQuant,
            return_quant_tensor=True)
        
        # Second fully connected layer (4-bit)
        self.fc2 = QuantLinear(
            120, 84,
            bias=True,
            weight_bit_width=4,
            weight_quant=Common4bitWeightPerTensorQuant,
            input_quant=Common4bitUintActQuant,
            output_quant=Common4bitActQuant,
            return_quant_tensor=True)
        self.relu4 = QuantReLU(
            bit_width=4,
            act_quant=Common4bitUintActQuant,
            return_quant_tensor=True)
        
        # Output layer (8-bit)
        self.fc3 = QuantLinear(
            84, 10,
            bias=True,
            weight_bit_width=8,
            weight_quant=Common8bitWeightPerTensorQuant,
            input_quant=Common8bitUintActQuant,
            output_quant=Common8bitActQuant,
            return_quant_tensor=True)
        
        # Output quantization (8-bit)
        self.quant_out = QuantIdentity(
            bit_width=8,
            act_quant=Common8bitActQuant,
            return_quant_tensor=True)

    def forward(self, x):
        x = self.quant_inp(x)
        x = self.relu1(self.conv1(x))
        x = self.avg_pool1(x)
        x = self.relu2(self.conv2(x))
        x = self.avg_pool2(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.relu3(self.fc1(x))
        x = self.relu4(self.fc2(x))
        x = self.fc3(x)
        x = self.quant_out(x)
        return F.log_softmax(x, dim=1)

In [6]:
def load_data(batch_size=64):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)
    return train_loader, test_loader

In [7]:
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} '
                  f'({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')

In [8]:
def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, '
          f'Accuracy: {correct}/{len(test_loader.dataset)} '
          f'({accuracy:.2f}%)\n')
    return accuracy

In [9]:
model_name = 'lenet5_quantized.pth'

In [10]:
def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if not torch.cuda.is_available():
        print("CUDA is not available. Running on CPU.")
    else:
        print(f"Running on GPU: {torch.cuda.get_device_name(0)}")
    
    model = QuantizedLeNet5().to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    
    train_loader, test_loader = load_data()
    
    print(model)
    
    best_accuracy = 0
    for epoch in range(1, 11):
        train(model, device, train_loader, optimizer, criterion, epoch)
        accuracy = test(model, device, test_loader, criterion)
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), model_name)
    
    print(f"Best test accuracy: {best_accuracy:.2f}%")
    
    # Load the best model and save in ONNX format
    model = QuantizedLeNet5().to(device)
    model.load_state_dict(torch.load(model_name, map_location=device))
    model.eval()
    
    # save_onnx(model, device)

In [40]:
if __name__ == '__main__':
    main()

Running on GPU: NVIDIA GeForce RTX 3050 Laptop GPU
QuantizedLeNet5(
  (quant_inp): QuantIdentity(
    (input_quant): ActQuantProxyFromInjector(
      (_zero_hw_sentinel): StatelessBuffer()
    )
    (act_quant): ActQuantProxyFromInjector(
      (_zero_hw_sentinel): StatelessBuffer()
      (fused_activation_quant_proxy): FusedActivationQuantProxy(
        (activation_impl): Identity()
        (tensor_quant): RescalingIntQuant(
          (int_quant): IntQuant(
            (float_to_int_impl): RoundSte()
            (tensor_clamp_impl): TensorClamp()
            (delay_wrapper): DelayWrapper(
              (delay_impl): _NoDelay()
            )
          )
          (scaling_impl): ParameterFromRuntimeStatsScaling(
            (stats_input_view_shape_impl): OverTensorView()
            (stats): _Stats(
              (stats_impl): AbsPercentile()
            )
            (restrict_scaling): _RestrictValue(
              (restrict_value_impl): LogFloatRestrictValue(
                (power_

In [11]:
def load_model(model_path):
    model = QuantizedLeNet5()
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda')))
    model.eval()
    return model

In [12]:
def inference(model, input_tensor):
    with torch.no_grad():
        output = model(input_tensor)
    return output

# Example usage
if __name__ == "__main__":
    # Load the model
    model_path = model_name
    model = load_model(model_path)

    # Create a dummy input (replace this with your actual input data)
    dummy_input = torch.randn(1, 1, 28, 28)  # Batch size 1, 1 channel, 28x28 image

    # Perform inference
    output = inference(model, dummy_input)

    # Process the output
    predicted_class = output.argmax(dim=1).item()
    print(f"Predicted class: {predicted_class}")
    print(f"Output probabilities: {torch.exp(output)}")

Predicted class: 4
Output probabilities: tensor([[0.0419, 0.1085, 0.0856, 0.0372, 0.3163, 0.0532, 0.0532, 0.1222, 0.0964,
         0.0856]])


  return super().rename(names)
  return F.avg_pool2d(input, self.kernel_size, self.stride,


In [13]:
def inspect_quantization(model):
    print("Quantization Inspection:")
    print("-----------------------")

    for name, module in model.named_modules():
        if isinstance(module, (QuantConv2d, QuantLinear, QuantReLU, QuantIdentity)):
            print(f"\nLayer: {name}")
            print(f"Type: {type(module).__name__}")
            
            # Check weight quantization
            if hasattr(module, 'weight'):
                weight = module.weight
                print(f"Weight shape: {weight.shape}")
                print(f"Weight dtype: {weight.dtype}")
                print(f"Weight range: [{weight.min().item():.4f}, {weight.max().item():.4f}]")
                print(f"Weight unique values: {torch.unique(weight).numel()}")
                
                if hasattr(module.weight_quant, 'bit_width'):
                    bit_width = module.weight_quant.bit_width
                    if callable(bit_width):
                        bit_width = bit_width()
                    print(f"Weight bit width: {bit_width}")
                else:
                    print("Weight bit width not found")
                
                if hasattr(module, 'weight_quant') and hasattr(module.weight_quant, 'scaling_impl'):
                    scaling_factor = module.weight_quant.scaling_impl.scaling_factor
                    if callable(scaling_factor):
                        scaling_factor = scaling_factor()
                    print(f"Weight scaling factor: {scaling_factor}")

            # Check bias quantization
            if hasattr(module, 'bias') and module.bias is not None:
                bias = module.bias
                print(f"Bias shape: {bias.shape}")
                print(f"Bias dtype: {bias.dtype}")
                print(f"Bias range: [{bias.min().item():.4f}, {bias.max().item():.4f}]")
                print(f"Bias unique values: {torch.unique(bias).numel()}")

            # Check activation quantization
            if hasattr(module, 'act_quant'):
                print(f"Activation quantization: {type(module.act_quant).__name__}")
                if hasattr(module.act_quant, 'bit_width'):
                    bit_width = module.act_quant.bit_width
                    if callable(bit_width):
                        bit_width = bit_width()
                    print(f"Activation bit width: {bit_width}")
                if hasattr(module.act_quant, 'scaling_impl'):
                    scaling_factor = module.act_quant.scaling_impl.scaling_factor
                    if callable(scaling_factor):
                        scaling_factor = scaling_factor()
                    print(f"Activation scaling factor: {scaling_factor}")

            # Check input quantization
            if hasattr(module, 'input_quant'):
                print(f"Input quantization: {type(module.input_quant).__name__}")
                if hasattr(module.input_quant, 'bit_width'):
                    bit_width = module.input_quant.bit_width
                    if callable(bit_width):
                        bit_width = bit_width()
                    print(f"Input bit width: {bit_width}")

            # Check output quantization
            if hasattr(module, 'output_quant'):
                print(f"Output quantization: {type(module.output_quant).__name__}")
                if hasattr(module.output_quant, 'bit_width'):
                    bit_width = module.output_quant.bit_width
                    if callable(bit_width):
                        bit_width = bit_width()
                    print(f"Output bit width: {bit_width}")

    print("\nOverall Model Statistics:")
    print("-------------------------")
    total_params = sum(p.numel() for p in model.parameters())
    print(f"Total parameters: {total_params}")
    
    quant_params = sum(p.numel() for name, p in model.named_parameters() if 'weight' in name or 'bias' in name)
    print(f"Quantized parameters: {quant_params}")
    
    print(f"Percentage of quantized parameters: {quant_params/total_params*100:.2f}%")

# Example usage
if __name__ == "__main__":
    model_path = model_name
    model = load_model(model_path)
    inspect_quantization(model)

Quantization Inspection:
-----------------------

Layer: quant_inp
Type: QuantIdentity
Activation quantization: ActQuantProxyFromInjector
Activation bit width: 8.0
Input quantization: ActQuantProxyFromInjector
Input bit width: None

Layer: conv1
Type: QuantConv2d
Weight shape: torch.Size([6, 1, 5, 5])
Weight dtype: torch.float32
Weight range: [-0.6313, 1.1670]
Weight unique values: 150
Weight bit width: 8.0
Bias shape: torch.Size([6])
Bias dtype: torch.float32
Bias range: [0.0897, 1.2325]
Bias unique values: 6
Input quantization: ActQuantProxyFromInjector
Input bit width: 8.0
Output quantization: ActQuantProxyFromInjector
Output bit width: 8.0

Layer: relu1
Type: QuantReLU
Activation quantization: ActQuantProxyFromInjector
Activation bit width: 8.0
Input quantization: ActQuantProxyFromInjector
Input bit width: None

Layer: conv2
Type: QuantConv2d
Weight shape: torch.Size([16, 6, 5, 5])
Weight dtype: torch.float32
Weight range: [-0.5481, 0.4840]
Weight unique values: 2400
Weight bit wid

In [14]:
def evaluate_mnist_model(model, test_loader, device):
    model.eval()
    all_preds = []
    all_targets = []
    all_probs = []
    correct = 0
    total = 0
    inference_times = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            
            start_time = time.time()
            output = model(data)
            inference_time = time.time() - start_time
            inference_times.append(inference_time)

            probs = F.softmax(output, dim=1)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(target.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    accuracy = 100 * correct / total
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    all_probs = np.array(all_probs)

    # Confusion Matrix
    conf_matrix = confusion_matrix(all_targets, all_preds)

    # Precision, Recall, F1-Score
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        precision, recall, f1, _ = precision_recall_fscore_support(all_targets, all_preds, average='weighted', zero_division=0)

    # ROC AUC Score (one-vs-all)
    roc_auc = roc_auc_score(all_targets, all_probs, multi_class='ovr', average='weighted')

    # Top-3 Accuracy
    top3_preds = np.argsort(all_probs, axis=1)[:, -3:]
    top3_correct = np.any(top3_preds == all_targets.reshape(-1, 1), axis=1).sum()
    top3_accuracy = 100 * top3_correct / total

    # Inference Time
    avg_inference_time = sum(inference_times) / len(inference_times)

    # Model Size
    model_size = sum(p.numel() for p in model.parameters() if p.requires_grad)

    results = {
        'Accuracy': accuracy,
        'Confusion Matrix': conf_matrix,
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1,
        'ROC AUC Score': roc_auc,
        'Top-3 Accuracy': top3_accuracy,
        'Average Inference Time (s)': avg_inference_time,
        'Model Size (parameters)': model_size
    }

    return results

# Usage
# Main execution
if __name__ == "__main__":
    model_path = model_name
    batch_size = 64
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load the model
    model = load_model(model_path)
    model.to(device)

    # Load the data
    train_loader, test_loader = load_data(batch_size)

    # Evaluate the model
    results = evaluate_mnist_model(model, test_loader, device)
    
    # Print results
    for metric, value in results.items():
        print(f"{metric}:")
        print(value)
        print()

Accuracy:
98.78

Confusion Matrix:
[[ 974    0    0    0    0    0    3    0    2    1]
 [   0 1128    1    1    0    0    3    0    2    0]
 [   2    0 1020    3    2    0    0    3    1    1]
 [   0    0    0 1006    0    3    0    0    1    0]
 [   0    0    1    0  973    0    3    1    0    4]
 [   1    0    0    6    0  879    3    1    1    1]
 [   3    2    0    0    1    1  951    0    0    0]
 [   1    6    5    1    0    0    0 1013    1    1]
 [   3    0    1    6    1    2    1    2  952    6]
 [   1    3    0    4    5    3    0   10    1  982]]

Precision:
0.9878179856875806

Recall:
0.9878

F1-Score:
0.987790735762089

ROC AUC Score:
0.9998990395824485

Top-3 Accuracy:
99.93

Average Inference Time (s):
0.07031493186950684

Model Size (parameters):
61722



In [15]:
def save_onnx(model, device, onnx_path='quantized_lenet5.onnx'):
    model.eval()
    
    # Create a dummy input tensor
    dummy_input = torch.randn(1, 1, 28, 28).to(device)
    
    # Custom export
    torch.onnx.export(model,
                      dummy_input,
                      onnx_path,
                      export_params=True,
                      opset_version=11,
                      do_constant_folding=True,
                      input_names=['input'],
                      output_names=['output'],
                      dynamic_axes={'input' : {0 : 'batch_size'},
                                    'output' : {0 : 'batch_size'}},
                      custom_opsets={"brevitas": 1},  # Add this line
                      keep_initializers_as_inputs=None,  # Add this line
                      )
    
    print(f"Model saved to {onnx_path}")

    # Verify the exported model
    import onnx
    onnx_model = onnx.load(onnx_path)
    onnx.checker.check_model(onnx_model)
    print("ONNX model is valid")

In [19]:
from brevitas.export import export_onnx_qcdq
model_path = model_name
model = load_model(model_path)
dummy_input = torch.randn(1, 1, 28, 28)
export_onnx_qcdq(model, dummy_input, export_path='quantized_lenet5_4bit.onnx')

