# **LAB_6 Veo2 for text-to-video**


Are your users receiving your messages but still not returning to your game? Sometimes, a static message just isn't enough to capture their attention and draw them back into the action.

Veo 2 on Vertex AI brings Google's state of the art video generation capabilities to application developers. It's capable of creating videos with astonishing detail that simulate real-world physics across a wide range of visual styles.

In this lab we will explore how to leverage Veo 2 for video generation, powered by the creative writing process of Gemini to craft effective prompts. 

## Initialize

In [None]:
import sys
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import random
import time
import datetime
import base64
import random
import datetime

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

## Setting parameters 

**!! IMPORTANT**: Set your location and dataset name and verify the output

In [None]:
bigquery_location = "us"
region = "us"
location = "us-central1"
team_name    = ""
dataset_name = "datathon_ds_{}".format(team_name)

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

# Get some values using gcloud
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")
print(f"location = {location}")
print(f"dataset_name = {dataset_name}")

## Helper methods

### RestAPIHelper: calls the Google Cloud REST API using the current users credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:
  """Calls the Google Cloud REST API passing in the current users credentials"""

  import requests
  import google.auth
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
    #image_data = json.loads(response.content)["predictions"][0]["bytesBase64Encoded"]
  else:
    error = f"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

### RetryCondition (for retrying LLM calls)

In [None]:
def RetryCondition(error):
  error_string = str(error)
  print(error_string)

  retry_errors = [
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
      # Add more error messages here as needed
  ]

  for retry_error in retry_errors:
    if retry_error in error_string:
      print("Retrying...")
      return True

  return False

### Gemini LLM (2.0-flash)

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM(prompt, model = "gemini-2.0-flash", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models
  # model = "gemini-2.0-flash"

  llm_response = None
  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add inthe response schema for when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  if model == "gemini-2.0-flash":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": {
          "text": prompt
      },
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    try:
      json_response = json.loads(response.content)
    except Exception as error:
      raise RuntimeError(f"An error occurred parsing the JSON: {error}")

    if "candidates" in json_response:
      candidates = json_response["candidates"]
      if len(candidates) > 0:
        candidate = candidates[0]
        if "content" in candidate:
          content = candidate["content"]
          if "parts" in content:
            parts = content["parts"]
            if len(parts):
              part = parts[0]
              if "text" in part:
                text = part["text"]
                llm_response = text
              else:
                raise RuntimeError("No text in part: {response.content}")
            else:
              raise RuntimeError("No parts in content: {response.content}")
          else:
            raise RuntimeError("No parts in content: {response.content}")
        else:
          raise RuntimeError("No content in candidate: {response.content}")
      else:
        raise RuntimeError("No candidates: {response.content}")
    else:
      raise RuntimeError("No candidates: {response.content}")

    # Remove some typically response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")
    llm_response = llm_response.replace("\n","")

    return llm_response

  else:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

### Get users

In [None]:
def GetUsers():
  sql = f"""SELECT user_pseudo_id, cnt_post_score
  FROM `{dataset_name}.cc_full_dataset_for_genai`
  WHERE churned=1
  ORDER BY user_pseudo_id"""

  result_df = RunQuery(sql)
  result_list = []

  for index, row in result_df.iterrows():
    result_list.append({
        "user_pseudo_id": row['user_pseudo_id'],
        "cnt_post_score": row['cnt_post_score']
    })

  return result_list

In [None]:
def RunQuery(sql):
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result == None:
      return True
    else:
      raise Exception(query_job.error_result)

In [None]:
# This was generated by GenAI

def copy_file_to_gcs(local_file_path, bucket_name, destination_blob_name):
  """Copies a file from a local drive to a GCS bucket.

  Args:
      local_file_path: The full path to the local file.
      bucket_name: The name of the GCS bucket to upload to.
      destination_blob_name: The desired name of the uploaded file in the bucket.

  Returns:
      None
  """

  import os
  from google.cloud import storage

  # Ensure the file exists locally
  if not os.path.exists(local_file_path):
      raise FileNotFoundError(f"Local file '{local_file_path}' not found.")

  # Create a storage client
  storage_client = storage.Client()

  # Get a reference to the bucket
  bucket = storage_client.bucket(bucket_name)

  # Create a blob object with the desired destination path
  blob = bucket.blob(destination_blob_name)

  # Upload the file from the local filesystem
  content_type = ""
  if local_file_path.endswith(".html"):
    content_type = "text/html; charset=utf-8"

  if local_file_path.endswith(".json"):
    content_type = "application/json; charset=utf-8"

  if content_type == "":
    blob.upload_from_filename(local_file_path)
  else:
    blob.upload_from_filename(local_file_path, content_type = content_type)

  print(f"File '{local_file_path}' uploaded to GCS bucket '{bucket_name}' as '{destination_blob_name}.  Content-Type: {content_type}'.") 

In [None]:
# prompt: python to delete a file even if it does not exist

def delete_file(filename):
  try:
    os.remove(filename)
    print(f"File '{filename}' deleted successfully.")
  except FileNotFoundError:
    print(f"File '{filename}' not found.")

In [None]:
def PrettyPrintJson(json_string):
  json_object = json.loads(json_string)
  json_formatted_str = json.dumps(json_object, indent=2)
  # print(json_formatted_str)
  return json_formatted_str

## Teach the LLM how to write Veo 2 prompts

In [None]:
# We need to tell the LLM how to write text-to-video prompts

text_to_video_prompt_guide = """
Text-to-Video Prompt Writing Help:

Here are some our best practices for text-to-video prompts:

Detailed prompts = better videos:
  - More details you add, the more control you’ll have over the video.
  - A prompt should look like this: "Camera dollies to show a close up of a desperate man in a green trench coat is making a call on a rotary style wall-phone, green neon light, movie scene."
    - Here is a break down of elements need to create a text-to-video prompt using the above prompt as an example:
      - "Camera dollies to show" = "Camera Motion"
      - "A close up of" = "Composition"
      - "A desperate man in a green trench coat" = "Subject"
      - "Is making a call" = "Action"
      - "On a roary style wall-phone" = "Scene"
      - "Green Neon light" = "Ambiance"
      - "Movie Scene" = "Style"

Use the right keywords for better control:
  - Here is a list of some keywords that work well with text-to-video, use these in your prompts to get the desired camera motion or style.
  - Subject: Who or what is the main focus of the shot.  Example: "happy woman in her 30s".
  - Scene: Where is the location of the shot. Example "on a busy street, in space".
  - Action: What is the subject doing Examples: "walking", "running", "turning head".
  - Camera Motion: What the camera is doing. Example: "POV shot", "Aerial View", "Tracking Drone view", "Tracking Shot".

Example text-to-video prompt using the above keywords:
  - Example text-to-video prompt: "Tracking drone view of a man driving a red convertible car in Palm Springs, 1970s, warm sunlight, long shadows"
  - Example text-to-video prompt: "A POV shot from a vintage car driving in the rain, Canada at night, cinematic"

Styles:
   - Overall aesthetic. Consider using specific film style keywords.  Examples: "horror film", "film noir, "animated styles", "3D cartoon style render".
  - Example text-to-video prompt: "Over the shoulder of a young woman in a car, 1970s, film grain, horror film, cinematic he Film noir style, man and woman walk on the street, mystery, cinematic, black and white"
  - Example text-to-video prompt: "A cute creatures with snow leopard-like fur is walking in winter forest, 3D cartoon style render. An architectural rendering of a white concrete apartment building with flowing organic shapes, seamlessly blending with lush greenery and futuristic elements."

Composition:
  - How the shot is framed. This is often relative to the subject e.g. wide shot, close-up, low angle
  - Example text-to-video prompt: "Extreme close-up of a an eye with city reflected in it. A wide shot of surfer walking on a beach with a surfboard, beautiful sunset, cinematic"

Ambiance & Emotions:
  - How the color and light contribute to the scene (blue tones, night)
  - Example text-to-video prompt: "A close-up of a girl holding adorable golden retriever puppy in the park, sunlight Cinematic close-up shot of a sad woman riding a bus in the rain, cool blue tones, sad mood"

Cinematic effects:
  - e.g. double exposure, projected, glitch camera effect.
  - Example text-to-video prompt: "A double exposure of silhouetted profile of a woman walking and lake, walking in a forest Close-up shot of a model with blue light with geometric shapes projected on her face"
  - Example text-to-video prompt: "Silhouette of a man walking in collage of cityscapes Glitch camera effect, close up of woman’s face speaking, neon colors"

"""

## Generate Veo 2 Prompts using Gemini (let Gemini write the prompts for us)

In [None]:
text_to_video_prompts = []

In [None]:
response_schema = {
  "type": "object",
  "required": [
    "text-to-video-prompt"
  ],
  "properties": {
    "text-to-video-prompt": {
      "type": "string"
    }
  }
}

user_list = GetUsers()
user_pseudo_id = random.randint(1, len(user_list))
cnt_post_score = user_list[user_pseudo_id-1]["cnt_post_score"]

gemini_ad_prompt = f"""You are a marketing expert at gaming company and are creating cacthy video for churning gamers. 
Your user didn't engage with the game for 24h and want to encourage them to play more, and you want to motivate them to come back to play more by letting them know about their current score.
The video should be funny message that will remind the user about the game they played.

The video should be about the following user score:
{user_pseudo_id}: {cnt_post_score}

Output Fields:
- "text-to-video-prompt":
 - Read the  "Text-to-Video Prompt Writing Help" to learn more about how to create good text-to-video prompts.
 - Make sure you include all the relevant best practices when creating the text-to-video prompt:
 - A detailed prompt for generating the video using text-to-video technology.
 - Include "text overlays" in the text-to-video prompt with a uplifting message saying that it seems like he forgot about the game.
 - Do not include children in the text-to-video prompt.

{text_to_video_prompt_guide}
"""

#print(gemini_ad_prompt)
llm_result = GeminiLLM(gemini_ad_prompt, response_schema=response_schema)
gemini_ad_results_dict = json.loads(llm_result)
orginal_ad_results_dict = gemini_ad_results_dict # in case we swap it for a pre-canned one
print(f"User Name: {user_pseudo_id} - {cnt_post_score}")
print()
print(PrettyPrintJson(json.dumps(gemini_ad_results_dict)))
text_to_video_prompts.append(gemini_ad_results_dict)

## Video Generation using function generateVideo

In [None]:
def generateVideo(prompt, storage_account, output_gcs_path):
  """Calls text-to-video to create the video and waits for the output (which can be several minutes).  Saves the prompt/parameters with the vidoe.  Returns the outputted path."""

  full_output_gcs_path = f"gs://{storage_account}/generated_videos"
  model = "veo-2.0-generate-001"
  url = f"https://{location}-aiplatform.googleapis.com/v1beta1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:predictLongRunning"

  request_body = {
      "instances": [
          {
              "prompt": prompt
          }
        ],
      "parameters": {
          "storageUri": full_output_gcs_path,
          "aspectRatio":"16:9"
          }
      }

  rest_api_parameters = request_body.copy()

  print(f"url: {url}")
  print(f"request_body: {request_body}")
  json_result = restAPIHelper(url, "POST", request_body)
  print(f"json_result: {json_result}")
  operation_name = json_result["name"] # odd this is name

  url = f"https://{location}-aiplatform.googleapis.com/v1beta1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:fetchPredictOperation"

  request_body = {
      "operationName": operation_name
      }

  status = False

  while status == False:
    time.sleep(10)
    print(f"url: {url}")
    print(f"request_body: {request_body}")
    json_result = restAPIHelper(url, "POST", request_body)
    print(f"json_result: {json_result}")
    if "done" in json_result:
      status = bool(json_result["done"]) # in the future might be a status of running
    else:
      print("Status 'done' JSON attribute not present.  Assuming not done...")

  # Get the filename of our video
  filename = json_result["response"]["videos"][0]["gcsUri"]

  # Save our prompt (this was we know what we used to generate the video)
  json_filename = "text-to-video-prompt.json"
  with open(json_filename, "w") as f:
    f.write(json.dumps(rest_api_parameters))

  # get the random number directory from text-to-video
  text_to_video_output_directory = filename.replace(full_output_gcs_path,"")
  text_to_video_output_directory = text_to_video_output_directory.split("/")[1]
  text_to_video_output_directory

  # Write the prompt to the same path as our outputted video.  Saving the prompt allow us to know how to regenerate it (you should also save the seed and any other settings)
  copy_file_to_gcs(json_filename, storage_account, f"{output_gcs_path}/{text_to_video_output_directory}/{json_filename}")
  delete_file(json_filename)

  return filename

### Create google cloud storage bucket to store your generated prompt and videos.

In [None]:
!gsutil mb -p $project_id -l US-CENTRAL1 gs://<YOUR_BUCKET_NAME>

## Call Veo2 API with Gemini generated prompt

!! IMPORTANT: Copy the above generated prompt, and your bucket name.

In [None]:
video_file_uri = generateVideo(
    prompt="<YOUR_PROMPT>",
    storage_account="<YOUR_BUCKET_NAME>",
    output_gcs_path=f"gs://{storage_account}/generated_videos"
)

if video_file_uri:
    print(f"Video generation started, final URI will be: {video_file_uri}")
else:
    print("Video generation call failed or returned no URI.")