# Tech Challenge Fase 4 - Video Analysis (Google Colab Version)
Este notebook foi gerado automaticamente para execução no Google Colab.
Ele contém todo o código fonte necessário dos módulos e a lógica de análise.

In [1]:
# Instalação das dependências
!pip install ultralytics fer opencv-python-headless matplotlib pandas opencv-python
# Instala ffmpeg para processamento de vídeo
!apt-get install ffmpeg -y


Defaulting to user installation because normal site-packages is not writeable
Collecting ultralytics
  Downloading ultralytics-8.3.249-py3-none-any.whl.metadata (37 kB)
Collecting fer
  Using cached fer-25.10.3-py3-none-any.whl.metadata (7.1 kB)
Collecting opencv-python-headless
  Downloading opencv_python_headless-4.12.0.88-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (19 kB)
Collecting matplotlib
  Using cached matplotlib-3.9.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting opencv-python
  Using cached opencv_python-4.12.0.88-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (19 kB)
Collecting pillow>=7.1.2 (from ultralytics)
  Using cached pillow-11.3.0-cp39-cp39-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (9.0 kB)
Collecting torchvision>=0.9.0 (from ultralytics)
  Downloading torchvision-0.23.0-cp39-cp39-manylinux_2_28_x86_64.whl.metadata (6.1 kB)
Collecting polars>=0.20.0 (from ultralytics)
  U

In [2]:
# Montar Google Drive
from google.colab import drive
drive.mount('/content/drive')

print("Google Drive montado com sucesso!")


ModuleNotFoundError: No module named 'google.colab'

In [None]:
import os
import sys
from pathlib import Path

# Define diretório base no Colab
BASE_DIR = Path("/content/TC-4")
SRC_DIR = BASE_DIR / "src"
INPUT_DIR = BASE_DIR / "input"
OUTPUT_DIR = BASE_DIR / "output"
REPORTS_DIR = BASE_DIR / "reports"
MODELS_DIR = BASE_DIR / "models"

# Cria estrutura de pastas
for d in [SRC_DIR, INPUT_DIR, OUTPUT_DIR, REPORTS_DIR, MODELS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Adiciona ao path para permitir importações
if str(BASE_DIR) not in sys.path:
    sys.path.append(str(BASE_DIR))

print(f"Ambiente configurado em: {BASE_DIR}")


## Upload do Vídeo
Você pode fazer upload do vídeo manualmente para a pasta `/content/TC-4/input` ou copiar do Google Drive.
Abaixo, um exemplo para copiar do Drive (ajuste o caminho de origem conforme necessário).

In [None]:
# Exemplo de cópia do Drive (descomente e ajuste se necessário)
# !cp "/content/drive/MyDrive/TechChallenge4/video_teste.mp4" "/content/TC-4/input/video_input.mp4"

# Verificação do arquivo
video_files = list(INPUT_DIR.glob("*.mp4"))
if video_files:
    VIDEO_PATH_FOUND = str(video_files[0])
    print(f"Vídeo encontrado: {VIDEO_PATH_FOUND}")
else:
    print("Nenhum vídeo .mp4 encontrado em input/. Faça upload do arquivo.")
    VIDEO_PATH_FOUND = None


In [None]:
%%writefile /content/TC-4/src/config.py
"""
Tech Challenge - Fase 4: Configurações do Projeto
Centraliza todas as configurações e constantes utilizadas na aplicação.
"""

import os
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()

# Diretórios
BASE_DIR = Path("/content/TC-4")
SRC_DIR = BASE_DIR / "src"
INPUT_DIR = BASE_DIR / "input"
OUTPUT_DIR = BASE_DIR / "output"
REPORTS_DIR = BASE_DIR / "reports"
MODELS_DIR = BASE_DIR / "models"

# Criar diretórios se não existirem
INPUT_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
REPORTS_DIR.mkdir(exist_ok=True)
MODELS_DIR.mkdir(exist_ok=True)

# Vídeo de entrada
VIDEO_PATH = os.getenv(
    "VIDEO_PATH", 
    str(INPUT_DIR / "Unlocking Facial Recognition_ Diverse Activities Analysis.mp4")
)

# Configurações de processamento
FRAME_SKIP = int(os.getenv("FRAME_SKIP", "2"))
CONFIDENCE_THRESHOLD = float(os.getenv("CONFIDENCE_THRESHOLD", "0.5"))

# OpenAI (opcional para geração de resumo)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")

# Configurações de visualização
COLORS = {
    "face": (0, 255, 0),       # Verde para rostos
    "emotion": (255, 255, 0),  # Amarelo para emoções
    "activity": (0, 165, 255), # Laranja para atividades
    "anomaly": (0, 0, 255),    # Vermelho para anomalias
    "text_bg": (0, 0, 0),      # Fundo preto para texto
}

# Mapeamento de emoções (português)
EMOTION_LABELS = {
    "angry": "Raiva",
    "disgust": "Nojo",
    "fear": "Medo",
    "happy": "Feliz",
    "sad": "Triste",
    "surprise": "Surpreso",
    "neutral": "Neutro"
}

# Categorias de atividades detectáveis
ACTIVITY_CATEGORIES = {
    "walking": "Caminhando",
    "running": "Correndo",
    "sitting": "Sentado",
    "standing": "Em pé",
    "talking": "Conversando",
    "gesturing": "Gesticulando",
    "waving": "Acenando",
    "pointing": "Apontando",
    "dancing": "Dançando",
    "crouching": "Agachado",
    "arms_raised": "Braços Levantados",
    "unknown": "Desconhecido"
}

# Limiares para detecção de anomalias
ANOMALY_THRESHOLDS = {
    "sudden_movement": 50,      # Pixels de movimento brusco
    "emotion_change_rate": 0.5, # Taxa de mudança emocional
    "activity_duration": 2.0,   # Segundos mínimos para atividade válida
}


In [None]:
%%writefile /content/TC-4/src/face_detector.py
"""
Tech Challenge - Fase 4: Detector de Rostos
Módulo responsável pelo reconhecimento e rastreamento de rostos no vídeo.
"""

import cv2
import numpy as np
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass, field


@dataclass
class FaceDetection:
    """Representa uma detecção de rosto em um frame."""
    face_id: int
    bbox: Tuple[int, int, int, int]  # (x, y, w, h)
    confidence: float
    landmarks: Optional[Dict] = None
    embedding: Optional[np.ndarray] = None


class FaceDetector:
    """
    Detector de rostos usando OpenCV DNN ou Haar Cascades.
    Suporta rastreamento básico de identidades entre frames.
    """
    
    def __init__(self, method: str = "haar", confidence_threshold: float = 0.5):
        """
        Inicializa o detector de rostos.
        
        Args:
            method: Método de detecção ('haar', 'dnn', 'mediapipe')
            confidence_threshold: Limiar mínimo de confiança
        """
        self.method = method
        self.confidence_threshold = confidence_threshold
        self.face_counter = 0
        self.tracked_faces: Dict[int, np.ndarray] = {}
        
        self._init_detector()
    
    def _init_detector(self):
        """Inicializa o detector baseado no método escolhido."""
        if self.method == "haar":
            # Haar Cascade - usa caminho explícito para evitar problemas de cache
            import os
            from pathlib import Path
            
            # Tenta múltiplos caminhos possíveis
            possible_paths = [
                # Caminho do cv2 atual
                str(Path(cv2.__file__).parent / "data" / "haarcascade_frontalface_default.xml"),
                # Caminho do venv Python 3.12
                "/home/aineto/workspaces/POS/TC-4/.venv/lib/python3.12/site-packages/cv2/data/haarcascade_frontalface_default.xml",
                # Caminho do FER
                "/home/aineto/workspaces/POS/TC-4/.venv/lib/python3.12/site-packages/fer/data/haarcascade_frontalface_default.xml",
                # Caminhos do sistema
                "/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml",
                "/usr/share/opencv/haarcascades/haarcascade_frontalface_default.xml",
            ]
            
            for path in possible_paths:
                if os.path.exists(path):
                    self.detector = cv2.CascadeClassifier(path)
                    if not self.detector.empty():
                        break
            
            if self.detector.empty():
                print("[AVISO] Haar Cascade não encontrado, usando método simplificado")
        elif self.method == "dnn":
            # DNN (mais preciso, requer modelo)
            self._init_dnn_detector()
        elif self.method == "mediapipe":
            self._init_mediapipe_detector()
        else:
            raise ValueError(f"Método desconhecido: {self.method}")
    
    def _init_dnn_detector(self):
        """Inicializa detector DNN do OpenCV."""
        # Usa modelo Caffe pré-treinado do OpenCV
        model_path = cv2.data.haarcascades.replace(
            "haarcascades/", ""
        ) + "deploy.prototxt"
        weights_path = cv2.data.haarcascades.replace(
            "haarcascades/", ""
        ) + "res10_300x300_ssd_iter_140000.caffemodel"
        
        try:
            self.detector = cv2.dnn.readNetFromCaffe(model_path, weights_path)
        except Exception:
            print("[AVISO] Modelo DNN não encontrado, usando Haar Cascade")
            self.method = "haar"
            self._init_detector()
    
    def _init_mediapipe_detector(self):
        """Inicializa detector MediaPipe Face Detection."""
        try:
            import mediapipe as mp
            self.mp_face_detection = mp.solutions.face_detection
            self.detector = self.mp_face_detection.FaceDetection(
                model_selection=1,  # 1 para detecção de longa distância
                min_detection_confidence=self.confidence_threshold
            )
        except ImportError:
            print("[AVISO] MediaPipe não instalado, usando Haar Cascade")
            self.method = "haar"
            self._init_detector()
    
    def detect(self, frame: np.ndarray) -> List[FaceDetection]:
        """
        Detecta rostos em um frame.
        
        Args:
            frame: Imagem BGR do OpenCV
            
        Returns:
            Lista de detecções de rostos
        """
        if self.method == "haar":
            return self._detect_haar(frame)
        elif self.method == "dnn":
            return self._detect_dnn(frame)
        elif self.method == "mediapipe":
            return self._detect_mediapipe(frame)
        return []
    
    def _detect_haar(self, frame: np.ndarray) -> List[FaceDetection]:
        """Detecção usando Haar Cascades."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.detector.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30)
        )
        
        detections = []
        for (x, y, w, h) in faces:
            face_id = self._assign_face_id(frame, (x, y, w, h))
            detections.append(FaceDetection(
                face_id=face_id,
                bbox=(x, y, w, h),
                confidence=1.0  # Haar não fornece confiança
            ))
        
        return detections
    
    def _detect_dnn(self, frame: np.ndarray) -> List[FaceDetection]:
        """Detecção usando DNN do OpenCV."""
        h, w = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(
            cv2.resize(frame, (300, 300)), 
            1.0, (300, 300), (104.0, 177.0, 123.0)
        )
        
        self.detector.setInput(blob)
        detections_raw = self.detector.forward()
        
        detections = []
        for i in range(detections_raw.shape[2]):
            confidence = detections_raw[0, 0, i, 2]
            if confidence > self.confidence_threshold:
                box = detections_raw[0, 0, i, 3:7] * np.array([w, h, w, h])
                x1, y1, x2, y2 = box.astype(int)
                face_id = self._assign_face_id(frame, (x1, y1, x2-x1, y2-y1))
                detections.append(FaceDetection(
                    face_id=face_id,
                    bbox=(x1, y1, x2-x1, y2-y1),
                    confidence=float(confidence)
                ))
        
        return detections
    
    def _detect_mediapipe(self, frame: np.ndarray) -> List[FaceDetection]:
        """Detecção usando MediaPipe."""
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.detector.process(rgb_frame)
        
        detections = []
        if results.detections:
            h, w = frame.shape[:2]
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box
                x = int(bbox.xmin * w)
                y = int(bbox.ymin * h)
                width = int(bbox.width * w)
                height = int(bbox.height * h)
                
                face_id = self._assign_face_id(frame, (x, y, width, height))
                detections.append(FaceDetection(
                    face_id=face_id,
                    bbox=(x, y, width, height),
                    confidence=detection.score[0]
                ))
        
        return detections
    
    def _assign_face_id(self, frame: np.ndarray, bbox: Tuple[int, int, int, int]) -> int:
        """
        Atribui um ID ao rosto baseado em rastreamento simples por posição.
        Para rastreamento mais robusto, usar embeddings faciais.
        """
        x, y, w, h = bbox
        center = np.array([x + w/2, y + h/2])
        
        # Procura rosto mais próximo já rastreado
        min_dist = float('inf')
        matched_id = None
        
        for face_id, prev_center in self.tracked_faces.items():
            dist = np.linalg.norm(center - prev_center)
            if dist < min_dist and dist < max(w, h) * 2:  # Threshold de proximidade
                min_dist = dist
                matched_id = face_id
        
        if matched_id is not None:
            self.tracked_faces[matched_id] = center
            return matched_id
        
        # Novo rosto detectado
        self.face_counter += 1
        self.tracked_faces[self.face_counter] = center
        return self.face_counter
    
    def reset_tracking(self):
        """Reseta o rastreamento de rostos."""
        self.face_counter = 0
        self.tracked_faces.clear()
    
    def draw_detections(
        self, 
        frame: np.ndarray, 
        detections: List[FaceDetection],
        color: Tuple[int, int, int] = (0, 255, 0)
    ) -> np.ndarray:
        """
        Desenha as detecções de rostos no frame.
        
        Args:
            frame: Imagem BGR
            detections: Lista de detecções
            color: Cor das bounding boxes (BGR)
            
        Returns:
            Frame com anotações
        """
        annotated = frame.copy()
        
        for det in detections:
            x, y, w, h = det.bbox
            
            # Bounding box
            cv2.rectangle(annotated, (x, y), (x+w, y+h), color, 2)
            
            # Label com ID e confiança
            label = f"Face #{det.face_id}"
            if det.confidence < 1.0:
                label += f" ({det.confidence:.0%})"
            
            # Fundo do texto
            (text_w, text_h), _ = cv2.getTextSize(
                label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1
            )
            cv2.rectangle(
                annotated, 
                (x, y - text_h - 10), 
                (x + text_w + 4, y), 
                color, 
                -1
            )
            
            # Texto
            cv2.putText(
                annotated, label,
                (x + 2, y - 5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6,
                (0, 0, 0), 1, cv2.LINE_AA
            )
        
        return annotated



In [None]:
%%writefile /content/TC-4/src/emotion_analyzer.py
"""
Tech Challenge - Fase 4: Analisador de Emoções
Módulo responsável pela análise de expressões emocionais em rostos detectados.
"""

import cv2
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from collections import deque

from .config import EMOTION_LABELS


@dataclass
class EmotionResult:
    """Resultado da análise emocional de um rosto."""
    face_id: int
    dominant_emotion: str
    emotion_scores: Dict[str, float]
    confidence: float
    emotion_pt: str  # Emoção em português


class EmotionAnalyzer:
    """
    Analisador de expressões emocionais usando DeepFace ou FER.
    Suporta rastreamento temporal de emoções para suavização.
    """
    
    def __init__(self, method: str = "fer", temporal_window: int = 5):
        """
        Inicializa o analisador de emoções.
        
        Args:
            method: Método de análise ('fer', 'deepface')
            temporal_window: Janela temporal para suavização (em frames)
        """
        self.method = method
        self.temporal_window = temporal_window
        self.emotion_history: Dict[int, deque] = {}  # face_id -> histórico
        
        self._init_analyzer()
    
    def _init_analyzer(self):
        """Inicializa o analisador baseado no método escolhido."""
        if self.method == "fer":
            self._init_fer()
        elif self.method == "deepface":
            self._init_deepface()
        else:
            raise ValueError(f"Método desconhecido: {self.method}")
    
    def _init_fer(self):
        """Inicializa FER (Facial Expression Recognition)."""
        try:
            # Nova versão do FER usa fer.fer.FER
            try:
                from fer.fer import FER
            except ImportError:
                from fer import FER
            self.analyzer = FER(mtcnn=False)  # Usamos nosso próprio detector
        except (ImportError, Exception) as e:
            print(f"[AVISO] FER não disponível ({e}), usando análise simplificada")
            self.method = "simple"
            self.analyzer = None
    
    def _init_deepface(self):
        """Inicializa DeepFace para análise de emoções."""
        try:
            from deepface import DeepFace
            self.analyzer = DeepFace
        except ImportError:
            print("[AVISO] DeepFace não instalado, tentando FER")
            self.method = "fer"
            self._init_fer()
    
    def analyze(
        self, 
        frame: np.ndarray, 
        face_bbox: Tuple[int, int, int, int],
        face_id: int
    ) -> Optional[EmotionResult]:
        """
        Analisa a emoção de um rosto no frame.
        
        Args:
            frame: Imagem BGR
            face_bbox: Bounding box do rosto (x, y, w, h)
            face_id: ID do rosto para rastreamento temporal
            
        Returns:
            Resultado da análise emocional ou None se falhar
        """
        x, y, w, h = face_bbox
        
        # Extrai região do rosto com margem
        margin = int(min(w, h) * 0.1)
        x1 = max(0, x - margin)
        y1 = max(0, y - margin)
        x2 = min(frame.shape[1], x + w + margin)
        y2 = min(frame.shape[0], y + h + margin)
        
        face_roi = frame[y1:y2, x1:x2]
        
        if face_roi.size == 0:
            return None
        
        if self.method == "fer":
            return self._analyze_fer(face_roi, face_id)
        elif self.method == "deepface":
            return self._analyze_deepface(face_roi, face_id)
        else:
            return self._analyze_simple(face_roi, face_id)
    
    def _analyze_fer(
        self, 
        face_roi: np.ndarray, 
        face_id: int
    ) -> Optional[EmotionResult]:
        """Análise usando FER."""
        if self.analyzer is None:
            return self._analyze_simple(face_roi, face_id)
        
        try:
            # FER espera imagem BGR
            emotions = self.analyzer.detect_emotions(face_roi)
            
            if not emotions:
                return None
            
            # Pega a primeira detecção (já sabemos que há um rosto)
            emotion_scores = emotions[0]["emotions"]
            
            # Aplica suavização temporal
            smoothed_scores = self._smooth_emotions(face_id, emotion_scores)
            
            # Encontra emoção dominante
            dominant = max(smoothed_scores, key=smoothed_scores.get)
            confidence = smoothed_scores[dominant]
            
            return EmotionResult(
                face_id=face_id,
                dominant_emotion=dominant,
                emotion_scores=smoothed_scores,
                confidence=confidence,
                emotion_pt=EMOTION_LABELS.get(dominant, dominant)
            )
            
        except Exception as e:
            print(f"[ERRO] FER: {e}")
            return self._analyze_simple(face_roi, face_id)
    
    def _analyze_deepface(
        self, 
        face_roi: np.ndarray, 
        face_id: int
    ) -> Optional[EmotionResult]:
        """Análise usando DeepFace."""
        try:
            result = self.analyzer.analyze(
                face_roi,
                actions=["emotion"],
                enforce_detection=False,
                silent=True
            )
            
            if not result:
                return None
            
            emotion_scores = result[0]["emotion"]
            # Normaliza para 0-1
            emotion_scores = {
                k: v / 100.0 for k, v in emotion_scores.items()
            }
            
            # Aplica suavização temporal
            smoothed_scores = self._smooth_emotions(face_id, emotion_scores)
            
            dominant = max(smoothed_scores, key=smoothed_scores.get)
            confidence = smoothed_scores[dominant]
            
            return EmotionResult(
                face_id=face_id,
                dominant_emotion=dominant,
                emotion_scores=smoothed_scores,
                confidence=confidence,
                emotion_pt=EMOTION_LABELS.get(dominant, dominant)
            )
            
        except Exception as e:
            print(f"[ERRO] DeepFace: {e}")
            return None
    
    def _analyze_simple(
        self, 
        face_roi: np.ndarray, 
        face_id: int
    ) -> EmotionResult:
        """
        Análise simplificada baseada em características básicas.
        Usado como fallback quando bibliotecas principais não estão disponíveis.
        """
        # Análise muito básica baseada em brilho/contraste
        gray = cv2.cvtColor(face_roi, cv2.COLOR_BGR2GRAY)
        mean_brightness = np.mean(gray) / 255.0
        std_contrast = np.std(gray) / 128.0
        
        # Heurística simples (não é precisa, apenas para demonstração)
        if std_contrast > 0.5:
            dominant = "surprise" if mean_brightness > 0.5 else "angry"
        elif mean_brightness > 0.6:
            dominant = "happy"
        elif mean_brightness < 0.4:
            dominant = "sad"
        else:
            dominant = "neutral"
        
        emotion_scores = {
            "angry": 0.1,
            "disgust": 0.05,
            "fear": 0.1,
            "happy": 0.1,
            "sad": 0.1,
            "surprise": 0.1,
            "neutral": 0.45
        }
        emotion_scores[dominant] = 0.6
        
        return EmotionResult(
            face_id=face_id,
            dominant_emotion=dominant,
            emotion_scores=emotion_scores,
            confidence=0.6,
            emotion_pt=EMOTION_LABELS.get(dominant, dominant)
        )
    
    def _smooth_emotions(
        self, 
        face_id: int, 
        current_scores: Dict[str, float]
    ) -> Dict[str, float]:
        """
        Aplica suavização temporal nas emoções para reduzir ruído.
        
        Args:
            face_id: ID do rosto
            current_scores: Scores atuais
            
        Returns:
            Scores suavizados
        """
        # Inicializa histórico se necessário
        if face_id not in self.emotion_history:
            self.emotion_history[face_id] = deque(maxlen=self.temporal_window)
        
        self.emotion_history[face_id].append(current_scores)
        
        # Média ponderada (mais recentes têm mais peso)
        if len(self.emotion_history[face_id]) == 1:
            return current_scores
        
        smoothed = {}
        weights = np.linspace(0.5, 1.0, len(self.emotion_history[face_id]))
        weights /= weights.sum()
        
        for emotion in current_scores.keys():
            values = [h.get(emotion, 0) for h in self.emotion_history[face_id]]
            smoothed[emotion] = float(np.average(values, weights=weights))
        
        return smoothed
    
    def get_emotion_trend(self, face_id: int) -> Optional[str]:
        """
        Analisa a tendência emocional de um rosto.
        
        Returns:
            'increasing', 'decreasing', 'stable' ou None
        """
        if face_id not in self.emotion_history:
            return None
        
        history = list(self.emotion_history[face_id])
        if len(history) < 3:
            return None
        
        # Analisa tendência da emoção dominante
        dominant_values = []
        for h in history:
            dominant = max(h, key=h.get)
            dominant_values.append(h[dominant])
        
        trend = np.polyfit(range(len(dominant_values)), dominant_values, 1)[0]
        
        if trend > 0.05:
            return "increasing"
        elif trend < -0.05:
            return "decreasing"
        return "stable"
    
    def reset_history(self, face_id: Optional[int] = None):
        """Reseta o histórico de emoções."""
        if face_id is not None:
            self.emotion_history.pop(face_id, None)
        else:
            self.emotion_history.clear()
    
    def draw_emotion(
        self,
        frame: np.ndarray,
        bbox: Tuple[int, int, int, int],
        result: EmotionResult,
        color: Tuple[int, int, int] = (255, 255, 0)
    ) -> np.ndarray:
        """
        Desenha a emoção detectada no frame.
        
        Args:
            frame: Imagem BGR
            bbox: Bounding box do rosto
            result: Resultado da análise
            color: Cor do texto (BGR)
            
        Returns:
            Frame anotado
        """
        annotated = frame.copy()
        x, y, w, h = bbox
        
        # Label da emoção
        label = f"{result.emotion_pt} ({result.confidence:.0%})"
        
        # Posição abaixo do rosto
        text_y = y + h + 20
        
        # Fundo do texto
        (text_w, text_h), _ = cv2.getTextSize(
            label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1
        )
        cv2.rectangle(
            annotated,
            (x, text_y - text_h - 5),
            (x + text_w + 4, text_y + 5),
            (0, 0, 0),
            -1
        )
        
        # Texto
        cv2.putText(
            annotated, label,
            (x + 2, text_y),
            cv2.FONT_HERSHEY_SIMPLEX, 0.6,
            color, 1, cv2.LINE_AA
        )
        
        # Barra de emoções (mini gráfico)
        bar_y = text_y + 15
        bar_height = 8
        bar_width = w
        
        for i, (emotion, score) in enumerate(sorted(
            result.emotion_scores.items(), 
            key=lambda x: -x[1]
        )[:3]):  # Top 3 emoções
            bar_x = x
            filled_width = int(bar_width * score)
            
            # Cores diferentes para cada emoção
            emotion_colors = {
                "happy": (0, 255, 0),
                "sad": (255, 0, 0),
                "angry": (0, 0, 255),
                "surprise": (0, 255, 255),
                "fear": (255, 0, 255),
                "disgust": (0, 128, 0),
                "neutral": (128, 128, 128)
            }
            bar_color = emotion_colors.get(emotion, (200, 200, 200))
            
            cv2.rectangle(
                annotated,
                (bar_x, bar_y + i * (bar_height + 2)),
                (bar_x + filled_width, bar_y + i * (bar_height + 2) + bar_height),
                bar_color,
                -1
            )
        
        return annotated



In [None]:
%%writefile /content/TC-4/src/activity_detector.py
"""
Tech Challenge - Fase 4: Detector de Atividades
Usa YOLOv8-pose para detecção de pessoas e análise de poses/atividades.
"""

import cv2
import numpy as np

# Importa torch após numpy para evitar bug de compatibilidade
import torch

from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from collections import deque
from enum import Enum

from .config import ACTIVITY_CATEGORIES


class ActivityType(Enum):
    """Tipos de atividades detectáveis."""
    STANDING = "standing"
    SITTING = "sitting"
    WALKING = "walking"
    RUNNING = "running"
    WAVING = "waving"
    POINTING = "pointing"
    DANCING = "dancing"
    CROUCHING = "crouching"
    ARMS_RAISED = "arms_raised"
    UNKNOWN = "unknown"


@dataclass
class PoseKeypoints:
    """Keypoints de pose (formato COCO - 17 pontos)."""
    nose: Optional[Tuple[float, float]] = None
    left_eye: Optional[Tuple[float, float]] = None
    right_eye: Optional[Tuple[float, float]] = None
    left_ear: Optional[Tuple[float, float]] = None
    right_ear: Optional[Tuple[float, float]] = None
    left_shoulder: Optional[Tuple[float, float]] = None
    right_shoulder: Optional[Tuple[float, float]] = None
    left_elbow: Optional[Tuple[float, float]] = None
    right_elbow: Optional[Tuple[float, float]] = None
    left_wrist: Optional[Tuple[float, float]] = None
    right_wrist: Optional[Tuple[float, float]] = None
    left_hip: Optional[Tuple[float, float]] = None
    right_hip: Optional[Tuple[float, float]] = None
    left_knee: Optional[Tuple[float, float]] = None
    right_knee: Optional[Tuple[float, float]] = None
    left_ankle: Optional[Tuple[float, float]] = None
    right_ankle: Optional[Tuple[float, float]] = None


@dataclass
class ActivityDetection:
    """Resultado da detecção de atividade."""
    person_id: int
    activity: ActivityType
    activity_pt: str
    confidence: float
    bbox: Tuple[int, int, int, int]
    keypoints: Optional[PoseKeypoints] = None
    velocity: float = 0.0


class ActivityDetector:
    """Detector de atividades usando YOLOv8-pose."""
    
    def __init__(
        self, 
        model_size: str = "n",
        min_confidence: float = 0.5,
        history_size: int = 10
    ):
        """
        Args:
            model_size: Tamanho do modelo ('n', 's', 'm', 'l', 'x')
            min_confidence: Confiança mínima para detecção
            history_size: Frames de histórico para análise temporal
        """
        self.min_confidence = min_confidence
        self.history_size = history_size
        self.person_counter = 0
        self.position_history: Dict[int, deque] = {}
        self.pose_history: Dict[int, deque] = {}
        
        self._init_yolo(model_size)
    
    def _init_yolo(self, model_size: str):
        """Inicializa YOLO11-pose (mais preciso que YOLOv8)."""
        try:
            from ultralytics import YOLO
            # Usa YOLO11 (melhor precisão)
            model_name = f"yolo11{model_size}-pose.pt"
            self.model = YOLO(model_name)
            self.model_loaded = True
            print(f"[INFO] Modelo carregado: {model_name}")
        except Exception as e:
            print(f"[AVISO] YOLO11 não disponível, tentando YOLOv8: {e}")
            try:
                from ultralytics import YOLO
                model_name = f"yolov8{model_size}-pose.pt"
                self.model = YOLO(model_name)
                self.model_loaded = True
            except Exception as e2:
                print(f"[ERRO] Falha ao carregar modelo: {e2}")
                self.model = None
                self.model_loaded = False
    
    def detect(self, frame: np.ndarray) -> List[ActivityDetection]:
        """Detecta pessoas e suas atividades no frame."""
        if not self.model_loaded:
            return []
        
        results = self.model(frame, verbose=False, conf=self.min_confidence)
        detections = []
        
        for result in results:
            if result.keypoints is None or result.boxes is None:
                continue
            
            keypoints_data = result.keypoints.xy.cpu().numpy()
            confidences = result.keypoints.conf.cpu().numpy() if result.keypoints.conf is not None else None
            boxes = result.boxes
            
            for i, (kpts, box) in enumerate(zip(keypoints_data, boxes)):
                # Filtra por confiança do box
                box_conf = float(box.conf[0])
                if box_conf < self.min_confidence:
                    continue
                
                # Bounding box
                xyxy = box.xyxy[0].cpu().numpy()
                bbox = (
                    int(xyxy[0]), int(xyxy[1]),
                    int(xyxy[2] - xyxy[0]), int(xyxy[3] - xyxy[1])
                )
                
                # Keypoints
                kpt_conf = confidences[i] if confidences is not None else None
                keypoints = self._extract_keypoints(kpts, kpt_conf)
                
                # ID da pessoa (tracking simples por posição)
                person_id = self._assign_person_id(bbox)
                
                # Analisa atividade
                activity, act_conf = self._analyze_activity(person_id, keypoints)
                
                # Velocidade
                velocity = self._calculate_velocity(person_id, bbox)
                
                detections.append(ActivityDetection(
                    person_id=person_id,
                    activity=activity,
                    activity_pt=ACTIVITY_CATEGORIES.get(activity.value, "Desconhecido"),
                    confidence=act_conf * box_conf,
                    bbox=bbox,
                    keypoints=keypoints,
                    velocity=velocity
                ))
        
        return detections
    
    def _extract_keypoints(
        self, 
        kpts: np.ndarray, 
        conf: Optional[np.ndarray] = None,
        min_kpt_conf: float = 0.3
    ) -> PoseKeypoints:
        """Extrai keypoints do formato YOLO (COCO 17 pontos)."""
        def get_point(idx: int) -> Optional[Tuple[float, float]]:
            if idx >= len(kpts):
                return None
            x, y = kpts[idx]
            if x == 0 and y == 0:
                return None
            if conf is not None and idx < len(conf) and conf[idx] < min_kpt_conf:
                return None
            return (float(x), float(y))
        
        return PoseKeypoints(
            nose=get_point(0),
            left_eye=get_point(1),
            right_eye=get_point(2),
            left_ear=get_point(3),
            right_ear=get_point(4),
            left_shoulder=get_point(5),
            right_shoulder=get_point(6),
            left_elbow=get_point(7),
            right_elbow=get_point(8),
            left_wrist=get_point(9),
            right_wrist=get_point(10),
            left_hip=get_point(11),
            right_hip=get_point(12),
            left_knee=get_point(13),
            right_knee=get_point(14),
            left_ankle=get_point(15),
            right_ankle=get_point(16)
        )
    
    def _assign_person_id(self, bbox: Tuple[int, int, int, int]) -> int:
        """Atribui ID baseado em proximidade com detecções anteriores."""
        cx = bbox[0] + bbox[2] // 2
        cy = bbox[1] + bbox[3] // 2
        
        # Busca pessoa próxima no histórico
        min_dist = float('inf')
        best_id = None
        
        for pid, history in self.position_history.items():
            if history:
                last_pos = history[-1]
                dist = np.sqrt((cx - last_pos[0])**2 + (cy - last_pos[1])**2)
                if dist < min_dist and dist < 100:
                    min_dist = dist
                    best_id = pid
        
        if best_id is None:
            self.person_counter += 1
            best_id = self.person_counter
            self.position_history[best_id] = deque(maxlen=self.history_size)
            self.pose_history[best_id] = deque(maxlen=self.history_size)
        
        self.position_history[best_id].append((cx, cy))
        return best_id
    
    def _calculate_velocity(self, person_id: int, bbox: Tuple[int, int, int, int]) -> float:
        """Calcula velocidade do movimento em pixels/frame."""
        history = self.position_history.get(person_id)
        if not history or len(history) < 2:
            return 0.0
        
        positions = list(history)
        if len(positions) >= 2:
            dx = positions[-1][0] - positions[-2][0]
            dy = positions[-1][1] - positions[-2][1]
            return np.sqrt(dx**2 + dy**2)
        return 0.0
    
    def _analyze_activity(
        self, 
        person_id: int, 
        keypoints: PoseKeypoints
    ) -> Tuple[ActivityType, float]:
        """Analisa atividade baseada em pose e histórico."""
        self.pose_history.setdefault(person_id, deque(maxlen=self.history_size))
        self.pose_history[person_id].append(keypoints)
        
        # 1. Verifica braços levantados (ambos acima da cabeça)
        if self._is_arms_raised(keypoints):
            return ActivityType.ARMS_RAISED, 0.85
        
        # 2. Verifica se está dançando (movimento rítmico + gestos)
        if self._is_dancing(person_id, keypoints):
            return ActivityType.DANCING, 0.8
        
        # 3. Verifica gestos com braços
        if self._is_waving(keypoints):
            return ActivityType.WAVING, 0.8
        
        if self._is_pointing(keypoints):
            return ActivityType.POINTING, 0.75
        
        # 4. Verifica agachado
        if self._is_crouching(keypoints):
            return ActivityType.CROUCHING, 0.75
        
        # 5. Verifica postura sentada (melhorada)
        if self._is_sitting(keypoints):
            return ActivityType.SITTING, 0.75
        
        # 6. Verifica movimento pelas pernas e velocidade
        velocity = self._get_avg_velocity(person_id)
        
        if velocity > 80:
            return ActivityType.RUNNING, 0.8
        elif velocity > 25:
            return ActivityType.WALKING, 0.75
        else:
            return ActivityType.STANDING, 0.7
    
    def _get_avg_velocity(self, person_id: int) -> float:
        """Calcula velocidade média recente."""
        history = self.position_history.get(person_id)
        if not history or len(history) < 3:
            return 0.0
        
        positions = list(history)
        velocities = []
        for i in range(1, len(positions)):
            dx = positions[i][0] - positions[i-1][0]
            dy = positions[i][1] - positions[i-1][1]
            velocities.append(np.sqrt(dx**2 + dy**2))
        
        return np.mean(velocities) if velocities else 0.0
    
    def _is_waving(self, kp: PoseKeypoints) -> bool:
        """Detecta gesto de acenar (mão acima do ombro, cotovelo dobrado)."""
        for wrist, elbow, shoulder in [
            (kp.left_wrist, kp.left_elbow, kp.left_shoulder),
            (kp.right_wrist, kp.right_elbow, kp.right_shoulder)
        ]:
            if all([wrist, elbow, shoulder]):
                # Mão acima do ombro
                if wrist[1] < shoulder[1] - 30:
                    # Cotovelo dobrado (não braço reto)
                    elbow_angle = self._calculate_angle(shoulder, elbow, wrist)
                    if 45 < elbow_angle < 150:
                        return True
        return False
    
    def _is_pointing(self, kp: PoseKeypoints) -> bool:
        """Detecta gesto de apontar (braço estendido horizontalmente)."""
        for wrist, elbow, shoulder in [
            (kp.left_wrist, kp.left_elbow, kp.left_shoulder),
            (kp.right_wrist, kp.right_elbow, kp.right_shoulder)
        ]:
            if all([wrist, elbow, shoulder]):
                # Braço estendido (ângulo > 150)
                arm_angle = self._calculate_angle(shoulder, elbow, wrist)
                if arm_angle > 150:
                    # Horizontalmente (variação vertical pequena)
                    arm_height_diff = abs(wrist[1] - shoulder[1])
                    arm_length = abs(wrist[0] - shoulder[0])
                    if arm_length > 80 and arm_height_diff < 60:
                        return True
        return False
    
    def _is_sitting(self, kp: PoseKeypoints) -> bool:
        """Detecta postura sentada (análise melhorada de ângulos)."""
        # Precisa de quadril e joelho
        if not all([kp.left_hip, kp.right_hip, kp.left_knee, kp.right_knee]):
            return False
        
        hip_y = (kp.left_hip[1] + kp.right_hip[1]) / 2
        knee_y = (kp.left_knee[1] + kp.right_knee[1]) / 2
        
        # Se joelhos estão aproximadamente na mesma altura que quadril = sentado
        hip_knee_diff = abs(hip_y - knee_y)
        
        # Verifica também ângulo do tronco se tiver ombros
        if kp.left_shoulder and kp.right_shoulder:
            shoulder_y = (kp.left_shoulder[1] + kp.right_shoulder[1]) / 2
            torso_length = abs(hip_y - shoulder_y)
            
            # Se diferença quadril-joelho é menor que 40% do tronco = sentado
            if hip_knee_diff < torso_length * 0.5:
                return True
        
        # Fallback: diferença absoluta pequena
        if hip_knee_diff < 80:
            return True
        
        return False
    
    def _is_crouching(self, kp: PoseKeypoints) -> bool:
        """Detecta postura agachada (quadril baixo, joelhos dobrados)."""
        if not all([kp.left_hip, kp.right_hip, kp.left_knee, kp.right_knee]):
            return False
        
        hip_y = (kp.left_hip[1] + kp.right_hip[1]) / 2
        knee_y = (kp.left_knee[1] + kp.right_knee[1]) / 2
        
        # Tornozelos disponíveis para melhor análise
        if kp.left_ankle and kp.right_ankle:
            ankle_y = (kp.left_ankle[1] + kp.right_ankle[1]) / 2
            
            # Agachado: quadril próximo aos joelhos, joelhos acima dos tornozelos
            if hip_y > knee_y - 30 and knee_y < ankle_y:
                return True
        
        return False
    
    def _is_arms_raised(self, kp: PoseKeypoints) -> bool:
        """Detecta ambos os braços levantados acima da cabeça."""
        if not all([kp.left_wrist, kp.right_wrist, kp.nose]):
            return False
        
        # Ambos pulsos acima do nariz
        if kp.left_wrist[1] < kp.nose[1] and kp.right_wrist[1] < kp.nose[1]:
            return True
        
        return False
    
    def _is_dancing(self, person_id: int, kp: PoseKeypoints) -> bool:
        """Detecta dança baseada em movimento rítmico e variação de pose."""
        history = self.pose_history.get(person_id)
        if not history or len(history) < 5:
            return False
        
        # Calcula variação de posição dos braços ao longo do tempo
        wrist_variations = []
        for prev_kp in list(history)[-5:]:
            if prev_kp.left_wrist and kp.left_wrist:
                dx = abs(prev_kp.left_wrist[0] - kp.left_wrist[0])
                dy = abs(prev_kp.left_wrist[1] - kp.left_wrist[1])
                wrist_variations.append(dx + dy)
            if prev_kp.right_wrist and kp.right_wrist:
                dx = abs(prev_kp.right_wrist[0] - kp.right_wrist[0])
                dy = abs(prev_kp.right_wrist[1] - kp.right_wrist[1])
                wrist_variations.append(dx + dy)
        
        # Se há movimento constante dos braços = possível dança
        if wrist_variations:
            avg_variation = np.mean(wrist_variations)
            # Movimento moderado (não parado, não muito rápido)
            if 15 < avg_variation < 80:
                # Verifica se torso também se move
                velocity = self._get_avg_velocity(person_id)
                if 5 < velocity < 40:
                    return True
        
        return False
    
    def _calculate_angle(
        self, 
        p1: Tuple[float, float], 
        p2: Tuple[float, float], 
        p3: Tuple[float, float]
    ) -> float:
        """Calcula ângulo entre três pontos (p2 é o vértice)."""
        v1 = np.array([p1[0] - p2[0], p1[1] - p2[1]])
        v2 = np.array([p3[0] - p2[0], p3[1] - p2[1]])
        
        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-6)
        angle = np.arccos(np.clip(cos_angle, -1, 1))
        
        return np.degrees(angle)
    
    def reset(self):
        """Reseta estado do detector."""
        self.person_counter = 0
        self.position_history.clear()
        self.pose_history.clear()



In [None]:
%%writefile /content/TC-4/src/anomaly_detector.py
"""
Tech Challenge - Fase 4: Detector de Anomalias
Módulo responsável pela detecção de comportamentos anômalos no vídeo.
Anomalias incluem: movimentos bruscos, mudanças emocionais súbitas, padrões atípicos.
"""

import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
from collections import deque
from enum import Enum
from datetime import datetime


class AnomalyType(Enum):
    """Tipos de anomalias detectáveis."""
    SUDDEN_MOVEMENT = "sudden_movement"
    EMOTION_SPIKE = "emotion_spike"
    UNUSUAL_ACTIVITY = "unusual_activity"
    CROWD_ANOMALY = "crowd_anomaly"
    PROLONGED_INACTIVITY = "prolonged_inactivity"


@dataclass
class AnomalyEvent:
    """Representa um evento anômalo detectado."""
    anomaly_type: AnomalyType
    timestamp: float  # Segundos desde o início do vídeo
    frame_number: int
    person_id: Optional[int]
    severity: float  # 0.0 a 1.0
    description: str
    bbox: Optional[Tuple[int, int, int, int]] = None
    details: Dict = field(default_factory=dict)


@dataclass
class PersonMetrics:
    """Métricas acumuladas de uma pessoa para análise de anomalias."""
    position_history: deque = field(default_factory=lambda: deque(maxlen=30))
    emotion_history: deque = field(default_factory=lambda: deque(maxlen=30))
    activity_history: deque = field(default_factory=lambda: deque(maxlen=30))
    velocity_history: deque = field(default_factory=lambda: deque(maxlen=30))
    last_seen_frame: int = 0
    frames_inactive: int = 0


class AnomalyDetector:
    """
    Detector de anomalias comportamentais.
    Analisa padrões de movimento, emoção e atividade para identificar comportamentos atípicos.
    """
    
    def __init__(
        self,
        sudden_movement_threshold: float = 80.0,
        emotion_change_threshold: float = 0.5,
        inactivity_threshold: int = 90,  # frames
        fps: float = 30.0
    ):
        """
        Inicializa o detector de anomalias.
        
        Args:
            sudden_movement_threshold: Limiar de velocidade para movimento brusco (pixels/frame)
            emotion_change_threshold: Limiar de mudança emocional (0-1)
            inactivity_threshold: Frames de inatividade para considerar anomalia
            fps: Frames por segundo do vídeo
        """
        self.sudden_movement_threshold = sudden_movement_threshold
        self.emotion_change_threshold = emotion_change_threshold
        self.inactivity_threshold = inactivity_threshold
        self.fps = fps
        
        # Métricas por pessoa
        self.person_metrics: Dict[int, PersonMetrics] = {}
        
        # Histórico de anomalias
        self.anomaly_history: List[AnomalyEvent] = []
        
        # Estatísticas globais para baseline
        self.global_velocity_mean = 0.0
        self.global_velocity_std = 1.0
        self.velocity_samples: deque = deque(maxlen=1000)
        
        # Contadores
        self.frame_count = 0
        self.total_detections = 0
    
    def update(
        self,
        frame_number: int,
        face_detections: List,
        emotion_results: List,
        activity_detections: List
    ) -> List[AnomalyEvent]:
        """
        Atualiza o detector com novos dados e retorna anomalias detectadas.
        
        Args:
            frame_number: Número do frame atual
            face_detections: Lista de detecções de rostos
            emotion_results: Lista de resultados de análise emocional
            activity_detections: Lista de atividades detectadas
            
        Returns:
            Lista de anomalias detectadas neste frame
        """
        self.frame_count = frame_number
        anomalies = []
        
        # Atualiza métricas de cada pessoa
        seen_persons = set()
        
        # Processa detecções de face/emoção
        for face, emotion in zip(face_detections, emotion_results):
            if emotion is None:
                continue
            
            person_id = face.face_id
            seen_persons.add(person_id)
            
            self._ensure_person_metrics(person_id)
            metrics = self.person_metrics[person_id]
            
            # Atualiza histórico
            center = np.array([
                face.bbox[0] + face.bbox[2]/2,
                face.bbox[1] + face.bbox[3]/2
            ])
            metrics.position_history.append(center)
            metrics.emotion_history.append(emotion.emotion_scores.copy())
            metrics.last_seen_frame = frame_number
            metrics.frames_inactive = 0
            
            # Detecta anomalias de emoção
            emotion_anomaly = self._check_emotion_anomaly(person_id, emotion, frame_number)
            if emotion_anomaly:
                emotion_anomaly.bbox = face.bbox
                anomalies.append(emotion_anomaly)
        
        # Processa detecções de atividade
        for activity in activity_detections:
            person_id = activity.person_id
            seen_persons.add(person_id)
            
            self._ensure_person_metrics(person_id)
            metrics = self.person_metrics[person_id]
            
            # Atualiza histórico
            metrics.activity_history.append(activity.activity.value)
            metrics.velocity_history.append(activity.velocity)
            
            # Atualiza estatísticas globais
            self.velocity_samples.append(activity.velocity)
            if len(self.velocity_samples) > 100:
                self.global_velocity_mean = np.mean(self.velocity_samples)
                self.global_velocity_std = max(np.std(self.velocity_samples), 1.0)
            
            # Detecta anomalias de movimento
            movement_anomaly = self._check_movement_anomaly(person_id, activity, frame_number)
            if movement_anomaly:
                anomalies.append(movement_anomaly)
            
            # Detecta anomalias de atividade
            activity_anomaly = self._check_activity_anomaly(person_id, activity, frame_number)
            if activity_anomaly:
                anomalies.append(activity_anomaly)
        
        # Verifica inatividade prolongada
        for person_id, metrics in self.person_metrics.items():
            if person_id not in seen_persons:
                metrics.frames_inactive += 1
                
                if metrics.frames_inactive == self.inactivity_threshold:
                    anomalies.append(AnomalyEvent(
                        anomaly_type=AnomalyType.PROLONGED_INACTIVITY,
                        timestamp=frame_number / self.fps,
                        frame_number=frame_number,
                        person_id=person_id,
                        severity=0.4,
                        description=f"Pessoa #{person_id} desapareceu por {self.inactivity_threshold} frames"
                    ))
        
        # Registra anomalias no histórico
        self.anomaly_history.extend(anomalies)
        self.total_detections += len(face_detections) + len(activity_detections)
        
        return anomalies
    
    def _ensure_person_metrics(self, person_id: int):
        """Garante que métricas existam para a pessoa."""
        if person_id not in self.person_metrics:
            self.person_metrics[person_id] = PersonMetrics()
    
    def _check_emotion_anomaly(
        self,
        person_id: int,
        emotion_result,
        frame_number: int
    ) -> Optional[AnomalyEvent]:
        """Verifica anomalias de mudança emocional."""
        metrics = self.person_metrics[person_id]
        history = metrics.emotion_history
        
        if len(history) < 3:
            return None
        
        # Calcula mudança emocional
        prev_scores = history[-2]
        curr_scores = emotion_result.emotion_scores
        
        change = 0.0
        for emotion in curr_scores:
            prev_val = prev_scores.get(emotion, 0)
            curr_val = curr_scores.get(emotion, 0)
            change += abs(curr_val - prev_val)
        
        change /= len(curr_scores)
        
        if change > self.emotion_change_threshold:
            # Identifica qual emoção mudou mais
            max_change_emotion = max(
                curr_scores.keys(),
                key=lambda e: abs(curr_scores[e] - prev_scores.get(e, 0))
            )
            
            return AnomalyEvent(
                anomaly_type=AnomalyType.EMOTION_SPIKE,
                timestamp=frame_number / self.fps,
                frame_number=frame_number,
                person_id=person_id,
                severity=min(change / self.emotion_change_threshold, 1.0),
                description=f"Mudança emocional brusca para '{max_change_emotion}'",
                details={
                    "emotion": max_change_emotion,
                    "change_magnitude": change,
                    "previous_dominant": max(prev_scores, key=prev_scores.get),
                    "current_dominant": emotion_result.dominant_emotion
                }
            )
        
        return None
    
    def _check_movement_anomaly(
        self,
        person_id: int,
        activity_detection,
        frame_number: int
    ) -> Optional[AnomalyEvent]:
        """Verifica anomalias de movimento brusco."""
        velocity = activity_detection.velocity
        
        # Usa threshold absoluto e relativo
        abs_threshold = self.sudden_movement_threshold
        rel_threshold = self.global_velocity_mean + 3 * self.global_velocity_std
        
        threshold = max(abs_threshold, rel_threshold)
        
        if velocity > threshold:
            severity = min((velocity - threshold) / threshold + 0.5, 1.0)
            
            return AnomalyEvent(
                anomaly_type=AnomalyType.SUDDEN_MOVEMENT,
                timestamp=frame_number / self.fps,
                frame_number=frame_number,
                person_id=person_id,
                severity=severity,
                description=f"Movimento brusco detectado (velocidade: {velocity:.1f} px/frame)",
                bbox=activity_detection.bbox,
                details={
                    "velocity": velocity,
                    "threshold": threshold,
                    "activity": activity_detection.activity.value
                }
            )
        
        return None
    
    def _check_activity_anomaly(
        self,
        person_id: int,
        activity_detection,
        frame_number: int
    ) -> Optional[AnomalyEvent]:
        """Verifica anomalias de padrão de atividade."""
        metrics = self.person_metrics[person_id]
        history = list(metrics.activity_history)
        
        if len(history) < 10:
            return None
        
        current_activity = activity_detection.activity.value
        
        # Conta frequência de cada atividade no histórico
        activity_counts = {}
        for act in history[:-1]:  # Exclui o atual
            activity_counts[act] = activity_counts.get(act, 0) + 1
        
        total = len(history) - 1
        if total == 0:
            return None
        
        # Calcula probabilidade da atividade atual baseado no histórico
        current_freq = activity_counts.get(current_activity, 0) / total
        
        # Se atividade atual é muito rara no histórico (< 5%), é anômala
        if current_freq < 0.05 and len(set(history)) > 2:
            most_common = max(activity_counts, key=activity_counts.get)
            
            return AnomalyEvent(
                anomaly_type=AnomalyType.UNUSUAL_ACTIVITY,
                timestamp=frame_number / self.fps,
                frame_number=frame_number,
                person_id=person_id,
                severity=0.5,
                description=f"Atividade incomum: '{current_activity}' (usual: '{most_common}')",
                bbox=activity_detection.bbox,
                details={
                    "activity": current_activity,
                    "frequency": current_freq,
                    "usual_activity": most_common
                }
            )
        
        return None
    
    def get_statistics(self) -> Dict:
        """Retorna estatísticas do detector."""
        anomaly_counts = {}
        for anomaly in self.anomaly_history:
            atype = anomaly.anomaly_type.value
            anomaly_counts[atype] = anomaly_counts.get(atype, 0) + 1
        
        severity_avg = 0.0
        if self.anomaly_history:
            severity_avg = np.mean([a.severity for a in self.anomaly_history])
        
        return {
            "total_frames": self.frame_count,
            "total_anomalies": len(self.anomaly_history),
            "anomalies_by_type": anomaly_counts,
            "average_severity": severity_avg,
            "persons_tracked": len(self.person_metrics),
            "global_velocity_mean": self.global_velocity_mean,
            "global_velocity_std": self.global_velocity_std
        }
    
    def get_anomalies_summary(self) -> List[Dict]:
        """Retorna resumo das anomalias para relatório."""
        summary = []
        for anomaly in self.anomaly_history:
            summary.append({
                "tipo": anomaly.anomaly_type.value,
                "timestamp": f"{anomaly.timestamp:.2f}s",
                "frame": anomaly.frame_number,
                "pessoa_id": anomaly.person_id,
                "severidade": f"{anomaly.severity:.0%}",
                "descricao": anomaly.description,
                "detalhes": anomaly.details
            })
        return summary
    
    def reset(self):
        """Reseta o estado do detector."""
        self.person_metrics.clear()
        self.anomaly_history.clear()
        self.velocity_samples.clear()
        self.frame_count = 0
        self.total_detections = 0
        self.global_velocity_mean = 0.0
        self.global_velocity_std = 1.0


def draw_anomaly(
    frame: np.ndarray,
    anomaly: AnomalyEvent,
    color: Tuple[int, int, int] = (0, 0, 255)
) -> np.ndarray:
    """
    Desenha indicação de anomalia no frame.
    
    Args:
        frame: Imagem BGR
        anomaly: Evento de anomalia
        color: Cor do indicador (BGR)
        
    Returns:
        Frame anotado
    """
    import cv2
    annotated = frame.copy()
    h, w = frame.shape[:2]
    
    # Se tem bbox, destaca a região
    if anomaly.bbox:
        x, y, bw, bh = anomaly.bbox
        
        # Borda pulsante (mais grossa para severidade maior)
        thickness = int(2 + anomaly.severity * 4)
        cv2.rectangle(annotated, (x, y), (x+bw, y+bh), color, thickness)
        
        # Ícone de alerta
        alert_x = x + bw - 25
        alert_y = y + 5
        cv2.putText(
            annotated, "!",
            (alert_x, alert_y + 20),
            cv2.FONT_HERSHEY_SIMPLEX, 0.8,
            color, 2, cv2.LINE_AA
        )
    
    # Banner de anomalia no topo
    banner_text = f"ANOMALIA: {anomaly.description}"
    (text_w, text_h), _ = cv2.getTextSize(
        banner_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1
    )
    
    # Fundo semi-transparente
    overlay = annotated.copy()
    cv2.rectangle(overlay, (0, 0), (w, text_h + 20), color, -1)
    cv2.addWeighted(overlay, 0.7, annotated, 0.3, 0, annotated)
    
    # Texto
    cv2.putText(
        annotated, banner_text,
        (10, text_h + 10),
        cv2.FONT_HERSHEY_SIMPLEX, 0.6,
        (255, 255, 255), 1, cv2.LINE_AA
    )
    
    return annotated



In [None]:
%%writefile /content/TC-4/src/report_generator.py
"""
Tech Challenge - Fase 4: Gerador de Relatório
Módulo responsável pela geração automática do relatório de análise do vídeo.
Suporta geração com LLM (OpenAI) ou template local.
"""

import os
import json
from typing import Dict, List, Optional
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass

from .config import OPENAI_API_KEY, OPENAI_MODEL, REPORTS_DIR, EMOTION_LABELS


@dataclass
class VideoAnalysisResult:
    """Resultado consolidado da análise de vídeo."""
    video_path: str
    total_frames: int
    fps: float
    duration_seconds: float
    total_faces_detected: int
    unique_faces: int
    emotions_summary: Dict[str, int]
    activities_summary: Dict[str, int]
    total_anomalies: int
    anomalies_by_type: Dict[str, int]
    anomaly_events: List[Dict]
    processing_time_seconds: float


class ReportGenerator:
    """
    Gerador de relatórios de análise de vídeo.
    Suporta geração com LLM para resumos mais elaborados.
    """
    
    def __init__(self, use_llm: bool = True):
        """
        Inicializa o gerador de relatórios.
        
        Args:
            use_llm: Se True, usa LLM para gerar resumos (requer OPENAI_API_KEY)
        """
        self.use_llm = use_llm and bool(OPENAI_API_KEY)
        self.llm = None
        
        if self.use_llm:
            self._init_llm()
    
    def _init_llm(self):
        """Inicializa o LLM para geração de resumos."""
        try:
            from langchain_openai import ChatOpenAI
            from langchain.prompts import PromptTemplate
            
            self.llm = ChatOpenAI(model=OPENAI_MODEL, temperature=0.3)
            self.PromptTemplate = PromptTemplate
        except ImportError:
            print("[AVISO] LangChain não instalado, usando geração de relatório sem LLM")
            self.use_llm = False
    
    def generate(
        self,
        analysis_result: VideoAnalysisResult,
        output_path: Optional[str] = None
    ) -> str:
        """
        Gera o relatório de análise.
        
        Args:
            analysis_result: Resultado da análise do vídeo
            output_path: Caminho para salvar o relatório (opcional)
            
        Returns:
            Texto do relatório gerado
        """
        # Gera seções do relatório
        header = self._generate_header(analysis_result)
        statistics = self._generate_statistics(analysis_result)
        emotions_section = self._generate_emotions_section(analysis_result)
        activities_section = self._generate_activities_section(analysis_result)
        anomalies_section = self._generate_anomalies_section(analysis_result)
        
        # Gera resumo executivo
        if self.use_llm:
            summary = self._generate_llm_summary(analysis_result)
        else:
            summary = self._generate_template_summary(analysis_result)
        
        # Monta relatório completo
        report = f"""
{header}

## Resumo Executivo
{summary}

{statistics}

{emotions_section}

{activities_section}

{anomalies_section}

---
*Relatório gerado automaticamente em {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}*
"""
        
        # Salva se caminho fornecido
        if output_path:
            Path(output_path).write_text(report, encoding='utf-8')
            print(f"[INFO] Relatório salvo em: {output_path}")
        
        return report
    
    def _generate_header(self, result: VideoAnalysisResult) -> str:
        """Gera cabeçalho do relatório."""
        video_name = Path(result.video_path).name
        return f"""# Relatório de Análise de Vídeo
## Tech Challenge - Fase 4

**Arquivo**: `{video_name}`  
**Duração**: {result.duration_seconds:.1f} segundos ({result.total_frames} frames @ {result.fps:.1f} FPS)  
**Data da Análise**: {datetime.now().strftime('%d/%m/%Y %H:%M')}  
**Tempo de Processamento**: {result.processing_time_seconds:.1f} segundos
"""
    
    def _generate_statistics(self, result: VideoAnalysisResult) -> str:
        """Gera seção de estatísticas."""
        return f"""## Estatísticas Gerais

| Métrica | Valor |
|---------|-------|
| Total de Frames Analisados | {result.total_frames} |
| Rostos Detectados (total) | {result.total_faces_detected} |
| Pessoas Únicas Identificadas | {result.unique_faces} |
| Anomalias Detectadas | {result.total_anomalies} |
"""
    
    def _generate_emotions_section(self, result: VideoAnalysisResult) -> str:
        """Gera seção de análise de emoções."""
        if not result.emotions_summary:
            return "## Análise de Emoções\n\n*Nenhuma emoção detectada.*"
        
        # Traduz e ordena emoções
        emotions_translated = {}
        for emotion, count in result.emotions_summary.items():
            emotion_pt = EMOTION_LABELS.get(emotion, emotion)
            emotions_translated[emotion_pt] = count
        
        sorted_emotions = sorted(
            emotions_translated.items(), 
            key=lambda x: -x[1]
        )
        
        table = "| Emoção | Frequência |\n|--------|------------|\n"
        for emotion, count in sorted_emotions:
            table += f"| {emotion} | {count} |\n"
        
        # Emoção dominante
        dominant = sorted_emotions[0][0] if sorted_emotions else "N/A"
        
        return f"""## Análise de Emoções

**Emoção Predominante**: {dominant}

{table}
"""
    
    def _generate_activities_section(self, result: VideoAnalysisResult) -> str:
        """Gera seção de análise de atividades."""
        if not result.activities_summary:
            return "## Detecção de Atividades\n\n*Nenhuma atividade detectada.*"
        
        sorted_activities = sorted(
            result.activities_summary.items(),
            key=lambda x: -x[1]
        )
        
        table = "| Atividade | Frequência |\n|-----------|------------|\n"
        for activity, count in sorted_activities:
            table += f"| {activity} | {count} |\n"
        
        return f"""## Detecção de Atividades

{table}
"""
    
    def _generate_anomalies_section(self, result: VideoAnalysisResult) -> str:
        """Gera seção de anomalias."""
        if result.total_anomalies == 0:
            return "## Anomalias Detectadas\n\n*Nenhuma anomalia detectada durante a análise.*"
        
        # Tabela por tipo
        type_table = "| Tipo de Anomalia | Quantidade |\n|------------------|------------|\n"
        for atype, count in result.anomalies_by_type.items():
            type_table += f"| {atype} | {count} |\n"
        
        # Lista detalhada (limitada a 20 eventos)
        events_list = ""
        for i, event in enumerate(result.anomaly_events[:20], 1):
            events_list += f"\n### {i}. {event['tipo'].replace('_', ' ').title()}\n"
            events_list += f"- **Timestamp**: {event['timestamp']}\n"
            events_list += f"- **Frame**: {event['frame']}\n"
            events_list += f"- **Severidade**: {event['severidade']}\n"
            events_list += f"- **Descrição**: {event['descricao']}\n"
        
        more_events = ""
        if len(result.anomaly_events) > 20:
            more_events = f"\n*... e mais {len(result.anomaly_events) - 20} eventos.*"
        
        return f"""## Anomalias Detectadas

**Total**: {result.total_anomalies} anomalias

### Distribuição por Tipo

{type_table}

### Detalhamento dos Eventos
{events_list}{more_events}
"""
    
    def _generate_llm_summary(self, result: VideoAnalysisResult) -> str:
        """Gera resumo usando LLM."""
        if not self.llm:
            return self._generate_template_summary(result)
        
        # Prepara dados para o prompt
        data = {
            "duracao": f"{result.duration_seconds:.1f}s",
            "frames": result.total_frames,
            "pessoas": result.unique_faces,
            "emocoes": result.emotions_summary,
            "atividades": result.activities_summary,
            "anomalias": result.total_anomalies,
            "tipos_anomalia": result.anomalies_by_type
        }
        
        prompt = self.PromptTemplate.from_template("""
Você é um analista de segurança e comportamento. Com base nos dados de análise de vídeo abaixo,
escreva um resumo executivo em português brasileiro de 3-4 parágrafos.

Dados da análise:
- Duração do vídeo: {duracao}
- Total de frames: {frames}
- Pessoas identificadas: {pessoas}
- Emoções detectadas: {emocoes}
- Atividades detectadas: {atividades}
- Número de anomalias: {anomalias}
- Tipos de anomalia: {tipos_anomalia}

O resumo deve:
1. Descrever o cenário geral observado no vídeo
2. Destacar as emoções e atividades predominantes
3. Comentar sobre as anomalias detectadas (se houver)
4. Fornecer insights relevantes sobre o comportamento observado

Resumo:
""")
        
        try:
            chain = prompt | self.llm
            response = chain.invoke(data)
            return response.content.strip()
        except Exception as e:
            print(f"[ERRO] LLM: {e}")
            return self._generate_template_summary(result)
    
    def _generate_template_summary(self, result: VideoAnalysisResult) -> str:
        """Gera resumo usando template (sem LLM)."""
        # Emoção dominante
        dominant_emotion = "N/A"
        if result.emotions_summary:
            dominant_emotion = EMOTION_LABELS.get(
                max(result.emotions_summary, key=result.emotions_summary.get),
                "Desconhecida"
            )
        
        # Atividade dominante
        dominant_activity = "N/A"
        if result.activities_summary:
            dominant_activity = max(
                result.activities_summary, 
                key=result.activities_summary.get
            )
        
        # Monta resumo
        summary = f"""O vídeo analisado tem duração de {result.duration_seconds:.1f} segundos e contém {result.total_frames} frames.

Durante a análise, foram identificadas **{result.unique_faces} pessoa(s)** com um total de **{result.total_faces_detected} detecções de rostos**. A emoção predominante observada foi **{dominant_emotion}**, enquanto a atividade mais frequente foi **{dominant_activity}**.

"""
        
        if result.total_anomalies > 0:
            summary += f"""Foram detectadas **{result.total_anomalies} anomalias** comportamentais durante o vídeo. """
            
            if result.anomalies_by_type:
                types_desc = ", ".join([
                    f"{count} {atype.replace('_', ' ')}"
                    for atype, count in result.anomalies_by_type.items()
                ])
                summary += f"Os tipos identificados incluem: {types_desc}."
        else:
            summary += "Não foram detectadas anomalias significativas durante o período analisado."
        
        return summary
    
    def save_json_report(
        self,
        analysis_result: VideoAnalysisResult,
        output_path: str
    ):
        """Salva relatório em formato JSON."""
        data = {
            "video_path": analysis_result.video_path,
            "total_frames": analysis_result.total_frames,
            "fps": analysis_result.fps,
            "duration_seconds": analysis_result.duration_seconds,
            "total_faces_detected": analysis_result.total_faces_detected,
            "unique_faces": analysis_result.unique_faces,
            "emotions_summary": analysis_result.emotions_summary,
            "activities_summary": analysis_result.activities_summary,
            "total_anomalies": analysis_result.total_anomalies,
            "anomalies_by_type": analysis_result.anomalies_by_type,
            "anomaly_events": analysis_result.anomaly_events,
            "processing_time_seconds": analysis_result.processing_time_seconds,
            "generated_at": datetime.now().isoformat()
        }
        
        Path(output_path).write_text(
            json.dumps(data, indent=2, ensure_ascii=False),
            encoding='utf-8'
        )
        print(f"[INFO] Relatório JSON salvo em: {output_path}")



In [None]:
%%writefile /content/TC-4/src/visualizer.py
"""
Tech Challenge - Fase 4: Módulo de Visualização
Funções para desenhar detecções e anotações em frames.
"""

import cv2
import numpy as np
from typing import List, Tuple, Optional
from PIL import Image as PILImage, ImageDraw, ImageFont

from .face_detector import FaceDetection
from .emotion_analyzer import EmotionResult
from .activity_detector import ActivityDetection
from .anomaly_detector import AnomalyEvent


# Cores padrão (RGB para PIL)
COLORS = {
    "face": (0, 255, 0),       # Verde
    "emotion": (0, 255, 255),  # Ciano
    "activity": (255, 165, 0), # Laranja
    "anomaly": (255, 0, 0),    # Vermelho
    "text": (255, 255, 255),   # Branco
}


def _get_font(size: int = 20) -> ImageFont.FreeTypeFont:
    """Obtém fonte com suporte a UTF-8."""
    font_paths = [
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/TTF/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
    ]
    for path in font_paths:
        try:
            return ImageFont.truetype(path, size)
        except (IOError, OSError):
            continue
    return ImageFont.load_default()


def put_text(
    img: np.ndarray, 
    text: str, 
    position: Tuple[int, int], 
    font_size: int = 20, 
    color: Tuple[int, int, int] = (255, 255, 255)
) -> np.ndarray:
    """Adiciona texto com suporte a UTF-8 usando PIL."""
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    pil_img = PILImage.fromarray(img_rgb)
    draw = ImageDraw.Draw(pil_img)
    font = _get_font(font_size)
    
    x, y = position
    # Borda preta para contraste
    for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
        draw.text((x + dx, y + dy), text, font=font, fill=(0, 0, 0))
    draw.text(position, text, font=font, fill=color)
    
    return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)


def draw_detections(
    frame: np.ndarray,
    faces: List[FaceDetection],
    emotions: List[Optional[EmotionResult]],
    activities: List[ActivityDetection],
    anomalies: List[AnomalyEvent],
    min_face_size: int = 40,
    min_emotion_conf: float = 0.3,
    min_activity_conf: float = 0.4
) -> np.ndarray:
    """
    Desenha todas as detecções no frame.
    
    Args:
        frame: Frame BGR
        faces: Lista de detecções de faces
        emotions: Lista de resultados de emoções (pode ter None)
        activities: Lista de detecções de atividades
        anomalies: Lista de anomalias detectadas
        min_face_size: Tamanho mínimo de face para exibir
        min_emotion_conf: Confiança mínima para exibir emoção
        min_activity_conf: Confiança mínima para exibir atividade
    
    Returns:
        Frame anotado
    """
    annotated = frame.copy()
    h, w = frame.shape[:2]
    
    # Filtra faces válidas
    valid_faces = [f for f in faces if _is_valid_face(f, w, h, min_face_size)]
    
    # Desenha faces
    for i, face in enumerate(valid_faces):
        x, y, fw, fh = face.bbox
        cv2.rectangle(annotated, (x, y), (x + fw, y + fh), (0, 255, 0), 2)
        annotated = put_text(annotated, f"ID:{face.face_id}", (x, max(0, y - 25)), 18, COLORS["face"])
        
        # Emoção correspondente
        if i < len(emotions) and emotions[i] is not None:
            emotion = emotions[i]
            if emotion.confidence >= min_emotion_conf:
                text = f"{emotion.emotion_pt}: {emotion.confidence:.0%}"
                annotated = put_text(annotated, text, (x, y + fh + 5), 16, COLORS["emotion"])
    
    # Desenha atividades (apenas de pessoas detectadas pelo YOLO)
    for activity in activities:
        if activity.confidence < min_activity_conf:
            continue
        if activity.bbox:
            ax, ay, aw, ah = activity.bbox
            # Desenha bbox da pessoa (azul)
            cv2.rectangle(annotated, (ax, ay), (ax + aw, ay + ah), (255, 100, 0), 1)
            annotated = put_text(annotated, activity.activity_pt, (ax, max(0, ay - 10)), 18, COLORS["activity"])
    
    # Indicador de anomalias
    if anomalies:
        annotated = put_text(annotated, f"⚠ {len(anomalies)} ANOMALIA(S)", (10, 10), 24, COLORS["anomaly"])
    
    return annotated


def _is_valid_face(
    face: FaceDetection, 
    frame_w: int, 
    frame_h: int, 
    min_size: int = 40
) -> bool:
    """Valida se uma detecção de face é plausível."""
    x, y, w, h = face.bbox
    
    # Tamanho mínimo
    if w < min_size or h < min_size:
        return False
    
    # Proporção válida (rostos são aproximadamente quadrados)
    aspect = w / h if h > 0 else 0
    if aspect < 0.5 or aspect > 2.0:
        return False
    
    # Dentro dos limites
    if x < 0 or y < 0 or x + w > frame_w or y + h > frame_h:
        return False
    
    return True


def show_frame(frame: np.ndarray, title: str = "Frame", figsize: Tuple[int, int] = (12, 8)):
    """Exibe um frame no notebook usando matplotlib."""
    import matplotlib.pyplot as plt
    plt.figure(figsize=figsize)
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    plt.imshow(rgb_frame)
    plt.title(title)
    plt.axis('off')
    plt.tight_layout()
    plt.show()



In [None]:
%%writefile /content/TC-4/src/__init__.py
# Tech Challenge - Fase 4
# Análise de Vídeo com Reconhecimento Facial, Emoções e Atividades
from .config import VIDEO_PATH, OUTPUT_DIR, REPORTS_DIR, INPUT_DIR
from .face_detector import FaceDetector, FaceDetection
from .emotion_analyzer import EmotionAnalyzer, EmotionResult
from .activity_detector import ActivityDetector, ActivityDetection, ActivityType
from .anomaly_detector import AnomalyDetector, AnomalyEvent
from .report_generator import ReportGenerator
from .visualizer import draw_detections, put_text, show_frame

__all__ = [
    "VIDEO_PATH", "OUTPUT_DIR", "REPORTS_DIR", "INPUT_DIR",
    "FaceDetector", "FaceDetection",
    "EmotionAnalyzer", "EmotionResult", 
    "ActivityDetector", "ActivityDetection", "ActivityType",
    "AnomalyDetector", "AnomalyEvent",
    "ReportGenerator",
    "draw_detections", "put_text", "show_frame",
]


In [None]:
# Recarrega módulos para garantir que as alterações tenham efeito
import importlib
try:
    import src.config
    importlib.reload(src.config)
except ImportError:
    pass


In [None]:

import cv2
import time
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter, deque
from IPython.display import display, HTML, Video
import base64

# Importa módulos do projeto
from src import (
    OUTPUT_DIR, REPORTS_DIR,
    FaceDetector, EmotionAnalyzer, ActivityDetector, 
    AnomalyDetector, ReportGenerator,
    draw_detections, show_frame
)
from src.config import ACTIVITY_CATEGORIES, EMOTION_LABELS, FRAME_SKIP

# Define vídeo (usa o encontrado ou padrão)
VIDEO_PATH_USE = globals().get('VIDEO_PATH_FOUND')
if not VIDEO_PATH_USE:
    print("ERRO: Nenhum vídeo definido. Por favor, faça upload de um vídeo em /content/TC-4/input/")
else:
    print(f"Iniciando análise de: {VIDEO_PATH_USE}")

# Configurações locais
CONFIG = {
    "detection_interval": FRAME_SKIP,
    "min_face_size": 40,
    "output_video": str(OUTPUT_DIR / "video_analisado.mp4")
}

# Inicializa Detectores
print("Carregando modelos IA...")
face_detector = FaceDetector(method="mediapipe") # MediaPipe é leve e bom para Colab
emotion_analyzer = EmotionAnalyzer(method="fer") # FER funciona bem no Colab com GPU/CPU
activity_detector = ActivityDetector(model_size="s") # Usando Small para balancear
anomaly_detector = AnomalyDetector()

print("Modelos carregados!")

if VIDEO_PATH_USE:
    # Abre vídeo
    cap = cv2.VideoCapture(VIDEO_PATH_USE)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    print(f"Vídeo: {width}x{height} @ {fps:.1f} fps ({total_frames} frames)")

    # Configura Gravador de Vídeo
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(CONFIG["output_video"], fourcc, fps, (width, height))

    # Estatísticas
    stats = {
        "faces": 0,
        "emotions": Counter(),
        "activities": Counter(),
        "anomalies": Counter()
    }

    # Cache para frames intermediários
    cache = {"faces": [], "emotions": [], "activities": [], "anomalies": []}

    frame_idx = 0
    start_time = time.time()

    print(f"Processando {total_frames} frames...")

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detecção completa a cada N frames
        if frame_idx % CONFIG["detection_interval"] == 0:
            # Faces
            cache["faces"] = face_detector.detect(frame)
            stats["faces"] += len(cache["faces"])
            
            # Emoções
            cache["emotions"] = []
            for face in cache["faces"]:
                if face.bbox[2] >= CONFIG["min_face_size"]:
                    em = emotion_analyzer.analyze(frame, face.bbox, face.face_id)
                    cache["emotions"].append(em)
                    if em:
                        stats["emotions"][em.emotion_pt] += 1
                else:
                    cache["emotions"].append(None)
            
            # Atividades (YOLO)
            cache["activities"] = activity_detector.detect(frame)
            for act in cache["activities"]:
                stats["activities"][act.activity_pt] += 1
            
            # Anomalias
            cache["anomalies"] = anomaly_detector.update(
                frame_idx, cache["faces"], 
                [e for e in cache["emotions"] if e],
                cache["activities"]
            )
            for a in cache["anomalies"]:
                stats["anomalies"][a.anomaly_type] += 1
        
        # Desenha e salva
        annotated = draw_detections(
            frame, cache["faces"], cache["emotions"],
            cache["activities"], cache["anomalies"],
            CONFIG["min_face_size"]
        )
        out.write(annotated)
        
        # Progresso a cada 10%
        if frame_idx % (total_frames // 10) == 0 and frame_idx > 0:
            pct = frame_idx / total_frames * 100
            elapsed = time.time() - start_time
            fps_proc = frame_idx / elapsed
            print(f"  {pct:.0f}% - FPS: {fps_proc:.1f}")
        
        frame_idx += 1

    cap.release()
    out.release()

    elapsed = time.time() - start_time
    print(f"\nConcluído em {elapsed:.1f}s ({frame_idx/elapsed:.1f} fps)")
    print(f"Vídeo salvo: {CONFIG['output_video']}")


In [None]:
# Exibe Estatísticas
print("=" * 50)
print("ESTATÍSTICAS DA ANÁLISE")
print("=" * 50)
print(f"\nFaces detectadas (total accum): {stats['faces']}")
print(f"\nEmoções (top 5):")
for em, count in stats["emotions"].most_common(5):
    print(f"  - {em}: {count}")
print(f"\nAtividades (top 5):")
for act, count in stats["activities"].most_common(5):
    print(f"  - {act}: {count}")
print(f"\nAnomalias: {sum(stats['anomalies'].values())}")
for anom, count in stats["anomalies"].most_common():
    print(f"  - {anom}: {count}")

# Gráficos
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

if stats["emotions"]:
    em_data = stats["emotions"].most_common(6)
    axes[0].barh([x[0] for x in em_data], [x[1] for x in em_data], color='steelblue')
    axes[0].set_title('Emoções Detectadas')

if stats["activities"]:
    act_data = stats["activities"].most_common(6)
    axes[1].barh([x[0] for x in act_data], [x[1] for x in act_data], color='coral')
    axes[1].set_title('Atividades Detectadas')

plt.tight_layout()
plt.show()


In [None]:

def show_video_colab(video_path):
    mp4 = open(video_path, 'rb').read()
    data_url = "data:video/mp4;base64," + base64.b64encode(mp4).decode()
    return HTML(f"""
    <video width=800 controls>
          <source src="{data_url}" type="video/mp4">
    </video>
    """)

import os
if os.path.exists(CONFIG["output_video"]):
    file_size_mb = os.path.getsize(CONFIG["output_video"]) / (1024 * 1024)
    print(f"Vídeo processado: {file_size_mb:.1f} MB")
    
    # Se o vídeo for muito grande, o Colab pode travar ao tentar incorporar em base64
    if file_size_mb > 50:
        print("Vídeo muito grande para exibição inline (>50MB). Baixe o arquivo para visualizar.")
    else:
        # Tenta exibir apenas se for pequeno
        try:
             display(show_video_colab(CONFIG["output_video"]))
        except Exception as e:
             print(f"Erro ao exibir vídeo: {e}")
else:
    print("Arquivo de vídeo não encontrado.")


In [None]:
from google.colab import files
if os.path.exists(CONFIG["output_video"]):
    print("Iniciando download...")
    files.download(CONFIG["output_video"])
