# Downloading the datasets

In [None]:
!wget 'https://spandh.dcs.shef.ac.uk/gridcorpus/s5/video/s5.mpg_vcd.zip'

In [None]:
!wget 'https://spandh.dcs.shef.ac.uk/gridcorpus/s20/video/s20.mpg_vcd.zip'

In [None]:
!wget 'https://spandh.dcs.shef.ac.uk/gridcorpus/s30/video/s30.mpg_vcd.zip'

In [None]:
!wget 'https://spandh.dcs.shef.ac.uk/gridcorpus/s5/align/s5.tar'

In [None]:
!wget 'https://spandh.dcs.shef.ac.uk/gridcorpus/s20/align/s20.tar'

In [None]:
!wget 'https://spandh.dcs.shef.ac.uk/gridcorpus/s30/align/s30.tar'

In [None]:
!unzip '/content/s5.mpg_vcd.zip'

In [None]:
!unzip '/content/s20.mpg_vcd.zip'

In [None]:
!unzip '/content/s30.mpg_vcd.zip'

In [None]:
!tar -xvf s5.tar

In [None]:
!tar -xvf s20.tar

In [None]:
!tar -xvf s30.tar

# Creating the Model

In [None]:
# NOTE(review): `model` is not created in any cell visible in this notebook, so
# this line raises NameError on a fresh kernel. The cell that builds the network
# needs to be restored ahead of this one.
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 75, 46, 140, 128   3584      
                             )                                   
                                                                 
 activation (Activation)     (None, 75, 46, 140, 128   0         
                             )                                   
                                                                 
 max_pooling3d (MaxPooling3  (None, 75, 23, 70, 128)   0         
 D)                                                              
                                                                 
 conv3d_1 (Conv3D)           (None, 75, 23, 70, 256)   884992    
                                                                 
 activation_1 (Activation)   (None, 75, 23, 70, 256)   0         
                                                        

In [None]:
import tensorflow as tf
from typing import List
import cv2
import os

# Character set the labels are drawn from: lowercase letters, three punctuation
# marks, the digits 1-9 and the space character.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

# Forward lookup (character -> integer id); the out-of-vocabulary token is "".
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")

# Reverse lookup (integer id -> character), built from the forward table's
# vocabulary so that both directions agree.
num_to_char = tf.keras.layers.StringLookup(
    invert=True,
    oov_token="",
    vocabulary=char_to_num.get_vocabulary(),
)


In [None]:
def load_alignments(path: str) -> "tf.Tensor":
    """Read an ``.align`` file and encode its transcript as character ids.

    Every line is whitespace-split and its third field is taken as the word;
    ``sil`` entries are dropped. The remaining words are joined with single
    spaces and each character is mapped through ``char_to_num``.

    Args:
        path: Location of the alignment file.

    Returns:
        A 1-D tensor holding one ``char_to_num`` id per character of the
        transcript (empty when the file has no non-silence words).
    """
    with open(path, 'r') as f:
        # Blank or truncated lines are skipped instead of failing on fields[2].
        words = [
            fields[2]
            for fields in (line.split() for line in f)
            if len(fields) > 2 and fields[2] != 'sil'
        ]
    # Splitting the already-joined sentence yields its characters directly, so
    # there is no leading separator that has to be sliced off afterwards.
    chars = tf.strings.unicode_split(' '.join(words), input_encoding='UTF-8')
    return char_to_num(chars)

In [None]:
# NOTE(review): neither `model` nor a notebook-level `frames` is defined in the
# cells visible above, so this fails on a fresh kernel. Presumably `frames` is a
# batch taken from the dataset built further down - confirm, and move this cell
# after training.
yhat = model.predict(frames)

In [None]:
# Greedy CTC decoding of the predictions; [0][0] picks the first decoded
# sequence tensor and .numpy() converts it to an array.
# NOTE(review): input_length=[75] supplies exactly one sequence length, so this
# presumably expects a batch of one sample with 75 time steps - confirm against
# the shape of `yhat`.
decoded = tf.keras.backend.ctc_decode(yhat,input_length = [75],greedy=True)[0][0].numpy()

In [None]:
# Turn every decoded id sequence back into text: map the ids through
# num_to_char and join the resulting characters into one string.
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]

In [None]:
# Same id -> text conversion applied to `alignments`, wrapped in a one-element list.
# NOTE(review): `alignments` is not defined at notebook level in the visible
# cells - confirm where it comes from.
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in [alignments]]

In [None]:
# Write the model to a `.keras` file in the working directory.
# NOTE(review): this cell sits before the compile/fit cells further down -
# confirm the intended execution order.
model.save('lipnet_grid_model.keras')

In [None]:
# Per-speaker crop window, keyed by the speaker folder name. load_video unpacks
# each tuple as (a, b, c, d) and slices every frame with frame[a:b, c:d, :].
cropping_dictionary = {
    's30': (200, 246, 80, 220),
    's5': (210, 256, 120, 260),
    's20': (200, 246, 120, 260),
}

In [None]:
def load_video(path):
    """Load one video, crop it and standardise the pixel values.

    Args:
        path: Path of the video file. The name of its parent folder selects the
            crop window from ``cropping_dictionary``.

    Returns:
        A float32 tensor of the cropped grayscale frames, shifted and scaled by
        the mean and standard deviation computed over the whole clip.

    Raises:
        KeyError: If the parent folder has no entry in ``cropping_dictionary``.
        ValueError: If no frame could be decoded from ``path``.
    """
    folder_name = path.split('/')[-2]
    a, b, c, d = cropping_dictionary[folder_name]
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is decodable; stop
                # at the first frame that cannot be read.
                break
            # NOTE(review): OpenCV returns channels in BGR order while
            # rgb_to_grayscale applies RGB weights; left unchanged so the
            # inputs stay consistent with any already-trained weights.
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[a:b, c:d, :])
    finally:
        # Release the capture handle even when decoding fails part-way.
        cap.release()
    if not frames:
        raise ValueError(f'No frames could be read from {path}')
    # Convert to float32 *before* normalising. OpenCV decodes frames as
    # unsigned 8-bit integers, so computing and subtracting the mean in that
    # dtype truncates the mean and wraps around for every pixel below it.
    frames = tf.cast(frames, tf.float32)
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(frames)
    return (frames - mean) / std


In [None]:
def load_data(path):
    """Load the frames and the encoded transcript that belong to one video.

    Written to be wrapped by ``tf.py_function`` (see ``mappable_function``).

    Args:
        path: Scalar string tensor with the path of a video laid out as
            ``.../<speaker>/<name>.<ext>``; the alignment file is looked up
            under ``/content/alignment/<speaker>/align/<name>.align``.

    Returns:
        The ``(frames, alignments)`` pair produced by ``load_video`` and
        ``load_alignments``. If either loader fails, an all-zero placeholder
        pair is returned so that one bad file does not abort the pipeline.
    """
    path = bytes.decode(path.numpy())
    file_name = path.split('/')[-1].split('.')[0]
    folder_name = path.split('/')[-2]
    alignment_path = os.path.join(
        f'/content/alignment/{folder_name}/align', f'{file_name}.align'
    )
    try:
        frames = load_video(path)
        alignments = load_alignments(alignment_path)
    except Exception as exc:
        # Best-effort loading: report the failure instead of hiding it, then
        # substitute a placeholder sample.
        print(f'Skipping {path}: {exc!r}')
        # int64 matches the label dtype that mappable_function declares to
        # tf.py_function; an int32 placeholder would be rejected there.
        return tf.zeros((75, 46, 140, 1)), tf.zeros((40,), dtype=tf.int64)
    return frames, alignments


In [None]:
def mappable_function(path):
    """Adapt ``load_data`` for use inside a ``tf.data`` pipeline.

    ``load_data`` calls ``.numpy()`` on its argument, so it is wrapped with
    ``tf.py_function``, which is told to expect a float32 tensor followed by an
    int64 tensor in return.
    """
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))

# Creating tf.data Datasets

In [None]:
import tensorflow as tf

In [None]:
# shuffle=False: by default list_files returns the paths in a random order that
# changes on every pass over the dataset. The pipeline cell below shuffles once
# with reshuffle_each_iteration=False, and that only gives a stable take/skip
# split if the order arriving here is itself stable.
data = tf.data.Dataset.list_files('./data/*/*.mpg', shuffle=False)


In [None]:
# Peek at a handful of matched paths instead of printing every file.
for video_path in data.take(5):
    print(video_path.numpy().decode())

In [None]:
# Fix one shuffled order so the split below selects the same samples each epoch.
data = data.shuffle(10000, reshuffle_each_iteration=False)
data = data.map(mappable_function)
data = data.padded_batch(2, padded_shapes=([75,None,None,None],[40]))

# take()/skip() run after padded_batch and therefore count *batches*. A fixed
# 9500 only splits anything once there are more than 9500 batches; below that
# `train` receives every batch and `test` is empty. Derive the split from the
# real number of batches instead.
# NOTE(review): 95 % mirrors 9500 out of the 10000 used as the shuffle buffer -
# adjust if a different ratio was intended.
num_batches = int(data.cardinality().numpy())
if num_batches < 0:
    raise ValueError('Dataset size is unknown; cannot derive the train/test split.')
train_batches = int(num_batches * 0.95)

data = data.prefetch(tf.data.AUTOTUNE)
train = data.take(train_batches)
test = data.skip(train_batches)

# Training

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam 

In [None]:
def CTCLoss(y_true, y_pred):
    """Batch CTC loss between padded labels and per-timestep predictions.

    Every sample is given the full second dimension of ``y_pred`` as its input
    length and the full (padded) second dimension of ``y_true`` as its label
    length.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    # Column of ones, one row per sample, used to broadcast the scalar lengths.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, time_steps, label_steps)

In [None]:
# Compile with the CTCLoss function defined above and the legacy Adam optimizer
# at a learning rate of 0.01.
model.compile(optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.01), loss=CTCLoss)

In [None]:
# NOTE(review): the recorded output below reads "Epoch 1/10" and the model is
# later saved as 'Lipnet_10epochs_model.h5', while this call requests 100
# epochs - confirm the intended value. No validation data is passed, so the
# `test` split is not used during training.
history = model.fit(train, epochs=100)

Epoch 1/10
 79/950 [=>............................] - ETA: 13:57 - loss: 104.4280

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Write the model as an .h5 file to a path relative to the working directory.
# NOTE(review): Drive is mounted in the previous cell, but this path is not
# under /content/drive - confirm whether the file was meant to be saved there.
model.save('Lipnet_10epochs_model.h5')

  saving_api.save_model(


In [None]:
import matplotlib.pyplot as plt

In [None]:
# Plot the per-epoch training loss recorded by model.fit. The original call,
# plt.plt(history.hi), was an unfinished line that raises AttributeError.
fig, ax = plt.subplots()
ax.plot(history.history['loss'])
ax.set(title='Training loss', xlabel='Epoch', ylabel='CTC loss');

# Predictions

In [None]:
# Iterator over `data` that yields each element converted to NumPy arrays.
sample = data.as_numpy_iterator()

In [None]:
# Pull one batch: val[0] holds the frames, val[1] the encoded labels.
val = next(sample)
# Show only the shape; displaying the raw frame array floods the cell output.
val[0].shape

In [None]:
# Run the model on the frames of the sampled batch.
yhat = model.predict(val[0])



In [None]:
# Greedy CTC decoding of the batch, matching the ctc_decode call used earlier
# in this notebook. A plain per-frame argmax does not merge repeated
# characters, which leaves artefacts such as 'bbeee' in the text.
decoded_batch = tf.keras.backend.ctc_decode(
    yhat, input_length=[yhat.shape[1]] * yhat.shape[0], greedy=True
)[0][0]
# Text of the first sample in the batch.
tf.strings.reduce_join(num_to_char(decoded_batch[0]))

<tf.Tensor: shape=(), dtype=string, numpy=b'binc   bbeee bbi      nnne   aoin'>