Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 60 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pip install adcp
```python
from adcp import ADCPMultiAgentClient, AgentConfig, GetProductsRequest

# Configure agents and handlers
client = ADCPMultiAgentClient(
# Configure agents and handlers (context manager ensures proper cleanup)
async with ADCPMultiAgentClient(
agents=[
AgentConfig(
id="agent_x",
Expand All @@ -54,21 +54,21 @@ client = ADCPMultiAgentClient(
if metadata.status == "completed" else None
)
}
)

# Execute operation - library handles operation IDs, webhook URLs, context management
agent = client.agent("agent_x")
request = GetProductsRequest(brief="Coffee brands")
result = await agent.get_products(request)
) as client:
# Execute operation - library handles operation IDs, webhook URLs, context management
agent = client.agent("agent_x")
request = GetProductsRequest(brief="Coffee brands")
result = await agent.get_products(request)

# Check result
if result.status == "completed":
# Agent completed synchronously!
print(f"✅ Sync completion: {len(result.data.products)} products")
# Check result
if result.status == "completed":
# Agent completed synchronously!
print(f"✅ Sync completion: {len(result.data.products)} products")

if result.status == "submitted":
# Agent will send webhook when complete
print(f"⏳ Async - webhook registered at: {result.submitted.webhook_url}")
if result.status == "submitted":
# Agent will send webhook when complete
print(f"⏳ Async - webhook registered at: {result.submitted.webhook_url}")
# Connections automatically cleaned up here
```

## Features
Expand Down Expand Up @@ -173,6 +173,51 @@ Or use the CLI:
uvx adcp --debug myagent get_products '{"brief":"TV ads"}'
```

### Resource Management

**Why use async context managers?**
- Ensures HTTP connections are properly closed, preventing resource leaks
- Handles cleanup even when exceptions occur
- Required for production applications with connection pooling
- Prevents issues with async task group cleanup in MCP protocol

The recommended pattern uses async context managers:

```python
from adcp import ADCPClient, AgentConfig, GetProductsRequest

# Recommended: Automatic cleanup with context manager
config = AgentConfig(id="agent_x", agent_uri="https://...", protocol="a2a")
async with ADCPClient(config) as client:
request = GetProductsRequest(brief="Coffee brands")
result = await client.get_products(request)
# Connection automatically closed on exit

# Multi-agent client also supports context managers
async with ADCPMultiAgentClient(agents) as client:
# Execute across all agents in parallel
results = await client.get_products(request)
# All agent connections closed automatically (even if some failed)
```

Manual cleanup is available for special cases (e.g., managing client lifecycle manually):

```python
# Use manual cleanup when you need fine-grained control over lifecycle
client = ADCPClient(config)
try:
result = await client.get_products(request)
finally:
await client.close() # Explicit cleanup
```

**When to use manual cleanup:**
- Managing client lifecycle across multiple functions
- Testing scenarios requiring explicit control
- Integration with frameworks that manage resources differently

In most cases, prefer the context manager pattern.

### Error Handling

The library provides a comprehensive exception hierarchy with helpful error messages:
Expand Down
37 changes: 19 additions & 18 deletions examples/basic_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,33 @@ async def main():
auth_token="your-token-here", # Optional
)

# Create client
client = ADCPClient(
# Use context manager for automatic resource cleanup
async with ADCPClient(
config,
webhook_url_template="https://myapp.com/webhook/{task_type}/{agent_id}/{operation_id}",
on_activity=lambda activity: print(f"[{activity.type}] {activity.task_type}"),
)
) as client:
# Call get_products
print("Fetching products...")
result = await client.get_products(brief="Coffee brands targeting millennials")

# Call get_products
print("Fetching products...")
result = await client.get_products(brief="Coffee brands targeting millennials")
# Handle result
if result.status == "completed":
print(f"✅ Sync completion: Got {len(result.data.get('products', []))} products")
for product in result.data.get("products", []):
print(f" - {product.get('name')}: {product.get('description')}")

# Handle result
if result.status == "completed":
print(f"✅ Sync completion: Got {len(result.data.get('products', []))} products")
for product in result.data.get("products", []):
print(f" - {product.get('name')}: {product.get('description')}")
elif result.status == "submitted":
print(f"⏳ Async: Webhook will be sent to {result.submitted.webhook_url}")
print(f" Operation ID: {result.submitted.operation_id}")

elif result.status == "submitted":
print(f"⏳ Async: Webhook will be sent to {result.submitted.webhook_url}")
print(f" Operation ID: {result.submitted.operation_id}")
elif result.status == "needs_input":
print(f"❓ Agent needs clarification: {result.needs_input.message}")

elif result.status == "needs_input":
print(f"❓ Agent needs clarification: {result.needs_input.message}")
elif result.status == "failed":
print(f"❌ Error: {result.error}")

elif result.status == "failed":
print(f"❌ Error: {result.error}")
# Connection automatically closed here


if __name__ == "__main__":
Expand Down
39 changes: 20 additions & 19 deletions examples/multi_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ async def main():
),
]

# Create multi-agent client
client = ADCPMultiAgentClient(
# Use context manager for automatic resource cleanup
async with ADCPMultiAgentClient(
agents=agents,
webhook_url_template="https://myapp.com/webhook/{task_type}/{agent_id}/{operation_id}",
on_activity=lambda activity: print(
Expand All @@ -44,29 +44,30 @@ async def main():
handlers={
"on_get_products_status_change": handle_products_result,
},
)
) as client:
# Execute across all agents in parallel
print(f"Querying {len(agents)} agents in parallel...")
results = await client.get_products(brief="Coffee brands")

# Execute across all agents in parallel
print(f"Querying {len(agents)} agents in parallel...")
results = await client.get_products(brief="Coffee brands")
# Process results
sync_count = sum(1 for r in results if r.status == "completed")
async_count = sum(1 for r in results if r.status == "submitted")

# Process results
sync_count = sum(1 for r in results if r.status == "completed")
async_count = sum(1 for r in results if r.status == "submitted")
print(f"\n📊 Results:")
print(f" ✅ Sync completions: {sync_count}")
print(f" ⏳ Async (webhooks pending): {async_count}")

print(f"\n📊 Results:")
print(f" ✅ Sync completions: {sync_count}")
print(f" ⏳ Async (webhooks pending): {async_count}")
for i, result in enumerate(results):
agent_id = client.agent_ids[i]

for i, result in enumerate(results):
agent_id = client.agent_ids[i]
if result.status == "completed":
products = result.data.get("products", [])
print(f"\n{agent_id}: {len(products)} products (sync)")

if result.status == "completed":
products = result.data.get("products", [])
print(f"\n{agent_id}: {len(products)} products (sync)")
elif result.status == "submitted":
print(f"\n{agent_id}: webhook to {result.submitted.webhook_url}")

elif result.status == "submitted":
print(f"\n{agent_id}: webhook to {result.submitted.webhook_url}")
# All agent connections automatically closed here


def handle_products_result(response, metadata):
Expand Down
78 changes: 37 additions & 41 deletions src/adcp/protocols/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,39 @@ def __init__(self, *args: Any, **kwargs: Any):
self._session: Any = None
self._exit_stack: Any = None

async def _cleanup_failed_connection(self, context: str) -> None:
"""
Clean up resources after a failed connection attempt.

This method handles cleanup without raising exceptions to avoid
masking the original connection error.

Args:
context: Description of the context for logging (e.g., "during connection attempt")
"""
if self._exit_stack is not None:
old_stack = self._exit_stack
self._exit_stack = None
self._session = None
try:
await old_stack.aclose()
except asyncio.CancelledError:
logger.debug(f"MCP session cleanup cancelled {context}")
except RuntimeError as cleanup_error:
# Known anyio task group cleanup issue
error_msg = str(cleanup_error).lower()
if "cancel scope" in error_msg or "async context" in error_msg:
logger.debug(f"Ignoring anyio cleanup error {context}: {cleanup_error}")
else:
logger.warning(
f"Unexpected RuntimeError during cleanup {context}: {cleanup_error}"
)
except Exception as cleanup_error:
# Log unexpected cleanup errors but don't raise to preserve original error
logger.warning(
f"Unexpected error during cleanup {context}: {cleanup_error}", exc_info=True
)

async def _get_session(self) -> ClientSession:
"""
Get or create MCP client session with URL fallback handling.
Expand Down Expand Up @@ -115,35 +148,8 @@ async def _get_session(self) -> ClientSession:
return self._session # type: ignore[no-any-return]
except Exception as e:
last_error = e
# Clean up the exit stack on failure to avoid async scope issues
if self._exit_stack is not None:
old_stack = self._exit_stack
self._exit_stack = None # Clear immediately to prevent reuse
self._session = None
try:
await old_stack.aclose()
except asyncio.CancelledError:
# Expected during shutdown
pass
except RuntimeError as cleanup_error:
# Known MCP SDK async cleanup issue
if (
"async context" in str(cleanup_error).lower()
or "cancel scope" in str(cleanup_error).lower()
):
logger.debug(
"Ignoring MCP SDK async context error during cleanup: "
f"{cleanup_error}"
)
else:
logger.warning(
f"Unexpected RuntimeError during cleanup: {cleanup_error}"
)
except Exception as cleanup_error:
# Unexpected cleanup errors should be logged
logger.warning(
f"Unexpected error during cleanup: {cleanup_error}", exc_info=True
)
# Clean up the exit stack on failure to avoid resource leaks
await self._cleanup_failed_connection("during connection attempt")

# If this isn't the last URL to try, create a new exit stack and continue
if url != urls_to_try[-1]:
Expand Down Expand Up @@ -352,15 +358,5 @@ async def list_tools(self) -> list[str]:
return [tool.name for tool in result.tools]

async def close(self) -> None:
"""Close the MCP session."""
if self._exit_stack is not None:
old_stack = self._exit_stack
self._exit_stack = None
self._session = None
try:
await old_stack.aclose()
except (asyncio.CancelledError, RuntimeError):
# Cleanup errors during shutdown are expected
pass
except Exception as e:
logger.debug(f"Error during MCP session cleanup: {e}")
"""Close the MCP session and clean up resources."""
await self._cleanup_failed_connection("during close")
7 changes: 3 additions & 4 deletions src/adcp/types/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from enum import Enum
from typing import Any, Generic, Literal, TypeVar

from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator


class Protocol(str, Enum):
Expand Down Expand Up @@ -125,6 +125,8 @@ class DebugInfo(BaseModel):
class TaskResult(BaseModel, Generic[T]):
"""Result from task execution."""

model_config = ConfigDict(arbitrary_types_allowed=True)

status: TaskStatus
data: T | None = None
message: str | None = None # Human-readable message from agent (e.g., MCP content text)
Expand All @@ -135,9 +137,6 @@ class TaskResult(BaseModel, Generic[T]):
metadata: dict[str, Any] | None = None
debug_info: DebugInfo | None = None

class Config:
arbitrary_types_allowed = True


class ActivityType(str, Enum):
"""Types of activity events."""
Expand Down
Loading