
```python
# Copyright 2025 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

# YouTube AI Event Detector

This Colab notebook is designed to analyze YouTube videos from a specified channel and determine if they contain a user-defined element. It leverages the power of the [Vertex API](https://cloud.google.com/vertex-ai/docs) to analyze video content and the [YouTube Data API](https://developers.google.com/youtube/v3/docs) to retrieve video metadata. The script outputs a structured report indicating the presence of the element and, if present, the approximate time range where it appears.

<br/>

## Requirements

In order to use this Colab you will need the following information:

  1. Your channel ID. This is a 24 character ID that you can find in the URL of your channel.

  2. Create a Google Cloud Project if you don't have one using the [following tutorial](https://developers.google.com/workspace/guides/create-project).

  3. Enable the YouTube APIs and Vertex API on of the [Google Cloud Console](https://support.google.com/googleapi/answer/6158841?hl=en).

  4. Create a service account in IAM section and a service account key for it using the [following tutorial](https://cloud.google.com/iam/docs/keys-create-delete).

  5. Download the service key that you created in step 4 in and keep it a safe place. This file will be used to authenticate this script to your account.


> **NOTE:** Vertex AI API is a paid feature and requires a billing account linked to your project. More [here](https://cloud.google.com/billing/docs/how-to/modify-project).

<br/>

## What does the script do?

The script performs the following actions:

* Retrieves a list of videos from a specified YouTube channel, sorted by view count.
* Analyzes each video using the Gemini API to detect the presence of a user-specified element.
* Records the analysis results, including the presence of the element and its time range (if applicable).
* Saves the results to a CSV file in a user-specified Google Drive folder.
Includes error handling with retry logic to manage potential API issues.

<br/>

## What the script is GOOD to detect?

Elements that have **physical** presence or that you can define by **nouns** are good candidates to be detected. Also, **situations** (where someone performs an action) can be easily identified by Gemini. The more specific you are, the better results you get.

Examples:

* Someone wearing a black T-shirt
* A dog barking
* Drone footage
* A sunset

<br/>


## What the script is NOT SO GOOD to detect?

Elements that are **subjective** or **open to interpretation**.

Examples:

* A person making a mistake
* Someone happy/sad/afraid
* A beautiful sunset


## Configuration

The script uses several configuration variables that need to be set by the user:

* `ROOT_FOLDER_NAME`: The name of the folder in Google Drive where the output CSV file will be saved. If the folder doesn't exist, one with this name will be created.
* `CSV_NAME`: The name you want to give to the output CSV file.
* `CHANNEL_ID`: The ID of the YouTube channel to analyze.
* `PROJECT_ID`: The ID of your Google Cloud Project. You can find your project ID following [these steps](https://support.google.com/googleapi/answer/7014113?hl=en).
* `LOCATION`: The location of your Google Cloud Project.
* `VIDEO_COUNT`: The number of videos to analyze, chosen from predefined options (20, 50, or 100).
* `ELEMENT_TO_FIND`: The specific element to search for within the videos (e.g., "a cat," "a product demonstration").
* `HARD_LIMIT`: A hard limit on the number of videos to retrieve from the channel, ordered by the published date.
* `FINAL_PROMPT`: A formatted prompt that is sent to the Gemini API to analyze the video. It instructs Gemini on what to look for and the desired output format.

In [None]:
ROOT_FOLDER_NAME = "ElementChecker" #@param {type:"string"}
CSV_NAME = "" #@param {type:"string"}
CHANNEL_ID = "" #@param {type:"string"}
PROJECT_ID =  "" #@param {type:"string"}
LOCATION = "" #@param {type:"string"}
VIDEO_COUNT = 100 #@param [20, 50, 100]
ELEMENT_TO_FIND = "" #@param {type:"string"}
HARD_LIMIT = 5000 #@param {type:"integer"}

## PROMPT
FINAL_PROMPT = f"""
Given the following video, can you answer if it contains {ELEMENT_TO_FIND}?
If yes, could you provide the aproximate time range where it appears?
I'd like to output a json object with the following format:
{{
  has_element: TRUE, if the element is present. FALSE otherwise.
  details: The aproximate time range where the element appears If the has_element 
  option is FALSE, this field should be empty.
}}

"""

## Mounting your Google Drive
The `mount()` function in Colab mounts your Google Drive account to the Colab runtime. This means that you can access all of the files in your Google Drive from within Colab, and you can also save files from Colab to your Google Drive.

Please note this step will redirect to an authentication step, and it might take a few minutes if you have a lot of files in Drive.

This step is optional. If you don't execute it, the files will be downloaded locally to the Colab instance.

In [None]:
from google.colab import drive
drive.mount("/content/drive")

## Project Imports

In [None]:
import os
import json
import vertexai
import sys
import time
import numpy as np
import pandas as pd

from googleapiclient import discovery
from google.oauth2 import service_account
from google.cloud import language_v2
from google.colab import (
    auth,
    files
)
from vertexai.generative_models import (
    GenerativeModel,
    HarmCategory,
    HarmBlockThreshold,
    Part,
    SafetySetting,
    GenerationConfig
)

# Authentication

The code below will create credentials to be used for Vertex and YT API.

The authentication process gives rights to the following scopes:

  
*   cloud-platform.readonly
*   youtube.readonly



In [None]:
SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform.read-only",
    "https://www.googleapis.com/auth/youtube.readonly",
]

service_account_upload = files.upload()
service_account_json = json.loads(next(iter(service_account_upload.values())))
credentials = service_account.Credentials.from_service_account_info(
        service_account_json, scopes=SCOPES)

# GenAI Configuration

This section configures the Vertex API. It uses the `gemini-2.5-pro-exp-03-25` to generate the video description and the credentials set in the previous step.

It also sets the Gemini safety config to not block harmful content. This will allow to analyze every video, regardless of the content.

In [None]:
vertexai.init(project=PROJECT_ID, location=LOCATION, credentials=credentials)
model = GenerativeModel("gemini-2.5-pro-exp-03-25")

safety_config = [
    SafetySetting(
        category=HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
        threshold=HarmBlockThreshold.BLOCK_NONE,
    ),
    SafetySetting(
        category=HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        threshold=HarmBlockThreshold.BLOCK_NONE,
    ),
    SafetySetting(
        category=HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
        threshold=HarmBlockThreshold.BLOCK_NONE,

    ),
    SafetySetting(
        category=HarmCategory.HARM_CATEGORY_HARASSMENT,
        threshold=HarmBlockThreshold.BLOCK_NONE,

    ),
    SafetySetting(
        category=HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY,
        threshold=HarmBlockThreshold.BLOCK_NONE,

    )
]


def get_gemini_analysis(prompt, video_url):
  """Analyzes a video using the Gemini model based on the provided prompt.

  This function takes a text prompt and a video URL as input, sends them to the
  Gemini model for analysis, and returns the structured JSON response. The
  response is expected to conform to a predefined schema indicating the
  presence of a specific element and providing optional details.

  Args:
      prompt: The text prompt to guide the Gemini model's analysis of the video.
      video_url: The URL of the video file to be analyzed 
      (must be a supported format, e.g., "video/mp4").

  Returns:
      A dictionary representing the JSON response from the Gemini model. The
      dictionary will have the following structure:
      {
          "has_element": "TRUE" or "FALSE",
          "details": "Optional details about the analysis" or None
      }

  Raises:
      Exception: If there is an error during the content generation process,
                 the exception is caught, an error message is printed, and the
                 exception is re-raised.
  """
  try:
    response_schema = {
        "type": "OBJECT",
        "properties": {
            "has_element": {
                "type": "STRING",
                "enum": ["TRUE", "FALSE"],
                "nullable": False
            },
            "details": {
                "type": "STRING",
                "nullable": True
            },
        },
        "required": ["has_element", "details"],
    }


    generation_config = GenerationConfig(
        temperature=1,
        top_p=0.8,
        max_output_tokens=8192,
        response_schema=response_schema,
        response_mime_type="application/json",
    )

    video_part = Part.from_uri(video_url, mime_type="video/mp4")

    responses = model.generate_content(
        [video_part, prompt],
        generation_config=generation_config,
        stream=False,
        safety_settings=safety_config,
    )
    return json.loads(responses.text)
  except Exception as e:
    print("Error generating content: " + str(e))
    raise e

## Youtube API Configuration

In [None]:

YT_API_SERVICE_NAME = "youtube"
YT_API_VERSION = "v3"

youtube_api = discovery.build(YT_API_SERVICE_NAME, YT_API_VERSION, credentials=credentials)


def get_viewcounts(video_ids, df):
  """Retrieves and updates the view counts for a list of YouTube videos.

  This function uses the YouTube Data API to fetch the statistics (specifically
  the view count) for a batch of video IDs. It then updates the 'views' column
  in the provided Pandas DataFrame for the corresponding videos.

  Args:
      video_ids: A list of YouTube video IDs for which to retrieve view counts.
      df: The Pandas DataFrame containing video information, including a
          'video_id' column. The 'views' column will be updated in place.
  """
  response_vids = youtube_api.videos().list(
        part="statistics",
        id=",".join(video_ids)
    ).execute()

  for item in response_vids["items"]:
    if("viewCount" not in item["statistics"]):
      df.loc[df.video_id == item["id"], "views"] = 0
    else:
      df.loc[df.video_id == item["id"], "views"] = item["statistics"]["viewCount"]


def get_videos_with_viewcounts_sorted(channel_id, video_count = None):
  """Retrieves and sorts a channel's recent videos, 
     limited by HARD_LIMIT variable, by view count

  Fetches video metadata and view counts from the specified YouTube channel,
  sorts them by views (descending), and optionally limits the number of results.

  Args:
      channel_id (str): The YouTube channel ID.
      video_count (int, optional): Maximum number of videos to return. 
      Defaults to None (all).

  Returns:
      pandas.DataFrame: DataFrame of videos sorted by views, with columns:
        'video_id', 'video_url', 'title', 'views', 'has_element', 'details'.
  """
  next_token = ""
  upload_playlist_id = "UU" + channel_id[2:]
  video_ids = []

  df = pd.DataFrame(columns=["video_id", "video_url", "title", 
                             "views", "has_element","details"])
  df_counter = 0

  while True:
    reponse = youtube_api.playlistItems().list(
        part="snippet",
        playlistId=upload_playlist_id,
        maxResults=500,
        pageToken=next_token
    ).execute()

    for item in reponse["items"]:
      videoId = item["snippet"]["resourceId"]["videoId"]
      title = item["snippet"]["title"]

      df.loc[df_counter] = [videoId, "https://www.youtube.com/watch?v=" 
                            + videoId, title, "", "", ""]
      df_counter += 1
      video_ids.append(videoId)

      if df_counter >= HARD_LIMIT:
        break

    get_viewcounts(video_ids, df)
    video_ids.clear()
    
    if df_counter >= HARD_LIMIT:
      break

    if "nextPageToken" in reponse:
      next_token = reponse["nextPageToken"]
    else:
      break

  df = df.sort_values(by=["views"], ascending=False, key=pd.to_numeric)
  return df.head(video_count) if video_count is not None else df

## Main Application

This section will call each method to:

 * Get the list of the videos, sorted by views, from the configured channel.
 * Ask Gemini to look for the element configured and return the ocurrences in terms of timestamp.
 * If an error occurs, it will try again, up to a maximum of `max_retry` times.

In [None]:
retry_att = 1;
max_retry = 3;

df = get_videos_with_viewcounts_sorted(CHANNEL_ID, VIDEO_COUNT)

for index, row in df.iterrows():
  while(retry_att <= max_retry):
    print(f'Reading https://youtu.be/{row["video_id"]} at attempt #{retry_att}')
    try:
      analysis = get_gemini_analysis(FINAL_PROMPT, row["video_url"])
      print(analysis)
      df.at[index, "has_element"] = analysis["has_element"]
      df.at[index, "details"] = analysis["details"]
      break
    except Exception as e:
      if retry_att + 1 <= max_retry:
        print(f'Error #{retry_att}, retrying...')
        retry_att += 1
        time.sleep(5)
        continue
      print(f'Max number of retries has been reached; moving on... [{str(e)}]', )
      df.at[index, "has_element"] = "ERROR"
      df.at[index, "details"] = str(e)
      break
  retry_att = 1

# Persist into GDrive

After the script runs, this code will save the results to a folder in Google Drive, as configured by the user.

In [None]:
gdrive_path = "/content/drive/MyDrive/"
OUTPUT_FILE_NAME = f"{CSV_NAME}.csv"
output_folder_path = os.path.join(gdrive_path, ROOT_FOLDER_NAME)
output_file_path = os.path.join(output_folder_path, OUTPUT_FILE_NAME)

os.makedirs(output_folder_path, exist_ok=True)

df.to_csv(output_file_path, index=False)

## Disconnect Google Drive from this Colab

In [None]:
drive.flush_and_unmount()
print("All changes made in this colab session should now be visible in Drive.")