In [1]:
import os
import json
import cv2
from numpy import random, ceil
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

#importing dependencies

In [4]:
vocab = ['b', 'ih', 'n', 'l', 'uw', 'ae', 't', 'iy', 'ay', 'p', 'z', 'w', 'f', 'v', 'aw', 'jh', 'ey', 's', 'g', 'r', 'eh', 'd', 'ia', 'ow', 'ah', 'a', 'y', 'th', 'dh', 'ao', 'k', 'm', 'aa', 'ch']
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token='')
num_to_char = tf.keras.layers.StringLookup(vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True)

#defining the vocabulary using keras functions

print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)

def process_video(path:str): # function for returning input and output signals from a given path
    path = bytes.decode(path.numpy())
    filename = path.split('/')[-1].split('.')[0]
    alignment_file = open(r"drive/MyDrive/Lombard Grid/lombardgrid_alignment/lombardgrid/alignment"+ '/' + filename + r".json")
    alignments = json.load(alignment_file)
    rng = random.randint(1, len(alignments[filename])//2)
    phonemes = ['a' if alignments[filename][i]['phone'].split('_')[0] == 'ax' else alignments[filename][i]['phone'].split('_')[0]
                                  for i in range(len(alignments[filename])//2 - rng, len(alignments[filename])//2 + rng)]
    all_frames = []
    vid = cv2.VideoCapture(r"drive/MyDrive/Lombard Grid/lombardgrid_front/lombardgrid/compressedx4" + '/' + filename + r".mov")
    num_frames = vid.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = vid.get(cv2.CAP_PROP_FPS)
    tot_time = num_frames / (fps + 1e-6)

    start_f = ceil(alignments[filename][len(alignments[filename])//2 - rng]['offset'] * num_frames / tot_time)
    end_f = ceil((alignments[filename][len(alignments[filename])//2 + rng]['offset'] + alignments[filename][len(alignments[filename])//2 + rng]['duration'])* num_frames / tot_time)

    counter = -1
    while True:
        counter += 1
        if counter > end_f:
            break
        ret, frame = vid.read()
        if not ret:
            break
        if counter >= start_f:
            frame = tf.image.rgb_to_grayscale(frame[52:112, 50:140, :])
            all_frames.append(frame)
    vid.release()
    return tf.cast(all_frames, tf.float32)/255, char_to_num(phonemes)

def mappable_function(path:str):  #function for mapping process_video() to tensorflow's Dataset
    result = tf.py_function(process_video, [path], (tf.float32, tf.int64))
    return result



The vocabulary is: ['', 'b', 'ih', 'n', 'l', 'uw', 'ae', 't', 'iy', 'ay', 'p', 'z', 'w', 'f', 'v', 'aw', 'jh', 'ey', 's', 'g', 'r', 'eh', 'd', 'ia', 'ow', 'ah', 'a', 'y', 'th', 'dh', 'ao', 'k', 'm', 'aa', 'ch'] (size =35)


In [5]:
data = tf.data.Dataset.list_files(r"drive/MyDrive/Lombard Grid/lombardgrid_front/lombardgrid/compressedx4/*.mov")
data = data.shuffle(500, reshuffle_each_iteration = False)
data = data.map(mappable_function)
data = data.padded_batch(2, padded_shapes = ([96, None, None, None], [40]))
data = data.prefetch(tf.data.AUTOTUNE)

train = data.skip(200) # Train test split is 200 videos for testing, and the rest for training
test = data.take(200)

In [6]:
test.as_numpy_iterator().next()[0][0].shape

(96, 60, 90, 1)

In [7]:
model = Sequential()
model.add(Conv3D(64, 3, input_shape=(96,60,90,1), padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1, 2, 2)))

model.add(Conv3D(90, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1, 2, 2)))

model.add(Conv3D(35, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1, 2, 2)))

model.add(TimeDistributed(Flatten()))

model.add(Bidirectional(LSTM(35, kernel_initializer='Orthogonal', return_sequences=True)))
model.add(Dropout(0.5))

model.add(Bidirectional(LSTM(35, kernel_initializer='Orthogonal', return_sequences=True)))
model.add(Dropout(0.5))

model.add(Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'))
model.summary()
#architecture of the model

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 96, 60, 90, 64)    1792      
                                                                 
 activation (Activation)     (None, 96, 60, 90, 64)    0         
                                                                 
 max_pooling3d (MaxPooling3D  (None, 96, 30, 45, 64)   0         
 )                                                               
                                                                 
 conv3d_1 (Conv3D)           (None, 96, 30, 45, 90)    155610    
                                                                 
 activation_1 (Activation)   (None, 96, 30, 45, 90)    0         
                                                                 
 max_pooling3d_1 (MaxPooling  (None, 96, 15, 22, 90)   0         
 3D)                                                    

In [8]:
model.load_weights(r'drive/MyDrive/Model Checkpoints/2e1e4.h5') #loading a previous checkpoint of the model

In [None]:
def scheduler(epoch, lr):
    return lr * 0.95
# lr = lr * 0.95


def CTCLoss(y_true, y_pred):
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

    loss = tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss
#CTC loss: https://keras.io/examples/audio/ctc_asr/

class ProduceExample(tf.keras.callbacks.Callback):
    def __init__(self, dataset) -> None:
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        data = self.dataset.next()
        yhat = self.model.predict(data[0])
        decoded = tf.keras.backend.ctc_decode(yhat, [96,96], greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original: ', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction: ', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('=' * 100)
# Class used for displaying predictions to validation data during trainig

checkpoint_callback = ModelCheckpoint(os.path.join('models','checkpoint'), monitor='loss', save_weights_only=True)
schedule_callback = LearningRateScheduler(scheduler)
example_callback = ProduceExample(test)

In [None]:
model.compile(optimizer=Adam(learning_rate=0.000004), loss=CTCLoss)

In [None]:
counter = 0
for i in range(50):
    print(counter*2, 'epochs' + '~' * 50)
    counter += 1
    model.fit(train, validation_data=test, epochs=2, callbacks=[checkpoint_callback, schedule_callback, example_callback])
    model.save(r'drive/MyDrive/Model Checkpoints/2e1e4.h5')
#training using for loop to accomodata for Google Collab timeouts

0 epochs~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


NameError: ignored

In [None]:
model.save(r'drive/MyDrive/Model Checkpoints/2e1e4.h5') # 2 e(pochs), lr=1e(-)4

In [9]:
dat = data.as_numpy_iterator().next()
print('-- 1')
yhat = model.predict(dat[0])
print('-- 2')
decoded = tf.keras.backend.ctc_decode(yhat, [91,91], greedy=False)[0][0].numpy()
for x in range(len(yhat)):
    print('Original: ', tf.strings.reduce_join(num_to_char(dat[1][x])).numpy().decode('utf-8'))
    print('Prediction: ', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
    print('=' * 100)

#Predictions

-- 1
-- 2
Original:  rehdbayehsfayvs
Prediction:  rehdbayehs
Original:  dwihdhiyw
Prediction:  wihdh
