In [36]:
import tensorflow as tf
import numpy as np 
import pandas as pd
import os
import matplotlib.pyplot as plt
from PIL import Image
os.environ["CUDA_VISIBLE_DEVICES"] = '0'


Parameters

In [58]:
# --- Model -------------------------------------------------------------------
MODEL_NAME = 'efficientdet_d0'
NUM_CLASSES = 80

# --- Optimisation ------------------------------------------------------------
EPOCHS, BATCH_SIZE = 300, 4
INITIAL_LR = 0.01
# NOTE(review): presumably steps-per-epoch * number of epochs -- confirm.
DECAY_STEPS = 433 * 155

# --- Input geometry (pixels) -------------------------------------------------
width, height = 640, 512

# --- Data locations ----------------------------------------------------------
image_path = '../../data/image/train/'
# The annotation file has no header row, so columns are addressed by position.
train_data = pd.read_csv('../../data/txt/train.csv', header=None)

Data loading

In [49]:
class Anchors():
    """Anchor boxes generator.

    Anchors are produced in the ``[x, y, width, height]`` layout (box centre
    followed by box size), which is the layout `to_corners` and `compute_iou`
    expect.
    """

    def __init__(self,
                 aspect_ratios=(0.5, 1, 2),
                 scales=(0, 1/3, 2/3)):
        """Initialize anchors generator.

        Args:
            aspect_ratios: a sequence of floats representing aspect
                ratios (width / height) of anchor boxes on each feature level.
            scales: a sequence of floats; each anchor is scaled by
                ``2 ** scale`` on each feature level.
        """
        # Copy into fresh lists so neither the (immutable) defaults nor a
        # caller-owned sequence is shared with this instance.
        aspect_ratios = list(aspect_ratios)
        scales = list(scales)

        self._aspect_ratios = aspect_ratios
        self._scales = [2**i for i in scales]
        self._num_anchors = len(aspect_ratios) * len(scales)

        # One stride / base area per pyramid level 3..7.
        self._strides = [2**i for i in range(3, 8)]
        self._areas = [i**2 for i in [32.0, 64.0, 128.0, 256.0, 512.0]]
        self._anchor_dims = self._compute_dims()

    def _compute_dims(self):
        """Compute width and height for each anchor box on each level.

        Returns:
            A float tensor with shape (5, num_anchors, 2) where each
                pair is the (width, height) of an anchor box.
        """
        all_dims = []
        for area in self._areas:
            level_dims = []
            for aspect_ratio in self._aspect_ratios:
                # width * height == area and width / height == aspect_ratio.
                # The pair is stored width-first because it is concatenated
                # after the (x, y) centre and consumed as [x, y, width, height].
                width = tf.math.sqrt(area * aspect_ratio)
                height = area / width
                dims = tf.cast([width, height], tf.float32)
                for scale in self._scales:
                    level_dims.append(dims * scale)
            all_dims.append(tf.stack(level_dims, axis=0))
        return tf.stack(all_dims, axis=0)

    @tf.function
    def _get_anchors(self, feature_height, feature_width, level):
        """Get anchors for a feature map of the given size on the given level.

        Args:
            feature_height: number of rows of the feature map at this level.
            feature_width: number of columns of the feature map at this level.
            level: an integer from range [3, 7] representing level
                of feature map.

        Returns:
            A float tensor with shape
            (feature_height * feature_width * num_anchors, 4) holding
            [x, y, width, height] for every anchor, ordered row by row over
            the feature map with all anchors of one cell kept together.
        """
        # Cell centres in feature-map coordinates (hence the +0.5).
        rx = tf.range(feature_width, dtype=tf.float32) + .5
        ry = tf.range(feature_height, dtype=tf.float32) + .5
        xs = tf.tile(tf.reshape(rx, [1, -1]), [tf.shape(ry)[0], 1])
        ys = tf.tile(tf.reshape(ry, [-1, 1]), [1, tf.shape(rx)[0]])

        # Scale centres to image coordinates and repeat them once per anchor.
        centers = tf.stack([xs, ys], axis=-1) * self._strides[level - 3]
        centers = tf.reshape(centers, [-1, 1, 2])
        centers = tf.tile(centers, [1, self._num_anchors, 1])
        centers = tf.reshape(centers, [-1, 2])

        # The feature sizes arrive as floats (result of ceil); tile multiples
        # must be integers, so cast explicitly instead of relying on implicit
        # conversion.
        num_cells = tf.cast(feature_height * feature_width, tf.int32)
        dims = tf.tile(self._anchor_dims[level - 3], [num_cells, 1])
        return tf.concat([centers, dims], axis=-1)

    def get_anchors(self, image_height, image_width):
        """Get anchors for given height and width on all levels.

        Args:
            image_height: an integer representing height of image.
            image_width: an integer representing width of image.

        Returns:
            A float tensor with shape (total_anchors, 4) concatenating the
            anchors of levels 3 to 7, in that order.
        """
        anchors = [
            self._get_anchors(
                tf.math.ceil(image_height / 2**i),
                tf.math.ceil(image_width / 2**i),
                i
            ) for i in range(3, 8)
        ]
        return tf.concat(anchors, axis=0)

def to_corners(bbox):
    """Turn centre-format boxes into corner-format boxes.

    Args:
        bbox: a tensor whose last axis is [x, y, width, height].

    Returns:
        A tensor of the same leading shape whose last axis is
        [x_min, y_min, x_max, y_max].
    """
    center = bbox[..., :2]
    half_size = bbox[..., 2:] / 2.0
    return tf.concat([center - half_size, center + half_size], axis=-1)
def compute_iou(boxes_1, boxes_2):
    """Compute intersection over union.

    Args:
        boxes_1: a tensor with shape (N, 4) representing bounding boxes
            where each box is of the format [x, y, width, height].
        boxes_2: a tensor with shape (M, 4) representing bounding boxes
            where each box is of the format [x, y, width, height].

    Returns:
        IOU matrix with shape (N, M). Pairs whose union is zero (both boxes
        have zero area) get an IOU of 0 instead of NaN.
    """

    boxes_1_corners = to_corners(boxes_1)
    boxes_2_corners = to_corners(boxes_2)

    # Broadcast (N, 1, 2) against (M, 2) to get every pairwise overlap.
    left_upper = tf.maximum(boxes_1_corners[..., None, :2], boxes_2_corners[..., :2])
    right_lower = tf.minimum(boxes_1_corners[..., None, 2:], boxes_2_corners[..., 2:])
    # Disjoint boxes give a negative extent; clamp it to an empty overlap.
    diff = tf.maximum(0.0, right_lower - left_upper)
    intersection = diff[..., 0] * diff[..., 1]

    boxes_1_area = boxes_1[..., 2] * boxes_1[..., 3]
    boxes_2_area = boxes_2[..., 2] * boxes_2[..., 3]
    union = boxes_1_area[..., None] + boxes_2_area - intersection

    # A plain division yields NaN for 0 / 0 (two degenerate boxes), and NaN
    # then leaks into the max/argmax used for anchor matching.
    iou = tf.math.divide_no_nan(intersection, union)
    return tf.clip_by_value(iou, 0.0, 1.0)

class SamplesEncoder():
    """Encoder of training batches.

    Turns ground-truth boxes and classes into per-anchor regression and
    classification targets.
    """

    def __init__(self,
                 aspect_ratios=(0.5, 1, 2),
                 scales=(0, 1/3, 2/3)):
        """Initialize the encoder.

        Args:
            aspect_ratios: a sequence of floats forwarded to `Anchors`.
            scales: a sequence of floats forwarded to `Anchors`.
        """
        # Forward the configuration: building `Anchors()` with its defaults
        # would silently ignore whatever the caller asked for.
        self._anchors = Anchors(aspect_ratios=aspect_ratios, scales=scales)
        # Regression targets are divided by these factors
        # (centre offsets first, then log size ratios).
        self._box_variance = tf.cast(
            [0.1, 0.1, 0.2, 0.2], tf.float32
        )

    def _match_anchor_boxes(self, anchor_boxes, gt_boxes, match_iou=0.5, ignore_iou=0.4):
        """Assign ground truth boxes to all anchor boxes.

        Args:
            anchor_boxes: a tensor with shape (num_anchors, 4).
            gt_boxes: a tensor with shape (num_gt, 4).
            match_iou: anchors whose best IOU is >= this value are positive.
            ignore_iou: anchors whose best IOU is < this value are negative;
                anchors in between are ignored.

        Returns:
            A tuple (matched_gt_idx, positive_mask, ignore_mask): the index of
            the best-overlapping ground truth box for every anchor, and two
            float masks (1.0 / 0.0) over the anchors.
        """

        iou = compute_iou(anchor_boxes, gt_boxes)
        max_iou = tf.reduce_max(iou, axis=1)
        matched_gt_idx = tf.argmax(iou, axis=1)
        positive_mask = tf.greater_equal(max_iou, match_iou)
        negative_mask = tf.less(max_iou, ignore_iou)
        ignore_mask = tf.logical_not(tf.logical_or(positive_mask, negative_mask))
        return (
            matched_gt_idx,
            tf.cast(positive_mask, dtype=tf.float32),
            tf.cast(ignore_mask, dtype=tf.float32),
        )

    @tf.autograph.experimental.do_not_convert
    def _compute_box_target(self, anchor_boxes, matched_gt_boxes):
        """Compute regression targets of matched boxes relative to anchors.

        The centre offset is normalised by the anchor size, the size is
        encoded as the log of the ratio to the anchor size, and the result
        is divided by the box variance.
        """
        box_target = tf.concat(
            [
                (matched_gt_boxes[:, :2] - anchor_boxes[:, :2]) / anchor_boxes[:, 2:],
                tf.math.log(matched_gt_boxes[:, 2:] / anchor_boxes[:, 2:]),
            ],
            axis=-1,
        )
        box_target = box_target / self._box_variance
        return box_target

    def _encode_sample(self, image_shape, gt_boxes, classes):
        """Build the per-anchor label tensor for one image.

        Args:
            image_shape: shape of the image batch; entries 1 and 2 are passed
                to the anchor generator as image height and width.
            gt_boxes: ground truth boxes of this image.
            classes: class ids of this image's ground truth boxes.

        Returns:
            A float tensor whose last axis holds the 4 box targets followed
            by the class target: the matched class id for positive anchors,
            -1.0 for negative anchors and -2.0 for ignored anchors.
        """
        anchor_boxes = self._anchors.get_anchors(image_shape[1], image_shape[2])
        matched_gt_idx, positive_mask, ignore_mask = self._match_anchor_boxes(
            anchor_boxes, gt_boxes
        )
        matched_gt_boxes = tf.gather(gt_boxes, matched_gt_idx)
        box_target = self._compute_box_target(anchor_boxes, matched_gt_boxes)

        classes = tf.cast(classes, dtype=tf.float32)
        matched_gt_classes = tf.gather(classes, matched_gt_idx)
        class_target = tf.where(tf.equal(positive_mask, 1.0), matched_gt_classes, -1.0)
        class_target = tf.where(tf.equal(ignore_mask, 1.0), -2.0, class_target)
        class_target = tf.expand_dims(class_target, axis=-1)

        label = tf.concat([box_target, class_target], axis=-1)

        return label

    def encode_batch(self, images, gt_boxes, classes):
        """Encode batch for training.

        Args:
            images: a batch of images.
            gt_boxes: ground truth boxes, indexed by sample along axis 0.
            classes: class ids, indexed by sample along axis 0.

        Returns:
            A tuple (images, labels): the images after EfficientNet
            preprocessing and the stacked per-sample label tensors.
        """

        images_shape = tf.shape(images)
        batch_size = images_shape[0]

        labels = tf.TensorArray(dtype=tf.float32, size=batch_size)
        for i in range(batch_size):
            label = self._encode_sample(images_shape, gt_boxes[i], classes[i])
            labels = labels.write(i, label)
        images = tf.keras.applications.efficientnet.preprocess_input(images)
        return images, labels.stack()


In [74]:
# Keep only the rows whose label column (column 5) is 'cancer'.
cancer_train_data = train_data.loc[train_data[5] == 'cancer'].reset_index()
num_samples = len(cancer_train_data)

# Images: column 0 holds a path; only its base name is used, resolved against
# `image_path`. Every image is resized to (width, height).
train_image = np.zeros((num_samples, height, width, 3))
for i in range(num_samples):
    file_name = os.path.basename(cancer_train_data[0][i])
    # The context manager closes the file handle deterministically;
    # convert('RGB') guarantees 3 channels so grayscale / RGBA / palette
    # files fit the buffer instead of raising a broadcast error.
    with Image.open(image_path + file_name) as img:
        train_image[i] = np.array(img.convert('RGB').resize([width, height]))

# Boxes: columns 1-4, halved with floor division.
# NOTE(review): the fixed factor 2 presumably assumes the source images are
# exactly twice the (width, height) target -- confirm against the dataset.
train_label = np.zeros((num_samples, 4))
train_label[:] = cancer_train_data[[1, 2, 3, 4]].to_numpy() // 2

# Single foreground class: every sample gets class id 0.
train_class = np.zeros((num_samples, 1))

# Disabled tf.data pipeline, kept for reference.
# samples_encoder = SamplesEncoder()
# autotune = tf.data.experimental.AUTOTUNE
# train_data = train_data.shuffle(5000)
# train_data = train_data.padded_batch(BATCH_SIZE, padding_values=(0.0, 1e-8, -1.0))
# train_data = train_data.map(samples_encoder.encode_batch, num_parallel_calls=autotune)
# train_data = train_data.prefetch(autotune)

In [76]:
train_label

array([[  0., 225., 225., 419.],
       [  0.,  50., 128., 165.],
       [ 15.,  43., 110., 138.],
       ...,
       [113.,  93., 381., 317.],
       [114.,  60., 219., 190.],
       [ 99.,  56., 175., 174.]])

backbone

In [4]:
def get_backbone(name='efficientnet_b0',
                 weights='imagenet'):
    """Build an EfficientNet feature extractor with three outputs.

    Args:
        name: a string key selecting the EfficientNet variant
            ('efficientnet_b0' ... 'efficientnet_b7').
        weights: forwarded to the Keras application constructor.

    Returns:
        A `tf.keras.Model` named `name` that maps the backbone input to the
        outputs of three intermediate layers (used as levels P3, P4, P5).

    Raises:
        KeyError: if `name` is not one of the supported keys.
    """
    backbone_factories = {
        'efficientnet_b0': tf.keras.applications.EfficientNetB0,
        'efficientnet_b1': tf.keras.applications.EfficientNetB1,
        'efficientnet_b2': tf.keras.applications.EfficientNetB2,
        'efficientnet_b3': tf.keras.applications.EfficientNetB3,
        'efficientnet_b4': tf.keras.applications.EfficientNetB4,
        'efficientnet_b5': tf.keras.applications.EfficientNetB5,
        'efficientnet_b6': tf.keras.applications.EfficientNetB6,
        'efficientnet_b7': tf.keras.applications.EfficientNetB7
    }
    build_backbone = backbone_factories[name]

    # No classification head; spatial size is left open.
    base_model = build_backbone(include_top=False,
                                weights=weights,
                                input_shape=[None, None, 3])

    # Layers tapped for pyramid levels P3, P4 and P5 respectively.
    tap_layer_names = ('block3b_add', 'block5c_add', 'top_activation')
    pyramid_outputs = [
        base_model.get_layer(tap_name).output for tap_name in tap_layer_names
    ]

    return tf.keras.Model(inputs=base_model.inputs, outputs=pyramid_outputs, name=name)

BiFPN

In [13]:
class BiFPNLayerNode(tf.keras.layers.Layer):
    """One node in BiFPN for features fusing."""

    def __init__(self,
                 channels=64,
                 kernel_size=3,
                 depth_multiplier=1,
                 name='BiFPN_node'):
        """Initialize node.
        Args:
            channels: an integer representing number of units inside the node.
            kernel_size: an integer or tuple/list of 2 integers, specifying 
                the height and width of the 2D convolution window.
            depth_multiplier: an integer representing depth multiplier for
                separable convolution layer.
            name: a string representing layer name.
        """
        super().__init__(name=name)
        self.channels = channels
        self.depth_multiplier = depth_multiplier
        self.kernel_size = kernel_size

    def build(self, inputs):
        """Create the fusion weights and the post-fusion conv/BN/activation.

        Args:
            inputs: the shapes of the feature maps passed to the node; its
                length fixes how many inputs are fused.
        """
        # One learnable weight per (input, channel) pair.
        self.w = self.add_weight(
            shape=(len(inputs), self.channels),
            initializer="ones",
            name='sum_weights',
            trainable=True
        )

        self.conv2d = tf.keras.layers.SeparableConv2D(
            self.channels,
            self.kernel_size,
            padding='same',
            depth_multiplier=self.depth_multiplier,
            pointwise_initializer=tf.initializers.variance_scaling(),
            depthwise_initializer=tf.initializers.variance_scaling(),
            name='node_conv'
        )

        self.bn = tf.keras.layers.BatchNormalization()
        self.act = tf.keras.layers.Activation(tf.nn.silu)

    def call(self, inputs, training=False):
        """Fuse features.
        Args:
            inputs: a list with length equal to self.w.shape[0] of feature maps
                with equal shapes.
            training: forwarded to batch normalization.
        Returns:
            A float tensor of fused features after applying convolution
            with batch normalization and SiLU activation.
        """
        # Fast normalized fusion requires non-negative weights: without the
        # ReLU a weight that trains below zero can drive the normalizer to
        # zero (or negative), which blows up the weighted average.
        w = tf.nn.relu(self.w)
        norm = tf.math.reduce_sum(w, axis=0) + 1e-4
        scaled_tensors = [inputs[i] * w[i] / norm for i in range(self.w.shape[0])]
        w_sum = tf.math.add_n(scaled_tensors)
        conv = self.conv2d(w_sum)
        bn = self.bn(conv, training=training)
        return self.act(bn)
class BiFPNLayer(tf.keras.layers.Layer):
    """One layer of BiFPN."""

    def __init__(self,
                 channels=64,
                 kernel_size=3,
                 depth_multiplier=1,
                 pooling_strategy='avg',
                 name='BiFPN_Layer'):
        """Initialize BiFPN layer.
        Args:
            channels: an integer representing number of units inside each fusing node.
            kernel_size: an integer or tuple/list of 2 integers, specifying 
                the height and width of the 2D convolution window.
            depth_multiplier: an integer representing depth multiplier for
                separable convolution layers in BiFPN nodes.
            pooling_strategy: a string representing pooling strategy.
                'avg' or 'max'. Otherwise the max pooling will be selected.
            name: a string representing layer name.
        """
        super().__init__(name=name)
        self.pooling_strategy = pooling_strategy

        # Top-down nodes for levels 4, 5, 6 (index 0 is level 4).
        self.first_step_nodes = [BiFPNLayerNode(channels=channels,
                                                kernel_size=kernel_size,
                                                depth_multiplier=depth_multiplier,
                                                name=f'step_1_level_{i}_node') for i in range(4, 7)]
        # Bottom-up (output) nodes for levels 3..7 (index 0 is level 3).
        self.second_step_nodes = [BiFPNLayerNode(channels=channels,
                                                 kernel_size=kernel_size,
                                                 depth_multiplier=depth_multiplier,
                                                 name=f'step_2_level_{i}_node') for i in range(3, 8)]

        # Weight-free resampling layers, created once here instead of on
        # every forward pass.
        self._avg_pool = tf.keras.layers.AveragePooling2D()
        self._max_pool = tf.keras.layers.MaxPool2D()
        self._upsample = tf.keras.layers.UpSampling2D()

    def call(self, inputs, training=False):
        """Perform features fusing from different levels.

        Args:
            inputs: a list of 5 feature maps for levels 3..7, in that order.
            training: forwarded explicitly to every fusing node.

        Returns:
            A list of 5 fused feature maps for levels 3..7, in that order.
        """

        # TOP-DOWN PATHWAY
        # UPSAMPLE LEVEL 7 FEATURE MAP
        upscaled = self._upscale2d(inputs[-1])
        # FUSE LEVELS 6 AND 7
        first_step_outs = [self.first_step_nodes[-1]([inputs[-2], upscaled], training=training)]
        for i in range(2):
            # UPSAMPLE PREVIOUS RESULT OF FUSING
            upscaled = self._upscale2d(first_step_outs[i])
            # FUSE FEATURE MAPS (LEVEL 5, THEN LEVEL 4)
            fused = self.first_step_nodes[1-i]([inputs[-3-i], upscaled], training=training)
            first_step_outs.append(fused)

        # BOTTOM-UP PATHWAY
        # UPSAMPLE LAST RESULT OF FEATURE FUSING FROM TOP-DOWN PATH
        upscaled = self._upscale2d(first_step_outs[-1])
        # FUSE LEVELS 3 AND 4^TD
        second_step_outs = [self.second_step_nodes[0]([inputs[0], upscaled], training=training)]
        for i in range(1, 4):
            # DOWNSAMPLE PREVIOUS RESULT OF FUSING
            downscaled = self._pool2d(second_step_outs[-1])
            # FUSE FEATURES: LEVEL INPUT, ITS TOP-DOWN RESULT, AND THE LEVEL BELOW
            fused = self.second_step_nodes[i]([inputs[i], first_step_outs[3-i], downscaled], training=training)
            second_step_outs.append(fused)
        downscaled = self._pool2d(second_step_outs[-1])
        # FUSE LEVELS 7 AND 6^OUT
        fused = self.second_step_nodes[-1]([inputs[-1], downscaled], training=training)
        second_step_outs.append(fused)

        return second_step_outs

    def _pool2d(self, inputs):
        """Downsample with average pooling for 'avg', max pooling otherwise."""
        if self.pooling_strategy == 'avg':
            return self._avg_pool(inputs)
        else:
            return self._max_pool(inputs)

    def _upscale2d(self, inputs):
        """Upsample with the default `UpSampling2D` settings."""
        return self._upsample(inputs)
class BiFPN(tf.keras.layers.Layer):
    """Bidirectional Feature Pyramid Network."""

    def __init__(self,
                 channels=64,
                 depth=3,
                 kernel_size=3,
                 depth_multiplier=1,
                 pooling_strategy='avg',
                 name='BiFPN'):
        """Initialize BiFPN.
        Args:
            channels: an integer representing number of units inside each fusing node
                and convolution layer.
            depth: an integer representing number of BiFPN layers. depth > 0.
            kernel_size: an integer or tuple/list of 2 integers, specifying 
                the height and width of the 2D convolution window.
            depth_multiplier: an integer representing depth multiplier for
                separable convolution layers in BiFPN nodes.
            pooling_strategy: a string representing pooling strategy in BiFPN layers.
                'avg' or 'max'. Otherwise the max pooling will be selected.
            name: a string representing layer name.
        """
        # The docstring must be the first statement of the method; placed
        # after this call it is just a discarded string expression.
        super().__init__(name=name)
        self.depth = depth
        self.channels = channels
        self.pooling_strategy = pooling_strategy

        # One 1x1 convolution per input level to bring it to `channels`.
        self.convs_1x1 = [tf.keras.layers.Conv2D(channels,
                                                 1,
                                                 padding='same',
                                                 name=f'1x1_conv_level_{3+i}') for i in range(5)]

        self.bns = [
            tf.keras.layers.BatchNormalization(name=f'bn_level_{i}') for i in range(5)
        ]
        self.act = tf.keras.layers.Activation(tf.nn.silu)

        self.bifpn_layers = [BiFPNLayer(channels=channels,
                                        kernel_size=kernel_size,
                                        depth_multiplier=depth_multiplier,
                                        pooling_strategy=pooling_strategy,
                                        name=f'BiFPN_Layer_{i}') for i in range(depth)]

    def call(self, inputs, training=False):
        """Project the 5 input levels to `channels` and run the BiFPN layers.

        Args:
            inputs: a list of exactly 5 feature maps.
            training: forwarded to batch normalization and the BiFPN layers.

        Returns:
            The list of feature maps produced by the last BiFPN layer.
        """
        assert len(inputs) == 5

        squeezed = [self.convs_1x1[i](inputs[i]) for i in range(5)]
        normalized = [self.bns[i](squeezed[i], training=training) for i in range(5)]
        activated = [self.act(normalized[i]) for i in range(5)]
        feature_maps = self.bifpn_layers[0](activated, training=training)
        for layer in self.bifpn_layers[1:]:
            feature_maps = layer(feature_maps, training=training)

        return feature_maps

Loss

In [39]:
class FocalLoss(tf.keras.losses.Loss):
    """Sigmoid focal loss with label smoothing, unreduced over anchors."""

    def __init__(self,
                 alpha=0.25,
                 gamma=1.5,
                 label_smoothing=0.1,
                 name='focal_loss'):
        """Store the focal loss hyper-parameters.

        FL = - alpha_t * (1 - p_t) ** gamma * log(p_t)

        Args:
            alpha: weight applied to positive targets; negatives get 1 - alpha.
            gamma: exponent of the (1 - p_t) modulating factor.
            label_smoothing: amount by which targets are pulled towards 0.5
                before the cross-entropy is computed.
            name: a string representing loss name.
        """
        super().__init__(name=name, reduction="none")
        self.alpha = alpha
        self.gamma = gamma
        self.label_smoothing = label_smoothing

    def call(self, y_true, y_pred):
        """Calculate Focal loss.

        Args:
            y_true: a tensor of ground truth values with
                shape (batch_size, num_anchor_boxes, num_classes).
            y_pred: a tensor of predicted logits with
                shape (batch_size, num_anchor_boxes, num_classes).

        Returns:
            A float tensor with shape (batch_size, num_anchor_boxes) with
            loss value for every anchor box.
        """
        probabilities = tf.sigmoid(y_pred)

        # p_t and alpha_t are built from the hard (un-smoothed) targets.
        p_t = y_true * probabilities + (1 - y_true) * (1 - probabilities)
        alpha_t = y_true * self.alpha + (1 - y_true) * (1 - self.alpha)

        # Only the cross-entropy term sees the smoothed targets.
        smoothed_targets = y_true * (1.0 - self.label_smoothing) + 0.5 * self.label_smoothing
        cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=smoothed_targets, logits=y_pred
        )

        modulating_factor = (1.0 - p_t)**self.gamma
        per_class_loss = alpha_t * modulating_factor * cross_entropy
        return tf.reduce_sum(per_class_loss, axis=-1)


class BoxLoss(tf.keras.losses.Loss):
    """Huber loss over box coordinates, unreduced over anchors."""

    def __init__(self,
                 delta=1.0,
                 name='box_loss'):
        """Store the threshold between the quadratic and linear regimes."""
        super().__init__(name=name, reduction="none")
        self.delta = delta

    def call(self, y_true, y_pred):
        """Calculate Huber loss.

        Args:
            y_true: a tensor of ground truth values with shape (batch_size, num_anchor_boxes, 4).
            y_pred: a tensor of predicted values with shape (batch_size, num_anchor_boxes, 4).

        Returns:
            A float tensor with shape (batch_size, num_anchor_boxes) with
            loss value for every anchor box.
        """
        abs_error = tf.abs(y_true - y_pred)
        # Quadratic below delta, linear at or above it.
        quadratic = 0.5 * abs_error ** 2
        linear = self.delta * (abs_error - 0.5 * self.delta)
        is_small_error = tf.less(abs_error, self.delta)
        per_coordinate = tf.where(is_small_error, quadratic, linear)
        return tf.reduce_sum(per_coordinate, axis=-1)


class EffDetLoss(tf.keras.losses.Loss):
    """Sum of the focal (classification) and Huber (box) losses."""

    def __init__(self,
                 num_classes=80,
                 alpha=0.25,
                 gamma=1.5,
                 label_smoothing=0.1,
                 delta=1.0,
                 name='effdet_loss'):
        """Initialize Focal and Huber loss.

        Args:
            num_classes: an integer number representing number of
                all possible classes in training dataset.
            alpha: a float number for Focal loss formula.
            gamma: a float number for Focal loss formula.
            label_smoothing: a float number of label smoothing intensity.
            delta: a float number representing a threshold in Huber loss
                for choosing between quadratic and linear loss.
            name: a string representing loss name.
        """
        super().__init__(name=name)
        self.class_loss = FocalLoss(alpha=alpha, gamma=gamma, label_smoothing=label_smoothing)
        self.box_loss = BoxLoss(delta=delta)
        self.num_classes = num_classes

    @tf.autograph.experimental.do_not_convert
    def call(self, y_true, y_pred):
        """Calculate Focal and Huber losses for every anchor box.

        Args:
            y_true: a tensor of ground truth values with shape (batch_size, num_anchor_boxes, 5)
                representing anchor box correction and class label.
            y_pred: a tensor of predicted values whose last axis holds the
                4 box predictions followed by the class logits.

        Returns:
            A float tensor with one loss value per sample of the batch.
        """
        y_pred = tf.cast(y_pred, dtype=tf.float32)

        # Last column of y_true: class id (>= 0), -1.0 negative, -2.0 ignored.
        class_ids = y_true[..., 4]
        box_labels, box_preds = y_true[..., :4], y_pred[..., :4]
        cls_preds = y_pred[..., 4:]

        # Negative ids are outside [0, depth) so they one-hot to all zeros.
        cls_labels = tf.one_hot(
            tf.cast(class_ids, dtype=tf.int32),
            depth=self.num_classes,
            dtype=tf.float32
        )

        positive_mask = tf.cast(tf.greater(class_ids, -1.0), dtype=tf.float32)
        ignore_mask = tf.cast(tf.equal(class_ids, -2.0), dtype=tf.float32)

        clf_loss = self.class_loss(cls_labels, cls_preds)
        box_loss = self.box_loss(box_labels, box_preds)

        # Ignored anchors add nothing to classification; only positive
        # anchors add to box regression.
        clf_loss = tf.where(tf.equal(ignore_mask, 1.0), 0.0, clf_loss)
        box_loss = tf.where(tf.equal(positive_mask, 1.0), box_loss, 0.0)

        # Both terms are normalised by the number of positive anchors;
        # divide_no_nan yields 0 for samples without any.
        normalizer = tf.reduce_sum(positive_mask, axis=-1)
        clf_total = tf.reduce_sum(clf_loss, axis=-1)
        box_total = tf.reduce_sum(box_loss, axis=-1)
        clf_loss = tf.math.divide_no_nan(clf_total, normalizer)
        box_loss = tf.math.divide_no_nan(box_total, normalizer)
        return clf_loss + box_loss

Model

In [14]:
class ClassDetector(tf.keras.layers.Layer):
    """Classification head."""

    def __init__(self,
                 num_classes=80,
                 channels=64,
                 num_anchors=9,
                 depth=3,
                 kernel_size=3,
                 depth_multiplier=1,
                 name='class_det'):
        """Initialize classification model.
        Args:
            num_classes: an integer representing number of classes
                to predict.
            channels: an integer representing number of filters
                inside each separable convolution layer.
            num_anchors: an integer representing number of anchor
                boxes.
            depth: an integer representing number of separable
                convolutions before final convolution.
            kernel_size: an integer or tuple/list of 2 integers, specifying 
                the height and width of the 2D convolution window.
            depth_multiplier: an integer representing depth multiplier for
                separable convolution layers.
            name: a string representing layer name.
        """
        super().__init__(name=name)
        self.num_classes = num_classes
        self.channels = channels
        # Stored for parity with BoxRegressor, which keeps the same attribute.
        self.num_anchors = num_anchors
        self.depth = depth
        self.kernel_size = kernel_size
        self.depth_multiplier = depth_multiplier

        self.convs = [tf.keras.layers.SeparableConv2D(
            channels,
            kernel_size,
            padding='same',
            depth_multiplier=depth_multiplier,
            pointwise_initializer=tf.initializers.variance_scaling(),
            depthwise_initializer=tf.initializers.variance_scaling(),
            bias_initializer=tf.zeros_initializer(),
            name=f'class_det_separable_conv_{i}'
        ) for i in range(depth)]

        self.bns = [
            tf.keras.layers.BatchNormalization(name=f'bn_{i}') for i in range(depth)
        ]
        self.act = tf.keras.layers.Activation(tf.nn.silu)

        # Bias such that sigmoid(bias) == 0.01, i.e. every class starts out
        # with a 1% predicted probability.
        bias_init = tf.constant_initializer(-np.log((1 - 0.01) / 0.01))
        # Final layer: one logit per (anchor, class) pair at every location.
        self.classes = tf.keras.layers.SeparableConv2D(
            num_classes * num_anchors,
            kernel_size,
            padding='same',
            depth_multiplier=depth_multiplier,
            activation=None,
            pointwise_initializer=tf.initializers.variance_scaling(),
            depthwise_initializer=tf.initializers.variance_scaling(),
            bias_initializer=bias_init,
            name='class_preds'
        )

    def call(self, inputs, training=False):
        """Run the conv/BN/SiLU stack and return raw class logits.

        Args:
            inputs: a feature map tensor.
            training: forwarded to batch normalization.

        Returns:
            The output of the final convolution, with
            num_classes * num_anchors channels.
        """
        for i in range(self.depth):
            inputs = self.convs[i](inputs)
            inputs = self.bns[i](inputs, training=training)
            inputs = self.act(inputs)
        class_output = self.classes(inputs)

        return class_output


class BoxRegressor(tf.keras.layers.Layer):
    """Box regression head."""

    def __init__(self,
                 channels=64,
                 num_anchors=9,
                 depth=3,
                 kernel_size=3,
                 depth_multiplier=1,
                 name='box_regressor'):
        """Initialize regression model.
        Args:
            channels: number of filters of every hidden separable
                convolution.
            num_anchors: number of anchor boxes per location.
            depth: number of hidden separable convolutions that precede
                the final convolution.
            kernel_size: an integer or tuple/list of 2 integers giving the
                height and width of the 2D convolution window.
            depth_multiplier: depth multiplier of the separable
                convolution layers.
            name: a string representing layer name.
        """
        super().__init__(name=name)
        self.channels = channels
        self.num_anchors = num_anchors
        self.depth = depth
        self.kernel_size = kernel_size
        self.depth_multiplier = depth_multiplier

        def make_separable_conv(filters, layer_name):
            # Fresh initializer objects on every call: no instance is shared
            # between layers.
            return tf.keras.layers.SeparableConv2D(
                filters,
                kernel_size,
                padding='same',
                depth_multiplier=depth_multiplier,
                activation=None,
                pointwise_initializer=tf.initializers.variance_scaling(),
                depthwise_initializer=tf.initializers.variance_scaling(),
                bias_initializer=tf.zeros_initializer(),
                name=layer_name
            )

        self.convs = []
        for idx in range(depth):
            self.convs.append(make_separable_conv(channels, f'box_reg_separable_conv_{idx}'))

        self.bns = []
        for idx in range(depth):
            self.bns.append(tf.keras.layers.BatchNormalization(name=f'bn_{idx}'))

        self.act = tf.keras.layers.Activation(tf.nn.silu)

        # Final layer: 4 box values per anchor at every location.
        self.boxes = make_separable_conv(4 * num_anchors, 'box_preds')

    def call(self, inputs, training=False):
        """Run the conv/BN/SiLU stack and return raw box predictions.

        Args:
            inputs: a feature map tensor.
            training: forwarded to batch normalization.

        Returns:
            The output of the final convolution, with 4 * num_anchors channels.
        """
        hidden = inputs
        for conv, bn in zip(self.convs[:self.depth], self.bns[:self.depth]):
            hidden = self.act(bn(conv(hidden), training=training))
        return self.boxes(hidden)

In [15]:
class EfficientDet(tf.keras.Model):
    """EfficientDet model."""

    def __init__(self,
                 channels=64,
                 num_classes=80,
                 num_anchors=9,
                 bifpn_depth=3,
                 bifpn_kernel_size=3,
                 bifpn_depth_multiplier=1,
                 bifpn_pooling_strategy='avg',
                 heads_depth=3,
                 class_kernel_size=3,
                 class_depth_multiplier=1,
                 box_kernel_size=3,
                 box_depth_multiplier=1,
                 backbone_name='efficientnet_b0',
                 name='efficientdet_d0'):
        """Initialize EffDet. Default args refers to EfficientDet D0.
        Args:
            channels: an integer representing number of units inside each fusing
                node and convolution layer of BiFPN and head models.
            num_classes: an integer representing number of classes to predict.
            num_anchors: an integer representing number of anchor boxes.
            bifpn_depth: an integer representing number of BiFPN layers.
            bifpn_kernel_size: an integer or tuple/list of 2 integers, specifying
                the height and width of the 2D convolution window for BiFPN layers.
            bifpn_depth_multiplier: an integer representing depth multiplier for
                separable convolution layers in BiFPN nodes.
            bifpn_pooling_strategy: a string representing pooling strategy in BiFPN
                layers. 'avg' or 'max'. Otherwise the max pooling will be selected.
            heads_depth: an integer representing number of separable convolutions
                before final convolution in head models.
            class_kernel_size: an integer or tuple/list of 2 integers, specifying
                the height and width of the 2D convolution window for
                classifier model.
            class_depth_multiplier: an integer representing depth multiplier for
                separable convolution layers in classifier model.
            box_kernel_size: an integer or tuple/list of 2 integers, specifying
                the height and width of the 2D convolution window for
                regression model.
            box_depth_multiplier: an integer representing depth multiplier for
                separable convolution layers in regression model.
            backbone_name: a string key understood by `get_backbone`.
            name: a string representing model name.
        """
        super().__init__(name=name)
        self.num_classes=num_classes

        # The backbone is frozen: only BiFPN and the heads are trained.
        self.backbone = get_backbone(backbone_name)
        self.backbone.trainable = False

        # Weight-free pooling layers that derive levels P6 and P7 from the
        # last backbone output; created once here rather than on every call.
        self._p6_pool = tf.keras.layers.AveragePooling2D()
        self._p7_pool = tf.keras.layers.AveragePooling2D()

        self.BiFPN = BiFPN(channels=channels,
                           depth=bifpn_depth,
                           kernel_size=bifpn_kernel_size,
                           depth_multiplier=bifpn_depth_multiplier,
                           pooling_strategy=bifpn_pooling_strategy)
        self.class_det = ClassDetector(channels=channels,
                                       num_classes=num_classes,
                                       num_anchors=num_anchors,
                                       depth=heads_depth,
                                       kernel_size=class_kernel_size,
                                       depth_multiplier=class_depth_multiplier)
        self.box_reg = BoxRegressor(channels=channels,
                                    num_anchors=num_anchors,
                                    depth=heads_depth,
                                    kernel_size=box_kernel_size,
                                    depth_multiplier=box_depth_multiplier)

    def call(self, inputs, training=False):
        """Predict boxes and class logits for every anchor.

        Args:
            inputs: a batch of images.
            training: forwarded to BiFPN and both heads.

        Returns:
            A tensor of shape (batch_size, total_anchors, 4 + num_classes):
            the 4 box predictions followed by the class logits.
        """
        batch_size = tf.shape(inputs)[0]

        # Copy into a new list so the container returned by the backbone is
        # never mutated (and a tuple return value would work as well).
        features = list(self.backbone(inputs))
        p6 = self._p6_pool(features[-1])
        p7 = self._p7_pool(p6)
        features.extend([p6, p7])

        fpn_features = self.BiFPN(features, training=training)

        # The same two heads are applied to every pyramid level; predictions
        # are flattened to one row per anchor and concatenated across levels.
        classes = list()
        boxes = list()
        for feature in fpn_features:
            classes.append(tf.reshape(self.class_det(feature, training=training), [batch_size, -1, self.num_classes]))
            boxes.append(tf.reshape(self.box_reg(feature, training=training), [batch_size, -1, 4]))

        classes = tf.concat(classes, axis=1)
        boxes = tf.concat(boxes, axis=1)

        return tf.concat([boxes, classes], axis=-1)

In [16]:
def get_efficientdet(name='efficientdet_d0',
                     num_classes=80,
                     num_anchors=9):
    """Build an EfficientDet variant by name.

    Args:
        name: a string key, 'efficientdet_d0' ... 'efficientdet_d7'.
        num_classes: an integer representing number of classes to predict.
        num_anchors: an integer representing number of anchor boxes.

    Returns:
        An `EfficientDet` instance configured for the requested variant.

    Raises:
        KeyError: if `name` is not one of the supported keys.
    """
    # name -> (channels, BiFPN depth, heads depth, backbone name)
    models = {
        'efficientdet_d0': (64, 3, 3, 'efficientnet_b0'),
        'efficientdet_d1': (88, 4, 3, 'efficientnet_b1'),
        'efficientdet_d2': (112, 5, 3, 'efficientnet_b2'),
        'efficientdet_d3': (160, 6, 4, 'efficientnet_b3'),
        'efficientdet_d4': (224, 7, 4, 'efficientnet_b4'),
        'efficientdet_d5': (288, 7, 4, 'efficientnet_b5'),
        'efficientdet_d6': (384, 8, 5, 'efficientnet_b6'),
        'efficientdet_d7': (384, 8, 5, 'efficientnet_b6'),
    }
    channels, bifpn_depth, heads_depth, backbone_name = models[name]
    # The backbone name must be forwarded: otherwise every variant is built
    # on the constructor's default backbone regardless of this table.
    return EfficientDet(channels=channels,
                        num_classes=num_classes,
                        num_anchors=num_anchors,
                        bifpn_depth=bifpn_depth,
                        heads_depth=heads_depth,
                        backbone_name=backbone_name,
                        name=name)

In [50]:
model = get_efficientdet(MODEL_NAME, num_classes=NUM_CLASSES)
# The loss must use the same class count as the model: with a different
# depth the one-hot labels broadcast against the class logits without any
# error, and the classification loss is silently wrong.
loss = EffDetLoss(num_classes=NUM_CLASSES)

# Cosine learning-rate decay over DECAY_STEPS steps, with 1e-3 as the third
# (alpha) argument.
# NOTE(review): newer TensorFlow releases expose this schedule as
# tf.keras.optimizers.schedules.CosineDecay -- confirm against the installed version.
LR = tf.keras.experimental.CosineDecay(INITIAL_LR, DECAY_STEPS, 1e-3)
opt = tf.keras.optimizers.SGD(LR, momentum=0.9)