# Lyrical Video Generator
## Overview of the Lyrical Video Generator Notebook

This Jupyter Notebook automates the creation of lyric videos from audio files. It transcribes the audio with Whisper, generates one image per time segment with Stable Diffusion, and uses MoviePy to assemble the text overlays, images, and audio into a synchronized video in which every second of audio has a matching visual.

### **Key Features**

- **Audio File Input**: Users can specify or upload audio files to be processed, with predefined paths for ease of use.
- **Audio Transcription with Coverage**: Implements English transcription using the Whisper model, ensuring every segment of audio is covered with text or placeholders for instrumental sections.
- **Dynamic Visual Segmentation**: Creates time-based segments to guarantee visual content for every second of audio, preventing gaps in the output video.
- **AI-Generated Visuals**: Utilizes Stable Diffusion to generate varied and dynamic images for each segment, enhancing visual appeal with diverse styles and color themes.
- **Continuous Video Assembly**: Combines text overlays, generated images, and audio into a seamless video using MoviePy, with robust error handling for uninterrupted playback.
- **Real-Time Processing Feedback**: Logs each step of the process, from transcription to video rendering, providing transparency and progress updates.
- **Output Visualization**: Displays the final lyric video within the notebook or saves it to a designated output directory for review and sharing.

### **Technologies Used**

- **PyTorch**: Runs the Whisper and Stable Diffusion models. Transcription falls back to the CPU when no GPU is available; image generation, as written, requires a CUDA device.
- **Whisper**: Transcribes audio to text, with the language forced to English to avoid language-detection errors.
- **Stable Diffusion (Diffusers)**: Generates one image per visual segment, rotating prompts, styles, and color themes for variety.
- **MoviePy**: Handles video and audio processing and composites the text overlays onto the generated images.
- **FFmpeg**: Cuts the audio into fixed-length 16 kHz WAV chunks for transcription.
- **PIL (Python Imaging Library)**: Renders the lyric text as transparent overlay images.
- **tqdm**: Provides progress bars for long-running tasks, improving user experience during processing.

This notebook is ideal for musicians, content creators, and developers who want to create professional lyric videos effortlessly or adapt the code for custom multimedia projects.


### **Pre-trained Models**

The notebook relies on pre-trained models for transcription and image generation. Follow the instructions below to download and set up the necessary models:

- **Whisper Model**: The "base" model is used for audio transcription. The `whisper` package is expected to be installed with the other dependencies from `requirements.txt`, and `whisper.load_model("base")` downloads the model weights the first time it runs.
- **Stable Diffusion Model**: The "runwayml/stable-diffusion-v1-5" model is used for image generation. It is loaded through the `diffusers` library and cached in `./working/model_cache`.


### **Create and Activate the Virtual Environment**

1. Open a terminal within the Jupyter environment (File -> New -> Terminal). The commands below assume a bash-compatible shell.
2. Navigate to the project directory where the notebook is located.
3. Execute the following commands to create and activate a virtual environment:

```bash
export PROJECT_NAME="Lyrical-Video-Generator"
export PIP_CACHE_DIR=`pwd`/.cache/pip
mkdir -p $PIP_CACHE_DIR
python -m venv --system-site-packages myvenv
source myvenv/bin/activate
pip install ipykernel
python -m ipykernel install --user --name=${PROJECT_NAME}-myvenv --display-name="Python (${PROJECT_NAME}-myvenv)"
echo ""; echo "Before continuing load the created Python kernel: Python (${PROJECT_NAME}-myvenv)"
```
The CoreAI environment does not ship with every library this notebook needs, so the missing ones are installed next.

## Install Required Libraries

Before running the following cells in the Jupyter Notebook, make sure you are in the directory that contains both the notebook and the virtual environment, so that relative `./` paths resolve correctly. Use `cd` to change to the project directory and `pwd` to verify the current directory.

Load the `Python (Lyrical-Video-Generator-myvenv)` kernel before running the cell below.

In [1]:
import os
def set_env_with_cache_dir(env_var_name: str, subdir: str):
    base_cache = os.path.join(os.getcwd(), ".cache")
    full_path = os.path.join(base_cache, subdir)
    os.environ[env_var_name] = full_path
    os.makedirs(full_path, exist_ok=True)

set_env_with_cache_dir("PIP_CACHE_DIR", "pip")

In [None]:
!. ./myvenv/bin/activate; pip install -r requirements.txt
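
After the install cell finishes, it can help to confirm that the selected kernel actually sees the packages. The following is a minimal sketch, not part of the original notebook: `missing_packages` is a hypothetical helper, and the list of import names is an assumption taken from the imports used later in this notebook.

```python
import importlib.util


def missing_packages(module_names):
    """Return the top-level module names that cannot be imported in this kernel."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# Assumed import names, based on the import cell of this notebook.
required = ["torch", "whisper", "diffusers", "moviepy", "tqdm", "PIL", "numpy"]
missing = missing_packages(required)
if missing:
    print(f"Missing packages: {', '.join(missing)}")
else:
    print("All required packages are importable")
```

If anything is reported as missing, the most likely causes are that the wrong kernel is selected or that the install cell ran outside the project directory.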

## Module Imports and Initial Setup
This section imports all the essential Python modules and sets up the project's directory structure. It includes imports for AI models, video and image processing, and utility functions. By defining paths for input, output, and temporary files, it organizes resources for the entire workflow. Suppressing warnings and configuring environment variables also help keep notebook output clean and focused.

In [None]:
import sys
import subprocess
import os
import gc
import re
import numpy as np
from pathlib import Path
import warnings
import shutil
import torch
import whisper
from diffusers import StableDiffusionPipeline

# Handle MoviePy imports with fallback for compatibility
try:
    from moviepy import (
        VideoFileClip, AudioFileClip, ImageClip,
        CompositeVideoClip, TextClip, ColorClip,
        concatenate_videoclips
    )
    print("MoviePy imported successfully")
except ImportError:
    try:
        from moviepy.editor import (
            VideoFileClip, AudioFileClip, ImageClip,
            CompositeVideoClip, TextClip, ColorClip,
            concatenate_videoclips
        )
        print("MoviePy editor imported successfully")
    except ImportError as e:
        print(f"MoviePy import failed: {e}")
        sys.exit(1)

from tqdm.auto import tqdm
from PIL import Image, ImageDraw, ImageFont

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")

# Define directory paths for input, output, and temporary storage
INPUT_DIRECTORY = Path("./input")
OUTPUT_DIRECTORY = Path("./working")
TEMPORARY_DIRECTORY = Path("./temp")
MODEL_STORAGE_DIRECTORY = Path("./working/model_cache")

# Set environment variable for ImageMagick binary
os.environ["IMAGEMAGICK_BINARY"] = "convert"


In [4]:
# Place your input file in the folder of this notebook and update the variable below
DEFAULT_AUDIO_TRACK = "./input.mp3"

## Utility Functions for File Management
Here, helper functions are defined to manage temporary files and calculate audio durations. These utilities automate cleanup, reclaim disk space, and prevent clutter from intermediate files. They also provide robust error handling for file operations. This ensures efficient resource management throughout the pipeline.

In [5]:
def clear_temporary_files(file_pattern=None):
    """
    Delete temporary files to reclaim disk space.
    
    Args:
        file_pattern (str, optional): Glob pattern to match specific temporary files.
                                      If None, clears entire temporary directory.
    """
    try:
        if file_pattern:
            for file_path in Path(TEMPORARY_DIRECTORY).glob(file_pattern):
                os.remove(file_path)
        else:
            shutil.rmtree(TEMPORARY_DIRECTORY, ignore_errors=True)
            os.makedirs(TEMPORARY_DIRECTORY, exist_ok=True)
        gc.collect()
    except Exception as e:
        print(f"Error clearing temporary files: {e}")

# Initialize necessary directories
os.makedirs(TEMPORARY_DIRECTORY, exist_ok=True)
os.makedirs(MODEL_STORAGE_DIRECTORY, exist_ok=True)

def calculate_audio_length(audio_file_path):
    """
    Calculate the duration of an audio file in seconds.
    
    Args:
        audio_file_path (str): Path to the audio file.
    
    Returns:
        float: Duration of the audio in seconds, or None if an error occurs.
    """
    try:
        audio_clip = AudioFileClip(audio_file_path)
        duration = audio_clip.duration
        audio_clip.close()
        return duration
    except Exception as e:
        print(f"Error calculating audio duration: {e}")
        return None
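
To illustrate how pattern-based cleanup behaves, here is a small self-contained sketch that works in a throwaway directory instead of `./temp`. The helper `remove_matching` and the file names are hypothetical; they only mimic the kind of intermediate files this notebook produces.

```python
import tempfile
from pathlib import Path


def remove_matching(directory, pattern):
    """Delete files in `directory` that match a glob pattern; return the count removed."""
    removed = 0
    # Materialize the matches first so deletion does not interfere with iteration.
    for path in list(Path(directory).glob(pattern)):
        if path.is_file():
            path.unlink()
            removed += 1
    return removed


with tempfile.TemporaryDirectory() as scratch:
    for name in ("segment_0.wav", "segment_10.wav", "visual_image_000.jpg"):
        (Path(scratch) / name).touch()
    removed_count = remove_matching(scratch, "segment_*.wav")
    remaining = sorted(p.name for p in Path(scratch).iterdir())

print(removed_count, remaining)
```

Only the files matching the pattern are removed, so generated images can be kept while audio chunks are cleared.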


## Audio Transcription with Full Coverage
This section transcribes the input audio into timed text with OpenAI Whisper, processing the audio in fixed-duration chunks (10 seconds by default). Every chunk yields either text or a placeholder (`♪ ♫ ♪`) for instrumental or silent sections, so the whole track is covered. To reduce errors and hallucinations, a segment is dropped if it is very short, repeats earlier text within the same chunk, consists of only a few distinct characters, contains non-ASCII characters, or is mostly non-alphabetic. The output is a list of words or phrases with timestamps for later video synchronization.
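
The per-segment checks used in the transcription cell can be isolated as a small predicate, which makes them easier to test on their own. This is a sketch under the assumption that the thresholds match the cell in this section; `is_plausible_lyric` is a hypothetical name and does not appear in the notebook.

```python
def is_plausible_lyric(text, seen_texts):
    """Return True if a transcribed segment passes the heuristic quality checks."""
    text = text.strip()
    if len(text) < 2:
        return False  # empty or single-character output
    lowered = text.lower()
    if lowered in seen_texts:
        return False  # exact repeat of earlier text
    if len(text) > 10 and len(set(lowered.replace(" ", ""))) < 4:
        return False  # long text built from very few distinct characters
    if any(ord(ch) > 127 for ch in text):
        return False  # non-ASCII characters are treated as non-English
    alpha_count = sum(ch.isalpha() for ch in text)
    if len(text) > 5 and alpha_count / len(text) < 0.5:
        return False  # mostly digits or punctuation
    return True


for sample in ["Hello world", "la la la la la la", "1234 5678 90"]:
    print(repr(sample), is_plausible_lyric(sample, set()))
```

Note that the non-ASCII check also rejects English text that contains typographic apostrophes or accented loanwords, which is a trade-off of this heuristic.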

In [6]:
def transcribe_audio_ensured_coverage(audio_file_path, segment_length=10):
    """
    Transcribe audio into text segments with forced English language detection,
    ensuring full coverage by processing in fixed-duration chunks.
    
    Args:
        audio_file_path (str): Path to the input audio file.
        segment_length (int): Duration of each audio chunk in seconds.
    
    Returns:
        dict: Dictionary containing transcribed chunks with timestamps.
    """
    print(f"Transcribing audio with forced English in {segment_length}-second segments...")
    
    audio_duration = calculate_audio_length(audio_file_path)
    if not audio_duration:
        return {"chunks": []}
    
    print(f"Total audio duration: {audio_duration:.2f} seconds")
    
    device = "cuda" if torch.cuda.is_available() else "cpu"
    transcription_model = whisper.load_model("base", device=device)
    transcribed_segments = []
    
    # Round the duration up so a fractional tail (e.g. the last 0.5 s) still gets a chunk
    for start_time in range(0, int(np.ceil(audio_duration)), segment_length):
        end_time = min(start_time + segment_length, audio_duration)
        print(f"Processing segment {start_time}-{end_time} seconds...")
        
        segment_path = os.path.join(TEMPORARY_DIRECTORY, f"segment_{start_time}.wav")
        
        try:
            # Extract the chunk as 16 kHz 16-bit PCM WAV and double its volume
            # (a fixed gain, not loudness normalization)
            ffmpeg_command = [
                'ffmpeg', '-i', audio_file_path,
                '-ss', str(start_time), '-t', str(end_time - start_time),
                '-acodec', 'pcm_s16le', '-ar', '16000',
                '-af', 'volume=2.0',
                '-y', segment_path
            ]
            result = subprocess.run(ffmpeg_command, capture_output=True, text=True)
            
            if result.returncode != 0:
                print(f"FFmpeg failed for segment {start_time}-{end_time}, using placeholder")
                transcribed_segments.append({
                    "text": "♪ ♫ ♪",
                    "timestamp": (start_time, end_time),
                    "is_placeholder": True
                })
                continue
            
            # Transcribe with settings to prevent hallucination and force English
            try:
                segment_result = transcription_model.transcribe(
                    segment_path,
                    word_timestamps=True,
                    verbose=False,
                    language="en",
                    condition_on_previous_text=False,
                    temperature=0.0,
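                    # Whisper's default: suppress non-speech symbol tokens.
                    # Hard-coded token IDs are tokenizer-specific and are avoided here.
                    suppress_tokens="-1",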
                    initial_prompt="",
                    task="transcribe"
                )
                
                # Validate and process transcription segments
                segment_has_content = False
                previous_texts = set()
                
                for seg in segment_result["segments"]:
                    text = seg["text"].strip()
                    start = seg.get("start", 0) + start_time
                    end = seg.get("end", 0) + start_time
                    
                    if not text or len(text) < 2:
                        continue
                    
                    text_lower = text.lower()
                    if text_lower in previous_texts or (len(set(text_lower.replace(' ', ''))) < 4 and len(text) > 10):
                        print(f"Skipping repetitive or hallucinated segment: '{text[:50]}...'")
                        continue
                    
                    if any(ord(char) > 127 for char in text):
                        print(f"Skipping non-English segment: '{text[:50]}...'")
                        continue
                    
                    alpha_chars = sum(1 for char in text if char.isalpha())
                    if len(text) > 5 and alpha_chars / len(text) < 0.5:
                        print(f"Skipping non-alphabetic segment: '{text[:50]}...'")
                        continue
                    
                    previous_texts.add(text_lower)
                    segment_has_content = True
                    
                    if "words" in seg and seg["words"]:
                        for word_info in seg["words"]:
                            word_text = word_info.get("word", "").strip()
                            word_start = word_info.get("start", 0) + start_time
                            word_end = word_info.get("end", 0) + start_time
                            if word_text and word_start < audio_duration and all(ord(char) <= 127 for char in word_text):
                                transcribed_segments.append({
                                    "text": word_text,
                                    "timestamp": (word_start, min(word_end, audio_duration))
                                })
                    else:
                        if start < audio_duration:
                            transcribed_segments.append({
                                "text": text,
                                "timestamp": (start, min(end, audio_duration))
                            })
                
                if not segment_has_content:
                    print(f"No valid content for segment {start_time}-{end_time}, adding placeholder")
                    transcribed_segments.append({
                        "text": "♪ ♫ ♪",
                        "timestamp": (start_time, end_time),
                        "is_placeholder": True
                    })
                    
            except Exception as e:
                print(f"Transcription failed for segment {start_time}-{end_time}: {e}")
                transcribed_segments.append({
                    "text": "♪ ♫ ♪",
                    "timestamp": (start_time, end_time),
                    "is_placeholder": True
                })
                
        except Exception as e:
            print(f"Error processing segment {start_time}-{end_time}: {e}")
            transcribed_segments.append({
                "text": "♪ ♫ ♪",
                "timestamp": (start_time, end_time),
                "is_placeholder": True
            })
        finally:
            try:
                os.remove(segment_path)
            except OSError:
                # The chunk file may not exist if FFmpeg failed
                pass
    
    print(f"→ Extracted {len(transcribed_segments)} segments covering full audio duration")
    return {"chunks": transcribed_segments}
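
For full coverage, the chunk boundaries need to include any fractional tail of the audio. The sketch below shows one way to compute them; `chunk_bounds` is a hypothetical helper written for illustration, not a function from this notebook.

```python
import math


def chunk_bounds(total_seconds, chunk_seconds=10):
    """Return (start, end) pairs that cover [0, total_seconds] in fixed-length chunks."""
    # Rounding the duration up keeps a final partial chunk in the range.
    starts = range(0, math.ceil(total_seconds), chunk_seconds)
    return [(start, min(start + chunk_seconds, total_seconds)) for start in starts]


print(chunk_bounds(30.5))
# [(0, 10), (10, 20), (20, 30), (30, 30.5)]
```

Truncating the duration with `int()` instead of rounding up would stop at 30 seconds in this example and leave the last half second without a chunk.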


## Segment Creation for Visual Coverage
Here, the notebook converts the transcription into consecutive fixed-length visual segments (3 seconds by default) that span the whole track, so every second of audio has a visual element. Each segment collects the text of all transcribed chunks that overlap its time window, and windows with no text become instrumental placeholders. The result is a gap-free sequence of segments ready for image generation.

In [7]:
def build_complete_visual_segments(transcription_data, audio_file_path, segment_duration=3.0):
    """
    Ensure every second of audio has corresponding visual content by creating
    time-based segments.
    
    Args:
        transcription_data (dict): Dictionary of transcribed audio chunks.
        audio_file_path (str): Path to the audio file.
        segment_duration (float): Duration of each visual segment in seconds.
    
    Returns:
        list: List of segments with text, start/end times, and instrumental flags.
    """
    print("Building segments for complete visual coverage...")
    
    audio_duration = calculate_audio_length(audio_file_path)
    if audio_duration is None:
        return []
    
    print(f"Audio duration: {audio_duration:.2f} seconds")
    visual_segments = []
    current_time = 0
    chunks = sorted(transcription_data["chunks"], key=lambda x: x["timestamp"][0])
    chunk_index = 0
    
    while current_time < audio_duration:
        segment_end = min(current_time + segment_duration, audio_duration)
        segment_text = ""
        found_content = False
        temp_chunk_index = chunk_index
        
        while temp_chunk_index < len(chunks) and chunks[temp_chunk_index]["timestamp"][0] < segment_end:
            chunk = chunks[temp_chunk_index]
            chunk_start, chunk_end = chunk["timestamp"]
            if chunk_end > current_time:
                segment_text += " " + chunk["text"]
                found_content = True
            temp_chunk_index += 1
        
        while chunk_index < len(chunks) and chunks[chunk_index]["timestamp"][1] <= current_time:
            chunk_index += 1
        
        segment_text = segment_text.strip()
        is_instrumental = not found_content or not segment_text or segment_text == "♪ ♫ ♪"
        segment_text = "♪ ♫ ♪" if is_instrumental else segment_text
        
        visual_segments.append({
            "text": segment_text,
            "start": current_time,
            "end": segment_end,
            "is_instrumental": is_instrumental
        })
        current_time = segment_end
    
    print(f"Created {len(visual_segments)} segments with complete coverage")
    return visual_segments
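
The windowing logic is easier to follow on a toy input. The sketch below is a simplified re-implementation for illustration only: `window_texts` is a hypothetical name, it rescans all chunks for every window instead of tracking an index, and the sample timestamps are made up.

```python
PLACEHOLDER = "♪ ♫ ♪"


def window_texts(chunks, total_seconds, window_seconds=3.0):
    """Split [0, total_seconds] into fixed windows and attach overlapping chunk text."""
    ordered = sorted(chunks, key=lambda c: c["timestamp"][0])
    windows = []
    start = 0.0
    while start < total_seconds:
        end = min(start + window_seconds, total_seconds)
        # A chunk belongs to the window if their time ranges overlap.
        words = [c["text"] for c in ordered
                 if c["timestamp"][0] < end and c["timestamp"][1] > start]
        text = " ".join(words).strip()
        is_instrumental = not text or text == PLACEHOLDER
        windows.append({
            "text": PLACEHOLDER if is_instrumental else text,
            "start": start,
            "end": end,
            "is_instrumental": is_instrumental,
        })
        start = end
    return windows


sample_chunks = [
    {"text": "Hello", "timestamp": (0.5, 1.0)},
    {"text": "world", "timestamp": (1.2, 1.8)},
    {"text": "again", "timestamp": (6.5, 7.0)},
]
for window in window_texts(sample_chunks, 8.0):
    print(window)
```

In this example the middle window (3 to 6 seconds) has no overlapping text and becomes an instrumental placeholder, while the last window is shortened to end at the track length.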


## Dynamic Image Generation for Visuals
In this section, the function generates an image for each visual segment using Stable Diffusion. It rotates through different prompts, styles, and color schemes to avoid repetitive visuals. Instrumental sections receive abstract or music-themed art, while lyric segments are prompted with the first four words of their text. As written, the pipeline is loaded in half precision on `cuda`, so this step requires a CUDA-capable GPU.

In [8]:
def produce_varied_visual_images(segment_data, output_directory, model_id="runwayml/stable-diffusion-v1-5"):
    """
    Generate diverse images for each segment using Stable Diffusion to avoid repetitive visuals.
    
    Args:
        segment_data (list): List of segment dictionaries with text and timing.
        output_directory (str): Directory to save generated images.
        model_id (str): Identifier for the Stable Diffusion model.
    
    Returns:
        list: Updated segment data with paths to generated images.
    """
    print("Generating varied visual images...")
    
    diffusion_pipeline = StableDiffusionPipeline.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        cache_dir=MODEL_STORAGE_DIRECTORY
    )
    diffusion_pipeline.safety_checker = None
    diffusion_pipeline = diffusion_pipeline.to("cuda")
    diffusion_pipeline.enable_attention_slicing()
    
    instrumental_prompts = [
        "Abstract music visualization, flowing sound waves, vibrant colors",
        "Musical energy patterns, dynamic rhythm visualization",
        "Sound frequency visualization, colorful audio waves",
        "Music beats in visual form, pulsing lights and colors",
        "Abstract representation of musical harmony, flowing patterns",
        "Rhythmic light patterns, musical atmosphere",
        "Dynamic color flows representing music energy",
        "Cosmic music visualization, stars and nebulae dancing",
        "Electric music waves, neon colors pulsing",
        "Fluid art representing musical emotions",
        "Geometric patterns synchronized with music beats",
        "Particle effects dancing to rhythm",
        "Crystalline structures resonating with sound",
        "Fire and water elements dancing to music",
        "Aurora borealis patterns following musical rhythm",
        "Digital matrix effects with musical flow"
    ]
    
    text_styles = [
        "artistic, vibrant, emotional", "cinematic, dramatic, colorful",
        "dreamy, ethereal, beautiful", "energetic, dynamic, powerful",
        "serene, peaceful, flowing", "bold, striking, vivid",
        "mystical, magical, enchanting", "futuristic, sci-fi, glowing",
        "romantic, soft, warm", "intense, passionate, fiery"
    ]
    
    color_schemes = [
        "blue and purple tones", "warm orange and red hues",
        "cool green and teal colors", "golden and yellow shades",
        "pink and magenta tones", "silver and white highlights"
    ]
    
    for index, segment in enumerate(tqdm(segment_data, desc="Generating varied images")):
        if segment.get("is_instrumental", False):
            base_prompt = instrumental_prompts[index % len(instrumental_prompts)]
            color_theme = color_schemes[index % len(color_schemes)]
            prompt = f"{base_prompt}, {color_theme}"
        else:
            words = segment['text'].split()
            clean_text = ' '.join(words[:4])
            style = text_styles[index % len(text_styles)]
            color_theme = color_schemes[index % len(color_schemes)]
            prompt = f"Scene: '{clean_text}', {style}, {color_theme}"
        
        seed = (hash(prompt) + index * 1337 + int(segment["start"]) * 42) % 10000
        generator = torch.Generator(device="cuda").manual_seed(seed)
        
        try:
            image = diffusion_pipeline(
                prompt,
                num_inference_steps=25,
                height=512,
                width=512,
                generator=generator
            ).images[0]
        except Exception as e:
            print(f"Error generating image {index}: {e}")
            fallback_seed = (index * 999) % 10000
            fallback_generator = torch.Generator(device="cuda").manual_seed(fallback_seed)
            image = diffusion_pipeline(
                "Abstract colorful art",
                num_inference_steps=20,
                height=512,
                width=512,
                generator=fallback_generator
            ).images[0]
        
        if image.mode != "RGB":
            image = image.convert("RGB")
        
        image_path = os.path.join(output_directory, f"visual_image_{index:03d}.jpg")
        image.save(image_path, "JPEG", quality=90)
        segment["image_path"] = image_path
        
        if index % 5 == 0:
            torch.cuda.empty_cache()
            gc.collect()
    
    del diffusion_pipeline
    torch.cuda.empty_cache()
    gc.collect()
    
    return segment_data
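
One caveat about the seed computation: Python's built-in `hash()` for strings is salted per process by default, so seeds derived from `hash(prompt)` can differ between kernel restarts. If reproducible images across runs are wanted, a digest-based seed is one option. The sketch below is an assumption-labeled alternative, not the notebook's method; `stable_seed` is a hypothetical helper.

```python
import hashlib


def stable_seed(prompt, index, start_seconds, modulus=10000):
    """Derive a seed that stays the same across Python processes."""
    digest = hashlib.sha256(prompt.encode("utf-8")).digest()
    prompt_value = int.from_bytes(digest[:8], "big")
    # Same mixing constants as the image-generation cell, with a stable prompt hash.
    return (prompt_value + index * 1337 + int(start_seconds) * 42) % modulus


seed_a = stable_seed("Abstract music visualization, blue and purple tones", 0, 0.0)
seed_b = stable_seed("Abstract music visualization, blue and purple tones", 0, 0.0)
print(seed_a == seed_b)
```

The returned value could be passed to `torch.Generator(...).manual_seed(...)` in place of the `hash()`-based seed.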


## Text Overlay Image Creation
This part renders lyric or placeholder text as transparent images using the Python Imaging Library (PIL). The overlays are styled for readability and aesthetics, with options for font size and color. Text is automatically wrapped and centered to fit the video frame. These overlays are later composited onto the generated visuals for the final video.

In [9]:
def render_text_overlay(text_content, image_width, image_height, font_size=48, text_color=(255, 255, 255)):
    """
    Render text as an image overlay using PIL for video composition.
    
    Args:
        text_content (str): Text to render.
        image_width (int): Width of the output image.
        image_height (int): Height of the output image.
        font_size (int): Size of the font for text rendering.
        text_color (tuple): RGB color tuple for the text.
    
    Returns:
        numpy.ndarray: Array representing the text image with transparency.
    """
    overlay_image = Image.new('RGBA', (image_width, image_height), (0, 0, 0, 0))
    draw_context = ImageDraw.Draw(overlay_image)
    
    try:
        text_font = ImageFont.truetype("DejaVuSans-Bold.ttf", font_size)
    except OSError:
        # Font file not found; fall back to PIL's built-in bitmap font
        text_font = ImageFont.load_default()
    
    if not text_content or text_content.strip() == "":
        text_content = " "
    
    words = text_content.split()
    text_lines = []
    current_line = ""
    
    for word in words:
        test_line = current_line + " " + word if current_line else word
        try:
            text_box = draw_context.textbbox((0, 0), test_line, font=text_font)
            text_width = text_box[2] - text_box[0]
        except (AttributeError, ValueError):
            # Older Pillow or a font without bbox support: estimate from length
            text_width = len(test_line) * font_size * 0.6
        
        if text_width <= image_width - 40:
            current_line = test_line
        else:
            if current_line:
                text_lines.append(current_line)
            current_line = word
    
    if current_line:
        text_lines.append(current_line)
    
    line_height = font_size + 10
    total_height = len(text_lines) * line_height
    start_y = (image_height - total_height) // 2
    
    for i, line_text in enumerate(text_lines):
        try:
            text_box = draw_context.textbbox((0, 0), line_text, font=text_font)
            text_width = text_box[2] - text_box[0]
        except (AttributeError, ValueError):
            # Older Pillow or a font without bbox support: estimate from length
            text_width = len(line_text) * font_size * 0.6
        
        x_position = (image_width - text_width) // 2
        y_position = start_y + i * line_height
        
        outline_width = 2
        for offset_x in range(-outline_width, outline_width + 1):
            for offset_y in range(-outline_width, outline_width + 1):
                if abs(offset_x) + abs(offset_y) <= outline_width:
                    draw_context.text(
                        (x_position + offset_x, y_position + offset_y),
                        line_text, font=text_font, fill=(0, 0, 0)
                    )
        
        draw_context.text(
            (x_position, y_position), line_text, font=text_font, fill=text_color
        )
    
    return np.array(overlay_image)
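
The line-wrapping step is a greedy algorithm: keep adding words to the current line until the next word would exceed the available width. The sketch below separates that logic from PIL so it can be tried without a font. `wrap_words` and `estimate_width` are hypothetical names; the width estimate reuses the `0.6 * font_size` per-character fallback from the cell in this section.

```python
def wrap_words(text, max_width, measure):
    """Greedily wrap `text` into lines whose measured width stays within `max_width`."""
    lines, current = [], ""
    for word in text.split():
        candidate = f"{current} {word}" if current else word
        if measure(candidate) <= max_width:
            current = candidate
        else:
            if current:
                lines.append(current)
            # A single word wider than max_width still gets its own line.
            current = word
    if current:
        lines.append(current)
    return lines


def estimate_width(line, font_size=48):
    """Rough pixel width: about 0.6 of the font size per character."""
    return len(line) * font_size * 0.6


# 512-pixel frame with a 40-pixel total margin, as in the overlay function.
print(wrap_words("never gonna give you up", 512 - 40, estimate_width))
# ['never gonna give', 'you up']
```

With a real font, `measure` would be replaced by a function that returns the width from `ImageDraw.textbbox`.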


## Video Assembly with Continuous Playback
This section brings together the generated images, text overlays, and the original audio track into one lyric video. Using MoviePy, it places each segment at its start time over a black base layer that spans the full audio duration, so playback has no gaps. Segments whose image is missing fall back to a solid color background, and segments that fail entirely are replaced by a gray clip. The output is a 512×512, 24 fps H.264 video with AAC audio.

In [10]:
def assemble_continuous_lyric_video(segment_data, audio_file_path, output_video_path):
    """
    Assemble a continuous lyric video ensuring no gaps in playback.
    
    Args:
        segment_data (list): List of segments with text, timing, and image paths.
        audio_file_path (str): Path to the audio file.
        output_video_path (str): Path to save the final video.
    
    Returns:
        str: Path to the generated video file, or None if an error occurs.
    """
    print("Assembling continuous lyric video...")
    
    audio_track = AudioFileClip(audio_file_path)
    total_duration = audio_track.duration
    print(f"Creating video for full duration of {total_duration:.2f} seconds")
    
    video_components = []
    
    for index, segment in enumerate(segment_data):
        try:
            segment_duration = segment["end"] - segment["start"]
            start_time = segment["start"]
            if segment_duration < 0.5:
                segment_duration = 0.5
            
            display_text = segment["text"] if segment["text"] else "♪"
            is_instrumental = segment.get("is_instrumental", False)
            text_color = (255, 215, 0) if is_instrumental else (255, 255, 255)
            font_size = 56 if is_instrumental else 48
            text_image = render_text_overlay(display_text, 512, 512, font_size, text_color)
            
            try:
                text_component = ImageClip(text_image).with_duration(segment_duration)
            except AttributeError:
                text_component = ImageClip(text_image).set_duration(segment_duration)
            
            if "image_path" in segment and os.path.exists(segment["image_path"]):
                try:
                    background_component = ImageClip(segment["image_path"]).with_duration(segment_duration)
                except AttributeError:
                    background_component = ImageClip(segment["image_path"]).set_duration(segment_duration)
            else:
                color_options = [(20, 20, 40), (40, 20, 20), (20, 40, 20), (40, 40, 20), (40, 20, 40), (20, 40, 40)]
                bg_color = color_options[index % len(color_options)]
                try:
                    background_component = ColorClip(size=(512, 512), color=bg_color).with_duration(segment_duration)
                except AttributeError:
                    background_component = ColorClip(size=(512, 512), color=bg_color).set_duration(segment_duration)
            
            combined_component = CompositeVideoClip([background_component, text_component])
            try:
                combined_component = combined_component.with_start(start_time)
            except AttributeError:
                combined_component = combined_component.set_start(start_time)
            
            video_components.append(combined_component)
        except Exception as e:
            print(f"Error processing segment {index}: {e}")
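            try:
                # Fallback clip; try the MoviePy 2.x API first, then 1.x
                fallback_start = segment.get("start", 0)
                fallback_bg = ColorClip(size=(512, 512), color=(50, 50, 50))
                try:
                    fallback_bg = fallback_bg.with_duration(1.0).with_start(fallback_start)
                except AttributeError:
                    fallback_bg = fallback_bg.set_duration(1.0).set_start(fallback_start)
                video_components.append(fallback_bg)
            except Exception as fallback_error:
                print(f"Could not create fallback clip for segment {index}: {fallback_error}")
            continue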
    
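    if not video_components:
        print("Critical error: No video components created. Generating fallback video...")
        fallback_clip = ColorClip(size=(512, 512), color=(50, 50, 50))
        try:
            fallback_clip = fallback_clip.with_duration(total_duration)
        except AttributeError:
            fallback_clip = fallback_clip.set_duration(total_duration)
        video_components = [fallback_clip]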
    
    try:
        try:
            base_background = ColorClip(size=(512, 512), color=(0, 0, 0)).with_duration(total_duration)
        except AttributeError:
            base_background = ColorClip(size=(512, 512), color=(0, 0, 0)).set_duration(total_duration)
        
        all_components = [base_background] + video_components
        final_composition = CompositeVideoClip(all_components, size=(512, 512))
        
        try:
            final_composition = final_composition.with_audio(audio_track)
            final_composition = final_composition.with_duration(total_duration)
        except AttributeError:
            final_composition = final_composition.set_audio(audio_track)
            final_composition = final_composition.set_duration(total_duration)
        
        print("Rendering final video...")
        final_composition.write_videofile(
            output_video_path,
            fps=24,
            codec='libx264',
            audio_codec='aac',
            bitrate="2000k",
            audio_bitrate="192k",
            temp_audiofile=os.path.join(TEMPORARY_DIRECTORY, "temp_audio.m4a"),
            remove_temp=True,
            logger=None,
            threads=2
        )
        
        final_composition.close()
        audio_track.close()
        print(f"Video successfully created at: {output_video_path}")
        return output_video_path
    except Exception as e:
        print(f"Error assembling video: {e}")
        import traceback
        traceback.print_exc()
        return None
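
Before rendering, it can be useful to verify that the segment list really covers the whole track. The following sketch reports uncovered time ranges; `find_gaps` is a hypothetical helper and assumes segments are dictionaries with `start` and `end` keys, as produced earlier in this notebook.

```python
def find_gaps(segments, total_seconds, tolerance=1e-6):
    """Return (start, end) ranges within [0, total_seconds] that no segment covers."""
    gaps = []
    cursor = 0.0  # end of the region covered so far
    for seg in sorted(segments, key=lambda s: s["start"]):
        if seg["start"] - cursor > tolerance:
            gaps.append((cursor, seg["start"]))
        cursor = max(cursor, seg["end"])
    if total_seconds - cursor > tolerance:
        gaps.append((cursor, total_seconds))
    return gaps


contiguous = [{"start": 0, "end": 3}, {"start": 3, "end": 6}]
broken = [{"start": 0, "end": 3}, {"start": 4, "end": 6}]
print(find_gaps(contiguous, 6.0))
print(find_gaps(broken, 7.0))
```

An empty list means the timeline is fully covered. Any reported range would otherwise show only the black base layer in the rendered video.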


## Main Pipeline for Lyric Video Generation
The main pipeline orchestrates the entire lyric video creation process, coordinating transcription, segmentation, image generation, and video assembly. It provides a single entry point for users to generate a complete lyric video with minimal manual intervention. The pipeline manages all intermediate steps and handles errors.
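
When no output path is given, the pipeline derives one from the audio file name. The sketch below shows that naming rule in isolation; `default_output_path` is a hypothetical helper, and the `./working` default is assumed from the directory constants defined earlier.

```python
from pathlib import Path


def default_output_path(audio_file_path, output_directory="./working"):
    """Build the output video path from the audio file's base name."""
    stem = Path(audio_file_path).stem  # file name without its last extension
    return str(Path(output_directory) / f"{stem}_continuous_lyric_video.mp4")


print(default_output_path("./input.mp3"))
```

For `./input.mp3`, the resulting file is named `input_continuous_lyric_video.mp4` inside the output directory.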

In [11]:
def orchestrate_lyric_video_pipeline(audio_file_path, output_video_path=None, diffusion_model_id="runwayml/stable-diffusion-v1-5"):
    """
    Orchestrate the complete pipeline for generating a continuous lyric video with guaranteed coverage.
    
    Args:
        audio_file_path (str): Path to the input audio file.
        output_video_path (str, optional): Path to save the final video. If None, derived from audio file name.
        diffusion_model_id (str): Identifier for the Stable Diffusion model to use for image generation.
    
    Returns:
        str: Path to the generated video file, or None if the process fails.
    """
    try:
        os.makedirs(TEMPORARY_DIRECTORY, exist_ok=True)
        
        if not os.path.exists(audio_file_path):
            print(f"Error: Audio file not found at {audio_file_path}")
            return None
        
        if output_video_path is None:
            base_name = os.path.basename(audio_file_path)
            file_name = os.path.splitext(base_name)[0]
            output_video_path = str(OUTPUT_DIRECTORY / f"{file_name}_continuous_lyric_video.mp4")
        
        # Create the destination folder if needed, so rendering does not fail on a missing directory
        os.makedirs(os.path.dirname(os.path.abspath(output_video_path)), exist_ok=True)
        
        print(f"Generating lyric video for: {audio_file_path}")
        
        # Step 1: Transcribe audio with full coverage and anti-hallucination measures
        transcription_data = transcribe_audio_ensured_coverage(audio_file_path, segment_length=10)
        
        # Step 2: Build visual segments ensuring complete audio coverage
        visual_segments = build_complete_visual_segments(transcription_data, audio_file_path, segment_duration=3.0)
        
        if not visual_segments:
            print("Critical error: No visual segments created. Generating fallback coverage...")
            audio_duration = calculate_audio_length(audio_file_path)
            visual_segments = [{
                "text": "♪ ♫ ♪",
                "start": 0,
                "end": audio_duration,
                "is_instrumental": True
            }]
        
        # Step 3: Generate dynamic images for each segment with visual variety
        visual_segments = produce_varied_visual_images(visual_segments, TEMPORARY_DIRECTORY, diffusion_model_id)
        
        # Step 4: Assemble the final continuous lyric video
        final_video_path = assemble_continuous_lyric_video(visual_segments, audio_file_path, output_video_path)
        
        return final_video_path
        
    except Exception as e:
        print(f"Error in lyric video pipeline: {e}")
        import traceback
        traceback.print_exc()
        return None
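
The fallback in the pipeline above only triggers when `build_complete_visual_segments` returns no segments at all. A partial result with holes would pass that check, so a separate gap check can be handy while debugging. The sketch below assumes segments are dictionaries with numeric `start` and `end` keys, as in the fallback segment above. `find_coverage_gaps` and its `tolerance` parameter are hypothetical and not part of the notebook.

```python
def find_coverage_gaps(segments, total_duration, tolerance=0.05):
    """Return (start, end) intervals of the audio that no segment covers.

    Hypothetical debugging helper. Gaps shorter than `tolerance` seconds
    are ignored so that small rounding differences are not reported.
    """
    gaps = []
    cursor = 0.0  # latest point in time known to be covered so far
    for segment in sorted(segments, key=lambda s: s["start"]):
        if segment["start"] - cursor > tolerance:
            gaps.append((cursor, segment["start"]))
        cursor = max(cursor, segment["end"])
    if total_duration - cursor > tolerance:
        gaps.append((cursor, total_duration))
    return gaps


# Made-up segments for illustration: nothing covers 6-8 s or 10-12 s.
example_segments = [
    {"text": "first line", "start": 0, "end": 3, "is_instrumental": False},
    {"text": "second line", "start": 3, "end": 6, "is_instrumental": False},
    {"text": "♪ ♫ ♪", "start": 8, "end": 10, "is_instrumental": True},
]
example_gaps = find_coverage_gaps(example_segments, total_duration=12)
print(example_gaps)
```

An empty list means every part of the track has a segment. A non-empty list points at the time ranges where the rendered video would show only the black base background.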


In [None]:
def execute_lyric_video_demo(audio_file_path=DEFAULT_AUDIO_TRACK, output_video_path=None):
    """
    Execute a demo of the lyric video generator with full coverage.
    
    Args:
        audio_file_path (str): Path to the input audio file. Defaults to a predefined track.
        output_video_path (str, optional): Path to save the final video. If None, derived from audio file name.
    
    Returns:
        str: Path to the generated video file, or None if the process fails.
    """
    if not os.path.exists(audio_file_path):
        print(f"Error: Audio file not found at {audio_file_path}")
        return None
    
    print(f"Processing full audio file with guaranteed coverage: {audio_file_path}")
    
    if output_video_path is None:
        base_name = os.path.basename(audio_file_path)
        file_name = os.path.splitext(base_name)[0]
        output_video_path = str(OUTPUT_DIRECTORY / f"{file_name}_continuous_lyric_video.mp4")
    
    result = orchestrate_lyric_video_pipeline(
        audio_file_path=audio_file_path,
        output_video_path=output_video_path,
        diffusion_model_id="runwayml/stable-diffusion-v1-5"
    )
    
    if result:
        print(f"Success! Lyric video saved to: {result}")
        return result
    else:
        print("Video generation process failed!")
        return None


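# Clear leftover temporary files from earlier runs
clear_temporary_files()

# Run the demo on DEFAULT_AUDIO_TRACK; pass audio_file_path=... to process a different file
video_output = execute_lyric_video_demo()

# Clean up temporary files once the run has finished
clear_temporary_files()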
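
Both `orchestrate_lyric_video_pipeline` and `execute_lyric_video_demo` derive the default output path the same way: take the audio file's base name, drop the extension, and append `_continuous_lyric_video.mp4` inside `OUTPUT_DIRECTORY`. A minimal sketch of that derivation as a shared helper follows. `derive_output_path` is a hypothetical name, and the `songs/` and `output` paths are made up for illustration.

```python
from pathlib import Path


def derive_output_path(audio_file_path, output_directory,
                       suffix="_continuous_lyric_video.mp4"):
    """Build the default video path from the audio file name.

    Hypothetical helper mirroring the basename/splitext logic used in the
    notebook: only the final extension of the audio file is removed.
    """
    stem = Path(audio_file_path).stem
    return str(Path(output_directory) / f"{stem}{suffix}")


# Example paths are assumptions; no files are read or written here.
example_output = derive_output_path("songs/summer.night.mp3", "output")
print(example_output)
```

Factoring the derivation out like this would keep the two functions from drifting apart if the naming scheme changes later.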