# -----------------PROYECTO FINAL-----------------

## 1. IMPORTAR LIBRERIAS

In [None]:
from keras.src.saving.saving_api import load_model  # ⚠️ Import no estándar de Keras, otra "from tensorflow.keras.models import load_model"
import cv2  # OpenCV para procesamiento de imágenes/video
import mediapipe as mp  # Framework para detección de posturas corporales
import numpy as np  # Manejo de arrays numéricos
import os  # Interacción con sistema operativo
import tensorflow as tf  # Framework de ML
from collections import deque  # Estructura de datos tipo cola
from sklearn.model_selection import train_test_split  # División de datasets
import socket  # Comunicación en red
import threading  # Ejecución paralela
import psutil  # Monitoreo de recursos del sistema
import time  # Medición de tiempos/FPS
import logging

Sistema de análisis en tiempo real

|_> Procesamiento multimedia

    |_> Captura de video (OpenCV)

    |_> Detección de posturas (MediaPipe)

|_> Modelo de ML

    |_> Carga de modelo (Keras/TensorFlow)

    |_> Preprocesamiento de datos (NumPy)

|_> Gestión de datos

    |_> Almacenamiento temporal (deque)

    |_> División de datasets (scikit-learn)

|_> Monitoreo del sistema

    |_> Rendimiento en red (socket)

    |_> Uso de recursos (psutil)

    |_> Medición FPS (time)

|_> Ejecución paralela

    |_> Hilos para tareas concurrentes (threading)

## 2. INICIALIZAR MEDIAPIPE

In [None]:
class CircularBuffer:
    def __init__(self, size, total_landmarks):
        # Inicializa buffer con array numpy pre-dimensionado
        self.buffer = np.zeros((size, total_landmarks), dtype=np.float32)  
        self.size = size  # Capacidad máxima del buffer
        self.idx = 0  # Puntero de escritura actual
        self.full = False  # Flag de buffer lleno

    def add(self, data):
        # Almacena datos en posición actual y actualiza índice
        self.buffer[self.idx] = data  # Escritura en posición del puntero
        self.idx = (self.idx + 1) % self.size  # Incremento circular
        self.full = self.full or self.idx == 0  # Actualiza estado de llenado

    def get_sequence(self):
        # Devuelve secuencia ordenada temporalmente
        if self.full:
            # Buffer lleno: concatena final + inicio
            return np.concatenate([self.buffer[self.idx:], self.buffer[:self.idx]])
        #return self.buffer[:self.idx]  # 🚨 Secuencia invertida si no está lleno
        return self.buffer[:self.idx][::-1]  # Invertir orden si no está lleno

Gestión de datos temporales

|_> Inicialización

    |_> Array pre-asignado (eficiencia memoria)

    |_> Control de capacidad fija

|_> Operaciones

    |_> Adición de datos

        |_> Escritura secuencial circular

        |_> Actualización automática de estado

    |_> Recuperación de secuencia

        |_> Manejo de 2 casos (buffer lleno/vacio)
        
        |_> Reordenamiento temporal

In [None]:
import socket
import time
import threading
import logging
import numpy as np
import cv2
from collections import deque

class UDPCamera:
    """
    Cliente UDP mejorado para recibir video con mayor fiabilidad, 
    manejo de frames y rendimiento.
    """
    def __init__(self, host='0.0.0.0', port=5000, buffer_size=65536, timeout=0.1, auto_start=False):
        """
        Inicialización con parámetros configurables.
        
        Args:
            host: IP a la que se va a enlazar.
            port: Puerto UDP para escuchar.
            buffer_size: Tamaño máximo del paquete UDP.
            timeout: Tiempo de espera del socket en segundos.
            auto_start: Si es True, inicia automáticamente la recepción.
        """
        self.host = host
        self.port = port
        self.buffer_size = buffer_size
        
        # 1. Configuración del socket con verificación del tamaño del buffer
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * buffer_size)
        actual_rcvbuf = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
        if actual_rcvbuf < 4 * buffer_size:
            logging.warning(f"El tamaño del buffer de recepción es {actual_rcvbuf} bytes, menor que el solicitado {4 * buffer_size} bytes.")
        self.sock.settimeout(timeout)
        
        # 2. Manejo de frames: se eliminan variables no utilizadas
        self.current_frame = None
        self.last_frame = None
        self.frame_fragments = {}    # Almacena fragmentos por frame_id
        self.frame_timestamps = {}   # Marca de tiempo para cada frame
        
        # Variables eliminadas: self.current_frame_id, self.fragment_count
        
        # 3. Configuración de hilos y sincronización
        self.running = False
        self.thread = None
        self.lock = threading.Lock()
        
        # 4. Cálculo de FPS
        self.fps = 0
        self.frame_times = deque(maxlen=30)
        self.last_frame_time = time.time()
        self.frame_count = 0
        
        # 5. Gestión de limpieza de frames incompletos
        self.last_cleanup_time = time.time()
        self.cleanup_interval = 1.0  # Limpieza cada 1 segundo
        
        # 6. Se elimina el inicio automático en el constructor
        if auto_start:
            self.start()
    
    def start(self):
        """Inicia el receptor UDP con verificación del estado."""
        if self.isOpened():
            logging.info("El receptor UDP ya se encuentra en ejecución.")
            return True
        
        try:
            self.sock.bind((self.host, self.port))
            self.running = True
            self.thread = threading.Thread(
                target=self._receive_frames, 
                name="UDP_Receiver",
                daemon=False  # Se mantiene para permitir una finalización controlada
            )
            self.thread.start()
            time.sleep(0.1)  # Espera breve para la verificación
            if self.thread.is_alive():
                logging.info(f"Escuchando en {self.host}:{self.port}")
                return True
            else:
                logging.error("El hilo receptor no se inició correctamente.")
                self.running = False
                return False
        except PermissionError:
            logging.error(f"El puerto {self.port} requiere privilegios de administrador.")
        except OSError as e:
            logging.error(f"Error de red o conflicto en el puerto: {e.strerror}")
        return False
    
    def _receive_frames(self):
        """Hilo principal para recibir paquetes UDP, ensamblar frames y controlar tiempos."""
        frame_timeout = 0.5  # Tiempo máximo para recibir todos los fragmentos
        iteration_count = 0
        
        while self.running:
            try:
                # Recepción del paquete UDP
                packet_data, _ = self.sock.recvfrom(self.buffer_size)
                # 7. Verificar posible truncamiento de datos
                if len(packet_data) == self.buffer_size:
                    logging.warning("El paquete recibido puede estar truncado debido al tamaño del buffer.")
                
                # Validar longitud mínima para procesar la cabecera
                if len(packet_data) < 8:
                    continue  # Descarta paquetes inválidos
                
                # 8. Decodificación de la cabecera (formato big-endian)
                header = packet_data[:8]
                frame_id = int.from_bytes(header[0:4], byteorder='big')
                fragment_id = int.from_bytes(header[4:6], byteorder='big')
                total_fragments = int.from_bytes(header[6:8], byteorder='big')
                fragment_data = packet_data[8:]
                
                with self.lock:
                    # Inicializa la entrada para un nuevo frame
                    if frame_id not in self.frame_fragments:
                        self.frame_fragments[frame_id] = {}
                        self.frame_timestamps[frame_id] = time.time()
                    
                    # Guarda el fragmento (se sobrescribe en caso de duplicados)
                    self.frame_fragments[frame_id][fragment_id] = fragment_data
                    
                    # 9. Reconstrucción del frame cuando se han recibido todos los fragmentos
                    if len(self.frame_fragments[frame_id]) == total_fragments:
                        # Verifica que los fragmentos estén completos y en orden
                        if sorted(self.frame_fragments[frame_id].keys()) == list(range(total_fragments)):
                            # Se ensamblan los fragmentos utilizando join para eficiencia
                            fragments = [self.frame_fragments[frame_id][i] for i in range(total_fragments)]
                            frame_data = b''.join(fragments)
                            try:
                                frame_array = np.frombuffer(frame_data, dtype=np.uint8)
                                decoded_frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
                                if decoded_frame is not None:
                                    self.last_frame = self.current_frame
                                    self.current_frame = decoded_frame
                                    now = time.time()
                                    self.frame_times.append(now)
                                    if len(self.frame_times) >= 2:
                                        self.fps = len(self.frame_times) / (self.frame_times[-1] - self.frame_times[0])
                                else:
                                    logging.error("Error: cv2.imdecode devolvió None.")
                            except Exception as e:
                                logging.error(f"Error al decodificar el frame: {e}")
                        else:
                            logging.warning(f"Frame incompleto para frame_id {frame_id}. Faltan fragmentos.")
                        
                        # Limpieza del frame ya procesado
                        del self.frame_fragments[frame_id]
                        del self.frame_timestamps[frame_id]
                
                # 10. Limpieza periódica de frames incompletos para mejorar el rendimiento
                iteration_count += 1
                current_time = time.time()
                if current_time - self.last_cleanup_time > self.cleanup_interval:
                    with self.lock:
                        expired_frames = [
                            fid for fid, ts in self.frame_timestamps.items() 
                            if current_time - ts > frame_timeout
                        ]
                        for fid in expired_frames:
                            logging.info(f"Limpieza de frame expirado: frame_id {fid}")
                            del self.frame_fragments[fid]
                            del self.frame_timestamps[fid]
                    self.last_cleanup_time = current_time
                
            except socket.timeout:
                continue  # Timeout esperado, continuar
            except Exception as e:
                logging.error(f"Error en el receptor UDP: {e}")
                if not self.running:
                    break
    
    def read(self):
        """
        Retorna el último frame completo o el último frame bueno como fallback.
        
        Returns:
            tuple: (True, frame) si hay un frame disponible; de lo contrario (False, None)
        """
        with self.lock:
            if self.current_frame is not None:
                return True, self.current_frame.copy()
            elif self.last_frame is not None:
                return True, self.last_frame.copy()
            return False, None
    
    def get_fps(self):
        """Retorna el FPS estimado actual."""
        with self.lock:
            return self.fps
    
    def isOpened(self):
        """Verifica si la cámara UDP está en ejecución."""
        return self.running and self.thread and self.thread.is_alive()
    
    def release(self):
        """Detiene el hilo receptor y libera los recursos."""
        self.running = False
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=1)
        with self.lock:
            self.frame_fragments.clear()
            self.frame_timestamps.clear()
            self.current_frame = None
            self.last_frame = None
        try:
            self.sock.close()
        except Exception as e:
            logging.error(f"Error al cerrar el socket: {e}")
        logging.info("UDP Camera liberada")


In [149]:
# Configuración inicial global
mp_hands = mp.solutions.hands

# Optimizar MediaPipe
hands = mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=2,
    min_detection_confidence=0.45,  # Reducir confianza
    min_tracking_confidence=0.45,
    model_complexity=0  # Menor complejidad
)

mp_draw = mp.solutions.drawing_utils
dataset_dir = "dataset_11_90"
model_path = "gesture_model_me_12_90_4.h5"
sequence_length = 90
total_landmarks = 126
gestures = []
X_mean = None
X_std = None

num_camara = 0

## 3. FUNCIONES PRINCIPALES

In [150]:
# Funciones principales
def init_system():
    global gestures
    os.makedirs(dataset_dir, exist_ok=True)
    gestures = get_existing_gestures()
    
def get_existing_gestures():
    return sorted([d for d in os.listdir(dataset_dir) if os.path.isdir(os.path.join(dataset_dir, d))])


## 4. DETECCION DE MANO

In [151]:
def detect_hands():
    print("\nIniciando detección de manos. Presiona 'ESC' para salir.")
    cap = UDPCamera()  # <-- Cambio aquí

    while True:
        ret, frame = cap.read()
        if not ret:
            continue

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = hands.process(rgb_frame)

        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                mp_draw.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

        cv2.imshow("Detección de Manos", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break

    cap.release()
    cv2.destroyAllWindows()

## 5. RECOLLECION DE DATOS 

In [152]:
def collect_data():
    global gestures
    gesture = input("\nIngrese la palabra o letra para la cual desea recolectar datos: ").upper()
    num_sequences = int(input("Ingrese el número de secuencias a capturar (recomendado: 50): "))
    
    save_dir = os.path.join(dataset_dir, gesture)
    os.makedirs(save_dir, exist_ok=True)

    print(f"\nRecolectando datos para el gesto '{gesture}'. Presiona 'ESC' para cancelar.")
    print("Mantenga la seña frente a la cámara...")
    
    cap = UDPCamera()
    sequence = []
    counter = 0

    #NUEVO Configurar ventana de landmarks
    landmark_window_name = "Landmarks en Tiempo Real"
    cv2.namedWindow(landmark_window_name, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(landmark_window_name, 640, 480)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        #NUEVO Crear canvas para landmarks
        landmark_canvas = np.zeros((480, 640, 3), dtype=np.uint8)  # Canvas negro 640x480

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = hands.process(rgb_frame)

        if results.multi_hand_landmarks:
            all_landmarks = []

            #NUEVO Dibujar landmarks en el canvas
            for hand_landmarks in results.multi_hand_landmarks:
                # Dibujar en el canvas negro
                mp_draw.draw_landmarks(
                    landmark_canvas,
                    hand_landmarks,
                    mp_hands.HAND_CONNECTIONS,
                    mp_draw.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
                    mp_draw.DrawingSpec(color=(0, 0, 255), thickness=2)
                )
            
            # Extraer coordenadas para el dataset
            for hand in results.multi_hand_landmarks[:2]:
                for lm in hand.landmark:
                    all_landmarks.extend([lm.x, lm.y, lm.z])
            
            # Rellenar si solo hay una mano
            if len(results.multi_hand_landmarks) < 2:
                all_landmarks += [0.0] * 63
            
            sequence.append(all_landmarks)

            # Dibujar en el frame original
            for hand_landmarks in results.multi_hand_landmarks:
                mp_draw.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

        if len(sequence) == sequence_length:
            np.save(os.path.join(save_dir, f"secuencia_{counter}.npy"), sequence)
            counter += 1
            sequence = []
            print(f"Secuencias capturadas: {counter}/{num_sequences}")

        #NUEVO Mostrar información en ambas ventanas
        info_text = f"Secuencias: {counter}/{num_sequences}"
        cv2.putText(frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(landmark_canvas, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)


        cv2.imshow("Recolección de Datos", frame)
        #NEUVO
        cv2.imshow(landmark_window_name, landmark_canvas)
        if cv2.waitKey(1) & 0xFF == 27 or counter >= num_sequences:
            break

    cap.release()
    cv2.destroyAllWindows()
    gestures = get_existing_gestures()
    print(f"\nSe recolectaron {counter} secuencias para el gesto '{gesture}'")

## 6. CARGA DE DATOS

In [153]:
def tf_3d_rotation(angles):
    """Rotación 3D vectorizada para landmarks"""
    rx = tf.convert_to_tensor([
        [1, 0, 0],
        [0, tf.cos(angles[0]), -tf.sin(angles[0])],
        [0, tf.sin(angles[0]), tf.cos(angles[0])]
    ])
    
    ry = tf.convert_to_tensor([
        [tf.cos(angles[1]), 0, tf.sin(angles[1])],
        [0, 1, 0],
        [-tf.sin(angles[1]), 0, tf.cos(angles[1])]
    ])
    
    rz = tf.convert_to_tensor([
        [tf.cos(angles[2]), -tf.sin(angles[2]), 0],
        [tf.sin(angles[2]), tf.cos(angles[2]), 0],
        [0, 0, 1]
    ])
    
    return tf.matmul(rz, tf.matmul(ry, rx))

def augment_landmarks(landmarks):
    """Aumentación 3D mejorada"""
    angles = tf.random.uniform([3], -15.0, 15.0) * (np.pi / 180.0)
    rot_matrix = tf_3d_rotation(angles)
    
    # Aplicar rotación
    original_shape = tf.shape(landmarks)
    landmarks = tf.reshape(landmarks, [-1, 3])
    landmarks = tf.matmul(landmarks, rot_matrix)
    landmarks = tf.reshape(landmarks, original_shape)
    
    # Añadir ruido diferenciado
    noise = tf.random.normal(tf.shape(landmarks), mean=0.0, stddev=0.08)
    return landmarks + noise

In [154]:

def custom_augmentation(sequence):
    """Aumentación 100% en TensorFlow con rotaciones 3D"""
    sequence = tf.cast(sequence, tf.float32)
    
    # 1. Aplicar rotación 3D
    sequence = augment_landmarks(sequence)  # <--- Aquí se usa
    
    # 2. Ruido Gaussiano (ya incluido en augment_landmarks, pero puedes añadir más)
    noise = tf.random.normal(tf.shape(sequence), mean=0.0, stddev=0.05)
    sequence = tf.add(sequence, noise)
    
    # 3. Escalado aleatorio 
    scale_factor = tf.random.uniform([], 0.9, 1.1)
    sequence = tf.multiply(sequence, scale_factor)
    
    return sequence

# Modificar la función create_dataset
def create_dataset(X_data, y_data, augment=False):
    dataset = tf.data.Dataset.from_tensor_slices((X_data, y_data))
    
    if augment:
        dataset = dataset.map(
            lambda x, y: (custom_augmentation(x), y),
            num_parallel_calls=tf.data.AUTOTUNE
        )
        dataset = dataset.shuffle(1000)
    
    return dataset.batch(32).prefetch(tf.data.AUTOTUNE)

In [155]:
def load_data(augment=True):
    X = []
    y = []
    
    for label_idx, gesture in enumerate(gestures):
        gesture_dir = os.path.join(dataset_dir, gesture)
        sequences = [f for f in os.listdir(gesture_dir) if f.endswith('.npy')]
        print(f"Gesto '{gesture}' - secuencias encontradas: {len(sequences)}")
        
        for seq_file in sequences:
            seq_path = os.path.join(gesture_dir, seq_file)
            sequence = np.load(seq_path)
            
            if sequence.shape == (sequence_length, total_landmarks):
                X.append(sequence)
                y.append(label_idx)
            else:
                print(f"Secuencia {seq_file} con forma {sequence.shape} ignorada.")
    
    return np.array(X, dtype=np.float32), np.array(y), gestures  # Asegurar tipo float32


## 7. ENTRENAMIENTO DEL MODELO

In [156]:
# Definir correctamente la función generadora
def representative_dataset_gen():  
    for _ in range(100):  
        yield [np.random.rand(1, sequence_length, total_landmarks).astype(np.float32)]  


def train_model():
    global X_mean, X_std, gestures
    
    # 1. Verificar datos de entrenamiento
    gestures = get_existing_gestures()
    if not gestures:
        print("\nNo hay datos recolectados. Primero recolecte datos de gestos.")
        return

    # 2. Cargar y preparar datos
    print("\nCargando datos y preparando el entrenamiento...")
    X, y, gestures = load_data(augment=False)  # Cargar sin aumentación inicial
    y = tf.keras.utils.to_categorical(y)

    # 3. Dividir datos antes de crear el Dataset
    
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

    # 4. Calcular parámetros de normalización
    X_mean = np.mean(X_train, axis=(0, 1)).astype(np.float32)
    X_std = np.std(X_train, axis=(0, 1)).astype(np.float32)
    X_train = (X_train - X_mean) / X_std
    X_val = (X_val - X_mean) / X_std  # Aplicar misma normalización a validación

    train_dataset = create_dataset(X_train, y_train, augment=True)
    val_dataset = create_dataset(X_val, y_val, augment=False)
    

    # 4. Guardar parámetros de normalización
    np.savez('normalization_params_90_4.npz', mean=X_mean, std=X_std)
    
    # 5. Arquitectura optimizada del modelo
    inputs = tf.keras.Input(shape=(sequence_length, total_landmarks))
    
    # Capa de atención espacial
    attention = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=64)(inputs, inputs)
    x = tf.keras.layers.Concatenate()([inputs, attention])
    
    # Bloques convolucionales
    x = tf.keras.layers.Conv1D(128, 5, activation='relu', padding='same')(x)
    x = tf.keras.layers.MaxPooling1D(2)(x)
    x = tf.keras.layers.Conv1D(64, 3, activation='relu', padding='same')(x)
    
    # Attention temporal
    x = tf.keras.layers.MultiHeadAttention(num_heads=2, key_dim=32)(x, x)
    
    # Pooling final
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    
    # Capas densas
    x = tf.keras.layers.Dense(64, activation='relu')(x)
    outputs = tf.keras.layers.Dense(len(gestures), activation='softmax')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    # 6. Compilación y entrenamiento
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001, global_clipnorm=1.0),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
        weighted_metrics=['accuracy']
    )
    
    model.summary()

    print("\nIniciando entrenamiento...")
    history = model.fit(
        train_dataset,
        validation_data=val_dataset,  # Usar dataset de validación explícito
        epochs=50,
        verbose=1
    )
    # 7. Guardar modelo y resultados
    model.save(model_path)
    print(f"\nModelo guardado en {model_path}")
    
    # 8. Conversión a TFLite con configuraciones especiales
    # Conversión TFLite
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = representative_dataset_gen  # Función correcta
    converter.target_spec.supported_ops = [  
        tf.lite.OpsSet.TFLITE_BUILTINS,
        tf.lite.OpsSet.SELECT_TF_OPS
    ]
    converter.target_spec.supported_types = [tf.int8]
    converter._experimental_default_to_single_batch_in_tensor_list_ops = True
    
    try:
        tflite_model = converter.convert()
        with open('model_quantized_90_4.tflite', 'wb') as f:
            f.write(tflite_model)
        print("\nModelo TFLite exportado exitosamente")
    except Exception as e:
        print(f"\nError en conversión TFLite: {str(e)}")
    
    # Mostrar métricas finales
    val_accuracy = history.history['val_accuracy'][-1]
    print(f"Precisión de validación final: {val_accuracy:.2%}")

## TF LITE

In [157]:
def convert_to_tflite():
    try:
        # Cargar el modelo entrenado
        model = tf.keras.models.load_model(model_path)
        
        # Configurar el conversor con parámetros especiales
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        
        # Añadir estas 3 líneas clave para compatibilidad con LSTM
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS,
            tf.lite.OpsSet.SELECT_TF_OPS
        ]
        converter._experimental_lower_tensor_list_ops = False
        converter.allow_custom_ops = True  # Permitir operaciones personalizadas
        
        # Realizar la conversión
        tflite_model = converter.convert()
        
        # Guardar el modelo cuantizado
        with open('model_quantized_90_4.tflite', 'wb') as f:
            f.write(tflite_model)
            
        print("\n✅ Conversión a TFLite exitosa!")
        
    except Exception as e:
        print(f"\n❌ Error en conversión: {str(e)}")
        print("Posibles soluciones:")
        print("1. Verifique que el modelo .h5 existe")
        print("2. Actualice TensorFlow: pip install --upgrade tensorflow")
        print("3. Reinicie el runtime/kernel")

    global gestures
    gestures = get_existing_gestures()
    print("Gestos cargados para evaluación:", gestures)

    print("Salida del modelo:", model.output_shape)



def representative_dataset_gen():
    # Generador de datos de ejemplo para calibración
    for _ in range(100):
        yield [np.random.randn(1, sequence_length, total_landmarks).astype(np.float32)]

In [158]:
def extrapolate_sequence(seq_array):
    """Extrapolación lineal para secuencias incompletas"""
    last_valid = np.where(seq_array.sum(axis=1) != 0)[0]
    if len(last_valid) < 2:
        return seq_array
    
    x = np.array([0, 1])
    for col in range(seq_array.shape[1]):
        y = seq_array[last_valid[-2:], col]
        coeffs = np.polyfit(x, y, 1)
        seq_array[last_valid[-1]+1:, col] = np.polyval(coeffs, np.arange(2, 2+len(seq_array)-last_valid[-1]-1))
    
    return seq_array

## 8. EVALUACION DEL MODELO

In [159]:
def evaluate():
    """
    Función para evaluar el modelo de reconocimiento de gestos en tiempo real.
    Procesa video de la cámara, detecta manos, y predice gestos basados en secuencias
    de landmarks de MediaPipe.
    """
    global gestures
    gestures = get_existing_gestures()

    # Constantes para mantener consistencia
    MODEL_PATH = "model_quantized_90_4.tflite"
    NORMALIZATION_PARAMS_PATH = 'normalization_params_90_4.npz'
    
    # 1. Verificar si existe el modelo
    if not os.path.exists(MODEL_PATH):
        print("\n¡Primero debe entrenar y convertir el modelo!")
        return
    
    # 2. Cargar parámetros y modelo
    try:
        with np.load(NORMALIZATION_PARAMS_PATH) as data:
            X_mean = data['mean']
            X_std = data['std']
            
        # Cargar el modelo TFLite
        interpreter = tf.lite.Interpreter(model_path=MODEL_PATH)
        interpreter.allocate_tensors()

        input_details = interpreter.get_input_details()[0]
        output_details = interpreter.get_output_details()[0]
        print("Output details shape:", output_details['shape'])
    except Exception as e:
        print(f"\nError crítico al cargar modelo: {str(e)}")
        return

    # 3. Configuración de cámara
    cap = UDPCamera()
    
    # 4. Variables de estado
    buffer = CircularBuffer(sequence_length)  # Usar solo buffer para secuencias
    prediction_history = deque(maxlen=15)     # Para suavizado de predicciones
    current_gesture = "Esperando..."
    current_confidence = 0.0
    latency = 0.0
    fps_counter = deque(maxlen=30)
    last_time = time.time()
    high_sensitivity_mode = False
    unknown_counter = 0

    # 5. Configurar afinidad de CPU para mejor rendimiento
    try:
        p = psutil.Process()
        p.cpu_affinity([0, 1])  # Hilo principal en núcleos 0-1
    except Exception as e:
        print(f"Advertencia: No se pudo configurar afinidad de CPU: {str(e)}")

    # 6. Bucle principal optimizado
    while True:
        # Medición de FPS
        current_time = time.time()
        fps_counter.append(1/(current_time - last_time + 1e-7))
        last_time = current_time

        # Capturar frame
        ret, frame = cap.read()
        if not ret:
            continue

        # Procesar landmarks de manos
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = hands.process(rgb_frame)
        
        # Inicializar landmarks con ceros (para casos sin detección)
        landmarks = [0.0] * total_landmarks
        
        # Extraer landmarks si hay manos detectadas
        if results.multi_hand_landmarks:
            hand_landmarks = []
            for hand in results.multi_hand_landmarks[:2]:  # Máximo 2 manos
                for lm in hand.landmark:
                    hand_landmarks.extend([lm.x, lm.y, lm.z])
            
            # Rellenar con ceros si es necesario
            if len(hand_landmarks) < total_landmarks:
                hand_landmarks += [0.0] * (total_landmarks - len(hand_landmarks))
            else:
                hand_landmarks = hand_landmarks[:total_landmarks]  # Truncar si excede
                
            landmarks = hand_landmarks

        # Añadir al buffer circular
        buffer.add(landmarks)
        
        # Ajustar modo alta sensibilidad si está activado
        if high_sensitivity_mode:
            frame = cv2.resize(frame, (320, 240))  # Reducir resolución para aumentar velocidad
        
        # Realizar predicción cuando el buffer esté completo
        if buffer.full:
            try:
                # Obtener secuencia y preprocesar
                seq_array = buffer.get_sequence()
                seq_array = (seq_array - X_mean) / (X_std + 1e-7)
                input_data = seq_array.reshape(1, sequence_length, total_landmarks).astype(np.float32)
                
                # Verificar forma de los datos de entrada
                if input_data.shape != tuple(input_details['shape']):
                    print(f"Error: Forma esperada {input_details['shape']}, obtenida {input_data.shape}")
                    continue
                
                # Inferencia con medición de latencia
                start_time = time.perf_counter()
                interpreter.set_tensor(input_details['index'], input_data)
                interpreter.invoke()
                prediction = interpreter.get_tensor(output_details['index'])[0]
                latency = (time.perf_counter() - start_time) * 1000  # ms

                # Sistema de fallback para gestos desconocidos
                if np.max(prediction) < 0.5:
                    unknown_counter += 1
                    if unknown_counter >= 3:
                        high_sensitivity_mode = not high_sensitivity_mode
                        print(f"Cambiando a modo: {'Alta Sensibilidad' if high_sensitivity_mode else 'Normal'}")
                        unknown_counter = 0
                else:
                    unknown_counter = 0
                
                # Suavizado temporal con promedio de predicciones recientes
                prediction_history.append(prediction)
                smoothed_pred = np.mean(prediction_history, axis=0)
                predicted_idx = np.argmax(smoothed_pred)
                confidence = smoothed_pred[predicted_idx]
                
                # Umbral dinámico según modo de sensibilidad
                confidence_threshold = 0.7 if not high_sensitivity_mode else 0.5
                if confidence > confidence_threshold:
                    current_gesture = gestures[predicted_idx]
                    current_confidence = confidence
                else:
                    #current_gesture = "Desconocido"
                    current_confidence = 0.0

            except Exception as e:
                print(f"Error en predicción: {str(e)}")
                continue

        # Visualización de métricas en pantalla
        fps = np.mean(fps_counter) if fps_counter else 0
        
        # Información de rendimiento
        cv2.putText(frame, f"FPS: {fps:.1f}", (10, 110), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
        cv2.putText(frame, f"Latencia: {latency:.1f}ms", (10, 140), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
        
        # Información de modo
        mode_color = (0, 255, 0) if not high_sensitivity_mode else (0, 0, 255)
        cv2.putText(frame, f"Modo: {'Normal' if not high_sensitivity_mode else 'Alta Sensibilidad'}", 
                   (10, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, mode_color, 2)
        
        # Información de predicción
        cv2.putText(frame, f"Prediccion: {current_gesture}", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2) #BGR
        cv2.putText(frame, f"Confianza: {current_confidence:.2%}", (10, 70),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        
        # Mostrar frame y detectar tecla ESC para salir
        cv2.imshow("Predicciones en Tiempo Real", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break

    # Liberar recursos
    cap.release()
    cv2.destroyAllWindows()

Las mejoras clave que he hecho a su evaluate() la función incluye:

* Manejo de ruta de modelo consistente: He definido constantes en la parte superior de la función tanto para el modelo como para los archivos de parámetros de normalización para garantizar la consistencia.

* Gestión de secuencias simplificada: He eliminado el redundante sequence deque y consistentemente utilizó el CircularBuffer para gestionar secuencias de puntos de referencia manuales.

* Procesamiento de punto de referencia más limpio: La extracción de puntos de referencia ahora es más sencilla, con un manejo adecuado de los casos en que hay demasiados o muy pocos puntos de referencia.

* Mejor manejo de errores: Se agregaron mensajes de error más específicos e implementaron un continue declaración para manejar errores durante la predicción para que el bucle no se bloquee.

* Código no utilizado eliminado: Se han eliminado el código comentado y las variables no utilizadas.

* Se agregaron comentarios más detallados: Cada sección de la función ahora tiene comentarios claros que explican lo que hace.

* Estructura mejorada: La función ahora sigue un flujo más lógico con secciones distintas para la inicialización, el procesamiento del bucle principal y la visualización.

* Prevención de errores: Se agregó un bloque de prueba excepto para la configuración de afinidad de la CPU, ya que esto podría no funcionar en todas las plataformas.

* Mejor gestión de marcos: Añadido a continue instrucción cuando falla la recuperación de fotogramas en lugar de procesar datos potencialmente no válidos.

* Retroalimentación visual para cambios de modo: Se agregó una declaración de impresión para informar cuando el sistema cambia entre los modos normal y de alta sensibilidad.

Esta función revisada debe ser más confiable, mantenible y consistente, al tiempo que conserva todas las características avanzadas de su implementación original.

## 10. MENU

In [160]:
# Menú principal
def main():
    init_system()
    
    while True:
        print("\n=== Sistema de Reconocimiento de Lenguaje de Señas ===")
        print("1. Detectar Manos")
        print("2. Recolectar Datos")
        print("3. Entrenar Modelo, y despues ir a convertir a TFlite")
        print("4. Evaluar")
        print("6. Convertir a TFLite")  # Nueva opción
        print("7. Salir")
        
        choice = input("\nSeleccione una opción: ")
        
        if choice == '1':
            detect_hands()
        elif choice == '2':
            collect_data()
        elif choice == '3':
            train_model()
        elif choice == '4':
            evaluate()
        elif choice == '6':  # Nueva opción de conversión
            convert_to_tflite()
        elif choice == '7':
            print("\n¡Hasta luego!")
            break
        else:
            print("\nOpción inválida. Por favor, intente de nuevo.")

# MENU

In [161]:
if __name__ == "__main__":
    main()


=== Sistema de Reconocimiento de Lenguaje de Señas ===
1. Detectar Manos
2. Recolectar Datos
3. Entrenar Modelo, y despues ir a convertir a TFlite
4. Evaluar
6. Convertir a TFLite
7. Salir
Output details shape: [1 5]
UDP Camera started on 0.0.0.0:5000
Cambiando a modo: Alta Sensibilidad
UDP Camera released

=== Sistema de Reconocimiento de Lenguaje de Señas ===
1. Detectar Manos
2. Recolectar Datos
3. Entrenar Modelo, y despues ir a convertir a TFlite
4. Evaluar
6. Convertir a TFLite
7. Salir

¡Hasta luego!
