In [2]:
import cv2
import os
import mediapipe as mp
import numpy as np

# Define paths
# The dataset root may be overridden through the LIP_READING_DATASET
# environment variable so the notebook is not tied to one machine's drive
# layout; when the variable is unset the original location is used.
dataset_path = os.environ.get(
    "LIP_READING_DATASET",
    "D:/Downloads/DL LIP READING/lip-reading-dataset/data",
)
video_folder = os.path.join(dataset_path, "s1")
alignment_folder = os.path.join(dataset_path, "alignments/s1")
output_frames_path = "output_frames"

# Ensure output folder exists (exist_ok makes re-running the cell harmless)
os.makedirs(output_frames_path, exist_ok=True)

# Mediapipe setup for face and lip detection
# A single FaceMesh instance is created here and reused by extract_lip_region.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)

def extract_lip_region(frame):
    """Detect and extract the lip region using Mediapipe.

    The frame is converted from BGR to RGB and passed to the module-level
    ``face_mesh``. A bounding box is built around a fixed set of landmark
    indices of the first detected face, padded by a 5 pixel margin and
    clamped to the frame borders.

    Args:
        frame: 3-channel BGR image (the shape is unpacked as height, width,
            channels).

    Returns:
        The cropped sub-image of ``frame`` (a view, not a copy), or ``None``
        when no face is detected or the clamped box is empty.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(rgb_frame)

    if not results.multi_face_landmarks:
        return None

    # Only the first face is used: the original loop over faces returned
    # unconditionally during its first iteration.
    landmarks = results.multi_face_landmarks[0].landmark
    lip_indices = (61, 291, 185, 40, 39, 37, 0, 17, 13, 14, 178)

    h, w = frame.shape[:2]
    # Landmark coordinates are normalised, so scale them to pixel positions.
    xs = [int(landmarks[idx].x * w) for idx in lip_indices]
    ys = [int(landmarks[idx].y * h) for idx in lip_indices]

    margin = 5
    x_min, y_min = max(min(xs) - margin, 0), max(min(ys) - margin, 0)
    x_max, y_max = min(max(xs) + margin, w), min(max(ys) + margin, h)

    # Landmarks that fall outside the image leave nothing to crop after
    # clamping; an empty array would make later cv2 calls raise.
    if x_max <= x_min or y_max <= y_min:
        return None

    return frame[y_min:y_max, x_min:x_max]

def parse_alignment_file(video_name):
    """Parse corresponding .align file for a given video.

    Reads ``<alignment_folder>/<video_name>.align`` and collects the last
    whitespace-separated token of every non-blank line.

    Args:
        video_name: Video file name without its extension.

    Returns:
        The collected tokens in file order. An empty list is returned when
        the alignment file does not exist or holds no tokens.
    """
    align_file_path = os.path.join(alignment_folder, f"{video_name}.align")

    if not os.path.exists(align_file_path):
        print(f"Alignment file not found for {video_name}")
        return []

    words = []
    with open(align_file_path, "r") as f:
        for line in f:
            tokens = line.split()
            # Blank lines (for example a trailing newline at the end of the
            # file) have no tokens; indexing [-1] on them raised IndexError.
            if tokens:
                words.append(tokens[-1])

    # NOTE(review): every token is kept as a label, including any
    # silence/pause markers the alignment format may contain - confirm that
    # callers want those treated as words.
    if not words:
        print(f"No words extracted from {video_name}.align")
    else:
        print(f"Words in {video_name}: {words}")

    return words

def process_videos():
    """Process all .mpg videos in the dataset with correct frame alignment.

    For every ``.mpg`` file in ``video_folder``:

    1. the labels are read with ``parse_alignment_file``; videos without
       labels are skipped;
    2. each frame is cropped with ``extract_lip_region``, converted to
       grayscale and resized to 64x64;
    3. the kept frames are divided into equally sized consecutive runs, one
       per label, and written as JPEG files named
       ``<word>_frame_<n>.jpg`` under ``output_frames_path/<video name>``.

    The split is purely positional (frames // labels); frames left over after
    the integer division are not written.

    Returns:
        None. Results are written to disk and progress is printed.
    """
    for video_file in os.listdir(video_folder):
        if not video_file.endswith(".mpg"):
            continue

        video_name = os.path.splitext(video_file)[0]
        words = parse_alignment_file(video_name)

        if not words:
            print(f"Skipping {video_file}, no valid labels found.")
            continue

        video_output_folder = os.path.join(output_frames_path, video_name)
        os.makedirs(video_output_folder, exist_ok=True)

        cap = cv2.VideoCapture(os.path.join(video_folder, video_file))
        frame_count = 0
        frames_per_word = []

        # try/finally guarantees the capture is released even when a frame
        # fails to process.
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1

                lip_frame = extract_lip_region(frame)
                # An empty crop cannot be colour-converted or resized.
                if lip_frame is None or lip_frame.size == 0:
                    continue

                lip_frame_gray = cv2.cvtColor(lip_frame, cv2.COLOR_BGR2GRAY)
                frames_per_word.append(cv2.resize(lip_frame_gray, (64, 64)))
        finally:
            cap.release()

        # Ensure we have extracted frames
        if not frames_per_word:
            print(f"No frames extracted from {video_file}. Check preprocessing.")
            continue

        # Split frames equally per word
        frames_per_word_count = max(1, len(frames_per_word) // len(words))

        # Number of frames already written for each label in this video.
        # A label that occurs more than once continues its numbering instead
        # of restarting at 0, which previously overwrote the earlier files.
        saved_per_word = {}

        for i, word in enumerate(words):
            start_idx = i * frames_per_word_count
            end_idx = start_idx + frames_per_word_count
            for frame in frames_per_word[start_idx:end_idx]:
                j = saved_per_word.get(word, 0)
                output_filename = os.path.join(video_output_folder, f"{word}_frame_{j}.jpg")
                cv2.imwrite(output_filename, frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
                saved_per_word[word] = j + 1

        print(f"Processed {video_file}: {len(words)} words, {frame_count} frames extracted.")

# Entry point: run the whole preprocessing pass when this code is executed
# as the main module, then print a completion message.
if __name__ == "__main__":
    process_videos()
    print("Optimized preprocessing completed successfully!")


NameError: name 'core' is not defined

In [1]:
import cv2
import os
import mediapipe as mp
import numpy as np

# Define paths
# The dataset root may be overridden through the LIP_READING_DATASET
# environment variable so the notebook is not tied to one machine's drive
# layout; when the variable is unset the original location is used.
dataset_path = os.environ.get(
    "LIP_READING_DATASET",
    "D:/Downloads/DL LIP READING/lip-reading-dataset/data",
)
video_folder = os.path.join(dataset_path, "s1")
alignment_folder = os.path.join(dataset_path, "alignments/s1")
output_frames_path = "output_frames"

# Ensure output folder exists (exist_ok makes re-running the cell harmless)
os.makedirs(output_frames_path, exist_ok=True)

# Mediapipe setup for face and lip detection
# A single FaceMesh instance is created here and reused by extract_lip_region.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# CLAHE for contrast enhancement (applied to the grayscale lip crops)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))

# Lip landmark points (described by the author as inner and outer lips).
# NOTE(review): extract_lip_region in this cell builds its own landmark list
# and does not read this module-level one - confirm whether it is still needed.
lip_points = [61, 146, 91, 181, 84, 17, 314, 405, 321, 375, 291, 
              78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308]

def extract_lip_region(frame):
    """Extract full lip region for better feature learning.

    The frame is converted from BGR to RGB and passed to the module-level
    ``face_mesh``. A bounding box is built around a fixed set of landmark
    indices of the first detected face, padded by a 10 pixel margin and
    clamped to the frame borders.

    Args:
        frame: 3-channel BGR image (the shape is unpacked as height, width,
            channels).

    Returns:
        The cropped sub-image of ``frame`` (a view, not a copy), or ``None``
        when no face is detected or the clamped box is empty.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(rgb_frame)

    if not results.multi_face_landmarks:
        return None

    # Only the first face is used: the original loop over faces returned
    # unconditionally during its first iteration.
    landmarks = results.multi_face_landmarks[0].landmark

    # Include full lip region. Index 409 appeared twice in the original
    # list; a duplicate cannot change a min/max bounding box, so it is
    # listed once here.
    lip_indices = (
        61, 146, 91, 181, 84, 17, 314, 405, 321, 375,
        291, 409, 185, 40, 39, 37, 0, 267, 269, 270,
    )

    h, w = frame.shape[:2]
    # Landmark coordinates are normalised, so scale them to pixel positions.
    xs = [int(landmarks[idx].x * w) for idx in lip_indices]
    ys = [int(landmarks[idx].y * h) for idx in lip_indices]

    margin = 10  # Slightly increase margin
    x_min, y_min = max(min(xs) - margin, 0), max(min(ys) - margin, 0)
    x_max, y_max = min(max(xs) + margin, w), min(max(ys) + margin, h)

    # Landmarks that fall outside the image leave nothing to crop after
    # clamping; an empty array would make later cv2 calls raise.
    if x_max <= x_min or y_max <= y_min:
        return None

    return frame[y_min:y_max, x_min:x_max]


def parse_alignment_file(video_name):
    """Parse corresponding .align file for a given video.

    Reads ``<alignment_folder>/<video_name>.align`` and collects the last
    whitespace-separated token of every non-blank line.

    Args:
        video_name: Video file name without its extension.

    Returns:
        The collected tokens in file order. An empty list is returned when
        the alignment file does not exist or holds no tokens.
    """
    align_file_path = os.path.join(alignment_folder, f"{video_name}.align")

    if not os.path.exists(align_file_path):
        print(f"Alignment file not found for {video_name}")
        return []

    with open(align_file_path, "r") as f:
        # split() on a blank line yields an empty list; such lines are
        # dropped here because indexing [-1] on them raised IndexError.
        token_rows = [row for row in (line.split() for line in f) if row]

    words = [row[-1] for row in token_rows]

    # NOTE(review): every token is kept as a label, including any
    # silence/pause markers the alignment format may contain - confirm that
    # callers want those treated as words.
    if not words:
        print(f"No words extracted from {video_name}.align")
    else:
        print(f"Words in {video_name}: {words}")

    return words

import random

def augment_frame(frame):
    """Randomly perturb a frame with a flip and/or a brightness change.

    Two independent draws from ``random.random()`` decide what happens:
    a first draw above 0.5 flips the frame with ``cv2.flip(frame, 1)``, and a
    second draw above 0.7 multiplies the pixel values by a factor sampled
    uniformly from [0.7, 1.3], clips the result to 0-255 and casts it to
    ``uint8``. When neither draw fires the input object is returned as is.
    """
    augmented = frame

    # Step 1: optional horizontal flip.
    should_flip = random.random() > 0.5
    if should_flip:
        augmented = cv2.flip(augmented, 1)

    # Step 2: optional brightness scaling.
    should_rescale = random.random() > 0.7
    if should_rescale:
        brightness = random.uniform(0.7, 1.3)
        scaled = augmented * brightness
        augmented = np.clip(scaled, 0, 255).astype(np.uint8)

    return augmented



# Set (not a dictionary) collecting every distinct character found in the
# alignment words; process_videos() adds to it and the __main__ block later
# derives the character/index mappings from it.
unique_chars = set()

def process_videos():
    """Process all .mpg videos in the dataset and save sampled lip frames.

    For every ``.mpg`` file in ``video_folder``:

    1. the labels are read with ``parse_alignment_file`` and their characters
       are added to the module-level ``unique_chars`` set;
    2. each frame is cropped with ``extract_lip_region``, converted to
       grayscale, contrast-enhanced with ``clahe`` and resized to 128x128;
    3. frames whose summed absolute difference to the previously kept frame
       is below 500 are dropped as static;
    4. the kept frames are sub-sampled with a fixed step and written as JPEG
       files under ``output_frames_path/<video name>``.

    Returns:
        None. Results are written to disk and progress is printed.
    """
    global unique_chars  # Use the global set to collect unique characters

    for video_file in os.listdir(video_folder):
        if not video_file.endswith(".mpg"):
            continue

        video_name = os.path.splitext(video_file)[0]
        words = parse_alignment_file(video_name)

        if not words:
            print(f"Skipping {video_file}, no valid labels found.")
            continue

        # Collect characters from words
        for word in words:
            unique_chars.update(word)  # Add all characters to the set

        video_output_folder = os.path.join(output_frames_path, video_name)
        os.makedirs(video_output_folder, exist_ok=True)

        cap = cv2.VideoCapture(os.path.join(video_folder, video_file))
        frame_count = 0
        frames_per_word = []
        prev_frame = None  # Last kept, un-augmented frame for motion detection

        # try/finally guarantees the capture is released even when a frame
        # fails to process.
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                # Count every decoded frame up front; the counter used to sit
                # after the motion filter's `continue` and missed the
                # frames that were skipped as static.
                frame_count += 1

                lip_frame = extract_lip_region(frame)
                # An empty crop cannot be colour-converted or resized.
                if lip_frame is None or lip_frame.size == 0:
                    continue

                lip_frame_gray = cv2.cvtColor(lip_frame, cv2.COLOR_BGR2GRAY)
                lip_frame_gray = clahe.apply(lip_frame_gray)  # Contrast enhancement
                lip_frame_resized = cv2.resize(lip_frame_gray, (128, 128))

                # Motion filtering using frame difference. The comparison is
                # made on the un-augmented frames: a random flip or
                # brightness change would otherwise be measured as motion.
                if prev_frame is not None:
                    diff = cv2.absdiff(lip_frame_resized, prev_frame)
                    motion = np.sum(diff)
                    if motion < 500:  # Skip static frames
                        continue

                prev_frame = lip_frame_resized
                frames_per_word.append(augment_frame(lip_frame_resized))
        finally:
            cap.release()

        # Ensure we have extracted frames
        if not frames_per_word:
            print(f"No frames extracted from {video_file}. Check preprocessing.")
            continue

        # Verify frame count with words
        if len(frames_per_word) < len(words):
            print(f"Warning: Fewer frames ({len(frames_per_word)}) than words ({len(words)}) in {video_file}")

        # Frame sampling strategy: Equidistant selection
        step = max(1, len(frames_per_word) // len(words))
        selected_frames = frames_per_word[::step]

        # Save augmented frames
        # NOTE(review): every sampled frame is written under every word, so
        # the files of one word are not limited to the part of the video in
        # which that word is spoken, and a repeated word rewrites the same
        # file names. The intended word-to-frame mapping is not clear from
        # this code - confirm before training on these labels.
        # NOTE(review): frames were already augmented when collected and are
        # augmented again here - confirm the double augmentation is intended.
        for word in words:
            for j, frame in enumerate(selected_frames):
                output_filename = os.path.join(video_output_folder, f"{word}_frame_{j}.jpg")
                cv2.imwrite(output_filename, augment_frame(frame), [cv2.IMWRITE_JPEG_QUALITY, 85])

        print(f"Processed {video_file}: {len(words)} words, {frame_count} frames analyzed.")

if __name__ == "__main__":
    process_videos()

    # The vocabulary is only complete once every video has been processed,
    # so the lookup tables are built afterwards. Sorting keeps the numbering
    # stable between runs; indices start at 1.
    unique_chars = sorted(unique_chars)
    char_to_index = dict(zip(unique_chars, range(1, len(unique_chars) + 1)))
    index_to_char = {index: char for char, index in char_to_index.items()}

    print("Character to Index Mapping:", char_to_index)
    print("Index to Character Mapping:", index_to_char)
    print("Optimized preprocessing completed successfully!")



KeyboardInterrupt: 