
Commit 030f58f

Merge pull request #246 from microsoft/james-dev-agentic-ai
add A2A implementation for Semantic Kernel Multi-Agent
2 parents 7cb9059 + 720744d commit 030f58f

File tree: 11 files changed (+910 −2 lines)

README.md

Lines changed: 10 additions & 0 deletions
@@ -33,6 +33,7 @@ Welcome to the official repository for the Microsoft AI Agentic Workshop! This r

  - **Configurable LLM Backend:** Use the latest Azure OpenAI GPT models (e.g., GPT-4.1, GPT-4o).
  - **MCP Server Integration:** Advanced tools to enhance agent orchestration and capabilities.
+ - **A2A (Agent-to-Agent) Protocol Support:** Enables strict cross-domain, black-box multi-agent collaboration using [Google's A2A protocol](https://github.com/google-a2a/A2A).
  - **Flexible Agent Architecture:**
    - Supports single-agent, multi-agent, or reflection-based agents (selectable via `.env`).
    - Agents can self-loop, collaborate, reflect, or take on dynamic roles as defined in modules.

@@ -53,6 +54,15 @@ Welcome to the official repository for the Microsoft AI Agentic Workshop! This r

  ---

+ ## 🆕 A2A (Agent-to-Agent) Cross-Domain Demo
+
+ This repository now supports a strict cross-domain multi-agent scenario using the A2A protocol, enabling message-driven, black-box collaboration between a customer-service agent and a logistics agent.
+
+ **A2A Example Included:**
+ See [`agentic_ai/agents/semantic_kernel/multi_agent/a2a`](agentic_ai/agents/semantic_kernel/multi_agent/a2a).
+
+ ---

  ## Contributing

  Please review our [Code of Conduct](./CODE_OF_CONDUCT.md) and [Security Guidelines](./SECURITY.md) before contributing.

agentic_ai/agents/base_agent.py

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ def __init__(self, state_store: Dict[str, Any], session_id: str) -> None:

  self.azure_openai_key = os.getenv("AZURE_OPENAI_API_KEY")
  self.azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
  self.api_version = os.getenv("AZURE_OPENAI_API_VERSION")
- self.mcp_server_uri = os.getenv("MCP_SERVER_URI")
+ self.mcp_server_uri = os.getenv("MCP_SERVER_URI")
  self.openai_model_name = os.getenv("OPENAI_MODEL_NAME")

  self.session_id = session_id
Lines changed: 94 additions & 0 deletions
# Cross-Domain Return-Pick-up Scheduling (A2A)

This A2A implementation demonstrates communication between agents in different domains. Unlike inter-agent communication within a single domain or application, where participating agents typically have full transparency into each other's details, cross-domain agent communication enforces strict modularity and abstraction. The logic and implementation of each agent system are hidden from one another, and only high-level structured information is exchanged. This approach aligns with Google's Agent-to-Agent (A2A) protocol principles.

### Scenario: Cross-Domain Return Pickup Scheduling

In this implementation, an agent within the Contoso Customer Service AI team collaborates with a Logistics Agent to arrange a product-return pickup. After verifying return eligibility, the Customer Service Agent initiates a multi-turn negotiation with the Logistics Agent to schedule a pickup at the customer's address. The process includes:

- The Customer Service Agent requesting available pickup slots from the Logistics Agent.
- The Logistics Agent responding with a list of available date/time options.
- The Customer Service Agent presenting these options to the customer and collecting a preferred slot.
- The Customer Service Agent confirming the selected slot with the Logistics Agent, who in turn confirms logistics with the carrier and finalizes the arrangement.

Each exchange uses high-level, schema-driven A2A messages; neither agent exposes its internal logic, system details, or direct access to underlying services.

---
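The "schema-driven" exchange above can be sketched as plain data types. The field names below are illustrative assumptions for this walkthrough, not schemas defined by the repo (which, as noted later, carries the payloads as free text between the agents):

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical shapes for the high-level A2A payloads; only the
# information in these fields crosses the domain boundary.
@dataclass
class PickupAvailabilityRequest:
    order_id: str
    address: str
    preferences: List[str] = field(default_factory=list)

@dataclass
class PickupAvailabilityResponse:
    order_id: str
    slots: List[str]  # e.g. ISO date/time strings offered by logistics

# Order #85 matches the flow diagram below; values are made up.
req = PickupAvailabilityRequest(order_id="85", address="1 Contoso Way", preferences=["morning"])
resp = PickupAvailabilityResponse(order_id=req.order_id, slots=["2025-07-01T09:00", "2025-07-02T14:00"])
```

Neither agent sees anything beyond these fields: no database handles, no internal tool names, no credentials.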
#### Mermaid Flow Diagram

```mermaid
sequenceDiagram
    actor Customer
    participant CSAgent as Customer Service Agent
    participant LogAgent as Logistics Agent

    Customer->>CSAgent: Request return for Order #85
    CSAgent->>Customer: Verifies eligibility, explains process
    CSAgent->>LogAgent: PickupAvailabilityRequest (address, preferences)
    LogAgent-->>CSAgent: PickupAvailabilityResponse (list of slots)
    CSAgent->>Customer: Presents pickup options
    Customer->>CSAgent: Chooses preferred slot
    CSAgent->>LogAgent: PickupRequestConfirmation (selected slot)
    LogAgent-->>CSAgent: PickupScheduledConfirmation (confirmation details)
    CSAgent->>Customer: Confirms pickup details, provides instructions
```
## Running the A2A Demo End-to-End

The repo ships three Python modules:

| File                      | Purpose                                                   |
|---------------------------|-----------------------------------------------------------|
| `logistic_mcp.py`         | Internal Logistics **MCP** service (tools & DB)           |
| `logistic_a2a_server.py`  | Thin **A2A façade** that wraps the MCP service            |
| `multi_agent_a2a.py`      | Contoso **multi-agent** customer-service application      |

---
### 1. Install Dependencies

```bash
pip install -r requirements.txt
# or manually:
pip install a2a-sdk semantic-kernel uvicorn httpx python-dotenv
```
### 2. Prepare Your `.env`

Create or edit `.env` in the `agentic_ai/applications` folder:

```env
# ─── Contoso customer-service app ───────────────────────────────
AGENT_MODULE="agents.semantic_kernel.multi_agent.a2a.multi_agent_a2a"

# ─── End-points used by the agents ──────────────────────────────
LOGISTIC_MCP_SERVER_URI="http://localhost:8100/sse"   # internal Fast-MCP
LOGISTICS_A2A_URL="http://localhost:9100"             # A2A wrapper
```

Add your usual `AZURE_OPENAI_*` settings if you have not done so already.

---
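A quick sanity check that the two endpoint variables resolve as expected (a minimal sketch, not part of the repo; the fallback values match the `.env` entries above, which the server code also uses as defaults):

```python
import os

# Hypothetical check script: read the two endpoints, falling back to the
# documented defaults when the variables are unset.
mcp_uri = os.getenv("LOGISTIC_MCP_SERVER_URI", "http://localhost:8100/sse")
a2a_url = os.getenv("LOGISTICS_A2A_URL", "http://localhost:9100")
print(f"MCP: {mcp_uri}")
print(f"A2A: {a2a_url}")
```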
### 3. Start the Back-End Services (Two Terminals)

```bash
# Terminal ① – internal Logistics MCP
python logistic_mcp.py            # listens on :8100/sse

# Terminal ② – A2A façade
python logistic_a2a_server.py     # listens on :9100 (serves /.well-known/agent.json)
```

---
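Once both terminals are up, you can probe the façade's discovery endpoint to confirm it is reachable. The helper below is an illustrative sketch using the standard library, not part of the repo:

```python
import json
from urllib.request import urlopen

def agent_card_url(base: str) -> str:
    """Build the A2A discovery URL from the server's base URL."""
    return base.rstrip("/") + "/.well-known/agent.json"

def fetch_agent_card(base: str) -> dict:
    """Fetch and parse the public agent card (requires a running server)."""
    with urlopen(agent_card_url(base)) as resp:
        return json.loads(resp.read())

# e.g. fetch_agent_card("http://localhost:9100") should return the
# "Contoso Logistics Agent" card published by logistic_a2a_server.py.
print(agent_card_url("http://localhost:9100"))
```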
### 4. Launch the Contoso Multi-Agent App

From the `agentic_ai/applications` folder:

```bash
./run_application.sh
```
The CS agent will now:

1. Verify product-return eligibility via the Contoso MCP tools.
2. Talk to the Logistics agent through the **single free-text tool** exposed by the A2A server (no JSON payloads needed).
3. Keep `taskId` and `contextId` in its session state so subsequent calls continue the same conversation on the Logistics side.
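The `taskId`/`contextId` threading in step 3 can be sketched as a payload builder. The JSON-RPC shape follows the A2A `message/send` method; the helper itself is illustrative and not the repo's client code:

```python
import uuid
from typing import Any, Dict, Optional

def build_message_send(
    text: str,
    task_id: Optional[str] = None,
    context_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Build an A2A JSON-RPC `message/send` payload. Passing the taskId and
    contextId from the previous response continues the same conversation
    on the Logistics side."""
    message: Dict[str, Any] = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": uuid.uuid4().hex,
    }
    if task_id:
        message["taskId"] = task_id
    if context_id:
        message["contextId"] = context_id
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "message/send",
        "params": {"message": message},
    }

# First turn carries no ids; later turns reuse the ids the server returned.
first = build_message_send("What pickup slots are available for order #85?")
followup = build_message_send("Book the first slot.", task_id="t-123", context_id="c-456")
```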
Lines changed: 186 additions & 0 deletions
```python
"""
Contoso – Logistics A2A façade
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Bridges the internal Fast-MCP logistics tools into the Google A2A
protocol. All business logic continues to live in the MCP service; this
wrapper merely acts as a protocol translator.

• Listens on http://0.0.0.0:9100/
• Exposes one skill: return-pick-up scheduling
• Streams a single final message per request
"""
from __future__ import annotations

import asyncio
import logging
import os
from typing import Any, Dict

import uvicorn
from dotenv import load_dotenv

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
)
from a2a.utils import new_agent_text_message, new_task
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.connectors.mcp import MCPSsePlugin

# ──────────────────────── Load environment variables ───────────
load_dotenv()

# ───────────────────────── Logging ──────────────────────────
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("logistics-a2a")

# ──────────────────────── Agent state store ─────────────────
# Maps an A2A contextId to the SK conversation thread so multi-turn
# negotiations continue across requests.
AGENT_STATE_STORE: Dict[str, Any] = {}

# ───────────────────────── Environment ──────────────────────
MCP_URI = os.getenv("LOGISTIC_MCP_SERVER_URI", "http://localhost:8100/sse")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT")

# ─────────────────────── Build SK Logistics agent ───────────
async def build_sk_logistics_agent() -> ChatCompletionAgent:
    """
    Creates the Semantic Kernel ChatCompletionAgent and opens the SSE
    connection to the Fast-MCP server.
    """
    logistic_plugin = MCPSsePlugin(
        name="LogisticMCP",
        description="Logistics MCP plugin",
        url=MCP_URI,
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    await logistic_plugin.connect()

    instructions = (
        "You are the Logistics AI agent responsible for arranging product-return "
        "pick-ups.\n"
        "Supported request types:\n"
        " • availability_request\n"
        " • schedule_pickup\n"
        " • cancel_request\n"
    )

    return ChatCompletionAgent(
        name="logistics_sk_agent",
        service=AzureChatCompletion(deployment_name=AZ_DEPLOYMENT),
        instructions=instructions,
        plugins=[logistic_plugin],
    )


# ──────────────────────── Agent Executor ─────────────────────
class LogisticsA2AExecutor(AgentExecutor):
    """
    Thin wrapper that forwards the incoming text to a Semantic Kernel
    agent which, in turn, calls the Logistics MCP tools.

    The SK agent is created lazily on first use so we do not need an
    event loop during __init__.
    """

    def __init__(self) -> None:
        self._agent: ChatCompletionAgent | None = None
        self._agent_lock = asyncio.Lock()  # guards one-time initialisation

    async def _get_agent(self) -> ChatCompletionAgent:
        if self._agent is None:
            async with self._agent_lock:
                if self._agent is None:  # double-checked
                    self._agent = await build_sk_logistics_agent()
        return self._agent

    async def execute(  # type: ignore[override]
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        try:
            agent = await self._get_agent()
            query = context.get_user_input()
            task = context.current_task
            if not task:
                task = new_task(context.message)
                await event_queue.enqueue_event(task)
            log.info("received contextId %s", task.contextId)

            # Resume the stored SK thread for this context, if any
            thread = AGENT_STATE_STORE.get(task.contextId)

            # Forward the request to the SK logistics agent
            if thread:
                response = await agent.get_response(messages=query, thread=thread)
            else:
                response = await agent.get_response(messages=query)
            response_content = str(response.content)

            # Persist the thread so the next call continues the conversation
            AGENT_STATE_STORE[task.contextId] = response.thread

            await event_queue.enqueue_event(
                new_agent_text_message(response_content, task.contextId, task.id)
            )

        except Exception as exc:  # pragma: no cover
            log.exception("LogisticsA2AExecutor error")
            await event_queue.enqueue_event(
                new_agent_text_message(f"ERROR: {exc}")
            )

    async def cancel(  # type: ignore[override]
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        await event_queue.enqueue_event(
            new_agent_text_message("Cancellation not supported")
        )


# ────────────────────────── Agent Card ───────────────────────
skill = AgentSkill(
    id="return_pickup",
    name="Return pick-up scheduling",
    description="Provides slots, books, looks up or cancels product-return pick-ups.",
    tags=["logistics", "return"],
)

PUBLIC_CARD = AgentCard(
    name="Contoso Logistics Agent",
    description="Cross-domain logistics service for product returns.",
    url="http://0.0.0.0:9100/",
    version="1.0.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=AgentCapabilities(streaming=True),
    skills=[skill],
)

# ───────────────────────── Run server ────────────────────────
def main() -> None:
    handler = DefaultRequestHandler(
        agent_executor=LogisticsA2AExecutor(), task_store=InMemoryTaskStore()
    )
    app = A2AStarletteApplication(agent_card=PUBLIC_CARD, http_handler=handler)
    uvicorn.run(app.build(), host="0.0.0.0", port=9100)


if __name__ == "__main__":
    main()
```
