# 05 - Testing e Inferencia en Tiempo Real

Este notebook prueba el modelo entrenado con:
- Im√°genes est√°ticas
- Videos grabados
- **C√°maras IP en tiempo real** (RTSP/HTTP)
- Webcam local

## Objetivos:
1. Validar el modelo en condiciones reales
2. Probar conexi√≥n con c√°maras IP
3. Medir latencia y FPS en streaming
4. Generar visualizaciones de detecciones

In [None]:
!pip install ultralytics opencv-python vidgear paho-mqtt

In [None]:
import os
import yaml
import cv2
import time
import numpy as np
from pathlib import Path
from ultralytics import YOLO
from datetime import datetime
from IPython.display import Image, display, clear_output
import matplotlib.pyplot as plt
from dotenv import load_dotenv

load_dotenv('../.env')

# Configuraci√≥n
PROJECT_ROOT = Path('..').resolve()
DATA_DIR = PROJECT_ROOT / 'data'
MODELS_DIR = DATA_DIR / 'models'
CONFIG_DIR = PROJECT_ROOT / 'config'

print("‚úÖ Librer√≠as importadas")

## Paso 1: Cargar Modelo Entrenado

In [None]:
# Cargar informaci√≥n del modelo
model_info_path = MODELS_DIR / 'yolo_ppe_model_info.yaml'

if model_info_path.exists():
    with open(model_info_path, 'r') as f:
        model_info = yaml.safe_load(f)
    
    model_path = model_info['best_model_path']
    print(f"üì¶ Cargando modelo desde: {model_path}")
    model = YOLO(model_path)
    
    print("\nüìã Informaci√≥n del Modelo:")
    print(f"   Tipo: {model_info['model_type']} {model_info['model_size']}")
    print(f"   Clases: {model_info['num_classes']}")
    print(f"   mAP50: {model_info['metrics']['mAP50']:.3f}")
    print(f"   Recall: {model_info['metrics']['recall']:.3f}")
    
    class_names = model_info['class_names']
else:
    print("‚ö†Ô∏è Modelo no encontrado. Entrena primero en notebook 03")
    # Usar modelo por defecto para testing
    model = YOLO('yolov8s.pt')
    class_names = model.names

## Paso 2: Cargar Configuraci√≥n de C√°maras

In [None]:
# Cargar configuraci√≥n de c√°maras
camera_config_path = CONFIG_DIR / 'camera_config.yaml'

with open(camera_config_path, 'r') as f:
    camera_config = yaml.safe_load(f)

print("üìπ C√°maras configuradas:")
for i, cam in enumerate(camera_config['cameras'], 1):
    status = "üü¢" if cam['enabled'] else "üî¥"
    print(f"   {status} {cam['camera_id']}: {cam['name']} ({cam['protocol'].upper()})")

## Paso 3: Funciones de Conexi√≥n a C√°maras

In [None]:
def build_camera_url(camera):
    """
    Construye URL de conexi√≥n para c√°mara IP
    
    Soporta RTSP y HTTP/MJPEG
    """
    protocol = camera['protocol']
    host = camera['host']
    port = camera['port']
    path = camera['path']
    username = camera.get('username', '')
    
    # Obtener password de variable de entorno
    password_var = camera.get('password', '')
    if password_var.startswith('${') and password_var.endswith('}'):
        env_var = password_var[2:-1]
        password = os.getenv(env_var, '')
    else:
        password = password_var
    
    # Construir URL seg√∫n protocolo
    if protocol == 'rtsp':
        if username and password:
            url = f"rtsp://{username}:{password}@{host}:{port}{path}"
        else:
            url = f"rtsp://{host}:{port}{path}"
    
    elif protocol == 'http':
        if username and password:
            url = f"http://{username}:{password}@{host}:{port}{path}"
        else:
            url = f"http://{host}:{port}{path}"
    
    else:
        raise ValueError(f"Protocolo no soportado: {protocol}")
    
    return url


def connect_to_camera(camera, timeout=10):
    """
    Conecta a una c√°mara IP y retorna el objeto VideoCapture
    """
    url = build_camera_url(camera)
    
    print(f"üîå Conectando a {camera['camera_id']}...")
    print(f"   URL: {url.replace(camera.get('username', ''), '***').replace(os.getenv(camera['password'][2:-1], ''), '***')}")
    
    cap = cv2.VideoCapture(url)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reducir buffer para menor latencia
    
    # Intentar leer un frame para verificar conexi√≥n
    start_time = time.time()
    while time.time() - start_time < timeout:
        ret, frame = cap.read()
        if ret and frame is not None:
            print(f"   ‚úÖ Conectado exitosamente")
            print(f"   Resoluci√≥n: {frame.shape[1]}x{frame.shape[0]}")
            return cap
        time.sleep(0.5)
    
    print(f"   ‚ùå No se pudo conectar (timeout {timeout}s)")
    cap.release()
    return None

print("‚úÖ Funciones de c√°mara definidas")

## Paso 4: Test de Imagen Est√°tica

In [None]:
# Probar con una imagen de test
test_images_dir = DATA_DIR / 'processed' / 'yolo' / 'images' / 'test'

test_images = list(test_images_dir.glob('*.jpg')) + list(test_images_dir.glob('*.png'))

if test_images:
    test_img_path = test_images[0]
    
    # Hacer predicci√≥n
    results = model.predict(
        source=str(test_img_path),
        conf=0.5,  # Umbral de confianza
        iou=0.45,
        show_labels=True,
        show_conf=True,
        save=True
    )
    
    # Mostrar resultados
    print(f"üñºÔ∏è Detecciones en {test_img_path.name}:")
    for r in results:
        for box in r.boxes:
            class_id = int(box.cls)
            confidence = float(box.conf)
            class_name = class_names[class_id]
            print(f"   {class_name}: {confidence:.1%}")
    
    # Mostrar imagen con detecciones
    result_img = results[0].plot()
    plt.figure(figsize=(12, 8))
    plt.imshow(cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB))
    plt.axis('off')
    plt.title('Detecciones de EPP')
    plt.show()
else:
    print("‚ö†Ô∏è No se encontraron im√°genes de test")
    print("   Puedes subir una imagen de prueba al directorio data/")

## Paso 5: Test con Webcam Local (Opcional)

In [None]:
# Test con webcam local
def test_webcam(duration_seconds=30):
    """
    Prueba el modelo con webcam local durante N segundos
    """
    print("üìπ Iniciando test con webcam...")
    print(f"   Duraci√≥n: {duration_seconds} segundos")
    print("   Presiona 'q' para salir antes\n")
    
    cap = cv2.VideoCapture(0)
    
    if not cap.isOpened():
        print("‚ùå No se pudo abrir la webcam")
        return
    
    start_time = time.time()
    frame_count = 0
    
    while (time.time() - start_time) < duration_seconds:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Hacer predicci√≥n cada 3 frames (optimizaci√≥n)
        if frame_count % 3 == 0:
            results = model.predict(frame, conf=0.5, verbose=False)
            annotated_frame = results[0].plot()
        
        # Mostrar FPS
        elapsed = time.time() - start_time
        fps = frame_count / elapsed if elapsed > 0 else 0
        cv2.putText(annotated_frame, f"FPS: {fps:.1f}", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        # Mostrar frame
        cv2.imshow('PPE Detection - Webcam', annotated_frame)
        
        frame_count += 1
        
        # Salir con 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()
    
    avg_fps = frame_count / (time.time() - start_time)
    print(f"\n‚úÖ Test completado")
    print(f"   Frames procesados: {frame_count}")
    print(f"   FPS promedio: {avg_fps:.1f}")

# Descomentar para probar con webcam
# test_webcam(duration_seconds=30)

## Paso 6: Test con C√°mara IP

‚ö†Ô∏è **Antes de ejecutar**:
1. Configura las credenciales de las c√°maras en `.env`
2. Verifica que la c√°mara est√© accesible en la red
3. Aseg√∫rate de que el protocolo (RTSP/HTTP) sea correcto

In [None]:
def test_ip_camera(camera_id, duration_seconds=60, save_detections=False):
    """
    Prueba el modelo con una c√°mara IP espec√≠fica
    
    Args:
        camera_id: ID de la c√°mara (ej: 'cam_001')
        duration_seconds: Duraci√≥n del test en segundos
        save_detections: Si guardar frames con detecciones
    """
    # Buscar configuraci√≥n de la c√°mara
    camera = None
    for cam in camera_config['cameras']:
        if cam['camera_id'] == camera_id:
            camera = cam
            break
    
    if not camera:
        print(f"‚ùå C√°mara '{camera_id}' no encontrada en configuraci√≥n")
        return
    
    if not camera['enabled']:
        print(f"‚ö†Ô∏è C√°mara '{camera_id}' est√° deshabilitada")
        print("   Habil√≠tala en config/camera_config.yaml")
        return
    
    # Conectar a la c√°mara
    cap = connect_to_camera(camera)
    
    if cap is None:
        return
    
    # Crear directorio para guardar detecciones
    if save_detections:
        detections_dir = DATA_DIR / 'detections' / camera_id / datetime.now().strftime('%Y%m%d_%H%M%S')
        detections_dir.mkdir(parents=True, exist_ok=True)
        print(f"üíæ Guardando detecciones en: {detections_dir}")
    
    print(f"\nüöÄ Iniciando detecci√≥n en tiempo real...")
    print(f"   Duraci√≥n: {duration_seconds} segundos")
    print(f"   Presiona 'q' para salir antes\n")
    
    start_time = time.time()
    frame_count = 0
    detection_count = 0
    
    # Estad√≠sticas
    fps_history = []
    detections_by_class = {name: 0 for name in class_names}
    
    try:
        while (time.time() - start_time) < duration_seconds:
            ret, frame = cap.read()
            
            if not ret:
                print("‚ö†Ô∏è Error leyendo frame, intentando reconectar...")
                cap.release()
                time.sleep(2)
                cap = connect_to_camera(camera)
                if cap is None:
                    break
                continue
            
            frame_start = time.time()
            
            # Hacer predicci√≥n
            results = model.predict(frame, conf=0.5, iou=0.45, verbose=False)
            annotated_frame = results[0].plot()
            
            # Contar detecciones
            num_detections = len(results[0].boxes)
            if num_detections > 0:
                detection_count += 1
                
                # Estad√≠sticas por clase
                for box in results[0].boxes:
                    class_id = int(box.cls)
                    class_name = class_names[class_id]
                    detections_by_class[class_name] += 1
            
            # Calcular FPS
            frame_time = time.time() - frame_start
            fps = 1 / frame_time if frame_time > 0 else 0
            fps_history.append(fps)
            
            # A√±adir informaci√≥n en el frame
            cv2.putText(annotated_frame, f"FPS: {fps:.1f}", (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.putText(annotated_frame, f"Detections: {num_detections}", (10, 70),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.putText(annotated_frame, f"Camera: {camera_id}", (10, 110),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            
            # Mostrar frame
            cv2.imshow(f'PPE Detection - {camera["name"]}', annotated_frame)
            
            # Guardar frame si hay detecciones
            if save_detections and num_detections > 0:
                filename = detections_dir / f"frame_{frame_count:06d}.jpg"
                cv2.imwrite(str(filename), annotated_frame)
            
            frame_count += 1
            
            # Salir con 'q'
            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("\n‚èπÔ∏è Detenido por el usuario")
                break
    
    except KeyboardInterrupt:
        print("\n‚èπÔ∏è Interrumpido por el usuario")
    
    finally:
        cap.release()
        cv2.destroyAllWindows()
        
        # Mostrar estad√≠sticas
        elapsed = time.time() - start_time
        avg_fps = np.mean(fps_history) if fps_history else 0
        
        print("\n" + "="*50)
        print("üìä ESTAD√çSTICAS DEL TEST")
        print("="*50)
        print(f"C√°mara: {camera['name']} ({camera_id})")
        print(f"Duraci√≥n: {elapsed:.1f} segundos")
        print(f"Frames procesados: {frame_count}")
        print(f"FPS promedio: {avg_fps:.1f}")
        print(f"Frames con detecciones: {detection_count} ({detection_count/frame_count*100:.1f}%)")
        print("\nDetecciones por clase:")
        for class_name, count in sorted(detections_by_class.items(), key=lambda x: x[1], reverse=True):
            if count > 0:
                print(f"   {class_name}: {count}")
        print("="*50)

print("‚úÖ Funci√≥n de test de c√°mara IP lista")

In [None]:
# Ejecutar test con c√°mara IP
# ‚ö†Ô∏è Aseg√∫rate de configurar primero las credenciales en .env

# Ejemplo:
# test_ip_camera('cam_001', duration_seconds=60, save_detections=True)

print("üí° Para probar con c√°mara IP, descomenta y ejecuta la l√≠nea de arriba")
print("   Reemplaza 'cam_001' con el ID de tu c√°mara")

## Paso 7: Test M√∫ltiples C√°maras (Simulaci√≥n)

In [None]:
def test_multiple_cameras():
    """
    Simula procesamiento de m√∫ltiples c√°maras en paralelo
    """
    enabled_cameras = [cam for cam in camera_config['cameras'] if cam['enabled']]
    
    if not enabled_cameras:
        print("‚ö†Ô∏è No hay c√°maras habilitadas")
        return
    
    print(f"üìπ C√°maras habilitadas: {len(enabled_cameras)}")
    print("\nüí° En producci√≥n, cada c√°mara se procesar√≠a en un thread separado")
    print("   Esto es una simulaci√≥n secuencial\n")
    
    for camera in enabled_cameras:
        print(f"\n{'='*60}")
        print(f"Procesando: {camera['name']}")
        print('='*60)
        
        # En producci√≥n, esto ser√≠a un thread
        # test_ip_camera(camera['camera_id'], duration_seconds=10)
        
        print(f"‚úÖ {camera['camera_id']} procesado")

# test_multiple_cameras()

## Paso 8: An√°lisis de Latencia y Performance

In [None]:
def analyze_performance(num_iterations=100):
    """
    Analiza la performance del modelo con diferentes resoluciones
    """
    resolutions = [
        (640, 480),   # VGA
        (1280, 720),  # HD
        (1920, 1080), # Full HD
    ]
    
    results_data = []
    
    for width, height in resolutions:
        # Crear imagen de prueba
        test_img = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
        
        # Warm-up
        for _ in range(5):
            _ = model.predict(test_img, verbose=False)
        
        # Medir tiempo
        times = []
        for _ in range(num_iterations):
            start = time.time()
            _ = model.predict(test_img, verbose=False)
            times.append(time.time() - start)
        
        avg_time = np.mean(times)
        fps = 1 / avg_time
        
        results_data.append({
            'Resolution': f"{width}x{height}",
            'Avg Time (ms)': avg_time * 1000,
            'FPS': fps,
            'Min Time (ms)': np.min(times) * 1000,
            'Max Time (ms)': np.max(times) * 1000,
        })
    
    # Mostrar resultados
    import pandas as pd
    df = pd.DataFrame(results_data)
    
    print("\n‚ö° AN√ÅLISIS DE PERFORMANCE")
    print("="*70)
    print(df.to_string(index=False))
    print("="*70)
    
    # Visualizar
    plt.figure(figsize=(10, 6))
    plt.subplot(1, 2, 1)
    plt.bar(df['Resolution'], df['Avg Time (ms)'])
    plt.ylabel('Tiempo promedio (ms)')
    plt.title('Latencia por Resoluci√≥n')
    plt.xticks(rotation=45)
    
    plt.subplot(1, 2, 2)
    plt.bar(df['Resolution'], df['FPS'])
    plt.ylabel('FPS')
    plt.title('Throughput por Resoluci√≥n')
    plt.xticks(rotation=45)
    plt.axhline(y=30, color='r', linestyle='--', label='30 FPS (objetivo)')
    plt.legend()
    
    plt.tight_layout()
    plt.show()
    
    return df

# Ejecutar an√°lisis
# perf_df = analyze_performance()

## Resumen y Conclusiones

### ‚úÖ Has probado:
1. Inferencia en im√°genes est√°ticas
2. Conexi√≥n a c√°maras IP (RTSP/HTTP)
3. Procesamiento en tiempo real
4. An√°lisis de performance

### üìä M√©tricas Importantes:
- **FPS > 10**: Adecuado para monitoreo de seguridad
- **Latencia < 200ms**: Buena respuesta en tiempo real
- **Detecci√≥n consistente**: Sin frames perdidos

### üîß Optimizaciones Posibles:
1. **Procesar cada N frames**: Reducir carga (ej: 1 de cada 3)
2. **Reducir resoluci√≥n**: 720p es suficiente para PPE
3. **Modelo m√°s peque√±o**: YOLOv8n si necesitas m√°s velocidad
4. **GPU**: Acelera 10-50x vs CPU
5. **Batch processing**: Procesar m√∫ltiples c√°maras juntas

### üö® Troubleshooting C√°maras IP:

**Si no conecta:**
1. Verifica que la c√°mara est√© en la misma red
2. Prueba hacer ping a la IP de la c√°mara
3. Verifica usuario/contrase√±a
4. Confirma el protocolo (RTSP vs HTTP)
5. Revisa el path espec√≠fico del fabricante

**URLs comunes seg√∫n fabricante:**
- Hikvision: `rtsp://user:pass@ip:554/Streaming/Channels/101`
- Dahua: `rtsp://user:pass@ip:554/cam/realmonitor?channel=1&subtype=0`
- Axis: `rtsp://user:pass@ip:554/axis-media/media.amp`

### ‚û°Ô∏è Pr√≥ximos Pasos:
1. **Integrar con aplicaci√≥n de producci√≥n** (app/main_opensource.py)
2. **Configurar MQTT** para Node-RED
3. **Implementar dashboard** en tiempo real
4. **Configurar alertas** cuando falte EPP
5. **Exportar a CSV** logs de detecciones