In [6]:
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.layers import Dropout, Activation, LayerNormalization, Dense

## BOUNDING BOX

In [67]:
from typing import Union, Dict, Tuple
import matplotlib.pyplot as plt
import numpy as np
import random
import cv2

def bbox_xcycwh_to_x1y1x2y2(bbox_xcycwh : np.ndarray):
    """
    Convert boxes from [xc, yc, w, h] to integer [x1, y1, x2, y2] corners.
    Args:
        bbox_xcycwh (np.ndarray): array indexed as (n, 4), one box per row
    Returns:
        np.ndarray of dtype int32 with the same shape as the input, holding
        [x1, y1, x2, y2] for each row.
    """
    bbox_x1y1x2y2 = np.zeros_like((bbox_xcycwh))
    half_w = bbox_xcycwh[:,2] / 2
    half_h = bbox_xcycwh[:,3] / 2
    bbox_x1y1x2y2[:,0] = bbox_xcycwh[:,0] - half_w
    bbox_x1y1x2y2[:,2] = bbox_xcycwh[:,0] + half_w
    bbox_x1y1x2y2[:,1] = bbox_xcycwh[:,1] - half_h
    # y2 is the centre PLUS half the height (it was previously computed with
    # a minus sign, which made y2 identical to y1).
    bbox_x1y1x2y2[:,3] = bbox_xcycwh[:,1] + half_h
    bbox_x1y1x2y2 = bbox_x1y1x2y2.astype(np.int32)
    return bbox_x1y1x2y2

def intersect(box_a : tf.Tensor, box_b : tf.Tensor) -> tf.Tensor:
    """
    Pairwise intersection area between two sets of corner-format boxes.
    Args:
        box_a (tf.Tensor): boxes indexed as (A, 4); columns 0:2 are used as
            the min corner and columns 2:4 as the max corner
        box_b (tf.Tensor): boxes indexed as (B, 4), same column layout
    Returns:
        Tensor of shape (A, B) whose entry [i, j] is the intersection area of
        box_a[i] and box_b[j] (0 when the boxes do not overlap).
    """
    # Add a singleton axis to each operand so element-wise min/max broadcast
    # to every (a, b) pair: (A, 1, 2) against (1, B, 2) -> (A, B, 2).
    a_min = tf.expand_dims(box_a[:, :2], axis=1)
    a_max = tf.expand_dims(box_a[:, 2:], axis=1)
    b_min = tf.expand_dims(box_b[:, :2], axis=0)
    b_max = tf.expand_dims(box_b[:, 2:], axis=0)

    inner_max = tf.math.minimum(a_max, b_max)
    inner_min = tf.math.maximum(a_min, b_min)

    # relu clamps negative extents (disjoint boxes) to zero.
    extent = tf.nn.relu(inner_max - inner_min)
    return extent[:, :, 0] * extent[:, :, 1]

def overlap(box_a: tf.Tensor, box_b: tf.Tensor, return_union=False) -> tf.Tensor:
    """
    Pairwise intersection-over-union between two sets of corner-format boxes.
    Args:
        box_a (tf.Tensor): boxes indexed as (A, 4) in [xmin, ymin, xmax, ymax]
        box_b (tf.Tensor): boxes indexed as (B, 4) in [xmin, ymin, xmax, ymax]
        return_union (bool): when exactly False only the IoU is returned,
            otherwise the union areas are returned as well
    Returns:
        The (A, B) IoU matrix, or the tuple (iou, union) of two (A, B)
        tensors when return_union is not False.
    """
    inter = intersect(box_a, box_b)

    area_a = (box_a[:,2] - box_a[:,0]) * (box_a[:,3] - box_a[:,1])
    area_b = (box_b[:,2] - box_b[:,0]) * (box_b[:,3] - box_b[:,1])

    # Broadcast the per-box areas against each other: (A, 1) + (1, B) -> (A, B).
    # (The previous tf.tile calls passed three multiples to rank-2 tensors.)
    union = area_a[:, tf.newaxis] + area_b[tf.newaxis, :] - inter

    # Divide the intersection tensor, not the `intersect` function itself.
    iou = inter / union

    if return_union is False:
        return iou
    else:
        return iou, union
    
def merge(box_a: tf.Tensor, box_b: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    """
    Tile two rank-2 tensors so that every row of one faces every row of the other.
    Args:
        box_a (tf.Tensor): tensor indexed as (A, D)
        box_b (tf.Tensor): tensor indexed as (B, D)
    Returns:
        A pair of tensors. The first repeats box_a along a new axis 1 to
        shape (A, B, D); the second repeats box_b along a new axis 0 to
        shape (A, B, D).
    """
    num_a = tf.shape(box_a)[0]
    num_b = tf.shape(box_b)[0]

    a_as_column = tf.expand_dims(box_a, axis=1)
    b_as_row = tf.expand_dims(box_b, axis=0)

    return (
        tf.tile(a_as_column, [1, num_b, 1]),
        tf.tile(b_as_row, [num_a, 1, 1]),
    )


def xy_min_xy_max_to_xcycwh(bbox: tf.Tensor) -> tf.Tensor:
    """
    Turn corner-format boxes [xmin, ymin, xmax, ymax] into centre-format
    boxes [xc, yc, w, h].
    Args:
        bbox (tf.Tensor): boxes indexed as (n, 4), one box per row
    Returns:
        Tensor of the same shape holding [xc, yc, w, h] per row
    """
    xy_min = bbox[:, :2]
    xy_max = bbox[:, 2:]
    size = xy_max - xy_min
    # The centre is the min corner shifted by half the box size.
    center = xy_min + (size / 2)
    return tf.concat([center, size], axis=-1)



def xcycwh_to_xy_min_xy_max(bbox: tf.Tensor) -> tf.Tensor:
    """
    Turn centre-format boxes [xc, yc, w, h] into corner-format boxes
    [xmin, ymin, xmax, ymax], clipped to the [0, 1] range.
    Args:
        bbox (tf.Tensor): boxes indexed as (n, 4), one box per row
    Returns:
        Tensor of the same shape holding the clipped corners per row
    """
    center = bbox[:, :2]
    half_size = bbox[:, 2:] / 2
    corners = tf.concat([center - half_size, center + half_size], axis=-1)
    # Every coordinate is clamped into [0.0, 1.0].
    return tf.clip_by_value(corners, 0.0, 1.0)



## ENCODER LAYER

In [68]:
class EncoderLayer(tf.keras.layers.Layer):
    """
    One transformer encoder layer: self-attention followed by a two-layer
    feed-forward network, each wrapped in a residual connection and a
    LayerNormalization applied after the residual add.

    Args:
        model_dim: feature size of the layer input and output
        num_heads: number of attention heads
        dim_feedforward: hidden size of the feed-forward network
        dropout: dropout rate used in attention and on both sub-layer outputs
        activation: activation applied between the two feed-forward Dense layers
        normalize_before: stored on the instance; this layer always
            normalizes after the residual add regardless of its value
    """

    def __init__(self, model_dim=512, num_heads=8, dim_feedforward=2048,
                dropout=0.1, activation='relu', normalize_before=False,
                **kwargs):
        super().__init__(**kwargs)
        # Keras' constructor is MultiHeadAttention(num_heads, key_dim, ...),
        # where key_dim is the size of EACH head, hence model_dim // num_heads.
        self.attention = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=model_dim // num_heads,
            dropout=dropout, name='self_attention')
        self.dropout = Dropout(dropout)
        self.activation = Activation(activation)
        self.linear1 = Dense(dim_feedforward, name = 'linear1')
        # Project back to model_dim so the residual add below is shape-compatible.
        self.linear2 = Dense(model_dim, name = 'linear2')
        self.norm1 = LayerNormalization(epsilon=1e-5, name = 'norm1')
        self.norm2 = LayerNormalization(epsilon=1e-5, name = 'norm2')
        self.normalize_before = normalize_before

    def _attend(self, query, key, value, attn_mask, key_padding_mask, training):
        """Run Keras multi-head attention on sequence-first tensors.

        query/key/value are taken as [seq, batch, dim] (the layout produced
        by TransformerBlock in this notebook) and are transposed to the
        batch-first layout Keras expects, then transposed back.
        """
        attention_mask = None
        if attn_mask is not None:
            # Forwarded as a Keras-style mask: truthy entries may attend.
            attention_mask = tf.cast(attn_mask, tf.bool)
        if key_padding_mask is not None:
            # assumes key_padding_mask is [batch, key_len] with True marking
            # padded keys -- TODO confirm against the data pipeline
            keep = tf.logical_not(tf.cast(key_padding_mask, tf.bool))
            keep = keep[:, tf.newaxis, :]  # broadcast over query positions
            attention_mask = (keep if attention_mask is None
                              else tf.logical_and(attention_mask, keep))

        attended = self.attention(
            tf.transpose(query, [1, 0, 2]),
            tf.transpose(value, [1, 0, 2]),
            key=tf.transpose(key, [1, 0, 2]),
            attention_mask=attention_mask,
            training=training)
        return tf.transpose(attended, [1, 0, 2])

    def call(self, source, source_mask=None, source_key_padding_mask=None,
            pos_encoding=None, training=False):
        """
        Args:
            source: input features, taken as [seq, batch, model_dim]
            source_mask: optional attention mask (truthy = may attend),
                broadcastable to [batch, seq, seq]
            source_key_padding_mask: optional padding mask, see _attend
            pos_encoding: optional tensor added to the queries and keys
                (not to the values)
            training: forwarded to the dropout and attention layers
        Returns:
            Tensor with the same shape as source.
        """
        if pos_encoding is None:
            query = key = source
        else:
            query = key = source + pos_encoding

        attn_source = self._attend(query, key, source, source_mask,
                                   source_key_padding_mask, training)
        source = source + self.dropout(attn_source, training=training)
        source = self.norm1(source)

        x = self.linear1(source)
        x = self.activation(x)
        x = self.dropout(x, training=training)
        x = self.linear2(x)
        source = source + self.dropout(x, training=training)
        source = self.norm2(source)

        return source

## DECODER LAYER

In [69]:
class DecoderLayer(tf.keras.layers.Layer):
    """
    One transformer decoder layer: self-attention over the target sequence,
    cross-attention from the target to the encoder memory, then a two-layer
    feed-forward network. Each sub-layer is followed by a residual add and a
    LayerNormalization.

    Args:
        model_dim: feature size of the layer input and output
        num_heads: number of attention heads
        dim_feedforward: hidden size of the feed-forward network
        dropout: dropout rate used in attention and on every sub-layer output
        activation: activation applied between the two feed-forward Dense layers
        normalize_before: stored on the instance; this layer always
            normalizes after the residual add regardless of its value
    """

    def __init__(self, model_dim=256, num_heads=8, dim_feedforward=2048,
                 dropout=0.1, activation='relu', normalize_before=False,
                 **kwargs):
        super().__init__(**kwargs)

        # Keras' constructor is MultiHeadAttention(num_heads, key_dim, ...),
        # where key_dim is the size of EACH head, hence model_dim // num_heads.
        self.self_attn = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=model_dim // num_heads,
            dropout=dropout, name='self_attn')
        self.multihead_attn = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=model_dim // num_heads,
            dropout=dropout, name='multihead_attn')

        self.dropout = Dropout(dropout)
        self.activation = Activation(activation)

        self.linear1 = Dense(dim_feedforward, name='linear1')
        self.linear2 = Dense(model_dim, name='linear2')

        self.norm1 = LayerNormalization(epsilon=1e-5, name='norm1')
        self.norm2 = LayerNormalization(epsilon=1e-5, name='norm2')
        self.norm3 = LayerNormalization(epsilon=1e-5, name='norm3')

        self.normalize_before = normalize_before

    @staticmethod
    def _attend(attention, query, key, value, attn_mask, key_padding_mask,
                training):
        """Run a Keras multi-head attention layer on sequence-first tensors.

        query/key/value are taken as [seq, batch, dim] (the layout produced
        by TransformerBlock in this notebook) and are transposed to the
        batch-first layout Keras expects, then transposed back.
        """
        attention_mask = None
        if attn_mask is not None:
            # Forwarded as a Keras-style mask: truthy entries may attend.
            attention_mask = tf.cast(attn_mask, tf.bool)
        if key_padding_mask is not None:
            # assumes key_padding_mask is [batch, key_len] with True marking
            # padded keys -- TODO confirm against the data pipeline
            keep = tf.logical_not(tf.cast(key_padding_mask, tf.bool))
            keep = keep[:, tf.newaxis, :]  # broadcast over query positions
            attention_mask = (keep if attention_mask is None
                              else tf.logical_and(attention_mask, keep))

        attended = attention(
            tf.transpose(query, [1, 0, 2]),
            tf.transpose(value, [1, 0, 2]),
            key=tf.transpose(key, [1, 0, 2]),
            attention_mask=attention_mask,
            training=training)
        return tf.transpose(attended, [1, 0, 2])

    def call(self, target, memory, target_mask=None, memory_mask=None,
             target_key_padding_mask=None, memory_key_padding_mask=None,
             pos_encoding=None, query_encoding=None, training=False):
        """
        Args:
            target: decoder input, taken as [num_queries, batch, model_dim]
            memory: encoder output, taken as [mem_len, batch, model_dim]
            target_mask / memory_mask: optional attention masks
                (truthy = may attend) for self- and cross-attention
            target_key_padding_mask / memory_key_padding_mask: optional
                padding masks, see _attend
            pos_encoding: optional tensor added to memory to form the
                cross-attention keys
            query_encoding: optional tensor added to target to form the
                queries (and the self-attention keys)
            training: forwarded to the dropout and attention layers
        Returns:
            Tensor with the same shape as target.
        """
        # Both encodings default to None, so only add them when provided.
        if query_encoding is None:
            query_tgt = key_tgt = target
        else:
            query_tgt = key_tgt = target + query_encoding
        attn_target = self._attend(self.self_attn, query_tgt, key_tgt, target,
                                   target_mask, target_key_padding_mask,
                                   training)
        target = target + self.dropout(attn_target, training=training)
        target = self.norm1(target)

        query_tgt = target if query_encoding is None else target + query_encoding
        key_mem = memory if pos_encoding is None else memory + pos_encoding

        attn_target2 = self._attend(self.multihead_attn, query_tgt, key_mem,
                                    memory, memory_mask,
                                    memory_key_padding_mask, training)
        target = target + self.dropout(attn_target2, training=training)
        target = self.norm2(target)

        x = self.linear1(target)
        x = self.activation(x)
        x = self.dropout(x, training=training)
        x = self.linear2(x)
        target = target + self.dropout(x, training=training)
        target = self.norm3(target)

        return target

## Transformer's Encoder

In [70]:
class TransformerEncoder(tf.keras.Model):
    """
    A stack of EncoderLayer instances applied one after another, optionally
    followed by a final normalization layer.
    """

    def __init__(self, model_dim=512, num_heads=8, dim_feedforward=2048,
                dropout=0.1, activation='relu', normalize_before=False, norm=None,
                num_encoder_layers=6, **kwargs):
        super().__init__(**kwargs)

        # Build the stack first, then attach it so Keras tracks every layer.
        stack = []
        for i in range(num_encoder_layers):
            stack.append(
                EncoderLayer(model_dim, num_heads, dim_feedforward,
                             dropout, activation, normalize_before,
                             name='layer_%d'%i))
        self.encoder_layers = stack
        self.norm = norm

    def call(self, source, mask=None, source_key_padding_mask=None,
            pos_encoding=None, training=False):
        """Feed source through every encoder layer in order.

        The same mask, padding mask, positional encoding and training flag
        are handed to each layer. The result goes through self.norm when one
        was supplied.
        """
        hidden = source

        for encoder_layer in self.encoder_layers:
            hidden = encoder_layer(
                hidden,
                source_mask=mask,
                source_key_padding_mask=source_key_padding_mask,
                pos_encoding=pos_encoding,
                training=training,
            )

        if self.norm:
            hidden = self.norm(hidden)

        return hidden

## Transformer's Decoder

In [71]:
class TransformerDecoder(tf.keras.Model):
    """
    A stack of DecoderLayer instances applied one after another.

    Args:
        model_dim, num_heads, dim_feedforward, dropout, activation,
        normalize_before: forwarded to every DecoderLayer
        norm: optional layer applied to the decoder output(s)
        num_decoder_layers: number of stacked DecoderLayer instances
        return_intermediate: when True (default) the output of every layer
            is returned, stacked on a new leading axis; when False only the
            last layer's output is returned. The default is True because
            TransformerBlock and DetectionTransformer in this notebook index
            the result with a leading per-layer axis.
    """

    def __init__(self, model_dim=512, num_heads=8, dim_feedforward=2048,
                dropout=0.1, activation='relu', normalize_before=False, norm=None,
                num_decoder_layers=6, return_intermediate=True, **kwargs):
        super().__init__(**kwargs)

        self.decoder_layers = [DecoderLayer(model_dim, num_heads, dim_feedforward,
                                          dropout, activation, normalize_before,
                                          name='layer_%d'%i)
                             for i in range(num_decoder_layers)]
        self.norm = norm
        self.return_intermediate = return_intermediate

    def call(self, target, memory, target_mask=None, memory_mask=None,
            target_key_padding_mask=None, memory_key_padding_mask=None,
            pos_encoding=None, query_encoding=None, training=False):
        """
        Run target through every decoder layer, attending to memory.

        Returns:
            With return_intermediate=True, a tensor stacking each layer's
            (normalized, when self.norm is set) output on axis 0. Otherwise
            the last layer's output, normalized when self.norm is set.
        """
        x = target
        intermediate = []

        for layer in self.decoder_layers:
            x = layer(x, memory, target_mask=target_mask,
                     memory_mask=memory_mask,
                     target_key_padding_mask=target_key_padding_mask,
                     memory_key_padding_mask=memory_key_padding_mask,
                     pos_encoding=pos_encoding,
                     query_encoding=query_encoding,
                     training=training)  # was missing: dropout never saw the flag

            if self.return_intermediate:
                # Only the recorded copy is normalized; the next layer
                # receives the un-normalized x.
                intermediate.append(self.norm(x) if self.norm is not None else x)

        # The return must sit AFTER the loop; returning inside it stopped
        # the decoder after its first layer.
        if self.return_intermediate:
            return tf.stack(intermediate, axis=0)

        if self.norm is not None:
            x = self.norm(x)

        return x

## Transformer Block

In [72]:
class TransformerBlock(tf.keras.Model):
    """
    Encoder-decoder transformer operating on a 2-D feature map.

    The feature map is flattened to a sequence, encoded, and then decoded
    against a set of learned query encodings.
    """

    def __init__(self, model_dim=512, num_heads=8, num_encoder_layers=6,
                num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                activation='relu', normalize_before=False, **kwargs):
        # Keyword arguments (e.g. name=...) must be unpacked with ** ;
        # *kwargs passed the dict KEYS as positional arguments.
        super().__init__(**kwargs)

        self.model_dim = model_dim

        self.num_heads = num_heads

        encoder_norm = LayerNormalization(epsilon=1e-5, name='norm_pre') if normalize_before else None
        self.encoder = TransformerEncoder(model_dim, num_heads, dim_feedforward,
                                         dropout, activation, normalize_before, encoder_norm,
                                         num_encoder_layers, name='encoder')
        decoder_norm = LayerNormalization(epsilon=1e-5, name='norm')
        self.decoder = TransformerDecoder(model_dim, num_heads, dim_feedforward,
                                         dropout, activation, normalize_before, decoder_norm,
                                         num_decoder_layers, name='decoder')

    # `call` must be a method of the class; it was previously indented inside
    # __init__ and therefore never bound to the model.
    def call(self, source, mask, query_encoding, pos_encoding, training=False):
        """
        Args:
            source: feature map reshaped here as [batch, rows, cols, model_dim]
            mask: per-pixel mask reshaped here to [batch, rows * cols] and
                used as the key padding mask of encoder and decoder
            query_encoding: query embeddings indexed as [num_queries, model_dim]
            pos_encoding: positional encoding with the same layout as source
            training: forwarded to encoder and decoder
        Returns:
            (hs, memory): hs is the decoder output arranged as
            [layers, batch, num_queries, model_dim]; memory is the encoder
            output reshaped back to [batch, rows, cols, model_dim].
        """
        batch_size, rows, cols = [tf.shape(source)[i] for i in range(3)]
        source = tf.reshape(source, [batch_size, -1, self.model_dim])
        source = tf.transpose(source, [1,0,2])

        pos_encoding = tf.reshape(pos_encoding, [batch_size, -1, self.model_dim])
        # Transpose the positional encoding itself (it used to be overwritten
        # with a transposed copy of `source`).
        pos_encoding = tf.transpose(pos_encoding, [1,0,2])

        query_encoding = tf.expand_dims(query_encoding, axis=1)
        query_encoding = tf.tile(query_encoding, [1, batch_size, 1])

        mask = tf.reshape(mask, [batch_size, -1])

        target = tf.zeros_like(query_encoding)
        memory = self.encoder(source, source_key_padding_mask=mask,
                             pos_encoding=pos_encoding, training=training)
        # The decoder starts from the zero-initialised queries, not from the
        # encoder input.
        hs = self.decoder(target, memory, memory_key_padding_mask=mask,
                         pos_encoding=pos_encoding, query_encoding=query_encoding,
                         training=training)

        # Tolerate a decoder that returns only its last layer ([Q, B, D]) by
        # adding the leading per-layer axis the transpose below expects.
        if hs.shape.rank == 3:
            hs = hs[tf.newaxis]
        hs = tf.transpose(hs, [0,2,1,3])
        memory = tf.transpose(memory, [1,0,2])
        memory = tf.reshape(memory, [batch_size, rows, cols, self.model_dim])

        return hs, memory

## Position Embeddings

In [73]:
class PositionEmbedding(tf.keras.Model):
    """
    Sine/cosine 2-D positional embedding computed from a mask.

    Args:
        num_pos_features: number of features produced per spatial axis; the
            output has 2 * num_pos_features channels (y part, then x part)
        temperature: base of the geometric frequency progression
        normalize: when True, positions are divided by the last cumulative
            value along their axis and multiplied by `scale`
        scale: multiplier used when normalizing; defaults to 2 * pi
        eps: added to the denominator when normalizing
    Raises:
        ValueError: if `scale` is given while `normalize` is False
    """

    def __init__(self, num_pos_features=64, temperature=10000,
                normalize=False, scale=None, eps=1e-6, **kwargs):
        super().__init__(**kwargs)

        self.num_pos_features = num_pos_features
        self.temperature = temperature
        self.normalize = normalize

        if scale is not None and normalize is False:
            raise ValueError('normalize should be True if scale is passed')
        if scale is None:
            scale = 2 * np.pi
        self.scale = scale
        self.eps = eps

    def call(self, mask):
        """
        Args:
            mask: tensor indexed as [batch, rows, cols]; presumably True
                marks padded pixels -- TODO confirm against the data loader
        Returns:
            Float tensor of shape [batch, rows, cols, 2 * num_pos_features].
        """
        # Logical NOT of the mask (arithmetic negation is not defined for
        # boolean tensors): 1.0 where the mask is False, 0.0 elsewhere.
        not_mask = tf.cast(tf.logical_not(tf.cast(mask, tf.bool)), tf.float32)
        y_embed = tf.math.cumsum(not_mask, axis=1)
        x_embed = tf.math.cumsum(not_mask, axis=2)

        if self.normalize:
            # Slice with -1: (not -1) to keep the reduced axis, so the last
            # cumulative value broadcasts along the axis it was taken from.
            y_embed = y_embed / (y_embed[:, -1:, :] + self.eps) * self.scale
            x_embed = x_embed / (x_embed[:, :, -1:] + self.eps) * self.scale

        dim_t = tf.range(self.num_pos_features, dtype=tf.float32)
        dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_features)

        pos_x = x_embed[..., tf.newaxis] / dim_t
        pos_y = y_embed[..., tf.newaxis] / dim_t

        # Even feature indices go through sin, odd ones through cos; stacking
        # on a new last axis and flattening interleaves them.
        pos_x = tf.stack([tf.math.sin(pos_x[..., 0::2]),
                         tf.math.cos(pos_x[..., 1::2])], axis=4)
        pos_y = tf.stack([tf.math.sin(pos_y[..., 0::2]),
                         tf.math.cos(pos_y[..., 1::2])], axis=4)

        # Keep the first three (dynamic) dimensions and flatten the rest.
        shape = [tf.shape(pos_x)[i] for i in range(3)] + [-1]
        pos_x = tf.reshape(pos_x, shape)
        pos_y = tf.reshape(pos_y, shape)

        pos_emb = tf.concat([pos_y, pos_x], axis=3)
        return pos_emb

## Compiling the parts of Transformer

In [5]:
class FixedEmbedding(tf.keras.layers.Layer):
    """Layer that ignores its input and returns a single trainable weight."""

    def __init__(self, embed_shape, **kwargs):
        super().__init__(**kwargs)
        # Shape of the weight created in build().
        self.embed_shape = embed_shape

    def build(self, input_shape):
        # input_shape is unused: the weight shape is fixed at construction.
        glorot = tf.keras.initializers.GlorotUniform()
        self.w = self.add_weight(
            name='kernel',
            shape=self.embed_shape,
            initializer=glorot,
            trainable=True,
        )

    def call(self, x=None):
        # The argument exists only to satisfy the Layer call signature.
        return self.w
class DetectionTransformer(tf.keras.Model):
    """
    Detection model: backbone -> 1x1 projection -> transformer -> class and
    box heads.

    Args:
        num_classes: size of the classification head output
        num_queries: number of learned object queries
        backbone: callable mapping an image batch to a feature map; required
            before the model can be called
        pos_encoder: optional positional encoder applied to the downsampled
            masks; a normalized PositionEmbedding is built when omitted
        transformer: optional transformer exposing `model_dim`; a
            TransformerBlock is built when omitted
        num_encoder_layers / num_decoder_layers: depth of the default
            TransformerBlock (ignored when `transformer` is given)
    """

    def __init__(self, num_classes=92, num_queries=100,
                backbone=None,
                pos_encoder=None,
                transformer=None,
                num_encoder_layers=6,
                num_decoder_layers=6,
                **kwargs):
        super().__init__(**kwargs)

        self.num_queries = num_queries
        # Honour the injected components; they used to be silently dropped
        # (self.backbone was hard-coded to None).
        self.backbone = backbone
        if transformer is not None:
            self.transformer = transformer
        else:
            self.transformer = TransformerBlock(num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers,
                                          name='transformer')

        self.model_dim = self.transformer.model_dim

        if pos_encoder is not None:
            self.pos_encoder = pos_encoder
        else:
            self.pos_encoder = PositionEmbedding(
                num_pos_features = self.model_dim // 2, normalize=True)

        self.input_proj = tf.keras.layers.Conv2D(self.model_dim, kernel_size=1,
                                                name='input_proj')

        self.query_embed = FixedEmbedding((num_queries, self.model_dim),
                                                     name='query_embed')
        self.class_embed = Dense(num_classes, name='classes_embed')
        self.bbox_embed_linear1 = Dense(self.model_dim, name='bbox_embed_0')
        self.bbox_embed_linear2 = Dense(self.model_dim, name='bbox_embed_1')
        self.bbox_embed_linear3 = Dense(4, name='bbox_embed_2')

        self.activation = tf.keras.layers.ReLU()

    def downsample_masks(self, mask, x):
        """Resize `mask` ([batch, rows, cols]) to the spatial size of `x`.

        Nearest-neighbour resizing is used; the result is cast to bool.
        """
        masks = tf.cast(mask, tf.int32)
        masks = tf.expand_dims(masks, -1)  # the resize op needs a channel axis
        masks = tf.compat.v1.image.resize_nearest_neighbor(masks,
                                                           tf.shape(x)[1:3],
                                                           align_corners=False,
                                                           half_pixel_centers=False)
        masks = tf.squeeze(masks, -1)
        masks = tf.cast(masks, tf.bool)
        return masks

    def post_process(self, output):
        """Turn raw head outputs into scores, labels and corner boxes.

        Args:
            output: dict with 'pred_logits' and 'pred_boxes' as built by call()
        Returns:
            dict with 'scores', 'labels' and 'boxes' ([xmin, ymin, xmax, ymax]).
        """
        logits = output['pred_logits']
        boxes = output['pred_boxes']
        # NOTE(review): the last class is dropped on the assumption that it
        # is the DETR-style "no object" class -- confirm for this dataset.
        probs = tf.nn.softmax(logits, axis=-1)[..., :-1]
        scores = tf.reduce_max(probs, axis=-1)
        labels = tf.argmax(probs, axis=-1)
        # Boxes are treated as [xc, yc, w, h], the format hungarian_matching
        # in this notebook converts predictions from.
        centers, sizes = boxes[..., :2], boxes[..., 2:]
        boxes = tf.concat([centers - sizes / 2, centers + sizes / 2], axis=-1)
        return {'scores': scores, 'labels': labels, 'boxes': boxes}

    def call(self, inp, training=False, post_process=False):
        """
        Args:
            inp: pair (images, masks)
            training: forwarded to the backbone and the transformer
            post_process: when True the output goes through post_process()
        Returns:
            dict with 'pred_logits' and 'pred_boxes' taken from the last
            entry of the transformer's per-layer output, or the
            post-processed dict.
        Raises:
            ValueError: if the model was built without a backbone
        """
        if self.backbone is None:
            raise ValueError(
                'DetectionTransformer needs a backbone; pass backbone=... '
                'to the constructor')

        x, masks = inp
        x = self.backbone(x, training=training)
        masks = self.downsample_masks(masks, x)

        pos_encoding = self.pos_encoder(masks)

        hs = self.transformer(self.input_proj(x), masks, self.query_embed(None),
                             pos_encoding, training=training)[0]

        outputs_class = self.class_embed(hs)

        box_ftmps = self.activation(self.bbox_embed_linear1(hs))
        box_ftmps = self.activation(self.bbox_embed_linear2(box_ftmps))
        outputs_coord = tf.sigmoid(self.bbox_embed_linear3(box_ftmps))

        # Index -1 selects the last entry along the leading (per-layer) axis.
        output = {'pred_logits' : outputs_class[-1],
                  'pred_boxes' : outputs_coord[-1]}

        if post_process:
            output = self.post_process(output)
        return output

    def build(self, input_shape=None, **kwargs):
        """Build with a default (images, masks) input signature when none is given."""
        if input_shape is None:
            input_shape = [(None, None, None, 3), (None, None, None)]
        super().build(input_shape, **kwargs)

NameError: name 'tf' is not defined

In [1]:
# Names of the layers most recently registered through add_nlayers().
nlayers = []
def add_nlayers(layers):
    """Record the names of `layers` in the module-level `nlayers` list.

    Args:
        layers: iterable of objects exposing a `.name` attribute
    """
    # Slice-assign so the shared list object is updated in place. The
    # previous body only rebound its own parameter and read an undefined
    # `layers` name, so the module-level list never changed.
    nlayers[:] = [l.name for l in layers]

In [2]:
def add_heads_nlayers(DetectionTransformer, nb_classes):
    """
    Wrap a transformer model with fresh classification and box heads.

    Args:
        DetectionTransformer: callable applied to a (None, None, 3) image
            input; its result is indexed along axis 0 below, so it is
            presumably a per-decoder-layer stack -- TODO confirm
        nb_classes: output size of the new classification head
    Returns:
        tf.keras.Model mapping an image to a dict with 'pred_logits',
        'pred_boxes' and an 'aux' list of the same two keys for entries
        0..4 of the transformer output.
    """
    image_input = tf.keras.Input((None, None, 3))
    class_layer = tf.keras.layers.Dense(nb_classes, name="class_layer")
    # Sequential lives in tf.keras (tf.keras.layers.Sequential does not
    # exist). The layer name is kept exactly as it was.
    position_layer = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation = "relu"),
            tf.keras.layers.Dense(256, activation = "relu"),
            tf.keras.layers.Dense(4, activation = "sigmoid"),
        ], name="postion_layer")

    # Register the new heads' names.
    add_nlayers([class_layer, position_layer])

    transformer_output = DetectionTransformer(image_input)
    class_preds = class_layer(transformer_output)
    position_preds = position_layer(transformer_output)

    # Main output: last entry along axis 0. The key is 'pred_logits' (not
    # 'preds_logits') to match the 'aux' entries below.
    outputs = {'pred_logits' : class_preds[-1],
               'pred_boxes' : position_preds[-1]}
    # assumes the transformer output has at least 5 entries on axis 0
    outputs["aux"] = [{"pred_logits" : class_preds[i],
                       "pred_boxes" : position_preds[i]}
                      for i in range(0,5)]

    n_DetectionTransformer = tf.keras.Model(image_input, outputs,
                                            name="transformer_fine_tuning")

    return n_DetectionTransformer

## Hungarian Matching

In [76]:
def np_tf_linear_sum_assignment(matrix):
    """Solve the assignment problem on `matrix` with SciPy.

    Returns:
        [row_indices, col_indices, row_selector, col_selector]: the matched
        row/column indices (int64) and boolean vectors flagging which rows
        and which columns were matched.
    """
    from scipy.optimize import linear_sum_assignment

    row_indices, col_indices = linear_sum_assignment(matrix)

    row_selector = np.zeros(matrix.shape[0], dtype=bool)
    row_selector[row_indices] = True
    col_selector = np.zeros(matrix.shape[1], dtype=bool)
    col_selector[col_indices] = True

    # tf.numpy_function below declares int64 index outputs.
    return [row_indices.astype(np.int64), col_indices.astype(np.int64),
            row_selector, col_selector]


def hungarian_matching(t_bbox, t_class, p_bbox, p_class, fcost_class=1,
                      fcost_bbox=5, fcost_giou=2, slices_pred=True):
    """
    Match predicted boxes to target boxes with the Hungarian algorithm.

    The cost matrix has one row per prediction and one column per target and
    combines a class cost, an L1 box cost and a generalized-IoU cost.

    Args:
        t_bbox: target boxes in [xc, yc, w, h]; when slices_pred is True,
            t_bbox[0][0] is read as the number of valid targets and the
            valid rows start at index 1
        t_class: target classes, sliced like t_bbox when slices_pred is True
        p_bbox: predicted boxes in [xc, yc, w, h], indexed as (P, 4)
        p_class: predicted class logits, indexed as (P, num_classes)
        fcost_class / fcost_bbox / fcost_giou: weights of the three costs
        slices_pred: whether to slice the targets as described above
    Returns:
        (selectors[1], selectors[0], selectors[3], selectors[2], t_bbox,
        t_class), where `selectors` is the output of
        np_tf_linear_sum_assignment on the cost matrix: with the helper
        above, column (target) indices come first, then row (prediction)
        indices, then the column and row selectors, then the (possibly
        sliced) targets.
    """
    # The flag is spelled `slices_pred` in the signature; the body used to
    # read an undefined `slice_preds`.
    if slices_pred:
        num_targets = tf.cast(t_bbox[0][0], tf.int32)
        t_bbox = tf.slice(t_bbox, [1, 0], [num_targets, 4])
        t_class = tf.slice(t_class, [1, 0], [num_targets, -1])
        t_class = tf.squeeze(t_class, axis=-1)

    p_bbox_xy = xcycwh_to_xy_min_xy_max(p_bbox)
    t_bbox_xy = xcycwh_to_xy_min_xy_max(t_bbox)

    softmax = tf.nn.softmax(p_class)

    # Class cost: minus the predicted probability of each target's class.
    # tf.gather needs integer indices, hence the cast.
    cost_class = -tf.gather(softmax, tf.cast(t_class, tf.int32), axis=1)

    # L1 cost: distance between each prediction/target pair (a difference,
    # not a product).
    _p_bbox, _t_bbox = merge(p_bbox, t_bbox)
    cost_bbox = tf.norm(_p_bbox - _t_bbox, ord=1, axis=-1)

    # Generalized IoU: IoU minus the share of the enclosing box that is not
    # covered by the union.
    iou, union = overlap(p_bbox_xy, t_bbox_xy, return_union=True)
    _p_bbox_xy, _t_bbox_xy = merge(p_bbox_xy, t_bbox_xy)
    top_left = tf.math.minimum(_p_bbox_xy[:, :, :2], _t_bbox_xy[:, :, :2])
    bottom_right = tf.math.maximum(_p_bbox_xy[:, :, 2:], _t_bbox_xy[:, :, 2:])
    enclosing_size = tf.nn.relu(bottom_right - top_left)
    area = enclosing_size[:,:,0]*enclosing_size[:,:,1]
    cost_giou = -(iou - (area-union) / area)

    cost_matrix = fcost_bbox * cost_bbox + fcost_class * cost_class + fcost_giou * cost_giou

    selectors = tf.numpy_function(np_tf_linear_sum_assignment, [cost_matrix],
                                 [tf.int64, tf.int64, tf.bool, tf.bool])
    # Local names and return order are kept exactly as before so existing
    # callers keep unpacking the same positions.
    target_indices = selectors[0]
    pred_indices = selectors[1]
    target_selector = selectors[2]
    pred_selector = selectors[3]

    return pred_indices, target_indices, pred_selector, target_selector, t_bbox, t_class


## Preparing for training

In [4]:
# Instantiate the detector with six encoder and six decoder layers.
detr = DetectionTransformer(
    num_encoder_layers=6,
    num_decoder_layers=6,
)

NameError: name 'DetectionTransformer' is not defined