# Installing and importing the libraries

In [72]:
!pip install opencv-python matplotlib imageio gdown

In [2]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
import gdown
from matplotlib import pyplot as plt
import imageio

In [3]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam,legacy
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler


In [4]:
# List available physical GPU devices (the list is empty when TensorFlow detects no GPU)
physical_devices = tf.config.list_physical_devices('GPU')

In [5]:
# Enable on-demand GPU memory allocation instead of reserving all GPU memory up front
physical_devices = tf.config.list_physical_devices('GPU')
for gpu in physical_devices:
    try:
        # Apply the same setting to every GPU so the devices are configured consistently
        tf.config.experimental.set_memory_growth(gpu, True)
    except (ValueError, RuntimeError) as err:
        # Best effort: report why the setting could not be applied (e.g. the device
        # was already initialised) and continue, rather than hiding every error
        print(f'Could not enable memory growth on {gpu.name}: {err}')

# Data Loading


In [72]:
# Define the URL of the dataset containing videos of 1 speaker of the GRID dataset
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'

# Define the output file name for the downloaded ZIP file
output = 'data.zip'

# Download only when the archive is not already present, so re-running the cell
# (e.g. after a kernel restart) does not fetch the archive again
if not os.path.exists(output):
    # NOTE(review): some gdown versions signal failure by returning None rather than raising
    if gdown.download(url, output, quiet=False) is None:
        raise RuntimeError(f'Failed to download the dataset archive from {url}')

# Extract the contents of the downloaded ZIP file
gdown.extractall(output)

In [7]:
def load_video(path: str) -> tf.Tensor:
    """
    Load a video, crop every frame to the lip region and normalise the result.

    Parameters:
        path (str): The path to the video file.

    Returns:
        tf.Tensor: A float32 tensor of the cropped, grayscale, normalised frames.
        Each frame is cropped to rows 190:236 and columns 80:220, i.e. 46 x 140 x 1
        when the source frames are at least that large.

    Raises:
        ValueError: If no frame could be read from the video.
    """
    # Open the video file
    cap = cv2.VideoCapture(path)

    frames = []
    try:
        # The reported frame count is only an upper bound for the loop;
        # stop as soon as a read fails instead of processing a missing frame
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                break

            # Convert the frame to a single grayscale channel
            frame = tf.image.rgb_to_grayscale(frame)

            # Crop the frame to the region of interest i.e. lip region
            frames.append(frame[190:236, 80:220, :])
    finally:
        # Release the capture object even when decoding raised an error
        cap.release()

    if not frames:
        raise ValueError(f'No frames could be read from video: {path}')

    # NOTE(review): the mean and the subtraction below are evaluated in the frames'
    # original dtype (presumably uint8 from OpenCV) before the cast to float32.
    # This is kept exactly as before because existing trained weights presumably
    # rely on this preprocessing — confirm before changing it.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))

    # Centre the frames, cast to float32 and scale by the standard deviation
    return tf.cast((frames - mean), tf.float32) / std


In [8]:
# Every character that may occur in the annotations: lowercase letters,
# apostrophe, '?', '!', the digits 1-9 and the space
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [72]:
# Forward lookup: character -> integer id (the empty string is the out-of-vocabulary token)
char_to_num = tf.keras.layers.StringLookup(oov_token="", vocabulary=vocab)

# Reverse lookup: integer id -> character, built from the forward layer's vocabulary
num_to_char = tf.keras.layers.StringLookup(
    invert=True, oov_token="", vocabulary=char_to_num.get_vocabulary()
)

# Report the resulting vocabulary and its size
print(f"The vocabulary is: {char_to_num.get_vocabulary()} (size ={char_to_num.vocabulary_size()})")

In [72]:
# Display the vocabulary held by the lookup layer as the cell output
char_to_num.get_vocabulary()

In [11]:
def load_alignments(path: str) -> tf.Tensor:
    """
    Load the transcript of an alignment file as a sequence of character ids.

    Each line is split on whitespace and its third field is taken as the word;
    entries whose word is 'sil' (silence) are dropped. The remaining words are
    joined with single spaces and encoded character by character with char_to_num.

    Parameters:
        path (str): The path to the text file containing alignments.

    Returns:
        tf.Tensor: 1-D tensor of character ids for the spoken sentence
        (empty when the file contains no non-silence words).
    """
    with open(path, 'r') as f:
        lines = f.readlines()

    words = []
    for line in lines:
        fields = line.split()
        # Skip blank or truncated lines instead of failing with an IndexError
        if len(fields) < 3:
            continue
        # Keep every word except the 'sil' (silence) marker
        if fields[2] != 'sil':
            words.append(fields[2])

    # Joining with spaces yields the same character sequence as interleaving a
    # space before every word and then dropping the leading space
    characters = tf.strings.unicode_split(' '.join(words), input_encoding='UTF-8')

    # Convert characters to numerical indices using the char_to_num layer
    return char_to_num(characters)


In [12]:
def load_data(path: str):
    """
    Load the video frames and the matching alignments for one sample.

    Only the file name (without directory and extension) of `path` is used; the
    video is read from data/s1/<name>.mpg and the alignments from
    data/alignments/s1/<name>.align.

    Parameters:
        path: Scalar string tensor holding the path of a data file
            (it must provide .numpy() returning bytes).

    Returns:
        tuple: (frames, alignments) as returned by load_video and load_alignments.
    """
    # Convert path from bytes to string
    path = bytes.decode(path.numpy())

    # Extract the file name without directory and extension; os.path handles the
    # platform's path separators instead of assuming '/'
    file_name = os.path.splitext(os.path.basename(path))[0]

    # Construct paths for video and alignment files
    video_path = os.path.join('data', 's1', f'{file_name}.mpg')
    alignment_path = os.path.join('data', 'alignments', 's1', f'{file_name}.align')

    # Load video frames
    frames = load_video(video_path)

    # Load alignments
    alignments = load_alignments(alignment_path)

    return frames, alignments


In [13]:
def mappable_function(path: str) -> List[str]:
    """
    Wrap load_data so it can be used inside a tf.data pipeline.

    load_data runs Python code, so it is invoked through tf.py_function with
    the declared output dtypes (float32 frames, int64 alignments).

    Parameters:
        path (str): The path to the data file.

    Returns:
        The pair of tensors (frames, alignments) produced by load_data.
    """
    return tf.py_function(func=load_data, inp=[path], Tout=(tf.float32, tf.int64))

# Visualising One Example in our dataset


In [14]:
# Path relative to the working directory (same layout as the pipeline's './data/s1/*.mpg')
# instead of an absolute path that only exists on one machine
test_path = './data/s1/bbaf2n.mpg'

In [15]:
# Load one example and unpack it into its video frames and its encoded alignments
# (load_data expects a tensor, hence the conversion of the path string)
frames, alignments = load_data(tf.convert_to_tensor(test_path))

In [72]:
# Show one frame (index 40) of the cropped lip region
plt.imshow(frames[40])

In [72]:
# Map the numerical alignments back to characters and join them into one readable string
tf.strings.reduce_join(num_to_char(alignments))

# Data Pipeline



In [34]:
# List the video files in a fixed order. By default list_files shuffles the file names
# and re-shuffles them on every pass over the dataset, which would make the take/skip
# split below pick different files each epoch and leak test videos into training
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle=False)

# Shuffle once with a buffer size of 500; reshuffle_each_iteration=False keeps the
# same order on every pass so the split stays stable
data = data.shuffle(500, reshuffle_each_iteration=False)

# Map the mappable_function to load data for each file in the dataset
data = data.map(mappable_function)

# Pad and batch the dataset with group size 2, padding frames to have shapes ([75,None,None,None]) and alignments to have shape ([40])
data = data.padded_batch(2, padded_shapes=([75,None,None,None],[40]))

# Prefetch data to improve pipeline performance
data = data.prefetch(tf.data.AUTOTUNE)

# Split data: the first 450 batches are used for training, the remaining batches for testing
train = data.take(450)
test = data.skip(450)

In [72]:
# Number of batches in the pipeline (each batch holds up to 2 videos)
len(data)

In [36]:
# Pull the first batch from the pipeline as NumPy arrays
frames, alignments = next(data.as_numpy_iterator())

In [37]:
# Iterator that yields the batches of the pipeline as NumPy arrays
sample = data.as_numpy_iterator()

In [72]:
# Fetch the next batch and display its frames array
val = next(sample)
val[0]

In [72]:
# val[0]: the videos of the batch, [0]: the first video in the batch, [35]: the frame at index 35 of that video
plt.imshow(val[0][0][35])

In [72]:
# Shape of the first video in the first batch
next(data.as_numpy_iterator())[0][0].shape

# MODEL


In [41]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, Activation, MaxPool3D, TimeDistributed, Flatten, Bidirectional, LSTM, Dropout, Dense

# Build the network as a single layer list: three Conv3D/ReLU/MaxPool3D stages,
# a per-time-step flatten, two bidirectional LSTMs with dropout, and a softmax classifier
model = Sequential([
    # Stage 1: 128 filters, 3x3x3 kernel, on inputs of shape (75, 46, 140, 1)
    Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'),
    Activation('relu'),
    # Pool only over the two spatial axes (pool size (1,2,2))
    MaxPool3D((1,2,2)),

    # Stage 2: 256 filters, 3x3x3 kernel
    Conv3D(256, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    # Stage 3: 75 filters, 3x3x3 kernel
    Conv3D(75, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    # Flatten the feature maps of each time step independently
    TimeDistributed(Flatten()),

    # Two bidirectional LSTM layers (128 units, Orthogonal kernel initializer,
    # full sequences returned), each followed by dropout with rate 0.5
    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(0.5),
    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(0.5),

    # Per-time-step softmax over vocabulary size + 1 classes, he_normal kernel initializer
    Dense(char_to_num.vocabulary_size() + 1, kernel_initializer='he_normal', activation='softmax'),
])


In [72]:
# Print the layers of the model with their output shapes and parameter counts
model.summary()

# Setting up Training


In [43]:
def scheduler(epoch, lr):
    """
    Learning rate scheduler function.

    The learning rate is left unchanged for the first 30 epochs and is then
    multiplied by exp(-0.1) on every subsequent epoch.

    Args:
    - epoch (int): The current epoch number.
    - lr (float): The current learning rate.

    Returns:
    - float: The updated learning rate based on the epoch number.
    """
    # Local import keeps the function self-contained
    import math

    if epoch < 30:
        return lr
    # math.exp yields a plain Python float; a tf.Tensor result is not accepted as a
    # schedule output by every Keras version of LearningRateScheduler
    return lr * math.exp(-0.1)


In [44]:
def CTCLoss(y_true, y_pred):
    """
    Connectionist Temporal Classification (CTC) loss for a batch.

    Every sample is given the full time dimension of y_pred as its input length
    and the full (padded) width of y_true as its label length.

    Args:
    - y_true (tensor): Label ids, indexed as (batch_size, max_label_length).
    - y_pred (tensor): Per-time-step class scores produced by the model,
      indexed as (batch_size, time_steps, num_classes).

    Returns:
    - tensor: The value returned by tf.keras.backend.ctc_batch_cost.
    """
    label_shape = tf.shape(y_true)
    batch_size = tf.cast(label_shape[0], dtype="int64")

    # Column of ones used to broadcast a scalar length to one entry per sample
    per_sample = tf.ones(shape=(batch_size, 1), dtype="int64")

    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_length = tf.cast(label_shape[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)


In [45]:
class ProduceExample(tf.keras.callbacks.Callback):
    """
    Callback that prints the original and the predicted text of one batch at the end of each epoch.

    Args:
    - dataset (tf.data.Dataset): Dataset used for evaluation; it yields (frames, alignments) batches.

    Methods:
    - on_epoch_end(epoch, logs=None): Called at the end of each epoch to generate and print model predictions.
    """
    def __init__(self, dataset) -> None:
        """
        Initialize the callback.

        Args:
        - dataset (tf.data.Dataset): Dataset used for evaluation.
        """
        super().__init__()
        # Keep the dataset itself so a fresh iterator can be created when the current one runs out
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def _next_batch(self):
        """Return the next batch, restarting the iterator once when it is exhausted; None if the dataset is empty."""
        try:
            return next(self.dataset)
        except StopIteration:
            # Training can run for more epochs than the dataset has batches: start over
            self.dataset = self._source.as_numpy_iterator()
            return next(self.dataset, None)

    def on_epoch_end(self, epoch, logs=None) -> None:
        """
        Generate and print model predictions at the end of each epoch.

        Args:
        - epoch (int): Current epoch number.
        - logs (dict): Dictionary containing the loss value and any other metrics during training.
        """
        data = self._next_batch()
        if data is None:
            return

        yhat = self.model.predict(data[0])

        # One input length per sample, taken from the prediction itself so that any
        # batch size (e.g. a smaller final batch) and sequence length is handled
        input_length = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()

        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)


In [46]:
# Compile the model with a legacy Adam optimizer (default arguments) and the custom CTCLoss function
model.compile(optimizer = legacy.Adam(), loss=CTCLoss)

In [47]:
# Save only the model weights to models/checkpoint during training
# NOTE(review): monitor='loss' presumably has no effect unless save_best_only is enabled — confirm
checkpoint_callback = ModelCheckpoint(os.path.join('models','checkpoint'), monitor='loss', save_weights_only=True)


In [48]:
# Apply the scheduler function to update the learning rate during training
schedule_callback = LearningRateScheduler(scheduler)

In [49]:
# Print example predictions from the test split at the end of every epoch
example_callback = ProduceExample(test)

In [50]:
# Train for 96 epochs on the train split, validating on the test split; the callbacks
# save weights, adjust the learning rate and print sample predictions
model.fit(train, validation_data=test, epochs=96, callbacks=[checkpoint_callback, schedule_callback, example_callback])


# PREDICTIONS

In [72]:
# URL and local file name of the archive containing pretrained model weights
url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
output = 'checkpoints.zip'

# Download only when the archive is not already present, so re-running the cell
# does not fetch the archive again
if not os.path.exists(output):
    # NOTE(review): some gdown versions signal failure by returning None rather than raising
    if gdown.download(url, output, quiet=False) is None:
        raise RuntimeError(f'Failed to download the checkpoint archive from {url}')

# Extract the archive into the models directory
gdown.extractall(output, 'models')

In [72]:
# Load the weights stored at models/checkpoint into the model
model.load_weights('models/checkpoint')

In [53]:
# NOTE(review): no cell visible in this notebook uses this optimizer — confirm whether it is still needed
optimizer = legacy.Adam()

In [54]:
# Iterator that yields the batches of the test split as NumPy arrays
test_data = test.as_numpy_iterator()

In [55]:
# Take the next (frames, alignments) batch from the test iterator
sample = next(test_data)

In [56]:
# Run the model on the frames of the sampled batch
yhat = model.predict(sample[0])



In [59]:
# Banner, followed by the ground-truth sentences of the batch decoded back to text
print('~'*100, 'REAL TEXT')
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in sample[1]]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ REAL TEXT


[<tf.Tensor: shape=(), dtype=string, numpy=b'lay green at z seven soon'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'place white with e zero now'>]

In [60]:
# Greedy CTC decoding; the per-sample input lengths are derived from the prediction
# itself so the call works for any batch size and sequence length
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[yhat.shape[1]] * len(yhat), greedy=True)[0][0].numpy()

In [61]:
# Banner, followed by the decoded predictions of the batch converted back to text
print('~'*100, 'PREDICTIONS')
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PREDICTIONS


[<tf.Tensor: shape=(), dtype=string, numpy=b'lay green at z seven soon'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'place white with zero now'>]

In [68]:
# Load a single video with its alignments; the path is relative to the working
# directory instead of an absolute path that only exists on one machine
sample = load_data(tf.convert_to_tensor('./data/s1/lrarzn.mpg'))

In [69]:
# Banner, followed by the ground-truth sentence of the single sample decoded back to text
print('~'*100, 'REAL TEXT')
[tf.strings.reduce_join(num_to_char(sample[1]))]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ REAL TEXT


[<tf.Tensor: shape=(), dtype=string, numpy=b'lay red at r zero now'>]

In [70]:
# Add a leading batch axis to the single video and run the model on it
yhat = model.predict(tf.expand_dims(sample[0], axis=0))



In [71]:
# Greedy CTC decoding; the input length is derived from the prediction itself
# rather than hard-coded
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[yhat.shape[1]] * len(yhat), greedy=True)[0][0].numpy()

In [72]:
# Banner, followed by the decoded prediction of the single sample converted back to text
print('~'*100, 'PREDICTIONS')
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PREDICTIONS


[<tf.Tensor: shape=(), dtype=string, numpy=b'lay red at r zero now'>]