**Seeing speech: Analysis of lip movement and extracting speech using deep learning**

In [None]:
from tensorflow.keras.models import load_model

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC loss wrapper around ``tf.keras.backend.ctc_batch_cost``.

    Every sample in the batch is given the same input length (the size of
    axis 1 of ``y_pred``) and the same label length (the size of axis 1 of
    ``y_true``); both are passed as int64 columns of shape ``(batch, 1)``.

    Args:
        y_true: Batch of label sequences.
        y_pred: Batch of model outputs.

    Returns:
        The value returned by ``ctc_batch_cost`` for the batch.
    """
    true_shape = tf.shape(y_true)
    n_samples = tf.cast(true_shape[0], dtype="int64")

    # One column of ones per sample, reused to broadcast both scalar lengths.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(true_shape[1], dtype="int64")

    return tf.keras.backend.ctc_batch_cost(
        y_true,
        y_pred,
        time_steps * per_sample,
        label_steps * per_sample,
    )

In [None]:
# Mount Google Drive at /content/drive; the saved model and the dataset archive
# used by later cells are read from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import keras


In [None]:
# Make CTCLoss resolvable by the name 'CTCLoss' while the saved model is being
# deserialised (presumably the model was compiled with this loss — confirm).
custom_objects={'CTCLoss':CTCLoss}
with keras.saving.custom_object_scope(custom_objects):
  # The scope only needs to cover the load call itself.
  model = keras.models.load_model('/content/drive/MyDrive/lipReading/lipreadmodel.hdf5')

In [None]:
# Print the layer-by-layer summary of the loaded model.
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 75, 46, 140, 128   3584      
                             )                                   
                                                                 
 activation (Activation)     (None, 75, 46, 140, 128   0         
                             )                                   
                                                                 
 max_pooling3d (MaxPooling3  (None, 75, 23, 70, 128)   0         
 D)                                                              
                                                                 
 conv3d_1 (Conv3D)           (None, 75, 23, 70, 256)   884992    
                                                                 
 activation_1 (Activation)   (None, 75, 23, 70, 256)   0         
                                                        

In [None]:
import zipfile

# Unpack the dataset archive from Drive into /content/. Using the archive as a
# context manager guarantees it is closed even if extraction fails part-way.
with zipfile.ZipFile('/content/drive/MyDrive/lipReading/data.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/')

In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio

In [None]:
def load_video(path:str) -> "tf.Tensor":
    """Decode a video into a normalised stack of cropped grayscale frames.

    Each frame is passed through ``tf.image.rgb_to_grayscale`` and cropped to
    rows 190:236 and columns 80:220 (presumably the mouth region — confirm
    against the dataset). The stacked frames are then shifted by their mean
    and divided by their standard deviation.

    Args:
        path: Path of the video file to read.

    Returns:
        A float32 tensor holding the normalised frames.

    Raises:
        FileNotFoundError: If the video cannot be opened.
        ValueError: If the video opens but no frame can be decoded.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        if not cap.isOpened():
            raise FileNotFoundError(f"Could not open video file: {path}")

        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is actually
                # decodable; stop cleanly instead of handing None to TensorFlow.
                break
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[190:236,80:220,:])
    finally:
        # Always free the capture handle, including on decode errors.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be decoded from video file: {path}")

    # NOTE(review): the mean and the subtraction below run on the frames in the
    # dtype cv2 produced (presumably uint8) before the float cast. This is kept
    # exactly as-is because the saved model was presumably trained on inputs
    # preprocessed this way — confirm before changing the arithmetic.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std

In [None]:
# Character vocabulary: lowercase letters, apostrophe, '?', '!', digits 1-9 and space.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [None]:
# Lookup layer built from `vocab`, using the empty string as the
# out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")
# Inverse lookup (invert=True) that shares char_to_num's vocabulary, so the two
# layers stay consistent with each other.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

In [None]:
def load_alignments(path:str) -> "tf.Tensor":
    """Read an alignment file and encode its words as character ids.

    Each line is split on whitespace and its third field is treated as the
    word (the first two fields are presumably timing information — confirm
    against the dataset). Words equal to ``'sil'`` are dropped. The remaining
    words are joined with single spaces, split into characters and mapped
    through ``char_to_num``.

    Args:
        path: Path of the alignment file to read.

    Returns:
        A 1-D tensor of character ids for the sentence, without a leading
        space.
    """
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            # Blank or truncated lines carry no word; skip them rather than
            # failing with IndexError.
            if len(fields) < 3:
                continue
            if fields[2] != 'sil':
                # Extend in place instead of rebuilding the whole list for
                # every word.
                tokens.extend((' ', fields[2]))
    # Every word was prefixed with a space, so the first id is a separator
    # that precedes the sentence; [1:] removes it.
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [None]:
def load_data(path: str):
    """Load the frames and the encoded alignment for one clip.

    Only the file stem of ``path`` is used: the video is read from
    ``data/s1/<stem>.mpg`` and the alignment from
    ``data/alignments/s1/<stem>.align``, both relative to the current
    working directory.

    Args:
        path: A string or string tensor naming the clip.

    Returns:
        A ``(frames, alignments)`` tuple as produced by ``load_video`` and
        ``load_alignments``.
    """
    from pathlib import Path

    decoded_path = tf.convert_to_tensor(path).numpy().decode('utf-8')

    # Path.stem drops the directory and extension; the extra split on a
    # backslash also copes with Windows-style separators.
    clip_id = Path(decoded_path).stem.split('\\')[-1]

    video_file = os.path.join('data', 's1', f'{clip_id}.mpg')
    align_file = os.path.join('data', 'alignments', 's1', f'{clip_id}.align')

    return load_video(video_file), load_alignments(align_file)

In [None]:
# Load one clip as a (frames, alignments) pair; the cells below use it to
# compare the ground-truth text with the model's prediction.
sample = load_data(tf.convert_to_tensor('/content/data/s1/bbaf5a.mpg'))

In [None]:
print('~'*10, 'REAL TEXT', '~'*10)
# Map each id of the sample's alignment back to its character and join them
# into a single string tensor.
real_text=[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in [sample[1]]]
text_tensor = real_text[0]
text_string = text_tensor.numpy().decode('utf-8')
# The bytes are already decoded to a plain str, so there is no b'...' wrapper
# to strip; removing quote characters here would delete apostrophes, which are
# legitimate vocabulary characters.
real_text_display = text_string
print(' '*4,real_text_display)

~~~~~~~~~~ REAL TEXT ~~~~~~~~~~
     bin blue at f five again


In [None]:
# Add a leading batch axis of size 1 to the sample's frames before predicting.
yhat = model.predict(tf.expand_dims(sample[0], axis=0))



In [None]:
# Greedy CTC decode. The sequence length is taken from the prediction's time
# axis instead of a hard-coded 75, so it always matches what the model emitted.
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[yhat.shape[1]], greedy=True)[0][0].numpy()

In [None]:
print('~'*10, 'PREDICTIONS', '~'*10)
# Map each decoded id back to its character and join them into one string
# tensor per decoded sequence.
predicted_text=[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]
text_tensor = predicted_text[0]
text_string = text_tensor.numpy().decode('utf-8')
# The bytes are already decoded to a plain str, so there is no b'...' wrapper
# to strip; removing quote characters here would delete apostrophes, which are
# legitimate vocabulary characters.
predicted_text_display = text_string
print(' '*5,predicted_text_display)

~~~~~~~~~~ PREDICTIONS ~~~~~~~~~~
      bin blue at f five again


In [None]:
!pip install gradio

Collecting gradio
  Downloading gradio-4.31.5-py3-none-any.whl (12.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m [31m67.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl (15 kB)
Collecting fastapi (from gradio)
  Downloading fastapi-0.111.0-py3-none-any.whl (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.0/92.0 kB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ffmpy (from gradio)
  Downloading ffmpy-0.3.2.tar.gz (5.5 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gradio-client==0.16.4 (from gradio)
  Downloading gradio_client-0.16.4-py3-none-any.whl (315 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m315.9/315.9 kB[0m [31m38.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting httpx>=0.24.1 (from gradio)
  Downloading httpx-0.27.0-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━

In [None]:
import gradio as gr

# Define the function to load data and make predictions
def _tokens_to_text(token_ids) -> str:
    """Convert a sequence of character ids into a plain Python string."""
    return tf.strings.reduce_join(num_to_char(token_ids)).numpy().decode('utf-8')


def predict_lip_reading(video_path,video):
    """Run the lip-reading model on one clip and return text for display.

    Args:
        video_path: Path naming the clip; it is resolved by ``load_data``.
        video: Video value supplied by the interface; it is returned unchanged
            so that it can be shown next to the text.

    Returns:
        A ``(predicted_text, real_text, video)`` tuple, where both texts are
        plain strings.
    """
    # Load the clip once; decoding the video a second time only repeats work.
    frames, alignments = load_data(tf.convert_to_tensor(video_path))

    # Make predictions on a batch containing just this clip.
    yhat = model.predict(tf.expand_dims(frames, axis=0))

    # The CTC sequence length is the time axis of the model output, which is
    # what the decoder actually consumes.
    decoded = tf.keras.backend.ctc_decode(yhat, input_length=[yhat.shape[1]], greedy=True)[0][0].numpy()

    # No quote stripping afterwards: the strings are already decoded, and the
    # apostrophe is a legitimate vocabulary character.
    predicted_text_display = _tokens_to_text(decoded[0])
    real_text_display = _tokens_to_text(alignments)

    return predicted_text_display,real_text_display,video


In [None]:
# Create Gradio interface
# Inputs, in the order predict_lip_reading expects them: the clip's path as
# text, then the video itself (which the function simply hands back).
video_input = [gr.Textbox(label="Video File Path"),gr.PlayableVideo(label="Video")]
# Outputs, in the order predict_lip_reading returns them.
outputs = [gr.Textbox(label="Predicted Text"),gr.Textbox(label="Real Text"), gr.Video(label="Video")]

# Build the interface and start serving it.
gr.Interface(fn=predict_lip_reading, inputs=video_input, outputs=outputs).launch()

Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://49f3ab7d444ef66d1f.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


