From 61a746a13243626f6c39035f5448270f4c47ea67 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 11 May 2026 06:45:00 +0000 Subject: [PATCH 1/3] feat(a2a): callable public_url (PublicUrlResolver) for per-request agent-card URL resolution Adds support for a `Callable[[Request], str]` in addition to the existing static `str` for `serve(public_url=...)` and `create_a2a_server(public_url=...)`. When a callable is supplied, an ASGI-layer intercept replaces the static `create_agent_card_routes` for the well-known endpoints, building the card per GET request. The `DefaultRequestHandler` retains a localhost fallback card for the `GetAgentCard` RPC path. Option A (trust_forwarded_headers) was ruled out by security review: the unauthenticated agent-card endpoint + unvalidated X-Forwarded-Host = agent-card hijack with no clean mitigation short of an allow-list. The callable puts the trust-policy decision in adopter code where it belongs. Closes #647 https://claude.ai/code/session_01CTpSC8E52DSKTDFJx7Y4Gf --- src/adcp/server/__init__.py | 8 +- src/adcp/server/a2a_server.py | 292 +++++++++++++++++++++----- src/adcp/server/serve.py | 45 ++-- tests/test_a2a_public_url_resolver.py | 230 ++++++++++++++++++++ 4 files changed, 507 insertions(+), 68 deletions(-) create mode 100644 tests/test_a2a_public_url_resolver.py diff --git a/src/adcp/server/__init__.py b/src/adcp/server/__init__.py index 8adb7067..4a9d5756 100644 --- a/src/adcp/server/__init__.py +++ b/src/adcp/server/__init__.py @@ -53,7 +53,12 @@ async def get_products(params, context=None): from __future__ import annotations from adcp.capabilities import validate_capabilities -from adcp.server.a2a_server import ADCPAgentExecutor, MessageParser, create_a2a_server +from adcp.server.a2a_server import ( + ADCPAgentExecutor, + MessageParser, + PublicUrlResolver, + create_a2a_server, +) from adcp.server.auth import ( A2ABearerAuthMiddleware, AsyncTokenValidator, @@ -204,6 +209,7 @@ async def get_products(params, context=None): # A2A integration "ADCPAgentExecutor", "MessageParser", + "PublicUrlResolver", "ASGIMiddlewareEntry", "SkillMiddleware", "create_a2a_server", diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 6c61cfd4..6475fbd6 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -86,6 +86,34 @@ ``(None, {})``. """ +PublicUrlResolver = Callable[[Any], str] +"""Per-request public URL resolver for the A2A agent card. + +Called once per GET ``/.well-known/agent-card.json`` (and the 0.3 +alias ``/.well-known/agent.json``) to derive the base URL embedded in +``supportedInterfaces`` entries. Receives the Starlette +:class:`~starlette.requests.Request` and must return an absolute URL +string. + +Typical use — multi-tenant subdomain routing:: + + def agent_card_url(request: Request) -> str: + host = request.headers.get("host", "localhost") + return f"https://{host}/" + + serve(handler, transport="a2a", public_url=agent_card_url) + +**Trust boundary:** the callable owns all header-trust decisions. +Do not read ``X-Forwarded-Host`` unless your proxy layer is confirmed +to strip that header on ingress — on a directly internet-facing +deployment, those headers are attacker-controlled. The ``host`` +header is set by the TLS-terminating proxy and is safe to use. + +Returned URLs must be ``https://`` for non-loopback hosts. Returning +``http://`` for a non-loopback hostname causes the per-request handler +to return HTTP 500 without echoing the bad URL to the client. +""" + from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler from adcp.server.test_controller import TestControllerStore, _handle_test_controller @@ -746,6 +774,102 @@ def _build_agent_card( ) +def _validate_card_url(url: str) -> str: + """Validate the URL returned by a :data:`PublicUrlResolver`. + + Raises ``ValueError`` when the value is not a valid absolute URL or + uses ``http://`` for a non-loopback host. The per-request card + handler catches this and returns HTTP 500 without echoing the bad + value to the client. + """ + from urllib.parse import urlparse + + parsed = urlparse(url) + if not parsed.scheme or not parsed.netloc: + raise ValueError( + f"public_url resolver returned {url!r} — " + "must be an absolute URL with scheme and host." + ) + hostname = parsed.hostname or "" + is_loopback = hostname in ("localhost", "127.0.0.1", "::1") or hostname.endswith( + ".localhost" + ) + if parsed.scheme != "https" and not is_loopback: + raise ValueError( + f"public_url resolver returned {url!r} — " + "scheme must be 'https' for non-loopback hosts." + ) + return url + + +def _wrap_with_per_request_card( + inner: Any, + *, + resolver: Callable[[Any], str], + handler: ADCPHandler[Any], + name: str, + port: int, + description: str | None, + version: str, + extra_skills: list[pb.AgentSkill] | None, + advertise_all: bool, + push_notifications_supported: bool, + auth: BearerTokenAuth | None, +) -> Any: + """Wrap an ASGI app to serve agent-card endpoints per-request. + + Intercepts GET ``/.well-known/agent-card.json`` and + ``/.well-known/agent.json``; all other requests pass through to the + inner app unchanged. + + Used when :func:`create_a2a_server` receives a + :data:`PublicUrlResolver` callable — the a2a-sdk's + ``create_agent_card_routes`` bakes the card at construction time + and cannot surface per-request context. + """ + from a2a.server.routes.agent_card_routes import agent_card_to_dict # type: ignore[attr-defined] + from starlette.requests import Request + from starlette.responses import JSONResponse, Response + + _card_paths: frozenset[str] = frozenset( + {"/.well-known/agent-card.json", "/.well-known/agent.json"} + ) + + async def _middleware(scope: Any, receive: Any, send: Any) -> None: + if ( + scope.get("type") == "http" + and scope.get("path") in _card_paths + and scope.get("method") == "GET" + ): + request = Request(scope, receive) + try: + raw_url = resolver(request) + url = _validate_card_url(raw_url) + except Exception as exc: + logger.error("public_url resolver raised: %s", exc) + error_response: Any = Response(status_code=500) + await error_response(scope, receive, send) + return + card = _build_agent_card( + handler, + name=name, + port=port, + description=description, + version=version, + extra_skills=extra_skills, + advertise_all=advertise_all, + push_notifications_supported=push_notifications_supported, + auth=auth, + public_url=url, + ) + card_response: Any = JSONResponse(agent_card_to_dict(card)) + await card_response(scope, receive, send) + return + await inner(scope, receive, send) + + return _middleware + + def create_a2a_server( handler: ADCPHandler[Any], *, @@ -765,7 +889,7 @@ def create_a2a_server( pre_validation_hooks: dict[str, Any] | None = None, context_builder: Any | None = None, auth: BearerTokenAuth | None = None, - public_url: str | None = None, + public_url: str | Callable[[Any], str] | None = None, ) -> Any: """Create an A2A Starlette application from an ADCP handler. @@ -861,25 +985,47 @@ def create_a2a_server( at the ASGI layer. Adopters calling ``create_a2a_server`` directly must wrap the returned app with :class:`A2ABearerAuthMiddleware` themselves. - public_url: Optional public base URL advertised in the A2A agent - card (``/.well-known/agent-card.json``). When set, this value - replaces the default ``http://localhost:{port}/`` in every - ``supported_interfaces`` URL entry. Use this when the agent - runs behind a load balancer or reverse proxy and the bound - socket address differs from the externally reachable URL - (e.g. ``https://agent.example.com/``). Falls back to the + public_url: Public base URL for the A2A agent card + (``/.well-known/agent-card.json``). Accepts either a static + string or a :data:`PublicUrlResolver` callable for per-request + resolution. + + *Static string* — replaces ``http://localhost:{port}/`` in + every ``supported_interfaces`` URL. Falls back to the ``PUBLIC_URL`` environment variable when ``public_url`` is - ``None`` and the env var is set, enabling zero-code-change - configuration on Cloud Run / Fly.io / Railway. When neither - is supplied the default ``http://localhost:{port}/`` is used — - correct for local development, incorrect for production - deployments behind a proxy. + ``None``. Correct for single-host or fixed-URL deployments. + + *Callable* — receives the Starlette + :class:`~starlette.requests.Request` on each card fetch and + must return an absolute ``https://`` URL. Use this for + multi-tenant subdomain deployments where each tenant has its + own public host:: + + def agent_card_url(request: Request) -> str: + host = request.headers.get("host", "localhost") + return f"https://{host}/" + + serve(handler, transport="a2a", public_url=agent_card_url) + + When a callable is supplied the a2a-sdk's static + ``create_agent_card_routes`` is bypassed in favour of an + ASGI-layer intercept that builds the card per-request. The + ``DefaultRequestHandler``'s internal ``GetAgentCard`` RPC + path retains a ``localhost`` fallback card — buyers probing + the well-known endpoint always receive the per-request card. + + The ``PUBLIC_URL`` env-var fallback applies only when + ``public_url`` is ``None``; a callable takes priority. Returns: A Starlette app ready to be run with uvicorn. """ resolved_port = port or int(os.environ.get("PORT", "3001")) - resolved_public_url = public_url or os.environ.get("PUBLIC_URL") + # A callable resolver takes priority; env-var fallback only applies + # when public_url is None (not callable). + resolved_public_url: str | Callable[[Any], str] | None = ( + public_url if public_url is not None else os.environ.get("PUBLIC_URL") + ) executor = ADCPAgentExecutor( handler, @@ -893,33 +1039,9 @@ def create_a2a_server( test_controller_account_resolver=test_controller_account_resolver, ) - agent_card = _build_agent_card( - handler, - name=name, - port=resolved_port, - description=description, - version=version, - extra_skills=_test_controller_skills() if test_controller else None, - advertise_all=advertise_all, - push_notifications_supported=push_config_store is not None, - auth=auth, - public_url=resolved_public_url, - ) - if task_store is None: task_store = InMemoryTaskStore() - # DefaultRequestHandler stores push_config_store verbatim and treats - # None as "push-notif endpoints unsupported" (UnsupportedOperationError - # on tasks/pushNotificationConfig/*). Passing None is the correct - # default; sellers opt in by wiring a store. - request_handler = DefaultRequestHandler( - agent_executor=executor, - task_store=task_store, - agent_card=agent_card, - push_config_store=push_config_store, - ) - # ``enable_v0_3_compat=True`` is load-bearing: it makes the server # dual-serve 0.3 and 1.0 wire formats on the same endpoint so existing # 0.3 buyer clients keep working unchanged. Do not disable. @@ -934,24 +1056,96 @@ def create_a2a_server( # ``ServerCallContext`` shape (e.g. surfacing additional # ``state`` fields from the request). jsonrpc_kwargs: dict[str, Any] = { - "request_handler": request_handler, "rpc_url": "/", "enable_v0_3_compat": True, } if context_builder is not None: jsonrpc_kwargs["context_builder"] = context_builder - routes = ( - list(create_agent_card_routes(agent_card=agent_card)) - # 0.3 alias: A2A 0.3 buyer SDKs probe /.well-known/agent.json - # as a positive A2A signal. Same handler, no redirect round-trip. - + list( - create_agent_card_routes( - agent_card=agent_card, card_url="/.well-known/agent.json" + + _extra_skills = _test_controller_skills() if test_controller else None + _push_supported = push_config_store is not None + + if callable(resolved_public_url): + # Per-request path: build a localhost fallback card for + # DefaultRequestHandler's internal GetAgentCard RPC (buyers probe + # /.well-known/agent-card.json directly; the RPC fallback is rarely + # used). The well-known endpoints are served by + # _wrap_with_per_request_card which builds a fresh card per GET. + fallback_card = _build_agent_card( + handler, + name=name, + port=resolved_port, + description=description, + version=version, + extra_skills=_extra_skills, + advertise_all=advertise_all, + push_notifications_supported=_push_supported, + auth=auth, + public_url=None, + ) + # DefaultRequestHandler stores push_config_store verbatim and + # treats None as "push-notif unsupported". Passing None is the + # correct default; sellers opt in by wiring a store. + request_handler = DefaultRequestHandler( + agent_executor=executor, + task_store=task_store, + agent_card=fallback_card, + push_config_store=push_config_store, + ) + jsonrpc_kwargs["request_handler"] = request_handler + routes = list(create_jsonrpc_routes(**jsonrpc_kwargs)) + app = Starlette(routes=routes) + app = _wrap_with_per_request_card( + app, + resolver=resolved_public_url, + handler=handler, + name=name, + port=resolved_port, + description=description, + version=version, + extra_skills=_extra_skills, + advertise_all=advertise_all, + push_notifications_supported=_push_supported, + auth=auth, + ) + else: + # Static card path: existing behaviour — card built once at + # server init and served unchanged on every card request. + agent_card = _build_agent_card( + handler, + name=name, + port=resolved_port, + description=description, + version=version, + extra_skills=_extra_skills, + advertise_all=advertise_all, + push_notifications_supported=_push_supported, + auth=auth, + public_url=resolved_public_url, + ) + # DefaultRequestHandler stores push_config_store verbatim and treats + # None as "push-notif endpoints unsupported" (UnsupportedOperationError + # on tasks/pushNotificationConfig/*). Passing None is the correct + # default; sellers opt in by wiring a store. + request_handler = DefaultRequestHandler( + agent_executor=executor, + task_store=task_store, + agent_card=agent_card, + push_config_store=push_config_store, + ) + jsonrpc_kwargs["request_handler"] = request_handler + routes = ( + list(create_agent_card_routes(agent_card=agent_card)) + # 0.3 alias: A2A 0.3 buyer SDKs probe /.well-known/agent.json + # as a positive A2A signal. Same handler, no redirect round-trip. + + list( + create_agent_card_routes( + agent_card=agent_card, card_url="/.well-known/agent.json" + ) ) + + list(create_jsonrpc_routes(**jsonrpc_kwargs)) ) - + list(create_jsonrpc_routes(**jsonrpc_kwargs)) - ) - app = Starlette(routes=routes) + app = Starlette(routes=routes) # Startup log lives on the create_a2a_server path (symmetric with # MCP's _register_handler_tools). Moved out of diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index 147c7182..0be70847 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -135,11 +135,11 @@ class ServeConfig: stateless_http: bool = False session_idle_timeout: float | None = 1800.0 - # --- A2A only --- + # --- A2A / both --- task_store: TaskStore | None = None push_config_store: PushNotificationConfigStore | None = None message_parser: MessageParser | None = None - public_url: str | None = None + public_url: str | Callable[..., str] | None = None # --- Shared infrastructure --- test_controller: TestControllerStore | None = None @@ -587,7 +587,7 @@ def serve( allowed_origins: Sequence[str] | None = None, enable_dns_rebinding_protection: bool | None = None, auth: BearerTokenAuth | None = None, - public_url: str | None = None, + public_url: str | Callable[..., str] | None = None, ) -> None: """Start an MCP or A2A server from an ADCP handler or server builder. @@ -789,18 +789,27 @@ class of bug that shipped the ``pricing_options`` stdio, ``auth`` is ignored with a warning (no HTTP layer). For non-bearer schemes (mTLS, signed-request derivation), wire your own middleware via ``asgi_middleware=`` instead. - public_url: Optional public base URL for the A2A agent card - (``/.well-known/agent-card.json``). When set, replaces the - default ``http://localhost:{port}/`` in every - ``supportedInterfaces`` entry so external clients discover - the correct endpoint instead of the internal socket address. - Use this when the agent runs behind a load balancer, reverse - proxy, or cloud-run service (e.g. - ``public_url="https://agent.example.com/"``). Automatically - falls back to the ``PUBLIC_URL`` environment variable when - the kwarg is ``None``, enabling zero-code-change - configuration on Cloud Run, Fly.io, and Railway. Ignored for - MCP transports. Trailing slash is normalised automatically. + public_url: Public base URL for the A2A agent card + (``/.well-known/agent-card.json``). Accepts a static string + or a :data:`~adcp.server.a2a_server.PublicUrlResolver` + callable for per-request resolution. + + *Static string* — replaces ``http://localhost:{port}/`` in + ``supportedInterfaces``. Falls back to the ``PUBLIC_URL`` + env var when ``None``. Correct for single-host deployments. + + *Callable* — receives the Starlette ``Request`` per card + fetch; must return an absolute ``https://`` URL. Use for + multi-tenant subdomain deployments where each tenant host + needs its own card:: + + def resolver(request): + host = request.headers.get("host", "localhost") + return f"https://{host}/" + + serve(handler, transport="a2a", public_url=resolver) + + Ignored for MCP transports. Example (MCP): from adcp.server import ADCPHandler, serve @@ -1493,7 +1502,7 @@ def _serve_a2a( specialisms: list[str] | None = None, description: str | None = None, auth: BearerTokenAuth | None = None, - public_url: str | None = None, + public_url: str | Callable[..., str] | None = None, ) -> None: """Start an A2A server using uvicorn.""" import uvicorn @@ -1582,7 +1591,7 @@ def _build_mcp_and_a2a_app( allowed_origins: Sequence[str] | None = None, enable_dns_rebinding_protection: bool | None = None, auth: BearerTokenAuth | None = None, - public_url: str | None = None, + public_url: str | Callable[..., str] | None = None, ) -> Any: """Build the unified MCP+A2A ASGI app without starting a server. @@ -1767,7 +1776,7 @@ def _serve_mcp_and_a2a( allowed_origins: Sequence[str] | None = None, enable_dns_rebinding_protection: bool | None = None, auth: BearerTokenAuth | None = None, - public_url: str | None = None, + public_url: str | Callable[..., str] | None = None, ) -> None: """Serve MCP and A2A on a single port via path dispatch. diff --git a/tests/test_a2a_public_url_resolver.py b/tests/test_a2a_public_url_resolver.py new file mode 100644 index 00000000..ae1af825 --- /dev/null +++ b/tests/test_a2a_public_url_resolver.py @@ -0,0 +1,230 @@ +"""Tests for callable ``public_url`` (PublicUrlResolver) on the A2A agent card. + +Issue #647: per-request agent-card URL resolution for multi-tenant +subdomain deployments. +""" + +from __future__ import annotations + +import httpx +import pytest +from asgi_lifespan import LifespanManager + +from adcp.server.a2a_server import ( + PublicUrlResolver, + _validate_card_url, + create_a2a_server, +) +from adcp.server.base import ADCPHandler +from adcp.server.responses import capabilities_response + + +class _OkHandler(ADCPHandler): # type: ignore[misc] + async def get_adcp_capabilities(self, params, context=None): # type: ignore[no-untyped-def] + return capabilities_response(["media_buy"]) + + +# --------------------------------------------------------------------------- +# _validate_card_url unit tests +# --------------------------------------------------------------------------- + + +def test_validate_card_url_accepts_https() -> None: + assert _validate_card_url("https://acme.example.com/") == "https://acme.example.com/" + + +def test_validate_card_url_accepts_http_localhost() -> None: + assert _validate_card_url("http://localhost:3001/") == "http://localhost:3001/" + + +def test_validate_card_url_accepts_http_127() -> None: + assert _validate_card_url("http://127.0.0.1:3001/") == "http://127.0.0.1:3001/" + + +def test_validate_card_url_rejects_http_non_loopback() -> None: + with pytest.raises(ValueError, match="scheme must be 'https'"): + _validate_card_url("http://acme.example.com/") + + +def test_validate_card_url_rejects_no_scheme() -> None: + with pytest.raises(ValueError, match="must be an absolute URL"): + _validate_card_url("acme.example.com") + + +def test_validate_card_url_rejects_empty() -> None: + with pytest.raises(ValueError): + _validate_card_url("") + + +# --------------------------------------------------------------------------- +# Integration tests — per-request card endpoint +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_callable_public_url_serves_per_request_card() -> None: + """Callable ``public_url`` returns a card with the resolver's URL on each request.""" + calls: list[str] = [] + + def resolver(request) -> str: # type: ignore[no-untyped-def] + host = request.headers.get("host", "localhost") + calls.append(host) + return f"https://{host}/" + + app = create_a2a_server(_OkHandler(), name="test-agent", validation=None, public_url=resolver) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.get( + "/.well-known/agent-card.json", + headers={"host": "acme.scope3.com"}, + ) + + assert resp.status_code == 200 + body = resp.json() + assert "name" in body + # The resolver's URL must appear in supportedInterfaces + interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) + urls = [iface.get("url", "") for iface in interfaces] + assert any("acme.scope3.com" in u for u in urls), f"expected acme.scope3.com in {urls}" + # Resolver was called + assert calls == ["acme.scope3.com"] + + +@pytest.mark.asyncio +async def test_callable_public_url_different_hosts_per_request() -> None: + """Each card request gets its own URL from the resolver.""" + + def resolver(request) -> str: # type: ignore[no-untyped-def] + host = request.headers.get("host", "localhost") + return f"https://{host}/" + + app = create_a2a_server(_OkHandler(), name="test-agent", validation=None, public_url=resolver) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp_acme = await client.get( + "/.well-known/agent-card.json", + headers={"host": "acme.scope3.com"}, + ) + resp_beta = await client.get( + "/.well-known/agent-card.json", + headers={"host": "beta.scope3.com"}, + ) + + for resp, expected_host in [ + (resp_acme, "acme.scope3.com"), + (resp_beta, "beta.scope3.com"), + ]: + assert resp.status_code == 200 + body = resp.json() + interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) + urls = [iface.get("url", "") for iface in interfaces] + assert any(expected_host in u for u in urls), ( + f"expected {expected_host} in {urls}" + ) + + +@pytest.mark.asyncio +async def test_callable_public_url_0_3_alias_also_per_request() -> None: + """0.3 alias /.well-known/agent.json also served by the resolver.""" + + def resolver(request) -> str: # type: ignore[no-untyped-def] + host = request.headers.get("host", "localhost") + return f"https://{host}/" + + app = create_a2a_server(_OkHandler(), name="test-agent", validation=None, public_url=resolver) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.get( + "/.well-known/agent.json", + headers={"host": "tenant.example.com"}, + ) + + assert resp.status_code == 200 + body = resp.json() + interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) + urls = [iface.get("url", "") for iface in interfaces] + assert any("tenant.example.com" in u for u in urls), f"expected tenant.example.com in {urls}" + + +@pytest.mark.asyncio +async def test_callable_public_url_resolver_error_returns_500() -> None: + """When the resolver raises, the endpoint returns 500 without leaking the error.""" + + def resolver(request) -> str: # type: ignore[no-untyped-def] + raise RuntimeError("upstream lookup failed") + + app = create_a2a_server(_OkHandler(), name="test-agent", validation=None, public_url=resolver) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.get("/.well-known/agent-card.json") + + assert resp.status_code == 500 + + +@pytest.mark.asyncio +async def test_callable_public_url_invalid_url_returns_500() -> None: + """When the resolver returns an http:// non-loopback URL, the endpoint returns 500.""" + + def resolver(request) -> str: # type: ignore[no-untyped-def] + return "http://acme.example.com/" # http non-loopback — invalid + + app = create_a2a_server(_OkHandler(), name="test-agent", validation=None, public_url=resolver) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.get("/.well-known/agent-card.json") + + assert resp.status_code == 500 + + +@pytest.mark.asyncio +async def test_static_public_url_unchanged() -> None: + """Existing static ``public_url`` string behaviour is preserved.""" + app = create_a2a_server( + _OkHandler(), + name="test-agent", + validation=None, + public_url="https://agent.example.com/", + ) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.get("/.well-known/agent-card.json") + + assert resp.status_code == 200 + body = resp.json() + interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) + urls = [iface.get("url", "") for iface in interfaces] + assert any("agent.example.com" in u for u in urls), f"expected agent.example.com in {urls}" + + +@pytest.mark.asyncio +async def test_no_public_url_unchanged() -> None: + """Existing ``public_url=None`` behaviour is preserved (localhost URL).""" + app = create_a2a_server(_OkHandler(), name="test-agent", validation=None) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.get("/.well-known/agent-card.json") + + assert resp.status_code == 200 + body = resp.json() + assert "name" in body + + +def test_public_url_resolver_is_exported() -> None: + """PublicUrlResolver is importable from adcp.server.""" + from adcp.server import PublicUrlResolver as _PR # noqa: F401 + + assert _PR is PublicUrlResolver From 2dd012fbe3bdb6ccf5a14ba7f706e9a1f9087123 Mon Sep 17 00:00:00 2001 From: "brian@agenticadvertising.org" Date: Mon, 11 May 2026 06:53:05 +0000 Subject: [PATCH 2/3] fix(a2a): address pre-PR review feedback on callable public_url - Await async resolvers (inspect.iscoroutine) so async def resolvers work - Add test_async_resolver_works integration test covering async path - Use logger.error(..., exc_info=True) to preserve resolver tracebacks - Use JSONResponse body for 500 errors instead of empty Response - Consolidate serve.py type annotations to use PublicUrlResolver alias - Replace real brand names in tests with RFC 2606 example.com hostnames https://claude.ai/code/session_01CTpSC8E52DSKTDFJx7Y4Gf --- src/adcp/server/a2a_server.py | 26 +++++++++++++--- src/adcp/server/serve.py | 12 +++---- tests/test_a2a_public_url_resolver.py | 45 +++++++++++++++++++++------ 3 files changed, 63 insertions(+), 20 deletions(-) diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 6475fbd6..a5cca17e 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -93,16 +93,26 @@ alias ``/.well-known/agent.json``) to derive the base URL embedded in ``supportedInterfaces`` entries. Receives the Starlette :class:`~starlette.requests.Request` and must return an absolute URL -string. +string. Both sync and async callables are accepted. Typical use — multi-tenant subdomain routing:: + from starlette.requests import Request + def agent_card_url(request: Request) -> str: host = request.headers.get("host", "localhost") return f"https://{host}/" serve(handler, transport="a2a", public_url=agent_card_url) +Async resolvers work the same way:: + + from starlette.requests import Request + + async def agent_card_url(request: Request) -> str: + host = request.headers.get("host", "localhost") + return f"https://{host}/" + **Trust boundary:** the callable owns all header-trust decisions. Do not read ``X-Forwarded-Host`` unless your proxy layer is confirmed to strip that header on ingress — on a directly internet-facing @@ -827,9 +837,11 @@ def _wrap_with_per_request_card( ``create_agent_card_routes`` bakes the card at construction time and cannot surface per-request context. """ + import inspect + from a2a.server.routes.agent_card_routes import agent_card_to_dict # type: ignore[attr-defined] from starlette.requests import Request - from starlette.responses import JSONResponse, Response + from starlette.responses import JSONResponse _card_paths: frozenset[str] = frozenset( {"/.well-known/agent-card.json", "/.well-known/agent.json"} @@ -844,10 +856,14 @@ async def _middleware(scope: Any, receive: Any, send: Any) -> None: request = Request(scope, receive) try: raw_url = resolver(request) + if inspect.iscoroutine(raw_url): + raw_url = await raw_url url = _validate_card_url(raw_url) - except Exception as exc: - logger.error("public_url resolver raised: %s", exc) - error_response: Any = Response(status_code=500) + except Exception: + logger.error("public_url resolver raised", exc_info=True) + error_response: Any = JSONResponse( + {"error": "agent-card temporarily unavailable"}, status_code=500 + ) await error_response(scope, receive, send) return card = _build_agent_card( diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index 0be70847..f50e29c1 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -55,7 +55,7 @@ async def get_adcp_capabilities(self, params, context=None): ) from a2a.server.tasks.task_store import TaskStore - from adcp.server.a2a_server import MessageParser + from adcp.server.a2a_server import MessageParser, PublicUrlResolver from adcp.server.auth import BearerTokenAuth from adcp.server.test_controller import TestControllerStore @@ -139,7 +139,7 @@ class ServeConfig: task_store: TaskStore | None = None push_config_store: PushNotificationConfigStore | None = None message_parser: MessageParser | None = None - public_url: str | Callable[..., str] | None = None + public_url: str | PublicUrlResolver | None = None # --- Shared infrastructure --- test_controller: TestControllerStore | None = None @@ -587,7 +587,7 @@ def serve( allowed_origins: Sequence[str] | None = None, enable_dns_rebinding_protection: bool | None = None, auth: BearerTokenAuth | None = None, - public_url: str | Callable[..., str] | None = None, + public_url: str | PublicUrlResolver | None = None, ) -> None: """Start an MCP or A2A server from an ADCP handler or server builder. @@ -1502,7 +1502,7 @@ def _serve_a2a( specialisms: list[str] | None = None, description: str | None = None, auth: BearerTokenAuth | None = None, - public_url: str | Callable[..., str] | None = None, + public_url: str | PublicUrlResolver | None = None, ) -> None: """Start an A2A server using uvicorn.""" import uvicorn @@ -1591,7 +1591,7 @@ def _build_mcp_and_a2a_app( allowed_origins: Sequence[str] | None = None, enable_dns_rebinding_protection: bool | None = None, auth: BearerTokenAuth | None = None, - public_url: str | Callable[..., str] | None = None, + public_url: str | PublicUrlResolver | None = None, ) -> Any: """Build the unified MCP+A2A ASGI app without starting a server. @@ -1776,7 +1776,7 @@ def _serve_mcp_and_a2a( allowed_origins: Sequence[str] | None = None, enable_dns_rebinding_protection: bool | None = None, auth: BearerTokenAuth | None = None, - public_url: str | Callable[..., str] | None = None, + public_url: str | PublicUrlResolver | None = None, ) -> None: """Serve MCP and A2A on a single port via path dispatch. diff --git a/tests/test_a2a_public_url_resolver.py b/tests/test_a2a_public_url_resolver.py index ae1af825..aa816168 100644 --- a/tests/test_a2a_public_url_resolver.py +++ b/tests/test_a2a_public_url_resolver.py @@ -78,7 +78,7 @@ def resolver(request) -> str: # type: ignore[no-untyped-def] ) as client: resp = await client.get( "/.well-known/agent-card.json", - headers={"host": "acme.scope3.com"}, + headers={"host": "tenant-a.example.com"}, ) assert resp.status_code == 200 @@ -87,9 +87,11 @@ def resolver(request) -> str: # type: ignore[no-untyped-def] # The resolver's URL must appear in supportedInterfaces interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) urls = [iface.get("url", "") for iface in interfaces] - assert any("acme.scope3.com" in u for u in urls), f"expected acme.scope3.com in {urls}" + assert any("tenant-a.example.com" in u for u in urls), ( + f"expected tenant-a.example.com in {urls}" + ) # Resolver was called - assert calls == ["acme.scope3.com"] + assert calls == ["tenant-a.example.com"] @pytest.mark.asyncio @@ -107,16 +109,16 @@ def resolver(request) -> str: # type: ignore[no-untyped-def] ) as client: resp_acme = await client.get( "/.well-known/agent-card.json", - headers={"host": "acme.scope3.com"}, + headers={"host": "tenant-a.example.com"}, ) resp_beta = await client.get( "/.well-known/agent-card.json", - headers={"host": "beta.scope3.com"}, + headers={"host": "tenant-b.example.com"}, ) for resp, expected_host in [ - (resp_acme, "acme.scope3.com"), - (resp_beta, "beta.scope3.com"), + (resp_acme, "tenant-a.example.com"), + (resp_beta, "tenant-b.example.com"), ]: assert resp.status_code == 200 body = resp.json() @@ -186,6 +188,31 @@ def resolver(request) -> str: # type: ignore[no-untyped-def] assert resp.status_code == 500 +@pytest.mark.asyncio +async def test_async_resolver_works() -> None: + """Async resolver coroutines are awaited and resolve the card correctly.""" + + async def resolver(request) -> str: # type: ignore[no-untyped-def] + host = request.headers.get("host", "localhost") + return f"https://{host}/" + + app = create_a2a_server(_OkHandler(), name="test-agent", validation=None, public_url=resolver) + async with LifespanManager(app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.get( + "/.well-known/agent-card.json", + headers={"host": "async.scope3.com"}, + ) + + assert resp.status_code == 200 + body = resp.json() + interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) + urls = [iface.get("url", "") for iface in interfaces] + assert any("async.scope3.com" in u for u in urls), f"expected async.scope3.com in {urls}" + + @pytest.mark.asyncio async def test_static_public_url_unchanged() -> None: """Existing static ``public_url`` string behaviour is preserved.""" @@ -225,6 +252,6 @@ async def test_no_public_url_unchanged() -> None: def test_public_url_resolver_is_exported() -> None: """PublicUrlResolver is importable from adcp.server.""" - from adcp.server import PublicUrlResolver as _PR # noqa: F401 + from adcp.server import PublicUrlResolver as ImportedResolver # noqa: N814 - assert _PR is PublicUrlResolver + assert ImportedResolver is PublicUrlResolver From 7000dae0c57365438e2fd533a47df2d53c897d96 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Mon, 11 May 2026 03:14:59 -0400 Subject: [PATCH 3/3] fix(a2a): widen PublicUrlResolver alias to accept Awaitable[str] - Type alias is now Callable[[Any], str | Awaitable[str]] so adopters annotating ``async def`` resolvers against the public alias type-check. - Propagate the alias through create_a2a_server signature and the internal resolved_public_url type. - Tighten _wrap_with_per_request_card resolver parameter to the alias. - Use inspect.isawaitable (not iscoroutine) for resolver result, with a runtime assert to narrow for mypy. - Replace real brand host async.scope3.com with async.example.com in tests per repo convention. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/server/a2a_server.py | 23 ++++++++++------------- tests/test_a2a_public_url_resolver.py | 14 ++++++-------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index a5cca17e..f7c5dd3c 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -62,7 +62,7 @@ from adcp.server.auth import BearerTokenAuth from adcp.server.serve import ContextFactory, SkillMiddleware -from collections.abc import Callable # noqa: E402 +from collections.abc import Awaitable, Callable # noqa: E402 from adcp.validation.client_hooks import ( # noqa: E402 SERVER_DEFAULT_VALIDATION, @@ -86,7 +86,7 @@ ``(None, {})``. """ -PublicUrlResolver = Callable[[Any], str] +PublicUrlResolver = Callable[[Any], str | Awaitable[str]] """Per-request public URL resolver for the A2A agent card. Called once per GET ``/.well-known/agent-card.json`` (and the 0.3 @@ -801,9 +801,7 @@ def _validate_card_url(url: str) -> str: "must be an absolute URL with scheme and host." ) hostname = parsed.hostname or "" - is_loopback = hostname in ("localhost", "127.0.0.1", "::1") or hostname.endswith( - ".localhost" - ) + is_loopback = hostname in ("localhost", "127.0.0.1", "::1") or hostname.endswith(".localhost") if parsed.scheme != "https" and not is_loopback: raise ValueError( f"public_url resolver returned {url!r} — " @@ -815,7 +813,7 @@ def _validate_card_url(url: str) -> str: def _wrap_with_per_request_card( inner: Any, *, - resolver: Callable[[Any], str], + resolver: PublicUrlResolver, handler: ADCPHandler[Any], name: str, port: int, @@ -855,9 +853,10 @@ async def _middleware(scope: Any, receive: Any, send: Any) -> None: ): request = Request(scope, receive) try: - raw_url = resolver(request) - if inspect.iscoroutine(raw_url): + raw_url: str | Awaitable[str] = resolver(request) + if inspect.isawaitable(raw_url): raw_url = await raw_url + assert isinstance(raw_url, str) url = _validate_card_url(raw_url) except Exception: logger.error("public_url resolver raised", exc_info=True) @@ -905,7 +904,7 @@ def create_a2a_server( pre_validation_hooks: dict[str, Any] | None = None, context_builder: Any | None = None, auth: BearerTokenAuth | None = None, - public_url: str | Callable[[Any], str] | None = None, + public_url: str | PublicUrlResolver | None = None, ) -> Any: """Create an A2A Starlette application from an ADCP handler. @@ -1039,7 +1038,7 @@ def agent_card_url(request: Request) -> str: resolved_port = port or int(os.environ.get("PORT", "3001")) # A callable resolver takes priority; env-var fallback only applies # when public_url is None (not callable). - resolved_public_url: str | Callable[[Any], str] | None = ( + resolved_public_url: str | PublicUrlResolver | None = ( public_url if public_url is not None else os.environ.get("PUBLIC_URL") ) @@ -1155,9 +1154,7 @@ def agent_card_url(request: Request) -> str: # 0.3 alias: A2A 0.3 buyer SDKs probe /.well-known/agent.json # as a positive A2A signal. Same handler, no redirect round-trip. + list( - create_agent_card_routes( - agent_card=agent_card, card_url="/.well-known/agent.json" - ) + create_agent_card_routes(agent_card=agent_card, card_url="/.well-known/agent.json") ) + list(create_jsonrpc_routes(**jsonrpc_kwargs)) ) diff --git a/tests/test_a2a_public_url_resolver.py b/tests/test_a2a_public_url_resolver.py index aa816168..f46f4a9e 100644 --- a/tests/test_a2a_public_url_resolver.py +++ b/tests/test_a2a_public_url_resolver.py @@ -87,9 +87,9 @@ def resolver(request) -> str: # type: ignore[no-untyped-def] # The resolver's URL must appear in supportedInterfaces interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) urls = [iface.get("url", "") for iface in interfaces] - assert any("tenant-a.example.com" in u for u in urls), ( - f"expected tenant-a.example.com in {urls}" - ) + assert any( + "tenant-a.example.com" in u for u in urls + ), f"expected tenant-a.example.com in {urls}" # Resolver was called assert calls == ["tenant-a.example.com"] @@ -124,9 +124,7 @@ def resolver(request) -> str: # type: ignore[no-untyped-def] body = resp.json() interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) urls = [iface.get("url", "") for iface in interfaces] - assert any(expected_host in u for u in urls), ( - f"expected {expected_host} in {urls}" - ) + assert any(expected_host in u for u in urls), f"expected {expected_host} in {urls}" @pytest.mark.asyncio @@ -203,14 +201,14 @@ async def resolver(request) -> str: # type: ignore[no-untyped-def] ) as client: resp = await client.get( "/.well-known/agent-card.json", - headers={"host": "async.scope3.com"}, + headers={"host": "async.example.com"}, ) assert resp.status_code == 200 body = resp.json() interfaces = body.get("supportedInterfaces") or body.get("supported_interfaces", []) urls = [iface.get("url", "") for iface in interfaces] - assert any("async.scope3.com" in u for u in urls), f"expected async.scope3.com in {urls}" + assert any("async.example.com" in u for u in urls), f"expected async.example.com in {urls}" @pytest.mark.asyncio