<a href="https://colab.research.google.com/github/g1anci/ecl2_ex04/blob/main/Lip_Reading_(Visual_Speech_Recognition).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

- **Author:** Zifan Jiang
- **Email:** zifan.jiang@uzh.ch
- **Affiliation:** University of Zurich  
- **Date:** 2025-03-27

# Lip Reading (Visual Speech Recognition, VSR)

In this notebook, we run a lip reading model in practice. Lip reading is also known as visual speech recognition, in contrast to classic audio-based speech recognition. It serves as an example of the topic *Automatic Recognition of Visual Cues in Spoken Languages*.

This notebook is adapted from the research paper [Auto-AVSR: Audio-Visual Speech Recognition with Automatic Labels](https://arxiv.org/abs/2303.14307) and its [codebase](https://github.com/mpc001/auto_avsr), cited as follows:

```
@inproceedings{ma2023auto,
  author={Ma, Pingchuan and Haliassos, Alexandros and Fernandez-Lopez, Adriana and Chen, Honglie and Petridis, Stavros and Pantic, Maja},
  booktitle={IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP)},
  title={Auto-AVSR: Audio-Visual Speech Recognition with Automatic Labels},
  year={2023},
  pages={1-5},
  doi={10.1109/ICASSP49357.2023.10096889}
}
```





## Environment Setup

As always, we start by setting up the environment.

In [None]:
! git clone https://github.com/mpc001/auto_avsr.git

%cd /content/auto_avsr/preparation/
! pip install -r requirements.txt

! pip install mediapipe
! pip install pytorch-lightning
! pip install av
! pip install numpy==2.0.0

Cloning into 'auto_avsr'...
remote: Enumerating objects: 349, done.
remote: Counting objects: 100% (177/177), done.
remote: Compressing objects: 100% (124/124), done.
remote: Total 349 (delta 97), reused 53 (delta 53), pack-reused 172 (from 1)
Receiving objects: 100% (349/349), 31.49 MiB | 11.85 MiB/s, done.
Resolving deltas: 100% (133/133), done.
Updating files: 100% (66/66), done.
/content/auto_avsr/preparation
Collecting ffmpeg-python (from -r requirements.txt (line 3))
  Downloading ffmpeg_python-0.2.0-py3-none-any.whl.metadata (1.7 kB)
Downloading ffmpeg_python-0.2.0-py3-none-any.whl (25 kB)
Installing collected packages: ffmpeg-python
Successfully installed ffmpeg-python-0.2.0
Collecting mediapipe
  Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting numpy<2 (from mediapipe)
  Downloading numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)

**Note:** To run this tutorial, make sure you are in the `tutorials` folder.

In [None]:
%cd /content/auto_avsr/tutorials/

/content/auto_avsr/tutorials
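
Before importing, it can help to confirm that the installation step succeeded. The following is a minimal sketch using only the standard library; the helper name `find_missing_modules` and the module list are illustrative assumptions, not part of the Auto-AVSR codebase.

```python
import importlib.util

# Top-level modules installed or imported in this notebook; adjust as needed.
REQUIRED_MODULES = ["torch", "torchaudio", "torchvision", "mediapipe", "av"]


def find_missing_modules(module_names):
    """Return the names that cannot be located on the current Python path."""
    missing = []
    for name in module_names:
        # find_spec returns None when a top-level module is not installed.
        if importlib.util.find_spec(name) is None:
            missing.append(name)
    return missing


missing = find_missing_modules(REQUIRED_MODULES)
print("Missing modules:", missing if missing else "none")
```

If the list is not empty, re-running the installation cell (and restarting the runtime if Colab asks for it) is a reasonable first step.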


Then import packages:

In [None]:
import sys
sys.path.insert(0, "../")

import os
import torch
import torchaudio
import torchvision

## 1. Build an inference pipeline

The `InferencePipeline` class carries out the following three steps:

1. Load audio or video data
2. Run pre-processing functions, e.g., recognise and locate the mouth
3. Run inference
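
The three steps above can be sketched as a simple chain of functions. This is only a schematic with toy stand-ins; `run_pipeline` and its parameters are hypothetical names, and the real class works on tensors and a trained model.

```python
from typing import Any, Callable


def run_pipeline(
    path: str,
    load: Callable[[str], Any],
    preprocess: Callable[[Any], Any],
    infer: Callable[[Any], str],
) -> str:
    """Chain the three steps: load -> preprocess -> infer."""
    raw = load(path)              # 1. load audio or video data
    features = preprocess(raw)    # 2. e.g., locate and crop the mouth region
    return infer(features)        # 3. run the model and return a transcript


# Toy stand-ins so the sketch runs without any model or media file.
transcript = run_pipeline(
    "demo.mp4",
    load=lambda p: f"frames from {p}",
    preprocess=lambda raw: raw.upper(),
    infer=lambda feats: f"transcript of [{feats}]",
)
print(transcript)
```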

The following code defines the pipeline:

In [None]:
import os
from lightning import ModelModule
from datamodule.transforms import AudioTransform, VideoTransform

import argparse
parser = argparse.ArgumentParser()
args, _ = parser.parse_known_args(args=[])

class InferencePipeline(torch.nn.Module):
    def __init__(self, args, ckpt_path, detector="retinaface"):
        super(InferencePipeline, self).__init__()
        self.modality = args.modality
        if self.modality == "audio":
            self.audio_transform = AudioTransform(subset="test")
        elif self.modality == "video":
            if detector == "mediapipe":
                from preparation.detectors.mediapipe.detector import LandmarksDetector
                from preparation.detectors.mediapipe.video_process import VideoProcess
                self.landmarks_detector = LandmarksDetector()
                self.video_process = VideoProcess(convert_gray=False)
            elif detector == "retinaface":
                from preparation.detectors.retinaface.detector import LandmarksDetector
                from preparation.detectors.retinaface.video_process import VideoProcess
                self.landmarks_detector = LandmarksDetector(device="cuda:0")
                self.video_process = VideoProcess(convert_gray=False)
            self.video_transform = VideoTransform(subset="test")

        ckpt = torch.load(ckpt_path, map_location=lambda storage, loc: storage)
        self.modelmodule = ModelModule(args)
        self.modelmodule.model.load_state_dict(ckpt)
        self.modelmodule.eval()

    def forward(self, data_filename):
        data_filename = os.path.abspath(data_filename)
        assert os.path.isfile(data_filename), f"data_filename: {data_filename} does not exist."

        if self.modality == "audio":
            audio, sample_rate = self.load_audio(data_filename)
            audio = self.audio_process(audio, sample_rate)
            audio = audio.transpose(1, 0)
            audio = self.audio_transform(audio)
            with torch.no_grad():
                transcript = self.modelmodule(audio)

        if self.modality == "video":
            video = self.load_video(data_filename)
            landmarks = self.landmarks_detector(video)
            video = self.video_process(video, landmarks)
            video = torch.tensor(video)
            video = video.permute((0, 3, 1, 2))
            video = self.video_transform(video)
            with torch.no_grad():
                transcript = self.modelmodule(video)

        return transcript

    def load_audio(self, data_filename):
        waveform, sample_rate = torchaudio.load(data_filename, normalize=True)
        return waveform, sample_rate

    def load_video(self, data_filename):
        return torchvision.io.read_video(data_filename, pts_unit="sec")[0].numpy()

    def audio_process(self, waveform, sample_rate, target_sample_rate=16000):
        if sample_rate != target_sample_rate:
            waveform = torchaudio.functional.resample(
                waveform, sample_rate, target_sample_rate
            )
        waveform = torch.mean(waveform, dim=0, keepdim=True)
        return waveform
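
To make the channel-averaging step in `audio_process` concrete, here is a dependency-free sketch of the same downmix on plain Python lists. The real method operates on torch tensors; the helper name `downmix_to_mono` is hypothetical.

```python
def downmix_to_mono(channels):
    """Average equally long channels sample by sample.

    `channels` is a list of channels, each a list of float samples.
    Returns a list holding a single channel, mirroring keepdim=True.
    """
    if not channels:
        raise ValueError("expected at least one channel")
    num_channels = len(channels)
    # zip(*channels) walks the channels in parallel, one time step at a time.
    mono = [sum(samples) / num_channels for samples in zip(*channels)]
    return [mono]


stereo = [[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]]
print(downmix_to_mono(stereo))
```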

### 1.1 Download a pre-trained model

We also need to download a pre-trained model checkpoint using the `gdown` command (which is a handy tool to download public files from Google Drive to Colab):

In [None]:
! gdown 12PNM5szUsk_CuaV1yB9dL_YWvSM1zvAd
model_path = "/content/auto_avsr/tutorials/vsr_trlrs3_base.pth"

Downloading...
From (original): https://drive.google.com/uc?id=12PNM5szUsk_CuaV1yB9dL_YWvSM1zvAd
From (redirected): https://drive.google.com/uc?id=12PNM5szUsk_CuaV1yB9dL_YWvSM1zvAd&confirm=t&uuid=2dec105c-e497-4218-84f5-d15212025f00
To: /content/auto_avsr/tutorials/vsr_trlrs3_base.pth
100% 1.00G/1.00G [00:10<00:00, 91.8MB/s]
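
Since the checkpoint is large, it may be worth confirming that the download completed before loading it. A minimal sketch, assuming a hypothetical helper `check_checkpoint` and a size threshold of your own choosing:

```python
import os


def check_checkpoint(path, min_bytes=1):
    """Return the file size in bytes, or raise if the file looks wrong."""
    if not os.path.isfile(path):
        raise FileNotFoundError(f"checkpoint not found: {path}")
    size = os.path.getsize(path)
    if size < min_bytes:
        raise ValueError(
            f"checkpoint is only {size} bytes; the download may be incomplete"
        )
    return size


# In the notebook, this could be called as:
# check_checkpoint(model_path, min_bytes=...)  # pick a threshold that suits you
```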


### 1.2 Initialize VSR pipeline

The final step is to initialize the pipeline with two settings:

- Set `modality` to `video` (the model also supports `audio`, but we skip that in this demo).
- Use `mediapipe` as the mouth detector (recall that we used MediaPipe for pose estimation; here it tells us the mouth's location).

In [None]:
setattr(args, 'modality', 'video')
pipeline = InferencePipeline(args, model_path, detector="mediapipe")
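
For readers unfamiliar with the `args` object: it is a plain `argparse.Namespace`, so attributes can be attached to it freely. The sketch below reproduces the pattern in isolation; the variable names `demo_args` and `args_direct` are illustrative only.

```python
import argparse

# Same pattern as in the pipeline cell: parse an empty argument list so that
# argparse ignores whatever command-line arguments the notebook kernel received.
parser = argparse.ArgumentParser()
demo_args, _ = parser.parse_known_args(args=[])
setattr(demo_args, "modality", "video")

# An equivalent, more direct construction.
args_direct = argparse.Namespace(modality="video")

print(demo_args.modality)
print(demo_args == args_direct)
```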

## 2. VSR inference

### 2.1 Download a video from the web

In this example, we have prepared a video recorded in a noisy environment, which makes it challenging to recognise the words from the audio signal, whether by human ears or by a model.

In [None]:
!gdown 1KbY37seUm7LVNXNDdmPwqxy2ZlS3SbYe --output /content/
data_filename = "/content/vsr_demo.mp4"

Downloading...
From: https://drive.google.com/uc?id=1KbY37seUm7LVNXNDdmPwqxy2ZlS3SbYe
To: /content/vsr_demo.mp4
100% 2.93M/2.93M [00:00<00:00, 187MB/s]


Here is the video. Can you understand what the speaker is saying?

In [None]:
from IPython.display import HTML
from base64 import b64encode

mp4 = open(data_filename, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=480 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)

### 2.2 Run inference

Let's try inference with the lip reading model (may take a while)!

In [None]:
transcript = pipeline(data_filename)
print(transcript)

PLANT OUT STRAIGHT ENVIRONMENTS WHERE WE HAVE LARGE CHANGES IN INTERNET POLLS AND


### 2.3 Mouthing of sign language

We can also apply lip reading to a silent sign language video, since signers sometimes mouth the words they sign.

Let's try our familiar ASL "thank you" from [Spreadthesign](https://www.spreadthesign.com/).

In [None]:
! curl https://media.spreadthesign.com/video/mp4/13/153748.mp4 --output /content/sign_language.mp4

data_filename_sign = "/content/sign_language.mp4"

mp4 = open(data_filename_sign, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=480 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 90558  100 90558    0     0   545k      0 --:--:-- --:--:-- --:--:--  549k
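
The video-embedding code appears twice in this notebook, so it could be wrapped in a small helper. This is a sketch under the assumption that the file is an MP4; `video_html` is a hypothetical name, and it also closes the file after reading.

```python
from base64 import b64encode


def video_html(path, width=480):
    """Return an HTML <video> snippet with the file embedded as a data URL."""
    with open(path, "rb") as f:
        data = f.read()
    data_url = "data:video/mp4;base64," + b64encode(data).decode()
    return (
        f'<video width="{width}" controls>'
        f'<source src="{data_url}" type="video/mp4">'
        "</video>"
    )


# In the notebook, display it with:
# from IPython.display import HTML
# HTML(video_html(data_filename_sign))
```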


In [None]:
transcript = pipeline(data_filename_sign)
print(transcript)

THANK YOU THANK YOU
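
To check such a result programmatically rather than by eye, one option is a case-insensitive phrase match on the transcript. A minimal sketch; `contains_phrase` is a hypothetical helper, not part of the pipeline.

```python
def contains_phrase(transcript, phrase):
    """Case-insensitive check that `phrase` occurs as consecutive words."""
    words = transcript.lower().split()
    target = phrase.lower().split()
    if not target:
        return False
    n = len(target)
    # Slide a window of n words over the transcript and compare.
    return any(words[i:i + n] == target for i in range(len(words) - n + 1))


print(contains_phrase("THANK YOU THANK YOU", "thank you"))
```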


It works! Feel free to try out anything else.