Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 87 additions & 6 deletions docs/handler-authoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,95 @@ share one push-notif bucket across every unauthenticated caller. The
warning is the signal your auth middleware isn't populating the
ContextVar — treat it as a P0.

### Known gaps
### Per-skill middleware (audit, activity feeds, rate limiting, tracing)

Every A2A skill dispatch can be wrapped in a chain of middleware
callables. Pass them as `middleware=[...]` to `create_a2a_server` /
`serve` / `ADCPAgentExecutor` — first entry wraps outermost, matching
Starlette/ASGI ordering:

```python
from adcp.server import SkillMiddleware, ToolContext, serve

async def audit_middleware(
skill_name: str,
params: dict,
context: ToolContext,
call_next,
) -> Any:
started = time.monotonic()
try:
result = await call_next()
except Exception as exc:
audit_log.failure(skill_name, context.caller_identity, exc)
raise
audit_log.success(
skill_name,
context.caller_identity,
elapsed_ms=(time.monotonic() - started) * 1000,
)
return result

- Per-skill middleware hooks for audit logging / activity feeds don't
exist yet — tracked at
[#226](https://github.com/adcontextprotocol/adcp-client-python/issues/226).
serve(MyAgent(), transport="a2a", middleware=[audit_middleware])
```

**Semantics worth knowing:**

- **Composition — put audit outermost.** `middleware=[Audit(),
RateLimit(), Metrics()]` runs `Audit → RateLimit → Metrics →
handler` on the way in and unwinds in the opposite order. **If you
put rate-limiting before audit, rejected requests disappear from
your audit log** — often the most interesting events for security
review. Audit always outermost.
- **Short-circuit — cache keys MUST include principal + tenant.** A
middleware that returns without calling `call_next()` stops the
chain; its return value becomes the dispatch result. Rate limiters
/ feature flags use this. **Caching middleware that short-circuits
must key on `(skill_name, params, context.caller_identity,
context.tenant_id)`** — a cache keyed only on `skill_name + params`
serves principal A's data to principal B on a matching-params call.
- **Exception observation — never swallow an `ADCPError`.** Catch
around `await call_next()` to log failures. Re-raise to let the
executor's normal error path take over (`ADCPError` → failed task
with `adcp_error` DataPart; other exceptions → opaque failed task).
Swallowing an `ADCPError` (especially `IdempotencyConflictError` or
`ADCPTaskError`) and returning a fake-success dict silently converts
a rejected mutation into a "completed" task — double-billing,
double-allocation, duplicated side effects. Don't.
- **Exception messages end up in server logs.** Middleware-raised
exceptions flow through `logger.exception` in the executor before
client-facing sanitisation. Don't format `params` or
`context.caller_identity` into exception text — operators read those
logs.
- **Retry is supported.** Call `call_next()` more than once (e.g.
retry-on-transient-error middleware). Each call gets a fresh
inner chain — composition is re-entrant by design.
- **Transform on return, not on input.** `params` passed in is the
same dict every middleware sees. Mutating it doesn't change what
the next layer receives. Transforms happen on the *return* side by
modifying the value of `await call_next()`.
- **Context access**: the middleware sees the `ToolContext` produced
by the `context_factory` (or the a2a-sdk fallback). Tenant id,
caller identity, anything your factory populates. `ContextVar`s set
before `call_next()` propagate to the handler — no `asyncio.create_task`
needed.

**Security — middleware is a data processor for the full skill
payload.** `params` carries decoded buyer briefs, budgets, brand
refs, proposal text, PII in message parts. `context` carries
`caller_identity` + `tenant_id`. Installing a third-party middleware
(SaaS audit, observability vendor, bespoke tracing) hands that vendor
the complete skill surface. Treat it as a data processor under your
GDPR/CCPA controller-processor agreements.

MCP transport has its own middleware story (see "Pattern 2 —
in-process HTTP middleware" above); `SkillMiddleware` is A2A-only.

### Known gaps

Once #226 lands, A2A adoption reaches parity with MCP for production
agents.
All three Phase-2 A2A hooks (#224 TaskStore, #225 PushNotificationConfigStore,
#226 SkillMiddleware) have landed. A2A adoption now reaches parity with
MCP for production agents.

## Testing

Expand Down
2 changes: 2 additions & 0 deletions src/adcp/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ async def get_products(params, context=None):
from adcp.server.serve import (
ContextFactory,
RequestMetadata,
SkillMiddleware,
create_mcp_server,
serve,
)
Expand Down Expand Up @@ -153,6 +154,7 @@ async def get_products(params, context=None):
"validate_discovery_set",
# A2A integration
"ADCPAgentExecutor",
"SkillMiddleware",
"create_a2a_server",
# Idempotency middleware (AdCP #2315 seller side)
"IdempotencyStore",
Expand Down
64 changes: 61 additions & 3 deletions src/adcp/server/a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@
from adcp.server.base import ADCPHandler, ToolContext

if TYPE_CHECKING:
from collections.abc import Sequence

from a2a.server.tasks.push_notification_config_store import (
PushNotificationConfigStore,
)
from a2a.server.tasks.task_store import TaskStore

from adcp.server.serve import ContextFactory
from adcp.server.serve import ContextFactory, SkillMiddleware
from adcp.server.helpers import STANDARD_ERROR_CODES
from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler
from adcp.server.test_controller import TestControllerStore, _handle_test_controller
Expand All @@ -69,9 +71,16 @@ def __init__(
test_controller: TestControllerStore | None = None,
*,
context_factory: ContextFactory | None = None,
middleware: Sequence[SkillMiddleware] | None = None,
) -> None:
self._handler = handler
self._context_factory = context_factory
# Store as a tuple so the executor can't be mutated from underneath
# at runtime (a flaky test or a handler reaching self._middleware
# can't corrupt the dispatch chain). Tuple ordering = runtime
# ordering; first entry wraps outermost (see ``SkillMiddleware``
# docstring for the composition semantics).
self._middleware: tuple[SkillMiddleware, ...] = tuple(middleware or ())
self._tool_callers: dict[str, Any] = {}

# Build tool callers for all tools this handler supports.
Expand Down Expand Up @@ -117,7 +126,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non

tool_context = self._build_tool_context(skill_name, context)
try:
result = await self._tool_callers[skill_name](params, tool_context)
result = await self._dispatch_with_middleware(skill_name, params, tool_context)
await self._send_result(event_queue, context, skill_name, result)
except ADCPError as exc:
# Application-layer AdCP error (IdempotencyConflictError etc.).
Expand All @@ -131,6 +140,43 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
logger.exception("Error executing skill %s", skill_name)
await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")

async def _dispatch_with_middleware(
self,
skill_name: str,
params: dict[str, Any],
tool_context: ToolContext,
) -> Any:
"""Run the handler wrapped in the configured middleware chain.

Middleware composes outermost-first: the first entry in
``self._middleware`` sees every call *before* the later entries
and *before* the handler. This matches Starlette / ASGI
conventions so sellers porting from those stacks aren't
surprised. Composition is done via a small recursive dispatcher
(no mutable indices, no lambdas closing over loop variables) —
the chain reads the same whether you have zero or ten
middlewares.

Middleware exceptions propagate to the executor's normal error
handling path in ``execute()``; this method does no try/except
so short-circuiting, transform, and exception-observation all
work the same way they do for the underlying handler.
"""
if not self._middleware:
return await self._tool_callers[skill_name](params, tool_context)

async def _step(index: int) -> Any:
if index >= len(self._middleware):
return await self._tool_callers[skill_name](params, tool_context)
middleware = self._middleware[index]

async def call_next() -> Any:
return await _step(index + 1)

return await middleware(skill_name, params, tool_context, call_next)

return await _step(0)

def _build_tool_context(self, skill_name: str, request: RequestContext) -> ToolContext:
"""Build the :class:`ToolContext` handed to the skill dispatcher.

Expand Down Expand Up @@ -445,6 +491,7 @@ def create_a2a_server(
context_factory: ContextFactory | None = None,
task_store: TaskStore | None = None,
push_config_store: PushNotificationConfigStore | None = None,
middleware: Sequence[SkillMiddleware] | None = None,
) -> Any:
"""Create an A2A Starlette application from an ADCP handler.

Expand Down Expand Up @@ -492,6 +539,14 @@ def create_a2a_server(
(via a ``ContextVar`` your auth middleware populates) or by
composition with a tenant-scoped ``TaskStore`` — the reference
impl shows the ContextVar pattern.
middleware: Optional sequence of :data:`~adcp.server.SkillMiddleware`
callables wrapping every A2A skill dispatch. Composes
outermost-first (first entry sees the call before later
entries and before the handler). Use for audit logging,
activity-feed hooks, rate limiting, per-skill tracing. See
:data:`~adcp.server.SkillMiddleware` for the signature,
composition semantics, and the exception-capture pattern
audit hooks need.

Returns:
A Starlette app ready to be run with uvicorn.
Expand All @@ -501,7 +556,10 @@ def create_a2a_server(
resolved_port = port or int(os.environ.get("PORT", "3001"))

executor = ADCPAgentExecutor(
handler, test_controller=test_controller, context_factory=context_factory
handler,
test_controller=test_controller,
context_factory=context_factory,
middleware=middleware,
)

agent_card = _build_agent_card(
Expand Down
Loading
Loading