# 00 – Data Tasks, Functions and Preparation for the MSL-150 Dataset

This notebook documents the end-to-end data preparation pipeline used in this work,
from the original MSL videos to the final NumPy tensors consumed by the recurrent
neural networks.

It is organized into the following sections:

# 0 – Import Dependencies

In [1]:
# 0. Import Dependencies
# ------------------------------------------------------------

from moviepy.editor import *
import os
import numpy as np
import cv2
from moviepy.editor import VideoFileClip, concatenate_videoclips
from moviepy.video.fx.all import speedx
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import csv
import pandas as pd

from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization, Input
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report


objc[53290]: Class CaptureDelegate is implemented in both /Users/armandobecerril/anaconda3/envs/tensorflow/lib/python3.8/site-packages/cv2/cv2.abi3.so (0x3135d65d8) and /Users/armandobecerril/anaconda3/envs/tensorflow/lib/python3.8/site-packages/mediapipe/.dylibs/libopencv_videoio.3.4.16.dylib (0x314748860). One of the two will be used. Which one is undefined.
objc[53290]: Class CVWindow is implemented in both /Users/armandobecerril/anaconda3/envs/tensorflow/lib/python3.8/site-packages/cv2/cv2.abi3.so (0x3135d6628) and /Users/armandobecerril/anaconda3/envs/tensorflow/lib/python3.8/site-packages/mediapipe/.dylibs/libopencv_highgui.3.4.16.dylib (0x16a0f0a68). One of the two will be used. Which one is undefined.
objc[53290]: Class CVView is implemented in both /Users/armandobecerril/anaconda3/envs/tensorflow/lib/python3.8/site-packages/cv2/cv2.abi3.so (0x3135d6650) and /Users/armandobecerril/anaconda3/envs/tensorflow/lib/python3.8/site-packages/mediapipe/.dylibs/libopencv_highgui.3.4.16.d

# 1 – Global Configuration, Paths and Seeds

In [4]:
# 1. Initial Configuration and Experiment Scope
# ------------------------------------------------------------
# This cell defines:
# - BASE_DIR: repository root (one level above `notebooks/`)
# - DATA_DIR: main data directory
# - RAW_VIDEO_DIR: full raw videos (not included in the public repo)
# - RAW_NPY_DIR: full keypoint tensors
# - SAMPLE_NPY_DIR: small public subset for reproducibility
# - DICTIONARY_DIR / ORIGINAL_SOURCE_DIR / CASES_DIR: further data sub-directories
# - SYNTH_DATA, TERMS_PATH, TERMS_SAMPLE_PATH: string paths built with os.path.join
# - USE_SAMPLE: switch between the sample vocabulary and the full vocabulary
# - RANDOM SEEDs for NumPy, TensorFlow and Python
# - frames / samples: global sizing constants
from pathlib import Path
import os
import random
import numpy as np
# Resolve base directory *relative* to this notebook
# NOTE(review): Path.cwd() is the kernel's working directory; this only equals the
# notebook folder when the kernel is started from `notebooks/` — confirm.
NOTEBOOK_DIR = Path.cwd()
BASE_DIR = NOTEBOOK_DIR.parent          # MSL-150 root
DATA_DIR = BASE_DIR / "data"

RAW_VIDEO_DIR = DATA_DIR / "raw"        # full set of original videos (local only)
RAW_NPY_DIR = DATA_DIR / "raw_npy"      # full npy export
SAMPLE_NPY_DIR = DATA_DIR / "sample_npy"  # small subset for GitHub

DICTIONARY_DIR = DATA_DIR / "dictionary"
ORIGINAL_SOURCE_DIR = DATA_DIR / "original_source"   # anonymized vocabulary samples
CASES_DIR = DATA_DIR / "cases"                       # anonymized narrative cases

# Output synthetic samples data
# (os.path.join yields plain strings here, unlike the pathlib.Path values above)
SYNTH_DATA = os.path.join(BASE_DIR, "data", "synthetic_sample_data")

TERMS_PATH = os.path.join(BASE_DIR, "data", "dictionary", "terms.txt")
TERMS_SAMPLE_PATH = os.path.join(BASE_DIR, "data", "dictionary", "terms_sample.txt")

USE_SAMPLE = True  # Set to False to use the full vocabulary
# TERMS_PATH is rebound here: after this line it points at the sample list when
# USE_SAMPLE is True, and at the full terms.txt otherwise.
TERMS_PATH = TERMS_SAMPLE_PATH if USE_SAMPLE else TERMS_PATH


print("NOTEBOOK_DIR      :", NOTEBOOK_DIR)
print("BASE_DIR          :", BASE_DIR)
print("DATA_DIR          :", DATA_DIR)
print("RAW_VIDEO_DIR     :", RAW_VIDEO_DIR)
print("RAW_NPY_DIR       :", RAW_NPY_DIR)
print("SAMPLE_NPY_DIR    :", SAMPLE_NPY_DIR)
print("ORIGINAL_SOURCE_DIR:", ORIGINAL_SOURCE_DIR)
print("CASES_DIR         :", CASES_DIR)

# Create output directories if they do not exist (safe for public repo)
RAW_NPY_DIR.mkdir(parents=True, exist_ok=True)
SAMPLE_NPY_DIR.mkdir(parents=True, exist_ok=True)

# Global config: reproducibility
RANDOM_SEED = 42
# presumably the per-clip window length later passed to central_clean_data — TODO confirm at the call site
frames=30
# per-term target number of videos; used as the value of every entry in videos_dict_aug
samples=20

# Seed Python, NumPy and TensorFlow (`tf` is imported in the imports cell).
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

print("Random seeds set to:", RANDOM_SEED)


NOTEBOOK_DIR      : /Users/armandobecerril/PhD/MSL-150/notebooks
BASE_DIR          : /Users/armandobecerril/PhD/MSL-150
DATA_DIR          : /Users/armandobecerril/PhD/MSL-150/data
RAW_VIDEO_DIR     : /Users/armandobecerril/PhD/MSL-150/data/raw
RAW_NPY_DIR       : /Users/armandobecerril/PhD/MSL-150/data/raw_npy
SAMPLE_NPY_DIR    : /Users/armandobecerril/PhD/MSL-150/data/sample_npy
ORIGINAL_SOURCE_DIR: /Users/armandobecerril/PhD/MSL-150/data/original_source
CASES_DIR         : /Users/armandobecerril/PhD/MSL-150/data/cases
Random seeds set to: 42


In [5]:
# Read the vocabulary terms from TERMS_PATH (one term per line) and build the
# dictionary that maps each term to its target number of videos.
videos_dict_aug = {}

# The vocabulary is Spanish, so decode explicitly instead of relying on the
# platform default encoding. "utf-8-sig" reads plain UTF-8 and also drops a
# leading byte-order mark if an editor added one.
with open(TERMS_PATH, 'r', encoding='utf-8-sig') as f:
    for line in f:
        term = line.strip()
        if term:  # ignore blank lines
            videos_dict_aug[term] = samples  # e.g. 800, 200 or 20 depending on the scenario

print("Número de términos en videos_dict_aug:", len(videos_dict_aug))
# Last expression of the cell: preview of the first ten (term, count) pairs.
list(videos_dict_aug.items())[:10]

Número de términos en videos_dict_aug: 5


[('ambulancia', 20), ('doctor', 20), ('dolor', 20), ('hoy', 20), ('yo', 20)]

# 2. Synthetic sample video generation for MSL-150
-------------------------------------------------------------

This script takes one blurred source video per sign (e.g. "ambulancia_001.mp4")
from:

    data/original_source/

and generates up to N augmented videos per sign under:

    data/synthetic_sample_videos/<term>/<term>_XXX.mp4

Each synthetic clip applies:
- small random speed variations (0.85x–1.15x),
- a small random rotation (-10° to +10°),
- frame-wise Gaussian noise.

This sample-friendly pipeline reproduces the main augmentation logic used in the
full MSL-150 dataset, but only for the five public signs:

    ambulancia, doctor, dolor, hoy, yo

so reviewers and other researchers can inspect an end-to-end version of the
process without accessing the full private dataset.

In [6]:
import os
import numpy as np
from moviepy.editor import VideoFileClip
import moviepy.video.fx.all as vfx

# --------------------------------------------------------------------
# 1. Paths and target configuration
# --------------------------------------------------------------------
# Repository root, resolved relative to the kernel's working directory (expected
# to be `notebooks/`, exactly like the configuration cell does with
# Path.cwd().parent) instead of a machine-specific absolute path.
BASE_DIR = os.path.dirname(os.getcwd())

# Base source videos
ORIGINAL_DIR = os.path.join(BASE_DIR, "data", "original_source")

# Output synthetic samples
SYNTH_DIR = os.path.join(BASE_DIR, "data", "synthetic_sample_videos")

# Number of synthetic videos per sign.
# NOTE: this rebinds the videos_dict_aug that was loaded from the terms file,
# restricting generation to the five public signs.
videos_dict_aug = {
    "ambulancia": samples,
    "doctor": samples,
    "dolor": samples,
    "hoy": samples,
    "yo": samples,
}


# --------------------------------------------------------------------
# 2. Utility functions
# --------------------------------------------------------------------
def ensure_directory_exists(directory):
    """
    Create `directory` (with any missing parents) if it does not exist yet.

    Args:
        directory (str): Full path of the directory to ensure.

    Returns:
        str: The same `directory` path, so the call can be used inline
        (e.g. ``out = ensure_directory_exists(path)``). This notebook defines
        the helper a second time with that contract and relies on the return
        value; returning the path here keeps both definitions interchangeable.
    """
    if not os.path.exists(directory):
        # exist_ok guards against the directory appearing between the check
        # above and the creation call.
        os.makedirs(directory, exist_ok=True)
        print(f"[INFO] Created directory: {directory}")
    return directory


def add_gaussian_noise(image, mean=0, sigma=25):
    """
    Return a copy of `image` perturbed with Gaussian noise.

    Noise is sampled with np.random.normal(mean, sigma) for every element of
    `image`, added in floating point, clipped to [0, 255] and cast to uint8.
    """
    noise = np.random.normal(mean, sigma, image.shape)
    perturbed = image.astype(np.float32) + noise
    return np.clip(perturbed, 0, 255).astype(np.uint8)


def transform_clip(clip):
    """
    Build an augmented version of `clip`.

    Three transformations are chained, in this order:
      1. speed change by a factor drawn uniformly from [0.85, 1.15),
      2. rotation by an angle drawn uniformly from [-10, 10) degrees,
      3. Gaussian noise on each frame, registered through `clip.fl`.

    Returns the transformed clip.
    """
    # Both random parameters are drawn first, in the order they are applied.
    speed_factor = np.random.uniform(0.85, 1.15)
    rotation_deg = np.random.uniform(-10, 10)

    def _noisy_frame(get_frame, t):
        # Frame filter: fetch the frame at time t and perturb it.
        return add_gaussian_noise(get_frame(t))

    return (
        clip
        .fx(vfx.speedx, speed_factor)
        .fx(vfx.rotate, rotation_deg)
        .fl(_noisy_frame)
    )


def is_valid_filename(filename, term):
    """
    Tell whether `filename` has the form ``<term>_<digits>.mp4``.

    Args:
        filename (str): Bare file name (no directory part).
        term (str): Expected sign name; it may itself contain underscores
            (e.g. ``"buenos_dias"``).

    Returns:
        bool: True when the extension is ``.mp4`` (case-insensitive), the text
        before the last underscore equals `term`, and the text after it
        consists only of digits.
    """
    if not filename.lower().endswith(".mp4"):
        return False

    name, _ = os.path.splitext(filename)

    # Split on the LAST underscore only, so multi-word terms are not rejected.
    prefix, separator, idx = name.rpartition("_")
    if not separator:
        return False

    return prefix == term and idx.isdigit()


# --------------------------------------------------------------------
# 3. Main generation routine
# --------------------------------------------------------------------
def manage_videos(original_dir, synth_root, videos_dict_aug):
    """
    Generate the synthetic clips that are still missing for every term.

    For each ``term -> num_samples`` entry, the target files are
    ``<synth_root>/<term>/<term>_001.mp4 ... <term>_<num_samples>.mp4``.
    Every target that does not exist yet is produced by augmenting the base
    video ``<original_dir>/<term>_001.mp4`` with `transform_clip`.

    Args:
        original_dir (str): Directory holding one base video per term.
        synth_root (str): Root directory for the generated clips.
        videos_dict_aug (dict): Maps term -> number of clips wanted.

    Returns:
        None. Progress is reported with print().
    """
    ensure_directory_exists(synth_root)

    for term, num_samples in videos_dict_aug.items():

        term_dir = os.path.join(synth_root, term)
        ensure_directory_exists(term_dir)

        # Files already present that follow the naming scheme (for the log line).
        existing = sorted(
            f for f in os.listdir(term_dir)
            if is_valid_filename(f, term)
        )

        # Decide per target index instead of assuming that the existing files
        # are numbered contiguously from 001: with a gap, "continue after
        # len(existing)" would overwrite a present clip and never fill the hole.
        missing = [
            i for i in range(1, num_samples + 1)
            if not os.path.exists(os.path.join(term_dir, f"{term}_{i:03}.mp4"))
        ]
        needed = len(missing)
        print(f"\n[TERM] {term}: existing={len(existing)}, need={needed}")

        if needed <= 0:
            print("[SKIP] Already have required number of videos.")
            continue

        # Base video is named <term>_001.mp4 inside original_dir.
        base_video_path = os.path.join(original_dir, f"{term}_001.mp4")

        if not os.path.exists(base_video_path):
            print(f"[ERROR] Base video not found for: {base_video_path}")
            continue

        for i in missing:
            out_path = os.path.join(term_dir, f"{term}_{i:03}.mp4")

            print(f"[GEN] Creating: {out_path}")
            # Re-open the source for every output so each clip gets its own
            # reader, closed by the context manager.
            with VideoFileClip(base_video_path) as clip:
                aug_clip = transform_clip(clip)
                aug_clip.write_videofile(
                    out_path,
                    codec="libx264",
                    audio=False,
                    verbose=False,
                    logger=None
                )

    print("\n[DONE] All synthetic sample videos generated.")


# --------------------------------------------------------------------
# 4. Execute
# --------------------------------------------------------------------
if __name__ == "__main__":
    manage_videos(ORIGINAL_DIR, SYNTH_DIR, videos_dict_aug)


[TERM] ambulancia: existing=20, need=0
[SKIP] Already have required number of videos.

[TERM] doctor: existing=20, need=0
[SKIP] Already have required number of videos.

[TERM] dolor: existing=20, need=0
[SKIP] Already have required number of videos.

[TERM] hoy: existing=20, need=0
[SKIP] Already have required number of videos.

[TERM] yo: existing=20, need=0
[SKIP] Already have required number of videos.

[DONE] All synthetic sample videos generated.


# 3. MediaPipe Keypoints & Project Functions

For all videos, we used MediaPipe Holistic to extract 3D keypoints from the upper body and both hands. Each frame was preprocessed using a dedicated wrapper (mediapipe_detection) that converts the image from BGR to RGB, runs holistic inference, and returns both the annotated frame and the landmark structures. We then converted the pose and hand landmarks into a fixed-length feature vector with extract_keypoints_lsm, concatenating 25 pose landmarks (x, y, z, visibility) and 21×2 hand landmarks (x, y, z) and zero-padding missing detections. To reduce temporal noise, we applied a central cropping procedure (central_clean_data) that removes low-information frames and selects a centered window of 30 frames per clip. The resulting frame-level features were first stored in CSV format and later converted into NumPy sequences with extract_keypoints_lsm_from_csv and save_keypoints_to_npy, which organize the data in a hierarchical directory of .npy files grouped by sign and video sample. This preprocessing pipeline, fully released in our GitHub repository, standardizes all inputs for training and evaluating the LSTM/GRU models reported in this work.

In [7]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

def mediapipe_detection(image, model):
    """
    Run `model.process` on one frame.

    The frame is converted BGR -> RGB for inference and back to BGR afterwards.
    The RGB buffer is flagged read-only while the model processes it.

    Returns:
        tuple: (BGR image after the round-trip conversion, result of model.process).
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    rgb_frame.flags.writeable = False   # lock the buffer during inference
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True    # unlock it again
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

def draw_landmarks(image, results):
    """
    Draw face, pose and both hand landmark sets on `image` (in place) using
    the default MediaPipe drawing style.

    Args:
        image: Frame to draw on; modified in place.
        results: Holistic result exposing face_landmarks, pose_landmarks,
            left_hand_landmarks and right_hand_landmarks.
    """
    # Older MediaPipe releases expose the face edges as FACE_CONNECTIONS; newer
    # ones renamed the constant to FACEMESH_CONTOURS. Accept either so the
    # helper does not raise AttributeError after an upgrade.
    face_connections = getattr(mp_holistic, "FACE_CONNECTIONS", None)
    if face_connections is None:
        face_connections = getattr(mp_holistic, "FACEMESH_CONTOURS", None)

    mp_drawing.draw_landmarks(image, results.face_landmarks, face_connections) # Draw face connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS) # Draw pose connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw right hand connections
    
def draw_landmarks_lsm(image, results):
    """
    Draw pose and both hand landmark sets on `image` (in place) using the
    default MediaPipe drawing style. Face landmarks are intentionally not drawn.

    Args:
        image: Frame to draw on; modified in place.
        results: Holistic result exposing pose_landmarks, left_hand_landmarks
            and right_hand_landmarks.
    """
    # The DrawingSpec objects the previous version built on every call were
    # never passed to draw_landmarks, so they have been dropped.
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS) # Draw pose connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw right hand connections
    
def draw_styled_landmarks(image, results):
    """
    Draw face, pose and both hand landmark sets on `image` (in place) with
    explicit DrawingSpec styles (all use color=(255, 255, 0)).

    Args:
        image: Frame to draw on; modified in place.
        results: Holistic result exposing face_landmarks, pose_landmarks,
            left_hand_landmarks and right_hand_landmarks.
    """
    # Older MediaPipe releases expose the face edges as FACE_CONNECTIONS; newer
    # ones renamed the constant to FACEMESH_CONTOURS. Accept either so the
    # helper does not raise AttributeError after an upgrade.
    face_connections = getattr(mp_holistic, "FACE_CONNECTIONS", None)
    if face_connections is None:
        face_connections = getattr(mp_holistic, "FACEMESH_CONTOURS", None)

    # Draw face connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, face_connections,
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=1, circle_radius=1),
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=1, circle_radius=1)
                             )
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=2, circle_radius=2)
                             )
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=2, circle_radius=2),
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=2, circle_radius=2)
                             )
    # Draw right hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=2, circle_radius=2),
                             mp_drawing.DrawingSpec(color=(255,255,0), thickness=2, circle_radius=2)
                             )

def draw_styled_landmarks_lsm(image, results):
    """
    Draw pose and both hand landmark sets on `image` (in place) with explicit
    DrawingSpec styles. Face landmarks are not drawn.

    Every spec uses color=(255, 255, 0) and thickness=2; the landmark points
    use circle_radius=4 for the pose and 2 for the hands, the connections use
    circle_radius=2.
    """
    def _spec(radius):
        # Fresh spec per use, matching one DrawingSpec construction per argument.
        return mp_drawing.DrawingSpec(color=(255,255,0), thickness=2, circle_radius=radius)

    # (landmark list, connection set, point radius) in drawing order:
    # pose first, then left hand, then right hand.
    layers = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, 4),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 2),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 2),
    )
    for landmark_list, connections, point_radius in layers:
        mp_drawing.draw_landmarks(
            image,
            landmark_list,
            connections,
            _spec(point_radius),
            _spec(2),
        )

def get_column_headers():
    """
    Build the CSV column names for one frame-level row.

    Order of the returned list:
      1. metadata: VIDEO_SAMPLE, CLASSIFICATION, FRAME, TIMESTAMP
      2. RIGHT_<landmark>_{X,Y,Z} for every mp_holistic.HandLandmark
      3. LEFT_<landmark>_{X,Y,Z} for every mp_holistic.HandLandmark
      4. <landmark>_{X,Y,Z,V} for every mp_holistic.PoseLandmark that passes
         the index filter below

    Returns:
        list[str]: Column names, without surrounding whitespace.
    """
    # 'VIDEO_SAMPLE' used to carry a trailing space, which forced consumers to
    # strip the header before they could select the column by name.
    column_headers = ['VIDEO_SAMPLE', 'CLASSIFICATION', 'FRAME', 'TIMESTAMP']

    for point_rh in mp_holistic.HandLandmark:
        column_headers += [f'RIGHT_{point_rh.name}_X', f'RIGHT_{point_rh.name}_Y', f'RIGHT_{point_rh.name}_Z']

    for point_lh in mp_holistic.HandLandmark:
        column_headers += [f'LEFT_{point_lh.name}_X', f'LEFT_{point_lh.name}_Y', f'LEFT_{point_lh.name}_Z']

    for point in mp_holistic.PoseLandmark:
        # Same filter as the keypoint extraction: pose indices 25..32 are skipped.
        if point.value < 25 or point.value > 32:
            column_headers += [f'{point.name}_X', f'{point.name}_Y', f'{point.name}_Z', f'{point.name}_V']

    return column_headers

def get_max_video_sample(file_path):
    """
    Return the largest VIDEO_SAMPLE value recorded in a CSV file.

    The first line is treated as the header and skipped; the sample id is read
    from the first column of every following row.

    Args:
        file_path (str): Path of the CSV file.

    Returns:
        int: The maximum sample id found, or 0 when the file does not exist,
        is empty, or holds no row with an integer first column.
    """
    if not os.path.exists(file_path):
        return 0

    max_sample = 0
    with open(file_path, 'r', newline='') as file:
        reader = csv.reader(file)
        # Skip the header; the default keeps an empty file from raising StopIteration.
        next(reader, None)
        for row in reader:
            if not row:
                continue
            try:
                current_sample = int(row[0])
            except ValueError:
                # Non-numeric first column (e.g. a repeated header): ignore the row.
                continue
            if current_sample > max_sample:
                max_sample = current_sample
    return max_sample

def extract_keypoints_lsm(results):
    """
    Flatten one Holistic result into a single 1-D feature vector.

    Layout of the returned array: pose block, then left hand, then right hand.
      - pose: [x, y, z, visibility] for each pose landmark whose index is < 25
        (or > 32); 25 * 4 zeros when no pose was detected
      - each hand: [x, y, z] per landmark; 21 * 3 zeros when the hand is missing
    """
    def _hand_block(hand_landmarks):
        # A missing hand is zero-padded so the vector length stays fixed.
        if not hand_landmarks:
            return np.zeros(21 * 3)
        return np.array([[pt.x, pt.y, pt.z] for pt in hand_landmarks.landmark]).flatten()

    right_hand = _hand_block(results.right_hand_landmarks)
    left_hand = _hand_block(results.left_hand_landmarks)

    if results.pose_landmarks:
        kept_points = [
            [pt.x, pt.y, pt.z, pt.visibility]
            for position, pt in enumerate(results.pose_landmarks.landmark)
            if position < 25 or position > 32   # skip pose indices 25..32
        ]
        pose_block = np.array(kept_points).flatten()
    else:
        pose_block = np.zeros(25 * 4)

    return np.concatenate([pose_block, left_hand, right_hand])

def ensure_directory_exists(directory):
    """
    Make sure `directory` exists, creating it (and missing parents) if needed.

    Args:
        directory (str): Full path of the directory.

    Returns:
        str: The `directory` path that was passed in.
    """
    if os.path.exists(directory):
        return directory
    os.makedirs(directory)
    return directory

# Data standardization to a fixed number of frames.
# Rows are trimmed from both ends of the clip, where mostly-zero frames predominate,
# so that a centered window of the requested length remains.

def central_clean_data(data, frames):
    """
    Keep the informative rows of `data` and crop them to a centered window.

    A row is kept when its count of non-zero values (VIDEO_SAMPLE column
    excluded) is greater than half the total number of columns of `data`.

    Args:
        data (pd.DataFrame): Frame-level rows; must contain a VIDEO_SAMPLE column.
        frames (int): Desired window length.

    Returns:
        pd.DataFrame: All kept rows when there are fewer than `frames` of them,
        otherwise the `frames` consecutive kept rows centered in the sequence.
    """
    feature_columns = data.drop("VIDEO_SAMPLE", axis=1)
    nonzero_per_row = (feature_columns != 0).sum(axis=1)
    informative = data[nonzero_per_row > data.shape[1] * 0.5]

    surplus = len(informative) - frames
    if surplus < 0:
        # Not enough rows to crop: return everything that survived the filter.
        return informative

    # Drop half of the surplus at the start; the remainder falls off the end.
    first = surplus // 2
    return informative.iloc[first:first + frames]
    
# Data Preparation Directories & Numpy
def extract_keypoints_lsm_from_csv(df, classification, video_sample_num):
    """
    Return the keypoint matrix of one (classification, video sample) pair.

    Columns are taken by fixed position from `df`:
      - 4..66    right hand (63 values)
      - 67..129  left hand  (63 values)
      - 130..229 pose       (100 values)
    and re-ordered in the result as pose, left hand, right hand.

    Returns:
        np.ndarray: One row per matching frame with 226 columns, or a single
        all-zero row of shape (1, 226) when no row matches.
    """
    selector = (df['CLASSIFICATION'] == classification) & (df['VIDEO_SAMPLE'] == video_sample_num)
    clip_rows = df[selector]

    if clip_rows.empty:
        # No data for this pair: zero placeholders with the same widths.
        pose_data = np.zeros((1, 100))
        lh_data = np.zeros((1, 63))
        rh_data = np.zeros((1, 63))
        return np.concatenate([pose_data, lh_data, rh_data], axis=1)

    def _columns(start, stop):
        # Positional slice of the feature columns as a NumPy array.
        return clip_rows.iloc[:, start:stop].values

    return np.concatenate(
        [_columns(130, 230), _columns(67, 130), _columns(4, 67)],
        axis=1,
    )


def save_keypoints_to_npy(df, output_base_path):
    """
    Export every frame of `df` as an individual .npy file.

    Directory layout produced:
        <output_base_path>/<classification>/<video_sample>/<frame index>.npy
    where the frame index is the row position (starting at 0) inside the
    matrix returned by extract_keypoints_lsm_from_csv for that pair.

    Args:
        df (pd.DataFrame): Must contain CLASSIFICATION and VIDEO_SAMPLE columns.
        output_base_path (str): Root directory of the export.
    """
    for label in df['CLASSIFICATION'].unique():
        label_dir = os.path.join(output_base_path, str(label))
        if not os.path.exists(label_dir):
            os.makedirs(label_dir)

        # Sample ids that occur for this classification.
        sample_ids = df.loc[df['CLASSIFICATION'] == label, 'VIDEO_SAMPLE'].unique()

        for sample_id in sample_ids:
            frame_matrix = extract_keypoints_lsm_from_csv(df, label, sample_id)

            target_dir = os.path.join(label_dir, str(sample_id))
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

            # One file per frame, named after its position in the matrix.
            for frame_idx in range(len(frame_matrix)):
                np.save(os.path.join(target_dir, f"{frame_idx}.npy"), frame_matrix[frame_idx])

# 4. Data Extraction (Build CSV Data Base for Mexican Sign Language)
This step takes the synthetic sample videos generated in data/synthetic_sample_videos/<term>/<term>_XXX.mp4.

For each term and each video, it:

Iterates frame by frame.

Runs MediaPipe Holistic and extracts:

3D landmarks of the right hand (21×3),

the left hand (21×3),

and a subset of the pose landmarks (25×4).

Builds one row per frame with:

VIDEO_SAMPLE, CLASSIFICATION, FRAME, TIMESTAMP, and all the features.

Saves everything to one CSV per term in
data/synthetic_sample_data/<term>/<term>_lsm.csv, with a header compatible with the rest of the pipeline, which later converts these CSVs into .npy sequences.

In [9]:
import os
import csv
import cv2
import numpy as np

# ---------------------------------------------------------
# Assumptions: the following are already defined in the notebook:
#   - mp_holistic
#   - mediapipe_detection(image, model)
#   - draw_styled_landmarks_lsm(image, results)
#   - ensure_directory_exists(path)
#   - get_column_headers()
#   - videos_dict_aug = {"ambulancia": 20, "doctor": 20, ...}  # for the samples
# ---------------------------------------------------------

# Repository root, resolved relative to the kernel's working directory (expected
# to be `notebooks/`, exactly like the configuration cell does with
# Path.cwd().parent) instead of a machine-specific absolute path.
BASE_DIR = os.path.dirname(os.getcwd())

SYNTH_VIDEOS = os.path.join(BASE_DIR, "data", "synthetic_sample_videos")
SYNTH_DATA   = os.path.join(BASE_DIR, "data", "synthetic_sample_data")

# Root directory where the per-term CSV files are stored.
# The path is assigned directly rather than taken from the helper's return
# value, because the notebook contains a definition of ensure_directory_exists
# that returns None.
ensure_directory_exists(SYNTH_DATA)
std_dir = SYNTH_DATA

# Show a visual QA window (set to False to run quickly without a GUI)
SHOW_VIDEO = True


# ---------------------------------------------------------
# 1) Utility: obtener máximo VIDEO_SAMPLE ya presente en un CSV
# ---------------------------------------------------------
def get_max_video_sample(file_path):
    """
    Return the highest VIDEO_SAMPLE value already stored in a CSV file.

    The first column is assumed to be VIDEO_SAMPLE and the first line a header.
    Returns 0 when the file does not exist or is empty; blank rows and rows
    whose first column is not an integer are ignored.
    """
    if not os.path.exists(file_path):
        return 0

    highest = 0
    with open(file_path, "r") as handle:
        records = csv.reader(handle)

        # Consume the header; an exhausted reader means the file is empty.
        try:
            next(records)
        except StopIteration:
            return 0

        for record in records:
            if not record:
                continue
            try:
                sample_id = int(record[0])
            except ValueError:
                continue
            highest = max(highest, sample_id)

    return highest


# ---------------------------------------------------------
# 2) Asegurar header correcto usando get_column_headers()
# ---------------------------------------------------------
def ensure_csv_header(output_csv_path):
    """
    Write the full semantic header row to `output_csv_path` when the file is
    missing or empty; leave a file that already has content untouched.

    The header comes from get_column_headers(), with surrounding whitespace
    stripped from every name.
    """
    already_populated = (
        os.path.exists(output_csv_path) and os.path.getsize(output_csv_path) > 0
    )
    if not already_populated:
        column_names = [name.strip() for name in get_column_headers()]

        with open(output_csv_path, mode="w", newline="") as handle:
            header_writer = csv.writer(
                handle, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
            )
            header_writer.writerow(column_names)

        print(f"[INFO] CSV header created at: {output_csv_path}")


# ---------------------------------------------------------
# 3) Main loop: synthetic_sample_videos → synthetic_sample_data
#
# For every class, the loop resumes after the highest VIDEO_SAMPLE already in
# that class's CSV, runs MediaPipe Holistic on each remaining video frame by
# frame, and appends one CSV row per frame:
#   VIDEO_SAMPLE, CLASSIFICATION, FRAME, TIMESTAMP, right hand, left hand, pose
# ---------------------------------------------------------
for classification, num_videos in videos_dict_aug.items():
    # Per-class sub-directory holding that class's CSV (ambulancia, doctor, ...).
    # The path is built explicitly so it does not depend on the helper's return value.
    class_dir = os.path.join(std_dir, classification)
    ensure_directory_exists(class_dir)
    output_csv_path = os.path.join(class_dir, f"{classification}_lsm.csv")

    # Create the header if the CSV is new / empty
    ensure_csv_header(output_csv_path)

    # Find the highest VIDEO_SAMPLE already processed
    existing_max_sample = get_max_video_sample(output_csv_path)
    target_samples = num_videos  # e.g. 20 for the samples

    if existing_max_sample >= target_samples:
        print(
            f"[SKIP] CSV {output_csv_path} already has {existing_max_sample} samples "
            f"(target = {target_samples}). Skipping '{classification}'..."
        )
        continue

    start_video_num = existing_max_sample + 1

    print(
        f"[INFO] Processing class='{classification}' from VIDEO_SAMPLE={start_video_num} "
        f"to {target_samples} (synthetic_sample_videos)."
    )

    for video_num in range(start_video_num, target_samples + 1):
        # Path of the synthetic video: data/synthetic_sample_videos/<term>/<term>_XXX.mp4
        video_path = os.path.join(
            SYNTH_VIDEOS,
            classification,
            f"{classification}_{video_num:03}.mp4"
        )

        if not os.path.exists(video_path):
            print(f"  [WARN] Video not found: {video_path}. Skipping...")
            continue

        # -----------------------------
        # Open video capture
        # -----------------------------
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"  [WARN] Could not open: {video_path}. Skipping...")
            cap.release()
            continue

        frame_no = 0
        data_exp = []

        # try/finally guarantees the capture (and any QA window) is released
        # even if detection raises in the middle of a video.
        try:
            # MediaPipe Holistic
            with mp_holistic.Holistic(
                min_detection_confidence=0.85,
                min_tracking_confidence=0.25,
                model_complexity=0
            ) as holistic:

                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break  # end of video

                    # MediaPipe detection
                    image, results = mediapipe_detection(frame, holistic)

                    # Visual QA (optional). The annotated image is only used for
                    # display, so the drawing is skipped when no window is shown.
                    if SHOW_VIDEO:
                        draw_styled_landmarks_lsm(image, results)
                        win_name = f"{classification}_{video_num:03d}"
                        cv2.imshow(win_name, image)
                        # Pressing 'q' leaves this video.
                        # NOTE(review): the frames collected so far are still
                        # appended below, so the sample is stored truncated and
                        # counted as processed — confirm this is intended.
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break

                    # Timestamp in ms
                    timestamp = round(cap.get(cv2.CAP_PROP_POS_MSEC), 4)

                    # Basic fields
                    video_sample = [video_num]
                    classificacion = [classification]
                    tiempo = [frame_no, timestamp]

                    # Right hand (21 keypoints × 3 coords), zeros when not detected
                    rh = list(
                        np.array(
                            [[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]
                        ).flatten()
                    ) if results.right_hand_landmarks else list(np.zeros(21 * 3))

                    # Left hand, zeros when not detected
                    lh = list(
                        np.array(
                            [[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]
                        ).flatten()
                    ) if results.left_hand_landmarks else list(np.zeros(21 * 3))

                    # Pose (without indices 25–32), zeros when not detected
                    pose = list(
                        np.array(
                            [
                                [res.x, res.y, res.z, res.visibility]
                                for idx, res in enumerate(results.pose_landmarks.landmark)
                                if idx < 25 or idx > 32
                            ]
                        ).flatten()
                    ) if results.pose_landmarks else list(np.zeros(25 * 4))

                    # Column order matches the CSV header: metadata, right hand, left hand, pose.
                    row = video_sample + classificacion + tiempo + rh + lh + pose
                    data_exp.append(row)
                    frame_no += 1
        finally:
            cap.release()
            if SHOW_VIDEO:
                cv2.destroyAllWindows()

        # -----------------------------
        # Append all rows of this video to the CSV
        # -----------------------------
        try:
            with open(output_csv_path, mode="a", newline="") as f:
                csv_writer = csv.writer(
                    f,
                    delimiter=",",
                    quotechar='"',
                    quoting=csv.QUOTE_MINIMAL
                )
                csv_writer.writerows(data_exp)
            print(f"  [OK] Appended {len(data_exp)} frames to {output_csv_path}")
        except Exception as e:
            # Best effort: report the failure and move on to the next video.
            print(f"  [ERROR] Writing to {output_csv_path}: {e}")


[INFO] CSV header created at: /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
[INFO] Processing class='ambulancia' from VIDEO_SAMPLE=1 to 20 (synthetic_sample_videos).


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


  [OK] Appended 69 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 63 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 63 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 62 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 60 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 78 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 66 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 66 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
  [OK] Appended 65 frames to /Us

  [OK] Appended 64 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 72 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 57 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 62 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 63 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 58 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 59 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 58 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 60 frames to /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
  [OK] Appended 63 frames to /Users/a

In [10]:
import cv2

# Safe close: release the capture handle if one is still around.
# `cap` may be undefined (e.g. on a fresh kernel) and release() may fail on a
# stale handle; both are harmless at this point, so they are ignored.
# `except Exception` is used instead of a bare `except:` so that
# KeyboardInterrupt / SystemExit still propagate.
try:
    cap.release()
except Exception:
    pass

cv2.destroyAllWindows()

# A tiny delay — required on macOS to force window close
for _attempt in range(5):
    cv2.waitKey(1)

# 4.1. Quality control and consistency checks for processed CSV files

Before training, we applied a two-step quality control procedure over the per-sign CSV files generated from the synthetic videos.

First, we normalized the schema of all CSV files by converting column headers to uppercase and trimming whitespace. This guarantees a consistent set of variable names (e.g., VIDEO_SAMPLE, CLASSIFICATION, FRAME, TIMESTAMP, and the pose/hand keypoint columns) across all signs and synthetic samples.

Second, we verified coverage and integrity for each sign category using the target number of synthetic videos defined in videos_dict_aug. For every sign, the script checks that the expected CSV file exists and that all VIDEO_SAMPLE identifiers from 1 to N (e.g., 1…200) are present. Missing CSVs or missing sample IDs are reported explicitly. Only after all CSVs pass this normalization and coverage check do we proceed to aggregate data and export the final NumPy tensors used for model training.

In [11]:
import os
import pandas as pd

# -------------------------------------------------------------------
# Paths (MSL-150 project)
# -------------------------------------------------------------------
# Project root. Defaults to the original checkout location; set the
# MSL150_BASE_DIR environment variable to run the notebook from another path.
BASE_DIR   = os.environ.get("MSL150_BASE_DIR", "/Users/armandobecerril/PhD/MSL-150")
# Root folder that holds one sub-folder per sign with its <sign>_lsm.csv file.
SYNTH_DATA = os.path.join(BASE_DIR, "data", "synthetic_sample_data")

# videos_dict_aug must already be defined in the notebook:
# videos_dict_aug = {"AMBULANCIA": SAMPLES , "DOCTOR": SAMPLES, ...}


def update_csv_headers_to_upper_and_trim(root_dir: str):
    """
    Rewrite every ``*.csv`` file found below `root_dir` with normalized headers.

    Each column name is upper-cased and stripped of surrounding whitespace, and
    the file is written back in place without the pandas index. A failure on
    one file is printed and does not stop the remaining files from being
    processed.
    """
    for current_dir, _subdirs, filenames in os.walk(root_dir):
        csv_paths = (
            os.path.join(current_dir, name)
            for name in filenames
            if name.endswith(".csv")
        )

        for filepath in csv_paths:
            try:
                frame = pd.read_csv(filepath)
                frame.columns = [header.upper().strip() for header in frame.columns]
                frame.to_csv(filepath, index=False)
                print(f"[OK] Headers normalized for: {filepath}")
            except Exception as e:
                # Best effort: report the problem and move on to the next file.
                print(f"[ERR] Could not update headers for {filepath}: {e}")


def check_csv_coverage(root_dir: str, videos_dict_aug: dict):
    """
    Verify that every classification in `videos_dict_aug` has a complete CSV.

    For each ``classification -> expected_samples`` entry the function checks:
      1) that ``<root_dir>/<classification>/<classification>_lsm.csv`` exists;
      2) that it contains a VIDEO_SAMPLE column (headers are compared after
         upper-casing and stripping; the file itself is not modified);
      3) that every VIDEO_SAMPLE ID from 1 to ``expected_samples`` is present.

    One message is printed per classification, followed by a summary listing
    every classification that failed any check. Classifications with missing
    sample IDs are part of that summary, so "All CSV files are present and
    consistent." is printed only when nothing was flagged.

    Parameters
    ----------
    root_dir : str
        Directory holding one sub-folder per classification.
    videos_dict_aug : dict
        Maps each classification name to its expected number of samples.

    Returns
    -------
    None
        Results are reported through printed messages only.
    """
    missing_classes = []

    for classification, expected_samples in videos_dict_aug.items():
        csv_path = os.path.join(root_dir, classification, f"{classification}_lsm.csv")

        if not os.path.exists(csv_path):
            print(f"[MISSING CSV] {classification}: {csv_path} not found.")
            missing_classes.append(classification)
            continue

        try:
            df = pd.read_csv(csv_path)
            # Compare against normalized header names (in memory only)
            df.columns = [c.upper().strip() for c in df.columns]

            if "VIDEO_SAMPLE" not in df.columns:
                print(f"[ERR] 'VIDEO_SAMPLE' column not found in {csv_path}")
                missing_classes.append(classification)
                continue

            # A set gives constant-time membership tests for the range scan below
            samples_present = set(df["VIDEO_SAMPLE"].unique().tolist())
            missing_samples = [
                sample_id
                for sample_id in range(1, expected_samples + 1)
                if sample_id not in samples_present
            ]

            if missing_samples:
                print(
                    f"[WARN] {classification}: missing VIDEO_SAMPLE IDs: "
                    f"{missing_samples}"
                )
                # Incomplete coverage is an inconsistency: it must show up in
                # the summary instead of being followed by an "all OK" line.
                missing_classes.append(classification)
            else:
                print(
                    f"[OK] {classification}: CSV {csv_path} has samples 1..{expected_samples}."
                )

        except Exception as e:
            print(f"[ERR] Could not read {csv_path}: {e}")
            missing_classes.append(classification)

    # Summary
    if missing_classes:
        print("\nSummary – Missing or inconsistent classifications:")
        print(", ".join(sorted(missing_classes)))
    else:
        print("\nAll CSV files are present and consistent.")


# -------------------------------------------------------------------
# Run the 4.1 checks on the synthetic samples:
# normalize the CSV headers on disk first, then verify sample coverage.
# `videos_dict_aug` must already be defined earlier in the notebook.
# -------------------------------------------------------------------
update_csv_headers_to_upper_and_trim(root_dir=SYNTH_DATA)
check_csv_coverage(root_dir=SYNTH_DATA, videos_dict_aug=videos_dict_aug)


[OK] Headers normalized for: /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv
[OK] Headers normalized for: /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/hoy/hoy_lsm.csv
[OK] Headers normalized for: /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/doctor/doctor_lsm.csv
[OK] Headers normalized for: /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/dolor/dolor_lsm.csv
[OK] Headers normalized for: /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/yo/yo_lsm.csv
[OK] ambulancia: CSV /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/ambulancia/ambulancia_lsm.csv has samples 1..20.
[OK] doctor: CSV /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/doctor/doctor_lsm.csv has samples 1..20.
[OK] dolor: CSV /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data/dolor/dolor_lsm.csv has samples 1..20.
[OK] hoy: CSV /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_dat

# 5. Data Consolidation

Temporal standardization and consolidated CSV generation

For each sign category, the synthetic videos were first converted into frame-level CSV files containing the full sequence of pose and hand keypoints produced by MediaPipe Holistic. To ensure temporal consistency across all samples, we applied a central trimming strategy over the time dimension.

Specifically, for every (CLASSIFICATION, VIDEO_SAMPLE) pair we removed all frames dominated by zeros, wherever they occurred in the sequence (i.e., frames in which no more than 50% of the column values were non-zero), and then extracted a fixed window of 30 central frames from the remaining frames. Samples with fewer than 30 valid frames after this cleaning step were flagged as incomplete and reported, but the vast majority of sequences met the minimum length requirement.

The standardized sequences (30 frames × all keypoints) were then concatenated across all sign categories to build a single consolidated CSV file, MSL-150_Mexican_Sign_Language_Dataset.csv, which includes the fields VIDEO_SAMPLE, CLASSIFICATION, FRAME, TIMESTAMP and the full set of pose and hand coordinates. Additional validation scripts confirmed that each sign reached the expected number of synthetic samples and that each sample contained at least 30 frames, before exporting the NumPy tensors used for training.

In [12]:
import os
import pandas as pd

# ------------------------------------------------------------
# Paths for the “sample” dataset inside the MSL-150 repo
# ------------------------------------------------------------
# Project root. Defaults to the original checkout location; set the
# MSL150_BASE_DIR environment variable to run the notebook from another path.
BASE_DIR        = os.environ.get("MSL150_BASE_DIR", "/Users/armandobecerril/PhD/MSL-150")
# Input: one <sign>/<sign>_lsm.csv per sign
SYNTH_DATA_DIR  = os.path.join(BASE_DIR, "data", "synthetic_sample_data")
# Output: cleaned per-sign CSVs plus the consolidated CSV
TOTAL_DATA_DIR  = os.path.join(BASE_DIR, "data", "synthetic_sample_data_total")
os.makedirs(TOTAL_DATA_DIR, exist_ok=True)

# Final consolidated CSV (small sample version for the repo)
FINAL_CSV_PATH = os.path.join(
    TOTAL_DATA_DIR,
    "MSL-150_Mexican_Sign_Language_Dataset.csv"
)

def central_clean_data(df, frames=30, min_nonzero_ratio=0.5):
    """
    Temporal standardization of the frame rows of one sample.

    Rows that are mostly zeros are discarded wherever they occur in the
    sequence, then a centered window of exactly `frames` rows is taken from
    the rows that remain.

    A row is kept when its number of non-zero values, counted over every
    column except VIDEO_SAMPLE, is strictly greater than
    ``min_nonzero_ratio * df.shape[1]``. The count therefore includes any
    non-keypoint columns present in `df`, and the threshold is relative to
    the total number of columns (VIDEO_SAMPLE included).

    Parameters
    ----------
    df : pd.DataFrame
        Rows of a single sample; must contain a VIDEO_SAMPLE column unless empty.
    frames : int, default 30
        Number of rows in the centered window.
    min_nonzero_ratio : float, default 0.5
        Fraction of the column count that the per-row non-zero count must
        exceed for the row to be kept.

    Returns
    -------
    cleaned_df : pd.DataFrame
        Either the centered window or the non-zero subset if shorter than `frames`.
        An empty input is returned unchanged.
    is_complete : bool
        True if at least `frames` valid rows were available.

    Raises
    ------
    KeyError
        If `df` is not empty and has no VIDEO_SAMPLE column.
    """
    if df.empty:
        return df, False

    # Per-row count of non-zero values over all columns except VIDEO_SAMPLE
    nonzero_per_row = (df.drop("VIDEO_SAMPLE", axis=1) != 0).sum(axis=1)
    non_zero_rows = df[nonzero_per_row > df.shape[1] * min_nonzero_ratio]

    num_rows = len(non_zero_rows)
    if num_rows < frames:
        # Not enough valid frames; return what we have and mark as incomplete
        return non_zero_rows, False

    # Centered window of exactly `frames` rows
    start_idx = (num_rows - frames) // 2
    end_idx   = start_idx + frames
    return non_zero_rows.iloc[start_idx:end_idx], True


# ------------------------------------------------------------
# Consolidation loop
# ------------------------------------------------------------
# Cleaned frames of every kept sample, across all signs (feeds the global CSV)
all_dfs = []
# sign -> number of samples that were empty or shorter than 30 frames
incomplete_files = {}

for action, count in videos_dict_aug.items():
    # Input CSV per sign (previous step already created these)
    input_path = os.path.join(SYNTH_DATA_DIR, action, f"{action}_lsm.csv")

    if not os.path.exists(input_path):
        print(f"[MISSING] CSV not found for {action}: {input_path}")
        continue

    df = pd.read_csv(input_path)
    # Normalize headers just in case
    df.columns = [col.upper().strip() for col in df.columns]

    if "VIDEO_SAMPLE" not in df.columns:
        print(f"[ERR] 'VIDEO_SAMPLE' column not found in {input_path}. Skipping.")
        continue

    # Output per-sign CSV in the consolidated folder (optional)
    output_file = os.path.join(TOTAL_DATA_DIR, f"{action}_lsm.csv")

    action_dfs = []
    incomplete_count = 0

    for i in range(1, count + 1):
        sample_data = df[df["VIDEO_SAMPLE"] == i]

        cleaned_data, is_complete = central_clean_data(sample_data, frames=30)

        if cleaned_data.empty:
            # Nothing usable for this sample: count it and leave it out
            incomplete_count += 1
            continue

        # Short samples are still kept; they are only counted as incomplete
        action_dfs.append(cleaned_data)

        if not is_complete:
            incomplete_count += 1

    # Write the per-sign file once, overwriting any previous run. Deciding
    # between "w" and "a" per sample left a stale file in place whenever
    # sample 1 was skipped, so a re-run appended duplicate rows to it.
    if action_dfs:
        pd.concat(action_dfs).to_csv(output_file, index=False)
        all_dfs.extend(action_dfs)

    if incomplete_count > 0:
        incomplete_files[action] = incomplete_count

# ------------------------------------------------------------
# Global consolidated CSV
# ------------------------------------------------------------
if all_dfs:
    final_df = pd.concat(all_dfs, ignore_index=True)
    final_df.to_csv(FINAL_CSV_PATH, index=False)
    print(f"[OK] Consolidated CSV written to:\n  {FINAL_CSV_PATH}")
else:
    print("[WARN] No data collected for consolidation.")

# Summary of incomplete sequences
if incomplete_files:
    print("\nSummary of sequences with fewer than 30 frames (after cleaning):")
    for action_name, n_incomplete in incomplete_files.items():
        print(f"  {action_name}: {n_incomplete} incomplete samples")
else:
    print("\nAll samples reached the minimum number of frames after cleaning.")


[OK] Consolidated CSV written to:
  /Users/armandobecerril/PhD/MSL-150/data/synthetic_sample_data_total/MSL-150_Mexican_Sign_Language_Dataset.csv

Summary of sequences with fewer than 30 frames (after cleaning):
  ambulancia: 11 incomplete samples
  doctor: 9 incomplete samples
  dolor: 2 incomplete samples


In [14]:
import pandas as pd
import os

# Project root. Defaults to the original checkout location; set the
# MSL150_BASE_DIR environment variable to run the notebook from another path.
BASE_DIR        = os.environ.get("MSL150_BASE_DIR", "/Users/armandobecerril/PhD/MSL-150")
TOTAL_DATA_DIR  = os.path.join(BASE_DIR, "data", "synthetic_sample_data_total")
# Consolidated CSV inspected by the checks below
FINAL_CSV_PATH  = os.path.join(TOTAL_DATA_DIR, "MSL-150_Mexican_Sign_Language_Dataset.csv")

def analyze_csv_data(file_path, expected_samples, min_frames=30):
    """
    Global sanity-check of the consolidated CSV.

    Prints general information (column count, column names, row count) and,
    when both CLASSIFICATION and VIDEO_SAMPLE columns are present:
      - the number of distinct VIDEO_SAMPLE values per CLASSIFICATION, with a
        warning for every classification whose count differs from
        `expected_samples`;
      - the (CLASSIFICATION, VIDEO_SAMPLE) groups with fewer than `min_frames`
        rows.

    If either key column is absent, a message is printed and the per-class
    checks are skipped.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to inspect.
    expected_samples : int
        Expected number of distinct VIDEO_SAMPLE values per classification.
    min_frames : int, default 30
        Minimum number of rows each sample should have.

    Returns
    -------
    None
        Results are reported through printed messages only.
    """
    df = pd.read_csv(file_path)

    # Remove duplicated columns, if any
    if df.columns.duplicated().any():
        df = df.loc[:, ~df.columns.duplicated()]
        print("Detected and removed duplicated column headers.")

    # Check for the key columns BEFORE touching them; converting first would
    # raise KeyError and the "missing columns" message could never be printed.
    has_key_columns = "CLASSIFICATION" in df.columns and "VIDEO_SAMPLE" in df.columns

    if has_key_columns:
        # Normalize types for key columns
        df["CLASSIFICATION"] = df["CLASSIFICATION"].astype(str)
        df["VIDEO_SAMPLE"]   = df["VIDEO_SAMPLE"].astype(str)

    print(f"Number of columns: {df.shape[1]}")
    print(f"Column names: {list(df.columns)}")
    print(f"Total records: {df.shape[0]}")

    if not has_key_columns:
        print("Missing 'CLASSIFICATION' or 'VIDEO_SAMPLE' columns in the DataFrame.")
        return

    sample_count = df.groupby("CLASSIFICATION")["VIDEO_SAMPLE"].nunique()
    print("\nNumber of samples per classification:")
    print(sample_count)

    # Check expected_samples per class
    for classification, count in sample_count.items():
        if count != expected_samples:
            print(
                f"[WARN] {classification} has {count} samples; "
                f"{expected_samples} were expected."
            )

    # Check frame count per (classification, video_sample)
    frame_sizes = df.groupby(["CLASSIFICATION", "VIDEO_SAMPLE"]).size()
    frame_issues = frame_sizes[frame_sizes < min_frames]

    if not frame_issues.empty:
        print(f"\nSome samples have fewer than {min_frames} frames:")
        print(frame_issues)
    else:
        print(f"\nAll samples have at least {min_frames} frames.")


def deep_validate_classification(
    file_path,
    classification,
    expected_samples,
    min_frames=30
):
    """
    Inspect the rows of a single classification in the CSV at `file_path`.

    Prints the row and column counts of the subset, the number of distinct
    VIDEO_SAMPLE values (with a warning when it differs from
    `expected_samples`), and the samples that have fewer than `min_frames`
    rows. CLASSIFICATION and VIDEO_SAMPLE are compared as strings.

    Returns
    -------
    class_df : pd.DataFrame
        Rows whose CLASSIFICATION equals ``str(classification)``.
    """
    data = pd.read_csv(file_path)

    # Key columns are handled as strings throughout
    for key_col in ("CLASSIFICATION", "VIDEO_SAMPLE"):
        data[key_col] = data[key_col].astype(str)

    class_df = data[data["CLASSIFICATION"] == str(classification)]
    n_records, n_columns = class_df.shape

    print(f"Data for classification '{classification}':")
    print(f"Total records: {n_records}")
    print(f"Number of columns: {n_columns}")

    unique_samples = class_df["VIDEO_SAMPLE"].nunique()
    print(f"Unique samples: {unique_samples}")

    if unique_samples != expected_samples:
        print(
            f"[WARN] Expected {expected_samples} samples, "
            f"but found {unique_samples}."
        )

    frames_per_sample = class_df.groupby("VIDEO_SAMPLE").size()
    too_short = frames_per_sample[frames_per_sample < min_frames]

    if too_short.empty:
        print(f"\nAll samples have at least {min_frames} frames.")
    else:
        print(f"\nSome samples have fewer than {min_frames} frames:")
        print(too_short)

    return class_df


# ------------------------------------------------------------
# Sanity-check the consolidated "sample" dataset
# ------------------------------------------------------------
# `samples` (the number of synthetic videos per sign, e.g. 20) must already
# be defined earlier in the notebook; it is used as the expected count.
analyze_csv_data(file_path=FINAL_CSV_PATH, expected_samples=samples, min_frames=30)

# Detailed inspection of a single sign; the name must match the value stored
# in the CLASSIFICATION column (here the lowercase "ambulancia").
df_ambulancia = deep_validate_classification(
    file_path=FINAL_CSV_PATH,
    classification="ambulancia",
    expected_samples=samples,
    min_frames=30,
)


Number of columns: 230
Column names: ['VIDEO_SAMPLE', 'CLASSIFICATION', 'FRAME', 'TIMESTAMP', 'RIGHT_WRIST_X', 'RIGHT_WRIST_Y', 'RIGHT_WRIST_Z', 'RIGHT_THUMB_CMC_X', 'RIGHT_THUMB_CMC_Y', 'RIGHT_THUMB_CMC_Z', 'RIGHT_THUMB_MCP_X', 'RIGHT_THUMB_MCP_Y', 'RIGHT_THUMB_MCP_Z', 'RIGHT_THUMB_IP_X', 'RIGHT_THUMB_IP_Y', 'RIGHT_THUMB_IP_Z', 'RIGHT_THUMB_TIP_X', 'RIGHT_THUMB_TIP_Y', 'RIGHT_THUMB_TIP_Z', 'RIGHT_INDEX_FINGER_MCP_X', 'RIGHT_INDEX_FINGER_MCP_Y', 'RIGHT_INDEX_FINGER_MCP_Z', 'RIGHT_INDEX_FINGER_PIP_X', 'RIGHT_INDEX_FINGER_PIP_Y', 'RIGHT_INDEX_FINGER_PIP_Z', 'RIGHT_INDEX_FINGER_DIP_X', 'RIGHT_INDEX_FINGER_DIP_Y', 'RIGHT_INDEX_FINGER_DIP_Z', 'RIGHT_INDEX_FINGER_TIP_X', 'RIGHT_INDEX_FINGER_TIP_Y', 'RIGHT_INDEX_FINGER_TIP_Z', 'RIGHT_MIDDLE_FINGER_MCP_X', 'RIGHT_MIDDLE_FINGER_MCP_Y', 'RIGHT_MIDDLE_FINGER_MCP_Z', 'RIGHT_MIDDLE_FINGER_PIP_X', 'RIGHT_MIDDLE_FINGER_PIP_Y', 'RIGHT_MIDDLE_FINGER_PIP_Z', 'RIGHT_MIDDLE_FINGER_DIP_X', 'RIGHT_MIDDLE_FINGER_DIP_Y', 'RIGHT_MIDDLE_FINGER_DIP_Z', 'RIGHT_MI

# 6. Data Preparation Directories & Numpy

Extraer keypoints desde el CSV consolidado

In [18]:
import os
import pandas as pd
import numpy as np

# Project root. Defaults to the original checkout location; set the
# MSL150_BASE_DIR environment variable to run the notebook from another path.
BASE_DIR        = os.environ.get("MSL150_BASE_DIR", "/Users/armandobecerril/PhD/MSL-150")
TOTAL_DATA_DIR  = os.path.join(BASE_DIR, "data", "synthetic_sample_data_total")
FINAL_CSV_PATH  = os.path.join(TOTAL_DATA_DIR, "MSL-150_Mexican_Sign_Language_Dataset.csv")

# Root folder for the exported per-frame .npy files (created if missing)
SAMPLE_NPY_ROOT = os.path.join(BASE_DIR, "data", "sample_npy")
os.makedirs(SAMPLE_NPY_ROOT, exist_ok=True)

def extract_keypoints_lsm_from_csv(df, classification, video_sample_num):
    """
    Extract the right-hand, left-hand and pose keypoints of one sample from
    the consolidated DataFrame, whose expected layout is:

        VIDEO_SAMPLE, CLASSIFICATION, FRAME, TIMESTAMP,
        RIGHT_*, LEFT_*, [POSE_* ...]

    Keypoints are selected by column position, assuming:
      - RIGHT hand: columns 4–66    (63 columns = 21×3)
      - LEFT hand : columns 67–129  (63 columns = 21×3)
      - POSE      : columns 130–229 (100 columns = 25×4)

    The CLASSIFICATION and VIDEO_SAMPLE columns are located by their
    stripped, upper-cased names. `df` itself is not modified.

    Parameters
    ----------
    df : pd.DataFrame
        Consolidated data with the layout described above.
    classification : str
        Value to match in the CLASSIFICATION column.
    video_sample_num
        Value to match in the VIDEO_SAMPLE column.

    Returns
    -------
    np.ndarray
        One row per matching frame, columns ordered [POSE | LH | RH].
        When no row matches, a single all-zero row of 226 values is returned.

    Raises
    ------
    KeyError
        If `df` has no CLASSIFICATION or VIDEO_SAMPLE column.
    """
    # Resolve the key columns through a normalized-name lookup instead of
    # overwriting df.columns, which would silently rename the caller's frame.
    normalized = {str(col).strip().upper(): col for col in df.columns}
    class_col = normalized["CLASSIFICATION"]
    sample_col = normalized["VIDEO_SAMPLE"]

    # Filter by classification and sample
    filtered_data = df[
        (df[class_col] == classification) &
        (df[sample_col] == video_sample_num)
    ]

    if filtered_data.empty:
        pose_data = np.zeros((1, 100))  # 25×4
        lh_data   = np.zeros((1, 63))   # 21×3
        rh_data   = np.zeros((1, 63))   # 21×3
    else:
        rh_data   = filtered_data.iloc[:, 4:67].values    # 63 cols
        lh_data   = filtered_data.iloc[:, 67:130].values  # 63 cols
        pose_data = filtered_data.iloc[:, 130:230].values # 100 cols

    # Concatenate as [POSE | LH | RH], the feature order used by the pipeline
    return np.concatenate([pose_data, lh_data, rh_data], axis=1)



Guardar .npy asegurando exactamente 30 frames

In [19]:
def save_sample_to_npy(classification, video_sample, keypoints_data, output_base_path, target_frames=30):
    """
    Store the keypoints of one sample as one .npy file per frame under

        output_base_path/<CLASSIFICATION>/<VIDEO_SAMPLE>/<frame index>.npy

    forcing exactly `target_frames` frames per sample: longer sequences are
    cut to a centered window, shorter ones are padded with zero rows at the
    end. A sample without any frame is reported and nothing is written.
    """
    frame_count, feature_count = keypoints_data.shape

    if frame_count == 0:
        print(f"[WARN] Sample {classification}-{video_sample} no tiene frames válidos. Saltando.")
        return

    surplus = frame_count - target_frames
    if surplus > 0:
        # Too long: keep the centered window (safety net, input is normally trimmed)
        first = surplus // 2
        keypoints_data = keypoints_data[first:first + target_frames]
    elif surplus < 0:
        # Too short: append zero rows until target_frames is reached
        filler = np.zeros((-surplus, feature_count))
        keypoints_data = np.concatenate([keypoints_data, filler], axis=0)
    frame_count = target_frames

    # Target directory: <output_base_path>/<CLASS>/<VIDEO_SAMPLE>/
    sample_dir = os.path.join(output_base_path, str(classification), str(video_sample))
    os.makedirs(sample_dir, exist_ok=True)

    for frame_idx, frame_vector in enumerate(keypoints_data):
        np.save(os.path.join(sample_dir, f"{frame_idx}.npy"), frame_vector)

    print(f"[OK] {classification} - sample {video_sample}: guardados {frame_count} npy en {sample_dir}")



Orquestador: del CSV consolidado a npy

In [20]:
def process_consolidated_csv_to_npy(consolidated_csv_path, npy_root_dir, target_frames=30):
    """
    Read the consolidated CSV (sample version) and export every sample as
    per-frame files under npy_root_dir/<CLASSIFICATION>/<VIDEO_SAMPLE>/.

    Classifications and their samples are processed in sorted order. Keypoints
    are extracted with `extract_keypoints_lsm_from_csv` and written with
    `save_sample_to_npy`, which receives `target_frames`.

    Raises
    ------
    FileNotFoundError
        If `consolidated_csv_path` does not exist.
    ValueError
        If the CSV lacks a CLASSIFICATION or VIDEO_SAMPLE column.
    """
    if not os.path.exists(consolidated_csv_path):
        raise FileNotFoundError(f"Consolidated CSV not found: {consolidated_csv_path}")

    df = pd.read_csv(consolidated_csv_path)
    df.columns = [header.strip().upper() for header in df.columns]

    required_columns = ("CLASSIFICATION", "VIDEO_SAMPLE")
    if any(column not in df.columns for column in required_columns):
        raise ValueError("El CSV consolidado debe contener columnas 'CLASSIFICATION' y 'VIDEO_SAMPLE'.")

    class_names = sorted(df["CLASSIFICATION"].unique())
    print(f"[INFO] Clasificaciones encontradas en el CSV sample: {class_names}")

    for class_name in class_names:
        class_rows = df[df["CLASSIFICATION"] == class_name]
        sample_ids = sorted(class_rows["VIDEO_SAMPLE"].unique())
        print(f"\n[INFO] Procesando clasificación '{class_name}' con {len(sample_ids)} samples")

        for sample_id in sample_ids:
            # The extractor filters the full frame by class and sample itself
            sample_keypoints = extract_keypoints_lsm_from_csv(
                df,
                classification=class_name,
                video_sample_num=sample_id
            )
            save_sample_to_npy(
                classification=class_name,
                video_sample=sample_id,
                keypoints_data=sample_keypoints,
                output_base_path=npy_root_dir,
                target_frames=target_frames
            )

    print(f"\n[DONE] Todos los samples convertidos a npy en: {npy_root_dir}")


# Main call: export the consolidated sample CSV as per-frame .npy files
process_consolidated_csv_to_npy(
    consolidated_csv_path=FINAL_CSV_PATH,
    npy_root_dir=SAMPLE_NPY_ROOT,
    target_frames=30,
)


[INFO] Clasificaciones encontradas en el CSV sample: ['ambulancia', 'doctor', 'dolor', 'hoy', 'yo']

[INFO] Procesando clasificación 'ambulancia' con 11 samples
[OK] ambulancia - sample 1: guardados 30 npy en /Users/armandobecerril/PhD/MSL-150/data/sample_npy/ambulancia/1
[OK] ambulancia - sample 3: guardados 30 npy en /Users/armandobecerril/PhD/MSL-150/data/sample_npy/ambulancia/3
[OK] ambulancia - sample 4: guardados 30 npy en /Users/armandobecerril/PhD/MSL-150/data/sample_npy/ambulancia/4
[OK] ambulancia - sample 6: guardados 30 npy en /Users/armandobecerril/PhD/MSL-150/data/sample_npy/ambulancia/6
[OK] ambulancia - sample 7: guardados 30 npy en /Users/armandobecerril/PhD/MSL-150/data/sample_npy/ambulancia/7
[OK] ambulancia - sample 8: guardados 30 npy en /Users/armandobecerril/PhD/MSL-150/data/sample_npy/ambulancia/8
[OK] ambulancia - sample 12: guardados 30 npy en /Users/armandobecerril/PhD/MSL-150/data/sample_npy/ambulancia/12
[OK] ambulancia - sample 13: guardados 30 npy en /Use

To ensure compatibility with the recurrent architecture, all samples must share the same temporal and spatial dimensionality. The model was trained on fixed-length sequences of 30 frames × 226 features, and therefore cannot process variable-length inputs during inference. To guarantee consistency and avoid distributional drift, each sequence is temporally standardized using a centered 30-frame window. Sequences shorter than 30 frames are zero-padded, while longer sequences are symmetrically trimmed. This ensures stable recurrent dynamics and reproducible results