Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/adcp/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ async def get_products(params, context=None):
from adcp.capabilities import validate_capabilities
from adcp.server.a2a_server import ADCPAgentExecutor, MessageParser, create_a2a_server
from adcp.server.auth import (
A2ABearerAuthMiddleware,
AsyncTokenValidator,
BearerTokenAuth,
BearerTokenAuthMiddleware,
Principal,
SyncTokenValidator,
Expand Down Expand Up @@ -194,7 +196,9 @@ async def get_products(params, context=None):
"SkillMiddleware",
"create_a2a_server",
# Bearer-token auth middleware (seller-facing recipe)
"A2ABearerAuthMiddleware",
"AsyncTokenValidator",
"BearerTokenAuth",
"BearerTokenAuthMiddleware",
"Principal",
"SyncTokenValidator",
Expand Down
24 changes: 19 additions & 5 deletions src/adcp/server/a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ def create_a2a_server(
message_parser: MessageParser | None = None,
advertise_all: bool = False,
validation: ValidationHookConfig | None = SERVER_DEFAULT_VALIDATION,
context_builder: Any | None = None,
) -> Any:
"""Create an A2A Starlette application from an ADCP handler.

Expand Down Expand Up @@ -780,12 +781,25 @@ def create_a2a_server(
# ``enable_v0_3_compat=True`` is load-bearing: it makes the server
# dual-serve 0.3 and 1.0 wire formats on the same endpoint so existing
# 0.3 buyer clients keep working unchanged. Do not disable.
#
# ``context_builder`` is the a2a-sdk seam for customising the
# :class:`ServerCallContext` each handler receives. We thread it
# through verbatim when supplied — bearer-token auth is wired
# separately via :class:`A2ABearerAuthMiddleware` at the ASGI
# layer (see ``serve.py:_wrap_a2a_with_auth``) because the v0.3
# compat adapter swallows builder-raised ``HTTPException``s. The
# builder kwarg remains for adopters customising the
# ``ServerCallContext`` shape (e.g. surfacing additional
# ``state`` fields from the request).
jsonrpc_kwargs: dict[str, Any] = {
"request_handler": request_handler,
"rpc_url": "/",
"enable_v0_3_compat": True,
}
if context_builder is not None:
jsonrpc_kwargs["context_builder"] = context_builder
routes = list(create_agent_card_routes(agent_card=agent_card)) + list(
create_jsonrpc_routes(
request_handler=request_handler,
rpc_url="/",
enable_v0_3_compat=True,
)
create_jsonrpc_routes(**jsonrpc_kwargs)
)
app = Starlette(routes=routes)

Expand Down
278 changes: 274 additions & 4 deletions src/adcp/server/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ async def validate_token(token: str) -> Principal | None:
* **Authorization.** The middleware answers "who is this?", not "can
they do X?". Authorization checks run on the authenticated principal
inside your handlers or as :data:`~adcp.server.SkillMiddleware`.
* **A2A auth.** A2A uses a different transport; wire a2a-sdk's
``ServerCallContext.user`` via a2a-sdk auth middleware on that side.
The ``Principal`` / ``ToolContext`` shape is the same, so handlers
work unchanged across transports.
* **A2A auth.** A2A uses a different transport; the same
:class:`BearerTokenAuth` config object drives both legs when wired
via :func:`adcp.server.serve`'s ``auth=`` kwarg. The A2A side is
authenticated by a :class:`BearerTokenContextBuilder` plumbed into
``a2a-sdk``'s ``create_jsonrpc_routes(context_builder=...)`` seam,
not by a Starlette middleware — that placement bypasses the
``/.well-known/agent-card.json`` route automatically (which is
registered separately and never invokes the builder), satisfying
A2A spec §4.1's mandate that the agent card be publicly accessible.
"""

from __future__ import annotations
Expand Down Expand Up @@ -480,3 +485,268 @@ def _validate(token: str) -> Principal | None:
return constant_time_token_match(token, stored_hashes)

return _validate


# ---------------------------------------------------------------------------
# Cross-transport auth config — drives both MCP middleware and A2A builder
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class BearerTokenAuth:
"""Cross-transport bearer-token auth config for :func:`adcp.server.serve`.

Single source of truth that wires the same ``validate_token``
callback (and ``header_name`` / ``bearer_prefix_required`` knobs)
into both the MCP-side :class:`BearerTokenAuthMiddleware` and the
A2A-side :class:`BearerTokenContextBuilder`. Pass via
``serve(auth=BearerTokenAuth(...))`` and both legs are
authenticated against the same token store with no per-leg
drift::

from adcp.server import serve
from adcp.server.auth import BearerTokenAuth, validator_from_token_map

serve(
handler,
transport="both",
auth=BearerTokenAuth(
validate_token=validator_from_token_map({
"secret-token": Principal(caller_identity="p", tenant_id="acme"),
}),
),
)

On MCP, requests without a valid token receive a JSON ``401``
body. On A2A, requests without a valid token receive an HTTP
``401`` from Starlette's :class:`HTTPException`. Discovery
bypasses are transport-specific:

* **MCP**: ``initialize`` / ``tools/list`` / ``notifications/initialized``
/ ``get_adcp_capabilities`` (JSON-RPC method-level bypass).
* **A2A**: ``/.well-known/agent-card.json`` (route-level — the
agent-card route is created separately and never invokes the
builder, so no path-based exemption is needed in the
:class:`BearerTokenContextBuilder` itself).

Knobs mirror :class:`BearerTokenAuthMiddleware` exactly: pass
``header_name="x-adcp-auth"`` and ``bearer_prefix_required=False``
for non-OAuth custom-header schemes.
"""

validate_token: TokenValidator
header_name: str = "authorization"
bearer_prefix_required: bool = True
unauthenticated_response: dict[str, Any] | None = None


# ---------------------------------------------------------------------------
# A2A: ASGI middleware that gates JSON-RPC requests, exempts agent-card
# ---------------------------------------------------------------------------
#
# Why an ASGI middleware (not a ServerCallContextBuilder)?
# The a2a-sdk v0.3 compat adapter wraps the entire dispatch in
# ``except Exception`` and converts any error — including a builder-
# raised :class:`HTTPException(401)` — into a 200 OK with a JSON-RPC
# error body. That breaks the spec-canonical HTTP 401 contract and
# leaks the auth path as a 200. Authenticating outside the dispatcher,
# at the ASGI layer, returns proper HTTP 401 every time.
#
# A2A discovery (``/.well-known/agent-card.json``) is exempted by URL
# path here because the agent-card route happens to live in the same
# Starlette app — the middleware can't rely on the route topology
# alone. Path-exemption keeps the spec §4.1 public-discovery mandate
# satisfied even if a future a2a-sdk refactor merges the routes.


# Canonical 1.0 path is sourced from a2a-sdk's own constant — if a
# future a2a-sdk release renames the well-known URI, the import-time
# reference here lifts to the new value automatically and
# ``test_discovery_paths_match_a2a_sdk_routes`` verifies that the
# frozenset still covers every route ``create_agent_card_routes``
# actually registers. Hardcoding the string would silently leak auth
# on the renamed route until someone notices.
from a2a.utils.constants import ( # noqa: E402 (intentional placement after BearerTokenAuth definition)
AGENT_CARD_WELL_KNOWN_PATH as _A2A_AGENT_CARD_PATH,
)

_A2A_DISCOVERY_PATHS: frozenset[str] = frozenset(
{
_A2A_AGENT_CARD_PATH, # 1.0 canonical: ``/.well-known/agent-card.json``.
"/.well-known/agent.json", # Legacy 0.3 alias retained by enable_v0_3_compat=True.
}
)


class A2ABearerAuthMiddleware:
"""Pure-ASGI middleware that gates A2A JSON-RPC on a bearer token.

Wrap the Starlette app produced by
:func:`adcp.server.a2a_server.create_a2a_server` with this
middleware to require a valid bearer header on every JSON-RPC
request, while leaving the spec-mandated public discovery
surface (``/.well-known/agent-card.json``) accessible.

Designed to compose with a2a-sdk's
:class:`DefaultServerCallContextBuilder`: on auth success the
middleware writes a duck-typed user object into
``scope['user']`` and the principal into ``scope['auth']``,
matching Starlette's :class:`AuthenticationMiddleware` contract.
The default builder reads ``scope['user']`` and adapts it via
:class:`a2a.server.routes.common.StarletteUser`, so downstream
handlers see ``ServerCallContext.user.user_name`` populated with
the principal's ``caller_identity`` without a custom builder.

Composition order matters when ``transport="both"`` is in play:
wrap the per-leg apps before any outer dispatcher closes over
them. See ``serve.py:_build_mcp_and_a2a_app`` for the wiring.
"""

def __init__(self, app: Any, config: BearerTokenAuth) -> None:
self._app = app
self._config = config
self._header_name = config.header_name.lower()

async def __call__(self, scope: Any, receive: Any, send: Any) -> None:
# Lifespan + websocket pass through unchanged. Auth applies to
# HTTP requests only.
if scope.get("type") != "http":
await self._app(scope, receive, send)
return

# CORS preflight is part of the public surface — browser-origin
# clients send ``OPTIONS`` before any auth'd POST. Returning 401
# here breaks the preflight and the buyer never gets a chance to
# retry with a token. Pass through; let the inner app's CORS
# handler (or operator-supplied ``asgi_middleware``) respond.
if scope.get("method") == "OPTIONS":
await self._app(scope, receive, send)
return

path = scope.get("path", "")
if path in _A2A_DISCOVERY_PATHS:
await self._app(scope, receive, send)
return

principal = self._authenticate_scope(scope)
if principal is None:
await self._send_unauthenticated(send)
return

# Stash both the duck-typed user (for DefaultServerCallContextBuilder)
# and the raw Principal (for downstream code reading scope['auth']).
# Mutating the scope dict before delegating propagates state to
# nested apps without copying.
scope["user"] = _A2AAuthenticatedUser(
display_name=principal.caller_identity,
tenant_id=principal.tenant_id,
principal_metadata=dict(principal.metadata) if principal.metadata else None,
)
scope["auth"] = principal
await self._app(scope, receive, send)

def _authenticate_scope(self, scope: Any) -> Principal | None:
"""Read + validate the bearer header off raw ASGI scope.

Validator exceptions are projected to :data:`None` (logged for
operators) so a buggy validator never leaks 500-level stack
traces or signals path existence to unauthenticated callers.
Auth-rejection branches log at INFO with a coarse reason code
so SOC dashboards can detect scanning without bloating logs.
"""
# ASGI ``headers`` is a list of ``(bytes_lower, bytes)`` tuples.
target = self._header_name.encode("latin-1")
raw_value: bytes | None = None
for name, value in scope.get("headers", ()):
if name == target:
raw_value = value
break

if raw_value is None:
logger.info("a2a auth rejected", extra={"reason": "missing_header"})
return None

try:
raw_header = raw_value.decode("latin-1")
except UnicodeDecodeError:
logger.info("a2a auth rejected", extra={"reason": "header_decode"})
return None

if self._config.bearer_prefix_required:
bearer = _parse_bearer_header(raw_header)
else:
stripped = raw_header.strip()
bearer = stripped or None
if not bearer:
logger.info("a2a auth rejected", extra={"reason": "wrong_scheme"})
return None

try:
raw = self._config.validate_token(bearer)
except Exception:
logger.exception("token validator raised on A2A request")
return None

if inspect.isawaitable(raw):
# Should be unreachable — :func:`_assert_sync_validator` at
# config time rejects async validators before any traffic
# lands. This branch is the in-depth catch in case an
# adopter swaps in an async validator at runtime via a
# closure that conditionally awaits.
logger.error(
"a2a auth rejected: validator returned awaitable at request "
"time. Async validators are not supported on the A2A leg; "
"wrap with a sync bridge."
)
return None

if raw is None:
logger.info("a2a auth rejected", extra={"reason": "invalid_token"})
return None
return raw

async def _send_unauthenticated(self, send: Any) -> None:
body_obj = self._config.unauthenticated_response or {
"error": "invalid_token",
"error_description": "Bearer token missing or invalid",
}
body = json.dumps(body_obj).encode("utf-8")
# RFC 6750 §3 + RFC 7235 §3.1 require ``WWW-Authenticate: Bearer``
# on every 401. Without it, RFC-compliant clients (including
# browsers and many HTTP libraries) won't surface the auth
# challenge to the user — they treat the 401 as a generic
# error. Always emit; even when the operator overrides
# ``unauthenticated_response``, the header stays for protocol
# compliance.
await send(
{
"type": "http.response.start",
"status": 401,
"headers": [
(b"content-type", b"application/json"),
(b"content-length", str(len(body)).encode("latin-1")),
(b"www-authenticate", b'Bearer realm="a2a", error="invalid_token"'),
],
}
)
await send({"type": "http.response.body", "body": body})


@dataclass(frozen=True)
class _A2AAuthenticatedUser:
"""Minimal Starlette-BaseUser-shaped object for :class:`StarletteUser`.

a2a-sdk's :class:`StarletteUser` adapter wants ``is_authenticated``
(bool) and ``display_name`` (str). It doesn't import Starlette's
:class:`BaseUser` directly — duck-typing works. We synthesize a
frozen dataclass so the principal's identity flows through with no
Starlette dependency on the auth side.
"""

display_name: str
tenant_id: str | None = None
principal_metadata: dict[str, Any] | None = None

@property
def is_authenticated(self) -> bool:
return True
Loading
Loading