# 🧠 Lip Reading CTC Trainer + Inference on Real Video (.mpg + .align)

In [None]:
# Install the third-party packages this notebook relies on.
# No versions are pinned here, so pip resolves whatever is compatible at run time.
!pip install opencv-python-headless mediapipe tensorflow


Collecting mediapipe
  Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
INFO: pip is looking at multiple versions of mediapipe to determine which version is compatible with other requirements. This could take a while.
  Downloading mediapipe-0.10.20-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.18-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.15-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.14-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.2-py3-none-any.whl.metadata (1.6 kB)
Downloading mediapipe-0.10.14-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_

In [None]:
import os
import shutil

# Start from a clean slate: remove a previously extracted videos folder, if any.
videos_dir = "data/videos"
if os.path.exists(videos_dir):
    shutil.rmtree(videos_dir)


In [None]:
# Install gdown to download from Google Drive
!pip install -q gdown

import os
import zipfile
import gdown

# Google Drive file ID
file_id = "1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL"
zip_path = "data.zip"

# Download from Drive
print("⬇️ Downloading data.zip from Google Drive...")
gdown.download(id=file_id, output=zip_path, quiet=False)

# Extract ZIP
print("📦 Extracting...")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(".")

if os.path.exists("data/align"):
    os.rename("data/align/s1", "data/labels")

# ✅ Rename s1 → videos, align → labels
if os.path.exists("data/s1"):
    os.rename("data/s1", "data/videos")


# Show structure
print("📁 Final contents of data/:", os.listdir("data"))


⬇️ Downloading data.zip from Google Drive...


Downloading...
From (original): https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL
From (redirected): https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL&confirm=t&uuid=f97d0410-9839-4646-b2ca-70254d763b1f
To: /content/data.zip
100%|██████████| 423M/423M [00:01<00:00, 268MB/s]


📦 Extracting...
📁 Final contents of data/: ['videos', 'labels', 'alignments']


In [None]:
from google.colab import files

# Tell the user which project scripts are expected, then open the upload dialog.
print("📂 Please upload the following project files:")
for expected_script in ("train_ctc.py", "utils.py", "decoder.py", "model.py"):
    print(f"- {expected_script}")

uploaded_scripts = files.upload()


📂 Please upload the following project files:
- train_ctc.py
- utils.py
- decoder.py
- model.py


Saving decoder.py to decoder.py
Saving lip_detector.py to lip_detector.py
Saving live_infer.py to live_infer.py
Saving model.py to model.py
Saving streamlit_app.py to streamlit_app.py
Saving train_ctc.py to train_ctc.py
Saving train_model.py to train_model.py
Saving utils.py to utils.py


In [None]:
import shutil

# Relocate decoder.py and model.py into app/ when they were part of the upload.
app_dir = "app"
os.makedirs(app_dir, exist_ok=True)

present_modules = [name for name in ("decoder.py", "model.py") if name in uploaded_scripts]
for name in present_modules:
    shutil.move(name, f"{app_dir}/{name}")


In [None]:
# Rewrite the parent-relative data paths in train_ctc.py so they resolve
# from the notebook's working directory.
script_path = "train_ctc.py"
path_rewrites = (
    ("../data/videos", "data/videos"),
    ("../data/labels", "data/labels"),
)

with open(script_path, "r") as f:
    code = f.read()

for old_path, new_path in path_rewrites:
    code = code.replace(old_path, new_path)

with open(script_path, "w") as f:
    f.write(code)

print("✅ Patched train_ctc.py for Colab paths")


✅ Patched train_ctc.py for Colab paths


In [None]:
import os
import shutil

# Bring model.py back to the working directory, where the following cell
# opens it for patching. Guard the move so re-running this cell (or running
# it when model.py was never moved into app/) does not crash.
if os.path.exists("app/model.py"):
    shutil.move("app/model.py", "model.py")
elif not os.path.exists("model.py"):
    # Neither location has the file: raise with an actionable message.
    raise FileNotFoundError(
        "model.py not found in 'app/' or the working directory; upload it first."
    )

'model.py'

In [None]:
# 🔧 Swap the standalone-Keras import of ctc_batch_cost in model.py for the
# tensorflow.keras one.
model_path = "model.py"
old_import = "from keras.backend import ctc_batch_cost"
new_import = "from tensorflow.keras.backend import ctc_batch_cost"

with open(model_path, "r") as file:
    code = file.read().replace(old_import, new_import)

with open(model_path, "w") as file:
    file.write(code)

print("✅ Patched model.py to use TensorFlow's ctc_batch_cost")


✅ Patched model.py to use TensorFlow's ctc_batch_cost


In [None]:
import os

# The training script reads labels from data/labels, so rename the
# alignments folder when it is present and report what happened.
alignments_dir = "data/alignments"
if not os.path.exists(alignments_dir):
    print("❌ Folder 'data/alignments' not found.")
else:
    os.rename(alignments_dir, "data/labels")
    print("✅ Renamed data/alignments → data/labels")


✅ Renamed data/alignments → data/labels


In [None]:
import os

# Sanity check: count the entries under data/labels/s1 and show a sample.
label_files = os.listdir("data/labels/s1")
label_files.sort()
print(f"📂 Total .align files found: {len(label_files)}")
print("📝 First few align files:", label_files[:5])


📂 Total .align files found: 1000
📝 First few align files: ['bbaf2n.align', 'bbaf3s.align', 'bbaf4p.align', 'bbaf5a.align', 'bbal6n.align']


In [None]:
import os


def _stems_with_suffix(filenames, suffix):
    """Return the text before the first '.' of each filename ending in `suffix`."""
    return [filename.split(".")[0] for filename in filenames if filename.endswith(suffix)]


video_files = sorted(os.listdir("data/videos"))
align_files = sorted(os.listdir("data/labels"))

video_basenames = _stems_with_suffix(video_files, ".mpg")
align_basenames = _stems_with_suffix(align_files, ".align")

print("🎥 First 10 video basenames:", video_basenames[:10])
print("📄 First 10 align basenames:", align_basenames[:10])

# Count the basenames that appear on both the video side and the align side.
matching = list(set(video_basenames).intersection(align_basenames))
print(f"✅ Found {len(matching)} matching video-align pairs.")


🎥 First 10 video basenames: ['bbaf2n', 'bbaf3s', 'bbaf4p', 'bbaf5a', 'bbal6n', 'bbal7s', 'bbal8p', 'bbal9a', 'bbas1s', 'bbas2p']
📄 First 10 align basenames: []
✅ Found 0 matching video-align pairs.


In [None]:
import os

# Show up to ten entries directly inside data/labels.
print("🔍 Sample files in 'data/labels':")
labels_listing = os.listdir("data/labels")
print(labels_listing[:10])


🔍 Sample files in 'data/labels':
['s1']


In [None]:
import os
import shutil

src_folder = "data/labels/s1"
dst_folder = "data/labels"

# Lift every .align file out of the nested s1/ folder into data/labels.
align_names = [fname for fname in os.listdir(src_folder) if fname.endswith(".align")]
for fname in align_names:
    shutil.move(os.path.join(src_folder, fname), os.path.join(dst_folder, fname))
moved = len(align_names)

print(f"✅ Moved {moved} .align files from 'labels/s1' → 'labels'")


✅ Moved 1000 .align files from 'labels/s1' → 'labels'


In [None]:
# Run the (patched) training script as a shell command. It executes in its own
# process, so objects it creates are not available as variables in this kernel.
!python3 train_ctc.py


2025-07-21 16:03:17.673249: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1753113797.694659   33702 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1753113797.701351   33702 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-07-21 16:03:17.722260: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[INFO] Loading data...
[0;36m[mpeg1video @ 0x26c00f00] [0m[1;31mac-tex damaged at 22 17
[0m[INFO] Building model...
2025-

In [None]:
from google.colab import files

# Offer the saved weights file as a browser download.
best_model_path = "models/best_model.h5"
files.download(best_model_path)


FileNotFoundError: Cannot find file: models/best_model.h5

In [None]:
from google.colab import files

# Prompt for a clip and remember the name of the first uploaded file.
print("📹 Upload a .mpg video to test inference:")
video_upload = files.upload()
video_file = list(video_upload)[0]


📹 Upload a .mpg video to test inference:


Saving bbaf2n.mpg to bbaf2n.mpg


In [None]:
# 🔧 Build proper inference model using only video input and softmax output
# NOTE(review): `model` is not assigned in any cell visible in this notebook
# (training is launched through `!python3 train_ctc.py`), so on a fresh kernel
# this cell presumably fails with NameError — confirm where `model` should come
# from. The next cell also reassigns `inference_model` via build_ctc_model(...).
from keras.models import Model

# Step 1: Confirm which input is video (usually input[0])
video_input = model.input[0]

# Step 2: Get the softmax layer correctly
# You can list all layers to confirm, but this usually works:
# (looks the layer up by the name "y_pred" — TODO confirm model.py uses that name)
softmax_output = model.get_layer("y_pred").output


# Step 3: Create a simplified inference model
# Maps the selected video input straight to the "y_pred" layer's output.
inference_model = Model(inputs=video_input, outputs=softmax_output)


In [None]:
from utils import load_video, preprocess_video
from decoder import greedy_decoder, get_charset
from model import build_ctc_model

print("[INFO] Loading charset and inference model...")
charset = get_charset()
output_dim = 28  # must match trained model

# Build the model in inference mode and load the trained weights into it
inference_model = build_ctc_model(input_dim=(75, 50, 100, 1), output_dim=output_dim, training=False)
inference_model.load_weights("models/best_model.h5")

# Load and preprocess video
print("[INFO] Loading and preprocessing video...")
# Open the upload in a context manager so the file handle is always closed.
# Preprocessing stays inside the block in case load_video reads the stream
# lazily (its implementation is not visible here — TODO confirm).
with open(video_file, 'rb') as video_stream:
    frames = load_video(video_stream)
    input_tensor = preprocess_video(frames)

# Run inference
print("[INFO] Running inference...")
pred = inference_model.predict(input_tensor)

# Decode the first item of the prediction batch into text
text = greedy_decoder(pred[0], charset)
print("🗣️ Predicted Text:", text)


[INFO] Loading charset and inference model...
[INFO] Loading and preprocessing video...
[INFO] Running inference...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 548ms/step
🗣️ Predicted Text: let wr_e_ _t si_ lg_on
