In [1]:
from __future__ import annotations

import os
import glob
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple, Optional

import cv2
import yaml
import torch
from torch.utils.data import Dataset

import albumentations as A
from transformers import TrainingArguments, Trainer

from types import SimpleNamespace
from ultralytics import YOLO
from types import SimpleNamespace
from ultralytics.utils.loss import v8DetectionLoss


import math
from transformers import TrainerCallback
import copy

## 1) Mapeamento de classes (`id2label` e `label2id`)

Definindo **a “tabela de tradução”** entre:

- o **ID numérico** que o modelo usa internamente (0, 1)
- o **nome** da classe (“fruit”, “bomb”)

Isso é feito pois os modelos não entendem caracteres e sim números, então passamos cada classe de forma numérica, mas no momento de nós utilizarmos fica mais visual em texto.

---


In [2]:
id2label = {
    0: "fruit",
    1: "bomb",
}
label2id = {v: k for k, v in id2label.items()}

print(id2label)
print(label2id)

{0: 'fruit', 1: 'bomb'}
{'fruit': 0, 'bomb': 1}


## 2) `Processor`: do frame ao batch

**um processador de imagem + bounding boxes** que:

1. Recebe a imagem crua (BGR do OpenCV) e as caixas no formato YOLO (`x, y, w, h` normalizado).
2. Aplica um **resize com manutenção de proporção** + **pad** para bater no `imgsz` do modelo.
3. Se `train=True` aplica augmentations que é o processo de alteração das imagens para garantir que o modelo não irá fazer overfiting e aprender de fato as features.
4. Retorna tudo em tensores PyTorch no formato que o `collate_fn` e o `criterion` do YOLO espera.

---

## O que é “pad” (padding)?

**Pad**/**padding** é literalmente **preencher as “bordas vazias”** de uma imagem com pixels extras para ela atingir um tamanho alvo.

No uso em questão o alvo é: `640 x 640`.

---

## Por que você precisa de pad no YOLO?

1. `LongestMaxSize(max_size=imgsz)`  
   → redimensiona mantendo a proporção (aspect ratio)

2. `PadIfNeeded(min_height=imgsz, min_width=imgsz)`  
   → completa o que faltar até virar um quadrado `imgsz x imgsz`

Exemplo prático:

- Frame original: **1280 x 720**
- Após `LongestMaxSize(640)`: **640 x 360** (mantém proporção)
- Só que o modelo quer **640 x 640**
- Então entra o **pad**: preenche o resto até ficar 640x640

Visualmente, fica assim:

- conteúdo real ocupa 640x360
- sobram “faixas” (geralmente em cima/baixo, ou esquerda/direita)
- essas faixas recebem uma cor constante: `fill=(114,114,114)`

---


In [3]:
# imagem processor
@dataclass
class YoloBatchItem:
    img: torch.Tensor          # (3, H, W) float32 [0..1]
    bboxes: torch.Tensor       # (N, 4) float32 (x,y,w,h) normalizado
    cls: torch.Tensor          # (N, 1)
    area: torch.Tensor         # (N,) float32 área em pixels (após resize/pad)
    im_file: str
    ori_shape: Tuple[int, int] # (h, w)

In [4]:
class YoloImageProcessor:
    """
    Processa imagem + bboxes YOLO e aplica augmentations.
    Retorna tensores no padrão collate para o loss do YOLO.
    """

    def __init__(self, imgsz: int = 640, train: bool = True):
        self.imgsz = imgsz
        self.train = train

        # mantém ratio com LongestMaxSize + PadIfNeeded
        base = [
            # redimensiona mantendo ratio
            A.LongestMaxSize(max_size=imgsz, interpolation=cv2.INTER_LINEAR),
            # pad para quadrado imgsz x imgsz (640x640, com borda cinza)
            A.PadIfNeeded(
                min_height=imgsz,
                min_width=imgsz,
                border_mode=cv2.BORDER_CONSTANT,
                fill=(114, 114, 114),
            ),
        ]

        if train:
            aug = [
                # geometria
                # melhora a robustez contra pequenas mudanças de escala/rotação/perspectiva
                A.Affine(
                    shift_limit=0.05, # desloca até 5% da largura/altura
                    scale_limit=0.20, # escala a imagem
                    rotate_limit=15, # rotaciona até 15 graus
                    border_mode=cv2.BORDER_CONSTANT, # adiciona borda
                    fill=(114, 114, 114), # cor da borda
                    p=0.9, # probabilidade
                ),
                # simula distorsão de perspectiva leve
                A.Perspective(scale=(0.00, 0.05), p=0.15),

                # cor/iluminação
                A.HueSaturationValue(hue_shift_limit=10, sat_shift_limit=20, val_shift_limit=15, p=0.6), # muda matiz, saturação, valor
                A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.6), # simula variação de brilho/contraste
                A.RGBShift(r_shift_limit=10, g_shift_limit=10, b_shift_limit=10, p=0.2), # muda canais RGB

                # blur leve
                A.GaussianBlur(blur_limit=(3, 5), p=0.10),
            ]
        else:
            aug = []

        # faz com que as caixas(bboxes) sejam ajustadas às transformações
        self.transform = A.Compose(
            base + aug,
            bbox_params=A.BboxParams(
                format="yolo",        # (x,y,w,h) normalizado
                label_fields=["category_ids"],
                min_visibility=0.0,
                clip=True,
            ),
        )

    def __call__(self, image_bgr: Any, bboxes_yolo: List[List[float]], cls_ids: List[int], im_file: str) -> YoloBatchItem:
        """
        image_bgr: np.ndarray (H,W,3) BGR
        bboxes_yolo: lista de [x,y,w,h] normalizados
        cls_ids: lista de int
        """

        # mantem shape original para retorno se necessário
        h0, w0 = image_bgr.shape[:2]
        image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

        # aplica transformações 
        out = self.transform(image=image_rgb, bboxes=bboxes_yolo, category_ids=cls_ids)
        img = out["image"]  # imagem transformada (H,W,3) RGB
        bboxes = out["bboxes"] # lista de [x,y,w,h] normalizados
        cats = out["category_ids"] # classes correspondentes

        # converte a imagem para tensor, que é o formato esperado pelo modelo. HWC - CHW, float32 [0..1]
        img_t = torch.from_numpy(img).permute(2, 0, 1).contiguous().float() / 255.0

        # tratando os casos em que é passado apenas o fundo sem objetos, o que não gera bboxes
        if len(bboxes) == 0:
            bboxes_t = torch.zeros((0, 4), dtype=torch.float32)
            cls_t = torch.zeros((0, 1), dtype=torch.int64)
            area_t = torch.zeros((0,), dtype=torch.float32)
        else:
            bboxes_t = torch.tensor(bboxes, dtype=torch.float32)         # (N,4) yolo norm
            cls_t = torch.tensor(cats, dtype=torch.int64).view(-1, 1)  # (N,1)

            # area_px = (w*imgsz) * (h*imgsz)
            area_t = (bboxes_t[:, 2] * self.imgsz) * (bboxes_t[:, 3] * self.imgsz)

        return YoloBatchItem(
            img=img_t,
            bboxes=bboxes_t,
            cls=cls_t,
            area=area_t,
            im_file=im_file,
            ori_shape=(h0, w0),
        )

## Leitura do dataset

A ideia deste bloco é montar um **Dataset PyTorch** que:

1. Lê o `data.yaml` no padrão YOLO
2. Encontra todas as imagens do split
3. Para cada imagem, acha o `.txt` correspondente em `labels/`
4. Lê as bboxes no formato YOLO (`class x y w h`, tudo normalizado)
5. Joga tudo no `YoloImageProcessor`
6. Retorna um dicionário pronto para o `collate_fn` montar o batch do jeito que o **loss da Ultralytics** espera

---


In [None]:
def _read_data_yaml(data_yaml: str) -> Dict[str, Any]:
    with open(data_yaml, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)
    
def _resolve_path(base: str, p: str) -> str:
    if os.path.isabs(p):
        return p
    return os.path.normpath(os.path.join(base, p))

def _list_images_from_source(src: str) -> List[str]:
    if os.path.isdir(src):
        exts = ("*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp")
        files = []
        for e in exts:
            files.extend(glob.glob(os.path.join(src, e)))
        return sorted(files)

    if os.path.isfile(src) and src.lower().endswith(".txt"):
        with open(src, "r", encoding="utf-8") as f:
            files = [ln.strip() for ln in f.readlines() if ln.strip()]
        return files
    
def _img_to_label_path(im_path: str) -> str:
    """
    Convenção YOLO comum:
      .../images/.../xxx.jpg -> .../labels/.../xxx.txt
    """
    p = im_path.replace(os.sep + "images" + os.sep, os.sep + "labels" + os.sep)
    p = os.path.splitext(p)[0] + ".txt"
    return p

def _read_yolo_label_file(label_path: str) -> Tuple[List[int], List[List[float]]]:
    """
    Retorna (cls_ids, bboxes_yolo_norm)
    """
    if not os.path.exists(label_path):
        return [], []

    cls_ids: List[int] = []
    bboxes: List[List[float]] = []

    with open(label_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split()
            if len(parts) != 5:
                continue
            c = int(float(parts[0]))
            x, y, w, h = map(float, parts[1:])
            cls_ids.append(c)
            bboxes.append([x, y, w, h])

    return cls_ids, bboxes


class YoloDetectionDataset(Dataset):
    def __init__(self, data_yaml: str, split: str, imgsz: int = 640, train: bool = True):
        """
        split: "train" ou "val"
        train: define se usa augmentations ou não
        """
        cfg = _read_data_yaml(data_yaml)
        base = os.path.dirname(os.path.abspath(data_yaml))

        # nomes no yaml podem ser list ou dict
        names = cfg.get("names", None)
        if names is not None:
            pass

        src = cfg.get(split, None)
        if src is None:
            raise KeyError(f"data.yaml não tem a chave '{split}'")

        src = _resolve_path(base, src)
        self.image_paths = _list_images_from_source(src)

        self.processor = YoloImageProcessor(imgsz=imgsz, train=train)

    def __len__(self) -> int:
        return len(self.image_paths)

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        im_path = self.image_paths[idx]
        img_bgr = cv2.imread(im_path, cv2.IMREAD_COLOR)
        if img_bgr is None:
            raise FileNotFoundError(f"Falha ao ler imagem: {im_path}")

        lab_path = _img_to_label_path(im_path)
        cls_ids, bboxes = _read_yolo_label_file(lab_path)

        item = self.processor(img_bgr, bboxes, cls_ids, im_file=im_path)

        # o collate_fn vai montar o batch do jeito do YOLO loss.
        return {
            "img": item.img,
            "bboxes": item.bboxes,
            "cls": item.cls,
            "area": item.area,
            "im_file": item.im_file,
            "ori_shape": item.ori_shape,
        }

## `collate_fn` - montando o batch do jeito que o YOLO loss gosta

No PyTorch, o `Dataset.__getitem__()` devolve **um item por vez**.

Mas o treino roda em **batch** (várias imagens juntas).  
Quem transforma uma lista de itens em um batch é o `collate_fn`.

`collate_fn` tem um objetivo bem específico:

- **Empilhar as imagens** em um tensor `(B, 3, H, W)`
- **Concatenar todas as bboxes do batch** em um único tensor `(N_total, 4)`
- **Concatenar todas as classes** em `(N_total, 1)`
- **Criar um vetor** `batch_idx` **de tamanho** `(N_total,)` **dizendo a qual imagem do batch cada bbox pertence**

Essa última parte (`batch_idx`) é crucial, porque cada imagem tem um número variável de objetos, então você não consegue ter um tensor “fixo” como `(B, max_objs, 4)`.  
A Ultralytics resolve isso com concatenação + índice.

---


In [None]:
def collate_fn(examples: List[Dict[str, Any]]) -> Dict[str, Any]:
    # empilha as imagens, cada imagem é um tensor (3,H,W) já normalizado [0..1]
    imgs = torch.stack([e["img"] for e in examples], dim=0)  # (B,3,H,W)

    # os boxes não podem ser empilhados direto porque cada imagem pode ter número diferente de objetos nela
    bboxes_list = []
    cls_list = []
    batch_idx_list = []

    # para cada exemplo na batch
    for i, e in enumerate(examples):
        # número de boxes na imagem i
        n = e["bboxes"].shape[0]
        # se não tem boxes, pula
        if n == 0:
            continue
        # adiciona os boxes e classes
        bboxes_list.append(e["bboxes"])
        cls_list.append(e["cls"])
        # índices da batch para cada box
        # ex: se imagem i=2 tem n=7 então cria tensor [2,2,2,2,2,2,2] de shape (7,)
        batch_idx_list.append(torch.full((n,), i, dtype=torch.int64))

    # caso o batch não tenha boxes nenhum, cria tudo como zeros
    # ex: todas imagens são fundo
    if len(bboxes_list) == 0:
        bboxes = torch.zeros((0, 4), dtype=torch.float32)
        cls = torch.zeros((0, 1), dtype=torch.int64)
        batch_idx = torch.zeros((0,), dtype=torch.int64)
    else:
        # concatena todos os boxes, classes e índices da batch
        bboxes = torch.cat(bboxes_list, dim=0)
        cls = torch.cat(cls_list, dim=0)
        batch_idx = torch.cat(batch_idx_list, dim=0)

    return {
        "img": imgs,
        "bboxes": bboxes,
        "cls": cls,
        "batch_idx": batch_idx,
        # não usados no loss, mas util pra debug
        "im_file": [e["im_file"] for e in examples],
        "ori_shape": [e["ori_shape"] for e in examples],
    }

## `_first_tensor_sum`: “âncora” de gradiente quando o loss vira constante
### Por que se precisa disso?
Em detecção de objetos pode ocorrer de termos apenas o fundo (background) e nesses casos a loss acaba retornando um valor constante. Quando isso acontece o trainer vai tentar fazer o backward e pode dar um erro ou o treino fica incosistente. Ao pegar um valor escalar no momento de fazer a loss ele vai gerantir que o valor existirá - `loss = loss + 0.0 * anchor`

## `ensure_yolo_criterion`: Cria/Ajusta a loss no device (CPU/GPU)
Garante que o criterion exista, esteja setado e que todos os buffers estão no local correto


In [None]:
def _first_tensor_sum(x):
    """Acha algum tensor em preds e retorna um escalar com grad para ancorar o grafo."""
    if torch.is_tensor(x):
        return x.sum()
    if isinstance(x, (list, tuple)):
        for t in x:
            if torch.is_tensor(t):
                return t.sum()
            if isinstance(t, (list, tuple, dict)):
                s = _first_tensor_sum(t)
                if s is not None:
                    return s
    if isinstance(x, dict):
        for v in x.values():
            s = _first_tensor_sum(v)
            if s is not None:
                return s
    return None


def ensure_yolo_criterion(m):
    defaults = {"box": 7.5, "cls": 0.5, "dfl": 1.5}

    # prepara args como namespace que é o esperado pelo loss
    # na prática garante que existam box, cls, dfl
    args = getattr(m, "args", None)
    # caso args seja dict ou None, cria namespace novo
    if isinstance(args, dict):
        m.args = SimpleNamespace(**{**defaults, **args})
    elif args is None:
        m.args = SimpleNamespace(**defaults)
    # se já é namespace, só adiciona os defaults que faltam
    else:
        for k, v in defaults.items():
            if not hasattr(m.args, k):
                setattr(m.args, k, v)

    # cria criterion
    if getattr(m, "criterion", None) is None and hasattr(m, "init_criterion"):
        try:
            m.init_criterion()
        except Exception:
            pass

    if getattr(m, "criterion", None) is None:
        m.criterion = v8DetectionLoss(m)

    # garante que hyp esteja alinhado
    crit = m.criterion
    if hasattr(crit, "hyp"):
        crit.hyp = m.args

    # garante device consistente
    device = next(m.parameters()).device

    # 1) garante crit.device correto
    if hasattr(crit, "device"):
        crit.device = device

    # 2) garante stride no mesmo device
    if hasattr(crit, "stride"):
        if torch.is_tensor(crit.stride):
            crit.stride = crit.stride.to(device)

    # 3) proj e qualquer tensor interno do loss para o device
    if hasattr(crit, "proj") and torch.is_tensor(crit.proj):
        crit.proj = crit.proj.to(device)

    for name, val in vars(crit).items():
        if torch.is_tensor(val) and val.device != device:
            setattr(crit, name, val.to(device))

    # 4) alguns modelos também têm m.stride como tensor fora do device
    if hasattr(m, "stride") and torch.is_tensor(m.stride) and m.stride.device != device:
        m.stride = m.stride.to(device)

    return m

# `YoloHFTrainer.compute_loss` (YOLO + Hugging Face Trainer)

A ideia central aqui é: **o `Trainer` do Hugging Face foi feito pensando em modelos “NLP-style”** (que recebem `model(**inputs)` e devolvem `loss` dentro de `outputs`).  
No YOLO (Ultralytics), o fluxo real é outro:

1. você passa **apenas a imagem** no forward: `preds = model(img)`
2. você calcula o loss chamando um **criterion próprio**: `loss = criterion(preds, batch)`
3. às vezes o batch não tem boxes e o loss pode virar **constante sem grad**

Esse método adapta o Trainer para esse mundo.

---


In [None]:
class YoloHFTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        # manda para o device
        raw_model = self.accelerator.unwrap_model(model) if hasattr(self, "accelerator") else model
        device = next(raw_model.parameters()).device

        # move inputs para o device do modelo
        batch = {k: (v.to(device, non_blocking=True) if torch.is_tensor(v) else v) for k, v in inputs.items()}

        # garante criterion
        if getattr(raw_model, "criterion", None) is None or not callable(raw_model.criterion):
            ensure_yolo_criterion(raw_model)

        # forward + loss, garante gradiente pois aqui estamos treinando
        with torch.enable_grad():
            preds = raw_model(batch["img"])                  # forward padrão
            loss_out = raw_model.criterion(preds, batch)     # loss YOLO, loss_out = (loss, loss_items), loss_items = (box, cls, dfl)
            
        # guarda loss_items (box, cls, dfl) para logging por época, usado em TrainerCallback
        self._last_loss_items = None
        if isinstance(loss_out, (tuple, list)) and len(loss_out) >= 2 and torch.is_tensor(loss_out[1]):
            li = loss_out[1].detach()
            if li.numel() >= 3:
                # (3,) -> box, cls, dfl
                self._last_loss_items = li[:3].float().detach().cpu()

        # extrai o loss total
        if isinstance(loss_out, (tuple, list)):
            loss = loss_out[0]
        else:
            loss = loss_out

        # garante escalar pois o HF espera que seja escalar
        if loss.numel() != 1:
            loss = loss.mean()

        # se batch não tem bbox, loss pode virar constante - sem grad
        if not loss.requires_grad:
            # tenta ancorar em algo que veio do forward
            anchor = _first_tensor_sum(preds)

            # se não der, ancora nos parâmetros treináveis (head-only)
            if anchor is None or (torch.is_tensor(anchor) and not anchor.requires_grad):
                trainable_sum = None
                for p in raw_model.parameters():
                    if p.requires_grad:
                        trainable_sum = p.sum() if trainable_sum is None else (trainable_sum + p.sum())

                if trainable_sum is None:
                    raise RuntimeError("Nenhum parâmetro treinável. Você congelou tudo e não liberou o head corretamente.")

                anchor = trainable_sum

            # linha que foi comentado em sima, não muda o valor mas força que ocorra o gradiente
            loss = loss + 0.0 * anchor


        return (loss, loss_out) if return_outputs else loss

# `build_yolo_model` + `freeze_yolo_head_only` - carregando o YOLO e treinando só o head

1. **`build_yolo_model`**: carrega um `.pt` do Ultralytics e devolve o `BaseModel` pronto pra usar no pipeline (criterion alinhado, train mode ligado).
2. **`freeze_yolo_head_only`**: congela o modelo inteiro e libera **somente o head de detecção**, para fine-tuning leve (Aqui é onde está o segredo).

Funciona porque:
- o backbone já “enxerga” padrões visuais genéricos
- o que você mais precisa adaptar são os **últimos blocos** que mapeiam features - caixas/classes (fruta/bomba)

---


In [None]:
def build_yolo_model(weights_pt: str):
    y = YOLO(weights_pt)
    m = y.model  # Ultralytics BaseModel

    # garante args + criterion + hyp
    m = ensure_yolo_criterion(m)

    # coloca o modelo em modo treino
    m.train()
    return m


In [None]:
def freeze_yolo_head_only(m):
    # congela tudo
    for p in m.parameters():
        p.requires_grad_(False)

    # acha o detect head de forma robusta
    # porque no Ultralytics o último módlo pode conter: Detect, v8Detect, DetectMultiBackend, etc
    # procuramos apenas por "detect"
    head = None
    if hasattr(m, "model"):  # Ultralytics BaseModel: m.model é uma lista de camadas
        for layer in reversed(m.model):
            name = layer.__class__.__name__.lower()
            if "detect" in name:  # Detect, v8Detect, etc.
                head = layer
                break
        if head is None:
            head = m.model[-1]  # fallback
    else:
        head = m  # fallback extremo

    # libera só o head que são os casos que queremos
    for p in head.parameters():
        p.requires_grad_(True)

    return head

# Painel para o treinamento
Aqui são funções que obtém as mesmas informações existentes no treinamento da Ultralytics, não irei entrar em detalhes da implementação.

In [None]:
def _extract_ultra_metrics(val_out):
    """
    Tenta extrair P, R, mAP50, mAP50-95 do retorno do YOLO.val().
    """
    P = R = mAP50 = mAP5095 = float("nan")

    # caminho mais comum: metrics.box.mp / mr / map50 / map
    if hasattr(val_out, "box"):
        box = val_out.box
        if hasattr(box, "mp"):    P = float(box.mp)
        if hasattr(box, "mr"):    R = float(box.mr)
        if hasattr(box, "map50"): mAP50 = float(box.map50)
        if hasattr(box, "map"):   mAP5095 = float(box.map)

    # fallback: results_dict
    if (math.isnan(P) or math.isnan(R) or math.isnan(mAP50) or math.isnan(mAP5095)) and hasattr(val_out, "results_dict"):
        rd = val_out.results_dict
        # tenta achar pelas chaves mais frequentes
        for k, v in rd.items():
            lk = str(k).lower()
            if "precision" in lk and "box" in lk:
                P = float(v)
            elif "recall" in lk and "box" in lk:
                R = float(v)
            elif "map50" in lk and "box" in lk:
                mAP50 = float(v)
            elif ("map" in lk and "box" in lk) and ("map50" not in lk):
                mAP5095 = float(v)

    return P, R, mAP50, mAP5095


class YoloPrettyPrintCallback(TrainerCallback):
    def __init__(self, trainer, data_yaml, imgsz, device=0, val_batch=16, weights_stub="./yolo11n.pt", run_val_every_epochs=1):
        self.trainer = trainer
        self.data_yaml = data_yaml
        self.imgsz = imgsz
        self.device = device
        self.val_batch = val_batch
        self.weights_stub = weights_stub
        self.run_val_every_epochs = run_val_every_epochs

        self.epoch_i = 0
        self.reset_epoch_acc()

    def reset_epoch_acc(self):
        self.n_steps = 0
        self.sum_box = 0.0
        self.sum_cls = 0.0
        self.sum_dfl = 0.0

    def on_step_end(self, args, state, control, **kwargs):
        # pega o último loss_items salvo no compute_loss
        li = getattr(self.trainer, "_last_loss_items", None)
        if li is None:
            return

        self.sum_box += float(li[0])
        self.sum_cls += float(li[1])
        self.sum_dfl += float(li[2])
        self.n_steps += 1

    def on_epoch_end(self, args, state, control, **kwargs):
        self.epoch_i += 1
        total_epochs = int(args.num_train_epochs)

        # médias da época (loss)
        denom = max(1, self.n_steps)
        box_loss = self.sum_box / denom
        cls_loss = self.sum_cls / denom
        dfl_loss = self.sum_dfl / denom

        # métricas de detecção (P/R/mAP)
        P = R = mAP50 = mAP5095 = float("nan")

        do_val = (self.run_val_every_epochs > 0) and (self.epoch_i % self.run_val_every_epochs == 0)
        if do_val:
            raw_model = self.trainer.accelerator.unwrap_model(self.trainer.model) if hasattr(self.trainer, "accelerator") else self.trainer.model

            y = YOLO(self.weights_stub)  # só para montar pipeline de val
            y.model.load_state_dict(copy.deepcopy(raw_model.state_dict()), strict=False)
            val_out = y.val(
                data=self.data_yaml,
                imgsz=self.imgsz,
                batch=self.val_batch,
                device=self.device,
                plots=False,
                save=False,
                verbose=False,
            )
            P, R, mAP50, mAP5095 = _extract_ultra_metrics(val_out)

        print(f"\nEpoch  {self.epoch_i}/{total_epochs}")
        print(f"box_loss  {box_loss:.4f}")
        print(f"cls_loss  {cls_loss:.4f}")
        print(f"dfl_loss  {dfl_loss:.4f}")
        print(f"Box(P  {P:.3f}")
        print(f"R  {R:.3f}")
        print(f"mAP50  {mAP50:.2f}")
        print(f"mAP50-95)  {mAP5095:.2f}")

        self.reset_epoch_acc()
        return control

#  `main()` O pipeline do finetunning

- **Dataset YOLO (imagens + labels)** → `YoloDetectionDataset`
- **Modelo Ultralytics (YOLO)** → `build_yolo_model`
- **Treino controlado pelo Hugging Face Trainer** → `YoloHFTrainer`
- **Validação no estilo Ultralytics** → `YoloPrettyPrintCallback`

A lógica geral é:

1. Define caminhos e parâmetros (data.yaml, imgsz, pesos base).
2. Carrega dataset de treino e validação.
3. Carrega modelo YOLO, manda pra GPU e garante o criterion no device certo.
4. Congela tudo e libera só o head (fine-tuning leve).
5. Monta `TrainingArguments`.
6. Cria otimizador **só com parâmetros treináveis**.
7. Instancia o `Trainer` customizado.
8. Adiciona callback para imprimir loss + métricas por época.
9. Treina.
10. Salva o `state_dict` e limpa GPU.
---

In [None]:
def main():
    # 1. caminhos e parâmetros
    data_yaml = "../datasets/fruitninja_yolo/data.yaml"
    imgsz = 640

    weights_pt = "./yolo11n.pt" # pesos base

    # 2. carrega os df de treino e validação
    train_ds = YoloDetectionDataset(data_yaml=data_yaml, split="train", imgsz=imgsz, train=True)
    val_ds   = YoloDetectionDataset(data_yaml=data_yaml, split="val", imgsz=imgsz, train=False)

    # 3. Carrega modelo YOLO, manda pra GPU e garante o criterion no device certo
    model = build_yolo_model(weights_pt).cuda()
    ensure_yolo_criterion(model)
    
    # 4. Congela tudo e libera só o head (fine-tuning leve).
    head = freeze_yolo_head_only(model)
    
    # apenas para visualização
    trainable_named = [(n, p) for n, p in model.named_parameters() if p.requires_grad]
    print("Trainable params:", len(trainable_named))
    print("Exemplo:", [(n, p.shape) for n, p in trainable_named[:10]])

    # 5. Monta `TrainingArguments`.
    training_args = TrainingArguments(
        output_dir="../models/runs/fruitninja_yolo_manual/",
        per_device_train_batch_size=16, # "amostra" com 16 imagens
        per_device_eval_batch_size=16,
        num_train_epochs=120, # quantas execuções fazer por batch, garante estabilização

        logging_steps=50, # fazer um log a cada 50 passos
        save_steps=200, # fazer um checkpoint a cada 200 passos
        save_total_limit=2, # só mantém 2 checkpoints
        eval_strategy="no", # não faz evaluate pois foi montado o YoloPrettyPrintCallback

        fp16=True, # ativa mixed precision, o que aumenta perfomance e consome menos VRAM
        dataloader_num_workers=0, # evita bugs no jupyter, se mudar aqui quebra, mas em código normal tende a funcionar

        remove_unused_columns=False, # como o batch é personalizado (bboxes, cls, batch_idx) o trainer por padrão pode tentar limpar essas colunas, aqui estamos impedindo isso de acontecer
        report_to="none",
    )
    
    # 6. Cria otimizador só com parâmetros treináveis.
    trainable_params = [p for _, p in trainable_named]
    
    optimizer = torch.optim.AdamW(
        trainable_params,
        lr=2e-4,
        weight_decay=1e-4,
        betas=(0.9, 0.999),
        eps=1e-8,
    )

    # 7. Instancia o `Trainer` customizado.
    trainer = YoloHFTrainer(
        model=model,
        args=training_args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        data_collator=collate_fn,
        tokenizer=None,  # aqui não precisa pois já foi passado dentro do Dataset
        optimizers=(optimizer, None)
    )
    
    print("model device:", next(model.parameters()).device)
    print("crit.device:", model.criterion.device)
    print("crit.stride:", type(model.criterion.stride), getattr(model.criterion.stride, "device", "no-device"))
    print("crit.proj:", model.criterion.proj.device)

    # 8. Adiciona callback para imprimir loss + métricas por época.
    trainer.add_callback(
        YoloPrettyPrintCallback(
            trainer=trainer,
            data_yaml=data_yaml,
            imgsz=imgsz,
            device=0,
            val_batch=16,
            weights_stub=weights_pt,        
            run_val_every_epochs=1,         # valida toda epoch
        )
    )
    
    # 9. Treina
    trainer.train()

    # 10. Salva o `state_dict` e limpa GPU.
    os.makedirs(training_args.output_dir, exist_ok=True)
    out_path = os.path.join(training_args.output_dir, "yolo_state_dict.pt")
    torch.save(model.state_dict(), out_path)
    print("Salvo em:", out_path)

    del model
    torch.cuda.synchronize()
    torch.cuda.empty_cache()

In [14]:
main()

  A.Affine(
  trainer = YoloHFTrainer(


Trainable params: 67
Exemplo: [('model.23.cv2.0.0.conv.weight', torch.Size([64, 64, 3, 3])), ('model.23.cv2.0.0.bn.weight', torch.Size([64])), ('model.23.cv2.0.0.bn.bias', torch.Size([64])), ('model.23.cv2.0.1.conv.weight', torch.Size([64, 64, 3, 3])), ('model.23.cv2.0.1.bn.weight', torch.Size([64])), ('model.23.cv2.0.1.bn.bias', torch.Size([64])), ('model.23.cv2.0.2.weight', torch.Size([64, 64, 1, 1])), ('model.23.cv2.0.2.bias', torch.Size([64])), ('model.23.cv2.1.0.conv.weight', torch.Size([64, 128, 3, 3])), ('model.23.cv2.1.0.bn.weight', torch.Size([64]))]
criterion: <class 'ultralytics.utils.loss.v8DetectionLoss'> callable: True
hyp: <class 'types.SimpleNamespace'> box: 7.5
model device: cuda:0
crit.device: cuda:0
crit.stride: <class 'torch.Tensor'> cuda:0
crit.proj: cuda:0


Step,Training Loss
50,24.1476
100,15.3929
150,14.0409
200,12.8526
250,12.3931
300,11.709
350,11.297
400,10.9605
450,10.6259
500,10.3847


Ultralytics 8.3.248  Python-3.11.14 torch-2.5.1 CUDA:0 (NVIDIA GeForce RTX 4060, 8187MiB)
YOLO11n summary (fused): 100 layers, 2,616,248 parameters, 0 gradients
[34m[1mval: [0mFast image access  (ping: 0.00.0 ms, read: 200.131.0 MB/s, size: 21.2 KB)
[K[34m[1mval: [0mScanning C:\Users\Pichau\Desktop\Programacao\FruitNAI\datasets\fruitninja_yolo\labels\val.cache... 61 images, 7 backgrounds, 0 corrupt: 100% ━━━━━━━━━━━━ 61/61 55.4Kit/s 0.0s
[K                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100% ━━━━━━━━━━━━ 4/4 1.6it/s 2.4s0.2ss
                   all         61        112      0.779      0.198      0.303      0.196
Speed: 0.6ms preprocess, 5.5ms inference, 0.0ms loss, 1.3ms postprocess per image

Epoch  1/120
box_loss  1.3781
cls_loss  5.3145
dfl_loss  1.2965
Box(P  0.779
R  0.198
mAP50  0.30
mAP50-95)  0.20
Ultralytics 8.3.248  Python-3.11.14 torch-2.5.1 CUDA:0 (NVIDIA GeForce RTX 4060, 8187MiB)
YOLO11n summary (fused): 100 layers, 2,616