**1. Importing libraries and loading data**

In [None]:
#Importing libraries
import os
import cv2
import tensorflow as tf
import numpy as np
import math
from matplotlib import pyplot as plt
import imageio

In [None]:
#Fetching the zipped dataset from Google Drive and unpacking the archive
import gdown
url = 'https://drive.google.com/uc?id=1tMM16j54blSffGrXRXXWqIzH5SpjPT4F'
output = 'data.zip'
gdown.download(url=url, output=output, quiet=False)
gdown.extractall(output)

In [None]:
#Enabling memory growth on every visible GPU so that GPU memory is allocated on demand.
#When no GPU is found the list is empty, the loop does nothing and TensorFlow runs on the CPU.
physical_devices = tf.config.list_physical_devices("GPU")
for gpu in physical_devices:
    try:
        #set_memory_growth (not get_memory_growth) is the call that changes the setting
        tf.config.experimental.set_memory_growth(gpu, True)
    except (RuntimeError, ValueError) as err:
        #reported instead of silently ignored, e.g. when the GPU has already been initialized
        print(f"Could not enable memory growth for {gpu}: {err}")

**2. Initiating prerequisites for the data pipeline**

In [None]:
#Characters the neural network needs to be able to predict: lowercase letters, ' ? !, the digits 1-9 and the space
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [None]:
#lookup layer that turns characters of the vocabulary into integer indices
char_to_num = tf.keras.layers.StringLookup(oov_token="", vocabulary=vocab)

#inverse lookup layer built on the same vocabulary: turns integer indices back into characters
num_to_char = tf.keras.layers.StringLookup(
    invert=True,
    oov_token="",
    vocabulary=char_to_num.get_vocabulary(),
)

In [None]:
#Building video_loading function

def load_video(path:str, max_frames:int = 75) -> "tf.Tensor":
  """Load a video, crop the mouth region and return the standardized grayscale frames.

  Args:
    path: Location of the video file, opened with cv2.VideoCapture.
    max_frames: Maximum number of frames read from the start of the video.

  Returns:
    A float32 tensor with one entry per frame read. Each frame is converted to a
    single channel and cropped to rows 140:186 and columns 110:250; the stack is
    then shifted by its mean and divided by its standard deviation.

  Raises:
    ValueError: If not a single frame could be read from `path`.
  """
  cap = cv2.VideoCapture(path) #initiates cv2 video instance
  frames = [] #list to store the cropped frames in
  try:
    frame_total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    for _ in range(min(frame_total, max_frames)):
      ret, frame = cap.read()
      if not ret: #decoding failed or the stream ended early: frame is not usable
        break
      # NOTE(review): OpenCV usually delivers BGR frames while rgb_to_grayscale expects RGB;
      # left unchanged on the assumption that the downloaded weights were trained this way.
      frame = tf.image.rgb_to_grayscale(frame)
      frames.append(frame[140:186, 110:250, :]) #cropping the portion of the frame that includes the mouth
  finally:
    cap.release() #releasing resources even if reading a frame raised

  if not frames:
    raise ValueError(f"Could not read any frames from video: {path}")

  #rescales the data and converts the images to float32 format
  # NOTE(review): the mean is computed (and subtracted) in the frames' original dtype before the
  # cast to float32 - presumably uint8; kept as-is to stay compatible with existing checkpoints.
  mean = tf.math.reduce_mean(frames)
  std = tf.math.reduce_std(tf.cast(frames, tf.float32))
  return tf.cast((frames - mean), tf.float32) / std

In [None]:
#Building load_alignments function

def load_alignments(path:str) -> "tf.Tensor":
  """Read an alignment file and encode its transcript as character indices.

  Every line is split on whitespace and its third field is used as the token.
  Tokens equal to "sil" are dropped; the remaining tokens are separated by a
  single space. Lines with fewer than three fields (e.g. blank lines) are skipped.

  Args:
    path: Location of the alignment file.

  Returns:
    A 1-D tensor holding the char_to_num index of every character of the transcript.
  """
  tokens = []
  with open(path, "r") as f:
    for line in f:
      fields = line.split()
      if len(fields) < 3:
        continue #blank or malformed line: there is no token to read
      if fields[2] != "sil":
        tokens.extend([" ", fields[2]]) #in-place append instead of rebuilding the list for every token
  #the separator placed in front of the very first token is dropped again by the [1:] slice
  return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding="UTF-8"), (-1)))[1:]

In [None]:
#Building data loading function that calls the video and alignment loading functions in order to load videos and alignments simultaneously

def load_data(path: str):
  """Load the frames and the encoded alignment that belong to one video.

  Args:
    path: Path of the video. Accepts a scalar string tensor (as handed over by
      tf.py_function) as well as a plain bytes or str object. Only the file name
      without its extension is used; the actual files are looked up in
      data/videos and data/alignments.

  Returns:
    Tuple (frames, alignments) as returned by load_video and load_alignments.
  """
  if hasattr(path, "numpy"): #tensor input: extract the underlying bytes value
    path = path.numpy()
  if isinstance(path, bytes):
    path = path.decode("utf-8")

  #basename/splitext honour the platform's path separators and only strip the
  #final extension, so file names that contain dots are kept intact
  file_name = os.path.splitext(os.path.basename(path))[0]

  video_path = os.path.join('data','videos',f"{file_name}.mp4")   #assigns path to video
  alignment_path = os.path.join('data','alignments',f"{file_name}.align")    #assigns path to alignment

  frames = load_video(video_path) #calls load_video function
  alignments = load_alignments(alignment_path)  #calls load_alignments function

  return frames, alignments


In [None]:
#Building mappable function to use in data pipeline:

def mappable_function(path:str) -> list[str]:
  """Run load_data through tf.py_function, declaring float32 frames and int64 alignments as outputs."""
  return tf.py_function(func=load_data, inp=[path], Tout=(tf.float32, tf.int64))

**3. Constructing data pipeline**

In [None]:
#Data pipeline
#listing all stored videos in a fixed order; the shuffling is left to the next step
data = tf.data.Dataset.list_files("./data/videos/*.mp4", shuffle=False)
#shuffling once and keeping that order on every pass over the dataset: without this the
#take()/skip() split below would pick different videos on each iteration, so videos of the
#test set would end up in the training set (and vice versa)
data = data.shuffle(60, reshuffle_each_iteration=False)
data = data.map(mappable_function)  #converts the filepath of each video into tensors by calling load_data on it
data = data.padded_batch(1, padded_shapes=([75, None, None, None],[40])) #batches of 1 video and its alignment, padded to 75 frames per video and 40 tokens per alignment
data = data.prefetch(tf.data.AUTOTUNE)
#splitting data into train and test sets
train = data.take(45)
test = data.skip(45)

In [None]:
#number of batches in the training split
len(train)

45

In [None]:
#number of batches in the test split
len(test)

15

**4. Designing deep neural network**

In [None]:
#Importing libraries
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, Activation, MaxPool3D, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
#Building the model from one ordered list of layers
model = Sequential([
    #first Conv3D block: convolution, relu activation and spatial max pooling
    Conv3D(128, 3, input_shape=(75, 46, 140, 1), padding="same"), #input_shape equals dimensions of our data
    Activation("relu"),
    MaxPool3D((1, 2, 2)),

    #second Conv3D block
    Conv3D(256, 3, padding="same"),
    Activation("relu"),
    MaxPool3D((1, 2, 2)),

    #third Conv3D block
    Conv3D(75, 3, padding="same"),
    Activation("relu"),
    MaxPool3D((1, 2, 2)),

    #flattening every time step separately so the LSTMs receive one feature vector per frame
    TimeDistributed(Flatten()),

    #two bidirectional LSTM layers, each followed by dropout of 50% of the units against overfitting
    Bidirectional(LSTM(128, kernel_initializer="Orthogonal", return_sequences=True)),
    Dropout(.5),
    Bidirectional(LSTM(128, kernel_initializer="Orthogonal", return_sequences=True)),
    Dropout(.5),

    #softmax over the vocabulary plus one additional class (presumably the CTC blank - confirm)
    Dense(char_to_num.vocabulary_size()+1, kernel_initializer="he_normal", activation="softmax"),
])

In [None]:
#have a look at the layers and output shapes of the model
model.summary()

**5. Training**

In [None]:
#Scheduler
def scheduler(epoch, lr):
  """Learning-rate schedule: unchanged below epoch 30, multiplied by exp(-0.1) from then on.

  Args:
    epoch: Index of the epoch the learning rate is requested for.
    lr: Current learning rate.

  Returns:
    The learning rate to use, always as a plain Python float (a tensor result
    is not accepted by every Keras version of LearningRateScheduler).
  """
  if epoch < 30:
    return float(lr)
  return float(lr) * math.exp(-0.1)

The CTC loss is suited to learning tasks in which the exact timing of the data is not known and may vary (e.g. sound or video). This is why, in addition to y_true and y_pred, it needs the lengths of the inputs and of the labels in order to calculate the loss.

In [None]:
# CTC loss function
def CTCLoss(y_true, y_pred):
  """Compute the CTC batch cost, giving every sample the full padded input and label length."""
  n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
  ones_column = tf.ones(shape=(n_samples, 1), dtype="int64")

  #one length entry per sample: time steps of y_pred and width of y_true respectively
  input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * ones_column
  label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * ones_column

  return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

This code snippet defines a custom callback class named ProduceExample for TensorFlow (TF) models, specifically designed to work with Keras. Callbacks in Keras are functions that are called at certain points during model training, allowing you to hook into the training process for purposes such as logging, model checkpointing, or performing custom operations. The ProduceExample class is intended to be used during the training of a model, particularly for tasks involving sequence prediction, such as text generation or speech recognition.

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
  """Callback that prints the original and the predicted text of one batch after every epoch."""

  def __init__(self, dataset) -> None:
    """Keep `dataset` and open a NumPy iterator over it to draw the examples from."""
    super().__init__() #lets the Keras base class set up its own state
    self._source = dataset #kept so the iterator can be reopened once it is exhausted
    self.dataset = dataset.as_numpy_iterator()

  def on_epoch_end(self, epoch, logs=None) -> None:
    """Predict the next batch of the dataset and print it next to its ground truth."""
    try:
      data = next(self.dataset)
    except StopIteration:
      #more epochs than batches: start over instead of aborting the training
      self.dataset = self._source.as_numpy_iterator()
      data = next(self.dataset)

    yhat = self.model.predict(data[0])
    #one input length per sample, taken from the time dimension of the prediction
    input_length = [yhat.shape[1]] * len(yhat)
    decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()

    for x in range(len(yhat)):
      print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
      print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
      print('~' * 100)

In [None]:
#Compiling the model with the Adam optimizer (learning rate 0.0001) and CTCLoss before the 95 epochs weights are loaded
#NOTE(review): this cell was described as using a legacy optimizer, but the call uses tf.keras.optimizers.Adam - confirm which optimizer the checkpoint requires
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), loss=CTCLoss)

In [None]:
#Compiling the model with the Adam optimizer (imported from tensorflow.keras.optimizers), learning rate 0.0001 and CTCLoss
model.compile(optimizer=Adam(learning_rate=0.0001), loss=CTCLoss)

In [None]:
#Fetching the zipped checkpoints from Google Drive and unpacking the archive
url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
output = 'checkpoints.zip'
gdown.download(url=url, output=output, quiet=False)
gdown.extractall(output)

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

#path used both by the ModelCheckpoint callback to store the weights and by model.load_weights to restore them
checkpoint_filepath = './checkpoints'

In [None]:
#create the ModelCheckpoint callback --> stores the weights only (save_weights_only=True) at checkpoint_filepath for later use, monitoring 'loss'
checkpoint_callback = ModelCheckpoint(checkpoint_filepath, monitor='loss', save_weights_only=True)

In [None]:
#Callback applying the scheduler function: the learning rate is kept for epoch indices below 30 and reduced by a factor of exp(-0.1) per epoch afterwards
schedule_callback = LearningRateScheduler(scheduler)

In [None]:
#Example callback: instance of the ProduceExample class, used to see how well the model is performing after each epoch; it draws its examples from the full data pipeline
#NOTE(review): example_callback is not part of the callbacks list passed to model.fit - confirm whether that is intended
example_callback = ProduceExample(data)

Loading weights from past training iterations. If weights already exist or have been produced by previous training sessions, they can be loaded into the model at this point and training can resume in order to increase performance.

In [None]:
#restoring the weights stored at checkpoint_filepath into the model before training continues
model.load_weights(checkpoint_filepath)

In [None]:
#Fitting the model for 30 epochs on the train split with the test split as validation data;
#the callbacks store the weights and apply the learning rate schedule
model.fit(train, validation_data=test, epochs=30, callbacks=[checkpoint_callback, schedule_callback])

**6. Making prediction**

In [None]:
#Compiling the model with the Adam optimizer (learning rate 0.0001) and CTCLoss
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), loss=CTCLoss)

In [None]:
#Loading the weights stored at checkpoint_filepath
model.load_weights(checkpoint_filepath)

In [None]:
#iterator over the test split that yields its batches as NumPy arrays
test_data = test.as_numpy_iterator()

In [None]:
#drawing the next (frames, alignments) batch from the test iterator
sample = next(test_data)

In [None]:
#predicting on the frames of the sampled batch
yhat = model.predict(sample[0])

In [None]:
#greedy CTC decoding of the prediction, using an input length of 75 (the number of frames per video)
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()

In [None]:
print('~'*100, 'PREDICTIONS')
#converting each decoded index sequence to characters in one lookup and joining them into a single string
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]

In [None]:
print('~'*100, 'REAL TEXT')
#converting each alignment of the batch to characters in one lookup and joining them into a single string
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in sample[1]]

Testing on video: a single datapoint from the dataset is selected in order to test the model's prediction capabilities.

In [None]:
#loading frames and alignments of one specific video; the path is wrapped in a tensor because load_data calls .numpy() on it
sample = load_data(tf.convert_to_tensor('./data/videos/S_Banana_Later_Right_Hate.mp4'))

In [None]:
# sample[0] holds the frames returned by load_data; index 40 selects one frame of the video for display
plt.imshow(sample[0][40])

In [None]:
print('~'*100, 'REAL TEXT')
#the single alignment is wrapped in a list so it is handled like a batch of one sentence
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in [sample[1]]]

In [None]:
#adding a leading batch dimension to the frames before predicting
yhat = model.predict(tf.expand_dims(sample[0], axis=0))

In [None]:
#greedy CTC decoding of the prediction, using an input length of 75 (the number of frames per video)
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()

In [None]:
print('~'*100, 'PREDICTIONS')
#converting each decoded index sequence to characters in one lookup and joining them into a single string
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]

**7. Measuring performance of the model**

Measuring word error rate (WER) and character error rate (CER) in order to assess the performance of the model

In [None]:
import tensorflow as tf
from tensorflow.python.ops import string_ops

# Word Error Rate
def _word_edit_distance(reference, hypothesis):
    """Return the Levenshtein distance between two sequences.

    Substitutions, insertions and deletions each cost 1. Only two rows of the
    dynamic-programming table are kept in memory.
    """
    previous = list(range(len(hypothesis) + 1))
    for i, ref_item in enumerate(reference, start=1):
        current = [i]
        for j, hyp_item in enumerate(hypothesis, start=1):
            current.append(min(
                previous[j] + 1,                           # deletion
                current[j - 1] + 1,                        # insertion
                previous[j - 1] + (ref_item != hyp_item),  # match or substitution
            ))
        previous = current
    return previous[-1]


def wer(real_text, pred_text):
    """Compute the word error rate of `pred_text` with respect to `real_text`.

    The rate is the minimum number of word substitutions, insertions and
    deletions needed to turn the reference into the prediction, divided by the
    number of reference words. It can exceed 1 when the prediction contains
    many extra words.

    Args:
        real_text: Reference text; words are separated by whitespace.
        pred_text: Predicted text; words are separated by whitespace.

    Returns:
        The word error rate as a float. For an empty reference the result is
        0.0 if the prediction is empty as well and 1.0 otherwise.
    """
    real_words = real_text.split()  # split() without arguments never yields empty strings
    pred_words = pred_text.split()

    if not real_words:
        return 0.0 if not pred_words else 1.0

    return _word_edit_distance(real_words, pred_words) / len(real_words)

In [None]:
def _char_edit_distance(reference, hypothesis):
    """Return the Levenshtein distance between two strings.

    Substitutions, insertions and deletions each cost 1. Only two rows of the
    dynamic-programming table are kept in memory.
    """
    previous = list(range(len(hypothesis) + 1))
    for i, ref_char in enumerate(reference, start=1):
        current = [i]
        for j, hyp_char in enumerate(hypothesis, start=1):
            current.append(min(
                previous[j] + 1,                           # deletion
                current[j - 1] + 1,                        # insertion
                previous[j - 1] + (ref_char != hyp_char),  # match or substitution
            ))
        previous = current
    return previous[-1]


def cer(real_text, pred_text):
    """Compute the character error rate of `pred_text` with respect to `real_text`.

    Spaces are removed from both texts first. The rate is the minimum number of
    character substitutions, insertions and deletions needed to turn the
    reference into the prediction, divided by the number of reference characters.

    Args:
        real_text: Reference text.
        pred_text: Predicted text.

    Returns:
        The character error rate as a float. For an empty reference the result
        is 0.0 if the prediction is empty as well and 1.0 otherwise.
    """
    real_chars = real_text.replace(" ", "")  # Remove spaces to treat as individual characters
    pred_chars = pred_text.replace(" ", "")  # Remove spaces to treat as individual characters

    if not real_chars:
        return 0.0 if not pred_chars else 1.0

    return _char_edit_distance(real_chars, pred_chars) / len(real_chars)

Calculating the average WER and CER over every datapoint in the test set

In [None]:
import numpy as np

def get_average_wer_and_cer(test):
  """Print the average word and character error rate of the model over all samples in `test`.

  Every element of `test` is expected to provide the frames at index 0 and the
  encoded alignments at index 1. The averages are 0 when `test` yields nothing.
  """
  wer_values = []
  cer_values = []

  for batch in test:
      # Decode the model's prediction for this batch into text
      prediction = model.predict(batch[0])
      decoded = tf.keras.backend.ctc_decode(prediction, input_length=[75], greedy=True)[0][0].numpy()
      pred_text = tf.strings.reduce_join(num_to_char(decoded)).numpy().decode('utf-8')

      # Turn the ground-truth alignments of this batch into text
      real_text = tf.strings.reduce_join(num_to_char(batch[1])).numpy().decode('utf-8')

      # Collect both error rates for this batch
      wer_values.append(wer(real_text, pred_text))
      cer_values.append(cer(real_text, pred_text))

  # Average over all collected values, falling back to 0 when there are none
  average_wer = sum(wer_values) / len(wer_values) if wer_values else 0
  average_cer = sum(cer_values) / len(cer_values) if cer_values else 0
  print(f"Average Word Error Rate: {average_wer}")
  print(f"Average Character Error Rate: {average_cer}")

In [None]:
#evaluating on a fresh iterator over the test split: test_data has already been advanced by the
#sampling above and can only be consumed once, so reusing it would skip samples (or yield none on a re-run)
get_average_wer_and_cer(test.as_numpy_iterator())