In [2]:
! pip install --upgrade --user --quiet lxml markdownify google-genai google-cloud-aiplatform google-cloud-storage

In [1]:
import os
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from IPython.display import HTML, Image, Markdown, display

In [2]:
from google import genai
from google.genai.types import HttpOptions
from google.cloud import storage

from google.genai.types import (
    FunctionDeclaration,
    GenerateContentConfig,
    GoogleSearch,
    HarmBlockThreshold,
    HarmCategory,
    Part,
    SafetySetting,
    Tool,
    ToolCodeExecution,
)   

In [3]:
# Set these parameters before running
P = ! gcloud config list --format 'value(core.project)'
PROJECT_ID = P[0]
R = ! gcloud config list --format='value(compute.region)'
LOCATION = R[0]
S = ! gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = S[0]

BUCKET_NAME = f"bkt-{PROJECT_ID}-agent-docs"

# gemini model
MODEL_ID = "gemini-2.5-pro-preview-03-25"

In [4]:
client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)

In [5]:
def crawl_urls(urls, output_dir="crawl"):
    """
    Iterate through a list of URLs, fetch each page, and save the content locally in the specified directory.
    :param urls: List of URL strings to crawl.
    :param output_dir: Directory where fetched pages will be stored.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    for url in urls:
        try:
            # Fetch the page
            response = requests.get(url, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Failed to fetch {url}: {e}")
            continue

        # Parse URL to create a safe filename
        parsed = urlparse(url)
        path = parsed.path.strip("/").replace("/", "_")
        filename = f"{parsed.netloc}_{path or 'index'}.html"
        filepath = os.path.join(output_dir, filename)

        # Save content to file
        with open(filepath, "w", encoding=response.encoding or "utf-8") as f:
            f.write(response.text)
            
def html_to_markdown(input_dir="crawl", output_dir="markdown"):
    """
    Convert all HTML files in input_dir to Markdown and save them in output_dir.
    Uses the markdownify library under the hood.
    :param input_dir: Directory containing .html files
    :param output_dir: Directory to write .md files
    """
    try:
        from markdownify import markdownify as mdify
    except ImportError:
        raise ImportError("Please install markdownify: pip install markdownify")

    os.makedirs(output_dir, exist_ok=True)

    for filename in os.listdir(input_dir):
        if not filename.lower().endswith(".html"):
            continue

        input_path = os.path.join(input_dir, filename)
        with open(input_path, "r", encoding="utf-8") as f:
            html_content = f.read()

        # Convert HTML to Markdown
        markdown = mdify(html_content, heading_style="ATX")

        # Write out .md file
        base = os.path.splitext(filename)[0]
        output_path = os.path.join(output_dir, f"{base}.md")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(markdown)
    print("Done")
    
    
def count_tokens_in_markdown(input_dir="markdown", output_file="token_counts.txt", model="gemini-2.0-flash-001"):
    """
    Iterate through all Markdown files in input_dir, count tokens using the GenAI client,
    save each filename and its token count to output_file, and print the total token count.
    :param input_dir: Directory containing .md files
    :param output_file: Path to write the filename-token mappings
    :param model: Model name to use for counting tokens
    """
    total_tokens = 0

    # Ensure directory for output exists
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(output_file, "w", encoding="utf-8") as out_f:
        for filename in os.listdir(input_dir):
            if not filename.lower().endswith(".md"):
                continue

            input_path = os.path.join(input_dir, filename)
            with open(input_path, "r", encoding="utf-8") as f:
                contents = f.read()

            response = client.models.count_tokens(
                model=model,
                contents=contents,
            )
            tokens = response.total_tokens
            total_tokens += tokens
            out_f.write(f"{filename}: {tokens}\n")
            #print(f"{filename} -> {tokens} tokens")

    print(f"Total tokens across all files: {total_tokens}")
    
    
def upload_blob_and_get_uri(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket and returns the blob's public URI.

    Args:
        bucket_name (str): Your GCS bucket name. e.g. "your-bucket-name"
        source_file_name (str): Path to your file to upload. e.g. "local/path/to/file.txt"
        destination_blob_name (str): Desired name for the blob in GCS. e.g. "storage-object-name.txt"

    Returns:
        str: The public URI of the uploaded blob (gs://<bucket_name>/<blob_name>).
             Returns None if the upload fails.
    """
    try:
        # Initialize the GCS client
        storage_client = storage.Client()

        # Get the bucket object
        bucket = storage_client.bucket(bucket_name)

        # Create a blob object
        blob = bucket.blob(destination_blob_name)

        # Upload the file from the local path
        print(f"Uploading {source_file_name} to gs://{bucket_name}/{destination_blob_name}...")
        blob.upload_from_filename(source_file_name)

        blob_uri = f"gs://{bucket_name}/{destination_blob_name}"
        print(f"File {source_file_name} uploaded successfully to {blob_uri}")

        # Return the GS URI
        return blob_uri

    except Exception as e:
        print(f"An error occurred during upload: {e}")
        return None
    
def combine_markdown_files(input_dir="markdown", output_file="agent-docs-all.txt"):
    """
    Iterate through all Markdown files in `input_dir`, append their contents into a single file, 
    and write it as `output_file` in the current working directory.
    """
    import os

    # Make sure output directory exists (if output_file includes a path)
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(output_file, "w", encoding="utf-8") as out_f:
        for fname in sorted(os.listdir(input_dir)):
            if not fname.lower().endswith(".md"):
                continue

            path = os.path.join(input_dir, fname)
            with open(path, "r", encoding="utf-8") as in_f:
                out_f.write(in_f.read())
                out_f.write("\n\n")  # separator between files

    print(f"All Markdown files in '{input_dir}' have been combined into '{output_file}'.")

In [18]:
# get the sitemap and links
sitemap_url = "https://google.github.io/adk-docs//sitemap.xml"
resp = requests.get(sitemap_url)
soup = BeautifulSoup(resp.content, "xml")
urls = [loc.text for loc in soup.find_all("loc")]

In [19]:
# curate the urls
exclude = ["contributing-guide", "community", "A2A", "api-reference"]
urls = [u for u in urls if not any(exc in u for exc in exclude)]

# add the apr reference - add if desired
urls.append("https://google.github.io/adk-docs/api-reference/google-adk.html")

In [20]:
# crawl
crawl_urls(urls, output_dir="crawl")
# convert
html_to_markdown(input_dir="crawl", output_dir="markdown")
# count tokens
count_tokens_in_markdown(input_dir="markdown", output_file="token_counts.txt", model="gemini-2.0-flash-001")
# as of 2025.04.23 = 226,717 -> 322,886 with the api reference

# combine
fn = "agent-docs-all.txt"
combine_markdown_files(input_dir="markdown", output_file=fn)

# load to GCS
upload_blob_and_get_uri(BUCKET_NAME, fn, fn)

# keep blob uri for context window
uploaded_blob_uri = upload_blob_and_get_uri(
    bucket_name=BUCKET_NAME,
    source_file_name=fn,
    destination_blob_name=fn
)

Done
Total tokens across all files: 322886


In [6]:
# save for next time
uploaded_blob_uri= f"gs://{BUCKET_NAME}/agent-docs-all.txt"

In [7]:
# setup Gemini
system_instruction = (
    "You are an expert AI programming assistant specializing in generating accurate and robust code using various Software Development Kits (SDKs) and libraries. "
    "Your primary goal is correctness and adherence to best practices expected to be implemented for production systems and code."
)

generate_content_config = GenerateContentConfig(
    temperature=1.0,
    top_p=0.95,
    max_output_tokens=65530,
    response_modalities=["TEXT"],
    system_instruction=system_instruction,
    safety_settings=[
        SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="OFF"),
        SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="OFF"),
        SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="OFF"),
        SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="OFF"),
    ],
)

In [8]:
# instruct gemini to learn agents docs
prompt = ("Before writing any code, take time to thoroughly review the SDK documentation provided. "
          "Start by identifying the purpose of the SDK, the problems it solves, and how it's intended to be used. "
          "Familiarize yourself with the core concepts, available classes or methods, and any initialization or setup requirements. "
          "Pay close attention to usage examples, common patterns, and any limitations or edge cases called out in the docs. "
          "As you read, think about production issues that might occur and think about how you will work around them. "
          "Your goal is to build a mental model of how the SDK works so you can use it effectively and avoid common pitfalls. "
          "Tell me when you are finished."
         )

contents = [
    Part.from_text(text = prompt)
    , Part.from_text(text = f"<DOCUMENTATION>")
    , Part.from_uri(file_uri= uploaded_blob_uri, mime_type="text/plain", )
    , Part.from_text(text = f"</DOCUMENTATION>")
]

# create chat
chat = client.chats.create(model=MODEL_ID)
response = chat.send_message(message=contents, config=generate_content_config,)
display(Markdown(response.text))

Okay, I have thoroughly reviewed the provided documentation for the Agent Development Kit (ADK).

Here's a summary of my understanding:

1.  **Purpose & Problem Solved:** ADK is a Python framework designed to build, evaluate, and deploy AI agents, ranging from simple single-agent applications to complex multi-agent systems. It aims to provide structure and flexibility, especially integrating well with Google's Gemini models and cloud services, while remaining open to other models and tools. It addresses the challenges of orchestration, state management, tool usage, deployment, and evaluation in agent development.

2.  **Core Concepts & Components:**
    *   **Agents:** The fundamental execution units. Key types include `LlmAgent` (driven by an LLM for reasoning and dynamic decisions), `WorkflowAgent` (`SequentialAgent`, `ParallelAgent`, `LoopAgent` for deterministic control flow), and `CustomAgent` (inheriting from `BaseAgent`). Multi-agent systems are a core focus, enabling hierarchical structures and delegation.
    *   **Tools:** Extend agent capabilities (e.g., API calls, database queries, code execution, web search). ADK supports custom Python `FunctionTool`s (including long-running ones), `AgentTool` (using another agent as a tool), built-in tools (`google_search`, `built_in_code_execution`, `VertexAiSearchTool`), third-party wrappers (`LangchainTool`, `CrewaiTool`), and tools generated from OpenAPI specs (`OpenAPIToolset`) or discovered via MCP (`MCPToolset`) or Google Cloud services (API Hub, App Integration). Authentication for tools is handled via `AuthScheme` and `AuthCredential`.
    *   **Runtime & Orchestration:** The `Runner` class manages the agent execution lifecycle, driven by an asynchronous event loop. `Events` are the primary communication mechanism, carrying content and `actions` (like state/artifact deltas, control signals). `InvocationContext`, `CallbackContext`, and `ToolContext` provide access to runtime information, state, and services within agents, tools, and callbacks.
    *   **State, Session & Memory:** `Session` represents a single conversation thread, containing `Events` (history) and `State` (session-specific, dictionary-like data). `SessionService` (with implementations like `InMemory`, `Database`, `VertexAI`) manages session persistence. `Memory` (via `MemoryService`) provides a searchable store for longer-term, cross-session knowledge.
    *   **Artifacts:** Managed by `ArtifactService` (`InMemory`, `GCS`), allowing agents/tools to save and load versioned binary data (files) using `context.save_artifact`/`load_artifact`.
    *   **Callbacks:** Python functions (`before_agent`, `after_agent`, `before_model`, `after_model`, `before_tool`, `after_tool`) that hook into the execution lifecycle for logging, validation, modification, guardrails, or control flow changes. Returning specific objects from callbacks can override default behavior.
    *   **Models:** Primarily integrates with Gemini (via `google-genai`, supporting Google AI Studio and Vertex AI backends), but supports other models via `LiteLlm` wrapper, direct Vertex AI endpoints, or custom `BaseLlm` implementations.
    *   **Deployment:** Provides options like Vertex AI Agent Engine (managed service), Cloud Run, and GKE, often facilitated by `adk` CLI commands (`adk deploy`).
    *   **Evaluation:** Includes tools (`AgentEvaluator`, `adk eval`, UI integration) for testing agent performance against datasets, evaluating both final response and execution trajectory.
    *   **Safety & Security:** Offers mechanisms like authentication, callback-based guardrails, sandboxed code execution (Vertex Code Interpreter recommended), and encourages network controls.

3.  **Intended Usage & Patterns:**
    *   Start simple with `LlmAgent` and `FunctionTool`.
    *   Build complex workflows using `WorkflowAgent`s or `CustomAgent`s.
    *   Design modular systems using multi-agent architectures with delegation (`transfer_to_agent` or `AgentTool`).
    *   Manage conversation memory using `session.state` (via context objects or `output_key`).
    *   Handle files/binary data using `Artifacts`.
    *   Implement safety and custom logic using `Callbacks`.
    *   Leverage built-in tools or integrate existing APIs/tools easily.
    *   Test thoroughly using the evaluation framework.
    *   Deploy using appropriate cloud services.

4.  **Production Considerations & Potential Issues:**
    *   **Persistence:** `InMemory` services are not suitable for production; choose persistent backends (`Database`, `VertexAI`, `GCS`).
    *   **Security:** Tool authentication, input/output validation (guardrails), secure credential handling, sandboxed code execution, and escaping UI output are crucial.
    *   **Error Handling:** Implement robust error handling in tools and callbacks.
    *   **State Management:** Avoid direct state modification; rely on the event flow. Be mindful of state size and serialization limits.
    *   **Async Nature:** Design for asynchronous execution; be cautious with blocking operations in callbacks/tools.
    *   **Tool Reliability:** LLM reliance on docstrings means clear documentation is essential. Simple, focused tools are generally better.
    *   **Scalability:** Choose deployment methods appropriate for expected load. Stateful MCP connections might need specific infrastructure.

I have built a mental model of ADK as a comprehensive framework for building structured, stateful, and tool-augmented AI agents, with strong ties to the Google ecosystem but designed for extensibility.

**Finished.**

In [None]:
# run from command line
python -m venv myenv
source myenv/bin/activate
pip install google-adk

mkdir adk_tutorial
cd adk_tutorial
mkdir my_first_agent
echo "GOOGLE_GENAI_USE_VERTEXAI=True" >> .env
echo "GOOGLE_CLOUD_PROJECT=ap-alto-ml-1000" >> .env
echo "GOOGLE_CLOUD_LOCATION=us-central1" >> .env
echo "MODEL=gemini-2.0-flash-001" >> .env
touch my_first_agent/__init__.py
echo "from . import agent" > my_first_agent/__init__.py

In [None]:
- Always deploy to Vertex AI Agent Engine

In [11]:
# create chat
prompt = ("OBJECTIVE: Teach an new analyst how to build agents using this SDK. "
          "Start with getting them to have one agent running as fast as possible and then walk them through the harder concepts. "
          "Walk the analyst through learning in stages i.e. present one concept of the Agent SDK, then generate the code needed to implement the concept. "
          "Before you present the next concept, ask if the are ready to move on. "
         )

prompt += """Assume the following is true or has been set up already:
- always use four spaces for code indentation 
- GOOGLE_GENAI_USE_VERTEXAI = True
- MODEL=gemini-2.0-flash-001

# the following has been run from the command line run from command line
python -m venv myenv
source myenv/bin/activate
pip install google-adk

mkdir adk_tutorial
cd adk_tutorial
mkdir my_first_agent
echo "GOOGLE_GENAI_USE_VERTEXAI=True" >> .env
echo "GOOGLE_CLOUD_PROJECT=ap-alto-ml-1000" >> .env
echo "GOOGLE_CLOUD_LOCATION=us-central1" >> .env
echo "MODEL=gemini-2.0-flash-001" >> .env
touch my_first_agent/__init__.py
echo "from . import agent" > my_first_agent/__init__.py

# Their project structure is below.  NOTE: agent.py is empty:

adk_tutorial/
├── my_first_agent/
│   ├── __init__.py
│   └── agent.py
└── .env
"""
response = chat.send_message(message=prompt)
# self reflection step
p = (
  "Before the analyst views this, conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, Analyst! Welcome to the world of building AI agents with the Agent Development Kit (ADK). My goal is to get you up and running quickly and then introduce more advanced concepts step-by-step.

You've already set up your Python environment, installed the ADK, and created the basic project structure and `.env` file. Great start!

Your current project structure should look like this:

```
adk_tutorial/
├── my_first_agent/    # Your agent's code will go here
└── .env               # Contains GCP project/location/Vertex settings
```

Let's begin with the absolute basics.

**Stage 1: Your First, Simplest Agent**

**Concept:** The core building block in ADK is the `Agent` (specifically, `LlmAgent`, which `Agent` is an alias for). At its simplest, an agent needs:

1.  A `name`: A unique identifier for this agent (e.g., `simple_assistant`).
2.  A `model`: The Large Language Model (LLM) that will power its reasoning. We'll use the pre-configured `gemini-2.0-flash-001`.
3.  An `instruction`: A prompt telling the agent how to behave or what its purpose is.

We define this agent in a Python file (`agent.py`) within its own directory (`my_first_agent`). We also need an `__init__.py` file in that directory to make Python treat it as a package, allowing ADK tools to find and load our agent.

**Implementation:**

First, let's create the `__init__.py` file inside your `my_first_agent` directory.

```python
# filename: my_first_agent/__init__.py
# This line makes the 'agent.py' module and its contents accessible.
from . import agent
```

Now, let's create the `agent.py` file. This is where we define the actual agent.

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent # Import the base Agent class

# --- Constants ---
# Use the model specified in the requirements
MODEL = "gemini-2.0-flash-001"

# --- Agent Definition ---
# We name the variable 'root_agent'. This is the default name
# ADK command-line tools look for when loading an agent package.
root_agent = Agent(
    # A unique name for this specific agent instance
    name="simple_assistant",

    # The LLM powering the agent's "brain"
    model=MODEL,

    # Instructions defining the agent's behavior and persona
    instruction="You are a friendly and concise assistant. Keep your answers short and helpful.",
)

# Optional: Print a message when this module is loaded to confirm definition
print(f"Agent '{root_agent.name}' defined using model '{MODEL}'")
```

Your project structure should now be:

```
adk_tutorial/
├── my_first_agent/
│   ├── __init__.py
│   └── agent.py
└── .env
```

**Running the Agent:**

Now, let's run this agent using the ADK command-line tool.

1.  **Make sure you are in the `adk_tutorial` directory** in your terminal (the directory *containing* `my_first_agent`). Your virtual environment (`myenv`) should still be active.
2.  Run the following command:

```bash
adk run my_first_agent
```

This command performs the following steps:
*   Looks for the `my_first_agent` directory (which is a Python package thanks to `__init__.py`).
*   Loads the `root_agent` variable from the `agent` module within that package.
*   Starts an interactive chat session directly in your terminal.
*   Uses the settings from your `.env` file (GCP project, location, and `USE_VERTEXAI=True`) to configure the connection to the Gemini model via Vertex AI.

You should see output indicating the server is running, and then a prompt like `user:`. Try typing a simple question like "Hello" or "What is the capital of France?". The agent should respond based on its instruction ("Keep your answers short and helpful").

Press `Ctrl+C` to exit the chat when you're finished.

**Summary of Stage 1:**
You have successfully:
*   Defined a basic `Agent` with a name, model, and instruction.
*   Structured your code within a Python package (`my_first_agent`).
*   Run the agent interactively using the `adk run` command.
*   Observed the agent responding based on its connection to the specified Gemini model via Vertex AI (using your `.env` settings).

Are you ready to move on to Stage 2, where we'll give the agent a tool to perform a specific action?

In [12]:
## create chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's refine that Stage 2 explanation and code.

Great! Let's give our agent its first superpower: the ability to perform a specific action using a **Tool**.

**Stage 2: Giving the Agent a Tool**

**Concept:** Tools extend an agent's capabilities beyond basic conversation. They allow the agent's LLM brain to trigger specific, predefined actions, like calling external APIs, running calculations, or accessing databases. The simplest way to create a tool in ADK is by wrapping a standard Python function using `FunctionTool`.

For the LLM to use the tool correctly, it relies heavily on:

1.  **The function's signature:** Clear parameter names and **type hints** (e.g., `city: str`, or `-> dict` for the return type) help the LLM understand what inputs are needed and what output to expect.
2.  **The function's docstring:** This is crucial! It tells the LLM *what the tool does*, *when to use it*, *what the parameters mean*, and *what the returned dictionary structure looks like*.

We'll create a tool that gets the current date and time.

**Implementation:**

1.  **Define the Tool Function:** Add the following Python function to your `agent.py` file. Notice the type hint (`-> dict`) indicating it returns a dictionary, and the detailed docstring explaining its purpose and return structure. It's a best practice for tools to return dictionaries, often including a `status` key (though this simple example doesn't strictly need one).

2.  **Wrap with `FunctionTool`:** Import `FunctionTool` from `google.adk.tools` and create an instance wrapping your function.

3.  **Update the Agent Definition:** Modify the `root_agent` definition in `agent.py`:
    *   Add the `tools` parameter, providing a list containing your `FunctionTool` instance (e.g., `tools=[time_tool]`).
    *   Update the `instruction` to explicitly tell the agent about the new tool (using its function name, `get_current_time`) and guide it on when it should use this tool.

Here's the updated `my_first_agent/agent.py` file:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent
from google.adk.tools import FunctionTool # Import FunctionTool
import datetime # Import datetime for the new tool

# --- Constants ---
MODEL = "gemini-2.0-flash-001"

# --- Tool Definition ---
def get_current_time() -> dict:
    """
    Gets the current date and time.

    Use this tool ONLY when the user explicitly asks for the current time or date.
    Do not use it for asking about time in the past or future.

    Returns:
        A dictionary containing the current date and time string.
        Example format: {'current_time': '2024-08-15 14:30:55'}
    """
    # Log to the console when the tool is executed for visibility
    print("--- Tool: get_current_time executing ---")
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    # Return the result structured as a dictionary
    return {"current_time": time_str}

# --- Wrap the function as a tool ---
# This makes the function callable by the agent's LLM
time_tool = FunctionTool(func=get_current_time)

# --- Agent Definition (Updated) ---
# Renaming the agent variable to reflect its new capability
root_agent = Agent(
    name="assistant_with_time",
    model=MODEL,
    instruction="You are a friendly assistant. Keep answers concise. "
                "If the user explicitly asks for the current time or date, "
                "you MUST use the 'get_current_time' tool to find the answer. " # Explicit instruction
                "For all other questions, answer directly.",
    tools=[time_tool], # Provide the tool instance to the agent
)

# Print confirmation when the module is loaded
print(f"Agent '{root_agent.name}' defined using model '{MODEL}' with tools: {[t.name for t in root_agent.tools]}")

```

**Running the Agent with the Tool:**

Go back to your terminal (still in the `adk_tutorial` directory, with the `myenv` virtual environment active). Run the agent again using the same command as before:

```bash
adk run my_first_agent
```

Now, try these prompts in the interactive chat:

*   `What time is it now?`
*   `Can you tell me today's date?`
*   `What is the capital of Spain?` (This should *not* use the time tool)

**Expected Output:**

When you ask for the time or date, you should observe the following sequence:

1.  The agent receives your prompt.
2.  The LLM, guided by the updated `instruction`, decides to use the `get_current_time` tool.
3.  You'll see the `--- Tool: get_current_time executing ---` message printed in your terminal, confirming the tool was called.
4.  The tool returns the time dictionary (e.g., `{'current_time': '2024-08-15 14:30:55'}`).
5.  This result is sent back to the LLM.
6.  The LLM formulates the final response based on the tool's output, which the agent then displays (e.g., "The current time is 2024-08-15 14:30:55.").

When you ask about Spain, the agent should recognize this doesn't match the condition for using the time tool and should answer directly using only the LLM, without printing the tool execution log.

Press `Ctrl+C` to exit when you are done experimenting.

**Summary of Stage 2:**
You have now successfully:
*   Defined a Python function (`get_current_time`) to perform a specific action.
*   Wrapped that function into an ADK `FunctionTool`, making it available to the agent.
*   Provided the tool to your `Agent` via the `tools` list.
*   Updated the agent's `instruction` to guide the LLM on *when* and *how* to use the specific tool.
*   Observed the agent correctly selecting and calling the tool based on the user's prompt and its instructions.

This demonstrates the basic mechanism for extending agent capabilities with custom actions.

Are you ready to move on to Stage 3, where we'll look at how agents manage conversation memory using Session State?

In [13]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's polish Stage 3.

Excellent. Now that our agent can perform actions using tools, let's explore how it can remember information within a single conversation. This is crucial for more natural and helpful interactions.

**Stage 3: Remembering Information with Session State**

**Concept:** Imagine asking the agent the time, and then later asking, "When did I just ask you for the time?". Without memory, the agent wouldn't know. ADK provides **Session State** (`session.state`) for this purpose.

*   **What it is:** A Python dictionary associated with the current conversation `Session`. It acts as the agent's short-term, dynamic memory or "scratchpad" for that specific chat, distinct from the static `instruction` or the historical `events`.
*   **Persistence:** The data stored here lasts for the duration of the `Session`. Whether it survives application restarts depends on the `SessionService` being used. Our current `InMemorySessionService` **loses all state** when the `adk run` command stops. Other services (like `DatabaseSessionService`) can make state persistent.
*   **Access & Updates (The ADK Way):** The recommended way to interact with state is within **Tools** (or Callbacks, which we'll see later).
    *   By adding a special parameter `tool_context: ToolContext` to your tool function's signature, ADK automatically provides this context object when the tool is called.
    *   `ToolContext` has a `.state` attribute which represents the session state dictionary. You can *read* from it using standard dictionary methods (`tool_context.state.get('my_key', default_value)`).
    *   You can *write* to it using dictionary assignment (`tool_context.state['my_key'] = 'my_value'`).
    *   **Crucially:** ADK automatically tracks modifications made via `tool_context.state` and ensures they are saved correctly as part of the event flow for that conversation turn, updating the session managed by the `SessionService`.

**Implementation:**

1.  **Modify the Tool:** Update the `get_current_time` function to accept `tool_context: ToolContext`. Inside the function, we'll first read a value from the state (`last_time_checked`) and print it (if it exists), demonstrating reading. Then, we'll *write* the newly fetched time back into the state using the same key, demonstrating writing.
2.  **Update Agent Instruction:** Modify the agent's instruction slightly to reflect that the tool now internally records when it was last used.

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from google.adk.tools.tool_context import ToolContext # <<< Import ToolContext
import datetime

# --- Constants ---
MODEL = "gemini-2.0-flash-001"

# --- Tool Definition (Updated for State) ---
# Add 'tool_context: ToolContext' as the last parameter
def get_current_time(tool_context: ToolContext) -> dict:
    """
    Gets the current date and time and stores it in session state.

    Use this tool ONLY when the user explicitly asks for the current time or date.
    It also updates the 'last_time_checked' value in the session state.

    Returns:
        A dictionary containing the current date and time string.
        Example format: {'current_time': '2024-08-15 14:30:55'}
    """
    print("--- Tool: get_current_time executing ---")

    # --- Read from State (using tool_context) ---
    # Use .get() to safely retrieve the value, providing a default if the key doesn't exist
    last_checked = tool_context.state.get('last_time_checked', 'Never')
    print(f"--- Tool: Value of 'last_time_checked' in state before update: {last_checked} ---")

    # --- Perform Tool's Core Action ---
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")

    # --- Write to State (using tool_context) ---
    # Assign the new time string to the state dictionary key
    tool_context.state['last_time_checked'] = time_str
    # ADK framework will handle saving this change associated with the tool's result event
    print(f"--- Tool: Updated 'last_time_checked' state to: {time_str} ---")

    # Return the primary result of the tool
    return {"current_time": time_str}

# --- Wrap the updated function as a tool ---
time_tool = FunctionTool(func=get_current_time)

# --- Agent Definition (Updated Instruction) ---
root_agent = Agent(
    name="assistant_with_stateful_time", # New name reflecting statefulness
    model=MODEL,
    instruction="You are a friendly assistant. Keep answers concise. "
                "If the user explicitly asks for the current time or date, "
                "you MUST use the 'get_current_time' tool. " # Instruction remains focused on tool usage condition
                "This tool internally notes when it was last used. "
                "Answer other questions directly.",
    tools=[time_tool], # Provide the updated tool instance
)

# Confirmation message
print(f"Agent '{root_agent.name}' defined using model '{MODEL}' with tools: {[t.name for t in root_agent.tools]}")
```

**Running the Agent with State:**

1.  Go back to your terminal (make sure you're in the `adk_tutorial` directory).
2.  Ensure your virtual environment (`myenv`) is active.
3.  Run the agent using the same command:

```bash
adk run my_first_agent
```

Now, try this specific sequence of prompts in the interactive chat:

1.  `What time is it?`
2.  `What time is it again?`
3.  `What is the capital of Germany?`

**Expected Output & Observation:**

*   **First time check:**
    *   The agent decides to use the tool.
    *   The terminal log shows: `--- Tool: Value of 'last_time_checked' in state before update: Never ---`
    *   Then: `--- Tool: Updated 'last_time_checked' state to: <current_time_1> ---`
    *   The agent responds with the current time (e.g., "The current time is `<current_time_1>`.").
*   **Second time check:**
    *   The agent uses the tool again.
    *   The terminal log shows: `--- Tool: Value of 'last_time_checked' in state before update: <current_time_1> ---` (It successfully read the value saved during the previous turn!).
    *   Then: `--- Tool: Updated 'last_time_checked' state to: <current_time_2> ---`
    *   The agent responds with the new current time (e.g., "The current time is `<current_time_2>`.").
*   **Capital of Germany:**
    *   The agent should answer directly ("The capital of Germany is Berlin." or similar).
    *   The `get_current_time` tool should *not* execute (no "Tool executing" logs for this turn).
    *   The `last_time_checked` state stored within the session remains `<current_time_2>`.

This sequence clearly demonstrates that `session.state` allows the agent (via its tool and `ToolContext`) to maintain and access information across different turns within the same conversation run.

**Summary of Stage 3:**
You have learned:
*   What Session State (`session.state`) is and its role as short-term conversational memory.
*   How to access (`tool_context.state.get`) and modify (`tool_context.state['key'] = value`) session state within a tool function using the automatically provided `ToolContext`.
*   That ADK handles the persistence of these state changes within the session lifecycle (though `InMemorySessionService` is non-persistent across application restarts).
*   Observed state being read and updated across multiple interactions with the agent.

Now that we can store and access dynamic information within a conversation using state, we can explore **Callbacks**. Callbacks let us insert custom logic at specific points in the agent's execution – for example, to *react* to the current state before the LLM is called, or to add safety checks before a tool runs.

Are you ready to move on to Stage 4 and learn about Callbacks?

In [14]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's ensure Stage 4 is crystal clear.

Excellent. Now that our agent can perform actions using tools and remember information using state, let's explore **Callbacks**. These are powerful hooks that let you observe, or even change, what the agent is doing at specific moments during its execution.

**Stage 4: Intercepting Execution with Callbacks (`before_model_callback`)**

**Concept:** Callbacks are like checkpoints or middleware functions you provide to an agent. ADK automatically runs these functions at predefined points in the agent's process. This allows you to insert your own custom logic without modifying the core ADK framework.

There are several types of callbacks. We'll start with `before_model_callback`. This function runs *just before* the agent sends its prepared request (including instructions, history, and the current user query) to the underlying Large Language Model (LLM). It runs *every time* this specific agent is about to call its LLM.

**Why use `before_model_callback`?**

*   **Logging/Observability:** See exactly what prompt and context are being sent to the LLM for debugging or monitoring.
*   **Input Guardrails/Validation:** Check the user's input (which is part of the LLM request) for forbidden topics, sensitive data patterns (like PII), or policy violations, and potentially block the LLM call if needed.
*   **Dynamic Prompt Engineering:** Add timely information from the current `session.state` (like user preferences or recent events) to the request context just before sending it to the LLM.

**How it works:**

1.  **Define the Function:** Create a Python function that **must** accept two specific arguments:
    *   `callback_context: CallbackContext`: Provides access to runtime information like the agent's name, the unique ID for the current request (`invocation_id`), and crucially, the session `state` (`callback_context.state`).
    *   `llm_request: LlmRequest`: Contains the *complete payload* that ADK is about to send to the LLM. This includes the conversation history (`llm_request.contents`) and model configuration (`llm_request.config`, which contains the `system_instruction`).
2.  **Implement Logic:** Inside your callback function:
    *   You can **read** `callback_context.state`.
    *   You can **inspect** `llm_request` data.
    *   You can **modify** mutable parts of the `llm_request` object (like the `system_instruction` within `llm_request.config`).
    *   **Control the Flow** using the return value:
        *   `return None`: This is the standard signal to ADK. It means your callback has finished, and ADK should **proceed** to call the LLM with the `llm_request` (which might have been modified by your callback).
        *   `return LlmResponse(...)`: If your callback returns an `LlmResponse` object, ADK **skips the actual LLM call entirely**. The `LlmResponse` you provide is treated *as if* it came from the LLM, and the agent's execution continues based on that response. This is the mechanism for implementing blocking guardrails.

**Implementation:**

1.  **Define the Callback Function:** We'll create `log_and_prefix_callback`. This function will:
    *   Print log messages indicating it's running.
    *   Read the `last_time_checked` value from the session state via `callback_context.state`.
    *   Add a dynamic prefix (including the `last_time_checked` value) to the beginning of the `system_instruction` within the `llm_request.config`.
2.  **Update the Agent Definition:** Modify the `root_agent` definition to include the `before_model_callback` parameter, assigning our new function to it.

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from google.adk.tools.tool_context import ToolContext
# Imports needed for the callback
from google.adk.agents.callback_context import CallbackContext # <<< For context argument
from google.adk.models.llm_request import LlmRequest           # <<< For request argument
from google.adk.models.llm_response import LlmResponse         # <<< For optional blocking return
from google.genai import types                                 # <<< For creating/modifying Content
from typing import Optional                                    # <<< For return type hint (Optional[LlmResponse])
import datetime

# --- Constants ---
MODEL = "gemini-2.0-flash-001"

# --- Tool Definition (from Stage 3) ---
def get_current_time(tool_context: ToolContext) -> dict:
    """Gets the current date and time and stores it in session state."""
    print("--- Tool: get_current_time executing ---")
    last_checked = tool_context.state.get('last_time_checked', 'Never')
    print(f"--- Tool: Value of 'last_time_checked' in state before update: {last_checked} ---")
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    tool_context.state['last_time_checked'] = time_str
    print(f"--- Tool: Updated 'last_time_checked' state to: {time_str} ---")
    return {"current_time": time_str}

time_tool = FunctionTool(func=get_current_time)

# --- Callback Definition ---
def log_and_prefix_callback(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]: # Return type hint
    """
    Logs context info, reads state, modifies the LLM request's system
    instruction, and allows the call to proceed.
    """
    agent_name = callback_context.agent_name
    invocation_id = callback_context.invocation_id
    # Read state via the callback context
    last_checked = callback_context.state.get('last_time_checked', 'Not checked yet')

    print(f"\n--- Callback: before_model_callback executing for Agent '{agent_name}' (Invocation: {invocation_id}) ---")
    print(f"--- Callback: Last time checked (from state read): {last_checked} ---")

    # --- Modify the request's system instruction ---
    prefix = f"[Callback Info: Last time check was {last_checked}] "
    try:
        # Ensure config exists
        if not llm_request.config:
            llm_request.config = types.GenerationConfig() # Create if missing (unlikely but safe)
            print("--- Callback WARNING: llm_request.config was missing, created new. ---")

        # Get or create system_instruction as Content object
        sys_instruction = llm_request.config.system_instruction
        if not sys_instruction:
            sys_instruction = types.Content(role="system", parts=[types.Part(text="")])
        elif isinstance(sys_instruction, str): # Convert if string
             sys_instruction = types.Content(role="system", parts=[types.Part(text=sys_instruction)])
        elif not isinstance(sys_instruction, types.Content):
             # If it's some other unexpected type, log a warning and don't modify
             print(f"--- Callback WARNING: Unexpected type for system_instruction: {type(sys_instruction)}. Cannot modify. ---")
             return None # Allow call to proceed without modification

        # Ensure parts list exists
        if not sys_instruction.parts:
            sys_instruction.parts.append(types.Part(text=""))

        # Prepend prefix to the first part's text
        original_text = sys_instruction.parts[0].text or ""
        sys_instruction.parts[0].text = prefix + original_text
        llm_request.config.system_instruction = sys_instruction # Assign back modified object
        print(f"--- Callback: Modified system instruction to start with: '{prefix}' ---")

    except Exception as e:
        print(f"--- Callback ERROR: Failed to modify system instruction: {e} ---")
        # Decide whether to block or allow if modification fails
        # return LlmResponse(...) # Block
        return None # Allow proceed cautiously

    # --- Control Flow Decision ---
    # Return None signals ADK to proceed with the LLM call using the (now modified) llm_request
    print("--- Callback: Returning None to allow LLM call. ---")
    return None

    # # --- Example: How to BLOCK the LLM call ---
    # # Check some condition (e.g., based on user input in llm_request.contents)
    # latest_user_input = llm_request.contents[-1].parts[0].text if llm_request.contents else ""
    # if "FORBIDDEN_WORD" in latest_user_input.upper():
    #     print("--- Callback: Blocking request due to forbidden word! ---")
    #     # Create and return an LlmResponse object
    #     blocking_response = LlmResponse(
    #         content=types.Content(
    #             role="model", # Respond as the model
    #             parts=[types.Part(text="I cannot process this request due to policy violations.")]
    #         )
    #     )
    #     return blocking_response
    # # -------------------------------------------

# --- Agent Definition (Updated with Callback) ---
root_agent = Agent(
    name="assistant_with_callback", # Updated agent name
    model=MODEL,
    # Base instruction for the LLM (will be prefixed by the callback)
    instruction="You are a friendly assistant. Keep answers concise. "
                "If the user asks for the current time or date, use the 'get_current_time' tool. "
                "Answer other questions directly.",
    tools=[time_tool],
    # Assign the callback function to the agent's parameter
    before_model_callback=log_and_prefix_callback,
)

print(f"Agent '{root_agent.name}' defined using model '{MODEL}' with callback and tools: {[t.name for t in root_agent.tools]}")
```

**Running the Agent with the Callback:**

1.  Return to your terminal (in the `adk_tutorial` directory).
2.  Make sure your virtual environment (`myenv`) is active.
3.  Run the agent:

```bash
adk run my_first_agent
```

Now, try this specific sequence in the chat:

1.  `What time is it?` (This will run the tool and set the `last_time_checked` state).
2.  `What is the capital of Germany?` (This should trigger the callback *before* the LLM is called to answer the question).

**Expected Output & Observation:**

*   **First time check:**
    *   You'll see the `--- Callback: before_model_callback executing...` log message.
    *   It will print `--- Callback: Last time checked (from state read): Not checked yet ---`.
    *   It will print the message about modifying the system instruction.
    *   The agent will then decide to use the tool.
    *   The tool logs will appear (`--- Tool: get_current_time executing ---`, etc.), updating the state to `<current_time_1>`.
    *   The agent will provide the time response.
*   **Capital of Germany:**
    *   The `before_model_callback` executes again.
    *   This time, it prints `--- Callback: Last time checked (from state read): <current_time_1> ---` (demonstrating it read the state value set in the previous turn!).
    *   It modifies the system instruction again, prepending `[Callback Info: Last time check was <current_time_1>] `.
    *   It prints `--- Callback: Returning None to allow LLM call. ---`.
    *   The LLM receives the request (with the modified system instruction) and generates the answer about Berlin.
    *   The agent responds, "The capital of Germany is Berlin." (The LLM likely ignores the informational prefix we added, but the callback *did* successfully run, access state, and modify the request before the LLM saw it).

This sequence confirms that the callback intercepted the flow before the LLM call, accessed session state, modified the outgoing request, and correctly allowed the LLM call to proceed by returning `None`.

**Summary of Stage 4:**
You have now:
*   Learned the concept of Callbacks in ADK and the purpose of `before_model_callback`.
*   Defined a `before_model_callback` function with the required signature (`callback_context`, `llm_request`).
*   Used `CallbackContext` to read session state within the callback.
*   Used `LlmRequest` to inspect and *modify* the data being sent to the LLM (specifically, the `system_instruction`).
*   Understood how returning `None` allows the LLM call to proceed, while returning an `LlmResponse` would block it (as shown in the commented-out example code).
*   Observed the callback executing and interacting with state during an agent run.

Callbacks are essential tools for adding custom logic, validation, logging, and safety layers to your agents.

Are you ready for the next stage? We could explore another callback type like `before_tool_callback` for adding guardrails *specifically around tool usage*, or would you prefer to dive into building a **multi-agent system** where agents can delegate tasks to each other?

In [15]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's ensure Stage 5 is polished and easy to follow.

Great, let's add another layer of control using a different type of callback.

**Stage 5: Adding Tool Guardrails with `before_tool_callback`**

**Concept:** In Stage 4, we used `before_model_callback` to intervene *before* the LLM thinks. Now, we'll use `before_tool_callback`. This runs *after* the LLM has decided which tool to use and what arguments to give it, but crucially, *just before* the tool's actual Python function is executed.

**Why use `before_tool_callback`?**

*   **Argument Validation:** Check if the arguments the LLM generated are valid (e.g., correct format, within allowed ranges, not malicious).
*   **Tool-Specific Guardrails:** Prevent the tool from running based on specific inputs (like blocking a certain city in our weather tool) or the current session state (e.g., "User is not authorized for this action").
*   **Resource Protection:** Stop potentially expensive, rate-limited, or harmful tool executions before they happen.
*   **Dynamic Argument Injection/Modification:** Adjust the arguments based on session state right before the tool function receives them.

**How it works:**

1.  **Define the Function:** Create a Python function that **must** accept three specific arguments:
    *   `tool: BaseTool`: The specific ADK tool object (like our `FunctionTool` instance) that is about to be executed. You can inspect properties like `tool.name`.
    *   `args: Dict[str, Any]`: The dictionary of arguments that the LLM generated for the tool call.
    *   `tool_context: ToolContext`: Provides access to the session `state` (`tool_context.state`) and other contextual information related to this specific tool invocation.
2.  **Implement Logic:** Inside your callback function:
    *   You can **inspect** `tool.name` and the `args` dictionary.
    *   You can **read** `tool_context.state`.
    *   You can **modify** the `args` dictionary directly. If your callback returns `None`, the tool function will be called with these (potentially modified) arguments.
    *   **Control the Flow** using the return value:
        *   `return None`: This signals ADK to **proceed** and execute the actual tool function (`get_current_time` in our case) with the `args` provided (or modified by the callback).
        *   `return dict(...)`: If your callback returns a **dictionary**, ADK **skips executing the actual tool function**. The dictionary you return is treated *as if* it were the result of the tool call. This is the mechanism for blocking or overriding tool execution. It's best practice for this dictionary to match the structure the blocked tool normally returns (e.g., include a `status` and `error_message`).

**Implementation:**

1.  **Define the Tool Callback Function:** We'll create `check_allow_time_tool_callback`. This function checks a flag `allow_time_checks` in the session state. If the flag exists and is `False`, it blocks the `get_current_time` tool by returning an error dictionary. Otherwise, it allows the tool to run.
2.  **Update the Agent Definition:** Modify the `root_agent` definition again to include *both* the `before_model_callback` (from Stage 4, improved for safety) and this new `before_tool_callback`.

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent
from google.adk.tools import FunctionTool, BaseTool # <<< Import BaseTool for type hint
from google.adk.tools.tool_context import ToolContext
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.genai import types
from typing import Optional, Dict, Any # <<< Added Dict, Any for type hints
import datetime
import copy # <<< Import copy for safe request modification

# --- Constants ---
MODEL = "gemini-2.0-flash-001"

# --- Tool Definition (from Stage 3) ---
def get_current_time(tool_context: ToolContext) -> dict:
    """Gets the current date and time and stores it in session state."""
    print("--- Tool: get_current_time executing ---")
    last_checked = tool_context.state.get('last_time_checked', 'Never')
    print(f"--- Tool: Value of 'last_time_checked' in state before update: {last_checked} ---")
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    tool_context.state['last_time_checked'] = time_str
    print(f"--- Tool: Updated 'last_time_checked' state to: {time_str} ---")
    return {"current_time": time_str}

time_tool = FunctionTool(func=get_current_time)

# --- Callback Definition 1 (Improved from Stage 4) ---
def log_and_prefix_callback(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Logs info, reads state, modifies sys instruction safely, allows proceed."""
    agent_name = callback_context.agent_name
    invocation_id = callback_context.invocation_id
    last_checked = callback_context.state.get('last_time_checked', 'Not checked yet')
    print(f"\n--- Callback(Model): before_model_callback for Agent '{agent_name}' (Inv: {invocation_id}) ---")
    print(f"--- Callback(Model): Last time checked (state read): {last_checked} ---")
    prefix = f"[Callback Info: Last time check was {last_checked}] "
    try:
        if llm_request.config:
            # Use deepcopy to avoid modifying the original config object directly
            new_config = copy.deepcopy(llm_request.config)
            sys_instruction = new_config.system_instruction
            # Safely prepare Content object
            if not sys_instruction:
                sys_instruction = types.Content(role="system", parts=[types.Part(text="")])
            elif isinstance(sys_instruction, str):
                sys_instruction = types.Content(role="system", parts=[types.Part(text=sys_instruction)])
            elif not isinstance(sys_instruction, types.Content):
                 print(f"--- Callback(Model) WARNING: Unexpected type for system_instruction: {type(sys_instruction)}. Not modifying. ---")
                 return None
            if not sys_instruction.parts:
                sys_instruction.parts.append(types.Part(text=""))
            # Modify the copy
            original_text = sys_instruction.parts[0].text or ""
            sys_instruction.parts[0].text = prefix + original_text
            # Assign the modified copy back to the request
            llm_request.config = new_config
            print(f"--- Callback(Model): Modified system instruction prefix: '{prefix}' ---")
        else:
            print("--- Callback(Model) WARNING: llm_request.config missing. ---")
    except Exception as e:
        print(f"--- Callback(Model) ERROR: Failed to modify sys instruction: {e} ---")
        return None # Allow proceed cautiously
    print("--- Callback(Model): Returning None to allow LLM call. ---")
    return None

# --- Callback Definition 2 (New for Stage 5) ---
def check_allow_time_tool_callback(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
    """
    Checks 'allow_time_checks' state. If False, blocks 'get_current_time' tool.
    Otherwise allows it.
    """
    tool_name = tool.name
    agent_name = tool_context.agent_name
    print(f"\n--- Callback(Tool): before_tool_callback for Tool '{tool_name}' (Agent: {agent_name}) ---")
    print(f"--- Callback(Tool): Args received from LLM: {args} ---")

    # --- Guardrail Logic Specific to 'get_current_time' ---
    if tool_name == 'get_current_time':
        # Read the flag from session state, default to True if not set
        allow_checks = tool_context.state.get('allow_time_checks', True)
        print(f"--- Callback(Tool): 'allow_time_checks' state is currently: {allow_checks} ---")

        if not allow_checks:
            print(f"--- Callback(Tool): Blocking tool '{tool_name}' because 'allow_time_checks' is False! ---")
            # Return a dictionary matching the tool's expected output format (or similar error structure)
            # This dictionary becomes the tool's result, SKIPPING the actual tool run.
            return {
                "status": "blocked_by_policy", # Custom status
                "error_message": "Sorry, checking the time is currently disabled by system policy."
            }
        else:
            print(f"--- Callback(Tool): 'allow_time_checks' is True. Allowing tool '{tool_name}' to run. ---")
    else:
        # If the callback is triggered for a different tool, ignore it.
         print(f"--- Callback(Tool): Tool '{tool_name}' is not 'get_current_time'. No action taken by this callback. ---")

    # Return None to signal ADK to execute the actual tool function
    print("--- Callback(Tool): Returning None to allow tool execution. ---")
    return None

# --- Agent Definition (Updated with BOTH Callbacks) ---
root_agent = Agent(
    name="assistant_with_dual_callbacks", # New name reflecting both callbacks
    model=MODEL,
    instruction="You are a friendly assistant. Keep answers concise. "
                "If the user asks for the current time or date, use the 'get_current_time' tool. "
                "Answer other questions directly.",
    tools=[time_tool],
    before_model_callback=log_and_prefix_callback,   # <<< Keep Stage 4 callback
    before_tool_callback=check_allow_time_tool_callback, # <<< Add Stage 5 callback
)

print(f"Agent '{root_agent.name}' defined using model '{MODEL}' with BOTH callbacks and tools: {[t.name for t in root_agent.tools]}")
```

**Running the Agent with the Tool Callback:**

1.  Go back to your terminal (in `adk_tutorial`), activate `myenv`.
2.  Run the agent:

```bash
adk run my_first_agent
```

**Testing the Flow:**

This requires simulating the change in the `allow_time_checks` state flag. Since `adk run` uses `InMemorySessionService`, the state is reset each time you start it. Here’s how to understand the *intended* flow and how you might test it:

1.  **Run 1 (Checks Allowed):**
    *   Start `adk run my_first_agent`.
    *   Ask: `What time is it?`
    *   **Observe:**
        *   `before_model_callback` runs and modifies the instruction.
        *   LLM requests `get_current_time`.
        *   `before_tool_callback` runs, reads `allow_time_checks` (defaults to `True`), logs "Checks allowed...", and returns `None`.
        *   The actual `get_current_time` tool executes and updates the `last_time_checked` state.
        *   The agent responds with the time.
    *   Stop the agent (`Ctrl+C`).

2.  **Run 2 (Simulating Checks Disabled):**
    *   **Option A (Modify Code Temporarily for Test):** Change the `check_allow_time_tool_callback` function. Inside the `if tool_name == 'get_current_time':` block, temporarily *remove* the check for `allow_checks` and *always* return the blocking dictionary:
        ```python
        # Inside check_allow_time_tool_callback, for testing block:
        if tool_name == 'get_current_time':
             print(f"--- Callback(Tool): FORCING BLOCK for testing! ---")
             return {
                 "status": "blocked_by_policy",
                 "error_message": "Sorry, checking the time is currently disabled (TEST BLOCK)."
             }
        # ... rest of the function ...
        ```
        *Then* start `adk run my_first_agent` and ask `What time is it?`. You should see the "Blocking tool..." log and the agent relaying the error message. Remember to revert the code change afterwards!
    *   **Option B (Add a State-Setting Tool - More Advanced):** You could define *another* tool like `def set_policy(allow: bool, tool_context: ToolContext): tool_context.state['allow_time_checks'] = allow; return {'status': 'policy updated'}`, add it to the agent, start `adk run`, first ask the agent `Set time check policy to false`, and *then* ask `What time is it?`. This is closer to a real-world scenario but adds complexity for this stage.

**Expected Output & Observation (Focusing on the blocking scenario):**

*   `before_model_callback` runs as usual.
*   LLM requests `get_current_time`.
*   `before_tool_callback` runs.
    *   It reads `allow_time_checks` as `False` (or hits the forced block in Option A).
    *   It prints the "Blocking tool..." message.
    *   It returns the error dictionary: `{'status': 'blocked_by_policy', 'error_message': 'Sorry...'}`.
*   The actual `get_current_time` Python function **does not execute**. No tool execution logs appear for it.
*   The agent receives the error dictionary from the callback *as if* it were the result from the tool.
*   The agent formulates its final response based on this error information (e.g., "Sorry, checking the time is currently disabled by system policy.").

This demonstrates how `before_tool_callback` provides a precise control point to enforce policies or validate arguments *before* a tool's potentially costly or sensitive code is run.

**Summary of Stage 5:**
You have now:
*   Learned about `before_tool_callback` and its role as a guardrail *before* tool execution.
*   Defined a callback function with the required signature (`tool`, `args`, `tool_context`).
*   Used `ToolContext` to access session state within this tool-specific callback.
*   Understood how returning a dictionary from the callback blocks the actual tool function and provides an alternative result.
*   Configured an agent with multiple callbacks (`before_model_callback` and `before_tool_callback`) working together.
*   Observed (or simulated the observation of) how the `before_tool_callback` can prevent tool execution based on session state.

You're building a solid understanding of how to layer control and custom logic into your agents using ADK's callback system!

What concept would you like to tackle next? We could look at:

a)  **Multi-Agent Systems:** Building a small team where agents delegate tasks.
b)  **Workflow Agents:** Using agents like `SequentialAgent` to define fixed execution sequences.
c)  **Artifacts:** Handling file uploads/downloads.

In [16]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's review and refine Stage 6.

Alright, let's dive into **Multi-Agent Systems**. This is where ADK really starts to shine, allowing you to build more complex and organized applications by having multiple agents work together as a team.

**Stage 6: Building a Team - Delegation with Sub-Agents**

**Concept:** Instead of making one monolithic agent handle everything (greetings, checking time, weather, calculations, etc.), we can create *specialized* agents for specific tasks and a "manager" or "root" agent that decides which specialist should handle the user's request.

*   **Root Agent:** The main entry point for user interaction. It analyzes the incoming request and decides whether to handle it directly using its *own* tools or to delegate it to a specialist.
*   **Sub-Agents:** These are the specialists, each designed and instructed for a particular capability (e.g., an agent *just* for greetings, another *just* for database lookups).
*   **Delegation:** The process where the root agent passes control of the current conversational turn to a sub-agent that is better suited for the specific task identified in the user's request.

**Why use multiple agents?**

*   **Modularity:** It's much easier to develop, test, debug, and update smaller, focused agents than one giant, complex agent.
*   **Specialization:** Each agent can have highly tailored instructions, specific tools, and even use different LLM models optimized for its task (e.g., a faster/cheaper model for simple greetings, a more powerful model for complex reasoning).
*   **Organization & Scalability:** Keeps the logic for complex applications cleaner and makes it easier to add new capabilities by simply adding new specialist agents to the team.

**How ADK Handles Delegation (Auto Flow):**

1.  **Define Hierarchy (`sub_agents` List):** When you define the root agent, you provide a list of the specialist agent instances in its `sub_agents` parameter. This establishes the team structure.
2.  **Sub-Agent `description` is Key:** The `description` field you give each sub-agent is **critical**. The root agent's LLM reads these descriptions to understand *what each specialist can do*. Make these concise and accurate.
3.  **Root Agent `instruction` Guides Delegation:** You must instruct the root agent *about* its team. Tell it which sub-agents exist (referencing them by their `name`) and *under what conditions* it should delegate a task to them.
4.  **Automatic Transfer Mechanism:** When the root agent receives a request, its LLM analyzes the request against its own instructions, its tools, *and* the `description`s of its sub-agents. If the LLM determines the request best fits a sub-agent's capability, ADK's runtime (sometimes called "Auto Flow" in this context) automatically handles the transfer of control to that sub-agent *for that specific turn*. The sub-agent then processes the request using its own model, instructions, and tools.

**Implementation:**

1.  **Define Specialist Tools:** We need a simple `say_hello` tool for our new `greeting_agent`. We'll keep the `get_current_time` tool for the root agent.
2.  **Define the Specialist Sub-Agent:** Create the `greeting_agent`, giving it a clear `name`, `description`, focused `instruction`, and only the `hello_tool`.
3.  **Update the Root Agent:**
    *   Give it a new, representative `name` (e.g., `team_coordinator`).
    *   Add the `greeting_agent` instance to its `sub_agents` list.
    *   Modify its `instruction` to explicitly mention the `greeting_specialist` (by name) and the conditions for delegation ("If the user gives a greeting...").
    *   Ensure the root agent keeps the `time_tool` for tasks it handles directly.
    *   Remove the callbacks from the previous stage to focus purely on the delegation mechanism.

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent
from google.adk.tools import FunctionTool, BaseTool # BaseTool needed if type hinting tool arg
from google.adk.tools.tool_context import ToolContext
# Callbacks and related imports removed for this stage's clarity
import datetime
from typing import Dict, Any # Type hints for tool args

# --- Constants ---
MODEL = "gemini-2.0-flash-001"

# --- Tool Definitions ---

# Tool kept by the Root Agent
def get_current_time(tool_context: ToolContext) -> dict:
    """Gets the current date and time."""
    print(f"--- Tool: get_current_time executing (Called by: {tool_context.agent_name}) ---")
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    # State update is optional here, kept from previous stage
    tool_context.state['last_time_checked_root'] = time_str
    return {"current_time": time_str}

time_tool = FunctionTool(func=get_current_time)

# Tool specifically for the Greeting Sub-Agent
def say_hello(name: str = "there") -> dict:
    """Provides a friendly greeting, optionally using a name."""
    # Log which agent is calling this tool
    # (Requires ToolContext, but let's keep this simple for now)
    # We can infer based on which agent has the tool
    print(f"--- Tool: say_hello executing (Expected Call by: greeting_specialist) with name: {name} ---")
    return {"greeting": f"Hello, {name}! Great to connect."}

hello_tool = FunctionTool(func=say_hello)


# --- Sub-Agent Definition ---
greeting_agent = Agent(
    name="greeting_specialist", # <<< Unique name, used for delegation reference
    model=MODEL, # Can use the same or a different (e.g., cheaper) model
    # CRITICAL: Description helps the root agent decide WHEN to delegate.
    description="Use ONLY for handling simple user greetings like 'Hi', 'Hello', 'Good morning'.",
    instruction="You are the Greeting Specialist. Your ONLY task is to provide a "
                "friendly greeting using the 'say_hello' tool. Extract the user's "
                "name if provided in their greeting (pass it to the tool's 'name' arg). "
                "Do not answer any other type of question.",
    tools=[hello_tool], # Specialist only gets the tools it needs
)


# --- Root Agent Definition (Updated for Delegation) ---
root_agent = Agent(
    name="team_coordinator", # <<< New name for the root/manager agent
    model=MODEL, # This agent orchestrates, might need a capable model
    description="The main coordinator agent. Handles time requests directly using its own tool, and delegates greeting requests to a specialist.", # Describes its own role
    instruction="You are the main assistant and team coordinator. Your primary role is to check the time using the 'get_current_time' tool when asked. "
                "You also manage a specialist sub-agent:\n"
                "- 'greeting_specialist': This agent handles simple user greetings (like 'hello', 'hi').\n\n" # <<< Explicitly mention sub-agent by name
                "CRITICAL Instructions:\n"
                "1. If the user's message is clearly a greeting, **delegate** the task to the 'greeting_specialist'.\n" # <<< Guide delegation
                "2. If the user asks specifically for the current time or date, handle it **yourself** using your 'get_current_time' tool.\n"
                "3. For any other type of request, state politely that you can only handle time requests or greetings.",
    tools=[time_tool], # Root agent keeps tools for tasks it handles directly
    sub_agents=[greeting_agent], # <<< List the sub-agent instances here
    # Callbacks removed for clarity in this stage
)

# Confirmation messages
print(f"Root Agent '{root_agent.name}' defined with Sub-Agent: '{greeting_agent.name}'")
print(f"Root Agent Tools: {[t.name for t in root_agent.tools]}")
print(f"Sub-Agent '{greeting_agent.name}' Tools: {[t.name for t in greeting_agent.tools]}")

```

**Running the Agent Team:**

1.  Ensure you are in the `adk_tutorial` directory in your terminal.
2.  Make sure your `myenv` virtual environment is active.
3.  Run the agent using the same command (it automatically loads the variable named `root_agent` from your package):

```bash
adk run my_first_agent
```

Now, let's specifically test the delegation logic:

1.  `Hello there!` (Should trigger delegation to `greeting_specialist`)
2.  `What time is it?` (Should be handled directly by `team_coordinator`)
3.  `Hi, can you tell me the time?` (Interesting case - observe which agent/tool runs!)
4.  `What is the capital of France?` (Should be declined by `team_coordinator` based on its instructions).

**Expected Output & Observation:**

*   **Prompt 1 (`Hello there!`):**
    *   The `team_coordinator` (root) receives the prompt.
    *   Its LLM reads the instruction ("If the user's message is clearly a greeting, delegate...") and sees the `greeting_specialist`'s `description` ("Use ONLY for handling simple user greetings...").
    *   The LLM decides to delegate. ADK runtime automatically transfers control.
    *   The `greeting_specialist` agent receives the prompt "Hello there!".
    *   Its LLM reads its instruction ("Your ONLY task is to provide a friendly greeting using 'say_hello' tool...").
    *   It decides to use the `say_hello` tool (possibly passing "there" or no name).
    *   You see `--- Tool: say_hello executing... ---` in the terminal log.
    *   The `greeting_specialist` generates the final response based on the tool's output (e.g., "Hello, there! Great to connect.").
*   **Prompt 2 (`What time is it?`):**
    *   The `team_coordinator` receives the prompt.
    *   Its LLM determines this is a time request, matching its own capability and instruction ("If the user asks specifically for the current time..., handle it yourself..."). It does *not* match the `greeting_specialist`'s description.
    *   The LLM decides *not* to delegate.
    *   It uses its *own* `get_current_time` tool.
    *   You see `--- Tool: get_current_time executing... ---` in the log.
    *   The `team_coordinator` generates the final response based on the tool's output (e.g., "The current time is...").
*   **Prompt 3 (`Hi, can you tell me the time?`):**
    *   This tests the LLM's ability to interpret the potentially conflicting signals (greeting + time request) based on the root agent's instructions.
    *   It *might* prioritize the greeting and delegate to `greeting_specialist` first.
    *   It *might* prioritize the time request and handle it directly using `get_current_time`.
    *   *Less likely* with these simple instructions, but a more advanced model might try to handle both (e.g., greet then check time, or ask user which part to address).
    *   **Observe the logs:** See which tool (`say_hello` or `get_current_time`) executes, or if the agent responds differently. This highlights the LLM's role in interpreting delegation rules.
*   **Prompt 4 (`What is the capital of France?`):**
    *   The `team_coordinator` receives the prompt.
    *   Its LLM finds this doesn't match its time tool condition, nor the `greeting_specialist`'s description.
    *   Following the instruction ("For any other type of request, state politely that you can only handle time requests or greetings."), it should generate a response like "Sorry, I can only help with checking the current time or simple greetings." without calling any tools or sub-agents.

**Summary of Stage 6:**
You have now:
*   Understood the concept and benefits of multi-agent systems (modularity, specialization).
*   Defined a specialized sub-agent (`greeting_specialist`) with its own tools, description, and focused instructions.
*   Configured a root agent (`team_coordinator`) to manage this sub-agent using the `sub_agents` parameter.
*   Written instructions for the root agent to guide delegation decisions, referencing the sub-agent by `name` and relying on its `description`.
*   Observed ADK's automatic delegation mechanism correctly routing greeting requests to the specialist, while the root agent handled time requests itself.

This multi-agent structure is fundamental for building more complex, maintainable, and capable applications with ADK.

How does this look? Are you ready to explore **Workflow Agents** next, which allow for more *deterministic* (non-LLM-driven) control over sequences of agent execution?

In [17]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's refine Stage 7 on `SequentialAgent`.

Great! So far, we've seen how an LLM-driven root agent can *delegate* tasks to specialists. Now, let's explore a different way for agents to work together: using **Workflow Agents** for *deterministic* control flow.

**Stage 7: Deterministic Control Flow with `SequentialAgent`**

**Concept:** Sometimes, you don't want the LLM deciding the order of operations based on descriptions and instructions. You might need a specific, predictable sequence: always do Step A, *then* always do Step B, *then* always do Step C. This is crucial for reliable processes or pipelines.

**Workflow Agents** are designed for exactly this. Unlike `LlmAgent`s, they **do not use an LLM** to decide the execution flow. Their behavior is predefined and predictable. ADK provides three types: `SequentialAgent`, `ParallelAgent`, and `LoopAgent`.

We'll start with the most straightforward: `SequentialAgent`.

*   **`SequentialAgent`:** Executes its list of `sub_agents` one after another, strictly in the order you provide them.

**Why use `SequentialAgent`?**

*   **Predictability:** Guarantees a fixed order of operations every time. Essential for processes that have mandatory steps.
*   **Structure:** Clearly defines multi-step workflows (e.g., Validate -> Process -> Report).
*   **Data Flow:** Naturally facilitates passing data between steps using Session State. Agent A can save data (e.g., using `output_key` or `tool_context.state`), and Agent B (running next) can read that data from the state.

**Implementation:**

1.  **Define Simple Task Agents:** We need agents to run in sequence. Let's reuse simplified versions of our greeting and time agents.
    *   `greeting_agent_simple`: Just uses `say_hello`. We can add an `output_key` to save its greeting to state.
    *   `time_agent_simple`: Just uses `get_current_time`. Its instruction *could* potentially reference the state saved by the greeting agent if needed.
2.  **Define the `SequentialAgent`:**
    *   Import `SequentialAgent` from `google.adk.agents`.
    *   Create an instance of `SequentialAgent`.
    *   Provide the `greeting_agent_simple` and `time_agent_simple` instances in the desired execution order (`[greeting_agent_simple, time_agent_simple]`) to the `sub_agents` list.
    *   Make this `SequentialAgent` our new `root_agent`.
3.  **No Workflow Instructions Needed:** The `SequentialAgent` itself doesn't need LLM instructions about *when* to run its sub-agents; its *type* dictates the flow ("run them in order"). The sub-agents, however, still need their own instructions for *their specific tasks* (like using their tools).

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent, SequentialAgent # <<< Import SequentialAgent
from google.adk.tools import FunctionTool
from google.adk.tools.tool_context import ToolContext
# Callbacks and related imports removed for clarity
import datetime
from typing import Dict, Any

# --- Constants ---
MODEL = "gemini-2.0-flash-001"

# --- Tool Definitions ---
def say_hello(name: str = "there") -> dict:
    """Provides a friendly greeting."""
    print(f"--- Tool: say_hello executing ---")
    greeting_message = f"Hello, {name}! Welcome to the sequential process."
    return {"greeting": greeting_message}

hello_tool = FunctionTool(func=say_hello)

def get_current_time(tool_context: ToolContext) -> dict:
    """Gets the current date and time."""
    print(f"--- Tool: get_current_time executing ---")
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    # State written by tool persists within the session
    tool_context.state['last_time_checked_sequential'] = time_str
    return {"current_time": time_str}

time_tool = FunctionTool(func=get_current_time)

# --- Define Simple Sub-Agents for the Sequence ---

# Agent 1: Just Greets and saves result to state
greeting_agent_simple = Agent(
    name="step1_greeting",
    model=MODEL,
    description="Greets the user.", # Description used if this agent is part of a larger delegation tree, less critical for fixed sequence
    instruction="Greet the user politely using the 'say_hello' tool. Do nothing else.",
    tools=[hello_tool],
    output_key="step1_greeting_result" # <<< Save greeting result to state['step1_greeting_result']
)

# Agent 2: Just Checks Time
time_agent_simple = Agent(
    name="step2_time_check",
    model=MODEL,
    description="Checks the current time.",
    # Instruction could potentially use state from previous step:
    # instruction="Acknowledge the greeting in state['step1_greeting_result'] if present,
    #             then check the current time using the 'get_current_time' tool.",
    instruction="Check the current time using the 'get_current_time' tool. Respond only with the time information.",
    tools=[time_tool],
    output_key="step2_time_result" # Can also save its result
)

# --- Define the Sequential Workflow Agent ---
# This agent controls the ORDER of execution using predefined logic, not an LLM.
root_agent = SequentialAgent(
    name="greet_then_time_workflow", # Name for the workflow agent itself
    # List sub-agents IN THE ORDER they must execute
    sub_agents=[
        greeting_agent_simple, # Runs first
        time_agent_simple      # Runs second
    ]
    # No 'model' or 'instruction' needed for SequentialAgent itself.
)

print(f"Workflow Agent '{root_agent.name}' defined with sequence: {[agent.name for agent in root_agent.sub_agents]}")
print(f"Sub-Agent '{greeting_agent_simple.name}' Tools: {[t.name for t in greeting_agent_simple.tools]}")
print(f"Sub-Agent '{time_agent_simple.name}' Tools: {[t.name for t in time_agent_simple.tools]}")

```

**Running the Workflow Agent:**

1.  Go back to your terminal (in `adk_tutorial`), activate `myenv`.
2.  Run the agent. The command remains `adk run my_first_agent` because it loads the `root_agent` variable, which now points to our `SequentialAgent`.

```bash
adk run my_first_agent
```

Now, send **any** prompt to start the sequence. The specific prompt content is less critical for *triggering* the sequence (which is fixed), although the sub-agents might still use the initial user message if their instructions dictate. Try:

*   `Start`
*   `Begin workflow`
*   `Hello`

**Expected Output & Observation:**

Regardless of your initial prompt, you should observe the following **deterministic sequence** in the terminal logs and agent responses:

1.  The `greet_then_time_workflow` (SequentialAgent) starts execution.
2.  It **first** runs `sub_agents[0]`, which is `step1_greeting`.
3.  The `step1_greeting` agent executes its logic (using its LLM and instructions), decides to use its tool.
4.  Log shows: `--- Tool: say_hello executing ---`.
5.  `step1_greeting` yields its final response event (e.g., containing "Hello, there! Welcome."). The `Runner` processes this, saving the text to `state['step1_greeting_result']` because of `output_key`.
6.  **Then, and only then,** the `SequentialAgent` runs `sub_agents[1]`, which is `step2_time_check`.
7.  The `step2_time_check` agent executes its logic. It could potentially access `state['step1_greeting_result']` if its instructions were written to do so. It decides to use its tool.
8.  Log shows: `--- Tool: get_current_time executing ---`.
9.  `step2_time_check` yields its final response event (e.g., containing "The current time is..."). The `Runner` processes this, saving the result to `state['step2_time_result']`.
10. The `SequentialAgent` has finished running all its sub-agents in order. The overall interaction for this turn completes. The *final* response displayed to the user will typically be the response from the *last* agent in the sequence (`step2_time_check`).

The crucial point is that the `SequentialAgent` **enforced the execution order** (greeting first, then time check) without relying on LLM decisions for the flow control itself.

**Summary of Stage 7:**
You have now:
*   Learned about Workflow Agents (`SequentialAgent`, `ParallelAgent`, `LoopAgent`) providing deterministic control flow.
*   Implemented `SequentialAgent` to execute sub-agents in a fixed, predefined order.
*   Understood that the `SequentialAgent` orchestrates the sequence without needing its own LLM.
*   Observed the predictable step-by-step execution enforced by `SequentialAgent`.
*   Reinforced how state (`output_key`, `tool_context.state`) can be used to pass data between agents in a sequence.

Workflow agents are essential for building reliable processes and pipelines within your multi-agent systems, complementing the flexibility of LLM-driven delegation.

Ready to see how `ParallelAgent` allows sub-agents to run concurrently? Or would you prefer to explore another ADK feature like Artifacts for file handling?

In [18]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's polish Stage 8 about `ParallelAgent`.

Excellent! We've seen sequential execution with `SequentialAgent`. Now, let's look at how to run tasks simultaneously using `ParallelAgent`.

**Stage 8: Concurrent Execution with `ParallelAgent`**

**Concept:** While `SequentialAgent` enforces a strict one-after-another order, `ParallelAgent` is designed to run its sub-agents *concurrently*—essentially, starting them all at roughly the same time and letting them run side-by-side.

**Why use `ParallelAgent`?**

*   **Speed & Efficiency:** This is the main benefit. If you have multiple independent tasks (like fetching data from different websites, performing separate calculations, calling different slow APIs) that don't need to wait for each other, running them in parallel can drastically reduce the total time taken compared to running them sequentially.
*   **Independent Operations:** Ideal for scenarios where sub-tasks don't depend on each other's immediate results during their execution.

**How it works:**

1.  **Concurrent Initiation:** When the `ParallelAgent` executes, it immediately initiates the `run_async` method of *all* the sub-agents listed in its `sub_agents`. They start processing concurrently.
2.  **Asynchronous Execution:** ADK leverages Python's `asyncio` to manage these concurrent tasks, allowing them to proceed independently without blocking each other (especially if the tasks involve waiting for I/O like network requests or simulated delays).
3.  **Shared Session State:** This is important: even though they run concurrently, sub-agents within a `ParallelAgent` **still share access to the same underlying session state dictionary** (`session.state`).
    *   They can all *read* the state as it was when the `ParallelAgent` started.
    *   They can all *write* results back to the state using methods like `tool_context.state['key'] = value` or by configuring `output_key` on the sub-agent.
    *   **⚠️ Best Practice:** To avoid race conditions (where multiple agents might try to write to the *same* state key at the same time, and only the last write succeeds), ensure parallel sub-agents write their results to **distinct** state keys.
4.  **Interleaved Output & Completion:** Events generated by the parallel sub-agents (like tool logs or their final responses) may appear interleaved in the console output, reflecting their concurrent progress. The `ParallelAgent` itself finishes its execution turn only *after all* of its sub-agents have completed their individual runs. It doesn't typically produce a single, combined response itself; its value lies in orchestrating the concurrent execution. The results saved to the distinct state keys are then available for subsequent agents (e.g., if the `ParallelAgent` was part of a larger `SequentialAgent`).

**Implementation:**

1.  **Define Independent Agents:** We'll reuse our simple `greeting_agent` and `time_agent` concepts from Stage 7, but ensure their tools are `async` (to better simulate concurrent work) and they use distinct `output_key`s to save their results safely to the shared state.
2.  **Define the `ParallelAgent`:**
    *   Import `ParallelAgent`.
    *   Create an instance, listing the independent agents (`greeting_agent_parallel`, `time_agent_parallel`) in the `sub_agents` list.
    *   Set this `ParallelAgent` as our `root_agent`.

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent, ParallelAgent # <<< Import ParallelAgent
from google.adk.tools import FunctionTool
from google.adk.tools.tool_context import ToolContext
import datetime
import asyncio # <<< Import asyncio for simulating concurrent work
from typing import Dict, Any

# --- Constants ---
MODEL = "gemini-2.0-flash-001"

# --- Tool Definitions (Async for better demo) ---

# Marked 'async def' and uses 'await asyncio.sleep'
async def say_hello(name: str = "there") -> dict:
    """Provides a friendly greeting after a short delay."""
    print(f"--- Tool: say_hello starting (will sleep 1s)... ---")
    await asyncio.sleep(1) # Simulate longer work
    greeting_message = f"Hello, {name}! Parallel process says hi."
    print(f"--- Tool: say_hello finished. ---")
    return {"greeting": greeting_message}

hello_tool = FunctionTool(func=say_hello)

# Marked 'async def' and uses 'await asyncio.sleep'
async def get_current_time(tool_context: ToolContext) -> dict:
    """Gets the current date and time after a short delay."""
    print(f"--- Tool: get_current_time starting (will sleep 0.5s)... ---")
    await asyncio.sleep(0.5) # Simulate shorter work
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    state_key = 'parallel_time_checked' # Use a distinct key
    tool_context.state[state_key] = time_str
    print(f"--- Tool: get_current_time finished (State '{state_key}' updated). ---")
    return {"current_time": time_str}

time_tool = FunctionTool(func=get_current_time)


# --- Define Simple Sub-Agents for Parallel Execution ---

# Agent 1: Just Greets (using async tool)
# Uses a distinct output_key to write its result to state
greeting_agent_parallel = Agent(
    name="parallel_greeting",
    model=MODEL,
    description="Greets the user in parallel.",
    instruction="Greet the user using the 'say_hello' tool.",
    tools=[hello_tool],
    output_key="parallel_greeting_result" # <<< Saves result to state['parallel_greeting_result']
)

# Agent 2: Just Checks Time (using async tool)
# Uses a distinct output_key
time_agent_parallel = Agent(
    name="parallel_time_check",
    model=MODEL,
    description="Checks the current time in parallel.",
    instruction="Check the current time using the 'get_current_time' tool.",
    tools=[time_tool],
    output_key="parallel_time_result" # <<< Saves result to state['parallel_time_result']
)


# --- Define the Parallel Workflow Agent ---
# This agent runs its sub-agents concurrently.
root_agent = ParallelAgent(
    name="greet_and_time_parallel_workflow", # Name for the workflow agent
    # List the agents that should run concurrently
    sub_agents=[
        greeting_agent_parallel,
        time_agent_parallel
    ]
    # No model or instructions needed for ParallelAgent itself.
)

print(f"Workflow Agent '{root_agent.name}' defined to run sub-agents in parallel: {[agent.name for agent in root_agent.sub_agents]}")
print(f"Sub-Agent '{greeting_agent_parallel.name}' Tools: {[t.name for t in greeting_agent_parallel.tools]}, Output Key: {greeting_agent_parallel.output_key}")
print(f"Sub-Agent '{time_agent_parallel.name}' Tools: {[t.name for t in time_agent_parallel.tools]}, Output Key: {time_agent_parallel.output_key}")

```

**Running the Parallel Agent:**

1.  Go back to your terminal (in `adk_tutorial`), activate `myenv`.
2.  Run the agent:

```bash
adk run my_first_agent
```

Send any prompt to start the workflow, for example:

*   `Run parallel`
*   `Go`

**Expected Output & Observation:**

Because the agents run concurrently and we added simulated delays (`asyncio.sleep`), the exact order of the log messages might vary slightly each time. However, the key pattern to observe is the **interleaving** indicating concurrency:

1.  The `greet_and_time_parallel_workflow` agent starts.
2.  *Both* `parallel_greeting` and `parallel_time_check` agents begin execution almost simultaneously.
3.  You'll likely see the **"starting execution"** logs from *both* tools appear close together in the terminal:
    *   `--- Tool: get_current_time starting (will sleep 0.5s)... ---`
    *   `--- Tool: say_hello starting (will sleep 1s)... ---`
4.  Since `get_current_time` has a shorter simulated delay (0.5s) than `say_hello` (1s), you'll probably see its **"finished execution"** log appear *before* `say_hello` finishes:
    *   `--- Tool: get_current_time finished... ---`
5.  A little later, the "finished execution" log for the longer task appears:
    *   `--- Tool: say_hello finished. ---`
6.  **Final Response & State:** The `adk run` command typically displays the final event content from the *last* sub-agent to complete its turn. Since `say_hello` likely finishes last due to the longer sleep, you might see the greeting ("Hello, there! Parallel process says hi.") as the final message in the terminal.
    *   **However, the main outcome is not this single message.** The real result is that *both* tasks ran concurrently, and *both* their results were saved to the session state under their respective `output_key`s (`parallel_greeting_result` and `parallel_time_result`). A subsequent agent (if this `ParallelAgent` were nested in a `SequentialAgent`) could then access *both* pieces of information from the state.

The key observation is the interleaving of "start" and "finish" logs, demonstrating concurrent activity and potentially faster overall completion than a sequential approach.

**Summary of Stage 8:**
You have now:
*   Learned about `ParallelAgent` for running sub-agents concurrently.
*   Understood its primary benefit: potentially speeding up workflows with independent tasks.
*   Seen how to define a `ParallelAgent` and provide its `sub_agents`.
*   Observed the concurrent execution pattern through interleaved logs.
*   Reinforced the importance of using distinct state keys (`output_key` or manual `tool_context.state` writes) when parallel agents modify shared session state.

We've now covered the main workflow agents (`SequentialAgent`, `ParallelAgent`). The third type is `LoopAgent`, which repeats its sub-agents—useful for iterative refinement or polling tasks.

Are you ready to learn about `LoopAgent`, or would you prefer to switch gears and explore **Artifacts** for handling files?

In [19]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's refine Stage 9 about `LoopAgent`.

Excellent! Let's explore the final type of predefined workflow agent: `LoopAgent`.

**Stage 9: Iterative Execution with `LoopAgent`**

**Concept:** Sometimes, you need your agent system to repeat a sequence of actions multiple times. This could be for refining a result iteratively (like generating text, getting feedback, revising, and repeating), polling for a status update until it changes, or attempting an action until it succeeds (up to a certain number of tries). `LoopAgent` is designed specifically for these repetitive workflows.

*   **`LoopAgent`:** Executes its list of `sub_agents` sequentially (one after another within the iteration), and then **repeats** the entire sequence from the beginning.

**Why use `LoopAgent`?**

*   **Iteration & Refinement:** Ideal for processes like Generate -> Critique -> Revise, where you repeat steps to improve quality.
*   **Polling:** Repeatedly check an external system's status or a value in the session state until a desired condition is met.
*   **Retries:** Implement simple retry logic for actions that might fail occasionally.

**How it works:**

1.  **Sequential Execution (Within Each Loop):** In a single pass (iteration) of the loop, `LoopAgent` runs its `sub_agents` one after another, in the order they are listed, just like `SequentialAgent`.
2.  **Repetition:** After the *last* sub-agent in the sequence finishes its turn for the current iteration, the `LoopAgent` automatically starts the sequence over again with the *first* sub-agent for the next iteration.
3.  **Termination Conditions:** A loop needs a way to stop! `LoopAgent` provides two mechanisms:
    *   **`max_iterations` (Safety Net):** You can (and generally should) specify a maximum number of times the loop is allowed to run when you define the `LoopAgent` (e.g., `max_iterations=5`). The loop will stop automatically if it completes this many full iterations.
    *   **Escalation Signal (`actions.escalate=True`) (Dynamic Exit):** This is the primary way to exit the loop based on runtime conditions. If *any* sub-agent during its execution (or a tool/callback called by that sub-agent) signals escalation by setting `tool_context.actions.escalate = True` (which gets included in the resulting `Event`), the `LoopAgent` will **stop** executing immediately after that specific event is processed. It will *not* start the next iteration or even finish the remaining sub-agents in the *current* iteration if the signal occurs mid-sequence.

**Implementation:**

1.  **Define Agents for the Loop Body:** We need agents to perform the actions *inside* each loop iteration. Let's create:
    *   `increment_counter_agent`: Uses a tool (`increment_counter`) to increment a 'loop\_counter' value stored in session state.
    *   `check_counter_agent`: Uses a tool (`check_counter_and_maybe_escalate`) to check the 'loop\_counter'. This tool will be responsible for signaling `escalate=True` via `tool_context.actions` when the counter reaches a predefined target value (e.g., 3).
2.  **Define the `LoopAgent`:**
    *   Import `LoopAgent`.
    *   Create an instance, listing `increment_counter_agent` and `check_counter_agent` in its `sub_agents` (this defines the sequence *within* each iteration).
    *   Set a `max_iterations` value (e.g., 5) as a safety fallback.
    *   Make this `LoopAgent` our new `root_agent`.

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent, LoopAgent # <<< Import LoopAgent
from google.adk.tools import FunctionTool, BaseTool
from google.adk.tools.tool_context import ToolContext
from google.adk.events import Event, EventActions # <<< Needed for escalation signal context
# Callbacks and related imports removed for clarity
import datetime
from typing import Dict, Any, Optional

# --- Constants ---
MODEL = "gemini-2.0-flash-001"
TARGET_COUNT = 3 # The count at which the loop should stop via escalation

# --- Tool Definitions for Loop ---

def increment_counter(tool_context: ToolContext) -> dict:
    """Increments 'loop_counter' in session state by 1."""
    print(f"--- Tool: increment_counter executing ---")
    current_count = tool_context.state.get('loop_counter', 0) # Read state
    new_count = current_count + 1
    tool_context.state['loop_counter'] = new_count # Write state
    print(f"--- Tool: Updated 'loop_counter' state from {current_count} to {new_count} ---")
    return {"status": "incremented", "new_count": new_count}

increment_tool = FunctionTool(func=increment_counter)

def check_counter_and_maybe_escalate(tool_context: ToolContext) -> dict:
    """
    Checks 'loop_counter'. If >= TARGET_COUNT, signals LoopAgent to stop
    by setting tool_context.actions.escalate = True.
    """
    print(f"--- Tool: check_counter_and_maybe_escalate executing ---")
    current_count = tool_context.state.get('loop_counter', 0) # Read state
    print(f"--- Tool: Current 'loop_counter' state is: {current_count} ---")

    result = {"status": "checked", "current_count": current_count}
    should_escalate = (current_count >= TARGET_COUNT)

    if should_escalate:
        print(f"--- Tool: Counter reached {TARGET_COUNT}! Signaling escalation via tool_context.actions. ---")
        # <<< This is how the tool signals the LoopAgent to stop
        tool_context.actions.escalate = True
        result["message"] = f"Target count {TARGET_COUNT} reached. Stopping loop."
    else:
         print(f"--- Tool: Counter below {TARGET_COUNT}. Loop will continue. ---")
         result["message"] = f"Target count {TARGET_COUNT} not yet reached."

    # The dictionary returned is the tool's result for the LLM.
    # The escalation signal is carried separately by the framework via the Event's actions.
    return result

check_tool = FunctionTool(func=check_counter_and_maybe_escalate)

# --- Define Simple Sub-Agents for the Loop Body ---

# Agent 1: Runs the increment tool
increment_counter_agent = Agent(
    name="step1_increment",
    model=MODEL, # Model needed to decide to call the tool
    description="Increments the loop counter.",
    instruction="Use the 'increment_counter' tool. Report the new count briefly.",
    tools=[increment_tool],
    output_key="increment_result" # Optional: save tool dict result to state
)

# Agent 2: Runs the check tool (which might escalate)
check_counter_agent = Agent(
    name="step2_check_and_escalate",
    model=MODEL, # Model needed to decide to call the tool
    description="Checks the loop counter and triggers loop stop if target reached.",
    instruction="Use the 'check_counter_and_maybe_escalate' tool. Report the message from the tool.",
    tools=[check_tool],
    output_key="check_result" # Optional: save tool dict result to state
)

# --- Define the Loop Workflow Agent ---
root_agent = LoopAgent(
    name="counter_loop_workflow",
    # Sub-agents run sequentially *within* each iteration
    sub_agents=[
        increment_counter_agent, # Runs first in each iteration
        check_counter_agent      # Runs second in each iteration
    ],
    max_iterations=5 # <<< Safety limit: Stop after 5 full loops if escalation doesn't happen
)

print(f"Workflow Agent '{root_agent.name}' defined with loop sequence: {[agent.name for agent in root_agent.sub_agents]}, max_iterations={root_agent.max_iterations}")

```

**Running the Loop Agent:**

1.  Go back to your terminal (in `adk_tutorial`), activate `myenv`.
2.  Run the agent:

```bash
adk run my_first_agent
```

Send any prompt to start the loop, e.g.:

*   `Start count`
*   `Loop`

**Expected Output & Observation:**

You will see the agents execute sequentially, repeating until the counter reaches `TARGET_COUNT` (3).

*   **Iteration 1:**
    *   `step1_increment` runs -> Tool updates state `loop_counter` to 1. Agent responds.
    *   `step2_check_and_escalate` runs -> Tool checks state (1 < 3), does *not* signal escalate. Agent responds.
*   **Iteration 2:**
    *   `step1_increment` runs -> Tool updates state `loop_counter` to 2. Agent responds.
    *   `step2_check_and_escalate` runs -> Tool checks state (2 < 3), does *not* signal escalate. Agent responds.
*   **Iteration 3:**
    *   `step1_increment` runs -> Tool updates state `loop_counter` to 3. Agent responds.
    *   `step2_check_and_escalate` runs -> Tool checks state (3 >= 3).
        *   Log shows: `--- Tool: Counter reached 3! Signaling escalation... ---`. The tool sets `tool_context.actions.escalate = True`.
        *   Agent responds (e.g., "Target count 3 reached. Stopping loop.").
*   **Loop Terminates:** Because the `check_counter_and_maybe_escalate` tool signaled escalation, the `LoopAgent` **stops** after processing the final event from `step2_check_and_escalate` in Iteration 3. It does *not* proceed to Iteration 4, even though `max_iterations` was 5.

The final output displayed by `adk run` will typically be the response from the `check_counter_agent` in the last iteration where escalation occurred ("Target count 3 reached...").

**Summary of Stage 9:**
You have now:
*   Learned about `LoopAgent` for creating workflows that repeat a sequence of sub-agents.
*   Understood the two ways a `LoopAgent` can terminate: reaching `max_iterations` or receiving an `escalate=True` signal from a sub-agent's event actions.
*   Implemented a loop where a tool (`check_counter_and_maybe_escalate`) dynamically signals escalation based on session state by modifying `tool_context.actions.escalate`.
*   Observed the `LoopAgent` executing its sub-agents repeatedly until the escalation condition was successfully triggered and the loop stopped.

We've now covered the core agent types (`LlmAgent`, `SequentialAgent`, `ParallelAgent`, `LoopAgent`), tools, state management, and callbacks! This gives you a very strong foundation in ADK.

What area would you like to explore next? We could dive into:

a)  **Artifacts:** How agents can save and load files (like images, PDFs, etc.).
b)  **Evaluation:** How to formally test your agent's performance against predefined criteria.
c)  **Deployment:** How to get your agent running on cloud platforms like Cloud Run or Vertex AI Agent Engine.
d)  **Custom Agents:** Building agents with completely custom Python logic by inheriting directly from `BaseAgent` (more advanced).

In [20]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's ensure Stage 10 is clear and accurate.

Okay, let's explore **Artifacts**. While Session State is great for small, serializable pieces of data like strings, numbers, or simple lists/dictionaries, **Artifacts** are ADK's mechanism for handling larger, potentially binary data—essentially, files—associated with a conversation session or a user.

**Stage 10: Handling Files and Large Data with Artifacts**

**Concept:** Think of artifacts as named, versioned files stored alongside your session data.

*   **What they store:** Typically used for binary data (images, PDFs, audio clips) or large text data that wouldn't fit well or efficiently in the regular session state dictionary. The data is represented within ADK using the standard `google.genai.types.Part` object, usually storing the raw `bytes` and the `mime_type` (e.g., 'image/png', 'application/pdf', 'text/plain') in the `Part.inline_data` field.
*   **Why use them?**
    *   To allow users to upload files for processing by the agent.
    *   To enable agents or tools to generate and save files (like PDF reports, CSV exports, generated images).
    *   To pass large data blobs between different steps or agents in a workflow without overloading the session state.
    *   To persist configuration files or user-specific data that isn't simple key-value state.
*   **Management (`ArtifactService`):** Artifacts are not stored *in* the session state dictionary. Their storage and retrieval are handled by a dedicated **`ArtifactService`**. ADK provides:
    *   `InMemoryArtifactService`: Stores artifacts in the application's memory. **Data is lost when the application stops.** Good for local testing. (This is what `adk run` uses by default).
    *   `GcsArtifactService`: Stores artifacts persistently in a Google Cloud Storage bucket. Suitable for production.
    *   The `Runner` must be configured with an `ArtifactService` instance for these operations to work (though `adk run` handles the default `InMemory` setup).
*   **Versioning:** Every time you `save_artifact` with the same `filename`, the `ArtifactService` automatically creates a new version (typically starting from 0 and incrementing). The `save_artifact` method returns the new version number.
*   **Access (via Context):** Within tools (or callbacks), you interact with artifacts using methods on the context objects (`CallbackContext` or `ToolContext`):
    *   `context.save_artifact(filename: str, artifact: types.Part) -> int`: Saves the data from the `Part` object under the specified `filename`. Returns the integer version number assigned. This action is also recorded in the `EventActions.artifact_delta` for tracking purposes by the `SessionService`.
    *   `context.load_artifact(filename: str, version: Optional[int] = None) -> Optional[types.Part]`: Retrieves the specified artifact. If `version` is omitted (or `None`), it fetches the **latest** version. Returns the `Part` object containing the data, or `None` if the artifact/version doesn't exist.
    *   `tool_context.list_artifacts() -> list[str]`: (Available only on `ToolContext`) Returns a list of all unique artifact filenames currently available within the session's scope.
*   **Namespacing (`user:` prefix):** Just like state, prefixing the `filename` with `"user:"` (e.g., `"user:avatar.jpg"`) associates the artifact with the `user_id` across all sessions (assuming a persistent `ArtifactService`). A filename without a prefix is tied to the specific `session_id`.

**Implementation:**

1.  **Modify Tool to Save Artifact:** We'll update the `get_current_time` tool. After getting the time string, it will create a `types.Part` containing this string (as text) and save it as an artifact named `"last_timestamp.txt"` using `tool_context.save_artifact`.
2.  **Create Tool to Load Artifact:** We'll define a new tool, `read_last_timestamp_artifact`. This tool will use `tool_context.load_artifact` to retrieve the *latest* version of `"last_timestamp.txt"` and return its text content.
3.  **Update Agent:** We'll add the new `read_last_timestamp_artifact` tool to the agent's `tools` list and update the `instruction` to clearly explain when to use each tool (one for getting *current* time and saving, one for *reading* the saved artifact).

Here's the updated `my_first_agent/agent.py`:

```python
# filename: my_first_agent/agent.py
from google.adk.agents import Agent
from google.adk.tools import FunctionTool, BaseTool
from google.adk.tools.tool_context import ToolContext
# Callbacks and related imports removed for clarity
from google.adk.events import EventActions # Needed if tool signals actions
from google.genai import types # <<< Need types for creating Part
import datetime
from typing import Dict, Any, Optional

# --- Constants ---
MODEL = "gemini-2.0-flash-001"
TIMESTAMP_ARTIFACT_FILENAME = "last_timestamp.txt" # Use a constant for the filename

# --- Tool Definitions ---

# Tool 1: Gets time, updates state (optional), AND saves artifact
def get_current_time_and_save_artifact(tool_context: ToolContext) -> dict:
    """
    Gets the current date and time. It also saves this timestamp
    as a text artifact named 'last_timestamp.txt'.

    Use this tool ONLY when the user asks for the *current* time or date.

    Returns:
        A dictionary containing the current date and time string.
        Example format: {'current_time': '2024-08-15 14:30:55'}
    """
    print(f"--- Tool: get_current_time_and_save_artifact executing ---")
    now = datetime.datetime.now()
    time_str = now.strftime("%Y-%m-%d %H:%M:%S")

    # Update state (optional, but shows coexistence with artifacts)
    state_key = 'last_time_checked_artifact'
    tool_context.state[state_key] = time_str
    print(f"--- Tool: Updated state '{state_key}' to: {time_str} ---")

    # <<< Save the time string as an Artifact >>>
    try:
        # Create a Part object using from_text for text data
        # ADK handles setting the mime_type to 'text/plain'
        time_artifact_part = types.Part.from_text(time_str)

        # Save the artifact using the context method
        version = tool_context.save_artifact(
            filename=TIMESTAMP_ARTIFACT_FILENAME,
            artifact=time_artifact_part
        )
        # The framework automatically adds {'last_timestamp.txt': version}
        # to the event's actions.artifact_delta
        print(f"--- Tool: Saved artifact '{TIMESTAMP_ARTIFACT_FILENAME}' (Version: {version}) with content: '{time_str}' ---")
        return {"current_time": time_str} # Return success payload

    except ValueError as e:
        print(f"--- Tool ERROR: Failed to save artifact: {e} (Is ArtifactService configured in Runner?) ---")
        # Return an error dictionary for the LLM
        return {"status": "error", "error_message": f"Failed to save timestamp artifact: {e}"}
    except Exception as e:
        print(f"--- Tool ERROR: Unexpected error saving artifact: {e} ---")
        return {"status": "error", "error_message": f"Unexpected error saving timestamp artifact: {e}"}

# Wrap the function
time_artifact_tool = FunctionTool(func=get_current_time_and_save_artifact)


# Tool 2: Reads the timestamp from the artifact
def read_last_timestamp_artifact(tool_context: ToolContext) -> dict:
    """
    Reads the most recent timestamp previously saved in the 'last_timestamp.txt' artifact.

    Use this tool ONLY when the user asks specifically about the timestamp
    that was saved in the artifact, or asks to recall the saved time.

    Returns:
        A dictionary with the status and the content of the artifact, or an error message.
        Example success: {'status': 'found', 'timestamp_from_artifact': '2024-08-15 14:30:55'}
        Example error: {'status': 'not_found', 'error_message': 'Timestamp artifact not found.'}
    """
    print(f"--- Tool: read_last_timestamp_artifact executing ---")
    try:
        # Load the latest version by default (version=None)
        artifact_part = tool_context.load_artifact(filename=TIMESTAMP_ARTIFACT_FILENAME)

        if artifact_part and artifact_part.text is not None:
            timestamp_content = artifact_part.text
            print(f"--- Tool: Loaded latest artifact '{TIMESTAMP_ARTIFACT_FILENAME}', content: '{timestamp_content}' ---")
            # Return success payload
            return {"status": "found", "timestamp_from_artifact": timestamp_content}
        else:
            print(f"--- Tool: Artifact '{TIMESTAMP_ARTIFACT_FILENAME}' not found or has no text content. ---")
            return {"status": "not_found", "error_message": f"Artifact '{TIMESTAMP_ARTIFACT_FILENAME}' not found."}

    except ValueError as e:
        print(f"--- Tool ERROR: Failed to load artifact: {e} (Is ArtifactService configured in Runner?) ---")
        return {"status": "error", "error_message": f"Error loading timestamp artifact: {e}"}
    except Exception as e:
        print(f"--- Tool ERROR: Unexpected error loading artifact: {e} ---")
        return {"status": "error", "error_message": f"Unexpected error loading timestamp artifact: {e}"}

# Wrap the function
read_artifact_tool = FunctionTool(func=read_last_timestamp_artifact)


# --- Agent Definition (Updated with Artifact Tools) ---
root_agent = Agent(
    name="assistant_with_artifacts", # New name for clarity
    model=MODEL,
    instruction="You are a helpful assistant that can manage a timestamp artifact.\n\n"
                "Use Tools Based on User Intent:\n"
                "1. CURRENT Time Request: If the user asks for the *current* time or date NOW, "
                "   you MUST use the 'get_current_time_and_save_artifact' tool. This tool also saves the timestamp.\n"
                "2. RECALL Saved Time: If the user asks *what timestamp was saved*, *what's in the artifact*, "
                "   or asks to *recall the saved time*, you MUST use the 'read_last_timestamp_artifact' tool.\n"
                "3. Other Questions: Answer simple questions directly without using tools.\n\n"
                "Respond Clearly: Report the time or the artifact content based on the tool used.",
    # Provide BOTH tools to the agent
    tools=[time_artifact_tool, read_artifact_tool],
)

print(f"Agent '{root_agent.name}' defined using model '{MODEL}' with artifact tools: {[t.name for t in root_agent.tools]}")
```

**Running the Agent with Artifacts:**

1.  Go back to your terminal (in `adk_tutorial`), activate `myenv`.
2.  Run the agent:

```bash
adk run my_first_agent
```

Now, try this sequence of prompts to see the artifact lifecycle:

1.  `What time is it now?` (Runs Tool 1, saves artifact version 0)
2.  `What was the timestamp saved in the artifact?` (Runs Tool 2, reads version 0)
3.  `What time is it again?` (Runs Tool 1, saves artifact version 1)
4.  `Recall the saved timestamp from the artifact.` (Runs Tool 2, reads version 1 - the latest)

**Expected Output & Observation:**

*   **Prompt 1 (`What time is it now?`):**
    *   Agent calls `get_current_time_and_save_artifact`.
    *   Tool logs show state update and artifact save: `--- Tool: Saved artifact 'last_timestamp.txt' (Version: 0)... ---`
    *   Agent responds with the current time, let's call it `<time_1>`.
*   **Prompt 2 (`What was the timestamp saved...`):**
    *   Agent calls `read_last_timestamp_artifact`.
    *   Tool logs show artifact load: `--- Tool: Loaded latest artifact 'last_timestamp.txt', content: '<time_1>' ---`
    *   Agent responds, confirming the saved time: "The timestamp saved in the artifact was `<time_1>`." (or similar phrasing).
*   **Prompt 3 (`What time is it again?`):**
    *   Agent calls `get_current_time_and_save_artifact` again.
    *   Tool logs show state update and artifact save: `--- Tool: Saved artifact 'last_timestamp.txt' (Version: 1)... ---` (Note the version is now 1).
    *   Agent responds with the new current time, let's call it `<time_2>`.
*   **Prompt 4 (`Recall the saved timestamp...`):**
    *   Agent calls `read_last_timestamp_artifact` again.
    *   Tool logs show artifact load: `--- Tool: Loaded latest artifact 'last_timestamp.txt', content: '<time_2>' ---` (It correctly loaded version 1, the latest).
    *   Agent responds with the updated saved time: "The timestamp saved in the artifact is `<time_2>`."

This clearly demonstrates the artifact lifecycle: saving data using `save_artifact`, the automatic versioning managed by the `ArtifactService`, and retrieving the *latest* version by default using `load_artifact`. All interactions were managed conveniently through the `ToolContext`.

**Summary of Stage 10:**
You have now:
*   Learned about Artifacts as ADK's mechanism for handling file-like or large data.
*   Understood the role of `ArtifactService` (using `InMemory` for now) and how data is represented using `types.Part`.
*   Successfully used `tool_context.save_artifact` within a tool to save data (a simple timestamp string in this case) as a versioned artifact.
*   Successfully used `tool_context.load_artifact` within another tool to retrieve the latest version of the saved artifact.
*   Configured an agent with appropriate instructions to use different tools for saving versus reading the artifact.
*   Observed the artifact saving, versioning (0, then 1), and loading process during an interactive agent session.

You've now covered a significant range of ADK's core features, including agent definition, tools, state management within sessions, using callbacks for control, orchestrating workflows with sequential and parallel agents, and managing file-like data with artifacts! This provides a very strong foundation.

Given this, what would be most beneficial for you to explore next?

a)  **Review and Consolidate:** We can revisit any of the previous stages (Agents, Tools, State, Callbacks, Workflow Agents, Artifacts) if you have questions or want to solidify your understanding.
b)  **Evaluation:** Learn how to formally test your agent's behavior and performance using ADK's evaluation tools.
c)  **Deployment:** Discuss the steps involved in taking your agent from your local machine to a cloud environment (like Cloud Run or Vertex AI Agent Engine).
d)  **More Advanced Concepts:** Briefly touch upon topics like `CustomAgent`s (for non-LLM driven logic) or more complex multi-agent patterns involving different delegation strategies (though these often benefit from more practice with the basics first).

In [21]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's refine Stage 11 on Evaluation.

Excellent! Now that you've built agents with various capabilities (tools, state, workflows), it's crucial to have a reliable way to **test** if they are behaving as expected. Since LLMs introduce variability, simple checks aren't enough. We need a more structured approach.

**Stage 11: Evaluating Your Agent's Performance**

**Concept:** Agent evaluation in ADK goes beyond typical software testing. Because LLM responses can vary while still being correct, we often need to assess not just the final answer but also the *process* the agent took to get there (its "trajectory").

ADK provides an evaluation framework to help automate this:

1.  **Define Test Cases:** You create structured test data describing specific scenarios. This includes:
    *   The user's input (`query`).
    *   The expected *behavior* (e.g., which `tools` the agent *should* call, with what kinds of `arguments`).
    *   An example of a good final answer (`reference`).
2.  **Run Evaluations:** Use ADK tools to automatically run your agent against these test cases.
3.  **Assess Performance:** The tools compare the agent's *actual* behavior (tool calls made, final response generated) against your defined expectations and report whether it meets the criteria.

**ADK Tools for Evaluation:**

*   **Test Files (`<name>.test.json`):** Simple JSON files where each entry is a test interaction (query, expected tool calls, reference). Best suited for focused tests ("unit tests") of specific agent capabilities within a single session context.
*   **Evalset Files (`<name>.evalset.json`):** More structured files capable of defining multiple complex, multi-turn sessions, each with its own initial state. Better for end-to-end or integration testing.
*   **`AgentEvaluator`:** A Python class used programmatically (often within `pytest`) to run evaluations using `.test.json` files.
*   **`adk eval` CLI:** A command-line tool primarily for running evaluations using `.evalset.json` files.
*   **Web UI (`adk web`) Eval Tab:** Provides an interactive way to run evaluations (using evalsets) and capture sessions to create new test cases.

For this stage, we'll focus on the programmatic approach using a **`.test.json` file** and **`pytest`** with **`AgentEvaluator`**.

**Implementation:**

1.  **Install Evaluation Dependencies:** You need `pytest` to run the tests and libraries for comparing results.

    ```bash
    # In your terminal, with 'myenv' active:
    pip install pytest rouge_score nltk
    ```
    *(Note: `rouge_score` uses `nltk`. The first time it runs, `nltk` might prompt you to download data packages like 'punkt'. If it does, follow the instructions or run `python -m nltk.downloader punkt` in your environment).*

2.  **Create a Test File:** Inside your `my_first_agent` directory, create `agent_tests.test.json`. This file defines our test scenarios. We'll test the agent's ability to use the time tool, the artifact reading tool, and handle a general knowledge question.

    ```json
    // filename: my_first_agent/agent_tests.test.json
    [
      {
        "query": "What time is it right now?",
        "expected_tool_use": [
          {
            "tool_name": "get_current_time_and_save_artifact", // Match the function name used by FunctionTool
            "tool_input": {} // Expects no arguments from the LLM for this tool
          }
        ],
        "expected_intermediate_agent_responses": [], // Not used in this simple case
        // Reference focuses on structure/intent, not the exact time string
        "reference": "The current time is [TIME]"
      },
      {
        "query": "What was the last timestamp saved?",
        "expected_tool_use": [
          {
            "tool_name": "read_last_timestamp_artifact", // Match the tool function name
            "tool_input": {} // Expects no arguments
          }
        ],
        "expected_intermediate_agent_responses": [],
        // Reference expects retrieval of a timestamp
        "reference": "The timestamp saved in the artifact was [TIME]"
      },
      {
        "query": "What is the capital of Germany?",
        "expected_tool_use": [], // <<< Expect NO tool use for this query
        "expected_intermediate_agent_responses": [],
        // Reference is a direct factual answer
        "reference": "The capital of Germany is Berlin."
      }
    ]
    ```
    *   `query`: Input to simulate from the user.
    *   `expected_tool_use`: A list specifying which tools the agent *should* call, in order. Each entry requires `tool_name` (must match the name ADK uses, usually the Python function name) and `tool_input` (a dictionary representing the arguments the LLM *should* generate for the tool; `{}` if no arguments are expected). An empty list `[]` asserts that *no* tools should be called.
    *   `reference`: An example of a good final answer. The evaluator uses text similarity metrics (like ROUGE) to compare the agent's actual response to this reference. This means placeholders like `[TIME]` are acceptable for dynamic content; the structure and surrounding text are more important than the exact value.

3.  **Create a Pytest Test Script:** In the parent `adk_tutorial` directory, create `test_my_agent.py`. This script imports `AgentEvaluator` and defines a test function.

    ```python
    # filename: test_my_agent.py (Place this in adk_tutorial directory)
    import pytest
    from google.adk.evaluation import AgentEvaluator # Import the core evaluation class
    import os # Import os to construct paths reliably

    # --- Configuration ---
    # Assumes pytest is run from the 'adk_tutorial' directory
    CURRENT_DIR = os.path.dirname(__file__)
    AGENT_MODULE_PATH = "my_first_agent" # Path to the agent package relative to adk_tutorial
    TEST_DATA_FILE = os.path.join(CURRENT_DIR, "my_first_agent", "agent_tests.test.json")

    # --- Test Function ---
    def test_agent_basic_interactions():
        """
        Uses AgentEvaluator to run the scenarios defined in the .test.json file
        against the agent defined in the specified module path.
        """
        print(f"\n--- Starting Agent Evaluation ---")
        print(f"Agent Module: {AGENT_MODULE_PATH}")
        print(f"Test Data: {TEST_DATA_FILE}")

        if not os.path.exists(TEST_DATA_FILE):
             pytest.fail(f"Test data file not found: {TEST_DATA_FILE}")

        try:
            # The core evaluation call
            AgentEvaluator.evaluate(
                # Tells the evaluator where to find the 'root_agent' variable
                agent_module=AGENT_MODULE_PATH,
                # Specifies the JSON file containing the test cases
                eval_dataset_file_path_or_dir=TEST_DATA_FILE,
                # Optional: Run each test case only once. Default is 2.
                num_runs=1
                # Optional: Define custom pass/fail criteria here via test_config.json
                # or by passing criteria dictionary directly if supported.
            )
            print(f"--- Agent Evaluation Completed Successfully ---")
        except FileNotFoundError:
            # This case is handled above, but kept for robustness
            pytest.fail(f"Test data file not found at {TEST_DATA_FILE}.")
        except ImportError:
             pytest.fail(f"Could not import agent module from '{AGENT_MODULE_PATH}'. Check the path and ensure 'my_first_agent/__init__.py' exists and is correct.")
        except Exception as e:
             # Catch any other errors during evaluation
             pytest.fail(f"Agent evaluation failed with an unexpected error: {e}")

    # You can add more pytest functions here to test other aspects or files
    ```

**Running the Evaluation:**

1.  Ensure you are in the `adk_tutorial` directory in your terminal.
2.  Make sure your virtual environment (`myenv`) is active and you've installed `pytest`, `rouge_score`, and `nltk` (`pip install pytest rouge_score nltk`).
3.  Execute `pytest`:

```bash
pytest -s -v test_my_agent.py
```
*   `-s`: Shows `print` statements from your code (including tool logs).
*   `-v`: Provides more verbose output from pytest itself.

**Expected Output & Observation:**

You'll see detailed output:

1.  Pytest starts and identifies the `test_agent_basic_interactions` function.
2.  The "Starting Agent Evaluation" print statements appear.
3.  For *each* of the three test cases in `agent_tests.test.json`:
    *   ADK runs the agent with the specified `query`.
    *   You'll see the console logs from your tools (`--- Tool: ... executing ---`, `--- Tool: Saved artifact... ---`, `--- Tool: Loaded artifact... ---`) if they are called.
    *   `AgentEvaluator` logs messages indicating which query it's running.
    *   **Crucially, `AgentEvaluator` compares:**
        *   **Tool Trajectory:** Did the agent call the exact tools listed in `expected_tool_use`, in the right order, with compatible arguments?
        *   **Final Response:** How similar is the agent's final text response to the `reference` string (using ROUGE score)?
    *   You'll see output summarizing these comparisons (often showing scores or PASS/FAIL indicators based on default thresholds). *Note: The default criteria usually require a perfect tool trajectory match (score 1.0) and high text similarity (e.g., ROUGE score >= 0.8).*
4.  Pytest provides a final summary showing how many tests passed or failed.

If a test fails, the output will highlight the mismatch – e.g., "Tool trajectory mismatch: Expected ['tool_A'] but got ['tool_B']" or "Response match score X below threshold Y". This helps you pinpoint where the agent's behavior deviated from expectations.

*(Note: You can customize the pass/fail criteria by creating a `test_config.json` file next to your `.test.json` file, specifying thresholds for `tool_trajectory_avg_score` and `response_match_score`.)*

**Summary of Stage 11:**
You have now:
*   Learned the importance of structured evaluation for AI agents.
*   Understood the role of ADK's test files (`.test.json`) for defining scenarios with queries, expected tool calls, and reference answers.
*   Created a `.test.json` file covering different behaviours of your agent.
*   Installed `pytest` and supporting libraries (`rouge_score`, `nltk`).
*   Written a `pytest` script utilizing `AgentEvaluator.evaluate` to automate the execution of your test file against your agent.
*   Executed the evaluation and learned how to interpret the output, which compares actual agent behaviour (tool usage, final response) against your defined expectations.

Evaluation is a vital practice for ensuring your agent is reliable, safe, and performs as intended, especially as you make changes or retrain models. It provides automated checks that complement manual testing.

We've covered all the fundamental stages of building and testing an ADK agent! You now have a solid grasp of the core components and workflows.

What would you like to do now?

a)  **Review and Consolidate:** Go back over any of the stages (Agents, Tools, State, Callbacks, Workflows, Artifacts, Evaluation) if anything was unclear or you'd like to reinforce your understanding.
b)  **Deployment:** Discuss the steps and considerations for deploying this agent to a cloud environment like Cloud Run or Vertex AI Agent Engine.
c)  **More Advanced Topics:** Briefly discuss concepts like `CustomAgent`s, more complex Multi-Agent patterns (e.g., different delegation types), or using persistent `SessionService` / `ArtifactService` backends.

In [22]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's refine the deployment stage explanation for clarity and completeness.

Excellent! Let's move on to **Deployment**. You've built and tested your agent locally, and now it's time to make it accessible over the web by deploying it to the cloud. We'll use **Google Cloud Run**, a serverless platform perfect for running containerized applications like our ADK agent without managing servers.

**Stage 12: Deploying Your Agent to Cloud Run using `adk deploy`**

**Concept:** Deployment involves packaging your agent code into a container image and running that image on cloud infrastructure. Cloud Run handles the servers, scaling, and networking, allowing users or other services to interact with your agent via an HTTP endpoint.

ADK provides a convenient command-line tool, `adk deploy cloud_run`, specifically designed to simplify this process for ADK agents.

**Prerequisites:**

1.  **Google Cloud SDK (`gcloud`):** You need `gcloud` installed and authenticated.
    *   Install: [Google Cloud SDK Install Guide](https://cloud.google.com/sdk/docs/install)
    *   Login: Run `gcloud auth login` and follow the browser prompts.
    *   Set Project: Run `gcloud config set project ap-alto-ml-1000` (using the project ID from your `.env`).
2.  **Enable Google Cloud APIs:** Ensure these APIs are enabled in the `ap-alto-ml-1000` project. You can usually do this via the Cloud Console or `gcloud services enable`:
    *   `cloudbuild.googleapis.com` (Cloud Build API)
    *   `artifactregistry.googleapis.com` (Artifact Registry API)
    *   `run.googleapis.com` (Cloud Run Admin API)
    *   `aiplatform.googleapis.com` (Vertex AI API - needed by the agent at runtime)
    *   Example command: `gcloud services enable cloudbuild.googleapis.com artifactregistry.googleapis.com run.googleapis.com aiplatform.googleapis.com`
3.  **Permissions:** The Google account used with `gcloud auth login` needs roles like "Cloud Build Editor", "Artifact Registry Writer", "Cloud Run Admin", and "Service Account User" to perform the deployment. The **service account** that Cloud Run will use to *run* your agent (usually the default Compute Engine service account unless specified otherwise) needs the "Vertex AI User" role to call the Gemini model. You might need to grant this role in the IAM section of the Google Cloud Console.

**The `adk deploy cloud_run` Command:**

This command streamlines packaging, building, and deploying your agent.

*   **What it does:** It finds your agent code, creates a suitable `Dockerfile` and server configuration, uses Cloud Build to build a container image, pushes that image to your project's Artifact Registry, and then deploys that image as a new service on Cloud Run.
*   **Where to run it:** Execute this command from the `adk_tutorial` directory (the parent directory containing `my_first_agent`).

**Implementation:**

1.  **Navigate to Project Directory:** Open your terminal and ensure you are in the `adk_tutorial` directory.
2.  **Ensure Virtual Environment:** Make sure your `myenv` virtual environment is active (`source myenv/bin/activate`).
3.  **Run the Deployment Command:**

    ```bash
    # Ensure gcloud is authenticated and project is set to ap-alto-ml-1000

    adk deploy cloud_run \
        --project=ap-alto-ml-1000 \
        --region=us-central1 \
        --service-name=my-first-adk-agent-service \
        my_first_agent
        # Note: No --allow-unauthenticated flag here, we'll keep it secure by default.
    ```

    *   `--project=ap-alto-ml-1000`: Matches your `.env` setting.
    *   `--region=us-central1`: Matches your `.env` setting.
    *   `--service-name=my-first-adk-agent-service`: A unique name for your service on Cloud Run. You can change this if you like.
    *   `my_first_agent`: The **path** to your agent package directory (relative to where you run the command).
    *   `--app_name` (Optional): By default, the ADK server inside the container will expose your agent under an app name matching the directory (`my_first_agent`). You can override this with `--app_name=your_preferred_app_name`. We'll stick with the default for the `curl` commands later.
    *   `--allow-unauthenticated`: We are *omitting* this flag. This means the deployed service will require authentication, which is safer.

**Execution & Observation:**

The command will output progress updates:

1.  Identifying agent source code in `my_first_agent`.
2.  Creating temporary build files.
3.  Submitting the build to Google Cloud Build. You'll see logs streaming from the build process (installing dependencies, building the container). This might take a few minutes.
4.  Pushing the container image to Artifact Registry.
5.  Deploying the image to Cloud Run.
6.  **Authentication Prompt:** Because we omitted `--allow-unauthenticated`, Cloud Run asks:
    `Allow unauthenticated invocations to [my-first-adk-agent-service] (y/N)?`
    Press **`N`** (or just hit Enter) to keep the service secure.
7.  **Success:** The command finishes by printing the **`Service URL`**. This is the HTTPS address of your deployed agent. **Copy this URL carefully.** Example: `https://my-first-adk-agent-service-abc123xyz-uc.a.run.app`

    *   **(Runtime Service Account Permissions):** If the deployment succeeds but requests later fail with permission errors related to Vertex AI, ensure the *runtime service account* used by Cloud Run (often `PROJECT_NUMBER-compute@developer.gserviceaccount.com` by default) has the "Vertex AI User" role in IAM.

**Testing the Deployed Agent via `curl`:**

Now, let's send an HTTP request to the deployed agent's API.

1.  **Set URL Environment Variable:** In your terminal, store the Service URL you copied.

    ```bash
    export APP_URL="PASTE_YOUR_DEPLOYED_SERVICE_URL_HERE"
    # Example: export APP_URL="https://my-first-adk-agent-service-abc123xyz-uc.a.run.app"
    ```

2.  **Get Authentication Token:** Since the service requires authentication, get an OIDC identity token associated with your `gcloud` user.

    ```bash
    export TOKEN=$(gcloud auth print-identity-token)
    # This token allows Cloud Run to verify your identity.
    ```

3.  **Send a Request:** Use `curl` to call the `/run` endpoint. Make sure `app_name` in the JSON payload matches your agent's app name (default is the directory name, `my_first_agent`).

    ```bash
    curl -X POST "${APP_URL}/run" \
         -H "Authorization: Bearer $TOKEN" \
         -H "Content-Type: application/json" \
         -d '{
           "app_name": "my_first_agent",
           "user_id": "cloud_run_user_001",
           "session_id": "cloud_run_session_abc",
           "new_message": {
             "role": "user",
             "parts": [{"text": "What time is it?"}]
           },
           "streaming": false
         }'
    ```

**Expected Output & Observation:**

If everything is configured correctly (including runtime permissions for the service account), you should receive a JSON response back from your Cloud Run service. This JSON will be a list containing the sequence of `Event` objects generated during the agent's execution for that request. You should see events related to the LLM call, the `get_current_time_and_save_artifact` tool execution, and the final agent response containing the time.

*   **Troubleshooting:** If you get authentication errors (401/403), double-check that you included the `Authorization: Bearer $TOKEN` header correctly and that your `gcloud` user has permission to invoke the Cloud Run service. If you get errors related to Vertex AI calls *within* the agent's execution, check the Cloud Run service's runtime service account permissions in IAM. Check the Cloud Run service logs in the Google Cloud Console for more detailed error messages.

*   **Persistence Caveat:** Remember, the agent deployed here still uses `InMemoryArtifactService` by default. This means the `"last_timestamp.txt"` artifact saved in one request might not be available in a subsequent request if Cloud Run uses a different server instance. For persistent artifacts in production, you would need to configure and use `GcsArtifactService`.

**Summary of Stage 12:**
You have successfully:
*   Learned the purpose of deploying an agent to make it accessible.
*   Understood the prerequisites for deploying to Google Cloud Run (`gcloud`, APIs, Permissions).
*   Used the `adk deploy cloud_run` command to automate the containerization and deployment process.
*   Configured the deployed service to require authentication.
*   Obtained the public URL for your Cloud Run service.
*   Tested the live, deployed agent by sending authenticated HTTP requests using `curl` and obtaining an identity token.
*   Recognized the importance of runtime service account permissions for cloud services like Vertex AI.

Your agent is now running as a scalable, secure service in the cloud!

This concludes the main tutorial path, covering building, testing, and deploying an ADK agent with various features.

What would you like to focus on next?

a)  **Clean Up:** Learn how to delete the deployed Cloud Run service to avoid ongoing costs.
b)  **Review:** Go back over any previous stage if you have questions.
c)  **Advanced Topics:** Briefly discuss `CustomAgent`s, persistent storage services, or other specific features.

In [23]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's finalize the cleanup stage description.

Okay, great. Before we wrap up, it's important to know how to remove the cloud resources we created during deployment. This prevents any potential ongoing costs for services you're no longer using.

**Stage 13: Cleaning Up Cloud Resources**

**Concept:** Deploying services to the cloud consumes resources (computing instances, network addresses, storage). Even if idle, these can sometimes incur small charges. It's a standard best practice, especially after tutorials or experiments, to delete resources you don't need anymore.

In Stage 12, the `adk deploy cloud_run` command created two main things in your Google Cloud project:
1.  A **Cloud Run Service:** This is the running instance of your agent application.
2.  A **Container Image:** Stored in Google Artifact Registry, this is the packaged version of your agent code that Cloud Run uses.

We'll use the `gcloud` command-line tool (which you used for authentication and project setup) to remove the Cloud Run service.

**Implementation:**

1.  **Confirm `gcloud` Configuration:** Ensure your terminal is still authenticated to Google Cloud (`gcloud auth login` if needed) and configured for the correct project (`gcloud config set project ap-alto-ml-1000`).
2.  **Identify Service Name and Region:** Recall the `--service-name` (e.g., `my-first-adk-agent-service`) and `--region` (e.g., `us-central1`) you used during the `adk deploy` command.
3.  **Delete the Cloud Run Service:** Execute the following `gcloud` command, replacing the service name and region if yours were different:

    ```bash
    # Ensure your terminal is authenticated and project is set to ap-alto-ml-1000

    gcloud run services delete my-first-adk-agent-service \
        --region=us-central1 \
        --quiet
        # Remove --quiet if you want to be prompted for confirmation (y/N)
    ```
    *   `my-first-adk-agent-service`: The name of the service to delete.
    *   `--region=us-central1`: The location of the service.
    *   `--quiet`: Skips the confirmation prompt. Omit this if you prefer to confirm manually.

4.  **(Optional) Clean Up Artifact Registry Image:** The `adk deploy` command pushed a container image to Artifact Registry (usually in a repository named `cloud-run-source-deploy`). While storage costs for a single image are typically negligible, you *can* delete it if desired. However, managing container images and tags can be complex. For most tutorial cleanup purposes, **deleting the Cloud Run service is the most important step**. If you wish to delete the image, you would typically use `gcloud artifacts docker images list ...` to find the specific image digest (the unique `sha256:...` identifier) and then `gcloud artifacts docker images delete ...@[IMAGE_DIGEST]`. We'll skip the detailed commands here as simply deleting the service is usually sufficient after this tutorial.

**Execution & Observation:**

*   Running the `gcloud run services delete` command will show progress and output messages confirming that the service is being deleted.
*   Once deleted, the `APP_URL` for your service will stop working (you'll get errors if you try to `curl` it).
*   You can also verify the deletion by visiting the Cloud Run section in your Google Cloud Console.

**Summary of Stage 13:**
You have learned:
*   The importance of cleaning up deployed cloud resources to manage costs.
*   How to use the `gcloud run services delete` command to remove your deployed Cloud Run service.
*   That container images are stored in Artifact Registry and can optionally be deleted (though often less critical than deleting the running service).

This concludes our step-by-step tutorial on the Agent Development Kit!

**Overall Review:**

Congratulations! You started with just a Python environment and have successfully navigated through:

1.  Creating a basic `LlmAgent`.
2.  Adding capabilities with a custom `FunctionTool`.
3.  Implementing short-term memory using Session State and `ToolContext`.
4.  Intercepting LLM calls with `before_model_callback` for logging and modification.
5.  Adding tool-specific validation using `before_tool_callback`.
6.  Structuring a Multi-Agent System with delegation between a root agent and a specialist sub-agent.
7.  Enforcing deterministic workflows using `SequentialAgent`.
8.  Speeding up independent tasks using `ParallelAgent`.
9.  Creating iterative processes using `LoopAgent` and escalation signals.
10. Managing file-like data using Artifacts (`save_artifact`, `load_artifact`).
11. Programmatically evaluating agent behavior using `AgentEvaluator` and `pytest`.
12. Deploying your agent as a scalable, secure service on Google Cloud Run using `adk deploy`.
13. Cleaning up the deployed cloud resources.

You've gained practical experience with the essential components and patterns for building sophisticated applications with ADK.

**What's Next?**

From here, you can:

*   **Practice:** Try modifying the agents, adding more tools, or combining workflow agents in different ways.
*   **Review:** Revisit any stage that felt complex or unclear. The documentation provides more in-depth explanations.
*   **Explore Persistent Storage:** Experiment with `DatabaseSessionService` or `GcsArtifactService` if you need data to persist between application runs.
*   **Dive Deeper:** Look into `CustomAgent`s for non-LLM logic, advanced multi-agent communication patterns, or the specifics of the `MemoryService`.
*   **Build Your Own!** Start designing and implementing your own agentic application using the skills you've learned.

Do you have any final questions about the cleanup stage or any other part of the tutorial?

In [24]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Excellent! We've now covered the entire planned tutorial path, from creating your very first simple agent all the way through adding tools, state, callbacks, workflows, artifacts, evaluation, deployment, and finally, cleanup.

You've successfully built, tested, and deployed a functional agent, touching upon many of the core capabilities of the Agent Development Kit.

**Recap of Stages Covered:**

1.  Basic `Agent` Definition
2.  Adding a `FunctionTool`
3.  Using Session `State` with `ToolContext`
4.  Intercepting LLM calls with `before_model_callback`
5.  Guarding tool execution with `before_tool_callback`
6.  Building a Multi-Agent Team with Delegation (`sub_agents`)
7.  Deterministic flow with `SequentialAgent`
8.  Concurrent execution with `ParallelAgent`
9.  Iteration and conditional loops with `LoopAgent`
10. Handling files/data with `Artifacts`
11. Programmatic `Evaluation` using `pytest` and `AgentEvaluator`
12. Deployment to `Cloud Run` using `adk deploy`
13. Cleaning up deployed `Cloud Run` resources

You now have a solid foundation for working with ADK.

Since we've completed the structured tutorial, the "next step" is up to you! How would you like to proceed? We could:

1.  **Review & Reinforce:** Go back over any of the stages (1-13) or specific concepts (like state vs. artifacts, or the difference between delegation and workflow agents) if anything was unclear or you'd like more examples.
2.  **Discuss Next Steps & Advanced Topics:** Talk about how you might apply these concepts to build *your* specific agent. We could also briefly discuss more advanced ADK features, such as:
    *   Persistent Storage (`DatabaseSessionService`, `GcsArtifactService`)
    *   Long-Term Memory (`MemoryService`)
    *   `CustomAgent`s (for non-LLM-based logic)
    *   More complex Multi-Agent patterns.
3.  **Q&A:** Address any specific questions you have about ADK, agent development in general, or anything we've covered.

Let me know what would be most helpful for you at this point!

In [25]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's refine the explanation of Persistence.

Okay, Analyst. We've successfully deployed the agent to Cloud Run, but there's a critical consideration for making it truly robust: **Persistence**.

Throughout the tutorial, we relied on ADK's default `InMemorySessionService` and `InMemoryArtifactService`. These are excellent for getting started and local testing because they require zero setup. However, they have one significant drawback:

*   **`InMemory` Limitation:** All session history, conversation state, and saved artifacts are stored only in the running application's memory. **If the application stops or restarts (which happens frequently in cloud environments like Cloud Run), all that data is lost.**

For any agent intended for real use, where conversations might span multiple interactions or where generated files need to be kept, we need persistent storage.

**Stage 14: Understanding Persistent Storage (Session & Artifact Services)**

**Concept:** ADK is designed to work with different backend storage systems. It achieves this through swappable `SessionService` and `ArtifactService` implementations that save data durably, ensuring it survives application restarts and can potentially be shared across multiple running instances of your agent (crucial for scaling).

**Why Use Persistent Storage?**

*   **Conversation Continuity:** Users can pick up conversations later, even if the server restarted or they connect to a different instance. The agent retains the history and state.
*   **Long-Term Data:** User-specific preferences (`user:` state) or files (`user:` artifacts) remain associated with the user across different sessions and times.
*   **Scalability & Reliability:** Allows multiple instances of your deployed agent (e.g., scaled up on Cloud Run) to access the same, consistent session data. Protects against data loss from crashes or restarts.

**ADK's Persistent Service Options (Examples based on Documentation):**

1.  **`DatabaseSessionService`:**
    *   **Stores:** Session metadata (ID, user, app), the list of `Events` (history), and the `state` dictionary (including `app:`, `user:`, and session scopes).
    *   **Where:** A configured relational database (e.g., PostgreSQL, MySQL, Cloud Spanner via appropriate drivers, or even SQLite for simple local persistence).
    *   **Setup:** Requires providing a database connection URL. ADK often uses libraries like SQLAlchemy, which might handle schema creation/migration. You'll need the relevant database driver installed (e.g., `psycopg2` for PostgreSQL).
    *   **Use Case:** Suitable for deployments where you manage your own database infrastructure or use managed database services.

2.  **`VertexAiSessionService`:**
    *   **Stores:** Session metadata, event history, and state. Managed by Google Cloud.
    *   **Where:** Google Cloud's scalable Vertex AI backend infrastructure.
    *   **Setup:** Requires configuring your environment for Google Cloud (Project ID, Location, Credentials with Vertex AI permissions). The `app_name` used when interacting with this service should typically be the resource name of your deployed Vertex AI Agent Engine.
    *   **Use Case:** Ideal for fully managed deployments on Google Cloud, especially when deploying *to* Vertex AI Agent Engine itself, leveraging its integrated session management.

3.  **`GcsArtifactService`:**
    *   **Stores:** Artifact data (the actual binary content of files like images, PDFs, etc.).
    *   **Where:** A specified Google Cloud Storage (GCS) bucket.
    *   **Setup:** Requires providing the GCS bucket name during instantiation. The agent's runtime environment (e.g., the Cloud Run service account) needs IAM permissions (like `roles/storage.objectAdmin`) to read and write objects in that bucket.
    *   **Use Case:** The standard way to handle persistent file storage for ADK agents deployed on Google Cloud.

**How to Use (Configuring the `Runner`):**

The key step is to instantiate the desired persistent service(s) and provide them to the `Runner` when you initialize it in your application's entry point (e.g., your FastAPI `main.py` if deploying manually, or handled by `adk deploy` configurations).

```python
# Conceptual Example - Configuring Runner with Persistent Services

# Necessary Imports
from google.adk.sessions import DatabaseSessionService # Example
from google.adk.artifacts import GcsArtifactService
from google.adk.runners import Runner
# Assume 'root_agent' is defined elsewhere

# --- Configuration (Replace with your actual values) ---
# Example DB URL (ensure driver like 'psycopg2' is installed if using PostgreSQL)
DATABASE_CONNECTION_URL = "postgresql+psycopg2://db_user:db_password@db_host:5432/adk_sessions_db"
GCS_ARTIFACT_BUCKET_NAME = "your-gcs-bucket-for-adk-artifacts" # Must exist in your GCP project
PERSISTENT_APP_NAME = "my_persistent_agent_app" # Consistent name for the app

# --- Instantiate Persistent Services ---
try:
    # Session Service for DB
    persistent_session_service = DatabaseSessionService(db_url=DATABASE_CONNECTION_URL)
    # Optional: Add logic here to ensure DB schema exists if not automatically handled
    print(f"DatabaseSessionService configured for: {DATABASE_CONNECTION_URL}")

    # Artifact Service for GCS
    persistent_artifact_service = GcsArtifactService(bucket_name=GCS_ARTIFACT_BUCKET_NAME)
    # Ensure runtime environment (e.g., Cloud Run service account) has GCS permissions
    print(f"GcsArtifactService configured for bucket: {GCS_ARTIFACT_BUCKET_NAME}")

    # --- Configure Runner with these Services ---
    persistent_runner = Runner(
        agent=root_agent,
        app_name=PERSISTENT_APP_NAME, # Use the consistent application name
        session_service=persistent_session_service,   # <<< Use persistent service
        artifact_service=persistent_artifact_service, # <<< Use persistent service
    )
    print("Runner configured successfully with persistent Session and Artifact services.")

    # --- How it affects runtime ---
    # Now, when this 'persistent_runner' handles requests:
    # - session_service.create_session / get_session reads/writes from/to the database.
    # - session_service.append_event saves events and applies state changes to the database.
    # - context.save_artifact / load_artifact (called within tools/callbacks) will
    #   interact with the specified GCS bucket via persistent_artifact_service.

except ImportError as ie:
     print(f"Error configuring services: Missing library? (e.g., install 'psycopg2-binary' for PostgreSQL) Details: {ie}")
except Exception as e:
    print(f"Error configuring persistent services: {e}")
    print("Check connection strings, bucket names, cloud permissions, and required libraries.")
    persistent_runner = None # Prevent use if setup failed

# --- Example Usage (Conceptual) ---
# if persistent_runner:
#    # Start your web server (e.g., uvicorn) or use runner directly
#    # await persistent_runner.run_async(...)
#    pass

```

**Key Changes with Persistent Services:**

*   **Data Durability:** Session history, state (except keys prefixed with `temp:`), and artifacts saved via context methods *will* survive application restarts, crashes, or scaling events.
*   **State Scopes Matter More:** Prefixes like `user:` and `app:` become truly meaningful, allowing data sharing across sessions or users as intended, managed by the persistent service.
*   **Setup Complexity:** Requires configuring external resources (database, GCS bucket) and managing credentials/permissions correctly.

**Summary of Stage 14:**
You have now:
*   Understood the critical difference between `InMemory` services (for testing) and persistent services (for real applications).
*   Learned *why* persistence is necessary for conversation continuity, user data, scalability, and reliability.
*   Become familiar with ADK's persistent service options described in the documentation: `DatabaseSessionService`, `VertexAiSessionService`, and `GcsArtifactService`.
*   Seen conceptually how to instantiate these services and configure the `Runner` to use them.
*   Recognized the implications for data durability and state scoping when using persistent backends.

Thinking about persistence *before* or *during* deployment is crucial for building agents that function correctly and reliably in production environments.

This concludes the core tutorial path based on the provided documentation! We've journeyed from a simple "Hello, World" agent to a multi-agent system with tools, state, callbacks, workflows, artifacts, evaluation, cloud deployment, and now, an understanding of persistence.

What would be the most valuable next step for you?

1.  **Review & Consolidate:** Revisit any stage (1-14) or concept.
2.  **Discuss Your Use Case:** How can you apply these ADK features to the specific agent *you* want to build?
3.  **Explore Specific Docs:** Point you towards documentation for features we only touched upon (e.g., `MemoryService`, specific tool wrappers, `CustomAgent`).
4.  **End Session:** Conclude if you feel equipped with the foundational knowledge.

In [26]:
## add to chat
prompt = ("Analyst: yes I am ready for the next step.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, Analyst! We've reached the end of the structured tutorial based on the core concepts presented in the ADK documentation.

**Let's quickly recap the journey:**

You started with the basics and successfully progressed through:

*   **Agent Creation:** Defining simple `LlmAgent`s.
*   **Tools:** Giving agents capabilities with `FunctionTool`.
*   **State Management:** Enabling short-term memory within sessions using `ToolContext` and `session.state`.
*   **Callbacks:** Intercepting and controlling execution flow (`before_model_callback`, `before_tool_callback`).
*   **Multi-Agent Systems:** Building a team with delegation (`sub_agents`).
*   **Workflow Agents:** Enforcing deterministic execution order (`SequentialAgent`, `ParallelAgent`, `LoopAgent`).
*   **Artifacts:** Handling file-like data (`save_artifact`, `load_artifact`).
*   **Evaluation:** Programmatically testing agent behavior (`AgentEvaluator`, `pytest`).
*   **Deployment:** Running the agent on the cloud (`Cloud Run` via `adk deploy`).
*   **Persistence:** Understanding the need for persistent storage (`DatabaseSessionService`, `GcsArtifactService`, etc.) beyond `InMemory` options.

You now have a solid grasp of the fundamental building blocks and the development lifecycle within the Agent Development Kit.

Since we've covered the core sequence, the "next step" is really about **what would be most valuable for *you* to focus on now**. How would you like to proceed? Here are some options:

1.  **Review & Reinforce:** Would you like to revisit any of the stages (1-14) or concepts (e.g., the difference between state and artifacts, how callbacks control flow, the types of workflow agents)? We can go over specific examples again or clarify details.
2.  **Discuss Your Specific Goals:** We could talk about the kind of agent *you* ultimately want to build. How might the ADK features we've covered apply to your use case? What challenges do you anticipate?
3.  **Explore Advanced Topics (Overview):** I can provide a high-level overview of some features we didn't dive deep into, explaining *what* they are and *when* you might use them:
    *   **`MemoryService`:** For long-term, cross-session memory search.
    *   **`CustomAgent`:** For building agents with unique, non-LLM-driven logic.
    *   **Other Tool Integrations:** Deeper look at OpenAPI/API Hub tools, LangChain/CrewAI wrappers, or specific Google Cloud tools.
    *   **Streaming:** Building real-time voice/video agents.
4.  **Documentation Deep Dive:** If you have a specific area of interest, I can point you to the most relevant sections in the ADK documentation we reviewed earlier.
5.  **Conclude This Session:** If you feel comfortable with the foundation we've built and prefer to explore further on your own using the documentation and examples.

Let me know what direction would be most helpful for you now!

In [29]:
## add to chat
prompt = ("Analyst: Lets step through the advanced topics: MemoryService and then CustomAgent")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, great choices! Let's delve into those advanced topics, starting with `MemoryService`.

**Advanced Topic 1: `MemoryService` - Long-Term, Searchable Knowledge**

**Concept:** We've mastered `session.state` for short-term memory within a *single* conversation. But what happens when a user starts a *new* conversation tomorrow and asks, "What was that project name we discussed last time?" Or, what if the agent needs to consult a large database of product manuals or company policies that aren't part of the immediate chat? This is where **Memory** and the `MemoryService` come in.

*   **Memory vs. Session State:**
    *   `Session State`: Volatile (unless persisted via `SessionService`), tied to *one specific session*, acts like a **scratchpad** for the current interaction, accessed by **key**.
    *   `Memory`: Designed for **long-term persistence**, potentially spanning *multiple sessions* for a user or holding shared knowledge, acts like a **searchable archive/library**, accessed by **query** (often semantic search).
*   **Purpose:** To provide agents with a broader context than just the current conversation, enabling:
    *   **Continuity:** Recalling user preferences, past decisions, or key facts mentioned in previous, separate conversations.
    *   **Knowledge Retrieval:** Searching through ingested documents, FAQs, product specifications, or other relevant data sources to answer questions accurately.
    *   **Personalization:** Tailoring responses based on a longer history of user interactions.
*   **The `MemoryService` (`BaseMemoryService` Interface):** This is the component responsible for managing the long-term, searchable knowledge store. Its core functions are:
    *   `add_session_to_memory(session)`: To **ingest** information. Takes data from a `Session` (often after it's completed) and processes/indexes relevant parts (like text from events) into the memory store. *How* it processes and indexes depends heavily on the specific implementation.
    *   `search_memory(app_name, user_id, query)`: To **retrieve** information. Takes a natural language `query` and searches the memory store for the most relevant pieces of information associated with the `user_id` (and potentially `app_name`). It returns matching results, typically ranked by relevance.
*   **Implementations (from Documentation):**
    *   `InMemoryMemoryService`: Stores extracted information in the application's memory and performs simple **keyword matching** for search. **Not persistent** (data lost on restart). Useful only for basic local testing.
    *   `VertexAiRagMemoryService`: Leverages Google Cloud's Vertex AI RAG service. Ingests data into a RAG Corpus and performs powerful **semantic search** (finding results based on meaning, not just keywords). **Persistent and scalable**. Requires GCP setup, a RAG Corpus, `pip install google-adk[vertexai]`, and proper authentication. Ideal for production on Google Cloud.
*   **Typical Workflow:**
    1.  **Configuration:** You instantiate a `MemoryService` (e.g., `VertexAiRagMemoryService`) and provide it to the `Runner` during setup.
    2.  **Ingestion:** Your application logic determines when to enrich the memory, often by calling `memory_service.add_session_to_memory(completed_session)` after a conversation ends or at meaningful milestones.
    3.  **Retrieval Need:** In a later session, the user asks something requiring past context ("Remind me about Project Alpha details").
    4.  **Agent Uses Memory Tool:** The `LlmAgent`'s instructions guide it to use a specific tool designed for memory retrieval when needed (e.g., the built-in `load_memory` tool or a custom one like `recall_past_info` below). The LLM generates a query for the tool (e.g., `query="Project Alpha details from past discussions"`).
    5.  **Tool Calls Service:** The tool function receives the query and uses `tool_context.search_memory(query=...)`. This method, provided by the context object, interacts with the `MemoryService` instance configured in the `Runner`.
    6.  **Service Searches:** The `MemoryService` searches its underlying store (using keyword or semantic search) and returns relevant results (e.g., `SearchMemoryResponse` containing `MemoryResult` objects with text snippets).
    7.  **Tool Returns Results:** The tool packages the retrieved snippets into its dictionary return value.
    8.  **Agent Synthesizes Answer:** The `LlmAgent` receives the results from the tool and uses this retrieved information, along with the current conversation context, to generate the final answer for the user.

**Conceptual Code Example (Agent using a Memory Tool):**

This focuses on how an agent and tool interact with the memory system via `ToolContext`. We assume a `MemoryService` is configured in the `Runner`.

```python
# Conceptual Code - Agent using a Memory Tool

from google.adk.agents import Agent
from google.adk.tools import FunctionTool, ToolContext
from google.adk.memory import SearchMemoryResponse # Assuming this structure exists
from typing import Dict, Any, List

# --- Assume MemoryService is configured in Runner ---
# runner = Runner(..., memory_service=my_memory_service_instance)
# Assume data was previously added via memory_service.add_session_to_memory(...)

# --- Define a Tool that uses MemoryService via ToolContext ---
def recall_past_info(query: str, tool_context: ToolContext) -> dict:
    """
    Searches the user's long-term memory archive for information relevant to the query.
    Use this tool when the user explicitly asks about past interactions, preferences,
    stored facts, or details from previous conversations (e.g., 'What did we discuss about X?',
    'Remind me of my preference for Y', 'Find my notes on Z').

    Args:
        query (str): A natural language query describing the information to retrieve from memory.

    Returns:
        A dictionary containing the search status and results.
        On success: {'status': 'found', 'results': ['Relevant snippet 1', 'Relevant snippet 2', ...]}
        If not found: {'status': 'not_found', 'message': 'No relevant information found in memory archive.'}
        On error: {'status': 'error', 'error_message': 'Details about the error.'}
    """
    print(f"--- Tool: recall_past_info searching memory for: '{query}' ---")
    try:
        # tool_context provides access to the MemoryService configured in the Runner
        # This triggers the search_memory method of the service instance.
        search_response: SearchMemoryResponse = tool_context.search_memory(query=query)

        # Process the response (structure might vary based on MemoryService implementation)
        if search_response and search_response.results:
            # Example: Extracting text snippets from results
            # The actual structure of 'res' depends on the MemoryService implementation.
            # For VertexAiRagMemoryService, results might be different.
            # This is a simplified placeholder extraction logic.
            extracted_snippets: List[str] = []
            for res in search_response.results:
                 # Adapt this based on the actual result object structure
                 if hasattr(res, 'text') and res.text:
                     extracted_snippets.append(res.text)
                 elif hasattr(res, 'content') and res.content: # Another possible structure
                     extracted_snippets.append(str(res.content))

            if extracted_snippets:
                print(f"--- Tool: Found {len(extracted_snippets)} relevant snippet(s) in memory. ---")
                return {"status": "found", "results": extracted_snippets}
            else:
                 print(f"--- Tool: Found results object, but no extractable text content. ---")
                 return {"status": "not_found", "message": "Relevant information structure found, but no text content available."}
        else:
            print(f"--- Tool: No relevant snippets found in memory. ---")
            return {"status": "not_found", "message": "No relevant information found in your memory archive."}

    except ValueError as e:
         # This error often occurs if MemoryService wasn't configured in the Runner
         print(f"--- Tool ERROR: Failed to search memory: {e} (Is MemoryService configured in Runner?) ---")
         return {"status": "error", "error_message": f"Configuration error accessing memory: {e}"}
    except Exception as e:
        # Catch other potential errors during the search (e.g., network issues with cloud service)
        print(f"--- Tool ERROR: Unexpected error searching memory: {e} ---")
        return {"status": "error", "error_message": f"Unexpected error during memory search: {e}"}

# Wrap the function into an ADK Tool
memory_tool = FunctionTool(func=recall_past_info)

# --- Define an Agent that uses the Memory Tool ---
memory_enabled_agent = Agent(
    name="knowledgeable_assistant",
    model="gemini-2.0-flash-001", # Or another suitable model
    instruction="You are an assistant with access to a long-term memory archive about the user's past interactions and preferences. "
                "Answer questions directly if you know the answer. "
                "However, if the user asks to recall specific details from the past, previous discussions, "
                "stored preferences, or information they previously provided (e.g., 'What did I say about X?', 'Remind me of Y', 'Find my notes on Z'), "
                "you MUST use the 'recall_past_info' tool to search the memory archive. "
                "Synthesize the results from the tool into your answer.",
    tools=[memory_tool], # Provide the memory access tool
)

# --- Note on Running ---
# To run this agent effectively:
# 1. An appropriate MemoryService instance (e.g., InMemoryMemoryService or VertexAiRagMemoryService)
#    must be created and passed to the Runner: `Runner(..., memory_service=...)`.
# 2. Data must have been previously ingested into that MemoryService using
#    `memory_service.add_session_to_memory(...)`.
# 3. Then, running the agent with a query like "What was mentioned about Project Alpha?"
#    should trigger the 'recall_past_info' tool.

```

**Key Takeaway for `MemoryService`:** It provides a **searchable** (often semantically), potentially **persistent**, **cross-session** knowledge layer. Agents access it indirectly via **Tools** that utilize the `tool_context.search_memory` method, which in turn calls the `MemoryService` instance configured in the `Runner`. This complements the short-term, key-value nature of `session.state`.

Does this explanation of `MemoryService` and how agents interact with it make sense? Are you ready to move on to `CustomAgent`?

In [30]:
## add to chat
prompt = ("Analyst: yes I am ready for CustomAgent.")
response = chat.send_message(message=prompt)

# self reflection step
p = (
  "Before the analyst views this conduct a thorough review of your output. "
  "Correct any errors or make improvements and return your output. "
)
response = chat.send_message(message=p)
display(Markdown(response.text))

Okay, let's refine the `CustomAgent` explanation for the analyst.

Okay, let's tackle **`CustomAgent`**. This is an advanced topic for situations where the predefined agent types (`LlmAgent`, `SequentialAgent`, `ParallelAgent`, `LoopAgent`) don't provide the exact control flow you need. Use `CustomAgent` when you want to orchestrate other agents or perform actions using **your own specific Python logic**.

**Advanced Topic 2: `CustomAgent` - Defining Your Own Execution Logic**

**Concept:** We've primarily used:

*   `LlmAgent`: Uses an LLM for flexible, dynamic reasoning and decision-making (including tool use and delegation). Its flow is non-deterministic.
*   `WorkflowAgent`s (`Sequential`, `Parallel`, `Loop`): Execute sub-agents according to fixed, predictable patterns. They provide deterministic flow control *without* using an LLM for the orchestration itself.

You need a `CustomAgent` when your desired orchestration logic involves:

*   **Complex Conditional Branching:** Running different sub-agents or sequences based on intricate conditions evaluated in Python code (e.g., checking multiple state variables, external factors).
*   **Custom Retry or Error Handling Logic:** Implementing sophisticated retry mechanisms for sub-agent failures beyond simple looping.
*   **Hybrid Logic:** Combining deterministic steps with calls to external systems, databases, or custom computations *directly within the orchestration flow* itself, rather than solely within tools called by an `LlmAgent`.
*   **Unique Workflow Patterns:** Creating orchestration structures that don't map cleanly onto simple sequential, parallel, or loop constructs.

**How it works:**

1.  **Inherit from `BaseAgent`:** Your custom orchestrator must be a Python class that inherits directly from `google.adk.agents.BaseAgent`.
2.  **Implement `_run_async_impl`:** This is the heart of your custom agent. You **must** override the asynchronous method:
    ```python
    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        # Your custom orchestration logic goes here
        pass
    ```
    *   **`async def`:** It must be an asynchronous generator function.
    *   **`ctx: InvocationContext`:** It receives the full `InvocationContext`, providing access to `ctx.session` (including `.state` and `.events`), `ctx.agent` (this custom agent instance), `ctx.invocation_id`, configured services (`artifact_service`, `memory_service`), etc.
    *   **`-> AsyncGenerator[Event, None]`:** It **must** `yield` `Event` objects. This is crucial for:
        *   Passing events generated by any sub-agents it calls up to the `Runner`.
        *   Signaling its own state changes or results via custom `Event`s.
        *   Cooperating with the ADK `Runner`'s event loop (pausing and resuming correctly).
3.  **Define Custom Logic within `_run_async_impl`:** Inside this method, you write standard Python `asyncio` code:
    *   **Store and Call Sub-Agents:** If your custom agent manages other agents, pass them during `__init__` and store them as instance attributes (e.g., `self.checker_agent`). Call their execution methods (usually `sub_agent.run_async(ctx)`) and properly handle their events using `async for`:
        ```python
        print(f"--- {self.name}: Calling {self.checker_agent.name} ---")
        async for event in self.checker_agent.run_async(ctx):
            # Process or log the event from the sub-agent if needed
            print(f"  Yielding event from {event.author}...")
            yield event # <<< Crucial: Yield sub-agent events up to the Runner
        # Execution resumes here after checker_agent finishes AND its last event is processed by Runner
        ```
    *   **Access and Use State:** Read values from `ctx.session.state` to make decisions. You can *write* to state (e.g., `ctx.session.state['my_flag'] = True`), but remember the change is only reliably committed *after* an `Event` containing the corresponding `state_delta` (which you might need to yield yourself or which might be part of a sub-agent's event) is processed by the `Runner`.
    *   **Implement Control Flow:** Use Python's `if`/`elif`/`else`, `for`/`while` loops, `try`/`except` blocks, etc., to build your specific workflow involving sub-agent calls or other actions.
    *   **Yield Custom Events:** Your `CustomAgent` can `yield` its own `Event` objects (e.g., `yield Event(author=self.name, content=..., actions=...)`) to report status, intermediate results, or explicitly signal state changes.

**Implementation Example (Conditional Workflow):**

Let's create a custom agent (`ConditionalWorkflowAgent`) that:
1.  Runs a sub-agent (`task_a`) which sets a specific state key (`task_a_outcome`).
2.  Reads the value of `task_a_outcome` from the state *after* `task_a` completes.
3.  Based on that value, it conditionally runs *either* `task_b` (if outcome was 'success') *or* `task_c` (otherwise).

```python
# filename: my_first_agent/agent.py (Updated for Custom Agent)

from google.adk.agents import Agent, BaseAgent, InvocationContext # <<< BaseAgent, InvocationContext
from google.adk.events import Event, EventActions
from google.genai import types
import asyncio
from typing import AsyncGenerator, List, Optional # <<< AsyncGenerator

# --- Define Simple Sub-Agents ---
# These agents will be orchestrated by our CustomAgent.
# Task A sets a state key via output_key.
agent_task_a = Agent(
    name="TaskA_SetOutcome",
    model="gemini-2.0-flash-001",
    instruction="Set the state key 'task_a_outcome' to the value 'success'. Confirm this action.",
    # output_key automatically creates state_delta={'task_a_outcome': 'success'}
    # in the agent's final event.
    output_key="task_a_outcome",
)

# Task B runs if Task A succeeded.
agent_task_b = Agent(
    name="TaskB_SuccessPath",
    model="gemini-2.0-flash-001",
    instruction="Confirm that you are Task B, running because the previous step succeeded.",
    output_key="final_result" # Overwrites state['final_result']
)

# Task C runs if Task A did not succeed.
agent_task_c = Agent(
    name="TaskC_FailurePath",
    model="gemini-2.0-flash-001",
    instruction="Confirm that you are Task C, running because the previous step did NOT result in 'success'.",
    output_key="final_result" # Overwrites state['final_result']
)


# --- Define the Custom Agent ---
class ConditionalWorkflowAgent(BaseAgent):
    """
    A Custom Agent that runs Task A, then runs Task B or Task C
    based on the 'task_a_outcome' state set by Task A.
    """
    # Define attributes for sub-agents (helps with type checking and clarity)
    task_a: Agent
    task_b: Agent
    task_c: Agent

    # Pydantic configuration to allow Agent objects as attributes
    model_config = {"arbitrary_types_allowed": True}

    def __init__(self, name: str, task_a: Agent, task_b: Agent, task_c: Agent):
        """Initializes the custom agent, storing sub-agents."""
        # List the agents this custom agent directly orchestrates
        direct_sub_agents = [task_a, task_b, task_c]

        # Call super().__init__ FIRST.
        # Pass keyword arguments for Pydantic validation and assignment (self.task_a = task_a).
        # Pass the list of sub-agents to the 'sub_agents' parameter for framework hierarchy.
        super().__init__(
            name=name,
            task_a=task_a,
            task_b=task_b,
            task_c=task_c,
            sub_agents=direct_sub_agents
        )
        print(f"Custom Agent '{self.name}' initialized.")
        print(f"  Orchestrates: {[sa.name for sa in self.sub_agents]}")

    # <<< Override the core execution method >>>
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """Implements the custom conditional workflow logic."""
        print(f"\n--- Custom Agent '{self.name}' Starting (Invocation ID: {ctx.invocation_id}) ---")

        # === Step 1: Always run Task A ===
        print(f"--- {self.name}: Running sub-agent '{self.task_a.name}'... ---")
        # Use 'async for' to properly yield all events from the sub-agent's run
        async for event in self.task_a.run_async(ctx):
            print(f"  Yielding event from '{event.author}' (Final: {event.is_final_response()})")
            yield event
        # ---- Task A finished execution ----
        # The Runner processed Task A's final event, including committing its state_delta.

        # === Step 2: Read State set by Task A ===
        # Access the session state via the InvocationContext 'ctx'
        # Use .get() for safe access in case the key wasn't set
        outcome = ctx.session.state.get("task_a_outcome", "unknown")
        print(f"--- {self.name}: Read state 'task_a_outcome' = '{outcome}' ---")

        # === Step 3: Conditional Execution based on State ===
        chosen_task: Optional[Agent] = None # Variable to hold the next task
        if outcome == "success":
            print(f"--- {self.name}: Condition 'success' met. Choosing '{self.task_b.name}'. ---")
            chosen_task = self.task_b
        else:
            print(f"--- {self.name}: Condition NOT 'success' ('{outcome}'). Choosing '{self.task_c.name}'. ---")
            chosen_task = self.task_c

        # === Step 4: Execute the chosen task ===
        if chosen_task:
            print(f"--- {self.name}: Running chosen sub-agent '{chosen_task.name}'... ---")
            async for event in chosen_task.run_async(ctx):
                print(f"  Yielding event from '{event.author}' (Final: {event.is_final_response()})")
                yield event
            # --- Chosen task (B or C) finished execution ---
        else:
             # Should not happen in this logic, but good practice
             print(f"--- {self.name}: No task chosen based on outcome '{outcome}'. ---")

        print(f"--- Custom Agent '{self.name}' Finished ---")
        # Optionally yield a final status event from the custom agent itself
        yield Event(author=self.name, content=types.Content(parts=[types.Part(text="Conditional workflow complete.")]))


# --- Instantiate the Custom Agent as the Root Agent ---
root_agent = ConditionalWorkflowAgent(
    name="my_conditional_workflow",
    task_a=agent_task_a,
    task_b=agent_task_b,
    task_c=agent_task_c,
)

# --- Running this Agent ---
# The `adk run my_first_agent` command will now execute this CustomAgent.
# You still need the supporting code structure (Runner, SessionService)
# if running programmatically, like in the commented-out example in the previous response.
# `adk run` handles that setup implicitly.

print(f"\nRoot agent is now '{root_agent.name}' (Custom Agent).")

```

**Running and Observing:**

1.  Go back to your terminal (in `adk_tutorial`), activate `myenv`.
2.  Run the agent:

```bash
adk run my_first_agent
```

Send any prompt to start the custom workflow, e.g.:

*   `Start`

**Expected Output & Observation:**

1.  The `my_conditional_workflow` (`CustomAgent`) starts.
2.  Log: `--- my_conditional_workflow: Running sub-agent 'TaskA_SetOutcome'... ---`
3.  `TaskA_SetOutcome` runs. It uses its LLM to generate the confirmation and its `output_key` ensures its final event includes `state_delta={'task_a_outcome': 'success'}`. You'll see events yielded from it.
4.  The `CustomAgent` resumes *after* Task A's final event is processed by the Runner (committing the state).
5.  Log: `--- my_conditional_workflow: Read state 'task_a_outcome' = 'success' ---`
6.  Log: `--- my_conditional_workflow: Condition 'success' met. Choosing 'TaskB_SuccessPath'. ---`
7.  Log: `--- my_conditional_workflow: Running chosen sub-agent 'TaskB_SuccessPath'... ---`
8.  `TaskB_SuccessPath` runs, uses its LLM, and yields its events (including its final response confirming it's Task B).
9.  The `CustomAgent` resumes after Task B finishes.
10. Log: `--- Custom Agent 'my_conditional_workflow' Finished ---`
11. The `CustomAgent` yields its final completion event.
12. The final response displayed by `adk run` will be from the last agent in the executed path (Task B in this case).

If you were to modify `agent_task_a`'s instruction to set the outcome differently (e.g., "Set the state key 'task_a_outcome' to 'pending'"), rerunning the workflow would cause the logs to show Task C being chosen and executed instead of Task B. This confirms the custom conditional logic is working deterministically based on the state set by the first step.

**Key Takeaways for `CustomAgent`:**

*   **Use Case:** Needed for complex, deterministic orchestration logic that goes beyond standard `WorkflowAgent` patterns.
*   **Inheritance:** Must inherit from `BaseAgent`.
*   **Core Method:** You **must** implement `async def _run_async_impl(self, ctx: InvocationContext)`.
*   **Sub-Agent Calls:** Use `async for event in sub_agent.run_async(ctx): yield event` to correctly run sub-agents and propagate their events.
*   **State Interaction:** Use `ctx.session.state` to read state for making decisions. Be mindful that state writes are committed by the `Runner` *after* an event with the `state_delta` is yielded.
*   **`yield Event`:** Essential for pausing execution, allowing the `Runner` to process events (including state/artifact changes), and communicating results or status.

`CustomAgent` offers the highest degree of control over agent execution flow but requires careful management of the asynchronous `asyncio` model and the ADK event lifecycle.

We've now explored all the major agent types (`LlmAgent`, `SequentialAgent`, `ParallelAgent`, `LoopAgent`, `CustomAgent`) along with tools, state, callbacks, artifacts, evaluation, deployment, and persistence! This provides a very comprehensive overview of the ADK framework based on the documentation.

What would be the most helpful final step for you?

1.  **Quick Recap Quiz:** A few questions to solidify understanding of all concepts.
2.  **Discuss Your Project:** Brainstorm applying these features to your specific needs.
3.  **Documentation Pointers:** Highlight specific documentation pages for deeper dives.
4.  **Conclude:** End the tutorial session.