In [11]:
# Create mini dataset
import os
import random
import shutil
import logging
from pathlib import Path


# Source and destination directories: absolute paths under the Google Drive
# mount point (/content/drive), so Drive must be mounted before these are used.
source_dir = Path('/content/drive/MyDrive/SignComm_Dataset/ArSL_Dataset')  # Full dataset root
dest_dir = Path('/content/drive/MyDrive/SignComm_Dataset/mini_dataset')  # Root of the reduced copy

# Number of videos to select per signer per sign
NUM_VIDEOS = 10

def create_mini_dataset(source=None, dest=None, num_videos=None, seed=None,
                        skip_signers=("ahmed",)):
    """Copy a small per-signer sample of videos into a mini dataset tree.

    The source tree is walked as ``<source>/<sign>/<signer>/<video file>`` and
    the same ``<sign>/<signer>`` layout is recreated under the destination.

    Args:
        source: Root of the full dataset. Defaults to the module-level
            ``source_dir``.
        dest: Root of the mini dataset. Defaults to the module-level
            ``dest_dir``. Missing parent directories are created.
        num_videos: Maximum number of videos copied per signer per sign.
            Defaults to the module-level ``NUM_VIDEOS``.
        seed: Optional seed for the random selection. With a seed, the same
            files are chosen on every run, so re-running does not add extra
            videos to an existing destination. With ``None`` the global
            ``random`` state is used and re-runs may pick different files.
        skip_signers: Signer folder names (compared case-insensitively) that
            are left out of the mini dataset.

    Returns:
        int: Number of video files copied.
    """
    src_root = source_dir if source is None else Path(source)
    dst_root = dest_dir if dest is None else Path(dest)
    limit = NUM_VIDEOS if num_videos is None else num_videos
    rng = random if seed is None else random.Random(seed)
    skipped = {name.lower() for name in skip_signers}
    video_suffixes = {'.mp4', '.avi', '.mov'}

    # parents=True so a missing parent folder does not abort the run.
    dst_root.mkdir(parents=True, exist_ok=True)

    copied = 0

    # Directory listings are sorted because iterdir() order is arbitrary;
    # a fixed order is what makes a seeded selection reproducible.
    for sign_folder in sorted(src_root.iterdir()):
        if not sign_folder.is_dir():
            continue

        sign_name = sign_folder.name
        # INFO records are hidden by the default root logger level; configure
        # logging (e.g. logging.basicConfig(level=logging.INFO)) to see them.
        logging.info(f"Processing sign: {sign_name}")

        # Create corresponding sign folder in mini dataset
        mini_sign_folder = dst_root / sign_name
        mini_sign_folder.mkdir(exist_ok=True)

        for signer_folder in sorted(sign_folder.iterdir()):
            if not signer_folder.is_dir():
                continue

            signer_name = signer_folder.name
            if signer_name.lower() in skipped:
                logging.info(f"Skipping signer: {signer_name}")
                continue

            # Create corresponding signer folder in mini dataset
            mini_signer_folder = mini_sign_folder / signer_name
            mini_signer_folder.mkdir(exist_ok=True)

            video_files = sorted(
                f for f in signer_folder.iterdir()
                if f.is_file() and f.suffix.lower() in video_suffixes
            )

            # Take everything when there are few videos, otherwise a random subset.
            if len(video_files) <= limit:
                selected_videos = video_files
            else:
                selected_videos = rng.sample(video_files, limit)

            for video in selected_videos:
                shutil.copy2(video, mini_signer_folder / video.name)
                copied += 1

    return copied




# Build the mini dataset when this cell/script is executed directly.
if __name__ == "__main__":
    create_mini_dataset()


In [5]:
# Mount Google Drive at /content/drive. The dataset paths used when creating
# the mini dataset live under this mount point, so run this cell first.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
# Install the third-party packages imported by the cells below.
# NOTE(review): versions are not pinned, so a fresh run may resolve different releases.
!pip install mediapipe tensorflow opencv-python arabic-reshaper python-bidi scikit-learn

Collecting mediapipe
  Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting arabic-reshaper
  Downloading arabic_reshaper-3.0.0-py3-none-any.whl.metadata (12 kB)
Collecting python-bidi
  Downloading python_bidi-0.6.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m23.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading arabic_reshaper-3.0.0-py3-none-any.whl (20 kB)
Downloading python_bidi-0.6.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (292 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m292.9/292.9 kB[0m [31m29.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Instal

In [13]:
import mediapipe as mp
import cv2
import numpy as np
import os
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Attention, Flatten
from arabic_reshaper import reshape
from bidi.algorithm import get_display

1. Preprocessing with MediaPipe

In [None]:
# Function to extract pose and hand landmarks from a video
def extract_landmarks(video_path):
    """Extract per-frame pose and hand landmarks from a video file.

    Every frame is run through MediaPipe Hands and Pose. A frame contributes
    one feature vector only when at least one hand was detected; frames
    without hands are dropped.

    Each feature vector is the flattened concatenation of 33 pose landmarks,
    21 left-hand landmarks and 21 right-hand landmarks, with (x, y, z) per
    landmark, i.e. 75 * 3 = 225 values. Missing pose or a missing hand is
    filled with zeros.

    Args:
        video_path: Path of the video to read (passed to cv2.VideoCapture).

    Returns:
        np.ndarray: Array of per-frame feature vectors. It is empty
        (``len(...) == 0``) when no frame contained a detected hand, in which
        case a warning is printed.
    """
    empty_pose = [[0, 0, 0]] * 33
    empty_hand = [[0, 0, 0]] * 21

    sequence = []   # List to hold frame data
    cap = cv2.VideoCapture(str(video_path))

    try:
        # The context managers close the MediaPipe graphs when processing ends.
        with mp.solutions.hands.Hands() as hands, mp.solutions.pose.Pose() as pose:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break    # No more frames

                # MediaPipe expects RGB input; OpenCV delivers BGR.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                hand_results = hands.process(frame_rgb)
                pose_results = pose.process(frame_rgb)

                # Only keep frames with hands. Detection is tracked with the
                # MediaPipe result itself: comparing the nested landmark lists
                # against [0, 0, 0] is always unequal and would keep every frame.
                if not hand_results.multi_hand_landmarks:
                    continue

                # Pose landmarks (33), zero-padded if no pose was detected
                if pose_results.pose_landmarks:
                    pose_data = [[lmk.x, lmk.y, lmk.z]
                                 for lmk in pose_results.pose_landmarks.landmark]
                else:
                    pose_data = empty_pose

                # Hand landmarks (21 each), zero-padded for an undetected hand
                left_hand = empty_hand
                right_hand = empty_hand
                for hand, handedness in zip(hand_results.multi_hand_landmarks,
                                            hand_results.multi_handedness):
                    coords = [[lmk.x, lmk.y, lmk.z] for lmk in hand.landmark]
                    if handedness.classification[0].label == "Left":
                        left_hand = coords
                    else:
                        right_hand = coords

                # Flatten and combine all landmarks
                sequence.append(np.array(pose_data + left_hand + right_hand).flatten())
    finally:
        cap.release()  # Always release the video, even if processing fails

    if len(sequence) == 0:
        print(f"Warning: No hands detected in {video_path}")
    return np.array(sequence)  # Return the sequence of landmarks

In [None]:
# Initialize MediaPipe hand and pose solutions outside the function
# so the same detector instances are reused for every frame.
mp_hands = mp.solutions.hands.Hands()
mp_pose = mp.solutions.pose.Pose()

# Function to extract landmarks from a single frame
def extract_landmarks_single(frame):
    """Extract pose and hand landmarks from one BGR frame.

    Uses the module-level ``mp_hands`` and ``mp_pose`` detector instances
    directly. They are already constructed solution objects, so they must not
    be called like the ``mp.solutions`` modules.

    Args:
        frame: Image in OpenCV BGR channel order.

    Returns:
        np.ndarray | None: Flattened vector of 33 pose + 21 left-hand +
        21 right-hand landmarks with (x, y, z) each (225 values), or ``None``
        when no hand landmark data was found in the frame.
    """
    # Convert frame to RGB for MediaPipe
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Process landmarks with the shared detector instances
    hand_results = mp_hands.process(frame_rgb)
    pose_results = mp_pose.process(frame_rgb)

    # Pose landmarks, zero-padded if no pose was detected
    if pose_results.pose_landmarks:
        pose_data = [[lmk.x, lmk.y, lmk.z] for lmk in pose_results.pose_landmarks.landmark]
    else:
        pose_data = [[0, 0, 0]] * 33

    # Hand landmarks, zero-padded for an undetected hand
    left_hand = [[0, 0, 0]] * 21
    right_hand = [[0, 0, 0]] * 21

    if hand_results.multi_hand_landmarks:
        for hand, handedness in zip(hand_results.multi_hand_landmarks,
                                    hand_results.multi_handedness):
            coords = [[lmk.x, lmk.y, lmk.z] for lmk in hand.landmark]
            if handedness.classification[0].label == "Left":
                left_hand = coords
            else:
                right_hand = coords

    # Frames without any non-zero hand landmark are reported as None
    if not (np.any(left_hand) or np.any(right_hand)):
        return None

    # Flatten and return
    return np.array(pose_data + left_hand + right_hand).flatten()

2. Data Conversion to Numpy Arrays

In [None]:
# Function to process all videos in a directory and save as numpy arrays
def process_dataset(input_dir, output_dir, seq_length=30):
    """Convert every video under ``input_dir`` into a fixed-length ``.npy`` file.

    The input tree is walked as ``<input_dir>/<sign>/<signer>/<video>``. Each
    video is passed to ``extract_landmarks``; videos that yield an empty
    sequence are skipped. The remaining sequences are truncated to
    ``seq_length`` rows or zero-padded up to it, then saved in ``output_dir``
    as ``<sign>_<signer>_<video base name>.npy``.

    Args:
        input_dir: Root directory holding one folder per sign.
        output_dir: Directory receiving the ``.npy`` files (created if missing).
        seq_length: Number of rows every saved array has.
    """
    video_suffixes = ('.mp4', '.avi', '.mov')
    feature_count = 75 * 3  # 33 pose + 21*2 hands, three coordinates each

    os.makedirs(output_dir, exist_ok=True)

    for sign_name in os.listdir(input_dir):
        sign_path = os.path.join(input_dir, sign_name)
        if not os.path.isdir(sign_path):
            continue  # only sign folders are of interest

        for signer_name in os.listdir(sign_path):
            signer_path = os.path.join(sign_path, signer_name)
            if not os.path.isdir(signer_path):
                continue  # only signer folders are of interest

            for video_file in os.listdir(signer_path):
                # str.endswith accepts a tuple of candidate suffixes
                if not video_file.lower().endswith(video_suffixes):
                    continue

                sequence = extract_landmarks(os.path.join(signer_path, video_file))
                if len(sequence) == 0:
                    continue  # nothing usable was extracted from this video

                if len(sequence) > seq_length:
                    # Too long: keep the first seq_length rows
                    fixed_sequence = sequence[:seq_length]
                else:
                    # Too short (or exact): copy into a zero-filled buffer
                    fixed_sequence = np.zeros((seq_length, feature_count))
                    fixed_sequence[:len(sequence)] = sequence

                # The sign name is the first "_"-separated part of the file name
                stem = os.path.splitext(video_file)[0]
                target = os.path.join(output_dir, f"{sign_name}_{signer_name}_{stem}.npy")
                np.save(target, fixed_sequence)

In [None]:
# Set the paths
# NOTE(review): these are Windows-local paths, while the mini-dataset cell uses
# /content/drive paths - confirm which environment this cell is meant to run in.
input_video_dir = r"C:\Users\DELL\Desktop\ArSL_Model\ArSL_Dataset\Videos"  # Raw videos (28 subfolders)
output_numpy_dir = r"C:\Users\DELL\Desktop\ArSL_Model\output_dir"  # Processed numpy data
model_save_path = r"C:\Users\DELL\Desktop\ArSL_Model\model.h5"     # Trained model path

# Convert every video under input_video_dir into a 30-row .npy file in output_numpy_dir
process_dataset(input_video_dir, output_numpy_dir, seq_length=30)

3. Data loading

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Function to load data from numpy arrays and prepare for model training
def load_data(numpy_dir, label_mapping, num_classes=28, test_size=0.2, random_state=None):
    """Load saved landmark sequences and split them into train/test sets.

    Every ``.npy`` file in ``numpy_dir`` is loaded. The class label is the part
    of the file name before the first ``_`` and is translated to a class index
    through ``label_mapping``.

    Args:
        numpy_dir: Directory containing the ``.npy`` files.
        label_mapping: Dict mapping the label text to an integer class index.
        num_classes: Width of the one-hot label vectors.
        test_size: Fraction of the samples placed in the test split.
        random_state: Optional seed forwarded to ``train_test_split`` so the
            split can be reproduced. ``None`` keeps the split random.

    Returns:
        The ``train_test_split`` result: ``X_train, X_test, y_train, y_test``.

    Raises:
        KeyError: If a file name starts with a label missing from ``label_mapping``.
        ValueError: If ``numpy_dir`` contains no ``.npy`` files.
    """
    X = []  # Feature arrays, one per file
    y = []  # Class indices, aligned with X

    # Sorted so the sample order does not depend on the OS listing order,
    # which matters for reproducing a seeded split.
    for file in sorted(os.listdir(numpy_dir)):
        if not file.endswith(".npy"):  # Skip non-numpy files
            continue

        # The label is the first "_"-separated part of the file name
        label = file.split("_")[0]
        if label not in label_mapping:
            raise KeyError(f"Label {label!r} from file {file!r} is not in label_mapping")

        X.append(np.load(os.path.join(numpy_dir, file)))
        y.append(label_mapping[label])

    if not X:
        raise ValueError(f"No .npy files found in {numpy_dir}")

    X = np.array(X)
    y = to_categorical(y, num_classes=num_classes)  # One-hot encode the labels
    return train_test_split(X, y, test_size=test_size, random_state=random_state)

4. LSTM Model with Attention

In [None]:
from tensorflow.keras.layers import Layer, LSTM, Dense, Permute, Multiply, Flatten
from tensorflow.keras.layers import Bidirectional

# Define custom temporal attention layer
class TemporalAttention(Layer):
    """Attention pooling that collapses axis 1 of the input into a weighted sum.

    A single learned projection scores every position along axis 1; the
    softmax of those scores weights the input before it is summed over that
    axis. In ``build_model`` the input comes from LSTM layers created with
    ``return_sequences=True``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        # One trainable column vector that projects each feature vector to a score
        feature_dim = input_shape[-1]
        self.W = self.add_weight(
            name='att_weight',
            shape=(feature_dim, 1),
            initializer='normal',
        )

    def call(self, x):
        # Score each position, squashed into (-1, 1)
        scores = tf.tanh(tf.matmul(x, self.W))
        # Normalise the scores along axis 1 so the weights sum to one
        weights = tf.nn.softmax(scores, axis=1)
        # Weighted sum of the input along axis 1
        return tf.reduce_sum(x * weights, axis=1)

# Function to build the LSTM model with attention
def build_model(input_shape, num_classes):
    """Build and compile the bidirectional-LSTM classifier with attention pooling.

    Args:
        input_shape: Shape of one input sample, passed to ``Input(shape=...)``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A compiled Keras ``Model`` (Adam optimizer, categorical cross-entropy
        loss, accuracy metric).
    """
    sequence_input = Input(shape=input_shape)

    # Two stacked bidirectional LSTMs (256 then 128 units), both returning sequences
    encoded = sequence_input
    for units in (256, 128):
        encoded = Bidirectional(LSTM(units, return_sequences=True))(encoded)

    # Pool the sequence with the custom attention layer, then classify
    context = TemporalAttention()(encoded)
    class_probabilities = Dense(num_classes, activation='softmax')(context)

    classifier = Model(sequence_input, class_probabilities)
    classifier.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

5. Training and Evaluation

In [None]:
# Function to train and evaluate the model
def train_model(X_train, y_train, X_test, y_test,
                epochs=5, batch_size=8, checkpoint_path='best_model.h5'):
    """Build the classifier and train it on the given data.

    The number of classes is taken from the width of the one-hot labels in
    ``y_train`` so the output layer always matches the labels.

    Args:
        X_train: Training samples; ``X_train.shape[1:]`` is the model input shape.
        y_train: One-hot training labels; the last axis is the number of classes.
        X_test: Samples used as validation data during training.
        y_test: One-hot labels for ``X_test``.
        epochs: Number of training epochs.
        batch_size: Mini-batch size.
        checkpoint_path: File that ``ModelCheckpoint`` writes the best model to
            (``save_best_only=True``).

    Returns:
        The model after the final epoch. This is not necessarily the
        checkpointed best model stored at ``checkpoint_path``.
    """
    num_classes = y_train.shape[-1]
    model = build_model(X_train.shape[1:], num_classes)

    # No early stopping, so all requested epochs run; only the best model is checkpointed.
    callbacks = [
        tf.keras.callbacks.ModelCheckpoint(checkpoint_path, save_best_only=True)
    ]

    model.fit(X_train, y_train,
              validation_data=(X_test, y_test),
              epochs=epochs,
              batch_size=batch_size,
              verbose=1,  # show progress
              callbacks=callbacks)
    return model

In [None]:
# Create label mapping based on your dataset's sign names
# Keys are looked up with the text before the first "_" of each .npy file name
# (see load_data); values are the class indices used for one-hot encoding.
# The index order matches the list returned from in get_arabic_label.
label_mapping = {
    'اسمك ايه ؟': 0,
    'اشاره': 1,
    'الحمدلله': 2,
    'السلام عليكم': 3,
    'الصم': 4,
    'اللغه العربيه': 5,
    'ان شاء الله': 6,
    'انا': 7,
    'انت': 8,
    'ايه ؟': 9,
    'برنامج': 10,
    'تخرج': 11,
    'جميل': 12,
    'دكتور': 13,
    'شكرا': 14,
    'طالب': 15,
    'عامل ايه ؟': 16,
    'فكرة': 17,
    'في': 18,
    'كلية حاسبات و معلومات': 19,
    'مترجم': 20,
    'مجتمع': 21,
    'مساعده': 22,
    'مشروع': 23,
    'ناجح': 24,
    'هدف': 25,
    'و': 26,
    'وعليكم السلام': 27,
}

# Load data and train
X_train, X_test, y_train, y_test = load_data(output_numpy_dir, label_mapping)
model = train_model(X_train, y_train, X_test, y_test)

# Persist the trained model at model_save_path, which is the file the
# real-time translation cell loads. Without this nothing writes that path.
model.save(model_save_path)

# Final evaluation
# Note: X_test/y_test were also used as validation data during training.
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Final Model Accuracy: {accuracy*100:.2f}%")

6. Arabic Support

In [None]:
# Function to display arabic text on frame
def display_arabic_text(frame, text):
    """Draw ``text`` on ``frame`` in green at position (50, 100).

    The text is first passed through ``reshape`` (arabic_reshaper) and then
    ``get_display`` (python-bidi) before being drawn with ``cv2.putText``.
    The frame is modified in place; nothing is returned.

    NOTE(review): OpenCV's built-in Hershey fonts cover only a limited Latin
    character set, so Arabic glyphs are expected to render as placeholder
    characters - confirm the on-screen output and consider a TrueType-based
    renderer.
    """
    text_origin = (50, 100)
    green = (0, 255, 0)
    font_scale = 1
    thickness = 2

    # Join letters into their contextual forms, then reorder for visual display
    visual_text = get_display(reshape(text))

    cv2.putText(frame, visual_text, text_origin,
                cv2.FONT_HERSHEY_SIMPLEX, font_scale,
                green, thickness)

In [None]:
# Function to get the Arabic label from the index
def get_arabic_label(index):
    """Return the Arabic sign label stored at position ``index``.

    The labels are listed in class-index order (28 entries). Indexing follows
    normal list rules, so an out-of-range index raises ``IndexError``.
    """
    labels = [
        "اسمك ايه ؟",
        "اشاره",
        "الحمدلله",
        "السلام عليكم",
        "الصم",
        "اللغه العربيه",
        "ان شاء الله",
        "انا",
        "انت",
        "ايه ؟",
        "برنامج",
        "تخرج",
        "جميل",
        "دكتور",
        "شكرا",
        "طالب",
        "عامل ايه ؟",
        "فكرة",
        "في",
        "كلية حاسبات و معلومات",
        "مترجم",
        "مجتمع",
        "مساعده",
        "مشروع",
        "ناجح",
        "هدف",
        "و",
        "وعليكم السلام",
    ]
    return labels[index]

7. Real-Time Translation

In [None]:
# Function for real-time translation
def real_time_translation(model, seq_length=30):
    """Translate signs from the default camera until 'q' is pressed.

    Each camera frame is converted to a landmark vector with
    ``extract_landmarks_single``. Frames with hands are collected in a rolling
    buffer holding the most recent ``seq_length`` vectors. Once the buffer is
    full, the model predicts on it for every new frame and the predicted label
    is drawn on the frame. Frames without hands show a "Show Hands" prompt and
    leave the buffer unchanged.

    Args:
        model: Trained model whose ``predict`` accepts an array of shape
            ``(1, seq_length, features)``.
        seq_length: Number of landmark vectors fed to the model per prediction.
    """
    cap = cv2.VideoCapture(0)  # Open default camera
    buffer = []  # Most recent landmark vectors (hand frames only)

    try:
        while cap.isOpened():
            ret, frame = cap.read()   # Read frame from the camera
            if not ret:
                break   # Camera delivered no frame

            # Landmark vector for this frame, or None when no hands were found
            processed_frame = extract_landmarks_single(frame)

            if processed_frame is None:
                # Prompt the user to bring their hands into view
                cv2.putText(frame, "Show Hands", (50, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1,
                            (0, 0, 255), 2)
            else:
                # Only non-None vectors are ever appended, so trimming to the
                # newest seq_length entries is all the buffer needs.
                buffer.append(processed_frame)
                buffer = buffer[-seq_length:]

                if len(buffer) == seq_length:
                    # verbose=0 keeps Keras from printing a progress bar per frame
                    prediction = model.predict(np.array([buffer]), verbose=0)
                    arabic_word = get_arabic_label(np.argmax(prediction))
                    display_arabic_text(frame, arabic_word)

            # Display the frame
            cv2.imshow('Translation', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):  # Exit if 'q' is pressed
                break
    finally:
        # Free the camera and close the window even if a frame raised an error
        cap.release()
        cv2.destroyAllWindows()

8. Save/Load Model

In [None]:
# Function to save the model
def save_model(model, path):
    """Save ``model`` to ``path`` by calling ``model.save(path)``."""
    model.save(path)

# Function to load the model
def load_model(path):
    """Load a saved Keras model from ``path`` and return it.

    ``TemporalAttention`` is passed through ``custom_objects`` so the custom
    layer class can be resolved while loading.
    """
    return tf.keras.models.load_model(
        path,
        custom_objects={'TemporalAttention': TemporalAttention}
    )

In [None]:
# Load the model stored at model_save_path and start webcam translation.
# NOTE(review): load_model needs an existing file at model_save_path;
# train_model's checkpoint is written to 'best_model.h5', a different path.
model = load_model(model_save_path)
real_time_translation(model)