# Downloads and Imports

In [None]:
# Install KerasCV and PyCOCOTools metrics
! pip install -q pycocotools git+https://github.com/keras-team/keras-cv@v0.6.4

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
print("tensorflow:", tf.__version__)
import keras_cv
from keras_cv import bounding_box
from keras_cv import visualization
from keras.callbacks import LambdaCallback
from tensorflow.keras.models import Model

print("keras_cv:", keras_cv.__version__)

# Report the GPU device name TensorFlow found, if any.
device_name = tf.test.gpu_device_name()
print(f"Found GPU at: {device_name}" if "GPU" in device_name else "GPU not found.")

# Utils

In [None]:
def visualize_dataset(inputs, value_range, rows, cols, bounding_box_format):
    """Plot the first batch of `inputs` together with its ground-truth boxes.

    Args:
        inputs: Dataset whose elements are dicts with "images" and
            "bounding_boxes" entries.
        value_range: Pixel value range, forwarded to the gallery plotter.
        rows: Number of gallery rows.
        cols: Number of gallery columns.
        bounding_box_format: Box format, forwarded to the gallery plotter.

    Class names come from the module-level `class_mapping`.
    """
    first_batch = next(iter(inputs.take(1)))
    gallery_options = dict(
        value_range=value_range,
        rows=rows,
        cols=cols,
        y_true=first_batch["bounding_boxes"],
        scale=5,
        font_scale=0.7,
        bounding_box_format=bounding_box_format,
        class_mapping=class_mapping,
    )
    visualization.plot_bounding_box_gallery(first_batch["images"], **gallery_options)

def visualize_detections(model, dataset, bounding_box_format, rows=2, cols=2):
    """Plot model predictions next to ground truth for the first batch of `dataset`.

    Args:
        model: Object exposing `predict(images)`.
        dataset: Dataset whose elements are `(images, y_true)` tuples.
        bounding_box_format: Box format, forwarded to the gallery plotter.
        rows: Number of gallery rows (default 2).
        cols: Number of gallery columns (default 2).

    Class names come from the module-level `class_mapping`; pixel values are
    plotted with a fixed value range of (0, 255).
    """
    first_batch = next(iter(dataset.take(1)))
    images, y_true = first_batch
    # Predictions are converted to ragged form before plotting.
    predictions = bounding_box.to_ragged(model.predict(images))
    visualization.plot_bounding_box_gallery(
        images,
        value_range=(0, 255),
        bounding_box_format=bounding_box_format,
        y_true=y_true,
        y_pred=predictions,
        scale=4,
        rows=rows,
        cols=cols,
        show=True,
        font_scale=0.7,
        class_mapping=class_mapping,
    )
    
def dict_to_tuple(inputs):
    """Split a batch dict into an `(images, bounding_boxes)` tuple.

    The "bounding_boxes" entry is passed through `bounding_box.to_dense`
    with `max_boxes=32`.
    """
    images = inputs["images"]
    dense_boxes = bounding_box.to_dense(inputs["bounding_boxes"], max_boxes=32)
    return images, dense_boxes

In [None]:
# Class id -> human-readable label. Ids are assigned in list order, starting at 1.
class_mapping = dict(
    enumerate(
        [
            "Aluminium foil",
            "Bottle",
            "Bottle cap",
            "Broken glass",
            "Can",
            "Carton",
            "Cigarette",
            "Cup",
            "Lid",
            "Other litter",
            "Other plastic",
            "Paper",
            "Plastic bag - wrapper",
            "Plastic container",
            "Pop tab",
            "Straw",
            "Styrofoam piece",
            "Unlabeled litter",
        ],
        start=1,
    )
)

# Inspecting Data

In [None]:
# Locations of the TFRecord files; the validation split is read from the
# "Test_litter" file.
train_tfrecord_file = "/kaggle/input/taco-tfrecord/Train_litter.tfrecord"
val_tfrecord_file = "/kaggle/input/taco-tfrecord/Test_litter.tfrecord"

# Wrap each file in a dataset of raw serialized records.
train_dataset = tf.data.TFRecordDataset(filenames=[train_tfrecord_file])
val_dataset = tf.data.TFRecordDataset(filenames=[val_tfrecord_file])

# To look at the raw data, uncomment the lines below; they print the first record.
# for record in train_dataset.take(1):
#     example = tf.train.Example()
#     example.ParseFromString(record.numpy())
#     print(example)

In [None]:
def parse_tfrecord_fn(example):
    """Parse one serialized example into the dict layout used by this notebook.

    Args:
        example: Serialized example (scalar string tensor) read from the TFRecord.

    Returns:
        A dict with:
          * "images": the decoded 3-channel JPEG image, left as uint8
            (no rescaling is applied).
          * "bounding_boxes": a dict with "classes" (the int64 labels) and
            "boxes" (a [num_boxes, 4] tensor converted from `rel_xyxy` to
            `xyxy` using the decoded image).
    """
    feature_description = {
        "image/encoded": tf.io.FixedLenFeature([], tf.string),
        # Height and width are required by the schema but not used below.
        "image/height": tf.io.FixedLenFeature([], tf.int64),
        "image/width": tf.io.FixedLenFeature([], tf.int64),
        "image/object/bbox/xmin": tf.io.VarLenFeature(tf.float32),
        "image/object/bbox/xmax": tf.io.VarLenFeature(tf.float32),
        "image/object/bbox/ymin": tf.io.VarLenFeature(tf.float32),
        "image/object/bbox/ymax": tf.io.VarLenFeature(tf.float32),
        "image/object/class/label": tf.io.VarLenFeature(tf.int64),
    }

    parsed_example = tf.io.parse_single_example(example, feature_description)

    # decode_jpeg returns uint8 pixels; they are kept as-is. The former
    # uint8 -> uint8 convert_image_dtype call was a no-op and has been dropped.
    img = tf.image.decode_jpeg(parsed_example["image/encoded"], channels=3)

    # Variable-length features are parsed as sparse tensors; densify them.
    xmin = tf.sparse.to_dense(parsed_example["image/object/bbox/xmin"])
    xmax = tf.sparse.to_dense(parsed_example["image/object/bbox/xmax"])
    ymin = tf.sparse.to_dense(parsed_example["image/object/bbox/ymin"])
    ymax = tf.sparse.to_dense(parsed_example["image/object/bbox/ymax"])
    labels = tf.sparse.to_dense(parsed_example["image/object/class/label"])

    # Stack the coordinates into a [num_boxes, 4] tensor, then convert the
    # relative coordinates to the `xyxy` format using the image.
    rel_boxes = tf.stack([xmin, ymin, xmax, ymax], axis=-1)
    boxes = keras_cv.bounding_box.convert_format(
        rel_boxes, source="rel_xyxy", target="xyxy", images=img
    )

    return {
        "images": img,
        "bounding_boxes": {
            "classes": labels,
            "boxes": boxes,
        },
    }


# Parse in parallel, like the other map() calls in this notebook; element order
# is unchanged because tf.data maps are deterministic by default.
train_dataset = train_dataset.map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# Pipeline and display constants
BATCH_SIZE = 32  # images per batch
AUTOTUNE = tf.data.AUTOTUNE  # buffer size passed to prefetch below
NUM_ROWS = 4  # gallery rows used when plotting
NUM_COLS = 8  # gallery columns used when plotting
IMG_SIZE = 416  # target height and width used by the resizing layers
BBOX_FORMAT = "xyxy"

# Batch into ragged tensors, then prefetch.
train_dataset = train_dataset.ragged_batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(buffer_size=AUTOTUNE)
val_dataset = val_dataset.ragged_batch(BATCH_SIZE)
val_dataset = val_dataset.prefetch(buffer_size=AUTOTUNE)

# Training-time augmentation: jittered resize to IMG_SIZE x IMG_SIZE, random
# flips and rotation (all given the box format), then random saturation and hue.
augmenter = keras.Sequential(
    layers=[
        keras_cv.layers.JitteredResize(
            target_size=(IMG_SIZE, IMG_SIZE),
            scale_factor=(0.8, 1.25),
            bounding_box_format=BBOX_FORMAT,
        ),
        keras_cv.layers.RandomFlip(
            mode="horizontal_and_vertical",
            bounding_box_format=BBOX_FORMAT,
        ),
        keras_cv.layers.RandomRotation(
            factor=0.25,
            bounding_box_format=BBOX_FORMAT,
        ),
        keras_cv.layers.RandomSaturation(factor=(0.4, 0.6)),
        keras_cv.layers.RandomHue(factor=0.2, value_range=[0, 255]),
    ]
)

# Apply the augmentation to every training batch.
train_dataset = train_dataset.map(augmenter, num_parallel_calls=tf.data.AUTOTUNE)

# Validation batches are only resized to IMG_SIZE x IMG_SIZE, padding to
# preserve the aspect ratio; no random augmentation is applied.
inference_resizing = keras_cv.layers.Resizing(
    height=IMG_SIZE,
    width=IMG_SIZE,
    pad_to_aspect_ratio=True,
    bounding_box_format=BBOX_FORMAT,
)

val_dataset = val_dataset.map(inference_resizing, num_parallel_calls=tf.data.AUTOTUNE)

## Visualizing Our Dataset 

### Training Data

In [None]:
# Show the first training batch with its ground-truth boxes
visualize_dataset(
    inputs=train_dataset,
    value_range=(0, 255),
    rows=NUM_ROWS,
    cols=NUM_COLS,
    bounding_box_format=BBOX_FORMAT,
)

### Validation Data

In [None]:
# Show the first validation batch with its ground-truth boxes
visualize_dataset(
    inputs=val_dataset,
    value_range=(0, 255),
    rows=NUM_ROWS,
    cols=NUM_COLS,
    bounding_box_format=BBOX_FORMAT,
)

In [None]:
# Convert the dict batches into (images, dense boxes) tuples, then prefetch as
# the LAST pipeline step so that the preceding map() stages (augmentation /
# resizing and this conversion) can run ahead of the consumer. The earlier
# prefetch sits before those maps and therefore does not cover them.
train_dataset = train_dataset.map(
    dict_to_tuple, num_parallel_calls=tf.data.AUTOTUNE
).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.map(
    dict_to_tuple, num_parallel_calls=tf.data.AUTOTUNE
).prefetch(tf.data.AUTOTUNE)

# Creating RetinaNet Model and Doing Inference

In [None]:
base_lr = 1e-4
# Including a global_clipnorm is extremely important in object detection tasks.
optimizer_Adam = tf.keras.optimizers.Adam(learning_rate=base_lr, global_clipnorm=10.0)

# COCO box metrics, configured with evaluate_freq=5 (not passed to compile() below).
coco_metrics = keras_cv.metrics.BoxCOCOMetrics(
    bounding_box_format=BBOX_FORMAT,
    evaluate_freq=5,
)

class VisualizeDetections(keras.callbacks.Callback):
    """Callback that plots detections on `val_dataset` every few epochs.

    Args:
        every_n_epochs: Plot at the end of each epoch whose 1-based number is a
            multiple of this value. Defaults to 5 (the previously hard-coded
            value), so `VisualizeDetections()` behaves as before.

    Raises:
        ValueError: If `every_n_epochs` is smaller than 1.
    """

    def __init__(self, every_n_epochs=5):
        super().__init__()
        if every_n_epochs < 1:
            raise ValueError("every_n_epochs must be >= 1")
        self.every_n_epochs = every_n_epochs

    def on_epoch_end(self, epoch, logs=None):
        """Plot detections when the current epoch number hits the interval.

        `logs` is accepted (with a default, matching the base-class signature)
        but not used.
        """
        # epoch + 1 converts the index received here into a 1-based epoch number.
        if (epoch + 1) % self.every_n_epochs == 0:
            visualize_detections(
                self.model,
                bounding_box_format=BBOX_FORMAT,
                dataset=val_dataset,
                rows=NUM_ROWS,
                cols=NUM_COLS,
            )

checkpoint_path = "best-custom-model"


def _format_metric(logs, key):
    """Return `logs[key]` formatted to four decimals, or "n/a" if it is absent."""
    value = (logs or {}).get(key)
    return "n/a" if value is None else f"{value:.4f}"


def _print_epoch_summary(epoch, logs=None):
    """Print the loss / mAP summary for one epoch.

    Entries missing from `logs` are printed as "n/a" instead of raising
    KeyError; 'MaP' and 'val_MaP' are only present when the COCO metrics are
    passed to compile(), which is currently commented out.
    """
    print(
        f"\nEpoch #{epoch+1} \n"
        + f"Loss: {_format_metric(logs, 'loss')} \n"
        + f"mAP: {_format_metric(logs, 'MaP')} \n"
        + f"Validation Loss: {_format_metric(logs, 'val_loss')} \n"
        + f"Validation mAP: {_format_metric(logs, 'val_MaP')} \n"
    )


callbacks_list = [
    # Conducting early stopping to stop after 6 epochs of non-improving validation loss
    keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=6,
    ),

    # Saving the best model (lowest validation loss), checked once per epoch
    keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        monitor="val_loss",
        mode="min",
        save_best_only=True,
        save_weights_only=False,
        save_freq='epoch',
    ),

    # Custom metrics printing after each epoch
    tf.keras.callbacks.LambdaCallback(on_epoch_end=_print_epoch_summary),

    # Visualizing results after each n epoch
    VisualizeDetections()
]

# Building a RetinaNet model from the "yolo_v8_xs_backbone_coco" preset
def create_model():
    """Create the RetinaNet detector used in this notebook.

    Returns:
        A `keras_cv.models.RetinaNet` built from the "yolo_v8_xs_backbone_coco"
        preset, using `BBOX_FORMAT` and a classification head sized to cover
        every id in `class_mapping`.
    """
    # class_mapping ids start at 1, so len(class_mapping) would leave the
    # highest id (18) outside the head's index range [0, num_classes).
    # Size the head by the largest id instead.
    # NOTE(review): assumes the dataset labels use the same ids as
    # class_mapping - confirm against the TFRecords.
    num_classes = max(class_mapping) + 1
    model = keras_cv.models.RetinaNet.from_preset(
        "yolo_v8_xs_backbone_coco",
        num_classes=num_classes,
        bounding_box_format=BBOX_FORMAT,
    )
    return model

model = create_model()

# Customizing non-max suppression of model prediction.
model.prediction_decoder = keras_cv.layers.MultiClassNonMaxSuppression(
    bounding_box_format=BBOX_FORMAT,
    from_logits=True,
    confidence_threshold=0.5,
    iou_threshold=0.5,
)

# Focal classification loss and smooth-L1 box loss; the COCO metrics are
# currently left out of compile().
model.compile(
    optimizer=optimizer_Adam,
    classification_loss="focal",
    box_loss="smoothl1",
    # metrics=[coco_metrics],
)

In [None]:
# Train for 60 epochs with per-epoch validation. The callbacks list is
# currently not passed to fit().
history = model.fit(
    train_dataset,
    epochs=60,
    validation_data=val_dataset,
    verbose=1,
    # callbacks=callbacks_list,
)

# Save the model as it is after the last epoch.
# NOTE: this uses the same path the ModelCheckpoint callback is configured
# with, so enabling callbacks_list would let this save overwrite that checkpoint.
model.save(checkpoint_path)

## Visualizing Training and Validation Loss

In [None]:
# Per-epoch loss values recorded in the fit() history
training_loss = history.history["loss"]
validation_loss = history.history["val_loss"]

# Number the epochs from 1 on the x-axis
epochs = range(1, len(training_loss) + 1)

# Draw both curves on a single set of axes
fig, ax = plt.subplots()
ax.plot(epochs, training_loss, label="Training Loss")
ax.plot(epochs, validation_loss, label="Validation Loss")
ax.set_title("Training and Validation Loss Over Epochs")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
ax.legend()
plt.show()

## Testing the Best Model

In [None]:
from keras_cv.layers import MultiClassNonMaxSuppression
from keras_cv.metrics import BoxCOCOMetrics

# Custom objects handed to load_model for deserializing the saved model.
custom_objects = {
    "MultiClassNonMaxSuppression": MultiClassNonMaxSuppression,
    "BoxCOCOMetrics": lambda bounding_box_format=BBOX_FORMAT, evaluate_freq=5: BoxCOCOMetrics(
        bounding_box_format, evaluate_freq
    ),
    # include other custom objects if there are any
}

# Load without the saved compile state; the model is compiled again below.
model = tf.keras.models.load_model(
    checkpoint_path,
    custom_objects=custom_objects,
    compile=False,
)

# Focal classification loss and smooth-L1 box loss; the COCO metrics are
# currently left out of compile().
model.compile(
    optimizer=optimizer_Adam,
    classification_loss="focal",
    box_loss="smoothl1",
    # metrics=[coco_metrics],
)

# Visualize on the test set, skipping its first batch
visualize_detections(
    model,
    dataset=val_dataset.skip(1),
    bounding_box_format=BBOX_FORMAT,
    rows=NUM_ROWS,
    cols=NUM_COLS,
)


In [None]:
import tensorflow as tf
import keras_cv
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches

def _label_for_box(labels, index):
    """Return the label for box `index`, or None when there is nothing to draw.

    `labels` may be None, a single string (used for every box), or an indexable
    sequence with one label per box.
    """
    if labels is None:
        return None
    if isinstance(labels, str):
        return labels
    return labels[index] if index < len(labels) else None


def _draw_boxes(ax, boxes, labels, edgecolor, label_color):
    """Draw `boxes` (x_min, y_min, x_max, y_max) and optional labels on `ax`."""
    for index, box in enumerate(boxes):
        x_min, y_min, x_max, y_max = box[0], box[1], box[2], box[3]
        rect = patches.Rectangle(
            (x_min, y_min),
            x_max - x_min,
            y_max - y_min,
            linewidth=2,
            edgecolor=edgecolor,
            facecolor='none',
        )
        ax.add_patch(rect)
        label = _label_for_box(labels, index)
        if label is not None:
            # Anchor the label at the box's (x_min, y_min) corner.
            ax.text(x_min, y_min, label, bbox=dict(facecolor=label_color, alpha=0.5))


def visualize_boxes(image, true_boxes, pred_boxes, true_labels=None, pred_labels=None):
    """Show an image with ground-truth boxes in green and predictions in red.

    Args:
        image: Array to display. float32/float64 images with values outside
            [0, 1] are min-max rescaled for display.
        true_boxes: Iterable of ground-truth boxes as (x_min, y_min, x_max, y_max).
        pred_boxes: Iterable of predicted boxes in the same layout.
        true_labels: Optional labels for the ground-truth boxes: one per box,
            or a single string applied to every box.
        pred_labels: Optional labels for the predicted boxes, same conventions.
    """
    # Rescale the image if the pixel values are floats and fall outside [0, 1]
    if image.dtype == np.float32 or image.dtype == np.float64:
        lowest, highest = image.min(), image.max()
        if lowest < 0 or highest > 1:
            span = highest - lowest
            if span > 0:
                image = (image - lowest) / span
            else:
                # Constant image: min-max scaling would divide by zero.
                image = np.clip(image, 0.0, 1.0)

    fig, ax = plt.subplots(1)
    ax.imshow(image)

    # Both box sets use the same (x_min, y_min, x_max, y_max) layout; the
    # predicted boxes were previously drawn with x and y swapped.
    _draw_boxes(ax, true_boxes, true_labels, edgecolor='g', label_color='green')
    _draw_boxes(ax, pred_boxes, pred_labels, edgecolor='r', label_color='red')

    plt.show()

def _count_detection_errors(iou_matrix, num_true, num_pred, iou_threshold):
    """Return `(false_positives, false_negatives)` for one image.

    `iou_matrix` is indexed [true box, predicted box] and is only read when
    both `num_true` and `num_pred` are non-zero. A prediction counts as a false
    positive when its best IoU with any true box is below `iou_threshold`; a
    true box counts as a false negative when its best IoU with any prediction
    is below it.
    """
    if num_pred == 0:
        # Nothing was predicted: every true box is missed.
        return 0, num_true
    if num_true == 0:
        # Nothing to find: every prediction is spurious. (np.max over the empty
        # axis would raise here.)
        return num_pred, 0
    iou = np.asarray(iou_matrix)
    false_positives = int(np.sum(np.max(iou, axis=0) < iou_threshold))
    false_negatives = int(np.sum(np.max(iou, axis=1) < iou_threshold))
    return false_positives, false_negatives


def calculate_errors_and_visualize(model, dataset, iou_threshold, confidence_threshold, bounding_box_format, debug=True):
    """Count per-image detection errors over `dataset`, optionally plotting them.

    Args:
        model: Object exposing `predict(images)`; its output is converted with
            `keras_cv.bounding_box.to_ragged`.
        dataset: Iterable of `(images, y_true)` batches, where `y_true` has
            "boxes" and "classes" entries.
        iou_threshold: Minimum IoU for a prediction and a true box to match.
        confidence_threshold: Predictions at or below this confidence are ignored.
        bounding_box_format: Box format passed to `compute_iou`.
        debug: When True (default), print the counts and plot every image that
            has at least one error.

    Returns:
        A list with one entry per image: false positives + false negatives.
    """
    error_counts = []

    for images, y_true in dataset:  # every batch in the dataset is processed
        y_pred = model.predict(images)
        y_pred = keras_cv.bounding_box.to_ragged(y_pred)

        per_image = zip(
            images,
            y_true['boxes'],
            y_true['classes'],
            y_pred['boxes'],
            y_pred['confidence'],
            y_pred['classes'],
        )
        for image, true_boxes, true_classes, pred_boxes, pred_confidences, pred_classes in per_image:
            # Filter out boxes with class -1 (presumably padding entries - confirm)
            true_mask = tf.not_equal(true_classes, -1)
            pred_mask = tf.not_equal(pred_classes, -1)
            valid_true_boxes = tf.boolean_mask(true_boxes, true_mask)
            valid_pred_boxes = tf.boolean_mask(pred_boxes, pred_mask)
            valid_pred_confidences = tf.boolean_mask(pred_confidences, pred_mask)

            # Further filter predicted boxes based on confidence threshold
            confident_indices = tf.where(valid_pred_confidences > confidence_threshold)
            pred_confident_boxes = tf.gather_nd(valid_pred_boxes, confident_indices)

            num_true = valid_true_boxes.shape[0]
            num_pred = pred_confident_boxes.shape[0]

            # The IoU matrix is only needed when both sides are non-empty.
            iou_matrix = None
            if num_true and num_pred:
                iou_matrix = keras_cv.bounding_box.compute_iou(
                    valid_true_boxes, pred_confident_boxes, bounding_box_format
                )

            false_positives, false_negatives = _count_detection_errors(
                iou_matrix, num_true, num_pred, iou_threshold
            )
            error_counts.append(false_positives + false_negatives)

            # Visualization
            if debug and false_positives + false_negatives > 0:
                print(f"Image: False Positives: {false_positives}, False Negatives: {false_negatives}")
                visualize_boxes(image.numpy(), valid_true_boxes.numpy(), pred_confident_boxes.numpy())
                print()

    return error_counts

# Example usage: count detection errors on the training batches, plotting
# every image that has at least one error.
error_counts = calculate_errors_and_visualize(
    model,
    train_dataset,
    iou_threshold=0.5,
    confidence_threshold=0.6,
    bounding_box_format="xyxy",
    debug=True,
)
print("Total error counts for each image:", error_counts)