Commit c04adeb

Author: Heena Ugale
Merge branch 'main' into heena-dev2

Bringing local main changes into heena-dev2 so they can be pushed upstream.

2 parents cc954ee + a9d5086

20 files changed: +1474 −143 lines

FETCH_HEAD

Whitespace-only changes.

README.md

Lines changed: 10 additions & 0 deletions
```diff
@@ -33,6 +33,7 @@ Welcome to the official repository for the Microsoft AI Agentic Workshop! This r
 - **Configurable LLM Backend:** Use the latest Azure OpenAI GPT models (e.g., GPT-4.1, GPT-4o).
 - **MCP Server Integration:** Advanced tools to enhance agent orchestration and capabilities.
+- **A2A (Agent-to-Agent) Protocol Support:** Enables strict cross-domain, black-box multi-agent collaboration using [Google's A2A protocol](https://github.com/google-a2a/A2A).
 - **Flexible Agent Architecture:**
   - Supports single-agent, multi-agent, or reflection-based agents (selectable via `.env`).
   - Agents can self-loop, collaborate, reflect, or take on dynamic roles as defined in modules.
@@ -53,6 +54,15 @@ Welcome to the official repository for the Microsoft AI Agentic Workshop! This r
 ---
 
+## 🆕 A2A (Agent-to-Agent) Cross-Domain Demo
+
+This repository now supports a strict cross-domain multi-agent scenario using the A2A protocol, enabling message-driven, black-box collaboration between a customer-service agent and a logistics agent.
+
+**A2A Example Included:**
+See [`agentic_ai/agents/semantic_kernel/multi_agent/a2a`](agentic_ai/agents/semantic_kernel/multi_agent/a2a).
+
+---
+
 ## Contributing
 
 Please review our [Code of Conduct](./CODE_OF_CONDUCT.md) and [Security Guidelines](./SECURITY.md) before contributing.
```

agentic_ai/agents/autogen/multi_agent/handoff_multi_domain_agent.py

Lines changed: 250 additions & 127 deletions
Large diffs are not rendered by default.

agentic_ai/agents/base_agent.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -17,7 +17,7 @@ def __init__(self, state_store: Dict[str, Any], session_id: str) -> None:
         self.azure_openai_key = os.getenv("AZURE_OPENAI_API_KEY")
         self.azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
         self.api_version = os.getenv("AZURE_OPENAI_API_VERSION")
-        self.mcp_server_uri = os.getenv("MCP_SERVER_URI")
+        self.mcp_server_uri = os.getenv("MCP_SERVER_URI")
         self.openai_model_name = os.getenv("OPENAI_MODEL_NAME")
 
         self.session_id = session_id
```

Whitespace-only change.
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
# Cross-Domain Return-Pick-up Scheduling (A2A)

This A2A implementation demonstrates inter-domain communication between agents in different domains. Unlike inter-agent communication within a single domain or application—where participating agents typically have full transparency into each other's details—cross-domain agent communication enforces strict modularity and abstraction. In cross-domain scenarios, the logic and implementation of each agent system are hidden from one another, and only high-level structured information is exchanged. This approach aligns with Google's Agent-to-Agent (A2A) protocol principles.

### Scenario: Cross-Domain Return Pickup Scheduling

In this implementation, an agent within the Contoso Customer Service AI team collaborates with a Logistics Agent to arrange a product return pickup. After verifying return eligibility, the Customer Service Agent initiates a multi-turn negotiation with the Logistics Agent to schedule a pickup at the customer's address. The process includes:

- The Customer Service Agent requesting available pickup slots from the Logistics Agent.
- The Logistics Agent responding with a list of available date/time options.
- The Customer Service Agent presenting these options to the customer and collecting a preferred slot.
- The Customer Service Agent confirming the selected slot with the Logistics Agent, who in turn confirms logistics with the carrier and finalizes the arrangement.

All communication is handled using high-level, schema-driven A2A messages, with neither agent exposing its internal logic, system details, or direct access to underlying services.

---
#### Mermaid Flow Diagram

```mermaid
sequenceDiagram
    actor Customer
    participant CSAgent as Customer Service Agent
    participant LogAgent as Logistics Agent

    Customer->>CSAgent: Request return for Order #85
    CSAgent->>Customer: Verifies eligibility, explains process
    CSAgent->>LogAgent: PickupAvailabilityRequest (address, preferences)
    LogAgent-->>CSAgent: PickupAvailabilityResponse (list of slots)
    CSAgent->>Customer: Presents pickup options
    Customer->>CSAgent: Chooses preferred slot
    CSAgent->>LogAgent: PickupRequestConfirmation (selected slot)
    LogAgent-->>CSAgent: PickupScheduledConfirmation (confirmation details)
    CSAgent->>Customer: Confirms pickup details, provides instructions
```
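The message types in the diagram are schema-driven. As a rough sketch (the field names here are illustrative assumptions, not the demo's actual wire format), the exchanged message shapes could be modeled like this:

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical schemas for the A2A negotiation messages. Field names are
# assumptions for illustration; the demo itself exchanges free-text messages.

@dataclass
class PickupAvailabilityRequest:
    order_id: str
    address: str
    preferred_date_range: str

@dataclass
class PickupAvailabilityResponse:
    slots: List[str] = field(default_factory=list)

@dataclass
class PickupRequestConfirmation:
    order_id: str
    selected_slot: str

# One round of the negotiation from the diagram above
req = PickupAvailabilityRequest("85", "1 Contoso Way", "next week")
resp = PickupAvailabilityResponse(slots=["Mon 10:00-12:00", "Tue 14:00-16:00"])
confirm = PickupRequestConfirmation(req.order_id, resp.slots[0])
print(confirm)
```

Because each side only ever sees these high-level payloads, either agent's internals can change freely without breaking the other.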
## Running the A2A Demo End-to-End

The repo ships three Python modules:

| File                     | Purpose                                              |
|--------------------------|------------------------------------------------------|
| `logistic_mcp.py`        | Internal Logistics **MCP** service (tools & DB)      |
| `logistic_a2a_server.py` | Thin **A2A façade** that wraps the MCP service       |
| `multi_agent_a2a.py`     | Contoso **multi-agent** customer-service application |

---
### 1. Install Dependencies

```bash
pip install -r requirements.txt
# or manually:
pip install a2a-sdk semantic-kernel uvicorn httpx python-dotenv
```
### 2. Prepare Your `.env`

Create or edit `.env` in the `agentic_ai\applications` folder:

```env
# ─── Contoso customer-service app ───────────────────────────────
AGENT_MODULE="agents.semantic_kernel.multi_agent.a2a.multi_agent_a2a"

# ─── Endpoints used by the agents ───────────────────────────────
LOGISTIC_MCP_SERVER_URI="http://localhost:8100/sse"   # internal Fast-MCP
LOGISTICS_A2A_URL="http://localhost:9100"             # A2A wrapper
```

Add your usual `AZURE_OPENAI_*` settings if you have not done so already.

---
### 3. Start the Back-End Services (Two Terminals)

```bash
# Terminal ① – internal Logistics MCP
python logistic_mcp.py            # listens on :8100/sse

# Terminal ② – A2A façade
python logistic_a2a_server.py     # listens on :9100 (serves /.well-known/agent.json)
```

---
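Once the façade is up, you can fetch its agent card from `/.well-known/agent.json` (the A2A discovery path) to confirm it is reachable. As a minimal offline sketch of what that card contains (fields abbreviated, mirroring the `PUBLIC_CARD` definition in `logistic_a2a_server.py`):

```python
import json

# A trimmed-down agent card like the one the A2A façade serves at
# http://localhost:9100/.well-known/agent.json (fields abbreviated).
card_json = json.dumps({
    "name": "Contoso Logistics Agent",
    "version": "1.0.0",
    "capabilities": {"streaming": True},
    "skills": [{"id": "return_pickup", "name": "Return pick-up scheduling"}],
})

# A client would GET this JSON over HTTP; here we just parse the sample.
card = json.loads(card_json)
print(card["name"])             # → Contoso Logistics Agent
print(card["skills"][0]["id"])  # → return_pickup
```

The card is all a remote agent ever learns about this service: name, capabilities, and skills, never implementation details.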
### 4. Launch the Contoso Multi-Agent App under `agentic_ai\applications`

```bash
./run_application.sh
```

The CS agent will now:

1. Verify product-return eligibility via the Contoso MCP tools.
2. Talk to the Logistics agent through the **single free-text tool** exposed by the A2A server (no JSON payloads needed).
3. Keep `taskId` and `contextId` in its session state so subsequent calls continue the same conversation on the Logistics side.
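The `taskId`/`contextId` bookkeeping in step 3 can be sketched as follows (illustrative only; the real app stores these in its agent session state, and the message field names here are assumptions):

```python
# Sketch: threading taskId/contextId through a session so each new
# message continues the same conversation on the Logistics side.
session_state = {}

def remember_ids(response_msg: dict) -> None:
    """Record the task/context identifiers from an A2A response message."""
    for key in ("taskId", "contextId"):
        if key in response_msg:
            session_state[key] = response_msg[key]

def next_message(text: str) -> dict:
    """Build the next outgoing message, reusing stored ids when present."""
    msg = {"role": "user", "parts": [{"kind": "text", "text": text}]}
    msg.update(session_state)  # adds taskId/contextId on follow-up turns
    return msg

# First response from the Logistics side establishes the ids...
remember_ids({"taskId": "t-1", "contextId": "c-1"})
# ...and every later turn carries them forward.
follow_up = next_message("Customer picks the Tuesday slot")
print(follow_up["taskId"], follow_up["contextId"])  # → t-1 c-1
```

Without this continuity, the Logistics agent would treat every turn as a brand-new conversation and lose the negotiation state.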
Binary file not shown.
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
```python
"""
Contoso – Logistics A2A façade
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Bridges the internal Fast-MCP logistics tools into the Google A2A
protocol. All business logic continues to live in the MCP service; this
wrapper merely acts as a protocol translator.

• Listens on http://0.0.0.0:9100/
• Exposes one skill: return-pick-up scheduling
• Streams a single final message per request
"""
from __future__ import annotations

import asyncio
import logging
import os
from typing import Any, Dict

import uvicorn
from dotenv import load_dotenv

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
)
from a2a.utils import new_agent_text_message, new_task
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.connectors.mcp import MCPSsePlugin

# ──────────────────────── Load environment variables ────────────────
load_dotenv()

# ───────────────────────── Logging ──────────────────────────────────
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("logistics-a2a")

# ──────────────────────── Agent state store ─────────────────────────
# Maps an A2A contextId to the SK conversation thread for that context.
AGENT_STATE_STORE: Dict[str, Any] = {}

# ───────────────────────── Environment ──────────────────────────────
MCP_URI = os.getenv("LOGISTIC_MCP_SERVER_URI", "http://localhost:8100/sse")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT")

# ─────────────────────── Build SK Logistics agent ───────────────────
async def build_sk_logistics_agent() -> ChatCompletionAgent:
    """
    Creates the Semantic-Kernel ChatCompletionAgent and opens the SSE
    connection to the Fast-MCP server.
    """
    logistic_plugin = MCPSsePlugin(
        name="LogisticMCP",
        description="Logistics MCP plugin",
        url=MCP_URI,
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    await logistic_plugin.connect()

    instructions = (
        "You are the Logistics AI agent responsible for arranging product-return "
        "pick-ups.\n"
        "Supported request types:\n"
        " • availability_request: requires pickup address and preferred date range\n"
        " • schedule_pickup: needs order_id, address and timeslot\n"
        " • cancel_request\n"
    )

    return ChatCompletionAgent(
        name="logistics_sk_agent",
        service=AzureChatCompletion(deployment_name=AZ_DEPLOYMENT),
        instructions=instructions,
        plugins=[logistic_plugin],
    )


# ──────────────────────── Agent executor ────────────────────────────
class LogisticsA2AExecutor(AgentExecutor):
    """
    Thin wrapper that forwards the incoming A2A text to a Semantic-Kernel
    agent which, in turn, calls the Logistics MCP tools.

    The SK agent is created lazily on first use so we do not need an
    event loop during __init__.
    """

    def __init__(self) -> None:
        self._agent: ChatCompletionAgent | None = None
        self._agent_lock = asyncio.Lock()  # guards one-time initialisation

    async def _get_agent(self) -> ChatCompletionAgent:
        if self._agent is None:
            async with self._agent_lock:
                if self._agent is None:  # double-checked
                    self._agent = await build_sk_logistics_agent()
        return self._agent

    async def execute(  # type: ignore[override]
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        try:
            agent = await self._get_agent()
            query = context.get_user_input()
            log.info("Received query: %s", query)

            task = context.current_task
            if not task:
                task = new_task(context.message)
                await event_queue.enqueue_event(task)

            # Get the SK thread for this context from the session store
            thread = AGENT_STATE_STORE.get(task.contextId)

            # Forward the request to the SK logistics agent
            if thread:
                response = await agent.get_response(messages=query, thread=thread)
            else:
                response = await agent.get_response(messages=query)
            response_content = str(response.content)
            log.info("Response content: %s", response_content)

            # Update the thread in the session store so follow-up calls
            # continue the same conversation
            AGENT_STATE_STORE[task.contextId] = response.thread

            await event_queue.enqueue_event(
                new_agent_text_message(response_content, task.contextId, task.id)
            )
        except Exception as exc:  # pragma: no cover
            log.exception("LogisticsA2AExecutor error")
            await event_queue.enqueue_event(
                new_agent_text_message(f"ERROR: {exc}")
            )

    async def cancel(  # type: ignore[override]
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        await event_queue.enqueue_event(
            new_agent_text_message("Cancellation not supported")
        )


# ────────────────────────── Agent card ──────────────────────────────
skill = AgentSkill(
    id="return_pickup",
    name="Return pick-up scheduling",
    description="Provides slots, books, looks up or cancels product-return pick-ups.",
    tags=["logistics", "return"],
)

PUBLIC_CARD = AgentCard(
    name="Contoso Logistics Agent",
    description="Cross-domain logistics service for product returns.",
    url="http://0.0.0.0:9100/",
    version="1.0.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=AgentCapabilities(streaming=True),
    skills=[skill],
)

# ───────────────────────── Run server ───────────────────────────────
def main() -> None:
    handler = DefaultRequestHandler(
        agent_executor=LogisticsA2AExecutor(), task_store=InMemoryTaskStore()
    )
    app = A2AStarletteApplication(agent_card=PUBLIC_CARD, http_handler=handler)
    uvicorn.run(app.build(), host="0.0.0.0", port=9100)


if __name__ == "__main__":
    main()
```
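The lock-guarded lazy initialisation in `_get_agent` is worth isolating: it guarantees the expensive agent build runs exactly once even under concurrent requests, without needing an event loop in `__init__`. A minimal standalone sketch (no SK dependency; names are illustrative):

```python
import asyncio

class LazyAgentHolder:
    """Minimal double-checked async lazy init, as in LogisticsA2AExecutor."""

    def __init__(self) -> None:
        self._agent = None
        self._lock = asyncio.Lock()
        self.builds = 0  # counts how many times the expensive build ran

    async def _build(self) -> str:
        self.builds += 1
        await asyncio.sleep(0)  # simulate async setup work (e.g. SSE connect)
        return "agent"

    async def get(self) -> str:
        if self._agent is None:          # fast path, no lock once built
            async with self._lock:
                if self._agent is None:  # re-check under the lock
                    self._agent = await self._build()
        return self._agent

async def demo() -> int:
    holder = LazyAgentHolder()
    # Ten concurrent callers race to initialise the agent...
    await asyncio.gather(*(holder.get() for _ in range(10)))
    # ...but the build still runs exactly once.
    return holder.builds

print(asyncio.run(demo()))  # → 1
```

The second `is None` check is essential: a caller that was queued on the lock while another caller built the agent must not rebuild it after acquiring the lock.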
