diff --git a/pyproject.toml b/pyproject.toml index 488bc7ae..efd1df09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,15 @@ dependencies = [ # bundled AdCP schemas. Pin to the Draft 7 generation the schemas # declare via ``$schema``. "jsonschema>=4.0.0", + # Public Suffix List lookups — used by ``adcp.signing.etld`` to compute + # the eTLD+1 (registrable domain) for the brand-authorization binding + # mandated by ADCP request-signing spec (#3690). We pin a recent floor + # so the bundled PSL snapshot is fresh; the helper constructs the + # extractor with ``suffix_list_urls=()`` to disable network refresh — + # production verifiers must not silently re-fetch the PSL at request + # time, and pinned snapshot semantics are what the spec requires for + # cross-implementation conformance. + "tldextract>=5.1.0", ] [project.scripts] diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index f81bd5e7..ab21891e 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -101,6 +101,13 @@ SigningDecision, operation_needs_signing, ) +from adcp.signing.brand_authz import ( + BrandAuthorizationReason, + BrandAuthorizationResolver, + BrandAuthorizationResult, + BrandJsonAuthorizationResolver, + build_brand_json_resolvers, +) from adcp.signing.brand_jwks import ( BrandAgentType, BrandJsonJwksResolver, @@ -175,6 +182,11 @@ REQUEST_SIGNATURE_WINDOW_INVALID, SignatureVerificationError, ) +from adcp.signing.etld import ( + host_from, + registrable_domain, + same_registrable_domain, +) from adcp.signing.ip_pinned_transport import ( AsyncIpPinnedTransport, IpPinnedTransport, @@ -293,6 +305,10 @@ def __init__(self, *args: object, **kwargs: object) -> None: "AsyncJwksResolver", "AsyncRevocationListFetcher", "BrandAgentType", + "BrandAuthorizationReason", + "BrandAuthorizationResolver", + "BrandAuthorizationResult", + "BrandJsonAuthorizationResolver", "BrandJsonJwksResolver", "BrandJsonResolverError", "BrandJsonResolverErrorCode", @@ 
-373,6 +389,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "b64url_decode", "b64url_encode", "build_async_ip_pinned_transport", + "build_brand_json_resolvers", "build_capability_cache_key", "build_ip_pinned_transport", "build_signature_base", @@ -388,6 +405,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "extract_signature_bytes", "format_signature_header", "generate_signing_keypair", + "host_from", "install_signing_event_hook", "load_private_key_pem", "operation_needs_signing", @@ -395,8 +413,10 @@ def __init__(self, *args: object, **kwargs: object) -> None: "pem_to_adcp_jwk", "private_key_from_jwk", "public_key_from_jwk", + "registrable_domain", "resolve_agent", "resolve_and_validate_host", + "same_registrable_domain", "sign_request", "sign_signature_base", "sign_standard_webhook", diff --git a/src/adcp/signing/brand_authz.py b/src/adcp/signing/brand_authz.py new file mode 100644 index 00000000..04ec96a9 --- /dev/null +++ b/src/adcp/signing/brand_authz.py @@ -0,0 +1,561 @@ +"""Per-brand authorization check for buyer agents. + +Tier 3 of the v3 identity stack. Composes with — but is separate from — +the JWKS resolution surface (:mod:`adcp.signing.brand_jwks`). Where the +JWKS resolver answers "what public key signs for this counterparty?", +this resolver answers "is this agent authorized to act *for this brand*?" + +Per ADCP request-signing spec (#3690), the binding is: + +1. **Listed in ``agents[]``.** The brand's ``/.well-known/brand.json`` + ``agents[]`` array enumerates the agents the brand has authorized. + The verified ``agent_url`` MUST appear as ``url`` on one of these + entries (byte-equal match, no canonicalization). When the caller supplies an + ``agent_type`` filter, the entry's ``type`` MUST match too. + +2. 
**Host-bound.** EITHER the agent host shares an eTLD+1 with the + brand domain (the agent lives under the brand's own registrable + domain — the common case for first-party operations), OR the agent + host is listed in the top-level ``authorized_operators[]`` (multi-tenant + SaaS operators — WPP / GroupM / etc. acting on behalf of multiple + brand clients). + +When the caller passes ``brand_id``, the operator-delegation check is +scoped: the operator's ``brands[]`` must contain ``brand_id`` (or +``"*"``). Without ``brand_id``, only operators authorized for ``"*"`` +satisfy the delegation check — the resolver fails closed on ambiguous +scope. + +**Shared fetcher.** Both this resolver and +:class:`BrandJsonJwksResolver` walk brand.json. Construct them with a +shared ``_BrandJsonFetcher`` (one fetch, two consumers) via +:func:`build_brand_json_resolvers` to avoid double-fetching. +""" + +from __future__ import annotations + +import time +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any, Literal, Protocol, runtime_checkable + +from adcp.signing.brand_jwks import ( + DEFAULT_BRAND_JSON_TIMEOUT_SECONDS, + DEFAULT_MAX_AGE_SECONDS, + DEFAULT_MAX_BRAND_JSON_BYTES, + DEFAULT_MAX_REDIRECTS, + DEFAULT_MIN_COOLDOWN_SECONDS, + BrandAgentType, + BrandJsonJwksResolver, + BrandJsonResolverError, + _BrandJsonFetcher, + _BrandJsonSnapshot, + _ClientFactory, +) +from adcp.signing.etld import host_from, registrable_domain, same_registrable_domain + +#: Reason a brand-authorization check resolved the way it did. Used +#: for verifier error attribution and adopter logging. The framework +#: maps these to ``request_signature_*`` error codes when refusing a +#: request (stage 4 wires that up). 
+BrandAuthorizationReason = Literal[ + "etld1_match", + "operator_delegation", + "agent_not_listed", + "agent_ambiguous", + "agent_type_mismatch", + "binding_failed", + "brand_json_unavailable", + "brand_domain_invalid", +] + + +@dataclass(frozen=True) +class BrandAuthorizationResult: + """Structured outcome of a brand-authorization check. + + ``authorized`` is the bottom-line gate; ``reason`` carries the why + so the verifier can emit a precise error code and adopters can log + the decision. ``matched_*`` fields are populated on success for + audit attribution and on certain failure paths (e.g. agent matched + but binding failed → ``matched_agent_url`` is set). + """ + + authorized: bool + reason: BrandAuthorizationReason + matched_agent_url: str | None = None + matched_agent_type: BrandAgentType | None = None + matched_operator_domain: str | None = None + #: When ``reason == "brand_json_unavailable"``, this carries the + #: underlying fetch error so the framework can decide between + #: 5xx-retryable and 4xx-misconfiguration. + fetch_error: BrandJsonResolverError | None = None + + +@runtime_checkable +class BrandAuthorizationResolver(Protocol): + """Verify a buyer agent is authorized to act for a brand. + + The framework calls :meth:`is_authorized` per request after + cryptographic identity verification (Tier 1) and registry + resolution (Tier 2). Adopters implement this Protocol to control + the authorization decision; the default + :class:`BrandJsonAuthorizationResolver` reads ``brand.json``. + + Implementations MUST be safe under concurrent calls. 
+ """ + + async def is_authorized( + self, + *, + agent_url: str, + brand_domain: str, + agent_type: BrandAgentType | None = None, + brand_id: str | None = None, + ) -> bool: + """Return True iff ``agent_url`` is authorized to act for + ``brand_domain`` (optionally narrowed to ``brand_id`` and/or + ``agent_type``).""" + + +class BrandJsonAuthorizationResolver: + """Reference :class:`BrandAuthorizationResolver` reading brand.json. + + Walks the brand's ``/.well-known/brand.json`` (with redirect + following, body cap, and SSRF-pinned transport handled by the + shared :class:`_BrandJsonFetcher`), then applies the two-step + binding check: + + 1. The verified ``agent_url`` must be listed in ``agents[]`` — + optionally narrowed by ``agent_type`` and/or ``brand_id``. + 2. The agent host must be eTLD+1-bound to the brand domain, OR + listed in the top-level ``authorized_operators[]`` with a ``brands[]`` + scope that covers the request. + + On a cold cache, fetches synchronously inside the first + :meth:`is_authorized` / :meth:`check` call. The fetcher caches the + parsed body and honors ``Cache-Control`` / ``ETag`` exactly as the + JWKS resolver does. 
+ """ + + def __init__( + self, + brand_json_url: str, + *, + min_cooldown_seconds: float = DEFAULT_MIN_COOLDOWN_SECONDS, + max_age_seconds: float = DEFAULT_MAX_AGE_SECONDS, + max_redirects: int = DEFAULT_MAX_REDIRECTS, + max_body_bytes: int = DEFAULT_MAX_BRAND_JSON_BYTES, + allow_private_destinations: bool = False, + timeout_seconds: float = DEFAULT_BRAND_JSON_TIMEOUT_SECONDS, + clock: Callable[[], float] | None = None, + _client_factory: _ClientFactory | None = None, + _fetcher: _BrandJsonFetcher | None = None, + ) -> None: + self._clock = clock or time.time + self._allow_private = allow_private_destinations + self._fetcher = _fetcher or _BrandJsonFetcher( + brand_json_url, + min_cooldown_seconds=min_cooldown_seconds, + max_age_seconds=max_age_seconds, + max_redirects=max_redirects, + max_body_bytes=max_body_bytes, + allow_private_destinations=allow_private_destinations, + timeout_seconds=timeout_seconds, + clock=self._clock, + _client_factory=_client_factory, + ) + + @property + def brand_json_url(self) -> str: + return self._fetcher.brand_json_url + + async def is_authorized( + self, + *, + agent_url: str, + brand_domain: str, + agent_type: BrandAgentType | None = None, + brand_id: str | None = None, + ) -> bool: + result = await self.check( + agent_url=agent_url, + brand_domain=brand_domain, + agent_type=agent_type, + brand_id=brand_id, + ) + return result.authorized + + async def check( + self, + *, + agent_url: str, + brand_domain: str, + agent_type: BrandAgentType | None = None, + brand_id: str | None = None, + ) -> BrandAuthorizationResult: + """Run the full binding check and return a structured result. + + Use :meth:`is_authorized` for the boolean gate; use this + method when you need the reason for logging or to emit a + precise error code. + """ + # Validate brand_domain up front so we never silently let a + # blank / IP-literal brand domain match anything via shared + # binding semantics downstream. 
+ try: + brand_host = host_from(brand_domain) + except ValueError: + return BrandAuthorizationResult(False, reason="brand_domain_invalid") + if registrable_domain(brand_host) is None: + return BrandAuthorizationResult(False, reason="brand_domain_invalid") + + snap = await self._snapshot() + if isinstance(snap, BrandJsonResolverError): + return BrandAuthorizationResult( + False, + reason="brand_json_unavailable", + fetch_error=snap, + ) + + listing = _find_listed_agents( + snap.data, + agent_url=agent_url, + agent_type=agent_type, + brand_id=brand_id, + ) + + if len(listing) == 0: + # Distinguish "not present at all" from "present but wrong type": + # the latter is a stronger signal of misconfiguration. Spec + # folds both into ``request_signature_agent_not_in_brand_json``; + # we keep the finer reason so the framework can choose to + # surface it in diagnostics. + if agent_type is not None and _has_listed_agent_at( + snap.data, agent_url=agent_url, brand_id=brand_id + ): + return BrandAuthorizationResult(False, reason="agent_type_mismatch") + return BrandAuthorizationResult(False, reason="agent_not_listed") + + if len(listing) > 1: + # Multiple agents[] entries byte-equal the agent URL. Per + # ADCP #3690 this maps to ``request_signature_brand_json_ambiguous``: + # the brand.json schema does not constrain agents[] to be + # unique-by-URL, so an operator misconfig can produce duplicates + # — fail closed rather than silently picking one. + return BrandAuthorizationResult( + False, + reason="agent_ambiguous", + matched_agent_url=listing[0].url, + ) + + matched = listing[0] + + # Step 2a: eTLD+1 binding. + if same_registrable_domain(agent_url, brand_host): + return BrandAuthorizationResult( + True, + reason="etld1_match", + matched_agent_url=matched.url, + matched_agent_type=matched.type, + ) + + # Step 2b: authorized_operators[] delegation. 
+ operator_domain = _find_authorized_operator( + snap.data, + agent_url=agent_url, + brand_id=brand_id, + ) + if operator_domain is not None: + return BrandAuthorizationResult( + True, + reason="operator_delegation", + matched_agent_url=matched.url, + matched_agent_type=matched.type, + matched_operator_domain=operator_domain, + ) + + return BrandAuthorizationResult( + False, + reason="binding_failed", + matched_agent_url=matched.url, + matched_agent_type=matched.type, + ) + + async def _snapshot(self) -> _BrandJsonSnapshot | BrandJsonResolverError: + """Return a fresh-enough snapshot or the fetch error.""" + snap = self._fetcher.snapshot + if snap is None: + try: + return await self._fetcher.refresh() + except BrandJsonResolverError as exc: + return exc + + if self._fetcher.is_stale(snap) and self._fetcher.can_refresh(snap): + try: + return await self._fetcher.refresh() + except BrandJsonResolverError: + # Stale-on-error: serve the prior snapshot. Matches + # the JWKS resolver's posture exactly. + return snap + return snap + + +# --- builder for sharing the fetcher with the JWKS resolver --- + + +def build_brand_json_resolvers( + brand_json_url: str, + *, + agent_type: BrandAgentType, + agent_id: str | None = None, + brand_id: str | None = None, + min_cooldown_seconds: float = DEFAULT_MIN_COOLDOWN_SECONDS, + max_age_seconds: float = DEFAULT_MAX_AGE_SECONDS, + max_redirects: int = DEFAULT_MAX_REDIRECTS, + max_body_bytes: int = DEFAULT_MAX_BRAND_JSON_BYTES, + allow_private_destinations: bool = False, + timeout_seconds: float = DEFAULT_BRAND_JSON_TIMEOUT_SECONDS, + clock: Callable[[], float] | None = None, +) -> tuple[BrandJsonJwksResolver, BrandJsonAuthorizationResolver]: + """Construct a JWKS resolver and an authorization resolver that + share one underlying brand.json snapshot. + + Both resolvers walk brand.json; constructing them via this builder + avoids paying two fetches per request and keeps cache-control / + ETag state single-source. 
+ + Returns ``(jwks_resolver, authz_resolver)``. Hand the JWKS resolver + to the request-signature verifier; hand the authz resolver to the + framework's ``serve(brand_authz_resolver=...)``. + """ + fetcher = _BrandJsonFetcher( + brand_json_url, + min_cooldown_seconds=min_cooldown_seconds, + max_age_seconds=max_age_seconds, + max_redirects=max_redirects, + max_body_bytes=max_body_bytes, + allow_private_destinations=allow_private_destinations, + timeout_seconds=timeout_seconds, + clock=clock, + ) + jwks = BrandJsonJwksResolver( + brand_json_url, + agent_type=agent_type, + agent_id=agent_id, + brand_id=brand_id, + min_cooldown_seconds=min_cooldown_seconds, + max_age_seconds=max_age_seconds, + max_redirects=max_redirects, + max_body_bytes=max_body_bytes, + allow_private_destinations=allow_private_destinations, + timeout_seconds=timeout_seconds, + clock=clock, + _fetcher=fetcher, + ) + authz = BrandJsonAuthorizationResolver( + brand_json_url, + min_cooldown_seconds=min_cooldown_seconds, + max_age_seconds=max_age_seconds, + max_redirects=max_redirects, + max_body_bytes=max_body_bytes, + allow_private_destinations=allow_private_destinations, + timeout_seconds=timeout_seconds, + clock=clock, + _fetcher=fetcher, + ) + return jwks, authz + + +# --- internal helpers --- + + +@dataclass(frozen=True) +class _ListedAgent: + """The brand-listed agent we matched against the request.""" + + url: str + type: BrandAgentType | None + + +def _find_listed_agents( + data: dict[str, Any], + *, + agent_url: str, + agent_type: BrandAgentType | None, + brand_id: str | None, +) -> list[_ListedAgent]: + """Search ``agents[]`` arrays for entries matching ``agent_url``. + + Walks top-level ``agents``, ``house.agents``, and per-brand + ``brands[].agents`` (bounded by ``brand_id`` when provided), + returning every entry whose ``url`` **byte-equals** ``agent_url``. 
+ + **Byte-equal match by spec mandate.** Per ADCP #3690 security + profile: "Find the entry in ``agents[]`` whose ``url`` byte-equals + A (no canonicalization at this step). The most common failure + mode is a trailing-slash or scheme mismatch (e.g., + ``https://x.com/mcp`` ≠ ``https://x.com/mcp/``)." Canonicalizing + would silently authorize agents whose URL is "close enough" to + what the brand declared — operators must be deliberate about what + they list. + + Returning the full match list (rather than the first match) lets + the caller distinguish ``agent_not_listed`` (0 matches), + ``agent_type_mismatch`` (0 type-filtered matches but the URL is + listed), and ``agent_ambiguous`` (>1 matches — operator misconfig, + spec maps to ``request_signature_brand_json_ambiguous``). + """ + matches: list[_ListedAgent] = [] + for entry in _walk_agents(data, brand_id=brand_id): + if not isinstance(entry, dict): + continue + url = entry.get("url") + if not isinstance(url, str) or url != agent_url: + continue + if agent_type is not None and entry.get("type") != agent_type: + continue + listed_type = entry.get("type") + matches.append( + _ListedAgent( + url=url, + type=listed_type if isinstance(listed_type, str) else None, # type: ignore[arg-type] + ) + ) + return matches + + +def _has_listed_agent_at( + data: dict[str, Any], + *, + agent_url: str, + brand_id: str | None, +) -> bool: + """Return True if ``agent_url`` byte-equals any listed ``agents[].url`` + regardless of ``type`` — used to distinguish ``agent_type_mismatch`` + from ``agent_not_listed`` for caller diagnostics.""" + for entry in _walk_agents(data, brand_id=brand_id): + if not isinstance(entry, dict): + continue + url = entry.get("url") + if isinstance(url, str) and url == agent_url: + return True + return False + + +def _walk_agents(data: dict[str, Any], *, brand_id: str | None) -> list[Any]: + """Collect all ``agents[]`` entries we should consult. 
+ + Layers consulted: + + * Top-level ``agents`` (Brand Agent doc shape). + * ``house.agents`` (House Portfolio doc shape — agents serving the + whole portfolio). + * ``brands[].agents`` — when ``brand_id`` is set, only that one + brand's entry; otherwise every brand's entries (broadest match, + since the caller didn't narrow). + """ + out: list[Any] = [] + + top = data.get("agents") + if isinstance(top, list): + out.extend(top) + + house = data.get("house") + if isinstance(house, dict): + h_agents = house.get("agents") + if isinstance(h_agents, list): + out.extend(h_agents) + + brands = data.get("brands") + if isinstance(brands, list): + for brand in brands: + if not isinstance(brand, dict): + continue + if brand_id is not None and brand.get("id") != brand_id: + continue + b_agents = brand.get("agents") + if isinstance(b_agents, list): + out.extend(b_agents) + + return out + + +def _find_authorized_operator( + data: dict[str, Any], + *, + agent_url: str, + brand_id: str | None, +) -> str | None: + """Find an ``authorized_operators[]`` entry whose ``domain`` matches + the agent host's eTLD+1 AND whose ``brands[]`` scope covers + ``brand_id`` (or ``"*"`` when ``brand_id`` is None — fail-closed + on unscoped requests). + + Returns the operator's ``domain`` string on match; ``None`` on no + match. The match is on eTLD+1 equality (agent host registrable + domain == declared operator domain registrable domain) so an + operator declared as ``wpp.com`` covers ``api.wpp.com``, + ``us-east.wpp.com``, etc. — same posture as eTLD+1 step 2a. + + **Location: top-level on the brand.json document**, not nested + under ``house``. Per the canonical brand.json schema (House + Portfolio variant), ``authorized_operators`` is a sibling of + ``house`` / ``brands`` / ``contact`` / ``trademarks``, and ADCP + #3690 ``security.mdx`` step 3 reads it unqualified at the document + root. 
Reading it from ``data["house"]["authorized_operators"]`` + would silently fail closed against every conforming brand.json + (binding_failed everywhere) and would create cross-verifier + disagreement with the TS reference impl. + """ + operators = data.get("authorized_operators") + if not isinstance(operators, list): + return None + + agent_etld1 = registrable_domain(agent_url) + if agent_etld1 is None: + # Agent host is not eTLD+1-bindable — IP literal, single label, + # etc. Fail closed (no operator delegation can rescue it). + return None + + for op in operators: + if not isinstance(op, dict): + continue + domain = op.get("domain") + if not isinstance(domain, str): + continue + op_etld1 = registrable_domain(domain) + if op_etld1 is None or op_etld1 != agent_etld1: + continue + + brands_scope = op.get("brands") + if not isinstance(brands_scope, list): + # Schema requires brands[] minItems=1; absence is a + # malformed doc, treat as unscoped → fail closed. + continue + + if brand_id is None: + # No brand context from the caller → only "*" wildcard + # operators satisfy the check. Operators scoped to specific + # brands cannot be honored without knowing which one we're + # acting for. 
+ if any(b == "*" for b in brands_scope if isinstance(b, str)): + return domain + continue + + for b in brands_scope: + if not isinstance(b, str): + continue + if b == "*" or b == brand_id: + return domain + + return None + + +__all__ = [ + "BrandAuthorizationReason", + "BrandAuthorizationResolver", + "BrandAuthorizationResult", + "BrandJsonAuthorizationResolver", + "build_brand_json_resolvers", +] diff --git a/src/adcp/signing/brand_jwks.py b/src/adcp/signing/brand_jwks.py index 30baf75e..4af8bb0e 100644 --- a/src/adcp/signing/brand_jwks.py +++ b/src/adcp/signing/brand_jwks.py @@ -123,13 +123,18 @@ def __init__(self, code: BrandJsonResolverErrorCode, message: str) -> None: self.code: BrandJsonResolverErrorCode = code -@dataclass -class _BrandSnapshot: - """One cached brand.json snapshot — the agent we picked + the - ``jwks_uri`` we resolved + cache metadata.""" +@dataclass(frozen=True) +class _BrandJsonSnapshot: + """One cached brand.json document — full parsed body + final URL + (after redirects) + cache metadata. - jwks_uri: str - agent_url: str + Frozen so consumers can hold a reference without worrying about + mid-flight mutation by a concurrent refresh; refresh swaps in a new + snapshot atomically. + """ + + data: dict[str, Any] + final_url: str fetched_at: float expires_at: float etag: str | None = None @@ -154,6 +159,164 @@ class _FetchedBrandJson: cache_control: str | None = None +class _BrandJsonFetcher: + """Shared brand.json fetcher with TTL cache + single-flight refresh. + + Composed by :class:`BrandJsonJwksResolver` and (forthcoming) + ``BrandAuthorizationResolver`` so both surfaces share one snapshot + per brand.json URL instead of double-fetching. The fetcher owns: + + * the raw brand.json body (parsed dict) and final URL after redirects + * cache metadata (ETag, fetched_at, expires_at) + * single-flight refresh dedup across concurrent callers + + Consumers layer their own selector-output caches on top. 
+ + Cooldown semantics live in the *consumer*, not here. The fetcher's + :meth:`refresh` always issues a fetch (subject to in-flight dedup). + Callers that want "only refresh if stale and past cooldown" should + inspect :attr:`snapshot` and gate the call themselves — same shape + as the existing JWKS resolver, just moved up one layer. + """ + + def __init__( + self, + brand_json_url: str, + *, + min_cooldown_seconds: float = DEFAULT_MIN_COOLDOWN_SECONDS, + max_age_seconds: float = DEFAULT_MAX_AGE_SECONDS, + max_redirects: int = DEFAULT_MAX_REDIRECTS, + max_body_bytes: int = DEFAULT_MAX_BRAND_JSON_BYTES, + allow_private_destinations: bool = False, + timeout_seconds: float = DEFAULT_BRAND_JSON_TIMEOUT_SECONDS, + clock: Callable[[], float] | None = None, + _client_factory: _ClientFactory | None = None, + ) -> None: + self._url = brand_json_url + self._min_cooldown = min_cooldown_seconds + self._max_age = max_age_seconds + self._max_redirects = max_redirects + self._max_body_bytes = max_body_bytes + self._allow_private = allow_private_destinations + self._timeout = timeout_seconds + self._clock = clock or time.time + self._client_factory = _client_factory + + self._snapshot: _BrandJsonSnapshot | None = None + # In-flight refresh future for single-flighting concurrent + # callers — N tasks hitting a cold cache do ONE fetch, not N. + # ``asyncio.Lock`` would also work but SERIALIZES (waiter N+1 + # fetches AFTER waiter N's fetch returns), which is what we + # want to avoid. + self._refresh_in_flight: asyncio.Future[None] | None = None + + @property + def brand_json_url(self) -> str: + """The configured entry URL (pre-redirect).""" + return self._url + + @property + def min_cooldown_seconds(self) -> float: + return self._min_cooldown + + @property + def snapshot(self) -> _BrandJsonSnapshot | None: + """Current cached snapshot, or None on cold cache. 
No IO.""" + return self._snapshot + + def is_stale(self, snapshot: _BrandJsonSnapshot | None = None) -> bool: + """Return True if ``snapshot`` (or the current one) has expired.""" + snap = snapshot if snapshot is not None else self._snapshot + if snap is None: + return True + return self._clock() > snap.expires_at + + def can_refresh(self, snapshot: _BrandJsonSnapshot | None = None) -> bool: + """Return True if a refresh is allowed by the cooldown gate. + + Cold cache always allows. Otherwise the snapshot must be past + ``min_cooldown_seconds`` since its ``fetched_at``. + """ + snap = snapshot if snapshot is not None else self._snapshot + if snap is None: + return True + return self._clock() - snap.fetched_at >= self._min_cooldown + + def clear(self) -> None: + """Drop the cached snapshot. Next refresh will be unconditional.""" + self._snapshot = None + + async def refresh(self) -> _BrandJsonSnapshot: + """Single-flighted brand.json refresh. + + Concurrent callers share one in-flight fetch via + ``_refresh_in_flight``. ``asyncio.shield`` protects the in-flight + task from a waiter's cancellation propagating into the shared + fetch. + + On 304 (Not Modified) the snapshot's lifetime is extended in + place; on 2xx the snapshot is replaced. Raises + :class:`BrandJsonResolverError` on fetch/parse failure WITHOUT + clearing the prior snapshot — callers that want stale-on-error + get it for free. 
+ """ + if self._refresh_in_flight is not None: + await asyncio.shield(self._refresh_in_flight) + assert self._snapshot is not None # noqa: S101 - invariant after shared refresh + return self._snapshot + + loop = asyncio.get_running_loop() + self._refresh_in_flight = loop.create_future() + try: + try: + snap = await self._do_refresh() + except BaseException as exc: + if not self._refresh_in_flight.done(): + self._refresh_in_flight.set_exception(exc) + raise + else: + if not self._refresh_in_flight.done(): + self._refresh_in_flight.set_result(None) + return snap + finally: + self._refresh_in_flight = None + + async def _do_refresh(self) -> _BrandJsonSnapshot: + fetched = await _fetch_brand_json( + start_url=self._url, + current_etag=self._snapshot.etag if self._snapshot is not None else None, + max_redirects=self._max_redirects, + allow_private=self._allow_private, + timeout_seconds=self._timeout, + max_body_bytes=self._max_body_bytes, + client_factory=self._client_factory, + ) + + now = self._clock() + if fetched.status == "not_modified" and self._snapshot is not None: + self._snapshot = _BrandJsonSnapshot( + data=self._snapshot.data, + final_url=self._snapshot.final_url, + fetched_at=now, + expires_at=now + _compute_lifetime(fetched.cache_control, self._max_age), + etag=fetched.etag or self._snapshot.etag, + ) + return self._snapshot + + if fetched.data is None: + # Defensive: status == "ok" should always carry a body. + raise BrandJsonResolverError("invalid_body", "brand.json response missing body") + + self._snapshot = _BrandJsonSnapshot( + data=fetched.data, + final_url=fetched.final_url, + fetched_at=now, + expires_at=now + _compute_lifetime(fetched.cache_control, self._max_age), + etag=fetched.etag, + ) + return self._snapshot + + class BrandJsonJwksResolver: """JWKS resolver backed by a sender's ``brand.json``. 
@@ -185,31 +348,38 @@ def __init__( clock: Callable[[], float] | None = None, timeout_seconds: float = DEFAULT_BRAND_JSON_TIMEOUT_SECONDS, _client_factory: _ClientFactory | None = None, + _fetcher: _BrandJsonFetcher | None = None, ) -> None: - self._url = brand_json_url self._agent_type = agent_type self._agent_id = agent_id self._brand_id = brand_id - self._min_cooldown = min_cooldown_seconds - self._max_age = max_age_seconds - self._max_redirects = max_redirects - self._max_body_bytes = max_body_bytes self._allow_private = allow_private_destinations self._jwks_fetcher = jwks_fetcher or async_default_jwks_fetcher - self._client_factory = _client_factory self._clock = clock or time.time - self._timeout = timeout_seconds - self._snapshot: _BrandSnapshot | None = None + # The brand.json fetcher is the shared transport+cache layer. + # Constructing one here means single-tenant resolvers get the + # same behavior as before; passing ``_fetcher=`` lets the + # forthcoming BrandAuthorizationResolver share a snapshot to + # avoid double-fetching brand.json. + self._fetcher = _fetcher or _BrandJsonFetcher( + brand_json_url, + min_cooldown_seconds=min_cooldown_seconds, + max_age_seconds=max_age_seconds, + max_redirects=max_redirects, + max_body_bytes=max_body_bytes, + allow_private_destinations=allow_private_destinations, + timeout_seconds=timeout_seconds, + clock=self._clock, + _client_factory=_client_factory, + ) + + # Derived selector state. Recomputed whenever the fetcher's + # snapshot identity changes (final_url or etag); cheap to redo, + # so we don't bother diffing the body itself. + self._selected: _SelectedAgent | None = None + self._selected_for: tuple[str, str | None] | None = None self._inner: AsyncCachingJwksResolver | None = None - # In-flight refresh future for single-flighting concurrent - # callers. None when no refresh is running. 
N concurrent - # ``resolve()`` calls on a cold cache share one fetch via this - # future — JS uses Promise sharing natively, Python needs the - # explicit future. ``asyncio.Lock`` would also work but - # SERIALIZES (waiter N+1 fetches AFTER waiter N's fetch - # returns), which is what we want to avoid. - self._refresh_in_flight: asyncio.Future[None] | None = None # AsyncJwksResolver Protocol — callable as ``await resolver(kid)``. async def __call__(self, kid: str) -> dict[str, Any] | None: @@ -223,12 +393,10 @@ async def resolve(self, kid: str) -> dict[str, Any] | None: Unknown kid → cascade: inner resolver refresh first, then brand.json refresh. """ - if self._snapshot is None or self._inner is None: + snap = self._fetcher.snapshot + if snap is None or self._inner is None: await self._refresh() - elif ( - self._clock() > self._snapshot.expires_at - and self._clock() - self._snapshot.fetched_at >= self._min_cooldown - ): + elif self._fetcher.is_stale(snap) and self._fetcher.can_refresh(snap): try: await self._refresh() except BrandJsonResolverError: @@ -243,10 +411,7 @@ async def resolve(self, kid: str) -> dict[str, Any] | None: return hit # Cascade: refresh brand.json in case jwks_uri rotated. - if ( - self._snapshot is not None - and self._clock() - self._snapshot.fetched_at >= self._min_cooldown - ): + if self._fetcher.snapshot is not None and self._fetcher.can_refresh(): try: await self._refresh() except BrandJsonResolverError: @@ -259,7 +424,7 @@ def agent_url(self) -> str | None: """The agent URL we resolved ``jwks_uri`` from. Populated after the first successful refresh; useful for verifier result attribution.""" - return self._snapshot.agent_url if self._snapshot is not None else None + return self._selected.url if self._selected is not None else None @property def jwks_uri(self) -> str | None: @@ -267,7 +432,7 @@ def jwks_uri(self) -> str | None: this resolver's ``(agent_type, agent_id, brand_id)`` tuple. 
Populated after the first successful refresh; ``None`` on cold cache.""" - return self._snapshot.jwks_uri if self._snapshot is not None else None + return self._selected.jwks_uri if self._selected is not None else None async def force_refresh(self) -> None: """Force refetch of both brand.json and inner JWKS, bypassing @@ -281,78 +446,33 @@ async def force_refresh(self) -> None: either in progress or just completed", not "always issue a new fetch even when one is pending." """ - self._snapshot = None + self._fetcher.clear() + self._selected = None + self._selected_for = None self._inner = None await self._refresh() async def _refresh(self) -> None: - """Single-flighted brand.json refresh. - - Concurrent callers share one in-flight fetch via - ``_refresh_in_flight`` — N verifiers all hitting a cold cache - do ONE brand.json fetch, not N. ``asyncio.shield`` protects - the in-flight task from a waiter's own cancellation - propagating into the shared fetch. - """ - if self._refresh_in_flight is not None: - # Another task is fetching; await its result. - await asyncio.shield(self._refresh_in_flight) + """Refresh brand.json + recompute selector + (re)build inner JWKS.""" + snap = await self._fetcher.refresh() + self._sync_selector(snap) + + def _sync_selector(self, snap: _BrandJsonSnapshot) -> None: + """Reselect the agent if the brand.json snapshot identity changed.""" + identity = (snap.final_url, snap.etag) + if self._selected is not None and self._selected_for == identity: return - loop = asyncio.get_running_loop() - self._refresh_in_flight = loop.create_future() - try: - try: - await self._do_refresh() - except BaseException as exc: - # Surface the failure to any awaiters before re-raising - # so they don't await a never-resolved future. 
- if not self._refresh_in_flight.done(): - self._refresh_in_flight.set_exception(exc) - raise - else: - if not self._refresh_in_flight.done(): - self._refresh_in_flight.set_result(None) - finally: - self._refresh_in_flight = None - - async def _do_refresh(self) -> None: - fetched = await _fetch_brand_json( - start_url=self._url, - current_etag=self._snapshot.etag if self._snapshot is not None else None, - max_redirects=self._max_redirects, - allow_private=self._allow_private, - timeout_seconds=self._timeout, - max_body_bytes=self._max_body_bytes, - client_factory=self._client_factory, - ) - - # 304 on the entry URL: extend the lifetime, keep the inner resolver. - if fetched.status == "not_modified" and self._snapshot is not None: - now = self._clock() - self._snapshot = _BrandSnapshot( - jwks_uri=self._snapshot.jwks_uri, - agent_url=self._snapshot.agent_url, - fetched_at=now, - expires_at=now + _compute_lifetime(fetched.cache_control, self._max_age), - etag=fetched.etag or self._snapshot.etag, - ) - return - - if fetched.data is None: - # Defensive: status == "ok" should always carry a body. 
- raise BrandJsonResolverError("invalid_body", "brand.json response missing body") - agent = _select_agent( - fetched.data, - fetched.final_url, + snap.data, + snap.final_url, agent_type=self._agent_type, agent_id=self._agent_id, brand_id=self._brand_id, ) if self._inner is None or ( - self._snapshot is not None and self._snapshot.jwks_uri != agent.jwks_uri + self._selected is not None and self._selected.jwks_uri != agent.jwks_uri ): self._inner = AsyncCachingJwksResolver( agent.jwks_uri, @@ -361,14 +481,8 @@ async def _do_refresh(self) -> None: clock=self._clock, ) - now = self._clock() - self._snapshot = _BrandSnapshot( - jwks_uri=agent.jwks_uri, - agent_url=agent.url, - fetched_at=now, - expires_at=now + _compute_lifetime(fetched.cache_control, self._max_age), - etag=fetched.etag, - ) + self._selected = agent + self._selected_for = identity # --- brand.json fetching --- diff --git a/src/adcp/signing/etld.py b/src/adcp/signing/etld.py new file mode 100644 index 00000000..18228edc --- /dev/null +++ b/src/adcp/signing/etld.py @@ -0,0 +1,131 @@ +"""eTLD+1 (registrable domain) helpers for the brand-authorization binding. + +Per ADCP request-signing semantics (spec #3690), a buyer agent is bound to +the brand whose ``brand.json`` lists it ONLY when the agent's host and the +brand's host share an eTLD+1 — for example, ``ads.brand.example`` agent +serving ``brand.example``. Cross-eTLD+1 agents are only honored when the +brand explicitly delegates via ``brand.authorized_operators[]`` (SaaS-as- +operator multi-tenancy). + +We use the Public Suffix List (via ``tldextract``) rather than naive label +counting because public suffixes are not regular — ``co.uk`` and +``s3.amazonaws.com`` are both single suffixes despite different label +counts, and only a maintained PSL gets this right. 
+ +**Network posture: the bundled PSL snapshot is authoritative.** The +extractor is constructed with ``suffix_list_urls=()`` so a verifier never +silently re-fetches the PSL during request processing — both for latency +determinism and because cross-implementation conformance demands a pinned +snapshot. Bumping the floor on ``tldextract`` is how we refresh. + +**Failure-closed convention.** Inputs whose eTLD+1 cannot be derived (raw +IP addresses, single-label hosts like ``localhost``, hosts that are +themselves public suffixes) yield ``None`` from :func:`registrable_domain` +and ``False`` from :func:`same_registrable_domain`. Callers must treat +None / False as a binding failure, not a soft skip. +""" + +from __future__ import annotations + +from functools import lru_cache +from urllib.parse import urlsplit + +import tldextract + + +@lru_cache(maxsize=1) +def _extractor() -> tldextract.TLDExtract: + """Process-singleton extractor with network refresh disabled. + + First-call PSL parsing is non-trivial (~hundreds of ms on cold disk + cache); subsequent calls are cheap. The singleton keeps that cost + paid-once-per-process. + + **Both ICANN and PRIVATE PSL sections are in scope.** Per ADCP + spec #3690, the eTLD+1 binding must treat platform-shared suffixes + like ``vercel.app``, ``pages.dev``, and ``github.io`` (in the PSL + PRIVATE section) as suffixes — otherwise ``attacker.vercel.app`` + and ``victim.vercel.app`` would share an eTLD+1 of ``vercel.app`` + and an attacker's vercel deployment would falsely satisfy the + binding against a vercel-hosted brand. ``include_psl_private_domains=True`` + closes that vector. + """ + return tldextract.TLDExtract( + suffix_list_urls=(), + fallback_to_snapshot=True, + include_psl_private_domains=True, + ) + + +def host_from(value: str) -> str: + """Return the hostname portion of a URL, or pass a bare host through. 
+ + Normalizes case and trims a single trailing dot (the FQDN root + separator) so ``Example.COM.`` and ``example.com`` compare equal. + + Raises :class:`ValueError` on input that is a URL with no parseable + host (``"http://"``) or empty after normalization. URL inputs MUST + use a scheme — a bare ``//example.com`` is treated as a bare host, + which is by design: bare-host inputs to this helper come from + ``brand_url`` fields whose schema already constrains them, so a + bare-host input is never an attacker-controlled URL. + """ + if "://" in value: + parts = urlsplit(value) + host = parts.hostname + if not host: + raise ValueError(f"URL has no host: {value!r}") + return host.lower() + stripped = value.strip().rstrip(".").lower() + if not stripped: + raise ValueError("host is empty") + return stripped + + +def registrable_domain(host_or_url: str) -> str | None: + """Return the eTLD+1 (registrable domain) for ``host_or_url``. + + Accepts a full URL (``https://ads.brand.example/...``) or a bare + host (``ads.brand.example``). Returns ``None`` when the input has + no eTLD+1: + + * IP literals (v4 and v6) — IP addresses are not eTLD+1-bindable. + * Single-label hosts (``localhost``, ``intranet``). + * Hosts that are themselves a public suffix (``co.uk``). + + The returned domain is lowercased. + + Callers performing a binding check should treat ``None`` as a + failure (the agent's host has no registrable domain to bind + against), NOT as "no opinion". + """ + host = host_from(host_or_url) + result = _extractor()(host) + if not result.domain or not result.suffix: + return None + # Compose explicitly rather than using ``ExtractResult.registered_domain`` + # (or ``top_domain_under_public_suffix`` in 5.3+). Either accessor would + # work, but composing keeps the helper insensitive to the property + # rename tldextract 5.3 announced. 
+ return f"{result.domain}.{result.suffix}".lower() + + +def same_registrable_domain(a: str, b: str) -> bool: + """Return True iff ``a`` and ``b`` share an eTLD+1. + + Both arguments may be URLs or bare hosts (mixed forms are fine). + Returns ``False`` when either side has no derivable eTLD+1 — see + :func:`registrable_domain` for the failure-closed convention. + """ + da = registrable_domain(a) + db = registrable_domain(b) + if da is None or db is None: + return False + return da == db + + +__all__ = [ + "host_from", + "registrable_domain", + "same_registrable_domain", +] diff --git a/tests/test_brand_authz.py b/tests/test_brand_authz.py new file mode 100644 index 00000000..530f8ab8 --- /dev/null +++ b/tests/test_brand_authz.py @@ -0,0 +1,647 @@ +"""Tests for :mod:`adcp.signing.brand_authz`. + +Behavior under test (matches ADCP request-signing spec #3690): + +* eTLD+1 binding: same-eTLD+1 agent ↔ brand pair authorizes. +* ``house.authorized_operators[]`` delegation: cross-eTLD+1 agent + authorized when its host is a listed operator and the operator's + ``brands[]`` scope covers the request. +* Listing requirement: agent_url MUST appear in some ``agents[]`` + array (top-level / house / brand-scoped). +* ``agent_type`` filter narrows the listing match without affecting + the binding check. +* ``brand_id`` filter narrows the agents[] walk to that brand's + array (plus house) and scopes the operator delegation check. +* Failure modes: agent not listed; agent listed under wrong type; + binding failed (cross-eTLD+1 with no operator delegation); + brand_domain invalid (IP literal / no public suffix); brand.json + fetch error. +* Shared fetcher: :func:`build_brand_json_resolvers` returns a JWKS + resolver and an authz resolver that share one fetch. 
+""" + +from __future__ import annotations + +import json + +import httpx +import pytest + +from adcp.signing.brand_authz import ( + BrandAuthorizationResolver, + BrandJsonAuthorizationResolver, + build_brand_json_resolvers, +) + + +class _MockTransport(httpx.AsyncBaseTransport): + """Minimal async transport that returns canned responses keyed on URL. + + Records each call so tests can assert "only one fetch happened" + when sharing the fetcher across two resolvers. + """ + + def __init__(self, responses: dict[str, dict]): + self.responses = responses + self.calls: list[httpx.Request] = [] + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + self.calls.append(request) + url = str(request.url) + if url not in self.responses: + return httpx.Response(404, content=b"") + spec = self.responses[url] + return httpx.Response( + spec.get("status", 200), + content=spec.get("body", b""), + headers=spec.get("headers", {}), + ) + + +def _factory(transport: _MockTransport): + def f(_url: str) -> httpx.AsyncClient: + return httpx.AsyncClient( + transport=transport, + timeout=5.0, + follow_redirects=False, + trust_env=False, + ) + + return f + + +def _brand_json(body: dict) -> bytes: + return json.dumps(body).encode("utf-8") + + +# ----- protocol conformance ----- + + +def test_brand_json_resolver_satisfies_protocol() -> None: + resolver = BrandJsonAuthorizationResolver("https://brand.example/.well-known/brand.json") + assert isinstance(resolver, BrandAuthorizationResolver) + + +# ----- eTLD+1 binding (happy path) ----- + + +@pytest.mark.asyncio +async def test_authz_etld1_match_authorizes_same_origin_agent() -> None: + body = _brand_json( + { + "agents": [ + { + "type": "signals", + "id": "signals_main", + "url": "https://ads.brand.com/signals", + } + ] + } + ) + transport = _MockTransport( + {"https://brand.com/.well-known/brand.json": {"body": body}}, + ) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", 
+ _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/signals", + brand_domain="brand.com", + ) + + assert result.authorized is True + assert result.reason == "etld1_match" + assert result.matched_agent_url == "https://ads.brand.com/signals" + assert result.matched_agent_type == "signals" + + +@pytest.mark.asyncio +async def test_authz_etld1_match_with_subdomain_brand_url() -> None: + body = _brand_json( + {"agents": [{"type": "signals", "id": "x", "url": "https://api.brand.com/x"}]} + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + assert await resolver.is_authorized( + agent_url="https://api.brand.com/x", + brand_domain="https://www.brand.com/", + ) + + +# ----- agent_type filter ----- + + +@pytest.mark.asyncio +async def test_authz_agent_type_filter_matches() -> None: + body = _brand_json( + { + "agents": [ + {"type": "signals", "id": "s", "url": "https://ads.brand.com/agent"}, + {"type": "creative", "id": "c", "url": "https://ads.brand.com/agent"}, + ] + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/agent", + brand_domain="brand.com", + agent_type="creative", + ) + assert result.authorized + assert result.matched_agent_type == "creative" + + +@pytest.mark.asyncio +async def test_authz_agent_type_mismatch_distinguished_from_not_listed() -> None: + # URL is listed under "signals" but caller is asking for "creative". + # We must distinguish the "wrong type" case from "not present at all" + # so the verifier can emit a precise error. 
+ body = _brand_json( + {"agents": [{"type": "signals", "id": "s", "url": "https://ads.brand.com/agent"}]} + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/agent", + brand_domain="brand.com", + agent_type="creative", + ) + assert result.authorized is False + assert result.reason == "agent_type_mismatch" + + +@pytest.mark.asyncio +async def test_authz_unlisted_agent_returns_agent_not_listed() -> None: + body = _brand_json( + {"agents": [{"type": "signals", "id": "s", "url": "https://ads.brand.com/agent"}]} + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://other.brand.com/agent", + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "agent_not_listed" + + +# ----- cross-eTLD+1 with no delegation: fails ----- + + +@pytest.mark.asyncio +async def test_authz_cross_etld1_without_operator_fails_binding() -> None: + # Agent host is wpp.com but brand is brand.com and there's no + # authorized_operators[] entry. The agent IS listed in agents[] + # (the brand acknowledges it exists) but cannot bind to the brand. 
+ body = _brand_json( + {"agents": [{"type": "signals", "id": "s", "url": "https://wpp.com/brand/agent"}]} + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://wpp.com/brand/agent", + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "binding_failed" + # matched_agent_url IS set: the agent matched the listing, just + # not the binding. Verifier callers use this for log attribution. + assert result.matched_agent_url == "https://wpp.com/brand/agent" + + +# ----- authorized_operators[] delegation ----- + + +@pytest.mark.asyncio +async def test_authz_operator_delegation_with_wildcard_brands() -> None: + body = _brand_json( + { + "house": { + "agents": [ + {"type": "signals", "id": "s", "url": "https://wpp.com/brand/agent"}, + ], + }, + "brands": [{"id": "brand_one"}], + "authorized_operators": [ + {"domain": "wpp.com", "brands": ["*"]}, + ], + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://wpp.com/brand/agent", + brand_domain="brand.com", + ) + assert result.authorized is True + assert result.reason == "operator_delegation" + assert result.matched_operator_domain == "wpp.com" + + +@pytest.mark.asyncio +async def test_authz_operator_delegation_scoped_brand_id_matches() -> None: + body = _brand_json( + { + "house": { + "agents": [ + {"type": "signals", "id": "s", "url": "https://wpp.com/agent"}, + ], + }, + "brands": [{"id": "nike"}, {"id": "adidas"}], + "authorized_operators": [ + {"domain": "wpp.com", "brands": ["nike"]}, + ], + } + ) + transport = 
_MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://wpp.com/agent", + brand_domain="brand.com", + brand_id="nike", + ) + assert result.authorized is True + assert result.reason == "operator_delegation" + + +@pytest.mark.asyncio +async def test_authz_operator_delegation_scoped_brand_id_misses() -> None: + # Operator is authorized for nike but caller passed brand_id=adidas. + body = _brand_json( + { + "house": { + "agents": [ + {"type": "signals", "id": "s", "url": "https://wpp.com/agent"}, + ], + }, + "brands": [{"id": "nike"}, {"id": "adidas"}], + "authorized_operators": [ + {"domain": "wpp.com", "brands": ["nike"]}, + ], + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://wpp.com/agent", + brand_domain="brand.com", + brand_id="adidas", + ) + assert result.authorized is False + assert result.reason == "binding_failed" + + +@pytest.mark.asyncio +async def test_authz_operator_without_wildcard_fails_unscoped_request() -> None: + # No brand_id from the caller AND operator is scoped to a specific + # brand → fail closed. Without the brand context we cannot verify + # which brand the operator is acting for. 
+ body = _brand_json( + { + "house": { + "agents": [ + {"type": "signals", "id": "s", "url": "https://wpp.com/agent"}, + ], + }, + "authorized_operators": [ + {"domain": "wpp.com", "brands": ["nike"]}, + ], + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://wpp.com/agent", + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "binding_failed" + + +@pytest.mark.asyncio +async def test_authz_operator_etld1_compared_not_byte_equal() -> None: + # Operator declared as ``wpp.com`` covers ``api.wpp.com`` — eTLD+1 + # equivalence, not byte-equal hostname. Matches the same posture + # as the agent-to-brand binding. + body = _brand_json( + { + "house": { + "agents": [ + {"type": "signals", "id": "s", "url": "https://api.wpp.com/agent"}, + ], + }, + "authorized_operators": [ + {"domain": "wpp.com", "brands": ["*"]}, + ], + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://api.wpp.com/agent", + brand_domain="brand.com", + ) + assert result.authorized is True + assert result.reason == "operator_delegation" + + +@pytest.mark.asyncio +async def test_authz_operator_declared_under_house_is_ignored() -> None: + # Per the canonical brand.json schema, ``authorized_operators`` is + # top-level on the House Portfolio variant — sibling of ``house`` / + # ``brands`` / ``contact``. 
A document that misplaces the array + # inside ``house`` MUST fail closed (the spec's reference verifier + # reads top-level; honoring the nested location would create cross- + # verifier disagreement and an operator-delegation bypass via the + # wrong-location reading). + body = _brand_json( + { + "house": { + "agents": [ + {"type": "signals", "id": "s", "url": "https://wpp.com/agent"}, + ], + # Wrong location — should be ignored. + "authorized_operators": [ + {"domain": "wpp.com", "brands": ["*"]}, + ], + }, + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://wpp.com/agent", + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "binding_failed" + + +# ----- brand_id scopes the agents[] walk ----- + + +@pytest.mark.asyncio +async def test_authz_brand_id_narrows_agents_walk() -> None: + # Agent listed under brands[id=adidas] but caller passed + # brand_id=nike. House agents and the requested brand's agents + # are the only valid sources — adidas's agents are out of scope. + body = _brand_json( + { + "house": {"agents": []}, + "brands": [ + { + "id": "nike", + "agents": [ + {"type": "signals", "id": "n", "url": "https://nike.brand.com/agent"}, + ], + }, + { + "id": "adidas", + "agents": [ + {"type": "signals", "id": "a", "url": "https://adidas.brand.com/agent"}, + ], + }, + ], + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + # Right brand_id → match. 
+ assert await resolver.is_authorized( + agent_url="https://nike.brand.com/agent", + brand_domain="brand.com", + brand_id="nike", + ) + + # Wrong brand_id → agent_not_listed (adidas's agent is invisible + # when brand_id=nike). + result = await resolver.check( + agent_url="https://adidas.brand.com/agent", + brand_domain="brand.com", + brand_id="nike", + ) + assert result.authorized is False + assert result.reason == "agent_not_listed" + + +# ----- brand_domain validation ----- + + +@pytest.mark.asyncio +async def test_authz_rejects_ip_literal_brand_domain() -> None: + # An IP literal has no eTLD+1; binding cannot succeed. + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/x", + brand_domain="192.0.2.1", + ) + assert result.authorized is False + assert result.reason == "brand_domain_invalid" + + +@pytest.mark.asyncio +async def test_authz_rejects_localhost_brand_domain() -> None: + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/x", + brand_domain="localhost", + ) + assert result.authorized is False + assert result.reason == "brand_domain_invalid" + + +# ----- fetch errors ----- + + +@pytest.mark.asyncio +async def test_authz_brand_json_404_returns_brand_json_unavailable() -> None: + transport = _MockTransport({}) # 404 for everything + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/x", + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "brand_json_unavailable" + assert result.fetch_error is not None + + +# ----- byte-equal agents[] matching (spec mandate) ----- + + +@pytest.mark.asyncio +async def 
test_authz_trailing_slash_mismatch_fails_byte_equal() -> None: + # Per ADCP #3690: agents[].url match MUST be byte-equal. A trailing + # slash on the request side vs no trailing slash on the brand.json + # side is a mismatch — operators must list the exact URL. + body = _brand_json( + {"agents": [{"type": "signals", "id": "s", "url": "https://ads.brand.com/agent"}]} + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/agent/", # extra trailing slash + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "agent_not_listed" + + +@pytest.mark.asyncio +async def test_authz_case_mismatch_fails_byte_equal() -> None: + # Scheme/host case differences are NOT canonicalized at this step. + # The spec's rationale: operators must be deliberate about what + # they list; a canonicalization-permissive match silently authorizes + # URLs that drift from what the brand declared. + body = _brand_json( + {"agents": [{"type": "signals", "id": "s", "url": "https://ads.brand.com/agent"}]} + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://ADS.brand.com/agent", # uppercase host + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "agent_not_listed" + + +@pytest.mark.asyncio +async def test_authz_duplicate_agents_entry_returns_ambiguous() -> None: + # brand.json schema does NOT constrain agents[] to be unique-by-URL. 
+ # If an operator misconfigures with duplicate entries for the same + # URL, fail closed rather than silently picking one — maps to + # ``request_signature_brand_json_ambiguous`` at the framework boundary. + body = _brand_json( + { + "agents": [ + {"type": "signals", "id": "a", "url": "https://ads.brand.com/agent"}, + {"type": "signals", "id": "b", "url": "https://ads.brand.com/agent"}, + ] + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + resolver = BrandJsonAuthorizationResolver( + "https://brand.com/.well-known/brand.json", + _client_factory=_factory(transport), + ) + + result = await resolver.check( + agent_url="https://ads.brand.com/agent", + brand_domain="brand.com", + ) + assert result.authorized is False + assert result.reason == "agent_ambiguous" + + +# ----- shared-fetcher builder ----- + + +@pytest.mark.asyncio +async def test_build_brand_json_resolvers_shares_one_fetch() -> None: + # Both resolvers share one fetcher → ONE brand.json HTTP call + # even when both consumers do work. + body = _brand_json( + { + "agents": [ + { + "type": "signals", + "id": "s", + "url": "https://ads.brand.com/agent", + "jwks_uri": "https://ads.brand.com/.well-known/jwks.json", + } + ] + } + ) + transport = _MockTransport({"https://brand.com/.well-known/brand.json": {"body": body}}) + + jwks, authz = build_brand_json_resolvers( + "https://brand.com/.well-known/brand.json", + agent_type="signals", + ) + # Inject the test transport into both via the shared fetcher. + # The builder doesn't expose that seam publicly (by design), so + # we monkey-patch the private fetcher's factory. + jwks._fetcher._client_factory = _factory(transport) # type: ignore[attr-defined] + + # First call: cold cache, one fetch. 
+ assert await authz.is_authorized( + agent_url="https://ads.brand.com/agent", + brand_domain="brand.com", + ) + + # Cooldown blocks a second fetch even if the JWKS resolver also + # walks brand.json — but the relevant assertion is that the authz + # call alone did not trigger a duplicate brand.json fetch from the + # JWKS-side construction. ONE fetch total at this point. + brand_json_calls = [c for c in transport.calls if "brand.json" in str(c.url)] + assert len(brand_json_calls) == 1 diff --git a/tests/test_etld.py b/tests/test_etld.py new file mode 100644 index 00000000..53321853 --- /dev/null +++ b/tests/test_etld.py @@ -0,0 +1,167 @@ +"""Tests for :mod:`adcp.signing.etld`. + +Behavior under test: + +* eTLD+1 derivation across single-label public suffixes (``.com``) and + multi-label public suffixes (``.co.uk``, ``.s3.amazonaws.com``). +* URL inputs and bare-host inputs accepted symmetrically. +* Failure-closed convention: IP literals, single-label hosts, and + hosts that ARE public suffixes return ``None`` / ``False``. +* Case-insensitive comparison. 
+""" + +from __future__ import annotations + +import pytest + +from adcp.signing.etld import host_from, registrable_domain, same_registrable_domain + +# ----- host_from ----- + + +def test_host_from_url() -> None: + assert host_from("https://ads.brand.com/path") == "ads.brand.com" + + +def test_host_from_url_lowercases() -> None: + assert host_from("https://ADS.Brand.Com/") == "ads.brand.com" + + +def test_host_from_bare_host() -> None: + assert host_from("Brand.Com") == "brand.com" + + +def test_host_from_strips_trailing_root_dot() -> None: + assert host_from("brand.com.") == "brand.com" + + +def test_host_from_url_with_no_host_raises() -> None: + with pytest.raises(ValueError): + host_from("http://") + + +def test_host_from_empty_raises() -> None: + with pytest.raises(ValueError): + host_from("") + + +def test_host_from_url_with_port() -> None: + assert host_from("https://ads.brand.com:8443/") == "ads.brand.com" + + +# ----- registrable_domain ----- + + +def test_registrable_domain_simple_com() -> None: + assert registrable_domain("brand.com") == "brand.com" + + +def test_registrable_domain_subdomain() -> None: + assert registrable_domain("ads.brand.com") == "brand.com" + + +def test_registrable_domain_deep_subdomain() -> None: + assert registrable_domain("a.b.c.brand.com") == "brand.com" + + +def test_registrable_domain_multi_label_suffix_co_uk() -> None: + # ``co.uk`` is a public suffix; eTLD+1 is ``example.co.uk``. + assert registrable_domain("ads.example.co.uk") == "example.co.uk" + + +def test_registrable_domain_url_input() -> None: + assert registrable_domain("https://ads.brand.com/path?q=1") == "brand.com" + + +def test_registrable_domain_case_normalized() -> None: + assert registrable_domain("ADS.Brand.Com") == "brand.com" + + +def test_registrable_domain_ipv4_returns_none() -> None: + # IP literals are not eTLD+1-bindable per the failure-closed convention. 
+ assert registrable_domain("192.0.2.1") is None + + +def test_registrable_domain_ipv6_returns_none() -> None: + assert registrable_domain("https://[2001:db8::1]/") is None + + +def test_registrable_domain_single_label_returns_none() -> None: + # ``localhost`` has no public suffix → fail closed. + assert registrable_domain("localhost") is None + + +def test_registrable_domain_bare_public_suffix_returns_none() -> None: + # ``co.uk`` is itself a public suffix with no domain label preceding it. + assert registrable_domain("co.uk") is None + + +# ----- same_registrable_domain ----- + + +def test_same_registrable_domain_subdomain_pair() -> None: + assert same_registrable_domain("ads.brand.com", "brand.com") is True + + +def test_same_registrable_domain_different_subdomains() -> None: + assert same_registrable_domain("ads.brand.com", "creative.brand.com") is True + + +def test_same_registrable_domain_different_etld1() -> None: + assert same_registrable_domain("brand.com", "rival.com") is False + + +def test_same_registrable_domain_mixed_url_and_host() -> None: + assert same_registrable_domain("https://ads.brand.com/", "brand.com") is True + + +def test_same_registrable_domain_case_insensitive() -> None: + assert same_registrable_domain("ADS.Brand.Com", "brand.COM") is True + + +def test_same_registrable_domain_ip_fails_closed() -> None: + # An IP literal must NOT bind to anything — even another IP literal. + assert same_registrable_domain("192.0.2.1", "192.0.2.1") is False + + +def test_same_registrable_domain_localhost_fails_closed() -> None: + assert same_registrable_domain("localhost", "localhost") is False + + +def test_same_registrable_domain_multi_label_suffix_pair() -> None: + # Same eTLD+1 under a multi-label suffix. + assert same_registrable_domain("ads.brand.co.uk", "creative.brand.co.uk") is True + + +def test_same_registrable_domain_multi_label_suffix_cross() -> None: + # Different eTLD+1 under same multi-label suffix. 
+ assert same_registrable_domain("brand.co.uk", "rival.co.uk") is False + + +def test_same_registrable_domain_cross_tld_with_shared_label() -> None: + # ``brand.com`` and ``brand.org`` share a label but not an eTLD+1. + assert same_registrable_domain("brand.com", "brand.org") is False + + +def test_registrable_domain_psl_private_section_in_scope() -> None: + # Per ADCP #3690, the PSL PRIVATE section must be in scope so + # platform-shared suffixes (``vercel.app``, ``pages.dev``, + # ``github.io``) are treated as suffixes. Without this, + # ``attacker.vercel.app`` and ``victim.vercel.app`` would share an + # eTLD+1 and the binding check would authorize an attacker's + # vercel deployment for a victim's vercel-hosted brand. + assert registrable_domain("attacker.vercel.app") == "attacker.vercel.app" + assert registrable_domain("victim.vercel.app") == "victim.vercel.app" + assert same_registrable_domain("attacker.vercel.app", "victim.vercel.app") is False + assert registrable_domain("brand.github.io") == "brand.github.io" + assert registrable_domain("brand.pages.dev") == "brand.pages.dev" + + +def test_registrable_domain_reserved_tld_returns_none() -> None: + # ``.example``, ``.test``, ``.invalid``, ``.localhost`` are RFC 2606 + # reserved names — NOT in the PSL — so they fail closed. The spec's + # documentation examples (``brand.example``) do not bind under our + # helper; that is correct, since reserved names are not delegated. + assert registrable_domain("brand.example") is None + assert registrable_domain("foo.test") is None + assert registrable_domain("svc.invalid") is None