# <center> MobileNetV2 x1.4 Quantization </center>

Here we quantize our pre-trained MobileNetV2 model, which has been adversarially trained three times.

### Libraries and Functions

In [None]:
import numpy as np
import cv2
import os
import pickle
import torch.nn as nn
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
import time
from math import log10, sqrt
from torch.utils.data import DataLoader
from art.utils import load_cifar10
import random
import urllib
from torch.quantization import MovingAverageMinMaxObserver
from torch.ao.quantization.observer import MinMaxObserver
from torch.quantization import QuantStub, DeQuantStub
import torchvision.transforms as transforms
import torchvision.transforms.functional as F
# Seed every random number generator used in this notebook with the same
# fixed value (0): Python's `random`, NumPy, and PyTorch (CPU and CUDA).
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
torch.cuda.manual_seed(0)

def test(model: nn.Module, dataloader: DataLoader, cuda=False) -> float:
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for data in dataloader:
            inputs, labels = data
            if cuda:
              inputs = inputs.cuda()
              labels = labels.cuda()
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

def normalize_np(img):
  """Normalize a NumPy image with fixed per-channel mean/std and return a NumPy array.

  The array is converted to a tensor, passed through ``F.normalize`` with the
  three-channel constants below, and converted back to NumPy.
  """
  # NOTE(review): presumably the CIFAR-10 channel statistics; they match the
  # constants used by the test-set transform in this notebook.
  channel_mean = [0.4914, 0.4822, 0.4465]
  channel_std = [0.2023, 0.1994, 0.2010]
  normalized = F.normalize(torch.from_numpy(img), channel_mean, channel_std)
  return normalized.numpy()

def custom_collate(batch):
    """Merge a list of ``(sample, label)`` pairs into one batch.

    Returns:
        A tuple ``(data, labels)`` where ``data`` is the samples stacked along a
        new leading dimension and ``labels`` is a ``torch.long`` tensor.
    """
    # Transpose the list of pairs into one tuple of samples and one of labels.
    samples, targets = zip(*batch)
    return torch.stack(samples), torch.tensor(targets, dtype=torch.long)

def evaluator(model, loader):
    """Return the top-1 and top-5 accuracy of ``model`` over ``loader``.

    The model is switched to eval mode and evaluated under ``torch.no_grad()``.

    Args:
        model: Module producing one row of class scores per sample; at least
            five classes are required for the top-5 lookup.
        loader: Iterable yielding ``(inputs, labels)`` batches of tensors.

    Returns:
        A tuple ``(top1, top5)`` of accuracy percentages, each formatted as a
        string with two decimals. An empty loader yields ``("0.00", "0.00")``.
    """
    model.eval()
    top_1 = 0
    top_5 = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            outputs = model(inputs)
            total += labels.size(0)

            _, predicted = torch.max(outputs, 1, keepdim=True)
            top_1 += torch.sum(predicted.view(-1) == labels).item()

            # A sample counts as a top-5 hit when its label is among the five
            # highest-scoring classes.
            _, predicted_5 = torch.topk(outputs, k=5, dim=1)
            top_5 += torch.sum(predicted_5 == labels.unsqueeze(1)).item()

    # Divide by the number of samples actually evaluated instead of a fixed
    # 400, so the percentage is right for loaders of any size.
    if total == 0:
        return ("0.00", "0.00")
    return ("{:.2f}".format((top_1 / total) * 100), "{:.2f}".format((top_5 / total) * 100))

def evaluator_testset(model, loader):
    """Return the top-1 and top-5 accuracy of ``model`` over a test-set ``loader``.

    The model is switched to eval mode and evaluated under ``torch.no_grad()``.

    Args:
        model: Module producing one row of class scores per sample; at least
            five classes are required for the top-5 lookup.
        loader: Iterable yielding ``(inputs, labels)`` batches of tensors.

    Returns:
        A tuple ``(top1, top5)`` of accuracy percentages, each formatted as a
        string with two decimals. An empty loader yields ``("0.00", "0.00")``.
    """
    model.eval()
    top_1 = 0
    top_5 = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            outputs = model(inputs)
            total += labels.size(0)

            _, predicted = torch.max(outputs, 1, keepdim=True)
            top_1 += torch.sum(predicted.view(-1) == labels).item()

            # A sample counts as a top-5 hit when its label is among the five
            # highest-scoring classes.
            _, predicted_5 = torch.topk(outputs, k=5, dim=1)
            top_5 += torch.sum(predicted_5 == labels.unsqueeze(1)).item()

    # Divide by the number of samples actually evaluated instead of a fixed
    # 10000, so the percentage stays right if the loader size changes.
    if total == 0:
        return ("0.00", "0.00")
    return ("{:.2f}".format((top_1 / total) * 100), "{:.2f}".format((top_5 / total) * 100))





### Dataset Preparation

In [None]:
# Load CIFAR-10 through ART's helper; the label arrays are indexed below as
# one-hot rows.
(x_train, y_train), (x_test, y_test), min_pixel_value, max_pixel_value = load_cifar10()

# Reorder the image axes to (0, 3, 1, 2), moving the last axis to position 1,
# and store the data as float32.
x_train = x_train.transpose(0, 3, 1, 2).astype(np.float32)
x_test = x_test.transpose(0, 3, 1, 2).astype(np.float32)

classes_cifar = [
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
]

# Integer class index (position of the 1 in the label row) for the first 400
# test and train samples.
y_test_set = np.array([np.where(y_test[i] == 1)[0][0] for i in range(400)], dtype=np.int8)
y_train_set = np.array([np.where(y_train[i] == 1)[0][0] for i in range(400)], dtype=np.int8)

# torchvision test split, converted to tensors and normalized per channel.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
])
testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)
testloader = DataLoader(
    testset,
    batch_size=64,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
)

### Model Definition

Here we replace the `+=` operations with their `FloatFunctional` equivalents for VitisAI support.

In [None]:
from functools import partial
from typing import Dict, Type, Any, Callable, Union, List, Optional
from torch import Tensor
try:
    from torch.hub import load_state_dict_from_url
except ImportError:
    from torch.utils.model_zoo import load_url as load_state_dict_from_url
from torch.ao.nn.quantized.modules.functional_modules import FloatFunctional

def _make_divisible(v: float, divisor: int, min_value: Optional[int] = None) -> int:
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        f_add = torch.nn.quantized.FloatFunctional()
        new_v = f_add.add(new_v, divisor)
        #new_v += divisor
    return new_v


class ConvBNActivation(nn.Sequential):
    """Sequential stack of a bias-free ``Conv2d``, a normalization layer and an activation.

    Args:
        in_planes: Number of input channels of the convolution.
        out_planes: Number of output channels; also stored as ``out_channels``.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        groups: Number of convolution groups.
        norm_layer: Callable building the normalization layer from the channel
            count; ``nn.BatchNorm2d`` when ``None``.
        activation_layer: Callable building the activation (called with
            ``inplace=True``); ``nn.ReLU6`` when ``None``.
        dilation: Convolution dilation; the padding is scaled by it.
    """

    def __init__(
        self,
        in_planes: int,
        out_planes: int,
        kernel_size: int = 3,
        stride: int = 1,
        groups: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        activation_layer: Optional[Callable[..., nn.Module]] = None,
        dilation: int = 1,
    ) -> None:
        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer
        activation_layer = nn.ReLU6 if activation_layer is None else activation_layer

        # Half of the kernel extent on each side, multiplied by the dilation.
        padding = (kernel_size - 1) // 2 * dilation
        conv = nn.Conv2d(
            in_planes,
            out_planes,
            kernel_size,
            stride,
            padding,
            dilation=dilation,
            groups=groups,
            bias=False,
        )
        super().__init__(conv, norm_layer(out_planes), activation_layer(inplace=True))
        self.out_channels = out_planes


# necessary for backwards compatibility
ConvBNReLU = ConvBNActivation

class InvertedResidual(nn.Module):
    """Inverted residual block: optional 1x1 expansion, depthwise conv, linear 1x1 projection.

    The input is added back to the output (through a ``FloatFunctional`` so the
    addition can be quantized) only when ``stride == 1`` and ``inp == oup``.

    Args:
        inp: Number of input channels.
        oup: Number of output channels; also stored as ``out_channels``.
        stride: Stride of the depthwise convolution, must be 1 or 2.
        expand_ratio: Multiplier giving the hidden width; the expansion layer
            is skipped when it equals 1.
        norm_layer: Callable building the normalization layer from the channel
            count; ``nn.BatchNorm2d`` when ``None``.
    """

    def __init__(
        self,
        inp: int,
        oup: int,
        stride: int,
        expand_ratio: int,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super().__init__()
        self.stride = stride
        assert stride in [1, 2]

        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer

        hidden_dim = int(round(inp * expand_ratio))
        self.use_res_connect = self.stride == 1 and inp == oup

        # pw: pointwise expansion, only present when the block actually expands
        expansion: List[nn.Module] = (
            [ConvBNReLU(inp, hidden_dim, kernel_size=1, norm_layer=norm_layer)]
            if expand_ratio != 1
            else []
        )
        # dw: one group per channel
        depthwise = ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim, norm_layer=norm_layer)
        # pw-linear: projection without an activation
        projection = nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False)

        self.conv = nn.Sequential(*expansion, depthwise, projection, norm_layer(oup))
        self.out_channels = oup
        self._is_cn = stride > 1
        self.ff = torch.nn.quantized.FloatFunctional()

    def forward(self, x: Tensor) -> Tensor:
        """Run the block, adding the skip connection when it is enabled."""
        out = self.conv(x)
        if not self.use_res_connect:
            return out
        # Equivalent to ``x + out`` but expressed as a quantizable functional.
        return self.ff.add(x, out)


class MobileNetV2(nn.Module):
    """MobileNetV2 classifier whose first two stages use stride 1 (see the CIFAR notes inline)."""

    def __init__(
        self,
        num_classes: int = 10,
        width_mult: float = 1.0,
        inverted_residual_setting: Optional[List[List[int]]] = None,
        round_nearest: int = 8,
        block: Optional[Callable[..., nn.Module]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        """
        Build the feature extractor and the classifier head, then initialize the weights.

        Args:
            num_classes (int): Number of classes
            width_mult (float): Width multiplier - scales the channel count of every layer
            inverted_residual_setting: Network structure as rows of ``[t, c, n, s]``
            round_nearest (int): Channel counts are rounded to a multiple of this number.
                Set to 1 to turn off rounding
            block: Module specifying the inverted residual building block
            norm_layer: Module specifying the normalization layer to use

        Raises:
            ValueError: If ``inverted_residual_setting`` is empty or its first row
                does not have four elements.
        """
        super().__init__()

        block = InvertedResidual if block is None else block
        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer

        if inverted_residual_setting is None:
            inverted_residual_setting = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 1],  # NOTE: change stride 2 -> 1 for CIFAR10/100
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # only check the first element, assuming user knows t,c,n,s are required
        if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
            raise ValueError("inverted_residual_setting should be non-empty "
                             "or a 4-element list, got {}".format(inverted_residual_setting))

        # Base widths (32 for the stem, 1280 for the head) scaled by the width
        # multiplier; the head never shrinks below its base width.
        input_channel = _make_divisible(32 * width_mult, round_nearest)
        self.last_channel = _make_divisible(1280 * max(1.0, width_mult), round_nearest)

        # stem convolution
        features: List[nn.Module] = [
            ConvBNReLU(3, input_channel, stride=1, norm_layer=norm_layer)  # NOTE: change stride 2 -> 1 for CIFAR10/100
        ]
        # inverted residual stages: only the first block of a stage uses the stage stride
        for t, c, n, s in inverted_residual_setting:
            output_channel = _make_divisible(c * width_mult, round_nearest)
            for idx in range(n):
                stride = 1 if idx else s
                features.append(block(input_channel, output_channel, stride, expand_ratio=t, norm_layer=norm_layer))
                input_channel = output_channel
        # final 1x1 convolution up to the head width
        features.append(ConvBNReLU(input_channel, self.last_channel, kernel_size=1, norm_layer=norm_layer))
        self.features = nn.Sequential(*features)

        # classifier head
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, num_classes),
        )

        # weight initialization, chosen by layer type
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out')
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
            elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.zeros_(module.bias)

    def _forward_impl(self, x: Tensor) -> Tensor:
        # This exists since TorchScript doesn't support inheritance, so the superclass method
        # (this one) needs to have a name other than `forward` that can be accessed in a subclass
        feature_map = self.features(x)
        # Cannot use "squeeze" as batch-size can be 1
        pooled = nn.functional.adaptive_avg_pool2d(feature_map, (1, 1))
        return self.classifier(torch.flatten(pooled, 1))

    def forward(self, x: Tensor) -> Tensor:
        """Return the class scores for the input batch ``x``."""
        return self._forward_impl(x)


### Model State

In [None]:
# REMINDER: Load the retrained model, not the initial.

# `import urllib` alone does not guarantee that the `request` submodule is loaded.
import urllib.request


def _fetch_checkpoint(url, filename):
    """Download ``url`` to ``filename`` (unless it already exists) and return the local path.

    A file path is handed out instead of the open HTTP response because every
    checkpoint is passed to ``torch.load`` more than once further down, and an
    HTTP response is a one-shot, non-seekable stream that ``torch.load`` cannot
    read repeatedly.
    """
    if not os.path.exists(filename):
        # Download to a temporary name first so an interrupted transfer never
        # leaves a truncated checkpoint under the final name.
        partial = filename + ".part"
        urllib.request.urlretrieve(url, partial)
        os.replace(partial, filename)
    return filename


adversarial_state1 = _fetch_checkpoint(
    "https://drive.usercontent.google.com/download?id=1umVJExm8VeVSqIpEWlCVhMbJrNZeU-aw&export=download&confirm=t&uuid=0",
    "MobileNet_1it_CIFAR10_94.17acc.pkl",
)

# If you want to manually load the file or load your own model version, use the
# code below updating your path accordingly
#
# Adversarial Model 1 https://drive.google.com/file/d/1umVJExm8VeVSqIpEWlCVhMbJrNZeU-aw/view?usp=sharing

#path = os.path.expanduser("~/Downloads/")

#adversarial_state1 = os.path.join(path, "MobileNet_1it_CIFAR10_94.17acc.pkl")


adversarial_state2 = _fetch_checkpoint(
    "https://drive.usercontent.google.com/download?id=12Ux4pWLxK4gTr54eMN-Mds4adPMvjfNX&export=download&confirm=t&uuid=0",
    "MobileNet_2it_CIFAR10_93.58acc.pkl",
)

# If you want to manually load the file or load your own model version, use the
# code below updating your path accordingly
#
# Adversarial Model 2 https://drive.google.com/file/d/12Ux4pWLxK4gTr54eMN-Mds4adPMvjfNX/view?usp=sharing

#path = os.path.expanduser("~/Downloads/")

#adversarial_state2 = os.path.join(path, "MobileNet_2it_CIFAR10_93.58acc.pkl")


adversarial_state3 = _fetch_checkpoint(
    "https://drive.usercontent.google.com/download?id=19Nxyh3nIwR0KdXv_ZBk8Ur-DtQzuuct3&export=download&confirm=t&uuid=0",
    "MobileNet_3it_CIFAR10_92.79acc.pkl",
)

# If you want to manually load the file or load your own model version, use the
# code below updating your path accordingly
#
# Adversarial Model 3 https://drive.google.com/file/d/19Nxyh3nIwR0KdXv_ZBk8Ur-DtQzuuct3/view?usp=sharing

#path = os.path.expanduser("~/Downloads/")

#adversarial_state3 = os.path.join(path, "MobileNet_3it_CIFAR10_92.79acc.pkl")


### Quantization

#### First Adversarial Training Iteration

In [None]:
import io

# Read the checkpoint exactly once and reuse the state dict for both models.
# When the source is a file-like object (e.g. an HTTP response) it is first
# buffered in memory, because torch.load needs a seekable stream and a one-shot
# stream cannot be read a second time.
checkpoint_source = adversarial_state1
if hasattr(checkpoint_source, "read"):
    checkpoint_source = io.BytesIO(checkpoint_source.read())
adv1_state_dict = torch.load(checkpoint_source, map_location=torch.device('cpu'))

# FP32 reference model, kept unquantized for the accuracy comparison.
mobilenet_fp_model = MobileNetV2(width_mult=1.4)
mobilenet_fp_model.load_state_dict(adv1_state_dict)

# Second copy of the same weights; this one is quantized in place below.
mobilenet_model = MobileNetV2(width_mult=1.4)
mobilenet_model.load_state_dict(adv1_state_dict)

mobilenet_model = torch.quantization.QuantWrapper(mobilenet_model)

# B-bit quantization: unsigned affine activations in [0, 2**B - 1] and signed
# symmetric weights in [-2**(B-1), 2**(B-1) - 1].
B = 8
mobilenet_model.qconfig = torch.quantization.QConfig(
    activation=MovingAverageMinMaxObserver.with_args(
        quant_min=0,
        quant_max=int(2 ** B - 1),
        dtype=torch.quint8,
        qscheme=torch.per_tensor_affine,
        reduce_range=False,
    ),
    weight=MovingAverageMinMaxObserver.with_args(
        quant_min=int(-(2 ** B) / 2),
        quant_max=int((2 ** B) / 2 - 1),
        dtype=torch.qint8,
        qscheme=torch.per_tensor_symmetric,
        reduce_range=False,
    ),
)
torch.quantization.prepare(mobilenet_model, inplace=True)

# Calibration pass: run the prepared model over the data so the inserted
# observers can record value ranges; the returned accuracy is not used.
# NOTE(review): calibration uses the same test loader the accuracy is reported on.
mobilenet_model.to("cpu")
test(mobilenet_model, testloader, cuda=False)

torch.quantization.convert(mobilenet_model, inplace=True)

top1, top5 = evaluator_testset(mobilenet_fp_model, testloader)
print('Adveraral Training 1: Initial accuracy on test images: Top-1: {}%  Top-5: {}% - FP32'.format(top1, top5))

top1, top5 = evaluator_testset(mobilenet_model, testloader)
print('Adveraral Training 1 Quantized: accuracy on test images: Top-1: {}%  Top-5: {}% - INT8'.format(top1, top5))

torch.save(mobilenet_model.state_dict(), "MobileNetV2_adv1_quant_CIF10.pkl")

#### Second Adversarial Training Iteration

In [None]:
import io

# Read the checkpoint exactly once and reuse the state dict for both models.
# When the source is a file-like object (e.g. an HTTP response) it is first
# buffered in memory, because torch.load needs a seekable stream and a one-shot
# stream cannot be read a second time.
checkpoint_source = adversarial_state2
if hasattr(checkpoint_source, "read"):
    checkpoint_source = io.BytesIO(checkpoint_source.read())
adv2_state_dict = torch.load(checkpoint_source, map_location=torch.device('cpu'))

# FP32 reference model, kept unquantized for the accuracy comparison.
mobilenet_fp_model = MobileNetV2(width_mult=1.4)
mobilenet_fp_model.load_state_dict(adv2_state_dict)

# Second copy of the same weights; this one is quantized in place below.
mobilenet_model = MobileNetV2(width_mult=1.4)
mobilenet_model.load_state_dict(adv2_state_dict)

mobilenet_model = torch.quantization.QuantWrapper(mobilenet_model)

# B-bit quantization: unsigned affine activations in [0, 2**B - 1] and signed
# symmetric weights in [-2**(B-1), 2**(B-1) - 1].
B = 8
mobilenet_model.qconfig = torch.quantization.QConfig(
    activation=MovingAverageMinMaxObserver.with_args(
        quant_min=0,
        quant_max=int(2 ** B - 1),
        dtype=torch.quint8,
        qscheme=torch.per_tensor_affine,
        reduce_range=False,
    ),
    weight=MovingAverageMinMaxObserver.with_args(
        quant_min=int(-(2 ** B) / 2),
        quant_max=int((2 ** B) / 2 - 1),
        dtype=torch.qint8,
        qscheme=torch.per_tensor_symmetric,
        reduce_range=False,
    ),
)
torch.quantization.prepare(mobilenet_model, inplace=True)

# Calibration pass: run the prepared model over the data so the inserted
# observers can record value ranges; the returned accuracy is not used.
# NOTE(review): calibration uses the same test loader the accuracy is reported on.
mobilenet_model.to("cpu")
test(mobilenet_model, testloader, cuda=False)

torch.quantization.convert(mobilenet_model, inplace=True)

top1, top5 = evaluator_testset(mobilenet_fp_model, testloader)
print('Adveraral Training 2: Initial accuracy on test images: Top-1: {}%  Top-5: {}% - FP32'.format(top1, top5))

top1, top5 = evaluator_testset(mobilenet_model, testloader)
print('Adveraral Training 2 Quantized: accuracy on test images: Top-1: {}%  Top-5: {}% - INT8'.format(top1, top5))


torch.save(mobilenet_model.state_dict(), "MobileNetV2_adv2_quant_CIF10.pkl")

#### Third Adversarial Training Iteration

In [None]:
import io

# Read the checkpoint exactly once and reuse the state dict for both models.
# When the source is a file-like object (e.g. an HTTP response) it is first
# buffered in memory, because torch.load needs a seekable stream and a one-shot
# stream cannot be read a second time.
checkpoint_source = adversarial_state3
if hasattr(checkpoint_source, "read"):
    checkpoint_source = io.BytesIO(checkpoint_source.read())
adv3_state_dict = torch.load(checkpoint_source, map_location=torch.device('cpu'))

# FP32 reference model, kept unquantized for the accuracy comparison.
mobilenet_fp_model = MobileNetV2(width_mult=1.4)
mobilenet_fp_model.load_state_dict(adv3_state_dict)

# Second copy of the same weights; this one is quantized in place below.
mobilenet_model = MobileNetV2(width_mult=1.4)
mobilenet_model.load_state_dict(adv3_state_dict)

mobilenet_model = torch.quantization.QuantWrapper(mobilenet_model)

# B-bit quantization: unsigned affine activations in [0, 2**B - 1] and signed
# symmetric weights in [-2**(B-1), 2**(B-1) - 1].
B = 8
mobilenet_model.qconfig = torch.quantization.QConfig(
    activation=MovingAverageMinMaxObserver.with_args(
        quant_min=0,
        quant_max=int(2 ** B - 1),
        dtype=torch.quint8,
        qscheme=torch.per_tensor_affine,
        reduce_range=False,
    ),
    weight=MovingAverageMinMaxObserver.with_args(
        quant_min=int(-(2 ** B) / 2),
        quant_max=int((2 ** B) / 2 - 1),
        dtype=torch.qint8,
        qscheme=torch.per_tensor_symmetric,
        reduce_range=False,
    ),
)
torch.quantization.prepare(mobilenet_model, inplace=True)

# Calibration pass: run the prepared model over the data so the inserted
# observers can record value ranges; the returned accuracy is not used.
# NOTE(review): calibration uses the same test loader the accuracy is reported on.
mobilenet_model.to("cpu")
test(mobilenet_model, testloader, cuda=False)

torch.quantization.convert(mobilenet_model, inplace=True)

top1, top5 = evaluator_testset(mobilenet_fp_model, testloader)
print('Adveraral Training 3: Initial accuracy on test images: Top-1: {}%  Top-5: {}% - FP32'.format(top1, top5))

top1, top5 = evaluator_testset(mobilenet_model, testloader)
print('Adveraral Training 3 Quantized: accuracy on test images: Top-1: {}%  Top-5: {}% - INT8'.format(top1, top5))

torch.save(mobilenet_model.state_dict(), "MobileNetV2_adv3_quant_CIF10.pkl")