From 5160ab0bc054dfb6b72b4f8076f12a25bbf42ad3 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Wed, 20 May 2026 07:41:14 -0700 Subject: [PATCH 1/2] feat(adagents): publisher_domains compact form, revoked_publisher_domains, streaming fetch caps (#729) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements full scope of issue #729 plus the scope-expansion comment: - publisher-property-selector schema gains optional publisher_domains[] compact form (XOR with publisher_domain; by_id selectors excluded from compact form). - adagents.json gains top-level revoked_publisher_domains[] with Reason enum. - Resolver: _fanout_publisher_properties expands compact entries one-per-domain; filter_revoked_selectors applies revocation precedence over both publisher_properties[] selectors and top-level properties[]. - Fetch path rewritten onto httpx.AsyncClient.stream with two-tier size caps (MAX_POINTER_BYTES=5MiB first hop, MAX_AUTHORITATIVE_BYTES=20MiB second hop) and Content-Length pre-check. New fetch_adagents_with_cache sends If-None-Match / If-Modified-Since; 304 treated as cache-lifetime refresh. - Runtime validation: two XORs enforced on publisher_properties[] entries; by_id rejects publisher_domains; validate_revoked_publisher_domain_entry plumbed into validate_adagents. - 24 new tests across four classes; 122 existing adagents tests ported onto the new streaming mock helper. Cached schemas under schemas/cache/3.0/ are hand-patched and will be overwritten by the next \`make regenerate-schemas\` once upstream 3.0.10+ ships and ADCP_VERSION is bumped — that's fine; the runtime/test changes are independent of the cached schema text. Per-authorized_agents[] last_updated (also discussed in the spec PR thread) is intentionally not included — the merged upstream spec PR does not carry that field, confirmed against the latest schema. Co-Authored-By: Claude Opus 4.7 (1M context) --- schemas/cache/3.0/adagents.json | 34 + .../3.0/core/publisher-property-selector.json | 90 +- src/adcp/__init__.py | 8 + src/adcp/adagents.py | 413 +++++++-- src/adcp/types/generated_poc/adagents.py | 40 +- .../core/publisher_property_selector.py | 42 +- src/adcp/validation/__init__.py | 2 + src/adcp/validation/legacy.py | 106 ++- tests/fixtures/public_api_snapshot.json | 4 + tests/test_adagents.py | 871 ++++++++++++++---- 10 files changed, 1331 insertions(+), 279 deletions(-) diff --git a/schemas/cache/3.0/adagents.json b/schemas/cache/3.0/adagents.json index f942c2a67..a5ad86910 100644 --- a/schemas/cache/3.0/adagents.json +++ b/schemas/cache/3.0/adagents.json @@ -89,6 +89,40 @@ }, "minItems": 1 }, + "revoked_publisher_domains": { + "type": "array", + "description": "Publisher domains explicitly removed from this managed network. Validators MUST treat any publisher domain listed here as no-longer-authorized, taking precedence over any appearance of the same domain in `authorized_agents[].publisher_properties[].publisher_domain` / `.publisher_domains[]` or in top-level `properties[].publisher_domain`. Lets a network propagate per-publisher revocations on the next refresh instead of waiting for the file-level 7-day cache cap. Validators MUST hold previously-observed `(publisher_domain, revoked_at)` tuples for 7 days from the validator's first observation, even if the entry vanishes from a subsequent fetch — this closes the rollback gap where an attacker re-serves a stale file with the revocation removed. Networks SHOULD retain entries for at least 7 days after `revoked_at` so validators that didn't observe the original entry still pick it up on refresh.", + "items": { + "type": "object", + "properties": { + "publisher_domain": { + "type": "string", + "description": "Publisher domain being revoked. Matches against the same canonicalized form used in `publisher_properties[].publisher_domain`.", + "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" + }, + "revoked_at": { + "type": "string", + "format": "date-time", + "description": "ISO 8601 timestamp when this publisher was revoked. Validators MAY use this to order revocations against their own cached state." + }, + "reason": { + "type": "string", + "enum": [ + "relationship_ended", + "compliance_violation", + "publisher_request", + "other" + ], + "description": "Reason for revocation. Operator-internal self-classification for review routing — not a public accusation. `relationship_ended` is the routine commercial case. `compliance_violation` SHOULD be used only when the network has itself determined the publisher is out of policy; for un-adjudicated third-party allegations (regulator inquiries, advertiser complaints, ongoing investigations), use `other` to avoid making a discoverable adverse statement. `publisher_request` is for publisher-initiated exits." + } + }, + "required": [ + "publisher_domain", + "revoked_at" + ], + "additionalProperties": true + } + }, "collections": { "type": "array", "description": "Collections produced or distributed by this publisher. Declares the content programs whose inventory is sold through authorized agents. Products in get_products responses reference these collections by collection_id.", diff --git a/schemas/cache/3.0/core/publisher-property-selector.json b/schemas/cache/3.0/core/publisher-property-selector.json index 67e7856c3..204a05964 100644 --- a/schemas/cache/3.0/core/publisher-property-selector.json +++ b/schemas/cache/3.0/core/publisher-property-selector.json @@ -1,39 +1,72 @@ { "$schema": "http://json-schema.org/draft-07/schema#", "title": "Publisher Property Selector", - "description": "Selects properties from a publisher's adagents.json. Used for both product definitions and agent authorization. Supports three selection patterns: all properties, specific IDs, or by tags.", + "description": "Selects properties from a publisher's adagents.json. Used for both product definitions and agent authorization. Supports three selection patterns: all properties, specific IDs, or by tags. Each selector targets one publisher via `publisher_domain` (string) or a fan-out across many publishers that share the same selector via `publisher_domains` (array). Exactly one of `publisher_domain` or `publisher_domains` MUST be present. When `publisher_domains` is used, the selector is logically equivalent to repeating the same entry once per listed domain.", "discriminator": { "propertyName": "selection_type" }, "oneOf": [ { "type": "object", - "description": "Select all properties from the publisher domain", + "description": "Select all properties from one publisher domain, or from each publisher domain when `publisher_domains` is used.", "properties": { "publisher_domain": { "type": "string", - "description": "Domain where publisher's adagents.json is hosted (e.g., 'cnn.com')", + "description": "Domain where publisher's adagents.json is hosted (e.g., 'cnn.com'). XOR with `publisher_domains` — exactly one MUST be present on each `publisher_properties[]` entry; both-present and neither-present both fail validation.", "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" }, + "publisher_domains": { + "type": "array", + "description": "Compact form for fanning the same selector across many publishers (e.g., a managed network listing every publisher it represents). Each entry is the domain where that publisher's adagents.json is hosted. Each listed domain MUST be canonicalized to lowercase (the `pattern` already rejects uppercase). Mutually exclusive with `publisher_domain`. Each listed domain counts as explicitly scoped for the `managerdomain` fallback safety rule.", + "items": { + "type": "string", + "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" + }, + "minItems": 1, + "uniqueItems": true + }, "selection_type": { "type": "string", "const": "all", - "description": "Discriminator indicating all properties from this publisher are included" + "description": "Discriminator indicating all properties from each addressed publisher are included" } }, "required": [ - "publisher_domain", "selection_type" ], + "allOf": [ + { + "not": { + "required": [ + "publisher_domain", + "publisher_domains" + ] + } + }, + { + "anyOf": [ + { + "required": [ + "publisher_domain" + ] + }, + { + "required": [ + "publisher_domains" + ] + } + ] + } + ], "additionalProperties": true }, { "type": "object", - "description": "Select specific properties by ID", + "description": "Select specific properties by ID. Single-publisher only — property IDs are publisher-scoped, so the compact `publisher_domains[]` form is intentionally NOT available for this selector. Use multiple `publisher_properties[]` entries (one per publisher) when each publisher's ID set differs.", "properties": { "publisher_domain": { "type": "string", - "description": "Domain where publisher's adagents.json is hosted (e.g., 'cnn.com')", + "description": "Domain where publisher's adagents.json is hosted (e.g., 'cnn.com').", "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" }, "selection_type": { @@ -59,13 +92,23 @@ }, { "type": "object", - "description": "Select properties by tag membership", + "description": "Select properties by tag membership. With `publisher_domains`, the same `property_tags` predicate is resolved against each listed publisher's adagents.json — the common managed-network case where every represented site tags inventory with a shared label.", "properties": { "publisher_domain": { "type": "string", - "description": "Domain where publisher's adagents.json is hosted (e.g., 'cnn.com')", + "description": "Domain where publisher's adagents.json is hosted (e.g., 'cnn.com'). XOR with `publisher_domains` — exactly one MUST be present on each `publisher_properties[]` entry; both-present and neither-present both fail validation.", "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" }, + "publisher_domains": { + "type": "array", + "description": "Compact form for fanning the same tag predicate across many publishers (canonical managed-network shape). Each entry is the domain where that publisher's adagents.json is hosted. Each listed domain MUST be canonicalized to lowercase (the `pattern` already rejects uppercase). Mutually exclusive with `publisher_domain`. Each listed domain counts as explicitly scoped for the `managerdomain` fallback safety rule.", + "items": { + "type": "string", + "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$" + }, + "minItems": 1, + "uniqueItems": true + }, "selection_type": { "type": "string", "const": "by_tag", @@ -73,7 +116,7 @@ }, "property_tags": { "type": "array", - "description": "Property tags from the publisher's adagents.json. Selector covers all properties with these tags", + "description": "Property tags resolved against each addressed publisher's adagents.json. Selector covers all properties carrying any of these tags.", "items": { "$ref": "property-tag.json" }, @@ -81,11 +124,34 @@ } }, "required": [ - "publisher_domain", "selection_type", "property_tags" ], + "allOf": [ + { + "not": { + "required": [ + "publisher_domain", + "publisher_domains" + ] + } + }, + { + "anyOf": [ + { + "required": [ + "publisher_domain" + ] + }, + { + "required": [ + "publisher_domains" + ] + } + ] + } + ], "additionalProperties": true } ] -} \ No newline at end of file +} diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index 0c8272ed1..cd6e38708 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -11,7 +11,9 @@ from importlib.metadata import version as _pkg_version from adcp.adagents import ( + AdagentsCacheEntry, AdagentsEntryError, + AdagentsFetchResult, AdagentsValidationReport, AdAgentsValidationResult, AuthorizationContext, @@ -19,7 +21,9 @@ EntryErrorKind, domain_matches, fetch_adagents, + fetch_adagents_with_cache, fetch_agent_authorizations, + filter_revoked_selectors, get_all_properties, get_all_tags, get_properties_by_agent, @@ -813,13 +817,17 @@ def get_adcp_version() -> str: "PushNotificationConfig", # Adagents validation "AdAgentsValidationResult", + "AdagentsCacheEntry", "AdagentsEntryError", + "AdagentsFetchResult", "AdagentsValidationReport", "AuthorizationContext", "DiscoveryMethod", "EntryErrorKind", "fetch_adagents", + "fetch_adagents_with_cache", "fetch_agent_authorizations", + "filter_revoked_selectors", "validate_adagents_domain", "validate_adagents_structure", "verify_agent_authorization", diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index f97ac5e0e..417495a40 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -9,6 +9,7 @@ """ import ipaddress +import json import re from dataclasses import dataclass, field from typing import Any, Literal @@ -445,19 +446,70 @@ def verify_agent_authorization( # arbitrarily large body during the MANAGERDOMAIN fallback. MAX_ADS_TXT_BYTES = 1_048_576 # 1 MiB +# Two-tier size caps for adagents.json fetches (adcp#4504). The pointer +# file served at /.well-known/adagents.json is small (URL reference or +# small inline file); the dereferenced authoritative file behind +# ``authoritative_location`` can be much larger for managed networks +# enumerating thousands of publishers, so a higher cap applies on the +# second hop. +MAX_POINTER_BYTES = 5 * 1024 * 1024 # 5 MiB — first hop SSRF cap +MAX_AUTHORITATIVE_BYTES = 20 * 1024 * 1024 # 20 MiB — second hop + + +@dataclass(frozen=True) +class AdagentsCacheEntry: + """Conditional-refresh cache state for an adagents.json URL. + + Pass an entry into :func:`fetch_adagents_with_cache` to send + ``If-None-Match`` (preferred) and ``If-Modified-Since`` validators + on the next fetch. A 304 from the publisher is treated as a + successful cache-lifetime refresh — the ``body`` is returned + unchanged with refreshed timing, per the adcp#4504 fetch contract. + """ + + body: dict[str, Any] + etag: str | None = None + last_modified: str | None = None + + +@dataclass(frozen=True) +class AdagentsFetchResult: + """Result of a fetch, including refreshed cache validators. + + ``not_modified`` is True when the server returned 304 and ``data`` + came from the supplied cache entry. ``etag`` / ``last_modified`` are + the validators to persist for the next fetch — on 304 they come + from the 304 response headers if present, falling back to the + supplied entry's values. + """ + + data: dict[str, Any] + discovery_method: DiscoveryMethod + etag: str | None = None + last_modified: str | None = None + not_modified: bool = False + async def _resolve_direct( publisher_domain: str, timeout: float, user_agent: str, client: httpx.AsyncClient | None, -) -> tuple[dict[str, Any], DiscoveryMethod]: + cache_entry: AdagentsCacheEntry | None = None, +) -> tuple[dict[str, Any], DiscoveryMethod, str | None, str | None, bool]: """Direct fetch with authoritative_location redirect following. - Returns ``(data, discovery_method)`` where ``discovery_method`` is - ``'direct'`` if no redirect was followed, ``'authoritative_location'`` - otherwise. Raises :class:`AdagentsNotFoundError` on 404 so callers - can attempt the ads.txt MANAGERDOMAIN fallback. + Returns ``(data, discovery_method, etag, last_modified, not_modified)``. + ``discovery_method`` is ``'direct'`` if no redirect was followed, + ``'authoritative_location'`` otherwise. The cache validators come + from the hop that produced ``data`` (the authoritative file when + redirected, the publisher otherwise). Raises + :class:`AdagentsNotFoundError` on 404 so callers can attempt the + ads.txt MANAGERDOMAIN fallback. + + The first hop uses :data:`MAX_POINTER_BYTES` (5 MiB) and any + dereferenced ``authoritative_location`` hop uses + :data:`MAX_AUTHORITATIVE_BYTES` (20 MiB) per adcp#4504. """ url = f"https://{publisher_domain}/.well-known/adagents.json" visited_urls: set[str] = set() @@ -473,9 +525,23 @@ async def _resolve_direct( # Caller's client is only used on the initial publisher fetch; redirect # targets are third-party origins, so use a fresh client per hop. fetch_client = None if is_redirect else client + max_bytes = MAX_AUTHORITATIVE_BYTES if is_redirect else MAX_POINTER_BYTES + # Conditional refresh only applies to the hop that actually produced + # the cached body. For an SDK-level cache, that's whichever hop the + # caller fetched last. The simplest correct behavior is to apply the + # validators on the first hop only — a 304 there short-circuits the + # redirect chain. Pointer files rarely change anyway. + hop_cache = cache_entry if not is_redirect else None try: - data = await _fetch_adagents_url(url, timeout, user_agent, fetch_client) + data, etag, last_modified, not_modified = await _fetch_adagents_url( + url, + timeout, + user_agent, + fetch_client, + max_bytes=max_bytes, + cache_entry=hop_cache, + ) except AdagentsNotFoundError: # A 404 on a followed authoritative_location target is a broken # redirect chain, not a missing publisher manifest. Surface it as @@ -484,10 +550,17 @@ async def _resolve_direct( # really an upstream pointer failure. if is_redirect: raise AdagentsValidationError( - f"authoritative_location target returned 404: {url}" + f"authoritative_location target returned 304/404: {url}" ) from None raise + if not_modified: + # 304 on the first hop: return the cached body and stop — + # the cached body is the previously-resolved authoritative + # data, so no second hop is needed. + discovery: DiscoveryMethod = "authoritative_location" if is_redirect else "direct" + return data, discovery, etag, last_modified, True + if "authoritative_location" in data and "authorized_agents" not in data: authoritative_url = data["authoritative_location"] @@ -509,7 +582,13 @@ async def _resolve_direct( is_redirect = True continue - return data, ("authoritative_location" if is_redirect else "direct") + return ( + data, + ("authoritative_location" if is_redirect else "direct"), + etag, + last_modified, + False, + ) raise AssertionError("Unreachable") # pragma: no cover @@ -625,7 +704,7 @@ async def fetch_adagents( publisher_domain = _validate_publisher_domain(publisher_domain) try: - data, _ = await _resolve_direct(publisher_domain, timeout, user_agent, client) + data, *_ = await _resolve_direct(publisher_domain, timeout, user_agent, client) return data except AdagentsNotFoundError: manager_data = await _try_managerdomain_fallback( @@ -636,6 +715,45 @@ async def fetch_adagents( raise +async def fetch_adagents_with_cache( + publisher_domain: str, + cache_entry: AdagentsCacheEntry | None = None, + timeout: float = 10.0, + user_agent: str = "AdCP-Client/1.0", + client: httpx.AsyncClient | None = None, +) -> AdagentsFetchResult: + """Fetch with conditional refresh — returns body plus refreshed validators. + + Pass the previous fetch's :class:`AdagentsCacheEntry` to send + ``If-None-Match`` / ``If-Modified-Since`` on the next fetch. A 304 + from the publisher is treated as a successful refresh: the cached + ``body`` is returned with ``not_modified=True``, satisfying the + 7-day cache window described in adcp#4504. + + The first hop (``/.well-known/adagents.json``) is capped at 5 MiB; + a dereferenced ``authoritative_location`` file is capped at 20 MiB. + Both caps fail closed — oversized responses raise + :class:`AdagentsValidationError` rather than truncate. + + Does NOT perform the ads.txt ``managerdomain`` fallback; the + fallback is best-effort discovery, not cache-aware refresh, and + bypassing it on 304 keeps the path simple. Callers that need both + behaviors should compose this helper with + :func:`validate_adagents_domain`. + """ + publisher_domain = _validate_publisher_domain(publisher_domain) + data, discovery, etag, last_modified, not_modified = await _resolve_direct( + publisher_domain, timeout, user_agent, client, cache_entry=cache_entry + ) + return AdagentsFetchResult( + data=data, + discovery_method=discovery, + etag=etag, + last_modified=last_modified, + not_modified=not_modified, + ) + + async def _try_managerdomain_fallback( publisher_domain: str, timeout: float, @@ -667,7 +785,9 @@ async def _try_managerdomain_fallback( try: # Manager domain is a different origin from the publisher; use a fresh # client rather than the caller's so credentials don't leak across origins. - data, _ = await _resolve_direct(manager_domain_normalized, timeout, user_agent, client=None) + data, *_ = await _resolve_direct( + manager_domain_normalized, timeout, user_agent, client=None + ) return data except (AdagentsNotFoundError, AdagentsValidationError, AdagentsTimeoutError): return None @@ -714,7 +834,7 @@ async def validate_adagents_domain( url = f"https://{normalized}/.well-known/adagents.json" try: - data, discovery = await _resolve_direct(normalized, timeout, user_agent, client) + data, discovery, *_ = await _resolve_direct(normalized, timeout, user_agent, client) return AdAgentsValidationResult( domain=normalized, url=url, @@ -764,7 +884,7 @@ async def validate_adagents_domain( ) try: - manager_data, _ = await _resolve_direct( + manager_data, *_ = await _resolve_direct( manager_normalized, timeout, user_agent, client=None ) except AdagentsNotFoundError: @@ -802,73 +922,130 @@ async def _fetch_adagents_url( timeout: float, user_agent: str, client: httpx.AsyncClient | None, -) -> dict[str, Any]: + max_bytes: int = MAX_POINTER_BYTES, + cache_entry: AdagentsCacheEntry | None = None, +) -> tuple[dict[str, Any], str | None, str | None, bool]: """Fetch and parse adagents.json from a specific URL. - This is the core fetch logic, separated to support redirect following. + Returns a 4-tuple ``(data, etag, last_modified, not_modified)``. + ``not_modified`` is True only when ``cache_entry`` was supplied and + the origin responded with 304 — in that case ``data`` is the cached + body. Response bodies larger than ``max_bytes`` are rejected (use + :data:`MAX_POINTER_BYTES` for the first hop and + :data:`MAX_AUTHORITATIVE_BYTES` for dereferenced authoritative files + per adcp#4504). """ + headers: dict[str, str] = {"User-Agent": user_agent} + if cache_entry is not None: + if cache_entry.etag: + headers["If-None-Match"] = cache_entry.etag + if cache_entry.last_modified: + headers["If-Modified-Since"] = cache_entry.last_modified + try: - # Use provided client or create a new one if client is not None: - response = await client.get( - url, - headers={"User-Agent": user_agent}, - timeout=timeout, - follow_redirects=True, + body, status_code, response_headers = await _stream_capped( + client, url, headers, timeout, max_bytes ) else: async with httpx.AsyncClient() as new_client: - response = await new_client.get( - url, - headers={"User-Agent": user_agent}, - timeout=timeout, - follow_redirects=True, + body, status_code, response_headers = await _stream_capped( + new_client, url, headers, timeout, max_bytes ) + except httpx.TimeoutException as e: + parsed = urlparse(url) + raise AdagentsTimeoutError(parsed.netloc, timeout) from e + except httpx.RequestError as e: + raise AdagentsValidationError(f"Failed to fetch adagents.json: {e}") from e - # Process response - if response.status_code == 404: - # Extract domain from URL for error message - parsed = urlparse(url) - raise AdagentsNotFoundError(parsed.netloc) - - if response.status_code != 200: + if status_code == 304: + if cache_entry is None: + # The server should not return 304 without a conditional + # request; treat as an error rather than silently returning + # nothing. raise AdagentsValidationError( - f"Failed to fetch adagents.json: HTTP {response.status_code}" + "Received 304 Not Modified without a cache entry to serve" ) + return ( + cache_entry.body, + response_headers.get("etag") or cache_entry.etag, + response_headers.get("last-modified") or cache_entry.last_modified, + True, + ) - # Parse JSON - try: - data = response.json() - except Exception as e: - raise AdagentsValidationError(f"Invalid JSON in adagents.json: {e}") from e + if status_code == 404: + parsed = urlparse(url) + raise AdagentsNotFoundError(parsed.netloc) - # Validate basic structure - if not isinstance(data, dict): - raise AdagentsValidationError("adagents.json must be a JSON object") + if status_code != 200: + raise AdagentsValidationError(f"Failed to fetch adagents.json: HTTP {status_code}") - # If this has authorized_agents, validate it - if "authorized_agents" in data: - if not isinstance(data["authorized_agents"], list): - raise AdagentsValidationError("'authorized_agents' must be an array") + try: + data = json.loads(body) + except Exception as e: + raise AdagentsValidationError(f"Invalid JSON in adagents.json: {e}") from e - # Validate mutual exclusivity constraints - try: - validate_adagents(data) - except ValidationError as e: - raise AdagentsValidationError(f"Invalid adagents.json structure: {e}") from e - elif "authoritative_location" not in data: - # Neither authorized_agents nor authoritative_location - raise AdagentsValidationError( - "adagents.json must have either 'authorized_agents' or 'authoritative_location'" - ) + if not isinstance(data, dict): + raise AdagentsValidationError("adagents.json must be a JSON object") - return data + if "authorized_agents" in data: + if not isinstance(data["authorized_agents"], list): + raise AdagentsValidationError("'authorized_agents' must be an array") - except httpx.TimeoutException as e: - parsed = urlparse(url) - raise AdagentsTimeoutError(parsed.netloc, timeout) from e - except httpx.RequestError as e: - raise AdagentsValidationError(f"Failed to fetch adagents.json: {e}") from e + try: + validate_adagents(data) + except ValidationError as e: + raise AdagentsValidationError(f"Invalid adagents.json structure: {e}") from e + elif "authoritative_location" not in data: + raise AdagentsValidationError( + "adagents.json must have either 'authorized_agents' or 'authoritative_location'" + ) + + return data, response_headers.get("etag"), response_headers.get("last-modified"), False + + +async def _stream_capped( + client: httpx.AsyncClient, + url: str, + headers: dict[str, str], + timeout: float, + max_bytes: int, +) -> tuple[bytes, int, httpx.Headers]: + """Stream a GET and abort if the body exceeds ``max_bytes``. + + Reading the body via ``iter_bytes`` lets us bail before buffering an + oversized response. A ``Content-Length`` larger than the cap is + rejected up-front; servers that omit the header (or lie) are still + caught by the running total inside the loop. + """ + async with client.stream( + "GET", url, headers=headers, timeout=timeout, follow_redirects=True + ) as response: + if response.status_code == 304: + return b"", 304, response.headers + + content_length = response.headers.get("content-length") + if content_length is not None: + try: + if int(content_length) > max_bytes: + raise AdagentsValidationError( + f"adagents.json body Content-Length {content_length} exceeds " + f"size cap of {max_bytes} bytes" + ) + except ValueError: + # malformed Content-Length — fall through to streaming cap + pass + + chunks: list[bytes] = [] + total = 0 + async for chunk in response.aiter_bytes(): + total += len(chunk) + if total > max_bytes: + raise AdagentsValidationError( + f"adagents.json body exceeds size cap of {max_bytes} bytes" + ) + chunks.append(chunk) + return b"".join(chunks), response.status_code, response.headers async def verify_agent_for_property( @@ -949,16 +1126,95 @@ def _resolve_agent_properties( and {t for t in p.get("tags", []) if isinstance(t, str)} & authorized_tags ] - # Handle publisher_properties (cross-domain references) + # Handle publisher_properties (cross-domain references). + # Each entry with publisher_domains[a,b,c] fans out to one selector per + # listed domain — the compact form is exactly equivalent to repeating + # the entry once per publisher per adcp#4504. The original entry is + # also retained so callers that want the as-authored compact form for + # diff stability can still see it. if authorization_type == "publisher_properties": publisher_props = agent.get("publisher_properties", []) if not isinstance(publisher_props, list): return [] - return [p for p in publisher_props if isinstance(p, dict)] + return _fanout_publisher_properties([p for p in publisher_props if isinstance(p, dict)]) return [] +def _fanout_publisher_properties( + publisher_props: list[dict[str, Any]], +) -> list[dict[str, Any]]: + """Expand ``publisher_domains[]`` compact entries into one selector per domain. + + For each entry that uses the compact form, emits one selector per + listed domain with ``publisher_domain`` set and ``publisher_domains`` + stripped — preserving every other key (``selection_type``, + ``property_tags``, custom extensions). Entries that already use the + singular ``publisher_domain`` form pass through unchanged. + + Malformed entries (compact form with non-list / empty + ``publisher_domains``) are dropped silently: structural validation + happens at :func:`validate_publisher_properties_item`, which is the + right layer to raise. The resolver is best-effort and stays useful + on partially-broken files. + """ + out: list[dict[str, Any]] = [] + for entry in publisher_props: + domains = entry.get("publisher_domains") + if domains is None: + out.append(entry) + continue + + if not isinstance(domains, list) or not domains: + continue + + for domain in domains: + if not isinstance(domain, str) or not domain: + continue + expanded = {k: v for k, v in entry.items() if k != "publisher_domains"} + expanded["publisher_domain"] = domain + out.append(expanded) + return out + + +def _get_revoked_publisher_domains(adagents_data: dict[str, Any]) -> set[str]: + """Return the set of publisher domains revoked by this file. + + Validators MUST treat any publisher domain listed in + ``revoked_publisher_domains[]`` as no-longer-authorized regardless of + where else it appears (per adcp#4504). Malformed entries are skipped + — structural validation is the validator's job, not the index's. + """ + revoked_raw = adagents_data.get("revoked_publisher_domains") + if not isinstance(revoked_raw, list): + return set() + revoked: set[str] = set() + for entry in revoked_raw: + if not isinstance(entry, dict): + continue + publisher_domain = entry.get("publisher_domain") + if isinstance(publisher_domain, str) and publisher_domain: + revoked.add(publisher_domain) + return revoked + + +def filter_revoked_selectors( + selectors: list[dict[str, Any]], + revoked_domains: set[str], +) -> list[dict[str, Any]]: + """Strip selectors whose ``publisher_domain`` is revoked. + + Apply this AFTER the compact-form fan-out so each remaining selector + addresses exactly one publisher, then drop any whose domain is in + ``revoked_domains``. Revocation takes precedence over every other + listing of that domain in the file (selectors, top-level properties, + etc.) per adcp#4504. + """ + if not revoked_domains: + return selectors + return [s for s in selectors if s.get("publisher_domain") not in revoked_domains] + + def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]: """Extract all properties from adagents.json data. @@ -985,6 +1241,17 @@ def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]: if not isinstance(top_level_properties, list): top_level_properties = [] + revoked = _get_revoked_publisher_domains(adagents_data) + revoked_top_level = [ + p + for p in top_level_properties + if not ( + isinstance(p, dict) + and isinstance(p.get("publisher_domain"), str) + and p["publisher_domain"] in revoked + ) + ] + properties = [] for agent in authorized_agents: if not isinstance(agent, dict): @@ -994,7 +1261,9 @@ def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]: if not agent_url: continue - agent_properties = _resolve_agent_properties(agent, top_level_properties) + agent_properties = _resolve_agent_properties(agent, revoked_top_level) + if revoked and agent.get("authorization_type") == "publisher_properties": + agent_properties = filter_revoked_selectors(agent_properties, revoked) for prop in agent_properties: prop_with_agent = {**prop, "agent_url": agent_url} @@ -1059,6 +1328,17 @@ def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> li if not isinstance(top_level_properties, list): top_level_properties = [] + revoked = _get_revoked_publisher_domains(adagents_data) + revoked_top_level = [ + p + for p in top_level_properties + if not ( + isinstance(p, dict) + and isinstance(p.get("publisher_domain"), str) + and p["publisher_domain"] in revoked + ) + ] + normalized_agent_url = normalize_url(agent_url) for agent in authorized_agents: @@ -1072,7 +1352,10 @@ def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> li if normalize_url(agent_url_from_json) != normalized_agent_url: continue - return _resolve_agent_properties(agent, top_level_properties) + resolved = _resolve_agent_properties(agent, revoked_top_level) + if revoked and agent.get("authorization_type") == "publisher_properties": + resolved = filter_revoked_selectors(resolved, revoked) + return resolved return [] diff --git a/src/adcp/types/generated_poc/adagents.py b/src/adcp/types/generated_poc/adagents.py index 2aa9fe056..9db41f509 100644 --- a/src/adcp/types/generated_poc/adagents.py +++ b/src/adcp/types/generated_poc/adagents.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: adagents.json -# timestamp: 2026-05-08T13:40:47+00:00 +# timestamp: 2026-05-20T04:40:04+00:00 from __future__ import annotations @@ -95,6 +95,38 @@ class Contact(AdCPBaseModel): ] = None +class Reason(Enum): + relationship_ended = 'relationship_ended' + compliance_violation = 'compliance_violation' + publisher_request = 'publisher_request' + other = 'other' + + +class RevokedPublisherDomain(AdCPBaseModel): + model_config = ConfigDict( + extra='allow', + ) + publisher_domain: Annotated[ + str, + Field( + description='Publisher domain being revoked. Matches against the same canonicalized form used in `publisher_properties[].publisher_domain`.', + pattern='^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$', + ), + ] + revoked_at: Annotated[ + AwareDatetime, + Field( + description='ISO 8601 timestamp when this publisher was revoked. Validators MAY use this to order revocations against their own cached state.' + ), + ] + reason: Annotated[ + Reason | None, + Field( + description='Reason for revocation. Operator-internal self-classification for review routing — not a public accusation. `relationship_ended` is the routine commercial case. `compliance_violation` SHOULD be used only when the network has itself determined the publisher is out of policy; for un-adjudicated third-party allegations (regulator inquiries, advertiser complaints, ongoing investigations), use `other` to avoid making a discoverable adverse statement. `publisher_request` is for publisher-initiated exits.' + ), + ] = None + + class Tags(AdCPBaseModel): model_config = ConfigDict( extra='allow', @@ -644,6 +676,12 @@ class AdcpAgentsAuthorization2(AdCPBaseModel): min_length=1, ), ] = None + revoked_publisher_domains: Annotated[ + list[RevokedPublisherDomain] | None, + Field( + description="Publisher domains explicitly removed from this managed network. Validators MUST treat any publisher domain listed here as no-longer-authorized, taking precedence over any appearance of the same domain in `authorized_agents[].publisher_properties[].publisher_domain` / `.publisher_domains[]` or in top-level `properties[].publisher_domain`. Lets a network propagate per-publisher revocations on the next refresh instead of waiting for the file-level 7-day cache cap. Validators MUST hold previously-observed `(publisher_domain, revoked_at)` tuples for 7 days from the validator's first observation, even if the entry vanishes from a subsequent fetch — this closes the rollback gap where an attacker re-serves a stale file with the revocation removed. Networks SHOULD retain entries for at least 7 days after `revoked_at` so validators that didn't observe the original entry still pick it up on refresh." + ), + ] = None collections: Annotated[ list[collection.Collection] | None, Field( diff --git a/src/adcp/types/generated_poc/core/publisher_property_selector.py b/src/adcp/types/generated_poc/core/publisher_property_selector.py index 63e3909b2..39f1ba38e 100644 --- a/src/adcp/types/generated_poc/core/publisher_property_selector.py +++ b/src/adcp/types/generated_poc/core/publisher_property_selector.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: core/publisher_property_selector.json -# timestamp: 2026-05-02T19:36:29+00:00 +# timestamp: 2026-05-20T04:40:04+00:00 from __future__ import annotations @@ -12,21 +12,34 @@ from . import property_id, property_tag +class PublisherDomain(RootModel[str]): + root: Annotated[ + str, Field(pattern='^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$') + ] + + class PublisherPropertySelector1(AdCPBaseModel): model_config = ConfigDict( extra='allow', ) publisher_domain: Annotated[ - str, + str | None, Field( - description="Domain where publisher's adagents.json is hosted (e.g., 'cnn.com')", + description="Domain where publisher's adagents.json is hosted (e.g., 'cnn.com'). XOR with `publisher_domains` — exactly one MUST be present on each `publisher_properties[]` entry; both-present and neither-present both fail validation.", pattern='^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$', ), - ] + ] = None + publisher_domains: Annotated[ + list[PublisherDomain] | None, + Field( + description="Compact form for fanning the same selector across many publishers (e.g., a managed network listing every publisher it represents). Each entry is the domain where that publisher's adagents.json is hosted. Each listed domain MUST be canonicalized to lowercase (the `pattern` already rejects uppercase). Mutually exclusive with `publisher_domain`. Each listed domain counts as explicitly scoped for the `managerdomain` fallback safety rule.", + min_length=1, + ), + ] = None selection_type: Annotated[ Literal['all'], Field( - description='Discriminator indicating all properties from this publisher are included' + description='Discriminator indicating all properties from each addressed publisher are included' ), ] = 'all' @@ -38,7 +51,7 @@ class PublisherPropertySelector2(AdCPBaseModel): publisher_domain: Annotated[ str, Field( - description="Domain where publisher's adagents.json is hosted (e.g., 'cnn.com')", + description="Domain where publisher's adagents.json is hosted (e.g., 'cnn.com').", pattern='^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$', ), ] @@ -57,19 +70,26 @@ class PublisherPropertySelector3(AdCPBaseModel): extra='allow', ) publisher_domain: Annotated[ - str, + str | None, Field( - description="Domain where publisher's adagents.json is hosted (e.g., 'cnn.com')", + description="Domain where publisher's adagents.json is hosted (e.g., 'cnn.com'). XOR with `publisher_domains` — exactly one MUST be present on each `publisher_properties[]` entry; both-present and neither-present both fail validation.", pattern='^[a-z0-9]([a-z0-9-]*[a-z0-9])?(\\.[a-z0-9]([a-z0-9-]*[a-z0-9])?)*$', ), - ] + ] = None + publisher_domains: Annotated[ + list[PublisherDomain] | None, + Field( + description="Compact form for fanning the same tag predicate across many publishers (canonical managed-network shape). Each entry is the domain where that publisher's adagents.json is hosted. Each listed domain MUST be canonicalized to lowercase (the `pattern` already rejects uppercase). Mutually exclusive with `publisher_domain`. Each listed domain counts as explicitly scoped for the `managerdomain` fallback safety rule.", + min_length=1, + ), + ] = None selection_type: Annotated[ Literal['by_tag'], Field(description='Discriminator indicating selection by property tags') ] = 'by_tag' property_tags: Annotated[ list[property_tag.PropertyTag], Field( - description="Property tags from the publisher's adagents.json. Selector covers all properties with these tags", + description="Property tags resolved against each addressed publisher's adagents.json. Selector covers all properties carrying any of these tags.", min_length=1, ), ] @@ -81,7 +101,7 @@ class PublisherPropertySelector( root: Annotated[ PublisherPropertySelector1 | PublisherPropertySelector2 | PublisherPropertySelector3, Field( - description="Selects properties from a publisher's adagents.json. Used for both product definitions and agent authorization. Supports three selection patterns: all properties, specific IDs, or by tags.", + description="Selects properties from a publisher's adagents.json. Used for both product definitions and agent authorization. Supports three selection patterns: all properties, specific IDs, or by tags. Each selector targets one publisher via `publisher_domain` (string) or a fan-out across many publishers that share the same selector via `publisher_domains` (array). Exactly one of `publisher_domain` or `publisher_domains` MUST be present. When `publisher_domains` is used, the selector is logically equivalent to repeating the same entry once per listed domain.", discriminator='selection_type', title='Publisher Property Selector', ), diff --git a/src/adcp/validation/__init__.py b/src/adcp/validation/__init__.py index 49f42835c..a9a5d0f6c 100644 --- a/src/adcp/validation/__init__.py +++ b/src/adcp/validation/__init__.py @@ -30,6 +30,7 @@ validate_agent_authorization, validate_product, validate_publisher_properties_item, + validate_revoked_publisher_domain_entry, ) from adcp.validation.schema_errors import ( AdcpValidationErrorDetails, @@ -59,6 +60,7 @@ "validate_agent_authorization", "validate_product", "validate_publisher_properties_item", + "validate_revoked_publisher_domain_entry", # Schema core "Direction", "ResponseVariant", diff --git a/src/adcp/validation/legacy.py b/src/adcp/validation/legacy.py index b3742f082..de76735fe 100644 --- a/src/adcp/validation/legacy.py +++ b/src/adcp/validation/legacy.py @@ -28,13 +28,19 @@ class ValidationError(ValueError): def validate_publisher_properties_item(item: dict[str, Any]) -> None: - """Validate publisher_properties item discriminated union. + """Validate a single ``publisher_properties[]`` entry. - AdCP v2.4.0+ uses discriminated unions with selection_type discriminator: - - selection_type: "by_id" requires property_ids - - selection_type: "by_tag" requires property_tags + Two XORs are enforced per the publisher-property-selector JSON Schema + (adcp#4504): - For backward compatibility, also validates the old mutual exclusivity constraint. + * Selector XOR: exactly one of ``property_ids`` / ``property_tags`` + is present for ``by_id`` / ``by_tag`` (``all`` requires neither). + * Publisher XOR: exactly one of ``publisher_domain`` (singular) or + ``publisher_domains`` (compact array) is present — both or neither + both fail. ``publisher_domains`` is NOT allowed on + ``selection_type='by_id'`` since property IDs are publisher-scoped; + callers wanting per-publisher ID sets must use one entry per + publisher. Args: item: A single item from publisher_properties array @@ -45,8 +51,10 @@ def validate_publisher_properties_item(item: dict[str, Any]) -> None: selection_type = item.get("selection_type") has_property_ids = "property_ids" in item and item["property_ids"] is not None has_property_tags = "property_tags" in item and item["property_tags"] is not None + has_publisher_domain = "publisher_domain" in item and item["publisher_domain"] is not None + publisher_domains = item.get("publisher_domains") + has_publisher_domains = publisher_domains is not None - # If selection_type discriminator is present, validate discriminated union if selection_type: if selection_type == "by_id" and not has_property_ids: raise ValidationError( @@ -56,24 +64,60 @@ def validate_publisher_properties_item(item: dict[str, Any]) -> None: raise ValidationError( "publisher_properties item with selection_type='by_tag' must have property_tags" ) - elif selection_type not in ("by_id", "by_tag"): + elif selection_type not in ("all", "by_id", "by_tag"): raise ValidationError( f"publisher_properties item has invalid selection_type: {selection_type}" ) - # Validate mutual exclusivity (for both old and new formats) if has_property_ids and has_property_tags: raise ValidationError( "publisher_properties item cannot have both property_ids and property_tags. " "These fields are mutually exclusive." ) - if not has_property_ids and not has_property_tags: + # selection_type='all' carries neither selector array; older callers + # without the discriminator must still provide one of the two. + if selection_type not in ("all",) and not has_property_ids and not has_property_tags: raise ValidationError( "publisher_properties item must have either property_ids or property_tags. " "At least one is required." ) + if has_publisher_domain and has_publisher_domains: + raise ValidationError( + "publisher_properties item cannot have both publisher_domain and " + "publisher_domains. These fields are mutually exclusive (XOR)." + ) + + if not has_publisher_domain and not has_publisher_domains: + raise ValidationError( + "publisher_properties item must have exactly one of publisher_domain " + "or publisher_domains." + ) + + if has_publisher_domains and selection_type == "by_id": + # by_id is single-publisher only — property IDs are publisher-scoped, + # so fanning the same ID set across multiple publishers is meaningless. + raise ValidationError( + "publisher_properties item with selection_type='by_id' cannot use " + "publisher_domains[]; property IDs are publisher-scoped. Use one " + "entry per publisher with publisher_domain." + ) + + if has_publisher_domains: + if not isinstance(publisher_domains, list) or len(publisher_domains) == 0: + raise ValidationError( + "publisher_properties item publisher_domains must be a non-empty array" + ) + if any(not isinstance(d, str) or not d for d in publisher_domains): + raise ValidationError( + "publisher_properties item publisher_domains entries must be non-empty strings" + ) + if len(set(publisher_domains)) != len(publisher_domains): + raise ValidationError( + "publisher_properties item publisher_domains entries must be unique" + ) + def validate_agent_authorization(agent: dict[str, Any]) -> None: """Validate agent authorization discriminated union. @@ -158,6 +202,41 @@ def validate_product(product: dict[str, Any]) -> None: validate_publisher_properties_item(item) +_REVOCATION_REASONS = frozenset( + {"relationship_ended", "compliance_violation", "publisher_request", "other"} +) + + +def validate_revoked_publisher_domain_entry(entry: dict[str, Any]) -> None: + """Validate a single ``revoked_publisher_domains[]`` entry. + + Required fields: ``publisher_domain`` (non-empty string) and + ``revoked_at`` (RFC 3339 / ISO 8601 string). Optional ``reason`` + must be one of the four enum values when present. Wall-clock parsing + of ``revoked_at`` is left to Pydantic (``AwareDatetime`` in the + generated model) — this helper only enforces shape on raw dicts. + """ + publisher_domain = entry.get("publisher_domain") + if not isinstance(publisher_domain, str) or not publisher_domain: + raise ValidationError( + "revoked_publisher_domains entry must have a non-empty " "'publisher_domain' string" + ) + + revoked_at = entry.get("revoked_at") + if not isinstance(revoked_at, str) or not revoked_at: + raise ValidationError( + "revoked_publisher_domains entry must have a non-empty " + "'revoked_at' ISO 8601 timestamp string" + ) + + reason = entry.get("reason") + if reason is not None and reason not in _REVOCATION_REASONS: + raise ValidationError( + f"revoked_publisher_domains entry has invalid reason={reason!r} " + f"(expected one of: {', '.join(sorted(_REVOCATION_REASONS))})" + ) + + def validate_adagents(adagents: dict[str, Any]) -> None: """Validate an adagents.json structure. @@ -170,3 +249,12 @@ def validate_adagents(adagents: dict[str, Any]) -> None: if "agents" in adagents: for agent in adagents["agents"]: validate_agent_authorization(agent) + + revoked = adagents.get("revoked_publisher_domains") + if revoked is not None: + if not isinstance(revoked, list): + raise ValidationError("'revoked_publisher_domains' must be an array") + for entry in revoked: + if not isinstance(entry, dict): + raise ValidationError("revoked_publisher_domains entry must be an object") + validate_revoked_publisher_domain_entry(entry) diff --git a/tests/fixtures/public_api_snapshot.json b/tests/fixtures/public_api_snapshot.json index b79eedd34..d927297d5 100644 --- a/tests/fixtures/public_api_snapshot.json +++ b/tests/fixtures/public_api_snapshot.json @@ -27,7 +27,9 @@ "ActivateSignalResponse", "ActivateSignalSuccessResponse", "AdAgentsValidationResult", + "AdagentsCacheEntry", "AdagentsEntryError", + "AdagentsFetchResult", "AdagentsNotFoundError", "AdagentsTimeoutError", "AdagentsValidationError", @@ -348,7 +350,9 @@ "domain_matches", "extract_webhook_result_data", "fetch_adagents", + "fetch_adagents_with_cache", "fetch_agent_authorizations", + "filter_revoked_selectors", "generate_webhook_idempotency_key", "generated", "get_adcp_signed_headers_for_webhook", diff --git a/tests/test_adagents.py b/tests/test_adagents.py index 44195fd14..2ad8225d1 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -2,6 +2,7 @@ """Tests for adagents.json validation functionality.""" +import json import unittest.mock from unittest.mock import AsyncMock, MagicMock @@ -25,11 +26,133 @@ ) +def _make_stream_response( + *, + status_code: int, + body: bytes | None = None, + headers: dict[str, str] | None = None, +) -> MagicMock: + """Build a mock streaming response with status, headers, aiter_bytes.""" + response = MagicMock() + response.status_code = status_code + response.headers = httpx.Headers(headers or {}) + + body_bytes = body or b"" + + async def aiter_bytes(): + if body_bytes: + yield body_bytes + + response.aiter_bytes = aiter_bytes + return response + + +def _stream_cm(response: MagicMock): + """Wrap a response as an async context manager for client.stream(...).""" + cm = MagicMock() + cm.__aenter__ = AsyncMock(return_value=response) + cm.__aexit__ = AsyncMock(return_value=None) + return cm + + +def make_text_url_client(url_to_text, called_urls=None): + """Like ``make_url_dispatching_client`` but for plain-text ads.txt fetches. + + Used to mock the ads.txt MANAGERDOMAIN fallback path, which still + uses ``client.get()``/``response.text`` rather than the streaming + fetch. Maps URL → text body (200) or None (404). + """ + + async def _get(url, **kwargs): + if called_urls is not None: + called_urls.append(url) + body = url_to_text.get(url) + response = MagicMock() + if body is None: + response.status_code = 404 + response.text = "" + response.content = b"" + else: + response.status_code = 200 + response.text = body + response.content = body.encode("utf-8") + return response + + mock_client = MagicMock() + mock_client.get = AsyncMock(side_effect=_get) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + return mock_client + + +def make_url_dispatching_client(url_to_payload, called_urls=None, default_status=200): + """Return a mock client whose .stream() dispatches by request URL. + + ``url_to_payload`` maps URL → either a JSON-serializable dict (sent + with ``default_status``) or a tuple ``(dict, status_code, headers)``. + For URLs not in the map, the mock returns 404. If ``called_urls`` + is provided, every request appends to it in order. + """ + + def _stream(method, url, **kwargs): + if called_urls is not None: + called_urls.append(url) + entry = url_to_payload.get(url) + if entry is None: + response = _make_stream_response(status_code=404) + return _stream_cm(response) + if isinstance(entry, tuple): + data, status, headers = entry + else: + data, status, headers = entry, default_status, {} + body = json.dumps(data).encode("utf-8") if data is not None else b"" + response = _make_stream_response(status_code=status, body=body, headers=headers) + return _stream_cm(response) + + mock_client = MagicMock() + mock_client.stream = MagicMock(side_effect=_stream) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + return mock_client + + def create_mock_httpx_client(mock_response): - """Helper to create a properly mocked httpx.AsyncClient.""" - mock_get = AsyncMock(return_value=mock_response) + """Build a mock AsyncClient compatible with the streaming fetch path. + + Accepts a response built by tests with ``.status_code`` and either + ``.json.return_value`` (legacy ergonomic) or ``.content`` / + ``.text``. Translates that into a stream-capable mock by serializing + the legacy JSON return value into the streamed body. + """ + headers: dict[str, str] = {} + if hasattr(mock_response, "headers") and mock_response.headers: + try: + headers = dict(mock_response.headers) + except (TypeError, ValueError): + headers = {} + + if hasattr(mock_response, "_mock_children") and "content" in mock_response._mock_children: + body_bytes = mock_response.content + elif ( + hasattr(mock_response, "json") + and getattr(mock_response.json, "return_value", None) is not None + ): + body_bytes = json.dumps(mock_response.json.return_value).encode("utf-8") + else: + body_bytes = b"" + + stream_response = _make_stream_response( + status_code=mock_response.status_code, + body=body_bytes, + headers=headers, + ) + mock_client_instance = MagicMock() - mock_client_instance.get = mock_get + mock_client_instance.stream = MagicMock(return_value=_stream_cm(stream_response)) + # Retain .get for tests that still assert against it; the production + # code calls .stream(), so .get is effectively unused but kept callable + # so legacy .get.assert_* assertions continue to operate on a real Mock. + mock_client_instance.get = AsyncMock(return_value=mock_response) mock_client_instance.__aenter__.return_value = mock_client_instance mock_client_instance.__aexit__.return_value = AsyncMock() return mock_client_instance @@ -398,8 +521,8 @@ async def test_fetch_success(self): result = await fetch_adagents("example.com", client=mock_client) assert result == mock_adagents_data - mock_client.get.assert_called_once() - call_args = mock_client.get.call_args + mock_client.stream.assert_called_once() + call_args = mock_client.stream.call_args assert "https://example.com/.well-known/adagents.json" in str(call_args) @pytest.mark.asyncio @@ -429,35 +552,21 @@ async def test_fetch_follows_authoritative_location(self): "last_updated": "2025-01-15T10:00:00Z", } - # Mock client for the initial fetch (returns redirect) called_urls: list[str] = [] - - async def mock_get(url, **kwargs): - called_urls.append(url) - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = redirect_response_data - return mock_response - - mock_client = MagicMock() - mock_client.get = mock_get + mock_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": redirect_response_data}, + called_urls=called_urls, + ) # Redirect hop uses a fresh client — mock httpx.AsyncClient for that - class MockRedirectClient: - async def get(self, url, **kwargs): - called_urls.append(url) - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = resolved_data - return mock_response - - async def __aenter__(self): - return self - - async def __aexit__(self, *args): - pass + redirect_client = make_url_dispatching_client( + {"https://cdn.example.com/adagents/v2/adagents.json": resolved_data}, + called_urls=called_urls, + ) - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", MockRedirectClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: redirect_client + ): result = await fetch_adagents("example.com", client=mock_client) assert result == resolved_data @@ -516,33 +625,27 @@ async def test_fetch_enforces_max_redirect_depth(self): # Create a long chain of redirects call_count = [0] - async def mock_get(url, **kwargs): + def _stream(method, url, **kwargs): call_count[0] += 1 - mock_response = MagicMock() - mock_response.status_code = 200 - # Always return a redirect to a new URL - mock_response.json.return_value = { + data = { "$schema": "/schemas/2.6.0/adagents.json", "authoritative_location": f"https://cdn{call_count[0]}.example.com/adagents.json", "last_updated": "2025-01-15T10:00:00Z", } - return mock_response + response = _make_stream_response(status_code=200, body=json.dumps(data).encode("utf-8")) + return _stream_cm(response) mock_client = MagicMock() - mock_client.get = mock_get - - # Redirect hops use a fresh client, so patch httpx.AsyncClient too - class MockRedirectClient: - async def get(self, url, **kwargs): - return await mock_get(url, **kwargs) + mock_client.stream = MagicMock(side_effect=_stream) - async def __aenter__(self): - return self + redirect_client = MagicMock() + redirect_client.stream = MagicMock(side_effect=_stream) + redirect_client.__aenter__ = AsyncMock(return_value=redirect_client) + redirect_client.__aexit__ = AsyncMock(return_value=None) - async def __aexit__(self, *args): - pass - - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", MockRedirectClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: redirect_client + ): with pytest.raises(AdagentsValidationError, match="redirect|depth"): await fetch_adagents("example.com", client=mock_client) @@ -682,35 +785,21 @@ async def test_redirect_uses_fresh_client(self): "last_updated": "2025-01-15T10:00:00Z", } - caller_urls = [] - - async def mock_get(url, **kwargs): - caller_urls.append(url) - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = redirect_data - return mock_response - - caller_client = MagicMock() - caller_client.get = mock_get - - fresh_client_urls = [] - - class TrackingClient: - async def get(self, url, **kwargs): - fresh_client_urls.append(url) - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = resolved_data - return mock_response - - async def __aenter__(self): - return self + caller_urls: list[str] = [] + caller_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": redirect_data}, + called_urls=caller_urls, + ) - async def __aexit__(self, *args): - pass + fresh_client_urls: list[str] = [] + fresh_client = make_url_dispatching_client( + {"https://cdn.other-domain.com/adagents.json": resolved_data}, + called_urls=fresh_client_urls, + ) - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", TrackingClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: fresh_client + ): result = await fetch_adagents("example.com", client=caller_client) # Initial fetch used the caller's client @@ -750,29 +839,16 @@ async def test_allows_public_domain_redirect(self): "last_updated": "2025-01-15T10:00:00Z", } - async def mock_get(url, **kwargs): - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = redirect_data - return mock_response - - mock_client = MagicMock() - mock_client.get = mock_get - - class MockRedirectClient: - async def get(self, url, **kwargs): - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = resolved_data - return mock_response - - async def __aenter__(self): - return self - - async def __aexit__(self, *args): - pass + mock_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": redirect_data}, + ) + redirect_client = make_url_dispatching_client( + {"https://cdn.example.com/adagents/v2/adagents.json": resolved_data}, + ) - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", MockRedirectClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: redirect_client + ): result = await fetch_adagents("example.com", client=mock_client) assert "authorized_agents" in result @@ -812,7 +888,7 @@ async def test_verify_success(self): ) assert result is True - mock_client.get.assert_called_once() + mock_client.stream.assert_called_once() class TestGetAllProperties: @@ -1904,12 +1980,27 @@ class TestValidateAdagentsDomain: """Test validate_adagents_domain typed validator with discovery_method.""" def _build_mock_client(self, url_handler): - """Mock client whose .get(url, **kw) returns whatever url_handler(url) yields.""" + """Mock client backing both .stream() (adagents) and .get() (ads.txt). + + ``url_handler(url)`` returns a MagicMock built by ``_ok`` / + ``_not_found`` / ``_text``. For adagents URLs we adapt that + legacy-style response into a stream-capable mock; for ads.txt + URLs the legacy ``.get()``/``.text`` path is preserved since the + ads.txt fetch never went through the new streaming code. + """ + + def _stream(method, url, **kwargs): + response = url_handler(url) + body_data = response.json.return_value if response.status_code == 200 else None + body = json.dumps(body_data).encode("utf-8") if body_data else b"" + stream_response = _make_stream_response(status_code=response.status_code, body=body) + return _stream_cm(stream_response) async def mock_get(url, **kwargs): return url_handler(url) mock_client = MagicMock() + mock_client.stream = MagicMock(side_effect=_stream) mock_client.get = mock_get return mock_client @@ -1931,6 +2022,7 @@ def _text(self, body, status=200): response = MagicMock() response.status_code = status response.text = body + response.content = body.encode("utf-8") if isinstance(body, str) else body response.json.return_value = {} return response @@ -1974,22 +2066,13 @@ async def test_authoritative_location_discovery(self): def handler(url): return self._ok(redirect) - # Redirect hop uses a fresh httpx.AsyncClient — patch it to serve resolved. - class RedirectClient: - async def get(self, url, **kwargs): - response = MagicMock() - response.status_code = 200 - response.json.return_value = resolved - response.text = "" - return response - - async def __aenter__(self): - return self - - async def __aexit__(self, *args): - pass + redirect_client = make_url_dispatching_client( + {"https://cdn.example.com/adagents.json": resolved}, + ) - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", RedirectClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: redirect_client + ): result = await validate_adagents_domain( "publisher.example", client=self._build_mock_client(handler) ) @@ -2018,24 +2101,13 @@ def handler(url): return self._text("MANAGERDOMAIN=manager.example\n") raise AssertionError(f"unexpected url {url}") - # Fresh client (used for cross-origin manager fetch) serves manager adagents. - class ManagerClient: - async def get(self, url, **kwargs): - if url == "https://manager.example/.well-known/adagents.json": - response = MagicMock() - response.status_code = 200 - response.json.return_value = manager_adagents - response.text = "" - return response - raise AssertionError(f"unexpected manager url {url}") - - async def __aenter__(self): - return self - - async def __aexit__(self, *args): - pass + manager_client = make_url_dispatching_client( + {"https://manager.example/.well-known/adagents.json": manager_adagents}, + ) - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", ManagerClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: manager_client + ): result = await validate_adagents_domain( "publisher.example", client=self._build_mock_client(handler) ) @@ -2085,27 +2157,14 @@ def handler(url): raise AssertionError(f"unexpected url {url}") attempted_urls: list[str] = [] + manager_client = make_url_dispatching_client( + {"https://good-manager.example/.well-known/adagents.json": manager_adagents}, + called_urls=attempted_urls, + ) - class ManagerClient: - async def get(self, url, **kwargs): - attempted_urls.append(url) - if url == "https://good-manager.example/.well-known/adagents.json": - response = MagicMock() - response.status_code = 200 - response.json.return_value = manager_adagents - response.text = "" - return response - if url == "https://bad-manager.example/.well-known/adagents.json": - raise AssertionError("bad-manager.example must not be tried; last entry wins") - raise AssertionError(f"unexpected manager url {url}") - - async def __aenter__(self): - return self - - async def __aexit__(self, *args): - pass - - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", ManagerClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: manager_client + ): result = await validate_adagents_domain( "publisher.example", client=self._build_mock_client(handler) ) @@ -2114,6 +2173,9 @@ async def __aexit__(self, *args): assert result.discovery_method == "ads_txt_managerdomain" assert result.manager_domain == "good-manager.example" assert "https://good-manager.example/.well-known/adagents.json" in attempted_urls + assert ( + "https://bad-manager.example/.well-known/adagents.json" not in attempted_urls + ), "bad-manager.example must not be tried; last entry wins" @pytest.mark.asyncio async def test_manager_domain_404_is_terminal_failure(self): @@ -2127,22 +2189,11 @@ def handler(url): return self._text("MANAGERDOMAIN=manager.example\n") raise AssertionError(f"unexpected url {url}") - class ManagerClient: - async def get(self, url, **kwargs): - # Manager domain also 404s. - response = MagicMock() - response.status_code = 404 - response.json.return_value = {} - response.text = "" - return response + manager_client = make_url_dispatching_client({}) # always 404 - async def __aenter__(self): - return self - - async def __aexit__(self, *args): - pass - - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", ManagerClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: manager_client + ): result = await validate_adagents_domain( "publisher.example", client=self._build_mock_client(handler) ) @@ -2245,22 +2296,11 @@ def handler(url): return self._text("MANAGERDOMAIN=manager.example\n") raise AssertionError(f"unexpected url {url}") - class RedirectClient: - async def get(self, url, **kwargs): - # Authoritative location 404s. - response = MagicMock() - response.status_code = 404 - response.json.return_value = {} - response.text = "" - return response - - async def __aenter__(self): - return self - - async def __aexit__(self, *args): - pass + redirect_client = make_url_dispatching_client({}) # always 404 - with unittest.mock.patch.object(adagents_module.httpx, "AsyncClient", RedirectClient): + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: redirect_client + ): with pytest.raises(AdagentsValidationError, match="authoritative_location"): await fetch_adagents("publisher.example", client=self._build_mock_client(handler)) @@ -2675,3 +2715,472 @@ def test_report_dataclass_is_immutable(self): err = AdagentsEntryError(index=0, kind="missing_url", message="x") with pytest.raises(dataclasses.FrozenInstanceError): err.index = 1 # type: ignore[misc] + + +class TestPublisherDomainsCompactForm: + """adcp#4504: ``publisher_domains[]`` fan-out + XOR + ``by_id`` restriction.""" + + def test_fanout_singular_passes_through(self): + from adcp.adagents import _fanout_publisher_properties + + entry = {"selection_type": "all", "publisher_domain": "cnn.com"} + assert _fanout_publisher_properties([entry]) == [entry] + + def test_fanout_expands_compact_form(self): + from adcp.adagents import _fanout_publisher_properties + + out = _fanout_publisher_properties( + [ + { + "selection_type": "by_tag", + "property_tags": ["ctv"], + "publisher_domains": ["site1.example", "site2.example", "site3.example"], + } + ] + ) + assert [s["publisher_domain"] for s in out] == [ + "site1.example", + "site2.example", + "site3.example", + ] + # property_tags carried through to every expanded selector + assert all(s.get("property_tags") == ["ctv"] for s in out) + # publisher_domains stripped from each expanded selector + assert all("publisher_domains" not in s for s in out) + + def test_fanout_skips_invalid_compact_entries(self): + from adcp.adagents import _fanout_publisher_properties + + out = _fanout_publisher_properties( + [ + {"selection_type": "by_tag", "publisher_domains": "not-a-list"}, + {"selection_type": "by_tag", "publisher_domains": []}, + ] + ) + assert out == [] + + def test_resolve_compact_form_via_get_properties_by_agent(self): + adagents = { + "authorized_agents": [ + { + "url": "https://agent.example", + "authorized_for": "Managed network CTV", + "authorization_type": "publisher_properties", + "publisher_properties": [ + { + "selection_type": "by_tag", + "property_tags": ["ctv"], + "publisher_domains": ["a.example", "b.example", "c.example"], + } + ], + } + ] + } + resolved = get_properties_by_agent(adagents, "https://agent.example") + assert [s["publisher_domain"] for s in resolved] == [ + "a.example", + "b.example", + "c.example", + ] + assert all(s["selection_type"] == "by_tag" for s in resolved) + assert all(s["property_tags"] == ["ctv"] for s in resolved) + + def test_xor_violation_both_publisher_fields(self): + from adcp.validation import ( + ValidationError, + validate_publisher_properties_item, + ) + + with pytest.raises(ValidationError, match="mutually exclusive"): + validate_publisher_properties_item( + { + "selection_type": "by_tag", + "property_tags": ["ctv"], + "publisher_domain": "cnn.com", + "publisher_domains": ["espn.com"], + } + ) + + def test_xor_violation_neither_publisher_field(self): + from adcp.validation import ( + ValidationError, + validate_publisher_properties_item, + ) + + with pytest.raises(ValidationError, match="exactly one"): + validate_publisher_properties_item( + {"selection_type": "by_tag", "property_tags": ["ctv"]} + ) + + def test_by_id_rejects_publisher_domains(self): + from adcp.validation import ( + ValidationError, + validate_publisher_properties_item, + ) + + with pytest.raises(ValidationError, match="by_id"): + validate_publisher_properties_item( + { + "selection_type": "by_id", + "property_ids": ["p1"], + "publisher_domains": ["cnn.com", "espn.com"], + } + ) + + def test_publisher_domains_must_be_unique(self): + from adcp.validation import ( + ValidationError, + validate_publisher_properties_item, + ) + + with pytest.raises(ValidationError, match="unique"): + validate_publisher_properties_item( + { + "selection_type": "all", + "publisher_domains": ["a.example", "a.example"], + } + ) + + def test_compact_form_accepts_selection_type_all(self): + from adcp.validation import validate_publisher_properties_item + + # Should not raise. + validate_publisher_properties_item( + { + "selection_type": "all", + "publisher_domains": ["a.example", "b.example"], + } + ) + + +class TestRevokedPublisherDomains: + """adcp#4504: ``revoked_publisher_domains[]`` filter takes precedence.""" + + def test_revocation_filters_compact_form_selectors(self): + adagents = { + "revoked_publisher_domains": [ + { + "publisher_domain": "b.example", + "revoked_at": "2026-05-01T00:00:00Z", + "reason": "relationship_ended", + } + ], + "authorized_agents": [ + { + "url": "https://agent.example", + "authorized_for": "Managed", + "authorization_type": "publisher_properties", + "publisher_properties": [ + { + "selection_type": "by_tag", + "property_tags": ["ctv"], + "publisher_domains": ["a.example", "b.example", "c.example"], + } + ], + } + ], + } + resolved = get_properties_by_agent(adagents, "https://agent.example") + assert [s["publisher_domain"] for s in resolved] == ["a.example", "c.example"] + + def test_revocation_filters_singular_selectors(self): + adagents = { + "revoked_publisher_domains": [ + {"publisher_domain": "cnn.com", "revoked_at": "2026-05-01T00:00:00Z"} + ], + "authorized_agents": [ + { + "url": "https://agent.example", + "authorized_for": "x", + "authorization_type": "publisher_properties", + "publisher_properties": [ + {"selection_type": "all", "publisher_domain": "cnn.com"}, + {"selection_type": "all", "publisher_domain": "espn.com"}, + ], + } + ], + } + resolved = get_properties_by_agent(adagents, "https://agent.example") + assert [s["publisher_domain"] for s in resolved] == ["espn.com"] + + def test_revocation_filters_top_level_properties(self): + adagents = { + "revoked_publisher_domains": [ + {"publisher_domain": "revoked.example", "revoked_at": "2026-05-01T00:00:00Z"} + ], + "properties": [ + {"property_id": "p1", "publisher_domain": "kept.example"}, + {"property_id": "p2", "publisher_domain": "revoked.example"}, + ], + "authorized_agents": [ + { + "url": "https://agent.example", + "authorized_for": "x", + "authorization_type": "property_ids", + "property_ids": ["p1", "p2"], + } + ], + } + resolved = get_properties_by_agent(adagents, "https://agent.example") + assert [p["property_id"] for p in resolved] == ["p1"] + + def test_revocation_validation_rejects_missing_revoked_at(self): + from adcp.validation import ( + ValidationError, + validate_revoked_publisher_domain_entry, + ) + + with pytest.raises(ValidationError, match="revoked_at"): + validate_revoked_publisher_domain_entry({"publisher_domain": "x.example"}) + + def test_revocation_validation_rejects_invalid_reason(self): + from adcp.validation import ( + ValidationError, + validate_revoked_publisher_domain_entry, + ) + + with pytest.raises(ValidationError, match="invalid reason"): + validate_revoked_publisher_domain_entry( + { + "publisher_domain": "x.example", + "revoked_at": "2026-05-01T00:00:00Z", + "reason": "made_up_reason", + } + ) + + def test_revocation_validation_accepts_all_enum_reasons(self): + from adcp.validation import validate_revoked_publisher_domain_entry + + for reason in ( + "relationship_ended", + "compliance_violation", + "publisher_request", + "other", + ): + validate_revoked_publisher_domain_entry( + { + "publisher_domain": "x.example", + "revoked_at": "2026-05-01T00:00:00Z", + "reason": reason, + } + ) + + def test_validate_adagents_rejects_bad_revoked_array(self): + from adcp.validation import ValidationError, validate_adagents + + with pytest.raises(ValidationError, match="revoked_publisher_domains"): + validate_adagents({"revoked_publisher_domains": "not an array"}) + + +class TestFetchWithCache: + """adcp#4504: 304 conditional refresh + two-tier size cap.""" + + @pytest.mark.asyncio + async def test_304_returns_cached_body(self): + from adcp.adagents import AdagentsCacheEntry, fetch_adagents_with_cache + + cached_body = { + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "All", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } + ] + } + cache_entry = AdagentsCacheEntry( + body=cached_body, etag='"abc123"', last_modified="Mon, 01 Jan 2026 00:00:00 GMT" + ) + + mock_client = make_url_dispatching_client( + { + "https://example.com/.well-known/adagents.json": ( + None, + 304, + {"etag": '"abc123"'}, + ) + } + ) + + result = await fetch_adagents_with_cache( + "example.com", cache_entry=cache_entry, client=mock_client + ) + assert result.not_modified is True + assert result.data == cached_body + assert result.etag == '"abc123"' + + # The request must have carried the conditional headers. + call_kwargs = mock_client.stream.call_args.kwargs + sent_headers = call_kwargs["headers"] + assert sent_headers.get("If-None-Match") == '"abc123"' + assert sent_headers.get("If-Modified-Since") == "Mon, 01 Jan 2026 00:00:00 GMT" + + @pytest.mark.asyncio + async def test_200_returns_fresh_validators(self): + from adcp.adagents import fetch_adagents_with_cache + + fresh_body = { + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "All", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } + ] + } + mock_client = make_url_dispatching_client( + { + "https://example.com/.well-known/adagents.json": ( + fresh_body, + 200, + {"etag": '"xyz"', "last-modified": "Mon, 19 May 2026 00:00:00 GMT"}, + ) + } + ) + + result = await fetch_adagents_with_cache("example.com", client=mock_client) + assert result.not_modified is False + assert result.data == fresh_body + assert result.etag == '"xyz"' + assert result.last_modified == "Mon, 19 May 2026 00:00:00 GMT" + + @pytest.mark.asyncio + async def test_304_without_cache_entry_is_an_error(self): + from adcp.adagents import fetch_adagents_with_cache + from adcp.exceptions import AdagentsValidationError + + mock_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": (None, 304, {})} + ) + + with pytest.raises(AdagentsValidationError, match="304"): + await fetch_adagents_with_cache("example.com", client=mock_client) + + +class TestSizeCaps: + """adcp#4504: 5 MiB pointer cap + 20 MiB authoritative cap.""" + + @pytest.mark.asyncio + async def test_pointer_body_over_5mb_rejected(self): + from adcp.adagents import MAX_POINTER_BYTES, fetch_adagents + from adcp.exceptions import AdagentsValidationError + + oversized_body = b"x" * (MAX_POINTER_BYTES + 1) + mock_client = MagicMock() + + def _stream(method, url, **kwargs): + response = _make_stream_response(status_code=200, body=oversized_body) + return _stream_cm(response) + + mock_client.stream = MagicMock(side_effect=_stream) + + with pytest.raises(AdagentsValidationError, match="size cap"): + await fetch_adagents("example.com", client=mock_client) + + @pytest.mark.asyncio + async def test_pointer_body_under_5mb_accepted(self): + from adcp.adagents import fetch_adagents + + body = { + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "All", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } + ] + } + mock_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": body} + ) + result = await fetch_adagents("example.com", client=mock_client) + assert result == body + + @pytest.mark.asyncio + async def test_pointer_content_length_over_cap_rejected_up_front(self): + from adcp.adagents import MAX_POINTER_BYTES, fetch_adagents + from adcp.exceptions import AdagentsValidationError + + mock_client = MagicMock() + + def _stream(method, url, **kwargs): + # The cap is enforced from Content-Length before any bytes + # are streamed, so the body itself doesn't need to be large. + response = _make_stream_response( + status_code=200, + body=b"{}", + headers={"content-length": str(MAX_POINTER_BYTES + 100)}, + ) + return _stream_cm(response) + + mock_client.stream = MagicMock(side_effect=_stream) + + with pytest.raises(AdagentsValidationError, match="Content-Length"): + await fetch_adagents("example.com", client=mock_client) + + @pytest.mark.asyncio + async def test_authoritative_hop_uses_20mb_cap(self): + # A body that's larger than the pointer cap but under the + # authoritative cap MUST be accepted when served as the second + # hop. We simulate this with an in-band body that's 6 MB — over + # the 5 MB pointer cap, under the 20 MB authoritative cap. + import adcp.adagents as adagents_module + from adcp.adagents import MAX_POINTER_BYTES, fetch_adagents + + pointer = {"authoritative_location": "https://cdn.example.com/adagents.json"} + large_body = { + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "x", + "authorization_type": "property_ids", + "property_ids": ["p1"], + # Inject 6 MB of padding into a permissive (extra='allow') key. + "padding": "x" * (MAX_POINTER_BYTES + 1024 * 1024), + } + ] + } + mock_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": pointer} + ) + redirect_client = make_url_dispatching_client( + {"https://cdn.example.com/adagents.json": large_body} + ) + + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: redirect_client + ): + result = await fetch_adagents("example.com", client=mock_client) + + assert result["authorized_agents"][0]["property_ids"] == ["p1"] + + @pytest.mark.asyncio + async def test_authoritative_hop_rejects_over_20mb(self): + import adcp.adagents as adagents_module + from adcp.adagents import MAX_AUTHORITATIVE_BYTES, fetch_adagents + from adcp.exceptions import AdagentsValidationError + + pointer = {"authoritative_location": "https://cdn.example.com/adagents.json"} + oversized_body = b"x" * (MAX_AUTHORITATIVE_BYTES + 1) + + mock_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": pointer} + ) + redirect_client = MagicMock() + + def _stream(method, url, **kwargs): + response = _make_stream_response(status_code=200, body=oversized_body) + return _stream_cm(response) + + redirect_client.stream = MagicMock(side_effect=_stream) + redirect_client.__aenter__ = AsyncMock(return_value=redirect_client) + redirect_client.__aexit__ = AsyncMock(return_value=None) + + with unittest.mock.patch.object( + adagents_module.httpx, "AsyncClient", lambda *a, **kw: redirect_client + ): + with pytest.raises(AdagentsValidationError, match="size cap"): + await fetch_adagents("example.com", client=mock_client) From fcb967c142a023c01a4b0bf14d34e3121247e72e Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Wed, 20 May 2026 09:48:56 -0700 Subject: [PATCH 2/2] =?UTF-8?q?fix(adagents):=20address=20expert=20review?= =?UTF-8?q?=20on=20#729=20=E2=80=94=20SSRF=20+=20validator=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three reviewer passes (protocol, security, code) on 5160ab0b surfaced one must-fix per area; this commit closes them. Security: - `_stream_capped` now passes `follow_redirects=False`. HTTP 30x cannot be used to slip past the SSRF gate that protects `authoritative_location` (which is the only sanctioned cross-host delegation path). - `_validate_publisher_domain` calls the same SSRF gate (`_check_safe_host`) used for redirect targets, so IP literals and internal hostnames are rejected on the first hop too. `is_multicast` added to the IP class set. - Cache validators (ETag / Last-Modified) are truncated above 256 bytes before being persisted, so a hostile origin cannot inflate every subsequent conditional request. - The JSON-decode error path catches `json.JSONDecodeError` (not bare `Exception`) and truncates the upstream-derived message to 200 chars to bound log injection. Protocol: - `validate_adagents` keyed off `agents`; the schema field is `authorized_agents`. The new `validate_publisher_properties_item` XOR enforcer was therefore dead at the integration level. Now lit up. Code review: - Drop dead "304/" token from the redirect-hop error message. - Drop dead ternary in the 304-on-first-hop return — `is_redirect` can never be true there. - Fix misleading comment on `_fanout_publisher_properties` (the original compact entry is *not* retained — the docstring claimed otherwise). - Add drift-detection test that asserts `_REVOCATION_REASONS` matches the generated `Reason` enum. Tests: - Multi-chunk streaming-cap test (exercises the running-total guard without relying on a single oversized chunk). - Mixed compact + expanded entries in the same `publisher_properties[]` array, with order preservation. - Cache entry carrying only `Last-Modified` (no ETag) — verify the conditional headers and the 304 cache-hit path. - Three existing test fixtures updated to be schema-compliant (`authorization_type` + `property_ids`) so the now-live integration validator no longer rejects them as pre-discriminator legacy. Deferred follow-ups (not blocking this PR): - DNS pinning to harden against rebinding attacks (security H1). - Pydantic `model_validator` for selector XOR — datamodel-codegen can't emit `allOf[not[required[both]]]`; the runtime validator covers the dict path, but typed Pydantic consumers still need an explicit gate. - Revocation filtering on the `inline_properties` branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/adagents.py | 102 ++++++++++++++++------- src/adcp/validation/legacy.py | 11 ++- tests/test_adagents.py | 147 +++++++++++++++++++++++++++++++++- 3 files changed, 223 insertions(+), 37 deletions(-) diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index 417495a40..cbb7720ef 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -168,6 +168,37 @@ def _normalize_domain(domain: str) -> str: return domain +# Hostnames that resolve to cloud metadata services or local-only namespaces. +# `.internal` is the GCP convention; `.local` is RFC 6762 mDNS. +_INTERNAL_HOSTNAMES: frozenset[str] = frozenset( + {"localhost", "localhost.localdomain", "metadata.google.internal"} +) + + +def _check_safe_host(hostname: str, context: str) -> None: + """Reject hostnames that target loopback, link-local, private, or metadata services. + + Used for every outbound HTTP target derived from publisher-controlled + input (publisher_domain, authoritative_location, MANAGERDOMAIN). This + is a string-level gate — it catches IP literals and well-known + private hostnames, but does not pin DNS resolution. A hostile DNS + server that returns a public IP on first lookup and a private IP on + connect (DNS rebinding) is out of scope; see security follow-up. + """ + if not hostname: + raise AdagentsValidationError(f"{context} must have a hostname") + if hostname in _INTERNAL_HOSTNAMES or hostname.endswith(".local"): + raise AdagentsValidationError(f"{context} must not target localhost or internal hostnames") + if hostname.endswith(".internal"): + raise AdagentsValidationError(f"{context} must not target an .internal hostname") + try: + ip = ipaddress.ip_address(hostname) + except ValueError: + return + if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved or ip.is_multicast: + raise AdagentsValidationError(f"{context} must not target a private/reserved address") + + def _validate_publisher_domain(domain: str) -> str: """Validate and sanitize publisher domain for security. @@ -213,6 +244,9 @@ def _validate_publisher_domain(domain: str) -> str: if "." not in domain: raise AdagentsValidationError(f"Publisher domain must contain at least one dot: {domain!r}") + # SSRF gate: reject IP literals and internal hostnames. + _check_safe_host(domain, "publisher_domain") + return domain @@ -229,21 +263,7 @@ def _validate_redirect_url(url: str) -> None: AdagentsValidationError: If the URL targets a private/reserved address """ parsed = urlparse(url) - hostname = parsed.hostname or "" - - # Reject localhost by name - if hostname in ("localhost", "localhost.localdomain") or hostname.endswith(".local"): - raise AdagentsValidationError("authoritative_location must not target localhost") - - # Reject private/reserved IP addresses - try: - ip = ipaddress.ip_address(hostname) - if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved: - raise AdagentsValidationError( - "authoritative_location must not target private/reserved addresses" - ) - except ValueError: - pass # Not an IP literal — hostname is fine + _check_safe_host(parsed.hostname or "", "authoritative_location") def normalize_url(url: str) -> str: @@ -550,16 +570,16 @@ async def _resolve_direct( # really an upstream pointer failure. if is_redirect: raise AdagentsValidationError( - f"authoritative_location target returned 304/404: {url}" + f"authoritative_location target returned 404: {url}" ) from None raise if not_modified: - # 304 on the first hop: return the cached body and stop — - # the cached body is the previously-resolved authoritative - # data, so no second hop is needed. - discovery: DiscoveryMethod = "authoritative_location" if is_redirect else "direct" - return data, discovery, etag, last_modified, True + # 304 is only ever returned on the first hop: hop_cache is None + # on the redirected hop (see above), and _fetch_adagents_url + # raises if a server returns 304 without a cache_entry. So the + # discovery method here is always "direct". + return data, "direct", etag, last_modified, True if "authoritative_location" in data and "authorized_agents" not in data: authoritative_url = data["authoritative_location"] @@ -968,8 +988,8 @@ async def _fetch_adagents_url( ) return ( cache_entry.body, - response_headers.get("etag") or cache_entry.etag, - response_headers.get("last-modified") or cache_entry.last_modified, + _safe_validator(response_headers.get("etag")) or cache_entry.etag, + _safe_validator(response_headers.get("last-modified")) or cache_entry.last_modified, True, ) @@ -982,8 +1002,11 @@ async def _fetch_adagents_url( try: data = json.loads(body) - except Exception as e: - raise AdagentsValidationError(f"Invalid JSON in adagents.json: {e}") from e + except json.JSONDecodeError as e: + # Truncate the upstream-derived error to bound log volume — a + # malicious server can otherwise force unbounded `str(e)` content + # into caller logs by sending a large unparsable body. + raise AdagentsValidationError(f"Invalid JSON in adagents.json: {str(e)[:200]}") from e if not isinstance(data, dict): raise AdagentsValidationError("adagents.json must be a JSON object") @@ -1001,7 +1024,24 @@ async def _fetch_adagents_url( "adagents.json must have either 'authorized_agents' or 'authoritative_location'" ) - return data, response_headers.get("etag"), response_headers.get("last-modified"), False + return ( + data, + _safe_validator(response_headers.get("etag")), + _safe_validator(response_headers.get("last-modified")), + False, + ) + + +# Cache validators (ETag / Last-Modified) are replayed on the next fetch, so +# an unbounded value sent back by a hostile server would balloon every future +# request. RFC 9110 doesn't cap either header; this is purely defensive. +_MAX_VALIDATOR_LEN = 256 + + +def _safe_validator(value: str | None) -> str | None: + if value is None or len(value) > _MAX_VALIDATOR_LEN: + return None + return value async def _stream_capped( @@ -1018,8 +1058,12 @@ async def _stream_capped( rejected up-front; servers that omit the header (or lie) are still caught by the running total inside the loop. """ + # follow_redirects=False: HTTP 30x is not how adagents.json delegates. + # Cross-host delegation goes through the explicit `authoritative_location` + # field, which passes through _validate_redirect_url. Allowing httpx to + # transparently follow 30x would bypass that SSRF gate. async with client.stream( - "GET", url, headers=headers, timeout=timeout, follow_redirects=True + "GET", url, headers=headers, timeout=timeout, follow_redirects=False ) as response: if response.status_code == 304: return b"", 304, response.headers @@ -1129,9 +1173,7 @@ def _resolve_agent_properties( # Handle publisher_properties (cross-domain references). # Each entry with publisher_domains[a,b,c] fans out to one selector per # listed domain — the compact form is exactly equivalent to repeating - # the entry once per publisher per adcp#4504. The original entry is - # also retained so callers that want the as-authored compact form for - # diff stability can still see it. + # the entry once per publisher per adcp#4504. if authorization_type == "publisher_properties": publisher_props = agent.get("publisher_properties", []) if not isinstance(publisher_props, list): diff --git a/src/adcp/validation/legacy.py b/src/adcp/validation/legacy.py index de76735fe..0012430dc 100644 --- a/src/adcp/validation/legacy.py +++ b/src/adcp/validation/legacy.py @@ -202,6 +202,9 @@ def validate_product(product: dict[str, Any]) -> None: validate_publisher_properties_item(item) +# Must stay in sync with the `Reason` enum on RevokedPublisherDomain in +# the generated adagents schema. A drift test in +# tests/test_adagents.py compares this frozenset to the generated enum. _REVOCATION_REASONS = frozenset( {"relationship_ended", "compliance_violation", "publisher_request", "other"} ) @@ -246,9 +249,11 @@ def validate_adagents(adagents: dict[str, Any]) -> None: Raises: ValidationError: If validation fails """ - if "agents" in adagents: - for agent in adagents["agents"]: - validate_agent_authorization(agent) + authorized_agents = adagents.get("authorized_agents") + if isinstance(authorized_agents, list): + for agent in authorized_agents: + if isinstance(agent, dict): + validate_agent_authorization(agent) revoked = adagents.get("revoked_publisher_domains") if revoked is not None: diff --git a/tests/test_adagents.py b/tests/test_adagents.py index 2ad8225d1..e977c9851 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -2031,7 +2031,14 @@ async def test_direct_discovery(self): from adcp.adagents import validate_adagents_domain adagents = { - "authorized_agents": [{"url": "https://agent.example.com", "authorized_for": "All"}] + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "All", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } + ] } def handler(url): @@ -2059,7 +2066,14 @@ async def test_authoritative_location_discovery(self): "authoritative_location": "https://cdn.example.com/adagents.json", } resolved = { - "authorized_agents": [{"url": "https://agent.example.com", "authorized_for": "All"}] + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "All", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } + ] } # Initial fetch (publisher) returns the redirect stub. @@ -2089,7 +2103,12 @@ async def test_ads_txt_managerdomain_fallback(self): manager_adagents = { "authorized_agents": [ - {"url": "https://agent.example", "authorized_for": "Managed inventory"} + { + "url": "https://agent.example", + "authorized_for": "Managed inventory", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } ] } @@ -2144,7 +2163,14 @@ async def test_duplicate_managerdomain_last_wins(self): from adcp.adagents import validate_adagents_domain manager_adagents = { - "authorized_agents": [{"url": "https://agent.example", "authorized_for": "All"}] + "authorized_agents": [ + { + "url": "https://agent.example", + "authorized_for": "All", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } + ] } def handler(url): @@ -2748,6 +2774,37 @@ def test_fanout_expands_compact_form(self): # publisher_domains stripped from each expanded selector assert all("publisher_domains" not in s for s in out) + def test_fanout_preserves_mixed_compact_and_expanded_in_order(self): + # adcp#4504 allows both forms in the same publisher_properties[] + # array. Order must be preserved; compact entries fan out in-place. + from adcp.adagents import _fanout_publisher_properties + + out = _fanout_publisher_properties( + [ + { + "selection_type": "by_tag", + "property_tags": ["ctv"], + "publisher_domain": "first.example", + }, + { + "selection_type": "by_tag", + "property_tags": ["ctv"], + "publisher_domains": ["b1.example", "b2.example"], + }, + { + "selection_type": "by_tag", + "property_tags": ["ctv"], + "publisher_domain": "last.example", + }, + ] + ) + assert [s["publisher_domain"] for s in out] == [ + "first.example", + "b1.example", + "b2.example", + "last.example", + ] + def test_fanout_skips_invalid_compact_entries(self): from adcp.adagents import _fanout_publisher_properties @@ -2856,6 +2913,15 @@ def test_compact_form_accepts_selection_type_all(self): class TestRevokedPublisherDomains: """adcp#4504: ``revoked_publisher_domains[]`` filter takes precedence.""" + def test_revocation_reasons_match_generated_enum(self): + # The validator's hard-coded reason set must stay in sync with the + # generated `Reason` enum. Drift here would silently reject valid + # values (or accept invalid ones) when the schema regen runs. + from adcp.types.generated_poc.adagents import Reason + from adcp.validation.legacy import _REVOCATION_REASONS + + assert _REVOCATION_REASONS == frozenset(r.value for r in Reason) + def test_revocation_filters_compact_form_selectors(self): adagents = { "revoked_publisher_domains": [ @@ -3016,6 +3082,46 @@ async def test_304_returns_cached_body(self): assert sent_headers.get("If-None-Match") == '"abc123"' assert sent_headers.get("If-Modified-Since") == "Mon, 01 Jan 2026 00:00:00 GMT" + @pytest.mark.asyncio + async def test_cache_entry_with_only_last_modified_sends_only_ims(self): + # A cache entry can legitimately carry only Last-Modified (no + # ETag) — origins differ. Verify we send If-Modified-Since alone + # and the 304 path still serves the cached body. + from adcp.adagents import AdagentsCacheEntry, fetch_adagents_with_cache + + cached_body = { + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "All", + "authorization_type": "property_ids", + "property_ids": ["p1"], + } + ] + } + cache_entry = AdagentsCacheEntry( + body=cached_body, + etag=None, + last_modified="Mon, 01 Jan 2026 00:00:00 GMT", + ) + mock_client = make_url_dispatching_client( + {"https://example.com/.well-known/adagents.json": (None, 304, {})} + ) + + result = await fetch_adagents_with_cache( + "example.com", cache_entry=cache_entry, client=mock_client + ) + assert result.not_modified is True + assert result.data == cached_body + # The cache validators are echoed back when the 304 carries no + # fresh ones — IMS persists, etag remains None. + assert result.last_modified == "Mon, 01 Jan 2026 00:00:00 GMT" + assert result.etag is None + + sent_headers = mock_client.stream.call_args.kwargs["headers"] + assert sent_headers.get("If-Modified-Since") == "Mon, 01 Jan 2026 00:00:00 GMT" + assert "If-None-Match" not in sent_headers + @pytest.mark.asyncio async def test_200_returns_fresh_validators(self): from adcp.adagents import fetch_adagents_with_cache @@ -3079,6 +3185,39 @@ def _stream(method, url, **kwargs): with pytest.raises(AdagentsValidationError, match="size cap"): await fetch_adagents("example.com", client=mock_client) + @pytest.mark.asyncio + async def test_streaming_cap_enforced_across_chunks_without_content_length(self): + # When Content-Length is absent (chunked transfer-encoding), the + # cap MUST come from the running-total guard inside the stream + # loop. A single big body would exercise that too, but using many + # small chunks demonstrates the loop guard is hit mid-stream, not + # only on the final accumulator size. + from adcp.adagents import MAX_POINTER_BYTES, fetch_adagents + from adcp.exceptions import AdagentsValidationError + + chunk_size = 256 * 1024 + chunk_count = (MAX_POINTER_BYTES // chunk_size) + 2 + + response = MagicMock() + response.status_code = 200 + response.headers = httpx.Headers({}) + + async def aiter_bytes(): + for _ in range(chunk_count): + yield b"x" * chunk_size + + response.aiter_bytes = aiter_bytes + + mock_client = MagicMock() + + def _stream(method, url, **kwargs): + return _stream_cm(response) + + mock_client.stream = MagicMock(side_effect=_stream) + + with pytest.raises(AdagentsValidationError, match="size cap"): + await fetch_adagents("example.com", client=mock_client) + @pytest.mark.asyncio async def test_pointer_body_under_5mb_accepted(self): from adcp.adagents import fetch_adagents