## Studio preliminare su MobileNetV2
Inizialmente si era tentato di utilizzare MobileNetV2, quantizzando la rete per poi effettuare un confronto finale tra il modello non quantizzato e quelli ottenuti con i vari metodi di quantizzazione.
Tuttavia, dopo aver addestrato con successo la rete e quantizzato i modelli nei tre modi possibili (quantizzazione dinamica, quantizzazione statica e quantization-aware training), le performance delle reti risultavano essere le medesime; si è quindi scartata questa opzione, andando ad impiegare il modello MobileNetV3 presente nel file train.ipynb.

In [None]:
import os
import sys
import time
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import torchvision
from torchvision import datasets
import torchvision.transforms as transforms


# Seed PyTorch's global random number generator so that repeated runs of the
# notebook produce repeatable results.
torch.manual_seed(191009)

In [None]:
from torch.quantization import QuantStub, DeQuantStub

def _make_divisible(v, divisor, min_value=None):
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    :param v:
    :param divisor:
    :param min_value:
    :return:
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v


class ConvBNReLU(nn.Sequential):
    """Bias-free Conv2d followed by BatchNorm2d and a non-inplace ReLU.

    The three layers are registered positionally, so they are reachable as
    children '0', '1' and '2' of the Sequential.
    """

    def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
        # Half the kernel extent on each side; for odd kernels at stride 1
        # this keeps the spatial size unchanged.
        pad = (kernel_size - 1) // 2
        conv = nn.Conv2d(
            in_planes,
            out_planes,
            kernel_size=kernel_size,
            stride=stride,
            padding=pad,
            groups=groups,
            bias=False,
        )
        norm = nn.BatchNorm2d(out_planes, momentum=0.1)
        # Plain (non-inplace) ReLU is used as the activation.
        activation = nn.ReLU(inplace=False)
        super().__init__(conv, norm, activation)


class InvertedResidual(nn.Module):
    """Inverted residual block: optional 1x1 expansion, depthwise 3x3, 1x1 projection.

    Args:
        inp: number of input channels
        oup: number of output channels
        stride: stride of the depthwise convolution, 1 or 2
        expand_ratio: multiplier applied to ``inp`` to get the hidden width;
            when it is 1 the expansion layer is omitted
    """

    def __init__(self, inp, oup, stride, expand_ratio):
        super().__init__()
        assert stride in [1, 2]
        self.stride = stride

        hidden_dim = int(round(inp * expand_ratio))
        # The skip connection is only used when input and output tensors
        # have the same shape (stride 1 and equal channel counts).
        self.use_res_connect = self.stride == 1 and inp == oup

        # Pointwise expansion, skipped when it would be a no-op.
        expansion = [] if expand_ratio == 1 else [ConvBNReLU(inp, hidden_dim, kernel_size=1)]
        # Depthwise convolution: one group per channel.
        depthwise = ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim)
        # Linear projection: conv + batch norm with no activation after it.
        projection = nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False)
        projection_norm = nn.BatchNorm2d(oup, momentum=0.1)

        self.conv = nn.Sequential(*expansion, depthwise, projection, projection_norm)
        # FloatFunctional stands in for torch.add on the residual sum.
        self.skip_add = nn.quantized.FloatFunctional()

    def forward(self, x):
        """Run the block, adding the input back when the skip connection is enabled."""
        branch = self.conv(x)
        if not self.use_res_connect:
            return branch
        return self.skip_add.add(x, branch)


class MobileNetV2(nn.Module):
    """MobileNetV2 classifier prepared for eager-mode quantization.

    The forward pass is wrapped in a QuantStub/DeQuantStub pair and
    ``fuse_model`` fuses Conv+BN(+ReLU) groups before quantization.
    """

    def __init__(self, num_classes=1000, width_mult=1.0, inverted_residual_setting=None, round_nearest=8):
        """
        MobileNet V2 main class
        Args:
            num_classes (int): Number of classes
            width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount
            inverted_residual_setting: Network structure, a list of [t, c, n, s] rows
                (expand ratio, output channels, number of repeats, stride of the first repeat)
            round_nearest (int): Round the number of channels in each layer to be a multiple of this number
            Set to 1 to turn off rounding
        Raises:
            ValueError: if ``inverted_residual_setting`` is empty or its first row
                does not have exactly 4 elements
        """
        super(MobileNetV2, self).__init__()
        block = InvertedResidual
        input_channel = 32
        last_channel = 1280

        if inverted_residual_setting is None:
            inverted_residual_setting = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # only check the first element, assuming user knows t,c,n,s are required
        if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
            raise ValueError("inverted_residual_setting should be non-empty "
                             "or a 4-element list, got {}".format(inverted_residual_setting))

        # building first layer
        input_channel = _make_divisible(input_channel * width_mult, round_nearest)
        # the last layer is never narrowed below its base width (max(1.0, width_mult))
        self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
        features = [ConvBNReLU(3, input_channel, stride=2)]
        # building inverted residual blocks
        for t, c, n, s in inverted_residual_setting:
            output_channel = _make_divisible(c * width_mult, round_nearest)
            for i in range(n):
                # only the first repeat of each stage uses the stage's stride
                stride = s if i == 0 else 1
                features.append(block(input_channel, output_channel, stride, expand_ratio=t))
                input_channel = output_channel
        # building last several layers
        features.append(ConvBNReLU(input_channel, self.last_channel, kernel_size=1))
        # make it nn.Sequential
        self.features = nn.Sequential(*features)
        # stubs marking where tensors enter and leave the quantized region
        self.quant = QuantStub()
        self.dequant = DeQuantStub()
        # building classifier
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, num_classes),
        )

        # weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)

    def forward(self, x):
        """Return class scores for a batch of images.

        The input goes through the quant stub, the feature extractor, a mean
        over dimensions 2 and 3, the classifier and finally the dequant stub.
        """
        x = self.quant(x)
        x = self.features(x)
        x = x.mean([2, 3])
        x = self.classifier(x)
        x = self.dequant(x)
        return x

    # Fuse Conv+BN and Conv+BN+Relu modules prior to quantization
    def fuse_model(self, is_qat=None):
        """Fuse Conv+BN+ReLU and Conv+BN groups in place.

        Args:
            is_qat (bool, optional): fuse for quantization-aware training
                (keeps trainable Conv+BN modules) instead of folding batch
                norm into the convolution. When None, the model's current
                ``training`` flag decides, so ``fuse_model()`` on an eval-mode
                model behaves exactly as the plain ``fuse_modules`` call.
        """
        if is_qat is None:
            is_qat = self.training
        # fuse_modules folds BN into the conv and only supports eval mode;
        # training-mode models need the QAT variant.
        if is_qat:
            fuse_modules = torch.ao.quantization.fuse_modules_qat
        else:
            fuse_modules = torch.quantization.fuse_modules

        for m in self.modules():
            if isinstance(m, ConvBNReLU):
                fuse_modules(m, ['0', '1', '2'], inplace=True)
            elif isinstance(m, InvertedResidual):
                # Snapshot the children: fusion replaces entries of m.conv in place.
                for idx, child in enumerate(list(m.conv)):
                    # Exact type match on purpose: only the bare projection
                    # conv (followed by its BatchNorm) is fused here.
                    if type(child) is nn.Conv2d:
                        fuse_modules(m.conv, [str(idx), str(idx + 1)], inplace=True)

In [None]:
def load_model(model_file):
    """Build a MobileNetV2, load pretrained weights and attach a 2-class head.

    Args:
        model_file: path of a state dict compatible with the default
            ``MobileNetV2()`` constructor.

    Returns:
        The model on the CPU, with every parameter of ``model.features``
        frozen and ``model.classifier`` replaced by a freshly initialised
        Linear(512) -> ReLU -> Dropout(0.2) -> Linear(2) head.
    """
    model = MobileNetV2()
    # map_location keeps loading working on a CPU-only machine even when the
    # checkpoint was saved from another device.
    state_dict = torch.load(model_file, map_location='cpu')
    model.load_state_dict(state_dict)

    # freeze all the other layers
    for param in model.features.parameters():
        param.requires_grad = False

    # The head input width follows the backbone instead of a hard-coded 1280.
    model.classifier = nn.Sequential(
        nn.Linear(model.last_channel, 512),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(512, 2)
    )

    model.to('cpu')
    return model


def print_size_of_model(model):
    """Print the size in MB of the model's serialized state dict.

    The state dict is written to the scratch file ``temp.p`` in the current
    directory; the file is removed again even when saving fails.
    """
    scratch_file = "temp.p"
    try:
        torch.save(model.state_dict(), scratch_file)
        print('Size (MB):', os.path.getsize(scratch_file) / 1e6)
    finally:
        if os.path.exists(scratch_file):
            os.remove(scratch_file)


def train_model(model, num_epochs=3, loaders=None):
    """Train ``model`` on the CPU and report per-epoch loss and accuracy.

    Every epoch runs a 'train' phase (Adam, lr=1e-4, cross-entropy loss)
    followed by a 'validation' phase in eval mode without gradient tracking.

    Args:
        model: module mapping a float32 input batch to class scores.
        num_epochs: number of train + validation passes.
        loaders: mapping with 'train' and 'validation' iterables of
            ``(inputs, labels)`` batches. Defaults to the module-level
            ``dataloaders`` dictionary.

    Returns:
        The same ``model`` object, left in eval mode by the last validation phase.
    """
    if loaders is None:
        loaders = dataloaders

    device = 'cpu'
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    model.to(device)

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch + 1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'validation']:
            is_train = phase == 'train'
            model.train(is_train)

            running_loss = 0.0
            running_corrects = 0
            seen = 0

            for inputs, labels in loaders[phase]:
                inputs = inputs.to(torch.float32).to(device)
                labels = labels.to(device)

                # No autograd graph is needed while validating.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    if is_train:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                preds = outputs.argmax(dim=1)
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                seen += batch_size

            # Normalise by the samples actually iterated, so the metrics stay
            # correct whatever dataset backs the loader.
            denominator = max(seen, 1)
            epoch_loss = running_loss / denominator
            epoch_acc = running_corrects / denominator

            print('{} loss: {:.4f}, acc: {:.4f}'.format(phase,
                                                        epoch_loss,
                                                        epoch_acc))
    return model

In [None]:
from torch.utils.data import ConcatDataset

# Directory and file names of the model checkpoints.
# os.path.join(name, '') yields the directory followed by the platform's own
# separator, so plain concatenation (saved_model_dir + file name) keeps
# working while the paths are no longer Windows-only.
saved_model_dir = os.path.join('models', '')
float_model_file = 'mobilenet_pretrained_float.pth'
scripted_float_model_file = 'mobilenet_quantization_scripted.pth'
scripted_quantized_model_file = 'mobilenet_quantization_scripted_quantized.pth'

train_batch_size = 32
eval_batch_size = 32

# Per-channel normalisation (the mean/std values commonly used for ImageNet models).
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

# Root folders of the original images and of the two augmented copies.
input_path = os.path.join('data', '')
augmented_path1 = os.path.join('data_aug', '')
augmented_path2 = os.path.join('data_aug2', '')

# Training images get random affine/flip augmentation; validation and test
# images are only resized and normalised.
data_transforms = {
    'train':
    transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomAffine(0, shear=10, scale=(0.8, 1.2)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize
    ]),
    'validation':
    transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize
    ]),
    'test':
    transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize
    ]),
}

# One ImageFolder per split for each of the three data roots.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(input_path, split), data_transforms[split])
    for split in ('train', 'validation', 'test')
}

augmented1_datasets = {
    split: datasets.ImageFolder(os.path.join(augmented_path1, split), data_transforms[split])
    for split in ('train', 'validation', 'test')
}

augmented2_datasets = {
    split: datasets.ImageFolder(os.path.join(augmented_path2, split), data_transforms[split])
    for split in ('train', 'validation', 'test')
}

# Combine the original and the augmented datasets, split by split.
combined_datasets = {
    split: ConcatDataset([
        image_datasets[split],
        augmented1_datasets[split],
        augmented2_datasets[split],
    ])
    for split in ('train', 'validation', 'test')
}

# Only the training loader shuffles; the test loader yields one image at a time.
dataloaders = {
    'train': torch.utils.data.DataLoader(combined_datasets['train'], batch_size=train_batch_size, shuffle=True, num_workers=0),
    'validation': torch.utils.data.DataLoader(combined_datasets['validation'], batch_size=eval_batch_size, shuffle=False, num_workers=0),
    'test': torch.utils.data.DataLoader(combined_datasets['test'], batch_size=1, shuffle=False, num_workers=0)
}

data_loader, data_loader_test = dataloaders['train'], dataloaders['validation']
criterion = nn.CrossEntropyLoss()
# Pretrained float model with frozen features and a new 2-class head.
float_model = load_model(saved_model_dir + float_model_file).to('cpu')


In [None]:
print("Size of baseline model")
print_size_of_model(float_model)

# Fine-tune the float model, then keep working with it on the CPU
# (Module.to returns the module itself).
model_trained = train_model(float_model, num_epochs=9)
model_trained = model_trained.to('cpu')

# Persist both the raw weights and a TorchScript version of the float model.
torch.save(model_trained.state_dict(), 'model.pt')
torch.jit.save(torch.jit.script(model_trained), 'float_script.pt')

In [None]:
from torchvision import datasets, models, transforms

# Quantizable MobileNetV3-Large from torchvision, created with pretrained=True.
x = models.quantization.mobilenet_v3_large(pretrained=True)

# Replace the classifier with a 960 -> 512 -> 2 head.
x.classifier = nn.Sequential(
    nn.Linear(in_features=960, out_features=512),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    nn.Linear(in_features=512, out_features=2),
)

# Module.to returns the module itself, so x keeps referring to the same model.
x = x.to('cpu')
print_size_of_model(x)

In [None]:

# Post-training static quantization of the model held in `x`.
myModel = x.to('cpu')
myModel.eval()

# Fuse Conv+BN(+ReLU) groups before inserting observers.
myModel.fuse_model()

myModel.qconfig = torch.quantization.get_default_qconfig('x86')
print(myModel.qconfig)
# inplace=True is required: otherwise prepare() returns an instrumented copy
# and myModel itself never receives any observers.
torch.quantization.prepare(myModel, inplace=True)

# calibration: run validation batches so the observers record activation ranges
with torch.no_grad():
    for inputs, _ in dataloaders['validation']:
        myModel(inputs)
print('Post Training Quantization: Calibration done')

# Likewise convert in place, so myModel itself becomes the quantized model.
torch.quantization.convert(myModel, inplace=True)

print("Size of model after quantization")
print_size_of_model(myModel)
# Save the statically quantized model itself (not the float model_trained).
torch.jit.save(torch.jit.script(myModel), 'staticq_script.pt')

In [None]:
# Rebuild the fine-tuned architecture: MobileNetV2 backbone plus the
# 1280 -> 512 -> 2 head, so that the keys match the weights in 'model.pt'.
x = MobileNetV2()

x.classifier = nn.Sequential(
    nn.Linear(in_features=1280, out_features=512),
    nn.ReLU(),
    nn.Dropout(p=0.2),
    nn.Linear(in_features=512, out_features=2),
)

# Module.to returns the module itself, so x keeps referring to the same model.
x = x.to('cpu')
x.load_state_dict(torch.load('model.pt'))

In [None]:
# Quantization-aware training of the fine-tuned MobileNetV2 held in `x`.
qat_model = x.to('cpu')

# torch.quantization.fuse_modules only supports eval-mode Conv+BN folding,
# so fuse in eval mode and switch back to training mode for prepare_qat.
qat_model.eval()
qat_model.fuse_model()
qat_model.train()

qat_model.qconfig = torch.quantization.get_default_qat_qconfig('x86')

# inplace=True is required: otherwise prepare_qat() returns a prepared copy
# and qat_model itself would be trained without any fake-quantization modules.
torch.quantization.prepare_qat(qat_model, inplace=True)

qat_model = train_model(qat_model, 3)

qat_model.eval()
# Convert in place so that the model saved below is the quantized one.
torch.quantization.convert(qat_model, inplace=True)
torch.jit.save(torch.jit.script(qat_model), 'qat_script.pt')

In [None]:
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score


def test_models(model_dynamicq, model_staticq, model_qat, test_dataloader):
    models = [model_dynamicq, model_staticq, model_qat]
    model_names = ['Automatic Optimization', 'Static Quantization', 'Quantization Aware Training']

    accuracies = []
    inference_times = []

    for model, model_name in zip(models, model_names):
        model.eval()
        model.to('cpu')
        correct = 0
        total = 0

        i = 0
        avg_time = 0
        for inputs, labels in test_dataloader:
            inputs, labels = inputs.to('cpu'), labels.to('cpu')

            start_time = time.time()

            # Run inference
            outputs = model(inputs)

            end_time = time.time()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            i += len(inputs)
            avg_time += (end_time - start_time)

            # Calculate average inference time for every 10 images
            if i % 30 == 0:
                inference_time = avg_time / 10 * 1e3  # Convert to microseconds
                inference_times.append(inference_time)
                avg_time = 0  # Reset average time

        # Calculate accuracy for the entire test set
        accuracy = correct / total
        accuracies.append(accuracy)

        # Check if inference_times is not empty before calculating the average
        if inference_times:
            print(f"Accuracy of {model_name} on the test set: {accuracy:.4f}")
            print(f" - Average Inference Time: {sum(inference_times)/len(inference_times):.2f} milliseconds per 10 images\n")

    return accuracies, inference_times


# Replace these with your actual models.
# NOTE(review): these file names differ from the ones written earlier in this
# notebook (float_script.pt, staticq_script.pt, qat_script.pt) - confirm which
# artifacts are meant to be evaluated.
model_automatic_optimization = torch.jit.load('optimized_model.pt')
model_staticq_path = torch.jit.load('script.pt')
model_qat_path = torch.jit.load('model_qat.pt')


# Evaluate the three TorchScript models on the test loader.
accuracies, inference_times = test_models(
    model_dynamicq=model_automatic_optimization,
    model_staticq=model_staticq_path,
    model_qat=model_qat_path,
    test_dataloader=dataloaders['test'],
)

