# Agent-to-Agent (A2A) Protocol - Client Tutorial

This notebook demonstrates how to interact with an A2A-compliant agent service using the A2A client library. You'll learn how to:

- Discover an agent's capabilities through AgentCards
- Send single messages to an agent
- Handle multi-turn conversations with context
- Work with streaming responses

## Prerequisites
- A running A2A agent service (default: http://localhost:10000)
- Python 3.10+ (the minimum version supported by current a2a-sdk releases)
- Required packages: a2a-sdk, httpx

## What is A2A?
The Agent-to-Agent (A2A) protocol is a standardized way for AI agents to communicate with each other. It uses AgentCards (similar to OpenAPI specs) to describe agent capabilities and JSON-RPC for communication.
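
To make the JSON-RPC part concrete, here is a minimal sketch of the kind of envelope that travels over HTTP when a client sends a message. The method name `message/send` and the camelCase field `messageId` are assumptions based on protocol version 0.3.0; the SDK builds this envelope for you, and the helper name `build_message_send_request` is purely illustrative.

```python
import json
from uuid import uuid4


def build_message_send_request(text: str) -> dict:
    """Build a JSON-RPC 2.0 envelope for an A2A message (illustrative helper)."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),  # lets the caller match the response to this request
        "method": "message/send",  # assumed method name (protocol 0.3.0)
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
                "messageId": uuid4().hex,  # assumed wire-format field name
            }
        },
    }


request = build_message_send_request("What can you do?")
print(json.dumps(request, indent=2))
```

The outer `id` identifies the JSON-RPC request, while the inner message identifier identifies the message itself; the two are independent.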

## Starting the Agent Service

Before running this notebook, start the A2A agent service:

```bash
# From the project root
cd a2a_service
uv run python -m a2a_service
```

The service should start on http://localhost:10000.

In [1]:
# Import required libraries
import logging
from typing import Any
from uuid import uuid4
import httpx

from a2a.client import A2ACardResolver, ClientFactory, ClientConfig
from a2a.types import (
    AgentCard,
    MessageSendParams,
    SendMessageRequest,
    SendStreamingMessageRequest,
)
from a2a.utils.constants import (
    AGENT_CARD_WELL_KNOWN_PATH,
    EXTENDED_AGENT_CARD_PATH,
)

# Configure logging to see what's happening
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

print("✓ All imports successful!")


✓ All imports successful!


## Configuration

Before we start, let's configure the connection to our A2A agent service. The service should be running locally on port 10000.

**Key Configuration Parameters:**
- `base_url`: The URL where your A2A agent service is running
- `timeout`: HTTP timeout for requests (increased to 60s for LLM responses)
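
If you run the notebook against more than one environment, one option is to read these two parameters from environment variables. The sketch below is not part of the original notebook; the variable names `A2A_BASE_URL` and `A2A_TIMEOUT_SECONDS` are hypothetical, and the defaults mirror the values used in this tutorial.

```python
import os


def load_settings(env=None) -> dict:
    """Read connection settings, falling back to this tutorial's defaults."""
    env = os.environ if env is None else env
    # Hypothetical variable names; pick whatever suits your project
    base_url = env.get("A2A_BASE_URL", "http://localhost:10000").rstrip("/")
    timeout_seconds = float(env.get("A2A_TIMEOUT_SECONDS", "60"))
    if timeout_seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    return {"base_url": base_url, "timeout_seconds": timeout_seconds}


settings = load_settings({})  # an empty mapping exercises the defaults
print(settings)
```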


In [2]:
# Configuration
base_url = 'http://localhost:10000'
timeout_seconds = 60.0

print(f"Agent Service URL: {base_url}")
print(f"Request Timeout: {timeout_seconds}s")


Agent Service URL: http://localhost:10000
Request Timeout: 60.0s


## Part 1: Agent Discovery via AgentCards

An **AgentCard** is a machine-readable description of an agent's capabilities, similar to an OpenAPI specification. It tells clients:
- What the agent can do
- What endpoints are available
- What authentication is required
- What features are supported

### Two Types of AgentCards:

1. **Public AgentCard** (`/.well-known/agent-card.json`):
   - Publicly accessible without authentication
   - Contains basic agent information
   - May indicate if an extended card is available

2. **Extended AgentCard** (served at the path in `EXTENDED_AGENT_CARD_PATH`):
   - Requires authentication
   - May contain additional capabilities or sensitive information
   - Only available if indicated in the public card
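
As a rough illustration of what a client can read from a card, the sketch below works on a plain dictionary rather than the SDK's `AgentCard` model. The sample values are taken from the card this notebook fetches; the helper names are illustrative, and the `supportsAuthenticatedExtendedCard` field is an assumption based on protocol version 0.3.0.

```python
def card_url(base_url: str, path: str = "/.well-known/agent-card.json") -> str:
    """Join the service URL and the well-known card path."""
    return base_url.rstrip("/") + path


def summarize_card(card: dict) -> dict:
    """Pull out the fields a client usually checks before talking to an agent."""
    capabilities = card.get("capabilities", {})
    return {
        "name": card.get("name"),
        "streaming": bool(capabilities.get("streaming", False)),
        "skill_ids": [skill["id"] for skill in card.get("skills", [])],
        # Assumed field name for "an extended card is available"
        "has_extended_card": bool(card.get("supportsAuthenticatedExtendedCard", False)),
    }


sample_card = {
    "name": "General Purpose Agent",
    "capabilities": {"pushNotifications": True, "streaming": True},
    "skills": [
        {"id": "web_search", "name": "Web Search Tool"},
        {"id": "arxiv_search", "name": "Academic Paper Search"},
    ],
}

print(card_url("http://localhost:10000"))
print(summarize_card(sample_card))
```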


In [3]:
# Create an async HTTP client with extended timeout
httpx_client = httpx.AsyncClient(timeout=httpx.Timeout(timeout_seconds))

# Initialize the A2ACardResolver
# This helps us discover and fetch AgentCards from the service
resolver = A2ACardResolver(
    httpx_client=httpx_client,
    base_url=base_url,
)

print("✓ HTTP client and resolver initialized")


✓ HTTP client and resolver initialized


In [4]:
# Fetch the public agent card
try:
    logger.info(f'Fetching public agent card from: {base_url}{AGENT_CARD_WELL_KNOWN_PATH}')
    public_card = await resolver.get_agent_card()
    
    print("\n" + "="*60)
    print("PUBLIC AGENT CARD")
    print("="*60)
    print(public_card.model_dump_json(indent=2, exclude_none=True))
    
    # Track which card we'll use
    final_agent_card = public_card
    
    print("\n✓ Successfully fetched public agent card")
    
except Exception as e:
    logger.error(f'Failed to fetch public agent card: {e}')
    raise RuntimeError('Cannot continue without agent card') from e


INFO: Fetching public agent card from: http://localhost:10000/.well-known/agent-card.json
INFO: HTTP Request: GET http://localhost:10000/.well-known/agent-card.json "HTTP/1.1 200 OK"
INFO: Successfully fetched agent card data from http://localhost:10000/.well-known/agent-card.json: {'capabilities': {'pushNotifications': True, 'streaming': True}, 'defaultInputModes': ['text', 'text/plain'], 'defaultOutputModes': ['text', 'text/plain'], 'description': 'A helpful AI assistant with web search, academic paper search, and document retrieval capabilities', 'name': 'General Purpose Agent', 'preferredTransport': 'JSONRPC', 'protocolVersion': '0.3.0', 'skills': [{'description': 'Search the web for current information', 'examples': ['What are the latest news about AI?'], 'id': 'web_search', 'name': 'Web Search Tool', 'tags': ['search', 'web', 'internet']}, {'description': 'Search for academic papers on arXiv', 'examples': ['Find recent papers on large language models'], 'id': 'arxiv_search', 'nam


PUBLIC AGENT CARD
{
  "capabilities": {
    "pushNotifications": true,
    "streaming": true
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "description": "A helpful AI assistant with web search, academic paper search, and document retrieval capabilities",
  "name": "General Purpose Agent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "Search the web for current information",
      "examples": [
        "What are the latest news about AI?"
      ],
      "id": "web_search",
      "name": "Web Search Tool",
      "tags": [
        "search",
        "web",
        "internet"
      ]
    },
    {
      "description": "Search for academic papers on arXiv",
      "examples": [
        "Find recent papers on large language models"
      ],
      "id": "arxiv_search",
      "name": "Academic Paper Search",
      "tags": [
        "research",
        "pap

## Part 2: Initialize the A2A Client

Now that we have the AgentCard, we can create a client to communicate with the agent. The client uses:
- **ClientFactory**: Creates properly configured clients
- **JSON-RPC transport**: Default protocol for A2A communication


In [5]:
# Create ClientFactory with configuration
factory = ClientFactory(
    ClientConfig(
        httpx_client=httpx_client,
        # JSON-RPC is the default transport
    )
)

# Create client using the factory and agent card
client = factory.create(card=final_agent_card)

print("✓ A2A Client initialized successfully")
print(f"  Ready to communicate with agent at: {base_url}")


✓ A2A Client initialized successfully
  Ready to communicate with agent at: http://localhost:10000


## Part 3: Sending a Single Message

Let's send our first message to the agent! This is a simple one-shot interaction.

### Key Components:
- **Message**: Contains the user's question/request
- **Parts**: Can include text, images, or other content types
- **message_id**: Unique identifier for tracking
- **Request ID**: Unique identifier for the JSON-RPC request
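
The message structure can be wrapped in a small helper so that every message gets a fresh `message_id`. This is a sketch rather than an SDK function: `make_user_message` is a hypothetical name, and the dictionary shape copies the one used in the cells of this notebook.

```python
from uuid import uuid4


def make_user_message(*texts: str) -> dict:
    """Build a user message with one text part per argument."""
    if not texts:
        raise ValueError("at least one text part is required")
    return {
        "role": "user",
        "parts": [{"kind": "text", "text": text} for text in texts],
        "message_id": uuid4().hex,  # unique per message
    }


message = make_user_message("What are the latest developments in artificial intelligence?")
print(message["message_id"], len(message["parts"]))
```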


In [9]:
# Construct the message (NOT a SendMessageRequest)
message = {
    "role": "user",
    "parts": [
        {"kind": "text", "text": "What are the latest developments in artificial intelligence?"}
    ],
    "message_id": uuid4().hex,
}

print("Sending message to agent...")
print("\n" + "=" * 60)
print("STREAMING RESPONSE CHUNKS")
print("=" * 60)

chunk_count = 0
try:
    # BaseClient.send_message expects a Message (dict or model)
    # When the client is configured for streaming, this returns an async generator of events
    async for chunk in client.send_message(message):
        chunk_count += 1
        print(f"\n--- Chunk {chunk_count} ---")
        print(chunk.model_dump(mode="json", exclude_none=True))
finally:
    print("\n" + "=" * 60)
    print(f"STREAMING COMPLETE - Received {chunk_count} chunks")
    print("=" * 60)


INFO: HTTP Request: POST http://localhost:10000/ "HTTP/1.1 200 OK"


Sending message to agent...

STREAMING RESPONSE CHUNKS


INFO: New task created with id: be3ebcff-66b3-4c5e-ba02-462897182869



--- Chunk 1 ---

STREAMING COMPLETE - Received 1 chunks


AttributeError: 'tuple' object has no attribute 'model_dump'

In [10]:
import json
import dataclasses
from dataclasses import asdict

# Construct the message (NOT a SendMessageRequest)
message = {
    "role": "user",
    "parts": [
        {"kind": "text", "text": "What are the latest developments in artificial intelligence?"}
    ],
    "message_id": uuid4().hex,
}

def to_jsonable(obj):
    # pydantic v2 models
    if hasattr(obj, "model_dump"):
        return obj.model_dump(mode="json", exclude_none=True)
    # dataclasses
    if dataclasses.is_dataclass(obj):
        return asdict(obj)
    # dicts are fine
    if isinstance(obj, dict):
        return obj
    # strings/numbers, etc.
    return obj

print("Sending message to agent...")
print("\n" + "=" * 60)
print("STREAMING RESPONSE CHUNKS")
print("=" * 60)

chunk_count = 0
try:
    # BaseClient.send_message yields streaming events
    async for chunk in client.send_message(message):
        chunk_count += 1

        # Normalize common shapes:
        # 1) (event_type, payload)
        # 2) object with .type/.delta/.message
        # 3) raw dict / model
        if isinstance(chunk, tuple) and len(chunk) == 2:
            etype, payload = chunk
        else:
            etype = getattr(chunk, "type", None)
            payload = (
                getattr(chunk, "delta", None)
                or getattr(chunk, "message", None)
                or getattr(chunk, "payload", None)
                or chunk
            )

        print(f"\n--- Chunk {chunk_count} ({etype or 'event'}) ---")

        # Prefer printing text deltas inline for a nicer UX
        if isinstance(payload, str):
            print(payload, end="", flush=True)
        else:
            print(json.dumps(to_jsonable(payload), indent=2, default=str))

finally:
    print("\n" + "=" * 60)
    print(f"STREAMING COMPLETE - Received {chunk_count} chunks")
    print("=" * 60)


INFO: HTTP Request: POST http://localhost:10000/ "HTTP/1.1 200 OK"


Sending message to agent...

STREAMING RESPONSE CHUNKS


INFO: New task created with id: a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6



--- Chunk 1 (artifacts=None context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6' history=[Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='ee844a5befa5430da1a7cf901a0e53ed', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='What are the latest developments in artificial intelligence?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6')] id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6' kind='task' metadata=None status=TaskStatus(message=None, state=<TaskState.submitted: 'submitted'>, timestamp=None)) ---
null

--- Chunk 2 (artifacts=None context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6' history=[Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='ee844a5befa5430da1a7cf901a0e53ed', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='What are the latest developments in artificial intelligence?')
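
Reading the output above, each chunk appears to be a two-element tuple: the first element is a task snapshot (it prints with `kind='task'` and a `status`), and the second was `None` for the first chunk, which is why `null` was printed. Under that assumption, a labelling helper could unpack the tuple as `(task, update)` instead of `(event_type, payload)`. The sketch below uses `SimpleNamespace` stand-ins rather than real SDK objects so it runs without a service; the `kind` value on the stand-in update is illustrative.

```python
from types import SimpleNamespace


def split_event(event):
    """Return (task, update, message) for one streamed event."""
    if isinstance(event, tuple) and len(event) == 2:
        task, update = event
        return task, update, None
    # Anything else is treated as a direct message from the agent
    return None, None, event


def describe_event(event) -> str:
    """Produce a short one-line label for an event."""
    task, update, message = split_event(event)
    if task is None:
        return f"direct message ({getattr(message, 'kind', type(message).__name__)})"
    # The state may be an enum (with .value) or a plain string
    state = getattr(task.status.state, "value", task.status.state)
    if update is None:
        return f"task {task.id} [{state}] snapshot"
    return f"task {task.id} [{state}] update={getattr(update, 'kind', type(update).__name__)}"


# Stand-in objects that mimic only the attributes used above
task = SimpleNamespace(id="a3ef20e7", status=SimpleNamespace(state="submitted"))
update = SimpleNamespace(kind="status-update")

print(describe_event((task, None)))
print(describe_event((task, update)))
print(describe_event(SimpleNamespace(kind="message")))
```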

## Part 4: Multi-Turn Conversations

A2A supports multi-turn conversations where the agent maintains context across messages. This is crucial for follow-up questions or complex interactions.

### Key Concepts:
- **context_id**: Identifies the conversation thread
- **task_id**: Identifies the specific task within a context
- Both IDs must be included in follow-up messages to maintain context
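
One way to avoid copying the IDs by hand is to keep them in a small conversation object that stamps every outgoing message. This is an illustrative sketch, not part of the SDK; the `Conversation` class and its method names are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional
from uuid import uuid4


@dataclass
class Conversation:
    """Remembers the IDs returned by the agent and reuses them in follow-ups."""

    context_id: Optional[str] = None
    task_id: Optional[str] = None

    def remember(self, task_id: str, context_id: str) -> None:
        """Store the IDs taken from the agent's first response."""
        self.task_id = task_id
        self.context_id = context_id

    def next_message(self, text: str) -> dict:
        """Build a user message, adding the stored IDs when they are known."""
        message = {
            "role": "user",
            "parts": [{"kind": "text", "text": text}],
            "message_id": uuid4().hex,
        }
        if self.task_id is not None:
            message["task_id"] = self.task_id
        if self.context_id is not None:
            message["context_id"] = self.context_id
        return message


conversation = Conversation()
first = conversation.next_message("Find me recent papers on transformer architectures")
conversation.remember(task_id="task-123", context_id="ctx-456")  # placeholder IDs
follow_up = conversation.next_message("Can you summarize the key findings?")
print(follow_up)
```

The first message carries no IDs because the agent has not assigned any yet; every later message carries both.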


In [None]:
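# First message in a multi-turn conversation
first_message_payload = {
    'message': {
        'role': 'user',
        'parts': [
            {
                'kind': 'text',
                'text': 'Find me recent papers on transformer architectures',
            }
        ],
        'message_id': uuid4().hex,
    },
}

print("Sending first message in conversation...")

# The client created by ClientFactory yields events rather than returning a
# single response, so iterate and keep the most recent one. As seen in Part 3,
# events arrive as (task, update) tuples; a direct Message is also handled.
response = None
async for event in client.send_message(first_message_payload['message']):
    response = event[0] if isinstance(event, tuple) else event

if response is None:
    raise RuntimeError('Agent returned no events; cannot continue the conversation')

print("\n" + "="*60)
print("FIRST RESPONSE")
print("="*60)
print(response.model_dump(mode='json', exclude_none=True))

# Extract IDs for context continuation
# (a Task exposes `id`; a direct Message exposes `task_id` instead)
task_id = getattr(response, 'id', None) or getattr(response, 'task_id', None)
context_id = response.context_id

print(f"\n✓ Captured conversation IDs:")
print(f"  task_id: {task_id}")
print(f"  context_id: {context_id}")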


In [None]:
# Second message - includes context IDs for continuation
second_message_payload = {
    'message': {
        'role': 'user',
        'parts': [
            {
                'kind': 'text',
                'text': 'Can you summarize the key findings?'
            }
        ],
        'message_id': uuid4().hex,
        'task_id': task_id,
        'context_id': context_id,
    },
}

print("Sending follow-up message with context...")

# Iterate over the events and keep the most recent one
# (events arrive as (task, update) tuples, as seen in Part 3)
second_response = None
async for event in client.send_message(second_message_payload['message']):
    second_response = event[0] if isinstance(event, tuple) else event

print("\n" + "="*60)
print("FOLLOW-UP RESPONSE")
print("="*60)
if second_response is not None:
    print(second_response.model_dump(mode='json', exclude_none=True))


## Part 5: Streaming Responses

For longer responses, streaming allows you to receive the agent's reply incrementally as it's generated, rather than waiting for the complete response.

### Benefits:
- Lower latency - see results as they're generated
- Better user experience - can start displaying content immediately
- More efficient for long-running operations
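
The consumption pattern itself is independent of A2A. The sketch below uses a stand-in async generator (`fake_agent_stream`, a hypothetical name) in place of a real agent, to show how pieces can be displayed as they arrive while still being collected into a complete reply.

```python
import asyncio
from typing import AsyncIterator


async def fake_agent_stream() -> AsyncIterator[str]:
    """Stand-in for an agent that produces its answer in pieces."""
    for piece in ["Transformers ", "use ", "attention."]:
        await asyncio.sleep(0)  # hand control back to the loop, as a network read would
        yield piece


async def consume(stream: AsyncIterator[str]) -> str:
    """Print each piece as soon as it arrives and return the full text."""
    received = []
    async for piece in stream:
        received.append(piece)
        print(piece, end="", flush=True)
    print()
    return "".join(received)


if __name__ == "__main__":
    # In a notebook cell, use `await consume(fake_agent_stream())` instead,
    # because an event loop is already running there.
    asyncio.run(consume(fake_agent_stream()))
```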

### How it Works:
- The client built by `ClientFactory` in Part 2 exposes a single `send_message()` method; `SendStreamingMessageRequest` and `send_message_streaming()` belong to the older `A2AClient` interface
- `send_message()` streams when the agent's card advertises `"streaming": true` (as this agent's card does) and streaming is not disabled in `ClientConfig`
- Iterate over the response stream with `async for`


In [None]:
# Reuse the first message from Part 4 for streaming, with a fresh message_id
streaming_message = {**first_message_payload['message'], 'message_id': uuid4().hex}

print("Sending streaming request...")
print("\n" + "="*60)
print("STREAMING RESPONSE CHUNKS")
print("="*60 + "\n")

# Get the streaming response (an async iterator of events)
stream_response = client.send_message(streaming_message)

# Process each chunk as it arrives
chunk_count = 0
async for chunk in stream_response:
    chunk_count += 1
    print(f"--- Chunk {chunk_count} ---")
    # Chunks arrive as (task, update) tuples (see Part 3); print each non-empty element
    for item in (chunk if isinstance(chunk, tuple) else (chunk,)):
        if item is not None:
            print(item.model_dump(mode='json', exclude_none=True))
    print()

print(f"\n✓ Received {chunk_count} chunks total")


## Cleanup

Always remember to close the HTTP client when you're done to free up resources.
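
If a cell raises before the cleanup cell runs, the client stays open. A `try`/`finally` block guarantees the close happens either way. The sketch below uses a stand-in class (`FakeAsyncClient`, hypothetical) that only mimics the `aclose()` method, so it runs without a network; `httpx.AsyncClient` can also be used with `async with`, which has the same effect.

```python
import asyncio


class FakeAsyncClient:
    """Stand-in that mimics only the aclose() method of an async HTTP client."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def run_session(client: FakeAsyncClient, fail: bool = False) -> str:
    """Do some work with the client and always close it afterwards."""
    try:
        if fail:
            raise RuntimeError("simulated request failure")
        return "ok"
    finally:
        await client.aclose()  # runs on success and on error


async def demo() -> bool:
    client = FakeAsyncClient()
    try:
        await run_session(client, fail=True)
    except RuntimeError:
        pass  # the failure is expected; we only care that the client was closed
    return client.closed


if __name__ == "__main__":
    print("closed after failure:", asyncio.run(demo()))
```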


In [None]:
# Close the HTTP client when done
await httpx_client.aclose()
print("✓ HTTP client closed")


## Summary

Congratulations! You've learned how to:

1. **Discover agents** by fetching the public AgentCard
2. **Initialize an A2A client** using ClientFactory
3. **Send single messages** for one-shot interactions
4. **Maintain context** in multi-turn conversations using task_id and context_id
5. **Stream responses** for better user experience with long-running operations

## Next Steps

- Explore different message types (with images, files, etc.)
- Implement error handling for production use
- Build a conversation UI that maintains context
- Experiment with different agents supporting the A2A protocol
- Integrate A2A clients into your applications

## Troubleshooting

### Common Issues:

1. **Connection Refused Error**
   - Make sure the agent service is running: `cd a2a_service && uv run python -m a2a_service`
   - Check that the service is on the correct port (default: 10000)

2. **Timeout Errors**
   - LLM responses can take time, especially with the helpfulness evaluation loop
   - The timeout is set to 60 seconds, but you can increase it if needed

3. **Import Errors**
   - Make sure you have installed dependencies: `uv sync` or `pip install a2a-sdk httpx`

4. **Authentication Errors for Extended Card**
   - This notebook fetches only the public card; requesting the extended card requires valid credentials for the agent service
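
For the connection-refused case, a quick TCP check can tell you whether anything is listening before you start debugging the client. This is a minimal sketch using only the standard library; `is_service_reachable` is a hypothetical helper, and it checks only that the port accepts connections, not that an A2A agent is behind it.

```python
import socket
from urllib.parse import urlparse


def is_service_reachable(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(base_url)
    host = parsed.hostname or "localhost"
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures
        return False


if __name__ == "__main__":
    url = "http://localhost:10000"
    status = "reachable" if is_service_reachable(url) else "not reachable"
    print(f"{url} is {status}")
```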

## Additional Resources

- [A2A Protocol Specification](https://github.com/missingstudio/a2a-protocol)
- [LangGraph Documentation](https://python.langchain.com/docs/langgraph)
- [Project README](../README.md)
