Reference paper (LipNet: End-to-End Sentence-level Lipreading): https://arxiv.org/pdf/1611.01599


# Set up Environment

In [None]:
!pip install imageio==2.23.0
!pip install tensorflow==2.10.1
!pip install ultralytics

In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
import pandas as pd
import warnings
from typing import List
from matplotlib import pyplot as plt
import imageio # Creates gifs to see frames stacked together
from IPython.display import clear_output
# Size (height, width) that every detected crop is resized to in load_video.
CROP_HEIGHT = 46
CROP_WIDTH = 140

In [None]:
# Show the installed packages and their versions (useful when debugging the environment).
!pip list

In [None]:
# Ask TensorFlow to grow GPU memory on demand for the first visible GPU.
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    try:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
        print("GPU found")
    except (RuntimeError, ValueError) as err:
        # A GPU exists but the setting was rejected (e.g. the device was already
        # initialised); report that instead of claiming there is no GPU.
        print(f"GPU found, but memory growth could not be enabled: {err}")
else:
    print("GPU not found")

# Load and preprocess data

In [None]:
from ultralytics import YOLO

# Location of the trained detection weights
weights_path = '/content/best.pt'

try:
    object_detect_model = YOLO(weights_path)
except FileNotFoundError:
    # Fall back to generic pretrained weights when the trained ones are missing
    warnings.warn(f"MODEL NOT FOUND AT {weights_path}, LOADING GENERIC PRETRAINED YOLOv8 MODEL INSTEAD.")
    object_detect_model = YOLO('yolov8n.pt')
else:
    print(f"Loaded model with best weights from {weights_path}")

In [None]:
import gdown  # Downloads Google Drive files; works well in Colab notebooks

# More manageable subset of the very large original dataset
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'

# Only download when the archive is not already present, so re-running the
# notebook does not fetch the dataset again.
if not os.path.exists(output):
    if gdown.download(url, output, quiet=False) is None:
        raise RuntimeError(f"Failed to download the dataset archive from {url}")

gdown.extractall(output)

In [None]:
def load_video(path: str, detection_model, crop_height: int, crop_width: int) -> tf.Tensor:
    """Detect, crop, grayscale, resize and standardise the detections of a video.

    Args:
        path: Video path handed to ``detection_model.predict``.
        detection_model: Object with a ``predict`` method whose results expose
            ``boxes.xyxy`` and ``orig_img`` (here the Ultralytics YOLO model).
        crop_height: Height every crop is resized to.
        crop_width: Width every crop is resized to.

    Returns:
        float32 tensor of shape ``(num_crops, crop_height, crop_width, 1)``,
        standardised with the mean and standard deviation of the whole clip.
        One crop is produced per detected box; frames without detections
        contribute nothing.

    Raises:
        ValueError: If the video yields no usable detection at all.
    """
    # stream=True returns a generator, so results are consumed frame by frame
    # instead of being accumulated in memory for the whole video.
    results = detection_model.predict(path, conf=0.5, show=False, stream=True)
    preprocessed_frames = []

    for result in results:
        for box in result.boxes.xyxy:
            x1, y1, x2, y2 = box.int().tolist()  # Box corners as plain Python ints
            if x2 <= x1 or y2 <= y1:
                continue  # Degenerate box: an empty crop cannot be resized
            crop = result.orig_img[y1:y2, x1:x2]
            # NOTE(review): if orig_img is in BGR channel order, the RGB grayscale
            # weights are applied to swapped channels. Left unchanged so the
            # preprocessing stays consistent with the pretrained weights - confirm.
            grey = tf.image.rgb_to_grayscale(crop)
            preprocessed_frames.append(tf.image.resize(grey, (crop_height, crop_width)))

    if not preprocessed_frames:
        raise ValueError(f"No detections found in '{path}'; cannot build a frame tensor.")

    frames_tensor = tf.cast(tf.stack(preprocessed_frames), tf.float32)
    mean = tf.math.reduce_mean(frames_tensor)
    std = tf.math.reduce_std(frames_tensor)
    # Guard against a zero standard deviation (constant clip), which would yield NaNs
    normalized_frames = (frames_tensor - mean) / tf.maximum(std, tf.keras.backend.epsilon())

    return normalized_frames

In [None]:
# VOCABULARY CONVERSION

# Character-level vocabulary; the empty string is used as the out-of-vocabulary token.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")
# Inverse lookup built from the exact vocabulary of the forward lookup
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

print(
    f"Vocab List: {char_to_num.get_vocabulary()}\n"
    f"Vocab size: {char_to_num.vocabulary_size()}"
)


In [None]:
# Sanity check of both lookups: characters -> indices, and indices -> characters
print(char_to_num(['k', 'e', 'r', 'o']))
print(num_to_char([11, 5, 18, 15]))

In [None]:
def load_labels(path: str) -> tf.Tensor:
    """Read an alignment file and encode its transcript as character indices.

    Every line is split on whitespace and its third field is taken as the
    token. ``sil`` tokens are dropped, the remaining words are joined with a
    single space and each character is mapped through ``char_to_num``.

    Args:
        path: Path of the alignment file.

    Returns:
        1-D tensor of character indices for the transcript.
    """
    words = []
    with open(path, encoding='utf-8') as f:
        for line in f:
            fields = line.split()
            # Ignore blank or malformed lines instead of failing with IndexError
            if len(fields) >= 3 and fields[2] != 'sil':
                words.append(fields[2])

    # Separate words with a space *between* them only, so the encoded label
    # does not start with a spurious separator.
    transcript = ' '.join(words)
    return char_to_num(tf.strings.unicode_split(transcript, input_encoding='UTF-8'))

In [None]:
def load_data(videos_path, show_shapes: bool = False):
    """Load the preprocessed frames and the encoded transcript of one video.

    The alignment file is looked up at
    ``<two directories above the video>/alignments/s1/<video name>.align``.

    Args:
        videos_path: Video location as a scalar string tensor (as supplied by
            ``tf.py_function``), ``bytes`` or ``str``.
        show_shapes: When True, print the shapes of the returned tensors.

    Returns:
        Tuple ``(frames, labels)`` as produced by ``load_video`` and
        ``load_labels``.
    """
    # Normalise the supported input kinds to a plain Python string
    if hasattr(videos_path, 'numpy'):
        videos_path = videos_path.numpy()
    if isinstance(videos_path, bytes):
        videos_path = videos_path.decode('utf-8')

    # Get the name of the file to be used for the alignment path
    file_name = os.path.basename(videos_path).split('.')[0]
    directory = os.path.dirname(os.path.dirname(videos_path))
    # os.path.join keeps the path relative when `directory` is empty
    labels_path = os.path.join(directory, 'alignments', 's1', f'{file_name}.align')

    frames = load_video(videos_path,
                        detection_model=object_detect_model,
                        crop_height=CROP_HEIGHT,
                        crop_width=CROP_WIDTH,
                       )
    labels = load_labels(labels_path)
    if show_shapes:
        print(f'Frames shape:{frames.shape}\nLabels shape: {labels.shape}')
    return frames, labels

In [None]:
# Load a single clip to check the preprocessing.
# Paths input as tensors for TF Dataset API compatibility and mapping capabilities
frames, labels = load_data(tf.convert_to_tensor('/content/data/s1/bbaf2n.mpg'))

In [None]:
def decode_label(labels):
    """Turn a sequence of character indices into one joined string.

    Args:
        labels: Character indices accepted by ``num_to_char``.

    Returns:
        Scalar string tensor holding the concatenated characters.
    """
    # num_to_char already returns a string tensor, so it can be joined directly
    # without a round trip through numpy and Python strings.
    return tf.strings.reduce_join(num_to_char(labels))

In [None]:
# Show a randomly chosen preprocessed frame together with the encoded and decoded label
plt.imshow(frames[np.random.randint(0, frames.shape[0])])
print(f"Sequence: {labels}\nLabel: {decode_label(labels)}")

In [None]:
def map_func(path: tf.Tensor) -> List[tf.Tensor]:
    """Wrap ``load_data`` in ``tf.py_function`` so it can be used in ``Dataset.map``.

    The two outputs are declared as ``tf.float32`` (frames) and ``tf.int64``
    (labels).
    """
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))

# Data Pipeline

In [None]:
# shuffle=False: list_files otherwise returns the files in a new random order on
# every iteration, so the take/skip split below would not be stable across
# epochs and samples could leak between train and test.
data = tf.data.Dataset.list_files('/content/data/s1/*.mpg', shuffle=False)
# Shuffle once; the order is then kept fixed for every later iteration
data = data.shuffle(500, reshuffle_each_iteration=False)
data = data.map(map_func)
# Batches of 2 clips; frames are padded to 75 time steps and labels to 40 entries
data = data.padded_batch(2, padded_shapes=([75, None, None, None], [40]))
data = data.prefetch(tf.data.AUTOTUNE)
# First 450 batches for training, the remaining batches for testing
train = data.take(450)
test = data.skip(450)

In [None]:
# Pull one padded batch and show the first frame of its first clip with its label
first_batch = next(data.as_numpy_iterator())
frames, labels = first_batch
plt.imshow(frames[0][0])
print(f"Sequence: {labels[0]} \n Label: {decode_label(labels[0])}")

In [None]:
# Use a dedicated name for the iterator: rebinding `test` here would overwrite
# the test split created in the data pipeline cell.
sample_iterator = data.as_numpy_iterator()
val = next(sample_iterator)
# Show the shape of the first clip rather than dumping the whole array
val[0][0].shape

In [None]:
# Save the first clip of the sample batch as a GIF for visual inspection
imageio.mimsave('./example.gif', val[0][0], fps=10)

# Build the Neural Network

In [None]:
# Shape of one padded clip, read from the sample batch `val` fetched earlier
# instead of pulling (and preprocessing) another batch from the pipeline.
vid_shape = val[0][0].shape

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Dropout, TimeDistributed, Flatten, Bidirectional, LSTM, Dense
from tensorflow.keras.optimizers import Adam

# Network: three Conv3D / pooling / dropout stages, per-time-step flattening,
# two bidirectional LSTMs and a per-time-step softmax classifier.
lipnet_model = Sequential()

# Convolutional stages; pooling with (1,2,2) reduces height and width only
for stage, n_filters in enumerate((128, 256, 75)):
    # Only the very first layer declares the input shape
    shape_kwargs = {'input_shape': vid_shape} if stage == 0 else {}
    lipnet_model.add(Conv3D(n_filters, 3, padding='same', activation='relu', **shape_kwargs))
    lipnet_model.add(MaxPooling3D((1,2,2)))
    lipnet_model.add(Dropout(0.5))

# Flatten the feature maps of every time step into one vector
lipnet_model.add(TimeDistributed(Flatten()))

# Recurrent stages
for _ in range(2):
    lipnet_model.add(Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)))
    lipnet_model.add(Dropout(0.5))

# One output per vocabulary entry plus one extra class (presumably the CTC blank)
lipnet_model.add(
    Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax')
)


In [None]:
# Print the layers, output shapes and parameter counts of the model
lipnet_model.summary()

In [None]:
# Forward pass on the sample batch (before any weights are loaded) to check
# that the output can be decoded into characters.
yhat = lipnet_model.predict(val[0])
first_clip_codes = np.argmax(yhat[0], axis=1)  # Most likely class per time step
print(f"Coded:\n{first_clip_codes}\n")
print(f"Decoded:\n{decode_label(first_clip_codes)}")

# Train Model

In [None]:
def ctc_loss(y_true, y_pred):
    """CTC batch cost using the full second dimension of both inputs as lengths.

    Args:
        y_true: Batch of label sequences; its second dimension is used as the
            label length of every sample.
        y_pred: Batch of per-time-step predictions; its second dimension is
            used as the input length of every sample.

    Returns:
        The result of ``tf.keras.backend.ctc_batch_cost`` for the batch.
    """
    label_dims = tf.shape(y_true)
    pred_dims = tf.shape(y_pred)

    # Column of ones with one row per sample, used to repeat the scalar lengths
    n_samples = tf.cast(label_dims[0], dtype="int64")
    ones_column = tf.ones(shape=(n_samples, 1), dtype="int64")

    time_steps = tf.cast(pred_dims[1], dtype="int64") * ones_column
    label_steps = tf.cast(label_dims[1], dtype="int64") * ones_column
    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, time_steps, label_steps)

In [None]:
# Compile with the custom CTC loss and the Adam optimizer
lipnet_model.compile(loss=ctc_loss, optimizer=Adam(learning_rate=0.001))

In [None]:
# Load Pretrained Lipnet Weights
url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
output = 'checkpoints.zip'

# Only download when the archive is not already present, so re-running the
# notebook does not fetch the checkpoint again.
if not os.path.exists(output):
    if gdown.download(url, output, quiet=False) is None:
        raise RuntimeError(f"Failed to download the checkpoint archive from {url}")

gdown.extractall(output, 'models')
lipnet_model.load_weights('models/checkpoint')

In [None]:
#Train Model (Not required since pretrained weights are loaded)
#lipnet_model.fit(train, validation_data=test, epochs=50)

# Test Model

In [None]:
def batch_video(videos_path: str, batch_size: int = 75, preview_path: str = './test.gif'):
    """Split a preprocessed video into fixed-length clips of ``batch_size`` frames.

    The video is preprocessed with ``load_video``. When the number of frames is
    not a multiple of ``batch_size``, the final clip is completed with all-zero
    frames.

    Args:
        videos_path: Path of the video to preprocess.
        batch_size: Number of frames per clip.
        preview_path: Where the first clip is written as a GIF when padding was
            needed (the behaviour of the original implementation). Pass
            ``None`` to disable the preview.

    Returns:
        Tensor of shape ``(num_clips, batch_size, CROP_HEIGHT, CROP_WIDTH, 1)``.
    """
    video_tensor = load_video(videos_path,
                              detection_model=object_detect_model,
                              crop_height=CROP_HEIGHT,
                              crop_width=CROP_WIDTH,
                              )

    num_frames = video_tensor.shape[0]
    remainder_frames = num_frames % batch_size

    # Complete the last, partial clip with zero frames so the video splits evenly
    if remainder_frames > 0:
        padding_frames = batch_size - remainder_frames
        padding_tensor = tf.zeros((padding_frames, CROP_HEIGHT, CROP_WIDTH, 1), dtype=video_tensor.dtype)
        video_tensor = tf.concat([video_tensor, padding_tensor], axis=0)

    # Consecutive groups of `batch_size` frames form one clip each
    num_batches = video_tensor.shape[0] // batch_size
    batches = tf.reshape(video_tensor, (num_batches, batch_size, CROP_HEIGHT, CROP_WIDTH, 1))

    # Preview of the first clip; only written when padding was needed, as before
    if remainder_frames > 0 and preview_path:
        imageio.mimsave(preview_path, batches[0], fps=10)

    return batches

In [None]:
def predict_and_compare(path: str, real_label: bool = True):
    """Run the model on a video and print the decoded prediction.

    Args:
        path: Path of the video to transcribe.
        real_label: When True the video is loaded with ``load_data`` (which also
            reads its alignment file) and the actual text is printed after the
            prediction. When False the video is split into clips with
            ``batch_video`` and only the predictions are printed.

    Returns:
        None. The results are printed.
    """
    if real_label:
        sample = load_data(tf.convert_to_tensor(path))
        yhat = [lipnet_model.predict(tf.expand_dims(sample[0], axis=0))]
    else:
        batches = batch_video(path)
        num_batches = batches.shape[0]
        yhat = []
        # Iterate over *all* clips; stopping at num_batches - 1 skipped the last one
        for i in range(num_batches):
            print(f"Predicting for batch {i+1} of {num_batches}...")
            yhat.append(lipnet_model.predict(tf.expand_dims(batches[i], axis=0)))

    # Decode predictions
    clear_output()
    decoded_texts = []

    for prediction in yhat:
        # One sequence length per sample, taken from the prediction itself
        # instead of a hard-coded number of time steps.
        input_length = np.full(prediction.shape[0], prediction.shape[1])
        decoded = tf.keras.backend.ctc_decode(prediction, input_length=input_length, greedy=True)[0][0].numpy()
        decoded_texts.append(decode_label(decoded[0]).numpy().decode('utf-8'))

    predictions_str = " ".join(decoded_texts)

    print(f"PREDICTIONS:\n {predictions_str.strip()}\n")
    if real_label:
        print(f"ACTUAL TEXT:\n {decode_label(sample[1]).numpy().decode('utf-8').strip()}")

In [None]:
# Re-predicting a train data sample and comparing it with the text from its alignment file:
predict_and_compare('/content/data/s1/bbaz6p.mpg', real_label=True)

PREDICTIONS:
 s ee s ix pleasin

ACTUAL TEXT:
 bin blue at z six please


In [None]:
# Predicting for unseen test data:
# real_label=False skips the alignment lookup, so the reference transcript is
# printed manually below for comparison.
predict_and_compare('/content/test_video.mp4', real_label=False)
actual_text = """Uh we didn't meet um until this morning. Um but I watched the France game when
I got home. Um I didn't I didn't watch uh our whole game I watched the France game first."""
print(f"ACTUAL TEXT:\n {actual_text}")

PREDICTIONS:
 s bue i oe slgain s een it t oue again  een in i sive sgain s bree ni so pslgain

ACTUAL TEXT:
 Uh we didn't meet um until this morning. Um but I watched the France game when 
I got home. Um I didn't I didn't watch uh our whole game I watched the France game first.
