# Create Voice Over for Video using LLaVA and TTS

This notebook utilizes open source vision and speech synthesis to automatically create voiceover narration for a video file. The key steps are:

1. **Extract Video Frames**: Use OpenCV, an open source computer vision library, to extract the frames of the input video file as images; a sample at regular intervals (every 25th frame) is then used in the next step.

2. **Generate Image Descriptions**: Feed the extracted video frames into LLaVA, an open source vision model, to generate textual descriptions of the visual contents of each frame.

3. **Summarize Frame Descriptions**: Combine the LLaVA-generated descriptions of all sampled frames into a single prompt and ask GPT-4 to write a voiceover narration script covering the full video contents.

4. **Synthesize Narration Audio**: Use SileroTTS, an open source text-to-speech model, to synthesize the narration audio from the generated script.

5. **Combine Audio and Video**: Finally, mix the synthesized narration audio track with the original video to produce the final video with automatic voiceover.

The use of open source vision and speech models provides an efficient way to automatically create narration for video contents without manual effort.

In [None]:
!pip install openai --quiet
!pip install langchain --quiet
!pip install pydub loguru --quiet
!pip install moviepy --quiet
!pip install opencv-python --quiet
!pip install replicate --quiet

In [None]:
import os

# Create the directory that receives the extracted frames.
# exist_ok=True keeps the cell safe to re-run (a plain `mkdir` fails when the
# directory already exists) and avoids depending on a shell command.
os.makedirs("frames", exist_ok=True)

### Text-to-speech service: the open source Silero model is used here; the code sample is adapted from https://github.com/ouoertheo/silero-api-server

In [27]:
import os, time
import shutil
import requests
import torch
import torch.package
import torchaudio
from hashlib import md5
from loguru import logger
from pydub import AudioSegment
from pathlib import Path
import json

class SileroTtsService:
    """
    Generate TTS wav files using Silero.

    On construction the service resolves the index of available v3 model files
    (cached in ``langs.json`` in the working directory), downloads the requested
    model file if it is not already present, and loads it on the CPU.

    Attributes:
        sample_text: Sentence synthesized by ``generate_samples``.
        sample_path: Directory that holds one sample wav per speaker.
        sessions_path: Root directory for per-session copies of generated
            audio; ``None`` until ``init_sessions_path`` is called.
        sample_rate: Sample rate passed to the model when saving wavs.
        max_char_length: Texts longer than this are synthesized in chunks.
        langs: Mapping of model file name -> download URL.
        model: The loaded Silero model object.
    """
    def __init__(self, sample_path, lang="v3_en.pt") -> None:
        """
        Args:
            sample_path: Directory for speaker samples; created if missing.
            lang: File name of the Silero model to load (a key of ``langs``).
        """
        self.sample_text = "The fallowed fallen swindle auspacious goats in portable power stations."
        self.sample_path = Path(sample_path)
        self.sessions_path = None

        # Silero works fine on CPU
        self.device = torch.device('cpu')
        torch.set_num_threads(4)
        # Newer torchaudio releases no longer expose set_audio_backend, so only
        # select the backend on versions that still provide the function.
        if hasattr(torchaudio, "set_audio_backend"):
            torchaudio.set_audio_backend("soundfile")

        # Make sure we have the sample path (nested paths and re-runs are fine)
        self.sample_path.mkdir(parents=True, exist_ok=True)

        self.sample_rate = 48000

        # Prevent generation failure due to too long input
        self.max_char_length = 600

        # Get language model URLs
        self.langs = self.list_languages()

        # Load model
        self.load_model(lang)
        # Log success only once the model has actually been loaded.
        logger.info("TTS Service loaded successfully")

    def init_sessions_path(self, sessions_path="sessions"):
        """Set (and create if needed) the root directory used by ``save_session_audio``."""
        self.sessions_path = Path(sessions_path)
        self.sessions_path.mkdir(parents=True, exist_ok=True)

    def load_model(self, lang_model="v3_en.pt"):
        """
        Download (if not already on disk) and load a Silero model.

        Args:
            lang_model: Model file name; must be a key of ``self.langs``.

        Raises:
            Exception: If ``lang_model`` is not a known model file name.
        """
        if lang_model not in self.langs:
            # Report the valid model names (the keys), not their URLs.
            raise Exception(f"{lang_model} not in {list(self.langs)}")

        model_url = self.langs[lang_model]
        self.model_file = Path(lang_model)

        if not self.model_file.is_file():
            logger.warning(f"Downloading Silero {lang_model} model...")
            torch.hub.download_url_to_file(model_url,
                                        self.model_file)
            logger.info("Model download completed.")

        # NOTE(review): load_pickle unpickles content of the downloaded package;
        # only load model files obtained from a trusted source.
        self.model = torch.package.PackageImporter(self.model_file).load_pickle("tts_models", "model")
        self.model.to(self.device)

    def generate(self, speaker, text, session=""):
        """
        Synthesize ``text`` with the given speaker and return the wav path.

        Text longer than ``max_char_length`` is split into chunks that are
        synthesized one by one and concatenated into ``test.wav``, with a
        500 ms pause inserted before every chunk (including the first).

        Args:
            speaker: Speaker name understood by the loaded model.
            text: Text to synthesize.
            session: Optional session name; when non-empty a copy of the audio
                is stored via ``save_session_audio``.

        Returns:
            Path of the generated wav file.
        """
        if len(text) > self.max_char_length:
            # Handle long text input
            text_chunks = self.split_text(text)
            combined_wav = AudioSegment.empty()

            for chunk in text_chunks:
                audio_path = Path(self.model.save_wav(text=chunk,speaker=speaker,sample_rate=self.sample_rate))
                combined_wav += AudioSegment.silent(500) # Insert 500ms pause
                combined_wav += AudioSegment.from_file(audio_path)

            combined_wav.export("test.wav", format="wav")
            audio_path = Path("test.wav")
        else:
            audio_path = Path(self.model.save_wav(text=text,speaker=speaker,sample_rate=self.sample_rate))
        if session:
            self.save_session_audio(audio_path, session, speaker)
        return audio_path

    def split_text(self, text:str) -> list[str]:
        """
        Split ``text`` on spaces into chunks of at most ``max_char_length`` characters.

        Newlines inside a word are replaced by spaces and every word is followed
        by a single space. A single word longer than the limit is kept whole in
        its own chunk; empty chunks are never produced.
        """
        chunk_list = []
        chunk_str = ""

        for word in text.split(' '):
            word = word.replace('\n',' ') + " "
            if len(chunk_str + word) > self.max_char_length:
                # Skip the flush when nothing has been collected yet (an
                # over-long first word would otherwise emit an empty chunk).
                if chunk_str:
                    chunk_list.append(chunk_str)
                chunk_str = ""
            chunk_str += word

        # Add the last chunk
        if len(chunk_str) > 0:
            chunk_list.append(chunk_str)

        return chunk_list


    def combine_audio(self, audio_segments):
        """
        Build one segment from several mono segments.

        ``AudioSegment.from_mono_audiosegments`` is variadic, so the sequence is
        unpacked into positional arguments. Per the pydub API it produces a
        multi-channel segment (one channel per input), not a concatenation.
        """
        combined_audio = AudioSegment.from_mono_audiosegments(*audio_segments)
        return combined_audio

    def save_session_audio(self, audio_path:Path, session:Path, speaker):
        """
        Copy ``audio_path`` into ``<sessions_path>/<session>/`` under a timestamped name.

        Raises:
            Exception: If ``init_sessions_path`` has not been called yet.
        """
        if not self.sessions_path:
            raise Exception("Session not initialized. Call init_sessions_path() first.")
        session_path = self.sessions_path.joinpath(session)
        session_path.mkdir(parents=True, exist_ok=True)
        dst = session_path.joinpath(f"tts_{session}_{int(time.time())}_{speaker}_.wav")
        shutil.copy(audio_path, dst)

    def get_speakers(self):
        "List different speakers in model"
        return self.model.speakers

    def generate_samples(self):
        "Remove current samples and generate new ones for all speakers."
        logger.warning("Removing current samples")
        # iterdir() already yields paths that include sample_path, so they are
        # used as-is; joining them onto sample_path again breaks relative paths.
        for file in self.sample_path.iterdir():
            if file.is_file():
                file.unlink()

        logger.info("Creating new samples. This should take a minute...")
        for speaker in self.model.speakers:
            sample_name = self.sample_path.joinpath(f"{speaker}.wav")
            if sample_name.exists():
                continue
            audio = Path(self.model.save_wav(text=self.sample_text,speaker=speaker,sample_rate=self.sample_rate))
            audio.rename(sample_name)
        logger.info("New samples created")

    def update_sample_text(self,text: str):
        "Update the text used to generate samples"
        if not text: return
        self.sample_text = text
        logger.info(f"Sample text updated to {self.sample_text}")

    def list_languages(self):
        """
        Grab all v3 model links from https://models.silero.ai/models/tts

        The result is cached in ``langs.json`` in the working directory and
        read from there on later calls.

        Returns:
            Dict mapping model file name to its download URL.

        Raises:
            RuntimeError: If one of the index pages cannot be fetched.
        """
        lang_file = Path('langs.json')
        if lang_file.exists():
            with lang_file.open('r') as fh:
                logger.info('Loading cached language index')
                return json.load(fh)
        logger.info('Loading remote language index')
        lang_base_url = 'https://models.silero.ai/models/tts'
        lang_urls = {}

        # Parse initial web directory for languages
        response = requests.get(lang_base_url, timeout=30)
        if not response.ok:
            raise RuntimeError(f"Failed to get languages: {response.status_code}")
        langs = [lang.split('/')[0] for lang in response.text.split('<a href="')][1:]

        # Enter each web directory and grab v3 model file links
        for lang in langs:
            response = requests.get(f"{lang_base_url}/{lang}", timeout=30)
            if not response.ok:
                raise RuntimeError(f"Failed to get languages: {response.status_code}")
            model_files = [f.split('"')[0] for f in response.text.split('<a href="')][1:]

            # If a valid v3 file, add to list
            for model_file in model_files:
                if model_file.startswith('v3'):
                    lang_urls[model_file]=f"{lang_base_url}/{lang}/{model_file}"

        # Do not cache an empty index: it would make every later run fail.
        if lang_urls:
            with lang_file.open('w') as fh:
                json.dump(lang_urls,fh)
        return lang_urls

### function to extract frames from video

In [9]:
import cv2
def video_to_frames(video_file, output_dir):
    """
    Write every frame of a video to ``output_dir`` as ``frame<N>.jpg``.

    Args:
        video_file: Path of the video to read.
        output_dir: Directory receiving the JPEG files; created if missing.

    Returns:
        Tuple ``(count, duration)``: the number of frames written and the
        duration computed as ``count / fps``. The duration is ``0.0`` when the
        container reports no usable frame rate.

    Raises:
        IOError: If the video cannot be opened.
    """
    import os

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    vidcap = cv2.VideoCapture(video_file)
    try:
        if not vidcap.isOpened():
            raise IOError(f"Could not open video file: {video_file}")

        fps = vidcap.get(cv2.CAP_PROP_FPS)
        count = 0
        success, image = vidcap.read()
        while success:
            cv2.imwrite(os.path.join(output_dir, f"frame{count}.jpg"), image)
            count += 1
            success, image = vidcap.read()
    finally:
        # Always free the underlying decoder / file handle.
        vidcap.release()

    # Guard against a zero frame rate instead of dividing by zero.
    duration = count / fps if fps and fps > 0 else 0.0
    return count, duration


### environment variables

In [15]:
import os

# Values that mark a credential as "not filled in yet".
_PLACEHOLDERS = {"xxx", "xxxx"}


def _set_env(name, value):
    """Set ``os.environ[name]`` to ``value``.

    An unedited placeholder never overwrites a real value that is already
    present in the environment (for example one exported in the shell), while
    a value edited in this cell is always applied.
    """
    if value in _PLACEHOLDERS and os.environ.get(name, value) not in _PLACEHOLDERS:
        return
    os.environ[name] = value


# Replace the placeholders with your own credentials, or export the variables
# before starting the kernel. Avoid committing real keys with the notebook.
_set_env("REPLICATE_API_TOKEN", "xxxx")
_set_env("OPENAI_API_KEY", "xxxx")
_set_env("OPENAI_API_BASE", "xxx")
_set_env("OPENAI_API_VERSION", "xxxx")


### use vision model to get text from image of different sample frames

In [12]:
import replicate
def frames_to_frame_text(count, step=25, frames_dir="frames"):
    """
    Describe every ``step``-th extracted frame with the LLaVA vision model.

    Args:
        count: Total number of extracted frames (``frame0.jpg`` .. ``frame<count-1>.jpg``).
        step: Sampling stride; frames ``0, step, 2*step, ...`` are described.
        frames_dir: Directory containing the extracted frame images.

    Returns:
        One string with a ``frame <id> description:`` section per sampled frame.
    """
    target_ids = list(range(0, count, step))

    frame_texts = []
    for i in target_ids:
        # Keep the image open only while the request runs and the output is
        # consumed, so no file handles are leaked.
        with open(f"{frames_dir}/frame{i}.jpg", "rb") as image_file:
            output = replicate.run(
                "yorickvp/llava-13b:2facb4a474a0462c15041b78b1ad70952ea46b5ec6ad29583c0b29dbd4249591",
                input={
                    "image": image_file,
                    "prompt": f"this is the frame {i+1} of an video, describe this frame."
                }
            )
            # The model output is an iterable of text pieces; join them.
            frame_texts.append("".join(list(output)))

    return "".join(
        f"frame {frame_id} description:\n{description}\n------\n\n\n"
        for frame_id, description in zip(target_ids, frame_texts)
    )


### use a prompt to get voice over text from the series of frame texts

In [18]:

from langchain.prompts import PromptTemplate
from langchain.chat_models import AzureChatOpenAI
from langchain.schema import StrOutputParser


def frame_text_to_voice_over(text, duration, words_per_second=2):
    """
    Ask the chat model for a voice-over script based on the frame descriptions.

    Args:
        text: Concatenated descriptions of the sampled frames.
        duration: Video duration in seconds; used to bound the script length.
        words_per_second: Speaking rate used to turn the duration into a word
            budget.

    Returns:
        The voice-over text produced by the model.
    """
    # The prompt needs a whole number of words (at least one); the raw product
    # of a float duration would read like "12.48 words".
    word_count = max(1, int(duration * words_per_second))

    prompt = PromptTemplate.from_template(
        """
        we have a video, below it is the description of sampled frames(every 25):

        ### description of different sample frames
        {text}

        ### Instruction
        Generate a Voice-over text for this video based on the descriptions of frames above.
        The text should not have more than {word_count} words
        """
    )
    runnable = prompt | AzureChatOpenAI(deployment_name="gpt-4") | StrOutputParser()
    return runnable.invoke({"text": text, "word_count": word_count})



### combine the audio with the video

In [30]:

def combine_audio(vidname, audname, outname, fps=25):
    """
    Write ``vidname`` to ``outname`` with its audio track replaced by ``audname``.

    Args:
        vidname: Path of the source video.
        audname: Path of the audio file to use as the new audio track.
        outname: Path of the video file to write.
        fps: Frame rate passed to ``write_videofile``.
    """
    import moviepy.editor as mpe

    my_clip = mpe.VideoFileClip(vidname)
    try:
        audio_background = mpe.AudioFileClip(audname)
        try:
            final_clip = my_clip.set_audio(audio_background)
            final_clip.write_videofile(outname, fps=fps)
        finally:
            # Close the audio clip even if writing fails.
            audio_background.close()
    finally:
        # Close the video clip so its resources are released.
        my_clip.close()


### with an input video, use the functions step by step to create a voice over for the video

In [None]:

# End-to-end run: extract frames, describe them, write the narration,
# synthesize it and attach it to the video.
tts_service = SileroTtsService("./samples")

# Path of the video to narrate; replace the placeholder before running.
x = "input video locaiton"

import glob

# Start from an empty frames directory so frames of a previous video are not
# mixed with the new ones. Create it if needed and remove regular files only.
os.makedirs("frames", exist_ok=True)
files = glob.glob('./frames/*')
for f in files:
    if os.path.isfile(f):
        os.remove(f)

count, duration = video_to_frames(x, "frames")
frame_text = frames_to_frame_text(count)
voice_over = frame_text_to_voice_over(frame_text, duration)

# Use the path returned by the TTS service instead of assuming a file name.
audio_path = tts_service.generate(speaker = f'en_{49}', text=voice_over)
combine_audio(x, str(audio_path), "output.mp4")
