In [1]:
from functools import reduce
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Add, BatchNormalization, Conv2D, 
                                     Concatenate, Input, ZeroPadding2D, 
                                     LeakyReLU, UpSampling2D)
from tensorflow.keras import backend as backend
from tensorflow.keras.regularizers import l2

In [2]:
def parse_cfg(cfgfile):
    """Parse a Darknet-style ``.cfg`` file into a list of section dictionaries.

    Every ``[section]`` header starts a new dictionary whose ``'type'`` entry
    holds the section name; each following ``key=value`` line is stored in
    that dictionary as strings with surrounding whitespace removed.

    Blank lines, whitespace-only lines and comment lines (first non-blank
    character is ``#``) are ignored.  Only the first ``=`` on a line separates
    key from value, so values may themselves contain ``=``.

    Parameters
    ----------
    cfgfile : str or path-like
        Path of the configuration file to read.

    Returns
    -------
    list of dict
        One dictionary per section, in file order.  An empty (or
        comment-only) file yields an empty list.

    Raises
    ------
    ValueError
        If a non-header line contains no ``=`` separator.
    """
    blocks = []
    holder = {}
    with open(cfgfile, 'r') as file:
        for lineno, raw_line in enumerate(file, start=1):
            # Strip first so indented comments, trailing spaces and
            # whitespace-only lines are all handled uniformly.
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue

            if line.startswith('['):
                # A new section begins: flush the previous one, if any.
                if holder:
                    blocks.append(holder)
                    holder = {}
                holder['type'] = line.strip('[]').strip()
                continue

            key, separator, value = line.partition('=')
            if not separator:
                raise ValueError(
                    "%s:%d: expected 'key=value', got %r" % (cfgfile, lineno, line))
            holder[key.strip()] = value.strip()

    if holder:
        blocks.append(holder)
    return blocks

In [3]:
def YOLOv3Net(cfgfile, model_size, num_classes):
    """Build a Keras model from a Darknet YOLOv3 configuration file.

    The cfg is parsed with ``parse_cfg`` and every block after the first one
    is translated, in order, into Keras/TensorFlow operations.  The output of
    each block is remembered so that later 'route' and 'shortcut' blocks can
    refer back to it by index.

    Parameters
    ----------
    cfgfile : str
        Path handed to ``parse_cfg``.
    model_size : tuple
        Shape passed to ``Input(shape=...)`` (batch dimension excluded).
        Axes 1 and 2 of the resulting input tensor are used as the spatial
        size when computing the grid stride, so this is presumably
        (height, width, channels) -- TODO confirm with callers.
    num_classes : int
        Number of classes; used to reshape each 'yolo' block input into
        rows of ``5 + num_classes`` values.  The 'classes' entry of the cfg
        itself is not read.

    Returns
    -------
    tensorflow.keras.Model
        Maps the raw input image (divided by 255 inside the model) to the
        predictions of all 'yolo' blocks concatenated along axis 1.  The last
        axis is laid out as [box centre (2), box size (2), confidence (1),
        class scores (num_classes)].

    Side effects
    ------------
    Prints ``model.summary()``.
    """
    
    blocks = parse_cfg(cfgfile)
    
    # outputs[i]        -> tensor produced by block i (for route / shortcut look-ups)
    # output_filters[i] -> filter count recorded for block i
    outputs = {}
    output_filters = []
    filters = []
    out_pred = []
    # Flag: 0 until the first 'yolo' block has been processed, 1 afterwards.
    scale = 0
    
    # Keep a handle on the untouched Input tensor for Model(...) below;
    # 'inputs' is the running tensor and starts as the image scaled by 1/255.
    inputs = input_image = Input(shape = model_size)
    inputs = inputs / 255.0
    
    # The YOLOv3 has 5 layers types in general, they are: 
    # 1. Convolutional layer
    # 2. Upsample layer
    # 3. Route layer
    # 4. Shortcut layer
    # 5. Yolo layer
    # blocks[0] is skipped -- presumably the [net] hyper-parameter section
    # (TODO confirm against the cfg); 'i' therefore counts layers from 0.
    for i, block in enumerate(blocks[1:]):
        
        # In YOLOv3, there are 2 convolutional layer types, i.e with and without batch normalization layer.
        if(block['type'] == 'convolutional'):
            
            activation = block['activation']
            filters = int(block['filters'])
            kernel_size = int(block['size'])
            strides = int(block['stride'])
            
            # If strides is greater than 1 then downsampling is performed.
            # Pad one row/column on the top and left only and use 'valid'
            # padding in the convolution below instead of 'same'.
            if strides > 1:
                inputs = ZeroPadding2D(((1, 0), (1, 0)))(inputs)
            
            # The bias is dropped when the block requests batch normalization.
            inputs = Conv2D(filters,
                            kernel_size,
                            strides=strides,
                            padding='valid' if strides > 1 else 'same',
                            name='conv_' + str(i),
                            use_bias=False if ("batch_normalize" in block) else True)(inputs)
            
            # Batch normalization is added when the block has a 'batch_normalize' key;
            # a Leaky ReLU is added when the block's activation is 'leaky'.
            # Any other activation value leaves the convolution output linear.
            if "batch_normalize" in block:
                inputs = BatchNormalization(name='bnorm_'+str(i))(inputs)
            if activation == 'leaky':
                inputs = LeakyReLU(alpha=0.1, name='leaky_'+str(i))(inputs)
        
        # The upsample block enlarges the previous feature map by a factor of 'stride'.
        # UpSampling2D is called with its default interpolation (Keras documents the
        # default as 'nearest', i.e. this is not bilinear upsampling).
        elif (block['type'] == 'upsample'):
            stride = int(block['stride'])
            inputs = UpSampling2D(stride)(inputs)
        
        # Route layer
        elif (block['type'] == 'route'):
            block['layers'] = block['layers'].split(',')
            start = int(block['layers'][0])
            # Check if attribute 'layers' has 1 value or 2
            # If it has 1 value such as -4, then we need to go backward 4 layers and then output
            # the feature map from that layer.
            # If it has 2 values such as -1 and 61, then we need to concatenate the feature map 
            # from a previous layer (-1) and the feature map from layer 61. 
            # The first value is always treated as relative to the current index i;
            # the second value is treated as an absolute index and converted to a
            # relative one by subtracting i.
            if len(block['layers']) > 1:
                end = int(block['layers'][1]) - i
                # NOTE(review): output_filters[end] uses 'end' directly while the tensor
                # look-up uses i + end.  Both hit the same layer only because
                # output_filters holds exactly i entries here and 'end' is negative.
                filters = output_filters[i + start] + output_filters[end]
                inputs = tf.concat([outputs[i + start], outputs[i + end]], axis = -1)
            else:
                filters = output_filters[i + start]
                inputs = outputs[i + start]
                
        # In Shortcut layer, we perform skip connection. If attribute 'from' has value -3, then
        # go backward 3 layers and take the feature map from that layer and add it to the feature
        # map from the previous layer.
        elif (block['type'] == 'shortcut'):
            from_ = int(block['from'])
            inputs = outputs[i - 1] + outputs[i + from_]
            
        # In Yolo layer, first we read the 'mask' and 'anchors' values and keep
        # only the anchor (width, height) pairs selected by the mask.
        elif (block['type'] == 'yolo'):
            mask = block['mask'].split(',')
            mask = [int(x) for x in mask]
            # The cfg's own 'classes' value is ignored; the num_classes argument is used.
            #num_classes = int(block['classes'])
            anchors = block['anchors'].split(',')
            anchors = [int(a) for a in anchors]
            anchors = [(anchors[j], anchors[j+1]) for j in range(0,len(anchors),2)]
            anchors = [anchors[j] for j in mask]
            # Reshape the YOLO output to the form of [None, B*grid_size*grid_size, 5+C]
            # where B is the number of anchors and C is the number of classes
            n_anchors = len(anchors)
            out_shape = inputs.get_shape().as_list()
            
            inputs = tf.reshape(inputs, [-1, n_anchors * out_shape[1] * out_shape[2], \
                                        5 + num_classes])
            
            
            # Split each prediction row into its components.
            box_centers = inputs[:, :, 0:2]
            
            box_shapes = inputs[:, :, 2:4]
            confidence = inputs[:, :, 4:5]
            classes = inputs[:, :, 5:num_classes + 5]
            
            # Refining Bounding Boxes
            # Use the sigmoid function to convert box_centers, confidence and classes values 
            # into range of 0 - 1.
            box_centers = tf.nn.sigmoid(box_centers)
            confidence = tf.nn.sigmoid(confidence)
            classes = tf.nn.sigmoid(classes)
          
            # Convert box_shapes: repeat the selected anchors once per grid cell so they
            # line up with the flattened predictions, then scale them by exp(raw size).
            anchors = tf.tile(anchors, [out_shape[1] * out_shape[2], 1])
            box_shapes = tf.exp(box_shapes) * tf.cast(anchors, dtype = tf.float32)
            
            # NOTE(review): x spans out_shape[1] and y spans out_shape[2]; this is only
            # interchangeable when the grid is square -- confirm for non-square inputs.
            x = tf.range(out_shape[1], dtype = tf.float32)
            y = tf.range(out_shape[2], dtype = tf.float32)
           
            
            # Using meshgrid to convert the relative positions of the center boxes
            # into real positions.  Each cell offset is repeated n_anchors times so
            # that cxy has one (x, y) row per prediction.
            cx, cy = tf.meshgrid(x,y)
            cx = tf.reshape(cx, (-1,1))
            cy = tf.reshape(cy, (-1,1))
            cxy = tf.concat([cx, cy], axis = -1)
            cxy = tf.tile(cxy, [1, n_anchors])
            cxy = tf.reshape(cxy, [1, -1, 2])
            
            # Ratio between input size and grid size; multiplying by it converts
            # grid-cell coordinates into input-image coordinates.
            strides = (input_image.shape[1] // out_shape[1], input_image.shape[2] // out_shape[2])
            box_centers = (box_centers + cxy) * strides
            
            prediction = tf.concat([box_centers, box_shapes, confidence, classes], axis = -1)
            
            # Accumulate the predictions of every yolo block along axis 1.
            if scale:
                out_pred = tf.concat([out_pred, prediction], axis = 1)
            else:
                out_pred = prediction
                scale = 1
            
        # Since Route and Shortcut layers need output feature maps from previous layers
        # so, we need to keep track of feature maps and output filters
        outputs[i] = inputs
        output_filters.append(filters)
                
    model = Model(input_image, out_pred)
    model.summary()
    #model.save('./data/yolov3.h5')
    return model


In [4]:
def compose(*funcs):
    '''Chain callables so that they run left to right.

    The first callable receives whatever positional and keyword arguments
    the composed function is called with; each later callable receives the
    single return value of the one before it.  A single callable is handed
    back unchanged.

    Raises ValueError when called without any callables.
    '''
    if not funcs:
        raise ValueError('Composition of empty sequence not supported.')

    head, tail = funcs[0], funcs[1:]
    if not tail:
        return head

    def composed(*a, **kw):
        value = head(*a, **kw)
        for step in tail:
            value = step(value)
        return value

    return composed

In [5]:
def DarknetConv2D(*args, **kwargs):
    '''Wrapper to set Darknet parameters for Convolution2D.

    Defaults applied (each can be overridden through ``kwargs``):

    * ``kernel_regularizer`` -- L2 regularisation with factor 5e-4.
    * ``padding`` -- ``'valid'`` for a stride-2 convolution, ``'same'``
      otherwise.  The stride is read from the ``strides`` keyword only and
      may be given as ``2``, ``(2, 2)`` or ``[2, 2]``.

    All positional and keyword arguments are forwarded to ``Conv2D``.
    Returns the constructed ``Conv2D`` layer.
    '''
    # Keras accepts an int or a 2-element tuple/list for strides; normalise
    # to a tuple so every spelling of "stride 2" selects 'valid' padding.
    strides = kwargs.get('strides')
    if isinstance(strides, int):
        strides = (strides, strides)
    elif isinstance(strides, (list, tuple)):
        strides = tuple(strides)

    darknet_conv_kwargs = {'kernel_regularizer': l2(5e-4)}
    darknet_conv_kwargs['padding'] = 'valid' if strides == (2, 2) else 'same'
    # Caller-supplied keywords win over the defaults above.
    darknet_conv_kwargs.update(kwargs)
    return Conv2D(*args, **darknet_conv_kwargs)

def DarknetConv2D_BN_Leaky(*args, **kwargs):
    '''Build a conv -> batch-norm -> leaky-ReLU stack as one callable.

    The convolution is created through DarknetConv2D with ``use_bias=False``
    unless the caller passes ``use_bias`` explicitly.  The activation is a
    LeakyReLU with alpha 0.1.  The three layers are chained with ``compose``.
    '''
    conv_kwargs = {'use_bias': False, **kwargs}
    convolution = DarknetConv2D(*args, **conv_kwargs)
    normalisation = BatchNormalization()
    activation = LeakyReLU(alpha=0.1)
    return compose(convolution, normalisation, activation)

def resblock_body(x, num_filters, num_blocks):
    '''Downsample once, then stack ``num_blocks`` residual units.

    The downsampling step pads one row/column on the top and left (Darknet
    style, rather than 'same' padding) and applies a stride-2 3x3
    convolution block with ``num_filters`` filters.  Every residual unit is
    a 1x1 block with half the filters followed by a 3x3 block with
    ``num_filters`` filters, whose result is added to the unit's input.
    '''
    padded = ZeroPadding2D(((1,0),(1,0)))(x)
    downsample = DarknetConv2D_BN_Leaky(num_filters, (3,3), strides=(2,2))
    x = downsample(padded)

    for _ in range(num_blocks):
        squeeze = DarknetConv2D_BN_Leaky(num_filters//2, (1,1))
        expand = DarknetConv2D_BN_Leaky(num_filters, (3,3))
        residual = compose(squeeze, expand)(x)
        x = Add()([x, residual])
    return x

def darknet_body(x):
    '''Darknet backbone: a 3x3 stem followed by five residual stages.

    Stage widths / residual-unit counts are 64/1, 128/2, 256/8, 512/8 and
    1024/4, which together with the stem gives 52 convolution blocks.
    '''
    # (num_filters, num_blocks) for each downsampling stage, in order.
    stage_plan = ((64, 1), (128, 2), (256, 8), (512, 8), (1024, 4))

    x = DarknetConv2D_BN_Leaky(32, (3,3))(x)
    for width, repeats in stage_plan:
        x = resblock_body(x, width, repeats)
    return x

def make_last_layers(x, num_filters, out_filters):
    '''Build one detection branch and return ``(features, prediction)``.

    ``features`` is ``x`` after five alternating 1x1 (``num_filters``) and
    3x3 (``2 * num_filters``) conv/BN/leaky blocks.  ``prediction`` is
    ``features`` passed through one more 3x3 block and a plain 1x1
    DarknetConv2D with ``out_filters`` filters (no BN, no activation).
    '''
    def narrow():
        return DarknetConv2D_BN_Leaky(num_filters, (1,1))

    def widen():
        return DarknetConv2D_BN_Leaky(num_filters*2, (3,3))

    trunk = compose(narrow(), widen(), narrow(), widen(), narrow())
    features = trunk(x)

    head = compose(widen(), DarknetConv2D(out_filters, (1,1)))
    prediction = head(features)
    return features, prediction


def yolo_body(inputs, num_anchors, num_classes):
    """Assemble the YOLOv3 network as a Keras Model with three outputs.

    A Darknet backbone is built on ``inputs`` and followed by three
    detection branches.  After the first and second branch the feature map
    is reduced with a 1x1 block, upsampled by 2 and concatenated with the
    output of an earlier backbone layer (indices 152 and 92 of
    ``darknet.layers`` -- these are positional and depend on the exact
    backbone construction).

    Each output has ``num_anchors * (num_classes + 5)`` channels.
    """
    out_channels = num_anchors*(num_classes+5)
    darknet = Model(inputs, darknet_body(inputs))

    def upsample_and_merge(tensor, filters, skip_layer_index):
        # 1x1 reduction, 2x upsampling, then join with the backbone skip tensor.
        upsampled = compose(
                DarknetConv2D_BN_Leaky(filters, (1,1)),
                UpSampling2D(2))(tensor)
        return Concatenate()([upsampled, darknet.layers[skip_layer_index].output])

    x, y1 = make_last_layers(darknet.output, 512, out_channels)

    x = upsample_and_merge(x, 256, 152)
    x, y2 = make_last_layers(x, 256, out_channels)

    x = upsample_and_merge(x, 128, 92)
    x, y3 = make_last_layers(x, 128, out_channels)

    return Model(inputs, [y1,y2,y3])

In [6]:
def yolo_head(feats, anchors, num_classes, input_shape, calc_loss=False):
    '''Decode a raw detection feature map into box parameters.

    ``feats`` is reshaped to
    [batch, grid_h, grid_w, len(anchors), num_classes + 5], where grid_h and
    grid_w are axes 1 and 2 of ``feats``.  Box centres are sigmoid offsets
    added to the cell index and divided by the grid size; box sizes are
    exp(raw) times the anchor, divided by ``input_shape`` reversed.

    Returns ``(grid, reshaped_feats, box_xy, box_wh)`` when ``calc_loss`` is
    True, otherwise ``(box_xy, box_wh, box_confidence, box_class_probs)``
    with confidence and class probabilities passed through a sigmoid.
    '''
    feat_dtype = backend.dtype(feats)
    anchor_count = len(anchors)
    # Shape the anchors so they broadcast over batch, height and width.
    anchors_tensor = backend.reshape(backend.constant(anchors), [1, 1, 1, anchor_count, 2])

    grid_shape = backend.shape(feats)[1:3] # height, width
    rows, cols = grid_shape[0], grid_shape[1]

    # Per-cell row (y) and column (x) indices, each tiled to the full grid.
    row_index = backend.reshape(backend.arange(0, stop=rows), [-1, 1, 1, 1])
    col_index = backend.reshape(backend.arange(0, stop=cols), [1, -1, 1, 1])
    grid_y = backend.tile(row_index, [1, cols, 1, 1])
    grid_x = backend.tile(col_index, [rows, 1, 1, 1])
    grid = backend.cast(backend.concatenate([grid_x, grid_y]), feat_dtype)

    feats = backend.reshape(
        feats, [-1, rows, cols, anchor_count, num_classes + 5])

    # Reversed so the divisors are ordered (width, height) like (x, y).
    grid_wh = backend.cast(grid_shape[::-1], feat_dtype)
    input_wh = backend.cast(input_shape[::-1], feat_dtype)

    box_xy = (backend.sigmoid(feats[..., :2]) + grid) / grid_wh
    box_wh = backend.exp(feats[..., 2:4]) * anchors_tensor / input_wh

    # Equality test (rather than truthiness) kept on purpose.
    if calc_loss == True:
        return grid, feats, box_xy, box_wh

    box_confidence = backend.sigmoid(feats[..., 4:5])
    box_class_probs = backend.sigmoid(feats[..., 5:])
    return box_xy, box_wh, box_confidence, box_class_probs

In [7]:
def box_iou(b1, b2):
    '''Pairwise intersection-over-union between two sets of boxes.

    Boxes are read from the last axis as (centre_x, centre_y, width,
    height).  ``b1`` gets a new axis before its last one and ``b2`` a new
    leading axis, so the two broadcast against each other and the result
    holds one IoU value per (b1 box, b2 box) pair.
    '''
    def corners_and_size(boxes):
        # Convert centre/size form to min-corner, max-corner, size.
        centre = boxes[..., :2]
        size = boxes[..., 2:4]
        half = size/2.
        return centre - half, centre + half, size

    # Insert singleton axes so that b1 and b2 broadcast pairwise.
    b1 = backend.expand_dims(b1, -2)
    b2 = backend.expand_dims(b2, 0)

    b1_mins, b1_maxes, b1_wh = corners_and_size(b1)
    b2_mins, b2_maxes, b2_wh = corners_and_size(b2)

    # Overlap extent per axis, clamped at zero for disjoint boxes.
    overlap_wh = backend.maximum(
        backend.minimum(b1_maxes, b2_maxes) - backend.maximum(b1_mins, b2_mins), 0.)
    intersect_area = overlap_wh[..., 0] * overlap_wh[..., 1]

    b1_area = b1_wh[..., 0] * b1_wh[..., 1]
    b2_area = b2_wh[..., 0] * b2_wh[..., 1]
    union_area = b1_area + b2_area - intersect_area

    return intersect_area / union_area

In [8]:
def yolo_loss(args, anchors, num_classes, ignore_thresh=.5, print_loss=False):
    '''Return yolo_loss tensor

    Parameters
    ----------
    args: list of tensors. The first ``num_layers`` entries are the raw
        network outputs (one per detection scale); the remaining entries are
        the matching ground-truth tensors (``y_true``), presumably produced
        by a ``preprocess_true_boxes`` helper not shown here.
    anchors: array, shape=(N, 2), wh. Must support indexing with a list of
        indices (e.g. a numpy array). ``N // 3`` determines the number of
        detection scales.
    num_classes: integer
    ignore_thresh: float, the iou threshold whether to ignore object confidence loss
    print_loss: bool, when True the running total and the individual loss
        terms are printed with ``tf.print`` after every scale.

    Returns
    -------
    loss: scalar tensor, the sum over all scales of the xy, wh, confidence
        and class losses, each divided by the batch size.

    '''
    num_layers = len(anchors)//3 # default setting
    yolo_outputs = args[:num_layers]
    y_true = args[num_layers:]
    # Which anchors belong to which output scale.
    anchor_mask = [[6,7,8], [3,4,5], [0,1,2]] if num_layers==3 else [[3,4,5], [1,2,3]]
    # The first output is taken to be the coarsest grid, at 1/32 of the input size.
    input_shape = tf.cast(tf.shape(yolo_outputs[0])[1:3] * 32, backend.dtype(y_true[0]))
    grid_shapes = [tf.cast(tf.shape(yolo_outputs[l])[1:3], backend.dtype(y_true[0])) for l in range(num_layers)]
    loss = 0
    m = tf.shape(yolo_outputs[0])[0] # batch size, tensor
    mf = tf.cast(m, backend.dtype(yolo_outputs[0]))

    for l in range(num_layers):
        object_mask = y_true[l][..., 4:5]
        true_class_probs = y_true[l][..., 5:]

        grid, raw_pred, pred_xy, pred_wh = yolo_head(yolo_outputs[l],
             anchors[anchor_mask[l]], num_classes, input_shape, calc_loss=True)
        pred_box = backend.concatenate([pred_xy, pred_wh])

        # Darknet raw box to calculate loss.
        raw_true_xy = y_true[l][..., :2]*grid_shapes[l][::-1] - grid
        raw_true_wh = backend.log(y_true[l][..., 2:4] / anchors[anchor_mask[l]] * input_shape[::-1])
        raw_true_wh = backend.switch(object_mask, raw_true_wh, tf.zeros_like(raw_true_wh)) # avoid log(0)=-inf
        # Weight that grows as the ground-truth box (w * h) gets smaller.
        box_loss_scale = 2 - y_true[l][...,2:3]*y_true[l][...,3:4]

        # Find ignore mask, iterate over each of batch.
        ignore_mask = tf.TensorArray(backend.dtype(y_true[0]), size=1, dynamic_size=True)
        object_mask_bool = tf.cast(object_mask, 'bool')
        def loop_body(b, ignore_mask):
            # Entry is 1 where the best IoU between a prediction and any true
            # box of sample b is below ignore_thresh, else 0.
            true_box = tf.boolean_mask(y_true[l][b,...,0:4], object_mask_bool[b,...,0])
            iou = box_iou(pred_box[b], true_box)
            best_iou = backend.max(iou, axis=-1)
            ignore_mask = ignore_mask.write(b, tf.cast(best_iou<ignore_thresh, backend.dtype(true_box)))
            return b+1, ignore_mask
        _, ignore_mask = tf.while_loop(lambda b,*args: b<m, loop_body, [0, ignore_mask])
        ignore_mask = ignore_mask.stack()
        ignore_mask = backend.expand_dims(ignore_mask, -1)

        # backend.binary_crossentropy with from_logits=True is helpful to avoid exp overflow.
        xy_loss = object_mask * box_loss_scale * backend.binary_crossentropy(raw_true_xy, raw_pred[...,0:2], from_logits=True)
        wh_loss = object_mask * box_loss_scale * 0.5 * backend.square(raw_true_wh-raw_pred[...,2:4])
        # Cells without an object only contribute where ignore_mask is 1.
        confidence_loss = object_mask * backend.binary_crossentropy(object_mask, raw_pred[...,4:5], from_logits=True)+ \
            (1-object_mask) * backend.binary_crossentropy(object_mask, raw_pred[...,4:5], from_logits=True) * ignore_mask
        class_loss = object_mask * backend.binary_crossentropy(true_class_probs, raw_pred[...,5:], from_logits=True)

        xy_loss = backend.sum(xy_loss) / mf
        wh_loss = backend.sum(wh_loss) / mf
        confidence_loss = backend.sum(confidence_loss) / mf
        class_loss = backend.sum(class_loss) / mf
        loss += xy_loss + wh_loss + confidence_loss + class_loss
        if print_loss:
            # tf.print (unlike the TF1 tf.Print it replaced) has no 'message'
            # argument and does not return the tensor, so its result must not
            # be assigned back to 'loss'.
            tf.print('loss: ', loss, xy_loss, wh_loss, confidence_loss, class_loss, backend.sum(ignore_mask))
    return loss