# **SqueezeNet: 8-bit Fixed-point**

Steps:
1. Implement quantization function from float32 to low-precision in
quantization_utils.py
2. Run main.py. Get quantized weight, corresponding TF .pb and IR.json
3. (debug) Rebuild the model from IR.json and the quantized weights. Check the results layer by layer.
4. Evaluate quantized model on ImageNet validation set to check accuracy
5. (optional) Retrain quantized model to reduce the accuracy gap compared to original
float32 model



# Setup Environment

https://pytorch.org/tutorials/advanced/super_resolution_with_onnxruntime.html

02_TVM_Tutorial_Relay:
https://colab.research.google.com/github/uwsampl/tutorial/blob/master/notebook/02_TVM_Tutorial_Relay.ipynb#scrollTo=1cdUL9-QU34l

In [None]:
# Mount Google Drive inside the Colab runtime so files stored there can be read.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
# IPython magic (notebook-only, not plain Python): change the working directory.
# NOTE(review): the path is relative, so this presumably expects the current
# directory to be /content — confirm.
%cd gdrive/

Mounted at /content/gdrive


In [None]:
import tensorflow as tf
import numpy as np
import numpy as np

import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms

from torchvision.datasets import ImageNet
from torch.utils.data import DataLoader

import matplotlib.pyplot as plt

import torch
from torchvision import transforms
from PIL import Image
import cv2
from google.colab.patches import cv2_imshow
import os
import time

#Quantization


In [None]:
use_cuda = True
# Always bind `device`. Previously it was only assigned when CUDA was
# available, so any later use raised NameError on a CPU-only runtime.
if use_cuda and torch.cuda.is_available():
  # net.cuda()
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")

In [None]:
class SqueezeNet(object):
    '''
    Thin wrapper around a frozen TensorFlow (1.x API) graph stored in a .pb file.

    The graph is imported under the default "import/" name scope with its
    'input_fx' node remapped to a fresh placeholder, and a session bound to
    that graph is kept on the instance for inference.
    '''

    def __init__(self, model_filepath):
        '''
        Inputs:
        - model_filepath : path to the frozen .pb graph file
        '''
        # The file path of model
        self.model_filepath = model_filepath
        # Initialize the model
        self.load_graph(model_filepath = self.model_filepath)

    def load_graph(self, model_filepath):
        '''
        Load the trained model from a frozen graph file.

        Sets self.graph, self.input (the placeholder fed at inference time)
        and self.sess.

        Inputs:
        - model_filepath : path to the frozen .pb graph file
        '''
        print('Loading model...')
        self.graph = tf.Graph()

        with tf.gfile.GFile(model_filepath, 'rb') as f:
            graph_def = tf.GraphDef()
            graph_def.ParseFromString(f.read())

        print('Check out the input placeholders:')
        # Exact comparison: the original `n.op in ('Placeholder')` was a
        # substring test against a string, not a tuple membership test.
        for n in graph_def.node:
            if n.op == 'Placeholder':
                print(n.name + ' => ' + n.op)

        with self.graph.as_default():
            # Define input tensor and substitute it for the graph's own 'input_fx' node
            self.input = tf.placeholder(np.float32, shape = [None,224, 224,3], name='input_fx')
            tf.import_graph_def(graph_def, {'input_fx': self.input})

        self.graph.finalize()

        print('Model loading complete!')

        # To list the layer names while debugging:
        #   for op in self.graph.get_operations(): print(op.name)

        # In this version, tf.InteractiveSession and tf.Session could be used interchangeably.
        # self.sess = tf.InteractiveSession(graph = self.graph)
        self.sess = tf.Session(graph = self.graph)

    def test(self, data):
        '''
        Run inference on a batch.

        Inputs:
        - data : array fed to the input placeholder (declared shape [None, 224, 224, 3])

        Returns:
        - output : value of the graph tensor "import/output_fx:0"
        '''
        # Know your output node name
        output_tensor = self.graph.get_tensor_by_name("import/output_fx:0")
        output = self.sess.run(output_tensor, feed_dict = {self.input: data})

        return output

    def close(self):
        '''
        Release the TensorFlow session held by this instance.
        '''
        self.sess.close()

In [None]:
#PROCESS DATA
def preprocess(x):
    '''
    Preprocess one decoded image for inference.

    The channel axis is reversed (the caller in this file passes the output of
    cv2.imread, which is BGR), the image is resized to 256, center-cropped to
    224x224, converted to a tensor and normalized with the mean/std constants
    below, then rearranged from channel-first to channel-last and given a
    leading batch axis.

    Inputs:
    - x : H x W x 3 image array (presumably uint8, as produced by cv2.imread — confirm)

    Returns:
    - tf.Tensor holding a single-image batch in (1, height, width, channel) order

    Raises:
    - ValueError : if x is None (cv2.imread returns None for unreadable files)
    '''
    if x is None:
        raise ValueError('preprocess() received None; the image could not be read')

    transform = transforms.Compose([#[1]
                                transforms.Resize(256),#[2]
                                transforms.CenterCrop(224),#[3]
                                transforms.ToTensor(),#[4]
                                transforms.Normalize(#5
                                                     mean=[0.485,0.456,0.406],#6
                                                     std =[0.229,0.224,0.225] #7
                                                    )])
    x = x[:,:,::-1]  # reverse channel order (BGR -> RGB)
    x = Image.fromarray(x,'RGB')
    x = transform(x)
    x = tf.transpose(x,perm=[1,2,0])  # channel-first -> channel-last
    x = tf.expand_dims(x,0) # batchify

    return x

In [None]:
#CALCULATING MODEL ACCURACY
def model_accuracy(label, prediction):
    '''
    Compute the fraction of predictions that equal their labels.

    If the two sequences differ in length a warning is printed and only the
    first min(len(label), len(prediction)) entries are compared.

    Inputs:
    - label : sequence / numpy array of ground-truth class ids
    - prediction : sequence / numpy array of predicted class ids

    Returns:
    - accuracy in [0, 1]; 0.0 when there is nothing to compare
    '''
    # Convert so that plain lists are compared element-wise, not as whole objects.
    label = np.asarray(label)
    prediction = np.asarray(prediction)

    len_label = len(label)
    len_pred = len(prediction)

    if len_label != len_pred:
        print("Warning: label"+str(len_label)+"and prediction"+ str(len_pred)+" have unequal lengths!")

    num_test = min(len_label, len_pred)
    if num_test == 0:
        return 0.0

    # Slice to exactly num_test entries. The original used num_test+1, which
    # produced differently sized slices whenever the lengths were unequal.
    matches = label[:num_test] == prediction[:num_test]

    return np.sum(matches)/num_test



#LOADING DATA
def loading_data(directory, start, batchsize):
    '''
    Load and preprocess a window of validation images plus their labels.

    Images are read from
    directory/ILSVRC2012_val_000NNNNN.JPEG for NNNNN in [start, start+batchsize),
    and labels are taken from the matching lines of the val.txt label file
    (image number i corresponds to line i, 1-based).

    Inputs:
    - directory : folder containing the validation JPEG files
    - start : number of the first image to load (1-based)
    - batchsize : how many consecutive images to load

    Returns:
    - x_test : numpy array of the preprocessed images, stacked along axis 0
    - y_test : numpy array of integer labels

    Raises:
    - FileNotFoundError : if an image file cannot be read
    '''
    # Collect the per-image tensors and concatenate once at the end, instead
    # of growing a tensor with a new tf.concat node for every image.
    batch = []

    # time.clock() was removed in Python 3.8; perf_counter() is the replacement.
    tic = time.perf_counter()

    for counts, i in enumerate(range(start, start + batchsize), 1): #50001)
        path = directory + '/ILSVRC2012_val_000' + str(i).zfill(5) + '.JPEG'
        x = cv2.imread(path)
        if x is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError('could not read image: ' + path)
        batch.append(preprocess(x))

        if counts % 100 == 0:
            toc = time.perf_counter()
            print ('loaded %d images...' %counts, '%5fs'%(toc-tic))
            tic = time.perf_counter()

    # convert x_test back to numpy
    if batch:
        with tf.Session() as sess:
            x_test = sess.run(tf.concat(batch, axis=0))
    else:
        # NOTE(review): shape/dtype assumed from preprocess() (224x224x3 float) — confirm.
        x_test = np.empty((0, 224, 224, 3), dtype=np.float32)

    # loading y_test from file
    with open('/content/gdrive/My Drive/209AS AI on Chips/val.txt') as lfile:
        lines = lfile.readlines()

    # zero indexing: image number `start` lives on line index start-1
    y_test = [int(line.split(' ')[-1]) for line in lines[start-1:start+batchsize-1]]

    print('loaded %d labels' %len(y_test))

    return x_test, np.array(y_test)


#PREDICTION
def test_from_frozen_graph(x_test, y_test):
    '''
    Run the module-level `model` on a batch and score it against the labels.

    Inputs:
    - x_test : batch of preprocessed images passed to model.test
    - y_test : ground-truth class ids for the batch

    Returns:
    - accuracy as computed by model_accuracy
    '''
    # Clear whatever accumulated in the default graph before running inference.
    tf.reset_default_graph()

    scores = model.test(data = x_test)
    # The class scores sit on axis 3 of the model output; flatten to one id per image.
    predicted_ids = np.argmax(scores, axis = 3).reshape((-1))
    return model_accuracy(label = y_test, prediction = predicted_ids)

In [None]:
#Wrapper function
def Evaluation(start, batch_size, directory):
    '''
    Load one window of validation images, run the model on it and report accuracy.

    Inputs:
    - start : number of the first validation image (1-based)
    - batch_size : how many consecutive images to evaluate
    - directory : folder containing the validation JPEG files

    Returns:
    - accuracy : fraction of correctly classified images in the window
    '''
    # Use the batch_size argument. The original read the global `batchsize`,
    # which only worked while a global of that name happened to exist.
    x_test, y_test = loading_data(directory, start, batch_size) #preprocess and return x, read y
    accuracy = test_from_frozen_graph(x_test, y_test)
    print('Tested %d images,' %batch_size, 'Accuracy = %f' %(accuracy*100), '%')

    return accuracy

## evaluation

In [None]:
# Load the frozen TensorFlow graph (.pb) from Drive; `model` is the global
# that test_from_frozen_graph runs inference with.
model = SqueezeNet('/content/gdrive/My Drive/209AS AI on Chips/test_m1.pb') 

In [None]:
loaded_files, accuracies = [], []

start = 1
batchsize = 100

# Evaluate consecutive windows of `batchsize` images while the window still
# ends at or before image 1100, appending each window's accuracy to the log.
while start + batchsize <= 1101:
  print(f"Start: {start}")
  accuracy = Evaluation(start, batchsize,'/content/gdrive/My Drive/209AS AI on Chips/valset')

  # Append mode: results of earlier windows (and earlier runs) are kept.
  with open('/content/gdrive/My Drive/209AS AI on Chips/resultlog_m1.txt', 'a') as output:
    output.write(f'Start:{start} Stop:{start + batchsize} batchsize: {batchsize}\n{accuracy}\n')

  print()
  start += batchsize


# Code for Quantization_utils

## Prime method

In [None]:
import numpy as np
import tensorflow as tf
import math
# '''
# utility function
# with with block.suppress_stdout_stderr():
#     your code
# To hide stdout/stderr output i.e. from Tensorflow initialzation    
# '''
# from . import suppress_stdout_stderr as block


def tf_symbolic_convert(value, wl, fl):
    '''
    Convert float numpy array to wl-bit low precision data with Tensorflow API

    The magnitude is rounded to the nearest multiple of 2**-fl (ties away from
    zero), the sign is restored, and the result is clipped to the symmetric
    range [-max_v, max_v] with max_v = (2**(wl-1) - 1) * 2**-fl.

    Inputs:
    - value : a numpy array of input data
    - wl : word length of the data format to convert
    - fl : fraction length (exponent length for floating-point)

    Returns:
    - val_fp : tf.Tensor as the symbolic expression for quantization
    '''

    # Largest representable magnitude: all wl-1 magnitude bits set.
    max_v = sum(2**(i-fl-1) for i in range(int(wl-1), 0, -1))

    print ('max_value:', max_v, 'min_value:', -max_v)
    value = tf.convert_to_tensor(value)
    values_sign = tf.sign(value)

    # Integer part of the magnitude, with the sign restored.
    val_fp = tf.abs(value)
    val_fp = tf.floor(val_fp)*values_sign

    values_frac = tf.abs(value)-tf.abs(val_fp)

    # Peel off the fraction one binary digit at a time, most significant first.
    for i in range(fl):
        binary_base = 2**-(i+1)
        values_frac = values_frac * 2
        select = tf.greater_equal(values_frac, 1)
        select = tf.cast(select, value.dtype)
        values_frac = values_frac - select
        val_fp = val_fp + select * binary_base * values_sign

    # Round: bump by one step where the truncated remainder is at least half a step.
    remaining = tf.greater_equal(tf.abs(value-val_fp), 2**-(fl+1))
    remaining = tf.cast(remaining, value.dtype)

    val_fp = val_fp + remaining * 2**-fl * values_sign
    val_fp=tf.clip_by_value(val_fp, -max_v, max_v)

    # Debug print of the evaluated result. The session is closed afterwards;
    # the original opened a new tf.Session() per call and never released it.
    with tf.Session() as sess:
        print (sess.run(val_fp))

    return val_fp

class Qnn:
    '''
    Fixed-point quantization helper: converts data to a word_len-bit format and
    searches for the fraction length with the smallest quantization error.
    '''

    def __init__(self):
        pass

    # dtype conversion: basic functions
    def to_fixedpoint(self, data_i, word_len, frac_len):
        '''
        Build the symbolic fixed-point conversion of data_i via tf_symbolic_convert.
        '''
        return tf_symbolic_convert(data_i, word_len, frac_len)

    # utility function to convert symbolically or numerically
    def convert(self, data_i, word_len, frac_len, symbolic=False):
        '''
        Quantize data_i to word_len bits with frac_len fraction bits.

        Inputs:
        - data_i : input data
        - word_len : word length of quantized data
        - frac_len : fraction length of quantized data
        - symbolic : if True return the tf.Tensor expression; otherwise evaluate
          it in a throwaway graph/session and return the numeric result

        Returns:
        - data_q : tf.Tensor (symbolic=True) or evaluated array (symbolic=False)
        '''
        if symbolic is True:
            return self.to_fixedpoint(data_i, word_len, frac_len)

        # The original wrapped the session in `block.suppress_stdout_stderr()`,
        # but that `with` had no body (a syntax error) and `block` was never
        # imported, so the wrapper is dropped here.
        with tf.Graph().as_default():
            data_q = self.to_fixedpoint(data_i, word_len, frac_len)
            with tf.Session() as sess:
                data_q = sess.run(data_q)
        return data_q

    # error measurement
    def difference(self, data_q, data_origin):
        '''
        Compute the difference before and after quantization

        Inputs:
        - data_q: a numpy array of quantized data
        - data_origin: a numpy array of original data

        Returns:
        - dif : numerical value of quantization error (sum of squared differences)
        '''
        dif = np.sum((np.asarray(data_q) - np.asarray(data_origin))**2)

        return dif

    # search policy
    def search(self, data_i, word_len):
        '''
        Search for the optimal fraction length that leads to minimal quantization error for data_i

        Every fraction length in [0, word_len) is tried; ties go to the smallest one.

        Inputs:
        - data_i : a numpy array of original data
        - word_len : word length of quantized data

        Returns:
        - fl_opt : fraction length (python built-in int data type) that leads to minimal quantization error

        Raises:
        - ValueError : if word_len is not positive (there is nothing to search)
        '''
        if word_len <= 0:
            raise ValueError('word_len must be positive, got %r' % (word_len,))

        error = []
        for f in range(word_len):
            print ('frac_len = %d'%f)
            data = self.convert(data_i, word_len, f, symbolic=False)
            error.append(self.difference(data, data_i))

        print ('errors:', error)
        # Candidates are 0..word_len-1, so the index of the smallest error is
        # itself the best fraction length.
        fl_opt = error.index(min(error))
        print ('best fraction_length = %d' %fl_opt)

        return fl_opt

    # granularity
    def apply(self, data_i, word_len):
        '''
        Quantize data_i with the best fraction length found by search().

        Returns:
        - data_q : quantized data
        - fl_opt : the fraction length that was used
        '''
        fl_opt = self.search(data_i, word_len)
        data_q = self.convert(data_i, word_len, fl_opt)
        return data_q, fl_opt

## Method 1

In [None]:
def tf_symbolic_convert_m1(value, wl, fl):
    '''
    Quantize a numpy array onto a uniform grid anchored at its minimum value.

    Values are clipped to [min(value), min(value) + max_v] with
    max_v = (2**(wl-1) - 1) * 2**-fl, then rounded (half up) to the nearest
    multiple of 2**-fl above the minimum. The input array is not modified.

    Inputs:
    - value : numpy array (or array-like) of input data
    - wl : word length of the data format to convert
    - fl : fraction length

    Returns:
    - val_fp : numpy array of quantized values, same shape as value
    '''
    max_v = sum(2**(i-fl-1) for i in range(int(wl-1), 0, -1))

    print ('max_value:', max_v, 'min_value:', -max_v)

    data = np.asarray(value)
    if not np.issubdtype(data.dtype, np.floating):
        # Integer input would otherwise truncate the clipped/quantized values.
        data = data.astype(np.float64)
    if data.size == 0:
        # Nothing to quantize; np.min would raise on an empty array.
        return data.copy()

    # values_sign = np.sign(value)
    min_value = np.min(data)
    upper = min_value + max_v
    print ('min_value of data = ', min_value, ', max value = ', upper)
    quantize_step = 2**-fl
    print ('quantized step = ', quantize_step)

    # Clip into a new array. The original wrote the clipped values back into
    # the caller's array in place.
    clipped = np.minimum(data, upper)

    val_fp = (clipped - min_value)/quantize_step

    # get integer part, rounding up when the fractional part is at least one half
    whole = np.floor(val_fp)
    val_fp = whole + (val_fp - whole >= 0.5)
    val_fp = val_fp * quantize_step + min_value
    print(val_fp)

    return val_fp

## testing the code

In [None]:
# Smoke test: search for the best 8-bit fraction length on a small hand-written matrix.
A = np.array([
    [1.345987, 20.3459874, 13.348574],
    [-8.230975, -13.835, 28.4842],
    [3.65489, 21.376, 28.4542],
    [3.648, -21.50975, 13.027395],
    [3.3430985, 21.875, -13.027395],
])
x = Qnn()
print('random numpy input:\n', A)
print()
x.search(data_i=A, word_len=8)