# 🧪 ADK with A2A Application Testing

This notebook demonstrates how to test an ADK (Agent Development Kit) application that implements the Agent2Agent (A2A) protocol.
It covers local and remote testing for both Agent Engine and Cloud Run deployments.

## Install dependencies

In [None]:
!pip install google-cloud-aiplatform a2a-sdk --upgrade

In [None]:
# Uncomment the following lines if you're not using the virtual environment created by uv
# import sys
# sys.path.append("../")

### Import libraries

In [None]:
# ruff: noqa
import asyncio
import json
import os
import requests
import uuid

import vertexai
from a2a.types import (
    Message,
    MessageSendParams,
    Part,
    Role,
    SendStreamingMessageRequest,
    TextPart,
)
from IPython.display import Markdown, display

from tests.helpers import (
    build_get_request,
    build_post_request,
    poll_task_completion,
)

### Initialize Vertex AI Client

In [19]:
# Region handed to the Vertex AI client below.
LOCATION = "us-central1"

# Only the location is passed explicitly when building the client.
client = vertexai.Client(location=LOCATION)

INFO:google_genai._api_client:The user provided project/location will take precedence over the Vertex AI API key from the environment variable.


## If you are using Agent Engine
See more documentation at [Agent Engine Overview](https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/overview)

### Remote Testing

In [20]:
# Full resource name of the deployed Agent Engine, for example
# "projects/PROJECT_ID/locations/us-central1/reasoningEngines/ENGINE_ID".
# Leave as None to auto-detect it from ../deployment_metadata.json.
REASONING_ENGINE_ID = None

if REASONING_ENGINE_ID is None:
    try:
        with open("../deployment_metadata.json") as f:
            metadata = json.load(f)
            REASONING_ENGINE_ID = metadata.get("remote_agent_engine_id")
    except (FileNotFoundError, json.JSONDecodeError) as exc:
        # Say why auto-detection failed instead of silently continuing.
        print(f"Could not read ../deployment_metadata.json: {exc}")

if not REASONING_ENGINE_ID:
    # Stop here with a clear message rather than passing None to the client.
    raise ValueError(
        "REASONING_ENGINE_ID is not set. Set it manually in this cell or provide "
        "../deployment_metadata.json with a 'remote_agent_engine_id' entry."
    )

print(f"Using REASONING_ENGINE_ID: {REASONING_ENGINE_ID}")
# Get the existing agent engine
remote_agent_engine = client.agent_engines.get(name=REASONING_ENGINE_ID)

Using REASONING_ENGINE_ID: projects/321369683273/locations/us-central1/reasoningEngines/8189008672047759360


INFO:httpx:HTTP Request: GET https://us-central1-aiplatform.googleapis.com/v1beta1/projects/321369683273/locations/us-central1/reasoningEngines/8189008672047759360 "HTTP/1.1 200 OK"


#### Fetch Agent Card

In [21]:
remote_a2a_agent_card = await remote_agent_engine.handle_authenticated_agent_card()
print(f"Agent: {remote_a2a_agent_card.name}")
print(f"URL: {remote_a2a_agent_card.url}")
print(f"Skills: {[s.description for s in remote_a2a_agent_card.skills]}")
print(f"Examples: {[s.examples for s in remote_a2a_agent_card.skills][0]}")

INFO:httpx:HTTP Request: GET https://us-central1-aiplatform.googleapis.com/v1beta1/projects/321369683273/locations/us-central1/reasoningEngines/8189008672047759360/a2a/v1/card "HTTP/1.1 200 OK"


Agent: root_agent
URL: https://us-central1-aiplatform.googleapis.com/v1beta1/projects/asp-starter-dev/locations/us-central1/reasoningEngines/8189008672047759360/a2a
Skills: ['An agent that provides weather and current time information. I am a helpful AI assistant designed to provide accurate and useful information.', 'Simulates a web search. Use it get information on weather.\n\nArgs:\n    query: A string containing the location to get weather information for.\n\nReturns:\n    A string with the simulated weather information for the queried location.', 'Simulates getting the current time for a city.\n\nArgs:\n    city: The name of the city to get the current time for.\n\nReturns:\n    A string with the current time information.']
Examples: None


#### Send Message

In [22]:
# Send the message using A2A protocol
message_data = {
    "messageId": f"msg-{os.urandom(8).hex()}",
    "role": "user",
    "parts": [{"kind": "text", "text": "What is the weather in New York?"}],
}

response = await remote_agent_engine.on_message_send(**message_data)

# Extract task object from response
task_object = None
for chunk in response:
    if isinstance(chunk, tuple) and chunk and hasattr(chunk[0], "id"):
        task_object = chunk[0]
        break

# Get task_id
if task_object:
    task_id = task_object.id
    print(f"Task started: {task_id}")
else:
    print("Could not retrieve the task object from the response.")

INFO:httpx:HTTP Request: POST https://us-central1-aiplatform.googleapis.com/v1beta1/projects/321369683273/locations/us-central1/reasoningEngines/8189008672047759360/a2a/v1/message:send "HTTP/1.1 200 OK"


Task started: de0d240e-b205-4b6d-989d-e06cf9c0d2c4


#### Poll for response

In [None]:
# Poll for task completion
max_attempts = 30
for attempt in range(max_attempts):
    result = await remote_agent_engine.on_get_task(id=task_id, historyLength=1)

    task_state = result.status.state if hasattr(result, "status") else None
    print(f"Attempt {attempt + 1}: {task_state}")

    if task_state == "completed":
        print("Task completed!")
        break
    elif task_state == "failed":
        print(f"Task failed: {result}")
        break

    await asyncio.sleep(1)

# Extract and display artifacts
if hasattr(result, "artifacts") and result.artifacts:
    for artifact in result.artifacts:
        if artifact.parts and hasattr(artifact.parts[0], "root"):
            text = getattr(artifact.parts[0].root, "text", None)
            if text:
                display(Markdown(f"**Answer**:\n {text}"))
            else:
                print("Could not extract text from artifact parts.")
        else:
            print("Could not extract text from artifact parts.")
else:
    print("No artifacts found in result")

INFO:httpx:HTTP Request: GET https://us-central1-aiplatform.googleapis.com/v1beta1/projects/321369683273/locations/us-central1/reasoningEngines/8189008672047759360/a2a/v1/tasks/de0d240e-b205-4b6d-989d-e06cf9c0d2c4?historyLength=1 "HTTP/1.1 200 OK"


Attempt 1: TaskState.failed
Task failed: artifacts=[] context_id='1aa5ca2a-76fe-436f-9915-c378eabbb7d9' history=[Message(context_id='1aa5ca2a-76fe-436f-9915-c378eabbb7d9', extensions=None, kind='message', message_id='msg-c0e33bf67d901225', metadata={}, parts=[Part(root=TextPart(kind='text', metadata=None, text='What is the weather in New York?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='de0d240e-b205-4b6d-989d-e06cf9c0d2c4'), Message(context_id='1aa5ca2a-76fe-436f-9915-c378eabbb7d9', extensions=None, kind='message', message_id='msg-c0e33bf67d901225', metadata={}, parts=[Part(root=TextPart(kind='text', metadata=None, text='What is the weather in New York?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='de0d240e-b205-4b6d-989d-e06cf9c0d2c4')] id='de0d240e-b205-4b6d-989d-e06cf9c0d2c4' kind='task' metadata=None status=TaskStatus(message=Message(context_id=None, extensions=None, kind='message', message_id='22adca1e-5071-41b6-a7db-49b594fe481f', metadata

#### Register Feedback

In [25]:
# Feedback payload for the deployed agent: score, comment, and the identifiers it refers to.
remote_feedback = {
    "score": 5,
    "text": "Great response!",
    "invocation_id": "test-invocation-123",
    "user_id": "test",
}
remote_agent_engine.register_feedback(feedback=remote_feedback)

INFO:httpx:HTTP Request: POST https://us-central1-aiplatform.googleapis.com/v1beta1/projects/321369683273/locations/us-central1/reasoningEngines/8189008672047759360:query "HTTP/1.1 200 OK"


### Local Testing

You can import the AgentEngineApp class directly into your environment.
To run the agent locally, follow these steps:
1. Make sure all required packages are installed in your environment
2. The recommended approach is to use the same virtual environment created by the 'uv' tool
3. You can set up this environment by running 'make install' from your agent's root directory
4. Then select this kernel (.venv folder in your project) in your Jupyter notebook to ensure all dependencies are available

#### Set up local Agent Engine

In [None]:
# Imported in this section rather than in the top import cell: `app` is the
# project package and needs the environment described in the steps above.
from app.agent_engine_app import AgentEngineApp

# create() is awaited to obtain the app instance; set_up() is then called on it
# before any of the local requests below are made.
local_agent_engine = await AgentEngineApp.create()
local_agent_engine.set_up()

  app = App(root_agent=root_agent, name="placeholder_name")
  agent_card_builder = AgentCardBuilder(
E0000 00:00:1760969331.273530  813806 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.
  agent_executor_builder=lambda: A2aAgentExecutor(runner=runner),
  self._config = config or A2aAgentExecutorConfig()
E0000 00:00:1760969333.220056  813806 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


ERROR:grpc._plugin_wrapping:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x164274500>" raised exception!
Traceback (most recent call last):
  File "/Users/eliasecchi/a2aae/a2a-ae-starter-temp/.venv/lib/python3.12/site-packages/grpc/_plugin_wrapping.py", line 106, in __call__
    self._metadata_plugin(
  File "/Users/eliasecchi/a2aae/a2a-ae-starter-temp/.venv/lib/python3.12/site-packages/google/auth/transport/grpc.py", line 95, in __call__
    callback(self._get_authorization_headers(context), None)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/eliasecchi/a2aae/a2a-ae-starter-temp/.venv/lib/python3.12/site-packages/google/auth/transport/grpc.py", line 81, in _get_authorization_headers
    self._credentials.before_request(
  File "/Users/eliasecchi/a2aae/a2a-ae-starter-temp/.venv/lib/python3.12/site-packages/google/auth/credentials.py", line 228, in before_request
    self._blocking_refresh(request)
  File "/Users/eliasecchi/

#### Verify Custom Method is Registered

In [2]:
# Show the operations the local app registers, to confirm the custom method is listed.
registered_operations = local_agent_engine.register_operations()
print(registered_operations)

{'a2a_extension': ['on_message_send', 'on_get_task', 'on_cancel_task', 'handle_authenticated_agent_card'], '': ['register_feedback']}


#### Fetch Agent Card

In [5]:
request = build_get_request(None)
response = await local_agent_engine.handle_authenticated_agent_card(
    request=request, context=None
)
print(response)

{'capabilities': {'streaming': False}, 'defaultInputModes': ['text/plain'], 'defaultOutputModes': ['text/plain'], 'description': 'An ADK Agent', 'name': 'root_agent', 'preferredTransport': 'HTTP+JSON', 'protocolVersion': '0.3.0', 'skills': [{'description': 'I am a helpful AI assistant designed to provide accurate and useful information.', 'id': 'root_agent', 'name': 'model', 'tags': ['llm']}, {'description': 'Simulates a web search. Use it get information on weather.\n\nArgs:\n    query: A string containing the location to get weather information for.\n\nReturns:\n    A string with the simulated weather information for the queried location.', 'id': 'root_agent-get_weather', 'name': 'get_weather', 'tags': ['llm', 'tools']}, {'description': 'Simulates getting the current time for a city.\n\nArgs:\n    city: The name of the city to get the current time for.\n\nReturns:\n    A string with the current time information.', 'id': 'root_agent-get_current_time', 'name': 'get_current_time', 'tags

#### Send Message

In [None]:
message_data = {
    "message": {
        "messageId": f"msg-{os.urandom(8).hex()}",
        "content": [{"text": "What is the weather in New York?"}],
        "role": "ROLE_USER",
    },
}
request = build_post_request(message_data)

response = await local_agent_engine.on_message_send(request=request, context=None)
print(response)

  run_args = convert_a2a_request_to_adk_run_args(
  parts=[part_converter(part) for part in request.message.parts],
  task_result_aggregator = TaskResultAggregator()
INFO:google_genai._api_client:The project/location from the environment variables will take precedence over the API key from the environment variables.
INFO:google_adk.google.adk.models.google_llm:Sending out request, model: gemini-2.5-flash, backend: GoogleLLMVariant.VERTEX_AI, stream: False
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:a2a.server.tasks.task_manager:Task not found or task_id not set. Creating new task for event (task_id: 58533967-b56f-4ff1-93bd-f006fbeb3da3, context_id: 14e2222c-7eb6-40ae-822d-372721f6709a).


{'task': {'id': '58533967-b56f-4ff1-93bd-f006fbeb3da3', 'contextId': '14e2222c-7eb6-40ae-822d-372721f6709a', 'status': {'state': 'TASK_STATE_SUBMITTED', 'message': {'messageId': 'msg-40f40f0d9775d3e5', 'contextId': '14e2222c-7eb6-40ae-822d-372721f6709a', 'taskId': '58533967-b56f-4ff1-93bd-f006fbeb3da3', 'role': 'ROLE_USER', 'content': [{'text': 'What is the weather in New York?'}], 'metadata': {}}}}}


E0000 00:00:1760969359.150990  814340 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.
INFO:google_adk.google.adk.models.google_llm:Response received from the model.
  for a2a_event in convert_event_to_a2a_events(
  message = convert_event_to_a2a_message(
  a2a_part = part_converter(part)
  for a2a_event in convert_event_to_a2a_events(
  message = convert_event_to_a2a_message(
  a2a_part = part_converter(part)
INFO:google_genai._api_client:The project/location from the environment variables will take precedence over the API key from the environment variables.
INFO:google_adk.google.adk.models.google_llm:Sending out request, model: gemini-2.5-flash, backend: GoogleLLMVariant.VERTEX_AI, stream: False
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
I0000 00:00:1760969359.814189  814737 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers
INFO:google_adk.google.adk.models.google_llm:Res

#### Poll for response

In [7]:
task_id = response["task"]["id"]
print(f"The Task ID is: {task_id}")

# Poll for completion using helper
final_response = await poll_task_completion(local_agent_engine, task_id)

# Extract and display artifacts
for artifact in final_response["artifacts"]:
    if artifact["parts"] and "text" in artifact["parts"][0]:
        display(Markdown(f"**Answer**:\n {artifact['parts'][0]['text']}"))
    else:
        print("Could not extract text from artifact parts.")

The Task ID is: 58533967-b56f-4ff1-93bd-f006fbeb3da3


**Answer**:
 The weather in New York is 90 degrees and sunny.

#### Register Feedback

In [8]:
# Feedback payload for the local app: score, comment, and the identifiers it refers to.
local_feedback = {
    "score": 5,
    "text": "Great response!",
    "invocation_id": "test-invocation-123",
    "user_id": "test",
}
local_agent_engine.register_feedback(feedback=local_feedback)

E0000 00:00:1760969375.729896  813806 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


## If you are using Cloud Run

### Remote Testing

For more information about authenticating HTTPS requests to Cloud Run services, see:
[Cloud Run Authentication Documentation](https://cloud.google.com/run/docs/triggering/https-request)

Remote testing involves using a deployed service URL instead of localhost.

Authentication is handled using GCP identity tokens instead of local credentials.

In [None]:
# Ask gcloud for an identity token for the active account.
gcloud_output = get_ipython().getoutput("gcloud auth print-identity-token -q")

# An identity token is a JWT: a single line with three dot-separated segments and no spaces.
# Picking the line by shape skips any warning or error text gcloud may print around it.
ID_TOKEN = next(
    (
        line.strip()
        for line in gcloud_output
        if line.strip().count(".") == 2 and " " not in line.strip()
    ),
    None,
)

if ID_TOKEN is None:
    # Surface gcloud's own output so the cause (e.g. not logged in) is visible.
    raise RuntimeError(
        "Could not obtain an identity token from gcloud. Output was:\n"
        + "\n".join(gcloud_output)
    )

In [None]:
# Replace with your Cloud Run service URL
SERVICE_URL = "YOUR_SERVICE_URL_HERE"

Send a message using the A2A protocol.

In [None]:
# Create A2A message request
message = Message(
    message_id=f"msg-user-{uuid.uuid4()}",
    role=Role.user,
    parts=[Part(root=TextPart(text="Hello! Weather in New York?"))],
)

request = SendStreamingMessageRequest(
    id=f"req-{uuid.uuid4()}",
    params=MessageSendParams(message=message),
)

# Set up headers with authentication
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {ID_TOKEN}"}

# Send the streaming request to the A2A endpoint.
# The context manager closes the streamed connection even if parsing fails.
with requests.post(
    SERVICE_URL,  # A2A endpoint is at the base URL
    headers=headers,
    json=request.model_dump(mode="json", exclude_none=True),
    stream=True,
    timeout=60,
) as response:
    print(f"Response status code: {response.status_code}")

    if not response.ok:
        # An error body is not an event stream; show it instead of silently printing nothing.
        print(f"Request failed: {response.text}")
    else:
        # Parse streaming A2A responses
        for line in response.iter_lines():
            if not line:
                continue
            line_str = line.decode("utf-8")
            # Accept "data:" with or without a following space; json.loads ignores leading whitespace.
            if line_str.startswith("data:"):
                event = json.loads(line_str[len("data:"):])
                print(f"Received event: {event}")

### Local Testing

> You can run the application locally via the `make local-backend` command.

Send a message to the local backend service using the A2A protocol and receive a streaming response.

In [None]:
# Create A2A message request
message = Message(
    message_id=f"msg-user-{uuid.uuid4()}",
    role=Role.user,
    parts=[Part(root=TextPart(text="Hello! Weather in New York?"))],
)

request = SendStreamingMessageRequest(
    id=f"req-{uuid.uuid4()}",
    params=MessageSendParams(message=message),
)

# Set up headers
headers = {"Content-Type": "application/json"}

# Send the streaming request to the local A2A endpoint.
# The context manager closes the streamed connection even if parsing fails.
with requests.post(
    "http://127.0.0.1:8000/",
    headers=headers,
    json=request.model_dump(mode="json", exclude_none=True),
    stream=True,
    timeout=60,
) as response:
    print(f"Response status code: {response.status_code}")

    if not response.ok:
        # An error body is not an event stream; show it instead of silently printing nothing.
        print(f"Request failed: {response.text}")
    else:
        # Parse streaming A2A responses
        for line in response.iter_lines():
            if not line:
                continue
            line_str = line.decode("utf-8")
            # Accept "data:" with or without a following space; json.loads ignores leading whitespace.
            if line_str.startswith("data:"):
                event = json.loads(line_str[len("data:"):])
                print(f"Received event: {event}")