Copyright 2024 Google, LLC. This software is provided as-is,
without warranty or representation for any use or purpose. Your
use of it is subject to your agreement with Google.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# How To Use Vertex Veo for Text to Video and Image to Video

This notebook outlines how to interact with the Vertex Veo model to create dynamic video content.

## Prepare the python development environment

First, let's identify any project-specific variables to customize this notebook to your GCP environment. At a minimum, change YOUR_PROJECT_ID and YOUR_GCS_BUCKET to reflect your own GCP project info.

In [None]:
# --- Project settings: replace the placeholder values with your own ---
project_id = "YOUR_PROJECT_ID"
location = "global"
region = "us-central1"

# Cloud Storage bucket and the gs:// URI derived from it.
gcs_bucket = "YOUR_GCS_BUCKET"
output_gcs_uri = "gs://" + gcs_bucket

# Local download folders, one per video-generation round.
local_tmp_folder_1, local_tmp_folder_2, local_tmp_folder_3 = (
    "./tmp/vid1",
    "./tmp/vid2",
    "./tmp/vid3",
)

# Image the workflow starts from.
source_image = "source_image.png"

Install any needed Python modules from our requirements.txt file. Most Vertex Workbench environments include all the packages we'll be using, but if you are using an external Jupyter Notebook or require any additional packages for your own needs, you can simply add them to the included requirements.txt file and run the following commands.

In [None]:
!pip install -r requirements.txt
!pip install --upgrade google-genai

Now we will import all required modules. For our purpose, we will be utilizing the following:

- google.auth - Provides authentication access to the Google APIs
- PIL - An easy-to-use Python image library; we use it to open and display the image returned by the model
- io - Core Python library used to work with I/O. We use it to wrap the returned image bytes in a file-like object for PIL
- base64 - Used to encode image files as base64 strings before they are sent to the models
- requests - Allows direct interaction with a REST API (imported for convenience; the cells below use the Gen AI SDK instead)
- json - Python module used to interact with JSON data (imported for convenience)

In [None]:
import google.auth.transport.requests
import google.auth
from PIL import Image
from io import BytesIO
import base64
import requests
import json
import io
import time
from IPython.display import clear_output, HTML, Video
import asyncio
import contextlib
import itertools
import yaml
import cv2
import ffmpeg
import os

from google.cloud import storage

from google import genai
from google.genai import types
from google.genai.types import GenerateVideosConfig, HttpOptions, Part
from google.genai.types import Image as genImage

## Function Junction 

In [None]:
def upload_file_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and return its gs:// URI.

    Args:
        bucket_name: Name of the destination bucket, e.g. "your-bucket-name".
        source_file_name: Local path of the file, e.g. "local/path/to/file.jpg".
        destination_blob_name: Object name inside the bucket,
            e.g. "storage-object-name.jpg".

    Returns:
        The URI of the uploaded object, formatted as gs://<bucket>/<object>.
    """
    gcs_uri = f'gs://{bucket_name}/{destination_blob_name}'

    # client -> bucket -> blob, then push the local file into that blob.
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {gcs_uri}.")

    return gcs_uri

In [None]:
def download_file_from_uri(uri, local_directory_path):
    """Download a Cloud Storage object into a local directory.

    The file is saved under its base filename only; any "sub-folder" prefix in
    the object name is dropped.

    Args:
        uri: The GCS URI (e.g., gs://my-bucket/sub_folder/my-file.txt).
        local_directory_path: The directory where the file should be saved
            locally. It is created if it does not exist.

    Returns:
        The local path of the downloaded file.

    Raises:
        ValueError: If the URI is malformed.
        google.cloud.exceptions.NotFound: If the blob does not exist.
    """
    scheme = "gs://"
    if not uri.startswith(scheme):
        raise ValueError(f"Invalid GCS URI: '{uri}'. Must start with 'gs://'.")

    # Split "bucket/object/path" into "bucket" and "object/path".
    bucket_name, separator, object_name = uri[len(scheme):].partition('/')
    if not separator:
        raise ValueError(f"Invalid GCS URI format: '{uri}'. Expected gs://bucket_name/object_name.")
    if not bucket_name:
        raise ValueError(f"Invalid GCS URI: '{uri}'. Bucket name cannot be empty.")
    if not object_name:
        raise ValueError(f"Invalid GCS URI: '{uri}'. Object name cannot be empty.")

    # e.g. object_name "sub_folder/file.mp4" -> base_filename "file.mp4"
    base_filename = os.path.basename(object_name)
    if not base_filename:  # Handles cases like "gs://bucket/folder/"
        raise ValueError(f"Could not determine filename from object: {object_name}")

    os.makedirs(local_directory_path, exist_ok=True)
    full_local_save_path = os.path.join(local_directory_path, base_filename)

    # NotFound is raised and caught below but is not imported at module level,
    # so bring it into scope here.
    from google.cloud.exceptions import NotFound

    storage_client = storage.Client()

    try:
        blob = storage_client.bucket(bucket_name).blob(object_name)

        if not blob.exists():
            raise NotFound(f"Blob {object_name} not found in bucket {bucket_name}.")

        print(f"Downloading {uri} to {full_local_save_path}...")
        blob.download_to_filename(full_local_save_path)
        print(f"Successfully downloaded to {full_local_save_path}")

        return full_local_save_path

    except NotFound as e:
        print(f"Error: {e}")
        raise  # Let the caller decide how to handle a missing object
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        raise

In [None]:
def extract_last_frame(video_path, output_path):
    """Extract the last frame from a video and save it as an image.

    Args:
        video_path: Path to the input video file.
        output_path: Path to save the output image file.

    Raises:
        Exception: If the video file cannot be opened.
    """
    print(video_path)
    video_capture = cv2.VideoCapture(video_path)
    try:
        if not video_capture.isOpened():
            raise Exception(f"Could not open video file: {video_path}")

        ret, last_frame = False, None

        total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
        # Fast path: seek straight to the final frame when a count is reported.
        if total_frames > 0:
            video_capture.set(cv2.CAP_PROP_POS_FRAMES, total_frames - 1)
            ret, last_frame = video_capture.read()

        if not ret:
            # The reported frame count may be missing or inexact, in which case
            # the seek fails. Fall back to decoding from the start and keeping
            # the last frame that can actually be read.
            video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
            while True:
                frame_ok, frame = video_capture.read()
                if not frame_ok:
                    break
                ret, last_frame = True, frame

        if not ret:
            print("Failed to retrieve the last frame.")
        elif cv2.imwrite(output_path, last_frame):
            print(f"Last frame saved to {output_path}")
        else:
            print(f"Failed to write the last frame to {output_path}")
    finally:
        # Always free the capture handle, even when an error is raised above.
        video_capture.release()

## Authenticate to the Vertex AI API

Our Vertex Workbench instance is configured to use a specified service account that has IAM access to the Veo API. The following section creates the Gen AI SDK client for our project and region; that client is used for all of the model calls later in the notebook.

In [None]:
# Gen AI SDK client in Vertex AI mode for the configured project and region.
client = genai.Client(vertexai=True, project=project_id, location=region)

## Inspect the source image

In [None]:
# Read the source image and keep it as a base64 text string.
with open(source_image, "rb") as image_fh:
    encoded_base_image = base64.b64encode(image_fh.read())

B64_BASE_IMAGE = str(encoded_base_image, 'utf-8')
image_mime_type = "image/png"

In [None]:
# Chat session with the image-generation model; text and image output are requested.
chat = client.chats.create(
    model="gemini-2.0-flash-preview-image-generation",
    config=types.GenerateContentConfig(response_modalities=["Text", "Image"]),
)

In [None]:
# Instruction text sent with the source image: inspect only, do not generate yet.
text_part = types.Part(
    text=(
        "\n"
        "    Carefully examine the provided image. Pay attention to the outfit the model is wearing. \n"
        "    Do not generate an image at this point, only inspect the image and tell me what you see.\n"
        "    "
    )
)

In [None]:
# Wrap the encoded source image as an inline-data part of the message.
image_part = types.Part(
    inline_data=types.Blob(data=B64_BASE_IMAGE, mime_type=image_mime_type)
)

In [None]:
# Text instruction first, then the image it refers to.
message_content = [text_part]
message_content.append(image_part)

# Left as the last expression so the notebook displays the model's reply.
chat.send_message(message_content)

In [None]:
# Prompt asking the model to render a runway photo of a woman in the inspected outfit.
start_chat_prompt = (
    "Generate an image of a woman wearing this exact outfit. \n"
    "Ensure the woman has natural features, such as her face, feet, toes and hands. \n"
    "\n"
    "The image should be high-quality and professionally styled, as if taken by a professional photographer for a magazine.\n"
    "\n"
    "The scene is a trendy fashion show runway, with a large audience seated on both sides.\n"
    "The camera is located at the end of the runway and eye level with the woman\n"
    "The woman is walking toward the camera as she starts her walk down the runway\n"
)

In [None]:
print("Generating image. Please wait...")
print("-" * 20)

# Reset first so a stale image from an earlier run of this cell can never be
# mistaken for this run's output when the model returns no image.
gen_image = None

response = chat.send_message(start_chat_prompt)

candidates = response.candidates
if candidates and candidates[0].content:
    # A reply can mix text and image parts, so inspect every part.
    for part in candidates[0].content.parts or []:
        if part.text:
            print("Model (Text Part):", part.text)
            print("-" * 20)

        if part.inline_data:
            try:
                gen_image = Image.open(BytesIO(part.inline_data.data))
                display(gen_image)
                print("-" * 20)
            except Exception as e:
                print(f"Error displaying image: {e}")
else:
    print("No candidates or content found in the response.")

if gen_image is None:
    print("No image was returned by the model; re-run this cell before continuing.")
print("-" * 20)


In [None]:
# Write the generated image to disk under a fixed file name.
image_1 = 'image_1.png'
gen_image.save(fp=image_1)

In [None]:
#image_1_uri = upload_file_to_gcs(gcs_bucket, image_1, image_1)

## Generate the first video clip from the generated image

Define prompt to create the first video clip

In [None]:
# Read the saved image back and keep it as a base64 text string.
with open(image_1, "rb") as image_fh:
    encoded_base_image = base64.b64encode(image_fh.read())

B64_BASE_IMAGE = str(encoded_base_image, 'utf-8')
image_mime_type = "image/png"

In [None]:
# Motion description for the first clip (leading and trailing newlines are part of the prompt).
video_generation_prompt = "\nThe woman is walking towards the camera\n"

Define the request

In [None]:
# Start the image-to-video job; the returned operation is polled for
# completion in the next cell.
operation = client.models.generate_videos(
    model="veo-2.0-generate-001",
    prompt=video_generation_prompt,
    image=genImage(
        # Alternative (see the commented-out upload cell): pass gcs_uri=image_1_uri
        # instead of sending the image bytes inline.
        image_bytes=B64_BASE_IMAGE,
        mime_type="image/png",
    ),
    config=GenerateVideosConfig(
        aspect_ratio="9:16",  # 16:9 (landscape) and 9:16 (portrait) are supported.
        number_of_videos=4,
        duration_seconds=6,  # integer number of seconds, not a string
        fps=24,
        person_generation="allow_adult",
        enhance_prompt=True,
        output_gcs_uri=output_gcs_uri,
    ),
)

Submit the request and process the results

In [None]:
# Poll every 15 seconds until the operation reports that it is done.
while not operation.done:
    time.sleep(15)
    operation = client.operations.get(operation)
    print(operation)

# One dict (name, uri, tmp_file) per video that was downloaded successfully.
generated_videos_data = []

if operation.response:
    print("Processing generated videos...")
    if operation.result and hasattr(operation.result, 'generated_videos'):
        for entry in operation.result.generated_videos:
            video = entry.video
            if not (video and video.uri):
                print("A generated video entry was found but did not have a valid URI. Skipping.")
                continue

            # Values are evaluated in order, so the download runs last.
            generated_videos_data.append({
                "name": os.path.basename(video.uri),
                "uri": video.uri,
                "tmp_file": download_file_from_uri(video.uri, local_tmp_folder_1),
            })

        print(f"\nSuccessfully extracted data for {len(generated_videos_data)} videos:")
        for info in generated_videos_data:
            print(f"  Name: {info['name']}, URI: {info['uri']}, Local File: {info['tmp_file']}")
            display(Video(info['tmp_file'], embed=True, width=640, height=480))

    else:
        print("No generated videos found in the response.")
elif operation.error:
    # Prefer the error's `message` attribute when it has one.
    error_message = getattr(operation.error, 'message', str(operation.error))
    print(f"Operation failed with error: {error_message}")
else:
    print("Operation completed but no response or error was found.")

Inspect the generated videos to determine the accuracy and quality 

In [None]:
# Scoring instructions for the reviewing model; the generation prompt is
# embedded so the model can judge each video against it.
video_inspect_prompt = (
    "\n"
    "    You will be provided with a video along with the associated name local file and GCS URI.\n"
    "    \n"
    "    Judge the accuracy of the generated video based on the input prompt and lifelike motion. \n"
    "    Deduct points for features or actions that were either added, changed or removed from the original prompt.\n"
    "    Deduct additional points for the immediate area around the subject in the image that looks out of place or unrealistic.\n"
    "    Give the image a ranking of 1-10, with 1 being low and 10 being high.\n"
    "\n"
    "    Do not include any greetings or pleasantries in your response.\n"
    "    \n"
    "    The input prompt used to generate this video is:\n"
    f"    {video_generation_prompt}\n"
    "    \n"
    "    \n"
    "    Example output:\n"
    "    name: video.mp4\n"
    "    \n"
    "    uri: gs://bucket/folder/file.mp4\n"
    "    \n"
    "    local: ./folder/file.mp4\n"
    "    \n"
    "    qa_score: 5\n"
    "\n"
    "    details: summary of the video\n"
    "\n"
    "    inconsistencies: list of identified problems with the image \n"
)

In [None]:
# Separate client, pinned to API version "v1", used for the video-review chats.
vid_client = genai.Client(
    vertexai=True,
    project=project_id,
    location=region,
    http_options=HttpOptions(api_version="v1"),
)

In [None]:
if generated_videos_data:
    # One chat session for all clips, so the follow-up question can ask the
    # model to compare them.
    vid_chat = vid_client.chats.create(
        model="gemini-2.5-flash-preview-04-17",
        config=types.GenerateContentConfig(response_modalities=["Text"]),
    )

    for video_data in generated_videos_data:
        # The video is passed by reference to its GCS URI.
        video_part = Part.from_uri(
            file_uri=video_data['uri'],
            mime_type="video/mp4",
        )

        message_content = [f'''The file name is {video_data['name']} the local file is {video_data['tmp_file']} and the GCS URI is {video_data['uri']}. {video_inspect_prompt}''', video_part]

        response = vid_chat.send_message(message_content)
        print('------------------------')
        print(response.text)
        print()
else:
    # Say so explicitly instead of silently doing nothing.
    print("No downloaded videos to analyze; re-run the generation cells first.")

In [None]:
# Ask the chat session that reviewed the clips to name the best one.
response = vid_chat.send_message(
    "Based on your analysis, provide the local file path for the best video to use.\n"
    "Only respond with the local file path"
)
# The reply is free-form model text: it can be empty or padded with whitespace,
# quotes or Markdown backticks. Strip those so the value is usable as a path.
selected_video_1 = (response.text or "").strip().strip("`'\"").strip()
print(selected_video_1)

## Extract the last image from the video

In [None]:
# Save the final frame of the selected clip as a PNG.
image_file = "clip_1_end.png"
video_file = selected_video_1.rstrip()  # drop trailing whitespace from the model reply

extract_last_frame(video_path=video_file, output_path=image_file)

## Create a second video with Veo based on the last frame of video 1

Use the same veo code as before

In [None]:
# Motion description for the second clip (leading and trailing newlines are part of the prompt).
video_generation_prompt = "\nthe woman is turning around\n"

Define the request

In [None]:
# Read the extracted frame and keep it as a base64 text string.
with open(image_file, "rb") as image_fh:
    encoded_base_image = base64.b64encode(image_fh.read())

B64_BASE_IMAGE = str(encoded_base_image, 'utf-8')
image_mime_type = "image/png"

In [None]:
# Start the image-to-video job for the second clip; the returned operation is
# polled for completion in the next cell.
operation = client.models.generate_videos(
    model="veo-2.0-generate-001",
    prompt=video_generation_prompt,
    image=genImage(
        image_bytes=B64_BASE_IMAGE,
        mime_type="image/png",
    ),
    config=GenerateVideosConfig(
        aspect_ratio="9:16",  # 16:9 (landscape) and 9:16 (portrait) are supported.
        number_of_videos=4,
        duration_seconds=8,  # integer number of seconds, not a string
        fps=24,
        person_generation="allow_adult",
        enhance_prompt=True,
        output_gcs_uri=output_gcs_uri,
    ),
)

Submit the request and process the results

In [None]:
# Poll every 15 seconds until the operation reports that it is done.
while not operation.done:
    time.sleep(15)
    operation = client.operations.get(operation)
    print(operation)

# One dict (name, uri, tmp_file) per video that was downloaded successfully.
generated_videos_data = []

if operation.response:
    print("Processing generated videos...")
    if operation.result and hasattr(operation.result, 'generated_videos'):
        for entry in operation.result.generated_videos:
            video = entry.video
            if not (video and video.uri):
                print("A generated video entry was found but did not have a valid URI. Skipping.")
                continue

            # Values are evaluated in order, so the download runs last.
            generated_videos_data.append({
                "name": os.path.basename(video.uri),
                "uri": video.uri,
                "tmp_file": download_file_from_uri(video.uri, local_tmp_folder_2),
            })

        print(f"\nSuccessfully extracted data for {len(generated_videos_data)} videos:")
        for info in generated_videos_data:
            print(f"  Name: {info['name']}, URI: {info['uri']}, Local File: {info['tmp_file']}")
            display(Video(info['tmp_file'], embed=True, width=640, height=480))

    else:
        print("No generated videos found in the response.")
elif operation.error:
    # Prefer the error's `message` attribute when it has one.
    error_message = getattr(operation.error, 'message', str(operation.error))
    print(f"Operation failed with error: {error_message}")
else:
    print("Operation completed but no response or error was found.")

Inspect the videos for accuracy and quality

In [None]:
# Scoring instructions for the reviewing model; the generation prompt is
# embedded so the model can judge each video against it.
video_inspect_prompt = (
    "\n"
    "    You will be provided with a video along with the associated name local file and GCS URI.\n"
    "    \n"
    "    Judge the accuracy of the generated video based on the input prompt and lifelike motion. \n"
    "    Deduct points for features or actions that were either added, changed or removed from the original prompt.\n"
    "    Deduct additional points for the immediate area around the subject in the image that looks out of place or unrealistic.\n"
    "    Give the image a ranking of 1-10, with 1 being low and 10 being high.\n"
    "\n"
    "    Do not include any greetings or pleasantries in your response.\n"
    "    \n"
    "    The input prompt used to generate this video is:\n"
    f"    {video_generation_prompt}\n"
    "    \n"
    "    \n"
    "    Example output:\n"
    "    name: video.mp4\n"
    "    \n"
    "    uri: gs://bucket/folder/file.mp4\n"
    "    \n"
    "    local: ./folder/file.mp4\n"
    "    \n"
    "    qa_score: 5\n"
    "\n"
    "    details: summary of the video\n"
    "\n"
    "    inconsistencies: list of identified problems with the image \n"
)

In [None]:
if generated_videos_data:
    # Create the client once, before the chat session that is built from it,
    # rather than once per video inside the loop.
    vid_client = genai.Client(vertexai=True, project=project_id, location=region, http_options=HttpOptions(api_version="v1"))

    # One chat session for all clips, so the follow-up question can ask the
    # model to compare them.
    vid_chat = vid_client.chats.create(
        model="gemini-2.5-flash-preview-04-17",
        config=types.GenerateContentConfig(response_modalities=["Text"]),
    )

    for video_data in generated_videos_data:
        # The video is passed by reference to its GCS URI.
        video_part = Part.from_uri(
            file_uri=video_data['uri'],
            mime_type="video/mp4",
        )

        message_content = [f'''The file name is {video_data['name']} the local file is {video_data['tmp_file']} and the GCS URI is {video_data['uri']}. {video_inspect_prompt}''', video_part]

        response = vid_chat.send_message(message_content)
        print('------------------------')
        print(response.text)
        print()
else:
    # Say so explicitly instead of silently doing nothing.
    print("No downloaded videos to analyze; re-run the generation cells first.")

In [None]:
# Ask the chat session that reviewed the clips to name the best one.
response = vid_chat.send_message(
    "Based on your analysis, provide the local file path for the best video to use.\n"
    "Only respond with the local file path"
)
# The reply is free-form model text: it can be empty or padded with whitespace,
# quotes or Markdown backticks. Strip those so the value is usable as a path.
selected_video_2 = (response.text or "").strip().strip("`'\"").strip()
print(selected_video_2)

## Extract the last image from the video

In [None]:
# Save the final frame of the selected clip as a PNG.
image_file = "clip_2_end.png"
video_file = selected_video_2.rstrip()  # drop trailing whitespace from the model reply

extract_last_frame(video_path=video_file, output_path=image_file)

## Create a third video with Veo based on the last frame of video 2

Use the same veo code as before

In [None]:
# Motion description for the third clip (leading and trailing newlines are part of the prompt).
video_generation_prompt = "\nThe woman walks away from the camera\n"

Define the request

In [None]:
# Read the extracted frame and keep it as a base64 text string.
with open(image_file, "rb") as image_fh:
    encoded_base_image = base64.b64encode(image_fh.read())

B64_BASE_IMAGE = str(encoded_base_image, 'utf-8')
image_mime_type = "image/png"

In [None]:
# Start the image-to-video job for the third clip; the returned operation is
# polled for completion in the next cell.
operation = client.models.generate_videos(
    model="veo-2.0-generate-001",
    prompt=video_generation_prompt,
    image=genImage(
        image_bytes=B64_BASE_IMAGE,
        mime_type="image/png",
    ),
    config=GenerateVideosConfig(
        aspect_ratio="9:16",  # 16:9 (landscape) and 9:16 (portrait) are supported.
        number_of_videos=4,
        duration_seconds=8,  # integer number of seconds, not a string
        fps=24,
        person_generation="allow_adult",
        enhance_prompt=True,
        output_gcs_uri=output_gcs_uri,
    ),
)

Submit the request and process the results

In [None]:
# Poll every 15 seconds until the operation reports that it is done.
while not operation.done:
    time.sleep(15)
    operation = client.operations.get(operation)
    print(operation)

# One dict (name, uri, tmp_file) per video that was downloaded successfully.
generated_videos_data = []

if operation.response:
    print("Processing generated videos...")
    if operation.result and hasattr(operation.result, 'generated_videos'):
        for entry in operation.result.generated_videos:
            video = entry.video
            if not (video and video.uri):
                print("A generated video entry was found but did not have a valid URI. Skipping.")
                continue

            # Values are evaluated in order, so the download runs last.
            generated_videos_data.append({
                "name": os.path.basename(video.uri),
                "uri": video.uri,
                "tmp_file": download_file_from_uri(video.uri, local_tmp_folder_3),
            })

        print(f"\nSuccessfully extracted data for {len(generated_videos_data)} videos:")
        for info in generated_videos_data:
            print(f"  Name: {info['name']}, URI: {info['uri']}, Local File: {info['tmp_file']}")
            display(Video(info['tmp_file'], embed=True, width=640, height=480))

    else:
        print("No generated videos found in the response.")
elif operation.error:
    # Prefer the error's `message` attribute when it has one.
    error_message = getattr(operation.error, 'message', str(operation.error))
    print(f"Operation failed with error: {error_message}")
else:
    print("Operation completed but no response or error was found.")

In [None]:
# Scoring instructions for the reviewing model; the generation prompt is
# embedded so the model can judge each video against it. This variant tells
# the model not to penalize photography techniques.
video_inspect_prompt = (
    "\n"
    "    You will be provided with a video along with the associated name local file and GCS URI.\n"
    "    \n"
    "    Judge the accuracy of the generated video based on the input prompt. \n"
    "    Do not deduct points for photography techniques, like bokeh, Deep and Shallow Depth of Field, etc.\n"
    "    Deduct points for features or actions that were either added, changed or removed from the original prompt.\n"
    "    Deduct additional points for the immediate area around the subject in the image that looks out of place or unrealistic.\n"
    "    Give the image a ranking of 1-10, with 1 being low and 10 being high.\n"
    "\n"
    "    Do not include any greetings or pleasantries in your response.\n"
    "    \n"
    "    The input prompt used to generate this video is:\n"
    f"    {video_generation_prompt}\n"
    "    \n"
    "    \n"
    "    Example output:\n"
    "    name: video.mp4\n"
    "    \n"
    "    uri: gs://bucket/folder/file.mp4\n"
    "    \n"
    "    local: ./folder/file.mp4\n"
    "    \n"
    "    qa_score: 5\n"
    "\n"
    "    details: summary of the image\n"
    "\n"
    "    inconsistencies: list of identified problems with the image \n"
)

In [None]:
if generated_videos_data:
    # Create the client once, before the chat session that is built from it,
    # rather than once per video inside the loop.
    vid_client = genai.Client(vertexai=True, project=project_id, location=region, http_options=HttpOptions(api_version="v1"))

    # One chat session for all clips, so the follow-up question can ask the
    # model to compare them.
    vid_chat = vid_client.chats.create(
        model="gemini-2.5-flash-preview-04-17",
        config=types.GenerateContentConfig(response_modalities=["Text"]),
    )

    for video_data in generated_videos_data:
        # The video is passed by reference to its GCS URI.
        video_part = Part.from_uri(
            file_uri=video_data['uri'],
            mime_type="video/mp4",
        )

        message_content = [f'''The file name is {video_data['name']} the local file is {video_data['tmp_file']} and the GCS URI is {video_data['uri']}. {video_inspect_prompt}''', video_part]

        response = vid_chat.send_message(message_content)
        print('------------------------')
        print(response.text)
        print()
else:
    # Say so explicitly instead of silently doing nothing.
    print("No downloaded videos to analyze; re-run the generation cells first.")

In [None]:
# Ask the chat session that reviewed the clips to name the best one.
response = vid_chat.send_message(
    "Based on your analysis, provide the local file path for the best video to use.\n"
    "Only respond with the local file path"
)
# The reply is free-form model text: it can be empty or padded with whitespace,
# quotes or Markdown backticks. Strip those so the value is usable as a path.
selected_video_3 = (response.text or "").strip().strip("`'\"").strip()
print(selected_video_3)

## Merge the three videos

In [None]:
# --- Configuration ---
vid_clip_1 = selected_video_1.rstrip()
vid_clip_2 = selected_video_2.rstrip()
vid_clip_3 = selected_video_3.rstrip()
output_vid = "merged.mp4"

# Target dimensions for scaling (optional, set to None to disable scaling)
TARGET_WIDTH = 720
TARGET_HEIGHT = 1280

In [None]:
# --- Helper function to process and potentially scale a video clip ---
def process_video_clip(filepath, target_width=None, target_height=None):
    """Build an ffmpeg input stream for a clip, scaling it if its size differs.

    Args:
        filepath: Path of the video file.
        target_width: Desired width, or None to skip scaling.
        target_height: Desired height, or None to skip scaling.

    Returns:
        The ffmpeg stream (with a scale filter applied when the probed
        dimensions differ from the target), or None if the file does not exist.
        Probe failures are reported and the unscaled stream is returned.
    """
    if not os.path.exists(filepath):
        print(f"Error: File not found - {filepath}")
        return None

    clip_name = os.path.basename(filepath)
    clip_stream = ffmpeg.input(filepath)

    # Scaling is only attempted when both target dimensions are given.
    if target_width is None or target_height is None:
        return clip_stream

    try:
        print(f"Probing {clip_name}...")
        probed_streams = ffmpeg.probe(filepath)['streams']
        video_info = next((s for s in probed_streams if s['codec_type'] == 'video'), None)

        if not video_info:
            print(f"Warning: Could not find video stream information for {clip_name}. Skipping scaling.")
        else:
            width = int(video_info.get('width', 0))
            height = int(video_info.get('height', 0))
            print(f"Detected dimensions for {clip_name}: {width}x{height}")

            if (width, height) == (target_width, target_height):
                print(f"No scaling needed for {clip_name}.")
            else:
                print(f"Scaling {clip_name} to {target_width}x{target_height}")
                clip_stream = clip_stream.filter('scale', target_width, target_height)

    except ffmpeg.Error as e:
        print(f"Warning: Error probing {clip_name}. Skipping scaling. Error: {e.stderr.decode()}")
    except Exception as e:
        print(f"Warning: An unexpected error occurred during probing/scaling for {clip_name}. Skipping scaling. Error: {e}")

    return clip_stream

In [None]:
print("Processing video clips...")
clip_paths = [vid_clip_1, vid_clip_2, vid_clip_3]

# Build one (optionally scaled) input stream per clip. process_video_clip
# returns None for a missing file, and those clips are left out.
streams = []
for clip_path in clip_paths:
    clip_stream = process_video_clip(clip_path, TARGET_WIDTH, TARGET_HEIGHT)
    if clip_stream is not None:
        streams.append(clip_stream)

# --- Only merge when every clip could be loaded ---
if len(streams) == len(clip_paths):
    print("Concatenating video streams...")
    # v=1: take the video stream from each input.
    # a=0: discard audio; use a=1 instead to keep the audio from the clips.
    concatenated = ffmpeg.concat(*streams, v=1, a=0)

    # --- Define the output ---
    output = ffmpeg.output(concatenated, output_vid)

    # --- Run ffmpeg ---
    print(f"Running ffmpeg to create {output_vid}...")
    try:
        # capture_stderr=True makes ffmpeg's stderr available on the exception.
        stdout, stderr = ffmpeg.run(output, capture_stdout=True, capture_stderr=True, overwrite_output=True, quiet=False)
        # A successful run may still have written warnings to stderr.
        if stderr:
            print("FFmpeg warnings/stderr (run succeeded):")
            print(stderr.decode())
        print(f"Video successfully created: {output_vid}")

    except ffmpeg.Error as e:
        print("--- Error during ffmpeg execution ---")
        if e.stderr:
            print("FFmpeg stderr output:")
            print(e.stderr.decode())
        else:
            print("Could not capture ffmpeg stderr. Check console output above for errors.")
            print(f"ffmpeg-python exception details: {e}")

        # stdout may also contain clues.
        if e.stdout:
            print("FFmpeg stdout output:")
            print(e.stdout.decode())

    except Exception as e:
        print(f"An unexpected Python error occurred during ffmpeg run: {e}")
        import traceback
        traceback.print_exc()  # Full traceback for unexpected errors

elif streams:
    print(f"Error: Only {len(streams)} of {len(clip_paths)} video clips could be loaded. Cannot concatenate.")
else:
    print("Error: No valid video streams were processed. Check input file paths and formats.")

In [None]:
# Inline HTML5 player for the merged file; left as the last expression so the
# notebook renders it.
HTML(
    '\n'
    '<video width="75%" height="75%" controls style="transform: scale(0.5); transform-origin: top left;">\n'
    f'  <source src="{output_vid}" type="video/mp4">\n'
    '</video>\n'
)