In [1]:
import os
import sys

# Redirect stdout to /dev/null
original_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')

for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Restore stdout
sys.stdout = original_stdout

# print the acknowledgement
print("Finished processing files.")

Finished processing files.


In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, BatchNormalization, Reshape
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from glob import glob
import cv2

# -------------------------------------------------------------------
# Data Generator for Lip Reading
# -------------------------------------------------------------------
class LipReadingDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, data_path, alignment_path, batch_size=16, frame_length=75, 
                 image_height=46, image_width=140, video_paths=None, alignment_paths=None, 
                 fixed_vocabulary=None, **kwargs):
        super().__init__(**kwargs)
        self.data_path = data_path
        self.alignment_path = alignment_path
        self.batch_size = batch_size
        self.frame_length = frame_length
        self.image_height = image_height
        self.image_width = image_width

        # Use provided paths or search the directory
        if video_paths is None:
            self.video_paths = sorted(glob(os.path.join(data_path, '*.mpg')))
        else:
            self.video_paths = video_paths
        
        if alignment_paths is None:
            self.alignment_paths = sorted(glob(os.path.join(alignment_path, '*.align')))
        else:
            self.alignment_paths = alignment_paths
        
        print(f"Found {len(self.video_paths)} video files and {len(self.alignment_paths)} alignment files")
        
        # Build vocabulary either from the fixed vocabulary provided or from the data.
        if fixed_vocabulary is not None:
            self.vocabulary = fixed_vocabulary
        else:
            self.vocabulary = self._create_word_vocabulary()
            
        # Create lookup layers for converting between words and numbers.
        self.char_to_num = tf.keras.layers.StringLookup(
            vocabulary=self.vocabulary, oov_token=""
        )
        self.num_to_char = tf.keras.layers.StringLookup(
            vocabulary=self.vocabulary, oov_token="", invert=True
        )

    def _create_word_vocabulary(self):
        words = set()
        print(f"Processing alignment files from: {self.alignment_path}")
        for align_path in self.alignment_paths:
            try:
                with open(align_path, 'r') as f:
                    content = f.read().strip().split()
                    # The alignment file is assumed to have a pattern where every third token (starting at index 2) is a word.
                    words.update([content[i] for i in range(2, len(content), 3)])
            except Exception as e:
                print(f"Error processing {align_path}: {str(e)}")
        # Remove the silence token if present
        words.discard('sil')
        vocabulary = sorted(list(words))
        if not vocabulary:
            print("No words found in alignment files. Using default vocabulary.")
            vocabulary = ['bin', 'blue', 'at', 'f', 'two', 'now']
        print(f"Vocabulary size: {len(vocabulary)}")
        return vocabulary

    def __len__(self):
        return max(1, len(self.video_paths) // self.batch_size)
    
    def _process_video(self, video_path):
        frames = []
        cap = cv2.VideoCapture(video_path)
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Convert to grayscale
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Crop to the mouth region (hard-coded crop coordinates)
            mouth = gray[190:236, 80:220]
            # Resize the cropped image to the desired size
            mouth = cv2.resize(mouth, (self.image_width, self.image_height))
            frames.append(mouth)
        cap.release()
        frames = np.array(frames, dtype=np.float32)
        # Normalize the frames: zero mean and unit variance
        frames = (frames - frames.mean()) / (frames.std() + 1e-6)
        # Pad if the number of frames is less than frame_length, or trim if more
        if len(frames) < self.frame_length:
            pad_length = self.frame_length - len(frames)
            frames = np.pad(frames, ((0, pad_length), (0, 0), (0, 0)), mode='constant')
        else:
            frames = frames[:self.frame_length]
        return frames
    
    def _process_alignment(self, alignment_path):
        with open(alignment_path, 'r') as f:
            content = f.read().strip().split()
        # Extract every third token starting at index 2 (ignoring 'sil')
        words = [content[i] for i in range(2, len(content), 3) if content[i] != 'sil']
        text = ' '.join(words)
        # Convert words to numerical IDs using the lookup layer
        return self.char_to_num(tf.convert_to_tensor(text.split()))
    
    def __getitem__(self, idx):
        batch_videos = self.video_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_alignments = self.alignment_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        # Initialize arrays for the batch data.
        X = np.zeros((len(batch_videos), self.frame_length, self.image_height, self.image_width, 1))
        Y = np.zeros((len(batch_videos), len(self.vocabulary)))
        
        for i, (video_path, align_path) in enumerate(zip(batch_videos, batch_alignments)):
            frames = self._process_video(video_path)
            # Add channel dimension (1 for grayscale)
            X[i] = frames.reshape(self.frame_length, self.image_height, self.image_width, 1)
            
            # Process alignment and convert to a multi-hot vector.
            labels = self._process_alignment(align_path)
            # One-hot encode each label and then use reduce_max to combine them into a single vector.
            Y[i] = tf.reduce_max(tf.one_hot(labels, len(self.vocabulary)), axis=0)
        
        return X, Y

# -------------------------------------------------------------------
# Model Architecture
# -------------------------------------------------------------------
def build_model(frame_length, image_height, image_width, vocabulary_size):
    model = Sequential([
        tf.keras.Input(shape=(frame_length, image_height, image_width, 1)),
        # First 3D convolution block
        Conv3D(64, kernel_size=(3, 3, 3), activation='relu'),
        MaxPool3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        
        # Second 3D convolution block
        Conv3D(128, kernel_size=(3, 3, 3), activation='relu'),
        MaxPool3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        
        # Third 3D convolution block
        Conv3D(256, kernel_size=(3, 3, 3), activation='relu'),
        MaxPool3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        
        # Reshape to combine the spatial dimensions into a feature vector per time step.
        Reshape((-1, 256)),
        
        # Temporal modeling with Bidirectional LSTMs
        Bidirectional(LSTM(128, return_sequences=True)),
        Dropout(0.5),
        Bidirectional(LSTM(64)),
        Dropout(0.5),
        
        # Dense layers for classification
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(vocabulary_size, activation='softmax')
    ])
    
    return model

# -------------------------------------------------------------------
# Revised Training Routine
# -------------------------------------------------------------------
def train_and_save_model(data_dir, alignment_dir, batch_size=16, epochs=30):
    # -----------------------------------------------------------
    # Step 1: Build the full vocabulary using all available data.
    # -----------------------------------------------------------
    full_data_generator = LipReadingDataGenerator(data_dir, alignment_dir, batch_size=batch_size)
    full_vocabulary = full_data_generator.vocabulary
    
    # Retrieve all file paths (assumes they are in the same order)
    all_video_paths = full_data_generator.video_paths
    all_align_paths = full_data_generator.alignment_paths
    
    total_files = len(all_video_paths)
    print(f"Total number of files: {total_files}")
    
    # -----------------------------------------------------------
    # Step 2: Split the data into training and validation sets.
    # Training: first half of the data.
    # Validation: 70% of the second half.
    # -----------------------------------------------------------
    mid_point = total_files // 2
    train_video_paths = all_video_paths[:mid_point]
    train_align_paths = all_align_paths[:mid_point]
    
    second_half_video_paths = all_video_paths[mid_point:]
    second_half_align_paths = all_align_paths[mid_point:]
    
    # Use 70% of the second half for validation.
    val_count = int(len(second_half_video_paths) * 0.7)
    val_video_paths = second_half_video_paths[:val_count]
    val_align_paths = second_half_align_paths[:val_count]
    
    print(f"Training on {len(train_video_paths)} files.")
    print(f"Validating on {len(val_video_paths)} files.")
    
    # -----------------------------------------------------------
    # Step 3: Create data generators for training and validation.
    # -----------------------------------------------------------
    train_generator = LipReadingDataGenerator(
        data_path=data_dir, 
        alignment_path=alignment_dir, 
        batch_size=batch_size,
        video_paths=train_video_paths,
        alignment_paths=train_align_paths,
        fixed_vocabulary=full_vocabulary
    )
    
    val_generator = LipReadingDataGenerator(
        data_path=data_dir, 
        alignment_path=alignment_dir, 
        batch_size=batch_size,
        video_paths=val_video_paths,
        alignment_paths=val_align_paths,
        fixed_vocabulary=full_vocabulary
    )
    
    # -----------------------------------------------------------
    # Step 4: Build, compile, and train the model.
    # -----------------------------------------------------------
    model = build_model(
        frame_length=75,
        image_height=46,
        image_width=140,
        vocabulary_size=len(full_vocabulary)
    )
    
    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    model_dir = "models_main"
    os.makedirs(model_dir, exist_ok=True)
    
    # Callbacks to save the best model and to perform early stopping.
    callbacks = [
        ModelCheckpoint(
            os.path.join(model_dir, 'lip_reading_best_model.keras'),
            save_best_only=True,
            monitor='val_accuracy',
            mode='max'
        ),
        EarlyStopping(
            monitor='val_loss',
            patience=7,
            restore_best_weights=True
        )
    ]
    
    print("Starting training...")
    history = model.fit(
        train_generator,
        epochs=epochs,
        validation_data=val_generator,
        callbacks=callbacks
    )
    
    # -----------------------------------------------------------
    # Step 5: Save the final model and the vocabulary.
    # -----------------------------------------------------------
    final_model_path = os.path.join(model_dir, 'lip_reading_full_model.h5')
    model.save(final_model_path)
    print(f"Final model saved: {final_model_path}")
    
    vocab_path = os.path.join(model_dir, 'vocabulary_main.txt')
    with open(vocab_path, 'w') as f:
        f.write('\n'.join(full_vocabulary))
    print(f"Vocabulary saved: {vocab_path}")
    
    return model, history


In [None]:

# -------------------------------------------------------------------
# Example usage:
# Set your directories for the video (.mpg) files and alignment (.align) files.
data_dir = r"/kaggle/input/mouth-map-comp/data/s1"
alignment_dir = r"/kaggle/input/mouth-map-comp/data/alignments/s1"
train_and_save_model(data_dir, alignment_dir, batch_size=16, epochs=30)


In [None]:
def predict_on_video(model_path, video_path, vocabulary_path, frame_length=75, 
                     image_height=46, image_width=140):
    """Make prediction on a single video file"""
    # Load vocabulary
    with open(vocabulary_path, 'r') as f:
        vocabulary = f.read().strip().split('\n')
    
    # Load model
    model = load_model(model_path)
    
    # Process video
    frames = []
    cap = cv2.VideoCapture(video_path)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        mouth = gray[190:236, 80:220]
        mouth = cv2.resize(mouth, (image_width, image_height))
        frames.append(mouth)
    
    cap.release()
    
    frames = np.array(frames, dtype=np.float32)
    frames = (frames - frames.mean()) / (frames.std() + 1e-6)
    
    if len(frames) < frame_length:
        pad_length = frame_length - len(frames)
        frames = np.pad(frames, ((0, pad_length), (0, 0), (0, 0)), mode='constant')
    else:
        frames = frames[:frame_length]
    
    frames = frames.reshape(1, frame_length, image_height, image_width, 1)
    
    # Make prediction
    prediction = model.predict(frames)[0]
    top_indices = np.argsort(prediction)[-5:][::-1]  # Get top 5 predictions
    
    results = []
    for idx in top_indices:
        results.append({
            'word': vocabulary[idx],
            'confidence': float(prediction[idx])
        })
    
    return results

results = predict_on_video(
    model_path="/kaggle/working/models_main/lip_reading_best_model.keras",
    video_path="/kaggle/input/mouth-map-comp/data/s1/bbaf2n.mpg",
    vocabulary_path="/kaggle/working/models_main/vocabulary_main.txt"
)

In [None]:
results

In [None]:
###################################################################################

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, BatchNormalization, Reshape
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from glob import glob
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

class LipReadingDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, data_path, alignment_path, batch_size=16, frame_length=75, 
                 image_height=46, image_width=140, video_paths=None, alignment_paths=None, 
                 fixed_vocabulary=None, augment=False, **kwargs):
        super().__init__(**kwargs)
        self.data_path = data_path
        self.alignment_path = alignment_path
        self.batch_size = batch_size
        self.frame_length = frame_length
        self.image_height = image_height
        self.image_width = image_width
        self.augment = augment

        # Use provided paths or generate from directory
        if video_paths is None:
            self.video_paths = sorted(glob(os.path.join(data_path, '*.mpg')))
        else:
            self.video_paths = video_paths
        
        if alignment_paths is None:
            self.alignment_paths = sorted(glob(os.path.join(alignment_path, '*.align')))
        else:
            self.alignment_paths = alignment_paths
        
        # Ensure alignment paths correspond to video paths
        if len(self.video_paths) != len(self.alignment_paths):
            # Match alignments to videos by base filename
            video_basenames = [os.path.splitext(os.path.basename(v))[0] for v in self.video_paths]
            alignment_dict = {os.path.splitext(os.path.basename(a))[0]: a for a in self.alignment_paths}
            
            # Reorder alignment paths to match video paths
            self.alignment_paths = [alignment_dict.get(vb) for vb in video_basenames]
            
            # Filter out any None values (missing alignments)
            valid_indices = [i for i, a in enumerate(self.alignment_paths) if a is not None]
            self.video_paths = [self.video_paths[i] for i in valid_indices]
            self.alignment_paths = [self.alignment_paths[i] for i in valid_indices]
        
        print(f"Found {len(self.video_paths)} video files and {len(self.alignment_paths)} alignment files")
        
        # Use fixed vocabulary if provided, otherwise create from data
        if fixed_vocabulary is not None:
            self.vocabulary = fixed_vocabulary
        else:
            self.vocabulary = self._create_word_vocabulary()
            
        self.char_to_num = tf.keras.layers.StringLookup(
            vocabulary=self.vocabulary, oov_token="")
        self.num_to_char = tf.keras.layers.StringLookup(
            vocabulary=self.vocabulary, oov_token="", invert=True)

    def _create_word_vocabulary(self):
        words = set()
        print(f"Processing alignment files from: {self.alignment_path}")
        
        for align_path in self.alignment_paths:
            try:
                with open(align_path, 'r') as f:
                    content = f.read().strip().split()
                    words.update([content[i] for i in range(2, len(content), 3)])
            except Exception as e:
                print(f"Error processing {align_path}: {str(e)}")
        
        words.discard('sil')
        vocabulary = sorted(list(words))
        
        if not vocabulary:
            print("No words found in alignment files. Using default vocabulary.")
            vocabulary = ['bin', 'blue', 'at', 'f', 'two', 'now']
        
        print(f"Vocabulary size: {len(vocabulary)}")
        return vocabulary

    def __len__(self):
        return max(1, len(self.video_paths) // self.batch_size)
    
    def on_epoch_end(self):
        """Shuffle the dataset at the end of each epoch"""
        indices = np.arange(len(self.video_paths))
        np.random.shuffle(indices)
        self.video_paths = [self.video_paths[i] for i in indices]
        self.alignment_paths = [self.alignment_paths[i] for i in indices]
    
    def _apply_augmentation(self, frame):
        """Apply random augmentations to a frame"""
        if self.augment and np.random.random() > 0.5:
            # Random brightness adjustment
            brightness = np.random.uniform(0.8, 1.2)
            frame = frame * brightness
            frame = np.clip(frame, 0, 255)
            
            # Random horizontal flip
            if np.random.random() > 0.5:
                frame = cv2.flip(frame, 1)
                
            # Random rotation (small angles)
            if np.random.random() > 0.7:
                angle = np.random.uniform(-5, 5)
                h, w = frame.shape
                center = (w/2, h/2)
                M = cv2.getRotationMatrix2D(center, angle, 1.0)
                frame = cv2.warpAffine(frame, M, (w, h))
                
        return frame
    
    def _process_video(self, video_path):
        frames = []
        cap = cv2.VideoCapture(video_path)
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            
            # Extract mouth region - these coordinates might need adjustment
            mouth = gray[190:236, 80:220]
            
            # Apply data augmentation if enabled
            mouth = self._apply_augmentation(mouth)
            
            mouth = cv2.resize(mouth, (self.image_width, self.image_height))
            frames.append(mouth)
        
        cap.release()
        
        frames = np.array(frames, dtype=np.float32)
        
        # Normalize frames - using per-video normalization
        frames = (frames - frames.mean()) / (frames.std() + 1e-6)
        
        # Handle videos shorter or longer than desired frame length
        if len(frames) < self.frame_length:
            pad_length = self.frame_length - len(frames)
            frames = np.pad(frames, ((0, pad_length), (0, 0), (0, 0)), mode='constant')
        else:
            frames = frames[:self.frame_length]
        
        return frames
    
    def _process_alignment(self, alignment_path):
        with open(alignment_path, 'r') as f:
            content = f.read().strip().split()
        
        words = [content[i] for i in range(2, len(content), 3) if content[i] != 'sil']
        text = ' '.join(words)
        return self.char_to_num(tf.convert_to_tensor(text.split()))
    
    def __getitem__(self, idx):
        batch_videos = self.video_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_alignments = self.alignment_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        X = np.zeros((len(batch_videos), self.frame_length, self.image_height, self.image_width, 1))
        Y = np.zeros((len(batch_videos), len(self.vocabulary)))
        
        for i, (video_path, align_path) in enumerate(zip(batch_videos, batch_alignments)):
            frames = self._process_video(video_path)
            X[i] = frames.reshape(self.frame_length, self.image_height, self.image_width, 1)
            
            labels = self._process_alignment(align_path)
            Y[i] = tf.reduce_max(tf.one_hot(labels, len(self.vocabulary)), axis=0)
        
        return X, Y

def build_model(frame_length, image_height, image_width, vocabulary_size):
    """Build the lip reading model with improved architecture"""
    model = Sequential([
        tf.keras.Input(shape=(frame_length, image_height, image_width, 1)), 
        
        # First 3D CNN block with increased filters
        Conv3D(64, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        Conv3D(64, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        MaxPool3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        
        # Second 3D CNN block
        Conv3D(128, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        Conv3D(128, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        MaxPool3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        
        # Third 3D CNN block
        Conv3D(256, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        Conv3D(256, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        MaxPool3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        
        # Reshape for sequence modeling
        Reshape((-1, 256)),
        
        # Bidirectional LSTM layers
        Bidirectional(LSTM(256, return_sequences=True, dropout=0.25, recurrent_dropout=0.1)),
        Bidirectional(LSTM(128, return_sequences=False, dropout=0.25, recurrent_dropout=0.1)),
        
        # Dense layers with stronger regularization
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(vocabulary_size, activation='softmax')
    ])
    
    return model

def train_with_validation_split(data_dir, alignment_dir, batch_size=16, epochs=50, learning_rate=0.0001):
    """Train the model using a proper train/validation split approach"""
    print("Starting improved training with validation split...")
    
    model_dir = "models_improved"
    os.makedirs(model_dir, exist_ok=True)
    
    # Load all video and alignment paths
    video_paths = sorted(glob(os.path.join(data_dir, '*.mpg')))
    alignment_paths = sorted(glob(os.path.join(alignment_dir, '*.align')))
    
    # Match alignment paths with video paths by base filename
    video_basenames = [os.path.splitext(os.path.basename(v))[0] for v in video_paths]
    alignment_dict = {os.path.splitext(os.path.basename(a))[0]: a for a in alignment_paths}
    matched_alignments = [alignment_dict.get(vb) for vb in video_basenames]
    
    # Filter out videos without matching alignments
    valid_indices = [i for i, a in enumerate(matched_alignments) if a is not None]
    filtered_videos = [video_paths[i] for i in valid_indices]
    filtered_alignments = [matched_alignments[i] for i in valid_indices]
    
    print(f"Total matched videos and alignments: {len(filtered_videos)}")
    
    # Create a temporary generator to get consistent vocabulary across all splits
    temp_generator = LipReadingDataGenerator(
        data_path=data_dir,
        alignment_path=alignment_dir,
        batch_size=batch_size,
        video_paths=filtered_videos,
        alignment_paths=filtered_alignments
    )
    full_vocabulary = temp_generator.vocabulary
    
    # Split data: 50% training, 35% validation, 15% held out
    # First split into training (50%) and remaining (50%)
    train_videos, remaining_videos, train_alignments, remaining_alignments = train_test_split(
        filtered_videos, filtered_alignments, test_size=0.5, random_state=42
    )
    
    # Split the remaining data into validation (70% of remaining = 35% of total)
    # and held out (30% of remaining = 15% of total)
    val_videos, test_videos, val_alignments, test_alignments = train_test_split(
        remaining_videos, remaining_alignments, test_size=0.3, random_state=42
    )
    
    print(f"Train set: {len(train_videos)} videos")
    print(f"Validation set: {len(val_videos)} videos")
    print(f"Test set (held out): {len(test_videos)} videos")
    
    # Create generators for training and validation
    train_generator = LipReadingDataGenerator(
        data_path=data_dir,
        alignment_path=alignment_dir,
        batch_size=batch_size,
        video_paths=train_videos,
        alignment_paths=train_alignments,
        fixed_vocabulary=full_vocabulary,
        augment=True  # Enable data augmentation for training
    )
    
    val_generator = LipReadingDataGenerator(
        data_path=data_dir,
        alignment_path=alignment_dir,
        batch_size=batch_size,
        video_paths=val_videos,
        alignment_paths=val_alignments,
        fixed_vocabulary=full_vocabulary,
        augment=False  # No augmentation for validation
    )
    
    # Build and compile model
    model = build_model(
        frame_length=75,
        image_height=46,
        image_width=140,
        vocabulary_size=len(full_vocabulary)
    )
    
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy', 'top_k_categorical_accuracy']  # Track top-k accuracy too
    )
    
    model.summary()
    
    # Enhanced callbacks
    callbacks = [
        ModelCheckpoint(
            os.path.join(model_dir, 'lip_reading_best_val_acc.keras'),
            save_best_only=True,
            monitor='val_accuracy',
            mode='max',
            verbose=1
        ),
        ModelCheckpoint(
            os.path.join(model_dir, 'lip_reading_best_val_loss.keras'),
            save_best_only=True,
            monitor='val_loss',
            mode='min',
            verbose=1
        ),
        EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-6,
            verbose=1
        )
    ]
    
    # Train model with validation
    history = model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=epochs,
        callbacks=callbacks,
        verbose=1
    )
    
    # Save final model
    final_model_path = os.path.join(model_dir, 'lip_reading_final_model.keras')
    model.save(final_model_path)
    print(f"Final model saved: {final_model_path}")
    
    # Save vocabulary
    vocab_path = os.path.join(model_dir, 'vocabulary.txt')
    with open(vocab_path, 'w') as f:
        f.write('\n'.join(full_vocabulary))
    print(f"Vocabulary saved: {vocab_path}")
    
    # Plot and save training history
    plot_training_history(history, model_dir)
    
    # Evaluate on test set
    test_generator = LipReadingDataGenerator(
        data_path=data_dir,
        alignment_path=alignment_dir,
        batch_size=batch_size,
        video_paths=test_videos,
        alignment_paths=test_alignments,
        fixed_vocabulary=full_vocabulary
    )
    
    test_results = model.evaluate(test_generator)
    print(f"Test loss: {test_results[0]:.4f}")
    print(f"Test accuracy: {test_results[1]:.4f}")
    print(f"Test top-k accuracy: {test_results[2]:.4f}")
    
    # Save test results
    with open(os.path.join(model_dir, 'test_results.txt'), 'w') as f:
        f.write(f"Test loss: {test_results[0]:.4f}\n")
        f.write(f"Test accuracy: {test_results[1]:.4f}\n")
        f.write(f"Test top-k accuracy: {test_results[2]:.4f}\n")
    
    return model, history, full_vocabulary

def plot_training_history(history, save_dir):
    """Plot and save training history graphs"""
    plt.figure(figsize=(12, 5))
    
    # Plot accuracy
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    
    # Plot loss
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, 'training_history.png'))
    plt.close()

def predict_on_video(model_path, video_path, vocabulary_path, frame_length=75, 
                     image_height=46, image_width=140):
    """Make prediction on a single video file"""
    # Load vocabulary
    with open(vocabulary_path, 'r') as f:
        vocabulary = f.read().strip().split('\n')
    
    # Load model
    model = load_model(model_path)
    
    # Process video
    frames = []
    cap = cv2.VideoCapture(video_path)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        mouth = gray[190:236, 80:220]
        mouth = cv2.resize(mouth, (image_width, image_height))
        frames.append(mouth)
    
    cap.release()
    
    frames = np.array(frames, dtype=np.float32)
    frames = (frames - frames.mean()) / (frames.std() + 1e-6)
    
    if len(frames) < frame_length:
        pad_length = frame_length - len(frames)
        frames = np.pad(frames, ((0, pad_length), (0, 0), (0, 0)), mode='constant')
    else:
        frames = frames[:frame_length]
    
    frames = frames.reshape(1, frame_length, image_height, image_width, 1)
    
    # Make prediction
    prediction = model.predict(frames)[0]
    top_indices = np.argsort(prediction)[-5:][::-1]  # Get top 5 predictions
    
    results = []
    for idx in top_indices:
        results.append({
            'word': vocabulary[idx],
            'confidence': float(prediction[idx])
        })
    
    return results

if __name__ == "__main__":
    # Example usage
    data_dir = r"/kaggle/input/mouth-map-comp/data/s1"
    alignment_dir = r"/kaggle/input/mouth-map-comp/data/alignments/s1"
    
    # Train model with validation split
    model, history, vocabulary = train_with_validation_split(
        data_dir=data_dir,
        alignment_dir=alignment_dir,
        batch_size=16,
        epochs=50,
        learning_rate=0.0001
    )

In [None]:
####### ABOVE CODE GOT OOM... TRY CONDENSING DATA

In [2]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, BatchNormalization, Reshape
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from glob import glob
import cv2
import argparse

class LipReadingDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, data_path, alignment_path, batch_size=16, frame_length=75, 
                 image_height=46, image_width=140, 
                 video_paths=None, alignment_paths=None, 
                 fixed_vocabulary=None):
        super().__init__()
        self.data_path = data_path
        self.alignment_path = alignment_path
        self.batch_size = batch_size
        self.frame_length = frame_length
        self.image_height = image_height
        self.image_width = image_width

        # Load video and alignment paths
        if video_paths is None:
            self.video_paths = sorted(glob(os.path.join(data_path, '*.mpg')))
        else:
            self.video_paths = video_paths
        
        if alignment_paths is None:
            self.alignment_paths = sorted(glob(os.path.join(alignment_path, '*.align')))
        else:
            self.alignment_paths = alignment_paths
        
        # Vocabulary handling
        if fixed_vocabulary is not None:
            self.vocabulary = fixed_vocabulary
        else:
            self.vocabulary = self._create_word_vocabulary()
        
        self.char_to_num = tf.keras.layers.StringLookup(
            vocabulary=self.vocabulary, oov_token="")
        self.num_to_char = tf.keras.layers.StringLookup(
            vocabulary=self.vocabulary, oov_token="", invert=True)

    def _create_word_vocabulary(self):
        words = set()
        for align_path in self.alignment_paths:
            with open(align_path, 'r') as f:
                content = f.read().strip().split()
                words.update(content[2::3])
        words.discard('sil')
        return sorted(list(words)) if words else ['default_word']

    def __len__(self):
        return max(1, len(self.video_paths) // self.batch_size)

    def _process_video(self, video_path):
        frames = []
        cap = cv2.VideoCapture(video_path)
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            mouth = gray[190:236, 80:220]
            mouth = cv2.resize(mouth, (self.image_width, self.image_height))
            frames.append(mouth)
        cap.release()

        # Normalize and pad frames
        frames = np.array(frames, dtype=np.float32)
        if frames.shape[0] < self.frame_length:
            frames = pad_sequences([frames], maxlen=self.frame_length, dtype='float32', padding='post')[0]
        else:
            frames = frames[:self.frame_length]
        frames = (frames - frames.mean()) / (frames.std() + 1e-7)
        return frames

    def _process_alignment(self, alignment_path):
        with open(alignment_path, 'r') as f:
            content = f.read().strip().split()
        words = [word for i, word in enumerate(content[2::3]) if content[i] != 'sil']
        text = ' '.join(words)
        labels = self.char_to_num(tf.convert_to_tensor(text.split()))
        return tf.reduce_max(tf.one_hot(labels, len(self.vocabulary)), axis=0)

    def __getitem__(self, idx):
        batch_videos = self.video_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_alignments = self.alignment_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        X = np.zeros((len(batch_videos), self.frame_length, self.image_height, self.image_width, 1))
        Y = []
        
        for i, (video_path, align_path) in enumerate(zip(batch_videos, batch_alignments)):
            frames = self._process_video(video_path)
            X[i] = frames.reshape(self.frame_length, self.image_height, self.image_width, 1)
            Y.append(self._process_alignment(align_path))
        
        Y = np.array(Y)
        return X, Y

def build_model(frame_length, image_height, image_width, vocabulary_size):
    model = Sequential([
        tf.keras.Input(shape=(frame_length, image_height, image_width, 1)), 
        Conv3D(64, (3, 3, 3), activation='relu'),
        MaxPool3D((1, 2, 2)),
        BatchNormalization(),
        Conv3D(128, (3, 3, 3), activation='relu'),
        MaxPool3D((1, 2, 2)),
        BatchNormalization(),
        Conv3D(256, (3, 3, 3), activation='relu'),
        MaxPool3D((1, 2, 2)),
        BatchNormalization(),
        Reshape((-1, 256)),
        Bidirectional(LSTM(128, return_sequences=True)),
        Dropout(0.5),
        Bidirectional(LSTM(64)),
        Dropout(0.5),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(vocabulary_size, activation='sigmoid')
    ])
    return model

def train_and_save_main_model(data_dir, alignment_dir, batch_size=16):
    model_dir = "models_main"
    os.makedirs(model_dir, exist_ok=True)
    
    full_data_generator = LipReadingDataGenerator(data_dir, alignment_dir, batch_size=batch_size)
    full_vocabulary = full_data_generator.vocabulary
    total_videos = len(full_data_generator.video_paths)
    train_size = total_videos // 2
    remaining = total_videos - train_size
    val_size = int(0.7 * remaining)
    test_size = remaining - val_size

    # Split video and alignment paths
    video_paths = full_data_generator.video_paths
    alignment_paths = full_data_generator.alignment_paths

    train_videos = video_paths[:train_size]
    train_alignments = alignment_paths[:train_size]

    val_videos = video_paths[train_size:train_size + val_size]
    val_alignments = alignment_paths[train_size:train_size + val_size]

    test_videos = video_paths[train_size + val_size:]
    test_alignments = alignment_paths[train_size + val_size:]

    # Create data generators
    train_generator = LipReadingDataGenerator(
        video_paths=train_videos,
        alignment_paths=train_alignments,
        data_path=data_dir,
        alignment_path=alignment_dir,
        batch_size=batch_size,
        fixed_vocabulary=full_vocabulary
    )

    val_generator = LipReadingDataGenerator(
        video_paths=val_videos,
        alignment_paths=val_alignments,
        data_path=data_dir,
        alignment_path=alignment_dir,
        batch_size=batch_size,
        fixed_vocabulary=full_vocabulary
    )

    test_generator = LipReadingDataGenerator(
        video_paths=test_videos,
        alignment_paths=test_alignments,
        data_path=data_dir,
        alignment_path=alignment_dir,
        batch_size=batch_size,
        fixed_vocabulary=full_vocabulary
    )

    # Build and compile the model
    model = build_model(
        frame_length=75,
        image_height=46,
        image_width=140,
        vocabulary_size=len(full_vocabulary)
    )

    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    # Callbacks
    checkpoint = ModelCheckpoint(
        os.path.join(model_dir, 'best_lip_reading_model.keras'),
        save_best_only=True,
        monitor='val_accuracy',
        mode='max'
    )
    
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=7,
        restore_best_weights=True
    )

    # Train the model
    history = model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=10,
        callbacks=[checkpoint, early_stopping]
    )

    # Evaluate on the test set
    best_model = tf.keras.models.load_model(os.path.join(model_dir, 'best_lip_reading_model.keras'))
    test_loss, test_acc = best_model.evaluate(test_generator)
    print(f"Test accuracy: {test_acc}")

    # Save the final model
    final_model_path = os.path.join(model_dir, 'lip_reading_final_model.keras')
    best_model.save(final_model_path)
    print(f"Final model saved: {final_model_path}")

    # Save the vocabulary
    vocab_path = os.path.join(model_dir, 'vocabulary_main.txt')
    with open(vocab_path, 'w') as f:
        f.write('\n'.join(full_vocabulary))
    print(f"Vocabulary saved: {vocab_path}")

def main():
    parser = argparse.ArgumentParser(description='Lip Reading Model Training')
    parser.add_argument('--data_dir', type=str, required=True, help='Path to video data directory')
    parser.add_argument('--alignment_dir', type=str, required=True, help='Path to alignment data directory')
    parser.add_argument('--batch_size', type=int, default=16, help='Batch size for training')
    args = parser.parse_args()

    # Validate directories
    if not os.path.exists(args.data_dir):
        raise ValueError(f"Data directory {args.data_dir} does not exist")
    if not os.path.exists(args.alignment_dir):
        raise ValueError(f"Alignment directory {args.alignment_dir} does not exist")

    # Start training
    train_and_save_main_model(args.data_dir, args.alignment_dir, args.batch_size)

In [3]:
data_dir = r"/kaggle/input/mouth-map-comp/data/s1"
alignment_dir = r"/kaggle/input/mouth-map-comp/data/alignments/s1"
batch_size = 16
train_and_save_main_model(data_dir, alignment_dir, batch_size)

Epoch 1/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m102s[0m 3s/step - accuracy: 0.0397 - loss: 0.6703 - val_accuracy: 0.0179 - val_loss: 0.6675
Epoch 2/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 2s/step - accuracy: 0.2138 - loss: 0.5600 - val_accuracy: 1.0000 - val_loss: 0.5693
Epoch 3/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 2s/step - accuracy: 0.5487 - loss: 0.4476 - val_accuracy: 1.0000 - val_loss: 0.4557
Epoch 4/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 2s/step - accuracy: 0.7567 - loss: 0.3754 - val_accuracy: 1.0000 - val_loss: 0.3838
Epoch 5/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 2s/step - accuracy: 0.8875 - loss: 0.3467 - val_accuracy: 1.0000 - val_loss: 0.3557
Epoch 6/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m77s[0m 2s/step - accuracy: 0.9308 - loss: 0.3258 - val_accuracy: 1.0000 - val_loss: 0.3411
Epoch 7/10
[1m31/31[0m [32m━━━━━━━━━