In [None]:
# Mount Google Drive at /content/drive (Colab-only); the checkpoint paths used
# later in this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from torchvision.models import resnet18, ResNet18_Weights
import random
import copy
import matplotlib.pyplot as plt
import torch.nn as nn
import numpy as np
import torch
import torch.optim as optim
import torchvision
from torchvision.transforms import Resize
import torchvision.transforms as transforms
import torchvision.models as models
import torch.quantization
import torch.nn.functional as F
from torch.utils.data import random_split
from torch.utils.data import DataLoader
import pandas as pd
import time

# Pick the compute device once: CUDA when a GPU is visible, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Report the choice (same two status lines as before).
if device.type == "cuda":
    print("GPU is available and being used.")
else:
    print("GPU is not available, using CPU instead.")
print(f"Using device: {device}")

GPU is available and being used.
Using device: cuda


### Basic Block and ResNet18 Architecture

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two conv + batch-norm layers and a shortcut.

    The main path is conv1 -> bn1 -> ReLU -> conv2 -> bn2. The shortcut is the
    block input, optionally passed through ``downsample``. The two are summed
    and a final ReLU is applied.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by both convolutions.
        kernel_size: kernel size shared by the two convolutions.
        stride: stride of the first convolution (the second always uses 1).
        padding: padding shared by the two convolutions.
        downsample: optional module applied to the input to build the
            shortcut; ``None`` means the input is used unchanged.
    """

    expansion = 1  # the block does not widen its output channels

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, downsample=None):
        super().__init__()
        self.stride = stride

        # Settings common to both convolutions; only the stride differs.
        shared = dict(kernel_size=kernel_size, padding=padding, bias=False)

        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **shared)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **shared)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Projection for the shortcut branch, if the caller supplied one.
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))

        shortcut = x if self.downsample is None else self.downsample(x)

        return self.relu(main + shortcut)

In [None]:
class ResNet18(nn.Module):
    """ResNet-18 style classifier assembled from ``BasicBlock`` stages.

    Layout: 7x7 stride-2 convolution, batch norm, ReLU and max pool, followed
    by four stages of two ``BasicBlock``s each (64, 128, 256, 512 channels),
    adaptive average pooling to 1x1 and a linear classifier.

    Args:
        num_classes: number of output features of the final linear layer.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages: (input channels, output channels, stride of first block)
        stage_specs = ((64, 64, 1), (64, 128, 2), (128, 256, 2), (256, 512, 2))
        for stage_no, (c_in, c_out, first_stride) in enumerate(stage_specs, start=1):
            setattr(self, f"layer{stage_no}", self._make_layer(c_in, c_out, 2, stride=first_stride))

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * BasicBlock.expansion, num_classes)

        # Re-initialise conv and batch-norm parameters
        self._initialize_weights()

    def _make_layer(self, in_channels, out_channels, blocks, stride):
        """Build one stage: ``blocks`` BasicBlocks, the first of which may downsample.

        A 1x1 convolution + batch norm projection is attached to the first
        block's shortcut whenever the stride is not 1 or the channel count
        changes.
        """
        needs_projection = stride != 1 or in_channels != out_channels
        projection = None
        if needs_projection:
            projection = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        first_block = BasicBlock(in_channels, out_channels, stride=stride, downsample=projection)
        remaining = [BasicBlock(out_channels, out_channels) for _ in range(1, blocks)]

        return nn.Sequential(first_block, *remaining)

    def forward(self, x):
        """Map an image batch to class logits."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        x = torch.flatten(self.avgpool(x), 1)
        return self.fc(x)

    def _initialize_weights(self):
        """Kaiming-normal init for every Conv2d; weight=1, bias=0 for every BatchNorm2d."""
        for module in self.modules():
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')

### Initialization

In [None]:
# Hyperparameters:
learning_rate = 0.0005  # passed as `lr` to the Adam optimizers created in later cells
momentum = 0.9  # NOTE(review): not referenced by any cell visible here (Adam is used) - confirm before removing
weight_decay = 1e-3  # passed as `weight_decay` to the Adam optimizers

num_epochs = 6  # number of passes made by the training loop
T_max = num_epochs  # `T_max` argument of CosineAnnealingLR
eta_min = 1e-5  # `eta_min` argument of CosineAnnealingLR


In [None]:
# Define transformations for CIFAR-100 dataset
# Training pipeline: resize to 224x224, then random flip / colour jitter / rotation,
# then tensor conversion and per-channel normalisation.
transform_train = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    # NOTE(review): these look like the usual ImageNet channel statistics - confirm they are intended here
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Test pipeline: same resize and normalisation, no random augmentation.
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Download (if needed) the CIFAR-100 training and test splits
download_train_dataset = torchvision.datasets.CIFAR100(root='./data', train=True, download=True, transform=transform_train)
download_test_dataset = torchvision.datasets.CIFAR100(root='./data', train=False, download=True, transform=transform_test)

batch_size = 64
# Create DataLoaders for the training split (shuffled) and the test split (fixed order).
# The test loader is what later cells pass to evaluate() as the "validation" set.
train_loader = DataLoader(download_train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
test_loader = DataLoader(download_test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

Files already downloaded and verified
Files already downloaded and verified


### Functions

#### Save/Load function

In [None]:
def load_checkpoint(model, optimizer, path):
    """Restore model and optimizer state from a checkpoint file.

    The file is expected to hold a dict with the keys ``'model_state_dict'``,
    ``'optimizer_state_dict'`` and ``'epoch'`` (the layout written by
    ``save_checkpoint``). Tensors are mapped onto the notebook-level ``device``.

    Args:
        model: module that receives the saved model state dict.
        optimizer: optimizer that receives the saved optimizer state dict.
        path: checkpoint file to read.

    Returns:
        Tuple ``(model, optimizer, epoch)`` where ``epoch`` is the value stored
        under ``'epoch'``.
    """
    # weights_only=True limits unpickling to tensors and plain Python containers,
    # which is all this checkpoint layout needs, and avoids executing arbitrary
    # pickled code from the file.
    checkpoint = torch.load(path, map_location=device, weights_only=True)

    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    model.to(device)

    # The model may have moved after the optimizer state was loaded, so make
    # sure every tensor in the optimizer state lives on the same device.
    for param_state in optimizer.state.values():
        for key, value in param_state.items():
            if isinstance(value, torch.Tensor):
                param_state[key] = value.to(device)

    return model, optimizer, epoch

In [None]:
def save_checkpoint(model, optimizer, epoch, path):
    """Write a training checkpoint to ``path``.

    The file holds a dict with the keys ``'epoch'``, ``'model_state_dict'`` and
    ``'optimizer_state_dict'``. The parent directory is created when missing.

    Args:
        model: module whose ``state_dict()`` is saved.
        optimizer: optimizer whose ``state_dict()`` is saved.
        epoch: value stored under ``'epoch'``.
        path: destination file; may be a bare file name.
    """
    import os  # local import: the notebook's import cell does not import os

    # os.path.dirname returns '' for a bare file name, and os.makedirs('')
    # raises, so only create the directory when there is one to create.
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, path)

#### Evaluation Function

In [None]:
def evaluate(model, data_loader, device):
    """Return the top-1 accuracy of ``model`` over ``data_loader``, in percent.

    The model is switched to evaluation mode and is left in that mode on
    return. Gradients are disabled while predicting.

    Args:
        model: module mapping an image batch to per-class scores.
        data_loader: iterable of ``(images, labels)`` tensor pairs.
        device: device the batches are moved to before the forward pass.

    Returns:
        ``100 * correct / total`` as a float, or ``0.0`` when the loader yields
        no samples.
    """
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in data_loader:
            images = images.to(device)
            labels = labels.to(device)

            # Predicted class = index of the largest score in each row.
            predicted = model(images).argmax(dim=1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader would otherwise divide by zero.
    if total == 0:
        return 0.0

    return 100 * correct / total

#### Quantization Function

In [None]:
def fixed_point_quantize_weights(weights, total_bits, int_bits):
    """Round a tensor onto a signed fixed-point grid and saturate out-of-range values.

    The grid step is ``2 ** -(total_bits - int_bits)``. Values are rounded to
    the nearest step and the resulting integer code is clamped to the signed
    range ``[-2 ** (total_bits - 1), 2 ** (total_bits - 1) - 1]``.

    Args:
        weights: tensor to quantize (weights or activations).
        total_bits: word length, including the sign bit.
        int_bits: bits not used for the fraction.

    Returns:
        A tensor of the same shape holding the quantized values.
    """
    step = 2 ** (int_bits - total_bits)

    # Saturation limits expressed as integer codes (multiples of `step`).
    half_range = 2 ** (total_bits - 1)
    lowest_code = -half_range
    highest_code = half_range - 1

    codes = torch.round(weights / step)
    codes = torch.clamp(codes, lowest_code, highest_code)
    return codes * step

In [None]:
class QuantizedConv2d(nn.Conv2d):
    """``nn.Conv2d`` that fixed-point quantizes its input and weights on every forward pass.

    Forward values match a convolution of the quantized input with the
    quantized weights. During training a straight-through estimator is used so
    gradients still reach the stored weights and the layers upstream; the
    previous implementation read ``self.weight.data`` and used a bare
    ``round``, which cut both gradient paths.

    Args:
        *args, **kwargs: forwarded unchanged to ``nn.Conv2d``.
        total_bits: word length used for both input and weights.
        weight_int_bits: ``int_bits`` used when quantizing the weights.
        input_int_bits: ``int_bits`` used when quantizing the input.
    """

    def __init__(self, *args, total_bits=8, weight_int_bits=2, input_int_bits=2, **kwargs):
        super(QuantizedConv2d, self).__init__(*args, **kwargs)
        self.total_bits = total_bits
        self.weight_int_bits = weight_int_bits
        self.input_int_bits = input_int_bits

    def _fake_quantize(self, tensor, int_bits):
        """Quantize ``tensor`` in the forward pass; pass gradients through unchanged."""
        quantized = fixed_point_quantize_weights(tensor.detach(), self.total_bits, int_bits)
        if torch.is_grad_enabled() and tensor.requires_grad:
            # Straight-through estimator: (tensor - tensor.detach()) is exactly
            # zero for finite values, so the forward result is still
            # `quantized`, while the backward pass treats the op as identity.
            return quantized + (tensor - tensor.detach())
        return quantized

    def forward(self, input):
        """Convolve the quantized input with the quantized weights."""
        quantized_input = self._fake_quantize(input, self.input_int_bits)
        quantized_weights = self._fake_quantize(self.weight, self.weight_int_bits)
        # The bias is used as stored; it is not re-quantized here.
        return F.conv2d(quantized_input, quantized_weights, self.bias, self.stride,
                        self.padding, self.dilation, self.groups)

In [None]:
## quantize conv
def quantize_conv2d(model, total_bits, weight_int_bits, input_int_bits):
    """Replace, in place, every ``nn.Conv2d`` under ``model`` with a ``QuantizedConv2d``.

    Each replacement copies the original layer's geometry and starts from the
    original weights (and bias, if any) already rounded with
    ``fixed_point_quantize_weights``. Children that are not convolutions but
    have children of their own are processed recursively.

    Args:
        model: module whose sub-modules are rewritten.
        total_bits: word length handed to the quantized layers.
        weight_int_bits: ``int_bits`` for weights (also used for the bias).
        input_int_bits: ``int_bits`` for layer inputs.
    """
    for child_name, child in model.named_children():
        if not isinstance(child, nn.Conv2d):
            # Not a convolution: descend only if it is a container.
            if len(list(child.children())) > 0:
                quantize_conv2d(child, total_bits, weight_int_bits, input_int_bits)
            continue

        has_bias = child.bias is not None
        conv_config = dict(
            in_channels=child.in_channels,
            out_channels=child.out_channels,
            kernel_size=child.kernel_size,
            stride=child.stride,
            padding=child.padding,
            dilation=child.dilation,
            groups=child.groups,
            bias=has_bias,
        )
        replacement = QuantizedConv2d(
            total_bits=total_bits,
            weight_int_bits=weight_int_bits,
            input_int_bits=input_int_bits,
            **conv_config,
        )

        # Seed the new layer with the quantized version of the old parameters.
        replacement.weight.data = fixed_point_quantize_weights(
            child.weight.data.clone(), total_bits, weight_int_bits)
        if has_bias:
            replacement.bias.data = fixed_point_quantize_weights(
                child.bias.data.clone(), total_bits, weight_int_bits)

        setattr(model, child_name, replacement)

### Reload from our trained model (ResNet18) as starting point, start_epoch will be initialized as 0

In [None]:
# Start from the previously trained (non-quantized) ResNet18 checkpoint.
checkpoint_path = '/content/drive/My Drive/Colab Notebooks/checkpoints/transfer_learning_checkpoint.pth'
model = ResNet18(num_classes=100)

# The optimizer has to exist before load_checkpoint so its saved state can be restored into it.
# NOTE(review): it is bound to `model.parameters()`; a deepcopy of `model` has different
# parameter objects and will not be updated by this optimizer.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

model, optimizer, start_epoch = load_checkpoint(model, optimizer, checkpoint_path)
model.to(device)

# Define the loss function
criterion = nn.CrossEntropyLoss()

# Create the learning-rate scheduler
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=T_max, eta_min=eta_min)

# Verify the model: print the stored epoch, then each trainable parameter's
# name, shape and sum as a quick fingerprint of the loaded weights.
print("start epoch: ", start_epoch)
for name, param in model.named_parameters():
    if param.requires_grad:
        print(name, param.data.shape, param.data.sum())

start_epoch = 0 # initialize start_epoch for new training quantized ResNet18


  checkpoint = torch.load(path, map_location=device)


start epoch:  60
conv1.weight torch.Size([64, 3, 7, 7]) tensor(0.2768, device='cuda:0')
bn1.weight torch.Size([64]) tensor(16.4849, device='cuda:0')
bn1.bias torch.Size([64]) tensor(11.5917, device='cuda:0')
layer1.0.conv1.weight torch.Size([64, 64, 3, 3]) tensor(-113.8010, device='cuda:0')
layer1.0.bn1.weight torch.Size([64]) tensor(21.7344, device='cuda:0')
layer1.0.bn1.bias torch.Size([64]) tensor(-2.1847, device='cuda:0')
layer1.0.conv2.weight torch.Size([64, 64, 3, 3]) tensor(-32.7843, device='cuda:0')
layer1.0.bn2.weight torch.Size([64]) tensor(21.3155, device='cuda:0')
layer1.0.bn2.bias torch.Size([64]) tensor(0.2216, device='cuda:0')
layer1.1.conv1.weight torch.Size([64, 64, 3, 3]) tensor(-89.2124, device='cuda:0')
layer1.1.bn1.weight torch.Size([64]) tensor(21.0363, device='cuda:0')
layer1.1.bn1.bias torch.Size([64]) tensor(-5.3487, device='cuda:0')
layer1.1.conv2.weight torch.Size([64, 64, 3, 3]) tensor(-46.4579, device='cuda:0')
layer1.1.bn2.weight torch.Size([64]) tensor(25

### Load from the quantized checkpoint (ResNet18) as the starting point; start_epoch is the epoch stored in that checkpoint

In [None]:
# Start from the checkpoint written by the quantized training loop.
checkpoint_path = '/content/drive/My Drive/Colab Notebooks/checkpoints/quantized_checkpoint.pth'
model = ResNet18(num_classes=100)

# The optimizer has to exist before load_checkpoint so its saved state can be restored into it.
# NOTE(review): it is bound to `model.parameters()`; a deepcopy of `model` has different
# parameter objects and will not be updated by this optimizer.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

model, optimizer, start_epoch = load_checkpoint(model, optimizer, checkpoint_path)
model.to(device)

# Define the loss function
criterion = nn.CrossEntropyLoss()

# Create the learning-rate scheduler
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=T_max, eta_min=eta_min)

# Verify the model: print the stored epoch, then each trainable parameter's
# name, shape and sum as a quick fingerprint of the loaded weights.
print("start epoch: ", start_epoch)
for name, param in model.named_parameters():
    if param.requires_grad:
        print(name, param.data.shape, param.data.sum())

### Quantized Model Search Space

In [None]:
# Grid search over the integer-bit split for weights and inputs at a fixed
# 8-bit word length. No training happens here: each candidate is a quantized
# copy of `model` scored with evaluate().
# NOTE(review): candidates are scored on `test_loader`, so the selected setting
# is tuned on the same data later reported as validation accuracy.
best_accuracy = 0
best_weight_int_bits = 0
best_input_int_bits = 0
weight_int_bits_options = [2, 3, 4]
input_int_bits_options = [2, 3, 4]

for weight_int_bits in weight_int_bits_options:
    for input_int_bits in input_int_bits_options:
        # Create a copy of the model for quantization
        quantized_model = copy.deepcopy(model)

        # Quantize the model
        quantize_conv2d(quantized_model, 8, weight_int_bits, input_int_bits)
        quantized_model = quantized_model.to(device)

        # Evaluate the quantized model
        accuracy = evaluate(quantized_model, test_loader, device)

        print(f"Weight int bits: {weight_int_bits}, Input int bits: {input_int_bits}, Accuracy: {accuracy:.2f}%")

        # Strict '>' keeps the first setting found when accuracies tie
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_weight_int_bits = weight_int_bits
            best_input_int_bits = input_int_bits

print(f"Best weight int bits: {best_weight_int_bits}, Best input int bits: {best_input_int_bits}, Best Accuracy: {best_accuracy:.2f}%")

Weight int bits: 2, Input int bits: 2, Accuracy: 67.22%
Weight int bits: 2, Input int bits: 3, Accuracy: 67.40%
Weight int bits: 2, Input int bits: 4, Accuracy: 66.90%
Weight int bits: 3, Input int bits: 2, Accuracy: 42.44%
Weight int bits: 3, Input int bits: 3, Accuracy: 41.84%
Weight int bits: 3, Input int bits: 4, Accuracy: 39.53%
Weight int bits: 4, Input int bits: 2, Accuracy: 1.11%
Weight int bits: 4, Input int bits: 3, Accuracy: 1.06%
Weight int bits: 4, Input int bits: 4, Accuracy: 1.04%
Best weight int bits: 2, Best input int bits: 3, Best Accuracy: 67.40%


### Quantize the model and train with weight_int_bits = 2, input_int_bits = 3

In [None]:
# Build the model that the following cells test and train: a deep copy of
# `model` (so `model` itself keeps its original layers) with every Conv2d
# replaced by an 8-bit QuantizedConv2d.
quantized_model = copy.deepcopy(model)

# Integer-bit split hard-coded to the best setting printed by the search above
weight_int_bits = 2
input_int_bits = 3

quantize_conv2d(quantized_model, 8, weight_int_bits, input_int_bits)
quantized_model = quantized_model.to(device)

#### Test the quantized model

In [None]:
def _report_diff(label, diff):
    """Print the std, RMS, mean absolute and max absolute value of an error tensor."""
    print(f"{label}.std: {diff.std()}")
    print(f"{label}.rms: {torch.sqrt(torch.mean(diff ** 2))}")
    print(f"{label}.mean_abs: {diff.abs().mean()}")
    print(f"{label}.max_abs: {diff.abs().max()}")


# Run both networks in eval mode. In train mode BatchNorm normalises with
# batch statistics and overwrites its running statistics, so feeding random
# noise through the blocks below would silently alter both models.
# (The training loop switches quantized_model back with .train().)
model.eval()
quantized_model.eval()

## quantize test 0: quantize a ramp from -5.0 to 4.9 and inspect the rounding error
a = torch.arange(100)/10-5
aa = fixed_point_quantize_weights(a, 8, 2)
print(f"a: {a}")
print(f"aa: {aa}")
diff_a = a-aa
_report_diff("diff_a", diff_a)

## quantize test: compare float and quantized layers on the same random input
with torch.no_grad():
    img = (torch.rand((1,3,224,224))-0.5).to(device)

    # First convolution
    X = model.conv1(img)
    print("X: ", X[0][0])
    Xq = quantized_model.conv1(img)
    print("Xq: ", Xq[0][0])
    diff_X = X-Xq
    _report_diff("diff_X", diff_X)

    # First residual block, fed with the respective conv1 outputs
    layer = model.layer1[0]
    layerq = quantized_model.layer1[0]
    Y = layer(X)
    Yq = layerq(Xq)
    diff_Y = Y-Yq
    _report_diff("diff_Y", diff_Y)

    # Check BatchNorm against a hand-written eval-mode formula:
    # (x - running_mean) / sqrt(running_var + eps) * weight + bias, per channel
    bn = layer.bn1
    Z = bn(X)
    per_channel = (1, -1, 1, 1)
    ZZ = (X - bn.running_mean.view(per_channel)) / torch.sqrt(bn.running_var.view(per_channel) + bn.eps)
    ZZ = ZZ * bn.weight.view(per_channel) + bn.bias.view(per_channel)
    diff_Z = Z-ZZ
    _report_diff("diff_Z", diff_Z)

a: tensor([-5.0000, -4.9000, -4.8000, -4.7000, -4.6000, -4.5000, -4.4000, -4.3000,
        -4.2000, -4.1000, -4.0000, -3.9000, -3.8000, -3.7000, -3.6000, -3.5000,
        -3.4000, -3.3000, -3.2000, -3.1000, -3.0000, -2.9000, -2.8000, -2.7000,
        -2.6000, -2.5000, -2.4000, -2.3000, -2.2000, -2.1000, -2.0000, -1.9000,
        -1.8000, -1.7000, -1.6000, -1.5000, -1.4000, -1.3000, -1.2000, -1.1000,
        -1.0000, -0.9000, -0.8000, -0.7000, -0.6000, -0.5000, -0.4000, -0.3000,
        -0.2000, -0.1000,  0.0000,  0.1000,  0.2000,  0.3000,  0.4000,  0.5000,
         0.6000,  0.7000,  0.8000,  0.9000,  1.0000,  1.1000,  1.2000,  1.3000,
         1.4000,  1.5000,  1.6000,  1.7000,  1.8000,  1.9000,  2.0000,  2.1000,
         2.2000,  2.3000,  2.4000,  2.5000,  2.6000,  2.7000,  2.8000,  2.9000,
         3.0000,  3.1000,  3.2000,  3.3000,  3.4000,  3.5000,  3.6000,  3.7000,
         3.8000,  3.9000,  4.0000,  4.1000,  4.2000,  4.3000,  4.4000,  4.5000,
         4.6000,  4.7000,  4.8000,  4

### Train Loop

In [None]:
# Fine-tune `quantized_model` for `num_epochs` epochs, validating on
# `test_loader` after each epoch and checkpointing to Google Drive.

# Build the optimizer and scheduler over quantized_model's own parameters.
# The optimizer restored by load_checkpoint tracks `model`'s parameters;
# quantized_model is a deepcopy with different parameter objects, so stepping
# that optimizer would leave quantized_model untouched.
# NOTE: every run of this cell trains `num_epochs` epochs from a fresh
# optimizer; `start_epoch` is not used here.
optimizer = torch.optim.Adam(quantized_model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=T_max, eta_min=eta_min)

checkpoint_path = '/content/drive/My Drive/Colab Notebooks/checkpoints/quantized_checkpoint.pth'

# Per-epoch summaries (total_train / correct_train are kept for compatibility
# but are not filled in by this loop)
total_train = torch.zeros(num_epochs)
correct_train = torch.zeros(num_epochs)
avg_loss_train = torch.zeros(num_epochs)
accuracy_train = torch.zeros(num_epochs)

# TRAINING LOOP
print("START TRAINING........")
train_losses = [] # per-epoch lists of per-batch training loss
train_accuracies = [] # per-epoch lists of per-batch training accuracy
val_accuracies = [] # validation accuracy after each epoch

for epoch in range(num_epochs):
    quantized_model.train() # evaluate() below leaves the model in eval mode
    batch_losses = []
    batch_accuracies = []

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # forward
        outputs = quantized_model(inputs)
        loss = criterion(outputs, targets)

        # backward
        optimizer.zero_grad()
        loss.backward()

        # Clip before the step so the clipped gradients are the ones applied
        torch.nn.utils.clip_grad_norm_(quantized_model.parameters(), max_norm=1)
        optimizer.step()

        # save data
        batch_losses.append(loss.item())
        _, predicted = outputs.max(1)
        total = targets.size(0)
        correct = predicted.eq(targets).sum().item()
        batch_accuracies.append(100. * correct / total)

    # Advance the cosine schedule once per epoch
    scheduler.step()

    train_losses.append(batch_losses)
    train_accuracies.append(batch_accuracies)
    avg_loss_train[epoch] = np.mean(batch_losses)
    accuracy_train[epoch] = np.mean(batch_accuracies)

    # Validation after each epoch
    val_accuracy = evaluate(quantized_model, test_loader, device)
    val_accuracies.append(val_accuracy)

    # Checkpoint on every odd-numbered epoch and always after the last one,
    # so the final weights are never lost
    is_last_epoch = (epoch + 1) == num_epochs
    if (epoch + 1) % 2 == 1 or is_last_epoch:
        save_checkpoint(quantized_model, optimizer, epoch, checkpoint_path)

    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {avg_loss_train[epoch]:.4f} - "
          f"Train Accuracy: {accuracy_train[epoch]:.2f}% - "
          f"Validation Accuracy: {val_accuracy:.2f}%")

START TRAINING........
Epoch [1/6] - Train Loss: 0.3345 - Train Accuracy: 92.63% - Validation Accuracy: 72.57%
Epoch [2/6] - Train Loss: 0.3351 - Train Accuracy: 92.75% - Validation Accuracy: 72.98%
Epoch [3/6] - Train Loss: 0.3382 - Train Accuracy: 92.45% - Validation Accuracy: 72.33%
Epoch [4/6] - Train Loss: 0.3309 - Train Accuracy: 92.74% - Validation Accuracy: 72.59%
Epoch [5/6] - Train Loss: 0.3350 - Train Accuracy: 92.60% - Validation Accuracy: 72.46%
Epoch [6/6] - Train Loss: 0.3345 - Train Accuracy: 92.72% - Validation Accuracy: 72.66%
