# Music Video Synthesis
* Extract lyrics from song with timestamps
* Compose scenes, include timestamps
* Construct video text prompt for each scene
* Build videos for each scene
* Stitch together

# We will use OpenAI Whisper for stability

In [None]:
#!pip install --quiet --upgrade pip
#!pip install --quiet --upgrade openai-whisper
# Ubuntu or Debian
#!sudo apt update && sudo apt install ffmpeg
#!pip install setuptools-rust

In [2]:
import cv2
import diffusers
import gc
import imageio
import imageio_ffmpeg
import json
import math
import moviepy.editor as mp
import numpy as np
import os
import random
import tempfile
import time
import transformers
import torch
import whisper

from datetime import datetime, timedelta
from diffusers import AutoencoderKL, AutoencoderKLCogVideoX, AutoPipelineForText2Image, CogVideoXTransformer3DModel, CogVideoXPipeline, CogVideoXDPMScheduler
from diffusers import CogVideoXTransformer3DModel, CogVideoXImageToVideoPipeline
from diffusers.image_processor import VaeImageProcessor
from diffusers.models.transformers.transformer_flux import FluxTransformer2DModel
from diffusers.pipelines.flux.pipeline_flux import FluxPipeline
from diffusers.utils import export_to_video, load_video, load_image
from huggingface_hub import hf_hub_download, snapshot_download
from openai import OpenAI
from optimum.quanto import freeze, qfloat8, quantize, requantize
from PIL import Image
from safetensors.torch import load_file as load_safetensors
from sd_embed.embedding_funcs import get_weighted_text_embeddings_flux1
from torchao.quantization import quantize_, int8_weight_only, int8_dynamic_activation_int8_weight
from diffusers.image_processor import VaeImageProcessor
from transformers import CLIPTextModel, T5EncoderModel

# Precision passed to the model loaders below (e.g. as torch_dtype / .to(dtype)).
dtype = torch.bfloat16
# Largest signed 32-bit integer; upper bound when a random seed is drawn.
MAX_SEED = np.iinfo(np.int32).max
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Copied into CONFIG["retry_limit"], which bounds the OpenAI retry loop.
retry_limit = 3

2024-10-09 21:02:45.757601: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-09 21:02:45.838527: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-10-09 21:02:45.862794: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-10-09 21:02:46.036550: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# Configuration
# Secrets are read from the environment so they never have to be pasted into
# (and accidentally committed with) the notebook. Export OPENAI_API_KEY and
# HF_TOKEN before starting the kernel; both fall back to an empty string.
CONFIG = {
    "openai_api_key": os.environ.get("OPENAI_API_KEY", ""),
    "openai_model": "gpt-4o-mini",
    "openai_model_large": "gpt-4o",
    "hf_token": os.environ.get("HF_TOKEN", ""),
    # Per-song sub-directories are created under these two base directories.
    "base_working_dir": "./images",
    "base_video_dir": "./output",
    "audio_files": [
        "//mnt/d/audio/VampireLament.mp3",
        "//mnt/d/audio/BeethovenGhost.mp3",
        "//mnt/d/audio/VampirePriest.mp3",
        "//mnt/d/audio/PhantomRasta.mp3",
        "//mnt/d/audio/LonesomeGhost.mp3",
        "//mnt/d/audio/ChupacabraTease.mp3",
        # Add more audio file paths here
    ],
    "device": device,
    "dtype": dtype,
    "retry_limit": retry_limit,
    "MAX_SEED": MAX_SEED,
}

# Ensure base directories exist
os.makedirs(CONFIG["base_working_dir"], exist_ok=True)
os.makedirs(CONFIG["base_video_dir"], exist_ok=True)

In [4]:
def get_openai_prompt_response(
    prompt: str,
    config: dict,
    max_tokens: int = 6000,
    temperature: float = 0.33,
    openai_model: str = "",
):
    """
    Sends a prompt to OpenAI's chat completion API and returns the reply text.

    The request itself is retried: every attempt that raises is reported and,
    until the retry limit is reached, repeated after a one second pause.

    Args:
        prompt: User message sent to the model.
        config: Must provide "openai_api_key", "openai_model" and "retry_limit".
        max_tokens: Completion token limit forwarded to the API.
        temperature: Sampling temperature forwarded to the API.
        openai_model: Model name; falls back to config["openai_model"] when empty.

    Returns:
        The content of the first choice, or "" when every attempt failed or
        the reply carried no content.
    """
    client = OpenAI(api_key=config["openai_api_key"])
    messages = [
        {
            "role": "system",
            "content": """Act as a helpful assistant, you are an expert editor.""",
        },
        {"role": "user", "content": prompt},
    ]

    # Always make at least one attempt, even if the configured limit is 0.
    max_attempts = max(1, config["retry_limit"])
    retry_count = 0
    while retry_count < max_attempts:
        try:
            # The API call lives inside the try block so that network, rate
            # limit and server errors are actually retried.
            response = client.chat.completions.create(
                max_tokens=max_tokens,
                messages=messages,
                model=openai_model or config["openai_model"],
                temperature=temperature,
            )
            message_content = response.choices[0].message.content
            # Callers treat the result as a string, so never hand back None.
            return message_content if message_content is not None else ""
        except Exception as e:
            print(f"Error occurred: {e}")
            retry_count += 1
            if retry_count == max_attempts:
                print("Retry limit reached. Moving to the next iteration.")
                return ""
            print(f"Retrying... (Attempt {retry_count}/{max_attempts})")
            time.sleep(1)  # Brief pause before the next attempt

    return ""


def load_quanto_transformer(repo_path):
    """
    Builds the quantized Flux transformer stored in a Hugging Face repo.

    Downloads the quantization map, model config and weights from the
    "transformer/" folder of `repo_path`, instantiates the model on the meta
    device (no memory is allocated for the weights at that point) and lets
    optimum-quanto's `requantize` materialize the weights on CUDA.

    Args:
        repo_path: Hugging Face repo id holding the quantized transformer.

    Returns:
        The requantized FluxTransformer2DModel.
    """
    with open(hf_hub_download(repo_path, "transformer/quantization_map.json"), "r") as f:
        quantization_map = json.load(f)

    # Passing a file path straight to from_config() is deprecated in diffusers
    # ("config-passed-as-path"); load the config dict first instead.
    config_path = hf_hub_download(repo_path, "transformer/config.json")
    transformer_config = diffusers.FluxTransformer2DModel.load_config(config_path)

    with torch.device("meta"):
        transformer = diffusers.FluxTransformer2DModel.from_config(transformer_config).to(dtype)

    state_dict = load_safetensors(hf_hub_download(repo_path, "transformer/diffusion_pytorch_model.safetensors"))
    requantize(transformer, state_dict, quantization_map, device=torch.device("cuda"))
    return transformer


def load_quanto_text_encoder_2_longer(repo_path, max_length=512):
    """
    Builds the quantized T5 text encoder from the "text_encoder_2/" folder of a repo.

    The downloaded T5 config has its `max_position_embeddings` overwritten with
    `max_length` before the encoder is instantiated on the meta device and
    requantized onto CUDA.

    Args:
        repo_path: Hugging Face repo id holding the quantized text encoder.
        max_length: Value written to the config's max_position_embeddings.

    Returns:
        The requantized T5EncoderModel.
    """
    map_path = hf_hub_download(repo_path, "text_encoder_2/quantization_map.json")
    with open(map_path, "r") as map_file:
        qmap = json.load(map_file)

    config_path = hf_hub_download(repo_path, "text_encoder_2/config.json")
    with open(config_path) as config_file:
        config_kwargs = json.load(config_file)

    encoder_config = transformers.T5Config(**config_kwargs)
    # Override the sequence length stored in the downloaded config.
    encoder_config.max_position_embeddings = max_length

    with torch.device("meta"):
        encoder = transformers.T5EncoderModel(encoder_config).to(dtype)

    weights_path = hf_hub_download(repo_path, "text_encoder_2/model.safetensors")
    weights = load_safetensors(weights_path)
    requantize(encoder, weights, qmap, device=torch.device("cuda"))

    return encoder


def load_flux_pipe():
    """
    Assembles the Flux text-to-image pipeline used to render scene images.

    The pipeline skeleton is loaded without its transformer and second text
    encoder; both are then replaced by the quantized versions built by
    load_quanto_transformer and load_quanto_text_encoder_2_longer. The CLIP
    text encoder comes from a separate repo.

    Returns:
        The pipeline, moved to CUDA with attention slicing enabled.
    """
    flux_repo = "Disty0/FLUX.1-dev-qint8"

    clip_encoder = CLIPTextModel.from_pretrained("zer0int/CLIP-GmP-ViT-L-14", torch_dtype=dtype)

    # Skip the heavy components here; they are attached right below.
    pipeline = AutoPipelineForText2Image.from_pretrained(
        flux_repo,
        text_encoder=clip_encoder,
        transformer=None,
        text_encoder_2=None,
        torch_dtype=dtype,
    )

    pipeline.transformer = load_quanto_transformer(flux_repo)
    pipeline.text_encoder_2 = load_quanto_text_encoder_2_longer(flux_repo, max_length=512)

    pipeline = pipeline.to("cuda", dtype=dtype)

    # Trade a little speed for a lower peak memory footprint.
    pipeline.enable_attention_slicing()

    return pipeline


def gen_flux_image(pipe, prompt, config: dict, height=1024, width=1024, guidance_scale=3.5, num_inference_steps=32, max_sequence_length=512, seed=-1):
    """
    Renders a single image for `prompt` with the Flux pipeline.

    The prompt is turned into weighted embeddings first, so the pipeline is
    called with embeddings rather than raw text. A seed of -1 requests a
    random seed in [0, MAX_SEED]. `config` is accepted but not read here.

    Returns:
        The first generated image (output_type "pil").
    """
    chosen_seed = random.randint(0, MAX_SEED) if seed == -1 else seed

    prompt_embeds, pooled_prompt_embeds = get_weighted_text_embeddings_flux1(
        pipe=pipe,
        prompt=prompt,
    )

    call_kwargs = dict(
        prompt_embeds=prompt_embeds,
        pooled_prompt_embeds=pooled_prompt_embeds,
        height=height,
        width=width,
        guidance_scale=guidance_scale,
        output_type="pil",
        num_inference_steps=num_inference_steps,
        max_sequence_length=max_sequence_length,
        # A CPU generator is seeded with the chosen value.
        generator=torch.Generator("cpu").manual_seed(chosen_seed),
    )
    result = pipe(**call_kwargs)
    return result.images[0]


def load_video_pipeline():
    """
    Loads and configures the video generation pipeline.

    A CogVideoX text-to-video pipeline is assembled first, but only so that its
    VAE, scheduler, tokenizer and text encoder can be handed to the
    image-to-video pipeline that is actually returned.

    Returns:
        A CogVideoXImageToVideoPipeline with sequential CPU offload, VAE
        slicing and VAE tiling enabled.
    """
    # torchao int8 weight-only quantization, applied in place by quantize_().
    quantization = int8_weight_only

    text_encoder = T5EncoderModel.from_pretrained("THUDM/CogVideoX-5b", subfolder="text_encoder", torch_dtype=torch.bfloat16)
    quantize_(text_encoder, quantization())
    
    # Text-to-video transformer: only used to build the temporary pipeline below.
    transformer = CogVideoXTransformer3DModel.from_pretrained("THUDM/CogVideoX-5b", subfolder="transformer", torch_dtype=torch.bfloat16)
    quantize_(transformer, quantization())
    
    # Image-to-video transformer used by the returned pipeline.
    # NOTE(review): unlike the other components this one is not quantized - confirm this is intended.
    i2v_transformer = CogVideoXTransformer3DModel.from_pretrained(
        "THUDM/CogVideoX-5b-I2V", subfolder="transformer", torch_dtype=torch.bfloat16
    )
    
    vae = AutoencoderKLCogVideoX.from_pretrained("THUDM/CogVideoX-5b", subfolder="vae", torch_dtype=torch.bfloat16)
    quantize_(vae, quantization())
    
    # Temporary text-to-video pipeline; it supplies the tokenizer and scheduler
    # and is deleted again once its components have been picked out.
    pipe = CogVideoXPipeline.from_pretrained(
        "THUDM/CogVideoX-5b",
        text_encoder=text_encoder,
        transformer=transformer,
        vae=vae,
        torch_dtype=torch.bfloat16,
    )
    #pipe.enable_model_cpu_offload()
    pipe.vae.enable_tiling()
    
    # Swap in the DPM scheduler, reusing the existing scheduler config.
    pipe.scheduler = CogVideoXDPMScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing")
    
    # Keep references to the shared components before dropping the pipeline.
    i2v_vae=pipe.vae
    i2v_scheduler=pipe.scheduler
    i2v_tokenizer=pipe.tokenizer
    i2v_text_encoder=pipe.text_encoder
    
    # NOTE(review): the local `transformer` still references the text-to-video
    # transformer until this function returns, so this presumably does not free it yet - verify.
    del pipe
    gc.collect()
    
    # Image-to-video pipeline assembled from the shared components above.
    pipe_image = CogVideoXImageToVideoPipeline.from_pretrained(
        "THUDM/CogVideoX-5b-I2V",
        transformer=i2v_transformer,
        vae=i2v_vae,
        scheduler=i2v_scheduler,
        tokenizer=i2v_tokenizer,
        text_encoder=i2v_text_encoder,
        torch_dtype=torch.bfloat16,
    ).to(device)

    # NOTE(review): the pipeline is moved to `device` above and sequential CPU
    # offload is enabled right after - confirm both steps are intended together.
    pipe_image.enable_sequential_cpu_offload()
    pipe_image.vae.enable_slicing()
    pipe_image.vae.enable_tiling()

    return pipe_image


def infer(pipe_image, prompt: str, image_input: str, config: dict, num_inference_steps: int = 50, guidance_scale: float = 7.0, seed: int = -1, num_frames: int = 49):
    """
    Runs the image-to-video pipeline for one starting image and prompt.

    The image at `image_input` is resized to 720x480 before it is passed to the
    pipeline. A seed of -1 requests a random seed in [0, 255]. `config` is
    accepted but not read here.

    Returns:
        A tuple (frames, seed): the pipeline's `.frames` output (requested with
        output_type "pt") and the seed that was used.
    """
    used_seed = random.randint(0, 255) if seed == -1 else seed

    # Open the file as a PIL image and bring it to the 720x480 input size.
    resized_image = Image.open(image_input).resize(size=(720, 480))
    start_image = load_image(resized_image)

    pipeline_output = pipe_image(
        image=start_image,
        prompt=prompt,
        num_inference_steps=num_inference_steps,
        num_videos_per_prompt=1,
        use_dynamic_cfg=True,
        output_type="pt",
        guidance_scale=guidance_scale,
        generator=torch.Generator(device="cpu").manual_seed(used_seed),
        num_frames=num_frames,
    )

    return pipeline_output.frames, used_seed


def generate_video(pipe_image, prompt, image_input, config: dict, seed_value: int = -1, video_filename: str = "", num_frames: int = 49):
    """
    Generates one video clip from an image and a prompt and writes it to disk.

    Inference always uses 50 steps and a guidance scale of 7.0. Each entry of
    the returned batch is converted to a list of PIL frames; only the first
    entry is saved, at a frame rate of ceil((frame_count - 1) / 6).

    Returns:
        The path returned by save_video (i.e. `video_filename`).
    """
    frames_batch, _used_seed = infer(
        pipe_image,
        prompt,
        image_input,
        config,
        num_inference_steps=50,
        guidance_scale=7.0,
        seed=seed_value,
        num_frames=num_frames,
    )

    # One list of PIL images per batch entry.
    pil_clips = [
        VaeImageProcessor.numpy_to_pil(
            VaeImageProcessor.pt_to_numpy(torch.stack(list(clip_tensor)))
        )
        for clip_tensor in frames_batch
    ]

    first_clip = pil_clips[0]
    frame_rate = math.ceil((len(first_clip) - 1) / 6)
    return save_video(first_clip, fps=frame_rate, filename=video_filename)


def save_video(frames, fps: int, filename: str):
    """
    Saves a list of frames as a video file.

    The frames are encoded into a temporary .mp4 first and only moved to
    `filename` once encoding has finished, so a failed run never leaves a
    half-written file at the destination.

    Args:
        frames: Iterable of frames; each one is converted with np.array().
        fps: Frame rate of the written video.
        filename: Destination path of the finished video.

    Returns:
        `filename`.
    """
    import shutil  # local import: only needed for the cross-filesystem move

    # Reserve a temp path and close the handle right away so the writer is the
    # only one holding the file open (required on some platforms).
    temp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    temp_video_path = temp_file.name
    temp_file.close()

    try:
        writer = imageio.get_writer(temp_video_path, fps=fps)
        try:
            for frame in frames:
                writer.append_data(np.array(frame))
        finally:
            # Always finalize the encoder, even if a frame could not be written.
            writer.close()

        # os.rename() fails when the temp directory and the destination live on
        # different filesystems; shutil.move() falls back to copy + delete.
        shutil.move(temp_video_path, filename)
    finally:
        # After a successful move the temp path no longer exists; after a
        # failure this removes the partial file.
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)

    return filename


def convert_to_gif(video_path: str) -> str:
    """
    Converts a video file to a GIF (8 fps, 240 px high) stored next to it.

    Args:
        video_path: Path of the source video.

    Returns:
        Path of the written GIF: the source path with its extension replaced
        by ".gif".
    """
    # Replace only the file extension. str.replace(".mp4", ".gif") would also
    # rewrite ".mp4" inside directory names and, for other extensions, would
    # leave the path unchanged so the GIF overwrote the source video.
    gif_path = os.path.splitext(video_path)[0] + ".gif"

    source_clip = mp.VideoFileClip(video_path)
    try:
        clip = source_clip.set_fps(8)
        clip = clip.resize(height=240)
        clip.write_gif(gif_path, fps=8)
    finally:
        # Release the reader held by the clip.
        source_clip.close()
    return gif_path


def resize_if_unfit(input_video: str) -> str:
    """
    Returns a 720x480 version of the video.

    When the video already measures 720x480 its own path is returned;
    otherwise the path produced by center_crop_resize is returned.
    """
    width, height = get_video_dimensions(input_video)

    needs_resize = not (width == 720 and height == 480)
    if needs_resize:
        return center_crop_resize(input_video)
    return input_video


def get_video_dimensions(input_video_path: str) -> tuple:
    """
    Retrieves the dimensions of the video.

    Args:
        input_video_path: Path of the video to inspect.

    Returns:
        The "size" entry of the metadata reported by imageio_ffmpeg.
    """
    reader = imageio_ffmpeg.read_frames(input_video_path)
    try:
        # The first item produced by the generator is the metadata dict.
        metadata = next(reader)
    finally:
        # Close the generator explicitly instead of relying on garbage
        # collection to shut the frame reader down.
        reader.close()
    return metadata["size"]


def center_crop_resize(input_video_path: str, target_width: int = 720, target_height: int = 480) -> str:
    """
    Resizes and center-crops the video to the target dimensions.

    The video is scaled so that it covers the target size, cropped around its
    center, sub-sampled towards 8 fps (skipping at most 5 frames between kept
    frames) and limited to 49 frames. The result is written to a temporary
    .mp4 file.

    Args:
        input_video_path: Path of the source video.
        target_width: Width of the output video in pixels.
        target_height: Height of the output video in pixels.

    Returns:
        Path of the temporary .mp4 file holding the processed video.

    Raises:
        ValueError: If the video cannot be opened or reports no dimensions.
    """
    target_fps = 8
    max_frames = 49

    cap = cv2.VideoCapture(input_video_path)
    try:
        orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if not cap.isOpened() or orig_width <= 0 or orig_height <= 0:
            # Fail with a clear message instead of a ZeroDivisionError below.
            raise ValueError(f"Could not read video dimensions from '{input_video_path}'.")

        orig_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Scale by the larger factor so both dimensions cover the target.
        resize_factor = max(target_width / orig_width, target_height / orig_height)

        # Round instead of truncating and never go below the target size:
        # float error (e.g. 479.99999) must not yield a frame that is one pixel
        # too small for the crop and for the fixed-size video writer.
        inter_width = max(target_width, round(orig_width * resize_factor))
        inter_height = max(target_height, round(orig_height * resize_factor))

        # Crop offsets are the same for every frame.
        start_x = (inter_width - target_width) // 2
        start_y = (inter_height - target_height) // 2

        ideal_skip = max(0, math.ceil(orig_fps / target_fps) - 1)
        skip = min(5, ideal_skip)  # Cap at 5

        # Skip fewer frames when the video is too short to yield max_frames.
        while (total_frames / (skip + 1)) < max_frames and skip > 0:
            skip -= 1

        processed_frames = []
        total_read = 0

        while len(processed_frames) < max_frames and total_read < total_frames:
            ret, frame = cap.read()
            if not ret:
                break

            if total_read % (skip + 1) == 0:
                resized = cv2.resize(frame, (inter_width, inter_height), interpolation=cv2.INTER_AREA)
                cropped = resized[start_y:start_y + target_height, start_x:start_x + target_width]
                processed_frames.append(cropped)

            total_read += 1
    finally:
        # Release the capture even when reading or resizing fails.
        cap.release()

    # Reserve a temp path and close the handle before the writer opens it.
    temp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    temp_video_path = temp_file.name
    temp_file.close()

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(temp_video_path, fourcc, target_fps, (target_width, target_height))
    try:
        for frame in processed_frames:
            out.write(frame)
    finally:
        out.release()

    return temp_video_path


def extract_last_frame(video_filename: str, output_image_filename: str):
    """
    Extracts the last frame from a video file and saves it as an image.

    This is a best-effort helper: failures are printed, not raised, so the
    caller cannot tell from the return value whether the image was written.

    Args:
        video_filename: Path of the video to read.
        output_image_filename: Path the last frame is written to.
    """
    try:
        reader = imageio.get_reader(video_filename, 'ffmpeg')
        last_frame = None
        try:
            # Walk through every frame and keep the most recent one.
            for frame in reader:
                last_frame = frame
        finally:
            # Close the reader even if decoding fails part-way through.
            reader.close()

        if last_frame is not None:
            imageio.imwrite(output_image_filename, last_frame)
            print(f"Last frame saved successfully as '{output_image_filename}'.")
        else:
            print("The video contains no frames.")

    except FileNotFoundError:
        print(f"Error: The file '{video_filename}' was not found.")
    except ValueError as ve:
        print(f"ValueError: {ve}")
    except RuntimeError as re:
        print(f"RuntimeError: {re}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


def create_scenes(text: str, video_summary: str, config: dict):
    """
    Creates scenes based on the extracted lyrics using OpenAI's API.

    Args:
        text: Lyrics, one "Start: <seconds>, Text: <line>" entry per line.
        video_summary: Summary of the video theme; when non-empty it is added
            to the prompt so the scenes can stay consistent with it.
        config: Configuration forwarded to get_openai_prompt_response; must
            provide "openai_model".

    Returns:
        The parsed JSON reply. The model is asked for a list of objects with
        the fields start, text, scene_description and action_sequence.

    Raises:
        json.JSONDecodeError: If the reply does not contain valid JSON.
    """
    # The prompt asks for scenes "consistent with the video theme", so the
    # theme has to be part of the prompt for that instruction to mean anything.
    theme_line = f"The video theme is: {video_summary}\n" if video_summary else ""

    # Generate scenes JSON
    prompt = f'''Create a json list of scenes (groupings of text), scene_description (200 words of less), and action_sequence (30 words or less) from the following text.  Scenes should be groups of similar lyrics with new scenes when the context changes.  Text: {text}   
The json list should have the start value for the first item in the scene and the text that is combined for all items in the same scene.  
The scene_description should include details such as attire, setting, mood, lighting, and any significant movements or expressions, painting a clear visual scene consistent with the video theme and different from other scenes.
The action_sequence should describe the action in the scene.  Scenes should be unique, creative, imaginative, and awe inspiring create a viral hit animation.
{theme_line}Return only the json list, less jargon. The json list fields should be: start, text, scene_description, action_sequence'''

    result = get_openai_prompt_response(prompt, config, openai_model=config["openai_model"])

    # Keep only the outermost JSON list. This drops Markdown code fences, a
    # "json" language tag and any surrounding chatter without touching the
    # content of the scenes themselves.
    payload = (result or "").strip()
    list_start = payload.find("[")
    list_end = payload.rfind("]")
    if list_start != -1 and list_end > list_start:
        payload = payload[list_start:list_end + 1]

    # strict=False tolerates raw line breaks inside string values.
    scenes = json.loads(payload, strict=False)
    return scenes


def process_audio_scenes(audio_file: str, config: dict):
    """
    Transcribes a single audio file and turns its lyrics into scenes.

    Steps: create per-song output directories, transcribe the audio with
    Whisper, ask OpenAI for a video summary, then ask it for the scene list.

    Args:
        audio_file: Path of the audio file to process.
        config: Must provide "base_working_dir", "base_video_dir" and the keys
            needed by get_openai_prompt_response.

    Returns:
        A tuple (scenes, audio_images_dir, audio_videos_dir). `scenes` is empty
        when nothing was transcribed. When the last scene is a dict it gets an
        extra "end" key holding the end time of the final transcribed segment,
        which process_audio_video uses to size the last scene.
    """
    # Create unique identifier based on audio file name
    audio_basename = os.path.splitext(os.path.basename(audio_file))[0]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    unique_id = f"{audio_basename}_{timestamp}"

    # Create unique directories for images and videos
    print(f"Create unique directories for images and videos")
    audio_images_dir = os.path.join(config["base_working_dir"], unique_id)
    audio_videos_dir = os.path.join(config["base_video_dir"], unique_id)
    os.makedirs(audio_images_dir, exist_ok=True)
    os.makedirs(audio_videos_dir, exist_ok=True)

    # Step 1: Transcribe audio using Whisper
    print(f"Transcribe audio using Whisper")
    model = whisper.load_model("turbo")
    result = model.transcribe(audio_file)

    # Cleanup Whisper model memory
    del model
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()

    segments = result['segments']
    if not segments:
        # Nothing was transcribed, so there is nothing to build scenes from
        # (and segments[-1] below would raise).
        print("No lyrics were transcribed; skipping scene generation.")
        return [], audio_images_dir, audio_videos_dir

    # One "Start: ..., Text: ..." line per transcribed segment
    text = "".join(
        f"Start: {segment['start']}, Text: {segment['text'].strip()}\n"
        for segment in segments
    )

    last_end_value = segments[-1]['end']

    # Step 2: Generate video summary using OpenAI
    print(f"Generate video summary using OpenAI")
    video_summary_prompt = f'Create a short summary that describes a music video based on these lyrics: {text}'
    video_summary = get_openai_prompt_response(video_summary_prompt, config, openai_model=config["openai_model"])

    # Step 3: Create scenes based on lyrics
    print(f"Create scenes based on lyrics")
    scenes = create_scenes(text, video_summary, config)

    # Hand the end of the audio to the video step through the scene list; the
    # return value keeps its three-element shape for existing callers.
    if scenes and isinstance(scenes[-1], dict):
        scenes[-1]["end"] = last_end_value

    return scenes, audio_images_dir, audio_videos_dir

def process_audio_images(config: dict, scenes, audio_images_dir):
    """
    Generates the starting image of every scene with the Flux pipeline.

    Images are rendered at 720x480 from each scene's "scene_description" and
    saved as image_01.jpg, image_02.jpg, ... in `audio_images_dir`.

    Args:
        config: Configuration forwarded to gen_flux_image.
        scenes: List of scene dicts providing "scene_description".
        audio_images_dir: Directory the images are written to.
    """
    if not scenes:
        # Avoid loading the (large) pipeline when there is nothing to render.
        print("No scenes to render; skipping image generation.")
        return

    # Step 4: Load Flux pipeline and generate images
    print(f"Load Flux pipeline and generate images")
    flux_pipe = load_flux_pipe()
    height = 480
    width = 720
    guidance_scale = 3.9
    num_inference_steps = 24
    max_sequence_length = 512
    seed = -1  # -1 draws a fresh random seed for every image

    # Generate images for each scene
    for image_num, scene in enumerate(scenes, start=1):
        image_prompt = scene['scene_description']
        image = gen_flux_image(flux_pipe, image_prompt, config, height, width, guidance_scale, num_inference_steps, max_sequence_length, seed)
        filename = f"image_{str(image_num).zfill(2)}.jpg"
        image_path = os.path.join(audio_images_dir, filename)
        image.save(image_path, dpi=(300, 300))

    return

def process_audio_video(config: dict, scenes, audio_images_dir, audio_videos_dir, last_end_value=None):
    """
    Generates the video clips of every scene with the image-to-video pipeline.

    Each scene is covered by clips that are treated as 6 seconds long. The
    first clip starts from the scene image written by process_audio_images;
    every following clip of the same scene starts from the last frame of the
    previous clip.

    Args:
        config: Configuration forwarded to generate_video.
        scenes: List of scene dicts providing "start" and "action_sequence".
        audio_images_dir: Directory holding image_01.jpg, image_02.jpg, ...
        audio_videos_dir: Directory the clips are written to.
        last_end_value: End time in seconds of the last scene. When omitted,
            the "end" key of the last scene is used; when that is missing as
            well, the last scene gets a single clip.
    """
    if not scenes:
        # Avoid loading the (large) pipeline when there is nothing to render.
        print("No scenes to render; skipping video generation.")
        return

    segment_seconds = 6

    # The end of the last scene cannot be derived from a following scene.
    if last_end_value is None and isinstance(scenes[-1], dict):
        last_end_value = scenes[-1].get("end")

    # Step 6: Load Video Pipeline
    print(f"Load Video Pipeline")
    video_pipe = load_video_pipeline()

    # Temporary image path
    temp_image = os.path.join(audio_images_dir, "temp_image.jpg")
    video_num = 1

    # Step 7: Generate video sequences
    for i, scene in enumerate(scenes):
        prompt = scene["action_sequence"]

        # Use the initial image for each scene
        image_input = os.path.join(audio_images_dir, f"image_{str(i+1).zfill(2)}.jpg")

        # Start times come from model-written JSON and may be numeric strings.
        scene_start = float(scene["start"])

        # Calculate duration to keep the video in 6-second increments
        if i + 1 < len(scenes):
            next_start_time = float(scenes[i + 1]["start"])
        elif last_end_value is not None:
            next_start_time = float(last_end_value)  # Use the final ending time for the last scene
        else:
            next_start_time = scene_start + segment_seconds  # Unknown end: one clip

        duration = next_start_time - scene_start
        # Round to the nearest whole number of clips; scenes shorter than half
        # a clip get none.
        num_video_segments = int((duration + segment_seconds / 2) // segment_seconds)

        print(f"Scene {i} has {num_video_segments} segments")
        for j in range(num_video_segments):
            video_name = f"video_{str(video_num).zfill(2)}_{str(j).zfill(2)}.mp4"
            video_output_path = os.path.join(audio_videos_dir, video_name)
            generate_video(video_pipe, prompt, image_input, config, seed_value=-1, video_filename=video_output_path)
            time.sleep(1)  # Pause for 1 second

            # After generating the video, extract the last frame to use as input for the next segment
            extract_last_frame(video_output_path, temp_image)

            # Use the last frame as input for the next video segment in the same scene
            image_input = temp_image

            video_num += 1  # Increment video number for the next segment

    return


def process_all_audios(audio_files: list, config: dict):
    """
    Runs the complete workflow for every audio file, one after another.

    Per file: build the scenes, render the scene images, then render the scene
    videos. Cached GPU memory is released and garbage is collected after each
    of the three stages.
    """
    def _release_memory():
        # Free cached CUDA memory (when available), then collect garbage.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    for audio_path in audio_files:
        print(f"Processing audio file: {audio_path}")

        # Stage 1: transcription and scene creation
        scenes, images_dir, videos_dir = process_audio_scenes(audio_path, config)
        _release_memory()

        # Stage 2: starting image for every scene
        process_audio_images(config, scenes, images_dir)
        _release_memory()

        # Stage 3: video clips for every scene
        process_audio_video(config, scenes, images_dir, videos_dir)
        _release_memory()


In [None]:
# Run the whole workflow (scenes, images, videos) for every audio file
# listed in CONFIG["audio_files"].
process_all_audios(CONFIG["audio_files"], CONFIG)

Processing audio file: //mnt/d/audio/VampireLament.mp3
Create unique directories for images and videos
Transcribe audio using Whisper


  checkpoint = torch.load(fp, map_location=device)



Generate video summary using OpenAI
Create scenes based on lyrics
Load Flux pipeline and generate images


Loading pipeline components...:   0%|          | 0/5 [00:00<?, ?it/s]

You set `add_prefix_space`. The tokenizer needs to be converted from the slow tokenizers
  deprecate("config-passed-as-path", "1.0.0", deprecation_message, standard_warn=False)

