## LipNet 

This code is inspired by a similar demonstration by Nicholas Renotte. It demonstrates lip reading: a model is trained to predict the spoken sentence from video frames of a speaker's mouth.

To acquire the data, download it from the following link:

https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL

In [None]:
import math
import os

import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.layers import Conv3D, GRU, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

In [2]:
# Dataset root: the second entry that os.listdir() reports for the current
# working directory, joined onto that directory.
# NOTE(review): os.listdir() returns entries in arbitrary order, so index 1 only
# points at the dataset folder by coincidence of the local directory contents —
# consider naming the folder explicitly.
data_dir = os.path.join(os.getcwd(), os.listdir(os.getcwd())[1])

In [3]:
# Walk the dataset tree once and bucket every file by its suffix:
# names ending in 'mpg' are videos, names ending in 'align' are transcripts.
mpg, align = [], []
for root, _, names in os.walk(data_dir):
    for name in names:
        full_path = os.path.join(root, name)
        if name.endswith('mpg'):
            mpg.append(full_path)
        elif name.endswith('align'):
            align.append(full_path)

In [4]:
def load_mpg(mpg_file):
    """Decode a video, crop a fixed window and standardize the frames.

    Args:
        mpg_file: Path of the video file, handed to ``cv2.VideoCapture``.

    Returns:
        A float32 tensor holding one grayscale crop per decoded frame
        (rows 190:236, columns 80:220, one channel), shifted by the clip-wide
        mean and divided by the clip-wide standard deviation.

    Raises:
        ValueError: If not a single frame could be decoded from ``mpg_file``.
    """
    video = cv2.VideoCapture(mpg_file)
    frames = []

    try:
        for _ in range(int(video.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = video.read()
            if not ret:
                # The reported frame count can exceed what is actually
                # decodable; stop instead of feeding None into TensorFlow.
                break
            # NOTE(review): OpenCV normally yields BGR frames while
            # rgb_to_grayscale assumes RGB ordering — left as-is so the
            # values match what the model has been trained on; confirm.
            frame = tf.image.rgb_to_grayscale(frame)
            frame = tf.cast(frame, tf.float32)
            frames.append(frame[190:236, 80:220, :])
    finally:
        # Always free the decoder handle, even if a frame fails to convert.
        video.release()

    if not frames:
        raise ValueError('No frames could be read from {!r}'.format(mpg_file))

    # Stack once so the statistics and the arithmetic run on a real tensor
    # rather than on a Python list that is implicitly converted each time.
    frames = tf.stack(frames)
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(frames)

    return (frames - mean) / std
        

In [5]:
# Character inventory: lowercase letters followed by digits.
vocab = list('abcdefghijklmnopqrstuvwxyz0123456789')

# Forward lookup (character -> integer id); ' ' is registered as the
# out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    oov_token=' ',
)

# Inverse lookup (integer id -> character) built from the forward table's
# vocabulary so both directions agree.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token=' ',
    invert=True,
)

In [6]:
def load_align(align_file):
    """Read an alignment file and encode its words as character ids.

    Each line is split on whitespace and its third field is taken as the
    token; tokens equal to ``'sil'`` are dropped. The remaining words are
    joined with single spaces and every character is mapped through
    ``char_to_num``.

    Args:
        align_file: Path of the alignment text file.

    Returns:
        A 1-D integer tensor with one id per character of the sentence
        (the separating spaces are encoded through ``char_to_num`` as well).
    """
    words = []
    with open(align_file) as handle:
        for line in handle:
            fields = line.split()
            # Blank or truncated lines carry no token; skip them instead of
            # failing with an IndexError.
            if len(fields) < 3:
                continue
            token = fields[2]
            if token != 'sil':
                words.append(token)

    # Joining with spaces yields exactly "w0 w1 ... wn": the same sequence
    # the previous insert-while-iterating loop produced after dropping its
    # leading space, without mutating a list during iteration.
    sentence = ' '.join(words)
    unicodes = char_to_num(tf.strings.unicode_split(sentence, input_encoding='UTF-8'))

    return unicodes

In [7]:
# Sanity check: encode the first alignment file that was found on disk.
load_align(align[0])

<tf.Tensor: shape=(21,), dtype=int64, numpy=
array([ 2,  9, 14,  0,  2, 12, 21,  5,  0,  1, 20,  0,  6,  0, 20, 23, 15,
        0, 14, 15, 23], dtype=int64)>

In [8]:
def load_data(mpg_file):
    """Load one training example: video frames plus the encoded transcript.

    The transcript is looked up next to the video tree: for a video at
    ``<root>/<speaker>/<name>.mpg`` the alignment file is expected at
    ``<root>/alignments/<speaker>/<name>.align``.

    Args:
        mpg_file: Video path as a Python string or a scalar string tensor
            (the latter is what ``tf.py_function`` passes in).

    Returns:
        A ``(frames, align)`` tuple: the output of ``load_mpg`` for the video
        and the output of ``load_align`` for the matching alignment file.
    """
    video_path = tf.convert_to_tensor(mpg_file).numpy().decode('utf-8')
    frames = load_mpg(video_path)

    # Use os.path instead of splitting on a hard-coded '\\' so the lookup
    # works with whatever separator the current platform uses, and swap only
    # the file extension rather than every occurrence of 'mpg' in the path.
    speaker_dir, video_name = os.path.split(video_path)
    data_root, speaker = os.path.split(speaker_dir)
    stem = os.path.splitext(video_name)[0]
    align_path = os.path.join(data_root, 'alignments', speaker, stem + '.align')

    return frames, load_align(align_path)

In [9]:
# Sanity check: load the first video together with its encoded transcript.
load_data(mpg[0])

(<tf.Tensor: shape=(75, 46, 140, 1), dtype=float32, numpy=
 array([[[[ 1.4685191 ],
          [ 1.4685191 ],
          [ 1.4310399 ],
          ...,
          [ 0.38162336],
          [ 0.38162336],
          [ 0.38162336]],
 
         [[ 1.4685191 ],
          [ 1.4685191 ],
          [ 1.4310399 ],
          ...,
          [ 0.38162336],
          [ 0.38162336],
          [ 0.38162336]],
 
         [[ 1.4310399 ],
          [ 1.4310399 ],
          [ 1.4685191 ],
          ...,
          [ 0.30666503],
          [ 0.30666503],
          [ 0.30666503]],
 
         ...,
 
         [[ 1.0187691 ],
          [ 1.0187691 ],
          [ 0.9812899 ],
          ...,
          [ 0.08179008],
          [ 0.08179008],
          [ 0.04431092]],
 
         [[ 1.0187691 ],
          [ 1.0187691 ],
          [ 0.9812899 ],
          ...,
          [ 0.08179008],
          [ 0.04431092],
          [ 0.04431092]],
 
         [[ 1.0187691 ],
          [ 1.0187691 ],
          [ 0.9812899 ],
          

In [10]:
def map_function(file):
    """Adapter that lets ``load_data`` run inside a ``tf.data`` pipeline.

    Args:
        file: Scalar string tensor holding a video path.

    Returns:
        The ``(float32, int64)`` tensor pair produced by calling ``load_data``
        through ``tf.py_function``.
    """
    output_types = (tf.float32, tf.int64)
    return tf.py_function(func=load_data, inp=[file], Tout=output_types)

In [11]:
# Input pipeline: video paths -> (frames, label ids) -> padded batches of 2.
#
# list_files() shuffles on its own by default and draws a new order on every
# pass over the dataset, which lets clips drift between `train` and `test`.
# Turn that off and rely on the single fixed-order shuffle below instead.
data = tf.data.Dataset.list_files(mpg, shuffle=False)

# Shuffle over the full list of paths once; the order is then frozen so the
# take/skip split below always selects the same clips.
data = data.shuffle(len(mpg), reshuffle_each_iteration=False)
data = data.map(map_function)

# Pad every clip to 75 frames and every label sequence to 40 ids.
data = data.padded_batch(2, padded_shapes=([75, None, None, None], [40]))

# Cache first, prefetch last, so prefetching overlaps with training instead of
# sitting in front of the cache.
data = data.cache()
data = data.prefetch(tf.data.AUTOTUNE)

# The first 450 batches are used for training, the remainder is held out.
train = data.take(450)
test = data.skip(450)

In [12]:
# Network: 3 x (Conv3D -> Dropout -> MaxPool3D), per-frame flatten,
# two bidirectional GRUs and a per-time-step softmax over the characters.
model = Sequential()

# The three convolutional stages are identical, so build them in a loop.
# Only the first layer of a Sequential model takes the input shape; repeating
# it on later layers had no effect and only suggested otherwise.
for stage in range(3):
    conv_kwargs = {'input_shape': (75, 46, 140, 1)} if stage == 0 else {}
    model.add(Conv3D(128, kernel_size=3, activation='relu', **conv_kwargs))
    model.add(Dropout(0.5))
    # Pool only over the two spatial axes; the time axis is left untouched.
    model.add(MaxPool3D((1, 2, 2)))

# Collapse each time step's feature map into a vector for the recurrent layers.
model.add(TimeDistributed(Flatten()))

model.add(Bidirectional(GRU(128, kernel_initializer='Orthogonal', return_sequences=True)))

model.add(Bidirectional(GRU(128, kernel_initializer='Orthogonal', return_sequences=True)))

# One output unit per vocabulary entry plus one extra class
# (presumably the CTC blank — confirm).
model.add(Dense(num_to_char.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'))

In [13]:
def lr_scheduler(epochs, lr):
    """Learning-rate schedule: decay for the first 30 epochs, then hold.

    Args:
        epochs: Epoch index supplied by the scheduler callback.
        lr: Current learning rate.

    Returns:
        The new learning rate as a plain Python float: ``lr * exp(-0.1)``
        while ``epochs < 30``, otherwise ``lr`` unchanged.
    """
    # Return a float rather than a tf.Tensor: the scheduler callback only
    # needs a number, and not every Keras version accepts a tensor here.
    if epochs < 30:
        return float(lr) * math.exp(-0.1)
    return float(lr)

In [14]:
def CTCLoss(y_true, y_pred):
    """CTC loss wrapper usable as a Keras ``loss`` callable.

    Every sample in the batch is given the same input length (axis 1 of
    ``y_pred``) and the same label length (axis 1 of ``y_true``), i.e. the
    full padded widths.

    Args:
        y_true: Batch of label id sequences.
        y_pred: Batch of per-time-step model outputs.

    Returns:
        The tensor returned by ``tf.keras.backend.ctc_batch_cost``.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype='int32')
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype='int32')
    label_width = tf.cast(tf.shape(y_true)[1], dtype='int32')

    # Column vector of ones used to broadcast the scalar lengths per sample.
    per_sample = tf.ones(shape=(n_samples, 1), dtype='int32')

    return tf.keras.backend.ctc_batch_cost(
        y_true,
        y_pred,
        time_steps * per_sample,
        label_width * per_sample,
    )

In [15]:
# NOTE(review): leftover progress marker — it prints a fixed string and no
# other cell depends on it.
print('here')

here


In [71]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Callback that prints reference and decoded text for one batch per epoch."""

    def __init__(self, dataset):
        """Keep the dataset and open a NumPy iterator over it.

        Args:
            dataset: A batched ``tf.data.Dataset`` yielding ``(frames, labels)``.
        """
        super().__init__()
        # Keep the dataset itself so the iterator can be reopened once it
        # has been consumed.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epochs, logs=None):
        """Predict on the next batch and print original vs. decoded sentences.

        Args:
            epochs: Epoch index passed in by Keras (unused).
            logs: Metrics dict passed in by Keras (unused).
        """
        data = next(self.dataset, None)
        if data is None:
            # Iterator exhausted: start over from the beginning.
            self.dataset = self._source.as_numpy_iterator()
            data = next(self.dataset, None)
            if data is None:
                # Empty dataset: nothing to show.
                return

        predictions = self.model.predict(data[0])

        # One sequence length per batch item, taken from the actual output
        # shape instead of a hard-coded [75, 75].
        input_length = [predictions.shape[1]] * predictions.shape[0]
        decoded = tf.keras.backend.ctc_decode(predictions, input_length, greedy=False)[0][0].numpy()

        for i in range(len(predictions)):
            # Map ids to characters first, join, and only then decode bytes.
            original = tf.strings.reduce_join(num_to_char(data[1][i])).numpy().decode('utf-8')
            predicted = tf.strings.reduce_join(num_to_char(decoded[i])).numpy().decode('utf-8')
            print('Original: {}'.format(original))
            print('Prediction: {}'.format(predicted))
            print('~'*100)

In [75]:
# Compile with the CTC loss and Adam at a learning rate of 1e-3.
model.compile(loss=CTCLoss, optimizer=Adam(learning_rate=1e-3))

# Callbacks: save the full model, apply the learning-rate schedule, and print
# sample predictions drawn from the held-out batches.
checkpoint_check = ModelCheckpoint(
    filepath=os.path.join('models', 'checkpoint'),
    save_weights_only=False,
)
lr_check = LearningRateScheduler(schedule=lr_scheduler)
produce_example = ProduceExample(test)

model.fit(
    train,
    validation_data=test,
    callbacks=[checkpoint_check, lr_check, produce_example],
)

 69/450 [===>..........................] - ETA: 4:24:04 - loss: 91.4557

KeyboardInterrupt: 

In [76]:
# Pull a single batch from the pipeline and run the model on its frames.
sample = data.as_numpy_iterator()
sample_data = next(sample)
prediction = model.predict(sample_data[0])



In [82]:
# Turn the label ids of the sampled batch back into text. The join runs over
# the whole batch, so all sentences end up in one string.
print('Actual:', ' ', sep='\n')
label_chars = num_to_char(sample_data[1])
predictions = tf.strings.reduce_join(label_chars).numpy().decode('utf-8')
print(predictions)

Actual:
 
lay blue by e one soon                  place green by k eight please           


In [84]:
# Inspect the shape of the raw model output for the sampled batch.
prediction.shape

(2, 69, 38)

In [90]:
print('Predictions')
print(' ')

# ctc_decode needs one sequence length per batch item. Derive it from the
# prediction itself: axis 0 is the batch, axis 1 the time steps. The previous
# [69, 38] passed the size of the last axis as the second sample's length.
input_length = [prediction.shape[1]] * prediction.shape[0]
decoded = tf.keras.backend.ctc_decode(prediction, input_length=input_length, greedy=True)[0][0].numpy()

# One joined string tensor per decoded sentence.
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]


Predictions
 


[<tf.Tensor: shape=(), dtype=string, numpy=b'be                                                                   '>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'be                                                                   '>]