In [1]:
import os
import pandas as pd
from tensorflow.keras.utils import to_categorical
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dropout, LSTM, Dense
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split
import os
import json
import cv2
import mediapipe as mp
import numpy as np
from tensorflow.keras.models import load_model
from sklearn.preprocessing import LabelEncoder
import time




In [6]:


# -------------------------
# Settings
# -------------------------
DATASET_DIR = "dataset"
SEQUENCE_LENGTH = 30
N_FEATURES = 126
SAVE_DIR = "processed_data"
os.makedirs(SAVE_DIR, exist_ok=True)

# -------------------------
# Function to clean and fix sequence
# -------------------------
def clean_and_fix_sequence(df, target_len=SEQUENCE_LENGTH, n_features=N_FEATURES):
    """
    Convert DataFrame to numeric, drop non-numeric rows, pad/truncate to target shape.
    Returns: np.array of shape (target_len, n_features)
    """
    # Convert to numeric, replace non-numeric with NaN
    df_numeric = df.apply(pd.to_numeric, errors='coerce')
    df_numeric = df_numeric.dropna(how='all')

    # If empty, fill with zeros
    if df_numeric.empty:
        return np.zeros((target_len, n_features), dtype=np.float32)

    # Convert to numpy
    seq = df_numeric.values.astype(np.float32)

    # Fix columns
    n_rows, n_cols = seq.shape
    if n_cols > n_features:
        seq = seq[:, :n_features]
    elif n_cols < n_features:
        seq = np.hstack([seq, np.zeros((n_rows, n_features - n_cols), dtype=np.float32)])

    # Fix rows (sequence length)
    if n_rows > target_len:
        seq = seq[:target_len, :]
    elif n_rows < target_len:
        seq = np.vstack([seq, np.zeros((target_len - n_rows, n_features), dtype=np.float32)])

    return seq

# -------------------------
# Process entire dataset
# -------------------------
def process_dataset(dataset_dir=DATASET_DIR):
    X, y = [], []
    skipped_files = []

    categories = [c for c in os.listdir(dataset_dir) if os.path.isdir(os.path.join(dataset_dir, c))]
    print(f"Found categories: {categories}")

    for category in categories:
        kp_dir = os.path.join(dataset_dir, category, "keypoints")
        if not os.path.isdir(kp_dir):
            continue

        files = os.listdir(kp_dir)
        if not files:
            print(f"No CSV files found in {kp_dir}")
            continue

        for csv_file in files:
            csv_path = os.path.join(kp_dir, csv_file)
            try:
                df = pd.read_csv(csv_path, header=None)
                keypoints = clean_and_fix_sequence(df)
                X.append(keypoints)
                y.append(category)
            except Exception as e:
                skipped_files.append(csv_path)
                X.append(np.zeros((SEQUENCE_LENGTH, N_FEATURES), dtype=np.float32))
                y.append(category)
                print(f"Warning: Could not read {csv_path}. Error: {e}")

    X = np.array(X, dtype=np.float32)
    y = np.array(y)

    # Encode labels
    label_encoder = LabelEncoder()
    y_encoded = label_encoder.fit_transform(y)
    y_onehot = to_categorical(y_encoded)

    # Save processed data
    np.save(os.path.join(SAVE_DIR, "X_keypoints.npy"), X)
    np.save(os.path.join(SAVE_DIR, "y_labels.npy"), y_onehot)

    # Save label mapping for later use
    label_map = {i: label for i, label in enumerate(label_encoder.classes_)}
    with open(os.path.join(SAVE_DIR, "label_map.json"), "w") as f:
        json.dump(label_map, f, indent=2)

    print("\nDataset processed successfully!")
    print(f"X shape: {X.shape}, y_onehot shape: {y_onehot.shape}")
    if skipped_files:
        print(f"Skipped {len(skipped_files)} files due to errors.")
        for f in skipped_files:
            print(f" - {f}")

    return X, y_onehot, label_encoder

# -------------------------
# Run preprocessing
# -------------------------
if __name__ == "__main__":
    X, y_onehot, label_encoder = process_dataset()


Found categories: ['again', 'Baby', 'Bad', 'bathroom', 'book', 'Brother', 'busy', 'Dad', 'do not want', 'Eat', 'Fast', 'father', 'Fine', 'finish', 'forget', 'Friend', 'Go', 'Good', 'Great', 'happy', 'He', 'hello', 'Help', 'how', 'I', 'is', 'learn', 'like', 'Love', 'marry', 'meet', 'milk', 'more', 'mother', 'My', 'name', 'need', 'nice', 'No', 'Nothing', 'please', 'question', 'right', 'sad', 'same', 'Say', 'See you later', 'see you letter', 'Sister', 'sleep', 'Stop', 'Teacher', 'thank you', 'want', 'We', 'what', 'What_s up', 'when', 'where', 'which', 'who', 'why', 'wrong', 'Yes', 'You', 'your']

Dataset processed successfully!
X shape: (3701, 30, 126), y_onehot shape: (3701, 65)


In [7]:
import os
import json
import numpy as np
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dropout, LSTM, Dense
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split

# -------------------------
# Settings
# -------------------------
DATA_DIR = "processed_data"
MODEL_SAVE_PATH = "sign_language_model.keras"  # new Keras format
SEQUENCE_LENGTH = 30
N_FEATURES = 126
BATCH_SIZE = 16
EPOCHS = 50
PATIENCE = 10  # Early stopping patience

# -------------------------
# Load preprocessed data
# -------------------------
X = np.load(os.path.join(DATA_DIR, "X_keypoints.npy"))
y = np.load(os.path.join(DATA_DIR, "y_labels.npy"))

# Load label map
with open(os.path.join(DATA_DIR, "label_map.json"), "r") as f:
    label_map = json.load(f)

NUM_CLASSES = y.shape[1]

print(f"X shape: {X.shape}, y shape: {y.shape}")
print(f"Number of classes: {NUM_CLASSES}, Classes: {list(label_map.values())}")

# -------------------------
# Train-test split
# -------------------------
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}")

# -------------------------
# Model definition
# -------------------------
model = Sequential([
    # 1D Conv layers for spatial info
    Conv1D(64, kernel_size=3, activation='relu', input_shape=(SEQUENCE_LENGTH, N_FEATURES)),
    Conv1D(128, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    Dropout(0.3),

    # LSTM for temporal info
    LSTM(64, return_sequences=False),
    Dropout(0.3),

    # Output layer
    Dense(NUM_CLASSES, activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

# -------------------------
# Callbacks
# -------------------------
checkpoint = ModelCheckpoint(MODEL_SAVE_PATH, monitor='val_accuracy', save_best_only=True, verbose=1)
early_stop = EarlyStopping(monitor='val_loss', patience=PATIENCE, restore_best_weights=True, verbose=1)

# -------------------------
# Train the model
# -------------------------
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=[checkpoint, early_stop],
    shuffle=True
)

# -------------------------
# Evaluate the model
# -------------------------
loss, accuracy = model.evaluate(X_val, y_val, verbose=0)
print(f"\nâœ… Validation Loss: {loss:.4f}")
print(f"âœ… Validation Accuracy: {accuracy:.4f}")

# -------------------------
# Optional: Continue training with extra epochs
# -------------------------
EXTRA_EPOCHS = 15  # adjust as needed
print("\nðŸ”¹ Continuing training with extra epochs...")

# Load best saved model
model = load_model(MODEL_SAVE_PATH)

# Adjust patience for fine-tuning
early_stop_extra = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)

history_extra = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=EXTRA_EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=[early_stop_extra, checkpoint],
    shuffle=True
)

# Final evaluation
loss, accuracy = model.evaluate(X_val, y_val, verbose=0)
print(f"\nâœ… Final Validation Loss: {loss:.4f}")
print(f"âœ… Final Validation Accuracy: {accuracy:.4f}")


X shape: (3701, 30, 126), y shape: (3701, 65)
Number of classes: 65, Classes: ['Baby', 'Bad', 'Brother', 'Dad', 'Eat', 'Fine', 'Friend', 'Go', 'Good', 'Great', 'He', 'Help', 'I', 'Love', 'My', 'No', 'Nothing', 'Say', 'See you later', 'Sister', 'Stop', 'Teacher', 'We', 'What_s up', 'Yes', 'You', 'again', 'bathroom', 'book', 'busy', 'do not want', 'father', 'finish', 'forget', 'happy', 'hello', 'how', 'is', 'learn', 'like', 'marry', 'meet', 'milk', 'more', 'mother', 'name', 'need', 'nice', 'please', 'question', 'right', 'sad', 'same', 'see you letter', 'sleep', 'thank you', 'want', 'what', 'when', 'where', 'which', 'who', 'why', 'wrong', 'your']
Training samples: 2960, Validation samples: 741
Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_4 (Conv1D)           (None, 28, 64)            24256     
                                                                 
 conv1d_5 (Con

In [2]:

# -------------------------
# Settings
# -------------------------
MODEL_PATH = "sign_language_model.keras"
SEQUENCE_LENGTH = 30
N_FEATURES = 126
CLASSES = ["Baby",
"Bad",
"Brother",
"Dad",
"Eat",
"Fine",
"Friend",
"Go",
"Good",
"Great",
"He",
"Help",
"I",
"Love",
"My",
"No",
"Nothing",
"Say",
"See you later",
"Sister",
"Stop",
"Teacher",
"We",
"What_s up",
"Yes",
"You",
"again",
"bathroom",
"book",
"busy",
"do not want",
"father",
"finish",
"forget",
"happy",
"hello",
"how",
"is",
"learn",
"like",
"marry",
"meet",
"milk",
"more",
"mother",
"name",
"need",
"nice",
"please",
"question",
"right",
"sad",
"same",
"see you letter",
"sleep",
"thank you",
"want",
"what",
"when",
"where",
"which",
"who",
"why",
"wrong",
"your"
]  # add all your class names in correct order

# -------------------------
# Load model
# -------------------------
model = load_model(MODEL_PATH)

# -------------------------
# Label encoder
# -------------------------
label_encoder = LabelEncoder()
label_encoder.fit(CLASSES)

# -------------------------
# Initialize MediaPipe
# -------------------------
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
hands = mp_hands.Hands(
    min_detection_confidence=0.7,
    min_tracking_confidence=0.7
)

# -------------------------
# Capture from webcam
# -------------------------
cap = cv2.VideoCapture(0)
FRAME_WIDTH = 1280
FRAME_HEIGHT = 720
FPS = 30
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
cap.set(cv2.CAP_PROP_FPS, FPS)

print("Starting real-time testing...")
print(f"Collecting {SEQUENCE_LENGTH} frames per prediction...")

sequence = []

while True:
    ret, frame = cap.read()
    if not ret:
        continue

    frame = cv2.flip(frame, 1)
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    result = hands.process(rgb_frame)

    # Initialize keypoints
    left_hand_kp = [(0, 0, 0)] * 21
    right_hand_kp = [(0, 0, 0)] * 21

    if result.multi_hand_landmarks and result.multi_handedness:
        for hand_landmarks, handedness in zip(result.multi_hand_landmarks, result.multi_handedness):
            hand_label = handedness.classification[0].label
            kp = [(lm.x, lm.y, lm.z) for lm in hand_landmarks.landmark]

            if hand_label == "Left":
                left_hand_kp = kp
            else:
                right_hand_kp = kp

            mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

    # Flatten keypoints and append to sequence
    row = [coord for kp in left_hand_kp + right_hand_kp for coord in kp]
    sequence.append(row)

    # Keep only the last SEQUENCE_LENGTH frames
    if len(sequence) > SEQUENCE_LENGTH:
        sequence = sequence[-SEQUENCE_LENGTH:]

    # Predict if we have enough frames
    if len(sequence) == SEQUENCE_LENGTH:
        X_input = np.array(sequence, dtype=np.float32)
        X_input = np.expand_dims(X_input, axis=0)  # shape (1, SEQUENCE_LENGTH, N_FEATURES)
        pred = model.predict(X_input, verbose=0)
        pred_class_index = np.argmax(pred)
        confidence = np.max(pred)
        pred_class_name = label_encoder.inverse_transform([pred_class_index])[0]

        # Display prediction
        cv2.putText(frame, f"Prediction: {pred_class_name} ({confidence:.2f})",
                    (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # Show webcam frame
    cv2.imshow("Real-Time Sign Language Test", frame)

    # Quit with 'q'
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
hands.close()





Starting real-time testing...
Collecting 30 frames per prediction...
