In [None]:
# ==================== #
# BlazeFace tensorflow #
# - train and test     #
# ==================== #

# inspired by:
# https://github.com/FurkanOM/tf-blazeface.git
# https://github.com/PureHing/face-mask-detection-tf2
# https://github.com/ibaiGorordo/BlazeFace-TFLite-Inference

# BlazeFace paper:
# https://arxiv.org/pdf/1907.05047.pdf

# Val_conf_loss was always 0 for fixed anchor size
# (which Google uses for their MediaPipe BlazeFace model):
# iou_map didn't exceed the threshold, so the calculation of the
# 'actual_labels' (delta between ground_truth and anchors)
# came out with an empty array.
# Reduced iou_threshold from 0.5 to 0.3 for fixed anchor size

# Note about coordinate systems:
# tensorflow bounding boxes use [y_min, x_min, y_max, x_max]
# therefore the following here use same [y_min, x_min, y_max, x_max]
# - bboxes
# - min_max
# the following are always [x, y] order:
# - landmarks
# - anchors (prior_boxes)
# detection delta is configurable via config['detect_coord_order']
# - Training:  get_deltas_from_bboxes_and_landmarks
# - Inference: get_bboxes_and_landmarks_from_deltas

# started adding signatures for model export, but not finished yet

import tensorflow as tf

assert tf.__version__.startswith("2")
import tensorflow_datasets as tfds
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

import math
import os
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from PIL import ImageDraw

print("TF version:", tf.__version__)

# Limit tensorflow log verbosity
logger = tf.get_logger()
logger.setLevel("ERROR")
from absl import logging

logging.set_verbosity(logging.ERROR)

In [None]:
# ============= #
# Configuration #
# ============= #

# Feature switches for the notebook. They are only defined here;
# presumably later cells read them to decide whether to plot the model,
# restore weights and export the various model formats -- verify there.
PLOT_MODEL = False
LOAD_SAVED_WEIGHTS = True
EXPORT_SAVED_MODEL = True
EXPORT_TFLITE_MODEL = True
EXPORT_QUANTIZED_MODEL = True

# Absolute path of the current working directory (abspath of "").
ROOT_DIR = os.path.abspath("")

# Single source of settings for anchor generation, delta encoding/decoding
# and (presumably) the training / test cells further down.
config = {
    # Name used for the model sub-directory and as the file-name stem of
    # every artefact path built below.
    "model_spec_name": "bf3",
    "output_dir": os.path.join(ROOT_DIR, "trained_models"),
    "tfds_dataset_name": "the300w_lp",
    "data_dir": "~/tensorflow_datasets",
    "data_train_split_percentage": 80,
    #
    # network settings
    "input_size": (128, 128),  # (height, width)
    "batch_size": 32,
    "detections_per_layer": [6, 2],
    "total_landmarks": 6,
    "neg_pos_ratio": 3,
    "loc_loss_alpha": 1,
    # Lowered from 0.5 for fixed anchor size (see the notes at the top).
    "iou_threshold": 0.3,  # 0.5,
    # 'variances': [0.1, 0.1, 0.2, 0.2], # original
    "variances": [1.0, 1.0, 1.0, 1.0],  # MediaPipe
    # Order of the four bbox deltas: "xy" -> [x, y, w, h],
    # anything else -> [y, x, h, w] (see the delta helper functions).
    # 'detect_coord_order':   'yx', # original
    "detect_coord_order": "xy",  # MediaPipe
    # "absolute" scales deltas by the input image size, any other value
    # keeps them normalised (see the delta helper functions).
    # 'delta_normalised_or_absolute': 'normalised', # original
    "delta_normalised_or_absolute": "absolute",  # MediaPipe
    #
    #     anchor settings
    "clip": False,
    "min_scale": 0.1484375,
    "max_scale": 0.75,
    # Explicit feature-map sizes per layer; gen_anchors derives the sizes
    # from input_size / stride when it is given empty lists.
    "feature_map_width": [],
    "feature_map_height": [],
    "strides": [8, 16, 16, 16],
    "aspect_ratios": [1.0],
    # Anchor centre offset, in units of one feature-map cell.
    "offset_x": 0.5,
    "offset_y": 0.5,
    "reduce_boxes_in_lowest_layer": False,
    "interpolated_scale_aspect_ratio": 1.0,
    "fixed_anchor_size": True,  # True for BlazeFace
    #
    # training settings
    # 'epochs':               150, # 3 # 20 #150
    # 'limited_steps':        None, # None, or optionally shorten (eg. 50) for dev
    #
    # test configuration
    "num_test_images": 10,
}

# Artefact locations: <output_dir>/<model_spec_name>/<model_spec_name>.<ext>
saved_model_path = os.path.join(config["output_dir"],
                                config["model_spec_name"])
model_plot_file = os.path.join(saved_model_path,
                               config["model_spec_name"] + ".png")
weights_file = os.path.join(saved_model_path,
                            config["model_spec_name"] + ".h5")
tflite_file = os.path.join(saved_model_path,
                           config["model_spec_name"] + ".tflite")
tflite_quant_file = os.path.join(saved_model_path,
                                 config["model_spec_name"] + "_quant.tflite")

# if not os.path.exists(config['output_dir']):
#     os.mkdir(config['output_dir'])
# if not os.path.exists(saved_model_path):
#     os.mkdir(saved_model_path)

# log_path = "logs/{}/{}".format(
#      config['model_spec_name'], datetime.now().strftime("%Y%m%d-%H%M%S"))
# log_path_simple = "logs/{}".format(config['model_spec_name'])
# print('created log file:', log_path_simple)
# w = tf.summary.create_file_writer(log_path_simple)

In [None]:
# ================= #
# Anchor generation #
# ================= #


class SsdAnchorsCalculatorOptions:
    """Plain value holder for the SSD anchor generation settings.

    Every constructor argument is stored under an attribute of the same
    name. In addition the lengths of the list arguments are cached as
    ``feature_map_width_size``, ``feature_map_height_size``,
    ``strides_size`` and ``aspect_ratios_size``.
    """

    def __init__(
        self,
        input_size_width,
        input_size_height,
        min_scale,
        max_scale,
        num_layers,
        feature_map_width,
        feature_map_height,
        strides,
        aspect_ratios,
        anchor_offset_x=0.5,
        anchor_offset_y=0.5,
        reduce_boxes_in_lowest_layer=False,
        interpolated_scale_aspect_ratio=1.0,
        fixed_anchor_size=False,
    ):
        # Input image size.
        self.input_size_height = input_size_height
        self.input_size_width = input_size_width

        # Smallest / largest anchor scale across the feature maps.
        self.max_scale = max_scale
        self.min_scale = min_scale

        # Anchor centre offset, expressed in units of the layer stride
        # (0.5 places the centre in the middle of a cell).
        self.anchor_offset_y = anchor_offset_y
        self.anchor_offset_x = anchor_offset_x

        # Number of output feature maps that receive anchors.
        self.num_layers = num_layers

        # Explicit feature-map sizes; either these or the strides
        # should be provided.
        self.feature_map_height = feature_map_height
        self.feature_map_width = feature_map_width
        self.feature_map_height_size = len(feature_map_height)
        self.feature_map_width_size = len(feature_map_width)

        # Stride of every output feature map.
        self.strides = strides
        self.strides_size = len(strides)

        # Aspect ratios used to generate anchors.
        self.aspect_ratios = aspect_ratios
        self.aspect_ratios_size = len(aspect_ratios)

        # Use the fixed three boxes per location in the lowest layer.
        self.reduce_boxes_in_lowest_layer = reduce_boxes_in_lowest_layer

        # Aspect ratio of one extra anchor whose scale is interpolated
        # between this layer's scale and the next layer's scale (1.0 for
        # the last layer). The extra anchor is skipped when this is 0.
        self.interpolated_scale_aspect_ratio = interpolated_scale_aspect_ratio

        # Give every anchor a fixed width and height of 1.0, for models
        # whose predicted sizes are expressed in pixels.
        self.fixed_anchor_size = fixed_anchor_size

    def to_string(self):
        """Return all options as newline separated ``name: value`` lines."""
        reported_fields = (
            "input_size_width",
            "input_size_height",
            "min_scale",
            "max_scale",
            "anchor_offset_x",
            "anchor_offset_y",
            "num_layers",
            "feature_map_width",
            "feature_map_height",
            "strides",
            "aspect_ratios",
            "reduce_boxes_in_lowest_layer",
            "interpolated_scale_aspect_ratio",
            "fixed_anchor_size",
        )
        return "\n".join(
            f"{field}: {getattr(self, field)}" for field in reported_fields
        )


def _calculate_scale(min_scale, max_scale, stride_index, num_strides):
    """Linearly interpolate the anchor scale for one layer.

    With a single layer there is nothing to interpolate over, so the
    midpoint of the two scales is used (this also avoids dividing by zero).
    """
    if num_strides == 1:
        return (min_scale + max_scale) * 0.5
    return min_scale + (max_scale - min_scale) * 1.0 * stride_index / (
        num_strides - 1.0
    )


def gen_anchors(options):
    """Generate SSD anchors (prior boxes) for every feature-map cell.
    Consecutive layers that share the same stride are merged: their
    anchors are emitted together for each cell of that feature map.
    inputs:
        options = SsdAnchorsCalculatorOptions (or any object exposing the
        same attributes)

    outputs:
        anchors = list with one entry per anchor, each entry being a
            single-row list [[center_x, center_y, width, height]] in
            normalized image coordinates, so that concatenating the
            entries along axis 0 gives a (total_anchors, 4) array.
            An empty list is returned (after printing a message) when the
            number of strides differs from options.num_layers.
    """
    anchors = []
    # Verify the options.
    if options.strides_size != options.num_layers:
        print("strides_size and num_layers must be equal.")
        return []

    num_strides = options.strides_size
    layer_id = 0
    while layer_id < num_strides:
        aspect_ratios = []
        scales = []

        # For same strides, we merge the anchors in the same order.
        last_same_stride_layer = layer_id
        while (
            last_same_stride_layer < num_strides
            and options.strides[last_same_stride_layer]
            == options.strides[layer_id]
        ):
            scale = _calculate_scale(
                options.min_scale,
                options.max_scale,
                last_same_stride_layer,
                num_strides,
            )
            if (last_same_stride_layer == 0
                    and options.reduce_boxes_in_lowest_layer):
                # For first layer, it can be specified to use predefined
                # anchors.
                aspect_ratios.extend([1.0, 2.0, 0.5])
                scales.extend([0.1, scale, scale])
            else:
                for aspect_ratio in options.aspect_ratios:
                    aspect_ratios.append(aspect_ratio)
                    scales.append(scale)

                if options.interpolated_scale_aspect_ratio > 0.0:
                    # Extra anchor at the geometric mean of this layer's
                    # scale and the next one's (1.0 after the last layer).
                    if last_same_stride_layer == num_strides - 1:
                        scale_next = 1.0
                    else:
                        scale_next = _calculate_scale(
                            options.min_scale,
                            options.max_scale,
                            last_same_stride_layer + 1,
                            num_strides,
                        )
                    scales.append(math.sqrt(scale * scale_next))
                    aspect_ratios.append(
                        options.interpolated_scale_aspect_ratio)
            last_same_stride_layer += 1

        # (width, height) of every anchor placed on each cell of this map.
        anchor_sizes = []
        for anchor_scale, aspect_ratio in zip(scales, aspect_ratios):
            ratio_sqrt = math.sqrt(aspect_ratio)
            if options.fixed_anchor_size:
                anchor_sizes.append((1.0, 1.0))
            else:
                anchor_sizes.append(
                    (anchor_scale * ratio_sqrt, anchor_scale / ratio_sqrt)
                )

        if options.feature_map_height_size > 0:
            feature_map_height = options.feature_map_height[layer_id]
            feature_map_width = options.feature_map_width[layer_id]
        else:
            stride = options.strides[layer_id]
            feature_map_height = math.ceil(
                1.0 * options.input_size_height / stride)
            feature_map_width = math.ceil(
                1.0 * options.input_size_width / stride)

        for y in range(feature_map_height):
            y_center = (y + options.anchor_offset_y) * 1.0 / feature_map_height
            for x in range(feature_map_width):
                x_center = (
                    (x + options.anchor_offset_x) * 1.0 / feature_map_width
                )
                for w, h in anchor_sizes:
                    # Order is [x, y, w, h]: the delta helpers read width
                    # at index 2 and height at index 3.
                    anchors.append([[x_center, y_center, w, h]])

        layer_id = last_same_stride_layer
    return anchors


# Configuration options for SSD anchor generation.
# num_layers is derived from the configured strides (gen_anchors rejects
# any mismatch between the two) and the feature-map sizes come from the
# config as well, so editing `config` is enough to change the anchor layout.
ssd_anchors_calculator_options = SsdAnchorsCalculatorOptions(
    input_size_width=config["input_size"][1],
    input_size_height=config["input_size"][0],
    min_scale=config["min_scale"],
    max_scale=config["max_scale"],
    anchor_offset_x=config["offset_x"],
    anchor_offset_y=config["offset_y"],
    num_layers=len(config["strides"]),
    feature_map_width=config["feature_map_width"],
    feature_map_height=config["feature_map_height"],
    strides=config["strides"],
    aspect_ratios=config["aspect_ratios"],
    reduce_boxes_in_lowest_layer=config["reduce_boxes_in_lowest_layer"],
    interpolated_scale_aspect_ratio=config["interpolated_scale_aspect_ratio"],
    fixed_anchor_size=config["fixed_anchor_size"],
)

print("Generating priors boxes (anchors)")
anchor_rows = gen_anchors(ssd_anchors_calculator_options)
if not anchor_rows:
    # gen_anchors signals invalid options with an empty list; fail here
    # with a clear message instead of inside tf.concat.
    raise ValueError(
        "No anchors were generated; check the anchor settings in config:\n"
        + ssd_anchors_calculator_options.to_string()
    )
# Each entry is a single [[center_x, center_y, width, height]] row, so
# concatenating along axis 0 gives a (total_anchors, 4) tensor.
# NOTE(review): values are always clipped to [0, 1] here regardless of
# config["clip"] -- confirm whether that is intended.
prior_boxes = tf.clip_by_value(tf.concat(anchor_rows, axis=0), 0, 1)

print(f"Prior boxes number:{len(prior_boxes)}")

In [None]:
# =================== #
# Landmarks functions #
# =================== #


def renormalize_landmarks_with_min_max(landmarks, min_max):
    """Re-expressing landmarks relative to a new boundary window.
    r = (value - min) / (max - min), clipped to [0, 1]
    inputs:
        landmarks = (total_count, total_landmarks, [x, y])
        min_max = ([y_min, x_min, y_max, x_max])

    outputs:
        renormalized_landmarks = same shape as landmarks, [x, y] order
    """
    y_low, x_low, y_high, x_high = tf.split(min_max, 4)
    # Landmarks are [x, y] while min_max is y-first, hence the reordering.
    window_origin = tf.concat([x_low, y_low], -1)
    window_extent = tf.concat([x_high - x_low, y_high - y_low], -1)
    relative = (landmarks - window_origin) / window_extent
    return tf.clip_by_value(relative, 0, 1)


def normalize_landmarks(landmarks, height, width):
    """Scaling pixel landmarks down by the image size.
    inputs:
        landmarks = (M, N, [x, y])
        height = image height
        width = image width

    outputs:
        normalized_landmarks = (M, N, [x, y])
            x divided by width, y divided by height
    """
    image_size_xy = tf.cast([width, height], tf.float32)
    return landmarks / image_size_xy


def denormalize_landmarks(landmarks, height, width):
    """Scaling normalized landmarks up to rounded pixel positions.
    inputs:
        landmarks = (M, N, [x, y])
            in normalized form [0, 1]
        height = image height
        width = image width

    outputs:
        denormalized_landmarks = (M, N, [x, y])
            x multiplied by width, y multiplied by height, then rounded
    """
    image_size_xy = tf.cast([width, height], tf.float32)
    pixel_landmarks = landmarks * image_size_xy
    return tf.round(pixel_landmarks)

In [None]:
# ====================== #
# Bounding box functions #
# ====================== #


def get_weighted_boxes_and_landmarks(scores, bboxes_and_landmarks, mask):
    """Score-weighted average of the rows selected by the mask.
    inputs:
        scores = (total_bboxes, [probability])
        bboxes_and_landmarks = (total_bboxes, [y1, x1, y2, x2, landmark_x0,
        landmark_y0, ..., landmark_xN, landmark_yN])
        mask = (total_bboxes,)
            boolean, True for the rows taking part in the average

    outputs:
        weighted_bbox_and_landmark = (1, [y1, x1, y2, x2, landmark_x0,
        landmark_y0, ..., landmark_xN, landmark_yN])
    """
    masked_scores = scores[mask]
    masked_rows = bboxes_and_landmarks[mask]

    score_total = tf.reduce_sum(masked_scores, 0)
    # A zero score total would divide by zero; use 1 as divisor instead.
    divisor = tf.where(tf.equal(score_total, 0.0), 1.0, score_total)

    weighted_total = tf.reduce_sum(masked_rows * masked_scores, 0)
    return tf.expand_dims(weighted_total / divisor, 0)


def weighted_suppression_body(
    counter, iou_threshold, scores, bboxes_and_landmarks,
    weighted_suppressed_data
):
    """One iteration of the weighted mean suppression while loop.
    The first remaining row is used as reference: every row whose box
    overlaps it by more than iou_threshold is merged into one
    score-weighted row, which is appended to the result, and the merged
    rows are dropped from the remaining candidates.
    inputs:
        counter = while body counter
        iou_threshold = threshold value for overlapping bounding boxes
        scores = (total_bboxes, [probability])
        bboxes_and_landmarks = (total_bboxes, [y1, x1, y2, x2, landmark_x0,
        landmark_y0, ..., landmark_xN, landmark_yN])
        weighted_suppressed_data = (M, [y1, x1, y2, x2, landmark_x0,
        landmark_y0, ..., landmark_xN, landmark_yN])

    outputs:
        counter = incremented while body counter
        iou_threshold = unchanged threshold value
        scores = (total_bboxes - N, [probability])
        bboxes_and_landmarks = (total_bboxes - N, [y1, x1, y2, x2,
        landmark_x0, landmark_y0, ..., landmark_xN, landmark_yN])
        weighted_suppressed_data = (M + 1, [y1, x1, y2, x2, landmark_x0,
        landmark_y0, ..., landmark_xN, landmark_yN])
    """
    next_counter = tf.add(counter, 1)

    reference_box = bboxes_and_landmarks[0, 0:4]
    candidate_boxes = bboxes_and_landmarks[..., 0:4]
    overlaps = generate_iou_map(
        reference_box, candidate_boxes, transpose_perm=[1, 0]
    )
    is_merged = tf.reshape(tf.greater(overlaps, iou_threshold), (-1,))
    is_remaining = tf.logical_not(is_merged)

    merged_row = get_weighted_boxes_and_landmarks(
        scores, bboxes_and_landmarks, is_merged
    )
    return (
        next_counter,
        iou_threshold,
        scores[is_remaining],
        bboxes_and_landmarks[is_remaining],
        tf.concat([weighted_suppressed_data, merged_row], axis=0),
    )


def weighted_suppression(
    scores,
    bboxes_and_landmarks,
    max_total_size=50,
    score_threshold=0.75,
    iou_threshold=0.3,
):
    """Blazeface weighted mean suppression algorithm.
    inputs:
        scores = (total_bboxes, [score])
            compared against score_threshold as given, then passed
            through a sigmoid
        bboxes_and_landmarks = (total_bboxes, [y1, x1, y2, x2, landmark_x0,
        landmark_y0, ..., landmark_xN, landmark_yN])
        max_total_size = maximum returned bounding boxes and landmarks
        score_threshold = threshold value for bounding boxes and landmarks
                          selection
        iou_threshold = threshold value for overlapping bounding boxes

    outputs:
        weighted_bboxes_and_landmarks = (max_total_size, [y1, x1, y2, x2,
        landmark_x0, landmark_y0, ..., landmark_xN, landmark_yN])
            merged rows followed by zero padding
        sorted_scores = (selected_bboxes, [probability])
            sigmoid scores of the rows that passed the threshold,
            in descending order
    """
    # The threshold is applied to the incoming scores, before the sigmoid.
    passes_threshold = tf.squeeze(tf.greater(scores, score_threshold), -1)
    selected_probs = tf.math.sigmoid(scores[passes_threshold])
    selected_rows = bboxes_and_landmarks[passes_threshold]

    descending = tf.argsort(selected_probs, axis=0, direction="DESCENDING")
    sorted_scores = tf.gather_nd(selected_probs, descending)
    sorted_rows = tf.gather_nd(selected_rows, descending)

    def keep_looping(counter, iou_threshold, remaining_scores, data, weighted):
        # Continue while there is output budget and candidates remain.
        return tf.logical_and(
            tf.less(counter, max_total_size),
            tf.greater(tf.shape(remaining_scores)[0], 0)
        )

    # Placeholder with the row layout of the data; sliced off again below.
    placeholder = tf.zeros(tf.shape(selected_rows[0:1]), dtype=tf.float32)
    loop_vars = [
        tf.constant(0, tf.int32),
        iou_threshold,
        sorted_scores,
        sorted_rows,
        placeholder,
    ]
    *_, merged_rows = tf.while_loop(
        keep_looping, weighted_suppression_body, loop_vars
    )

    merged_rows = merged_rows[1:]
    missing_rows = max_total_size - merged_rows.shape[0]
    padded_rows = tf.pad(merged_rows, ((0, missing_rows), (0, 0)))
    return padded_rows, sorted_scores


def non_max_suppression(pred_bboxes, pred_labels, **kwargs):
    """Thin wrapper around tf.image.combined_non_max_suppression.
    See the tensorflow documentation for the details:
    https://www.tensorflow.org/api_docs/python/tf/image/combined_non_max_suppression
    inputs:
        pred_bboxes = (batch_size, total_bboxes, total_labels,
                       [y1, x1, y2, x2])
            total_labels should be 1 for binary operations like in rpn
        pred_labels = (batch_size, total_bboxes, total_labels)
        **kwargs = forwarded unchanged to the tensorflow function

    outputs:
        nmsed_boxes = (batch_size, max_detections, [y1, x1, y2, x2])
        nmsed_scores = (batch_size, max_detections)
        nmsed_classes = (batch_size, max_detections)
        valid_detections = (batch_size)
            Only the top valid_detections[i] entries in nmsed_boxes[i],
            nmsed_scores[i] and nmsed_classes[i] are valid.
            The rest of the entries are zero paddings.
    """
    nms_result = tf.image.combined_non_max_suppression(
        pred_bboxes, pred_labels, **kwargs
    )
    return nms_result


def generate_iou_map(bboxes, gt_boxes, transpose_perm=(0, 2, 1)):
    """Calculating intersection over union values for each ground truth boxes
    in a dynamic manner.
    It is supported from 1d to 3d dimensions for bounding boxes.
    Even if bboxes have a different rank from gt_boxes it should work.
    inputs:
        bboxes = (dynamic_dimension, [y1, x1, y2, x2])
        gt_boxes = (dynamic_dimension, [y1, x1, y2, x2])
        transpose_perm = (transpose_perm_order)
            permutation applied to the gt coordinate columns so that they
            broadcast against the bbox columns
            for 3d gt_boxes => (0, 2, 1), for 2d gt_boxes => (1, 0)

    outputs:
        iou_map = (dynamic_dimension, total_gt_boxes)
            same rank with the gt_boxes
            pairs whose union area is zero get an IoU of 0 instead of NaN
    """
    gt_rank = tf.rank(gt_boxes)
    gt_expand_axis = gt_rank - 2

    bbox_y1, bbox_x1, bbox_y2, bbox_x2 = tf.split(bboxes, 4, axis=-1)
    gt_y1, gt_x1, gt_y2, gt_x2 = tf.split(gt_boxes, 4, axis=-1)

    # Calculate bbox and ground truth boxes areas
    gt_area = tf.squeeze((gt_y2 - gt_y1) * (gt_x2 - gt_x1), axis=-1)
    bbox_area = tf.squeeze((bbox_y2 - bbox_y1) * (bbox_x2 - bbox_x1), axis=-1)

    # Corners of the intersection rectangle for every (bbox, gt) pair
    x_top = tf.maximum(bbox_x1, tf.transpose(gt_x1, transpose_perm))
    y_top = tf.maximum(bbox_y1, tf.transpose(gt_y1, transpose_perm))
    x_bottom = tf.minimum(bbox_x2, tf.transpose(gt_x2, transpose_perm))
    y_bottom = tf.minimum(bbox_y2, tf.transpose(gt_y2, transpose_perm))

    # Negative extents mean the boxes do not overlap on that axis
    w_max = tf.maximum(x_bottom - x_top, 0)
    h_max = tf.maximum(y_bottom - y_top, 0)

    # Calculate intersection area
    intersection_area = w_max * h_max
    # Calculate union area
    union_area = (
        tf.expand_dims(bbox_area, -1)
        + tf.expand_dims(gt_area, gt_expand_axis)
        - intersection_area
    )

    # Intersection over Union; two zero-area boxes have a zero union, and
    # a plain division would put NaN into the map for those pairs.
    return tf.math.divide_no_nan(intersection_area, union_area)


def get_bboxes_and_landmarks_from_deltas(prior_boxes, deltas):
    """Calculating bounding boxes and landmarks for given delta values.
    Inverse of get_deltas_from_bboxes_and_landmarks.
    inputs:
        prior_boxes = (total_bboxes, [center_x, center_y, width, height])
        deltas = (batch_size, total_bboxes, [4 bbox deltas,
        delta_landmark_x0, delta_landmark_y0, ..., delta_landmark_xN,
        delta_landmark_yN])
            bbox delta order follows config['detect_coord_order']:
            'xy' => [delta_x, delta_y, delta_w, delta_h]
            otherwise => [delta_y, delta_x, delta_h, delta_w]
            when config['delta_normalised_or_absolute'] is 'absolute' the
            deltas are scaled by the input image size (config['input_size'])

    outputs:
        bboxes_and_landmarks = (batch_size, total_bboxes, [y1, x1, y2, x2,
        landmark_x0, landmark_y0, ..., landmark_xN, landmark_yN])
    """

    if config["detect_coord_order"] == "xy":
        xi, yi, wi, hi = 0, 1, 2, 3
    else:
        yi, xi, hi, wi = 0, 1, 2, 3
    pxi, pyi, pwi, phi = 0, 1, 2, 3

    delta_x = deltas[..., xi]
    delta_y = deltas[..., yi]
    delta_w = deltas[..., wi]
    delta_h = deltas[..., hi]

    anchor_x = prior_boxes[..., pxi]
    anchor_y = prior_boxes[..., pyi]
    anchor_w = prior_boxes[..., pwi]
    anchor_h = prior_boxes[..., phi]

    image_height, image_width = tf.cast(config["input_size"], tf.float32)
    absolute = config["delta_normalised_or_absolute"] == "absolute"

    if absolute:
        # Work in image-size scaled units, then divide back to [0, 1].
        bbox_ctr_x = (delta_x * anchor_w + anchor_x * image_width) / image_width
        bbox_ctr_y = (
            delta_y * anchor_h + anchor_y * image_height
        ) / image_height
        bbox_width = delta_w * anchor_w / image_width
        bbox_height = delta_h * anchor_h / image_height
    else:  # normalised
        bbox_ctr_x = delta_x * anchor_w + anchor_x
        bbox_ctr_y = delta_y * anchor_h + anchor_y
        bbox_width = delta_w * anchor_w
        bbox_height = delta_h * anchor_h

    y1 = bbox_ctr_y - (0.5 * bbox_height)
    x1 = bbox_ctr_x - (0.5 * bbox_width)
    y2 = bbox_height + y1
    x2 = bbox_width + x1

    # Anchor centre / size repeated once per landmark, interleaved as
    # [x, y, x, y, ...] to line up with the landmark deltas.
    total_landmarks = tf.shape(deltas[..., 4:])[-1] // 2
    xy_pairs = tf.tile(prior_boxes[..., 0:2], (1, total_landmarks))
    wh_pairs = tf.tile(prior_boxes[..., 2:4], (1, total_landmarks))
    if absolute:
        # The image size has to be interleaved as [w, h, w, h, ...] too.
        # Splitting xy_pairs into halves would scale y values by the width
        # and x values by the height whenever the input is not square.
        image_wh_pairs = tf.tile([image_width, image_height],
                                 [total_landmarks])
        landmarks = (deltas[..., 4:] * wh_pairs) + xy_pairs * image_wh_pairs
        landmarks /= image_wh_pairs
    else:  # normalised
        landmarks = (deltas[..., 4:] * wh_pairs) + xy_pairs

    bbl = tf.stack([y1, x1, y2, x2], axis=-1)
    return tf.concat([bbl, landmarks], -1)


def get_deltas_from_bboxes_and_landmarks(prior_boxes, bboxes_and_landmarks):
    """Encoding ground truth boxes and landmarks as deltas relative to the
    prior boxes.
    inputs:
        prior_boxes = (total_bboxes, [center_x, center_y, width, height])
        bboxes_and_landmarks = (batch_size, total_bboxes, [y1, x1, y2, x2,
        landmark_x0, landmark_y0, ..., landmark_xN, landmark_yN])

    outputs:
        deltas = (batch_size, total_bboxes, [4 bbox deltas,
        delta_landmark_x0, delta_landmark_y0, ..., delta_landmark_xN,
        delta_landmark_yN])
            bbox delta order follows config['detect_coord_order']:
            'xy' => [delta_x, delta_y, delta_w, delta_h]
            otherwise => [delta_y, delta_x, delta_h, delta_w]
            when config['delta_normalised_or_absolute'] is 'absolute' all
            deltas are multiplied by the input image size
    """
    use_absolute = config["delta_normalised_or_absolute"] == "absolute"
    image_height, image_width = tf.cast(config["input_size"], tf.float32)

    # Ground truth box as size + centre.
    top = bboxes_and_landmarks[..., 0]
    left = bboxes_and_landmarks[..., 1]
    gt_width = bboxes_and_landmarks[..., 3] - left
    gt_height = bboxes_and_landmarks[..., 2] - top
    gt_ctr_x = left + 0.5 * gt_width
    gt_ctr_y = top + 0.5 * gt_height

    anchor_w = prior_boxes[..., 2]
    anchor_h = prior_boxes[..., 3]

    delta_x = (gt_ctr_x - prior_boxes[..., 0]) / anchor_w
    delta_y = (gt_ctr_y - prior_boxes[..., 1]) / anchor_h
    delta_w = gt_width / anchor_w
    delta_h = gt_height / anchor_h
    if use_absolute:
        # Scale the normalised deltas up by the image size.
        delta_x = delta_x * image_width
        delta_y = delta_y * image_height
        delta_w = delta_w * image_width
        delta_h = delta_h * image_height

    # Anchor centre / size repeated once per landmark: [x, y, x, y, ...].
    landmark_values = bboxes_and_landmarks[..., 4:]
    total_landmarks = tf.shape(landmark_values)[-1] // 2
    anchor_centers = tf.tile(prior_boxes[..., 0:2], (1, total_landmarks))
    anchor_sizes = tf.tile(prior_boxes[..., 2:4], (1, total_landmarks))
    landmark_deltas = (landmark_values - anchor_centers) / anchor_sizes
    if use_absolute:
        landmark_deltas *= tf.tile([image_width, image_height],
                                   [total_landmarks])

    if config["detect_coord_order"] == "xy":
        ordered = [delta_x, delta_y, delta_w, delta_h]
    else:
        ordered = [delta_y, delta_x, delta_h, delta_w]
    return tf.concat([tf.stack(ordered, -1), landmark_deltas], -1)


def convert_xywh_to_bboxes(xywh):
    """Turning centre/size boxes into corner boxes clipped to [0, 1].
    inputs:
        xywh = (M, N, [center_x, center_y, width, height])

    outputs:
        bboxes = (M, N, [y1, x1, y2, x2])
    """
    center_x = xywh[..., 0]
    center_y = xywh[..., 1]
    box_width = xywh[..., 2]
    box_height = xywh[..., 3]

    top = center_y - (0.5 * box_height)
    left = center_x - (0.5 * box_width)
    bottom = box_height + top
    right = box_width + left

    corners = tf.stack([top, left, bottom, right], axis=-1)
    return tf.clip_by_value(corners, 0, 1)


def renormalize_bboxes_with_min_max(bboxes, min_max):
    """Re-expressing bounding boxes relative to a new boundary window.
    r = (value - min) / (max - min), clipped to [0, 1]
    inputs:
        bboxes = (total_bboxes, [y1, x1, y2, x2])
        min_max = ([y_min, x_min, y_max, x_max])

    outputs:
        renormalized_bboxes = (total_bboxes, [y1, x1, y2, x2])
    """
    y_low, x_low, y_high, x_high = tf.split(min_max, 4)
    window_height = y_high - y_low
    window_width = x_high - x_low

    # Both corners share the same origin and extent, in [y, x, y, x] order.
    origin = tf.concat([y_low, x_low, y_low, x_low], -1)
    extent = tf.concat(
        [window_height, window_width, window_height, window_width], -1
    )
    return tf.clip_by_value((bboxes - origin) / extent, 0, 1)


def normalize_bboxes(bboxes, height, width):
    """Normalizing bounding boxes.
    inputs:
        bboxes = (M, N, [y1, x1, y2, x2])
        height = image height
        width = image width

    outputs:
        normalized_bboxes = (M, N, [y1, x1, y2, x2])
            in normalized form [0, 1]
    """
    y1i, x1i, y2i, x2i = 0, 1, 2, 3
    y1 = bboxes[..., y1i] / height
    x1 = bboxes[..., x1i] / width
    y2 = bboxes[..., y2i] / height
    x2 = bboxes[..., x2i] / width

    # No rounding here: the result is fractional by design, rounding it
    # would collapse every coordinate to 0 or 1.
    return tf.stack([y1, x1, y2, x2], axis=-1)


def denormalize_bboxes(bboxes, height, width):
    """Scaling normalized bounding boxes up to rounded pixel coordinates.
    inputs:
        bboxes = (M, N, [y1, x1, y2, x2])
            in normalized form [0, 1]
        height = image height
        width = image width

    outputs:
        denormalized_bboxes = (M, N, [y1, x1, y2, x2])
            y values multiplied by height, x values by width, then rounded
    """
    top = bboxes[..., 0] * height
    left = bboxes[..., 1] * width
    bottom = bboxes[..., 2] * height
    right = bboxes[..., 3] * width
    return tf.round(tf.stack([top, left, bottom, right], axis=-1))

In [None]:
# =========================== #
# Data augmentation functions #
# =========================== #


def apply_augmentation(img, gt_boxes, gt_landmarks):
    """Running the augmentation pipeline, each step applied at random.
    The geometric step (patch sampling) comes first, followed by the
    colour steps (brightness, contrast, hue, saturation). Finally the
    image values are clipped to [0, 1].
    inputs:
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
            in normalized form [0, 1]
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
            in normalized form [0, 1]
    outputs:
        modified_img = (final_height, final_width, depth)
        modified_gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
            in normalized form [0, 1]
        modified_gt_landmarks = (ground_truth_object_count, total_landmarks,
        [x, y])
            in normalized form [0, 1]
    """
    pipeline = (
        # Geometric: randomly sample a patch of the image and ground truth
        patch,
        # Colour: randomly change brightness, contrast, hue and saturation
        random_brightness,
        random_contrast,
        random_hue,
        random_saturation,
    )
    for step in pipeline:
        img, gt_boxes, gt_landmarks = randomly_apply_operation(
            step, img, gt_boxes, gt_landmarks
        )

    return tf.clip_by_value(img, 0.0, 1.0), gt_boxes, gt_landmarks


def get_random_bool():
    """Drawing a random boolean.
    outputs:
        0d boolean tensor, True when a uniform random float32 sample
        is greater than 0.5
    """
    sample = tf.random.uniform((), dtype=tf.float32)
    return tf.greater(sample, 0.5)


def randomly_apply_operation(operation, img, gt_boxes, gt_landmarks, *args):
    """Applying the given method or passing the inputs through unchanged,
    decided by get_random_bool().
    inputs:
        operation = callable method
            called as operation(img, gt_boxes, gt_landmarks, *args)
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
        *args = extra positional arguments forwarded to operation
    outputs:
        modified_or_not_img = (final_height, final_width, depth)
        modified_or_not_gt_boxes = (ground_truth_object_count,
                                    [y1, x1, y2, x2])
        modified_or_not_gt_landmarks = (ground_truth_object_count,
        total_landmarks, [x, y])
    """

    def run_operation():
        return operation(img, gt_boxes, gt_landmarks, *args)

    def pass_through():
        return (img, gt_boxes, gt_landmarks)

    return tf.cond(get_random_bool(), run_operation, pass_through)


def random_brightness(img, gt_boxes, gt_landmarks, max_delta=0.12):
    """Changing the brightness of the image by a random amount.
    Boxes and landmarks are returned untouched.
    inputs:
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
        max_delta = forwarded to tf.image.random_brightness
    outputs:
        modified_img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
    """
    adjusted_img = tf.image.random_brightness(img, max_delta)
    return adjusted_img, gt_boxes, gt_landmarks


def random_contrast(img, gt_boxes, gt_landmarks, lower=0.5, upper=1.5):
    """Scale image contrast by a random factor; annotations pass through.
    inputs:
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
        lower, upper = factor bounds handed to tf.image.random_contrast
    outputs:
        modified_img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
    """
    adjusted_img = tf.image.random_contrast(img, lower, upper)
    return adjusted_img, gt_boxes, gt_landmarks


def random_hue(img, gt_boxes, gt_landmarks, max_delta=0.08):
    """Shift image hue by a random amount; annotations pass through.
    inputs:
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
        max_delta = bound handed to tf.image.random_hue
    outputs:
        modified_img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
    """
    adjusted_img = tf.image.random_hue(img, max_delta)
    return adjusted_img, gt_boxes, gt_landmarks


def random_saturation(img, gt_boxes, gt_landmarks, lower=0.5, upper=1.5):
    """Scale image saturation by a random factor; annotations pass through.
    inputs:
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
        lower, upper = factor bounds handed to tf.image.random_saturation
    outputs:
        modified_img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
    """
    adjusted_img = tf.image.random_saturation(img, lower, upper)
    return adjusted_img, gt_boxes, gt_landmarks


# ================== #
# Sample patch start #
# ================== #


def get_random_min_overlap():
    """Pick one of the fixed minimum-overlap candidates uniformly at random.
    outputs:
        min_overlap = 0d float32 tensor, one of 0.1, 0.3, 0.5, 0.7, 0.9
    """
    candidates = tf.constant([0.1, 0.3, 0.5, 0.7, 0.9], dtype=tf.float32)
    candidate_count = tf.shape(candidates)[0]
    # maxval is exclusive for integer sampling, so every index is valid.
    choice = tf.random.uniform(
        (), minval=0, maxval=candidate_count, dtype=tf.int32
    )
    return candidates[choice]


def expand_image(img, gt_boxes, gt_landmarks, height, width):
    """Place the image at a random position on a larger, mean-filled canvas.
    The canvas is between 1x and 4x the original size; boxes and landmarks
    are re-expressed relative to the new canvas by the renormalize helpers.
    inputs:
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
        height = height of the image (float tensor)
        width = width of the image (float tensor)
    outputs:
        modified_img = (final_height, final_width, depth)
        modified_gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
        modified_gt_landmarks = (ground_truth_object_count, total_landmarks,
        [x, y])
    """
    # The random draws stay in this order: ratio, left padding, top padding.
    ratio = tf.random.uniform((), minval=1, maxval=4, dtype=tf.float32)
    final_height = tf.round(height * ratio)
    final_width = tf.round(width * ratio)

    free_width = final_width - width
    pad_left = tf.round(
        tf.random.uniform((), minval=0, maxval=free_width, dtype=tf.float32)
    )
    free_height = final_height - height
    pad_top = tf.round(
        tf.random.uniform((), minval=0, maxval=free_height, dtype=tf.float32)
    )
    # Whatever space is left over goes to the opposite side.
    pad_right = final_width - (width + pad_left)
    pad_bottom = final_height - (height + pad_top)

    # Per-channel mean over the height and width axes.
    channel_mean, _ = tf.nn.moments(img, [0, 1])
    # Pad with a -1 sentinel first, then swap the sentinel for the mean.
    paddings = ((pad_top, pad_bottom), (pad_left, pad_right), (0, 0))
    padded = tf.pad(img, paddings, constant_values=-1)
    expanded_image = tf.where(padded == -1, channel_mean, padded)

    # Canvas borders expressed in the original image's normalized
    # coordinates, ordered [y_min, x_min, y_max, x_max].
    canvas_edges = tf.stack(
        [-pad_top, -pad_left, pad_bottom + height, pad_right + width], -1
    )
    min_max = canvas_edges / [height, width, height, width]

    modified_gt_boxes = renormalize_bboxes_with_min_max(gt_boxes, min_max)
    modified_gt_landmarks = renormalize_landmarks_with_min_max(
        gt_landmarks, min_max
    )
    return expanded_image, modified_gt_boxes, modified_gt_landmarks


def patch(img, gt_boxes, gt_landmarks):
    """Crop a random patch and re-express the annotations relative to it.
    The image is first (randomly) expanded, then a crop window is sampled
    with tf.image.sample_distorted_bounding_box using a random minimum
    object coverage, and the crop is resized back to the original size.
    Objects that fall outside the patch are not dropped from the outputs;
    per the original note their coordinates are set to zero (handled by the
    renormalize helpers defined elsewhere).
    inputs:
        img = (height, width, depth)
        gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
            in normalized form [0, 1]
        gt_landmarks = (ground_truth_object_count, total_landmarks, [x, y])
            in normalized form [0, 1]
    outputs:
        modified_img = (final_height, final_width, depth)
        modified_gt_boxes = (ground_truth_object_count, [y1, x1, y2, x2])
            in normalized form [0, 1]
        modified_gt_landmarks = (ground_truth_object_count, total_landmarks,
        [x, y])
            in normalized form [0, 1]
    """
    # Remember the incoming size; the crop is resized back to it at the end.
    original_shape = tf.cast(tf.shape(img), dtype=tf.float32)
    org_height = original_shape[0]
    org_width = original_shape[1]

    # Randomly expand image and adjust bounding boxes
    img, gt_boxes, gt_landmarks = randomly_apply_operation(
        expand_image, img, gt_boxes, gt_landmarks, org_height, org_width
    )

    # Random lower bound on how much of an object the crop must cover
    min_overlap = get_random_min_overlap()
    begin, size, new_boundaries = tf.image.sample_distorted_bounding_box(
        tf.shape(img),
        bounding_boxes=tf.expand_dims(gt_boxes, 0),
        min_object_covered=min_overlap,
    )
    # new_boundaries has shape [1, 1, 4]; take the single sampled window.
    crop_window = new_boundaries[0, 0]

    cropped = tf.slice(img, begin, size)
    img = tf.image.resize(cropped, (org_height, org_width))

    gt_boxes = renormalize_bboxes_with_min_max(gt_boxes, crop_window)
    gt_landmarks = renormalize_landmarks_with_min_max(gt_landmarks,
                                                      crop_window)
    return img, gt_boxes, gt_landmarks

In [None]:
# ========================== #
# Data preparation functions #
# ========================== #


# reduce total landmarks in data-set to just what is needed for blazeface
def filter_landmarks(landmarks):
    """Collapse the dataset landmarks into the six keypoints used here.
    Each keypoint is the mean of a contiguous index range on the
    second-to-last axis; the result stacks them on that axis in the order
    left eye, right eye, left ear, right ear, nose, mouth.
    inputs:
        landmarks = (..., dataset_landmark_count, [x, y])
            presumably the 68-point face layout (indices up to 67 are read)
    outputs:
        keypoints = (..., 6, [x, y])
    """
    index_ranges = (
        (36, 42),  # left eye
        (42, 48),  # right eye
        (0, 2),    # left ear
        (15, 17),  # right ear
        (27, 36),  # nose
        (48, 68),  # mouth
    )
    keypoints = [
        tf.reduce_mean(landmarks[..., first:last, :], -2)
        for first, last in index_ranges
    ]
    return tf.stack(keypoints, -2)


def generate_bboxes_from_landmarks(landmarks):
    """Build a bounding box around each set of landmarks.
    The box is the landmark extent grown by a small fixed padding on every
    side and clipped to [0, 1].
    inputs:
        landmarks = (..., landmark_count, [x, y])
    outputs:
        gt_boxes = (..., [y1, x1, y2, x2])
    """
    padding = 5e-3
    xs = landmarks[..., 0]
    ys = landmarks[..., 1]

    y1 = tf.reduce_min(ys, -1) - padding
    x1 = tf.reduce_min(xs, -1) - padding
    y2 = tf.reduce_max(ys, -1) + padding
    x2 = tf.reduce_max(xs, -1) + padding

    return tf.clip_by_value(tf.stack([y1, x1, y2, x2], -1), 0, 1)


def preprocessing(image_data, final_height, final_width, augmentation_fn=None):
    """Turn one dataset record into (image, boxes, landmarks) for the model.
    inputs:
        image_data = dict with "image" and "landmarks_2d" entries
        final_height, final_width = size the image is resized to
        augmentation_fn = optional callable (img, gt_boxes, gt_landmarks)
            -> (img, gt_boxes, gt_landmarks), applied after resizing
    outputs:
        img = (final_height, final_width, depth) scaled with (img - 0.5) / 0.5
        gt_boxes = (1, [y1, x1, y2, x2]) derived from all landmarks
        gt_landmarks = (1, 6, [x, y]) reduced by filter_landmarks
    """
    # One face per record: add a leading object axis of size 1.
    all_landmarks = tf.expand_dims(image_data["landmarks_2d"], 0)
    # The box is derived from the full landmark set, before it is reduced.
    gt_boxes = generate_bboxes_from_landmarks(all_landmarks)
    gt_landmarks = filter_landmarks(all_landmarks)

    img = tf.image.convert_image_dtype(image_data["image"], tf.float32)
    img = tf.image.resize(img, (final_height, final_width))
    if augmentation_fn:
        img, gt_boxes, gt_landmarks = augmentation_fn(
            img, gt_boxes, gt_landmarks
        )

    normalized_img = (img - 0.5) / 0.5
    return normalized_img, gt_boxes, gt_landmarks


def calculate_actual_outputs(prior_boxes, gt_boxes, gt_landmarks,
                             hyper_params):
    """Build the training targets (deltas and labels) for every prior box.
    Each prior box is matched to the ground truth box it overlaps most; the
    match counts as positive only when that IoU is above the threshold.
    Non-positive priors get all-zero ground truth before deltas are taken.
    Batch operations supported.
    inputs:
        prior_boxes = (total_bboxes, [center_x, center_y, width, height])
            these values in normalized format between [0, 1]
        gt_boxes = (batch_size, gt_box_size, [y1, x1, y2, x2])
            these values in normalized format between [0, 1]
        gt_landmarks = (batch_size, gt_box_size, total_landmarks, [x, y])
            these values in normalized format between [0, 1]
        hyper_params = dictionary with "iou_threshold", "variances" and
            "total_landmarks"; "variances" must be a Python list because it
            is repeated and concatenated with list operators

    outputs:
        actual_deltas = (batch_size, total_bboxes, [delta_bbox_y, delta_bbox_x,
        delta_bbox_h, delta_bbox_w, delta_landmark_x0, delta_landmark_y0, ...,
        delta_landmark_xN, delta_landmark_yN])
        actual_labels = (batch_size, total_bboxes, [1 or 0])
    """
    prior_boxes = tf.cast(prior_boxes, tf.float32)  # ensure all float values

    variances = hyper_params["variances"]
    total_landmarks = hyper_params["total_landmarks"]
    # List arithmetic: the first two variances repeated once per landmark,
    # appended to the four box variances.
    all_variances = variances + total_landmarks * variances[0:2]

    # IoU (jaccard index) between every prior box and every ground truth box
    iou_map = generate_iou_map(convert_xywh_to_bboxes(prior_boxes), gt_boxes)

    # For each prior box: which ground truth box fits best, and how well
    best_gt_index = tf.argmax(iou_map, axis=2, output_type=tf.int32)
    best_iou = tf.reduce_max(iou_map, axis=2)
    is_positive = tf.greater(best_iou, hyper_params["iou_threshold"])

    # Flatten landmarks so boxes and landmarks can share the last axis
    flat_landmarks = tf.reshape(
        gt_landmarks, (tf.shape(gt_boxes)[0], -1, total_landmarks * 2)
    )
    gt_targets = tf.concat([gt_boxes, flat_landmarks], -1)

    # Give every prior box the ground truth of its best match, then blank
    # out the priors that did not pass the IoU threshold.
    matched_targets = tf.gather(gt_targets, best_gt_index, batch_dims=1)
    masked_targets = tf.where(
        tf.expand_dims(is_positive, -1),
        matched_targets,
        tf.zeros_like(matched_targets),
    )

    raw_deltas = get_deltas_from_bboxes_and_landmarks(prior_boxes,
                                                      masked_targets)
    actual_deltas = raw_deltas / all_variances
    actual_labels = tf.expand_dims(tf.cast(is_positive, dtype=tf.float32), -1)

    return actual_deltas, actual_labels


def generator_finite(dataset, prior_boxes, hyper_params):
    """Yield (img, (actual_deltas, actual_labels)) for one pass over dataset.
    Each dataset element must unpack to (img, gt_boxes, gt_landmarks); the
    targets come from calculate_actual_outputs.
    """
    for img, gt_boxes, gt_landmarks in dataset:
        targets = calculate_actual_outputs(
            prior_boxes, gt_boxes, gt_landmarks, hyper_params
        )
        actual_deltas, actual_labels = targets
        yield img, (actual_deltas, actual_labels)


def generator_infinite(dataset, prior_boxes, hyper_params):
    """Yield (img, (actual_deltas, actual_labels)) forever.
    Restarts iteration over `dataset` every time it is exhausted; each pass
    is produced by generator_finite.
    """
    while True:
        yield from generator_finite(dataset, prior_boxes, hyper_params)

In [None]:
# ============ #
# Prepare data #
# ============ #

# Load the dataset and split it: the first N% of "train" is used for
# training, the remainder is halved into validation and test.
print("Loading tfds dataset:", config["tfds_dataset_name"])
train_split = "train[:{}%]".format(config["data_train_split_percentage"])
rest_split = "train[{}%:]".format(config["data_train_split_percentage"])
train_data, info = tfds.load(
    config["tfds_dataset_name"],
    split=train_split,
    data_dir=config["data_dir"],
    with_info=True,
)
# The info object is the same for both loads, so the second one is dropped.
rest_data, _ = tfds.load(
    config["tfds_dataset_name"],
    split=rest_split,
    data_dir=config["data_dir"],
    with_info=True,
)

# Example counts are looked up by the same split strings used for loading.
train_total_items = info.splits[train_split].num_examples
rest_total_items = info.splits[rest_split].num_examples

# Validation gets the first half (rounded down), test gets what is left.
val_total_items = int(rest_total_items / 2)
test_total_items = rest_total_items - val_total_items
print("train_total_items:    {:>10,d}".format(train_total_items))
print("val_total_items:      {:>10,d}".format(val_total_items))
print("test_total_items:     {:>10,d}".format(test_total_items))

val_data = rest_data.take(val_total_items)
test_data = rest_data.skip(val_total_items).take(test_total_items)

# Only the first entry of input_size is used; images are resized to a
# square of this size.
img_size = config["input_size"][0]

# Augmentation is applied to the training split only
train_data = train_data.map(lambda x: preprocessing(x, img_size, img_size,
                                                    apply_augmentation))
val_data = val_data.map(lambda x: preprocessing(x, img_size, img_size))
test_data = test_data.map(lambda x: preprocessing(x, img_size, img_size))

# (images, ground truth boxes, ground truth landmarks)
# All dimensions are left as None so padded_batch pads each one to the
# largest size in the batch.
data_shapes = ([None, None, None], [None, None], [None, None, None])

# (images, ground truth boxes, ground truth landmarks)
# Padding is zero for all three components.
padding_values = (
    tf.constant(0, tf.float32),
    tf.constant(0, tf.float32),
    tf.constant(0, tf.float32),
)

# Training data is shuffled with a buffer of 12 batches before batching.
train_data = train_data.shuffle(config["batch_size"] * 12).padded_batch(
    config["batch_size"], padded_shapes=data_shapes,
    padding_values=padding_values
)
val_data = val_data.padded_batch(
    config["batch_size"], padded_shapes=data_shapes,
    padding_values=padding_values
)
# Test data is batched one image at a time.
test_data = test_data.padded_batch(
    1, padded_shapes=data_shapes, padding_values=padding_values
)

In [None]:
# ============ #
# Create model #
# ============ #


class BlazeFace(tf.Module):
    """BlazeFace detector (feature extractor + detection heads).

    Wraps a Keras functional model in a tf.Module so that `predict` can be
    exported as a named signature.

    config keys read by this class:
        "input_size" = [height, width]
        "total_landmarks" = number of landmarks regressed per detection
        "detections_per_layer" = anchors per grid cell; index 1 is used for
            the first (higher resolution) feature map, index 0 for the
            second (lower resolution) one
        "neg_pos_ratio", "loc_loss_alpha" = passed to CustomLoss when the
            model is compiled
    """

    def __init__(self, config={}):
        """Store the configuration and derive the model input shape.

        `config` is kept by reference and gains an "input_shape" entry
        ([height, width, 3]); it must contain "input_size".
        """
        super(BlazeFace, self).__init__()
        self.config = config
        self.config["input_shape"] = [
            config["input_size"][0],
            config["input_size"][1],
            3,
        ]
        self.model = None

    @staticmethod
    def _grid_cells(feature_map):
        """Return the number of spatial cells (rows * cols) of a feature map.

        Read from the static shape of a (batch, rows, cols, channels) tensor.
        """
        return feature_map.shape[1] * feature_map.shape[2]

    def single_blaze_block(
        self, inputs, filters=24, kernel_size=5, strides=1, padding="same"
    ):
        """Single BlazeBlock: depthwise conv + 1x1 conv with a residual path.

        inputs:
            inputs = (batch, height, width, channels) feature map
            filters = output channels of the block
            kernel_size = depthwise kernel size
            strides = 1 keeps the resolution, 2 halves it
            padding = padding mode for all convolutions
        outputs:
            relu(main_path + residual)

        With strides == 1 the input itself is the residual, so `filters`
        must equal the input channel count. With strides == 2 the residual
        is max-pooled and projected to `filters` channels with a 1x1 conv.
        """
        depthwise1 = tf.keras.layers.DepthwiseConv2D(
            kernel_size=kernel_size, strides=strides, padding=padding
        )(inputs)
        # Pointwise projection to the requested channel count. This used to
        # be hard-coded to filters=1, which squeezed the main path to one
        # channel and relied on broadcasting in Add. Weights saved from that
        # graph have different shapes and will not load into this one.
        conv1 = tf.keras.layers.Conv2D(
            filters=filters, kernel_size=1, padding=padding, activation=None
        )(depthwise1)
        # residual connection
        if strides == 2:
            maxpool1 = tf.keras.layers.MaxPool2D(pool_size=2)(inputs)
            conv2 = tf.keras.layers.Conv2D(
                filters=filters, kernel_size=1, padding=padding
            )(maxpool1)
            output = tf.keras.layers.Add()([conv1, conv2])
        else:
            output = tf.keras.layers.Add()([conv1, inputs])
        return tf.keras.layers.Activation("relu")(output)

    def double_blaze_block(
        self, inputs, filters1=24, filters2=96, kernel_size=5, strides=1,
        padding="same"
    ):
        """Double BlazeBlock: two depthwise/pointwise pairs plus a residual.

        inputs:
            inputs = (batch, height, width, channels) feature map
            filters1 = channels of the intermediate (bottleneck) projection
            filters2 = output channels of the block
            kernel_size = depthwise kernel size
            strides = stride of the first depthwise conv (1 or 2)
            padding = padding mode for all convolutions
        outputs:
            relu(main_path + residual)

        With strides == 1 the input itself is the residual, so `filters2`
        must equal the input channel count. With strides == 2 the residual
        is max-pooled and projected to `filters2` channels with a 1x1 conv.
        """
        depthwise1 = tf.keras.layers.DepthwiseConv2D(
            kernel_size=kernel_size, strides=strides, padding=padding
        )(inputs)
        conv1 = tf.keras.layers.Conv2D(
            filters=filters1, kernel_size=1, padding=padding, activation="relu"
        )(depthwise1)
        depthwise2 = tf.keras.layers.DepthwiseConv2D(
            kernel_size=kernel_size, strides=1, padding=padding
        )(conv1)
        conv2 = tf.keras.layers.Conv2D(
            filters=filters2, kernel_size=1, padding=padding
        )(depthwise2)
        # residual
        if strides == 2:
            maxpool1 = tf.keras.layers.MaxPool2D(pool_size=2)(inputs)
            conv3 = tf.keras.layers.Conv2D(
                filters=filters2, kernel_size=1, padding=padding
            )(maxpool1)
            output = tf.keras.layers.Add()([conv2, conv3])
        else:
            output = tf.keras.layers.Add()([conv2, inputs])
        return tf.keras.layers.Activation("relu")(output)

    def get_feature_extractor(self):
        """Build the backbone and return it as a Keras Model.

        The model has two outputs: the feature map after the third double
        block and the one after the sixth. The shape comments below assume
        a 128x128x3 input.
        """
        # =========== #
        # Input layer #
        # =========== #
        inputs = tf.keras.layers.Input(shape=self.config["input_shape"])

        # ======================= #
        # First convolution layer #
        # ======================= #
        # input 128x128x3; kernel 5x5x3x24, stride 2
        convolution1 = tf.keras.layers.Conv2D(
            filters=24, kernel_size=5, strides=2, padding="same",
            activation="relu"
        )(inputs)

        # ======================== #
        # Single blaze block phase #
        # ======================== #
        # input 64x64x24; kernels 5x5x24x1; 1x1x24x24
        single_blaze1 = self.single_blaze_block(convolution1, filters=24)
        # input 64x64x24; kernels 5x5x24x1; 1x1x24x24
        single_blaze2 = self.single_blaze_block(single_blaze1, filters=24)
        # input 64x64x24; kernels 5x5x24x1, stride 2; 1x1x24x48
        single_blaze3 = self.single_blaze_block(single_blaze2, filters=48,
                                                strides=2)
        # input 32x32x48; kernels 5x5x48x1; 1x1x48x48
        single_blaze4 = self.single_blaze_block(single_blaze3, filters=48)
        # input 32x32x48; kernels 5x5x48x1; 1x1x48x48
        single_blaze5 = self.single_blaze_block(single_blaze4, filters=48)

        # ======================== #
        # Double blaze block phase #
        # ======================== #
        # input 32x32x48; kernels 5x5x48x1, stride 2; 1x1x48x24; 5x5x24x1;
        # 1x1x24x96
        double_blaze1 = self.double_blaze_block(
            single_blaze5, filters1=24, filters2=96, strides=2
        )
        # input 16x16x96; kernels 5x5x96x1; 1x1x96x24; 5x5x24x1; 1x1x24x96
        double_blaze2 = self.double_blaze_block(
            double_blaze1, filters1=24, filters2=96, strides=1
        )
        # input 16x16x96; kernels 5x5x96x1; 1x1x96x24; 5x5x24x1; 1x1x24x96
        double_blaze3 = self.double_blaze_block(
            double_blaze2, filters1=24, filters2=96, strides=1
        )
        # input 16x16x96; kernels 5x5x96x1, stride 2; 1x1x96x24; 5x5x24x1;
        # 1x1x24x96
        double_blaze4 = self.double_blaze_block(
            double_blaze3, filters1=24, filters2=96, strides=2
        )
        # input 8x8x96; kernels 5x5x96x1; 1x1x96x24; 5x5x24x1; 1x1x24x96
        double_blaze5 = self.double_blaze_block(
            double_blaze4, filters1=24, filters2=96, strides=1
        )
        # input 8x8x96; kernels 5x5x96x1; 1x1x96x24; 5x5x24x1; 1x1x24x96
        double_blaze6 = self.double_blaze_block(
            double_blaze5, filters1=24, filters2=96, strides=1
        )

        return Model(inputs=inputs, outputs=[double_blaze3, double_blaze6])

    def build_model(self, compile=False):
        """Build the full detector, store it in self.model and return it.

        inputs:
            compile = when True the model is compiled with CustomLoss and
                Adam (see init_model)
        outputs:
            Keras Model with two outputs:
                "loc"  = (batch, total_anchors, total_landmarks * 2 + 4)
                "conf" = (batch, total_anchors, 1)
            where total_anchors sums cells * anchors_per_cell over both
            feature maps (first map first).
        """
        feature_extractor = self.get_feature_extractor()

        # 4 box values + (x, y) per landmark
        total_reg_points = self.config["total_landmarks"] * 2 + 4

        # Anchors per grid cell for each feature map
        fine_anchors = self.config["detections_per_layer"][1]
        coarse_anchors = self.config["detections_per_layer"][0]
        # Grid sizes come from the static feature map shapes
        # (16*16 and 8*8 for a 128x128 input).
        fine_cells = self._grid_cells(feature_extractor.output[0])
        coarse_cells = self._grid_cells(feature_extractor.output[1])

        # ============== #
        # Detection head #
        # ============== #
        # Layers are created in a fixed order (labels first, then boxes) so
        # the layer ordering of the resulting model stays stable.

        # labels, first feature map: [batch, rows, cols, fine_anchors]
        labels1 = tf.keras.layers.Conv2D(
            filters=fine_anchors,
            kernel_size=3,
            padding="same",
        )(feature_extractor.output[0])
        labels1_reshaped = tf.keras.layers.Reshape(
            (fine_cells * fine_anchors, 1)
        )(labels1)
        # labels, second feature map: [batch, rows, cols, coarse_anchors]
        labels2 = tf.keras.layers.Conv2D(
            filters=coarse_anchors,
            kernel_size=3,
            padding="same",
        )(feature_extractor.output[1])
        labels2_reshaped = tf.keras.layers.Reshape(
            (coarse_cells * coarse_anchors, 1)
        )(labels2)
        # labels: [batch, total_anchors, 1]
        labels_combined = tf.keras.layers.Concatenate(name="conf", axis=1)(
            [labels1_reshaped, labels2_reshaped]
        )

        # boxes + landmarks, first feature map:
        # total_reg_points values per anchor
        boxes1 = tf.keras.layers.Conv2D(
            filters=fine_anchors * total_reg_points,
            kernel_size=3,
            padding="same",
        )(feature_extractor.output[0])
        boxes1_reshaped = tf.keras.layers.Reshape(
            (fine_cells * fine_anchors, total_reg_points)
        )(boxes1)
        # boxes + landmarks, second feature map
        boxes2 = tf.keras.layers.Conv2D(
            coarse_anchors * total_reg_points,
            kernel_size=3,
            padding="same",
        )(feature_extractor.output[1])
        boxes2_reshaped = tf.keras.layers.Reshape(
            (coarse_cells * coarse_anchors, total_reg_points)
        )(boxes2)
        # boxes: [batch, total_anchors, total_reg_points]
        boxes_combined = tf.keras.layers.Concatenate(name="loc", axis=1)(
            [boxes1_reshaped, boxes2_reshaped]
        )

        self.model = Model(
            inputs=feature_extractor.input,
            outputs=[boxes_combined, labels_combined]
        )

        self.init_model(self.model, compile=compile)

        return self.model

    def init_model(self, model, compile=True):
        """Optionally compile `model`, then run it once on random data.

        inputs:
            model = Keras model to initialise
            compile = only the literal True triggers compilation with
                CustomLoss (loc + conf losses) and Adam
        """
        # Compile model with loss function, and initialise with random data
        if compile is True:
            custom_losses = CustomLoss(
                self.config["neg_pos_ratio"], self.config["loc_loss_alpha"]
            )
            model.compile(
                optimizer=Adam(learning_rate=self.get_learning_rate()),
                loss=[custom_losses.loc_loss_fn, custom_losses.conf_loss_fn],
            )

        # Use the model's own height and width so non-square inputs work.
        input_height = model.input.shape[1]
        input_width = model.input.shape[2]
        model(tf.random.uniform((1, input_height, input_width, 3)))

    def get_learning_rate(self, epoch=1):
        """Step learning-rate schedule.

        outputs:
            1e-3 for epoch < 100, 1e-4 for epoch < 125, otherwise 1e-5
        """
        if epoch < 100:
            learning_rate = 1e-3
        elif epoch < 125:
            learning_rate = 1e-4
        else:
            learning_rate = 1e-5
        return learning_rate

    # The signature is evaluated once, at class definition time, from the
    # module-level `config` (not self.config). Height and width are taken
    # from input_size[0] and input_size[1] respectively.
    @tf.function(
        input_signature=[
            tf.TensorSpec(
                [None, config["input_size"][0], config["input_size"][1], 3],
                dtype=tf.dtypes.float32,
                name="input_1",
            )
        ]
    )
    def predict(self, inputs):
        """Run the model and return its outputs by name.

        inputs:
            inputs = (batch, height, width, 3) float32 images
        outputs:
            dict with "loc" (first model output) and "conf" (second one)
        """
        prediction = self.model(inputs)
        return {"loc": prediction[0], "conf": prediction[1]}


# Build the detector from the notebook configuration. build_model() is
# called with its default compile=False, so no loss/optimizer is attached.
blazeface = BlazeFace(config)
model = blazeface.build_model()
print("model input:", model.input)
print("model output:", model.output)
# Tracing predict here also checks that its input signature is valid.
print("model signature:", blazeface.predict.get_concrete_function())
if PLOT_MODEL:
    model.summary()
    # model_plot_file is defined elsewhere in the notebook.
    tf.keras.utils.plot_model(
        model, to_file=model_plot_file, show_shapes=True, show_layer_names=True
    )

In [None]:
# =================== #
# Save & export model #
# =================== #

# Concrete function of BlazeFace.predict for its declared input signature;
# it is exported below as the "predict" signature of the SavedModel.
concrete_function = blazeface.predict.get_concrete_function()

# weights_file, saved_model_path, tflite_file and tflite_quant_file are
# defined elsewhere in the notebook.
if LOAD_SAVED_WEIGHTS:
    print("Load saved weights: ", weights_file)
    model.load_weights(weights_file)

if EXPORT_SAVED_MODEL:
    print("Save model: ", saved_model_path)
    # The tf.Module wrapper (not the bare Keras model) is saved so that the
    # named signature is attached.
    tf.saved_model.save(
        blazeface,
        saved_model_path,
        signatures={"predict": concrete_function},
    )

if EXPORT_TFLITE_MODEL:
    print("Save TFLite file:", tflite_file)
    # Alternative converter entry points kept for reference:
    #     converter = tf.lite.TFLiteConverter.from_concrete_functions(
    #         [concrete_function],
    #         blazeface) # or model
    #     converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_path)
    #     converter = tf.lite.TFLiteConverter.from_keras_model(model)
    # NOTE(review): `blazeface` is a tf.Module, not a tf.keras.Model; confirm
    # from_keras_model accepts it on the TensorFlow version in use.
    converter = tf.lite.TFLiteConverter.from_keras_model(blazeface)  # or model
    tflite_model = converter.convert()

    # save TFLite file
    with open(tflite_file, "wb") as f:
        f.write(tflite_model)
    print("Wrote TFLite model of %d bytes." % (len(tflite_model)))

    # Print the signatures from the converted model
    interpreter = tf.lite.Interpreter(model_content=tflite_model)
    signatures = interpreter.get_signature_list()
    print("signatures:", signatures)

if EXPORT_QUANTIZED_MODEL:
    print("Save TFLite quantized file:", tflite_quant_file)
    # NOTE(review): the notebook asserts TF 2.x, where eager execution is
    # already the default; this call is presumably redundant.
    tf.compat.v1.enable_eager_execution()
    # Quantization settings
    num_calibration_examples = 100
    # NOTE(review): this placeholder is shadowed by the function of the same
    # name defined just below, so the `if representative_dataset:` check
    # further down is always true.
    representative_dataset = None

    print(
        "- using {} test inputs to optimise quantization".format(
            num_calibration_examples
        )
    )

    # Calibration data generator: feeds preprocessed test images (batches of
    # one image) keyed by "input_1", the name given to the input TensorSpec
    # of BlazeFace.predict.
    def representative_dataset():
        for image_data in test_data.take(num_calibration_examples):
            img, gt_boxes, gt_landmarks = image_data
            yield {"input_1": img}

    # Alternative converter entry points kept for reference:
    #     converter = tf.lite.TFLiteConverter.from_concrete_functions(
    #         [concrete_function],
    #         blazeface) # model
    #     converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_path)
    #     converter = tf.lite.TFLiteConverter.from_keras_model(model)
    # NOTE(review): same tf.Module-vs-Keras-model question as in the
    # non-quantized export above.
    converter = tf.lite.TFLiteConverter.from_keras_model(blazeface)  # or model
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    if representative_dataset:  # This is optional, see above.
        converter.representative_dataset = representative_dataset
    # Restrict the converter to int8 builtin ops and make the model's input
    # and output tensors int8 as well.
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8  # or tf.uint8
    converter.inference_output_type = tf.int8  # or tf.uint8
    # Note: this rebinds tflite_model, replacing the float model from the
    # previous section.
    tflite_model = converter.convert()

    with open(tflite_quant_file, "wb") as f:
        f.write(tflite_model)
    print("Wrote TFLite model of %d bytes." % (len(tflite_model)))

    # Print the signatures from the converted model
    interpreter = tf.lite.Interpreter(model_content=tflite_model)
    signatures = interpreter.get_signature_list()
    print("signatures:", signatures)

In [None]:
# ============== #
# Inference Test #
# ============== #

# Scaling factors needed to undo the variance normalisation of the model's
# regression outputs (the training targets are divided by the same list in
# calculate_actual_outputs).
variances = blazeface.config["variances"]
total_landmarks = blazeface.config["total_landmarks"]

# List arithmetic (variances must be a Python list): the first two variances
# repeated once per landmark, appended to the box variances.
landmark_variances = total_landmarks * variances[0:2]
total_variances = variances + landmark_variances


def draw_bboxes(imgs, bboxes):
    """Draw bounding boxes on a batch of images and show each result.
    inputs:
        imgs = (batch_size, height, width, channels)
        bboxes = (batch_size, total_bboxes, [y1, x1, y2, x2])
            in normalized form [0, 1]
    """
    # Single RGBA colour (opaque red) used for every box
    box_colors = tf.constant([[1, 0, 0, 1]], dtype=tf.float32)
    annotated_imgs = tf.image.draw_bounding_boxes(imgs, bboxes, box_colors)
    plt.figure()
    for annotated_img in annotated_imgs:
        plt.imshow(annotated_img)
        plt.show()


def local_draw_bboxes_with_landmarks(img, bboxes, landmarks, scores=None):
    """Draw bounding boxes, optional scores and landmarks on one image.
    Boxes with non-positive width or height are skipped, as are landmark
    sets whose largest coordinate is not positive. Drawing a score is best
    effort: any failure while labelling a box is ignored.
    inputs:
        img = (height, width, channels)
        bboxes = (total_bboxes, [y1, x1, y2, x2]) in pixel coordinates
        landmarks = (total_bboxes, total_landmarks, [x, y])
            in pixel coordinates
        scores = (total_bboxes, [scores]) or None for no score labels
    outputs:
        PIL image with the annotations drawn on it
    """
    image = tf.keras.preprocessing.image.array_to_img(img)
    draw = ImageDraw.Draw(image)
    color = (255, 0, 0, 255)
    fnt = None  # None makes PIL fall back to its default font

    for index, bbox in enumerate(bboxes):
        y1, x1, y2, x2 = tf.split(bbox, 4)
        if x2 - x1 <= 0 or y2 - y1 <= 0:
            continue
        draw.rectangle((x1, y1, x2, y2), outline=color, width=1)
        if scores is not None:
            try:
                # Score label sits just above the top-left corner
                label = '{:.2f}'.format(scores[index])
                draw.text((x1, y1 - 10), label, font=fnt, fill=color)
            except Exception:
                pass

    for landmark in landmarks:
        if tf.reduce_max(landmark) <= 0:
            continue
        # One [x - 1, y - 1, x + 1, y + 1] rectangle per landmark point
        dot_bounds = tf.concat([landmark - 1, landmark + 1], -1)
        for dot in dot_bounds:
            draw.ellipse(dot, fill=color)

    return image


def singleImageTensorToPil(single_img_tensor):
    """Convert a single image tensor/array into a PIL image using Keras."""
    return tf.keras.preprocessing.image.array_to_img(single_img_tensor)


# Inference tests
# Runs the Keras model on a few test batches and shows, side by side, the
# detections and the ground truth for the first image of each batch.
for image_data in test_data.take(config["num_test_images"]):

    # Get test image
    # `shapes` is used below as the ground-truth bboxes and `values` as the
    # ground-truth landmarks.
    img, shapes, values = image_data

    # Run inference
    pred_deltas, pred_scores = model.predict_on_batch(img)

    # Scale the raw deltas by the per-coordinate variances (in place).
    pred_deltas *= total_variances

    # Decode deltas relative to the prior boxes, then clamp to [0, 1].
    pred_bboxes_and_landmarks = get_bboxes_and_landmarks_from_deltas(
        prior_boxes, pred_deltas
    )
    pred_bboxes_and_landmarks = tf.clip_by_value(pred_bboxes_and_landmarks,
                                                 0, 1)

    pred_scores = tf.cast(pred_scores, tf.float32)

    # Only the first image of the batch is post-processed and drawn.
    weighted_suppressed_data, filtered_scores = weighted_suppression(
        pred_scores[0], pred_bboxes_and_landmarks[0]
    )

    # First 4 values per detection are the bbox, the rest are landmarks.
    weighted_bboxes = weighted_suppressed_data[..., 0:4]
    weighted_landmarks = weighted_suppressed_data[..., 4:]

    # Convert to the img_size x img_size coordinate system for drawing.
    denormalized_bboxes = denormalize_bboxes(weighted_bboxes,
                                             img_size, img_size)
    weighted_landmarks = tf.reshape(weighted_landmarks,
                                    (-1, total_landmarks, 2))
    denormalized_landmarks = denormalize_landmarks(
        weighted_landmarks, img_size, img_size
    )

    img_plot = local_draw_bboxes_with_landmarks(
        img[0], denormalized_bboxes, denormalized_landmarks,
        filtered_scores[..., 0]
    )

    # Ground truth for the same image, drawn without score labels.
    bboxes_truth_abs = denormalize_bboxes(shapes[0], img_size, img_size)
    landmarks_truth_abs = denormalize_landmarks(values[0], img_size, img_size)
    img_plot_truth = local_draw_bboxes_with_landmarks(
        img[0], bboxes_truth_abs, landmarks_truth_abs
    )

    # Figure size passed to matplotlib's `figsize`.
    width = 10
    height = 10
    fig, axarr = plt.subplots(1, 2, figsize=(width, height))

    axarr[0].set_title("detection")
    axarr[0].imshow(img_plot)

    axarr[1].set_title("ground truth")
    axarr[1].imshow(img_plot_truth)

    plt.show()

In [None]:
# TFLite inference test #
#
# Runs the exported TFLite model (not the Keras model) on a few test
# batches and shows, side by side, the detections and the ground truth for
# the first image of each batch.

# Load the TFLite model in TFLite Interpreter
print("Loading TFLite file: ", tflite_file)
interpreter = tf.lite.Interpreter(tflite_file)
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()[0]
output_details = interpreter.get_output_details()

# check the type of the input tensor
floating_model = input_details["dtype"] == np.float32

USE_SIGNATURE_FN = False

if USE_SIGNATURE_FN:
    # something in this next section causes problems for inference
    signatures = interpreter.get_signature_list()
    print("signatures:", signatures)
    sig_key = list(signatures.keys())[0]
    # print('signature key:', sig_key)
    my_signature_fn = interpreter.get_signature_runner(sig_key)
    # Name of the (first) input expected by the signature runner.
    sig_input_name = signatures[sig_key]["inputs"][0]
else:
    input_index = input_details["index"]
    output0_index = output_details[0]["index"]
    output1_index = output_details[1]["index"]


def _to_tflite_input(batch):
    """Convert an image batch to the dtype expected by the TFLite input.

    Float32 models get the batch unchanged (as float32). Integer models
    get the batch quantized with the input tensor's (scale, zero_point).
    """
    batch = np.asarray(batch, dtype=np.float32)
    if floating_model:
        return batch
    input_dtype = input_details["dtype"]
    if not np.issubdtype(input_dtype, np.integer):
        return batch.astype(input_dtype)
    scale, zero_point = input_details["quantization"]
    # A scale of 0 means that no quantization parameters are stored.
    if scale:
        batch = batch / scale + zero_point
    limits = np.iinfo(input_dtype)
    return np.clip(np.round(batch), limits.min, limits.max).astype(input_dtype)


def _from_tflite_output(array, quantization):
    """Return a TFLite output as float32, dequantizing it when needed."""
    array = np.asarray(array)
    scale, zero_point = quantization
    if array.dtype != np.float32 and scale:
        array = (array.astype(np.float32) - zero_point) * scale
    return array.astype(np.float32)


def run_tflite_model(batch):
    """Run the TFLite model on `batch` and return (deltas, scores).

    inputs:
        batch = (1, height, width, channels) image batch
    outputs:
        deltas = float32 array whose last dimension is len(total_variances)
        scores = float32 array with the remaining model output
    """
    tflite_input = _to_tflite_input(batch)
    if USE_SIGNATURE_FN:
        results = my_signature_fn(**{sig_input_name: tflite_input})
        # NOTE(review): assumes the signature runner returns float
        # tensors; no dequantization parameters are looked up here.
        outputs = [_from_tflite_output(value, (0.0, 0))
                   for value in results.values()]
    else:
        interpreter.set_tensor(input_index, tflite_input)
        interpreter.invoke()
        outputs = [
            _from_tflite_output(interpreter.get_tensor(output0_index),
                                output_details[0]["quantization"]),
            _from_tflite_output(interpreter.get_tensor(output1_index),
                                output_details[1]["quantization"]),
        ]

    # The order of the TFLite outputs is not assumed to match the Keras
    # model: the deltas are recognised by their last dimension. When the
    # shapes do not disambiguate, the declared order is kept.
    deltas, scores = outputs[0], outputs[1]
    delta_size = len(total_variances)
    if deltas.shape[-1] != delta_size and scores.shape[-1] == delta_size:
        deltas, scores = scores, deltas
    return deltas, scores


# Inference tests
for image_data in test_data.take(config["num_test_images"]):

    # Get test image
    # `shapes` is used below as the ground-truth bboxes and `values` as the
    # ground-truth landmarks.
    img, shapes, values = image_data

    # Run inference with the TFLite model. Only the first image is fed,
    # which is also the only one that gets drawn below.
    pred_deltas, pred_scores = run_tflite_model(img[0:1])

    # Scale the raw deltas by the per-coordinate variances.
    pred_deltas = pred_deltas * np.asarray(total_variances, dtype=np.float32)

    # Decode deltas relative to the prior boxes, then clamp to [0, 1].
    pred_bboxes_and_landmarks = get_bboxes_and_landmarks_from_deltas(
        prior_boxes, pred_deltas
    )
    pred_bboxes_and_landmarks = tf.clip_by_value(pred_bboxes_and_landmarks,
                                                 0, 1)

    pred_scores = tf.cast(pred_scores, tf.float32)

    weighted_suppressed_data, filtered_scores = weighted_suppression(
        pred_scores[0], pred_bboxes_and_landmarks[0]
    )

    # First 4 values per detection are the bbox, the rest are landmarks.
    weighted_bboxes = weighted_suppressed_data[..., 0:4]
    weighted_landmarks = weighted_suppressed_data[..., 4:]

    # Convert to the img_size x img_size coordinate system for drawing.
    denormalized_bboxes = denormalize_bboxes(weighted_bboxes,
                                             img_size, img_size)
    weighted_landmarks = tf.reshape(weighted_landmarks,
                                    (-1, total_landmarks, 2))
    denormalized_landmarks = denormalize_landmarks(
        weighted_landmarks, img_size, img_size
    )

    img_plot = local_draw_bboxes_with_landmarks(
        img[0], denormalized_bboxes, denormalized_landmarks,
        filtered_scores[..., 0]
    )

    # Ground truth for the same image, drawn without score labels.
    bboxes_truth_abs = denormalize_bboxes(shapes[0], img_size, img_size)
    landmarks_truth_abs = denormalize_landmarks(values[0], img_size, img_size)
    img_plot_truth = local_draw_bboxes_with_landmarks(
        img[0], bboxes_truth_abs, landmarks_truth_abs
    )

    width = 10
    height = 10
    fig, axarr = plt.subplots(1, 2, figsize=(width, height))

    axarr[0].set_title("detection")
    axarr[0].imshow(img_plot)

    axarr[1].set_title("ground truth")
    axarr[1].imshow(img_plot_truth)

    plt.show()