In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import (
    Add,
    Input,
    Conv2D,
    Lambda,
    MaxPool2D,
    LeakyReLU,
    Concatenate,
    UpSampling2D,
    ZeroPadding2D,
    BatchNormalization)
from tensorflow.keras import Model
from tensorflow.keras.regularizers import l2

In [2]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all
# up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # The memory-growth setting has to be identical on every visible GPU,
        # so configure all of them rather than only the first one.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        # Raised when the devices were already initialised (for example when
        # this cell is re-run on a live kernel). Keep going, but say why the
        # setting was not applied instead of hiding it.
        print(f'Could not enable GPU memory growth: {err}')

In [3]:
# ---- YOLO configuration -----------------------------------------------------
# Down-sampling factor of each of the three detection scales.
YOLO_STRIDES = [8, 16, 32]
YOLO_IOU_LOSS_THRESH = 0.5
YOLO_ANCHOR_PER_SCALE = 3
YOLO_MAX_BBOX_PER_SCALE = 100
YOLO_INPUT_SIZE = 416
# Three anchor pairs per scale, listed in the same order as YOLO_STRIDES.
YOLO_ANCHORS = [
    [[10, 13], [16, 30], [33, 23]],
    [[30, 61], [62, 45], [59, 119]],
    [[116, 90], [156, 198], [373, 326]],
]

STRIDES = np.array(YOLO_STRIDES)
# Express the anchors in grid-cell units: every anchor of scale i is divided
# by stride i (the stride vector is broadcast over the two trailing axes).
ANCHORS = np.array(YOLO_ANCHORS) / STRIDES[:, np.newaxis, np.newaxis]

### Backbone of YoloV3

In [4]:
class BatchNormalization(tf.keras.layers.BatchNormalization):
    """Batch normalization that runs in inference mode whenever it is frozen.

    The stock layer is wrapped so that the effective training flag is
    ``training AND self.trainable``: a layer whose ``trainable`` attribute is
    False never runs in training mode, even when the surrounding model does.

    The base class is spelled out as ``tf.keras.layers.BatchNormalization``
    (rather than the bare imported name this class rebinds) so that
    re-running the cell does not make the class inherit from its own
    previous definition.
    """

    def call(self, x, training=False):
        """Normalize ``x``.

        Args:
            x: input tensor.
            training: Python bool, boolean tensor, or None. None is treated
                as False.

        Returns:
            The output of the parent layer's ``call``.
        """
        # Test for None explicitly: truth-testing a symbolic tensor
        # (``not training``) is not allowed during graph construction.
        if training is None:
            training = False
        training = tf.logical_and(training, self.trainable)
        return super().call(x, training)


def DarknetConv(x, filters, size, downsample=False, activate=True, bn=True):
    """Apply one convolution unit: conv, then optional batch norm and LeakyReLU.

    Args:
        x: input tensor.
        filters: number of convolution filters.
        size: convolution kernel size.
        downsample: when True, pad one row on top and one column on the left
            and convolve with stride 2 and 'valid' padding; otherwise use
            stride 1 with 'same' padding.
        activate: when True, finish with LeakyReLU(alpha=0.1).
        bn: when True, apply BatchNormalization and drop the conv bias.

    Returns:
        The transformed tensor.
    """
    strides, padding = (2, 'valid') if downsample else (1, 'same')
    if downsample:
        x = ZeroPadding2D(((1, 0), (1, 0)))(x)

    conv = Conv2D(
        filters=filters,
        kernel_size=size,
        strides=strides,
        padding=padding,
        use_bias=not bn,  # the bias is redundant when batch norm follows
        kernel_regularizer=l2(0.0005),
        kernel_initializer=tf.random_normal_initializer(stddev=0.01),
        bias_initializer=tf.constant_initializer(0.),
    )
    out = conv(x)

    if bn:
        out = BatchNormalization()(out)
    if activate:
        out = LeakyReLU(alpha=0.1)(out)
    return out


def DarknetResidual(x, filters):
    """Residual unit: 1x1 conv to filters // 2, 3x3 conv to filters, plus skip.

    Args:
        x: input tensor; it is added to the branch output, so its shape must
            match a ``filters``-channel output.
        filters: number of filters of the 3x3 convolution.

    Returns:
        ``x`` added element-wise to the output of the two convolutions.
    """
    squeezed = DarknetConv(x, filters=filters // 2, size=1)
    expanded = DarknetConv(squeezed, filters=filters, size=3)
    return Add()([x, expanded])


def DarknetBlock(x, filters, blocks):
    """Stride-2 downsampling convolution followed by a stack of residual units.

    Args:
        x: input tensor.
        filters: number of filters used throughout the stage.
        blocks: how many residual units to stack after the downsampling conv.

    Returns:
        The output tensor of the last residual unit.
    """
    stage = DarknetConv(x, filters=filters, size=3, downsample=True)
    for _unit in range(blocks):
        stage = DarknetResidual(stage, filters)
    return stage


def Darknet(name=None):
    """Build the Darknet backbone as a Keras model.

    The model accepts a 3-channel image of any spatial size and returns three
    feature maps, taken after the 256-, 512- and 1024-filter stages.

    Args:
        name: optional name of the resulting model.

    Returns:
        A ``Model`` whose outputs are ``(x_36, x_61, x)``.
    """
    inputs = Input([None, None, 3])

    stem = DarknetConv(inputs, filters=32, size=3)
    # NOTE(review): the reference Darknet-53 layout is usually described with
    # a single residual unit in the 64-filter stage; 2 are used here. Confirm
    # this is intentional before loading weights trained on the reference.
    stage_64 = DarknetBlock(stem, 64, 2)
    stage_128 = DarknetBlock(stage_64, 128, 2)
    x_36 = DarknetBlock(stage_128, 256, 8)
    x_61 = DarknetBlock(x_36, 512, 8)
    top = DarknetBlock(x_61, 1024, 4)

    return Model(inputs, (x_36, x_61, top), name=name)


### YoloV3 model

In [5]:
def YoloConv(filters, name=None):
    """Create a callable that applies a five-convolution YOLO neck block.

    Args:
        filters: base filter count; the convolutions alternate between
            ``filters`` (1x1) and ``filters * 2`` (3x3).
        name: optional name of the sub-model wrapping the block.

    Returns:
        A function taking either a single tensor or a tuple
        ``(x, x_skip)``. For a tuple, ``x`` first goes through a 1x1
        convolution and a 2x upsampling and is then concatenated with
        ``x_skip`` before the five convolutions are applied.
    """
    # (filter count, kernel size) of the five stacked convolutions.
    conv_plan = (
        (filters, 1),
        (filters * 2, 3),
        (filters, 1),
        (filters * 2, 3),
        (filters, 1),
    )

    def yolo_conv(x_in):
        if isinstance(x_in, tuple):
            deep_in = Input(x_in[0].shape[1:])
            skip_in = Input(x_in[1].shape[1:])
            inputs = deep_in, skip_in

            # Bring the deeper map to the skip connection's resolution and
            # merge the two.
            upsampled = DarknetConv(deep_in, filters, 1)
            upsampled = UpSampling2D(2)(upsampled)
            x = Concatenate()([upsampled, skip_in])
        else:
            x = inputs = Input(x_in.shape[1:])

        for n_filters, kernel in conv_plan:
            x = DarknetConv(x, n_filters, kernel)
        return Model(inputs, x, name=name)(x_in)

    return yolo_conv


class YoloOutput(tf.keras.layers.Layer):
    """Decode one raw YOLO head tensor into boxes, objectness and class scores.

    The raw tensor is reshaped to
    ``(batch, grid_h, grid_w, 3, 5 + classes)`` and the last axis is decoded
    as ``[x, y, w, h, confidence, class scores...]``:

    * ``xy   = (sigmoid(dxdy) + cell_index) * strides``
    * ``wh   = exp(dwdh) * masks * strides``
    * ``conf = sigmoid(raw_conf)``, ``prob = sigmoid(raw_prob)``

    Args:
        classes: number of object classes.
        masks: anchor sizes for this scale; must broadcast against the
            ``(..., 3, 2)`` width/height tensor (e.g. an array of shape
            ``(3, 2)``).
        strides: scalar factor multiplied into the decoded xy and wh values.
        **kwargs: standard ``tf.keras.layers.Layer`` arguments such as
            ``name``.
    """

    def __init__(self, classes=80, masks=None, strides=None, **kwargs):
        super().__init__(**kwargs)
        self.classes = classes
        self.masks = masks
        self.strides = strides

    def call(self, x):
        """Decode the raw head output ``x`` (see the class docstring).

        Implemented as ``call`` (not ``__call__``) so that the Keras layer
        machinery - naming, tracking, functional-graph wiring - stays active.
        """
        # Index the dynamic shape instead of unpacking it: iterating over a
        # tensor is not allowed while a graph is being traced.
        shape = tf.shape(x)
        grid_h, grid_w = shape[1], shape[2]
        # 3 anchors per cell, matching the head's 3 * (classes + 5) channels.
        x_output = tf.reshape(x, (-1, grid_h, grid_w, 3, 5 + self.classes))
        dtype = x_output.dtype

        x_dxdy = x_output[..., 0:2]
        x_dwdh = x_output[..., 2:4]
        x_conf = x_output[..., 4:5]
        x_prob = x_output[..., 5:]

        # Cell indices: xy_grid[row, col] == (col, row). The leading batch
        # axis and the anchor axis are left at size 1 and broadcast below,
        # so no explicit tiling is needed.
        grid_x, grid_y = tf.meshgrid(tf.range(grid_w), tf.range(grid_h))
        xy_grid = tf.stack([grid_x, grid_y], axis=-1)
        xy_grid = tf.cast(xy_grid[tf.newaxis, :, :, tf.newaxis, :], dtype)

        anchors = tf.cast(self.masks, dtype)
        stride = tf.cast(self.strides, dtype)

        pred_xy = (tf.sigmoid(x_dxdy) + xy_grid) * stride
        pred_wh = tf.exp(x_dwdh) * anchors * stride

        pred_xywh = tf.concat([pred_xy, pred_wh], axis=-1)
        pred_conf = tf.sigmoid(x_conf)
        pred_prob = tf.sigmoid(x_prob)

        return tf.concat([pred_xywh, pred_conf, pred_prob], axis=-1)

    def get_config(self):
        """Return a serializable configuration so the layer can be re-created."""
        config = super().get_config()
        config.update({
            'classes': self.classes,
            # Arrays are stored as plain (nested) Python lists / scalars.
            'masks': (None if self.masks is None
                      else np.asarray(self.masks).tolist()),
            'strides': (None if self.strides is None
                        else np.asarray(self.strides).tolist()),
        })
        return config


def YoloV3(size=None, classes=80, training=False):
    """Assemble the full YOLOv3 model.

    Args:
        size: height and width of the square input image; None leaves the
            spatial dimensions unspecified.
        classes: number of object classes.
        training: when True, the raw head tensor of each scale is emitted
            right before its decoded prediction.

    Returns:
        A ``Model`` mapping the image to a list of tensors ordered from the
        stride-8 scale to the stride-32 scale. With ``training=False`` the
        list holds the three decoded predictions; with ``training=True`` each
        decoded prediction is preceded by its raw head tensor.
    """
    inputs = Input([size, size, 3], name='input')
    x_36, x_61, features = Darknet(name='yolo_darknet')(inputs)

    head_filters = 3 * (classes + 5)

    def raw_head(feature_map):
        # Plain 1x1 convolution: no batch norm, no activation.
        return DarknetConv(feature_map, filters=head_filters,
                           size=1, activate=False, bn=False)

    features = YoloConv(512, name='yolo_conv_0')(features)
    l_output = raw_head(features)

    features = YoloConv(256, name='yolo_conv_1')((features, x_61))
    m_output = raw_head(features)

    features = YoloConv(128, name='yolo_conv_2')((features, x_36))
    s_output = raw_head(features)

    output_tensors = []
    for scale, raw in enumerate((s_output, m_output, l_output)):
        decoded = YoloOutput(classes, masks=ANCHORS[scale],
                             strides=STRIDES[scale])(raw)
        output_tensors.extend([raw, decoded] if training else [decoded])

    return Model(inputs, output_tensors)


In [None]:
# Single-class model at the configured input resolution; training=True also
# exposes the raw head tensors.
m = YoloV3(size=YOLO_INPUT_SIZE, classes=1, training=True)

In [None]:
# Print the layer-by-layer overview of the assembled model.
m.summary()