In [61]:
# In a notebook cell
# !pip install -r requirements.txt

In [62]:
# Import libraries

# Standard library
import gc
import json
import os
import threading
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List

# Third-party
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from datasets import load_dataset
from tqdm import tqdm

In [63]:
def get_next_version_dir(base="landmarks"):
    """Create and return the next free ``v<N>`` output directory under ``base``.

    Scans ``base`` for sub-directories named ``v`` followed by decimal digits,
    picks one past the highest number found (starting at 1), and creates
    ``<base>/v<N>/small`` and ``<base>/v<N>/full``.

    Args:
        base: Root directory for versioned outputs. It is created, including any
            missing parents, if needed.

    Returns:
        dict with ``version`` (int), ``small_dir`` and ``full_dir`` (Path).
    """
    base_path = Path(base)
    base_path.mkdir(parents=True, exist_ok=True)

    # isdecimal (not isdigit) so names like "v²" cannot reach int() and raise.
    existing_versions = [
        int(entry.name[1:])
        for entry in base_path.iterdir()
        if entry.is_dir() and entry.name.startswith("v") and entry.name[1:].isdecimal()
    ]
    next_version = max(existing_versions, default=0) + 1

    # Claim the version directory with a plain mkdir. If something already
    # occupies that name (another run, or a stray file), move on to the next
    # number instead of silently sharing or crashing.
    while True:
        version_path = base_path / f"v{next_version}"
        try:
            version_path.mkdir()
            break
        except FileExistsError:
            next_version += 1

    small_dir = version_path / "small"
    full_dir = version_path / "full"
    small_dir.mkdir(exist_ok=True)
    full_dir.mkdir(exist_ok=True)
    return {
        "version": next_version,
        "small_dir": small_dir,
        "full_dir": full_dir
    }


In [64]:
# Single MediaPipe Face Mesh instance, created once and shared by extract_landmarks.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    **{
        "static_image_mode": True,        # every image is processed independently
        "max_num_faces": 1,               # only the first face is used downstream
        "refine_landmarks": True,         # request the refined (iris) landmark set
        "min_detection_confidence": 0.5,
    }
)

I0000 00:00:1749098791.825257 3520520 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.4), renderer: Apple M4 Pro
W0000 00:00:1749098791.826526 3617385 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [65]:
def _save_split_images(split, split_dir, prefix, desc):
    """Save every image of one dataset split as JPEG under ``split_dir/<label>/``.

    Files are named ``<prefix>_<label>_<idx:05d>.jpg``. Returns the list of
    written file paths in dataset order.
    """
    saved_paths = []
    for idx, item in enumerate(tqdm(split, desc=desc)):
        # NOTE(review): assumes class id 0 means "awake" and anything else means
        # "drowsy"; confirm against dataset['train'].features['label'].names.
        label_name = 'awake' if item['label'] == 0 else 'drowsy'
        filepath = os.path.join(split_dir, label_name, f"{prefix}_{label_name}_{idx:05d}.jpg")
        image = item['image']
        # JPEG cannot store alpha/palette images (Pillow raises OSError for
        # e.g. RGBA or P), so convert those to RGB first.
        if image.mode not in ('RGB', 'L'):
            image = image.convert('RGB')
        image.save(filepath, 'JPEG', quality=95)
        saved_paths.append(filepath)
    return saved_paths


def prepare_createml_dataset(dataset, output_dir="createml_data"):
    """
    Create CreateML dataset structure and save images for both train and test separately.

    Layout: ``<output_dir>/{train,test}/{awake,drowsy}/*.jpg``.

    Args:
        dataset: Mapping with 'train' and 'test' splits. Each item has an
            'image' (an image object with ``.mode``/``.save``) and an integer
            'label'.
        output_dir: Root directory to write into (created if missing).

    Returns:
        (train_paths, test_paths): lists of the written JPEG file paths.
    """
    print("🚀 Starting CreateML dataset preparation...")

    train_dir = os.path.join(output_dir, "train")
    test_dir = os.path.join(output_dir, "test")

    # makedirs creates output_dir and the split dirs along the way.
    for split_dir in (train_dir, test_dir):
        for label_name in ('awake', 'drowsy'):
            os.makedirs(os.path.join(split_dir, label_name), exist_ok=True)

    print("💾 Saving train images...")
    train_paths = _save_split_images(dataset['train'], train_dir, "train", "Train images")

    print("💾 Saving test images...")
    test_paths = _save_split_images(dataset['test'], test_dir, "test", "Test images")

    print(f"✅ Dataset preparation complete!")
    print(f"📊 Train images: {len(train_paths)}, Test images: {len(test_paths)}")

    return train_paths, test_paths

W0000 00:00:1749098791.833322 3617382 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [66]:
def extract_landmarks(image_path, _process_lock=threading.Lock()):
    """Extract facial landmarks from a single image using MediaPipe Face Mesh.

    Reads ``image_path`` with OpenCV, resizes it to 640x480, converts BGR to
    RGB and runs the module-level ``face_mesh`` on it.

    Args:
        image_path: Path of the image file to read.
        _process_lock: Private, do not pass. A lock created once when the
            function is defined. It serializes calls into the shared
            ``face_mesh`` object when this function runs in a thread pool.

    Returns:
        A list of ``{'x', 'y', 'z'}`` dicts for the first detected face, or
        ``None`` if the image cannot be read, no face is found, or an error
        occurs. MediaPipe returns 468 points, or 478 when the mesh was created
        with ``refine_landmarks=True``.
    """
    try:
        image = cv2.imread(image_path)
        if image is None:
            return None

        # NOTE(review): a fixed 640x480 size ignores the source aspect ratio.
        # Kept as-is so landmarks stay comparable with earlier extractions.
        image = cv2.resize(image, (640, 480))
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # A MediaPipe solution object keeps per-call state (timestamps, output
        # buffer), so concurrent process() calls from several threads can race
        # and return another image's results. Serialize only this call; the
        # decode/resize work above still runs in parallel.
        with _process_lock:
            results = face_mesh.process(rgb_image)

        if not results.multi_face_landmarks:
            return None
        return [
            {'x': lm.x, 'y': lm.y, 'z': lm.z}
            for lm in results.multi_face_landmarks[0].landmark
        ]

    except Exception as e:
        # Best-effort per image: report and skip instead of failing the batch.
        print(f"❌ Error processing {image_path}: {e}")
        return None

In [67]:
class LandmarkProcessor:
    """Processes batches of images for landmark extraction.

    Each image path is passed to ``extract_func``. Paths for which it returns a
    truthy landmark list are collected together with an inferred class label.
    """

    # Class folder names that prepare_createml_dataset writes images into
    # (<split>/<label>/<file>).
    KNOWN_LABELS = ('awake', 'drowsy')

    def __init__(self, extract_func):
        """Store the per-image extraction callable (path -> landmarks or None)."""
        self.extract_func = extract_func

    @classmethod
    def infer_label(cls, image_path: str) -> str:
        """Return the class label for ``image_path``.

        Prefers the immediate parent directory name. This stops the word
        "drowsy" elsewhere in an absolute path (e.g. a project folder named
        "drowsy_detector") from flipping every label. Falls back to the
        original whole-path substring test for other layouts.
        """
        parent = Path(image_path).parent.name.lower()
        if parent in cls.KNOWN_LABELS:
            return parent
        return 'drowsy' if 'drowsy' in str(image_path).lower() else 'awake'

    def process_batch(self, image_paths_batch: List[str]) -> List[Dict]:
        """Process a batch of images and return landmark data.

        Returns one ``{'image_path', 'landmarks', 'label'}`` dict per image
        with detected landmarks, in input order. Images that yield no landmarks
        are skipped. Exceptions from ``extract_func`` are printed and skipped.
        """
        batch_results = []
        for image_path in image_paths_batch:
            try:
                landmarks = self.extract_func(image_path)
                if landmarks:
                    batch_results.append({
                        'image_path': image_path,
                        'landmarks': landmarks,
                        'label': self.infer_label(image_path)
                    })
            except Exception as e:
                print(f"⚠️ Error processing {image_path}: {e}")
        return batch_results

In [68]:
# MediaPipe Face Mesh landmark indices for the aspect-ratio features. Each list
# holds six indices p0..p5, combined as (|p1 - p5| + |p2 - p4|) / (2 * |p0 - p3|).
_LEFT_EYE_IDX = [362, 385, 387, 263, 373, 380]
_RIGHT_EYE_IDX = [33, 160, 158, 133, 153, 144]
# NOTE(review): several of these look like lower-lip points of the mesh
# (61/291 are the usual mouth corners). Confirm they give a meaningful mouth
# aspect ratio. Kept unchanged so existing CSV features stay comparable.
_MOUTH_IDX = [61, 84, 17, 314, 405, 320]


def _aspect_ratio(landmarks, idx):
    """Six-point aspect ratio (|p1-p5| + |p2-p4|) / (2*|p0-p3|) using x/y only.

    Returns 0.0 when |p0-p3| is not positive or when the landmark entries are
    malformed (missing index/key or non-numeric values).
    """
    try:
        pts = [(landmarks[i]['x'], landmarks[i]['y']) for i in idx]

        def dist(a, b):
            return np.linalg.norm(np.array([pts[a][0] - pts[b][0], pts[a][1] - pts[b][1]]))

        v1, v2, h = dist(1, 5), dist(2, 4), dist(0, 3)
        return (v1 + v2) / (2.0 * h) if h > 0 else 0.0
    except (IndexError, KeyError, TypeError, ValueError):
        return 0.0


def _landmark_row(item):
    """Flatten one extraction record into a CSV row dict.

    The row holds the path, the label, landmark_<i>_{x,y,z} for every landmark
    and, for full meshes, the EAR/MAR features.
    """
    landmarks = item['landmarks']
    row = {'image_path': item['image_path'], 'label': item['label']}
    for i, lm in enumerate(landmarks):
        row[f'landmark_{i}_x'] = lm['x']
        row[f'landmark_{i}_y'] = lm['y']
        row[f'landmark_{i}_z'] = lm['z']

    # The feature indices above all lie in the 468-point base mesh.
    if len(landmarks) >= 468:
        left_ear = _aspect_ratio(landmarks, _LEFT_EYE_IDX)
        right_ear = _aspect_ratio(landmarks, _RIGHT_EYE_IDX)
        row['left_eye_ear'] = left_ear
        row['right_eye_ear'] = right_ear
        row['avg_eye_ear'] = (left_ear + right_ear) / 2.0
        row['mouth_aspect_ratio'] = _aspect_ratio(landmarks, _MOUTH_IDX)
    return row


def process_with_threading(image_paths, extract_landmarks_func, batch_size=200, num_threads=8, phase_name=""):
    """Process images with multithreading and return landmarks + DataFrame.

    Args:
        image_paths: Image file paths to process.
        extract_landmarks_func: Callable path -> landmark list (or None).
        batch_size: Number of paths handed to one worker task (must be > 0).
        num_threads: Thread pool size.
        phase_name: Label used in log output and the progress bar.

    Returns:
        (all_landmarks, landmark_df). ``all_landmarks`` is the list of
        ``{'image_path', 'landmarks', 'label'}`` records in input order, and
        ``landmark_df`` has one row per record. Failed batches are reported and
        omitted.
    """
    print(f"🧵 Starting {phase_name} processing:")
    print(f"   📊 Images: {len(image_paths)}, Batch size: {batch_size}, Threads: {num_threads}")

    batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
    print(f"📦 Created {len(batches)} batches")

    start_time = time.perf_counter()
    # Results are stored by batch index so the output order does not depend on
    # which thread finishes first (as_completed yields in completion order).
    batch_outputs = [None] * len(batches)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        processor = LandmarkProcessor(extract_landmarks_func)
        future_to_idx = {
            executor.submit(processor.process_batch, batch): idx
            for idx, batch in enumerate(batches)
        }

        for future in tqdm(as_completed(future_to_idx), total=len(batches), desc=f"{phase_name} batches"):
            batch_idx = future_to_idx[future]
            try:
                batch_outputs[batch_idx] = future.result()
            except Exception as e:
                print(f"❌ Batch {batch_idx+1} failed: {e}")

    all_landmarks = [record for output in batch_outputs if output for record in output]

    total_time = time.perf_counter() - start_time
    speed = len(all_landmarks) / total_time if total_time > 0 else 0.0
    print(f"✅ {phase_name} processing complete!")
    print(f"   📊 Processed: {len(all_landmarks)}/{len(image_paths)} images")
    print(f"   ⏱️ Time: {total_time:.2f}s, Speed: {speed:.2f} images/sec")

    landmark_df = pd.DataFrame([_landmark_row(item) for item in all_landmarks])
    print(f"🔄 Created DataFrame: {landmark_df.shape[0]} rows, {landmark_df.shape[1]} columns")

    return all_landmarks, landmark_df


In [69]:
def _save_split(landmarks, df, csv_path, json_path, long_name, short_name):
    """Write one split's raw landmarks (JSON) and feature table (CSV), then report.

    Parent directories of both paths are created when they are non-empty.
    ``os.makedirs('')`` would raise for bare file names such as the defaults.
    """
    for path in (csv_path, json_path):
        parent = os.path.dirname(os.fspath(path))
        if parent:
            os.makedirs(parent, exist_ok=True)
    with open(json_path, 'w') as f:
        json.dump(landmarks, f, indent=2)
    df.to_csv(csv_path, index=False)
    print(f"💾 {long_name} data saved: {csv_path} ({len(df)} samples)")
    print(f"💾 Raw landmarks backup saved: {json_path}")

    labels = df['label'].value_counts()
    print(f"   📊 {short_name} label distribution: {labels.to_dict()}")


def process_separate_datasets(
    train_paths, test_paths, extract_landmarks_func,
    small_batch_size=5, full_batch_size=200, num_threads=8,
    csv_train_path="landmarks_train.csv", csv_test_path="landmarks_test.csv",
    json_train_path="landmarks_train.json", json_test_path="landmarks_test.json",
    confirm=True,
):
    """Process train and test datasets completely separately.

    Runs a small validation pass on the first ``small_batch_size`` training
    paths. It then extracts landmarks for the full train and test lists,
    writing each split's CSV and JSON files.

    Args:
        confirm: If True (default), ask interactively before the full run. Pass
            False for unattended, non-interactive execution.

    Returns:
        ``{'train_samples': int, 'test_samples': int}``. Returns None if
        validation found no landmarks or the user declined to continue.
    """
    print("🚀 Starting separate train/test landmark processing...")

    # Phase 1: Small batch validation
    print("🧪 Testing pipeline with small batch...")
    small_paths = train_paths[:small_batch_size]
    small_landmarks, _ = process_with_threading(
        small_paths, extract_landmarks_func,
        batch_size=small_batch_size,
        num_threads=min(2, num_threads),
        phase_name="VALIDATION"
    )

    if not small_landmarks:
        print("❌ Pipeline validation failed, exiting.")
        return None

    print(f"✅ Pipeline validated with {len(small_landmarks)} images")
    if confirm:
        cont = input("Proceed with full dataset processing? (y/n): ").strip().lower()
        if cont != 'y':
            print("⏹️ Aborting after validation.")
            return None

    # Phase 2: Process TRAINING dataset
    print("🔄 Processing TRAINING dataset...")
    train_landmarks, train_df = process_with_threading(
        train_paths, extract_landmarks_func,
        batch_size=full_batch_size,
        num_threads=num_threads,
        phase_name="TRAIN"
    )
    if train_landmarks:
        _save_split(train_landmarks, train_df, csv_train_path, json_train_path, "Training", "Train")

    # Phase 3: Process TESTING dataset
    print("🔄 Processing TESTING dataset...")
    test_landmarks, test_df = process_with_threading(
        test_paths, extract_landmarks_func,
        batch_size=full_batch_size,
        num_threads=num_threads,
        phase_name="TEST"
    )
    if test_landmarks:
        _save_split(test_landmarks, test_df, csv_test_path, json_test_path, "Testing", "Test")

    print("✅ Separate train/test processing complete!")
    return {
        'train_samples': len(train_df) if train_landmarks else 0,
        'test_samples': len(test_df) if test_landmarks else 0
    }


In [None]:
def _run_landmark_pipeline(out_dir, train_paths, test_paths, small_batch_size, full_batch_size, num_threads):
    """Run process_separate_datasets for one train/test pair, writing CSV/JSON into ``out_dir``."""
    out_dir = Path(out_dir)
    return process_separate_datasets(
        train_paths, test_paths, extract_landmarks,
        small_batch_size=small_batch_size,
        full_batch_size=full_batch_size,
        num_threads=num_threads,
        csv_train_path=out_dir / "landmarks_train.csv",
        csv_test_path=out_dir / "landmarks_test.csv",
        json_train_path=out_dir / "landmarks_train.json",
        json_test_path=out_dir / "landmarks_test.json"
    )


def _print_dataset_results(title, results):
    """Print sample counts from process_separate_datasets.

    ``results`` is None when that run's validation failed or was aborted; a
    skip notice is printed instead of raising TypeError.
    """
    print(f"\n📈 {title} Dataset Results:")
    if results is None:
        print("   ⏹️ Skipped (validation failed or processing was aborted)")
        return
    print(f"   Training samples: {results['train_samples']}")
    print(f"   Testing samples: {results['test_samples']}")


if __name__ == "__main__":
    # Pipeline settings. They are used both for the calls below and for the
    # settings printout, so the two cannot drift apart.
    SMALL_BATCH_SIZE = 5
    FULL_BATCH_SIZE = 200
    NUM_THREADS = 8
    SMALL_TRAIN_COUNT = 200
    SMALL_TEST_COUNT = 100

    try:
        print("🚀 Starting Driver Drowsiness Landmark Extraction...")

        # Load dataset
        print("📥 Loading dataset from Hugging Face...")
        dataset = load_dataset("akahana/Driver-Drowsiness-Dataset")
        print(f"📊 Dataset loaded - Train: {len(dataset['train'])}, Test: {len(dataset['test'])}")

        # Create both full and small versions
        print("📂 Creating small and full dataset splits...")
        small_dataset = {
            'train': dataset['train'].select(range(min(SMALL_TRAIN_COUNT, len(dataset['train'])))),
            'test': dataset['test'].select(range(min(SMALL_TEST_COUNT, len(dataset['test']))))
        }
        full_dataset = {
            'train': dataset['train'],
            'test': dataset['test']
        }

        # Get versioned output directories
        version_info = get_next_version_dir()
        print(f"📁 Using version v{version_info['version']}")

        print("💾 Preparing image files for SMALL dataset...")
        small_train_paths, small_test_paths = prepare_createml_dataset(
            small_dataset,
            output_dir=str(version_info["small_dir"])
        )

        print("💾 Preparing image files for FULL dataset...")
        full_train_paths, full_test_paths = prepare_createml_dataset(
            full_dataset,
            output_dir=str(version_info["full_dir"])
        )

        print("\n🔧 Optimized settings:")
        print(f"   - Small batch size: {SMALL_BATCH_SIZE}")
        print(f"   - Full batch size: {FULL_BATCH_SIZE}")
        print(f"   - Threads: {NUM_THREADS}")
        # full_dir is <base>/v<N>/full, so its parent already is the version dir.
        print(f"   - Output directory: {version_info['full_dir'].parent}")

        print("\n🔎 Processing SMALL dataset...")
        results_small = _run_landmark_pipeline(
            version_info["small_dir"], small_train_paths, small_test_paths,
            SMALL_BATCH_SIZE, FULL_BATCH_SIZE, NUM_THREADS
        )

        print("\n🚀 Processing FULL dataset...")
        results_full = _run_landmark_pipeline(
            version_info["full_dir"], full_train_paths, full_test_paths,
            SMALL_BATCH_SIZE, FULL_BATCH_SIZE, NUM_THREADS
        )

        _print_dataset_results("SMALL", results_small)
        _print_dataset_results("FULL", results_full)
        print(f"   Output: {version_info['full_dir']}")

        print("\n📋 Next Steps:")
        print(f"   1. Use .csv files for CreateML training and testing")
        print(f"   2. .json files are available for backup/analysis")
    finally:
        # Release the MediaPipe graph even if a step above raised.
        face_mesh.close()

    print("✅ All processing complete!")


ModuleNotFoundError: No module named 'utils'