<a href="https://colab.research.google.com/github/Buddika-Kasun/ML/blob/main/Dataset_Creation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install Dependencies
!pip install mediapipe opencv-python pandas numpy tqdm
!apt update && apt install -y ffmpeg

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import os
import json
from tqdm import tqdm
import shutil

In [None]:
# Mount Google Drive (most reliable for large files)
# NOTE(review): `google.colab` is presumably only importable inside a Colab runtime — confirm before running elsewhere.
from google.colab import drive
# Mount point is /content/drive; copy_from_drive() below reads from /content/drive/MyDrive/SLSL_Dataset.
drive.mount('/content/drive')

def copy_from_drive(drive_path='/content/drive/MyDrive/SLSL_Dataset',
                    dest_path='/content/raw_videos'):
    """Copy the structured dataset folder from Google Drive to local storage.

    The whole tree under ``drive_path`` is copied into ``dest_path`` (existing
    files are overwritten, extra files are kept), then a short per-sentence
    summary of the copied videos is printed.

    Args:
        drive_path: Source folder holding one sub-folder per sentence.
        dest_path: Local destination folder.

    Returns:
        None. Progress and errors are reported via ``print``.
    """
    # Same extensions that process_sentence_based_dataset() accepts, so the
    # summary matches what will actually be processed.
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    if not os.path.isdir(drive_path):
        print("❌ SLSL_Dataset folder not found in Google Drive")
        print(f"💡 Please make sure your folder is at: {drive_path}")
        return

    # Copy entire structure
    shutil.copytree(drive_path, dest_path, dirs_exist_ok=True)
    print("✅ Copied entire structure from Google Drive!")

    # Only directories are sentence folders; a stray file in the dataset root
    # would otherwise make os.listdir() below raise NotADirectoryError.
    sentences = sorted(
        entry for entry in os.listdir(dest_path)
        if os.path.isdir(os.path.join(dest_path, entry))
    )
    print(f"📁 Sentence folders copied: {len(sentences)}")
    for sentence in sentences:
        sentence_path = os.path.join(dest_path, sentence)
        video_count = sum(
            1 for f in os.listdir(sentence_path)
            if f.lower().endswith(video_extensions)
        )
        print(f"   {sentence}: {video_count} videos")

# Copy the dataset. Drive is already mounted by the drive.mount() call at the
# top of this cell, so mounting a second time here is unnecessary.
copy_from_drive()

In [None]:
# Initialize MediaPipe Holistic
# mp_holistic is the module alias used by setup_mediapipe() to build the model.
mp_holistic = mp.solutions.holistic
# NOTE(review): mp_drawing is not referenced anywhere else in the visible notebook.
mp_drawing = mp.solutions.drawing_utils

In [None]:
def setup_mediapipe():
    """Build and return the MediaPipe Holistic model configured for SLSL videos."""
    holistic_options = dict(
        # Video stream rather than independent still images
        static_image_mode=False,
        # Middle setting: balanced accuracy/speed
        model_complexity=1,
        # Temporal smoothing of landmarks across frames
        smooth_landmarks=True,
        # Segmentation masks are not needed for landmark extraction
        enable_segmentation=False,
        smooth_segmentation=True,
        # CRITICAL for accurate lip landmarks
        refine_face_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    )
    return mp_holistic.Holistic(**holistic_options)

In [None]:
# Create the module-level `holistic` model; process_sentence_based_dataset() reads this global.
# NOTE(review): main_pipeline() rebinds `holistic` to a fresh model, so this instance is replaced when the pipeline runs.
holistic = setup_mediapipe()
print("MediaPipe Holistic initialized!")

In [None]:
# Face Mesh indices treated as the lip region (simplified set). Kept as a
# frozenset because it is only used for membership tests; the original list
# repeated 78, 87 and 178, so there are 35 distinct indices.
_LIP_LANDMARK_INDICES = frozenset([
    # Outer lips
    61, 84, 314, 17, 87, 178, 88, 95, 78, 62, 96, 89,
    146, 91, 181, 76, 184, 74, 183, 42,
    # Inner lips
    13, 82, 81, 80, 191, 78, 312, 311, 310, 415, 308, 324,
    318, 402, 317, 14, 87, 178,
])

# Pose landmarks 0-24 are kept (labelled "upper body" by the original author).
_POSE_LANDMARK_COUNT = 25


def _flatten_landmarks(landmarks):
    """Flatten an iterable of landmarks into [x, y, z, visibility, ...]."""
    flat = []
    for landmark in landmarks:
        flat.extend((landmark.x, landmark.y, landmark.z, landmark.visibility))
    return flat


def extract_frame_landmarks(frame, holistic_model):
    """
    Extract hand, pose, face and lip landmarks from a single frame.

    Args:
        frame: BGR image as read by OpenCV.
        holistic_model: Object exposing ``process(rgb_image)`` whose result
            has ``left_hand_landmarks``, ``right_hand_landmarks``,
            ``pose_landmarks`` and ``face_landmarks`` attributes.

    Returns:
        Dictionary with keys ``left_hand``, ``right_hand``, ``pose``, ``face``,
        ``lip_roi`` and ``timestamp``. Each landmark entry is a flat list with
        4 values per landmark (x, y, z, visibility), or ``None`` when that part
        was not detected. ``timestamp`` is always ``None`` here; the caller
        fills it in.
    """
    # Convert BGR to RGB
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # NOTE(review): marking the array read-only follows the MediaPipe sample
    # code (presumably lets it avoid a copy) — confirm if this matters.
    frame_rgb.flags.writeable = False

    # Process with MediaPipe
    results = holistic_model.process(frame_rgb)

    landmarks_dict = {
        'left_hand': None,
        'right_hand': None,
        'pose': None,
        'face': None,
        'lip_roi': None,
        'timestamp': None
    }

    # Hands: 4 values per landmark (not 3) because visibility is stored too.
    if results.left_hand_landmarks:
        landmarks_dict['left_hand'] = _flatten_landmarks(
            results.left_hand_landmarks.landmark)

    if results.right_hand_landmarks:
        landmarks_dict['right_hand'] = _flatten_landmarks(
            results.right_hand_landmarks.landmark)

    # Pose: only the first _POSE_LANDMARK_COUNT landmarks, guarded against a
    # shorter landmark list.
    if results.pose_landmarks:
        pose_points = results.pose_landmarks.landmark
        kept = min(_POSE_LANDMARK_COUNT, len(pose_points))
        landmarks_dict['pose'] = _flatten_landmarks(
            pose_points[i] for i in range(kept))

    # Face: every landmark goes into 'face'; the lip subset is collected in the
    # same pass, in face-mesh index order.
    if results.face_landmarks:
        face_landmarks = []
        lip_landmarks = []
        for index, landmark in enumerate(results.face_landmarks.landmark):
            values = (landmark.x, landmark.y, landmark.z, landmark.visibility)
            face_landmarks.extend(values)
            if index in _LIP_LANDMARK_INDICES:
                lip_landmarks.extend(values)

        landmarks_dict['face'] = face_landmarks
        landmarks_dict['lip_roi'] = lip_landmarks

    return landmarks_dict

In [None]:
def process_single_video(video_path, holistic_model, max_frames=None):
    """
    Process a single video and extract landmarks from all frames.

    Args:
        video_path: Path of the video file to open with OpenCV.
        holistic_model: Model passed through to ``extract_frame_landmarks``.
        max_frames: Optional cap on the number of frames processed. ``None``
            (or 0) means no cap.

    Returns:
        List with one landmark dictionary per processed frame. Each has its
        ``timestamp`` set to ``frame_index / fps`` seconds, or ``None`` when
        the container reports no usable FPS. The list is empty when the video
        cannot be opened or has no frames.
    """
    cap = cv2.VideoCapture(video_path)
    frames_data = []
    frame_count = 0

    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"📹 Processing: {os.path.basename(video_path)}")
        print(f"   Frames: {total_frames}, FPS: {fps}")

        # A failed property read yields 0; dividing by it would raise.
        fps_known = fps > 0

        # Progress total: unknown frame counts become an open-ended bar, and
        # the frame cap shortens the bar so it can actually reach 100%.
        expected = total_frames if total_frames > 0 else None
        if max_frames:
            expected = max_frames if expected is None else min(expected, max_frames)

        with tqdm(total=expected, desc="Extracting landmarks") as pbar:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if max_frames and frame_count >= max_frames:
                    break

                # Extract landmarks
                landmarks = extract_frame_landmarks(frame, holistic_model)
                landmarks['timestamp'] = frame_count / fps if fps_known else None
                frames_data.append(landmarks)

                frame_count += 1
                pbar.update(1)
    finally:
        # Release the capture even if landmark extraction raises.
        cap.release()

    print(f"   ✅ Extracted {len(frames_data)} frames")
    return frames_data

In [None]:
_VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv')


def _parse_signer_and_rep(video_file):
    """Return (signer_id, rep_number) parsed from names like signer_01_rep_1.mp4."""
    # splitext strips whatever extension the file has; stripping only '.mp4'
    # left e.g. '1.avi' as the repetition number for other formats.
    parts = os.path.splitext(video_file)[0].split('_')
    signer_id = parts[1] if len(parts) > 1 else "unknown"
    rep_number = parts[3] if len(parts) > 3 else "1"
    return signer_id, rep_number


def process_sentence_based_dataset(raw_videos_root='/content/raw_videos',
                                 output_folder='/content/landmarks_data',
                                 test_mode=False,
                                 metadata_folder='/content/metadata'):
    """
    Main function to process the entire sentence-based dataset.

    Every sub-folder of ``raw_videos_root`` is treated as one sentence and
    every video inside it is run through ``process_single_video`` using the
    module-level ``holistic`` model. Landmarks are saved as one ``.npy`` file
    per video; a metadata CSV and a sentence-to-index JSON mapping are written
    to ``metadata_folder``.

    Args:
        raw_videos_root: Folder containing one sub-folder per sentence.
        output_folder: Destination for the ``.npy`` landmark files (created if
            missing).
        test_mode: When True, only the first video of each sentence is
            processed and each video is capped at 50 frames.
        metadata_folder: Destination for the CSV and JSON files (created if
            missing).

    Returns:
        ``(metadata_df, sentence_mapping)``, or ``(None, None)`` when the root
        folder is missing or no metadata row was produced.
    """
    if not os.path.isdir(raw_videos_root):
        print(f"❌ Raw videos folder not found: {raw_videos_root}")
        return None, None

    # np.save() does not create missing parent folders.
    os.makedirs(output_folder, exist_ok=True)

    metadata = []
    written_files = set()

    print("🔍 Scanning for sentence folders...")

    # Sorted so the sentence -> index mapping (and the video picked in test
    # mode) is the same on every run; os.listdir() order is arbitrary.
    sentence_folders = sorted(
        item for item in os.listdir(raw_videos_root)
        if os.path.isdir(os.path.join(raw_videos_root, item))
    )

    print(f"📁 Found {len(sentence_folders)} sentence folders: {sentence_folders}")

    # Limit frames in test mode
    max_frames = 50 if test_mode else None

    # Process each sentence folder
    for sentence in sentence_folders:
        sentence_path = os.path.join(raw_videos_root, sentence)
        video_files = sorted(
            f for f in os.listdir(sentence_path)
            if f.lower().endswith(_VIDEO_EXTENSIONS)
        )

        print(f"\n🎬 Processing sentence: '{sentence}'")
        print(f"   Found {len(video_files)} videos")

        # Test mode: process only 1 video per sentence
        if test_mode:
            video_files = video_files[:1]
            print(f"   TEST MODE: Processing only 1 video")

        for video_file in video_files:
            video_path = os.path.join(sentence_path, video_file)

            # Parse filename (expected: signer_01_rep_1.mp4)
            signer_id, rep_number = _parse_signer_and_rep(video_file)

            try:
                # NOTE: relies on the module-level `holistic` model.
                landmarks_sequence = process_single_video(video_path, holistic, max_frames)

                if not landmarks_sequence:
                    print(f"   ❌ No landmarks extracted: {video_file}")
                    continue

                # Create output filename. Files that parse to the same
                # signer/repetition (e.g. names not following the pattern)
                # get the original stem appended instead of silently
                # overwriting an earlier result from this run.
                output_filename = f"{sentence}_signer_{signer_id}_rep_{rep_number}.npy"
                if output_filename in written_files:
                    stem = os.path.splitext(video_file)[0]
                    output_filename = (
                        f"{sentence}_signer_{signer_id}_rep_{rep_number}_{stem}.npy"
                    )
                    print(f"   ⚠️ Name collision, saving as: {output_filename}")
                written_files.add(output_filename)
                output_path = os.path.join(output_folder, output_filename)

                # Save landmarks: the list of per-frame dicts is stored as a
                # pickled object array, so loading needs allow_pickle=True.
                np.save(output_path, landmarks_sequence)

                # Calculate landmark statistics
                total = len(landmarks_sequence)
                left_hand_frames = sum(1 for frame in landmarks_sequence if frame['left_hand'] is not None)
                right_hand_frames = sum(1 for frame in landmarks_sequence if frame['right_hand'] is not None)
                lip_frames = sum(1 for frame in landmarks_sequence if frame['lip_roi'] is not None)

                # Add to metadata
                metadata.append({
                    'landmarks_file': output_filename,
                    'sentence': sentence,
                    'signer_id': signer_id,
                    'rep_number': rep_number,
                    'original_video': video_file,
                    'video_path': video_path,
                    'total_frames': total,
                    'left_hand_frames': left_hand_frames,
                    'right_hand_frames': right_hand_frames,
                    'lip_frames': lip_frames,
                    'left_hand_coverage': (left_hand_frames / total) * 100,
                    'right_hand_coverage': (right_hand_frames / total) * 100,
                    'lip_coverage': (lip_frames / total) * 100,
                    'success': True
                })

                print(f"   ✅ Saved: {output_filename}")

            except Exception as e:
                # Deliberate best-effort: one broken video must not abort the
                # whole run; the failure is recorded in the metadata instead.
                print(f"   ❌ Error processing {video_file}: {str(e)}")
                metadata.append({
                    'landmarks_file': 'FAILED',
                    'sentence': sentence,
                    'signer_id': signer_id,
                    'rep_number': rep_number,
                    'original_video': video_file,
                    'error': str(e),
                    'success': False
                })

    if not metadata:
        print("❌ No videos processed successfully!")
        return None, None

    # Save metadata
    os.makedirs(metadata_folder, exist_ok=True)
    metadata_df = pd.DataFrame(metadata)
    metadata_path = os.path.join(metadata_folder, 'sentence_dataset_metadata.csv')
    metadata_df.to_csv(metadata_path, index=False)

    # Save sentence mapping (index = order of first appearance)
    unique_sentences = metadata_df['sentence'].unique()
    sentence_mapping = {sentence: idx for idx, sentence in enumerate(unique_sentences)}

    mapping_path = os.path.join(metadata_folder, 'sentence_mapping.json')
    with open(mapping_path, 'w') as f:
        json.dump(sentence_mapping, f, indent=2)

    print(f"\n📊 Metadata saved: {metadata_path}")
    print(f"📝 Sentence mapping saved: {mapping_path}")

    return metadata_df, sentence_mapping

In [None]:
def analyze_sentence_dataset(metadata_df):
    """Print a summary report for the sentence-based dataset.

    Args:
        metadata_df: DataFrame as produced by
            ``process_sentence_based_dataset``: one row per video with a
            boolean ``success`` column, ``sentence`` and ``signer_id``, and,
            for successful rows, ``total_frames`` plus the
            ``*_coverage`` percentage columns.

    Returns:
        None. The report is written with ``print``. A missing or empty frame
        produces a short notice instead of an error.
    """
    print("📊 SENTENCE DATASET ANALYSIS REPORT")
    print("=" * 60)

    # Guard: an empty frame has no 'success' column and would divide by zero.
    if (metadata_df is None or len(metadata_df) == 0
            or 'success' not in metadata_df.columns):
        print("\n⚠️ No metadata to analyze.")
        return

    successful_df = metadata_df[metadata_df['success'].astype(bool)]

    # Basic statistics
    total_videos = len(metadata_df)
    successful_videos = len(successful_df)

    print(f"\n📈 BASIC STATISTICS:")
    print(f"   Total videos processed: {total_videos}")
    print(f"   Successful processing: {successful_videos}")
    print(f"   Success rate: {(successful_videos/total_videos)*100:.1f}%")

    # By sentence
    print(f"\n📝 BY SENTENCE:")
    for sentence, count in successful_df['sentence'].value_counts().items():
        print(f"   '{sentence}': {count} videos")

    # By signer
    print(f"\n👥 BY SIGNER:")
    for signer, count in successful_df['signer_id'].value_counts().items():
        print(f"   Signer {signer}: {count} videos")

    # Coverage and frame columns only exist when at least one video succeeded.
    if successful_videos == 0:
        return

    # Landmark coverage
    avg_left_hand = successful_df['left_hand_coverage'].mean()
    avg_right_hand = successful_df['right_hand_coverage'].mean()
    avg_lips = successful_df['lip_coverage'].mean()

    print(f"\n🖐️ AVERAGE LANDMARK COVERAGE:")
    print(f"   Left Hand: {avg_left_hand:.1f}%")
    print(f"   Right Hand: {avg_right_hand:.1f}%")
    print(f"   Lip Landmarks: {avg_lips:.1f}%")

    # Frame statistics
    avg_frames = successful_df['total_frames'].mean()
    # Failure rows put NaN in this column, which makes it float; cast back so
    # the frame count is not printed as e.g. "120.0".
    total_frames = int(successful_df['total_frames'].sum())
    print(f"\n🎞️ FRAME STATISTICS:")
    print(f"   Average frames per video: {avg_frames:.0f}")
    print(f"   Total frames processed: {total_frames}")

def verify_landmark_files(landmarks_folder='/content/landmarks_data', max_samples=3):
    """Verify the created landmark files by printing a summary of a few samples.

    Args:
        landmarks_folder: Folder containing the ``.npy`` landmark files.
        max_samples: How many files (in name order) to load and inspect.

    Returns:
        None. Findings are written with ``print``; a missing folder is
        reported instead of raising.
    """
    print("\n🔍 VERIFYING LANDMARK FILES...")

    if not os.path.isdir(landmarks_folder):
        print(f"   ❌ Landmarks folder not found: {landmarks_folder}")
        return

    # Sorted so the inspected samples are the same on every run.
    landmark_files = sorted(
        f for f in os.listdir(landmarks_folder) if f.endswith('.npy')
    )
    print(f"   Found {len(landmark_files)} landmark files")

    for i, file in enumerate(landmark_files[:max_samples], start=1):
        file_path = os.path.join(landmarks_folder, file)
        # SECURITY: allow_pickle=True executes pickled code on load. It is
        # needed here because the files store lists of dicts; only load files
        # produced by this pipeline.
        data = np.load(file_path, allow_pickle=True)

        print(f"\n   Sample {i}: {file}")
        print(f"     Frames: {len(data)}")
        if len(data) > 0:
            first_frame = data[0]
            print(f"     Keys: {list(first_frame.keys())}")
            print(f"     Left hand present: {first_frame['left_hand'] is not None}")
            print(f"     Right hand present: {first_frame['right_hand'] is not None}")
            print(f"     Lip landmarks present: {first_frame['lip_roi'] is not None}")

In [None]:
def main_pipeline(test_mode=True):
    """
    Complete pipeline for sentence-based dataset creation.

    Prepares the working folders, (re)creates the module-level ``holistic``
    model, waits for the user to confirm the videos are uploaded, then
    processes, analyzes and verifies the dataset.

    Args:
        test_mode: Passed to ``process_sentence_based_dataset``. Set
            ``test_mode=False`` for full processing.

    Returns:
        ``(metadata_df, sentence_mapping)`` on success, ``(None, None)`` when
        processing produced no metadata.
    """
    print("🚀 STARTING SENTENCE-BASED SLSL DATASET CREATION")
    print("=" * 60)

    # Step 1: Setup
    print("\n📋 STEP 1: Environment Setup")
    # NOTE(review): create_sentence_based_structure() is not defined in the
    # visible notebook, so calling it unconditionally raised NameError. Use it
    # when some other cell defines it; otherwise just make sure the folders
    # this pipeline reads from and writes to exist.
    structure_setup = globals().get('create_sentence_based_structure')
    if callable(structure_setup):
        structure_setup()
    else:
        for folder in ('/content/raw_videos',
                       '/content/landmarks_data',
                       '/content/metadata'):
            os.makedirs(folder, exist_ok=True)

    # Step 2: Initialize MediaPipe
    print("\n📋 STEP 2: MediaPipe Initialization")
    # process_sentence_based_dataset() reads this global.
    global holistic
    holistic = setup_mediapipe()

    # Step 3: Process dataset
    print("\n📋 STEP 3: Processing Videos")
    print("💡 Upload your videos to sentence folders in '/content/raw_videos/'")
    print("   Folder structure:")
    print("   raw_videos/")
    print("   ├── where_does_it_hurt/")
    print("   │   ├── signer_01_rep_1.mp4")
    print("   │   └── ...")
    print("   ├── i_have_a_headache/")
    print("   │   └── ...")
    print("   └── ...")

    # Blocks until the user confirms the upload is done.
    input("⏰ Press Enter after uploading videos...")

    metadata_df, sentence_mapping = process_sentence_based_dataset(test_mode=test_mode)

    if metadata_df is None:
        print("❌ Dataset creation failed!")
        return None, None

    # Step 4: Analyze results
    print("\n📋 STEP 4: Dataset Analysis")
    analyze_sentence_dataset(metadata_df)

    # Step 5: Verify the saved files
    print("\n📋 STEP 5: Verification")
    verify_landmark_files()

    print(f"\n🎉 DATASET CREATION COMPLETE!")
    print(f"📁 Landmarks saved in: /content/landmarks_data/")
    print(f"📊 Metadata saved in: /content/metadata/")

    return metadata_df, sentence_mapping

In [None]:
# 🎯 EXECUTE THE PIPELINE
# For testing (process 1 video per sentence)
# test_mode=True also caps each video at 50 frames; the call blocks on an input() prompt before processing starts.
metadata, mapping = main_pipeline(test_mode=True)

In [None]:
# For full processing (uncomment when ready)
# metadata, mapping = main_pipeline(test_mode=False)