In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Setup
----

### Install Google Gen AI SDK for Python

In [None]:
%pip install --upgrade --quiet google-genai

### Set up ffmpeg

In [None]:
!apt-get update -qq && apt-get install -y ffmpeg -qq

### Authenticate your notebook environment (Colab only)
If you are running this notebook on Google Colab, run the following cell to authenticate your environment.

In [None]:
import sys

# Authenticate the user only when the "google.colab" module is already loaded
# in this kernel, i.e. when the notebook is running on Google Colab.
if "google.colab" in sys.modules:
    from google.colab import auth as colab_auth

    colab_auth.authenticate_user()

### Import libraries

In [None]:
from IPython.display import display, Image, HTML, Markdown, Video
from google import genai
from google.genai import types

import time
import json
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import concurrent.futures

### Set Google Cloud project information and create client

In [None]:
import os

PROJECT_ID = "[your-project-id]"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
# Fall back to the environment when the form field is empty or still holds the
# placeholder. The value is deliberately NOT wrapped in str(): str(None) would
# turn an unset variable into the literal (and invalid) project id "None".
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Region used for Vertex AI requests; defaults to us-central1 when
# GOOGLE_CLOUD_REGION is not set.
LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

# Vertex AI client shared by the image editing and video generation cells.
# NOTE(review): with project=None the SDK is expected to resolve the project
# from Application Default Credentials or raise its own error - confirm.
client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)

### Define a helper function to display media

In [None]:
def show_video(video):
    if isinstance(video, str):
        file_name = video.split("/")[-1]
        !gsutil cp {video} {file_name}
        display(Video(file_name, embed=True, width=600))
    else:
        with open("sample.mp4", "wb") as out_file:
            out_file.write(video)
        display(Video("sample.mp4", embed=True, width=600))

### Set up video generation (Veo) and image editing (Imagen) models

In [None]:
# Veo model name, passed as `model` to client.models.generate_videos when the
# videos are generated later in this notebook.
video_model = "veo-3.0-generate-preview" #@param{type:"string"} ["veo-3.0-generate-001", "veo-3.0-fast-generate-001", "veo-3.0-generate-preview", "veo-3.0-fast-generate-preview"]

# Imagen model name, passed as `model` to client.models.edit_image when the
# image backgrounds are removed later in this notebook.
image_editing_model = "imagen-3.0-capability-001" #@param{type:"string"} ["imagen-3.0-capability-001", "imagen-3.0-generate-002"]

### Set up folders used for image and video generation

In [None]:
# Cloud Storage prefix that holds the source images
images_gcs_path = "gs://genai_public/ice_cream_images" #@param{type:"string"}

# Local folder for the images copied from Cloud Storage
original_images_folder = "original_images" #@param{type:"string"}

# Local folder for the background-removed images
edited_images_folder = "edited_images" #@param{type:"string"}

# Local folder for the generated .mp4 files
videos_folder = "videos" #@param{type:"string"}

# exist_ok=True keeps the cell safe to re-run and avoids the check-then-create
# race of testing os.path.exists() first.
for folder in [original_images_folder, edited_images_folder, videos_folder]:
  os.makedirs(folder, exist_ok=True)

# Copy Image Files Locally and Inspect
----

### Copy image files from Google Cloud Storage and list them

In [None]:
# Define the path to images folder in Google Cloud Storage

!gsutil -m cp -r {images_gcs_path}/* /content/{original_images_folder}

# Use os.listdir to list all files in local folder and filter for image files
image_files = [file for file in os.listdir(original_images_folder)
  if file.endswith(('.jpg', '.jpeg', '.png'))]

# Create a list to store the image URIs and local file paths
image_data = []

# Iterate through the output of gsutil, copy files, and store data
for file in image_files:
  file = file.strip()  # Remove leading/trailing whitespace
  file_name = file.split("/")[-1]
  image_data.append({"file_name": file_name})

# Create a Pandas DataFrame from the list of image data
images = (pd.DataFrame(image_data).sort_values(['file_name']).
  reset_index(drop=True))

images['folder'] = original_images_folder

# Get name of image from file name
images['image_name'] = images['file_name'].apply(lambda x: x.split('.')[0].
  replace('_', ' '))

display(images)

### Show each image with image name above in grid

In [None]:
# Three images per row. Ceiling division avoids the empty trailing row that
# `len(images) // 3 + 1` produces when the count is an exact multiple of 3.
n_cols = 3
n_rows = max(1, -(-len(images) // n_cols))

# Set figure size
plt.figure(figsize=(12, 2 * max(1, len(images))))

# enumerate() supplies the subplot position, so the layout does not depend on
# the DataFrame index being exactly 0..n-1.
for position, (_, row) in enumerate(images.iterrows(), start=1):
  file_path = os.path.join(row['folder'], row['file_name'])
  img = mpimg.imread(file_path)
  plt.subplot(n_rows, n_cols, position)
  plt.imshow(img)
  plt.title(row['image_name'])
  plt.axis('off')

plt.tight_layout()

# save off original images together as PNG file (have to do this before calling
# "show" to avoid getting blank PNG file)
plt.savefig("original_images.png")

plt.show()

# Try to Remove Image Backgrounds Using Imagen Editing
----

### Remove backgrounds for all images in parallel

In [None]:
def remove_image_background(image_file_name):
    """Ask the image editing model to replace an image's background with white.

    Args:
        image_file_name: Path of the image file to load and edit.

    Returns:
        Whatever ``client.models.edit_image`` returns for the request.
    """
    edit_prompt = """Keep ice cream and whatever it's already in in the picture.
      Remove everything else around it and leave a plain white background,
      with no additions.
      """

    # Reference 0: the unmodified source image.
    source_reference = types.RawReferenceImage(
        reference_id=0,
        reference_image=types.Image.from_file(location=image_file_name),
    )

    # Reference 1: a background mask with no mask image supplied.
    # NOTE(review): presumably the service derives the mask itself in
    # MASK_MODE_BACKGROUND - confirm against the SDK docs.
    background_mask = types.MaskReferenceImage(
        reference_id=1,
        reference_image=None,
        config=types.MaskReferenceConfig(mask_mode="MASK_MODE_BACKGROUND"),
    )

    edit_config = types.EditImageConfig(
        edit_mode="EDIT_MODE_INPAINT_INSERTION",
        number_of_images=1,
        safety_filter_level="BLOCK_MEDIUM_AND_ABOVE",
        person_generation="ALLOW_ADULT",
    )

    return client.models.edit_image(
        model=image_editing_model,
        prompt=edit_prompt,
        reference_images=[source_reference, background_mask],
        config=edit_config,
    )

def process_image(row):
  """Remove the background of one image and save the edited result.

  Args:
    row: Mapping with 'folder' and 'file_name' entries for the source image.

  Returns:
    Path of the edited image, written under ``edited_images_folder`` with the
    same file name as the source image.
  """
  source_path = os.path.join(row['folder'], row['file_name'])
  target_path = os.path.join(edited_images_folder, row['file_name'])

  response = remove_image_background(source_path)
  # Save the first generated image of the response.
  response.generated_images[0].image.save(target_path)

  return target_path

# Edit all images concurrently. map() yields results in input order, so the
# saved paths line up with the rows of `images`.
with concurrent.futures.ThreadPoolExecutor() as pool:
  edited_results = [
    *pool.map(process_image, (row for _, row in images.iterrows()))
  ]

images['edited_image_folder_and_file_name'] = edited_results

print(f"Edited images saved to folder: {edited_images_folder}")

### See all resulting edited images together

In [None]:
# Three images per row. Ceiling division avoids the empty trailing row that
# `len(images) // 3 + 1` produces when the count is an exact multiple of 3.
n_cols = 3
n_rows = max(1, -(-len(images) // n_cols))

# Set figure size
plt.figure(figsize=(12, 2 * max(1, len(images))))

# enumerate() supplies the subplot position, so the layout does not depend on
# the DataFrame index being exactly 0..n-1.
for position, (_, row) in enumerate(images.iterrows(), start=1):
  file_path = row['edited_image_folder_and_file_name']
  img = mpimg.imread(file_path)
  plt.subplot(n_rows, n_cols, position)
  plt.imshow(img)
  plt.title(row['image_name'])
  plt.axis('off')

plt.tight_layout()

# save off edited images together as PNG file (have to do this before calling
# "show" to avoid getting blank PNG file)
plt.savefig("edited_images.png")

plt.show()

# Generate Videos for Each Image
----

### Generate videos for all images in parallel

In [None]:
def generate_video_from_image(image_file_name, image_name):
  """Generate a short product video with voiceover from a single image.

  Starts a video generation operation for the image, polls it every 15
  seconds until it reports done, and returns the first generated video.

  Args:
    image_file_name: Path of the image file used as the video's input image.
    image_name: Human-readable name of the ice cream, inserted into the prompt
      and into failure messages.

  Returns:
    The ``video`` attribute of the first generated video, or None when the
    operation finishes without a response or without any generated videos.
  """

  video_prompt = f"""A high-resolution, professional product shot of this
    ice cream named {image_name} on a clean white background. The video
    shows a dynamic 360-degree rotation of the ice cream, with the camera
    smoothly dollying out to reveal all its details. The ice cream does not get
    changed. No other new subjects or people are shown, just focus on the ice
    cream throughout the video.

    A highly energetic male voiceover (not shown) says 1 short sentence about
    what makes this particular ice cream creation so delicious, using specific
    details from the image (but not the text or brand names).
    Do not use the words "indulge", "delightful", or "decadent", just get right
    into what the ice cream creation is when describing it.
    """

  video_gen_operation = client.models.generate_videos(
    model=video_model,
    prompt=video_prompt,
    image=types.Image.from_file(location=image_file_name),
    config=types.GenerateVideosConfig(
      aspect_ratio="16:9",
      number_of_videos=1,
      duration_seconds=8,
      resolution="1080p",
      person_generation="allow_adult",
      enhance_prompt=True,
      generate_audio=True
      )
    )

  # Report progress before each wait, so nothing is printed once the operation
  # has already finished.
  while not video_gen_operation.done:
    print("Video generation in progress")
    time.sleep(15)
    video_gen_operation = client.operations.get(video_gen_operation)

  if not video_gen_operation.response:
    print(f"Video generation failed for {image_name}")
    # Surface error details when the operation object carries any.
    error = getattr(video_gen_operation, "error", None)
    if error:
      print(f"  Error: {error}")
    return None

  # A finished operation can still carry no videos; guard the [0] lookup so
  # one such image does not abort the whole parallel run.
  generated_videos = video_gen_operation.result.generated_videos
  if not generated_videos:
    print(f"Video generation returned no videos for {image_name}")
    return None

  print("Video generation complete")
  return generated_videos[0].video

def generate_video_for_row(row):
  """Generate the video for one row of the images table.

  Args:
    row: Mapping with 'edited_image_folder_and_file_name' and 'image_name'
      entries.

  Returns:
    Whatever ``generate_video_from_image`` returns for that image.
  """
  # To animate the original images instead of the edited ones, use the
  # commented-out path below in place of the active one.
  # source_image_path = f"{row['folder']}/{row['file_name']}"
  source_image_path = row['edited_image_folder_and_file_name']

  return generate_video_from_image(source_image_path, row['image_name'])

# Generate one video per image concurrently. map() yields results in input
# order, so the videos line up with the rows of `images`.
with concurrent.futures.ThreadPoolExecutor() as pool:
  videos = [*pool.map(generate_video_for_row, images.to_dict('records'))]

images['video'] = videos

display(images)

# Putting Things Together and Outputting
---

### Output all videos as .mp4 files to local folder

In [None]:
# Limit down to images where actual video was generated
images_with_video = images[images['video'].notna()].reset_index(drop=True)

# loop over videos in images df and output as .mp4 files to videos folder
for _, row in images_with_video.iterrows():
  # Swap whatever image extension the file has (.jpg, .jpeg or .png) for .mp4.
  # A plain .replace(".jpg", ".mp4") leaves .jpeg/.png names unchanged, so
  # those videos would be written under an image extension and then skipped
  # when the .mp4 files are collected.
  base_name = os.path.splitext(row['file_name'])[0]
  video_file_path = os.path.join(videos_folder, f"{base_name}.mp4")
  with open(video_file_path, "wb") as out_file:
    out_file.write(row['video'].video_bytes)

print(f"Videos saved to folder: {videos_folder}")

### Combine all video files into 1 longer video

In [None]:
# Create a file containing the list of video files
video_list_path = os.path.join(videos_folder, "video_list.txt")

video_files = [file for file in os.listdir(videos_folder)
  if os.path.isfile(os.path.join(videos_folder, file)) and file.endswith('.mp4')
  ]

video_files.sort()

with open(video_list_path, "w") as f:
  for video_file in video_files:
    f.write(f"file '{video_file}'\n")

# Concatenate the videos using ffmpeg
!ffmpeg -y -f concat -safe 0 -i {video_list_path} -c copy combined_video.mp4

### Download combined video

In [None]:
# google.colab is only importable on Colab. Elsewhere, skip the browser
# download instead of failing the cell; the file is still on disk.
try:
  from google.colab import files
except ImportError:
  print("Not running on Colab; combined_video.mp4 was left in the working directory.")
else:
  files.download('combined_video.mp4')

### Display the combined video

In [None]:
# show_video("combined_video.mp4")