In [2]:
cd ..

C:\Users\nick\Desktop\tfcv


In [3]:
import os
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D, Concatenate, UpSampling2D, Add
import tensorflow_datasets as tfds

from initializers import PriorProbability

In [None]:
print(f'Tensorflow Version: {tf.__version__}')
print(f'TFDS Version: {tfds.__version__}')
print(f"GPU: {tf.config.list_physical_devices('GPU')}")

tf.keras.backend.clear_session()  # For easy reset of notebook state.

# 1. Tensorflow Datasets (TFDS)

In [None]:
#import warnings
#warnings.filterwarnings('ignore')
train, info = tfds.load('wider_face', split="train", with_info=True)
validation = tfds.load('wider_face', split="validation")

In [None]:
info

In [None]:
# each example is a feature dict; this notebook reads its 'image', 'image/filename' and 'faces' -> 'bbox' entries
train.take(1)

In [None]:
ex = train.take(2)
imgs = [x['image'].numpy() for x in ex]
faces = [x['faces'] for x in ex]
faces[1]['bbox'].shape

In [None]:
def plot_images(ds=train, n=5, fig_size=128, font_size=32, show_boxes=False):
    """Plot the first ``n`` images of ``ds`` in one row, each titled with its face count.

    Args:
        ds: dataset of feature dicts exposing 'image' and 'faces' -> 'bbox'.
        n: number of examples taken from the dataset.
        fig_size: side length (inches) handed to matplotlib's ``figsize``.
        font_size: font size of the subplot titles.
        show_boxes: when True, every face bounding box is drawn on its image.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # Materialise the examples once rather than re-iterating the dataset per field.
    examples = list(ds.take(n))
    if not examples:
        return  # plt.subplots cannot build a grid with zero columns

    # squeeze=False keeps `axes` a 2-D array even when only one image is plotted.
    fig, axes = plt.subplots(1, len(examples), figsize=(fig_size, fig_size), squeeze=False)
    for example, ax in zip(examples, axes.flatten()):
        img = example['image'].numpy()
        bboxes = example['faces']['bbox'].numpy()

        ax.imshow(img)
        ax.set_title(str(bboxes.shape[0]), color='w', fontsize=font_size)
        ax.axis('off')

        if show_boxes:
            img_height, img_width = img.shape[:2]
            # Rows are treated as normalised (ymin, xmin, ymax, xmax), as noted elsewhere
            # in this notebook; Rectangle expects a pixel (x, y) corner plus width/height.
            for ymin, xmin, ymax, xmax in bboxes:
                ax.add_patch(patches.Rectangle((xmin * img_width, ymin * img_height),
                                               (xmax - xmin) * img_width,
                                               (ymax - ymin) * img_height,
                                               linewidth=1, edgecolor='g', facecolor='none'))

    plt.tight_layout()
    plt.show()

# plot_images returns nothing, so its result is not bound to a name
plot_images(n=10, font_size=128)

In [None]:
def plot_bbox(ds=train, n=100, _index=0):
    """Plot one image of ``ds`` with its face bounding boxes drawn in green.

    Args:
        ds: dataset of feature dicts exposing 'image' and 'faces' -> 'bbox'.
        n: number of examples taken from the dataset before indexing.
        _index: position (within the first ``n`` examples) of the image to plot.

    Raises:
        IndexError: if ``_index`` is outside the ``n`` examples taken.
    """
    # Iterate the dataset once and pick the requested example.
    example = list(ds.take(n))[_index]
    img = example['image'].numpy()
    faces = example['faces']['bbox'].numpy().tolist()

    def transform_bbox(bbox, img_width, img_height):
        #  normalised (ymin, xmin, ymax, xmax) -> pixel (xmin, ymin, w, h)
        xmin = bbox[1] * img_width
        ymin = bbox[0] * img_height
        xmax = bbox[3] * img_width
        ymax = bbox[2] * img_height
        return [xmin, ymin, xmax - xmin, ymax - ymin]

    # Image arrays are (height, width, channels): shape[0] is the height.
    img_height, img_width = img.shape[:2]

    fig, ax = plt.subplots(1)
    ax.imshow(img)
    ax.axis('off')

    for bbox in faces:
        xmin, ymin, w, h = transform_bbox(bbox, img_width, img_height)
        # Rectangle takes (x, y), width, height -- in that order.
        rect = patches.Rectangle((xmin, ymin), w, h, linewidth=1, edgecolor='g', facecolor='none')
        ax.add_patch(rect)

    plt.tight_layout()
    plt.show()
    
plot_bbox(_index=4)

In [None]:
def preprocess(features):
    """Turn one raw example into an ``(image, bboxes)`` pair.

    The image is resized to 640x640 and divided by 255; the 'faces' -> 'bbox'
    columns are reordered with indices [1, 0, 3, 2].
    """
    resized = tf.image.resize(features['image'], (640, 640))
    scaled = resized / 255.0

    boxes = tf.concat(features['faces']['bbox'], axis=0)
    # swap each coordinate pair: the rest of the notebook wants (xmin, ymin, xmax, ymax)
    boxes = tf.gather(boxes, indices=[1, 0, 3, 2], axis=1)
    return scaled, boxes

In [None]:
# train = train.shuffle(buffer_size=128).map(preprocess).batch(32)

# train.take(1)

In [None]:
# train = train.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
for ex in train.take(1):
    print(preprocess(ex)[1])

# 2. Model

## 2.1 Pretrained Model (Backbone)

In [4]:

input_shape = (640, 640, 3)

backbone_model = tf.keras.applications.ResNet50(weights='imagenet',
                                                input_shape=input_shape, 
                                                include_top=False)
#backbone_model.trainable = False

inputs = tf.keras.Input(shape=input_shape)
# We make sure that the base_model is running in inference mode here,
# by passing `training=False`. This is important for fine-tuning, as you will
# learn in a few paragraphs.
x = backbone_model(inputs, training=False)


A local file was found, but it seems to be incomplete or outdated because the auto file hash does not match the original value of 4d473c1dd8becc155b73f8504c6f6626 so we will re-download the data.
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [7]:
# to get this to work for me on windows i needed to conda install pydotplus and python-graphviz then
#change the path in pydotplut/graphviz.py as in https://datascience.stackexchange.com/questions/37428/graphviz-not-working-when-imported-inside-pydotplus-graphvizs-executables-not/61840#61840?newreg=fff7d63feca2460ab2062f1a9c450adb
tf.keras.utils.plot_model(backbone_model, show_shapes=True, show_layer_names=True, to_file='object_detection/images/resnet50.png');

In [8]:
backbone_model.summary()

Model: "resnet50"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 640, 640, 3) 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 646, 646, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 320, 320, 64) 9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, 320, 320, 64) 256         conv1_conv[0][0]                 
___________________________________________________________________________________________

## 2.2 Custom Model

In [None]:
# Subclassed model: every layer is created once in __init__ and the backbone is wrapped
# in a multi-output feature extractor so `call` only ever touches real tensors.
tf.keras.backend.clear_session()


class RetinaFace(tf.keras.Model):
    """ResNet152 + FPN (P2-P6) with classification/box heads and detection filtering.

    The `ClassificationSubnet`, `RegressionSubnet`, `Anchors`, `RegressBoxes`, `ClipBoxes`
    and `FilterDetections` cells of this notebook must have been run before this class
    is instantiated.

    Args:
        input_shape: (height, width, channels) of the input images. The anchors are
            generated with the `Anchors` defaults, which assume a 640 px resolution.
        nms, class_specific_filter, nms_threshold, score_threshold, max_detections,
        parallel_iterations: forwarded unchanged to `FilterDetections`.
    """

    def __init__(self, input_shape=(640, 640, 3), nms=True, class_specific_filter=True,
                 nms_threshold=0.5, score_threshold=0.05, max_detections=300,
                 parallel_iterations=32):
        super(RetinaFace, self).__init__()

        self.backbone_model = tf.keras.applications.ResNet152(weights='imagenet',
                                                              input_shape=input_shape,
                                                              include_top=False)
        self.backbone_model.trainable = False # freeze all layers of ResNet

        # One functional model returning the intermediate stage outputs plus the final
        # output; `layer.output` is symbolic and cannot be consumed directly inside `call`.
        stage_names = ('conv2_block3_out', 'conv3_block8_out', 'conv4_block36_out')
        stage_outputs = [self.backbone_model.get_layer(name).output for name in stage_names]
        self.feature_extractor = tf.keras.Model(inputs=self.backbone_model.input,
                                                outputs=stage_outputs + [self.backbone_model.output],
                                                name='backbone_features')

        # lateral 1x1 convolutions
        self.p2_lateral_conv = Conv2D(256, kernel_size=(1, 1), strides=1, padding='same', activation=None, name='p2_lateral_conv')
        self.p3_lateral_conv = Conv2D(256, kernel_size=(1, 1), strides=1, padding='same', activation=None, name='p3_lateral_conv')
        self.p4_lateral_conv = Conv2D(256, kernel_size=(1, 1), strides=1, padding='same', activation=None, name='p4_lateral_conv')
        self.p5_lateral_conv = Conv2D(256, kernel_size=(1, 1), strides=1, padding='same', activation=None, name='p5_lateral_conv')
        # glorot same as xavier
        self.p6_lateral_conv = Conv2D(256, kernel_size=(3, 3), strides=2, padding='same', activation=None, kernel_initializer='glorot_normal', name='p6_lateral_conv')

        # top-down pathway: upsample, add to the lateral branch, smooth with a 3x3 conv
        self.p5_upsampled = UpSampling2D(size=(2, 2), name='p5_upsampled')
        self.p4_upsampled = UpSampling2D(size=(2, 2), name='p4_upsampled')
        self.p3_upsampled = UpSampling2D(size=(2, 2), name='p3_upsampled')
        self.p4_add = Add(name='p4_add')
        self.p3_add = Add(name='p3_add')
        self.p2_add = Add(name='p2_add')
        self.p4_conv_out = Conv2D(256, kernel_size=(3, 3), strides=1, padding='same', activation=None, name='p4_conv_out')
        self.p3_conv_out = Conv2D(256, kernel_size=(3, 3), strides=1, padding='same', activation=None, name='p3_conv_out')
        self.p2_conv_out = Conv2D(256, kernel_size=(3, 3), strides=1, padding='same', activation=None, name='p2_conv_out')

        # one head per pyramid level (P2..P6); K=2 since face or not face
        n_levels = 5
        self.classification_heads = [ClassificationSubnet(K=2, A=3, prior=.01) for _ in range(n_levels)]
        self.regression_heads = [RegressionSubnet(n_landmarks=4, A=3) for _ in range(n_levels)]
        self.concat_classification = Concatenate(axis=1, name='classification_outputs')
        self.concat_regression = Concatenate(axis=1, name='regression_outputs')

        # anchors are constant, so build them once; the leading axis broadcasts over the batch
        self.anchors = tf.expand_dims(Anchors().generate_all_anchors(), axis=0)

        self.regress_boxes = RegressBoxes(name='boxes')
        self.clip_boxes = ClipBoxes(name='clipped_boxes')
        self.filter_detections = FilterDetections(nms                   = nms,
                                                  class_specific_filter = class_specific_filter,
                                                  name                  = 'filtered_detections',
                                                  nms_threshold         = nms_threshold,
                                                  score_threshold       = score_threshold,
                                                  max_detections        = max_detections,
                                                  parallel_iterations   = parallel_iterations)

    def call(self, inputs):
        """Run backbone -> FPN -> heads -> box decoding -> filtering on a batch of images."""
        # the backbone is frozen, so always run it in inference mode
        c2, c3, c4, c5 = self.feature_extractor(inputs, training=False)

        p6 = self.p6_lateral_conv(c5)
        p5 = self.p5_lateral_conv(c5)
        p4 = self.p4_conv_out(self.p4_add([self.p4_lateral_conv(c4), self.p5_upsampled(p5)]))
        p3 = self.p3_conv_out(self.p3_add([self.p3_lateral_conv(c3), self.p4_upsampled(p4)]))
        p2 = self.p2_conv_out(self.p2_add([self.p2_lateral_conv(c2), self.p3_upsampled(p3)]))

        features = [p2, p3, p4, p5, p6]

        classification_outputs = self.concat_classification(
            [head(feature) for head, feature in zip(self.classification_heads, features)])
        # bounding box regression
        regression_outputs = self.concat_regression(
            [head(feature) for head, feature in zip(self.regression_heads, features)])

        # apply predicted regression to anchors, then keep the boxes inside the image
        boxes = self.regress_boxes([self.anchors, regression_outputs])
        boxes = self.clip_boxes([inputs, boxes])

        return self.filter_detections([boxes, classification_outputs])

    # TODO: focal loss and a custom train_step are still to be written.


model = RetinaFace()

In [None]:
for ex in train.take(1):
    img, bboxes = preprocess(ex)
    img = tf.expand_dims(img, 0)
    detections = model(img)

In [None]:
detections

Output of FPN

In [None]:
# test
for ex in train.take(1):
    img, bboxes = preprocess(ex)
    features = model(tf.expand_dims(img, 0))
    
for f in features:
    print(f.shape)

In [None]:
## 2. 1 Pretrained Model

In [None]:
class ClassificationSubnet(tf.keras.layers.Layer):
    """Classification Subnet on top of FPN.

    Args:
        K: number of classes scored per anchor.
        A: number of anchors per spatial location.
        prior: probability handed to `PriorProbability` for the bias of the last conv.
        **kwargs: forwarded to `tf.keras.layers.Layer` (e.g. ``name``).
    """

    def __init__(self, K, A, prior, **kwargs):
        super(ClassificationSubnet, self).__init__(**kwargs)
        self.K = K
        self.A = A
        self.prior = prior

        def _kernel_init():
            # a fresh initializer per layer so unseeded instances are not shared
            return tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None)

        # Layers live on the instance so their weights are created once and reused on
        # every call (building them inside `call` re-initialises them each time).
        self.tower = [Conv2D(256,
                             kernel_size=(3, 3),
                             strides=1,
                             padding='same',
                             kernel_initializer=_kernel_init(),
                             bias_initializer='zeros',
                             activation='relu') for _ in range(4)]

        # No ReLU here: the sigmoid must see raw logits, otherwise every score is >= 0.5
        # and the prior-probability bias has no effect.
        self.logits = Conv2D(self.K*self.A,
                             kernel_size=(3, 3),
                             strides=1,
                             padding='same',
                             kernel_initializer=_kernel_init(),
                             bias_initializer=PriorProbability(probability=self.prior),
                             activation=None)  # (w x h x C) -> (w x h x (K*A))

        self.to_anchors = tf.keras.layers.Reshape((-1, self.K))  # (w x h x (K*A)) -> (w*h*A, K)
        self.sigmoid = tf.keras.layers.Activation('sigmoid')

    def call(self, inputs):
        """Return per-anchor class scores of shape (batch, w*h*A, K)."""
        outputs = inputs
        for conv in self.tower:
            outputs = conv(outputs)
        outputs = self.logits(outputs)
        outputs = self.to_anchors(outputs)
        return self.sigmoid(outputs)

In [None]:
# # test
for ex in train.take(1):
    img, bboxes = preprocess(ex)
    classification_outputs, _ = model(tf.expand_dims(img, 0))
    
total=0
for o in classification_outputs:
    print(o.shape)
    total+=o.shape.as_list()[1]
    
print(f'\nTotal: {total} anchors')

In [None]:
# OR... (notice it matches the paper)
total=0
for x in [160, 80, 40, 20, 10]:
    total += (x**2)*3
print(total)


In [None]:
class RegressionSubnet(tf.keras.layers.Layer):
    """Regression Subnet on top of FPN.

    Args:
        n_landmarks: number of values regressed per anchor (4 for a box).
        A: number of anchors per spatial location.
        **kwargs: forwarded to `tf.keras.layers.Layer` (e.g. ``name``).
    """

    def __init__(self, n_landmarks, A, **kwargs):
        super(RegressionSubnet, self).__init__(**kwargs)
        self.n_landmarks = n_landmarks
        self.A = A

        def _kernel_init():
            # a fresh initializer per layer so unseeded instances are not shared
            return tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None)

        # Layers live on the instance so their weights are created once and reused on
        # every call (building them inside `call` re-initialises them each time).
        self.tower = [Conv2D(256,
                             kernel_size=(3, 3),
                             strides=1,
                             padding='same',
                             kernel_initializer=_kernel_init(),
                             bias_initializer='zeros',
                             activation='relu') for _ in range(4)]

        # Linear output: regression deltas must be able to take negative values.
        self.deltas = Conv2D(self.n_landmarks*self.A,
                             kernel_size=(3, 3),
                             strides=1,
                             padding='same',
                             kernel_initializer=_kernel_init(),
                             bias_initializer='zeros',
                             activation=None)  # (w x h x C) -> (w x h x (n_landmarks*A))

        # (w x h x (n_landmarks*A)) -> (w*h*A, n_landmarks)
        self.to_anchors = tf.keras.layers.Reshape((-1, self.n_landmarks))

    def call(self, inputs):
        """Return per-anchor regression values of shape (batch, w*h*A, n_landmarks)."""
        outputs = inputs
        for conv in self.tower:
            outputs = conv(outputs)
        outputs = self.deltas(outputs)
        return self.to_anchors(outputs)

In [None]:
for ex in train.take(1):
    # preprocess takes the raw feature dict (`ex`), not an already-processed image
    img, bboxes = preprocess(ex)
    _, regression_outputs = model(tf.expand_dims(img, 0))
    
for o in regression_outputs:
    print(o.shape)

# 3. Anchor Boxes

## 3.1 Create Base Anchors

In [None]:
#sizes = [32, 64, 128, 256] # RetinaNet
sizes = [16, 32, 64, 128, 256] # RetinaFace since trying to capture smaller faces
strides = [4, 8, 16, 32, 64]  # one stride per entry of `sizes`
#ratios  = [0.5, 1, 2] # RetinaNet
ratios = [1] # RetinaFace since most faces are squares
scales = [2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)]

In [None]:
num_anchors = len(ratios) * len(scales)
num_anchors

anchors = np.zeros((num_anchors, 4))
print(f'{anchors.shape[0]} anchors x {anchors.shape[1]} coordinates')

In [None]:
base_size = 16
base_size * np.tile(scales, (2, len(ratios))).T

In [None]:
anchors[:, 2:] = base_size * np.tile(scales, (2, len(ratios))).T
print(anchors)

In [None]:
areas = anchors[:, 2] * anchors[:, 3]
areas

In [None]:
np.repeat(ratios, len(scales))

In [None]:
anchors[:, 2] = np.sqrt(areas / np.repeat(ratios, len(scales)))

print(anchors)

In [None]:
anchors[:, 3] = anchors[:, 2] * np.repeat(ratios, len(scales))

print(anchors)

In [None]:
# transform from (x_ctr, y_ctr, w, h) -> (x1, y1, x2, y2)
anchors[:, 0::2] -= np.tile(anchors[:, 2] * 0.5, (2, 1)).T
anchors[:, 1::2] -= np.tile(anchors[:, 3] * 0.5, (2, 1)).T

print(anchors)

## 3.2 Create Bounding Boxes to Regress Against (ex: P2)

In [None]:
#tf.squeeze removes the batch size index
p2_features = tf.squeeze(features[0], axis=0)
print('p2_features shape:', p2_features.shape)

p2_anchors = {'size': sizes[0], 'stride': strides[0], 'ratios': ratios, 'scales': scales}
print('p2_anchors:', p2_anchors)


**tf.keras.backend.shift**

In [None]:
# notice these are the center pixels
stride = p2_anchors['stride']

shift_x = (tf.keras.backend.arange(0, p2_features.shape[1], dtype=tf.float32) + tf.keras.backend.constant(0.5, dtype=tf.float32)) * stride
shift_y = (tf.keras.backend.arange(0, p2_features.shape[0], dtype=tf.float32) + tf.keras.backend.constant(0.5, dtype=tf.float32)) * stride

# Build the full 2-D grid of centers: meshgrid needs BOTH axes and returns one tensor
# per axis. (Calling it with a single vector only yields the diagonal centers.)
shift_x, shift_y = tf.meshgrid(shift_x, shift_y)

# flatten to 1-D: one (x, y) pair per feature-map cell, so no extra tiling is
# needed afterwards to reach height*width centers
shift_x = tf.keras.backend.reshape(shift_x, [-1])
shift_y = tf.keras.backend.reshape(shift_y, [-1])

#notice this will be the same for shift_y since the height is equal to the width in our input_shape as in RetinaNet
print(f'Placing the anchor boxes every {stride} strides on a box of width {p2_features.shape[0]} (output of p2) yields:\n')
print(shift_x)

In [None]:
shifts = tf.keras.backend.stack([shift_x, shift_y, shift_x, shift_y], axis=0)
shifts = tf.keras.backend.transpose(shifts)

print(shifts.shape)
print(shifts[:][:5])

In [None]:
n_anchors = tf.keras.backend.shape(anchors)[0]
print(f'{n_anchors} distinct anchors ({len(ratios)} aspect ratios * {len(scales)} scales)')

k = tf.keras.backend.shape(shifts)[0]
print(f'k={k} (center pixels on low resolution images which anchors will repeat on)')

In [None]:

anchors = tf.cast(anchors, dtype=tf.float32)
shifts = tf.cast(shifts, dtype=tf.float32)

shifted_anchors = tf.keras.backend.reshape(anchors, [1, n_anchors, 4]) + tf.keras.backend.reshape(shifts, [k, 1, 4])
shifted_anchors = tf.keras.backend.reshape(shifted_anchors, [k * n_anchors, 4])

print('Given our initial anchor box shapes, we can add them to each of our coordinates or shift according to the stride')
print(f'{n_anchors} anchors at {shifts.shape[0]} centers/shifts = {shifted_anchors.shape[0]} total anchors for P2 with {shifted_anchors.shape[1]} coordinates each corresponding to ()\n')
print(shifted_anchors)

In [None]:
tf.keras.backend.tile(tf.keras.backend.expand_dims(shifted_anchors, axis=0), (160, 1, 1))

In [None]:
# draw them on image

## 3.3 Modularize

In [None]:
class Anchors(tf.keras.layers.Layer):
    """Create anchor boxes for Object Detection"""

    def __init__(self, resolution=None, sizes=None, strides=None, ratios=None, scales=None, **kwargs):
        """Initialize parameters for Anchor Boxes.

        Args:
            resolution: side length of the (square) input image; defaults to 640.
            sizes: base anchor size per FPN level; defaults to [16, 32, 64, 128, 256].
            strides: stride per FPN level; defaults to [4, 8, 16, 32, 64].
            ratios: aspect ratios applied at every level; defaults to [1].
            scales: scale multipliers applied at every level; defaults to 3 octave scales.
            **kwargs: forwarded to `tf.keras.layers.Layer`.
        """
        super(Anchors, self).__init__(**kwargs)

        # Use the supplied value when given, otherwise fall back to the default.
        # (Previously a supplied value was dropped and the attribute was never set.)
        # strides and sizes align with FPN feature outputs (p2-pn)
        self.resolution = resolution if resolution else 640
        self.sizes = sizes if sizes else [16, 32, 64, 128, 256]
        self.strides = strides if strides else [4, 8, 16, 32, 64]
        # ratios and scales applied to all feature levels from FPN output
        # ([1] is used in RetinaFace since faces are typically square-like)
        self.ratios = ratios if ratios else [1]
        self.scales = scales if scales else [2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)]

        self.n_anchors = len(self.ratios) * len(self.scales)

    def generate_feature_level_base_anchors(self, size):
        """Create K anchors boxes centered on origin for a particular FPN feature level"""
        anchors = np.zeros((self.n_anchors, 4))
        # scale base size at different scales
        anchors[:, 2:] = size * np.tile(self.scales, (2, len(self.ratios))).T
        # get different combinations of aspect ratios
        areas = anchors[:, 2] * anchors[:, 3]
        anchors[:, 2] = np.sqrt(areas / np.repeat(self.ratios, len(self.scales)))
        anchors[:, 3] = anchors[:, 2] * np.repeat(self.ratios, len(self.scales))

        # transform from (x_ctr, y_ctr, w, h) -> (x1, y1, x2, y2)
        anchors[:, 0::2] -= np.tile(anchors[:, 2] * 0.5, (2, 1)).T
        anchors[:, 1::2] -= np.tile(anchors[:, 3] * 0.5, (2, 1)).T

        return anchors

    def shift_and_duplicate(self, anchors, stride):
        """Generate bounding boxes by duplicating FPN base anchors every s strides.

        Returns a float32 tensor of shape (feature_size**2 * n_anchors, 4), ordered
        row by row over the feature map with the anchors of one cell kept together.
        """
        feature_size = int(np.round(self.resolution/stride))

        # cell centers along one axis; the grid is square so x and y share them
        centers = (tf.range(0, feature_size, dtype=tf.float32) + 0.5) * stride

        # Full 2-D grid of centers: meshgrid needs both axes and returns one tensor per
        # axis (a single-vector call would only give the diagonal centers).
        shift_x, shift_y = tf.meshgrid(centers, centers)
        shift_x = tf.reshape(shift_x, [-1])
        shift_y = tf.reshape(shift_y, [-1])

        # (k, 4) offsets for (x1, y1, x2, y2), with k = feature_size**2
        shifts = tf.stack([shift_x, shift_y, shift_x, shift_y], axis=1)

        anchors = tf.convert_to_tensor(anchors, dtype=tf.float32)

        # broadcast: (1, A, 4) + (k, 1, 4) -> (k, A, 4)
        shifted_anchors = tf.reshape(anchors, [1, self.n_anchors, 4]) + tf.reshape(shifts, [-1, 1, 4])
        return tf.reshape(shifted_anchors, [(feature_size**2)*self.n_anchors, 4])

    def generate_all_anchors(self):
        """Generate all anchor boxes for every level of the pyramid"""
        self.feature_sizes = [int(np.round(self.resolution/stride)) for stride in self.strides]

        all_anchors = [self.generate_feature_level_base_anchors(size=size) for size in self.sizes]
        all_anchors = [self.shift_and_duplicate(layer_anchors, stride) for layer_anchors, stride in zip(all_anchors, self.strides)]
        all_anchors = tf.concat(all_anchors, axis=0)

        return all_anchors

In [None]:
anchors = Anchors()

In [None]:
# test it for another set of features to make sure it is working
p3_anchors = anchors.generate_feature_level_base_anchors(size=anchors.sizes[1])
p3_anchors = anchors.shift_and_duplicate(p3_anchors, anchors.strides[1])
p3_anchors.shape

In [None]:
#generate all anchors
all_anchors = anchors.generate_all_anchors()
all_anchors.shape

In [None]:
all_anchors

## 4. Loss
The loss regresses the transformations (deltas) from the anchor boxes to the ground-truth boxes (annotations).
So first we need two functions:
* 1. Get target transformations: the deltas between the ground-truth boxes (annotations) and the anchor boxes
* 2. Get predicted boxes from the predicted transformations/deltas and the anchor boxes

In [None]:
def compute_gt_delta(anchors, gt_boxes, mean=0.0, std=0.2):
    """Compute ground-truth transformations from annotations and anchor boxes.

    Args:
        anchors: (..., 4) boxes as (x1, y1, x2, y2).
        gt_boxes: (..., 4) ground-truth boxes, one per anchor, same layout.
        mean: scalar or length-4 value subtracted from the raw deltas.
        std: scalar or length-4 value the shifted deltas are divided by.

    Returns:
        Tensor of shape (..., 4) holding the normalised (dx1, dy1, dx2, dy2).
    """
    anchor_widths  = anchors[..., 2] - anchors[..., 0]
    anchor_heights = anchors[..., 3] - anchors[..., 1]

    # According to the information provided by a keras-retinanet author, they got marginally better results using
    # the following way of bounding box parametrization.
    # See https://github.com/fizyr/keras-retinanet/issues/1273#issuecomment-585828825 for more details
    targets_dx1 = (gt_boxes[..., 0] - anchors[..., 0]) / anchor_widths
    targets_dy1 = (gt_boxes[..., 1] - anchors[..., 1]) / anchor_heights
    targets_dx2 = (gt_boxes[..., 2] - anchors[..., 2]) / anchor_widths
    targets_dy2 = (gt_boxes[..., 3] - anchors[..., 3]) / anchor_heights

    # Stack on a new last axis so each row is one box. Concatenating on axis 0 would
    # give a flat vector, and tensors have no `.T` attribute to transpose it.
    targets = tf.stack([targets_dx1, targets_dy1, targets_dx2, targets_dy2], axis=-1)

    targets = (targets - mean) / std
    return targets



In [None]:
def compute_pred_boxes(deltas, anchors, mean=0.0, std=0.2):
    """Get Predicted Boxes from predicted deltas and anchors.

    Args:
        deltas: (batch, N, 4) predicted normalised (dx1, dy1, dx2, dy2).
        anchors: (batch or 1, N, 4) anchor boxes as (x1, y1, x2, y2).
        mean: scalar or length-4 mean used when the targets were normalised.
        std: scalar or length-4 std used when the targets were normalised.

    Returns:
        Tensor of shape (batch, N, 4) with the decoded (x1, y1, x2, y2) boxes.
    """
    # Accept scalars (the defaults) as well as per-coordinate vectors; indexing a
    # plain float with [i] raises TypeError.
    mean = np.broadcast_to(np.asarray(mean, dtype=np.float32), (4,))
    std = np.broadcast_to(np.asarray(std, dtype=np.float32), (4,))

    #first dimension is the batch size
    width  = anchors[:, :, 2] - anchors[:, :, 0]
    height = anchors[:, :, 3] - anchors[:, :, 1]

    x1 = anchors[:, :, 0] + (deltas[:, :, 0] * std[0] + mean[0]) * width
    y1 = anchors[:, :, 1] + (deltas[:, :, 1] * std[1] + mean[1]) * height
    x2 = anchors[:, :, 2] + (deltas[:, :, 2] * std[2] + mean[2]) * width
    y2 = anchors[:, :, 3] + (deltas[:, :, 3] * std[3] + mean[3]) * height

    pred_boxes = tf.keras.backend.stack([x1, y1, x2, y2], axis=2)

    return pred_boxes

In [None]:
class RegressBoxes(tf.keras.layers.Layer):
    """ Keras layer for applying regression values to boxes.
    """

    def __init__(self, mean=None, std=None, *args, **kwargs):
        """ Initializer for the RegressBoxes layer.
        Args
            mean: The mean value of the regression values which was used for normalization.
            std: The standard value of the regression values which was used for normalization.
        Raises
            ValueError: if mean or std is not a np.ndarray, list or tuple.
        """
        # initialise the Keras layer before any attribute is assigned
        super(RegressBoxes, self).__init__(*args, **kwargs)

        if mean is None:
            mean = np.array([0, 0, 0, 0])
        if std is None:
            std = np.array([0.2, 0.2, 0.2, 0.2])

        if isinstance(mean, (list, tuple)):
            mean = np.array(mean)
        elif not isinstance(mean, np.ndarray):
            raise ValueError('Expected mean to be a np.ndarray, list or tuple. Received: {}'.format(type(mean)))

        if isinstance(std, (list, tuple)):
            std = np.array(std)
        elif not isinstance(std, np.ndarray):
            raise ValueError('Expected std to be a np.ndarray, list or tuple. Received: {}'.format(type(std)))

        self.mean = mean
        self.std  = std

    def call(self, inputs, **kwargs):
        """Decode `[anchors, regression_outputs]` into predicted boxes."""
        anchors, regression_outputs = inputs
        # Applying deltas to anchors is the decoding step, i.e. compute_pred_boxes
        # (note its argument order: deltas first). compute_gt_delta is the encoder.
        return compute_pred_boxes(regression_outputs, anchors, mean=self.mean, std=self.std)

    def compute_output_shape(self, input_shape):
        return input_shape[0]

    def get_config(self):
        config = super(RegressBoxes, self).get_config()
        config.update({
            'mean': self.mean.tolist(),
            'std' : self.std.tolist(),
        })

        return config

In [None]:
class ClipBoxes(tf.keras.layers.Layer):
    """ Keras layer to clip box values to lie inside a given shape.
    """
    def call(self, inputs, **kwargs):
        """Clip `boxes` (x1, y1, x2, y2) to the bounds of `image`.

        Args
            inputs: `[image, boxes]`; image is treated as channels-last
                (batch, height, width, channels), matching the (640, 640, 3)
                input shape used in this notebook.
        """
        image, boxes = inputs
        # cast so the bounds share the boxes' dtype (tf.shape yields int32)
        image_shape = tf.cast(tf.shape(image), boxes.dtype)
        # channels-last layout: axis 1 is the height, axis 2 the width
        _, height, width, _ = tf.unstack(image_shape, axis=0)

        x1, y1, x2, y2 = tf.unstack(boxes, axis=-1)
        x1 = tf.clip_by_value(x1, 0, width  - 1)
        y1 = tf.clip_by_value(y1, 0, height - 1)
        x2 = tf.clip_by_value(x2, 0, width  - 1)
        y2 = tf.clip_by_value(y2, 0, height - 1)

        return tf.stack([x1, y1, x2, y2], axis=-1)

    def compute_output_shape(self, input_shape):
        return input_shape[1]

In [None]:
def filter_detections(
    boxes,
    classification,
    other                 = [],
    class_specific_filter = True,
    nms                   = True,
    score_threshold       = 0.05,
    max_detections        = 300,
    nms_threshold         = 0.5
):
    """ Filter detections using the boxes and classification values.
    Args
        boxes                 : Tensor of shape (num_boxes, 4) containing the boxes in (x1, y1, x2, y2) format.
        classification        : Tensor of shape (num_boxes, num_classes) containing the classification scores.
        other                 : List of tensors of shape (num_boxes, ...) to filter along with the boxes and classification scores.
        class_specific_filter : Whether to perform filtering per class, or take the best scoring class and filter those.
        nms                   : Flag to enable/disable non maximum suppression.
        score_threshold       : Threshold used to prefilter the boxes with.
        max_detections        : Maximum number of detections to keep.
        nms_threshold         : Threshold for the IoU value to determine when a box should be suppressed.
    Returns
        A list of [boxes, scores, labels, other[0], other[1], ...].
        boxes is shaped (max_detections, 4) and contains the (x1, y1, x2, y2) of the non-suppressed boxes.
        scores is shaped (max_detections,) and contains the scores of the predicted class.
        labels is shaped (max_detections,) and contains the predicted label.
        other[i] is shaped (max_detections, ...) and contains the filtered other[i] data.
        In case there are less than max_detections detections, the tensors are padded with -1's.
    """
    def _filter_detections(scores, labels):
        # threshold based on score
        indices = tf.where(tf.math.greater(scores, score_threshold))

        if nms:
            filtered_boxes  = tf.gather_nd(boxes, indices)
            filtered_scores = tf.gather(scores, indices)[:, 0]

            # perform NMS
            nms_indices = tf.image.non_max_suppression(filtered_boxes, filtered_scores, max_output_size=max_detections, iou_threshold=nms_threshold)

            # filter indices based on NMS
            indices = tf.gather(indices, nms_indices)

        # add indices to list of all indices
        labels = tf.gather_nd(labels, indices)
        indices = tf.stack([indices[:, 0], labels], axis=1)

        return indices

    if class_specific_filter:
        all_indices = []
        # perform per class filtering
        for c in range(int(classification.shape[1])):
            scores = classification[:, c]
            labels = c * tf.ones((tf.shape(scores)[0],), dtype='int64')
            all_indices.append(_filter_detections(scores, labels))

        # concatenate indices to single tensor (the TF op is `tf.concat`)
        indices = tf.concat(all_indices, axis=0)
    else:
        scores  = tf.keras.backend.max(classification, axis    = 1)
        labels  = tf.math.argmax(classification, axis = 1)
        indices = _filter_detections(scores, labels)

    # select top k (the TF op lives under `tf.math`)
    scores              = tf.gather_nd(classification, indices)
    labels              = indices[:, 1]
    scores, top_indices = tf.math.top_k(scores, k=tf.math.minimum(max_detections, tf.shape(scores)[0]))

    # filter input using the final set of indices
    indices             = tf.gather(indices[:, 0], top_indices)
    boxes               = tf.gather(boxes, indices)
    labels              = tf.gather(labels, top_indices)
    other_              = [tf.gather(o, indices) for o in other]

    # pad the outputs with -1 up to max_detections rows
    pad_size = tf.math.maximum(0, max_detections - tf.shape(scores)[0])
    boxes    = tf.pad(boxes, [[0, pad_size], [0, 0]], constant_values=-1)
    scores   = tf.pad(scores, [[0, pad_size]], constant_values=-1)
    labels   = tf.pad(labels, [[0, pad_size]], constant_values=-1)
    labels   = tf.cast(labels, 'int32')
    other_   = [tf.pad(o, [[0, pad_size]] + [[0, 0] for _ in range(1, len(o.shape))], constant_values=-1) for o in other_]

    # set shapes, since we know what they are
    boxes.set_shape([max_detections, 4])
    scores.set_shape([max_detections])
    labels.set_shape([max_detections])
    # static shape of each extra tensor, read without relying on a standalone `keras` import
    for o, s in zip(other_, [list(o.shape) for o in other]):
        o.set_shape([max_detections] + s[1:])

    return [boxes, scores, labels] + other_


class FilterDetections(tf.keras.layers.Layer):
    """ Keras layer that filters detections with a score threshold, optional NMS and a top-k cut.

    The actual filtering is delegated to `filter_detections`, which this layer
    maps over the batch dimension of its inputs.
    """

    def __init__(
        self,
        nms                   = True,
        class_specific_filter = True,
        nms_threshold         = 0.5,
        score_threshold       = 0.05,
        max_detections        = 300,
        parallel_iterations   = 32,
        **kwargs
    ):
        """ Stores the filtering parameters and forwards the remaining kwargs to the base layer.
        Args
            nms                   : Flag to enable/disable NMS.
            class_specific_filter : Whether to perform filtering per class, or take the best scoring class and filter those.
            nms_threshold         : Threshold for the IoU value to determine when a box should be suppressed.
            score_threshold       : Threshold used to prefilter the boxes with.
            max_detections        : Maximum number of detections to keep.
            parallel_iterations   : Number of batch items to process in parallel.
            **kwargs              : Passed through unchanged to `tf.keras.layers.Layer.__init__`.
        """
        # Filtering behaviour.
        self.nms                   = nms
        self.class_specific_filter = class_specific_filter
        self.nms_threshold         = nms_threshold
        self.score_threshold       = score_threshold
        self.max_detections        = max_detections
        # Execution behaviour of the batch-wise map.
        self.parallel_iterations   = parallel_iterations
        super().__init__(**kwargs)

    def call(self, inputs, **kwargs):
        """ Constructs the NMS graph.
        Args
            inputs : List of [boxes, classification, other[0], other[1], ...] tensors.
        Returns
            The result of mapping `filter_detections` over the batch:
            [boxes, scores, labels, other[0], other[1], ...].
        """
        boxes, classification = inputs[0], inputs[1]
        other = inputs[2:]

        # Parameters shared by every per-item invocation.
        filter_options = dict(
            nms                   = self.nms,
            class_specific_filter = self.class_specific_filter,
            score_threshold       = self.score_threshold,
            max_detections        = self.max_detections,
            nms_threshold         = self.nms_threshold,
        )

        def _filter_single_item(args):
            # `args` mirrors the structure of `elems` below: [boxes, classification, other].
            item_boxes, item_classification, item_other = args
            return filter_detections(item_boxes, item_classification, item_other, **filter_options)

        # Output structure: boxes and scores in floatx, labels as int32,
        # followed by one entry per `other` tensor keeping its own dtype.
        float_type    = tf.keras.backend.floatx()
        output_dtypes = [float_type, float_type, 'int32']
        output_dtypes.extend(o.dtype for o in other)

        # Run the filter once per batch item.
        return tf.map_fn(
            _filter_single_item,
            elems=[boxes, classification, other],
            dtype=output_dtypes,
            parallel_iterations=self.parallel_iterations
        )

    def compute_output_shape(self, input_shape):
        """ Computes the output shapes given the input shapes.
        Args
            input_shape : List of input shapes [boxes, classification, other[0], other[1], ...].
        Returns
            List of tuples representing the output shapes:
            [filtered_boxes.shape, filtered_scores.shape, filtered_labels.shape, filtered_other[0].shape, filtered_other[1].shape, ...]
        """
        boxes_shape          = input_shape[0]
        classification_shape = input_shape[1]
        limit                = self.max_detections

        output_shapes = [
            (boxes_shape[0], limit, 4),          # filtered boxes
            (classification_shape[0], limit),    # filtered scores
            (classification_shape[0], limit),    # filtered labels
        ]

        # Every additional input keeps its batch and trailing dims; the second
        # axis is replaced by max_detections.
        for other_shape in input_shape[2:]:
            output_shapes.append((other_shape[0], limit) + tuple(other_shape[2:]))

        return output_shapes

    def compute_mask(self, inputs, mask=None):
        """ This is required in Keras when there is more than 1 output.

        Returns one `None` entry per output (len(inputs) + 1 of them).
        """
        return [None] * (len(inputs) + 1)

    def get_config(self):
        """ Gets the configuration of this layer.
        Returns
            Dictionary containing the parameters of this layer, merged into
            the base layer's configuration.
        """
        config = super().get_config()
        config.update(
            nms                   = self.nms,
            class_specific_filter = self.class_specific_filter,
            nms_threshold         = self.nms_threshold,
            score_threshold       = self.score_threshold,
            max_detections        = self.max_detections,
            parallel_iterations   = self.parallel_iterations,
        )
        return config

In [None]:
# Run `preprocess` on one example taken from the training dataset.
# NOTE(review): the loop variable is `ex`, yet `img` is what gets passed to
# preprocess(); `img` is not bound by this loop, so it either leaks in from an
# earlier cell or raises NameError on a fresh kernel. Presumably
# `preprocess(ex)` was intended — confirm against preprocess's signature.
for ex in train.take(1):
    img, bboxes = preprocess(img)