<a href="https://colab.research.google.com/github/ahmadluay9/ADK-Training/blob/main/07_rag_with_vertexai_search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# End-to-End RAG with Vertex AI Search and Gemini

This tutorial demonstrates how to build a **Retrieval Augmented Generation (RAG)** pipeline using Google Cloud. We will cover three main steps:

1.  **Ingestion**: Creating a Google Cloud Storage (GCS) bucket and uploading data.
2.  **Indexing**: Importing that data into a Vertex AI Search Data Store.
3.  **RAG**: Searching the data store for relevant context and using Gemini to answer questions.

### Prerequisites
1.  A Google Cloud Project.
2.  The following APIs enabled:
    * Vertex AI API (`aiplatform.googleapis.com`)
    * Discovery Engine API (`discoveryengine.googleapis.com`)
    * Cloud Storage API (`storage.googleapis.com`)
3.  **Important:** You must manually create a **Vertex AI Search App (Generic Type)** in the Google Cloud Console to get a `Data Store ID`. (Creation via API is possible but takes ~15-20 minutes, so we use an existing shell for this tutorial).

<img height="650" src="https://storage.cloud.google.com/training-public-eikon/search_agent_workflow.png" alt="adk-search-diagram" />

---

## 1 Setup

### 1.1 Install dependencies

The Google Colab Notebooks environment includes a pre-installed version of the [`google-adk`](https://google.github.io/adk-docs/) library for Python and its required dependencies, so you don't need to install additional packages in this notebook.

To use ADK in your own Python development environment outside of this course, install it by running:

```
pip install google-adk
```

In [None]:
# !pip show google-cloud-storage google-adk google-genai google-cloud-discoveryengine

In [None]:
# !pip install google-adk google-genai

In [None]:
from google.colab import auth

# Set your project ID
project_id = 'YOUR-PROJECT-ID' # @param {type:"string"}
location = 'global' # @param {type:"string"}

gcs_bucket_name = 'YOUR-BUCKET-NAME' # @param {type:"string"}
gcs_location = 'asia-southeast2' # @param {type:"string"}

data_store_id = 'YOUR-DATASTORE-ID' # @param {type:"string"}

# Authenticate
auth.authenticate_user(project_id=project_id)

# Set the project for gcloud command-line tool
!gcloud config set project {project_id}

# Enable the required APIs
!gcloud services enable aiplatform.googleapis.com discoveryengine.googleapis.com storage.googleapis.com

In [None]:
project_id_output = !gcloud config get project
print(f"PROJECT_ID = {project_id_output[0]}")
print("")
!gcloud auth list

---

## 2 Data Ingestion (Google Cloud Storage)

We will create a GCS bucket and upload a sample policy document. This serves as the "source of truth" for our RAG pipeline.

In [None]:
# create sample text
sample_text = """
City General Hospital: Patient Discharge Policy (2025)
1. Discharge Planning: Planning begins upon admission. A case manager is assigned within 24 hours.
2. Medication Reconciliation: A pharmacist must review all discharge medications with the patient before release.
3. Follow-up Appointments: All appointments must be scheduled prior to the patient leaving the premises.
4. Transportation: Patients undergoing sedation must have a verified escort for transport home; taxi/rideshare is not permitted alone.
5. Documentation: The discharge summary must be completed in the EMR within 48 hours of release.
    """

filename = "discharge_policy.txt"

with open(filename, "w") as f:
    f.write(sample_text)
print(f"Saved local file: {filename}")

In [None]:
from google.cloud import storage

storage_client = storage.Client(project=project_id)

# Create Bucket
print(f"Creating bucket {gcs_bucket_name} in {gcs_location}...")
bucket = storage_client.create_bucket(gcs_bucket_name, location=gcs_location)
print(f"Created bucket: {bucket.name}")

In [None]:
# Upload to GCS

# Define the destination path and filename inside the GCS bucket.
# GCS calls stored objects "blobs" (Binary Large OBjects).
blob_name = "policies/discharge_policy_2025.txt"

# Create a blob object representing the file within the bucket
blob = bucket.blob(blob_name)

# Upload the contents of the local file to the GCS blob
blob.upload_from_filename(filename)

# Construct the standard GCS URI format (e.g., gs://my-bucket/policies/...)
gcs_uri = f"gs://{gcs_bucket_name}/{blob_name}"

# Print a confirmation message to the console
print(f"Successfully uploaded data to: {gcs_uri}")
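The `gs://<bucket>/<object>` URI built above is what the import step consumes later, so it can be worth checking its shape before passing it on. The following is a minimal sketch that is not part of the original notebook: the helper name `split_gcs_uri` is hypothetical, and it uses only the Python standard library.

```python
def split_gcs_uri(uri: str) -> tuple[str, str]:
    """Split a gs:// URI into (bucket_name, blob_name).

    Hypothetical helper for illustration; raises ValueError on malformed input.
    """
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a GCS URI: {uri!r}")
    # Everything up to the first "/" is the bucket; the rest is the object name.
    bucket_name, _, blob_name = uri[len(prefix):].partition("/")
    if not bucket_name or not blob_name:
        raise ValueError(f"Expected gs://<bucket>/<object>, got {uri!r}")
    return bucket_name, blob_name


# Example with placeholder names
print(split_gcs_uri("gs://my-bucket/policies/discharge_policy_2025.txt"))
```

Calling `split_gcs_uri(gcs_uri)` right after the upload is one way to catch an empty bucket name or blob path early, before the import job is started.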

---

## 3 Create the Data Store

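Now we create a **Data Store** and trigger an import job that indexes the files from GCS, which lets Vertex AI Search run semantic searches over your documents. If you already created a data store in the Cloud Console (see Prerequisites), skip the creation cell and run only the import, since creating a data store with an ID that already exists will fail.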

[Documentation](https://docs.cloud.google.com/generative-ai-app-builder/docs/create-data-store-es#discoveryengine_v1_generated_DocumentService_ImportDocuments_sync-python)

In [None]:
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

def create_data_store(project_id: str, location: str, data_store_id: str) -> str:
    # Set endpoint if location is not global
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )

    # Create a client
    client = discoveryengine.DataStoreServiceClient(client_options=client_options)

    # The full resource name of the collection
    parent = client.collection_path(
        project=project_id,
        location=location,
        collection="default_collection",
    )

    data_store = discoveryengine.DataStore(
        display_name="Demo Data Store",

        # Options: GENERIC, MEDIA, HEALTHCARE_FHIR
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,

        # Options: SOLUTION_TYPE_RECOMMENDATION, SOLUTION_TYPE_SEARCH, SOLUTION_TYPE_CHAT, SOLUTION_TYPE_GENERATIVE_CHAT
        solution_types=[discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH],

        # Options: NO_CONTENT, CONTENT_REQUIRED, PUBLIC_WEBSITE
        content_config=discoveryengine.DataStore.ContentConfig.CONTENT_REQUIRED,
    )

    request = discoveryengine.CreateDataStoreRequest(
        parent=parent,
        data_store_id=data_store_id,
        data_store=data_store,
    )

    # Make the request
    print(f"Creating Data Store '{data_store_id}'... This may take a moment.")
    operation = client.create_data_store(request=request)

    print(f"Waiting for operation to complete: {operation.operation.name}")
    response = operation.result()

    metadata = discoveryengine.CreateDataStoreMetadata(operation.metadata)
    print("\nData Store Created Successfully!")

    return operation.operation.name

# Execute the creation
create_data_store(project_id, location, data_store_id)
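Both functions in this section pick the API endpoint from the location: regional locations use a location-prefixed hostname, while `global` falls back to the client's default. A minimal sketch of that rule in isolation, assuming the default hostname is `discoveryengine.googleapis.com`; the helper name `discovery_engine_endpoint` is hypothetical and does not exist in the client library:

```python
def discovery_engine_endpoint(location: str) -> str:
    """Return the Discovery Engine hostname for a location.

    Hypothetical helper mirroring the ClientOptions logic in this section.
    """
    if location == "global":
        # With client_options=None the client uses its default endpoint.
        return "discoveryengine.googleapis.com"
    return f"{location}-discoveryengine.googleapis.com"


for loc in ("global", "us", "eu"):
    print(loc, "->", discovery_engine_endpoint(loc))
```

Keeping the rule in one place avoids repeating the conditional in every function that creates a client.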

In [None]:
def import_documents_from_gcs(project_id: str, location: str, data_store_id: str, gcs_uri: str):
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )

    # Create a client
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # The full resource name of the search engine branch.
    # e.g projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="default_branch",
    )

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        gcs_source=discoveryengine.GcsSource(
            input_uris=[gcs_uri],

            # Options:
            # - `content`: unstructured documents (PDF, HTML, DOC, TXT, PPTX)
            # - `custom`: unstructured documents with custom JSONL metadata
            # - `document`: structured documents in the discoveryengine.Document format
            # - `csv`: unstructured documents with CSV metadata
            data_schema="content",  # unstructured documents (PDF, HTML, DOC, TXT, etc.)
        ),
        # Using INCREMENTAL to add new docs without deleting existing ones
        reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
    )

    # Make the request
    print(f"Starting import job from {gcs_uri}...")
    operation = client.import_documents(request=request)

    print(f"Waiting for operation to complete: {operation.operation.name}")
    response = operation.result()

    metadata = discoveryengine.ImportDocumentsMetadata(operation.metadata)

    print("\nImport Operation Complete!")
    print(f"Success Count: {metadata.success_count}")
    print(f"Failure Count: {metadata.failure_count}")

# Execute the import
import_documents_from_gcs(project_id, location, data_store_id, gcs_uri)

---

## 4 Agent Configuration (ADK)


Using the **Agent Development Kit (ADK)**, we define an LLM agent that uses the `VertexAiSearchTool` to retrieve facts before answering.

[Documentation](https://google.github.io/adk-docs/integrations/vertex-ai-search/)

### 4.1 Environment Configurations

Here we copy the Google Cloud settings into environment variables. ADK reads these variables to decide which project, location, and backend to use when calling Vertex AI models.

In [None]:
import os

try:
    # The SDK uses this ID for usage tracking and billing
    os.environ['GOOGLE_CLOUD_PROJECT'] = project_id

    # Defines the region where Vertex AI resources are hosted
    os.environ['GOOGLE_CLOUD_LOCATION'] = location

    # Directs the SDK to use Vertex AI infrastructure instead of the public Gemini API
    os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = "1"

    print(f"✅ Environment configured for project: {project_id} in {location}")

except Exception as e:
    print(f"❌ Configuration Error: {e}")
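Assigning strings to `os.environ` rarely raises, so the `try`/`except` above will not catch a value that is simply empty or missing. A minimal sketch of an explicit check, assuming the three variable names used in this section; the helper name `missing_env_vars` is hypothetical:

```python
import os

# Variable names taken from the configuration cell above.
REQUIRED_VARS = (
    "GOOGLE_CLOUD_PROJECT",
    "GOOGLE_CLOUD_LOCATION",
    "GOOGLE_GENAI_USE_VERTEXAI",
)


def missing_env_vars(names=REQUIRED_VARS, environ=None) -> list[str]:
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
print("Missing variables:", missing if missing else "none")
```

Running this after the configuration cell gives a quick signal before any agent code depends on the settings.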

### 4.2 Import Components
Next, import the components needed from the Agent Development Kit and the Generative AI library. Keeping the imports in one cell makes the building blocks used in the rest of this section easy to find.

In [None]:
from google.genai import types
from google.genai.types import GenerateContentConfig

from google.adk.agents import LlmAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import InMemoryRunner, Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import VertexAiSearchTool
from google.adk.tools.agent_tool import AgentTool

print("✅ ADK components imported successfully.")

### 4.3 Configure Retry Options
When working with LLMs, you may encounter transient errors like rate limits or temporary service unavailability. Retry options automatically handle these failures by retrying the request with exponential backoff.

In [None]:
retry_config = types.HttpRetryOptions(
    attempts=5,  # Maximum number of attempts
    exp_base=7,  # Base of the exponential backoff; each delay grows by this factor
    initial_delay=1,  # Delay before the first retry, in seconds
    http_status_codes=[429, 500, 503, 504],  # Retry only on these HTTP status codes
)
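To get a feel for how quickly the waits grow with `exp_base=7`, the schedule can be computed by hand. This is a minimal sketch of plain exponential backoff, not the SDK's actual implementation: it ignores jitter, assumes `attempts` counts the original request, and treats the `max_delay` cap as an optional parameter of this hypothetical helper.

```python
from typing import Optional


def backoff_delays(
    attempts: int,
    initial_delay: float,
    exp_base: float,
    max_delay: Optional[float] = None,
) -> list:
    """Return the wait before each retry under plain exponential backoff.

    Assumption: `attempts` includes the first request, so there are
    at most `attempts - 1` retries.
    """
    delays = []
    for n in range(max(attempts - 1, 0)):
        delay = initial_delay * exp_base ** n
        if max_delay is not None:
            delay = min(delay, max_delay)  # optional cap (assumption)
        delays.append(delay)
    return delays


print(backoff_delays(attempts=5, initial_delay=1, exp_base=7))
print(backoff_delays(attempts=5, initial_delay=1, exp_base=7, max_delay=60))
```

Under these assumptions the uncapped schedule from this sketch is 1, 7, 49, and 343 seconds, which suggests that a smaller base may be preferable for interactive use if no cap applies.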

In [None]:
DATASTORE_PATH = f"projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{data_store_id}"

vertex_search_tool = VertexAiSearchTool(data_store_id=DATASTORE_PATH)
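The tool is given a full resource name rather than the bare data store ID, so a malformed path is an easy mistake to make. The following is a minimal sketch of a format check, assuming the path layout used in `DATASTORE_PATH` above; the helper name `parse_data_store_path` is hypothetical and uses only the standard library.

```python
import re

# Layout taken from DATASTORE_PATH in this section.
_DATA_STORE_RE = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/collections/(?P<collection>[^/]+)"
    r"/dataStores/(?P<data_store>[^/]+)$"
)


def parse_data_store_path(path: str) -> dict:
    """Split a data store resource name into its components."""
    match = _DATA_STORE_RE.match(path)
    if match is None:
        raise ValueError(f"Not a full data store resource name: {path!r}")
    return match.groupdict()


# Example with placeholder values
example = "projects/my-project/locations/global/collections/default_collection/dataStores/my-store"
print(parse_data_store_path(example))
```

Passing `DATASTORE_PATH` through such a check before constructing the tool would flag a bare ID or a missing segment with a clear error message.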

In [None]:
vertexai_search_agent = LlmAgent(
    model=Gemini(
        model="gemini-2.5-flash-lite",
        retry_options=retry_config
    ),
    name='vertexai_search_agent',
    description='A helpful assistant for answering questions based on documents retrieved from Vertex AI Search.',
    instruction=f"""
    You are a helpful assistant that answers questions based on information found in the document store: {DATASTORE_PATH}.
    Use the search tool to find relevant information before answering.
    If the answer isn't in the documents, say that you couldn't find the information.
    """,
    tools=[vertex_search_tool],
    generate_content_config=GenerateContentConfig(
        temperature=0.1
    )
)

search_tool = AgentTool(agent=vertexai_search_agent, skip_summarization=False)

In [None]:
root_agent = LlmAgent(
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    name="root_agent",
    description=(
        "A supervisory agent that handles user queries and delegates document-based "
        "questions to the Vertex AI Search agent when external knowledge retrieval is required."
    ),
    # AgentTool exposes the wrapped agent under the agent's own name, so the prompt
    # refers to `vertexai_search_agent` rather than the Python variable `search_tool`.
    instruction="""
    You are the main orchestrator agent.

    Your responsibilities:
    1. Understand the user's question.
    2. If the question requires information from the document store, call the `vertexai_search_agent` tool.
    3. Use the response from the `vertexai_search_agent` tool to generate a clear, concise, and accurate final answer.
    4. Do NOT fabricate information.
    5. If the `vertexai_search_agent` tool cannot find relevant information, clearly inform the user that the information is not available in the documents.
    6. Keep responses professional and well-structured.

    Only rely on verified information retrieved via the `vertexai_search_agent` tool when answering document-based questions.
    """,
    tools=[search_tool],
)

In [None]:
runner = InMemoryRunner(agent=root_agent)
response = await runner.run_debug(
    "Can I take an Uber or taxi home after being sedated?"
)
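The call above prints the conversation, but the returned value can also be inspected programmatically. This is a minimal sketch under a loud assumption: that `run_debug` returns a list of event objects whose `content.parts` entries may carry a `text` attribute. Check this against the ADK version you have installed. The helper name `collect_text` is hypothetical, and the example uses stand-in objects rather than real ADK events.

```python
from types import SimpleNamespace


def collect_text(events) -> str:
    """Join the text of every part found in a sequence of event-like objects.

    Duck-typed on purpose: anything without `content`, `parts`, or `text`
    is skipped instead of raising.
    """
    chunks = []
    for event in events:
        content = getattr(event, "content", None)
        for part in getattr(content, "parts", None) or []:
            text = getattr(part, "text", None)
            if text:
                chunks.append(text)
    return "\n".join(chunks)


# Stand-in objects that only mimic the assumed shape of an event
fake_events = [
    SimpleNamespace(content=None),  # e.g. an event without content
    SimpleNamespace(
        content=SimpleNamespace(parts=[SimpleNamespace(text="Example answer text.")])
    ),
]
print(collect_text(fake_events))
```

In the notebook, `collect_text(response)` would be the corresponding call if the assumption about the return value holds.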

---