In [1]:
# -----------------PROYECTO FINAL MEJORADO-----------------
## 1. IMPORTAR LIBRERÍAS
import cv2 
import mediapipe as mp
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
from collections import Counter
from filterpy.kalman import KalmanFilter
from scipy.signal import savgol_filter




In [2]:
# Disabled text-to-speech helper, kept only for reference. It is not active code;
# note that `threading` is not imported anywhere in the visible import cell.
#def speak_async(engine, text):
#    threading.Thread(target=lambda: (engine.say(text), engine.runAndWait())).start()
# Index of the capture device. collect_data() reads this module-level value when it
# calls cv2.VideoCapture(num_camara).
num_camara=0

In [3]:
## 2. CONFIGURACIÓN AVANZADA
class KalmanFilterWrapper:
    """Smooths a stream of scalar measurements with a two-state Kalman filter.

    The underlying ``filterpy`` filter tracks two state values and observes
    only the first one; ``smooth`` returns that first value.
    """

    def __init__(self, process_noise=0.01, measurement_noise=1):
        """Configure the filter.

        Args:
            process_noise: Value placed on the diagonal of the process-noise
                matrix ``Q``.
            measurement_noise: Value assigned to the measurement noise ``R``.
        """
        kf = KalmanFilter(dim_x=2, dim_z=1)
        # Both state components start at zero.
        kf.x = np.zeros(2)
        # Transition: first state += second state; second state is carried over.
        kf.F = np.array([[1., 1.],
                         [0., 1.]])
        # Only the first state component is measured.
        kf.H = np.array([[1., 0.]])
        # Inflate the initial covariance so early measurements dominate.
        kf.P = kf.P * 100.
        kf.R = measurement_noise
        kf.Q = process_noise * np.eye(2)
        self.kf = kf

    def smooth(self, measurement):
        """Feed one measurement through predict/update and return the filtered value."""
        kf = self.kf
        kf.predict()
        kf.update(measurement)
        return kf.x[0]

In [4]:
# --- MediaPipe setup ---------------------------------------------------------
# Short aliases for the MediaPipe solution modules used across the notebook.
mp_pose = mp.solutions.pose
mp_hands = mp.solutions.hands
mp_draw = mp.solutions.drawing_utils

# Body-pose estimator shared by every cell that processes frames.
pose = mp_pose.Pose(
    model_complexity=1,
    min_tracking_confidence=0.5,
    min_detection_confidence=0.5,
)

# Hand tracker: at most two hands, using model_complexity=0.
hands = mp_hands.Hands(
    max_num_hands=2,
    model_complexity=0,
    min_tracking_confidence=0.5,
    min_detection_confidence=0.5,
)

# --- Global settings ---------------------------------------------------------
# Output folders for recorded landmark sequences and for videos.
data_dir = "sign_language_data_JUPYTER"
data_dir_video = "sign_language_data_JUPYTER_videos"
os.makedirs(data_dir, exist_ok=True)
os.makedirs(data_dir_video, exist_ok=True)

# Frames per recorded sequence (main() defines its own local value).
sequence_length = 30

# Feature sizes: three values (x, y, z) per landmark.
n_pose_landmarks = 33 * 3
n_hand_landmarks = 21 * 3
# One pose block followed by two hand blocks.
total_landmarks = n_pose_landmarks + (n_hand_landmarks * 2)

In [5]:
## 3. PREPROCESAMIENTO MEJORADO
def preprocess_sequence(sequence):
    """Clip outliers and standardise a sequence feature-by-feature.

    Args:
        sequence: Array-like whose first axis is reduced over (statistics are
            computed with ``axis=0``).

    Returns:
        numpy.ndarray with the same shape as the input. An empty or all-zero
        input is returned as zeros without further processing.
    """
    data = np.array(sequence)
    is_blank = data.size == 0 or np.all(data == 0)
    if is_blank:
        return np.zeros_like(data)

    # Clamp every value to [Q1 - 1.5*IQR, Q3 + 1.5*IQR], computed along axis 0
    # while ignoring NaNs.
    lower_q, upper_q = (np.nanquantile(data, q, axis=0) for q in (0.25, 0.75))
    spread = upper_q - lower_q
    data = np.clip(data, lower_q - 1.5 * spread, upper_q + 1.5 * spread)

    # Z-score along axis 0; the small constant keeps the division finite.
    center = np.nanmean(data, axis=0)
    scale = np.nanstd(data, axis=0) + 1e-8
    return (data - center) / scale

In [6]:
def process_frame(frame):
    """Run the module-level pose and hand detectors on one BGR frame.

    Args:
        frame: Image in BGR channel order (it is converted to RGB before
            being handed to the detectors).

    Returns:
        Tuple ``(pose_results, hands_results)`` as returned by
        ``pose.process`` and ``hands.process``.
    """
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Pose runs first, then hands, on the same RGB image.
    return pose.process(frame_rgb), hands.process(frame_rgb)

In [7]:
def extract_landmarks(pose_results, hands_results):
    """Build one flat, standardised feature vector from pose and hand results.

    Layout of the returned vector:
        * ``n_pose_landmarks`` values: x, y, z of every pose landmark, or zeros
          when no pose was detected;
        * two hand slots of ``n_hand_landmarks`` values each (x, y, z of every
          hand landmark), zero-filled for hands that were not detected.

    The whole vector is z-scored with its own mean and standard deviation
    unless it is entirely zero.

    Args:
        pose_results: Object exposing ``pose_landmarks`` (falsy when absent).
        hands_results: Object exposing ``multi_hand_landmarks`` (falsy when
            absent); only the first two hands are used.

    Returns:
        1-D float64 ``numpy.ndarray``.
    """
    if pose_results.pose_landmarks:
        pose_values = [
            coord
            for lm in pose_results.pose_landmarks.landmark
            for coord in (lm.x, lm.y, lm.z)
        ]
    else:
        pose_values = [0.0] * n_pose_landmarks

    # NOTE(review): hand slots follow the order of multi_hand_landmarks; nothing
    # here enforces a stable left/right assignment between frames.
    hand_values = []
    for hand_landmarks in (hands_results.multi_hand_landmarks or [])[:2]:
        hand_values.extend(
            coord for lm in hand_landmarks.landmark for coord in (lm.x, lm.y, lm.z)
        )

    # Zero-fill the slots of hands that were not detected.
    missing = n_hand_landmarks * 2 - len(hand_values)
    if missing > 0:
        hand_values.extend([0.0] * missing)

    # Always float, so every frame has the same dtype even when nothing is detected.
    landmarks = np.asarray(pose_values + hand_values, dtype=np.float64)

    if np.any(landmarks):
        centered = landmarks - np.mean(landmarks)
        spread = np.std(landmarks)
        # A constant vector has zero spread; return the centred (all-zero)
        # vector instead of dividing by zero and emitting NaN/inf.
        landmarks = centered / spread if spread > 0 else centered
    return landmarks


In [8]:
def _next_sequence_index(sign_dir):
    """Return the first index not yet used by a ``sequence_<n>.npy`` file in sign_dir."""
    prefix = "sequence_"
    used = []
    for name in os.listdir(sign_dir):
        stem, ext = os.path.splitext(name)
        suffix = stem[len(prefix):]
        if ext == ".npy" and stem.startswith(prefix) and suffix.isdigit():
            used.append(int(suffix))
    return max(used) + 1 if used else 0


def collect_data(data_dir_video, data_dir, sign_name, sequence_length):
    """Record landmark sequences for one sign from the webcam.

    Each sequence holds ``sequence_length`` frames of landmark vectors and is
    saved as ``<data_dir>/<sign_name>/sequence_<n>.npy``. Numbering continues
    after the files already present, so earlier recordings are never
    overwritten. The per-sign folder under ``data_dir_video`` is created, but
    this function does not write any video file into it.

    Controls: SPACE starts a sequence, ESC aborts.

    Args:
        data_dir_video: Root folder for per-sign video directories.
        data_dir: Root folder for per-sign landmark sequences.
        sign_name: Name of the sign; used as the sub-folder name.
        sequence_length: Number of frames per recorded sequence.

    Returns:
        None.
    """
    # An empty name would drop the files straight into the dataset root.
    if not sign_name or not sign_name.strip():
        print("Error: el nombre de la seña no puede estar vacío")
        return

    sign_dir = os.path.join(data_dir, sign_name)
    sign_dir_video = os.path.join(data_dir_video, f"{sign_name}")
    os.makedirs(sign_dir, exist_ok=True)
    os.makedirs(sign_dir_video, exist_ok=True)

    # Uses the module-level camera index.
    cap = cv2.VideoCapture(num_camara)
    try:
        if not cap.isOpened():
            print("Error: No se pudo abrir la cámara")
            return

        try:
            total_sequences = int(input("Número de secuencias a recolectar (recomendado: 20-30): "))
        except ValueError:
            print("Error: el número de secuencias debe ser un entero")
            return

        print("\nInstrucciones:")
        print(f"1. Cada secuencia grabará {sequence_length} frames de movimiento")
        print("2. Presiona ESPACIO para iniciar cada secuencia")
        print("3. Realiza el movimiento completo de la seña")
        print("4. La grabación se detendrá automáticamente")
        print("5. Presiona ESC para cancelar")

        # Continue numbering after existing files instead of restarting at 0.
        first_index = _next_sequence_index(sign_dir)
        sequence_count = 0
        frame_count = 0
        is_recording = False
        current_sequence = []

        while sequence_count < total_sequences:
            ret, frame = cap.read()
            if not ret:
                break

            # Mirror the image so it behaves like a mirror for the user.
            frame = cv2.flip(frame, 1)
            blank_frame = np.zeros_like(frame)
            pose_results, hands_results = process_frame(frame)

            # Draw the detected landmarks on a black canvas.
            if pose_results.pose_landmarks:
                mp_draw.draw_landmarks(blank_frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
            if hands_results.multi_hand_landmarks:
                for hand_landmarks in hands_results.multi_hand_landmarks:
                    mp_draw.draw_landmarks(blank_frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

            if is_recording:
                cv2.putText(frame, f"Grabando secuencia {sequence_count + 1}...", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                current_sequence.append(extract_landmarks(pose_results, hands_results))
                frame_count += 1

                if frame_count >= sequence_length:
                    # Sequence complete: persist it and wait for the next SPACE.
                    sequence_data = np.array(current_sequence)
                    file_index = first_index + sequence_count
                    np.save(os.path.join(sign_dir, f"sequence_{file_index}.npy"), sequence_data)
                    print(f"Secuencia {sequence_count + 1}/{total_sequences} guardada")
                    sequence_count += 1
                    frame_count = 0
                    is_recording = False
                    current_sequence = []

            cv2.imshow("Recolección de Datos", frame)
            cv2.imshow("Landmarks", blank_frame)

            key = cv2.waitKey(1) & 0xFF
            if key == 32 and not is_recording:  # SPACE starts a recording
                is_recording = True
                current_sequence = []
                frame_count = 0
            elif key == 27:  # ESC aborts
                break
    finally:
        # Always free the camera and close windows, even on an exception.
        cap.release()
        cv2.destroyAllWindows()


In [9]:
## 4. AUGMENTACIÓN DE DATOS
def augment_sequence(sequence):
    """Return a randomly perturbed copy of a sequence.

    The input is never modified. Three transformations are applied in order:
    additive Gaussian noise (sigma 0.02), an optional circular shift of -2..2
    steps along axis 0 (applied with probability 0.5), and an independent
    scale factor in [0.9, 1.1] for every column.

    Args:
        sequence: 2-D array-like. Non-floating input is converted to float64;
            floating input keeps its dtype.

    Returns:
        New ``numpy.ndarray`` with the same shape as ``sequence``.
    """
    source = np.asarray(sequence)
    # Work on a private floating copy: in-place ops on the caller's array would
    # silently corrupt the "original" sample, and fail outright on int arrays.
    if np.issubdtype(source.dtype, np.floating):
        augmented = source.copy()
    else:
        augmented = source.astype(np.float64)

    # Gaussian noise
    augmented += np.random.normal(0, 0.02, augmented.shape)

    # Temporal variation: np.roll wraps frames around the sequence boundary.
    if np.random.rand() > 0.5:
        augmented = np.roll(augmented, shift=np.random.randint(-2, 3), axis=0)

    # Non-uniform scaling, one factor per column
    augmented *= np.random.uniform(0.9, 1.1, size=augmented.shape[1])

    return augmented

In [10]:
def train_model(data_dir, sequence_length, total_landmarks, model_file):
    """Load the recorded sequences, train the model and save it to ``model_file``.

    Every entry of ``data_dir`` is treated as one class (sorted by name); the
    ``sequence_*`` files inside each class folder are the samples. Each
    sequence is normalised with ``preprocess_sequence`` – the same step
    ``evaluate_realtime`` applies before predicting – so training and inference
    see identically prepared data.

    Args:
        data_dir: Root folder with one sub-folder per sign.
        sequence_length: Frames per sequence; sequences of another length are skipped.
        total_landmarks: Features per frame; sequences of another width are skipped.
        model_file: Path where the trained model is saved.

    Returns:
        None. Prints a message and returns early when there is nothing to train on.
    """
    if not os.listdir(data_dir):
        print("No hay datos para entrenar")
        return

    input_shape = (sequence_length, total_landmarks)
    class_names = sorted(os.listdir(data_dir))
    num_classes = len(class_names)

    X = []
    y = []
    print("Cargando secuencias...")
    for class_idx, class_name in enumerate(class_names):
        class_dir = os.path.join(data_dir, class_name)
        # Stray files keep their index (so labels stay aligned with the sorted
        # listing used at inference) but contribute no samples.
        if not os.path.isdir(class_dir):
            continue
        samples = sorted(f for f in os.listdir(class_dir) if f.startswith('sequence_'))
        print(f"Clase {class_name}: {len(samples)} secuencias")

        for sample_file in samples:
            sample_path = os.path.join(class_dir, sample_file)
            sequence = np.load(sample_path)
            if sequence.shape != input_shape:
                print(f"Secuencia omitida {sample_path}: forma {sequence.shape}, se esperaba {input_shape}")
                continue
            X.append(preprocess_sequence(sequence))
            y.append(class_idx)

    if len(X) < 2:
        print("No hay datos para entrenar")
        return

    X = np.array(X, dtype=np.float32)
    # Fix the one-hot width to the class count so it always matches the model output.
    y = tf.keras.utils.to_categorical(y, num_classes=num_classes)

    # Samples were loaded class by class; shuffle so that validation_split
    # (which takes the tail of the arrays) does not validate on one class only.
    order = np.random.permutation(len(X))
    X, y = X[order], y[order]

    model = build_advanced_model(input_shape, num_classes)
    history = model.fit(
        X, y,
        epochs=100,
        batch_size=32,
        validation_split=0.2,
        callbacks=[tf.keras.callbacks.EarlyStopping(patience=15, restore_best_weights=True)],
        verbose=1
    )
    model.save(model_file)

    # Training curves
    plt.figure(figsize=(12, 6))
    plt.plot(history.history['accuracy'], label='Precisión de entrenamiento')
    plt.plot(history.history['val_accuracy'], label='Precisión de validación')
    plt.title('Precisión del Modelo')
    plt.xlabel('Épocas')
    plt.ylabel('Precisión')
    plt.legend()
    plt.show()

    plt.figure(figsize=(12, 6))
    plt.plot(history.history['loss'], label='Pérdida de entrenamiento')
    plt.plot(history.history['val_loss'], label='Pérdida de validación')
    plt.title('Pérdida del Modelo')
    plt.xlabel('Épocas')
    plt.ylabel('Pérdida')
    plt.legend()
    plt.show()

In [11]:
## 5. ARQUITECTURA DEL MODELO MEJORADA
def build_advanced_model(input_shape, num_classes):
    """Build and compile the sequence classifier.

    Pipeline: self-attention over the input sequence -> Conv1D block ->
    bidirectional LSTM -> second self-attention -> global average pooling ->
    dense head with a softmax over ``num_classes``.

    Args:
        input_shape: Shape of one sample, passed to ``tf.keras.Input``.
        num_classes: Number of output classes.

    Returns:
        Compiled ``tf.keras.Model`` (Adam, categorical cross-entropy, accuracy).
    """
    inputs = tf.keras.Input(shape=input_shape)

    # Self-attention on the raw input; its output is concatenated with the
    # input features along the last axis.
    att = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=64)(inputs, inputs)
    x = tf.keras.layers.Concatenate()([inputs, att])
    x = tf.keras.layers.LayerNormalization()(x)

    # Convolutional block
    x = tf.keras.layers.Conv1D(256, 5, activation='relu', padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling1D(2)(x)
    x = tf.keras.layers.Dropout(0.3)(x)

    # Bidirectional LSTM block
    x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(128, return_sequences=True))(x)
    x = tf.keras.layers.Dropout(0.4)(x)

    # Second self-attention, again concatenated with its own input
    att_temp = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=64)(x, x)
    x = tf.keras.layers.Concatenate()([x, att_temp])
    x = tf.keras.layers.GlobalAvgPool1D()(x)

    # Dense head
    x = tf.keras.layers.Dense(128, activation='relu', kernel_regularizer='l2')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)

    model = tf.keras.Model(inputs, outputs)
    # Only the plain accuracy metric is compiled in. No sample weights are used
    # anywhere in this notebook, and a duplicate weighted metric would make
    # evaluate() return three values, breaking `loss, accuracy = model.evaluate(...)`.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

In [12]:
# Module-level slot for the last word that was spoken aloud; starts unset.
last_spoken_word = None

def load_model(model_file):
    """Return the Keras model stored at ``model_file``.

    Prints a notice and returns ``None`` when the file does not exist.
    """
    if os.path.exists(model_file):
        # Deserialize the previously trained model.
        return tf.keras.models.load_model(model_file)

    print("No se encontró el modelo entrenado")
    return None

In [13]:
def load_test_data(data_dir, sequence_length):
    """Load a test subset: the first 20% of each class's sequence files.

    Class folders and sample files are both sorted by name, so the selected
    subset is the same on every run.

    NOTE(review): the test files are taken from the same folders the training
    functions read, so they are not held out from training – confirm this is
    intended. The sequences are returned exactly as stored (no
    ``preprocess_sequence``).

    Args:
        data_dir: Root folder with one sub-folder per sign.
        sequence_length: Accepted for interface compatibility; not used.

    Returns:
        Tuple ``(X_test, y_test, class_names)`` where ``y_test`` is one-hot
        encoded with one column per entry of ``class_names``.
    """
    class_names = sorted(os.listdir(data_dir))

    print("Cargando datos de prueba...")
    X_test = []
    y_test = []

    for class_idx, class_name in enumerate(class_names):
        class_dir = os.path.join(data_dir, class_name)
        # A stray file in data_dir keeps its index but has no samples to list.
        if not os.path.isdir(class_dir):
            continue
        # os.listdir order is arbitrary; sort so the 20% slice is reproducible.
        samples = sorted(f for f in os.listdir(class_dir) if f.startswith('sequence_'))

        test_samples = samples[:int(0.2 * len(samples))]
        for sample_file in test_samples:
            sample_path = os.path.join(class_dir, sample_file)
            X_test.append(np.load(sample_path))
            y_test.append(class_idx)

    X_test = np.array(X_test)
    # Fix the width so it matches the class count even when the last classes
    # contributed no test samples (or nothing was selected at all).
    y_test = tf.keras.utils.to_categorical(y_test, num_classes=len(class_names))
    return X_test, y_test, class_names

In [14]:
def evaluate_model(model, X_test, y_test):
    """Evaluate the model on test data, print loss/accuracy and plot them.

    Args:
        model: Object with a Keras-style ``evaluate(X, y, verbose=...)`` method
            whose result starts with the loss followed by the first metric.
        X_test: Test inputs.
        y_test: Test targets.

    Raises:
        ValueError: If ``evaluate`` returns only the loss (no metric compiled).
    """
    # evaluate() returns a scalar, or a list [loss, metric_1, metric_2, ...].
    # Take the first two entries instead of unpacking exactly two, so models
    # compiled with additional metrics still work.
    results = np.atleast_1d(model.evaluate(X_test, y_test, verbose=1))
    if results.size < 2:
        raise ValueError("model.evaluate() returned no accuracy metric")
    loss, accuracy = float(results[0]), float(results[1])

    print(f"\nPérdida en prueba: {loss:.4f}")
    print(f"Precisión en prueba: {accuracy:.2%}")

    # Global metrics bar chart
    plt.figure(figsize=(6, 4))
    plt.bar(["Pérdida", "Precisión"], [loss, accuracy], color=['blue', 'green'])
    plt.title("Métricas globales en prueba")
    # Keep the 0..1 range, but grow it when the loss exceeds 1 so the bar is not clipped.
    plt.ylim(0, max(1.0, loss * 1.05))
    plt.ylabel("Valor")
    plt.show()

In [None]:

## 6. EVALUACIÓN EN TIEMPO REAL MEJORADA
def evaluate_realtime(model_file, data_dir, sequence_length, mp_draw, mp_pose, mp_hands, num_camara):
    """Classify signs live from the webcam and overlay the stable prediction.

    Frames with at least one detected hand are appended to a sliding window of
    ``sequence_length`` landmark vectors. Once the window is full it is
    smoothed along time, normalised with ``preprocess_sequence`` and passed to
    the model. A label is drawn only when it wins more than 70% of the last 15
    predictions and the Kalman-filtered confidence exceeds 0.8. ESC exits.

    Args:
        model_file: Path of the trained model.
        data_dir: Dataset root; its sorted entries are the class names.
        sequence_length: Number of frames per model input.
        mp_draw, mp_pose, mp_hands: Accepted for interface compatibility; not
            used by this function.
        num_camara: Index of the capture device.

    Returns:
        None. Returns early if the model file is missing or the camera cannot
        be opened.
    """
    model = load_model(model_file)
    if model is None:
        # load_model already printed why; predicting with None would crash.
        return

    class_names = sorted(os.listdir(data_dir))
    kalman_filter = KalmanFilterWrapper()
    current_sequence = []
    prediction_buffer = []

    cap = cv2.VideoCapture(num_camara)
    try:
        if not cap.isOpened():
            print("Error: No se pudo abrir la cámara")
            return
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame = cv2.flip(frame, 1)
            pose_results, hands_results = process_frame(frame)

            if hands_results.multi_hand_landmarks:
                current_sequence.append(extract_landmarks(pose_results, hands_results))
                if len(current_sequence) > sequence_length:
                    current_sequence.pop(0)

                if len(current_sequence) == sequence_length:
                    window = np.array(current_sequence)
                    # Savitzky-Golay smoothing along the time axis. Filtering a
                    # single frame's vector would instead blend unrelated
                    # coordinates of neighbouring landmarks.
                    if sequence_length >= 5:
                        window = savgol_filter(window, 5, 2, axis=0)

                    sequence_data = preprocess_sequence(window)
                    prediction = model.predict(np.expand_dims(sequence_data, 0), verbose=0)[0]

                    # Kalman-filter the top confidence to damp frame-to-frame jitter.
                    filtered_confidence = kalman_filter.smooth(np.max(prediction))
                    class_idx = int(np.argmax(prediction))
                    # Guard against a model trained with more classes than data_dir lists now.
                    label = class_names[class_idx] if class_idx < len(class_names) else str(class_idx)

                    # Keep the last 15 predicted labels.
                    prediction_buffer.append(label)
                    if len(prediction_buffer) > 15:
                        prediction_buffer.pop(0)

                    # Majority vote over the buffer.
                    stable_pred, votes = Counter(prediction_buffer).most_common(1)[0]
                    stability = votes / len(prediction_buffer)

                    if stability > 0.7 and filtered_confidence > 0.8:
                        cv2.putText(frame, f"Seña: {stable_pred} ({filtered_confidence:.1%})",
                                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

            cv2.imshow("Detección en Tiempo Real", frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        # Always free the camera and close windows, even on an exception.
        cap.release()
        cv2.destroyAllWindows()

## 7. ENTRENAMIENTO CON AUGMENTACIÓN
def train_model_improved(data_dir, sequence_length, total_landmarks, model_file):
    """Train the advanced model with data augmentation and callbacks.

    Steps:
        1. Load every ``sequence_*`` file of every class folder and normalise
           it with ``preprocess_sequence`` (the step ``evaluate_realtime``
           applies before predicting).
        2. Shuffle and hold out 20% of the real sequences for validation.
        3. Add one augmented copy of every training sequence.
        4. Fit with early stopping, learning-rate reduction and checkpointing
           of the best model to ``model_file``; then plot the curves.

    Args:
        data_dir: Root folder with one sub-folder per sign.
        sequence_length: Frames per sequence; other lengths are skipped.
        total_landmarks: Features per frame; other widths are skipped.
        model_file: Path the best model is checkpointed to.

    Returns:
        None. Prints a message and returns early when fewer than two usable
        sequences are found.
    """
    expected_shape = (sequence_length, total_landmarks)
    class_names = sorted(os.listdir(data_dir))

    # Load and normalise the data
    sequences, labels = [], []
    for class_idx, class_name in enumerate(class_names):
        class_dir = os.path.join(data_dir, class_name)
        # Stray files keep their index (labels stay aligned with the sorted
        # listing used at inference) but contribute no samples.
        if not os.path.isdir(class_dir):
            continue
        samples = sorted(f for f in os.listdir(class_dir) if f.startswith('sequence_'))

        for sample_file in samples:
            sample_path = os.path.join(class_dir, sample_file)
            sequence = np.load(sample_path)
            if sequence.shape != expected_shape:
                print(f"Secuencia omitida {sample_path}: forma {sequence.shape}, se esperaba {expected_shape}")
                continue
            sequences.append(preprocess_sequence(sequence))
            labels.append(class_idx)

    if len(sequences) < 2:
        print("No hay datos suficientes para entrenar")
        return

    X = np.array(sequences, dtype=np.float64)
    # Fix the one-hot width to the class count so it always matches the model output.
    y = tf.keras.utils.to_categorical(labels, num_classes=len(class_names))

    # Split BEFORE augmenting. validation_split would take the tail of the
    # class-ordered arrays (i.e. only the last class), and augmented twins of
    # training samples must not leak into validation.
    order = np.random.permutation(len(X))
    n_val = max(1, int(0.2 * len(X)))
    val_idx, train_idx = order[:n_val], order[n_val:]
    X_val, y_val = X[val_idx], y[val_idx]
    X_real, y_real = X[train_idx], y[train_idx]

    # One augmented variation per training sequence. A copy is passed so the
    # real sample is untouched even if augment_sequence works in place.
    X_aug = np.array([augment_sequence(seq.copy()) for seq in X_real])
    X_train = np.concatenate([X_real, X_aug])
    y_train = np.concatenate([y_real, y_real])

    # Build and train the model
    model = build_advanced_model(expected_shape, len(class_names))

    callbacks = [
        tf.keras.callbacks.EarlyStopping(patience=15, restore_best_weights=True),
        tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5),
        tf.keras.callbacks.ModelCheckpoint(model_file, save_best_only=True)
    ]

    history = model.fit(
        X_train, y_train,
        epochs=100,
        batch_size=64,
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1
    )

    # Training curves: accuracy on the left, loss on the right
    plt.figure(figsize=(15, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Entrenamiento')
    plt.plot(history.history['val_accuracy'], label='Validación')
    plt.title('Evolución de la Precisión')
    plt.xlabel('Época')
    plt.ylabel('Precisión')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Entrenamiento')
    plt.plot(history.history['val_loss'], label='Validación')
    plt.title('Evolución de la Pérdida')
    plt.xlabel('Época')
    plt.ylabel('Pérdida')
    plt.legend()

    plt.tight_layout()
    plt.show()

## 8. MENU ACTUALIZADO
def _preview_detection(num_camara):
    """Show the mirrored camera feed with pose and hand landmarks until ESC is pressed."""
    cap = cv2.VideoCapture(num_camara)
    try:
        if not cap.isOpened():
            print("Error: No se pudo abrir la cámara")
            return
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame = cv2.flip(frame, 1)
            pose_results, hands_results = process_frame(frame)

            # Draw whatever was detected directly on the camera image.
            if pose_results.pose_landmarks:
                mp_draw.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
            if hands_results.multi_hand_landmarks:
                for hand_landmarks in hands_results.multi_hand_landmarks:
                    mp_draw.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

            cv2.imshow("Detección de Pose y Manos", frame)

            if cv2.waitKey(1) & 0xFF == 27:  # ESC exits
                break
    finally:
        # Always free the camera and close windows, even on an exception.
        cap.release()
        cv2.destroyAllWindows()


def main():
    """Interactive text menu: preview detection, collect data, train, evaluate, quit.

    Uses its own local settings (90-frame sequences, camera 0). Data collection
    goes through ``collect_data``, which reads the module-level ``num_camara``
    and receives the module-level ``data_dir_video``.
    """
    data_dir = "sign_language_data_JUPYTER"
    model_file = "sign_language_model.h5"
    sequence_length = 90  # Frames per recorded sample
    num_camara = 0  # Camera index

    # Make sure the dataset folder exists
    os.makedirs(data_dir, exist_ok=True)

    while True:
        print("\n=== Sistema de Reconocimiento de Lenguaje de Señas ===")
        print("1. Ver detección de pose y manos")
        print("2. Recolectar datos de señas")
        print("3. Entrenar modelo")
        print("4. Evaluar en tiempo real")
        print("5. Salir")

        option = input("\nSeleccione una opción: ")

        if option == "1":
            _preview_detection(num_camara)

        elif option == "2":
            sign_name = input("Nombre de la seña a recolectar: ")
            collect_data(data_dir_video, data_dir, sign_name, sequence_length)

        elif option == "3":
            # Use the trainer that actually builds, fits and checkpoints the
            # model; the older train_model never created a model.
            train_model_improved(data_dir, sequence_length, total_landmarks, model_file)

        elif option == "4":
            evaluate_realtime(model_file, data_dir, sequence_length, mp_draw, mp_pose, mp_hands, num_camara)

        elif option == "5":
            print("¡Hasta luego!")
            break

        else:
            print("Opción no válida.")

if __name__ == "__main__":
    main()


=== Sistema de Reconocimiento de Lenguaje de Señas ===
1. Ver detección de pose y manos
2. Recolectar datos de señas
3. Entrenar modelo
4. Evaluar en tiempo real
5. Salir



=== Sistema de Reconocimiento de Lenguaje de Señas ===
1. Ver detección de pose y manos
2. Recolectar datos de señas
3. Entrenar modelo
4. Evaluar en tiempo real
5. Salir
