In [None]:
# Py Imports
from dataclasses import dataclass

# DL Imports
import tensorflow as tf
import keras_cv as nn_cv
# from tensorflow_addons.optimizers import MultiOptimizer
nn = tf.keras
tnp = tf.experimental.numpy

# Project Imports
from dataset import load_pascal_voc
from backbone import BackBone
from detr_transformer import Transformer
from feedforward import FFN
from matcher import Matcher
from loss import HungarianLoss

# PASCAL VOC 2007

PASCAL VOC 2007 is a dataset for object detection and image recognition. Its twenty object classes, grouped by category, are:

    
```python
classes =  {
  Person: [person],
  Animal: [bird, cat, cow, dog, horse, sheep],
  Vehicle: [aeroplane, bicycle, boat, bus, car, motorbike, train],
  Indoor: [bottle, chair, dining table, potted plant, sofa, tv/monitor]
  }
```

We add an extra class, `no_object`, at index 0.

In [None]:
# Display names for every label; a name's position in the list is its integer id.
# Position 0 is the extra "no_object" entry, followed by the twenty VOC classes.
class_ids = [
    "no_object",
    "Aeroplane", "Bicycle", "Bird", "Boat", "Bottle",
    "Bus", "Car", "Cat", "Chair", "Cow",
    "Dining Table", "Dog", "Horse", "Motorbike", "Person",
    "Potted Plant", "Sheep", "Sofa", "Train", "Tv/monitor",
]

print("number of classes is", len(class_ids))

# id -> name lookup table, used when drawing bounding-box labels.
class_mapping = dict(enumerate(class_ids))
print(class_mapping)

## DETR and Train Args

In [None]:
class DETR_ARGS:
    """Namespace of model and training hyperparameters.

    The values are read directly as class attributes (e.g. ``DETR_ARGS.N``);
    the class is never instantiated in this notebook.
    """

    # ---- detection targets ----
    N: int = 42  # max number of bboxes which can be present in an image
    n_classes: int = len(class_ids)

    # ---- transformer ----
    # [encoder: max (H*W) over images, decoder: max number of objects over images]
    max_length: list = [256, N]
    d_model: int = 1024
    n_heads: int = 8
    n_enc_layers: int = 2
    n_dec_layers: int = 3
    dropout_rate: float = 0.1

    # ---- optimisation ----
    backbone_lr: float = 1e-5
    weight_decay: float = 1e-4
    transformer_lr: float = 1e-4
    epochs: int = 1  # to be changed
    batch_size: int = 32

# Dataset

In [None]:
def visualize_dataset(inputs, value_range, rows, cols, bounding_box_format):
    """Plot the first batch of a dataset as a gallery with its ground-truth boxes.

    Args:
        inputs: Batched dataset whose elements are indexed as
            ``element[0]`` (images) and ``element[1]`` (a pair of
            classes and boxes).
        value_range: Pixel value range of the images, forwarded to the plotter.
        rows: Number of gallery rows.
        cols: Number of gallery columns.
        bounding_box_format: Box format string forwarded to the plotter.
    """
    first_batch = next(iter(inputs.take(1)))
    images = first_batch[0]
    labels = first_batch[1]
    ground_truth = {"classes": labels[0], "boxes": labels[1]}

    nn_cv.visualization.plot_bounding_box_gallery(
        images,
        value_range=value_range,
        rows=rows,
        cols=cols,
        y_true=ground_truth,
        scale=5,
        font_scale=0.7,
        bounding_box_format=bounding_box_format,
        class_mapping=class_mapping,  # id -> name table defined earlier in the notebook
    )

# Load the three VOC 2007 splits with boxes in "xywh" format and report each size.
# NOTE(review): `train_ds` is built from the "test" split and `test_ds` from the
# "train" split. This may be deliberate (e.g. to train on the larger split) or a
# mix-up; confirm before relying on the variable names.
train_ds = load_pascal_voc(split="test", dataset="voc/2007", bounding_box_format="xywh")
print(tf.data.experimental.cardinality(train_ds))

val_ds = load_pascal_voc(split="validation", dataset="voc/2007", bounding_box_format="xywh")
print(tf.data.experimental.cardinality(val_ds))

test_ds = load_pascal_voc(split="train", dataset="voc/2007", bounding_box_format="xywh")
print(tf.data.experimental.cardinality(test_ds))

In [None]:
def preprocess(x):
    """Resize one image to 500x500 and bring its labels to exactly ``DETR_ARGS.N`` rows.

    Labels are truncated to the first N entries and then right-padded with
    zeros. Images with no annotated boxes therefore yield N zero rows; the
    previous pad-then-slice form (``[N:-len(...)]``) produced an empty tensor
    in that case because ``-0`` slices to nothing.

    Args:
        x: Dataset element with an ``"images"`` entry and a
            ``"bounding_boxes"`` entry holding ``"classes"`` and ``"boxes"``.

    Returns:
        ``(image, (classes, boxes))`` where ``classes`` has N entries and
        ``boxes`` has N rows.
    """
    N = DETR_ARGS.N

    # NOTE(review): only the image goes through Resizing, so the box coordinates
    # are not rescaled with it. Confirm this is intended for images whose longer
    # side is not already 500 px.
    X = nn_cv.layers.Resizing(500, 500, bounding_box_format="xywh", pad_to_aspect_ratio=True)(x["images"])

    # Keep at most N labels, then fill the remainder with zeros at the end.
    classes = x["bounding_boxes"]["classes"][:N]
    boxes = x["bounding_boxes"]["boxes"][:N]
    y = (
        tf.pad(classes, [[0, N - tf.shape(classes)[0]]]),
        tf.pad(boxes, [[0, N - tf.shape(boxes)[0]], [0, 0]]),
    )
    return X, y

# Run the same resize-and-pad step over every split.
train_ds, val_ds, test_ds = (
    split.map(preprocess) for split in (train_ds, val_ds, test_ds)
)

In [None]:
# 2x2 gallery built from the first batch of four `train_ds` samples.
visualize_dataset(
    train_ds.ragged_batch(4, drop_remainder=True),
    value_range=(0, 255),
    rows=2,
    cols=2,
    bounding_box_format="xywh",
)

In [None]:
# 2x2 gallery built from the first batch of four `val_ds` samples.
visualize_dataset(
    val_ds.ragged_batch(4, drop_remainder=True),
    value_range=(0, 255),
    rows=2,
    cols=2,
    bounding_box_format="xywh",
)

In [None]:
# 2x2 gallery built from the first batch of four `test_ds` samples.
visualize_dataset(
    test_ds.ragged_batch(4, drop_remainder=True),
    value_range=(0, 255),
    rows=2,
    cols=2,
    bounding_box_format="xywh",
)

In [None]:
def normalize_bbox(x, y, image_size=500):
    """Divide box coordinates by the image side length.

    Args:
        x: Image; returned unchanged.
        y: Pair ``(classes, boxes)``; only ``boxes`` is modified.
        image_size: Value the boxes are divided by. Defaults to 500, the side
            length the images are resized to in this notebook.

    Returns:
        ``(x, (classes, boxes / image_size))``.
    """
    classes, boxes = y[0], y[1]
    return x, (classes, boxes / image_size)

def denormalize_bbox(x, y, image_size=500):
    """Multiply box coordinates by the image side length.

    Args:
        x: Image; returned unchanged.
        y: Pair ``(classes, boxes)``; only ``boxes`` is modified.
        image_size: Value the boxes are multiplied by. Defaults to 500, the
            side length the images are resized to in this notebook.

    Returns:
        ``(x, (classes, boxes * image_size))``.
    """
    classes, boxes = y[0], y[1]
    return x, (classes, boxes * image_size)

In [None]:
# Rescale the boxes of every split; each element is passed to
# normalize_bbox as (image, (classes, boxes)).
train_ds, val_ds, test_ds = (
    split.map(normalize_bbox) for split in (train_ds, val_ds, test_ds)
)

## Model

In [None]:
class DETR(nn.Model):
    """Detection model chaining a backbone, a transformer and a prediction head.

    All hyperparameters are read from ``DETR_ARGS`` at construction time.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.backbone = BackBone(
            DETR_ARGS.d_model,
        )
        self.transformer = Transformer(
            DETR_ARGS.d_model,
            DETR_ARGS.n_heads,
            DETR_ARGS.n_enc_layers,
            DETR_ARGS.n_dec_layers,
            DETR_ARGS.dropout_rate,
            DETR_ARGS.max_length,
        )
        self.ffn = FFN(
            DETR_ARGS.d_model,
            DETR_ARGS.n_classes,
        )
        # Built once here rather than on every inference call. Declared after
        # the other sub-layers so their creation order is unchanged.
        self.inference_resize = nn_cv.layers.Resizing(
            500, 500, bounding_box_format="xywh", pad_to_aspect_ratio=True
        )

    def call(self, inputs, training=False):
        """Run the forward pass.

        Args:
            inputs: Batch of images.
            training: When False the inputs are first resized to 500x500 here;
                when True they are passed to the backbone as given.

        Returns:
            ``(class_prob, bbox_pred)`` as produced by the head; per the
            original author's note the shapes are
            ``((B, N, n_classes), (B, N, 4))``.
        """
        if not training:
            # Inference inputs have not been through the dataset preprocessing,
            # so they are resized to the training resolution first.
            inputs = self.inference_resize(inputs)
        x = self.backbone(inputs, training=training)
        x = self.transformer(x, training=training)
        class_prob, bbox_pred = self.ffn(x, training=training)
        return (class_prob, bbox_pred)  # ((B, N, n_classes), (B, N, 4))

In [None]:
# Instantiate the detector; DETR.__init__ reads its hyperparameters from DETR_ARGS.
model = DETR()

In [None]:
# Disabled alternative, kept for reference: separate AdamW instances for
# model.layers[0] (lr 1e-5) and model.layers[1:] (lr 1e-4), combined through
# tensorflow_addons' MultiOptimizer.
# optimizers_layers = [
#     (tf.optimizers.AdamW(learning_rate=1e-5, weight_decay=DETR_ARGS.weight_decay), model.layers[0]),
#     (tf.optimizers.AdamW(learning_rate=1e-4, weight_decay=DETR_ARGS.weight_decay), model.layers[1:])]
# optimizer = MultiOptimizer(optimizers_layers)

# Active setup: one AdamW instance with a single learning rate for all weights.
optimizer = tf.optimizers.AdamW(
    learning_rate=5.5e-5,
    weight_decay=DETR_ARGS.weight_decay,
)

# Loss and matcher objects used by the training step.
loss_fn = HungarianLoss(name="hugarian_loss")
matcher = Matcher()

In [None]:
# Only the per-batch computation is compiled. Dataset iteration and Python-side
# logging stay eager: XLA cannot compile tf.data iteration, and formatting a
# symbolic tensor with "%d" / "%.4f" fails while tracing.
# NOTE(review): jit_compile is not enabled because this notebook does not show
# whether Matcher / HungarianLoss are XLA-compatible; re-enable it here if so.
@tf.function
def _train_on_batch(x_train, y_train):
    """Run one forward/backward pass on a single batch and return its loss."""
    with tf.GradientTape() as tape:
        y_pred = model(x_train, training=True)
        # The matcher output replaces the raw predictions before the loss is
        # computed; its exact semantics live in the project's matcher module.
        y_pred = matcher(y_train, y_pred)
        loss = loss_fn(y_train, y_pred)
    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss


def train_step(train_ds):
    """Train for one pass over ``train_ds`` and return the mean batch loss.

    Args:
        train_ds: Iterable of ``(x_train, y_train)`` batches.

    Returns:
        Scalar tensor: the mean of the per-batch losses seen in this pass.
    """
    loss_values = []
    for step, (x_train, y_train) in enumerate(train_ds):
        loss = _train_on_batch(x_train, y_train)

        loss_values.append(loss)
        print("Training loss (for one batch) at step %d: %.4f" % (step, float(loss)))
        print("Seen so far: %s samples" % ((step + 1) * DETR_ARGS.batch_size))
    return tf.reduce_mean(loss_values)

In [None]:
# The dataset is still unbatched at this point (preprocessing works on single
# images), while the training step and its "Seen so far" log assume batches of
# DETR_ARGS.batch_size. Batch it once here, under a new name so that re-running
# this cell does not batch an already-batched dataset.
train_batches = train_ds.batch(DETR_ARGS.batch_size)

for epoch in range(1, DETR_ARGS.epochs + 1):
    print(f"Epoch {epoch}/{DETR_ARGS.epochs}")
    loss = train_step(train_batches)
    print(f"Loss at Epoch {epoch} : {loss}\n")
    # ".keras" denotes a whole-model archive; weight-only files use the
    # ".weights.h5" suffix, which newer Keras versions require for save_weights.
    model.save_weights(f'detr_weights_epoch{epoch}.weights.h5')