In [55]:
from datasets import load_dataset, Dataset
import os
from collections import defaultdict
import json 
from PIL import Image, ImageDraw
from functools import partial
import numpy as np
from transformers.image_transforms import center_to_corners_format
import torch

## Create dataset

In [56]:
annotation_file = "/home/taheera.ahmed/data/reindeerdrone/tiles/new_annotations.json"
data_dir = "/home/taheera.ahmed/data/reindeerdrone/tiles"
img_dir = "/home/taheera.ahmed/data/reindeerdrone/tiles/images"

In [86]:
from collections import defaultdict
from PIL import Image
import os

datasets = load_dataset('json', data_files=annotation_file, field='annotations')

# Function to process image IDs
def process_image_id(image_id):
    stripped_id = image_id.replace("DSC", "").replace("_tile", "").replace(".png", "")
    return int(stripped_id)

# Function to load and prepare the dataset
def prepare_dataset(examples, img_dir):
    # Initialize data for each image
    combined = {}
    for img_name in os.listdir(img_dir):
        image_path = os.path.join(img_dir, img_name)
        image = Image.open(image_path)
        width, height = image.size
        image_id = img_name  # Use filename as image_id
        combined[image_id] = {
            "image_id": image_id,
            "image": image,
            "width": width,
            "height": height,
            "objects": {'id': [], 'area': [], 'category': [], 'bbox': []}
        }

    # Add annotation data to the corresponding image entries
    for count, example in enumerate(examples):
        image_id = example['image_id']
        bbox = example['bbox']
        area = example['area']
        category_id = example['category_id']
        
        if image_id in combined:
            combined[image_id]["objects"]["bbox"].append(bbox)
            combined[image_id]["objects"]["category"].append(category_id)
            combined[image_id]["objects"]["area"].append(area)
            combined[image_id]["objects"]["id"].append(count)
    
    # Convert to a list of dictionaries for the dataset
    combined_data = []
    for image_id, data in combined.items():
        combined_data.append({
            "image_id": process_image_id(data["image_id"]),
            "image": data["image"],
            "width": data["width"],
            "height": data["height"],
            "objects": data["objects"]
        })
    
    return combined_data

# Call the function with paths and datasets
combined_annotations = prepare_dataset(datasets['train'], img_dir)
dataset = Dataset.from_list(combined_annotations)
print(dataset)


  3%|▎         | 27/810 [2:11:53<63:44:50, 293.09s/it]


Dataset({
    features: ['image_id', 'image', 'width', 'height', 'objects'],
    num_rows: 1925
})


In [87]:
split = dataset.train_test_split(test_size=0.15, seed=1337)

dataset = {
    'train': split['train'],
    'validation': split['test']
}

print("Training Set:")
print(dataset['train'])
print("Validation Set:")
print(dataset['validation'])

dataset["train"][15]


Training Set:
Dataset({
    features: ['image_id', 'image', 'width', 'height', 'objects'],
    num_rows: 1636
})
Validation Set:
Dataset({
    features: ['image_id', 'image', 'width', 'height', 'objects'],
    num_rows: 289
})


{'image_id': 45717,
 'image': <PIL.PngImagePlugin.PngImageFile image mode=RGB size=1024x1024>,
 'width': 1024,
 'height': 1024,
 'objects': {'area': [], 'bbox': [], 'category': [], 'id': []}}

## did it work?

In [88]:
# read json file and get categories 
with open(annotation_file, 'r') as file:
    annotations = json.load(file)
    
categories = annotations['categories']
categories = [{"id": category['id'], "name": category['name']} for category in categories]
categories

[{'id': 0, 'name': 'Adult'}, {'id': 1, 'name': 'Calf'}]

In [89]:
annotations = dataset["train"][15]["objects"]["bbox"]
image = dataset["train"][15]["image"]
draw = ImageDraw.Draw(image)

for box in annotations:
    xmin, ymin, width, height = box
    xmax = xmin + width
    ymax = ymin + height
    draw.rectangle([xmin, ymin, xmax, ymax], outline="red", width=3)

image.show()

Opening in existing browser session.


## Pre-process data

In [92]:
from transformers import AutoImageProcessor

MAX_SIZE = 1024
MODEL_NAME = "facebook/detr-resnet-50"

image_processor = AutoImageProcessor.from_pretrained(
    MODEL_NAME,
    do_resize=True,
    size={"max_height": MAX_SIZE, "max_width": MAX_SIZE},
    do_pad=True,
    pad_size={"height": MAX_SIZE, "width": MAX_SIZE},
)

In [93]:
import albumentations as A

train_augment_and_transform = A.Compose(
    [
        #A.Perspective(p=0.1),
        A.HorizontalFlip(p=0.5),
        #A.RandomBrightnessContrast(p=0.5),
        #A.HueSaturationValue(p=0.1),
    ],
    bbox_params=A.BboxParams(format="coco", label_fields=["category"], clip=True, min_area=25),
)

validation_transform = A.Compose(
    [A.NoOp()],
    bbox_params=A.BboxParams(format="coco", label_fields=["category"], clip=True),
)

def format_image_annotations_as_coco(image_id, categories, areas, bboxes):
    """Format one set of image annotations to the COCO format

    Args:
        image_id (str): image id. e.g. "0001"
        categories (List[int]): list of categories/class labels corresponding to provided bounding boxes
        areas (List[float]): list of corresponding areas to provided bounding boxes
        bboxes (List[Tuple[float]]): list of bounding boxes provided in COCO format
            ([center_x, center_y, width, height] in absolute coordinates)

    Returns:
        dict: {
            "image_id": image id,
            "annotations": list of formatted annotations
        }
    """
    annotations = []
    for category, area, bbox in zip(categories, areas, bboxes):
        formatted_annotation = {
            "image_id": image_id,
            "category_id": category,
            "iscrowd": 0,
            "area": area,
            "bbox": list(bbox),
        }
        annotations.append(formatted_annotation)

    return {
        "image_id": image_id,
        "annotations": annotations,
    }

def augment_and_transform_batch(examples, transform, image_processor, return_pixel_mask=False):
    """Apply augmentations and format annotations in COCO format for object detection task"""

    images = []
    annotations = []
    for image_id, image, objects in zip(examples["image_id"], examples["image"], examples["objects"]):
        image = np.array(image.convert("RGB"))

        # apply augmentations
        output = transform(image=image, bboxes=objects["bbox"], category=objects["category"])
        images.append(output["image"])

        # format annotations in COCO format
        formatted_annotations = format_image_annotations_as_coco(
            image_id, output["category"], objects["area"], output["bboxes"]
        )
        annotations.append(formatted_annotations)

    # Apply the image processor transformations: resizing, rescaling, normalization
    result = image_processor(images=images, annotations=annotations, return_tensors="pt")

    if not return_pixel_mask:
        result.pop("pixel_mask", None)

    return result

In [94]:

train_transform_batch = partial(
    augment_and_transform_batch, transform=train_augment_and_transform, image_processor=image_processor
)
validation_transform_batch = partial(
    augment_and_transform_batch, transform=validation_transform, image_processor=image_processor
)

dataset["train"] = dataset["train"].with_transform(train_transform_batch)
dataset["validation"] = dataset["validation"].with_transform(validation_transform_batch)

dataset["train"][2]

{'pixel_values': tensor([[[ 0.0912,  0.1426,  0.1768,  ..., -0.0287, -0.0116, -0.7479],
          [ 0.3481,  0.5193,  0.4679,  ..., -0.0458, -0.4226, -0.9192],
          [ 0.4508,  0.5193,  0.5193,  ..., -0.1657, -0.3883, -0.8507],
          ...,
          [-0.7308, -0.5082, -0.4226,  ..., -0.3541, -0.1828, -0.1143],
          [-0.5938, -0.3369, -0.4911,  ...,  0.0912, -0.0116, -0.1486],
          [-0.4226, -0.3027, -0.5082,  ..., -0.2513, -0.1486, -0.1657]],
 
         [[ 0.2927,  0.3452,  0.3452,  ...,  0.3102,  0.3277, -0.4251],
          [ 0.5028,  0.6954,  0.6429,  ...,  0.2927, -0.0924, -0.6001],
          [ 0.6078,  0.6954,  0.6604,  ...,  0.1527, -0.0574, -0.5301],
          ...,
          [-0.6352, -0.4076, -0.3200,  ..., -0.4251, -0.2150, -0.0574],
          [-0.4951, -0.2325, -0.3901,  ...,  0.0126, -0.0049, -0.0749],
          [-0.3200, -0.1975, -0.3725,  ..., -0.1800, -0.0749, -0.0574]],
 
         [[-0.0964, -0.0790, -0.1138,  ..., -0.2707, -0.2881, -1.0376],
          [ 

In [95]:
def collate_fn(batch):
    data = {}
    data["pixel_values"] = torch.stack([x["pixel_values"] for x in batch])
    data["labels"] = [x["labels"] for x in batch]
    if "pixel_mask" in batch[0]:
        data["pixel_mask"] = torch.stack([x["pixel_mask"] for x in batch])
    return data

## Preparing function to compute mAP

In [96]:
def convert_bbox_yolo_to_pascal(boxes, image_size):
    """
    Convert bounding boxes from YOLO format (x_center, y_center, width, height) in range [0, 1]
    to Pascal VOC format (x_min, y_min, x_max, y_max) in absolute coordinates.

    Args:
        boxes (torch.Tensor): Bounding boxes in YOLO format
        image_size (Tuple[int, int]): Image size in format (height, width)

    Returns:
        torch.Tensor: Bounding boxes in Pascal VOC format (x_min, y_min, x_max, y_max)
    """
    # convert center to corners format
    boxes = center_to_corners_format(boxes)

    # convert to absolute coordinates
    height, width = image_size
    boxes = boxes * torch.tensor([[width, height, width, height]])

    return boxes

In [97]:
from dataclasses import dataclass
from torchmetrics.detection.mean_ap import MeanAveragePrecision


@dataclass
class ModelOutput:
    logits: torch.Tensor
    pred_boxes: torch.Tensor
    
@torch.no_grad()
def compute_average_precision(evaluation_results, image_processor, threshold=0.0, id2label=None):
    """
    Compute Average Precision (AP) for each class in an object detection task.

    Args:
        evaluation_results (EvalPrediction): Predictions and targets from evaluation.
        threshold (float, optional): Threshold to filter predicted boxes by confidence. Defaults to 0.0.
        id2label (Optional[dict], optional): Mapping from class id to class name. Defaults to None.

    Returns:
        Mapping[str, float]: Metrics in a form of dictionary {<class_name>: <AP_value>}
    """
    
    predictions, targets = evaluation_results.predictions, evaluation_results.label_ids

    image_sizes = []
    post_processed_targets = []
    post_processed_predictions = []

    # Collect targets in the required format for metric computation
    for batch in targets:
        batch_image_sizes = torch.tensor(np.array([x["orig_size"] for x in batch]))
        image_sizes.append(batch_image_sizes)
        for image_target in batch:
            boxes = torch.tensor(image_target["boxes"])
            boxes = convert_bbox_yolo_to_pascal(boxes, image_target["orig_size"])
            labels = torch.tensor(image_target["class_labels"])
            post_processed_targets.append({"boxes": boxes, "labels": labels})

    # Collect predictions in the required format for metric computation
    for batch, target_sizes in zip(predictions, image_sizes):
        batch_logits, batch_boxes = batch[1], batch[2]
        output = ModelOutput(logits=torch.tensor(batch_logits), pred_boxes=torch.tensor(batch_boxes))
        post_processed_output = image_processor.post_process_object_detection(
            output, threshold=threshold, target_sizes=target_sizes
        )
        post_processed_predictions.extend(post_processed_output)

    # Compute metrics using MeanAveragePrecision (this will provide class-level AP)
    metric = MeanAveragePrecision(box_format="xyxy", class_metrics=True)
    metric.update(post_processed_predictions, post_processed_targets)
    metrics = metric.compute()

    # Extract per-class AP values
    classes = metrics.pop("classes")
    ap_per_class = metrics.pop("map_per_class")  # This gives AP per class

    # Format results as {class_name: AP_value}
    ap_results = {}
    ap_values = []
    for class_id, class_ap in zip(classes, ap_per_class):
        class_name = id2label[class_id.item()] if id2label is not None else f"class_{class_id.item()}"
        ap_results[f"ap_{class_name}"] = round(class_ap.item(), 4)
        ap_values.append(class_ap.item())  # Collect AP values

    # Calculate general AP (mean AP across all classes)
    ap_results["eval_ap"] = sum(ap_values) / len(ap_values)

    return ap_results


In [98]:
id2label = {category["id"]: category["name"] for category in categories}
label2id = {category["name"]: category["id"] for category in categories}

eval_compute_metrics_fn = partial(
    compute_average_precision, image_processor=image_processor, id2label=id2label, threshold=0.0
)

## Training the model

In [99]:
from transformers import AutoModelForObjectDetection

model = AutoModelForObjectDetection.from_pretrained(
    MODEL_NAME,
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True,
)


Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DetrForObjectDetection were not initialized from the model checkpoin

In [100]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="detr_finetuned_reindeerdrone",
    num_train_epochs=30,
    fp16=False,
    per_device_train_batch_size=8,
    dataloader_num_workers=4,
    learning_rate=5e-5,
    lr_scheduler_type="cosine",
    weight_decay=1e-4,
    max_grad_norm=0.01,
    metric_for_best_model="eval_ap",
    greater_is_better=True,
    load_best_model_at_end=True,
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    remove_unused_columns=False,
    eval_do_concat_batches=False,
    push_to_hub=False,
)

In [102]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["validation"],
    tokenizer=image_processor,
    data_collator=collate_fn,
    compute_metrics=eval_compute_metrics_fn,
)

trainer.train()



KeyboardInterrupt: 