# 01. Módulo de Detección - Arquitectura Híbrida (Kornia + YOLO)

Este notebook implementa el sistema de detección y seguimiento de objetos para OccultaShield.

## Arquitectura de Detección

```
┌─────────────────┬───────────────────────┬─────────────────┐
│ Tipo            │ Modelo                │ Framework       │
├─────────────────┼───────────────────────┼─────────────────┤
│ Caras           │ YuNet (FaceDetector)  │ Kornia AI       │
│ Personas        │ YOLOv10 (nano/s/m)    │ Ultralytics     │
│ Matrículas      │ YOLO-LPR              │ Ultralytics     │
└─────────────────┴───────────────────────┴─────────────────┘
```

**Ventajas de esta arquitectura:**
- Kornia FaceDetector (YuNet) es nativo GPU - sin dependencias adicionales
- YOLOv10 para personas - precisión comprobada + velocidad
- Auto-adaptación según VRAM disponible (nano/small/medium)

### 1. Imports y Configuración Inicial
Aquí preparamos el entorno:
- Importamos librerías clave: `cv2` (procesamiento de video), `numpy` (matrices), `torch` y `ultralytics` (IA/YOLO).
- `nest_asyncio.apply()`: Es fundamental en Jupyter para permitir que bucles asíncronos (async/await) funcionen dentro de las celdas.

In [None]:
import sys
import os
import asyncio
import logging
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Callable
from dataclasses import dataclass, field, asdict
from enum import Enum
from concurrent.futures import ThreadPoolExecutor

import cv2
import numpy as np
import torch
import nest_asyncio
from ultralytics import YOLO
from scipy.optimize import linear_sum_assignment

# Kornia para efectos GPU y detección de caras
try:
    import kornia
    import kornia.filters
    from kornia.contrib import FaceDetector, FaceDetectorResult
    KORNIA_AVAILABLE = True
    KORNIA_FACE_AVAILABLE = True
except ImportError:
    try:
        import kornia
        import kornia.filters
        KORNIA_AVAILABLE = True
        KORNIA_FACE_AVAILABLE = False
        print("Kornia instalado pero FaceDetector no disponible. Actualiza: pip install kornia>=0.7.0")
    except ImportError:
        KORNIA_AVAILABLE = False
        KORNIA_FACE_AVAILABLE = False
        print("Kornia no instalado. Ejecuta: pip install kornia>=0.7.0")

nest_asyncio.apply()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('detection_module')

# =============================================================================
# CONFIGURACIÓN
# =============================================================================
MIN_DETECTION_AREA = 500  # Área mínima (más pequeña para caras/matrículas)

class PrivacyCategory(Enum):
    """Categorías de datos sensibles según GDPR"""
    PERSON = "person"
    FACE = "face"
    LICENSE_PLATE = "license_plate"
    FINGERPRINT = "fingerprint"
    ID_DOCUMENT = "id_document"
    CREDIT_CARD = "credit_card"
    SIGNATURE = "signature"

# Mapeo de severidad GDPR por tipo de detección
GDPR_SEVERITY = {
    "face": "high",
    "fingerprint": "high",
    "license_plate": "high",
    "id_document": "high",
    "credit_card": "high",
    "signature": "medium",
    "person": "medium",
}

# =============================================================================
# GPU MANAGER - Auto-detección y gestión de VRAM
# =============================================================================
class GPUManager:
    """
    Singleton para gestionar recursos GPU.
    Detecta automáticamente la GPU y su VRAM disponible.
    """
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if self._initialized:
            return
        self._initialized = True
        self._refresh_info()
    
    def _refresh_info(self):
        if torch.cuda.is_available():
            self.device = "cuda"
            self.device_name = torch.cuda.get_device_name(0)
            self.vram_total_mb = torch.cuda.get_device_properties(0).total_memory // (1024**2)
            self.vram_free_mb = self._get_free_vram()
        else:
            self.device = "cpu"
            self.device_name = "CPU"
            self.vram_total_mb = 0
            self.vram_free_mb = 0
        
        logger.info(f"GPU Manager: {self.device_name}, VRAM: {self.vram_total_mb}MB")
    
    def _get_free_vram(self) -> int:
        if not torch.cuda.is_available():
            return 0
        torch.cuda.synchronize()
        allocated = torch.cuda.memory_allocated(0)
        total = torch.cuda.get_device_properties(0).total_memory
        return (total - allocated) // (1024**2)
    
    def can_fit_model(self, required_mb: int, safety_margin: float = 0.2) -> bool:
        """Verifica si hay suficiente VRAM para cargar un modelo"""
        if self.device == "cpu":
            return True
        available = self._get_free_vram()
        required_with_margin = int(required_mb * (1 + safety_margin))
        return available >= required_with_margin
    
    def get_strategy(self) -> Tuple[str, str, int]:
        """
        Determina la estrategia óptima según VRAM disponible.
        Returns: (strategy, model_size, batch_size)
        """
        vram_gb = self.vram_total_mb / 1024
        
        if vram_gb < 8:
            return "sequential", "nano", 4
        elif vram_gb < 16:
            return "parallel", "small", 16
        else:  # 16GB+ → usar medium (máximo permitido) con batch escalable
            return "parallel", "medium", min(64, int(vram_gb * 2))

# Instancia global
gpu_manager = GPUManager()

# =============================================================================
# HYBRID DETECTOR MANAGER - Kornia FaceDetector + YOLOv10
# =============================================================================
class HybridDetectorManager:
    """
    Gestor híbrido de detectores: Kornia AI (caras) + YOLO (personas, matrículas).
    
    Arquitectura:
    - Personas: YOLOv10 (nano/s/m según VRAM)
    - Caras: Kornia YuNet (FaceDetector) - nativo GPU, sin dependencias extra
    - Matrículas: YOLO-LPR
    
    Ventajas:
    - Reduce dependencia de modelos YOLO de 3 a 2
    - FaceDetector de Kornia es nativo GPU y muy eficiente
    - Mantiene precisión de YOLOv10 para personas
    """
    
    # Configuración de modelos YOLO por tamaño
    YOLO_CONFIGS = {
        "nano": {"person": "yolov10n.pt", "plate": "yolov8n.pt"},
        "small": {"person": "yolov10s.pt", "plate": "yolov8s.pt"},
        "medium": {"person": "yolov10m.pt", "plate": "yolov8m.pt"},
    }
    
    def __init__(
        self, 
        gpu_mgr: GPUManager = None,
        person_model: str = None,
        plate_model: str = None,
        face_confidence: float = 0.5,
        person_confidence: float = 0.5
    ):
        """
        Args:
            gpu_mgr: Instancia de GPUManager (usa global si no se proporciona)
            person_model: Ruta personalizada al modelo de personas
            plate_model: Ruta personalizada al modelo de matrículas
            face_confidence: Umbral de confianza para caras (Kornia)
            person_confidence: Umbral de confianza para personas/matrículas (YOLO)
        """
        self.gpu = gpu_mgr or gpu_manager
        self.device = self.gpu.device
        self.strategy, self.model_size, self.batch_size = self.gpu.get_strategy()
        
        self.face_confidence = face_confidence
        self.person_confidence = person_confidence
        
        # Inicializar detectores
        self._init_face_detector()
        self._init_yolo_detectors(person_model, plate_model)
        
        logger.info(f"HybridDetectorManager: strategy={self.strategy}, size={self.model_size}, "
                   f"device={self.device}, kornia_face={KORNIA_FACE_AVAILABLE}")
    
    def _init_face_detector(self):
        """Inicializa Kornia FaceDetector (YuNet)"""
        self.face_detector = None
        
        if KORNIA_FACE_AVAILABLE:
            try:
                self.face_detector = FaceDetector().to(self.device)
                logger.info("✓ Kornia FaceDetector (YuNet) loaded")
            except Exception as e:
                logger.warning(f"Could not load Kornia FaceDetector: {e}")
                self.face_detector = None
        
        # Fallback a OpenCV Haar si Kornia no está disponible
        if self.face_detector is None:
            self.face_cascade = cv2.CascadeClassifier(
                cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
            )
            logger.info("Using OpenCV Haar Cascade fallback for face detection")
    
    def _init_yolo_detectors(self, person_model: str, plate_model: str):
        """Inicializa detectores YOLO para personas y matrículas"""
        config = self.YOLO_CONFIGS[self.model_size]
        
        # Detector de personas (YOLOv10)
        person_path = person_model or config["person"]
        try:
            self.person_detector = YOLO(person_path)
            logger.info(f"✓ YOLO person detector loaded: {person_path}")
        except Exception as e:
            logger.error(f"Failed to load person model: {e}")
            self.person_detector = None
        
        # Detector de matrículas (opcional)
        self.plate_detector = None
        if plate_model and os.path.exists(plate_model):
            try:
                self.plate_detector = YOLO(plate_model)
                logger.info(f"✓ YOLO plate detector loaded: {plate_model}")
            except Exception as e:
                logger.warning(f"Could not load plate model: {e}")
    
    def _numpy_to_tensor(self, frame: np.ndarray) -> torch.Tensor:
        """Convierte frame numpy BGR a tensor GPU RGB normalizado"""
        # BGR -> RGB
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # (H, W, C) -> (1, C, H, W) normalizado a [0, 1]
        tensor = torch.from_numpy(rgb).permute(2, 0, 1).unsqueeze(0).float() / 255.0
        return tensor.to(self.device)
    
    def detect_faces_kornia(self, tensor: torch.Tensor, frame_num: int) -> List[Tuple[str, 'BoundingBox']]:
        """Detecta caras usando Kornia FaceDetector (YuNet)"""
        if self.face_detector is None:
            return []
        
        results = []
        
        with torch.no_grad():
            detections = self.face_detector(tensor)
        
        # Procesar resultados de Kornia FaceDetector
        for det in detections:
            if det.data.numel() == 0:
                continue
            
            # YuNet devuelve [x, y, w, h, score, landmarks...]
            # get_keypoints() devuelve keypoints, usamos los primeros 4 para bbox
            try:
                # Obtener bboxes y scores
                data = det.data.cpu().numpy()
                for row in data:
                    if len(row) >= 5:
                        x, y, w, h, score = row[:5]
                        
                        if score >= self.face_confidence:
                            # Desnormalizar coordenadas si están normalizadas
                            _, _, img_h, img_w = tensor.shape
                            
                            # Convertir a x1, y1, x2, y2
                            x1, y1 = float(x), float(y)
                            x2, y2 = float(x + w), float(y + h)
                            
                            bbox = BoundingBox(x1, y1, x2, y2, float(score), frame_num)
                            
                            if bbox.area >= MIN_DETECTION_AREA:
                                results.append(("face", bbox))
            except Exception as e:
                logger.debug(f"Error processing face detection: {e}")
        
        return results
    
    def detect_faces_opencv(self, frame: np.ndarray, frame_num: int) -> List[Tuple[str, 'BoundingBox']]:
        """Fallback: Detección de caras con OpenCV Haar Cascade"""
        if not hasattr(self, 'face_cascade'):
            return []
        
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        faces = self.face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30)
        )
        
        results = []
        for (x, y, w, h) in faces:
            bbox = BoundingBox(
                x1=float(x), 
                y1=float(y), 
                x2=float(x + w), 
                y2=float(y + h),
                confidence=0.8,  # Haar no da confianza real
                frame=frame_num
            )
            if bbox.area >= MIN_DETECTION_AREA:
                results.append(("face", bbox))
        
        return results
    
    def detect_persons(self, frame: np.ndarray, frame_num: int) -> List[Tuple[str, 'BoundingBox']]:
        """Detecta personas usando YOLOv10"""
        if self.person_detector is None:
            return []
        
        results_yolo = self.person_detector.predict(
            frame, 
            conf=self.person_confidence, 
            verbose=False,
            device=self.device
        )
        
        results = []
        for r in results_yolo:
            for box in r.boxes:
                cls = int(box.cls[0])
                if cls == 0:  # COCO class 0 = person
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    bbox = BoundingBox(x1, y1, x2, y2, float(box.conf[0]), frame_num)
                    
                    if bbox.area >= MIN_DETECTION_AREA:
                        results.append(("person", bbox))
        
        return results
    
    def detect_plates(self, frame: np.ndarray, frame_num: int) -> List[Tuple[str, 'BoundingBox']]:
        """Detecta matrículas usando YOLO-LPR"""
        if self.plate_detector is None:
            return []
        
        results_yolo = self.plate_detector.predict(
            frame, 
            conf=self.person_confidence, 
            verbose=False,
            device=self.device
        )
        
        results = []
        for r in results_yolo:
            for box in r.boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                bbox = BoundingBox(x1, y1, x2, y2, float(box.conf[0]), frame_num)
                
                if bbox.area >= MIN_DETECTION_AREA:
                    results.append(("license_plate", bbox))
        
        return results
    
    async def detect_all(
        self, 
        frames: List[np.ndarray], 
        frame_nums: List[int],
        conf_threshold: float = 0.5
    ) -> Dict[int, List[Tuple[str, 'BoundingBox']]]:
        """
        Ejecuta todos los detectores en los frames proporcionados.
        
        Args:
            frames: Lista de frames numpy BGR
            frame_nums: Números de frame correspondientes
            conf_threshold: Umbral de confianza
            
        Returns:
            Dict[frame_num, List[(label, BoundingBox)]]
        """
        all_detections = {fn: [] for fn in frame_nums}
        
        for frame, frame_num in zip(frames, frame_nums):
            # Detección de personas (YOLO)
            persons = self.detect_persons(frame, frame_num)
            all_detections[frame_num].extend(persons)
            
            # Detección de caras (Kornia o OpenCV fallback)
            if self.face_detector is not None:
                tensor = self._numpy_to_tensor(frame)
                faces = self.detect_faces_kornia(tensor, frame_num)
            else:
                faces = self.detect_faces_opencv(frame, frame_num)
            all_detections[frame_num].extend(faces)
            
            # Detección de matrículas (YOLO)
            plates = self.detect_plates(frame, frame_num)
            all_detections[frame_num].extend(plates)
        
        return all_detections
    
    def get_info(self) -> Dict:
        """Retorna información sobre la configuración actual"""
        detectors = []
        if self.person_detector:
            detectors.append("person (YOLOv10)")
        if self.face_detector:
            detectors.append("face (Kornia YuNet)")
        elif hasattr(self, 'face_cascade'):
            detectors.append("face (OpenCV Haar)")
        if self.plate_detector:
            detectors.append("plate (YOLO)")
        
        return {
            "strategy": self.strategy,
            "model_size": self.model_size,
            "batch_size": self.batch_size,
            "device": self.device,
            "vram_total_mb": self.gpu.vram_total_mb,
            "detectors": detectors,
            "kornia_available": KORNIA_FACE_AVAILABLE
        }

print(f"✓ GPU detectada: {gpu_manager.device_name}")
print(f"✓ VRAM total: {gpu_manager.vram_total_mb}MB")
print(f"✓ Estrategia recomendada: {gpu_manager.get_strategy()}")
print(f"✓ Kornia FaceDetector disponible: {KORNIA_FACE_AVAILABLE}")

### 2. Modelos de Datos (`models.py`)
Definimos las estructuras de datos (clases) que usaremos para organizar la información:
- **`BoundingBox`**: Representa un recuadro detectado (coordenadas x1, y1, x2, y2). Incluye métodos útiles para calcular su área, ancho y alto.
- **`Capture`**: Guarda la información de una "foto" (snapshot) tomada a un objeto (ruta del archivo, frame, motivo...).
- **`TrackedDetection`**: Es el historial completo de un objeto único a lo largo del tiempo. Contiene su ID, todas sus posiciones pasadas (`bbox_history`) y sus capturas.
- **`DetectionResult`**: Es el informe final del procesamiento del video. Resume estadísticas (FPS, duración) y contiene la lista de todas las detecciones encontradas.

In [None]:
@dataclass
class BoundingBox:
    x1: float
    y1: float
    x2: float
    y2: float
    confidence: float
    frame: int

    @property
    def width(self) -> float:
        return self.x2 - self.x1

    @property
    def height(self) -> float:
        return self.y2 - self.y1

    @property
    def area(self) -> float:
        return self.width * self.height

    def to_dict(self) -> dict:
        return asdict(self)

@dataclass
class Capture:
    frame: int
    image_path: str
    bbox: BoundingBox
    reason: str
    timestamp: float

    def to_dict(self) -> dict:
        return asdict(self)

@dataclass
class TrackedDetection:
    track_id: int
    detection_type: str
    bbox_history: List[BoundingBox] = field(default_factory=list)
    captures: List[Capture] = field(default_factory=list)
    is_confirmed: bool = False
    
    @property
    def last_bbox(self) -> Optional[BoundingBox]:
        return self.bbox_history[-1] if self.bbox_history else None

    @property
    def avg_confidence(self) -> float:
        if not self.bbox_history: return 0.0
        return sum(b.confidence for b in self.bbox_history) / len(self.bbox_history)
    
    @property
    def best_capture(self) -> Optional[Capture]:
        """Retorna la captura con mayor confianza"""
        if not self.captures:
            return None
        return max(self.captures, key=lambda c: c.bbox.confidence)
    
    def add_bbox(self, bbox: BoundingBox):
        self.bbox_history.append(bbox)

    # NUEVO: Método to_dict() que faltaba
    def to_dict(self) -> dict:
        return {
            "track_id": self.track_id,
            "detection_type": self.detection_type,
            "bbox_history": [b.to_dict() for b in self.bbox_history],
            "captures": [c.to_dict() for c in self.captures],
            "is_confirmed": self.is_confirmed,
            "avg_confidence": self.avg_confidence,
            "total_frames": len(self.bbox_history)
        }

@dataclass
class DetectionResult:
    video_path: str
    total_frames: int
    fps: float
    duration_seconds: float
    width: int
    height: int
    detections: List[TrackedDetection] = field(default_factory=list)
    frames_processed: int = 0
    processing_time_seconds: float = 0.0

### 3. Gestor de Capturas (`capture_manager.py`)
Esta clase decide **cuándo** guardar una foto de un objeto.
- No guarda todas las fotos de todos los frames (sería demasiado).
- **`consider_frame`**: Evalúa si merece la pena capturar el objeto en el frame actual.
    - Verifica **estabilidad**: Si la confianza de detección es alta durante varios frames seguidos.
    - Verifica **tiempo**: Para no tener 50 fotos/segundo, impone un intervalo (ej. 1 foto cada segundo).
- **`_save_capture`**: Recorta la imagen (`crop`) alrededor del objeto (con un margen extra) y la guarda en disco.

In [None]:
class CaptureManager:
    def __init__(self, stability_threshold=0.5, stability_frames=3, image_quality=95, crop_margin=20):
        self.stability_threshold = stability_threshold
        self.stability_frames = stability_frames
        self.image_quality = image_quality
        self.crop_margin = crop_margin
        self.track_data = {} # {id: {stable_count: 0, last_capture_time: 0}}

    def consider_frame(self, track_id: int, frame_img: np.ndarray, frame_num: int, bbox: BoundingBox, output_dir: Path, fps: float, capture_interval: float) -> str:
        if track_id not in self.track_data:
            self.track_data[track_id] = {"stable_count": 0, "last_capture_time": -999}
            
        data = self.track_data[track_id]
        
        # Check stability
        if bbox.confidence >= self.stability_threshold:
            data["stable_count"] += 1
        else:
            data["stable_count"] = 0
            
        if data["stable_count"] < self.stability_frames:
            return None
            
        # Check timing
        timestamp = frame_num / fps
        if timestamp - data["last_capture_time"] < capture_interval:
            return None
            
        return self._save_capture(track_id, frame_img, frame_num, bbox, output_dir, timestamp)

    def _save_capture(self, track_id: int, frame_img: np.ndarray, frame_num: int, bbox: BoundingBox, output_dir: Path, timestamp: float) -> str:
        track_dir = output_dir / f"track_{track_id}"
        track_dir.mkdir(parents=True, exist_ok=True)
        
        filename = f"capture_{frame_num}.jpg"
        filepath = track_dir / filename
        
        h, w = frame_img.shape[:2]
        x1 = max(0, int(bbox.x1) - self.crop_margin)
        y1 = max(0, int(bbox.y1) - self.crop_margin)
        x2 = min(w, int(bbox.x2) + self.crop_margin)
        y2 = min(h, int(bbox.y2) + self.crop_margin)
        
        crop = frame_img[y1:y2, x1:x2]
        if crop.size > 0:
            cv2.imwrite(str(filepath), crop, [int(cv2.IMWRITE_JPEG_QUALITY), self.image_quality])
            
        self.track_data[track_id]["last_capture_time"] = timestamp
        return str(filepath)

### 4. Rastreador de Objetos (`tracker.py`)
El modelo YOLO detecta objetos en *cada frame* de forma independiente. No sabe que la persona del frame 10 es la misma que la del frame 11.
Esta clase soluciona eso:
- **`Track`**: Representa un objeto vivo que estamos siguiendo.
- **`ObjectTracker`**: Asigna IDs únicos a los objetos.
- **`update`**: 
    1. Recibe las detecciones nuevas del frame actual.
    2. Las compara con los `tracks` existentes usando **IOU** (Intersección sobre Unión). Si las cajas se solapan mucho, asume que es el mismo objeto.
    3. Crea nuevos tracks para objetos que no coinciden con nada.
    4. Elimina tracks antiguos que llevan mucho tiempo sin verse (`max_age`).

In [None]:
class Track:
    def __init__(self, track_id, detection_type, bbox, frame):
        self.track_id = track_id
        self.detection_type = detection_type
        self.last_bbox = bbox
        self.first_frame = frame
        self.last_frame = frame
        self.hits = 1
        self.age = 0
        
    def update(self, bbox, frame):
        self.last_bbox = bbox
        self.last_frame = frame
        self.hits += 1
        self.age = 0

class ObjectTracker:
    """
    Rastreador de objetos mejorado con Hungarian Algorithm para asignación óptima.
    """
    def __init__(self, iou_threshold=0.3, max_age=30, min_hits=3):
        self.iou_threshold = iou_threshold
        self.max_age = max_age
        self.min_hits = min_hits
        self.tracks = {}
        self.next_id = 1
        
    def update(self, detections: List[Tuple[str, BoundingBox]], frame_num: int) -> List[Tuple[int, str, BoundingBox]]:
        # Envejecer tracks existentes
        for t in self.tracks.values():
            t.age += 1
            
        confirmed_out = []
        dets_by_type = {}
        for cls, bbox in detections:
            dets_by_type.setdefault(cls, []).append(bbox)
            
        for cls, bboxes in dets_by_type.items():
            active_tracks = [t for t in self.tracks.values() if t.detection_type == cls]
            
            # Si no hay tracks o detecciones, manejar casos edge
            if not active_tracks:
                for bbox in bboxes:
                    self._create_track(cls, bbox, frame_num)
                continue
                
            if not bboxes:
                continue
            
            # MEJORADO: Construir matriz de costos para Hungarian Algorithm
            num_tracks = len(active_tracks)
            num_dets = len(bboxes)
            cost_matrix = np.ones((num_tracks, num_dets), dtype=np.float32)
            
            for i, trk in enumerate(active_tracks):
                for j, det in enumerate(bboxes):
                    iou = self._calculate_iou(trk.last_bbox, det)
                    if iou >= self.iou_threshold:
                        cost_matrix[i, j] = 1.0 - iou  # Menor costo = mejor match
            
            # MEJORADO: Asignación óptima con Hungarian Algorithm (scipy)
            row_indices, col_indices = linear_sum_assignment(cost_matrix)
            
            matched_tracks = set()
            matched_dets = set()
            
            for row, col in zip(row_indices, col_indices):
                # Solo aceptar si el costo es aceptable (IOU >= threshold)
                if cost_matrix[row, col] < (1.0 - self.iou_threshold):
                    track = active_tracks[row]
                    matched_tracks.add(track.track_id)
                    matched_dets.add(col)
                    self.tracks[track.track_id].update(bboxes[col], frame_num)
                    
            # Crear nuevos tracks para detecciones sin match
            for j, det in enumerate(bboxes):
                if j not in matched_dets:
                    self._create_track(cls, det, frame_num)
                    
        # Limpieza de tracks muertos y reporte de confirmados
        dead = []
        for tid, t in self.tracks.items():
            if t.age > self.max_age:
                dead.append(tid)
            elif t.hits >= self.min_hits:
                confirmed_out.append((tid, t.detection_type, t.last_bbox))
                
        for tid in dead:
            del self.tracks[tid]
        
        return confirmed_out

    def _create_track(self, cls, bbox, frame):
        self.tracks[self.next_id] = Track(self.next_id, cls, bbox, frame)
        self.next_id += 1

    def _calculate_iou(self, bb1, bb2):
        """Calcula Intersection over Union entre dos bounding boxes"""
        xl = max(bb1.x1, bb2.x1)
        yt = max(bb1.y1, bb2.y1)
        xr = min(bb1.x2, bb2.x2)
        yb = min(bb1.y2, bb2.y2)
        if xr < xl or yb < yt:
            return 0.0
        inter = (xr - xl) * (yb - yt)
        union = bb1.area + bb2.area - inter
        return inter / union if union > 0 else 0.0

### 5. Detector Principal (`detector.py`) - Arquitectura Híbrida

Esta es la clase maestra que orquesta todo el proceso usando **HybridDetectorManager**:

**Arquitectura de Detección:**
| Tipo | Motor | Modelo |
|------|-------|--------|
| Caras | Kornia AI | YuNet (FaceDetector) |
| Personas | Ultralytics | YOLOv10 (nano/s/m) |
| Matrículas | Ultralytics | YOLO-LPR |

**Flujo de Procesamiento:**
1. `__init__`: Inicializa `HybridDetectorManager` que carga:
   - Kornia FaceDetector (YuNet) para caras - nativo GPU
   - YOLOv10 para personas - selección automática según VRAM
   - YOLO-LPR para matrículas (opcional)
2. `process_video`: 
   - Abre el video frame a frame con OpenCV
   - Acumula frames en batches según `batch_size` (auto-calculado)
   - Ejecuta detección híbrida: Kornia (caras) + YOLO (personas/matrículas)
   - Pasa detecciones al `tracker` para IDs estables
   - Guarda capturas con `CaptureManager`
3. `_process_batch`: Procesa un lote de frames en paralelo/secuencial según VRAM

**Auto-Adaptación por VRAM:**
| VRAM | Estrategia | Modelos | Batch |
|------|------------|---------|-------|
| <8GB | Secuencial | nano | 4 |
| 8-16GB | Paralelo | small | 16 |
| 16GB+ | Paralelo | medium | 32-64 |

In [None]:
class VideoDetector:
    """
    Detector multi-modelo para datos sensibles GDPR.

    Utiliza HybridDetectorManager (Kornia FaceDetector + YOLOv10) para gestionar
    automáticamente los modelos según la VRAM disponible.

    Arquitectura Híbrida:
    - Personas: YOLOv10 (nano/s/m según VRAM)
    - Caras: Kornia YuNet (FaceDetector) - nativo GPU, sin dependencias extra
    - Matrículas: YOLO-LPR
    - Huellas/Documentos: requiere módulo de verificación con LLM
    """

    def __init__(
        self,
        hybrid_manager: HybridDetectorManager = None,
        person_model: str = None,
        plate_model: str = None,
        confidence_threshold: float = 0.5
    ):
        """
        Args:
            hybrid_manager: HybridDetectorManager preconfigurado (opcional)
            person_model: Ruta personalizada al modelo de personas (YOLOv10)
            plate_model: Ruta personalizada al modelo de matrículas
            confidence_threshold: Umbral de confianza para detecciones
        """
        self.conf_threshold = confidence_threshold

        # Usar manager proporcionado o crear uno nuevo
        if hybrid_manager:
            self.hybrid_manager = hybrid_manager
        else:
            self.hybrid_manager = HybridDetectorManager(
                person_model=person_model,
                plate_model=plate_model,
                face_confidence=confidence_threshold,
                person_confidence=confidence_threshold
            )

        self.batch_size = self.hybrid_manager.batch_size
        self.device = self.hybrid_manager.device

        logger.info(f"VideoDetector initialized: {self.hybrid_manager.get_info()}")

    async def process_video(
        self, 
        video_path: str, 
        output_dir: str,
        on_progress: Optional[Callable[[int, int, str], None]] = None
    ) -> 'DetectionResult':
        """
        Procesa un video detectando datos sensibles GDPR.
        
        Args:
            video_path: Ruta al archivo de video
            output_dir: Directorio para guardar capturas
            on_progress: Callback de progreso (current, total, message)
            
        Returns:
            DetectionResult con todas las detecciones encontradas
        """
        import time
        start_time = time.time()
        
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")
            
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        tracker = ObjectTracker()
        cm = CaptureManager()
        tracked_objects = {}
        frame_num = 0
        
        frame_buffer = []
        frame_nums = []
        
        logger.info(f"Processing video: {total_frames} frames at {fps} FPS ({width}x{height})")
        
        while True:
            ret, frame = cap.read()
            if not ret:
                if frame_buffer:
                    await self._process_batch(
                        frame_buffer, frame_nums, tracker, cm,
                        tracked_objects, output_path, fps, on_progress, total_frames
                    )
                break
            
            frame_num += 1
            frame_buffer.append(frame)
            frame_nums.append(frame_num)
            
            if len(frame_buffer) >= self.batch_size:
                await self._process_batch(
                    frame_buffer, frame_nums, tracker, cm,
                    tracked_objects, output_path, fps, on_progress, total_frames
                )
                frame_buffer = []
                frame_nums = []
                
        cap.release()
        
        processing_time = time.time() - start_time
        
        return DetectionResult(
            video_path=video_path, 
            total_frames=total_frames, 
            fps=fps, 
            duration_seconds=total_frames/fps if fps > 0 else 0, 
            width=width, 
            height=height, 
            detections=list(tracked_objects.values()),
            frames_processed=frame_num,
            processing_time_seconds=processing_time
        )

    async def _process_batch(
        self, 
        frames: List[np.ndarray], 
        frame_nums: List[int], 
        tracker: ObjectTracker, 
        cm: CaptureManager,
        tracked_objects: Dict[int, 'TrackedDetection'], 
        output_path: Path, 
        fps: float,
        on_progress: Optional[Callable],
        total_frames: int
    ):
        """Procesa un batch de frames usando HybridDetectorManager"""
        
        # Obtener detecciones de todos los modelos usando HybridDetectorManager
        all_detections_by_frame = await self.hybrid_manager.detect_all(
            frames, 
            frame_nums, 
            conf_threshold=self.conf_threshold
        )
        
        # Procesar cada frame
        for frame, frame_num in zip(frames, frame_nums):
            current_detections = all_detections_by_frame[frame_num]
            
            # Actualizar Tracker
            confirmed = tracker.update(current_detections, frame_num)
            
            for tid, label, bbox in confirmed:
                if tid not in tracked_objects:
                    tracked_objects[tid] = TrackedDetection(tid, label)
                tracked_objects[tid].add_bbox(bbox)
                
                cap_path = cm.consider_frame(
                    tid, frame, frame_num, bbox, output_path, fps, 
                    capture_interval=1.0
                )
                if cap_path:
                    tracked_objects[tid].captures.append(Capture(
                        frame=frame_num, 
                        image_path=cap_path, 
                        bbox=bbox, 
                        reason="periodic", 
                        timestamp=frame_num/fps
                    ))
            
            if frame_num % 10 == 0 and on_progress:
                msg = f"Frame {frame_num}/{total_frames}"
                if asyncio.iscoroutinefunction(on_progress):
                    await on_progress(frame_num, total_frames, msg)
                else:
                    on_progress(frame_num, total_frames, msg)
        
        await asyncio.sleep(0)  # Yield para otras tareas async

### 6. Ejecución de Prueba
Bloque final para verificar que todo funciona:
1. Define las rutas de entrada (video) y salida.
2. Instancia el `VideoDetector`.
3. Ejecuta `process_video` en un bucle asíncrono (`asyncio.run`).
4. Imprime un resumen de los objetos encontrados y sus capturas.

In [None]:
# =============================================================================
# PRUEBA DE EJECUCIÓN - ARQUITECTURA HÍBRIDA (Kornia + YOLOv10)
# =============================================================================

VIDEO_PATH = "../storage/uploads/coche.mp4"  # Actualiza esto
OUTPUT_DIR = "../storage/captures/notebook_test_hybrid"

# Rutas de modelos personalizados (opcional)
# El sistema auto-selecciona nano/small/medium según tu VRAM
CUSTOM_PERSON_MODEL = None  # e.g., "../models/yolov10m.pt"
CUSTOM_PLATE_MODEL = None   # e.g., "../models/yolo-license-plate.pt"

async def run_hybrid_test():
    if not os.path.exists(VIDEO_PATH):
        print(f"Video no encontrado: {VIDEO_PATH}")
        print("Saltando test. Para ejecutar, descarga un video de prueba.")
        return
    
    print("=" * 70)
    print("PRUEBA DEL MÓDULO DE DETECCIÓN - ARQUITECTURA HÍBRIDA")
    print("Kornia FaceDetector (YuNet) + YOLOv10 (Personas) + YOLO-LPR (Matrículas)")
    print("=" * 70)
    
    # El detector usa HybridDetectorManager internamente
    # Se adaptará automáticamente a tu GPU (nano/small/medium)
    detector = VideoDetector(
        person_model=CUSTOM_PERSON_MODEL,
        plate_model=CUSTOM_PLATE_MODEL,
        confidence_threshold=0.5
    )
    
    # Mostrar configuración del manager híbrido
    info = detector.hybrid_manager.get_info()
    print(f"\nConfiguración Híbrida Auto-Adaptativa:")
    print(f"  - Estrategia: {info['strategy']}")
    print(f"  - Tamaño de modelos YOLO: {info['model_size']}")
    print(f"  - Batch size: {info['batch_size']}")
    print(f"  - Dispositivo: {info['device']}")
    print(f"  - VRAM: {info['vram_total_mb']}MB")
    print(f"  - Kornia FaceDetector: {'✓' if info['kornia_available'] else '✗ (usando OpenCV Haar)'}")
    print(f"  - Detectores activos: {', '.join(info['detectors'])}")
    
    print(f"\nProcesando video: {VIDEO_PATH}")
    print("-" * 70)
    
    def progress_callback(current, total, msg):
        pct = (current / total) * 100
        print(f"\r  Progreso: {pct:.1f}% ({msg})", end="", flush=True)
    
    res = await detector.process_video(
        VIDEO_PATH, 
        OUTPUT_DIR,
        on_progress=progress_callback
    )
    
    print(f"\n\n{'=' * 70}")
    print("RESULTADOS")
    print("=" * 70)
    print(f"  - Frames procesados: {res.frames_processed}/{res.total_frames}")
    print(f"  - Tiempo de procesamiento: {res.processing_time_seconds:.2f}s")
    print(f"  - FPS de procesamiento: {res.frames_processed / res.processing_time_seconds:.1f}")
    print(f"  - Objetos detectados: {len(res.detections)}")
    
    # Agrupar por tipo
    by_type = {}
    for det in res.detections:
        by_type.setdefault(det.detection_type, []).append(det)
    
    print("\nResumen por tipo de detección (Arquitectura Híbrida):")
    for det_type, dets in by_type.items():
        severity = GDPR_SEVERITY.get(det_type, "unknown")
        total_captures = sum(len(d.captures) for d in dets)
        engine = "Kornia YuNet" if det_type == "face" else "YOLOv10" if det_type == "person" else "YOLO-LPR"
        print(f"  - {det_type}: {len(dets)} objetos, {total_captures} capturas "
              f"(Motor: {engine}, Severidad GDPR: {severity})")
    
    print("\nPrimeras 5 detecciones:")
    for det in res.detections[:5]:
        best = det.best_capture
        best_frame = best.frame if best else "N/A"
        print(f"  ID: {det.track_id}, Tipo: {det.detection_type}, "
              f"Frames: {len(det.bbox_history)}, Capturas: {len(det.captures)}, "
              f"Mejor frame: {best_frame}")
    
    print(f"\nCapturas guardadas en: {OUTPUT_DIR}")

# Ejecutar test
asyncio.run(run_hybrid_test())