In [None]:
# Copyright 2022 Sony Semiconductor Solutions Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Quantize Model (Image Classification Keras Model)

This notebook explains the workflow to quantize custom AI model using [Model Compression Toolkit (MCT)](https://github.com/sony/model_optimization).

Instructions are described in [README.md](./README.md).

## Imports

In [None]:
import glob
import json
import os
import pathlib
import re

import cv2
import model_compression_toolkit as mct
import numpy as np
import pandas as pd
import tensorflow as tf
from keras.models import load_model
from model_compression_toolkit import FolderImageLoader
from tensorflow.keras.applications.mobilenet import MobileNet
from tqdm import tqdm

## Load Configurations

Load the configuration file and set the variables.

In [None]:
with open("./configuration.json", "r") as f:
    app_configuration = json.load(f)

source_keras_model = app_configuration.get("source_keras_model", "")

dataset_image_dir = app_configuration["dataset_image_dir"]

batch_size = app_configuration["batch_size"]

input_tensor_size = app_configuration["input_tensor_size"]

iteration_count = app_configuration["iteration_count"]

output_dir = app_configuration["output_dir"]

evaluate_image_dir = app_configuration["evaluate_image_dir"]
evaluate_image_extension = '*.' + app_configuration["evaluate_image_extension"]
evaluate_ground_truth_file = app_configuration["evaluate_ground_truth_file"]
evaluate_result_dir = app_configuration["evaluate_result_dir"]

## Load Keras Model

In [None]:
if not source_keras_model:
    raise FileNotFoundError(
        f'Please set "source_keras_model" value in configuration.json file.')
else:
    if os.path.isfile(source_keras_model):
        # earlier style keras h5 file
        keras_model = load_model(source_keras_model)
    else:
        # later style keras Saved Model folder
        keras_model = tf.keras.models.load_model(source_keras_model)

## Quantize Keras Model

Quantize the Keras model using MCT with jpeg images for calibration.

In [None]:
# Define preprocessing for calibration (resize and normalization).
# The implementation depends on AI model's preprocessing of learning.
# The default implementation is for MobileNetV1.
# Please change the implementation according to the using AI model.

MEAN = 127.5
STD = 127.5
RESIZE_SCALE = 256 / input_tensor_size
SIZE = input_tensor_size

def resize(x):
    resize_side = max(RESIZE_SCALE * SIZE / x.shape[0], RESIZE_SCALE * SIZE / x.shape[1])
    height_tag = int(np.round(resize_side * x.shape[0]))
    width_tag = int(np.round(resize_side * x.shape[1]))
    resized_img = cv2.resize(x, (width_tag, height_tag))
    offset_height = int((height_tag - SIZE) / 2)
    offset_width = int((width_tag - SIZE) / 2)
    cropped_img = resized_img[offset_height:offset_height + SIZE,
                              offset_width:offset_width + SIZE]
    return cropped_img

def normalization(x):
    return (x - MEAN) / STD

# Create a representative data generator, which returns a list of images.
# The images can be preprocessed using a list of preprocessing functions.
image_data_loader = FolderImageLoader(dataset_image_dir,
                                      preprocessing=[resize, normalization],
                                      batch_size=batch_size)

In [None]:
# Create a Callable representative dataset for calibration purposes.
# The function must be called without any arguments, and must return a list numpy arrays
# (array for each model's input).
def representative_data_gen() -> list:
    return [image_data_loader.sample()]

# Quantize a model using the representative_data_gen as the calibration images.
# Set the number of calibration iterations.
quantized_keras_model, quantization_info = mct.keras_post_training_quantization(
    keras_model,
    representative_data_gen,
    n_iter=iteration_count)

## Convert from Keras to TFLite Model

Quantized TFLite model will be saved as **`model_quantized.tflite`** in **`output_dir`**.

In [None]:
converter_quantized = tf.lite.TFLiteConverter.from_keras_model(quantized_keras_model)
converter_quantized.optimizations = [tf.lite.Optimize.DEFAULT]
converter_quantized.inference_input_type = tf.uint8
tflite_model_quantized = converter_quantized.convert()

tflite_models_dir = pathlib.Path(output_dir)
tflite_models_dir.mkdir(exist_ok=True, parents=True)

tflite_model_file_quantized = tflite_models_dir/"model_quantized.tflite"
tflite_model_file_quantized.write_bytes(tflite_model_quantized)

Non-quantized TFLite model will be saved as **`model.tflite`** in **`output_dir`**.

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

tflite_models_dir = pathlib.Path(output_dir)
tflite_models_dir.mkdir(exist_ok=True, parents=True)

tflite_model_file = tflite_models_dir/"model.tflite"
tflite_model_file.write_bytes(tflite_model)

## Prepare Evaluation

Load ground truth. The file format is that class ids are listed per row in the same order as the image filename.

In [None]:
with open(evaluate_ground_truth_file) as f:
    ground_truth_ids = np.array([s.strip() for s in f.readlines()], dtype='uint')

Enumerate jpeg images.

In [None]:
def atoi(text):
    return int(text) if text.isdigit() else text

def natural_keys(text):
    return [atoi(c) for c in re.split(r'(\d+)', text)]

files = sorted(glob.glob(str(pathlib.Path(evaluate_image_dir)/evaluate_image_extension)),
               key=natural_keys)
if len(files) == 0:
    raise FileNotFoundError(
        f'Image for evaluation not found in the evaluate_image_dir: {evaluate_image_dir}')

# get images for evaluation
test_images = []
for idx, file in enumerate(files):
    filename = os.path.basename(file)
    info = dict()
    info['path'] = file
    info['imageID'] = filename
    test_images.append(info)

Define evaluate methods for TFLite model.

In [None]:
def evaluate_tflite_model(interpreter, images):
    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]

    # refs: https://github.com/tensorflow/models
    #       /research/slim/preprocessing/inception_preprocessing.py
    predictions = []
    for test_image in tqdm(images):
        if input_details['dtype'] == np.uint8:
            # Pre-processing: add batch dimension and convert to uint8 to match with
            # the model's input data format.
            image = tf.io.decode_jpeg(tf.io.read_file(test_image['path']), channels=3)
            # image = tf.image.central_crop(image, central_fraction=0.875)
            image = tf.expand_dims(image, 0)
            image = tf.compat.v1.image.resize_bilinear(image, [input_tensor_size, input_tensor_size],
                                                       align_corners=False)
            image = tf.cast(image, tf.uint8)
            interpreter.set_tensor(input_details["index"], image)
        else:
            # for non-quantized model.
            image = tf.io.decode_jpeg(tf.io.read_file(test_image['path']), channels=3)
            image = tf.image.convert_image_dtype(image, dtype=tf.float32)
            # image = tf.image.central_crop(image, central_fraction=0.875)
            image = tf.expand_dims(image, 0)
            image = tf.compat.v1.image.resize_bilinear(image, [input_tensor_size, input_tensor_size],
                                                       align_corners=False)
            image = tf.subtract(image, 0.5)
            image = tf.multiply(image, 2.0)
            interpreter.set_tensor(input_details["index"], image)

        # Run inference.
        interpreter.invoke()

        output = interpreter.tensor(output_details["index"])
        prediction = dict()
        prediction['imageID'] = test_image['imageID']
        prediction['predictions'] = [np.argmax(output()[0])]
        predictions.append(prediction)

    # Compare prediction results with ground truth labels to calculate accuracy.
    accurate_count = 0
    for index in range(len(predictions)):
        if predictions[index]['predictions'][0] == ground_truth_ids[index]:
            accurate_count += 1
    top_1_accuracy = accurate_count * 1.0 / len(predictions)

    return top_1_accuracy

Define evaluate methods for Keras model.

In [None]:
def evaluate_keras_model(data_dir, gt_file, model):
    with open(gt_file, 'r') as fp:
        labels = [line.strip() for line in fp.readlines()]

    def load_image(image_path):
        image = tf.io.decode_jpeg(tf.io.read_file(image_path), channels=3)
        image = tf.image.convert_image_dtype(image, tf.float32)
        # image = tf.image.central_crop(image, central_fraction=0.875)
        image = tf.expand_dims(image, 0)
        image = tf.compat.v1.image.resize_bilinear(image, [input_tensor_size, input_tensor_size],
                                                   align_corners=False)
        image = tf.squeeze(image, [0])
        return image

    def atoi(text):
        return int(text) if text.isdigit() else text

    def natural_keys(text):
        return [atoi(c) for c in re.split(r'(\d+)', text)]

    image_paths = sorted(
        glob.glob(str(pathlib.Path(evaluate_image_dir)/evaluate_image_extension)),
        key=natural_keys)
    image_paths = list(image_paths)
    images_ds = tf.data.Dataset.from_tensor_slices(
        [str(path) for path in image_paths]).map(load_image)
    labels_ds = tf.data.Dataset.from_tensor_slices(np.array(labels).astype(np.uint32))
    test_data = tf.data.Dataset.zip((images_ds, labels_ds)).shuffle(len(image_paths))

    model.trainable = False
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
    model.summary()

    test_result = model.evaluate(test_data.batch(1))

    return test_result[1]  # Top1 accuracy

Load non-quantized TFLite model.

In [None]:
tflite_models_dir = pathlib.Path(output_dir)
tflite_models_dir.mkdir(exist_ok=True, parents=True)

tflite_model_file = tflite_models_dir/"model.tflite"

interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file))
interpreter.allocate_tensors()

Load quantized TFLite model.

In [None]:
tflite_models_dir = pathlib.Path(output_dir)
tflite_models_dir.mkdir(exist_ok=True, parents=True)

tflite_model_file_quantized = tflite_models_dir/"model_quantized.tflite"

interpreter_quantized = tf.lite.Interpreter(model_path=str(tflite_model_file_quantized))
interpreter_quantized.allocate_tensors()

## Evaluate

Evaluate non-quantized tflite model.

In [None]:
top_1_accuracy_tflite = evaluate_tflite_model(interpreter, test_images)
print(f'Top1 accuracy: non-quantized tflite: {top_1_accuracy_tflite}')

Evaluate quantized tflite model.

In [None]:
top_1_accuracy_tflite_quantized = evaluate_tflite_model(interpreter_quantized, test_images)
print(f'Top1 accuracy: quantized tflite: {top_1_accuracy_tflite_quantized}')

Evaluate non-quantized Keras model.

In [None]:
top_1_accuracy_keras = evaluate_keras_model(evaluate_image_dir,
                                            evaluate_ground_truth_file,
                                            keras_model)
print(f'\nTop1 accuracy: non-quantized keras: {top_1_accuracy_keras}')

Evaluate quantized Keras model.

In [None]:
top_1_accuracy_keras_quantized = evaluate_keras_model(evaluate_image_dir,
                                                      evaluate_ground_truth_file,
                                                      quantized_keras_model)
print(f'\nTop1 accuracy: quantized keras: {top_1_accuracy_keras_quantized}')

Print evaluation results.

In [None]:
df = pd.DataFrame([[top_1_accuracy_tflite],
                   [top_1_accuracy_tflite_quantized],
                   [top_1_accuracy_keras],
                   [top_1_accuracy_keras_quantized]],
                  index=['non-quantized tflite (model.tflite)',
                         'quantized tflite (model_quantized.tflite)',
                         'non-quantized keras',
                         'quantized keras'],
                  columns=['Top1 accuracy'])
df

Save evaluation results as **`results.json`** in **`evaluate_output_dir`**.

In [None]:
evaluate_output_dir = pathlib.Path(evaluate_result_dir)
evaluate_output_dir.mkdir(exist_ok=True, parents=True)

with open(evaluate_output_dir/"results.json", 'w') as f:
    results = dict()
    results['top_1_accuracy_keras'] = top_1_accuracy_keras
    results['top_1_accuracy_keras_quantized'] = top_1_accuracy_keras_quantized
    results['top_1_accuracy_tflite'] = top_1_accuracy_tflite
    results['top_1_accuracy_tflite_quantized'] = top_1_accuracy_tflite_quantized
    json.dump(results, f, ensure_ascii=False, indent=4)