In [1]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import DetrImageProcessor, DetrForObjectDetection, get_linear_schedule_with_warmup
from torch.optim import AdamW
from PIL import Image

# ---------------------- Load classes.txt ----------------------
# Every non-empty line of classes.txt is parsed as "<integer id>:<class name>".
class_id_to_name = {}
with open('dataset/classes.txt', 'r') as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        # Split on the first ':' only so a name that itself contains ':' is kept whole,
        # and strip the name so "0: battery" and "0:battery" give the same label.
        idx, name = line.split(':', 1)
        class_id_to_name[int(idx)] = name.strip()

num_classes = len(class_id_to_name)

# class_names is indexed by class id, so the ids must be exactly 0..num_classes-1.
# Report a gap explicitly instead of failing with a bare KeyError below.
missing_ids = [i for i in range(num_classes) if i not in class_id_to_name]
if missing_ids:
    raise ValueError(
        f"dataset/classes.txt must define contiguous ids 0..{num_classes - 1}; "
        f"missing ids: {missing_ids}"
    )

class_names = [class_id_to_name[i] for i in range(num_classes)]
print(f"Classes: {class_names}")

# ---------------------- Custom Dataset ------------------------
class YOLOCircuitDataset(Dataset):
    """Dataset pairing images with YOLO-format label files.

    Each item is an image loaded from ``img_dir`` together with the boxes read
    from the ``.txt`` file of the same base name in ``anno_dir``. The boxes are
    converted from YOLO's normalised ``class x_center y_center width height``
    lines to absolute-pixel COCO-style annotations and handed to ``processor``.

    Args:
        img_dir: Directory containing the images.
        anno_dir: Directory containing one YOLO ``.txt`` file per image
            (an image without a file is treated as having no boxes).
        processor: Callable invoked as
            ``processor(images=..., annotations=..., return_tensors="pt")``.
        img_extensions: File-name suffix (or iterable of suffixes), matched
            case-insensitively, that selects which files in ``img_dir`` are
            images. Defaults to ``(".jpg",)``.
    """

    def __init__(self, img_dir, anno_dir, processor, img_extensions=(".jpg",)):
        self.img_dir = img_dir
        self.anno_dir = anno_dir
        self.processor = processor
        # A lone string would otherwise be iterated character by character.
        if isinstance(img_extensions, str):
            img_extensions = (img_extensions,)
        suffixes = tuple(ext.lower() for ext in img_extensions)
        # Sorted so a given index always refers to the same file, whatever
        # order os.listdir happens to return.
        self.img_files = sorted(
            f for f in os.listdir(img_dir) if f.lower().endswith(suffixes)
        )

    def __len__(self):
        """Return the number of image files found in ``img_dir``."""
        return len(self.img_files)

    @staticmethod
    def _parse_yolo_file(txt_path, width, height):
        """Read one YOLO label file and return COCO-style annotation dicts.

        Args:
            txt_path: Path of the label file; a missing file yields ``[]``.
            width: Image width in pixels, used to de-normalise x and w.
            height: Image height in pixels, used to de-normalise y and h.

        Returns:
            A list of dicts with keys ``category_id``, ``bbox``
            (``[x_min, y_min, width, height]`` in pixels), ``area`` and
            ``iscrowd``.

        Raises:
            ValueError: If a non-blank line does not hold exactly five
                numeric fields.
        """
        annotations = []
        if not os.path.exists(txt_path):
            return annotations
        with open(txt_path, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                fields = line.split()
                if not fields:
                    # Blank lines (e.g. a trailing newline) carry no box.
                    continue
                if len(fields) != 5:
                    raise ValueError(
                        f"{txt_path}:{line_no}: expected 5 fields "
                        f"'class x_center y_center width height', got {len(fields)}"
                    )
                try:
                    class_id, x_c, y_c, w, h = map(float, fields)
                except ValueError as exc:
                    raise ValueError(f"{txt_path}:{line_no}: {exc}") from exc
                # YOLO values are fractions of the image size; scale to pixels.
                x_c, y_c, w, h = x_c * width, y_c * height, w * width, h * height
                x_min = x_c - w / 2
                y_min = y_c - h / 2
                annotations.append({
                    "category_id": int(class_id),
                    "bbox": [x_min, y_min, w, h],   # COCO = [x_min, y_min, width, height]
                    "area": w * h,
                    "iscrowd": 0
                })
        return annotations

    def __getitem__(self, idx):
        """Load image ``idx``, build its COCO-style target and run the processor.

        Returns:
            Whatever ``self.processor`` returns for the single image; the
            result is passed through untouched so that batching can be done
            in the collate function.
        """
        img_file = self.img_files[idx]
        img_path = os.path.join(self.img_dir, img_file)
        image = Image.open(img_path).convert("RGB")
        width, height = image.size

        # The label file shares the image's base name: foo.jpg -> foo.txt
        txt_name = os.path.splitext(img_file)[0] + ".txt"
        txt_path = os.path.join(self.anno_dir, txt_name)
        annotations = self._parse_yolo_file(txt_path, width, height)

        # COCO-style dict for the HF processor
        target = {
            "image_id": idx,
            "annotations": annotations
        }
        encoding = self.processor(images=image, annotations=target, return_tensors="pt")
        # Don't squeeze here: let collate_fn stack properly!
        return encoding

# -------------------- Custom collate_fn ----------------------
def _pad_bottom_right(tensor, height, width):
    """Zero-pad the last two dims of `tensor` at the bottom/right up to (height, width)."""
    cur_h, cur_w = tensor.shape[-2], tensor.shape[-1]
    if cur_h == height and cur_w == width:
        return tensor
    padded = tensor.new_zeros(tuple(tensor.shape[:-2]) + (height, width))
    padded[..., :cur_h, :cur_w] = tensor
    return padded


def collate_fn(batch):
    """
    Batches HF encodings from YOLOCircuitDataset.

    Each batch[i] is a mapping whose "pixel_values" entry has a leading
    batch dim of 1 ([1, C, H, W]) and whose "labels" entry is a one-element
    list holding that image's target dict.

    Returns a dict with:
      * "pixel_values": [B, C, H_max, W_max] tensor. Images smaller than the
        largest one in the batch are zero-padded at the bottom/right, so
        items of different sizes can share a batch.
      * "pixel_mask": [B, H_max, W_max] tensor, 1 on real pixels and 0 on
        padding. Per-item masks are reused when present, otherwise an
        all-ones mask is built from the image extent.
      * "labels": list of B target dicts (the singleton list is unwrapped).
      * any other key: list of the per-item values, unchanged.

    NOTE(review): labels are passed through untouched. When padding actually
    happens (batch_size > 1 with mixed sizes), forward "pixel_mask" to the
    model and confirm the processor's box-normalisation convention.
    """
    batched = {}
    keys = list(batch[0])
    handled = set()

    if "pixel_values" in keys:
        images = [item["pixel_values"] for item in batch]
        max_h = max(img.shape[-2] for img in images)
        max_w = max(img.shape[-1] for img in images)
        batched["pixel_values"] = torch.cat(
            [_pad_bottom_right(img, max_h, max_w) for img in images], dim=0
        )

        masks = []
        for item, img in zip(batch, images):
            if "pixel_mask" in item:
                mask = item["pixel_mask"]
            else:
                # No mask supplied: every pixel of the unpadded image is valid.
                mask = torch.ones(
                    (img.shape[0], img.shape[-2], img.shape[-1]),
                    dtype=torch.long,
                    device=img.device,
                )
            masks.append(_pad_bottom_right(mask, max_h, max_w))
        batched["pixel_mask"] = torch.cat(masks, dim=0)
        handled = {"pixel_values", "pixel_mask"}

    for k in keys:
        if k in handled:
            continue
        if k == "labels":
            # Each item's labels is a one-element list; [0] unwraps it.
            batched[k] = [item[k][0] for item in batch]
        else:
            batched[k] = [item[k] for item in batch]
    return batched

# --------------------- Dataset and DataLoader ------------------
# Seed before the loader and the model are created: the shuffle order and the
# newly initialised detection-head weights both draw from torch's global RNG,
# so without a seed two runs of this cell are not comparable.
SEED = 42
torch.manual_seed(SEED)

processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")
dataset = YOLOCircuitDataset("dataset/train/processed", "dataset/train/annotations", processor)
loader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)

# --------------------- Model Setup -----------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DetrForObjectDetection.from_pretrained(
    "facebook/detr-resnet-50",
    num_labels=num_classes,
    # Store the real class names in the model config so the saved checkpoint
    # is self-describing instead of carrying generic LABEL_<i> names.
    id2label=dict(class_id_to_name),
    label2id={name: idx for idx, name in class_id_to_name.items()},
    # The pretrained classification head has a different number of outputs;
    # allow it to be re-initialised for num_classes.
    ignore_mismatched_sizes=True
)
model.to(device)

optimizer = AdamW(model.parameters(), lr=2e-5)
num_epochs = 25
# The scheduler is stepped once per batch, so its horizon is epochs * batches.
num_training_steps = num_epochs * len(loader)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer, num_warmup_steps=0, num_training_steps=num_training_steps
)

# --------------------- Training Loop ---------------------------
model.train()
for epoch in range(num_epochs):
    total_loss = 0.0
    for batch in loader:
        pixel_values = batch["pixel_values"].to(device)
        labels = [{k: v.to(device) for k, v in t.items()} for t in batch["labels"]]
        model_inputs = {"pixel_values": pixel_values, "labels": labels}

        # Forward the padding mask only when the collate function produced a
        # batched tensor; a per-sample list cannot be moved to the device and
        # the model then falls back to treating every pixel as valid.
        pixel_mask = batch.get("pixel_mask")
        if torch.is_tensor(pixel_mask):
            model_inputs["pixel_mask"] = pixel_mask.to(device)

        outputs = model(**model_inputs)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        total_loss += loss.item()
    avg_loss = total_loss / len(loader)
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f}")

# --------------------- Save Model ------------------------------
os.makedirs("ckt_detr_fewshot", exist_ok=True)
model.save_pretrained("ckt_detr_fewshot")
processor.save_pretrained("ckt_detr_fewshot")
# Also write the class names as plain text, one per line in class-id order,
# for the inference cell that reads class_names.txt.
with open("ckt_detr_fewshot/class_names.txt", "w") as f:
    for name in class_names:
        f.write(name + "\n")
print("Training complete and model saved.")


  from .autonotebook import tqdm as notebook_tqdm


Classes: ['battery', 'bulb', 'resistor', 'capacitor', 'inductor', 'transistor', 'ground', 'switch', 'diode']


Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DetrForObjectDetection were not initialized from the model checkpoin

Epoch 1/25 - Loss: 5.6066
Epoch 2/25 - Loss: 5.6814
Epoch 3/25 - Loss: 5.1000
Epoch 4/25 - Loss: 5.1966
Epoch 5/25 - Loss: 5.1187
Epoch 6/25 - Loss: 5.2633
Epoch 7/25 - Loss: 5.0803
Epoch 8/25 - Loss: 4.8406
Epoch 9/25 - Loss: 4.6921
Epoch 10/25 - Loss: 4.2949
Epoch 11/25 - Loss: 3.8885
Epoch 12/25 - Loss: 3.6379
Epoch 13/25 - Loss: 3.5656
Epoch 14/25 - Loss: 3.3800
Epoch 15/25 - Loss: 3.3390
Epoch 16/25 - Loss: 3.2841
Epoch 17/25 - Loss: 3.2763
Epoch 18/25 - Loss: 3.1141
Epoch 19/25 - Loss: 3.2496
Epoch 20/25 - Loss: 3.2368
Epoch 21/25 - Loss: 3.0329
Epoch 22/25 - Loss: 3.0409
Epoch 23/25 - Loss: 3.0662
Epoch 24/25 - Loss: 2.9916
Epoch 25/25 - Loss: 3.0619
Training complete and model saved.


In [11]:
import torch
from transformers import DetrImageProcessor, DetrForObjectDetection
from PIL import Image, ImageDraw, ImageFont
import os
import numpy as np

# Load model & processor
model_dir = "ckt_detr_fewshot"
model = DetrForObjectDetection.from_pretrained(model_dir).eval()
processor = DetrImageProcessor.from_pretrained(model_dir)

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Load class names (one per line, line index == class id)
with open(os.path.join(model_dir, "class_names.txt")) as f:
    class_names = [line.strip() for line in f if line.strip()]

# Detections scoring below this confidence are discarded; adjust if needed.
SCORE_THRESHOLD = 0.135
# Vertical offset (pixels) of the caption above the top edge of its box.
LABEL_OFFSET_PX = 10

# Output dir for results
os.makedirs("eval_results", exist_ok=True)

# Inference on each test image, in sorted order so runs are repeatable
# regardless of the order os.listdir returns.
test_dir = "dataset/test/processed"
image_files = sorted(f for f in os.listdir(test_dir) if f.lower().endswith(".jpg"))

for img_name in image_files:
    img_path = os.path.join(test_dir, img_name)
    image = Image.open(img_path).convert("RGB")
    # Preprocess
    inputs = processor(images=image, return_tensors="pt").to(device)
    # Inference
    with torch.no_grad():
        outputs = model(**inputs)

    # Post-process: PIL gives (W, H), the processor wants (H, W) per image
    target_sizes = torch.tensor([image.size[::-1]], device=device)
    results = processor.post_process_object_detection(
        outputs,
        target_sizes=target_sizes,
        threshold=SCORE_THRESHOLD
    )[0]

    # Draw boxes
    draw = ImageDraw.Draw(image)
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        score = float(score)
        label = int(label)
        x0, y0, x1, y1 = (float(v) for v in box.tolist())
        draw.rectangle([x0, y0, x1, y1], outline="red", width=3)
        # Fall back to the numeric id if the model emits a label with no name.
        class_label = class_names[label] if label < len(class_names) else str(label)
        # Clamp so the caption stays on the canvas for boxes touching the top edge.
        text_y = max(y0 - LABEL_OFFSET_PX, 0)
        draw.text((x0, text_y), f"{class_label} {score:.2f}", fill="red")

    # Save visualized image
    save_path = os.path.join("eval_results", f"pred_{img_name}")
    image.save(save_path)
    print(f"Saved: {save_path}")

print("Evaluation done! Visualized predictions in eval_results/")


Saved: eval_results\pred_000.jpg
Evaluation done! Visualized predictions in eval_results/
