In [3]:
import tensorflow as tf
import cv2
import numpy as np
from matplotlib import pyplot as plt
import os
import imageio
from typing import List

In [None]:
import gdown
url=("https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL")
output="data1.zip"
gdown.download(url,output,quiet=True)
gdown.extractall(output)

In [8]:
def load_video(path:str)-> List[float]:
    cap=cv2.VideoCapture(path)
    frames=[]
    for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
        ret,frame=cap.read()
        frame=tf.image.rgb_to_grayscale(frame)
        frames.append(frame[190:236,80:220,:])
    cap.release()

    mean=tf.math.reduce_mean(frames)
    std=tf.math.reduce_std(tf.cast(frames,tf.float32))
    return tf.cast(frames-mean,tf.float32)/std

        
        

In [4]:
vocab=[x for x in "abcdefghijklmnopqrstuvwxyz'?!123456789 "]

char_to_num=tf.keras.layers.StringLookup(vocabulary=vocab,oov_token="")
num_to_char=tf.keras.layers.StringLookup(vocabulary=char_to_num.get_vocabulary(),oov_token="",invert=True)

In [5]:
def load_alignment(path:str)->List[str]:
    with open(path,'r') as f:
        lines=f.readlines()
    token=[]
    for line in lines:
        line=line.split(" ")
        if(line[2]!="sil"):
            tokens=[*token,' ',line[2]]
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens,input_encoding='UTF-8'),(-1)))[1:]

In [6]:
def load_data(path:str):
    path=tf.convert_to_tensor(path).numpy().decode("utf-8")
    file_name=path.split("\\")[-1].split(".")[0]
    video_path=os.path.join("data","s1",file_name+".mpg")
    alignment_path=os.path.join("data","alignments","s1",file_name+".align")
    aligns=load_alignment(alignment_path)
    frames=load_video(video_path)
    return frames,aligns

In [None]:
test_path=".\\data\\s1\\bbal6n.mpg"
frames,alignments=load_data(test_path)
plt.imshow(frames[74])
imageio.mimsave('./animation.gif', frames, fps=10)
        

In [10]:
def map_funcn(path:str)->List[str]:
    result=tf.py_function(load_data,[path],(tf.float32,tf.int64))
    
    return result

In [11]:
data=tf.data.Dataset.list_files("./data/s1/*.mpg")
data=data.shuffle(500,reshuffle_each_iteration=False)
data=data.map(map_funcn)
data=data.padded_batch(2,padded_shapes=([75,None,None,None],[40]))
data=data.prefetch(tf.data.AUTOTUNE)
train=data.take(450)
test=data.skip(450)

In [12]:
frames,alignment=train.as_numpy_iterator().next()
frames.shape


(2, 75, 46, 140, 1)

In [13]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, Conv2D, LSTM, Dense, MaxPooling3D, Bidirectional, Activation, Flatten, TimeDistributed, Dropout,Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [14]:
model = Sequential()
model.add(Conv3D(128, 3, input_shape=(75,46,140,1), padding='same',activation="relu"))
model.add(MaxPooling3D((1,2,2)))

model.add(Conv3D(256, 3, input_shape=(75,46,140,1), padding='same',activation="relu"))
model.add(MaxPooling3D((1,2,2)))

model.add(Conv3D(75, 3, input_shape=(75,46,140,1), padding='same', activation="relu"))
model.add(MaxPooling3D((1,2,2)))


model.add(TimeDistributed(Flatten()))


model.add(Bidirectional(LSTM(128, return_sequences=True)))
model.add(Dropout(.5))

model.add(Bidirectional(LSTM(128, return_sequences=True)))
model.add(Dropout(.5))

model.add(Dense(char_to_num.vocabulary_size()+1, activation='softmax'))
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 75, 46, 140, 128)  3584      
                                                                 
 max_pooling3d (MaxPooling3D  (None, 75, 23, 70, 128)  0         
 )                                                               
                                                                 
 conv3d_1 (Conv3D)           (None, 75, 23, 70, 256)   884992    
                                                                 
 max_pooling3d_1 (MaxPooling  (None, 75, 11, 35, 256)  0         
 3D)                                                             
                                                                 
 conv3d_2 (Conv3D)           (None, 75, 11, 35, 75)    518475    
                                                                 
 max_pooling3d_2 (MaxPooling  (None, 75, 5, 17, 75)    0

In [15]:
def scheduler(epoch,lr):
    if(epoch<30):
        return lr
    else:
        return lr*tf.math.exp(-0.1)

In [16]:
def CTCLoss(y_true,y_pred):
    batch_len=tf.cast(tf.shape(y_true)[0],dtype="int64")
    input_length=tf.cast(tf.shape(y_pred)[1],dtype="int64")
    label_length=tf.cast(tf.shape(y_pred)[1],dtype="int64")
    input_length=input_length*tf.ones(shape=(batch_len,1),dtype="int64")
    label_length=label_length*tf.ones(shape=(batch_len,1),dtype="int64")
    loss=tf.keras.backend.ctc_batch_cost(y_true,y_pred,input_length,label_length)
    return loss


In [17]:
class example(tf.keras.callbacks.Callback):
   
    def __init__(self,dataset)->None:
        self.dataset=dataset.as_numpy_iterator()
    def on_epoch_end(self,epoch,logs=None)->None:
        data = self.dataset.next()
        yhat = self.model.predict(data[0])
        decoded = tf.keras.backend.ctc_decode(yhat, [75,75], greedy=False)[0][0].numpy()
        for x in range(len(yhat)):           
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            
        

In [18]:
model.compile(optimizer=Adam(learning_rate=0.0001),loss=CTCLoss)

In [19]:
checkpoint_callback = ModelCheckpoint(os.path.join('models','checkpoint.weights.h5'), monitor='loss', save_weights_only=True)

In [20]:
schedule_callback=LearningRateScheduler(scheduler)

In [18]:
example_callback=example(test)
example_callback

<__main__.example at 0x23f9662c880>

In [None]:
model.fit(train,validation_data=test,epochs=100,callbacks=[checkpoint_callback,schedule_callback,example_callback])

Epoch 1/100

In [30]:
import gdown
url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
output = 'checkpoints.zip'
gdown.download(url, output, quiet=False)
gdown.extractall('checkpoints.zip', 'models')

Downloading...
From: https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y
To: C:\Users\charu\Jupyter\Lip_Reading_AI\checkpoints.zip
100%|██████████████████████████████████████████████████████████████████████████████| 94.5M/94.5M [04:23<00:00, 358kB/s]


['models\\checkpoint.index',
 'models\\__MACOSX/._checkpoint.index',
 'models\\checkpoint.data-00000-of-00001',
 'models\\__MACOSX/._checkpoint.data-00000-of-00001',
 'models\\checkpoint',
 'models\\__MACOSX/._checkpoint']

In [21]:
model.load_weights('models/checkpoint')



<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x1de9b72b7f0>

In [22]:
test_data = test.as_numpy_iterator()

In [23]:
sample = test_data.next()

In [26]:
yhat = model.predict(sample[0])
p=[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in sample[1]]

decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75,75], greedy=True)[0][0].numpy()
print("Real Text, Predicted Text:",)
p1=[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]
p1

Real Text, Predicted Text:


[<tf.Tensor: shape=(), dtype=string, numpy=b'set white at u nine soon'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'bin red at m five again'>]