## 12. Post-Training Quantization

In [1]:
import numpy as np
import pandas as pd
from PIL import Image
import tensorflow as tf
import time
import os

In [2]:
# Mount Google Drive at /content/drive; the base directory used by the rest of the
# notebook lives under this mount point.
# NOTE(review): google.colab is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Base directory on the mounted Drive. Model and data locations are built by plain
# string concatenation (path + 'unet_pp.h5', path + 'data/...'), so the trailing
# slash is required.
path = '/content/drive/MyDrive/cs2/'

In [4]:
#https://www.kaggle.com/titericz/building-and-visualizing-masks
#https://www.kaggle.com/paulorzp/rle-functions-run-lenght-encode-decode

#defining function for converting EncodedPixels(rle: run length encoding) to mask
def rle2mask(rle_string, img_shape=(256,1600)):
    '''
    Decode a run-length-encoded (EncodedPixels) string into a binary mask.

    input: rle_string - whitespace separated "start length" pairs; starts are 1-based
           pixel positions counted column by column (top to bottom, then left to right)
           img_shape - (rows, columns) of the mask to build
    output: numpy.ndarray of dtype uint8 with shape img_shape, 1 inside the runs and
            0 elsewhere (an empty string gives an all-zero mask)
    '''
    tokens = np.array([int(tok) for tok in rle_string.split()])
    run_starts = tokens[0::2] - 1   # shift the 1-based starts to 0-based offsets
    run_lengths = tokens[1::2]
    flat_mask = np.zeros(img_shape[0] * img_shape[1], dtype=np.uint8)
    for k, begin in enumerate(run_starts):
        flat_mask[begin:begin + run_lengths[k]] = 1
    # column-major reshape matches the top-to-bottom, left-to-right pixel numbering
    return flat_mask.reshape(img_shape, order='F')

#defining function for converting given mask to EncodedPixels(rle: run length encoding)
def mask2rle(mask_array):
    '''
    Encode a binary mask as a run-length (EncodedPixels) string.

    input: mask in numpy.ndarray format
    output: space separated "start length" pairs with 1-based starts, pixels numbered
            column by column; an empty string when the mask has no foreground pixel
    '''
    # transposing before flattening walks the mask column by column
    column_major = mask_array.T.flatten()
    # zero sentinels on both ends so runs touching the borders still yield two transitions
    padded = np.concatenate([[0], column_major, [0]])
    transitions = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # even slots are run starts, odd slots are run ends -> convert the ends into lengths
    transitions[1::2] -= transitions[::2]
    return ' '.join(str(value) for value in transitions)

#defining function for calculation of metric dice coefficient
def dice_coefficient(y_true, y_pred):
    '''
    Dice coefficient between two tensors, computed over all elements at once.

    Both inputs are flattened to 1-D, so a single scalar is produced for the whole
    input: (2*sum(y_true*y_pred) + eps) / (sum(y_true) + sum(y_pred) + eps).
    '''
    eps = 1e-9  # keeps the ratio defined (and equal to 1) when both tensors are all zero
    flat_true = tf.reshape(y_true, [-1])
    flat_pred = tf.reshape(y_pred, [-1])
    overlap = tf.math.reduce_sum(flat_true * flat_pred)
    total = tf.math.reduce_sum(flat_true) + tf.math.reduce_sum(flat_pred)
    return (2. * overlap + eps) / (total + eps)

#defining function for calculation of loss function: binary cross entropy + dice loss
def bce_dice_loss(y_true, y_pred):
    '''
    Combined segmentation loss: binary cross-entropy + Dice loss.

    y_true, y_pred: tensors accepted by tf.keras.losses.binary_crossentropy and by
                    dice_coefficient (same shape for both).
    returns: binary_crossentropy(y_true, y_pred) + (1 - dice_coefficient(y_true, y_pred)).
             The cross-entropy term is not reduced to a scalar here; the scalar Dice
             term is added to whatever shape binary_crossentropy returns.
    '''
    # The inputs are passed through unchanged: dice_coefficient flattens them itself,
    # so no flattened copies are needed in this function.
    cross_entropy = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    dice_loss = 1 - dice_coefficient(y_true, y_pred)
    return cross_entropy + dice_loss

### 12.1 Quantization

In [None]:
#Loading Unet++ trained model
#custom_objects maps the names 'bce_dice_loss' and 'dice_coefficient' to the functions
#defined above so load_model can resolve them while deserializing the .h5 file
#(presumably the model was compiled with them - confirm against the training notebook)
objects = {'bce_dice_loss':bce_dice_loss, 'dice_coefficient': dice_coefficient}
model = tf.keras.models.load_model(path + 'unet_pp.h5', custom_objects = objects)

In [None]:
#Applying post-training quantization technique to our trained 'UNet++' model 
#https://www.tensorflow.org/lite/performance/post_training_quantization

# Convert the in-memory Keras model to a TFLite flatbuffer with post-training quantization.
# Optimize.DEFAULT is the supported flag; OPTIMIZE_FOR_SIZE is deprecated and documented
# as doing the same thing as DEFAULT. No representative dataset is supplied to the converter.
tf_lite_converter = tf.lite.TFLiteConverter.from_keras_model(model)
tf_lite_converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = tf_lite_converter.convert()

INFO:tensorflow:Assets written to: /tmp/tmps4o5z6c1/assets


In [None]:
#saving the lite model
# Write inside a context manager so the file is flushed and closed before later cells
# measure its size and load it; the byte count is still shown as the cell result.
with open(path + 'tflite_model.tflite', "wb") as tflite_file:
    bytes_written = tflite_file.write(tflite_model)
bytes_written

321360

### 12.2 Size Comparison

In [None]:
def get_file_size(file_path):
    '''Return the size in bytes of the file at file_path, as reported by os.path.getsize.'''
    return os.path.getsize(file_path)

def convert_bytes(size, unit=None):
    '''
    Print size (a byte count) as 'File size: <value> <unit name>'.

    unit "KB" divides by 1024 and unit "MB" by 1024*1024, both rounded to 3 decimals;
    any other unit prints the raw byte count. The function only prints; it returns None.
    '''
    if unit == "KB":
        amount, label = round(size / 1024, 3), ' Kilobytes'
    elif unit == "MB":
        amount, label = round(size / (1024 * 1024), 3), ' Megabytes'
    else:
        amount, label = size, ' bytes'
    print('File size: ' + str(amount) + label)

In [None]:
# Size on disk of the original Keras model file, printed in kilobytes.
convert_bytes(get_file_size(path + 'unet_pp.h5'), "KB")

File size: 973.109 Kilobytes


In [None]:
# Size on disk of the converted TFLite model file, printed in kilobytes.
convert_bytes(get_file_size(path+'tflite_model.tflite'), "KB")

File size: 313.828 Kilobytes


### 12.3 Performance Comparison

In [5]:
#function to evaluate the performance of tf keras model
def evaluate_model(X, Y, batch_size=10):
    '''
    Evaluate the Keras model held in the notebook-level variable `model`.

    Images are read from path + 'data/train_images/' and must fit a (256, 1600, 3)
    slot; ground-truth masks are decoded with rle2mask.

    X: List of Image Ids
    Y: List of tuples(containing rles of actual/ground-truth masks) for the corresponding Image Ids
    batch_size: number of images passed to model.predict at a time (default 10)
    returns: dice-coefficient calculated from the rounded predictions, accumulated over
             all batches (1.0 for an empty X, since only the smoothing constant remains)
    '''
    smoothing_const = 1e-9
    intersection = 0
    denominator = 0
    # Buffers are allocated once and reused; a shorter final batch uses a leading slice,
    # so full and partial batches share one code path.
    batch_buffer = np.empty((batch_size,256,1600,3),dtype=np.float32)
    mask_buffer = np.empty((batch_size,256,1600,4),dtype=np.uint8)

    for start in range(0, len(X), batch_size):
        batch_idcs = range(start, min(len(X), start + batch_size))
        batch = batch_buffer[:len(batch_idcs)]
        y_actual_masks = mask_buffer[:len(batch_idcs)]

        for j, idx in enumerate(batch_idcs):
            # the context manager closes the image file once its pixels are copied
            with Image.open(path + 'data/train_images/' + X[idx]) as img:
                batch[j] = img#input image
            # one ground-truth channel per rle entry of the tuple
            for channel in range(4):
                y_actual_masks[j,:,:,channel] = rle2mask(Y[idx][channel])

        # round() turns the model outputs into 0/1 masks (assuming they lie in [0, 1])
        y_pred_masks = model.predict(batch).round().astype(int)

        #batchwise calculation for dice coefficient
        intersection = intersection + np.sum(y_actual_masks * y_pred_masks)
        denominator = denominator + (np.sum(y_actual_masks) + np.sum(y_pred_masks))
    dc = (2*intersection + smoothing_const)/(denominator+smoothing_const)
    return dc

In [10]:
#function to evaluate the performance of lite model
def evaluate_lite_model(X, Y, batch_size=10):
    '''
    Evaluate the TFLite model stored at path + 'tflite_model.tflite'.

    A fresh interpreter is created on every call. Images are read from
    path + 'data/train_images/' and must fit a (256, 1600, 3) slot; ground-truth
    masks are decoded with rle2mask.

    X: List of Image Ids
    Y: List of tuples(containing rles of actual/ground-truth masks) for the corresponding Image Ids
    batch_size: number of images run through the interpreter at a time (default 10)
    returns: dice-coefficient calculated from the rounded predictions, accumulated over
             all batches (1.0 for an empty X, since only the smoothing constant remains)
    '''
    interpreter = tf.lite.Interpreter(model_path = path + 'tflite_model.tflite')
    input_index = interpreter.get_input_details()[0]['index']
    output_index = interpreter.get_output_details()[0]['index']

    smoothing_const = 1e-9
    intersection = 0
    denominator = 0
    # Buffers are allocated once and reused; a shorter final batch uses a leading slice.
    batch_buffer = np.empty((batch_size,256,1600,3),dtype=np.float32)
    mask_buffer = np.empty((batch_size,256,1600,4),dtype=np.uint8)
    allocated_batch = None  # batch dimension the interpreter tensors are currently sized for

    for start in range(0, len(X), batch_size):
        batch_idcs = range(start, min(len(X), start + batch_size))
        n_images = len(batch_idcs)
        batch = batch_buffer[:n_images]
        y_actual_masks = mask_buffer[:n_images]

        if n_images != allocated_batch:
            # resize_tensor_input is meant for input tensors only; allocate_tensors()
            # then re-derives the downstream shapes, including the output tensor.
            interpreter.resize_tensor_input(input_index, (n_images, 256,1600,3))
            interpreter.allocate_tensors()
            allocated_batch = n_images

        for j, idx in enumerate(batch_idcs):
            # the context manager closes the image file once its pixels are copied
            with Image.open(path + 'data/train_images/' + X[idx]) as img:
                batch[j] = img#input image
            # one ground-truth channel per rle entry of the tuple
            for channel in range(4):
                y_actual_masks[j,:,:,channel] = rle2mask(Y[idx][channel])

        interpreter.set_tensor(input_index, batch)
        interpreter.invoke()
        # round() turns the model outputs into 0/1 masks (assuming they lie in [0, 1])
        y_pred_masks = interpreter.get_tensor(output_index).round().astype(int)

        #batchwise calculation for dice coefficient
        intersection = intersection + np.sum(y_actual_masks * y_pred_masks)
        denominator = denominator + (np.sum(y_actual_masks) + np.sum(y_pred_masks))
    dc = (2*intersection + smoothing_const)/(denominator+smoothing_const)
    return dc

In [7]:
#loading Validation Data dataframe
#missing cells are replaced by '' so an absent RLE decodes to an all-zero mask in rle2mask
vali_data = pd.read_csv(path + "data/validtn_data.csv").fillna('')
#x: image ids; y: one record per image holding the four rle columns, in the same row order
x= vali_data['ImageId'].tolist()
y = list(vali_data[['rle_1', 'rle_2', 'rle_3', 'rle_4']].to_records(index = False))

In [8]:
#loading original model
#compile = False skips restoring the training configuration, so no custom_objects are
#passed here; evaluate_model only calls model.predict on this notebook-level `model`
model = tf.keras.models.load_model(path + 'unet_pp.h5', compile = False)

In [9]:
# checking performance of unet_pp.h5
# only the first 100 validation images are scored
dice_coeff = evaluate_model(x[0:100], y[0:100])
print('Dice Coefficient for validation data: ', dice_coeff)

Dice Coefficient for validation data:  0.5872662331855102


In [11]:
#loading lite model
#This interpreter is only used to inspect the model's input/output signature;
#evaluate_lite_model creates its own interpreter from the same file.
interpreter = tf.lite.Interpreter(model_path = path + 'tflite_model.tflite')
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
#shapes are reported before any resize_tensor_input call
print("Input Shape:", input_details[0]['shape'])
print("Input Type:", input_details[0]['dtype'])
print("Output Shape:", output_details[0]['shape'])
print("Output Type:", output_details[0]['dtype'])

Input Shape: [   1  256 1600    3]
Input Type: <class 'numpy.float32'>
Output Shape: [   1  256 1600    4]
Output Type: <class 'numpy.float32'>


In [12]:
# checking performance of tflite model
# same first 100 validation images as used for the Keras model, so the scores are comparable
dice_coeff = evaluate_lite_model(x[0:100], y[0:100])
print('Dice Coefficient for validation data: ', dice_coeff)

Dice Coefficient for validation data:  0.5880486725401608


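The model size is reduced by roughly 3× (973 KB → 314 KB), while the performance is almost the same (Dice coefficient 0.587 vs. 0.588 on the first 100 validation images).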