In [1]:
# Instalação de dependências no Colab
!pip install mediapipe opencv-python tqdm deepface transformers sentencepiece --quiet


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/10.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/10.3 MB[0m [31m35.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/10.3 MB[0m [31m63.5 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m10.3/10.3 MB[0m [31m103.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m10.3/10.3 MB[0m [31m103.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.3/10.3 MB[0m [31m70.6 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/133.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m133.1/133.1 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
"""Analisador de vídeo com IA (faces, emoções e ações) totalmente em PT-BR.

- Detecta rostos em cada frame de um vídeo (MediaPipe Tasks).
- Analisa a emoção de cada rosto (DeepFace) e traduz para português.
- Gera uma legenda descritiva da cena (BLIP) e traduz EN->PT-BR.
- Atribui um identificador único por pessoa ao longo do vídeo (pessoa1, pessoa2, ...).
- Gera:
  - Um vídeo anotado com bounding boxes e textos.
  - Um CSV detalhado (frame a frame).
  - Um CSV de resumo com uma emoção e uma ação representativa por pessoa.
"""

import os
import time
import logging
from typing import Tuple, Optional

import cv2
import mediapipe as mp
from tqdm import tqdm
from deepface import DeepFace
from transformers import (
    BlipProcessor,
    BlipForConditionalGeneration,
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
)
import unicodedata
import pandas as pd

# ---------------------------
# Configuração de logging
# ---------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("video_emotion_analysis")

# Tipo auxiliar para bounding box de face (x, y, largura, altura)
FaceBox = Tuple[int, int, int, int]


FACE_MODEL_URL = "https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float16/latest/blaze_face_short_range.tflite"
FACE_MODEL_PATH = "blaze_face_short_range.tflite"

def ensure_face_model_downloaded(model_path: str = FACE_MODEL_PATH) -> str:
    """Garante que o modelo de detecção facial do MediaPipe esteja disponível localmente.

    Faz o download automático do modelo se ele ainda não existir no diretório atual.
    """
    if not os.path.exists(model_path):
        import urllib.request
        logger.info("Baixando modelo de detecção facial do MediaPipe...")
        try:
            urllib.request.urlretrieve(FACE_MODEL_URL, model_path)
            logger.info(f"Modelo salvo em: {model_path}")
        except Exception as e:
            logger.error(f"Falha ao baixar o modelo de face do MediaPipe: {e}")
            raise
    return model_path

# # ---------------------------
# # Inicialização de modelos
# # ---------------------------
# def init_mediapipe_face_detector(conf=0.5):
#     mp_face_detection = mp.solutions.face_detection
#     face_det = mp_face_detection.FaceDetection(min_detection_confidence=conf)
#     return face_det

def init_mediapipe_face_detector(conf=0.5):
    """Inicializa o detector facial usando a API moderna `MediaPipe Tasks`.

    Compatível com as versões mais recentes do pacote `mediapipe` no Colab.
    """
    # Garante o modelo local
    model_path = ensure_face_model_downloaded()

    # Atalhos das classes da API de Tasks
    BaseOptions = mp.tasks.BaseOptions
    FaceDetector = mp.tasks.vision.FaceDetector
    FaceDetectorOptions = mp.tasks.vision.FaceDetectorOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    options = FaceDetectorOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=VisionRunningMode.IMAGE,  # processa frame a frame
        min_detection_confidence=conf,
    )
    detector = FaceDetector.create_from_options(options)
    return detector

def init_blip_colab(device="cpu"):
    try:
        processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
        if device != "cpu":
            model.to(device)
        return processor, model
    except Exception as e:
        logger.warning("BLIP não pôde ser carregado. Caption desativado. Erro: %s", e)
        return None, None

# ---------------------------
# Emoções com DeepFace
# ---------------------------
def analyze_emotion(face_img):
    """Analisa emoções com DeepFace e normaliza o retorno.

    Retorno normalizado:
    {
        "dominant_emotion": <str | None>,
        "emotion": <dict de scores>
    }
    """
    try:
        result = DeepFace.analyze(
            face_img,
            actions=['emotion'],
            enforce_detection=False
        )

        # DeepFace pode retornar uma lista (uma entrada por face)
        if isinstance(result, list):
            if len(result) == 0:
                return {"dominant_emotion": None, "emotion": {}}
            result = result[0]

        dominant = result.get("dominant_emotion")
        emotions = result.get("emotion", {})

        return {
            "dominant_emotion": dominant,
            "emotion": emotions
        }
    except Exception as e:
        logger.warning(f"Erro na análise de emoção com DeepFace: {e}")
        return {"dominant_emotion": None, "emotion": {}}

# ---------------------------
# Tradução de emoções para PT-BR
# ---------------------------
EMOTION_PT = {
    "angry": "Raiva",
    "disgust": "Nojo",
    "fear": "Medo",
    "happy": "Alegre",
    "sad": "Triste",
    "surprise": "Surpreso",
    "neutral": "Neutro",
}

def translate_emotion(emotion: Optional[str]) -> str:
    """Traduz o rótulo de emoção do DeepFace para PT-BR.

    Se não houver emoção, retorna "N/A".
    """
    if not emotion:
        return "N/A"
    # Garante string e aplica lower para mapear no dicionário
    key = str(emotion).lower()
    return EMOTION_PT.get(key, str(emotion))


# ---------------------------
# Detector MediaPipe
# ---------------------------
# def detect_faces_mediapipe(frame_rgb, detector):
#     h, w, _ = frame_rgb.shape
#     results = detector.process(frame_rgb)
#     faces = []

#     if results.detections:
#         for det in results.detections:
#             box = det.location_data.relative_bounding_box
#             x = int(box.xmin * w)
#             y = int(box.ymin * h)
#             bw = int(box.width * w)
#             bh = int(box.height * h)
#             x = max(0, x); y = max(0, y)
#             faces.append({"bbox": (x, y, bw, bh), "score": float(det.score[0])})
#     return faces
def detect_faces_mediapipe(frame_rgb, detector):
    """Detecta faces em um frame RGB usando MediaPipe Tasks.

    Retorna uma lista de dicionários:
        { "bbox": (x, y, w, h), "score": float }
    em coordenadas de pixels.
    """
    h, w, _ = frame_rgb.shape
    faces = []

    # Converte o frame (numpy array) para `mp.Image`
    try:
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
    except Exception as e:
        logger.error(f"Falha ao criar mp.Image para MediaPipe: {e}")
        return faces

    try:
        result = detector.detect(mp_image)
    except AttributeError:
        # Fallback: se por algum motivo o detector ainda for da API legada (mp.solutions),
        # mantém compatibilidade com o .process(...)
        results = detector.process(frame_rgb)
        if results.detections:
            for det in results.detections:
                box = det.location_data.relative_bounding_box
                x = int(box.xmin * w)
                y = int(box.ymin * h)
                bw = int(box.width * w)
                bh = int(box.height * h)
                x = max(0, x); y = max(0, y)
                faces.append({"bbox": (x, y, bw, bh), "score": float(det.score[0])})
        return faces

    # API nova (Tasks) -> FaceDetectorResult
    if not result.detections:
        return faces

    for det in result.detections:
        try:
            # bounding_box já vem em coordenadas de pixels
            box = det.bounding_box
            x = int(box.origin_x)
            y = int(box.origin_y)
            bw = int(box.width)
            bh = int(box.height)

            score = None
            if getattr(det, "categories", None):
                score = float(det.categories[0].score)
            elif getattr(det, "score", None):
                score = float(det.score[0])

            x = max(0, x); y = max(0, y)
            faces.append({"bbox": (x, y, bw, bh), "score": float(score) if score is not None else 0.0})
        except Exception as e:
            logger.warning(f"Falha ao extrair bounding box de uma detecção do MediaPipe: {e}")

    return faces

# ---------------------------
# Caption com BLIP
# ---------------------------
def generate_caption(frame_bgr, processor, model, device="cpu"):
    """Gera legenda em inglês usando BLIP."""
    if processor is None or model is None:
        return None
    try:
        import torch
        rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        inputs = processor(images=rgb, return_tensors="pt").to(device)
        out = model.generate(**inputs)
        return processor.decode(out[0], skip_special_tokens=True)
    except Exception as e:
        logger.warning(f"Erro ao gerar legenda com BLIP: {e}")
        return None

# ---------------------------
#  Remover acentos e especiais
# ---------------------------
def remove_accents(texto):
    return ''.join(
        c for c in unicodedata.normalize('NFD', texto)
        if unicodedata.category(c) != 'Mn'
    )

def sem_acentos(txt: str) -> str:
    # NFKD separa letra do acento (ex: "ã" -> "a" + "~"); depois removemos os acentos (combining marks)
    txt = unicodedata.normalize("NFKD", txt)
    txt = "".join(ch for ch in txt if not unicodedata.combining(ch))
    # Alguns símbolos comuns que podem aparecer e não renderizam bem no Hershey:
    txt = (txt.replace("–", "-")
              .replace("—", "-")
              .replace("“", '"').replace("”", '"')
              .replace("’", "'").replace("…", "..."))
    return txt

# ---------------------------
# Tradução de legendas EN->PT-BR (offline HuggingFace)
# ---------------------------
def init_translator_en_pt(device: str = "cpu"):
    """Inicializa modelo unicamp-dl/translation-en-pt-t5 para tradução EN->PT.

    Usa apenas modelos locais da biblioteca transformers (sem API externa).
    """
    try:
        model_name = "unicamp-dl/translation-en-pt-t5"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        if device != "cpu":
            model.to(device)
        return tokenizer, model
    except Exception as e:
        logger.warning(f"Erro ao carregar modelo de tradução EN->PT: {e}")
        return None, None


def translate_caption_pt(caption: Optional[str],
                         tokenizer,
                         model,
                         device: str = "cpu") -> Optional[str]:
    """Traduz a legenda gerada em inglês para PT-BR.

    Se não for possível traduzir, retorna a legenda original.
    """
    if not caption or tokenizer is None or model is None:
        return caption
    try:
        import torch
        inputs = tokenizer(caption, return_tensors="pt", truncation=True, max_length=64)
        if device != "cpu":
            inputs = {k: v.to(device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = model.generate(**inputs, max_length=64)
        pt_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        return pt_text
    except Exception as e:
        logger.warning(f"Erro ao traduzir legenda EN->PT: {e}")
        return caption


# ---------------------------
# Anotar frame
# ---------------------------
def draw_label(frame, bbox, label, score=None):
    x, y, w, h = bbox
    cv2.rectangle(frame, (x,y), (x+w, y+h), (0,255,0), 2)
    text = label if score is None else f"{label} ({score:.2f})"
    cv2.putText(frame, text, (x, y-8), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 2)

# ---------------------------
# Pipeline principal
# ---------------------------
def process_video_colab(input_path, output_path, csv_path, skip_frames=0, conf=0.5):

    # Detecção facial + BLIP
    face_detector = init_mediapipe_face_detector(conf)
    processor, blip_model = init_blip_colab(device="cpu")

    # Tradutor EN->PT para legendas
    translator_tokenizer, translator_model = init_translator_en_pt(device="cpu")

    cap = cv2.VideoCapture(input_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps_in = cap.get(cv2.CAP_PROP_FPS) or 25
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    out_writer = cv2.VideoWriter(
        output_path,
        cv2.VideoWriter_fourcc(*'mp4v'),
        fps_in,
        (w, h)
    )

    csv_data = []
    start = time.time()

    pbar = tqdm(total=total_frames, desc="Processando vídeo")

    frame_idx = 0
    processed = 0
    person_counter = 0  # contador global de pessoas detectadas
    person_trackers = []  # rastreadores simples por proximidade (id, cx, cy)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if skip_frames and (frame_idx % (skip_frames + 1) != 0):
            frame_idx += 1
            pbar.update(1)
            continue

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        faces = detect_faces_mediapipe(frame_rgb, face_detector)

        # Legenda em inglês via BLIP + tradução para PT-BR
        caption_en = generate_caption(frame, processor, blip_model)
        caption = translate_caption_pt(caption_en, translator_tokenizer, translator_model, device="cpu")

        if not faces:
            csv_data.append({
                "frame": frame_idx,
                "face_id": -1,
                "person_label": None,
                "emotion": None,
                "score": None,
                "caption": caption
            })

        for i, f in enumerate(faces):
            # Rastreamento simples: associa cada face à pessoa mais próxima (mesmo id ao longo do vídeo)
            x, y, w2, h2 = f["bbox"]
            cx = x + w2 / 2.0
            cy = y + h2 / 2.0

            best_id = None
            best_dist = 1e12
            best_idx = None
            for idx, t in enumerate(person_trackers):
                dx = cx - t["cx"]
                dy = cy - t["cy"]
                dist = dx*dx + dy*dy
                if dist < best_dist:
                    best_dist = dist
                    best_id = t["id"]
                    best_idx = idx

            # limiar de distância em pixels (quadrado)
            dist_thresh_sq = 80 * 80
            if best_id is not None and best_dist <= dist_thresh_sq:
                pid = best_id
                # atualiza posição do rastreador
                person_trackers[best_idx]["cx"] = cx
                person_trackers[best_idx]["cy"] = cy
            else:
                # nova pessoa
                person_counter += 1
                pid = person_counter
                person_trackers.append({"id": pid, "cx": cx, "cy": cy})

            person_label = f"pessoa{pid}"

            crop = frame[y:y+h2, x:x+w2]

            result = analyze_emotion(crop)
            emo = result.get("dominant_emotion")
            scores = result.get("emotion", {}) or {}

            # Fallback: se não vier dominant_emotion, escolhe a maior probabilidade
            if (not emo) and scores:
                try:
                    emo = max(scores, key=scores.get)
                except Exception as e:
                    logger.warning(f"Falha ao calcular emoção dominante a partir dos scores: {e}")
                    emo = None

            emo_score = scores.get(emo, None) if emo and scores else None
            emo_pt = translate_emotion(emo)

            # Rótulo no vídeo: pessoaX - Emoção
            label_text = f"{person_label} - {emo_pt or 'N/A'}"
            draw_label(frame, f["bbox"], label_text, emo_score)

            csv_data.append({
                "frame": frame_idx,
                "face_id": pid,
                "person_label": person_label,
                "emotion": emo_pt,
                "score": emo_score,
                "caption": caption
            })

        # Desenha a legenda (ação) em PT-BR no rodapé do frame, se existir
        if caption:
            try:
                # Faixa preta semi-cheia para melhorar legibilidade
                cv2.rectangle(frame, (10, h-40), (w-10, h-10), (0, 0, 0), -1)
                cv2.putText(
                    frame,
                    str(sem_acentos(caption))[:120],
                    (20, h-18),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (255, 255, 255),
                    2
                )
            except Exception as e:
                logger.warning(f"Falha ao desenhar legenda no frame: {e}")

        out_writer.write(frame)

        frame_idx += 1
        processed += 1
        pbar.update(1)

    pbar.close()
    cap.release()
    out_writer.release()

    end = time.time()
    eff_fps = processed / (end - start)
    logger.info(f"Finalizado. {processed} frames processados. FPS efetivo: {eff_fps:.2f}")

    # Salvar CSV detalhado (emoções e ações já em PT-BR)
    import pandas as pd
    df = pd.DataFrame(csv_data)
    df.to_csv(csv_path, index=False)

    logger.info("CSV salvo em: " + csv_path)

    # Arquivo de resumo: totalizador de emoções e ações (somente PT-BR)
    try:
        resumo_path = csv_path.replace(".csv", "_resumo.csv")

        # Considera apenas linhas com pessoa identificada
        df_pessoas = df[df["person_label"].notna() & (df["person_label"] != "")].copy()

        # Para cada pessoa, escolhe UMA emoção representativa (mais frequente)
        df_emoc_valid = df_pessoas[df_pessoas["emotion"].notna() & (df_pessoas["emotion"] != "")]
        if not df_emoc_valid.empty:
            emoc_por_pessoa = (
                df_emoc_valid
                .groupby("person_label")["emotion"]
                .agg(lambda s: s.value_counts().idxmax())
            )
            emoc_counts = emoc_por_pessoa.value_counts().reset_index()
            emoc_counts.columns = ["item_pt_br", "total"]
            emoc_counts["tipo"] = "EMOCAO"
        else:
            emoc_counts = pd.DataFrame(columns=["item_pt_br", "total", "tipo"])

        # Para cada pessoa, escolhe UMA ação representativa (legenda mais frequente)
        df_actions_valid = df_pessoas[df_pessoas["caption"].notna() & (df_pessoas["caption"] != "")]
        if not df_actions_valid.empty:
            acao_por_pessoa = (
                df_actions_valid
                .groupby("person_label")["caption"]
                .agg(lambda s: s.value_counts().idxmax())
            )
            action_counts = acao_por_pessoa.value_counts().reset_index()
            action_counts.columns = ["item_pt_br", "total"]
            action_counts["tipo"] = "ACAO"
        else:
            action_counts = pd.DataFrame(columns=["item_pt_br", "total", "tipo"])

        resumo_df = pd.concat([emoc_counts, action_counts], ignore_index=True)
        # Reorganiza colunas
        if not resumo_df.empty:
            resumo_df = resumo_df[["tipo", "item_pt_br", "total"]]

        resumo_df.to_csv(resumo_path, index=False)
        logger.info("Resumo salvo em: " + resumo_path)
    except Exception as e:
        logger.warning(f"Falha ao gerar arquivo de resumo: {e}")

    logger.info("Vídeo anotado salvo em: " + output_path)


# -------------------------------------------------------------------
# Ponto de entrada (linha de comando)
# -------------------------------------------------------------------

26-01-08 18:35:51 - Directory /root/.deepface has been created
26-01-08 18:35:51 - Directory /root/.deepface/weights has been created




In [None]:
# Ajuste os caminhos abaixo conforme seu ambiente (pasta no Drive, etc.)
folder_path = '/content/drive/MyDrive/Tech Challenge 4/'
input_path = os.path.join(folder_path, '', 'Unlocking Facial Recognition_ Diverse Activities Analysis.mp4')
output_video = os.path.join(folder_path, 'output', 'output_video.mp4')
csv_output = os.path.join(folder_path, 'output', 'resultado.csv')

process_video_colab(
    input_path=input_path,
    output_path=output_video,
    csv_path=csv_output,
    skip_frames=0,  # coloque 2 ou 3 se quiser acelerar
    conf=0.5,
)
