# How To Create a Vertex AI Search Data Store Using Python

This notebook shows how to create an unstructured data store in Vertex AI Search. In this example we create a Vertex AI Search data store and populate it with documents from a GCS bucket. After creating the data store, we run a query and optionally pass the query and the results to Gemini Pro to produce the final answer.

## Prepare the Python development environment

First, let's set the project-specific variables that customize this notebook for your GCP environment. Replace the value of PROJECT_ID with your own GCP project ID, and adjust the bucket, folder, and data store names to match your environment.

In [None]:
PROJECT_ID = 'rkiles-demo-host-vpc'
REGION = 'us-central1'
LOCATION = 'global'
GCS_BUCKET = 'rkiles-test'
GCS_FOLDER = 'testing'
DSNAME = 'manh-test002'  # ID and display name of the data store
DSENGINE = 'manh-test002'  # ID of the search app (engine) used for queries
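The functions defined later in this notebook combine these variables into Discovery Engine resource names. As a rough sketch of how the pieces fit together (the helper names `data_store_branch_path` and `serving_config_path` are hypothetical and not part of any library; the path layouts are copied from the calls used in this notebook, and the argument values are placeholders):

```python
def data_store_branch_path(project_id: str, location: str, data_store_id: str) -> str:
    """Parent resource used when creating documents in a data store."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/collections/default_collection/dataStores/{data_store_id}/branches/0"
    )


def serving_config_path(project_id: str, location: str, engine_id: str) -> str:
    """Serving config resource used when querying a search app (engine)."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/collections/default_collection/engines/{engine_id}"
        "/servingConfigs/default_config"
    )


# Placeholder values for illustration only
print(data_store_branch_path("my-project", "global", "my-data-store"))
print(serving_config_path("my-project", "global", "my-engine"))
```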

Install any needed Python modules from our requirements.txt file. Most Vertex AI Workbench environments include all the packages we'll be using, but if you are using an external Jupyter notebook or need additional packages, add them to the included requirements.txt file and run the following command.

In [None]:
#pip install -r requirements.txt

Now we will import all required modules. For our purposes, we will be using the following:

- vertexai - Initializes the Vertex AI SDK and provides authenticated access to the Google APIs
- vertexai.preview.generative_models - Interact with the Gemini multimodal models
- google.cloud.storage - List the files in the GCS bucket
- google.cloud.discoveryengine_v1beta - Create the data store, add documents, and run searches in Vertex AI Search
- re and mimetypes - Parse file paths and guess the MIME type of each file
- json - Python module used to interact with JSON data, such as document metadata
- base64 - Encode and decode base64 data; imported for convenience but not required by the cells below

In [None]:
import vertexai
from vertexai.preview.generative_models import GenerativeModel, Part

from google.cloud import aiplatform
from google.cloud import aiplatform_v1beta1 as vertex_ai

from google.cloud import storage

from typing import List
from google.api_core.client_options import ClientOptions

from google.cloud import discoveryengine_v1beta as discoveryengine

import base64
import json
import re
import mimetypes

## Instantiate Vertex AI object

Define the functions that will be used in this notebook.

Define a function to build a Python list of the files located within a GCS bucket and subfolder. The listing is recursive: files in nested subfolders are included.

In [None]:
def get_files(bucket_name, prefix):
    """Lists all files in a Google Cloud Storage bucket recursively.

    Args:
        bucket_name: The name of the GCS bucket.
        prefix: (Optional) Filter results to files prefixed by this value.

    Returns:
        A list of file names within the bucket and all subfolders.
    """

    storage_client = storage.Client()
    all_files = []
    
    for blob in storage_client.list_blobs(bucket_name, prefix=prefix):
        if blob.name.endswith("/"):  # It's a folder
            print('Starting recursive import for folder: ' + blob.name)
        else:
            all_files.append(blob.name)

    return all_files
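In a standard bucket, Cloud Storage object names form a flat namespace, so listing with a prefix already returns objects at every depth. "Folders" show up only as placeholder objects whose names end in `/`. The filtering step can be sketched without touching GCS; the helper `keep_files` and the sample object names below are hypothetical.

```python
from typing import Iterable, List


def keep_files(object_names: Iterable[str]) -> List[str]:
    """Drop folder placeholders (names ending in '/') and keep real objects."""
    return [name for name in object_names if not name.endswith("/")]


# Made-up object names that mimic a prefix listing
sample_names = [
    "testing/",
    "testing/CUST01/",
    "testing/CUST01/EX08/extension-packs/readme.md",
    "testing/CUST01/EX08/manifest.docx",
]
print(keep_files(sample_names))
```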

Define a function to create a new Vertex AI Search (VAIS) data store.

In [None]:
def create_vais_data_store(project_id, location, display_name):
    """Creates an unstructured data store; display_name is also used as its ID."""
    # Create a client
    client = discoveryengine.DataStoreServiceClient()

    # Describe the data store: generic vertical, documents must carry content
    data_store = discoveryengine.DataStore(
        display_name=display_name,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
        content_config=discoveryengine.DataStore.ContentConfig.CONTENT_REQUIRED,
    )

    request = discoveryengine.CreateDataStoreRequest(
        parent=f"projects/{project_id}/locations/{location}/collections/default_collection",
        data_store=data_store,
        data_store_id=display_name,
    )

    # Make the request; this returns a long-running operation
    operation = client.create_data_store(request=request)

    print("Waiting for operation to complete...")

    # Block until the data store has been created
    response = operation.result()

    # Handle the response
    print(response)
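`create_vais_data_store` uses the default endpoint, which matches `LOCATION = 'global'`. For other locations, the search function later in this notebook switches to a location-specific endpoint, and the same selection could be factored out as sketched below. `endpoint_for` is a hypothetical helper; it returns a plain string rather than a `ClientOptions` object so that it runs without any Google Cloud libraries installed.

```python
from typing import Optional


def endpoint_for(location: str) -> Optional[str]:
    """Return the location-specific API endpoint, or None for the default one."""
    if location == "global":
        return None
    return f"{location}-discoveryengine.googleapis.com"


print(endpoint_for("global"))
print(endpoint_for("us"))
```

When the result is not `None`, it could be passed as `ClientOptions(api_endpoint=...)` to the client constructor, mirroring what `search_sample` does.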


Define a function to add a file to an unstructured data store

In [None]:
def text_create_document(project_id, location, datastore_name, document_id, file_uri, mime_type, customer_id, extension_id, category):
    # Create a client
    client = discoveryengine.DocumentServiceClient()
    
    # Create the Document object
    document = discoveryengine.Document(
        content=discoveryengine.Document.Content(
            mime_type=mime_type,
            #raw_bytes=text_content,
            uri=file_uri,
        ),
        # Serialize the metadata with json.dumps so values are quoted and escaped correctly
        json_data=json.dumps({"customer_id": customer_id, "extension_id": extension_id, "category": category}),
    )

    # Initialize request argument(s)
    request = discoveryengine.CreateDocumentRequest(
        parent=f"projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{datastore_name}/branches/0",
        document=document,
        document_id=document_id,
    )

    # Make the request
    response = client.create_document(request=request)

    # Handle the response
    print(response)
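The `json_data` field carries the metadata that search filters can later match against. One way to build that string is with `json.dumps`, which takes care of quoting and escaping. The helper name `build_metadata` below is hypothetical and the sample values are made up.

```python
import json


def build_metadata(customer_id, extension_id, category):
    """Serialize document metadata to a JSON string."""
    return json.dumps(
        {
            "customer_id": customer_id,
            "extension_id": extension_id,
            "category": category,
        }
    )


metadata = build_metadata('ACME "East"', "EX08", "template")
print(metadata)
# The string parses back to the same values even though the ID contains quotes
print(json.loads(metadata)["customer_id"])
```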

Define a function to search the data store based on the information from the user prompt

In [None]:
def search_sample(project_id, location, engine_id, search_query, search_filter=''):
    """Searches the engine's default serving config and returns the response pager."""

    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    
    # Create a client
    client = discoveryengine.SearchServiceClient(client_options=client_options)
    
    # The full resource name of the search app serving config
    serving_config = f"projects/{project_id}/locations/{location}/collections/default_collection/engines/{engine_id}/servingConfigs/default_config"
    
    
    content_search_spec = discoveryengine.SearchRequest.ContentSearchSpec(
        # For information about snippets, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/snippets
        snippet_spec=discoveryengine.SearchRequest.ContentSearchSpec.SnippetSpec(
            return_snippet=True
            ),
        # For information about search summaries, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/get-search-summaries
        summary_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec(
            summary_result_count=5,
            include_citations=True,
            ignore_adversarial_query=True,
            ignore_non_summary_seeking_query=True,
            model_prompt_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec.ModelPromptSpec(
                preamble="Do not include any information related to licensing or usage restrictions"
            ),
            model_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec.ModelSpec(
                version="gemini-1.5-flash-001/answer_gen/v1",
            ),
        ),
    )
    
    request = discoveryengine.SearchRequest(
        serving_config=serving_config,
        query=search_query,
        filter=search_filter,
        #page_size=1,
        content_search_spec=content_search_spec,
        query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
            condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO,
        ),
        spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
            mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
        ),
    )

    response = client.search(request)
    #print(response)

    return response

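Define a function to extract the customer ID, extension ID, and category from the folder structure. The pattern assumes paths of the form testing/<customer_id>/<extension_id>/..., which matches the GCS_FOLDER value set earlier.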

In [None]:
def extract_customer_and_extension(file_path):
    """Extracts the customer ID, extension ID, and file category from a GCS file path.

    Args:
        file_path: The GCS file path.

    Returns:
        A tuple containing (customer_id, ext_id, file_category).
    """

    # Use a regular expression to extract the customer code and extension
    match = re.search(r"testing/(.*?)/(\w+)/", file_path)
    #match = re.search(r"testing/(.*?)/(\w+)(/.*)?/", file_path)

    if match:
        customer_id = match.group(1)
        ext_id = match.group(2)

        # Categorize the file type
        if re.search(r".*/extension-packs-templates/.*", file_path):
            file_category = "template"
        elif re.search(r".*/extension-packs/.*", file_path):
            file_category = "extension"
        elif re.search(r".*/extension-packs-extended/.*", file_path):
            file_category = "extended"
        elif file_path.endswith(".docx"):
            file_category = "manifest"
        else:
            file_category = "other"

        return customer_id, ext_id, file_category

    return None, None, None
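The pattern above hard-codes the `testing` folder. A variant that takes the root folder as a parameter might look like the sketch below. `split_path` and the sample paths are hypothetical, and the sketch covers only the ID extraction, not the categorization.

```python
import re
from typing import Optional, Tuple


def split_path(file_path: str, root: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (customer_id, ext_id) from '<root>/<customer_id>/<ext_id>/...'."""
    # re.escape keeps special characters in the root folder name literal
    pattern = rf"{re.escape(root)}/(.*?)/(\w+)/"
    match = re.search(pattern, file_path)
    if match:
        return match.group(1), match.group(2)
    return None, None


print(split_path("testing/CUST01/EX08/extension-packs/readme.md", "testing"))
print(split_path("other/readme.md", "testing"))
```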

Create a VAIS Data Store

In [None]:
#create_and_upload_datastore(PROJECT_ID, LOCATION, DSNAME, qa_response, qa_schema)
create_vais_data_store(PROJECT_ID, LOCATION, DSNAME)

Build the python list of all the files in the bucket / parent subfolder

In [None]:
all_files = get_files(GCS_BUCKET, GCS_FOLDER)
#print(all_files)

In [None]:
for file in all_files:
    print(file)

In [None]:
doc_id = 0

for file in all_files:
    # Skip macOS metadata files
    if ".DS_Store" in file:
        continue

    # Identify the MIME type of the file; treat .vm and .md files as plain text
    mime_type = mimetypes.guess_type(file)[0]
    if file.endswith((".vm", ".md")):
        mime_type = "text/plain"
    if mime_type is None:
        print(f"Error: Unknown MIME type for file {file} - Skipping...")
        continue

    # Extract the customer ID, extension ID, and category from the path
    customer_id, ext_id, category = extract_customer_and_extension(file)
    if customer_id is None:
        print(f"Error: Could not parse customer and extension IDs from {file} - Skipping...")
        continue

    # Set the document ID and the file URI
    doc_id = doc_id + 1
    file_uri = f"gs://{GCS_BUCKET}/{file}"

    # Create the document in the VAIS data store
    text_create_document(PROJECT_ID, LOCATION, DSNAME, str(doc_id), file_uri, mime_type, customer_id, ext_id, category)
    print(f"Created {file_uri} with mime type {mime_type} and doc ID {doc_id}")
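`mimetypes.guess_type` returns `None` as the type when it does not recognize a file's extension, which is why some files are skipped during ingestion. The lookup with overrides can be sketched as a small function. `resolve_mime_type` and the `OVERRIDES` table are hypothetical names; the two overrides are taken from the ingestion loop, and the file names are made up.

```python
import mimetypes
import os
from typing import Optional

# Extensions forced to plain text, as in the ingestion loop
OVERRIDES = {".vm": "text/plain", ".md": "text/plain"}


def resolve_mime_type(file_name: str) -> Optional[str]:
    """Return an override if one applies, else the guessed type (may be None)."""
    extension = os.path.splitext(file_name)[1].lower()
    if extension in OVERRIDES:
        return OVERRIDES[extension]
    return mimetypes.guess_type(file_name)[0]


for name in ["guide.md", "template.vm", "manual.pdf", "Makefile"]:
    print(name, "->", resolve_mime_type(name))
```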

In [None]:
search_prompt = 'Tell me more about this extension'
search_filter = 'extension_id: ANY("EX08")'
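The filter restricts results to documents whose `extension_id` metadata matches one of the listed values. To build such an expression from a list of IDs, a small helper like the following could be used. `any_filter` is a hypothetical name, and the sketch assumes the values contain no double quotes.

```python
from typing import Sequence


def any_filter(field: str, values: Sequence[str]) -> str:
    """Build a filter expression of the form: field: ANY("a", "b")."""
    quoted = ", ".join(f'"{value}"' for value in values)
    return f"{field}: ANY({quoted})"


print(any_filter("extension_id", ["EX08"]))
print(any_filter("extension_id", ["EX08", "EX09"]))
```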

In [None]:
#search_result = search_sample(PROJECT_ID, LOCATION, DSENGINE, search_prompt)
search_result = search_sample(PROJECT_ID, LOCATION, DSENGINE, search_prompt, search_filter)

#print(search_result)

In [None]:
print(search_result.summary.summary_text)

In [None]:
#-- Print just the first returned document --#
#print(search_result.results[0].document.derived_struct_data['link'])

#-- Print the full list of returned documents --#
for i in search_result.results:
    print(i.document.derived_struct_data['link'])
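The introduction mentions optionally passing the query and the search results to Gemini to produce a final answer. A minimal sketch of that step follows. It assumes the `gemini-pro` model name is available in your project and region; `build_prompt` and `ask_gemini` are hypothetical helpers, and the sample summary and link are placeholders. Only `build_prompt` runs without Vertex AI access.

```python
from typing import Sequence


def build_prompt(question: str, summary: str, links: Sequence[str]) -> str:
    """Combine the user question with the search summary and source links."""
    sources = "\n".join(f"- {link}" for link in links)
    return (
        "Answer the question using only the context below.\n\n"
        f"Question: {question}\n\n"
        f"Context:\n{summary}\n\n"
        f"Sources:\n{sources}\n"
    )


def ask_gemini(project_id: str, region: str, prompt: str) -> str:
    """Send the prompt to Gemini; requires Vertex AI credentials."""
    # Imported here so build_prompt can be used without the SDK installed
    import vertexai
    from vertexai.preview.generative_models import GenerativeModel

    vertexai.init(project=project_id, location=region)
    model = GenerativeModel("gemini-pro")  # assumed model name
    return model.generate_content(prompt).text


prompt = build_prompt(
    "Tell me more about this extension",
    "Example summary text.",
    ["gs://example-bucket/example.md"],
)
print(prompt)
```

In this notebook the call might look like `ask_gemini(PROJECT_ID, REGION, build_prompt(search_prompt, search_result.summary.summary_text, links))`, where `links` is a list collected from the document links printed above.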