In [None]:
import tensorflow as tf
import numpy as np

import os
import json

from model.ModelBuilder import ModelBuilder
from utils_train.Datagenerator import AnchorBox


from object_detection.core import post_processing
import functools

In [None]:
modelName = "MobileNetV3_PFH_SSD"
    
model_dir = "checkpoints/"
modelPart = modelName.split("_")

with open(os.path.join("model", "0_Config", modelName+".json"), "r") as config_file:
    config = json.load(config_file)

config['modelName'] = modelName
config['training_config']['num_classes'] = 80

In [None]:
model = ModelBuilder(config = config)
#model.load_weights("logs/_epoch190_mAP0.228").expect_partial()

In [None]:
_MAX_CLASSES_PER_DETECTION = 1
_DETECTION_POSTPROCESS_FUNC = 'TFLite_Detection_PostProcess'


class SSDModule(tf.Module):
    """Inference Module for TFLite-friendly SSD models."""
    def __init__(self, config, detection_model, max_detections=100, use_regular_nms=False):
        """Initialization.

        Args:
          pipeline_config: The original pipeline_pb2.TrainEvalPipelineConfig
          detection_model: The detection model to use for inference.
          max_detections: Max detections desired from the TFLite model.
          use_regular_nms: If True, TFLite model uses the (slower) multi-class NMS.
        """
        self._process_config(config)
        self._model = detection_model
        self._max_detections = max_detections
        self._use_regular_nms = use_regular_nms
        self._Anchors =  AnchorBox(config).get_anchors()

    def _process_config(self, config):
        self._num_classes = config['training_config']['num_classes']
        self._nms_score_threshold=0.3
        self._nms_iou_threshold=0.6
        self._scale_values = {}
        self._scale_values['y_scale']=10.0
        self._scale_values['x_scale']=10.0
        self._scale_values['h_scale']=5.0
        self._scale_values['w_scale']=5.0

    def input_shape(self):
        """Returns shape of TFLite model input."""
        return [1, config["model_config"]["target_size"], config["model_config"]["target_size"], 3]

    def postprocess_implements_signature(self):
        """Returns tf.implements signature for MLIR legalization of TFLite NMS."""
        implements_signature = [
            'name: "%s"' % _DETECTION_POSTPROCESS_FUNC,
            'attr { key: "max_detections" value { i: %d } }' % self._max_detections,
            'attr { key: "max_classes_per_detection" value { i: %d } }' %
            _MAX_CLASSES_PER_DETECTION,
            'attr { key: "use_regular_nms" value { b: %s } }' %
            str(self._use_regular_nms).lower(),
            'attr { key: "nms_score_threshold" value { f: %f } }' %
            self._nms_score_threshold,
            'attr { key: "nms_iou_threshold" value { f: %f } }' %
            self._nms_iou_threshold,
            'attr { key: "y_scale" value { f: %f } }' %
            self._scale_values['y_scale'],
            'attr { key: "x_scale" value { f: %f } }' %
            self._scale_values['x_scale'],
            'attr { key: "h_scale" value { f: %f } }' %
            self._scale_values['h_scale'],
            'attr { key: "w_scale" value { f: %f } }' %
            self._scale_values['w_scale'],
            'attr { key: "num_classes" value { i: %d } }' % self._num_classes
        ]
        implements_signature = ' '.join(implements_signature)
        return implements_signature

    def _get_postprocess_fn(self, num_anchors, num_classes):
        # There is no TF equivalent for TFLite's custom post-processing op.
        # So we add an 'empty' composite function here, that is legalized to the
        # custom op with MLIR.
        @tf.function(experimental_implements=self.postprocess_implements_signature())

        def dummy_post_processing(box_encodings, class_predictions, anchors):
            boxes = tf.constant(0.0, dtype=tf.float32, name='boxes')
            scores = tf.constant(0.0, dtype=tf.float32, name='scores')
            classes = tf.constant(0.0, dtype=tf.float32, name='classes')
            num_detections = tf.constant(0.0, dtype=tf.float32, name='num_detections')
            return boxes, classes, scores, num_detections

        return dummy_post_processing

    @tf.function
    def inference_fn(self, image):
        """Encapsulates SSD inference for TFLite conversion.

        NOTE: The Args & Returns sections below indicate the TFLite model signature,
        and not what the TF graph does (since the latter does not include the custom
        NMS op used by TFLite)

        Args:
          image: a float32 tensor of shape [num_anchors, 4] containing the anchor
            boxes

        Returns:
          num_detections: a float32 scalar denoting number of total detections.
          classes: a float32 tensor denoting class ID for each detection.
          scores: a float32 tensor denoting score for each detection.
          boxes: a float32 tensor denoting coordinates of each detected box.
        """
        predicted_tensors = self._model(image)
        class_predictions = tf.sigmoid(predicted_tensors['ClfPred'])
        class_predictions = tf.identity(class_predictions, name='class_predictions')

        box_encodings = tf.identity(predicted_tensors["BoxPred"], name='box_encodings')

        anchors = tf.identity(self._Anchors, name='anchors')

        # tf.function@ seems to reverse order of inputs, so reverse them here.
        return self._get_postprocess_fn(detection_module._Anchors.shape[0], self._num_classes)(box_encodings, class_predictions, anchors)[::-1]

In [None]:
detection_module = SSDModule(config, model)    
concrete_function = detection_module.inference_fn.get_concrete_function(tf.TensorSpec(shape=detection_module.input_shape(), dtype=tf.float32, name='input'))
tf.saved_model.save(detection_module, "logs/tflite_Test", signatures=concrete_function)

In [None]:
#Estimated count of arithmetic ops:325.369 M  ops, equivalently 162.684 M  MACs MB3 PFH
#Estimated count of arithmetic ops:494.661 M  ops, equivalently 247.331 M  MACs MB3 FPN

#Estimated count of arithmetic ops: 77.080 G  ops, equivalently 38.540 G  MACs

#Estimated count of arithmetic ops:905.699 M  ops, equivalently 452.849 M  MACs MBDet PFH
#Estimated count of arithmetic ops: 1.121 G  ops, equivalently 0.560 G  MACs MBDet FPN

In [None]:
asdasd

In [None]:
converter = tf.lite.TFLiteConverter.from_saved_model("logs/tflite_Test", signature_keys=['serving_default'])
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS
]

#converter.target_spec.supported_types = [tf.float32]

tflite_quant_model = converter.convert()
with tf.io.gfile.GFile('converted_model.tflite', 'wb') as f:
    f.write(tflite_quant_model)

In [None]:
asdasd

In [None]:
import tensorflow_datasets as  tfds

def representative_dataset_gen():
    [test_dataset], dataset_info = tfds.load(name="coco/2017", split=["validation"], with_info=True)
    HEIGHT, WIDTH = 320, 320
    for sample in test_dataset.take(100):
        img = sample['image']
        resized_img = tf.image.resize(img, (HEIGHT, WIDTH))
        resized_img = tf.cast(resized_img /127.5 -1.0, tf.float32)
        resized_img = tf.expand_dims(resized_img, 0)
        yield [resized_img]


converter = tf.lite.TFLiteConverter.from_saved_model("logs/tflite_Test", signature_keys=['serving_default'])


converter.allow_custom_ops = True
converter.representative_dataset = representative_dataset_gen
converter.optimizations = [tf.lite.Optimize.DEFAULT]
'''
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS_INT8,
    tf.lite.OpsSet.SELECT_TF_OPS,
    tf.lite.OpsSet.TFLITE_BUILTINS
]
converter.target_spec.supported_types = [tf.int8]

'''

converter.inference_input_type = tf.uint8
converter.quantized_input_stats = {"normalized_input_image_tensor": (128, 128)}
#converter.inference_output_type = tf.uint8
tflite_model = converter.convert()

#with open('converted_model.tflite', 'wb') as f:
with tf.io.gfile.GFile('converted_model.tflite', 'wb') as f:
    f.write(tflite_model)