<a href="https://colab.research.google.com/github/AyubQuadri/Assignment/blob/main/TASK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset


# Set device
from google.colab import drive
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
drive.mount('/content/gdrive')

# Save the model files
PATH = '/content/gdrive/My Drive/Colab Notebooks/chkpt_training_BN/'
No_Epochs = 25
# Define the neural network with 3 Linear layers and SiLU activation
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.flatten = nn.Flatten()
        self.silu = nn.SiLU()
        self.fc1 = nn.Linear(28*28, 512 ,bias=False)  # Input layer (flattened 28x28 images), 512 neurons
        self.fc2 = nn.Linear(512, 256, bias=False)     # Hidden layer with 256 neurons
        self.fc3 = nn.Linear(256, 10, bias=False)      # Output layer (10 classes for digits 0-9)

    def forward(self, x):
        x = self.flatten(x)  # Flatten the input image
        x = self.silu(self.fc1(x))
        x = self.silu(self.fc2(x))
        x = self.fc3(x)        # Output without activation for classification
        return x


# Define transformations for the dataset
transform = transforms.Compose([
    # transforms.RandomRotation(10),
    # transforms.RandomAffine(0, translate=(0.1,0.1)),
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))  # Normalize with mean and std of MNIST
])
no_imgs_eval_set = 128
mnist_data = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
eval_indices = list(range(no_imgs_eval_set))
train_indices = list(range(no_imgs_eval_set, len(mnist_data)))

# Load the MNIST dataset
eval_set = Subset(mnist_data, eval_indices)
train_dataset = Subset(mnist_data, train_indices)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)
eval_loader = DataLoader(eval_set, batch_size=64, shuffle=False)

# Initialize model, loss function, and optimizer
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx*len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')

# Test function
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # Sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # Get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)\n')

# Save checkpoint function
def save_checkpoint(epoch, model, optimizer, path):
  torch.save({
      'epoch': epoch,
      'model_state_dict': model.state_dict(),
      'optimizer_state_dict': optimizer.state_dict()
  }, path)

# Main training and testing loop

for epoch in range(1, No_Epochs+1):  # Train for 5 epochs
    train(model, device, train_loader, optimizer, criterion, epoch)
    test(model, device, test_loader)
    checkpoint_path= PATH + f'checkpoint_epoch_{epoch}.pth'
    save_checkpoint(epoch, model, optimizer, checkpoint_path)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
Train Epoch: 1 [0/59872] Loss: 2.325803
Train Epoch: 1 [6400/59872] Loss: 0.271597
Train Epoch: 1 [12800/59872] Loss: 0.179996
Train Epoch: 1 [19200/59872] Loss: 0.385996
Train Epoch: 1 [25600/59872] Loss: 0.364178
Train Epoch: 1 [32000/59872] Loss: 0.025599
Train Epoch: 1 [38400/59872] Loss: 0.307025
Train Epoch: 1 [44800/59872] Loss: 0.044439
Train Epoch: 1 [51200/59872] Loss: 0.104807
Train Epoch: 1 [57600/59872] Loss: 0.079351

Test set: Average loss: 0.0001, Accuracy: 9673/10000 (96.73%)

Train Epoch: 2 [0/59872] Loss: 0.046477
Train Epoch: 2 [6400/59872] Loss: 0.061661
Train Epoch: 2 [12800/59872] Loss: 0.077074
Train Epoch: 2 [19200/59872] Loss: 0.088976
Train Epoch: 2 [25600/59872] Loss: 0.098698
Train Epoch: 2 [32000/59872] Loss: 0.240937
Train Epoch: 2 [38400/59872] Loss: 0.163877
Train Epoch: 2 [44800/59872] Loss: 0.224243
Train Epoch: 2 [51200/5

In [15]:
#Util functions
import os
import tqdm
def print_size_of_model(model):
    torch.save(model.state_dict(), "temp_delme.p")
    print('Size (KB):', os.path.getsize("temp_delme.p")/1e3)
    os.remove('temp_delme.p')
    # return True
# Model analysis Current Weights and size of the model before Quantization
def Model_analysis(model):
  print('Weights Before Quantization')
  print(model.fc1.weight)
  print("\nData Type of Model:",model.fc1.weight.dtype)
  print('Size of the model before Quantization')
  print_size_of_model(model)
  test(model, device,eval_loader)
  # return True



In [16]:
# Load Model
Model_analysis(model)

Weights Before Quantization
Parameter containing:
tensor([[0.0088, 0.0624, 0.0083,  ..., 0.0267, 0.0469, 0.0489],
        [0.1012, 0.1306, 0.1138,  ..., 0.1344, 0.1305, 0.1118],
        [0.0758, 0.0833, 0.0674,  ..., 0.1113, 0.0828, 0.0540],
        ...,
        [0.1134, 0.1597, 0.1541,  ..., 0.1217, 0.1074, 0.1076],
        [0.0843, 0.0569, 0.1004,  ..., 0.1168, 0.1112, 0.0988],
        [0.1139, 0.0734, 0.0982,  ..., 0.0646, 0.0788, 0.0551]],
       device='cuda:0', requires_grad=True)

Data Type of Model: torch.float32
Size of the model before Quantization
Size (KB): 2142.121

Test set: Average loss: 0.0028, Accuracy: 126/128 (98.44%)



Quantization Techniques
1. Post Training Quantization (PTQ)
2. Quantize Aware Training (QAT)

# Post Training Quantization Method
Steps
1. Insert Min-Max based oberservers
2. Calibirate
3. Quantize the model
4. Compare the results with original vs Quantized model


In [17]:
class Quantized_MLP(nn.Module):
  def __init__(self):
    super(Quantized_MLP, self).__init__()
    self.quant = torch.quantization.QuantStub()
    self.flatten = nn.Flatten()
    self.silu = nn.SiLU()
    self.fc1 = nn.Linear(28*28, 512 ,bias=False)  # Input layer (flattened 28x28 images), 512 neurons
    self.fc2 = nn.Linear(512, 256, bias=False)     # Hidden layer with 256 neurons
    self.fc3 = nn.Linear(256, 10, bias=False)      # Output layer (10 classes for digits 0-9)
    self.dequant = torch.quantization.DeQuantStub()


  def forward(self, x):
    x = self.flatten(x)  # Flatten the input image
    x = self.quant(x)
    x = self.silu(self.fc1(x))
    x = self.silu(self.fc2(x))
    x = self.fc3(x)        # Output without activation for classification
    x = self.dequant(x)
    return x

In [26]:
quantized_model = Quantized_MLP().to(device)
quantized_model.load_state_dict(model.state_dict())
quantized_model.eval()

quantized_model.qconfig = torch.ao.quantization.default_qconfig
quantized_model = torch.ao.quantization.prepare(quantized_model)
quantized_model

Quantized_MLP(
  (quant): QuantStub(
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (silu): SiLU()
  (fc1): Linear(
    in_features=784, out_features=512, bias=False
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (fc2): Linear(
    in_features=512, out_features=256, bias=False
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (fc3): Linear(
    in_features=256, out_features=10, bias=False
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (dequant): DeQuantStub()
)

Step 2: Calibrate the model using test set

In [27]:
test(quantized_model, device, eval_loader) # Calibered on the evalset saved


Test set: Average loss: 0.0028, Accuracy: 126/128 (98.44%)



In [28]:
print('Check the Quantized model details\n', quantized_model)

Check the Quantized model details
 Quantized_MLP(
  (quant): QuantStub(
    (activation_post_process): MinMaxObserver(min_val=-0.4242129623889923, max_val=2.821486711502075)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (silu): SiLU()
  (fc1): Linear(
    in_features=784, out_features=512, bias=False
    (activation_post_process): MinMaxObserver(min_val=-60.48722839355469, max_val=36.19487380981445)
  )
  (fc2): Linear(
    in_features=512, out_features=256, bias=False
    (activation_post_process): MinMaxObserver(min_val=-88.3974609375, max_val=62.31698226928711)
  )
  (fc3): Linear(
    in_features=256, out_features=10, bias=False
    (activation_post_process): MinMaxObserver(min_val=-112.92228698730469, max_val=69.16704559326172)
  )
  (dequant): DeQuantStub()
)


Step 3: Quantize the model to pytorch default int8

In [29]:
quantized_model_converted = torch.ao.quantization.convert(quantized_model)
print('check statistics of the quantized model\n', quantized_model_converted)
# Check the weights of the quantized model
print('Check the weights of the quantized model\n',torch.int_repr(quantized_model_converted.fc1.weight()))

check statistics of the quantized model
 Quantized_MLP(
  (quant): Quantize(scale=tensor([0.0256], device='cuda:0'), zero_point=tensor([17], device='cuda:0'), dtype=torch.quint8)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (silu): SiLU()
  (fc1): QuantizedLinear(in_features=784, out_features=512, scale=0.761276364326477, zero_point=79, qscheme=torch.per_tensor_affine)
  (fc2): QuantizedLinear(in_features=512, out_features=256, scale=1.1867278814315796, zero_point=74, qscheme=torch.per_tensor_affine)
  (fc3): QuantizedLinear(in_features=256, out_features=10, scale=1.4337742328643799, zero_point=79, qscheme=torch.per_tensor_affine)
  (dequant): DeQuantize()
)
Check the weights of the quantized model
 tensor([[ 1,  7,  1,  ...,  3,  6,  6],
        [12, 16, 14,  ..., 16, 16, 13],
        [ 9, 10,  8,  ..., 13, 10,  6],
        ...,
        [14, 19, 18,  ..., 15, 13, 13],
        [10,  7, 12,  ..., 14, 13, 12],
        [14,  9, 12,  ...,  8,  9,  7]], device='cuda:0', dtype=torch.int8)

Step4: Compare the results of PTQ int8 model vs original fp32 model

In [30]:
print('Original weights: ')
print(model.fc1.weight)
print('')
print(f'Dequantized weights: ')
print(torch.dequantize(quantized_model_converted.fc1.weight()))
print('')

print("Check the model after quantization")
print_size_of_model(quantized_model_converted)


Original weights: 
Parameter containing:
tensor([[0.0088, 0.0624, 0.0083,  ..., 0.0267, 0.0469, 0.0489],
        [0.1012, 0.1306, 0.1138,  ..., 0.1344, 0.1305, 0.1118],
        [0.0758, 0.0833, 0.0674,  ..., 0.1113, 0.0828, 0.0540],
        ...,
        [0.1134, 0.1597, 0.1541,  ..., 0.1217, 0.1074, 0.1076],
        [0.0843, 0.0569, 0.1004,  ..., 0.1168, 0.1112, 0.0988],
        [0.1139, 0.0734, 0.0982,  ..., 0.0646, 0.0788, 0.0551]],
       device='cuda:0', requires_grad=True)

Dequantized weights: 
tensor([[0.0084, 0.0587, 0.0084,  ..., 0.0252, 0.0503, 0.0503],
        [0.1006, 0.1342, 0.1174,  ..., 0.1342, 0.1342, 0.1090],
        [0.0755, 0.0839, 0.0671,  ..., 0.1090, 0.0839, 0.0503],
        ...,
        [0.1174, 0.1593, 0.1509,  ..., 0.1258, 0.1090, 0.1090],
        [0.0839, 0.0587, 0.1006,  ..., 0.1174, 0.1090, 0.1006],
        [0.1174, 0.0755, 0.1006,  ..., 0.0671, 0.0755, 0.0587]],
       device='cuda:0')

Check the model after quantization
Size (KB): 539.49


In [42]:
print("Original Model accuracy")
test(model,device, eval_loader)
print('Quantized model accuracy')
test(quantized_model, device, eval_loader)

Original Model accuracy

Test set: Average loss: 0.0028, Accuracy: 126/128 (98.44%)

Quantized model accuracy

Test set: Average loss: 0.0028, Accuracy: 126/128 (98.44%)



2. Quantized Aware Training

In [46]:
class Quantized_MLP(nn.Module):
  def __init__(self):
    super(Quantized_MLP, self).__init__()
    self.quant = torch.quantization.QuantStub()
    self.flatten = nn.Flatten()
    self.silu = nn.SiLU()
    self.fc1 = nn.Linear(28*28, 512 ,bias=False)  # Input layer (flattened 28x28 images), 512 neurons
    self.fc2 = nn.Linear(512, 256, bias=False)     # Hidden layer with 256 neurons
    self.fc3 = nn.Linear(256, 10, bias=False)      # Output layer (10 classes for digits 0-9)
    self.dequant = torch.quantization.DeQuantStub()


  def forward(self, x):
    x = self.quant(x)
    x = self.flatten(x)  # Flatten the input image
    x = self.silu(self.fc1(x))
    x = self.silu(self.fc2(x))
    x = self.fc3(x)        # Output without activation for classification
    x = self.dequant(x)
    return x
QAT_model = Quantized_MLP().to(device)

In [47]:
QAT_model.qconfig = torch.ao.quantization.default_qconfig
QAT_model.train()
QAT_model_quantized = torch.ao.quantization.prepare_qat(QAT_model) # Insert observers
QAT_model_quantized

Quantized_MLP(
  (quant): QuantStub(
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (silu): SiLU()
  (fc1): Linear(
    in_features=784, out_features=512, bias=False
    (weight_fake_quant): MinMaxObserver(min_val=inf, max_val=-inf)
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (fc2): Linear(
    in_features=512, out_features=256, bias=False
    (weight_fake_quant): MinMaxObserver(min_val=inf, max_val=-inf)
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (fc3): Linear(
    in_features=256, out_features=10, bias=False
    (weight_fake_quant): MinMaxObserver(min_val=inf, max_val=-inf)
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (dequant): DeQuantStub()
)

In [48]:
PATH_Quant = '/content/gdrive/MyDrive/Colab Notebooks/chkpt_training_quant/'
No_Epochs=6
for epoch in range(1, No_Epochs+1):  # Train for 5 epochs
    train(QAT_model_quantized, device, train_loader, optimizer, criterion, epoch)
    test(QAT_model_quantized, device, test_loader)
    checkpoint_path= PATH + f'checkpoint_epoch_{epoch}.pth'
    save_checkpoint(epoch, QAT_model_quantized, optimizer, checkpoint_path)

Train Epoch: 1 [0/59872] Loss: 2.301084
Train Epoch: 1 [6400/59872] Loss: 2.291529
Train Epoch: 1 [12800/59872] Loss: 2.287277
Train Epoch: 1 [19200/59872] Loss: 2.305913
Train Epoch: 1 [25600/59872] Loss: 2.301478
Train Epoch: 1 [32000/59872] Loss: 2.293453
Train Epoch: 1 [38400/59872] Loss: 2.284954
Train Epoch: 1 [44800/59872] Loss: 2.294887
Train Epoch: 1 [51200/59872] Loss: 2.302283
Train Epoch: 1 [57600/59872] Loss: 2.300330

Test set: Average loss: 0.0023, Accuracy: 1256/10000 (12.56%)

Train Epoch: 2 [0/59872] Loss: 2.300718
Train Epoch: 2 [6400/59872] Loss: 2.302331
Train Epoch: 2 [12800/59872] Loss: 2.293987
Train Epoch: 2 [19200/59872] Loss: 2.306480
Train Epoch: 2 [25600/59872] Loss: 2.298894
Train Epoch: 2 [32000/59872] Loss: 2.293580
Train Epoch: 2 [38400/59872] Loss: 2.302183
Train Epoch: 2 [44800/59872] Loss: 2.311525
Train Epoch: 2 [51200/59872] Loss: 2.298345
Train Epoch: 2 [57600/59872] Loss: 2.297176

Test set: Average loss: 0.0023, Accuracy: 1256/10000 (12.56%)

Tr