In [None]:
!pip list

In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio

In [None]:
physical_devices = tf.config.list_physical_devices('GPU')
try:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
except:
    pass

In [None]:
import gdown    # easier to import files from Gdrive

In [None]:
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'
gdown.download(url, output, quiet=False)
gdown.extractall('data.zip')

In [None]:
def load_video(path:str) -> List[float]:
    cap = cv2.VideoCapture(path)
    frames = []
    for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
        ret, frame = cap.read()
        frame = tf.image.rgb_to_grayscale(frame)
        frames.append(frame[190:236, 80:220, :])    # to etract the lip frames
    cap.release()

    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames-mean), tf.float32)/std

In [None]:
vocab = [x for x in "abcdefghijklmnopqrstuvwxyz'?!1234567890 "]

In [None]:
vocab

In [None]:
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")
num_to_char = tf.keras.layers.StringLookup(vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True)
print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size={char_to_num.vocabulary_size()})"
)

In [None]:
char_to_num(['a', 'm', 'e', 'y', 'a'])

In [None]:
num_to_char([ 1, 13,  5, 25,  1])

In [None]:
def load_alignments(path:str) -> List[str]:
    with open(path, 'r') as f:
        lines = f.readlines()
    tokens = []
    for line in lines:
        line = line.split()
        if line[2] != 'sil':    # ignore the line with silence
            tokens = [*tokens, ' ', line[2]]
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [None]:
def load_data(path:str):
    path = bytes.decode(path.numpy())
    # file name splitting
    file_name = path.split('\\')[-1].split('.')[0]
    video_path = os.path.join('data', 's1', f'{file_name}.mpg')
    alignment_path = os.path.join('data', 'alignments', 's1', f'{file_name}.align')
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)
    return frames, alignments

In [None]:
test_path = ".\\data\\s1\\bbal6n.mpg"

In [None]:
tf.convert_to_tensor(test_path).numpy().decode('utf-8').split('\\')[-1].split(".")[0]

In [None]:
frames, alignments = load_data(tf.convert_to_tensor(test_path))

In [None]:
plt.imshow(frames[10])

In [None]:
alignments

In [None]:
tf.strings.reduce_join([bytes.decode(x) for x in num_to_char(alignments.numpy()).numpy()])

In [None]:
def mappable_function(path:str) -> List[str]:
    result = tf.py_function(load_data, [path], (tf.float32, tf.int64))
    return result

In [None]:
data = tf.data.Dataset.list_files('./data/s1/*.mpg')
data = data.shuffle(500)
data = data.map(mappable_function)
data = data.padded_batch(2, padded_shapes=([75, None, None, None], [40]))
data = data.prefetch(tf.data.AUTOTUNE)

In [None]:
frames, alignments = data.as_numpy_iterator().next()

In [None]:
len(frames)

In [None]:
test = data.as_numpy_iterator()

In [None]:
val = test.next(); val[0]

In [None]:
imageio.mimsave('./animation.gif', val[0][0], fps=10)    # changes data to gif