## Setup

In [None]:
!python --version
!pip install --upgrade pip
!pip install mediapipe-model-maker

In [None]:
from google.colab import files
import os
import json
from tqdm import tqdm
import tensorflow as tf
assert tf.__version__.startswith('2')

from mediapipe_model_maker import object_detector

from google.colab import drive
import shutil

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

### Colab Pro

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
from psutil import virtual_memory

# Total memory reported for the runtime, converted from bytes to gigabytes (1e9).
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

# 20 GB is the threshold this notebook uses to recognise a high-RAM runtime.
print('You are using a high-RAM runtime!' if ram_gb >= 20 else 'Not using a high-RAM runtime')

### Set paths

In [None]:
# Mount Google Drive at /content/drive so that the dataset and project paths
# under /content/drive/MyDrive/ used by the following cells can be resolved.
drive.mount('/content/drive')

In [None]:
# Root of the mounted Drive; every other path below is derived from it.
base_path = '/content/drive/MyDrive/'
# Folder the original dataset images are read from.
source_path = f"{base_path}Datasets/florence1k/"

# Project working folder holding one sub-folder per dataset split.
dest_base_path = f"{base_path}MyProject/florence1k/"

train_dataset_path, validation_dataset_path, test_dataset_path = (
    dest_base_path + split_folder for split_folder in ('train/', 'validation/', 'test/')
)

In [None]:
#@title Create directories

# Create the project root, each split folder and each split's `images`
# sub-folder, in that order. `exist_ok=True` keeps the cell safe to re-run.
for directory in (
    dest_base_path,
    train_dataset_path,
    validation_dataset_path,
    test_dataset_path,
    os.path.join(train_dataset_path, 'images'),
    os.path.join(validation_dataset_path, 'images'),
    os.path.join(test_dataset_path, 'images'),
):
    os.makedirs(directory, exist_ok=True)

## Prepare Data

### Copy images

In [None]:
def copy_images(file_list, dest_folder, src_folder=None):
    """Copy every image listed in ``file_list`` into ``dest_folder``.

    Args:
        file_list: Path of a text file holding one image path per line,
            relative to the source folder.
        dest_folder: Directory the images are copied into. The relative path
            from the list is preserved and missing sub-folders are created.
        src_folder: Directory the listed paths are resolved against. When
            omitted, the notebook-level ``source_path`` is used.

    Blank lines in the list are skipped: an empty name would resolve to the
    source folder itself and make ``shutil.copy2`` fail on a directory.
    """
    if src_folder is None:
        src_folder = source_path

    # Read the whole list first so the file is closed before the copy starts.
    with open(file_list, 'r') as f:
        img_names = [line.strip() for line in f]
    img_names = [name for name in img_names if name]

    for img_name in tqdm(img_names, desc=f"Copying images to {dest_folder}"):
        src = os.path.join(src_folder, img_name)
        dst = os.path.join(dest_folder, img_name)
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.copy2(src, dst)

# Copy images for each set, but only when all three destination folders are
# still empty; otherwise nothing is copied.
if all(
    not os.listdir(split_path + 'images/')
    for split_path in (train_dataset_path, validation_dataset_path, test_dataset_path)
):
    for list_name, split_path in (
        ('train.txt', train_dataset_path),
        ('val.txt', validation_dataset_path),
        ('test.txt', test_dataset_path),
    ):
        copy_images(dest_base_path + list_name, split_path + 'images/')
    print("Dataset division completed!\n")
else:
    print("One or more directories are not empty. Copy operation aborted.\n")

# Report how many entries each split's image folder holds.
print(f"Number of images in train set: {len(os.listdir(train_dataset_path + 'images/'))}")
print(f"Number of images in validation set: {len(os.listdir(validation_dataset_path + 'images/'))}")
print(f"Number of images in test set: {len(os.listdir(test_dataset_path + 'images/'))}")

### Review Data

In [None]:
# List the id and name of every category declared in the training annotations.
with open(os.path.join(train_dataset_path, "labels.json"), "r") as f:
  labels_json = json.load(f)
for category_id, category_name in ((item['id'], item['name']) for item in labels_json["categories"]):
  print(f"{category_id}: {category_name}")

### Create Dataset

In [None]:
# TODO: do I need this instruction ?

# Delete the folders later passed as `cache_dir` to `Dataset.from_coco_folder`,
# when they exist.
cache_dirs = ["/tmp/od_data/train", "/tmp/od_data/validation"]

for cache_dir in filter(os.path.exists, cache_dirs):
    shutil.rmtree(cache_dir)

In [None]:
# Build the training and validation datasets from their COCO-style folders,
# each with its own cache directory under /tmp/od_data.
train_data = object_detector.Dataset.from_coco_folder(train_dataset_path, cache_dir="/tmp/od_data/train")
validation_data = object_detector.Dataset.from_coco_folder(validation_dataset_path, cache_dir="/tmp/od_data/validation")

# Print both sizes with the label left-aligned and the number right-aligned.
print(f"{'Training Dataset Size:':<25} {train_data.size:>4}")
print(f"{'Validation Dataset Size:':<25} {validation_data.size:>4}")

## Augmentation

### Augment Data

In [None]:
import albumentations as A
import numpy as np
import cv2

In [None]:
def get_transform(set='train'):
    """Build the Albumentations pipeline used to augment one dataset split.

    Args:
        set: Name of the split, either ``'train'`` or ``'validation'``. (The
            name shadows the built-in ``set``; it is kept so that existing
            keyword calls continue to work.)

    Returns:
        An ``A.Compose`` pipeline. Bounding boxes are expected in COCO format
        with their labels passed through the ``class_labels`` field, and
        ``min_visibility=0.3`` is applied to transformed boxes.

    Raises:
        ValueError: If ``set`` is not one of the supported split names.

    Note:
        The pipeline deliberately contains no ``A.Normalize`` step. The
        augmented images are written to disk as JPEG files by
        ``apply_augmentation``; a mean/std-normalized float image cannot be
        stored as an 8-bit image and would be saved almost black.
    """
    bboxes_params = A.BboxParams(format='coco', min_visibility=0.3, label_fields=['class_labels']) # TODO: check min_visibility

    if set == 'train':
        transform = A.Compose([ # TODO: update pipeline (?)
            # TODO: do I need to resize images?
            #A.RandomResizedCrop(height=640, width=640, scale=(0.8, 1.0), ratio=(0.9, 1.1), p=1.0), # TODO: check h,w
            A.HorizontalFlip(p=0.5),
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
            A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
            A.GaussNoise(var_limit=(10.0, 50.0), p=0.5),
            A.RandomShadow(num_shadows_lower=1, num_shadows_upper=3, shadow_dimension=5, shadow_roi=(0, 0.5, 1, 1), p=0.3),
            A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=0.5),
            # At most one kind of blur is applied per image.
            A.OneOf([
                A.MotionBlur(blur_limit=7, p=0.5),
                A.MedianBlur(blur_limit=7, p=0.5),
                A.GaussianBlur(blur_limit=7, p=0.5),
            ], p=0.3),
            A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.2, rotate_limit=15, border_mode=0, p=0.5),
        ], bbox_params=bboxes_params)

    elif set == 'validation':
        transform = A.Compose([ # TODO: update pipeline
            # TODO: do I need to resize images?
            A.HorizontalFlip(p=0.5),
            A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=0.5),
        ], bbox_params=bboxes_params)

    else:
        # Previously an unknown name fell through to `return transform` and
        # failed with an UnboundLocalError.
        raise ValueError(f"Unknown dataset split {set!r}; expected 'train' or 'validation'")

    return transform

In [None]:
# Load the COCO annotation file of every split.
with open(os.path.join(train_dataset_path, 'labels.json'), 'r') as f:
    train_json = json.load(f)

with open(os.path.join(validation_dataset_path, 'labels.json'), 'r') as f:
    val_json = json.load(f)

with open(os.path.join(test_dataset_path, 'labels.json'), 'r') as f:
    test_json = json.load(f)

# Highest image / annotation id in use across all splits. The maximum is taken
# over every record rather than trusting the last one, so files that are not
# sorted by id cannot lead to duplicate ids, and an empty list does not raise.
n_images = max(
    (image['id'] for split_json in (train_json, val_json, test_json) for image in split_json['images']),
    default=0,
)
n_annotations = max(
    (ann['id'] for split_json in (train_json, val_json, test_json) for ann in split_json['annotations']),
    default=0,
)

In [None]:
def clip_bbox(bbox, image_width, image_height):
    """Clip a COCO-format box so that it lies inside the image.

    Args:
        bbox: ``[x_min, y_min, width, height]`` in pixels.
        image_width: Width of the image in pixels.
        image_height: Height of the image in pixels.

    Returns:
        A new ``[x_min, y_min, width, height]`` list covering the intersection
        of the box with the image. A box lying entirely outside the image
        comes back with a zero width and/or height.
    """
    x_min, y_min, width, height = bbox

    # Clamp both corners and derive the size from them. Clamping only the
    # top-left corner would leave the size untouched and shift the far edge
    # whenever x_min or y_min is negative.
    x_max = max(0, min(x_min + width, image_width))
    y_max = max(0, min(y_min + height, image_height))
    x_min = max(0, min(x_min, image_width))
    y_min = max(0, min(y_min, image_height))

    return [x_min, y_min, max(0, x_max - x_min), max(0, y_max - y_min)]

In [None]:
def validate_bbox(bbox, image_width, image_height):
    """Tell whether a COCO ``[x, y, width, height]`` box lies fully inside the image.

    The top-left corner must fall within ``[0, image_width)`` x
    ``[0, image_height)`` and the opposite corner must not exceed the image
    width or height.
    """
    left, top, box_width, box_height = bbox

    if not (0 <= left < image_width):
        return False
    if not (0 <= top < image_height):
        return False
    return left + box_width <= image_width and top + box_height <= image_height

In [None]:
def apply_augmentation(image_path, bboxes, class_labels, output_path, output_filename, transform):
    """Augment one image with its boxes and save the result to disk.

    Args:
        image_path: Path of the image to read.
        bboxes: Bounding boxes of the image, passed to ``transform`` as ``bboxes``.
        class_labels: One label per box, passed to ``transform`` as ``class_labels``.
        output_path: Directory the augmented image is written to.
        output_filename: File name of the augmented image inside ``output_path``.
        transform: Callable invoked as
            ``transform(image=..., bboxes=..., class_labels=...)`` and returning
            a mapping with the keys ``'image'``, ``'bboxes'`` and
            ``'class_labels'``.

    Returns:
        ``(bboxes, class_labels)`` as returned by ``transform``. When the image
        cannot be read, the transformation fails or the file cannot be written,
        a message is printed and ``([], [])`` is returned.
    """
    # cv2.imread reports a missing or unreadable file by returning None
    # instead of raising, so check before converting the color space.
    image = cv2.imread(image_path)
    if image is None:
        print(f"Error reading image: {image_path}")
        return [], []
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Apply the augmentation
    try:
        transformed = transform(image=image, bboxes=bboxes, class_labels=class_labels)
    except Exception as e:
        print(f"Error during transformation: {e}")
        return [], []

    # Save the augmented image (converted back to OpenCV's BGR channel order).
    # NOTE(review): this assumes the transform returns an 8-bit image - confirm
    # that the pipeline has no float-producing step such as normalization.
    augmented_image_path = os.path.join(output_path, output_filename)
    saved = cv2.imwrite(augmented_image_path, cv2.cvtColor(transformed['image'], cv2.COLOR_RGB2BGR))
    if not saved:
        print(f"Error writing image: {augmented_image_path}")
        return [], []

    return transformed['bboxes'], transformed['class_labels']

In [None]:
def augment_dataset(input_path, output_path, transform, n_images, n_annotations, num_augmentations=5):
    """Copy a COCO dataset and extend it with augmented versions of every image.

    Reads ``<input_path>/labels.json``, copies each original image with its
    annotations to ``output_path``, then writes ``num_augmentations`` augmented
    variants per image (named ``<stem>_aug_<i>.jpg``) together with their
    transformed boxes. The merged annotations are finally saved to
    ``<output_path>/labels.json``.

    Args:
        input_path: Dataset folder containing ``labels.json`` and ``images/``.
        output_path: Destination folder; its ``images/`` sub-folder must exist.
        transform: Augmentation pipeline forwarded to ``apply_augmentation``.
        n_images: Highest image id already in use; new images are numbered
            from ``n_images + 1``.
        n_annotations: Highest annotation id already in use; new annotations
            are numbered from ``n_annotations + 1``.
        num_augmentations: Number of augmented variants created per image.

    Returns:
        ``(n_images, n_annotations)``: the highest ids in use after the run.

    An augmentation that does not produce an image file (unreadable source,
    failed transform) is skipped, so ``labels.json`` never references a file
    that does not exist.
    """
    def _to_builtin(value):
        # numpy scalars are not JSON serializable; `.item()` gives the plain
        # Python number. Other values are returned unchanged.
        return value.item() if hasattr(value, 'item') else value

    # Load the original COCO JSON file
    with open(os.path.join(input_path, 'labels.json'), 'r') as f:
        coco_data = json.load(f)

    # Group the annotations by image once, instead of rescanning the whole
    # annotation list for every image.
    anns_by_image = {}
    for ann in coco_data['annotations']:
        anns_by_image.setdefault(ann['image_id'], []).append(ann)

    new_images = []
    new_annotations = []

    # Copy original images and annotations
    for img in tqdm(coco_data['images'], desc="Copying original images"):
        src_path = os.path.join(input_path, 'images', img['file_name'])
        dst_path = os.path.join(output_path, 'images', img['file_name'])
        shutil.copy2(src_path, dst_path)

        new_images.append(img)
        new_annotations.extend(anns_by_image.get(img['id'], []))

    # debug
    print("Before augmentation:")
    print(f"Number of images: {len(new_images)}")
    print(f"Number of annotations: {len(new_annotations)}")

    output_images_dir = os.path.join(output_path, 'images')

    # Apply augmentations
    for img in tqdm(coco_data['images'], desc="Augmenting images"):
        image_path = os.path.join(input_path, 'images', img['file_name'])

        # The image is read here only to learn its size for box validation.
        image = cv2.imread(image_path)
        if image is None:
            print(f"Skipping unreadable image: {image_path}")
            continue
        image_height, image_width = image.shape[:2]

        # Prepare the boxes once per image: they are the same for every
        # augmentation. Only boxes that fall outside the image are clipped.
        bboxes = []
        class_labels = []
        for ann in anns_by_image.get(img['id'], []):
            bbox = ann['bbox']
            if not validate_bbox(bbox, image_width, image_height):
                bbox = clip_bbox(bbox, image_width, image_height)
            # A box without area cannot be augmented; as far as Albumentations'
            # box validation goes it would make the transform fail for the
            # whole image, so it is dropped together with its label.
            if bbox[2] > 0 and bbox[3] > 0:
                bboxes.append(bbox)
                class_labels.append(ann['category_id'])

        for i in range(num_augmentations):
            new_filename = f"{os.path.splitext(img['file_name'])[0]}_aug_{i}.jpg"
            new_file_path = os.path.join(output_images_dir, new_filename)

            # Remove a leftover from an earlier run so that the existence
            # check below reflects this attempt only.
            if os.path.exists(new_file_path):
                os.remove(new_file_path)

            # Fresh list copies keep the prepared inputs untouched between
            # augmentations.
            new_bboxes, new_class_labels = apply_augmentation(
                image_path, list(bboxes), list(class_labels),
                output_images_dir, new_filename, transform
            )

            # apply_augmentation returns empty lists both on failure and when
            # no box survives; the written file tells the two cases apart.
            if not os.path.exists(new_file_path):
                continue

            n_images = n_images + 1
            new_img_id = n_images
            new_images.append({
                'id': new_img_id,
                'file_name': new_filename
            })

            for bbox, cat_id in zip(new_bboxes, new_class_labels):
                n_annotations = n_annotations + 1
                new_annotations.append({
                    'id': n_annotations,
                    'image_id': new_img_id,
                    'category_id': _to_builtin(cat_id),
                    'bbox': [_to_builtin(value) for value in bbox]
                })

    # debug
    print("After augmentation:")
    print(f"Number of images: {len(new_images)}")
    print(f"Number of annotations: {len(new_annotations)}")

    # Create the new COCO JSON file
    new_coco_data = {
        'categories': coco_data['categories'],
        'images': new_images,
        'annotations': new_annotations
    }

    # Save the new COCO JSON file
    with open(os.path.join(output_path, 'labels.json'), 'w') as f:
        json.dump(new_coco_data, f, indent=4)

    return n_images, n_annotations

In [None]:
augmented_train_dataset_path = dest_base_path + 'train_augmented/'
augmented_validation_dataset_path = dest_base_path + 'validation_augmented/'

os.makedirs(os.path.join(augmented_train_dataset_path, 'images'), exist_ok=True)
os.makedirs(os.path.join(augmented_validation_dataset_path, 'images'), exist_ok=True)

# `augment_dataset` writes labels.json as its very last step, so the presence
# of that file (rather than a non-empty images folder) marks a completed run.
# An interrupted run leaves images but no labels.json and is redone here.
if not os.path.exists(os.path.join(augmented_train_dataset_path, 'labels.json')):
    # Keep the returned counters so the validation ids continue after the
    # training ones.
    n_images, n_annotations = augment_dataset(train_dataset_path, augmented_train_dataset_path, get_transform('train'), n_images, n_annotations, num_augmentations=5)
else:
    print("Augmentation on the training set has already been made.")

if not os.path.exists(os.path.join(augmented_validation_dataset_path, 'labels.json')):
    augment_dataset(validation_dataset_path, augmented_validation_dataset_path, get_transform('validation'), n_images, n_annotations, num_augmentations=5)
else:
    print("Augmentation on the validation set has already been made.")

In [None]:
# Count the JPEG files (extension compared case-insensitively) in each
# augmented image folder.
jpeg_suffixes = ('.jpg', '.jpeg')
count1 = len([name for name in os.listdir(os.path.join(augmented_train_dataset_path, 'images')) if name.lower().endswith(jpeg_suffixes)])
count2 = len([name for name in os.listdir(os.path.join(augmented_validation_dataset_path, 'images')) if name.lower().endswith(jpeg_suffixes)])

print(f"Number of images in the train_augmented folder: {count1}") # TODO: make prettier
print(f"Number of images in the validation_augmented folder: {count2}") # TODO: make prettier

### Rewrite Datasets

In [None]:
# TODO: add if condition (if augmentation has been executed)

In [None]:
# TODO: do I need this instruction ?
# Delete the cache folders of the augmented datasets when they exist. The
# existence check keeps the cell from failing with FileNotFoundError on a
# fresh runtime, where these folders have not been created yet.
for cache_dir in ["/tmp/od_data/augmented_train", "/tmp/od_data/augmented_validation"]:
    if os.path.exists(cache_dir):
        shutil.rmtree(cache_dir)

In [None]:
# Rebuild `train_data` and `validation_data` from the augmented folders,
# replacing the datasets created earlier from the original folders. Separate
# cache directories are used for the augmented data.
train_data = object_detector.Dataset.from_coco_folder(augmented_train_dataset_path, cache_dir="/tmp/od_data/augmented_train")
validation_data = object_detector.Dataset.from_coco_folder(augmented_validation_dataset_path, cache_dir="/tmp/od_data/augmented_validation")

# Both labels are longer than the 25-character field, so the number widths
# (6 and 4) are what line the two numbers up.
print(f"{'New Training Dataset Size:':<25} {train_data.size:>6} images")
print(f"{'New Validation Dataset Size:':<25} {validation_data.size:>4} images")

## Retrain model

### Set retraining options

In [None]:
# Model architecture, selected from the Model Maker's supported models.
spec = object_detector.SupportedModels.MOBILENET_MULTI_AVG_I384

# Training hyperparameters. The cosine decay is configured over the same
# number of epochs as the training run (both 100). The exported files go to
# the `exported_model` folder.
hparams = object_detector.HParams(
    learning_rate=0.01, # 0.015 (is it possible to implement a scheduler?)
    batch_size=64, # try 128, 256
    epochs=100,
    cosine_decay_epochs=100,
    cosine_decay_alpha=0.1,
    shuffle=True, # TODO: check
    export_dir='exported_model'
)

# Model-level options; only the L2 weight decay is set here.
model_options = object_detector.ModelOptions(
    l2_weight_decay=1e-4 # 3e-5
)

# Bundle the architecture, hyperparameters and model options for training.
options = object_detector.ObjectDetectorOptions(
    supported_model=spec,
    hparams=hparams,
    model_options=model_options
)

### Run retraining

In [None]:
# Create the object detector from the current `train_data` /
# `validation_data` and the options defined in the previous section.
model = object_detector.ObjectDetector.create(
    train_data=train_data,
    validation_data=validation_data,
    options=options
)

### Evaluate the model performance

In [None]:
# TODO: think about saving metrics permanently

In [None]:
# Evaluate the model on the validation data with a batch size of 32 and print
# the returned loss and COCO metrics.
loss, coco_metrics = model.evaluate(validation_data, batch_size=32) # TODO: check batch_size
print(f"Validation loss: {loss}")
print(f"Validation coco metrics: {coco_metrics}")

In [None]:
#@title Visualize metrics

def plot_evaluation_metrics(loss, coco_metrics):
    """Draw three bar charts summarizing one ``model.evaluate`` result.

    Args:
        loss: Sequence of loss values. It is plotted against the labels
            ``total_loss``, ``cls_loss``, ``box_loss`` and ``model_loss``, so
            it is assumed to hold four values in that order - TODO confirm.
        coco_metrics: Mapping from COCO metric name to value; the keys ``AP``,
            ``AP50``, ``AP75``, ``APl``, ``APm``, ``APs``, ``ARmax1``,
            ``ARmax10`` and ``ARmax100`` are read.
    """
    def _bar_chart(names, heights, title, ylabel, ylim=None, value_format='.3f'):
        # One figure: labelled bars with each value printed above its bar.
        plt.figure(figsize=(10, 6))
        plt.bar(names, heights)
        plt.title(title)
        plt.ylabel(ylabel)
        if ylim is not None:
            plt.ylim(*ylim)
        for position, height in enumerate(heights):
            plt.text(position, height, format(height, value_format), ha='center', va='bottom')
        plt.show()

    # 1. Graph of the main COCO metrics
    metrics = ['AP', 'AP50', 'AP75', 'APl', 'ARmax1', 'ARmax10', 'ARmax100']
    _bar_chart(metrics, [coco_metrics[m] for m in metrics], 'COCO Metrics', 'Score', ylim=(0, 1))

    # 2. Loss distribution
    loss_types = ['total_loss', 'cls_loss', 'box_loss', 'model_loss']
    _bar_chart(loss_types, loss, 'Loss Distribution', 'Loss Value', value_format='.4f')

    # 3. AP comparison by object size. The axis reaches down to -1,
    # presumably because a size class can be reported as -1 - TODO confirm.
    ap_sizes = ['APl', 'APm', 'APs']
    _bar_chart(ap_sizes, [coco_metrics[size] for size in ap_sizes], 'AP by Object Size', 'Average Precision', ylim=(-1, 1))


plot_evaluation_metrics(loss, coco_metrics)

In [None]:
# Repeat the evaluation on the validation data with a batch size of 64; this
# overwrites `loss` and `coco_metrics` from the previous evaluation.
loss, coco_metrics = model.evaluate(validation_data, batch_size=64) # TODO: check batch_size
print(f"Validation loss: {loss}")
print(f"Validation coco metrics: {coco_metrics}")

In [None]:
#@title Visualize metrics

# Charts for the `loss` / `coco_metrics` values currently in memory, i.e. the
# ones produced by the most recent evaluation cell.

# 1. Graph of the main COCO metrics
metrics = ['AP', 'AP50', 'AP75', 'APl', 'ARmax1', 'ARmax10', 'ARmax100']
values = [coco_metrics[name] for name in metrics]
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(metrics, values)
ax.set_title('COCO Metrics')
ax.set_ylabel('Score')
ax.set_ylim(0, 1)
for position, score in enumerate(values):
    ax.text(position, score, f'{score:.3f}', ha='center', va='bottom')
plt.show()

# 2. Loss distribution
loss_types = ['total_loss', 'cls_loss', 'box_loss', 'model_loss']
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(loss_types, loss)
ax.set_title('Loss Distribution')
ax.set_ylabel('Loss Value')
for position, loss_value in enumerate(loss):
    ax.text(position, loss_value, f'{loss_value:.4f}', ha='center', va='bottom')
plt.show()

# 3. AP comparison by object size
ap_sizes = ['APl', 'APm', 'APs']
ap_values = [coco_metrics[size] for size in ap_sizes]
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(ap_sizes, ap_values)
ax.set_title('AP by Object Size')
ax.set_ylabel('Average Precision')
ax.set_ylim(-1, 1)
for position, precision in enumerate(ap_values):
    ax.text(position, precision, f'{precision:.3f}', ha='center', va='bottom')
plt.show()

## Export model

In [None]:
# TODO: do I need to remove the existing model first?

In [None]:
# Export the trained model, list the contents of the `exported_model` folder
# and download the resulting `model.tflite` file.
model.export_model()
!ls exported_model
files.download('exported_model/model.tflite')

## Model quantization

### Quantization aware training (int8 quantization)

### Post-training quantization (fp16 quantization)

In [None]:
from mediapipe_model_maker import quantization

In [None]:
# Quantization settings for the float16 export performed in the next cell.
quantization_config = quantization.QuantizationConfig.for_float16()

In [None]:
# NOTE(review): `restore_float_ckpt` presumably puts the model back on its
# float checkpoint before the quantized export - confirm against the API docs.
model.restore_float_ckpt()
# Export a second file, `model_fp16.tflite`, using the float16 quantization
# config, then list the export folder with file sizes and download the file.
model.export_model(model_name="model_fp16.tflite", quantization_config=quantization_config)
!ls -lh exported_model
files.download('exported_model/model_fp16.tflite')