In [6]:
# Pré-processamento com verificação e retomada (10 cores VeRi, sem limite)
# - Corrige XML gb2312 (gb18030/gbk -> UTF-8)
# - Filtra somente "carros" (VeRi) e somente as 10 cores do VeRi
# - Verifica pastas existentes, evita duplicatas e resume sem sobrescrever
# - Gera manifest/resumo por split/classe
# Saída: COLOR_FINAL_YOLO no formato Ultralytics (pastas por classe + data.yaml)

import xml.etree.ElementTree as ET
from pathlib import Path
from collections import defaultdict
import shutil, json, re

# ---------- Constantes ----------
VERI_COLOR_ID = {1:"yellow",2:"orange",3:"green",4:"gray",5:"red",6:"blue",7:"white",8:"golden",9:"brown",10:"black"}  # cores VeRi [1]
CAR_TYPES = {1,2,4,5,9}  # sedan, suv, hatchback, mpv, estate [1]
COLOR_MAP = {
    "yellow":"amarelo","orange":"laranja","green":"verde","gray":"cinza_prata","red":"vermelho",
    "blue":"azul","white":"branco","golden":"dourado","brown":"marrom","black":"preto",
    "grey":"cinza_prata","silver":"cinza_prata","beige":"bege","gold":"dourado","purple":"roxo","pink":"rosa","tan":"bege"
}  # normalização p/ pastas por classe Ultralytics [1]
ALLOWED_10 = {"amarelo","laranja","verde","cinza_prata","vermelho","azul","branco","dourado","marrom","preto"}  # paleta VeRi [1]

# ---------- Utilidades de arquivo ----------
def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)  # cria diretório de forma idempotente [2]

def list_files(dirp: Path):
    return {f.name for f in dirp.glob("*.*")} if dirp.exists() else set()  # inventário rápido por pasta [2]

def safe_copy(src: Path, dst_dir: Path, prefix: str, allow_collision_suffix=True):
    """Copia evitando sobrescrever; se existir o mesmo nome, adiciona sufixo incremental."""
    if not src.exists():
        return None
    ensure_dir(dst_dir)
    base = f"{prefix}_{src.name}"
    dst = dst_dir / base
    if not dst.exists():
        shutil.copy2(src, dst)
        return dst
    if not allow_collision_suffix:
        return None
    # cria sufixos _1, _2, ...
    stem, suf = dst.stem, dst.suffix
    i = 1
    while True:
        cand = dst_dir / f"{stem}_{i}{suf}"
        if not cand.exists():
            shutil.copy2(src, cand)
            return cand
        i += 1  # caminho existe; tenta próximo [2]

def load_xml_root_utf8(xml_path: Path):
    """Lê XML VeRi declarados como gb2312/GBK/GB18030 e normaliza para UTF-8 (correção ElementTree)."""
    data = xml_path.read_bytes()
    text = None
    for enc in ("gb18030","gbk","utf-8"):
        try:
            text = data.decode(enc); break
        except UnicodeDecodeError:
            continue
    if text is None:
        text = data.decode("latin-1", errors="ignore")
    if text.lstrip().startswith("<?xml"):
        text = re.sub(r'encoding=[\'\"].*?[\'\"]', 'encoding="utf-8"', text, count=1)
    else:
        text = '<?xml version="1.0" encoding="utf-8"?>\n' + text
    return ET.fromstring(text.encode("utf-8"))  # evita erro do Expat com multibyte [2]

def summarize_tree(root_dir: Path):
    """Resumo por split/classe: contagem de arquivos, útil para auditoria e retomada."""
    summary = defaultdict(lambda: defaultdict(int))
    if not root_dir.exists():
        return summary
    for split in root_dir.iterdir():
        if not split.is_dir():
            continue
        for cls in split.iterdir():
            if cls.is_dir():
                summary[split.name][cls.name] = sum(1 for _ in cls.glob("*.*"))
    return summary  # ajuda a validar formato Ultralytics (pastas por classe) [1]

def save_json(obj, path: Path):
    ensure_dir(path.parent)
    path.write_text(json.dumps(obj, indent=2, ensure_ascii=False), encoding="utf-8")  # persistência idempotente [2]

# ---------- Processamentos ----------
def process_veri(veri_root: Path, out_root: Path) -> dict:
    """Extrai APENAS carros do VeRi e organiza por cor normalizada nas 10 cores."""
    stats = {"total":0,"cars":0,"colors":defaultdict(int)}
    veri_out = out_root / "veri_processed"
    cfgs = [("train","train_label.xml","image_train"), ("test","test_label.xml","image_test")]
    if (veri_root/"image_query").exists():
        cfgs.append(("query","test_label.xml","image_query"))
    for split, xmlf, imdir in cfgs:
        x, d = veri_root/xmlf, veri_root/imdir
        if not x.exists() or not d.exists():
            continue
        root = load_xml_root_utf8(x)
        for it in root.iter("Item"):
            stats["total"] += 1
            cid, tid = int(it.attrib["colorID"]), int(it.attrib["typeID"])
            if tid not in CAR_TYPES:
                continue
            color_pt = COLOR_MAP.get(VERI_COLOR_ID.get(cid,"unknown"), "outros")
            if color_pt not in ALLOWED_10:
                continue
            stats["cars"] += 1
            stats["colors"][color_pt] += 1
            src = d / it.attrib["imageName"]
            dst_dir = veri_out / split / color_pt
            # cópia segura com retomada
            safe_copy(src, dst_dir, "veri")  # não sobrescreve; cria sufixo se colidir [2]
    return stats  # atende formato de pastas por classe adotado no Ultralytics classify [1]

def process_vcor(vcor_root: Path, out_root: Path) -> dict:
    """Padroniza nomes de cor do VCoR e copia apenas 10 cores para pastas por classe em train/val/test."""
    stats = {"total":0,"colors":defaultdict(int)}
    vcor_out = out_root / "vcor_processed"
    for split in ("train","val","test"):
        sdir = vcor_root/split
        if not sdir.exists():
            continue
        for cdir in sdir.iterdir():
            if not cdir.is_dir():
                continue
            cor = COLOR_MAP.get(cdir.name.lower(), cdir.name.lower())
            if cor not in ALLOWED_10:
                continue
            dst = vcor_out/split/cor
            # inventário existente para retomar
            existing = list_files(dst)
            for img in cdir.glob("*.*"):
                if img.suffix.lower() in (".jpg",".jpeg",".png",".bmp"):
                    stats["total"] += 1
                    stats["colors"][cor] += 1
                    # se já existir um nome igual, safe_copy criará sufixo
                    safe_copy(img, dst, "vcor")  # idempotente em reexecução [2]
    return stats

def merge_10(veri_proc: Path, vcor_proc: Path, out_final: Path) -> dict:
    """Une VeRi (train/test/query->train) + VCoR (train/val/test) nas 10 cores, sem limite e sem sobrescrever."""
    final_stats = defaultdict(lambda: defaultdict(int))
    pools = defaultdict(lambda: defaultdict(list))

    # Mapear VeRi tudo -> train; VCoR: manter splits
    for base, split_map, tag in [
        (veri_proc, {"train":"train","test":"train","query":"train"}, "veri"),
        (vcor_proc, {"train":"train","val":"val","test":"test"}, "vcor")
    ]:
        if not base.exists():
            continue
        for sdir in base.iterdir():
            if not sdir.is_dir() or sdir.name not in split_map:
                continue
            dst_split = split_map[sdir.name]
            for cdir in sdir.iterdir():
                if cdir.is_dir() and cdir.name in ALLOWED_10:
                    pools[dst_split][cdir.name].extend(cdir.glob("*.*"))

    # Copiar todos (sem limite), com verificação de existentes
    for split, cmap in pools.items():
        for color, imgs in cmap.items():
            dst = out_final/split/color
            ensure_dir(dst)
            existing = list_files(dst)
            idx = len(existing)  # continua numeração
            for src in imgs:
                name = f"{color}_{idx:06d}{src.suffix}"
                idx += 1
                dst_path = dst / name
                if dst_path.name in existing:
                    continue  # já existe
                shutil.copy2(src, dst_path)
                final_stats[split][color] += 1

    names = sorted(ALLOWED_10)
    ensure_dir(out_final)
    (out_final/"data.yaml").write_text(
        f"path: {out_final.resolve()}\ntrain: train\nval: val\n"
        + ("test: test\n" if (out_final/'test').exists() else "")
        + f"\nnc: {len(names)}\nnames: {names}\n", encoding="utf-8"
    )  # estrutura esperada pelo yolo classify [1]
    save_json(final_stats, out_final/"dataset_stats.json")
    return {"classes": names, "final_stats": final_stats}

def run(veri_path: str, vcor_path: str, out_path: str):
    veri_root, vcor_root, out_root = Path(veri_path), Path(vcor_path), Path(out_path)
    # Verificações iniciais de existência
    assert veri_root.exists(), f"VeRi não encontrado: {veri_root}"  # checagem básica com Path.exists [2]
    assert vcor_root.exists(), f"VCoR não encontrado: {vcor_root}"  # idem [2]
    ensure_dir(out_root)

    print("-> VeRi"); veri_stats = process_veri(veri_root, out_root)
    print("-> VCoR"); vcor_stats = process_vcor(vcor_root, out_root)

    # Resumos intermediários (inventário)
    save_json(summarize_tree(out_root/"veri_processed"), out_root/"veri_processed_manifest.json")
    save_json(summarize_tree(out_root/"vcor_processed"), out_root/"vcor_processed_manifest.json")

    # Merge final
    final_dir = out_root/"COLOR_FINAL_YOLO"
    info = merge_10(out_root/"veri_processed", out_root/"vcor_processed", final_dir)
    save_json(summarize_tree(final_dir), final_dir/"final_manifest.json")

    print(f"OK VeRi cars: {veri_stats['cars']} | VCoR imgs: {vcor_stats['total']} | Classes: {info['classes']} | Final: {final_dir}")

if __name__ == "__main__":
    run(
        r"C:\Users\riana\OneDrive\Desktop\Vox MVP\data\VeRi",
        r"C:\Users\riana\OneDrive\Desktop\Vox MVP\data\VCor",
        r"C:\Users\riana\OneDrive\Desktop\Vox MVP\data\PROCESSED"
    )


-> VeRi
-> VCoR
OK VeRi cars: 43939 | VCoR imgs: 7746 | Classes: ['amarelo', 'azul', 'branco', 'cinza_prata', 'dourado', 'laranja', 'marrom', 'preto', 'verde', 'vermelho'] | Final: C:\Users\riana\OneDrive\Desktop\Vox MVP\data\PROCESSED\COLOR_FINAL_YOLO


In [2]:
# UA-DETRAC -> YOLO (1 classe: car) para PROCESSED/UA_DETRAC_CAR
# - Foco total na classe "car" (ID original informável via CAR_ID_OVERRIDE)
# - Mantém apenas "car" e remapeia para classe 0 (zero-based, contínua)
# - Verifica/normaliza xywh para [0,1] e descarta boxes degenerados
# - Copia imagens por hardlink (quando possível) + fallback copy2
# - Aceita múltiplas extensões (.jpg, .jpeg, .png, .bmp)
# - Gera detrac_car.yaml no padrão Ultralytics e manifest JSON
# - Barras de progresso com tqdm

from pathlib import Path
from collections import Counter
import json
import shutil
import os
import cv2
from tqdm import tqdm

# CONFIG: ajuste as pastas de origem/destino
SRC_ROOT = Path(r"C:\Users\riana\OneDrive\Desktop\Vox MVP\data\UA-DETRAC\DETRAC_Upload")
DST_ROOT = Path(r"C:\Users\riana\OneDrive\Desktop\Vox MVP\data\PROCESSED\UA_DETRAC_CAR")

SRC_IM = {"train": SRC_ROOT/"images"/"train", "val": SRC_ROOT/"images"/"val"}
SRC_LB = {"train": SRC_ROOT/"labels"/"train", "val": SRC_ROOT/"labels"/"val"}

DST_IM = {"train": DST_ROOT/"images"/"train", "val": DST_ROOT/"images"/"val"}
DST_LB = {"train": DST_ROOT/"labels"/"train", "val": DST_ROOT/"labels"/"val"}

# Se souber o ID de 'car' nos rótulos originais, defina aqui (ex.: 3);
# caso contrário, deixe None para detecção automática pela classe mais frequente.
CAR_ID_OVERRIDE = 3  # defina 3 se sua taxonomia original for aquela lista com "car"=3

EXTS = (".jpg", ".jpeg", ".png", ".bmp")  # suportadas pelo loader do YOLO

def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

def read_label_file(p: Path):
    if not p.exists():
        return []
    rows = []
    for line in p.read_text(encoding="utf-8").splitlines():
        parts = line.strip().split()
        if len(parts) == 5:
            try:
                c = int(parts[0]); x,y,w,h = map(float, parts[1:])
                rows.append([c,x,y,w,h])
            except Exception:
                pass
    return rows

def is_normalized_xywh(rows):
    for _, x, y, w, h in rows:
        if max(x, y, w, h) > 1.5:
            return False
    return True

def clamp01(v): 
    return max(0.0, min(1.0, v))

def xyxy_to_xywh_norm(x1, y1, x2, y2, W, H):
    xc = ((x1 + x2) / 2.0) / W
    yc = ((y1 + y2) / 2.0) / H
    ww = (abs(x2 - x1)) / W
    hh = (abs(y2 - y1)) / H
    return clamp01(xc), clamp01(yc), clamp01(ww), clamp01(hh)

# Normalização com W,H já conhecidos
def normalize_rows(rows, W, H):
    if not rows:
        return []
    if is_normalized_xywh(rows):
        return [[c, clamp01(x), clamp01(y), clamp01(w), clamp01(h)] for c,x,y,w,h in rows]
    nr = []
    for c, a, b, c3, d in rows:
        if c3 > 1.5 and d > 1.5:  # assume xywh em pixels
            xc = a / W; yc = b / H; ww = c3 / W; hh = d / H
        else:  # assume xyxy em pixels
            x1, y1, x2, y2 = a, b, c3, d
            xc, yc, ww, hh = xyxy_to_xywh_norm(x1, y1, x2, y2, W, H)
        nr.append([c, clamp01(xc), clamp01(yc), clamp01(ww), clamp01(hh)])
    return nr

def discover_car_id():
    cls_freq = Counter()
    all_labels = list(SRC_LB["train"].glob("*.txt")) + list(SRC_LB["val"].glob("*.txt"))
    for txt in tqdm(all_labels, desc="Descobrindo ID de 'car'"):
        for r in read_label_file(txt):
            cls_freq[r[0]] += 1
    if not cls_freq:
        return 0, {}
    car_id = cls_freq.most_common(1)[0][0]  # 'car' tende a ser o mais frequente
    return car_id, dict(cls_freq)

def copy_image(src_img: Path, dst_img: Path):
    ensure_dir(dst_img.parent)
    if dst_img.exists():
        return
    try:
        os.link(src_img, dst_img)  # hardlink (mesmo volume)
    except Exception:
        shutil.copy2(src_img, dst_img)  # fallback

def write_yaml(dst_root: Path):
    yaml = (
        f"path: {dst_root.resolve()}\n"
        f"train: images/train\n"
        f"val: images/val\n"
        f"\nnc: 1\n"
        f"names: [car]\n"
    )
    (dst_root/"detrac_car.yaml").write_text(yaml, encoding="utf-8")

def collect_images(images_dir: Path):
    paths = []
    for ext in EXTS:
        paths.extend(images_dir.rglob(f"*{ext}"))
    return sorted(paths)

def process_split(split, car_id: int, summary: dict):
    ensure_dir(DST_IM[split]); ensure_dir(DST_LB[split])
    stats = {"images": 0, "labels_in": 0, "boxes_total": 0, "boxes_kept": 0, "empties": 0}
    img_paths = collect_images(SRC_IM[split])

    # Cache de dimensões por pasta absoluta para evitar colisões
    dim_cache = {}

    for src_img in tqdm(img_paths, desc=f"Processando split '{split}'"):
        stem = src_img.stem
        src_lbl = SRC_LB[split] / f"{stem}.txt"
        
        rows = read_label_file(src_lbl)
        if rows:
            stats["labels_in"] += 1
        stats["boxes_total"] += len(rows)

        # Copia imagem (YOLO ignora imagens sem .txt)
        dst_img = DST_IM[split] / src_img.name
        copy_image(src_img, dst_img)
        stats["images"] += 1

        # Manter apenas car_id
        car_rows = [r for r in rows if int(r[0]) == car_id]
        if not car_rows:
            # garante que não haja .txt antigo no destino
            dst_lbl = DST_LB[split] / f"{stem}.txt"
            if dst_lbl.exists():
                dst_lbl.unlink()
            stats["empties"] += 1
            continue

        # Dimensões por cache
        seq_key = src_img.parent.as_posix()
        if seq_key not in dim_cache:
            img = cv2.imread(str(src_img))
            H, W = img.shape[:2]
            dim_cache[seq_key] = (W, H)
        else:
            W, H = dim_cache[seq_key]

        # Normalizar
        normalized = normalize_rows(car_rows, W, H)

        # Descartar boxes degenerados (área muito pequena / fora de [0,1] antes do clamp final)
        filtered = []
        for _, x, y, w, h in normalized:
            if 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0 and 0.0 < w <= 1.0 and 0.0 < h <= 1.0 and (w * h) >= 1e-6:
                filtered.append((x, y, w, h))

        stats["boxes_kept"] += len(filtered)

        dst_lbl = DST_LB[split] / f"{stem}.txt"
        if filtered:
            with dst_lbl.open("w", encoding="utf-8") as f:
                for x, y, w, h in filtered:
                    f.write(f"0 {x:.6f} {y:.6f} {w:.6f} {h:.6f}\n")
        else:
            if dst_lbl.exists():
                dst_lbl.unlink()
            stats["empties"] += 1

    summary["splits"][split] = stats

def main():
    # Limpar destino para exportação consistente
    if DST_ROOT.exists():
        print(f"Pasta de destino {DST_ROOT} já existe. Limpando...")
        shutil.rmtree(DST_ROOT)
    ensure_dir(DST_ROOT)

    # Definir ou descobrir CAR_ID
    if CAR_ID_OVERRIDE is not None:
        car_id, freq = CAR_ID_OVERRIDE, {}
        print(f"Usando ID de 'car' pré-definido: {car_id}")
    else:
        print("Buscando ID da classe mais comum ('car')...")
        car_id, freq = discover_car_id()

    summary = {"car_id_used": car_id, "source_class_frequencies": freq, "splits": {}}

    # Processar splits
    for split in ("train", "val"):
        process_split(split, car_id, summary)

    # YAML e manifestos
    write_yaml(DST_ROOT)
    manifest_path = DST_ROOT / "ua_detrac_preprocess_manifest.json"
    manifest_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")

    print("\n" + "="*50)
    print("Resumo do Processamento:")
    print(json.dumps(summary, indent=2))
    print(f"\n✅ Dataset pronto em: {DST_ROOT.resolve()}")
    print("="*50)

if __name__ == "__main__":
    main()


Pasta de destino C:\Users\riana\OneDrive\Desktop\Vox MVP\data\PROCESSED\UA_DETRAC_CAR já existe. Limpando...
Usando ID de 'car' pré-definido: 3


Processando split 'train': 100%|██████████| 83791/83791 [12:23<00:00, 112.72it/s] 
Processando split 'val': 100%|██████████| 56340/56340 [08:28<00:00, 110.83it/s]


Resumo do Processamento:
{
  "car_id_used": 3,
  "source_class_frequencies": {},
  "splits": {
    "train": {
      "images": 83791,
      "labels_in": 82085,
      "boxes_total": 598281,
      "boxes_kept": 33651,
      "empties": 58161
    },
    "val": {
      "images": 56340,
      "labels_in": 56167,
      "boxes_total": 675774,
      "boxes_kept": 71785,
      "empties": 25418
    }
  }
}

✅ Dataset pronto em: C:\Users\riana\OneDrive\Desktop\Vox MVP\data\PROCESSED\UA_DETRAC_CAR





UA-DETRAC → Detecção + Tracking de veículos

     ↓

VCoR + VeRi → Treinamento classificador de cores  

     ↓
     
Integração → Sistema completo
