### **Weight-only quantization**

딥러닝 모델은 신경망 아키텍처에 가중치(weights, layer parameters)를 포함하고 있다.

딥러닝 모델에서 양자화는 가중치(weights, layer parameters)와 활성화(activations, layer output)에 대해서 가능하다.

Weights를 양자화하면 모델의 크기를 줄일 수 있다.

Activations를 양자화하면 모델 수행 속도를 향상시킬 수 있다.


Weights 양자화는 딥러닝 모델에서 weights를 추출한 후에 이를 양자화시켜서 다시 딥러닝 모델에 업로드시키는 것이다.

Tensor(layer)당 대칭 방식으로 양자화하는 방식으로, 전체 weights에 대해서 양자화하는 것은 다음과 같다.

### **Import Packages**

In [None]:
import os
#import pickle
#import time
#import datetime
import numpy as np #numpy 모듈을 불러온다
import matplotlib.pyplot as plt #데이터 시각화 함수 제공

import torch #Pytorch 라이브러리 기본 모듈, Tensor 연산과 자동 미분 지원
import torch.nn as nn #신경망을 구축하는데 필요한 모든 구성 요소를 제공
                      #이 모듈에는 레이어, activation 함수, loss function 등 신경망의 핵심 구성 요소가 포함되어 있음
import torch.nn.functional as F #신경망을 구성할 때 사용되는 함수들의 모음
                                #nn 모듈에 있는 클래스의 함수형 인터페이스를 제공(Activation function, loss function)
#import torch.optim as optim // 모델의 weights를 최적화하는 데 사용되는 최적화 알고리즘들을 제공한다. ex) SGD, Adam과 같은 최적화 알고리즘 포함

import torchvision.transforms as transforms #이미지 전처리에 사용되는 변환(transformations)을 제공 ex) 이미지를 tensor로 변환, 크기 조정, 정규화, 데이터 증강
from torchvision import datasets #다양한 데이터셋을 가져온다. ex) MNIST, CIFAR-10

In [None]:
#check if CUDA is available
train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
    print('CUDA is not available. Training on CPU ...')
else:
    print('CUDA is available! Training on GPU ...')

### **Accuracy Functions**

In [None]:
#output: model output
#target: correct answer
#topk: number of k best model output

def accuracy(output, target ,topk=(1,)):
  """
  Computes the accuracy over the k top predictions for the specified values of k
  IN top-5 accuracy you give yourself credit for having the right answer
  if the right answer appears in your top five guesses
  """
  with torch.no_grad(): #no need for gradient
    maxk = max(topk) #store biggest topk value
    batch_size=target.size(0) #check batch size // express "target" tensor's 0th dimension size

    _, pred = output.topk(maxk,1,True,True) #maxk=bring upper k value / dim=selected class dimension / 1st True=largest, if smallest use False / 2nd True=sort result? yes=True, no=False
    pred = pred.t() #transpose tensor pred

    #correct = pred.eq(target. view(1, -1).expand_as(pred))
    #correct = (pred == target.view(1, -1).expand_as(pred))
    correct = (pred == target.unsqueeze(dim=0)).expand_as(pred)

    res = []
    for k in topk:
      #correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
      correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
      res.append(correct_k.mul(1.0/batch_size))
    return res

class AverageMeter(object):
  """Computes and stores the average and current value"""

  def __init__(self, name, fmt=':f'): #floating point format
    self.name = name
    self.fmt = fmt
    self.reset()

  def reset(self):
    self.val=0
    self.avg=0
    self.sum=0
    self.count=0

  def update(self, val, n=1):
    self.val = val
    self.sum += val * n
    self.count += n
    self.avg = self.num / self.count

  def __str__(self):
    #fmtstr = '{name} {val' + self.fmt + '}({avg' + self.fmt+ '})'
    fmtstr = '{name} ({avg' + self.fmt +  '})'

    return fmtstr.format(**self.__dict__)

def norm1(X,Y):
  return np.sum(np.abs(X-Y))

def norm2(X,Y):
  return np.sqrt(np.sum(np.square(X-Y)))


## **define network architecture**

In [None]:
class Darknet19(nn.Module): # formal inheritance method in PyTorch / inherit Parent class nn.Module
  def __init__(self, num_classes: int = 1000): # 1000 is number of classes for neuralnet model to predict. It is default. you can reset it when calling the function
    super(Darknet19, self).__init__() # super(child class name, self).__init__()

    self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1, bias=False) #3*3 require zero padding / 1*1 does not require zero padding
    self.batchnorm1 = nn.BatchNorm2d(32) #32개 channel 출력값을 normalization, Z function(평균, 분산)-->(0,1)
    self.leaky_relu1 = nn.LeakyReLU(negative_slope=0.1) #Relu negative slope

    self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm2 = nn.BatchNorm2d(64)
    self.leaky_relu2 = nn.LeakyReLU(negative_slope=0.1)

    self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm3 = nn.BatchNorm2d(128)
    self.leaky_relu3 = nn.LeakyReLU(negative_slope=0.1)
    self.conv4 = nn.Conv2d(in_channels=128, out_channels=64, kernel_size=1, stride=1, padding=0, bias=False)
    self.batchnorm4 = nn.BatchNorm2d(64)
    self.leaky_relu4 = nn.LeakyReLU(negative_slope=0.1)
    self.conv5 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm5 = nn.BatchNorm2d(128)
    self.leaky_relu5 = nn.LeakyReLU(negative_slope=0.1)

    self.conv6 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm6 = nn.BatchNorm2d(256)
    self.leaky_relu6 = nn.LeakyReLU(negative_slope=0.1)
    self.conv7 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1, stride=1, padding=0, bias=False)
    self.batchnorm7 = nn.BatchNorm2d(128)
    self.leaky_relu7 = nn.LeakyReLU(negative_slope=0.1)
    self.conv8 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm8 = nn.BatchNorm2d(256)
    self.leaky_relu8 = nn.LeakyReLU(negative_slope=0.1)

    self.conv9 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm9 = nn.BatchNorm2d(512)
    self.leaky_relu9 = nn.LeakyReLU(negative_slope=0.1)
    self.conv10 = nn.Conv2d(in_channels=512, out_channels=256, kernel_size=1, stride=1, padding=0, bias=False)
    self.batchnorm10 = nn.BatchNorm2d(256)
    self.leaky_relu10 = nn.LeakyReLU(negative_slope=0.1)
    self.conv11 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm11 = nn.BatchNorm2d(512)
    self.leaky_relu11 = nn.LeakyReLU(negative_slope=0.1)
    self.conv12 = nn.Conv2d(in_channels=512, out_channels=256, kernel_size=1, stride=1, padding=0, bias=False)
    self.batchnorm12 = nn.BatchNorm2d(256)
    self.leaky_relu12 = nn.LeakyReLU(negative_slope=0.1)
    self.conv13 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm13 = nn.BatchNorm2d(512)
    self.leaky_relu13 = nn.LeakyReLU(negative_slope=0.1)

    self.conv14 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm14 = nn.BatchNorm2d(1024)
    self.leaky_relu14 = nn.LeakyReLU(negative_slope=0.1)
    self.conv15 = nn.Conv2d(in_channels=1024, out_channels=512, kernel_size=1, stride=1, padding=0, bias=False)
    self.batchnorm15 = nn.BatchNorm2d(512)
    self.leaky_relu15 = nn.LeakyReLU(negative_slope=0.1)
    self.conv16 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm16 = nn.BatchNorm2d(1024)
    self.leaky_relu16 = nn.LeakyReLU(negative_slope=0.1)
    self.conv17 = nn.Conv2d(in_channels=1024, out_channels=512, kernel_size=1, stride=1, padding=0, bias=False)
    self.batchnorm17 = nn.BatchNorm2d(512)
    self.leaky_relu17 = nn.LeakyReLU(negative_slope=0.1)
    self.conv18 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchnorm18 = nn.BatchNorm2d(1024)
    self.leaky_relu18 = nn.LeakyReLU(negative_slope=0.1)

    self.conv19 = nn.Conv2d(in_channels=1024, out_channels=num_classes, kernel_size=1, stride=1, padding=1, bias=False)
    self.avgpool= nn.AdaptiveAvgPool2d((1,1))
    self.max_pool2d=nn.MaxPool2d(2, stride=2)

  def forward(self, x):
    out=self.batchnorm1(self.conv1(x))
    out=self.leaky_relu1(out)
    out=self.max_pool2d(out)

    out=self.batchnorm2(self.conv2(x))
    out=self.leaky_relu2(out)
    out=self.max_pool2d(out)

    out=self.batchnorm3(self.conv3(x))
    out=self.leaky_relu3(out)
    out=self.batchnorm4(self.conv4(x))
    out=self.leaky_relu4(out)
    out=self.batchnorm5(self.conv5(x))
    out=self.leaky_relu5(out)
    out=self.max_pool2d(out)

    out=self.batchnorm6(self.conv6(x))
    out=self.leaky_relu6(out)
    out=self.batchnorm7(self.conv7(x))
    out=self.leaky_relu7(out)
    out=self.batchnorm8(self.conv8(x))
    out=self.leaky_relu8(out)
    out=self.max_pool2d(out)

    out=self.batchnorm9(self.conv9(x))
    out=self.leaky_relu9(out)
    out=self.batchnorm10(self.conv10(x))
    out=self.leaky_relu10(out)
    out=self.batchnorm11(self.conv11(x))
    out=self.leaky_relu11(out)
    out=self.batchnorm12(self.conv12(x))
    out=self.leaky_relu12(out)
    out=self.batchnorm13(self.conv13(x))
    out=self.leaky_relu13(out)
    out=self.max_pool2d(out)

    out=self.batchnorm14(self.conv14(x))
    out=self.leaky_relu14(out)
    out=self.batchnorm15(self.conv15(x))
    out=self.leaky_relu15(out)
    out=self.batchnorm16(self.conv16(x))
    out=self.leaky_relu16(out)
    out=self.batchnorm17(self.conv17(x))
    out=self.leaky_relu17(out)
    out=self.batchnorm18(self.conv18(x))
    out=self.leaky_relu18(out)

    out=self.conv19(out)
    out=self.avgpool(out)
    out=torch.flatten(out,1)

    return out

### **Build Model**

In [None]:
model = Darknet19()
#print(model)

#weight_path = "C:\Bigdata\"quantization\darknet19_224d.pth"
#model.Load_state_dict(torch.Load(weigtht_path, map_Location=torch.device('cpu')))

weight_path = './darknet19_224d.pth'
model.load_state_dict(torch.load(weight_path))

### **Load Dataset**

In [None]:
data_path = "\wrk\xsjhdnobkup5\taeheej\dataset\imagenet"
traindir = os.path.join(data_path, 'train')
valdir = os.path.join(data_path, 'val3k')

normalize = transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1,1,1]) #scale[0,1]--->normalization unpreceded

batch_size=32
num_workers=16

train_transform = transforms.Compose([
      transforms.RandomResizedCrop(224), #무작위로 이미지를 크롭(잘라내고)하고 224*224 pixel로 크기 조정
      transforms.RandomHorizontalFlip(), #이미지를 수평 방향으로 무작위로 뒤집기-데이터 다양성을 증가. 이미지 방향에 덜 민감
      transforms.ToTensor(), #이미지를 pytorch 텐서(pytorch 모델에서 처리할 수 있는 데이터 형식)로 변환. 이미지 데이터는 0에서 255 범위의 정수에서 0.0에서 1.0 범위의 부동소수점으로 스케일링
      normalize,
    ])

valid_transform=transforms.Compose([
      transforms.Resize(256), #모든 이미지의 크기를 256x256 픽셀로 조절한다.
      transforms.CenterCrop(224), #중앙을 기준으로 224x224로 크롭하여 크기 조정
      transforms.ToTensor(), #이미지를 pytorch 텐서로 변환
      normalize
    ])

train_dataset=datasets.ImageFolder(traindir, train_transform) #traindir 이미지를 가져와서 train_transform 전처리를 거치고 이미지 데이터셋을 생성한다.

valid_dataset=datasets.ImageFolder(valdir, valid_transform)

train_loader=torch.utils.data.DataLoader( #데이터 로드 및 처리
        train_dataset, #사용될 데이터셋
        batch_size=batch_size, #모델에 한번에 공급될 샘플수
        shuffle=True, #각 에프크(epoch) 시작시 데이터셋을 무작위로 섞어서 데이터의 순서에 의존하지 않도록 한다. overfitting을 방지한다.
        num_workers=num_workers,
        pin_memory=True) #Dataloader가 GPU에 복사하기 전에 CPU의 고정된 메모리 영역(pin된 메모리)에 로드하도록 한다. 이는 CPU에서 GPU로의 데이터 전송 속도를 향상시켜 학습 성능을 높인다.

valid_loader=torch.utils.data.DataLoader(
        valid_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True)

print("number of training dataset:%d" % len(train_dataset))
print("number of validation dataset:%d" % len(valid_dataset))

### **Accuracy of floating point32 model**

In [None]:
model.eval() #model.eval(): 모델을 평가 모드로 설정한다. 
             #이는 모델이 학습(training) 모드가 아닌, 평가(evaluation) 모드로 실행되어야 할 때 사용된다. 
             #평가 모드에서는 모델의 행동이 바뀌는 일부 레이어들(예를 들어, Dropout, BatchNorm 등)이 평가 상황에 적합하게 동작하도록 조정된다. 
             #예를 들어, Dropout 레이어는 학습 동안에는 일부 뉴런을 무작위로 비활성화하지만, 평가 모드에서는 모든 뉴런을 활성화 상태로 유지한다.

model.cuda() #model.cuda(): 모델의 매개변수와 버퍼를 CUDA를 사용 가능한 GPU 메모리로 이동시킵니다. 
             #이를 통해 모델이 GPU에서 실행될 수 있게 하며, GPU의 병렬 처리 능력을 활용하여 더 빠른 계산이 가능해집니다. 
             #이 메소드를 호출하기 전에는 모델이 CPU 메모리에 있었을 것이고, 이 호출을 통해 모델의 연산이 GPU에서 수행되도록 변경됩니다.

top1 = AverageMeter('Acc@1', ':6.4f') #batch 마다 평균을 구해서 그 정확도를 계산한다. 총 6자리를 표현하고 소수점 넷째자리까지 표시한다. Fixed point 형식으로 나타낸다
                                      #3.141592 --> 3.1416
top5 = AverageMeter('Acc@5', ':6.4f')
for data, target in valid_loader: #배치 단위로 data와 target을 순회 
    if train_on_gpu:
        data, target = data.cuda(), target.cuda() #data와 target을 GPU로 이동
    #forward pass: compute predicted outputs by passing inputs to the model
    output = model(data) #model에 data 전달
    acc1, acc5 = accuracy(output, target, topk=(1, 5)) #Top-1과 Top-5 정확도를 계산
    top1.update(acc1.item(), target.size(0)) #acc1.item()은 Top-1 정확도의 스칼라 값을 반환, target.size(0)는 현재 배치의 크기를 의미
    top5.update(acc5.item(), target.size(0)) 

##top1 accuracy, top5 accuracy
print(top1, top5)    

### **find eligible layers for quantization**

In [None]:
#이 코드는 결국 어떤 layer를 quantization 할것인지 정하는 부분

disallowed_layer_names = [] #특정 레이어 제고를 위해 사용, 나중에 빼고싶은 레이어 이름 추가 가능

#Linear operation, convolution, batchnorm
whitelist=[torch.nn.Linear, torch.nn.Conv1d, torch.nn.Conv2d, torch.nn.Conv3d, nn.BatchNorm2d] #linear=fully connected
whitelist_layer_types = tuple(whitelist) #whitelist 리스트를 튜플로 변환(튜플은 리스트와 유사하지만 변경 불가)
eligible_modules_list=[]
eligible_param_list=[]
for name, mod in model.named_modules(): #name=모듈이름, mod=모듈 자체
    if isinstance(mod, whitelist_layer_types) and name not in disallowed_layer_names: # ifinstance = mod가 whitelist_layer_types에 정의된 레이어 타입 중 하나인지 확인 
                                                                                      # 해당 모듈의 이름이 disallowed_layer_names 리스트에 없는지 확인
        eligible_modules_list.append((name, mod)) #조건을 만족하는 모듈 이름과 모듈 자체를 리스트에 추가
        eligible_param_list.append(name)

print(eligible_param_list)

### **Quantization in Deep Learning Model**

In [None]:
def quantize(X, NBIT=8):
    #symmetry quantization, per tensor
    #1. find threshold
    threshold = np.max([np.abs(np.max(X)), np.abs(np.min(X))])

    QRANGE = 2**(NBIT-1)

    #2. find p: decimal position of quantized value
    p = np.int(np.log2((QRANGE-1)/threshold))

    #3. quantize using 8 bit
    #3.1 apply threshold
    data_th = np.clip(X, -QRANGE*2**(-p), (QRANGE-1)*2**(-p))

    #3.2 calculate the scale factor for quantization
    SCALE = 2**(-p)

    #3.3 quantize (apply scale factor)
    #we are using a rounding function to force the quantized values to be the whole numbers which INT8 can represent
    data_qn = np.round(data_th/SCALE)

    #3.4 dequantize (simply reverse the quantization)
    data_dqn = data_qn *SCALE
    return data_dqn, p

In [None]:
#def quantize_asymmetric(X, NBIT=8):
    # Asymmetric quantization
#    QRANGE = 2**NBIT - 1

    # 1. Calculate min and max values
#    data_min = np.min(X)
#    data_max = np.max(X)

    # 2. Calculate scale and zero-point
#    scale = (data_max - data_min) / QRANGE
#    zero_point = np.round(-data_min / scale)

    # 3. Quantize
#    data_qn = np.round(X / scale + zero_point)

    # 4. Clip values to ensure they are within the valid range
#    data_qn = np.clip(data_qn, 0, QRANGE)

    # 5. Dequantize
#    data_dqn = (data_qn - zero_point) * scale
#    return data_dqn


In [None]:
#1. extract weight
#2, quantize weight
#3. Load quantized weight into the model

for name, param in model.named_parameters():
    layername = '.'.join(name.split('.')[:1]) #파라미터 이름에서 첫 번째 부분만을 추출하여 레이어의 이름을 얻는다 ex) conv1.weight -->conv1 추출

    if (layername in eligible_param_list) and (len(param.size()) in [1, 2, 4]): #현재 parameter 차원이 1,2,4 차원인지 확인한다
        #1차원 tensor = bias ex) nn.Linear(20, 10) layer는 20개의 입력 특징, 10개의 출력 뉴런. 이 layer의 bias는 10개의 요소를 갖는 1차원 텐서, 각 출력 뉴런에 대한 bias를 나타냄
        #2차원 tensor = fully connnected layer weight ex) nn.Linear(20, 10) layer의 weight는 10x20 크기의 2차원 tesnor로 20개 입력 틍징과 10개의 출력 뉴런 사이의 연결 강도 나타냄
        #4차원 tensor = convolution layer weight ex) nn.Conv2d(3,6,5) layer는 3개의 입력 channel, 6개의 출력 channel, 5x5 kernel을 갖는다. 이 Layer의 weight는 6x3x5x5 크기의 4차원 tensor임
        weight = param.cpu().detach().numpy() #parameter을 CPU로 이동시키고(param.cpu()), gradient 계산에서 분리(detach)하여 Numpy 배열로 변환
        dqn, _ = quantize(weight) #양자화된 data와 Scale factor를 반환, Scale factor는 사용 안하므로 '_'로 무시
        param.data = torch.from_numpy(dqn) #양자화된 데이터(dpn)를 다시 PyTorch Tensor로 변환하여 원래 parameter의 data에 할당(해당 파라미터를 양자화된 겂으로 업데이트)

In [None]:
model.eval()
model.cuda()

top1 = AverageMeter('Acc@1', ':6.4f')
top5 = AverageMeter('Acc@5', ':6.4f')
for data, target in valid_loader:
    if train_on_gpu:
        data, target = data.cuda(), target.cuda()
    #forward pass: compute predicted outputs by passing inputs to the model
    output = model(data)
    acc1, acc5 = accuracy(output, target, topk=(1,5))
    top1.update(acc1.item(), target.size(0))
    top5.update(acc5.item(), target.size(0))

print(top1, top5)

In [None]:
#save weight quantized model
torch.save(model.state_dict(), './darknet19_quan_weight1.pth') 