In [2]:
import cv2
import mediapipe as mp
import numpy as np
import os

In [3]:
# MediaPipe Holistic is used (rather than the Hands solution alone) because the
# feature extraction below reads both hand landmark sets and the pose landmarks.
mp_holistic = mp.solutions.holistic
# Drawing helpers used to overlay detected landmarks on the preview frames.
mp_drawing = mp.solutions.drawing_utils

In [4]:

def calculate_finger_angles(landmarks):
    """Calculate one bend angle per finger from 2D hand landmarks.

    For every finger the angle is measured between the base->middle-joint
    vector and the middle-joint->tip vector. Only the ``x`` and ``y``
    attributes of each landmark are used; ``z`` is ignored.

    Args:
        landmarks: Indexable collection of landmark objects exposing ``x`` and
            ``y`` attributes, addressable at indices 1..20.

    Returns:
        list: Five angles in radians, each within ``[0, pi]``, ordered thumb,
        index, middle, ring, pinky. A finger whose segment has zero length
        (coincident joints) yields ``0.0`` instead of NaN.
    """
    # (base, middle joint, tip) landmark indices per finger:
    # thumb, index, middle, ring, pinky.
    finger_joints = (
        (1, 2, 4),
        (5, 6, 8),
        (9, 10, 12),
        (13, 14, 16),
        (17, 18, 20),
    )

    angles = []
    for base, mid, tip in finger_joints:
        base_coords = np.array([landmarks[base].x, landmarks[base].y])
        mid_coords = np.array([landmarks[mid].x, landmarks[mid].y])
        tip_coords = np.array([landmarks[tip].x, landmarks[tip].y])

        v1 = mid_coords - base_coords
        v2 = tip_coords - mid_coords

        norm_product = np.linalg.norm(v1) * np.linalg.norm(v2)
        if norm_product == 0:
            # A degenerate segment has no direction; dividing by zero here
            # would put NaN into the feature vector, so report "no bend".
            angles.append(0.0)
            continue

        cosine_angle = np.dot(v1, v2) / norm_product
        # Clip guards against rounding pushing the cosine just outside [-1, 1].
        angles.append(np.arccos(np.clip(cosine_angle, -1.0, 1.0)))

    return angles

In [5]:
def calculate_hand_to_hand_distances(left_landmarks, right_landmarks):
    """Measure how far apart matching landmarks of the two hands are.

    Only the ``x``/``y`` attributes of each landmark are used.

    Args:
        left_landmarks: Indexable landmarks of the left hand.
        right_landmarks: Indexable landmarks of the right hand.

    Returns:
        list: Five Euclidean distances, in the order wrist, thumb tip,
        index tip, middle tip, pinky tip.
    """
    # Landmark indices compared across hands:
    # wrist, thumb tip, index tip, middle tip, pinky tip.
    compared_indices = (0, 4, 8, 12, 20)

    def _xy(landmark):
        return np.array([landmark.x, landmark.y])

    return [
        np.linalg.norm(_xy(left_landmarks[i]) - _xy(right_landmarks[i]))
        for i in compared_indices
    ]

In [6]:

def _flatten_landmarks(landmark_list, expected_count):
    """Flatten a MediaPipe landmark list to [x0, y0, z0, x1, ...]; zeros when absent."""
    if not landmark_list:
        return [0.0] * (expected_count * 3)
    coords = []
    for lm in landmark_list.landmark:
        coords.extend([lm.x, lm.y, lm.z])
    return coords


def extract_enhanced_keypoints(frame, holistic):
    """Run MediaPipe Holistic on one frame and build a fixed-length feature vector.

    The frame is converted from BGR to RGB before processing. The returned
    vector is laid out as follows (a part that was not detected is zero-filled):

    - left hand landmarks,  21 x (x, y, z)   -> 63 values
    - right hand landmarks, 21 x (x, y, z)   -> 63 values
    - pose landmarks,       33 x (x, y, z)   -> 99 values
    - left hand finger bend angles           ->  5 values
    - right hand finger bend angles          ->  5 values
    - hand-to-hand key distances (only when both hands are detected) -> 5 values

    Face landmarks and velocity are NOT part of this vector; velocity, where
    needed, is derived by the caller from consecutive frames.

    Args:
        frame: Image as read by OpenCV (BGR channel order).
        holistic: MediaPipe Holistic instance exposing ``process``.

    Returns:
        tuple: ``(features, results)`` where ``features`` is a 1-D float64
        ``np.ndarray`` with the layout above and ``results`` is the raw object
        returned by ``holistic.process``.
    """
    hand_landmark_count = 21
    pose_landmark_count = 33
    fingers_per_hand = 5
    hand_distance_count = 5

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = holistic.process(frame_rgb)

    left = results.left_hand_landmarks
    right = results.right_hand_landmarks

    features = []

    # Raw landmark coordinates: left hand, right hand, pose.
    features.extend(_flatten_landmarks(left, hand_landmark_count))
    features.extend(_flatten_landmarks(right, hand_landmark_count))
    features.extend(_flatten_landmarks(results.pose_landmarks, pose_landmark_count))

    # Finger bend angles, one per finger, for each hand.
    for hand in (left, right):
        if hand:
            features.extend(calculate_finger_angles(hand.landmark))
        else:
            features.extend([0.0] * fingers_per_hand)

    # Distances between matching key points, defined only with both hands present.
    if left and right:
        features.extend(calculate_hand_to_hand_distances(left.landmark, right.landmark))
    else:
        features.extend([0.0] * hand_distance_count)

    # An explicit dtype keeps the array float64 even when nothing was detected
    # (an all-zero list of ints would otherwise produce an integer array).
    return np.array(features, dtype=np.float64), results

In [7]:
def enhanced_record_gesture(gesture_label, num_samples=30, sequence_length=30,
                 output_dir="data/raw", video_dir="data/videos"):
    """Interactively record keypoint sequences (and raw video) for one gesture.

    Shows a webcam preview. Pressing 's' records ``sequence_length`` frames:
    each frame is written to an ``.avi`` file and turned into a feature vector
    by ``extract_enhanced_keypoints``, to which the frame-to-frame difference
    (velocity; zeros for the first frame) is appended. A complete sequence is
    saved as ``<gesture_label>_<n>.npy`` with shape
    ``(sequence_length, 2 * features_per_frame)``. Pressing 'q' in the preview
    stops collection; pressing 'q' while recording discards that sample and
    returns to the preview.

    NOTE(review): samples saved here carry the extra velocity half, while
    ``extract_keypoints_from_video`` stores the base features only -- confirm
    which layout the training data is meant to use.

    Args:
        gesture_label: Name of the gesture; used as the file name prefix.
        num_samples: Number of complete samples to collect.
        sequence_length: Frames per sample.
        output_dir: Directory for the ``.npy`` sequences (created if missing).
        video_dir: Directory for the raw ``.avi`` clips (created if missing).

    Raises:
        RuntimeError: If the default camera (index 0) cannot be opened.
    """
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(video_dir, exist_ok=True)

    cap = cv2.VideoCapture(0)
    holistic = None
    try:
        if not cap.isOpened():
            # Without this check every read fails and the loop below would
            # spin forever on `continue`.
            raise RuntimeError("Could not open the default camera (index 0).")

        holistic = mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.7,
            min_tracking_confidence=0.5
        )

        sample_count = 0
        print(f"Starting data collection for gesture: '{gesture_label}'")
        print("Press 's' to start recording a sample, or 'q' to quit.")

        fourcc = cv2.VideoWriter_fourcc(*'XVID')

        while sample_count < num_samples:
            ret, frame = cap.read()
            if not ret:
                continue

            # Display instructions on the preview frame
            cv2.putText(frame, f"Gesture: {gesture_label} | Sample {sample_count+1}/{num_samples}",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow("Data Collection", frame)
            key = cv2.waitKey(1) & 0xFF

            if key == ord('q'):
                break
            if key != ord('s'):
                continue

            print("Recording sample...")
            sequence = []
            video_filename = os.path.join(video_dir, f"{gesture_label}_{sample_count+1}.avi")
            height, width, _ = frame.shape
            out = cv2.VideoWriter(video_filename, fourcc, 20.0, (width, height))

            prev_features = None
            try:
                while len(sequence) < sequence_length:
                    ret, frame = cap.read()
                    if not ret:
                        continue
                    # The clip stores the raw frame, before any overlay is drawn.
                    out.write(frame)

                    features, results = extract_enhanced_keypoints(frame, holistic)

                    # Velocity is the change since the previous frame; the
                    # first frame of a sample has no predecessor, so use zeros.
                    if prev_features is None:
                        velocity = np.zeros_like(features)
                    else:
                        velocity = features - prev_features
                    prev_features = features

                    sequence.append(np.concatenate([features, velocity]))

                    # Overlay hand landmarks and progress on the preview only
                    if results.right_hand_landmarks:
                        mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                    if results.left_hand_landmarks:
                        mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

                    cv2.putText(frame, f"Recording... {len(sequence)}/{sequence_length}",
                                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                    cv2.imshow("Data Collection", frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
            finally:
                # Always finalize the video file, even if extraction failed.
                out.release()

            if len(sequence) == sequence_length:
                sequence = np.array(sequence)
                filename = os.path.join(output_dir, f"{gesture_label}_{sample_count+1}.npy")
                np.save(filename, sequence)
                print(f"Saved sample {sample_count+1} as {filename}")
                print(f"Saved raw video as {video_filename}")
                sample_count += 1
            else:
                print("Incomplete sample, discarding...")
    finally:
        # Release the camera, windows and MediaPipe graph on every exit path.
        cap.release()
        cv2.destroyAllWindows()
        if holistic is not None:
            holistic.close()

    print("Data collection complete.")

In [8]:
# Data locations, all under a "data" folder one level above the working directory.
BASE_DATA_DIR = "../data"
RAW_DATA_DIR = os.path.join(BASE_DATA_DIR, "raw")  # keypoint sequences (.npy), read for training
PROCESSED_DATA_DIR = os.path.join(BASE_DATA_DIR, "processed")  # not referenced by the visible cells
VIDEO_DIR = os.path.join(BASE_DATA_DIR, "videos")  # recorded .avi clips to extract keypoints from

In [9]:
def extract_keypoints_from_video(video_path, output_dir):
    """Extract per-frame keypoint features from a video and save them as ``.npy``.

    Every frame is passed through ``extract_enhanced_keypoints``; the resulting
    vectors are stacked into one array of shape ``(num_frames, features)`` and
    written to ``output_dir`` under the video's base name with a ``.npy``
    extension. Nothing is written when no frame could be read.

    Args:
        video_path: Path of the video file to process.
        output_dir: Directory for the output array (created if missing).
    """
    os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    holistic = None
    sequence = []
    try:
        holistic = mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.7,
            min_tracking_confidence=0.5
        )

        # No window is shown during extraction, so there is no key polling here.
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            keypoints, _ = extract_enhanced_keypoints(frame, holistic)
            sequence.append(keypoints)
    finally:
        # Release the capture and the MediaPipe graph on every exit path.
        cap.release()
        if holistic is not None:
            holistic.close()

    if not sequence:
        print(f"No frames could be read from {video_path}; nothing saved.")
        return

    # splitext swaps only the real extension (str.replace would also rewrite
    # an '.avi' occurring elsewhere in the name and ignore other extensions).
    stem = os.path.splitext(os.path.basename(video_path))[0]
    output_filename = os.path.join(output_dir, stem + '.npy')
    np.save(output_filename, np.array(sequence))

    print(f"Extracted keypoints saved to {output_filename}")

In [10]:
# Turn every recorded .avi clip in VIDEO_DIR into a keypoint array in RAW_DATA_DIR.
for video_file in os.listdir(VIDEO_DIR):
    if not video_file.endswith('.avi'):
        continue  # skip anything that is not a recorded clip
    video_path = os.path.join(VIDEO_DIR, video_file)
    extract_keypoints_from_video(video_path, output_dir=RAW_DATA_DIR)

Extracted keypoints saved to ../data\raw\bye_1.npy
Extracted keypoints saved to ../data\raw\bye_10.npy
Extracted keypoints saved to ../data\raw\bye_11.npy


KeyboardInterrupt: 

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import glob
import pickle
import random
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import csv


In [12]:
class GestureDataset(Dataset):
    """In-memory dataset of gesture keypoint sequences stored as ``.npy`` files.

    For every distinct label, all files matching ``<label>_*.npy`` in
    ``data_dir`` are loaded; files whose first dimension differs from
    ``sequence_length`` are skipped. Samples are shuffled once at construction
    using the global ``random`` module.

    Attributes:
        data: List of ``(keypoints_array, label)`` tuples.
        label_map: Dict mapping each label that has at least one usable file
            to a class index, numbered in order of first appearance.
    """

    def __init__(self, data_dir, labels, sequence_length=30):
        """
        Args:
            data_dir: Directory containing the ``.npy`` sequence files.
            labels: Iterable of gesture labels; repeated entries are loaded
                only once.
            sequence_length: Required number of frames per sample.
        """
        self.data_dir = data_dir
        self.labels = labels
        self.sequence_length = sequence_length
        self.data = []
        self.label_map = {}

        # dict.fromkeys de-duplicates while keeping first-seen order, so a
        # label listed N times no longer loads each of its files N times.
        for label in dict.fromkeys(labels):
            # Escape glob metacharacters so a path or label containing
            # '[', '*' or '?' is matched literally.
            pattern = os.path.join(glob.escape(data_dir), f"{glob.escape(label)}_*.npy")
            # Sorted for a reproducible pre-shuffle order across platforms.
            for file in sorted(glob.glob(pattern)):
                keypoints = np.load(file)
                if keypoints.shape[0] != sequence_length:
                    continue
                self.data.append((keypoints, label))
                # Assign the next free index the first time a label is used.
                self.label_map.setdefault(label, len(self.label_map))

        random.shuffle(self.data)  # Shuffle the dataset for better training

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(keypoints float32 tensor, class-index long tensor)`` for sample ``idx``."""
        keypoints, label = self.data[idx]
        return torch.tensor(keypoints, dtype=torch.float32), torch.tensor(self.label_map[label], dtype=torch.long)


In [13]:
def _dataset_from_samples(data_dir, samples, label_map, sequence_length):
    """Wrap already-loaded (keypoints, label) samples in a GestureDataset without re-reading files."""
    subset = GestureDataset(data_dir, [], sequence_length)  # empty label list -> nothing is loaded
    subset.data = list(samples)
    subset.label_map = label_map
    subset.labels = [label for _, label in samples]
    return subset


def load_data(data_dir, sequence_length=30, random_state=42):
    """Load all gesture sequences and split them into disjoint train/test datasets.

    Labels are taken from the part of each ``.npy`` file name before the first
    underscore. Every file is loaded once, the samples are split 80/20, and
    both resulting datasets share the same label-to-index mapping.

    Args:
        data_dir: Directory containing ``<label>_<n>.npy`` files.
        sequence_length: Required number of frames per sample.
        random_state: Seed used for both the dataset shuffle and the split, so
            repeated calls give the same partition as long as the directory
            contents and listing order are unchanged.

    Returns:
        tuple: ``(train_dataset, test_dataset, label_map)``.
    """
    # One entry per distinct label; a per-file list would make the dataset
    # load every file once for each file carrying the same label.
    labels = sorted({
        name.split('_')[0] for name in os.listdir(data_dir) if name.endswith('.npy')
    })

    # GestureDataset shuffles with the global `random` module. Seed it only
    # for the duration of the load so the split is reproducible across calls
    # (training and later evaluation call this function separately), then
    # restore the caller's RNG state.
    rng_state = random.getstate()
    random.seed(random_state)
    try:
        dataset = GestureDataset(data_dir, labels, sequence_length)
    finally:
        random.setstate(rng_state)

    train_data, test_data = train_test_split(
        dataset.data, test_size=0.2, random_state=random_state
    )

    # Build the subsets from the split samples themselves. Rebuilding them
    # from label names would reload every file into both sets (leaking test
    # data into training) and give each set its own label numbering.
    train_dataset = _dataset_from_samples(data_dir, train_data, dataset.label_map, sequence_length)
    test_dataset = _dataset_from_samples(data_dir, test_data, dataset.label_map, sequence_length)

    return train_dataset, test_dataset, dataset.label_map



In [25]:
class GestureRecognitionModel(nn.Module):
    """LSTM classifier over keypoint sequences.

    A 5-layer LSTM with 64 hidden units reads a batch-first sequence; the
    output at the last time step goes through ReLU and dropout (p=0.5) and a
    linear layer producing one score per class.
    """

    def __init__(self, input_size, num_classes):
        """
        Args:
            input_size: Number of features per frame.
            num_classes: Number of gesture classes.
        """
        super(GestureRecognitionModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size=64, num_layers=5, batch_first=True)
        self.fc = nn.Linear(64, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        # Not applied in forward(); kept so callers can turn logits into
        # probabilities with ``model.softmax(logits)``.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return raw class scores (logits).

        Args:
            x: Float tensor of shape ``(batch, sequence, input_size)``.

        Returns:
            Tensor of shape ``(batch, num_classes)`` holding unnormalized
            logits. ``nn.CrossEntropyLoss`` applies log-softmax itself, so the
            network must not apply softmax before the loss. The argmax over
            logits equals the argmax over softmax probabilities, so predicted
            classes are unaffected.
        """
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # keep only the last time step
        x = self.relu(x)
        x = self.dropout(x)
        return self.fc(x)


In [26]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; trains when an optimizer is given, else evaluates.

    Returns ``(mean loss per batch, accuracy)``.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.set_grad_enabled(training):
        for keypoints, labels in loader:
            keypoints, labels = keypoints.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(keypoints)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return loss_sum / len(loader), correct / total


def train_model(
    data_dir, model_save_path, sequence_length=30, num_epochs=50, batch_size=32
):
    """Train the gesture recognition model and checkpoint the best weights.

    Data comes from ``load_data``. Training uses Adam (lr=0.001) with a StepLR
    schedule (x0.1 every 10 epochs) and cross-entropy loss. Whenever the
    validation accuracy exceeds the best seen so far, the state dict is saved
    to ``model_save_path``.

    Args:
        data_dir: Directory with the ``.npy`` gesture sequences.
        model_save_path: Destination of the best checkpoint; its parent
            directory is created when missing.
        sequence_length: Frames per sample.
        num_epochs: Number of training epochs.
        batch_size: Mini-batch size for both loaders.

    Returns:
        tuple: ``(model, history)`` where ``model`` holds the weights of the
        LAST epoch (not necessarily the checkpointed best) and ``history`` is
        a list of ``(train_loss, train_acc, val_loss, val_acc)`` per epoch.

    Raises:
        ValueError: If the training or the test split is empty.
    """
    train_dataset, test_dataset, label_map = load_data(data_dir, sequence_length)
    if len(train_dataset) == 0 or len(test_dataset) == 0:
        # Fail with a clear message instead of an IndexError / ZeroDivisionError
        # further down.
        raise ValueError(
            f"No usable training/test samples found in {data_dir!r} "
            f"for sequence_length={sequence_length}."
        )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    input_size = train_dataset[0][0].shape[1]  # Number of features per frame
    num_classes = len(label_map)  # Number of unique gestures

    model = GestureRecognitionModel(input_size, num_classes)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # NOTE(review): CrossEntropyLoss applies log-softmax internally, so it
    # expects raw, unnormalized scores from the model.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    # torch.save does not create missing directories; make sure the target
    # folder exists before the first checkpoint is written.
    if isinstance(model_save_path, (str, os.PathLike)):
        checkpoint_dir = os.path.dirname(os.fspath(model_save_path))
        if checkpoint_dir:
            os.makedirs(checkpoint_dir, exist_ok=True)

    best_accuracy = 0.0
    history = []

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
        val_loss, val_acc = _run_epoch(model, test_loader, criterion, device)

        scheduler.step()

        history.append((train_loss, train_acc, val_loss, val_acc))

        print(
            f"Epoch [{epoch + 1}/{num_epochs}], "
            f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
            f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}"
        )

        if val_acc > best_accuracy:
            best_accuracy = val_acc
            torch.save(model.state_dict(), model_save_path)

    print("Training complete. Best validation accuracy: {:.4f}".format(best_accuracy))
    return model, history


In [16]:

def evaluate_model(model, test_loader, label_map):
    """Evaluate the model and show a classification report and confusion matrix.

    Args:
        model: Trained network returning one score per class.
        test_loader: Iterable of ``(keypoints, labels)`` batches.
        label_map: Dict mapping class name -> class index.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    all_labels = []
    all_preds = []

    with torch.no_grad():
        for keypoints, labels in test_loader:
            keypoints, labels = keypoints.to(device), labels.to(device)
            outputs = model(keypoints)
            predicted = outputs.argmax(dim=1)

            all_labels.extend(labels.cpu().tolist())
            all_preds.extend(predicted.cpu().tolist())

    # Order class names by their index rather than by dict insertion order, so
    # names always line up with the integer labels the model predicts.
    index_to_name = {index: name for name, index in label_map.items()}
    class_ids = sorted(index_to_name)
    label_list = [index_to_name[i] for i in class_ids]

    # Passing `labels` pins the full class set: without it the report raises
    # when a class is absent from the evaluated data, and the confusion matrix
    # would not match the tick labels.
    print(classification_report(all_labels, all_preds, labels=class_ids, target_names=label_list))

    # Confusion matrix
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_list, yticklabels=label_list)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

In [17]:
def plot_loss_accuracy(history):
    """Draw loss and accuracy curves side by side.

    Args:
        history: Sequence of per-epoch tuples
            ``(train_loss, train_acc, val_loss, val_acc)``.
    """
    epoch_axis = range(1, len(history) + 1)

    # Split the per-epoch tuples into one series per column up front.
    series = [[row[column] for row in history] for column in range(4)]

    # (subplot position, metric name, train column, train label, val column, val label)
    panels = (
        (1, 'Loss', 0, 'Train Loss', 2, 'Validation Loss'),
        (2, 'Accuracy', 1, 'Train Accuracy', 3, 'Validation Accuracy'),
    )

    plt.figure(figsize=(12, 5))

    for position, metric, train_col, train_label, val_col, val_label in panels:
        plt.subplot(1, 2, position)
        plt.plot(epoch_axis, series[train_col], label=train_label)
        plt.plot(epoch_axis, series[val_col], label=val_label)
        plt.title(metric)
        plt.xlabel('Epochs')
        plt.ylabel(metric)
        plt.legend()

    plt.show()

In [18]:
def save_model(model, model_save_path):
    """Save the model's state dict to ``model_save_path``.

    Args:
        model: Module whose ``state_dict()`` is written.
        model_save_path: Destination file; its parent directory is created
            when it does not exist yet.
    """
    # torch.save does not create missing directories.
    if isinstance(model_save_path, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(model_save_path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved to {model_save_path}")

In [19]:
# Checkpoint file used by the training, saving and loading cells below.
# NOTE(review): only LOG_DIR is created here; the "../models" folder is not.
MODEL_PATH = os.path.join("../models", "noface.v1.pth")
# Folder holding the pickled label map and the training-history CSV.
LOG_DIR = os.path.join("..", "logs")
os.makedirs(LOG_DIR, exist_ok=True)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
model, history = train_model(RAW_DATA_DIR, MODEL_PATH, sequence_length=30, num_epochs=50, batch_size=32)
plot_loss_accuracy(history)

# load_data returns (train_dataset, test_dataset, label_map): take the SECOND
# element as the held-out set (unpacking the first evaluated on training data).
_, test_dataset, label_map = load_data(RAW_DATA_DIR, sequence_length=30)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# train_model checkpoints the best-validation weights to MODEL_PATH while the
# returned model holds the last epoch. Restore the best weights so the model
# evaluated here is the one on disk, instead of overwriting the best
# checkpoint with last-epoch weights.
if os.path.exists(MODEL_PATH):
    model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
else:
    # No checkpoint was written (validation accuracy never rose above 0).
    save_model(model, MODEL_PATH)
evaluate_model(model, test_loader, label_map)

# Save the label map to a file
label_map_path = os.path.join(LOG_DIR, "label_map.pkl")
with open(label_map_path, 'wb') as f:
    pickle.dump(label_map, f)

# Save training history to a CSV file
history_path = os.path.join(LOG_DIR, "training_history.csv")
with open(history_path, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['Epoch', 'Train Loss', 'Train Accuracy', 'Validation Loss', 'Validation Accuracy'])
    for epoch, (train_loss, train_acc, val_loss, val_acc) in enumerate(history, start=1):
        writer.writerow([epoch, train_loss, train_acc, val_loss, val_acc])

Using device: cuda


In [None]:
def load_model(model_save_path, data_dir, sequence_length=30, batch_size=32):
    """Rebuild the network and load trained weights from ``model_save_path``.

    The input width and class count are derived from the data in ``data_dir``
    via ``load_data``, so that directory must hold the data the checkpoint was
    trained on.

    Args:
        model_save_path: Path of the saved state dict.
        data_dir: Directory with the ``.npy`` gesture sequences.
        sequence_length: Frames per sample, forwarded to ``load_data``.
        batch_size: Unused; kept for backward compatibility.

    Returns:
        The model in eval mode, moved to CUDA when available, otherwise CPU.
    """
    train_dataset, _, label_map = load_data(data_dir, sequence_length)
    input_size = train_dataset[0][0].shape[1]  # Number of features per frame
    num_classes = len(label_map)  # Number of unique gestures

    model = GestureRecognitionModel(input_size, num_classes)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # map_location lets a checkpoint saved from a GPU run load on a CPU-only
    # machine; without it torch.load tries to restore tensors onto CUDA.
    state_dict = torch.load(model_save_path, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    print(f"Model loaded from {model_save_path}")
    return model

In [None]:
def load_label_map(label_map_path):
    """Read a pickled label map from disk and return it.

    Only load files you trust: unpickling can execute arbitrary code.

    Args:
        label_map_path: Path of the pickle file to read.

    Returns:
        The unpickled object.
    """
    with open(label_map_path, 'rb') as handle:
        mapping = pickle.load(handle)

    print(f"Label map loaded from {label_map_path}")
    return mapping

In [None]:
def real_time_prediction():
    """Run live gesture recognition on the default webcam.

    Frames are turned into feature vectors with ``extract_enhanced_keypoints``.
    Every 30 frames the collected window is classified and the buffer is
    cleared; the most recent prediction stays on screen until the next one.
    The loop ends when 'q' is pressed or a frame cannot be read.
    """
    sequence_length = 30  # frames per classified window

    cap = cv2.VideoCapture(0)
    holistic = None
    try:
        model = load_model(MODEL_PATH, RAW_DATA_DIR)
        label_map = load_label_map(os.path.join(LOG_DIR, "label_map.pkl"))
        # Look class names up by index instead of relying on dict key order.
        index_to_label = {index: label for label, index in label_map.items()}
        # The input must live on the same device as the model's weights,
        # otherwise the forward pass raises when the model is on CUDA.
        model_device = next(model.parameters()).device

        holistic = mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.7,
            min_tracking_confidence=0.5
        )

        sequence = []
        predicted_label = None

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            keypoints, results = extract_enhanced_keypoints(frame, holistic)
            sequence.append(keypoints)

            # Classify once a full window has been collected
            if len(sequence) == sequence_length:
                batch = torch.tensor(np.array(sequence), dtype=torch.float32)
                batch = batch.unsqueeze(0).to(model_device)  # add batch dimension

                with torch.no_grad():
                    outputs = model(batch)
                class_index = outputs.argmax(dim=1).item()
                predicted_label = index_to_label.get(class_index, f"class {class_index}")

                # Start a fresh window for the next prediction
                sequence = []

            # Draw landmarks on the frame
            if results.right_hand_landmarks:
                mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
            if results.left_hand_landmarks:
                mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
            if results.pose_landmarks:
                mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

            # Keep showing the latest prediction; drawing it only on the frame
            # that triggered it would flash it for a single frame.
            if predicted_label is not None:
                cv2.putText(frame, f"Predicted: {predicted_label}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            cv2.imshow("Real-Time Gesture Recognition", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera, windows and MediaPipe graph on every exit path.
        cap.release()
        cv2.destroyAllWindows()
        if holistic is not None:
            holistic.close()

In [None]:
# Start the live webcam demo; runs until 'q' is pressed or the camera stops delivering frames.
real_time_prediction()