In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt

In [None]:
# Show the GPUs TensorFlow can see; an empty list means none were detected.
tf.config.list_physical_devices('GPU')

In [None]:
# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all
# up front. This is best-effort: the notebook must still run on CPU-only hosts.
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    try:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
    except (RuntimeError, ValueError) as err:
        # RuntimeError: the runtime was already initialised (memory growth must
        # be configured first). ValueError: the device is invalid.
        print(f'Could not enable GPU memory growth: {err}')

In [None]:
# Lip localisation approach based on:
# https://github.com/skaws2003/Dlib-lip-detection/tree/master

import dlib

# Frontal face detector shipped with dlib; called on each grayscale frame.
hog_face_detector = dlib.get_frontal_face_detector()
# Landmark predictor loaded from a local model file (path is relative to the
# notebook's working directory). load_video() reads landmark index 67 from it.
dlib_facelandmark = dlib.shape_predictor("./detector/shape_predictor_68_face_landmarks.dat")

def load_video(path:str) -> "tf.Tensor":
    """Read a clip, crop a fixed window around the mouth and standardise it.

    Every readable frame is converted to grayscale, a face is located with
    ``hog_face_detector`` and the crop is centred on landmark 67 returned by
    ``dlib_facelandmark``. If several faces are found the last one wins; if
    none is found the crop is centred on the fallback point (100, 100).

    Args:
        path: Path of the video file to read.

    Returns:
        A float32 tensor of shape (num_frames, 54, 90, 1) with the clip
        standardised to zero mean and unit standard deviation. A clip without
        readable frames yields an empty tensor of shape (0, 54, 90, 1).
        Frames smaller than the crop window still produce smaller crops.
    """
    # Half-extents of the crop window: 2 * 27 rows by 2 * 45 columns.
    horizontal_padding, vertical_padding = 45, 27

    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is decodable.
                continue
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = hog_face_detector(frame)

            # Fallback crop centre used when no face is detected.
            x67, y67 = 100, 100
            for face in faces:
                face_landmarks = dlib_facelandmark(frame, face)
                x67 = face_landmarks.part(67).x
                y67 = face_landmarks.part(67).y

            # Keep the window inside the frame. Without this a landmark near
            # the border produces a negative slice start, which Python
            # interprets as counting from the end and yields a wrong-sized crop.
            height, width = frame.shape[:2]
            x67 = min(max(x67, horizontal_padding),
                      max(width - horizontal_padding, horizontal_padding))
            y67 = min(max(y67, vertical_padding),
                      max(height - vertical_padding, vertical_padding))

            lip_window = frame[y67-vertical_padding:y67+vertical_padding,
                               x67-horizontal_padding:x67+horizontal_padding]
            frames.append(tf.expand_dims(lip_window, axis=-1))
    finally:
        # Release the capture even if detection or cropping raised.
        cap.release()

    if not frames:
        return tf.zeros((0, 2 * vertical_padding, 2 * horizontal_padding, 1),
                        dtype=tf.float32)

    # Cast BEFORE computing statistics: on uint8 data the mean is an integer
    # and `frames - mean` wraps around for pixels darker than the mean.
    video = tf.cast(frames, tf.float32)
    mean = tf.math.reduce_mean(video)
    std = tf.math.reduce_std(video)
    return (video - mean) / std


# Smoke test: preprocess one training clip and display one cropped frame.
test_path = './final_dataset/train/p0/clips/clip0.mp4'
frm = load_video(test_path)

# Index 20 assumes the clip has at least 21 readable frames.
plt.imshow(frm[20])

In [None]:
# Character-level vocabulary: a space followed by the Bengali characters the
# transcripts may contain, one list entry per character.
vocab = list(" অআইঈউঊঋএঐওঔকখগঘঙচছজঝঞটঠডঢণতথদধনপফবভমযরলশষসহড়ঢ়য়ৎংঃঁািীুূেৈোৌৃ")

In [None]:
# Forward lookup: character -> integer index. Characters that are not in
# `vocab` map to the out-of-vocabulary token, which is the empty string here.
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")
# Inverse lookup built from the forward layer's vocabulary so that both layers
# agree on the index assigned to every character.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

# Report the final vocabulary (including the OOV entry) and its size.
print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} \n"
    f"(size ={char_to_num.vocabulary_size()})"
)

In [None]:
# Sanity check: encode a few characters to their integer indices.
char_to_num([' ','অ', 'আ', 'ই', 'ঈ', 'উ', 'ঊ'])

In [None]:
# Sanity check: decode a few indices back to characters. The layer output is
# converted to NumPy and each element is decoded from bytes for display.
print([bytes.decode(x) for x in num_to_char([14,  9,  3, 11, 13]).numpy()])

In [None]:
def load_alignments(path:str) -> "tf.Tensor":
    """Read a transcript file and encode it as character indices.

    The file is split into whitespace-separated words across all lines; the
    words are re-joined with single spaces and every character is mapped
    through ``char_to_num``. There is no leading or trailing space.

    Args:
        path: Path of the transcript text file.

    Returns:
        The ``char_to_num`` encoding of the transcript, one index per
        character.
    """
    # The vocabulary is Bengali, so decode explicitly as UTF-8 instead of
    # relying on the platform default (which is not UTF-8 on Windows).
    with open(path, 'r', encoding='utf-8') as f:
        words = [word for line in f for word in line.split()]
    # Joining with ' ' is the same as appending a space after every word and
    # then dropping the final one.
    tokens = list(' '.join(words))
    return char_to_num(tokens)

# Smoke test: encode one training transcript, then decode it back to
# characters to confirm the round trip.
test_path = './final_dataset/train/p0/texts/clip0.txt'
tokens = load_alignments(test_path)
print(tokens)
print([bytes.decode(x) for x in num_to_char(tokens).numpy()])

In [None]:
def load_data(path: tf.Tensor): 
    """Load the preprocessed frames and encoded transcript for one clip.

    The clip's base name and the directory two levels above the given path
    are used to rebuild ``<root>/clips/<name>.mp4`` and
    ``<root>/texts/<name>.txt``.

    Args:
        path: Scalar string tensor holding the path of a clip.

    Returns:
        A ``(frames, alignments)`` tuple produced by ``load_video`` and
        ``load_alignments`` respectively.
    """
    clip_path = path.numpy().decode('utf-8')
    stem, _ = os.path.splitext(os.path.basename(clip_path))
    sample_root = os.path.dirname(os.path.dirname(clip_path))

    # Adjust the sub-directory names according to your dataset structure.
    frames = load_video(os.path.join(sample_root, 'clips', stem + '.mp4'))
    alignments = load_alignments(os.path.join(sample_root, 'texts', stem + '.txt'))
    return frames, alignments

In [None]:
# Choose which split's sample clip the following cells inspect; uncomment one
# of the alternatives to switch.
# test_path = './final_dataset/train/p0/clips/clip0.mp4'
# test_path = './final_dataset/test/p0/clips/clip0.mp4'
test_path = './final_dataset/val/p0/clips/clip0.mp4'

In [None]:
# Extract the clip name (file name without extension) from a path tensor.
# On Windows, where paths use backslashes, split on '\\' instead:
# tf.convert_to_tensor(test_path).numpy().decode('utf-8').split('\\')[-1].split('.')[0]
tf.convert_to_tensor(test_path).numpy().decode('utf-8').split('/')[-1].split('.')[0]

In [None]:
# Smoke test: load_data expects a tensor (it calls .numpy() on its argument),
# so wrap the plain string path first.
frames, alignments = load_data(tf.convert_to_tensor(test_path))
# load_data(tf.convert_to_tensor(test_path))

In [None]:
# Display one cropped frame; index 15 assumes the clip has at least 16 frames.
plt.imshow(frames[15])
# frames[23]

In [None]:
# Decode the loaded transcript indices back to characters for inspection.
# tf.strings.reduce_join([bytes.decode(x) for x in num_to_char(alignments.numpy()).numpy()])
print([bytes.decode(x) for x in num_to_char(alignments.numpy()).numpy()])

In [None]:
def mappable_function(path:str) ->List[str]:
    """Adapter that lets ``load_data`` be used with ``Dataset.map``.

    ``load_data`` runs Python code (it calls ``.numpy()`` on its argument),
    so it is wrapped in ``tf.py_function``.

    Args:
        path: Path of a clip, as supplied by the dataset being mapped.

    Returns:
        The ``tf.py_function`` result with declared output types
        ``(tf.float32, tf.int64)``: the frames and the encoded transcript.
        NOTE(review): the signature's annotations do not describe these types.
    """
    return tf.py_function(func=load_data, inp=[path], Tout=(tf.float32, tf.int64))

# Data Pipeline

In [None]:
import random
from glob import glob

base_directory = './final_dataset/'
# Fixed seed so the shuffled sample order is the same on every run.
SHUFFLE_SEED = 42

# glob returns files in filesystem order, which differs between machines;
# sorting makes the file lists (and the seeded shuffle below) reproducible.
train_directory = os.path.join(base_directory, 'train')
train_pattern = os.path.join(train_directory, '**', '*.mp4')
train_files = sorted(glob(train_pattern, recursive=True))

test_directory = os.path.join(base_directory, 'test')
test_pattern = os.path.join(test_directory, '**', '*.mp4')
test_files = sorted(glob(test_pattern, recursive=True))

val_directory = os.path.join(base_directory, 'val')
val_pattern = os.path.join(val_directory, '**', '*.mp4')
val_files = sorted(glob(val_pattern, recursive=True))

train_size = len(train_files)
val_size = len(val_files)
test_size = len(test_files)
print(f'Train size: {len(train_files)}\nValidation size: {len(val_files)}\nTest size: {len(test_files)}')

# Show one example path per split without failing when a split is empty.
for _split_files in (train_files, val_files, test_files):
    print(_split_files[0] if _split_files else '(no clips found)')


def _build_dataset(files):
    """Build a shuffled dataset of (frames, labels) pairs from clip paths."""
    # An explicit string dtype keeps an empty file list a string dataset.
    dataset = tf.data.Dataset.from_tensor_slices(tf.constant(files, dtype=tf.string))
    if files:
        # shuffle() rejects a buffer size of 0, so skip it for empty splits.
        dataset = dataset.shuffle(
            len(files), seed=SHUFFLE_SEED, reshuffle_each_iteration=False
        )
    # Map the preprocessing function over the paths (no batching happens here).
    return dataset.map(mappable_function, num_parallel_calls=tf.data.AUTOTUNE)


train_data = _build_dataset(train_files)
val_data = _build_dataset(val_files)
test_data = _build_dataset(test_files)

In [None]:
def collect_data(dataset, max_frame_length=115, frame_shape=(54, 90, 1), max_label_length=55):
    """Materialise a dataset into zero-padded NumPy arrays.

    Args:
        dataset: Iterable of ``(frames, labels)`` pairs. ``frames`` must have
            shape ``(num_frames, *frame_shape)`` and ``labels`` shape
            ``(num_labels,)``; both are converted with ``np.array``.
        max_frame_length: Number of frames every sample is padded to.
        frame_shape: Shape of a single frame.
        max_label_length: Number of label entries every sample is padded to.

    Returns:
        ``(padded_frames, padded_labels)`` where ``padded_frames`` is float32
        with shape ``(n, max_frame_length, *frame_shape)`` and
        ``padded_labels`` is int64 with shape ``(n, max_label_length)``.
        Shorter samples are padded with zeros at the end. Longer samples are
        truncated (with a warning) instead of raising a broadcast error.
    """
    import warnings

    frames = []
    labels = []
    for x, y in dataset:
        frames.append(np.array(x))
        labels.append(np.array(y))

    padded_frames = np.zeros((len(frames), max_frame_length, *frame_shape), dtype=np.float32)
    padded_labels = np.zeros((len(labels), max_label_length), dtype=np.int64)

    for i, (clip, label) in enumerate(zip(frames, labels)):
        if clip.shape[0] > max_frame_length or label.shape[0] > max_label_length:
            warnings.warn(
                f'Sample {i} exceeds the padding limits '
                f'({clip.shape[0]} frames, {label.shape[0]} labels) and was truncated.'
            )
        frame_length = min(clip.shape[0], max_frame_length)
        label_length = min(label.shape[0], max_label_length)

        # Skip empty samples: their arrays may not carry the frame/label
        # dimensions and would fail to broadcast into the padded slot.
        if frame_length:
            padded_frames[i, :frame_length] = clip[:frame_length]
        if label_length:
            padded_labels[i, :label_length] = label[:label_length]

    return padded_frames, padded_labels

In [None]:
# Run the preprocessing pipeline once per split and keep the padded arrays in
# memory. Iterating each dataset invokes load_data on every clip of the split.


train_frames, train_labels = collect_data(train_data)
val_frames, val_labels = collect_data(val_data)
test_frames, test_labels = collect_data(test_data)

In [None]:
import pickle

# Persist every split as a (frames, labels) tuple in its own pickle file so
# the preprocessing above does not have to be repeated.
for pickle_name, split_arrays in (
    ('train_data.pkl', (train_frames, train_labels)),
    ('val_data.pkl', (val_frames, val_labels)),
    ('test_data.pkl', (test_frames, test_labels)),
):
    with open(pickle_name, 'wb') as f:
        pickle.dump(split_arrays, f)

