# Vertex AI Search Agent Builder

## Pre-requisite Setup

### Enable APIs and service account permissions

Enable the Vertex AI Search API:
```
gcloud services enable discoveryengine.googleapis.com
```
Enable the Enterprise Knowledge Graph API:
```
gcloud services enable enterpriseknowledgegraph.googleapis.com
```
Enable Cloud Run:
```
gcloud services enable run.googleapis.com
```
Give the Cloud Run service account required permissions:
```
gcloud projects add-iam-policy-binding [PROJECT_ID or PROJECT_NUMBER] --member='serviceAccount:[PROJECT_NUMBER]-compute@developer.gserviceaccount.com' --role='roles/discoveryengine.viewer'
```

### Install Dependencies and set variables

In [None]:
! pip3 install -q google-cloud-discoveryengine

In [None]:
import os
import socket
import re
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

# Cloud project id.
PROJECT_IDS = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_IDS[0]
LOCATION = "global"

UNIQUE_PREFIX = socket.gethostname()
DATASTORE_NAME = re.sub('-', '_', UNIQUE_PREFIX)
APP_NAME = re.sub('_', '-', UNIQUE_PREFIX)
DATASTORE_ID = f"{DATASTORE_NAME}_datastore"

## Setting up Agent Builder

### Creating a Datastore

Data stores ingest the data for your search app from sources such as websites, BigQuery, Google Drive, etc. ([reference](https://cloud.google.com/generative-ai-app-builder/docs/create-data-store-es))

In [None]:
def create_data_store(
    project_id: str, location: str, data_store_name: str, data_store_id: str
):
    """Create a generic, content-required data store in the default collection.

    Args:
        project_id: Project in which the data store is created.
        location: Data store location. Any value other than "global" selects
            the matching regional API endpoint.
        data_store_name: Display name of the new data store.
        data_store_id: ID of the new data store.

    Raises:
        Exception: Any failure other than the 90-second wait timing out is
            propagated to the caller (for example when the create request
            itself is rejected).
    """
    # Local import so the cell does not depend on a file-level import.
    import concurrent.futures

    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DataStoreServiceClient(client_options=client_options)

    # Initialize request argument(s)
    data_store = discoveryengine.DataStore(
        display_name=data_store_name,
        industry_vertical="GENERIC",
        content_config="CONTENT_REQUIRED",
    )

    request = discoveryengine.CreateDataStoreRequest(
        parent=discoveryengine.DataStoreServiceClient.collection_path(
            project_id, location, "default_collection"
        ),
        data_store=data_store,
        data_store_id=data_store_id,
    )

    # Make the request
    operation = client.create_data_store(request=request)

    # The data store can take longer than the wait below to instantiate; that
    # timeout is the only condition tolerated here. A bare `except:` would
    # also hide real failures and keyboard interrupts.
    try:
        operation.result(timeout=90)
    except concurrent.futures.TimeoutError:
        print("long-running operation")

In [None]:
# Create the data store; a failure is reported instead of stopping the notebook.
try:
    create_data_store(
        project_id=PROJECT_ID,
        location=LOCATION,
        data_store_name=DATASTORE_NAME,
        data_store_id=DATASTORE_ID,
    )
except Exception as err:
    print(
        "Ignore if datastore already exists, if first time, delete existing engine or change name",
        f"Error: {err}",
        sep="\n",
    )

### Import Documents (Cloud Storage)

In [None]:
def import_documents(
    project_id: str,
    location: str,
    data_store_id: str,
    gcs_uri: str,
):
    """Import every object under a Cloud Storage prefix into a data store.

    The import is incremental and treats the objects as unstructured content
    (``data_schema="content"``). The call blocks, without a timeout, until the
    import operation finishes.

    Args:
        project_id: Project that owns the data store.
        location: Data store location. Any value other than "global" selects
            the matching regional API endpoint.
        data_store_id: ID of the data store to import into.
        gcs_uri: Cloud Storage prefix (``gs://bucket/path``) whose objects are
            imported; a trailing slash is tolerated.

    Returns:
        The name of the long-running import operation.
    """
    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # The full resource name of the search engine branch.
    # e.g. projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="default_branch",
    )

    # Strip a trailing slash so the wildcard pattern never becomes "...//*".
    source_documents = [f"{gcs_uri.rstrip('/')}/*"]

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        gcs_source=discoveryengine.GcsSource(
            input_uris=source_documents, data_schema="content"
        ),
        # Options: `FULL`, `INCREMENTAL`
        reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
    )

    # Make the request and wait for the import to complete
    operation = client.import_documents(request=request)
    operation.result()

    return operation.operation.name

In [None]:
# Cloud Storage prefix holding the sample PDFs that are imported below.
source_documents_gs_uri = "gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs"

# Report an import failure instead of stopping the notebook.
try:
    import_documents(
        project_id=PROJECT_ID,
        location=LOCATION,
        data_store_id=DATASTORE_ID,
        gcs_uri=source_documents_gs_uri,
    )
except Exception as err:
    print(f"Error: {err}")

### Creating a Search Engine

In [None]:
def create_engine(
    project_id: str, location: str, data_store_name: str, data_store_id: str
):
    """Create an enterprise-tier search engine with the LLM add-on.

    Args:
        project_id: Project in which the engine is created.
        location: Engine location. Any value other than "global" selects the
            matching regional API endpoint.
        data_store_name: Used as the engine's display name, which is also
            passed as the engine ID.
        data_store_id: ID of the single data store attached to the engine.

    The call waits up to 90 seconds for the create operation; any error,
    including that wait timing out, propagates to the caller.
    """
    # Regional locations need their own endpoint; "global" uses the default.
    if location == "global":
        endpoint_options = None
    else:
        endpoint_options = ClientOptions(
            api_endpoint=f"{location}-discoveryengine.googleapis.com"
        )
    engine_client = discoveryengine.EngineServiceClient(
        client_options=endpoint_options
    )

    # Describe the engine: generic search over the given data store.
    engine = discoveryengine.Engine(
        display_name=data_store_name,
        solution_type="SOLUTION_TYPE_SEARCH",
        industry_vertical="GENERIC",
        data_store_ids=[data_store_id],
        search_engine_config=discoveryengine.Engine.SearchEngineConfig(
            search_tier="SEARCH_TIER_ENTERPRISE",
            search_add_ons=["SEARCH_ADD_ON_LLM"],
        ),
    )

    collection = discoveryengine.DataStoreServiceClient.collection_path(
        project_id, location, "default_collection"
    )
    create_request = discoveryengine.CreateEngineRequest(
        parent=collection,
        engine=engine,
        engine_id=engine.display_name,
    )

    # Send the request and wait for the operation to finish.
    engine_client.create_engine(request=create_request).result(timeout=90)

In [None]:
# Create the search app; a failure is reported instead of stopping the notebook.
try:
    create_engine(
        project_id=PROJECT_ID,
        location=LOCATION,
        data_store_name=APP_NAME,
        data_store_id=DATASTORE_ID,
    )
except Exception as err:
    print(
        "Ignore if engine already exists, if first time, delete existing engine or change name",
        f"Error: {err}",
        sep="\n",
    )

### Query Datastore (Search)

In [None]:
def search_sample(
    project_id: str,
    location: str,
    data_store_id: str,
    search_query: str,
):
    """Run one search with snippets and a cited summary against a data store.

    Args:
        project_id: Project that owns the data store.
        location: Data store location. Any value other than "global" selects
            the matching regional API endpoint.
        data_store_id: ID of the data store to query.
        search_query: The user's query text.

    Returns:
        The object returned by ``SearchServiceClient.search`` for the request
        (not a list); the notebook reads ``.summary.summary_text`` from it.
    """
    #  For more information, refer to:
    # https://cloud.google.com/generative-ai-app-builder/docs/locations#specify_a_multi-region_for_your_data_store
    # The endpoint is chosen from the `location` argument, not the notebook's
    # global LOCATION, so the two can never disagree.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )

    # Create a client
    client = discoveryengine.SearchServiceClient(client_options=client_options)

    # The full resource name of the search engine serving config
    # e.g. projects/{project_id}/locations/{location}/dataStores/{data_store_id}/servingConfigs/{serving_config_id}
    serving_config = client.serving_config_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        serving_config="default_config",
    )

    # Optional: Configuration options for search
    # Refer to the `ContentSearchSpec` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest.ContentSearchSpec
    content_search_spec = discoveryengine.SearchRequest.ContentSearchSpec(
        # For information about snippets, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/snippets
        snippet_spec=discoveryengine.SearchRequest.ContentSearchSpec.SnippetSpec(
            return_snippet=True
        ),
        # For information about search summaries, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/get-search-summaries
        summary_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec(
            summary_result_count=5,
            include_citations=True,
            ignore_adversarial_query=True,
            ignore_non_summary_seeking_query=True,
        ),
    )

    # Refer to the `SearchRequest` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest
    request = discoveryengine.SearchRequest(
        serving_config=serving_config,
        query=search_query,
        page_size=10,
        content_search_spec=content_search_spec,
        query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
            condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO,
        ),
        spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
            mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
        ),
    )

    return client.search(request)

In [None]:
# Ask a single question and show only the generated summary text.
query = "Who is the CEO of Google?"

print(
    search_sample(
        project_id=PROJECT_ID,
        location=LOCATION,
        data_store_id=DATASTORE_ID,
        search_query=query,
    ).summary.summary_text
)

### Query Datastore (Multi-turn) 

In [None]:
from typing import List

from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine_v1 as discoveryengine

def multi_turn_search_sample(
    project_id: str,
    location: str,
    data_store_id: str,
    search_queries: List[str],
) -> List[discoveryengine.ConverseConversationResponse]:
    """Send several queries through one conversation and print each answer.

    A single conversation is created on the data store and every query is
    sent to it in order. For each query the reply summary and the link, first
    snippet and first extractive answer of every search result are printed.

    Args:
        project_id: Project that owns the data store.
        location: Data store location. Any value other than "global" selects
            the matching regional API endpoint.
        data_store_id: ID of the data store to query.
        search_queries: Queries to send, in conversation order.

    Returns:
        One response per query, in the order the queries were sent.
    """
    #  For more information, refer to:
    # https://cloud.google.com/generative-ai-app-builder/docs/locations#specify_a_multi-region_for_your_data_store
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )

    # Create a client
    client = discoveryengine.ConversationalSearchServiceClient(
        client_options=client_options
    )

    # Initialize Multi-Turn Session
    conversation = client.create_conversation(
        # The full resource name of the data store
        # e.g. projects/{project_id}/locations/{location}/dataStores/{data_store_id}
        parent=client.data_store_path(
            project=project_id, location=location, data_store=data_store_id
        ),
        conversation=discoveryengine.Conversation(),
    )

    # Collected so the function returns what its signature declares.
    responses = []

    for search_query in search_queries:
        # Add new message to session
        request = discoveryengine.ConverseConversationRequest(
            name=conversation.name,
            query=discoveryengine.TextInput(input=search_query),
            serving_config=client.serving_config_path(
                project=project_id,
                location=location,
                data_store=data_store_id,
                serving_config="default_config",
            ),
            # Options for the returned summary
            summary_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec(
                # Number of results to include in summary
                summary_result_count=3,
                include_citations=True,
            ),
        )
        response = client.converse_conversation(request)
        responses.append(response)

        print(f"Reply: {response.reply.summary.summary_text}\n")

        for i, result in enumerate(response.search_results, 1):
            result_data = result.document.derived_struct_data
            print(f"[{i}]")
            print(f"Link: {result_data['link']}")
            print(f"First Snippet: {result_data['snippets'][0]['snippet']}")
            print(
                "First Extractive Answer: \n"
                f"\tPage: {result_data['extractive_answers'][0]['pageNumber']}\n"
                f"\tContent: {result_data['extractive_answers'][0]['content']}\n\n"
            )
        print("\n\n")

    return responses

In [None]:
# Two turns of one conversation; the second question relies on the first for context.
query = [
    "Who is the CEO of Google?",
    "Where is their office located",
]

# The function prints every reply itself, so its return value is kept in a
# variable rather than printed a second time.
multi_turn_responses = multi_turn_search_sample(
    PROJECT_ID, LOCATION, DATASTORE_ID, query
)

## Streamlit

### Install dependencies for Streamlit app

In [None]:
!pip3 install -q streamlit
!pip3 install -q python-dotenv
!pip3 install -q google-cloud-aiplatform
!pip3 install -q google-generativeai

! mkdir pages

Generate `requirements.txt` for later deployment.

In [None]:
%%writefile requirements.txt
streamlit
python-dotenv
google-generativeai
google-cloud-aiplatform
google-cloud-discoveryengine

### Replace the placeholders in the Dockerfile below
In the `ENTRYPOINT` line, replace `ai-sb-test` with your project ID and `renzo_test_1_datastore` with your data store ID (both are printed by the next cell).

In [None]:
# Show the project id and data store id, one per line.
print(PROJECT_ID, DATASTORE_ID, sep="\n")

In [None]:
%%writefile Dockerfile

# Container image for the Streamlit search app, listening on port 8080.
FROM python:3.10

EXPOSE 8080
WORKDIR /app

# Install dependencies before copying the sources so this layer is rebuilt
# only when requirements.txt changes; --no-cache-dir keeps pip's download
# cache out of the image.
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . ./

# The two arguments after Search.py are the project id and the data store id
# passed to the app on its command line. They are sample values: replace them
# with your own before building.
ENTRYPOINT ["streamlit", "run", "Search.py", "ai-sb-test", "renzo_test_1_datastore", "--server.port=8080", "--server.address=0.0.0.0"]

### Create main Search page for app

In [None]:
%%writefile Search.py

import os
import sys
from dotenv import load_dotenv
import streamlit as st
import vertexai

from typing import List

from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine_v1 as discoveryengine
from vertexai.preview.generative_models import (
    GenerationConfig,
    GenerativeModel,
    Tool,
    grounding,
)

# Load a local .env file (if any) before the environment is consulted below.
load_dotenv()

# The project id and data store id are taken from the command line
# (`streamlit run Search.py <project> <datastore>`). When an argument is
# missing, the PROJECT_ID / DATASTORE_ID environment variables are used
# instead, so the app can also be configured through the environment.
PROJECT_ID = sys.argv[1] if len(sys.argv) > 1 else os.environ["PROJECT_ID"]
LOCATION = "global"
DATASTORE_ID = sys.argv[2] if len(sys.argv) > 2 else os.environ["DATASTORE_ID"]

def search_sample(
    project_id: str,
    location: str,
    data_store_id: str,
    search_query: str,
    no_results: int,
    no_snippets: int,
    no_extract: int,
    no_extract_seg: int,
    no_top_results: int,
):
    """Run one configurable search against a data store.

    Args:
        project_id: Project that owns the data store.
        location: Data store location. Any value other than "global" selects
            the matching regional API endpoint.
        data_store_id: ID of the data store to query.
        search_query: The user's query text.
        no_results: Page size of the search request.
        no_snippets: Snippets are requested only when this equals 1.
        no_extract: Maximum number of extractive answers requested.
        no_extract_seg: Maximum number of extractive segments requested.
        no_top_results: Number of results the summary is built from.

    Returns:
        The object returned by ``SearchServiceClient.search`` for the request
        (not a list); the page reads ``.summary`` and ``.results`` from it.
    """
    #  For more information, refer to:
    # https://cloud.google.com/generative-ai-app-builder/docs/locations#specify_a_multi-region_for_your_data_store
    # The endpoint is chosen from the `location` argument, not the module's
    # global LOCATION, so the two can never disagree.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )

    # Create a client
    client = discoveryengine.SearchServiceClient(client_options=client_options)

    # The full resource name of the search engine serving config
    # e.g. projects/{project_id}/locations/{location}/dataStores/{data_store_id}/servingConfigs/{serving_config_id}
    serving_config = client.serving_config_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        serving_config="default_config",
    )

    # Optional: Configuration options for search
    # Refer to the `ContentSearchSpec` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest.ContentSearchSpec
    content_search_spec = {
        'snippet_spec': {'return_snippet': no_snippets == 1},
        'extractive_content_spec': {
            'max_extractive_answer_count': no_extract,
            'max_extractive_segment_count': no_extract_seg,
            'return_extractive_segment_score': True,
        },
        'summary_spec': {
            'summary_result_count': no_top_results,
            'include_citations': True,
        },
    }

    # Refer to the `SearchRequest` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest
    request = discoveryengine.SearchRequest(
        serving_config=serving_config,
        query=search_query,
        page_size=no_results,
        content_search_spec=content_search_spec,
        query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
            condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO,
        ),
        spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
            mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
        ),
    )

    return client.search(request)

def llm_prompt(
    project_id: str,
    location: str,
    data_store_id: str,
    llm_model: str,
    prompt: str,
    temp,
    top_k,
    top_p
):
    """Answer a prompt with a generative model grounded on a Vertex AI Search data store.

    Args:
        project_id: Project used for Vertex AI and for the grounding data store.
        location: Not used by this function: Vertex AI is always initialised
            in "us-central1" and the data store is always addressed as "global".
        data_store_id: Data store passed to the grounding tool.
        llm_model: Name of the generative model to load.
        prompt: Text sent to the model.
        temp: Sampling temperature.
        top_k: Top-k sampling value.
        top_p: Top-p sampling value.

    Returns:
        The text of the model's response.
    """
    vertexai.init(project=project_id, location="us-central1")
    model = GenerativeModel(llm_model)

    # Retrieval tool that grounds the answer on the data store's documents.
    search_source = grounding.VertexAISearch(
        datastore=data_store_id,
        project=project_id,
        location="global",
    )
    retrieval_tool = Tool.from_retrieval(grounding.Retrieval(search_source))

    sampling = GenerationConfig(temperature=temp, top_p=top_p, top_k=top_k)
    answer = model.generate_content(
        prompt,
        tools=[retrieval_tool],
        generation_config=sampling,
    )
    return answer.text

load_dotenv()

def main():
    """Render the search page: sidebar settings, a query box, the answer and the results.

    The sidebar selects either the Vertex AI Search summary or a Gemini model
    (with its sampling settings) as the source of the answer, and sets the
    search request limits. Pressing "Generate" runs the search, shows the
    answer, and lists each result's title with its extractive answers.
    """
    # --- App layout ---
    st.set_page_config(page_title="Search")
    st.title('Search')

    default_model = "Vertex AI Search(default)"

    with st.sidebar:
        st.title('Configurations')
        llm_model = st.selectbox(
            "LLM Model",
            (default_model, "gemini-1.5-flash-001", "gemini-1.5-pro-001"),
        )
        # Compare by value: `is` on strings tests object identity and only
        # works by accident of how the interpreter shares literals.
        use_gemini = llm_model != default_model
        if use_gemini:
            st.divider()
            temp = st.number_input("Temp", 0.0, 1.0, 0.0)
            top_k = st.number_input("Top K", 0, 100, 40)
            top_p = st.number_input("Top P", 0.0, 1.0, 0.95)
        st.divider()
        # Each slider is labelled with the request field it actually controls.
        no_results = st.slider("Results per Page", 1, 10, 10)
        no_snippets = st.slider("Snippets per Result", 0, 1, 1)
        no_extract = st.slider("Extractive Answers per Result", 0, 5, 3)
        no_extract_seg = st.slider("Extractive Segments per Result", 1, 10, 3)
        no_top_results = st.slider("Results Used for Summary", 1, 5, 5)

    search_prompt = st.text_input("Search Internal Documents", value="what is the revenue for 2024?")
    if st.button("Generate"):
        search_response = search_sample(PROJECT_ID, LOCATION, DATASTORE_ID, search_prompt, no_results, no_snippets, no_extract, no_extract_seg, no_top_results)
        st.subheader("AI Answer")

        answer_box = st.container(border=True)
        if use_gemini:
            # temp/top_k/top_p exist here because they are created under the
            # same condition in the sidebar.
            answer_box.write(llm_prompt(PROJECT_ID, LOCATION, DATASTORE_ID, llm_model, search_prompt, temp, top_k, top_p))
        else:
            answer_box.write(search_response.summary.summary_text)

        st.subheader("Results")
        for result in search_response.results:
            doc_data = result.document.derived_struct_data
            result_box = st.container(border=True)
            # st.link_button(result.document.derived_struct_data['title'], result.document.derived_struct_data['link'])
            result_box.subheader(doc_data['title'])
            # A result may carry no extractive answers (the slider allows 0),
            # so a missing key is treated as an empty list.
            for extract in doc_data.get('extractive_answers', []):
                result_box.write(f"Page {extract['pageNumber']}")
                result_box.write(extract['content'])

# --- End of App layout ---

if __name__ == "__main__":
    main()

### Create Multi-turn page for app

In [None]:
%%writefile pages/Multi-Turn.py

import streamlit as st
from typing import List
from dotenv import load_dotenv
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine_v1 as discoveryengine
from google.cloud import discoveryengine_v1alpha

import os
import sys

load_dotenv()

# The project id and data store id are taken from the command line; when an
# argument is missing, the PROJECT_ID / DATASTORE_ID environment variables
# (including values loaded from .env above) are used instead.
PROJECT_ID = sys.argv[1] if len(sys.argv) > 1 else os.environ["PROJECT_ID"]
LOCATION = "global"
DATASTORE_ID = sys.argv[2] if len(sys.argv) > 2 else os.environ["DATASTORE_ID"]

st.set_page_config(page_title="Multi-Turn")
st.title('Multi-Turn')

col1, col2 = st.columns(2, gap="medium")

# Create a client
client_options = (
    ClientOptions(api_endpoint=f"{LOCATION}-discoveryengine.googleapis.com")
    if LOCATION != "global"
    else None
)
client = discoveryengine.ConversationalSearchServiceClient(
    client_options=client_options
)

# Initialize Multi-Turn Session
# Streamlit re-executes this script on every interaction. The conversation is
# therefore created only once per browser session and its resource name is
# kept in session state; creating it on every run would start each question
# in a fresh conversation and lose the context of earlier turns.
if "conversation_name" not in st.session_state:
    conversation = client.create_conversation(
        # The full resource name of the data store
        # e.g. projects/{project_id}/locations/{location}/dataStores/{data_store_id}
        parent=client.data_store_path(
            project=PROJECT_ID, location=LOCATION, data_store=DATASTORE_ID
        ),
        conversation=discoveryengine.Conversation(),
    )
    st.session_state["conversation_name"] = conversation.name

with col1:
    st.subheader("Multi-turn Search")
    prompt = st.chat_input("Ask Follow-up Questions")

with col2:
    st.subheader("Reference Results")

if prompt:
    request = discoveryengine.ConverseConversationRequest(
        name=st.session_state["conversation_name"],
        query=discoveryengine.TextInput(input=prompt),
        serving_config=client.serving_config_path(
            project=PROJECT_ID,
            location=LOCATION,
            data_store=DATASTORE_ID,
            serving_config="default_config",
        ),
        # Options for the returned summary
        summary_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec(
            # Number of results to include in summary
            summary_result_count=3,
            include_citations=True,
        ),
    )
    response = client.converse_conversation(request)

    with col1:
        # Render every message of the conversation, not just the first
        # exchange. A message with user input text is shown as the user's
        # turn; any other message is shown as the assistant's reply.
        for message in response.conversation.messages:
            if message.user_input.input:
                with st.chat_message("user"):
                    st.write(message.user_input.input)
            else:
                with st.chat_message("assistant"):
                    st.write(message.reply.summary.summary_text)

    with col2:
        # Search results backing the latest reply.
        for i, result in enumerate(response.search_results, 1):
            container = st.container(border=True)
            result_data = result.document.derived_struct_data
            container.write(f"[{i}]")
            container.markdown(f"[{result_data['link'].split('/')[-1]}]({result_data['link']})")
            container.markdown("Snippet: ")
            container.markdown(result_data['snippets'][0]['snippet'])
            container.markdown("Extractive Answers:")
            container.markdown(f"    Page: {result_data['extractive_answers'][0]['pageNumber']}")
            container.markdown(f"    {result_data['extractive_answers'][0]['content']}")

### Run Streamlit app locally

To access your app, open the **External URL** link printed by the cell below.

In [None]:
! python3 -m streamlit run Search.py {PROJECT_ID} {DATASTORE_ID}

## Creating Search App

### Deploy to Cloud Run (uncomment to run)

In [None]:
# LOCATION = 'asia-southeast1'
# ! gcloud artifacts repositories create {APP_NAME}-repo --location={LOCATION} --repository-format=Docker

In [None]:
# ! gcloud auth configure-docker {LOCATION}-docker.pkg.dev -q

In [None]:
# ! gcloud builds submit --tag {LOCATION}-docker.pkg.dev/{PROJECT_ID}/{APP_NAME}-repo/{APP_NAME}

In [None]:
# ! gcloud run deploy {APP_NAME} --allow-unauthenticated --platform=managed --region=asia-southeast1 -q --set-env-vars=PROJECT_ID={PROJECT_ID},DATASTORE_ID={DATASTORE_ID} --image={LOCATION}-docker.pkg.dev/{PROJECT_ID}/{APP_NAME}-repo/{APP_NAME}