**PREPARE ENVIRONMENT**

In [2]:
# @title
!pip install diffusers mediapipe>=0.10.8 transformers huggingface-hub omegaconf
!pip install einops opencv-python face-alignment decord ffmpeg-python
!pip install safetensors soundfile

!git clone https://github.com/Isi-dev/LatentSync
%cd LatentSync

import os
from google.colab import files
import torch
from omegaconf import OmegaConf
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from latentsync.whisper.audio2feature import Audio2Feature
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
import ipywidgets as widgets

# Create the torch hub checkpoint cache directory up front so the two auxiliary
# weight files below can be written straight into it.
os.makedirs("/root/.cache/torch/hub/checkpoints", exist_ok=True)

# Auxiliary weights (s3fd / 2DFAN4) stored in the torch hub cache.
# NOTE(review): presumably these are picked up by the face-alignment package
# instead of being downloaded on first use -- confirm.
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/auxiliary/s3fd-619a316812.pth -O /root/.cache/torch/hub/checkpoints/s3fd-619a316812.pth
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/auxiliary/2DFAN4-cd938726ad.zip -O /root/.cache/torch/hub/checkpoints/2DFAN4-cd938726ad.zip

# Local directory (relative to the cloned repository) that perform_inference reads from.
!mkdir -p checkpoints

# Model weights read by perform_inference:
#   latentsync_unet.pt                   -> UNet checkpoint
#   tiny.pt                              -> Whisper model used by the audio encoder
#   diffusion_pytorch_model.safetensors  -> VAE weights, loaded together with config.json
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/latentsync_unet.pt -O checkpoints/latentsync_unet.pt
!wget https://huggingface.co/Isi99999/LatentSync/resolve/main/whisper/tiny.pt -O checkpoints/tiny.pt
!wget https://huggingface.co/stabilityai/sd-vae-ft-mse/resolve/main/diffusion_pytorch_model.safetensors -O checkpoints/diffusion_pytorch_model.safetensors
!wget https://huggingface.co/stabilityai/sd-vae-ft-mse/raw/main/config.json -O checkpoints/config.json




def perform_inference(video_path, audio_path, seed=1247, num_inference_steps=20, guidance_scale=1.0, output_path="output_video.mp4"):
    """Run the LatentSync lip-sync pipeline on one video/audio pair.

    Every call loads the scheduler, audio encoder, VAE and UNet from the local
    ``configs`` / ``checkpoints`` directories, assembles a ``LipsyncPipeline``
    on CUDA, seeds the RNGs and runs the pipeline.

    Args:
        video_path: Path of the input video.
        audio_path: Path of the driving audio.
        seed: Value passed to ``set_seed`` before the pipeline runs.
        num_inference_steps: Forwarded to the pipeline.
        guidance_scale: Forwarded to the pipeline.
        output_path: Where the pipeline writes its result; the mask video path
            is derived from it by replacing ".mp4" with "_mask.mp4".

    Returns:
        ``output_path``, unchanged.
    """
    config = OmegaConf.load("configs/unet/first_stage.yaml")

    # Half precision is only selected when CUDA is present and the device's
    # major compute capability is greater than 7.
    if torch.cuda.is_available() and torch.cuda.get_device_capability()[0] > 7:
        dtype = torch.float16
    else:
        dtype = torch.float32

    scheduler = DDIMScheduler.from_pretrained("configs")

    audio_encoder = Audio2Feature(
        model_path="checkpoints/tiny.pt",
        device="cuda",
        num_frames=config.data.num_frames,
    )

    vae = AutoencoderKL.from_pretrained("checkpoints", torch_dtype=dtype, local_files_only=True)
    vae.config.scaling_factor = 0.18215
    vae.config.shift_factor = 0

    # from_pretrained returns a pair; only the model itself is needed here.
    unet, _ = UNet3DConditionModel.from_pretrained(
        OmegaConf.to_container(config.model),
        "checkpoints/latentsync_unet.pt",
        device="cpu",
    )
    unet = unet.to(dtype=dtype)

    if is_xformers_available():
        unet.enable_xformers_memory_efficient_attention()
        print('x_formers available!')

    pipeline = LipsyncPipeline(vae=vae, audio_encoder=audio_encoder, unet=unet, scheduler=scheduler)
    pipeline = pipeline.to("cuda")

    set_seed(seed)

    frame_size = config.data.resolution
    pipeline(
        video_path=video_path,
        audio_path=audio_path,
        video_out_path=output_path,
        video_mask_path=output_path.replace(".mp4", "_mask.mp4"),
        num_frames=config.data.num_frames,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        weight_dtype=dtype,
        width=frame_size,
        height=frame_size,
    )
    return output_path


fatal: destination path 'LatentSync' already exists and is not an empty directory.
/content/LatentSync
--2025-02-27 06:22:34--  https://huggingface.co/Isi99999/LatentSync/resolve/main/auxiliary/s3fd-619a316812.pth
Resolving huggingface.co (huggingface.co)... 3.165.160.61, 3.165.160.12, 3.165.160.11, ...
Connecting to huggingface.co (huggingface.co)|3.165.160.61|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://cdn-lfs-us-1.hf.co/repos/01/db/01db91728569792dcc32345218b5246807210ac43ec688a2ea6a0e0f00475624/619a31681264d3f7f7fc7a16a42cbbe8b23f31a256f75a366e5a1bcd59b33543?response-content-disposition=inline%3B+filename*%3DUTF-8%27%27s3fd-619a316812.pth%3B+filename%3D%22s3fd-619a316812.pth%22%3B&Expires=1740640954&Policy=eyJTdGF0ZW1lbnQiOlt7IkNvbmRpdGlvbiI6eyJEYXRlTGVzc1RoYW4iOnsiQVdTOkVwb2NoVGltZSI6MTc0MDY0MDk1NH19LCJSZXNvdXJjZSI6Imh0dHBzOi8vY2RuLWxmcy11cy0xLmhmLmNvL3JlcG9zLzAxL2RiLzAxZGI5MTcyODU2OTc5MmRjYzMyMzQ1MjE4YjUyNDY4MDcyMTBhYzQzZWM2ODhhMmVhNmEwZ

**RUN IMAGE TO VIDEO**

In [3]:
# @title
import cv2
import torchaudio
import subprocess

# --- Input widgets for the image-to-video UI -------------------------------
image_upload = widgets.FileUpload(
    accept="image/*",
    multiple=False,
    description="Upload Image",
)
audio_upload = widgets.FileUpload(
    accept=".wav,.mp3,.aac,.flac",
    multiple=False,
    description="Upload Audio",
)

# --- Inference / output settings ---------------------------------------------
seed_input = widgets.IntText(value=1247, description="Seed:")
num_steps_input = widgets.IntSlider(
    value=20, min=1, max=100, step=1, description="Steps:"
)
guidance_scale_input = widgets.FloatSlider(
    value=1.0, min=0.1, max=10.0, step=0.1, description="Guidance Scale:"
)
video_scale_input = widgets.FloatSlider(
    value=0.5, min=0.1, max=1.0, step=0.1, description="Video Scale:"
)
output_fps_input = widgets.IntSlider(
    value=25, min=6, max=60, step=1, description="Output FPS:"
)

# --- Trigger button and the output area the callback prints into --------------
run_button = widgets.Button(description="Run Inference")
output_display = widgets.Output()

def convert_video_fps(input_path, target_fps):
    """Re-encode a video with libx264 at ``target_fps`` frames per second.

    An ``ffprobe`` call checks whether the input has an audio stream: if it
    does, audio is re-encoded as AAC at 192k, otherwise the output is written
    without audio.

    Args:
        input_path: Path of the video to convert.
        target_fps: Frame rate handed to ffmpeg's ``fps`` filter.

    Returns:
        ``"converted_{target_fps}fps.mp4"`` on success, or ``None`` when the
        input file is missing or empty.

    Raises:
        subprocess.CalledProcessError: if the ffmpeg invocation fails.
    """
    input_usable = os.path.exists(input_path) and os.path.getsize(input_path) != 0
    if not input_usable:
        print(f"Error: The video file {input_path} is missing or empty.")
        return None

    output_path = f"converted_{target_fps}fps.mp4"

    # ffprobe prints stream information only when an audio stream exists.
    probe_result = subprocess.run(
        ["ffprobe", "-i", input_path, "-show_streams", "-select_streams", "a", "-loglevel", "error"],
        capture_output=True,
        text=True,
    )
    if probe_result.stdout.strip() != "":
        audio_args = ["-c:a", "aac", "-b:a", "192k"]
    else:
        audio_args = ["-an"]

    ffmpeg_cmd = ["ffmpeg", "-y", "-i", input_path]
    ffmpeg_cmd += ["-filter:v", f"fps={target_fps}"]
    ffmpeg_cmd += ["-c:v", "libx264", "-preset", "fast", "-crf", "18"]
    ffmpeg_cmd += audio_args
    ffmpeg_cmd += [output_path]

    subprocess.run(ffmpeg_cmd, check=True)
    print(f"Converted video saved as {output_path}")
    return output_path

# def add_silent_frames(audio_path, target_fps=25):

#     waveform, sample_rate = torchaudio.load(audio_path)
#     silent_duration = 25 / target_fps  # Two frames at target FPS
#     silent_samples = int(silent_duration * sample_rate)
#     silent_waveform = torch.zeros((waveform.shape[0], silent_samples))

#     # Concatenate silence at the beginning for mouth correction
#     new_waveform = torch.cat((silent_waveform, waveform), dim=1)
#     new_audio_path = "audio_with_silence.wav"
#     torchaudio.save(new_audio_path, new_waveform, sample_rate)

#     return new_audio_path




def pad_audio_to_multiple_of_16(audio_path, target_fps=25):
    """Append silence so the audio spans a multiple of 16 video frames.

    The frame count is computed with exact rational arithmetic. The previous
    float computation (``int(duration * fps)``) could land just below an
    integer when the audio ended exactly on a frame boundary, which made the
    count one too small and the padded result not a multiple of 16.

    Args:
        audio_path: Path of the audio file to load with torchaudio.
        target_fps: Video frame rate used to convert samples to frames.

    Returns:
        ``(padded_audio_path, padded_num_frames)``. The path is
        ``"padded_audio.wav"`` when padding was written, otherwise the
        original ``audio_path``. When padding was added the frame count is a
        multiple of 16.
    """
    import math
    from fractions import Fraction

    # audio_path = add_silent_frames(audio_path)

    waveform, sample_rate = torchaudio.load(audio_path)
    num_samples = waveform.shape[1]  # shape[1] is used as the sample count

    # Exact number of video frames represented by one audio sample.
    frames_per_sample = Fraction(target_fps) / Fraction(sample_rate)
    num_frames = math.floor(num_samples * frames_per_sample)

    remainder = num_frames % 16
    if remainder > 0:
        pad_frames = 16 - remainder
        # Round up so the padded audio never falls short of the target frame count.
        pad_samples = math.ceil(pad_frames / frames_per_sample)
        pad_waveform = torch.zeros((waveform.shape[0], pad_samples), dtype=waveform.dtype)  # Silence padding
        waveform = torch.cat((waveform, pad_waveform), dim=1)

        # Save the padded audio
        padded_audio_path = "padded_audio.wav"
        torchaudio.save(padded_audio_path, waveform, sample_rate)
        padded_num_frames = num_frames + pad_frames
    else:
        padded_audio_path = audio_path  # No padding needed
        padded_num_frames = num_frames

    return padded_audio_path, padded_num_frames



def create_video_from_image(image_path, output_video_path, num_frames, fps=25):
    """Write a still video made of ``num_frames`` copies of one image.

    Args:
        image_path: Path of the image read with ``cv2.imread``.
        output_video_path: Destination of the ``mp4v``-encoded video.
        num_frames: Number of identical frames to write; must be positive.
        fps: Frame rate of the written video.

    Returns:
        ``output_video_path`` on success, or ``None`` when the image cannot be
        read, ``num_frames`` is not positive, or the video writer cannot be
        opened.
    """
    img = cv2.imread(image_path)
    if img is None:
        print("Error: Unable to read the image.")
        return None

    if num_frames <= 0:
        print("Error: The number of frames must be positive.")
        return None

    height, width = img.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # A writer that failed to open silently drops every frame, so check explicitly.
    if not video_writer.isOpened():
        video_writer.release()
        print(f"Error: Unable to open {output_video_path} for writing.")
        return None

    try:
        for _ in range(num_frames):
            video_writer.write(img)
    finally:
        # Release even if a write raises so the file handle is not leaked.
        video_writer.release()

    print(f"Created video {output_video_path} with {num_frames} frames ({num_frames / fps:.2f} seconds).")
    return output_video_path


def on_run_button_click(change):
    """Button callback for the image-to-video UI.

    Saves the uploaded audio and image, pads the audio, builds a still video
    from the image, runs lip-sync inference, converts the result to the
    requested FPS and displays it. All output is routed to ``output_display``.

    Each failure of an intermediate step (unreadable image, video creation or
    FPS conversion returning ``None``) aborts with a message instead of
    passing ``None`` on to the next step. Every temporary file created here is
    removed in ``finally``, including on the early-return paths.

    Args:
        change: The widget passed by ``Button.on_click``; unused.
    """
    with output_display:
        output_display.clear_output()

        # Validate uploads
        if not audio_upload.value or not image_upload.value:
            print("Please upload both an image and an audio file.")
            return

        # Paths registered here are deleted in the `finally` block below.
        temp_paths = []
        try:
            # Process audio
            audio_file_info = next(iter(audio_upload.value.values()))
            uploaded_audio_path = audio_file_info.get('name', 'uploaded_audio.wav')
            with open(uploaded_audio_path, "wb") as f:
                f.write(audio_file_info['content'])
            temp_paths.append(uploaded_audio_path)

            # Get audio duration with padding. The padded file may be a new
            # path, so both it and the raw upload are tracked for cleanup.
            audio_path, num_frames = pad_audio_to_multiple_of_16(uploaded_audio_path, target_fps=25)
            temp_paths.append(audio_path)

            # Process image and create video
            image_file_info = next(iter(image_upload.value.values()))
            image_path = image_file_info.get('name', 'uploaded_image.png')
            with open(image_path, "wb") as f:
                f.write(image_file_info['content'])
            temp_paths.append(image_path)

            img = cv2.imread(image_path)
            if img is None:
                print("Error: Could not read the image file.")
                return

            height, width = img.shape[:2]
            video_path = create_video_from_image(image_path, "generated_video.mp4", num_frames)
            if video_path is None:
                print("Error: Could not create the input video from the image.")
                return
            temp_paths.append(video_path)

            print("Running inference...")
            output_path = "output_video.mp4"
            perform_inference(video_path, audio_path, seed_input.value, num_steps_input.value, guidance_scale_input.value, output_path)

            output_path = convert_video_fps(output_path, output_fps_input.value)
            if output_path is None:
                print("Error: Inference did not produce a usable output video.")
                return

            from IPython.display import Video, display
            print("Inference complete. Displaying output video:")
            display(Video(output_path, embed=True, width=int(width * video_scale_input.value), height=int(height * video_scale_input.value)))

        finally:
            torch.cuda.empty_cache()
            for path in set(temp_paths):
                if path and os.path.exists(path):
                    os.remove(path)

# Hook the callback up to the button.
run_button.on_click(on_run_button_click)

# Display the UI: uploads first, then the settings, then the button and output area.
widgets_box = widgets.VBox(
    children=[
        image_upload,
        audio_upload,
        seed_input,
        num_steps_input,
        guidance_scale_input,
        video_scale_input,
        output_fps_input,
        run_button,
        output_display,
    ]
)
display(widgets_box)


VBox(children=(FileUpload(value={}, accept='image/*', description='Upload Image'), FileUpload(value={}, accept…

**RUN VIDEO TO VIDEO**

In [None]:
# @title

import ipywidgets as widgets
import torch
import torchaudio
import subprocess
import os
import ffmpeg


def convert_video_fps(input_path, target_fps):
    """Re-encode a video with libx264 at ``target_fps`` frames per second.

    An ``ffprobe`` call checks whether the input has an audio stream: if it
    does, audio is re-encoded as AAC at 192k, otherwise the output is written
    without audio.

    Args:
        input_path: Path of the video to convert.
        target_fps: Frame rate handed to ffmpeg's ``fps`` filter.

    Returns:
        ``"converted_{target_fps}fps.mp4"`` on success, or ``None`` when the
        input file is missing or empty.

    Raises:
        subprocess.CalledProcessError: if the ffmpeg invocation fails.
    """
    input_usable = os.path.exists(input_path) and os.path.getsize(input_path) != 0
    if not input_usable:
        print(f"Error: The video file {input_path} is missing or empty.")
        return None

    output_path = f"converted_{target_fps}fps.mp4"

    # ffprobe prints stream information only when an audio stream exists.
    probe_result = subprocess.run(
        ["ffprobe", "-i", input_path, "-show_streams", "-select_streams", "a", "-loglevel", "error"],
        capture_output=True,
        text=True,
    )
    if probe_result.stdout.strip() != "":
        audio_args = ["-c:a", "aac", "-b:a", "192k"]
    else:
        audio_args = ["-an"]

    ffmpeg_cmd = ["ffmpeg", "-y", "-i", input_path]
    ffmpeg_cmd += ["-filter:v", f"fps={target_fps}"]
    ffmpeg_cmd += ["-c:v", "libx264", "-preset", "fast", "-crf", "18"]
    ffmpeg_cmd += audio_args
    ffmpeg_cmd += [output_path]

    subprocess.run(ffmpeg_cmd, check=True)
    print(f"Converted video saved as {output_path}")
    return output_path


def trim_video(video_path, target_duration):
    """Cut a video down to its first ``target_duration`` seconds.

    The video is re-encoded with libx264; if the input has an audio stream it
    is re-encoded as AAC.

    ffmpeg-python forwards unknown keyword arguments verbatim as ``-key value``
    options, so the codecs are given as ``vcodec`` / ``acodec``. The previous
    ``audio_codec`` keyword produced ``-audio_codec``, which ffmpeg does not
    recognise, and ``codec`` applied the video encoder to every stream.

    Args:
        video_path: Path of the video to trim.
        target_duration: End time in seconds, passed to ffmpeg's ``-to``.

    Returns:
        ``"trimmed_video.mp4"`` on success. On any failure (missing/empty
        input, probe error, ffmpeg error) the original ``video_path`` is
        returned unchanged.
    """
    if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
        print(f"Error: The video file {video_path} is missing or empty.")
        return video_path

    try:
        probe = ffmpeg.probe(video_path, v='error', select_streams='a:0', show_entries='stream=codec_type')
        audio_present = any(stream['codec_type'] == 'audio' for stream in probe['streams'])
    except ffmpeg.Error as e:
        print(f"Error while probing video: {e}")
        return video_path

    trimmed_video_path = "trimmed_video.mp4"
    output_options = {"vcodec": "libx264"}
    if audio_present:
        output_options["acodec"] = "aac"

    try:
        (
            ffmpeg
            .input(video_path, ss=0, to=target_duration)
            .output(trimmed_video_path, **output_options)
            # Overwrite a leftover file from an earlier run instead of letting
            # ffmpeg stop at its "file exists" prompt.
            .run(overwrite_output=True)
        )
        print("Video trimmed")
    except ffmpeg.Error as e:
        print(f"Error during video trimming: {e}")
        return video_path

    return trimmed_video_path


def has_audio(video_path):
    """Return True when ffprobe reports at least one audio stream.

    A probe failure (``ffmpeg.Error``) is treated as "no audio" and yields
    False.
    """
    try:
        audio_streams = ffmpeg.probe(
            video_path,
            v='error',
            select_streams='a',
            show_entries='stream=index',
        )['streams']
    except ffmpeg.Error:
        return False
    return len(audio_streams) > 0

def extend_video(video_path, target_duration):
    """Lengthen a video to at least ``target_duration`` seconds.

    The clip is extended by alternately appending a reversed copy of the
    previous segment until the accumulated duration reaches the target, then
    all segments are concatenated into ``"extended_video.mp4"``.

    Fixes relative to the earlier implementation:
      * With audio, the concat filter needs the streams interleaved as
        (video, audio) per segment and has two outputs; passing whole inputs
        made it miscount the segments.
      * Codecs are passed as ``vcodec`` / ``acodec``; ``audio_codec`` is not
        an ffmpeg option.
      * stderr is captured so the error message can actually be printed.
      * Temporary reversed clips are removed on failure too, and the caller's
        ``video_path`` is never deleted even if a reversal fell back to it.

    Args:
        video_path: Path of the video to extend.
        target_duration: Minimum duration, in seconds, of the result.

    Returns:
        ``"extended_video.mp4"`` on success, otherwise the original
        ``video_path``.
    """
    if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
        print(f"Error: The video file {video_path} is missing or empty.")
        return video_path

    audio_exists = has_audio(video_path)

    try:
        probe = ffmpeg.probe(video_path, v='error', select_streams='v:0', show_entries='format=duration')
        original_duration = float(probe['format']['duration'])
    except ffmpeg.Error as e:
        detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
        print(f"Error: Unable to fetch video duration: {detail}")
        return video_path

    if original_duration <= 0:
        print("Error: Invalid video duration!")
        return video_path

    print("Extending video...")

    clips = [video_path]
    total_duration = original_duration
    extensions = 0

    # Each pass appends the reverse of the last segment (forward, backward, forward, ...).
    while total_duration < target_duration:
        extensions += 1
        clips.append(reverse_video(clips[-1], audio_exists))
        total_duration += original_duration

    print(f"The video was extended {extensions} time(s)")

    extended_video_path = "extended_video.mp4"

    try:
        if audio_exists:
            # concat expects v0, a0, v1, a1, ... and yields one video and one audio output.
            streams = []
            for clip in clips:
                source = ffmpeg.input(clip)
                streams.extend([source.video, source.audio])
            joined = ffmpeg.concat(*streams, v=1, a=1).node
            output = ffmpeg.output(joined[0], joined[1], extended_video_path, vcodec="libx264", acodec="aac", format="mp4")
        else:
            sources = [ffmpeg.input(clip) for clip in clips]
            output = ffmpeg.concat(*sources, v=1, a=0).output(extended_video_path, vcodec="libx264", format="mp4")

        output.run(overwrite_output=True, capture_stderr=True)
    except ffmpeg.Error as e:
        detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
        print(f"Error during video concatenation: {detail}")
        return video_path
    finally:
        # reverse_video returns its input on failure, so the list may contain
        # video_path again; exclude it so the caller's file survives.
        for clip in set(clips) - {video_path}:
            if os.path.exists(clip):
                os.remove(clip)

    return extended_video_path


def reverse_video(video_path, audio_exists):
    """Write a time-reversed copy of a video next to the working directory.

    The output is named ``reversed_<basename of video_path>``. Video is
    reversed with the ``reverse`` filter and, when ``audio_exists`` is true,
    audio with ``areverse``.

    stderr is captured so the failure message can be shown; previously
    ``e.stderr`` was ``None`` (nothing captured) and the error handler itself
    raised ``AttributeError`` on ``.decode()``.

    Args:
        video_path: Path of the video to reverse.
        audio_exists: Whether the input has an audio stream to reverse too.

    Returns:
        Path of the reversed file, or ``video_path`` unchanged if ffmpeg fails.
    """
    reversed_video_path = f"reversed_{os.path.basename(video_path)}"

    filter_options = {"vf": "reverse"}
    if audio_exists:
        filter_options["af"] = "areverse"

    try:
        (
            ffmpeg
            .input(video_path)
            .output(reversed_video_path, **filter_options)
            .run(overwrite_output=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
        print(f"Error during video reversal: {detail}")
        return video_path

    return reversed_video_path


def get_video_duration(video_path):
    """Return the container duration of a video in seconds.

    The value is read from the ``format.duration`` field of the ffprobe
    result. If probing raises ``ffmpeg.Error`` a message is printed and 0 is
    returned.
    """
    try:
        info = ffmpeg.probe(
            video_path,
            v='error',
            select_streams='v:0',
            show_entries='format=duration',
        )
    except ffmpeg.Error as e:
        print(f"Error: Unable to fetch video duration for {video_path}: {e}")
        return 0

    return float(info['format']['duration'])


def pad_audio_to_multiple_of_16(audio_path, target_fps=25):
    """Append silence so the audio spans a multiple of 16 video frames.

    The frame count is computed with exact rational arithmetic. The previous
    float computation (``int(duration * fps)``) could land just below an
    integer when the audio ended exactly on a frame boundary, which made the
    count one too small and the padded result not a multiple of 16.

    Args:
        audio_path: Path of the audio file to load with torchaudio.
        target_fps: Video frame rate used to convert samples to frames.

    Returns:
        ``(padded_audio_path, num_frames, duration_seconds)``. The path is
        ``"padded_audio.wav"`` when padding was written, otherwise the
        original ``audio_path``. ``num_frames`` is a multiple of 16 whenever
        padding was added; ``duration_seconds`` is the length of the
        (possibly padded) waveform.
    """
    import math
    from fractions import Fraction

    waveform, sample_rate = torchaudio.load(audio_path)
    num_samples = waveform.shape[1]  # shape[1] is used as the sample count

    # Exact number of video frames represented by one audio sample.
    frames_per_sample = Fraction(target_fps) / Fraction(sample_rate)
    num_frames = math.floor(num_samples * frames_per_sample)
    remainder = num_frames % 16

    if remainder > 0:
        pad_frames = 16 - remainder
        # Round up so the padded audio never falls short of the target frame count.
        pad_samples = math.ceil(pad_frames / frames_per_sample)
        pad_waveform = torch.zeros((waveform.shape[0], pad_samples), dtype=waveform.dtype)
        waveform = torch.cat((waveform, pad_waveform), dim=1)
        padded_audio_path = "padded_audio.wav"
        torchaudio.save(padded_audio_path, waveform, sample_rate)
        padded_num_frames = num_frames + pad_frames
    else:
        padded_audio_path = audio_path
        padded_num_frames = num_frames

    return padded_audio_path, padded_num_frames, waveform.shape[1] / sample_rate

# --- Input widgets for the video-to-video UI -------------------------------
video_upload = widgets.FileUpload(
    accept=".mp4",
    multiple=False,
    description="Upload Video",
)
audio_upload = widgets.FileUpload(
    accept=".wav,.mp3,.aac,.flac",
    multiple=False,
    description="Upload Audio",
)

# --- Inference / output settings ---------------------------------------------
seed_input = widgets.IntText(value=1247, description="Seed:")
num_steps_input = widgets.IntSlider(
    value=20, min=1, max=100, step=1, description="Steps:"
)
guidance_scale_input = widgets.FloatSlider(
    value=1.0, min=0.1, max=10.0, step=0.1, description="Guidance Scale:"
)
video_scale_input = widgets.FloatSlider(
    value=0.5, min=0.1, max=1.0, step=0.1, description="Video Scale:"
)
output_fps_input = widgets.IntSlider(
    value=25, min=6, max=60, step=1, description="Output FPS:"
)

# Frame size of the uploaded video; filled in by the button callback.
width, height = 0, 0

# --- Trigger button and the output area the callback prints into --------------
run_button = widgets.Button(description="Run Inference")
output_display = widgets.Output()

def on_run_button_click(change):
    """Button callback for the video-to-video UI.

    Saves the uploads, converts the video to 25 FPS, pads the audio, extends
    or trims the video to the audio length, runs lip-sync inference, converts
    the result to the requested FPS and displays it. All output is routed to
    ``output_display``.

    Differences from the earlier implementation:
      * The frame size is re-read on every run. It used to be cached in the
        module-level ``width`` / ``height`` after the first run, so a second
        upload with a different size was displayed with stale dimensions.
      * A ``None`` result from ``convert_video_fps`` aborts with a message
        instead of flowing into later steps.
      * Every intermediate file is tracked and removed in ``finally``; the
        final converted output is kept.

    Args:
        change: The widget passed by ``Button.on_click``; unused.
    """
    with output_display:
        output_display.clear_output()

        if not video_upload.value or not audio_upload.value:
            print("Please upload both video and audio files.")
            return

        global width, height

        temp_paths = []      # intermediates deleted in `finally`
        result_path = None   # final output, never deleted
        try:
            video_file_info = next(iter(video_upload.value.values()))
            uploaded_video_path = "uploaded_video.mp4"
            with open(uploaded_video_path, "wb") as f:
                f.write(video_file_info['content'])
            temp_paths.append(uploaded_video_path)

            # Always measure the current upload; 0 x 0 means "unknown" and makes
            # the display below fall back to the player's default size.
            print("Setting output video's width & height.")
            import cv2
            width, height = 0, 0
            cap = cv2.VideoCapture(uploaded_video_path)
            try:
                if cap.isOpened():
                    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                else:
                    print("Error: Unable to open video file.")
            finally:
                cap.release()

            audio_file_info = next(iter(audio_upload.value.values()))
            uploaded_audio_path = "uploaded_audio.mp3"
            with open(uploaded_audio_path, "wb") as f:
                f.write(audio_file_info['content'])
            temp_paths.append(uploaded_audio_path)

            video_path = convert_video_fps(uploaded_video_path, 25)
            if video_path is None:
                print("Error: Could not convert the uploaded video to 25 FPS.")
                return
            temp_paths.append(video_path)

            audio_path, _, audio_duration = pad_audio_to_multiple_of_16(uploaded_audio_path, target_fps=25)
            temp_paths.append(audio_path)

            video_duration = get_video_duration(video_path)

            if audio_duration > video_duration:
                video_path = extend_video(video_path, audio_duration)
                temp_paths.append(video_path)
                video_duration = get_video_duration(video_path)
                # Extension works in whole-clip steps, so it can overshoot.
                if video_duration > audio_duration:
                    video_path = trim_video(video_path, audio_duration)
                    temp_paths.append(video_path)

            elif video_duration > audio_duration:
                video_path = trim_video(video_path, audio_duration)
                temp_paths.append(video_path)

            print("Running inference...")
            output_path = "output_video.mp4"
            perform_inference(video_path, audio_path, seed_input.value, num_steps_input.value, guidance_scale_input.value, output_path)

            result_path = convert_video_fps(output_path, output_fps_input.value)
            if result_path is None:
                print("Error: Inference did not produce a usable output video.")
                return

            print("Inference complete. Displaying output video:")
            from IPython.display import Video, display
            if width <= 0:
                display(Video(result_path, embed=True))
            else:
                display(Video(result_path, embed=True, width=int(width * video_scale_input.value), height=int(height * video_scale_input.value)))

            # print("Download output video")
            # files.download(result_path)

        finally:
            torch.cuda.empty_cache()
            # The final output can share its name with the 25 FPS intermediate
            # (both are "converted_25fps.mp4" when the output FPS is 25), so it
            # is excluded explicitly.
            for path in set(temp_paths):
                if path and path != result_path and os.path.exists(path):
                    os.remove(path)

# Hook the callback up to the button.
run_button.on_click(on_run_button_click)

# Display the UI: uploads first, then the settings, then the button and output area.
widgets_box = widgets.VBox(
    children=[
        video_upload,
        audio_upload,
        seed_input,
        num_steps_input,
        guidance_scale_input,
        video_scale_input,
        output_fps_input,
        run_button,
        output_display,
    ]
)
display(widgets_box)




VBox(children=(FileUpload(value={}, accept='.mp4', description='Upload Video'), FileUpload(value={}, accept='.…