In [None]:
import numpy as np
import cv2
import os
import mediapipe as mp
from pathlib import Path
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay, classification_report
from sklearn.preprocessing import LabelEncoder, StandardScaler
import matplotlib.pyplot as plt
from scipy import stats
from scipy.spatial.distance import euclidean
import joblib
import warnings
warnings.filterwarnings('ignore')

# Initialize MediaPipe Holistic
mp_holistic = mp.solutions.holistic


In [None]:
video_dir = Path("Data/INCLUDE/Society/Videos")
landmark_dir = Path("Data/INCLUDE/Society/Landmarks")

#Create same class named folder in Landmark directory 
class_names = []
# Use .iterdir() to loop through the contents of the directory
for item in video_dir.iterdir():
    # Check if the item is a directory before adding its name
    if item.is_dir():
        class_names.append(item.name)
        
# Loop through your list of names and create a new folder for each
for name in class_names:
    # Construct the full path for the new folder
    new_folder_path = os.path.join(landmark_dir, name)
    
    # Create the directory. exist_ok=True prevents an error if it already exists.
    os.makedirs(new_folder_path, exist_ok=True)
print("✅ Folders created successfully!")


In [None]:
# Globals to store expected face landmark count
EXPECTED_FACE_LANDMARKS = None

class AdvancedFeatureExtractor:
    def __init__(self):
        self.hand_landmarks = 21
        self.face_landmarks = 468
        
    def extract_hand_features(self, hand_landmarks):
        """Extract advanced hand features including velocity and acceleration"""
        if hand_landmarks.shape[0] == 0:
            return np.zeros(126)  # 42*3 features (basic + velocity + acceleration)
            
        features = []
        
        # Basic statistics
        features.extend([
            np.mean(hand_landmarks, axis=0),
            np.std(hand_landmarks, axis=0),
            np.min(hand_landmarks, axis=0),
            np.max(hand_landmarks, axis=0)
        ])
        
        # Velocity features (frame-to-frame differences)
        if hand_landmarks.shape[0] > 1:
            velocities = np.diff(hand_landmarks, axis=0)
            features.extend([
                np.mean(velocities, axis=0),
                np.std(velocities, axis=0),
            ])
        else:
            features.extend([np.zeros(42), np.zeros(42)])
            
        return np.concatenate(features)
    
    def extract_inter_hand_features(self, right_hand, left_hand):
        """Extract features between hands"""
        if right_hand.shape[0] == 0 or left_hand.shape[0] == 0:
            return np.zeros(6)
            
        # Reshape to (frames, 21, 2) for proper centroid calculation
        right_reshaped = right_hand.reshape(-1, 21, 2)
        left_reshaped = left_hand.reshape(-1, 21, 2)
        
        # Distance between hand centroids over time
        right_centroid = np.mean(right_reshaped, axis=1)
        left_centroid = np.mean(left_reshaped, axis=1)
        
        distances = [euclidean(r, l) for r, l in zip(right_centroid, left_centroid)]
        
        features = [
            np.mean(distances),
            np.std(distances),
            np.min(distances),
            np.max(distances)
        ]
        
        # Relative hand positions
        relative_positions = right_centroid - left_centroid
        features.extend([
            np.mean(relative_positions, axis=0),
            np.std(relative_positions, axis=0)
        ])
        
        return np.concatenate(features)

def extract_frame_landmarks(results):
    global EXPECTED_FACE_LANDMARKS
    lm = []
    
    # Right hand (21 points × x,y)
    if results.right_hand_landmarks:
        for p in results.right_hand_landmarks.landmark:
            lm.extend([p.x, p.y])
    else:
        lm.extend([0] * 42)
        
    # Left hand
    if results.left_hand_landmarks:
        for p in results.left_hand_landmarks.landmark:
            lm.extend([p.x, p.y])
    else:
        lm.extend([0] * 42)
        
    # Face landmarks
    face_lms = results.face_landmarks.landmark if results.face_landmarks else []
    count = len(face_lms)
    if EXPECTED_FACE_LANDMARKS is None:
        EXPECTED_FACE_LANDMARKS = count or 468
        
    for i in range(min(count, EXPECTED_FACE_LANDMARKS)):
        p = face_lms[i]
        lm.extend([p.x, p.y])
        
    missing = EXPECTED_FACE_LANDMARKS - min(count, EXPECTED_FACE_LANDMARKS)
    if missing > 0:
        lm.extend([0.0, 0.0] * missing)
    
    # Final consistency check
    total_features = 42 * 2 + EXPECTED_FACE_LANDMARKS * 2
    if len(lm) != total_features:
        raise ValueError(f"Inconsistent landmark length {len(lm)} vs expected {total_features}")
    return np.array(lm, dtype=np.float32)

def extract_advanced_features_from_sequence(sequence):
    """Extract advanced features from a landmark sequence"""
    feature_extractor = AdvancedFeatureExtractor()
    
    # Split sequence into components
    right_hand = sequence[:, :42]
    left_hand = sequence[:, 42:84]
    face = sequence[:, 84:]
    
    # Basic statistical features
    basic_features = np.concatenate([
        sequence.mean(axis=0),
        sequence.std(axis=0),
        sequence.min(axis=0),
        sequence.max(axis=0)
    ])
    
    # Advanced hand features
    right_hand_features = feature_extractor.extract_hand_features(right_hand)
    left_hand_features = feature_extractor.extract_hand_features(left_hand)
    
    # Inter-hand features
    inter_hand_features = feature_extractor.extract_inter_hand_features(right_hand, left_hand)
    
    # Temporal features
    temporal_features = []
    if sequence.shape[0] > 1:
        # Total variation (movement energy)
        total_variation = np.sum(np.abs(np.diff(sequence, axis=0)))
        temporal_features.append(total_variation)
        
        # Sequence length (normalized)
        temporal_features.append(sequence.shape[0] / 200.0)
    else:
        temporal_features.extend([0, 0])
    
    # Statistical shape features
    statistical_features = []
    try:
        statistical_features.extend([
            stats.skew(sequence, axis=0).mean(),  # Average skewness
            stats.kurtosis(sequence, axis=0).mean(),  # Average kurtosis
        ])
    except:
        statistical_features.extend([0, 0])
    
    # Combine all features
    all_features = np.concatenate([
        basic_features,
        right_hand_features,
        left_hand_features,
        inter_hand_features,
        temporal_features,
        statistical_features
    ])
    
    return all_features


In [None]:
def process_video(video_dir, landmark_dir, holistic_model):
    for file in os.listdir(video_dir):
        if not file.lower().endswith((".avi", ".mp4", ".mov")):
            continue
            
        video_path = os.path.join(video_dir, file)
        cap = cv2.VideoCapture(video_path)
        frames_lm = []
    
        print(f"Processing {file}...")
        frame_count = 0
    
        while True:
            ret, frame = cap.read()
            if not ret:
                break
                
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = holistic_model.process(rgb_frame)
            
            frame_landmarks = extract_frame_landmarks(results)
            frames_lm.append(frame_landmarks)
            frame_count += 1
    
        cap.release()
        
        if frames_lm:
            sequence = np.stack(frames_lm, axis=0)
            out_file = os.path.join(landmark_dir, file.rsplit(".", 1)[0] + ".npy")
            np.save(out_file, sequence)
            print(f"Saved landmarks to {out_file} with shape {sequence.shape}")


In [None]:
def load_data_advanced(data_root):
    """Load data with advanced feature extraction"""
    X, y = [], []
    feature_sizes = {}
    
    for category_folder in Path(data_root).iterdir():
        landmarks_root = category_folder / "Landmarks"
        if not landmarks_root.exists():
            continue
            
        for sign_folder in landmarks_root.iterdir():
            if not sign_folder.is_dir():
                continue
                
            for npy_path in sign_folder.glob("*.npy"):
                try:
                    seq = np.load(npy_path)
                    if seq.shape[0] == 0:
                        continue
                    
                    # Track feature dimensions
                    feature_dim = seq.shape[1]
                    if feature_dim not in feature_sizes:
                        feature_sizes[feature_dim] = 0
                    feature_sizes[feature_dim] += 1
                    
                    # Extract advanced features
                    advanced_features = extract_advanced_features_from_sequence(seq)
                    
                    X.append(advanced_features)
                    y.append(sign_folder.name)
                    
                except Exception as e:
                    print(f"Error processing {npy_path}: {e}")
                    continue
    
    print(f"Feature dimension distribution: {feature_sizes}")
    print(f"Loaded {len(X)} samples with {len(X[0]) if X else 0} features each")
    
    return np.vstack(X) if X else np.array([]), np.array(y)

def augment_data(X, y, augmentation_factor=1):
    """Augment data by adding noise"""
    X_aug, y_aug = [], []
    
    for i in range(len(X)):
        X_aug.append(X[i])
        y_aug.append(y[i])
        
        # Add noisy versions
        for _ in range(augmentation_factor):
            noise = np.random.normal(0, 0.01, X[i].shape)
            X_aug.append(X[i] + noise)
            y_aug.append(y[i])
            
    return np.vstack(X_aug), np.array(y_aug)


In [None]:
# =====================================================================================
#                         AUTOMATION CELL TO PROCESS ALL CLASSES
# =====================================================================================

BASE_VIDEO_PATH = video_dir
BASE_LANDMARK_PATH = landmark_dir

try:
    class_folders = [d for d in BASE_VIDEO_PATH.iterdir() if d.is_dir()]
    class_names = [d.name for d in class_folders]
    print(f"✅ Found {len(class_names)} classes: {class_names}\n")
except FileNotFoundError:
    print(f"❌ ERROR: The directory '{BASE_VIDEO_PATH}' was not found.")
    class_folders = []

class_averages = {}

# Process videos to landmarks
with mp_holistic.Holistic(
    static_image_mode=False,
    model_complexity=1,
    smooth_landmarks=True,
    refine_face_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
) as holistic:
    
    for class_dir in class_folders:
        class_name = class_dir.name
        print(f"================== Processing Class: {class_name} ==================")
        
        video_dir_for_class = class_dir
        landmark_dir_for_class = BASE_LANDMARK_PATH / class_name
        
        os.makedirs(landmark_dir_for_class, exist_ok=True)
        process_video(video_dir_for_class, landmark_dir_for_class, holistic)
        
        try:
            file_list = [f for f in os.listdir(landmark_dir_for_class) if f.endswith(".npy")]
            if file_list:
                frame_counts = [np.load(os.path.join(landmark_dir_for_class, f)).shape[0] for f in file_list]
                average_frame_count = np.mean(frame_counts)
                class_averages[class_name] = average_frame_count
                print(f"✅ Finished processing class '{class_name}' - Avg frames: {average_frame_count:.1f}\n")
        except Exception as e:
            print(f"❌ Error calculating average for '{class_name}': {e}\n")

print("========================================================")
print("                    📊 FINAL SUMMARY                    ")
print("========================================================")
for class_name, avg_frames in sorted(class_averages.items()):
    print(f"  ▶️  Class '{class_name}': {avg_frames:.2f} average frames")
