In [16]:
!pip install kagglehub
!pip install scikit-learn
!pip install opencv-python

Collecting opencv-python
  Downloading opencv_python-4.11.0.86-cp37-abi3-win_amd64.whl.metadata (20 kB)
Downloading opencv_python-4.11.0.86-cp37-abi3-win_amd64.whl (39.5 MB)
   ---------------------------------------- 0.0/39.5 MB ? eta -:--:--
   --------- ------------------------------ 9.7/39.5 MB 46.5 MB/s eta 0:00:01
   ---------------------- ----------------- 22.5/39.5 MB 52.8 MB/s eta 0:00:01
   -------------------------------- ------- 32.2/39.5 MB 51.2 MB/s eta 0:00:01
   ---------------------------------------  38.8/39.5 MB 47.5 MB/s eta 0:00:01
   ---------------------------------------- 39.5/39.5 MB 40.5 MB/s eta 0:00:00
Installing collected packages: opencv-python
Successfully installed opencv-python-4.11.0.86


In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("mohamedbentalb/lipreading-dataset")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/mohamedbentalb/lipreading-dataset?dataset_version_number=1...


100%|██████████| 404M/404M [00:08<00:00, 51.4MB/s] 

Extracting files...





Path to dataset files: C:\Users\6\.cache\kagglehub\datasets\mohamedbentalb\lipreading-dataset\versions\1


In [None]:
import cv2
import torch
import torchvision
import torchvision.transforms as T
import torchvision
import torch_directml
import numpy as np
import os
from sklearn.model_selection import train_test_split

Load the Data

In [None]:
video_dir = r"mohamedbentalb\lipreading-dataset\versions\1\data\s1"
align_dir = r"mohamedbentalb\lipreading-dataset\versions\1\data\alignments\s1"

# Pair each video with its alignment file
video_files = [f for f in os.listdir(video_dir) if f.endswith('.mpg')]
video_align_pairs = []
for vf in video_files:
    base = os.path.splitext(vf)[0]
    align_path = os.path.join(align_dir, base + ".align")
    video_path = os.path.join(video_dir, vf)
    if os.path.exists(align_path):
        video_align_pairs.append((video_path, align_path))

print(f"Found {len(video_align_pairs)} video-align pairs.")

train_pairs, test_pairs = train_test_split(video_align_pairs, test_size=0.2, random_state=42)

print(f"Training set size: {len(train_pairs)}")
print(f"Test set size: {len(test_pairs)}")

Found 1000 video-align pairs.
Training set size: 800
Test set size: 200


Pre-Processing dataset, extract each frames

In [52]:
# Preprocessing transform
transform = T.Compose([
    T.ToPILImage(),
    T.Resize((112, 112)),
    T.ToTensor(),
    T.Normalize([0.43216, 0.394666, 0.37645], [0.22803, 0.22145, 0.216989])
])

def preprocess_frames(frames):
    processed = [transform(frame) for frame in frames]
    video_tensor = torch.stack(processed)  # (T, C, H, W)
    video_tensor = video_tensor.permute(1, 0, 2, 3)  # (C, T, H, W)
    return video_tensor.unsqueeze(0)  # (1, C, T, H, W)

def parse_align_file(align_path):
    alignments = []
    with open(align_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) == 3:
                start, end, word = parts
                alignments.append((int(start), int(end), word))
    return alignments

def extract_word_frames(video_path, alignments, fps=25):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    cap.release()

    word_frames = []
    for start, end, word in alignments:
        start_idx = int(start / 1000 * fps)
        end_idx = int(end / 1000 * fps)
        word_seq = frames[start_idx:end_idx]
        if word_seq:
            word_frames.append((word, word_seq))
    return word_frames

Encoder:

In [55]:
# device = torch_directml.device()
device = torch.device("cpu")

# Load model
model = torchvision.models.video.r3d_18(pretrained=True)
# model = torchvision.models.video.r2plus1d_18(pretrained=True)
model = model.to(device)
model.eval()

all_features = []
all_labels = []
num = 1
for video_path, align_path in train_pairs:  # or test_pairs
    print(num)
    alignments = parse_align_file(align_path)
    word_frames = extract_word_frames(video_path, alignments, fps=25)
    with torch.no_grad():
        for word, frames in word_frames:
            video_tensor = preprocess_frames(frames).to(device)
            features = model(video_tensor) 
            all_features.append(features)
            all_labels.append(word)
    num += 1
torch.save(all_features, 'features.pt')
torch.save(all_labels, 'labels.pt')

1


KeyboardInterrupt: 

Here is another pretrained model: here is the instruction to pip install:

```bash
git clone https://github.com/astorfi/lipnet.git
cd lipnet
pip install -r requirements.txt
pip install .


In [None]:
# You must have LipNet installed and its modules accessible in your path
from lipnet.model import LipNet


# Load LipNet model (set decoder to None to use only encoder)
model = LipNet(img_c=3, img_w=112, img_h=112, absolute_max_string_len=32, output_size=28)
model = model.to(device)
model.eval()

MIN_FRAMES = 75  # LipNet expects 75 frames per input

with torch.no_grad():
    for word, frames in word_frames:
        video_tensor = preprocess_frames(frames).to(device)
        # Forward pass through encoder (LipNet's forward returns logits, you may want encoder features)
        features = model.encoder(video_tensor)
        print(f"Word: {word}, Feature shape: {features.shape}")

In [45]:
features = torch.load('features.pt')
labels = torch.load('labels.pt')
print(f"Loaded {len(features)} features and {len(labels)} labels.")

Loaded 9 features and 9 labels.


  features = torch.load('features.pt')
  labels = torch.load('labels.pt')


In [None]:
import torch.nn as nn

class VAE(nn.Module):
    def __init__(self, feature_dim, latent_dim, decoder_hidden_dim, output_dim):
        super(VAE, self).__init__()
        # Encoder: maps features to latent mean and logvar
        self.fc_mu = nn.Linear(feature_dim, latent_dim)
        self.fc_logvar = nn.Linear(feature_dim, latent_dim)
        # Decoder: simple RNN decoder (can be LSTM/GRU)
        self.decoder_rnn = nn.GRU(latent_dim, decoder_hidden_dim, batch_first=True)
        self.fc_out = nn.Linear(decoder_hidden_dim, output_dim)

    def encode(self, x):
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z, seq_len):
        # Repeat z for each time step
        z_seq = z.unsqueeze(1).repeat(1, seq_len, 1)
        out, _ = self.decoder_rnn(z_seq)
        out = self.fc_out(out)
        return out

    def forward(self, x, seq_len):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        recon = self.decode(z, seq_len)
        return recon, mu, logvar

RNN decoder

The variational part is the sampling and KL loss

In [None]:
# Assume features: [num_samples, feature_dim], labels: [num_samples, seq_len, output_dim]
# You may need to preprocess your labels to the right shape

vae = VAE(feature_dim=features[0].numel(), latent_dim=128, decoder_hidden_dim=256, output_dim=label_dim)
optimizer = torch.optim.Adam(vae.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()  # or nn.MSELoss() depending on your task

for epoch in range(num_epochs):
    for x, y in dataloader:  # x: [batch, feature_dim], y: [batch, seq_len, output_dim]
        recon, mu, logvar = vae(x, seq_len=y.shape[1])
        # Reconstruction loss
        recon_loss = loss_fn(recon.view(-1, recon.size(-1)), y.view(-1))
        # KL divergence
        kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = recon_loss + kl_loss * beta  # beta can be 1.0 or tuned
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()