<a href="https://colab.research.google.com/github/Amir-D-Shadow/Google-Colab/blob/main/start.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Make Google Drive available to this Colab runtime under /content/gdrive.
from google.colab import drive
# Left disabled on purpose; enable to flush and unmount a previously mounted drive first.
#drive.flush_and_unmount()
drive.mount("/content/gdrive")

In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.utils import plot_model
from numba import jit
import random
import os
import pandas as pd
import json
import cv2
import tensorflow.keras.backend as K

In [None]:
def foo(a):
    """
    Generator that counts upward from ``a``.

    On every step the current value is compared with 8: once it is greater
    than 8 the generator finishes, otherwise the value is increased by 1 and
    the new value is yielded.  ``foo(3)`` therefore yields 4, 5, 6, 7, 8, 9,
    and nothing is yielded when ``a`` already exceeds 8.
    """
    # Written as "not a > 8" so the stop test is exactly the original comparison.
    while not a > 8:
        a = a + 1
        yield a


In [None]:
# Quick check of the generator above: report, for each yielded value, whether it is None.
for i in foo(3):
    print(i is None)

In [None]:
class CBS(tf.keras.Model):
    """
    Conv2D -> BatchNormalization -> sigmoid block operating on channels-last tensors.
    """

    def __init__(self,filters=32,kernel_size=3,strides=2,padding='valid',**kwargs):
        """
        filters, kernel_size, strides, padding -- forwarded unchanged to the Conv2D layer

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        # Sub-layers are created once here and reused on every forward pass.
        self.conv2D_x = tf.keras.layers.Conv2D(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            data_format="channels_last",
        )
        self.BN_x = tf.keras.layers.BatchNormalization(axis=-1)

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- passed to BatchNormalization as its `training` argument

        Returns the sigmoid of the batch-normalised convolution output.
        """
        normalised = self.BN_x(self.conv2D_x(inputs),training=train_flag)

        return tf.keras.activations.sigmoid(normalised)

In [None]:
class CBL(tf.keras.Model):
    """
    Conv2D -> BatchNormalization -> LeakyReLU block operating on channels-last tensors.
    """

    def __init__(self,filters=32,kernel_size=3,strides=2,padding='valid',**kwargs):
        """
        filters, kernel_size, strides, padding -- forwarded unchanged to the Conv2D layer

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        # Sub-layers are created once here and reused on every forward pass.
        self.conv2D_x = tf.keras.layers.Conv2D(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            data_format="channels_last",
        )
        self.BN_x = tf.keras.layers.BatchNormalization(axis=-1)
        self.output_leaky_relu = tf.keras.layers.LeakyReLU()

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- passed to BatchNormalization as its `training` argument

        Returns the LeakyReLU activation of the batch-normalised convolution output.
        """
        normalised = self.BN_x(self.conv2D_x(inputs),training=train_flag)

        return self.output_leaky_relu(normalised)

In [None]:
#activation Mish
def Mish(x):
    """
    Mish activation: x * tanh(softplus(x)), evaluated with TensorFlow math ops.
    """
    return x * tf.math.tanh(tf.math.softplus(x))


#CBM Module
class CBM(tf.keras.Model):
    """
    Conv2D -> BatchNormalization -> Mish block operating on channels-last tensors.
    """

    def __init__(self,filters=32,kernel_size=3,strides=2,padding="valid",**kwargs):
        """
        filters, kernel_size, strides, padding -- forwarded unchanged to the Conv2D layer

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        # Sub-layers are created once here and reused on every forward pass.
        self.conv2D_x = tf.keras.layers.Conv2D(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            data_format="channels_last",
        )
        self.BN_x = tf.keras.layers.BatchNormalization(axis=-1)

        # Mish is wrapped in a Lambda layer so it is applied like any other layer.
        self.output_Mish = tf.keras.layers.Lambda(Mish)

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- passed to BatchNormalization as its `training` argument

        Returns the Mish activation of the batch-normalised convolution output.
        """
        normalised = self.BN_x(self.conv2D_x(inputs),training=train_flag)

        return self.output_Mish(normalised)

In [None]:
#Backbone CSPX
class CSPX(tf.keras.Model):
    """
    Backbone CSP block containing a configurable number of residual units.

    Module Graph:

    ------ CBL_1 ------ CBL_2 ------ res_unit * X ------ CBL_3 -----
                   |                                               |
                   |                                               |______
                   |                                                ______  Concat --- BN --- leaky relu --- CBM_1
                   |                                               |
                   |                                               |
                   -------------------------------- CBL_4 ----------
    """

    def __init__(self,CSPX_info,**kwargs):
        """
        CSPX_info -- dictionary with the keys read below:

                       - "num_of_res_unit": number X of residual units
                       - "res_unit_1" ... "res_unit_X": block info handed to res_unit (numbering starts at 1)
                       - "CBL_1" ... "CBL_4", "CBM_1": hpara tuples (filters,kernel_size,strides,padding)

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        self.num_of_res_unit = CSPX_info["num_of_res_unit"]

        # Residual units are stored under the same "res_unit_i" keys used in CSPX_info.
        self.res_unit_seq = {}
        for idx in range(1,self.num_of_res_unit+1):
            unit_key = f"res_unit_{idx}"
            self.res_unit_seq[unit_key] = res_unit(CSPX_info[unit_key])

        # CBL_1 .. CBL_4 are built the same way, each from its own hpara tuple.
        for block_name in ("CBL_1","CBL_2","CBL_3","CBL_4"):
            n_filters,k_size,stride,pad = CSPX_info[block_name]
            setattr(self,block_name,CBL(n_filters,k_size,stride,pad))

        # Layers applied after the two paths are concatenated.
        self.BN_x = tf.keras.layers.BatchNormalization(axis=-1)
        self.leaky_relu_x = tf.keras.layers.LeakyReLU()

        n_filters,k_size,stride,pad = CSPX_info["CBM_1"]
        self.CBM_1 = CBM(n_filters,k_size,stride,pad)

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- forwarded to every sub-block and to BatchNormalization

        Returns the output of CBM_1 applied to the merged paths.
        """
        stem = self.CBL_1(inputs,train_flag)

        # Deep path: CBL_2, then every residual unit in order, then CBL_3.
        deep_path = self.CBL_2(stem,train_flag)
        for idx in range(1,self.num_of_res_unit+1):
            deep_path = self.res_unit_seq[f"res_unit_{idx}"](deep_path,train_flag)
        deep_path = self.CBL_3(deep_path,train_flag)

        # Shortcut path: a single CBL applied to the stem output.
        shortcut_path = self.CBL_4(stem,train_flag)

        merged = tf.keras.layers.concatenate(inputs=[deep_path,shortcut_path],axis=-1)
        merged = self.BN_x(merged,training=train_flag)
        merged = self.leaky_relu_x(merged)

        return self.CBM_1(merged,train_flag)
      



#Neck CSPX
class CSPX_Neck(tf.keras.Model):
    """
    Neck CSP block built from a configurable chain of CBL blocks.

    Module Graph:

    ----------- CBL * X ------ conv2D_1 ----------------
       |                                               |
       |                                               |______
       |                                                ______  Concat --- BN --- leaky relu --- CBM_1
       |                                               |
       |                                               |
       -------------------------------- conv2D_2 -------
    """

    def __init__(self,NECK_info,**kwargs):
        """
        NECK_info -- dictionary with the keys read below:

                       - "num_of_CBL": number X of chained CBL blocks
                       - "CBL_1" ... "CBL_X", "conv2D_1", "conv2D_2", "CBM_1":
                         hpara tuples (filters,kernel_size,strides,padding)

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        self.num_of_CBL = NECK_info["num_of_CBL"]

        # Chained CBL blocks are stored under the same "CBL_i" keys used in NECK_info.
        self.CBL_seq = {}
        for idx in range(1,self.num_of_CBL+1):
            block_key = f"CBL_{idx}"
            n_filters,k_size,stride,pad = NECK_info[block_key]
            self.CBL_seq[block_key] = CBL(n_filters,k_size,stride,pad)

        # Plain convolutions closing the deep path (conv2D_1) and the shortcut path (conv2D_2).
        for conv_name in ("conv2D_1","conv2D_2"):
            n_filters,k_size,stride,pad = NECK_info[conv_name]
            conv_layer = tf.keras.layers.Conv2D(
                filters=n_filters,
                kernel_size=k_size,
                strides=stride,
                padding=pad,
                data_format="channels_last",
            )
            setattr(self,conv_name,conv_layer)

        # Layers applied after the two paths are concatenated.
        self.BN_x = tf.keras.layers.BatchNormalization(axis=-1)
        self.leaky_relu_x = tf.keras.layers.LeakyReLU()

        n_filters,k_size,stride,pad = NECK_info["CBM_1"]
        self.CBM_1 = CBM(n_filters,k_size,stride,pad)

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- forwarded to every CBL/CBM block and to BatchNormalization

        Returns the output of CBM_1 applied to the merged paths.
        """
        # Deep path: every CBL in order, followed by conv2D_1.
        deep_path = inputs
        for idx in range(1,self.num_of_CBL+1):
            deep_path = self.CBL_seq[f"CBL_{idx}"](deep_path,train_flag)
        deep_path = self.conv2D_1(deep_path)

        # Shortcut path: conv2D_2 applied directly to the block input.
        shortcut_path = self.conv2D_2(inputs)

        merged = tf.keras.layers.concatenate(inputs=[deep_path,shortcut_path],axis=-1)
        merged = self.BN_x(merged,training=train_flag)
        merged = self.leaky_relu_x(merged)

        return self.CBM_1(merged,train_flag)


#revised CSP
class rCSP(tf.keras.Model):
    """
    Revised CSP block with an SPP module on its deep path.

    Module Graph:

                 ------- CBL_2 --- CBL_3 --- CBL_4 --- SPP_1 --- CBL_5 -----
                 |                                                         |
                 |                                                         |______
       CBL_1  ---|                                                          ______  Concat --- CBL_7
                 |                                                         |
                 |                                                         |
                 --------------------------- CBL_6 -------------------------
    """

    def __init__(self,rCSP_info,**kwargs):
        """
        rCSP_info -- dictionary with keys "CBL_1" ... "CBL_7", each an hpara tuple
                     (filters,kernel_size,strides,padding)

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        # CBL_1 .. CBL_4 come first, then SPP_1, then CBL_5 .. CBL_7.
        for block_name in ("CBL_1","CBL_2","CBL_3","CBL_4"):
            n_filters,k_size,stride,pad = rCSP_info[block_name]
            setattr(self,block_name,CBL(n_filters,k_size,stride,pad))

        self.SPP_1 = SPP()

        for block_name in ("CBL_5","CBL_6","CBL_7"):
            n_filters,k_size,stride,pad = rCSP_info[block_name]
            setattr(self,block_name,CBL(n_filters,k_size,stride,pad))

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- forwarded to every CBL block (SPP_1 is called without it)

        Returns the output of CBL_7 applied to the merged paths.
        """
        stem = self.CBL_1(inputs,train_flag)

        # Deep path: CBL_2 -> CBL_3 -> CBL_4 -> SPP_1 -> CBL_5.
        deep_path = self.CBL_2(stem,train_flag)
        deep_path = self.CBL_3(deep_path,train_flag)
        deep_path = self.CBL_4(deep_path,train_flag)
        deep_path = self.SPP_1(deep_path)
        deep_path = self.CBL_5(deep_path,train_flag)

        # Shortcut path: a single CBL applied to the stem output.
        shortcut_path = self.CBL_6(stem,train_flag)

        # The shortcut comes first in the concatenation.
        merged = tf.keras.layers.concatenate(inputs=[shortcut_path,deep_path],axis=-1)

        return self.CBL_7(merged,train_flag)



In [None]:
class res_unit(tf.keras.Model):
    """
    Residual unit: two CBM blocks whose output is added to the unit's input.

    Module Graph:

    ------ CBM_1 ------ CBM_2 ------ Add
       |                              |
       |                              |
       |                              |
       --------------------------------
    """

    def __init__(self,block_info,**kwargs):
        """
        block_info -- dictionary with keys "CBM_1" and "CBM_2", each an hpara tuple
                      (filters,kernel_size,strides,padding)

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        for block_name in ("CBM_1","CBM_2"):
            n_filters,k_size,stride,pad = block_info[block_name]
            setattr(self,block_name,CBM(n_filters,k_size,stride,pad))

        # Element-wise sum used for the shortcut connection.
        self.Add_layer = tf.keras.layers.Add()

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor fed to CBM_1 and also used as the shortcut operand

        train_flag -- forwarded to both CBM blocks

        Returns Add([CBM_2 output, inputs]).
        """
        transformed = self.CBM_2(self.CBM_1(inputs,train_flag),train_flag)

        return self.Add_layer([transformed,inputs])


In [None]:
class SPP(tf.keras.Model):
    """
    Spatial pyramid pooling: three stride-1, "same"-padded max-pools (5, 9, 13)
    concatenated with the unpooled input along the channel axis.
    """

    def __init__(self,**kwargs):
        """
        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        # The three pools differ only in pool_size.
        shared_args = dict(strides=1,padding="same",data_format="channels_last")

        self.maxpool_5x5 = tf.keras.layers.MaxPooling2D(pool_size=5,**shared_args)
        self.maxpool_9x9 = tf.keras.layers.MaxPooling2D(pool_size=9,**shared_args)
        self.maxpool_13x13 = tf.keras.layers.MaxPooling2D(pool_size=13,**shared_args)

    def call(self,inputs):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        Returns the channel-wise concatenation [pool 5x5, pool 9x9, pool 13x13, inputs].
        """
        pools = (self.maxpool_5x5,self.maxpool_9x9,self.maxpool_13x13)
        pooled = [pool(inputs) for pool in pools]

        return tf.keras.layers.concatenate(inputs=[*pooled,inputs],axis=-1)


In [None]:
#TCBL Module
class TCBL(tf.keras.Model):
    """
    Conv2DTranspose -> BatchNormalization -> LeakyReLU block operating on channels-last tensors.
    """

    def __init__(self,filters=32,kernel_size=3,strides=1,padding="valid",**kwargs):
        """
        filters, kernel_size, strides, padding -- forwarded unchanged to the Conv2DTranspose layer

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        # Sub-layers are created once here and reused on every forward pass.
        self.conv2D_transpose_x = tf.keras.layers.Conv2DTranspose(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            data_format="channels_last",
        )
        self.BN_x = tf.keras.layers.BatchNormalization(axis=-1)
        self.output_leaky_relu = tf.keras.layers.LeakyReLU()

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- passed to BatchNormalization as its `training` argument

        Returns the LeakyReLU activation of the batch-normalised transpose convolution.
        """
        normalised = self.BN_x(self.conv2D_transpose_x(inputs),training=train_flag)

        return self.output_leaky_relu(normalised)

In [None]:
#TCBM Module
class TCBM(tf.keras.Model):
    """
    Conv2DTranspose -> BatchNormalization -> Mish block operating on channels-last tensors.
    """

    def __init__(self,filters=32,kernel_size=3,strides=2,padding="valid",**kwargs):
        """
        filters, kernel_size, strides, padding -- forwarded unchanged to the Conv2DTranspose layer

        kwargs -- forwarded to the tf.keras.Model constructor
        """
        super().__init__(**kwargs)

        # Sub-layers are created once here and reused on every forward pass.
        self.conv2D_transpose_x = tf.keras.layers.Conv2DTranspose(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            data_format="channels_last",
        )
        self.BN_x = tf.keras.layers.BatchNormalization(axis=-1)

        # Mish is wrapped in a Lambda layer so it is applied like any other layer.
        self.output_Mish = tf.keras.layers.Lambda(Mish)

    def call(self,inputs,train_flag=True):
        """
        inputs -- tensor with shape (m,n_H,n_W,n_C)

        train_flag -- passed to BatchNormalization as its `training` argument

        Returns the Mish activation of the batch-normalised transpose convolution.
        """
        normalised = self.BN_x(self.conv2D_transpose_x(inputs),training=train_flag)

        return self.output_Mish(normalised)

In [None]:
class alpha_model(tf.keras.Model):

   def __init__(self,**kwargs):

      """
      Create every layer of the model: backbone (CBM_1, CSP1, CSP2, CSP8 x 2, CSP4),
      neck with SPP, the phase 1-4 CBL stacks and the small / medium / large heads.

      kwargs -- forwarded to the tf.keras.Model constructor

      The "in -> out" shapes in the comments are the author's bookkeeping for a
      640 x 640 x 3 input; this method only creates the layers, the wiring between
      them is done in call().

      Layers are assigned in a fixed order and under fixed attribute names; keep
      both when editing, since call() depends on the names.
      """

      #initialization
      super(alpha_model,self).__init__(**kwargs)

      def _cspx_info(num_of_res_unit,stem_filters,branch_filters,bottleneck_filters,out_filters):

         """Return the CSPX_info dictionary for one backbone stage."""

         info = {"num_of_res_unit": num_of_res_unit}

         #every res unit of a stage shares the same hpara: 1x1 bottleneck followed by a 3x3 conv
         for i in range(1,num_of_res_unit+1):

            info[f"res_unit_{i}"] = {"CBM_1": (bottleneck_filters,1,1,"same"),
                                     "CBM_2": (branch_filters,3,1,"same")}

         #CBL_1 is the only strided block of the stage
         info["CBL_1"] = (stem_filters,3,2,"valid")

         for name in ("CBL_2","CBL_3","CBL_4"):

            info[name] = (branch_filters,1,1,"same")

         info["CBM_1"] = (out_filters,1,1,"same")

         return info

      def _head_conv(filters,activation):

         """Return the 1x1, stride 1 output convolution used by the prediction heads."""

         return tf.keras.layers.Conv2D(filters=filters,kernel_size=1,strides=1,padding="same",data_format="channels_last",activation=activation)

      #$#$#$#$#$#$#$#$#$#$#$#$#$# backbone $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBM_1 in : 640 x 640 x 3 out: 640 x 640 x 32
      self.CBM_1 = CBM(32,3,1,"same")

      #CSP1 in : 640 x 640 x 32 out: 319 x 319 x 64 (res unit bottleneck uses 32 filters)
      self.CSP1 = CSPX(_cspx_info(1,64,64,32,64))

      #CSP2 in : 319 x 319 x 64 out : 159 x 159 x 128
      self.CSP2 = CSPX(_cspx_info(2,128,64,64,128))

      #CSP8_1 in : 159 x 159 x 128  out : 79 x 79 x 256 -- branch 1
      self.CSP8_branch_1 = CSPX(_cspx_info(8,256,128,128,256))

      #CSP8_2 in : 79 x 79 x 256 out : 39 x 39 x 512 --- branch 2
      self.CSP8_branch_2 = CSPX(_cspx_info(8,512,256,256,512))

      #CSP4 in : 39 x 39 x 512 out : 19 x 19 x 1024
      self.CSP4 = CSPX(_cspx_info(4,1024,512,512,1024))

      #$#$#$#$#$#$#$#$#$#$#$#$#$# neck $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_1 neck in : 19 x 19 x 1024 out : 19 x 19 x 512
      self.CBL_1_neck = CBL(512,1,1,"same")

      #CBL_2 neck in : 19 x 19 x 512 out : 19 x 19 x 1024
      self.CBL_2_neck = CBL(1024,3,1,"same")

      #CBL_3 neck in : 19 x 19 x 1024 out : 19 x 19 x 512
      self.CBL_3_neck = CBL(512,1,1,"same")

      #SPP_neck in:19 x 19 x 512 out: 19 x 19 x 2048
      self.SPP_neck = SPP()

      #CBL_4 neck in : 19 x 19 x 2048 out : 19 x 19 x 512
      self.CBL_4_neck = CBL(512,1,1,"same")

      #CBL_5 neck in : 19 x 19 x 512 out : 19 x 19 x 1024
      self.CBL_5_neck = CBL(1024,3,1,"same")

      #CBL_6 branch 3 in : 19 x 19 x 1024 out : 19 x 19 x 512 -- branch 3
      self.CBL_6_branch_3 = CBL(512,1,1,"same")

      #$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL 7 neck in : 19 x 19 x 512 out : 19 x 19 x 256
      self.CBL_7_neck = CBL(256,1,1,"same")

      #TCBL1 in : 19 x 19 x 256 out: 39 x 39 x 256 (kernel 21, stride 1, valid: 19 + 21 - 1 = 39)
      self.TCBL1 = TCBL(256,21,1,"valid")

      #CBL_connect_branch_2 in : 39 x 39 x 512 out: 39 x 39 x 256
      self.CBL_connect_branch_2 = CBL(256,1,1,"same")

      #concat TCBL1 -- CBL_connect_branch_2 , out: 39 x 39 x 512

      #$#$#$#$#$#$#$#$#$#$#$#$#$# phase 1 $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_1 phase1 in : 39 x 39 x 512 out : 39 x 39 x 256
      self.CBL_1_phase1 = CBL(256,1,1,"same")

      #CBL_2 phase1 in : 39 x 39 x 256 out : 39 x 39 x 512
      self.CBL_2_phase1 = CBL(512,3,1,"same")

      #CBL_3 phase1 in : 39 x 39 x 512 out : 39 x 39 x 256
      self.CBL_3_phase1 = CBL(256,1,1,"same")

      #CBL_4 phase1 in : 39 x 39 x 256 out : 39 x 39 x 512
      self.CBL_4_phase1 = CBL(512,3,1,"same")

      #CBL_5 phase1_branch_4 in : 39 x 39 x 512 out : 39 x 39 x 256 -- branch 4
      self.CBL_5_phase1_branch_4 = CBL(256,1,1,"same")

      #$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_6 phase1 in : 39 x 39 x 256 out : 39 x 39 x 128
      self.CBL_6_phase1 = CBL(128,1,1,"same")

      #TCBL2 in : 39 x 39 x 128 out: 79 x 79 x 128 (kernel 41, stride 1, valid: 39 + 41 - 1 = 79)
      self.TCBL2 = TCBL(128,41,1,"valid")

      #CBL_connect_branch_1 in : 79 x 79 x 256 out: 79 x 79 x 128
      self.CBL_connect_branch_1 = CBL(128,1,1,"same")

      #concat TCBL2 -- CBL_connect_branch_1 , out: 79 x 79 x 256

      #$#$#$#$#$#$#$#$#$#$#$#$#$# phase 2 $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_1 phase2 in : 79 x 79 x 256 out : 79 x 79 x 128
      self.CBL_1_phase2 = CBL(128,1,1,"same")

      #CBL_2 phase2 in : 79 x 79 x 128 out : 79 x 79 x 256
      self.CBL_2_phase2 = CBL(256,3,1,"same")

      #CBL_3 phase2 in : 79 x 79 x 256 out : 79 x 79 x 128
      self.CBL_3_phase2 = CBL(128,1,1,"same")

      #CBL_4 phase2 in : 79 x 79 x 128 out : 79 x 79 x 256
      self.CBL_4_phase2 = CBL(256,3,1,"same")

      #CBL_5 phase2_branch_5 in : 79 x 79 x 256 out : 79 x 79 x 128 -- branch 5
      self.CBL_5_phase2_branch_5 = CBL(128,1,1,"same")

      #$#$#$#$#$#$#$#$#$#$#$#$#$# small output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #TCBL_small in : 79 x 79 x 128 out : 80 x 80 x 128
      self.TCBL_small = TCBL(128,2,1,"valid")

      #prob info

      #CBL_prob_class_small_1 in : 80 x 80 x 128 out : 80 x 80 x 256
      self.CBL_prob_class_small_1 = CBL(256,3,1,"same")

      #CBL_prob_class_small_2 in : 80 x 80 x 256 out : 80 x 80 x 256
      self.CBL_prob_class_small_2 = CBL(256,1,1,"same")

      #conv_prob_small in : 80 x 80 x 256 out : 80 x 80 x 1
      self.conv_prob_small = _head_conv(1,"sigmoid")

      #conv_class_small in : 80 x 80 x 256 out : 80 x 80 x 80
      self.conv_class_small = _head_conv(80,"sigmoid")

      #Reg info

      #CBL_left_center_small_1 in : 80 x 80 x 128 out : 80 x 80 x 256
      self.CBL_left_center_small_1 = CBL(256,3,1,"same")

      #CBL_left_center_small_2 in : 80 x 80 x 256 out : 80 x 80 x 256
      self.CBL_left_center_small_2 = CBL(256,1,1,"same")

      #conv_pos_info_small in : 80 x 80 x 256 out : 80 x 80 x 4
      self.conv_pos_info_small = _head_conv(4,tf.keras.layers.LeakyReLU())

      #concat conv_prob_small -- conv_pos_info_small -- conv_class_small , out: 80 x 80 x 85

      #$#$#$#$#$#$#$#$#$#$#$#$#$# small output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_connect_branch_5 in : 79 x 79 x 128 out : 39 x 39 x 256
      self.CBL_connect_branch_5 = CBL(256,3,2,"valid")

      #concat CBL_connect_branch_5 -- CBL_5_phase1_branch_4 , out: 39 x 39 x 512

      #$#$#$#$#$#$#$#$#$#$#$#$#$# phase 3 $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_1 phase3 in : 39 x 39 x 512 out : 39 x 39 x 256
      self.CBL_1_phase3 = CBL(256,1,1,"same")

      #CBL_2 phase3 in : 39 x 39 x 256 out : 39 x 39 x 512
      self.CBL_2_phase3 = CBL(512,3,1,"same")

      #CBL_3 phase3 in : 39 x 39 x 512 out : 39 x 39 x 256
      self.CBL_3_phase3 = CBL(256,1,1,"same")

      #CBL_4 phase3 in : 39 x 39 x 256 out : 39 x 39 x 512
      self.CBL_4_phase3 = CBL(512,3,1,"same")

      #CBL_5 phase3_branch_6 in : 39 x 39 x 512 out : 39 x 39 x 256 -- branch 6
      self.CBL_5_phase3_branch_6 = CBL(256,1,1,"same")

      #$#$#$#$#$#$#$#$#$#$#$#$#$# medium output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #TCBL_medium in : 39 x 39 x 256 out : 40 x 40 x 256
      #NOTE: this used to be built with 128 filters, which contradicted the documented
      #256-channel output and the small / large heads (both keep their channel count).
      #Weights saved with the old 128-filter layer are not compatible with this shape.
      self.TCBL_medium = TCBL(256,2,1,"valid")

      #prob info

      #CBL_prob_class_medium_1 in : 40 x 40 x 256 out : 40 x 40 x 512
      self.CBL_prob_class_medium_1 = CBL(512,3,1,"same")

      #CBL_prob_class_medium_2 in : 40 x 40 x 512 out : 40 x 40 x 512
      self.CBL_prob_class_medium_2 = CBL(512,1,1,"same")

      #conv_prob_medium in : 40 x 40 x 512 out : 40 x 40 x 1
      self.conv_prob_medium = _head_conv(1,"sigmoid")

      #conv_class_medium in : 40 x 40 x 512 out : 40 x 40 x 80
      self.conv_class_medium = _head_conv(80,"sigmoid")

      #Reg info

      #CBL_left_center_medium_1 in : 40 x 40 x 256 out : 40 x 40 x 512
      self.CBL_left_center_medium_1 = CBL(512,3,1,"same")

      #CBL_left_center_medium_2 in :  40 x 40 x 512 out :  40 x 40 x 512
      self.CBL_left_center_medium_2 = CBL(512,1,1,"same")

      #conv_pos_info_medium in : 40 x 40 x 512 out : 40 x 40 x 4
      self.conv_pos_info_medium = _head_conv(4,tf.keras.layers.LeakyReLU())

      #concat conv_prob_medium -- conv_pos_info_medium -- conv_class_medium , out: 40 x 40 x 85

      #$#$#$#$#$#$#$#$#$#$#$#$#$# medium output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_connect_branch_6 in : 39 x 39 x 256 out : 19 x 19 x 512
      self.CBL_connect_branch_6 = CBL(512,3,2,"valid")

      #concat CBL_connect_branch_6 -- CBL_6_branch_3 , out: 19 x 19 x 1024

      #$#$#$#$#$#$#$#$#$#$#$#$#$# phase 4 $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_1 phase4 in : 19 x 19 x 1024 out : 19 x 19 x 512
      self.CBL_1_phase4 = CBL(512,1,1,"same")

      #CBL_2 phase4 in : 19 x 19 x 512 out : 19 x 19 x 1024
      self.CBL_2_phase4 = CBL(1024,3,1,"same")

      #CBL_3 phase4 in : 19 x 19 x 1024 out : 19 x 19 x 512
      self.CBL_3_phase4 = CBL(512,1,1,"same")

      #CBL_4 phase4 in : 19 x 19 x 512 out : 19 x 19 x 1024
      self.CBL_4_phase4 = CBL(1024,3,1,"same")

      #CBL_5 phase4 in : 19 x 19 x 1024 out : 19 x 19 x 512
      self.CBL_5_phase4 = CBL(512,1,1,"same")

      #$#$#$#$#$#$#$#$#$#$#$#$#$# large output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #TCBL_large in : 19 x 19 x 512 out : 20 x 20 x 512
      self.TCBL_large = TCBL(512,2,1,"valid")

      #prob info

      #CBL_prob_class_large_1 in : 20 x 20 x 512 out : 20 x 20 x 1024
      self.CBL_prob_class_large_1 = CBL(1024,3,1,"same")

      #CBL_prob_class_large_2 in : 20 x 20 x 1024 out : 20 x 20 x 1024
      self.CBL_prob_class_large_2 = CBL(1024,1,1,"same")

      #conv_prob_large in : 20 x 20 x 1024 out : 20 x 20 x 1
      #(presumably fed by CBL_prob_class_large_2, like the small / medium heads -- confirm in call())
      self.conv_prob_large = _head_conv(1,"sigmoid")

      #conv_class_large in : 20 x 20 x 1024 out : 20 x 20 x 80
      self.conv_class_large = _head_conv(80,"sigmoid")

      #Reg info

      #CBL_left_center_large_1 in : 20 x 20 x 512 out : 20 x 20 x 1024
      self.CBL_left_center_large_1 = CBL(1024,3,1,"same")

      #CBL_left_center_large_2 in :  20 x 20 x 1024 out :  20 x 20 x 1024
      self.CBL_left_center_large_2 = CBL(1024,1,1,"same")

      #conv_pos_info_large in : 20 x 20 x 1024 out : 20 x 20 x 4
      self.conv_pos_info_large = _head_conv(4,tf.keras.layers.LeakyReLU())

      #concat conv_prob_large -- conv_pos_info_large -- conv_class_large , out: 20 x 20 x 85

      #$#$#$#$#$#$#$#$#$#$#$#$#$# large output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$
      
      
   def call(self,inputs,train_flag=True):

      """
      Forward pass: backbone -> neck -> three detection heads.

      inputs -- input tensor (NOTE(review): presumably a channels-last image batch (m,n_H,n_W,n_C) -- confirm against caller)
      train_flag -- forwarded positionally to every sub-block called here except SPP_neck
                    and the plain Conv2D head layers, which are called with the tensor only

      return -- list [output_large,output_medium,output_small]; each output is the
                channel-wise concat (axis=-1) of prob , pos info , class -- in that order
      """

      #CBM1
      CBM_1 = self.CBM_1(inputs,train_flag)

      #CSP1
      CSP1 = self.CSP1(CBM_1,train_flag)

      #CSP2
      CSP2 = self.CSP2(CSP1,train_flag)

      #CSP8_branch_1 ----------------------------------------- branch 1 (reused below by CBL_connect_branch_1)
      CSP8_branch_1 = self.CSP8_branch_1(CSP2,train_flag)

      #CSP8_branch_2 ----------------------------------------- branch 2 (reused below by CBL_connect_branch_2)
      CSP8_branch_2 = self.CSP8_branch_2(CSP8_branch_1,train_flag)

      #CSP4
      CSP4 = self.CSP4(CSP8_branch_2,train_flag)

      #CBL_1_neck
      CBL_1_neck = self.CBL_1_neck(CSP4,train_flag)

      #CBL_2_neck
      CBL_2_neck = self.CBL_2_neck(CBL_1_neck,train_flag)

      #CBL_3_neck
      CBL_3_neck = self.CBL_3_neck(CBL_2_neck,train_flag)

      #SPP_neck -- the only sub-block called without train_flag
      SPP_neck = self.SPP_neck(CBL_3_neck)

      #CBL_4_neck
      CBL_4_neck = self.CBL_4_neck(SPP_neck,train_flag)

      #CBL_5_neck
      CBL_5_neck = self.CBL_5_neck(CBL_4_neck,train_flag)

      #CBL_6_branch_3 ----------------------------------------- branch 3 (reused below in the phase4 concat)
      CBL_6_branch_3 = self.CBL_6_branch_3(CBL_5_neck,train_flag)

      #CBL_7_neck
      CBL_7_neck = self.CBL_7_neck(CBL_6_branch_3,train_flag)

      #TCBL1
      TCBL1 = self.TCBL1(CBL_7_neck,train_flag)

      #CBL_connect_branch_2 -- lateral connection taken from CSP8_branch_2
      CBL_connect_branch_2 = self.CBL_connect_branch_2(CSP8_branch_2,train_flag)

      #concat TCBL1 -- CBL_connect_branch_2
      concat_TCBL1_CBL_connect_branch_2 = tf.keras.layers.concatenate(inputs=[TCBL1,CBL_connect_branch_2],axis=-1)

      #CBL_1 phase1
      CBL_1_phase1 = self.CBL_1_phase1(concat_TCBL1_CBL_connect_branch_2,train_flag)

      #CBL_2_phase1
      CBL_2_phase1 = self.CBL_2_phase1(CBL_1_phase1,train_flag)

      #CBL_3_phase1
      CBL_3_phase1 = self.CBL_3_phase1(CBL_2_phase1,train_flag)

      #CBL_4_phase1
      CBL_4_phase1 = self.CBL_4_phase1(CBL_3_phase1,train_flag)

      #CBL_5_phase1_branch_4 ----------------------------------------- branch 4 (reused below in the phase3 concat)
      CBL_5_phase1_branch_4 = self.CBL_5_phase1_branch_4(CBL_4_phase1,train_flag)

      #CBL_6_phase1
      CBL_6_phase1 = self.CBL_6_phase1(CBL_5_phase1_branch_4,train_flag)

      #TCBL2
      TCBL2 = self.TCBL2(CBL_6_phase1,train_flag)

      #CBL_connect_branch_1 -- lateral connection taken from CSP8_branch_1
      CBL_connect_branch_1 = self.CBL_connect_branch_1(CSP8_branch_1,train_flag)

      #concat TCBL2 -- CBL_connect_branch_1 , out: 79 x 79 x 256
      concat_TCBL2_CBL_connect_branch_1 = tf.keras.layers.concatenate(inputs=[TCBL2,CBL_connect_branch_1],axis=-1)

      #CBL_1_phase2
      CBL_1_phase2 = self.CBL_1_phase2(concat_TCBL2_CBL_connect_branch_1,train_flag)

      #CBL_2_phase2
      CBL_2_phase2 = self.CBL_2_phase2(CBL_1_phase2,train_flag)

      #CBL_3_phase2
      CBL_3_phase2 = self.CBL_3_phase2(CBL_2_phase2,train_flag)

      #CBL_4_phase2
      CBL_4_phase2 = self.CBL_4_phase2(CBL_3_phase2,train_flag)

      #CBL_5_phase2_branch_5 ----------------------------------------- branch 5 (feeds the small head and CBL_connect_branch_5)
      CBL_5_phase2_branch_5 = self.CBL_5_phase2_branch_5(CBL_4_phase2,train_flag)

      #$#$#$#$#$#$#$#$#$#$#$#$#$# small output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #TCBL_small
      TCBL_small = self.TCBL_small(CBL_5_phase2_branch_5,train_flag)

      #prob info -- prob and class share the CBL_prob_class_small_1/2 trunk

      #CBL_prob_class_small_1
      CBL_prob_class_small_1 = self.CBL_prob_class_small_1(TCBL_small,train_flag)

      #CBL_prob_class_small_2 in : 80 x 80 x 256 out : 80 x 80 x 256
      CBL_prob_class_small_2 = self.CBL_prob_class_small_2(CBL_prob_class_small_1,train_flag)

      #conv_prob_small
      conv_prob_small = self.conv_prob_small(CBL_prob_class_small_2)

      #conv_class_small
      conv_class_small = self.conv_class_small(CBL_prob_class_small_2)

      #reg info -- separate trunk, also fed from TCBL_small

      #CBL_left_center_small_1
      CBL_left_center_small_1 = self.CBL_left_center_small_1(TCBL_small,train_flag)

      #CBL_left_center_small_2
      CBL_left_center_small_2 = self.CBL_left_center_small_2(CBL_left_center_small_1,train_flag)

      #conv_pos_info_small
      conv_pos_info_small = self.conv_pos_info_small(CBL_left_center_small_2)

      #concat conv_prob_small -- conv_pos_info_small -- conv_class_small , out: 80 x 80 x 85
      output_small = tf.keras.layers.concatenate(inputs=[conv_prob_small,conv_pos_info_small,conv_class_small],axis=-1,name="output_small")

      #$#$#$#$#$#$#$#$#$#$#$#$#$# small output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_connect_branch_5
      CBL_connect_branch_5 = self.CBL_connect_branch_5(CBL_5_phase2_branch_5,train_flag)

      #concat CBL_connect_branch_5 -- CBL_5_phase1_branch_4
      concat_CBL_connect_branch_5_CBL_5_phase1_branch_4 = tf.keras.layers.concatenate(inputs=[CBL_connect_branch_5,CBL_5_phase1_branch_4],axis=-1)


      #CBL_1 phase3
      CBL_1_phase3 = self.CBL_1_phase3(concat_CBL_connect_branch_5_CBL_5_phase1_branch_4,train_flag)

      #CBL_2 phase3
      CBL_2_phase3 = self.CBL_2_phase3(CBL_1_phase3,train_flag)

      #CBL_3 phase3
      CBL_3_phase3 = self.CBL_3_phase3(CBL_2_phase3,train_flag)

      #CBL_4 phase3
      CBL_4_phase3 = self.CBL_4_phase3(CBL_3_phase3,train_flag)

      #CBL_5 phase3_branch_6 ----------------------------------------- branch 6 (feeds the medium head and CBL_connect_branch_6)
      CBL_5_phase3_branch_6  = self.CBL_5_phase3_branch_6(CBL_4_phase3,train_flag)

      #$#$#$#$#$#$#$#$#$#$#$#$#$# medium output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #TCBL_medium
      TCBL_medium = self.TCBL_medium(CBL_5_phase3_branch_6,train_flag)

      #prob info -- prob and class share the CBL_prob_class_medium_1/2 trunk

      #CBL_prob_class_medium_1
      CBL_prob_class_medium_1 = self.CBL_prob_class_medium_1(TCBL_medium,train_flag)

      #CBL_prob_class_medium_2
      CBL_prob_class_medium_2 = self.CBL_prob_class_medium_2(CBL_prob_class_medium_1,train_flag)

      #conv_prob_medium
      conv_prob_medium = self.conv_prob_medium(CBL_prob_class_medium_2)

      #conv_class_medium
      conv_class_medium = self.conv_class_medium(CBL_prob_class_medium_2)


      #Reg info -- separate trunk, also fed from TCBL_medium

      #CBL_left_center_medium_1
      CBL_left_center_medium_1 = self.CBL_left_center_medium_1(TCBL_medium,train_flag)

      #CBL_left_center_medium_2
      CBL_left_center_medium_2 = self.CBL_left_center_medium_2(CBL_left_center_medium_1,train_flag)

      #conv_pos_info_medium
      conv_pos_info_medium = self.conv_pos_info_medium(CBL_left_center_medium_2)

      #concat conv_prob_medium -- conv_pos_info_medium -- conv_class_medium , out: 40 x 40 x 85
      output_medium = tf.keras.layers.concatenate(inputs=[conv_prob_medium,conv_pos_info_medium,conv_class_medium],axis=-1,name="output_medium")

      #$#$#$#$#$#$#$#$#$#$#$#$#$# medium output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #CBL_connect_branch_6
      CBL_connect_branch_6 = self.CBL_connect_branch_6(CBL_5_phase3_branch_6,train_flag)

      #concat CBL_connect_branch_6 -- CBL_6_branch_3
      concat_CBL_connect_branch_6_CBL_6_branch_3 = tf.keras.layers.concatenate(inputs=[CBL_connect_branch_6,CBL_6_branch_3],axis=-1)

      #CBL_1 phase4
      CBL_1_phase4 = self.CBL_1_phase4(concat_CBL_connect_branch_6_CBL_6_branch_3,train_flag)

      #CBL_2 phase4
      CBL_2_phase4 = self.CBL_2_phase4(CBL_1_phase4,train_flag)

      #CBL_3 phase4
      CBL_3_phase4 = self.CBL_3_phase4(CBL_2_phase4,train_flag)

      #CBL_4 phase4
      CBL_4_phase4 = self.CBL_4_phase4(CBL_3_phase4,train_flag)

      #CBL_5 phase4
      CBL_5_phase4 = self.CBL_5_phase4(CBL_4_phase4,train_flag)

      #$#$#$#$#$#$#$#$#$#$#$#$#$# large output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #TCBL_large
      TCBL_large = self.TCBL_large(CBL_5_phase4,train_flag)

      #prob info -- prob and class share the CBL_prob_class_large_1/2 trunk

      #CBL_prob_class_large_1
      CBL_prob_class_large_1 = self.CBL_prob_class_large_1(TCBL_large,train_flag)

      #CBL_prob_class_large_2
      CBL_prob_class_large_2 = self.CBL_prob_class_large_2(CBL_prob_class_large_1,train_flag)

      #conv_prob_large
      conv_prob_large = self.conv_prob_large(CBL_prob_class_large_2)

      #conv_class_large
      conv_class_large = self.conv_class_large(CBL_prob_class_large_2)

      #Reg info -- separate trunk, also fed from TCBL_large

      #CBL_left_center_large_1
      CBL_left_center_large_1 = self.CBL_left_center_large_1(TCBL_large,train_flag)

      #CBL_left_center_large_2
      CBL_left_center_large_2 = self.CBL_left_center_large_2(CBL_left_center_large_1,train_flag)

      #conv_pos_info_large
      conv_pos_info_large = self.conv_pos_info_large(CBL_left_center_large_2)

      #concat conv_prob_large -- conv_pos_info_large -- conv_class_large , out: 20 x 20 x 85
      output_large = tf.keras.layers.concatenate(inputs=[conv_prob_large,conv_pos_info_large,conv_class_large],axis=-1,name="output_large")

      #$#$#$#$#$#$#$#$#$#$#$#$#$# large output $#$#$#$#$#$#$#$#$#$#$#$#$#$#$#$

      #order of the returned list is large , medium , small
      return [output_large,output_medium,output_small]

   def graph_model(self,dim):

      """
      Wrap this network in a functional tf.keras.Model.

      dim -- per-sample input shape handed to tf.keras.layers.Input (no batch axis)

      return -- tf.keras.Model mapping a fresh symbolic input to the outputs of self.call
      """

      #symbolic placeholder that drives one pass through call()
      graph_input = tf.keras.layers.Input(shape=dim)

      graph_outputs = self.call(graph_input)

      return tf.keras.Model(inputs=graph_input,outputs=graph_outputs)

In [None]:
#random normal (10,640,640,3) float32 variable; in this cell it is only read for its shape
initializer = tf.random_normal_initializer()
k = tf.Variable(initial_value=initializer(shape=(10,640,640,3),dtype="float32"))
#symbolic input built from the shape of k with the leading (batch) axis dropped
in_x = tf.keras.layers.Input(shape=tuple(k.get_shape().as_list()[1:]))

#instantiate the network, wrap it in a functional model and draw the graph
y = alpha_model()
#in_x carries a leading batch axis, so [1:] again yields the per-sample shape
model = y.graph_model(tuple(in_x.get_shape().as_list()[1:]))
plot_model(model,show_shapes=False, show_layer_names=False)

In [None]:
class alpha_loss(tf.keras.losses.Loss):

  """
  Detection loss for one output scale.

  loss = prob focal loss + class focal loss + ( loc loss + iou loss + box scale entropy loss )

  Every term is summed over all cells and divided by the batch size.
  """

  def __init__(self,gamma = 2,**kwargs):

    """
    gamma -- exponent of the focal modulating factors (1 - p)**gamma and p**gamma
    kwargs -- forwarded to tf.keras.losses.Loss
    """

    super(alpha_loss,self).__init__(**kwargs)

    self.gamma = gamma

  def call(self,y_true,y_pred):

    """
    y_true -- (batch_size,H,W,info) -- info [prob,x_left,y_left,x_center,y_center,class]
    y_pred -- (batch_size,H,W,info) -- info [prob,x_left,y_left,x_center,y_center,class]

    return -- scalar tensor , prob_focal_loss + class_focal_loss + reg_loss

    NOTE(review): most terms are masked by multiplying with object_mask. A multiplication
    by 0 does not remove NaN/inf, so a non-finite value at a masked-out cell (e.g. the log
    of a negative iou when a predicted width/height is negative) would still reach the
    sum -- confirm whether the predictions are constrained enough for this not to happen.
    """

    #get object mask -- channel 0 of y_true , 1 where a cell holds an object
    object_mask = K.cast(y_true[:,:,:,0:1],K.dtype(y_pred))

    object_mask_bool = K.cast(object_mask,dtype=tf.bool)

    #get ignore mask -- filled per sample by get_ignore_mask and stacked to (m,h,w,1)
    ignore_mask  = tf.TensorArray(K.dtype(y_pred),size=1,dynamic_size=True)

    ignore_mask = get_ignore_mask(ignore_mask,y_true,y_pred,object_mask_bool)

    #get prob
    prob_true = y_true[:,:,:,0:1]
    prob_pred = y_pred[:,:,:,0:1]

    prob_true = K.cast(prob_true,K.dtype(prob_pred))

    #get class
    class_true = y_true[:,:,:,5:]
    class_pred = y_pred[:,:,:,5:]

    class_true = K.cast(class_true,K.dtype(class_pred))

    #****************** Focal loss ******************

    #get batch size (as float , used as the divisor of every loss term)
    m = K.cast(K.shape(y_pred)[0],K.dtype(y_pred))

    #clip the prediction
    #prob_pred = K.clip(prob_pred,min_value = 0.0, max_value = 1.0)
    #class_pred = K.clip(class_pred,min_value = 0.0, max_value = 1.0)

    #prob focal loss -- -(1-p)^gamma * t * log(p) - p^gamma * (1-t) * log(1-p) , 1e-18 keeps the logs finite at p = 0 / p = 1
    loss_tensor_prob =  - ( (1 - prob_pred[:,:,:,:])**self.gamma ) * prob_true[:,:,:,:] * tf.math.log( prob_pred[:,:,:,:] + 1e-18 ) - ( prob_pred[:,:,:,:] ** self.gamma ) * ( 1 - prob_true[:,:,:,:] ) * tf.math.log( 1 - prob_pred[:,:,:,:] + 1e-18 )

    #object cells always count ; non-object cells only count where ignore_mask is 1
    pos_loss_tensor_prob = loss_tensor_prob[:,:,:,:] * object_mask[:,:,:,:]
    neg_loss_tensor_prob = loss_tensor_prob[:,:,:,:] * (1 - object_mask[:,:,:,:]) * ignore_mask[:,:,:,:]

    prob_focal_loss = K.sum( (pos_loss_tensor_prob[:,:,:,:] + neg_loss_tensor_prob[:,:,:,:]) )/ m

    #class focal loss -- same formula per class channel , object cells only
    loss_tensor_class =  - ( (1 - class_pred[:,:,:,:])**self.gamma ) * class_true[:,:,:,:] * tf.math.log( class_pred[:,:,:,:] + 1e-18 ) - ( class_pred[:,:,:,:] ** self.gamma) * ( 1 - class_true[:,:,:,:] ) * tf.math.log( 1 - class_pred[:,:,:,:] + 1e-18 )
    class_focal_loss = K.sum(loss_tensor_class[:,:,:,:]*object_mask[:,:,:,:]) / m


    #****************** Focal loss ******************

    #get reg left -- (x,y)
    reg_left_true = y_true[:,:,:,1:3]
    reg_left_pred = y_pred[:,:,:,1:3]

    reg_left_true = K.cast(reg_left_true,K.dtype(reg_left_pred))

    #get reg center -- (x,y)
    reg_center_true = y_true[:,:,:,3:5]
    reg_center_pred = y_pred[:,:,:,3:5]

    reg_center_true = K.cast(reg_center_true,K.dtype(reg_center_pred))

    #calculate width x height of anchor box -- twice the left-to-center offset
    reg_wh_true = (reg_center_true[:,:,:,:] - reg_left_true[:,:,:,:])*2
    reg_wh_pred = (reg_center_pred[:,:,:,:] - reg_left_pred[:,:,:,:])*2

    #get reg right -- left corner shifted by (width,height)
    reg_right_true = reg_left_true[:,:,:,:] + reg_wh_true[:,:,:,:]
    reg_right_pred = reg_left_pred[:,:,:,:] + reg_wh_pred[:,:,:,:]

    #get reg width
    reg_width_true = reg_wh_true[:,:,:,0:1]
    reg_width_pred = reg_wh_pred[:,:,:,0:1]

    #get reg height
    reg_height_true = reg_wh_true[:,:,:,1:2]
    reg_height_pred = reg_wh_pred[:,:,:,1:2]


    #****************** Box Scale Entropy ******************

    #calculate mini width (squared)
    reg_width_mini = tf.math.minimum(reg_width_true,reg_width_pred)

    reg_width_mini = K.square(reg_width_mini)

    #calculate max width (squared)
    reg_width_max = tf.math.maximum(reg_width_true,reg_width_pred)

    reg_width_max = K.square(reg_width_max)


    #calculate width scale -- min^2 / max^2
    reg_width_scale = reg_width_mini[:,:,:,:] / ( reg_width_max[:,:,:,:] + 1e-10 )

    #----------------------------------------------------------------------

    #calculate mini height (squared)
    reg_height_mini = tf.math.minimum(reg_height_true,reg_height_pred)

    reg_height_mini = K.square(reg_height_mini)

    #calculate max height (squared)
    reg_height_max = tf.math.maximum(reg_height_true,reg_height_pred)

    reg_height_max = K.square(reg_height_max)


    #calculate height scale -- min^2 / max^2
    reg_height_scale =  reg_height_mini[:,:,:,:] / ( reg_height_max[:,:,:,:] + 1e-10 )

    #----------------------------------------------------------------------

    #Box Scale Entropy -- -log(width scale) - log(height scale) , object cells only
    loss_tensor_Box_Scale_Entropy = - tf.math.log(reg_width_scale[:,:,:,:] + 1e-18)  -  tf.math.log(reg_height_scale[:,:,:,:] + 1e-18)

    box_scale_entropy_loss = K.sum(loss_tensor_Box_Scale_Entropy[:,:,:,:] * object_mask[:,:,:,:] ) / m


    #****************** Box Scale Entropy ******************

    #****************** IOU loss ******************

    #calculate IOU

    #calculate intersection left
    reg_left_intersection = tf.math.maximum(reg_left_true,reg_left_pred)

    #calculate intersection right
    reg_right_intersection = tf.math.minimum(reg_right_true,reg_right_pred)

    #calibrate -- forces right >= left so a non-overlapping pair yields a zero extent
    #reg_right_intersection = tf.where((reg_left_intersection>reg_right_intersection),reg_left_intersection,reg_right_intersection) #-- same meaning
    reg_right_intersection = tf.math.maximum(reg_left_intersection,reg_right_intersection) #-- same meaning

    #intersection width height
    intersection_wh = reg_right_intersection[:,:,:,:] - reg_left_intersection[:,:,:,:]

    #intersection area
    intersection_area = intersection_wh[:,:,:,0:1] * intersection_wh[:,:,:,1:2]

    #union area
    true_area = reg_wh_true[:,:,:,0:1] * reg_wh_true[:,:,:,1:2]
    pred_area = reg_wh_pred[:,:,:,0:1] * reg_wh_pred[:,:,:,1:2]

    union_area = true_area[:,:,:,:] + pred_area[:,:,:,:] - intersection_area[:,:,:,:]

    #calculate iou
    iou_val = intersection_area[:,:,:,:] / ( union_area[:,:,:,:] +  1e-10 )

    #iou loss -- -log(iou) , object cells only
    loss_tensor_iou = - tf.math.log( iou_val[:,:,:,:] + 1e-18 ) * object_mask[:,:,:,:]

    iou_loss = K.sum( loss_tensor_iou ) / m

    #****************** IOU loss ******************

    #****************** Loc loss ******************
    #squared error of the left corner and of the center , object cells only
    loss_tensor_loc = ( K.square(reg_left_true[:,:,:,:] - reg_left_pred[:,:,:,:]) + K.square(reg_center_true[:,:,:,:] - reg_center_pred[:,:,:,:]) ) * object_mask[:,:,:,:]

    #loc loss
    loc_loss = K.sum(loss_tensor_loc) / m

    #****************** Loc loss ******************

    #calculate reg loss
    reg_loss = loc_loss + iou_loss + box_scale_entropy_loss

    #calculate loss
    loss = prob_focal_loss + class_focal_loss + reg_loss

    return loss

def get_ignore_mask(ignore_mask,y_true,y_pred,object_mask_bool,ignore_threshold = 0.5):

  """
  For every sample, mark the cells whose predicted box overlaps none of the
  ground-truth boxes of that sample by at least ignore_threshold.

  y_true -- (batch_size,H,W,info) -- info [prob,x_left,y_left,x_center,y_center,class]
  y_pred -- (batch_size,H,W,info) -- info [prob,x_left,y_left,x_center,y_center,class]
  ignore_mask -- TensorArray , one (h,w,1) entry is written per sample
  object_mask_bool -- boolean tensor , [i,:,:,0] selects the cells of sample i that hold a true box
  ignore_threshold -- a cell gets 1 when its best iou is strictly below this value

  return -- stacked tensor (m,h,w,1) in the dtype of y_pred : 1 -> best iou < ignore_threshold , 0 -> otherwise
  """
  #set up
  y_true = K.cast(y_true,K.dtype(y_pred))

  m = K.cast(K.shape(y_true)[0],tf.int32)

  i = 0

  i = K.cast(i,K.dtype(m))

  #guard of the iou division , same value alpha_loss uses for its iou ;
  #without it a zero union (degenerate boxes) gave 0/0 = NaN
  eps = 1e-10

  #loop via each batch
  while i < m:

    #get true box -- shape (n,4)
    true_box = tf.boolean_mask(y_true[i,:,:,1:5],object_mask_bool[i,:,:,0])

    #get true box -- shape (1,n,4)
    true_box = true_box[tf.newaxis,:,:]

    #get y_pred_batch -- shape (h,w,4)
    y_pred_batch = y_pred[i,:,:,1:5]

    #get y_pred_batch -- shape (h,w,1,4) , the extra axis broadcasts against the n true boxes
    y_pred_batch = y_pred_batch[:,:,tf.newaxis,:]

    #****************** IOU ******************

    #batch pred -- (h,w,1,2) each
    batch_pred_left_xy =  y_pred_batch[:,:,:,0:2]
    batch_pred_center_xy = y_pred_batch[:,:,:,2:4]

    #width,height = twice the left-to-center offset ; right = left + (w,h)
    batch_pred_wh = (batch_pred_center_xy - batch_pred_left_xy)*2

    batch_pred_right_xy = batch_pred_left_xy + batch_pred_wh

    #batch true -- (1,n,2) each
    batch_true_left_xy = true_box[:,:,0:2]
    batch_true_center_xy = true_box[:,:,2:4]

    batch_true_wh =  (batch_true_center_xy - batch_true_left_xy)*2

    batch_true_right_xy = batch_true_left_xy + batch_true_wh

    #intersection -- (h,w,n,2)
    batch_intersection_left = K.maximum(batch_pred_left_xy,batch_true_left_xy)
    batch_intersection_right = K.minimum(batch_pred_right_xy,batch_true_right_xy)

    #calibrate -- right >= left , so boxes that do not overlap give a zero extent
    batch_intersection_right = K.maximum(batch_intersection_right,batch_intersection_left)

    #batch intersection wh -- (h,w,n,2)
    batch_intersection_wh = batch_intersection_right - batch_intersection_left

    #batch intersection area -- (h,w,n)
    batch_intersection_area = batch_intersection_wh[:,:,:,0] * batch_intersection_wh[:,:,:,1]

    #batch union area -- (1,n) + (h,w,1) broadcast to (h,w,n)
    batch_true_union_area = batch_true_wh[:,:,0] * batch_true_wh[:,:,1]
    batch_pred_union_area = batch_pred_wh[:,:,:,0] * batch_pred_wh[:,:,:,1]

    batch_union_area = (batch_true_union_area + batch_pred_union_area) - batch_intersection_area

    #calculate iou -- (h,w,n)
    batch_iou = batch_intersection_area / ( batch_union_area + eps )

    #****************** IOU ******************

    #get best iou -- (h,w,1)
    best_iou = K.max(batch_iou,axis=-1,keepdims=True)

    #ignore value -- (h,w,1)
    ignore_val = K.cast(best_iou < ignore_threshold,K.dtype(y_pred))

    #update ignore mask
    ignore_mask = ignore_mask.write(i,ignore_val)

    #update i
    i = i + 1

  #stack to tensor -- (m,h,w,1)
  ignore_mask = ignore_mask.stack()

  return ignore_mask

In [None]:
def preprocess_class(input_path,save_path ,name="class_map.txt"):

   """
   MS COCO 2017 Dataset

   Number every distinct value found in the 4th column of the csv at
   input_path (0,1,2,... in order of first appearance) and dump the mapping
   as JSON to save_path/name.

   return dict -- {class label : index}
   """
   annotations = pd.read_csv(input_path)

   #class labels live in the 4th column
   labels = annotations.iloc[:,3]

   class_map = {}

   for row in range(labels.shape[0]):

      #setdefault leaves a known label untouched ; len(class_map) is the next free index
      class_map.setdefault(labels[row],len(class_map))

   #save class map
   with open(f"{save_path}/{name}","w") as out_file:

      out_file.write(json.dumps(class_map))

   return class_map

def preprocess_bbox_info(path,path_pos,path_hw,name_pos="bbox_pos.txt",name_hw="bbox_hw.txt"):

   """
   MS COCO 2017 Dataset

   Extract the box coordinates (every column from the 5th on) of the csv at
   path, derive the box sizes and dump both as JSON lists.

   path -- annotation csv (NOTE(review): columns 4.. are presumably xmin,ymin,xmax,ymax -- confirm against the csv)
   path_pos , name_pos -- directory / file name the positions are written to
   path_hw , name_hw -- directory / file name the sizes are written to

   return numpy.ndarray,bbox_pos,bbox_hw
      bbox_pos -- (m,k) columns 4.. of the csv
      bbox_hw -- (m,2) float : h -> 0 (col 3 - col 1) , w -> 1 (col 2 - col 0)
   """

   dataset = pd.read_csv(path)

   #get positional data
   bbox_pos = dataset.iloc[:,4:].to_numpy()

   #save bbox_pos
   with open(f"{path_pos}/{name_pos}","w") as file:

      file.write(json.dumps(bbox_pos.tolist()))

   #construct bbox_hw (m,2) : h -> 0 , w -> 1
   #whole-column arithmetic instead of one python iteration per box
   bbox_hw = np.zeros((bbox_pos.shape[0],2))

   bbox_hw[:,0] = bbox_pos[:,3] - bbox_pos[:,1]
   bbox_hw[:,1] = bbox_pos[:,2] - bbox_pos[:,0]

   #save bbox_hw
   with open(f"{path_hw}/{name_hw}","w") as file:

      file.write(json.dumps(bbox_hw.tolist()))


   return bbox_pos,bbox_hw


def preprocess_pre_define_anchor_box(bbox_hw,save_path,K=9,name="anchors.txt"):

   """
   Cluster the box sizes into K anchors with Kmean (defined outside this
   section) and dump the anchors as a JSON list to save_path/name.

   bbox_hw -- numpy.ndarray (m,2)

   return -- the value returned by Kmean(bbox_hw,K)
   """

   anchors = Kmean(bbox_hw,K)

   anchor_file = f"{save_path}/{name}"

   with open(anchor_file,"w") as out_file:

      out_file.write(json.dumps(anchors.tolist()))

   return anchors

def preprocessing_label(input_path,save_path,name="gt_dataset.txt"):

   """
   Group the annotations of the csv at input_path by image and dump them as JSON
   to save_path/name.

   The box coordinates are first shifted by the padding that centers an image
   on a 640 x 640 canvas ( (640 - width)//2 , (640 - height)//2 ), then the box
   center is computed from the shifted corners.

   save dict as {obj1:[[class,xmin,ymin,xcenter,ycenter],[class,xmin,ymin,xcenter,ycenter],...],obj2:...} (for each key)

   Columns are read by position : 0 -> key , 3 -> class , 4/5 -> xmin/ymin ,
   8/9 -> the xcenter/ycenter columns appended here (this assumes the csv holds
   exactly 8 columns -- TODO confirm against the csv).

   return -- the grouped dict
   """

   #read csv file
   dataset = pd.read_csv(input_path)

   #calibrate bbox pos
   dataset["xmin"] = dataset["xmin"] + (640 - dataset["width"] )//2
   dataset["xmax"] = dataset["xmax"] + (640 - dataset["width"] )//2

   dataset["ymin"] = dataset["ymin"] + (640 - dataset["height"] )//2
   dataset["ymax"] = dataset["ymax"] + (640 - dataset["height"] )//2

   #calculate center
   dataset["xcenter"] = (dataset["xmin"] + dataset["xmax"])/2
   dataset["ycenter"] = (dataset["ymin"] + dataset["ymax"])/2

   #pull each needed column out once ; tolist() yields plain python scalars , so the
   #result is JSON serializable even when the class column is numeric
   filenames = dataset.iloc[:,0].tolist()
   classes = dataset.iloc[:,3].tolist()
   xmins = dataset.iloc[:,4].tolist()
   ymins = dataset.iloc[:,5].tolist()
   xcenters = dataset.iloc[:,8].tolist()
   ycenters = dataset.iloc[:,9].tolist()

   #Group the label
   gt_dataset = {}

   for filename,class_name,xmin,ymin,xcenter,ycenter in zip(filenames,classes,xmins,ymins,xcenters,ycenters):

      #class xmin ymin xcenter ycenter
      gt_dataset.setdefault(filename,[]).append([class_name,xmin,ymin,xcenter,ycenter])


   #save dataset (the with block closes the file)
   with open(f"{save_path}/{name}","w") as file:

      file.write(json.dumps(gt_dataset))

   return gt_dataset
   
def preprocess_image(img,standard_shape=(640,640)):

   """
   Zero-pad an image so that it sits centered on a canvas of standard_shape.

   img -- numpy.ndarray (n_H,n_W,n_C)
   standard_shape -- (height,width)

   return -- padded copy of img ; when the missing amount along an axis is odd
             the extra row / column goes to the bottom / right
   """
   #get h,w
   target_h,target_w = standard_shape

   #total amount missing along each axis
   missing_h = target_h - img.shape[0]
   missing_w = target_w - img.shape[1]

   #leading pad is the floored half , the trailing pad receives the remainder
   top = missing_h//2
   left = missing_w//2

   bottom = missing_h - top
   right = missing_w - left

   #pad img
   return np.pad(img,((top,bottom),(left,right),(0,0)),mode="constant",constant_values=(0,0))
   

def preprocess_y_true(input_path,save_path,anchors,class_map,input_shape = (640,640),pos_info_format = [(76,76,255),(38,38,255),(19,19,255)],bbox_type=3):

   """
   Encode the annotations of every image of the csv at input_path into one
   grid per scale and dump each grid as JSON to
   save_path/<image name>_<h>x<w>.txt .

   input_path -- annotation csv ; rows of one image must be consecutive
   save_path -- output directory
   anchors -- numpy.ndarray (K,2)
   class_map -- dict {class label : index}
   input_shape -- (x,y) size of the padded network input
   pos_info_format -- shape (h,w,features) of the grid of every scale
   bbox_type -- forwarded to update_pos_info

   Columns are read by position : 0 -> image name , 3 -> class ,
   4..7 -> xmin,ymin,xmax,ymax , 8/9 -> the x_center/y_center columns appended
   here (this assumes the csv holds exactly 8 columns -- TODO confirm).

   Differences to the previous revision :
    - the grids of a finished image are saved under that image's name (they were
      saved under the name of the NEXT image , so the first image was never written)
    - the box center is computed after the padding shift , in the same coordinates
      as the shifted corners (same order as preprocessing_label)
    - the padding shift uses input_shape instead of a fixed 640
    - one grid is created per entry of pos_info_format instead of a fixed three
    - an empty csv returns without writing anything instead of raising

   return None
   """

   #read csv file
   dataset = pd.read_csv(input_path)

   #nothing to encode
   if dataset.shape[0] == 0:

      return

   #calibrate bbox pos -- shift by the padding that centers the image on the input canvas
   pad_x = (input_shape[0] - dataset["width"] )//2
   pad_y = (input_shape[1] - dataset["height"] )//2

   dataset["xmin"] = dataset["xmin"] + pad_x
   dataset["xmax"] = dataset["xmax"] + pad_x

   dataset["ymin"] = dataset["ymin"] + pad_y
   dataset["ymax"] = dataset["ymax"] + pad_y

   #get center info (from the calibrated corners)
   dataset["x_center"] = (dataset["xmin"] + dataset["xmax"])/2
   dataset["y_center"] = (dataset["ymin"] + dataset["ymax"])/2

   def _save_pos_info(image_name,grids):

      #write one JSON file per scale for a finished image
      for grid_shape,grid in zip(pos_info_format,grids):

         h_size = grid_shape[0]
         w_size = grid_shape[1]

         with open(f"{save_path}/{image_name}_{h_size}x{w_size}.txt","w") as file:

            file.write(json.dumps(grid.tolist()))

   #pull the needed columns out once as python scalars
   rows = zip(dataset.iloc[:,0].tolist(),
              dataset.iloc[:,3].tolist(),
              dataset.iloc[:,4].tolist(),
              dataset.iloc[:,5].tolist(),
              dataset.iloc[:,6].tolist(),
              dataset.iloc[:,7].tolist(),
              dataset.iloc[:,8].tolist(),
              dataset.iloc[:,9].tolist())

   #set up
   prev_name = None
   pos_info = None

   #loop through dataset
   for curr_name,class_name,xmin,ymin,xmax,ymax,center_x,center_y in rows:

      if pos_info is None or curr_name != prev_name:

         #a new image starts : flush the finished one , then create fresh grids
         if pos_info is not None:

            _save_pos_info(prev_name,pos_info)

         pos_info = [np.zeros(grid_shape) for grid_shape in pos_info_format]

      #update pos info
      update_pos_info(pos_info,center_x,center_y,xmin,ymin,xmax,ymax,class_map[class_name],anchors,image_shape=input_shape,bbox_type=bbox_type,feature_size=85)

      #update prev_name
      prev_name = curr_name

   #save last obj
   _save_pos_info(prev_name,pos_info)


#@jit(nopython=True)
def update_pos_info(pos_info,center_x,center_y,xmin,ymin,xmax,ymax,class_index,anchors,image_shape = (640,640),bbox_type = 3,feature_size = 85):

   """
   Write one object into the grid of the scale whose anchor fits it best.

   pos_info -- list containing numpy.ndarray (h,w,features) , one per scale ; modified in place
   center_x , center_y -- box center in image pixels , selects the grid cell
   xmin , ymin , xmax , ymax -- box corners in image pixels
   class_index -- index of the class , channel (class_index + 5) is set to 1
   anchors -- numpy.ndarray (K,2) : h -> 0 , w -> 1
   image_shape -- (x,y)
   bbox_type -- number of scales the K anchors are split into (K/bbox_type consecutive anchors per scale)
   feature_size -- not used by this function , kept for the callers

   The box height is the y extent and the width the x extent , matching the
   (h,w) layout of anchors ; the previous revision had the two swapped.
   The grid scale along x uses the grid width and along y the grid height
   (identical to the previous revision for square grids and images).

   NOTE(review): channels 3,4 receive xmax,ymax here while the docstring of
   alpha_loss describes channels 3:5 as x_center,y_center -- confirm which
   layout the training pipeline expects.

   return None
   """

   #object bbox h w
   bbox_h = ymax - ymin
   bbox_w = xmax - xmin

   #find best anchor index (shape-only iou : both boxes share one corner)
   max_index = 0
   max_iou = 0

   for i in range(anchors.shape[0]):

      min_h = np.minimum(anchors[i,0],bbox_h).item()
      min_w = np.minimum(anchors[i,1],bbox_w).item()

      #intersection
      intersection_area = min_w * min_h

      #union
      union_area = bbox_h * bbox_w  + anchors[i,0] * anchors[i,1] - intersection_area

      #iou
      cur_iou = intersection_area / union_area

      if cur_iou > max_iou:

         max_iou = cur_iou

         max_index = i

   #size of particular type
   type_size = int(anchors.shape[0]/bbox_type)

   #best box index -- dim 1 (determine which type of box)
   best_anchor_index = int(max_index/type_size)

   target_grid = pos_info[best_anchor_index]

   #feature mapping -- grid cells per image pixel ; grids are indexed [y,x]
   feature_per_image_pixel_x = target_grid.shape[1] / image_shape[0]
   feature_per_image_pixel_y = target_grid.shape[0] / image_shape[1]

   target_x = int(center_x * feature_per_image_pixel_x)
   target_y = int(center_y * feature_per_image_pixel_y)

   #update info (prob,xmin,ymin,xmax,ymax,class)
   #prob
   target_grid[target_y,target_x,0] = 1

   #xmin,ymin,xmax,ymax
   target_grid[target_y,target_x,1] = xmin
   target_grid[target_y,target_x,2] = ymin
   target_grid[target_y,target_x,3] = xmax
   target_grid[target_y,target_x,4] = ymax

   #class
   target_grid[target_y,target_x,(class_index+5)] = 1

In [None]:
#data augmentation
def data_aug(img):

   """
   Apply one randomly chosen augmentation to img and return the result.

   0 -> gaussian blur with a random odd kernel
   1 -> gaussian noise
   2 -> brightness , saturation , contrast and hue jitter in a random order
   3 -> 16 random erase patches with side lengths drawn from [8,80]
   """

   choice = np.random.randint(0,4)

   if choice == 0:

      #kernel sides , forced odd by tune_odd
      blur_h = tune_odd(np.random.randint(1,17))
      blur_w = tune_odd(np.random.randint(1,17))

      return GaussianBlur(img,(blur_h,blur_w))

   if choice == 1:

      return GaussianNoise(img)

   if choice == 2:

      #index -> color operation , applied in shuffled order
      color_ops = (random_brightness,random_saturation,random_contrast,random_hue)

      order = list(range(4))
      random.shuffle(order)

      for op_id in order:

         img = color_ops[op_id](img)

      return img

   if choice == 3:

      for _ in range(16):

         #patch height is drawn before patch width
         patch = (np.random.randint(8,81),np.random.randint(8,81))

         img = random_erase(img,patch)

   return img

@jit(nopython=True)
def tune_odd(val):

   """
   Return val unchanged when it is odd , val + 1 when it is even.
   """

   return val + 1 if (val % 2) == 0 else val

#Gaussian Blur
def GaussianBlur(img,kernel_shape,sigma=0):

   """
   Thin wrapper around cv2.GaussianBlur.

   img -- numpy array
   kernel_shape -- (int,int) , passed as the kernel size
   sigma -- real number , passed as the third positional argument

   return -- the blurred image returned by cv2
   """

   blurred = cv2.GaussianBlur(img,kernel_shape,sigma)

   return blurred


##Gaussian Noise
@jit(nopython=True)
def ext_operator(val):

  """
  Clamp val to the range [0,255].
  """

  if val > 255 :

    return 255

  if val < 0:

    return 0

  return val

@jit(nopython=True)
def GaussianNoise(img,low=8,high=64):

  """
  Add independent gaussian noise to every value of img , in place.

  img -- numpy array (nH,nW,nC) , overwritten and returned
  low , high -- bounds handed to np.random.randint for the per-value noise factor

  For every value a factor is drawn ; mean and standard deviation of the noise
  are that factor times two separate uniform samples. The noisy value is
  clamped by ext_operator before it is stored.
  """

  rows,cols,channels = img.shape

  for r in range(rows):

    for col in range(cols):

      for ch in range(channels):

        strength = np.random.randint(low,high)

        #draw order matters for reproducibility : mean first , then spread , then the noise
        mean = strength * np.random.random()

        spread = strength * np.random.random()

        img[r,col,ch] = ext_operator(img[r,col,ch] + np.random.normal(mean,spread))

  return img


#brightness
def random_brightness(img):

   """
   Apply tf.image.random_brightness with argument 0.4 and return the result as a numpy array.
   """

   jittered = tf.image.random_brightness(img,0.4)

   return jittered.numpy()


#saturation
def random_saturation(img):

   """
   Apply tf.image.random_saturation with bounds 1.5 and 8.0 and return the result as a numpy array.
   """

   jittered = tf.image.random_saturation(img,1.5,8.0)

   return jittered.numpy()

#contrast
def random_contrast(img):

   """
   Apply tf.image.random_contrast with bounds 1.5 and 8.0 and return the result as a numpy array.
   """

   jittered = tf.image.random_contrast(img,1.5,8.0)

   return jittered.numpy()

#hue
def random_hue(img):

   """
   Apply tf.image.random_hue with argument 0.4 and return the result as a numpy array.
   """

   jittered = tf.image.random_hue(img,0.4)

   return jittered.numpy()

#erase
@jit(nopython=True)
def random_erase(img,kernel_size=(64,64)):

   """
   Scramble one randomly placed patch of img , in place.

   img -- numpy array (nH,nW,nC) , overwritten and returned
   kernel_size -- (height,width) of the patch

   Every pixel of the patch is replaced by a pixel drawn at random from a
   snapshot of the same patch taken before any pixel is overwritten.
   """

   img_h,img_w,_ = img.shape
   patch_h,patch_w = kernel_size

   #top-left corner , drawn so that the patch stays inside the image
   top = np.random.randint(0,img_h - patch_h + 1)
   left = np.random.randint(0,img_w - patch_w + 1)

   bottom = top + patch_h
   right = left + patch_w

   #snapshot of the untouched patch
   snapshot = img[top:bottom,left:right,:].copy()

   for h in range(top,bottom):

      for w in range(left,right):

         src_h = np.random.randint(0,patch_h)
         src_w = np.random.randint(0,patch_w)

         img[h,w,:] = snapshot[src_h,src_w,:]


   return img

In [None]:
def Kmean_IOU(bbox_hw,K=3,threshold=1e-8,max_iterations=2000):

   """
   Split the boxes of bbox_hw into K clusters with a k-means that measures
   closeness by the iou of the box shapes.

   bbox_hw -- (m,2) : h -> 0 , w -> 1
   K -- number of clusters ; the m == 1 and m == 2 shortcuts always return three lists
   threshold -- stop once the summed (1 - iou) between old and new anchors drops below it
   max_iterations -- the assignment/update pass runs at most max_iterations + 1 times

   return -- [large,medium ,small] <-- list(list) of row indices into bbox_hw ,
             ordered by decreasing summed area of the cluster ;
             K empty lists when bbox_hw holds no box (this used to raise an IndexError)
   """

   m = bbox_hw.shape[0]

   #no box at all -- every cluster is empty
   if m == 0:

      return [[] for _ in range(K)]

   #ONLY for K = 3
   if m == 1:

      return [ [0] , [] , [] ]

   elif m == 2:

      bbox_1_area = bbox_hw[0,0] * bbox_hw[0,1]
      bbox_2_area = bbox_hw[1,0] * bbox_hw[1,1]

      if bbox_1_area > bbox_2_area:

         return [ [0] , [1] , [] ]

      else:

         return [ [1] , [0] , [] ]

   #initialize anchors (K,2) with the first K boxes
   anchors = np.zeros((K,2))
   for i in range(K):

      anchors[i,:] = bbox_hw[i,:].copy()

   #defined up front so the final return is valid even if the loop body never runs
   output_list = [[] for _ in range(K)]

   #calculate Kmean
   iteration_i = 0

   while iteration_i <= max_iterations:

      #store the data by index
      output_list = [[] for _ in range(K)]

      #classify bbox
      for i in range(m):

         class_id = best_anchor(bbox_hw[i,:].reshape(1,2),anchors)

         output_list[class_id].append(i)

      #update anchor box
      sum_diff = 0

      for i in range(K):

         new_h,new_w = update_anchor_x(bbox_hw,output_list[i],anchors[i,:].reshape(1,2))

         #sum the changes for h and w of new anchor box - compare iou
         min_h = min(new_h,anchors[i,0])
         min_w = min(new_w,anchors[i,1])

         intersection = min_h * min_w
         union = new_h * new_w + anchors[i,0] * anchors[i,1] - intersection

         sum_diff = sum_diff + (1 - intersection / (union + 1e-8) )

         #update anchor box
         anchors[i,0] = new_h
         anchors[i,1] = new_w

      #converged : the anchors (almost) stopped moving
      if sum_diff < threshold:

         break

      #update iteration
      iteration_i = iteration_i + 1

   return rearrange_output_list_to_correct_order(bbox_hw,output_list)
         

def rearrange_output_list_to_correct_order(bbox_hw,output_list):

   """
   Reorder the clusters by the total area of their boxes, largest first.

   bbox_hw -- (m,2) : h -> 0 , w -> 1
   output_list -- list of clusters , each a list of row indices of bbox_hw

   return -- new list holding the same cluster lists in sorted order
   """

   #total area of every cluster , in the incoming order
   cluster_area = [find_area_sum(bbox_hw,members) for members in output_list]

   #cluster positions ranked by decreasing area (clusters of equal area keep their order)
   ranking = sorted(range(len(output_list)),key=lambda pos:cluster_area[pos],reverse=True)

   return [output_list[pos] for pos in ranking]
   

def find_area_sum(bbox_hw,output_list_x):

   """
   Sum h * w over the rows of bbox_hw listed in output_list_x.

   bbox_hw -- (m,2) : h -> 0 , w -> 1
   output_list_x -- list of row indices

   return -- total area (0 for an empty list)
   """

   return sum(bbox_hw[row,0] * bbox_hw[row,1] for row in output_list_x)
      
   
def update_anchor_x(bbox_hw,output_list_x,anchors_x):

   """
   New (h,w) of one anchor : the mean of the boxes assigned to it.

   bbox_hw -- (m,2) : h -> 0 , w -> 1
   output_list_x -- [ n elements] , row indices of the assigned boxes
   anchors_x -- (1,2) : h -> 0 , w -> 1

   return -- mean_h,mean_w ; the current anchor values when no box is assigned
   """

   count = len(output_list_x)

   #empty cluster : keep the anchor where it is
   if count == 0:

      return anchors_x[0,0],anchors_x[0,1]

   total_h = sum(bbox_hw[row,0] for row in output_list_x)
   total_w = sum(bbox_hw[row,1] for row in output_list_x)

   return total_h/count,total_w/count


@jit(nopython=True)
def best_anchor(box,anchors):

  """
  Index of the anchor with the highest IoU against box (only h and w are compared).

  box -- (1,2) :h -> 0 , w -> 1

  anchors -- (K,2)

  return class_idx (0 when no anchor reaches an IoU above 0)
  """

  box_h = box[0,0]
  box_w = box[0,1]

  winner = 0
  winner_iou = 0.0

  for k in range(anchors.shape[0]):

    anchor_h = anchors[k,0]
    anchor_w = anchors[k,1]

    #overlap of the two rectangles
    overlap = np.minimum(box_h,anchor_h) * np.minimum(box_w,anchor_w)

    iou = overlap / (box_h * box_w + anchor_h * anchor_w - overlap + 1e-8)

    #strict comparison : the first anchor wins a tie
    if iou > winner_iou:

      winner_iou = iou
      winner = k

  return winner

In [None]:
#data augmentation
def data_aug(img):

   """
   Apply one randomly chosen augmentation to img and return the result.

   img -- numpy array
   """

   choice = np.random.randint(0,4)

   #0 : Gaussian blur with a random odd kernel
   if choice == 0:

      blur_h = tune_odd(np.random.randint(1,17))
      blur_w = tune_odd(np.random.randint(1,17))

      return GaussianBlur(img,(blur_h,blur_w))

   #1 : Gaussian noise
   if choice == 1:

      return GaussianNoise(img)

   #2 : brightness , saturation , contrast and hue , each once , in random order
   if choice == 2:

      color_ops = [random_brightness,random_saturation,random_contrast,random_hue]

      op_order = list(range(4))
      random.shuffle(op_order)

      for op_id in op_order:

         img = color_ops[op_id](img)

      return img

   #3 : sixteen random patches of random size are scrambled
   if choice == 3:

      for _ in range(16):

         patch_h = np.random.randint(8,81)
         patch_w = np.random.randint(8,81)

         img = random_erase(img,(patch_h,patch_w))

   return img

@jit(nopython=True)
def tune_odd(val):

   """
   Return val when it is odd , val + 1 when it is even.
   """

   return val + 1 if (val % 2) == 0 else val

#Gaussian Blur
def GaussianBlur(img,kernel_shape,sigma=0):

   """
   Thin wrapper around cv2.GaussianBlur.

   img -- numpy array
   kernel_shape -- (int,int) , forwarded as the kernel size ; data_aug only passes odd values
   sigma -- real number , forwarded as the third positional argument

   return -- whatever cv2.GaussianBlur returns
   """

   return cv2.GaussianBlur(img,kernel_shape,sigma)


##Gaussian Noise
@jit(nopython=True)
def ext_operator(val):

  """
  Clamp val to the range [0,255].
  """

  clamped = val

  if val < 0:

    clamped = 0

  elif val > 255 :

    clamped = 255

  return clamped

@jit(nopython=True)
def GaussianNoise(img,low=8,high=64):

  """
  Add normally distributed noise to every element of img , in place.

  img -- numpy array (nH,nW,nC)
  low , high -- bounds handed to np.random.randint for the per-element noise factor

  return -- img (the same array)
  """

  rows,cols,channels = img.shape

  for r in range(rows):

    for col in range(cols):

      for ch in range(channels):

        #a fresh factor per element scales both the mean and the spread of the noise
        strength = np.random.randint(low,high)

        mean = strength * np.random.random()

        spread = strength * np.random.random()

        #clamp to [0,255] before writing back
        img[r,col,ch] = ext_operator(img[r,col,ch] + np.random.normal(mean,spread))

  return img


#brightness
def random_brightness(img):

   """
   Apply tf.image.random_brightness to img with 0.4 as its second argument
   and return the result converted with .numpy().
   """

   return tf.image.random_brightness(img,0.4).numpy()


#saturation
def random_saturation(img):

   """
   Apply tf.image.random_saturation to img with the bounds 1.5 and 8.0
   and return the result converted with .numpy().
   """

   return tf.image.random_saturation(img,1.5,8.0).numpy()

#contrast
def random_contrast(img):

   """
   Apply tf.image.random_contrast to img with the bounds 1.5 and 8.0
   and return the result converted with .numpy().
   """

   return tf.image.random_contrast(img,1.5,8.0).numpy()

#hue
def random_hue(img):

   """
   Apply tf.image.random_hue to img with 0.4 as its second argument
   and return the result converted with .numpy().
   """

   return tf.image.random_hue(img,0.4).numpy()

#erase
@jit(nopython=True)
def random_erase(img,kernel_size=(64,64)):

   """
   Scramble one randomly placed patch of img in place : every pixel of the patch
   is replaced by a pixel drawn at random from a snapshot of the same patch.

   img -- numpy array (nH,nW,nC)
   kernel_size -- (height,width) of the patch

   return -- img (the same array)
   """

   img_h,img_w,_ = img.shape
   patch_h,patch_w = kernel_size

   #first row / column of the patch , chosen so that the patch fits into the image
   top = np.random.randint(0,img_h - patch_h + 1)
   left = np.random.randint(0,img_w - patch_w + 1)

   bottom = top + patch_h
   right = left + patch_w

   #snapshot taken before any pixel is overwritten
   snapshot = img[top:bottom,left:right,:].copy()

   for row in range(top,bottom):

      for col in range(left,right):

         src_row = np.random.randint(0,patch_h)
         src_col = np.random.randint(0,patch_w)

         img[row,col,:] = snapshot[src_row,src_col,:]

   return img


In [None]:
#generator
def get_gt_data(batch_size,img_info,class_info,img_path,img_shape = (640,640),aug_flag=False):

   """
   Generator over one shuffled pass of the data set.

   batch_size -- number of images per yielded group
   img_info -- dict , image name -> list of objects
   class_info -- dict
   img_path -- directory of the images
   img_shape -- (height,width)
   aug_flag -- forwarded to get_image_data

   yield -- (img_data,label) as returned by get_image_data and get_y_true
   """

   shuffled_names = list(img_info.keys())

   random.shuffle(shuffled_names)

   remaining = len(shuffled_names)

   start = 0

   #a trailing group with fewer than batch_size images is never produced
   while remaining >= batch_size:

      #names of the current group
      name_list = [shuffled_names[pos] for pos in range(start,start + batch_size)]

      #image data -- np.array
      img_data = get_image_data(name_list,img_path,img_shape,aug_flag)

      #y_true data -- tuple (np.array,np.array,np.array)
      label = get_y_true(name_list,img_info,class_info,img_shape)

      #move on to the next group
      remaining = remaining - batch_size
      start = start + batch_size

      yield img_data,label


      
def get_image_data(name_list,img_path,img_shape=(640,640),aug_flag=False):

   """
   Load , calibrate and optionally augment the images named in name_list.

   name_list -- list of file names inside img_path
   img_path -- directory of the images
   img_shape -- forwarded to preprocess_image
   aug_flag -- run data_aug on every image when True

   return numpy.ndarray holding the images in the order of name_list
   """

   batch = []

   for filename in name_list:

      #read from disk and calibrate
      frame = preprocess_image(cv2.imread(f"{img_path}/{filename}"),img_shape)

      #data augmentation on request
      batch.append(data_aug(frame) if aug_flag else frame)

   return np.array(batch)


def get_y_true(name_list,img_info,class_info,img_shape = (640,640)):

   """
   Build the ground-truth grids of a group of images for the three detection scales.

   name_list -- list of image names (keys of img_info)
   img_info -- dict -- {obj1:[[class,xmin,ymin,xcenter,ycenter],[class,xmin,ymin,xcenter,ycenter],...],obj2:...} (for each key)
   class_info -- dict , class name -> class index
   img_shape -- (height,width)

   return -- (large_true,medium_true,small_true) , one numpy array per scale with
             the per-image grids stacked along the first axis
   """
   #initialize y_true
   small_true = []
   medium_true = []
   large_true = []

   for name in name_list:

      #one empty grid per scale : channel 0 prob , 1-4 box , 5.. classes
      obj_small_true = np.zeros((80,80,85))
      obj_medium_true = np.zeros((40,40,85))
      obj_large_true = np.zeros((20,20,85))

      #get (obj_info -- list)
      obj_info = img_info[name]

      n = len(obj_info)

      #height / width of every box : twice the distance from the min corner to the centre
      bbox_hw = np.zeros((n,2))

      for i,obj in enumerate(obj_info):

         bbox_hw[i,0] = (obj[4] - obj[2])*2
         bbox_hw[i,1] = (obj[3] - obj[1])*2

      #get cluster index -- [large,medium,small]
      cluster_idx = Kmean_IOU(bbox_hw)

      #update y_true ; the caller's img_shape is forwarded so that the pixel -> grid
      #mapping matches the real image size
      obj_small_true,obj_medium_true,obj_large_true = update_y_true(obj_info,class_info,cluster_idx,obj_small_true,obj_medium_true,obj_large_true,img_shape = img_shape)

      #save image info
      small_true.append(obj_small_true[:,:,:])
      medium_true.append(obj_medium_true[:,:,:])
      large_true.append(obj_large_true[:,:,:])

   #convert y_true to numpy array
   small_true = np.array(small_true)
   medium_true = np.array(medium_true)
   large_true = np.array(large_true)

   return (large_true,medium_true,small_true)

   
def update_y_true(obj_info,class_info,cluster_idx,obj_small_true,obj_medium_true,obj_large_true,img_shape = (640,640)):

   """
   Write the objects of one image into the ground-truth grids of the three scales.

   obj_info -- list of [class,xmin,ymin,xcenter,ycenter]
   class_info -- dict , class name -> class index
   cluster_idx -- [large,medium,small] , each a list of indices into obj_info
   obj_small_true , obj_medium_true , obj_large_true -- numpy arrays (grid_h,grid_w,channels) , filled in place
   img_shape -- (height,width)

   return -- obj_small_true,obj_medium_true,obj_large_true
   """

   #update large obj
   obj_large_true = _label_one_scale(obj_large_true,cluster_idx[0],obj_info,class_info,img_shape)

   #update medium obj
   obj_medium_true = _label_one_scale(obj_medium_true,cluster_idx[1],obj_info,class_info,img_shape)

   #update small obj
   obj_small_true = _label_one_scale(obj_small_true,cluster_idx[2],obj_info,class_info,img_shape)

   return obj_small_true,obj_medium_true,obj_large_true


def _label_one_scale(y_true,obj_indices,obj_info,class_info,img_shape):

   """
   Label the objects listed in obj_indices on the grid of one scale and return the grid.
   """

   for i in obj_indices:

      #get obj info
      class_name,xmin,ymin,xcenter,ycenter = obj_info[i]

      class_id = class_info[class_name]

      #the max corner mirrors the min corner at the centre
      xmax = (xcenter - xmin) * 2 + xmin
      ymax = (ycenter - ymin) * 2 + ymin

      #grid cells per image pixel
      step_h = y_true.shape[0] / img_shape[0]
      step_w = y_true.shape[1] / img_shape[1]

      #grid cell holding the object centre
      h_pos = int(step_h * ycenter)
      w_pos = int(step_w * xcenter)

      #prob
      y_true[h_pos,w_pos,0] = 1

      #xmin,ymin
      y_true[h_pos,w_pos,1] = xmin
      y_true[h_pos,w_pos,2] = ymin

      #xcenter,ycenter
      y_true[h_pos,w_pos,3] = xcenter
      y_true[h_pos,w_pos,4] = ycenter

      #class
      y_true[h_pos,w_pos,5+class_id] = 1

      #multiple positive : neighbouring cells inside the box are labelled as well
      y_true = multiple_positive_labeling(y_true,class_id,xmin,ymin,xmax,ymax,xcenter,ycenter,step_w,step_h)

   return y_true

            
@jit(nopython=True)
def multiple_positive_labeling(y_true,class_id,xmin,ymin,xmax,ymax,xcenter,ycenter,step_w,step_h):

   """
   Mark up to 5 additional grid cells around the object centre as positives.

   A cell qualifies when it lies strictly inside the box (in grid units) , inside
   the window scanned around the centre , inside the grid and is not positive yet.
   Cells are visited column by column (w outer , h inner).

   y_true -- numpy array (grid_h,grid_w,channels) , modified in place
   class_id -- class index , stored at channel 5+class_id
   xmin,ymin,xmax,ymax,xcenter,ycenter -- box in image pixels
   step_w,step_h -- grid cells per pixel along w / h

   return -- y_true
   """

   grid_h = y_true.shape[0]
   grid_w = y_true.shape[1]

   #box limits in grid units
   xlow = int(xmin*step_w)
   ylow = int(ymin*step_h)

   xhigh = int(xmax*step_w)
   yhigh = int(ymax*step_h)

   #window : starts 32 px before the centre and spans 3*32 px (converted to grid units)
   w_pos_init = int(xcenter*step_w - 32*step_w)
   h_pos_init = int(ycenter*step_h - 32*step_h)

   w_max = int(w_pos_init + 3*32*step_w)
   h_max = int(h_pos_init + 3*32*step_h)

   #scan only window ∩ box interior ∩ grid : indices are not bounds checked in
   #nopython mode (unless boundscheck is enabled) and negative indices wrap around ,
   #so the array must never be touched outside of the grid
   w_start = max(w_pos_init,xlow + 1,0)
   w_stop = min(w_max,xhigh,grid_w)

   h_start = max(h_pos_init,ylow + 1,0)
   h_stop = min(h_max,yhigh,grid_h)

   n = 0

   for w_pos in range(w_start,w_stop):

      for h_pos in range(h_start,h_stop):

         #cells that are already positive are left untouched
         if y_true[h_pos,w_pos,0] == 0:

            #prob
            y_true[h_pos,w_pos,0] = 1

            #xmin,ymin
            y_true[h_pos,w_pos,1] = xmin + y_true[h_pos,w_pos,1]
            y_true[h_pos,w_pos,2] = ymin + y_true[h_pos,w_pos,2]

            #xcenter,ycenter
            y_true[h_pos,w_pos,3] = xcenter + y_true[h_pos,w_pos,3]
            y_true[h_pos,w_pos,4] = ycenter + y_true[h_pos,w_pos,4]

            #class
            y_true[h_pos,w_pos,5+class_id] = 1

            n = n + 1

            #5 additional positives at most : stop scanning altogether
            if n >= 5:

               return y_true

   return y_true

In [None]:
#notebook only : the "!" prefix is IPython syntax that runs a shell command and captures its output lines
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
#the text 'failed' in the nvidia-smi output is taken as "no GPU attached"
if gpu_info.find('failed') >= 0:
  print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
  print('and then re-execute this cell.')
else:
  print(gpu_info)

In [None]:
#find current path (all Drive paths below are built relative to it)
cur_path = os.getcwd()

#define strategy : MirroredStrategy with HierarchicalCopyAllReduce as cross device operation
strategy = tf.distribute.MirroredStrategy(cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

#define model,loss,optimizer inside the scope of the strategy
with strategy.scope():

   #define loss object ; alpha_loss is defined outside of this cell
   loss_object = alpha_loss(reduction=tf.keras.losses.Reduction.NONE)

   #define compute loss
   def compute_loss(labels,predictions):

      """
      Sum of the loss over the three detection scales.

      labels , predictions -- indexable , 0 -> large , 1 -> medium , 2 -> small
      """

      #large
      large_obj_loss = loss_object(labels[0],predictions[0])

      #medium
      medium_obj_loss = loss_object(labels[1],predictions[1])

      #small
      small_obj_loss = loss_object(labels[2],predictions[2])

      #total loss
      #NOTE(review): the sum is not divided by the global batch size here -- confirm
      #that alpha_loss already returns the scale wanted for distributed training
      total_loss = large_obj_loss + medium_obj_loss + small_obj_loss

      return total_loss 

   #define learning rate scheduler
   #lr_scheduler = tf.keras.optimizers.schedules.CosineDecay(initial_learning_rate=0.001,decay_steps=5,alpha=1e-5)

   #define optimizer (Adam with its default settings)
   #optimizer = tf.keras.optimizers.Adam(learning_rate=lr_scheduler)
   optimizer = tf.keras.optimizers.Adam()

   #define model ; alpha_model is defined outside of this cell , training starts from fresh weights
   model = alpha_model()
   #model.load_weights(f"{cur_path}/gdrive/MyDrive/model_weights")
   #model = tf.keras.models.load_model(f"{cur_path}/gdrive/MyDrive/model")
   
#step function
@tf.function
def distributed_train_step(data_inputs):

   """
   Run train_step through strategy.run and combine the per-replica results.

   data_inputs -- one element of the distributed dataset

   return -- per-replica results of train_step reduced with ReduceOp.SUM (axis=None)
   """

   per_replica_losses = strategy.run(train_step,args=(data_inputs,))

   return strategy.reduce(tf.distribute.ReduceOp.SUM,per_replica_losses,axis=None)


def train_step(inputs):

   """
   One optimisation step : forward pass , loss , gradients , weight update.

   inputs -- (images,labels)

   return -- value of compute_loss for this batch
   """

   images,labels = inputs

   #the forward pass and the loss are recorded so that gradients can be taken afterwards
   with tf.GradientTape() as tape:

      #train_flag is the keyword used by the layers of this file for the training switch
      predictions = model(images,train_flag=True)
      
      loss = compute_loss(labels,predictions)

   #trainable_weights is read after the forward pass
   gradients = tape.gradient(loss,model.trainable_weights)

   optimizer.apply_gradients(zip(gradients,model.trainable_weights))

   return loss

#samples per replica and training step
batch_size_per_replica = 4

EPOCHS = 300

#preprocessing class
input_path = f"{cur_path}/gdrive/MyDrive/annotations/train_annotations.csv"
save_path = f"{cur_path}/gdrive/MyDrive/data"

class_train_info = preprocess_class(input_path,save_path)

#preprocessing label
#NOTE(review): the labels are read from test_annotations.csv while the classes above
#come from train_annotations.csv -- confirm that this is intended
input_path = f"{cur_path}/gdrive/MyDrive/annotations/test_annotations.csv"
save_path = f"{cur_path}/gdrive/MyDrive/data"

img_train_info = preprocessing_label(input_path,save_path)

#train image path
img_train_path = f"{cur_path}/gdrive/MyDrive/img"

#(height,width)
img_shape = (640,640)
#not used in the visible part of this file
standard_scale=(19360,66930)

#get number of sample m
m = len(list(img_train_info.keys()))

#dataset size
global_batch_size = batch_size_per_replica * strategy.num_replicas_in_sync
#number of images fetched from get_gt_data at once : two global batches
buffer_size = global_batch_size * 2

#total step per epochs (not used in the visible part of this file)
total_step_per_epoch = int(m/buffer_size)


#aug flag (set again at the start of every epoch)
aug_flag = False

#train
for i  in range(EPOCHS):

   #loss accumulated over all steps of this epoch
   total_loss = 0.0

   #augmentation is switched off from epoch 295 on
   if (i+1) < 295:

     aug_flag = True

   else:

     aug_flag = False

   #get data : buffer_size images per chunk , reshuffled on every call of get_gt_data
   for train_images, train_labels in get_gt_data(buffer_size,img_train_info,class_train_info,img_train_path,img_shape,aug_flag):

      #cast data
      train_images = train_images.astype(np.float64)

      # Create Datasets from the batches
      train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(buffer_size).batch(global_batch_size)

      #create distributed dataset
      train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)

      #Do training
      for batch in train_dist_dataset:

         total_loss = total_loss + distributed_train_step(batch) 
   
   #no-op : the summed loss is reported as it is (no averaging over the steps)
   total_loss = total_loss 
   
   print(f"Epoch {i+1} , Loss: {total_loss}")

   #save the weights after every 10th epoch
   if ((i+1)%10) == 0:

     #model.save(f"{cur_path}/gdrive/MyDrive/model")
     model.save_weights(f"{cur_path}/gdrive/MyDrive/model_weights")

In [None]:
#tf.keras.models.save_model(model,f"{cur_path}/gdrive/MyDrive/model")
#save the whole model (not only the weights) below the current working directory
model.save(f"{cur_path}/gdrive/MyDrive/model")

In [None]:
def draw_anchor_box(img,feat,reversed_class_map,class_color_map):

  """
  Draw one labelled box on img.

  img -- numpy_array
  feat -- list [class,left_x,left_y,center_x,center_y]
  reversed_class_map -- dict , class -> text written next to the box
  class_color_map -- dict , class -> colour of the box and the text

  return -- image returned by the cv2 drawing calls
  """

  class_key,x0,y0,cx,cy = feat[0],feat[1],feat[2],feat[3],feat[4]

  #the far corner mirrors the near corner at the centre
  x1 = x0 + (cx - x0)*2
  y1 = y0 + (cy - y0)*2

  near_corner = (int(x0),int(y0))
  far_corner = (int(x1),int(y1))

  color = class_color_map[class_key]

  boxed = cv2.rectangle(img,near_corner,far_corner,color,2)

  #the class text is anchored at the near corner
  label = reversed_class_map[class_key]

  return cv2.putText(boxed,label,near_corner,cv2.FONT_HERSHEY_DUPLEX,0.8,color,2,cv2.LINE_AA)

In [None]:
@tf.function
def step_predict(img,model):

   """
   Run one forward pass in inference mode.

   img -- batch of images
   model -- model to evaluate ; it has to accept the train_flag keyword , as the
            model does when it is called in train_step

   return -- output of the model
   """

   #the layers of this file switch batch normalisation through train_flag , so it is
   #turned off explicitly ; the Keras keyword training alone would leave train_flag at
   #its default (assumes alpha_model.call takes train_flag like CBS / CBL -- it is
   #called that way in train_step)
   predictions = model(img,training=False,train_flag=False)

   return predictions

def load_model(model_path):

   """
   Create a new alpha_model and restore its weights.

   model_path -- path handed to model.load_weights

   return -- the model after load_weights
   """

   #define model
   #model = tf.keras.models.load_model(model_path)
   model = alpha_model()
   #model.load_weights(f"{cur_path}/gdrive/MyDrive/model_weights")
   model.load_weights(model_path)

   return model

def get_image_generator(batch_size,image_path,standard_shape=(640,640)):

   """
   Yield the images found in image_path as preprocessed batches.

   batch_size -- number of directory entries taken per batch
   image_path -- directory holding the images
   standard_shape -- forwarded to preprocess_image

   yield -- numpy array stacking the images of one batch ; it holds fewer than
            batch_size images at the end of the listing or when entries were skipped
   """

   #os.listdir returns the entries in arbitrary order , sort them for reproducible batches
   filename_list = sorted(os.listdir(image_path))

   idx = 0

   while True:

      img_data = []

      batch_filename_list = filename_list[ idx * batch_size : (idx+1) * batch_size ]

      if len(batch_filename_list) == 0:

         break

      for name in batch_filename_list:

         #read image
         img = cv2.imread(f"{image_path}/{name}")

         #cv2.imread returns None for anything it cannot decode (sub-directory , non image file)
         if img is None:

            print(f"skip unreadable image: {image_path}/{name}")

            continue

         #pad image
         img = preprocess_image(img.astype(np.float64),standard_shape)

         #save img
         img_data.append(img)

      idx = idx + 1

      #nothing usable in this slice of the listing
      if len(img_data) == 0:

         continue

      yield np.array(img_data)

def predict(model,image_path,result_path,reversed_class_map,class_color_map,confidence_threshold=0.8,diou_threshold=0.4,batch_size=8,standard_shape=(640,640)):

   """
   Generator : detect objects on every image of image_path , save the annotated
   images and yield the raw model output of each batch.

   model -- model handed to step_predict
   image_path -- directory of the images to analyse
   result_path -- directory receiving res_<n>.jpg
   reversed_class_map -- dict , class index -> class name
   class_color_map -- dict , class index -> colour
   confidence_threshold -- minimum prob * class score of a kept cell
   diou_threshold -- boxes above this DIOU against a kept box are suppressed
   batch_size -- images per forward pass
   standard_shape -- forwarded to get_image_generator

   yield -- model output of one batch (indexed 0 -> large , 1 -> medium , 2 -> small)

   After the last batch the raw grids of the last processed image are written to
   predict_small.txt / predict_medium.txt / predict_large.txt below
   <cwd>/gdrive/MyDrive/data.
   """

   img_idx = 0

   #raw grids of the image processed last (dumped once the generator is exhausted)
   large_obj = None
   medium_obj = None
   small_obj = None

   #get batch of image
   for img_data in get_image_generator(batch_size,image_path,standard_shape) :

      #get prediction
      y = step_predict(img_data,model)

      #analysis and draw data ; the last batch may hold fewer than batch_size images
      for i in range(len(img_data)):

         img = img_data[i].copy()

         #large object
         large_obj = y[0][i].numpy()
         large_confirmed_anchor_box = get_confirmed_anchor_box(large_obj,confidence_threshold)

         #medium object
         medium_obj = y[1][i].numpy()
         medium_confirmed_anchor_box = get_confirmed_anchor_box(medium_obj,confidence_threshold)

         #small object
         small_obj = y[2][i].numpy()
         small_confirmed_anchor_box = get_confirmed_anchor_box(small_obj,confidence_threshold)

         #final anchor box -- final_anchor_box -- list: [ (prob1,feat_vec1) , (prob2,feat_vec2) , ... ]
         final_anchor_box = large_confirmed_anchor_box + medium_confirmed_anchor_box + small_confirmed_anchor_box

         #sort in descending order
         final_anchor_box.sort(key=lambda x:x[0],reverse=True)

         #nms
         selected_object = non_max_supression(final_anchor_box,diou_threshold)

         #draw prediction
         img = draw_box_based_on_feat(img,selected_object,reversed_class_map,class_color_map)

         #save the result
         cv2.imwrite(f"{result_path}/res_{img_idx}.jpg",img)

         #update img_idx
         img_idx = img_idx + 1

      yield y

   #no image was processed , so there is nothing to dump
   if small_obj is None:

      return

   #dump the raw grids of the last image ; the with statement closes the files
   with open(f"{os.getcwd()}/gdrive/MyDrive/data/predict_small.txt","w") as file:

      file.write(json.dumps((small_obj).tolist()))

   with open(f"{os.getcwd()}/gdrive/MyDrive/data/predict_medium.txt","w") as file:

      file.write(json.dumps((medium_obj).tolist()))

   with open(f"{os.getcwd()}/gdrive/MyDrive/data/predict_large.txt","w") as file:

      file.write(json.dumps((large_obj).tolist()))
   
   
def draw_box_based_on_feat(img,selected_object,reversed_class_map,class_color_map):

   """
   Draw every selected detection on img.

   img -- numpy array
   selected_object -- list of feature vectors : 1-4 -> box , 5.. -> class scores
   reversed_class_map , class_color_map -- forwarded to draw_anchor_box

   return -- img with all boxes drawn (img itself when the list is empty)
   """

   for vec in selected_object:

      #the class is the one with the highest score
      best_class = np.argmax(vec[5:])

      img = draw_anchor_box(img,[best_class,vec[1],vec[2],vec[3],vec[4]],reversed_class_map,class_color_map)

   return img


def analyse_feature(feat,confidence_threshold=0.8,diou_threshold=0.4):

   """
   Confidence filtering followed by non-max suppression on one feature grid.

   feat -- numpy array
   confidence_threshold -- forwarded to get_confirmed_anchor_box
   diou_threshold -- forwarded to non_max_supression

   return -- list of the selected feature vectors
   """

   #candidates -- list: [ (prob1,feat_vec1) , (prob2,feat_vec2) , ... ] , highest prob first
   ranked = sorted(get_confirmed_anchor_box(feat,confidence_threshold),key=lambda item:item[0],reverse=True)

   return non_max_supression(ranked,diou_threshold)
      

def get_confirmed_anchor_box(feat,confidence_threshold=0.8):

   """
   Collect the grid cells whose score reaches confidence_threshold.

   feat -- numpy array (nH,nW,nC) : 0 -> prob , 5.. -> class scores
   confidence_threshold -- minimum of prob * best class score

   return -- list: [ (score1,feat_vec1) , (score2,feat_vec2) , ... ] in row-major
             cell order ; every feat_vec is a copy of the cell
   """

   rows,cols,_ = feat.shape

   kept = []

   for r,c in np.ndindex(rows,cols):

      cell = feat[r,c,:].copy()

      class_scores = cell[5:]

      #score of a cell : objectness times the highest class score
      score = class_scores[np.argmax(class_scores)] * cell[0]

      if score >= confidence_threshold:

         kept.append((score,cell))

   return kept


def non_max_supression(confirmed_anchor_box,diou_threshold=0.4):

   """
   Greedy non-max suppression based on DIOU.

   confirmed_anchor_box -- list: [ (prob1,feat_vec1) , (prob2,feat_vec2) , ... ] ,
                           expected best first ; the list is consumed (empty afterwards)
   diou_threshold -- candidates whose DIOU against a kept box exceeds it are dropped

   return -- list of the kept feat_vec
   """

   selected_object = []

   while len(confirmed_anchor_box) != 0:

      #the head of the list is kept
      best_vec = confirmed_anchor_box.pop(0)[1]
      selected_object.append(best_vec)

      #everything overlapping it too much is removed in place
      confirmed_anchor_box[:] = [cand for cand in confirmed_anchor_box if not (DIOU(best_vec,cand[1]) > diou_threshold)]

   return selected_object

@jit(nopython=True)
def DIOU(feat_1,feat_2):

   """
   DIOU of two boxes : IoU minus the squared centre distance divided by the
   squared diagonal of the smallest box enclosing both.

   feat_1 , feat_2 -- 1-D arrays : [1:3] -> left (min) corner , [3:5] -> centre

   return -- diou value
   """

   #min corners and centres
   min_a = feat_1[1:3]
   min_b = feat_2[1:3]

   ctr_a = feat_1[3:5]
   ctr_b = feat_2[3:5]

   #full extent : twice the offset from the min corner to the centre
   size_a = (ctr_a - min_a)*2
   size_b = (ctr_b - min_b)*2

   #max corners
   max_a = min_a + size_a
   max_b = min_b + size_b

   ################## IOU ##################
   inter_min = np.maximum(min_a,min_b)

   #clamping the far corner to the near one gives a zero extent for disjoint boxes
   inter_max = np.maximum(inter_min,np.minimum(max_a,max_b))

   inter_size = inter_max - inter_min

   inter_area = inter_size[0] * inter_size[1]

   union_area = size_a[0] * size_a[1] + size_b[0] * size_b[1] - inter_area

   iou_val = inter_area / (union_area + 1e-10)

   ################## distance ratio ##################
   hull_min = np.minimum(min_a,min_b)
   hull_max = np.maximum(max_a,max_b)

   hull_diag_sq = np.sum(np.square(hull_max - hull_min))

   centre_gap_sq = np.sum(np.square(ctr_a - ctr_b))

   return iou_val - centre_gap_sq/(hull_diag_sq+1e-10)

In [None]:
def preprocess_class_color_map(class_info,save_path,name="class_color_map.txt"):

  """
  Give every class a colour and store the mapping as JSON in save_path/name.

  class_info -- dict ; its values become the keys of the colour map
  save_path -- directory of the output file
  name -- name of the output file

  return -- dict , class_info value -> (x,y,z) with every component in [0,255]
  """

  step = int(256*3/len(class_info))

  channels = [0,0,0]

  class_color_map = {}

  for class_id in class_info.values():

    #raise the first channel that has not reached 255 yet , capped at 255
    for pos in range(3):

      if channels[pos] < 255:

        channels[pos] = min(channels[pos] + step,255)

        break

    class_color_map[class_id] = tuple(channels)

  with open(f"{save_path}/{name}","w") as file:

    file.write(json.dumps(class_color_map))

  return class_color_map

def reverse_class_info(class_info,save_path,name="reversed_class_map.txt"):

  """
  Invert class_info and store the result as JSON in save_path/name.

  class_info -- dict
  save_path -- directory of the output file
  name -- name of the output file

  return -- dict , value of class_info -> its key (the last key wins for repeated values)
  """

  reversed_class_map = {val:key for key,val in class_info.items()}

  with open(f"{save_path}/{name}","w") as file:

    file.write(json.dumps(reversed_class_map))

  return reversed_class_map

In [None]:
import time
path = os.getcwd()

data_path =  f"{path}/gdrive/MyDrive/data"

#get class info from class_map.txt (JSON)
file = open(f"{data_path}/class_map.txt")
class_info = json.load(file)
file.close()

#both helpers also write their result as JSON into data_path
class_color_map = preprocess_class_color_map(class_info,data_path)
reversed_class_info = reverse_class_info(class_info,data_path)


model_path = f"{path}/gdrive/MyDrive/model_weights"
image_path = f"{path}/gdrive/MyDrive/pending_to_analysis"
result_path = f"{path}/gdrive/MyDrive/result"

#new alpha_model with the weights stored at model_path
model = load_model(model_path)

#predict is a generator : the images are analysed and saved while it is iterated ;
#with batch_size=1 every iteration covers one image , so 1/elapsed is printed as FPS
start = time.time()
for y in predict(model,image_path,result_path,reversed_class_info,class_color_map,confidence_threshold=0.6,diou_threshold=0.5,batch_size=1):

  print(f"FPS: {1/(time.time()-start)}")
  start = time.time()