# In [None]:
# train_model.py
# This script loads the collected keypoint data, preprocesses it,
# builds an LSTM model, and trains it to recognize ASL signs.

import numpy as np
import os
import json
import time
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2

# --- 1. Configuration ---
# Keep every value below in sync with the data collection script and with the
# on-disk layout of the recorded keypoints.
DATA_PATH = 'data'            # input: one sub-folder per recorded action variant
MODEL_SAVE_PATH = 'models'    # output: label map, checkpoints, training plot
LOG_DIR = 'Logs'              # output: TensorBoard event files

# Base action labels: the numbers 10 through 20, stored as strings.
ACTIONS_BASE = list(map(str, range(10, 21)))

# Number of frames that make up one recorded sequence (set by the capture script).
SEQUENCE_LENGTH = 30

# Length of a single frame's keypoint vector (NO z coordinate):
#   pose  : 33 landmarks * (x, y, visibility) = 99
#   hands : 21 landmarks * (x, y)             = 42 for each of the two hands
KEYPOINT_VECTOR_LENGTH = 33 * 3 + 2 * (21 * 2)  # 99 + 42 + 42 = 183

# --- 2. Create Paths ---
# Both output folders are created up front; an already existing folder is fine.
os.makedirs(MODEL_SAVE_PATH, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# --- 3. Landmark Constants & Helper Functions (Corrected & Robust) ---
NUM_POSE_LANDMARKS = 33
NUM_HAND_LANDMARKS = 21

# Left/right pose landmark pairs that must trade places when a frame is mirrored.
# NOTE(review): the indices assume the MediaPipe Pose 33-landmark layout (the
# file stores 33 pose landmarks as x, y, visibility) -- confirm against the
# capture script. Every left/right pair is listed: leaving out the face and
# finger pairs would keep their left/right labels un-swapped after the flip.
POSE_LANDMARK_PAIRS = {
    1: 4, 2: 5, 3: 6,        # eyes: inner, centre, outer
    7: 8,                    # ears
    9: 10,                   # mouth corners
    11: 12, 13: 14, 15: 16,  # shoulders, elbows, wrists
    17: 18, 19: 20, 21: 22,  # pinky, index, thumb
    23: 24, 25: 26, 27: 28,  # hips, knees, ankles
    29: 30, 31: 32,          # heels, foot index
}
# Add the reverse direction so the mapping can be looked up from either side.
POSE_LANDMARK_PAIRS.update({v: k for k, v in POSE_LANDMARK_PAIRS.items()})


def mirror_keypoints_frame(keypoints_frame):
    """Return a horizontally mirrored copy of one 183-length keypoint vector.

    Used to canonicalize left-handed recordings so they look like right-handed
    ones. The vector layout is ``[pose (33 * 3), left hand (21 * 2),
    right hand (21 * 2)]``.

    Three things happen to the copy:
      1. Every x coordinate is reflected around 0.5 (``x -> 1 - x``).
      2. The left-hand and right-hand blocks trade places.
      3. Paired pose landmarks (see ``POSE_LANDMARK_PAIRS``) trade places.

    A block (pose, left hand or right hand) that is entirely zero is treated as
    "not detected" and is left at zero instead of being reflected to x = 1.0.
    This keeps a fully empty frame all-zero and keeps a missing hand missing
    after the swap.

    Args:
        keypoints_frame: 1-D array-like with 183 values.

    Returns:
        A new array with the mirrored values; the input is not modified.
    """
    mirrored_frame = np.copy(keypoints_frame)

    lh_start_idx = NUM_POSE_LANDMARKS * 3
    rh_start_idx = lh_start_idx + NUM_HAND_LANDMARKS * 2

    # Work on detached 2-D copies so nothing below depends on view semantics.
    pose = mirrored_frame[:lh_start_idx].reshape(NUM_POSE_LANDMARKS, 3).copy()
    left_hand = mirrored_frame[lh_start_idx:rh_start_idx].reshape(NUM_HAND_LANDMARKS, 2).copy()
    right_hand = mirrored_frame[rh_start_idx:].reshape(NUM_HAND_LANDMARKS, 2).copy()

    # --- 1. Mirror X-coordinates (around the center 0.5) ---
    # Column 0 holds x in every block. All-zero blocks are skipped so that the
    # "nothing detected" sentinel survives mirroring.
    for block in (pose, left_hand, right_hand):
        if np.any(block):
            block[:, 0] = 1.0 - block[:, 0]

    # --- 2. Swap Paired Pose Landmarks ---
    # Build a row permutation: row i of the result comes from row swap_order[i].
    swap_order = np.arange(NUM_POSE_LANDMARKS)
    for landmark_idx, partner_idx in POSE_LANDMARK_PAIRS.items():
        swap_order[landmark_idx] = partner_idx
    pose = pose[swap_order]

    # --- 3. Write back, swapping the Left and Right Hand Data Blocks ---
    mirrored_frame[:lh_start_idx] = pose.ravel()
    mirrored_frame[lh_start_idx:rh_start_idx] = right_hand.ravel()
    mirrored_frame[rh_start_idx:] = left_hand.ravel()
    return mirrored_frame

def normalize_sequence(sequence_data):
    """
    Normalize a sequence of 183-length keypoint vectors for position and scale.

    Each frame is laid out as ``[pose (33 * 3: x, y, visibility),
    left hand (21 * 2: x, y), right hand (21 * 2: x, y)]``.

    - Translation: x/y are made relative to the midpoint of the hips (pose
      landmarks 23 and 24). If only one hip has visibility > 0.5 that hip is
      used; if neither does, the nose (landmark 0) is used.
    - Scale: x/y are divided by the 2D distance between the shoulders (pose
      landmarks 11 and 12) when both have visibility > 0.5 and the distance is
      not degenerate; otherwise no scaling is applied.
    - Visibility values are passed through unchanged.
    - Frames that are entirely zero are passed through unchanged.

    Args:
        sequence_data: iterable of 1-D float arrays of length 183 (typically a
            2-D array of shape ``(frames, 183)``).

    Returns:
        ``np.ndarray`` built from the processed frames. The input is not
        modified.
    """
    normalized_sequence = []
    for frame_kps in sequence_data:
        if np.all(frame_kps == 0):
            normalized_sequence.append(frame_kps)
            continue

        # Private copy: the reshaped slices below are views, and the in-place
        # arithmetic must not write back into the caller's data.
        frame = np.array(frame_kps)

        # Reshape for easier calculations (183 vector -> pose, lh, rh)
        pose_kps = frame[:99].reshape(33, 3)
        lh_kps = frame[99:141].reshape(21, 2)
        rh_kps = frame[141:].reshape(21, 2)

        # --- 1. Translation Normalization (center on hip midpoint) ---
        hip_l, hip_r = pose_kps[23], pose_kps[24]
        # Check visibility flag (3rd value, index 2) to find a stable origin
        if hip_l[2] > 0.5 and hip_r[2] > 0.5:
            origin = (hip_l[:2] + hip_r[:2]) / 2.0
        elif hip_l[2] > 0.5:
            origin = hip_l[:2]
        elif hip_r[2] > 0.5:
            origin = hip_r[:2]
        else:  # Fallback to nose if hips are not visible
            origin = pose_kps[0][:2]

        # Detach the origin from pose_kps. In the single-hip and nose branches
        # it is a view of a pose row, so translating the pose in place would
        # reset it to (0, 0) and the hands would then not be translated at all.
        origin = np.array(origin)

        # Subtract the origin from all x,y coordinates
        # NOTE(review): an undetected (all-zero) hand is shifted like any other
        # point, so it ends up at -origin / scale; confirm this is intended.
        pose_kps[:, :2] -= origin
        lh_kps -= origin
        rh_kps -= origin

        # --- 2. Scale Normalization (relative to shoulder distance) ---
        shoulder_l, shoulder_r = pose_kps[11], pose_kps[12]
        scale = 1.0  # default: leave the frame unscaled
        if shoulder_l[2] > 0.5 and shoulder_r[2] > 0.5:
            # Use L2 norm for 2D distance
            shoulder_distance = np.linalg.norm(shoulder_l[:2] - shoulder_r[:2])
            if shoulder_distance >= 1e-6:  # Avoid division by (near) zero
                scale = shoulder_distance

        pose_kps[:, :2] /= scale
        lh_kps /= scale
        rh_kps /= scale

        # Flatten back into a single 183-length vector
        processed_frame = np.concatenate([pose_kps.flatten(), lh_kps.flatten(), rh_kps.flatten()])
        normalized_sequence.append(processed_frame)

    return np.array(normalized_sequence)

def _load_dataset(label_map):
    """Load every complete recorded sequence found under DATA_PATH.

    Folder layout expected by this loader: ``DATA_PATH/<variant>/<sequence>/<frame>.npy``
    where ``<variant>`` starts with a base action name from ACTIONS_BASE
    (text before the first underscore) and ``<frame>`` runs from 0 to
    SEQUENCE_LENGTH - 1. Variants whose name contains ``_L`` are treated as
    left-handed recordings and mirrored frame by frame before normalization.

    Args:
        label_map: dict mapping base action name -> integer class index.

    Returns:
        Tuple ``(sequences, labels)``: a list of normalized sequences and the
        list of matching integer labels.
    """
    sequences, labels = [], []

    action_variants = [d for d in os.listdir(DATA_PATH) if os.path.isdir(os.path.join(DATA_PATH, d))]

    for action_variant in sorted(action_variants):  # Sort for consistent order
        action_base = action_variant.split('_')[0]
        if action_base not in ACTIONS_BASE:
            print(f"Skipping unexpected folder: {action_variant}")
            continue

        is_left_handed_sample = "_L" in action_variant
        action_variant_path = os.path.join(DATA_PATH, action_variant)

        for seq_name in sorted(os.listdir(action_variant_path)):
            sequence_path = os.path.join(action_variant_path, seq_name)

            # Stray files (e.g. OS metadata files) are not sequences; calling
            # os.listdir on them would raise NotADirectoryError and abort the run.
            if not os.path.isdir(sequence_path):
                print(f"Skipping non-directory entry: {action_variant}/{seq_name}")
                continue

            num_frames = len([f for f in os.listdir(sequence_path) if f.endswith('.npy')])
            if num_frames != SEQUENCE_LENGTH:
                print(f"Warning: Skipping {action_variant}/{seq_name}. Found {num_frames} frames, expected {SEQUENCE_LENGTH}.")
                continue

            window = []
            for frame_num in range(SEQUENCE_LENGTH):
                res = np.load(os.path.join(sequence_path, f"{frame_num}.npy"))
                if is_left_handed_sample:
                    res = mirror_keypoints_frame(res)
                window.append(res)

            # Normalize the entire sequence
            normalized_window = normalize_sequence(np.array(window))
            sequences.append(normalized_window)
            labels.append(label_map[action_base])

    return sequences, labels


def _build_model():
    """Build the (uncompiled) stacked-LSTM classifier.

    Three LSTM layers (128, 256, 128 units) followed by two dense layers
    (128, 64 units) and a softmax output with one unit per base action.
    Batch normalization and dropout follow each LSTM layer and the first
    dense layer; L2 weight regularization is applied throughout except on
    the output layer.
    """
    return Sequential([
        LSTM(128, return_sequences=True, activation='tanh', recurrent_activation='sigmoid',
             input_shape=(SEQUENCE_LENGTH, KEYPOINT_VECTOR_LENGTH), kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        Dropout(0.3),

        LSTM(256, return_sequences=True, activation='tanh', recurrent_activation='sigmoid', kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        Dropout(0.4),

        LSTM(128, return_sequences=False, activation='tanh', recurrent_activation='sigmoid', kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        Dropout(0.3),

        Dense(128, activation='relu', kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        Dropout(0.3),

        Dense(64, activation='relu', kernel_regularizer=l2(0.001)),

        Dense(len(ACTIONS_BASE), activation='softmax')
    ])


def _plot_training_history(history):
    """Plot accuracy and loss curves, save them to MODEL_SAVE_PATH and show them.

    Plotting is optional: if Matplotlib cannot be imported a message is
    printed and nothing else happens.
    """
    try:
        import matplotlib.pyplot as plt
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(history.history['categorical_accuracy'], label='Train Accuracy')
        plt.plot(history.history['val_categorical_accuracy'], label='Validation Accuracy')
        plt.title('Model Accuracy')
        plt.legend()
        plt.subplot(1, 2, 2)
        plt.plot(history.history['loss'], label='Train Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.title('Model Loss')
        plt.legend()
        plt.savefig(os.path.join(MODEL_SAVE_PATH, 'training_history.png'))
        plt.show()
    except ImportError:
        print("Matplotlib not found. Skipping plotting of training history.")


def main():
    """Run the full training pipeline.

    Steps: write the label map, load and preprocess the recorded sequences,
    split them into train/validation/test sets, build and train the LSTM
    model, evaluate the best checkpoint on the test set and plot the
    training history.

    Raises:
        RuntimeError: if no complete sequence could be loaded from DATA_PATH.
    """
    # --- 4. Data Loading and Preprocessing ---
    print("Loading and preprocessing data...")
    label_map = {label: num for num, label in enumerate(ACTIONS_BASE)}

    # Persist the label map next to the model so predictions can be decoded later.
    with open(os.path.join(MODEL_SAVE_PATH, 'label_map.json'), 'w') as f:
        json.dump(label_map, f)
    print(f"Label map created: {label_map}")

    sequences, labels = _load_dataset(label_map)

    if not sequences:
        raise RuntimeError("No data loaded. Check DATA_PATH and ensure subdirectories contain complete sequences.")

    X = np.array(sequences)
    y = to_categorical(labels, num_classes=len(ACTIONS_BASE)).astype(int)
    print(f"\nData Loaded. X shape: {X.shape}, y shape: {y.shape}")

    # --- 5. Splitting Data ---
    # 15% is held out for testing; 15% of the remainder is used for validation.
    X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.15, random_state=42, stratify=y)
    X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.15, random_state=42, stratify=y_train_val)
    print(f"Train/Val/Test split: {len(X_train)}/{len(X_val)}/{len(X_test)}")

    # --- 6. Model Architecture ---
    model = _build_model()

    # --- 7. Compile and Train Model ---
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
    model.summary()

    # Each run logs to its own timestamped TensorBoard folder.
    log_dir_ts = os.path.join(LOG_DIR, time.strftime("%Y%m%d-%H%M%S"))
    checkpoint_filepath = os.path.join(MODEL_SAVE_PATH, 'best_action_model.h5')

    callbacks = [
        TensorBoard(log_dir=log_dir_ts),
        # Keeps only the model with the best validation accuracy on disk.
        ModelCheckpoint(filepath=checkpoint_filepath, save_weights_only=False, monitor='val_categorical_accuracy', mode='max', save_best_only=True, verbose=1),
        EarlyStopping(monitor='val_loss', patience=30, verbose=1, mode='min', restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, verbose=1, mode='min', min_lr=1e-6)
    ]

    EPOCHS = 300
    BATCH_SIZE = 32

    history = model.fit(X_train, y_train,
                        epochs=EPOCHS,
                        batch_size=BATCH_SIZE,
                        validation_data=(X_val, y_val),
                        callbacks=callbacks)

    # --- 8. Evaluate Best Model on Test Set ---
    # The checkpoint on disk (best validation accuracy) is evaluated, not the
    # in-memory model left over from the last training epoch.
    print("\nEvaluating best model on Test Set...")
    best_model = load_model(checkpoint_filepath)
    test_loss, test_accuracy = best_model.evaluate(X_test, y_test, verbose=1)
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

    # --- 9. Plotting Training History (Optional) ---
    _plot_training_history(history)


if __name__ == "__main__":
    main()