# Initialize

In [None]:
import tensorflow as tf
print("Tensorflow version " + tf.__version__)

# Pick the distribution strategy used by the rest of the notebook: try to
# attach to a local TPU first, otherwise fall back to MirroredStrategy.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect(tpu="local")
    strategy = tf.distribute.TPUStrategy(tpu)
    print("on TPU")
except tf.errors.NotFoundError:
    # NOTE(review): only NotFoundError triggers the fallback; any other
    # exception raised by connect() propagates -- confirm this is the only
    # failure mode on hosts without a TPU.
    print("not on TPU")
    strategy = tf.distribute.MirroredStrategy()
    
# Number of replicas the chosen strategy keeps in sync.
print("REPLICAS: ", strategy.num_replicas_in_sync)

In [None]:
!pip install tensorflow_addons -q
!pip install -U albumentations -q
!pip install seaborn -q
!pip install gdown -q
import os
import psutil
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import seaborn as sns
import math
import gc
import shutil
import xml.etree.ElementTree as ET
import albumentations as A
import cv2 as cv
#import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_datasets as tfds
import tensorflow_probability as tfp
from tensorflow import keras
from keras import Model
from keras.regularizers import l2
from keras.layers import (
    Add,
    Concatenate,
    Conv2D,
    SeparableConv2D,
    DepthwiseConv2D,
    Input,
    Lambda,
    LeakyReLU,
    MaxPool2D,
    UpSampling2D,
    ZeroPadding2D,
    BatchNormalization)
from keras.losses import (
    BinaryCrossentropy)
from keras.callbacks import (
    ModelCheckpoint,
    CSVLogger)
print(tf.__version__)

# Global plotting defaults for the notebook: seaborn theme with an 80 dpi
# on-screen figure, 300 dpi when saving, and 'retina' inline rendering.
# NOTE(review): recent IPython releases deprecate
# IPython.display.set_matplotlib_formats -- confirm against the installed version.
from IPython.display import set_matplotlib_formats
sns.set(rc={"figure.dpi":80, 'savefig.dpi':300})
sns.set_context('notebook')
sns.set_style("ticks")
set_matplotlib_formats('retina')

## Configs

In [None]:
# Model / data configuration shared by the dataset, model and loss code.
config = {
    # Class names; a class id is the index into this list.
    'LABELS': ['W', 'C'],
    'NUM_classes': 2,
    # Network input shape; element 0 is also used as the square side length
    # when scaling normalised boxes for plotting and when resizing images.
    'INPUT_shape':  [1024, 1024, 3],
    # One row per detection level, two anchors per row, each anchor a pair
    # whose element 0 is compared against box width and element 1 against
    # box height. Row 0 is paired with the coarsest head output ('Head_13').
    # presumably pixel sizes measured on a 1300-px reference, hence the
    # division -- TODO confirm.
    'ANCHORS':  tf.constant([
        [(110.69006964, 115.58234666), (150.43162309, 155.19736292)],
        [(68.92051207, 71.54741845), (85.42938281, 91.21690259)],
        [(38.6832461, 39.43056285), (53.86257291, 55.85377213)]],
        tf.float32) / 1300,
    # Element 1 is read as the number of anchors per detection level.
    'ANCHORS_shape': [3, 2],
    # The four PARAMS_* lists are indexed by detection level (0, 1, 2).
    'PARAMS_GS_alpha': [1.5, 1.75, 2.0],
    'PARAMS_WH_power': [6.0, 4.0, 2.0],
    'PARAMS_head_scale': [1.0, 1.5, 3.5],
    'PARAMS_conf_scale': [0.01, 0.01, 0.01],
    # Not read anywhere in the visible part of the notebook.
    'RES_variations': [1088, 1024, 896, 832, 768, 704, 640]
}
# Dataset locations, keyed as '<MODE>_Annotations' / '<MODE>_Images'.
# The paths are concatenated directly with file names, so each must end in '/'.
directory = {
    'TRAIN_Annotations': '/kaggle/input/w-and-c-hand-signals-arrayfoo/Train/Annotations/',
    'TRAIN_Images': '/kaggle/input/w-and-c-hand-signals-arrayfoo/Train/Images/',
    'VALIDATION_Annotations': '/kaggle/input/w-and-c-hand-signals-arrayfoo/Validation/Annotations/',
    'VALIDATION_Images': '/kaggle/input/w-and-c-hand-signals-arrayfoo/Validation/Images/',
    'TEST_Annotations': '/kaggle/input/w-and-c-hand-signals-arrayfoo/Test/Annotations/',
    'TEST_Images': '/kaggle/input/w-and-c-hand-signals-arrayfoo/Test/Images/',
}
# Network hyper-parameters read by the backbone, neck and head builders.
architecture = {
    'backbone': 'EfficientNetV2B1',
    'SPPF_C': 480,
    'Neck_C': [120, 240],
    # Number of depthwise Conv_Block repetitions inside each Detection_Block.
    'Det_Blocks': 2,
    # Number of 3x3 conv repetitions in each branch of the decoupled head.
    'Head_Blocks': 1,
    'Head_C': 128
}
# Plot colour per class, indexed like config['LABELS'].
colors = ['#FFA500', '#6A5ACD']

## Utilities

In [None]:
def XYXY_to_XYWH(box):
  """Convert a single corner-format box to centre/size format.

  Args:
    box: indexable with at least four entries ordered [x1, y1, x2, y2].

  Returns:
    Tensor stacking [centre_x, centre_y, width, height] along the last axis
    of four one-element lists.
  """
  left, top, right, bottom = box[0], box[1], box[2], box[3]
  center_x = (left + right) / 2.0
  center_y = (top + bottom) / 2.0
  width = right - left
  height = bottom - top
  return tf.stack([[center_x], [center_y], [width], [height]], axis=-1)

def XYXY_to_YXYX(x):
  """Swap the coordinate order of boxes from [x1, y1, x2, y2] to [y1, x1, y2, x2].

  Only the first four entries of the last axis are used; anything after them
  is dropped from the result.
  """
  corners = x[..., :4]
  left, top, right, bottom = tf.split(corners, [1, 1, 1, 1], axis=-1)
  return tf.concat([top, left, bottom, right], axis=-1)

def Plot_Bbox_NMS(image, boxes, scores, labels, name=None):
  """Draw detections with a non-zero score on top of an image.

  Args:
    image: image tensor; singleton dimensions are squeezed before display.
    boxes: normalised boxes, read here as [x1, y1, x2, y2] in the first four
      entries of the last axis.
    scores: per-detection scores; entries equal to zero are skipped.
    labels: per-detection class ids, indexed like `scores`.
    name: optional path; when given the figure is saved and closed instead of
      being shown.
  """
  fig, ax = plt.subplots(figsize=(16, 12))
  ax.imshow(tf.squeeze(image))

  # Keep only the detections whose score is non-zero.
  kept = tf.where(scores)
  kept_boxes = tf.gather_nd(boxes, kept)
  kept_labels = tf.gather_nd(labels, kept)

  side = config['INPUT_shape'][0]
  for raw_box, raw_label in zip(kept_boxes, kept_labels):
    label = tf.cast(raw_label, tf.int32)
    scaled = raw_box[..., :4] * side
    size = tf.squeeze(tf.cast(XYXY_to_XYWH(scaled), tf.int32))
    corner = tf.cast(scaled, tf.int32)
    ax.add_patch(patches.Rectangle(
        (corner[0], corner[1]), size[2], size[3],
        linewidth=2.5, edgecolor=colors[label], facecolor='none'))
    # Class name is printed slightly above the top-left corner.
    ax.text(corner[0], corner[1] - 8, config['LABELS'][label],
            fontsize=12, color=colors[tf.squeeze(label)])

  plt.axis('off')
  if not name:
    plt.show()
    return
  plt.savefig(name)
  plt.close(fig)

def Plot_Bbox(image, labels):
  """Draw the ground-truth boxes stored in an encoded target tensor.

  Args:
    image: image tensor; singleton dimensions are squeezed before display.
    labels: target tensor whose last axis is
      [x1, y1, x2, y2, confidence, one-hot class...]; only entries with a
      non-zero confidence are drawn.
  """
  fig, ax = plt.subplots(figsize=(16, 12))
  ax.imshow(tf.squeeze(image))

  side = config['INPUT_shape'][0]
  occupied = tf.where(labels[..., 4])
  for entry in tf.gather_nd(labels, occupied):
    corner = tf.cast(entry[..., :4] * side, tf.int32).numpy()
    size = tf.squeeze(tf.cast(XYXY_to_XYWH(corner), tf.int32))
    # Position of the non-zero entry in the one-hot class vector.
    class_id = tf.squeeze(tf.where(entry[..., 5:]))
    ax.add_patch(patches.Rectangle(
        (corner[0], corner[1]), size[2], size[3],
        linewidth=2.5, edgecolor=colors[class_id], facecolor='none'))
    ax.text(corner[0], corner[1] - 8, config['LABELS'][class_id],
            fontsize=12, color=colors[tf.squeeze(class_id)])

  plt.axis('off')
  plt.show()

# Dataset

In [None]:
class Data_Generator_VOC:
  """Turns VOC-style XML annotations into `tf.train.Example` records.

  Args:
    config: dict providing 'NUM_classes' and 'LABELS'.
    directory: dict providing '<mode>_Annotations' and '<mode>_Images' paths;
      file names are appended by plain string concatenation.
    ignore_negative: when True, annotations that yield no box are skipped.
    mode: prefix selecting which pair of directories to read.
  """

  # Coordinate tags looked up (by substring) inside a 'bndbox' element, in
  # the order the values are emitted.
  _BOX_KEYS = ('xmin', 'ymin', 'xmax', 'ymax')

  def __init__(self, config, directory, ignore_negative=False, mode='TRAIN'):
    self.output_dim = config['NUM_classes'] + 5
    self.dir_annot = directory[mode + '_Annotations']
    self.dir_img = directory[mode + '_Images']
    self.labels = config['LABELS']
    self.num_classes = config['NUM_classes']
    self.ignore_negative = ignore_negative

  def test_dim(self, x):
    """Return True when every coordinate lies in the accepted [0, 1] range.

    The lower bound is tested after truncating `dim * 1000` to an int, so
    values just below zero (down to -0.001, exclusive) are still accepted.
    """
    for dim in x:
      if int(dim * 1000) < 0. or (dim * 1000) > 1000: return False
    return True

  def _find_label(self, elem):
    """Return the class index named by the element's 'name' child, or None."""
    label_id = None
    for attr in elem:
      if 'name' in attr.tag:
        label_id = self.labels.index(attr.text) if attr.text in self.labels else None
    return label_id

  def _parse_box(self, bndbox, width, height):
    """Return normalised [xmin, ymin, xmax, ymax], or None if a corner is missing."""
    coords = {}
    for dim in bndbox:
      for key in self._BOX_KEYS:
        if key in dim.tag:
          scale = width if key.startswith('x') else height
          coords[key] = float(dim.text) / scale
    if len(coords) != len(self._BOX_KEYS):
      return None
    return [coords[key] for key in self._BOX_KEYS]

  def generator(self):
    """Yield one `tf.train.Example` per annotation file, in sorted file order.

    Each example holds the raw image bytes plus parallel float lists
    'xmin', 'ymin', 'xmax', 'ymax', 'confidence' and 'label'.

    Raises:
      ValueError: if an annotation has no 'filename' element, or contains a
        labelled box but no 'width'/'height' to normalise it with.
    """
    for ann in sorted(os.listdir(self.dir_annot)):
      if "xml" not in ann: continue
      elems = list(ET.parse(self.dir_annot + ann).iter())

      # Per-file state is reset for every annotation so nothing can leak in
      # from the previously parsed file.
      image_raw = width = height = None
      X1, Y1, X2, Y2, conf, label = [[] for _ in range(6)]

      # First pass: image bytes and image size, wherever they appear in the file.
      for elem in elems:
        if 'filename' in elem.tag:
          with tf.io.gfile.GFile(self.dir_img + elem.text, 'rb') as f:
            image_raw = f.read()
        if 'width' in elem.tag: width = float(elem.text)
        if 'height' in elem.tag: height = float(elem.text)
      if image_raw is None:
        raise ValueError("annotation '%s' has no filename element" % ann)

      # Second pass: one box per 'bndbox' child of each labelled object/part.
      for elem in elems:
        if not ('object' in elem.tag or 'part' in elem.tag): continue
        label_id = self._find_label(elem)
        if label_id is None: continue
        for attr in elem:
          if 'bndbox' not in attr.tag: continue
          if width is None or height is None:
            raise ValueError("annotation '%s' has boxes but no image size" % ann)
          box = self._parse_box(attr, width, height)
          if box is None or not self.test_dim(box): continue
          X1.append(box[0]); Y1.append(box[1]); X2.append(box[2]); Y2.append(box[3])
          label.append(label_id)
          conf.append(1)

      if len(conf) < 1 and self.ignore_negative: continue
      example = tf.train.Example(features=tf.train.Features(feature={
        'encoded': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw])),
        'xmin':  tf.train.Feature(float_list=tf.train.FloatList(value=X1)),
        'ymin':  tf.train.Feature(float_list=tf.train.FloatList(value=Y1)),
        'xmax':  tf.train.Feature(float_list=tf.train.FloatList(value=X2)),
        'ymax':  tf.train.Feature(float_list=tf.train.FloatList(value=Y2)),
        'confidence':  tf.train.Feature(float_list=tf.train.FloatList(value=conf)),
        'label': tf.train.Feature(float_list=tf.train.FloatList(value=label)),
      }))
      yield example

class Dataset_Transformation:
  """Encodes batched box lists into one dense target grid per detection level.

  Called as `transform(x, y)` on a padded batch where the last axis of `y` is
  [x1, y1, x2, y2, confidence, class_id]; returns `x` unchanged together with
  a tuple of three grids at resolutions `g`, `2g` and `4g`, where
  `g = INPUT_shape[0] // 16`.
  """

  def __init__(self, config):
    self.config = config
    self.anchors = config["ANCHORS"]
    self.grid_size = config['INPUT_shape'][0] // 16
    self.transformed_shape = config['INPUT_shape'][:2]
    self.NUM_classes = config['NUM_classes']
    self.NUM_anchors = config['ANCHORS_shape'][1]

  def __call__(self, x, y):
    y, anchor_mask = self.prepare_dataset(y)
    output_1 = self.transform_output(y, anchor_mask, 0, self.grid_size)
    output_2 = self.transform_output(y, anchor_mask, 1, self.grid_size * 2)
    output_3 = self.transform_output(y, anchor_mask, 2, self.grid_size * 4)
    return x, tuple([output_1, output_2, output_3])

  def prepare_dataset(self, y):
    """Clip the boxes, one-hot the class ids and pick the best anchor per level.

    Returns:
      y_dataset: [x1, y1, x2, y2, confidence, one-hot classes...] per box.
      anchor_max_iou: int32 index of the anchor with the highest
        width/height IoU, one value per box and per anchor row.
    """
    box_xy = tf.clip_by_value(y[..., :4], 0., 1.)
    box_wh = box_xy[..., 2:4] - box_xy[..., 0:2]
    # Repeat every box once per anchor row so it can be compared with that
    # row's anchors.
    box_wh = tf.tile(box_wh[..., tf.newaxis, :], [1, 1, self.anchors.shape[0], 1])
    box_area = box_wh[..., 0] * box_wh[..., 1]
    anchor_area = self.anchors[..., 0] * self.anchors[..., 1]
    # IoU of box and anchor as if both shared the same corner (sizes only).
    intersection = tf.minimum(box_wh[..., 0, tf.newaxis], self.anchors[..., 0]) * tf.minimum(box_wh[..., 1, tf.newaxis], self.anchors[..., 1])
    iou = intersection / (anchor_area + box_area[..., tf.newaxis] - intersection)
    anchor_max_iou = tf.cast(tf.argmax(iou, axis=-1), tf.int32)
    labels = tf.one_hot(tf.cast(y[..., 5], tf.int32), self.NUM_classes)
    y_dataset = tf.concat([box_xy, y[..., 4:5], labels], -1)
    return y_dataset, anchor_max_iou

  def transform_output(self, y, anchor_mask, step, grid_size):
    """Scatter the boxes of a batch into a zero-initialised target grid.

    Args:
      y: encoded boxes returned by `prepare_dataset`.
      anchor_mask: best-anchor indices returned by `prepare_dataset`.
      step: which anchor row (detection level) of `anchor_mask` to use.
      grid_size: number of cells along each side of the grid.

    Returns:
      Tensor of shape [batch, grid, grid, NUM_anchors, NUM_classes + 5].
    """
    batch_size = tf.shape(y)[0]
    size = tf.shape(y)[1]
    outputs = tf.zeros([batch_size, grid_size, grid_size, self.NUM_anchors, self.NUM_classes + 5])
    # Scalar predicate (the comparison used to be against a one-element tensor).
    if batch_size > 0:
      box_xy = (y[..., 0:2] + y[..., 2:4]) / 2
      grid_xy = tf.cast(box_xy // (1 / grid_size), tf.int32)
      # A centre lying exactly on 1.0 would index cell `grid_size`, which is
      # outside the grid; keep it in the last cell instead.
      grid_xy = tf.clip_by_value(grid_xy, 0, grid_size - 1)
      idx_batch = tf.reshape(tf.repeat(tf.range(batch_size), size), [batch_size, size, 1])
      # Rows with zero confidence are padding and are not written.
      idx_non_zero = tf.where(y[..., 4])
      # Index order matches `outputs`: [batch, row (y), column (x), anchor].
      idx = tf.concat([idx_batch, grid_xy[..., 1, tf.newaxis], grid_xy[..., 0, tf.newaxis],  anchor_mask[..., step, tf.newaxis]], axis=-1)
      idx = tf.gather_nd(idx, idx_non_zero)
      y = tf.gather_nd(y, idx_non_zero)
      outputs = tf.tensor_scatter_nd_update(outputs, idx, y)
    return outputs

def parse_tfrecord(tfrecord, size):
    """Decode one serialized example into an image and its box table.

    Args:
        tfrecord: serialized `tf.train.Example`.
        size: accepted for the caller's convenience; not used here.

    Returns:
        (image, boxes): a uint8 three-channel image and a float tensor with
        one row per box and columns
        [xmin, ymin, xmax, ymax, confidence, label].
    """
    box_fields = ('xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'label')

    feature_spec = {'encoded': tf.io.FixedLenFeature([], tf.string)}
    for field in box_fields:
        feature_spec[field] = tf.io.VarLenFeature(tf.float32)
    example = tf.io.parse_single_example(tfrecord, feature_spec)

    image = tf.cast(tf.image.decode_png(example['encoded'], channels=3), tf.uint8)
    columns = [tf.sparse.to_dense(example[field]) for field in box_fields]
    return image, tf.stack(columns, axis=1)

## Write TFRecords

In [None]:
def RecordWriter(config, directory):
  """Write the Train/Validation/Test TFRecord files into the working directory.

  Annotations that produce no box are skipped for every split
  (`ignore_negative=True`).

  Args:
    config: passed through to `Data_Generator_VOC`.
    directory: passed through to `Data_Generator_VOC`.

  Returns:
    List with the number of examples written for
    ['TRAIN', 'VALIDATION', 'TEST'], in that order.
  """
  record_files = {
      'TRAIN': ('Train.tfrecords', 'Writing Train TFRecords'),
      'VALIDATION': ('Validation.tfrecords', 'Writing Validation TFRecords'),
      'TEST': ('Test.tfrecords', 'Writing Test TFRecords'),
  }
  NUM_samples = []
  for mode in ['TRAIN', 'VALIDATION', 'TEST']:
    filename, message = record_files[mode]
    print(message)

    Data_Iterator = Data_Generator_VOC(config, directory, True, mode=mode)
    written = 0
    # The context manager closes the file even if the generator raises
    # part-way through a split.
    with tf.io.TFRecordWriter(filename) as writer:
      for example in Data_Iterator.generator():
        writer.write(example.SerializeToString())
        written += 1
    NUM_samples.append(written)
  return NUM_samples
# Write all three TFRecord files and report how many examples each split holds
# (order: train, validation, test).
NUM_samples = RecordWriter(config, directory)
print(NUM_samples)

## Transforms and Augmentation Pipeline

In [None]:
class Dataset_Augmentation:
  """Albumentations pipeline wrapped so it can be used in `tf.data.Dataset.map`.

  Args:
    size: target side length used by the resize-only pipeline.
    resize_only: when True, only resize the longest side to `size` and pad to
      a square; when False, apply the random photometric/geometric
      augmentations instead (no resizing).
  """

  def __init__(self, size, resize_only=False):
    if resize_only:
      self.augment_pipeline = A.Compose([
        A.LongestMaxSize(max_size=size, always_apply=True),
        A.PadIfNeeded(min_height=size, min_width=size, border_mode=0, value=(0,0,0)),
        ], bbox_params=A.BboxParams(format='albumentations', label_fields=['index']))
    else:
      self.augment_pipeline = A.Compose([
        A.RandomBrightnessContrast(p=0.8, brightness_limit=(-0.1, 0.25), contrast_limit=(-0.1, 0.25)),
        # The probability must be passed by keyword: the first positional
        # parameter of ToGray is not `p`.
        A.ToGray(p=0.4),
        A.OneOf([
          A.GaussianBlur(p=0.5, blur_limit=(3, 5)),
          A.MotionBlur(p=0.5, blur_limit=3)
        ], p=0.8),
        A.Sharpen(p=0.5),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(p=0.95, rotate_limit=20, scale_limit=(-0.4, 0.4), shift_limit=0.4, border_mode=cv.BORDER_CONSTANT,),
      ], bbox_params=A.BboxParams(format='albumentations', min_visibility=0.7, label_fields=['index']))

  def method(self, x, y, c):
    """Run the pipeline on one sample (called through `tf.numpy_function`).

    Args:
      x: image array.
      y: boxes, four normalised coordinates per row.
      c: one class id per box, carried through the pipeline as 'index'.

    Returns:
      (image as uint8, surviving boxes as float32, their class ids as float32).
    """
    transformed = self.augment_pipeline(image=x, bboxes=y, index=c)
    return tf.cast(transformed['image'], tf.uint8), tf.cast(transformed['bboxes'], tf.float32), tf.cast(transformed['index'], tf.float32)

  def __call__(self, x, y):
    """Augment one (image, boxes) pair; `y` rows are [x1, y1, x2, y2, conf, label]."""
    # NOTE(review): stateful=False is also used for the random pipeline --
    # confirm TensorFlow never reuses a result across calls in this setup.
    x, bbox, labels = tf.numpy_function(func=self.method, inp=[x, y[..., :4], y[..., 5]], Tout=[tf.uint8, tf.float32, tf.float32], stateful=False)
    if tf.shape(bbox)[0] > 0:
      # Every surviving box gets confidence 1.
      y = tf.concat([bbox, tf.ones([tf.shape(bbox)[0], 1]), labels[..., tf.newaxis]], axis=-1)
    else:
      # No box survived: return an empty table with the expected six columns.
      y = tf.reshape(tf.convert_to_tensor(()), (0, 6))
    return x, y


def Dataset_Pipeline(config, batch_size, record_path, augment=False, cache=True, repeat=True, shuffle=True, cache_path='cache', max_boxes=20, shuffle_buffer=2048):
  """Build the input pipeline: parse, resize, cache, augment, batch, encode.

  Args:
    config: notebook configuration; 'INPUT_shape'[0] is the square image size.
    batch_size: number of samples per batch (incomplete batches are dropped).
    record_path: TFRecord file to read.
    augment: apply the random augmentation pipeline after caching.
    cache: cache the parsed and resized samples at `cache_path`.
    repeat: repeat the dataset indefinitely.
    shuffle: shuffle samples with a buffer of `shuffle_buffer` elements.
    cache_path: file prefix for the on-disk cache.
    max_boxes: number of box rows every sample is padded to when batching;
      a sample with more boxes than this cannot be batched.
    shuffle_buffer: size of the shuffle buffer.

  Returns:
    A `tf.data.Dataset` yielding (images, (target_1, target_2, target_3)).
  """
  size = config['INPUT_shape'][0]
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.AUTO
  Dataset = tf.data.TFRecordDataset(record_path)
  Dataset = Dataset.map(lambda x: parse_tfrecord(x, size), num_parallel_calls=tf.data.AUTOTUNE)
  Dataset = Dataset.map(Dataset_Augmentation(size, resize_only=True), num_parallel_calls=tf.data.AUTOTUNE)
  if cache:
    # Cached before the random augmentation so every pass gets fresh augmentations.
    Dataset = Dataset.cache(cache_path)
  if repeat:
    Dataset = Dataset.repeat()
  if shuffle:
    # NOTE(review): shuffling after repeat() mixes samples across epoch
    # boundaries -- confirm this is intended.
    Dataset = Dataset.shuffle(buffer_size=shuffle_buffer)
  if augment:
    Dataset = Dataset.map(Dataset_Augmentation(size, resize_only=False), num_parallel_calls=tf.data.AUTOTUNE)
  Dataset = Dataset.padded_batch(batch_size, padded_shapes=([size, size, 3], [max_boxes, 6]), drop_remainder=True)
  Dataset = Dataset.map(Dataset_Transformation(config), num_parallel_calls=tf.data.AUTOTUNE)
  Dataset = Dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
  return Dataset.with_options(options)

# NOTE(review): this hard-coded list replaces the NUM_samples value returned by
# RecordWriter in the earlier cell -- confirm the two agree for the current data.
NUM_samples = [4395, 399, 498]
batch_size = 64
NUM_steps_per_epoch = (NUM_samples[0] // batch_size)

# Training data is augmented, shuffled and repeated; validation and test are
# each read once, unshuffled, as a single batch holding the whole split.
Dataset_Train = Dataset_Pipeline(config, batch_size, augment=True, shuffle=True, record_path='Train.tfrecords', cache_path='Train_Cache')
Dataset_Valid = Dataset_Pipeline(config, NUM_samples[1], repeat=False, shuffle=False, record_path='Validation.tfrecords', cache_path='Validation_Cache')
Dataset_Test = Dataset_Pipeline(config, NUM_samples[2], repeat=False, shuffle=False, record_path='Test.tfrecords', cache_path='Test_Cache')

## Cache Datasets on Disk

In [None]:
# Iterating each dataset once fills its on-disk cache; the benchmark also
# reports throughput. The training dataset repeats forever, so its iteration
# count is bounded explicitly to a little over one epoch.
tfds.benchmark(Dataset_Valid, batch_size=NUM_samples[1])
tfds.benchmark(Dataset_Test, batch_size=NUM_samples[2])
# Use the configured batch size rather than a second hard-coded 64.
tfds.benchmark(Dataset_Train, num_iter=NUM_steps_per_epoch + 1, batch_size=batch_size)

## Visualization

In [None]:
# Pull the first training batch, report its shapes and the current memory
# usage, then plot the first image.
for i, j in enumerate(Dataset_Train):
  print(j[0].shape)
  imageS = j[0]
  # First target grid of the batch, restricted to the first sample.
  trueS = j[1][0][0]
  print(trueS.shape)
  used_mem = psutil.virtual_memory().used
  print("used memory: {} Mb".format(used_mem / 1024 / 1024))
  # Only one batch is needed.
  if i == 0: break

Plot_Bbox(imageS[0], trueS)
gc.collect()

In [None]:
#os.remove( '/kaggle/working/Test_Cache.index')
#os.remove('/kaggle/working/Test_Cache.data-00000-of-00001')
#os.remove( '/kaggle/working/Validation_Cache.index')
#os.remove('/kaggle/working/Validation_Cache.data-00000-of-00001')
#os.remove( '/kaggle/working/Train_Cache.index')
#os.remove('/kaggle/working/Train_Cache.data-00000-of-00001')

# Model

In [None]:
def Conv_SiLU(x, filters, kernel_size, strides=1, depth_multiplier=1, mode='Conv2D', batch_norm=True):
  """Convolution, optionally followed by batch normalisation and SiLU.

  Args:
    x: input tensor.
    filters: number of output channels.
    kernel_size: convolution kernel size.
    strides: convolution stride.
    depth_multiplier: only used when `mode == 'SeparableConv2D'`.
    mode: 'SeparableConv2D' selects a separable convolution; any other value
      selects a regular L2-regularised `Conv2D`.
    batch_norm: when True the convolution has no bias and is followed by
      BatchNormalization and SiLU; when False the convolution has a bias and
      its raw output is returned (no activation).
  """
  with_bias = not batch_norm
  if mode == 'SeparableConv2D':
    conv = SeparableConv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding='same',
        depth_multiplier=depth_multiplier,
        pointwise_initializer=tf.initializers.variance_scaling(),
        depthwise_initializer=tf.initializers.variance_scaling(),
        use_bias=with_bias)
  else:
    conv = Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding='same',
        use_bias=with_bias,
        kernel_regularizer=l2(0.0005),
        kernel_initializer='he_normal')
  x = conv(x)

  if not batch_norm:
    return x
  x = BatchNormalization()(x)
  return tf.nn.silu(x)

def Conv_MOD(x, filters):
  """Apply a 1x1 and then a 3x3 `Conv_SiLU` block, both with `filters` channels."""
  for kernel_size in (1, 3):
    x = Conv_SiLU(x, filters, kernel_size)
  return x

class PAN:
  """Feature-pyramid neck: SPPF, a top-down pass and a bottom-up pass.

  Args:
    Conv: callable `(x, filters, kernel_size[, strides])` used for every
      convolution block in the neck.
  """

  def __init__(self, Conv):
    self.Conv = Conv
    
  def Conv_Block(self, x, filters):
    """Depthwise 3x3 conv + synchronized batch norm + SiLU, then a 1x1 `Conv`."""
    x = DepthwiseConv2D(kernel_size=3, strides=1, padding='same')(x)
    x = BatchNormalization(synchronized=True)(x)
    x = tf.nn.silu(x)
    x = self.Conv(x, filters, 1)
    return x

  def Detection_Block(self, x, C_in, C_out=None):
    """Two-branch block: a 1x1 shortcut and a stack of `Conv_Block`s, concatenated.

    Args:
      x: input tensor.
      C_in: block width; each branch uses half of it.
      C_out: output channels, defaults to `C_in`.
    """
    if C_out is None: C_out = C_in
    # Integer division keeps the filter counts as ints.
    half = C_in // 2
    x_1 = self.Conv(x, half, 1)
    x_2 = self.Conv(x, half, 1)
    for _ in range(architecture['Det_Blocks']):
      x_2 = self.Conv_Block(x_2, half)
    x = Concatenate()([x_1, x_2])
    x = self.Conv(x, C_out, 1)
    return x

  def SPPF(self, t, filters, pool_size=5, name='SPPF'):
    """Three chained stride-1 max-pools, concatenated with their input.

    Built as a named sub-model and applied to `t`.
    """
    pool = MaxPool2D(pool_size, 1, padding='same')
    x = inputs = Input(t.shape[1:])
    x = self.Conv(x, filters // 2, 1)
    p_1 = pool(x)
    p_2 = pool(p_1)
    p_3 = pool(p_2)
    x = tf.concat([x, p_1, p_2, p_3], axis=-1)
    x = self.Conv(x, filters, 1)
    return Model(inputs, x, name=name)(t)
    
  def FPN_head(self, t, filters, conv=True, name=None):
    """Top-down step, built as a named sub-model and applied to `t`.

    With a tuple `(x, x_skip)`: upsample `x` by 2, concatenate with `x_skip`,
    run a `Detection_Block`, and (when `conv` is True) halve the channels with
    a 1x1 conv. With a single tensor: just a 1x1 conv to `filters` channels.
    """
    if isinstance(t, tuple):
      inputs = Input(t[0].shape[1:]), Input(t[1].shape[1:])
      x, x_skip = inputs
      x = UpSampling2D(2, interpolation='bilinear')(x)
      x = Concatenate()([x, x_skip])
      x = self.Detection_Block(x, filters)
      if conv: x = self.Conv(x, filters // 2, 1)
    else:
      x = inputs = Input(t.shape[1:])
      x = self.Conv(x, filters, 1)
    return Model(inputs, x, name=name)(t)

  def PAN_head(self, t, filters, name=None):
    """Bottom-up step for a pair `(x, y)`.

    Downsamples `x` with a stride-2 3x3 conv, concatenates it with `y` and
    runs a `Detection_Block` that outputs `2 * filters` channels.
    """
    inputs = Input(t[0].shape[1:]), Input(t[1].shape[1:])
    x, y = inputs
    x = self.Conv(x, filters, 3, 2)
    x = Concatenate()([x, y])
    x = self.Detection_Block(x, C_in=filters, C_out=filters*2)
    return Model(inputs, x, name=name)(t)

  def __call__(self, x_in):
    """Run the neck on three backbone features.

    Args:
      x_in: `(x_1, x_2, x_3)`; `x_3` goes through SPPF and is upsampled to
        meet `x_2` and then `x_1`.

    Returns:
      `(y_1, y_2, y_3)` in the same order as the inputs.
    """
    x_1, x_2, x_3 = x_in
    x_3 = self.SPPF(x_3, architecture['SPPF_C'])
    # Top-down path.
    x_3 = y_3 = self.FPN_head(x_3, architecture['Neck_C'][1], name='FPN_13')
    x_3 = y_2 = self.FPN_head((x_3, x_2), architecture['Neck_C'][1], name='FPN_26')
    x_3 = y_1 = self.FPN_head((x_3, x_1), architecture['Neck_C'][0], conv=False, name='PAN_52')
    # Bottom-up path.
    y_2 = self.PAN_head((y_1, y_2), architecture['Neck_C'][0], name='PAN_26')
    y_3 = self.PAN_head((y_2, y_3), architecture['Neck_C'][1], name='PAN_13')
    return (y_1, y_2, y_3)

def Decoupled_Head(filters, NUM_anchors, NUM_classes, name=None):
  """Return a builder that applies a decoupled detection head to a feature map.

  The head has two branches sharing a 1x1 stem: one predicts class scores,
  the other predicts box values and confidence. The output is reshaped so its
  last two axes are [NUM_anchors, NUM_classes + 5], ordered
  [box (4), confidence (1), classes].

  Args:
    filters: not used in the body; the head width comes from
      architecture['Head_C'].
    NUM_anchors: anchors predicted per grid cell.
    NUM_classes: number of classes.
    name: name given to the head sub-model.
  """
  class Head_Transformation(keras.layers.Layer):
    # Splits the channel axis into [NUM_anchors, NUM_classes + 5].
    def __init__(self, NUM_anchors, NUM_classes):
      super().__init__()
      self.NUM_anchors = NUM_anchors
      self.NUM_classes = NUM_classes
    def call(self, x):
      return tf.reshape(x, (-1, tf.shape(x)[1], tf.shape(x)[2], self.NUM_anchors, self.NUM_classes + 5))

  def builder(t):
    # Build the head as its own sub-model and apply it to `t`.
    x = inputs = Input(t.shape[1:])
    # Shared 1x1 stem; x_1 becomes the class branch, x_2 the box/confidence branch.
    x_1 = x_2 = Conv_SiLU(x, architecture['Head_C'], 1)
    for _ in range(architecture['Head_Blocks']):
      x_1 = Conv_SiLU(x_1, architecture['Head_C'], 3)
      x_2 = Conv_SiLU(x_2, architecture['Head_C'], 3)
    # batch_norm=False: these output convs have a bias and no BN/activation.
    x_Cls = Conv_SiLU(x_1, NUM_anchors * (NUM_classes), 1, batch_norm=False)
    x_Box = Conv_SiLU(x_2, NUM_anchors * 4, 1, batch_norm=False)
    x_Conf = Conv_SiLU(x_2, NUM_anchors * 1, 1, batch_norm=False)
    x = tf.concat([x_Box, x_Conf, x_Cls], axis=-1)
    x = Head_Transformation(NUM_anchors, NUM_classes)(x)
    return tf.keras.Model(inputs, x, name=name)(t)
  return builder

class Output_Activation(tf.keras.layers.Layer):
  """Decodes raw head outputs into normalised corner boxes.

  Args:
    NUM_classes: number of class channels at the end of the last axis.
    anchors: anchor sizes for this level, multiplied onto the decoded
      width/height.
    GS_alpha: scale applied to the sigmoid of the centre offsets.
    WH_power: exponent applied to `2 * sigmoid(raw width/height)`.
    training: when False, confidence and class outputs are passed through a
      sigmoid; when True they are returned as raw logits.
  """

  def __init__(self, NUM_classes, anchors, GS_alpha, WH_power, training=False):
    super().__init__()
    self.training = training
    self.NUM_classes = NUM_classes
    self.anchors = anchors
    self.GS_alpha = GS_alpha
    self.WH_power = WH_power

  def call(self, x):
    """Decode `x`, whose last axis is [xy (2), wh (2), confidence (1), classes].

    Axes 1 and 2 of `x` are the grid rows and columns.

    Returns:
      (bbox, confidence, class_probs) where `bbox` is [x1, y1, x2, y2].
    """
    grid_size = tf.shape(x)[1:3]
    grid_rows, grid_cols = grid_size[0], grid_size[1]
    box_xy, box_wh, confidence, class_probs = tf.split(x, (2, 2, 1, self.NUM_classes), axis=-1)
    box_xy = tf.sigmoid(box_xy)
    box_wh = ((2 * tf.sigmoid(box_wh)) ** self.WH_power) * self.anchors
    
    if not self.training:
      confidence = tf.sigmoid(confidence)
      class_probs = tf.sigmoid(class_probs)

    # meshgrid(columns, rows) yields [rows, cols] arrays holding the column (x)
    # and row (y) index of every cell, which lines up with axes 1 and 2 of `x`
    # for non-square grids as well as square ones.
    grid = tf.meshgrid(tf.range(grid_cols), tf.range(grid_rows))
    grid = tf.stack(grid, axis=-1)[..., tf.newaxis, :]
    # x is normalised by the number of columns, y by the number of rows.
    cells_xy = tf.cast(tf.stack([grid_cols, grid_rows]), x.dtype)
    box_xy = (self.GS_alpha * box_xy - (self.GS_alpha - 1) / 2 + tf.cast(grid, x.dtype)) / cells_xy
    box_x1y1 = box_xy - box_wh / 2
    box_x2y2 = box_xy + box_wh / 2
    bbox = tf.concat([box_x1y1, box_x2y2], axis=-1)
    return bbox, confidence, class_probs

class EfficientNetV2:
  """ImageNet-pretrained EfficientNetV2 exposing three intermediate feature maps.

  Args:
    mode: one of 'EfficientNetV2B0'..'EfficientNetV2B3'; any other value
      selects 'EfficientNetV2S'.
    trainable: value assigned to the `trainable` flag of the built model.
  """

  def __init__(self, mode='EfficientNetV2S', trainable=True):
    applications = tf.keras.applications.efficientnet_v2
    if mode not in ('EfficientNetV2B0', 'EfficientNetV2B1', 'EfficientNetV2B2', 'EfficientNetV2B3'):
      mode = 'EfficientNetV2S'

    # Names of the layers whose outputs are tapped; B0 uses a different first layer.
    first_tap = 'block2b_add' if mode == 'EfficientNetV2B0' else 'block2c_add'
    self.config = [first_tap, 'block4a_expand_activation', 'block6a_expand_activation']

    build_backbone = getattr(applications, mode)
    self.model = build_backbone(include_top=False,
                                weights='imagenet',
                                input_tensor=None,
                                input_shape=None,
                                include_preprocessing=False,
                                classifier_activation=None)
    self.mode = mode
    self.trainable = trainable
  
  def __call__(self):
    """Return a model mapping an RGB image to the three tapped feature maps."""
    inputs = Input([None, None, 3])
    tap_1, tap_2, tap_3 = self.config
    shallow = Model(inputs=self.model.inputs, outputs=self.model.get_layer(tap_1).output)
    middle = Model(inputs=shallow.inputs, outputs=self.model.get_layer(tap_2).output)
    taps = Model(inputs=middle.inputs,
                 outputs=(shallow.output, middle.output, self.model.get_layer(tap_3).output))
    features = taps(inputs)
    model = Model(inputs=inputs, outputs=features, name=self.mode)
    model.trainable = self.trainable
    return model

class Post_Process(keras.layers.Layer):
  """Merges the decoded outputs of all levels and applies combined NMS.

  Args:
    NUM_classes: number of classes; with a single class the confidence alone
      is used as the score.
    max_boxes: maximum detections kept per class and in total.
    IoU_thresh: IoU threshold used by the suppression.
    score_thresh: minimum score for a detection to be kept.
  """

  def __init__(self, NUM_classes, max_boxes=25, IoU_thresh=0.5, score_thresh=0.55):
    super().__init__()
    self.NUM_classes = NUM_classes
    self.max_boxes = max_boxes
    self.IoU_thresh = IoU_thresh
    self.score_thresh = score_thresh

  def NMS(self, outputs):
    """Run `tf.image.combined_non_max_suppression` over every level.

    Args:
      outputs: iterable of (bbox, confidence, class_probs) triples, one per level.

    Returns:
      (boxes, scores, classes, valid_detections) as returned by the TF op;
      boxes are passed to it in [y1, x1, y2, x2] order.
    """
    def flatten(t):
      # Collapse every axis between the batch axis and the last axis.
      dims = tf.shape(t)
      return tf.reshape(t, (dims[0], -1, dims[-1]))

    all_boxes = tf.concat([flatten(level[0]) for level in outputs], axis=1)
    confidence = tf.concat([flatten(level[1]) for level in outputs], axis=1)
    class_probs = tf.concat([flatten(level[2]) for level in outputs], axis=1)

    # Negative coordinates are clamped to 0; the extra axis is the per-class
    # box axis expected by the op (one shared box for all classes).
    all_boxes = tf.maximum(all_boxes, 0.)[..., tf.newaxis, :]

    scores = confidence if self.NUM_classes == 1 else confidence * class_probs

    return tf.image.combined_non_max_suppression(
        boxes=tf.cast(XYXY_to_YXYX(all_boxes), tf.float32),
        scores=tf.cast(scores, tf.float32),
        max_output_size_per_class=self.max_boxes,
        max_total_size=self.max_boxes,
        iou_threshold=self.IoU_thresh,
        score_threshold=self.score_thresh)

  def call(self, x):
    return self.NMS(x)

## Architecture

In [None]:
def YOLOv3_MOD(Backbone, Neck, Head, SHAPE_input, NUM_anchors, NUM_classes, training=True):
  """Assemble backbone, neck and heads into the detection model.

  Args:
    Backbone: zero-argument callable returning the backbone model.
    Neck: callable mapping the backbone features to three feature maps.
    Head: factory `(filters, NUM_anchors, NUM_classes, name)` returning a
      callable that is applied to one feature map.
    SHAPE_input: input shape passed to `Input`; its first two entries are the
      target size of the inference-time resize.
    NUM_anchors: anchors per grid cell.
    NUM_classes: number of classes.
    training: when True the model returns the three raw head outputs; when
      False the input is resized with padding and the outputs are decoded and
      passed through `Post_Process`.
  """
  inputs = Input(SHAPE_input, name='input')
  x = inputs
  if not training:
    x = tf.image.resize_with_pad(x, SHAPE_input[0], SHAPE_input[1])
  x = tf.keras.layers.Rescaling(scale=1./255.)(x)

  features = Backbone()(x)
  y_1, y_2, y_3 = Neck(features)
  y_3 = Head(240, NUM_anchors, NUM_classes, 'Head_13')(y_3)
  y_2 = Head(120, NUM_anchors, NUM_classes, 'Head_26')(y_2)
  y_1 = Head(60, NUM_anchors, NUM_classes, 'Head_52')(y_1)
  outputs = [y_3, y_2, y_1]

  if not training:
    # Decode every level with its own anchors/parameters, then run NMS.
    decoded = []
    for level in [0, 1, 2]:
      activation = Output_Activation(config['NUM_classes'],
                                     config['ANCHORS'][level],
                                     config['PARAMS_GS_alpha'][level],
                                     config['PARAMS_WH_power'][level],
                                     training=False)
      decoded.append(activation(outputs[level]))
    outputs = Post_Process(NUM_classes)(decoded)

  return Model(inputs, outputs, name='YOLOv3_MOD')

## Loss Function

In [None]:
class Loss():
  """Detection loss for one output level: CIoU box loss + confidence + class BCE.

  Args:
    Output_Activation: decoder class; instantiated here with training=True so
      confidence and class outputs stay as logits.
    NUM_classes: number of classes.
    anchors: anchors of this level, forwarded to the decoder.
    GS_alpha: forwarded to the decoder (also stored on the instance).
    WH_power: forwarded to the decoder.
    head_scale: multiplier applied to the total loss of this level.
    conf_scale: weight of the confidence loss on cells without an object.
    IoU_thresh: cells whose prediction reaches this IoU with the target are
      excluded from the no-object confidence loss.
    name: stored on the instance.
  """
  def __init__(self, Output_Activation, NUM_classes, anchors, GS_alpha, WH_power, head_scale=1.0, conf_scale=0.01, IoU_thresh=0.6, name=None):
    self.Output_Activation = Output_Activation(NUM_classes, anchors, GS_alpha, WH_power, training=True)
    self.NUM_classes = NUM_classes
    self.anchors = anchors
    self.GS_alpha = GS_alpha
    self.IoU_thresh = IoU_thresh
    self.head_scale = head_scale
    self.conf_scale = conf_scale
    self.name = name

  def get_v(self, A_H, A_W, B_H, B_W):
    """Aspect-ratio term v = 4 * ((atan(A_W/A_H) - atan(B_W/B_H)) / pi) ** 2.

    Gradients flow only to (B_H, B_W) and use the hand-written `grad` below
    instead of automatic differentiation.
    """
    @tf.custom_gradient
    def forward(height, width):
      arctan = tf.atan(tf.math.divide_no_nan(A_W, A_H)) - tf.atan(tf.math.divide_no_nan(width, height))
      v = 4 * ((arctan / math.pi) ** 2)
      def grad(dv):
        # Hand-written gradient; note it contains no division by
        # (width**2 + height**2).
        gdw = dv * 8 * arctan * height / (math.pi ** 2)
        gdh = -dv * 8 * arctan * width / (math.pi ** 2)
        # Returned in the argument order of forward(): (height, width).
        return [gdh, gdw]
      return v, grad
    return forward(B_H, B_W)
  
  def Bbox_IoU(self, A, B, loss=None):
    """IoU between boxes A and B ([x1, y1, x2, y2] on the last axis).

    Args:
      A, B: box tensors of matching shape.
      loss: None, 'GIoU', 'DIoU' or 'CIoU' selects the loss variant.

    Returns:
      (IoU, loss_value) where loss_value is 1 minus the selected IoU variant.
    """
    A_X1, A_Y1, A_X2, A_Y2 = tf.split(A, (1, 1, 1, 1), axis=-1)
    B_X1, B_Y1, B_X2, B_Y2 = tf.split(B, (1, 1, 1, 1), axis=-1)
    A_W, A_H = tf.maximum(0., A_X2 - A_X1), tf.maximum(0., A_Y2 - A_Y1)
    B_W, B_H = tf.maximum(0., B_X2 - B_X1), tf.maximum(0., B_Y2 - B_Y1)
    A_area, B_area = A_W * A_H, B_W * B_H
    inter_X1, inter_X2 = tf.maximum(A_X1, B_X1), tf.minimum(A_X2, B_X2)
    inter_Y1, inter_Y2 = tf.maximum(A_Y1, B_Y1), tf.minimum(A_Y2, B_Y2)
    inter_W, inter_H = tf.maximum(0., inter_X2 - inter_X1), tf.maximum(0., inter_Y2 - inter_Y1)
    inter_area =  inter_W * inter_H
    union_area = A_area + B_area - inter_area
    IoU = tf.math.divide_no_nan(inter_area, union_area)
    
    if loss in ['GIoU', 'DIoU', 'CIoU']:
      # Smallest box enclosing both A and B.
      C_X1, C_X2 = tf.minimum(A_X1, B_X1), tf.maximum(A_X2, B_X2)
      C_Y1, C_Y2 = tf.minimum(A_Y1, B_Y1), tf.maximum(A_Y2, B_Y2)
    
      if loss in ['DIoU', 'CIoU']:
        # Squared distance between the box centres over the squared diagonal
        # of the enclosing box.
        l2_2 = (((B_X1 + B_X2) - (A_X1 + A_X2)) ** 2) / 4 + (((B_Y1 + B_Y2) - (A_Y1 + A_Y2)) ** 2) / 4
        diag_2 = (C_Y2 - C_Y1) ** 2 + (C_X2 - C_X1) ** 2
        DIoU = IoU - tf.math.divide_no_nan(l2_2, diag_2)
        if loss == 'DIoU':
          return IoU, 1. - DIoU
        v = self.get_v(A_H, A_W, B_H, B_W)
        # alpha is treated as a constant weight: no gradient flows through it.
        alpha = tf.stop_gradient(tf.math.divide_no_nan(v, (1 - IoU) + v))
        CIoU = DIoU - alpha * v
        return IoU, 1. - CIoU

      if loss == 'GIoU':
        C_W = tf.maximum(0., C_X2 - C_X1)
        C_H = tf.maximum(0., C_Y2 - C_Y1)
        C_area = C_W * C_H
        GIoU = IoU - tf.math.divide_no_nan((C_area - union_area), C_area)
        return IoU, 1. - GIoU

    return IoU, 1. - IoU

  def logit(self, x):
    """Inverse of the sigmoid: log(x / (1 - x)). Not called elsewhere in this class."""
    return - tf.math.log(1 / x - 1)

  def __call__(self, y_true, y_pred):
    """Return the scalar loss of one level.

    Args:
      y_true: target grid with last axis [box (4), objectness (1), classes].
      y_pred: raw head output for the same level.
    """
    pred_box, pred_obj, pred_class = self.Output_Activation(y_pred)
    true_box, true_obj, true_class = tf.split(y_true, (4, 1, self.NUM_classes), axis=-1)
    
    # Each prediction is compared with the target stored in the same cell/anchor slot.
    IoU, Loss_IoU = self.Bbox_IoU(true_box, pred_box, loss='CIoU')

    max_IoU = tf.reduce_max(IoU, axis=-1)
    # 1 where the prediction overlaps the target by less than IoU_thresh.
    ignore_mask = tf.stop_gradient(tf.cast(max_IoU < self.IoU_thresh, tf.float32))
    obj_mask = tf.squeeze(true_obj, axis=-1)
    # Squared error between target and predicted objectness, used to weight
    # the confidence loss.
    conf_Loss_scale = tf.squeeze((true_obj - tf.sigmoid(pred_obj)) ** 2, axis=-1)
  
    # Box loss counts only for slots that contain an object.
    Loss_IoU = tf.squeeze(Loss_IoU, axis=-1) * obj_mask
    
    Loss_conf = BinaryCrossentropy(from_logits=True, reduction='none')(true_obj, pred_obj)
    # Object slots count fully; empty slots are down-weighted by conf_scale
    # and dropped where ignore_mask is 0.
    Loss_conf = conf_Loss_scale * (Loss_conf * obj_mask + self.conf_scale * (1 - obj_mask) * Loss_conf * ignore_mask)

    Loss_class = BinaryCrossentropy(from_logits=True,  reduction='none')(true_class, pred_class)
    Loss_class = Loss_class * obj_mask

    # Sum over grid and anchors, then average over the batch.
    Loss_IoU = tf.reduce_mean(tf.reduce_sum(Loss_IoU, axis=[1,2,3]))
    Loss_conf = tf.reduce_mean(tf.reduce_sum(Loss_conf, axis=[1,2,3]))
    Loss_class = tf.reduce_mean(tf.reduce_sum(Loss_class, axis=[1,2,3]))
    Loss_total = (Loss_IoU * 2 + Loss_conf + Loss_class * 2) * self.head_scale
    return Loss_total

## Summarize Model

In [None]:
# Build the training-mode model once (outside the distribution strategy) just
# to print its layer summary.
structure = [EfficientNetV2(mode=architecture['backbone'], trainable=True), PAN(Conv_SiLU), Decoupled_Head]
arch = YOLOv3_MOD(*structure, config['INPUT_shape'], config['ANCHORS_shape'][1], config['NUM_classes'], training=True)
arch.summary()

# Training

In [None]:
NUM_steps_per_epoch = (NUM_samples[0] // batch_size)

def Plot_learning_rate(lr, steps_per_epoch, epochs=100):
  """Plot a learning-rate schedule against the epoch number.

  Args:
    lr: callable mapping a tensor of step indices to learning rates.
    steps_per_epoch: optimizer steps per epoch; sets both the number of steps
      evaluated and the scaling of the x axis.
    epochs: number of epochs to plot.
  """
  # Derived from the argument, so the curve and the x axis always agree.
  steps = tf.range(steps_per_epoch * epochs)
  rate = lr(steps)
  plt.plot(steps / steps_per_epoch, rate)
  plt.show()

# Cosine decay with warm restarts: each cycle lasts 80 epochs (t_mul=1 keeps
# the cycle length constant) and restarts at 0.4x the previous peak (m_mul).
lr_CosineDecayRestarts = tf.keras.optimizers.schedules.CosineDecayRestarts(
    initial_learning_rate=0.001, m_mul=0.4, t_mul=1,
    first_decay_steps=NUM_steps_per_epoch*80)

# Step schedule: the rate drops at epochs 130, 180 and 230.
lr_PiecewiseConstantDecay = keras.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=[int(c*NUM_steps_per_epoch) for c in [130, 180, 230]], 
    values=[0.001, 0.0003, 0.0001, 0.00005])

# Preview the cosine schedule over 320 epochs.
Plot_learning_rate(lr_CosineDecayRestarts, NUM_steps_per_epoch, 320)

In [None]:
def fit(model, epochs, resume=False, freeze_backbone=True, warmup=False, load_last=False, load_checkpoint=False):
  """Compile `model` (unless resuming), train it, and save the final weights.

  Args:
    model: Keras model to train; one loss object is created for each of its
      three output levels.
    epochs: Passed to `model.fit` as `epochs`.
    resume: If True, skip weight loading and compilation and continue with the
      model's current optimizer state.
    freeze_backbone: If False, the whole model is marked trainable.
      NOTE(review): passing True does not freeze anything in this function.
    warmup: If True, use the step-wise warm-up schedule; otherwise use cosine
      decay with restarts.
    load_last: Load 'weights_last.h5' before compiling.
    load_checkpoint: Load 'weights_checkpoint.h5' before compiling (applied
      after `load_last`, so it wins when both are set).

  Returns:
    The value returned by `model.fit`.
  """
  if not resume:
    if not freeze_backbone: model.trainable = True
    if load_last: model.load_weights('weights_last.h5')
    if load_checkpoint: model.load_weights('weights_checkpoint.h5')

    # One loss per output level; plain Python ints index the config entries.
    loss = [
        Loss(
            Output_Activation,
            config['NUM_classes'],
            config['ANCHORS'][level],
            config['PARAMS_GS_alpha'][level],
            config['PARAMS_WH_power'][level],
            config['PARAMS_head_scale'][level],
            config['PARAMS_conf_scale'][level],
            name='Loss')
        for level in range(3)]

    if warmup:
      # Boundaries are expressed in epochs and converted to optimizer steps.
      # NOTE(review): the values step 0.00006 -> 0.0001 at epoch 250, i.e. the
      # rate goes back up; confirm this is intended.
      learning_rate = keras.optimizers.schedules.PiecewiseConstantDecay(
            boundaries=[int(c*NUM_steps_per_epoch) for c in [1, 2, 3, 4, 5, 6, 7, 8, 9, 150, 225, 250, 300]],
            values=[1e-9, 1e-8, 1e-7, 1e-6, 1e-5, 1e-5, 1e-4, 1e-4, 1e-3, 0.001, 0.0003, 0.00006, 0.0001, 0.00001])
    else:
      learning_rate = tf.keras.optimizers.schedules.CosineDecayRestarts(
          initial_learning_rate=0.001, m_mul=0.4, t_mul=1,
          first_decay_steps=NUM_steps_per_epoch*80)

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), jit_compile=True, steps_per_execution=1, loss=loss)

  callbacks = [
      # Keep the weights of the epoch with the lowest validation loss.
      ModelCheckpoint(filepath='weights_checkpoint.h5', save_weights_only=True,  monitor='val_loss', save_best_only=True),
      # append=True: rows from successive calls accumulate in the same file.
      CSVLogger('log.csv', separator=',', append=True)]

  hist = model.fit(
       Dataset_Train,
       validation_data=Dataset_Valid,
       epochs=epochs,
       callbacks=callbacks,
       # Same step count the learning-rate schedules above are built from.
       steps_per_epoch=NUM_steps_per_epoch,
       validation_steps=1)
  model.save_weights('weights_last.h5')
  return hist

# Per-epoch step count read by the learning-rate schedules inside fit().
NUM_steps_per_epoch = (NUM_samples[0] // batch_size)
# Create the model under the distribution strategy selected at the top of the notebook.
with strategy.scope():
  structure = [EfficientNetV2(mode=architecture['backbone'], trainable=True), PAN(Conv_SiLU), Decoupled_Head]
  model = YOLOv3_MOD(*structure, config['INPUT_shape'], config['ANCHORS_shape'][1], config['NUM_classes'], training=True)

# Two consecutive runs on the same model. Both pass resume=False, so each call
# re-compiles the model with a new Adam optimizer and its own schedule
# (warm-up schedule first, cosine restarts second). `history` keeps only the
# result of the second call.
history = fit(model, 150, load_last=False, freeze_backbone=False, resume=False, warmup=True)
history = fit(model, 320, load_last=False, freeze_backbone=False, resume=False, warmup=False)

# Evaluation

In [None]:
# Iterate over the whole test set; after the loop x_true / y_true hold only the LAST batch.
for full_batch in Dataset_Test: 
      x_true = full_batch[0]
      y_true = full_batch[1][0]
# NOTE(review): `model_test` is created in the "Results" section further down,
# so that cell has to be executed before this one.
y_pred = model_test.predict(x_true, verbose=0)
# Apply the inference-mode (training=False) output activation to each of the three output levels.
y_pred = [Output_Activation(config['NUM_classes'], 
                           config['ANCHORS'][level], 
                           config['PARAMS_GS_alpha'][level], 
                           config['PARAMS_WH_power'][level], 
                           training=False)(y_pred[level]) for level in [0, 1, 2]]
# Post-process the activated outputs; the result is unpacked by mAP.update as
# (pred_boxes, pred_scores, pred_classes, valid_detections).
y_pred = Post_Process(2, 30, 0.1, 0.5)(y_pred)

In [None]:
import numpy as np

def IoU(A, B):
  """Intersection-over-union of corner-encoded boxes.

  Args:
    A: Tensor whose last axis holds four coordinates (X1, Y1, X2, Y2).
    B: Tensor with the same last-axis layout; must broadcast against `A`.

  Returns:
    Tensor with a last axis of size 1 holding the IoU; 0 where the union
    area is 0.
  """
  def _corners(boxes):
    # Split the last axis into four size-1 coordinate tensors.
    return tf.split(boxes, (1, 1, 1, 1), axis=-1)

  def _area(x1, y1, x2, y2):
    # Negative extents (inverted or non-overlapping boxes) count as zero.
    return tf.maximum(0., x2 - x1) * tf.maximum(0., y2 - y1)

  a_x1, a_y1, a_x2, a_y2 = _corners(A)
  b_x1, b_y1, b_x2, b_y2 = _corners(B)

  overlap = _area(
      tf.maximum(a_x1, b_x1), tf.maximum(a_y1, b_y1),
      tf.minimum(a_x2, b_x2), tf.minimum(a_y2, b_y2))
  union = _area(a_x1, a_y1, a_x2, a_y2) + _area(b_x1, b_y1, b_x2, b_y2) - overlap
  return tf.math.divide_no_nan(overlap, union)

class GC(keras.callbacks.Callback):
  """Keras callback that runs Python's garbage collector after every epoch."""

  def on_epoch_end(self, epoch, logs=None):
    """Trigger a full collection; `epoch` and `logs` are not used."""
    gc.collect()

class mAP():
  """Average-precision evaluator for post-processed detections.

  `update` accumulates one record per detection, `result` converts the
  records into per-class AP (or their mean), and calling the instance runs
  the evaluation at several IoU thresholds.
  """

  def __init__(self, Dataset, NUM_classes, per_class=False):
    """Store the configuration and start with empty accumulators.

    Args:
      Dataset: Stored on `self.dataset`; no method of this class reads it.
      NUM_classes: Number of classes to score.
      per_class: If True, `result` returns only the per-class AP vector.
    """
    self.dataset = Dataset
    self.stats = None              # rows of [matched flag, class, score], one per detection
    self.stats_true_boxes = None   # ground-truth box count per class
    self.stats_discard = 0.        # classes with neither detections nor ground truth
    self.NUM_classes = NUM_classes
    self.per_class = per_class
    self.metric = []               # history of the values returned by __call__

  @tf.function
  def compute_stats(self, pred_boxes, pred_scores, pred_classes, valid_detections, targets, thresh_IoU):
    """Flag detections that match a ground-truth box and count ground truth per class.

    Args:
      pred_boxes, pred_scores, pred_classes, valid_detections: Post-processed
        predictions (presumably padded per image, with `valid_detections`
        giving the number of real detections per image -- TODO confirm).
      targets: Ground-truth tensor. On its last axis, elements 0-3 are used as
        the box, element 4 as the "box present" flag and elements 5+ as
        per-class flags.
      thresh_IoU: A ground-truth box counts as found when its best-overlapping
        detection has IoU above this value.

    Returns:
      (stats, stats_true_boxes): `stats` has one row
      [matched flag, class, score] per detection; `stats_true_boxes` holds the
      number of ground-truth boxes of each class.
    """
    pred_boxes = XYXY_to_YXYX(pred_boxes)
    idx_targets = tf.where(targets[..., 4])
    # `count`: number of ground-truth boxes for each image that has at least one.
    _, _, count = tf.unique_with_counts(idx_targets[:, 0])
    valid_detections = valid_detections[:tf.shape(count)[0]]
    targets = tf.gather_nd(targets, idx_targets)
    NUM_pred = tf.reduce_sum(valid_detections)
    # Keep detections whose 4th box coordinate is non-zero (presumably this
    # drops zero padding -- TODO confirm).
    idx_pred = tf.where(pred_boxes[..., 3])
    true_boxes = targets[..., :4]
    true_class = tf.cast(tf.where(targets[..., 5:]), tf.int32)[:,-1]
    pred_boxes = tf.gather_nd(pred_boxes, idx_pred)
    pred_classes = tf.gather_nd(pred_classes, idx_pred)
    pred_scores = tf.gather_nd(pred_scores, idx_pred)
    stats = tf.TensorArray(tf.float32, size=NUM_pred, clear_after_read=True)
    stats_true_boxes = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=True)
    pos_true = pos_pred = count_true = 0
    # Every detection starts as unmatched.
    for i in tf.range(NUM_pred):
      stats = stats.write(i, [0., pred_classes[i], pred_scores[i]])
    for i in tf.range(self.NUM_classes):
      count_class = tf.reduce_sum(tf.cast(true_class == i, tf.float32))
      stats_true_boxes = stats_true_boxes.write(i, tf.cast(count_class, tf.float32))
    for i in tf.range(tf.shape(valid_detections)[0]):
      if valid_detections[i] > 0:
        true_box_i = true_boxes[count_true:(count_true + count[pos_true])]
        pred_box_i = tf.tile(pred_boxes[pos_pred:(pos_pred + valid_detections[i]), tf.newaxis], [1, count[pos_true], 1])
        IoU_i = IoU(true_box_i, pred_box_i)
        IoU_thresh = tf.reduce_any(IoU_i > thresh_IoU, axis=0)
        IoU_max = tf.argmax(IoU_i, axis=0, output_type=tf.int32)
        # For each ground-truth box, mark its best-overlapping detection as matched.
        # NOTE(review): the detection's class is not compared with the ground-truth class.
        for j in tf.range(count[pos_true]):
          idx = pos_pred + tf.squeeze(IoU_max[j])
          if IoU_thresh[j]:
            stats = stats.write(idx, [1., pred_classes[idx], pred_scores[idx]])
      count_true += count[pos_true]
      pos_true += 1
      pos_pred += valid_detections[i]
    return stats.stack(), stats_true_boxes.stack()

  #@tf.function
  def compute_AP(self, P, R):
    """Area under the interpolated precision-recall curve.

    Args:
      P: Precision after each detection, detections sorted by descending score.
      R: Recall after each detection (same order and length as `P`).

    Returns:
      Scalar AP clipped to [0, 1].
    """
    # Precision envelope: running maximum taken from the right.
    P_interp = tf.scan(lambda a, b: tf.maximum(a, b), P, reverse=True)
    # Index of the first detection of every run of equal recall values.
    _, _, idx = tf.unique_with_counts(R)
    idx = tf.cast(tf.scan(lambda a, b: a + b, idx) - idx, tf.int32)
    P_curve = tf.gather(P_interp, idx)
    R_curve = tf.gather(R, idx)
    # Recall increment between consecutive curve points (first point measured
    # from 0). A shifted difference is used because tf.scan hands the lambda
    # its previous OUTPUT, not the previous recall value, which would distort
    # every increment after the second one.
    R_prev = tf.concat([tf.zeros([1], R_curve.dtype), R_curve[:-1]], axis=0)
    dR = tf.maximum(R_curve - R_prev, 0.)
    AUC = dR * P_curve
    AP = tf.clip_by_value(tf.reduce_sum(AUC), 0., 1.)
    return AP

  def result(self):
    """Turn the accumulated records into AP values.

    Returns:
      The per-class AP vector when `per_class` is True; otherwise a tuple
      (mAP, array_precision, array_recall) where the two TensorArrays hold the
      precision / recall curve of every class.
    """
    idx_sort = tf.argsort(self.stats[..., -1], direction='DESCENDING')
    x = tf.gather(self.stats, idx_sort, axis=0)
    array = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=True)
    # Curves differ in length per class, so shape inference is switched off.
    array_precision = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=False, infer_shape=False)
    array_recall = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=False, infer_shape=False)
    for idx_class in tf.range(self.NUM_classes):
      mask_class = tf.cast(x[:, 1], tf.int32) == idx_class
      x_class = tf.boolean_mask(x, mask_class, axis=0)
      if tf.shape(x_class)[0] == 0:
        array = array.write(idx_class, 0.)
        # Write empty curves so callers can still read this class's slot.
        array_precision = array_precision.write(idx_class, tf.zeros([0], tf.float32))
        array_recall = array_recall.write(idx_class, tf.zeros([0], tf.float32))
        # A class with no detections and no ground truth is left out of the mean.
        if self.stats_true_boxes[idx_class] == 0: self.stats_discard += 1.
      else:
        cum_TP = tf.cumsum(x_class[:, 0])
        cum_range = tf.cast(tf.range(1, tf.shape(x_class[:, 0])[0] + 1), tf.float32)
        recall = tf.math.divide_no_nan(cum_TP, self.stats_true_boxes[idx_class])
        precision = tf.math.divide_no_nan(cum_TP, cum_range)
        array_precision = array_precision.write(idx_class, precision)
        array_recall = array_recall.write(idx_class, recall)
        #tf.print(2 * precision*recall/(precision + recall))
        array = array.write(idx_class, self.compute_AP(precision, recall))
    AP = array.stack()
    if self.per_class:
      return AP
    else:
      mAP = tf.math.divide_no_nan(tf.reduce_sum(AP), (self.NUM_classes - self.stats_discard))
      self.stats_discard = 0.
      return mAP, array_precision, array_recall

  def update(self, y_true, y_pred, thresh_IoU):
    """Add the records of one batch to the accumulators.

    Args:
      y_true: Ground-truth tensor passed to `compute_stats` as `targets`.
      y_pred: Sequence unpacked as (pred_boxes, pred_scores, pred_classes,
        valid_detections).
      thresh_IoU: IoU threshold forwarded to `compute_stats`.
    """
    stats, stats_true_boxes = self.compute_stats(*y_pred, y_true, thresh_IoU)
    if self.stats is None:
      self.stats = stats
      self.stats_true_boxes = stats_true_boxes
    else:
      self.stats = (tf.concat([self.stats, stats], axis=0))
      self.stats_true_boxes += stats_true_boxes

  def reset(self):
    """Clear every accumulator so the next `update` starts from scratch."""
    self.stats = None
    self.stats_true_boxes = None
    self.stats_discard = 0.

  def __call__(self, model):
    """Evaluate at IoU thresholds 0.5, 0.6 and 0.7 and return the mean mAP.

    NOTE(review): `model` is not used; the method reads the notebook globals
    `y_true` and `y_pred`.

    Returns:
      (metric, r): the mean mAP over the thresholds and the
      (precision, recall) TensorArrays of the LAST threshold.
    """
    gc.collect()
    metric = []

    for thresh_IoU in [0.5, 0.6, 0.7]: #0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95
        self.reset()
        self.update(y_true, y_pred, thresh_IoU)
        result = self.result()
        metric.append(result[0])
        r = result[1:]
        # Trapezoidal area under the raw precision-recall curve of each class, printed for reference.
        print(thresh_IoU)
        print(np.trapz(r[0].read(0), r[1].read(0)))
        print(np.trapz(r[0].read(1), r[1].read(1)), '\n')
    metric = tf.reduce_mean(metric)
    self.metric.append(metric)
    return metric, r

# Run the evaluator; `result` receives the (precision, recall) TensorArrays returned by mAP.__call__.
# NOTE(review): the argument is ignored by mAP.__call__, which reads the globals y_true / y_pred instead.
MAP = mAP(Dataset_Test, NUM_classes=2, per_class=False)
metric, result = MAP(model_test)

In [None]:
# Precision-recall curves: result[1] holds recall (x-axis), result[0] precision (y-axis).
plt.plot(result[1].read(0), result[0].read(0), label='W')
plt.plot(result[1].read(1), result[0].read(1), label='C')
plt.ylabel("Precision")
plt.xlabel("Recall")
plt.ylim(0.1, 1)
plt.legend(loc="upper right")
plt.savefig('PR.svg')

In [None]:
import pandas as pd

# Training log written by the CSVLogger callback in fit(). The logger appends,
# so the rows of several runs may follow each other in this file.
df = pd.read_csv("log.csv")

fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(df['val_loss'])
ax.plot(df['loss'])
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
ax.legend(['validation', 'train'], loc='upper right')
ax.set_ylim(2., 50)
fig.savefig('Loss.svg')

In [None]:
# Preview the first rows of the training log.
df.head()

In [None]:
import numpy as np

def IoU(A, B):
  """Intersection-over-union of boxes given as (X1, Y1, X2, Y2) on the last axis.

  This re-defines the helper of the same name from the evaluation cell above
  with the same logic.

  Args:
    A: Box tensor; the last axis has four coordinates.
    B: Box tensor broadcastable against `A`.

  Returns:
    IoU tensor with a last axis of size 1 (0 where the union area is 0).
  """
  ax1, ay1, ax2, ay2 = tf.split(A, (1, 1, 1, 1), axis=-1)
  bx1, by1, bx2, by2 = tf.split(B, (1, 1, 1, 1), axis=-1)

  # Box areas; negative widths/heights are clamped to zero.
  area_a = tf.maximum(0., ax2 - ax1) * tf.maximum(0., ay2 - ay1)
  area_b = tf.maximum(0., bx2 - bx1) * tf.maximum(0., by2 - by1)

  # Overlap rectangle, clamped the same way when the boxes do not intersect.
  overlap_w = tf.maximum(0., tf.minimum(ax2, bx2) - tf.maximum(ax1, bx1))
  overlap_h = tf.maximum(0., tf.minimum(ay2, by2) - tf.maximum(ay1, by1))
  overlap = overlap_w * overlap_h

  return tf.math.divide_no_nan(overlap, area_a + area_b - overlap)

class mAP():
  """Evaluator that reports mAP plus final precision/recall per confidence threshold.

  This re-defines the `mAP` class from the evaluation cell above. Here `result`
  stores only the last precision / recall value of each class, and calling the
  instance sweeps the pre-computed predictions in the global `y_pred_conf`.
  """

  def __init__(self, Dataset, NUM_classes, per_class=False):
    """Store the configuration and start with empty accumulators.

    Args:
      Dataset: Stored on `self.dataset`; no method of this class reads it.
      NUM_classes: Number of classes to score.
      per_class: If True, `result` returns only the per-class AP vector.
    """
    self.dataset = Dataset
    self.stats = None              # rows of [matched flag, class, score], one per detection
    self.stats_true_boxes = None   # ground-truth box count per class
    self.stats_discard = 0.        # classes with neither detections nor ground truth
    self.NUM_classes = NUM_classes
    self.per_class = per_class
    self.metric = []               # history of the values returned by __call__

  @tf.function
  def compute_stats(self, pred_boxes, pred_scores, pred_classes, valid_detections, targets, thresh_IoU):
    """Flag detections that match a ground-truth box and count ground truth per class.

    Args:
      pred_boxes, pred_scores, pred_classes, valid_detections: Post-processed
        predictions (presumably padded per image, with `valid_detections`
        giving the number of real detections per image -- TODO confirm).
      targets: Ground-truth tensor. On its last axis, elements 0-3 are used as
        the box, element 4 as the "box present" flag and elements 5+ as
        per-class flags.
      thresh_IoU: A ground-truth box counts as found when its best-overlapping
        detection has IoU above this value.

    Returns:
      (stats, stats_true_boxes): `stats` has one row
      [matched flag, class, score] per detection; `stats_true_boxes` holds the
      number of ground-truth boxes of each class.
    """
    pred_boxes = XYXY_to_YXYX(pred_boxes)
    idx_targets = tf.where(targets[..., 4])
    # `count`: number of ground-truth boxes for each image that has at least one.
    _, _, count = tf.unique_with_counts(idx_targets[:, 0])
    valid_detections = valid_detections[:tf.shape(count)[0]]
    targets = tf.gather_nd(targets, idx_targets)
    NUM_pred = tf.reduce_sum(valid_detections)
    # Keep detections whose 4th box coordinate is non-zero (presumably this
    # drops zero padding -- TODO confirm).
    idx_pred = tf.where(pred_boxes[..., 3])
    true_boxes = targets[..., :4]
    true_class = tf.cast(tf.where(targets[..., 5:]), tf.int32)[:,-1]
    pred_boxes = tf.gather_nd(pred_boxes, idx_pred)
    pred_classes = tf.gather_nd(pred_classes, idx_pred)
    pred_scores = tf.gather_nd(pred_scores, idx_pred)
    stats = tf.TensorArray(tf.float32, size=NUM_pred, clear_after_read=True)
    stats_true_boxes = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=True)
    pos_true = pos_pred = count_true = 0
    # Every detection starts as unmatched.
    for i in tf.range(NUM_pred):
      stats = stats.write(i, [0., pred_classes[i], pred_scores[i]])
    for i in tf.range(self.NUM_classes):
      count_class = tf.reduce_sum(tf.cast(true_class == i, tf.float32))
      stats_true_boxes = stats_true_boxes.write(i, tf.cast(count_class, tf.float32))

    for i in tf.range(tf.shape(valid_detections)[0]):
      if valid_detections[i] > 0:
        true_box_i = true_boxes[count_true:(count_true + count[pos_true])]
        pred_box_i = tf.tile(pred_boxes[pos_pred:(pos_pred + valid_detections[i]), tf.newaxis], [1, count[pos_true], 1])
        IoU_i = IoU(true_box_i, pred_box_i)
        IoU_thresh = tf.reduce_any(IoU_i > thresh_IoU, axis=0)
        IoU_max = tf.argmax(IoU_i, axis=0, output_type=tf.int32)
        # For each ground-truth box, mark its best-overlapping detection as matched.
        # NOTE(review): the detection's class is not compared with the ground-truth class.
        for j in tf.range(count[pos_true]):
          idx = pos_pred + tf.squeeze(IoU_max[j])
          if IoU_thresh[j]:
            stats = stats.write(idx, [1., pred_classes[idx], pred_scores[idx]])
      count_true += count[pos_true]
      pos_true += 1
      pos_pred += valid_detections[i]
    return stats.stack(), stats_true_boxes.stack()


  def compute_AP(self, P, R):
    """Area under the interpolated precision-recall curve.

    Args:
      P: Precision after each detection, detections sorted by descending score.
      R: Recall after each detection (same order and length as `P`).

    Returns:
      Scalar AP clipped to [0, 1].
    """
    # Precision envelope: running maximum taken from the right.
    P_interp = tf.scan(lambda a, b: tf.maximum(a, b), P, reverse=True)
    # Index of the first detection of every run of equal recall values.
    _, _, idx = tf.unique_with_counts(R)
    idx = tf.cast(tf.scan(lambda a, b: a + b, idx) - idx, tf.int32)
    P_curve = tf.gather(P_interp, idx)
    R_curve = tf.gather(R, idx)
    # Recall increment between consecutive curve points (first point measured
    # from 0). A shifted difference is used because tf.scan hands the lambda
    # its previous OUTPUT, not the previous recall value, which would distort
    # every increment after the second one.
    R_prev = tf.concat([tf.zeros([1], R_curve.dtype), R_curve[:-1]], axis=0)
    dR = tf.maximum(R_curve - R_prev, 0.)
    AUC = dR * P_curve
    AP = tf.clip_by_value(tf.reduce_sum(AUC), 0., 1.)
    return AP

  def result(self):
    """Turn the accumulated records into AP values.

    Returns:
      The per-class AP vector when `per_class` is True; otherwise a tuple
      (mAP, array_precision, array_recall) where the two TensorArrays hold,
      per class, the precision / recall reached after the last detection.
    """
    idx_sort = tf.argsort(self.stats[..., -1], direction='DESCENDING')
    x = tf.gather(self.stats, idx_sort, axis=0)
    array = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=True)
    array_precision = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=False, infer_shape=False)
    array_recall = tf.TensorArray(tf.float32, size=self.NUM_classes, clear_after_read=False, infer_shape=False)
    for idx_class in tf.range(self.NUM_classes):
      mask_class = tf.cast(x[:, 1], tf.int32) == idx_class
      x_class = tf.boolean_mask(x, mask_class, axis=0)
      if tf.shape(x_class)[0] == 0:
        array = array.write(idx_class, 0.)
        # Write zeros so callers can still read this class's slot.
        array_precision = array_precision.write(idx_class, 0.)
        array_recall = array_recall.write(idx_class, 0.)
        # A class with no detections and no ground truth is left out of the mean.
        if self.stats_true_boxes[idx_class] == 0: self.stats_discard += 1.
      else:
        cum_TP = tf.cumsum(x_class[:, 0])
        cum_range = tf.cast(tf.range(1, tf.shape(x_class[:, 0])[0] + 1), tf.float32)
        recall = tf.math.divide_no_nan(cum_TP, self.stats_true_boxes[idx_class])
        precision = tf.math.divide_no_nan(cum_TP, cum_range)
        # Only the value after the last detection is kept.
        array_precision = array_precision.write(idx_class, precision[-1])
        array_recall = array_recall.write(idx_class, recall[-1])
        array = array.write(idx_class, self.compute_AP(precision, recall))
    AP = array.stack()
    if self.per_class:
      return AP
    else:
      mAP = tf.math.divide_no_nan(tf.reduce_sum(AP), (self.NUM_classes - self.stats_discard))
      self.stats_discard = 0.
      return mAP, array_precision, array_recall

  def update(self, y_true, y_pred, thresh_IoU):
    """Add the records of one batch to the accumulators.

    Args:
      y_true: Ground-truth tensor passed to `compute_stats` as `targets`.
      y_pred: Sequence unpacked as (pred_boxes, pred_scores, pred_classes,
        valid_detections).
      thresh_IoU: IoU threshold forwarded to `compute_stats`.
    """
    stats, stats_true_boxes = self.compute_stats(*y_pred, y_true, thresh_IoU)
    if self.stats is None:
      self.stats = stats
      self.stats_true_boxes = stats_true_boxes
    else:
      self.stats = (tf.concat([self.stats, stats], axis=0))
      self.stats_true_boxes += stats_true_boxes

  def reset(self):
    """Clear every accumulator so the next `update` starts from scratch."""
    self.stats = None
    self.stats_true_boxes = None
    self.stats_discard = 0.

  def __call__(self, model):
    """Evaluate every entry of the global `y_pred_conf` at IoU threshold 0.5.

    NOTE(review): `model` is not used; the method reads the notebook globals
    `y_true` and `y_pred_conf`, and assumes `y_pred_conf` was built with the
    same threshold range as `conf` below.

    Returns:
      (metric, PR): the mean mAP over all entries and a tensor with one row
      [threshold, P class 0, R class 0, P class 1, R class 1] per entry.
    """
    gc.collect()
    metric = []
    PR = []
    conf = tf.range(0.1, 0.9, 0.025)

    for i, y_pred in enumerate(y_pred_conf):
        self.reset()
        self.update(y_true, y_pred, 0.5)
        result = self.result()
        metric.append(result[0])
        P, R = result[1:]
        PR.append([conf[i], P.read(0), R.read(0), P.read(1), R.read(1)])

    metric = tf.reduce_mean(metric)
    self.metric.append(metric)
    return metric, tf.stack(PR)

# Run the confidence sweep; PR has one row per threshold (see mAP.__call__ above).
# NOTE(review): mAP.__call__ reads the global `y_pred_conf`, which is built in a
# cell further down; that cell has to be executed before this one.
MAP = mAP(Dataset_Test, NUM_classes=2, per_class=False)
metric, PR = MAP(model_test)

In [None]:
# PR[:, 0] is the confidence threshold; columns (1, 2) hold precision / recall
# of class 0 and columns (3, 4) those of class 1. One figure per class, titled
# with the class label so the two plots can be told apart.
for _label, (_col_P, _col_R) in zip(config['LABELS'], [(1, 2), (3, 4)]):
  fig, ax = plt.subplots(figsize=(8, 5))
  ax.plot(PR[:, 0], PR[:, _col_P])
  ax.plot(PR[:, 0], PR[:, _col_R])
  ax.legend(['precision', 'recall'], loc='lower right')
  ax.set_xlabel('Confidence Threshold')
  ax.set_title(_label)
  plt.show()

In [None]:
# Iterate over the whole test set; after the loop x_true / y_true hold only the LAST batch.
for batch in Dataset_Test: 
      x_true = batch[0]
      y_true = batch[1][0]

# One post-processed prediction set per threshold value.
y_pred_conf = []
y_pred = model_test.predict(x_true, verbose=0)
# Apply the inference-mode (training=False) output activation to each of the three output levels.
y_pred = [Output_Activation(config['NUM_classes'], 
                           config['ANCHORS'][level], 
                           config['PARAMS_GS_alpha'][level], 
                           config['PARAMS_WH_power'][level], 
                           training=False)(y_pred[level]) for level in [0, 1, 2]]

# Sweep the last Post_Process argument (presumably the confidence threshold --
# TODO confirm) over the same range mAP.__call__ uses for its `conf` axis.
# NOTE(review): the mAP cell above reads `y_pred_conf`, so this cell must run before it.
for conf in tf.range(0.1, 0.9, 0.025):
  y_pred_conf.append(Post_Process(2, 30, 0.5, conf)(y_pred))

### Save Weights

In [None]:
# Persist the current weights of the training model (same file name fit() writes at the end of a run).
filepath_weights = "weights_last.h5"
model.save_weights(filepath_weights)

# Results

In [None]:
#!gdown 1_2xPFYOsfLegbRoVocyKgb7hjuzKJh5W
# Rebuild the detector under the distribution strategy and load the
# best-validation-loss checkpoint written by fit().
with strategy.scope():
  model_test = YOLOv3_MOD(
      EfficientNetV2(mode='EfficientNetV2B1', trainable=True),
      PAN(Conv_SiLU),
      Decoupled_Head,
      config['INPUT_shape'],
      config['ANCHORS_shape'][1],
      config['NUM_classes'],
      training=True)
  model_test.load_weights("weights_checkpoint.h5") #weights_last

In [None]:
gc.collect()
# NOTE(review): in the visible part of the notebook `image` and Plot_Bbox_NMS
# are only defined in cells further down; those cells have to run first.
# Take image 19 of the batch, mirror it horizontally and shift it, filling the exposed area with 0.
image_ = tf.image.flip_left_right(image[19])
image_ = tfa.image.translate_xy(image_, [-120., 129.], replace=0.)
y = model_test.predict(image_[tf.newaxis, ...])
# Apply the inference-mode (training=False) output activation to each of the three output levels.
y = [Output_Activation(config['NUM_classes'], 
                           config['ANCHORS'][level], 
                           config['PARAMS_GS_alpha'][level], 
                           config['PARAMS_WH_power'][level], 
                           training=False)(y[level]) for level in [0, 1, 2]]

# Post-process, then draw the first (only) image's boxes, scores and labels.
y = Post_Process(2, 100, 0.45, 0.5)(y)
Plot_Bbox_NMS(image_, XYXY_to_YXYX(y[0][0]), y[1][0], y[2][0])

In [None]:
def Plot_Bbox_NMS(image, boxes, scores, labels, name=None):
  """Draw labelled box outlines for every detection with a non-zero score.

  Args:
    image: Image to show (singleton dimensions are squeezed away).
    boxes: Box tensor; the first four values of each box are scaled by
      `config['INPUT_shape'][0]` before drawing (presumably normalised
      coordinates -- TODO confirm).
    scores: Detection scores; entries equal to 0 are dropped.
    labels: Class ids used to pick the colour and the text label.
    name: If given, the figure is saved to this path and closed instead of
      being shown.
  """
  fig, ax = plt.subplots(figsize=(12, 8))
  ax.imshow(tf.squeeze(image))

  # Keep only the entries whose score is non-zero.
  keep = tf.where(scores)
  boxes = tf.gather_nd(boxes, keep)
  scores = tf.gather_nd(scores, keep)
  labels = tf.gather_nd(labels, keep)

  scale = config['INPUT_shape'][0]
  for box, label in zip(boxes, labels):
    class_id = tf.cast(label, tf.int32)
    scaled = box[..., :4] * scale
    size = tf.squeeze(tf.cast(XYXY_to_XYWH(scaled), tf.int32))
    corner = tf.cast(scaled, tf.int32)
    outline = patches.Rectangle(
        (corner[0], corner[1]), size[2], size[3],
        linewidth=2.5, edgecolor=colors[class_id], facecolor='none')
    ax.add_patch(outline)
    # Class name slightly above the box corner.
    ax.text(corner[0], corner[1] - 8, config['LABELS'][class_id], fontsize=12, color=colors[tf.squeeze(class_id)])

  plt.axis('off')
  if not name:
    plt.show()
    return
  plt.savefig(name)
  plt.close(fig)
    
def draw_predictions(image, boxes, scores, labels, name=None):
  """Burn class-coloured rectangle outlines into an image tensor.

  Args:
    image: Image tensor (squeezed first; it is multiplied with a uint8 mask,
      so it is presumably uint8 -- TODO confirm).
    boxes: Box tensor; converted with XYXY_to_YXYX and scaled by
      `config['INPUT_shape'][0]` to integer pixel coordinates.
    scores: Detection scores; entries equal to 0 are dropped.
    labels: Class ids selecting the outline colour.
    name: Not used.

  Returns:
    The image tensor with the outlines drawn in.
  """
  image = tf.squeeze(image)
  palette = [[255, 165, 0], [106, 90, 205]]

  # Keep only the entries whose score is non-zero.
  keep = tf.where(scores)
  kept_boxes = tf.gather_nd(boxes, keep)
  boxes = tf.cast(XYXY_to_YXYX(kept_boxes) * config['INPUT_shape'][0], tf.int32)
  scores = tf.gather_nd(scores, keep)
  labels = tf.gather_nd(labels, keep)
  blank = tf.zeros_like(image)
  class_ids = tf.cast(labels, tf.int32)

  for class_idx in tf.range(config['NUM_classes']):
    class_boxes = tf.gather_nd(boxes, tf.where(class_ids == class_idx)).numpy()
    for k in tf.range(class_boxes.shape[0]):
      start, end = class_boxes[k, :2], class_boxes[k, 2:]
      # Draw the outline on a blank canvas, then paste it over the image:
      # pixels under the outline are zeroed and replaced by the outline colour.
      outline = tf.cast(cv.rectangle(blank.numpy(), start, end , palette[class_idx], 2), tf.uint8)
      image = tf.maximum(image * tf.cast((outline == 0), tf.uint8), outline)
  return image
    

In [None]:
# Stop after the second batch (i == 1): image / true_ / true end up holding that batch's values.
for i, j in enumerate(Dataset_Test):
  image = j[0]
  true_ = j[1][0]
  true = j[1][0][0]
  if i == 1: break
# Mirror and shift the first image of the batch, filling the exposed area with 0.
image_ = tf.image.flip_left_right(image[0])
image_ = tfa.image.translate_xy(image_, [-120., 129.], replace=0.)
pred = model_test.predict(image_[tf.newaxis, ...])
# NOTE(review): here the raw model output is plotted directly, whereas the
# single-image cell above passes it through Output_Activation and Post_Process
# first -- confirm which form Plot_Bbox_NMS expects.
Plot_Bbox_NMS(image_, pred[0][0], pred[1][0], pred[2][0])

# Prediction for the unmodified image; the plotting calls below are disabled.
pred = model_test.predict(image[0][tf.newaxis, ...])
#Plot_Bbox_NMS(image[0][tf.newaxis, ...], pred[0][0], pred[1][0], pred[2][0])
#Plot_Bbox(image[0], true)