Imports

In [1]:
import os
import re
import json
import datetime
import torch
from typing import List, Tuple

from PIL import Image
import numpy as np

# Detector: ultralytics YOLO
from ultralytics import YOLO

# BLIP captioning do Hugging Face / transformers
from transformers import BlipForConditionalGeneration, BlipProcessor

# CLIP (OpenAI)
from transformers import CLIPProcessor, CLIPModel

  from .autonotebook import tqdm as notebook_tqdm


Configurações

In [None]:
# --- Paths -------------------------------------------------------------------
# Adjust the three folders below to your own setup. Keep them relative to the
# notebook instead of hard-coding machine-specific absolute paths.

# Folder with the frames extracted from the videos by
# VideoResizeAndFrameExtractionPrincipal.ipynb (scanned recursively).
INPUT_FRAMES_FOLDER = "../HighResImage"
# Folder where the crops of the detected individuals/groups are saved.
OUTPUT_CROPS_FOLDER = "CroppedPersonsHighRes"
# Folder where the unique JSON record of each track ID is saved.
OUTPUT_RECORDS_FOLDER = "RecordsHighRes"

# --- Detection ---------------------------------------------------------------
# YOLO weights file. Ideally use the most recent release and the "n" variant,
# which is the lightest one (it must be downloaded from the Ultralytics site).
YOLO_WEIGHTS = "../yolo11n.pt"
# Confidence threshold handed to the YOLO predictor.
YOLO_CONF = 0.35
# Camera identifier embedded in the crop and record file names.
CAMERA_ID = 1

# --- Tracking / attributes ---------------------------------------------------
# Minimum IoU for a detection to be matched to an existing track.
IOU_MATCH_THRESHOLD = 0.3
# Person boxes with an area (in pixels) below this value are discarded.
MIN_BOX_AREA = 400
# Number of best-scoring labels kept per attribute category.
ATTR_TOP_K = 2
# Labels scoring below this value are dropped from the attributes.
ATTR_CONF_THRESHOLD = 0.05

# --- Captioning --------------------------------------------------------------
# BLIP model (Hugging Face); "Salesforce/blip-image-captioning-large" is a
# heavier alternative.
BLIP_MODEL = "Salesforce/blip-image-captioning-base"
# Maximum number of new tokens generated for a caption.
CAPTION_MAX_TOKENS = 160
# Maximum number of words kept in the stored description.
CAPTION_MAX_WORDS = 80

# Limit on the number of frames processed (handy for quick tests).
FRAME_LIMIT = 1000

# File extensions (lower-case) accepted as frames.
IMAGE_EXTS = (".jpg", ".jpeg")

Funções de apoio

In [3]:
# Guarantee that an output directory is present before files are written to it.
def ensure_dir(path: str):
    """Create ``path`` together with any missing parent directories.

    Nothing happens (and no error is raised) when the directory already exists.
    """
    os.makedirs(name=path, exist_ok=True)

# List the frame image files found inside a folder (recursively).
def list_frame_files(input_folder: str, exts=None):
    """Return the paths of all frame images under ``input_folder``.

    The tree is walked top-down. Both the sub-directories and the file names of
    each directory are visited in sorted order, so the result is deterministic
    regardless of the order in which the filesystem lists entries.

    Args:
        input_folder: Root folder to scan.
        exts: Optional iterable of accepted file extensions including the dot
            (e.g. ``(".jpg", ".jpeg")``); compared case-insensitively.
            Defaults to the module-level ``IMAGE_EXTS``.

    Returns:
        A list of file paths (``root`` joined with the file name).
    """
    if exts is None:
        exts = IMAGE_EXTS
    allowed = tuple(e.lower() for e in exts)

    files = []
    for root, dirnames, filenames in os.walk(input_folder):
        # Sorting in place makes os.walk descend into sub-folders in a stable order.
        dirnames.sort()
        files.extend(
            os.path.join(root, f)
            for f in sorted(filenames)
            if os.path.splitext(f)[1].lower() in allowed
        )
    return files

# Extract the capture timestamp from a frame's file name.
def parse_timestamp_from_filename(fname: str):
    """Return an ISO-8601 timestamp string for the frame ``fname``.

    Only the base name of ``fname`` is inspected. Candidates are tried in order:

    1. every ``YYYY MM DD hh mm ss`` digit group (optionally separated by
       ``-``, ``_``, ``T`` or ``:``), left to right; the first one that forms a
       real calendar date/time wins;
    2. every ``YYYY-MM-DDThh:mm:ss`` substring, validated the same way (this
       catches a timestamp whose first digits were consumed by an earlier,
       invalid digit group);
    3. the file's modification time.

    Raises:
        OSError: if no valid timestamp is found in the name and the file's
            modification time cannot be read (e.g. the file does not exist).
    """
    base = os.path.basename(fname)

    compact = r"(\d{4})[-_]?(\d{2})[-_]?(\d{2})[_T\-]?(\d{2})[:_]?(\d{2})[:_]?(\d{2})"
    for m in re.finditer(compact, base):
        try:
            return datetime.datetime(*(int(g) for g in m.groups())).isoformat()
        except ValueError:
            # Digits that do not form a real date (e.g. a frame counter): keep looking.
            continue

    for m2 in re.finditer(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", base):
        try:
            parsed = datetime.datetime.strptime(m2.group(0), "%Y-%m-%dT%H:%M:%S")
        except ValueError:
            continue
        return parsed.isoformat()

    ts = os.path.getmtime(fname)
    return datetime.datetime.fromtimestamp(ts).isoformat()

# Shorten a description: keep only its first sentence, capped at max_words words.
def shorten_text(text: str, max_words: int = CAPTION_MAX_WORDS):
    """Return the first sentence of ``text`` limited to ``max_words`` words.

    The text is stripped and cut at the first ``.``, ``?`` or ``!``. If what
    remains has more than ``max_words`` whitespace-separated words, only the
    first ``max_words`` are kept, joined by single spaces. Empty or falsy input
    yields an empty string.
    """
    if not text:
        return ""
    sentence = re.split(r'[.?!]\s*', text.strip())[0].strip()
    tokens = sentence.split()
    return sentence if len(tokens) <= max_words else " ".join(tokens[:max_words])

# Compute the IoU (Intersection over Union) of two bounding boxes.
# The result is a value between 0 and 1 describing how much the boxes overlap.
def iou(boxA, boxB):
    """Return the Intersection over Union of two ``[x1, y1, x2, y2]`` boxes.

    Args:
        boxA, boxB: Indexable boxes with the top-left corner at indices 0-1 and
            the bottom-right corner at indices 2-3.

    Returns:
        ``0.0`` when the boxes do not overlap (touching edges included),
        otherwise ``intersection / (areaA + areaB - intersection)``.
    """
    inter_w = min(boxA[2], boxB[2]) - max(boxA[0], boxB[0])
    inter_h = min(boxA[3], boxB[3]) - max(boxA[1], boxB[1])
    if inter_w <= 0 or inter_h <= 0:
        return 0.0

    inter_area = inter_w * inter_h
    # A positive intersection implies both boxes have positive width and height,
    # so the union below is strictly positive and needs no clamping. (Clamping
    # each area to >= 1 would distort the result for boxes with area below 1.)
    area_a = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    area_b = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
    return inter_area / float(area_a + area_b - inter_area)

Tracker baseado em IoU para dar ID único para indivíduos/grupos

In [4]:
class SimpleTracker:
    """
    Minimal tracker driven by IoU (Intersection over Union).

    On every frame it:
      - links new detections to the existing tracks according to their IoU;
      - hands out a fresh unique ID to every detection left unmatched;
      - forgets tracks that have been missing for more than ``max_lost`` updates.
    """

    def __init__(self, iou_threshold=0.3, max_lost=30):
        # Next ID to hand out; IDs start at 1 and are never reused.
        self.next_id = 1
        # track_id -> {'box': ..., 'last_seen': frame index, 'lost': missed updates}
        self.tracks = {}
        self.iou_threshold = iou_threshold
        self.max_lost = max_lost

    def update(self, detections: List[Tuple[float, float, float, float]], frame_idx: int):
        """
        Update the tracks with the detections of the current frame.

        Tracks are visited in creation order; each one claims the still
        unclaimed detection with the highest IoU, provided that IoU reaches
        ``iou_threshold``. Returns a list of ``(track_id, bounding_box)``
        tuples: matched tracks first, then the newly created ones.
        """
        matched = []
        pending = set(range(len(detections)))

        for track_id in list(self.tracks):
            track = self.tracks[track_id]

            # Look for the free detection that overlaps this track the most.
            best_idx, best_overlap = -1, 0.0
            for det_idx in list(pending):
                overlap = iou(track['box'], detections[det_idx])
                if overlap > best_overlap:
                    best_idx, best_overlap = det_idx, overlap

            if best_idx == -1 or best_overlap < self.iou_threshold:
                track['lost'] += 1
                continue

            track['box'] = detections[best_idx]
            track['last_seen'] = frame_idx
            track['lost'] = 0
            matched.append((track_id, detections[best_idx]))
            pending.remove(best_idx)

        # Drop the tracks that stayed unmatched for too long.
        expired = [tid for tid, info in self.tracks.items() if info['lost'] > self.max_lost]
        for tid in expired:
            del self.tracks[tid]

        # Whatever is still unclaimed starts a brand-new track.
        for det_idx in sorted(pending):
            new_id = self.next_id
            self.next_id = new_id + 1
            self.tracks[new_id] = {'box': detections[det_idx], 'last_seen': frame_idx, 'lost': 0}
            matched.append((new_id, detections[det_idx]))

        return matched

Detector de indivíduos/grupos baseado em YOLO

In [5]:
class Detector:
    """Object detector backed by an Ultralytics ``YOLO`` model."""

    def __init__(self, weights: str = None, conf: float = 0.25, device: str = "cpu"):
        # Device string forwarded to every predict() call.
        self.device = device
        # Fall back to the "yolov8n" model when no weights are given.
        self.model = YOLO(weights) if weights else YOLO("yolov8n")
        # Confidence threshold forwarded to every predict() call.
        self.conf = conf

    def detect(self, image: np.ndarray):
        """
        Detect every object the YOLO model was trained on and build records.

        Returns a list with one dict per detection, holding ``bbox``
        (``[x1, y1, x2, y2]`` floats), ``confidence``, ``class_id`` and
        ``class_name`` (the class id as a string when the model exposes no
        name for it). The list is empty when the result carries no boxes.
        """
        prediction = self.model.predict(
            image, imgsz=640, conf=self.conf, verbose=False, device=self.device
        )
        found = getattr(prediction[0], 'boxes', None)
        if found is None:
            return []

        coords = found.xyxy.cpu().numpy()
        confidences = found.conf.cpu().numpy()
        class_ids = found.cls.cpu().numpy().astype(int)
        names = getattr(self.model, "names", None)

        def label_for(class_id):
            # Prefer the model's own label; otherwise use the numeric id as text.
            if names is not None and class_id in names:
                return names[class_id]
            return str(class_id)

        return [
            {
                'bbox': [float(box[i]) for i in range(4)],
                'confidence': float(score),
                'class_id': int(class_id),
                'class_name': label_for(int(class_id)),
            }
            for box, score, class_id in zip(coords, confidences, class_ids)
        ]

BLIP é o módulo open-source que gera captions/descrições a partir de imagens

In [6]:
class BLIPCaptioner:
    """Generates short text descriptions of images with a BLIP captioning model."""

    def __init__(self, model_name=BLIP_MODEL, device="cpu"):
        self.device = device
        self.processor = BlipProcessor.from_pretrained(model_name)
        blip = BlipForConditionalGeneration.from_pretrained(model_name)
        self.model = blip.to(self.device)
        # Inference only: switch the model to evaluation mode.
        self.model.eval()

    def caption(self, pil_image: Image.Image, max_length=CAPTION_MAX_TOKENS):
        """Return a caption for ``pil_image``.

        Generation uses beam search (5 beams, early stopping) and produces at
        most ``max_length`` new tokens. The decoded text is then reduced by
        ``shorten_text`` to at most ``CAPTION_MAX_WORDS`` words.
        """
        generation_options = dict(max_new_tokens=max_length, num_beams=5, early_stopping=True)
        encoded = self.processor(images=pil_image, return_tensors="pt").to(self.device)
        with torch.no_grad():
            token_ids = self.model.generate(**encoded, **generation_options)
        decoded = self.processor.decode(token_ids[0], skip_special_tokens=True)
        return shorten_text(decoded, max_words=CAPTION_MAX_WORDS)

CLIP é o módulo open-source (apesar de ser da OpenAI) responsável por atribuir à imagem atributos/características de uma lista predefinida, cada um acompanhado de um fator de confiança

In [7]:
class CLIPAttributeMatcher:
    """Scores fixed lists of textual attributes against an image using CLIP.

    Four attribute categories are available (hair, clothing items, clothing
    colors, accessories). Within one category the candidate texts compete
    against each other: scores are a softmax over that category's texts.
    """

    def __init__(self, device="cpu"):
        self.device = device
        self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(self.device)
        # Inference only; made explicit for consistency with BLIPCaptioner.
        self.model.eval()
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

        self.hair_colors = ["blond hair", "brown hair", "black hair", "gray hair", "red hair", "dark hair", "light hair"]
        self.clothing_items = ["t-shirt", "shirt", "jacket", "hoodie", "dress", "coat", "suit", "skirt", "pants", "shorts"]
        self.clothing_colors = [
            "red shirt", "blue shirt", "black shirt", "white shirt", "green shirt",
            "red jacket", "blue jacket", "black jacket", "white jacket", "green jacket",
            "gray sweater", "striped shirt", "plaid shirt"
        ]
        self.accessories = ["wearing glasses", "wearing sunglasses", "wearing a hat", "backpack", "holding a bag"]

    # Give every candidate text a confidence score for the image.
    def _score_texts(self, pil_image: Image.Image, texts: List[str]):
        """Return one score per entry of ``texts`` (softmax over the texts)."""
        inputs = self.processor(text=texts, images=pil_image, return_tensors="pt", padding=True).to(self.device)
        with torch.no_grad():
            outputs = self.model(**inputs)
            logits_per_image = outputs.logits_per_image
            probs = logits_per_image.softmax(dim=1).cpu().numpy().flatten()
        return probs.tolist()

    # Keep only the best-scoring texts.
    def _top_filtered(self, pil_image: Image.Image, texts: List[str], top_k=2, min_conf=ATTR_CONF_THRESHOLD):
        """Return up to ``top_k`` ``{"label", "score"}`` dicts, best first.

        Entries scoring below ``min_conf`` are dropped. An empty ``texts`` list
        yields an empty result without calling the model.
        """
        if not texts:
            return []
        probs = self._score_texts(pil_image, texts)
        idxs = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)[:top_k]
        return [{"label": texts[i], "score": float(probs[i])} for i in idxs if probs[i] >= min_conf]

    # Return the best-scoring attributes of every category.
    def get_attributes(self, pil_image: Image.Image, top_k=2):
        """Return a dict mapping each category name to its top attributes.

        Keys, in order: ``hair``, ``clothing_items``, ``clothing_colors``,
        ``accessories``; each value is the list produced by ``_top_filtered``.
        """
        categories = (
            ('hair', self.hair_colors),
            ('clothing_items', self.clothing_items),
            ('clothing_colors', self.clothing_colors),
            ('accessories', self.accessories),
        )
        return {
            key: self._top_filtered(pil_image, texts, top_k=top_k)
            for key, texts in categories
        }

Pipeline principal unindo quase tudo, menos extração de frames de vídeos

In [8]:
def _person_boxes(dets):
    """Return the integer ``[x1, y1, x2, y2]`` boxes of the person detections.

    A detection counts as a person when its class name is "person", "people" or
    "human", or when its class id is 0. Boxes whose area is below
    ``MIN_BOX_AREA`` are discarded.
    """
    boxes = []
    for d in dets:
        name = d.get('class_name', '').lower()
        if name in ("person", "people", "human") or int(d.get('class_id', -1)) == 0:
            x1, y1, x2, y2 = [int(round(v)) for v in d['bbox']]
            if (x2 - x1) * (y2 - y1) >= MIN_BOX_AREA:
                boxes.append([x1, y1, x2, y2])
    return boxes


def _save_crop(crop, path, error_prefix):
    """Save ``crop`` as a JPEG at ``path``; on failure only print ``error_prefix`` and the error."""
    try:
        crop.save(path, format="JPEG", quality=90)
    except Exception as e:
        # Best effort: a crop that cannot be written must not stop the pipeline.
        print(error_prefix, e)


def run_pipeline():
    """Detect, track and describe the people found in the extracted frames.

    For every frame under ``INPUT_FRAMES_FOLDER`` (at most ``FRAME_LIMIT``):
    people are detected with YOLO, linked to track IDs with the IoU tracker and
    cropped. The first time a track ID shows up, its crop is captioned (BLIP),
    tagged with attributes (CLIP) and saved as ``*_first.jpg``; every later
    sighting saves a ``*_last.jpg`` crop and refreshes the record's end
    timestamp, last crop path and end bounding box. When all frames are done,
    one JSON record per track ID is written to ``OUTPUT_RECORDS_FOLDER``.
    """
    ensure_dir(OUTPUT_CROPS_FOLDER)
    ensure_dir(OUTPUT_RECORDS_FOLDER)

    # Use the CUDA GPU when one is available.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    frame_files = list_frame_files(INPUT_FRAMES_FOLDER)
    # Limit the number of processed frames (useful for quick tests).
    if FRAME_LIMIT:
        frame_files = frame_files[:FRAME_LIMIT]
    if not frame_files:
        print("Frames não foram encontrados em ", INPUT_FRAMES_FOLDER)
        return

    # The models are loaded only once we know there is something to process.
    detector = Detector(weights=YOLO_WEIGHTS, conf=YOLO_CONF, device=device)
    captioner = BLIPCaptioner(model_name=BLIP_MODEL, device=device)
    attr_matcher = CLIPAttributeMatcher(device=device)
    tracker = SimpleTracker(iou_threshold=IOU_MATCH_THRESHOLD, max_lost=30)

    print(f"Encontrou {len(frame_files)} frames, processando...")

    track_records = {}
    for frame_idx, frame_path in enumerate(frame_files, start=1):
        with Image.open(frame_path) as src:
            pil = src.convert("RGB")

        dets = detector.detect(np.array(pil))

        # Only the bounding boxes containing people are tracked.
        assignments = tracker.update(_person_boxes(dets), frame_idx)
        timestamp = parse_timestamp_from_filename(frame_path)
        frame_base = os.path.splitext(os.path.basename(frame_path))[0]
        fname_ts = timestamp.replace(":", "").replace("-", "")

        for track_id, box in assignments:
            x1, y1, x2, y2 = [int(v) for v in box]
            crop = pil.crop((x1, y1, x2, y2))
            bbox = {"x1": x1, "y1": y1, "x2": x2, "y2": y2}
            crop_stem = f"cam{CAMERA_ID}_trk{track_id}_{frame_base}_{fname_ts}"

            if track_id not in track_records:
                # BLIP reportedly works best with 384 x 384 inputs, hence the
                # resize; it is only needed the first time a track is seen.
                crop_for_blip = crop.resize((384, 384), Image.LANCZOS)
                try:
                    caption = captioner.caption(crop_for_blip)
                except Exception as e:
                    caption = ""
                    print("Erro de geração de caption:", e)
                try:
                    attributes = attr_matcher.get_attributes(crop_for_blip, top_k=ATTR_TOP_K)
                except Exception as e:
                    attributes = {}
                    print("Erro de matching de atributo:", e)

                # Only one record is kept per detected individual/group.
                first_crop_path = os.path.join(OUTPUT_CROPS_FOLDER, f"{crop_stem}_first.jpg")
                _save_crop(crop, first_crop_path, "Falha ao salvar primeiro crop:")

                track_records[track_id] = {
                    "track_id": track_id,
                    "timestamp_start": timestamp,
                    "timestamp_end": timestamp,
                    "first_crop_path": first_crop_path,
                    "last_crop_path": first_crop_path,
                    "description": caption,
                    "attributes": attributes,
                    "bbox_start": dict(bbox),
                    "bbox_end": dict(bbox),
                }
            else:
                # NOTE(review): each later sighting writes a new *_last.jpg and the
                # earlier ones stay on disk; only the newest is referenced by the
                # record. Confirm whether the superseded crops should be removed.
                last_crop_path = os.path.join(OUTPUT_CROPS_FOLDER, f"{crop_stem}_last.jpg")
                _save_crop(crop, last_crop_path, "Failed to save last crop:")

                record = track_records[track_id]
                record["timestamp_end"] = timestamp
                record["last_crop_path"] = last_crop_path
                record["bbox_end"] = bbox

        # Poor man's progress bar.
        if frame_idx % 50 == 0:
            print(f"Processou {frame_idx}/{len(frame_files)} frames")

    # Write one JSON record per track.
    for track_id, rec in track_records.items():
        record_fname = f"record_cam{CAMERA_ID}_trk{track_id}.json"
        record_path = os.path.join(OUTPUT_RECORDS_FOLDER, record_fname)
        try:
            with open(record_path, "w", encoding="utf-8") as fh:
                json.dump(rec, fh, ensure_ascii=False, indent=2)
        except Exception as e:
            print("Falhou escrita record:", e)

    print("Pronto. Crops pessoas salvo em:", OUTPUT_CROPS_FOLDER)
    print("Records salvo em:", OUTPUT_RECORDS_FOLDER)

Rodar o pipeline principal

In [9]:
# Entry point: run the whole pipeline when this cell (or the exported script)
# is executed directly rather than imported as a module.
if __name__ == "__main__":
    run_pipeline()

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Using device: cuda
Encontrou 1 frames, processando...
Pronto. Crops pessoas salvo em: CroppedPersonsHighRes
Records salvo em: RecordsHighRes


To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
