# Notebook 3: Lip Syncing

In this notebook, we will combine the video we generated in Notebook 1 and the speech clip we generated in Notebook 2 into a video where the speaker's lips follow the text naturally. For this, we will use [LatentSync](https://arxiv.org/abs/2412.09262), a recent diffusion-based lip-syncing model. The high-level intuition for how LatentSync is trained is as follows:
- In **Step 1**, the model learns to inpaint and reconstruct masked video frames using reference images (visual feature learning).
- In **Step 2**, the model learns the audio-visual correlation (lip syncing) with SyncNet and other losses, focusing on syncing lips to audio.

[SyncNet](https://www.robots.ox.ac.uk/~vgg/publications/2016/Chung16a/chung16a.pdf) is an older model (from 2016) that quantifies how well the lip movements in a video match the audio. In lip-sync generation models, SyncNet can be used as a supervisory signal. In LatentSync, it is used both as a loss function and as a metric for lip-sync quality.
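To make the idea of a sync metric concrete, here is a minimal sketch. It is not the SyncNet or LatentSync implementation: it assumes we already have one embedding per video frame and one per matching audio window (the toy vectors below are made up), and all function names are hypothetical. The sketch slides the audio against the video, picks the offset with the smallest mean distance, and uses the gap between the median and the minimum distance as a confidence score, in the spirit of the evaluation described in the SyncNet paper.

```python
import math
from statistics import median


def euclidean(a, b):
    """Euclidean distance between two equally long vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def mean_distance_at_offset(video_emb, audio_emb, offset):
    """Mean distance when video step i is paired with audio step i + offset."""
    pairs = [
        (video_emb[i], audio_emb[i + offset])
        for i in range(len(video_emb))
        if 0 <= i + offset < len(audio_emb)
    ]
    return sum(euclidean(v, a) for v, a in pairs) / len(pairs)


def estimate_sync(video_emb, audio_emb, max_offset=2):
    """Return (best_offset, confidence) over offsets in [-max_offset, max_offset]."""
    offsets = list(range(-max_offset, max_offset + 1))
    dists = [mean_distance_at_offset(video_emb, audio_emb, o) for o in offsets]
    best = min(range(len(offsets)), key=lambda k: dists[k])
    # A clear minimum far below the median means a confident sync estimate.
    confidence = median(dists) - dists[best]
    return offsets[best], confidence


# Toy data (made up): the audio embeddings are the video embeddings delayed by one step.
video_emb = [[0.0, 1.0], [1.0, 0.0], [0.0, -1.0], [-1.0, 0.0], [0.5, 0.5], [1.0, 1.0]]
audio_emb = [[9.0, 9.0]] + video_emb[:-1]

best_offset, confidence = estimate_sync(video_emb, audio_emb)
print(f"best offset: {best_offset}, confidence: {confidence:.3f}")
```

A well-synced clip should give a best offset near zero with a high confidence, while a poorly synced one gives a flat distance curve and a low confidence.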

This notebook uses several dependencies that we will install first.

In [1]:
%%capture
!git clone https://huggingface.co/spaces/fffiloni/LatentSync
!cd LatentSync && pip install -r requirements.txt
!pip install -U accelerate==0.32.0

**IMPORTANT**: Make sure to restart the session after installing the dependencies (Go to `Runtime` $\to$ `Restart Session`).

In [1]:
%cd LatentSync

/content/LatentSync


In [2]:
%%capture
import os
from huggingface_hub import snapshot_download
os.makedirs("checkpoints", exist_ok=True)
snapshot_download(
    repo_id = "chunyu-li/LatentSync",
    local_dir = "./checkpoints"
)

LatentSync uses a pipeline very similar to those in Diffusers. In fact, it is designed to be compatible with the Diffusers API. If you want, you can again try to identify the important components of the LatentSync pipeline by cross-referencing the paper. You will recognise some familiar components, such as the UNet and the noise scheduler from Notebook 1. If you don't understand every component yet, don't worry. We will discuss them in the lecture.

In [3]:
import gradio as gr
import spaces
import sys
import shutil
import uuid
import subprocess
from glob import glob
import tempfile
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
import argparse
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature




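LatentSync works with videos up to 10 seconds, so we first define some helper functions that trim the video (to at most 10 seconds) and the audio (to at most 8 seconds) if they are too long. Note that `main` below does not call these helpers automatically; apply them to your files first if they exceed these limits.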

In [4]:
def process_video(input_video_path, temp_dir="temp_dir"):
    """
    Crop a given MP4 video to a maximum duration of 10 seconds if it is longer than 10 seconds.
    Save the new video in the specified folder (default is temp_dir).

    Args:
        input_video_path (str): Path to the input video file.
        temp_dir (str): Directory where the processed video will be saved.

    Returns:
        str: Path to the cropped video file.
    """
    # Ensure the temp_dir exists
    os.makedirs(temp_dir, exist_ok=True)

    # Load the video
    video = VideoFileClip(input_video_path)

    # Determine the output path
    input_file_name = os.path.basename(input_video_path)
    output_video_path = os.path.join(temp_dir, f"cropped_{input_file_name}")

    # Crop the video to 10 seconds if necessary
    if video.duration > 10:
        video = video.subclip(0, 10)

    # Write the cropped video to the output path
    video.write_videofile(output_video_path, codec="libx264", audio_codec="aac")

    # Release the file handles held by MoviePy
    video.close()

    # Return the path to the cropped video
    return output_video_path

def process_audio(file_path, temp_dir):
    # Load the audio file
    audio = AudioSegment.from_file(file_path)

    # Check and cut the audio if longer than 8 seconds
    max_duration = 8 * 1000  # 8 seconds in milliseconds
    if len(audio) > max_duration:
        audio = audio[:max_duration]

    # Save the processed audio in the temporary directory
    output_path = os.path.join(temp_dir, "trimmed_audio.wav")
    audio.export(output_path, format="wav")

    # Return the path to the trimmed file
    print(f"Processed audio saved at: {output_path}")
    return output_path



Our main function realises the LatentSync inference process. Let's look at it step by step:

During inference (generation of new videos):
1.	Input:
    - A reference frame (for identity and pose). This is the unprocessed video that we generated in Notebook 1.
    - A masked frame (current video frame with the mouth region masked). This is done internally. We only pass the original video.
    - The driving audio. This is what we generated in Notebook 2.
2.	Audio Embedding:
    - The audio is processed to extract audio embeddings.
    - The embeddings for a window of audio frames around the current frame are bundled together to provide temporal context.
3.	Diffusion Process: We will explain this in the lecture, but on a high level:
    - The reference frame, masked frame, and noised latent variables are concatenated and fed into a U-Net architecture (as in stable diffusion).
    - Audio embeddings are integrated via cross-attention layers in the U-Net.
    - The diffusion model denoises the latent variables step by step, conditioned on both audio and visual features, to generate the target latent representation.
4.	Decoding:
    - The predicted latent is decoded (via a VAE decoder) to obtain the reconstructed video frame with lips synced to audio.
5.	Temporal Consistency:
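    - LatentSync is trained with TREPA (Temporal REPresentation Alignment), a loss that aligns the temporal features of the generated sequence with those of real video. No separate post-processing step is needed at inference; the trained model already produces fewer artifacts such as flickering.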
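The audio windowing in step 2 can be sketched in a few lines. This is an illustration under stated assumptions, not the LatentSync code: the function name `audio_window` and the `context` value are hypothetical, and the 2:1 ratio between audio feature steps and video frames is inferred from the log line `video in 25 FPS, audio idx in 50FPS` in the output further down. For each video frame, the sketch collects the audio feature vectors in a fixed-size window around it and zero-pads positions that fall outside the clip.

```python
def audio_window(features, frame_idx, video_fps=25, audio_fps=50, context=2):
    """Return the audio feature vectors around one video frame.

    features: list of audio feature vectors, one per 1/audio_fps seconds.
    context:  video frames of audio context on each side (assumed value).
    Positions outside the clip are zero-padded, so every window has the
    same length: (2 * context + 1) * (audio_fps // video_fps).
    """
    steps_per_frame = audio_fps // video_fps
    center = frame_idx * steps_per_frame
    left = center - context * steps_per_frame
    right = center + (context + 1) * steps_per_frame
    zero = [0.0] * len(features[0])
    return [
        features[i] if 0 <= i < len(features) else zero
        for i in range(left, right)
    ]


# Toy features (made up): 20 one-dimensional vectors [1.0], [2.0], ..., [20.0].
features = [[float(i + 1)] for i in range(20)]

first = audio_window(features, frame_idx=0)   # window starts before the clip
middle = audio_window(features, frame_idx=5)  # window lies fully inside the clip
print(len(first), first[:5])
print(len(middle), middle[0], middle[-1])
```

Keeping every window the same length means the windows can be stacked into one tensor and passed to the cross-attention layers in step 3.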

In [5]:
def main(video_path, audio_path, progress=gr.Progress(track_tqdm=True)):
    """
    Perform lip-sync video generation using an input video and a separate audio track.

    This function takes an input video (usually a person speaking) and an audio file,
    and synchronizes the video frames so that the lips of the speaker match the audio content.
    It uses a latent diffusion model-based pipeline (LatentSync) for audio-conditioned lip synchronization.

    Args:
        video_path (str): File path to the input video in MP4 format.
        audio_path (str): File path to the input audio file (e.g., WAV or MP3).
        progress (gr.Progress, optional): Gradio progress tracker for UI feedback (auto-injected).

    Returns:
        str: File path to the generated output video with lip synchronization applied.
    """

    inference_ckpt_path = "checkpoints/latentsync_unet.pt"
    unet_config_path = "configs/unet/second_stage.yaml"
    config = OmegaConf.load(unet_config_path)

    print(f"Input video path: {video_path}")
    print(f"Input audio path: {audio_path}")
    print(f"Loaded checkpoint path: {inference_ckpt_path}")

    scheduler = DDIMScheduler.from_pretrained("configs")

    if config.model.cross_attention_dim == 768:
        whisper_model_path = "checkpoints/whisper/small.pt"
    elif config.model.cross_attention_dim == 384:
        whisper_model_path = "checkpoints/whisper/tiny.pt"
    else:
        raise NotImplementedError("cross_attention_dim must be 768 or 384")

    audio_encoder = Audio2Feature(model_path=whisper_model_path, device="cuda", num_frames=config.data.num_frames)

    vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=torch.float16)
    vae.config.scaling_factor = 0.18215
    vae.config.shift_factor = 0

    unet, _ = UNet3DConditionModel.from_pretrained(
        OmegaConf.to_container(config.model),
        inference_ckpt_path,  # load checkpoint
        device="cpu",
    )

    unet = unet.to(dtype=torch.float16)

    # Optional: enable xformers memory-efficient attention (disabled here)
    # if is_xformers_available():
    #     unet.enable_xformers_memory_efficient_attention()

    pipeline = LipsyncPipeline(
        vae=vae,
        audio_encoder=audio_encoder,
        unet=unet,
        scheduler=scheduler,
    ).to("cuda")

    seed = -1
    if seed != -1:
        set_seed(seed)
    else:
        torch.seed()

    print(f"Initial seed: {torch.initial_seed()}")

    unique_id = str(uuid.uuid4())
    video_out_path = f"video_out{unique_id}.mp4"

    pipeline(
        video_path=video_path,
        audio_path=audio_path,
        video_out_path=video_out_path,
        video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
        num_frames=config.data.num_frames,
        num_inference_steps=config.run.inference_steps,
        guidance_scale=1.0,
        weight_dtype=torch.float16,
        width=config.data.resolution,
        height=config.data.resolution,
    )
    return video_out_path

To run the pipeline, upload the video and audio files that we generated in Notebooks 1 and 2 and pass their paths as arguments to the `main` function.

In [7]:
main("/content/video.mp4", "/content/audio.wav")

Input video path: /content/video.mp4
Input audio path: /content/audio.wav
Loaded checkpoint path: checkpoints/latentsync_unet.pt
Initial seed: 12538211817369556066
Affine transforming 75 faces...


100%|██████████| 75/75 [00:15<00:00,  4.94steps/s]


video in 25 FPS, audio idx in 50FPS


Doing inference...:   0%|          | 0/4 [00:00<?, ?steps/s]

Sample frames: 16:   0%|          | 0/20 [00:00<?, ?steps/s]

Doing inference...:  25%|██▌       | 1/4 [00:09<00:27,  9.29s/steps]

Sample frames: 16:   0%|          | 0/20 [00:00<?, ?steps/s]

Doing inference...:  50%|█████     | 2/4 [00:10<00:09,  4.59s/steps]

Sample frames: 16:   0%|          | 0/20 [00:00<?, ?steps/s]

Doing inference...:  75%|███████▌  | 3/4 [00:11<00:03,  3.09s/steps]

Sample frames: 16:   0%|          | 0/20 [00:00<?, ?steps/s]

Doing inference...: 100%|██████████| 4/4 [00:13<00:00,  3.29s/steps]


'video_outa24ccc3e-8f22-4308-b902-3733704973da.mp4'

Note that the finished video can be found inside the `LatentSync` directory. You can download and watch it.
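The numbers in the log above can be cross-checked with a little arithmetic. The sketch below assumes that the 75 detected faces correspond to 75 video frames, that 16 is the number of frames per chunk (taken from `Sample frames: 16`), and that only whole chunks are processed. The last assumption is a guess that happens to match the 4 iterations of `Doing inference...`; the audio length may also limit the chunk count. The function name `summarize_run` is hypothetical.

```python
def summarize_run(num_frames, video_fps, frames_per_chunk):
    """Return (duration in seconds, number of full chunks, leftover frames)."""
    duration_s = num_frames / video_fps
    full_chunks = num_frames // frames_per_chunk
    leftover = num_frames % frames_per_chunk
    return duration_s, full_chunks, leftover


# Values read from the log: 75 faces, 25 FPS, 16 frames per chunk.
duration_s, full_chunks, leftover = summarize_run(75, 25, 16)
print(f"duration: {duration_s} s, full chunks: {full_chunks}, leftover frames: {leftover}")
```

Under these assumptions, a 3-second clip yields 4 full chunks with 11 frames left over, so the last fraction of a second may not be re-synthesised.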