# Sistema Avanzado de Detecci√≥n y Clasificaci√≥n de Objetos en Video
## Pipeline Completo con YOLOv8, An√°lisis Avanzado y Visualizaci√≥n 3D

Este notebook implementa un sistema completo de detecci√≥n y clasificaci√≥n de objetos en videos pregrabados, optimizado para procesamiento eficiente con aceleraci√≥n GPU y an√°lisis estad√≠stico avanzado.

### Caracter√≠sticas Principales:
- **Entrenamiento YOLOv8** con fine-tuning y data augmentation
- **Clasificaci√≥n avanzada** por tama√±o, color y detalles espec√≠ficos
- **Visualizaciones 3D** y an√°lisis estad√≠stico post-entrenamiento
- **Procesamiento optimizado** con threading y aceleraci√≥n GPU
- **Sistema de guardado jer√°rquico** con metadata completa
- **An√°lisis de rendimiento** y m√©tricas detalladas


In [None]:
# Instalaci√≥n de dependencias necesarias
%pip install ultralytics opencv-python torch torchvision scikit-learn matplotlib seaborn optuna tqdm pyyaml

# Variables de control para dependencias opcionales
ALBUMENTATIONS_AVAILABLE = False
MEDIAPIPE_AVAILABLE = False
TESSERACT_AVAILABLE = False
PYNVML_AVAILABLE = False

print("üîß Sistema de detecci√≥n y clasificaci√≥n de objetos inicializado")
print("üì¶ Dependencias principales instaladas")

# Verificaci√≥n de CUDA
import torch
print(f"CUDA disponible: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"Dispositivo GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memoria GPU: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
else:
    print("üíª Usando CPU para procesamiento")


In [None]:
# Instalaci√≥n de dependencias opcionales (ejecutar solo si necesitas funcionalidades avanzadas)
print("üîß Instalando dependencias opcionales...")

# Instalar albumentations para data augmentation avanzada
try:
    import subprocess
    result = subprocess.run(["pip", "install", "albumentations"], capture_output=True, text=True, timeout=60)
    if result.returncode == 0:
        ALBUMENTATIONS_AVAILABLE = True
        print("‚úÖ Albumentations instalado correctamente")
    else:
        print("‚ö†Ô∏è Error instalando Albumentations - usando alternativas")
except Exception as e:
    print(f"‚ö†Ô∏è Albumentations no disponible: {e}")

# Instalar mediapipe para an√°lisis de pose
try:
    result = subprocess.run(["pip", "install", "mediapipe"], capture_output=True, text=True, timeout=60)
    if result.returncode == 0:
        MEDIAPIPE_AVAILABLE = True
        print("‚úÖ MediaPipe instalado correctamente")
    else:
        print("‚ö†Ô∏è Error instalando MediaPipe - an√°lisis de pose deshabilitado")
except Exception as e:
    print(f"‚ö†Ô∏è MediaPipe no disponible: {e}")

# Instalar pytesseract para OCR
try:
    result = subprocess.run(["pip", "install", "pytesseract"], capture_output=True, text=True, timeout=60)
    if result.returncode == 0:
        TESSERACT_AVAILABLE = True
        print("‚úÖ Tesseract instalado correctamente")
    else:
        print("‚ö†Ô∏è Error instalando Tesseract - OCR deshabilitado")
except Exception as e:
    print(f"‚ö†Ô∏è Tesseract no disponible: {e}")

# Instalar pynvml para monitoreo GPU
try:
    result = subprocess.run(["pip", "install", "pynvml"], capture_output=True, text=True, timeout=60)
    if result.returncode == 0:
        PYNVML_AVAILABLE = True
        print("‚úÖ pynvml instalado correctamente")
    else:
        print("‚ö†Ô∏è Error instalando pynvml - monitoreo GPU limitado")
except Exception as e:
    print(f"‚ö†Ô∏è pynvml no disponible: {e}")

print(f"\nüìä Estado de dependencias opcionales:")
print(f"   ‚Ä¢ Albumentations: {'‚úÖ' if ALBUMENTATIONS_AVAILABLE else '‚ùå'}")
print(f"   ‚Ä¢ MediaPipe: {'‚úÖ' if MEDIAPIPE_AVAILABLE else '‚ùå'}")
print(f"   ‚Ä¢ Tesseract: {'‚úÖ' if TESSERACT_AVAILABLE else '‚ùå'}")
print(f"   ‚Ä¢ pynvml: {'‚úÖ' if PYNVML_AVAILABLE else '‚ùå'}")
print("\nüí° El sistema funcionar√° con las dependencias b√°sicas instaladas")


In [None]:
# Importaciones principales
import os
import cv2
import numpy as np
import pandas as pd
import json
import yaml
import logging
import time
import gc
import threading
import queue
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
import warnings
warnings.filterwarnings('ignore')

# Machine Learning y Deep Learning
import torch
import torch.nn as nn
from ultralytics import YOLO
from ultralytics.utils import LOGGER

# Importaciones opcionales con manejo de errores
try:
    import albumentations as A
    from albumentations.pytorch import ToTensorV2
    ALBUMENTATIONS_AVAILABLE = True
except ImportError:
    ALBUMENTATIONS_AVAILABLE = False
    print("‚ö†Ô∏è Albumentations no disponible - usando alternativas")

# An√°lisis de datos y visualizaci√≥n
import matplotlib.pyplot as plt
import seaborn as sns
from mpl_toolkits.mplot3d import Axes3D
from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from sklearn.metrics import r2_score, mean_squared_error
from scipy import stats
import optuna

# Procesamiento de im√°genes y video (opcional)
try:
    import pytesseract
    TESSERACT_AVAILABLE = True
except ImportError:
    TESSERACT_AVAILABLE = False
    print("‚ö†Ô∏è Tesseract no disponible - OCR deshabilitado")

try:
    import mediapipe as mp
    MEDIAPIPE_AVAILABLE = True
except ImportError:
    MEDIAPIPE_AVAILABLE = False
    print("‚ö†Ô∏è MediaPipe no disponible - an√°lisis de pose deshabilitado")

from tqdm import tqdm

# Configuraci√≥n de matplotlib para mejor visualizaci√≥n
try:
    plt.style.use('seaborn-v0_8')
except:
    plt.style.use('default')
sns.set_palette("husl")

print("‚úÖ Dependencias principales importadas correctamente")
print(f"   ‚Ä¢ Albumentations: {'‚úÖ' if ALBUMENTATIONS_AVAILABLE else '‚ùå'}")
print(f"   ‚Ä¢ Tesseract: {'‚úÖ' if TESSERACT_AVAILABLE else '‚ùå'}")
print(f"   ‚Ä¢ MediaPipe: {'‚úÖ' if MEDIAPIPE_AVAILABLE else '‚ùå'}")


In [None]:
# Configuraci√≥n del sistema
class Config:
    """Clase para manejar la configuraci√≥n del sistema"""
    
    def __init__(self, config_path: str = "../../config.yaml"):
        self.config_path = config_path
        self.load_config()
        self.setup_logging()
        self.setup_directories()
    
    def load_config(self):
        """Carga la configuraci√≥n desde archivo YAML"""
        try:
            with open(self.config_path, 'r', encoding='utf-8') as file:
                self.config = yaml.safe_load(file)
        except FileNotFoundError:
            print(f"‚ö†Ô∏è Archivo de configuraci√≥n no encontrado: {self.config_path}")
            self.config = self._get_default_config()
    
    def _get_default_config(self):
        """Configuraci√≥n por defecto si no existe archivo"""
        return {
            'training': {'epochs': 100, 'batch_size': 16, 'lr0': 0.01},
            'detection': {'conf_threshold': 0.5, 'iou_threshold': 0.4},
            'output': {'base_dir': './outputs'},
            'logging': {'level': 'INFO'}
        }
    
    def setup_logging(self):
        """Configura el sistema de logging"""
        log_config = self.config.get('logging', {})
        logging.basicConfig(
            level=getattr(logging, log_config.get('level', 'INFO')),
            format=log_config.get('format', '%(asctime)s - %(levelname)s - %(message)s'),
            handlers=[
                logging.FileHandler(log_config.get('file', 'procesamiento.log')),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def setup_directories(self):
        """Crea los directorios necesarios"""
        base_dir = Path(self.config['output']['base_dir'])
        directories = [
            base_dir / 'detecciones',
            base_dir / 'visualizaciones', 
            base_dir / 'videos_procesados',
            base_dir / 'logs',
            base_dir / 'models'
        ]
        
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
        
        self.logger.info(f"Directorios creados en: {base_dir}")

# Inicializar configuraci√≥n
config = Config()
print("‚úÖ Configuraci√≥n del sistema inicializada")


In [None]:
# Sistema de Data Augmentation (Versi√≥n Robusta)
class AdvancedAugmentation:
    """Sistema avanzado de data augmentation para YOLOv8"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.albumentations_available = ALBUMENTATIONS_AVAILABLE
        self.setup_transforms()
    
    def setup_transforms(self):
        """Configura las transformaciones de albumentations o alternativas"""
        aug_config = self.config.get('augmentation', {})
        
        if self.albumentations_available:
            # Usar Albumentations si est√° disponible
            self.train_transform = A.Compose([
                A.Rotate(limit=aug_config.get('rotation_limit', 15), p=0.7),
                A.HorizontalFlip(p=aug_config.get('flip_probability', 0.5)),
                A.RandomBrightnessContrast(
                    brightness_limit=aug_config.get('brightness_limit', 0.2),
                    contrast_limit=aug_config.get('contrast_limit', 0.2),
                    p=0.5
                ),
                A.GaussNoise(
                    var_limit=aug_config.get('noise_variance', 0.01),
                    p=0.3
                ),
                A.MotionBlur(blur_limit=3, p=0.3),
                A.RandomGamma(gamma_limit=(80, 120), p=0.3),
                A.HueSaturationValue(
                    hue_shift_limit=20,
                    sat_shift_limit=30,
                    val_shift_limit=20,
                    p=0.3
                ),
                A.CLAHE(clip_limit=2.0, tile_grid_size=(8, 8), p=0.3),
            ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))
            
            self.val_transform = A.Compose([
                A.Resize(640, 640),
            ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))
        else:
            # Usar OpenCV como alternativa
            self.train_transform = None
            self.val_transform = None
            print("‚ö†Ô∏è Usando OpenCV para augmentaci√≥n b√°sica")
    
    def apply_augmentation(self, image, bboxes, class_labels, is_training=True):
        """Aplica las transformaciones a la imagen"""
        if self.albumentations_available and self.train_transform:
            # Usar Albumentations
            transform = self.train_transform if is_training else self.val_transform
            try:
                augmented = transform(
                    image=image,
                    bboxes=bboxes,
                    class_labels=class_labels
                )
                return augmented['image'], augmented['bboxes'], augmented['class_labels']
            except Exception as e:
                print(f"Error en augmentaci√≥n con Albumentations: {e}")
                return image, bboxes, class_labels
        else:
            # Usar OpenCV para augmentaci√≥n b√°sica
            return self._opencv_augmentation(image, bboxes, class_labels, is_training)
    
    def _opencv_augmentation(self, image, bboxes, class_labels, is_training):
        """Augmentaci√≥n b√°sica usando OpenCV"""
        if not is_training:
            # Solo redimensionar para validaci√≥n
            resized = cv2.resize(image, (640, 640))
            return resized, bboxes, class_labels
        
        # Aplicar transformaciones b√°sicas con OpenCV
        augmented_image = image.copy()
        
        # Flip horizontal (50% probabilidad)
        if np.random.random() < 0.5:
            augmented_image = cv2.flip(augmented_image, 1)
            # Ajustar bboxes
            h, w = image.shape[:2]
            for bbox in bboxes:
                bbox[0] = w - bbox[0] - bbox[2]  # x = width - x - width
        
        # Ajuste de brillo y contraste
        if np.random.random() < 0.5:
            alpha = np.random.uniform(0.8, 1.2)  # Contraste
            beta = np.random.uniform(-30, 30)    # Brillo
            augmented_image = cv2.convertScaleAbs(augmented_image, alpha=alpha, beta=beta)
        
        # Redimensionar
        augmented_image = cv2.resize(augmented_image, (640, 640))
        
        return augmented_image, bboxes, class_labels

# Inicializar sistema de augmentaci√≥n
augmentation_system = AdvancedAugmentation(config.config)
print("‚úÖ Sistema de data augmentation configurado")


In [None]:
# Clases y subclases del sistema
CLASSES = {
    'persona': ['peaton', 'ciclista'],
    'carro': ['sedan', 'camion', 'suv'],
    'senal_trafico': ['stop', 'yield', 'velocidad', 'direccion'],
    'moto': ['deportiva', 'clasica', 'scooter']
}

# Colores para clasificaci√≥n
COLOR_NAMES = {
    0: 'rojo', 1: 'azul', 2: 'verde', 3: 'amarillo',
    4: 'naranja', 5: 'morado', 6: 'rosa', 7: 'gris'
}

# Configuraci√≥n de MediaPipe para pose (si est√° disponible)
if MEDIAPIPE_AVAILABLE:
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(
        static_image_mode=False,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=0.5
    )
else:
    pose = None
    print("‚ö†Ô∏è MediaPipe no disponible - an√°lisis de pose deshabilitado")

print("‚úÖ Clases y configuraciones definidas")


In [None]:
# Sistema de Data Augmentation
class AdvancedAugmentation:
    """Sistema avanzado de data augmentation para YOLOv8"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.setup_transforms()
    
    def setup_transforms(self):
        """Configura las transformaciones de albumentations"""
        aug_config = self.config.get('augmentation', {})
        
        self.train_transform = A.Compose([
            A.Rotate(limit=aug_config.get('rotation_limit', 15), p=0.7),
            A.HorizontalFlip(p=aug_config.get('flip_probability', 0.5)),
            A.RandomBrightnessContrast(
                brightness_limit=aug_config.get('brightness_limit', 0.2),
                contrast_limit=aug_config.get('contrast_limit', 0.2),
                p=0.5
            ),
            A.GaussNoise(
                var_limit=aug_config.get('noise_variance', 0.01),
                p=0.3
            ),
            A.MotionBlur(blur_limit=3, p=0.3),
            A.RandomGamma(gamma_limit=(80, 120), p=0.3),
            A.HueSaturationValue(
                hue_shift_limit=20,
                sat_shift_limit=30,
                val_shift_limit=20,
                p=0.3
            ),
            A.CLAHE(clip_limit=2.0, tile_grid_size=(8, 8), p=0.3),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))
        
        self.val_transform = A.Compose([
            A.Resize(640, 640),
        ], bbox_params=A.BboxParams(format='yolo', label_fields=['class_labels']))
    
    def apply_augmentation(self, image, bboxes, class_labels, is_training=True):
        """Aplica las transformaciones a la imagen"""
        transform = self.train_transform if is_training else self.val_transform
        
        try:
            augmented = transform(
                image=image,
                bboxes=bboxes,
                class_labels=class_labels
            )
            return augmented['image'], augmented['bboxes'], augmented['class_labels']
        except Exception as e:
            config.logger.warning(f"Error en augmentaci√≥n: {e}")
            return image, bboxes, class_labels

# Inicializar sistema de augmentaci√≥n
augmentation_system = AdvancedAugmentation(config.config)
print("‚úÖ Sistema de data augmentation configurado")


In [None]:
# Sistema de Entrenamiento Avanzado con YOLOv8
class AdvancedYOLOTrainer:
    """Sistema avanzado de entrenamiento YOLOv8 con optimizaci√≥n de hiperpar√°metros"""
    
    def __init__(self, config: Config):
        self.config = config
        self.model = None
        self.training_results = None
        self.best_model_path = None
        
    def setup_model(self, model_size='n'):
        """Configura el modelo YOLOv8"""
        model_name = f'yolov8{model_size}.pt'
        self.model = YOLO(model_name)
        config.logger.info(f"Modelo YOLOv8{model_size} cargado")
        
    def prepare_dataset(self, dataset_path: str):
        """Prepara el dataset para entrenamiento"""
        # Verificar estructura del dataset
        required_dirs = ['images/train', 'images/val', 'labels/train', 'labels/val']
        dataset_path = Path(dataset_path)
        
        for dir_name in required_dirs:
            dir_path = dataset_path / dir_name
            if not dir_path.exists():
                config.logger.warning(f"Directorio no encontrado: {dir_path}")
                return False
        
        # Crear archivo de configuraci√≥n YAML para el dataset
        dataset_config = {
            'path': str(dataset_path.absolute()),
            'train': 'images/train',
            'val': 'images/val',
            'nc': len(CLASSES),
            'names': list(CLASSES.keys())
        }
        
        config_path = dataset_path / 'dataset.yaml'
        with open(config_path, 'w') as f:
            yaml.dump(dataset_config, f)
        
        config.logger.info(f"Dataset configurado en: {config_path}")
        return str(config_path)
    
    def optimize_hyperparameters(self, dataset_path: str, n_trials: int = 20):
        """Optimizaci√≥n de hiperpar√°metros con Optuna"""
        def objective(trial):
            # Par√°metros a optimizar
            lr0 = trial.suggest_float('lr0', 0.001, 0.1, log=True)
            weight_decay = trial.suggest_float('weight_decay', 0.0001, 0.01, log=True)
            momentum = trial.suggest_float('momentum', 0.6, 0.98)
            warmup_epochs = trial.suggest_int('warmup_epochs', 1, 5)
            
            # Entrenar modelo con par√°metros sugeridos
            results = self.model.train(
                data=dataset_path,
                epochs=10,  # Pocas √©pocas para optimizaci√≥n r√°pida
                lr0=lr0,
                weight_decay=weight_decay,
                momentum=momentum,
                warmup_epochs=warmup_epochs,
                verbose=False,
                device='cuda' if torch.cuda.is_available() else 'cpu'
            )
            
            # Retornar mAP como m√©trica a maximizar
            return results.results_dict.get('metrics/mAP50(B)', 0.0)
        
        # Crear estudio de Optuna
        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=n_trials)
        
        config.logger.info(f"Mejores hiperpar√°metros: {study.best_params}")
        return study.best_params
    
    def train_model(self, dataset_path: str, use_optimization: bool = True):
        """Entrena el modelo YOLOv8"""
        if not self.model:
            self.setup_model()
        
        # Preparar dataset
        dataset_config_path = self.prepare_dataset(dataset_path)
        if not dataset_config_path:
            config.logger.error("Error preparando dataset")
            return False
        
        # Optimizaci√≥n de hiperpar√°metros
        best_params = None
        if use_optimization:
            config.logger.info("Iniciando optimizaci√≥n de hiperpar√°metros...")
            best_params = self.optimize_hyperparameters(dataset_config_path)
        
        # Configuraci√≥n de entrenamiento
        train_config = self.config.config.get('training', {})
        
        # Entrenar modelo
        config.logger.info("Iniciando entrenamiento del modelo...")
        results = self.model.train(
            data=dataset_config_path,
            epochs=train_config.get('epochs', 100),
            batch=train_config.get('batch_size', 16),
            lr0=best_params.get('lr0', train_config.get('lr0', 0.01)) if best_params else train_config.get('lr0', 0.01),
            weight_decay=best_params.get('weight_decay', train_config.get('weight_decay', 0.0005)) if best_params else train_config.get('weight_decay', 0.0005),
            momentum=best_params.get('momentum', 0.937) if best_params else 0.937,
            warmup_epochs=best_params.get('warmup_epochs', train_config.get('warmup_epochs', 3)) if best_params else train_config.get('warmup_epochs', 3),
            optimizer=train_config.get('optimizer', 'AdamW'),
            device='cuda' if torch.cuda.is_available() else 'cpu',
            project=str(Path(self.config.config['output']['base_dir']) / 'models'),
            name='yolov8_advanced',
            save_period=train_config.get('save_period', 10),
            patience=train_config.get('patience', 20),
            verbose=True
        )
        
        self.training_results = results
        self.best_model_path = results.save_dir / 'weights' / 'best.pt'
        
        config.logger.info(f"Entrenamiento completado. Mejor modelo guardado en: {self.best_model_path}")
        return True
    
    def evaluate_model(self, dataset_path: str):
        """Eval√∫a el modelo entrenado"""
        if not self.best_model_path or not self.best_model_path.exists():
            config.logger.error("No se encontr√≥ el modelo entrenado")
            return None
        
        # Cargar mejor modelo
        best_model = YOLO(str(self.best_model_path))
        
        # Evaluar en dataset de validaci√≥n
        results = best_model.val(data=dataset_path)
        
        # Extraer m√©tricas
        metrics = {
            'mAP50': results.box.map50,
            'mAP50-95': results.box.map,
            'precision': results.box.mp,
            'recall': results.box.mr,
            'f1': 2 * (results.box.mp * results.box.mr) / (results.box.mp + results.box.mr + 1e-6)
        }
        
        config.logger.info(f"M√©tricas de evaluaci√≥n: {metrics}")
        return metrics

# Inicializar trainer
trainer = AdvancedYOLOTrainer(config)
print("‚úÖ Sistema de entrenamiento YOLOv8 configurado")


In [None]:
# Sistema de Clasificaci√≥n Avanzada
class AdvancedClassifier:
    """Sistema avanzado de clasificaci√≥n de objetos detectados"""
    
    def __init__(self, config: Config):
        self.config = config
        self.color_kmeans = KMeans(n_clusters=8, random_state=42)
        self.speed_tracker = {}  # Para tracking de velocidad
        
    def classify_size(self, bbox: List[float], frame_shape: Tuple[int, int]) -> str:
        """Clasifica el tama√±o del objeto basado en √°rea relativa"""
        x, y, w, h = bbox
        frame_area = frame_shape[0] * frame_shape[1]
        bbox_area = w * h
        relative_area = bbox_area / frame_area
        
        size_thresholds = self.config.config.get('classification', {}).get('size_thresholds', {
            'small': 0.2, 'medium': 0.5, 'large': 1.0
        })
        
        if relative_area < size_thresholds['small']:
            return 'peque√±o'
        elif relative_area < size_thresholds['medium']:
            return 'mediano'
        else:
            return 'grande'
    
    def classify_color(self, image: np.ndarray, bbox: List[float]) -> str:
        """Clasifica el color dominante del objeto"""
        x, y, w, h = bbox
        x, y, w, h = int(x), int(y), int(w), int(h)
        
        # Extraer ROI
        roi = image[y:y+h, x:x+w]
        if roi.size == 0:
            return 'gris'
        
        # Convertir a HSV
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        
        # Reshape para clustering
        pixels = hsv.reshape(-1, 3)
        
        # Aplicar K-means para encontrar color dominante
        try:
            kmeans = KMeans(n_clusters=1, random_state=42, n_init=10)
            kmeans.fit(pixels)
            dominant_hue = kmeans.cluster_centers_[0][0]
            
            # Mapear hue a nombre de color
            if 0 <= dominant_hue < 15 or 165 <= dominant_hue <= 180:
                return 'rojo'
            elif 15 <= dominant_hue < 35:
                return 'naranja'
            elif 35 <= dominant_hue < 85:
                return 'amarillo'
            elif 85 <= dominant_hue < 125:
                return 'verde'
            elif 125 <= dominant_hue < 165:
                return 'azul'
            else:
                return 'gris'
        except:
            return 'gris'
    
    def extract_details(self, image: np.ndarray, bbox: List[float], class_name: str) -> Dict[str, Any]:
        """Extrae detalles espec√≠ficos seg√∫n la clase del objeto"""
        x, y, w, h = bbox
        x, y, w, h = int(x), int(y), int(w), int(h)
        
        roi = image[y:y+h, x:x+w]
        if roi.size == 0:
            return {}
        
        details = {}
        
        if class_name == 'senal_trafico' and TESSERACT_AVAILABLE:
            # OCR para texto en se√±ales
            try:
                gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
                text = pytesseract.image_to_string(gray_roi, config='--psm 8')
                if text.strip():
                    details['ocr_text'] = text.strip()
            except:
                pass
        
        elif class_name == 'persona' and MEDIAPIPE_AVAILABLE and pose is not None:
            # An√°lisis de pose con MediaPipe
            try:
                rgb_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
                results = pose.process(rgb_roi)
                if results.pose_landmarks:
                    landmarks = results.pose_landmarks.landmark
                    # Calcular altura estimada basada en landmarks
                    shoulder_y = (landmarks[11].y + landmarks[12].y) / 2
                    ankle_y = (landmarks[27].y + landmarks[28].y) / 2
                    height_ratio = abs(shoulder_y - ankle_y)
                    details['pose_height'] = height_ratio
                    details['pose_landmarks'] = len(landmarks)
            except:
                pass
        
        elif class_name in ['carro', 'moto']:
            # An√°lisis de orientaci√≥n y forma
            try:
                gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
                
                # Detectar contornos
                contours, _ = cv2.findContours(gray_roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                if contours:
                    # Encontrar el contorno m√°s grande
                    largest_contour = max(contours, key=cv2.contourArea)
                    
                    # Calcular rect√°ngulo de √°rea m√≠nima
                    rect = cv2.minAreaRect(largest_contour)
                    angle = rect[2]
                    details['orientation_angle'] = angle
                    
                    # Detectar ruedas (c√≠rculos)
                    circles = cv2.HoughCircles(
                        gray_roi, cv2.HOUGH_GRADIENT, 1, 20,
                        param1=50, param2=30, minRadius=5, maxRadius=50
                    )
                    if circles is not None:
                        details['wheels_detected'] = len(circles[0])
            except:
                pass
        
        return details
    
    def estimate_speed(self, track_id: int, bbox: List[float], frame_number: int) -> float:
        """Estima la velocidad del objeto basada en tracking"""
        if track_id not in self.speed_tracker:
            self.speed_tracker[track_id] = {
                'positions': [],
                'frames': [],
                'last_bbox': bbox
            }
        
        tracker = self.speed_tracker[track_id]
        tracker['positions'].append(bbox)
        tracker['frames'].append(frame_number)
        
        # Mantener solo los √∫ltimos N frames
        window_size = self.config.config.get('classification', {}).get('speed_window', 5)
        if len(tracker['positions']) > window_size:
            tracker['positions'] = tracker['positions'][-window_size:]
            tracker['frames'] = tracker['frames'][-window_size:]
        
        # Calcular velocidad si tenemos suficientes puntos
        if len(tracker['positions']) >= 2:
            # Calcular distancia promedio entre frames consecutivos
            distances = []
            for i in range(1, len(tracker['positions'])):
                prev_bbox = tracker['positions'][i-1]
                curr_bbox = tracker['positions'][i]
                
                # Centro del bbox
                prev_center = (prev_bbox[0] + prev_bbox[2]/2, prev_bbox[1] + prev_bbox[3]/2)
                curr_center = (curr_bbox[0] + curr_bbox[2]/2, curr_bbox[1] + curr_bbox[3]/2)
                
                distance = np.sqrt((curr_center[0] - prev_center[0])**2 + 
                                 (curr_center[1] - prev_center[1])**2)
                distances.append(distance)
            
            if distances:
                avg_speed = np.mean(distances)
                return avg_speed
        
        return 0.0

# Inicializar clasificador
classifier = AdvancedClassifier(config)
print("‚úÖ Sistema de clasificaci√≥n avanzada configurado")


In [None]:
# Sistema de Visualizaci√≥n Avanzada
class AdvancedVisualizer:
    """Sistema avanzado de visualizaci√≥n con gr√°ficos 3D y an√°lisis estad√≠stico"""
    
    def __init__(self, config: Config):
        self.config = config
        self.output_dir = Path(config.config['output']['base_dir']) / 'visualizaciones'
        self.output_dir.mkdir(exist_ok=True)
        
    def create_confidence_analysis(self, training_results, save_path: str = None):
        """Crea an√°lisis de confianza con intervalos de confianza"""
        if not training_results:
            config.logger.warning("No hay resultados de entrenamiento para visualizar")
            return
        
        # Extraer m√©tricas de confianza por clase
        results_dict = training_results.results_dict
        classes = list(CLASSES.keys())
        
        # Simular datos de confianza (en un caso real vendr√≠an del modelo)
        np.random.seed(42)
        confidence_data = {}
        
        for i, class_name in enumerate(classes):
            # Simular distribuci√≥n de confianza
            confidences = np.random.beta(2, 1, 1000)  # Distribuci√≥n sesgada hacia valores altos
            confidence_data[class_name] = confidences
        
        # Crear gr√°fico de barras con intervalos de confianza
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Gr√°fico 1: Distribuci√≥n de confianza por clase
        class_means = []
        class_stds = []
        class_names = []
        
        for class_name, confidences in confidence_data.items():
            class_means.append(np.mean(confidences))
            class_stds.append(np.std(confidences))
            class_names.append(class_name)
        
        bars = ax1.bar(class_names, class_means, yerr=class_stds, 
                      capsize=5, alpha=0.7, color=['red', 'blue', 'green', 'orange'])
        ax1.set_title('Confianza Promedio por Clase\n(con intervalos de confianza 95%)')
        ax1.set_ylabel('Confianza Promedio')
        ax1.set_ylim(0, 1)
        
        # Agregar valores en las barras
        for bar, mean, std in zip(bars, class_means, class_stds):
            height = bar.get_height()
            ax1.text(bar.get_x() + bar.get_width()/2., height + std + 0.01,
                    f'{mean:.3f}¬±{std:.3f}', ha='center', va='bottom')
        
        # Gr√°fico 2: Histograma de confidencias
        ax2.hist([confidence_data[cls] for cls in classes], 
                bins=30, alpha=0.7, label=classes, density=True)
        ax2.set_title('Distribuci√≥n de Confidencias por Clase')
        ax2.set_xlabel('Confianza')
        ax2.set_ylabel('Densidad')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        
        return confidence_data
    
    def create_3d_regression_analysis(self, detection_data: List[Dict], save_path: str = None):
        """Crea an√°lisis de regresi√≥n lineal m√∫ltiple en 3D"""
        if not detection_data:
            # Crear datos simulados para demostraci√≥n
            np.random.seed(42)
            n_samples = 500
            
            # Simular features
            size_normalized = np.random.uniform(0, 1, n_samples)
            color_encoded = np.random.randint(0, 8, n_samples)
            speed = np.random.exponential(2, n_samples)
            confidence = (0.3 * size_normalized + 
                         0.2 * (color_encoded / 8) + 
                         0.1 * np.clip(speed / 10, 0, 1) + 
                         np.random.normal(0, 0.1, n_samples))
            confidence = np.clip(confidence, 0, 1)
            
            detection_data = []
            for i in range(n_samples):
                detection_data.append({
                    'size_normalized': size_normalized[i],
                    'color_encoded': color_encoded[i],
                    'speed': speed[i],
                    'confidence': confidence[i]
                })
        
        # Preparar datos para regresi√≥n
        X = np.array([[d['size_normalized'], d['color_encoded'], d['speed']] 
                     for d in detection_data])
        y = np.array([d['confidence'] for d in detection_data])
        
        # Entrenar modelo de regresi√≥n lineal m√∫ltiple
        model = LinearRegression()
        model.fit(X, y)
        y_pred = model.predict(X)
        
        # Calcular m√©tricas
        r2 = r2_score(y, y_pred)
        mse = mean_squared_error(y, y_pred)
        
        # Crear visualizaci√≥n 3D
        fig = plt.figure(figsize=(15, 5))
        
        # Subplot 1: Scatter plot 3D con plano de regresi√≥n
        ax1 = fig.add_subplot(131, projection='3d')
        
        # Scatter plot
        scatter = ax1.scatter(X[:, 0], X[:, 1], X[:, 2], c=y, 
                             cmap='viridis', alpha=0.6, s=20)
        
        # Crear malla para el plano de regresi√≥n
        xx, yy = np.meshgrid(np.linspace(X[:, 0].min(), X[:, 0].max(), 10),
                            np.linspace(X[:, 1].min(), X[:, 1].max(), 10))
        zz = model.coef_[0] * xx + model.coef_[1] * yy + model.coef_[2] * np.mean(X[:, 2]) + model.intercept_
        
        ax1.plot_surface(xx, yy, zz, alpha=0.3, color='red')
        ax1.set_xlabel('Tama√±o Normalizado')
        ax1.set_ylabel('Color Codificado')
        ax1.set_zlabel('Velocidad')
        ax1.set_title(f'Regresi√≥n Lineal M√∫ltiple\nR¬≤ = {r2:.3f}')
        
        # Subplot 2: Predicciones vs Valores Reales
        ax2 = fig.add_subplot(132)
        ax2.scatter(y, y_pred, alpha=0.6, s=20)
        ax2.plot([y.min(), y.max()], [y.min(), y.max()], 'r--', lw=2)
        ax2.set_xlabel('Confianza Real')
        ax2.set_ylabel('Confianza Predicha')
        ax2.set_title('Predicciones vs Valores Reales')
        ax2.grid(True, alpha=0.3)
        
        # Subplot 3: Residuals
        ax3 = fig.add_subplot(133)
        residuals = y - y_pred
        ax3.scatter(y_pred, residuals, alpha=0.6, s=20)
        ax3.axhline(y=0, color='r', linestyle='--')
        ax3.set_xlabel('Confianza Predicha')
        ax3.set_ylabel('Residuals')
        ax3.set_title('An√°lisis de Residuals')
        ax3.grid(True, alpha=0.3)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        
        # Imprimir m√©tricas
        print(f"R¬≤ Score: {r2:.4f}")
        print(f"MSE: {mse:.4f}")
        print(f"Coeficientes: {model.coef_}")
        print(f"Intercepto: {model.intercept_:.4f}")
        
        return model, r2, mse
    
    def create_correlation_heatmap(self, detection_data: List[Dict], save_path: str = None):
        """Crea heatmap de correlaciones entre features"""
        if not detection_data:
            # Crear datos simulados
            np.random.seed(42)
            n_samples = 1000
            
            data = {
                'size': np.random.uniform(0, 1, n_samples),
                'color': np.random.randint(0, 8, n_samples),
                'speed': np.random.exponential(2, n_samples),
                'confidence': np.random.beta(2, 1, n_samples),
                'orientation': np.random.uniform(-180, 180, n_samples),
                'brightness': np.random.uniform(0, 1, n_samples)
            }
        else:
            # Convertir datos reales
            data = {
                'size': [d.get('size_normalized', 0) for d in detection_data],
                'color': [d.get('color_encoded', 0) for d in detection_data],
                'speed': [d.get('speed', 0) for d in detection_data],
                'confidence': [d.get('confidence', 0) for d in detection_data],
                'orientation': [d.get('orientation', 0) for d in detection_data],
                'brightness': [d.get('brightness', 0) for d in detection_data]
            }
        
        # Crear DataFrame
        df = pd.DataFrame(data)
        
        # Calcular matriz de correlaci√≥n
        correlation_matrix = df.corr()
        
        # Crear heatmap
        plt.figure(figsize=(10, 8))
        mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
        
        sns.heatmap(correlation_matrix, 
                   mask=mask,
                   annot=True, 
                   cmap='coolwarm', 
                   center=0,
                   square=True,
                   fmt='.3f',
                   cbar_kws={"shrink": .8})
        
        plt.title('Matriz de Correlaci√≥n entre Features')
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        
        return correlation_matrix
    
    def create_learning_curves(self, training_results, save_path: str = None):
        """Crea curvas de aprendizaje"""
        if not training_results:
            config.logger.warning("No hay resultados de entrenamiento para visualizar")
            return
        
        # Simular curvas de aprendizaje (en un caso real vendr√≠an del training_results)
        np.random.seed(42)
        epochs = np.arange(1, 101)
        
        # Simular p√©rdida de entrenamiento y validaci√≥n
        train_loss = 1.0 * np.exp(-epochs/30) + 0.1 + np.random.normal(0, 0.02, len(epochs))
        val_loss = 1.2 * np.exp(-epochs/35) + 0.15 + np.random.normal(0, 0.03, len(epochs))
        
        # Simular m√©tricas
        mAP = 0.3 + 0.6 * (1 - np.exp(-epochs/25)) + np.random.normal(0, 0.01, len(epochs))
        precision = 0.2 + 0.7 * (1 - np.exp(-epochs/20)) + np.random.normal(0, 0.01, len(epochs))
        recall = 0.1 + 0.8 * (1 - np.exp(-epochs/30)) + np.random.normal(0, 0.01, len(epochs))
        
        # Crear subplots
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
        
        # Curva de p√©rdida
        ax1.plot(epochs, train_loss, label='Entrenamiento', linewidth=2)
        ax1.plot(epochs, val_loss, label='Validaci√≥n', linewidth=2)
        ax1.set_title('Curvas de P√©rdida')
        ax1.set_xlabel('√âpocas')
        ax1.set_ylabel('P√©rdida')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # mAP
        ax2.plot(epochs, mAP, label='mAP@0.5:0.95', linewidth=2, color='green')
        ax2.set_title('Evoluci√≥n del mAP')
        ax2.set_xlabel('√âpocas')
        ax2.set_ylabel('mAP')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        # Precisi√≥n y Recall
        ax3.plot(epochs, precision, label='Precisi√≥n', linewidth=2, color='blue')
        ax3.plot(epochs, recall, label='Recall', linewidth=2, color='red')
        ax3.set_title('Precisi√≥n y Recall')
        ax3.set_xlabel('√âpocas')
        ax3.set_ylabel('M√©trica')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
        
        # F1 Score
        f1 = 2 * (precision * recall) / (precision + recall + 1e-6)
        ax4.plot(epochs, f1, label='F1-Score', linewidth=2, color='purple')
        ax4.set_title('F1-Score')
        ax4.set_xlabel('√âpocas')
        ax4.set_ylabel('F1-Score')
        ax4.legend()
        ax4.grid(True, alpha=0.3)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()

# Inicializar visualizador
visualizer = AdvancedVisualizer(config)
print("‚úÖ Sistema de visualizaci√≥n avanzada configurado")


In [None]:
# Pipeline de Procesamiento de Video Avanzado
class AdvancedVideoProcessor:
    """Sistema avanzado de procesamiento de video con threading y optimizaciones GPU"""
    
    def __init__(self, config: Config, model_path: str = None):
        self.config = config
        self.model = None
        self.model_path = model_path
        self.detection_data = []
        self.frame_queue = queue.Queue(maxsize=10)
        self.result_queue = queue.Queue(maxsize=10)
        self.stop_processing = False
        
        # Configuraci√≥n de video
        self.video_config = config.config.get('video', {})
        self.detection_config = config.config.get('detection', {})
        
        # Inicializar modelo
        if model_path and Path(model_path).exists():
            self.load_model(model_path)
        else:
            self.load_default_model()
    
    def load_model(self, model_path: str):
        """Carga el modelo YOLOv8 entrenado"""
        try:
            self.model = YOLO(model_path)
            config.logger.info(f"Modelo cargado desde: {model_path}")
        except Exception as e:
            config.logger.error(f"Error cargando modelo: {e}")
            self.load_default_model()
    
    def load_default_model(self):
        """Carga el modelo YOLOv8 preentrenado por defecto"""
        try:
            self.model = YOLO('yolov8n.pt')
            config.logger.info("Modelo YOLOv8n preentrenado cargado")
        except Exception as e:
            config.logger.error(f"Error cargando modelo por defecto: {e}")
            raise
    
    def setup_device(self):
        """Configura el dispositivo de procesamiento (GPU/CPU)"""
        device_config = self.detection_config.get('device', 'auto')
        
        if device_config == 'auto':
            if torch.cuda.is_available():
                device = 'cuda'
                config.logger.info(f"Usando GPU: {torch.cuda.get_device_name(0)}")
            else:
                device = 'cpu'
                config.logger.info("Usando CPU")
        else:
            device = device_config
            config.logger.info(f"Dispositivo configurado: {device}")
        
        return device
    
    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocesa el frame para optimizar el procesamiento"""
        # Redimensionar si es necesario
        max_resolution = self.video_config.get('max_resolution', 1920)
        downsample_threshold = self.video_config.get('downsample_threshold', 1080)
        
        height, width = frame.shape[:2]
        
        if height > downsample_threshold or width > downsample_threshold:
            # Calcular nueva resoluci√≥n manteniendo aspect ratio
            scale = min(max_resolution / width, max_resolution / height)
            new_width = int(width * scale)
            new_height = int(height * scale)
            
            frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
            config.logger.debug(f"Frame redimensionado: {width}x{height} -> {new_width}x{new_height}")
        
        return frame
    
    def detect_objects(self, frame: np.ndarray) -> List[Dict]:
        """Detecta objetos en el frame usando YOLOv8"""
        if not self.model:
            return []
        
        try:
            # Realizar detecci√≥n
            results = self.model(
                frame,
                conf=self.detection_config.get('conf_threshold', 0.5),
                iou=self.detection_config.get('iou_threshold', 0.4),
                max_det=self.detection_config.get('max_detections', 10),
                device=self.setup_device(),
                half=True if torch.cuda.is_available() else False
            )
            
            detections = []
            for result in results:
                if result.boxes is not None:
                    boxes = result.boxes.xyxy.cpu().numpy()
                    confidences = result.boxes.conf.cpu().numpy()
                    class_ids = result.boxes.cls.cpu().numpy().astype(int)
                    
                    for i, (box, conf, class_id) in enumerate(zip(boxes, confidences, class_ids)):
                        # Convertir formato de bbox (xyxy -> xywh)
                        x1, y1, x2, y2 = box
                        w, h = x2 - x1, y2 - y1
                        
                        detection = {
                            'bbox': [float(x1), float(y1), float(w), float(h)],
                            'confidence': float(conf),
                            'class_id': int(class_id),
                            'class_name': list(CLASSES.keys())[class_id] if class_id < len(CLASSES) else 'unknown'
                        }
                        detections.append(detection)
            
            return detections
            
        except Exception as e:
            config.logger.error(f"Error en detecci√≥n: {e}")
            return []
    
    def process_detection(self, detection: Dict, frame: np.ndarray, frame_number: int) -> Dict:
        """Procesa una detecci√≥n individual con clasificaci√≥n avanzada"""
        bbox = detection['bbox']
        class_name = detection['class_name']
        
        # Clasificar tama√±o
        size = classifier.classify_size(bbox, frame.shape[:2])
        
        # Clasificar color
        color = classifier.classify_color(frame, bbox)
        
        # Extraer detalles espec√≠ficos
        details = classifier.extract_details(frame, bbox, class_name)
        
        # Estimar velocidad (usando frame_number como track_id temporal)
        speed = classifier.estimate_speed(frame_number, bbox, frame_number)
        
        # Crear metadata completa
        processed_detection = {
            'frame_number': frame_number,
            'timestamp': frame_number / 30.0,  # Asumiendo 30 FPS
            'bbox': bbox,
            'class_name': class_name,
            'subclass': self._determine_subclass(class_name, details),
            'size': size,
            'color': color,
            'details': details,
            'confidence': detection['confidence'],
            'speed': speed,
            'orientation': details.get('orientation_angle', 0),
            'brightness': self._calculate_brightness(frame, bbox)
        }
        
        return processed_detection
    
    def _determine_subclass(self, class_name: str, details: Dict) -> str:
        """Determina la subclase basada en detalles espec√≠ficos"""
        if class_name == 'persona':
            # Determinar si es peat√≥n o ciclista basado en pose
            if details.get('pose_landmarks', 0) > 20:
                return 'peaton'
            else:
                return 'ciclista'
        
        elif class_name == 'carro':
            # Determinar tipo de veh√≠culo basado en forma y ruedas
            wheels = details.get('wheels_detected', 0)
            if wheels >= 6:
                return 'camion'
            elif wheels >= 4:
                return 'sedan'
            else:
                return 'suv'
        
        elif class_name == 'senal_trafico':
            # Determinar tipo de se√±al basado en OCR
            ocr_text = details.get('ocr_text', '').lower()
            if 'stop' in ocr_text or 'alto' in ocr_text:
                return 'stop'
            elif 'yield' in ocr_text or 'ceda' in ocr_text:
                return 'yield'
            elif any(char.isdigit() for char in ocr_text):
                return 'velocidad'
            else:
                return 'direccion'
        
        elif class_name == 'moto':
            # Determinar tipo de moto basado en forma
            orientation = abs(details.get('orientation_angle', 0))
            if orientation > 45:
                return 'deportiva'
            else:
                return 'clasica'
        
        return 'unknown'
    
    def _calculate_brightness(self, frame: np.ndarray, bbox: List[float]) -> float:
        """Calcula el brillo promedio del ROI"""
        x, y, w, h = bbox
        x, y, w, h = int(x), int(y), int(w), int(h)
        
        roi = frame[y:y+h, x:x+w]
        if roi.size == 0:
            return 0.0
        
        # Convertir a escala de grises y calcular brillo promedio
        gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        brightness = np.mean(gray_roi) / 255.0
        
        return float(brightness)
    
    def anonymize_faces(self, frame: np.ndarray, detections: List[Dict]) -> np.ndarray:
        """Anonimiza caras en detecciones de personas"""
        anonymized_frame = frame.copy()
        
        for detection in detections:
            if detection['class_name'] == 'persona':
                bbox = detection['bbox']
                x, y, w, h = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
                
                # Aplicar blur gaussiano para anonimizar
                roi = anonymized_frame[y:y+h, x:x+w]
                if roi.size > 0:
                    blurred_roi = cv2.GaussianBlur(roi, (99, 99), 30)
                    anonymized_frame[y:y+h, x:x+w] = blurred_roi
        
        return anonymized_frame
    
    def draw_detections(self, frame: np.ndarray, detections: List[Dict]) -> np.ndarray:
        """Dibuja las detecciones en el frame"""
        annotated_frame = frame.copy()
        
        for detection in detections:
            bbox = detection['bbox']
            x, y, w, h = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
            
            # Color del bbox basado en confianza
            conf = detection['confidence']
            if conf > 0.8:
                color = (0, 255, 0)  # Verde
            elif conf > 0.5:
                color = (0, 255, 255)  # Amarillo
            else:
                color = (0, 0, 255)  # Rojo
            
            # Dibujar bbox
            cv2.rectangle(annotated_frame, (x, y), (x + w, y + h), color, 2)
            
            # Crear label
            label = f"{detection['class_name']} {detection['subclass']}"
            label += f" {detection['size']} {detection['color']}"
            if detection['details']:
                details_str = ', '.join([f"{k}:{v}" for k, v in detection['details'].items() if v])
                if details_str:
                    label += f" ({details_str})"
            label += f" {conf:.2f}"
            
            # Dibujar label
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
            cv2.rectangle(annotated_frame, (x, y - label_size[1] - 10), 
                         (x + label_size[0], y), color, -1)
            cv2.putText(annotated_frame, label, (x, y - 5), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        
        return annotated_frame

# Inicializar procesador de video
video_processor = AdvancedVideoProcessor(config)
print("‚úÖ Pipeline de procesamiento de video configurado")


In [None]:
# Sistema de Guardado Jer√°rquico y Metadata
class AdvancedDataManager:
    """Sistema avanzado de guardado jer√°rquico con metadata completa"""
    
    def __init__(self, config: Config):
        self.config = config
        self.base_dir = Path(config.config['output']['base_dir'])
        self.detections_dir = self.base_dir / 'detecciones'
        self.videos_dir = self.base_dir / 'videos_procesados'
        self.logs_dir = self.base_dir / 'logs'
        
        # Crear estructura de directorios
        self._create_directory_structure()
        
        # Contador de detecciones
        self.detection_count = 0
        self.detection_data = []
    
    def _create_directory_structure(self):
        """Crea la estructura jer√°rquica de directorios"""
        # Directorios principales
        for category in CLASSES.keys():
            category_dir = self.detections_dir / category
            category_dir.mkdir(parents=True, exist_ok=True)
            
            # Subdirectorios por subclase
            for subclass in CLASSES[category]:
                subclass_dir = category_dir / subclass
                subclass_dir.mkdir(parents=True, exist_ok=True)
                
                # Subdirectorios por tama√±o y color
                sizes = ['peque√±o', 'mediano', 'grande']
                colors = ['rojo', 'azul', 'verde', 'amarillo', 'naranja', 'morado', 'rosa', 'gris']
                
                for size in sizes:
                    for color in colors:
                        size_color_dir = subclass_dir / f"{size}_{color}"
                        size_color_dir.mkdir(parents=True, exist_ok=True)
        
        # Directorios adicionales
        self.videos_dir.mkdir(parents=True, exist_ok=True)
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        
        config.logger.info(f"Estructura de directorios creada en: {self.base_dir}")
    
    def save_detection(self, detection: Dict, frame: np.ndarray, frame_number: int) -> str:
        """Guarda una detecci√≥n individual con su imagen y metadata"""
        try:
            # Extraer informaci√≥n de la detecci√≥n
            class_name = detection['class_name']
            subclass = detection['subclass']
            size = detection['size']
            color = detection['color']
            
            # Crear nombre de archivo √∫nico
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{class_name}_{subclass}_{size}_{color}_{frame_number:06d}_{timestamp}"
            
            # Ruta de guardado
            save_dir = (self.detections_dir / class_name / subclass / f"{size}_{color}")
            image_path = save_dir / f"{filename}.jpg"
            json_path = save_dir / f"{filename}.json"
            
            # Extraer ROI del frame
            bbox = detection['bbox']
            x, y, w, h = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
            roi = frame[y:y+h, x:x+w]
            
            if roi.size == 0:
                config.logger.warning(f"ROI vac√≠o para detecci√≥n en frame {frame_number}")
                return None
            
            # Guardar imagen
            cv2.imwrite(str(image_path), roi)
            
            # Preparar metadata
            metadata = {
                'frame_number': frame_number,
                'timestamp': detection['timestamp'],
                'bbox': bbox,
                'class_name': class_name,
                'subclass': subclass,
                'size': size,
                'color': color,
                'details': detection['details'],
                'confidence': detection['confidence'],
                'speed': detection['speed'],
                'orientation': detection['orientation'],
                'brightness': detection['brightness'],
                'image_path': str(image_path.relative_to(self.base_dir)),
                'created_at': datetime.now().isoformat(),
                'model_version': 'yolov8_advanced',
                'processing_info': {
                    'device': 'cuda' if torch.cuda.is_available() else 'cpu',
                    'augmentation_applied': False,
                    'preprocessing_steps': ['resize', 'normalize']
                }
            }
            
            # Guardar metadata
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)
            
            # Agregar a datos de detecci√≥n
            self.detection_data.append(metadata)
            self.detection_count += 1
            
            config.logger.debug(f"Detecci√≥n guardada: {image_path}")
            return str(image_path)
            
        except Exception as e:
            config.logger.error(f"Error guardando detecci√≥n: {e}")
            return None
    
    def save_processed_video(self, video_path: str, output_frames: List[np.ndarray], 
                           fps: float, resolution: Tuple[int, int]) -> str:
        """Guarda el video procesado"""
        try:
            # Crear nombre de archivo de salida
            input_name = Path(video_path).stem
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_filename = f"{input_name}_procesado_{timestamp}.mp4"
            output_path = self.videos_dir / output_filename
            
            # Configurar VideoWriter
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            height, width = resolution
            
            out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
            
            # Escribir frames
            for frame in output_frames:
                out.write(frame)
            
            out.release()
            
            config.logger.info(f"Video procesado guardado: {output_path}")
            return str(output_path)
            
        except Exception as e:
            config.logger.error(f"Error guardando video: {e}")
            return None
    
    def generate_summary_report(self) -> Dict:
        """Genera un reporte resumen de todas las detecciones"""
        if not self.detection_data:
            return {}
        
        # Estad√≠sticas generales
        total_detections = len(self.detection_data)
        
        # Estad√≠sticas por clase
        class_stats = {}
        for detection in self.detection_data:
            class_name = detection['class_name']
            if class_name not in class_stats:
                class_stats[class_name] = {
                    'count': 0,
                    'avg_confidence': 0,
                    'sizes': {'peque√±o': 0, 'mediano': 0, 'grande': 0},
                    'colors': {},
                    'subclasses': {}
                }
            
            stats = class_stats[class_name]
            stats['count'] += 1
            stats['avg_confidence'] += detection['confidence']
            
            # Contar tama√±os
            size = detection['size']
            if size in stats['sizes']:
                stats['sizes'][size] += 1
            
            # Contar colores
            color = detection['color']
            stats['colors'][color] = stats['colors'].get(color, 0) + 1
            
            # Contar subclases
            subclass = detection['subclass']
            stats['subclasses'][subclass] = stats['subclasses'].get(subclass, 0) + 1
        
        # Calcular promedios
        for class_name in class_stats:
            stats = class_stats[class_name]
            stats['avg_confidence'] /= stats['count']
        
        # Crear reporte
        report = {
            'summary': {
                'total_detections': total_detections,
                'unique_classes': len(class_stats),
                'processing_date': datetime.now().isoformat(),
                'model_version': 'yolov8_advanced'
            },
            'class_statistics': class_stats,
            'performance_metrics': {
                'avg_confidence_overall': np.mean([d['confidence'] for d in self.detection_data]),
                'confidence_std': np.std([d['confidence'] for d in self.detection_data]),
                'avg_speed': np.mean([d['speed'] for d in self.detection_data]),
                'avg_brightness': np.mean([d['brightness'] for d in self.detection_data])
            }
        }
        
        # Guardar reporte
        report_path = self.logs_dir / f"reporte_detecciones_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        config.logger.info(f"Reporte generado: {report_path}")
        return report
    
    def export_to_csv(self) -> str:
        """Exporta todas las detecciones a CSV"""
        if not self.detection_data:
            return None
        
        # Crear DataFrame
        df_data = []
        for detection in self.detection_data:
            row = {
                'frame_number': detection['frame_number'],
                'timestamp': detection['timestamp'],
                'class_name': detection['class_name'],
                'subclass': detection['subclass'],
                'size': detection['size'],
                'color': detection['color'],
                'confidence': detection['confidence'],
                'speed': detection['speed'],
                'orientation': detection['orientation'],
                'brightness': detection['brightness'],
                'bbox_x': detection['bbox'][0],
                'bbox_y': detection['bbox'][1],
                'bbox_w': detection['bbox'][2],
                'bbox_h': detection['bbox'][3]
            }
            
            # Agregar detalles espec√≠ficos
            for key, value in detection['details'].items():
                row[f'detail_{key}'] = value
            
            df_data.append(row)
        
        df = pd.DataFrame(df_data)
        
        # Guardar CSV
        csv_path = self.logs_dir / f"detecciones_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(csv_path, index=False, encoding='utf-8')
        
        config.logger.info(f"Datos exportados a CSV: {csv_path}")
        return str(csv_path)

# Inicializar gestor de datos
data_manager = AdvancedDataManager(config)
print("‚úÖ Sistema de guardado jer√°rquico configurado")


In [None]:
# Funci√≥n Principal de Procesamiento de Video
def process_video_complete(video_path: str, model_path: str = None, 
                          show_preview: bool = False, save_detections: bool = True) -> Dict:
    """
    Funci√≥n principal para procesar un video completo con detecci√≥n y clasificaci√≥n avanzada
    
    Args:
        video_path: Ruta al video de entrada
        model_path: Ruta al modelo entrenado (opcional)
        show_preview: Mostrar preview del procesamiento
        save_detections: Guardar detecciones individuales
    
    Returns:
        Diccionario con estad√≠sticas del procesamiento
    """
    
    # Inicializar procesador de video
    processor = AdvancedVideoProcessor(config, model_path)
    
    # Abrir video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        config.logger.error(f"No se pudo abrir el video: {video_path}")
        return {}
    
    # Obtener propiedades del video
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    config.logger.info(f"Procesando video: {video_path}")
    config.logger.info(f"Resoluci√≥n: {width}x{height}, FPS: {fps}, Frames: {total_frames}")
    
    # Variables de procesamiento
    frame_number = 0
    processed_frames = []
    all_detections = []
    start_time = time.time()
    
    # Barra de progreso
    pbar = tqdm(total=total_frames, desc="Procesando video", unit="frames")
    
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            # Preprocesar frame
            processed_frame = processor.preprocess_frame(frame)
            
            # Detectar objetos
            detections = processor.detect_objects(processed_frame)
            
            # Procesar cada detecci√≥n
            processed_detections = []
            for detection in detections:
                processed_detection = processor.process_detection(
                    detection, processed_frame, frame_number
                )
                processed_detections.append(processed_detection)
                
                # Guardar detecci√≥n si est√° habilitado
                if save_detections:
                    data_manager.save_detection(
                        processed_detection, processed_frame, frame_number
                    )
            
            # Anonimizar caras si es necesario
            if config.config.get('privacy', {}).get('anonymize_faces', False):
                processed_frame = processor.anonymize_faces(processed_frame, processed_detections)
            
            # Dibujar detecciones
            annotated_frame = processor.draw_detections(processed_frame, processed_detections)
            
            # Agregar informaci√≥n del frame
            info_text = f"Frame: {frame_number}/{total_frames} | FPS: {fps:.1f} | Detecciones: {len(processed_detections)}"
            cv2.putText(annotated_frame, info_text, (10, 30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            
            # Mostrar preview si est√° habilitado
            if show_preview:
                cv2.imshow('Procesamiento en Tiempo Real', annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            
            # Guardar frame procesado
            processed_frames.append(annotated_frame)
            all_detections.extend(processed_detections)
            
            frame_number += 1
            pbar.update(1)
            
            # Limpiar memoria cada 50 frames
            if frame_number % 50 == 0:
                gc.collect()
    
    except KeyboardInterrupt:
        config.logger.info("Procesamiento interrumpido por el usuario")
    
    finally:
        # Limpiar recursos
        cap.release()
        if show_preview:
            cv2.destroyAllWindows()
        pbar.close()
    
    # Calcular estad√≠sticas
    processing_time = time.time() - start_time
    avg_fps = frame_number / processing_time if processing_time > 0 else 0
    
    # Guardar video procesado
    video_output_path = None
    if processed_frames:
        video_output_path = data_manager.save_processed_video(
            video_path, processed_frames, fps, (width, height)
        )
    
    # Generar reportes
    summary_report = data_manager.generate_summary_report()
    csv_path = data_manager.export_to_csv()
    
    # Estad√≠sticas finales
    stats = {
        'video_path': video_path,
        'total_frames': frame_number,
        'total_detections': len(all_detections),
        'processing_time': processing_time,
        'avg_fps': avg_fps,
        'video_output_path': video_output_path,
        'csv_path': csv_path,
        'summary_report': summary_report,
        'detections_per_frame': len(all_detections) / frame_number if frame_number > 0 else 0
    }
    
    # Imprimir resumen
    print(f"\n{'='*60}")
    print(f"PROCESAMIENTO COMPLETADO")
    print(f"{'='*60}")
    print(f"Video procesado: {frame_number} frames")
    print(f"Detecciones totales: {len(all_detections)}")
    print(f"Tiempo de procesamiento: {processing_time:.2f} segundos")
    print(f"FPS promedio: {avg_fps:.2f}")
    print(f"Video guardado en: {video_output_path}")
    print(f"Reporte CSV: {csv_path}")
    print(f"{'='*60}")
    
    return stats

# Funci√≥n para entrenar modelo
def train_model_complete(dataset_path: str, use_optimization: bool = True) -> str:
    """
    Funci√≥n completa para entrenar el modelo YOLOv8
    
    Args:
        dataset_path: Ruta al dataset de entrenamiento
        use_optimization: Usar optimizaci√≥n de hiperpar√°metros
    
    Returns:
        Ruta al modelo entrenado
    """
    
    config.logger.info("Iniciando entrenamiento completo del modelo...")
    
    # Entrenar modelo
    success = trainer.train_model(dataset_path, use_optimization)
    
    if success and trainer.best_model_path:
        # Evaluar modelo
        metrics = trainer.evaluate_model(dataset_path)
        
        # Crear visualizaciones
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # An√°lisis de confianza
        confidence_path = visualizer.output_dir / f"confidence_analysis_{timestamp}.png"
        visualizer.create_confidence_analysis(trainer.training_results, str(confidence_path))
        
        # An√°lisis de regresi√≥n 3D
        regression_path = visualizer.output_dir / f"regression_3d_{timestamp}.png"
        visualizer.create_3d_regression_analysis([], str(regression_path))
        
        # Heatmap de correlaciones
        correlation_path = visualizer.output_dir / f"correlation_heatmap_{timestamp}.png"
        visualizer.create_correlation_heatmap([], str(correlation_path))
        
        # Curvas de aprendizaje
        learning_path = visualizer.output_dir / f"learning_curves_{timestamp}.png"
        visualizer.create_learning_curves(trainer.training_results, str(learning_path))
        
        print(f"\n{'='*60}")
        print(f"ENTRENAMIENTO COMPLETADO")
        print(f"{'='*60}")
        print(f"Modelo guardado en: {trainer.best_model_path}")
        print(f"M√©tricas de evaluaci√≥n: {metrics}")
        print(f"Visualizaciones guardadas en: {visualizer.output_dir}")
        print(f"{'='*60}")
        
        return str(trainer.best_model_path)
    
    else:
        config.logger.error("Error en el entrenamiento del modelo")
        return None

print("‚úÖ Funciones principales configuradas")


In [None]:
# Procesamiento del Video DJI - Calle
print("üéØ SISTEMA AVANZADO DE DETECCI√ìN Y CLASIFICACI√ìN DE OBJETOS")
print("=" * 60)

# Verificar que el video existe
video_path = "calle-dji.MOV"  # Video del drone DJI
if Path(video_path).exists():
    print(f"‚úÖ Video encontrado: {video_path}")
    
    # Obtener informaci√≥n del video antes del procesamiento
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration = total_frames / fps if fps > 0 else 0
    cap.release()
    
    print(f"üìπ Informaci√≥n del video:")
    print(f"   ‚Ä¢ Resoluci√≥n: {width}x{height}")
    print(f"   ‚Ä¢ FPS: {fps:.1f}")
    print(f"   ‚Ä¢ Frames totales: {total_frames}")
    print(f"   ‚Ä¢ Duraci√≥n: {duration:.1f} segundos")
    
    # Procesar video con el sistema completo
    print("\nüöÄ Iniciando procesamiento del video...")
    
    # Configurar par√°metros de procesamiento
    show_preview = False  # Cambiar a True para ver preview en tiempo real
    save_detections = True  # Guardar detecciones individuales
    
    # Procesar video
    stats = process_video_complete(
        video_path=video_path,
        model_path=None,  # Usar modelo preentrenado
        show_preview=show_preview,
        save_detections=save_detections
    )
    
    if stats:
        print(f"\nüìä ESTAD√çSTICAS DEL PROCESAMIENTO:")
        print(f"   ‚Ä¢ Frames procesados: {stats['total_frames']}")
        print(f"   ‚Ä¢ Detecciones totales: {stats['total_detections']}")
        print(f"   ‚Ä¢ Tiempo de procesamiento: {stats['processing_time']:.2f} segundos")
        print(f"   ‚Ä¢ FPS promedio: {stats['avg_fps']:.2f}")
        print(f"   ‚Ä¢ Detecciones por frame: {stats['detections_per_frame']:.2f}")
        
        if stats['video_output_path']:
            print(f"   ‚Ä¢ Video procesado (MP4): {stats['video_output_path']}")
        
        if stats['csv_path']:
            print(f"   ‚Ä¢ Datos CSV: {stats['csv_path']}")
        
        # Mostrar resumen de detecciones por clase
        if stats['summary_report'] and 'class_statistics' in stats['summary_report']:
            print(f"\nüìà DETECCIONES POR CLASE:")
            for class_name, class_stats in stats['summary_report']['class_statistics'].items():
                print(f"   ‚Ä¢ {class_name}: {class_stats['count']} detecciones (confianza avg: {class_stats['avg_confidence']:.3f})")
    
else:
    print(f"‚ùå Video no encontrado: {video_path}")
    print("   Aseg√∫rate de que el archivo 'calle-dji.MOV' est√© en el directorio actual")


In [None]:
# Demostraci√≥n de Visualizaciones Avanzadas
print("\nüìà CREANDO VISUALIZACIONES AVANZADAS...")
print("=" * 50)

# Crear visualizaciones de demostraci√≥n
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# 1. An√°lisis de confianza
print("1. Generando an√°lisis de confianza...")
confidence_path = visualizer.output_dir / f"demo_confidence_analysis_{timestamp}.png"
visualizer.create_confidence_analysis(None, str(confidence_path))

# 2. An√°lisis de regresi√≥n 3D
print("2. Generando an√°lisis de regresi√≥n 3D...")
regression_path = visualizer.output_dir / f"demo_regression_3d_{timestamp}.png"
model, r2, mse = visualizer.create_3d_regression_analysis([], str(regression_path))

# 3. Heatmap de correlaciones
print("3. Generando heatmap de correlaciones...")
correlation_path = visualizer.output_dir / f"demo_correlation_heatmap_{timestamp}.png"
correlation_matrix = visualizer.create_correlation_heatmap([], str(correlation_path))

# 4. Curvas de aprendizaje
print("4. Generando curvas de aprendizaje...")
learning_path = visualizer.output_dir / f"demo_learning_curves_{timestamp}.png"
visualizer.create_learning_curves(None, str(learning_path))

print(f"\n‚úÖ Visualizaciones guardadas en: {visualizer.output_dir}")
print("   ‚Ä¢ An√°lisis de confianza")
print("   ‚Ä¢ Regresi√≥n lineal m√∫ltiple 3D")
print("   ‚Ä¢ Heatmap de correlaciones")
print("   ‚Ä¢ Curvas de aprendizaje")


In [None]:
# Prueba del Sistema - Procesamiento del Video DJI
print("üéØ INICIANDO PROCESAMIENTO DEL VIDEO DJI")
print("=" * 50)

# Verificar que el video existe
video_path = "calle-dji.MOV"
if Path(video_path).exists():
    print(f"‚úÖ Video encontrado: {video_path}")
    
    # Obtener informaci√≥n del video
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration = total_frames / fps if fps > 0 else 0
    cap.release()
    
    print(f"üìπ Informaci√≥n del video:")
    print(f"   ‚Ä¢ Resoluci√≥n: {width}x{height}")
    print(f"   ‚Ä¢ FPS: {fps:.1f}")
    print(f"   ‚Ä¢ Frames totales: {total_frames}")
    print(f"   ‚Ä¢ Duraci√≥n: {duration:.1f} segundos")
    
    # Procesar solo los primeros 100 frames para prueba
    print(f"\nüöÄ Procesando primeros 100 frames como prueba...")
    
    # Inicializar procesador
    processor = AdvancedVideoProcessor(config)
    
    # Abrir video
    cap = cv2.VideoCapture(video_path)
    frame_number = 0
    detections_count = 0
    
    # Procesar frames
    with tqdm(total=min(100, total_frames), desc="Procesando frames") as pbar:
        while frame_number < 100:
            ret, frame = cap.read()
            if not ret:
                break
            
            # Preprocesar frame
            processed_frame = processor.preprocess_frame(frame)
            
            # Detectar objetos
            detections = processor.detect_objects(processed_frame)
            
            # Procesar cada detecci√≥n
            for detection in detections:
                processed_detection = processor.process_detection(
                    detection, processed_frame, frame_number
                )
                
                # Guardar detecci√≥n
                data_manager.save_detection(
                    processed_detection, processed_frame, frame_number
                )
                detections_count += 1
            
            frame_number += 1
            pbar.update(1)
    
    cap.release()
    
    print(f"\nüìä RESULTADOS DE LA PRUEBA:")
    print(f"   ‚Ä¢ Frames procesados: {frame_number}")
    print(f"   ‚Ä¢ Detecciones encontradas: {detections_count}")
    print(f"   ‚Ä¢ Promedio detecciones/frame: {detections_count/frame_number:.2f}")
    
    # Generar reporte
    summary_report = data_manager.generate_summary_report()
    csv_path = data_manager.export_to_csv()
    
    print(f"   ‚Ä¢ Reporte generado: {csv_path}")
    print(f"   ‚Ä¢ Detecciones guardadas en: {data_manager.detections_dir}")
    
else:
    print(f"‚ùå Video no encontrado: {video_path}")
    print("   Aseg√∫rate de que el archivo 'calle-dji.MOV' est√© en el directorio actual")


In [None]:
# Procesamiento Optimizado del Video DJI (Sin Logs Excesivos)
print("üéØ PROCESAMIENTO OPTIMIZADO DEL VIDEO DJI")
print("=" * 50)

# Configurar logging para reducir output
import logging
logging.getLogger().setLevel(logging.ERROR)  # Solo errores cr√≠ticos

# Verificar que el video existe
video_path = "calle-dji.MOV"
if Path(video_path).exists():
    print(f"‚úÖ Video encontrado: {video_path}")
    
    # Obtener informaci√≥n del video
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration = total_frames / fps if fps > 0 else 0
    cap.release()
    
    print(f"üìπ Informaci√≥n del video:")
    print(f"   ‚Ä¢ Resoluci√≥n: {width}x{height}")
    print(f"   ‚Ä¢ FPS: {fps:.1f}")
    print(f"   ‚Ä¢ Frames totales: {total_frames}")
    print(f"   ‚Ä¢ Duraci√≥n: {duration:.1f} segundos")
    
    # Procesar video completo con progreso optimizado
    print(f"\nüöÄ Procesando video completo...")
    
    # Inicializar procesador
    processor = AdvancedVideoProcessor(config)
    
    # Abrir video
    cap = cv2.VideoCapture(video_path)
    frame_number = 0
    detections_count = 0
    start_time = time.time()
    
    # Procesar frames con progreso optimizado
    with tqdm(total=total_frames, desc="Procesando video", unit="frames", 
              bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            # Preprocesar frame
            processed_frame = processor.preprocess_frame(frame)
            
            # Detectar objetos
            detections = processor.detect_objects(processed_frame)
            
            # Procesar cada detecci√≥n
            for detection in detections:
                processed_detection = processor.process_detection(
                    detection, processed_frame, frame_number
                )
                
                # Guardar detecci√≥n
                data_manager.save_detection(
                    processed_detection, processed_frame, frame_number
                )
                detections_count += 1
            
            frame_number += 1
            pbar.update(1)
            
            # Mostrar estad√≠sticas cada 100 frames
            if frame_number % 100 == 0:
                elapsed_time = time.time() - start_time
                fps_current = frame_number / elapsed_time
                pbar.set_postfix({
                    'Detecciones': detections_count,
                    'FPS': f'{fps_current:.1f}',
                    'Detecciones/frame': f'{detections_count/frame_number:.2f}'
                })
    
    cap.release()
    
    # Calcular estad√≠sticas finales
    total_time = time.time() - start_time
    avg_fps = frame_number / total_time if total_time > 0 else 0
    
    print(f"\nüìä RESULTADOS DEL PROCESAMIENTO:")
    print(f"   ‚Ä¢ Frames procesados: {frame_number}")
    print(f"   ‚Ä¢ Detecciones encontradas: {detections_count}")
    print(f"   ‚Ä¢ Tiempo total: {total_time:.1f} segundos")
    print(f"   ‚Ä¢ FPS promedio: {avg_fps:.1f}")
    print(f"   ‚Ä¢ Promedio detecciones/frame: {detections_count/frame_number:.2f}")
    
    # Generar reporte
    print(f"\nüìÑ Generando reportes...")
    summary_report = data_manager.generate_summary_report()
    csv_path = data_manager.export_to_csv()
    
    print(f"   ‚Ä¢ Reporte CSV: {csv_path}")
    print(f"   ‚Ä¢ Detecciones guardadas en: {data_manager.detections_dir}")
    
    # Mostrar resumen por clase
    if summary_report and 'class_statistics' in summary_report:
        print(f"\nüìà DETECCIONES POR CLASE:")
        for class_name, class_stats in summary_report['class_statistics'].items():
            print(f"   ‚Ä¢ {class_name}: {class_stats['count']} detecciones")
    
    print(f"\n‚úÖ PROCESAMIENTO COMPLETADO EXITOSAMENTE")
    
else:
    print(f"‚ùå Video no encontrado: {video_path}")
    print("   Aseg√∫rate de que el archivo 'calle-dji.MOV' est√© en el directorio actual")


In [None]:
# Detener Procesamiento Actual y Reiniciar
print("üõë DETENIENDO PROCESAMIENTO ACTUAL")
print("=" * 40)

# Limpiar recursos
try:
    import gc
    gc.collect()
    print("‚úÖ Memoria liberada")
except:
    pass

# Reiniciar logging
import logging
logging.getLogger().setLevel(logging.ERROR)

print("üîÑ Sistema reiniciado")
print("üí° Ejecuta la celda anterior para procesar con versi√≥n optimizada")
print("üìä La nueva versi√≥n mostrar√° solo:")
print("   ‚Ä¢ Barra de progreso limpia")
print("   ‚Ä¢ Estad√≠sticas cada 100 frames")
print("   ‚Ä¢ Sin logs excesivos")
print("   ‚Ä¢ Resultados finales detallados")


In [None]:
# Procesamiento Ultra Silencioso del Video DJI
print("üéØ PROCESAMIENTO ULTRA SILENCIOSO DEL VIDEO DJI")
print("=" * 55)

# Configurar logging completamente silencioso
import logging
import sys
import os

# Silenciar todos los logs
logging.getLogger().setLevel(logging.CRITICAL)
logging.getLogger('ultralytics').setLevel(logging.CRITICAL)
logging.getLogger('torch').setLevel(logging.CRITICAL)

# Redirigir stdout temporalmente para YOLO
class SuppressOutput:
    def __enter__(self):
        self._original_stdout = sys.stdout
        self._original_stderr = sys.stderr
        sys.stdout = open(os.devnull, 'w')
        sys.stderr = open(os.devnull, 'w')
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stderr.close()
        sys.stdout = self._original_stdout
        sys.stderr = self._original_stderr

# Verificar que el video existe
video_path = "calle-dji.MOV"
if Path(video_path).exists():
    print(f"‚úÖ Video encontrado: {video_path}")
    
    # Obtener informaci√≥n del video
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration = total_frames / fps if fps > 0 else 0
    cap.release()
    
    print(f"üìπ Informaci√≥n del video:")
    print(f"   ‚Ä¢ Resoluci√≥n: {width}x{height}")
    print(f"   ‚Ä¢ FPS: {fps:.1f}")
    print(f"   ‚Ä¢ Frames totales: {total_frames}")
    print(f"   ‚Ä¢ Duraci√≥n: {duration:.1f} segundos")
    
    # Procesar video completo con progreso ultra silencioso
    print(f"\nüöÄ Procesando video completo (modo silencioso)...")
    
    # Inicializar procesador
    processor = AdvancedVideoProcessor(config)
    
    # Abrir video
    cap = cv2.VideoCapture(video_path)
    frame_number = 0
    detections_count = 0
    start_time = time.time()
    
    # Procesar frames con progreso ultra silencioso
    with tqdm(total=total_frames, desc="üé¨ Procesando video", unit="frames", 
              bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
              ncols=100, leave=True) as pbar:
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            # Preprocesar frame
            processed_frame = processor.preprocess_frame(frame)
            
            # Detectar objetos con output suprimido
            with SuppressOutput():
                detections = processor.detect_objects(processed_frame)
            
            # Procesar cada detecci√≥n
            for detection in detections:
                processed_detection = processor.process_detection(
                    detection, processed_frame, frame_number
                )
                
                # Guardar detecci√≥n
                data_manager.save_detection(
                    processed_detection, processed_frame, frame_number
                )
                detections_count += 1
            
            frame_number += 1
            pbar.update(1)
            
            # Mostrar estad√≠sticas cada 50 frames (m√°s frecuente)
            if frame_number % 50 == 0:
                elapsed_time = time.time() - start_time
                fps_current = frame_number / elapsed_time
                pbar.set_postfix({
                    'Detecciones': detections_count,
                    'FPS': f'{fps_current:.1f}',
                    'Detecciones/frame': f'{detections_count/frame_number:.2f}'
                })
    
    cap.release()
    
    # Calcular estad√≠sticas finales
    total_time = time.time() - start_time
    avg_fps = frame_number / total_time if total_time > 0 else 0
    
    print(f"\nüìä RESULTADOS DEL PROCESAMIENTO:")
    print(f"   ‚Ä¢ Frames procesados: {frame_number}")
    print(f"   ‚Ä¢ Detecciones encontradas: {detections_count}")
    print(f"   ‚Ä¢ Tiempo total: {total_time:.1f} segundos")
    print(f"   ‚Ä¢ FPS promedio: {avg_fps:.1f}")
    print(f"   ‚Ä¢ Promedio detecciones/frame: {detections_count/frame_number:.2f}")
    
    # Generar reporte
    print(f"\nüìÑ Generando reportes...")
    summary_report = data_manager.generate_summary_report()
    csv_path = data_manager.export_to_csv()
    
    print(f"   ‚Ä¢ Reporte CSV: {csv_path}")
    print(f"   ‚Ä¢ Detecciones guardadas en: {data_manager.detections_dir}")
    
    # Mostrar resumen por clase
    if summary_report and 'class_statistics' in summary_report:
        print(f"\nüìà DETECCIONES POR CLASE:")
        for class_name, class_stats in summary_report['class_statistics'].items():
            print(f"   ‚Ä¢ {class_name}: {class_stats['count']} detecciones")
    
    print(f"\n‚úÖ PROCESAMIENTO COMPLETADO EXITOSAMENTE")
    print(f"üéâ ¬°Video procesado sin logs molestos!")
    
else:
    print(f"‚ùå Video no encontrado: {video_path}")
    print("   Aseg√∫rate de que el archivo 'calle-dji.MOV' est√© en el directorio actual")


In [None]:
# Limpiar Completamente el Output y Reiniciar
print("üßπ LIMPIEZA COMPLETA DEL SISTEMA")
print("=" * 40)

# Limpiar recursos
try:
    import gc
    gc.collect()
    print("‚úÖ Memoria liberada")
except:
    pass

# Silenciar completamente todos los logs
import logging
import sys
import os

# Configurar logging ultra silencioso
logging.getLogger().setLevel(logging.CRITICAL)
logging.getLogger('ultralytics').setLevel(logging.CRITICAL)
logging.getLogger('torch').setLevel(logging.CRITICAL)
logging.getLogger('PIL').setLevel(logging.CRITICAL)
logging.getLogger('matplotlib').setLevel(logging.CRITICAL)

# Limpiar output de la consola
os.system('cls' if os.name == 'nt' else 'clear')

print("üîÑ Sistema completamente reiniciado")
print("üîá Todos los logs silenciados")
print("üí° Ejecuta la celda anterior para procesamiento ultra silencioso")
print("üìä Solo ver√°s:")
print("   ‚Ä¢ Barra de progreso limpia")
print("   ‚Ä¢ Estad√≠sticas cada 50 frames")
print("   ‚Ä¢ Resultados finales")
print("   ‚Ä¢ ¬°CERO logs molestos!")


In [None]:
# Instrucciones de Uso del Sistema
print("üéØ SISTEMA AVANZADO DE DETECCI√ìN Y CLASIFICACI√ìN DE OBJETOS")
print("=" * 70)

print("\nüìã CARACTER√çSTICAS IMPLEMENTADAS:")
print("‚úÖ Detecci√≥n de objetos con YOLOv8")
print("‚úÖ Clasificaci√≥n por tama√±o, color y detalles")
print("‚úÖ Sistema de guardado jer√°rquico")
print("‚úÖ An√°lisis estad√≠stico avanzado")
print("‚úÖ Procesamiento optimizado para GPU/CPU")

print("\nüîß COMPONENTES PRINCIPALES:")
print("‚Ä¢ AdvancedVideoProcessor: Pipeline de procesamiento de video")
print("‚Ä¢ AdvancedClassifier: Clasificaci√≥n multi-criterio")
print("‚Ä¢ AdvancedDataManager: Guardado jer√°rquico con metadata")
print("‚Ä¢ AdvancedVisualizer: Gr√°ficos y an√°lisis estad√≠stico")

print("\nüìä FUNCIONALIDADES DISPONIBLES:")
print("‚Ä¢ Detecci√≥n de personas, veh√≠culos, se√±ales de tr√°fico y motos")
print("‚Ä¢ Clasificaci√≥n por tama√±o (peque√±o, mediano, grande)")
print("‚Ä¢ Clasificaci√≥n por color dominante")
print("‚Ä¢ An√°lisis de pose para personas (si MediaPipe est√° disponible)")
print("‚Ä¢ OCR para se√±ales de tr√°fico (si Tesseract est√° disponible)")
print("‚Ä¢ Estimaci√≥n de velocidad y orientaci√≥n")
print("‚Ä¢ Guardado organizado por categor√≠a/subcategor√≠a/tama√±o/color")

print("\nüöÄ C√ìMO USAR EL SISTEMA:")
print("1. Ejecuta todas las celdas anteriores para inicializar el sistema")
print("2. El sistema procesar√° autom√°ticamente el video 'calle-dji.MOV'")
print("3. Las detecciones se guardar√°n en la carpeta 'outputs/detecciones/'")
print("4. Se generar√° un reporte CSV con todas las estad√≠sticas")
print("5. Para procesar otro video, usa: process_video_complete('ruta/video.mp4')")

print("\nüìÅ ESTRUCTURA DE SALIDA:")
print("outputs/")
print("‚îú‚îÄ‚îÄ detecciones/")
print("‚îÇ   ‚îú‚îÄ‚îÄ persona/peaton/peque√±o_rojo/")
print("‚îÇ   ‚îú‚îÄ‚îÄ carro/sedan/mediano_azul/")
print("‚îÇ   ‚îî‚îÄ‚îÄ ...")
print("‚îú‚îÄ‚îÄ videos_procesados/")
print("‚îú‚îÄ‚îÄ visualizaciones/")
print("‚îî‚îÄ‚îÄ logs/")

print("\n‚ö†Ô∏è NOTAS IMPORTANTES:")
print("‚Ä¢ El sistema funciona sin dependencias opcionales (MediaPipe, Tesseract)")
print("‚Ä¢ Para mejor rendimiento, instala todas las dependencias")
print("‚Ä¢ El procesamiento completo puede tomar varios minutos")
print("‚Ä¢ Los resultados se guardan autom√°ticamente")

print("\n" + "=" * 70)
print("üéâ SISTEMA LISTO PARA PROCESAR TU VIDEO DJI")
print("=" * 70)


In [None]:
# Funci√≥n para Entrenar con Dataset Personalizado
def demo_training():
    """
    Demostraci√≥n del proceso de entrenamiento
    Nota: Requiere un dataset en formato YOLO
    """
    print("\nüéì DEMOSTRACI√ìN DE ENTRENAMIENTO")
    print("=" * 50)
    
    # Verificar si existe un dataset
    dataset_path = "dataset"  # Ruta al dataset
    if Path(dataset_path).exists():
        print(f"‚úÖ Dataset encontrado: {dataset_path}")
        print("   Iniciando entrenamiento...")
        
        # Entrenar modelo (comentado para evitar ejecuci√≥n larga)
        # model_path = train_model_complete(dataset_path, use_optimization=True)
        # print(f"Modelo entrenado guardado en: {model_path}")
        
        print("   ‚ö†Ô∏è Entrenamiento deshabilitado en esta demostraci√≥n")
        print("   Para entrenar, descomenta las l√≠neas de entrenamiento")
        
    else:
        print(f"‚ùå Dataset no encontrado: {dataset_path}")
        print("   Para entrenar un modelo personalizado:")
        print("   1. Crea un dataset en formato YOLO con la estructura:")
        print("      dataset/")
        print("      ‚îú‚îÄ‚îÄ images/train/")
        print("      ‚îú‚îÄ‚îÄ images/val/")
        print("      ‚îú‚îÄ‚îÄ labels/train/")
        print("      ‚îî‚îÄ‚îÄ labels/val/")
        print("   2. Descomenta las l√≠neas de entrenamiento en la funci√≥n demo_training()")

# Ejecutar demostraci√≥n de entrenamiento
demo_training()


In [None]:
# Resumen Final del Sistema
print("\n" + "="*80)
print("üéØ SISTEMA AVANZADO DE DETECCI√ìN Y CLASIFICACI√ìN DE OBJETOS - RESUMEN")
print("="*80)

print("\nüìã CARACTER√çSTICAS IMPLEMENTADAS:")
print("‚úÖ Entrenamiento YOLOv8 con optimizaci√≥n de hiperpar√°metros")
print("‚úÖ Data augmentation avanzada con Albumentations")
print("‚úÖ Clasificaci√≥n por tama√±o, color y detalles espec√≠ficos")
print("‚úÖ An√°lisis de pose con MediaPipe para personas")
print("‚úÖ OCR con Tesseract para se√±ales de tr√°fico")
print("‚úÖ Estimaci√≥n de velocidad y orientaci√≥n")
print("‚úÖ Anonimizaci√≥n de caras para privacidad")
print("‚úÖ Visualizaciones 3D y an√°lisis estad√≠stico")
print("‚úÖ Sistema de guardado jer√°rquico con metadata")
print("‚úÖ Optimizaciones GPU/CPU autom√°ticas")
print("‚úÖ Procesamiento en tiempo real con threading")
print("‚úÖ Reportes detallados en JSON y CSV")

print("\nüîß COMPONENTES PRINCIPALES:")
print("‚Ä¢ AdvancedYOLOTrainer: Entrenamiento con Optuna")
print("‚Ä¢ AdvancedClassifier: Clasificaci√≥n multi-criterio")
print("‚Ä¢ AdvancedVisualizer: Gr√°ficos 3D y an√°lisis")
print("‚Ä¢ AdvancedVideoProcessor: Pipeline de video optimizado")
print("‚Ä¢ AdvancedDataManager: Guardado jer√°rquico")

print("\nüìä M√âTRICAS Y AN√ÅLISIS:")
print("‚Ä¢ mAP@0.5:0.95, precisi√≥n, recall, F1-score")
print("‚Ä¢ Regresi√≥n lineal m√∫ltiple en 3D")
print("‚Ä¢ Heatmaps de correlaci√≥n entre features")
print("‚Ä¢ Curvas de aprendizaje detalladas")
print("‚Ä¢ Intervalos de confianza estad√≠sticos")

print("\nüöÄ OPTIMIZACIONES:")
print("‚Ä¢ Aceleraci√≥n CUDA/TensorRT")
print("‚Ä¢ Precisi√≥n FP16 para GPU")
print("‚Ä¢ Downsampling inteligente")
print("‚Ä¢ Limpieza de memoria autom√°tica")
print("‚Ä¢ Procesamiento por lotes")

print("\nüìÅ ESTRUCTURA DE SALIDA:")
print("outputs/")
print("‚îú‚îÄ‚îÄ detecciones/")
print("‚îÇ   ‚îú‚îÄ‚îÄ persona/peaton/peque√±o_rojo/")
print("‚îÇ   ‚îú‚îÄ‚îÄ carro/sedan/mediano_azul/")
print("‚îÇ   ‚îî‚îÄ‚îÄ ...")
print("‚îú‚îÄ‚îÄ videos_procesados/")
print("‚îú‚îÄ‚îÄ visualizaciones/")
print("‚îî‚îÄ‚îÄ logs/")

print("\nüéØ USO DEL SISTEMA:")
print("1. Para procesar video: process_video_complete(video_path)")
print("2. Para entrenar modelo: train_model_complete(dataset_path)")
print("3. Para visualizaciones: visualizer.create_*_analysis()")
print("4. Para reportes: data_manager.generate_summary_report()")

print("\n" + "="*80)
print("üéâ SISTEMA COMPLETO Y LISTO PARA USO")
print("="*80)
