In [1]:
#@title Install the client libraries
!pip install --upgrade google-cloud-videointelligence
!pip install --upgrade google-auth


Collecting google-cloud-videointelligence
  Downloading google_cloud_videointelligence-2.13.2-py2.py3-none-any.whl (240 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m240.4/240.4 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: google-cloud-videointelligence
Successfully installed google-cloud-videointelligence-2.13.2


Collecting google-auth
  Downloading google_auth-2.28.1-py2.py3-none-any.whl (186 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m186.9/186.9 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[31mERROR: Operation cancelled by user[0m[31m
[0m

In [1]:
## Import the required modules
from google.cloud import storage
from google.cloud import videointelligence
from google.colab import auth

from vertexai.preview.language_models import TextGenerationModel
import io
import json
import time


In [36]:
#@title Specify your Google Cloud Project id & Authenticate
project_id='INSERT-YOUR-PROJECT-ID'
auth.authenticate_user(project_id=project_id)

CalledProcessError: Command '['gcloud', 'config', 'set', 'project', 'INSERT-YOUR-PROJECT-ID']' returned non-zero exit status 1.

In [35]:
#@title Parameters
input_bucket_name = "abcd_input_bucket" #@param
output_bucket_name = "abcd_output_bucket" #@param
speech_language_code = "en-EN" #@param

#change input & output

# thresholds
confidence_threshold = 0.6 #@param
early_time_seconds = 5 #@param

# face early
duration_threshold_seconds = 1 #@param
surface_threshold_percent = 0.05 #@param

# pacing quick
avg_shot_duration_seconds = 2 #@param

# dynamic start
dynamic_cutoff_ms = 3000  #@param
changed_pixels_minimum = 30  # param

# logo big & early, name early
logo_size_threshold = 3.5  #@param
# Opaque entity ID. Some IDs may be available in [Google Knowledge Graph
# Search API](https://developers.google.com/knowledge-graph/).
entity_id = "/g/11cm05x9g0" #@param
entity_desc = "Pixel"
brand_name = "Pixel" #@param

# product early
duration_threshold = 1
# Opaque entity ID. Some IDs may be available in [Google Knowledge Graph
# Search API](https://developers.google.com/knowledge-graph/).
product_entity_id = None #@param
product_entity_desc = "television advertisement" #@param

# face close
face_duration_threshold_seconds = 1 #@param
face_surface_threshold = 0.05 #@param

# overall pacing
overall_shot_pace_threshold = 2 #@param

#clients
storage_client = storage.Client()
input_bucket = storage_client.get_bucket(input_bucket_name)
output_bucket = storage_client.get_bucket(output_bucket_name)

video_client = videointelligence.VideoIntelligenceServiceClient()

NotFound: 404 GET https://storage.googleapis.com/storage/v1/b/abcd_input_bucket?projection=noAcl&prettyPrint=false: The specified bucket does not exist.

In [6]:
## 1) Label Detection: API Request
def label_detection_api_request(video_name, gs_uri):
  """Detects labels in a video."""
  filename = f"labeldetection/labeldetection-{video_name}.json"
  videos = storage_client.list_blobs(output_bucket_name)

  if(videos):
    for v in videos:
      if(v.name == filename):
        # print(f"File already exists: {filename} not sending API request")
        return

  output_uri = f"gs://{output_bucket_name}/{filename}"


  features = [videointelligence.Feature.LABEL_DETECTION]
  operation = video_client.annotate_video(
      request={
          "features": features,
          "input_uri": gs_uri,
          "output_uri": filename,
      }
  )
  print("\nProcessing video for label annotations:")

  result = operation.result(timeout=180)

  # first result is retrieved because a single video was processed
  segment_labels = result.annotation_results[0].shot_label_annotations

  for i, segment_label in enumerate(segment_labels):
      print("Video label description: {}".format(segment_label.entity.description))
      for category_entity in segment_label.category_entities:
          print(
              "\tLabel category description: {}".format(category_entity.description)
          )

      for i, segment in enumerate(segment_label.segments):
          start_time = (
              segment.segment.start_time_offset.seconds
              + segment.segment.start_time_offset.microseconds / 1e6
          )
          end_time = (
              segment.segment.end_time_offset.seconds
              + segment.segment.end_time_offset.microseconds / 1e6
          )
          positions = "{}s to {}s".format(start_time, end_time)
          confidence = segment.confidence
          print("\tSegment {}: {}".format(i, positions))
          print("\tConfidence: {}".format(confidence))
      print("\n")

In [7]:
## 2) Detect Faces: API Request
def faces_detection_api_request(video_name, gs_uri):
    """Detects faces in a video."""
    filename = f"facesdetection/facesdetection-{video_name}.json"
    videos = storage_client.list_blobs(output_bucket_name)

    if(videos):
      for v in videos:
        if(v.name == filename):
          # print(f"File already exists: {filename} not sending API request")
          return

    output_uri = f"gs://{output_bucket_name}/{filename}"

    # Configure the request
    config = videointelligence.FaceDetectionConfig(
        include_bounding_boxes=True, include_attributes=True
    )
    context = videointelligence.VideoContext(face_detection_config=config)

    # Start the asynchronous request
    operation = video_client.annotate_video(
        request={
            "features": [videointelligence.Feature.FACE_DETECTION],
            "input_uri": gs_uri,
            "output_uri": output_uri,
            "video_context": context,
        }
    )

    print("\nProcessing video for face detection annotations.")
    result = operation.result(timeout=300)

    print("\nFinished processing.\n")

    # Retrieve the first result, because a single video was processed.
    annotation_result = result.annotation_results[0]

    for index, annotation in enumerate(annotation_result.face_detection_annotations):
        print("Face detected: " + format(index))
        for track in annotation.tracks:
            print(
                "Segment: {}s to {}s".format(
                    track.segment.start_time_offset.seconds
                    + track.segment.start_time_offset.microseconds / 1e6,
                    track.segment.end_time_offset.seconds
                    + track.segment.end_time_offset.microseconds / 1e6,
                )
            )

            # Each segment includes timestamped faces that include
            # characteristics of the face detected.
            # Grab the first timestamped face
            timestamped_object = track.timestamped_objects[0]
            box = timestamped_object.normalized_bounding_box
            print("Bounding box:")
            print("\tleft  : {}".format(box.left))
            print("\ttop   : {}".format(box.top))
            print("\tright : {}".format(box.right))
            print("\tbottom: {}".format(box.bottom))

            # Attributes include glasses, headwear, smiling, direction of gaze
            print("Attributes:")
            for attribute in timestamped_object.attributes:
                print(
                    "\t{}:{} {}".format(
                        attribute.name, attribute.value, attribute.confidence
                    )
                )

In [8]:
## 3) Shot Detection: API Request
def shot_detection_api_request(video_name, gs_uri):
  filename = f"shotdetection/shotdetection-{video_name}.json"
  videos = storage_client.list_blobs(output_bucket_name)

  if(videos):
    for v in videos:
      if(v.name == filename):
        # print(f"File already exists: {filename} not sending API request")
        return

  output_uri = f"gs://{output_bucket_name}/{filename}"

  """Detects camera shot changes."""
  features = [videointelligence.Feature.SHOT_CHANGE_DETECTION]
  operation = video_client.annotate_video(
      request={"features": features,
              "input_uri": gs_uri,
              "output_uri": output_uri}
  )
  print("\nProcessing video for shot change annotations:")

  result = operation.result(timeout=90)
  print("\nFinished processing.")

  # first result is retrieved because a single video was processed
  for i, shot in enumerate(result.annotation_results[0].shot_annotations):
      start_time = (
          shot.start_time_offset.seconds + shot.start_time_offset.microseconds / 1e6
      )
      end_time = (
          shot.end_time_offset.seconds + shot.end_time_offset.microseconds / 1e6
      )
      print("\tShot {}: {} to {}".format(i, start_time, end_time))



In [9]:
## 4) Detect Text: API Request
def text_detection_api_request(video_name, gs_uri):
  filename = f"textdetection/textdetection-{video_name}.json"
  videos = storage_client.list_blobs(output_bucket_name)

  if(videos):
    for v in videos:
      if(v.name == filename):
        # print(f"File already exists: {filename} not sending API request")
        return

  output_uri = f"gs://{output_bucket_name}/{filename}"

  features = [videointelligence.Feature.TEXT_DETECTION]

  operation = video_client.annotate_video(
      request={"features": features,
               "input_uri": gs_uri,
               "output_uri": output_uri}
  )

  print("\nProcessing video for text detection.")
  result = operation.result(timeout=600)

  # The first result is retrieved because a single video was processed.
  annotation_result = result.annotation_results[0]

  for text_annotation in annotation_result.text_annotations:
      print("\nText: {}".format(text_annotation.text))

      # Get the first text segment
      text_segment = text_annotation.segments[0]
      start_time = text_segment.segment.start_time_offset
      end_time = text_segment.segment.end_time_offset
      print(
          "start_time: {}, end_time: {}".format(
              start_time.seconds + start_time.microseconds * 1e-6,
              end_time.seconds + end_time.microseconds * 1e-6,
          )
      )

      print("Confidence: {}".format(text_segment.confidence))

      # Show the result for the first frame in this segment.
      frame = text_segment.frames[0]
      time_offset = frame.time_offset
      print(
          "Time offset for the first frame: {}".format(
              time_offset.seconds + time_offset.microseconds * 1e-6
          )
      )
      print("Rotated Bounding Box Vertices:")
      for vertex in frame.rotated_bounding_box.vertices:
          print("\tVertex.x: {}, Vertex.y: {}".format(vertex.x, vertex.y))

In [11]:
## 5) Logo Detection: API Request
def logo_detection_api_request(video_name, gs_uri):
    filename = f"logodetection/logodetection-{video_name}.json"
    videos = storage_client.list_blobs(output_bucket_name)

    if(videos):
      for v in videos:
        if(v.name == filename):
          # print(f"File already exists: {filename} not sending API request")
          return

    output_uri = f"gs://{output_bucket_name}/{filename}"

    features = [videointelligence.Feature.LOGO_RECOGNITION]

    operation = video_client.annotate_video(
        request={"features": features,
                 "input_uri": gs_uri,
                 "output_uri": output_uri}
    )

    print("Waiting for operation to complete...")
    response = operation.result()

    # Get the first response, since we sent only one video.
    annotation_result = response.annotation_results[0]

    # Annotations for list of logos detected, tracked and recognized in video.
    for logo_recognition_annotation in annotation_result.logo_recognition_annotations:
        entity = logo_recognition_annotation.entity

        # Opaque entity ID. Some IDs may be available in [Google Knowledge Graph
        # Search API](https://developers.google.com/knowledge-graph/).
        print("Entity Id : {}".format(entity.entity_id))

        print("Description : {}".format(entity.description))

        # All logo tracks where the recognized logo appears. Each track corresponds
        # to one logo instance appearing in consecutive frames.
        for track in logo_recognition_annotation.tracks:
            # Video segment of a track.
            print(
                "\n\tStart Time Offset : {}.{}".format(
                    track.segment.start_time_offset.seconds,
                    track.segment.start_time_offset.microseconds * 1000,
                )
            )
            print(
                "\tEnd Time Offset : {}.{}".format(
                    track.segment.end_time_offset.seconds,
                    track.segment.end_time_offset.microseconds * 1000,
                )
            )
            print("\tConfidence : {}".format(track.confidence))

            # The object with timestamp and attributes per frame in the track.
            for timestamped_object in track.timestamped_objects:
                # Normalized Bounding box in a frame, where the object is located.
                normalized_bounding_box = timestamped_object.normalized_bounding_box
                print("\n\t\tLeft : {}".format(normalized_bounding_box.left))
                print("\t\tTop : {}".format(normalized_bounding_box.top))
                print("\t\tRight : {}".format(normalized_bounding_box.right))
                print("\t\tBottom : {}".format(normalized_bounding_box.bottom))

                # Optional. The attributes of the object in the bounding box.
                for attribute in timestamped_object.attributes:
                    print("\n\t\t\tName : {}".format(attribute.name))
                    print("\t\t\tConfidence : {}".format(attribute.confidence))
                    print("\t\t\tValue : {}".format(attribute.value))

            # Optional. Attributes in the track level.
            for track_attribute in track.attributes:
                print("\n\t\tName : {}".format(track_attribute.name))
                print("\t\tConfidence : {}".format(track_attribute.confidence))
                print("\t\tValue : {}".format(track_attribute.value))

        # All video segments where the recognized logo appears. There might be
        # multiple instances of the same logo class appearing in one VideoSegment.
        for segment in logo_recognition_annotation.segments:
            print(
                "\n\tStart Time Offset : {}.{}".format(
                    segment.start_time_offset.seconds,
                    segment.start_time_offset.microseconds * 1000,
                )
            )
            print(
                "\tEnd Time Offset : {}.{}".format(
                    segment.end_time_offset.seconds,
                    segment.end_time_offset.microseconds * 1000,
                )
            )

In [12]:
## 6) Transcribe the video: API Request
def speech_detection_api_request(video_name, gs_uri):
  filename = f"transcribe/transcribe-{video_name}.json"
  videos = storage_client.list_blobs(output_bucket_name)

  if(videos):
    for v in videos:
      if(v.name == filename):
        # print(f"File already exists: {filename} not sending API request")
        return

  output_uri = f"gs://{output_bucket_name}/{filename}"

  features = [videointelligence.Feature.SPEECH_TRANSCRIPTION]

  config = videointelligence.SpeechTranscriptionConfig(
      language_code=speech_language_code, enable_automatic_punctuation=True
  )
  video_context = videointelligence.VideoContext(speech_transcription_config=config)

  operation = video_client.annotate_video(
      request={
          "features": features,
          "input_uri": gs_uri,
          "output_uri": output_uri,
          "video_context": video_context,
      }
  )

  print("\nProcessing video for speech transcription.")

  result = operation.result(timeout=600)

  # There is only one annotation_result since only
  # one video is processed.
  annotation_results = result.annotation_results[0]
  for speech_transcription in annotation_results.speech_transcriptions:
      # The number of alternatives for each transcription is limited by
      # SpeechTranscriptionConfig.max_alternatives.
      # Each alternative is a different possible transcription
      # and has its own confidence score.
      for alternative in speech_transcription.alternatives:
          print("Alternative level information:")

          print("Transcript: {}".format(alternative.transcript))
          print("Confidence: {}\n".format(alternative.confidence))

          print("Word level information:")
          for word_info in alternative.words:
              word = word_info.word
              start_time = word_info.start_time
              end_time = word_info.end_time
              print(
                  "\t{}s - {}s: {}".format(
                      start_time.seconds + start_time.microseconds * 1e-6,
                      end_time.seconds + end_time.microseconds * 1e-6,
                      word,
                  )
              )

In [13]:
def send_video_intelligence_requests(video_name, gs_uri):
  label_detection_api_request(video_name, gs_uri)
  faces_detection_api_request(video_name, gs_uri)
  shot_detection_api_request(video_name, gs_uri)
  text_detection_api_request(video_name, gs_uri)
  logo_detection_api_request(video_name, gs_uri)
  speech_detection_api_request(video_name, gs_uri)

# ABCD Detector

## ABCD Signals detected for video

In [14]:
#@title 1) Attract: Face Early
#@markdown Is there a human face on screen in the 1st 5 seconds?
#@markdown Definition: At least seen for 1s and with at least 5% of the surface.

def is_face_early(video_name):
  face_early = False
  max_face_surface = 0
  total_time_face_detected = 0

  blob = output_bucket.blob(f"facesdetection/facesdetection-{video_name}.json")

  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))

  annotation_result = data.get("annotation_results")[0]
  if(annotation_result and annotation_result.get("face_detection_annotations")):
    for annotation in annotation_result.get("face_detection_annotations"):
      for track in annotation.get("tracks"):
        start_nanos = track.get("segment").get("start_time_offset").get("nanos")
        start_nanos_safe = start_nanos and start_nanos / 1e9 or 0
        start_seconds_safe = track.get("segment").get("start_time_offset").get("seconds") or 0
        start_time = start_seconds_safe + start_nanos_safe
        end_nanos = track.get("segment").get("end_time_offset").get("nanos")
        end_nanos_safe = end_nanos and end_nanos / 1e9 or 0
        end_seconds_safe = track.get("segment").get("end_time_offset").get("seconds") or 0
        end_time = end_seconds_safe + end_nanos_safe

        confidence = track.get("confidence")

        if(confidence >= confidence_threshold):
          if(start_time < early_time_seconds):
            # Each segment includes timestamped faces that include
            # characteristics of the face detected.
            face_objects = track.get("timestamped_objects")
            for i, face_object in enumerate(face_objects):
              box = face_object.get("normalized_bounding_box")
              time_offset = face_object.get("time_offset")
              offset_nanos = time_offset.get("nanos")
              offset_nanos_safe = offset_nanos and offset_nanos / 1e9 or 0
              offset_seconds_safe = time_offset.get("seconds") or 0
              start_seconds_safe = offset_seconds_safe + offset_nanos_safe

              left = box.get("left") or 0
              right = box.get("right") or 1
              top = box.get("top") or 0
              bottom = box.get("bottom") or 1

              width = right - left
              height = bottom - top
              surface = width * height

              next_offset_nanos_safe = 0
              next_offset_seconds_safe = 0

              next_i = i + 1
              if(next_i < len(face_objects)):
                next_box = face_objects[next_i].get("normalized_bounding_box")
                time_offset = face_objects[next_i].get("time_offset")
                offset_nanos = time_offset.get("nanos")
                next_offset_nanos_safe = offset_nanos and offset_nanos / 1e9 or 0
                next_offset_seconds_safe = time_offset.get("seconds") or 0

              end_seconds_safe = next_offset_seconds_safe + next_offset_nanos_safe

              if(end_seconds_safe > 0
                and surface >= surface_threshold_percent):
                max_face_surface = max(max_face_surface, surface)
                total_time_face_detected += end_seconds_safe - start_seconds_safe

            if(total_time_face_detected > duration_threshold_seconds):
              face_early = True

  return face_early

In [16]:
#@title 2) Attract: Pacing quick
#@markdown Is the pace of the video under 2 seconds per shot in the first 5 seconds?

def is_pacing_quick(video_name):
  pacing_quick = False
  total_time_per_shot = 0
  total_shots = 0

  blob = output_bucket.blob(f"shotdetection/shotdetection-{video_name}.json")

  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))

  # first result is retrieved because a single video was processed
  for i, shot in enumerate(data.get("annotation_results")[0].get("shot_annotations")):
      start_time_microseconds = shot.get("start_time_offset").get("microseconds")
      start_time_microseconds_safe = start_time_microseconds and start_time_microseconds / 1e6 or 0
      end_time_microseconds = shot.get("end_time_offset").get("microseconds")
      end_time_microseconds_safe = end_time_microseconds and end_time_microseconds / 1e6 or 0
      start_time = (
          shot.get("start_time_offset").get("seconds") or 0 + start_time_microseconds_safe
      )
      end_time = (
          shot.get("end_time_offset").get("seconds") or 0 + end_time_microseconds_safe
      )
      total_shot_time = end_time - start_time
      if(start_time < early_time_seconds):
        total_time_per_shot += total_shot_time
        total_shots += 1
      #print("\tShot {}: {} to {}: {} seconds".format(i, start_time, end_time, total_shot_time))

  # print(total_time_per_shot / total_shots)
  if(total_time_per_shot / total_shots <= avg_shot_duration_seconds):
    pacing_quick = True

  return pacing_quick


In [17]:
#@title 3) Attract: Dynamic Start
#@markdown Does the shot changes in less than 3s?

def has_dynamic_start(video_name):
  dynamic_start = False
  blob = output_bucket.blob(f"shotdetection/shotdetection-{video_name}.json")

  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))

  frame_start_ms = 0
  end_time_off_set = data['annotation_results'][0]['shot_annotations'][0]
  nanos = end_time_off_set.get("end_time_offset").get("nanos")
  seconds = end_time_off_set.get("end_time_offset").get("seconds")
  if(nanos):
    if(seconds):
      total_ms_first_shot = (nanos + seconds * 1e9) / 1e6
    else:
      total_ms_first_shot = nanos / 1e6
  else:
    if(seconds):
      total_ms_first_shot = (seconds * 1e9) / 1e6


  if total_ms_first_shot < dynamic_cutoff_ms:
    dynamic_start = True

  return dynamic_start


In [19]:
#@title 4 & 5) Brand: Logo Big & Logo Early
#@markdown Is Logo larger than 3.5% of screen in the first 5 seconds?

def is_logo_big_early(video_name):
  brand_logo_big = False
  brand_logo_early = False

  blob = output_bucket.blob(f"logodetection/logodetection-{video_name}.json")
  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))
  annotation_result = data.get("annotation_results")[0]

  blob_text = output_bucket.blob(f"textdetection/textdetection-{video_name}.json")
  # Download the contents of the blob as a string and then parse it using json.loads() method
  data_text = json.loads(blob_text.download_as_string(client=None))
  annotation_result_text = data_text.get("annotation_results")[0]

  for text_annotation in annotation_result_text.get("text_annotations"):
      if(brand_name in text_annotation.get("text")):
        frames = text_annotation.get("segments")[0].get("frames")
        for frame in frames:
          start_time_seconds = frame.get("time_offset").get("seconds") or 0
          if(start_time_seconds <= early_time_seconds):
            brand_logo_early = True
            coordinates = []
            for vertex in frame.get("rotated_bounding_box").get("vertices"):
              coordinates.append(((float(vertex.get("x"))), float(vertex.get("y"))))
            surface_area = calculate_surface_area(coordinates)
            if(surface_area > logo_size_threshold):
              brand_logo_big = True

  # Annotations for list of logos detected, tracked and recognized in video.
  for logo_recognition_annotation in annotation_result.get("logo_recognition_annotations"):
    entity = logo_recognition_annotation.get("entity")
    # print(f"{entity}")
    if(entity.get("entity_id") == entity_id or entity.get("description") == entity_desc):
      # All logo tracks where the recognized logo appears. Each track corresponds
      # to one logo instance appearing in consecutive frames.
      for track in logo_recognition_annotation.get("tracks"):
        confidence = track.get("confidence")
        # print(f"Confidence: {confidence}")
        if(confidence > confidence_threshold):
          # Video segment of a track.
          seconds = track.get("segment").get("start_time_offset").get("seconds")
          # print(f"Seen at: {seconds}s")
          if(seconds):
            if(seconds <= early_time_seconds):
              brand_logo_early = True
              # The object with timestamp and attributes per frame in the track.
              for timestamped_object in track.get("timestamped_objects"):
                # Normalized Bounding box in a frame, where the object is located.
                normalized_bounding_box = timestamped_object.get("normalized_bounding_box")
                a = normalized_bounding_box.get("bottom") - normalized_bounding_box.get("top")
                b = normalized_bounding_box.get("right") - normalized_bounding_box.get("left")
                if (a * b * 100 > logo_size_threshold):
                  brand_logo_big = True
                  # print(f"Surface is larger than 3.5%: {a*b*100}")

  return brand_logo_big, brand_logo_early

def calculate_surface_area(points):
  if(len(points) != 4):
    return 0
  area1 = 0.5 * abs(points[0][0] * points[1][1] - points[1][0] * points[0][1])
  area2 = 0.5 * abs(points[1][0] * points[2][1] - points[2][0] * points[1][1])
  area3 = 0.5 * abs(points[2][0] * points[3][1] - points[3][0] * points[2][1])
  area4 = 0.5 * abs(points[3][0] * points[0][1] - points[0][0] * points[3][1])

# Add the areas of the four triangles to get the total surface area.
  surface_area = area1 + area2 + area3 + area4
  return surface_area*100

In [20]:
#@title 6) Brand: Product Early
#@markdown Is the product visible in the first 5 seconds?

def is_product_early(video_name):
  brand_product_early = False

  blob = output_bucket.blob(f"labeldetection/labeldetection-{video_name}.json")

  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))

  # Segments are user specified in a request to capture labels during a specific
  # time frame. If unspecified, each video is treated as a single segment.
  segment_labels = data.get("annotation_results")[0].get(
      "segment_label_annotations")

  # Shots change each time a video cut occurs or the contents of the video have
  # changed. When a new shot is detected, labels are annotated for the new shot.
  shot_labels = data.get("annotation_results")[0].get(
      "shot_label_annotations")

  for i, segment_label in enumerate(shot_labels):
      segment_entity_id = segment_label.get("entity").get("entity_id")
      segment_entity_description = segment_label.get("entity").get("description")
      # print(segment_entity_description)
      if(segment_entity_id == product_entity_id or segment_entity_description == product_entity_desc):
        label_description = ("Video label description: {}".format(
            segment_label.get("entity").get("description")))
        for i, segment in enumerate(segment_label.get("segments")):
            confidence = segment.get("confidence")
            # print(f"Confidence: {confidence}")
            if(confidence >= confidence_threshold):
              start_nanos = segment.get("segment").get(
                  "start_time_offset").get(
                      "nanos")
              start_nanos_safe = start_nanos and start_nanos / 1e9 or 0
              start_seconds_safe = segment.get("segment").get("start_time_offset").get("seconds") or 0
              start_time = start_seconds_safe + start_nanos_safe

              """end_nanos = segment.get("segment").get(
                  "end_time_offset").get(
                      "nanos")
              end_nanos_safe = end_nanos and end_nanos / 1e9 or 0
              end_seconds_safe = segment.get("segment").get("end_time_offset").get("seconds") or 0
              end_time = end_seconds_safe + end_nanos_safe"""
              if(start_time <= early_time_seconds):
                brand_product_early = True
                positions = "{}s to {}s".format(start_time, end_time)


  return brand_product_early


In [21]:
#@title 7) Brand: Name Early
#@markdown Is there text on screen for the brand in the first 5 seconds?

def is_name_early(video_name):
  brand_name_early = False

  blob = output_bucket.blob(f"textdetection/textdetection-{video_name}.json")
  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))

  # The first result is retrieved because a single video was processed.
  annotation_result = data.get("annotation_results")[0]


  for text_annotation in annotation_result.get("text_annotations"):
      if(brand_name in text_annotation.get("text")):
        frames = text_annotation.get("segments")[0].get("frames")
        for frame in frames:
          start_time_seconds = frame.get("time_offset").get("seconds") or 0
          if(start_time_seconds <= early_time_seconds):
            brand_name_early = True


  return brand_name_early

In [23]:
#@title 8) Connect: Face Close
#@markdown Is there a close up of the human face?

def is_face_close(video_name):
  connect_face_close = False

  max_face_surface = 0
  face_coverage = 0
  total_time_face_detected = 0

  blob = output_bucket.blob(f"facesdetection/facesdetection-{video_name}.json")
  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))
  annotation_result = data.get("annotation_results")[0]

  video_length_seconds_safe = annotation_result.get("segment").get("end_time_offset").get("seconds") or 0
  video_length_nanos_safe = annotation_result.get("segment").get("end_time_offset").get("nanos") / 1e9 or 0
  video_length = video_length_seconds_safe + video_length_nanos_safe

  # print(f"Video Length: {video_length}s")

  if(annotation_result.get("face_detection_annotations")):
    for annotation in annotation_result.get("face_detection_annotations"):
      for track in annotation.get("tracks"):
        start_nanos = track.get("segment").get("start_time_offset").get("nanos")
        start_nanos_safe = start_nanos and start_nanos / 1e9 or 0
        start_seconds_safe = track.get("segment").get("start_time_offset").get("seconds") or 0
        start_time = start_seconds_safe + start_nanos_safe
        end_nanos = track.get("segment").get("end_time_offset").get("nanos")
        end_nanos_safe = end_nanos and end_nanos / 1e9 or 0
        end_seconds_safe = track.get("segment").get("end_time_offset").get("seconds") or 0
        end_time = end_seconds_safe + end_nanos_safe
        face_coverage += (end_time - start_time)

        if(track.get("confidence") >= confidence_threshold):
          # print(track.get("confidence"))
          # Each segment includes timestamped faces that include
          # characteristics of the face detected.
          face_objects = track.get("timestamped_objects")
          for i, face_object in enumerate(face_objects):
            box = face_object.get("normalized_bounding_box")
            time_offset = face_object.get("time_offset")
            # print(time_offset)
            offset_nanos = time_offset.get("nanos")
            offset_nanos_safe = offset_nanos and offset_nanos / 1e9 or 0
            offset_seconds_safe = time_offset.get("seconds") or 0
            box_start_seconds_safe = offset_seconds_safe + offset_nanos_safe

            left = box.get("left") or 0
            right = box.get("right") or 1
            top = box.get("top") or 0
            bottom = box.get("bottom") or 1

            width = right - left
            height = bottom - top
            surface = width * height

            next_offset_nanos_safe = 0
            next_offset_seconds_safe = 0

            next_i = i + 1
            if(next_i < len(face_objects)):
              next_box = face_objects[next_i].get("normalized_bounding_box")
              time_offset = face_objects[next_i].get("time_offset")
              offset_nanos = time_offset.get("nanos")
              next_offset_nanos_safe = offset_nanos and offset_nanos / 1e9 or 0
              next_offset_seconds_safe = time_offset.get("seconds") or 0

            box_end_seconds_safe = next_offset_seconds_safe + next_offset_nanos_safe
            # print(f"surface: {surface} vs limit: {surface_threshold}")
            if(box_end_seconds_safe > 0 and surface >= face_surface_threshold):
              max_face_surface = max(max_face_surface, surface)
              total_time_face_detected += box_end_seconds_safe - box_start_seconds_safe
  # print(f"Total closeup face time detected : {total_time_face_detected}s")

  if(total_time_face_detected > face_duration_threshold_seconds):
    connect_face_close = True

  return connect_face_close

In [24]:
#@title 9) Connect: Overall Pacing
#@markdown Is the pace of video greater than 2 seconds per shot?

def is_overall_pacing(video_name):
  overall_pacing = False
  total_time_per_shot = 0
  total_shots = 0

  blob = output_bucket.blob(f"shotdetection/shotdetection-{video_name}.json")

  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))

  # first result is retrieved because a single video was processed
  for i, shot in enumerate(data.get("annotation_results")[0].get("shot_annotations")):
      # print(shot)
      start_time_microseconds = shot.get("start_time_offset").get("microseconds")
      start_time_microseconds_safe = start_time_microseconds and start_time_microseconds / 1e6 or 0
      end_time_microseconds = shot.get("end_time_offset").get("microseconds")
      end_time_microseconds_safe = end_time_microseconds and end_time_microseconds / 1e6 or 0
      start_time = (
          shot.get("start_time_offset").get("seconds") or 0 + start_time_microseconds_safe
      )
      end_time = (
          shot.get("end_time_offset").get("seconds") or 0 + end_time_microseconds_safe
      )
      total_shot_time = end_time - start_time
      total_time_per_shot += total_shot_time
      total_shots += 1
      # print("\tShot {}: {} to {}: {} seconds".format(i, start_time, end_time, total_shot_time))

  # print(total_time_per_shot / total_shots)
  if(total_time_per_shot / total_shots <= overall_shot_pace_threshold):
    overall_pacing = True

  return overall_pacing


In [25]:
#@title 10) Direct: Audio Early
#@markdown Is speech detected in the audio in the first 5 seconds?
def is_audio_early(video_name):
  direct_audio_early = False
  blob = output_bucket.blob(f"transcribe/transcribe-{video_name}.json")

  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))
  annotation_results = data.get("annotation_results")[0]

  speech_transcriptions = annotation_results.get("speech_transcriptions")
  if(speech_transcriptions):
    for speech_transcription in annotation_results.get("speech_transcriptions"):
        for alternative in speech_transcription.get("alternatives"):
          if(alternative.get("confidence")):
            if(alternative.get("confidence") >= confidence_threshold):
              if(alternative.get("words")):
                for word_info in alternative.get("words"):
                    word = word_info.get("word")
                    start_time = word_info.get("start_time")
                    start_time_seconds = start_time.get("seconds") or 0
                    if(start_time_seconds <= early_time_seconds):
                      direct_audio_early = True

  return direct_audio_early


In [26]:
#@title 11) Direct: Call To Action
#@markdown Is there a call to action phrase detected on the video in the speech or text?

def is_call_to_action(video_name):
  model = TextGenerationModel.from_pretrained("text-bison")

  # https://developers.google.com/google-ads/api/rest/reference/rest/latest/CallToActionType?hl=en
  call_to_action_text_api_list = ['LEARN MORE','GET QUOTE', 'APPLY NOW', 'SIGN UP', 'CONTACT US', 'SUBSCRIBE', 'DOWNLOAD', 'BOOK NOW', 'SHOP NOW', 'BUY NOW', 'DONATE NOW', 'ORDER NOW', 'PLAY NOW', 'SEE MORE', 'START NOW', 'VISIT SITE', 'WATCH NOW']
  call_to_action_text_api_list_lower = [x.lower() for x in call_to_action_text_api_list]
  call_to_action_text_verbs_list = ['LEARN','QUOTE', 'APPLY', 'SIGN UP', 'CONTACT', 'SUBSCRIBE', 'DOWNLOAD', 'BOOK', 'SHOP', 'BUY', 'DONATE', 'ORDER', 'PLAY', 'SEE', 'START', 'VISIT', 'WATCH']
  call_to_action_text_verbs_list_lower = [x.lower() for x in call_to_action_text_verbs_list]

  call_to_action_early = False
  call_to_action_speech = False
  call_to_action_text = False

  call_to_action_speech = detect_call_to_action_speech(video_name, model, call_to_action_text_verbs_list_lower)
  call_to_action_text = detect_call_to_action_text(video_name, model, call_to_action_text_verbs_list_lower)

  return call_to_action_speech, call_to_action_text

def prompt_llm_cta(model, transcript):
  parameters = {
      "temperature": 0.2,
      "max_output_tokens": 1,
      "top_p": 0.8,
      "top_k": 40
      }
  #todo: how would this work for different languages.
  prompt = 'Is there a call to action in my ad: "' + transcript + '"? If the transcript is not in English, first translate it to English then do the check for call to action. Show the English version of the text too.'
  print('prompt: ', prompt)

  model_output = model.predict(prompt, **parameters).text

  time.sleep(1) #in order not to exceed text-bison quota of 60 requests
  # per minute (quota increase could be requested)

  print('llm said:', model_output)
  if(('yes' in model_output.lower())):
    return True
  else:
    return False

def detect_call_to_action_speech(video_name, model, cta_list):
  blob = output_bucket.blob(f"transcribe/transcribe-{video_name}.json")

  # Download the contents of the blob as a string and then parse it using json.loads() method
  data = json.loads(blob.download_as_string(client=None))
  annotation_results = data.get("annotation_results")[0]

  speech_transcriptions = annotation_results.get("speech_transcriptions")
  if(speech_transcriptions):
    for speech_transcription in speech_transcriptions:
        for alternative in speech_transcription.get("alternatives"):
          transcript = alternative.get("transcript")
          if(alternative.get("confidence")):
            if(alternative.get("confidence") >= confidence_threshold):
              for word in transcript.split():
                # print(word)
                for cta in cta_list:
                  if(word.lower() == cta):
                    return True
              # if(prompt_llm_cta(model, transcript)):
                # return True
  else:
    return False


def detect_call_to_action_text(video_name, model, cta_list):
  blob_text = output_bucket.blob(f"textdetection/textdetection-{video_name}.json")
  # Download the contents of the blob as a string and then parse it using json.loads() method
  data_text = json.loads(blob_text.download_as_string(client=None))
  annotation_result_text = data_text.get("annotation_results")[0]

  # print("Text on screen: ")
  for text_annotation in annotation_result_text.get("text_annotations"):
    words = text_annotation.get("text")
    for word in words.split():
      for cta in cta_list:
        if(word.lower() == cta):
          # print(word)
          return True
    #if(prompt_llm_cta(model, words)):
    # return True
  return False


In [27]:
def colored(r, g, b, text):
    return f"\033[38;2;{r};{g};{b}m{text}\033[0m"

def print_red(text):
  print(colored(255, 0, 0, text))

def print_green(text):
  print(colored(0, 128, 0, text))

def print_boolean(bool_flag, text):
  if(bool_flag):
    print_green(text)
  else:
    print_red(text)


In [28]:
def save_json_file(json_object, filename):
    blob = output_bucket.blob(filename)
    blob.upload_from_string(
        data=json_object,
        content_type='application/json'
        )
    result = filename + ' upload complete'
    print(result)

In [29]:
def get_jsonl_data(json_object):
    data = ""
    for entry in json_object:
      data+=json.dumps(entry)
      data+="\n"
    return data

In [31]:
def detect_abcds(video_name):
  score = 0

  abcd_names = [
                "attract_face_early",
                "attract_pacing_quick",
                "attract_dynamic_start",
                "brand_logo_big",
                "brand_logo_early",
                "brand_product_early",
                "brand_name_early",
                "connect_face_close",
                "connect_overall_pacing",
                "direct_audio_early",
                "direct_cta_speech",
                "direct_cta_text"]

  abcd_values = [
      is_face_early(video_name),
      is_pacing_quick(video_name),
      has_dynamic_start(video_name),
      list(is_logo_big_early(video_name))[0],
      list(is_logo_big_early(video_name))[1],
      is_product_early(video_name),
      is_name_early(video_name),
      is_face_close(video_name),
      is_overall_pacing(video_name),
      is_audio_early(video_name),
      list(is_call_to_action(video_name))[0],
      list(is_call_to_action(video_name))[1]]

  abcd_dict = dict(zip(abcd_names, abcd_values))

  for abcd in abcd_values:
    if(abcd):
      score += 1

  abcd_dict["video_name"] = video_name
  abcd_dict["score"] = score/12*100

  print(f"Video: {abcd_dict.get('video_name')}")
  print(f"Total score: {abcd_dict.get('score')}%")
  print_boolean(
      abcd_dict.get('attract_face_early'),
      f"1) Attract: Face Early                      : {abcd_dict.get('attract_face_early')}")
  print_boolean(
      abcd_dict.get('attract_pacing_quick'),
      f"2) Attract: Pace Quick                      : {abcd_dict.get('attract_pacing_quick')}")
  print_boolean(
      abcd_dict.get('attract_dynamic_start'),
      f"3) Attract: Dynamic Start                   : {abcd_dict.get('attract_dynamic_start')}")
  print_boolean(
      abcd_dict.get('brand_logo_big'),
      f"4) Brand: Logo Big                          : {abcd_dict.get('brand_logo_big')}")
  print_boolean(
      abcd_dict.get('brand_logo_early'),
      f"5) Brand: Logo Early                        : {abcd_dict.get('brand_logo_early')}")
  print_boolean(
      abcd_dict.get('brand_product_early'),
      f"6) Brand: Product Early                     : {abcd_dict.get('brand_product_early')}")
  print_boolean(
      abcd_dict.get('brand_name_early'),
      f"7) Brand: Name Early                        : {abcd_dict.get('brand_name_early')}")
  print_boolean(
      abcd_dict.get('connect_face_close'),
      f"8) Connect: Face Close                      : {abcd_dict.get('connect_face_close')}")
  print_boolean(
      abcd_dict.get('connect_overall_pacing'),
      f"9) Connect: Overall Pacing                  : {abcd_dict.get('connect_overall_pacing')}")
  print_boolean(
      abcd_dict.get('direct_audio_early'),
      f"10) Direct: Audio Early                     : {abcd_dict.get('direct_audio_early')}")
  print_boolean(
      abcd_dict.get('direct_cta_speech'),
      f"11a) Direct: Call to Action (speech)        : {abcd_dict.get('direct_cta_speech')}")
  print_boolean(
      abcd_dict.get('direct_cta_text'),
      f"11b) Direct: Call to Action (text on screen): {abcd_dict.get('direct_cta_text')}")

  return abcd_dict

In [34]:
#@title Summary ABCD
videos = storage_client.list_blobs(input_bucket_name)
abcd_jsons = []

# Note: The call returns a response only when the iterator is consumed.
for video in videos:
  video_name = video.name
  gs_uri = "gs://" + input_bucket_name + "/" + video_name
  video_url = '/content/' + input_bucket_name + '/' + video_name

  send_video_intelligence_requests(video_name, gs_uri)
  result = detect_abcds(video_name)
  abcd_jsons.append(result)


abcd_jsonls = get_jsonl_data(abcd_jsons)
filename = "final_abcd_report.json"
save_json_file(abcd_jsonls, filename)

Video: 231006_AffordabilityA_Hokkaido _video_15s_16x9_Affordability_Single Digital.mp4
Total score: 8.333333333333332%
[38;2;255;0;0m1) Attract: Face Early                      : False[0m
[38;2;255;0;0m2) Attract: Pace Quick                      : False[0m
[38;2;255;0;0m3) Attract: Dynamic Start                   : False[0m
[38;2;255;0;0m4) Brand: Logo Big                          : False[0m
[38;2;255;0;0m5) Brand: Logo Early                        : False[0m
[38;2;255;0;0m6) Brand: Product Early                     : False[0m
[38;2;255;0;0m7) Brand: Name Early                        : False[0m
[38;2;0;128;0m8) Connect: Face Close                      : True[0m
[38;2;255;0;0m9) Connect: Overall Pacing                  : False[0m
[38;2;255;0;0m10) Direct: Audio Early                     : False[0m
[38;2;255;0;0m11a) Direct: Call to Action (speech)        : False[0m
[38;2;255;0;0m11b) Direct: Call to Action (text on screen): False[0m
Video: DA_Nov23_central-woltpl