diff --git a/docs/handler-authoring.md b/docs/handler-authoring.md index 9e4399d0..bb745636 100644 --- a/docs/handler-authoring.md +++ b/docs/handler-authoring.md @@ -328,14 +328,95 @@ share one push-notif bucket across every unauthenticated caller. The warning is the signal your auth middleware isn't populating the ContextVar — treat it as a P0. -### Known gaps +### Per-skill middleware (audit, activity feeds, rate limiting, tracing) + +Every A2A skill dispatch can be wrapped in a chain of middleware +callables. Pass them as `middleware=[...]` to `create_a2a_server` / +`serve` / `ADCPAgentExecutor` — first entry wraps outermost, matching +Starlette/ASGI ordering: + +```python +from adcp.server import SkillMiddleware, ToolContext, serve + +async def audit_middleware( + skill_name: str, + params: dict, + context: ToolContext, + call_next, +) -> Any: + started = time.monotonic() + try: + result = await call_next() + except Exception as exc: + audit_log.failure(skill_name, context.caller_identity, exc) + raise + audit_log.success( + skill_name, + context.caller_identity, + elapsed_ms=(time.monotonic() - started) * 1000, + ) + return result -- Per-skill middleware hooks for audit logging / activity feeds don't - exist yet — tracked at - [#226](https://github.com/adcontextprotocol/adcp-client-python/issues/226). +serve(MyAgent(), transport="a2a", middleware=[audit_middleware]) +``` + +**Semantics worth knowing:** + +- **Composition — put audit outermost.** `middleware=[Audit(), + RateLimit(), Metrics()]` runs `Audit → RateLimit → Metrics → + handler` on the way in and unwinds in the opposite order. **If you + put rate-limiting before audit, rejected requests disappear from + your audit log** — often the most interesting events for security + review. Audit always outermost. +- **Short-circuit — cache keys MUST include principal + tenant.** A + middleware that returns without calling `call_next()` stops the + chain; its return value becomes the dispatch result. Rate limiters + / feature flags use this. **Caching middleware that short-circuits + must key on `(skill_name, params, context.caller_identity, + context.tenant_id)`** — a cache keyed only on `skill_name + params` + serves principal A's data to principal B on a matching-params call. +- **Exception observation — never swallow an `ADCPError`.** Catch + around `await call_next()` to log failures. Re-raise to let the + executor's normal error path take over (`ADCPError` → failed task + with `adcp_error` DataPart; other exceptions → opaque failed task). + Swallowing an `ADCPError` (especially `IdempotencyConflictError` or + `ADCPTaskError`) and returning a fake-success dict silently converts + a rejected mutation into a "completed" task — double-billing, + double-allocation, duplicated side effects. Don't. +- **Exception messages end up in server logs.** Middleware-raised + exceptions flow through `logger.exception` in the executor before + client-facing sanitisation. Don't format `params` or + `context.caller_identity` into exception text — operators read those + logs. +- **Retry is supported.** Call `call_next()` more than once (e.g. + retry-on-transient-error middleware). Each call gets a fresh + inner chain — composition is re-entrant by design. +- **Transform on return, not on input.** `params` passed in is the + same dict every middleware sees. Mutating it doesn't change what + the next layer receives. Transforms happen on the *return* side by + modifying the value of `await call_next()`. +- **Context access**: the middleware sees the `ToolContext` produced + by the `context_factory` (or the a2a-sdk fallback). Tenant id, + caller identity, anything your factory populates. `ContextVar`s set + before `call_next()` propagate to the handler — no `asyncio.create_task` + needed. + +**Security — middleware is a data processor for the full skill +payload.** `params` carries decoded buyer briefs, budgets, brand +refs, proposal text, PII in message parts. `context` carries +`caller_identity` + `tenant_id`. Installing a third-party middleware +(SaaS audit, observability vendor, bespoke tracing) hands that vendor +the complete skill surface. Treat it as a data processor under your +GDPR/CCPA controller-processor agreements. + +MCP transport has its own middleware story (see "Pattern 2 — +in-process HTTP middleware" above); `SkillMiddleware` is A2A-only. + +### Known gaps -Once #226 lands, A2A adoption reaches parity with MCP for production -agents. +All three Phase-2 A2A hooks (#224 TaskStore, #225 PushNotificationConfigStore, +#226 SkillMiddleware) have landed. A2A adoption now reaches parity with +MCP for production agents. ## Testing diff --git a/src/adcp/server/__init__.py b/src/adcp/server/__init__.py index c3563297..a35f868e 100644 --- a/src/adcp/server/__init__.py +++ b/src/adcp/server/__init__.py @@ -112,6 +112,7 @@ async def get_products(params, context=None): from adcp.server.serve import ( ContextFactory, RequestMetadata, + SkillMiddleware, create_mcp_server, serve, ) @@ -153,6 +154,7 @@ async def get_products(params, context=None): "validate_discovery_set", # A2A integration "ADCPAgentExecutor", + "SkillMiddleware", "create_a2a_server", # Idempotency middleware (AdCP #2315 seller side) "IdempotencyStore", diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 2e58d6a2..36d24631 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -39,12 +39,14 @@ from adcp.server.base import ADCPHandler, ToolContext if TYPE_CHECKING: + from collections.abc import Sequence + from a2a.server.tasks.push_notification_config_store import ( PushNotificationConfigStore, ) from a2a.server.tasks.task_store import TaskStore - from adcp.server.serve import ContextFactory + from adcp.server.serve import ContextFactory, SkillMiddleware from adcp.server.helpers import STANDARD_ERROR_CODES from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler from adcp.server.test_controller import TestControllerStore, _handle_test_controller @@ -69,9 +71,16 @@ def __init__( test_controller: TestControllerStore | None = None, *, context_factory: ContextFactory | None = None, + middleware: Sequence[SkillMiddleware] | None = None, ) -> None: self._handler = handler self._context_factory = context_factory + # Store as a tuple so the executor can't be mutated from underneath + # at runtime (a flaky test or a handler reaching self._middleware + # can't corrupt the dispatch chain). Tuple ordering = runtime + # ordering; first entry wraps outermost (see ``SkillMiddleware`` + # docstring for the composition semantics). + self._middleware: tuple[SkillMiddleware, ...] = tuple(middleware or ()) self._tool_callers: dict[str, Any] = {} # Build tool callers for all tools this handler supports. @@ -117,7 +126,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non tool_context = self._build_tool_context(skill_name, context) try: - result = await self._tool_callers[skill_name](params, tool_context) + result = await self._dispatch_with_middleware(skill_name, params, tool_context) await self._send_result(event_queue, context, skill_name, result) except ADCPError as exc: # Application-layer AdCP error (IdempotencyConflictError etc.). @@ -131,6 +140,43 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non logger.exception("Error executing skill %s", skill_name) await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}") + async def _dispatch_with_middleware( + self, + skill_name: str, + params: dict[str, Any], + tool_context: ToolContext, + ) -> Any: + """Run the handler wrapped in the configured middleware chain. + + Middleware composes outermost-first: the first entry in + ``self._middleware`` sees every call *before* the later entries + and *before* the handler. This matches Starlette / ASGI + conventions so sellers porting from those stacks aren't + surprised. Composition is done via a small recursive dispatcher + (no mutable indices, no lambdas closing over loop variables) — + the chain reads the same whether you have zero or ten + middlewares. + + Middleware exceptions propagate to the executor's normal error + handling path in ``execute()``; this method does no try/except + so short-circuiting, transform, and exception-observation all + work the same way they do for the underlying handler. + """ + if not self._middleware: + return await self._tool_callers[skill_name](params, tool_context) + + async def _step(index: int) -> Any: + if index >= len(self._middleware): + return await self._tool_callers[skill_name](params, tool_context) + middleware = self._middleware[index] + + async def call_next() -> Any: + return await _step(index + 1) + + return await middleware(skill_name, params, tool_context, call_next) + + return await _step(0) + def _build_tool_context(self, skill_name: str, request: RequestContext) -> ToolContext: """Build the :class:`ToolContext` handed to the skill dispatcher. @@ -445,6 +491,7 @@ def create_a2a_server( context_factory: ContextFactory | None = None, task_store: TaskStore | None = None, push_config_store: PushNotificationConfigStore | None = None, + middleware: Sequence[SkillMiddleware] | None = None, ) -> Any: """Create an A2A Starlette application from an ADCP handler. @@ -492,6 +539,14 @@ def create_a2a_server( (via a ``ContextVar`` your auth middleware populates) or by composition with a tenant-scoped ``TaskStore`` — the reference impl shows the ContextVar pattern. + middleware: Optional sequence of :data:`~adcp.server.SkillMiddleware` + callables wrapping every A2A skill dispatch. Composes + outermost-first (first entry sees the call before later + entries and before the handler). Use for audit logging, + activity-feed hooks, rate limiting, per-skill tracing. See + :data:`~adcp.server.SkillMiddleware` for the signature, + composition semantics, and the exception-capture pattern + audit hooks need. Returns: A Starlette app ready to be run with uvicorn. @@ -501,7 +556,10 @@ def create_a2a_server( resolved_port = port or int(os.environ.get("PORT", "3001")) executor = ADCPAgentExecutor( - handler, test_controller=test_controller, context_factory=context_factory + handler, + test_controller=test_controller, + context_factory=context_factory, + middleware=middleware, ) agent_card = _build_agent_card( diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index c49a546d..de336f71 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -19,7 +19,7 @@ async def get_adcp_capabilities(self, params, context=None): from __future__ import annotations import os -from collections.abc import Callable +from collections.abc import Awaitable, Callable from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Literal @@ -27,6 +27,8 @@ async def get_adcp_capabilities(self, params, context=None): from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler if TYPE_CHECKING: + from collections.abc import Sequence + from a2a.server.tasks.push_notification_config_store import ( PushNotificationConfigStore, ) @@ -61,6 +63,118 @@ class RequestMetadata: request_id: str | None = None +SkillMiddleware = Callable[ + [str, dict[str, Any], ToolContext, Callable[[], Awaitable[Any]]], + Awaitable[Any], +] +"""Middleware that wraps A2A skill dispatch — the audit / activity-feed / +rate-limiter / tracing hook for the A2A transport. + +Signature (conceptually a Protocol; declared as a ``Callable`` alias so +it's importable and consistent with ``ContextFactory``):: + + async def middleware( + skill_name: str, + params: dict[str, Any], + context: ToolContext, + call_next: Callable[[], Awaitable[Any]], + ) -> Any: + ... + +Middleware wraps ``call_next()`` — call it (possibly more than once to +implement retry, or never to short-circuit) to invoke the rest of the +chain plus the underlying handler. Anything the middleware returns +becomes the dispatch result the A2A transport serialises back to the +client, so middleware can short-circuit (skip the handler entirely) or +transform the result on the return side. + +Middleware observes both success and failure — catch exceptions around +``call_next()`` to implement audit-on-failure or retry-classifier hooks. +Middleware re-raising propagates to the executor's normal error path +(application ``ADCPError`` → failed task w/ ``adcp_error`` DataPart; +other exceptions → opaque failed task per the spec's error-sanitisation +rule). **Swallowing an exception and returning a substitute result is +allowed but almost always wrong** — in particular, swallowing +``ADCPError`` subclasses (``IdempotencyConflictError``, +``ADCPTaskError``) serves a fake success for a failed mutation, which +double-bills / double-allocates in production. + +``params`` is the parsed request dict passed to every middleware in +the chain and to the handler. Middleware cannot mutate what the next +layer sees by mutating ``params`` — transforms happen on the return +side only, by modifying the value returned from ``call_next()``. + +Multiple middlewares compose outermost-first, matching Starlette/ASGI +semantics — if you pass ``middleware=[Audit(), RateLimit(), Metrics()]``, +the runtime order is:: + + Audit.__call__ → RateLimit.__call__ → Metrics.__call__ → handler + +**Put audit outermost.** Middleware that short-circuits (rate limiter, +feature-flag gate) never calls ``call_next()``, so anything deeper in +the chain never sees the request. If your audit middleware sits +*after* the rate limiter, rejected calls disappear from the audit +trail — often the most interesting events for security review. + +``call_next()`` runs in the same asyncio task as the middleware that +invoked it, so ``ContextVar`` values set before the call are visible +to downstream middleware and the handler. Don't ``asyncio.create_task`` +your way around this unless you need the isolation. + +**Security — middleware is a data processor for the full skill payload.** +``params`` is decoded business content (buyer briefs, budgets, brand +references, proposal text, PII in message parts). ``context`` carries +``caller_identity``, ``tenant_id``, and anything your ``context_factory`` +populates. Installing a third-party middleware (observability vendor, +SaaS audit pipeline, external tracing) hands that vendor the complete +skill payload surface — treat it as a data processor under your +GDPR/CCPA controller-processor relationships and review the blast +radius before wiring vendors here. + +**Security — do not format ``params`` or ``context.caller_identity`` +into exception messages.** Middleware-raised exceptions pass through +``logger.exception`` in the executor (server-side trace with the raw +message) before the executor's sanitisation kicks in for the client +response. Exception text ends up in operator logs verbatim; keep it +opaque. + +**Security — short-circuit caches MUST include principal + tenant in +the cache key.** A middleware that caches on ``skill_name + params`` +alone and returns a cached result without calling ``call_next()`` +will serve principal A's data to principal B on a matching-params +call. Key on ``(skill_name, params, context.caller_identity, +context.tenant_id)``. + +Example — audit logging with exception capture:: + + from adcp.server import SkillMiddleware, ToolContext + + async def audit_middleware( + skill_name: str, + params: dict[str, Any], + context: ToolContext, + call_next: Callable[[], Awaitable[Any]], + ) -> Any: + started_at = time.monotonic() + try: + result = await call_next() + except Exception as exc: + # Keep exception text opaque — this ends up in server logs. + audit_log.failure( + skill_name, context.caller_identity, type(exc).__name__ + ) + raise + audit_log.success( + skill_name, + context.caller_identity, + elapsed_ms=(time.monotonic() - started_at) * 1000, + ) + return result + + create_a2a_server(MyAgent(), middleware=[audit_middleware]) +""" + + ContextFactory = Callable[[RequestMetadata], ToolContext] """Factory invoked per tool call to build a :class:`ToolContext`. @@ -112,6 +226,7 @@ def serve( context_factory: ContextFactory | None = None, task_store: TaskStore | None = None, push_config_store: PushNotificationConfigStore | None = None, + middleware: Sequence[SkillMiddleware] | None = None, ) -> None: """Start an MCP or A2A server from an ADCP handler or server builder. @@ -140,6 +255,12 @@ def serve( ``UnsupportedOperationError`` — clients cannot register subscriptions at all. See ``examples/a2a_db_tasks.py`` for a durable reference implementation. + middleware: Optional sequence of :data:`SkillMiddleware` callables + wrapping every A2A skill dispatch (A2A transport only). Use + for audit logging, activity-feed hooks, rate limiting, + tracing. Composes outermost-first. See + :data:`SkillMiddleware` for the signature and composition + semantics. Security: This function does NOT configure authentication. In production, @@ -187,6 +308,7 @@ async def force_account_status(self, account_id, status): context_factory=context_factory, task_store=task_store, push_config_store=push_config_store, + middleware=middleware, ) elif transport in ("streamable-http", "sse", "stdio"): _serve_mcp( @@ -315,6 +437,7 @@ def _serve_a2a( context_factory: ContextFactory | None = None, task_store: TaskStore | None = None, push_config_store: PushNotificationConfigStore | None = None, + middleware: Sequence[SkillMiddleware] | None = None, ) -> None: """Start an A2A server using uvicorn.""" import uvicorn @@ -331,6 +454,7 @@ def _serve_a2a( context_factory=context_factory, task_store=task_store, push_config_store=push_config_store, + middleware=middleware, ) sock = _bind_reusable_socket("0.0.0.0", resolved_port) try: diff --git a/tests/test_a2a_server.py b/tests/test_a2a_server.py index 72ba70ff..2bb81e27 100644 --- a/tests/test_a2a_server.py +++ b/tests/test_a2a_server.py @@ -866,3 +866,260 @@ async def test_sqlite_push_config_store_synthesises_config_id_when_omitted(): "fallback config_id must synthesise a unique value to prevent " "silent overwrite." ) + + +# --------------------------------------------------------------------------- +# Per-skill middleware hook (issue #226) +# --------------------------------------------------------------------------- + + +async def test_middleware_runs_and_sees_skill_context_and_result(): + """Single middleware observes the skill name, params, ToolContext, + and the handler's return value. This is the audit/activity-feed + happy path that closes #226.""" + from adcp.server import SkillMiddleware # noqa: F401 (type import) + + observed: list[dict[str, Any]] = [] + + async def audit_middleware( + skill_name: str, + params: dict[str, Any], + context: Any, + call_next: Any, + ) -> Any: + observed.append( + { + "phase": "before", + "skill_name": skill_name, + "params": params, + "caller_identity": getattr(context, "caller_identity", None), + } + ) + result = await call_next() + observed.append({"phase": "after", "skill_name": skill_name, "result": result}) + return result + + executor = ADCPAgentExecutor(_TestHandler(), middleware=[audit_middleware]) + ctx = RequestContext( + request=MessageSendParams(message=_make_datapart_msg("get_products", {"brief": "coffee"})) + ) + queue = EventQueue() + await executor.execute(ctx, queue) + + assert len(observed) == 2, f"expected before+after, got {observed}" + assert observed[0]["phase"] == "before" + assert observed[0]["skill_name"] == "get_products" + assert observed[0]["params"] == {"brief": "coffee"} + assert observed[1]["phase"] == "after" + assert "products" in observed[1]["result"] + + +async def test_middleware_composes_outermost_first(): + """Multiple middlewares compose in order: the first entry wraps + everything later. Matches Starlette/ASGI semantics — the contract + documented in SkillMiddleware's docstring.""" + call_order: list[str] = [] + + def _mw(name: str) -> Any: + async def middleware(skill_name, params, context, call_next): + call_order.append(f"{name}-enter") + try: + return await call_next() + finally: + call_order.append(f"{name}-exit") + + return middleware + + executor = ADCPAgentExecutor( + _TestHandler(), middleware=[_mw("outer"), _mw("middle"), _mw("inner")] + ) + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + await executor.execute(ctx, queue) + + assert call_order == [ + "outer-enter", + "middle-enter", + "inner-enter", + "inner-exit", + "middle-exit", + "outer-exit", + ], call_order + + +async def test_middleware_can_short_circuit_without_invoking_handler(): + """Middleware that returns without calling ``call_next`` stops the + chain and its return value becomes the dispatch result. Rate + limiters and feature flags rely on this.""" + handler_called = False + + class _TrackingHandler(ADCPHandler): + async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any: + return {"adcp": {"major_versions": [3]}} + + async def get_products(self, params: Any, context: Any = None) -> Any: + nonlocal handler_called + handler_called = True + return {"products": []} + + async def rate_limit_middleware(skill_name, params, context, call_next): + # Don't call call_next — short-circuit. + return {"products": [], "sandbox": True, "rate_limited": True} + + executor = ADCPAgentExecutor(_TrackingHandler(), middleware=[rate_limit_middleware]) + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + await executor.execute(ctx, queue) + + event = await queue.dequeue_event(no_wait=True) + assert isinstance(event, Task) + assert event.status.state == "completed" + data_parts = [ + p.root + for p in event.artifacts[0].parts + if hasattr(p.root, "data") and isinstance(p.root.data, dict) + ] + result = data_parts[0].data + assert result.get("rate_limited") is True + assert handler_called is False, ( + "middleware short-circuited but the handler still ran — call_next " + "was invoked despite the middleware not calling it" + ) + + +async def test_middleware_observes_handler_exceptions(): + """Audit middleware needs to see failures, not just successes. The + issue's leaning-toward-option-A reasoning cited this explicitly.""" + captured_exceptions: list[Exception] = [] + + async def audit_middleware(skill_name, params, context, call_next): + try: + return await call_next() + except Exception as exc: + captured_exceptions.append(exc) + raise + + class _FailingHandler(ADCPHandler): + async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any: + return {} + + async def get_products(self, params: Any, context: Any = None) -> Any: + raise RuntimeError("deliberate handler failure") + + executor = ADCPAgentExecutor(_FailingHandler(), middleware=[audit_middleware]) + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + await executor.execute(ctx, queue) + + assert len(captured_exceptions) == 1 + assert isinstance(captured_exceptions[0], RuntimeError) + # And the executor's normal failure path still runs — the client + # gets a failed task, not a 500, because middleware re-raised. + event = await queue.dequeue_event(no_wait=True) + assert isinstance(event, Task) + assert event.status.state == "failed" + + +async def test_no_middleware_preserves_direct_dispatch(): + """Sellers who don't pass ``middleware`` see zero behavior change — + the dispatch chain short-circuits to direct handler invocation, + and nothing in the chain allocates per-call middleware state.""" + executor = ADCPAgentExecutor(_TestHandler()) + assert executor._middleware == () + + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + await executor.execute(ctx, queue) + event = await queue.dequeue_event(no_wait=True) + assert isinstance(event, Task) + assert event.status.state == "completed" + + +@pytest.mark.skipif( + sys.version_info < (3, 11), + reason="a2a-sdk starlette integration requires Python 3.11+", +) +def test_create_a2a_server_threads_middleware_into_executor(): + """Kwarg on ``create_a2a_server`` reaches the executor. Paranoid + contract test: if a refactor accidentally drops the kwarg from the + ``ADCPAgentExecutor(...)`` construction, this fires.""" + + async def noop_mw(skill_name, params, context, call_next): + return await call_next() + + app = create_a2a_server(_TestHandler(), name="mw-test", middleware=[noop_mw]) + handler = _extract_default_request_handler(app) + executor = handler.agent_executor + assert isinstance(executor, ADCPAgentExecutor) + assert executor._middleware == (noop_mw,) + + +async def test_middleware_can_invoke_call_next_multiple_times_for_retry(): + """Retry-on-transient-error middleware calls ``call_next()`` more + than once — each call builds a fresh inner chain. This locks the + re-entrant composition contract a naive loop-variable closure would + break.""" + call_counts = {"mw": 0, "handler": 0} + + async def retry_middleware(skill_name, params, context, call_next): + last_exc: Exception | None = None + for _ in range(3): + call_counts["mw"] += 1 + try: + return await call_next() + except RuntimeError as exc: + last_exc = exc + raise last_exc if last_exc else RuntimeError("unreachable") + + class _TransientFailHandler(ADCPHandler): + async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any: + return {} + + async def get_products(self, params: Any, context: Any = None) -> Any: + call_counts["handler"] += 1 + if call_counts["handler"] < 3: + raise RuntimeError("transient") + return {"products": [{"id": "finally"}]} + + executor = ADCPAgentExecutor(_TransientFailHandler(), middleware=[retry_middleware]) + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + await executor.execute(ctx, queue) + + assert call_counts["mw"] == 3 + assert call_counts["handler"] == 3 + event = await queue.dequeue_event(no_wait=True) + assert isinstance(event, Task) + assert event.status.state == "completed" + + +async def test_middleware_can_transform_result_on_return_side(): + """Middleware can mutate or replace the value of ``call_next()`` + before returning it. The transformed value is what the client + sees — covers the annotation / enrichment use case distinct from + short-circuiting.""" + + async def enriching_middleware(skill_name, params, context, call_next): + result = await call_next() + # Wrap handler's return with a marker the test observes. + if isinstance(result, dict): + return {**result, "middleware_marker": "wrapped"} + return result + + executor = ADCPAgentExecutor(_TestHandler(), middleware=[enriching_middleware]) + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + await executor.execute(ctx, queue) + + event = await queue.dequeue_event(no_wait=True) + assert isinstance(event, Task) + assert event.status.state == "completed" + data_parts = [ + p.root + for p in event.artifacts[0].parts + if hasattr(p.root, "data") and isinstance(p.root.data, dict) + ] + result = data_parts[0].data + assert result["middleware_marker"] == "wrapped" + # And the handler's original payload is still there. + assert result["products"][0]["id"] == "p1"