Copyright 2024 Google, LLC. This software is provided as-is,
without warranty or representation for any use or purpose. Your
use of it is subject to your agreement with Google.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# Example Agent Workflow using Google's ADK

This notebook provides an example of building an agentic workflow with Google's new ADK. For more information please visit https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/send-chat-prompts-gemini

In [None]:
# Core Vertex AI and Agent SDK
!pip install google-cloud-aiplatform
!pip install google-adk

# Google Cloud Storage client
!pip install google-cloud-storage

# For making HTTP requests (to the mock ticket server)
!pip install requests

Import python libraries

In [None]:
# Vertex AI Modules
import vertexai
from vertexai.preview.generative_models import GenerativeModel, GenerationConfig, Part, Tool, ChatSession, FunctionDeclaration, grounding, GenerationResponse
from vertexai.preview import rag

# Vertex Agent Modules
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService


# Vertex GenAI Modules
import google.genai
from google.genai import types

# Google Cloud AI Platform Modules
from google.cloud import aiplatform_v1beta1 as aiplatform # This module helps parse the info for delete_rag_corpa function
from google.cloud import storage

# Other Python Modules
#import base64
#from IPython.display import Markdown
import requests
import os
from typing import List, Dict, TypedDict
import json
from urllib.parse import urlparse

## Set these variables for your environment 

In [None]:
project_id = "rkiles-demo-host-vpc" # Your GCP Project ID
location = "global" # You can leave this setting as global
region = "us-central1" # Your region. This notebook has only been tested in us-central1

corpa_name = "nest-rag-corpus" # This will be the display name of your RAG Engine corpus

corpa_document_bucket = "gs://rkiles-test/nest/docs/" # The GCS path to the files you want to ingest into your RAG Engine corpus

local_documents = "./nest_docs/" # Local directory containing Nest support files to copy

ticket_server_url = "http://ticket01:8000" # The url to the mock ticket system. This will be a GCE VM running the ticket_server.py web service.

Set environment variables for the Agent and GenAI modules

In [None]:
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "1"
os.environ["GOOGLE_CLOUD_PROJECT"] = project_id
os.environ["GOOGLE_CLOUD_LOCATION"] = region

Initialze Vertex AI setting

In [None]:
vertexai.init(project=project_id, location=region)

## Upload PDF Files for the RAG Engine Instance

Let's start by defining the GCS folder where we'll keep our PDF files. These will be used with RAG Engine to help identify relevant support files that we can add to the shared session state (i.e. context window) for our agents.

Parse the GCS path from our "corpa_document_bucket" variable to get bucket name and prefix

In [None]:
parsed_uri = urlparse(corpa_document_bucket)
bucket_name = parsed_uri.netloc
prefix = parsed_uri.path.lstrip('/')

Initialize the Google Cloud Storage client

In [None]:
storage_client = storage.Client()

Check to see if the GCS bucket exists and if not, create it.

In [None]:
# Get the bucket object
bucket = storage_client.bucket(bucket_name)

# Check if the bucket exists, create it if not
if not bucket.exists():
    bucket.create()
    print(f"Bucket '{bucket_name}' created successfully.")
else:
    print(f"Bucket '{bucket_name}' already exists.")

# Create the folder prefix if it doesn't implicitly exist
if prefix:
    blob_name = f"{prefix}" if prefix.endswith('/') else f"{prefix}/"
    placeholder_blob = bucket.blob(blob_name + ".placeholder")
    if not placeholder_blob.exists():
        placeholder_blob.upload_from_string('')
        print(f"Simulated folder '{corpa_document_bucket}' created.")
    else:
        print(f"Simulated folder '{corpa_document_bucket}' already exists.")


Copy files from the local directory to the GCS bucket

In [None]:
if os.path.exists(local_documents) and os.path.isdir(local_documents):
    for filename in os.listdir(local_documents):
        local_file_path = os.path.join(local_documents, filename)
        if os.path.isfile(local_file_path):
            gcs_blob_name = f"{prefix}{filename}"
            blob = bucket.blob(gcs_blob_name)
            blob.upload_from_filename(local_file_path)
            print(f"Uploaded '{local_file_path}' to 'gs://{bucket_name}/{gcs_blob_name}'")
else:
    print(f"Local directory '{local_documents}' does not exist or is not a directory.")

## Define Functions

We define agent tools the same way we define functions in Python. 

We'll start by defining Python functions we'll use as part of our application code base

Define a function to delete a RAG Engine corpus

In [None]:
def delete_rag_corpora(data: aiplatform.services.vertex_rag_data_service.pagers.ListRagCorporaPager):
    """Extracts 'name' values from a ListRagCorporaPager and prints them.

    Args:
        data: The ListRagCorporaPager object returned from the API.
    """

    names_list = []
    # First loop: Add names to the list
    for rag_corpus in data:  # Iterate through the rag_corpora objects
        if hasattr(rag_corpus, 'name'):  #  Check if the attribute exists
             names_list.append(rag_corpus.name)

    # Second loop: Print the names
    for corpa_name in names_list:
        rag.get_corpus(name=corpa_name)
        rag.delete_corpus(name=corpa_name)

Define a function to create a RAG Engine corpus

In [None]:
def create_rag_corpora(display_name, source_bucket):
    EMBEDDING_MODEL = "publishers/google/models/text-embedding-004"  # @param {type:"string", isTemplate: true}
    embedding_model_config = rag.EmbeddingModelConfig(publisher_model=EMBEDDING_MODEL)

    rag_corpus = rag.create_corpus(
        display_name=display_name, embedding_model_config=embedding_model_config
    )
    

    
    INPUT_GCS_BUCKET = (
        source_bucket
    )

    response = rag.import_files(
        corpus_name=rag_corpus.name,
        paths=[INPUT_GCS_BUCKET],
        chunk_size=1024,  # Optional
        chunk_overlap=100,  # Optional
        max_embedding_requests_per_min=900,  # Optional
    )
    
    # This code shows how to upload local files to the corpus. 
    #rag_file = rag.upload_file(
    #    corpus_name=rag_corpus.name,
    #    path="./test.txt",
    #    display_name="test.txt",
    #    description="my test file"
    #)
    
    return rag_corpus

## Define Agent Tools that we'll use with our Agents

Define an Agent Tool to create a ticket (currently not in use)

In [None]:
def create_ticket(ticket_data: Dict) -> Dict:
    """
    Creates a ticket via the /ticket endpoint.

    Args:
        ticket_data: A dictionary containing the ticket data
                     (ticket_id, description, customer_id, contact_name).

    Returns:
        A dictionary containing the API response.  Handles errors gracefully.
    """
    url = f"{ticket_server_url}/ticket"
    headers = {"Content-Type": "application/json"}

    try:
        response = requests.post(url, headers=headers, data=json.dumps(ticket_data))
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"Request failed: {e}"}

Define an Agent Tool to add notes to an existing ticket

In [None]:
def add_note(ticket_id: int, contact_name: str, note: str) -> Dict:
    """
    Adds a note via the /notes endpoint.

    Args:
        ticket_id: int - A number representing the ticket ID number
        contact_name: str - The name of the contact person as a string value
        note: str - A string value that outlines the plan of action to resolve the ticket

    Returns:
         A dictionary containing the API response.  Handles errors gracefully.
    """
    url = f"{ticket_server_url}/notes"
    headers = {"Content-Type": "application/json"}

    # Construct the dictionary *inside* the function
    note_data = {
        "ticket_id": ticket_id,
        "contact_name": contact_name,
        "note": note
    }

    try:
        response = requests.post(url, headers=headers, data=json.dumps(note_data))
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"Request failed: {e}"}

Define an Agent Tool to add a file to the Agent Session. This is a shared memory space for your agents

In [None]:
def add_file_to_session(uri: str) -> types.Content:
    """
    Adds a specific file from Google Cloud Storage (GCS) to the current session state for agent processing.

    This function takes a GCS URI for a file and wraps it in a `types.Content` object.
    This object is then typically used to make the file's content accessible to the
    agent for tasks like summarization, question answering, or data extraction
    related specifically to that file within the ongoing conversation or session.
    The MIME type is assumed to be inferred by the underlying system or defaults.

    Use this function *after* you have identified a specific GCS URI (e.g., using
    `get_gcs_uri` or similar) that you need the agent to analyze or reference directly.

    Args:
        uri: str - The complete Google Cloud Storage URI of the file to add.
                 Must be in the format "gs://bucket_name/path/to/file.pdf".
                 Example: "gs://my-doc-bucket/reports/q1_report.pdf"

    Returns:
         types.Content - A structured Content object representing the referenced file.
                       This object has `role='user'` and contains a `types.Part`
                       that holds the reference to the provided GCS URI.
                       This Content object can be passed to the agent in subsequent calls.
    """
    print(uri)
    content = types.Content(
        role='user',
        parts=[
            # Try passing ONLY the uri positionally, based on the error message "takes 1 positional argument"
            types.Part.from_uri(
                file_uri=uri,
                mime_type="application/pdf",
            )
        ]
    )

    return content

Define an Agent Tool to retrieve the GCS Bucket location of the document(s) associated with the user's query (in this case the ticket description)

In [None]:
def get_gcs_uri(query: str) -> str:
    """
    Retrieves Google Cloud Storage (GCS) URIs for documents relevant to a given query.

    This function queries a pre-configured Retrieval-Augmented Generation (RAG)
    corpus to find documents related to the input query string. It extracts
    the source GCS URIs from the top relevant documents identified by the
    RAG system based on semantic similarity. Use this function when you need
    to find the source files in GCS that contain information related to a
    specific question or topic.

    Args:
        query: str - The natural language query or topic to search for within
                 the RAG corpus. For example: "What were the Q3 sales figures?"
                 or "Tell me about project Alpha's latest status".

    Returns:
         str - A JSON string representing a list of unique GCS URIs. These URIs
               point to the source documents found to be relevant to the query.
               Returns a JSON string representing an empty list ('[]') if no
               relevant documents meet the similarity criteria.
               Example return value: '["gs://my-bucket/doc1.pdf", "gs://my-bucket/report_q3.txt"]'
    """
    query_response = rag.retrieval_query(
        rag_resources=[
            rag.RagResource(
                rag_corpus=rag_corpus.name,
                # Optional: supply IDs from `rag.list_files()`.
                # rag_file_ids=["rag-file-1", "rag-file-2", ...],
            )
        ],
        text=f'''
        {query}
        ''',
        similarity_top_k=10,  # Optional
        vector_distance_threshold=0.5,  # Optional
    )
    #print(response)
    uri_set = set()
    for context in query_response.contexts.contexts:
        uri_set.add(context.source_uri)
        #json.dumps(list(uri_set))
    #doc_uri = uri_set.pop()
    doc_uri = json.dumps(list(uri_set))
    return doc_uri

## Create a RAG Corpus

Start by listing all avaiable RAG Engine corpora

In [None]:
existing_corpora = rag.list_corpora()

In [None]:
print(existing_corpora)

We will first see if there is an existing Corpus we should use. If not, we will create one.

In [None]:
# Variable to hold the corpus if found
found_corpus = None

# Iterate through all existing RAG corpora
for corpus in existing_corpora.rag_corpora: # Ensure you iterate the correct attribute
    # Check if display_name exists and matches
    if getattr(corpus, 'display_name', None) == corpa_name:
        print(f"Existing Corpa found. Using {corpus.name}")
        
        # You already have the corpus object, no need to call get_corpus usually
        # If 'corpus' object from the list is sufficient, use it directly.
        # If you MUST get a fresh object or different type, uncomment the next line:
        # rag_corpus = rag.get_corpus(name=corpus.name) 
        found_corpus = corpus # Store the found corpus object
        
        print(f"This corpus contains the following files:")
        try:
            # List files associated with the found corpus
            for file in rag.list_files(corpus.name): # Use corpus.name
                print(getattr(file, 'display_name', 'N/A')) # Safer access
        except Exception as e:
            print(f"Warning: Could not list files for {corpus.name}. Error: {e}")
            
        break # Exit the loop as soon as we find the match

# After the loop, check if we found anything
if found_corpus is None:
    # The loop completed without finding the corpus
    print(f"No existing {corpa_name} resource found. Creating one now.")
    try:
        rag_corpus = create_rag_corpora(corpa_name, corpa_document_bucket)
        print(f"New RAG corpus created at {rag_corpus.name}")
    except Exception as e:
        print(f"Error creating corpus {corpa_name}: {e}")
        rag_corpus = None # Indicate failure
else:
    # The corpus was found in the loop
    rag_corpus = found_corpus # Assign the found corpus to the main variable

# Now 'rag_corpus' holds either the found or newly created corpus (or None if creation failed)
# You can proceed to use 'rag_corpus' here
if rag_corpus:
    print(f"\nProceeding with corpus: {rag_corpus.name}")
    # ... your next steps using rag_corpus ...
else:
    print(f"\nFailed to find or create corpus '{corpa_name}'. Cannot proceed.")

**WARNING**: Uncomment this next line to delete ALL existing RAG Engine corpora within this project.

ONLY USE THIS OPTION IN A NON PRODUCTION ENVIRONMENT!

In [None]:
#delete_rag_corpora(rag.list_corpora())

## Test the RAG connection

Let's test the RAG Engine by using the get_gcs_uri function. This will query RAG Engine and return the URI for the grounding documents associated with our query. 

In [None]:
test = get_gcs_uri('How do I install a Nest E thermostat')
print(test)

As you can see with this response, we just exposed a common challenge with standard RAG systems: their reliance on vector search can lead to retrieving contextually similar yet inaccurate documents. In this instance, the RAG engine provided three installation guides when only one directly relates the user's query about the Nest E thermostat. We will see how we can address this issue when we define our reasoning agent.

## Define the Agents

Let's define some agents

First, we'll create a RAG agent. This agent will pull information from a RAG and return the GCS URI of applicable files.

In [None]:
rag_agent = Agent(
    name="rag_agent",
    model="gemini-2.0-flash-001",
    description="Retrieves information from a RAG Engine instance and returns the GCS URI of relevant files.",
    instruction="""
      You are a customer support agent. You help locate documentation that will resolve customer issues.
      Identify the most relevant support document that pertains to the question.
      Your job is to only provide the GCS URI for the closest matching document, not to resolve the issue.
      You will use the get_gcs_uri to identify the correct file. 
      The response from get_gcs_uri will be a text string like 'gs://bucket_name/folder/file 1.pdf'
      Determine which files are relevant to the customer's question and return them to the root agent.
    """,
    tools=[
        get_gcs_uri
    ],
    #flow='single',
)

Next we'll create a Reasoning Agent. This agent will generate the troubleshooting steps to help resolve the customer issue.

We will design the reasoning agent to explicitly name the source document it will use. Our approach will help mitigate the common issue with RAG retrieving contextually similar, but incorrect documents by adding the retrieved files to the shared session state, enabling the LLM itself to analyze and select the most pertinent source document.

In [None]:
reasoning_agent = Agent(
    name="reasoning_agent",
    model="gemini-2.5-pro-exp-03-25",
    description="Defines the troubleshooting process to help resolve the customer's problem",
    instruction="""
      You are a customer support agent. You help define the troubleshooting process to resolve customer issues.
      Use the information in the document to define the process for resolving the issue.
      Once the files have been added to your context, use that information to outline the process needed to resolve the identified problem.
      If multiple documents are provided, specify which document or documents contains the relevant information to resolve the issue.
      The process needs to outline the activities for the Nest technical support representitive to perform.
      The notes system only supports plain text. Ensure you only use text in your output.
      
      Example:
      Step 1: Do this
      Step 2: Do this other task
      Step 3: etc
    """,
    tools=[
        #add_file_to_session,
    ],
    #flow='single',
)

Let's create a notes agent to add notes to a ticket. We will use the "add_note" tool to interact with our custom API on the ticket server.

In [None]:
notes_agent = Agent(
    name="notes_agent",
    model="gemini-2.0-flash-001",
    description="Add notes to the associated ticket",
    instruction="""
      You are a customer support assistant agent. Your primary task is to generate informative and well-structured notes for customer support tickets. 
      These notes will be added to the ticket's history and used by customer support representatives to understand the issue, track progress, and provide consistent service.
    """,
    tools=[
        add_note,
    ],
    #flow='single',
)

Lastly, we'll define the root agent

In [None]:
nest_agent_team = Agent(
    name="nest_support_agent",
    model="gemini-2.0-flash-001",
    description="The main coordinator agent. Handles user requests and delegates tasks to specialists.",
    instruction="""
      You are a Nest customer support agent. You help triage and document actions for customer support tickets. 
      These notes will be added to the ticket's history and used by customer support representatives to understand the issue, track progress, and provide consistent service
      Start by identifying what the problem is, then call the rag_agent to identify the related documents.
      The add_file_to_session tool only supports 1 document at a time. 
      Use the add_file_to_session tool to add the document to your context. 
      If the rag_agent provides multiple documents, you will need to make multiple calls using the add_file_to_session tool
      Once the rag_agent has provided you the gcs links to the documents, call the reasoning_agent to define the troubleshooting process.
      The reasoning_agent will return the process to troubleshoot the issue. 
      Call the notes_agent to add the troubleshooting process to the ticket notes. The notes_agent is expecting the following json request body:
            {
                "ticket_id":  The ticket ID represented as an integer,
                "contact_name": "string value of the contact name",
                "note": "string value of the troubleshooting process"
            }
    """,
    tools=[add_file_to_session],
    #flow='auto',
    sub_agents=[rag_agent, reasoning_agent, notes_agent]
)

## Let's see how it works

In [None]:
def format_output(parts_list: list[types.Part]) -> str:
    """
    Formats a list of Part objects (typically returned by run_prompt)
    into a human-readable string summarizing the interaction.
    Includes specific details for known function calls like transfer_to_agent.
    """
    if not isinstance(parts_list, list):
        return "[Error: Input to format_output should be a list of Part objects]"

    formatted_lines = []
    formatted_lines.append("--- Agent Interaction (Formatted) ---")

    if not parts_list:
        formatted_lines.append("(No content parts received)")
    else:
        for part in parts_list:
            if part.text:
                # Append agent's text
                formatted_lines.append(part.text)
            elif part.function_call:
                # === Updated Function Call Handling ===
                fc = part.function_call
                # Special handling for transfer_to_agent
                if fc.name == 'transfer_to_agent':
                    # Safely get the agent_name from args, provide default if missing
                    agent_name = fc.args.get('agent_name', 'UNKNOWN_AGENT')
                    formatted_lines.append(f"  [Function Call: Transfer to Agent -> {agent_name}]")
                # Special handling for add_file_to_session
                elif fc.name == 'add_file_to_session':
                    # Safely get the uri from args
                    file_uri = fc.args.get('uri', 'UNKNOWN_URI')
                    # Extract just the filename for brevity if desired
                    filename = file_uri.split('/')[-1] if '/' in file_uri else file_uri
                    formatted_lines.append(f"  [Function Call: Add File to Session ({filename})]")
                    # Or show full URI:
                    # formatted_lines.append(f"  [Function Call: {fc.name} (URI: {file_uri})]")
                else:
                    # Default formatting for any other function calls
                    formatted_lines.append(f"  [Function Call: {fc.name}]")
                    # Optionally show generic args for unknown functions:
                    # if fc.args:
                    #    formatted_lines.append(f"    Args: {fc.args}")
                # === End of Updated Function Call Handling ===
            elif part.function_response:
                # Indicate a function response
                fr = part.function_response
                # Add specific details based on function name if needed
                if fr.name == 'add_file_to_session':
                     # Try to extract file info if available in the response structure
                     file_info = "(Response received)" # Default text
                     try:
                         if isinstance(fr.response, dict) and 'result' in fr.response:
                             result_content = fr.response['result']
                             if isinstance(result_content, types.Content) and result_content.parts:
                                 for res_part in result_content.parts:
                                     if res_part.file_data:
                                         filename = res_part.file_data.file_uri.split('/')[-1]
                                         file_info = f"(File Processed: {filename})"
                                         break
                     except Exception:
                         pass # Keep default text on error
                     formatted_lines.append(f"  [Function Response for: {fr.name} {file_info}]")
                else:
                     # Generic response message
                     formatted_lines.append(f"  [Function Response for: {fr.name}]")

            elif part.file_data:
                 # Handle directly referenced file data
                 filename = part.file_data.file_uri.split('/')[-1]
                 formatted_lines.append(f"  [File Reference: {filename}]")
            # Add checks for other Part types if needed (code, thought)
            # elif part.executable_code: ...
            # elif part.code_execution_result: ...
            # elif part.thought: ...
            else:
                 # Ignore parts with no directly readable content
                 pass

    formatted_lines.append("--- End of Formatted Interaction ---")
    return "\n".join(formatted_lines)

In [None]:
# Define constants for identifying the interaction context
APP_NAME = "nest_support_agent" # The name of our agentic workflow
USER_ID = "user_1" # The assoicated user we are interacting with
SESSION_ID = "session_001" # Using a fixed ID for simplicity

session_service = InMemorySessionService() # Memory service stores message history, kind of like a context window
artifact_service = InMemoryArtifactService() # Artifact service stores binary data, such as PDF files
session = session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)
runner = Runner(
    agent='nest_support_agent', # The agent we want to run
    app_name=APP_NAME,   # Associates runs with our app
    artifact_service=artifact_service, # Uses our Artifact service
    session_service=session_service # Uses our session manager
)

def run_prompt(new_message: str) -> list[types.Part]:
  """
  Runs a prompt using the ADK runner and returns a list of all
  Part objects received in the response events.
  (Does not print the raw parts itself anymore).
  """
  content = types.Content(role='user', parts=[types.Part(text=new_message)])
  all_parts = [] # Initialize an empty list to collect parts of the message


  try:
    for event in runner.run(
        session_id=session,
        user_id=USER_ID,
        new_message=content,
    ):
      if event.content and event.content.parts:
          # Extend the all_parts list with the parts from this event
          all_parts.extend(event.content.parts)

  except Exception as e:
      # Handle potential errors during the run
      print(f"\n--- Error during ADK execution ---")
      print(e)
      pass
  finally:
      # Optional: indicate completion
      print("--- ADK run finished ---")
      pass

  return all_parts # Return the complete list of Part objects


In [None]:
raw_parts_list = run_prompt(f'''
Hi, I need help installing my Nest 3rd gen thermostat. I don't know which wires to connect for power.
''')
#readable_string = format_output(raw_parts_list)
print(raw_parts_list)

In [None]:
raw_parts_list = run_prompt(f'''
Define the troubleshooting process for this issue.
''')
readable_string = format_output(raw_parts_list)
print(readable_string)

In [None]:
raw_parts_list = run_prompt(f'''
Add the troubleshooting process to the notes of the ticket. The ticket number is 1759 and the contact name is 'JonD@here.com'
''')
readable_string = format_output(raw_parts_list)
print(readable_string)

You can now check the "notes.txt" file on the GCE VM running ticket_server.py. You should see the full notes from this ticket. To make it easier to read, you can run 'pandoc notes.txt | lynx -stdin' from the Ticket server console to render the default markdown from the model. 