In [None]:
!pip install -q "numpy<2.0" "matplotlib<3.9" "ultralytics==8.2.50" pyyaml


In [2]:
import os
from pathlib import Path
import yaml
import cv2
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np

from ultralytics import YOLO
from ultralytics.utils import patches
from ultralytics.nn.tasks import DetectionModel

# PyTorch >=2.6 "safe load" fix – allow DetectionModel in checkpoints
try:
    from torch.serialization import add_safe_globals
    add_safe_globals([DetectionModel])
    print("Registered DetectionModel as safe global for torch.load ✅")
except Exception as e:
    print("Safe globals not needed / not available:", e)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

# ----- PATHS -----
# Kaggle: dataset is usually mounted here:
DATA_ROOT = Path("/kaggle/input/capstonev3")
# If you're on Colab, change DATA_ROOT to wherever you unzipped the dataset.

ORIG_YAML  = DATA_ROOT / "data.yaml"
FIXED_YAML = Path("/kaggle/working/data_fixed.yaml")  # or any writable folder

# Classifier settings
MODEL_TYPE   = "resnet50"   # or "vit_b_16"
EPOCHS       = 20           # increase to 40–60 for max accuracy
BATCH_SIZE   = 64
LR           = 1e-4
IMG_SIZE     = 224
NUM_WORKERS  = 4            # for classifier dataloaders

CLASSIFIER_CKPT = Path("/kaggle/working/best_classifier.pth")
YOLO_EXP_NAME   = "yolov12_mar20"


numpy: 1.26.4
matplotlib: 3.7.2
ultralytics: 8.2.50


In [3]:
print("Original data.yaml:")
print(ORIG_YAML.read_text()[:500])

with open(ORIG_YAML, "r") as f:
    orig = yaml.safe_load(f)

CLASS_NAMES = orig["names"]
NC = orig["nc"]
print("Number of classes:", NC)
print("Classes:", CLASS_NAMES)

fixed = {
    "path": str(DATA_ROOT),     # root folder
    "train": "train/images",
    "val": "valid/images",
    "test": "test/images",
    "nc": NC,
    "names": CLASS_NAMES,
}

with open(FIXED_YAML, "w") as f:
    yaml.safe_dump(fixed, f)

print("\nUsing fixed data.yaml:")
print(FIXED_YAML.read_text())


Patched ultralytics.utils.patches.imread ✅


In [4]:
# ---- Patch Ultralytics to use cv2.imread (avoid imdecode issues) ----
def simple_imread(path, flags=cv2.IMREAD_COLOR):
    return cv2.imread(str(path), flags)

patches.imread = simple_imread
print("Patched ultralytics.utils.patches.imread -> cv2.imread ✅")

# ---- YOLOv12-style detector (YOLO11n backbone) ----
yolo_model = YOLO("yolo11n.pt")  # small model; you can try yolo11s.pt, yolo11m.pt, etc.

yolo_model.train(
    data=str(FIXED_YAML),
    epochs=30,          # increase for better mAP if you have time
    imgsz=640,
    batch=16,
    workers=0,          # 0 => use main process, no DataLoader worker bugs
    project="yolo_mar20",
    name=YOLO_EXP_NAME,
    amp=False,          # keep AMP off for stability
)

YOLO_BEST = Path(f"yolo_mar20/{YOLO_EXP_NAME}/weights/best.pt")
print("Best YOLO weights at:", YOLO_BEST)


Using device: cuda


In [7]:
class YoloCropDataset(Dataset):
    """
    Build a classification dataset from YOLOv12/v8/v11 txt labels.

    root/
      train/
        images/
        labels/
      valid/
        images/
        labels/
      test/
        images/
        labels/

    Each line in a label file: class cx cy w h (normalized).
    """

    def __init__(self, root_dir, split="train", img_size=224, augment=False):
        self.root_dir = Path(root_dir)
        self.split = split
        self.img_size = img_size
        self.augment = augment

        self.image_dir = self.root_dir / split / "images"
        self.label_dir = self.root_dir / split / "labels"

        assert self.image_dir.exists(), f"Missing {self.image_dir}"
        assert self.label_dir.exists(), f"Missing {self.label_dir}"

        self.samples = []
        self.class_ids = set()
        self._build_index()

        normalize = transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        )

        base = [
            transforms.Resize((img_size, img_size)),
            transforms.ToTensor(),
            normalize,
        ]

        if augment:
            self.transform = transforms.Compose(
                [
                    transforms.RandomHorizontalFlip(),
                    transforms.RandomRotation(10),
                    transforms.ColorJitter(
                        brightness=0.2, contrast=0.2,
                        saturation=0.2, hue=0.02
                    ),
                ] + base
            )
        else:
            self.transform = transforms.Compose(base)

        self.num_classes = max(self.class_ids) + 1
        print(f"[{self.split}] {len(self.samples)} crops | "
              f"{len(self.class_ids)} classes.")

    def _build_index(self):
        for label_path in sorted(self.label_dir.glob("*.txt")):
            with open(label_path, "r") as f:
                lines = [l.strip() for l in f.readlines() if l.strip()]
            if not lines:
                continue

            stem = label_path.stem
            img_path = None
            for ext in [".jpg", ".jpeg", ".png"]:
                candidate = self.image_dir / f"{stem}{ext}"
                if candidate.exists():
                    img_path = candidate
                    break
            if img_path is None:
                continue

            for line in lines:
                parts = line.split()
                if len(parts) != 5:
                    continue
                cls_id = int(parts[0])
                cx, cy, w, h = map(float, parts[1:])
                self.samples.append((img_path, (cx, cy, w, h), cls_id))
                self.class_ids.add(cls_id)

        if len(self.samples) == 0:
            raise RuntimeError(f"No crops found in {self.label_dir}")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, (cx, cy, w, h), cls_id = self.samples[idx]
        img = Image.open(img_path).convert("RGB")
        W, H = img.size

        bw = w * W
        bh = h * H
        bx = cx * W
        by = cy * H

        x1 = int(max(0, bx - bw / 2))
        y1 = int(max(0, by - bh / 2))
        x2 = int(min(W, bx + bw / 2))
        y2 = int(min(H, by + bh / 2))

        if x2 <= x1 or y2 <= y1:
            crop = img
        else:
            crop = img.crop((x1, y1, x2, y2))

        crop = self.transform(crop)
        return crop, cls_id


Patched ultralytics.utils.patches.imread -> cv2.imread ✅


UnpicklingError: Weights only load failed. This file can still be loaded, to do so you have two options, [1mdo those steps only if you trust the source of the checkpoint[0m. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL torch.nn.modules.container.Sequential was not an allowed global by default. Please use `torch.serialization.add_safe_globals([Sequential])` or the `torch.serialization.safe_globals([Sequential])` context manager to allowlist this global if you trust this class/function.

Check the documentation of torch.load to learn more about types accepted by default with weights_only https://pytorch.org/docs/stable/generated/torch.load.html.

In [None]:
def create_model(num_classes: int, model_type: str = "resnet50"):
    model_type = model_type.lower()
    if model_type == "resnet50":
        weights = models.ResNet50_Weights.DEFAULT
        model = models.resnet50(weights=weights)
        in_feats = model.fc.in_features
        model.fc = nn.Linear(in_feats, num_classes)
    elif model_type == "vit_b_16":
        weights = models.ViT_B_16_Weights.DEFAULT
        model = models.vit_b_16(weights=weights)
        in_feats = model.heads.head.in_features
        model.heads.head = nn.Linear(in_feats, num_classes)
    else:
        raise ValueError("model_type must be 'resnet50' or 'vit_b_16'")
    return model


def train_one_epoch(model, loader, criterion, optimizer, device):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, preds = outputs.max(1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    return running_loss / total, correct / total


@torch.no_grad()
def evaluate(model, loader, criterion, device):
    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * images.size(0)
        _, preds = outputs.max(1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    return running_loss / total, correct / total


In [None]:
# Build datasets from YOLO labels
train_ds = YoloCropDataset(DATA_ROOT, split="train", img_size=IMG_SIZE, augment=True)
valid_ds = YoloCropDataset(DATA_ROOT, split="valid", img_size=IMG_SIZE, augment=False)
test_ds  = YoloCropDataset(DATA_ROOT, split="test",  img_size=IMG_SIZE, augment=False)

num_classes = train_ds.num_classes
print("Classifier num_classes:", num_classes)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=NUM_WORKERS, pin_memory=True)
valid_loader = DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=NUM_WORKERS, pin_memory=True)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=NUM_WORKERS, pin_memory=True)

clf_model = create_model(num_classes, MODEL_TYPE).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(clf_model.parameters(), lr=LR)

best_val_acc = 0.0

for epoch in range(1, EPOCHS + 1):
    tr_loss, tr_acc = train_one_epoch(clf_model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_acc = evaluate(clf_model, valid_loader, criterion, DEVICE)
    print(f"Epoch {epoch:02d}/{EPOCHS} | "
          f"train_loss={tr_loss:.4f} acc={tr_acc:.4f} | "
          f"val_loss={val_loss:.4f} acc={val_acc:.4f}")
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            "model_state": clf_model.state_dict(),
            "num_classes": num_classes,
            "model_type": MODEL_TYPE,
        }, CLASSIFIER_CKPT)
        print("  -> Saved new best classifier")

print("Best validation accuracy:", best_val_acc)

# Load best checkpoint and evaluate on test set
ckpt = torch.load(CLASSIFIER_CKPT, map_location=DEVICE)
best_clf = create_model(ckpt["num_classes"], ckpt["model_type"]).to(DEVICE)
best_clf.load_state_dict(ckpt["model_state"])

test_loss, test_acc = evaluate(best_clf, test_loader, criterion, DEVICE)
print(f"Classifier test loss: {test_loss:.4f} | test acc: {test_acc:.4f}")


In [None]:
# load trained YOLO detector
detector = YOLO(str(YOLO_BEST))
best_clf.eval()

clf_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

def classify_crop(pil_img):
    img = clf_transform(pil_img).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        out = best_clf(img)
        prob = torch.softmax(out, dim=1)
        conf, cls = prob.max(1)
    return cls.item(), conf.item()

def run_pipeline(image_path):
    img = Image.open(image_path).convert("RGB")
    res = detector(str(image_path))[0]

    draw = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)

    for box in res.boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
        crop = img.crop((x1, y1, x2, y2))

        # classifier prediction
        cls_id, conf_clf = classify_crop(crop)
        cls_name = CLASS_NAMES[cls_id] if cls_id < len(CLASS_NAMES) else str(cls_id)

        # YOLO prediction (for comparison)
        yolo_cls = int(box.cls[0].item())
        yolo_conf = float(box.conf[0].item())
        yolo_name = CLASS_NAMES[yolo_cls] if yolo_cls < len(CLASS_NAMES) else str(yolo_cls)

        label = f"{cls_name} ({conf_clf:.2f}) | YOLO:{yolo_name} ({yolo_conf:.2f})"

        cv2.rectangle(draw, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(draw, label, (x1, max(0, y1-5)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1, cv2.LINE_AA)

    draw_rgb = cv2.cvtColor(draw, cv2.COLOR_BGR2RGB)
    plt.figure(figsize=(8, 8))
    plt.imshow(draw_rgb)
    plt.axis("off")
    plt.title("YOLOv12 detector + classifier")
    plt.show()


In [None]:
sample_img = next((DATA_ROOT / "test" / "images").glob("*"))
print("Sample image:", sample_img)
run_pipeline(sample_img)
