# YOLO v3 Darknet

### Imports

In [None]:
%cd /Users/rahulkanojia/Documents/own/YOLO/

In [None]:
# https://pylessons.com/YOLOv3-TF2-custrom-images/

In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals
from tensorflow.keras import datasets, layers, models
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, InputSpec, Concatenate, LeakyReLU
from tensorflow.keras.layers import Conv2D, Input, LeakyReLU, ZeroPadding2D, BatchNormalization, MaxPool2D
from tensorflow.keras.regularizers import l2
import tensorflow as tf
import numpy as np
import shutil
import os
from Configs.mnist_config import *


In [None]:
# from tensorflow.keras.layers import BatchNormalization
# BatchNormalization._USE_V2_BEHAVIOR = False

In [None]:
# Anchor box priors: three [w, h] pairs for each of the three detection scales.
# The outer index is the scale index `i` that `decode` uses as ANCHORS[i].
# NOTE(review): this assignment comes after `from Configs.mnist_config import *`
# and so overrides any ANCHORS exported by that module — confirm this is intended.
# NOTE(review): `decode` multiplies ANCHORS[i] by STRIDES[i]; if these values are
# already expressed in input-image pixels that scales them twice — confirm
# against the STRIDES defined in the config.
ANCHORS         = [[[10,  13], [16,   30], [33,   23]],
                   [[30,  61], [62,   45], [59,  119]],
                   [[116, 90], [156, 198], [373, 326]]]

### Layers

In [None]:
class Relu6(Layer):
    """Keras layer that applies `tf.nn.relu6` to its input."""

    def __init__(self):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.relu6 = tf.nn.relu6

    @tf.function
    def call(self, inputs):
        """Return `tf.nn.relu6(inputs)`."""
        return self.relu6(inputs)
class BatchNorm(Layer):
    """Thin wrapper around the Keras `BatchNormalization` layer.

    Args:
        scale: forwarded to `BatchNormalization(scale=...)`.
        center: forwarded to `BatchNormalization(center=...)`.
    """

    def __init__(self, scale=True, center=True):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.bn = BatchNormalization(scale=scale, center=center, trainable=True)

    def call(self, inputs, training=True):
        """Apply batch normalisation, forwarding `training` to the inner layer."""
        # NOTE(review): `training` defaults to True and Convolution2D calls this
        # layer without passing it — confirm that moving statistics (not batch
        # statistics) are actually used when the model is run for inference.
        return self.bn(inputs, training=training)
class Convolution2D(Layer):
    """Conv2D optionally followed by batch normalisation and LeakyReLU(0.1).

    Args:
        filters: number of output channels of the convolution.
        kernel_size: kernel size passed to `Conv2D`.
        is_activation: apply LeakyReLU as the last step when True.
        is_batch_norm: apply `BatchNorm` after the convolution when True; the
            convolution is created without a bias in that case.
        downsample: when True the input is zero-padded by one row at the top
            and one column on the left, then convolved with stride 2 and
            'valid' padding; otherwise stride 1 with 'same' padding is used.
    """

    def __init__(self, filters, kernel_size, is_activation = True, is_batch_norm = True, downsample=False):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()

        self.downsample = downsample
        if self.downsample:
            padding = 'valid'
            strides = 2
        else:
            strides = 1
            padding = 'same'

        self.is_activation = is_activation
        self.is_batch_norm = is_batch_norm
        # Sub-layers are always created in this order (even when unused) so the
        # layer's attribute/weight ordering stays stable.
        self.zero_pad =  ZeroPadding2D(((1, 0), (1, 0)))
        self.conv = tf.keras.layers.Conv2D(filters = filters, kernel_size = kernel_size, strides = strides, padding=padding,
                                           use_bias=not is_batch_norm, kernel_regularizer=l2(0.0005),
                                           kernel_initializer=tf.random_normal_initializer(stddev=0.01),
                                           bias_initializer=tf.constant_initializer(0.))

        self.bn = BatchNorm()
        self.act = LeakyReLU(alpha=0.1)

    @tf.function
    def call(self, inputs):
        """Run (pad ->) conv (-> batch norm) (-> LeakyReLU) on `inputs`."""
        if self.downsample:
            x = self.zero_pad(inputs)
            x = self.conv(x)
        else:
            x = self.conv(inputs)

        # NOTE(review): `training` is not forwarded to the batch-norm layer
        # here; see BatchNorm.call, whose `training` argument defaults to True.
        if self.is_batch_norm:
            x = self.bn(x)
        if self.is_activation:
            x = self.act(x)

        return x
class AveragePooling(Layer):
    """Wrapper around `AveragePooling2D` with "SAME" padding.

    Args:
        pool_size: forwarded to `AveragePooling2D(pool_size=...)`.
    """

    def __init__(self, pool_size):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.avgpool = tf.keras.layers.AveragePooling2D(pool_size=pool_size, padding="SAME")

    @tf.function
    def call(self, inputs):
        """Return the average-pooled `inputs`."""
        return self.avgpool(inputs)
class DenseLayer(Layer):
    """Wrapper around `Dense` with a N(0, 0.01) kernel initialiser.

    Args:
        units: number of output units of the dense layer.
    """

    def __init__(self, units):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()

        self.dense = tf.keras.layers.Dense(units=units,
                                           kernel_initializer=tf.random_normal_initializer(stddev=0.01))

    @tf.function
    def call(self, inputs):
        """Return the dense projection of `inputs`."""
        return self.dense(inputs)
class FlattenLayer(Layer):
    """Wrapper around `tf.keras.layers.Flatten`."""

    def __init__(self):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()

    @tf.function
    def call(self, inputs):
        """Return `inputs` flattened by the inner `Flatten` layer."""
        return self.flatten(inputs)
class Residual_Block(Layer):
    """Residual unit: 1x1 conv -> 3x3 conv, added back onto the input.

    Args:
        filters: output channels of the 1x1 convolution. The 3x3 convolution
            outputs ``2 * filters`` channels, so the block's input is expected
            to have ``2 * filters`` channels for the addition to be valid.
    """

    def __init__(self, filters):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.conv1  = Convolution2D(filters =   filters, kernel_size = (1, 1))
        self.conv2  = Convolution2D(filters = 2*filters, kernel_size = (3, 3))
        self.add = tf.keras.layers.Add()

    def call(self, inputs):
        """Return ``inputs + conv2(conv1(inputs))``."""
        short_cut = inputs
        x = self.conv1(inputs)
        x = self.conv2(x)

        residual_output = self.add([short_cut, x])

        return residual_output
class Upsample(Layer):
    """Doubles the size of axes 1 and 2 with nearest-neighbour resizing."""

    def __init__(self):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()

    @tf.function
    def call(self, inputs):
        """Return `inputs` resized to twice its static size on axes 1 and 2."""
        # Uses the static shape, so axes 1 and 2 must be known at trace time.
        return tf.image.resize(inputs, (inputs.shape[1] * 2, inputs.shape[2] * 2), method='nearest')

In [None]:
class Darknet(Model):
    """Backbone built from Convolution2D and Residual_Block layers.

    The network applies one stride-1 convolution followed by five
    downsampling (stride-2) convolutions, each followed by a stack of
    residual blocks (1, 2, 8, 8 and 4 blocks respectively).

    `call` returns three feature maps: the output of the third residual stack
    (`route1`), of the fourth residual stack (`route2`) and of the final one.
    """

    def __init__(self):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        # NOTE: attributes are created in forward order; the weight ordering of
        # the model (used by set_weights elsewhere in this notebook) follows
        # from it, so do not reorder them.
        self.conv1        = Convolution2D(32, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv2        = Convolution2D(64, (3, 3), is_activation=True, is_batch_norm=True, downsample=True)

        self.residual1_1  = Residual_Block(32)

        self.conv3        = Convolution2D(128, (3, 3), is_activation=True, is_batch_norm=True, downsample=True)

        self.residual2_1  = Residual_Block(64)
        self.residual2_2  = Residual_Block(64)

        self.conv4        = Convolution2D(256, (3, 3),is_activation=True, is_batch_norm=True, downsample=True)

        self.residual3_1  = Residual_Block(128)
        self.residual3_2  = Residual_Block(128)
        self.residual3_3  = Residual_Block(128)
        self.residual3_4  = Residual_Block(128)
        self.residual3_5  = Residual_Block(128)
        self.residual3_6  = Residual_Block(128)
        self.residual3_7  = Residual_Block(128)
        self.residual3_8  = Residual_Block(128)

        self.conv5        = Convolution2D(512,  (3, 3),is_activation=True, is_batch_norm=True, downsample=True)

        self.residual4_1  = Residual_Block(256)
        self.residual4_2  = Residual_Block(256)
        self.residual4_3  = Residual_Block(256)
        self.residual4_4  = Residual_Block(256)
        self.residual4_5  = Residual_Block(256)
        self.residual4_6  = Residual_Block(256)
        self.residual4_7  = Residual_Block(256)
        self.residual4_8  = Residual_Block(256)

        self.conv6        = Convolution2D(1024, (3, 3),is_activation=True, is_batch_norm=True, downsample=True)

        self.residual5_1  = Residual_Block(512)
        self.residual5_2  = Residual_Block(512)
        self.residual5_3  = Residual_Block(512)
        self.residual5_4  = Residual_Block(512)

    @tf.function
    def call(self, inputs):
        """Run the backbone and return ``(route1, route2, x)``."""
        x = self.conv1(inputs)
        x = self.conv2(x)

        x = self.residual1_1(x)

        x = self.conv3(x)
        x = self.residual2_1(x)
        x = self.residual2_2(x)

        x = self.conv4(x)
        x = self.residual3_1(x)
        x = self.residual3_2(x)
        x = self.residual3_3(x)
        x = self.residual3_4(x)
        x = self.residual3_5(x)
        x = self.residual3_6(x)
        x = self.residual3_7(x)
        x = self.residual3_8(x)
        # Feature map after three downsampling convolutions.
        route1 = x

        x = self.conv5(x)
        x = self.residual4_1(x)
        x = self.residual4_2(x)
        x = self.residual4_3(x)
        x = self.residual4_4(x)
        x = self.residual4_5(x)
        x = self.residual4_6(x)
        x = self.residual4_7(x)
        x = self.residual4_8(x)
        # Feature map after four downsampling convolutions.
        route2 = x

        x = self.conv6(x)
        x = self.residual5_1(x)
        x = self.residual5_2(x)
        x = self.residual5_3(x)
        x = self.residual5_4(x)

        return route1, route2, x

### Model

In [None]:
class YOLOV3(Model):
    """YOLOv3 detector: Darknet backbone plus three detection heads.

    Args:
        NUM_CLASS: number of object classes; every head outputs
            ``3 * (NUM_CLASS + 5)`` channels.
        TRAIN: when True, `call` returns the raw head output followed by the
            decoded prediction for each scale (six tensors); when False only
            the three decoded predictions are returned.

    The heads are evaluated in the order large -> medium -> small, and the
    outputs are returned in the order small, medium, large.
    """

    def __init__(self, NUM_CLASS, TRAIN = False):
        # Zero-argument super(): `super(self.__class__, self)` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.NUM_CLASS = NUM_CLASS
        self.TRAIN = TRAIN
        self.darknet = Darknet()

        # --- large-object branch (coarsest backbone feature map) ---
        self.conv1 = Convolution2D(512,  (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv2 = Convolution2D(1024, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv3 = Convolution2D(512,  (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv4 = Convolution2D(1024, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv5 = Convolution2D(512,  (1, 1), is_activation=True, is_batch_norm=True, downsample=False)

        self.conv_lobj_branch = Convolution2D(1024, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        # Detection heads are plain convolutions: no batch norm, no activation.
        self.conv_lbbox = Convolution2D(3*(NUM_CLASS + 5),  (1, 1), is_activation=False, is_batch_norm=False, downsample=False)

        # --- medium-object branch (upsampled, concatenated with route_2) ---
        self.conv6 = Convolution2D(256,  (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.upsample1 = Upsample()

        self.conv7  = Convolution2D(256, (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv8  = Convolution2D(512, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv9  = Convolution2D(256, (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv10 = Convolution2D(512, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv11 = Convolution2D(256, (1, 1), is_activation=True, is_batch_norm=True, downsample=False)

        self.conv_mobj_branch = Convolution2D(512, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv_mbbox = Convolution2D(3*(NUM_CLASS + 5), (1, 1), is_activation=False, is_batch_norm=False, downsample=False)

        # --- small-object branch (upsampled, concatenated with route_1) ---
        self.conv12 = Convolution2D(128, (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.upsample2  = Upsample()
        self.conv13 = Convolution2D(128, (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv14 = Convolution2D(256, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv15 = Convolution2D(128, (1, 1), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv16 = Convolution2D(256, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        self.conv17 = Convolution2D(128, (1, 1), is_activation=True, is_batch_norm=True, downsample=False)

        self.conv_sobj_branch = Convolution2D(256, (3, 3), is_activation=True, is_batch_norm=True, downsample=False)
        # conv_sbbox is the head used to predict small objects.
        self.conv_sbbox = Convolution2D(3*(NUM_CLASS +5), (1, 1), is_activation=False, is_batch_norm=False, downsample=False)

    @tf.function
    def call(self, inputs, training=True):
        """Run the detector on `inputs`.

        Args:
            inputs: image batch fed to the Darknet backbone.
            training: accepted for Keras compatibility. NOTE(review): it is not
                forwarded explicitly to any sub-layer in this method.

        Returns:
            A list of tensors, ordered small, medium, large scale. With
            ``TRAIN=True`` each scale contributes ``[raw_head, decoded]``;
            otherwise only ``decoded``.
        """
        route_1, route_2, conv  = self.darknet(inputs)

        # Large-object head.
        x = self.conv1(conv)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        conv_lobj_branch = self.conv_lobj_branch(x)
        conv_lbbox = self.conv_lbbox(conv_lobj_branch)

        # Medium-object head: upsample and fuse with the route_2 feature map.
        x = self.conv6(x)
        x = self.upsample1(x)

        x = tf.concat([x, route_2], axis=-1)
        x = self.conv7(x)
        x = self.conv8(x)
        x = self.conv9(x)
        x = self.conv10(x)
        x = self.conv11(x)

        conv_mobj_branch = self.conv_mobj_branch(x)
        conv_mbbox = self.conv_mbbox(conv_mobj_branch)

        # Small-object head: upsample and fuse with the route_1 feature map.
        x = self.conv12(x)
        x = self.upsample2(x)
        x = tf.concat([x, route_1], axis=-1)
        x = self.conv13(x)
        x = self.conv14(x)
        x = self.conv15(x)
        x = self.conv16(x)
        x = self.conv17(x)
        conv_sobj_branch = self.conv_sobj_branch(x)
        conv_sbbox = self.conv_sbbox(conv_sobj_branch)

        conv_tensors = [conv_sbbox, conv_mbbox, conv_lbbox]

        output_tensors = []

        # `decode` is a module-level function defined in the "Losses" section
        # of this notebook; it must be defined before the model is called.
        for i, conv_tensor in enumerate(conv_tensors):
            pred_tensor = decode(conv_tensor, self.NUM_CLASS, i)
            if self.TRAIN:
                output_tensors.append(conv_tensor)
            output_tensors.append(pred_tensor)

        return output_tensors

In [None]:
# Build the training model (10 classes) and run one random 416x416 RGB batch
# through it so that all layer weights get created.
# NOTE(review): YOLOV3.call uses `decode`, which is defined further down in the
# "Losses" section — that cell has to be executed before this one.
yolo = YOLOV3(10, TRAIN=True)
tert = np.random.rand(1,416,416,3).astype(np.float32)
bert = yolo(tert, training=False)

In [None]:
bert[0].shape, bert[1].shape, bert[2].shape

#### Loading weights

In [None]:
# W1 = yolo.weights
# W2 = mine.weights
# for i in range(len(WW)):
# #     if WW[i].shape!=W2[i].shape or WW[i].name.split('/')[-1].split(':')[0] != W2[i].name.split('/')[-1].split(':')[0]:
#     print("Index :",i, " ",WW[i].shape, W2[i].shape, WW[i].shape==W2[i].shape," || ",WW[i].name.split('/')[-1].split(':')[0] == W2[i].name.split('/')[-1].split(':')[0])
# for i in range(len(WW)):
# #     if WW[i].shape!=W2[i].shape or WW[i].name.split('/')[-1].split(':')[0] != W2[i].name.split('/')[-1].split(':')[0]:
#     print("Index :",i, " ",WW[i].shape, W2[i].shape, WW[i].shape==W2[i].shape," || ",WW[i].name.split('/')[-1].split(':')[0] == W2[i].name.split('/')[-1].split(':')[0])
# for i in range(len(WW),len(W2)):
#     WW.append(W2[i])

In [None]:
# ### Loading Weights
# WW = []
# i = 0
# while(i<13):
#     WW.append(yolo.weights[i])
#     i += 1  
# WW.append(yolo.weights[15])
# WW.append(yolo.weights[16])
# WW.append(yolo.weights[17])
# WW.append(yolo.weights[13])
# WW.append(yolo.weights[14])
# i = 18
# while(i<28):
#     WW.append(yolo.weights[i])
#     i += 1
# while(i<43):
    
#     if i==33 :
#         for _ in range(5):
#             WW.append(yolo.weights[i])
#             i += 1
    
#     WW.append(yolo.weights[i+2])
#     WW.append(yolo.weights[i+3])
#     WW.append(yolo.weights[i+4])
#     WW.append(yolo.weights[i])
#     WW.append(yolo.weights[i+1])
#     i += 5
# while(i<53):
#     WW.append(yolo.weights[i])
#     i += 1
# WW.append(yolo.weights[55])
# WW.append(yolo.weights[56])
# WW.append(yolo.weights[57])
# WW.append(yolo.weights[53])
# WW.append(yolo.weights[54])
# i = 58
# while(i<128):
#     for _ in range(5):
#         WW.append(yolo.weights[i])
#         i += 1
#     WW.append(yolo.weights[i+2])
#     WW.append(yolo.weights[i+3])
#     WW.append(yolo.weights[i+4])
#     WW.append(yolo.weights[i])
#     WW.append(yolo.weights[i+1])
#     i+=5
# while(i<138):
#     WW.append(yolo.weights[i])
#     i += 1    
# while(i<213):
#     WW.append(yolo.weights[i+2])
#     WW.append(yolo.weights[i+3])
#     WW.append(yolo.weights[i+4])
#     WW.append(yolo.weights[i])
#     WW.append(yolo.weights[i+1])
#     i+=5
#     for _ in range(5):
#         WW.append(yolo.weights[i])
#         i += 1
# for _ in range(5):
#     WW.append(yolo.weights[i])
#     i += 1
# while(i<258):
#     WW.append(yolo.weights[i+2])
#     WW.append(yolo.weights[i+3])
#     WW.append(yolo.weights[i+4])
#     WW.append(yolo.weights[i])
#     WW.append(yolo.weights[i+1])
#     i+=5
#     for _ in range(5):
#         WW.append(yolo.weights[i])
#         i += 1
# for _ in range(22):
#     WW.append(yolo.weights[i])
#     i += 1

In [None]:
import pickle

# NOTE: unpickling executes code embedded in the file — only load weight files
# that come from a trusted source.
# The context manager closes the file handle once the weights are loaded.
with open('../Darknet_weights.pkl', 'rb') as file:
    data = pickle.load(file)

In [None]:
# Bind W1 before its first use: the original printed len(W1) one line before
# assigning it, which raises NameError on a fresh kernel.
W1 = yolo.weights
print(len(W1), len(data))
# The pickle holds fewer entries than the model has weights; pad `data` with
# the model's own current weights for the remaining positions so that the list
# lines up one-to-one with `yolo.weights`.
data.extend(W1[len(data):])
print(len(W1), len(data))

In [None]:
# Convert every weight in `data` to a NumPy array via its .numpy() method.
new_data = [weight.numpy() for weight in data]

In [None]:
yolo.set_weights(new_data)

### Dataset

In [None]:
%cd /Users/rahulkanojia/Documents/own/YOLO/

In [None]:
!python Dataset/MNIST/make_data.py

In [None]:
# from yolov3.dataset import Dataset
from Dataset.MNIST.mnist_data import Dataset
trainset = Dataset('train')
testset = Dataset('test')

In [None]:
for image, target in trainset:
    print(image.shape, len(target))
    break

In [None]:
import matplotlib.pyplot as plt

In [None]:
plt.imshow(image[0])

### Losses

In [None]:
def decode(conv_output, NUM_CLASS, i=0):
    """Turn a raw detection-head output into box / confidence / class values.

    Args:
        conv_output: raw head output; it is reshaped to
            ``(batch, size, size, 3, 5 + NUM_CLASS)`` where ``size`` is the
            length of its axis 1.
        NUM_CLASS: number of classes.
        i: scale index (0, 1 or 2) selecting ``STRIDES[i]`` and ``ANCHORS[i]``.

    Returns:
        Tensor of shape ``(batch, size, size, 3, 5 + NUM_CLASS)`` holding
        ``[x, y, w, h, confidence, class probabilities...]`` on the last axis.
    """
    shape = tf.shape(conv_output)
    batch_size, output_size = shape[0], shape[1]

    conv_output = tf.reshape(conv_output, (batch_size, output_size, output_size, 3, 5 + NUM_CLASS))

    raw_dxdy = conv_output[..., 0:2]  # centre offsets
    raw_dwdh = conv_output[..., 2:4]  # width / height offsets
    raw_conf = conv_output[..., 4:5]  # objectness logit
    raw_prob = conv_output[..., 5:]   # class logits

    # Grid of cell indices: xy_grid[r, c] == (c, r), i.e. (x, y) order.
    cells = tf.range(output_size, dtype=tf.int32)
    col_idx, row_idx = tf.meshgrid(cells, cells)
    xy_grid = tf.stack([col_idx, row_idx], axis=-1)
    xy_grid = tf.tile(xy_grid[tf.newaxis, :, :, tf.newaxis, :], [batch_size, 1, 1, 3, 1])
    xy_grid = tf.cast(xy_grid, tf.float32)

    # Box centre: sigmoid offset inside the cell, scaled by the stride.
    pred_xy = (tf.sigmoid(raw_dxdy) + xy_grid) * STRIDES[i]
    # Box size: exponential offset applied to the anchor, scaled by the stride.
    # NOTE(review): if ANCHORS is already expressed in input pixels this
    # multiplies by the stride a second time — confirm ANCHORS/STRIDES units.
    pred_wh = (tf.exp(raw_dwdh) * ANCHORS[i]) * STRIDES[i]

    pred_conf = tf.sigmoid(raw_conf)
    pred_prob = tf.sigmoid(raw_prob)

    return tf.concat([pred_xy, pred_wh, pred_conf, pred_prob], axis=-1)
def bbox_iou(boxes1, boxes2):
    """Intersection-over-union of boxes given as ``(cx, cy, w, h)``.

    Args:
        boxes1, boxes2: broadcast-compatible tensors whose last axis holds
            ``(centre_x, centre_y, width, height)``.

    Returns:
        Tensor of IoU values with the last axis removed. Where the union area
        is zero the result is 0 instead of NaN/inf.
    """
    boxes1_area = boxes1[..., 2] * boxes1[..., 3]
    boxes2_area = boxes2[..., 2] * boxes2[..., 3]

    # Convert centre/size to (x_min, y_min, x_max, y_max).
    boxes1 = tf.concat([boxes1[..., :2] - boxes1[..., 2:] * 0.5,
                        boxes1[..., :2] + boxes1[..., 2:] * 0.5], axis=-1)
    boxes2 = tf.concat([boxes2[..., :2] - boxes2[..., 2:] * 0.5,
                        boxes2[..., :2] + boxes2[..., 2:] * 0.5], axis=-1)

    left_up = tf.maximum(boxes1[..., :2], boxes2[..., :2])
    right_down = tf.minimum(boxes1[..., 2:], boxes2[..., 2:])

    # Negative extents mean the boxes do not overlap.
    inter_section = tf.maximum(right_down - left_up, 0.0)
    inter_area = inter_section[..., 0] * inter_section[..., 1]
    union_area = boxes1_area + boxes2_area - inter_area

    # divide_no_nan: two zero-area boxes give 0/0, which would otherwise
    # propagate NaN into the loss.
    return tf.math.divide_no_nan(inter_area, union_area)
def read_class_names(class_file_name):
    """Read a class-name file (one name per line).

    Returns a dict mapping the zero-based line index to the line's text with
    newline characters stripped from both ends.
    """
    with open(class_file_name, 'r') as handle:
        return {index: line.strip('\n') for index, line in enumerate(handle)}
def bbox_giou(boxes1, boxes2):
    """Generalised IoU of boxes given as ``(cx, cy, w, h)``.

    Args:
        boxes1, boxes2: broadcast-compatible tensors whose last axis holds
            ``(centre_x, centre_y, width, height)``.

    Returns:
        Tensor of GIoU values with the last axis removed. Divisions by a zero
        area yield 0 rather than NaN/inf.
    """
    # Convert centre/size to corner coordinates.
    boxes1 = tf.concat([boxes1[..., :2] - boxes1[..., 2:] * 0.5,
                        boxes1[..., :2] + boxes1[..., 2:] * 0.5], axis=-1)
    boxes2 = tf.concat([boxes2[..., :2] - boxes2[..., 2:] * 0.5,
                        boxes2[..., :2] + boxes2[..., 2:] * 0.5], axis=-1)

    # Order the corners so that (min, max) holds even for negative sizes.
    boxes1 = tf.concat([tf.minimum(boxes1[..., :2], boxes1[..., 2:]),
                        tf.maximum(boxes1[..., :2], boxes1[..., 2:])], axis=-1)
    boxes2 = tf.concat([tf.minimum(boxes2[..., :2], boxes2[..., 2:]),
                        tf.maximum(boxes2[..., :2], boxes2[..., 2:])], axis=-1)

    boxes1_area = (boxes1[..., 2] - boxes1[..., 0]) * (boxes1[..., 3] - boxes1[..., 1])
    boxes2_area = (boxes2[..., 2] - boxes2[..., 0]) * (boxes2[..., 3] - boxes2[..., 1])

    left_up = tf.maximum(boxes1[..., :2], boxes2[..., :2])
    right_down = tf.minimum(boxes1[..., 2:], boxes2[..., 2:])

    inter_section = tf.maximum(right_down - left_up, 0.0)
    inter_area = inter_section[..., 0] * inter_section[..., 1]
    union_area = boxes1_area + boxes2_area - inter_area

    # IoU between the two boxes. divide_no_nan keeps 0/0 from turning into a
    # NaN: callers multiply the result by a 0/1 mask and NaN * 0 is still NaN,
    # which would poison the summed loss.
    iou = tf.math.divide_no_nan(inter_area, union_area)

    # Smallest axis-aligned box enclosing both boxes.
    enclose_left_up = tf.minimum(boxes1[..., :2], boxes2[..., :2])
    enclose_right_down = tf.maximum(boxes1[..., 2:], boxes2[..., 2:])
    enclose = tf.maximum(enclose_right_down - enclose_left_up, 0.0)
    enclose_area = enclose[..., 0] * enclose[..., 1]

    # GIoU = IoU - (area of enclosing box not covered by the union) / enclosing area.
    giou = iou - tf.math.divide_no_nan(enclose_area - union_area, enclose_area)

    return giou
def bbox_ciou(boxes1, boxes2):
    """Complete IoU of boxes given as ``(cx, cy, w, h)``.

    Args:
        boxes1: predicted boxes, last axis ``(cx, cy, w, h)``.
        boxes2: ground-truth boxes, same layout, broadcast-compatible.

    Returns:
        ``iou - (d + alpha * v)`` where ``d`` is the squared centre distance
        divided by the squared diagonal of the smallest enclosing box and
        ``v`` is the aspect-ratio consistency term.
    """
    boxes1_coor = tf.concat([boxes1[..., :2] - boxes1[..., 2:] * 0.5,
                        boxes1[..., :2] + boxes1[..., 2:] * 0.5], axis=-1)
    boxes2_coor = tf.concat([boxes2[..., :2] - boxes2[..., 2:] * 0.5,
                        boxes2[..., :2] + boxes2[..., 2:] * 0.5], axis=-1)

    # Smallest enclosing box: minimum of the top-left corners, maximum of the
    # bottom-right corners. (Taking the maximum for `left`/`up` as before does
    # not enclose both boxes and under-estimates the diagonal.)
    left = tf.minimum(boxes1_coor[..., 0], boxes2_coor[..., 0])
    up = tf.minimum(boxes1_coor[..., 1], boxes2_coor[..., 1])
    right = tf.maximum(boxes1_coor[..., 2], boxes2_coor[..., 2])
    down = tf.maximum(boxes1_coor[..., 3], boxes2_coor[..., 3])

    # Squared diagonal of the enclosing box.
    c = (right - left) * (right - left) + (up - down) * (up - down)
    iou = bbox_iou(boxes1, boxes2)

    # Squared distance between the two box centres.
    u = (boxes1[..., 0] - boxes2[..., 0]) * (boxes1[..., 0] - boxes2[..., 0]) + (boxes1[..., 1] - boxes2[..., 1]) * (boxes1[..., 1] - boxes2[..., 1])
    d = u / c

    # Aspect ratios (width / height) of ground truth and prediction.
    ar_gt = boxes2[..., 2] / boxes2[..., 3]
    ar_pred = boxes1[..., 2] / boxes1[..., 3]

    ar_loss = 4 / (np.pi * np.pi) * (tf.atan(ar_gt) - tf.atan(ar_pred)) * (tf.atan(ar_gt) - tf.atan(ar_pred))
    # Small epsilon keeps the denominator non-zero when iou == 1 and ar_loss == 0.
    alpha = ar_loss / (1 - iou + ar_loss + 0.000001)
    ciou_term = d + alpha * ar_loss

    return iou - ciou_term
def compute_loss(pred, conv, label, bboxes, i=0, CLASSES=YOLO_COCO_CLASSES):
    """Compute the GIoU, confidence and class losses for detection scale `i`.

    Args:
        pred: decoded predictions, last axis ``[x, y, w, h, conf, ...]``.
        conv: raw head output for the same scale (logits).
        label: target tensor, last axis ``[x, y, w, h, objectness, classes...]``.
        bboxes: ground-truth boxes per image, last axis ``(x, y, w, h)``;
            compared against every prediction to find background cells.
        i: scale index used to pick ``STRIDES[i]``.
        CLASSES: path of the class-name file; only its line count is used.

    Returns:
        ``(giou_loss, conf_loss, prob_loss)``, each summed over all axes but
        the first and averaged over the first axis.
    """
    num_classes = len(read_class_names(CLASSES))
    shape = tf.shape(conv)
    batch_size, output_size = shape[0], shape[1]
    input_size = tf.cast(STRIDES[i] * output_size, tf.float32)
    conv = tf.reshape(conv, (batch_size, output_size, output_size, 3, 5 + num_classes))

    raw_conf = conv[..., 4:5]
    raw_prob = conv[..., 5:]

    pred_xywh = pred[..., 0:4]
    pred_conf = pred[..., 4:5]

    label_xywh = label[..., 0:4]
    respond_bbox = label[..., 4:5]   # 1 where a cell/anchor is responsible for an object
    label_prob = label[..., 5:]

    # --- box regression loss -------------------------------------------------
    giou = tf.expand_dims(bbox_giou(pred_xywh, label_xywh), axis=-1)
    # Weight is larger for boxes that are small relative to the input image.
    bbox_loss_scale = 2.0 - 1.0 * label_xywh[..., 2:3] * label_xywh[..., 3:4] / (input_size ** 2)
    giou_loss = respond_bbox * bbox_loss_scale * (1 - giou)

    # --- confidence loss -----------------------------------------------------
    # IoU of every prediction against every ground-truth box of its image.
    iou = bbox_iou(pred_xywh[:, :, :, :, np.newaxis, :], bboxes[:, np.newaxis, np.newaxis, np.newaxis, :, :])
    max_iou = tf.reduce_max(iou, axis=-1, keepdims=True)

    # Background: not responsible for an object and best IoU below threshold.
    respond_bgd = (1.0 - respond_bbox) * tf.cast( max_iou < YOLO_IOU_LOSS_THRESH, tf.float32 )

    conf_focal = tf.pow(respond_bbox - pred_conf, 2)
    conf_bce = tf.nn.sigmoid_cross_entropy_with_logits(labels=respond_bbox, logits=raw_conf)
    # Object cells and background cells contribute; everything else is ignored.
    conf_loss = conf_focal * (respond_bbox * conf_bce + respond_bgd * conf_bce)

    # --- classification loss -------------------------------------------------
    prob_loss = respond_bbox * tf.nn.sigmoid_cross_entropy_with_logits(labels=label_prob, logits=raw_prob)

    def _batch_mean(per_cell_loss):
        # Sum over grid, anchor and channel axes, then average over the batch.
        return tf.reduce_mean(tf.reduce_sum(per_cell_loss, axis=[1,2,3,4]))

    return _batch_mean(giou_loss), _batch_mean(conf_loss), _batch_mean(prob_loss)
def read_class_names(class_file_name):
    """Return ``{line_index: class_name}`` for a one-name-per-line file.

    Only newline characters are stripped from each line.
    """
    with open(class_file_name, 'r') as class_file:
        return dict(enumerate(line.strip('\n') for line in class_file))

### Training

In [None]:
# Number of batches yielded by the training dataset per epoch.
steps_per_epoch = len(trainset)
# Step counter (starts at 1); incremented by train_step and used there to
# drive the learning-rate schedule.
global_steps = tf.Variable(1, trainable=False, dtype=tf.int64)
# Number of steps during which train_step ramps the learning rate up linearly.
warmup_steps = TRAIN_WARMUP_EPOCHS * steps_per_epoch
# Total number of steps; the end point of the cosine decay in train_step.
total_steps = TRAIN_EPOCHS * steps_per_epoch

In [None]:
# Start from an empty log directory on every run.
if os.path.exists(TRAIN_LOGDIR):
    shutil.rmtree(TRAIN_LOGDIR)

# Separate summary writers for training and validation; both write to TRAIN_LOGDIR.
writer = tf.summary.create_file_writer(TRAIN_LOGDIR)
validate_writer = tf.summary.create_file_writer(TRAIN_LOGDIR)
def validate_step(image_data, target):
    """Evaluate the global `yolo` model on one validation batch.

    Args:
        image_data: batch of input images.
        target: per-scale targets; ``target[i]`` is unpacked into the `label`
            and `bboxes` arguments of `compute_loss`.

    Returns:
        ``(giou_loss, conf_loss, prob_loss, total_loss)`` as NumPy values.
    """
    # No tf.GradientTape here: nothing is differentiated during validation, and
    # a recording tape would only keep every intermediate tensor alive.
    # `yolo` must have been built with TRAIN=True: its output alternates
    # (raw head output, decoded prediction) per scale.
    pred_result = yolo(image_data, training=False)
    giou_loss = conf_loss = prob_loss = 0

    # Accumulate the three loss terms over all detection scales.
    grid = 3 if not TRAIN_YOLO_TINY else 2
    for i in range(grid):
        conv, pred = pred_result[i*2], pred_result[i*2+1]

        loss_items = compute_loss(pred, conv, *target[i], i, CLASSES=TRAIN_CLASSES)
        giou_loss += loss_items[0]
        conf_loss += loss_items[1]
        prob_loss += loss_items[2]

    total_loss = giou_loss + conf_loss + prob_loss

    return giou_loss.numpy(), conf_loss.numpy(), prob_loss.numpy(), total_loss.numpy()

In [None]:
optimizer = tf.keras.optimizers.Adam()

def train_step(image_data, target):
    """Run one optimisation step on the global `yolo` model.

    Computes the summed loss over all detection scales, applies the gradients
    with the global `optimizer`, advances `global_steps` and updates the
    learning rate (linear warm-up followed by cosine decay).

    Args:
        image_data: batch of input images.
        target: per-scale targets; ``target[i]`` is unpacked into the `label`
            and `bboxes` arguments of `compute_loss`.

    Returns:
        ``(global_step, learning_rate, giou_loss, conf_loss, prob_loss,
        total_loss)`` as NumPy values.
    """
    # Forward pass and loss are recorded on the tape.
    with tf.GradientTape() as tape:
        pred_result = yolo(image_data, training=True)

        num_scales = 2 if TRAIN_YOLO_TINY else 3
        running = [0, 0, 0]  # giou, conf, prob
        for i in range(num_scales):
            # Model output alternates (raw head output, decoded prediction).
            conv, pred = pred_result[i*2], pred_result[i*2+1]
            parts = compute_loss(pred, conv, *target[i], i, CLASSES=TRAIN_CLASSES)
            running = [acc + part for acc, part in zip(running, parts)]
        giou_loss, conf_loss, prob_loss = running

        total_loss = giou_loss + conf_loss + prob_loss
        print(" Total Loss ", total_loss)

    # Backward pass and weight update.
    trainable = yolo.trainable_variables
    gradients = tape.gradient(total_loss, trainable)
    optimizer.apply_gradients(zip(gradients, trainable))

    # Learning-rate schedule.
    # about warmup: https://arxiv.org/pdf/1812.01187.pdf&usg=ALkJrhglKOPDjNt6SHGbphTHyMcT0cuMJg
    global_steps.assign_add(1)
    if global_steps < warmup_steps:
        # Linear ramp from 0 to TRAIN_LR_INIT.
        lr = global_steps / warmup_steps * TRAIN_LR_INIT
    else:
        # Cosine decay from TRAIN_LR_INIT down to TRAIN_LR_END.
        progress = (global_steps - warmup_steps) / (total_steps - warmup_steps)
        lr = TRAIN_LR_END + 0.5 * (TRAIN_LR_INIT - TRAIN_LR_END)*(
            (1 + tf.cos(progress * np.pi)))
    optimizer.lr.assign(lr.numpy())

#     # writing summary data
#     with writer.as_default():
#         tf.summary.scalar("lr", optimizer.lr, step=global_steps)
#         tf.summary.scalar("loss/total_loss", total_loss, step=global_steps)
#         tf.summary.scalar("loss/giou_loss", giou_loss, step=global_steps)
#         tf.summary.scalar("loss/conf_loss", conf_loss, step=global_steps)
#         tf.summary.scalar("loss/prob_loss", prob_loss, step=global_steps)
#     writer.flush()

    return global_steps.numpy(), optimizer.lr.numpy(), giou_loss.numpy(), conf_loss.numpy(), prob_loss.numpy(), total_loss.numpy()

In [None]:
# Start from +inf so that the first validation result always counts as an
# improvement; a finite sentinel such as 1000 silently disables "save best"
# whenever the validation loss stays above it.
best_val_loss = float('inf')
for epoch in range(TRAIN_EPOCHS):
    for image_data, target in trainset:
        results = train_step(image_data, target)
        print(" ****  ")
        cur_step = results[0]%steps_per_epoch
        print("epoch:{:2.0f} step:{:5.0f}/{}, lr:{:.6f}, giou_loss:{:7.2f}, conf_loss:{:7.2f}, prob_loss:{:7.2f}, total_loss:{:7.2f}"
              .format(epoch, cur_step, steps_per_epoch, results[1], results[2], results[3], results[4], results[5]))
        # NOTE(review): this stops each epoch after a single training batch —
        # looks like a debugging leftover; remove it for a real training run.
        break

    # Without a test set there is nothing to validate: save and move on.
    if len(testset) == 0:
        print("configure TEST options to validate model")
        yolo.save_weights(os.path.join(TRAIN_CHECKPOINTS_FOLDER, TRAIN_MODEL_NAME))
        continue

    print(" Validation ")
    count, giou_val, conf_val, prob_val, total_val = 0., 0, 0, 0, 0
    for image_data, target in testset:
        print(" Running val ")
        results = validate_step(image_data, target)
        count += 1
        giou_val += results[0]
        conf_val += results[1]
        prob_val += results[2]
        total_val += results[3]

    # writing validate summary data (per-batch averages for this epoch)
    with validate_writer.as_default():
        tf.summary.scalar("validate_loss/total_val", total_val/count, step=epoch)
        tf.summary.scalar("validate_loss/giou_val", giou_val/count, step=epoch)
        tf.summary.scalar("validate_loss/conf_val", conf_val/count, step=epoch)
        tf.summary.scalar("validate_loss/prob_val", prob_val/count, step=epoch)
    validate_writer.flush()

    print("\n\ngiou_val_loss:{:7.2f}, conf_val_loss:{:7.2f}, prob_val_loss:{:7.2f}, total_val_loss:{:7.2f}\n\n".
          format(giou_val/count, conf_val/count, prob_val/count, total_val/count))

    # Checkpoint policy:
    #   SAVE_CHECKPOINT and not SAVE_BEST_ONLY -> one file per epoch, tagged with the loss
    #   SAVE_BEST_ONLY                         -> overwrite only when the loss improved
    #   neither                                -> overwrite the same file every epoch
    if TRAIN_SAVE_CHECKPOINT and not TRAIN_SAVE_BEST_ONLY:
        save_directory = os.path.join(TRAIN_CHECKPOINTS_FOLDER, TRAIN_MODEL_NAME+"_val_loss_{:7.2f}".format(total_val/count))
        yolo.save_weights(save_directory)
    if TRAIN_SAVE_BEST_ONLY and best_val_loss>total_val/count:
        save_directory = os.path.join(TRAIN_CHECKPOINTS_FOLDER, TRAIN_MODEL_NAME)
        yolo.save_weights(save_directory)
        best_val_loss = total_val/count
    if not TRAIN_SAVE_BEST_ONLY and not TRAIN_SAVE_CHECKPOINT:
        save_directory = os.path.join(TRAIN_CHECKPOINTS_FOLDER, TRAIN_MODEL_NAME)
        yolo.save_weights(save_directory)


In [None]:
yolo.summary()

### Inference

In [None]:
import cv2
import time
import random
import colorsys
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

In [None]:
# Build the inference model (10 classes); with TRAIN=False the model returns
# only the decoded predictions. One random 416x416 RGB batch is pushed through
# it so that all layer weights get created before weights are loaded.
infer = YOLOV3(10, TRAIN=False)
tert = np.random.rand(1,416,416,3).astype(np.float32)
bert = infer(tert, training=False)

In [None]:
# infer.load_weights()

In [None]:
def draw_bbox(image, bboxes, CLASSES=YOLO_COCO_CLASSES, show_label=True, show_confidence = True, Text_colors=(255,255,0), rectangle_colors='', tracking=False):
    """Draw detection boxes (and optional labels) onto `image` in place.

    Args:
        image: array of shape ``(height, width, channels)``; modified in place.
        bboxes: iterable of ``(x1, y1, x2, y2, score, class_index)`` entries.
        CLASSES: path of the class-name file.
        show_label: draw a text label above each box.
        show_confidence: append the score (two decimals) to the label.
        Text_colors: colour of the label text.
        rectangle_colors: fixed box colour; ``''`` selects a per-class colour.
        tracking: when True the label suffix is ``str(score)`` instead.

    Returns:
        The same `image` object with the drawings applied.
    """
    class_names = read_class_names(CLASSES)
    num_classes = len(class_names)
    image_h, image_w, _ = image.shape

    # One colour per class, evenly spaced in hue and converted to 0-255 ints.
    colors = []
    for idx in range(num_classes):
        r, g, b = colorsys.hsv_to_rgb(1.0 * idx / num_classes, 1., 1.)
        colors.append((int(r * 255), int(g * 255), int(b * 255)))

    # Deterministic shuffle, then restore non-deterministic seeding.
    random.seed(0)
    random.shuffle(colors)
    random.seed(None)

    # Line thickness and font scale depend only on the image size.
    bbox_thick = max(int(0.6 * (image_h + image_w) / 1000), 1)
    fontScale = 0.75 * bbox_thick
    font = cv2.FONT_HERSHEY_COMPLEX_SMALL

    for bbox in bboxes:
        x1, y1, x2, y2 = np.array(bbox[:4], dtype=np.int32)
        score = bbox[4]
        class_ind = int(bbox[5])
        bbox_color = rectangle_colors if rectangle_colors != '' else colors[class_ind]

        # put object rectangle
        cv2.rectangle(image, (x1, y1), (x2, y2), bbox_color, bbox_thick*2)

        if not show_label:
            continue

        # get text label
        score_str = " {:.2f}".format(score) if show_confidence else ""
        if tracking:
            score_str = " "+str(score)
        label = "{}".format(class_names[class_ind]) + score_str

        # get text size
        (text_width, text_height), baseline = cv2.getTextSize(label, font,
                                                              fontScale, thickness=bbox_thick)
        # put filled text rectangle
        cv2.rectangle(image, (x1, y1), (x1 + text_width, y1 - text_height - baseline), bbox_color, thickness=cv2.FILLED)

        # put text above rectangle
        cv2.putText(image, label, (x1, y1-4), font,
                    fontScale, Text_colors, bbox_thick, lineType=cv2.LINE_AA)

    return image
def bboxes_iou(boxes1, boxes2):
    """Return the IoU between two (broadcastable) sets of boxes.

    :param boxes1: array-like whose last axis is (xmin, ymin, xmax, ymax).
    :param boxes2: array-like with the same layout, broadcastable against
                   ``boxes1``.
    :return: intersection-over-union with the last axis removed; every
             value is floored at ``np.finfo(np.float32).eps``.
    """
    boxes1 = np.array(boxes1)
    boxes2 = np.array(boxes2)

    boxes1_area = (boxes1[..., 2] - boxes1[..., 0]) * (boxes1[..., 3] - boxes1[..., 1])
    boxes2_area = (boxes2[..., 2] - boxes2[..., 0]) * (boxes2[..., 3] - boxes2[..., 1])

    left_up       = np.maximum(boxes1[..., :2], boxes2[..., :2])
    right_down    = np.minimum(boxes1[..., 2:], boxes2[..., 2:])

    # Non-overlapping boxes give a negative extent; clamp it to zero.
    inter_section = np.maximum(right_down - left_up, 0.0)
    inter_area    = inter_section[..., 0] * inter_section[..., 1]
    union_area    = boxes1_area + boxes2_area - inter_area

    # Two zero-area boxes have a zero union; dividing by it used to produce
    # NaN (plus a RuntimeWarning). Divide by 1 in those slots and report an
    # overlap of 0, which the floor below turns into eps.
    has_area   = union_area > 0
    safe_union = np.where(has_area, union_area, 1)
    overlap    = np.where(has_area, 1.0 * inter_area / safe_union, 0.0)
    ious       = np.maximum(overlap, np.finfo(np.float32).eps)

    return ious
def nms(bboxes, iou_threshold, sigma=0.3, method='nms'):
    """
    Per-class non-maximum suppression.

    :param bboxes: array of rows (xmin, ymin, xmax, ymax, score, class)
    :param iou_threshold: with method 'nms', candidates whose IoU with the
                          current winner exceeds this value are dropped
    :param sigma: with method 'soft-nms', scores are multiplied by
                  exp(-iou**2 / sigma) instead of being zeroed
    :param method: 'nms' or 'soft-nms'
    :return: list of the selected rows

    Note: soft-nms, https://arxiv.org/pdf/1704.04503.pdf
          https://github.com/bharatsingh430/soft-nms
    """
    kept = []

    for cls in list(set(bboxes[:, 5])):
        # Boolean-mask indexing copies, so the caller's array is never modified.
        candidates = bboxes[bboxes[:, 5] == cls]

        while len(candidates) > 0:
            # Take the highest-scoring remaining box of this class ...
            top = np.argmax(candidates[:, 4])
            winner = candidates[top]
            kept.append(winner)
            candidates = np.delete(candidates, top, axis=0)

            # ... and measure how much every other candidate overlaps it.
            overlap = bboxes_iou(winner[np.newaxis, :4], candidates[:, :4])

            assert method in ['nms', 'soft-nms']

            if method == 'soft-nms':
                decay = np.exp(-(1.0 * overlap ** 2 / sigma))
            else:
                decay = np.ones((len(overlap),), dtype=np.float32)
                decay[overlap > iou_threshold] = 0.0

            # Rescale the scores and keep only candidates that still have a positive one.
            candidates[:, 4] = candidates[:, 4] * decay
            candidates = candidates[candidates[:, 4] > 0.]

    return kept
def postprocess_boxes(pred_bbox, original_image, input_size, score_threshold, valid_scale=(0, np.inf)):
    """Map raw predictions back onto the original image and filter them.

    :param pred_bbox: array-like of rows
                      (x_center, y_center, width, height, confidence, class probabilities...)
                      expressed in the ``input_size`` x ``input_size`` network frame.
    :param original_image: array whose first two dimensions are (height, width).
    :param input_size: side length of the square network input.
    :param score_threshold: rows whose ``confidence * best class probability``
                            is not above this value are dropped.
    :param valid_scale: ``(low, high)`` bounds, both exclusive, on
                        ``sqrt(box area)`` in original-image pixels. The
                        default keeps every box with a positive area.
    :return: array of rows (xmin, ymin, xmax, ymax, score, class_index).
    """
    pred_bbox = np.array(pred_bbox)

    pred_xywh = pred_bbox[:, 0:4]
    pred_conf = pred_bbox[:, 4]
    pred_prob = pred_bbox[:, 5:]

    # 1. (x, y, w, h) --> (xmin, ymin, xmax, ymax)
    half_wh = pred_xywh[:, 2:] * 0.5
    pred_coor = np.concatenate([pred_xywh[:, :2] - half_wh,
                                pred_xywh[:, :2] + half_wh], axis=-1)

    # 2. (xmin, ymin, xmax, ymax) -> (xmin_org, ymin_org, xmax_org, ymax_org)
    #    Remove the padding offset, then undo the resize ratio.
    org_h, org_w = original_image.shape[:2]
    resize_ratio = min(input_size / org_w, input_size / org_h)

    dw = (input_size - resize_ratio * org_w) / 2
    dh = (input_size - resize_ratio * org_h) / 2

    pred_coor[:, 0::2] = 1.0 * (pred_coor[:, 0::2] - dw) / resize_ratio
    pred_coor[:, 1::2] = 1.0 * (pred_coor[:, 1::2] - dh) / resize_ratio

    # 3. clip boxes to the image; boxes that end up inverted are zeroed so
    #    that the scale filter below can reject them
    pred_coor = np.concatenate([np.maximum(pred_coor[:, :2], [0, 0]),
                                np.minimum(pred_coor[:, 2:], [org_w - 1, org_h - 1])], axis=-1)
    invalid_mask = np.logical_or((pred_coor[:, 0] > pred_coor[:, 2]), (pred_coor[:, 1] > pred_coor[:, 3]))
    pred_coor[invalid_mask] = 0

    # 4. discard boxes whose scale lies outside valid_scale
    bboxes_scale = np.sqrt(np.multiply.reduce(pred_coor[:, 2:4] - pred_coor[:, 0:2], axis=-1))
    scale_mask = np.logical_and((valid_scale[0] < bboxes_scale), (bboxes_scale < valid_scale[1]))

    # 5. discard boxes with low scores
    classes = np.argmax(pred_prob, axis=-1)
    scores = pred_conf * pred_prob[np.arange(len(pred_coor)), classes]
    score_mask = scores > score_threshold
    mask = np.logical_and(scale_mask, score_mask)
    coors, scores, classes = pred_coor[mask], scores[mask], classes[mask]

    return np.concatenate([coors, scores[:, np.newaxis], classes[:, np.newaxis]], axis=-1)
def image_preprocess(image, target_size, gt_boxes=None):
    """Resize ``image`` with its aspect ratio preserved and pad it to ``target_size``.

    The resized image is centred on a canvas filled with 128 and the result
    is divided by 255.

    :param image: array whose ``shape`` unpacks as (height, width, channels).
    :param target_size: (height, width) of the output canvas.
    :param gt_boxes: optional array whose columns 0/2 are x coordinates and
                     columns 1/3 are y coordinates; it is rescaled and
                     shifted IN PLACE to follow the image.
    :return: the padded image, or ``(padded image, gt_boxes)`` when
             ``gt_boxes`` is given.
    """
    target_h, target_w = target_size
    src_h, src_w, _ = image.shape

    # One common factor for both axes keeps the aspect ratio.
    scale = min(target_w / src_w, target_h / src_h)
    new_w = int(scale * src_w)
    new_h = int(scale * src_h)
    resized = cv2.resize(image, (new_w, new_h))

    # Offsets that centre the resized image on the canvas.
    pad_x = (target_w - new_w) // 2
    pad_y = (target_h - new_h) // 2

    canvas = np.full(shape=[target_h, target_w, 3], fill_value=128.0)
    canvas[pad_y:new_h + pad_y, pad_x:new_w + pad_x, :] = resized
    canvas = canvas / 255.

    if gt_boxes is not None:
        gt_boxes[:, [0, 2]] = gt_boxes[:, [0, 2]] * scale + pad_x
        gt_boxes[:, [1, 3]] = gt_boxes[:, [1, 3]] * scale + pad_y
        return canvas, gt_boxes

    return canvas

In [None]:
def detect_image(YoloV3, image_path, output_path, input_size=416, show=False, CLASSES=TRAIN_CLASSES, score_threshold=0.3, iou_threshold=0.45, rectangle_colors=''):
    """Run the detector on one image file and return the annotated image.

    :param YoloV3: model whose ``predict`` returns an iterable of tensors;
                   they are flattened and concatenated before post-processing.
    :param image_path: path of the image to read with ``cv2.imread``.
    :param output_path: where to write the annotated image; ``''`` skips writing.
    :param input_size: side length of the square network input.
    :param show: when true, display the result in an OpenCV window and block
                 until a key is pressed.
    :param CLASSES: forwarded to ``draw_bbox``.
    :param score_threshold: forwarded to ``postprocess_boxes``.
    :param iou_threshold: forwarded to ``nms``.
    :param rectangle_colors: forwarded to ``draw_bbox``.
    :return: the image with the detections drawn on it, in the channel
             order produced by ``cv2.imread``.
    :raises FileNotFoundError: if ``cv2.imread`` cannot load ``image_path``.
    """
    original_image = cv2.imread(image_path)
    if original_image is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise FileNotFoundError(f"cv2.imread could not read an image from {image_path!r}")

    # NOTE: this function used to apply cv2.COLOR_BGR2RGB twice in a row.
    # The two channel swaps cancel each other, so they were dropped; the
    # image stays in the channel order returned by cv2.imread, exactly as before.

    image_data = image_preprocess(np.copy(original_image), [input_size, input_size])
    image_data = tf.expand_dims(image_data, 0)

    # Flatten every output to (num_boxes, values_per_box) and stack them.
    pred_bbox = YoloV3.predict(image_data)
    pred_bbox = [tf.reshape(x, (-1, tf.shape(x)[-1])) for x in pred_bbox]
    pred_bbox = tf.concat(pred_bbox, axis=0)

    bboxes = postprocess_boxes(pred_bbox, original_image, input_size, score_threshold)
    bboxes = nms(bboxes, iou_threshold, method='nms')

    image = draw_bbox(original_image, bboxes, CLASSES=CLASSES, rectangle_colors=rectangle_colors)

    if output_path != '': cv2.imwrite(output_path, image)
    if show:
        # Show the image
        cv2.imshow("predicted image", image)
        # Load and hold the image
        cv2.waitKey(0)
        # To close the window after the required kill value was provided
        cv2.destroyAllWindows()

    return image

In [None]:
# Sample image that is passed to detect_image in the next cell.
image_path   = "Dataset/MNIST/mnist_train/000001.jpg"

In [None]:
# Run detection without writing an output file (empty output_path) and
# without opening an OpenCV window; boxes are drawn in (255, 0, 0).
image = detect_image(yolo, image_path, '', input_size=416, show=False, rectangle_colors=(255,0,0))
# Swap the first and third channels before plotting with matplotlib.
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

In [None]:
# Display the annotated image in a large figure.
plt.figure(figsize=(30,15))
plt.imshow(image)

### Evaluation


In [None]:
import cv2
import numpy as np
import tensorflow as tf
from Dataset.MNIST.mnist_data import Dataset
from Configs.mnist_config import *
import shutil
import json
import time

In [None]:
# Turn on memory growth for the first visible GPU, if there is one.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError:
        print("RuntimeError in tf.config.experimental.list_physical_devices('GPU')")

In [None]:
def voc_ap(rec, prec):
    """Compute Average Precision as the area under the precision/recall curve.

    --- Official matlab code VOC2012---
    mrec=[0 ; rec ; 1];
    mpre=[0 ; prec ; 0];
    for i=numel(mpre)-1:-1:1
            mpre(i)=max(mpre(i),mpre(i+1));
    end
    i=find(mrec(2:end)~=mrec(1:end-1))+1;
    ap=sum((mrec(i)-mrec(i-1)).*mpre(i));

    :param rec: sequence of recall values, one per ranked detection.
    :param prec: sequence of precision values, aligned with ``rec``.
    :return: ``(ap, mrec, mpre)`` where ``mrec``/``mpre`` are the padded
             recall list and the padded, monotonically decreasing
             precision list that were integrated.

    The inputs are left untouched: the sentinel padding is applied to fresh
    lists, so callers can keep using ``rec`` and ``prec`` afterwards.
    """
    # Pad with sentinels: recall spans 0..1, precision is 0 at both ends.
    mrec = [0.0, *rec, 1.0]
    mpre = [0.0, *prec, 0.0]

    # Make the precision monotonically decreasing by sweeping from the end
    # to the beginning.
    #   matlab:  for i=numel(mpre)-1:-1:1
    #                mpre(i)=max(mpre(i),mpre(i+1));
    # matlab indexes start at 1 and Python's range excludes its end, hence
    # range(len(mpre) - 2, -1, -1).
    for i in range(len(mpre) - 2, -1, -1):
        mpre[i] = max(mpre[i], mpre[i + 1])

    # Numerical integration: add one rectangle at every index where the
    # recall changes.
    #   matlab:  i=find(mrec(2:end)~=mrec(1:end-1))+1;
    #            ap=sum((mrec(i)-mrec(i-1)).*mpre(i));
    ap = 0.0
    for i in range(1, len(mrec)):
        if mrec[i] != mrec[i - 1]:
            ap += (mrec[i] - mrec[i - 1]) * mpre[i]

    return ap, mrec, mpre
def get_mAP(model, dataset, score_threshold=0.25, iou_threshold=0.50, TEST_INPUT_SIZE=TEST_INPUT_SIZE):
    """Evaluate ``model`` on ``dataset`` and return the mean Average Precision in percent.

    Ground truth and post-processed detections are written as JSON files
    under ``mAP/ground-truth`` (the directory is deleted and recreated
    first). Detections are then matched to ground truth class by class and
    a report is written to ``mAP/results.txt``.

    :param model: object whose ``predict(batch)`` returns an iterable of
                  tensors; they are flattened and concatenated before
                  ``postprocess_boxes``.
    :param dataset: object exposing ``num_samples``, ``annotations`` and
                    ``parse_annotation(annotation, True)`` returning
                    ``(image, boxes)`` where every box row is
                    (xmin, ymin, xmax, ymax, class_index).
    :param score_threshold: forwarded to ``postprocess_boxes``.
    :param iou_threshold: forwarded to ``nms`` and shown in the progress
                          message. Matching against ground truth always
                          uses the fixed ``MINOVERLAP`` of 0.5.
    :param TEST_INPUT_SIZE: side length of the square network input.
    :return: ``mAP * 100``; 0.0 when the dataset holds no ground-truth object.
    """
    MINOVERLAP = 0.5 # default value (defined in the PASCAL VOC2012 challenge)
    NUM_CLASS = read_class_names(TRAIN_CLASSES)

    ground_truth_dir_path = 'mAP/ground-truth'
    if os.path.exists(ground_truth_dir_path): shutil.rmtree(ground_truth_dir_path)

    if not os.path.exists('mAP'): os.mkdir('mAP')
    os.mkdir(ground_truth_dir_path)

    print(f'\ncalculating mAP{int(iou_threshold*100)}...\n')

    # Pass 1: dump the ground truth of every sample and count objects per class.
    gt_counter_per_class = {}
    for index in range(dataset.num_samples):
        ann_dataset = dataset.annotations[index]
        _, bbox_data_gt = dataset.parse_annotation(ann_dataset, True)

        if len(bbox_data_gt) == 0:
            bboxes_gt, classes_gt = [], []
        else:
            bboxes_gt, classes_gt = bbox_data_gt[:, :4], bbox_data_gt[:, 4]

        bounding_boxes = []
        for box_gt, class_gt in zip(bboxes_gt, classes_gt):
            class_name = NUM_CLASS[class_gt]
            bbox = " ".join(map(str, box_gt))
            bounding_boxes.append({"class_name":class_name, "bbox":bbox, "used":False})
            # count that object
            gt_counter_per_class[class_name] = gt_counter_per_class.get(class_name, 0) + 1

        with open(f'{ground_truth_dir_path}/{str(index)}_ground_truth.json', 'w') as outfile:
            json.dump(bounding_boxes, outfile)

    # sort the classes alphabetically
    gt_classes = sorted(gt_counter_per_class)
    n_classes = len(gt_classes)
    class_slot = {name: slot for slot, name in enumerate(gt_classes)}

    # Pass 2: run the model on every sample and bucket the detections per class.
    times = []
    json_pred = [[] for _ in range(n_classes)]
    for index in range(dataset.num_samples):
        ann_dataset = dataset.annotations[index]
        original_image, _ = dataset.parse_annotation(ann_dataset, True)

        image = image_preprocess(np.copy(original_image), [TEST_INPUT_SIZE, TEST_INPUT_SIZE])
        image_data = tf.expand_dims(image, 0)

        # Only the forward pass is timed.
        t1 = time.time()
        pred_bbox = model.predict(image_data)
        t2 = time.time()
        times.append(t2-t1)

        pred_bbox = [tf.reshape(x, (-1, tf.shape(x)[-1])) for x in pred_bbox]
        pred_bbox = tf.concat(pred_bbox, axis=0)

        bboxes = postprocess_boxes(pred_bbox, original_image, TEST_INPUT_SIZE, score_threshold)
        bboxes = nms(bboxes, iou_threshold, method='nms')

        for bbox in bboxes:
            class_name = NUM_CLASS[int(bbox[5])]
            slot = class_slot.get(class_name)
            if slot is None:
                # The class never occurs in the ground truth, so no AP is
                # computed for it. Looking it up with gt_classes.index()
                # used to abort the whole evaluation with a ValueError.
                continue
            coor = np.array(bbox[:4], dtype=np.int32)
            json_pred[slot].append({"confidence": '%.4f' % bbox[4], "file_id": str(index), "bbox": " ".join(map(str, coor))})

    # Average inference time; guarded so an empty dataset does not divide by zero.
    ms = sum(times) / len(times) * 1000 if times else 0.0
    fps = 1000 / ms if ms > 0 else 0.0

    for class_name in gt_classes:
        predictions = json_pred[class_slot[class_name]]
        # Most confident detections are matched first.
        predictions.sort(key=lambda x:float(x['confidence']), reverse=True)
        with open(f'{ground_truth_dir_path}/{class_name}_predictions.json', 'w') as outfile:
            json.dump(predictions, outfile)

    # Calculate the AP for each class
    sum_AP = 0.0
    # open file to store the results
    with open("mAP/results.txt", 'w') as results_file:
        results_file.write("# AP and precision/recall per class\n")
        for class_name in gt_classes:
            # Load predictions of that class
            predictions_file = f'{ground_truth_dir_path}/{class_name}_predictions.json'
            with open(predictions_file) as infile:
                predictions_data = json.load(infile)

            # Assign predictions to ground truth objects
            nd = len(predictions_data)
            tp = [0] * nd # creates an array of zeros of size nd
            fp = [0] * nd
            for idx, prediction in enumerate(predictions_data):
                file_id = prediction["file_id"]
                # assign prediction to ground truth object if any
                #   open ground-truth with that file_id
                gt_file = f'{ground_truth_dir_path}/{str(file_id)}_ground_truth.json'
                with open(gt_file) as infile:
                    ground_truth_data = json.load(infile)
                ovmax = -1
                gt_match = None
                # load prediction bounding-box
                bb = [ float(x) for x in prediction["bbox"].split() ] # bounding box of prediction
                for obj in ground_truth_data:
                    # look for a class_name match
                    if obj["class_name"] != class_name:
                        continue
                    bbgt = [ float(x) for x in obj["bbox"].split() ] # bounding box of ground truth
                    bi = [max(bb[0],bbgt[0]), max(bb[1],bbgt[1]), min(bb[2],bbgt[2]), min(bb[3],bbgt[3])]
                    iw = bi[2] - bi[0] + 1
                    ih = bi[3] - bi[1] + 1
                    if iw > 0 and ih > 0:
                        # compute overlap (IoU) = area of intersection / area of union
                        ua = (bb[2] - bb[0] + 1) * (bb[3] - bb[1] + 1) + (bbgt[2] - bbgt[0]
                                        + 1) * (bbgt[3] - bbgt[1] + 1) - iw * ih
                        ov = iw * ih / ua
                        if ov > ovmax:
                            ovmax = ov
                            gt_match = obj

                # A detection is a true positive only when it overlaps a
                # not-yet-claimed ground-truth object enough; everything
                # else (low overlap or duplicate detection) is a false positive.
                if ovmax >= MINOVERLAP and not bool(gt_match["used"]):
                    tp[idx] = 1
                    gt_match["used"] = True
                    # update the ".json" file so later detections see the claim
                    with open(gt_file, 'w') as f:
                        json.dump(ground_truth_data, f)
                else:
                    fp[idx] = 1

            # compute precision/recall from running totals of fp and tp
            cumsum = 0
            for idx, val in enumerate(fp):
                fp[idx] += cumsum
                cumsum += val
            cumsum = 0
            for idx, val in enumerate(tp):
                tp[idx] += cumsum
                cumsum += val
            rec = [float(t) / gt_counter_per_class[class_name] for t in tp]
            prec = [float(t) / (f + t) for t, f in zip(tp, fp)]

            # Hand voc_ap copies so that the precision/recall values
            # reported below are exactly the ones computed here.
            ap, _, _ = voc_ap(rec[:], prec[:])
            sum_AP += ap
            text = "{0:.3f}%".format(ap*100) + " = " + class_name + " AP  " #class_name + " AP = {0:.2f}%".format(ap*100)

            rounded_prec = [ '%.3f' % elem for elem in prec ]
            rounded_rec = [ '%.3f' % elem for elem in rec ]
            # Write to results.txt
            results_file.write(text + "\n Precision: " + str(rounded_prec) + "\n Recall   :" + str(rounded_rec) + "\n\n")

            print(text)

        results_file.write("\n# mAP of all classes\n")
        # Without any ground-truth class there is nothing to average.
        mAP = sum_AP / n_classes if n_classes else 0.0

        text = "mAP = {:.3f}%, {:.2f} FPS".format(mAP*100, fps)
        results_file.write(text + "\n")
        print(text)

        return mAP*100

In [None]:
# Build the model in inference mode for 10 classes and push one random
# 416x416 batch through it.
# NOTE(review): presumably this forward pass exists so that the layer
# variables are created before load_weights is called - confirm.
yolo = YOLOV3(10, TRAIN=False)
tert = np.random.rand(1,416,416,3).astype(np.float32)
bert = yolo(tert, training=False)

In [None]:
# Restore the weights stored under 'MNIST_WEIGHTS/check6'.
yolo.load_weights('MNIST_WEIGHTS/check6')

In [None]:
# Create the 'test' split of the MNIST detection dataset for evaluation.
from Dataset.MNIST.mnist_data import Dataset
testset = Dataset('test')

In [None]:
# Evaluate the loaded model on the test split using the thresholds and the
# input size taken from the imported configuration.
get_mAP(yolo, testset, score_threshold=TEST_SCORE_THRESHOLD, iou_threshold=TEST_IOU_THRESHOLD, TEST_INPUT_SIZE=YOLO_INPUT_SIZE)

# Yolo3 Mobilenet 

In [None]:
# %cd ..
# !git clone https://github.com/fsx950223/mobilenetv2-yolov3.git

In [None]:
%cd mobilenetv2-yolov3/

In [None]:
import tensorflow as tf

In [None]:
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.layers import UpSampling2D, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.layers import BatchNormalization, Input, Lambda, Conv2D
from tensorflow.keras.layers import LeakyReLU, UpSampling2D
from tensorflow.keras.regularizers import l2
from functools import wraps, reduce

In [None]:
def compose(*funcs):
    """Chain any number of callables, applying them from left to right.

    The first callable receives the positional and keyword arguments of the
    call; each following callable receives the previous result as its only
    argument. A single callable is returned unchanged.

    Reference: https://mathieularose.com/function-composition-in-python/

    :raises ValueError: if no callable is given.
    """
    if not funcs:
        raise ValueError('Composition of empty sequence not supported.')

    def chain(inner, outer):
        # Bind the pair through parameters so that every link keeps its own
        # inner/outer instead of sharing the loop variables below.
        def piped(*a, **kw):
            return outer(inner(*a, **kw))
        return piped

    composed = funcs[0]
    for following in funcs[1:]:
        composed = chain(composed, following)
    return composed

In [None]:
def yolo3_mobilenetv2_body(inputs, num_anchors, num_classes, alpha=1.0):
    """Create YOLO_V3 MobileNetV2 model CNN body in Keras.

    :param inputs: Keras input tensor fed to the MobileNetV2 backbone.
    :param num_anchors: forwarded to ``yolo3_predictions``.
    :param num_classes: forwarded to ``yolo3_predictions``.
    :param alpha: MobileNetV2 width multiplier; it also scales the channel
                  numbers handed to the prediction heads.
    :return: Keras ``Model`` from ``inputs`` to the three head outputs.
    """
    backbone = MobileNetV2(input_tensor=inputs, weights='imagenet', include_top=False, alpha=alpha)
    print('backbone layers number: {}'.format(len(backbone.layers)))

    # Backbone layers tapped for the three scales, coarsest first.
    # Shapes noted by the original author for a 416 x 416 x 3 input:
    #   out_relu:             13 x 13 x 1280
    #   block_13_expand_relu: 26 x 26 x (576*alpha)
    #   block_6_expand_relu:  52 x 52 x (192*alpha)
    tap_names = ('out_relu', 'block_13_expand_relu', 'block_6_expand_relu')
    feature_maps = tuple(backbone.get_layer(tap).output for tap in tap_names)

    # Channel numbers used to size the heads (alternative once tried by the
    # original author: 1024 / 512 / 256).
    channel_nums = tuple(int(base * alpha) for base in (1280, 576, 192))

    y1, y2, y3 = yolo3_predictions(feature_maps, channel_nums, num_anchors, num_classes)

    return Model(inputs = inputs, outputs=[y1,y2,y3])

In [None]:
def DarknetConv2D(*args, **kwargs):
    """Wrapper to set Darknet parameters for Conv2D.

    Defaults: L2 kernel regularisation (5e-4) and 'same' padding, or 'valid'
    padding when ``strides == (2, 2)``. Keyword arguments given by the
    caller override these defaults.
    """
    downsampling = kwargs.get('strides') == (2, 2)
    conv_kwargs = {
        'kernel_regularizer': l2(5e-4),
        'padding': 'valid' if downsampling else 'same',
        **kwargs,  # listed last so explicit arguments win
    }
    return Conv2D(*args, **conv_kwargs)

In [None]:
def DarknetConv2D_BN_Leaky(*args, **kwargs):
    """Darknet Convolution2D followed by BatchNormalization and LeakyReLU.

    The convolution is created without a bias unless the caller passes
    ``use_bias`` explicitly.
    """
    conv_kwargs = {'use_bias': False, **kwargs}
    conv = DarknetConv2D(*args, **conv_kwargs)
    batch_norm = BatchNormalization()
    activation = LeakyReLU(alpha=0.1)
    return compose(conv, batch_norm, activation)

In [None]:
def make_last_layers(x, num_filters, out_filters, predict_filters=None, predict_id='1'):
    '''Five Conv2D_BN_Leaky layers form the trunk; one more Conv2D_BN_Leaky
    plus a plain 1x1 Conv2D produce the prediction.

    :param x: input tensor.
    :param num_filters: width of the 1x1 trunk layers; the 3x3 layers use twice that.
    :param out_filters: number of filters of the final prediction convolution.
    :param predict_filters: width of the 3x3 layer in front of the prediction
                            convolution; defaults to ``num_filters*2``.
    :param predict_id: suffix of the prediction layer name ``predict_conv_<id>``.
    :return: ``(trunk output, prediction)``.
    '''
    # Alternating 1x1 / 3x3 trunk, listed in application order.
    trunk_spec = [
        (num_filters, (1,1)),
        (num_filters*2, (3,3)),
        (num_filters, (1,1)),
        (num_filters*2, (3,3)),
        (num_filters, (1,1)),
    ]
    trunk = compose(*[DarknetConv2D_BN_Leaky(width, kernel) for width, kernel in trunk_spec])
    x = trunk(x)

    if predict_filters is None:
        predict_filters = num_filters*2

    predictor = compose(
            DarknetConv2D_BN_Leaky(predict_filters, (3,3)),
            DarknetConv2D(out_filters, (1,1), name='predict_conv_' + predict_id))
    y = predictor(x)
    return x, y

In [None]:
def yolo3_predictions(feature_maps, feature_channel_nums, num_anchors, num_classes, use_spp=False):
    """Build the three prediction heads on top of the backbone feature maps.

    :param feature_maps: ``(f1, f2, f3)``; per the original comments these
                         are the 13x13, 26x26 and 52x52 maps for a 416 input.
    :param feature_channel_nums: channel numbers matching ``feature_maps``;
                                 half of each is used as the head width.
    :param num_anchors: anchors predicted per head.
    :param num_classes: number of classes; every head outputs
                        ``num_anchors * (num_classes + 5)`` filters.
    :param use_spp: when true, the first head is built with
                    ``make_spp_last_layers`` instead of ``make_last_layers``.
    :return: ``(y1, y2, y3)``, one prediction tensor per feature map.
    """
    f1, f2, f3 = feature_maps
    f1_channel_num, f2_channel_num, f3_channel_num = feature_channel_nums
    out_filters = num_anchors * (num_classes + 5)

    def upsample_and_merge(tensor, skip, skip_channel_num):
        # 1x1 conv, 2x upsampling, then concatenation with the finer map.
        tensor = compose(
                DarknetConv2D_BN_Leaky(skip_channel_num//2, (1,1)),
                UpSampling2D(2))(tensor)
        return Concatenate()([tensor, skip])

    #feature map 1 head & output (13x13 for 416 input)
    first_head = make_spp_last_layers if use_spp else make_last_layers
    x, y1 = first_head(f1, f1_channel_num//2, out_filters, predict_id='1')

    #upsample fpn merge for feature map 1 & 2, then head 2 (26x26 for 416 input)
    x = upsample_and_merge(x, f2, f2_channel_num)
    x, y2 = make_last_layers(x, f2_channel_num//2, out_filters, predict_id='2')

    #upsample fpn merge for feature map 2 & 3, then head 3 (52x52 for 416 input)
    x = upsample_and_merge(x, f3, f3_channel_num)
    _, y3 = make_last_layers(x, f3_channel_num//2, out_filters, predict_id='3')

    return y1, y2, y3

In [None]:
# Shape of one network input; used to create the Keras Input below.
input_shape =  (416 ,416 , 3)

In [None]:
# Symbolic input tensor that the MobileNetV2 backbone is built on.
input_tensor = Input(shape=input_shape, name='image_input')

In [None]:
# Build the MobileNetV2-YOLOv3 model with num_anchors=9 and num_classes=10.
model = yolo3_mobilenetv2_body(input_tensor, 9, 10)

In [None]:
# Print the layer-by-layer summary of the assembled model.
model.summary()
