<a href="https://colab.research.google.com/github/gfilicetti/vision-warehouse-examples/blob/main/%5BShared%5D_VisionWarehouse_2_0_Private_GA_Python_SDK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Get Started (🛑 Attention Required)

Before starting:
* Make a copy of this colab before you run it.
* Get added to the allowlist by sending an email to warehouse-v2-trusted-testers-mailing-external+managers@google.com.
  * Mention the email account that needs access to the colab and the SDK.
  * Ask for your project to be allowlisted, and include the project ID in the email.
* Model Garden models (speech, OCR) require sufficient quota on the underlying prediction services. For example, to run **one** Video API model for **100** concurrent AnalyzeAsset calls, we suggest requesting a QPM of 125 and 6000 backend seconds. Request quota based on your estimated concurrency. The Video API [default quota](https://docs.cloud.google.com/video-intelligence/quotas) is 60 QPM and 180 backend seconds, which allows 3 videos to be processed at the same time. The default quota is sufficient for this colab demo.

* To avoid charges after testing, undeploy the index (see the clean-up [undeploy index section](https://colab.research.google.com/drive/1S-fKU8gTM6jPKd_-Dt9U4ed3qkUB_hAt?resourcekey=0-ri3myj9ID9aBDr0ON7xuUw#scrollTo=6neZMt-YSJh7)).
* This colab demonstrates the Warehouse CUJ and is not meant for processing a large number of files, because the colab session may disconnect. The colab therefore checks that there are no more than **20** input files. Although the Warehouse service supports videos up to 15h long, the limits of the colab session and the LRO polling timeout (currently set to 2h in this colab) mean that shorter videos (**<1h**) are recommended for testing here. Please keep the colab session connected during the run.
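
The quota figures quoted above work out to 60 backend seconds per concurrent video (6000 / 100, and 180 / 3 for the default) and 1.25 requests per minute per concurrent call (125 / 100). The sketch below scales those two ratios linearly. The function name is hypothetical and the linear scaling is an assumption, so treat the result as a starting point for a quota request for one Video API model, not as an official formula.

```python
def estimate_video_api_quota(concurrent_analyze_calls: int) -> dict:
    """Scale the quota figures quoted above linearly (an assumption)."""
    if concurrent_analyze_calls < 1:
        raise ValueError("concurrent_analyze_calls must be at least 1")
    # 6000 backend seconds / 100 concurrent calls = 60 per call,
    # which matches the default quota: 180 backend seconds / 3 videos.
    backend_seconds = 60 * concurrent_analyze_calls
    # 125 QPM / 100 concurrent calls = 1.25 per call, rounded up.
    requests_per_minute = -(-125 * concurrent_analyze_calls // 100)
    return {
        "requests_per_minute": requests_per_minute,
        "backend_seconds": backend_seconds,
    }


print(estimate_video_api_quota(100))
# {'requests_per_minute': 125, 'backend_seconds': 6000}
```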



# Part 1: Main Demo

This demo shows how to ingest a set of video files into Warehouse and search them.

## Overview

At the highest level, there is a simple three-stage process for getting your content into our system and making it searchable.

1.  Provision Corpus: A one-time setup step to create a container for your data and prepare the backend services.
2.  Ingest Videos: The process of adding your video assets, applying machine learning, and attaching your custom metadata.
3.  Search: The final step where your application can query the system to find relevant videos or video segments.


--------------------------------------

| Step | Approximate time |
| :--- | :--- |
| [Step 1: Provision Corpus](https://colab.research.google.com/drive/1S-fKU8gTM6jPKd_-Dt9U4ed3qkUB_hAt?resourcekey=0-ri3myj9ID9aBDr0ON7xuUw#scrollTo=6gIcJvfN2XAz) | Avg: 2 hours, one-time setup |
| [Step 2: Ingest Videos](https://colab.research.google.com/drive/1S-fKU8gTM6jPKd_-Dt9U4ed3qkUB_hAt?resourcekey=0-ri3myj9ID9aBDr0ON7xuUw#scrollTo=5ghMbrQhuEox) | Depends on the video duration; a 1-hour video takes about 1 hour to ingest. |
| [Step 3: Search](https://colab.research.google.com/drive/1S-fKU8gTM6jPKd_-Dt9U4ed3qkUB_hAt?resourcekey=0-ri3myj9ID9aBDr0ON7xuUw#scrollTo=M_ZHmBe53sNV) | Instant |




## Pricing

* Model Garden models (speech transcription and text detection are currently supported) are billed to the underlying prediction services, e.g. speech and OCR are billed to the Video API.
* For Warehouse pricing, see https://cloud.google.com/vision-ai/pricing. Please undeploy the index endpoint after testing to avoid further charges.

## Limitations




* Updating an existing EMBEDDING_SEARCH corpus to DEFAULT_SEARCH is not supported.

* With non-English search queries, EMBEDDING_SEARCH and DEFAULT_SEARCH are expected to return similar results. Use English search queries to compare search quality between the two. Support for more languages may be added in the future.

* Speech transcription currently supports only one audio track. The language must be specified, and only en-US is supported. This colab uses audio track 0 for speech transcription if the video has audio tracks.

* The config used for a given model cannot be updated in place, e.g. changing the audio tracks used by the speech transcription model. To change it, you can:

  * Option 1: Create a new asset.

  * Option 2: Update the existing asset to remove the model's model_garden_model_context entry, run AnalyzeAsset (which cleans up the annotations of the removed model), then update the asset with the correct model_garden_model_context and run AnalyzeAsset again.

## Set Up Environment

### Authentication

In [None]:
from google.colab import auth
auth.authenticate_user()

### Download SDK

In [None]:
!gsutil cp gs://visionai-warehouse-v2-preview/visionai-v1-py.tar.gz .
!tar -xf visionai-v1-py.tar.gz
!ls
!pip3 install visionai-v1-py/

## Set Up Variables (🛑 Actions Required)

In [None]:
#@title 🛑 Required: Set Your Project Id
PROJECT_ID="" # @param {type: "string"}

PROJECT_NUMBER_STR=!gcloud projects describe {PROJECT_ID} --format="value(projectNumber)"
PROJECT_NUMBER=int(PROJECT_NUMBER_STR[0])
LOCATION_ID = 'us-central1' #@param ["us-central1"]



In [None]:
# @title Optional: Other variables.
# Set this to at least twice the length of your longest video, in seconds.
TIMEOUT_SECS=7200
# Warehouse offers an optional sports enhancement, which mainly applies to American football.
IS_AMERICAN_FOOTBALL=False
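
The rule of thumb for `TIMEOUT_SECS` (at least twice the video length) combines with the 2h LRO polling limit mentioned at the top of this colab to explain the suggested one-hour cap on video length. A minimal sketch of that arithmetic follows. The helper names are hypothetical and are not part of the SDK.

```python
import math

# 2h LRO polling limit mentioned in this colab, in seconds.
COLAB_LRO_POLL_LIMIT_SECS = 2 * 60 * 60


def suggested_timeout_secs(video_length_secs: float, factor: float = 2.0) -> int:
    """Return a timeout of `factor` times the video length, rounded up."""
    if video_length_secs <= 0:
        raise ValueError("video_length_secs must be positive")
    return math.ceil(video_length_secs * factor)


def fits_colab_polling_limit(video_length_secs: float) -> bool:
    """True if the suggested timeout stays within the 2h polling limit."""
    return suggested_timeout_secs(video_length_secs) <= COLAB_LRO_POLL_LIMIT_SECS


print(suggested_timeout_secs(45 * 60))     # 5400
print(fits_colab_polling_limit(90 * 60))   # False
```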

## 🛑 Required: Define Your Input Videos

In [None]:
# @title Run this cell to render the input widget.

import ipywidgets as widgets
from IPython.display import display, Markdown

select_file_input_method = widgets.RadioButtons(
    options=['GCS Bucket (e.g., gs://my-data-bucket)', 'List of GCS Files (one path per line)'],
    value='GCS Bucket (e.g., gs://my-data-bucket)',
    description='Input Type:',
    disabled=False,
    layout=widgets.Layout(width='auto'),
    style={'description_width': 'initial'}
)

bucket_input = widgets.Text(
    value='',
    placeholder='gs://your-bucket-name',
    description='Bucket Path:',
    disabled=False,
    layout=widgets.Layout(width='600px')
)
file_list_input = widgets.Textarea(
    value='gs://cloud-samples-data/video/JaneGoodall.mp4\ngs://cloud-samples-data/video/pizza.mp4\ngs://cloud-samples-data/video/animals.mp4',
    placeholder='gs://bucket/file1.mp4\ngs://bucket/file2.mov\ngs://bucket/folder/file3.ts',
    description='File List:',
    disabled=False,
    rows=5,
    layout=widgets.Layout(width='600px')
)

input_container = widgets.VBox([bucket_input])
message_output = widgets.Output()

def on_file_input_radio_change(change):
    """Dynamically updates the input field and the instruction message."""

    with message_output:
        message_output.clear_output(wait=True)

        if 'GCS Bucket' in change['new']:
            input_container.children = [bucket_input]
            print("💡 Please enter a single GCS bucket path. All files in this bucket will be processed.")
        elif 'List of GCS Files' in change['new']:
            input_container.children = [file_list_input]
            print("💡 Please enter a list of complete GCS file paths, one per line.")

select_file_input_method.observe(on_file_input_radio_change, names='value')

display(select_file_input_method)
display(message_output)
display(input_container)

on_file_input_radio_change({'new': select_file_input_method.value, 'old': None})

## 🛑 Required: Provision New or Use Existing Corpus/Index/IndexEndpoint

In [None]:
# @title Run this cell to render the input widget.

import ipywidgets as widgets
from IPython.display import display

choice_widget = widgets.RadioButtons(
    options=['Create New Corpus', 'Use Existing IDs'],
    value='Use Existing IDs',
    description='Setup Mode:',
    disabled=False
)

corpus_input = widgets.Text(
    description='Corpus ID:',
    placeholder='Enter existing corpus ID...',
    layout=widgets.Layout(width='auto')
)

index_input = widgets.Text(
    description='Index ID:',
    placeholder='Enter existing index ID...',
    layout=widgets.Layout(width='auto')
)

endpoint_input = widgets.Text(
    description='Endpoint ID:',
    placeholder='Enter existing endpoint ID...',
    layout=widgets.Layout(width='auto')
)

id_input_box = widgets.VBox([corpus_input, index_input, endpoint_input])

output = widgets.Output()

def on_provision_choice_change(change):
    """Callback function to run when the user changes the radio button."""
    with output:
        output.clear_output()

        if change['new'] == 'Use Existing IDs':
            print("⚠️ Please enter the three required IDs below.")
            display(id_input_box)
        else:
            print("✅ Ready to provision a new warehouse corpus from scratch.")

display(choice_widget)

choice_widget.observe(on_provision_choice_change, names='value')

on_provision_choice_change({'new': choice_widget.value})

display(output)



## 🛑 Required: Choose Whether to Clean Up After Tests

While the index is deployed, it continues to incur index serving costs.

**ATTENTION**: After the index is undeployed, you can no longer search the corpus. The suggested order is:

* Step 1: Leave all the checkboxes here unchecked and run the "Build the Warehouse" section.

* Step 2: Try out searches with your own search queries.

* Step 3: To stop billing without deleting the assets or the corpus, check the UNDEPLOY_INDEX option.

   * 3.1: If the colab session is still connected, just run the "Step 4: Clean up > Undeploy Index" section.

   * 3.2: If the colab session has expired, first set the corpus ID, index ID and index endpoint ID in "Required: Provision New or Use Existing Corpus/Index/IndexEndpoint", then run the "Build the Warehouse > Preparation" section, and then run the "Step 4: Clean up > Undeploy Index" section.
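
When several clean-up options are checked at once, the order in which they run matters. The sketch below arranges the selected actions so that the index is undeployed before the index or endpoint is deleted, and the corpus is deleted last. This ordering is an assumption based on typical resource dependencies, and `plan_cleanup` is a hypothetical helper; the action names simply mirror the checkbox labels in this section.

```python
from typing import Dict, List

# Assumed safe order: undeploy first, delete the corpus last.
CLEANUP_ORDER = [
    "UNDEPLOY_INDEX",
    "DELETE_INDEX",
    "DELETE_INDEX_ENDPOINT",
    "DELETE_ASSETS",
    "DELETE_CORPUS",
]


def plan_cleanup(selected: Dict[str, bool]) -> List[str]:
    """Return the checked actions in the assumed dependency order."""
    unknown = set(selected) - set(CLEANUP_ORDER)
    if unknown:
        raise ValueError(f"Unknown clean-up actions: {sorted(unknown)}")
    return [action for action in CLEANUP_ORDER if selected.get(action)]


print(plan_cleanup({"DELETE_CORPUS": True, "UNDEPLOY_INDEX": True}))
# ['UNDEPLOY_INDEX', 'DELETE_CORPUS']
```

In the notebook, the dictionary values would come from the checkbox widgets, e.g. `undeploy_index_widget.value`.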

In [None]:
# @title Run this cell to render the input widget.
delete_assets_widget = widgets.Checkbox(
    value=False,
    description='DELETE_ASSETS',
    disabled=False,
    indent=False
)

undeploy_index_widget = widgets.Checkbox(
    value=False,
    description='UNDEPLOY_INDEX',
    disabled=False,
    indent=False
)

delete_index_widget = widgets.Checkbox(
    value=False,
    description='DELETE_INDEX',
    disabled=False,
    indent=False
)

delete_index_endpoint_widget = widgets.Checkbox(
    value=False,
    description='DELETE_INDEX_ENDPOINT',
    disabled=False,
    indent=False
)

delete_corpus_widget = widgets.Checkbox(
    value=False,
    description='DELETE_CORPUS',
    disabled=False,
    indent=False
)

# Display the widgets
display(
    delete_assets_widget,
    undeploy_index_widget,
    delete_index_widget,
    delete_index_endpoint_widget,
    delete_corpus_widget
)

🛑 After setting the inputs above, you can run the whole "Build the Warehouse" section at once: hover over the section title in the left-hand "Table of contents" and click the ▶ button. Please **DON'T** rerun the cells above, as doing so clears your inputs.

## Build the Warehouse

### Preparation

#### Imports

In [None]:
from concurrent.futures import TimeoutError
from google.api_core import exceptions as google_exceptions
from google.cloud import visionai_v1
from google.longrunning import operations_pb2
from google.protobuf import duration_pb2
from google.protobuf import struct_pb2
from IPython.display import display, Markdown, HTML, Image
from typing import Optional, List, Any, Dict
import concurrent.futures
import dataclasses
import datetime
import grpc
import logging
import os
import pandas as pd
import shlex
import subprocess
import sys
import time

#### Creates a Warehouse Client

In [None]:
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    force=True)

# Initialize the client ONCE
try:
    warehouse_client = visionai_v1.WarehouseClient(
      client_options={"api_endpoint": "warehouse-visionai.googleapis.com"},
    )

    logging.info("WarehouseClient initialized successfully.")
except Exception as e:
    logging.critical(f"Failed to initialize WarehouseClient: {e}")


# --- Type Hinting ---
try:
    from google.cloud.visionai_v1 import (
        WarehouseClient,
        Asset,
        DataSchema,
        Annotation,
        CreateAssetRequest,
        CreateDataSchemaRequest,
        CreateAnnotationRequest,
        UploadAssetRequest,
        UploadAssetResponse,
        AnalyzeAssetRequest,
        AnalyzeAssetResponse,
        IndexAssetRequest,
        IndexAssetResponse,
    )
except ImportError:
    logging.warning("WarehouseClient or other types not imported, using 'Any'.")
    WarehouseClient = Any
    Asset = Any
    DataSchema = Any
    Annotation = Any
    CreateAssetRequest = Any
    CreateDataSchemaRequest = Any
    CreateAnnotationRequest = Any
    UploadAssetRequest = Any
    UploadAssetResponse = Any
    AnalyzeAssetRequest = Any
    AnalyzeAssetResponse = Any
    IndexAssetRequest = Any
    IndexAssetResponse = Any

#### Construct Resource Names if Using Existing Corpus/Index/IndexEndpoint



In [None]:
need_provision = (choice_widget.value == 'Create New Corpus')

if not need_provision:
    # IDs come from the widget input fields
    corpus_id = corpus_input.value
    index_id = index_input.value
    index_endpoint_id = endpoint_input.value

    parent_name=f"projects/{PROJECT_NUMBER}/locations/{LOCATION_ID}"
    corpus_name = f"{parent_name}/corpora/{corpus_id}"
    index_name = f"{parent_name}/corpora/{corpus_id}/indexes/{index_id}"
    index_endpoint_name = f"{parent_name}/indexEndpoints/{index_endpoint_id}"
    logging.info("✅ No provisioning is needed because existing IDs are used.")
    logging.info(f"Corpus         : {corpus_name}")
    logging.info(f"Index          : {index_name}")
    logging.info(f"Index Endpoint : {index_endpoint_name}")
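
The resource names above are plain string templates, so an empty widget field silently produces an invalid name such as `.../corpora/`. Below is a minimal sketch of the same construction with a check for missing IDs. `WarehouseResourceNames` and `build_resource_names` are hypothetical names introduced for illustration, not part of the SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WarehouseResourceNames:
    corpus: str
    index: str
    index_endpoint: str


def build_resource_names(project_number, location_id: str, corpus_id: str,
                         index_id: str, index_endpoint_id: str) -> WarehouseResourceNames:
    """Build the three resource names, rejecting empty IDs up front."""
    ids = {
        "corpus_id": corpus_id.strip(),
        "index_id": index_id.strip(),
        "index_endpoint_id": index_endpoint_id.strip(),
    }
    missing = [label for label, value in ids.items() if not value]
    if missing:
        raise ValueError(f"Missing required IDs: {', '.join(missing)}")

    parent = f"projects/{project_number}/locations/{location_id}"
    corpus = f"{parent}/corpora/{ids['corpus_id']}"
    return WarehouseResourceNames(
        corpus=corpus,
        index=f"{corpus}/indexes/{ids['index_id']}",
        index_endpoint=f"{parent}/indexEndpoints/{ids['index_endpoint_id']}",
    )


names = build_resource_names(123, "us-central1", "c1", "i1", "e1")
print(names.index)  # projects/123/locations/us-central1/corpora/c1/indexes/i1
```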

### Collect Files from User Input


In [None]:
file_names = []

# Get the selected option and the raw input values
selected_option = select_file_input_method.value
bucket_path = bucket_input.value.strip()
file_list_raw = file_list_input.value.strip()

print("--- Starting File List Collection ---")

# --- Logic for GCS Bucket ---
if 'GCS Bucket' in selected_option:
    GCS_SOURCE = bucket_path

    if not GCS_SOURCE or not GCS_SOURCE.startswith('gs://'):
        print("🛑 ERROR: Please enter a valid GCS bucket path starting with 'gs://'.")
    else:
        print(f"📁 Listing files from bucket: {GCS_SOURCE}")

        # Use gsutil ls to list all files in the bucket.
        # The output is captured and split into individual lines.
        try:
            gsutil_command = f'gsutil ls {GCS_SOURCE}'
            result = subprocess.run(
                shlex.split(gsutil_command),
                capture_output=True,
                text=True,
                check=True
            )
            # Filter the output to remove directories (which end with '/')
            # and clean up empty lines. Strip each line before testing it so
            # trailing whitespace cannot hide a trailing '/'.
            file_names = [
                line.strip()
                for line in result.stdout.split('\n')
                if line.strip() and not line.strip().endswith('/')
            ]

            if not file_names:
                print(f"⚠️ WARNING: Found 0 files in bucket {GCS_SOURCE}. Check the path or contents.")
            else:
                print(f"✅ Success! Found {len(file_names)} files to process.")

        except subprocess.CalledProcessError as e:
            print(f"🛑 ERROR running gsutil: {e.stderr}")
        except FileNotFoundError:
            print("🛑 ERROR: 'gsutil' command not found. Ensure the environment is configured correctly.")

# --- Logic for List of GCS Files ---
elif 'List of GCS Files' in selected_option:

    if not file_list_raw:
        print("🛑 ERROR: The list of GCS files cannot be empty.")
    else:
        # Split the text area input into a list of file paths. Strip each line
        # before checking the prefix so indented paths are not dropped.
        file_names = [
            line.strip()
            for line in file_list_raw.split('\n')
            if line.strip().startswith('gs://')
        ]

        if not file_names:
            print("⚠️ WARNING: No valid GCS paths found in the list (must start with 'gs://').")
        else:
            print(f"✅ Success! Loaded {len(file_names)} files directly from user input.")

# --- Final Check and Output ---
if file_names:
    if len(file_names) > 20:
        print("🛑 ERROR: The number of files to process exceeds 20.")
        sys.exit(1)
    else:
        print("-" * 35)
        print("The first 3 files are:")
        for f in file_names[:3]:
            print(f"  - {f}")
        print("-" * 35)
        print("\n🚀 Ready for processing! The file paths are stored in the 'file_names' list.")
else:
    print("\n❌ Processing cannot continue. The 'file_names' list is empty.")
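
The filtering rules used above (strip whitespace, keep only `gs://` paths, skip directory entries, cap the count at 20) can also be expressed as one small function, which makes them easy to check without touching GCS. This is a sketch only: `parse_gcs_file_list` is a hypothetical helper, and the de-duplication step is an addition that the cell above does not perform.

```python
from typing import List

MAX_INPUT_FILES = 20  # the limit enforced by this colab


def parse_gcs_file_list(raw_text: str, max_files: int = MAX_INPUT_FILES) -> List[str]:
    """Return cleaned gs:// file paths in input order, without duplicates."""
    paths: List[str] = []
    seen = set()
    for line in raw_text.splitlines():
        path = line.strip()
        # Skip blanks, non-GCS entries and directory prefixes.
        if not path.startswith("gs://") or path.endswith("/"):
            continue
        if path not in seen:
            seen.add(path)
            paths.append(path)
    if len(paths) > max_files:
        raise ValueError(f"{len(paths)} files given, the limit is {max_files}")
    return paths


sample = "  gs://bucket/a.mp4\n\ngs://bucket/folder/\nnot-a-path\ngs://bucket/a.mp4\n"
print(parse_gcs_file_list(sample))  # ['gs://bucket/a.mp4']
```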

### Enable Google APIs

In [None]:
!gcloud config set project {PROJECT_ID}
!gcloud services enable visionai.googleapis.com
!gcloud services enable videointelligence.googleapis.com

### Step 1: Provision a corpus

Before you can add any content, you must first provision a "Corpus". Think of a Corpus as your dedicated, private container within our system. When you run the "Provision Corpus" section, you are telling our system to prepare all the necessary infrastructure for you. This includes setting up the indexing pipeline (which organizes your data for fast retrieval) and the search serving pipeline (which handles incoming search requests). This is a foundational, one-time action for each collection of content you wish to manage.


---


| Step | Approximate time |
| :--- | :--- |
| Step 1.1: CreateCorpus API | ~ 3min |
| Step 1.2: CreateIndex API | 30min - 60min |
| Step 1.3: CreateIndexEndpoint API | ~ 5min |
| Step 1.4: DeployIndex API | 30min - 60min |

---
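
The per-step times in the table add up to roughly one to two hours in total. The sketch below works through that arithmetic for two cases: all four steps run back to back, or CreateCorpus and CreateIndexEndpoint run in parallel before the remaining steps, as the provisioning pipeline in this notebook does. The function name is hypothetical, and the figures are the approximate values from the table, not guarantees.

```python
# (min, max) minutes per step, taken from the table above.
STEP_MINUTES = {
    "create_corpus": (3, 3),
    "create_index": (30, 60),
    "create_index_endpoint": (5, 5),
    "deploy_index": (30, 60),
}


def estimate_provision_minutes(parallel_phase_one: bool = True) -> tuple:
    """Return the (min, max) total provisioning time in minutes."""
    corpus = STEP_MINUTES["create_corpus"]
    endpoint = STEP_MINUTES["create_index_endpoint"]
    if parallel_phase_one:
        # Phase 1 lasts as long as the slower of the two parallel steps.
        phase_one = (max(corpus[0], endpoint[0]), max(corpus[1], endpoint[1]))
    else:
        phase_one = (corpus[0] + endpoint[0], corpus[1] + endpoint[1])
    index = STEP_MINUTES["create_index"]
    deploy = STEP_MINUTES["deploy_index"]
    return (phase_one[0] + index[0] + deploy[0],
            phase_one[1] + index[1] + deploy[1])


print(estimate_provision_minutes())       # (65, 125)
print(estimate_provision_minutes(False))  # (68, 128)
```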

🛑 NOTE: You only need to run this step once. Please save the created CorpusID/IndexID/IndexEndpointID for the following steps.




#### Predefined Utils for Provision
CreateCorpus, CreateIndex, CreateIndexEndpoint, DeployIndex

In [None]:
def create_corpus_robust(
    client: visionai_v1.WarehouseClient,
    project_number: str,
    location_id: str,
    display_name: str,
    description: str,
    search_capabilities: List[visionai_v1.SearchCapability],
    corpus_type: visionai_v1.Corpus.Type = visionai_v1.Corpus.Type.VIDEO_ON_DEMAND,
    timeout_seconds: int = 7200
) -> Optional[str]:
    """
    Submits a request to create a new Corpus in the Vision AI Warehouse.

    Args:
        client: The initialized WarehouseClient.
        project_number: The GCP project number.
        location_id: The GCP location (e.g., "us-central1").
        display_name: The human-readable name for the new corpus.
        description: A description for the new corpus.
        search_capabilities: A list of search capabilities to enable.
        corpus_type: The type of corpus (e.g., VIDEO_ON_DEMAND).
        timeout_seconds: Max time to wait for the operation to complete.

    Returns:
        The resource name of the newly created corpus (e.g., ".../corpora/..."),
        or None if creation failed.
    """
    try:
        logging.info(f"Initializing corpus with name: '{display_name}'")
        parent = f"projects/{project_number}/locations/{location_id}"

        corpus = visionai_v1.Corpus(
            display_name=display_name,
            description=description,
            type_=corpus_type,
            search_capability_setting=visionai_v1.SearchCapabilitySetting(
                search_capabilities=search_capabilities
            ),
        )

        request = visionai_v1.CreateCorpusRequest(
            parent=parent,
            corpus=corpus,
        )

        logging.info(f"Submitting create_corpus request to parent: {parent}...")
        operation = client.create_corpus(request=request)

        logging.info(f"⏳ Waiting for operation to complete (timeout: {timeout_seconds}s)...")
        response = operation.result(timeout=timeout_seconds)

        logging.info(f"Successfully created corpus: {response.name}")
        logging.debug(f"Full response: {response}")
        return response.name

    except TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The corpus might still be creating in the background.")
        return None
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"An API error occurred during corpus creation: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected (non-API) error occurred: {e}", exc_info=True)
        return None


def create_index_robust(
    client: visionai_v1.WarehouseClient,
    parent_corpus_name: str,
    display_name: str,
    description: str,
    timeout_seconds: int = 7200
) -> Optional[str]:
    """
    Submits a request to create a new Index in the Vision AI Warehouse.

    Args:
        client: The initialized WarehouseClient.
        parent_corpus_name: The resource name of the corpus (e.g., "projects/.../corpora/...").
        display_name: The human-readable name for the new index.
        description: A description for the new index.
        timeout_seconds: Max time to wait for the operation to complete.

    Returns:
        The resource name of the newly created index (e.g., ".../indexes/..."),
        or None if creation failed.
    """
    try:
        logging.info(f"Initializing index with name: '{display_name}'")
        index = visionai_v1.Index(
            display_name=display_name,
            description=description,
        )
        request = visionai_v1.CreateIndexRequest(
            parent=parent_corpus_name,
            index=index,
        )

        logging.info(f"Submitting create_index request to parent: {parent_corpus_name}...")
        operation = client.create_index(request=request)

        logging.info(f"Operation submitted. Operation name: {operation._operation.name}")
        logging.info(f"⏳ Waiting for operation to complete (timeout: {timeout_seconds}s)...")
        response = operation.result(timeout=timeout_seconds)
        logging.info(f"Successfully created index: {response.name}")
        logging.debug(f"Full response: {response}")
        return response.name

    except TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The index might still be creating in the background.")
        return None
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"An API error occurred during index creation: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected (non-API) error occurred: {e}", exc_info=True)
        return None


def create_index_endpoint_robust(
    client: visionai_v1.WarehouseClient,
    project_number: str,
    location_id: str,
    display_name: str,
    description: str,
    timeout_seconds: int = 7200
) -> Optional[str]:
    """
    Submits a request to create a new IndexEndpoint in the Vision AI Warehouse.

    Args:
        client: The initialized WarehouseClient.
        project_number: The GCP project number.
        location_id: The GCP location (e.g., "us-central1").
        display_name: The human-readable name for the new index endpoint.
        description: A description for the new index endpoint.
        timeout_seconds: Max time to wait for the operation to complete.

    Returns:
        The resource name of the newly created index endpoint
        (e.g., ".../indexEndpoints/..."), or None if creation failed.
    """
    try:
        logging.info(f"Initializing indexEndpoint with name: '{display_name}'")
        parent = f"projects/{project_number}/locations/{location_id}"

        index_endpoint = visionai_v1.IndexEndpoint(
            display_name=display_name,
            description=description,
        )

        request = visionai_v1.CreateIndexEndpointRequest(
            parent=parent,
            index_endpoint=index_endpoint,
        )

        logging.info(f"Submitting create_index_endpoint request to parent: {parent}...")
        operation = client.create_index_endpoint(request=request)

        logging.info(f"Operation submitted. Operation name: {operation._operation.name}")
        logging.info(f"⏳ Waiting for operation to complete (timeout: {timeout_seconds}s)...")
        response = operation.result(timeout=timeout_seconds)

        logging.info(f"Successfully created index_endpoint: {response.name}")
        logging.debug(f"Full response: {response}")
        return response.name

    except TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The index_endpoint might still be creating in the background.")
        return None
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"An API error occurred during index endpoint creation: {e}. Will retry.")
        return None
    except Exception as e:
        logging.critical(f"An unexpected (non-API) error occurred: {e}", exc_info=True)
        return None


def deploy_index_endpoint_robust(
    client: visionai_v1.WarehouseClient,
    index_endpoint_name: str,
    index_name: str,
    timeout_seconds: int = 7200
) -> Optional[str]:
    """
    Submits a request to deploy the index to the index endpoint.

    Args:
        client: The initialized WarehouseClient.
        index_endpoint_name: The resource name of the index_endpoint (e.g., "projects/.../indexEndpoints/...").
        index_name: The resource name of the index (e.g., "projects/.../indexes/...").
        timeout_seconds: Max time to wait for the operation to complete.

    Returns:
        The name of the deploy operation, or None if deployment failed.
    """
    try:
        logging.info(f"Deploying index '{index_name}' to index endpoint '{index_endpoint_name}'")

        deployed_index = visionai_v1.DeployedIndex()
        deployed_index.index = index_name

        request = visionai_v1.DeployIndexRequest(
            index_endpoint=index_endpoint_name,
            deployed_index=deployed_index,
        )

        logging.info("Submitting deploy_index request...")
        operation = client.deploy_index(request=request)
        logging.info(f"Operation submitted. Operation name: {operation._operation.name}")

        logging.info(f"⏳ Waiting for operation to complete (timeout: {timeout_seconds}s)...")
        response = operation.result(timeout=timeout_seconds)

        logging.info("Successfully deployed the index to the index endpoint.")
        logging.debug(f"Full response: {response}")
        return operation._operation.name

    except TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The index might still be deploying in the background.")
        return None
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"An API error occurred during index deployment: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected (non-API) error occurred: {e}", exc_info=True)
        return None
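
The four helpers above share one pattern: submit a request, block on `.result(timeout=...)`, and turn a timeout or any other error into a logged `None`. The sketch below isolates that pattern. It is not the SDK's API: a `concurrent.futures.Future` stands in for the long-running operation, because it exposes the same `.result(timeout=...)` method, and `wait_for_result` is a hypothetical name.

```python
import concurrent.futures
import logging
from typing import Callable, Optional


def wait_for_result(submit: Callable[[], concurrent.futures.Future],
                    what: str,
                    timeout_seconds: float) -> Optional[str]:
    """Submit an operation and wait for it; return None on any failure."""
    try:
        operation = submit()
        return operation.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        logging.error(f"{what} timed out after {timeout_seconds}s; "
                      "it might still be running in the background.")
        return None
    except Exception as e:
        logging.error(f"{what} failed: {e}")
        return None


# Stand-in operation that has already completed with a resource name.
done = concurrent.futures.Future()
done.set_result("projects/123/locations/us-central1/corpora/demo")
print(wait_for_result(lambda: done, "CreateCorpus", timeout_seconds=1))
# projects/123/locations/us-central1/corpora/demo
```

Returning `None` instead of raising keeps the calling pipeline simple, at the cost of hiding the error type from the caller; the log output is then the only record of what went wrong.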


#### Provision
You can use this section to trigger creation of corpus, index and index endpoint together automatically.


If any step fails, check which step failed, then call the corresponding API to rerun it or check the operation status manually.
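
One way to make "rerun only the failed step" concrete is to record the result of each step and skip the ones that already succeeded. The sketch below shows that idea under stated assumptions: `run_steps` and the stub steps are hypothetical, and each step follows the same convention as the helpers above, returning a resource name on success or `None` on failure.

```python
from typing import Callable, Dict, List, Optional, Tuple

# A step receives the results gathered so far and returns a resource name,
# or None on failure.
Step = Tuple[str, Callable[[Dict[str, str]], Optional[str]]]


def run_steps(steps: List[Step],
              completed: Optional[Dict[str, str]] = None
              ) -> Tuple[Dict[str, str], Optional[str]]:
    """Run steps in order; return (results, name of the failed step or None)."""
    results = dict(completed or {})
    for name, step in steps:
        if name in results:
            continue  # finished in an earlier run, do not repeat it
        value = step(results)
        if value is None:
            return results, name
        results[name] = value
    return results, None


# Hypothetical stand-ins for the real API helpers.
steps = [
    ("corpus", lambda done: "corpora/demo"),
    ("index", lambda done: None),  # simulate a failed CreateIndex
]
results, failed_step = run_steps(steps)
print(results, failed_step)  # {'corpus': 'corpora/demo'} index
```

Passing `results` back in as `completed` on the next call resumes from the failed step without recreating the corpus.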

In [None]:
need_provision = (choice_widget.value == 'Create New Corpus')

if not need_provision:
    # IDs come from the widget input fields
    corpus_id = corpus_input.value
    index_id = index_input.value
    index_endpoint_id = endpoint_input.value

    parent_name=f"projects/{PROJECT_NUMBER}/locations/{LOCATION_ID}"
    corpus_name = f"{parent_name}/corpora/{corpus_id}"
    index_name = f"{parent_name}/corpora/{corpus_id}/indexes/{index_id}"
    index_endpoint_name = f"{parent_name}/indexEndpoints/{index_endpoint_id}"
    logging.info("✅ No provisioning is needed because existing IDs are used.")
    logging.info(f"Corpus         : {corpus_name}")
    logging.info(f"Index          : {index_name}")
    logging.info(f"Index Endpoint : {index_endpoint_name}")
else:
  param_corpus_display_name = "python colab test corpus" # @param {type: "string"}
  param_corpus_description = "python colab test corpus" # @param {type: "string"}
  param_index_display_name = "python colab test index" # @param {type: "string"}
  param_index_description = "python colab test index" # @param {type: "string"}
  param_index_endpoint_display_name = "python colab test endpoint" # @param {type: "string"}
  param_index_endpoint_description = "python colab test endpoint" # @param {type: "string"}

  # --- Default Search Capabilities ---
  DEFAULT_SEARCH_CAPABILITY = visionai_v1.SearchCapability(
      type_=visionai_v1.SearchCapability.Type.DEFAULT_SEARCH
  )
  if IS_AMERICAN_FOOTBALL:
    DEFAULT_SEARCH_CAPABILITY.default_search_config=visionai_v1.SearchCapability.DefaultSearchConfig(
        model_id="sports-1.0"
    )
  # --- Speech Transcription ---
  SPEECH_SEARCH_CAPABILITY = visionai_v1.SearchCapability(
      type_=visionai_v1.SearchCapability.Type.MODEL_GARDEN_MODEL_POWERED_SEARCH,
      model_garden_model_powered_search_config=visionai_v1.SearchCapability.ModelGardenModelPoweredSearchConfig(
          model_resource_id="video-speech-transcription@001"
      )
  )
  # --- Text Detections ---
  TEXT_SEARCH_CAPABILITY = visionai_v1.SearchCapability(
      type_=visionai_v1.SearchCapability.Type.MODEL_GARDEN_MODEL_POWERED_SEARCH,
      model_garden_model_powered_search_config=visionai_v1.SearchCapability.ModelGardenModelPoweredSearchConfig(
          model_resource_id="video-text-detection@001"
      )
  )
  DEFAULT_VIDEO_ON_DEMAND_CAPABILITIES = [
      DEFAULT_SEARCH_CAPABILITY,
      SPEECH_SEARCH_CAPABILITY,
      TEXT_SEARCH_CAPABILITY
  ]

  def run_provision_warehouse_pipeline(
      client: visionai_v1.WarehouseClient,
      project_number: str,
      location_id: str,
      corpus_display_name: str,
      corpus_description: str,
      index_display_name: str,
      index_description: str,
      index_endpoint_display_name: str,
      index_endpoint_description: str,
      default_search_capabilities: List[visionai_v1.SearchCapability],
      max_parallel_workers: int = 2
  ) -> Optional[Dict[str, str]]:
      """
      Runs the full pipeline to create and deploy a Vision AI Warehouse index.

      Steps:
      1.  (Parallel) Create Corpus
      2.  (Parallel/Sequential) Create IndexEndpoint. If it is a project without
          any corpus created yet, this step will be retried to run after the
          corpus is created.
      3.  (Sequential) Create Index (depends on Corpus)
      4.  (Sequential) Deploy Index (depends on Index and IndexEndpoint)

      Args:
          client: The initialized WarehouseClient.
          project_number: The GCP project number.
          location_id: The GCP location (e.g., "us-central1").
          corpus_display_name: Display name for the new corpus.
          corpus_description: Description for the new corpus.
          index_display_name: Display name for the new index.
          index_description: Description for the new index.
          index_endpoint_display_name: Display name for the new index endpoint.
          index_endpoint_description: Description for the new index endpoint.
          default_search_capabilities: Search capabilities to enable on the new corpus.
          max_parallel_workers: Max threads to use for parallel steps.

      Returns:
          A dictionary containing the resource names of all created assets,
          or None if any step failed.
      """
      logging.info("--- Starting Provision Warehouse Pipeline ---")

      corpus_name: Optional[str] = None
      index_name: Optional[str] = None
      index_endpoint_name: Optional[str] = None
      deployed_index_endpoint: Optional[str] = None

      try:
          with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_workers) as executor:

              # --- Phase 1: Run CreateCorpus and CreateIndexEndpoint ---
              logging.info("Phase 1: Submitting CreateCorpus and CreateIndexEndpoint.")

              # Submit Task (CreateCorpus)
              future_corpus = executor.submit(
                  create_corpus_robust,
                  client=client,
                  project_number=project_number,
                  location_id=location_id,
                  display_name=corpus_display_name,
                  description=corpus_description,
                  search_capabilities=default_search_capabilities
              )

              # Submit Task (CreateIndexEndpoint)
              future_index_endpoint = executor.submit(
                  create_index_endpoint_robust,
                  client=client,
                  project_number=project_number,
                  location_id=location_id,
                  display_name=index_endpoint_display_name,
                  description=index_endpoint_description
              )

              # --- Barrier: Wait for Phase 1 to complete ---
              # .result() blocks until its specific task is done and returns the value

              logging.info("Waiting for CreateCorpus to complete...")
              corpus_name = future_corpus.result()

              logging.info("Waiting for CreateIndexEndpoint to complete...")
              index_endpoint_name = future_index_endpoint.result()

              # --- Check for failures before proceeding ---
              if not corpus_name:
                  logging.error("❌ Phase 1 failed: create_corpus_robust did not return a resource name. Aborting.")
                  return None
              if not index_endpoint_name:
                  logging.info("Retrying CreateIndexEndpoint, which must run after the first corpus is created in the project.")
                  # CreateCorpus provisions IAM permissions, which need a short period to take effect.
                  time.sleep(240)
                  index_endpoint_name = create_index_endpoint_robust(
                    client=client,
                    project_number=project_number,
                    location_id=location_id,
                    display_name=index_endpoint_display_name,
                    description=index_endpoint_description)
                  if not index_endpoint_name:
                    logging.error("❌ Phase 1 failed: create_index_endpoint_robust did not return a resource name after the retry. Aborting.")
                    return None

              logging.info("✅ Phase 1 complete.")
              logging.info(f"  > Corpus Name: {corpus_name}")
              logging.info(f"  > Index Endpoint Name: {index_endpoint_name}")

          # --- Phase 2: Run CreateIndex (depends on corpus_name) ---
          # This runs sequentially after Phase 1
          logging.info("Phase 2: Submitting CreateIndex...")
          index_name = create_index_robust(
              client=client,
              parent_corpus_name=corpus_name,
              display_name=index_display_name,
              description=index_description
          )

          if not index_name:
              logging.error("❌ Phase 2 (CreateIndex) failed. Aborting pipeline.")
              return None

          logging.info("✅ Phase 2 complete.")
          logging.info(f"  > Index Name: {index_name}")

          # --- Phase 3: Run DeployIndexEndpoint (depends on index_name and index_endpoint_name) ---
          logging.info("Phase 3: Submitting DeployIndexEndpoint...")

          deployed_index_endpoint = deploy_index_endpoint_robust(
              client=client,
              index_endpoint_name=index_endpoint_name,
              index_name=index_name
          )

          if not deployed_index_endpoint:
              logging.error("❌ Phase 3 (DeployIndexEndpoint) failed. Pipeline finished with errors.")
              return None

          logging.info("--- 🎉 Full Warehouse Pipeline Succeeded! 🎉 ---")
          logging.info(f"  > Corpus: {corpus_name}")
          logging.info(f"  > Index: {index_name}")
          logging.info(f"  > Index Endpoint: {index_endpoint_name}")
          logging.info("  > Deployed Status: Index is deployed to endpoint.")

          # Return all the created resource names
          return {
              "corpus_name": corpus_name,
              "index_name": index_name,
              "index_endpoint_name": index_endpoint_name,
          }

      except Exception as e:
          logging.critical(f"A critical error occurred in the pipeline executor: {e}", exc_info=True)
          return None

  if 'warehouse_client' in locals() and 'PROJECT_NUMBER' in locals() and 'LOCATION_ID' in locals():
      pipeline_results = run_provision_warehouse_pipeline(
          client=warehouse_client,
          project_number=PROJECT_NUMBER,
          location_id=LOCATION_ID,
          corpus_display_name=param_corpus_display_name,
          corpus_description=param_corpus_description,
          index_display_name=param_index_display_name,
          index_description=param_index_description,
          index_endpoint_display_name=param_index_endpoint_display_name,
          index_endpoint_description=param_index_endpoint_description,
          default_search_capabilities=DEFAULT_VIDEO_ON_DEMAND_CAPABILITIES
      )

      if pipeline_results:
          logging.info("🎉🥳 Pipeline completed successfully!")
          logging.info(f"Corpus: {pipeline_results.get('corpus_name')}")
          logging.info(f"Index: {pipeline_results.get('index_name')}")
          logging.info(f"Index Endpoint: {pipeline_results.get('index_endpoint_name')}")
          corpus_name = pipeline_results.get('corpus_name')
          index_name = pipeline_results.get('index_name')
          index_endpoint_name = pipeline_results.get('index_endpoint_name')
      else:
          logging.error("❌ Pipeline failed. Check logs above for errors.")
          sys.exit(1)
  else:
      logging.error("❌ `warehouse_client`, `PROJECT_NUMBER`, or `LOCATION_ID` is not defined. "
                    "Please run the initialization cells.")
      sys.exit(1)

### Step 2: Video Processing

For each video, the colab calls the following Vision Warehouse APIs in order:

| Step | Approximate time |
| :--- | :--- |
| CreateAsset API | Instant |
| UploadAsset API | Depends on the video duration |
| AnalyzeAsset API | Depends on the video duration |
| IndexAsset API | A few minutes |

> The colab also checks whether the video has an audio track to decide whether to run the speech model for the asset. This is done with ffprobe and UpdateAsset (which removes the speech model from the created asset when no audio track is found).

> For a better user experience, the colab also creates a default `title` annotation for each asset. The default title is the **same** as the GCS file name.
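
As a small illustration of the default title rule, here is a sketch of how a file name can be taken from a GCS path. The helper name `default_title_for` and the example bucket are hypothetical; the ingestion helper further down uses `os.path.basename`, which gives the same result on the Linux-based colab runtime.

```python
import posixpath


def default_title_for(gcs_uri: str) -> str:
    """Returns the last path component of a gs:// URI (hypothetical helper)."""
    # GCS object paths always use forward slashes, so posixpath is a safe fit.
    return posixpath.basename(gcs_uri)


# Example with a made-up bucket and object name.
print(default_title_for("gs://my-bucket/videos/demo.mp4"))
```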


#### Predefined utils for ingesting video files

##### Wrapper functions for CreateAsset, UploadAsset, AnalyzeAsset, IndexAsset, CreateDataSchema, CreateAnnotation
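
The wrappers in this cell return a falsy value (`None` or `False`) instead of raising, so a caller can retry a step by simply calling it again. The upload retry wrapper in this colab retries immediately. As an alternative, the sketch below waits longer between attempts (exponential backoff). It is only an illustration: `retry_until_true`, its parameters, and the injected `sleep` argument are hypothetical names and are not part of the SDK or of this colab.

```python
import time
from typing import Callable


def retry_until_true(
    step: Callable[[], bool],
    max_attempts: int = 4,
    base_delay_secs: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Calls `step` until it returns True or `max_attempts` is reached."""
    for attempt in range(max_attempts):
        if step():
            return True
        # Back off before the next attempt, but not after the last one.
        if attempt < max_attempts - 1:
            sleep(base_delay_secs * (2 ** attempt))
    return False


# Usage with a stand-in step that succeeds on the third call.
# `sleep` is replaced with a no-op so the example finishes instantly.
calls = []

def flaky_step() -> bool:
    calls.append(1)
    return len(calls) >= 3

print(retry_until_true(flaky_step, sleep=lambda secs: None))
```

In the colab, `step` could be a `lambda` that wraps one of the wrappers with its arguments already bound.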

In [None]:
def create_asset_robust(
    client: visionai_v1.WarehouseClient,
    parent_corpus_name: str,
    search_capability_contexts: List[visionai_v1.SearchCapabilityContext],
    ignore_already_exists: bool = False
) -> Optional[str]:
    """
    Creates a new Asset resource within a corpus.
    This is a non-LRO, synchronous call.

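    Args:
        client: The initialized WarehouseClient.
        parent_corpus_name: The resource name of the corpus to create the asset in.
        search_capability_contexts: Per-asset model contexts (e.g., speech, text detection).
        ignore_already_exists: Currently unused; kept for parity with the other wrappers.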

    Returns:
        The resource name of the newly created asset, or None if creation failed.
    """
    try:
        logging.info(f"Creating asset in corpus '{parent_corpus_name}'...")

        asset = visionai_v1.Asset(
            search_capability_contexts=search_capability_contexts
        )

        request = visionai_v1.CreateAssetRequest(
            parent=parent_corpus_name,
            asset=asset,
        )

        response = client.create_asset(request=request)

        logging.info(f"Successfully created asset: {response.name}")
        return response.name

    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while creating asset: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None


def create_data_schema_robust(
    client: visionai_v1.WarehouseClient,
    parent_corpus_name: str,
    data_schema: visionai_v1.DataSchema,
    ignore_already_exists: bool = False
) -> Optional[str]:
    """
    Creates a new DataSchema resource within a corpus.
    This is a non-LRO, synchronous call.

    Args:
        client: The initialized WarehouseClient.
        parent_corpus_name: The resource name of the corpus.
        data_schema: The DataSchema object to create.

    Returns:
        The resource name of the newly created data schema, or None if creation failed.
    """
    try:
        logging.info(f"Creating data schema {data_schema.key} in corpus '{parent_corpus_name}'...")

        request = visionai_v1.CreateDataSchemaRequest(
            parent=parent_corpus_name,
            data_schema=data_schema,
        )

        response = client.create_data_schema(request=request)

        logging.info(f"Successfully created data schema: {response.name}")
        return response.name

    except google_exceptions.AlreadyExists:
        logging.warning(f"Data schema with key '{data_schema.key}' already exists in corpus '{parent_corpus_name}'. Continuing.")
        return f"{parent_corpus_name}/dataSchemas/{data_schema.key}"
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while creating data schema: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None

def create_annotation_robust(
    client: visionai_v1.WarehouseClient,
    parent_asset_name: str,
    annotation: visionai_v1.Annotation,
    ignore_already_exists: bool = False
) -> Optional[str]:
    """
    Creates a new Annotation resource within an asset.
    This is a non-LRO, synchronous call.

    Args:
        client: The initialized WarehouseClient.
        parent_asset_name: The resource name of the asset to annotate.
        annotation: The Annotation object to create.

    Returns:
        The resource name of the newly created annotation, or None if creation failed.
    """
    try:
        logging.info(f"Creating annotation in asset '{parent_asset_name}'...")

        request = visionai_v1.CreateAnnotationRequest(
            parent=parent_asset_name,
            annotation=annotation,
        )

        response = client.create_annotation(request=request)

        logging.info(f"Successfully created annotation: {response.name}")
        return response.name

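    except google_exceptions.AlreadyExists:
        # The key lives on the nested user_specified_annotation, not on the Annotation itself.
        logging.warning(f"Annotation '{annotation.user_specified_annotation.key}' already exists in asset '{parent_asset_name}'. Continuing.")
        return annotation.name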
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while creating annotation: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None

def upload_asset_robust(
    client: visionai_v1.WarehouseClient,
    asset_name: str,
    asset_gcs_uri: str,
    timeout_seconds: int = TIMEOUT_SECS
) -> bool:
    """
    Uploads an asset (e.g., video) from GCS into the Warehouse.
    This is a long-running operation.

    Args:
        client: The initialized WarehouseClient.
        asset_name: The resource name of the *existing* asset placeholder.
        asset_gcs_uri: The "gs://" path to the media file in GCS.
        timeout_seconds: Max time to wait for the operation to complete.

    Returns:
        True on success, False on failure.
    """
    try:
        logging.info(f"Uploading asset from '{asset_gcs_uri}' to asset '{asset_name}'...")

        request = visionai_v1.UploadAssetRequest(
            name=asset_name,
            asset_source=visionai_v1.AssetSource(
                asset_gcs_source=visionai_v1.AssetSource.AssetGcsSource(
                    gcs_uri=asset_gcs_uri
                )
            ),
        )

        operation = client.upload_asset(request=request)

        logging.info(f"Operation submitted. Operation name: {operation._operation.name}")

        logging.info(f"⏳ Waiting for upload to complete (timeout: {timeout_seconds}s)...")
        response: visionai_v1.UploadAssetResponse = operation.result(timeout=timeout_seconds)

        logging.info(f"Successfully uploaded asset: {asset_name}")
        logging.debug(f"Full response: {response}")
        return True

    # operation.result() raises concurrent.futures.TimeoutError, which is an
    # alias of the builtin TimeoutError only on Python 3.11+.
    except concurrent.futures.TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The asset upload might still be in progress.")
        return False
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while uploading asset: {e}")
        return False
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return False

def upload_asset_robust_with_retry(
    client: visionai_v1.WarehouseClient,
    asset_name: str,
    asset_gcs_uri: str,
    timeout_seconds: int = TIMEOUT_SECS,
    retry_cnt: int = 3,
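) -> bool:
    """
    Calls upload_asset_robust and retries up to `retry_cnt` more times on failure.

    Returns:
        True as soon as one attempt succeeds, False if every attempt fails.
    """
    cnt = 0
    uploaded = upload_asset_robust(client, asset_name, asset_gcs_uri, timeout_seconds)
    while not uploaded and cnt < retry_cnt:
        logging.info(f"Retrying upload asset for {asset_gcs_uri}")
        uploaded = upload_asset_robust(client, asset_name, asset_gcs_uri, timeout_seconds)
        cnt += 1
    return uploaded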


def analyze_asset_robust(
    client: visionai_v1.WarehouseClient,
    asset_name: str,
    timeout_seconds: int = TIMEOUT_SECS
) -> bool:
    """
    Runs analysis (e.g., speech-to-text) on an uploaded asset.
    This is a long-running operation.

    Args:
        client: The initialized WarehouseClient.
        asset_name: The resource name of the asset to analyze.
        timeout_seconds: Max time to wait for the operation to complete.

    Returns:
        True on success, False on failure.
    """
    try:
        logging.info(f"Analyzing asset: {asset_name}...")

        request = visionai_v1.AnalyzeAssetRequest(
            name=asset_name,
        )

        operation = client.analyze_asset(request=request)

        logging.info(f"Operation submitted. Operation name: {operation._operation.name}")

        logging.info(f"⏳ Waiting for analysis to complete (timeout: {timeout_seconds}s)...")
        response: visionai_v1.AnalyzeAssetResponse = operation.result(timeout=timeout_seconds)

        logging.info(f"Successfully analyzed asset: {asset_name}")
        logging.debug(f"Full response: {response}")
        return True

    except concurrent.futures.TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The asset analysis might still be in progress.")
        return False
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while analyzing asset: {e}")
        return False
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return False


def index_asset_robust(
    client: visionai_v1.WarehouseClient,
    asset_name: str,
    index_name: str,
    index_asset_level_only: bool = False,
    timeout_seconds: int = TIMEOUT_SECS
) -> bool:
    """
    Indexes an asset, making its contents searchable.
    This is a long-running operation.

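    Args:
        client: The initialized WarehouseClient.
        asset_name: The resource name of the asset to index.
        index_name: The resource name of the index to add the asset to.
        index_asset_level_only: Forwarded to IndexAssetRequest.index_asset_level_only.
        timeout_seconds: Max time to wait for the operation to complete.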

    Returns:
        True on success, False on failure.
    """
    try:
        logging.info(f"Indexing asset: {asset_name}...")

        request = visionai_v1.IndexAssetRequest(
            name=asset_name,
            index_asset_level_only=index_asset_level_only,
            index=index_name,
        )

        operation = client.index_asset(request=request)

        logging.info(f"Operation submitted. Operation name: {operation._operation.name}")

        logging.info(f"⏳ Waiting for indexing to complete (timeout: {timeout_seconds}s)...")
        response: visionai_v1.IndexAssetResponse = operation.result(timeout=timeout_seconds)

        logging.info(f"Successfully indexed asset: {asset_name}")
        logging.debug(f"Full response: {response}")
        return True

    except concurrent.futures.TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The asset indexing might still be in progress.")
        return False
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while indexing asset: {e}")
        return False
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return False

def generate_retrieval_url_robust(
    client: visionai_v1.WarehouseClient,
    asset_name: str,
) -> Optional[str]:
    """
    Generates a signed URL for retrieving an uploaded asset.

    Returns:
        The signed URI, or None if the request failed.
    """
    try:
        request = visionai_v1.GenerateRetrievalUrlRequest(
            name=asset_name,
        )

        response = client.generate_retrieval_url(request=request)
        logging.debug(f"Full response: {response}")
        return response.signed_uri

    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while generating retrieval URL: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None

def update_asset_robust(
    client: visionai_v1.WarehouseClient,
    asset_name: str,
    search_capability_contexts: List[visionai_v1.SearchCapabilityContext],
) -> Optional[str]:
    """
    Updates the search capability contexts of an existing Asset.
    This is a non-LRO, synchronous call.

    Args:
        client: The initialized WarehouseClient.
        asset_name: The resource name of the asset to update.
        search_capability_contexts: The contexts to set on the asset.

    Returns:
        The resource name of the updated asset, or None if the update failed.
    """
    try:
        logging.info(f"Updating asset '{asset_name}'...")

        asset = visionai_v1.Asset(
            name=asset_name,
            search_capability_contexts=search_capability_contexts
        )

        request = visionai_v1.UpdateAssetRequest(
            asset=asset,
        )

        response = client.update_asset(request=request)

        logging.info(f"Successfully updated asset: {response.name}")
        return response.name

    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while updating asset: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None

##### Helper function for checking if video has any audio tracks
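
The decision made by this helper comes down to whether the JSON printed by ffprobe contains a non-empty `streams` list. The sketch below isolates that parsing step so it can be tried without a video file. The function name `has_audio_stream` and the sample JSON strings are illustrative and hand-written, not captured from a real ffprobe run.

```python
import json
from typing import Optional


def has_audio_stream(ffprobe_stdout: str) -> Optional[bool]:
    """Interprets ffprobe JSON output: True/False for audio, None if unparsable."""
    if not ffprobe_stdout.strip():
        return None  # ffprobe printed nothing, so no decision can be made
    try:
        data = json.loads(ffprobe_stdout)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    # With `-select_streams a`, only audio streams are listed under "streams".
    return len(data.get("streams", [])) > 0


print(has_audio_stream('{"streams": [{"codec_type": "audio"}]}'))
print(has_audio_stream('{"streams": []}'))
```

Keeping the three outcomes apart (audio, no audio, unknown) matters because only a confirmed "no audio" result should remove the speech model from the asset.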

In [None]:
import subprocess
import json
import shlex

def check_video_for_audio(gcs_signed_uri: str) -> Optional[bool]:
    """
    Uses ffprobe to check if a video file referenced by a signed GCS URI
    contains at least one audio stream.

    Args:
        gcs_signed_uri: The full, signed URI for the video file.

    Returns:
        True if an audio stream is found, False if none is found, or None if
        ffprobe failed or its output could not be parsed.
    """
    command_args = [
        'ffprobe',
        '-v', 'error',
        '-select_streams', 'a',
        '-show_entries', 'stream=codec_type',
        '-of', 'json',
        gcs_signed_uri
    ]

    try:
        process = subprocess.run(
            command_args,
            capture_output=True,
            text=True,
            check=True  # Raise an exception for non-zero exit codes (e.g., file not found)
        )

        # If no audio is found, the JSON might still be valid but with an empty 'streams' array.
        # If the file is valid but has no audio, ffprobe should return a JSON object with "streams": []

        # Handle cases where ffprobe returns nothing (e.g., completely corrupt file)
        if not process.stdout.strip():
            logging.error("ffprobe returned empty output.")
            return None

        data = json.loads(process.stdout)

        # Check if the 'streams' list contains any elements
        if 'streams' in data and len(data['streams']) > 0:
            logging.info(f"✅ Audio streams found: {len(data['streams'])}")
            return True
        else:
            logging.info("❌ No audio streams found. Going to remove speech transcription model.")
            return False

    except subprocess.CalledProcessError as e:
        # Handle errors like 'file not found' or other ffprobe issues
        logging.error(f"Error running ffprobe (Exit Code {e.returncode}):")
        logging.error(f"Stderr: {e.stderr.strip()}")
        return None

    except json.JSONDecodeError:
        # Handle unexpected non-JSON output from ffprobe
        logging.error("Error: Could not parse ffprobe output as JSON.")
        return None

    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None



##### Helper function for ingesting one video file
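
The ingestion helper reports progress through one boolean flag per step. One way to turn such flags into a short diagnosis is to find the first flag that is still `False`. The sketch below shows the idea on a simplified stand-in dataclass. `StepFlags` and `first_incomplete_step` are hypothetical names and are not used elsewhere in this colab.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass
class StepFlags:
    """Simplified stand-in for a per-asset status record."""
    asset_created: bool = False
    asset_uploaded: bool = False
    asset_analyzed: bool = False
    asset_indexed: bool = False


def first_incomplete_step(flags) -> Optional[str]:
    """Returns the name of the first boolean field that is False, or None."""
    for field in dataclasses.fields(flags):
        value = getattr(flags, field.name)
        # Skip non-boolean fields such as file names or resource names.
        if isinstance(value, bool) and not value:
            return field.name
    return None


print(first_incomplete_step(StepFlags(asset_created=True, asset_uploaded=True)))
```

Because dataclass fields keep their declaration order, declaring the flags in pipeline order is enough for this lookup to point at the earliest step that did not finish.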

In [None]:
@dataclasses.dataclass
class Steps:
  """Represents the steps during ingestion."""
  gcs_file: str = ''
  asset_name: str = ''

  asset_created: bool = False
  data_schema_created: bool = False
  title_created: bool = False
  asset_uploaded: bool = False
  audio_verified: bool = False
  asset_analyzed: bool = False
  asset_indexed: bool = False


# --- Setting for speech transcription.
language_code= "en-US" # @param {type: "string"}
audio_track = 0 # @param {type: "number"}

# --- Default Search Capability Contexts ---
default_track_info=visionai_v1.VideoIntelligienceContext.SpeechTranscriptionContext.TrackInfo(
                    language_code=language_code,
                    audio_tracks=[audio_track],
                )
SPEECH_SEARCH_CAPABILITY_CONTEXT = visionai_v1.SearchCapabilityContext(
    model_garden_model_context=visionai_v1.SearchCapabilityContext.ModelGardenModelContext(
        video_intelligence_context=visionai_v1.VideoIntelligienceContext(
            speech_transcription_context=visionai_v1.VideoIntelligienceContext.SpeechTranscriptionContext(
                track_infos=[default_track_info]
            )
        ),
        model_resource_id="video-speech-transcription@001"
    )
)
TEXT_SEARCH_CAPABILITY_CONTEXT = visionai_v1.SearchCapabilityContext(
    model_garden_model_context=visionai_v1.SearchCapabilityContext.ModelGardenModelContext(
        model_resource_id="video-text-detection@001"
    )
)
DEFAULT_SEARCH_CAPABILITY_CONTEXTS = [
    SPEECH_SEARCH_CAPABILITY_CONTEXT,
    TEXT_SEARCH_CAPABILITY_CONTEXT,
]
DEFAULT_SEARCH_CAPABILITY_CONTEXTS_NO_SPEECH = [
  TEXT_SEARCH_CAPABILITY_CONTEXT,
]

def ingest_asset_robust(
    client: visionai_v1.WarehouseClient,
    parent_corpus_name: str,
    index_name: str,
    asset_gcs_uri: str
) -> Optional[Steps]:
    """
    Runs a full 7-step pipeline to ingest a single asset.

    Steps:
    1. Create Asset (placeholder)
    2. Create/Verify Data Schema (for 'title' annotation)
    3. Create Annotation (for title)
    4. Upload Asset
    5. Check whether the video has audio, removing speech model from created
       asset if no audio track.
    6. Analyze Asset (run AI models)
    7. Index Asset (make searchable)

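    Args:
        client: The initialized WarehouseClient.
        parent_corpus_name: The resource name of the corpus.
        index_name: The resource name of the index to add the asset to.
        asset_gcs_uri: The "gs://" path to the media file to upload.

    Returns:
        A Steps record showing which steps completed. Ingestion stops early
        when a blocking step (1-4 or 7) fails; steps 5 and 6 are non-blocking.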
    """
    logging.info(f"--- {asset_gcs_uri}: Starting Full Asset Ingestion ---")

    s = Steps(gcs_file=asset_gcs_uri)

    # Extract filename from GCS path to use as the title
    title_value = os.path.basename(asset_gcs_uri)

    # --- Step 1: Create Asset Placeholder ---
    logging.info(f"{asset_gcs_uri}: Step 1/7: Creating asset")
    asset_name = create_asset_robust(
        client=client,
        parent_corpus_name=parent_corpus_name,
        search_capability_contexts=DEFAULT_SEARCH_CAPABILITY_CONTEXTS,
        ignore_already_exists=True  # Handle idempotency as requested
    )
    if not asset_name:
        logging.error(f"{asset_gcs_uri}: ❌ Step 1/7 FAILED: Could not create asset placeholder. Aborting ingestion.")
        return s
    logging.info(f"{asset_gcs_uri}: ✅ Step 1/7 Successful. Asset Name: {asset_name}")
    s.asset_created = True
    s.asset_name = asset_name

    # --- Step 2: Create/Verify Data Schema ---
    logging.info(f"{asset_gcs_uri}: Step 2/7: Creating/Verifying 'title' data schema...")
    # Define a simple schema that includes a "title" key of type STRING
    default_title_data_schema = visionai_v1.DataSchema(
        key="title",
        schema_details=visionai_v1.DataSchemaDetails(
            type_=visionai_v1.DataSchemaDetails.DataType.STRING,
            granularity=visionai_v1.DataSchemaDetails.Granularity.GRANULARITY_ASSET_LEVEL,
            search_strategy=visionai_v1.DataSchemaDetails.SearchStrategy(
                search_strategy_type=visionai_v1.DataSchemaDetails.SearchStrategy.SearchStrategyType.SMART_SEARCH
            )
        )
    )

    schema_status = create_data_schema_robust(
        client=client,
        parent_corpus_name=parent_corpus_name,
        data_schema=default_title_data_schema,
        ignore_already_exists=True  # An existing 'title' schema counts as success
    )

    if not schema_status:
        logging.error(f"{asset_gcs_uri}: ❌ Step 2/7 FAILED: Could not create/verify data schema '{default_title_data_schema.key}'. Aborting ingestion.")
        return s
    logging.info(f"{asset_gcs_uri}: ✅ Step 2/7 Successful. Data schema '{default_title_data_schema.key}' status: {schema_status}")
    s.data_schema_created = True

    # --- Step 3: Create Title Annotation ---
    logging.info(f"{asset_gcs_uri}: Step 3/7: Creating title annotation...")
    title_annotation = visionai_v1.Annotation(
        name=f"{asset_name}/annotations/title",
        user_specified_annotation=visionai_v1.UserSpecifiedAnnotation(
            key="title", # This key is defined in the schema from Step 2
            value=visionai_v1.AnnotationValue(
                str_value=title_value
            )
        )
    )

    annotation_name = create_annotation_robust(
        client=client,
        parent_asset_name=asset_name,
        annotation=title_annotation,
        ignore_already_exists=True  # Handle idempotency as requested
    )

    if not annotation_name:
        logging.error(f"{asset_gcs_uri}: ❌ Step 3/7 FAILED: Could not create title annotation. Aborting ingestion.")
        return s
    logging.info(f"{asset_gcs_uri}: ✅ Step 3/7 Successful. Annotation status: {annotation_name}")
    s.title_created = True

    # --- Step 4: Upload Asset  ---
    logging.info(f"{asset_gcs_uri}: Step 4/7: Uploading asset ...")
    upload_success = upload_asset_robust_with_retry(
        client=client,
        asset_name=asset_name,
        asset_gcs_uri=asset_gcs_uri
    )
    if not upload_success:
        logging.error(f"{asset_gcs_uri}: ❌ Step 4/7 FAILED: Could not upload asset. Aborting ingestion.")
        return s
    logging.info(f"{asset_gcs_uri}: ✅ Step 4/7 Successful. Asset uploaded.")
    s.asset_uploaded = True

    # --- Step 5: Check audio tracks ---
    logging.info(f"{asset_gcs_uri}: Step 5/7: Checking audio tracks ...")

    signed_url = generate_retrieval_url_robust(
        client=client,
        asset_name=asset_name
    )
    if signed_url:
      logging.info(f"{asset_gcs_uri}: Step 5/7 Retrieval URL: {signed_url}")
      has_audio = check_video_for_audio(signed_url)
      if has_audio is None:
        logging.error(f"{asset_gcs_uri}: ❌ Step 5/7 FAILED: ffprobe failed. Non-blocking; moving forward. Speech transcription may fail.")
      elif not has_audio:
        logging.info(f"{asset_gcs_uri}: Step 5/7: No audio tracks. Removing speech model from {asset_name} ...")
        # Use a separate variable so a failed update does not overwrite
        # `asset_name`, which Steps 6 and 7 still need.
        updated_asset_name = update_asset_robust(
            client=client,
            asset_name=asset_name,
            search_capability_contexts=DEFAULT_SEARCH_CAPABILITY_CONTEXTS_NO_SPEECH,
        )
        if not updated_asset_name:
          logging.error(f"{asset_gcs_uri}: ❌ Step 5/7 FAILED: Could not update asset. Non-blocking. Speech transcription model will fail.")
        else:
          logging.info(f"{asset_gcs_uri}: ✅ Step 5/7 Successful. Speech model removed. Asset Name: {updated_asset_name}")
          s.audio_verified = True
      else:
        logging.info(f"{asset_gcs_uri}: ✅ Step 5/7 Successful. Audio track found; speech model kept. Asset Name: {asset_name}")
        s.audio_verified = True
    else:
       logging.error(f"{asset_gcs_uri}: ❌ Step 5/7 FAILED: Could not get a retrieval URL. Non-blocking; moving forward. Speech transcription may fail.")

    # --- Step 6: Analyze Asset ---
    logging.info(f"{asset_gcs_uri}: Step 6/7: Submitting asset for analysis...")
    analyze_success = analyze_asset_robust(
        client=client,
        asset_name=asset_name
    )
    if not analyze_success:
        logging.error(f"{asset_gcs_uri}: ❌ Step 6/7 FAILED: Could not analyze asset. Moving forward with indexing.")
    else:
        logging.info(f"{asset_gcs_uri}: ✅ Step 6/7 Successful. Asset analysis complete.")
        s.asset_analyzed = True


    # --- Step 7: Index Asset ---
    logging.info(f"{asset_gcs_uri}: Step 7/7: Submitting asset for indexing...")
    index_success = index_asset_robust(
        client=client,
        asset_name=asset_name,
        index_name=index_name,
        index_asset_level_only=False
    )
    if not index_success:
        logging.error(f"{asset_gcs_uri}: ❌ Step 7/7 FAILED: Could not index asset. Aborting ingestion.")
        return s
    logging.info(f"{asset_gcs_uri}: ✅ Step 7/7 Successful. Asset indexed.")
    s.asset_indexed = True
    return s

#### Ingest Videos
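
The cell below fans out one ingestion per file onto a thread pool and then collects the results. As a generic illustration of that pattern, the sketch here maps each input to its result, or to the exception it raised. `run_in_parallel` and the stand-in `worker` are hypothetical names; the real worker in this colab is the ingestion helper defined above.

```python
import concurrent.futures
from typing import Callable, Dict, Iterable


def run_in_parallel(
    items: Iterable[str],
    worker: Callable[[str], object],
    max_workers: int = 8,
) -> Dict[str, object]:
    """Runs `worker` on every item and maps each item to its result or exception."""
    results: Dict[str, object] = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_item = {executor.submit(worker, item): item for item in items}
        for future in concurrent.futures.as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results[item] = future.result()
            except Exception as exc:  # Keep one failure from hiding the other results.
                results[item] = exc
    return results


def worker(name: str) -> str:
    """Stand-in worker: fails for one input to show error capture."""
    if name == "bad.mp4":
        raise ValueError("simulated failure")
    return name.upper()


print(run_in_parallel(["a.mp4", "bad.mp4"], worker))
```

Keeping the future-to-input mapping makes it possible to say which file a failure belongs to, even when the worker raises before it can return a status object.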

In [None]:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
new_asset_futures = []
for gcs_file in file_names:
    new_asset_futures.append(
        executor.submit(
            ingest_asset_robust,
            warehouse_client,
            corpus_name,
            index_name,
            gcs_file,
        )
    )
done_or_error, _ = concurrent.futures.wait(
    new_asset_futures, return_when=concurrent.futures.ALL_COMPLETED
)
asset_ingestion_statuses = []
for done_future in done_or_error:
    try:
        status = done_future.result()
        asset_ingestion_statuses.append(status)
        statuses = [
            status.asset_created,
            status.data_schema_created,
            status.title_created,
            status.asset_uploaded,
            status.audio_verified,
            status.asset_analyzed,
            status.asset_indexed,
        ]

        if not all(statuses):
            logging.error(f"❌ Ingestion failed: {status}")
        else:
            logging.info(f"✅ Ingestion succeeded: {status}")
    except Exception as e:
        logging.exception(e)

### Step 3: Search

#### Predefined Utils for rendering search results
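
The rendering helpers below play only the matched segment of each video by appending a temporal media fragment (`#t=start,end`, in seconds) to the signed URL. A minimal sketch of that URL construction, using a hypothetical helper name and a placeholder URL:

```python
def with_time_fragment(uri: str, start_seconds: float, end_seconds: float) -> str:
    """Appends a `#t=start,end` temporal fragment so players seek to the segment."""
    return f"{uri}#t={start_seconds},{end_seconds}"


# Placeholder URL; a real signed URL would carry its own query parameters.
print(with_time_fragment("https://example.com/video.mp4", 12.5, 30.0))
```

The fragment is interpreted by the browser's video player and is not sent to the server, so it does not affect the signature of a signed URL.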

In [None]:

@dataclasses.dataclass
class SearchResult:
  """Represents a search result entry."""

  asset_name: str = ''
  start_seconds: float = 0
  end_seconds: float = 0
  signed_uri: str = ''

  html: str = ''

In [None]:
def AttachVideoHtml(results):
  top_results = []
  for r in results:
    video_html = """
    <video width="640" height="360" controls>
        <source src="{}#t={},{}" type="video/mp4">
    </video>
    """.format(r.signed_uri, r.start_seconds, r.end_seconds)
    r.html = video_html
    top_results.append(r)
  return top_results

In [None]:
def RenderLink(uri):
  return '<a href="{}">Link</a>'.format(uri)


In [None]:
def RenderResults(warehouse_client, response, top_k=5):
  results = []
  for i, item in enumerate(response):
    if i >= top_k:
      break
    r = SearchResult(
        asset_name=item.asset,
        start_seconds=item.segment.start_time.timestamp(), # Changed from .seconds
        end_seconds=item.segment.end_time.timestamp(),     # Changed from .seconds
        signed_uri=generate_retrieval_url_robust(warehouse_client, item.asset),
    )
    results.append(r)

  if not results:
    logging.info("No matched search results for this query.")
    return

  top_5 = AttachVideoHtml(
      results
  )

  df = pd.DataFrame(top_5)
  df['signed_uri'] = df['signed_uri'].apply(RenderLink)
  html_table = df.to_html(escape=False)
  display(HTML(html_table))

In [None]:

def generate_retrieval_url_robust(
    client: visionai_v1.WarehouseClient,
    asset_name: str,
) -> str:
    try:
        request = visionai_v1.GenerateRetrievalUrlRequest(
            name=asset_name,
        )

        response = client.generate_retrieval_url(request=request)
        logging.debug(f"Full response: {response}")
        return response.signed_uri

    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while retrieving uri: {e}")
        return ""  # Empty string keeps the declared `str` return type.
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return ""

#### Search with a query that matches globally, without a criteria key restriction

In [None]:
query="bikes" # @param {type:"string"}

request=visionai_v1.SearchIndexEndpointRequest(
    text_query=query,
    result_granularity=visionai_v1.SearchResultGranularity.SEARCH_RESULT_GRANULARITY_PARTITION_LEVEL,
    index_endpoint=index_endpoint_name,
    result_annotation_keys=["title"],
)

response = warehouse_client.search_index_endpoint(request=request)
print(f"Search: {response}")

In [None]:
RenderResults(warehouse_client, response, top_k=5)

#### Search that matches the text detection results

In [None]:
query="Google" # @param {type:"string"}
data_schema_key="mgm-video-text-detection-001.text" # @param {type: "string"}

request=visionai_v1.SearchIndexEndpointRequest(
    text_query=query,
    criteria=[visionai_v1.Criteria(
        field=data_schema_key,
        text_array=visionai_v1.StringArray(
            txt_values=[query],
        ),
    )],
    result_granularity=visionai_v1.SearchResultGranularity.SEARCH_RESULT_GRANULARITY_PARTITION_LEVEL,
    index_endpoint=index_endpoint_name,
    result_annotation_keys=["title"], # Include the title annotation in the response.
)

response = warehouse_client.search_index_endpoint(request=request)
print(f"Search: {response}")

In [None]:
RenderResults(warehouse_client, response, top_k=5)

#### Search that matches the speech transcription results

In [None]:
query="oh my god" # @param {type:"string"}
data_schema_key="mgm-video-speech-transcription-001.transcript" # @param {type: "string"}

request=visionai_v1.SearchIndexEndpointRequest(
    text_query=query,
    criteria=[visionai_v1.Criteria(
        field=data_schema_key,
        text_array=visionai_v1.StringArray(
            txt_values=[query],
        ),
    )],
    result_granularity=visionai_v1.SearchResultGranularity.SEARCH_RESULT_GRANULARITY_PARTITION_LEVEL,
    index_endpoint=index_endpoint_name,
    result_annotation_keys=["title"], # Include the title annotation in the response.
)

response = warehouse_client.search_index_endpoint(request=request)
print(f"Search: {response}")

In [None]:
RenderResults(warehouse_client, response, top_k=5)

### Step 4: Clean Up

#### Predefined Utils for cleaning up
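
The clean-up cells in this step appear in a fixed order: delete assets, undeploy the index, delete the index, delete the index endpoint, then delete the corpus. The sketch below shows one way to chain such steps so that the sequence stops at the first failure, assuming you would rather stop than continue after a failed step. `run_cleanup` and the lambda steps are hypothetical stand-ins; treating a falsy return value as failure mirrors how the cells below check the result of each `*_robust` helper.

```python
from typing import Callable, List, Optional, Tuple

# A step is a label plus a zero-argument callable returning True on success.
Step = Tuple[str, Callable[[], Optional[bool]]]


def run_cleanup(steps: List[Step]) -> List[str]:
    """Runs steps in order and returns the labels of the steps that succeeded."""
    completed = []
    for label, step in steps:
        if not step():
            print(f"Stopped at: {label}")
            break
        completed.append(label)
    return completed


# Hypothetical stand-ins: the second step simulates a helper returning None.
steps = [
    ("delete assets", lambda: True),
    ("undeploy index", lambda: None),
    ("delete index", lambda: True),
]
done = run_cleanup(steps)
print(done)
```

In the notebook, each lambda could wrap one of the helpers defined below, for example `lambda: undeploy_index_robust(warehouse_client, index_endpoint_name)`.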

In [None]:
def list_assets_robust(
    client: visionai_v1.WarehouseClient,
    parent_corpus_name: str,
) -> Optional[visionai_v1.ListAssetsResponse]:
    """
    Lists the assets within a corpus.
    This is a non-LRO, synchronous call.

    Args:
        client: The initialized WarehouseClient.
        parent_corpus_name: The resource name of the corpus whose assets are listed.

    Returns:
        The list response for the assets under the corpus, or None if listing failed.
    """
    try:
        logging.info(f"Listing assets in corpus '{parent_corpus_name}'...")

        request = visionai_v1.ListAssetsRequest(
            parent=parent_corpus_name,
            page_size=10,
        )

        response = client.list_assets(request=request)

        return response

    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while listing assets: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None

def delete_asset_robust(
    client: visionai_v1.WarehouseClient,
    asset_name: str
)-> Optional[bool]:
    """
    Deletes an asset.
    The call returns an LRO; this helper does not wait for it to finish.

    Args:
        client: The initialized WarehouseClient.
        asset_name: The resource name of the asset to delete.

    Returns:
        True if the delete request was submitted, None if an error occurred.
    """
    try:
        logging.info(f"Deleting asset '{asset_name}'...")
        request = visionai_v1.DeleteAssetRequest(
            name=asset_name,
        )

        operation = client.delete_asset(request=request)
        # TODO: Uncomment below after DeleteAsset populates response in LRO.
        # response = operation.result()
        return True
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while deleting asset: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None

def undeploy_index_robust(
    client: visionai_v1.WarehouseClient,
    index_endpoint_name: str,
    timeout_seconds: int = 7200
) -> Optional[bool]:
    """
    Undeploys the index from an index endpoint.
    This is an LRO, asynchronous call.

    Args:
        client: The initialized WarehouseClient.
        index_endpoint_name: The resource name of the index endpoint to undeploy.
        timeout_seconds: How long to wait for the operation to complete.

    Returns:
        True if the undeploy succeeded, None if it timed out or an error occurred.
    """
    try:
        logging.info(f"Undeploying index from index endpoint '{index_endpoint_name}'...")
        request = visionai_v1.UndeployIndexRequest(
            index_endpoint=index_endpoint_name,
        )

        operation = client.undeploy_index(request=request)
        logging.info(f"Operation submitted. Operation name: {operation._operation.name}")
        logging.info(f"⏳ Waiting for operation to complete (timeout: {timeout_seconds}s)...")
        response: visionai_v1.UndeployIndexResponse = operation.result(timeout=timeout_seconds)
        return True
    # operation.result() raises concurrent.futures.TimeoutError, which is only an
    # alias of the built-in TimeoutError on Python 3.11+.
    except concurrent.futures.TimeoutError:
        logging.error(f"Operation timed out after {timeout_seconds} seconds. "
                      "The undeployment may still be running in the background.")
        return None
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while undeploy index: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None


def delete_index_robust(
    client: visionai_v1.WarehouseClient,
    index_name: str
) -> Optional[bool]:
    """
    Deletes an index.
    This is an LRO, asynchronous call; this helper does not wait for it to finish.

    Args:
        client: The initialized WarehouseClient.
        index_name: The resource name of the index to delete.

    Returns:
        True if the delete request was submitted, None if an error occurred.
    """
    try:
        logging.info(f"Deleting index '{index_name}'...")
        request = visionai_v1.DeleteIndexRequest(
            name=index_name,
        )

        operation = client.delete_index(request=request)
        # TODO: Uncomment below after DeleteIndex populates response in LRO.
        # response = operation.result()
        return True
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while deleting index: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None


def delete_index_endpoint_robust(
    client: visionai_v1.WarehouseClient,
    index_endpoint_name: str
) -> Optional[bool]:
    """
    Deletes an index endpoint.
    This is an LRO, asynchronous call; this helper does not wait for it to finish.

    Args:
        client: The initialized WarehouseClient.
        index_endpoint_name: The resource name of the index endpoint to delete.

    Returns:
        True if the delete request was submitted, None if an error occurred.
    """
    try:
        logging.info(f"Deleting index endpoint '{index_endpoint_name}'...")
        request = visionai_v1.DeleteIndexEndpointRequest(
            name=index_endpoint_name,
        )

        operation = client.delete_index_endpoint(request=request)
        # TODO: Uncomment below after DeleteIndexEndpoint populates response in LRO.
        # response = operation.result()
        return True
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while deleting index endpoint: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None


def delete_corpus_robust(
    client: visionai_v1.WarehouseClient,
    corpus_name: str
)-> Optional[bool]:
    """
    Deletes a corpus.
    This is a non-LRO, synchronous call.

    Args:
        client: The initialized WarehouseClient.
        corpus_name: The resource name of the corpus to delete.

    Returns:
        True if deletion was successful, None if an error occurred.
    """
    try:
        logging.info(f"Deleting corpus '{corpus_name}'...")
        request = visionai_v1.DeleteCorpusRequest(
            name=corpus_name,
        )

        response = client.delete_corpus(request=request)
        return True
    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while deleting corpus: {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None

#### Delete Assets

In [None]:
if delete_assets_widget.value:
    # list_assets_robust returns None on failure, so fall back to an empty list.
    list_asset_response = list_assets_robust(warehouse_client, corpus_name) or []
    for asset in list_asset_response:
        response = delete_asset_robust(warehouse_client, asset.name)
        if not response:
            logging.error(f"❌ Failed to delete asset: {asset.name}")
        else:
            logging.info(f"✅ Deleted asset: {asset.name}")

#### Undeploy Index

In [None]:
if undeploy_index_widget.value:
    response = undeploy_index_robust(warehouse_client, index_endpoint_name)
    if not response:
        logging.error(f"❌ Failed to undeploy index: {index_endpoint_name}")
    else:
        logging.info(f"✅ Undeployed index: {index_endpoint_name}")

#### Delete Index

In [None]:
if delete_index_widget.value:
    response = delete_index_robust(warehouse_client, index_name)
    if not response:
        logging.error(f"❌ Failed to delete index: {index_name}")
    else:
        logging.info(f"✅ Deleted index: {index_name}")

#### Delete Index Endpoint

In [None]:
if delete_index_endpoint_widget.value:
    response = delete_index_endpoint_robust(warehouse_client, index_endpoint_name)
    if not response:
        logging.error(f"❌ Failed to delete index endpoint: {index_endpoint_name}")
    else:
        logging.info(f"✅ Deleted index endpoint: {index_endpoint_name}")

#### Delete Corpus

In [None]:
if delete_corpus_widget.value:
    response = delete_corpus_robust(warehouse_client, corpus_name)
    if not response:
        logging.error(f"❌ Failed to delete corpus: {corpus_name}")
    else:
        logging.info(f"✅ Deleted corpus: {corpus_name}")


# Part 2: More APIs Examples

This section provides additional API examples for interacting with Vision Warehouse resources.

For the currently public API surface, see: https://docs.cloud.google.com/vision-ai/docs/reference/rest

To run the sections below, first run these sections: "Set Up Environment", "Download SDK", "Set Up Variables -> Set Your Project Id", "Provision New Or Use Existing Corpus/Index/IndexEndpoint", and "Preparation".
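
Several cells in this part guard themselves with checks such as `'warehouse_client' in locals()`. A small helper can report every missing prerequisite at once. In this sketch, `missing_names` is a hypothetical name, and the namespace is a plain dict standing in for the notebook's `globals()`:

```python
def missing_names(namespace: dict, required: list) -> list:
    """Returns the required names that are undefined or still None."""
    return [name for name in required if namespace.get(name) is None]


# Stand-in namespace; in the notebook you could pass globals() instead.
example_namespace = {
    "warehouse_client": object(),
    "PROJECT_NUMBER": "123456",
    "corpus_name": None,
}
missing = missing_names(
    example_namespace,
    ["warehouse_client", "PROJECT_NUMBER", "LOCATION_ID", "corpus_name"],
)
print(missing)
```

Treating `None` as missing also catches variables that were assigned from a helper that failed.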

## More on Provision

Examples of running CreateCorpus, CreateIndexEndpoint, CreateIndex, DeployIndex separately.
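
When the four provisioning calls are run separately, the order matters. The sketch below derives a valid order with the standard library's `graphlib`. The dependency map is an assumption inferred from the variables each cell below checks for (for example, CreateIndex needs `corpus_name`), not from API documentation.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps that must finish first (assumed dependencies,
# inferred from the variables each provisioning cell requires).
PROVISION_DEPENDENCIES = {
    "CreateCorpus": set(),
    "CreateIndexEndpoint": set(),
    "CreateIndex": {"CreateCorpus"},
    "DeployIndex": {"CreateIndex", "CreateIndexEndpoint"},
}

provision_order = list(TopologicalSorter(PROVISION_DEPENDENCIES).static_order())
print(provision_order)
```

Any order printed here places CreateCorpus before CreateIndex and DeployIndex last; CreateIndexEndpoint can run at any point before the deployment.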

In [None]:
# @title CreateCorpus

# --- Default Search Capabilities (for easy reuse) ---
DEFAULT_SEARCH_CAPABILITY = visionai_v1.SearchCapability(
    type_=visionai_v1.SearchCapability.Type.DEFAULT_SEARCH,
)
SPEECH_SEARCH_CAPABILITY = visionai_v1.SearchCapability(
    type_=visionai_v1.SearchCapability.Type.MODEL_GARDEN_MODEL_POWERED_SEARCH,
    model_garden_model_powered_search_config=visionai_v1.SearchCapability.ModelGardenModelPoweredSearchConfig(
        model_resource_id="video-speech-transcription@001"
    )
)
TEXT_SEARCH_CAPABILITY = visionai_v1.SearchCapability(
    type_=visionai_v1.SearchCapability.Type.MODEL_GARDEN_MODEL_POWERED_SEARCH,
    model_garden_model_powered_search_config=visionai_v1.SearchCapability.ModelGardenModelPoweredSearchConfig(
        model_resource_id="video-text-detection@001"
    )
)
DEFAULT_VIDEO_ON_DEMAND_CAPABILITIES = [
    DEFAULT_SEARCH_CAPABILITY,
    SPEECH_SEARCH_CAPABILITY,
    TEXT_SEARCH_CAPABILITY
]

param_corpus_display_name = "python colab test corpus" # @param {type: "string"}
param_corpus_description = "python colab test corpus" # @param {type: "string"}


if 'warehouse_client' in locals() and 'PROJECT_NUMBER' in locals() and 'LOCATION_ID' in locals():
    logging.info("--- Starting New Corpus Creation ---")
    new_corpus_name = create_corpus_robust(
        client=warehouse_client,
        project_number=PROJECT_NUMBER,
        location_id=LOCATION_ID,
        display_name=param_corpus_display_name,
        description=param_corpus_description,
        search_capabilities=DEFAULT_VIDEO_ON_DEMAND_CAPABILITIES,
    )

    if new_corpus_name:
        print(f"\n✅ Corpus created successfully: {new_corpus_name}")
        # You would then assign this to the global `corpus_name`
        corpus_name = new_corpus_name
    else:
        print(f"\n❌ Corpus creation failed. Check logs above for errors.")
else:
    logging.error("`warehouse_client`, `PROJECT_NUMBER`, or `LOCATION_ID` is not defined. "
                  "Please run the initialization cells.")



In [None]:
# @title CreateIndex
# Takes approximately 30-60 minutes

param_display_name = "python colab test" # @param {type: "string"}
param_description = "python colab test" # @param {type: "string"}

if 'warehouse_client' in locals() and 'corpus_name' in locals():
    logging.info("--- Starting New Index Creation ---")
    new_index_name = create_index_robust(
        client=warehouse_client,
        parent_corpus_name=corpus_name,
        display_name=param_display_name,
        description=param_description,
        timeout_seconds=7200  # You can adjust this
    )

    if new_index_name:
        print(f"\n✅ Index created successfully: {new_index_name}")
        # You would then assign this to the global `index_name`
        index_name = new_index_name
    else:
        print(f"\n❌ Index creation failed. Check logs above for errors.")
else:
    logging.error("`warehouse_client` or `corpus_name` is not defined. "
                  "Please run the initialization cells.")


In [None]:
# @title CreateIndexEndpoint
# Takes approximately 30-60 minutes

# --- Colab Parameters ---
param_display_name = "python colab test" # @param {type: "string"}
param_description = "python colab test" # @param {type: "string"}


if 'warehouse_client' in locals() and 'PROJECT_NUMBER' in locals() and 'LOCATION_ID' in locals():
    logging.info("--- Starting New IndexEndpoint Creation ---")
    new_index_endpoint_name = create_index_endpoint_robust(
        client=warehouse_client,
        project_number=PROJECT_NUMBER,
        location_id=LOCATION_ID,
        display_name=param_display_name,
        description=param_description,
        timeout_seconds=3600  # You can adjust this
    )

    if new_index_endpoint_name:
        print(f"\n✅ IndexEndpoint created successfully: {new_index_endpoint_name}")
        # You would then assign this to the global `index_endpoint_name`
        index_endpoint_name = new_index_endpoint_name
    else:
        print(f"\n❌ IndexEndpoint creation failed. Check logs above for errors.")
else:
    logging.error("`warehouse_client`, `PROJECT_NUMBER`, or `LOCATION_ID` is not defined. "
                  "Please run the initialization cells.")


In [None]:
# @title DeployIndex
# Takes approximately 30-60 minutes

if 'warehouse_client' in locals() and 'index_name' in locals() and 'index_endpoint_name' in locals():
    logging.info("--- Starting Index Deployment ---")
    resp = deploy_index_endpoint_robust(
        client=warehouse_client,
        index_endpoint_name=index_endpoint_name,
        index_name=index_name,
        timeout_seconds=7200  # You can adjust this
    )

    if resp:
        # Report the endpoint that was deployed; `new_index_name` may not exist in this session.
        print(f"\n✅ IndexEndpoint deployed successfully: {index_endpoint_name}")
    else:
        print(f"\n❌ IndexEndpoint deployment failed. Check logs above for errors.")
else:
    logging.error("`warehouse_client`, `index_name`, or `index_endpoint_name` is not defined. "
                  "Please run the initialization cells.")


In [None]:
# @title Example for fetching operation manually
# You only need to run this if previous step is interrupted.

operation_name ="" # @param {type: "string"}


def check_operation_status(
    client: visionai_v1.WarehouseClient,
    operation_name: str
) -> Optional[operations_pb2.Operation]:
    """
    Manually fetches the status of a long-running operation given its name.

    Args:
        client: The initialized WarehouseClient.
        operation_name: The full name of the operation
                        (e.g., "projects/.../locations/.../operations/...").

    Returns:
        The operation object itself, or None if an error occurred.
    """
    try:
        logging.info(f"Fetching status for operation: {operation_name}")

        # Make the request to get the operation
        request = operations_pb2.GetOperationRequest(name=operation_name)
        operation = client.get_operation(request=request)

        if operation.done:
            logging.info("Operation is complete.")
            if operation.error.message:
                logging.error(f"Operation failed with error: {operation.error.message}")
            else:
                logging.info("Operation completed successfully.")
                logging.debug(f"Operation response: {operation.response}")
        else:
            logging.info("Operation is still running.")
            logging.debug(f"Operation metadata: {operation.metadata}")

        return operation

    except google_exceptions.GoogleAPICallError as e:
        logging.error(f"API error while checking operation '{operation_name}': {e}")
        return None
    except Exception as e:
        logging.critical(f"An unexpected error occurred: {e}", exc_info=True)
        return None


# Paste the operation name you got from the logs into `operation_name` above.

if 'warehouse_client' in locals():
    logging.info("--- Checking Operation Status ---")
    op_status = check_operation_status(
        client=warehouse_client,
        operation_name=operation_name
    )

    if op_status:
        print(f"\n✅ Status check complete. Done: {op_status.done}")
    else:
        print(f"\n❌ Failed to check operation status. See logs.")
else:
    logging.error("`warehouse_client` is not defined. "
                  "Please run the initialization cells.")
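
If an operation is still running, the status check above can be repeated until it finishes. A minimal polling loop with exponential backoff might look like the following. `poll_until_done`, the `fetch` callable, and the fake operations are hypothetical stand-ins so that the sketch runs without a Warehouse client; the 7200-second default timeout matches the value used elsewhere in this colab.

```python
import time
from types import SimpleNamespace


def poll_until_done(fetch, initial_delay=1.0, max_delay=60.0,
                    timeout=7200.0, sleep=time.sleep):
    """Calls fetch() until it returns an object with done=True, or gives up.

    Returns the finished operation, or None if the timeout would be exceeded.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        operation = fetch()
        if operation is not None and operation.done:
            return operation
        if time.monotonic() + delay > deadline:
            return None
        sleep(delay)
        delay = min(delay * 2, max_delay)  # Double the wait, up to max_delay.


# Fake operations: not done twice, then done. Sleeps are recorded, not waited.
fake_responses = iter([
    SimpleNamespace(done=False),
    SimpleNamespace(done=False),
    SimpleNamespace(done=True),
])
recorded_waits = []
finished = poll_until_done(lambda: next(fake_responses), sleep=recorded_waits.append)
print(finished.done, recorded_waits)
```

In the notebook, `fetch` could be `lambda: check_operation_status(warehouse_client, operation_name)`.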


## More on Corpus

You can list and get Corpus, Index, and IndexEndpoint resources.
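
The cells below build resource names by hand with f-strings. A minimal sketch of the same naming scheme as reusable helpers follows. The function names are hypothetical, the path segments are copied from the f-strings used in this colab, and the example project number and IDs are made up.

```python
def _require(value: str, label: str) -> str:
    """Rejects empty IDs, which would otherwise yield a malformed resource name."""
    if not value:
        raise ValueError(f"{label} must not be empty")
    return value


def location_path(project_number: str, location_id: str = "us-central1") -> str:
    project = _require(str(project_number), "project_number")
    return f"projects/{project}/locations/{location_id}"


def corpus_path(project_number: str, corpus_id: str,
                location_id: str = "us-central1") -> str:
    parent = location_path(project_number, location_id)
    return f"{parent}/corpora/{_require(corpus_id, 'corpus_id')}"


def index_path(corpus: str, index_id: str) -> str:
    return f"{corpus}/indexes/{_require(index_id, 'index_id')}"


def asset_path(corpus: str, asset_id: str) -> str:
    return f"{corpus}/assets/{_require(asset_id, 'asset_id')}"


def index_endpoint_path(project_number: str, index_endpoint_id: str,
                        location_id: str = "us-central1") -> str:
    parent = location_path(project_number, location_id)
    return f"{parent}/indexEndpoints/{_require(index_endpoint_id, 'index_endpoint_id')}"


example_corpus = corpus_path("123456", "my-corpus")
print(example_corpus)
print(asset_path(example_corpus, "my-asset"))
```

Failing fast on an empty ID is useful here because the `@param` form fields default to empty strings.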

In [None]:
# @title ListCorpora
parent_name = f"projects/{PROJECT_NUMBER}/locations/us-central1"
request = visionai_v1.ListCorporaRequest(
        parent=parent_name,
    )

response = warehouse_client.list_corpora(request=request)
print(f"Corpus List: {response}")

In [None]:
# @title GetCorpus
corpus_id = "" # @param {type: "string"}
corpus_name = f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}"
request = visionai_v1.GetCorpusRequest(
    name=corpus_name,
)
response = warehouse_client.get_corpus(request=request)
print(f"Corpus: {response}")

In [None]:
# @title ListIndexes
corpus_id="" # @param {type: "string"}
corpus_name = f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}"
request = visionai_v1.ListIndexesRequest(
          parent=corpus_name,
      )
response = warehouse_client.list_indexes(request=request)
print(f"Index List: {response}")

In [None]:
# @title GetIndex
index_id="" # @param {type: "string"}
index_name=f"{corpus_name}/indexes/{index_id}"
request = visionai_v1.GetIndexRequest(
    name=index_name,
)
response = warehouse_client.get_index(request=request)
print(f"Index: {response}")
index_name=response.name

In [None]:
# @title ListIndexEndpoints
parent_name = f"projects/{PROJECT_NUMBER}/locations/us-central1"
request = visionai_v1.ListIndexEndpointsRequest(
          parent=parent_name,
      )
response = warehouse_client.list_index_endpoints(request=request)
print(f"IndexEndpoint List: {response}")

In [None]:
# @title GetIndexEndpoint
index_endpoint_id="" # @param {type: "string"}
index_endpoint_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/indexEndpoints/{index_endpoint_id}"
request = visionai_v1.GetIndexEndpointRequest(
    name=index_endpoint_name,
)
response = warehouse_client.get_index_endpoint(request=request)
print(f"IndexEndpoint: {response}")
index_endpoint_name=response.name

## More on Assets

In [None]:
# @title GetAsset
asset_id=""  # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
request = visionai_v1.GetAssetRequest(
    name=asset_name,
)
response = warehouse_client.get_asset(request=request)
print(f"Asset: {response}")

In [None]:
# @title IndexAsset
asset_id=""  # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
index_id="" # @param {type: "string"}
index_name=f"{corpus_name}/indexes/{index_id}"
request = visionai_v1.IndexAssetRequest(
    name=asset_name,
    index_asset_level_only=False, # False: index segment-level signals too, not only asset-level ones
    index=index_name
)
operation = warehouse_client.index_asset(request=request)
response = operation.result()
print(f"IndexAsset: {response}")

## More on Metadata

Vision Warehouse exposes both ML-generated metadata and your own metadata as annotations on assets. The cells below show how to list each kind.

In [None]:
# @title ListAnnotations - ML metadata
# You can filter by either of the two default ML keys:
# Text: mgm-video-text-detection-001
# Speech: mgm-video-speech-transcription-001

asset_id=""  # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
filter_key="mgm-video-text-detection-001"

request=visionai_v1.ListAnnotationsRequest(
    parent=asset_name,
    filter=f"key={filter_key}"

)
response = warehouse_client.list_annotations(request=request)
print(f"Annotations: {response}")

In [None]:
# @title ListAnnotations - Your own metadata
# This is especially helpful for listing segment-level metadata.
# For example, if you have segment-level data schemas defined,
# this API can list all segment values that match a given schema key.

asset_id=""  # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
filter_key="" # @param {type: "string"}

request=visionai_v1.ListAnnotationsRequest(
    parent=asset_name,
    filter=f"key={filter_key}"

)
response = warehouse_client.list_annotations(request=request)
print(f"Annotations: {response}")

### Data Schemas - Other types

Vision Warehouse supports all types of schemas: https://docs.cloud.google.com/vision-ai/docs/reference/rest/v1alpha1/projects.locations.corpora.dataSchemas

In [None]:
# @title Data Schemas - Date Range

In [None]:
# @title Data Schema - Complex struct
# VisionWarehouse provides complex data schemas for your business needs.

### Example: Metadata creation and index

You can define your own metadata on assets.

--------------------------------------


| Step | Approximate time |
| :--- | :--- |
| CreateDataSchema API | Instant |
| CreateAnnotation API | Instant |
| IndexAsset API | A few minutes |





#### Define a schema at Asset Level

In [None]:

your_data_schema_key="description" # @param {type: "string"}

data_schema=visionai_v1.DataSchema(
    key=your_data_schema_key,
    schema_details=visionai_v1.DataSchemaDetails(
        type_=visionai_v1.DataSchemaDetails.DataType.STRING,
        granularity=visionai_v1.DataSchemaDetails.Granularity.GRANULARITY_ASSET_LEVEL,
        search_strategy=visionai_v1.DataSchemaDetails.SearchStrategy(
            search_strategy_type=visionai_v1.DataSchemaDetails.SearchStrategy.SearchStrategyType.SMART_SEARCH
        )
    )
)
request = visionai_v1.CreateDataSchemaRequest(
    parent=corpus_name,
    data_schema=data_schema,
)

response = warehouse_client.create_data_schema(request=request)
print(f"DataSchema: {response}")

#### Create the corresponding metadata

In [None]:
asset_id = "16995729758365416897" # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
annotation_value = "a man is driving a car" # @param {type: "string"}

your_annotation = visionai_v1.Annotation(
    user_specified_annotation=visionai_v1.UserSpecifiedAnnotation(
        key=your_data_schema_key,
        value=visionai_v1.AnnotationValue(
            str_value=annotation_value
        )
    )
)
request = visionai_v1.CreateAnnotationRequest(
    parent=asset_name,
    annotation=your_annotation,
)

response = warehouse_client.create_annotation(request=request)
print(f"Annotation: {response}")

#### Create a schema at Segment Level

In [None]:
your_seg_data_schema_key="logs" # @param {type: "string"}

seg_data_schema=visionai_v1.DataSchema(
    key=your_seg_data_schema_key,
    schema_details=visionai_v1.DataSchemaDetails(
        type_=visionai_v1.DataSchemaDetails.DataType.STRING,
        granularity=visionai_v1.DataSchemaDetails.Granularity.GRANULARITY_PARTITION_LEVEL,
        search_strategy=visionai_v1.DataSchemaDetails.SearchStrategy(
            search_strategy_type=visionai_v1.DataSchemaDetails.SearchStrategy.SearchStrategyType.SMART_SEARCH
        )
    )
)
request = visionai_v1.CreateDataSchemaRequest(
    parent=corpus_name,
    data_schema=seg_data_schema,
)

response = warehouse_client.create_data_schema(request=request)
print(f"DataSchema: {response}")

#### Create the corresponding metadata
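
The cell below passes `start_seconds` and `end_seconds` to `duration_pb2.Duration(seconds=...)`. The `seconds` field of a protobuf `Duration` is an integer, and the sub-second part is carried separately in `nanos`. If you need fractional offsets, one option is to split the value first. `split_offset` is a hypothetical helper, sketched here in plain Python without importing protobuf:

```python
def split_offset(offset_seconds: float) -> tuple:
    """Splits a non-negative offset into (whole seconds, nanoseconds)."""
    if offset_seconds < 0:
        raise ValueError("offset_seconds must be non-negative")
    whole = int(offset_seconds)
    nanos = round((offset_seconds - whole) * 1_000_000_000)
    if nanos == 1_000_000_000:  # Rounding carried over into the next second.
        whole, nanos = whole + 1, 0
    return whole, nanos


start = split_offset(5)
end = split_offset(10.25)
print(start, end)
```

The two parts could then be passed as `duration_pb2.Duration(seconds=whole, nanos=nanos)`.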

In [None]:
asset_id = "16995729758365416897" # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
your_seg_data_schema_key="logs" # @param {type: "string"}
seg_annotation_value = "singing and smile" # @param {type: "string"}
start_seconds=5 # @param {type: "number"}
end_seconds=10 # @param {type: "number"}

your_seg_annotation = visionai_v1.Annotation(
    user_specified_annotation=visionai_v1.UserSpecifiedAnnotation(
        key=your_seg_data_schema_key,
        value=visionai_v1.AnnotationValue(
            str_value=seg_annotation_value
        ),
        partition=visionai_v1.Partition(
            relative_temporal_partition=visionai_v1.Partition.RelativeTemporalPartition(
              start_offset=duration_pb2.Duration(
                  seconds=start_seconds,
              ),
              end_offset=duration_pb2.Duration(
                  seconds=end_seconds,
              )
            )
        )
    )
)
request = visionai_v1.CreateAnnotationRequest(
    parent=asset_name,
    annotation=your_seg_annotation,
)

response = warehouse_client.create_annotation(request=request)
print(f"Annotation: {response}")

#### Run Indexing

In [None]:
asset_id="16995729758365416897"  # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
index_id="index-5657433150604653294" # @param {type: "string"}
index_name=f"{corpus_name}/indexes/{index_id}"
request = visionai_v1.IndexAssetRequest(
    name=asset_name,
    index_asset_level_only=False, # False: index segment-level signals too, not only asset-level ones
    index=index_name
)
operation = warehouse_client.index_asset(request=request)
response = operation.result()
print(f"IndexAsset: {response}")

## More on Search

You can create facets and hypernyms to customize the search experience.

### Facet Search

To enable facet search, create a set of facets that align with the data schemas you created.


For more details, see the public documentation: https://docs.cloud.google.com/vision-ai/docs/reference/rest/v1alpha1/projects.locations.corpora.searchConfigs
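
Conceptually, a value-type facet groups assets by the values stored under the mapped data schema keys and returns up to `result_size` buckets. The sketch below imitates that idea locally with plain dictionaries. It illustrates the concept only and is not the service's implementation: `facet_buckets`, the sample annotations, and the ordering by count are all assumptions of this sketch.

```python
from collections import Counter


def facet_buckets(assets: list, mapped_fields: list, result_size: int) -> list:
    """Counts assets per facet value across the mapped annotation keys."""
    counts = Counter()
    for annotations in assets:
        # A set ensures an asset counts once per value, even if two keys match.
        values = {annotations[key] for key in mapped_fields if key in annotations}
        counts.update(values)
    # Assumed ordering: highest count first, then alphabetical for ties.
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return ranked[:result_size]


# Made-up annotations keyed by data schema key.
sample_assets = [
    {"home_team": "Golden", "away_team": "Silver"},
    {"home_team": "Bronze", "away_team": "Golden"},
    {"home_team": "Golden", "away_team": "Bronze"},
]
buckets = facet_buckets(sample_assets, ["home_team", "away_team"], result_size=2)
print(buckets)
```

With a single mapped field this behaves like the 1-dimension facet; with several mapped fields it behaves like the n-dimension facet shown later in this section.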

In [None]:
# @title Create Data Schema
your_data_schema_key="teams" # @param {type: "string"}

# The facets need to map to the data schemas you created.
# For example, if you create a data schema called "Content-Type", the facet's mapped_fields should include "Content-Type".
data_schema=visionai_v1.DataSchema(
    key=your_data_schema_key,
    schema_details=visionai_v1.DataSchemaDetails(
        type_=visionai_v1.DataSchemaDetails.DataType.STRING,
        granularity=visionai_v1.DataSchemaDetails.Granularity.GRANULARITY_ASSET_LEVEL,
        search_strategy=visionai_v1.DataSchemaDetails.SearchStrategy(
            search_strategy_type=visionai_v1.DataSchemaDetails.SearchStrategy.SearchStrategyType.SMART_SEARCH
        )
    )
)
request = visionai_v1.CreateDataSchemaRequest(
    parent=corpus_name,
    data_schema=data_schema,
)

response = warehouse_client.create_data_schema(request=request)
print(f"DataSchema: {response}")


In [None]:

# @title Create Annotations
# Then you create corresponding annotations

asset_id = "" # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
annotation_value = "Golden" # @param {type: "string"}

your_annotation = visionai_v1.Annotation(
    user_specified_annotation=visionai_v1.UserSpecifiedAnnotation(
        key=your_data_schema_key,
        value=visionai_v1.AnnotationValue(
            str_value=annotation_value
        )
    )
)
request = visionai_v1.CreateAnnotationRequest(
    parent=asset_name,
    annotation=your_annotation,
)
response = warehouse_client.create_annotation(request=request)
print(f"Annotation: {response}")

In [None]:
# @title IndexAsset
asset_id=""  # @param {type: "string"}
asset_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}/assets/{asset_id}"
index_id="" # @param {type: "string"}
index_name=f"{corpus_name}/indexes/{index_id}"
request = visionai_v1.IndexAssetRequest(
    name=asset_name,
    index_asset_level_only=False, # False: index segment-level signals too, not only asset-level ones
    index=index_name
)
operation = warehouse_client.index_asset(request=request)
response = operation.result()
print(f"IndexAsset: {response}")

In [None]:

# @title Create Facets - 1 dimension
# A 1-dimension facet means 1 data schema key maps to 1 facet.

corpus_id="" # @param {type: "string"}
corpus_name = f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}"
request=visionai_v1.CreateSearchConfigRequest(
    parent=corpus_name,
    search_config_id="teams",
    search_config=visionai_v1.SearchConfig(
        facet_property=visionai_v1.FacetProperty(
            mapped_fields=["teams"],
            display_name="teams",
            result_size=2,
            bucket_type=visionai_v1.FacetBucketType.FACET_BUCKET_TYPE_VALUE,
        )
    )
)
response = warehouse_client.create_search_config(request=request)
print(f"SearchConfig: {response}")

In [None]:
# @title Search with facet selected
query="video0" # @param {type:"string"}

request=visionai_v1.SearchIndexEndpointRequest(
    text_query=query,
    result_granularity=visionai_v1.SearchResultGranularity.SEARCH_RESULT_GRANULARITY_ASSET_LEVEL,
    index_endpoint=index_endpoint_name,
    result_annotation_keys=["title", "teams"],
    facet_selections=[visionai_v1.FacetGroup(
        facet_id="teams",
        buckets=[visionai_v1.FacetBucket(
            value=visionai_v1.FacetValue(
                string_value="Golden"
            ),
            selected=True,
        )],
        bucket_type=visionai_v1.FacetBucketType.FACET_BUCKET_TYPE_VALUE,
    )]
)

response = warehouse_client.search_index_endpoint(request=request)
print(f"Search: {response}")

In [None]:

# @title Create Facets - n dimension
# An n-dimension facet maps n data schema keys to one facet.
# For example, the data schema keys home_team and away_team can both map to the facet teams.

corpus_id="" # @param {type: "string"}
corpus_name = f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}"
request=visionai_v1.CreateSearchConfigRequest(
    parent=corpus_name,
    search_config_id="teams", # Use a different ID if a search config named "teams" already exists.
    search_config=visionai_v1.SearchConfig(
        facet_property=visionai_v1.FacetProperty(
            mapped_fields=["home_team", "away_team"], # These data schema keys must already exist.
            display_name="teams",
            result_size=2,
            bucket_type=visionai_v1.FacetBucketType.FACET_BUCKET_TYPE_VALUE,
        )
    )
)
response = warehouse_client.create_search_config(request=request)
print(f"SearchConfig: {response}")
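
To make the difference between the 1-dimension and n-dimension facets concrete, here is a small, self-contained sketch. It is a toy model, not the service's implementation: the function `facet_buckets`, the sample assets, and the reading of `result_size` as a cap on the number of returned buckets are all assumptions made for illustration.

```python
from collections import Counter


def facet_buckets(assets, mapped_fields, result_size):
    """Count annotation values across the mapped keys and keep the top buckets.

    assets: dict of asset ID -> {data schema key: annotation value}.
    mapped_fields: data schema keys that feed the facet.
    result_size: assumed here to cap the number of buckets returned.
    """
    counts = Counter()
    for annotations in assets.values():
        # An asset counts once per distinct value, even if two keys share it.
        values = {annotations[key] for key in mapped_fields if key in annotations}
        counts.update(values)
    # Sort by count (descending), then by value, so ties are deterministic.
    ordered = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return ordered[:result_size]


# Hypothetical annotations for three assets.
assets = {
    "asset-1": {"home_team": "Golden", "away_team": "Silver"},
    "asset-2": {"home_team": "Bronze", "away_team": "Golden"},
    "asset-3": {"home_team": "Golden", "away_team": "Bronze"},
}

# 1 dimension: one key feeds the facet.
print(facet_buckets(assets, ["home_team"], 2))
# n dimension: two keys feed the same facet.
print(facet_buckets(assets, ["home_team", "away_team"], 2))
```

In this toy data, "Golden" is counted for all three assets only when both keys are mapped, which is the reason to use an n-dimension facet when the same value can appear under several keys.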

### Hypernyms Search

A hypernym lets a search term match annotations that contain related words. For example, you can map an abbreviation to its full name.
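
The sketch below illustrates the idea with plain Python. It is a conceptual model only and says nothing about how Warehouse implements hypernym search: the functions `expand_query` and `search`, the whole-word matching rule, and the sample annotations are assumptions made for illustration.

```python
def expand_query(query, hypernyms):
    """Return the query words plus the hyponyms of any hypernym in the query."""
    terms = []
    for word in query.lower().split():
        terms.append(word)
        # Add the hyponyms registered for this word, if any.
        terms.extend(hypernyms.get(word, []))
    return terms


def search(query, annotations, hypernyms):
    """Return annotation texts containing any expanded query term as a whole word."""
    terms = expand_query(query, hypernyms)
    return [
        text for text in annotations
        if any(term in text.lower().split() for term in terms)
    ]


# Hypothetical hypernym mapping and annotation values.
hypernyms = {"spaceship": ["truck", "car"]}
annotations = ["a man is driving a car", "a dog runs on the beach"]

print(search("spaceship", annotations, hypernyms))
```

Without the mapping, the query "spaceship" matches nothing in this sample, because neither annotation contains that word.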

In [None]:
# @title Create hypernyms

# The idea is that you can later search for the hypernym in a query
# instead of hyponym_1 or hyponym_2, which are the words that actually
# appear in the metadata.
# For example, suppose an annotation value is "a man is driving a car".
# After you create the hypernym "spaceship" mapping to "car",
# a search for "spaceship" returns the annotations that include "car".

hypernym="spaceship" # @param {type:"string"}
hyponym_1="truck" # @param {type:"string"}
hyponym_2="car" # @param {type:"string"}

corpus_id="" # @param {type: "string"}
corpus_name = f"projects/{PROJECT_NUMBER}/locations/us-central1/corpora/{corpus_id}"
request=visionai_v1.CreateSearchHypernymRequest(
    parent=corpus_name,
    search_hypernym=visionai_v1.SearchHypernym(
        hypernym=hypernym,
        hyponyms=[hyponym_1, hyponym_2]
    )
)
response = warehouse_client.create_search_hypernym(request=request)
print(f"SearchHypernyms: {response}")

In [None]:
# @title Search with hypernyms
query="spaceship" # @param {type:"string"}

request=visionai_v1.SearchIndexEndpointRequest(
    text_query=query,
    result_granularity=visionai_v1.SearchResultGranularity.SEARCH_RESULT_GRANULARITY_PARTITION_LEVEL,
    index_endpoint=index_endpoint_name,
    result_annotation_keys=["title"],
)

response = warehouse_client.search_index_endpoint(request=request)
print(f"Search: {response}")