In [None]:
import os
import json

import torch
import numpy as np
from torch.utils.data import Dataset
from PIL import Image
from transformers import AutoProcessor, GroundingDinoForObjectDetection, TrainingArguments, Trainer, set_seed

  from .autonotebook import tqdm as notebook_tqdm


Train:
üéâ –í—Å–µ —Ñ–∞–π–ª—ã –æ–±—Ä–∞–±–æ—Ç–∞–Ω—ã!
üìä –ò—Ç–æ–≥–æ–≤–∞—è —Å—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞:
   –ò–∑–æ–±—Ä–∞–∂–µ–Ω–∏–π: 10687
   –ê–Ω–Ω–æ—Ç–∞—Ü–∏–π: 4771
   –ë–µ–∑ –∞–Ω–Ω–æ—Ç–∞—Ü–∏–π: 5916
   –°–æ–æ—Ç–Ω–æ—à–µ–Ω–∏–µ: 0.81:1

val:
üéâ –í—Å–µ —Ñ–∞–π–ª—ã –æ–±—Ä–∞–±–æ—Ç–∞–Ω—ã!
üìä –ò—Ç–æ–≥–æ–≤–∞—è —Å—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞:
   –ò–∑–æ–±—Ä–∞–∂–µ–Ω–∏–π: 2208
   –ê–Ω–Ω–æ—Ç–∞—Ü–∏–π: 1106
   –ë–µ–∑ –∞–Ω–Ω–æ—Ç–∞—Ü–∏–π: 1102
   –°–æ–æ—Ç–Ω–æ—à–µ–Ω–∏–µ: 1.00:1

In [2]:
model_id = "IDEA-Research/grounding-dino-base"
train_json_path = "dataset/dataset/train_annotations.json"
val_json_path = "dataset/dataset/train_annotations.json"
train_image_root = "dataset/dataset/images_train"
val_image_root = "dataset/dataset/images_train"
seed = 42
output_dir = "runs/gdino-trainer1"
labels_list = []

In [3]:
class JsonDataset(Dataset):
    def __init__(self, json_path, image_root, label2id):
        with open(json_path, "r") as f:
            data = json.load(f)
        self.image_root = image_root
        self.items = []

        for entry in data:
            w, h = entry["width"], entry["height"]
            boxes = []
            labels = []

            for ann in entry["annotations"]:
                boxes.append([ann["cx"], ann["cy"], ann["w"], ann["h"]])
                labels.append(label2id[str(ann["label_name"])])

            if len(boxes) > 0:
                boxes_cxcywh = torch.tensor(boxes, dtype=torch.float32)
                cx, cy, bw, bh = boxes_cxcywh.unbind(dim=-1)
                boxes = torch.stack([cx, cy, bw, bh], dim=-1)
                class_labels = torch.tensor(labels, dtype=torch.long)
            else:
                boxes = torch.zeros((0, 4), dtype=torch.float32)
                class_labels = torch.zeros((0,), dtype=torch.long)

            self.items.append({
                "image_path": os.path.join(image_root, entry["image_name"]),
                "size": (h, w),
                "boxes": boxes,
                "class_labels": class_labels
            })

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        it = self.items[idx]
        image = Image.open(it["image_path"]).convert("RGB")
        return {"image": image, **it}

def collate_fn(batch):
    images = [b["image"] for b in batch]
    text_prompts = [text_prompt] * len(images)

    enc = processor(
        images=images,
        text=text_prompts,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=64,
    )

    enc["model_inputs"] = {k: v for k, v in enc.items()}

    enc["labels"] = [{"class_labels": b["class_labels"], "boxes": b["boxes"]} for b in batch]
    enc["orig_sizes"] = [b["size"] for b in batch]
    return enc

In [4]:
def freeze_layers(model):
    base_model = model.model

    for param in base_model.parameters():
        param.requires_grad = False

    if hasattr(base_model, "encoder"):
        for layer in base_model.encoder.layers:
            for param in layer.parameters():
                param.requires_grad = True

    if hasattr(base_model.decoder, "reference_points_head"):
        for param in base_model.decoder.reference_points_head.parameters():
            param.requires_grad = True

    if hasattr(base_model.decoder, "bbox_embed"):
        for param in base_model.decoder.bbox_embed.parameters():
            param.requires_grad = True

    # for name in ["encoder_output_bbox_embed", "enc_output", "enc_output_norm"]:
    #     if hasattr(base_model, name):
    #         for param in getattr(base_model, name).parameters():
    #             param.requires_grad = True

    total = sum(p.numel() for p in base_model.parameters())
    trainable = sum(p.numel() for p in base_model.parameters() if p.requires_grad)
    print(f"–ó–∞–º–æ—Ä–æ–∑–∫–∞ –∑–∞–≤–µ—Ä—à–µ–Ω–∞. –û–±—É—á–∞–µ–º—ã—Ö –ø–∞—Ä–∞–º–µ—Ç—Ä–æ–≤: {trainable:,} / {total:,} "
          f"({100 * trainable / total:.2f}%)")

In [5]:
class GroundingDINOTrainer(Trainer):
    
    def _build_model_inputs(self, batch, device):
        if "model_inputs" in batch:
            model_inputs = {k: v.to(device) for k, v in batch["model_inputs"].items()}
        else:
            allowed = ("input_ids", "token_type_ids", "attention_mask", "pixel_values", "pixel_mask")
            model_inputs = {k: v.to(device) for k, v in batch.items() if k in allowed and isinstance(v, torch.Tensor)}

        if "labels" in batch:
            labels_dev = []
            for item in batch["labels"]:
                cls = item["class_labels"]
                boxes = item["boxes"]
                if not isinstance(cls, torch.Tensor):
                    cls = torch.tensor(cls, dtype=torch.long)
                if not isinstance(boxes, torch.Tensor):
                    boxes = torch.tensor(boxes, dtype=torch.float32)
                labels_dev.append({
                    "class_labels": cls.to(device),
                    "boxes": boxes.to(device),
                })
            model_inputs["labels"] = labels_dev

        return model_inputs

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        device = model.device
        model_inputs = self._build_model_inputs(inputs, device)
        outputs = model(**model_inputs)
        loss = outputs["loss"]
        return (loss, outputs) if return_outputs else loss

    def prediction_step(self, model, inputs, prediction_loss_only=False, ignore_keys=None):
        model.eval()
        device = model.device

        with torch.no_grad():
            loss, outputs = self.compute_loss(model, inputs, return_outputs=True)

        if prediction_loss_only:
            return (loss, None, None)

        if "model_inputs" in inputs:
            input_ids = inputs["model_inputs"]["input_ids"]
        else:
            input_ids = inputs["input_ids"]

        target_sizes = inputs.get("orig_sizes", None)

        results = processor.post_process_grounded_object_detection(
            outputs,
            input_ids=input_ids,
            box_threshold=0.4,
            text_threshold=0.4,
            target_sizes=target_sizes
        )

        preds = [torch.tensor(r["boxes"], dtype=torch.float32, device=device) for r in results]
        
        W = target_sizes[0][1]
        H = target_sizes[0][0]
        labels = [] 
        for b in inputs["labels"]: 
            boxes = b["boxes"]
            if not isinstance(boxes, torch.Tensor):
                boxes = torch.tensor(boxes, dtype=torch.float32)
            cx, cy, bw, bh = boxes.unbind(-1)
            gt_boxes = torch.stack([
                (cx - bw/2) * W,   # x1
                (cy - bh/2) * H,   # y1
                (cx + bw/2) * W,   # x2
                (cy + bh/2) * H    # y2
            ], dim=-1)
            labels.append(gt_boxes.to(device))
        count_boxes = torch.tensor([preds[0].size()[0], labels[0].size()[0]], dtype=torch.float32, device=device)
        return (loss, count_boxes, (preds, labels))

In [6]:
def compute_iou(pred_boxes, target_boxes):
    area1 = (pred_boxes[:, 2] - pred_boxes[:, 0]) * (pred_boxes[:, 3] - pred_boxes[:, 1])
    area2 = (target_boxes[:, 2] - target_boxes[:, 0]) * (target_boxes[:, 3] - target_boxes[:, 1])

    lt = torch.max(pred_boxes[:, None, :2], target_boxes[:, :2])  # (N_pred, N_gt, 2)
    rb = torch.min(pred_boxes[:, None, 2:], target_boxes[:, 2:])  # (N_pred, N_gt, 2)

    wh = (rb - lt).clamp(min=0)  # (N_pred, N_gt, 2)
    inter = wh[:, :, 0] * wh[:, :, 1]  # (N_pred, N_gt)

    union = area1[:, None] + area2 - inter
    iou = inter / union
    return iou

def match_predictions_to_targets(pred_boxes, target_boxes, iou_threshold=0.5):
    ious = compute_iou(pred_boxes, target_boxes)
    matched_gt = set()
    correct = 0

    for pred_idx in range(ious.shape[0]):
        iou_values = ious[pred_idx]
        best_gt_idx = iou_values.argmax().item()
        best_iou = iou_values[best_gt_idx].item()

        if best_iou >= iou_threshold and best_gt_idx not in matched_gt:
            matched_gt.add(best_gt_idx)
            correct += 1

    return correct

def compute_metrics(eval_preds, device=None, iou_threshold=0.5):
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    count_boxes, (preds_batch, labels_batch) = eval_preds
    count_boxes = [(int(count_boxes[i].item()), int(count_boxes[i + 1].item())) for i in range(0, len(count_boxes), 2)]
    
    total_correct = 0
    total_pred = 0
    total_true = 0
    fp_count = 0

    curr_pred = 0
    curr_gt = 0
    for count_pred, count_gt in count_boxes:
        total_pred += count_pred
        total_true += count_gt
        
        if count_pred > 0 and count_gt > 0:
            pred = preds_batch[0][curr_pred:curr_pred + count_pred]
            gt = labels_batch[0][curr_gt:curr_gt + count_gt]
            
            if isinstance(pred, np.ndarray):
                pred = torch.tensor(pred, dtype=torch.float32)
            if isinstance(gt, np.ndarray):
                gt = torch.tensor(gt, dtype=torch.float32)

            correct = match_predictions_to_targets(pred, gt, iou_threshold=0.5)
            total_correct += correct
            if count_pred - correct:
                fp_count += 1
        else:
            if count_pred > 0 and count_gt == 0:
                fp_count += count_pred
            
        curr_pred += count_pred
        curr_gt += count_gt
        
    precision = total_correct / total_pred if total_pred > 0 else 0
    recall = total_correct / total_true if total_true > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall + 1e-8)
    false_positive_percentage = fp_count / len(count_boxes)

    return {"precision": precision, "recall": recall, "f1": f1, "false positive percentage": false_positive_percentage}

In [7]:
os.makedirs(output_dir, exist_ok=True)
set_seed(seed)

with open(train_json_path, "r") as f:
    train_data = json.load(f)
labels = labels_list if labels_list else sorted({ann["label_name"] for d in train_data for ann in d["annotations"]})
label2id = {c: i for i, c in enumerate(labels)}
id2label = {i: c for c, i in label2id.items()}

train_ds = JsonDataset(train_json_path, train_image_root, label2id)
val_ds = JsonDataset(val_json_path, val_image_root, label2id)

processor = AutoProcessor.from_pretrained(model_id)
model = GroundingDinoForObjectDetection.from_pretrained(model_id, id2label=id2label, label2id=label2id, ignore_mismatched_sizes=True)
freeze_layers(model)
text_prompt = " . ".join(labels) + " ."

args = TrainingArguments(
    output_dir='checkpoints',
    per_device_train_batch_size=2,
    per_device_eval_batch_size=8,
    eval_accumulation_steps=128,
    gradient_accumulation_steps=32,
    num_train_epochs=100,
    learning_rate=1e-5,
    eval_strategy='epoch',
    eval_on_start=True,
    remove_unused_columns=False,
    weight_decay=3e-6,
    adam_beta2=0.999,
    optim="adamw_torch",
    save_strategy="best",
    load_best_model_at_end=True,
    bf16=True,
    dataloader_pin_memory=False,
    logging_dir="./logs",
    report_to="tensorboard",
    logging_strategy="epoch",
    metric_for_best_model="f1",
    greater_is_better=True,
    lr_scheduler_type="cosine"
)

trainer = GroundingDINOTrainer(
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)

trainer.train()
model.save_pretrained(output_dir)
processor.save_pretrained(output_dir)

üßä –ó–∞–º–æ—Ä–æ–∑–∫–∞ –∑–∞–≤–µ—Ä—à–µ–Ω–∞. –û–±—É—á–∞–µ–º—ã—Ö –ø–∞—Ä–∞–º–µ—Ç—Ä–æ–≤: 22,235,396 / 232,313,216 (9.57%)


In [8]:
model_2 = GroundingDinoForObjectDetection.from_pretrained('checkpoint-334', id2label=id2label, label2id=label2id, ignore_mismatched_sizes=True)
trainer_1 = GroundingDINOTrainer(
    model=model_2,
    args=args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)
trainer_1.evaluate()

  return forward_call(*args, **kwargs)
  preds = [torch.tensor(r["boxes"], dtype=torch.float32, device=device) for r in results]


{'eval_loss': 394244.9375,
 'eval_model_preparation_time': 0.0104,
 'eval_precision': 0.891970802919708,
 'eval_recall': 0.9934959349593496,
 'eval_f1': 0.9399999950144972,
 'eval_false positive percentage': 0.05538922155688623,
 'eval_runtime': 3933.6517,
 'eval_samples_per_second': 2.717,
 'eval_steps_per_second': 0.34}