## Lip Sync Model Implementation

#### Environment Setup

In [None]:
# Installing required modules
!pip install diffusers mediapipe>=0.10.8 transformers huggingface-hub omegaconf
!pip install einops opencv-python face-alignment decord ffmpeg-python
!pip install safetensors soundfile gtts

# Cloning git repository
!git clone https://github.com/Isi-dev/LatentSync
%cd LatentSync

In [None]:
# Importing required modules
import os
from google.colab import files
import torch
from omegaconf import OmegaConf
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from latentsync.whisper.audio2feature import Audio2Feature
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
import ipywidgets as widgets
import cv2
import torchaudio
import subprocess
from gtts import gTTS

In [None]:
# Checkpoints for LatentSync
# Create the torch hub checkpoint cache directory up front so the downloads below
# can be written straight into it.
os.makedirs("/root/.cache/torch/hub/checkpoints", exist_ok=True)

# Auxiliary weights (s3fd / 2DFAN4) saved into the torch hub cache.
# NOTE(review): presumably these are pre-seeded so the face-alignment package
# finds them there instead of downloading them itself -- confirm.
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/auxiliary/s3fd-619a316812.pth -O /root/.cache/torch/hub/checkpoints/s3fd-619a316812.pth
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/auxiliary/2DFAN4-cd938726ad.zip -O /root/.cache/torch/hub/checkpoints/2DFAN4-cd938726ad.zip

# Local directory (relative to the current working directory) read by perform_inference
!mkdir -p checkpoints

# UNet weights ("checkpoints/latentsync_unet.pt") and Whisper weights
# ("checkpoints/tiny.pt"), followed by the VAE weights and config.json that
# AutoencoderKL.from_pretrained("checkpoints") loads in perform_inference.
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/latentsync_unet.pt -O checkpoints/latentsync_unet.pt
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/whisper/tiny.pt -O checkpoints/tiny.pt
!wget https://huggingface.co/stabilityai/sd-vae-ft-mse/resolve/main/diffusion_pytorch_model.safetensors -O checkpoints/diffusion_pytorch_model.safetensors
!wget https://huggingface.co/stabilityai/sd-vae-ft-mse/raw/main/config.json -O checkpoints/config.json

#### Inference method

In [None]:
# Inference
def perform_inference(video_path, audio_path, seed=1247, num_inference_steps=20, guidance_scale=1.0, output_path="output_video.mp4"):
    """Run the LatentSync lip-sync pipeline on a video/audio pair.

    Loads the scheduler, Whisper audio encoder, VAE and UNet from the local
    "configs/" and "checkpoints/" directories (relative to the current working
    directory), builds a LipsyncPipeline on the GPU and invokes it.

    Args:
        video_path: Path of the input video, forwarded to the pipeline.
        audio_path: Path of the driving audio, forwarded to the pipeline.
        seed: Value passed to accelerate's set_seed before the pipeline runs.
        num_inference_steps: Forwarded to the pipeline.
        guidance_scale: Forwarded to the pipeline.
        output_path: Passed to the pipeline as ``video_out_path``. The mask
            video path is derived from it by inserting "_mask" before the
            file extension.

    Returns:
        ``output_path``, unchanged.

    Raises:
        RuntimeError: If no CUDA device is available. The audio encoder and the
            pipeline are placed on "cuda" unconditionally, so failing here gives
            a clear message before any weights are loaded.
    """
    if not torch.cuda.is_available():
        raise RuntimeError("perform_inference requires a CUDA-capable GPU, but none is available.")

    config_path = "configs/unet/first_stage.yaml"
    inference_ckpt_path = "checkpoints/latentsync_unet.pt"
    whisper_model_path = "checkpoints/tiny.pt"

    config = OmegaConf.load(config_path)

    # NOTE(review): "> 7" selects float32 on compute capability 7.x devices;
    # kept as-is because it may be deliberate -- confirm before relaxing to ">= 7".
    is_fp16_supported = torch.cuda.get_device_capability()[0] > 7
    dtype = torch.float16 if is_fp16_supported else torch.float32

    scheduler = DDIMScheduler.from_pretrained("configs")

    audio_encoder = Audio2Feature(model_path=whisper_model_path, device="cuda", num_frames=config.data.num_frames)

    vae = AutoencoderKL.from_pretrained("checkpoints", torch_dtype=dtype, local_files_only=True)
    vae.config.scaling_factor = 0.18215
    vae.config.shift_factor = 0

    # The UNet is materialised on the CPU first and cast to the working dtype;
    # the pipeline's .to("cuda") below moves it to the GPU.
    unet, _ = UNet3DConditionModel.from_pretrained(
        OmegaConf.to_container(config.model),
        inference_ckpt_path,
        device="cpu",
    )
    unet = unet.to(dtype=dtype)

    if is_xformers_available():
        unet.enable_xformers_memory_efficient_attention()
        print('x_formers available!')

    pipeline = LipsyncPipeline(
        vae=vae,
        audio_encoder=audio_encoder,
        unet=unet,
        scheduler=scheduler,
    ).to("cuda")

    set_seed(seed)

    # Derive the mask path from the extension rather than via
    # str.replace(".mp4", ...): replace() returns the output path itself when
    # the extension is not ".mp4" (so the mask would overwrite the result) and
    # rewrites every ".mp4" occurrence, including ones in directory names.
    mask_root, mask_ext = os.path.splitext(output_path)
    video_mask_path = f"{mask_root}_mask{mask_ext}"

    pipeline(
        video_path=video_path,
        audio_path=audio_path,
        video_out_path=output_path,
        video_mask_path=video_mask_path,
        num_frames=config.data.num_frames,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        weight_dtype=dtype,
        width=config.data.resolution,
        height=config.data.resolution,
    )
    return output_path

#### Generating audio and video files

In [2]:
def convert_video_fps(input_path, target_fps):
    """Re-encode a video at ``target_fps`` using ffmpeg.

    The result is written to "converted_<target_fps>fps.mp4" in the current
    working directory. ffprobe is queried first: when it reports an audio
    stream the audio is encoded as AAC at 192k, otherwise audio is disabled.

    Returns:
        The output path, or None (after printing an error) when ``input_path``
        does not exist or is an empty file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    source_missing = not os.path.exists(input_path)
    if source_missing or os.path.getsize(input_path) == 0:
        print(f"Error: The video file {input_path} is missing or empty.")
        return None

    # Any non-blank ffprobe output means at least one audio stream was listed.
    probe = subprocess.run(
        ["ffprobe", "-i", input_path, "-show_streams", "-select_streams", "a", "-loglevel", "error"],
        capture_output=True,
        text=True,
    )
    has_audio = bool(probe.stdout.strip())
    audio_args = ["-c:a", "aac", "-b:a", "192k"] if has_audio else ["-an"]

    output_path = f"converted_{target_fps}fps.mp4"
    ffmpeg_cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-filter:v", f"fps={target_fps}",
        "-c:v", "libx264", "-preset", "fast", "-crf", "18",
        *audio_args,
        output_path,
    ]
    subprocess.run(ffmpeg_cmd, check=True)

    print(f"Converted video saved as {output_path}")
    return output_path

def generate_audio_from_text(text, output_path="generated_audio.mp3", lang='en'):
    """Synthesize ``text`` with gTTS (normal speed) and save it to ``output_path``.

    Args:
        text: The text to speak.
        output_path: Where the synthesized audio is saved.
        lang: Language code handed to gTTS.

    Returns:
        ``output_path``.
    """
    gTTS(text=text, lang=lang, slow=False).save(output_path)
    return output_path

def pad_audio_to_multiple_of_16(audio_path, target_fps=25):
    """Pad an audio file with silence so it spans a multiple of 16 video frames.

    The frame count is the number of whole frames, at ``target_fps``, covered
    by the audio. When that count is not a multiple of 16, trailing silence is
    appended up to the next multiple and the result is saved as
    "padded_audio.wav" in the current working directory.

    All sizes are computed from sample counts (multiplying before dividing)
    instead of from a floating-point duration, so truncation cannot leave the
    padded audio one frame short of the intended multiple of 16.

    Args:
        audio_path: Path of the audio file to load with torchaudio.
        target_fps: Video frame rate used to convert samples to frames.

    Returns:
        A ``(path, num_frames)`` tuple: the path of the padded file (or
        ``audio_path`` itself when no padding was needed) and the number of
        whole frames covered by that audio.
    """
    waveform, sample_rate = torchaudio.load(audio_path)

    def whole_frames(sample_count):
        # Floor of sample_count * fps / sample_rate; exact for integer fps.
        return int(sample_count * target_fps // sample_rate)

    num_samples = waveform.shape[1]
    num_frames = whole_frames(num_samples)

    # Pad audio to ensure frame count is a multiple of 16
    remainder = num_frames % 16
    if remainder > 0:
        target_frames = num_frames + (16 - remainder)
        # Smallest sample count that covers target_frames whole frames
        # (ceiling division written as negated floor division).
        target_samples = int(-(-(target_frames * sample_rate) // target_fps))
        pad_samples = target_samples - num_samples
        # new_zeros keeps the silence on the waveform's dtype and device.
        pad_waveform = waveform.new_zeros((waveform.shape[0], pad_samples))  # Silence padding
        waveform = torch.cat((waveform, pad_waveform), dim=1)

        # Save the padded audio
        padded_audio_path = "padded_audio.wav"
        torchaudio.save(padded_audio_path, waveform, sample_rate)
    else:
        padded_audio_path = audio_path  # No padding needed

    padded_num_frames = whole_frames(waveform.shape[1])

    return padded_audio_path, padded_num_frames

def convert_mp3_to_wav(mp3_path):
    """Convert an audio file to 16 kHz mono 16-bit PCM WAV using FFmpeg.

    The output path is the input path with its extension replaced by ".wav".
    Only the final extension is touched, so directory names containing ".mp3"
    are left alone and upper-case extensions such as ".MP3" are handled. If the
    input already ends in ".wav", "_converted" is appended to the stem so the
    input file is never used as its own output.

    Args:
        mp3_path: Path of the audio file to convert.

    Returns:
        The path of the WAV file written by FFmpeg.

    Raises:
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
    """
    stem, _ = os.path.splitext(mp3_path)
    wav_path = stem + ".wav"
    if wav_path == mp3_path:
        wav_path = stem + "_converted.wav"

    cmd = [
        "ffmpeg", "-y", "-i", mp3_path,
        "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        wav_path
    ]
    subprocess.run(cmd, check=True)
    return wav_path

def create_video_from_image(image_path, output_video_path, num_frames, fps=25):
    """Write a still image as a video made of ``num_frames`` identical frames.

    Args:
        image_path: Path of the image to read with OpenCV.
        output_video_path: Path of the video to write (encoded with the
            'mp4v' fourcc).
        num_frames: Number of frames to write.
        fps: Frame rate of the written video.

    Returns:
        ``output_video_path`` on success, or None (after printing an error)
        when the image cannot be read or the video writer cannot be opened.
    """
    img = cv2.imread(image_path)
    if img is None:
        print("Error: Unable to read the image.")
        return None

    height, width = img.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # A writer that failed to open drops frames silently; report that instead
    # of claiming a video was created.
    if not video_writer.isOpened():
        video_writer.release()
        print(f"Error: Unable to open {output_video_path} for writing.")
        return None

    try:
        for _ in range(num_frames):
            video_writer.write(img)
    finally:
        # Always release so the file is finalised even if a write fails.
        video_writer.release()

    print(f"Created video {output_video_path} with {num_frames} frames ({num_frames / fps:.2f} seconds).")
    return output_video_path


#### UI Widget

In [3]:
# Input controls: the source image, the text to speak and its language
image_upload = widgets.FileUpload(
    accept="image/*",
    multiple=False,
    description="Upload Image",
)
text_input = widgets.Textarea(
    value='Hello, this is a sample text for lip sync.',
    placeholder='Enter the text you want to convert to speech',
    description='Text:',
    disabled=False,
    rows=5,
)
language_selector = widgets.Dropdown(
    # (label shown in the dropdown, language code passed to gTTS)
    options=[
        ('English', 'en'), ('Spanish', 'es'), ('French', 'fr'), ('German', 'de'),
        ('Italian', 'it'), ('Japanese', 'ja'), ('Korean', 'ko'), ('Chinese', 'zh-CN'),
    ],
    value='en',
    description='Language:',
)

# Inference settings forwarded to perform_inference
seed_input = widgets.IntText(description="Seed:", value=1247)
num_steps_input = widgets.IntSlider(
    description="Steps:", value=20, min=1, max=100, step=1,
)
guidance_scale_input = widgets.FloatSlider(
    description="Guidance Scale:", value=1.0, min=0.1, max=10.0, step=0.1,
)

# Presentation settings: preview size factor and frame rate of the final video
video_scale_input = widgets.FloatSlider(
    description="Video Scale:", value=0.5, min=0.1, max=1.0, step=0.1,
)
output_fps_input = widgets.IntSlider(
    description="Output FPS:", value=25, min=6, max=60, step=1,
)

# Trigger button and the output area that collects messages and the result
run_button = widgets.Button(description="Run Inference")
output_display = widgets.Output()

def on_run_button_click(change):
    """Handle a click on the run button: image + text -> lip-synced video.

    Saves the uploaded image, synthesizes speech from the text box, converts
    and pads the audio, builds a still-image video of matching length, runs
    perform_inference, re-encodes the result at the selected FPS and displays
    it inside ``output_display``.

    Every step after input validation runs inside one try/finally, so the
    temporary files created so far are removed on success, on failure and on
    early return alike.

    Args:
        change: The argument supplied by ``Button.on_click``; not used.
    """
    from IPython.display import Video, display

    with output_display:
        output_display.clear_output()

        # Validate uploads
        if not image_upload.value:
            print("Please upload an image.")
            return

        if not text_input.value.strip():
            print("Please enter some text for speech synthesis.")
            return

        temp_paths = []  # files to delete once the run finishes
        try:
            # Process image. FileUpload.value is a dict keyed by filename in
            # ipywidgets 7 and a tuple of file records in ipywidgets 8.
            uploads = image_upload.value
            if isinstance(uploads, dict):
                image_file_info = next(iter(uploads.values()))
            else:
                image_file_info = uploads[0]

            # Keep only the final path component so an uploaded name cannot
            # point outside the working directory.
            image_path = os.path.basename(image_file_info.get('name', 'uploaded_image.png')) or 'uploaded_image.png'
            temp_paths.append(image_path)
            with open(image_path, "wb") as f:
                f.write(image_file_info['content'])

            # Check the image before the (networked) speech synthesis so a bad
            # upload fails fast.
            img = cv2.imread(image_path)
            if img is None:
                print("Error: Could not read the image file.")
                return
            height, width = img.shape[:2]

            # Generate audio from text
            print("Generating speech from text...")
            mp3_path = generate_audio_from_text(text_input.value, lang=language_selector.value)
            temp_paths.append(mp3_path)

            # Convert MP3 to WAV (LatentSync needs WAV)
            wav_path = convert_mp3_to_wav(mp3_path)
            temp_paths.append(wav_path)

            # Get audio duration with padding
            audio_path, num_frames = pad_audio_to_multiple_of_16(wav_path, target_fps=25)
            temp_paths.append(audio_path)

            video_path = create_video_from_image(image_path, "generated_video.mp4", num_frames)
            if video_path is None:
                # create_video_from_image has already printed the reason.
                return
            temp_paths.append(video_path)

            print("Running inference...")
            output_path = "output_video.mp4"
            perform_inference(video_path, audio_path, seed_input.value, num_steps_input.value, guidance_scale_input.value, output_path)

            output_path = convert_video_fps(output_path, output_fps_input.value)
            if output_path is None:
                # convert_video_fps has already printed the reason.
                return

            print("Inference complete. Displaying output video:")
            display(Video(output_path, embed=True, width=int(width * video_scale_input.value), height=int(height * video_scale_input.value)))

            # Tell the user how to download the final video
            print("\nYou can download the video using the code below:")
            print("from google.colab import files")
            print(f"files.download('{output_path}')")

        except Exception as e:
            print(f"Error during inference: {str(e)}")
        finally:
            torch.cuda.empty_cache()
            # Best-effort cleanup: a file that cannot be removed must not hide
            # the outcome of the run, but only filesystem errors are ignored.
            for path in dict.fromkeys(temp_paths):
                if path and os.path.exists(path):
                    try:
                        os.remove(path)
                    except OSError:
                        pass

#### Main

In [4]:
# Hook the click handler up to the button
run_button.on_click(on_run_button_click)

# Display the UI: every control stacked vertically, with the run button and
# the output area at the bottom
widgets_box = widgets.VBox(
    children=[
        image_upload,
        text_input,
        language_selector,
        seed_input,
        num_steps_input,
        guidance_scale_input,
        video_scale_input,
        output_fps_input,
        run_button,
        output_display,
    ]
)
display(widgets_box)

VBox(children=(FileUpload(value={}, accept='image/*', description='Upload Image'), Textarea(value='Hello, this…