# README

# VEO Video Pipeline: Sequential Animation Tool for Google Vertex AI

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/downloads/)
[![Code Style: PEP-8](https://img.shields.io/badge/code%20style-PEP--8-orange.svg)](https://www.python.org/dev/peps/pep-0008/)

**Repository:** [https://github.com/chirindaopensource/veo-video-pipeline](https://github.com/chirindaopensource/veo-video-pipeline)  
**Author:** 2025 Craig Chirinda (Open Source Projects)

## Overview

The VEO Video Pipeline is a Python-based tool that orchestrates the generation of sequential video animations using Google's Video Generation Model (VEO) via the Vertex AI API. It provides a robust, cost-conscious, and extensible framework for turning a series of text prompts, optionally seeded by an initial reference image, into a cohesive video narrative. The entire pipeline lives in a single Jupyter Notebook (`veo2_animation_tool.ipynb`) for ease of use, experimentation, and modification.

This tool is particularly useful for creators, researchers, and developers looking to:
*   Generate short video clips from text prompts.
*   Create longer animated sequences by chaining multiple VEO generations.
*   Leverage an initial image to guide the style and content of the first video segment.
*   Automate the process of using the last frame of a generated segment as the reference for the next, ensuring visual continuity.

The implementation prioritizes robustness through comprehensive error handling, retry mechanisms for API calls, and efficient resource management, including memory-efficient image processing and video stitching.

## Key Features

*   **Sequential Video Generation:** Generates multiple video segments based on an ordered list of prompts.
*   **Image-to-Video & Video-to-Video:**
    *   Initiates the sequence with a user-provided reference image.
    *   Automatically uses the last frame of the previously generated segment as the reference for the next, enabling chained animations.
*   **Google VEO Model Integration:** Utilizes the `video-generation-001` model via the synchronous Vertex AI API.
*   **Robust API Interaction:** Implements `tenacity`-based exponential backoff and retry logic for transient network or API errors.
*   **Cost-Conscious Design:**
    *   Optimized image encoding (JPEG) to reduce payload sizes.
    *   Recommends appropriate video dimensions.
    *   Handles GCS downloads efficiently.
*   **Efficient Video Stitching:** Uses `ffmpeg` (via `subprocess`) for fast, lossless concatenation of video segments.
*   **Configuration Management:** Centralized configuration (`VEOConfig`) for model parameters, API endpoints, and retry settings.
*   **Error Handling:** Custom exceptions (`VEOVideoGenerationError`) for pipeline-specific issues.
*   **Standardized Logging:** Clear and informative logging for monitoring and debugging.
*   **Self-Contained Notebook:** All logic, including helper functions and the main pipeline class, is within `veo2_animation_tool.ipynb`.
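
As a rough illustration of the "recommended dimensions" feature, the sketch below picks a target size by orientation and computes a centered crop box with the matching aspect ratio. The 1024x576 and 576x1024 values come from `VEOConfig` in the notebook; the function names `pick_target_dims` and `center_crop_box` are hypothetical, and cropping is only one possible way to reach the target shape without distortion.

```python
from typing import Tuple

# (width, height) pairs as defined in the notebook's VEOConfig.
LANDSCAPE_DIMS: Tuple[int, int] = (1024, 576)
PORTRAIT_DIMS: Tuple[int, int] = (576, 1024)


def pick_target_dims(width: int, height: int) -> Tuple[int, int]:
    """Return landscape dims for landscape or square inputs, portrait dims otherwise."""
    if width <= 0 or height <= 0:
        raise ValueError("width and height must be positive")
    return LANDSCAPE_DIMS if width >= height else PORTRAIT_DIMS


def center_crop_box(width: int, height: int, target: Tuple[int, int]) -> Tuple[int, int, int, int]:
    """Return the largest centered (left, top, right, bottom) box with the target aspect ratio."""
    target_w, target_h = target
    # Compare aspect ratios by cross-multiplying to stay in integer arithmetic.
    if width * target_h > height * target_w:
        # Source is wider than the target: trim the sides.
        crop_w = height * target_w // target_h
        left = (width - crop_w) // 2
        return (left, 0, left + crop_w, height)
    # Source is taller than (or equal to) the target: trim top and bottom.
    crop_h = width * target_h // target_w
    top = (height - crop_h) // 2
    return (0, top, width, top + crop_h)
```

A 2000x1000 input, for example, maps to the landscape target and is trimmed at the sides before any scaling.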

## How It Works (High-Level)

1.  **Initialization & Authentication:** Sets up Google Cloud credentials and project details.
2.  **Input Validation:** Checks the validity of the reference image path, scene prompts, and output folder name.
3.  **Initial Reference:** Loads the user-provided reference image, resizes it to VEO-compatible dimensions, and encodes it to Base64.
4.  **Iterative Segment Generation:** For each prompt in the sequence:
    *   Constructs a request payload including the prompt and the current reference image data (either the initial image or the last frame of the previous segment).
    *   Calls the Vertex AI VEO API to generate a video segment.
    *   Handles the API response, which may contain video bytes directly or a GCS URI for download.
    *   Saves the generated segment locally.
    *   If it's not the last prompt, extracts the last frame of the newly generated segment, resizes, and encodes it to Base64 to serve as the reference for the next iteration.
5.  **Video Stitching:** Once all segments are generated, they are concatenated into a single MP4 file using `ffmpeg`.
6.  **Output:** The final stitched video and individual segments are saved to a specified output directory in the user's home folder.
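
The chaining in step 4 can be sketched independently of the API. This is a minimal model, not the notebook's code: `run_chain`, `fake_generate`, and `fake_last_frame` are hypothetical names, and the two stand-in functions only record their inputs instead of calling Vertex AI or OpenCV.

```python
from typing import Callable, List, Optional, Tuple


def run_chain(
    prompts: List[str],
    initial_reference: Optional[str],
    generate_segment: Callable[[str, Optional[str]], str],
    last_frame_of: Callable[[str], str],
) -> List[str]:
    """Generate one segment per prompt, feeding each segment's last frame forward."""
    reference = initial_reference
    segments: List[str] = []
    for index, prompt in enumerate(prompts):
        segment = generate_segment(prompt, reference)
        segments.append(segment)
        # Extract a new reference only when another prompt follows.
        if index < len(prompts) - 1:
            reference = last_frame_of(segment)
    return segments


# Stand-ins that record what they were given (hypothetical, for illustration only).
calls: List[Tuple[str, Optional[str]]] = []


def fake_generate(prompt: str, reference: Optional[str]) -> str:
    calls.append((prompt, reference))
    return f"segment<{prompt}>"


def fake_last_frame(segment: str) -> str:
    return f"last_frame({segment})"


segments = run_chain(["city", "car"], "initial.jpg", fake_generate, fake_last_frame)
```

The first call receives the user's image and the second receives the last frame of the first segment, which is what gives the sequence its visual continuity.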

## Prerequisites

Before running the notebook, ensure you have the following:

1.  **Python:** Version 3.8 or higher.
2.  **Google Cloud Platform (GCP) Account:**
    *   A GCP Project with billing enabled.
    *   The **Vertex AI API** enabled for your project.
3.  **`gcloud` CLI:** Google Cloud Command Line Interface installed and configured.
    *   Authenticate by running: `gcloud auth application-default login`
4.  **`ffmpeg`:** The `ffmpeg` command-line tool must be installed and accessible in your system's PATH.
    *   Download from [ffmpeg.org](https://ffmpeg.org/download.html).
5.  **Jupyter Notebook or JupyterLab:** To run the `.ipynb` file.

## Installation & Setup

1.  **Clone the Repository:**
    ```bash
    git clone https://github.com/chirindaopensource/veo-video-pipeline.git
    cd veo-video-pipeline
    ```

2.  **Create a Virtual Environment (Recommended):**
    ```bash
    python -m venv veo_env
    source veo_env/bin/activate  # On Windows: veo_env\Scripts\activate
    ```

3.  **Install Python Dependencies:**
    The necessary Python packages are listed in the import section of the notebook. You can install them using pip:
    ```bash
    pip install google-cloud-aiplatform google-cloud-storage opencv-python Pillow requests tenacity numpy jupyterlab
    ```
    (Consider creating a `requirements.txt` file for easier dependency management in future project iterations).

4.  **Verify `ffmpeg` Installation:**
    Open your terminal and type:
    ```bash
    ffmpeg -version
    ```
    If it's installed correctly, you'll see version information. If not, please install it and ensure it's in your system PATH.

5.  **Set Google Cloud Project ID:**
    The script will attempt to auto-detect your GCP Project ID. However, it's best practice to set it explicitly via an environment variable:
    ```bash
    export GOOGLE_CLOUD_PROJECT="your-gcp-project-id"
    ```
    Alternatively, you can pass it as an argument when calling `create_veo_video_pipeline` within the notebook.
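
The last two setup steps can also be checked from Python before running the notebook. This is a small preflight sketch using only the standard library; `resolve_project_id` and `ffmpeg_path` are hypothetical helper names, not functions from the notebook.

```python
import os
import shutil
from typing import Mapping, Optional


def resolve_project_id(
    explicit: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Prefer an explicit ID, then GOOGLE_CLOUD_PROJECT.

    None means neither was set, in which case the pipeline would have to
    fall back to auto-detection.
    """
    env = os.environ if env is None else env
    return explicit or env.get("GOOGLE_CLOUD_PROJECT") or None


def ffmpeg_path() -> Optional[str]:
    """Return the full path of ffmpeg if it is on PATH, otherwise None."""
    return shutil.which("ffmpeg")


if __name__ == "__main__":
    print("project id:", resolve_project_id())
    print("ffmpeg:", ffmpeg_path() or "not found on PATH")
```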

## Configuration

The primary configuration happens within the `veo2_animation_tool.ipynb` notebook, specifically in the example usage block (`if __name__ == "__main__":` or a similar cell designed for execution).

Key parameters to configure:

*   `reference_image_path` (str): Absolute or relative path to your initial reference image (e.g., `.jpg`, `.png`). A dummy image will be created if not found, but using your own is recommended.
*   `scene_prompts` (List[str]): A list of text prompts, where each prompt describes a scene. The videos will be generated in the order of these prompts.
*   `output_folder_name` (str): Name of the folder to be created in your user's home directory to store generated segments and the final video.
*   `project_id` (Optional[str]): Your Google Cloud Project ID. If `None`, the pipeline attempts to auto-detect it or use the `GOOGLE_CLOUD_PROJECT` environment variable.

Advanced configuration options (e.g., video length, FPS, API endpoints) are managed within the `VEOConfig` class and can be modified directly in the notebook if needed, though the defaults are generally sensible.

## Usage

1.  **Launch JupyterLab or Jupyter Notebook:**
    Navigate to the cloned repository directory in your terminal and run:
    ```bash
    jupyter lab
    # or
    # jupyter notebook
    ```

2.  **Open the Notebook:**
    In the Jupyter interface, open `veo2_animation_tool.ipynb`.

3.  **Configure Parameters:**
    Locate the main execution cell, typically at the end of the notebook. It is either guarded by `if __name__ == "__main__":` or clearly marked as a "Run" cell.
    Modify the `example_params` dictionary with your desired `reference_image_path`, `scene_prompts`, and `output_folder_name`.

    ```python
    # Example configuration within the notebook:
    example_params = {
        "reference_image_path": "path/to/your/image.jpg",
        "scene_prompts": [
            "A futuristic cityscape at dusk, neon lights reflecting on wet streets.",
            "A sleek flying car zips through the city canyons.",
            "The car lands on a skyscraper rooftop, overlooking the sprawling metropolis."
        ],
        "output_folder_name": "my_veo_animation",
        # "project_id": "your-gcp-project-id"  # Optional: uncomment and set if needed
    }
    ```

4.  **Run the Notebook:**
    Execute the cells in the notebook sequentially, or use "Run All Cells." The video generation process can take several minutes per segment, depending on the VEO model's load and video length. Monitor the logging output for progress.

5.  **Locate Output:**
    Upon successful completion, the individual video segments and the final stitched video (`final_stitched_video.mp4`) will be saved in `~/<output_folder_name>/` (e.g., `~/my_veo_animation/`).
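
For readers curious how segments can be joined without re-encoding, here is a sketch of one common approach: ffmpeg's concat demuxer with stream copy. It is an assumption that the notebook's `_stitch_videos_ffmpeg` works along these lines; `write_concat_list` and `build_concat_command` are hypothetical names, and the sketch only builds the list file and the command without running ffmpeg.

```python
from pathlib import Path
from typing import List, Sequence


def write_concat_list(segments: Sequence[Path], list_path: Path) -> str:
    """Write an ffmpeg concat-demuxer list file and return its text."""
    lines = []
    for segment in segments:
        # Inside single quotes, the list format escapes a quote as '\''.
        escaped = str(segment).replace("'", "'\\''")
        lines.append(f"file '{escaped}'")
    text = "\n".join(lines) + "\n"
    list_path.write_text(text, encoding="utf-8")
    return text


def build_concat_command(list_path: Path, output_path: Path) -> List[str]:
    """Build the ffmpeg argument list; '-c copy' joins streams without re-encoding."""
    return [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", str(list_path),
        "-c", "copy",
        str(output_path),
    ]
```

The command could then be executed with `subprocess.run(command, check=True)`. Stream copy only works cleanly when all segments share the same codec, resolution, and frame rate, which should hold for segments generated with identical settings.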

## Core Components (within `veo2_animation_tool.ipynb`)

*   **`VEOConfig` (class):** A centralized configuration class holding constants like API model ID, default video parameters, retryable status codes, etc.
*   **`VEOVideoGenerationError` (class):** Custom exception for pipeline-specific errors.
*   **`VEOVideoPipeline` (class):** The main class orchestrating the video generation workflow.
    *   `__init__(...)`: Initializes authentication and GCP settings.
    *   `_initialize_authentication()`: Handles Google Cloud ADC.
    *   `_validate_inputs(...)`: Validates user-provided parameters.
    *   `_load_and_encode_image(...)`: Loads, resizes (preserving aspect ratio for VEO's recommended dimensions), and Base64 encodes images.
    *   `_generate_video_segment(...)`: Makes the API call to Vertex AI VEO, with retries.
    *   `_download_video_from_gcs(...)`: Downloads video if API returns a GCS URI.
    *   `_extract_last_frame(...)`: Uses OpenCV to get the last frame of a video.
    *   `_frame_to_base64(...)`: Converts an OpenCV frame to a Base64 encoded string for the next API call.
    *   `_stitch_videos_ffmpeg(...)`: Concatenates video segments using `ffmpeg`.
    *   `generate_sequential_videos(...)`: The main public method that executes the entire pipeline.
*   **`create_veo_video_pipeline(...)` (function):** A factory function providing a simpler interface to instantiate and run the `VEOVideoPipeline`.
*   **Helper Functions:** Such as `_is_retryable_http_error` for custom retry logic.
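
As an example of the kind of small helper involved, the sketch below splits a `gs://` URI into bucket and object names, the step `_download_video_from_gcs` needs before it can fetch a file. `parse_gcs_uri` is a hypothetical standalone function written for this README; it is stricter than a bare `split` because it also rejects URIs with no object path.

```python
from typing import Tuple


def parse_gcs_uri(gcs_uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/path/to/object' into ('bucket', 'path/to/object')."""
    prefix = "gs://"
    if not gcs_uri.startswith(prefix):
        raise ValueError(f"Invalid GCS URI format: {gcs_uri}")
    # partition() never raises, so a missing '/' is detected explicitly below.
    bucket_name, separator, blob_name = gcs_uri[len(prefix):].partition("/")
    if not bucket_name or not separator or not blob_name:
        raise ValueError(f"GCS URI must include a bucket and an object path: {gcs_uri}")
    return bucket_name, blob_name
```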

## Cost Considerations

Generating videos with Google's VEO model via Vertex AI incurs costs. Please refer to the [Vertex AI pricing page](https://cloud.google.com/vertex-ai/pricing) for the most up-to-date information on "Generative AI models" or "Multimodal models."

This pipeline uses the synchronous API, which might have different pricing characteristics than batch prediction. Costs are typically based on the duration of the video generated.

This tool attempts to be cost-efficient by:
*   Using recommended image dimensions to avoid unnecessary upscaling/downscaling by the model.
*   Encoding images to JPEG with reasonable quality to reduce request payload sizes.
*   Performing frame extraction and video stitching locally, avoiding additional cloud service costs for these tasks.
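
Payload size matters because the reference image travels inside the JSON request as Base64 text, which is about one third larger than the raw bytes. The sketch below shows that arithmetic using only the standard library; `base64_length` is a hypothetical helper name.

```python
import base64


def base64_length(raw_size: int) -> int:
    """Length of padded Base64 text for raw_size bytes: 4 characters per 3-byte group."""
    if raw_size < 0:
        raise ValueError("raw_size must be non-negative")
    return 4 * ((raw_size + 2) // 3)


# Check the formula against the real encoder for a 1000-byte payload.
raw = bytes(1000)
encoded = base64.b64encode(raw)
```

Every byte saved by JPEG compression therefore saves roughly 1.33 bytes in the request body.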

**Monitor your GCP billing dashboard regularly when using this tool.**

## Troubleshooting

*   **Authentication Errors:**
    *   Ensure you've run `gcloud auth application-default login`.
    *   Verify the `GOOGLE_CLOUD_PROJECT` environment variable is set correctly or passed to the pipeline.
    *   Check if the Vertex AI API is enabled in your GCP project.
*   **`ffmpeg: command not found`:**
    *   `ffmpeg` is not installed or not in your system's PATH. Revisit the installation steps.
*   **API Quota Errors (e.g., HTTP 429):**
    *   You might be hitting API rate limits or project quotas. The retry logic will attempt to handle transient 429s, but persistent issues may require requesting a quota increase from Google Cloud.
*   **`FileNotFoundError` for reference image:**
    *   Double-check the `reference_image_path` provided.
*   **Long Generation Times:**
    *   Video generation is computationally intensive. Longer videos or higher FPS settings will take more time. The default is 4 seconds at 24 FPS.
*   **Permission Denied (Output Directory):**
    *   Ensure your user has write permissions to their home directory, where the output folder is created.
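
To get a feel for how long the retry logic may wait on persistent 429 or 5xx responses, the sketch below models the notebook's settings (`multiplier=1`, `min=4`, `max=60`, at most 10 attempts). It assumes the clamped formula used by recent `tenacity` releases, `multiplier * 2 ** (attempt - 1)` bounded by `min` and `max`; older releases may compute the exponent differently. `exponential_waits` is a hypothetical name, and the model ignores the time each request itself takes.

```python
from typing import List


def exponential_waits(
    attempts: int,
    multiplier: float = 1,
    minimum: float = 4,
    maximum: float = 60,
    exp_base: float = 2,
) -> List[float]:
    """Model the sleep before each retry, clamped to [minimum, maximum]."""
    waits: List[float] = []
    # A run of `attempts` tries sleeps at most `attempts - 1` times.
    for attempt_number in range(1, attempts):
        raw = multiplier * exp_base ** (attempt_number - 1)
        waits.append(max(minimum, min(raw, maximum)))
    return waits


schedule = exponential_waits(10)
total_wait_seconds = sum(schedule)
```

Under these assumptions the waits start at 4 seconds, double once they exceed the minimum, and cap at 60 seconds, so a call that fails every time spends a little over four minutes sleeping in addition to the request time.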

## Contributing

Contributions are welcome! If you have improvements, bug fixes, or new features you'd like to add, please follow these steps:

1.  **Fork the repository.**
2.  **Create a new branch** for your feature or fix: `git checkout -b feature/your-feature-name` or `git checkout -b fix/your-bug-fix`.
3.  **Make your changes** within the `veo2_animation_tool.ipynb` notebook. Ensure your code adheres to PEP-8 standards where applicable and is well-commented.
4.  **Test your changes thoroughly.**
5.  **Commit your changes:** `git commit -m "Add concise description of your changes"`.
6.  **Push to your forked repository:** `git push origin feature/your-feature-name`.
7.  **Open a Pull Request** to the `main` branch of `chirindaopensource/veo-video-pipeline`. Provide a clear description of your changes and why they are needed.

Please also feel free to open an issue for bug reports or feature requests.

## License

This project is licensed under the **MIT License**. See the [LICENSE](LICENSE) file for details.

# Imports

In [None]:
# Standard Library Imports
import os
import sys
import time
import json
import base64
import logging
import shutil
import subprocess
import io
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

# Third-party Imports, Grouped for Clarity
import cv2
import numpy as np
from numpy.typing import NDArray
from PIL import Image
import requests
from google.auth import default, exceptions as auth_exceptions
from google.auth.transport.requests import Request
from google.cloud import storage
from google.api_core import exceptions as api_core_exceptions
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type


# Configuration

In [None]:
# Logging Configuration
# Configure a logger for clear, standardized output for debugging and monitoring.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Instantiate the logger using the standard __name__ convention.
logger = logging.getLogger(__name__)


# Centralized Configuration Class
class VEOConfig:
    """
    A centralized configuration class for VEO API constants, treated as read-only by convention.

    This provides a single source of truth for static parameters, improving
    maintainability and preventing the use of "magic" strings or numbers
    in the pipeline logic.
    """
    # The required OAuth scope for authenticating with the Vertex AI API.
    AUTH_SCOPE: List[str] = ["https://www.googleapis.com/auth/cloud-platform"]
    # The official, versioned model identifier for Google's VEO model.
    MODEL_ID: str = "video-generation-001"
    # The standard Vertex AI API method for synchronous (blocking) predictions.
    API_METHOD: str = "predict"
    # A generous timeout for the synchronous API call to allow for video generation.
    API_TIMEOUT_SECONDS: int = 900
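    # The set of HTTP status codes that indicate transient issues safe to retry.
    # Annotated with the plain built-in type: a subscripted "set[int]" here raises TypeError on Python 3.8.
    RETRYABLE_STATUS_CODES: frozenset = frozenset({429, 500, 502, 503, 504})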
    # Recommended VEO dimensions for landscape videos (width, height).
    LANDSCAPE_DIMS: Tuple[int, int] = (1024, 576)
    # Recommended VEO dimensions for portrait videos (width, height).
    PORTRAIT_DIMS: Tuple[int, int] = (576, 1024)
    # Default video generation parameters.
    DEFAULT_VIDEO_LENGTH: str = "4s"
    DEFAULT_FPS: int = 24
    # JPEG quality for encoding reference frames. 95 is a high-quality setting.
    JPEG_ENCODING_QUALITY: int = 95


# --- Custom Exception for Pipeline-Specific Errors ---
class VEOVideoGenerationError(Exception):
    """Custom exception for specific, identifiable errors within the VEO video generation pipeline."""
    pass


# --- Robust Retry Logic for API Calls ---
def _is_retryable_http_error(exception: BaseException) -> bool:
    """
    Custom tenacity retry condition to check for specific HTTP status codes.

    This function determines if a caught exception is a requests.HTTPError with a
    status code that is defined as retryable in VEOConfig.

    Args:
        exception (BaseException): The exception caught by tenacity.

    Returns:
        bool: True if the exception is an HTTPError with a retryable status code, False otherwise.
    """
    # Return True only if the exception is an HTTPError that carries a response whose
    # status code is in our retryable set. HTTPError.response can be None, so check it explicitly.
    return (
        isinstance(exception, requests.exceptions.HTTPError) and
        exception.response is not None and
        exception.response.status_code in VEOConfig.RETRYABLE_STATUS_CODES
    )

# Define a reusable, robust retry strategy for API calls using the tenacity library.
# tenacity passes retry conditions a RetryCallState, not the raised exception, so the
# predicate above must be wrapped in retry_if_exception before it is combined with "|".
from tenacity import retry_if_exception

retry_on_api_error = retry(
    # Use exponential backoff, with waits clamped between 4s and 60s.
    wait=wait_exponential(multiplier=1, min=4, max=60),
    # Stop retrying after 10 attempts to prevent indefinite loops and fail gracefully.
    stop=stop_after_attempt(10),
    # Define the conditions for retrying: network errors or specific HTTP status codes.
    retry=retry_if_exception_type((
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    )) | retry_if_exception(_is_retryable_http_error),
    # Log a warning before each retry attempt for visibility into transient failures.
    before_sleep=lambda retry_state: logger.warning(
        f"Retrying API call due to {retry_state.outcome.exception()}. "
        f"Attempt #{retry_state.attempt_number}, waiting {retry_state.next_action.sleep:.2f}s..."
    )
)


# Implementation


In [None]:
# Main Class

class VEOVideoPipeline:
    """
    A professional, implementation-grade class for Google's VEO video generation.

    This class orchestrates the complete workflow of generating sequential videos using
    Google's VEO model via its synchronous API. It handles authentication, request
    building, response parsing, frame extraction, and memory-efficient video stitching.
    """

    def __init__(self, project_id: Optional[str] = None, location: str = "us-central1"):
        """
        Initializes the VEO video generation pipeline.

        Args:
            project_id (Optional[str]): Your Google Cloud project ID. If None, it will be
                                        auto-detected from the environment.
            location (str): The Google Cloud region for Vertex AI. Defaults to us-central1.

        Raises:
            VEOVideoGenerationError: If initialization or authentication fails.
            ValueError: If input parameters are invalid.
        """
        # Validate that the location parameter is a non-empty string.
        if not isinstance(location, str) or not location:
            raise ValueError("location must be a non-empty string")

        # Store the project ID and location as instance-specific attributes.
        self.project_id: Optional[str] = project_id
        self.location: str = location
        # Construct the base API endpoint URL for Vertex AI.
        self.api_endpoint: str = f"https://{location}-aiplatform.googleapis.com"

        # Perform authentication immediately upon initialization to fail fast.
        self._initialize_authentication()

        # After attempting auto-detection, confirm that a project_id is set.
        if not self.project_id:
            raise VEOVideoGenerationError(
                "Google Cloud project_id could not be determined. "
                "Please provide it explicitly or configure the environment."
            )

        # Log successful initialization for monitoring purposes.
        logger.info(f"VEOVideoPipeline initialized for project: {self.project_id}")

    def _initialize_authentication(self) -> None:
        """
        Initializes Google Cloud authentication using Application Default Credentials (ADC).

        This method explicitly requests the necessary scope for Vertex AI, ensuring
        the credentials obtained are valid for making API calls.

        Raises:
            VEOVideoGenerationError: If authentication fails for any reason.
        """
        try:
            # Use google.auth.default to find credentials, requesting the correct scope from VEOConfig.
            self.credentials, detected_project_id = default(scopes=VEOConfig.AUTH_SCOPE)

            # If the project_id was not provided during initialization, use the one detected by ADC.
            if self.project_id is None:
                self.project_id = detected_project_id

        except auth_exceptions.DefaultCredentialsError as e:
            # Provide a user-friendly error if credentials are not found in the environment.
            raise VEOVideoGenerationError(
                "Failed to find default credentials. Please run "
                "'gcloud auth application-default login' or configure the environment."
            ) from e
        except Exception as e:
            # Catch any other unexpected authentication errors and wrap them in our custom exception.
            raise VEOVideoGenerationError(f"An unexpected error occurred during authentication: {e}") from e

    def _validate_inputs(
        self,
        reference_image_path: str,
        scene_prompts: List[str],
        output_folder_name: str
    ) -> None:
        """Validates all user-provided inputs for the video generation pipeline."""
        # Validate the path to the reference image is a non-empty string.
        if not isinstance(reference_image_path, str) or not reference_image_path:
            raise ValueError("reference_image_path must be a non-empty string.")
        # Validate the reference image file actually exists.
        if not os.path.exists(reference_image_path):
            raise FileNotFoundError(f"Reference image file not found: {reference_image_path}")

        # Validate that scene_prompts is a non-empty list.
        if not isinstance(scene_prompts, list) or not scene_prompts:
            raise ValueError("scene_prompts must be a non-empty list of strings.")
        # Validate that every item in the list is a non-empty, non-whitespace string.
        if not all(isinstance(prompt, str) and prompt.strip() for prompt in scene_prompts):
            raise ValueError("All items in scene_prompts must be non-empty strings.")

        # Validate the name for the output folder is a non-empty string.
        if not isinstance(output_folder_name, str) or not output_folder_name:
            raise ValueError("output_folder_name must be a non-empty string.")

    def _create_output_directory(self, folder_name: str) -> Path:
        """Creates a subdirectory in the user's home directory for storing outputs."""
        try:
            # Use Path.home() for robust, cross-platform path resolution to the user's home directory.
            home_dir = Path.home()
            # Define the full path for the output directory.
            output_dir = home_dir / folder_name
            # Create the directory, including parent directories if needed, and set standard permissions.
            output_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
            # Log the successful creation or confirmation of the directory.
            logger.info(f"Output directory ensured at: {output_dir}")
            return output_dir
        except OSError as e:
            # Raise a custom error if directory creation fails due to OS-level issues.
            raise VEOVideoGenerationError(f"Failed to create output directory '{folder_name}': {e}") from e

    def _load_and_encode_image(self, image_path: str) -> str:
        """Loads an image, resizes it for VEO, and encodes it to a base64 string."""
        try:
            # Open the image file using a context manager to ensure it's properly closed.
            with Image.open(image_path) as img:
                # Convert the image to RGB to remove any alpha channel (e.g., from PNGs), ensuring compatibility.
                if img.mode != 'RGB':
                    img = img.convert('RGB')

                # Resize the image to VEO's recommended dimensions while preserving aspect ratio.
                img = self._resize_image_for_veo(img)

                # Use an in-memory buffer to avoid writing a temporary file to disk, which is more efficient.
                buffer = io.BytesIO()

                # Save the processed image to the buffer in JPEG format for efficiency and smaller payload size.
                img.save(buffer, format='JPEG', quality=VEOConfig.JPEG_ENCODING_QUALITY)

                # Get the raw byte value from the buffer.
                image_bytes = buffer.getvalue()

                # Encode the image bytes to a base64 string and decode to utf-8 for JSON serialization.
                base64_encoded = base64.b64encode(image_bytes).decode('utf-8')

                # Log the successful operation.
                logger.info(f"Successfully loaded and encoded image: {image_path}")
                return base64_encoded

        except FileNotFoundError as e:
            # Raise a specific error if the file does not exist, keeping the original traceback.
            raise VEOVideoGenerationError(f"Image file not found at path: {image_path}") from e
        except Exception as e:
            # Wrap any other image processing errors in our custom exception.
            raise VEOVideoGenerationError(f"Failed to load and encode image '{image_path}': {e}") from e

    def _resize_image_for_veo(self, img: Image.Image) -> Image.Image:
        """Resizes a PIL Image to VEO recommended dimensions, preserving aspect ratio."""
        # ImageOps is imported locally because the import cell only brings in PIL.Image.
        from PIL import ImageOps

        # Get the current width and height of the image.
        width, height = img.size
        # Calculate the aspect ratio to determine if the image is landscape, portrait, or square.
        aspect_ratio = width / height

        # Set target dimensions from VEOConfig based on whether the image is landscape (or square) or portrait.
        target_dims = VEOConfig.LANDSCAPE_DIMS if aspect_ratio >= 1 else VEOConfig.PORTRAIT_DIMS

        # Scale and center-crop to the exact target size with Lanczos resampling. A plain
        # img.resize(target_dims) would stretch any image whose aspect ratio differs from the target.
        resized_img = ImageOps.fit(img, target_dims, method=Image.Resampling.LANCZOS)

        return resized_img

    def _build_request_body(self, prompt: str, image_data: Optional[str]) -> Dict[str, Any]:
        """Constructs the request body according to the official VEO API specification."""
        # Define the instance payload, which contains the core prompt.
        instance_payload = {"prompt": prompt}

        # If reference image data is provided (for the first or subsequent segments), add it to the payload.
        if image_data:
            instance_payload["reference_image"] = {"image_bytes": image_data}

        # Construct the full request body with instances and parameters from VEOConfig.
        request_body = {
            "instances": [instance_payload],
            "parameters": {
                "video_length": VEOConfig.DEFAULT_VIDEO_LENGTH,
                "fps": VEOConfig.DEFAULT_FPS,
                "seed": int(np.random.randint(0, 2**32 - 1, dtype=np.int64))  # Random seed for creative variety; cast to a plain int so it is JSON-serializable.
            }
        }
        return request_body

    @retry_on_api_error
    def _generate_video_segment(self, prompt: str, image_data: Optional[str]) -> Dict[str, Any]:
        """
        Generates a single video segment using the synchronous VEO API with robust retries.

        Args:
            prompt (str): The text prompt for the video segment.
            image_data (Optional[str]): Base64 encoded image data for initialization.

        Returns:
            Dict[str, Any]: The prediction dictionary from the API response.
        """
        try:
            # Refresh credentials before the API call to ensure the auth token is not expired.
            self.credentials.refresh(Request())

            # Construct the full API endpoint URL using constants from VEOConfig for accuracy.
            url = (f"{self.api_endpoint}/v1/projects/{self.project_id}/locations/{self.location}"
                   f"/publishers/google/models/{VEOConfig.MODEL_ID}:{VEOConfig.API_METHOD}")

            # Prepare the authorization and content-type headers for the request.
            headers = {
                'Authorization': f'Bearer {self.credentials.token}',
                'Content-Type': 'application/json; charset=utf-8'
            }

            # Construct the request body using the dedicated helper method.
            request_body = self._build_request_body(prompt, image_data)

            # Log the request initiation. This is a long-running operation.
            logger.info("Sending request to VEO API. This may take several minutes...")
            # Send the POST request to the VEO API with a configured timeout.
            response = requests.post(url, headers=headers, json=request_body, timeout=VEOConfig.API_TIMEOUT_SECONDS)
            # Raise an exception for HTTP error codes (4xx or 5xx); only the codes in VEOConfig.RETRYABLE_STATUS_CODES are meant to be retried.
            response.raise_for_status()

            # Parse the JSON response from the API.
            response_data = response.json()

            # Extract the list of predictions from the response body.
            predictions = response_data.get("predictions")
            # Validate that the predictions list exists and is not empty.
            if not predictions or not isinstance(predictions, list):
                raise VEOVideoGenerationError(f"API response is missing 'predictions'. Response: {response_data}")

            # Log the successful receipt of the response.
            logger.info("Successfully received response from VEO API.")
            # Return the first prediction object, as we send one instance at a time.
            return predictions[0]

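        except requests.exceptions.HTTPError as e:
            # Log the detailed error from the API response body if available for easier debugging.
            logger.error(f"HTTP Error during video generation request: {e.response.text}")
            raise  # Re-raise the exception to be handled by the tenacity retry decorator or the caller.
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # Re-raise network errors unchanged; wrapping them would hide them from the retry decorator.
            raise
        except VEOVideoGenerationError:
            # Already the pipeline's own error type (e.g., missing 'predictions'); do not wrap it twice.
            raise
        except Exception as e:
            # Wrap any other exceptions in our custom error type for consistent error handling.
            raise VEOVideoGenerationError(f"Video generation failed: {e}") from e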

    def _download_video_from_gcs(self, gcs_uri: str, local_path: Path) -> None:
        """Downloads a video from a Google Cloud Storage URI to a local path."""
        try:
            # Validate that the GCS URI has the correct 'gs://' prefix.
            if not gcs_uri.startswith('gs://'):
                raise ValueError(f"Invalid GCS URI format: {gcs_uri}")

            # Parse the bucket name and blob (object) name from the URI string.
            bucket_name, blob_name = gcs_uri[5:].split('/', 1)

            # Initialize the Google Cloud Storage client with the correct project and credentials.
            storage_client = storage.Client(project=self.project_id, credentials=self.credentials)

            # Get a reference to the bucket and blob objects.
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(blob_name)

            # Download the blob's contents to the specified local file path.
            blob.download_to_filename(str(local_path))

            # Log the successful download.
            logger.info(f"Successfully downloaded video from {gcs_uri} to {local_path}")

        except (api_core_exceptions.NotFound, FileNotFoundError) as e:
            # Handle cases where the GCS object does not exist, keeping the original cause attached.
            raise VEOVideoGenerationError(f"GCS object not found at URI: {gcs_uri}") from e
        except Exception as e:
            # Wrap any other GCS-related errors.
            raise VEOVideoGenerationError(f"Failed to download video from GCS: {e}") from e

    def _extract_last_frame(self, video_path: Path) -> NDArray[np.uint8]:
        """Extracts the last frame from a video file using OpenCV."""
        # Initialize a video capture object with the path to the video file.
        cap = cv2.VideoCapture(str(video_path))
        # Check if the video file was opened successfully.
        if not cap.isOpened():
            raise VEOVideoGenerationError(f"Cannot open video file for frame extraction: {video_path}")

        try:
            # Get the total number of frames in the video.
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Ensure the video is not empty. Some backends report 0 or a negative count.
            if total_frames <= 0:
                raise VEOVideoGenerationError(f"Video appears to be empty (0 frames): {video_path}")

            # Seek to the last frame (index total_frames - 1). The reported frame count can be
            # approximate for some codecs, so step back a few frames if the read fails.
            ret, frame = False, None
            for frame_index in range(total_frames - 1, max(total_frames - 6, -1), -1):
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
                ret, frame = cap.read()
                if ret and frame is not None:
                    break

            # Check if a frame was read successfully. 'ret' will be False if every attempt failed.
            if not ret or frame is None:
                raise VEOVideoGenerationError(f"Failed to read the last frame from: {video_path}")

            # Log the successful extraction.
            logger.info(f"Successfully extracted last frame from video: {video_path}")
            # Return the frame as a NumPy array.
            return frame

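        except VEOVideoGenerationError:
            # Re-raise our own errors unchanged so the message is not wrapped a second time.
            raise
        except Exception as e:
            # Wrap any OpenCV or other errors in our custom exception.
            raise VEOVideoGenerationError(f"Frame extraction failed for '{video_path}': {e}") from e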
        finally:
            # Crucially, release the video capture object to free up system resources.
            cap.release()

    def _frame_to_base64(self, frame: NDArray[np.uint8]) -> str:
        """Converts an OpenCV frame (NumPy array) to a base64 encoded string."""
        try:
            # Convert the frame from OpenCV's default BGR color space to PIL's expected RGB color space.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Create a PIL Image object from the NumPy array for easier processing.
            pil_image = Image.fromarray(frame_rgb)

            # Resize the frame to ensure it's compatible with VEO's input requirements, using the same logic.
            pil_image = self._resize_image_for_veo(pil_image)

            # Use an in-memory buffer to save the image without writing to disk.
            buffer = io.BytesIO()
            # Save the image to the buffer as a high-quality JPEG.
            pil_image.save(buffer, format='JPEG', quality=VEOConfig.JPEG_ENCODING_QUALITY)

            # Get the raw JPEG bytes and encode them as a base64 text string.
            image_bytes = buffer.getvalue()
            base64_encoded = base64.b64encode(image_bytes).decode('utf-8')

            return base64_encoded

        except Exception as e:
            # Wrap any conversion errors.
            raise VEOVideoGenerationError(f"Frame to base64 conversion failed: {e}") from e

    def _stitch_videos_ffmpeg(self, video_paths: List[Path], output_path: Path) -> None:
        """Stitches multiple video files into one using ffmpeg's concat demuxer for efficiency."""
        # Check if the ffmpeg executable is available in the system's PATH before proceeding.
        if not shutil.which("ffmpeg"):
            raise FileNotFoundError(
                "ffmpeg not found. Please install ffmpeg and ensure it is in your system's PATH."
            )

        # Ensure there is at least one video to process.
        if not video_paths:
            raise VEOVideoGenerationError("No video clips were provided to stitch.")

        # Create a temporary manifest file for ffmpeg in the same directory as the output.
        manifest_path = output_path.with_suffix('.txt')

        try:
            # Write the list of video files to the manifest in the format required by ffmpeg's concat demuxer.
            with open(manifest_path, 'w', encoding='utf-8') as f:
                for video_path in video_paths:
                    # Verify each video file exists before adding it to the manifest.
                    if not video_path.exists():
                        raise FileNotFoundError(f"Video file for stitching not found: {video_path}")
                    # Use resolved absolute paths wrapped in single quotes. Escape any embedded
                    # single quote as '\'' so such a path does not break the manifest.
                    escaped_path = str(video_path.resolve()).replace("'", "'\\''")
                    f.write(f"file '{escaped_path}'\n")

            # Log the creation of the manifest file.
            logger.info(f"Created ffmpeg manifest at: {manifest_path}")

            # Construct the ffmpeg command for efficient, lossless concatenation.
            command = [
                "ffmpeg",
                "-y",                   # Overwrite output file if it exists.
                "-f", "concat",         # Use the concat demuxer.
                "-safe", "0",           # Allow absolute paths in the manifest file (required for resolved paths).
                "-i", str(manifest_path), # Specify the input manifest file.
                "-c", "copy",           # Copy codecs without re-encoding for maximum speed and quality preservation.
                str(output_path)        # Specify the final output file path.
            ]

            # Log the command being executed for debugging purposes.
            logger.info(f"Executing ffmpeg command: {' '.join(command)}")

            # Execute the command, capturing stdout and stderr for detailed error reporting.
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=False # We check the return code manually for better error logging.
            )

            # Check if the ffmpeg command executed successfully.
            if result.returncode != 0:
                # If it failed, raise an error with the detailed stderr from ffmpeg.
                raise VEOVideoGenerationError(
                    f"ffmpeg failed with exit code {result.returncode}.\n"
                    f"Stderr: {result.stderr}"
                )

            # Log the successful completion of the stitching process.
            logger.info(f"Successfully stitched {len(video_paths)} videos into: {output_path}")

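        except VEOVideoGenerationError:
            # Re-raise our own errors (such as the ffmpeg failure above) without wrapping them twice.
            raise
        except Exception as e:
            # Wrap any other exception in our custom error type.
            raise VEOVideoGenerationError(f"Video stitching failed: {e}") from e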
        finally:
            # Ensure the temporary manifest file is always cleaned up, even if errors occur.
            if manifest_path.exists():
                manifest_path.unlink()
                logger.info(f"Cleaned up manifest file: {manifest_path}")

    def generate_sequential_videos(self,
        reference_image_path: str,
        scene_prompts: List[str],
        output_folder_name: str
    ) -> Path:
        """Executes the full pipeline to generate a sequence of chained videos."""
        try:
            # Step 0: Validate all inputs before starting any expensive processing.
            self._validate_inputs(reference_image_path, scene_prompts, output_folder_name)

            # Step 1: Create the output directory to store all generated artifacts.
            output_dir = self._create_output_directory(output_folder_name)

            # Step 2: Load and encode the initial reference image to start the sequence.
            current_image_data: Optional[str] = self._load_and_encode_image(reference_image_path)

            # Initialize a list to store the paths of the generated video segments for later stitching.
            video_paths: List[Path] = []

            # Step 3: Iterate through each scene prompt to generate a corresponding video segment.
            for i, prompt in enumerate(scene_prompts):
                logger.info(f"--- Processing Scene {i+1}/{len(scene_prompts)} ---")
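                # Log a preview of the prompt, adding an ellipsis only when it is actually truncated.
                prompt_preview = prompt if len(prompt) <= 100 else f"{prompt[:100]}..."
                logger.info(f"Prompt: '{prompt_preview}'")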

                # Step 4: Generate a video segment using the current image data and prompt.
                prediction = self._generate_video_segment(
                    prompt=prompt,
                    image_data=current_image_data
                )

                # Define a unique, ordered filename for this video segment.
                video_filename = f"segment_{i+1:03d}.mp4"
                video_path = output_dir / video_filename

                # Step 5: Process the API response, which could contain bytes or a GCS URI.
                if 'video_bytes' in prediction:
                    # If video bytes are returned directly, decode them from base64 and save to a file.
                    logger.info(f"Received video as base64 bytes. Saving to {video_path}")
                    video_bytes = base64.b64decode(prediction['video_bytes'])
                    with open(video_path, 'wb') as f:
                        f.write(video_bytes)
                elif 'gcs_uri' in prediction:
                    # If a GCS URI is returned, download the video from the bucket.
                    logger.info(f"Received GCS URI. Downloading from {prediction['gcs_uri']}")
                    self._download_video_from_gcs(prediction['gcs_uri'], video_path)
                else:
                    # If neither key is present, the API response is invalid.
                    raise VEOVideoGenerationError(f"API prediction did not contain 'video_bytes' or 'gcs_uri': {prediction}")

                # Add the path of the newly created segment to our list for the final stitching step.
                video_paths.append(video_path)

                # Step 6: For all but the last prompt, extract the last frame to seed the next segment.
                if i < len(scene_prompts) - 1:
                    logger.info("Extracting last frame to use as reference for the next segment...")
                    last_frame = self._extract_last_frame(video_path)
                    current_image_data = self._frame_to_base64(last_frame)

            # Step 7: Stitch all generated video segments into a single final video file.
            final_video_path = output_dir / "final_stitched_video.mp4"
            self._stitch_videos_ffmpeg(video_paths, final_video_path)

            # Log the successful completion of the entire pipeline.
            logger.info("--- Sequential video generation pipeline completed successfully. ---")
            logger.info(f"Final video saved to: {final_video_path}")

            # Return the path to the final product.
            return final_video_path

        except (VEOVideoGenerationError, ValueError, FileNotFoundError) as e:
            # Catch known, specific errors and log them clearly before re-raising.
            logger.error(f"A predictable error occurred in the pipeline: {e}")
            raise
        except Exception as e:
            # Catch any other unexpected errors for robust failure handling.
            logger.error(f"An unexpected error occurred in the main pipeline: {e}", exc_info=True)
            raise VEOVideoGenerationError(f"An unexpected error occurred in the main pipeline: {e}") from e


def create_veo_video_pipeline(
    reference_image_path: str,
    scene_prompts: List[str],
    output_folder_name: str,
    project_id: Optional[str] = None
) -> Path:
    """
    A high-level factory function to create and run the VEO sequential video pipeline.

    This function abstracts away the class instantiation and execution, providing a
    simple, clean interface to generate a complete video from an image and prompts.

    Args:
        reference_image_path (str): Path to the initial reference image file.
        scene_prompts (List[str]): An ordered list of scene description prompts.
        output_folder_name (str): The name for the output folder in the home directory.
        project_id (Optional[str]): Your Google Cloud project ID. If None, it will be
                                    auto-detected from ADC or environment variables.

    Returns:
        Path: The path to the final, stitched video file.
    """
    try:
        # If project_id is not provided, attempt to get it from the environment as a fallback.
        if project_id is None:
            project_id = os.environ.get('GOOGLE_CLOUD_PROJECT')
            if project_id:
                logger.info(f"Using project ID from GOOGLE_CLOUD_PROJECT env var: {project_id}")

        # Initialize the VEO pipeline class with the determined project ID.
        pipeline = VEOVideoPipeline(project_id=project_id)

        # Execute the complete video generation pipeline with the provided parameters.
        final_video_path = pipeline.generate_sequential_videos(
            reference_image_path=reference_image_path,
            scene_prompts=scene_prompts,
            output_folder_name=output_folder_name
        )

        return final_video_path

    except Exception as e:
        # Catch and log any exceptions from the pipeline for clear top-level error reporting.
        logger.error(f"VEO video pipeline execution failed: {e}")
        # Re-raise the exception to allow the calling script to handle it as needed.
        raise
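
The chaining logic in `generate_sequential_videos` can be illustrated without calling Vertex AI. The following is a minimal sketch, not the notebook's implementation: `chain_segments`, `fake_generate`, and `fake_last_frame` are hypothetical names standing in for `_generate_video_segment` and for the `_extract_last_frame` plus `_frame_to_base64` pair. It only shows how the last frame of each segment becomes the reference for the next prompt.

```python
from typing import Callable, List, Tuple


def chain_segments(
    first_image: str,
    prompts: List[str],
    generate: Callable[[str, str], str],
    last_frame: Callable[[str], str],
) -> List[Tuple[str, str]]:
    """Return (reference_used, segment) pairs, seeding each segment with the previous last frame."""
    reference = first_image
    results: List[Tuple[str, str]] = []
    for index, prompt in enumerate(prompts):
        segment = generate(prompt, reference)
        results.append((reference, segment))
        # Only extract a frame when another prompt follows, as in Step 6 of the pipeline.
        if index < len(prompts) - 1:
            reference = last_frame(segment)
    return results


# Hypothetical stubs that stand in for the API call and the OpenCV frame extraction.
def fake_generate(prompt: str, reference: str) -> str:
    return f"video({prompt})"


def fake_last_frame(segment: str) -> str:
    return f"last_frame_of[{segment}]"


pairs = chain_segments("ref.jpg", ["scene A", "scene B"], fake_generate, fake_last_frame)
for reference_used, segment in pairs:
    print(f"{segment} <- {reference_used}")
```

Passing the generator and frame extractor as callables keeps the loop testable offline; the real pipeline performs the same handoff with base64-encoded image data rather than strings.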


# Usage Example

# Example Usage and Testing Block

if __name__ == "__main__":
    """
    Example usage of the VEO video generation pipeline.

    *** IMPORTANT: CONFIGURATION REQUIRED ***
    1.  **AUTHENTICATION**: Run `gcloud auth application-default login` in your terminal.
    2.  **PROJECT ID**: Set the `GOOGLE_CLOUD_PROJECT` environment variable to your GCP project ID,
        or pass the `project_id` argument to `create_veo_video_pipeline`.
    3.  **APIs**: Ensure the Vertex AI API is enabled in your Google Cloud project.
    4.  **DEPENDENCIES**: Install required packages:
        pip install google-cloud-aiplatform google-cloud-storage opencv-python Pillow requests tenacity numpy
    5.  **FFMPEG**: Install ffmpeg (https://ffmpeg.org/download.html) and ensure it is in your system's PATH.
    6.  **REFERENCE IMAGE**: Replace the dummy image path with a path to your own image.
    """

    # --- CONFIGURE THESE PARAMETERS FOR YOUR RUN ---
    try:
        # Get the directory of the current script to create a sample image nearby.
        script_dir = Path(__file__).parent
    except NameError:
        # Fallback for interactive environments (like Jupyter) where __file__ is not defined.
        script_dir = Path.cwd()

    # Create a dummy reference image for testing if one doesn't exist.
    dummy_image_path = script_dir / "veo_reference_image.jpg"
    if not dummy_image_path.exists():
        print(f"Creating a dummy reference image at: {dummy_image_path}")
        # A blue 16:9 image matching VEO's landscape dimensions.
        dummy_img = Image.new('RGB', VEOConfig.LANDSCAPE_DIMS, color='darkblue')
        dummy_img.save(dummy_image_path)

    # Define the parameters for the video generation task.
    example_params = {
        "reference_image_path": str(dummy_image_path),
        "scene_prompts": [
            "A majestic eagle perched on a rocky cliff overlooking a vast canyon, cinematic lighting.",
            "The eagle spreads its wings and takes flight into the golden sunset, slow motion.",
            "Soaring high above snow-capped mountains with clouds below, drone shot.",
            "The eagle gracefully lands near a crystal clear alpine lake, its reflection in the water."
        ],
        "output_folder_name": "veo_eagle_flight_sequence",
        # "project_id": "your-gcp-project-id"  # Optional: uncomment and set if needed.
    }

    print("--- Starting VEO Video Generation Pipeline ---")
    print(f"Reference Image: {example_params['reference_image_path']}")
    print(f"Output Folder Name: {example_params['output_folder_name']}")
    print(f"Number of Scenes: {len(example_params['scene_prompts'])}")

    try:
        # Execute the pipeline and time its execution with a monotonic clock.
        start_time = time.perf_counter()
        final_video = create_veo_video_pipeline(**example_params)
        end_time = time.perf_counter()

        # Print a clear success message with the final output path and total time.
        print("\n" + "="*60)
        print("✅ Video generation pipeline completed successfully!")
        print(f"📹 Final video saved to: {final_video}")
        print(f"⏱️ Total execution time: {end_time - start_time:.2f} seconds.")
        print("="*60)

    except (VEOVideoGenerationError, ValueError, FileNotFoundError) as e:
        # Handle known, controlled pipeline failures gracefully with a clear error message.
        print(f"\n❌ PIPELINE FAILED: {e}")
        sys.exit(1)
    except Exception as e:
        # Handle any other unexpected errors to prevent unhandled stack traces.
        print(f"\n❌ AN UNEXPECTED ERROR OCCURRED: {e}")
        sys.exit(1)
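
Part of the configuration checklist in the usage block can be verified locally before any API call is made. The sketch below is not part of the notebook: `check_prerequisites` is a hypothetical helper that uses only the standard library and covers just two items, whether `ffmpeg` is on the PATH and whether a project ID was supplied explicitly or through `GOOGLE_CLOUD_PROJECT`. Since the pipeline's docstring says the project ID may also be auto-detected from ADC, the second check is best read as a warning rather than a hard failure.

```python
import os
import shutil
from typing import List, Optional


def check_prerequisites(project_id: Optional[str] = None) -> List[str]:
    """Return human-readable warnings; an empty list means both local checks passed."""
    warnings: List[str] = []
    # ffmpeg is required by the final stitching step.
    if shutil.which("ffmpeg") is None:
        warnings.append("ffmpeg was not found on the PATH.")
    # Accept either an explicit argument or the environment variable.
    if not (project_id or os.environ.get("GOOGLE_CLOUD_PROJECT")):
        warnings.append("No explicit project ID: pass project_id or set GOOGLE_CLOUD_PROJECT.")
    return warnings


for warning in check_prerequisites():
    print(f"Prerequisite warning: {warning}")
```

Authentication and API enablement are not checked here, because they cannot be confirmed without contacting Google Cloud.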
