# ***IMPORTS***

In [1]:
!pip install jiwer

Collecting jiwer
  Downloading jiwer-3.0.3-py3-none-any.whl (21 kB)
Installing collected packages: jiwer
Successfully installed jiwer-3.0.3


In [2]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.11-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (35.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m26.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.4.6-py3-none-any.whl (31 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.11 sounddevice-0.4.6


In [1]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List, Tuple
# import jiwer
from keras.models import Sequential
from keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger
# import mediapipe as mp



In [None]:
# tf.__version__

In [None]:
# tf.config.list_physical_devices('GPU')

In [None]:
# physical_devices = tf.config.list_physical_devices('GPU')
# try:
#     tf.config.experimental.set_memory_growth(physical_devices[0], True)
# except:
#     pass

# ***Preprocessing DATA***

In [4]:
from collections import defaultdict


def _unset_lip_bounds():
    """Return a fresh [min_x, max_x, min_y, max_y] list with sentinel extremes.

    The minima start at +inf and the maxima at -inf so that the first real
    measurement always replaces them through min()/max().
    """
    return [float('inf'), float('-inf'), float('inf'), float('-inf')]


# Per-speaker mouth bounding box, keyed by the speaker id string.
# Each value is a mutable list laid out as [min_x, max_x, min_y, max_y].
speaker_lip_landmarks = defaultdict(_unset_lip_bounds)

In [5]:
# One sample clip per speaker (s1..s14); each entry is (speaker number, clip stem).
_SAMPLE_CLIPS = (
    (1, 'lgaf7a'),
    (2, 'pbwp6s'),
    (3, 'lwwz6n'),
    (4, 'lgbmzs'),
    (5, 'srahzn'),
    (6, 'bbae7n'),
    (7, 'bbae6n'),
    (8, 'bbae5n'),
    (9, 'bbae4n'),
    (10, 'bbab8n'),
    (11, 'bbae2n'),
    (12, 'bbae1n'),
    (13, 'bbae1s'),
    (14, 'bbad9n'),
)

# Absolute paths of the sample videos, in speaker order.
paths = [
    f'/kaggle/input/gridcorpus-s1-s15/data/s{speaker_no}/{clip_stem}.mpg'
    for speaker_no, clip_stem in _SAMPLE_CLIPS
]

In [6]:
# `mp` must be bound in this cell: the only other `import mediapipe as mp`
# in the notebook is commented out, so a fresh kernel would raise NameError.
import mediapipe as mp
# Private MediaPipe helper (leading underscore) that converts normalized
# landmark coordinates into integer pixel coordinates.
from mediapipe.python.solutions.drawing_utils import _normalized_to_pixel_coordinates

mp_face_mesh = mp.solutions.face_mesh
# Single shared FaceMesh instance used for every frame of the sample clips.
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [7]:
# Face-mesh landmark indices sampled on every frame. In the resulting list,
# position 0 feeds the min-y bound, position 1 the max-y bound,
# position 2 the min-x bound and position 3 the max-x bound.
LIP_LANDMARK_IDS = [0, 17, 61, 291]

# Scan one sample clip per speaker and widen that speaker's
# [min_x, max_x, min_y, max_y] box so it covers the mouth in every frame.
for path in paths:
    # Speaker id is the parent directory name without its leading character
    # (e.g. ".../s7/clip.mpg" -> "7").
    speaker = path.split('/')[-2][1:]
    cap = cv2.VideoCapture(path)
    try:
        for _frame_idx in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, image = cap.read()
            if not ret:
                # read() signals failure through `ret`; `image` is then unusable,
                # so stop instead of letting cvtColor raise on it.
                break
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image_rows, image_cols = image.shape[:2]

            # Detect faces
            results = face_mesh.process(image)
            if not results.multi_face_landmarks:
                continue

            # Same choice as before: when several faces are reported, the last
            # one in the list is the one whose landmarks are used.
            face_landmarks = results.multi_face_landmarks[-1]
            lip_landmarks = [
                _normalized_to_pixel_coordinates(
                    face_landmarks.landmark[i].x,
                    face_landmarks.landmark[i].y,
                    image_cols,
                    image_rows,
                )
                for i in LIP_LANDMARK_IDS
            ]
            # The helper yields None for a landmark it cannot map into the
            # frame; such a frame cannot contribute to the box, so skip it.
            if any(point is None for point in lip_landmarks):
                continue

            y_min_pt, y_max_pt, x_min_pt, x_max_pt = lip_landmarks
            bounds = speaker_lip_landmarks[speaker]
            bounds[0] = min(bounds[0], x_min_pt[0])
            bounds[1] = max(bounds[1], x_max_pt[0])
            bounds[2] = min(bounds[2], y_min_pt[1])
            bounds[3] = max(bounds[3], y_max_pt[1])
    finally:
        # Always free the decoder, even if a frame raised.
        cap.release()

# ***LOADING AND SPLITTING DATA***

In [2]:
# Character inventory for the transcripts: lowercase letters, the
# punctuation marks ' ? !, the digits 1-9 and the space character.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [3]:
# Forward lookup: character -> integer id. The empty string is the OOV token.
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")

# Inverse lookup: integer id -> character, built from the forward layer's
# vocabulary so both directions agree on the id assignment.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

vocabulary_report = (
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)
print(vocabulary_report)

The vocabulary is: ['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '] (size =40)


In [10]:
def load_video(path: str):
    """Read a clip, crop every frame to the speaker's mouth box and normalise.

    Args:
        path: Video file path whose parent directory is named ``s<speaker>``;
            the speaker id is taken from that directory name.

    Returns:
        A float32 tensor of normalised grayscale mouth crops, one per frame,
        or an empty float32 tensor when the clip must be skipped: speaker
        ``'15'``, a speaker with no measured mouth box, a frame that cannot be
        read or converted, or a clip with no frames. Callers detect the skip
        via ``tf.size(...) == 0``.
    """
    skipped = tf.constant([], dtype=tf.float32)

    speaker = path.split('/')[-2][1:]
    if speaker == '15':
        return skipped

    # .get() avoids inserting a sentinel entry into the defaultdict; a box
    # that still holds +/-inf was never measured and cannot be used to slice.
    bounds = speaker_lip_landmarks.get(speaker)
    if bounds is None or not np.all(np.isfinite(bounds)):
        return skipped

    x_min, x_max, y_min, y_max = bounds
    # 5-pixel margin around the box. The start is clamped at 0 because a
    # negative start would be interpreted as an offset from the end.
    rows = slice(max(y_min - 5, 0), y_max + 5)
    cols = slice(max(x_min - 5, 0), x_max + 5)

    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # An unreadable frame drops the whole clip, as before.
                return skipped
            try:
                frame = tf.image.rgb_to_grayscale(frame)
            except Exception:
                # Best-effort: a frame that cannot be converted drops the clip.
                # Narrowed from a bare `except` so that KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                return skipped
            frames.append(frame[rows, cols, :])
    finally:
        cap.release()

    if not frames:
        return skipped

    # NOTE(review): the mean is taken before the float cast. If the frames are
    # an integer dtype (presumably uint8 from OpenCV - confirm), the mean is
    # integer-valued and `frames - mean` may wrap for pixels below it. Left
    # unchanged on purpose: previously trained weights depend on this output.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std

In [11]:
def load_alignments(path: str) -> tf.Tensor:
    """Encode the transcript stored in an alignment file as character ids.

    Each line is split on whitespace and its third field is taken as the
    token; ``'sil'`` tokens are skipped, and lines with fewer than three
    fields (e.g. blank lines) are ignored instead of raising IndexError.

    Args:
        path: Path of the alignment text file.

    Returns:
        A 1-D tensor of ids produced by ``char_to_num`` for the tokens joined
        by single spaces. (The previous ``List[str]`` annotation did not match
        what is actually returned.)
    """
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            if len(fields) < 3:
                continue
            if fields[2] != 'sil':
                # A space precedes every word; the very first one is dropped
                # by the [1:] slice below.
                tokens.extend((' ', fields[2]))
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [12]:
def load_data(path: str):
    """Load the frames and the encoded transcript for one clip.

    Args:
        path: A string tensor holding the clip's file path; it is decoded
            with ``.numpy()``, so it must be an eager tensor.

    Returns:
        ``(frames, alignments)`` as produced by ``load_video`` and
        ``load_alignments`` for the same speaker and clip name.
    """
    dataset_root = '/kaggle/input/gridcorpus-s1-s15'

    path_parts = path.numpy().decode().split('/')
    # Clip name is everything before the first dot of the last path component.
    file_name = path_parts[-1].split('.')[0]
    # Speaker id is the parent directory name minus its first character.
    speaker = path_parts[-2][1:]
    speaker_dir = f's{speaker}'

    video_path = os.path.join(dataset_root, 'data', speaker_dir, f'{file_name}.mpg')
    alignment_path = os.path.join(
        dataset_root, 'data', 'alignments', speaker_dir, f'{file_name}.align'
    )

    return load_video(video_path), load_alignments(alignment_path)

In [13]:
def mappable_function(path: str):
    """Wrap ``load_data`` so it can be used inside ``Dataset.map``.

    ``load_data`` runs eagerly through ``tf.py_function`` and is declared to
    return a (float32, int64) pair.
    """
    output_dtypes = (tf.float32, tf.int64)
    return tf.py_function(func=load_data, inp=[path], Tout=output_dtypes)

In [14]:
# Input pipeline over the clips of the single-digit speakers (the glob
# `s[1-9]` matches s1..s9 only).
data = (
    tf.data.Dataset.list_files('/kaggle/input/gridcorpus-s1-s15/data/s[1-9]/*.mpg')
    # Shuffle once and keep that order on every pass, so the take/skip split
    # applied to this dataset selects the same batches each epoch.
    .shuffle(500, reshuffle_each_iteration=False)
    .map(mappable_function)
    # load_video returns an empty tensor for clips that must be skipped.
    .filter(lambda x,y: tf.size(x)>0)
    # Frames are padded to 75x50x70x1 so batches match the input_shape declared
    # on the model's first Conv3D layer (the previous 70-pixel height did not);
    # labels are padded to 40 ids.
    .padded_batch(5, padded_shapes=([75,50,70,1],[40]))
    .prefetch(tf.data.AUTOTUNE)
)

In [15]:
# Number of leading batches used for training; every batch after that
# is held out and used as the validation/test set.
TRAIN_BATCHES = 1550

train, test = data.take(TRAIN_BATCHES), data.skip(TRAIN_BATCHES)

# **Building Model**

In [4]:
def Build_Model() -> Sequential:
    """Assemble the Conv3D + bidirectional-LSTM lip-reading network.

    Layout, in order:
      * three Conv3D/ReLU/MaxPool3D stages (128, 256, 75 filters); the pool
        size (1,2,2) leaves the first axis untouched and halves the other two,
      * a per-time-step Flatten,
      * two bidirectional LSTM(128) layers, each followed by Dropout(0.5),
      * a softmax Dense layer with ``char_to_num.vocabulary_size() + 1`` units
        (the extra unit is presumably the CTC blank - confirm).

    Returns:
        The uncompiled ``Sequential`` model expecting input of shape
        (75, 50, 70, 1) per sample.
    """
    n_outputs = char_to_num.vocabulary_size()+1

    # First stage carries the input shape; the remaining stages only differ
    # in their filter count.
    stack = [
        Conv3D(128, 3, input_shape=(75,50,70,1), padding='same'),
        Activation('relu'),
        MaxPool3D((1,2,2)),
    ]
    for n_filters in (256, 75):
        stack += [
            Conv3D(n_filters, 3, padding='same'),
            Activation('relu'),
            MaxPool3D((1,2,2)),
        ]

    # Collapse each time step's feature map into a vector for the LSTMs.
    stack.append(TimeDistributed(Flatten()))

    for _ in range(2):
        stack += [
            Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
            Dropout(.5),
        ]

    stack.append(Dense(n_outputs, kernel_initializer='he_normal', activation='softmax'))

    return Sequential(stack)

In [5]:
# Instantiate the network and print its layer-by-layer summary.
model = Build_Model()
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 75, 50, 70, 128)   3584      
                                                                 
 activation (Activation)     (None, 75, 50, 70, 128)   0         
                                                                 
 max_pooling3d (MaxPooling3D  (None, 75, 25, 35, 128)  0         
 )                                                               
                                                                 
 conv3d_1 (Conv3D)           (None, 75, 25, 35, 256)   884992    
                                                                 
 activation_1 (Activation)   (None, 75, 25, 35, 256)   0         
                                                                 
 max_pooling3d_1 (MaxPooling  (None, 75, 12, 17, 256)  0         
 3D)                                                    

# ***Training***

In [6]:
def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, using full-width lengths for every sample.

    Every sample is given the same input length (``y_pred``'s second
    dimension) and the same label length (``y_true``'s second dimension).

    Args:
        y_true: Batch of label id sequences.
        y_pred: Batch of per-time-step class probabilities.

    Returns:
        The tensor returned by ``tf.keras.backend.ctc_batch_cost``.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Column vector of ones, scaled to give each sample the shared length.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    return tf.keras.backend.ctc_batch_cost(
        y_true,
        y_pred,
        time_steps * per_sample,
        label_steps * per_sample,
    )

In [None]:
# class ProduceExample(tf.keras.callbacks.Callback):
#     def __init__(self, dataset):
#         self.dataset = dataset.as_numpy_iterator()

#     def on_epoch_end(self, epoch, logs=None):
#         data = self.dataset.next()
#         yhat = self.model.predict(data[0])
#         decoded = tf.keras.backend.ctc_decode(yhat, [75, 75], greedy=False)[0][0].numpy()

#         for x in range(len(yhat)):
#             original_text = tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8')
#             predicted_text = tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8')
#             accuracy = 1 - jiwer.wer(original_text, predicted_text)

#             print('Original:', original_text)
#             print('Prediction:', predicted_text)
#             print('Accuracy:', accuracy)
#             print('~' * 100)

In [7]:
# Adam with a 1e-4 learning rate, optimising the notebook's CTCLoss.
model.compile(loss=CTCLoss, optimizer=Adam(learning_rate=0.0001))

# Writes a weights-only file, tagged with the epoch number, whenever
# val_loss improves on the best value seen so far.
checkpoint_callback = ModelCheckpoint(
    'lipreading_weights_9speaker_mp_{epoch}.h5',
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=True,
)

# Multiplies the learning rate by 0.8 after 2 epochs without a val_loss
# improvement of at least 1e-4, then waits 5 epochs before reacting again.
# Note that 1e-4 * 0.8 is already below min_lr, so the rate can only ever
# drop as far as min_lr.
reduceLROnPlat = ReduceLROnPlateau(
    monitor='val_loss',
    mode='auto',
    factor=0.8,
    patience=2,
    cooldown=5,
    min_delta=0.0001,
    min_lr = 0.00009048374,
    verbose=1,
)
# csv_logger = CSVLogger('history.csv', append=True)

In [8]:
# Resume from previously saved weights (the file name carries the epoch tag 31)
# instead of starting from the random initialisation.
model.load_weights('/kaggle/input/lipreadingweights/lipreading_weights_9speaker_mp_31.h5')

In [None]:
# Train for 10 epochs, validating on the held-out batches; the callbacks
# checkpoint the best weights and lower the learning rate on plateaus.
model.fit(train, validation_data=test, epochs=10, callbacks=[checkpoint_callback, reduceLROnPlat])

In [9]:
# Persist the model in its current state to the working directory.
model.save('model.keras')

# ***TESTING***

# On Seen speakers

In [None]:
# acc = []

In [None]:
# class Testing:
#     def __init__(self, data):
#         yhat = model.predict(data[0])
#         if len(yhat)==4:
#             decoded = tf.keras.backend.ctc_decode(yhat, [75, 75,75,75], greedy=False)[0][0].numpy()
#         elif len(yhat)==3:
#             decoded = tf.keras.backend.ctc_decode(yhat, [75, 75,75], greedy=False)[0][0].numpy()
#         elif len(yhat)==2:
#             decoded = tf.keras.backend.ctc_decode(yhat, [75, 75], greedy=False)[0][0].numpy()
#         else:
#             decoded = tf.keras.backend.ctc_decode(yhat, [75], greedy=False)[0][0].numpy()

#         for x in range(len(yhat)):
#             original_text = tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8')
#             predicted_text = tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8')
#             accuracy = 1 - jiwer.wer(original_text, predicted_text)

#             acc.append(accuracy)
#             print('Original:', original_text)
#             print('Prediction:', predicted_text)
#             print('Accuracy:', accuracy)
#             print('~' * 100)

In [None]:
# for d in test:
#     Testing(d)

In [None]:
# print(f"Accuracy of Model on seen data: {((sum(acc)/len(acc))*100):.2f}%")

In [None]:
# print('Test data size : ',len(acc))

# On Unseen Speakers

In [None]:
# unseen = tf.data.Dataset.list_files('/kaggle/input/gridcorpus-s1-s15/data/s10/*.mpg')
# unseen = unseen.shuffle(50, reshuffle_each_iteration=False)
# unseen = unseen.map(mappable_function)
# unseen = unseen.filter(lambda x,y: tf.size(x)>0)
# unseen = unseen.padded_batch(2, padded_shapes=([75,50,70,1],[40]))
# unseen = unseen.prefetch(tf.data.AUTOTUNE)

In [None]:
# acc = []

In [None]:
# for d in unseen:
#     try:
#         Testing(d)
#     except:
#         continue

In [None]:
# print(f"Accuracy of Model on Unseen data: {((sum(acc)/len(acc))*100):.2f}%")

In [None]:
# yhat = model.predict(data[0])
# decoded = tf.keras.backend.ctc_decode(yhat, [75, 75], greedy=False)[0][0].numpy()

In [None]:
# decoded

In [None]:
# predicted_text = tf.strings.reduce_join(num_to_char(decoded[1])).numpy().decode('utf-8')

In [None]:
# predicted_text