# 0. Install and Import Dependencies

In [None]:
# install all dependencies listed in the 'requirements.txt' file with their specified versions.

In [None]:
import math
import os
from typing import List

import cv2
import imageio
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt

In [None]:
# Check if GPU is Present
tf.config.list_physical_devices('GPU')

In [None]:
# Let TensorFlow allocate GPU memory on demand instead of reserving it all up front.
# Every visible GPU is configured (not only the first), and a failure is reported
# rather than silently swallowed.
physical_devices = tf.config.list_physical_devices('GPU')
for gpu in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except (ValueError, RuntimeError) as err:
        # set_memory_growth is documented to raise ValueError for an invalid device and
        # RuntimeError when the runtime is already initialized; keep going either way.
        print(f'Could not enable memory growth for {gpu}: {err}')

# 1. Build Data Loading Functions

In [None]:
# Downloading and Extracting the Dataset

# import gdown
# url = 'https://drive.google.com/file/d/1czmd2kDblp7t6wambFSFf5YgzU5k85kV/view?usp=sharing'
# output = 'dataset.zip'
# gdown.download(url=url, output=output, fuzzy=True)
# gdown.extractall('dataset.zip')

In [None]:
# Character set the model can emit: lowercase letters, apostrophe, '?', '!', digits 1-9 and space
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")
print(vocab, end="")

In [None]:
# Forward lookup (character -> integer id); unknown characters map to the empty-string OOV token
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")

# Inverse lookup (integer id -> character), built from the forward layer's own vocabulary
# so both directions agree on the ids
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

lookup_vocab = char_to_num.get_vocabulary()
print(f'The vocabulary is: {lookup_vocab} (size ={char_to_num.vocabulary_size()})')

In [None]:
# testing fn
# char_to_num(['s','j','a','i', 'n']) #[19, 10,  1,  9, 14]
# num_to_char([19, 10,  1,  9, 14]) #['s','j','a','i', 'n']

In [None]:
# Load the video
def load_video(path:str) -> "tf.Tensor":
    """Read a video and return its cropped, grayscale, standardized frames.

    Args:
        path: Location of the video file, handed to cv2.VideoCapture.

    Returns:
        A float32 tensor of the stacked frames. Each frame is converted to
        grayscale, cropped to rows 190:236 and columns 80:220 (46 x 140 pixels),
        then the mean is subtracted and the result divided by the standard
        deviation.

    Raises:
        IOError: If the video cannot be opened.
        ValueError: If not a single frame could be read.
    """
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f'Could not open video file: {path}')

    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is actually readable;
                # stop instead of passing a None frame to TensorFlow.
                break
            # NOTE(review): OpenCV normally decodes frames as BGR while rgb_to_grayscale
            # weights the channels as RGB — confirm whether a conversion is wanted.
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[190:236,80:220,:])
    finally:
        # Release the capture even if decoding fails part-way through.
        cap.release()

    if not frames:
        raise ValueError(f'No frames could be read from video: {path}')

    # NOTE(review): the mean is computed and subtracted before the cast to float32,
    # i.e. in the frames' own dtype (presumably uint8 from OpenCV — confirm). It is
    # deliberately left unchanged: altering it would change the inputs that any
    # already-trained weights were fitted to.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std

In [None]:
# Load the subtitles for video
def load_subtitles(path:str) -> "tf.Tensor":
    """Read an alignment file and return its words encoded as character ids.

    Each line is split on whitespace and its third field is taken as the word.
    Words equal to 'sil' are dropped; the remaining words are joined with single
    spaces, split into characters and mapped through char_to_num.

    Args:
        path: Location of the alignment file.

    Returns:
        A 1-D tensor of character ids for the sentence (no leading space).
    """
    # Plain `with open(...)`: the parenthesized form is only documented syntax from Python 3.10.
    with open(path, 'r') as f:
        lines = f.readlines()

    tokens = []
    for line in lines:
        fields = line.split()
        if len(fields) < 3:
            # Blank or malformed line: there is no word column to read.
            continue
        if fields[2] != 'sil':
            # A space precedes every word; the first one is stripped by the [1:] below.
            tokens.extend([' ', fields[2]])
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [None]:
# loading the video and subtitles simultaneously
def load_data(path: "tf.Tensor"):
    """Load the frames and the encoded subtitles that belong to one video.

    Only the file name (without extension) of `path` is used; the video and the
    alignment file are then looked up under 'dataset/videos' and
    'dataset/subtitles'.

    Args:
        path: Scalar string tensor holding the path of a video file.

    Returns:
        A tuple (frames, subtitles) as produced by load_video and load_subtitles.
    """
    path = bytes.decode(path.numpy())

    # os.path.basename understands the platform's separators, so the same code works
    # on MacOS/Linux ('/') and on Windows ('\\' as well as '/').
    file_name = os.path.basename(path).split('.')[0]

    video_path = os.path.join('dataset','videos',f'{file_name}.mpg') # replace with your location
    subtitles_path = os.path.join('dataset','subtitles',f'{file_name}.align') # replace with your location
    frames = load_video(video_path)
    subtitles = load_subtitles(subtitles_path)

    return frames, subtitles

In [None]:
test_path = 'dataset/videos/bbal6n.mpg'

In [None]:
tf.convert_to_tensor(test_path).numpy().decode('utf-8').split('/')[-1].split('.')[0] 

In [None]:
load_data(tf.convert_to_tensor(test_path)) 

In [None]:
frames, subtitles = load_data(tf.convert_to_tensor(test_path)) 

In [None]:
#testing the function
plt.imshow(frames[40])

In [None]:
tf.strings.reduce_join([bytes.decode(x) for x in num_to_char(subtitles.numpy()).numpy()]) 

In [None]:
def mappable_function(path:str) -> List[str]:
    """Wrap load_data in tf.py_function so it can be used with Dataset.map.

    The wrapped call is declared to yield a float32 output and an int64 output,
    in that order.
    """
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))

# 2. Create Data Pipeline

In [None]:
from matplotlib import pyplot as plt

In [None]:
# shuffle=False: list_files shuffles the file names by default, and that order is not
# stable between passes over the dataset. With an unstable order the take/skip split
# below would put different videos in train and test each time.
data = tf.data.Dataset.list_files('dataset/videos/*.mpg', shuffle=False) # replace with your location

# Shuffle once over all files and keep that order for every later iteration, so the
# train/test split stays fixed.
num_videos = len(data)
data = data.shuffle(num_videos, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# Batches of 2; videos are padded to 75 frames and label sequences to 40 ids
data = data.padded_batch(2, padded_shapes=([75,None,None,None],[40]))
data = data.prefetch(tf.data.AUTOTUNE)

# Added for split: the first 450 batches train the model, the rest are held out
train = data.take(450)
test = data.skip(450)

In [None]:
len(test)

In [None]:
frames, subtitles = data.as_numpy_iterator().next() 

In [None]:
len(frames), len(subtitles)

In [None]:
sample = data.as_numpy_iterator()

In [None]:
val = sample.next(); val[0] 

In [None]:
imageio.mimsave('animation.gif', val[0][1], fps=10) 

In [None]:
# val[0]: the videos of the batch, [0]: first video in the batch, [34]: frame at index 34 of that video
plt.imshow(val[0][0][34]) 

In [None]:
tf.strings.reduce_join([num_to_char(word) for word in val[1][0]]) 

# 3. Design the Deep Neural Network

In [None]:
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten 
from tensorflow.keras.optimizers import Adam 
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler 
import keras

In [None]:
# get the shape of the input
data.as_numpy_iterator().next()[0][0].shape 

In [None]:
model = Sequential()

# Convolutional front-end: three Conv3D -> ReLU -> MaxPool3D stages.
# The (1, 2, 2) pooling halves height and width at every stage and leaves the
# time dimension (75 frames) untouched.

# 128 filters, kernel size 3, input of shape (75, 46, 140, 1)
model.add(Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))

# 256 filters, kernel size 3
model.add(Conv3D(256, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))

# 75 filters, kernel size 3
model.add(Conv3D(75, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))

# Flatten the feature map of each time step so the LSTMs receive one vector per frame
model.add(TimeDistributed(Flatten()))

# Two bidirectional LSTM layers (128 units per direction) that return the full sequence,
# each followed by dropout with rate 0.5.
# Every layer gets its own Orthogonal initializer from tf.keras (the same Keras used for
# the layers) instead of sharing a single unseeded instance between layers.
model.add(Bidirectional(LSTM(128, kernel_initializer=tf.keras.initializers.Orthogonal(), return_sequences=True)))
model.add(Dropout(.5))

model.add(Bidirectional(LSTM(128, kernel_initializer=tf.keras.initializers.Orthogonal(), return_sequences=True)))
model.add(Dropout(.5))

# Per-time-step softmax over (vocabulary size + 1) classes, He-normal initialized.
# NOTE(review): the extra class is presumably the CTC blank — confirm.
model.add(Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'))

In [None]:
model.summary()

In [None]:
yhat = model.predict(val[0]) 

In [None]:
tf.strings.reduce_join([num_to_char(x) for x in tf.argmax(yhat[0],axis=1)])

In [None]:
tf.strings.reduce_join([num_to_char(tf.argmax(x)) for x in yhat[0]])

In [None]:
model.input_shape

In [None]:
model.output_shape

# 4. Setup Training Options and Train

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based epoch index.
        lr: Current learning rate.

    Returns:
        `lr` unchanged while epoch < 30; afterwards `lr * exp(-0.1)` as a plain
        Python float (the schedule callback is documented to expect a float, not
        a tensor).
    """
    if epoch < 30:
        return lr
    return float(lr * math.exp(-0.1))

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, via tf.keras.backend.ctc_batch_cost.

    Every sample is given the same input length (the size of y_pred's second
    dimension) and the same label length (the size of y_true's second
    dimension).
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    # Column of ones, one row per sample, used to broadcast the two scalar lengths
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Callback that prints reference and predicted text for one batch after each epoch."""

    def __init__(self, dataset) -> None:
        """Keep the dataset and open a first iterator over it.

        Args:
            dataset: Dataset yielding (videos, labels) batches; it must support
                as_numpy_iterator().
        """
        super().__init__()
        # The dataset itself is kept so a fresh iterator can be opened once the
        # current one is exhausted.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def _next_batch(self):
        """Return the next batch, restarting from the beginning when the iterator runs out."""
        try:
            return next(self.dataset)
        except StopIteration:
            # More epochs than batches: start over instead of aborting training.
            self.dataset = self._source.as_numpy_iterator()
            return next(self.dataset)

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch and print each original / decoded sentence pair."""
        data = self._next_batch()
        yhat = self.model.predict(data[0])
        # One input length per sample, taken from the prediction itself so that
        # batches of any size (not only 2) and any number of time steps decode.
        input_length = [yhat.shape[1]] * yhat.shape[0]
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
model.compile(tf.keras.optimizers.legacy.Adam(learning_rate=0.0001), loss=CTCLoss)

In [None]:
checkpoint_callback = ModelCheckpoint(os.path.join('model','checkpoint'), monitor='loss', save_weights_only=True)

In [None]:
schedule_callback = LearningRateScheduler(scheduler)

In [None]:
example_callback = ProduceExample(test)

In [None]:
model.fit(train, validation_data=test, epochs=100, callbacks=[checkpoint_callback, schedule_callback, example_callback])

# 5. Make a Prediction 

In [None]:
# Downloading and Extracting my Pre-Trained Model

# import gdown
# url = 'https://drive.google.com/file/d/1eg4FaZgTPF6vlFJHhreAdxY48cgKezER/view?usp=sharing'
# output = 'models.zip'
# gdown.download(url=url, output=output, fuzzy=True)
# gdown.extractall('models.zip')

In [None]:
# Restore weights from 'models/checkpoint'.
# NOTE(review): the ModelCheckpoint callback in this notebook writes to
# os.path.join('model','checkpoint') (singular 'model'), so this loads a different
# location than the one training saves to — presumably the downloaded pre-trained
# weights; confirm this is intended.
model.load_weights('models/checkpoint')

In [None]:
test_data = test.as_numpy_iterator()

In [None]:
sample = test_data.next()
yhat = model.predict(sample[0])

In [None]:
print('~'*50, 'REAL TEXT', '~'*50)
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in sample[1]]

In [None]:
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75,75], greedy=True)[0][0].numpy()
print('~'*50, 'PREDICTIONS', '~'*50)
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]

# Test on a Video

In [None]:
# load_data only uses the file name ('lgbf9s') of this path and rebuilds the video and
# subtitle locations under 'dataset/', so the leading '/' here has no effect.
sample = load_data(tf.convert_to_tensor('/dataset/videos/lgbf9s.mpg')) # replace with your location

In [None]:
print('~'*50, 'REAL TEXT', '~'*50)
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in [sample[1]]]

In [None]:
yhat = model.predict(tf.expand_dims(sample[0], axis=0))

In [None]:
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()
print('~'*50, 'PREDICTIONS', '~'*50)
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]

# Accuracy Check

In [None]:
import tensorflow as tf
import editdistance

def cer(preds, labels):
    """Character error rate over paired predictions and references.

    Args:
        preds: Iterable of predicted strings.
        labels: Iterable of reference strings, paired with `preds` by position.

    Returns:
        Total character edit distance divided by the total number of reference
        characters. When there are no reference characters, returns 0.0 if there
        are also no errors and float('inf') otherwise, instead of dividing by zero.
    """
    total_chars = 0
    total_errors = 0
    for pred, label in zip(preds, labels):
        total_chars += len(label)
        total_errors += editdistance.eval(pred, label)

    if total_chars == 0:
        # Nothing to compare against: avoid ZeroDivisionError
        return 0.0 if total_errors == 0 else float('inf')
    return total_errors / total_chars

def wer(preds, labels):
    """Word error rate over paired predictions and references.

    Both strings of each pair are split on whitespace and the edit distance is
    computed between the resulting word lists.

    Args:
        preds: Iterable of predicted strings.
        labels: Iterable of reference strings, paired with `preds` by position.

    Returns:
        Total word edit distance divided by the total number of reference words.
        When there are no reference words, returns 0.0 if there are also no
        errors and float('inf') otherwise, instead of dividing by zero.
    """
    total_words = 0
    total_errors = 0
    for pred, label in zip(preds, labels):
        pred_words = pred.split()
        label_words = label.split()

        total_words += len(label_words)
        total_errors += editdistance.eval(pred_words, label_words)

    if total_words == 0:
        # Nothing to compare against: avoid ZeroDivisionError
        return 0.0 if total_errors == 0 else float('inf')
    return total_errors / total_words

def decode_predictions(predictions):
    """Greedy-CTC-decode a batch of model outputs into a list of Python strings.

    Every sample is decoded with the full second dimension of `predictions` as
    its input length; the decoded ids are mapped through num_to_char and joined.
    """
    batch_size, time_steps = predictions.shape[0], predictions.shape[1]
    sequences = tf.keras.backend.ctc_decode(
        predictions, input_length=[time_steps] * batch_size, greedy=True
    )[0][0].numpy()

    texts = []
    for sentence in sequences:
        chars = [num_to_char(word) for word in sentence]
        texts.append(tf.strings.reduce_join(chars).numpy().decode('utf-8'))
    return texts

# Take a single batch from the test split
test_data = test.as_numpy_iterator()
sample = next(test_data)

# Run the model on that batch's videos
yhat = model.predict(sample[0])

# Turn both the predictions and the reference label ids into plain strings
decoded_preds = decode_predictions(yhat)
true_labels = []
for sentence in sample[1]:
    joined = tf.strings.reduce_join([num_to_char(word) for word in sentence])
    true_labels.append(joined.numpy().decode('utf-8'))

# Character- and word-level error rates for this one batch
cer_value = cer(decoded_preds, true_labels)
wer_value = wer(decoded_preds, true_labels)

print(f'Character Error Rate (CER): {cer_value:.2f}')
print(f'Word Error Rate (WER): {wer_value:.2f}')


In [None]:
#testing the model
