<a href="https://colab.research.google.com/github/NielsRogge/Transformers-Tutorials/blob/master/DETR/Fine_tuning_DetrForObjectDetection_on_custom_dataset_(balloon).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install PyTorch Lightning for Model Fine-Tuning


In [None]:
# !pip install -q transformers

In [None]:
!pip install -q pytorch-lightning

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 802.2/802.2 kB 2.1 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 868.8/868.8 kB 1.9 MB/s eta 0:00:00

## Download + preprocess data



In [None]:
import numpy as np
import os, json, cv2, random
import torch
import torchvision

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used below live under it
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def convert_image_id(old_id):
    """Derive an integer image id from a dash-separated file stem.

    The stem is split on '-', every segment made only of digits is kept, and
    the kept segments are concatenated and parsed as one integer
    (e.g. "img-12-34" -> 1234).

    NOTE: leading zeros are lost by the integer conversion, so distinct stems
    such as "a-01-2" and "a-0-12" map to the same id (12).

    Args:
        old_id: File stem such as "prefix-12-34".

    Returns:
        The integer formed by the concatenated numeric segments.

    Raises:
        ValueError: If the stem contains no purely numeric segment.
    """
    digits = ''.join(part for part in old_id.split('-') if part.isdigit())
    if not digits:
        # int('') would raise an opaque "invalid literal" error; name the offending stem instead
        raise ValueError(f"cannot derive a numeric image id from {old_id!r}")
    return int(digits)

In [None]:
import os
import cv2

def get_custom_data_dicts(images_dir, labels_dir):
    """Build one record per label file, pairing it with its ``.jpg`` image.

    Every non-blank label line is parsed as five whitespace-separated numbers
    ``class x_center y_center width height``.  The four geometry values are
    multiplied by the image width/height to obtain pixel coordinates.  The
    class column is parsed but ignored: every box gets ``category_id`` 0.

    Args:
        images_dir: Directory containing ``<stem>.jpg`` images.
        labels_dir: Directory containing one label file per image, named
            ``<stem>.<ext>``.

    Returns:
        A list of dicts with keys ``file_name``, ``image_id``, ``height``,
        ``width`` and ``annotations``.  Each annotation holds ``bbox`` as
        ``[x_min, y_min, x_max, y_max]`` in pixels and ``category_id``.

    Raises:
        FileNotFoundError: If the image belonging to a label file is missing
            or cannot be decoded.
        ValueError: If a non-blank label line does not contain exactly five
            numbers, or the stem yields no numeric image id.
    """
    dataset_dicts = []

    # os.listdir returns entries in arbitrary order; sort so the output is reproducible
    for label_file in sorted(os.listdir(labels_dir)):
        label_path = os.path.join(labels_dir, label_file)
        if not os.path.isfile(label_path):
            # Sub-directories are not label files
            continue

        # splitext keeps stems that themselves contain dots intact
        image_id = os.path.splitext(label_file)[0]
        image_path = os.path.join(images_dir, f"{image_id}.jpg")

        # Load image to get width and height; imread signals failure by returning None
        img = cv2.imread(image_path)
        if img is None:
            raise FileNotFoundError(
                f"could not read image {image_path!r} for label file {label_path!r}"
            )
        height, width = img.shape[:2]

        objs = []
        with open(label_path, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Tolerate blank lines (e.g. a trailing newline)
                    continue
                _, x_cen, y_cen, w, h = map(float, fields)
                bbox = [
                    (x_cen - w / 2) * width,
                    (y_cen - h / 2) * height,
                    (x_cen + w / 2) * width,
                    (y_cen + h / 2) * height,
                ]
                objs.append({
                    "bbox": bbox,
                    "category_id": 0,  # 0 is the only class in this dataset
                })

        dataset_dicts.append({
            "file_name": image_path,
            "image_id": convert_image_id(image_id),
            "height": height,
            "width": width,
            "annotations": objs,
        })

    return dataset_dicts

In [None]:
import json
def save_to_coco_json(output_file, images_dir, labels_dir):
    """Export the dataset found in images_dir / labels_dir as a COCO detection JSON file.

    The records come from ``get_custom_data_dicts``, whose boxes are corner
    coordinates ``[x_min, y_min, x_max, y_max]``.  COCO annotations store
    ``[x, y, width, height]``, so each box is converted before it is written
    and ``area`` is computed from the converted width and height.

    Args:
        output_file: Path of the JSON file to write; its parent directory is
            created when it does not exist.
        images_dir: Directory with the images.
        labels_dir: Directory with the per-image label files.

    Returns:
        None.  The result is written to ``output_file``.
    """
    dataset_dicts = get_custom_data_dicts(images_dir, labels_dir)

    # Convert dataset dicts to COCO format
    coco_format = {
        "images": [],
        "annotations": [],
        "categories": [{"id": 0, "name": "category_name"}]  # Adjust categories as needed
    }

    annotation_id = 1  # Unique ID for each annotation
    for data_dict in dataset_dicts:
        coco_format["images"].append({
            "file_name": data_dict["file_name"],
            "height": data_dict["height"],
            "width": data_dict["width"],
            "id": data_dict["image_id"]
        })
        for annotation in data_dict["annotations"]:
            # Corner box -> COCO [x, y, width, height]
            x_min, y_min, x_max, y_max = annotation["bbox"]
            box_width = x_max - x_min
            box_height = y_max - y_min
            # Build a fresh dict rather than mutating the input record
            coco_format["annotations"].append({
                "bbox": [x_min, y_min, box_width, box_height],
                "category_id": annotation["category_id"],
                "id": annotation_id,
                "image_id": data_dict["image_id"],
                "area": box_width * box_height,
                "iscrowd": 0,
                "segmentation": [],  # Optional: Add if you have segmentation data
            })
            annotation_id += 1

    # Ensure the output directory exists; dirname is '' for a bare filename
    # and os.makedirs('') would raise
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Write data to the JSON file with indentation
    with open(output_file, 'w') as f:
        json.dump(coco_format, f, indent=4)

# Dataset locations on the mounted Drive: images, labels and output JSON per split
images_train_dir = '/content/drive/MyDrive/wb_localization_dataset/images/train'
labels_train_dir = '/content/drive/MyDrive/wb_localization_dataset/labels/train'
output_train = '/content/drive/MyDrive/wb_localization_dataset/train.json'

images_val_dir = '/content/drive/MyDrive/wb_localization_dataset/images/val'
labels_val_dir = '/content/drive/MyDrive/wb_localization_dataset/labels/val'
output_val = '/content/drive/MyDrive/wb_localization_dataset/val.json'

# Write one COCO annotation file per split (train first, then val)
for split_output, split_images, split_labels in (
    (output_train, images_train_dir, labels_train_dir),
    (output_val, images_val_dir, labels_val_dir),
):
    save_to_coco_json(split_output, split_images, split_labels)


## Create PyTorch dataset + dataloaders
The standard way to train a model in PyTorch is to create a dataset and a corresponding dataloader.
Here we define a regular PyTorch dataset. Each item of the dataset is an image and its corresponding annotations. Torchvision already provides a `CocoDetection` dataset, which we can use. We only add an image processor (`DetrImageProcessor`) to resize + normalize the images, and to turn the annotations (which are in COCO format) into the format that DETR expects. It will also resize the annotations accordingly.

In [None]:
class CocoDetection(torchvision.datasets.CocoDetection):
    """
    COCO detection dataset that runs every sample through a processor.

    Parameters:
        img_folder (str): Path to the image folder.
        processor (callable): Called as
            ``processor(images=..., annotations=..., return_tensors="pt")``;
            its result must expose ``"pixel_values"`` and ``"labels"``.
        mode (str): Only used when ``ann_file`` is None, in which case the
            annotation file defaults to ``<img_folder>/<mode>.json``.
        ann_file (str): Path to the annotation file.
    """
    def __init__(self, img_folder, processor, mode='train', ann_file=None):
        if ann_file is None:
            ann_file = os.path.join(img_folder, f"{mode}.json")
        super().__init__(img_folder, ann_file)
        self.processor = processor

    def __getitem__(self, idx):
        """Return ``(pixel_values, target)`` for the sample at position ``idx``."""
        img, target = super().__getitem__(idx)
        image_id = self.ids[idx]
        # Wrap the raw annotation list together with its image id before handing it to the processor
        target = {'image_id': image_id, 'annotations': target}

        encoding = self.processor(images=img, annotations=target, return_tensors="pt")
        # Drop only the leading batch axis (the processor is given a single image,
        # so it presumably returns a batch of one); a bare squeeze() would also
        # collapse any other axis that happens to have size 1.
        pixel_values = encoding["pixel_values"].squeeze(0)
        target = encoding["labels"][0]  # first (only) entry of the returned labels

        return pixel_values, target


Based on the class defined above, we create training and validation datasets.

In [None]:
from transformers import DetrImageProcessor

# Image processor loaded from the same checkpoint name that is fine-tuned later
processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")

# One dataset per split, each pointing at its image folder and annotation file
train_dataset = CocoDetection(
    img_folder='/content/drive/MyDrive/wb_localization_dataset/images/train',
    processor=processor,
    ann_file='/content/drive/MyDrive/wb_localization_dataset/train.json',
)
val_dataset = CocoDetection(
    img_folder='/content/drive/MyDrive/wb_localization_dataset/images/val',
    processor=processor,
    ann_file='/content/drive/MyDrive/wb_localization_dataset/val.json',
)

As you can see, this dataset is tiny:

In [None]:
# Report how many samples each split contains
n_train, n_val = len(train_dataset), len(val_dataset)
print("Number of training examples:", n_train)
print("Number of validation examples:", n_val)

Let's verify an example by visualizing it. We can access the COCO API of the dataset by typing `train_dataset.coco`.

In [None]:
from pycocotools.coco import COCO
import numpy as np
import os
from PIL import Image, ImageDraw
# Sanity check: can pycocotools parse the generated training annotation file?
train_annotations_path = '/content/drive/MyDrive/wb_localization_dataset/train.json'
try:
    coco = COCO(train_annotations_path)
    print("Loaded annotations successfully.")
except Exception as load_error:
    # Diagnostic cell: report the problem instead of aborting the notebook
    print(f"Failed to load annotations: {load_error}")

In [None]:
!pip install pycocotools

In [None]:
import numpy as np
import os
from PIL import Image, ImageDraw

# based on https://github.com/woctezuma/finetune-detr/blob/master/finetune_detr.ipynb
image_ids = train_dataset.coco.getImgIds()
# let's pick a random image
image_id = image_ids[np.random.randint(0, len(image_ids))]
print('Image n°{}'.format(image_id))
image = train_dataset.coco.loadImgs(image_id)[0]
image = Image.open(os.path.join('/content/drive/MyDrive/wb_localization_dataset/images/train', image['file_name']))

annotations = train_dataset.coco.imgToAnns[image_id]
draw = ImageDraw.Draw(image, "RGBA")

# Map category ids to names; id2label is also read when the model is built
cats = train_dataset.coco.cats
id2label = {k: v['name'] for k, v in cats.items()}

for annotation in annotations:
    # COCO boxes are [x, y, width, height], whereas ImageDraw.rectangle takes
    # the two corners (x0, y0, x1, y1). Drawing with the COCO interpretation
    # shows the box as a COCO-format consumer will read it.
    x, y, w, h = annotation['bbox']
    draw.rectangle((x, y, x + w, y + h), outline='red', width=1)
    draw.text((x, y), id2label[annotation['category_id']], fill='white')

image

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Merge ``(pixel_values, target)`` samples into a single batch dict.

    The images are handed to the module-level ``processor.pad`` with
    ``return_tensors="pt"``; the targets are kept as a plain list.

    Args:
        batch: Sequence of ``(pixel_values, target)`` pairs.

    Returns:
        Dict with ``pixel_values`` and ``pixel_mask`` taken from the padded
        encoding, and ``labels`` holding the list of targets.
    """
    images = [sample[0] for sample in batch]
    targets = [sample[1] for sample in batch]
    padded = processor.pad(images, return_tensors="pt")
    return {
        'pixel_values': padded['pixel_values'],
        'pixel_mask': padded['pixel_mask'],
        'labels': targets,
    }

# Only the training split is shuffled; both loaders batch through collate_fn
train_dataloader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)
val_dataloader = DataLoader(
    val_dataset,
    batch_size=2,
    collate_fn=collate_fn,
)
# Fetch a single training batch to inspect
batch = next(iter(train_dataloader))

## Train the model using PyTorch Lightning

In [None]:
import pytorch_lightning as pl
from transformers import DetrForObjectDetection
import torch

class Detr(pl.LightningModule):
    """Lightning module wrapping ``DetrForObjectDetection`` for fine-tuning.

    Args:
        lr: Learning rate for trainable parameters whose name does not
            contain "backbone".
        lr_backbone: Learning rate for trainable parameters whose name
            contains "backbone".
        weight_decay: Weight decay passed to AdamW.
    """

    def __init__(self, lr, lr_backbone, weight_decay):
        super().__init__()
        # Replace the COCO classification head with a custom one sized from the
        # module-level id2label mapping. The "no_timm" revision is used so the
        # convolutional backbone does not rely on the timm library.
        self.model = DetrForObjectDetection.from_pretrained(
            "facebook/detr-resnet-50",
            revision="no_timm",
            num_labels=len(id2label),
            ignore_mismatched_sizes=True,
        )
        # see https://github.com/PyTorchLightning/pytorch-lightning/pull/1896
        self.lr = lr
        self.lr_backbone = lr_backbone
        self.weight_decay = weight_decay

    def forward(self, pixel_values, pixel_mask):
        """Run the wrapped model without labels and return its outputs."""
        return self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    def common_step(self, batch, batch_idx):
        """Forward a batch with labels; return ``(loss, loss_dict)``."""
        images = batch["pixel_values"]
        mask = batch["pixel_mask"]
        # Move every tensor of every per-image target onto this module's device
        targets = [
            {key: tensor.to(self.device) for key, tensor in target.items()}
            for target in batch["labels"]
        ]

        result = self.model(pixel_values=images, pixel_mask=mask, labels=targets)
        return result.loss, result.loss_dict

    def training_step(self, batch, batch_idx):
        """Log the total and per-component training losses; return the total."""
        total_loss, components = self.common_step(batch, batch_idx)
        self.log("training_loss", total_loss)
        for name, value in components.items():
            self.log("train_" + name, value.item())
        return total_loss

    def validation_step(self, batch, batch_idx):
        """Log the total and per-component validation losses; return the total."""
        total_loss, components = self.common_step(batch, batch_idx)
        self.log("validation_loss", total_loss)
        for name, value in components.items():
            self.log("validation_" + name, value.item())
        return total_loss

    def configure_optimizers(self):
        """Create AdamW with a separate learning rate for backbone parameters."""
        head_params = []
        backbone_params = []
        for name, param in self.named_parameters():
            if not param.requires_grad:
                continue
            if "backbone" in name:
                backbone_params.append(param)
            else:
                head_params.append(param)

        # The first group uses the optimizer-wide lr; the second overrides it
        param_groups = [
            {"params": head_params},
            {"params": backbone_params, "lr": self.lr_backbone},
        ]
        return torch.optim.AdamW(
            param_groups,
            lr=self.lr,
            weight_decay=self.weight_decay,
        )

    def train_dataloader(self):
        """Return the module-level training dataloader."""
        return train_dataloader

    def val_dataloader(self):
        """Return the module-level validation dataloader."""
        return val_dataloader

Here we define the model, and verify the outputs.

In [None]:
# Build the Lightning module; the backbone gets a smaller learning rate than the rest
model = Detr(lr=1e-4, lr_backbone=1e-5, weight_decay=1e-4)

# Smoke test: push the previously fetched batch through the model
sample_inputs = {key: batch[key] for key in ('pixel_values', 'pixel_mask')}
outputs = model(**sample_inputs)

In [None]:
# Display the shape of the classification logits produced for the sample batch
outputs.logits.shape

Next, let's train!

In [None]:
from pytorch_lightning import Trainer

# Training budget and gradient clipping value handed to the Lightning Trainer
max_epochs = 50
gradient_clip_val = 0.1
trainer = Trainer(max_epochs=max_epochs, gradient_clip_val=gradient_clip_val)

# No dataloaders are passed: the module supplies them via train_dataloader()/val_dataloader()
trainer.fit(model)

## Evaluate the model

Finally, we evaluate the model on the validation set.

In [None]:
!pip install -q coco-eval

In [None]:
def convert_to_xywh(boxes):
    """Convert an (N, 4) tensor of [xmin, ymin, xmax, ymax] boxes to [xmin, ymin, width, height]."""
    xmin, ymin, xmax, ymax = boxes.unbind(1)
    return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=1)


def prepare_for_coco_detection(predictions):
    """Flatten per-image predictions into COCO result records.

    Args:
        predictions: Mapping ``image_id -> prediction`` where each prediction
            has tensor entries ``"boxes"`` (N, 4) as corner coordinates
            ``[xmin, ymin, xmax, ymax]``, ``"scores"`` (N,) and ``"labels"``
            (N,).

    Returns:
        A list with one dict per box, holding ``image_id``, ``category_id``,
        ``bbox`` as ``[x, y, width, height]`` and ``score``.
    """
    coco_results = []
    for original_id, prediction in predictions.items():
        # Skip images without any predicted box
        if not prediction or len(prediction["boxes"]) == 0:
            continue

        # COCO result records store boxes as [x, y, width, height], not as corners
        boxes = convert_to_xywh(prediction["boxes"]).tolist()
        scores = prediction["scores"].tolist()
        labels = prediction["labels"].tolist()

        coco_results.extend(
            {
                "image_id": original_id,
                "category_id": labels[k],
                "bbox": box,
                "score": scores[k],
            }
            for k, box in enumerate(boxes)
        )
    return coco_results

 Let's run the evaluation:

In [None]:
from coco_eval import CocoEvaluator
from tqdm.notebook import tqdm

import numpy as np
# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# initialize evaluator with ground truth (gt)
evaluator = CocoEvaluator(coco_gt=val_dataset.coco, iou_types=["bbox"])
model = model.to(device)
# Put the module in evaluation mode so train-only behaviour (e.g. dropout) is
# switched off while predictions are collected
model.eval()
print("Running evaluation...")
for batch in tqdm(val_dataloader):
    # get the inputs
    pixel_values = batch["pixel_values"].to(device)
    pixel_mask = batch["pixel_mask"].to(device)
    labels = [{k: v.to(device) for k, v in t.items()} for t in batch["labels"]] # these are in DETR format, resized + normalized

    # forward pass
    with torch.no_grad():
        outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    # turn into a list of dictionaries (one item for each example in the batch),
    # rescaled to each image's original size; threshold=0 keeps every query
    orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0)
    results = processor.post_process_object_detection(outputs, target_sizes=orig_target_sizes, threshold=0)

    # provide to metric
    # metric expects a list of dictionaries, each item
    # containing image_id, category_id, bbox and score keys
    predictions = {target['image_id'].item(): output for target, output in zip(labels, results)}
    predictions = prepare_for_coco_detection(predictions)
    evaluator.update(predictions)

evaluator.synchronize_between_processes()
evaluator.accumulate()
evaluator.summarize()

Based on the Average Precision (AP) results above, this model's performance lags significantly behind that of YOLOv5 or YOLOv8 on this dataset. Consequently, I decided to stop using the DETR approach.