### Cifar10 dataset loader


In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms


def get_loaders(
    source, batch_size, transform, eval_transform=None, root="data", split_ratio=0.1
):
    """Build train, validation and test ``DataLoader`` objects for a dataset.

    Args:
        source: Dataset factory called as
            ``source(root=..., train=..., download=True, transform=...)``.
        batch_size: Batch size used by all three loaders.
        transform: Transform passed to the ``train=True`` dataset.
        eval_transform: Transform passed to the ``train=False`` dataset.
            Falls back to ``transform`` when ``None``.
        root: Directory handed to ``source`` as ``root``.
        split_ratio: Fraction of the ``train=True`` dataset held out for
            validation.

    Returns:
        Tuple ``(trainloader, valloader, testloader)``.
    """
    if eval_transform is None:
        eval_transform = transform

    full_trainset = source(
        root=root,
        train=True,
        download=True,
        transform=transform,
    )
    testset = source(
        root=root,
        train=False,
        download=True,
        transform=eval_transform,
    )

    # Derive the training length from the validation length so the two always
    # add up to the dataset size; truncating both fractions independently can
    # lose a sample and make random_split raise.
    val_len = int(split_ratio * len(full_trainset))
    train_len = len(full_trainset) - val_len

    # NOTE: both subsets index the same dataset object, so the validation
    # subset is read through ``transform`` (not ``eval_transform``).
    trainset, valset = torch.utils.data.random_split(
        full_trainset, [train_len, val_len]
    )

    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
    valloader = DataLoader(valset, batch_size=batch_size, shuffle=True)
    testloader = DataLoader(testset, batch_size=batch_size, shuffle=False)
    return trainloader, valloader, testloader


def get_cifar10_loaders(batch_size, root="data/cifar10", split_ratio=0.1):
    """Return ``(trainloader, valloader, testloader)`` for CIFAR-10.

    The training pipeline pads-and-crops to 32x32 and flips horizontally at
    random before tensor conversion and normalization; the evaluation
    pipeline only converts and normalizes.
    """
    channel_mean = (0.4914, 0.4822, 0.4465)
    channel_std = (0.247, 0.243, 0.261)

    augmentation = [
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
    ]
    train_pipeline = transforms.Compose(
        augmentation
        + [transforms.ToTensor(), transforms.Normalize(channel_mean, channel_std)]
    )
    eval_pipeline = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize(channel_mean, channel_std)]
    )

    loader_kwargs = {
        "eval_transform": eval_pipeline,
        "root": root,
        "split_ratio": split_ratio,
    }
    return get_loaders(datasets.CIFAR10, batch_size, train_pipeline, **loader_kwargs)


# Registry: dataset name -> function that builds its (train, val, test) loaders.
DATALOADERS = {"cifar10": get_cifar10_loaders}

if __name__ == "__main__":
    # Smoke test: build the loaders for each listed dataset and print the
    # split sizes together with the shape of one training sample.
    datasets_to_load = ["cifar10"]
    for dataset in datasets_to_load:
        build_loaders = DATALOADERS[dataset]
        trainloader, valloader, testloader = build_loaders(batch_size=64)
        print(f'{dataset}: {len(trainloader.dataset)}, {len(valloader.dataset)}, {len(testloader.dataset)}, {trainloader.dataset[0][0].shape}')



Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar10/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:12<00:00, 13.1MB/s]


Extracting data/cifar10/cifar-10-python.tar.gz to data/cifar10
Files already downloaded and verified
cifar10: 45000, 5000, 10000, torch.Size([3, 32, 32])


#### Useful utilities
- ``evaluate(model, loader, criterion, device)`` – Evaluates a model on a dataset, computing loss, accuracy, and confusion matrix.
- ``plot_loss_accuracy(train_loss, train_acc, val_loss, val_acc, filename)`` – Plots and saves the training and validation loss/accuracy curves.
- ``save_model(model, filename, verbose, existed)`` – Saves a PyTorch model's state dictionary while handling filename conflicts.
- ``load_model(model, filename, qconfig, fuse_modules, verbose)`` – Loads a saved model, optionally applying quantization and module fusion.
- ``reset_seed(seed)`` – Sets seeds for PyTorch and NumPy to ensure reproducibility.
- ``plot_confusion_matrix(conf_matrix, filename)`` – Generates and saves a heatmap of the confusion matrix for CIFAR-10 classification.



In [None]:
import os

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix
import torch
import torch.ao.quantization as tq
from tqdm.notebook import tqdm

# Default device for the helpers below: "cuda" when torch reports that CUDA
# is available, otherwise "cpu".
DEFAULT_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


def evaluate(model, loader, criterion, device=DEFAULT_DEVICE):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    The model is switched to eval mode; it is NOT moved to ``device``, only
    the batches are, so the caller must place the model on ``device`` first.

    Args:
        model: Module called as ``model(images)``.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(output, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, accuracy, conf_matrix)`` where ``avg_loss`` is the
        mean of the per-batch losses, ``accuracy`` is the fraction of correct
        predictions and ``conf_matrix`` comes from
        ``sklearn.metrics.confusion_matrix(labels, predictions)``.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    running_loss = 0
    total, correct = 0, 0
    n_batches = 0
    pred_batches, label_batches = [], []

    model.eval()
    with torch.no_grad():
        loop = tqdm(loader, desc="Evaluating", leave=True)

        for images, labels in loop:
            images, labels = images.to(device), labels.to(device)
            output = model(images)
            loss = criterion(output, labels)

            running_loss += loss.item()
            n_batches += 1
            predicted = torch.argmax(output, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            pred_batches.append(predicted.cpu().numpy())
            label_batches.append(labels.cpu().numpy())

            # Update the bar per batch, using the real number of batches seen
            # so a smaller final batch does not distort the running loss.
            loop.set_postfix(loss=running_loss / n_batches, accuracy=correct / total)

    if n_batches == 0:
        raise ValueError("Cannot evaluate on an empty loader.")

    avg_loss = running_loss / n_batches
    accuracy = correct / total
    all_preds = np.concatenate(pred_batches)
    all_labels = np.concatenate(label_batches)
    conf_matrix = confusion_matrix(all_labels, all_preds)
    return avg_loss, accuracy, conf_matrix


def preprocess_filename(filename: str, existed: str = "keep_both") -> str:
    """Resolve the path to write to, given a policy for existing files.

    Args:
        filename: Desired output path.
        existed: Policy applied when ``filename`` already exists:
            ``"overwrite"`` returns ``filename`` unchanged;
            ``"keep_both"`` appends ``-1``, ``-2``, ... before the extension
            until an unused name is found;
            ``"raise"`` raises ``FileExistsError``.

    Returns:
        The path that should be written.

    Raises:
        FileExistsError: If ``existed == "raise"`` and ``filename`` exists.
        ValueError: If ``existed`` is not one of the supported policies.
    """
    if existed == "overwrite":
        return filename

    if existed == "keep_both":
        base, ext = os.path.splitext(filename)
        cnt = 1
        while os.path.exists(filename):
            filename = f"{base}-{cnt}{ext}"
            cnt += 1
        return filename

    if existed == "raise":
        # The existence check is nested so that a free path is returned
        # instead of being reported as an unknown policy.
        if os.path.exists(filename):
            raise FileExistsError(f"{filename} already exists.")
        return filename

    raise ValueError(f"Unknown value for 'existed': {existed}")


def plot_loss_accuracy(
    train_loss, train_acc, val_loss, val_acc, filename="loss_accuracy.png"
):
    """Plot training/validation loss and accuracy side by side and save them.

    Args:
        train_loss: Per-epoch training losses.
        train_acc: Per-epoch training accuracies.
        val_loss: Per-epoch validation losses.
        val_acc: Per-epoch validation accuracies.
        filename: Output image path. If it already exists, a numbered
            variant is used (``preprocess_filename`` default policy).
    """
    fig, (ax1, ax2) = plt.subplots(1, 2)

    ax1.set_xlabel("Epoch")
    ax1.set_ylabel("Loss")
    ax1.plot(train_loss, color="tab:blue")
    ax1.plot(val_loss, color="tab:red")
    ax1.legend(["Training", "Validation"])
    ax1.set_title("Loss")

    ax2.set_xlabel("Epoch")
    ax2.set_ylabel("Accuracy")
    ax2.plot(train_acc, color="tab:blue")
    ax2.plot(val_acc, color="tab:red")
    ax2.legend(["Training", "Validation"])
    ax2.set_title("Accuracy")

    fig.tight_layout()
    filename = preprocess_filename(filename)
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)
    fig.savefig(filename)
    print(f"Plot saved at {filename}")


def plot_confusion_matrix(conf_matrix, filename="conf_matrix.png"):
    """Draw ``conf_matrix`` as an annotated heatmap and save it.

    The axes are labelled with the ten CIFAR-10 class names, so the matrix is
    expected to be 10x10 with integer counts (cells are formatted with "d").

    Args:
        conf_matrix: Confusion matrix to plot (rows: true, columns: predicted).
        filename: Output image path. If it already exists, a numbered
            variant is used (``preprocess_filename`` default policy).
    """
    classes = [
        "airplane",
        "automobile",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    ]
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        conf_matrix,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=classes,
        yticklabels=classes,
    )
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix for CIFAR-10 Classification")
    plt.tight_layout()

    filename = preprocess_filename(filename)
    # Create the target directory like the other save helpers do; skip it for
    # bare filenames, whose dirname is empty.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)
    plt.savefig(filename)
    print(f"Confusion matrix saved to {filename}")


def save_model(
    model, filename: str, verbose: bool = True, existed: str = "keep_both"
) -> None:
    """Save ``model.state_dict()`` to disk.

    Args:
        model: Module whose state dict is saved.
        filename: Desired output path; the final path is chosen by
            ``preprocess_filename(filename, existed)``.
        verbose: When true, the confirmation message also reports the file
            size in MB. A shorter message is printed otherwise.
        existed: Policy for an already existing ``filename``
            (see ``preprocess_filename``).
    """
    filename = preprocess_filename(filename, existed)

    # A bare filename has an empty dirname, and os.makedirs("") raises.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)

    torch.save(model.state_dict(), filename)
    if verbose:
        print(f"Model saved at {filename} ({os.path.getsize(filename) / 1e6} MB)")
    else:
        print(f"Model saved at {filename}")


def load_model(
    model, filename: str, qconfig=None, fuse_modules: bool = False, verbose: bool = True
) -> torch.nn.Module:
    """Load a saved state dict into ``model``, optionally as a quantized model.

    Args:
        model: Freshly constructed module to load the weights into.
        filename: Path of the saved state dict.
        qconfig: When not ``None``, the model is wrapped in ``QuantWrapper``,
            given this qconfig, and passed through ``prepare`` and ``convert``
            before the state dict is loaded. The weights are then mapped to
            the CPU.
        fuse_modules: When true, ``model.fuse_modules()`` is called if the
            model defines it.
        verbose: When true, print the loaded path and its size in MB.

    Returns:
        The model with the state dict loaded (the ``QuantWrapper`` when
        ``qconfig`` is given).
    """
    # Only talk about fusion when the caller asked for it.
    if fuse_modules:
        if hasattr(model, "fuse_modules"):
            print("Fusing modules")
            model.fuse_modules()
        else:
            print("Model does not have 'fuse_modules' method. Skipping fusion.")

    if qconfig is not None:
        model = tq.QuantWrapper(model)
        model.qconfig = qconfig
        tq.prepare(model, inplace=True)
        tq.convert(model, inplace=True)

    # Quantized weights are always mapped to the CPU.
    device = DEFAULT_DEVICE if qconfig is None else "cpu"
    print(device)
    model.load_state_dict(torch.load(filename, map_location=device))

    if verbose:
        print(f"Model loaded from {filename} ({os.path.getsize(filename) / 1e6} MB)")
    return model


def reset_seed(seed: int = 42):
    """Seed torch and NumPy with ``seed`` and make cuDNN deterministic."""
    for seed_fn in (torch.manual_seed, np.random.seed):
        seed_fn(seed)

    # Deterministic kernels on, autotuning off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False



### VGG

#### Implement your model based on the given model architecture in the lab material.



In [None]:
import torch
import torch.nn as nn
import torch.ao.quantization as tq


class VGG(nn.Module):
    """Small VGG-style CNN: five conv/BN/ReLU stages followed by three FC layers.

    Stages 1, 2 and 5 end with a 2x2 max pooling, so each spatial side of the
    input is divided by 8 before the features are flattened.

    Args:
        in_channels: Number of channels of the input images.
        in_size: Height/width of the (square) input images.
        num_classes: Size of the output vector.
    """

    def __init__(self, in_channels=3, in_size=32, num_classes=10) -> None:
        super().__init__()

        # Layers are created in the same order and under the same attribute
        # names and Sequential indices as before, so state-dict keys
        # (e.g. "conv1.0.weight") are unchanged.
        self.conv1 = self._conv_block(in_channels, 64, pool=True)
        self.conv2 = self._conv_block(64, 192, pool=True)
        self.conv3 = self._conv_block(192, 384, pool=False)
        self.conv4 = self._conv_block(384, 256, pool=False)
        self.conv5 = self._conv_block(256, 256, pool=True)

        # Three stride-2 poolings shrink each side by a factor of 8.
        feature_side = in_size // 8

        self.fc6 = nn.Sequential(
            nn.Linear(feature_side * feature_side * 256, 256),
            nn.ReLU(),
        )

        self.fc7 = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
        )

        # NOTE(review): the ReLU after the last Linear clamps negative
        # outputs to zero; it is kept so the forward pass matches the
        # original definition - confirm against the lab architecture.
        self.fc8 = nn.Sequential(
            nn.Linear(128, num_classes),
            nn.ReLU(),
        )

    @staticmethod
    def _conv_block(in_ch: int, out_ch: int, pool: bool) -> nn.Sequential:
        """Return 3x3 conv -> BatchNorm -> ReLU, optionally followed by 2x2 max pooling."""
        layers = [
            nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
        ]
        if pool:
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the conv stages, flatten all non-batch dimensions, then the FC layers."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc6(x)
        x = self.fc7(x)
        x = self.fc8(x)
        return x




if __name__ == "__main__":
    # Build the model with its default arguments and print its layer structure.
    model = VGG()
    # NOTE(review): `inputs` is created but not used by the statements below.
    inputs = torch.randn(1, 3, 32, 32)
    print(model)

    # torchsummary is imported here, only when this demo runs.
    from torchsummary import summary

    # Per-layer summary for a single 3x32x32 input, computed on the CPU.
    summary(model, (3, 32, 32), device="cpu")



VGG(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(64, 192, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(192, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(384, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (conv4): Sequential(
    (0): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReL

### QConfig
#### Quantization scheme
- Write your QConfig observer in order to perform post-training quantization (PTQ).
- Use uniform, symmetric quantization with a power-of-two scale to quantize the model.
- Finish qconfig observer for PTQ calibration.


In [None]:
from enum import Enum
import math

import torch
import torch.ao.quantization as tq


class PowerOfTwoObserver(tq.MinMaxObserver):
    """
    Observer module for power-of-two quantization (dyadic quantization with b = 1).

    Min/max tracking is inherited from ``MinMaxObserver``; only the computation
    of the quantization parameters is overridden so that the scale is an exact
    power of two.
    """

    def scale_approximate(self, scale: float, max_shift_amount=8) -> float:
        """Return the power of two closest (in log2) to ``scale``.

        Args:
            scale: Non-negative floating-point scale.
            max_shift_amount: Largest allowed absolute exponent; the result
                lies in ``[2 ** -max_shift_amount, 2 ** max_shift_amount]``.

        Returns:
            ``2 ** e`` for an integer ``e``. A zero ``scale`` maps to the
            smallest allowed value.
        """
        if scale == 0:
            return 2 ** -max_shift_amount  # log2(0) is undefined

        exp = math.log2(scale)
        # Clamp BEFORE rounding: an observer that never saw data yields an
        # infinite scale, and round(inf) would raise OverflowError.
        exp = max(-max_shift_amount, min(exp, max_shift_amount))
        # Rounding the exponent is what makes the scale a true power of two.
        return 2.0 ** round(exp)

    def calculate_qparams(self):
        """Calculates the quantization parameters with scale as power of two.

        Returns:
            Tuple ``(scale, zero_point)`` of tensors (float32 and int64). The
            zero point is 0 for ``torch.qint8`` and 127 for any other dtype.
        """
        min_val, max_val = self.min_val.item(), self.max_val.item()

        # Symmetric range: the larger magnitude of the observed extremes is
        # mapped onto 127 quantization steps.
        max_range = max(abs(min_val), abs(max_val))
        scale = max_range / 127
        zero_point = 0 if self.dtype == torch.qint8 else 127

        scale = self.scale_approximate(scale)
        scale = torch.tensor(scale, dtype=torch.float32)
        zero_point = torch.tensor(zero_point, dtype=torch.int64)
        return scale, zero_point

    def extra_repr(self):
        """Describe the observed range and the scale type in the module repr."""
        return f"min_val={self.min_val}, max_val={self.max_val}, scale=PowerOfTwo"


def _power_of_two_qconfig():
    """Build the QConfig that observes activations and weights with ``PowerOfTwoObserver``."""
    symmetric = torch.per_tensor_symmetric
    activation_observer = PowerOfTwoObserver.with_args(
        dtype=torch.quint8, qscheme=symmetric
    )
    weight_observer = PowerOfTwoObserver.with_args(
        dtype=torch.qint8, qscheme=symmetric
    )
    return tq.QConfig(activation=activation_observer, weight=weight_observer)


class CustomQConfig(Enum):
    """Quantization configurations selectable by member name."""

    POWER2 = _power_of_two_qconfig()
    DEFAULT = None


### Model Training

- Set the hyperparameters for training.
- Record the number of epochs and the accuracy in the results.
- Plot the accuracy and loss.



In [None]:
from pickle import FALSE
import os
import time

import torch
from torch import nn, optim
from tqdm.notebook import tqdm

# Seed torch and NumPy (via reset_seed) before anything in this cell runs.
reset_seed(10)


def train_one_epoch(model, loader, criterion, optimizer, device=DEFAULT_DEVICE):
    """Run one optimization pass over ``loader``.

    The model is neither switched to train mode nor moved to ``device`` here;
    the caller is responsible for both.

    Args:
        model: Module called as ``model(images)``.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(output, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, accuracy)``: mean of the per-batch losses and
        fraction of correct predictions.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    running_loss = 0
    total, correct = 0, 0
    n_batches = 0

    loop = tqdm(loader, desc="Training", leave=True)

    for images, labels in loop:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        output = model(images)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        n_batches += 1
        predicted = torch.argmax(output, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Divide by the real number of batches seen; estimating it from the
        # current batch size is wrong when the last batch is smaller.
        loop.set_postfix(loss=running_loss / n_batches, accuracy=correct / total)

    if n_batches == 0:
        raise ValueError("Cannot train on an empty loader.")

    avg_loss = running_loss / n_batches
    accuracy = correct / total
    return avg_loss, accuracy


def train(
    model,
    trainloader,
    valloader,
    criterion,
    optimizer,
    scheduler=None,
    epochs=1,
    save_path=None,
    device=DEFAULT_DEVICE,
):
    """Train ``model`` for ``epochs`` epochs, validating after each one.

    Args:
        model: Module to train; it is moved to ``device``.
        trainloader: Loader for the training batches.
        valloader: Loader for the validation batches.
        criterion: Loss function.
        optimizer: Optimizer bound to ``model``'s parameters.
        scheduler: Optional LR scheduler, stepped once per epoch.
        epochs: Number of epochs to run.
        save_path: Where to save the model whenever the validation accuracy
            matches or beats the best seen so far. Nothing is saved when
            ``None``.
        device: Device used for the model and all batches.

    Returns:
        Tuple ``(train_loss, train_acc, val_loss, val_acc)`` of per-epoch
        lists.
    """
    model = model.to(device)

    train_loss, train_acc = [], []
    val_loss, val_acc = [], []

    epoch_loop = tqdm(range(epochs), desc="Epochs", leave=True)

    for epoch in epoch_loop:
        # Forward ``device`` explicitly so batches land where the model is,
        # even when the caller overrides the default device.
        model.train()
        _train_loss, _train_acc = train_one_epoch(
            model, trainloader, criterion, optimizer, device=device
        )
        train_loss.append(_train_loss)
        train_acc.append(_train_acc)

        model.eval()
        _val_loss, _val_acc, _ = evaluate(model, valloader, criterion, device=device)
        val_loss.append(_val_loss)
        val_acc.append(_val_acc)

        print(f"Epoch {epoch + 1:2d}/{epochs}", end="  ")
        if scheduler is not None:
            print(f"lr={scheduler.get_last_lr()[0]:.2e}", end=", ")
        print(f"train_loss={_train_loss:.4f}, val_loss={_val_loss:.4f}", end=", ")
        print(f"train_acc={_train_acc:.4f}, val_acc={_val_acc:.4f}")

        # ``val_acc`` already contains this epoch, so ">=" is true exactly
        # when this epoch ties or sets the best validation accuracy.
        if save_path is not None and _val_acc >= max(val_acc):
            save_model(model, save_path, existed="overwrite")

        if scheduler is not None:
            scheduler.step()

        epoch_loop.set_postfix(
            train_loss=_train_loss,
            val_loss=_val_loss,
            train_acc=_train_acc,
            val_acc=_val_acc,
        )

    return train_loss, train_acc, val_loss, val_acc


def main(epochs, network, dataset, name=None):
    """Train ``network`` on ``dataset``, then report test metrics and plots.

    If ``./weight_fp32.pt`` exists, training resumes from it; otherwise the
    network starts from its fresh initialization. The best-validation weights
    are written back to the same path during training.

    Args:
        epochs: Number of training epochs.
        network: Model class, constructed as ``network(in_channels, in_size)``.
        dataset: Key into ``DATALOADERS`` (case-insensitive).
        name: Accepted for backward compatibility; currently unused.
    """
    dataset = dataset.lower()
    checkpoint_path = "./weight_fp32.pt"

    t = time.time()
    trainloader, valloader, testloader = DATALOADERS[dataset](batch_size=64)
    # First two dimensions of one training sample.
    in_channels, in_size = trainloader.dataset[0][0].shape[:2]

    # Build the model from the ``network`` argument rather than a fixed class.
    model = network(in_channels, in_size)
    if os.path.exists(checkpoint_path):
        model = load_model(model, checkpoint_path, qconfig=None, fuse_modules=False)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

    train_loss, train_acc, val_loss, val_acc = train(
        model,
        trainloader,
        valloader,
        criterion,
        optimizer,
        scheduler,
        epochs,
        save_path=checkpoint_path,
    )

    # NOTE: this evaluates the model as it is after the last epoch, which may
    # differ from the best-validation weights saved at ``checkpoint_path``.
    test_loss, test_accuracy, _ = evaluate(model.eval(), testloader, criterion)
    print(f"Test: loss={test_loss:.4f}, accuracy={test_accuracy:.4f}")
    print(f"Model size: {os.path.getsize(checkpoint_path) / 1e6:.2f} MB")

    plot_loss_accuracy(train_loss, train_acc, val_loss, val_acc, "./image/vgg.png")
    print(f"Time: {time.time() - t:.2f}s")


if __name__ == "__main__":
    # You can adjust the number of epochs.
    EPOCHS = 20
    for network in [VGG]:
        main(epochs=EPOCHS, network=network, dataset="cifar10")



Files already downloaded and verified
Files already downloaded and verified
Model does not have 'fuse_modules' method. Skipping fusion.
cuda


  model.load_state_dict(torch.load(filename, map_location=device))


Model loaded from /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)


Epochs:   0%|          | 0/20 [00:00<?, ?it/s]

Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  1/20  lr=1.00e-04, train_loss=0.2307, val_loss=0.6284, train_acc=0.9182, val_acc=0.8092
Model saved at /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  2/20  lr=9.00e-05, train_loss=0.2137, val_loss=0.6151, train_acc=0.9233, val_acc=0.8256
Model saved at /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  3/20  lr=8.10e-05, train_loss=0.1904, val_loss=0.7371, train_acc=0.9335, val_acc=0.7970


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  4/20  lr=7.29e-05, train_loss=0.1696, val_loss=0.6274, train_acc=0.9403, val_acc=0.8212


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  5/20  lr=6.56e-05, train_loss=0.1524, val_loss=0.6442, train_acc=0.9476, val_acc=0.8240


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  6/20  lr=5.90e-05, train_loss=0.1415, val_loss=0.5682, train_acc=0.9508, val_acc=0.8424
Model saved at /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  7/20  lr=5.31e-05, train_loss=0.1211, val_loss=0.5965, train_acc=0.9577, val_acc=0.8338


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  8/20  lr=4.78e-05, train_loss=0.1106, val_loss=0.5806, train_acc=0.9603, val_acc=0.8434
Model saved at /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch  9/20  lr=4.30e-05, train_loss=0.1016, val_loss=0.5694, train_acc=0.9656, val_acc=0.8482
Model saved at /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch 10/20  lr=3.87e-05, train_loss=0.0924, val_loss=0.5699, train_acc=0.9669, val_acc=0.8538
Model saved at /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)


Training:   0%|          | 0/704 [00:00<?, ?it/s]

Evaluating:   0%|          | 0/79 [00:00<?, ?it/s]

Epoch 11/20  lr=3.49e-05, train_loss=0.0835, val_loss=0.5978, train_acc=0.9711, val_acc=0.8502


Training:   0%|          | 0/704 [00:00<?, ?it/s]

KeyboardInterrupt: 

### PTQ on VGG Model

#### You can refer to 'Quantization in Practice' in the lab material.



In [None]:
import os
import torch
from torch import nn
import torch.ao.quantization as tq
from torch.utils.data import DataLoader, TensorDataset

# Seed torch and NumPy (via reset_seed) before anything in this cell runs.
reset_seed(0)



#########Implement your code here##########
""" Calibrate Method """
def calibrate(model, loader, device=DEFAULT_DEVICE):
    """Feed every batch of ``loader`` through ``model`` in eval mode.

    Labels are ignored and no gradients are tracked; the forward passes are
    run only for their side effects on the model.
    """
    model.eval()
    with torch.no_grad():
        for batch in loader:
            samples = batch[0].to(device)
            model(samples)

###########################################

def main(
    network,
    verbose=True,
    model_path="/content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt",
    output_path="/content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg-power2.pt",
):
    """Post-training-quantize a pretrained ``network`` with power-of-two scales.

    Args:
        network: Model class, constructed as ``network(in_channels, in_size)``.
        verbose: When true, also print the original/quantized file sizes.
        model_path: Path of the pretrained floating-point state dict.
        output_path: Desired path for the quantized state dict. If it already
            exists, a numbered variant is written instead.

    Returns:
        The converted (quantized) model.
    """
    # Calibration data: use the held-out validation split, not the test
    # split, so the test set stays untouched until the final evaluation.
    _, calib_loader, _ = DATALOADERS["cifar10"](batch_size=64)

    # Load the pretrained model, built from the ``network`` argument.
    in_channels, in_size = calib_loader.dataset[0][0].shape[:2]
    model = load_model(
        network(in_channels, in_size), model_path, qconfig=None, fuse_modules=True
    )
    model = model.cpu().eval()

    # Configure quantization.
    model = tq.QuantWrapper(model)
    model.qconfig = CustomQConfig.POWER2.value
    print(f"Quantization backend: {model.qconfig}")

    # Insert observers to collect statistics during calibration.
    print("Preparing model for quantization...")
    tq.prepare(model, inplace=True)

    # Run calibration so the observers record activation ranges.
    print("Calibrating model...")
    calibrate(model, calib_loader, "cpu")

    # Convert the floating-point model to its quantized version.
    print("Converting to quantized model...")
    tq.convert(model.cpu(), inplace=True)

    # Resolve the final path first, so the size reported below is the size of
    # the file actually written rather than of an older file at output_path.
    saved_path = preprocess_filename(output_path)
    save_model(model, saved_path, verbose=verbose, existed="overwrite")

    # Show the PTQ result.
    if verbose:
        original_size = os.path.getsize(model_path) / 1e6
        quantized_size = os.path.getsize(saved_path) / 1e6
        print(f"Original model size: {original_size:.2f} MB")
        print(f"Quantized model size: {quantized_size:.2f} MB")
        print(f"Size reduction: {(1 - quantized_size/original_size) * 100:.2f}%")

    return model

# Run post-training quantization on the VGG model, printing the size report.
if __name__ == "__main__":
    main(network=VGG, verbose=True)



Files already downloaded and verified
Files already downloaded and verified
Model does not have 'fuse_modules' method. Skipping fusion.
cuda
Model loaded from /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg.pt (13.367975 MB)
Quantization backend: QConfig(activation=functools.partial(<class '__main__.PowerOfTwoObserver'>, dtype=torch.quint8, qscheme=torch.per_tensor_symmetric){}, weight=functools.partial(<class '__main__.PowerOfTwoObserver'>, dtype=torch.qint8, qscheme=torch.per_tensor_symmetric){})
Preparing model for quantization...
Calibrating model...


  model.load_state_dict(torch.load(filename, map_location=device))


Converting to quantized model...
Model saved at /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg-power2.pt (3.380066 MB)
Original model size: 13.37 MB
Quantized model size: 3.38 MB
Size reduction: 74.72%


### Evaluate Quantized Model


In [None]:
import os
from torch import nn
import torch.ao.quantization as tq

# Seed torch and NumPy (via reset_seed) before anything in this cell runs.
reset_seed(0)

def main():
    """Load the saved model and report its loss, accuracy and file size on the test set.

    With a non-empty ``backend`` the model is loaded as a quantized model
    using the matching ``CustomQConfig`` member and evaluated on the CPU;
    otherwise it is loaded as a float model and evaluated on ``DEFAULT_DEVICE``.
    """
    dataset, backend = 'cifar10', 'power2'
    model_path = './weight_int8.pt'

    # The loader factory returns (train, val, test); only the last is needed.
    test_loader = DATALOADERS[dataset](batch_size=1000)[-1]
    in_channels, in_size = test_loader.dataset[0][0].shape[:2]

    quantized = bool(backend)
    qconfig = CustomQConfig[backend.upper()].value if quantized else None
    model = load_model(
        VGG(in_channels, in_size),
        model_path,
        qconfig=qconfig,
        fuse_modules=quantized,
    )
    print(model)

    device = "cpu" if quantized else DEFAULT_DEVICE
    loss_fn = nn.CrossEntropyLoss()
    test_loss, test_accuracy, _ = evaluate(
        model.to(device), test_loader, loss_fn, device=device
    )
    print(
        f"Test: loss={test_loss:.4f}, accuracy={test_accuracy:.4f}, size={os.path.getsize(model_path) / 1e6}MB"
    )


# Evaluate the saved quantized model on the test set.
if __name__ == "__main__":
    main()



Files already downloaded and verified
Files already downloaded and verified
Model does not have 'fuse_modules' method. Skipping fusion.
cpu
Model loaded from /content/drive/MyDrive/E24106220_lab1/weights/cifar10/vgg-power2.pt (3.380066 MB)
QuantWrapper(
  (quant): Quantize(scale=tensor([0.0168]), zero_point=tensor([127]), dtype=torch.quint8)
  (dequant): DeQuantize()
  (module): VGG(
    (conv1): Sequential(
      (0): QuantizedConv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), scale=0.02796769142150879, zero_point=127, padding=(1, 1))
      (1): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (conv2): Sequential(
      (0): QuantizedConv2d(64, 192, kernel_size=(3, 3), stride=(1, 1), scale=0.06782747805118561, zero_point=127, padding=(1, 1))
      (1): QuantizedBatchNorm2d(192, eps=1e-05, momentum=0.1, affine=True, track_running_stats=

  model.load_state_dict(torch.load(filename, map_location=device))
  device=storage.device,


Evaluating:   0%|          | 0/10 [00:00<?, ?it/s]

Test: loss=0.5783, accuracy=0.8603, size=3.380066MB
