In [None]:
from transformers import AutoModelForObjectDetection, AutoImageProcessor, TrainingArguments, Trainer
import torch
from torch.utils.data import Dataset
import os
import json
from PIL import Image, ImageDraw
import kagglehub
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Square input resolution (in pixels) requested from the image processor.
image_width = image_height = 416

# Speed-limit values in the order they appear in the class list. The order is
# string-sorted ("10", "100", "110", ...), not numeric, and must be preserved
# because a class's position in CLASSES is used as its label index.
_SPEED_LIMITS = (10, 100, 110, 120, 20, 30, 40, 50, 60, 70, 80, 90)

# Human-readable class names, indexed by predicted label id.
CLASSES = (
    ["Green Light", "Red Light"]
    + [f"Speed Limit {limit}" for limit in _SPEED_LIMITS]
    + ["Stop"]
)

In [3]:
# Load pre-trained model and processor
model_name = "PekingU/rtdetr_v2_r18vd"
image_processor = AutoImageProcessor.from_pretrained(
    model_name,
    do_resize=True,
    size={"width": image_width, "height": image_height},
    use_fast=True,
)

# Register the class names on the model config so that
# `model.config.id2label[...]` returns the names from CLASSES.
# NOTE(review): assumes the `category_id` values in the annotation files are
# 0-based indices into CLASSES -- verify against the dataset's "categories".
id2label = dict(enumerate(CLASSES))
label2id = {name: idx for idx, name in id2label.items()}

# The checkpoint's classification heads were trained for a different number of
# classes, so they are re-initialised (ignore_mismatched_sizes=True).
model = AutoModelForObjectDetection.from_pretrained(
    model_name,
    num_labels=len(CLASSES),
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True,
)

Using `use_fast=True` but `torchvision` is not available. Falling back to the slow image processor.
Some weights of RTDetrV2ForObjectDetection were not initialized from the model checkpoint at PekingU/rtdetr_v2_r18vd and are newly initialized because the shapes did not match:
- model.decoder.class_embed.0.bias: found shape torch.Size([80]) in the checkpoint and torch.Size([15]) in the model instantiated
- model.decoder.class_embed.0.weight: found shape torch.Size([80, 256]) in the checkpoint and torch.Size([15, 256]) in the model instantiated
- model.decoder.class_embed.1.bias: found shape torch.Size([80]) in the checkpoint and torch.Size([15]) in the model instantiated
- model.decoder.class_embed.1.weight: found shape torch.Size([80, 256]) in the checkpoint and torch.Size([15, 256]) in the model instantiated
- model.decoder.class_embed.2.bias: found shape torch.Size([80]) in the checkpoint and torch.Size([15]) in the model instantiated
- model.decoder.class_embed.2.weight: found shape

In [4]:
# Fetch the dataset through kagglehub; the call returns the local directory
# holding the downloaded files.
dataset_root = kagglehub.dataset_download("pkdarabi/cardetection")

# The splits used below live under the "car" sub-directory.
data_path = os.path.join(dataset_root, "car")
print("Path to dataset files:", data_path)

Path to dataset files: C:\Users\Daniel\.cache\kagglehub\datasets\pkdarabi\cardetection\versions\5\car


In [5]:
class COCODataset(Dataset):
    """Object-detection dataset backed by a COCO-format annotation file.

    Each item is the image processor's encoding of one image, with the batch
    axis removed, plus a ``"labels"`` dict holding ``"class_labels"`` and
    ``"boxes"`` exactly as prepared by the image processor.

    Args:
        coco_json_path: Path to the COCO JSON file (must contain ``"images"``
            and ``"annotations"`` lists).
        image_root: Directory that the ``file_name`` entries are relative to.
        image_processor: Callable accepting ``images``, ``annotations`` and
            ``return_tensors`` and returning a mapping with ``"pixel_values"``
            and a one-element ``"labels"`` list.
    """

    def __init__(self, coco_json_path, image_root, image_processor):
        # JSON is UTF-8 by specification; do not rely on the platform default.
        with open(coco_json_path, "r", encoding="utf-8") as f:
            coco = json.load(f)

        self.image_root = image_root
        self.image_processor = image_processor

        # Map image_id to its image record (file_name, size, ...)
        self.images = {img["id"]: img for img in coco["images"]}

        # Group annotations by image_id
        self.annotations = {}
        for ann in coco["annotations"]:
            self.annotations.setdefault(ann["image_id"], []).append(ann)

        self.image_ids = list(self.images.keys())

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load, preprocess and return the sample at position ``idx``.

        Returns:
            The processor's encoding with every tensor's leading batch axis
            removed and ``encoding["labels"]`` replaced by a dict with the
            processor-prepared ``"class_labels"`` and ``"boxes"`` tensors.
        """
        image_id = self.image_ids[idx]
        img_info = self.images[image_id]
        # Images without annotations are valid samples with no targets.
        anns = self.annotations.get(image_id, [])

        img_path = os.path.join(self.image_root, img_info["file_name"])
        # convert() produces an in-memory image, so the file handle can be
        # released right away instead of waiting for garbage collection.
        with Image.open(img_path) as img_file:
            image = img_file.convert("RGB")

        # The processor resizes the image AND rewrites the boxes to match
        # (for DETR-family processors: normalised centre format).
        encoding = self.image_processor(
            images=image,
            annotations={"image_id": image_id, "annotations": anns},
            return_tensors="pt",
        )

        # Drop only the batch axis; a bare squeeze() would also remove any
        # other axis that happens to have size 1.
        for key in list(encoding.keys()):
            value = encoding[key]
            if hasattr(value, "squeeze"):
                encoding[key] = value.squeeze(0)

        # Keep the processor's targets rather than the raw COCO [x, y, w, h]
        # pixel boxes: they are in the format the detection loss expects and
        # stay consistent with the resized pixel_values.
        target = encoding["labels"][0]
        encoding["labels"] = {
            "class_labels": target["class_labels"],
            "boxes": target["boxes"],
        }

        return encoding

In [6]:
def _load_split(split_dir):
    """Build the COCODataset for one split directory under ``data_path``."""
    split_root = os.path.join(data_path, split_dir)
    return COCODataset(
        coco_json_path=os.path.join(split_root, "cocoann.json"),
        image_root=os.path.join(split_root, "images"),
        image_processor=image_processor,
    )


# Every split has the same layout: <split>/cocoann.json and <split>/images/.
train_dataset = _load_split("train")
val_dataset = _load_split("valid")
test_dataset = _load_split("test")

In [None]:
# Training configuration handed to the Trainer below.
training_args = TrainingArguments(
    # Where checkpoints and logs are written
    output_dir="./rtdetrv2-finetuned",
    logging_dir="./logs",
    # Schedule and batch sizes
    num_train_epochs=5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    # Evaluate and checkpoint at the end of every epoch
    eval_strategy="epoch",
    save_strategy="epoch",
    # Model selection: reload the checkpoint with the highest "mAP"
    load_best_model_at_end=True,
    metric_for_best_model="mAP",
    greater_is_better=True,
)


# Define a custom compute_metrics function
def compute_metrics(eval_pred):
    """Compute COCO bounding-box metrics for a set of detections.

    Args:
        eval_pred: A ``(predictions, labels)`` pair. ``predictions`` is indexed
            with the keys ``"boxes"``, ``"scores"``, ``"labels"`` and
            ``"image_ids"`` (one entry per image, boxes as
            ``[x1, y1, x2, y2]``). ``labels`` is used as the ground-truth
            COCO dataset dict.

    Returns:
        Dict with ``mAP``, ``mAP_50``, ``mAP_75``, ``mAP_small``,
        ``mAP_medium`` and ``mAP_large`` as floats. All values are ``0.0``
        when there are no detections at all.

    NOTE(review): the Trainer normally passes raw model outputs and collated
    label tensors here, not the layout described above -- confirm that the
    caller post-processes predictions into this layout before relying on the
    reported numbers.
    """
    predictions, labels = eval_pred
    metric_names = (
        "mAP",  # mAP@[.5:.95]
        "mAP_50",  # mAP@.50
        "mAP_75",  # mAP@.75
        "mAP_small",  # mAP@[.5:.95] small objects
        "mAP_medium",  # mAP@[.5:.95] medium objects
        "mAP_large",  # mAP@[.5:.95] large objects
    )

    # Format the predictions to COCO format
    coco_predictions = []
    for i, (boxes, scores, labels_pred) in enumerate(
        zip(predictions["boxes"], predictions["scores"], predictions["labels"])
    ):
        image_id = predictions["image_ids"][i]
        for box, score, label in zip(boxes, scores, labels_pred):
            # Corner format -> COCO [x, y, width, height]
            x1, y1, x2, y2 = box.tolist()
            coco_predictions.append(
                {
                    "image_id": image_id,
                    "category_id": int(label),
                    "bbox": [x1, y1, x2 - x1, y2 - y1],
                    "score": float(score),
                }
            )

    # loadRes() cannot handle an empty result list; with no detections every
    # precision value is reported as zero.
    if not coco_predictions:
        return {name: 0.0 for name in metric_names}

    # COCO() only accepts a file path, so an in-memory ground-truth dict has
    # to be attached manually and indexed.
    coco_gt = COCO()
    coco_gt.dataset = labels
    coco_gt.createIndex()

    coco_dt = coco_gt.loadRes(coco_predictions)  # Predictions
    coco_eval = COCOeval(coco_gt, coco_dt, "bbox")
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()

    # The first six summary statistics are the average-precision values.
    return {name: float(stat) for name, stat in zip(metric_names, coco_eval.stats)}


def collate_fn(batch):
    """Combine per-sample encodings into one model-ready batch.

    Pixel tensors are stacked along a new leading axis; the per-image label
    entries are kept as a plain list, one element per sample.
    """
    pixel_values = []
    labels = []
    for sample in batch:
        pixel_values.append(sample["pixel_values"])
        labels.append(sample["labels"])

    return {"pixel_values": torch.stack(pixel_values), "labels": labels}


# Wire the model, data splits, batching and metric function into a Trainer.
trainer = Trainer(
    args=training_args,
    model=model,
    # Data: training split and the split evaluated according to eval_strategy
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    # Custom batching that keeps per-image labels as a list
    data_collator=collate_fn,
    # Metric function called on evaluation results
    compute_metrics=compute_metrics,
)

In [8]:
# Run the fine-tuning loop configured above
trainer.train()

# Write the trained model to its final output directory
final_model_dir = "./rtdetrv2-finetuned-final"
trainer.save_model(final_model_dir)



Epoch,Training Loss,Validation Loss


KeyboardInterrupt: 

In [None]:
# Score the held-out test split; metric keys are reported with a "test" prefix
results = trainer.evaluate(test_dataset, metric_key_prefix="test")
print(results)

In [None]:
# Example inference
def predict(image_path):
    """Run the detector on one image file and draw the detections on it.

    Args:
        image_path: Path of the image to load.

    Returns:
        The RGB image with a red rectangle and a "<class>: <score>" caption
        drawn for every detection scoring at least 0.5.

    Side effects:
        Switches the global ``model`` to evaluation mode.
    """
    # convert() yields an in-memory copy, so the file can be closed at once.
    with Image.open(image_path) as source:
        image = source.convert("RGB")

    # Inference must not run in training mode.
    model.eval()

    # The model may live on a GPU (e.g. after training); inputs have to be on
    # the same device as its weights.
    inputs = image_processor(images=image, return_tensors="pt").to(model.device)

    with torch.no_grad():
        outputs = model(**inputs)

    # Post-process results; PIL reports (width, height), the processor wants
    # (height, width).
    target_sizes = torch.tensor([image.size[::-1]])
    results = image_processor.post_process_object_detection(
        outputs, threshold=0.5, target_sizes=target_sizes
    )[0]

    # Visualize results
    draw = ImageDraw.Draw(image)
    for box, score, label in zip(
        results["boxes"], results["scores"], results["labels"]
    ):
        box = box.tolist()
        draw.rectangle(box, outline="red", width=3)
        draw.text(
            (box[0], box[1]),
            f"{CLASSES[int(label)]}: {float(score):.2f}",
            fill="white",
        )

    return image

In [None]:
# Load one test image; convert() yields an in-memory copy, so the file handle
# can be closed immediately.
image_path = os.path.join(data_path, "test", "images", "example.jpg")
with Image.open(image_path) as source_image:
    image = source_image.convert("RGB")

# Inference must not run in training mode, and the inputs must be on the same
# device as the model (which may be a GPU after training).
model.eval()
inputs = image_processor(images=[image], return_tensors="pt").to(model.device)
with torch.no_grad():
    outputs = model(**inputs)

# PIL reports (width, height); post-processing expects (height, width).
target_sizes = torch.tensor([image.size[::-1]])

result = image_processor.post_process_object_detection(
    outputs, threshold=0.4, target_sizes=target_sizes
)[0]

# Draw on a copy so the original image stays untouched.
image_with_boxes = image.copy()
draw = ImageDraw.Draw(image_with_boxes)

# Report and draw each detection in a single pass.
for score, label, box in zip(result["scores"], result["labels"], result["boxes"]):
    box = [round(i, 2) for i in box.tolist()]
    text_label = model.config.id2label[label.item()]
    print(
        f"Detected {text_label} with confidence "
        f"{round(score.item(), 3)} at location {box}"
    )

    x, y, x2, y2 = tuple(box)
    draw.rectangle((x, y, x2, y2), outline="red", width=1)
    draw.text((x, y), f"{text_label} [ {score.item():.2f} ]", fill="blue")

image_with_boxes