In [4]:
!pip install optimum-quanto
#restart kernal after installing
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")



In [5]:
!pip install torch-pruning
import torch_pruning as tp
from optimum.quanto import Calibration, QTensor, freeze, qfloat8, qint4, qint8, qint2,quantize



In [33]:
def train_model(model,
                train_loader,
                test_loader,
                device,
                learning_rate=1e-1,
                num_epochs=200):

    criterion = nn.CrossEntropyLoss()

    model.to(device)

    optimizer = optim.SGD(model.parameters(),
                          lr=learning_rate,
                          momentum=0.9,
                          weight_decay=1e-4)

    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                     milestones=[100, 150],
                                                     gamma=0.1,
                                                     last_epoch=-1)

    # Evaluation
    model.eval()
    eval_loss, eval_accuracy = evaluate_model(model=model,
                                              test_loader=test_loader,
                                              device=device,
                                              criterion=criterion)
    print("Epoch: {:03d} Eval Loss: {:.3f} Eval Acc: {:.3f}".format(
        0, eval_loss, eval_accuracy))

    for epoch in range(num_epochs):

        # Training
        model.train()

        running_loss = 0
        running_corrects = 0

        for inputs, labels in train_loader:

            inputs = inputs.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_loader.dataset)
        train_accuracy = running_corrects / len(train_loader.dataset)

        # Evaluation
        model.eval()
        eval_loss, eval_accuracy = evaluate_model(model=model,
                                                  test_loader=test_loader,
                                                  device=device,
                                                  criterion=criterion)

        scheduler.step()

        print(
            "Epoch: {:03d} Train Loss: {:.3f} Train Acc: {:.3f} Eval Loss: {:.3f} Eval Acc: {:.3f}"
            .format(epoch + 1, train_loss, train_accuracy, eval_loss,
                    eval_accuracy))

    return model

def evaluate_model(model, test_loader, device, criterion=None):

    model.eval()
    model.to(device)

    running_loss = 0
    running_corrects = 0

    for inputs, labels in test_loader:

        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)

        if criterion is not None:
            loss = criterion(outputs, labels).item()
        else:
            loss = 0

        # statistics
        running_loss += loss * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)

    eval_loss = running_loss / len(test_loader.dataset)
    eval_accuracy = running_corrects / len(test_loader.dataset)

    return eval_loss, eval_accuracy


In [34]:
def save_model(model, model_dir, model_filename):

    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    model_filepath = os.path.join(model_dir, model_filename)
    torch.save(model.state_dict(), model_filepath)

def load_model(model, model_filepath, device):

    model.load_state_dict(torch.load(model_filepath, map_location=device))

    return model

In [35]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision import models

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Transformation for MobileNet
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),  # Convert grayscale images to 3 channels
    transforms.Resize((32, 32)),  
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))  # Standard normalization for ImageNet
])

batch_size = 128

# MNIST dataset
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, transform=transform, download=True)

# Data loaders
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


In [36]:
import torch.nn.functional as F
import time
def test(model, device, test_loader):
    model.to(device)
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        start = time.time()
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            if isinstance(output, QTensor):
                output = output.dequantize()
            test_loss += F.nll_loss(output, target, reduction="sum").item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
        end = time.time()

    test_loss /= len(test_loader.dataset)

    print(
        "\nTest set evaluated in {:.2f} s: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
            end - start, test_loss, correct, len(test_loader.dataset), 100.0 * correct / len(test_loader.dataset)
        )
    )

In [55]:
# Load pretrained MobileNet model
model = models.mobilenet_v2(pretrained=True)

# Modify the classifier to match the number of classes in MNIST (10 classes)
model.classifier[1] = nn.Linear(model.classifier[1].in_features, 10)

# Move model to the device (GPU or CPU)
model = model.to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training function
def train_models(model, train_loader, test_loader, device, criterion, optimizer, num_epochs=1):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for i, (images, labels) in enumerate(train_loader):
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

        # Evaluate the model
        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f'Accuracy of the model on the test images: {100 * correct / total} %')

# Train and evaluate the model
train_models(model=model,
            train_loader=train_loader,
            test_loader=test_loader,
            device=device,
            criterion=criterion,
            optimizer=optimizer,
            num_epochs=1)

Epoch [1/1], Loss: 0.1253
Accuracy of the model on the test images: 98.55 %


In [56]:
test(model, device, test_loader)

model_dir = "/kaggle/working/"
model_filename = "Mobilenet_v2.pth"
model_filepath = model_dir + model_filename
import os
#save_model(model=model, model_dir=model_dir, model_filename=model_filename)
torch.save(model,model_filepath)


Test set evaluated in 4.71 s: Average loss: -10.6797, Accuracy: 9855/10000 (99%)



In [57]:
#new_model = torch.load(model_filepath)
#test(new_model, device, test_loader)

In [58]:
all_images = []
targets = []

for image, target in test_loader:
    all_images.append(image)
    targets.append(target)
    break
# Concatenate all the images into a single tensor
all_images_tensor = torch.cat(all_images, dim=0)
targets_tensor = torch.cat(all_images, dim=0)
print(all_images_tensor.shape)
print(targets_tensor.shape)

torch.Size([128, 3, 32, 32])
torch.Size([128, 3, 32, 32])


In [59]:
# for name,m in new_model.named_modules():
#     print(name)

In [60]:
# example_inputs = torch.randn(1,3,224,224).to(device)

criterion = nn.CrossEntropyLoss()

example_inputs = all_images_tensor.clone().detach().requires_grad_(True).to(device)
eample_targets = targets_tensor.clone().detach().requires_grad_(True).to(device)

# 1. Importance criterion
imp = tp.importance.GroupTaylorImportance() # or GroupNormImportance(p=2), GroupHessianImportance(), etc.

# 2. Initialize a pruner with the model and the importance criterion
ignored_layers = [model.features[1], model.features[2], model.features[3], model.features[4]]
for m in model.modules():
    if isinstance(m, torch.nn.Linear) and m.out_features == 10:
        ignored_layers.append(m) # DO NOT prune the final classifier!

pruning_ratio_dict = {
#     model.features[1]: 0.2,
#     model.features[3]: 0.3,
#     model.features[6]: 0.4,
#     model.features[13]: 0.5,
    model.classifier: 0.2 # Example: Pruning 20% of the channels in the classifier
}

pruner1 = tp.pruner.MetaPruner( # We can always choose MetaPruner if sparse training is not required.
    model,
    example_inputs,
    importance=imp,
    pruning_ratio=0.2, 
    # pruning_ratio_dict =pruning_ratio_dict , # customized pruning ratios for layers or blocks
    ignored_layers=ignored_layers,
)
# pruner2 = tp.pruner.MetaPruner( # We can always choose MetaPruner if sparse training is not required.
#     model,
#     example_inputs,
#     importance=imp,
#     # pruning_ratio=0.1, 
#     pruning_ratio_dict =pruning_ratio_dict , # customized pruning ratios for layers or blocks
#     ignored_layers=ignored_layers,
# )

# 3. Prune & finetune the model
base_macs, base_nparams = tp.utils.count_ops_and_params(model, example_inputs)
if isinstance(imp, tp.importance.GroupTaylorImportance):
    # Taylor expansion requires gradients for importance estimation
#     loss = new_model(example_inputs).sum() # A dummy loss, please replace this line with your loss function and data!
    loss = criterion(example_inputs,eample_targets)
    loss.backward() # before pruner.step()

In [61]:
pruner1.step()

macs, nparams = tp.utils.count_ops_and_params(model.to(device), example_inputs.to(device))
print(
    " Params: %.2f M => %.2f M"
    % (base_nparams / 1e6, nparams / 1e6)
)

# pruner2.step()

# macs, nparams = tp.utils.count_ops_and_params(model.to(device), example_inputs.to(device))
# print(
#     " Params: %.2f M => %.2f M"
#     % (base_nparams / 1e6, nparams / 1e6)
# )


test(model, device, test_loader)

torch.save(model,"/kaggle/working/pruned.pth")

 Params: 2.24 M => 1.46 M
 Params: 2.24 M => 1.46 M





Test set evaluated in 4.53 s: Average loss: -1.4228, Accuracy: 5195/10000 (52%)



In [62]:
def Quantize(model,weights_dtype,activations_dtype):
    quantize(model, weights=weights_dtype, activations=activations_dtype)
    print("Calibrating ...")
    with Calibration():
        test(model, device, test_loader)
    freeze(model)

weights = [qint8,qint4,qint2,qfloat8]
strs = ["int8.pth","int4.pth","int2.pth","float8.pth"]
activations = [qint8,qint4,qint2,qfloat8]

i= 0#change here
# pruned_model = torch.load("/content/drive/MyDrive/vision lab/Mobilenet_v2.pth")
Quantize(model = model,weights_dtype=weights[i],activations_dtype=activations[i])
path = "/kaggle/working/"+strs[i]
torch.save(model,path)

Calibrating ...

Test set evaluated in 11.22 s: Average loss: -1.4271, Accuracy: 5176/10000 (52%)



In [63]:
if i==0:
    ptq_model = torch.load("/kaggle/working/int8.pth")
    print("int8 model:")
    test(ptq_model, device, test_loader)
elif i==1:
    ptq_model = torch.load("/kaggle/working/int4.pth")
    print("int4 model:")
    test(ptq_model, device, test_loader)
elif i==2:
    ptq_model = torch.load("/kaggle/working/int2.pth")
    print("int2 model:")
    test(ptq_model, device, test_loader)
else:
    ptq_model = torch.load("/kaggle/working/float8.pth")
    print("float8 model:")
    test(ptq_model, device, test_loader)

int8 model:


  ptq_model = torch.load("/kaggle/working/int8.pth")



Test set evaluated in 5.62 s: Average loss: -1.4291, Accuracy: 5173/10000 (52%)



In [None]:
# for name, param in ptq_model.named_parameters():
#     print(name)
#     print(param)
#     break

# **QAT for model(optional)**

In [65]:
# quantize(ptq_model,weights=qint4,activations=qint4)
qat_model = train_model(model=ptq_model,
            train_loader=train_loader,
            test_loader=test_loader,
            device=device,
            num_epochs=2)

test(ptq_model, device, test_loader)

freeze(qat_model)
print("after freezing:")
test(qat_model, device, test_loader)
torch.save(qat_model,path)

Epoch: 000 Eval Loss: 1.526 Eval Acc: 0.517
Epoch: 001 Train Loss: 0.061 Train Acc: 0.982 Eval Loss: 0.045 Eval Acc: 0.985
Epoch: 002 Train Loss: 0.045 Train Acc: 0.987 Eval Loss: 0.036 Eval Acc: 0.989

Test set evaluated in 5.54 s: Average loss: -7.5813, Accuracy: 9895/10000 (99%)

after freezing:

Test set evaluated in 5.55 s: Average loss: -7.5813, Accuracy: 9895/10000 (99%)



In [66]:
for name, param in qat_model.named_parameters():
    print(name)
    print(param)
    break

features.0.0.weight
QBytesTensor(tensor([[[[  45,  -15,   50],
          [ 111,  -86,   23],
          [  36, -127,  -50]],

         [[  27,  -20,   51],
          [  68, -112,   -7],
          [  39, -112,  -27]],

         [[ -86,  -69,  -34],
          [ -38, -100,  -52],
          [ -90, -114,  -82]]],


        [[[  -9,   -2,    6],
          [  24,   64,    7],
          [ -32,  -47,   -8]],

         [[ -10,   11,    2],
          [  47,  127,   17],
          [ -57, -103,  -15]],

         [[  -1,    0,    7],
          [   9,   29,    0],
          [ -11,  -26,   -2]]],


        [[[ -11,    8,   -1],
          [ -36,   63,  -27],
          [ -60,   79,  -25]],

         [[ -12,   16,   -4],
          [ -72,  115,  -46],
          [ -94,  127,  -40]],

         [[  -3,   -3,    2],
          [ -18,   32,  -15],
          [ -26,   37,  -13]]],


        [[[   1,   -6,   16],
          [  11,   37,  -52],
          [   3,   57,  -60]],

         [[  -1,  -10,   16],
          [