In [None]:
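# Step 0: Configure the project/region and initialize the Vertex AI SDK
# (no explicit login happens here; credentials are expected to come from the environment)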

from google.cloud import aiplatform
import os

# Set your project ID and location.
# Values are taken from the environment when present so the notebook can be
# shared and re-run without editing this cell; the literals are only fallbacks.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1") # Or your desired region

aiplatform.init(project=PROJECT_ID, location=LOCATION)

In [None]:
#Step 1: Create data store

from google.cloud import discoveryengine_v1beta as discoveryengine
from google.api_core.client_options import ClientOptions

def create_data_store(
    project_id: str,
    location: str,
    data_store_name: str,
    data_store_id: str, # A unique identifier for your data store
    industry_vertical: str = "GENERIC", # Name of an IndustryVertical member, e.g. "GENERIC" or "MEDIA"
    data_store_type: str = "WEB", # "WEB" or "BIGQUERY"; any other value leaves content_config unset
    solution_type: str = "SEARCH", # Or "RECOMMENDATION"; full "SOLUTION_TYPE_*" names are accepted too
):
    """Create a Discovery Engine data store and wait for the creation to finish.

    Args:
        project_id: Google Cloud project that owns the data store.
        location: Discovery Engine location. ``"global"`` uses the default API
            endpoint; any other value uses ``<location>-discoveryengine.googleapis.com``.
        data_store_name: Human-readable display name.
        data_store_id: ID of the new data store (passed as ``data_store_id`` on
            the create request, which is where the API expects it).
        industry_vertical: Enum member name forwarded to ``DataStore.industry_vertical``.
        data_store_type: ``"WEB"`` selects ``ContentConfig.PUBLIC_WEBSITE`` and
            ``"BIGQUERY"`` selects ``ContentConfig.NO_CONTENT``.
        solution_type: Short (``"SEARCH"``) or full (``"SOLUTION_TYPE_SEARCH"``)
            name of a ``SolutionType`` member.

    Returns:
        The created ``DataStore`` on success, or ``None`` when the API call or
        the long-running operation fails (the error is printed).

    Raises:
        ValueError: If ``solution_type`` does not name a ``SolutionType`` member.
    """
    # Only non-global locations are served from a "<location>-" prefixed host.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DataStoreServiceClient(client_options=client_options)

    # Data stores are children of a collection, not of the location itself.
    parent = f"projects/{project_id}/locations/{location}/collections/default_collection"

    # The enum members are named SOLUTION_TYPE_<X>; accept the short form the
    # signature has always advertised and expand it here.
    solution_name = solution_type.upper()
    if not solution_name.startswith("SOLUTION_TYPE_"):
        solution_name = f"SOLUTION_TYPE_{solution_name}"
    solution_enum = getattr(discoveryengine.SolutionType, solution_name, None)
    if solution_enum is None:
        raise ValueError(f"Unknown solution_type: {solution_type!r}")

    # NOTE: DataStore has no `id` field; the ID travels on the request below.
    data_store = discoveryengine.DataStore(
        display_name=data_store_name,
        industry_vertical=industry_vertical,
        solution_types=[solution_enum],
    )

    content_configs = discoveryengine.DataStore.ContentConfig
    if data_store_type == "WEB":
        data_store.content_config = content_configs.PUBLIC_WEBSITE
    elif data_store_type == "BIGQUERY":
        # ContentConfig has no BigQuery-specific member. NO_CONTENT assumes the
        # table is ingested as structured documents without attached content.
        data_store.content_config = content_configs.NO_CONTENT
    # Add other content configs as needed

    request = discoveryengine.CreateDataStoreRequest(
        parent=parent,
        data_store=data_store,
        data_store_id=data_store_id,
    )

    try:
        # create_data_store returns a long-running operation; the DataStore
        # itself is only available from operation.result().
        operation = client.create_data_store(request=request)
        print(f"Waiting for data store creation to complete: {operation.operation.name}")
        response = operation.result()
        print(f"Created data store: {response.name}")
        return response
    except Exception as e:
        print(f"Error creating data store: {e}")
        return None



In [None]:
#Step 2: Ingest data

from google.cloud import discoveryengine_v1beta as discoveryengine
from google.api_core.client_options import ClientOptions

def import_documents_from_gcs(
    project_id: str,
    location: str,
    data_store_id: str,
    gcs_uri: str, # e.g., "gs://your-bucket-name/your-folder/*.json"
):
    """Import documents from Cloud Storage into a data store and wait for completion.

    Args:
        project_id: Google Cloud project that owns the data store.
        location: Discovery Engine location. ``"global"`` uses the default API
            endpoint; any other value uses ``<location>-discoveryengine.googleapis.com``.
        data_store_id: ID of the target data store.
        gcs_uri: Cloud Storage URI (wildcards allowed) of the files to import.

    Returns:
        The response of the finished import operation (also printed).

    Raises:
        Whatever the client raises for a failed request or failed operation;
        nothing is caught here.
    """
    # Only non-global locations are served from a "<location>-" prefixed host.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # Imports target a branch of the data store, not the data store itself.
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="default_branch",
    )

    # GcsSource is a top-level message of the package (not nested in the request).
    input_config = discoveryengine.GcsSource(
        input_uris=[gcs_uri],
        data_schema="document", # Or "content" / "custom" / "csv", depending on your data
    )

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        gcs_source=input_config,
        # FULL also removes documents that are absent from the source; use
        # INCREMENTAL to only add/update.
        reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.FULL, # Or INCREMENTAL
        # auto_generate_document_id=True, # Use if your documents don't have IDs
    )

    operation = client.import_documents(request=request)

    print(f"Waiting for import operation to complete: {operation.operation.name}")
    response = operation.result()
    print("Import operation completed.")
    print(response)
    return response



In [None]:
#Step 3: Create search app to enable search
from google.cloud import discoveryengine_v1beta as discoveryengine
from google.api_core.client_options import ClientOptions

def create_search_app(
    project_id: str,
    location: str,
    search_app_name: str,
    data_store_ids: list[str],
):
    """Create a search app (an ``Engine``) over existing data stores and wait for it.

    Args:
        project_id: Google Cloud project that owns the app.
        location: Discovery Engine location. ``"global"`` uses the default API
            endpoint; any other value uses ``<location>-discoveryengine.googleapis.com``.
        search_app_name: Display name. The app ID is derived from it by
            lower-casing and replacing spaces with ``-``.
        data_store_ids: IDs (not full resource names) of the data stores to attach.

    Returns:
        The created ``Engine`` on success, or ``None`` when the API call or the
        long-running operation fails (the error is printed).
    """
    # Only non-global locations are served from a "<location>-" prefixed host.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    # Apps are managed through the engine service; the search service only
    # serves queries.
    client = discoveryengine.EngineServiceClient(client_options=client_options)

    parent = f"projects/{project_id}/locations/{location}/collections/default_collection"

    search_app = discoveryengine.Engine(
        display_name=search_app_name,
        # The engine references its data stores by ID.
        data_store_ids=list(data_store_ids),
        solution_type=discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH,
    )

    request = discoveryengine.CreateEngineRequest(
        parent=parent,
        engine=search_app,
        engine_id=search_app_name.lower().replace(" ", "-"), # A unique ID for the app
    )

    try:
        # create_engine returns a long-running operation; the Engine itself is
        # only available from operation.result().
        operation = client.create_engine(request=request)
        print(f"Waiting for search app creation to complete: {operation.operation.name}")
        response = operation.result()
        print(f"Created search app: {response.name}")
        return response
    except Exception as e:
        print(f"Error creating search app: {e}")
        return None
    



In [None]:
#Step 4: perform search queries
from google.cloud import discoveryengine_v1beta as discoveryengine
from google.api_core.client_options import ClientOptions

def search_documents(
    project_id: str,
    location: str,
    search_app_id: str,
    query: str,
    page_size: int = 10,
):
    """Run a query against a search app and print the first page of results.

    Args:
        project_id: Google Cloud project that owns the app.
        location: Discovery Engine location. ``"global"`` uses the default API
            endpoint; any other value uses ``<location>-discoveryengine.googleapis.com``.
        search_app_id: ID of the search app (engine) to query.
        query: Free-text search query.
        page_size: Maximum number of results requested per page.

    Returns:
        The object returned by ``client.search`` on success, or ``None`` when
        the request or result handling fails (the error is printed).
    """
    # Only non-global locations are served from a "<location>-" prefixed host.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.SearchServiceClient(client_options=client_options)

    # An app is addressed through its engine; the client's serving_config_path
    # helper would build a data-store path from the app ID instead.
    serving_config = (
        f"projects/{project_id}/locations/{location}"
        f"/collections/default_collection/engines/{search_app_id}"
        "/servingConfigs/default_serving_config"
    )

    request = discoveryengine.SearchRequest(
        serving_config=serving_config,
        query=query,
        page_size=page_size,
        # You can add more parameters for advanced search, like:
        # query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
        #     condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO,
        # ),
        # content_search_spec=discoveryengine.SearchRequest.ContentSearchSpec(
        #     snippet_spec=discoveryengine.SearchRequest.ContentSearchSpec.SnippetSpec(
        #         return_snippet=True
        #     )
        # )
    )

    try:
        response = client.search(request=request)
        print(f"Search results for '{query}':")
        for i, result in enumerate(response.results):
            document = result.document
            print(f"--- Result {i+1} ---")
            print(f"Document ID: {document.id}")
            print(f"Document Name: {document.name}")
            # Snippets and the source link are delivered inside the document's
            # derived struct data, not as fields on the result or document.
            derived = document.derived_struct_data
            if derived:
                # Other keys (e.g. a title) can be read the same way:
                # derived.get("title")
                for snippet in derived.get("snippets") or []:
                    snippet_text = snippet.get("snippet")
                    if snippet_text:
                        print(f"Snippet: {snippet_text}")
                link = derived.get("link")
                if link:
                    print(f"URI: {link}")
        return response
    except Exception as e:
        print(f"Error during search: {e}")
        return None
    


In [None]:
#Integrating with Rag resource

from google.cloud import aiplatform
from google.cloud.aiplatform.generative_models import HarmCategory, HarmBlockThreshold
from google.cloud.aiplatform import telemetry

def generate_content_with_rag(
    project_id: str,
    location: str,
    rag_corpus_id: str, # ID placed after ".../ragCorpora/" below; NOTE(review): presumably a RAG corpus ID rather than a data store ID - confirm
    query_text: str,
) -> None:
    """Ask a generative model a question with a RAG corpus attached and print the answer.

    Steps performed by the code below:
      1. Initialize the SDK for ``project_id`` / ``location``.
      2. Build a ``"gemini-pro"`` model object and a RAG resource that points at
         ``projects/<project>/locations/<location>/ragCorpora/<rag_corpus_id>``.
      3. Call ``generate_content`` with the query, the RAG resource, and four
         safety settings, all at ``BLOCK_MEDIUM_AND_ABOVE``.
      4. Print ``response.text`` and, when present, two retrieval fields read
         from ``response.usage_metadata``.

    Nothing is returned, and no exception is caught here.

    NOTE(review): several names used below could not be confirmed against the
    SDK and should be verified before relying on this cell:
      * ``aiplatform.GenerativeModel`` and ``aiplatform.rag.RagResource`` -
        to my knowledge the generative-model and RAG classes are published
        under the ``vertexai`` package, not on ``google.cloud.aiplatform``.
      * ``generate_content(rag_resources=...)`` - RAG retrieval is, as far as I
        know, attached to the model as a retrieval tool rather than passed as a
        keyword here.
      * ``usage_metadata.retrieval_tool_code`` / ``retrieval_tool_response`` -
        confirm these attributes exist on the response.
    """
    aiplatform.init(project=project_id, location=location)
    # NOTE(review): confirm `GenerativeModel` is exposed on `aiplatform`.
    model = aiplatform.GenerativeModel("gemini-pro")

    # Full resource name of the corpus the model should retrieve from.
    rag_resource = aiplatform.rag.RagResource(
        rag_corpus=f"projects/{project_id}/locations/{location}/ragCorpora/{rag_corpus_id}",
    )

    response = model.generate_content(
        contents=[query_text],
        rag_resources=[rag_resource],
        # Same threshold for every harm category listed.
        safety_settings={
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
        }
    )

    print(f"Generated Content: {response.text}")
    # The retrieval details are optional: each is printed only when
    # usage_metadata is truthy and the field itself is truthy.
    if response.usage_metadata and response.usage_metadata.retrieval_tool_code:
        print(f"Retrieval Tool Code: {response.usage_metadata.retrieval_tool_code}")
    if response.usage_metadata and response.usage_metadata.retrieval_tool_response:
        print(f"Retrieval Tool Response: {response.usage_metadata.retrieval_tool_response}")



In [None]:
# Example usage:
# data_store = create_data_store(
#     PROJECT_ID,
#     LOCATION,
#     "My Custom Search Data Store",
#     "my-custom-search-data-store-id",
#     data_store_type="WEB" # For website crawling
# )

# Example usage (assuming you have a data store and data in GCS):
# import_documents_from_gcs(
#     PROJECT_ID,
#     LOCATION,
#     "my-custom-search-data-store-id",
#     "gs://your-bucket/documents/*.json"
# )

# Example usage:
# search_app = create_search_app(
#     PROJECT_ID,
#     LOCATION,
#     "My Custom Search App",
#     ["my-custom-search-data-store-id"]
# )

# Example usage:
# search_results = search_documents(
#     PROJECT_ID,
#     LOCATION,
#     "my-custom-search-app", # App ID that create_search_app derives from "My Custom Search App"
#     "What is Vertex AI?"
# )

# Example usage (assuming you have a RAG corpus/data store set up):
# generate_content_with_rag(
#     PROJECT_ID,
#     LOCATION,
#     "my-rag-corpus-id", # Inserted into ".../ragCorpora/<id>", so this should be a RAG corpus ID
#     "Tell me about the financial performance of Alphabet in Q1 2023."
# )