From 3848b10b88e518e5ce3e939021b011e4e3466ee6 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 2 May 2026 20:39:53 -0400 Subject: [PATCH] =?UTF-8?q?feat(signing):=20async=5Fresolve=5Fagent=20?= =?UTF-8?q?=E2=80=94=20bootstrap=20to=20JWKS=20via=20brand.json?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #344 (resolver + CLI core). Verifier-side verify_from_agent_url factory + 8 typed request_signature_* errors remain a follow-up. Adds adcp.signing.agent_resolver with the 3-hop walk: agent_url -> get_adcp_capabilities -> identity.brand_json_url -> brand.json -> jwks_uri -> JWK set closing the SSRF gap on the capabilities hop (the existing brand.json + JWKS hops were already pinned via build_async_ip_pinned_transport). The capabilities fetcher is intentionally NOT routed through adcp.client.ADCPClient — that client is for trusted-counterparty traffic; here agent_url is attacker-shaped. Public surface: - async_resolve_agent(agent_url, *, agent_type, agent_id=None, ...) -> AgentResolution - resolve_agent(...) -> sync wrapper (asyncio.run) - AgentResolution: agent_url, brand_json_url, agent_entry, jwks_uri, jwks (RFC 7517 set), fetched_at, trace - TraceEntry: per-hop record (capabilities | brand_json | jwks) - AgentResolverError(code, message) with stable AgentResolverErrorCode - BrandJsonJwksResolver.jwks_uri: new property (5 LOC) consumed by the resolver after force_refresh Notably absent (deferred per design-decision triage on #344): - identity_posture / consistency: zero schema provenance in 3.0.5; dropped from AgentResolution to keep --json output cross-SDK clean. - get_agent_jwks: redundant with BrandJsonJwksResolver; the JWK set reaches adopters via AgentResolution.jwks instead. - tldextract: only relevant when verifier-side eTLD+1 binding lands (Tier 3 #350 / adcp#3690 territory). No PSL dep added in this PR. CLI: --resolve --agent-type [--agent-id ] [--json]. 
def handle_resolve(
    agent_url: str,
    agent_type: str | None,
    agent_id: str | None,
    json_output: bool,
) -> None:
    """Run the ``--resolve`` command: agent URL -> brand.json -> JWK set.

    Delegates the walk to ``async_resolve_agent`` and prints the outcome.

    Args:
        agent_url: Agent endpoint passed to ``--resolve``.
        agent_type: Value of ``--agent-type``. Required, because brand.json
            may list several agents under one operator and the URL alone
            does not identify which entry is meant.
        agent_id: Optional ``--agent-id`` used to disambiguate entries of
            the same type.
        json_output: When true, print the resolution (or the error) as JSON
            on stdout instead of the human-readable summary.

    Exits with status 2 when ``agent_type`` is missing and status 1 when
    resolution fails; returns normally on success.
    """
    from adcp.signing.agent_resolver import (
        AgentResolverError,
        async_resolve_agent,
    )

    if not agent_type:
        usage_error = (
            "Error: --agent-type is required with --resolve "
            "(brand|rights|measurement|governance|creative|sales|buying|signals)"
        )
        print(usage_error, file=sys.stderr)
        sys.exit(2)

    # argparse already restricted the value via ``choices``; the cast only
    # satisfies the Literal-typed parameter.
    pending = async_resolve_agent(
        agent_url,
        agent_type=cast(Any, agent_type),
        agent_id=agent_id,
    )
    try:
        result = asyncio.run(pending)
    except AgentResolverError as exc:
        if not json_output:
            print(f"Error [{exc.code}]: {exc.message}", file=sys.stderr)
        else:
            failure = {"error": {"code": exc.code, "message": exc.message}}
            print(json.dumps(failure, indent=2))
        sys.exit(1)

    if json_output:
        print(result.model_dump_json(indent=2, exclude_none=True))
        return

    key_count = len(result.jwks.get('keys', []))
    print(f"agent_url: {result.agent_url}")
    print(f"brand_json_url: {result.brand_json_url}")
    print(f"jwks_uri: {result.jwks_uri}")
    print(f"agent_entry: {json.dumps(result.agent_entry)}")
    print(f"jwks keys: {key_count}")
    print("trace:")
    for hop_record in result.trace:
        if hop_record.status == "ok":
            marker = "✓"
        else:
            marker = "✗"
        line = f" {marker} [{hop_record.hop}] {hop_record.url} ({hop_record.latency_ms:.0f}ms)"
        if hop_record.error_code:
            line += f" error={hop_record.error_code}: {hop_record.error_message}"
        print(line)
"creative", + "sales", + "buying", + "signals", + ], + help="Agent type for --resolve (matches the brand.json agents[] entry). " + "Required with --resolve.", + ) + parser.add_argument( + "--agent-id", + metavar="ID", + help="Optional agent ID for --resolve (disambiguates multiple agents of " + "the same type in brand.json).", + ) # Execution options parser.add_argument("--protocol", choices=["mcp", "a2a"], help="Force protocol type") @@ -542,6 +634,7 @@ def main() -> None: args.remove_agent, args.show_config, args.version, + args.resolve, ] ) ): @@ -559,6 +652,12 @@ def main() -> None: print(' adcp cs-agent calibrate_content \'{"content_standards_id":"cs-123"}\'') print(" adcp si-agent si_get_offering") print(" adcp gov-agent list_property_lists") + print("\nIdentity Resolution Examples:") + print(" adcp --resolve https://buyer.example.com/mcp --agent-type sales") + print( + " adcp --resolve https://buyer.example.com/mcp --agent-type sales --json | " + "jq .jwks_uri" + ) sys.exit(0) # Handle configuration commands @@ -587,6 +686,10 @@ def main() -> None: handle_show_config() sys.exit(0) + if args.resolve: + handle_resolve(args.resolve, args.agent_type, args.agent_id, args.json) + sys.exit(0) + # Execute tool if not args.agent: print("Error: Agent identifier required", file=sys.stderr) diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index 37730b84b..a63913666 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -87,6 +87,14 @@ from __future__ import annotations +from adcp.signing.agent_resolver import ( + AgentResolution, + AgentResolverError, + AgentResolverErrorCode, + TraceEntry, + async_resolve_agent, + resolve_agent, +) from adcp.signing.autosign import ( SigningConfig, SigningDecision, @@ -266,6 +274,9 @@ def __init__(self, *args: object, **kwargs: object) -> None: "ALG_ED25519", "ALG_ES256", "ALLOWED_ALGS", + "AgentResolution", + "AgentResolverError", + "AgentResolverErrorCode", "AsyncCachingJwksResolver", 
"AsyncCachingRevocationChecker", "AsyncIpPinnedTransport", @@ -336,6 +347,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "SigningDecision", "SigningProvider", "StaticJwksResolver", + "TraceEntry", "VerifiedSigner", "VerifierCapability", "VerifyOptions", @@ -344,6 +356,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "as_async_resolver", "async_default_jwks_fetcher", "async_default_revocation_list_fetcher", + "async_resolve_agent", "async_sign_request", "averify_detached_jws", "averify_jws_document", @@ -371,6 +384,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "pem_to_adcp_jwk", "private_key_from_jwk", "public_key_from_jwk", + "resolve_agent", "resolve_and_validate_host", "sign_request", "sign_signature_base", diff --git a/src/adcp/signing/agent_resolver.py b/src/adcp/signing/agent_resolver.py new file mode 100644 index 000000000..4db2f7de7 --- /dev/null +++ b/src/adcp/signing/agent_resolver.py @@ -0,0 +1,542 @@ +"""Bootstrap from an agent URL to that agent's signing keys. + +AdCP 3.x adds ``identity.brand_json_url`` to the +``get_adcp_capabilities`` response (per adcontextprotocol/adcp#3690, +schema-relaxed in 3.0.5 via ``identity.additionalProperties: true``). +With that field present, a verifier can hand an agent URL alone and +the resolver walks: + + agent_url → get_adcp_capabilities → identity.brand_json_url → + brand.json → jwks_uri → JWK set + +without out-of-band knowledge of the operator domain. + +Three hops, three SSRF guards: + +* **Capabilities (this module):** built atop + :func:`adcp.signing.ip_pinned_transport.build_async_ip_pinned_transport`. + Same posture as the brand.json + JWKS hops — IP-pinned at connect, + redirect-capped, body-capped, HTTPS-validated. **Not routed through + :class:`adcp.client.ADCPClient`** because that client is for + trusted-counterparty traffic; here ``agent_url`` is attacker-shaped. 
+* **brand.json:** delegated to :class:`BrandJsonJwksResolver` (already + IP-pinned per-hop with redirect cap). +* **JWKS:** delegated to :func:`async_default_jwks_fetcher` (IP-pinned, + trust_env=False). + +The resolver returns an :class:`AgentResolution` snapshot — +``agent_url``, ``brand_json_url``, ``jwks_uri``, the full JWK set, and +a per-hop ``trace``. Adopters who want ongoing rotation handling +instantiate :class:`BrandJsonJwksResolver` directly with the resolved +``brand_json_url``; this resolver is one-shot. + +The 8 ``request_signature_*`` verifier-side error codes are NOT mapped +here — those belong to the :func:`verify_from_agent_url` factory (still +to ship). Resolver-side failures surface as :class:`AgentResolverError` +with a stable ``code`` attribute. +""" + +from __future__ import annotations + +import asyncio +import json +import time +from collections.abc import Callable +from contextlib import AbstractAsyncContextManager +from dataclasses import dataclass +from typing import Any, Literal + +import httpx +from pydantic import BaseModel, ConfigDict, Field + +from adcp.signing.brand_jwks import ( + BrandAgentType, + BrandJsonJwksResolver, + BrandJsonResolverError, +) +from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport +from adcp.signing.jwks import ( + SSRFValidationError, + async_default_jwks_fetcher, +) + +#: Maximum capabilities response body in bytes. Capabilities documents +#: are larger than brand.json (operator/agent declarations, supported +#: protocol matrix, idempotency policy) so the cap is higher — 64 KiB +#: matches the issue's spec quickstart guidance. +DEFAULT_MAX_CAPABILITIES_BYTES = 64 * 1024 + +#: Capabilities fetch must NOT follow redirects across origins — a +#: malicious agent_url could redirect to a trusted origin's +#: capabilities endpoint and steal that origin's identity claim. 
class AgentResolverError(Exception):
    """Failure to turn an agent URL into an :class:`AgentResolution`.

    Attributes:
        code: Stable, machine-readable error code (one of
            ``AgentResolverErrorCode``), suitable for ``except`` dispatch
            and structured logging.
        message: Human-readable description; also the exception's ``str()``.
    """

    def __init__(self, code: AgentResolverErrorCode, message: str) -> None:
        # Only the message goes to Exception so ``str(exc)`` stays readable;
        # the code travels as a separate attribute.
        super().__init__(message)
        self.message = message
        self.code: AgentResolverErrorCode = code
def _open_capabilities_client(
    url: str,
    *,
    allow_private: bool,
    timeout_seconds: float,
    client_factory: Callable[[str], AbstractAsyncContextManager[httpx.AsyncClient]] | None,
    is_redirect: bool,
) -> AbstractAsyncContextManager[httpx.AsyncClient]:
    """Return a fresh, not-yet-entered client for a single hop to ``url``.

    An injected ``client_factory`` is used for every hop (including
    redirects); otherwise an IP-pinned client is built for ``url``'s host.
    Transport construction failures are mapped onto
    :class:`AgentResolverError`.
    """
    if client_factory is not None:
        return client_factory(url)

    subject = "redirect target" if is_redirect else "agent_url"
    try:
        transport = build_async_ip_pinned_transport(url, allow_private=allow_private)
    except SSRFValidationError as exc:
        raise AgentResolverError(
            "capabilities_unreachable", f"{subject} failed SSRF check: {exc}"
        ) from exc
    except ValueError as exc:
        # A malformed *redirect* is the remote's fault, not the caller's, so
        # only the initial URL is reported as ``invalid_agent_url``.
        raise AgentResolverError(
            "capabilities_unreachable" if is_redirect else "invalid_agent_url",
            f"{subject} is not a valid URL: {exc}",
        ) from exc
    return httpx.AsyncClient(
        transport=transport,
        timeout=timeout_seconds,
        follow_redirects=False,
        trust_env=False,
    )


async def _get_with_body_cap(
    client: httpx.AsyncClient,
    url: str,
    max_body_bytes: int,
) -> tuple[int, str | None, bytes]:
    """Issue one ``GET url`` and return ``(status, Location header, body)``.

    The body is streamed and only read for HTTP 200; reading aborts with
    ``capabilities_invalid`` as soon as more than ``max_body_bytes`` have
    arrived, so an oversized response is never fully buffered.
    """
    async with client.stream("GET", url, headers={"accept": "application/json"}) as response:
        status = response.status_code
        location = response.headers.get("location")
        if status != 200:
            return status, location, b""

        received = bytearray()
        async for chunk in response.aiter_bytes():
            received += chunk
            if len(received) > max_body_bytes:
                raise AgentResolverError(
                    "capabilities_invalid",
                    f"capabilities response exceeds {max_body_bytes} bytes "
                    f"(got at least {len(received)})",
                )
        return status, location, bytes(received)


async def _fetch_capabilities(
    agent_url: str,
    *,
    allow_private: bool,
    max_body_bytes: int,
    max_redirects: int,
    timeout_seconds: float,
    client_factory: Callable[[str], AbstractAsyncContextManager[httpx.AsyncClient]] | None,
) -> _CapabilitiesPayload:
    """SSRF-guarded ``GET <agent_url>`` returning the parsed capabilities.

    Each hop gets its own client: the injected ``client_factory`` when one
    is supplied, otherwise an IP-pinned ``httpx.AsyncClient`` with
    ``follow_redirects=False`` and ``trust_env=False``. Redirects are
    followed manually, at most ``max_redirects`` times, and the response
    body is streamed against ``max_body_bytes`` before it is parsed.

    Args:
        agent_url: Untrusted agent endpoint to fetch.
        allow_private: Forwarded to the pinned-transport builder.
        max_body_bytes: Upper bound on the (decoded) response body size.
        max_redirects: Number of redirects to follow; ``0`` rejects any.
        timeout_seconds: Per-request timeout for the built-in client.
        client_factory: Test seam; called once per hop with that hop's URL.

    Returns:
        The parsed JSON object plus the URL that finally answered.

    Raises:
        AgentResolverError: ``invalid_agent_url`` for a malformed
            ``agent_url``; ``capabilities_unreachable`` for SSRF
            rejections, transport errors, non-200 statuses, redirect loops
            or limits; ``capabilities_invalid`` for an oversized, non-JSON
            or non-object body.
    """
    visited: set[str] = set()
    url = agent_url
    for hop in range(max_redirects + 1):
        if url in visited:
            raise AgentResolverError(
                "capabilities_unreachable",
                "capabilities fetch hit redirect loop",
            )
        visited.add(url)

        client_cm = _open_capabilities_client(
            url,
            allow_private=allow_private,
            timeout_seconds=timeout_seconds,
            client_factory=client_factory,
            is_redirect=hop > 0,
        )
        async with client_cm as client:
            try:
                status, location, body_bytes = await _get_with_body_cap(
                    client, url, max_body_bytes
                )
            except SSRFValidationError as exc:
                raise AgentResolverError(
                    "capabilities_unreachable",
                    f"agent_url failed SSRF check: {exc}",
                ) from exc
            except (httpx.HTTPError, OSError) as exc:
                raise AgentResolverError(
                    "capabilities_unreachable",
                    f"capabilities fetch failed: {exc}",
                ) from exc

        if 300 <= status < 400 and location is not None:
            if hop == max_redirects:
                raise AgentResolverError(
                    "capabilities_unreachable",
                    f"capabilities fetch hit redirect limit ({max_redirects})",
                )
            try:
                url = str(httpx.URL(url).join(location))
            except httpx.InvalidURL as exc:
                raise AgentResolverError(
                    "capabilities_unreachable",
                    f"redirect target is not a valid URL: {exc}",
                ) from exc
            # The next iteration builds a new client pinned to the new host.
            continue

        if status != 200:
            raise AgentResolverError(
                "capabilities_unreachable",
                f"capabilities fetch returned HTTP {status}",
            )

        try:
            parsed = json.loads(body_bytes)
        except ValueError as exc:
            # Covers json.JSONDecodeError and UnicodeDecodeError alike.
            raise AgentResolverError(
                "capabilities_invalid",
                "capabilities response is not valid JSON",
            ) from exc

        if not isinstance(parsed, dict):
            raise AgentResolverError(
                "capabilities_invalid",
                "capabilities response is not a JSON object",
            )
        return _CapabilitiesPayload(body=parsed, final_url=url)

    # Only reached when ``max_redirects`` is negative and the loop never runs.
    raise AgentResolverError(
        "capabilities_unreachable", "capabilities fetch exhausted redirect chain"
    )
class _TracedAgentResolverError(AgentResolverError):
    """:class:`AgentResolverError` that also carries the hops recorded
    before (and including) the failure, as ``trace``."""

    def __init__(
        self,
        code: AgentResolverErrorCode,
        message: str,
        trace: list[TraceEntry],
    ) -> None:
        super().__init__(code, message)
        self.trace = trace


def _trace_entry(
    hop: Literal["capabilities", "brand_json", "jwks"],
    url: str,
    started_at: float,
    error_code: str | None = None,
    error_message: str | None = None,
) -> TraceEntry:
    """Build the :class:`TraceEntry` for one hop, timing it from ``started_at``
    (a ``time.monotonic()`` reading); the hop is an error iff a code is given."""
    return TraceEntry(
        hop=hop,
        url=url,
        status="ok" if error_code is None else "error",
        latency_ms=(time.monotonic() - started_at) * 1000.0,
        error_code=error_code,
        error_message=error_message,
    )


async def async_resolve_agent(
    agent_url: str,
    *,
    agent_type: BrandAgentType,
    agent_id: str | None = None,
    brand_id: str | None = None,
    allow_private_destinations: bool = False,
    max_capabilities_bytes: int = DEFAULT_MAX_CAPABILITIES_BYTES,
    max_capabilities_redirects: int = DEFAULT_CAPABILITIES_MAX_REDIRECTS,
    capabilities_timeout_seconds: float = DEFAULT_CAPABILITIES_TIMEOUT_SECONDS,
    _capabilities_client_factory: (
        Callable[[str], AbstractAsyncContextManager[httpx.AsyncClient]] | None
    ) = None,
    _brand_jwks_client_factory: (
        Callable[[str], AbstractAsyncContextManager[httpx.AsyncClient]] | None
    ) = None,
) -> AgentResolution:
    """Bootstrap from ``agent_url`` to its JWK set via brand.json.

    Walks three hops in order:

    1. ``GET <agent_url>`` — capabilities fetch (``_fetch_capabilities``).
    2. ``GET <identity.brand_json_url>`` — brand.json walk via
       :class:`BrandJsonJwksResolver`.
    3. ``GET <jwks_uri>`` — JWKS fetch via
       :func:`async_default_jwks_fetcher`.

    The selector tuple ``(agent_type, agent_id, brand_id)`` picks the
    brand.json ``agents[]`` entry. ``agent_type`` is required because the
    resolver cannot infer from the URL alone which of an operator's agents
    ``agent_url`` corresponds to.

    Args:
        agent_url: Untrusted agent endpoint to start from.
        agent_type: Type of the brand.json agent entry to select.
        agent_id: Optional entry id, forwarded to the brand.json resolver.
        brand_id: Optional brand id, forwarded to the brand.json resolver.
        allow_private_destinations: Forwarded to every hop's SSRF guard.
        max_capabilities_bytes: Body cap for the capabilities response.
        max_capabilities_redirects: Redirect budget for the capabilities hop.
        capabilities_timeout_seconds: Timeout for the capabilities hop.
        _capabilities_client_factory: Test seam for hop 1.
        _brand_jwks_client_factory: Test seam for hop 2; only forwarded when
            not ``None``.

    Returns:
        An :class:`AgentResolution` with one ``ok`` trace entry per hop.

    Raises:
        AgentResolverError: On any failure. The raised error additionally
            exposes a ``trace`` attribute listing the hops attempted so far
            (the last one marked ``error`` when a hop itself failed), so
            callers can report where the walk stopped.
    """
    trace: list[TraceEntry] = []
    fetched_at = time.time()

    def traced(code: AgentResolverErrorCode, message: str) -> AgentResolverError:
        # Snapshot the list so later mutation cannot alter a raised error.
        return _TracedAgentResolverError(code, message, list(trace))

    # --- Hop 1: capabilities ---
    cap_start = time.monotonic()
    try:
        capabilities = await _fetch_capabilities(
            agent_url,
            allow_private=allow_private_destinations,
            max_body_bytes=max_capabilities_bytes,
            max_redirects=max_capabilities_redirects,
            timeout_seconds=capabilities_timeout_seconds,
            client_factory=_capabilities_client_factory,
        )
    except AgentResolverError as exc:
        trace.append(_trace_entry("capabilities", agent_url, cap_start, exc.code, exc.message))
        raise traced(exc.code, exc.message) from exc
    trace.append(_trace_entry("capabilities", capabilities.final_url, cap_start))

    try:
        brand_json_url = _extract_brand_json_url(capabilities.body)
    except AgentResolverError as exc:
        raise traced(exc.code, exc.message) from exc

    # --- Hop 2: brand.json ---
    bj_start = time.monotonic()
    brand_kwargs: dict[str, Any] = {
        "agent_type": agent_type,
        "agent_id": agent_id,
        "brand_id": brand_id,
        "allow_private_destinations": allow_private_destinations,
    }
    # Only forward _client_factory when the caller passed one, so an
    # explicit None never overrides the resolver's own default.
    if _brand_jwks_client_factory is not None:
        brand_kwargs["_client_factory"] = _brand_jwks_client_factory
    resolver = BrandJsonJwksResolver(brand_json_url, **brand_kwargs)
    try:
        await resolver.force_refresh()
    except BrandJsonResolverError as exc:
        trace.append(_trace_entry("brand_json", brand_json_url, bj_start, exc.code, str(exc)))
        raise traced(
            "brand_json_resolution_failed",
            f"brand.json resolution failed: {exc.code}: {exc}",
        ) from exc

    jwks_uri = resolver.jwks_uri
    resolved_agent_url = resolver.agent_url
    if jwks_uri is None or resolved_agent_url is None:
        # Defensive — force_refresh must populate the snapshot or raise.
        message = "brand.json refresh completed without populating jwks_uri / agent_url"
        trace.append(
            _trace_entry(
                "brand_json", brand_json_url, bj_start, "brand_json_resolution_failed", message
            )
        )
        raise traced("brand_json_resolution_failed", message)
    trace.append(_trace_entry("brand_json", brand_json_url, bj_start))

    # --- Hop 3: JWKS ---
    jwks_start = time.monotonic()
    try:
        jwks = await async_default_jwks_fetcher(jwks_uri, allow_private=allow_private_destinations)
    except SSRFValidationError as exc:
        trace.append(_trace_entry("jwks", jwks_uri, jwks_start, "ssrf", str(exc)))
        raise traced("jwks_fetch_failed", f"JWKS URL failed SSRF check: {exc}") from exc
    except (httpx.HTTPError, ValueError, OSError) as exc:
        trace.append(_trace_entry("jwks", jwks_uri, jwks_start, "fetch_failed", str(exc)))
        raise traced("jwks_fetch_failed", f"JWKS fetch failed: {exc}") from exc
    trace.append(_trace_entry("jwks", jwks_uri, jwks_start))

    return AgentResolution(
        agent_url=agent_url,
        brand_json_url=brand_json_url,
        agent_entry=_make_agent_entry(resolved_agent_url, jwks_uri, agent_type, agent_id),
        jwks_uri=jwks_uri,
        jwks=jwks,
        fetched_at=fetched_at,
        trace=trace,
    )
@property
def jwks_uri(self) -> str | None:
    """JWKS URI held by the current snapshot, or ``None`` when no
    snapshot has been stored yet (i.e. before a successful refresh)."""
    snapshot = self._snapshot
    if snapshot is None:
        return None
    return snapshot.jwks_uri
class _MockTransport(httpx.AsyncBaseTransport):
    """In-memory transport serving canned responses keyed by full URL.

    Each value in ``responses`` may carry ``status`` (default 200),
    ``body`` (default empty) and ``headers`` (default none). URLs without
    an entry answer 404. Every request received is appended to ``calls``.
    """

    def __init__(self, responses: dict[str, dict[str, Any]]) -> None:
        self.calls: list[httpx.Request] = []
        self.responses = responses

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        self.calls.append(request)
        try:
            canned = self.responses[str(request.url)]
        except KeyError:
            return httpx.Response(404, content=b"")
        status = canned.get("status", 200)
        payload = canned.get("body", b"")
        header_map = canned.get("headers", {})
        return httpx.Response(status, content=payload, headers=header_map)
+ """ + + def _patch( + responses: dict[str, dict[str, Any]], + ) -> tuple[_MockTransport, Any]: + transport = _MockTransport(responses) + + def factory(_url: str) -> httpx.AsyncClient: + return httpx.AsyncClient( + transport=transport, + timeout=5.0, + follow_redirects=False, + trust_env=False, + ) + + # Hop 2: brand.json — patch BrandJsonJwksResolver.__init__ to + # inject the factory under _client_factory. + original_brand_init = BrandJsonJwksResolver.__init__ + + def patched_brand_init(self, *args, **kwargs): # type: ignore[no-untyped-def] + kwargs.setdefault("_client_factory", factory) + return original_brand_init(self, *args, **kwargs) + + monkeypatch.setattr(BrandJsonJwksResolver, "__init__", patched_brand_init) + + # Hop 3: JWKS — replace the fetcher at the resolver's import site + # (the resolver imports it by name from adcp.signing.jwks). + async def mock_jwks_fetcher(uri: str, *, allow_private: bool = False) -> dict[str, Any]: + async with httpx.AsyncClient( + transport=transport, + timeout=5.0, + follow_redirects=False, + trust_env=False, + ) as client: + response = await client.get(uri, headers={"Accept": "application/json"}) + response.raise_for_status() + body = response.json() + if not isinstance(body, dict) or "keys" not in body: + raise ValueError(f"JWKS document at {uri!r} has no 'keys' array") + return body + + monkeypatch.setattr( + "adcp.signing.agent_resolver.async_default_jwks_fetcher", mock_jwks_fetcher + ) + return transport, factory + + return _patch + + +# ---- Helpers ---- + + +def _capabilities_body(brand_json_url: str | None) -> bytes: + identity: dict[str, Any] = {} + if brand_json_url is not None: + identity["brand_json_url"] = brand_json_url + return json.dumps({"adcp_version": "3.0.5", "identity": identity}).encode() + + +def _brand_json_body(jwks_uri: str) -> bytes: + return json.dumps( + { + "agents": [ + { + "type": "sales", + "url": "https://buyer.example.com/mcp", + "jwks_uri": jwks_uri, + } + ] + } + ).encode() + + +def 
@pytest.mark.asyncio
async def test_resolve_returns_agent_resolution_with_full_trace(patch_resolver) -> None:
    """Happy path: capabilities, brand.json and JWKS all answer, so the
    result carries the three URLs, the JWK set and three ``ok`` hops."""
    agent_endpoint = "https://buyer.example.com/mcp"
    brand_url = "https://example.com/.well-known/brand.json"
    keys_url = "https://example.com/.well-known/jwks.json"
    json_headers = {"content-type": "application/json"}

    transport, factory = patch_resolver(
        {
            agent_endpoint: {
                "body": _capabilities_body(brand_url),
                "headers": json_headers,
            },
            brand_url: {
                "body": _brand_json_body(keys_url),
                "headers": json_headers,
            },
            keys_url: {
                "body": _jwks_body(),
                "headers": json_headers,
            },
        }
    )

    result = await async_resolve_agent(
        agent_endpoint,
        agent_type="sales",
        _capabilities_client_factory=factory,
    )

    assert isinstance(result, AgentResolution)
    assert result.agent_url == agent_endpoint
    assert result.brand_json_url == brand_url
    assert result.jwks_uri == keys_url
    assert result.jwks["keys"][0]["kid"] == "test-key-1"
    assert result.agent_entry["type"] == "sales"

    hops = [entry.hop for entry in result.trace]
    assert len(result.trace) == 3
    assert hops == ["capabilities", "brand_json", "jwks"]
    for entry in result.trace:
        assert entry.status == "ok"
    for entry in result.trace:
        assert entry.latency_ms >= 0
    # One request per hop reached the shared transport.
    assert len(transport.calls) == 3
AgentResolverError(capabilities_unreachable), + trace records the error hop with code + message.""" + _, factory = patch_resolver( + { + "https://buyer.example.com/mcp": { + "status": 503, + "body": b"", + }, + } + ) + + with pytest.raises(AgentResolverError) as exc: + await async_resolve_agent( + "https://buyer.example.com/mcp", + agent_type="sales", + _capabilities_client_factory=factory, + ) + assert exc.value.code == "capabilities_unreachable" + assert "503" in exc.value.message + + +@pytest.mark.asyncio +async def test_resolve_raises_when_capabilities_body_invalid_json(patch_resolver) -> None: + """Capabilities returns 200 with non-JSON body → capabilities_invalid.""" + _, factory = patch_resolver( + { + "https://buyer.example.com/mcp": { + "body": b"not json", + "headers": {"content-type": "text/html"}, + }, + } + ) + + with pytest.raises(AgentResolverError) as exc: + await async_resolve_agent( + "https://buyer.example.com/mcp", + agent_type="sales", + _capabilities_client_factory=factory, + ) + assert exc.value.code == "capabilities_invalid" + + +@pytest.mark.asyncio +async def test_resolve_raises_when_brand_json_url_missing(patch_resolver) -> None: + """Capabilities response has no ``identity.brand_json_url`` → + brand_json_url_missing. This is the gate on operators publishing + adcp#3690. 
+ """ + _, factory = patch_resolver( + { + "https://buyer.example.com/mcp": { + "body": _capabilities_body(brand_json_url=None), + "headers": {"content-type": "application/json"}, + }, + } + ) + + with pytest.raises(AgentResolverError) as exc: + await async_resolve_agent( + "https://buyer.example.com/mcp", + agent_type="sales", + _capabilities_client_factory=factory, + ) + assert exc.value.code == "brand_json_url_missing" + + +@pytest.mark.asyncio +async def test_resolve_raises_when_brand_json_unreachable(patch_resolver) -> None: + """Capabilities OK, brand.json returns 404 → + brand_json_resolution_failed (wraps the underlying + BrandJsonResolverError code in the message). + """ + _, factory = patch_resolver( + { + "https://buyer.example.com/mcp": { + "body": _capabilities_body("https://example.com/.well-known/brand.json"), + "headers": {"content-type": "application/json"}, + }, + # brand.json URL not in mock → 404 + } + ) + + with pytest.raises(AgentResolverError) as exc: + await async_resolve_agent( + "https://buyer.example.com/mcp", + agent_type="sales", + _capabilities_client_factory=factory, + ) + assert exc.value.code == "brand_json_resolution_failed" + + +@pytest.mark.asyncio +async def test_resolve_raises_when_jwks_fetch_fails(patch_resolver) -> None: + """Capabilities + brand.json succeed, JWKS endpoint returns 500 → + jwks_fetch_failed. 
+ """ + _, factory = patch_resolver( + { + "https://buyer.example.com/mcp": { + "body": _capabilities_body("https://example.com/.well-known/brand.json"), + "headers": {"content-type": "application/json"}, + }, + "https://example.com/.well-known/brand.json": { + "body": _brand_json_body("https://example.com/.well-known/jwks.json"), + "headers": {"content-type": "application/json"}, + }, + "https://example.com/.well-known/jwks.json": { + "status": 500, + "body": b"", + }, + } + ) + + with pytest.raises(AgentResolverError) as exc: + await async_resolve_agent( + "https://buyer.example.com/mcp", + agent_type="sales", + _capabilities_client_factory=factory, + ) + assert exc.value.code == "jwks_fetch_failed" + + +# ---- Body cap (DoS guard) ---- + + +@pytest.mark.asyncio +async def test_resolve_rejects_oversize_capabilities_body(patch_resolver) -> None: + """Capabilities body exceeding ``max_capabilities_bytes`` is + rejected before parse — DoS guard. Also confirms the cap actually + fires at the configured limit (not at some baked-in default). + """ + _, factory = patch_resolver( + { + "https://buyer.example.com/mcp": { + "body": b"x" * 1024, + "headers": {"content-type": "application/json"}, + }, + } + ) + + with pytest.raises(AgentResolverError) as exc: + await async_resolve_agent( + "https://buyer.example.com/mcp", + agent_type="sales", + max_capabilities_bytes=512, + _capabilities_client_factory=factory, + ) + assert exc.value.code == "capabilities_invalid" + assert "exceeds" in exc.value.message + + +# ---- Sync wrapper ---- + + +def test_sync_wrapper_dispatches_via_asyncio_run(monkeypatch: pytest.MonkeyPatch) -> None: + """``resolve_agent`` is the sync convenience wrapper for CLI / + scripts. Spot-check that it dispatches through ``asyncio.run`` and + returns the result of ``async_resolve_agent`` unchanged. + + Library code on an event loop should call ``async_resolve_agent`` + directly — wrapping in ``asyncio.run`` from inside a running loop + would deadlock. 
+ """ + sentinel = AgentResolution( + agent_url="https://buyer.example.com/mcp", + brand_json_url="https://example.com/.well-known/brand.json", + agent_entry={"type": "sales", "url": "https://x", "jwks_uri": "https://j"}, + jwks_uri="https://j", + jwks={"keys": []}, + fetched_at=0.0, + trace=[], + ) + + async def fake_async_resolve(*args, **kwargs): # type: ignore[no-untyped-def] + return sentinel + + monkeypatch.setattr(agent_resolver, "async_resolve_agent", fake_async_resolve) + + result = resolve_agent("https://buyer.example.com/mcp", agent_type="sales") + assert result is sentinel + + +# ---- Forward-compat read of identity.brand_json_url ---- + + +def test_extract_brand_json_url_reads_from_extra_fields() -> None: + """``identity.brand_json_url`` is forward-compat on 3.0.5 (typed in + 3.1). The extractor reads it as a raw dict key, so it works whether + the field is in the typed Pydantic surface or only in + ``model_extra`` — pinning the read path so a future schema bump + doesn't break this contract. + """ + raw = { + "adcp_version": "3.0.5", + "identity": { + "per_principal_key_isolation": True, + "brand_json_url": "https://example.com/.well-known/brand.json", + }, + } + assert ( + agent_resolver._extract_brand_json_url(raw) == "https://example.com/.well-known/brand.json" + ) + + +def test_extract_brand_json_url_raises_when_identity_missing() -> None: + with pytest.raises(AgentResolverError) as exc: + agent_resolver._extract_brand_json_url({"adcp_version": "3.0.5"}) + assert exc.value.code == "brand_json_url_missing" + + +def test_extract_brand_json_url_raises_when_field_empty_string() -> None: + with pytest.raises(AgentResolverError) as exc: + agent_resolver._extract_brand_json_url({"identity": {"brand_json_url": ""}}) + assert exc.value.code == "brand_json_url_missing"