In [None]:
from transformers import DetrImageProcessor, DetrForObjectDetection
import torch
import os
from PIL import Image
import torch
from torch.utils.data import Dataset
from pycocotools.coco import COCO
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import os
import random
import torchvision
import time
from pytorch_lightning.loggers import TensorBoardLogger

In [None]:
import warnings
# Suppress every Python warning for the rest of the kernel session.
# NOTE(review): this also hides deprecation warnings from the libraries used below.
warnings.filterwarnings("ignore")

In [None]:
# Run on the first CUDA device when one is available, otherwise on CPU.
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Hugging Face checkpoint used for both the image processor and the model weights.
CHECKPOINT = 'facebook/detr-resnet-50'
# Minimum score for a detection to be kept by the inference cells further down.
CONFIDENCE_TRESHOLD = 0.5
# NOTE(review): not referenced anywhere else in the visible notebook - confirm it is still needed.
IOU_TRESHOLD = 0.8
image_processor = DetrImageProcessor.from_pretrained(CHECKPOINT)
# Pretrained base model; the name `model` is rebound to the Lightning module in a later cell.
model = DetrForObjectDetection.from_pretrained(CHECKPOINT)
model.to(DEVICE)

In [None]:
!pip install -i https://test.pypi.org/simple/ supervision==0.3.0

In [None]:
!pip install roboflow

# Isolated dataset, downloaded from Roboflow in COCO format.
from roboflow import Roboflow

# Credentials must not live in the notebook: read the key from the environment
# (e.g. populated from the platform's secret store). The key that was previously
# hardcoded in this cell should be treated as leaked and rotated.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
if not ROBOFLOW_API_KEY:
    raise RuntimeError(
        "Set the ROBOFLOW_API_KEY environment variable before running this cell."
    )

rf = Roboflow(api_key=ROBOFLOW_API_KEY)
project = rf.workspace("runxy").project("isolated-6chwu")
version = project.version(2)
dataset = version.download("coco")

# Root folder of the downloaded dataset; the train/valid/test paths are derived from it.
DATASET_LOCATION = dataset.location

In [None]:
# %% [code] {"id":"Yewz_KH0sjsK", ...}
import os
from PIL import Image
import torch
import torchvision
from torch.utils.data import Dataset, Subset
from pycocotools.coco import COCO

class CocoDetection(torchvision.datasets.CocoDetection):
    """COCO-format detection dataset whose samples are run through an image processor.

    Args:
        img_folder: Directory containing the images.
        annotation_file: Path to the COCO annotation JSON for those images.
        image_processor: Callable invoked as
            ``image_processor(images=..., annotations=..., return_tensors="pt")``;
            its result must expose ``"pixel_values"`` and ``"labels"``.
        train: Stored on the instance as ``self.train``; not read anywhere in this class.
    """

    def __init__(
        self,
        img_folder: str,
        annotation_file: str,
        image_processor,
        train: bool = True
    ):
        super().__init__(img_folder, annotation_file)
        self.image_processor = image_processor
        self.train = train

    def __getitem__(self, idx):
        """Return ``(pixel_values, target)`` for the sample at position ``idx``.

        ``target`` is the first entry of the processor's ``"labels"`` output, with
        ``"orig_size"`` overwritten by the image's ``[height, width]``.
        """
        img, annotations = super().__getitem__(idx)

        # img.size is (width, height); store it flipped as (height, width).
        orig_size = torch.tensor([img.size[1], img.size[0]])

        image_id = self.ids[idx]
        target = {'image_id': image_id, 'annotations': annotations}

        encoding = self.image_processor(
            images=img,
            annotations=target,
            return_tensors="pt"
        )

        # Remove only the leading batch dimension. A bare squeeze() would also
        # collapse any other dimension that happens to have size 1.
        pixel_values = encoding["pixel_values"].squeeze(0)
        target = encoding["labels"][0]

        target["orig_size"] = orig_size  # format is now [height, width]

        return pixel_values, target

In [None]:
# In your DataLoader cell (ZBA5h8OF_3sQ)
def collate_fn(batch):
    """Merge ``(pixel_values, target)`` samples into one padded batch dict.

    Samples that are ``None`` or whose first element is ``None`` are dropped.
    When nothing is left, a placeholder batch with empty tensors and an empty
    label list is returned. Otherwise the images are padded with the
    module-level ``image_processor`` and the targets are passed through as a list.

    Returns:
        dict with keys ``'pixel_values'``, ``'pixel_mask'`` and ``'labels'``.
    """
    usable = []
    for sample in batch:
        if sample is None or sample[0] is None:
            continue
        usable.append(sample)

    # Nothing usable: hand back an empty batch instead of calling the processor.
    if len(usable) == 0:
        return {'pixel_values': torch.empty(0), 'pixel_mask': torch.empty(0), 'labels': []}

    images = [sample[0] for sample in usable]
    padded = image_processor.pad(images, return_tensors="pt")
    targets = [sample[1] for sample in usable]

    collated = {}
    collated['pixel_values'] = padded['pixel_values']
    collated['pixel_mask'] = padded['pixel_mask']
    collated['labels'] = targets
    return collated

In [None]:
# Every split folder of the export holds its annotations under this file name.
ANNOTATION_FILE_NAME = "_annotations.coco.json"

# Split folders, built with os.path so separators are platform-correct.
train_folder, val_folder, test_folder = (
    os.path.join(DATASET_LOCATION, split_name)
    for split_name in ("train", "valid", "test")
)

# Annotation file that sits inside each split folder.
train_annotation_file, val_annotation_file, test_annotation_file = (
    os.path.join(split_folder, ANNOTATION_FILE_NAME)
    for split_folder in (train_folder, val_folder, test_folder)
)

# One dataset per split; only the training split is flagged with train=True.
train_dataset = CocoDetection(
    img_folder=train_folder,
    annotation_file=train_annotation_file,
    image_processor=image_processor,
    train=True,
)
val_dataset = CocoDetection(
    img_folder=val_folder,
    annotation_file=val_annotation_file,
    image_processor=image_processor,
    train=False,
)
test_dataset = CocoDetection(
    img_folder=test_folder,
    annotation_file=test_annotation_file,
    image_processor=image_processor,
    train=False,
)

# Report how many samples each split contains.
print("Number of training examples:", len(train_dataset))
print("Number of validation examples:", len(val_dataset))
print("Number of test examples:", len(test_dataset))

In [None]:
import random
import cv2
import numpy as np
import supervision as sv



# select random image
image_ids = train_dataset.coco.getImgIds()
image_id = random.choice(image_ids)
print('Image #{}'.format(image_id))

# load image and annotatons
image = train_dataset.coco.loadImgs(image_id)[0]
annotations = train_dataset.coco.imgToAnns[image_id]
image_path = os.path.join(train_dataset.root, image['file_name'])
image = cv2.imread(image_path)

# annotate
detections = sv.Detections.from_coco_annotations(coco_annotation=annotations)

# we will use id2label function for training
categories = train_dataset.coco.cats
id2label = {k: v['name'] for k,v in categories.items()}

labels = [
    f"{id2label[class_id]}"
    for _, _, class_id, _
    in detections
]

box_annotator = sv.BoxAnnotator()
frame = box_annotator.annotate(scene=image, detections=detections, labels=labels)

%matplotlib inline
sv.show_frame_in_notebook(image, (16, 16))

In [None]:
# Upper bounds on how many samples go into the reduced train/val subsets.
TRAIN_SAMPLES = 100
VAL_SAMPLES = 20

# Clamp to the dataset length: Subset does not validate indices up front, so an
# oversized range would only fail later with an IndexError while iterating.
# NOTE(review): the DataLoader cell passes train_dataset / val_dataset, not these
# subsets - confirm which one training is meant to use.
train_subset = Subset(train_dataset, range(min(TRAIN_SAMPLES, len(train_dataset))))
val_subset = Subset(val_dataset, range(min(VAL_SAMPLES, len(val_dataset))))

In [None]:
# %% [code] {"id":"ZBA5h8OF_3sQ", ...}
from torch.utils.data import DataLoader

# Options common to all three loaders: batch size, the padding collate function,
# pinned host memory and worker processes that stay alive between epochs.
_shared_loader_kwargs = dict(
    batch_size=16,
    collate_fn=collate_fn,
    pin_memory=True,
    persistent_workers=True,
)

# Only the training loader shuffles; the test loader runs with fewer workers.
train_loader = DataLoader(train_dataset, shuffle=True, num_workers=4, **_shared_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, num_workers=4, **_shared_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, num_workers=2, **_shared_loader_kwargs)

In [None]:
from pytorch_lightning.callbacks import Callback
import matplotlib.pyplot as plt
import numpy as np
import os

class LossPlotCallback(Callback):
    """Collect epoch-level ``train/loss`` and ``val/loss`` and plot them when training ends.

    Each recorded value is stored together with the (1-based) epoch it belongs to,
    so the two curves stay aligned regardless of the order in which the train and
    validation hooks fire, and regardless of epochs without validation.

    Args:
        save_dir: Directory where ``loss_curves.png`` is written in ``on_train_end``.
    """

    def __init__(self, save_dir='/kaggle/working/training_plots'):
        super().__init__()
        self.save_dir = save_dir
        self.train_losses = []
        self.val_losses = []
        self.epochs = []      # epoch number for each entry of train_losses
        self.val_epochs = []  # epoch number for each entry of val_losses

    @staticmethod
    def _as_float(metric):
        """Convert a logged metric to a Python float; a missing metric becomes NaN."""
        return float(metric.cpu()) if metric is not None else np.nan

    def on_train_epoch_end(self, trainer, pl_module):
        """Record the epoch number and the current ``train/loss`` callback metric."""
        self.epochs.append(trainer.current_epoch + 1)
        self.train_losses.append(self._as_float(trainer.callback_metrics.get("train/loss")))

    def on_validation_epoch_end(self, trainer, pl_module):
        """Record the epoch number and the current ``val/loss`` callback metric."""
        # The pre-training sanity check also triggers this hook; it is not a real
        # epoch and would shift every later validation point by one position.
        if trainer.sanity_checking:
            return
        self.val_epochs.append(trainer.current_epoch + 1)
        self.val_losses.append(self._as_float(trainer.callback_metrics.get("val/loss")))

    def on_train_end(self, trainer, pl_module):
        """Write both loss curves to ``<save_dir>/loss_curves.png``."""
        os.makedirs(self.save_dir, exist_ok=True)

        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(self.epochs, self.train_losses, marker='.', label='train/loss')
        # Validation is plotted against its own epoch list, so no padding or truncation is needed.
        ax.plot(self.val_epochs, self.val_losses, marker='.', label='val/loss')
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Loss')
        ax.set_title('Loss per Epoch')
        ax.grid(True)
        ax.legend()
        fig.tight_layout()

        out = os.path.join(self.save_dir, 'loss_curves.png')
        fig.savefig(out)
        print(f"[LossPlotCallback] Saved: {out}")
        plt.close(fig)


In [None]:
# %% [code] {"id":"9r-lMAWKWoLY", ...}
import pytorch_lightning as pl
from transformers import DetrForObjectDetection
import torch


class Detr(pl.LightningModule):
    """Lightning wrapper around ``DetrForObjectDetection`` loaded from ``CHECKPOINT``.

    The classification head is sized from the module-level ``id2label`` mapping,
    and the train/val loaders are the module-level ``train_loader`` / ``val_loader``.

    Args:
        lr: Learning rate for every trainable parameter outside the backbone.
        lr_backbone: Learning rate for trainable backbone parameters.
        weight_decay: Weight decay passed to AdamW.
        freeze_backbone: When True, parameters whose name contains ``"backbone"``
            are excluded from training.
    """

    def __init__(self, lr, lr_backbone, weight_decay, freeze_backbone=False):
        super().__init__()
        self.save_hyperparameters()  # keep the hyperparameters in the logger/checkpoint
        self.model = DetrForObjectDetection.from_pretrained(
            pretrained_model_name_or_path=CHECKPOINT,
            num_labels=len(id2label),
            ignore_mismatched_sizes=True
        )
        if freeze_backbone:
            for name, param in self.model.named_parameters():
                if "backbone" in name:
                    param.requires_grad = False

        self.lr = lr
        self.lr_backbone = lr_backbone
        self.weight_decay = weight_decay

    def forward(self, pixel_values, pixel_mask):
        """Run the wrapped model without labels (inference-style call)."""
        return self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    def common_step(self, batch):
        """Run the model with labels and return ``(loss, loss_dict)``."""
        pixel_values = batch["pixel_values"]
        pixel_mask = batch["pixel_mask"]
        # Follow the device Lightning placed this module on instead of the global
        # DEVICE constant, so the targets always sit next to the model.
        labels = [{k: v.to(self.device) for k, v in t.items()} for t in batch["labels"]]
        outputs = self.model(pixel_values=pixel_values, pixel_mask=pixel_mask, labels=labels)
        return outputs.loss, outputs.loss_dict

    def _log_losses(self, stage, loss, loss_dict, batch_size):
        """Log the total loss as ``<stage>/loss`` and every component as ``<stage>/<name>``."""
        # batch_size is passed explicitly so the epoch average is weighted by the
        # real number of samples rather than a size inferred from the batch dict.
        self.log(f"{stage}/loss", loss, on_step=False, on_epoch=True, prog_bar=True,
                 sync_dist=False, batch_size=batch_size)
        for name, value in loss_dict.items():
            # e.g. train/loss_ce, train/loss_bbox, ...
            self.log(f"{stage}/{name}", value, on_step=False, on_epoch=True, prog_bar=False,
                     sync_dist=False, batch_size=batch_size)

    def training_step(self, batch, batch_idx):
        """Compute and log the training losses; returns the total loss."""
        loss, loss_dict = self.common_step(batch)
        self._log_losses("train", loss, loss_dict, batch_size=len(batch["labels"]))
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation losses; returns the total loss."""
        loss, loss_dict = self.common_step(batch)
        self._log_losses("val", loss, loss_dict, batch_size=len(batch["labels"]))
        return loss

    def configure_optimizers(self):
        """AdamW with a separate backbone learning rate, plus ReduceLROnPlateau on ``val/loss``."""
        param_dicts = [
            {"params": [p for n, p in self.named_parameters() if "backbone" not in n and p.requires_grad]},
            {"params": [p for n, p in self.named_parameters() if "backbone" in n and p.requires_grad],
             "lr": self.lr_backbone},
        ]
        optimizer = torch.optim.AdamW(param_dicts, lr=self.lr, weight_decay=self.weight_decay)
        # NOTE(review): `verbose` is deprecated in recent PyTorch releases - confirm
        # it is still accepted by the installed version.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.1, patience=5, verbose=True
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {"scheduler": scheduler, "monitor": "val/loss", "interval": "epoch", "frequency": 1}
        }

    def train_dataloader(self):
        """Return the module-level training DataLoader."""
        return train_loader

    def val_dataloader(self):
        """Return the module-level validation DataLoader."""
        return val_loader


In [None]:
# Build the Lightning module with a frozen backbone; this rebinds `model`.
model = Detr(lr=1e-4, lr_backbone=1e-5, weight_decay=1e-4, freeze_backbone=True)

# Smoke test: push one training batch through the model. Nothing is
# back-propagated here, so skip building the autograd graph.
batch = next(iter(train_loader))
with torch.no_grad():
    outputs = model(pixel_values=batch['pixel_values'], pixel_mask=batch['pixel_mask'])

In [None]:
# Element count and trainable flag of every parameter tensor in the module.
param_sizes = [(p.numel(), p.requires_grad) for p in model.parameters()]

total_params = sum(size for size, _ in param_sizes)
trainable_params = sum(size for size, is_trainable in param_sizes if is_trainable)

print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

In [None]:
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor
from pytorch_lightning.loggers import CSVLogger, TensorBoardLogger
# Upper bound on training epochs; passed to the Trainer as max_epochs.
MAX_EPOCHS = 100
# Passed to the Trainer as accumulate_grad_batches.
ACCUM_STEPS = 1

In [None]:
# Keep only the checkpoint with the lowest validation loss.
# The metric is logged as "val/loss", so the filename template must reference that
# exact key; a "{val_loss}" placeholder does not match any logged metric.
# auto_insert_metric_name=False keeps the "/" of the metric name out of the file
# name, which is why the "epoch=" / "val_loss=" prefixes are written by hand.
ckpt = ModelCheckpoint(
    dirpath="/kaggle/working/checkpoints",
    filename="best-epoch={epoch:02d}-val_loss={val/loss:.4f}",
    auto_insert_metric_name=False,
    monitor="val/loss",
    mode="min",
    save_top_k=1
)

loss_plot_cb = LossPlotCallback(save_dir="/kaggle/working/training_plots")
lr_monitor = LearningRateMonitor(logging_interval='epoch')

# Log to both CSV (read back by the plotting cell) and TensorBoard.
csv_logger = CSVLogger(save_dir="/kaggle/working/lightning_logs", name="detr_run")
tb_logger = TensorBoardLogger(save_dir="/kaggle/working/lightning_logs", name="detr_tb")

trainer = Trainer(
    devices=1, accelerator="gpu",
    precision="16-mixed",
    max_epochs=MAX_EPOCHS,
    gradient_clip_val=0.1,
    accumulate_grad_batches=ACCUM_STEPS,
    log_every_n_steps=50,             # log often enough that metrics get recorded
    check_val_every_n_epoch=1,
    enable_model_summary=False,
    callbacks=[ckpt, loss_plot_cb, lr_monitor],
    logger=[csv_logger, tb_logger],
)

# The dataloaders come from Detr.train_dataloader / Detr.val_dataloader.
trainer.fit(model)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import os

metrics_path = os.path.join(csv_logger.log_dir, "metrics.csv")
print("Metrics CSV:", metrics_path)

df = pd.read_csv(metrics_path)

# Keep the rows that carry an epoch value.
df_epoch = df[df["epoch"].notna()].copy()

# The target folder is otherwise only created by LossPlotCallback; make sure it
# exists so this cell also works on its own.
plot_dir = "/kaggle/working/training_plots"
os.makedirs(plot_dir, exist_ok=True)
plot_path = os.path.join(plot_dir, "loss_from_csv.png")

fig, ax = plt.subplots(figsize=(10, 6))
for column in ("train/loss", "val/loss"):
    if column not in df_epoch.columns:
        continue
    # A row may hold only some of the metrics, with the others left as NaN.
    # Drop those gaps per metric so each curve is drawn as one connected line.
    curve = df_epoch[["epoch", column]].dropna()
    if curve.empty:
        continue
    ax.plot(curve["epoch"], curve[column], marker='.', label=column)

ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Loss (from metrics.csv)")
ax.grid(True)
# Only add a legend when at least one curve was drawn.
if ax.get_legend_handles_labels()[0]:
    ax.legend()
fig.tight_layout()
fig.savefig(plot_path)
plt.close(fig)
print(f"Saved {plot_path}")


In [None]:
# After training, save the model state dictionary manually.
# `model` is the Detr Lightning module here, so this is the state dict of the
# wrapper (which holds the Hugging Face model under its `model` attribute).
torch.save(model.state_dict(), "/kaggle/working/detr_isolated100_weights.pth")

# To later load the weights into a freshly built Detr instance:
# model.load_state_dict(torch.load("/kaggle/input/m/runxyy/detr-math/pytorch/default/1/detr_model_weights.pth"))

In [None]:
# Import under an alias: a plain `Image` import would shadow `PIL.Image`, which
# the notebook imports earlier under the same name.
from IPython.display import Image as IPyImage, display

# Path of the plot written by LossPlotCallback at the end of training.
image_path = "/kaggle/working/training_plots/loss_curves.png"

# Display the image
display(IPyImage(filename=image_path))

In [None]:
# Move the trained module to DEVICE and switch it to evaluation mode for the
# inference and evaluation cells that follow.
model.to(DEVICE).eval()

In [None]:
import random
import cv2
import numpy as np


# utils
categories = test_dataset.coco.cats
id2label = {k: v['name'] for k,v in categories.items()}
box_annotator = sv.BoxAnnotator()

# select random image
image_ids = test_dataset.coco.getImgIds()
image_id = random.choice(image_ids)
print('Image #{}'.format(image_id))

# load image and annotatons
image = test_dataset.coco.loadImgs(image_id)[0]
annotations = test_dataset.coco.imgToAnns[image_id]
image_path = os.path.join(test_dataset.root, image['file_name'])
image = cv2.imread(image_path)

# annotate
detections = sv.Detections.from_coco_annotations(coco_annotation=annotations)
labels = [f"{id2label[class_id]}" for _, _, class_id, _ in detections]
frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)

print('ground truth')
%matplotlib inline
sv.show_frame_in_notebook(frame, (16, 16))

# inference
with torch.no_grad():

    # load image and predict
    inputs = image_processor(images=image, return_tensors='pt').to(DEVICE)
    outputs = model(**inputs)

    # post-process
    target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
    results = image_processor.post_process_object_detection(
        outputs=outputs,
        threshold=CONFIDENCE_TRESHOLD,
        target_sizes=target_sizes
    )[0]

# annotate
detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=0.5)
labels = [f"{id2label[class_id]} {confidence:.2f}" for _, confidence, class_id, _ in detections]
frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)

print('detections')
%matplotlib inline
sv.show_frame_in_notebook(frame, (16, 16))

In [None]:
import cv2
import torch
import supervision as sv
import matplotlib.pyplot as plt

# Reload the image picked by the previous cell (`image_id`).
image_info = test_dataset.coco.loadImgs(image_id)[0]
annotations = test_dataset.coco.imgToAnns[image_id]
image_path = os.path.join(test_dataset.root, image_info['file_name'])
image = cv2.imread(image_path)
if image is None:
    # The original message referenced an undefined IMAGE_PATH and raised a
    # NameError exactly when the load failed; report the real path instead.
    print(f"Error: Cannot load image from {image_path}")
else:
    print("Image loaded successfully!")
    print(f"Image shape: {image.shape}")

    # cv2 loads BGR; matplotlib and the image processor both need RGB.
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Show original image
    plt.figure(figsize=(10, 10))
    plt.imshow(image_rgb)
    plt.title("Original Image")
    plt.axis('off')
    plt.show()

    # Create annotator
    box_annotator = sv.BoxAnnotator()

    # Inference
    with torch.no_grad():
        # Preprocess the RGB pixels (the training samples were PIL images from
        # the dataset class) and predict.
        inputs = image_processor(images=image_rgb, return_tensors='pt').to(DEVICE)
        outputs = model(**inputs)

        # Post-process: rescale boxes to the original (height, width)
        target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
        results = image_processor.post_process_object_detection(
            outputs=outputs,
            threshold=CONFIDENCE_TRESHOLD,
            target_sizes=target_sizes
        )[0]

    # Annotate on a BGR copy (the annotator draws with cv2)
    detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=0.5)
    labels = [f"{id2label[class_id]} {confidence:.2f}" for _, confidence, class_id, _ in detections]
    frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)

    # Display result
    plt.figure(figsize=(10, 10))
    plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    plt.title("Detection Results")
    plt.axis('off')
    plt.show()

In [None]:
# Use the categories of the dataset that is being evaluated (test or val).
# Replace 'test_dataset' when evaluating 'val_dataset' instead.
categories_from_coco = test_dataset.coco.cats

# All COCO category ids, in ascending order.
coco_cat_ids = sorted(categories_from_coco)

# Position in the sorted id list -> COCO category id.
# NOTE(review): this treats the model's label k as the k-th smallest category id;
# confirm that this matches how the image processor assigns class labels.
model_label_to_coco_id = dict(enumerate(coco_cat_ids))

category_names = {cat_id: cat['name'] for cat_id, cat in categories_from_coco.items()}

print("--- DEBUG: Category ID Mapping ---")
print(f"COCO categories (ID: name): {category_names}")
print(f"Model index -> COCO ID map: {model_label_to_coco_id}")
print("---------------------------------")

In [None]:
# %% [code] {"id":"RZ6AHdPDk4nz", ...}
def convert_to_xywh(boxes):
    """Convert ``(xmin, ymin, xmax, ymax)`` rows into ``(x, y, width, height)`` rows.

    ``boxes`` is split along dim 1 into exactly four columns, and the result is
    stacked back along dim 1.
    """
    left, top, right, bottom = boxes.unbind(1)
    widths = right - left
    heights = bottom - top
    return torch.stack([left, top, widths, heights], dim=1)

# Updated function - takes the label mapping as an argument.
def prepare_for_coco_detection(predictions, model_label_to_coco_id_map):
    """Flatten per-image predictions into a list of COCO-style result dicts.

    Args:
        predictions: Mapping of image id -> prediction dict with ``"boxes"``
            (converted with ``convert_to_xywh``), ``"scores"`` and ``"labels"``.
        model_label_to_coco_id_map: Mapping of predicted label -> COCO category id.

    Returns:
        A list of dicts with keys ``image_id``, ``category_id``, ``bbox`` and
        ``score``. Images with no usable prediction, and detections whose label
        is missing from the mapping, are skipped.
    """
    coco_results = []
    for original_id, prediction in predictions.items():
        # Skip images whose prediction is empty, lacks 'scores', or has no scores.
        if not prediction or "scores" not in prediction or len(prediction["scores"]) == 0:
            continue

        xywh_boxes = convert_to_xywh(prediction["boxes"]).tolist()
        scores = prediction["scores"].tolist()
        labels = prediction["labels"].tolist()  # labels as predicted by the model

        for position, box in enumerate(xywh_boxes):
            label = labels[position]
            # Only keep detections whose label can be translated to a COCO id.
            if label not in model_label_to_coco_id_map:
                continue
            coco_results.append(
                {
                    "image_id": original_id,
                    "category_id": model_label_to_coco_id_map[label],
                    "bbox": box,  # already in [x, y, w, h] format
                    "score": scores[position],
                }
            )
    return coco_results

In [None]:
!pip -q install coco_eval

In [None]:
from coco_eval import CocoEvaluator
from tqdm.notebook import tqdm

import numpy as np

# COCO bbox evaluation on the test split.
evaluator = CocoEvaluator(coco_gt=test_dataset.coco, iou_types=["bbox"])

print("Running evaluation...")

for batch in tqdm(test_loader):
    pixel_values = batch["pixel_values"].to(DEVICE)
    pixel_mask = batch["pixel_mask"].to(DEVICE)
    labels = [{k: v.to(DEVICE) for k, v in t.items()} for t in batch["labels"]]

    with torch.no_grad():
        outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    # Rescale the predicted boxes to each image's original (height, width).
    orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0)
    # Keep every detection (threshold=0.0): COCO AP is computed over the whole
    # score range, so pre-filtering low-score detections distorts the metric.
    results = image_processor.post_process_object_detection(
        outputs, threshold=0.0, target_sizes=orig_target_sizes
    )

    predictions = {target['image_id'].item(): output for target, output in zip(labels, results)}
    predictions = prepare_for_coco_detection(predictions, model_label_to_coco_id)  # pass the label map
    evaluator.update(predictions)

evaluator.synchronize_between_processes()
evaluator.accumulate()
evaluator.summarize()

In [None]:
from coco_eval import CocoEvaluator
from tqdm.notebook import tqdm

import numpy as np

# COCO bbox evaluation on the validation split.
# NOTE(review): model_label_to_coco_id is built from test_dataset's categories in
# an earlier cell - confirm the validation split uses the same category ids.
evaluator = CocoEvaluator(coco_gt=val_dataset.coco, iou_types=["bbox"])

print("Running evaluation...")

for batch in tqdm(val_loader):
    pixel_values = batch["pixel_values"].to(DEVICE)
    pixel_mask = batch["pixel_mask"].to(DEVICE)
    labels = [{k: v.to(DEVICE) for k, v in t.items()} for t in batch["labels"]]

    with torch.no_grad():
        outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    # Rescale the predicted boxes to each image's original (height, width).
    orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0)
    # Keep every detection (threshold=0.0): COCO AP is computed over the whole
    # score range, so pre-filtering low-score detections distorts the metric.
    results = image_processor.post_process_object_detection(
        outputs, threshold=0.0, target_sizes=orig_target_sizes
    )

    predictions = {target['image_id'].item(): output for target, output in zip(labels, results)}
    predictions = prepare_for_coco_detection(predictions, model_label_to_coco_id)  # pass the label map
    evaluator.update(predictions)

evaluator.synchronize_between_processes()
evaluator.accumulate()
evaluator.summarize()