In [None]:
!pip install pytorch_lightning sentencepiece av streamlit pyngrok

Collecting pytorch_lightning
  Downloading pytorch_lightning-2.5.0.post0-py3-none-any.whl.metadata (21 kB)
Collecting av
  Downloading av-14.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.7 kB)
Collecting streamlit
  Downloading streamlit-1.42.1-py2.py3-none-any.whl.metadata (8.9 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.3-py3-none-any.whl.metadata (8.7 kB)
Collecting torchmetrics>=0.7.0 (from pytorch_lightning)
  Downloading torchmetrics-1.6.1-py3-none-any.whl.metadata (21 kB)
Collecting lightning-utilities>=0.10.0 (from pytorch_lightning)
  Downloading lightning_utilities-0.12.0-py3-none-any.whl.metadata (5.6 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.

In [None]:
import os
import torch
import torchaudio
import torchvision

In [None]:
!git clone https://github.com/mpc001/auto_avsr.git

Cloning into 'auto_avsr'...
remote: Enumerating objects: 349, done.[K
remote: Counting objects: 100% (192/192), done.[K
remote: Compressing objects: 100% (127/127), done.[K
remote: Total 349 (delta 95), reused 94 (delta 62), pack-reused 157 (from 1)[K
Receiving objects: 100% (349/349), 31.49 MiB | 18.56 MiB/s, done.
Resolving deltas: 100% (130/130), done.


In [None]:
!git clone https://github.com/hhj1897/face_alignment.git

Cloning into 'face_alignment'...
remote: Enumerating objects: 190, done.[K
remote: Counting objects: 100% (32/32), done.[K
remote: Compressing objects: 100% (6/6), done.[K
remote: Total 190 (delta 27), reused 27 (delta 26), pack-reused 158 (from 1)[K
Receiving objects: 100% (190/190), 213.82 MiB | 30.47 MiB/s, done.
Resolving deltas: 100% (84/84), done.
Updating files: 100% (14/14), done.


In [None]:
!git clone https://github.com/hhj1897/face_detection.git

Cloning into 'face_detection'...
remote: Enumerating objects: 300, done.[K
remote: Counting objects: 100% (50/50), done.[K
remote: Compressing objects: 100% (11/11), done.[K
remote: Total 300 (delta 41), reused 39 (delta 39), pack-reused 250 (from 1)[K
Receiving objects: 100% (300/300), 81.19 MiB | 23.75 MiB/s, done.
Resolving deltas: 100% (141/141), done.


In [None]:
!mv /content/face_alignment/ibug /content/auto_avsr
!mv /content/face_detection/ibug/face_detection /content/auto_avsr/ibug

mv: cannot stat '/content/face_alignment/ibug': No such file or directory
mv: cannot stat '/content/face_detection/ibug/face_detection': No such file or directory


In [None]:
!rm -rf /content/face_alignment /content/face_detection /content/sample_data

In [None]:
%cd /content/auto_avsr
%ls

/content/auto_avsr
average_checkpoints.py  [0m[01;34mdoc[0m/      [01;34mibug[0m/           [01;32mlightning.py[0m*  [01;34mspm[0m/
cosine.py               [01;34mespnet[0m/   INSTRUCTION.md  [01;34mpreparation[0m/   [01;32mtrain.py[0m*
[01;34mdatamodule[0m/             [01;32meval.py[0m*  LICENSE         [01;32mREADME.md[0m*     [01;34mtutorials[0m/


In [None]:
import os
from lightning import ModelModule
from datamodule.transforms import AudioTransform, VideoTransform

In [None]:
import argparse

# Parse an explicitly empty argument list so the notebook kernel's own argv is
# ignored; the result is an empty Namespace that later cells fill in.
parser = argparse.ArgumentParser()
args = parser.parse_known_args(args=[])[0]

In [None]:
class InferencePipeline(torch.nn.Module):
    """Single-file inference wrapper around ``ModelModule``.

    ``args.modality`` selects the input stream: ``"audio"`` decodes the
    waveform stored in the file, while ``"video"`` detects face landmarks,
    runs the ``VideoProcess`` step on them and feeds the frames to the model.

    Args:
        args: namespace-like object; ``modality`` is read here and the whole
            object is forwarded to ``ModelModule``.
        ckpt_path: path to a checkpoint holding the state dict that is loaded
            into ``ModelModule.model``.
        detector: landmark detector for video input, ``"retinaface"``
            (default) or ``"mediapipe"``. Not used for audio.

    Raises:
        ValueError: if ``args.modality`` or ``detector`` is not supported.
    """

    def __init__(self, args, ckpt_path, detector="retinaface"):
        super().__init__()
        self.modality = args.modality
        if self.modality == "audio":
            self.audio_transform = AudioTransform(subset="test")
        elif self.modality == "video":
            # Detector back-ends are imported lazily so only the selected one
            # needs its dependencies installed.
            if detector == "mediapipe":
                from preparation.detectors.mediapipe.detector import LandmarksDetector
                from preparation.detectors.mediapipe.video_process import VideoProcess
                self.landmarks_detector = LandmarksDetector()
            elif detector == "retinaface":
                from preparation.detectors.retinaface.detector import LandmarksDetector
                from preparation.detectors.retinaface.video_process import VideoProcess
                # Use the first GPU when present; otherwise fall back to CPU
                # instead of crashing on a CUDA-less runtime.
                # NOTE(review): assumes the detector accepts a "cpu" device string - confirm.
                device = "cuda:0" if torch.cuda.is_available() else "cpu"
                self.landmarks_detector = LandmarksDetector(device=device)
            else:
                # Fail fast: previously an unknown detector only surfaced later
                # as a missing attribute inside forward().
                raise ValueError(
                    f"Unsupported detector: {detector!r} (expected 'mediapipe' or 'retinaface')."
                )
            self.video_process = VideoProcess(convert_gray=False)
            self.video_transform = VideoTransform(subset="test")
        else:
            # Fail fast: previously an unknown modality only surfaced later as
            # an unbound ``transcript`` inside forward().
            raise ValueError(
                f"Unsupported modality: {self.modality!r} (expected 'audio' or 'video')."
            )

        # Deserialize onto CPU storage regardless of where the checkpoint was saved.
        ckpt = torch.load(ckpt_path, map_location=lambda storage, loc: storage)
        self.modelmodule = ModelModule(args)
        self.modelmodule.model.load_state_dict(ckpt)
        self.modelmodule.eval()

    def forward(self, data_filename):
        """Transcribe one media file.

        Args:
            data_filename: path to the audio/video file to transcribe.

        Returns:
            Whatever ``self.modelmodule`` returns for the preprocessed input
            (the transcript).

        Raises:
            AssertionError: if ``data_filename`` is not an existing file.
            RuntimeError: if video preprocessing yields no frames.
            ValueError: if ``self.modality`` is not ``"audio"`` or ``"video"``.
        """
        data_filename = os.path.abspath(data_filename)
        assert os.path.isfile(data_filename), f"data_filename: {data_filename} does not exist."

        if self.modality == "audio":
            audio, sample_rate = self.load_audio(data_filename)
            audio = self.audio_process(audio, sample_rate)
            # audio_process keeps a leading size-1 dim; swap it to the last axis.
            audio = audio.transpose(1, 0)
            audio = self.audio_transform(audio)
            with torch.no_grad():
                transcript = self.modelmodule(audio)
        elif self.modality == "video":
            video = self.load_video(data_filename)
            landmarks = self.landmarks_detector(video)
            video = self.video_process(video, landmarks)
            if video is None:
                # NOTE(review): presumably happens when no usable face landmarks
                # were found - confirm against the VideoProcess implementation.
                raise RuntimeError(
                    f"Video preprocessing produced no frames for {data_filename}."
                )
            video = torch.tensor(video)
            # Move the last axis to position 1 (channels-last -> channels-first).
            video = video.permute((0, 3, 1, 2))
            video = self.video_transform(video)
            with torch.no_grad():
                transcript = self.modelmodule(video)
        else:
            raise ValueError(f"Unsupported modality: {self.modality!r}.")

        return transcript

    def load_audio(self, data_filename):
        """Load a normalized waveform and its sample rate via ``torchaudio.load``."""
        waveform, sample_rate = torchaudio.load(data_filename, normalize=True)
        return waveform, sample_rate

    def load_video(self, data_filename):
        """Read the video frames of ``data_filename`` and return them as a numpy array."""
        return torchvision.io.read_video(data_filename, pts_unit="sec")[0].numpy()

    def audio_process(self, waveform, sample_rate, target_sample_rate=16000):
        """Resample to ``target_sample_rate`` if needed and average over dim 0.

        Args:
            waveform: tensor whose first dimension is averaged away (kept as size 1).
            sample_rate: sample rate of ``waveform``.
            target_sample_rate: rate to resample to; defaults to 16000.

        Returns:
            The processed waveform tensor with a leading dimension of size 1.
        """
        if sample_rate != target_sample_rate:
            waveform = torchaudio.functional.resample(
                waveform, sample_rate, target_sample_rate
            )
        waveform = torch.mean(waveform, dim=0, keepdim=True)
        return waveform

In [None]:
#https://drive.google.com/file/d/1r1kx7l9sWnDOCnaFHIGvOtzuhFyFA88_/view?usp=sharing
!gdown 1r1kx7l9sWnDOCnaFHIGvOtzuhFyFA88_

Downloading...
From (original): https://drive.google.com/uc?id=1r1kx7l9sWnDOCnaFHIGvOtzuhFyFA88_
From (redirected): https://drive.google.com/uc?id=1r1kx7l9sWnDOCnaFHIGvOtzuhFyFA88_&confirm=t&uuid=600e4400-f560-413d-9c67-b56f2fb3d12a
To: /content/auto_avsr/vsr_trlrs2lrs3vox2avsp_base.pth
100% 1.00G/1.00G [00:11<00:00, 85.0MB/s]


In [None]:
model_path = '/content/auto_avsr/vsr_trlrs2lrs3vox2avsp_base.pth'

In [None]:
# Select the video modality, then build the pipeline from the downloaded
# checkpoint using the retinaface landmark detector.
args.modality = 'video'
pipeline = InferencePipeline(args, model_path, detector="retinaface")

In [None]:
transcript = pipeline("/content/WhatsApp Video 2025-02-18 at 7.50.36 PM.mp4")
print(transcript)

KeyboardInterrupt: 

In [None]:
# Instructions for the LLM that post-corrects the raw lip-reading transcript.
# Split across lines for readability; fixes the "respose" typo and the missing
# verb in "The input text is in all-caps".
system_prompt = (
    "You are an assistant that helps make corrections to the output of a lipreading model. "
    "The text you will receive was transcribed using a video-to-text system that attempts to lipread the subject speaking in the video, so the text will likely be imperfect.\n\n"
    "If something seems unusual, assume it was mistranscribed. Do your best to infer the words actually spoken, and make changes to the mistranscriptions in your response. Do not add more words or content, just change the ones that seem to be out of place (and, therefore, mistranscribed). Do not change even the wording of sentences, just individual words that look nonsensical in the context of all of the other words in the sentence.\n\n"
    "Also, add correct punctuation to the entire text. ALWAYS end each sentence with the appropriate sentence ending: '.', '?', or '!'. The input text is in all-caps, although your response should be capitalized correctly and should NOT be in all-caps.\n\n"
    "Return the corrected text."
)
# The raw transcript produced by the pipeline cell is passed as the user turn.
user_prompt = f"Transcription:\n\n{transcript}"

NameError: name 'transcript' is not defined

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

# Hugging Face checkpoint used to post-correct the transcript.
model_name = "Qwen/Qwen2.5-1.5B-Instruct"

# Tokenizer and model are loaded from the same checkpoint; dtype and device
# placement are left to the library ("auto").
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto", device_map="auto")

config.json:   0%|          | 0.00/660 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/242 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/7.30k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

In [None]:
# Build the two-turn chat (system instructions + transcript) and render it
# with the tokenizer's chat template, leaving the assistant turn open.
messages = [
    dict(role="system", content=system_prompt),
    dict(role="user", content=user_prompt),
]
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

generated_ids = model.generate(**model_inputs, max_new_tokens=512)

# generate() returns prompt + continuation; keep only the continuation.
generated_ids = [
    full_ids[len(prompt_ids):]
    for prompt_ids, full_ids in zip(model_inputs.input_ids, generated_ids)
]

decoded = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)
response = decoded[0]

NameError: name 'user_prompt' is not defined

In [None]:
response

NameError: name 'response' is not defined

In [None]:
%%writefile app.py
import os
import argparse
import torch
import torchaudio
import torchvision
import streamlit as st

# Import your custom modules
from lightning import ModelModule
from datamodule.transforms import AudioTransform, VideoTransform

# -----------------------------
# Inference Pipeline Definition
# -----------------------------
class InferencePipeline(torch.nn.Module):
    """Single-file inference wrapper around ``ModelModule``.

    ``args.modality`` selects the input stream: ``"audio"`` decodes the
    waveform stored in the file, while ``"video"`` detects face landmarks,
    runs the ``VideoProcess`` step on them and feeds the frames to the model.

    Args:
        args: namespace-like object; ``modality`` is read here and the whole
            object is forwarded to ``ModelModule``.
        ckpt_path: path to a checkpoint holding the state dict that is loaded
            into ``ModelModule.model``.
        detector: landmark detector for video input, ``"retinaface"``
            (default) or ``"mediapipe"``. Not used for audio.

    Raises:
        ValueError: if ``args.modality`` or ``detector`` is not supported.
    """

    def __init__(self, args, ckpt_path, detector="retinaface"):
        super().__init__()
        self.modality = args.modality
        if self.modality == "audio":
            self.audio_transform = AudioTransform(subset="test")
        elif self.modality == "video":
            # Detector back-ends are imported lazily so only the selected one
            # needs its dependencies installed.
            if detector == "mediapipe":
                from preparation.detectors.mediapipe.detector import LandmarksDetector
                from preparation.detectors.mediapipe.video_process import VideoProcess
                self.landmarks_detector = LandmarksDetector()
            elif detector == "retinaface":
                from preparation.detectors.retinaface.detector import LandmarksDetector
                from preparation.detectors.retinaface.video_process import VideoProcess
                # Use the first GPU when present; otherwise fall back to CPU
                # instead of crashing on a CUDA-less runtime.
                # NOTE(review): assumes the detector accepts a "cpu" device string - confirm.
                device = "cuda:0" if torch.cuda.is_available() else "cpu"
                self.landmarks_detector = LandmarksDetector(device=device)
            else:
                # Fail fast: previously an unknown detector only surfaced later
                # as a missing attribute inside forward().
                raise ValueError(
                    f"Unsupported detector: {detector!r} (expected 'mediapipe' or 'retinaface')."
                )
            self.video_process = VideoProcess(convert_gray=False)
            self.video_transform = VideoTransform(subset="test")
        else:
            # Fail fast: previously an unknown modality only surfaced later as
            # an unbound ``transcript`` inside forward().
            raise ValueError(
                f"Unsupported modality: {self.modality!r} (expected 'audio' or 'video')."
            )

        # Load the model checkpoint (adjust the path if needed); tensors are
        # deserialized onto CPU storage regardless of where they were saved.
        ckpt = torch.load(ckpt_path, map_location=lambda storage, loc: storage)
        self.modelmodule = ModelModule(args)
        self.modelmodule.model.load_state_dict(ckpt)
        self.modelmodule.eval()

    def forward(self, data_filename):
        """Transcribe one media file.

        Args:
            data_filename: path to the audio/video file to transcribe.

        Returns:
            Whatever ``self.modelmodule`` returns for the preprocessed input
            (the transcript).

        Raises:
            AssertionError: if ``data_filename`` is not an existing file.
            RuntimeError: if video preprocessing yields no frames.
            ValueError: if ``self.modality`` is not ``"audio"`` or ``"video"``.
        """
        data_filename = os.path.abspath(data_filename)
        assert os.path.isfile(data_filename), f"data_filename: {data_filename} does not exist."

        if self.modality == "audio":
            audio, sample_rate = self.load_audio(data_filename)
            audio = self.audio_process(audio, sample_rate)
            # audio_process keeps a leading size-1 dim; swap it to the last axis.
            audio = audio.transpose(1, 0)
            audio = self.audio_transform(audio)
            with torch.no_grad():
                transcript = self.modelmodule(audio)
        elif self.modality == "video":
            video = self.load_video(data_filename)
            landmarks = self.landmarks_detector(video)
            video = self.video_process(video, landmarks)
            if video is None:
                # NOTE(review): presumably happens when no usable face landmarks
                # were found - confirm against the VideoProcess implementation.
                raise RuntimeError(
                    f"Video preprocessing produced no frames for {data_filename}."
                )
            video = torch.tensor(video)
            # Move the last axis to position 1 (channels-last -> channels-first).
            video = video.permute((0, 3, 1, 2))
            video = self.video_transform(video)
            with torch.no_grad():
                transcript = self.modelmodule(video)
        else:
            raise ValueError(f"Unsupported modality: {self.modality!r}.")

        return transcript

    def load_audio(self, data_filename):
        """Load a normalized waveform and its sample rate via ``torchaudio.load``."""
        waveform, sample_rate = torchaudio.load(data_filename, normalize=True)
        return waveform, sample_rate

    def load_video(self, data_filename):
        """Read the video frames of ``data_filename`` and return them as a numpy array."""
        return torchvision.io.read_video(data_filename, pts_unit="sec")[0].numpy()

    def audio_process(self, waveform, sample_rate, target_sample_rate=16000):
        """Resample to ``target_sample_rate`` if needed and average over dim 0.

        Args:
            waveform: tensor whose first dimension is averaged away (kept as size 1).
            sample_rate: sample rate of ``waveform``.
            target_sample_rate: rate to resample to; defaults to 16000.

        Returns:
            The processed waveform tensor with a leading dimension of size 1.
        """
        if sample_rate != target_sample_rate:
            waveform = torchaudio.functional.resample(waveform, sample_rate, target_sample_rate)
        waveform = torch.mean(waveform, dim=0, keepdim=True)
        return waveform


# -----------------------------------------------------
# Cache the Hugging Face model and tokenizer (for LLM)
# -----------------------------------------------------
@st.cache_resource
def load_hf_model():
    """Load the Qwen instruct model and its tokenizer.

    Wrapped in ``st.cache_resource`` so Streamlit can reuse the loaded objects
    instead of reloading them on every script rerun.

    Returns:
        tuple: ``(model, tokenizer)`` loaded from the same checkpoint.
    """
    # Imported lazily so transformers is only loaded when this is first called.
    from transformers import AutoModelForCausalLM, AutoTokenizer

    checkpoint = "Qwen/Qwen2.5-1.5B-Instruct"
    hf_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    # dtype and device placement are left to the library ("auto").
    llm = AutoModelForCausalLM.from_pretrained(checkpoint, torch_dtype="auto", device_map="auto")
    return llm, hf_tokenizer


# -------------------------------------------------------
# Cache the lip reading inference pipeline (optional)
# -------------------------------------------------------
@st.cache_resource
def load_pipeline():
    """Build the video ``InferencePipeline`` used by the app.

    Wrapped in ``st.cache_resource`` so Streamlit can reuse the constructed
    pipeline instead of rebuilding it on every script rerun.

    Returns:
        InferencePipeline: pipeline configured for the ``'video'`` modality
        with the retinaface detector.
    """
    # Path to the lip reading model checkpoint (update this path if needed).
    checkpoint_path = '/content/auto_avsr/vsr_trlrs2lrs3vox2avsp_base.pth'
    # Minimal stand-in for parsed CLI arguments: only ``modality`` is set here.
    video_args = argparse.Namespace(modality='video')
    return InferencePipeline(video_args, checkpoint_path, detector="retinaface")


# ------------------------------------------------------------
# Function to generate corrected transcript using Hugging Face LLM
# ------------------------------------------------------------
def generate_corrected_text(transcript, hf_model, tokenizer, max_new_tokens=512):
    """Ask the LLM to repair and punctuate a raw lip-reading transcript.

    Args:
        transcript: raw transcript text produced by the lip-reading pipeline.
        hf_model: causal LM exposing ``device`` and ``generate``.
        tokenizer: matching tokenizer exposing ``apply_chat_template``,
            ``__call__`` and ``batch_decode``.
        max_new_tokens: upper bound on generated tokens (default 512, the
            value that was previously hard-coded).

    Returns:
        str: the decoded model response, or ``""`` when ``transcript`` is
        ``None`` or blank (the LLM is not called in that case, since it would
        have nothing to correct and could only invent text).
    """
    if transcript is None or not str(transcript).strip():
        return ""

    system_prompt = (
        "You are an assistant that helps make corrections to the output of a lipreading model. "
        "The text you will receive was transcribed using a video-to-text system that attempts to lipread the subject speaking in the video, so the text will likely be imperfect.\n\n"
        "If something seems unusual, assume it was mistranscribed. Do your best to infer the words actually spoken, and make changes to the mistranscriptions in your response. Do not add more words or content, just change the ones that seem to be out of place (and, therefore, mistranscribed). Do not change even the wording of sentences, just individual words that look nonsensical in the context of all of the other words in the sentence.\n\n"
        "Also, add correct punctuation to the entire text. ALWAYS end each sentence with the appropriate sentence ending: '.', '?', or '!'. The input text is in all-caps, although your response should be capitalized correctly and should NOT be in all-caps.\n\n"
        "Return the corrected text."
    )
    user_prompt = f"Transcription:\n\n{transcript}"

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]

    # This example uses a chat template method; adjust if your tokenizer does not have this method.
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(hf_model.device)
    generated_ids = hf_model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens
    )
    # Remove the prompt tokens from the generated output
    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return response


# -------------------
# Streamlit App Logic
# -------------------
def main():
    """Streamlit entry point: upload a video, lip-read it, then LLM-correct it.

    The uploaded bytes are written to a uniquely named temporary file that
    keeps the upload's extension, passed to the pipeline, and removed again
    once inference finishes (or fails).
    """
    # Function-scope import: only this function needs tempfile.
    import tempfile

    # Load (or retrieve from cache) the lip reading pipeline
    pipeline = load_pipeline()

    # Load (or retrieve from cache) the Hugging Face LLM and tokenizer
    hf_model, tokenizer = load_hf_model()

    print("Model loaded successfully")

    st.title("Lip Reading App")
    st.write("Upload a muted video file and click **Predict** to perform lip reading.")

    uploaded_file = st.file_uploader("Upload Video", type=["mp4", "avi", "mov", "mkv"])
    if uploaded_file is not None:
        # Show the uploaded video in the UI
        st.video(uploaded_file)

        if st.button("Predict"):
            with st.spinner("Processing video..."):
                # Save the upload under a unique name with its real extension:
                # a fixed "input_video.mp4" collides between sessions and
                # mislabels avi/mov/mkv uploads.
                suffix = os.path.splitext(uploaded_file.name)[1] or ".mp4"
                with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                    # getvalue() returns the whole buffer regardless of the
                    # current read position of the upload stream.
                    tmp.write(uploaded_file.getvalue())
                    video_path = tmp.name

                try:
                    # Run the pipeline on the saved video
                    transcript = pipeline(video_path)
                finally:
                    # Do not leave uploaded videos behind on disk.
                    os.remove(video_path)

                st.write("### Lip Reading Transcript:")
                st.write(transcript)

                # Generate the corrected transcript
                corrected_text = generate_corrected_text(transcript, hf_model, tokenizer)

            st.success("Prediction completed!")
            st.write("### Corrected Transcript:")
            st.write(corrected_text)


if __name__ == "__main__":
    main()

Writing app.py


In [None]:
import locale

# Force UTF-8 as the reported preferred encoding.
# The replacement keeps the stdlib signature (optional ``do_setlocale``), so
# callers that use ``locale.getpreferredencoding(False)`` keep working instead
# of raising TypeError against a zero-argument lambda.
locale.getpreferredencoding = lambda do_setlocale=True: "UTF-8"

In [None]:
import os
from getpass import getpass

from pyngrok import ngrok

# The ngrok auth token is a credential and must not live in the notebook:
# read it from the NGROK_AUTHTOKEN environment variable, or prompt for it
# without echoing. Any token that was previously committed here should be
# revoked in the ngrok dashboard.
ngrok_key = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok auth token: ")
port = 8501  # Streamlit's default port; the app is started without overriding it

ngrok.set_auth_token(ngrok_key)
# Open the tunnel and show its public URL as the cell output.
ngrok.connect(port).public_url

'https://aa27-34-125-153-5.ngrok-free.app'

In [None]:
!rm -rf logs.txt && streamlit run app.py &>/content/logs.txt