### Notebook: Tracking de mãos e instrumentos – Tarefa 3 OSS 2025

In [1]:
# -------------------------------------------------------------
# Instalar bibliotecas
# -------------------------------------------------------------
# %pip install -q ultralytics==8.1.0 opencv-python==4.9.0.80 torch

In [2]:
# -------------------------------------------------------------
# Importações e configuração geral
# -------------------------------------------------------------
import cv2
import torch
import json
import gc

from pathlib import Path
from ultralytics import YOLO

# Project layout: everything lives under <current working directory>/Task3.
ROOT = Path.cwd() / 'Task3'
print(ROOT)
DATA_DIR = ROOT / 'data'        # input .mp4 files are globbed from here
OUT_DIR = ROOT / 'outputs'      # JSON tracks and annotated videos are written here
OUT_DIR.mkdir(exist_ok=True, parents=True)

# Thresholds.
# NOTE: the names are misspelled ("TRESHOLD"); they are kept as-is because
# other cells reference them by these exact names.
CONF_TRESHOLD = 0.35   # minimum detection confidence passed to process_video
IOU_TRESHOLD = 0.3     # minimum IoU for the tracker to associate a detection with a track

# Image size forwarded to the YOLO call (imgsz=...).
IMGSZ = 1920

# Only every FRAME_STEP-th frame is run through detection + tracking.
FRAME_STEP = 5

# COCO class id -> label used in the output.
# In the 80-class COCO list shipped with Ultralytics, id 43 is "knife" and
# "scissors" is id 76 (the previous comment described 43 as scissors, so the
# scissors class was never actually accepted). Both are mapped to "instrument";
# "person" (0) is used as a rough stand-in for "hand".
VALID_CLS = {
    0: "hand",          # person   ~ hand
    43: "instrument",   # knife    ~ instrument
    76: "instrument",   # scissors ~ instrument
}

# Pick the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Dispositivo: " + device)

# Release cached GPU memory left over from previous runs in this kernel.
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()

D:\Repos\AP-TPG-M2\Task3
Dispositivo: cuda


In [3]:
# -----------------------------------------------------------------
# Tracker simples por IoU em PyTorch puro
# -----------------------------------------------------------------
class SimpleIoUTracker:
    """Greedy IoU tracker working directly on torch box tensors.

    Each track is stored as ``track_id -> (bbox_tensor, age, label)`` where
    ``bbox_tensor`` is indexed as ``[x1, y1, x2, y2]``, ``age`` counts the
    consecutive updates in which the track was not matched, and ``label`` is
    the label of the last detection assigned to it.

    Args:
        iou_threshold: a detection is only associated with a track when their
            IoU is strictly greater than this value.
        max_age: a track is kept (with its last known box) for at most this
            many consecutive unmatched updates before being dropped.
    """

    def __init__(self, iou_threshold=0.3, max_age=3):
        self.iou_threshold = iou_threshold
        self.max_age = max_age
        self.tracks = {}  # track id -> (bbox_tensor, age, label)
        self.next_id = 0  # id handed to the next newly created track

    def _iou(self, box1, box2):
        """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` tensors.

        The result is a 0-dim tensor. When the union is empty (e.g. both boxes
        have zero area) the result is 0 rather than the NaN a plain 0/0
        division would give.
        """
        x1 = torch.max(box1[0], box2[0])
        y1 = torch.max(box1[1], box2[1])
        x2 = torch.min(box1[2], box2[2])
        y2 = torch.min(box1[3], box2[3])
        # Clamp so that non-overlapping boxes give a zero intersection.
        inter_area = torch.clamp(x2 - x1, min=0) * torch.clamp(y2 - y1, min=0)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union_area = area1 + area2 - inter_area
        if union_area <= 0:
            # Degenerate boxes: there is no meaningful overlap ratio.
            return torch.zeros_like(inter_area)
        return inter_area / union_area

    def update(self, detections, labels=None):
        """Associate the current detections with existing tracks.

        Existing tracks are visited in insertion order; each one greedily takes
        the still-unassigned detection with the highest IoU above the
        threshold. Unmatched tracks age by one and are dropped once their age
        would exceed ``max_age``. Every detection left unassigned starts a new
        track.

        Args:
            detections: sequence of ``[x1, y1, x2, y2]`` box tensors (for
                example an ``(N, 4)`` tensor); may be empty.
            labels: one label per detection; defaults to ``"object"`` for all.

        Returns:
            The updated ``{track_id: (bbox_tensor, age, label)}`` dict. It
            includes unmatched tracks that are still within ``max_age``, which
            carry their previous box and label.
        """
        updated_tracks = {}
        assigned = set()

        if labels is None:
            labels = ["object"] * len(detections)

        for tid, (prev_bbox, age, label) in self.tracks.items():
            best_iou = self.iou_threshold
            best_det = None
            for i, det in enumerate(detections):
                if i in assigned:
                    continue
                iou = self._iou(prev_bbox, det)
                if iou > best_iou:
                    best_iou = iou
                    best_det = i
            if best_det is not None:
                # Matched: adopt the new box/label and reset the age.
                updated_tracks[tid] = (detections[best_det], 0, labels[best_det])
                assigned.add(best_det)
            elif age + 1 <= self.max_age:
                # Unmatched but still young enough: keep the last known state.
                updated_tracks[tid] = (prev_bbox, age + 1, label)

        # Any detection nobody claimed becomes a brand-new track.
        for i, det in enumerate(detections):
            if i not in assigned:
                updated_tracks[self.next_id] = (det, 0, labels[i])
                self.next_id += 1

        self.tracks = updated_tracks
        return self.tracks


In [4]:
# -----------------------------------------------------------------
# Detecção + tracking de um vídeo; devolve caminho do JSON gerado
# -----------------------------------------------------------------
def process_video(video_path: Path, model, out_dir: Path, valid_cls={0, 43}, frame_step=30, conf_thresh=0.15, imgsz=1920, iou_thresh=None) -> Path:
    """Run detection + IoU tracking over one video and dump the tracks to JSON.

    Only every ``frame_step``-th frame is processed. For each processed frame
    the tracker state is stored under the frame index (as a string) as a list
    of ``{"id", "bbox", "score", "label"}`` dicts. ``score`` is always ``1.0``
    because the tracker does not carry detection confidences, and the list
    includes tracks that were not matched in that frame but are still kept
    alive by the tracker (they carry their last known box).

    Args:
        video_path: video file to read.
        model: detector invoked as ``model(frame, conf=..., imgsz=...)``; the
            first result must expose ``.boxes`` with ``xyxy``, ``conf`` and
            ``cls`` tensors (an Ultralytics YOLO model in this notebook).
        out_dir: directory receiving ``<video stem>.json``; created if missing.
        valid_cls: class ids to keep. When a dict is given it is also used as
            the ``class id -> label`` mapping; otherwise labels come from the
            module-level ``VALID_CLS``, falling back to the class id as text.
        frame_step: process one frame out of every ``frame_step``.
        conf_thresh: minimum detection confidence.
        imgsz: inference image size forwarded to the model.
        iou_thresh: IoU threshold for the tracker; defaults to the
            module-level ``IOU_TRESHOLD`` when ``None``.

    Returns:
        Path of the JSON file that was written.

    Raises:
        RuntimeError: if the video cannot be opened.
    """
    video_id = video_path.stem
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Erro ao abrir vídeo: {video_path}")

    # Use the caller's mapping for labels when one was supplied, so filtering
    # and labelling can never disagree about which classes are known.
    label_map = valid_cls if isinstance(valid_cls, dict) else VALID_CLS
    if iou_thresh is None:
        iou_thresh = IOU_TRESHOLD

    tracker = SimpleIoUTracker(iou_threshold=iou_thresh)
    frame_id = 0
    tracks = {}

    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            if frame_id % frame_step != 0:
                frame_id += 1
                continue

            detections_tensor = []
            detection_labels = []

            try:
                with torch.no_grad():
                    res = model(frame, conf=conf_thresh, imgsz=imgsz)[0]
                    boxes = res.boxes
                    if boxes and boxes.xyxy.numel() > 0:
                        xyxy = boxes.xyxy.to("cpu")
                        confs = boxes.conf.to("cpu")
                        clss = boxes.cls.to("cpu")
                        for box, conf, cls in zip(xyxy, confs, clss):
                            cls_int = int(cls)
                            if cls_int in valid_cls and conf.item() >= conf_thresh:
                                detections_tensor.append(box)
                                # .get with a fallback: an accepted class that has no
                                # label entry must not abort the whole frame.
                                detection_labels.append(label_map.get(cls_int, str(cls_int)))
            except Exception as e:
                # Best effort: report the failure, free memory and move on to
                # the next frame instead of aborting the whole video.
                print(f"[ERRO] YOLO falhou no frame {frame_id}: {e}")
                torch.cuda.empty_cache()
                gc.collect()
                frame_id += 1
                continue

            det_tensor = torch.stack(detections_tensor) if detections_tensor else torch.empty((0, 4), dtype=torch.float32)
            frame_tracks = tracker.update(det_tensor, labels=detection_labels)

            for tid, (bbox, _, label) in frame_tracks.items():
                x1, y1, x2, y2 = bbox.tolist()
                tracks.setdefault(str(frame_id), []).append({
                    "id": int(tid),
                    "bbox": [float(x1), float(y1), float(x2), float(y2)],
                    "score": 1.0,
                    "label": label
                })

            torch.cuda.empty_cache()
            gc.collect()
            frame_id += 1
    finally:
        # Always give the capture handle back, even if something above raised.
        cap.release()

    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{video_id}.json"
    with open(out_path, "w") as fp:
        json.dump(tracks, fp, indent=2)
    print(f"[INFO] JSON gravado: {out_path}")
    return out_path


In [5]:
# -------------------------------------------------------------
# Detecção e tracking com YOLOv8 (gera um JSON por vídeo)
# -------------------------------------------------------------
# Load the 'yolov8n.pt' checkpoint once and reuse it for every video.
model = YOLO('yolov8n.pt')

# Run detection + tracking on each .mp4 found in DATA_DIR and remember, in
# iteration order, where each resulting JSON file was written.
json_paths = []
for mp4 in DATA_DIR.glob("*.mp4"):
    print(f"Processar {mp4.name}")
    tracks_json = process_video(
        video_path=mp4,
        model=model,
        out_dir=OUT_DIR,
        valid_cls=VALID_CLS,
        frame_step=FRAME_STEP,
        conf_thresh=CONF_TRESHOLD,
        imgsz=IMGSZ,
    )
    json_paths.append(tracks_json)

Processar A31H.mp4

0: 1088x1920 1 person, 50.5ms
Speed: 15.1ms preprocess, 50.5ms inference, 51.6ms postprocess per image at shape (1, 3, 1088, 1920)

0: 1088x1920 1 person, 15.7ms
Speed: 15.7ms preprocess, 15.7ms inference, 0.0ms postprocess per image at shape (1, 3, 1088, 1920)

0: 1088x1920 1 person, 16.0ms
Speed: 13.8ms preprocess, 16.0ms inference, 0.0ms postprocess per image at shape (1, 3, 1088, 1920)

0: 1088x1920 1 person, 17.3ms
Speed: 21.7ms preprocess, 17.3ms inference, 4.0ms postprocess per image at shape (1, 3, 1088, 1920)

0: 1088x1920 1 person, 19.0ms
Speed: 9.8ms preprocess, 19.0ms inference, 1.4ms postprocess per image at shape (1, 3, 1088, 1920)

0: 1088x1920 1 person, 18.8ms
Speed: 10.0ms preprocess, 18.8ms inference, 1.1ms postprocess per image at shape (1, 3, 1088, 1920)

0: 1088x1920 1 person, 18.1ms
Speed: 9.9ms preprocess, 18.1ms inference, 1.0ms postprocess per image at shape (1, 3, 1088, 1920)

0: 1088x1920 1 person, 17.8ms
Speed: 12.1ms preprocess, 17.8ms i

In [6]:
# -------------------------------------------------------------
# Função para sobrepor caixas + IDs e gravar novo .mp4
# -------------------------------------------------------------
def annotate_video(video_path: Path, json_path: Path, out_dir: Path) -> Path:
    """Overlay tracked boxes and ids on a video and write it as a new .mp4.

    The JSON file maps a frame index (as a string) to a list of objects with
    ``bbox`` (``[x1, y1, x2, y2]``), ``id`` and optionally ``label``. Frames
    that have no entry in the JSON are copied to the output unannotated.

    Args:
        video_path: source video.
        json_path: tracks file for that video.
        out_dir: directory receiving ``<video stem>_tracked.mp4``.

    Returns:
        Path of the annotated video.

    Raises:
        RuntimeError: if the source video cannot be opened or the output
            writer cannot be created.
    """
    vid_id = video_path.stem
    out_path = out_dir / f"{vid_id}_tracked.mp4"

    with open(json_path) as fp:
        preds = json.load(fp)

    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Erro ao abrir vídeo: {video_path}")

    writer = None
    try:
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS) or 30  # fall back to 30 when the FPS is reported as 0
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(str(out_path), fourcc, fps, (w, h))
        if not writer.isOpened():
            # Without this check a failed writer silently produces no usable output.
            raise RuntimeError(f"Erro ao criar vídeo de saída: {out_path}")

        f_idx = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break

            # Draw every object recorded for this frame index.
            for obj in preds.get(str(f_idx), []):
                x1, y1, x2, y2 = map(int, obj['bbox'])
                tid = obj['id']
                label = obj.get('label', 'object')
                # Deterministic per-id colour so a track keeps its colour across frames.
                colour = ((tid * 37) % 256, (tid * 17) % 256, (tid * 97) % 256)

                cv2.rectangle(frame, (x1, y1), (x2, y2), colour, 2)
                text = f'{label} {tid}'
                (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
                # Keep the label strip inside the frame when the box touches the top edge.
                label_bottom = max(y1, th + 4)
                cv2.rectangle(frame, (x1, label_bottom - th - 4), (x1 + tw, label_bottom), colour, -1)
                cv2.putText(frame, text, (x1, label_bottom - 2),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)

            writer.write(frame)
            f_idx += 1
    finally:
        # Release both handles even if reading, drawing or writing failed.
        cap.release()
        if writer is not None:
            writer.release()

    print(f"Vídeo anotado: {out_path}")
    return out_path

In [7]:
# -------------------------------------------------------------
# Gerar vídeo anotado para cada par vídeo/JSON
# -------------------------------------------------------------
# Pair every JSON with its source video by file stem (process_video names the
# JSON "<video stem>.json"). Zipping a second, independent glob against
# json_paths would depend on both enumerations having the same order and
# length, and would silently drop or mismatch entries otherwise.
videos_by_stem = {p.stem: p for p in DATA_DIR.glob("*.mp4")}
for js in json_paths:
    mp4 = videos_by_stem.get(Path(js).stem)
    if mp4 is None:
        print(f"[AVISO] Vídeo não encontrado para {js}")
        continue
    annotate_video(mp4, js, OUT_DIR)

Vídeo anotado: D:\Repos\AP-TPG-M2\Task3\outputs\A31H_tracked.mp4
