Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/adcp/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ async def get_products(params, context=None):
from __future__ import annotations

from adcp.capabilities import validate_capabilities
from adcp.server.a2a_server import ADCPAgentExecutor, MessageParser, create_a2a_server
from adcp.server.a2a_server import (
ADCPAgentExecutor,
MessageParser,
PublicUrlResolver,
create_a2a_server,
)
from adcp.server.auth import (
A2ABearerAuthMiddleware,
AsyncTokenValidator,
Expand Down Expand Up @@ -204,6 +209,7 @@ async def get_products(params, context=None):
# A2A integration
"ADCPAgentExecutor",
"MessageParser",
"PublicUrlResolver",
"ASGIMiddlewareEntry",
"SkillMiddleware",
"create_a2a_server",
Expand Down
307 changes: 257 additions & 50 deletions src/adcp/server/a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from adcp.server.auth import BearerTokenAuth
from adcp.server.serve import ContextFactory, SkillMiddleware

from collections.abc import Callable # noqa: E402
from collections.abc import Awaitable, Callable # noqa: E402

from adcp.validation.client_hooks import ( # noqa: E402
SERVER_DEFAULT_VALIDATION,
Expand All @@ -86,6 +86,44 @@
``(None, {})``.
"""

PublicUrlResolver = Callable[[Any], str | Awaitable[str]]
"""Per-request public URL resolver for the A2A agent card.

Called once per GET ``/.well-known/agent-card.json`` (and the 0.3
alias ``/.well-known/agent.json``) to derive the base URL embedded in
``supportedInterfaces`` entries. Receives the Starlette
:class:`~starlette.requests.Request` and must return an absolute URL
string. Both sync and async callables are accepted.

Typical use — multi-tenant subdomain routing::

from starlette.requests import Request

def agent_card_url(request: Request) -> str:
host = request.headers.get("host", "localhost")
return f"https://{host}/"

serve(handler, transport="a2a", public_url=agent_card_url)

Async resolvers work the same way::

from starlette.requests import Request

async def agent_card_url(request: Request) -> str:
host = request.headers.get("host", "localhost")
return f"https://{host}/"

**Trust boundary:** the callable owns all header-trust decisions.
Do not read ``X-Forwarded-Host`` unless your proxy layer is confirmed
to strip that header on ingress — on a directly internet-facing
deployment, those headers are attacker-controlled. The ``host``
header is set by the TLS-terminating proxy and is safe to use.

Returned URLs must be ``https://`` for non-loopback hosts. Returning
``http://`` for a non-loopback hostname causes the per-request handler
to return HTTP 500 without echoing the bad URL to the client.
"""


from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler
from adcp.server.test_controller import TestControllerStore, _handle_test_controller
Expand Down Expand Up @@ -746,6 +784,107 @@ def _build_agent_card(
)


def _validate_card_url(url: str) -> str:
"""Validate the URL returned by a :data:`PublicUrlResolver`.

Raises ``ValueError`` when the value is not a valid absolute URL or
uses ``http://`` for a non-loopback host. The per-request card
handler catches this and returns HTTP 500 without echoing the bad
value to the client.
"""
from urllib.parse import urlparse

parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
raise ValueError(
f"public_url resolver returned {url!r} — "
"must be an absolute URL with scheme and host."
)
hostname = parsed.hostname or ""
is_loopback = hostname in ("localhost", "127.0.0.1", "::1") or hostname.endswith(".localhost")
if parsed.scheme != "https" and not is_loopback:
raise ValueError(
f"public_url resolver returned {url!r} — "
"scheme must be 'https' for non-loopback hosts."
)
return url


def _wrap_with_per_request_card(
inner: Any,
*,
resolver: PublicUrlResolver,
handler: ADCPHandler[Any],
name: str,
port: int,
description: str | None,
version: str,
extra_skills: list[pb.AgentSkill] | None,
advertise_all: bool,
push_notifications_supported: bool,
auth: BearerTokenAuth | None,
) -> Any:
"""Wrap an ASGI app to serve agent-card endpoints per-request.

Intercepts GET ``/.well-known/agent-card.json`` and
``/.well-known/agent.json``; all other requests pass through to the
inner app unchanged.

Used when :func:`create_a2a_server` receives a
:data:`PublicUrlResolver` callable — the a2a-sdk's
``create_agent_card_routes`` bakes the card at construction time
and cannot surface per-request context.
"""
import inspect

from a2a.server.routes.agent_card_routes import agent_card_to_dict # type: ignore[attr-defined]
from starlette.requests import Request
from starlette.responses import JSONResponse

_card_paths: frozenset[str] = frozenset(
{"/.well-known/agent-card.json", "/.well-known/agent.json"}
)

async def _middleware(scope: Any, receive: Any, send: Any) -> None:
if (
scope.get("type") == "http"
and scope.get("path") in _card_paths
and scope.get("method") == "GET"
):
request = Request(scope, receive)
try:
raw_url: str | Awaitable[str] = resolver(request)
if inspect.isawaitable(raw_url):
raw_url = await raw_url
assert isinstance(raw_url, str)
url = _validate_card_url(raw_url)
except Exception:
logger.error("public_url resolver raised", exc_info=True)
error_response: Any = JSONResponse(
{"error": "agent-card temporarily unavailable"}, status_code=500
)
await error_response(scope, receive, send)
return
card = _build_agent_card(
handler,
name=name,
port=port,
description=description,
version=version,
extra_skills=extra_skills,
advertise_all=advertise_all,
push_notifications_supported=push_notifications_supported,
auth=auth,
public_url=url,
)
card_response: Any = JSONResponse(agent_card_to_dict(card))
await card_response(scope, receive, send)
return
await inner(scope, receive, send)

return _middleware


def create_a2a_server(
handler: ADCPHandler[Any],
*,
Expand All @@ -765,7 +904,7 @@ def create_a2a_server(
pre_validation_hooks: dict[str, Any] | None = None,
context_builder: Any | None = None,
auth: BearerTokenAuth | None = None,
public_url: str | None = None,
public_url: str | PublicUrlResolver | None = None,
) -> Any:
"""Create an A2A Starlette application from an ADCP handler.

Expand Down Expand Up @@ -861,25 +1000,47 @@ def create_a2a_server(
at the ASGI layer. Adopters calling ``create_a2a_server``
directly must wrap the returned app with
:class:`A2ABearerAuthMiddleware` themselves.
public_url: Optional public base URL advertised in the A2A agent
card (``/.well-known/agent-card.json``). When set, this value
replaces the default ``http://localhost:{port}/`` in every
``supported_interfaces`` URL entry. Use this when the agent
runs behind a load balancer or reverse proxy and the bound
socket address differs from the externally reachable URL
(e.g. ``https://agent.example.com/``). Falls back to the
public_url: Public base URL for the A2A agent card
(``/.well-known/agent-card.json``). Accepts either a static
string or a :data:`PublicUrlResolver` callable for per-request
resolution.

*Static string* — replaces ``http://localhost:{port}/`` in
every ``supported_interfaces`` URL. Falls back to the
``PUBLIC_URL`` environment variable when ``public_url`` is
``None`` and the env var is set, enabling zero-code-change
configuration on Cloud Run / Fly.io / Railway. When neither
is supplied the default ``http://localhost:{port}/`` is used —
correct for local development, incorrect for production
deployments behind a proxy.
``None``. Correct for single-host or fixed-URL deployments.

*Callable* — receives the Starlette
:class:`~starlette.requests.Request` on each card fetch and
must return an absolute ``https://`` URL. Use this for
multi-tenant subdomain deployments where each tenant has its
own public host::

def agent_card_url(request: Request) -> str:
host = request.headers.get("host", "localhost")
return f"https://{host}/"

serve(handler, transport="a2a", public_url=agent_card_url)

When a callable is supplied the a2a-sdk's static
``create_agent_card_routes`` is bypassed in favour of an
ASGI-layer intercept that builds the card per-request. The
``DefaultRequestHandler``'s internal ``GetAgentCard`` RPC
path retains a ``localhost`` fallback card — buyers probing
the well-known endpoint always receive the per-request card.

The ``PUBLIC_URL`` env-var fallback applies only when
``public_url`` is ``None``; a callable takes priority.

Returns:
A Starlette app ready to be run with uvicorn.
"""
resolved_port = port or int(os.environ.get("PORT", "3001"))
resolved_public_url = public_url or os.environ.get("PUBLIC_URL")
# A callable resolver takes priority; env-var fallback only applies
# when public_url is None (not callable).
resolved_public_url: str | PublicUrlResolver | None = (
public_url if public_url is not None else os.environ.get("PUBLIC_URL")
)

executor = ADCPAgentExecutor(
handler,
Expand All @@ -893,33 +1054,9 @@ def create_a2a_server(
test_controller_account_resolver=test_controller_account_resolver,
)

agent_card = _build_agent_card(
handler,
name=name,
port=resolved_port,
description=description,
version=version,
extra_skills=_test_controller_skills() if test_controller else None,
advertise_all=advertise_all,
push_notifications_supported=push_config_store is not None,
auth=auth,
public_url=resolved_public_url,
)

if task_store is None:
task_store = InMemoryTaskStore()

# DefaultRequestHandler stores push_config_store verbatim and treats
# None as "push-notif endpoints unsupported" (UnsupportedOperationError
# on tasks/pushNotificationConfig/*). Passing None is the correct
# default; sellers opt in by wiring a store.
request_handler = DefaultRequestHandler(
agent_executor=executor,
task_store=task_store,
agent_card=agent_card,
push_config_store=push_config_store,
)

# ``enable_v0_3_compat=True`` is load-bearing: it makes the server
# dual-serve 0.3 and 1.0 wire formats on the same endpoint so existing
# 0.3 buyer clients keep working unchanged. Do not disable.
Expand All @@ -934,24 +1071,94 @@ def create_a2a_server(
# ``ServerCallContext`` shape (e.g. surfacing additional
# ``state`` fields from the request).
jsonrpc_kwargs: dict[str, Any] = {
"request_handler": request_handler,
"rpc_url": "/",
"enable_v0_3_compat": True,
}
if context_builder is not None:
jsonrpc_kwargs["context_builder"] = context_builder
routes = (
list(create_agent_card_routes(agent_card=agent_card))
# 0.3 alias: A2A 0.3 buyer SDKs probe /.well-known/agent.json
# as a positive A2A signal. Same handler, no redirect round-trip.
+ list(
create_agent_card_routes(
agent_card=agent_card, card_url="/.well-known/agent.json"

_extra_skills = _test_controller_skills() if test_controller else None
_push_supported = push_config_store is not None

if callable(resolved_public_url):
# Per-request path: build a localhost fallback card for
# DefaultRequestHandler's internal GetAgentCard RPC (buyers probe
# /.well-known/agent-card.json directly; the RPC fallback is rarely
# used). The well-known endpoints are served by
# _wrap_with_per_request_card which builds a fresh card per GET.
fallback_card = _build_agent_card(
handler,
name=name,
port=resolved_port,
description=description,
version=version,
extra_skills=_extra_skills,
advertise_all=advertise_all,
push_notifications_supported=_push_supported,
auth=auth,
public_url=None,
)
# DefaultRequestHandler stores push_config_store verbatim and
# treats None as "push-notif unsupported". Passing None is the
# correct default; sellers opt in by wiring a store.
request_handler = DefaultRequestHandler(
agent_executor=executor,
task_store=task_store,
agent_card=fallback_card,
push_config_store=push_config_store,
)
jsonrpc_kwargs["request_handler"] = request_handler
routes = list(create_jsonrpc_routes(**jsonrpc_kwargs))
app = Starlette(routes=routes)
app = _wrap_with_per_request_card(
app,
resolver=resolved_public_url,
handler=handler,
name=name,
port=resolved_port,
description=description,
version=version,
extra_skills=_extra_skills,
advertise_all=advertise_all,
push_notifications_supported=_push_supported,
auth=auth,
)
else:
# Static card path: existing behaviour — card built once at
# server init and served unchanged on every card request.
agent_card = _build_agent_card(
handler,
name=name,
port=resolved_port,
description=description,
version=version,
extra_skills=_extra_skills,
advertise_all=advertise_all,
push_notifications_supported=_push_supported,
auth=auth,
public_url=resolved_public_url,
)
# DefaultRequestHandler stores push_config_store verbatim and treats
# None as "push-notif endpoints unsupported" (UnsupportedOperationError
# on tasks/pushNotificationConfig/*). Passing None is the correct
# default; sellers opt in by wiring a store.
request_handler = DefaultRequestHandler(
agent_executor=executor,
task_store=task_store,
agent_card=agent_card,
push_config_store=push_config_store,
)
jsonrpc_kwargs["request_handler"] = request_handler
routes = (
list(create_agent_card_routes(agent_card=agent_card))
# 0.3 alias: A2A 0.3 buyer SDKs probe /.well-known/agent.json
# as a positive A2A signal. Same handler, no redirect round-trip.
+ list(
create_agent_card_routes(agent_card=agent_card, card_url="/.well-known/agent.json")
)
+ list(create_jsonrpc_routes(**jsonrpc_kwargs))
)
+ list(create_jsonrpc_routes(**jsonrpc_kwargs))
)
app = Starlette(routes=routes)
app = Starlette(routes=routes)

# Startup log lives on the create_a2a_server path (symmetric with
# MCP's _register_handler_tools). Moved out of
Expand Down
Loading
Loading