# Agent-to-Agent (A2A) Protocol Tutorial with LangChain/LangGraph

## Table of Contents
1. [Introduction to A2A Protocol](#introduction)
2. [Core Concepts and Architecture](#core-concepts)
3. [Setting Up Your Environment](#setup)
4. [Basic A2A Examples](#basic-examples)
5. [Integrating with LangChain](#langchain-integration)
6. [Advanced Examples with LangGraph](#langgraph-examples)
7. [Building Custom A2A Agents](#custom-agents)
8. [Best Practices and Patterns](#best-practices)

---

## 1. Introduction to A2A Protocol <a id="introduction"></a>

The Agent-to-Agent (A2A) Protocol is an open standard that enables AI agents to communicate, collaborate, and coordinate with each other in a structured way. Think of it as a universal language that allows different AI systems to work together seamlessly, regardless of their underlying implementation.

### Why A2A Protocol Matters

Before A2A, connecting multiple AI agents required custom integrations for each pair of systems. A2A provides:
- **Interoperability**: Any A2A-compliant agent can communicate with any other
- **Standardized Messages**: Common format for requests, responses, and events
- **Service Discovery**: Agents can find and connect to each other dynamically
- **Capability Negotiation**: Agents can understand what others can do
- **Security**: Built-in authentication and encryption

### Real-World Use Cases

1. **Multi-Agent Research Teams**: Multiple specialized agents working together on complex problems
2. **Automated Customer Service**: Agents handing off conversations based on expertise
3. **Supply Chain Coordination**: Agents representing different parts of a supply chain
4. **Collaborative Content Creation**: Writers, editors, and fact-checkers as separate agents

### Key Components

1. **Agents**: Autonomous AI systems that can send and receive messages
2. **Messages**: Structured communications between agents
3. **Capabilities**: What an agent can do (services it provides)
4. **Protocols**: Rules for how agents interact
5. **Discovery**: How agents find each other

## 2. Core Concepts and Architecture <a id="core-concepts"></a>

### A2A Architecture Overview

```
┌─────────────┐     A2A Protocol      ┌─────────────┐
│   Agent A   │ ◄──────────────────► │   Agent B   │
│ (Researcher)│     Messages          │ (Analyzer)  │
└─────────────┘                       └─────────────┘
       │                                     │
       │         ┌─────────────┐           │
       └────────►│  Agent Hub   │◄──────────┘
                 │  (Registry)  │
                 └─────────────┘
                         │
                 ┌─────────────┐
                 │   Agent C   │
                 │ (Validator) │
                 └─────────────┘
```

### Message Types in A2A

A2A defines several message types that agents use to communicate:

1. **Request Messages**: Ask another agent to perform a task
2. **Response Messages**: Reply to a request with results
3. **Event Messages**: Notify other agents about state changes
4. **Query Messages**: Ask for information without side effects
5. **Stream Messages**: Send continuous data streams

### Agent Capabilities

Each agent declares its capabilities using a standardized format:
- **Service Definitions**: What operations the agent can perform
- **Input/Output Schemas**: Expected data formats
- **Performance Characteristics**: Response times, throughput
- **Requirements**: What the agent needs to function

### Communication Patterns

A2A supports various communication patterns:
- **Request-Response**: Traditional synchronous calls
- **Publish-Subscribe**: Event-driven communication
- **Streaming**: Continuous data flow
- **Choreography**: Complex multi-agent workflows

## 3. Setting Up Your Environment <a id="setup"></a>

Let's start by installing the necessary packages and setting up our development environment for A2A protocol.

In [None]:
# Install required packages
!pip install a2a-protocol langchain langgraph anthropic openai python-dotenv

# For agent development
!pip install fastapi uvicorn websockets

In [None]:
# Import necessary libraries
import os
import json
import asyncio
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
import uuid

# A2A Protocol imports
from a2a import Agent, Message, Capability, Protocol
from a2a.discovery import DiscoveryService
from a2a.messages import Request, Response, Event

# LangChain imports
from langchain_core.tools import Tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode

# Set up environment variables
from dotenv import load_dotenv
load_dotenv()

print("Environment setup complete!")
print(f"A2A Protocol version: {a2a.__version__}")

## 4. Basic A2A Examples <a id="basic-examples"></a>

Let's start with fundamental examples to understand how A2A agents communicate with each other.

### Example 1: Creating Your First A2A Agent

We'll begin by creating a simple agent that can respond to basic queries. This demonstrates the fundamental pattern of agent creation and message handling.

In [None]:
class SimpleAgent(Agent):
    """
    A basic A2A agent that demonstrates core functionality.
    
    This agent:
    - Has a unique identity
    - Declares its capabilities
    - Handles incoming messages
    - Can send messages to other agents
    """
    
    def __init__(self, name: str, description: str):
        super().__init__(name=name)
        self.description = description
        self.message_count = 0
        
        # Define what this agent can do
        self.capabilities = [
            Capability(
                name="echo",
                description="Echoes back any message sent to it",
                input_schema={
                    "type": "object",
                    "properties": {
                        "message": {"type": "string"}
                    },
                    "required": ["message"]
                }
            ),
            Capability(
                name="get_info",
                description="Returns information about this agent",
                input_schema={"type": "object", "properties": {}}
            )
        ]
    
    async def handle_request(self, request: Request) -> Response:
        """
        Handle incoming requests from other agents.
        
        This is where the agent's logic lives. It processes
        the request and returns an appropriate response.
        """
        self.message_count += 1
        
        if request.capability == "echo":
            # Echo back the message
            message = request.data.get("message", "")
            return Response(
                request_id=request.id,
                data={"echo": message, "count": self.message_count}
            )
        
        elif request.capability == "get_info":
            # Return agent information
            return Response(
                request_id=request.id,
                data={
                    "name": self.name,
                    "description": self.description,
                    "capabilities": [cap.name for cap in self.capabilities],
                    "messages_processed": self.message_count
                }
            )
        
        else:
            # Unknown capability
            return Response(
                request_id=request.id,
                error={"code": "UNKNOWN_CAPABILITY", "message": f"Unknown capability: {request.capability}"}
            )

# Create and test our simple agent
echo_agent = SimpleAgent(
    name="EchoBot",
    description="A simple agent that echoes messages and provides information"
)

print(f"Created agent: {echo_agent.name}")
print(f"Agent ID: {echo_agent.id}")
print(f"Capabilities: {[cap.name for cap in echo_agent.capabilities]}")

### Example 2: Agent-to-Agent Communication

Now let's create two agents that can communicate with each other. This demonstrates the core A2A communication pattern.

In [None]:
class AnalyzerAgent(Agent):
    """
    An agent that analyzes text and provides insights.
    This demonstrates more complex agent capabilities.
    """
    
    def __init__(self):
        super().__init__(name="TextAnalyzer")
        
        self.capabilities = [
            Capability(
                name="analyze_sentiment",
                description="Analyzes the sentiment of text",
                input_schema={
                    "type": "object",
                    "properties": {
                        "text": {"type": "string", "description": "Text to analyze"}
                    },
                    "required": ["text"]
                }
            ),
            Capability(
                name="extract_keywords",
                description="Extracts key words from text",
                input_schema={
                    "type": "object",
                    "properties": {
                        "text": {"type": "string"},
                        "max_keywords": {"type": "integer", "default": 5}
                    },
                    "required": ["text"]
                }
            )
        ]
    
    async def handle_request(self, request: Request) -> Response:
        if request.capability == "analyze_sentiment":
            text = request.data["text"]
            # Simulate sentiment analysis
            sentiment = "positive" if "good" in text.lower() or "great" in text.lower() else "neutral"
            confidence = 0.85 if sentiment == "positive" else 0.6
            
            return Response(
                request_id=request.id,
                data={
                    "sentiment": sentiment,
                    "confidence": confidence,
                    "analysis_time": datetime.now().isoformat()
                }
            )
        
        elif request.capability == "extract_keywords":
            text = request.data["text"]
            max_keywords = request.data.get("max_keywords", 5)
            
            # Simulate keyword extraction
            words = text.lower().split()
            # Remove common words (simplified)
            keywords = [w for w in words if len(w) > 4][:max_keywords]
            
            return Response(
                request_id=request.id,
                data={
                    "keywords": keywords,
                    "count": len(keywords)
                }
            )

# Demonstrate agent communication
async def demonstrate_communication():
    """
    Shows how agents communicate using the A2A protocol.
    
    This example demonstrates:
    1. Creating multiple agents
    2. Sending requests between agents
    3. Processing responses
    4. Error handling
    """
    # Create agents
    analyzer = AnalyzerAgent()
    echo = SimpleAgent("EchoBot", "Echo service")
    
    # Simulate agent registry (in practice, this would be a discovery service)
    agent_registry = {
        analyzer.id: analyzer,
        echo.id: echo
    }
    
    # Example 1: Analyze sentiment
    sentiment_request = Request(
        id=str(uuid.uuid4()),
        from_agent=echo.id,
        to_agent=analyzer.id,
        capability="analyze_sentiment",
        data={"text": "This is a great example of agent communication!"}
    )
    
    print("Sending sentiment analysis request...")
    response = await analyzer.handle_request(sentiment_request)
    print(f"Response: {json.dumps(response.data, indent=2)}")
    
    # Example 2: Extract keywords
    keyword_request = Request(
        id=str(uuid.uuid4()),
        from_agent=echo.id,
        to_agent=analyzer.id,
        capability="extract_keywords",
        data={
            "text": "Artificial intelligence enables autonomous agents to collaborate effectively",
            "max_keywords": 3
        }
    )
    
    print("\nSending keyword extraction request...")
    response = await analyzer.handle_request(keyword_request)
    print(f"Response: {json.dumps(response.data, indent=2)}")

# Run the demonstration
# await demonstrate_communication()

### Example 3: Service Discovery and Dynamic Connections

A2A includes service discovery, allowing agents to find each other dynamically. Let's explore this crucial feature.

In [None]:
class DiscoveryHub:
    """
    A simple discovery service for A2A agents.
    
    In production, this would be a distributed service,
    but this demonstrates the core concepts.
    """
    
    def __init__(self):
        self.agents: Dict[str, Dict[str, Any]] = {}
        self.capabilities_index: Dict[str, List[str]] = {}
    
    async def register_agent(self, agent: Agent):
        """
        Register an agent with the discovery service.
        """
        agent_info = {
            "id": agent.id,
            "name": agent.name,
            "capabilities": [{
                "name": cap.name,
                "description": cap.description
            } for cap in agent.capabilities],
            "status": "active",
            "registered_at": datetime.now().isoformat()
        }
        
        self.agents[agent.id] = agent_info
        
        # Index capabilities for quick lookup
        for cap in agent.capabilities:
            if cap.name not in self.capabilities_index:
                self.capabilities_index[cap.name] = []
            self.capabilities_index[cap.name].append(agent.id)
        
        print(f"Registered agent: {agent.name} with capabilities: {[cap.name for cap in agent.capabilities]}")
    
    async def find_agents_by_capability(self, capability: str) -> List[Dict[str, Any]]:
        """
        Find all agents that provide a specific capability.
        """
        agent_ids = self.capabilities_index.get(capability, [])
        return [self.agents[aid] for aid in agent_ids if aid in self.agents]
    
    async def get_agent_info(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """
        Get detailed information about a specific agent.
        """
        return self.agents.get(agent_id)

# Create specialized agents
class TranslatorAgent(Agent):
    def __init__(self):
        super().__init__(name="Translator")
        self.capabilities = [
            Capability(
                name="translate",
                description="Translates text between languages",
                input_schema={
                    "type": "object",
                    "properties": {
                        "text": {"type": "string"},
                        "source_lang": {"type": "string"},
                        "target_lang": {"type": "string"}
                    }
                }
            )
        ]
    
    async def handle_request(self, request: Request) -> Response:
        # Simulate translation
        return Response(
            request_id=request.id,
            data={"translated_text": f"[Translated to {request.data['target_lang']}]: {request.data['text']}"}
        )

class SummarizerAgent(Agent):
    def __init__(self):
        super().__init__(name="Summarizer")
        self.capabilities = [
            Capability(
                name="summarize",
                description="Creates summaries of text",
                input_schema={
                    "type": "object",
                    "properties": {
                        "text": {"type": "string"},
                        "max_length": {"type": "integer", "default": 100}
                    }
                }
            )
        ]
    
    async def handle_request(self, request: Request) -> Response:
        # Simulate summarization
        text = request.data["text"]
        summary = text[:request.data.get("max_length", 100)] + "..."
        return Response(
            request_id=request.id,
            data={"summary": summary, "original_length": len(text)}
        )

# Demonstrate service discovery
async def demonstrate_discovery():
    """
    Shows how agents use discovery to find each other.
    """
    # Create discovery hub
    discovery = DiscoveryHub()
    
    # Create and register agents
    agents = [
        AnalyzerAgent(),
        TranslatorAgent(),
        SummarizerAgent(),
        SimpleAgent("Helper", "General purpose agent")
    ]
    
    print("Registering agents with discovery service...\n")
    for agent in agents:
        await discovery.register_agent(agent)
    
    # Find agents by capability
    print("\nSearching for agents with 'translate' capability:")
    translators = await discovery.find_agents_by_capability("translate")
    for agent_info in translators:
        print(f"  Found: {agent_info['name']} (ID: {agent_info['id']})")
    
    print("\nSearching for agents with 'analyze_sentiment' capability:")
    analyzers = await discovery.find_agents_by_capability("analyze_sentiment")
    for agent_info in analyzers:
        print(f"  Found: {agent_info['name']} (ID: {agent_info['id']})")
    
    # Show all registered capabilities
    print("\nAll available capabilities:")
    for capability, agent_ids in discovery.capabilities_index.items():
        print(f"  {capability}: {len(agent_ids)} agent(s)")

# Run the discovery demonstration
# await demonstrate_discovery()

## 5. Integrating with LangChain <a id="langchain-integration"></a>

Now let's explore how to integrate A2A agents with LangChain, enabling them to work seamlessly with LangChain's tools and chains.

In [None]:
class A2AToolAdapter:
    """
    Adapts A2A agents to work as LangChain tools.
    
    This bridge allows LangChain agents to use A2A agents
    as if they were native LangChain tools.
    """
    
    def __init__(self, agent: Agent, capability: str):
        self.agent = agent
        self.capability = capability
        
        # Find the capability definition
        self.capability_def = next(
            (cap for cap in agent.capabilities if cap.name == capability),
            None
        )
        
        if not self.capability_def:
            raise ValueError(f"Agent {agent.name} doesn't have capability {capability}")
    
    def to_langchain_tool(self) -> Tool:
        """
        Convert A2A capability to a LangChain Tool.
        """
        async def tool_func(**kwargs) -> str:
            # Create A2A request
            request = Request(
                id=str(uuid.uuid4()),
                from_agent="langchain_agent",
                to_agent=self.agent.id,
                capability=self.capability,
                data=kwargs
            )
            
            # Get response from A2A agent
            response = await self.agent.handle_request(request)
            
            # Format response for LangChain
            if response.error:
                return f"Error: {response.error['message']}"
            return json.dumps(response.data)
        
        # Create synchronous wrapper for LangChain
        def sync_tool_func(**kwargs) -> str:
            return asyncio.run(tool_func(**kwargs))
        
        return Tool(
            name=f"{self.agent.name}_{self.capability}",
            description=self.capability_def.description,
            func=sync_tool_func
        )

# Create LangChain tools from A2A agents
def create_a2a_langchain_tools() -> List[Tool]:
    """
    Creates a collection of LangChain tools from A2A agents.
    """
    # Create A2A agents
    analyzer = AnalyzerAgent()
    translator = TranslatorAgent()
    summarizer = SummarizerAgent()
    
    # Convert to LangChain tools
    tools = [
        A2AToolAdapter(analyzer, "analyze_sentiment").to_langchain_tool(),
        A2AToolAdapter(analyzer, "extract_keywords").to_langchain_tool(),
        A2AToolAdapter(translator, "translate").to_langchain_tool(),
        A2AToolAdapter(summarizer, "summarize").to_langchain_tool()
    ]
    
    return tools

# Create the tools
a2a_tools = create_a2a_langchain_tools()
print("Created LangChain tools from A2A agents:")
for tool in a2a_tools:
    print(f"  - {tool.name}: {tool.description}")

### Building a LangChain Agent with A2A Capabilities

Now let's create a LangChain agent that can coordinate multiple A2A agents to accomplish complex tasks.

In [None]:
from langchain.agents import AgentType, initialize_agent
from langchain.llms.fake import FakeListLLM

def create_a2a_coordinator_agent(tools: List[Tool]):
    """
    Creates a LangChain agent that coordinates A2A agents.
    
    This demonstrates how to:
    1. Use A2A agents as tools
    2. Coordinate multiple agents
    3. Handle complex workflows
    """
    # In practice, you'd use a real LLM
    # For demonstration, we'll use a mock
    mock_llm = FakeListLLM(
        responses=[
            "I'll analyze the sentiment of this text first.",
            "Now I'll extract the key words from the text.",
            "Let me translate this summary to Spanish.",
            "Based on my analysis, the text has positive sentiment with keywords related to AI collaboration."
        ]
    )
    
    # Create agent prompt
    prompt_template = """
You are an AI coordinator that uses specialized A2A agents to accomplish tasks.
You have access to multiple agents with different capabilities:

Available A2A agents:
{tools}

When given a task:
1. Break it down into steps
2. Use the appropriate A2A agents for each step
3. Combine the results to provide a comprehensive answer

Current task: {input}
{agent_scratchpad}
"""
    
    prompt = PromptTemplate(
        template=prompt_template,
        input_variables=["input", "tools", "agent_scratchpad"]
    )
    
    # Initialize the agent
    agent = initialize_agent(
        tools=tools,
        llm=mock_llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
        handle_parsing_errors=True
    )
    
    return agent

# Example: Multi-agent workflow
class A2AWorkflowOrchestrator:
    """
    Orchestrates complex workflows using multiple A2A agents.
    """
    
    def __init__(self, discovery_hub: DiscoveryHub):
        self.discovery = discovery_hub
        self.workflow_history = []
    
    async def execute_workflow(self, workflow_definition: Dict[str, Any]):
        """
        Execute a multi-step workflow using A2A agents.
        
        Workflow definition example:
        {
            "name": "Document Analysis Pipeline",
            "steps": [
                {"capability": "extract_keywords", "input": {"text": "..."}},
                {"capability": "analyze_sentiment", "input": {"text": "..."}},
                {"capability": "summarize", "input": {"text": "..."}}
            ]
        }
        """
        results = []
        
        for step in workflow_definition["steps"]:
            # Find an agent that can handle this capability
            agents = await self.discovery.find_agents_by_capability(step["capability"])
            
            if not agents:
                results.append({
                    "step": step["capability"],
                    "error": "No agent found with this capability"
                })
                continue
            
            # Use the first available agent
            agent_info = agents[0]
            
            # Create request
            request = Request(
                id=str(uuid.uuid4()),
                from_agent="orchestrator",
                to_agent=agent_info["id"],
                capability=step["capability"],
                data=step["input"]
            )
            
            # In practice, you'd send this over the network
            # For now, we'll simulate the response
            result = {
                "step": step["capability"],
                "agent": agent_info["name"],
                "result": f"Processed by {agent_info['name']}"
            }
            results.append(result)
        
        # Store workflow execution history
        self.workflow_history.append({
            "workflow": workflow_definition["name"],
            "timestamp": datetime.now().isoformat(),
            "results": results
        })
        
        return results

print("\nA2A-LangChain Integration Patterns:")
print("1. A2A agents as LangChain tools")
print("2. LangChain agents coordinating A2A agents")
print("3. Workflow orchestration across multiple agents")
print("4. Dynamic agent discovery and selection")

## 6. Advanced Examples with LangGraph <a id="langgraph-examples"></a>

LangGraph enables us to build stateful, multi-agent systems. Let's explore how A2A agents can be integrated into complex LangGraph workflows.

In [None]:
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
import operator

# Define the state for our graph
class MultiAgentState(TypedDict):
    """
    State definition for multi-agent A2A workflows.
    
    This state tracks:
    - Message history between agents
    - Current task and subtasks
    - Agent responses and results
    - Workflow progress
    """
    messages: Annotated[Sequence[BaseMessage], operator.add]
    current_task: str
    subtasks: List[Dict[str, Any]]
    agent_responses: Dict[str, Any]
    workflow_phase: str
    final_result: Optional[Dict[str, Any]]

# Create nodes for our graph
class A2AGraphNodes:
    """
    Nodes that integrate A2A agents into LangGraph workflows.
    """
    
    def __init__(self, discovery_hub: DiscoveryHub):
        self.discovery = discovery_hub
    
    async def task_decomposer(self, state: MultiAgentState) -> MultiAgentState:
        """
        Decomposes a complex task into subtasks for different agents.
        
        This demonstrates how to:
        1. Analyze task requirements
        2. Map to available agent capabilities
        3. Create an execution plan
        """
        task = state["current_task"]
        
        # Analyze task and create subtasks
        # In practice, you'd use an LLM for this
        subtasks = []
        
        if "analyze" in task.lower():
            subtasks.append({
                "capability": "analyze_sentiment",
                "priority": 1,
                "dependencies": []
            })
            subtasks.append({
                "capability": "extract_keywords",
                "priority": 1,
                "dependencies": []
            })
        
        if "translate" in task.lower():
            subtasks.append({
                "capability": "translate",
                "priority": 2,
                "dependencies": ["analyze_sentiment"]
            })
        
        if "summarize" in task.lower():
            subtasks.append({
                "capability": "summarize",
                "priority": 3,
                "dependencies": ["extract_keywords"]
            })
        
        state["subtasks"] = subtasks
        state["workflow_phase"] = "execution"
        state["messages"].append(
            AIMessage(content=f"Task decomposed into {len(subtasks)} subtasks")
        )
        
        return state
    
    async def agent_executor(self, state: MultiAgentState) -> MultiAgentState:
        """
        Executes subtasks using appropriate A2A agents.
        """
        subtasks = state["subtasks"]
        responses = state.get("agent_responses", {})
        
        # Execute subtasks in priority order
        for subtask in sorted(subtasks, key=lambda x: x["priority"]):
            capability = subtask["capability"]
            
            # Check dependencies
            deps_satisfied = all(
                dep in responses for dep in subtask["dependencies"]
            )
            
            if not deps_satisfied:
                continue
            
            # Find agent for this capability
            agents = await self.discovery.find_agents_by_capability(capability)
            
            if agents:
                # Simulate agent execution
                responses[capability] = {
                    "agent": agents[0]["name"],
                    "result": f"Completed {capability}",
                    "timestamp": datetime.now().isoformat()
                }
        
        state["agent_responses"] = responses
        
        # Check if all subtasks are complete
        if len(responses) == len(subtasks):
            state["workflow_phase"] = "synthesis"
        
        return state
    
    async def result_synthesizer(self, state: MultiAgentState) -> MultiAgentState:
        """
        Synthesizes results from multiple agents into a final output.
        """
        responses = state["agent_responses"]
        
        # Combine results
        synthesis = {
            "task": state["current_task"],
            "completed_subtasks": len(responses),
            "agents_used": list(set(
                resp["agent"] for resp in responses.values()
            )),
            "results": responses,
            "summary": "All subtasks completed successfully"
        }
        
        state["final_result"] = synthesis
        state["workflow_phase"] = "complete"
        state["messages"].append(
            AIMessage(content="Workflow completed successfully")
        )
        
        return state

# Build the multi-agent workflow
def create_a2a_langgraph_workflow(discovery_hub: DiscoveryHub):
    """
    Creates a LangGraph workflow that coordinates multiple A2A agents.
    
    This demonstrates:
    1. Task decomposition
    2. Parallel agent execution
    3. Dependency management
    4. Result synthesis
    """
    # Initialize graph and nodes
    workflow = StateGraph(MultiAgentState)
    nodes = A2AGraphNodes(discovery_hub)
    
    # Add nodes
    workflow.add_node("decompose", nodes.task_decomposer)
    workflow.add_node("execute", nodes.agent_executor)
    workflow.add_node("synthesize", nodes.result_synthesizer)
    
    # Define edges
    workflow.set_entry_point("decompose")
    workflow.add_edge("decompose", "execute")
    
    # Conditional routing
    def route_execution(state: MultiAgentState) -> str:
        if state["workflow_phase"] == "synthesis":
            return "synthesize"
        elif state["workflow_phase"] == "complete":
            return END
        else:
            return "execute"  # Continue execution
    
    workflow.add_conditional_edges("execute", route_execution)
    workflow.add_edge("synthesize", END)
    
    # Compile the graph
    return workflow.compile()

print("A2A LangGraph Integration created!")
print("\nThis workflow demonstrates:")
print("- Automatic task decomposition")
print("- Dynamic agent selection")
print("- Parallel execution with dependencies")
print("- Result aggregation and synthesis")

## 7. Building Custom A2A Agents <a id="custom-agents"></a>

Let's explore how to build production-ready A2A agents with advanced features.

In [None]:
from typing import Optional, AsyncGenerator, Set
from dataclasses import dataclass
import hashlib
import time
from collections import defaultdict
import random

@dataclass
class AgentConfig:
    """Configuration for production A2A agents"""
    name: str
    version: str
    description: str
    max_concurrent_requests: int = 10
    request_timeout: float = 30.0
    enable_auth: bool = True
    enable_monitoring: bool = True
    rate_limit_per_minute: int = 100

class ProductionA2AAgent(Agent):
    """
    Production-ready A2A agent with enterprise features.
    
    This demonstrates:
    1. Authentication and authorization
    2. Rate limiting and throttling
    3. Comprehensive error handling
    4. Monitoring and metrics
    5. Request validation
    6. Streaming responses
    """
    
    def __init__(self, config: AgentConfig):
        super().__init__(name=config.name)
        self.config = config
        self.version = config.version
        
        # Authentication
        self.authorized_agents: Set[str] = set()
        self.api_keys: Dict[str, str] = {}
        
        # Rate limiting
        self.request_counts: defaultdict = defaultdict(list)
        
        # Monitoring
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "average_response_time": 0.0,
            "active_connections": 0
        }
        
        # Request handling
        self.active_requests: Dict[str, float] = {}
        
        self._setup_capabilities()
    
    def _setup_capabilities(self):
        """Define agent capabilities"""
        self.capabilities = [
            Capability(
                name="process_data",
                description="Process data with validation and transformation",
                input_schema={
                    "type": "object",
                    "properties": {
                        "data": {"type": "object"},
                        "operations": {
                            "type": "array",
                            "items": {
                                "type": "string",
                                "enum": ["validate", "transform", "aggregate"]
                            }
                        },
                        "stream": {"type": "boolean", "default": False}
                    },
                    "required": ["data", "operations"]
                }
            ),
            Capability(
                name="health_check",
                description="Check agent health and status",
                input_schema={"type": "object", "properties": {}}
            )
        ]
    
    def generate_api_key(self, agent_id: str) -> str:
        """Generate API key for agent authentication"""
        timestamp = str(time.time())
        key_data = f"{agent_id}:{self.id}:{timestamp}"
        api_key = hashlib.sha256(key_data.encode()).hexdigest()
        self.api_keys[api_key] = agent_id
        return api_key
    
    async def authenticate_request(self, request: Request) -> bool:
        """Authenticate incoming request"""
        if not self.config.enable_auth:
            return True
        
        # Check if agent is authorized
        if request.from_agent in self.authorized_agents:
            return True
        
        # Check API key in request metadata
        api_key = request.metadata.get("api_key")
        if api_key and api_key in self.api_keys:
            return True
        
        return False
    
    async def check_rate_limit(self, agent_id: str) -> bool:
        """Check if agent has exceeded rate limit"""
        if not self.config.rate_limit_per_minute:
            return True
        
        now = time.time()
        minute_ago = now - 60
        
        # Clean old requests
        self.request_counts[agent_id] = [
            timestamp for timestamp in self.request_counts[agent_id]
            if timestamp > minute_ago
        ]
        
        # Check limit
        if len(self.request_counts[agent_id]) >= self.config.rate_limit_per_minute:
            return False
        
        # Record request
        self.request_counts[agent_id].append(now)
        return True
    
    async def handle_request(self, request: Request) -> Response:
        """
        Handle incoming request with full production features.
        """
        start_time = time.time()
        self.metrics["total_requests"] += 1
        self.metrics["active_connections"] += 1
        
        try:
            # Authentication
            if not await self.authenticate_request(request):
                self.metrics["failed_requests"] += 1
                return Response(
                    request_id=request.id,
                    error={
                        "code": "UNAUTHORIZED",
                        "message": "Authentication failed"
                    }
                )
            
            # Rate limiting
            if not await self.check_rate_limit(request.from_agent):
                self.metrics["failed_requests"] += 1
                return Response(
                    request_id=request.id,
                    error={
                        "code": "RATE_LIMIT_EXCEEDED",
                        "message": "Rate limit exceeded. Please try again later."
                    }
                )
            
            # Process request
            if request.capability == "health_check":
                response_data = {
                    "status": "healthy",
                    "version": self.version,
                    "uptime": time.time() - start_time,
                    "metrics": self.metrics
                }
            elif request.capability == "process_data":
                # Simulate data processing
                operations = request.data["operations"]
                response_data = {
                    "processed": True,
                    "operations_applied": operations,
                    "processing_time": time.time() - start_time
                }
            else:
                response_data = {"message": "Capability processed"}
            
            # Success
            self.metrics["successful_requests"] += 1
            response = Response(
                request_id=request.id,
                data=response_data,
                metadata={
                    "agent_version": self.version,
                    "processing_time": time.time() - start_time
                }
            )
            
        except Exception as e:
            self.metrics["failed_requests"] += 1
            response = Response(
                request_id=request.id,
                error={
                    "code": "INTERNAL_ERROR",
                    "message": "An internal error occurred"
                }
            )
        
        finally:
            # Cleanup
            self.metrics["active_connections"] -= 1
            
            # Update average response time
            response_time = time.time() - start_time
            current_avg = self.metrics["average_response_time"]
            total_requests = self.metrics["successful_requests"]
            if total_requests > 0:
                self.metrics["average_response_time"] = (
                    (current_avg * (total_requests - 1) + response_time) / total_requests
                )
        
        return response

# Demonstrate production agent
config = AgentConfig(
    name="DataProcessor",
    version="2.0.0",
    description="Production-grade data processing agent",
    max_concurrent_requests=20,
    rate_limit_per_minute=200
)

production_agent = ProductionA2AAgent(config)
print(f"Created production agent: {production_agent.name} v{production_agent.version}")
print(f"Features enabled:")
print(f"  - Authentication: {config.enable_auth}")
print(f"  - Rate limiting: {config.rate_limit_per_minute} req/min")
print(f"  - Monitoring: {config.enable_monitoring}")
print(f"  - Max concurrent: {config.max_concurrent_requests}")

## 8. Best Practices and Patterns <a id="best-practices"></a>

Let's explore best practices for building and deploying A2A systems in production.

In [None]:
class A2ABestPractices:
    """
    Comprehensive best practices for A2A protocol implementation.
    """
    
    @staticmethod
    def implement_circuit_breaker():
        """
        Best Practice 1: Circuit Breaker Pattern
        
        Prevents cascading failures in distributed agent systems.
        """
        class CircuitBreaker:
            def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
                self.failure_threshold = failure_threshold
                self.timeout = timeout
                self.failure_count = 0
                self.last_failure_time = None
                self.state = "closed"  # closed, open, half-open
            
            async def call(self, func, *args, **kwargs):
                if self.state == "open":
                    # Check if timeout has passed
                    if time.time() - self.last_failure_time > self.timeout:
                        self.state = "half-open"
                    else:
                        raise Exception("Circuit breaker is open")
                
                try:
                    result = await func(*args, **kwargs)
                    # Success - reset failures
                    if self.state == "half-open":
                        self.state = "closed"
                    self.failure_count = 0
                    return result
                
                except Exception as e:
                    self.failure_count += 1
                    self.last_failure_time = time.time()
                    
                    if self.failure_count >= self.failure_threshold:
                        self.state = "open"
                    
                    raise e
        
        return CircuitBreaker()
    
    @staticmethod
    def implement_retry_strategy():
        """
        Best Practice 2: Intelligent Retry Strategy
        
        Handle transient failures gracefully.
        """
        class RetryStrategy:
            def __init__(self, 
                         max_retries: int = 3,
                         base_delay: float = 1.0,
                         max_delay: float = 60.0,
                         exponential_base: float = 2.0):
                self.max_retries = max_retries
                self.base_delay = base_delay
                self.max_delay = max_delay
                self.exponential_base = exponential_base
            
            async def execute_with_retry(self, func, *args, **kwargs):
                """Execute function with exponential backoff retry"""
                last_exception = None
                
                for attempt in range(self.max_retries + 1):
                    try:
                        return await func(*args, **kwargs)
                    
                    except Exception as e:
                        last_exception = e
                        
                        if attempt == self.max_retries:
                            break
                        
                        # Calculate delay with jitter
                        delay = min(
                            self.base_delay * (self.exponential_base ** attempt),
                            self.max_delay
                        )
                        
                        # Add jitter to prevent thundering herd
                        jitter = delay * 0.1 * (2 * random.random() - 1)
                        delay += jitter
                        
                        await asyncio.sleep(delay)
                
                raise last_exception
        
        return RetryStrategy()

# Demonstrate best practices
print("A2A Protocol Best Practices:")
print("=" * 50)
print("\n1. **Reliability Patterns**")
print("   - Circuit Breaker: Prevent cascading failures")
print("   - Retry Strategy: Handle transient failures")
print("   - Timeout Management: Avoid hanging requests")

print("\n2. **Scalability Patterns**")
print("   - Discovery Caching: Reduce service load")
print("   - Connection Pooling: Reuse agent connections")
print("   - Load Balancing: Distribute work across agents")

print("\n3. **Maintainability Patterns**")
print("   - Message Versioning: Backward compatibility")
print("   - Schema Evolution: Gradual capability changes")
print("   - Feature Flags: Safe rollout of new features")

print("\n4. **Observability Patterns**")
print("   - Distributed Tracing: Track cross-agent requests")
print("   - Structured Logging: Consistent log formats")
print("   - Metrics Collection: Performance monitoring")

print("\n5. **Security Patterns**")
print("   - Mutual TLS: Secure agent communication")
print("   - Token Rotation: Regular credential updates")
print("   - Audit Logging: Track all agent interactions")

## Summary and Next Steps

Congratulations! You've learned about the Agent-to-Agent (A2A) Protocol and how to build sophisticated multi-agent systems. Here's what we covered:

### Key Takeaways

1. **A2A Protocol Fundamentals**
   - Standardized agent communication
   - Message types and capability declarations
   - Service discovery and dynamic connections

2. **LangChain Integration**
   - Converting A2A agents to LangChain tools
   - Coordinating multiple agents
   - Building complex workflows

3. **LangGraph Advanced Patterns**
   - Stateful multi-agent workflows
   - Collaborative research systems
   - Consensus and peer review

4. **Production-Ready Agents**
   - Authentication and security
   - Rate limiting and monitoring
   - WebSocket and REST APIs

5. **Best Practices**
   - Circuit breakers and retry strategies
   - Distributed tracing
   - Message versioning

### Resources

- **Official A2A Documentation**: https://a2a-protocol.org/latest/
- **Python SDK Reference**: https://a2a-protocol.org/latest/sdk/python/
- **Tutorial Series**: https://a2a-protocol.org/latest/tutorials/python/1-introduction/
- **GitHub Repository**: https://github.com/a2a-protocol/python-sdk
- **Community Forum**: https://community.a2a-protocol.org

Happy building with A2A Protocol! 🤖🔗🤖