## <font color='#4285f4'>Overview</font>

This notebook demonstrates an AI-powered method of evaluating campaign videos based on the ABCDs framework built and maintained by the Google Creative Works team. It leverages a [GitHub project](https://github.com/google-marketing-solutions/abcds-detector) built by Ana Esqueda to score the auto-generated brand videos created by Veo and suggest areas for improvement.

Process Flow: 
1. ABCD Criteria & Assessment Functions:
    - Define functions to evaluate each of the 23 ABCD features, using both Video Intelligence API annotations (e.g., shot detection, text detection) and Gemini LLM's understanding of video content.
    - The LLM assesses video features through prompts tailored for each criterion.
    - The functions calculate scores and provide detailed explanations for the assessments.
2. Execute Assessment:
    - Generates video annotations using the Video Intelligence API for all brand videos in the GCS bucket.
    - Trims the videos to create 5-second versions for certain assessments.
    - Executes the ABCD assessment for each video, combining API annotations and LLM evaluations.
    - Parses the assessment results and prints a summary for each video, including score, overall result, and evaluation of each feature.
3. Save Results:
    - Saves the parsed results (brand name, video name, score, result text, feature details, feature_timestamps etc.) to the BigQuery table (campaign_abcd_results) for long-term analysis.


Author: Ana Esqueda (with small integration edits by Paul Ramsey)

## <font color='#4285f4'>License</font>





```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

## <font color='#4285f4'>Prerequisites</font>

### Initialize

In [None]:
!pip install --upgrade google-cloud-videointelligence \
  google-auth==2.23.0 \
  google-cloud-aiplatform \
  google-cloud-storage \
  moviepy \
  google-api-python-client

# Install gcsfuse
# From https://cloud.google.com/storage/docs/gcsfuse-quickstart-mount-bucket
!echo "deb https://packages.cloud.google.com/apt gcsfuse-`lsb_release -c -s` main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
!curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
!sudo apt-get -q update
!sudo apt-get -q install fuse gcsfuse
!gcsfuse -v

In [None]:
# Restart session - You will get an error saying "Your session crashed for an unknown reason"
# This is expected, and the runtime will automatically reconnect. 

import os
os.kill(os.getpid(), 9)

In [None]:
import json
import random
import time
import datetime
import base64
import vertexai
import os
import urllib
from google.cloud import storage
from google.cloud import videointelligence
from moviepy.editor import VideoFileClip
import vertexai.preview.generative_models as generative_models
from vertexai.preview.generative_models import GenerativeModel, Part
from googleapiclient.errors import HttpError
from IPython.display import HTML
from IPython.display import Video
from base64 import b64encode
from IPython.display import YouTubeVideo
from google.cloud import videointelligence_v1 as videointelligence2
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception
import logging
from IPython.display import Audio
import markdown
import re


from google.cloud import bigquery
client = bigquery.Client()

# Set these (run this cell to verify the output)
bigquery_location = "${bigquery_location}"
region = "${region}"

# Get some values using gcloud
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")


if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

bucket_name = "${chocoate_ai_bucket}"
project_number = !(gcloud projects describe $project_id --format="value(projectNumber)")
project_number = project_number[0]

print(f"project_id = {project_id}")
print(f"project_number = {project_number}")
print(f"user = {user}")
print(f"bucket_name = {bucket_name}")

### Create Table

Create the `campaign_abcd_results` table to store results of the ABCDs assessments for long-term analysis.

In [None]:
%%bigquery

CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocoate_ai_dataset}.campaign_abcd_results`
(
    assessment_id             STRING    DEFAULT   GENERATE_UUID()     OPTIONS(description="Unique identifier for the assessment."),
    assessment_date           TIMESTAMP DEFAULT   CURRENT_TIMESTAMP() OPTIONS(description="The date and time the assessment was run."),
    brand_name                STRING    NOT NULL  OPTIONS(description="The name of the brand."),
    video_name                STRING    NOT NULL  OPTIONS(description="The name of the video."),
    video_url                 STRING    OPTIONS(description="URL of the video being assessed."),
    score                     FLOAT64   OPTIONS(description="Overall score of the assessment."),
    result_text               STRING    OPTIONS(description="Summary text of the assessment result."),
    passed_features_count     FLOAT64   OPTIONS(description="Number of features that passed the assessment."),
    total_features_count      FLOAT64   OPTIONS(description="Total number of features assessed."),
    features_detail           JSON      OPTIONS(description="Detailed information about each feature and its assessment result."),
    feature_timestamps        JSON      OPTIONS(description="Timestamps for each feature and its assessment result."),
) CLUSTER BY assessment_id;
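Here is a minimal sketch of turning one parsed assessment into a row for `campaign_abcd_results`. It makes these choices:

- It assumes the parsed result uses keys named after the columns. The notebook's own parsing and insert code is not part of this section.
- It leaves out `assessment_id` and `assessment_date`, so their `DEFAULT` expressions can fill them.
- It enforces the two `NOT NULL` columns.
- It serializes the JSON columns with `json.dumps`.

`build_abcd_result_row` and the sample values are hypothetical.

```python
import json
from typing import Any

# Columns the caller supplies; assessment_id and assessment_date are omitted on
# purpose so their DEFAULT expressions (GENERATE_UUID, CURRENT_TIMESTAMP) apply.
ABCD_RESULT_COLUMNS = (
    "brand_name", "video_name", "video_url", "score", "result_text",
    "passed_features_count", "total_features_count",
    "features_detail", "feature_timestamps",
)
JSON_COLUMNS = {"features_detail", "feature_timestamps"}
FLOAT_COLUMNS = {"score", "passed_features_count", "total_features_count"}


def build_abcd_result_row(parsed: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: map a parsed assessment dict onto the table's columns."""
    for required in ("brand_name", "video_name"):  # NOT NULL columns
        if not parsed.get(required):
            raise ValueError(f"missing required field: {required}")
    row: dict[str, Any] = {}
    for column in ABCD_RESULT_COLUMNS:
        value = parsed.get(column)
        if value is None:
            continue  # leave nullable columns unset
        if column in JSON_COLUMNS:
            value = json.dumps(value)  # JSON columns are passed as serialized text
        elif column in FLOAT_COLUMNS:
            value = float(value)  # the schema stores scores and counts as FLOAT64
        row[column] = value
    return row


example = build_abcd_result_row({
    "brand_name": "chocolate_ai",
    "video_name": "example_video",
    "score": 75,
    "passed_features_count": 3,
    "total_features_count": 4,
    "features_detail": [{"feature": "Example feature", "feature_detected": True}],
})
print(example)
```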

## <font color='#4285f4'>Setup ABCDs Environment</font>

NOTE: This notebook is based on the work of Ana Esqueda in the [google-marketing-solutions/abcds-detector](https://github.com/google-marketing-solutions/abcds-detector) repo. Please refer to the source repo for updates and enhancements to this product.

### Define environment variables

In [None]:
"""Module that defines the colab parameters"""
# @markdown ### Knowledge Graph Key
# @markdown Generate an API Key to connect to the Knowledge Graph API to find entities such as brands, products, etc., to match with video annotation results.

# @markdown To generate an API key, please follow the steps [here](https://support.google.com/googleapi/answer/6158862?hl=en), then enter the key in the box below.


KNOWLEDGE_GRAPH_API_KEY = ""  # @param {type:"string"}

# @markdown ### Brand and Product Details
# @markdown Providing video hints helps the AI model perform a better evaluation.

brand_name = "chocolate_ai"  # @param {type:"string"}
brand_variations_str = "chocolateai"  # @param {type:"string"}
branded_products_str = "Chocolate Tasting Flight, Chocolate Decadence, Molten Caramel Surprise"  # @param {type:"string"}
branded_products_categories_str = "chocolate, cake, coffee"  # @param {type:"string"}
branded_call_to_actions_str = "Indulge in the artistry of Chocolate AI"  # @param {type:"string"}

# @markdown ### Solution Setup
# @markdown Advanced options that allow only parts of the solution to run.

VIDEO_SIZE_LIMIT_MB = 40  # @param {type:"number"}
VERBOSE = True  # @param {type:"boolean"}
use_llms = True  # @param {type:"boolean"}
use_annotations = True # @param {type:"boolean"}
# For local testing outside colab ONLY, set to False for colab
STORE_ASSESSMENT_RESULTS_LOCALLY = True # @param {type:"boolean"}
TEST_RESULTS = []

# @markdown ### ABCD Framework Details
# @markdown Video analysis parameters to generate text, runs first.

early_time_seconds = 5
confidence_threshold = 0.5  # @param {type:"number"}
face_surface_threshold = 0.15  # @param {type:"number"}
logo_size_threshold = 3.5  # @param {type:"number"}
avg_shot_duration_seconds = 2  # @param {type:"number"}
dynamic_cutoff_ms = 3000  # @param {type:"number"}


# @markdown ### LLM Configuration
# @markdown Tune the text analysis model, runs second.

GEMINI_PRO = "gemini-1.5-pro-001"  # @param {type:"string"}
llm_location = "${location}"  # @param {type:"string"}
max_output_tokens = 8192  # @param {type:"number"}
temperature = 1  # @param {type:"number"}
top_p = 0.95  # @param {type:"number"}
top_k = 32  # @param {type:"number"}

### Mount Bucket and Transfer Video Files

In [None]:
# Copy the sample ad videos into the bucket
!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-01/story-01/full-video-with-audio-en-GB.mp4 gs://"$bucket_name/chocolate_ai/videos/chocolate-ai_story-HITL-01_full-video-with-audio-en-GB.mp4"
!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-01/story-02/full-video-with-audio-en-GB.mp4 gs://"$bucket_name/chocolate_ai/videos/chocolate-ai_story-HITL-02_full-video-with-audio-en-GB.mp4"
!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-01/story-03/full-video-with-audio-en-GB.mp4 gs://"$bucket_name/chocolate_ai/videos/chocolate-ai_story-HITL-03_full-video-with-audio-en-GB.mp4"
!gsutil -m cp gs://data-analytics-golden-demo/chocolate-ai/v1/Campaign-Assets-Text-to-Video-02/story-01/full-video-with-audio-en-GB.mp4 gs://"$bucket_name/chocolate_ai/videos/chocolate-ai_story-No-HITL-03_full-video-with-audio-en-GB.mp4"

# Mount the Google Cloud Storage Bucket
%env bucket_name={bucket_name}

!mkdir -p /content/$bucket_name
!gcsfuse --implicit-dirs $bucket_name /content/$bucket_name
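Once mounted, the bucket behaves like a local directory under `/content/<bucket_name>`, so standard file APIs work on it. `VIDEO_SIZE_LIMIT_MB` is set in the parameters cell, but this section does not show where it is enforced. Assuming it is meant to skip oversized videos, a stdlib sketch could look like the one below. The following points are assumptions:

- `list_videos_under_limit` is a hypothetical helper.
- It treats 1 MB as 1024 * 1024 bytes.
- It skips `1st_5_secs` trims the same way `generate_video_annotations` does.

```python
import os


def list_videos_under_limit(root: str, limit_mb: float) -> tuple[list[str], list[str]]:
    """Hypothetical helper: split .mp4 files under `root` into (kept, skipped) by size.

    Assumes 1 MB = 1024 * 1024 bytes; trimmed "1st_5_secs" clips are ignored.
    """
    kept, skipped = [], []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in sorted(filenames):
            if not name.lower().endswith(".mp4") or "1st_5_secs" in name:
                continue  # only full-length source videos
            path = os.path.join(dirpath, name)
            size_mb = os.path.getsize(path) / (1024 * 1024)
            (kept if size_mb <= limit_mb else skipped).append(path)
    return kept, skipped


# Example against the mount created above (folder layout taken from the gsutil copies):
# kept, skipped = list_videos_under_limit(f"/content/{bucket_name}/chocolate_ai/videos", VIDEO_SIZE_LIMIT_MB)
```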

### Load Helper Methods

#### restAPIHelper()

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: dict = None) -> dict:
  """Calls a Google Cloud REST API using the current user's credentials.

  Returns the parsed JSON response body; raises RuntimeError for an unknown
  HTTP verb or any non-200 status code.
  """

  import requests
  import google.auth
  import google.auth.transport.requests  # Request() lives in this submodule
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token = creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
  else:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### RunQuery(sql)

In [None]:
def RunQuery(sql, job_config = None):
  """Runs a BigQuery statement with the notebook's `client`.

  Queries starting with SELECT or WITH return a pandas DataFrame. Any other
  statement (DDL/DML) is polled every 2 seconds until it finishes; the function
  returns True if it succeeded and False otherwise.
  """
  import time

  # Normalize so leading whitespace or lower-case keywords still count as queries
  if sql.lstrip().upper().startswith(("SELECT", "WITH")):
    df_result = client.query(sql).to_dataframe()
    return df_result
  else:
    if job_config is None:
      job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    return query_job.error_result is None

#### RetryCondition(error)

In [None]:
def RetryCondition(error):
  error_string = str(error)
  print(error_string)

  retry_errors = [
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
      "429 Unable to submit request because the service is temporarily out of capacity",
      # Add more error messages here as needed
  ]

  for retry_error in retry_errors:
    if retry_error in error_string:
      print("Retrying...")
      return True

  return False
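The `@retry` decorators in the cells below pair `RetryCondition` with `wait_exponential(multiplier=1, min=1, max=60)` and `stop_after_attempt(10)`. A call that keeps raising a retryable error is therefore attempted at most 10 times, with 9 waits in between.

The stdlib sketch below reproduces that wait schedule. It assumes tenacity's exponential formula, `multiplier * 2 ** (attempt_number - 1)` clamped to `[min, max]`. It only illustrates the timing; it is not tenacity itself.

Once the attempts are exhausted, tenacity raises `RetryError` rather than the original exception, unless `reraise=True` is passed.

```python
def exponential_backoff_schedule(multiplier: float = 1, min_wait: float = 1,
                                 max_wait: float = 60, attempts: int = 10) -> list[float]:
    """Waits between attempts for wait_exponential(...) combined with stop_after_attempt(attempts).

    Assumes tenacity's formula: multiplier * 2 ** (attempt_number - 1), clamped
    to [min_wait, max_wait]. Waits only happen between attempts, so there are
    attempts - 1 of them.
    """
    waits = []
    for attempt_number in range(1, attempts):
        raw = multiplier * 2 ** (attempt_number - 1)
        waits.append(max(min_wait, min(raw, max_wait)))
    return waits


schedule = exponential_backoff_schedule()
print(schedule)       # [1, 2, 4, 8, 16, 32, 60, 60, 60]
print(sum(schedule))  # 243
```

In the worst case, then, a decorated call sleeps about four minutes in total. This excludes the time spent in each attempt, which for the `detect_*` functions can be up to the 800-second `operation.result` timeout.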

#### ExtractTimestampsFromText()

In [None]:
def ExtractTimestampsFromText(parsed_abcd_result):
  """Collects the m:ss timestamps mentioned in each feature's LLM explanations.

  Returns {'video': <video name>, 'feature_timestamps': [...]} with one entry per
  feature holding its detection flag, the matched timestamps and the explanations.
  """
  timestamp_array = []
  timestamp_pattern = r"\b\d+:\d{2}(?:(?:\s*(?:and|[-,])\s*)?\d+:\d{2})*\b"
  timestamp_video = parsed_abcd_result['video_name']
  for detail in parsed_abcd_result['features_detail']:
    timestamp_feature = detail['feature']
    timestamp_feature_detected = detail['feature_detected']

    # Initialize empty lists for the current feature's timestamps and explanations
    all_timestamps = []
    all_explanations = []

    for llm_detail in detail['llm_details']:
      try:
        timestamps = re.findall(timestamp_pattern, llm_detail['llm_explanation'])
        # Extend the list with timestamps found in this llm_detail
        all_timestamps.extend(timestamps)
        all_explanations.append(llm_detail['llm_explanation'])
      except (KeyError, TypeError):
        # Skip LLM details that have no usable text explanation
        pass

    # Create a single entry for the feature with all timestamps
    timestamp_array.append({
        'feature': timestamp_feature,
        'feature_detected': timestamp_feature_detected,
        'timestamps': all_timestamps,
        'explanation': all_explanations  # Store all explanations
    })

  return {'video': timestamp_video, 'feature_timestamps': timestamp_array}
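To see what the pattern in `ExtractTimestampsFromText` actually captures, you can run it on a few sample explanations. The samples below are written for illustration and are not real model output.

Because the separator group is non-capturing, `re.findall` returns whole matches. This has two consequences:

- A range such as `0:05-0:07` or `0:01 and 0:03` comes back as a single string.
- An `h:mm:ss` value like `0:00:05` is cut down to its first `m:ss` part.

```python
import re

# Same pattern as in ExtractTimestampsFromText.
TIMESTAMP_PATTERN = r"\b\d+:\d{2}(?:(?:\s*(?:and|[-,])\s*)?\d+:\d{2})*\b"

# Hypothetical LLM explanations, for illustration only.
samples = [
    "The logo appears at 0:02 and again from 0:05-0:07.",
    "A person speaks between 0:01 and 0:03.",
    "A title card is shown at 0:00:05.",
]

for text in samples:
    print(re.findall(TIMESTAMP_PATTERN, text))
# ['0:02', '0:05-0:07']
# ['0:01 and 0:03']
# ['0:00']
```

Downstream code that needs individual start and end times would still have to split these strings on `-`, `,` or `and`.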

#### Video Annotations

Generate video annotations using the Video Intelligence API.

Note: No output is expected from this section.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def generate_video_annotations(brand_name: str):
    """Generates video annotations for videos in Google Cloud Storage
    Args:
        brand_name: the brand to generate the video annotations for
    """
    # Get videos from GCS
    bucket = get_bucket()
    brand_videos_folder = f"{brand_name}/videos"
    blobs = bucket.list_blobs(prefix=brand_videos_folder)
    # Video processing
    for video in blobs:
        if video.name == f"{brand_videos_folder}/" or "1st_5_secs" in video.name:
            # Skip parent folder and trimmed versions of videos
            continue
        video_name, video_name_with_format = get_file_name_from_gcs_url(video.name)
        video_location = f"gs://{bucket_name}/{video.name}"
        video_annotations = get_existing_annotations_from_gcs(brand_name)
        # Generate video annotations
        generate_annotations_for_video(
            brand_name,
            video_name,
            video_name_with_format,
            video_location,
            video_annotations,
        )


def generate_annotations_for_video(
    brand_name: str,
    video_name: str,
    video_name_with_format: str,
    video_location: str,
    existing_video_annotations: list[str],
):
    """Generates video annotations only if the video hasn't been processed
    Args:
        brand_name: the brand to generate the video annotations for
        video_name: the name of the video to generate the annotations for
        video_name_with_format: video name and format
        video_location: gs:// URI of the video to annotate
        existing_video_annotations: a list of existing annotation URIs, used to
        avoid generating them again for the same video
    """

    # Label Detection
    label_detection_output = (
        f"gs://{bucket_name}/{brand_name}/annotations/{video_name}/label-detection.json"
    )
    if label_detection_output not in existing_video_annotations:
        detect_labels(video_location, label_detection_output)
    else:
        print(
            f"Label annotations for video {video_name_with_format} already exist, API request skipped.\n"
        )

    # Face Detection
    face_detection_output = (
        f"gs://{bucket_name}/{brand_name}/annotations/{video_name}/face-detection.json"
    )
    if face_detection_output not in existing_video_annotations:
        detect_faces(video_location, face_detection_output)
    else:
        print(
            f"Face annotations for video {video_name_with_format} already exist, API request skipped.\n"
        )

    # People Detection
    people_detection_output = f"gs://{bucket_name}/{brand_name}/annotations/{video_name}/people-detection.json"
    if people_detection_output not in existing_video_annotations:
        detect_people(video_location, people_detection_output)
    else:
        print(
            f"People annotations for video {video_name_with_format} already exist, API request skipped.\n"
        )

    # Shot Detection
    shot_detection_output = (
        f"gs://{bucket_name}/{brand_name}/annotations/{video_name}/shot-detection.json"
    )
    if shot_detection_output not in existing_video_annotations:
        detect_shots(video_location, shot_detection_output)
    else:
        print(
            f"Shot annotations for video {video_name_with_format} already exist, API request skipped.\n"
        )

    # Text Detection
    text_detection_output = (
        f"gs://{bucket_name}/{brand_name}/annotations/{video_name}/text-detection.json"
    )
    if text_detection_output not in existing_video_annotations:
        detect_text(video_location, text_detection_output)
    else:
        print(
            f"Text annotations for video {video_name_with_format} already exist, API request skipped.\n"
        )

    # Logo Detection
    logo_detection_output = (
        f"gs://{bucket_name}/{brand_name}/annotations/{video_name}/logo-detection.json"
    )
    if logo_detection_output not in existing_video_annotations:
        detect_logos(video_location, logo_detection_output)
    else:
        print(
            f"Logo annotations for video {video_name_with_format} already exist, API request skipped.\n"
        )

    # Speech Detection
    speech_detection_output = f"gs://{bucket_name}/{brand_name}/annotations/{video_name}/speech-detection.json"
    if speech_detection_output not in existing_video_annotations:
        detect_speech(video_location, speech_detection_output)
    else:
        print(
            f"Speech annotations for video {video_name_with_format} already exist, API request skipped.\n"
        )
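The seven blocks above repeat one pattern: build the output URI for a feature, then call its detector only if that file is not already listed. Below is a table-driven sketch of the same check. It uses a hypothetical `pending_annotation_outputs` helper and placeholder bucket and video names. In the notebook, each pending entry would map to the corresponding `detect_*` function.

```python
# Annotation file name per Video Intelligence feature, mirroring the output
# paths built in generate_annotations_for_video.
ANNOTATION_FILES = {
    "label": "label-detection.json",
    "face": "face-detection.json",
    "people": "people-detection.json",
    "shot": "shot-detection.json",
    "text": "text-detection.json",
    "logo": "logo-detection.json",
    "speech": "speech-detection.json",
}


def pending_annotation_outputs(bucket: str, brand: str, video: str,
                               existing: list[str]) -> dict[str, str]:
    """Hypothetical helper: return {feature: output_uri} for annotations not yet in GCS."""
    existing_set = set(existing)  # O(1) membership checks
    pending = {}
    for feature, file_name in ANNOTATION_FILES.items():
        uri = f"gs://{bucket}/{brand}/annotations/{video}/{file_name}"
        if uri not in existing_set:
            pending[feature] = uri
    return pending


already_done = ["gs://my-bucket/chocolate_ai/annotations/ad1/label-detection.json"]
todo = pending_annotation_outputs("my-bucket", "chocolate_ai", "ad1", already_done)
print(sorted(todo))  # ['face', 'logo', 'people', 'shot', 'speech', 'text']
```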


#### Label Detection

The Video Intelligence API can identify entities shown in video footage using the LABEL_DETECTION feature. This feature identifies objects, locations, activities, animal species, products, and more.

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/analyze-labels

Note: No output is expected from this cell.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_labels(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detect labels in a video
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """
    video_client = videointelligence.VideoIntelligenceServiceClient()

    features = [videointelligence.Feature.LABEL_DETECTION]
    operation = video_client.annotate_video(
        request={
            "features": features,
            "input_uri": input_gs_file_name,
            "output_uri": output_gs_file_name,
        }
    )
    print(f"\nProcessing video {input_gs_file_name} for label annotations...")

    result = operation.result(timeout=800)

    print(
        f"\nFinished processing video {input_gs_file_name} for label annotations...\n"
    )
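Once a `label-detection.json` file exists, you can filter its labels by confidence, for example against the `confidence_threshold` parameter (0.5). This sketch assumes the file follows the snake_case field names of the API's label annotation messages: `annotation_results`, `segment_label_annotations`, `entity.description` and per-segment `confidence`. The sample data and the `confident_labels` helper are hypothetical. Labels such as "chocolate" or "cake" are the kind of result that could be compared with `branded_products_categories`.

```python
# Hypothetical excerpt of a label-detection.json file (field names assumed).
label_results = {
    "annotation_results": [{
        "segment_label_annotations": [
            {"entity": {"description": "chocolate"}, "segments": [{"confidence": 0.91}]},
            {"entity": {"description": "cake"}, "segments": [{"confidence": 0.42}, {"confidence": 0.63}]},
            {"entity": {"description": "table"}, "segments": [{"confidence": 0.30}]},
        ]
    }]
}


def confident_labels(results: dict, threshold: float) -> dict[str, float]:
    """Return {label: best segment confidence} for labels at or above threshold."""
    labels = {}
    for annotation_result in results.get("annotation_results", []):
        for label in annotation_result.get("segment_label_annotations", []):
            best = max((s.get("confidence", 0.0) for s in label.get("segments", [])), default=0.0)
            if best >= threshold:
                labels[label["entity"]["description"]] = best
    return labels


print(confident_labels(label_results, threshold=0.5))  # {'chocolate': 0.91, 'cake': 0.63}
```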

#### Face Detection

The Video Intelligence API face detection feature looks for faces in a video.

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/face-detection

Note: No output is expected from this cell.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_faces(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detects faces in a video.
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """

    video_client = videointelligence.VideoIntelligenceServiceClient()

    # Configure the request
    config = videointelligence.FaceDetectionConfig(
        include_bounding_boxes=True, include_attributes=True
    )
    context = videointelligence.VideoContext(face_detection_config=config)

    # Start the asynchronous request
    operation = video_client.annotate_video(
        request={
            "features": [videointelligence.Feature.FACE_DETECTION],
            "input_uri": input_gs_file_name,
            "output_uri": output_gs_file_name,
            "video_context": context,
        }
    )

    print(f"\nProcessing video {input_gs_file_name} for face annotations...")

    result = operation.result(timeout=800)

    print(f"\nFinished processing video {input_gs_file_name} for face annotations...\n")
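The `face_surface_threshold` parameter (0.15) suggests that faces are judged by how much of the frame they cover. This sketch computes the largest face area under two assumptions:

- That reading of `face_surface_threshold` is correct.
- `face-detection.json` nests boxes as `face_detection_annotations` → `tracks` → `timestamped_objects` → `normalized_bounding_box`, with coordinates normalized to the 0 to 1 range.

How the ABCD feature functions actually apply the threshold is defined later and may differ. The helpers and sample data are hypothetical.

```python
# Hypothetical excerpt of a face-detection.json file (structure assumed).
face_results = {
    "annotation_results": [{
        "face_detection_annotations": [{
            "tracks": [{
                "timestamped_objects": [
                    {"normalized_bounding_box": {"left": 0.3, "top": 0.1, "right": 0.7, "bottom": 0.6},
                     "time_offset": {"seconds": 1}},
                    {"normalized_bounding_box": {"left": 0.45, "top": 0.4, "right": 0.55, "bottom": 0.5},
                     "time_offset": {"seconds": 3}},
                ]
            }]
        }]
    }]
}


def box_area(box: dict) -> float:
    """Fraction of the frame covered by a normalized bounding box.

    A missing coordinate is treated as 0, the same defensive default
    calculate_time_seconds uses for missing time fields.
    """
    width = box.get("right", 0.0) - box.get("left", 0.0)
    height = box.get("bottom", 0.0) - box.get("top", 0.0)
    return max(width, 0.0) * max(height, 0.0)


def largest_face_area(results: dict) -> float:
    """Largest box area across all face tracks and frames (0.0 if none)."""
    areas = [
        box_area(obj.get("normalized_bounding_box", {}))
        for result in results.get("annotation_results", [])
        for annotation in result.get("face_detection_annotations", [])
        for track in annotation.get("tracks", [])
        for obj in track.get("timestamped_objects", [])
    ]
    return max(areas, default=0.0)


area = largest_face_area(face_results)
print(round(area, 2), area >= 0.15)  # 0.2 True
```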

#### People Detection

Video Intelligence can detect the presence of humans in a video file and track individuals across a video or video segment.

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/people-detection

Note: No output is expected from this cell.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_people(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detects people in a video
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """
    video_client = videointelligence2.VideoIntelligenceServiceClient()

    # Configure the request
    config = videointelligence2.types.PersonDetectionConfig(
        include_bounding_boxes=True,
        include_attributes=True,
        include_pose_landmarks=True,
    )
    context = videointelligence2.types.VideoContext(person_detection_config=config)

    # Start the asynchronous request
    operation = video_client.annotate_video(
        request={
            "features": [videointelligence2.Feature.PERSON_DETECTION],
            "input_uri": input_gs_file_name,
            "video_context": context,
            "output_uri": output_gs_file_name,
        }
    )

    print(f"\nProcessing video {input_gs_file_name} for people annotations...")

    result = operation.result(timeout=800)

    print(
        f"\nFinished processing video {input_gs_file_name} for people annotations...\n"
    )


#### Shot Detection
Shot change analysis detects shot changes in a video.

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/analyze-shots

Note: No output is expected from this cell.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_shots(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detects camera shot changes in a video.
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """
    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.Feature.SHOT_CHANGE_DETECTION]
    operation = video_client.annotate_video(
        request={
            "features": features,
            "input_uri": input_gs_file_name,
            "output_uri": output_gs_file_name,
        }
    )
    print(f"\nProcessing video {input_gs_file_name} for shot annotations...")

    result = operation.result(timeout=800)

    print(f"\nFinished processing video {input_gs_file_name} for shot annotations...\n")
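Shot annotations are time segments, so the average shot length follows directly from their start and end offsets. You can then compare it with the `avg_shot_duration_seconds` parameter (2).

This sketch makes these assumptions:

- Offsets use the `seconds`/`nanos` layout that `calculate_time_seconds` reads.
- A missing field counts as 0, as it does in that function.
- The sample shots are hypothetical.

This section does not show whether any ABCD feature uses this exact calculation.

```python
def offset_seconds(offset: dict) -> float:
    """Convert a {"seconds", "nanos"} offset to float seconds (missing fields count as 0)."""
    return (offset.get("seconds") or 0) + (offset.get("nanos") or 0) / 1e9


# Hypothetical shot_annotations excerpt from a shot-detection.json file.
shot_annotations = [
    {"start_time_offset": {}, "end_time_offset": {"seconds": 1, "nanos": 500000000}},
    {"start_time_offset": {"seconds": 1, "nanos": 500000000}, "end_time_offset": {"seconds": 3}},
    {"start_time_offset": {"seconds": 3}, "end_time_offset": {"seconds": 8}},
]

durations = [
    offset_seconds(shot.get("end_time_offset", {})) - offset_seconds(shot.get("start_time_offset", {}))
    for shot in shot_annotations
]
average = sum(durations) / len(durations)

AVG_SHOT_DURATION_SECONDS = 2  # same value as avg_shot_duration_seconds in the parameters cell
print(durations)                             # [1.5, 1.5, 5.0]
print(round(average, 2))                     # 2.67
print(average <= AVG_SHOT_DURATION_SECONDS)  # False
```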

#### Object Detection

Object tracking tracks objects detected in an input video. To make an object tracking request, call the `annotate_video` method and specify OBJECT_TRACKING in the features field.

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/object-tracking

Note: No output is expected from this cell.


In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_objects(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detects objects in a video.
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """
    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.Feature.OBJECT_TRACKING]
    operation = video_client.annotate_video(
        request={
            "features": features,
            "input_uri": input_gs_file_name,
            "output_uri": output_gs_file_name,
        }
    )
    print(f"\nProcessing video {input_gs_file_name} for object annotations...")

    result = operation.result(timeout=800)

    print(
        f"\nFinished processing video {input_gs_file_name} for object annotations...\n"
    )

#### Text Detection
Text Detection performs Optical Character Recognition (OCR), which detects and extracts text within an input video.

Text detection is available for all the languages supported by the Cloud Vision API.

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/text-detection

Note: No output is expected from this cell.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_text(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detects text in a video.
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """
    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.Feature.TEXT_DETECTION]

    operation = video_client.annotate_video(
        request={
            "features": features,
            "input_uri": input_gs_file_name,
            "output_uri": output_gs_file_name,
        }
    )

    print(f"\nProcessing video {input_gs_file_name} for text annotations...")

    result = operation.result(timeout=800)

    print(f"\nFinished processing video {input_gs_file_name} for text annotations...\n")
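OCR output rarely matches brand spellings exactly because case, spacing and underscores vary. One option is to normalize both sides before comparing.

This sketch assumes that `text-detection.json` entries carry a `text` field. The sample brand terms are modeled on `brand_variations`. `normalize` and `brand_mentions` are hypothetical helpers.

Substring matching on normalized text is deliberately loose, so very short terms could produce false positives.

```python
# Hypothetical text_annotations excerpt from a text-detection.json file.
text_annotations = [
    {"text": "CHOCOLATE AI", "segments": [{"confidence": 0.97}]},
    {"text": "Molten Caramel Surprise", "segments": [{"confidence": 0.88}]},
    {"text": "Order now", "segments": [{"confidence": 0.93}]},
]

# Sample brand terms modeled on brand_variations and the call to action above.
brand_terms = ["chocolateai", "chocolate_ai", "Chocolate AI"]


def normalize(text: str) -> str:
    """Lower-case and keep only letters and digits."""
    return "".join(ch for ch in text.lower() if ch.isalnum())


def brand_mentions(annotations: list[dict], terms: list[str]) -> list[str]:
    """Return detected strings that contain any brand term after normalization."""
    wanted = {normalize(term) for term in terms}
    return [
        a["text"] for a in annotations
        if any(term in normalize(a.get("text", "")) for term in wanted)
    ]


print(brand_mentions(text_annotations, brand_terms))  # ['CHOCOLATE AI']
```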

#### Logo Detection
The Video Intelligence API can detect, track, and recognize the presence of over 100,000 brands and logos in video content.

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/logo-recognition

Note: No output is expected from this cell.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_logos(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detect logos in a video.
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """
    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.Feature.LOGO_RECOGNITION]

    operation = video_client.annotate_video(
        request={
            "features": features,
            "input_uri": input_gs_file_name,
            "output_uri": output_gs_file_name,
        }
    )

    print(f"\nProcessing video {input_gs_file_name} for logo annotations...")

    response = operation.result(timeout=800)

    print(f"\nFinished processing video {input_gs_file_name} for logo annotations...\n")

#### Speech Detection

The Video Intelligence API transcribes speech to text from supported video files. There are two supported models, "default" and "video."

For more information visit the official Google Cloud documentation: https://cloud.google.com/video-intelligence/docs/transcription

Note: No output is expected from this cell.

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_speech(input_gs_file_name: str, output_gs_file_name: str) -> None:
    """Detects speech in a video.
    Args:
      input_gs_file_name: gcs bucket where the video is located
      output_gs_file_name: gcs bucket output for the video annotations
    """

    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.Feature.SPEECH_TRANSCRIPTION]

    config = videointelligence.SpeechTranscriptionConfig(
        language_code="en-US", enable_automatic_punctuation=True
    )
    video_context = videointelligence.VideoContext(speech_transcription_config=config)

    operation = video_client.annotate_video(
        request={
            "features": features,
            "input_uri": input_gs_file_name,
            "output_uri": output_gs_file_name,
            "video_context": video_context,
        }
    )

    print(f"\nProcessing video {input_gs_file_name} for speech annotations...")

    result = operation.result(timeout=800)

    print(
        f"\nFinished processing video {input_gs_file_name} for speech annotations...\n"
    )
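Speech annotations hold one or more recognition alternatives per segment, and the first alternative is the most probable. A full transcript can therefore be assembled from the first alternative of each segment. You can then search it for a call to action such as the one in `branded_call_to_actions`.

This sketch assumes that `speech-detection.json` follows the `speech_transcriptions` → `alternatives` → `transcript` layout. The sample data and both helpers are hypothetical. The notebook's own `find_elements_in_transcript` may work differently.

```python
# Hypothetical speech_transcriptions excerpt from a speech-detection.json file.
speech_transcriptions = [
    {"alternatives": [{"transcript": "Discover our new Chocolate Tasting Flight.", "confidence": 0.92}]},
    {"alternatives": [{}]},  # defensive case: an alternative without a transcript is skipped
    {"alternatives": [{"transcript": " Indulge in the artistry of Chocolate AI.", "confidence": 0.89}]},
]


def full_transcript(transcriptions: list[dict]) -> str:
    """Join the first (most probable) alternative of each transcription segment."""
    parts = []
    for transcription in transcriptions:
        alternatives = transcription.get("alternatives") or [{}]
        text = alternatives[0].get("transcript", "").strip()
        if text:
            parts.append(text)
    return " ".join(parts)


def mentions_phrase(transcript: str, phrase: str) -> bool:
    """Case-insensitive substring check, ignoring a trailing period in the phrase."""
    return phrase.lower().rstrip(".") in transcript.lower()


transcript = full_transcript(speech_transcriptions)
print(transcript)
print(mentions_phrase(transcript, "Indulge in the artistry of Chocolate AI"))  # True
```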

#### Misc. ABCD Helpers
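The `context_and_examples` prompt defined in the cell below asks Gemini to reply with JSON whose `feature_detected` value is the string "True" or "False". LLM replies sometimes wrap JSON in Markdown code fences, so a tolerant parser is useful.

The helper below is hypothetical; the notebook's own response parsing is not part of this section. It does three things:

- strips an optional fence,
- loads the JSON with `json.loads`,
- converts the string flag to a real `bool`.

```python
import json
import re

# An optional leading fence (with or without "json") and an optional trailing fence.
_FENCE_RE = re.compile(r"^\s*`{3}(?:json)?\s*|\s*`{3}\s*$")


def parse_feature_response(raw: str) -> dict:
    """Parse a {"feature_detected": "True/False", "explanation": ...} reply.

    Hypothetical helper: strips Markdown code fences, loads the JSON, and turns
    a "True"/"False" string into a real bool (JSON booleans pass through).
    """
    cleaned = _FENCE_RE.sub("", raw)
    data = json.loads(cleaned)
    detected = data.get("feature_detected")
    if isinstance(detected, str):
        detected = detected.strip().lower() == "true"
    return {"feature_detected": bool(detected), "explanation": data.get("explanation", "")}


fence = "`" * 3
reply = fence + 'json\n{"feature_detected": "True", "explanation": "Logo at 0:02."}\n' + fence
print(parse_feature_response(reply))  # {'feature_detected': True, 'explanation': 'Logo at 0:02.'}
```

Malformed JSON still raises `json.JSONDecodeError`, which the caller can catch and treat as a failed assessment.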

In [None]:
### DO NOT EDIT, vars built from user's input ###
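def convert_string_to_list(list_str: str) -> list[str]:
    """Converts a comma-separated string to a list and
    removes white spaces from strings in list
    Args:
        list_str: comma-separated values, e.g. "chocolate, cake, coffee"
    Returns:
        the list of stripped values
    """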
    cleaned_list = []
    for item in list_str.split(","):
        cleaned_list.append(item.strip())
    return cleaned_list

brand_variations = convert_string_to_list(brand_variations_str)
brand_variations.append(brand_name)
branded_products = convert_string_to_list(branded_products_str)
branded_products_categories = convert_string_to_list(branded_products_categories_str)
branded_call_to_actions = convert_string_to_list(branded_call_to_actions_str)

if VERBOSE:
    print("ABCD Detector parameters:")
    print(f"Brand Variations: {brand_variations}")
    print(f"Brand products: {branded_products}")
    print(f"Brand categories: {branded_products_categories}")
    print(f"Brand call to actions: {branded_call_to_actions}")

llm_generation_config = {
    "max_output_tokens": max_output_tokens,
    "temperature": temperature,
    "top_p": top_p,
    "top_k": top_k,
}

context_and_examples = """Only base your answers strictly on what information is available in the video attached.
Do not make up any information that is not part of the video.
Explain in a very detailed way the reasoning behind your answer.
Please present the extracted information in a VALID JSON format like this:
{
    "feature_detected": "True/False",
    "explanation": "..."
}
"""




def calculate_time_seconds(part_obj: dict, part: str) -> float:
    """Calculate time of the provided part of the video
    Args:
        part_obj: part of the video to calculate the time
        part: either start_time_offset or end_time_offset
    Returns:
        time_seconds: the time in seconds
    """
    if part not in part_obj:
        if VERBOSE:
            print(f"There is no part time {part} in {part_obj}")
        # Return 0 even when VERBOSE is off; part_obj.get(part) below would be None
        return 0
    time_seconds = (
        (part_obj.get(part).get("seconds") or 0)
        + ((part_obj.get(part).get("microseconds") or 0) / 1e6)
        + ((part_obj.get(part).get("nanos") or 0) / 1e9)
    )
    return time_seconds
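
# Illustrative sketch (not part of the original detector): offsets in the Video
# Intelligence JSON output are dicts such as {"seconds": 4, "nanos": 250000000},
# and individual fields may be missing (the helper above already defaults them
# to 0). The hypothetical function below isolates that conversion, assuming
# only "seconds" and "nanos" keys.
from typing import Optional


def duration_dict_to_seconds(duration: Optional[dict]) -> float:
    """Converts a {"seconds": ..., "nanos": ...} dict to float seconds."""
    if not duration:
        return 0.0
    seconds = float(duration.get("seconds") or 0)
    nanos = float(duration.get("nanos") or 0)
    return seconds + nanos / 1e9


# Usage: duration_dict_to_seconds({"seconds": 4, "nanos": 250000000}) returns 4.25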


def detected_text_in_first_5_seconds(annotation: dict) -> tuple[bool, any]:
    """Detect if the text feature appears in the first 5 seconds
    Args:
        annotation: the text annotation
    Returns:
        True if the text is found in the 1st 5 secs, False otherwise
        frame: the frame where the feature was found
    """
    for segment in annotation.get("segments"):
        start_time_secs = calculate_time_seconds(
            segment.get("segment"), "start_time_offset"
        )
        if start_time_secs > early_time_seconds:
            continue  # Ignore a segment > 5 secs
        frames = segment.get("frames")
        for frame in frames:
            start_time_seconds = calculate_time_seconds(frame, "time_offset")
            if start_time_seconds <= early_time_seconds:
                return True, frame
    return False, None


def find_elements_in_transcript(
    speech_transcriptions: list[dict],
    elements: list[str],
    elements_categories: list[str],
    apply_condition: bool,
) -> tuple[bool, bool]:
    """Finds a list of elements in the video transcript
    Args:
        speech_transcriptions: the speech annotations
        elements: list of elements to find in the transcript
        elements_categories: list of element categories to find in the transcript
        apply_condition: flag to ignore elements with 3 or fewer characters. This is
        only needed when elements come from text annotations since detected words
        are sometimes only 1 character long.
    Returns:
        element_mention_speech: True if any element or category is found in the transcript
        element_mention_speech_1st_5_secs: True if any element or category is found
        in the 1st 5 secs of the transcript
    """
    words_1st_5_secs = []
    element_mention_speech = False
    element_mention_speech_1st_5_secs = False
    for speech_transcription in speech_transcriptions:
        # The number of alternatives for each transcription is limited by
        # SpeechTranscriptionConfig.max_alternatives.
        # Each alternative is a different possible transcription
        # and has its own confidence score.
        for alternative in speech_transcription.get("alternatives", []):
            # Check confidence against user defined threshold (missing confidence counts as 0)
            if alternative and (alternative.get("confidence") or 0) >= confidence_threshold:
                transcript = alternative.get("transcript")
                # Check if elements or elements categories are found in transcript
                if apply_condition:
                    found_elements = find_text_annotation_elements_in_transcript(
                        elements, transcript
                    )
                else:
                    found_elements = [
                        element
                        for element in elements
                        if element.lower() in transcript.lower()
                    ]
                found_elements_categories = [
                    elements_category
                    for elements_category in elements_categories
                    if elements_category.lower() in transcript.lower()
                ]
                if len(found_elements) > 0 or len(found_elements_categories) > 0:
                    element_mention_speech = True
                # For 1st 5 secs, check elements and elements_categories in words
                # since only the words[] contain times
                words = alternative.get("words") if "words" in alternative else []
                # Sort words by time to construct correct transcript later
                sorted_words = sorted(
                    words,
                    key=lambda x: calculate_time_seconds(x, "start_time"),
                    reverse=False,
                )
                for word_info in sorted_words:
                    start_time_secs = calculate_time_seconds(word_info, "start_time")
                    # Consider only words in the 1st 5 secs
                    if start_time_secs <= early_time_seconds:
                        words_1st_5_secs.append(word_info.get("word"))

    # Evaluate 1st 5 secs - Construct transcript from words
    transcript_1st_5_secs = " ".join(words_1st_5_secs)
    if apply_condition:
        found_elements_1st_5_seconds = find_text_annotation_elements_in_transcript(
            elements, transcript_1st_5_secs
        )
    else:
        found_elements_1st_5_seconds = [
            element
            for element in elements
            if element.lower() in transcript_1st_5_secs.lower()
        ]
    found_elements_categories_1st_5_seconds = [
        elements_category
        for elements_category in elements_categories
        if elements_category.lower() in transcript_1st_5_secs.lower()
    ]
    if (
        len(found_elements_1st_5_seconds) > 0
        or len(found_elements_categories_1st_5_seconds) > 0
    ):
        element_mention_speech_1st_5_secs = True

    return element_mention_speech, element_mention_speech_1st_5_secs


def find_text_annotation_elements_in_transcript(elements: list[str], transcript: str):
    """Checks if text annotation elements in an array are found in transcript.
    This is only needed when elements come from text annotations since
    detected words are sometimes only 1 character long.
    Args:
        elements: list of elements to find in the transcript
        transcript: the transcript to find the elements in
    Returns:
        found_elements: elements longer than 3 chars found in the transcript
    """
    found_elements = [
        element
        for element in elements
        # Skip elements with 3 or fewer chars to avoid spurious short matches
        if len(element) > 3 and element.lower() in transcript.lower()
    ]
    return found_elements
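
# Illustrative alternative (not the original behavior): the substring check above
# also matches elements embedded in longer words, e.g. "lace" inside "place".
# Regex word boundaries (\b) are one way to require whole-word matches. The
# function name is hypothetical; min_len=4 mirrors the len(element) > 3 filter,
# and the sketch assumes elements start and end with letters or digits, since
# \b does not behave as intended next to punctuation such as "!".
import re


def find_whole_word_elements(
    elements: list[str], transcript: str, min_len: int = 4
) -> list[str]:
    """Returns the elements that appear as whole words or phrases in transcript."""
    found = []
    for element in elements:
        if len(element) < min_len:
            continue  # skip short elements, like the helper above
        pattern = rf"\b{re.escape(element)}\b"
        if re.search(pattern, transcript, flags=re.IGNORECASE):
            found.append(element)
    return found


# Usage: find_whole_word_elements(["lace"], "a place for grace") returns []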


def get_speech_transcript(speech_transcriptions: list[dict]) -> str:
    """Get transcript built from transcript alternatives
    Args:
        speech_transcriptions: the speech annotations
    Returns:
        final_transcript: the constructed transcript
    """
    transcript_alternatives = []
    transcript_alt_confidence = []
    for speech_transcription in speech_transcriptions:
        # The number of alternatives for each transcription is limited by
        # SpeechTranscriptionConfig.max_alternatives.
        # Each alternative is a different possible transcription
        # and has its own confidence score.
        for alternative in speech_transcription.get("alternatives", []):
            # Check confidence against user defined threshold (missing confidence counts as 0)
            transcript = alternative.get("transcript")
            if alternative and (alternative.get("confidence") or 0) >= confidence_threshold:
                transcript_alternatives.append(transcript)
                transcript_alt_confidence.append(alternative)

    sorted_transcript_by_confidence = sorted(
        transcript_alt_confidence,
        key=lambda x: x.get("confidence"),
        reverse=True,
    )  # don't use this for now
    highest_confidence_transcript = (
        sorted_transcript_by_confidence[0].get("transcript")
        if len(sorted_transcript_by_confidence) > 0
        else ""
    )  # don't use this for now
    final_transcript = " ".join(transcript_alternatives)
    return final_transcript


def get_speech_transcript_1st_5_secs(speech_transcriptions: list[dict]):
    """Get transcript built from the words spoken in the 1st 5 secs
    Args:
        speech_transcriptions: the speech annotations
    Returns:
        transcript_1st_5_secs: the transcript in the 1st 5 secs
    """
    words_1st_5_secs = []
    for speech_transcription in speech_transcriptions:
        # The number of alternatives for each transcription is limited by
        # SpeechTranscriptionConfig.max_alternatives.
        # Each alternative is a different possible transcription
        # and has its own confidence score.
        for alternative in speech_transcription.get("alternatives", []):
            # Check confidence against user defined threshold (missing confidence counts as 0)
            if alternative and (alternative.get("confidence") or 0) >= confidence_threshold:
                # For 1st 5 secs get transcript from words
                # since only the words[] contain times
                words = alternative.get("words") if "words" in alternative else []
                # Sort words by time to construct correct transcript later
                sorted_words = sorted(
                    words,
                    key=lambda x: calculate_time_seconds(x, "start_time"),
                    reverse=False,
                )
                for word_info in sorted_words:
                    start_time_secs = calculate_time_seconds(word_info, "start_time")
                    # Consider only words in the 1st 5 secs
                    if start_time_secs <= early_time_seconds:
                        words_1st_5_secs.append(word_info.get("word"))
    # Construct transcript from words
    transcript_1st_5_secs = " ".join(words_1st_5_secs)
    return transcript_1st_5_secs
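
# Illustrative sketch (not part of the original detector): the "first N seconds"
# transcript logic above, applied to a small hand-written speech_transcriptions
# list. The dict keys mirror the ones the helpers read ("alternatives",
# "confidence", "words", "start_time", "word"); the function name and its
# parameters are hypothetical, and only "seconds"/"nanos" offsets are assumed.
# Unlike the helper above, it sorts words across all alternatives by start time.
def transcript_before(
    speech_transcriptions: list[dict], limit_secs: float, min_confidence: float
) -> str:
    """Joins words that start at or before limit_secs, ordered by start time."""
    timed_words = []
    for transcription in speech_transcriptions:
        for alternative in transcription.get("alternatives", []):
            if (alternative.get("confidence") or 0) < min_confidence:
                continue  # skip low-confidence alternatives
            for word in alternative.get("words", []):
                start = word.get("start_time", {})
                secs = (start.get("seconds") or 0) + (start.get("nanos") or 0) / 1e9
                if secs <= limit_secs:
                    timed_words.append((secs, word.get("word", "")))
    return " ".join(w for _, w in sorted(timed_words, key=lambda pair: pair[0]))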


def get_existing_annotations_from_gcs(brand_name: str) -> list[str]:
    """Get existing annotations from Cloud Storage
    Args:
        brand_name: the parent folder in Cloud Storage
    Returns:
        video_annotations: array of annotation url/names
    """
    bucket = get_bucket()
    blobs = bucket.list_blobs(prefix=f"{brand_name}/annotations/")
    video_annotations = []
    for blob in blobs:
        video_annotations.append(f"gs://{bucket_name}/{blob.name}")
    return video_annotations


def download_video_annotations(
    brand_name: str, video_name: str
) -> tuple[dict, dict, dict, dict, dict, dict, dict]:
    """Download video annotations from Google Cloud Storage
    Args:
        brand_name: the brand to generate the video annotations for
        video_name: Full video name
    Returns:
        tuple with the label, face, people, shot, text, logo and speech
        annotation results, in that order
    """
    annotation_location = f"{brand_name}/annotations/{video_name}"
    bucket = get_bucket()

    # Label Annotations
    blob_label = bucket.blob(f"{annotation_location}/label-detection.json")
    data_label = json.loads(blob_label.download_as_string(client=None))
    # Get label annotations. The first result is retrieved because a single video was processed.
    label_annotation_results = data_label.get("annotation_results")[0]

    # Face Annotations
    blob_face = bucket.blob(f"{annotation_location}/face-detection.json")
    data_face = json.loads(blob_face.download_as_string(client=None))
    # Get face annotations. The first result is retrieved because a single video was processed.
    face_annotation_results = data_face.get("annotation_results")[0]

    # People Annotations
    blob_people = bucket.blob(f"{annotation_location}/people-detection.json")
    data_people = json.loads(blob_people.download_as_string(client=None))
    # Get people annotations. The first result is retrieved because a single video was processed.
    people_annotation_results = data_people.get("annotation_results")[0]

    # Shot Annotations
    blob_shot = bucket.blob(f"{annotation_location}/shot-detection.json")
    data_shot = json.loads(blob_shot.download_as_string(client=None))
    # Get shot annotations. The first result is retrieved because a single video was processed.
    shot_annotation_results = data_shot.get("annotation_results")[0]

    # Text Annotations
    blob_text = bucket.blob(f"{annotation_location}/text-detection.json")
    data_text = json.loads(blob_text.download_as_string(client=None))
    # Get text annotations. The first result is retrieved because a single video was processed.
    text_annotation_results = data_text.get("annotation_results")[0]

    # Logo Annotations
    blob_logo = bucket.blob(f"{annotation_location}/logo-detection.json")
    data_logo = json.loads(blob_logo.download_as_string(client=None))
    # Get logo annotations. The first result is retrieved because a single video was processed.
    logo_annotation_results = data_logo.get("annotation_results")[0]

    # Speech Annotations
    blob_speech = bucket.blob(f"{annotation_location}/speech-detection.json")
    data_speech = json.loads(blob_speech.download_as_string(client=None))
    # Get speech annotations. The first result is retrieved because a single video was processed.
    speech_annotation_results = data_speech.get("annotation_results")[0]

    return (
        label_annotation_results,
        face_annotation_results,
        people_annotation_results,
        shot_annotation_results,
        text_annotation_results,
        logo_annotation_results,
        speech_annotation_results,
    )
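
# Illustrative refactor sketch (not the original code): the seven blocks above
# differ only in the file name, so the parsing can also be written as a loop.
# The hypothetical helper below takes the raw JSON contents keyed by file name
# (for example, what blob.download_as_bytes() returns for each file) and keeps
# the first "annotation_results" entry, since each file covers a single video.
import json

VIDEO_ANNOTATION_FILE_NAMES = (
    "label-detection.json",
    "face-detection.json",
    "people-detection.json",
    "shot-detection.json",
    "text-detection.json",
    "logo-detection.json",
    "speech-detection.json",
)


def parse_first_annotation_results(raw_files: dict) -> dict[str, dict]:
    """Returns {file_name: annotation_results[0]} for every annotation file."""
    results = {}
    for file_name in VIDEO_ANNOTATION_FILE_NAMES:
        data = json.loads(raw_files[file_name])
        results[file_name] = data.get("annotation_results")[0]
    return results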

class LLMParameters:
    """Class that represents the required params to make a prediction to the LLM"""

    model_name: str
    location: str
    modality: dict
    generation_config: dict = {  # Default model config
        "max_output_tokens": 2048,
        "temperature": 0.5,
        "top_p": 1,
        "top_k": 40,
    }

    def __init__(
        self,
        model_name: str,
        location: str,
        generation_config: dict,
        modality: dict = None,
    ):
        self.model_name = model_name
        self.location = location
        self.generation_config = generation_config
        self.modality = modality

    def set_modality(self, modality: dict) -> None:
        """Sets the modality to use in the LLM
        The modality object changes depending on the type.
        For video:
        {
            "type": "video", # prompt is handled separately
            "video_uri": ""
        }
        For text:
        {
            "type": "text" # prompt is handled separately
        }
        """
        self.modality = modality


class VertexAIService:
    """Vertex AI Service to leverage the Vertex APIs for inference"""

    def __init__(self, project_id: str):
        self.project_id = project_id

    def execute_gemini_pro(self, prompt: str, params: LLMParameters) -> str:
        """Makes a request to Gemini to get a prediction based on the provided prompt
        and multi-modal params
        Args:
            prompt: a string with the prompt for LLM
            params: llm params model_name, location, modality and generation_config
        Returns:
            response.text: a string with the generated response
        """
        retries = 4
        for this_retry in range(retries):
            try:
                vertexai.init(project=self.project_id, location=params.location)
                model = GenerativeModel(params.model_name)
                modality_params = self._get_modality_params(prompt, params)
                response = model.generate_content(
                    modality_params,
                    generation_config=params.generation_config,
                    safety_settings={
                        generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
                        generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
                        generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
                        generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
                    },
                    stream=False,
                )
                return response.text if response else ""
            except AttributeError as ex:
                error_message = str(ex)
                if (
                    this_retry == retries - 1
                    or "Content has no parts" not in error_message
                ):
                    # Raise exception for other attribute errors
                    raise
                # Retry request
                if "Content has no parts" in error_message:
                    print(
                        f"Error: {ex} Gemini might be blocking the response due to safety issues.\n"
                    )
                wait = 10 * 2**this_retry
                time.sleep(wait)
            except Exception as ex:
                print("GENERAL EXCEPTION...\n")
                error_message = str(ex)
                # Retry only on quota, availability, internal and 403 errors
                retriable_errors = (
                    "429 Quota exceeded",
                    "503 The service is currently unavailable",
                    "500 Internal error encountered",
                    "403",
                )
                if this_retry == retries - 1 or not any(
                    retriable_error in error_message
                    for retriable_error in retriable_errors
                ):
                    if VERBOSE:
                        print(f"{error_message}\n")
                    # Raise exception for non-retriable errors
                    raise
                # Retry request
                if VERBOSE:
                    print(
                        f"Error {error_message}. Retrying {retries - 1} times using exponential backoff. Retry number {this_retry}...\n"
                    )
                wait = 10 * 2**this_retry
                time.sleep(wait)
        return ""

    def _get_modality_params(self, prompt: str, params: LLMParameters) -> list[any]:
        """Build the modality params based on the type of llm capability to use
        Args:
            prompt: a string with the prompt for LLM
            params: the model params for inference, see LLMParameters defaults above
        Returns:
            modality_params: list of modality params based on the model capability to use
        """
        if params.modality["type"] == "video":
            mime_type = f"video/{get_video_format(params.modality['video_uri'])}"
            video = Part.from_uri(uri=params.modality["video_uri"], mime_type=mime_type)
            return [video, prompt]
        elif params.modality["type"] == "text":
            return [prompt]
        return []
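
# Illustrative sketch (not the Vertex AI SDK's retry mechanism): the loop in
# execute_gemini_pro waits 10 * 2**this_retry seconds between attempts and only
# retries when the error message contains a known marker such as
# "429 Quota exceeded". The hypothetical helper below isolates that pattern;
# `call`, `retriable_markers` and the injectable `sleep` are assumptions made so
# the logic can be exercised without calling Gemini.
import time
from typing import Any, Callable, Iterable


def call_with_backoff(
    call: Callable[[], Any],
    retriable_markers: Iterable[str],
    retries: int = 4,
    base_wait: float = 10.0,
    sleep: Callable[[float], Any] = time.sleep,
) -> Any:
    """Calls `call`, retrying retriable errors with exponential backoff."""
    markers = tuple(retriable_markers)
    for attempt in range(retries):
        try:
            return call()
        except Exception as ex:
            is_last_attempt = attempt == retries - 1
            if is_last_attempt or not any(marker in str(ex) for marker in markers):
                raise  # non-retriable error, or retries exhausted
            sleep(base_wait * 2**attempt)  # waits 10s, 20s, 40s, ... by default
    raise ValueError("retries must be at least 1")


# Usage (fetch is hypothetical): call_with_backoff(fetch, ["429 Quota exceeded"])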


def get_vertex_ai_service():
    """Gets Vertex AI service to interact with Gemini"""
    vertex_ai_service = VertexAIService(project_id)
    return vertex_ai_service


def detect_feature_with_llm(
    feature: str, prompt: str, llm_params: LLMParameters
) -> tuple[bool, str]:
    """Detect feature using LLM
    Args:
        feature: the feature to evaluate
        prompt: prompt for the llm
        llm_params: object with llm params
    Returns:
        feature_detected: True if the feature is detected, False otherwise
        explanation: the LLM explanation, or the raw LLM response if it could not be parsed
    """
    try:
        vertex_ai_service = get_vertex_ai_service()
        if llm_params.model_name == GEMINI_PRO:
            # Gemini 1.5 does not support top_k param
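            if "top_k" in llm_params.generation_config:
                # Copy instead of del so the shared generation config dict is not mutated
                llm_params.generation_config = {
                    key: value
                    for key, value in llm_params.generation_config.items()
                    if key != "top_k"
                }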
            llm_response = vertex_ai_service.execute_gemini_pro(
                prompt=prompt, params=llm_params
            )
        else:
            print(f"LLM {llm_params.model_name} not supported.")
            return False, ""
        # Parse response
        llm_response_json = json.loads(clean_llm_response(llm_response))
        if (
            "feature_detected" in llm_response_json
            and "explanation" in llm_response_json
        ):
            if VERBOSE:
                print("***Powered by LLMs***")
                print(
                    f"Feature detected: {feature}: {llm_response_json.get('feature_detected')}"
                )
                print(f"Explanation: {llm_response_json.get('explanation')}\n")
            feature_detected = (
                llm_response_json.get("feature_detected") == "True"
                or llm_response_json.get("feature_detected") == "true"
            )
            return feature_detected, llm_response_json.get("explanation")
        else:
            if VERBOSE:
                print("***Powered by LLMs***")
                print(
                    "JSON parse was successful but the JSON keys: feature_detected and explanation were not found."
                )
                print("Using string version...\n")
                print(llm_response)
            feature_detected = is_feature_detected(llm_response)
            return feature_detected, llm_response
    except json.JSONDecodeError as ex:
        if VERBOSE:
            print(f"LLM response could not be parsed. Error: {ex}.\n")
            print("Using string version...\n")
            if llm_response:
                print("***Powered by LLMs***")
                print(f"{feature}: {llm_response}")
    except Exception as ex:
        print(ex)
        raise
    feature_detected = is_feature_detected(llm_response)
    return feature_detected, llm_response

def is_feature_detected(llm_response: str):
    """Checks if feature is detected in a raw (non-parsed) LLM response

    Args:
        llm_response: string llm response
    Returns:
        detected: whether the feature was detected or not
    """
    detected = bool(llm_response) and (
        '"feature_detected" : "True"' in llm_response
        or '"feature_detected" : "true"' in llm_response
        or '"feature_detected": "True"' in llm_response
        or '"feature_detected": "true"' in llm_response
    )
    return detected


def clean_llm_response(response: str) -> str:
    """Cleans LLM response
    Args:
        response: llm response to clean
    Returns:
        response: the response without ``` fences and "json" markers
    """
    return response.replace("```", "").replace("json", "")
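
# Illustrative alternative (not the original behavior): clean_llm_response removes
# every lowercase "json" substring, which also alters explanations that mention
# "json". The hypothetical helper below strips only a surrounding markdown code
# fence (``` or ```json) before parsing, and normalizes "feature_detected" to a
# bool whether the model returned "True", "true" or a JSON boolean.
import json
import re

_LLM_FENCE_RE = re.compile(
    r"^\s*```(?:json)?\s*(.*?)\s*```\s*$", re.DOTALL | re.IGNORECASE
)


def parse_llm_feature_json(response: str) -> tuple[bool, str]:
    """Parses a (possibly fenced) JSON LLM response into (detected, explanation)."""
    match = _LLM_FENCE_RE.match(response)
    body = match.group(1) if match else response
    data = json.loads(body)  # raises json.JSONDecodeError on invalid JSON
    detected = str(data.get("feature_detected", "")).strip().lower() == "true"
    return detected, data.get("explanation", "")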

def get_bucket() -> any:
    """Builds GCS bucket"""
    # Init cloud storage bucket
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    return bucket


# Knowledge Graph module


def get_knowledge_graph_entities(queries: list[str]) -> dict[str, dict]:
    """Get the knowledge Graph Entities for a list of queries
    Args:
        queries: a list of entities to find in KG
    Returns:
        kg_entities: entities found in KG
        Format example: the entity id ("@id" without its "kg:" prefix) is the key
        and the entity details are the value
        kg_entities = {
            "<entity id>": {<entity result details>}
        }
    """
    kg_entities = {}
    try:
        for query in queries:
            service_url = "https://kgsearch.googleapis.com/v1/entities:search"
            params = {
                "query": query,
                "limit": 10,
                "indent": True,
                "key": KNOWLEDGE_GRAPH_API_KEY,
            }
            url = f"{service_url}?{urllib.parse.urlencode(params)}"
            response = json.loads(urllib.request.urlopen(url).read())
            for element in response["itemListElement"]:
                kg_entity_name = element["result"]["name"]
                # To only add the exact KG entity
                if query.lower() == kg_entity_name.lower():
                    kg_entities[element["result"]["@id"][3:]] = element["result"]
        return kg_entities
    except Exception as ex:
        print(
            f"\n\x1b[31mERROR: There was an error fetching the Knowledge Graph entities. Please check that your API key is correct. ERROR: {ex}\x1b[0m"
        )
        raise
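
# Illustrative sketch (no network call): how get_knowledge_graph_entities builds
# the Knowledge Graph Search request URL and keeps only exact (case-insensitive)
# name matches. "@id" values typically look like "kg:/m/...", which is why the
# code above slices off the first 3 characters. The function names below are
# hypothetical.
import urllib.parse

KG_SEARCH_URL = "https://kgsearch.googleapis.com/v1/entities:search"


def build_kg_search_url(query: str, api_key: str, limit: int = 10) -> str:
    """Builds the entities:search URL with the same params used above."""
    params = {"query": query, "limit": limit, "indent": True, "key": api_key}
    return f"{KG_SEARCH_URL}?{urllib.parse.urlencode(params)}"


def exact_kg_matches(query: str, response: dict) -> dict[str, dict]:
    """Keeps results whose name equals the query, keyed by "@id" minus "kg:"."""
    entities = {}
    for element in response.get("itemListElement", []):
        result = element.get("result", {})
        if result.get("name", "").lower() == query.lower():
            entities[result["@id"].removeprefix("kg:")] = result
    return entities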


def get_file_name_from_gcs_url(gcs_url: str) -> tuple[str, str]:
    """Get file name from GCS url
    Args:
        gcs_url: the gcs blob name with the file name, e.g. brand/videos/video.mp4
    Returns:
        file_name: the file name without its format
        file_name_with_format: the file name with its format
        Both are "" if the url does not have the expected format
    """
    url_parts = gcs_url.split("/")
    if len(url_parts) == 3:
        file_name = url_parts[2].split(".")[0]
        file_name_with_format = url_parts[2]
        return file_name, file_name_with_format
    return "", ""


def get_video_format(video_location: str):
    """Gets video format from gcs url
    Args:
        video_location: gcs video location
    Returns:
        video_format: video format
    """
    # Take the extension after the last "." of the file name, so dots in the
    # bucket or folder names do not break the parsing
    _, extension = os.path.splitext(video_location)
    return extension.lstrip(".")


def get_n_secs_video_uri_from_uri(video_uri: str, new_name_part: str):
    """Get the GCS uri for the trimmed (n seconds) version of a video
    Args:
        video_uri: gcs video uri, e.g. gs://bucket/brand/videos/video.mp4
        new_name_part: suffix appended to the video name, e.g. 1st_5_secs
    Returns:
        n_secs_video_uri: uri of the trimmed video, or "" if the uri has an unexpected format
    """
    gcs_parts = video_uri.split(".")
    if len(gcs_parts) == 2:
        video_format = gcs_parts[1]
        long_video_name_parts = gcs_parts[0].split("/")
        if len(long_video_name_parts) == 6:
            gcs = long_video_name_parts[0]
            bucket_name = long_video_name_parts[2]
            brand = long_video_name_parts[3]
            videos_folder = long_video_name_parts[4]
            # Last element is the video name
            video_name = f"{long_video_name_parts[-1]}_{new_name_part}.{video_format}"
            n_secs_video_uri = (
                f"{gcs}//{bucket_name}/{brand}/{videos_folder}/{video_name}"
            )
            return n_secs_video_uri
    return ""
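
# Illustrative alternative (not the original code): deriving the trimmed video's
# URI with posixpath.splitext instead of counting "/" and "." separated parts,
# so dots in the bucket name or a different folder depth do not matter. Unlike
# get_n_secs_video_uri_from_uri, it does not check for the
# gs://bucket/brand/videos/video layout. The function name is hypothetical.
import posixpath


def build_n_secs_video_uri(video_uri: str, new_name_part: str) -> str:
    """Inserts _<new_name_part> before the extension, or returns "" without one."""
    root, extension = posixpath.splitext(video_uri)
    if not extension:
        return ""
    return f"{root}_{new_name_part}{extension}"


# Usage: build_n_secs_video_uri("gs://my.bucket/acme/videos/ad.mp4", "1st_5_secs")
# returns "gs://my.bucket/acme/videos/ad_1st_5_secs.mp4"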


def store_assessment_results_locally(brand_name: str, assessment: any) -> None:
    """Store test results in a file"""
    file_name = f"results/{brand_name}_{assessment.get('video_uri')}.json"
    assessment = {
        "brand_name": brand_name,
        "assessment": assessment
    }
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, "w", encoding="utf-8") as f:
        json.dump(assessment, f, ensure_ascii=False, indent=4)


def trim_videos(brand_name: str):
    """Trims videos to create new versions of 5 secs
    Args:
        brand_name: the brand to trim the videos for
    """
    local_videos_path = "abcd_videos"
    # Check if the directory exists
    if not os.path.exists(local_videos_path):
        os.makedirs(local_videos_path)
    # Get videos from GCS
    brand_videos_folder = f"{brand_name}/videos"
    bucket = get_bucket()
    blobs = bucket.list_blobs(prefix=brand_videos_folder)
    # Video processing
    for video in blobs:
        if video.name == f"{brand_videos_folder}/" or "1st_5_secs" in video.name:
            # Skip parent folder and trimmed versions of videos
            continue
        video_name, video_name_with_format = get_file_name_from_gcs_url(video.name)
        video_name_1st_5_secs = (
            f"{video_name}_1st_5_secs.{get_video_format(video_name_with_format)}"
        )
        video_name_1st_5_secs_parent_folder = (
            f"{brand_videos_folder}/{video_name_1st_5_secs}"
        )
        video_1st_5_secs_metadata = bucket.get_blob(video_name_1st_5_secs_parent_folder)
        # Only process the video if it was not previously trimmed
        if not video_1st_5_secs_metadata:
            # Download the video from GCS
            download_and_save_video(
                output_path=local_videos_path,
                video_name_with_format=video_name_with_format,
                video_uri=video.name,
            )
            # Trim the video
            trim_and_push_video_to_gcs(
                local_videos_path=local_videos_path,
                gcs_output_path=brand_videos_folder,
                video_name_with_format=video_name_with_format,
                new_video_name=video_name_1st_5_secs,
                trim_start=0,
                trim_end=5,
            )
        else:
            print(f"Video {video.name} has already been trimmed. Skipping...\n")


def download_and_save_video(
    output_path: str, video_name_with_format: str, video_uri: str
) -> None:
    """Downloads a video from Google Cloud Storage
    and saves it locally
    Args:
        output_path: the path to store the video
        video_name_with_format: the video name with format
        video_uri: the video location
    """
    bucket = get_bucket()
    video_blob = bucket.blob(video_uri)
    video = video_blob.download_as_string(client=None)
    with open(f"{output_path}/{video_name_with_format}", "wb") as f:
        f.write(video)  # writing content to file
        if VERBOSE:
            print(f"Video {video_uri} downloaded and saved!\n")


def trim_and_push_video_to_gcs(
    local_videos_path: str,
    gcs_output_path: str,
    video_name_with_format: str,
    new_video_name: str,
    trim_start: int,
    trim_end: int,
) -> None:
    """Trims a video to generate a 5 secs version
    Args:
        local_videos_path: where the videos are stored locally
        gcs_output_path: the path to store the video in Google Cloud storage
        video_name_with_format: the original video name with format
        new_video_name: the new name for the trimmed video
        trim_start: the start time to trim the video
        trim_end: the end time to trim the video
    """
    bucket = get_bucket()
    # Load the original video
    local_video_path = f"{local_videos_path}/{video_name_with_format}"
    clip = VideoFileClip(local_video_path)
    # Get only first N seconds
    clip = clip.subclip(trim_start, trim_end)
    # Save the clip
    new_video_name_path = f"{local_videos_path}/{new_video_name}"
    clip.write_videofile(new_video_name_path)
    # Upload back to Google Cloud Storage
    blob = bucket.blob(f"{gcs_output_path}/{new_video_name}")
    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions.
    generation_match_precondition = 0
    blob.upload_from_filename(
        new_video_name_path, if_generation_match=generation_match_precondition
    )
    if VERBOSE:
        print(f"File {new_video_name} uploaded to {gcs_output_path}.\n")

#### player(video_url)

In [None]:
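# Define video player
from IPython.display import HTML, display


def player(video_url):
    """Loads a video file and displays an HTML5 player for it"""
    print(f"Displaying Video URL: {video_url}")
    # HTML() only renders automatically as a cell's last expression, so display it explicitly
    display(HTML(f"""
    <video width=600 height=337 controls>
      <source src="{video_url}" type="video/mp4">
    </video>
    """))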

## <font color='#4285f4'>Define ABCDs Criteria</font>

In [None]:
# @title 1, 2) Attract: Quick Pacing & Quick Pacing (First 5 seconds)

# @markdown **Features:**

# @markdown **Quick Pacing:** Within ANY 5 consecutive seconds there are 5 or more shots in the video. These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms, depth of field changes, tracking shots and movement of the camera.

# @markdown **Quick Pacing (First 5 seconds):** There are at least 5 shot changes or visual cuts detected within the first 5 seconds (up to 4.99s) of the video. These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms, depth of field changes, tracking shots and movement of the camera.
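
# Illustrative sketch (not the detector's own implementation): one way to read
# "within ANY 5 consecutive seconds there are 5 or more shots" is a sliding
# window over the sorted shot start times, where each shot start counts as one
# shot. The hypothetical helper below uses two pointers, so after sorting it
# scans the shots once. It assumes shot start times in seconds, e.g. from
# calculate_time_seconds(shot, "start_time_offset").
def has_quick_pacing_window(
    shot_starts: list[float], window_secs: float = 5.0, min_shots: int = 5
) -> bool:
    """Returns True if min_shots shot starts fall within window_secs of each other."""
    starts = sorted(shot_starts)
    left = 0
    for right, start in enumerate(starts):
        # Shrink the window until it spans less than window_secs
        while start - starts[left] >= window_secs:
            left += 1
        if right - left + 1 >= min_shots:
            return True
    return False


# Usage: has_quick_pacing_window([0, 1, 2, 3, 4]) returns True,
# has_quick_pacing_window([0, 2, 4, 6, 8, 10]) returns False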

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_quick_pacing(
    shot_annotation_results: any, video_uri: str
) -> tuple[bool, bool]:
    """Detect Quick Pacing & Quick Pacing (First 5 seconds)
    Args:
        shot_annotation_results: shot annotations
        video_uri: video location in gcs
    Returns:
        quick_pacing, quick_pacing_1st_5_secs: quick pacing evaluation tuple
    """
    required_secs_for_quick_pacing = 5
    required_shots_for_quick_pacing = 5
    # Feature Quick Pacing
    quick_pacing_feature = "Quick Pacing"
    quick_pacing = False
    quick_pacing_criteria = """Within ANY 5 consecutive seconds there are 5 or more shots in the video.
        These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms,
        depth of field changes, tracking shots and movement of the camera."""
    total_shots_count = 0
    total_time_all_shots = 0
    quick_pacing_eval_details = {
        "feature": quick_pacing_feature,
        "feature_description": quick_pacing_criteria,
        "feature_detected": quick_pacing,
        "llm_details": [],
    }
    # Feature Quick Pacing (First 5 secs)
    quick_pacing_1st_5_secs_feature = "Quick Pacing (First 5 seconds)"
    quick_pacing_1st_5_secs = False
    quick_pacing_1st_5_secs_criteria = """There are at least 5 shot changes or visual cuts detected in the video.
        These include hard cuts, soft transitions and camera changes such as camera pans, swipes, zooms, depth of
        field changes, tracking shots and movement of the camera."""
    total_shots_count_1st_5_secs = 0
    quick_pacing_1st_5_secs_eval_details = {
        "feature": quick_pacing_1st_5_secs_feature,
        "feature_description": quick_pacing_1st_5_secs_criteria,
        "feature_detected": quick_pacing_1st_5_secs,
        "llm_details": [],
    }

    # Video API: Evaluate quick_pacing_feature and quick_pacing_1st_5_secs_feature
    if use_annotations:
        if "shot_annotations" in shot_annotation_results:
            sorted_shots = sorted(
                shot_annotation_results.get("shot_annotations"),
                key=lambda x: calculate_time_seconds(x, "start_time_offset"),
                reverse=False,
            )
            # Video API: Evaluate quick_pacing_feature & quick_pacing_1st_5_secs_feature
            for shot in sorted_shots:
                start_time_secs = calculate_time_seconds(shot, "start_time_offset")
                end_time_secs = calculate_time_seconds(shot, "end_time_offset")
                shot_total_time = end_time_secs - start_time_secs
                # Quick Pacing calculation
                total_time_all_shots += shot_total_time
                if total_time_all_shots < required_secs_for_quick_pacing:
                    total_shots_count += 1
                    # Quick Pacing (First 5 secs) calculation
                    if start_time_secs < early_time_seconds:
                        total_shots_count_1st_5_secs += 1
                else:
                    # To start counting shot time and # shots again
                    if total_shots_count >= required_shots_for_quick_pacing:
                        quick_pacing = True
                    # Quick Pacing (First 5 secs) calculation
                    if total_shots_count_1st_5_secs >= required_shots_for_quick_pacing:
                        quick_pacing_1st_5_secs = True
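                    total_time_all_shots = 0
                    total_shots_count = 0
            # The loop only evaluates a window when it closes, so also check the last open window
            if total_shots_count >= required_shots_for_quick_pacing:
                quick_pacing = True
            if total_shots_count_1st_5_secs >= required_shots_for_quick_pacing:
                quick_pacing_1st_5_secs = True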
        else:
            print(
                f"No Shot annotations found. Skipping {quick_pacing_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate quick_pacing_feature and quick_pacing_1st_5_secs_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # 1. Evaluate quick_pacing_feature
        prompt = (
            """Are there 5 or more shots within ANY 5 consecutive seconds in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the shot changes count in the following format:
            Number of shots: #
            Provide the exact timestamp when the shot changes happen and the shot description.
            Return False if the number of shots identified is less than 5.
            {context_and_examples}
        """.replace(
                "{feature}", quick_pacing_feature
            )
            .replace("{criteria}", quick_pacing_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            quick_pacing_feature, prompt, llm_params
        )
        if feature_detected:
            quick_pacing = True

        # Include llm details
        quick_pacing_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__.copy(),  # snapshot: llm_params is reused below
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate quick_pacing_1st_5_secs_feature
        # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
        prompt = (
            """Are there at least 5 shot changes or visual cuts detected in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the shot changes count in the following format:
            Number of shots: #
            Provide the exact timestamp when the shot changes happen and the shot description.
            Return False if the number of shots identified is less than 5.
            {context_and_examples}
        """.replace(
                "{feature}", quick_pacing_1st_5_secs_feature
            )
            .replace("{criteria}", quick_pacing_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            quick_pacing_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            quick_pacing_1st_5_secs = True

        # Include llm details
        quick_pacing_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{quick_pacing_feature}: {quick_pacing}")
    quick_pacing_eval_details["feature_detected"] = quick_pacing
    print(f"{quick_pacing_1st_5_secs_feature}: {quick_pacing_1st_5_secs}")
    quick_pacing_1st_5_secs_eval_details["feature_detected"] = quick_pacing_1st_5_secs

    return quick_pacing_eval_details, quick_pacing_1st_5_secs_eval_details

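The annotation path above accumulates shot durations and resets its counters once 5 seconds have elapsed, so it inspects consecutive, non-overlapping blocks of shots rather than every possible 5-second window. One way to read "ANY 5 consecutive seconds" literally is a sliding window over shot start times. The sketch below is a swapped-in technique, not this notebook's or the abcds-detector project's implementation; `has_quick_pacing` and `has_quick_pacing_first_secs` are hypothetical helpers that take plain start times in seconds (for example, values like those `calculate_time_seconds(shot, "start_time_offset")` returns).

```python
from bisect import bisect_left


def has_quick_pacing(
    shot_start_times: list, window_secs: float = 5.0, min_shots: int = 5
) -> bool:
    """True if some window of `window_secs` seconds contains `min_shots` or more shot starts."""
    starts = sorted(shot_start_times)
    for i, window_start in enumerate(starts):
        # Index of the first shot starting at or after the end of this window
        window_end = bisect_left(starts, window_start + window_secs)
        if window_end - i >= min_shots:
            return True
    return False


def has_quick_pacing_first_secs(
    shot_start_times: list, first_secs: float = 5.0, min_shots: int = 5
) -> bool:
    """True if `min_shots` or more shots start before `first_secs` (i.e. up to 4.99s)."""
    return sum(1 for t in shot_start_times if t < first_secs) >= min_shots


starts = [0.0, 6.0, 6.8, 7.5, 8.1, 9.9]
print(has_quick_pacing(starts))             # True: 5 shots start in [6.0, 11.0)
print(has_quick_pacing_first_secs(starts))  # False: only 1 shot starts before 5s
```

Checking only windows that begin at a shot start is enough: any window holding k shot starts can be slid forward to begin at its first start without losing any of them.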
In [None]:
# @title 3) Attract: Dynamic Start

# @markdown **Features:**

# @markdown **Dynamic Start:** The first shot in the video changes in less than 3 seconds.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_dynamic_start(shot_annotation_results: any, video_uri: str) -> dict:
    """Detects Dynamic Start
    Args:
        shot_annotation_results: shot annotations
        video_uri: video location in gcs
    Returns:
        dynamic_start_eval_details: dynamic start evaluation
    """
    # Feature Dynamic Start
    dynamic_start_feature = "Dynamic Start"
    dynamic_start = False
    dynamic_start_criteria = (
        """The first shot in the video changes in less than 3 seconds."""
    )
    dynamic_start_eval_details = {
        "feature": dynamic_start_feature,
        "feature_description": dynamic_start_criteria,
        "feature_detected": dynamic_start,
        "llm_details": [],
    }

    # Video API: Evaluate dynamic_start_feature
    if use_annotations:
        if "shot_annotations" in shot_annotation_results:
            first_shot = shot_annotation_results.get("shot_annotations")[0]
            end_time_offset = first_shot.get("end_time_offset") or {}
            # Zero-valued fields may be omitted from the offset, so default them to 0
            nanos = end_time_offset.get("nanos") or 0
            seconds = end_time_offset.get("seconds") or 0
            total_ms_first_shot = seconds * 1e3 + nanos / 1e6

            if total_ms_first_shot < dynamic_cutoff_ms:
                dynamic_start = True
        else:
            print(
                f"No Shot annotations found. Skipping {dynamic_start_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate dynamic_start_feature
    if use_llms:
        # 1. Evaluate dynamic_start_feature
        prompt = (
            """Does the first shot in the video change in less than 3 seconds?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when the first shot in the video changes.
            Return True if and only if the first shot in the video changes in less than 3 seconds.
            {context_and_examples}
        """.replace(
                "{feature}", dynamic_start_feature
            )
            .replace("{criteria}", dynamic_start_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            dynamic_start_feature, prompt, llm_params
        )
        if feature_detected:
            dynamic_start = True

        # Include llm details
        dynamic_start_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{dynamic_start_feature}: {dynamic_start}")
    dynamic_start_eval_details["feature_detected"] = dynamic_start

    return dynamic_start_eval_details

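The shot offsets are read above as dictionaries with optional `seconds` and `nanos` fields, where a missing field means zero. The self-contained sketch below shows that conversion and the Dynamic Start rule, assuming shots arrive in that dictionary form and a 3,000 ms cutoff matching the "less than 3 seconds" criterion (the notebook itself reads `dynamic_cutoff_ms` from its configuration). `offset_to_ms` and `is_dynamic_start` are hypothetical names.

```python
from typing import Optional


def offset_to_ms(offset: Optional[dict]) -> float:
    """Convert a {'seconds': ..., 'nanos': ...} offset to milliseconds; missing fields count as 0."""
    offset = offset or {}
    # int() also accepts int64 values that were serialized as strings
    seconds = int(offset.get("seconds") or 0)
    nanos = int(offset.get("nanos") or 0)
    return seconds * 1_000 + nanos / 1_000_000


def is_dynamic_start(shots: list, cutoff_ms: float = 3_000) -> bool:
    """True if the earliest-starting shot ends before `cutoff_ms`."""
    if not shots:
        return False
    first_shot = min(shots, key=lambda s: offset_to_ms(s.get("start_time_offset")))
    return offset_to_ms(first_shot.get("end_time_offset")) < cutoff_ms


shots = [
    {"start_time_offset": {}, "end_time_offset": {"seconds": 2, "nanos": 400_000_000}},
    {"start_time_offset": {"seconds": 2, "nanos": 400_000_000}, "end_time_offset": {"seconds": 6}},
]
print(offset_to_ms(shots[0]["end_time_offset"]))  # 2400.0
print(is_dynamic_start(shots))  # True
```

Selecting the first shot by its start offset, rather than by list position, avoids depending on the order in which the annotations were returned.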
In [None]:
# @title 4 & 5) Attract: Supers & Supers with Audio

# @markdown **Features:**

# @markdown 1. **Supers:** Any supers (text overlays) have been incorporated at any time in the video.

# @markdown 2. **Supers with Audio:** The speech heard in the audio of the video matches OR is contextually supportive of the overlaid text shown on screen.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_supers(text_annotation_results: any, video_uri: str) -> dict:
    """Detect Supers
    Args:
        text_annotation_results: text annotations
        video_uri: video location in gcs
    Returns:
        supers_eval_details: supers evaluation
    """
    # Feature Supers
    supers = False
    supers_feature = "Supers"
    supers_criteria = """Any supers (text overlays) have been incorporated at any time in the video."""
    supers_eval_details = {
        "feature": supers_feature,
        "feature_description": supers_criteria,
        "feature_detected": supers,
        "llm_details": None,
    }

    # Video API: Evaluate supers_feature
    if use_annotations:
        if "text_annotations" in text_annotation_results:
            if len(text_annotation_results.get("text_annotations")) > 0:
                supers = True
        else:
            print(
                f"No Text annotations found. Skipping {supers_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate supers_feature
    if use_llms:
        # 1. Evaluate supers_feature
        prompt = (
            """Are there any supers (text overlays) at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp where supers are found as well as the list of supers.
            {context_and_examples}
        """.replace(
                "{feature}", supers_feature
            )
            .replace("{criteria}", supers_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            supers_feature, prompt, llm_params
        )
        if feature_detected:
            supers = True

        # Include llm details
        supers_eval_details["llm_details"] = {
            "llm_params": llm_params.__dict__,
            "prompt": prompt,
            "llm_explanation": llm_explanation,
        }

    print(f"{supers_feature}: {supers}")
    supers_eval_details["feature_detected"] = supers

    return supers_eval_details

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_supers_with_audio(
    text_annotation_results: any,
    speech_annotation_results: any,
    video_uri: str,
) -> dict:
    """Detect Supers with Audio
    Args:
        text_annotation_results: text annotations
        speech_annotation_results: speech annotations
        video_uri: video location in gcs
    Returns:
        supers_with_audio_eval_details: supers with audio evaluation
    """
    # Feature Supers with Audio
    supers_with_audio_feature = "Supers with Audio"
    supers_with_audio = False
    supers_with_audio_criteria = """The speech heard in the audio of the video matches OR is contextually
        supportive of the overlaid text shown on screen."""
    supers_with_audio_eval_details = {
        "feature": supers_with_audio_feature,
        "feature_description": supers_with_audio_criteria,
        "feature_detected": supers_with_audio,
        "llm_details": [],
    }
    detected_text_list = []

    # Video API: Evaluate supers_with_audio_feature
    if use_annotations:
        if (
            "text_annotations" in text_annotation_results
            and "speech_transcriptions" in speech_annotation_results
        ):
            # Build list of found supers
            for text_annotation in text_annotation_results.get("text_annotations"):
                detected_text_list.append(text_annotation.get("text"))

            # Video API: Evaluate supers_with_audio
            supers_with_audio, _ = find_elements_in_transcript(
                speech_transcriptions=speech_annotation_results.get(
                    "speech_transcriptions"
                ),
                elements=detected_text_list,
                elements_categories=[],
                apply_condition=True,  # flag to filter out text with less than x chars. This is
                # only needed when elements come from text annotations since words are sometimes
                # 1 character only.
            )
        else:
            print(
                f"No Text or Speech annotations found. Skipping {supers_with_audio_feature} evaluation."
            )

    # LLM: Evaluate supers_with_audio_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )

        # LLM Only
        # 1. Evaluate supers_with_audio_feature
        prompt = (
            """Does the speech match any supers (text overlays) in the video or is the speech
            contextually supportive of the overlaid text shown on the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp where supers are found and the timestamp when the speech matches
            the supers or is contextually supportive of the overlaid text shown on the video.
            {context_and_examples}
        """.replace(
                "{feature}", supers_with_audio_feature
            )
            .replace("{criteria}", supers_with_audio_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            supers_with_audio_feature, prompt, llm_params
        )
        if feature_detected:
            supers_with_audio = True

        # Include llm details
        supers_with_audio_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # Combination of Annotations + LLM
        if use_annotations:
            if "speech_transcriptions" in speech_annotation_results:
                # 1. Evaluate supers_with_audio_feature
                transcript = get_speech_transcript(
                    speech_annotation_results.get("speech_transcriptions")
                )
                prompt = (
                    """Does the provided speech transcript match any supers (text overlays) in the video or is the speech transcript
                        contextually supportive of the overlaid text shown on the video?
                        This is the speech transcript: "{transcript}"
                        Consider the following criteria for your answer: {criteria}
                        {context_and_examples}
                    """.replace(
                        "{feature}", supers_with_audio_feature
                    )
                    .replace("{transcript}", transcript)
                    .replace("{criteria}", supers_with_audio_criteria)
                    .replace("{context_and_examples}", context_and_examples)
                )
                # Use full video for this feature
                llm_params.set_modality({"type": "video", "video_uri": video_uri})
                # If transcript is empty, this feature should be False
                if transcript:
                    feature_detected, llm_explanation = detect_feature_with_llm(
                        supers_with_audio_feature, prompt, llm_params
                    )
                    if feature_detected:
                        supers_with_audio = True

                    # Include llm details
                    supers_with_audio_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": llm_explanation,
                        }
                    )
                else:
                    supers_with_audio = False
                    # Include default details
                    supers_with_audio_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": "Annotations + LLM: Speech was not found in annotations.",
                        }
                    )
            else:
                print(
                    f"No Speech annotations found. Skipping {supers_with_audio_feature} evaluation with Annotations + LLM."
                )

    print(f"{supers_with_audio_feature}: {supers_with_audio}")
    supers_with_audio_eval_details["feature_detected"] = supers_with_audio

    return supers_with_audio_eval_details

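When both annotation types are available, `detect_supers_with_audio` delegates the comparison to `find_elements_in_transcript` (defined earlier in the notebook), with `apply_condition=True` to skip very short OCR fragments. The sketch below illustrates that idea in isolation: normalize each detected super and the transcript, drop supers shorter than a minimum length, and look for whole-phrase matches. It is not a reimplementation of `find_elements_in_transcript`; `normalize`, `supers_in_transcript`, and the `min_chars=3` default are hypothetical.

```python
import re


def normalize(text: str) -> str:
    """Lowercase and keep only runs of ASCII letters/digits, joined by single spaces."""
    return " ".join(re.findall(r"[a-z0-9]+", text.lower()))


def supers_in_transcript(supers: list, transcript: str, min_chars: int = 3) -> list:
    """Return the supers whose normalized text appears as whole words in the transcript."""
    # Pad with spaces so "art" does not match inside "start"
    padded_transcript = f" {normalize(transcript)} "
    matches = []
    for text in supers:
        phrase = normalize(text)
        if len(phrase) < min_chars:  # skip 1-2 character OCR fragments
            continue
        if f" {phrase} " in padded_transcript:
            matches.append(text)
    return matches


print(supers_in_transcript(["Shop Now!", "A", "Start today"], "Shop now and start today."))
# ['Shop Now!', 'Start today']
```

Exact phrase matching only covers the "matches" half of the criterion; "contextually supportive" speech is what the LLM prompts above are there to judge.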
In [None]:
# @title 6 & 7) Brand: Brand Visuals & Brand Visuals (First 5 seconds)

# @markdown **Features:**

# @markdown 1. **Brand Visuals:** Branding, defined as the brand name or brand logo, is shown in-situation or overlaid at any time in the video.

# @markdown 2. **Brand Visuals (First 5 seconds):** Branding, defined as the brand name or brand logo, is shown in-situation or overlaid in the first 5 seconds (up to 4.99s) of the video.
# @markdown This function also evaluates Logo Big & Logo Early: is the logo larger than x% (3.5% by default) of the screen in the first 5 seconds?

def calculate_surface_area(points) -> float:
    """Calculate the area of a quadrilateral as a percentage of the frame.

    Applies the shoelace formula to four normalized (x, y) vertices listed in order.
    """
    if len(points) != 4:
        return 0
    # Sum the signed cross products of consecutive vertices and take abs() once at the end;
    # taking abs() per term only gives the right area when the origin lies inside the polygon.
    twice_area = 0.0
    for i in range(4):
        x1, y1 = points[i]
        x2, y2 = points[(i + 1) % 4]
        twice_area += x1 * y2 - x2 * y1
    surface_area = abs(twice_area) / 2
    # Vertices are normalized to [0, 1], so multiply by 100 for a percentage of the frame
    return surface_area * 100

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_brand_visuals(
    text_annotation_results: any,
    logo_annotation_results: any,
    video_uri: str,
    brand_name: str,
    brand_variations: list[str],
) -> tuple[dict, dict, bool]:
    """Detect Brand Visuals & Brand Visuals (First 5 seconds)
    Args:
        text_annotation_results: text annotations
        logo_annotation_results: logo annotations
        video_uri: video location in gcs
        brand_name: name of the brand
        brand_variations: a list of brand name variations
    Returns:
        brand_visuals_eval_details,
        brand_visuals_1st_5_secs_eval_details,
        brand_visuals_logo_big_1st_5_secs: brand visuals evaluation
    """
    # Feature Brand Visuals
    brand_visuals_feature = "Brand Visuals"
    brand_visuals = False
    brand_visuals_criteria = """Branding, defined as the brand name or brand logo, is shown
        in-situation or overlaid at any time in the video."""
    brand_visuals_eval_details = {
        "feature": brand_visuals_feature,
        "feature_description": brand_visuals_criteria,
        "feature_detected": brand_visuals,
        "llm_details": [],
    }
    # Feature Brand Visuals (First 5 seconds)
    brand_visuals_1st_5_secs_feature = "Brand Visuals (First 5 seconds)"
    brand_visuals_1st_5_secs = False
    # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    brand_visuals_1st_5_secs_criteria = """Branding, defined as the brand name or brand logo, is shown in-situation
    or overlaid in the video."""
    brand_visuals_1st_5_secs_eval_details = {
        "feature": brand_visuals_1st_5_secs_feature,
        "feature_description": brand_visuals_1st_5_secs_criteria,
        "feature_detected": brand_visuals_1st_5_secs,
        "llm_details": [],
    }
    # Feature Logo Big (First 5 seconds)
    brand_visuals_logo_big_1st_5_secs = False

    # Video API: Evaluate brand_visuals_feature and brand_visuals_1st_5_secs_feature
    if use_annotations:
        # Evaluate brand_visuals_feature & brand_visuals_1st_5_secs_feature
        # in text annotations
        if "text_annotations" in text_annotation_results:
            for text_annotation in text_annotation_results.get("text_annotations"):
                text = text_annotation.get("text")
                found_brand = [
                    brand for brand in brand_variations if brand.lower() in text.lower()
                ]
                if found_brand:
                    brand_visuals = True
                    found_brand_1st_5_secs, frame = detected_text_in_first_5_seconds(
                        text_annotation
                    )
                    if found_brand_1st_5_secs:
                        brand_visuals_1st_5_secs = True
                    # Check surface area
                    if found_brand_1st_5_secs and frame:
                        coordinates = []
                        for vertex in frame.get("rotated_bounding_box").get("vertices"):
                            coordinates.append(
                                (float(vertex.get("x") or 0), float(vertex.get("y") or 0))
                            )
                        surface_area = calculate_surface_area(coordinates)
                        if surface_area > logo_size_threshold:
                            brand_visuals_logo_big_1st_5_secs = True
        else:
            print(
                f"No Text annotations found. Skipping {brand_visuals_feature} evaluation with Video Intelligence API."
            )

        # Evaluate brand_visuals_feature & brand_visuals_1st_5_secs in logo annotations
        brand_kg_entities = get_knowledge_graph_entities(brand_variations)
        brand_kg_entities_list = []
        for key, value in brand_kg_entities.items():
            entity_id = value["@id"][3:] if "@id" in value else ""
            entity_name = value["name"] if "name" in value else ""
            entity_description = value["description"] if "description" in value else ""
            brand_kg_entities_list.append(
                {
                    "entity_id": entity_id,
                    "entity_name": entity_name,
                    "entity_description": entity_description,
                }
            )

        if "logo_recognition_annotations" in logo_annotation_results:
            for logo_recognition_annotation in logo_annotation_results.get(
                "logo_recognition_annotations"
            ):
                entity_id = logo_recognition_annotation.get("entity").get("entity_id")
                entity_description = logo_recognition_annotation.get("entity").get(
                    "description"
                )
                found_entities = [
                    ent
                    for ent in brand_kg_entities_list
                    if ent["entity_id"] == entity_id
                    or ent["entity_description"].lower() == entity_description.lower()
                ]
                if len(found_entities) > 0:
                    # All logo tracks where the recognized logo appears. Each track corresponds
                    # to one logo instance appearing in consecutive frames.
                    for track in logo_recognition_annotation.get("tracks"):
                        # Check confidence against user defined threshold
                        if track.get("confidence") >= confidence_threshold:
                            brand_visuals = True
                            # Video segment of a track.
                            start_time_secs = calculate_time_seconds(
                                track.get("segment"), "start_time_offset"
                            )
                            if start_time_secs <= early_time_seconds:
                                brand_visuals_1st_5_secs = True
                                # The object with timestamp and attributes per frame in the track.
                                for timestamped_object in track.get(
                                    "timestamped_objects"
                                ):
                                    # Normalized Bounding box in a frame, where the object is located.
                                    normalized_bounding_box = timestamped_object.get(
                                        "normalized_bounding_box"
                                    )
                                    bottom_top = (
                                        normalized_bounding_box.get("bottom") or 0
                                    ) - (normalized_bounding_box.get("top") or 0)
                                    right_left = (
                                        normalized_bounding_box.get("right") or 0
                                    ) - (normalized_bounding_box.get("left") or 0)
                                    surface = bottom_top * right_left * 100
                                    if surface > logo_size_threshold:
                                        brand_visuals_logo_big_1st_5_secs = True

                    # All video segments where the recognized logo appears. There might be
                    # multiple instances of the same logo class appearing in one VideoSegment.
                    # Segments carry no confidence score, so they only feed the first-5-seconds check.
                    for segment in logo_recognition_annotation.get("segments"):
                        start_time_secs = calculate_time_seconds(
                            segment, "start_time_offset"
                        )
                        if start_time_secs <= early_time_seconds:
                            brand_visuals_1st_5_secs = True
        else:
            print(
                f"No Logo annotations found. Skipping {brand_visuals_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate brand_visuals_feature and brand_visuals_1st_5_secs_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # 1. Evaluate brand_visuals_feature
        prompt = (
            """Is the brand {brand_name} or brand logo {brand_name} visible at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when the brand {brand_name} or brand logo {brand_name} is found.
            {context_and_examples}
        """.replace(
                "{brand_name}", brand_name
            )
            .replace("{feature}", brand_visuals_feature)
            .replace("{criteria}", brand_visuals_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            brand_visuals_feature, prompt, llm_params
        )
        if feature_detected:
            brand_visuals = True

        # Include llm details
        brand_visuals_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__.copy(),  # snapshot: llm_params is reused below
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate brand_visuals_1st_5_secs_feature
        prompt = (
            """Is the brand {brand_name} or brand logo {brand_name} visible in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when the brand {brand_name} or brand logo {brand_name} is found.
            {context_and_examples}
        """.replace(
                "{brand_name}", brand_name
            )
            .replace("{feature}", brand_visuals_1st_5_secs_feature)
            .replace("{criteria}", brand_visuals_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            brand_visuals_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            brand_visuals_1st_5_secs = True

        # Include llm details
        brand_visuals_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{brand_visuals_feature}: {brand_visuals}")
    brand_visuals_eval_details["feature_detected"] = brand_visuals
    print(
        f"""{brand_visuals_1st_5_secs_feature}: {brand_visuals_1st_5_secs}
        Logo Big: {brand_visuals_logo_big_1st_5_secs}"""
    )
    brand_visuals_1st_5_secs_eval_details["feature_detected"] = brand_visuals_1st_5_secs

    return (
        brand_visuals_eval_details,
        brand_visuals_1st_5_secs_eval_details,
        brand_visuals_logo_big_1st_5_secs,
    )
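
Every prompt in these cells is built by chaining `str.replace` calls on `{placeholder}` tokens. None of the templates above contain a `{feature}` token, so the `.replace("{feature}", ...)` call leaves the prompt unchanged. A plausible reason for avoiding `str.format` is that `context_and_examples` may contain literal braces. The sketch below does the same substitution in a single pass. `fill_prompt` is a hypothetical helper, not part of the notebook or the abcds-detector project. Unlike the chained calls, it does not rescan text that has already been substituted.

```python
import re


def fill_prompt(template: str, **values: str) -> str:
    """Replace each {name} token with values[name]; unknown tokens are kept as-is.

    Literal braces that do not wrap a bare identifier (e.g. JSON examples)
    are left alone, and substituted values are not scanned again.
    """

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        return values.get(name, match.group(0))

    return re.sub(r"\{(\w+)\}", substitute, template)


template = (
    "Is the brand {brand_name} or brand logo {brand_name} visible in the video?\n"
    "Consider the following criteria for your answer: {criteria}"
)
prompt = fill_prompt(
    template,
    brand_name="Acme",
    criteria="The brand name or logo is visible.",
    feature="Brand Visuals",  # no {feature} token in the template, so this is unused
)
print(prompt)
```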


In [None]:
# @title 8 & 9) Brand: Brand Mention (Speech) & Brand Mention (Speech) (First 5 seconds)

# @markdown **Features:**

# @markdown **Brand Mention (Speech):** The brand name is heard in the audio or speech at any time in the video.

# @markdown **Brand Mention (Speech) (First 5 seconds):** The brand name is heard in the audio or speech in the first 5 seconds (up to 4.99s) of the video.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_brand_mention_speech(
    speech_annotation_results: any,
    video_uri: str,
    brand_name: str,
    brand_variations: list[str],
) -> tuple[dict, dict]:
    """Detect Brand Mention (Speech) & Brand Mention (Speech) (First 5 seconds)
    Args:
        speech_annotation_results: speech annotations
        video_uri: video location in gcs
        brand_name: name of the brand
        brand_variations: a list of brand name variations
    Returns:
        brand_mention_speech_eval_details,
        brand_mention_speech_1st_5_secs_eval_details: brand mention speech evaluation
    """
    # Feature Brand Mention (Speech)
    brand_mention_speech_feature = "Brand Mention (Speech)"
    brand_mention_speech = False
    brand_mention_speech_criteria = (
        """The brand name is heard in the audio or speech at any time in the video."""
    )
    brand_mention_speech_eval_details = {
        "feature": brand_mention_speech_feature,
        "feature_description": brand_mention_speech_criteria,
        "feature_detected": brand_mention_speech,
        "llm_details": [],
    }
    # Feature Brand Mention (Speech) (First 5 seconds)
    brand_mention_speech_1st_5_secs_feature = "Brand Mention (Speech) (First 5 seconds)"
    brand_mention_speech_1st_5_secs = False
    # remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    brand_mention_speech_1st_5_secs_criteria = (
        """The brand name is heard in the audio or speech in the video."""
    )
    brand_mention_speech_1st_5_secs_eval_details = {
        "feature": brand_mention_speech_1st_5_secs_feature,
        "feature_description": brand_mention_speech_1st_5_secs_criteria,
        "feature_detected": brand_mention_speech_1st_5_secs,
        "llm_details": [],
    }

    # Video API: Evaluate brand_mention_speech and brand_mention_speech_1st_5_secs
    if use_annotations:
        if "speech_transcriptions" in speech_annotation_results:
            # Video API: Evaluate brand_mention & brand_mention_speech_1st_5_secs
            (
                brand_mention_speech,
                brand_mention_speech_1st_5_secs,
            ) = find_elements_in_transcript(
                speech_transcriptions=speech_annotation_results.get(
                    "speech_transcriptions"
                ),
                elements=brand_variations,
                elements_categories=[],
                apply_condition=False,
            )
        else:
            print(
                f"No Speech annotations found. Skipping {brand_mention_speech_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate brand_mention_speech and brand_mention_speech_1st_5_secs
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )

        # LLM Only
        # 1. Evaluate brand_mention_speech_feature
        prompt = (
            """Does the speech mention the brand {brand_name} at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the brand {brand_name} is heard in the speech of the video.
            {context_and_examples}
        """.replace(
                "{brand_name}", brand_name
            )
            .replace("{feature}", brand_mention_speech_feature)
            .replace("{criteria}", brand_mention_speech_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            brand_mention_speech_feature, prompt, llm_params
        )
        if feature_detected:
            brand_mention_speech = True

        # Include llm details
        brand_mention_speech_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate brand_mention_speech_1st_5_secs_feature
        prompt = (
            """Does the speech mention the brand {brand_name} in the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the brand {brand_name} is heard in the speech of the video.
            Return True if and only if the brand {brand_name} is heard in the speech of the video.
            {context_and_examples}
        """.replace(
                "{brand_name}", brand_name
            )
            .replace("{feature}", brand_mention_speech_1st_5_secs_feature)
            .replace("{criteria}", brand_mention_speech_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            brand_mention_speech_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            brand_mention_speech_1st_5_secs = True

        # Include llm details
        brand_mention_speech_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # Combination of Annotations + LLM
        if use_annotations:
            if "speech_transcriptions" in speech_annotation_results:
                # 1. Evaluate brand_mention_speech_feature
                transcript = get_speech_transcript(
                    speech_annotation_results.get("speech_transcriptions")
                )
                prompt = (
                    """Does the provided speech transcript mention the brand {brand_name}?
                        This is the speech transcript: "{transcript}"
                        Consider the following criteria for your answer: {criteria}
                        {context_and_examples}
                    """.replace(
                        "{brand_name}", brand_name
                    )
                    .replace("{transcript}", transcript)
                    .replace("{feature}", brand_mention_speech_feature)
                    .replace("{criteria}", brand_mention_speech_criteria)
                    .replace("{context_and_examples}", context_and_examples)
                )
                # Set modality to text since we are not using video for Annotations + LLM
                llm_params.set_modality({"type": "text"})
                # If transcript is empty, this feature should be False
                if transcript:
                    feature_detected, llm_explanation = detect_feature_with_llm(
                        brand_mention_speech_feature, prompt, llm_params
                    )
                    if feature_detected:
                        brand_mention_speech = True

                    # Include llm details
                    brand_mention_speech_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": llm_explanation,
                        }
                    )
                else:
                    brand_mention_speech = False
                    # Include default details
                    brand_mention_speech_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": "Annotations + LLM: Speech was not found in annotations.",
                        }
                    )

                # 2. Evaluate brand_mention_speech_1st_5_secs_feature
                transcript_1st_5_secs = get_speech_transcript_1st_5_secs(
                    speech_annotation_results.get("speech_transcriptions")
                )
                prompt = (
                    """Does the provided speech transcript mention the brand {brand_name}?
                        This is the speech transcript: "{transcript}"
                        Consider the following criteria for your answer: {criteria}
                        {context_and_examples}
                    """.replace(
                        "{brand_name}", brand_name
                    )
                    .replace("{transcript}", transcript_1st_5_secs)
                    .replace("{feature}", brand_mention_speech_1st_5_secs_feature)
                    .replace("{criteria}", brand_mention_speech_1st_5_secs_criteria)
                    .replace("{context_and_examples}", context_and_examples)
                )
                # Set modality to text since we are not using video for Annotations + LLM
                llm_params.set_modality({"type": "text"})
                # If transcript is empty, this feature should be False
                if transcript_1st_5_secs:
                    feature_detected, llm_explanation = detect_feature_with_llm(
                        brand_mention_speech_1st_5_secs_feature, prompt, llm_params
                    )
                    if feature_detected:
                        brand_mention_speech_1st_5_secs = True

                    # Include llm details
                    brand_mention_speech_1st_5_secs_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": llm_explanation,
                        }
                    )
                else:
                    brand_mention_speech_1st_5_secs = False
                    # Include default details
                    brand_mention_speech_1st_5_secs_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": "Annotations + LLM: Speech was not found in annotations.",
                        }
                    )
            else:
                print(
                    f"No Speech annotations found. Skipping {brand_mention_speech_feature} evaluation with LLM."
                )

    print(f"{brand_mention_speech_feature}: {brand_mention_speech}")
    brand_mention_speech_eval_details["feature_detected"] = brand_mention_speech
    print(
        f"{brand_mention_speech_1st_5_secs_feature}: {brand_mention_speech_1st_5_secs}"
    )
    brand_mention_speech_1st_5_secs_eval_details["feature_detected"] = (
        brand_mention_speech_1st_5_secs
    )

    return (
        brand_mention_speech_eval_details,
        brand_mention_speech_1st_5_secs_eval_details,
    )
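
The two speech features rely on helpers whose bodies are not shown in this section: `get_speech_transcript_1st_5_secs` and `find_elements_in_transcript`. The sketch below illustrates one plausible approach, not the notebook's implementation. It keeps only the words that start before the 5-second cutoff, which matches the "up to 4.99s" wording above, and then checks brand variations case-insensitively. The simplified word shape (`{"word": str, "start_time": float}`, in seconds) and both function names are assumptions.

```python
EARLY_CUTOFF_SECS = 5.0  # "first 5 seconds" = words starting below 5.0s (up to 4.99s)


def transcript_before(words: list[dict], cutoff_secs: float = EARLY_CUTOFF_SECS) -> str:
    """Join the words whose start time (in seconds) is strictly below the cutoff."""
    return " ".join(w["word"] for w in words if w["start_time"] < cutoff_secs)


def mentions_any(transcript: str, variations: list[str]) -> bool:
    """Case-insensitive substring check for any brand name variation."""
    text = transcript.lower()
    return any(v.lower() in text for v in variations)


# Hypothetical word-level transcription, with start times already in seconds.
words = [
    {"word": "Try", "start_time": 0.4},
    {"word": "our", "start_time": 0.7},
    {"word": "new", "start_time": 1.0},
    {"word": "coffee", "start_time": 1.5},
    {"word": "from", "start_time": 5.0},
    {"word": "Acme", "start_time": 5.3},
]
variations = ["Acme", "Acme Co"]

full = " ".join(w["word"] for w in words)
early = transcript_before(words)
print(early)  # Try our new coffee
print(mentions_any(full, variations), mentions_any(early, variations))  # True False
```

In this example the brand is spoken only after the 5-second mark, so the full-video feature passes and the first-5-seconds feature does not.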


In [None]:
# @title 10 & 11) Brand: Product Visuals & Product Visuals (First 5 seconds)

# @markdown **Features:**

# @markdown 1. **Product Visuals:** A product or branded packaging is visually present at any time in the video. Where the product is a service, a relevant substitute should be shown, such as a branded app or branded service personnel.

# @markdown 2. **Product Visuals (First 5 seconds):** A product or branded packaging is visually present in the first 5 seconds (up to 4.99s) of the video. Where the product is a service, a relevant substitute should be shown, such as a branded app or branded service personnel.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect(
    entity: dict,
    segment: dict,
    branded_products_kg_entities: dict,
    branded_products: list[str],
    branded_products_categories: list[str],
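) -> tuple[bool, bool]:
    """Detect Product Visuals & Product Visuals (First 5 seconds)
    Args:
        entity: entity found in annotations
        segment: segment (or frame) of the video where the label was found
        branded_products_kg_entities: Knowledge Graph entities for the branded products, looked up by entity ID
        branded_products: list of products
        branded_products_categories: list of products categories
    Returns:
        product_visuals: whether the label matches a branded product or category above the confidence threshold
        product_visuals_1st_5_secs: whether that match also starts within the early-time window
    """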
    product_visuals = False
    product_visuals_1st_5_secs = False
    entity_id = entity.get("entity_id")
    entity_description = entity.get("description")
    # Check if any of the provided products or categories
    # match the label segment description
    found_branded_products = [
        bp for bp in branded_products if bp.lower() == entity_description.lower()
    ]
    found_branded_product_categories = [
        bp
        for bp in branded_products_categories
        if bp.lower() == entity_description.lower()
    ]
    if (
        entity_id in branded_products_kg_entities
        or len(found_branded_products) > 0
        or len(found_branded_product_categories) > 0
    ):
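        # Check confidence against user defined threshold (missing confidence counts as 0)
        if (segment.get("confidence") or 0) >= confidence_threshold:
            product_visuals = True
            # Segment and shot labels carry a "segment" with "start_time_offset";
            # frame labels carry a "time_offset" directly on the frame.
            if "segment" in segment:
                start_time_secs = calculate_time_seconds(
                    segment.get("segment"), "start_time_offset"
                )
            else:
                start_time_secs = calculate_time_seconds(segment, "time_offset")
            if start_time_secs <= early_time_seconds:
                product_visuals_1st_5_secs = True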

    return product_visuals, product_visuals_1st_5_secs

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_product_visuals(
    label_annotation_results: any,
    video_uri: str,
    branded_products: list[str],
    branded_products_categories: list[str],
) -> tuple[dict, dict]:
    """Detect Product Visuals & Product Visuals (First 5 seconds)
    Args:
        label_annotation_results: label annotations
        video_uri: video location in gcs
        branded_products: list of products
        branded_products_categories: list of products categories
    Returns:
        product_visuals_eval_details,
        product_visuals_1st_5_secs_eval_details: product visuals evaluation
    """
    # Feature Product Visuals
    product_visuals_feature = "Product Visuals"
    product_visuals = False
    product_visuals_criteria = """A product or branded packaging is visually present at any time in the video.
        Where the product is a service, a relevant substitute should be shown, such as a branded app or branded
        service personnel."""
    product_visuals_eval_details = {
        "feature": product_visuals_feature,
        "feature_description": product_visuals_criteria,
        "feature_detected": product_visuals,
        "llm_details": [],
    }
    # Feature Product Visuals (First 5 seconds)
    product_visuals_1st_5_secs_feature = "Product Visuals (First 5 seconds)"
    product_visuals_1st_5_secs = False
    # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    product_visuals_1st_5_secs_criteria = """A product or branded packaging is visually present in the video.
    Where the product is a service, a relevant substitute should be shown, such as a
    branded app or branded service personnel."""
    product_visuals_1st_5_secs_eval_details = {
        "feature": product_visuals_1st_5_secs_feature,
        "feature_description": product_visuals_1st_5_secs_criteria,
        "feature_detected": product_visuals_1st_5_secs,
        "llm_details": [],
    }

    branded_products_kg_entities = get_knowledge_graph_entities(branded_products)

    # Video API: Evaluate product_visuals_feature and product_visuals_1st_5_secs_feature
    if use_annotations:
        # Video API: Evaluate product_visuals and product_visuals_1st_5_secs
        # Check in annotations at segment level
        if "segment_label_annotations" in label_annotation_results:
            # Process video/segment level label annotations
            for segment_label in label_annotation_results.get(
                "segment_label_annotations"
            ):
                for segment in segment_label.get("segments"):
                    pv, pv_1st_5_secs = detect(
                        segment_label.get("entity"),
                        segment,
                        branded_products_kg_entities,
                        branded_products,
                        branded_products_categories,
                    )
                    if pv:
                        product_visuals = True
                    if pv_1st_5_secs:
                        product_visuals_1st_5_secs = True
        else:
            print(
                f"No Segment Label annotations found. Skipping {product_visuals_feature} Segment Label evaluation with Video Intelligence API."
            )

        # Check in annotations at shot level
        if "shot_label_annotations" in label_annotation_results:
            # Process shot level label annotations
            for shot_label in label_annotation_results.get("shot_label_annotations"):
                for segment in shot_label.get("segments"):
                    pv, pv_1st_5_secs = detect(
                        shot_label.get("entity"),
                        segment,
                        branded_products_kg_entities,
                        branded_products,
                        branded_products_categories,
                    )
                    if pv:
                        product_visuals = True
                    if pv_1st_5_secs:
                        product_visuals_1st_5_secs = True
        else:
            print(
                f"No Shot Label annotations found. Skipping {product_visuals_feature} Shot Label evaluation with Video Intelligence API."
            )

        # Check in annotations at frame level
        if "frame_label_annotations" in label_annotation_results:
            # Process frame level label annotations
            for frame_label in label_annotation_results.get("frame_label_annotations"):
                for frame in frame_label.get("frames"):
                    pv, pv_1st_5_secs = detect(
                        frame_label.get("entity"),
                        frame,
                        branded_products_kg_entities,
                        branded_products,
                        branded_products_categories,
                    )
                    if pv:
                        product_visuals = True
                    if pv_1st_5_secs:
                        product_visuals_1st_5_secs = True
        else:
            print(
                f"No Frame Label annotations found. Skipping {product_visuals_feature} Frame Label evaluation with Video Intelligence API."
            )

    # LLM: Evaluate product_visuals_feature and product_visuals_1st_5_secs_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # 1. Evaluate product_visuals_feature
        prompt = (
            """Is any of the following products: {branded_products}
            or product categories: {branded_products_categories}
            visually present at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the products {branded_products}
            or product categories: {branded_products_categories} are found.
            {context_and_examples}
        """.replace(
                "{branded_products}", ", ".join(branded_products)
            )
            .replace(
                "{branded_products_categories}", ", ".join(branded_products_categories)
            )
            .replace("{feature}", product_visuals_feature)
            .replace("{criteria}", product_visuals_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            product_visuals_feature, prompt, llm_params
        )
        if feature_detected:
            product_visuals = True

        # Include llm details
        product_visuals_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate product_visuals_1st_5_secs_feature
        prompt = (
            """Is any of the following products: {branded_products}
            or product categories: {branded_products_categories}
            visually present in the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the products {branded_products}
            or product categories: {branded_products_categories} are visually present.
            Return True if and only if the branded products or product categories are
            visually present in the video.
            {context_and_examples}
        """.replace(
                "{branded_products}", ", ".join(branded_products)
            )
            .replace(
                "{branded_products_categories}", ", ".join(branded_products_categories)
            )
            .replace("{feature}", product_visuals_1st_5_secs_feature)
            .replace("{criteria}", product_visuals_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            product_visuals_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            product_visuals_1st_5_secs = True

        product_visuals_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{product_visuals_feature}: {product_visuals}")
    product_visuals_eval_details["feature_detected"] = product_visuals
    print(f"{product_visuals_1st_5_secs_feature}: {product_visuals_1st_5_secs}")
    product_visuals_1st_5_secs_eval_details["feature_detected"] = (
        product_visuals_1st_5_secs
    )

    return product_visuals_eval_details, product_visuals_1st_5_secs_eval_details
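
The `detect` helper above counts a label as a product match in either of two cases. The first is when the label's Knowledge Graph entity ID is among the branded-product entities. The second is when its description exactly equals a product or category name, ignoring case. It then applies the confidence threshold and the early-time cutoff. The sketch below isolates that decision and passes the notebook globals `confidence_threshold` and `early_time_seconds` in explicitly. Their defaults here (0.5 and 5.0) are hypothetical. The label data is also simplified: `match_label`, `start_secs`, and the example entity ID are illustrative names, not Video Intelligence API fields.

```python
def match_label(
    entity: dict,
    confidence: float,
    start_secs: float,
    product_entity_ids: set[str],
    products: list[str],
    categories: list[str],
    confidence_threshold: float = 0.5,  # hypothetical default
    early_time_seconds: float = 5.0,  # hypothetical default
) -> tuple[bool, bool]:
    """Return (product_visuals, product_visuals_1st_5_secs) for one label occurrence."""
    description = (entity.get("description") or "").lower()
    names = {name.lower() for name in products + categories}
    # Exact (case-insensitive) name match, or Knowledge Graph entity match.
    is_match = entity.get("entity_id") in product_entity_ids or description in names
    if not is_match or confidence < confidence_threshold:
        return False, False
    # Same inclusive comparison as detect() uses for the early-time window.
    return True, start_secs <= early_time_seconds


entity = {"entity_id": "/m/hypothetical", "description": "Coffee"}
print(match_label(entity, 0.82, 2.0, set(), ["Acme Roast"], ["coffee"]))  # (True, True)
print(match_label(entity, 0.82, 7.5, set(), ["Acme Roast"], ["coffee"]))  # (True, False)
print(match_label(entity, 0.30, 2.0, set(), ["Acme Roast"], ["coffee"]))  # (False, False)
```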

In [None]:
# @title 12, 13) Brand: Product Mention (Text) & Product Mention (Text) (First 5 seconds)

# @markdown **Features:**

# @markdown **Product Mention (Text):** The branded product names or generic product categories are present in any text or overlay at any time in the video.

# @markdown **Product Mention (Text) (First 5 seconds):** The branded product names or generic product categories are present in any text or overlay in the first 5 seconds (up to 4.99s) of the video.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_product_mention_text(
    text_annotation_results: any,
    video_uri: str,
    branded_products: list[str],
    branded_products_categories: list[str],
) -> tuple[dict, dict]:
    """Detect Product Mention (Text) & Product Mention (Text) (First 5 seconds)
    Args:
        text_annotation_results: text annotations
        video_uri: video location in gcs
        branded_products: list of products
        branded_products_categories: list of products categories
    Returns:
        product_mention_text_eval_details,
        product_mention_text_1st_5_secs_eval_details: product mention text evaluation
    """
    # Feature Product Mention (Text)
    product_mention_text_feature = "Product Mention (Text)"
    product_mention_text = False
    product_mention_text_criteria = """The branded product names or generic product categories
        are present in any text or overlay at any time in the video."""
    product_mention_text_eval_details = {
        "feature": product_mention_text_feature,
        "feature_description": product_mention_text_criteria,
        "feature_detected": product_mention_text,
        "llm_details": [],
    }
    # Feature Product Mention (Text) (First 5 seconds)
    product_mention_text_1st_5_secs_feature = "Product Mention (Text) (First 5 seconds)"
    product_mention_text_1st_5_secs = False
    # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    product_mention_text_1st_5_secs_criteria = """The branded product names or generic product categories
    are present in any text or overlay in the video."""
    product_mention_text_1st_5_secs_eval_details = {
        "feature": product_mention_text_1st_5_secs_feature,
        "feature_description": product_mention_text_1st_5_secs_criteria,
        "feature_detected": product_mention_text_1st_5_secs,
        "llm_details": [],
    }

    # Video API: Evaluate product_mention_text_feature and product_mention_text_1st_5_secs_feature
    if use_annotations:
        if "text_annotations" in text_annotation_results:
            # Video API: Evaluate product_mention_text_feature and product_mention_text_1st_5_secs_feature
            for text_annotation in text_annotation_results.get("text_annotations"):
                text = text_annotation.get("text")
                found_branded_products = [
                    prod for prod in branded_products if prod.lower() in text.lower()
                ]
                found_branded_products_categories = [
                    prod
                    for prod in branded_products_categories
                    if prod.lower() in text.lower()
                ]
                if (
                    len(found_branded_products) > 0
                    or len(found_branded_products_categories) > 0
                ):
                    product_mention_text = True
                    pmt_1st_5_secs, frame = detected_text_in_first_5_seconds(
                        text_annotation
                    )
                    if pmt_1st_5_secs:
                        product_mention_text_1st_5_secs = True
        else:
            print(
                f"No Text annotations found. Skipping {product_mention_text_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate product_mention_text_feature and product_mention_text_1st_5_secs_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # 1. Evaluate product_mention_text_feature
        prompt = (
            """Is any of the following products: {branded_products}
            or product categories: {branded_products_categories}
            present in any text or overlay at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the products {branded_products}
            or product categories: {branded_products_categories} are found
            in any text or overlay in the video.
            {context_and_examples}
        """.replace(
                "{branded_products}", ", ".join(branded_products)
            )
            .replace(
                "{branded_products_categories}", ", ".join(branded_products_categories)
            )
            .replace("{feature}", product_mention_text_feature)
            .replace("{criteria}", product_mention_text_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            product_mention_text_feature, prompt, llm_params
        )
        if feature_detected:
            product_mention_text = True

        # Include llm details
        product_mention_text_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate product_mention_text_1st_5_secs_feature
        prompt = (
            """Is any of the following products: {branded_products}
            or product categories: {branded_products_categories}
            present in any text or overlay in the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the products {branded_products}
            or product categories: {branded_products_categories} are found
            in any text or overlay in the video.
            {context_and_examples}
        """.replace(
                "{branded_products}", ", ".join(branded_products)
            )
            .replace(
                "{branded_products_categories}", ", ".join(branded_products_categories)
            )
            .replace("{feature}", product_mention_text_1st_5_secs_feature)
            .replace("{criteria}", product_mention_text_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            product_mention_text_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            product_mention_text_1st_5_secs = True

        product_mention_text_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{product_mention_text_feature}: {product_mention_text}")
    product_mention_text_eval_details["feature_detected"] = product_mention_text
    print(
        f"{product_mention_text_1st_5_secs_feature}: {product_mention_text_1st_5_secs}"
    )
    product_mention_text_1st_5_secs_eval_details["feature_detected"] = (
        product_mention_text_1st_5_secs
    )

    return (
        product_mention_text_eval_details,
        product_mention_text_1st_5_secs_eval_details,
    )
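
Every prompt in these cells is built by chaining `str.replace` calls over `{placeholder}` tokens instead of `str.format`. A plausible reason is that criteria or `context_and_examples` text may contain literal braces, which `str.format` would reject. The sketch below factors that pattern into a small helper. `fill_prompt` is a hypothetical name, not part of abcds-detector, and the product names are made up. In the templates shown in these cells, the `.replace("{feature}", ...)` step has no effect because none of them contains a `{feature}` token.

```python
def fill_prompt(template: str, values: dict[str, str]) -> str:
    """Replace each {key} token in template with values[key].

    Hypothetical helper (not part of abcds-detector). Tokens are substituted in
    the dict's insertion order; brace text that is not a known key is left
    untouched, whereas str.format would raise KeyError on it.
    """
    prompt = template
    for key, value in values.items():
        prompt = prompt.replace("{" + key + "}", value)
    return prompt


# Made-up product names, for illustration only.
branded_products = ["Acme Soda", "Acme Water"]
template = (
    "Is any of the following products: {branded_products} present in any text "
    "or overlay in the video? Consider the following criteria: {criteria}"
)
prompt = fill_prompt(
    template,
    {
        "branded_products": ", ".join(branded_products),
        "criteria": 'Answer with JSON such as {"detected": true}.',
    },
)
print(prompt)
```

Because substitution runs in order, a value that itself contains a later token (for example a criteria string containing `{context_and_examples}`) would be expanded too; the notebook's chained `.replace` calls share that property.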

In [None]:
# @title 14 & 15) Brand: Product Mention (Speech) & Product Mention (Speech) (First 5 seconds)

# @markdown **Features:**

# @markdown **Product Mention (Speech):** The branded product names or generic product categories are heard or mentioned in the audio or speech at any time in the video.

# @markdown **Product Mention (Speech) (First 5 seconds):** The branded product names or generic product categories are heard or mentioned in the audio or speech in the first 5 seconds (up to 4.99s) of the video.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_product_mention_speech(
    speech_annotation_results: any,
    video_uri: str,
    branded_products: list[str],
    branded_products_categories: list[str],
) -> tuple[dict, dict]:
    """Detect Product Mention (Speech) & Product Mention (Speech) (First 5 seconds)
    Args:
        speech_annotation_results: speech annotations
        video_uri: video location in gcs
        branded_products: list of products
        branded_products_categories: list of products categories
    Returns:
        product_mention_speech_eval_details,
        product_mention_speech_1st_5_secs_eval_details: product mention speech evaluation
    """
    # Feature Product Mention (Speech)
    product_mention_speech_feature = "Product Mention (Speech)"
    product_mention_speech = False
    product_mention_speech_criteria = """The branded product names or generic product categories
        are heard or mentioned in the audio or speech at any time in the video."""
    product_mention_speech_eval_details = {
        "feature": product_mention_speech_feature,
        "feature_description": product_mention_speech_criteria,
        "feature_detected": product_mention_speech,
        "llm_details": [],
    }
    # Feature Product Mention (Speech) (First 5 seconds)
    product_mention_speech_1st_5_secs_feature = (
        "Product Mention (Speech) (First 5 seconds)"
    )
    product_mention_speech_1st_5_secs = False
    # remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    product_mention_speech_1st_5_secs_criteria = """The branded product names or generic product categories
    are heard or mentioned in the audio or speech in the video."""
    product_mention_speech_1st_5_secs_eval_details = {
        "feature": product_mention_speech_1st_5_secs_feature,
        "feature_description": product_mention_speech_1st_5_secs_criteria,
        "feature_detected": product_mention_speech_1st_5_secs,
        "llm_details": [],
    }

    # Video API: Evaluate product_mention_speech_feature and product_mention_speech_1st_5_secs_feature
    if use_annotations:
        if "speech_transcriptions" in speech_annotation_results:
            # Video API: Evaluate product_mention_speech & product_mention_speech_1st_5_secs
            (
                product_mention_speech,
                product_mention_speech_1st_5_secs,
            ) = find_elements_in_transcript(
                speech_transcriptions=speech_annotation_results.get(
                    "speech_transcriptions"
                ),
                elements=branded_products,
                elements_categories=branded_products_categories,
                apply_condition=False,
            )
        else:
            print(
                f"No Speech annotations found. Skipping {product_mention_speech_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate product_mention_speech_feature and product_mention_speech_1st_5_secs_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )

        # LLM Only
        # 1. Evaluate product_mention_speech_feature
        prompt = (
            """Are any of the following products: {branded_products}
            or product categories: {branded_products_categories} heard
            at any time in the speech of the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the products {branded_products}
            or product categories {branded_products_categories} are heard in the speech of the video.
            Return False if the products or product categories are not heard in the speech.
            Only strictly use the speech of the video to answer, don't consider visual elements.
            {context_and_examples}
        """.replace(
                "{branded_products}", f"{', '.join(branded_products)}"
            )
            .replace(
                "{branded_products_categories}",
                f"{', '.join(branded_products_categories)}",
            )
            .replace("{feature}", product_mention_speech_feature)
            .replace("{criteria}", product_mention_speech_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            product_mention_speech_feature, prompt, llm_params
        )
        if feature_detected:
            product_mention_speech = True

        # Include llm details
        product_mention_speech_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate product_mention_speech_1st_5_secs_feature
        prompt = (
            """Are any of the following products: {branded_products}
            or product categories: {branded_products_categories} heard in the speech of the video?
            Consider the following criteria for your answer: {criteria}
            Provide the exact timestamp when the products {branded_products}
            or product categories {branded_products_categories} are heard in the speech of the video.
            Return False if the products or product categories are not heard in the speech.
            Only strictly use the speech of the video to answer, don't consider visual elements.
            {context_and_examples}
        """.replace(
                "{branded_products}", f"{', '.join(branded_products)}"
            )
            .replace(
                "{branded_products_categories}",
                f"{', '.join(branded_products_categories)}",
            )
            .replace("{feature}", product_mention_speech_1st_5_secs_feature)
            .replace("{criteria}", product_mention_speech_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            product_mention_speech_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            product_mention_speech_1st_5_secs = True

        # Include llm details
        product_mention_speech_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # Combination of Annotations + LLM
        if use_annotations:
            if "speech_transcriptions" in speech_annotation_results:
                # 1. Evaluate product_mention_speech_feature
                transcript = get_speech_transcript(
                    speech_annotation_results.get("speech_transcriptions")
                )
                prompt = (
                    """Does the provided speech transcript mention any of the following products: {branded_products}
                        or product categories: {branded_products_categories} at any time in the video?
                        This is the speech transcript: "{transcript}"
                        Consider the following criteria for your answer: {criteria}
                        {context_and_examples}
                    """.replace(
                        "{branded_products}", f"{', '.join(branded_products)}"
                    )
                    .replace(
                        "{branded_products_categories}",
                        f"{', '.join(branded_products_categories)}",
                    )
                    .replace("{transcript}", transcript)
                    .replace("{feature}", product_mention_speech_feature)
                    .replace("{criteria}", product_mention_speech_criteria)
                    .replace("{context_and_examples}", context_and_examples)
                )
                # Set modality to text since we are not using video for Annotations + LLM
                llm_params.set_modality({"type": "text"})
                # If transcript is empty, this feature should be False
                if transcript:
                    feature_detected, llm_explanation = detect_feature_with_llm(
                        product_mention_speech_feature, prompt, llm_params
                    )
                    if feature_detected:
                        product_mention_speech = True

                    # Include llm details
                    product_mention_speech_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": llm_explanation,
                        }
                    )
                else:
                    product_mention_speech = False
                    # Include default details
                    product_mention_speech_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": "Annotations + LLM: Speech was not found in annotations.",
                        }
                    )

                # 2. Evaluate product_mention_speech_1st_5_secs_feature
                transcript_1st_5_secs = get_speech_transcript_1st_5_secs(
                    speech_annotation_results.get("speech_transcriptions")
                )
                prompt = (
                    """Does the provided speech transcript mention any of the following products: {branded_products}
                        or product categories: {branded_products_categories} in the video?
                        This is the speech transcript: "{transcript}"
                        Consider the following criteria for your answer: {criteria}
                        {context_and_examples}
                    """.replace(
                        "{branded_products}", f"{', '.join(branded_products)}"
                    )
                    .replace(
                        "{branded_products_categories}",
                        f"{', '.join(branded_products_categories)}",
                    )
                    .replace("{transcript}", transcript_1st_5_secs)
                    .replace("{feature}", product_mention_speech_1st_5_secs_feature)
                    .replace("{criteria}", product_mention_speech_1st_5_secs_criteria)
                    .replace("{context_and_examples}", context_and_examples)
                )
                # Set modality to text since we are not using video for Annotations + LLM
                llm_params.set_modality({"type": "text"})
                # If transcript is empty, this feature should be False
                if transcript_1st_5_secs:
                    feature_detected, llm_explanation = detect_feature_with_llm(
                        product_mention_speech_1st_5_secs_feature, prompt, llm_params
                    )
                    if feature_detected:
                        product_mention_speech_1st_5_secs = True

                    # Include llm details
                    product_mention_speech_1st_5_secs_eval_details[
                        "llm_details"
                    ].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": llm_explanation,
                        }
                    )
                else:
                    product_mention_speech_1st_5_secs = False
                    # Include default details
                    product_mention_speech_1st_5_secs_eval_details[
                        "llm_details"
                    ].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": "Annotations + LLM: Speech was not found in annotations.",
                        }
                    )
            else:
                print(
                    f"No Speech annotations found. Skipping {product_mention_speech_feature} evaluation with Annotations + LLM."
                )

    print(f"{product_mention_speech_feature}: {product_mention_speech}")
    product_mention_speech_eval_details["feature_detected"] = product_mention_speech
    print(
        f"{product_mention_speech_1st_5_secs_feature}: {product_mention_speech_1st_5_secs}"
    )
    product_mention_speech_1st_5_secs_eval_details["feature_detected"] = (
        product_mention_speech_1st_5_secs
    )

    return (
        product_mention_speech_eval_details,
        product_mention_speech_1st_5_secs_eval_details,
    )
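
When `use_annotations` is on, the speech cell delegates the keyword search to `find_elements_in_transcript`, which is defined elsewhere in the notebook. The sketch below is a hypothetical stand-in that shows the idea: match terms against the full transcript for the any-time feature, and against only the words whose start time is under 5 seconds for the first-5-seconds feature, since (as the cell's own comments note) only `words[]` carry timestamps. The name `find_terms_in_transcript`, the default thresholds, and the simplified data shape (each word's `start_time` already a float in seconds) are assumptions; the notebook converts real offsets with `calculate_time_seconds`. Whole-word matching is a design choice of this sketch so that a category like "cola" does not match "chocolate".

```python
import re


def _contains_term(text: str, term: str) -> bool:
    """Case-insensitive whole-word match of term inside text."""
    return re.search(rf"\b{re.escape(term)}\b", text, flags=re.IGNORECASE) is not None


def find_terms_in_transcript(
    speech_transcriptions: list[dict],
    terms: list[str],
    confidence_threshold: float = 0.5,
    early_time_seconds: float = 5.0,
) -> tuple[bool, bool]:
    """Return (mentioned_anywhere, mentioned_in_first_5_secs). Hypothetical sketch."""
    found_anywhere = False
    found_early = False
    for transcription in speech_transcriptions:
        for alternative in transcription.get("alternatives", []):
            # Skip alternatives below the user-defined confidence threshold.
            if (alternative.get("confidence") or 0.0) < confidence_threshold:
                continue
            full_text = alternative.get("transcript", "")
            # Only words[] carry timestamps, so rebuild the early text from them.
            early_text = " ".join(
                w.get("word", "")
                for w in alternative.get("words", [])
                if w.get("start_time", 0.0) < early_time_seconds
            )
            for term in terms:
                if not term:
                    continue  # an empty term would match any word boundary
                if _contains_term(full_text, term):
                    found_anywhere = True
                if _contains_term(early_text, term):
                    found_early = found_anywhere = True
    return found_anywhere, found_early


# Made-up transcript: the product is only mentioned after the 5-second mark.
sample = [{"alternatives": [{
    "transcript": "Try the new Acme Soda today",
    "confidence": 0.9,
    "words": [
        {"word": "Try", "start_time": 0.5}, {"word": "the", "start_time": 0.8},
        {"word": "new", "start_time": 1.0}, {"word": "Acme", "start_time": 6.0},
        {"word": "Soda", "start_time": 6.3}, {"word": "today", "start_time": 6.8},
    ],
}]}]
print(find_terms_in_transcript(sample, ["Acme Soda"]))
```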

In [None]:
# @title 16 & 17) Connect: Visible Face (First 5 seconds) & Visible Face (Close Up)

# @markdown **Features:**

# @markdown  **Visible Face (First 5 seconds):** At least one human face is present in the first 5 seconds (up to 4.99s) of the video. Alternate representations of people such as Animations or Cartoons ARE acceptable.

# @markdown  **Visible Face (Close Up):** There is a close up of a human face at any time in the video.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_visible_face(
    face_annotation_results: any, video_uri: str
) -> tuple[dict, dict]:
    """Detect Visible Face (First 5 seconds) & Visible Face (Close Up)
    Args:
        face_annotation_results: face annotations
        video_uri: video location in gcs
    Returns:
        visible_face_1st_5_secs_eval_details,
        visible_face_close_up_eval_details: visible face evaluation
    """
    # Feature Visible Face (First 5 seconds)
    visible_face_1st_5_secs_feature = "Visible Face (First 5 seconds)"
    visible_face_1st_5_secs = False
    # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    visible_face_1st_5_secs_criteria = """At least one human face is present in the video.
    Alternate representations of people such as Animations or Cartoons ARE acceptable."""
    visible_face_1st_5_secs_eval_details = {
        "feature": visible_face_1st_5_secs_feature,
        "feature_description": visible_face_1st_5_secs_criteria,
        "feature_detected": visible_face_1st_5_secs,
        "llm_details": [],
    }
    # Feature Visible Face (Close Up)
    visible_face_close_up_feature = "Visible Face (Close Up)"
    visible_face_close_up = False
    visible_face_close_up_criteria = (
        """There is a close up of a human face at any time in the video."""
    )
    visible_face_close_up_eval_details = {
        "feature": visible_face_close_up_feature,
        "feature_description": visible_face_close_up_criteria,
        "feature_detected": visible_face_close_up,
        "llm_details": [],
    }

    # Video API: Evaluate visible_face_1st_5_secs_feature and visible_face_close_up_feature
    if use_annotations:
        if "face_detection_annotations" in face_annotation_results:
            # Video API: Evaluate visible_face_1st_5_secs_feature and visible_face_close_up_feature
            if face_annotation_results.get("face_detection_annotations"):
                for annotation in face_annotation_results.get(
                    "face_detection_annotations"
                ):
                    for track in annotation.get("tracks"):
                        start_time_secs = calculate_time_seconds(
                            track.get("segment"), "start_time_offset"
                        )
                        # Check confidence against user defined threshold
                        if track.get("confidence") >= confidence_threshold:
                            if start_time_secs < early_time_seconds:
                                visible_face_1st_5_secs = True
                            for face_object in track.get("timestamped_objects"):
                                box = face_object.get("normalized_bounding_box")
                                left = box.get("left") or 0
                                right = box.get("right") or 1
                                top = box.get("top") or 0
                                bottom = box.get("bottom") or 1
                                width = right - left
                                height = bottom - top
                                surface = width * height
                                if surface >= face_surface_threshold:
                                    visible_face_close_up = True
        else:
            print(
                f"No Face annotations found. Skipping {visible_face_1st_5_secs_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate visible_face_1st_5_secs_feature and visible_face_close_up_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # 1. Evaluate visible_face_1st_5_secs_feature
        prompt = (
            """Is there a human face present in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when the human face is present.
            {context_and_examples}
        """.replace(
                "{feature}", visible_face_1st_5_secs_feature
            )
            .replace("{criteria}", visible_face_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            visible_face_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            visible_face_1st_5_secs = True

        # Include llm details
        visible_face_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate visible_face_close_up_feature
        prompt = (
            """Is there a close up of a human face present at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when there is a close up of a human face.
            {context_and_examples}
        """.replace(
                "{feature}", visible_face_close_up_feature
            )
            .replace("{criteria}", visible_face_close_up_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            visible_face_close_up_feature, prompt, llm_params
        )
        if feature_detected:
            visible_face_close_up = True

        # Include llm details
        visible_face_close_up_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{visible_face_1st_5_secs_feature}: {visible_face_1st_5_secs}")
    visible_face_1st_5_secs_eval_details["feature_detected"] = visible_face_1st_5_secs
    print(f"{visible_face_close_up_feature}: {visible_face_close_up}")
    visible_face_close_up_eval_details["feature_detected"] = visible_face_close_up

    return visible_face_1st_5_secs_eval_details, visible_face_close_up_eval_details
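
The close-up check above treats a face as a close-up when the area of its normalized bounding box, (right - left) × (bottom - top), reaches `face_surface_threshold`. Because both axes are normalized to [0, 1], that area is the fraction of the frame the face covers. A minimal standalone sketch of the same test follows. The function names and the 0.7 / 0.15 thresholds in the example are illustrative assumptions, not the notebook's configured values; missing edges default to the frame borders exactly as in the cell above.

```python
def box_surface(box: dict) -> float:
    """Fraction of the frame covered by a normalized bounding box.

    Mirrors the cell above: missing edges fall back to the frame borders
    (left/top -> 0, right/bottom -> 1).
    """
    left = box.get("left") or 0.0
    right = box.get("right") or 1.0
    top = box.get("top") or 0.0
    bottom = box.get("bottom") or 1.0
    # Clamp so a malformed box never yields a negative area.
    return max(0.0, right - left) * max(0.0, bottom - top)


def has_close_up(
    tracks: list[dict], confidence_threshold: float, face_surface_threshold: float
) -> bool:
    """True if any confident face track has a box covering enough of the frame."""
    for track in tracks:
        if (track.get("confidence") or 0.0) < confidence_threshold:
            continue
        for face_object in track.get("timestamped_objects", []):
            box = face_object.get("normalized_bounding_box", {})
            if box_surface(box) >= face_surface_threshold:
                return True  # one qualifying frame is enough
    return False


# Made-up track: a small face (1% of the frame), then a large one (25%).
tracks = [{
    "confidence": 0.9,
    "timestamped_objects": [
        {"normalized_bounding_box": {"left": 0.4, "right": 0.5, "top": 0.4, "bottom": 0.5}},
        {"normalized_bounding_box": {"left": 0.25, "right": 0.75, "top": 0.0, "bottom": 0.5}},
    ],
}]
print(has_close_up(tracks, confidence_threshold=0.7, face_surface_threshold=0.15))
```

Unlike the notebook loop, this sketch returns as soon as one qualifying box is found; the result is the same, it just avoids scanning the remaining frames.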


In [None]:
# @title 18 & 19) Connect: Presence of People & Presence of People (First 5 seconds)

# @markdown **Features:**

# @markdown  **Presence of People:** People are shown in any capacity at any time in the video. Any human body parts are acceptable to pass this guideline. Alternate representations of people such as Animations or Cartoons ARE acceptable.

# @markdown  **Presence of People (First 5 seconds):** People are shown in any capacity in the first 5 seconds (up to 4.99s) of the video. Any human body parts are acceptable to pass this guideline. Alternate representations of people such as Animations or Cartoons ARE acceptable.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_presence_of_people(
    people_annotation_results: any, video_uri: str
) -> tuple[dict, dict]:
    """Detect Presence of People & Presence of People (First 5 seconds)
    Args:
        people_annotation_results: people annotations
        video_uri: video location in gcs
    Returns:
        presence_of_people_eval_details,
        presence_of_people_1st_5_secs_eval_details: presence of people evaluation
    """
    # Feature Presence of People
    presence_of_people_feature = "Presence of People"
    presence_of_people = False
    presence_of_people_criteria = """People are shown in any capacity at any time in the video.
        Any human body parts are acceptable to pass this guideline. Alternate representations of
        people such as Animations or Cartoons ARE acceptable."""
    presence_of_people_eval_details = {
        "feature": presence_of_people_feature,
        "feature_description": presence_of_people_criteria,
        "feature_detected": presence_of_people,
        "llm_details": [],
    }
    # Feature Presence of People (First 5 seconds)
    presence_of_people_1st_5_secs_feature = "Presence of People (First 5 seconds)"
    presence_of_people_1st_5_secs = False
    # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    presence_of_people_1st_5_secs_criteria = """People are shown in any capacity in the video.
    Any human body parts are acceptable to pass this guideline. Alternate
    representations of people such as Animations or Cartoons ARE acceptable."""
    presence_of_people_1st_5_secs_eval_details = {
        "feature": presence_of_people_1st_5_secs_feature,
        "feature_description": presence_of_people_1st_5_secs_criteria,
        "feature_detected": presence_of_people_1st_5_secs,
        "llm_details": [],
    }

    # Video API: Evaluate presence_of_people_feature and presence_of_people_1st_5_secs_feature
    if use_annotations:
        if "person_detection_annotations" in people_annotation_results:
            # Video API: Evaluate presence_of_people_feature and presence_of_people_1st_5_secs_feature
            for people_annotation in people_annotation_results.get(
                "person_detection_annotations"
            ):
                for track in people_annotation.get("tracks"):
                    # Check confidence against user defined threshold
                    if track.get("confidence") >= confidence_threshold:
                        presence_of_people = True
                        start_time_secs = calculate_time_seconds(
                            track.get("segment"), "start_time_offset"
                        )
                        if start_time_secs < early_time_seconds:
                            presence_of_people_1st_5_secs = True
                        # Each track also includes track.get("timestamped_objects"), which
                        # describe attributes (e.g. clothing, posture) of the detected person.
        else:
            print(
                f"No People annotations found. Skipping {presence_of_people_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate presence_of_people_feature and presence_of_people_1st_5_secs_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # 1. Evaluate presence_of_people_feature
        prompt = (
            """Are there people present at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when people are present in the video.
            {context_and_examples}
        """.replace(
                "{feature}", presence_of_people_feature
            )
            .replace("{criteria}", presence_of_people_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            presence_of_people_feature, prompt, llm_params
        )
        if feature_detected:
            presence_of_people = True

        # Include llm details
        presence_of_people_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # 2. Evaluate presence_of_people_1st_5_secs_feature
        prompt = (
            """Are there people present in the video?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when people are present in the video.
            {context_and_examples}
        """.replace(
                "{feature}", presence_of_people_1st_5_secs_feature
            )
            .replace("{criteria}", presence_of_people_1st_5_secs_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            presence_of_people_1st_5_secs_feature, prompt, llm_params
        )
        if feature_detected:
            presence_of_people_1st_5_secs = True

        # Include llm details
        presence_of_people_1st_5_secs_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{presence_of_people_feature}: {presence_of_people}")
    presence_of_people_eval_details["feature_detected"] = presence_of_people
    print(f"{presence_of_people_1st_5_secs_feature}: {presence_of_people_1st_5_secs}")
    presence_of_people_1st_5_secs_eval_details["feature_detected"] = (
        presence_of_people_1st_5_secs
    )

    return presence_of_people_eval_details, presence_of_people_1st_5_secs_eval_details
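
The face and people cells decide the "first 5 seconds" variant with a strict `start_time_secs < early_time_seconds` comparison, which matches the "up to 4.99s" wording in the feature descriptions, and in these two cells a feature ends up True if either the Video Intelligence pass or any LLM call detects it. The sketch below isolates the annotation side of the people check. `scan_tracks` is a hypothetical name, and it assumes for simplicity that each track's `segment.start_time_offset` is already a float in seconds; the notebook converts real offsets with `calculate_time_seconds`.

```python
def scan_tracks(
    tracks: list[dict],
    confidence_threshold: float,
    early_time_seconds: float = 5.0,
) -> tuple[bool, bool]:
    """Return (present_at_any_time, present_in_first_5_secs) for detection tracks."""
    present = False
    present_early = False
    for track in tracks:
        # Ignore tracks below the user-defined confidence threshold.
        if (track.get("confidence") or 0.0) < confidence_threshold:
            continue
        present = True
        start_time_secs = track.get("segment", {}).get("start_time_offset", 0.0)
        # Strict "<" so a track starting at exactly 5.0s is not counted as early.
        if start_time_secs < early_time_seconds:
            present_early = True
    return present, present_early


# Made-up tracks: a confident one starting at 5.0s and a low-confidence early one.
tracks = [
    {"confidence": 0.9, "segment": {"start_time_offset": 5.0}},
    {"confidence": 0.3, "segment": {"start_time_offset": 1.0}},
]
annotation_present, annotation_early = scan_tracks(tracks, confidence_threshold=0.7)
llm_detected = False  # stand-in for the LLM pass result
print(annotation_present or llm_detected, annotation_early)
```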

In [None]:
# @title 20) Direct: Audio Speech Early

# @markdown **Features:**

# @markdown **Audio Early (First 5 seconds):** Speech is detected in the audio in the first 5 seconds (up to 4.99s) of the video.

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_audio_speech_early(speech_annotation_results: any, video_uri: str) -> dict:
    """Detect Audio Early (First 5 seconds)
    Args:
        speech_annotation_results: speech annotations
        video_uri: video location in gcs
    Returns:
        audio_speech_early_eval_details: audio early evaluation
    """
    # Feature Audio Early (First 5 seconds)
    audio_speech_early_feature = "Audio Early (First 5 seconds)"
    audio_speech_early = False
    # Remove 1st 5 secs references from prompt to avoid hallucinations since the video is already 5 secs
    audio_speech_early_criteria = """Speech is detected in the audio of the video."""
    audio_speech_early_eval_details = {
        "feature": audio_speech_early_feature,
        "feature_description": audio_speech_early_criteria,
        "feature_detected": audio_speech_early,
        "llm_details": [],
    }

    # Video API: Evaluate audio_speech_early_feature
    if use_annotations:
        if "speech_transcriptions" in speech_annotation_results:
            # Video API: Evaluate audio_speech_early_feature
            for speech_transcription in speech_annotation_results.get(
                "speech_transcriptions"
            ):
                for alternative in speech_transcription.get("alternatives"):
                    # Check confidence against user defined threshold
                    if (
                        alternative
                        and alternative.get("confidence") >= confidence_threshold
                    ):
                        # For the first 5 secs, check each word's start time,
                        # since only the words[] entries carry timestamps
                        words = (
                            alternative.get("words") if "words" in alternative else []
                        )
                        for word_info in words:
                            start_time_secs = calculate_time_seconds(
                                word_info, "start_time"
                            )
                            if start_time_secs <= early_time_seconds:
                                audio_speech_early = True
        else:
            print(
                f"No Speech annotations found. Skipping {audio_speech_early_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate audio_speech_early_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )

        # LLM Only
        # 1. Evaluate audio_speech_early_feature
        prompt = (
            """Is speech detected in the audio of the video?
            Consider the following criteria for your answer: {criteria}
            Only strictly use the speech of the video to answer.
            {context_and_examples}
        """.replace(
                "{feature}", audio_speech_early_feature
            )
            .replace("{criteria}", audio_speech_early_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use first 5 secs video for this feature
        video_uri_1st_5_secs = get_n_secs_video_uri_from_uri(video_uri, "1st_5_secs")
        llm_params.set_modality({"type": "video", "video_uri": video_uri_1st_5_secs})
        feature_detected, llm_explanation = detect_feature_with_llm(
            audio_speech_early_feature, prompt, llm_params
        )
        if feature_detected:
            audio_speech_early = True

        # Include llm details
        audio_speech_early_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # Combination of Annotations + LLM
        if use_annotations:
            if "speech_transcriptions" in speech_annotation_results:
                # 1. Evaluate audio_speech_early_feature on the first 5 seconds of the transcript
                transcript_1st_5_secs = get_speech_transcript_1st_5_secs(
                    speech_annotation_results.get("speech_transcriptions")
                )
                prompt = (
                    """Does the provided speech transcript mention any words?
                    This is the speech transcript: "{transcript}"
                    Consider the following criteria for your answer: {criteria}
                    {context_and_examples}
                """.replace(
                        "{transcript}", transcript_1st_5_secs
                    )
                    .replace("{feature}", audio_speech_early_feature)
                    .replace("{criteria}", audio_speech_early_criteria)
                    .replace("{context_and_examples}", context_and_examples)
                )
                # Set modality to text since we are not using video for Annotations + LLM
                llm_params.set_modality({"type": "text"})
                # If transcript is empty, this feature should be False
                if transcript_1st_5_secs:
                    feature_detected, llm_explanation = detect_feature_with_llm(
                        audio_speech_early_feature, prompt, llm_params
                    )
                    if feature_detected:
                        audio_speech_early = True

                    # Include llm details
                    audio_speech_early_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": llm_explanation,
                        }
                    )
                else:
                    audio_speech_early = False
                    # Include default details
                    audio_speech_early_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": "Annotations + LLM: Speech was not found in annotations.",
                        }
                    )
            else:
                print(
                    f"No Speech annotations found. Skipping {audio_speech_early_feature} evaluation with LLM."
                )

    print(f"{audio_speech_early_feature}: {audio_speech_early}")
    audio_speech_early_eval_details["feature_detected"] = audio_speech_early

    return audio_speech_early_eval_details
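The annotation check in `detect_audio_speech_early` reduces to one question: does any transcribed word start at or before the early-time cutoff? The sketch below isolates that rule. It assumes word start times have already been converted to float seconds (the notebook's `calculate_time_seconds` helper does this for the real Video Intelligence annotation format, which is not reproduced here) and uses a 4.99 s cutoff to match the "up to 4.99s" wording. `speech_starts_early` and the `start_secs` key are hypothetical.

```python
from typing import Iterable, Mapping

# Hypothetical cutoff mirroring "first 5 seconds (up to 4.99s)"; the notebook's
# actual early_time_seconds value is configured elsewhere.
EARLY_TIME_SECONDS = 4.99


def speech_starts_early(
    words: Iterable[Mapping[str, float]],
    early_time_seconds: float = EARLY_TIME_SECONDS,
) -> bool:
    """Return True if any word starts at or before the cutoff.

    Each word is assumed to be a dict with a float "start_secs" key,
    a simplified stand-in for the Video Intelligence word_info objects.
    """
    # any() stops at the first early word instead of scanning the full transcript
    return any(word["start_secs"] <= early_time_seconds for word in words)


# Usage: one word at 3.2 s is enough to pass; a transcript starting at 6 s is not.
print(speech_starts_early([{"start_secs": 6.0}, {"start_secs": 3.2}]))  # True
print(speech_starts_early([{"start_secs": 6.0}, {"start_secs": 7.5}]))  # False
print(speech_starts_early([]))  # False
```

Unlike the nested loop in the function, `any()` short-circuits at the first qualifying word; the boolean result is the same.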

In [None]:
# @title 21) Connect: Overall Pacing

# @markdown **Features:**

# @markdown **Overall Pacing:** The pace of the video is greater than 2 seconds per shot/frame

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_overall_pacing(shot_annotation_results: any, video_uri: str) -> dict:
    """Detect Overall Pacing
    Args:
        shot_annotation_results: shot annotations
        video_uri: video location in gcs
    Returns:
        overall_pacing_eval_details: overall pacing evaluation
    """
    # Feature Overall Pacing
    overall_pacing_feature = "Overall Pacing"
    overall_pacing = False
    overall_pacing_criteria = (
        """The pace of the video is greater than 2 seconds per shot/frame"""
    )
    overall_pacing_eval_details = {
        "feature": overall_pacing_feature,
        "feature_description": overall_pacing_criteria,
        "feature_detected": overall_pacing,
        "llm_details": [],
    }
    total_time_all_shots = 0
    total_shots = 0

    # Video API: Evaluate overall_pacing_feature
    if use_annotations:
        if "shot_annotations" in shot_annotation_results:
            # Video API: Evaluate overall_pacing_feature
            for shot in shot_annotation_results.get("shot_annotations"):
                start_time_secs = calculate_time_seconds(shot, "start_time_offset")
                end_time_secs = calculate_time_seconds(shot, "end_time_offset")
                total_shot_time = end_time_secs - start_time_secs
                total_time_all_shots += total_shot_time
                total_shots += 1
            # Guard against videos with no detected shots (avoids ZeroDivisionError)
            if total_shots > 0:
                avg_pacing = total_time_all_shots / total_shots
                if avg_pacing <= avg_shot_duration_seconds:
                    overall_pacing = True
        else:
            print(
                f"No Shot annotations found. Skipping {overall_pacing_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate overall_pacing_feature
    if use_llms:
        # 1. Evaluate overall_pacing_feature
        prompt = (
            """Is the pace of video greater than 2 seconds per shot/frame?
            Consider the following criteria for your answer: {criteria}
            Look through each frame in the video carefully and answer the question.
            Return True if and only if the pace of video greater than 2 seconds per shot/frame
            {context_and_examples}
        """.replace(
                "{feature}", overall_pacing_feature
            )
            .replace("{criteria}", overall_pacing_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            overall_pacing_feature, prompt, llm_params
        )
        if feature_detected:
            overall_pacing = True

        # Include llm details
        overall_pacing_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{overall_pacing_feature}: {overall_pacing}")
    overall_pacing_eval_details["feature_detected"] = overall_pacing

    return overall_pacing_eval_details
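The annotation path of `detect_overall_pacing` averages the shot lengths and passes the feature when that average is at or below `avg_shot_duration_seconds`, that is, when the cuts are at least as fast as the threshold. The sketch below isolates that arithmetic, assuming shots are already converted to `(start, end)` pairs in seconds. `average_shot_duration`, `passes_overall_pacing`, and the 2.0 s default are illustrative, not part of the notebook.

```python
from typing import Optional, Sequence, Tuple

# Hypothetical threshold mirroring the "2 seconds per shot" criterion; the
# notebook reads avg_shot_duration_seconds from its own configuration.
AVG_SHOT_DURATION_SECONDS = 2.0


def average_shot_duration(shots: Sequence[Tuple[float, float]]) -> Optional[float]:
    """Mean shot length in seconds, or None when there are no shots."""
    if not shots:
        return None  # avoid ZeroDivisionError on videos without shot annotations
    total = sum(end - start for start, end in shots)
    return total / len(shots)


def passes_overall_pacing(
    shots: Sequence[Tuple[float, float]],
    threshold: float = AVG_SHOT_DURATION_SECONDS,
) -> bool:
    """Mirror the annotation check: pass when the average shot is <= threshold."""
    avg = average_shot_duration(shots)
    return avg is not None and avg <= threshold


# Usage: three shots covering 4.0 s average about 1.33 s per shot.
shots = [(0.0, 1.5), (1.5, 3.0), (3.0, 4.0)]
print(round(average_shot_duration(shots), 2))  # 1.33
print(passes_overall_pacing(shots))  # True
print(passes_overall_pacing([(0.0, 6.0)]))  # False
```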


In [None]:
# @title 22 & 23) Direct: Call To Action (Text) & Call To Action (Speech)

# @markdown **Features:**

# @markdown **Call To Action (Text):** A 'Call To Action' phrase is detected in the video supers (overlaid text) at any time in the video.

# @markdown **Call To Action (Speech):** A 'Call To Action' phrase is heard or mentioned in the audio or speech at any time in the video.


call_to_action_api_list = [
    "LEARN MORE",
    "GET QUOTE",
    "APPLY NOW",
    "SIGN UP",
    "CONTACT US",
    "SUBSCRIBE",
    "DOWNLOAD",
    "BOOK NOW",
    "SHOP NOW",
    "BUY NOW",
    "DONATE NOW",
    "ORDER NOW",
    "PLAY NOW",
    "SEE MORE",
    "START NOW",
    "VISIT SITE",
    "WATCH NOW",
]
call_to_action_verbs_api_list = [
    "LEARN",
    "QUOTE",
    "APPLY",
    "SIGN UP",
    "CONTACT",
    "SUBSCRIBE",
    "DOWNLOAD",
    "BOOK",
    "SHOP",
    "BUY",
    "DONATE",
    "ORDER",
    "PLAY",
    "SEE",
    "START",
    "VISIT",
    "WATCH",
]

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_call_to_action_speech(
    speech_annotation_results: any,
    video_uri: str,
    branded_call_to_actions: list[str],
) -> dict:
    """Detect Call To Action (Speech)
    Args:
        speech_annotation_results: speech annotations
        video_uri: video location in gcs
        branded_call_to_actions: list of branded call to actions
    Returns:
        call_to_action_speech_eval_details: call to action speech evaluation
    """
    # Feature Call To Action (Speech)
    call_to_action_speech_feature = "Call To Action (Speech)"
    call_to_action_speech = False
    call_to_action_speech_criteria = """A 'Call To Action' phrase is heard or mentioned in the audio or speech
        at any time in the video."""
    call_to_action_speech_eval_details = {
        "feature": call_to_action_speech_feature,
        "feature_description": call_to_action_speech_criteria,
        "feature_detected": call_to_action_speech,
        "llm_details": [],
    }
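    # Add each branded CTA only once; extending the shared list on every call would duplicate entries for each video
    for branded_cta in branded_call_to_actions:
        if branded_cta not in call_to_action_api_list:
            call_to_action_api_list.append(branded_cta)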

    # Video API: Evaluate call_to_action_speech_feature
    if use_annotations:
        if "speech_transcriptions" in speech_annotation_results:
            # Video API: Evaluate call_to_action_speech_feature
            (
                call_to_action_speech,
                na,
            ) = find_elements_in_transcript(
                speech_transcriptions=speech_annotation_results.get(
                    "speech_transcriptions"
                ),
                elements=call_to_action_api_list,
                elements_categories=[],
                apply_condition=False,
            )
        else:
            print(
                f"No Speech annotations found. Skipping {call_to_action_speech_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate call_to_action_speech_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )

        # LLM Only
        prompt = (
            """Is any call to action heard or mentioned in the speech of the video?
            Consider the following criteria for your answer: {criteria}
            Some examples of call to actions are: {call_to_actions}
            Provide the exact timestamp when the call to actions are heard or mentioned in the
            speech of the video.
            {context_and_examples}
        """.replace(
                "{call_to_actions}", ", ".join(call_to_action_api_list)
            )
            .replace("{feature}", call_to_action_speech_feature)
            .replace("{criteria}", call_to_action_speech_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            call_to_action_speech_feature, prompt, llm_params
        )
        if feature_detected:
            call_to_action_speech = True

        # Include llm details
        call_to_action_speech_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

        # Combination of Annotations + LLM
        if use_annotations:
            if "speech_transcriptions" in speech_annotation_results:
                # Evaluate call_to_action_speech_feature
                transcript = get_speech_transcript(
                    speech_annotation_results.get("speech_transcriptions")
                )
                prompt = (
                    """Does the provided speech transcript mention any call to actions in the video?
                        This is the speech transcript: "{transcript}"
                        Consider the following criteria for your answer: {criteria}
                        Some examples of call to actions are: {call_to_actions}
                        {context_and_examples}
                    """.replace(
                        "{call_to_actions}", ", ".join(call_to_action_api_list)
                    )
                    .replace("{transcript}", transcript)
                    .replace("{feature}", call_to_action_speech_feature)
                    .replace("{criteria}", call_to_action_speech_criteria)
                    .replace("{context_and_examples}", context_and_examples)
                )
                # Set modality to text since we are not using video for Annotations + LLM
                llm_params.set_modality({"type": "text"})
                # If transcript is empty, this feature should be False
                if transcript:
                    feature_detected, llm_explanation = detect_feature_with_llm(
                        call_to_action_speech_feature, prompt, llm_params
                    )
                    if feature_detected:
                        call_to_action_speech = True

                    # Include llm details
                    call_to_action_speech_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": llm_explanation,
                        }
                    )
                else:
                    call_to_action_speech = False
                    # Include default details
                    call_to_action_speech_eval_details["llm_details"].append(
                        {
                            "llm_params": llm_params.__dict__,
                            "prompt": prompt,
                            "llm_explanation": "Annotations + LLM: Speech was not found in annotations.",
                        }
                    )
            else:
                print(
                    f"No Speech annotations found. Skipping {call_to_action_speech_feature} evaluation with LLM."
                )

    print(f"{call_to_action_speech_feature}: {call_to_action_speech}")
    call_to_action_speech_eval_details["feature_detected"] = call_to_action_speech

    return call_to_action_speech_eval_details

@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def detect_call_to_action_text(
    text_annotation_results: any,
    video_uri: str,
    branded_call_to_actions: list[str],
) -> dict:
    """Detect Call To Action (Text)
    Args:
        text_annotation_results: text annotations
        video_uri: video location in gcs
        branded_call_to_actions: list of branded call to actions
    Returns:
        call_to_action_text_eval_details: call to action text evaluation
    """
    # Feature Call To Action (Text)
    call_to_action_text_feature = "Call To Action (Text)"
    call_to_action_text = False
    call_to_action_text_criteria = """A 'Call To Action' phrase is detected in the video supers (overlaid text)
        at any time in the video."""
    call_to_action_text_eval_details = {
        "feature": call_to_action_text_feature,
        "feature_description": call_to_action_text_criteria,
        "feature_detected": call_to_action_text,
        "llm_details": [],
    }
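    # Add each branded CTA only once; extending the shared list on every call would duplicate entries for each video
    for branded_cta in branded_call_to_actions:
        if branded_cta not in call_to_action_api_list:
            call_to_action_api_list.append(branded_cta)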

    # Video API: Evaluate call_to_action_text_feature
    if use_annotations:
        if "text_annotations" in text_annotation_results:
            # Video API: Evaluate call_to_action_text_feature
            for text_annotation in text_annotation_results.get("text_annotations"):
                text = text_annotation.get("text")
                found_call_to_actions = [
                    cta
                    for cta in call_to_action_api_list
                    if cta.lower() in text.lower()
                ]
                if len(found_call_to_actions) > 0:
                    call_to_action_text = True
        else:
            print(
                f"No Text annotations found. Skipping {call_to_action_text_feature} evaluation with Video Intelligence API."
            )

    # LLM: Evaluate call_to_action_text_feature
    if use_llms:
        llm_params = LLMParameters(
            model_name=GEMINI_PRO,
            location=llm_location,
            generation_config=llm_generation_config,
        )
        # 1. Evaluate call_to_action_text_feature
        prompt = (
            """Is any call to action detected in any text overlay at any time in the video?
            Consider the following criteria for your answer: {criteria}
            Some examples of call to actions are: {call_to_actions}
            Look through each frame in the video carefully and answer the question.
            Provide the exact timestamp when the call to action is detected in any text overlay in the video.
            {context_and_examples}
        """.replace(
                "{call_to_actions}", ", ".join(call_to_action_api_list)
            )
            .replace("{feature}", call_to_action_text_feature)
            .replace("{criteria}", call_to_action_text_criteria)
            .replace("{context_and_examples}", context_and_examples)
        )
        # Use full video for this feature
        llm_params.set_modality({"type": "video", "video_uri": video_uri})
        feature_detected, llm_explanation = detect_feature_with_llm(
            call_to_action_text_feature, prompt, llm_params
        )
        if feature_detected:
            call_to_action_text = True

        # Include llm details
        call_to_action_text_eval_details["llm_details"].append(
            {
                "llm_params": llm_params.__dict__,
                "prompt": prompt,
                "llm_explanation": llm_explanation,
            }
        )

    print(f"{call_to_action_text_feature}: {call_to_action_text}")
    call_to_action_text_eval_details["feature_detected"] = call_to_action_text

    return call_to_action_text_eval_details
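Both CTA detectors combine the built-in `call_to_action_api_list` with the brand's own CTAs, and the text detector flags a match when a CTA appears as a case-insensitive substring of an overlay. The sketch below shows one way to build the combined list without mutating a shared global, plus the same substring test. `merge_ctas`, `find_ctas_in_text`, the three-item `BASE_CTAS` subset, and the "Order Online" branded CTA are all hypothetical.

```python
from typing import Iterable, List, Optional

# Small subset of the notebook's call_to_action_api_list, for illustration only.
BASE_CTAS = ["LEARN MORE", "SHOP NOW", "BUY NOW"]


def merge_ctas(base: Iterable[str], branded: Iterable[str]) -> List[str]:
    """Return a new list with base CTAs first, then unseen branded CTAs.

    Comparison is case-insensitive and the inputs are never mutated, so
    calling this once per video does not grow a shared list.
    """
    merged: List[str] = []
    seen = set()
    for cta in list(base) + list(branded):
        key = cta.casefold()
        if key not in seen:
            seen.add(key)
            merged.append(cta)
    return merged


def find_ctas_in_text(text: Optional[str], ctas: Iterable[str]) -> List[str]:
    """Case-insensitive substring match, as in detect_call_to_action_text."""
    lowered = (text or "").lower()  # tolerate annotations with no text
    return [cta for cta in ctas if cta.lower() in lowered]


ctas = merge_ctas(BASE_CTAS, ["Shop now", "Order Online"])
print(ctas)  # ['LEARN MORE', 'SHOP NOW', 'BUY NOW', 'Order Online']
print(find_ctas_in_text("Visit us and shop now!", ctas))  # ['SHOP NOW']
print(find_ctas_in_text(None, ctas))  # []
```

Plain substring matching can over-match short verbs (for example "SEE" inside "SEEN"); a word-boundary regular expression would be stricter, at the cost of missing CTAs glued to punctuation-free text.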

## <font color='#4285f4'>Execute ABCD Assessment</font>

### Define Assessment Functions

In [None]:
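def parse_abcd_assessment_results(abcd_assessment: dict) -> list[dict]:
    """Parse ABCD Assessments into per-video summary records
    Args:
        abcd_assessment: dict of video abcd assessments
    Returns:
        result_array: list of dicts with score, result text and feature details per video
    """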

    result_array = []

    for video_assessment in abcd_assessment.get("video_assessments"):
        intermediate_result_text = ""

        video_url = f"/content/{bucket_name}/{brand_name}/videos/{video_assessment.get('video_name')}"
        intermediate_result_text = f"\nAsset name: {video_assessment.get('video_name')}\n"
        passed_features_count = video_assessment.get("passed_features_count")
        total_features = len(video_assessment.get("features"))

        intermediate_result_text = intermediate_result_text + f"Video score: {round(video_assessment.get('score'), 2)}%, adherence ({passed_features_count}/{total_features})\n"

        if video_assessment.get("score") >= 80:
            intermediate_result_text = intermediate_result_text + "Asset result: ✅ Excellent \n"
        elif video_assessment.get("score") >= 65:
            intermediate_result_text = intermediate_result_text + "Asset result: ⚠ Might Improve \n"
        else:
            intermediate_result_text = intermediate_result_text + "Asset result: ❌ Needs Review \n"

        intermediate_result_text = intermediate_result_text + "Evaluated Features:\n"
        for feature in video_assessment.get("features"):
            if feature.get("feature_detected"):
                intermediate_result_text = intermediate_result_text + f" * ✅ {feature.get('feature')}\n"
            else:
                intermediate_result_text = intermediate_result_text + f" * ❌ {feature.get('feature')}\n"

        result_array.append({
            'brand_name': brand_name,
            'video_name': video_assessment.get('video_name'),
            'video_url': video_url,
            'score': video_assessment.get('score'),
            'result_text': intermediate_result_text,
            'passed_features_count': passed_features_count,
            'total_features_count': total_features,
            'features_detail': video_assessment.get('features'),
        })

    return result_array


def execute_abcd_assessment_for_videos():
    """Execute ABCD Assessment for all brand videos in GCS"""

    assessments = {"brand_name": brand_name, "video_assessments": []}

    # Get videos for ABCD Assessment
    brand_videos_folder = f"{brand_name}/videos"
    bucket = get_bucket()
    blobs = bucket.list_blobs(prefix=brand_videos_folder)

    # Video processing
    for video in blobs:
        if video.name == f"{brand_videos_folder}/" or "1st_5_secs" in video.name:
            # Skip the parent folder placeholder and the trimmed 5-second copies
            continue
        video_name, video_name_with_format = get_file_name_from_gcs_url(video.name)
        if not video_name or not video_name_with_format:
            print(f"Video name not resolved for {video.name}... Skipping execution")
            continue
        # Check the video size to avoid sending videos larger than VIDEO_SIZE_LIMIT_MB to the LLM
        video_metadata = bucket.get_blob(video.name)
        size_mb = video_metadata.size / 1e6
        if use_llms and size_mb > VIDEO_SIZE_LIMIT_MB:
            print(
                f"The size of video {video.name} is greater than {VIDEO_SIZE_LIMIT_MB} MB. Skipping execution."
            )
            continue

        print(f"\n\nProcessing ABCD Assessment for video {video.name}...")

        label_annotation_results = {}
        face_annotation_results = {}
        people_annotation_results = {}
        shot_annotation_results = {}
        text_annotation_results = {}
        logo_annotation_results = {}
        speech_annotation_results = {}

        if use_annotations:
            # 2) Download generated video annotations
            (
                label_annotation_results,
                face_annotation_results,
                people_annotation_results,
                shot_annotation_results,
                text_annotation_results,
                logo_annotation_results,
                speech_annotation_results,
            ) = download_video_annotations(brand_name, video_name)

        # 3) Execute ABCD Assessment
        video_uri = f"gs://{bucket_name}/{video.name}"
        features = []

        # Quick pacing
        quick_pacing, quick_pacing_1st_5_secs = detect_quick_pacing(
            shot_annotation_results, video_uri
        )
        features.append(quick_pacing)
        features.append(quick_pacing_1st_5_secs)

        # Dynamic Start
        dynamic_start = detect_dynamic_start(shot_annotation_results, video_uri)
        features.append(dynamic_start)

        # Supers and Supers with Audio
        supers = detect_supers(text_annotation_results, video_uri)
        supers_with_audio = detect_supers_with_audio(
            text_annotation_results, speech_annotation_results, video_uri
        )
        features.append(supers)
        features.append(supers_with_audio)

        # Brand Visuals & Brand Visuals (First 5 seconds)
        (
            brand_visuals,
            brand_visuals_1st_5_secs,
            brand_visuals_logo_big_1st_5_secs,
        ) = detect_brand_visuals(
            text_annotation_results,
            logo_annotation_results,
            video_uri,
            brand_name,
            brand_variations,
        )
        features.append(brand_visuals)
        features.append(brand_visuals_1st_5_secs)

        # Brand Mention (Speech) & Brand Mention (Speech) (First 5 seconds)
        (
            brand_mention_speech,
            brand_mention_speech_1st_5_secs,
        ) = detect_brand_mention_speech(
            speech_annotation_results, video_uri, brand_name, brand_variations
        )
        features.append(brand_mention_speech)
        features.append(brand_mention_speech_1st_5_secs)

        # Product Visuals & Product Visuals (First 5 seconds)
        product_visuals, product_visuals_1st_5_secs = detect_product_visuals(
            label_annotation_results,
            video_uri,
            branded_products,
            branded_products_categories,
        )
        features.append(product_visuals)
        features.append(product_visuals_1st_5_secs)

        # Product Mention (Text) & Product Mention (Text) (First 5 seconds)
        (
            product_mention_text,
            product_mention_text_1st_5_secs,
        ) = detect_product_mention_text(
            text_annotation_results,
            video_uri,
            branded_products,
            branded_products_categories,
        )
        features.append(product_mention_text)
        features.append(product_mention_text_1st_5_secs)

        # Product Mention (Speech) & Product Mention (Speech) (First 5 seconds)
        (
            product_mention_speech,
            product_mention_speech_1st_5_secs,
        ) = detect_product_mention_speech(
            speech_annotation_results,
            video_uri,
            branded_products,
            branded_products_categories,
        )
        features.append(product_mention_speech)
        features.append(product_mention_speech_1st_5_secs)

        # Visible Face (First 5s) & Visible Face (Close Up)
        visible_face_1st_5_secs, visible_face_close_up = detect_visible_face(
            face_annotation_results, video_uri
        )
        features.append(visible_face_1st_5_secs)
        features.append(visible_face_close_up)

        # Presence of People & Presence of People (First 5 seconds)
        presence_of_people, presence_of_people_1st_5_secs = detect_presence_of_people(
            people_annotation_results, video_uri
        )
        features.append(presence_of_people)
        features.append(presence_of_people_1st_5_secs)

        # Audio Early (First 5 seconds)
        audio_speech_early = detect_audio_speech_early(
            speech_annotation_results, video_uri
        )
        features.append(audio_speech_early)

        # Overall Pacing
        overall_pacing = detect_overall_pacing(shot_annotation_results, video_uri)
        features.append(overall_pacing)

        # Call To Action (Speech)
        call_to_action_speech = detect_call_to_action_speech(
            speech_annotation_results, video_uri, branded_call_to_actions
        )
        features.append(call_to_action_speech)

        # Call To Action (Text)
        call_to_action_text = detect_call_to_action_text(
            text_annotation_results, video_uri, branded_call_to_actions
        )
        features.append(call_to_action_text)

        # Calculate ABCD final score
        total_features = len(features)
        passed_features_count = 0
        for feature in features:
            if feature.get("feature_detected"):
                passed_features_count += 1
        # Get score
        score = (passed_features_count * 100) / total_features
        video_assessment = {
            "video_name": video_name_with_format,
            "video_uri": video_uri,
            "features": features,
            "passed_features_count": passed_features_count,
            "score": score,
        }
        assessments.get("video_assessments").append(video_assessment)

        if STORE_ASSESSMENT_RESULTS_LOCALLY:
            # Store assessment results locally
            store_assessment_results_locally(brand_name, video_assessment)

    return assessments


def execute_abcd_detector():
    """Main ABCD Assessment execution"""

    if use_annotations:
        generate_video_annotations(brand_name)

    trim_videos(brand_name)

    abcd_assessments = execute_abcd_assessment_for_videos()
    if len(abcd_assessments.get("video_assessments")) == 0:
        print("There are no videos to display.")

    return abcd_assessments
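`execute_abcd_assessment_for_videos` scores each video as the percentage of its 23 features that were detected, and `parse_abcd_assessment_results` bands that score at 80 and 65. Because each passing feature is worth 100/23, or about 4.35 points, the bands translate to feature counts: 19 or more passing features is Excellent (82.61%), 15 to 18 is Might Improve (65.22% to 78.26%), and 14 or fewer is Needs Review. A sketch of that arithmetic; `abcd_score` and `rating_for_score` are hypothetical helpers that reproduce the formula and thresholds used above.

```python
from typing import Iterable, Mapping


def abcd_score(features: Iterable[Mapping]) -> float:
    """Percentage of features whose "feature_detected" flag is truthy."""
    features = list(features)
    if not features:
        raise ValueError("at least one evaluated feature is required")
    passed = sum(1 for feature in features if feature.get("feature_detected"))
    return passed * 100 / len(features)


def rating_for_score(score: float) -> str:
    """Same 80 / 65 bands as parse_abcd_assessment_results."""
    if score >= 80:
        return "Excellent"
    if score >= 65:
        return "Might Improve"
    return "Needs Review"


# Usage: 18 of 23 features detected
features = [{"feature_detected": i < 18} for i in range(23)]
score = abcd_score(features)
print(round(score, 2), rating_for_score(score))  # 78.26 Might Improve
```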



### Run Assessment

In [None]:
# Run the assessments
assessments = execute_abcd_detector()
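Before any annotation or LLM call, `execute_abcd_assessment_for_videos` skips three kinds of objects under `<brand_name>/videos`: the folder placeholder, the trimmed `1st_5_secs` copies, and (only when LLMs are enabled) files over `VIDEO_SIZE_LIMIT_MB`. The sketch below restates those rules as a pure predicate so they can be checked without GCS access. `should_process` is hypothetical, the object names are made up, and the 7 MB limit in the usage lines is only an example value for `VIDEO_SIZE_LIMIT_MB`, which the notebook defines elsewhere.

```python
def should_process(
    blob_name: str,
    brand_videos_folder: str,
    size_bytes: int,
    size_limit_mb: float,
    use_llms: bool = True,
) -> bool:
    """Return True if a GCS object would be assessed by the loop above."""
    # Skip the "folder" placeholder object itself
    if blob_name == f"{brand_videos_folder}/":
        return False
    # Skip the trimmed 5-second copies, which are assessed via their source video
    if "1st_5_secs" in blob_name:
        return False
    # The size limit only applies when the video is sent to the LLM (1 MB = 1e6 bytes)
    if use_llms and size_bytes / 1e6 > size_limit_mb:
        return False
    return True


folder = "acme/videos"
print(should_process("acme/videos/ad.mp4", folder, 5_000_000, 7))  # True
print(should_process("acme/videos/ad_1st_5_secs.mp4", folder, 1_000_000, 7))  # False
print(should_process("acme/videos/ad.mp4", folder, 8_000_000, 7))  # False
```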

### Parse and Display Results

In [None]:
# Parse ABCD Assessments Results
parsed_results = parse_abcd_assessment_results(assessments)

# Display Assessment Results
result_html = ""
for result in parsed_results:
  # Reformat URI
  video_uri = result['video_url']
  content_index = video_uri.index('/content/') + len('/content/')
  video_uri = 'https://storage.cloud.google.com/' + video_uri[content_index:]

  # Build result HTML
  result_html = result_html + f"""
  <br><br><b>ABCDs Result For: {result['video_name']}</b><br>

  <video width=600 height=337 controls>
    <source src="{video_uri}" type="video/mp4">
  </video>
  """

  split_text_result = result['result_text'].split('\n')
  for line in split_text_result:
    result_html = result_html + f"""
    <p>{line}</p>
    """

HTML(result_html)
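The display cell rewrites each `/content/<bucket>/...` path into a `https://storage.cloud.google.com/...` URL, an authenticated browser URL that generally requires the viewer to be signed in with access to the bucket. The sketch below performs the same conversion and adds HTML escaping, which the cell above does not do, in case a video name or result line contains characters such as `<` or `&`. `to_console_url` and `result_to_html` are hypothetical helpers.

```python
import html

CONTENT_PREFIX = "/content/"


def to_console_url(local_path: str) -> str:
    """Map /content/<bucket>/<object> to a storage.cloud.google.com URL."""
    start = local_path.index(CONTENT_PREFIX) + len(CONTENT_PREFIX)
    return "https://storage.cloud.google.com/" + local_path[start:]


def result_to_html(video_name: str, local_path: str, result_text: str) -> str:
    """Build the same player-plus-summary markup, with every value escaped."""
    url = html.escape(to_console_url(local_path), quote=True)
    parts = [
        f"<br><br><b>ABCDs Result For: {html.escape(video_name)}</b><br>",
        f'<video width=600 height=337 controls><source src="{url}" type="video/mp4"></video>',
    ]
    # One paragraph per line of the text summary, as in the cell above
    parts += [f"<p>{html.escape(line)}</p>" for line in result_text.split("\n")]
    return "\n".join(parts)


print(to_console_url("/content/my-bucket/acme/videos/ad.mp4"))
# https://storage.cloud.google.com/my-bucket/acme/videos/ad.mp4
```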


### Save Results to BigQuery

In [None]:
for res in parsed_results:

  # Extract timestamps for features
  timestamps = ExtractTimestampsFromText(res)

  # Save assessment to BigQuery; text values are bound as query parameters so quotes in names cannot break the SQL
  job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("brand_name", "STRING", res['brand_name']),
        bigquery.ScalarQueryParameter("video_name", "STRING", res['video_name']),
        bigquery.ScalarQueryParameter("video_url", "STRING", res['video_url']),
        bigquery.ScalarQueryParameter("features_detail", "JSON", res['features_detail']),
        bigquery.ScalarQueryParameter("result_text", "STRING", res['result_text']),
        bigquery.ScalarQueryParameter("feature_timestamps", "JSON", timestamps)
    ],
    priority=bigquery.QueryPriority.INTERACTIVE
  )

  sql=f"""INSERT INTO `chocolate-ai-demo-b2kvrbnkb3.chocolate_ai.campaign_abcd_results`
          (assessment_id, assessment_date, brand_name, video_name, video_url, score, result_text, passed_features_count, total_features_count, features_detail, feature_timestamps)
          VALUES(GENERATE_UUID(), CURRENT_TIMESTAMP(), @brand_name, @video_name, @video_url, {res['score']}, @result_text, {res['passed_features_count']}, {res['total_features_count']}, @features_detail, @feature_timestamps);"""

  RunQuery(sql, job_config)

## <font color='#4285f4'>Clean Up</font>

Uncomment the lines below to clean up the resources created by this notebook.

In [None]:
# Unmount bucket
#!fusermount -u /content/$bucket_name

