# **Set Up Environment**

In [None]:
!pip install -U openai-whisper
!pip install torch torchvision torchaudio pytorch-lightning sentencepiece av

In [None]:
!pip install setuptools-rust

In [None]:
%cd auto_avsr/preparation
!pip install -r requirements.txt

In [None]:
# Go back up two directory levels before fetching the face detector.
%cd ../..
# Clone the face_detection repository (imported below as `ibug.face_detection`).
!git clone https://github.com/hhj1897/face_detection.git
%cd face_detection
# Download the files tracked with Git LFS (presumably the pretrained weights - TODO confirm).
!git lfs pull
# Editable install of the checkout we are currently in.
%pip install -e .
# Return to the parent directory so later relative paths resolve from there.
%cd ..

In [None]:
import sys
import os

# Point these at the directories that contain the two ibug packages.
face_alignment_path = "face_alignment"
face_detection_path = "face_detection"

# Put both checkouts at the front of the module search path.
# Inserting in this order leaves face_detection first, face_alignment second.
for package_dir in (face_alignment_path, face_detection_path):
    sys.path.insert(0, package_dir)

# Probe both imports; on failure show the search path to help diagnose it.
try:
    from ibug.face_alignment import FANPredictor
    print("Successfully imported FANPredictor")
    from ibug.face_detection import RetinaFacePredictor
    print("Successfully imported RetinaFacePredictor")
except ImportError as import_error:
    print(f"Import Error: {import_error}")
    print("\nCurrent sys.path:")
    print(*sys.path, sep="\n")

Successfully imported FANPredictor
Successfully imported RetinaFacePredictor


In [None]:
# Work from inside auto_avsr: later cells use paths relative to it
# (e.g. "../inference/...") and import modules that live in this directory.
# NOTE: a relative %cd is not idempotent - run this cell only once per kernel.
%cd auto_avsr


In [None]:
import sys
sys.path.insert(0, "../")
import os
import torch
import torchaudio
import torchvision
import os
from lightning import ModelModule
from datamodule.transforms import AudioTransform, VideoTransform

In [None]:
import os
import whisper

# Load the Whisper "turbo" checkpoint used for the audio-only transcription.
audio_model = whisper.load_model("turbo")

100%|██████████████████████████████████████| 1.51G/1.51G [00:10<00:00, 150MiB/s]


# **Visual ASR**

In [None]:
import argparse

# Build an empty argument namespace (nothing is read from the command line);
# the settings the pipeline needs are attached to `args` as attributes afterwards.
parser = argparse.ArgumentParser()
args, _ = parser.parse_known_args([])

In [None]:
class InferencePipeline(torch.nn.Module):
    """Single-file inference wrapper around ``ModelModule``.

    Depending on ``args.modality`` the pipeline either loads and resamples the
    audio track ("audio") or detects landmarks and crops the video ("video")
    before handing the transformed tensor to the model.

    Args:
        args: Namespace passed to ``ModelModule``; ``args.modality`` must be
            ``"audio"`` or ``"video"``.
        ckpt_path: Path to a checkpoint that is loaded with ``torch.load`` and
            passed to ``modelmodule.model.load_state_dict``.
        detector: Landmark detector backend for the video modality, either
            ``"retinaface"`` (default) or ``"mediapipe"``.
        device: Device string for the retinaface detector. ``None`` (default)
            selects ``"cuda:0"`` when CUDA is available and ``"cpu"`` otherwise.

    Raises:
        ValueError: If the modality or the detector name is not supported.
    """

    def __init__(self, args, ckpt_path, detector="retinaface", device=None):
        super().__init__()
        self.modality = args.modality
        if self.modality == "audio":
            self.audio_transform = AudioTransform(subset="test")
        elif self.modality == "video":
            # The detector modules are imported lazily so that only the
            # selected backend has to be installed.
            if detector == "mediapipe":
                from preparation.detectors.mediapipe.detector import LandmarksDetector
                from preparation.detectors.mediapipe.video_process import VideoProcess
                self.landmarks_detector = LandmarksDetector()
            elif detector == "retinaface":
                from preparation.detectors.retinaface.detector import LandmarksDetector
                from preparation.detectors.retinaface.video_process import VideoProcess
                if device is None:
                    device = "cuda:0" if torch.cuda.is_available() else "cpu"
                self.landmarks_detector = LandmarksDetector(device=device)
            else:
                raise ValueError(
                    f"Unsupported detector: {detector!r} (expected 'mediapipe' or 'retinaface')."
                )
            self.video_process = VideoProcess(convert_gray=False)
            self.video_transform = VideoTransform(subset="test")
        else:
            raise ValueError(
                f"Unsupported modality: {self.modality!r} (expected 'audio' or 'video')."
            )

        # Always deserialize onto the CPU, independent of where it was saved.
        ckpt = torch.load(ckpt_path, map_location="cpu")
        self.modelmodule = ModelModule(args)
        self.modelmodule.model.load_state_dict(ckpt)
        self.modelmodule.eval()

    def forward(self, data_filename):
        """Run inference on one media file.

        Args:
            data_filename: Path to the audio/video file.

        Returns:
            For the video modality, the tuple
            ``(predicted, confidence_score, nbest_hyps, best_hype)`` produced by
            the model. For the audio modality, whatever the model returns for
            the transformed waveform.

        Raises:
            FileNotFoundError: If ``data_filename`` is not an existing file.
            ValueError: If ``self.modality`` is neither "audio" nor "video".
        """
        data_filename = os.path.abspath(data_filename)
        if not os.path.isfile(data_filename):
            raise FileNotFoundError(f"data_filename: {data_filename} does not exist.")

        if self.modality == "audio":
            audio, sample_rate = self.load_audio(data_filename)
            audio = self.audio_process(audio, sample_rate)
            # audio_process yields (1, time); swap the two axes for the transform.
            audio = audio.transpose(1, 0)
            audio = self.audio_transform(audio)
            with torch.no_grad():
                return self.modelmodule(audio)

        if self.modality == "video":
            video = self.load_video(data_filename)
            landmarks = self.landmarks_detector(video)
            video = self.video_process(video, landmarks)
            video = torch.tensor(video)
            # Move the last axis to position 1 (presumably channels, giving
            # frames x channels x height x width - TODO confirm).
            video = video.permute((0, 3, 1, 2))
            video = self.video_transform(video)
            with torch.no_grad():
                predicted, confidence_score, nbest_hyps, best_hype = self.modelmodule(video)
            return predicted, confidence_score, nbest_hyps, best_hype

        raise ValueError(f"Unsupported modality: {self.modality!r}")

    def load_audio(self, data_filename):
        """Load a waveform and its sample rate with ``torchaudio.load``."""
        waveform, sample_rate = torchaudio.load(data_filename, normalize=True)
        return waveform, sample_rate

    def load_video(self, data_filename):
        """Read the video frames of ``data_filename`` as a NumPy array."""
        return torchvision.io.read_video(data_filename, pts_unit="sec")[0].numpy()

    def audio_process(self, waveform, sample_rate, target_sample_rate=16000):
        """Resample to ``target_sample_rate`` if needed and average over dim 0.

        Returns a tensor whose first dimension has size 1 (``keepdim=True``).
        """
        if sample_rate != target_sample_rate:
            waveform = torchaudio.functional.resample(
                waveform, sample_rate, target_sample_rate
            )
        waveform = torch.mean(waveform, dim=0, keepdim=True)
        return waveform

In [None]:
import os

# Checkpoint for the visual ASR model, relative to the current working directory.
model_path = "../inference/visual_asr.pth"

# Quick sanity check that the checkpoint is where we expect it.
print("model_path:", model_path)
print("model file exists:", os.path.exists(model_path))

model_path: ../inference/visual_asr.pth
model file exists: True


In [None]:
# Select the video branch of the pipeline and build it with the retinaface
# landmark detector.
args.modality = "video"
pipeline = InferencePipeline(args, model_path, detector="retinaface")

In [None]:
def generate_visualASRTxt(
    file_name,
    model,
    input_dir="../inference/data/video/",
    output_dir="../inference/data/VSR_result/",
):
    """Transcribe one video with the visual model and save the text.

    Args:
        file_name: Name of the video file inside ``input_dir``.
        model: Callable that takes the video path and returns
            ``(predicted, confidence_score, nbest_hyps, best_hype)``.
        input_dir: Directory that contains the video.
        output_dir: Directory for the ``<stem>.txt`` transcript; created if
            it does not exist.

    Returns:
        Tuple ``(predicted, confidence_score)``.
    """
    input_path = os.path.join(input_dir, file_name)
    # splitext keeps dots inside the stem ("clip.v2.mp4" -> "clip.v2"), so two
    # differently named videos cannot end up writing the same transcript file.
    stem = os.path.splitext(file_name)[0]
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, stem + ".txt")

    # Use the model that was passed in rather than a notebook-level global.
    predicted, confidence_score, _nbest_hyps, _best_hype = model(input_path)
    with open(output_path, "w", encoding="utf-8") as file:
        file.write(predicted)
    return predicted, confidence_score

# **Audio ASR**

In [None]:
file_name = "noisy_1.mp4"


def generate_audioASRTxt(
    file_name,
    model,
    input_dir="../inference/data/video/",
    output_dir="../inference/data/ASR_result/",
):
    """Transcribe the audio of one file and save the text.

    Args:
        file_name: Name of the media file inside ``input_dir``.
        model: Object with a ``transcribe(path)`` method that returns a dict
            with a ``"text"`` entry and a ``"segments"`` list whose items carry
            an ``"avg_logprob"`` value.
        input_dir: Directory that contains the media file.
        output_dir: Directory for the ``<stem>.txt`` transcript; created if
            it does not exist.

    Returns:
        Tuple ``(predicted, confidence)`` where ``confidence`` is the mean of
        the per-segment ``avg_logprob`` values, or ``float("-inf")`` when the
        model returned no segments.
    """
    input_path = os.path.join(input_dir, file_name)
    # splitext keeps dots inside the stem, avoiding output-name collisions.
    stem = os.path.splitext(file_name)[0]
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, stem + ".txt")

    result = model.transcribe(input_path)
    predicted = result["text"]
    with open(output_path, "w", encoding="utf-8") as file:
        file.write(predicted)

    confidence_by_segment = [segment["avg_logprob"] for segment in result["segments"]]
    if confidence_by_segment:
        confidence = sum(confidence_by_segment) / len(confidence_by_segment)
    else:
        # No segments means no audio evidence at all; report the lowest
        # possible confidence instead of dividing by zero.
        confidence = float("-inf")
    return predicted, confidence


# **Fusion**

In [None]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import torch
# Pretrained GPT-2 tokenizer and language model, used below to score
# candidate words during fusion.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
lm_model = GPT2LMHeadModel.from_pretrained("gpt2")
# Switch the model to evaluation mode; it is only used for scoring here.
lm_model.eval()

def get_lm_score(prefix: str, candidate: str) -> float:
    """Score ``prefix + " " + candidate`` with the language model.

    The score is the negated language-model loss, so higher means the text is
    more plausible to the model.

    A BOS token is prepended before scoring. The causal-LM loss predicts every
    token from the ones before it, so a text that tokenizes to a single token
    would otherwise have no prediction target and the loss would be NaN. That
    happens at the first fusion position (empty prefix, short word) and makes
    every comparison against the score evaluate to False.

    Args:
        prefix: Text chosen so far (may be empty).
        candidate: Word to append to the prefix.

    Returns:
        Negated loss as a float, or ``float("-inf")`` if the combined text is
        empty.
    """
    text = (prefix + " " + candidate).strip()
    if not text:
        return float("-inf")

    input_ids = tokenizer(text, return_tensors="pt")["input_ids"]
    bos = torch.full(
        (input_ids.shape[0], 1), tokenizer.bos_token_id, dtype=input_ids.dtype
    )
    input_ids = torch.cat([bos, input_ids], dim=1)

    with torch.no_grad():
        outputs = lm_model(input_ids=input_ids, labels=input_ids)
    return -outputs.loss.item()

def fuse_transcripts_final(
    visual_text: str,
    audio_text: str,
    visual_conf: float,
    audio_conf: float,
    alpha_v: float = 1.0,
    alpha_a: float = 1.0,
    alpha_lm: float = 0.2,
    switch_penalty: float = 0.1,
    audio_strong_threshold: float = -0.2,
    audio_weak_threshold: float = -0.4,
    lowercase: bool = True
) -> str:
    """Fuse a visual and an audio transcript into one normalized string.

    If ``audio_conf`` is above ``audio_strong_threshold`` the normalized audio
    transcript is returned; if it is below ``audio_weak_threshold`` the
    normalized visual transcript is returned. Otherwise both transcripts are
    split on whitespace and merged position by position: at each index the
    word with the higher combined score (modality score plus
    ``alpha_lm`` times the language-model score) wins, with ties going to the
    audio word. When one transcript is shorter, the other's remaining words
    are taken as they are.

    Args:
        visual_text: Transcript from the visual model.
        audio_text: Transcript from the audio model.
        visual_conf: Confidence value of the visual transcript.
        audio_conf: Confidence value of the audio transcript.
        alpha_v: Weight of the visual modality score.
        alpha_a: Weight of the audio modality score.
        alpha_lm: Weight of the language-model score; the language model is
            not called when this is ``<= 0``.
        switch_penalty: Amount subtracted from the score of the candidate
            whose source differs from the previously chosen word's source.
        audio_strong_threshold: Above this, audio is trusted outright.
        audio_weak_threshold: Below this, visual is trusted outright.
        lowercase: Lower-case the text during normalization.

    Returns:
        The fused words joined by single spaces.
    """
    from itertools import zip_longest

    def normalize(text: str) -> str:
        """Strip the text, optionally lower-case it, and drop . , ? ! ' " ."""
        cleaned = text.strip()
        if lowercase:
            cleaned = cleaned.lower()
        for ch in ".,?!'\"":
            cleaned = cleaned.replace(ch, "")
        return cleaned

    if audio_conf > audio_strong_threshold:
        return normalize(audio_text)
    if audio_conf < audio_weak_threshold:
        return normalize(visual_text)

    v_tokens = normalize(visual_text).split()
    a_tokens = normalize(audio_text).split()

    # The modality scores do not depend on the position, so compute them once.
    # NOTE(review): the thresholds above treat a higher audio_conf as stronger,
    # yet the confidences are negated here, so a lower confidence yields a
    # higher modality score. Kept as in the original - confirm this is intended.
    v_score = alpha_v * (-visual_conf)
    a_score = alpha_a * (-audio_conf)
    use_lm = alpha_lm > 0

    fused = []
    prev_source = None

    # Words are paired purely by position; the shorter side is padded with "".
    for v_word, a_word in zip_longest(v_tokens, a_tokens, fillvalue=""):
        if not a_word:
            fused.append(v_word)
            prev_source = 'visual'
            continue
        if not v_word:
            fused.append(a_word)
            prev_source = 'audio'
            continue

        v_total = v_score
        a_total = a_score
        if use_lm:
            prefix = " ".join(fused)
            v_total += alpha_lm * get_lm_score(prefix, v_word)
            a_total += alpha_lm * get_lm_score(prefix, a_word)

        # Penalize the candidate that would switch source. This has to happen
        # before the comparison; applied afterwards it has no effect on the
        # outcome.
        if prev_source == 'audio':
            v_total -= switch_penalty
        elif prev_source == 'visual':
            a_total -= switch_penalty

        if a_total >= v_total:
            fused.append(a_word)
            prev_source = 'audio'
        else:
            fused.append(v_word)
            prev_source = 'visual'

    return " ".join(fused)


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

# **Inference**

In [None]:
# Name of the clip (inside ../inference/data/video/) used for the single-file run below.
file_name = "noisy_1.mp4"

In [None]:
def generate_multimodaltxt(
    file_name,
    pipeline,
    asr_model=None,
    output_dir="../inference/data/multimodal_result/",
):
    """Run visual ASR, audio ASR and fusion for one file and save the result.

    Args:
        file_name: Name of the video file to process.
        pipeline: Visual inference pipeline passed to ``generate_visualASRTxt``.
        asr_model: Audio model passed to ``generate_audioASRTxt``. ``None``
            (default) uses the notebook-level ``audio_model``.
        output_dir: Directory for the fused ``<stem>.txt`` transcript; created
            if it does not exist.

    Returns:
        The fused transcript.
    """
    if asr_model is None:
        asr_model = audio_model

    visual_predicted, visual_confidence_score = generate_visualASRTxt(file_name, pipeline)
    audio_result, audio_confidence = generate_audioASRTxt(file_name, asr_model)
    fused_text = fuse_transcripts_final(
        visual_predicted, audio_result,
        visual_confidence_score, audio_confidence)

    # splitext keeps dots inside the stem, avoiding output-name collisions.
    stem = os.path.splitext(file_name)[0]
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, stem + ".txt")
    with open(output_path, "w", encoding="utf-8") as file:
        file.write(fused_text)
    return fused_text



In [None]:
# Run visual ASR, audio ASR and fusion on the selected clip.
text = generate_multimodaltxt(file_name, pipeline)

In [None]:
text

'can i help you i hope so im looking for some material for paper on writing and im not quite sure where to look ill still try to help you what topic is your paper on'

# **Generate Transcript for All Files**

In [None]:
# This step runs the inference above on every video in the dataset folder,
# so it can take quite a long time to finish.
folder_path = "../inference/data/video"
confidence = []  # not used in this cell; kept in case other cells refer to it
# Sort the listing so the processing order (and the printed log) is
# deterministic; os.listdir returns entries in arbitrary order.
for file_name in sorted(os.listdir(folder_path)):
    # Skip sub-directories and hidden files, which are not videos.
    if file_name.startswith(".") or not os.path.isfile(os.path.join(folder_path, file_name)):
        continue
    print("=============")
    print(file_name)
    text = generate_multimodaltxt(file_name, pipeline)
    print(text)

quite_1.mp4
do you like cooking yes i like cooking very much i got this hobby when i was 12 years old
quite_2.mp4
you look so tan and healthy thanks i just go back from summer camp how was it great i got to try so many things for the first time
quite_3.mp4


`loss_type=None` was set in the config but it is unrecognised.Using the default loss: `ForCausalLMLoss`.


satya did you get the perfume i give you instant but i tell you the truth i dont want to perfume im sorry i dont know daddy
quite_4.mp4
i would like to register for a class today no problem what class would you like to take i would very much enjoy taking the psychology class because im crazy
quite_5.mp4
hi bill i saw your grandma yesterday oh where was that i was working around the track at my college and she was working around the same track track
quite_6.mp4
granny always tries to stay fit and healthy she is always making us kids the proper food well it pays off for her
quite_8.mp4
to me english is a difficult language a second language is always difficult true but english is harder than the most
quite_7.mp4
to me english is a difficult language a second language is always difficult true but english is harder than the most
quite_9.mp4
do you like traveling yes i like traveling for pleasure to get places for vacation for instance but i dont like traveling to work
quite_10.mp4
what do 