# Object detection model customization

## Setup

To install the libraries for customizing a model, run the following commands:

In [None]:
# Print the Python version of the current runtime.
!python --version
# Upgrade pip itself before installing packages.
!pip install --upgrade pip
# Install MediaPipe Model Maker, the package that provides the object_detector module imported below.
!pip install mediapipe-model-maker

Use the following code to import the required Python classes:

In [None]:
from google.colab import files
import os
import json
from tqdm import tqdm
import tensorflow as tf
assert tf.__version__.startswith('2')

from mediapipe_model_maker import object_detector

## Prepare data

In [None]:
from google.colab import drive
import shutil

### Set paths

In [None]:
# Mount Google Drive at /content/drive; the dataset paths defined in the next cell live under it.
drive.mount('/content/drive')

In [None]:
# Define paths
# Everything is derived from the Drive root; all folder paths keep their trailing slash
# because later cells build file names by plain string concatenation.
base_path = '/content/drive/MyDrive/'
source_path = "".join((base_path, 'Datasets/revisitop/rparis6k/data/'))
dest_base_path = "".join((base_path, 'MyProject/rparis6k/'))

# One destination folder per dataset split.
train_dataset_path, validation_dataset_path, test_dataset_path = (
    dest_base_path + split_dir for split_dir in ('train/', 'validation/', 'test/')
)

In [None]:
# Create the destination root, then every split folder, then every split's images/ sub-folder.
# exist_ok=True makes the cell safe to run again.
split_dirs = (train_dataset_path, validation_dataset_path, test_dataset_path)

os.makedirs(dest_base_path, exist_ok=True)

for split_dir in split_dirs:
    os.makedirs(split_dir, exist_ok=True)

for split_dir in split_dirs:
    os.makedirs(os.path.join(split_dir, 'images'), exist_ok=True)

### Copy images

In [None]:
# Function to copy images
def copy_images(file_list, dest_folder, source_folder=None):
    """Copy every image named in a list file into ``dest_folder``.

    Args:
        file_list: Path of a text file with one image name per line. Names are
            resolved relative to the source folder.
        dest_folder: Folder that receives the copies. Sub-folders contained in
            an image name are created on demand.
        source_folder: Folder the images are read from. Defaults to the
            module-level ``source_path``.

    Raises:
        OSError: If the list file cannot be opened or an image cannot be copied
            (for example ``FileNotFoundError`` for a missing source image).
    """
    if source_folder is None:
        source_folder = source_path

    with open(file_list, 'r') as f:
        # Skip blank lines: an empty name would resolve to the source folder
        # itself and make shutil.copy2 fail on a directory.
        img_names = [name for name in (line.strip() for line in f) if name]

    # The list file is closed before the (potentially long) copy starts.
    for img_name in tqdm(img_names, desc=f"Copying images to {dest_folder}"):
        src = os.path.join(source_folder, img_name)
        dst = os.path.join(dest_folder, img_name)
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        # copy2 also preserves file metadata such as timestamps.
        shutil.copy2(src, dst)

# Copy images for each set
# The copy only runs when all three images/ folders are still empty, so re-running
# the cell never mixes a fresh copy with files that are already there.
train_images_dir = train_dataset_path + 'images/'
validation_images_dir = validation_dataset_path + 'images/'
test_images_dir = test_dataset_path + 'images/'

any_folder_populated = (
    len(os.listdir(train_images_dir)) != 0
    or len(os.listdir(validation_images_dir)) != 0
    or len(os.listdir(test_images_dir)) != 0
)

if any_folder_populated:
    print("One or more directories are not empty. Copy operation aborted.\n")
else:
    # The split list files (train.txt / val.txt / test.txt) are read from dest_base_path.
    copy_images(dest_base_path + 'train.txt', train_images_dir)
    copy_images(dest_base_path + 'val.txt', validation_images_dir)
    copy_images(dest_base_path + 'test.txt', test_images_dir)
    print("Dataset division completed!\n")

# Report the resulting size of every split, whether or not a copy happened.
print(f"Number of images in train set: {len(os.listdir(train_images_dir))}")
print(f"Number of images in validation set: {len(os.listdir(validation_images_dir))}")
print(f"Number of images in test set: {len(os.listdir(test_images_dir))}")

### Review dataset

Verify the dataset content by printing the categories from the `labels.json` file. There should be 13 total categories. Index 0 is always set to be the `background` class which may be unused in the dataset.

In [None]:
# Load the label file of the training split.
labels_file = os.path.join(train_dataset_path, "labels.json")
with open(labels_file, "r") as f:
  labels_json = json.load(f)

# Print one "<id>: <name>" line per category.
for category_item in labels_json["categories"]:
  category_id, category_name = category_item['id'], category_item['name']
  print(f"{category_id}: {category_name}")

In [None]:
#@title Visualize the training dataset
import matplotlib.pyplot as plt
from matplotlib import patches, text, patheffects
from collections import defaultdict
import math

def draw_outline(obj):
  """Set path effects on `obj`: a black stroke of width 4, then the normal rendering."""
  black_stroke = patheffects.Stroke(linewidth=4,  foreground='black')
  obj.set_path_effects([black_stroke, patheffects.Normal()])
def draw_box(ax, bb):
  """Add an unfilled red rectangle to `ax` and outline it.

  `bb` is read as (x, y, width, height): (bb[0], bb[1]) is the rectangle's anchor
  point, bb[2] its width and bb[3] its height.
  """
  rectangle = patches.Rectangle(
      (bb[0],bb[1]), bb[2], bb[3],
      fill=False, edgecolor='red', lw=2,
  )
  draw_outline(ax.add_patch(rectangle))
def draw_text(ax, bb, txt, disp):
  """Write `txt` in bold white on `ax` at (bb[0], bb[1] - disp) and outline it."""
  # Named `label` rather than `text` so the matplotlib `text` module imported by
  # this cell is not shadowed.
  label = ax.text(
      bb[0], (bb[1]-disp), txt,
      verticalalignment='top', color='white', fontsize=10, weight='bold',
  )
  draw_outline(label)
def draw_bbox(ax, annotations_list, id_to_label, image_shape):
  """Draw the box and the category name of every annotation on `ax`.

  Each annotation must provide "category_id" and "bbox"; `id_to_label` maps a
  category id to its display name.
  """
  for annotation in annotations_list:
    label_name = id_to_label[annotation["category_id"]]
    box = annotation["bbox"]
    draw_box(ax, box)
    # The label is shifted by 5% of image_shape[0] (the image height).
    draw_text(ax, box, label_name, image_shape[0] * 0.05)
def visualize(dataset_folder, max_examples=None):
  """Show annotated images of a dataset folder in a 3-column grid.

  Args:
    dataset_folder: Folder containing `labels.json` and an `images/` sub-folder.
    max_examples: Maximum number of annotated images to show; None shows all of them.
  """
  with open(os.path.join(dataset_folder, "labels.json"), "r") as f:
    labels_json = json.load(f)
  # Resolve images through their "id" field instead of their position in the
  # list, so the lookup stays correct when ids are not 0..n-1 in list order.
  images_by_id = {item["id"]: item for item in labels_json["images"]}
  cat_id_to_label = {item["id"]:item["name"] for item in labels_json["categories"]}
  image_annots = defaultdict(list)
  for annotation_obj in labels_json["annotations"]:
    image_annots[annotation_obj["image_id"]].append(annotation_obj)

  examples = list(image_annots.items())
  if max_examples is not None:
    examples = examples[:max_examples]
  if not examples:
    # plt.subplots cannot build a grid with zero rows.
    print("No annotated images to display.")
    return

  n_cols = 3
  # Size the grid from the images actually shown, not from the requested maximum.
  n_rows = math.ceil(len(examples) / n_cols)
  # squeeze=False keeps `axs` two-dimensional even when there is a single row;
  # otherwise axs[row, col] fails for three examples or fewer.
  fig, axs = plt.subplots(n_rows, n_cols, figsize=(24, n_rows*8), squeeze=False) # 8x8 for each image
  for ind, (image_id, annotations_list) in enumerate(examples):
    ax = axs[ind // n_cols, ind % n_cols]
    img = plt.imread(os.path.join(dataset_folder, "images", images_by_id[image_id]["file_name"]))
    ax.imshow(img)
    draw_bbox(ax, annotations_list, cat_id_to_label, img.shape)
  # Hide the axes of grid cells that received no image.
  for unused_ax in axs.flat[len(examples):]:
    unused_ax.axis('off')
  plt.show()

# Show up to 9 annotated images from the training split.
visualize(train_dataset_path, 9)

### Create dataset

In [None]:
# Delete the training cache directory left by a previous run, if there is one.
# NOTE(review): presumably this forces the dataset to be rebuilt from the current files;
# whether Dataset.from_coco_folder already detects a stale cache is not visible here — confirm.
if os.path.exists("/tmp/od_data/train"):
    shutil.rmtree("/tmp/od_data/train") # TODO: do I need this instruction ?

In [None]:
# TODO: is it possible to add a progress bar?

In [None]:
# Build the Model Maker datasets from the train and validation folders
# (each holds labels.json and an images/ sub-folder). Every split gets its own cache directory.
train_data = object_detector.Dataset.from_coco_folder(train_dataset_path, cache_dir="/tmp/od_data/train")
validation_data = object_detector.Dataset.from_coco_folder(validation_dataset_path, cache_dir="/tmp/od_data/validation")
# Report how many examples each dataset holds.
print("train_data size: ", train_data.size)
print("validation_data size: ", validation_data.size)

### Augmentation

In [None]:
import albumentations as A
import numpy as np
import cv2

#### Search for augmented data

In [None]:
# FIXME

In [None]:
def check_and_delete_augmented_images(folder_path):
    """Find augmented images in ``folder_path`` and delete them after confirmation.

    A file counts as augmented when its name starts with ``aug_``, the prefix
    used when augmented images are saved. Matching the prefix instead of any
    occurrence of "aug" keeps unrelated files such as ``laugh.jpg`` untouched.

    Only image files are removed; annotation files are not modified.

    Args:
        folder_path: Folder to scan (not recursive).
    """
    augmented_images = [f for f in os.listdir(folder_path) if f.startswith('aug_')]

    if not augmented_images:
        print("No augmented images found.")
        return

    print(f"Found {len(augmented_images)} augmented images in {folder_path}.")
    user_input = input("Do you want to delete these images? (yes/no): ").strip().lower()

    # Anything other than an explicit confirmation keeps the files.
    if user_input not in ('yes', 'y'):
        print("Deletion aborted by user.")
        return

    for img in tqdm(augmented_images, desc="Deleting augmented images"):
        os.remove(os.path.join(folder_path, img))
    print("Augmented images deleted successfully.")

# Look for augmented images in the training images folder and offer to delete them.
check_and_delete_augmented_images(train_dataset_path + 'images/')

#### Augment data

In [None]:
# TODO: check label_fields
# FIXME: fix augmentation

In [None]:
def get_dynamic_transform(image_height, image_width):
    """Build the Albumentations pipeline applied to one training image.

    Bounding boxes are declared in COCO format and their labels travel in the
    ``category_ids`` field, so the transform must be called with
    ``bboxes=...`` and ``category_ids=...``.

    Args:
        image_height: Height of the image to augment.
        image_width: Width of the image to augment.
    """
    # Crop size capped at 224 per side. Only the disabled RandomCrop below uses it.
    crop_height, crop_width = (min(224, side) for side in (image_height, image_width))

    augmentations = [ # FIXME
        A.HorizontalFlip(p=0.5),
        #A.RandomRotate90(p=0.5),
        A.RandomBrightnessContrast(p=0.2),
        A.Perspective(p=0.5), # TODO: check perspective transformations
        A.RandomGamma(p=0.2),
        A.GaussianBlur(blur_limit=(3, 7), p=0.1),
        A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=0.2),
        #A.RandomCrop(height=crop_height, width=crop_width, p=0.5),
        #A.Cutout(num_holes=8, max_h_size=8, max_w_size=8, fill_value=0, p=0.5),
    ]
    bbox_settings = A.BboxParams(format='coco', label_fields=['category_ids'])
    return A.Compose(augmentations, bbox_params=bbox_settings)

In [None]:
def clip_bboxes(bboxes, image_width=None, image_height=None):
    """Clip bounding boxes to their valid coordinate range.

    Args:
        bboxes: Sequence of boxes.
        image_width: Image width in pixels (optional).
        image_height: Image height in pixels (optional).

    Returns:
        A new list with one list per box.

        * With both image dimensions given, every box is read as COCO
          ``[x_min, y_min, width, height]`` in pixels and cut to the image
          rectangle. A box lying completely outside the image comes back with
          zero width and/or height.
        * Without dimensions, every coordinate is clamped to ``[0, 1]``. That
          is only meaningful for normalized boxes; applied to pixel
          coordinates it collapses every box to at most one pixel.
    """
    if image_width is None or image_height is None:
        return [[max(0, min(1, coord)) for coord in bbox] for bbox in bboxes]

    clipped = []
    for bbox in bboxes:
        x_min, y_min, box_width, box_height = bbox[:4]
        # Clip the two corners, then convert back to (x, y, width, height).
        left = max(0, min(image_width, x_min))
        top = max(0, min(image_height, y_min))
        right = max(0, min(image_width, x_min + box_width))
        bottom = max(0, min(image_height, y_min + box_height))
        clipped.append([left, top, right - left, bottom - top])
    return clipped

def resize_if_needed(image, min_size=224): # TODO: check if it's necessary
    """Upscale `image` so that its shorter side is at least `min_size`.

    The aspect ratio is kept and images that are already large enough are
    returned unchanged. Bounding boxes are not handled here: a caller that
    resizes an image has to scale its boxes by the same factor.
    """
    height, width = image.shape[:2]
    if height < min_size or width < min_size:
        scale = min_size / min(height, width)
        # round() plus the lower bound guards against floating-point error
        # leaving the shorter side one pixel below min_size.
        new_height = max(min_size, round(height * scale))
        new_width = max(min_size, round(width * scale))
        image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
    return image

def _clip_and_filter(bboxes, categories, image_width, image_height):
    """Clip pixel COCO boxes to the image and drop boxes left without area.

    Labels are filtered together with their boxes so both lists stay aligned.
    """
    kept_bboxes, kept_categories = [], []
    clipped = clip_bboxes(bboxes, image_width, image_height)
    for bbox, category in zip(clipped, categories):
        if bbox[2] > 0 and bbox[3] > 0:
            kept_bboxes.append(bbox)
            kept_categories.append(category)
    return kept_bboxes, kept_categories

def apply_augmentations(image, bboxes, categories):
    """Augment one image together with its COCO boxes and category ids.

    Args:
        image: Image array; its first two dimensions are height and width.
        bboxes: COCO boxes ``[x_min, y_min, width, height]`` in pixels.
        categories: One category id per box, in the same order.

    Returns:
        ``(augmented_image, augmented_bboxes, augmented_categories)``. Boxes
        that end up without area inside the image are dropped together with
        their category, so the two lists can be shorter than the inputs.
    """
    height, width = image.shape[:2]
    transform = get_dynamic_transform(height, width)

    # Boxes are in pixels (the transform declares format='coco'), so they are
    # clipped against the image size rather than against [0, 1].
    boxes_in, labels_in = _clip_and_filter(bboxes, categories, width, height)

    augmented = transform(image=image, bboxes=boxes_in, category_ids=labels_in)

    # Clip again after the transform, against the size of the augmented image.
    out_height, out_width = augmented['image'].shape[:2]
    boxes_out, labels_out = _clip_and_filter(
        augmented['bboxes'], augmented['category_ids'], out_width, out_height
    )

    return augmented['image'], boxes_out, labels_out

def update_coco_annotations(annotations, new_image_id, new_bboxes, new_categories, start_id=None):
    """Build COCO annotation entries for the boxes of one new image.

    Args:
        annotations: Annotations that already exist; only used to choose the
            first new id when `start_id` is not given.
        new_image_id: Id of the image the new annotations belong to.
        new_bboxes: Boxes of the new image; each coordinate is rounded to one decimal.
        new_categories: Category id of each box, in the same order as `new_bboxes`.
        start_id: Id given to the first new annotation. Pass it explicitly when
            this function is called several times before the results are
            appended to `annotations`; otherwise every call starts from the
            same id.

    Returns:
        A list of new annotation dicts with consecutive ids. `annotations` is
        not modified.
    """
    if start_id is None:
        # Go one past the largest id in use. len(annotations) alone collides
        # with an existing id whenever the ids are not exactly 0..n-1
        # (for example when they start at 1).
        largest_id = max(
            (ann.get("id", -1) for ann in annotations if isinstance(ann, dict)),
            default=-1,
        )
        start_id = max(len(annotations), largest_id + 1)

    return [
        {
            "id": start_id + offset,
            "image_id": new_image_id,
            "category_id": category,
            "bbox": [round(coord, 1) for coord in bbox],
        }
        for offset, (bbox, category) in enumerate(zip(new_bboxes, new_categories))
    ]

In [None]:
# Load COCO annotations
labels_path = os.path.join(train_dataset_path, 'labels.json')
with open(labels_path, 'r') as f:
    coco_data = json.load(f)

# Group the annotations by image once, instead of scanning the whole
# annotation list again for every image.
annotations_by_image = {}
for ann in coco_data['annotations']:
    annotations_by_image.setdefault(ann['image_id'], []).append(ann)

augmented_images = []
augmented_annotations = []

# New ids start one past the largest id in use, so they stay unique even when
# the existing ids are not exactly 0..n-1.
new_image_id = max(
    len(coco_data['images']),
    max((img['id'] for img in coco_data['images']), default=-1) + 1,
)
next_annotation_id = max(
    len(coco_data['annotations']),
    max((ann.get('id', -1) for ann in coco_data['annotations']), default=-1) + 1,
)

for image_info in tqdm(coco_data['images']):
    # Load image
    image_path = os.path.join(train_dataset_path, 'images', image_info['file_name'])

    if not os.path.exists(image_path):
        print(f"Image not found: {image_path}")
        continue

    image = cv2.imread(image_path)

    if image is None:
        print(f"Failed to load image: {image_path}")
        continue

    # OpenCV loads images as BGR; the augmentations work on RGB.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Get annotations for this image
    image_annotations = annotations_by_image.get(image_info['id'], [])
    bboxes = [ann['bbox'] for ann in image_annotations]
    categories = [ann['category_id'] for ann in image_annotations]

    # No clipping here: apply_augmentations clips the boxes itself, before and
    # after the transform.

    #image = resize_if_needed(image) # TODO: check if it's necessary

    # Apply augmentations
    aug_image, aug_bboxes, aug_categories = apply_augmentations(image, bboxes, categories)

    # Save augmented image
    aug_image_name = f"aug_{image_info['file_name']}"
    # cv2.imwrite expects BGR channel order, so the BGR->RGB conversion done
    # after loading is reverted before saving.
    saved = cv2.imwrite(
        os.path.join(train_dataset_path, 'images', aug_image_name),
        cv2.cvtColor(aug_image, cv2.COLOR_RGB2BGR),
    )
    if not saved:
        # Do not register an image that was never written to disk.
        print(f"Failed to save augmented image: {aug_image_name}")
        continue

    # Update COCO annotations
    augmented_images.append({
        "id": new_image_id,
        "file_name": aug_image_name
    })
    new_annotations = update_coco_annotations(coco_data['annotations'], new_image_id, aug_bboxes, aug_categories)
    # coco_data['annotations'] only grows after the loop, so the ids are
    # assigned from a running counter here to keep them unique across all
    # augmented images.
    for new_annotation in new_annotations:
        new_annotation['id'] = next_annotation_id
        next_annotation_id += 1
    augmented_annotations.extend(new_annotations)
    new_image_id += 1

# Update COCO data
coco_data['images'].extend(augmented_images)
coco_data['annotations'].extend(augmented_annotations)

# Save updated COCO annotations
with open(labels_path, 'w') as f:
    json.dump(coco_data, f)

print(f"Added {len(augmented_images)} augmented images to the dataset.")

#### Rewrite train data

In [None]:
# Remove the training cache written before augmentation, if any, so the rebuilt
# dataset cannot reuse it. The existence check matches the earlier cleanup cell and
# avoids a FileNotFoundError when the cache is absent (e.g. after a runtime restart).
if os.path.exists("/tmp/od_data/train"):
    shutil.rmtree("/tmp/od_data/train")

In [None]:
# Rebuild the training dataset from the folder that now also contains the augmented images.
train_data = object_detector.Dataset.from_coco_folder(train_dataset_path, cache_dir="/tmp/od_data/train")
print("Updated train_data size: ", train_data.size)

## Retrain model

### Set retraining options

In [None]:
# Model architecture to retrain.
spec = object_detector.SupportedModels.MOBILENET_MULTI_AVG_I384

# Training hyperparameters. The cosine decay spans the same number of epochs as the training.
# export_dir is the folder the export cells further down list and download from.
hparams = object_detector.HParams(
    learning_rate=0.1,
    batch_size=16,
    epochs=100,
    cosine_decay_epochs=100,
    cosine_decay_alpha=0.2,
    export_dir='exported_model'
)

# Model-level options (L2 weight decay).
model_options = object_detector.ModelOptions(
    l2_weight_decay=3e-4
)

# Bundle the architecture, the hyperparameters and the model options for ObjectDetector.create.
options = object_detector.ObjectDetectorOptions(
    supported_model=spec,
    hparams=hparams,
    model_options=model_options
)

### Run retraining

In [None]:
# Create the detector from the training and validation datasets using the options defined above.
model = object_detector.ObjectDetector.create(
    train_data=train_data,
    validation_data=validation_data,
    options=options
)

### Evaluate the model performance

After training the model, evaluate it on the validation dataset and print the loss and the COCO metrics (`coco_metrics`). The most important metric for judging model performance is typically the "AP" COCO metric, which stands for Average Precision.

In [None]:
# TODO: is it possible to add a progress bar?

In [None]:
# Evaluate on the validation dataset (batch size 4) and print the loss and the COCO metrics.
loss, coco_metrics = model.evaluate(validation_data, batch_size=4)
print(f"Validation loss: {loss}")
print(f"Validation coco metrics: {coco_metrics}")

## Export model

After creating the model, convert and export it to a Tensorflow Lite model format for later use on an on-device application. The export also includes model metadata, which includes the label map.

In [None]:
# Export the trained model, list the export folder, then download the .tflite file.
model.export_model()
!ls exported_model
files.download('exported_model/model.tflite')

## Model quantization

Model quantization is a model modification technique that can reduce the model size and improve the speed of predictions with only a relatively minor decrease in accuracy.

This section of the guide explains how to apply quantization to your model. Model Maker supports two forms of quantization for object detector:
1. Quantization Aware Training: 8 bit integer precision for CPU usage
2. Post-Training Quantization: 16 bit floating point precision for GPU usage

### Quantization aware training (int8 quantization)
Quantization aware training (QAT) is a fine-tuning step which happens after fully training your model. This technique further tunes a model which emulates inference time quantization in order to account for the lower precision of 8 bit integer quantization. For on-device applications with a standard CPU, use Int8 precision. For more information, see the [TensorFlow Lite](https://www.tensorflow.org/model_optimization/guide/quantization/training) documentation.

To apply quantization aware training and export to an int8 model, create a `QATHParams` configuration and run the `quantization_aware_training` method. See the **Hyperparameters** section below on detailed usage of `QATHParams`.

In [None]:
# First QAT run: fine-tune with the QAT hyperparameters, then evaluate on the validation data.
qat_hparams = object_detector.QATHParams(learning_rate=0.3, batch_size=4, epochs=10, decay_steps=6, decay_rate=0.96)
model.quantization_aware_training(train_data, validation_data, qat_hparams=qat_hparams)
qat_loss, qat_coco_metrics = model.evaluate(validation_data)
print(f"QAT validation loss: {qat_loss}")
print(f"QAT validation coco metrics: {qat_coco_metrics}")

The QAT step often requires multiple runs to tune the training parameters. To avoid rerunning model training with the `create` method, use the `restore_float_ckpt` method to restore the model to the fully trained float state (the state after running the `create` method) before running QAT again.

In [None]:
# Second QAT run with different hyperparameters.
new_qat_hparams = object_detector.QATHParams(learning_rate=0.9, batch_size=4, epochs=15, decay_steps=5, decay_rate=0.96)
# Go back to the float checkpoint first so this run does not start from the previous QAT result.
model.restore_float_ckpt()
model.quantization_aware_training(train_data, validation_data, qat_hparams=new_qat_hparams)
qat_loss, qat_coco_metrics = model.evaluate(validation_data)
print(f"QAT validation loss: {qat_loss}")
print(f"QAT validation coco metrics: {qat_coco_metrics}")

Finally, use the `export_model` method to export an int8 quantized model. The `export_model` method automatically exports either a float32 or an int8 model, depending on whether `quantization_aware_training` was run.

In [None]:
# Export the QAT model under its own file name, list the export folder, then download the file.
model.export_model('model_int8_qat.tflite')
!ls -lh exported_model
files.download('exported_model/model_int8_qat.tflite')

### Post-training quantization (fp16 quantization)

Post-training model quantization is a model modification technique that can reduce the model size and improve the speed of predictions with only a relatively minor decrease in accuracy. This approach reduces the size of the data processed by the model, for example by transforming 32-bit floating point numbers to 16-bit floats. Float16 quantization is recommended for GPU usage. For more information, see the [TensorFlow Lite](https://www.tensorflow.org/model_optimization/guide/quantization/post_training) documentation.

First, import the MediaPipe Model Maker quantization module:

In [None]:
from mediapipe_model_maker import quantization

Define a QuantizationConfig object using the `for_float16()` class method. This configuration modifies a trained model to use 16-bit floating point numbers instead of 32-bit floating point numbers. You can further customize the quantization process by setting additional parameters for the QuantizationConfig class.

In [None]:
# Post-training quantization settings for 16-bit floats.
quantization_config = quantization.QuantizationConfig.for_float16()

Export the model using the additional quantization_config object to apply post-training quantization. Note that if you previously ran `quantization_aware_training`, you must first convert the model back to a float model by using `restore_float_ckpt`.

In [None]:
# QAT was run above, so restore the float checkpoint before exporting with fp16 quantization.
model.restore_float_ckpt()
model.export_model(model_name="model_fp16.tflite", quantization_config=quantization_config)
# List the export folder, then download the fp16 model.
!ls -lh exported_model
files.download('exported_model/model_fp16.tflite')