# 0. Install and Import Dependencies

In [None]:
!pip list

In [None]:
!pip install opencv-python matplotlib imageio gdown tensorflow

In [None]:
import math
import os
from typing import List

import cv2
import imageio
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt

In [None]:
tf.config.list_physical_devices('GPU')

In [None]:
# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all up front.
physical_devices = tf.config.list_physical_devices('GPU')
if not physical_devices:
    print("No GPU devices found; running on CPU.")
for device in physical_devices:
    try:
        # Memory growth has to be configured identically on every visible GPU.
        tf.config.experimental.set_memory_growth(device, True)
    except (RuntimeError, ValueError) as err:
        # Best effort: report the problem (e.g. GPUs already initialized) and carry on.
        print(f"Could not enable memory growth on {device}: {err}")

# 1. Build Data Loading Functions

In [None]:
import gdown

In [None]:
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'
# Re-running the notebook should not fetch the archive again when it is already on disk.
if not os.path.exists(output):
    gdown.download(url, output, quiet=False)
gdown.extractall(output)


**Prompt:** Write a Python function using OpenCV and TensorFlow that loads a video from a given file path, extracts all of its frames, converts each frame to grayscale, crops a fixed region from each frame (rows 190 to 236 and columns 80 to 220), and normalizes the resulting frames using the dataset mean and standard deviation. Return the processed frames as a list of normalized tensors. (Video loading code; converts frames to grayscale; 25 fps.)


In [None]:
def load_video(path: str) -> tf.Tensor:
    """Load a video and return its cropped, grayscale, normalized frames.

    Every readable frame is converted to a single-channel image with
    ``tf.image.rgb_to_grayscale`` and cropped to rows 190:236 and columns
    80:220. The stack of crops is then shifted by its mean and divided by its
    standard deviation, both computed over this one video.

    Args:
        path: Location of the video file, passed to ``cv2.VideoCapture``.

    Returns:
        A float32 tensor holding one normalized crop per frame (46 x 140 x 1
        each, assuming the source frames are at least 236 x 220 pixels).

    Raises:
        ValueError: If no frame could be read from ``path``.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is decodable; stop
                # here rather than hand a None frame to TensorFlow.
                break
            frame = tf.image.rgb_to_grayscale(tf.convert_to_tensor(frame))
            frames.append(frame[190:236, 80:220, :])
    finally:
        # Release the capture handle even if decoding raised.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # NOTE(review): the mean and the subtraction are evaluated before the cast
    # to float32, i.e. in the dtype of the decoded frames (presumably uint8
    # from OpenCV -- confirm). Left unchanged so the output stays compatible
    # with the checkpoint this notebook resumes from.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std

In [None]:
# One entry per character the model may emit: letters, apostrophe, "?", "!", digits 1-9 and space.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [None]:
import tensorflow as tf

# Enumerate the GPUs TensorFlow can see.
physical_devices = tf.config.list_physical_devices('GPU')

# Report the CPU-only case; the loop below is then a no-op.
if not physical_devices:
    print("No GPU devices found.")

for gpu in physical_devices:
    try:
        # Allocate GPU memory on demand for this device.
        tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(f"Error setting memory growth on {gpu}: {e}")


**Create two StringLookup layers in TensorFlow: one for converting characters to numbers and another for converting numbers back to characters. Also print the vocabulary and its size.**

In [None]:
# Forward lookup: character -> integer id, with "" registered as the out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(oov_token="", vocabulary=vocab)

# Inverse lookup built from the forward layer's vocabulary: integer id -> character.
num_to_char = tf.keras.layers.StringLookup(
    invert=True,
    oov_token="",
    vocabulary=char_to_num.get_vocabulary(),
)

print(f"The vocabulary is: {char_to_num.get_vocabulary()} (size ={char_to_num.vocabulary_size()})")

In [None]:
char_to_num.get_vocabulary()

In [None]:
#example testing for character_to_num function
char_to_num(['n','i','c','k'])

In [None]:
num_to_char([14,9,3,11])

In [None]:
def load_alignments(path: str) -> tf.Tensor:
    """Read an alignment file and encode its words as character ids.

    Each line is split on whitespace and its third field is taken as the
    token. Tokens equal to 'sil' (silence) are dropped; the rest are joined
    with single spaces and mapped character by character through
    ``char_to_num``.

    Args:
        path: Location of the alignment file.

    Returns:
        A 1-D tensor of character ids as produced by ``char_to_num``, suitable
        as the label sequence for the CTC loss. It is empty when the file
        contains no non-silence tokens.
    """
    with open(path, 'r') as f:
        lines = f.readlines()

    words = []
    for line in lines:
        fields = line.split()
        # Blank or truncated lines carry no token; skip them instead of raising IndexError.
        if len(fields) < 3 or fields[2] == 'sil':
            continue
        words.append(fields[2])

    # Joining first yields the same character stream as interleaving ' ' tokens
    # and dropping the leading one, and also handles an empty word list.
    chars = tf.strings.unicode_split(' '.join(words), input_encoding='UTF-8')
    return char_to_num(chars)

**Write a Python function that takes a TensorFlow tensor representing a file path, decodes it to a string, and extracts the file name. Use this to generate the paths for a video file and its corresponding alignment file. Load the video frames and alignment tokens using appropriate helper functions and return both.**

In [None]:
def load_data(path: tf.Tensor):
    """Load the video frames and alignment tokens that belong to one sample.

    Args:
        path: Scalar string tensor naming a sample file. Only the base name
            without its extension is used; the directory part is ignored and
            the files are always looked up under ``data/s1`` and
            ``data/alignments/s1``.

    Returns:
        Tuple ``(frames, alignments)`` as returned by ``load_video`` and
        ``load_alignments``.
    """
    decoded_path = path.numpy().decode('utf-8')
    sample_name, _extension = os.path.splitext(os.path.basename(decoded_path))

    frames = load_video(os.path.join('data', 's1', sample_name + '.mpg'))
    alignments = load_alignments(
        os.path.join('data', 'alignments', 's1', sample_name + '.align')
    )
    return frames, alignments


In [None]:
test_path = '/kaggle/working/data/s1/bbaf2n.mpg'


In [None]:
# Extract the sample name the same way load_data does; splitting on '\\' leaves a POSIX path untouched.
os.path.splitext(os.path.basename(tf.convert_to_tensor(test_path).numpy().decode('utf-8')))[0]

In [None]:
# Reuse test_path instead of repeating the literal so the demo cells cannot drift apart.
frames, alignments = tf.py_function(load_data, [tf.convert_to_tensor(test_path)], (tf.float32, tf.int64))


In [None]:
plt.imshow(frames[40])

In [None]:
#showing 21 timesteps and the numbers in them are char_to_num representation
alignments

In [None]:
# Map the ids back to characters and concatenate them into one string tensor.
tf.strings.reduce_join(num_to_char(alignments))

# 2. Create Data Pipeline

In [None]:
from matplotlib import pyplot as plt

In [None]:
def mappable_function(path: str):
    """Adapt ``load_data`` for use inside ``tf.data.Dataset.map``.

    ``load_data`` calls ``.numpy()`` and runs Python/OpenCV code, so it has to
    be wrapped in ``tf.py_function``.

    Args:
        path: Scalar string tensor with the path of one video file.

    Returns:
        The ``(frames, alignments)`` pair produced by ``load_data``, declared
        as float32 and int64 tensors respectively.
    """
    # Delegate to load_data so the sample-name and path rules live in one place.
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))

# Data pipeline setup.
# list_files shuffles by default and reshuffles on every iteration, which would move
# samples between the take/skip splits below from epoch to epoch. List the files in
# a fixed order and shuffle exactly once instead.
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle=False)
# A fixed seed keeps the split stable across kernel restarts (e.g. when resuming from a checkpoint).
data = data.shuffle(500, seed=42, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# Batches of 2; frames are padded to 75 per video and labels to 40 ids.
data = data.padded_batch(2, padded_shapes=([75, None, None, None], [40]))
data = data.prefetch(tf.data.AUTOTUNE)
# Split by batch count: the first 450 batches train, the remainder evaluate.
train = data.take(450)
test = data.skip(450)

In [None]:
len(test)

In [None]:
len(frames)

In [None]:
sample = data.as_numpy_iterator()

In [None]:
# Pull one (frames, alignments) batch from the iterator and show the frames.
val = next(sample)
val[0]

In [None]:
# 0:videos, 0: 1st video out of the batch,  0: return the first frame in the video
plt.imshow(val[0][0][35])

In [None]:
tf.strings.reduce_join([num_to_char(word) for word in val[1][0]])

# 3. Design the Deep Neural Network

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
data.as_numpy_iterator().next()[0][0].shape

In [None]:
# Model creation (LipNet-style): three Conv3D/ReLU/MaxPool3D stages, two
# bidirectional LSTMs with dropout, and a per-timestep softmax over the
# vocabulary plus one extra class.
model = Sequential([
    Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(256, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(75, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    # Flatten each timestep: 46x140 halves three times to 5x17, with 75 channels.
    Reshape((75, 5 * 17 * 75)),

    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'),
])

In [None]:
model.summary()

# 4. Setup Training Options and Train

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based index of the epoch about to start.
        lr: Current learning rate.

    Returns:
        ``lr`` unchanged while ``epoch < 30``; afterwards ``lr * exp(-0.1)``
        as a plain Python float, which is what ``LearningRateScheduler``
        expects from its schedule function.
    """
    if epoch < 30:
        return lr
    # Use math.exp rather than tf.math.exp so the result is a float, not a tensor.
    return float(lr * math.exp(-0.1))

In [None]:
# CTC loss used to train the model.
def CTCLoss(y_true, y_pred):
    """Compute the CTC batch cost for a batch of label/prediction pairs.

    Every sample is given the same input length (the size of axis 1 of
    ``y_pred``) and the same label length (the size of axis 1 of ``y_true``,
    i.e. the padded label width).

    Args:
        y_true: Label ids, indexed as (batch, label position).
        y_pred: Model output whose axis 1 is the time axis.

    Returns:
        The result of ``tf.keras.backend.ctc_batch_cost``.
    """
    batch_size = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Broadcast the scalar lengths to one (batch, 1) column each.
    input_length = time_steps * tf.ones(shape=(batch_size, 1), dtype="int64")
    label_length = label_steps * tf.ones(shape=(batch_size, 1), dtype="int64")

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Callback that prints decoded predictions for one batch after every epoch."""

    def __init__(self, dataset) -> None:
        """Keep the dataset and open an iterator over its batches.

        Args:
            dataset: A ``tf.data.Dataset`` yielding ``(frames, labels)`` batches.
        """
        # Let the Keras base class set up its own state.
        super().__init__()
        # Kept so the iterator can be reopened once it is exhausted.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch and print original vs. decoded text."""
        try:
            batch = next(self.dataset)
        except StopIteration:
            # More epochs than batches: start over instead of aborting training.
            self.dataset = self._source.as_numpy_iterator()
            batch = next(self.dataset)

        yhat = self.model.predict(batch[0])
        # One input length per sample, taken from the prediction itself so any
        # batch size (including a final partial batch) decodes correctly.
        input_length = np.full(len(yhat), yhat.shape[1])
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(batch[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
model.compile(optimizer=Adam(learning_rate=0.0001), loss=CTCLoss)

In [None]:
import os
from tensorflow.keras.callbacks import ModelCheckpoint

# Directory that receives the per-epoch checkpoints; created if missing.
checkpoint_dir = './checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Save the full model at the end of every epoch, one file per epoch
# ({epoch:02d} is filled in by Keras).
checkpoint_callback = ModelCheckpoint(
    os.path.join(checkpoint_dir, 'model_epoch_{epoch:02d}.keras'),
    save_freq='epoch',
    save_best_only=False,
    save_weights_only=False,
    verbose=1,
)


In [None]:
import tensorflow as tf

# First attempt: treat the checkpoint directory as a TensorFlow SavedModel.
latest_model_path = '/kaggle/input/notebookf0d2dc04cb/checkpoints/'
try:
    model = tf.saved_model.load(latest_model_path)
except Exception as e:
    print(f"Error loading with tf.saved_model.load(): {e}")
    print("Trying the next method...")
else:
    print(f"Successfully loaded model using tf.saved_model.load() from: {latest_model_path}")

    # You might need to wrap the loaded SavedModel in a Keras layer
    # if you want to use .fit() directly. This depends on how it was saved.
    # For example:
    # loaded_keras_model = tf.keras.models.Sequential([
    #     tf.keras.layers.Input(shape=input_shape), # Replace input_shape
    #     model.signatures['serving_default'] # Access the inference function
    # ])
    #
    # However, for resuming training, it's usually better if it's a direct Keras model.

In [None]:
# Restore the model saved by an earlier run; CTCLoss must be supplied because it is a custom loss.
latest_checkpoint_path = '/kaggle/input/notebookf0d2dc04cb/checkpoints/model_epoch_66.keras'
model = tf.keras.models.load_model(latest_checkpoint_path, custom_objects={'CTCLoss': CTCLoss})

# The epoch number is the last "_"-separated piece of the path, up to the first ".".
initial_epoch = int(latest_checkpoint_path.split('_')[-1].split('.')[0])
print(f"Resuming training from epoch {initial_epoch}")

In [None]:
# Continue training from initial_epoch up to epoch 100, checkpointing every epoch,
# then store the final model.
model.fit(
    train,
    epochs=100,
    initial_epoch=initial_epoch,
    validation_data=test,
    callbacks=[checkpoint_callback],
)
model.save("final_model.keras")

In [None]:
model.save("lipnet.keras")
