Copyright 2025 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# Video Ads Compass

Video Ads Compass is a self-service solution that aims to help advertisers predict whether their video ads will be flagged for Google Ads Policy Violation. This tool relies on policy rules and best practices curated by the user based on their experience.
The Video Ads Compass tool will analyze the videos against the provided rules, and will determine if and when each rule was violated in the video ad.
Clients can use this output to avoid video ads being flagged for policy violation.

## Requirements

1.  A GCP project with billing attached.
2.  The Vertex AI API enabled.
3.  A policy rules file in CSV format, named `rules.csv`, with the following columns: `Rule ID, Rule Description`

## Usage

1.  Make a copy of this Colab notebook.
2.  Upload the rules file to the "Files" tab on the left side.
3.  Have your videos in either a GCP Cloud Storage Bucket or a Google Drive folder.
4.  It is recommended to use a shortened version of the videos, containing only the first minute.
5.  Run the Colab cells one by one and enter configuration details where needed.
6.  If using a Google Drive folder as the source of videos, you need to provide the base path of this folder, depending on whether it's your drive or a shared drive, using one of these formats:

    * `MyDrive/Path/To/Folder`
    * `Shareddrive/Path/To/Folder`

7.  When finished, you will receive a Google Spreadsheet URL to access the tool's output.

# Setup

In [None]:
#@title Installations

!pip3 install --upgrade --user google-cloud-aiplatform
!pip3 install ffmpeg-python

In [None]:
#@title Imports
# Standard library imports
import json
import os
import sys
from datetime import datetime
from typing import Dict, TypedDict, List, Optional, Union

# Third-party library imports
from google.auth import default
from google.colab import auth, drive
from google.cloud import storage
import gspread
import pandas as pd
import vertexai
from vertexai.preview.generative_models import (
    GenerationConfig,
    GenerativeModel,
    Part,
)


In [None]:
#@title Configs & Setup

# User configuration. These are Colab form fields: edit the values, but keep
# the trailing `#@param` markers intact.
# PROJECT_ID and LOCATION are passed to vertexai.init() below.
PROJECT_ID = "" #@param {type:"string"}
LOCATION = "us-central1" #@param {type:"string"}
# Model name handed to GenerativeModel below.
MODEL = "gemini-2.0-flash-001" #@param {type:"string"}
# Selects where the videos are listed from: "DRIVE" or "GCS".
VIDEO_SOURCE = "DRIVE" #@param {type:"string"} ["DRIVE", "GCS"]
BUCKET_NAME = "" #@param {type:"string", placeholder:"Enter if using GCS as source"}
DRIVE_BASE_PATH = "" #@param {type:"string", placeholder:"Enter if using Google Drive as source"}

# Google Drive is mounted under /content/drive, so the user-supplied base path
# is appended to that prefix to form the folder that gets scanned for videos.
_DRIVE_PATH_PREFIX = "/content/drive/"
DRIVE_PATH = _DRIVE_PATH_PREFIX + DRIVE_BASE_PATH

# Authenticate the Colab user, then initialise Vertex AI for the configured
# project/location and create the model object used for the analysis.
auth.authenticate_user()
vertexai.init(project=PROJECT_ID, location=LOCATION)
generative_model = vertexai.generative_models.GenerativeModel(MODEL) # use the fully-qualified name

def list_video_files_gcs(bucket_name: str, storage_client=None) -> List[str]:
    """Lists MP4 files within the specified GCS bucket.

    Args:
        bucket_name: Name of the bucket to scan (without the ``gs://`` prefix).
        storage_client: Optional object exposing ``list_blobs(bucket_name)``.
            When omitted, the module-level ``client`` is used.

    Returns:
        A ``gs://<bucket_name>/<blob name>`` URI for every blob whose name
        ends with ``.mp4`` (case-insensitive), in listing order.
    """
    gcs = client if storage_client is None else storage_client
    return [
        f'gs://{bucket_name}/{blob.name}'
        for blob in gcs.list_blobs(bucket_name)
        # Match on the extension only, consistent with the Drive listing;
        # a substring test would also pick up names like "mp4_notes.txt".
        if blob.name.lower().endswith('.mp4')
    ]

def list_video_files_drive(base_path: str) -> List[str]:
    """Collects the MP4 files found under a mounted Google Drive folder.

    The folder is walked recursively and file extensions are compared
    case-insensitively.

    Args:
        base_path: Folder to scan.

    Returns:
        Full paths of all ``.mp4`` files below ``base_path``; an empty list
        (after printing a warning) when ``base_path`` does not exist.
    """
    if os.path.exists(base_path):
        return [
            os.path.join(folder, name)
            for folder, _, names in os.walk(base_path)
            for name in names
            if name.lower().endswith('.mp4')
        ]

    print(f"WARNING: Path does not exist: {base_path}")
    return []


# Build the list of video URIs from the configured source.
if VIDEO_SOURCE == "GCS":
    from google.cloud import storage
    # Kept as a module-level name: list_video_files_gcs reads `client` as a global.
    client = storage.Client()
    all_uris = list_video_files_gcs(BUCKET_NAME)

elif VIDEO_SOURCE == "DRIVE":
    from google.colab import drive
    try:
        drive.mount('/content/drive')
        print("Google Drive mounted successfully.")
    except Exception as e:
        print(f"Error mounting Google Drive: {e}")
        print("Make sure you've authorized Google Drive access.")
        sys.exit(1)
    all_uris = list_video_files_drive(DRIVE_PATH)

else:
    # Fail with a clear message instead of a NameError on `all_uris` below.
    raise ValueError(
        f'Unsupported VIDEO_SOURCE {VIDEO_SOURCE!r}; expected "DRIVE" or "GCS".'
    )

if not all_uris:
    print(f"WARNING: No MP4 videos were found in the {VIDEO_SOURCE} source.")

# Key every video by its position in the listing ("0", "1", ...).
all_uri_map = {str(position): uri for position, uri in enumerate(all_uris)}


print('All URIs map:')
print(all_uri_map)

print("Script finished.")

# Shorten Videos
### 🛑 Only run this section if you need to create a shortened version of your videos

This section will create shortened versions (1 minute by default) of all the videos in the video source you provided (DRIVE / GCS).

In [None]:
#@title Shorten videos code
import ffmpeg
import os
import tqdm

# Length, in seconds, of each shortened video (Colab form field; passed to
# ffmpeg as the `t` input option).
new_length_seconds = 60 #@param {type: "number"}
# File-name prefix that marks a shortened copy of a video.
_SHORT_PREFIX = "SHORT_"

def create_short_videos(video_dict):
    """Trims every listed video to ``new_length_seconds`` and maps keys to the trimmed copies.

    For each source video a sibling named ``SHORT_<file name>`` is created in
    the same location (same Drive folder, or same "folder" inside the
    ``BUCKET_NAME`` bucket). A video whose shortened copy already exists is
    not re-encoded, but it is still included in the returned map.

    Args:
        video_dict: Mapping of video key -> video URI (a local Drive path or
            a ``gs://`` URI). Entries that already point at a ``SHORT_`` file
            are resolved back to their source file name.

    Returns:
        Mapping of video key -> URI of the shortened video, keeping the full
        location of the source so the result can be opened/fetched directly.
        When several entries resolve to the same shortened video, only the
        first key is kept.
    """
    import tempfile  # Local import: only the GCS branch needs scratch space.

    shorten_uri_map = {}
    known_short_uris = set()

    # Blob names are the URI with this prefix removed.
    gcs_prefix = f"gs://{BUCKET_NAME}/"
    # One client/bucket handle for the whole run instead of one per video.
    bucket = storage.Client().bucket(BUCKET_NAME) if VIDEO_SOURCE == "GCS" else None

    for key, uri in tqdm.tqdm(video_dict.items(), desc="Processing Videos"):
        # Split on the last "/" only, so the full parent location is preserved.
        parent, sep, listed_name = uri.rpartition("/")
        # Strip the marker only when it is a real prefix, not anywhere in the name.
        file_name = listed_name.removeprefix(_SHORT_PREFIX)
        short_file_name = _SHORT_PREFIX + file_name
        source_uri = parent + sep + file_name
        short_uri = parent + sep + short_file_name

        if short_uri not in known_short_uris:
            known_short_uris.add(short_uri)
            shorten_uri_map[key] = short_uri

        if VIDEO_SOURCE == "DRIVE":
            if os.path.exists(short_uri):
                tqdm.tqdm.write(f"Shortened video for '{file_name}' already exists. Skipping...")
                continue

            # Read from / write next to the source file, which may live in a
            # sub-folder of DRIVE_PATH.
            ffmpeg.input(source_uri, t=new_length_seconds).output(short_uri).run()

        elif VIDEO_SOURCE == "GCS":
            short_blob = bucket.blob(short_uri.removeprefix(gcs_prefix))

            if short_blob.exists():
                tqdm.tqdm.write(f"Shortened video for '{file_name}' already exists. Skipping...")
                continue

            # Download, trim and upload inside a scratch directory that is
            # removed even if one of the steps fails.
            with tempfile.TemporaryDirectory() as scratch_dir:
                local_source = os.path.join(scratch_dir, file_name)
                local_short = os.path.join(scratch_dir, short_file_name)

                bucket.blob(source_uri.removeprefix(gcs_prefix)).download_to_filename(local_source)
                ffmpeg.input(local_source, t=new_length_seconds).output(local_short).run()
                short_blob.upload_from_filename(local_short)

    return shorten_uri_map

# Replace the URI map with the one returned for the shortened videos, so the
# analysis cells below operate on the shortened copies.
all_uri_map = create_short_videos(all_uri_map)

# Analyze

In [None]:
#@title Load prompt and schema

# Instruction prompt sent to the model together with each video. The field
# names in "Output Format" mirror the keys enforced by the response schema
# and consumed when the results table is built.
prompt = """
You are an experienced advertising content reviewer. You specialize in determining whether or not a video ad violates google ads policy or not.
Please conduct a strict content review of the video ad provided according to the detailed review tags - added below in csv format.
Your judgment will directly affect the launch and promotion of this video ad, so please take every detail seriously.
Based on the attached policy rules, analyze the video and for each policy rule determine if it violates it or not. If it does violate it, score the seriousness of the violation on a scale of 1 to 5, explain the reason for your decision, and provide the timestamp for the violation part.

Video Ad Policy Review Instructions:
- Carefully watch the entire video ad from start to finish
- Analyze each frame and content element against the attached policy rules
- Be thorough and objective in your assessment
- Consider context, intent, and potential viewer interpretation

Detailed Review Methodology:
1. For each policy rule, provide a comprehensive analysis
2. If a rule is violated, clearly document:
   - Specific timestamp(s) of violation
   - Exact content causing the violation
   - Visual or contextual details
3. Assess violation severity and potential impact

Output Format (for each rule):
{
    "rule_index": int,                 # Policy rule index number
    "rule_violation": bool,             # Whether rule is violated
    "violation_score": int,             # Severity (1-5)
    "violation_reason": str,            # Detailed explanation
    "violation_time": str,              # Specific timestamp of violation
}

Final Report Requirements:
- Overall Compliance Assessment: Provide a summary percentage of policy compliance
- The answer for each rule according to the provided format


Here are the review tags, in a CSV format:


"""

# Append the user's policy rules (rules.csv, uploaded to the Files tab) to the
# prompt. "utf-8-sig" reads plain UTF-8 and also drops a leading BOM, which
# spreadsheet exports commonly add.
try:
    with open('rules.csv', 'r', encoding='utf-8-sig') as file:
        prompt += file.read()
except FileNotFoundError:
    print("rules.csv not found in the Files tab.")


# Schema of one per-rule verdict; every field is mandatory.
rule_schema = dict(
    type="object",
    properties=dict(
        rule_index=dict(type="integer"),
        rule_violation=dict(type="boolean"),
        violation_score=dict(type="integer"),
        violation_reason=dict(type="string"),
        violation_time=dict(type="string"),
    ),
    required=[
        "rule_index",
        "rule_violation",
        "violation_score",
        "violation_reason",
        "violation_time",
    ],
)

# Schema of the whole model response: an overall score plus the list of
# per-rule verdicts described by rule_schema.
response_schema = dict(
    type="object",
    properties=dict(
        # "number" is the schema type used for floats.
        overall_compliance_assessment=dict(type="number"),
        rules=dict(type="array", items=rule_schema),
    ),
    required=["overall_compliance_assessment", "rules"],
)


In [None]:
#@title Load Video Analysis Functions

# Disclaimer written into cell A1 of the output spreadsheet by
# upload_df_to_gsheets.
disclaimer_text = """
This analysis is for reference only and does not constitute the final
interpretation or enforcement of Google Ads policies.
Whether or not an ad violates the policy is still subject to the
latest version of Google Ads policies.
"""

def analyze_video(file_path: str) -> Optional[str]:
    """Analyzes a video with Gemini and returns the raw JSON response text.

    Args:
        file_path: Either a local path to an MP4 file (e.g. on the mounted
            Drive) or a ``gs://`` URI of an MP4 object in Cloud Storage.

    Returns:
        The model's response text (JSON constrained by ``response_schema``),
        or None when reading the video or calling the model fails; the error
        is printed in that case.
    """

    gen_config = GenerationConfig(
        response_mime_type="application/json",
        response_schema=response_schema,
    )

    try:
        if file_path.startswith("gs://"):
            # Cloud Storage objects cannot be open()ed locally; pass the URI
            # to the model by reference instead.
            video_part = Part.from_uri(file_path, mime_type="video/mp4")
        else:
            # Local file: send the video bytes inline.
            with open(file_path, "rb") as f:
                video_part = Part.from_data(data=f.read(), mime_type="video/mp4")

        model_response = generative_model.generate_content(
            [prompt, video_part],
            generation_config=gen_config,
        )
        return model_response.text

    except Exception as e:
        # Best effort per video: report the failure and let the caller move on.
        print(f"Error analyzing video at {file_path}: {e}")
        return None



def process_videos_and_create_df(all_uri_map: Dict[str, str]) -> pd.DataFrame:
    """
    Analyzes every video and flattens the per-rule verdicts into one table.

    Each rule returned for a video becomes one row, tagged with the video's
    key, its URI and the video's overall compliance assessment. Videos that
    yield no response, or whose response cannot be processed, contribute no
    rows (processing errors are printed).

    Args:
        all_uri_map: Dictionary mapping keys to local file paths of videos.

    Returns:
        A pandas DataFrame with one row per (video, rule); an empty DataFrame
        when nothing could be analyzed.
    """
    output_columns = [
        'video_type', 'video_key', 'video_uri', 'overall_compliance_assessment',
        'rule_index', 'rule_violation', 'violation_score', 'violation_reason', 'violation_time',
    ]
    flattened_rows = []

    for video_key, video_uri in all_uri_map.items():
        try:
            response_text = analyze_video(video_uri)
            if not response_text:
                continue

            parsed = json.loads(response_text)
            # Rows are built in full before being added, so a malformed
            # response never leaves a partial set of rows behind.
            flattened_rows.extend([
                {
                    **rule_entry,
                    'video_type': 'all',
                    'video_key': video_key,
                    'video_uri': video_uri,
                    'overall_compliance_assessment': parsed['overall_compliance_assessment'],
                }
                for rule_entry in parsed['rules']
            ])
        except Exception as e:
            print(f"Error processing URI {video_uri}: {e}")

    if flattened_rows:
        return pd.DataFrame(flattened_rows)[output_columns]
    return pd.DataFrame()

def upload_df_to_gsheets(df: pd.DataFrame) -> str | None:
    """Creates a new Google Sheet, adds a merged disclaimer, uploads a DataFrame, and formats it.

    Sheet layout: the disclaimer goes to A1, the column headers to row 2 and
    the data (every value converted to a string) from row 3 onwards. The
    caller's DataFrame is not modified.

    Args:
      df: The Pandas DataFrame to upload. Expected columns that are missing
        are added as empty columns; an empty DataFrame produces a sheet with
        the disclaimer and headers only.

    Returns:
      The URL of the created Google Sheet, or None on error.
    """
    cols_definition = ['video_type', 'video_key', 'video_uri', 'overall_compliance_assessment',
                       'rule_index', 'rule_violation', 'violation_score', 'violation_reason', 'violation_time']

    if df.empty:
        print("DataFrame is empty, creating sheet with disclaimer and default headers.")
        data_list = []
    else:
        # Work on a copy so missing columns are not added to the caller's frame.
        df = df.copy()
        for col in cols_definition:
            if col not in df.columns:
                df[col] = None
        df = df[cols_definition]
        data_list = df.astype(str).values.tolist()

    # Both paths above expose exactly the columns in cols_definition.
    header_list = [cols_definition]

    # rowcol_to_a1(1, n) yields "<column letters>1"; dropping the trailing row
    # number keeps the whole column label, including multi-letter ones.
    last_col_letter = gspread.utils.rowcol_to_a1(1, len(cols_definition))[:-1]

    try:
        creds, _ = default()
        gc = gspread.authorize(creds)
        print("Google Sheets API authorized successfully.")
    except Exception as e:
        print(f"Error authorizing Google Sheets API: {e}")
        print("Please ensure Application Default Credentials are set up correctly.")
        return None

    ss_name_prefix = "Video Ads Compass Output"
    spreadsheet_name = f"{ss_name_prefix} {datetime.now().strftime('%Y%m%d_%H%M%S')}"

    try:
        print(f"Creating spreadsheet: '{spreadsheet_name}'...")
        sh = gc.create(spreadsheet_name)
        worksheet = sh.sheet1
        print(f"Spreadsheet created with ID: {sh.id}")

        # 1. Write Disclaimer text to A1 (it will be merged and formatted later)
        worksheet.update_acell('A1', disclaimer_text)

        # 2. Write Headers starting from A2
        worksheet.update(header_list, range_name='A2')

        # 3. Write DataFrame values starting from A3
        if data_list:
            print(f"Writing {len(data_list)} data rows to spreadsheet...")
            worksheet.update(data_list, range_name='A3')
        else:
            print("No data rows to write.")

        # 4. Format the sheet
        print("Applying formatting to spreadsheet...")
        return format_gsheet(sh, df, last_col_letter)
    except gspread.exceptions.APIError as api_e:
        print(f"Google Sheets API Error creating/updating spreadsheet '{spreadsheet_name}': {api_e}")
        return None
    except Exception as e:
        print(f"General Error creating/updating spreadsheet '{spreadsheet_name}': {e}")
        return None


def format_gsheet(sh, df: pd.DataFrame, last_col_letter: str) -> str:
    """Formats the Google Sheet: merged disclaimer, wrapping, colors, borders and filter.

    Assumed layout: row 1 holds the disclaimer, row 2 the headers, and the
    data row at position ``i`` of ``df`` sits on sheet row ``3 + i``. Rows are
    addressed by position, so the DataFrame's index labels do not matter.

    Failures of individual formatting steps are printed as warnings and do
    not abort the remaining steps.

    Args:
        sh: The gspread spreadsheet object.
        df: The DataFrame that was written to the sheet. Its
            ``rule_violation`` and ``video_key`` columns (when present) drive
            the row highlighting and the group borders.
        last_col_letter: The letter of the last column (e.g., 'I').

    Returns:
        The spreadsheet URL.
    """
    worksheet = sh.sheet1
    num_data_rows = len(df)

    header_row_index = 2
    first_data_row_index = 3

    disclaimer_range_to_merge = f'A1:{last_col_letter}1'
    disclaimer_format_cell = 'A1'
    header_range = f'A{header_row_index}:{last_col_letter}{header_row_index}'

    if num_data_rows > 0:
        last_data_row_index = first_data_row_index + num_data_rows - 1
        full_range_inc_header = f'A{header_row_index}:{last_col_letter}{last_data_row_index}'
    else:
        full_range_inc_header = header_range

    def data_row_range(position: int) -> str:
        """Return the A1 range of the sheet row holding the data row at ``position``."""
        sheet_row = first_data_row_index + position
        return f'A{sheet_row}:{last_col_letter}{sheet_row}'

    # 1. Merge Disclaimer Row FIRST
    try:
        worksheet.merge_cells(disclaimer_range_to_merge)
    except gspread.exceptions.APIError as e:
        print(f"Warning: Could not merge cells {disclaimer_range_to_merge}. Error: {e}")

    # Requests are collected and sent as a single batch update.
    formatting_requests = [
        # 2. Format Merged Disclaimer Cell (A1)
        {
            'range': disclaimer_format_cell,
            'format': {
                'textFormat': {'bold': True, 'fontSize': 14},
                'wrapStrategy': 'WRAP',
                'verticalAlignment': 'MIDDLE',
                'horizontalAlignment': 'CENTER',
            }
        },
        # 3. Apply Wrapping to Headers and Data Area (if any)
        {
            'range': full_range_inc_header,
            'format': {
                'wrapStrategy': 'WRAP',
                'verticalAlignment': 'TOP',
            }
        },
        # 4. Format Header Row
        {
            'range': header_range,
            'format': {
                'textFormat': {'bold': True},
                'backgroundColor': {'red': 0.8, 'green': 0.9, 'blue': 1.0},
                'verticalAlignment': 'MIDDLE',
            }
        },
    ]

    if num_data_rows > 0:
        # 5. Highlight rows whose 'rule_violation' reads as "TRUE"
        violation_flags = df.get('rule_violation')
        if violation_flags is not None:
            for position, flag in enumerate(violation_flags):
                if str(flag).upper() == "TRUE":
                    formatting_requests.append({
                        'range': data_row_range(position),
                        'format': {'backgroundColor': {'red': 1.0, 'green': 0.8, 'blue': 0.8}}
                    })

        # 6. Thick top border on the first row of each video's group
        #    (except the very first data row, which follows the header).
        video_keys = df.get('video_key')
        if video_keys is not None:
            seen_video_keys = set()
            for position, video_key in enumerate(video_keys):
                if video_key in seen_video_keys:
                    continue
                seen_video_keys.add(video_key)
                if position != 0:
                    formatting_requests.append({
                        'range': data_row_range(position),
                        'format': {'borders': {'top': {'style': 'SOLID_THICK', 'color': {'red': 0, 'green': 0, 'blue': 0}}}}
                    })

    print(f"Applying {len(formatting_requests)} formatting requests via batch...")
    try:
        worksheet.batch_format(formatting_requests)
    except gspread.exceptions.APIError as e:
        print(f"Warning: API Error during batch formatting: {e}. Some formats might not be applied.")
    except Exception as e:
        print(f"Warning: General Error during batch formatting: {e}.")

    # 7. Add Filters (covering headers and data)
    try:
        print("Setting basic filter...")
        worksheet.set_basic_filter(full_range_inc_header)
    except gspread.exceptions.APIError as e:
        print(f"Warning: Could not set basic filter on {full_range_inc_header}. Error: {e}")

    return sh.url

In [None]:
#@title Run Analysis

# Analyze every video in all_uri_map, then publish the results table to a new
# Google Sheet.
df_results = process_videos_and_create_df(all_uri_map)
output_url = upload_df_to_gsheets(df_results)

# upload_df_to_gsheets returns None on failure; don't present that as a link.
if output_url:
    print(f"Finished Video Ads Compass analysis. Find results here: {output_url}")
else:
    print("Video Ads Compass analysis finished, but the results spreadsheet "
          "could not be created. See the errors printed above.")