In [None]:
import fiftyone as fo

from utils.dataset_loader import FiftyOneTorchDatasetCOCO, TorchToHFDatasetCOCO
from utils.hf_train import collate_fn, transform_batch
from transformers import AutoProcessor, AutoModelForObjectDetection, EarlyStoppingCallback, Trainer, TrainingArguments
from datasets import Split
from functools import partial
import torch
from PIL import Image

In [None]:
# Dataset conversion into Hugging Face format
# Pipeline: FiftyOne dataset -> project Torch wrapper -> Hugging Face dataset.
# Load the FiftyOne dataset by its name.
dataset_v51 = fo.load_dataset("fisheye8k-100")
# Wrap it for Torch, passing "detections" as the ground-truth field.
dataset_torch = FiftyOneTorchDatasetCOCO(dataset_v51, gt_field="detections")
# NOTE(review): later cells index the converted dataset by Split.TRAIN,
# Split.VALIDATION and Split.TEST, so convert() presumably yields all three
# splits -- confirm in utils.dataset_loader.
converter_torch2hf = TorchToHFDatasetCOCO(dataset_torch)
dataset_hf = converter_torch2hf.convert()

In [None]:
# Lookup tables handed to the model config: class index -> name, and the inverse.
classes = dataset_v51.default_classes
id2label = dict(enumerate(classes))
label2id = {name: index for index, name in id2label.items()}

In [None]:
# Object Detection Finetuning
# https://huggingface.co/docs/transformers/en/tasks/object_detection

MODEL_NAME = "microsoft/conditional-detr-resnet-50"
MAX_SIZE = 512  # If tiny GPU memory

# Preprocessing: cap both image sides at MAX_SIZE and pad up to a
# MAX_SIZE x MAX_SIZE canvas.
resize_limits = dict(max_height=MAX_SIZE, max_width=MAX_SIZE)
pad_target = dict(height=MAX_SIZE, width=MAX_SIZE)
image_processor = AutoProcessor.from_pretrained(
    MODEL_NAME, size=resize_limits, do_pad=True, pad_size=pad_target
)

# Pre-trained detector, relabelled with this dataset's classes.
# ignore_mismatched_sizes allows for a different number of classes
# compared to the pre-trained checkpoint.
model = AutoModelForObjectDetection.from_pretrained(
    MODEL_NAME,
    ignore_mismatched_sizes=True,
    label2id=label2id,
    id2label=id2label,
)

In [None]:
# Apply transforms on-the-fly
# Bind the image processor once, then attach the same lazy transform to every split.
split_transform_batch = partial(transform_batch, image_processor=image_processor)

for split in (Split.TRAIN, Split.VALIDATION, Split.TEST):
    dataset_hf[split] = dataset_hf[split].with_transform(split_transform_batch)

In [None]:
# Training Arguments (well documented)
training_args = TrainingArguments(
    # --- bookkeeping ---
    run_name=MODEL_NAME,
    push_to_hub=False,
    # --- schedule and optimisation ---
    num_train_epochs=36,
    learning_rate=5e-05,
    lr_scheduler_type="cosine",
    weight_decay=0.0001,
    max_grad_norm=0.01,
    # --- batching and hardware ---
    per_device_train_batch_size=16,
    auto_find_batch_size=True,
    dataloader_num_workers=8,
    fp16=True,
    # --- evaluation and model selection (lower eval_loss is better) ---
    eval_strategy="epoch",
    eval_do_concat_batches=False,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    load_best_model_at_end=True,
    # --- checkpointing ---
    save_strategy="best",
    save_total_limit=1,
    save_safetensors=False,
    # --- data handling: keep every column, the transform/collate step needs them ---
    remove_unused_columns=False,
)

In [None]:
# Early stopping: end the run once validation performance stops improving
# (patience of 3, improvement threshold of 0).
early_stopping_callback = EarlyStoppingCallback(early_stopping_threshold=0, early_stopping_patience=3)

In [None]:
# Hugging Face Trainer
# Wires the model, training arguments, datasets, batch collation and early
# stopping together. Training uses the TRAIN split; the per-epoch evaluation
# that drives early stopping and best-model selection uses VALIDATION.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset_hf[Split.TRAIN],
    eval_dataset=dataset_hf[Split.VALIDATION],
    # `tokenizer=` is deprecated on Trainer; `processing_class` is its
    # replacement and is the right slot for an image processor.
    processing_class=image_processor,
    data_collator=collate_fn,
    callbacks=[early_stopping_callback],
)

In [None]:
# Training
# Runs fine-tuning with the configuration held by `trainer`. The call is the
# last expression of the cell, so its return value is shown as the cell output.
trainer.train()

In [None]:
# Evaluation
# Score the trained model on the TEST split; the VALIDATION split was the
# trainer's eval_dataset during training, so TEST is evaluated separately here.
metrics = trainer.evaluate(eval_dataset=dataset_hf[Split.TEST])
print(f"Model training completed. Evaluation results: {metrics}")

In [None]:
# Run inference on test set in Voxel51
# For every sample tagged "test": load the image, run the detector, convert the
# predicted boxes to FiftyOne's relative [x, y, w, h] format and store them in
# the sample's "conditional_detr" field (autosave persists the change).
SCORE_THRESHOLD = 0.2  # Detections scoring below this confidence are dropped

eval_view = dataset_v51.match_tags("test")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Do not rely on state left behind by earlier cells: put the model on the same
# device as its inputs and switch it to inference mode explicitly.
model.to(device)
model.eval()

with torch.no_grad():
    for sample in eval_view.iter_samples(progress=True, autosave=True):
        img_filepath = sample.filepath
        # Close the file handle right after decoding, and force three-channel
        # RGB so grayscale / palette / RGBA files do not break preprocessing.
        with Image.open(img_filepath) as image_file:
            image = image_file.convert("RGB")

        # Use the decoded image's own size for both post-processing and
        # normalisation, so the two always agree and the cell does not depend
        # on sample.metadata having been computed.
        image_width, image_height = image.size

        inputs = image_processor(images=[image], return_tensors="pt")
        outputs = model(**inputs.to(device))
        # Post-processing takes one (height, width) pair per image
        target_sizes = torch.tensor([[image_height, image_width]])
        results = image_processor.post_process_object_detection(
            outputs, threshold=SCORE_THRESHOLD, target_sizes=target_sizes
        )[0]

        # Transfer output to Voxel51
        detections = []
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            # Bbox is in absolute coordinates x, y, x2, y2
            x_min, y_min, x_max, y_max = box.tolist()
            text_label = model.config.id2label[label.item()]

            # Voxel51 requires relative coordinates between 0 and 1,
            # given as [top-left x, top-left y, width, height]
            detections.append(
                fo.Detection(
                    label=text_label,
                    bounding_box=[
                        x_min / image_width,
                        y_min / image_height,
                        (x_max - x_min) / image_width,
                        (y_max - y_min) / image_height,
                    ],
                    confidence=score.item(),
                )
            )

        sample["conditional_detr"] = fo.Detections(detections=detections)


In [None]:
# Evaluate detections
# Compare the predictions stored in "conditional_detr" against the ground truth
# in "detections" on the test view, with mAP computation requested. The
# evaluation is registered under the key "eval_conditional_detr"; the returned
# object is the cell's output.
eval_view.evaluate_detections(
    "conditional_detr",
    gt_field="detections",
    eval_key="eval_conditional_detr",
    compute_mAP=True,
)

In [None]:
# Launch Voxel51 GUI
# Open the FiftyOne app on the test view, which by now holds the predictions
# written by the inference cell.
fo.launch_app(eval_view)