In [1]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.transforms.functional import resize
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
import glob
import torchvision.io
import cv2
from tqdm.notebook import tqdm
from sklearn.metrics import accuracy_score

In [2]:
# Custom dataset class for Lip Reading
class LipReadingDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.video_paths = []
        self.labels = []
        self.classes = sorted(os.listdir(root_dir))
        if '.DS_Store' in self.classes:
            self.classes.remove('.DS_Store')
        
        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name, 'train')
            video_files = glob.glob(os.path.join(class_dir, "*.mp4"))
            self.video_paths.extend(video_files)
            self.labels.extend([label] * len(video_files))

        # Debug prints
        print(f"Found {len(self.video_paths)} videos across {len(self.classes)} classes.")
        if len(self.video_paths) == 0:
            print("No videos found. Please check the dataset directory structure and paths.")

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        label = self.labels[idx]
        frames = self.load_video_frames(video_path)

        if self.transform:
            frames = self.transform(frames)

        return frames, label

    def load_video_frames(self, video_path):
        frames = []
        cap = cv2.VideoCapture(video_path)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert frame to grayscale
            frame = Image.fromarray(frame)  # Convert numpy array to PIL Image
            frames.append(frame)
        cap.release()
        return frames

In [3]:
# Transform for video frames
class ToTensor:
    def __call__(self, frames):
        tensor = torch.stack([transforms.ToTensor()(frame) for frame in frames])  
        return tensor

In [4]:
# Path to the processed_selected_mp4_files directory
root_dir = './processed_selected_mp4_files'  # Update this path

# Dataset and DataLoader
transform = ToTensor()
train_dataset = LipReadingDataset(root_dir, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, collate_fn=lambda x: collate_fn(x))

# Collate function to handle variable-length sequences
def collate_fn(batch):
    videos, labels = zip(*batch)
    max_len = max(len(video) for video in videos)
    padded_videos = []
    for video in videos:
        assert len(video) == max_len
        
        # Not needed since all videos are same length(same number of frames). Should all be 29 frames
        # padded_video = torch.cat([video, torch.zeros((pad_size, video.shape[1], video.shape[2], video.shape[3]))], dim=0)  # Add padding if num frames < max_len
        
        # Resize video to (max_len, 1, 224, 224)
        resized_video = torch.stack([resize(frame, (128, 128)) for frame in video])
        padded_videos.append(resized_video)

        # padded_videos.append(video)

    return torch.stack(padded_videos), torch.tensor(labels)


Found 20 videos across 2 classes.


In [5]:
# Define the model architecture using VGG
class LipReadingModel(nn.Module):
    def __init__(self, num_classes=500):
        super(LipReadingModel, self).__init__()
        # VGG16 as feature extractor
        self.vgg = torchvision.models.vgg16(weights='IMAGENET1K_V1')
        # self.vgg.features[0] = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1)  # Modify first conv layer for grayscale

        self.vgg.classifier = nn.Identity()  # Remove final classification layer

        # RNN for sequence modeling
        self.rnn = nn.LSTM(input_size=512*4*4, hidden_size=256, num_layers=2, batch_first=True, bidirectional=True)
        
        # Fully connected layer for classification
        self.fc = nn.Linear(256*2, num_classes)  # bidirectional doubles the output features

    def forward(self, x):
        batch_size, timesteps, C, H, W = x.size()
        c_in = x.view(batch_size * timesteps, C, H, W)
        # print(c_in.shape)
        c_out = self.vgg.features(c_in)
        # print(c_out.shape)
        c_out = c_out.view(batch_size, timesteps, -1)  # Flatten for LSTM
        # print(c_out.shape)
        r_out, _ = self.rnn(c_out)
        # print(r_out.shape)
        out = self.fc(r_out[:, -1, :])
        # print(out.shape)
        return out

In [6]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyperparameters
num_classes = len(train_dataset.classes)  # Automatically get the number of classes
learning_rate = 0.001
num_epochs = 10
batch_size = 1

# Initialize model, loss, and optimizer
model = LipReadingModel(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop with progress bar
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}", unit="batch")
    for i, (videos, labels) in enumerate(progress_bar):
        videos = videos.to(device)
        labels = labels.to(device)

        outputs = model(videos)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        progress_bar.set_postfix(loss=loss.item())

    avg_loss = epoch_loss / len(train_loader)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Average Loss: {avg_loss:.4f}')

# Save the trained model
model_save_path = 'vgg16_lstm_model.pth'
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")


Epoch 1/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [1/10], Average Loss: 0.7938


Epoch 2/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [2/10], Average Loss: 0.7247


Epoch 3/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [3/10], Average Loss: 0.7805


Epoch 4/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [4/10], Average Loss: 0.7137


Epoch 5/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [5/10], Average Loss: 0.6965


Epoch 6/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [6/10], Average Loss: 0.7570


Epoch 7/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [7/10], Average Loss: 0.7001


Epoch 8/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [8/10], Average Loss: 0.7157


Epoch 9/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [9/10], Average Loss: 0.6970


Epoch 10/10:   0%|          | 0/20 [00:00<?, ?batch/s]

Epoch [10/10], Average Loss: 0.6960
Model saved to vgg16_lstm_model.pth


In [7]:
# To run saved model on test dataset

# test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# # Define the path where the model is saved
# model_save_path = 'vgg16_lstm_model.pth'
# # Device configuration
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# num_classes = len(train_dataset.classes)
# # Instantiate the model
# model = LipReadingModel(num_classes=num_classes).to(device)

# # Load the state dictionary
# model.load_state_dict(torch.load(model_save_path))
# model.eval()  # Set the model to evaluation mode
# print("Model loaded from", model_save_path)

# # Function to evaluate the model
# def evaluate_model(model, data_loader):
#     model.eval()  # Set the model to evaluation mode
#     all_labels = []
#     all_preds = []

#     with torch.no_grad():  # No need to track gradients for inference
#         for videos, labels in data_loader:
#             videos = videos.to(device)
#             labels = labels.to(device)

#             outputs = model(videos)
#             _, preds = torch.max(outputs, 1)

#             all_labels.extend(labels.cpu().numpy())
#             all_preds.extend(preds.cpu().numpy())

#     # Calculate accuracy
#     accuracy = accuracy_score(all_labels, all_preds)
#     return accuracy

# # Example of using the loaded model for inference on the test dataset
# test_accuracy = evaluate_model(model, test_loader)
# print(f'Test Accuracy: {test_accuracy:.4f}')


Model loaded from vgg16_lstm_model.pth
Test Accuracy: 0.5000
