# Project ASPLOC - 1DT109 11201 (2024HT)
#### By Supun Madusanka

This script uses the LeNet NN architecture and quantizes it so that it can be run on a hardware accelerator.

Ref:
- [PyTorch tutorial on NN](https://pytorch.org/tutorials/beginner/blitz/neural_networks_tutorial.html)
- [PyTorch tutorial on NN pruning](https://pytorch.org/tutorials/intermediate/pruning_tutorial.html)
- [CNN](https://towardsdatascience.com/a-comprehensive-guide-to-convolutional-neural-networks-the-eli5-way-3bd2b1164a53)
- [NN quantization](https://towardsdatascience.com/introduction-to-weight-quantization-2494701b9c0c)

Required libraries

In [1]:
%matplotlib inline

import torch
from torch import nn
import torch.nn.utils.prune as prune
import torch.nn.functional as F
import numpy as np
import torchvision
import torchvision.transforms as transforms
import serial


# Define relevant variables for the ML task
batch_size = 64          # samples per mini-batch, used by both DataLoaders
num_classes = 10         # number of digit classes (not referenced by the visible code)
learning_rate = 0.001    # learning rate handed to the Adam optimizer
num_epochs = 10          # number of passes over the training set

import sys
# Print NumPy arrays in full (no "..." truncation) so tensors can be inspected
np.set_printoptions(threshold=sys.maxsize)

DOWNLOAD_DATA = False    # passed as `download=` to the MNIST datasets
DATA_PATH = '../data'    # dataset root; the exported .npy files are also written below it

Load data

In [2]:
# Loading the dataset and preprocessing.
# Every image is resized to 32x32 and converted to a tensor. No Normalize step
# is applied (statistics tried earlier: train mean/std 0.1307/0.3081,
# test mean/std 0.1325/0.3105).
def _mnist_split(is_train):
    """Return the MNIST train or test split with the 32x32 tensor transform."""
    pipeline = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
    ])
    return torchvision.datasets.MNIST(root=DATA_PATH,
                                      train=is_train,
                                      transform=pipeline,
                                      download=DOWNLOAD_DATA)


def _shuffled_loader(dataset):
    """Wrap a dataset in a shuffling DataLoader that uses the global batch size."""
    return torch.utils.data.DataLoader(dataset=dataset,
                                       batch_size=batch_size,
                                       shuffle=True)


train_dataset = _mnist_split(True)
test_dataset = _mnist_split(False)

train_loader = _shuffled_loader(train_dataset)
test_loader = _shuffled_loader(test_dataset)

Create a model with PyTorch
==============

In this tutorial, we use the
[LeNet](http://yann.lecun.com/exdb/publis/pdf/lecun-98.pdf) architecture
from LeCun et al., 1998.

In [3]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class LeNet(nn.Module):
    """LeNet-style CNN: two 5x5 convolutions followed by three linear layers.

    For a 1x32x32 input the second pooling stage produces 16 maps of 5x5,
    which is the ``16 * 5 * 5`` input width of ``fc1``. The network returns
    one row of 10 raw class scores per sample.
    """

    def __init__(self):
        super().__init__()
        # conv1: 1 input image channel -> 6 feature maps, 5x5 square kernel
        self.conv1 = nn.Conv2d(1, 6, 5)
        # conv2: 6 feature maps -> 16 feature maps, 5x5 square kernel
        self.conv2 = nn.Conv2d(6, 16, 5)
        # Classifier head: 400 -> 120 -> 84 -> 10
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Compute the class scores for a batch ``x``."""
        # Convolution -> ReLU -> 2x2 max pooling, twice
        feats = F.relu(self.conv1(x))
        feats = F.max_pool2d(feats, (2, 2))
        feats = F.relu(self.conv2(feats))
        feats = F.max_pool2d(feats, 2)

        # Flatten everything except the batch dimension
        per_sample = int(feats.nelement() / feats.shape[0])
        flat = feats.view(-1, per_sample)

        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


In [4]:
# Instantiate the network and move its parameters to the selected device
model = LeNet().to(device=device)

# Loss function: cross-entropy between the raw class scores and the labels
cost = nn.CrossEntropyLoss()

# Adam optimizer over all model parameters with the configured learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Number of mini-batches per epoch; shown in the training progress message
total_step = len(train_loader)

Train the model

In [None]:
# Number of mini-batches per epoch, printed in the progress message
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        # Move the mini-batch to the device the model lives on
        images, labels = images.to(device), labels.to(device)

        # Forward pass: class scores and their cross-entropy loss
        outputs = model(images)
        loss = cost(outputs, labels)

        # Backward pass: clear old gradients, back-propagate, update weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Report progress only on every 400th mini-batch
        if (i + 1) % 400 != 0:
            continue
        print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
            epoch + 1, num_epochs, i + 1, total_step, loss.item()))

Test the model

In [None]:
# Evaluation only needs forward passes, so autograd is switched off
# (saves memory).
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        # Predicted class = index of the largest score in each row
        predicted = torch.max(outputs.data, 1)[1]
        total += labels.size(0)
        correct += torch.eq(predicted, labels).sum().item()
    print('Accuracy of the network on the 10000 test images: {} %'.format(100 * correct / total))

Inspect the model

In [None]:
# Dump every (name, tensor) pair of the model's parameters for inspection
print(list(model.named_parameters()))

Save the model

In [17]:
# Serialise the whole model object (not just a state_dict) to disk
torch.save(model, 'lenet_model.pth')

Skip the training if already done and restart from here by loading existing model

In [5]:
# Reload the model object saved above. weights_only=False lets torch.load
# unpickle arbitrary Python objects, so only load files you created yourself.
# The LeNet class must already be defined when this cell runs.
model = torch.load('lenet_model.pth', weights_only=False)

Unwrap model functions for quantization
=======================================

Here we unwrap most of the functions for quantization and make them hardware friendly. The trained weights and biases from the previous model are used inside these functions, so no further training phase is needed.

In [6]:
def absmax_quantize(X: np.ndarray) -> "tuple[np.ndarray, float]":
    """Quantize ``X`` symmetrically so that its largest magnitude maps to 127.

    Args:
        X: Array of real values to quantize.

    Returns:
        A ``(X_quant, scale)`` pair. ``X_quant`` is ``round(scale * X)`` stored
        as ``int16`` and ``scale`` is ``127 / max(|X|)``. For an all-zero
        input the scale is 1.0 and ``X_quant`` is all zeros.
    """
    abs_max = np.max(np.absolute(X))

    # An all-zero tensor has no magnitude to normalise by: dividing would give
    # an infinite scale and NaNs after the multiplication below.
    scale = 127 / abs_max if abs_max != 0 else 1.0

    # Quantize
    X_quant = np.round(scale * X)

    return X_quant.astype(np.int16), scale

In [3]:
def fs256_quantize(X:np.matrix) -> np.matrix:
    """Quantize ``X`` with a fixed scale of 256.

    Returns ``round(256 * X)`` converted to ``int16``.
    """
    fixed_scale = 256
    scaled = fixed_scale * X
    return np.round(scaled).astype(np.int16)

In [4]:
def fs256_dequantize(X:np.matrix) -> np.matrix:
    """Scale ``X`` back down by the fixed factor 256.

    Returns ``round(X / 256)`` converted to ``int16``, i.e. the result is
    still an integer array, only with the factor 256 removed.
    """
    fixed_scale = 256
    rescaled = X / fixed_scale
    return np.round(rescaled).astype(np.int16)

In [5]:
def fs4_quantize(X:np.matrix) -> np.matrix:
    """Quantize ``X`` with a fixed scale of 4.

    Returns ``round(4 * X)`` converted to ``int16``.
    """
    fixed_scale = 4
    scaled = fixed_scale * X
    return np.round(scaled).astype(np.int16)

Most of the scaling factors returned by absmax_quantize are close to 256

In [None]:
# Quantize the conv1 weights to inspect the returned (values, scale) pair
absmax_quantize(torch.detach(model.conv1.weight).numpy())

This class is the breakdown of the functionalities used above with the Torch library

In [10]:

class LeNet_quantize():
    """NumPy re-implementation of the LeNet layers for a single image.

    The constructor binds the given weights/biases to five layer callables
    (``conv1``, ``conv2``, ``fc1``, ``fc2``, ``fc3``). Activation, pooling and
    reshape are plain methods. No batching is supported: convolution and
    pooling expect a ``(channels, rows, cols)`` array.

    NOTE(review): the arithmetic runs in whatever dtype NumPy derives from the
    operands (e.g. int16 * int16 stays int16), so large integer products can
    wrap around -- confirm this matches the intended accumulator width.
    NOTE(review): earlier revisions cropped the convolution output by one
    row/column and added the bias once per input channel; anything derived
    from that behaviour (e.g. a bit-accurate hardware reference) should be
    re-validated.
    """

    def __init__(self, conv_layer_1w:np.ndarray, conv_layer_1b:np.ndarray,
                 conv_layer_2w:np.ndarray, conv_layer_2b:np.ndarray,
                 dense_1_w:np.ndarray, dense_1_b:np.ndarray,
                 dense_2_w:np.ndarray, dense_2_b:np.ndarray,
                 dense_3_w:np.ndarray, dense_3_b:np.ndarray):
        # 1 input image channel, 6 output channels, 5x5 square conv kernel
        self.conv1 = self.Conv2d(1, 6, 5, conv_layer_1w, conv_layer_1b)
        # 6 input channels, 16 output channels, 5x5 square conv kernel
        self.conv2 = self.Conv2d(6, 16, 5, conv_layer_2w, conv_layer_2b)
        self.fc1 = self.myDense2d(dense_1_w, dense_1_b)
        self.fc2 = self.myDense2d(dense_2_w, dense_2_b)
        self.fc3 = self.myDense2d(dense_3_w, dense_3_b)

    def Conv2d(self, inChan:int, outChan:int, kernalDim:int, weight: np.ndarray, bias: np.ndarray):
        """Build a 2-D convolution (stride 1, no padding) bound to ``weight``/``bias``.

        Args:
            inChan: Number of input channels that are convolved.
            outChan: Number of output channels.
            kernalDim: Side length of the square kernel.
            weight: Kernels indexed as ``weight[out_channel][in_channel]``.
            bias: One bias value per output channel.

        Returns:
            A function mapping a ``(channels, rows, cols)`` image to a
            ``(outChan, rows - kernalDim + 1, cols - kernalDim + 1)`` float array.
        """
        def CalConv2d(img:np.ndarray) -> np.ndarray:
            _, imdim_r, imdim_c = img.shape
            # A k-wide window fits at (dim - k + 1) positions along an axis.
            out_r = imdim_r - kernalDim + 1
            out_c = imdim_c - kernalDim + 1
            outImg = np.zeros((outChan, out_r, out_c))
            for oc in range(outChan):
                kernel = weight[oc][:inChan]
                bias_s = bias[oc]
                for kr_i in range(out_r):
                    for kr_j in range(out_c):
                        window = img[:inChan, kr_i:kr_i+kernalDim, kr_j:kr_j+kernalDim]
                        # Sum over all input channels first, then add the bias
                        # exactly once per output element.
                        outImg[oc, kr_i, kr_j] = (kernel * window).sum() + bias_s
            return outImg
        return CalConv2d

    def myRelu(self, img:np.ndarray) -> np.ndarray:
        """Return ``img`` with every negative value replaced by 0."""
        return img.clip(0)

    def maxPool2d(self, img:np.ndarray) -> np.ndarray:
        """Apply 2x2 max pooling with stride 2 to a ``(channels, rows, cols)`` array.

        Odd dimensions are rounded up: the last row/column is pooled from the
        values that exist. The result keeps the dtype of ``img``.
        """
        inDim, imdim_r, imdim_c = img.shape
        out_r = (imdim_r + 1) // 2
        out_c = (imdim_c + 1) // 2
        outImg = np.zeros_like(img, shape=(inDim, out_r, out_c))
        for i in range(out_r):
            for j in range(out_c):
                # NumPy truncates slices that run past the edge, so a partial
                # window at an odd border needs no special-casing.
                outImg[:, i, j] = img[:, i*2:i*2+2, j*2:j*2+2].max(axis=(1, 2))
        return outImg

    def myReshape(self, img:np.ndarray) -> np.ndarray:
        """Flatten ``img`` into a column vector of shape ``(img.size, 1)``."""
        return np.reshape(img, (-1, 1))

    def myDense2d(self, weight:np.ndarray, bias:np.ndarray):
        """Build a fully connected layer bound to ``weight`` and ``bias``.

        Returns:
            A function computing ``np.dot(weight, img) + bias``.
        """
        def dense(img:np.ndarray):
            return np.dot(weight, img) + bias

        return dense


Load the weights and biases from the model and inspect if needed

In [36]:
# Pull the trained parameters out of the torch model as NumPy arrays.
# .cpu() is needed because .numpy() only works on CPU tensors and the model
# may live on the GPU (it is moved to `device` after construction).
cnv1w = model.conv1.weight.detach().cpu().numpy()
cnv1b = model.conv1.bias.detach().cpu().numpy()
cnv2w = model.conv2.weight.detach().cpu().numpy()
cnv2b = model.conv2.bias.detach().cpu().numpy()

# Dense biases become (n, 1) column vectors so they add element-wise to the
# (n, 1) result of np.dot(weight, column_input) in the NumPy dense layer.
den1w = model.fc1.weight.detach().cpu().numpy()
den1b = model.fc1.bias.detach().cpu().numpy().reshape(-1, 1)
den2w = model.fc2.weight.detach().cpu().numpy()
den2b = model.fc2.bias.detach().cpu().numpy().reshape(-1, 1)
den3w = model.fc3.weight.detach().cpu().numpy()
den3b = model.fc3.bias.detach().cpu().numpy().reshape(-1, 1)

Fixed scale (256) quantized weights and biases

In [11]:
# Fixed-scale (x256) int16 versions of every trained parameter.
# .cpu() is needed because .numpy() only works on CPU tensors and the model
# may live on the GPU (it is moved to `device` after construction).
cnv1wq = fs256_quantize(model.conv1.weight.detach().cpu().numpy())
cnv1bq = fs256_quantize(model.conv1.bias.detach().cpu().numpy())
cnv2wq = fs256_quantize(model.conv2.weight.detach().cpu().numpy())
cnv2bq = fs256_quantize(model.conv2.bias.detach().cpu().numpy())

# Dense biases are reshaped to (n, 1) column vectors before quantization.
den1wq = fs256_quantize(model.fc1.weight.detach().cpu().numpy())
den1bq = fs256_quantize(model.fc1.bias.detach().cpu().numpy().reshape(-1, 1))
den2wq = fs256_quantize(model.fc2.weight.detach().cpu().numpy())
den2bq = fs256_quantize(model.fc2.bias.detach().cpu().numpy().reshape(-1, 1))
den3wq = fs256_quantize(model.fc3.weight.detach().cpu().numpy())
den3bq = fs256_quantize(model.fc3.bias.detach().cpu().numpy().reshape(-1, 1))

In [58]:
import os

# Persist every quantized tensor as <DATA_PATH>/np_data/<name>.npy
np_data_dir = DATA_PATH + '/np_data'
# open(..., 'wb') does not create missing folders, so make sure it exists
os.makedirs(np_data_dir, exist_ok=True)

quantized_arrays = {
    'cnv1wq': cnv1wq, 'cnv1bq': cnv1bq,
    'cnv2wq': cnv2wq, 'cnv2bq': cnv2bq,
    'den1wq': den1wq, 'den1bq': den1bq,
    'den2wq': den2wq, 'den2bq': den2bq,
    'den3wq': den3wq, 'den3bq': den3bq,
}
for array_name, array in quantized_arrays.items():
    with open(np_data_dir + '/' + array_name + '.npy', 'wb') as f:
        np.save(f, array)

Unquantized model

In [37]:
# NumPy model loaded with the unquantized (float) torch parameters
mm = LeNet_quantize(cnv1w, cnv1b, cnv2w, cnv2b, den1w, den1b, den2w, den2b, den3w, den3b)

Quantized model

In [12]:
# NumPy model loaded with the fixed-scale (x256) int16 parameters
mmq = LeNet_quantize(cnv1wq, cnv1bq, cnv2wq, cnv2bq, den1wq, den1bq, den2wq, den2bq, den3wq, den3bq)

The forward function of the LeNet class is copied here.
- It is used to cross-validate the individual layer outputs against the unwrapped LeNet_quantize class. Keep the layer you want to cross-check, comment out the other layers, and compare the outputs as needed.

In [13]:
def trch_forward(x):
    """Functional copy of ``LeNet.forward`` using the global ``model`` parameters.

    One statement per layer, so single layers can be commented out when
    comparing intermediate results with the NumPy implementation.
    """
    # Convolution stage 1: conv -> ReLU -> 2x2 max pool
    x = F.conv2d(x, weight=model.conv1.weight, bias=model.conv1.bias)
    x = F.max_pool2d(F.relu(x), (2, 2))
    # Convolution stage 2: conv -> ReLU -> 2x2 max pool
    x = F.conv2d(x, weight=model.conv2.weight, bias=model.conv2.bias)
    x = F.max_pool2d(F.relu(x), 2)
    # Flatten everything except the batch dimension
    features_per_sample = int(x.nelement() / x.shape[0])
    x = x.view(-1, features_per_sample)
    # Fully connected head
    x = F.linear(x, weight=model.fc1.weight, bias=model.fc1.bias)
    x = F.relu(x)
    x = F.linear(x, weight=model.fc2.weight, bias=model.fc2.bias)
    x = F.relu(x)
    return F.linear(x, weight=model.fc3.weight, bias=model.fc3.bias)

Corresponding unwrapped function

In [14]:
def my_forward(x):
    """Run one image through the float NumPy model ``mm``.

    Mirrors the torch forward pass layer by layer and returns the output of
    the last dense layer.
    """
    # Convolution stage 1: conv -> ReLU -> max pool
    act = mm.myRelu(mm.conv1(x))
    act = mm.maxPool2d(act)
    # Convolution stage 2: conv -> ReLU -> max pool
    act = mm.myRelu(mm.conv2(act))
    act = mm.maxPool2d(act)
    # Flatten to a column vector for the dense layers
    act = mm.myReshape(act)
    act = mm.myRelu(mm.fc1(act))
    act = mm.myRelu(mm.fc2(act))
    return mm.fc3(act)

Corresponding unwrapped quantized function

In [15]:
def my_q_forward(x):
    """Run one quantized image through the quantized NumPy model ``mmq``.

    After every layer the activations are divided by 256 again with
    ``fs256_dequantize`` to remove the factor introduced by the x256 weights.

    NOTE(review): the biases are also quantized with scale 256, while the
    weight * activation products additionally carry the input scale --
    confirm this mismatch is intended.
    """
    # Convolution stage 1: conv -> ReLU -> max pool -> rescale
    act = mmq.myRelu(mmq.conv1(x))
    act = fs256_dequantize(mmq.maxPool2d(act))
    # Convolution stage 2: conv -> ReLU -> max pool -> rescale
    act = mmq.myRelu(mmq.conv2(act))
    act = fs256_dequantize(mmq.maxPool2d(act))
    # Flatten to a column vector for the dense layers
    act = mmq.myReshape(act)
    act = fs256_dequantize(mmq.myRelu(mmq.fc1(act)))
    act = fs256_dequantize(mmq.myRelu(mmq.fc2(act)))
    return fs256_dequantize(mmq.fc3(act))

Validate original model

In [None]:
# Accuracy of the functional torch copy (trch_forward) on the test set
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = trch_forward(images)
        # Predicted class = index of the largest score in each row
        predicted = torch.max(outputs.data, 1)[1]
        total += labels.size(0)
        correct += torch.eq(predicted, labels).sum().item()
    print('Accuracy of the network on the 10000 test images: {} %'.format(100 * correct / total))

Validate unwrapped model

> [Note] my_forward() function is incredibly slow compared to trch_forward()

- Obs. my_forward does not support batch input hence the for loop.
- Obs. there is a small accuracy drop, attributed to the missing padding and other small refinements that are present in Torch but not in this implementation

In [None]:
# Test the NumPy model
# In test phase, we don't need to compute gradients (for memory efficiency)

with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        # my_forward runs in NumPy, so the batch is deliberately left on the
        # CPU: .numpy() fails on CUDA tensors, and `outputs` below is a CPU
        # tensor that must be compared with CPU labels.
        outputs = torch.zeros(images.shape[0])
        # my_forward has no batch support: classify one image at a time
        for i in range(images.shape[0]):
            outputs[i] = np.argmax(my_forward(images[i].numpy()))
        total += labels.size(0)
        correct += (outputs == labels).sum().item()
    print('Accuracy of the network on the 10000 test images: {} %'.format(100 * correct / total))

Validate unwrapped (FS) quantized model

In [None]:
# Test the fixed-scale quantized NumPy model
# In test phase, we don't need to compute gradients (for memory efficiency)

with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        # my_q_forward runs in NumPy, so the batch is deliberately left on the
        # CPU: .numpy() fails on CUDA tensors, and `outputs` below is a CPU
        # tensor that must be compared with CPU labels.
        outputs = torch.zeros(images.shape[0])
        # No batch support: quantize (scale 4) and classify one image at a time
        for i in range(images.shape[0]):
            outputs[i] = np.argmax(my_q_forward(fs4_quantize(images[i].numpy())))
        total += labels.size(0)
        correct += (outputs == labels).sum().item()
    print('Accuracy of the network on the 10000 test images: {} %'.format(100 * correct / total))

How to use QLeNet

In [None]:
# QLeNet is imported from the qLenet module, which is not part of this notebook
from qLenet import QLeNet
mmn = QLeNet()
# Uses `images` and `i` left over from the most recently executed loop above
mmn.qLenet_forward(fs4_quantize(torch.Tensor.numpy(images[i])))

In [6]:
def tohex(val, nbits):
    """Return ``val`` as an ``nbits``-wide two's-complement hex string.

    Args:
        val: Integer to encode (Python int or NumPy integer scalar). Values
            outside the ``nbits`` range wrap around modulo ``2 ** nbits``.
        nbits: Width of the encoded word in bits.

    Returns:
        Lower-case hex digits without a ``0x`` prefix, zero-padded to the
        full word width (and never shorter than two digits).

    Raises:
        TypeError: If ``val`` is not an integer.
    """
    import operator

    # Convert to a Python int first: fixed-width NumPy scalars cannot hold
    # 1 << nbits once nbits reaches their own width. operator.index rejects
    # floats instead of silently truncating them.
    as_int = operator.index(val)
    mask = (1 << nbits) - 1
    # One hex digit per 4 bits, keeping the historical two-digit minimum
    width = max(2, (nbits + 3) // 4)
    return format(as_int & mask, 'x').rjust(width, '0')

def save_q_val(fname, data):
    """Write every element of ``data`` to ``fname`` as 8-bit hex, one per line.

    Args:
        fname: Path of the text file to create or overwrite.
        data: Integer array of any rank. Elements are written in row-major
            order, which is the order nested loops over the axes would give.

    Each value is encoded with ``tohex(value, 8)``.
    """
    # Flattening replaces one hand-written loop per dimension and removes the
    # dependence on the element type being exactly np.int16.
    flat = np.asarray(data).ravel()
    with open(fname, 'w') as fout:
        fout.writelines(f'{tohex(value, 8)}\n' for value in flat)



In [46]:
# Export every quantized tensor to its .mem file through save_q_val
mem_exports = (
    ("../wnb/conv1_weight.mem", cnv1wq),
    ("../wnb/conv1_bias.mem", cnv1bq),
    ("../wnb/conv2_weight.mem", cnv2wq),
    ("../wnb/conv2_bias.mem", cnv2bq),
    ("../wnb/fs1_weight.mem", den1wq),
    ("../wnb/fs1_bias.mem", den1bq),
    ("../wnb/fs2_weight.mem", den2wq),
    ("../wnb/fs2_bias.mem", den2bq),
    ("../wnb/fs3_weight.mem", den3wq),
    ("../wnb/fs3_bias.mem", den3bq),
)
for mem_path, tensor in mem_exports:
    save_q_val(mem_path, tensor)

In [7]:
def image_q_uart_str(data) -> bytearray:
    """Encode an integer array as raw bytes, one byte per element.

    Args:
        data: Integer array of any rank. Elements are emitted in row-major
            order.

    Returns:
        A ``bytearray`` holding the low 8 bits (two's complement) of every
        element -- the same encoding as an 8-bit, two-digit hex value.

    Raises:
        TypeError: If ``data`` does not have an integer dtype.
    """
    flat = np.asarray(data).ravel()
    if flat.size and not np.issubdtype(flat.dtype, np.integer):
        raise TypeError(f'expected integer data, got dtype {flat.dtype}')
    # Masking with 0xFF yields the 8-bit two's-complement byte directly, so no
    # intermediate hex string has to be built and parsed again.
    return bytearray(int(value) & 0xFF for value in flat)

In [8]:
# Connect the MINIZED board and run this. Change the COM port to the port the
# board is connected to. The port is opened at 115200 baud; no timeout is
# given, so reads on this connection wait until data arrives.
uartConnection = serial.Serial("COM8", 115200)

In [9]:
def predict_hw(img_test: bytearray) -> int:
    """Send one encoded image over the UART and return the reported class.

    The image bytes are written to the global ``uartConnection``. Lines are
    then read until one containing ``'Predict Result'`` arrives; its last
    whitespace-separated token is parsed as hexadecimal.

    Args:
        img_test: Encoded image bytes to transmit.

    Returns:
        The parsed value minus one.
        NOTE(review): presumably the board reports the class index plus one --
        confirm against the firmware.
    """
    uartConnection.write(img_test)
    while True:
        raw_line = uartConnection.readline()
        try:
            hw_say = raw_line.decode("utf-8")
        except UnicodeDecodeError:
            # Not valid text, so it cannot be the result line. Skip it rather
            # than testing an undefined or stale string from an earlier line.
            continue

        if 'Predict Result' in hw_say:
            return int(hw_say.split()[-1], 16) - 1

In [None]:
# Print the label, the software prediction and the hardware prediction for
# every test image.
with torch.no_grad():
    for images, labels in test_loader:
        # The batch is deliberately left on the CPU: quantization and the
        # software model run in NumPy, and .numpy() fails on CUDA tensors.
        for i in range(images.shape[0]):
            img_q4 = fs4_quantize(images[i].numpy())
            pred_sw_o = np.argmax(my_q_forward(img_q4))
            pred_hw_o = predict_hw(image_q_uart_str(img_q4))
            print(f'Label: {labels[i]}, SW Pred: {pred_sw_o}, HW Pred: {pred_hw_o}')
                  

The prediction time is largely limited by the UART connection. 

In [None]:
# Accuracy of the hardware predictions over the whole test set
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        batch_len = images.shape[0]
        outputs = torch.zeros(batch_len)
        for i in range(batch_len):
            # Quantize the image (scale 4), send it to the board, store the answer
            img_q4 = fs4_quantize(images[i].numpy())
            outputs[i] = predict_hw(image_q_uart_str(img_q4))
            print(f'{labels[i]}, {outputs[i]}')
        total += labels.size(0)
        correct += torch.eq(outputs, labels).sum().item()
    print('Accuracy of the network on the 10000 test images: {} %'.format(100 * correct / total))
                  