# <center> Video Dubbing Full Pipeline </center>

In [1]:
import os 

import torch 
import torchaudio
import numpy as np
from moviepy import VideoFileClip, AudioFileClip

In [None]:
def extract_audio_from_mp4(video_path: str, target_sr: int = 16000, temp_dir='./temp-audios', delete_file=True) -> tuple[np.ndarray, int]:
    video = VideoFileClip(video_path)
    audio: AudioFileClip = video.audio
    
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    temp_audio_path = temp_dir + "/temp_audio.wav"
    audio.write_audiofile(temp_audio_path, codec='pcm_s16le', fps=target_sr)
    
    audio_data, sr = torchaudio.load(temp_audio_path)
    
    if sr != target_sr:
        audio_data = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(audio_data)
    
    if audio_data.shape[0] > 1:
        audio_data = audio_data.mean(dim=0)

    if delete_file:
        os.remove(temp_audio_path)
    
    return audio_data.numpy(), sr

In [3]:
video_path = "/home/maksim/Repos/video_dubbing/test-videos/videoplayback.mp4"

audio, sr = extract_audio_from_mp4(video_path)

MoviePy - Writing audio in ./temp-audios/temp_audio.wav


                                                                      

MoviePy - Done.


## Базовые классы

In [4]:
from abc import ABCMeta, abstractmethod

In [5]:
from dataclasses import dataclass, field

In [None]:
@dataclass
class Segment:
    """
    Класс сегмента с речью в исходной аудиодорожке. 
    """
    start: int = field(repr=True) # Начало сегмента (индекс в исходном аудио)
    end: int = field(repr=True) # Конец сегмента

    audio: np.ndarray = field(repr=False) # Аудио сегмент

    transcription: str = field(repr=True, default=None) 
    translation: str = field(repr=True, default=None) 

    tts_wav: np.ndarray = field(repr=False, default=None) # Озвучка 

In [7]:
@dataclass
class VadOutput:
    """
    Класс возвращаемого VAD-пайплайном значения.
    """
    segments: list[Segment] = field(repr=False) # Речевые сегменты в исходном аудио
    audio: np.ndarray = field(repr=False) # Склеенные в единое аудио речевые сегменты
    timestamps_mapping: dict[tuple[int, int], Segment] = field(repr=False) # сопоставление таймкодов склеенного аудио с сегментами исходного аудио

In [8]:
@dataclass
class AsrWordOutput:
    """
    Результатом транскрибации является транскрипция с word-level временными метками. 
    Данный класс - это обёртка для каждого отдельного слова транскрипции.
    """
    start: int = field(repr=True)
    end: int = field(repr=True)
    word: str = field(repr=True)

In [9]:
class VADPipeline(metaclass=ABCMeta):
    @abstractmethod
    def __call__(self, audio, *args, **kwargs) -> VadOutput:
        pass


class ASRPipeline(metaclass=ABCMeta):
    @abstractmethod
    def __call__(self, audio) -> list[AsrWordOutput]:
        pass


class MTPipeline(metaclass=ABCMeta):
    @abstractmethod
    def _process_sample(self, text_en: str) -> str:
        pass


    @abstractmethod
    def __call__(self, texts_en: list[Segment]) -> list[Segment]:
        pass


class TTSPipeline(metaclass=ABCMeta):
    @abstractmethod
    def _process_sample(self, audio: str) -> np.ndarray:
        pass
    

    @abstractmethod
    def __call__(self, texts_ru: list[Segment], *args, **kwargs) -> list[Segment]:
        pass

## VAD

In [11]:
class SileroVADPipeline(VADPipeline):
    def __init__(self, model_path: str = ""):
        """
        Для локальной загрузки модели, нужно сначала её скачать: git clone <silerovad repo>
        А затем передать в качестве параметра model_path путь до корня склонированного репозитория.
        """
        if model_path:
            self.silerovad, utils = torch.hub.load(repo_or_dir=model_path,
                              model='silero_vad',
                              force_reload=True,
                              source='local')
        else:
            self.silerovad, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
                                model='silero_vad',
                                force_reload=True, 
                                source='github') 

        (self.get_speech_timestamps, _, _, _, _) = utils


    def __call__(self, audio: np.ndarray, threshold: float = 0.5,  min_silence_duration_ms=1000, min_speech_duration_ms=1000, sampling_rate=16000) -> VadOutput:
        speech_segments: list[Segment] = []
        speech_audio = []
        new2old = {}
    
        speech_timestamps = self.get_speech_timestamps(audio, 
                                                  self.silerovad, 
                                                  threshold=threshold, 
                                                  sampling_rate=sampling_rate,
                                                  min_silence_duration_ms=min_silence_duration_ms, 
                                                  min_speech_duration_ms=min_speech_duration_ms)
    
        for ts in speech_timestamps:
            start = ts['start']
            end = ts['end']
            speech_segments.append(Segment(start=start, end=end, audio=audio[start:end]))


        start_idx = 0

        for segment in speech_segments:
            new2old[(start_idx, segment.end - segment.start + start_idx)] = segment

            start_idx = segment.end - segment.start + start_idx + 1

            speech_audio.extend(segment.audio.tolist())

        speech_audio = np.array(speech_audio)

        output: VadOutput = VadOutput(segments=speech_segments, audio=speech_audio, timestamps_mapping=new2old)
        
        return output

In [12]:
vad_path = "/home/maksim/Models/SileroVAD/snakers4-silero-vad"

vad_pipe = SileroVADPipeline(vad_path)

vad_outp = vad_pipe(audio, sampling_rate=sr)

segments = vad_outp.segments

In [13]:
print(segments[0])

Segment(start=225312, end=308704, transcription=None, translation=None)


## ASR

In [14]:
from faster_whisper import WhisperModel

class FasterWhisperPipeline(ASRPipeline):
    sr = 16_000

    def __init__(self, model_size_or_path: str = "tiny.en", device: str = "cpu", compute_type: str = "int8"):
        self.model = WhisperModel(model_size_or_path=model_size_or_path, device=device, compute_type=compute_type)
    
    def __call__(self, audio: np.ndarray) -> list[AsrWordOutput]:
        output = []

        segments, _ = self.model.transcribe(audio, word_timestamps=True)
        
        for segment in segments:
            for word in segment.words:
                output.append(
                    AsrWordOutput(
                        start=int(word.start * self.sr), 
                        end=int(word.end * self.sr), 
                        word=word.word
                    )
                )
        
        return output

In [15]:
whisper_path = "/home/maksim/Models/FasterWhisper/tiny-en"

asr_pipe = FasterWhisperPipeline(whisper_path)

asr_outp = asr_pipe(vad_outp.audio)

In [16]:
def process_asr_outp(asr_outp: list[AsrWordOutput]):
    """
    Сопоставление каждого слова из транскрипции с таймкодами исходного аудио.
    """
    for word in asr_outp:
        for interval in vad_outp.timestamps_mapping.keys():
            if word.start >= interval[0] and word.end <= interval[1]:
                segment = vad_outp.timestamps_mapping[interval]
                if segment.transcription:
                    segment.transcription += " " + word.word
                else:
                    segment.transcription = word.word

In [17]:
process_asr_outp(asr_outp=asr_outp)

In [18]:
print(segments[0].transcription)

 What  you're  doing  right  now  at  this  very  moment  is  killing


## EN -> RU (Machine Translation)

In [19]:
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline

class HelsinkiEnRuPipeline(MTPipeline):
    def __init__(self, model_path: str | None = None, device: str = 'cpu'):
        model = None
        tokenizer = None 
        if model_path:
            model = AutoModelForSeq2SeqLM.from_pretrained(model_path)
            tokenizer = AutoTokenizer.from_pretrained(model_path)
        
        else:
            model_hf_name = "Helsinki-NLP/opus-mt-en-ru"
            model = AutoModelForSeq2SeqLM.from_pretrained(model_hf_name)
            tokenizer = AutoTokenizer.from_pretrained(model_hf_name)
        
        self.pipe = pipeline(
            task="translation", 
            model=model, 
            tokenizer=tokenizer,
            device=device)

    def _process_sample(self, text_en: str) -> str:
        return self.pipe(text_en)[0]['translation_text']


    def __call__(self, segments: list[Segment]):
        for segment in segments:
            segment.translation = self._process_sample(segment.transcription)


In [20]:
mt_path = "/home/maksim/Models/OpusEnRu"

mt_pipe = HelsinkiEnRuPipeline(model_path=mt_path)

mt_pipe(segments)

Device set to use cpu


In [21]:
print(segments[0].translation)

То, что ты делаешь прямо сейчас, это убивает


## TTS

### XTTS-v2 (GPU Pipeline)

In [22]:
import soundfile as sf

In [23]:
from TTS.api import TTS

class XTTSPipeline(TTSPipeline):
    output_sampling_rate = 24_000

    def __init__(self, target_spk: str | None = None, model_path: str | None = None, device: str = 'cpu', temp_dir="./temp-dir/tts/"):
        self.target_spk = target_spk

        if model_path:
            self.model = TTS(model_path=model_path, config_path=f'{model_path}/config.json').to(device)
        else:
            self.model = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)

        self.temp_dir = temp_dir


    def _process_sample(self, text_ru: str, speaker_wav: str) -> np.ndarray:
        return self.model.tts(text=text_ru, speaker_wav=speaker_wav, language='ru')


    def __call__(self, segments: list[Segment]) -> list[Segment]:
        os.makedirs(self.temp_dir, exist_ok=True)

        for i, segment in enumerate(segments):
            if self.target_spk is None:
                audio_path = self.temp_dir + f"{i}.wav"

                sf.write(audio_path, segment.audio, 16_000)

                segment.tts_wav = self._process_sample(segment.translation, audio_path)

                os.remove(audio_path)
            else:
                segment.tts_wav = self._process_sample(segment.translation, self.target_spk)

        return segments

### Silero-TTS (CPU Pipeline)

In [24]:
class SileroTTSPipeline(TTSPipeline):
    output_sampling_rate = 48_000

    def __init__(self, model_path: str | None = None, device: str = 'cpu'):
        if model_path:
            self.silero_tts, _ = torch.hub.load(repo_or_dir=model_path,
                                     model='silero_tts',
                                     language='ru',
                                     speaker='v4_ru',
                                     source='local')

        else:
            self.silero_tts, _ = torch.hub.load(repo_or_dir='snakers4/silero-models',
                                     model='silero_tts',
                                     language='ru',
                                     speaker='v4_ru',
                                     source='github')
        
        self.silero_tts.to(device)


    def _process_sample(self, text_ru: str, speaker: str) -> np.ndarray:
        return self.silero_tts.apply_tts(text=text_ru,
                        speaker=speaker,
                        sample_rate=self.output_sampling_rate).numpy()


    def __call__(self, segments: list[Segment], speaker: str = "xenia"):
        for segment in segments:
            segment.tts_wav = self._process_sample(segment.translation, speaker)

In [None]:
xtts_path = "/home/maksim/Models/XTTS/XTTS_mg_ft/"

tts_pipe = XTTSPipeline(model_path=xtts_path)

tts_pipe(segments)

In [25]:
silero_tts_path = "/home/maksim/Models/SileroModels"

tts_pipe = SileroTTSPipeline(silero_tts_path)

tts_pipe(segments)

In [30]:
def postprocess(audio, segments: list[Segment], tts: type):
    audio_len = len(audio)
    output_audio = np.array([0.0]*audio_len)

    for segment in segments:
        segment_tts = torch.tensor(segment.tts_wav)
        segment_tts = torchaudio.transforms.Resample(tts.output_sampling_rate, 16_000)(segment_tts)
        
        for i in range(len(segment_tts)):
            output_audio[segment.start + i] = segment_tts[i]
        
    return output_audio

In [31]:
result = postprocess(audio, segments, type(tts_pipe))

In [37]:
import soundfile as sf
import ffmpeg


def merge_audio_video(audio: np.ndarray, video_path: str, output_path: str):
    audio_path = "../outputs/tmp.wav"

    sf.write(audio_path, audio, 16_000)

    video = ffmpeg.input(video_path).video
    audio = ffmpeg.input(audio_path).audio

    ffmpeg.output(audio, video, output_path, vcodec="copy", acodec="aac").run()

    os.remove(audio_path)

In [40]:
merge_audio_video(result, video_path, "../outputs/outp-3.mp4")

ffmpeg version 6.1.1-3ubuntu5 Copyright (c) 2000-2023 the FFmpeg developers
  built with gcc 13 (Ubuntu 13.2.0-23ubuntu3)
  configuration: --prefix=/usr --extra-version=3ubuntu5 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --disable-omx --enable-gnutls --enable-libaom --enable-libass --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libglslang --enable-libgme --enable-libgsm --enable-libharfbuzz --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx265 --enable-libxml2 --enable-libxvid --enable-libzimg --ena

# Объединяем всё в единый пайплайн

In [None]:
class VideoDubber:
    def __init__(self, config: dict):
        self.config = config
    
    def _extract_audio_from_mp4(self, video_path: str, target_sr: int = 16000, temp_dir='./temp-audios', delete_file=True) -> tuple[np.ndarray, int]:
        """
        Извлекает аудио из MP4-файла, преобразует к моно (1 канал) и заданной частоте дискретизации.
    
        Args:
            video_path: Путь к MP4-файлу.
            target_sr: Целевая частота дискретизации (по умолчанию 16 кГц для Whisper).
    
        Returns:
            Аудио-данные в виде np.array и частоту дискретизации.
        """
        # 1. Извлекаем аудио из видео с помощью moviepy
        video = VideoFileClip(video_path)
        audio: AudioFileClip = video.audio
    
        # 2. Сохраняем временный WAV-файл (moviepy не возвращает напрямую np.array)
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)

        temp_audio_path = temp_dir + "/temp_audio.wav"
        audio.write_audiofile(temp_audio_path, codec='pcm_s16le', fps=target_sr)
    
        # 3. Загружаем аудио 
        audio_data, sr = torchaudio.load(temp_audio_path)
    
        if sr != target_sr:
            audio_data = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(audio_data)
    
        if audio_data.shape[0] > 1:
            audio_data = audio_data.mean(dim=0)

        if delete_file:
            os.remove(temp_audio_path)
    
        return audio_data.numpy(), sr
    

    def _merge_segments(self, audio, segments: list[Segment]):
        audio_len = len(audio)
        output_audio = np.array([0.0]*audio_len)

        for segment in segments:
            segment_tts = torch.tensor(segment.tts_wav)
            segment_tts = torchaudio.transforms.Resample(SileroTTSPipeline.output_sampling_rate, 16_000)(segment_tts)
        
            for i in range(len(segment_tts)):
                output_audio[segment.start + i] = segment_tts[i]
        
        return output_audio


    def _merge_audio_with_video(self, audio, input_video_path, output_video_path):
        audio_path = "./tmp.wav"

        sf.write(audio_path, audio, 16_000)

        video = ffmpeg.input(input_video_path).video
        audio = ffmpeg.input(audio_path).audio

        ffmpeg.output(audio, video, output_video_path, vcodec="copy", acodec="aac", ).run()
    

    def __call__(self, input_video_path: str, output_video_path: str):
        audio, sr = self._extract_audio_from_mp4(input_video_path)
        
        segments: list[Segment] = []

        # Vad
        vad_pipe = None
        if self.config["vad"]["model"] == "silero":
            vad_pipe = SileroVADPipeline(model_path=self.config["vad"]["model_path"])
        else:
            raise ValueError("Vad model should be in list: ['silero']")
        
        segments = vad_pipe(audio)
        
        # ASR
        asr_pipe = None
        if self.config["asr"]["model"] == "whisper":
            asr_pipe = WhisperPipeline(model_path=self.config["asr"]["model_path"], device=self.config["mt"]["device"])
        else:
            raise ValueError("ASR model should be in list: ['whisper']")
        
        asr_pipe(segments)

        # MT
        mt_pipe = None
        if self.config["mt"]["model"] == "opus-en-ru":
            mt_pipe = HelsinkiEnRuPipeline(model_path=self.config["mt"]["model_path"], device=self.config["mt"]["device"])
        else:
            raise ValueError("MT model should be in list: ['opus-en-ru']")

        mt_pipe(segments)
        
        # TTS
        tts_pipe = None
        if self.config["tts"]["model"] == "xtts":
            tts_pipe = XTTSPipeline(self.config["tts"]["params"]["speaker_wav"], self.config["tts"]["model_path"], device=self.config["tts"]["device"])
        elif self.config["tts"]["model"] == "silero":
            tts_pipe = SileroTTSPipeline(model_path=self.config["tts"]["model_path"], device=self.config["tts"]["device"])
        else:
            raise ValueError("TTS model should be in list: ['silero', 'xtts']")
        
        tts_pipe(segments)

        output_audio = self._merge_segments(audio, segments)
        self._merge_audio_with_video(output_audio, input_video_path, output_video_path)
        
    

In [36]:
video_dubbing_config = {
    "vad": {
        "model": "silero",
        "model_path": "/home/maksim/Models/SileroVAD/snakers4-silero-vad",
        "device": "cpu",
        "params": {
            "threshold": 0.5,
            "min_silence_duration_ms": 1000,
            "min_speech_duration_ms": 1000
        }
    },
    "asr": {
        "model": "whisper",
        "device": "cpu",
        "model_path": "/home/maksim/Models/Whisper/tiny"
    },
    "mt": {
        "model": "opus-en-ru",
        "device": "cpu",
        "model_path": "/home/maksim/Models/OpusEnRu"
    },
    "tts": {
        "model": "silero",
        "model_path": "/home/maksim/Models/SileroModels",
        "device": "cpu",
        "params": {
            "speaker": "xenia"
        }
    }
}

In [37]:
video_dubber = VideoDubber(video_dubbing_config)

In [39]:
video_dubber("/home/maksim/Repos/video_dubbing/test-videos/videoplayback.mp4", "./outp-3.mp4")

MoviePy - Writing audio in ./temp-audios/temp_audio.wav


                                                                      

MoviePy - Done.


Device set to use cpu
Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
Device set to use cpu
ffmpeg version 6.1.1-3ubuntu5 Copyright (c) 2000-2023 the FFmpeg developers
  built with gcc 13 (Ubuntu 13.2.0-23ubuntu3)
  configuration: --prefix=/usr --extra-version=3ubuntu5 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --disable-omx --enable-gnutls --enable-libaom --enable-libass --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libglslang --enable-libgme --enable-libgsm --enable-libharfbuzz --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-librubberband --enable-libshine --enable-lib