diff --git a/README.md b/README.md index d27b993c..2c0535c0 100644 --- a/README.md +++ b/README.md @@ -424,6 +424,50 @@ auth = index.get_agent_authorizations("https://agent-x.com") premium = index.find_agents_by_property_tags(["premium", "ctv"]) ``` +## Publisher Authorization Validation + +Verify sales agents are authorized to sell publisher properties via adagents.json: + +```python +from adcp import ( + fetch_adagents, + verify_agent_authorization, + verify_agent_for_property, +) + +# Fetch and parse adagents.json from publisher +adagents_data = await fetch_adagents("publisher.com") + +# Verify agent authorization for a property +is_authorized = verify_agent_authorization( + adagents_data=adagents_data, + agent_url="https://sales-agent.example.com", + property_type="website", + property_identifiers=[{"type": "domain", "value": "publisher.com"}] +) + +# Or use convenience wrapper (fetch + verify in one call) +is_authorized = await verify_agent_for_property( + publisher_domain="publisher.com", + agent_url="https://sales-agent.example.com", + property_identifiers=[{"type": "domain", "value": "publisher.com"}], + property_type="website" +) +``` + +**Domain Matching Rules:** +- Exact match: `example.com` matches `example.com` +- Common subdomains: `www.example.com` matches `example.com` +- Wildcards: `api.example.com` matches `*.example.com` +- Protocol-agnostic: `http://agent.com` matches `https://agent.com` + +**Use Cases:** +- Sales agents verify authorization before accepting media buys +- Publishers test their adagents.json files +- Developer tools build authorization validators + +See `examples/adagents_validation.py` for complete examples. + ## CLI Tool The `adcp` command-line tool provides easy interaction with AdCP agents without writing code. diff --git a/examples/adagents_validation.py b/examples/adagents_validation.py new file mode 100644 index 00000000..efe3a4ce --- /dev/null +++ b/examples/adagents_validation.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 +""" +Example: Validating Publisher Authorization with adagents.json + +This example demonstrates how to use the adagents validation utilities +to verify that a sales agent is authorized to sell ads for a publisher's +properties. +""" + +import asyncio + +from adcp import ( + AdagentsNotFoundError, + AdagentsValidationError, + fetch_adagents, + verify_agent_authorization, + verify_agent_for_property, +) + + +async def example_fetch_and_verify(): + """Example: Fetch adagents.json and verify authorization.""" + print("=" * 60) + print("Example 1: Fetch and Verify Authorization") + print("=" * 60) + + publisher_domain = "example-publisher.com" + agent_url = "https://sales-agent.example.com" + + try: + # Fetch the adagents.json file from the publisher + print(f"\n1. Fetching adagents.json from {publisher_domain}...") + adagents_data = await fetch_adagents(publisher_domain) + print(f" ✓ Found {len(adagents_data['authorized_agents'])} authorized agents") + + # Verify if our agent is authorized for a specific property + print(f"\n2. Checking if {agent_url} is authorized...") + is_authorized = verify_agent_authorization( + adagents_data=adagents_data, + agent_url=agent_url, + property_type="website", + property_identifiers=[{"type": "domain", "value": "example-publisher.com"}], + ) + + if is_authorized: + print(" ✓ Agent is authorized for this property") + else: + print(" ✗ Agent is NOT authorized for this property") + + except AdagentsNotFoundError as e: + print(f" ✗ Error: {e}") + print(" The publisher has not deployed an adagents.json file") + except AdagentsValidationError as e: + print(f" ✗ Validation Error: {e}") + + +async def example_convenience_wrapper(): + """Example: Use the convenience wrapper for one-step verification.""" + print("\n\n" + "=" * 60) + print("Example 2: Convenience Wrapper (Fetch + Verify)") + print("=" * 60) + + try: + # Single function call to fetch and verify + print("\nChecking authorization in one step...") + is_authorized = await verify_agent_for_property( + publisher_domain="example-publisher.com", + agent_url="https://sales-agent.example.com", + property_identifiers=[{"type": "domain", "value": "example-publisher.com"}], + property_type="website", + ) + + if is_authorized: + print("✓ Agent is authorized!") + else: + print("✗ Agent is NOT authorized") + + except Exception as e: + print(f"✗ Error: {e}") + + +def example_manual_verification(): + """Example: Manual verification with pre-fetched data.""" + print("\n\n" + "=" * 60) + print("Example 3: Manual Verification with Pre-fetched Data") + print("=" * 60) + + # Example adagents.json data structure + adagents_data = { + "authorized_agents": [ + { + "url": "https://sales-agent.example.com", + "properties": [ + { + "property_type": "website", + "name": "Main Website", + "identifiers": [{"type": "domain", "value": "example.com"}], + }, + { + "property_type": "mobile_app", + "name": "iOS App", + "identifiers": [{"type": "bundle_id", "value": "com.example.app"}], + }, + ], + }, + { + "url": "https://another-agent.com", + "properties": [], # Empty properties = authorized for all + }, + ] + } + + # Test various scenarios + print("\nScenario 1: Agent authorized for website") + result = verify_agent_authorization( + adagents_data, + "https://sales-agent.example.com", + "website", + [{"type": "domain", "value": "www.example.com"}], # www subdomain + ) + print(f" Result: {result} (www subdomain matches example.com)") + + print("\nScenario 2: Agent authorized for mobile app") + result = verify_agent_authorization( + adagents_data, + "https://sales-agent.example.com", + "mobile_app", + [{"type": "bundle_id", "value": "com.example.app"}], + ) + print(f" Result: {result}") + + print("\nScenario 3: Agent NOT authorized for different property") + result = verify_agent_authorization( + adagents_data, + "https://sales-agent.example.com", + "website", + [{"type": "domain", "value": "different.com"}], + ) + print(f" Result: {result}") + + print("\nScenario 4: Agent with empty properties = authorized for all") + result = verify_agent_authorization( + adagents_data, "https://another-agent.com", "website", [{"type": "domain", "value": "any.com"}] + ) + print(f" Result: {result}") + + print("\nScenario 5: Protocol-agnostic matching (http vs https)") + result = verify_agent_authorization( + adagents_data, + "http://sales-agent.example.com", # http instead of https + "website", + [{"type": "domain", "value": "example.com"}], + ) + print(f" Result: {result} (protocol ignored)") + + +def example_property_discovery(): + """Example: Discover all properties and tags from adagents.json.""" + print("\n\n" + "=" * 60) + print("Example 4: Property and Tag Discovery") + print("=" * 60) + + from adcp import get_all_properties, get_all_tags, get_properties_by_agent + + # Example adagents.json with tags + adagents_data = { + "authorized_agents": [ + { + "url": "https://sales-agent-1.example.com", + "properties": [ + { + "property_type": "website", + "name": "News Site", + "identifiers": [{"type": "domain", "value": "news.example.com"}], + "tags": ["premium", "news", "desktop"], + }, + { + "property_type": "mobile_app", + "name": "News App", + "identifiers": [{"type": "bundle_id", "value": "com.example.news"}], + "tags": ["premium", "news", "mobile"], + }, + ], + }, + { + "url": "https://sales-agent-2.example.com", + "properties": [ + { + "property_type": "website", + "name": "Sports Site", + "identifiers": [{"type": "domain", "value": "sports.example.com"}], + "tags": ["sports", "live-streaming"], + } + ], + }, + ] + } + + print("\n1. Get all properties across all agents:") + all_props = get_all_properties(adagents_data) + print(f" Found {len(all_props)} total properties") + for prop in all_props: + print(f" - {prop['name']} ({prop['property_type']}) - Agent: {prop['agent_url']}") + + print("\n2. Get all unique tags:") + all_tags = get_all_tags(adagents_data) + print(f" Tags: {', '.join(sorted(all_tags))}") + + print("\n3. Get properties for a specific agent:") + agent_props = get_properties_by_agent(adagents_data, "https://sales-agent-1.example.com") + print(f" Agent 1 has {len(agent_props)} properties:") + for prop in agent_props: + print(f" - {prop['name']} (tags: {', '.join(prop.get('tags', []))})") + + +def example_domain_matching(): + """Example: Domain matching rules.""" + print("\n\n" + "=" * 60) + print("Example 5: Domain Matching Rules") + print("=" * 60) + + from adcp import domain_matches + + print("\n1. Exact match:") + print(f" example.com == example.com: {domain_matches('example.com', 'example.com')}") + + print("\n2. Common subdomains (www, m) match bare domain:") + print(f" www.example.com matches example.com: {domain_matches('www.example.com', 'example.com')}") + print(f" m.example.com matches example.com: {domain_matches('m.example.com', 'example.com')}") + + print("\n3. Other subdomains DON'T match bare domain:") + print( + f" api.example.com matches example.com: {domain_matches('api.example.com', 'example.com')}" + ) + + print("\n4. Wildcard pattern matches all subdomains:") + print( + f" api.example.com matches *.example.com: {domain_matches('api.example.com', '*.example.com')}" + ) + print( + f" www.example.com matches *.example.com: {domain_matches('www.example.com', '*.example.com')}" + ) + + print("\n5. Case-insensitive matching:") + print(f" Example.COM matches example.com: {domain_matches('Example.COM', 'example.com')}") + + +async def main(): + """Run all examples.""" + print("\n🔍 AdCP adagents.json Validation Examples\n") + + # Note: Examples 1 and 2 would require actual HTTP requests + # Uncomment to test with real domains: + # await example_fetch_and_verify() + # await example_convenience_wrapper() + + # These examples work with mock data: + example_manual_verification() + example_property_discovery() + example_domain_matching() + + print("\n\n" + "=" * 60) + print("Summary") + print("=" * 60) + print(""" +Key Functions: +1. fetch_adagents(domain) - Fetch and validate adagents.json +2. verify_agent_authorization(data, agent_url, ...) - Check authorization +3. verify_agent_for_property(domain, agent_url, ...) - Convenience wrapper +4. get_all_properties(data) - Extract all properties from all agents +5. get_all_tags(data) - Get all unique tags across properties +6. get_properties_by_agent(data, agent_url) - Get properties for specific agent +7. domain_matches(prop_domain, pattern) - Domain matching rules +8. identifiers_match(prop_ids, agent_ids) - Identifier matching + +Use Cases: +- Sales agents: Verify authorization before accepting media buys +- Publishers: Test their adagents.json files are correctly formatted +- Developer tools: Build validators and testing utilities + +See the full API documentation for more details. + """) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index 88ff30d8..3d90bd86 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -7,8 +7,21 @@ Supports both A2A and MCP protocols with full type safety. """ +from adcp.adagents import ( + domain_matches, + fetch_adagents, + get_all_properties, + get_all_tags, + get_properties_by_agent, + identifiers_match, + verify_agent_authorization, + verify_agent_for_property, +) from adcp.client import ADCPClient, ADCPMultiAgentClient from adcp.exceptions import ( + AdagentsNotFoundError, + AdagentsTimeoutError, + AdagentsValidationError, ADCPAuthenticationError, ADCPConnectionError, ADCPError, @@ -162,6 +175,15 @@ "TaskResult", "TaskStatus", "WebhookMetadata", + # Adagents validation + "fetch_adagents", + "verify_agent_authorization", + "verify_agent_for_property", + "domain_matches", + "identifiers_match", + "get_all_properties", + "get_all_tags", + "get_properties_by_agent", # Test helpers "test_agent", "test_agent_a2a", @@ -185,6 +207,9 @@ "ADCPToolNotFoundError", "ADCPWebhookError", "ADCPWebhookSignatureError", + "AdagentsValidationError", + "AdagentsNotFoundError", + "AdagentsTimeoutError", # Request/Response types "ActivateSignalRequest", "ActivateSignalResponse", diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py new file mode 100644 index 00000000..9bc2c428 --- /dev/null +++ b/src/adcp/adagents.py @@ -0,0 +1,521 @@ +from __future__ import annotations + +""" +Utilities for fetching, parsing, and validating adagents.json files per the AdCP specification. + +Publishers declare authorized sales agents via adagents.json files hosted at +https://{publisher_domain}/.well-known/adagents.json. This module provides utilities +for sales agents to verify they are authorized for specific properties. +""" + +from typing import Any +from urllib.parse import urlparse + +import httpx + +from adcp.exceptions import AdagentsNotFoundError, AdagentsTimeoutError, AdagentsValidationError + + +def _normalize_domain(domain: str) -> str: + """Normalize domain for comparison - strip, lowercase, remove trailing dots/slashes. + + Args: + domain: Domain to normalize + + Returns: + Normalized domain string + + Raises: + AdagentsValidationError: If domain contains invalid patterns + """ + domain = domain.strip().lower() + # Remove both trailing slashes and dots iteratively + while domain.endswith("/") or domain.endswith("."): + domain = domain.rstrip("/").rstrip(".") + + # Check for invalid patterns + if not domain or ".." in domain: + raise AdagentsValidationError(f"Invalid domain format: {domain!r}") + + return domain + + +def _validate_publisher_domain(domain: str) -> str: + """Validate and sanitize publisher domain for security. + + Args: + domain: Publisher domain to validate + + Returns: + Validated and normalized domain + + Raises: + AdagentsValidationError: If domain is invalid or contains suspicious characters + """ + # Check for suspicious characters BEFORE stripping (to catch injection attempts) + suspicious_chars = ["\\", "@", "\n", "\r", "\t"] + for char in suspicious_chars: + if char in domain: + raise AdagentsValidationError( + f"Invalid character in publisher domain: {char!r}" + ) + + domain = domain.strip() + + # Check basic constraints + if not domain: + raise AdagentsValidationError("Publisher domain cannot be empty") + if len(domain) > 253: # DNS maximum length + raise AdagentsValidationError(f"Publisher domain too long: {len(domain)} chars (max 253)") + + # Check for spaces after stripping leading/trailing whitespace + if " " in domain: + raise AdagentsValidationError( + "Invalid character in publisher domain: ' '" + ) + + # Remove protocol if present (common user error) - do this BEFORE checking for slashes + if "://" in domain: + domain = domain.split("://", 1)[1] + + # Remove path if present (should only be domain) - do this BEFORE checking for slashes + if "/" in domain: + domain = domain.split("/", 1)[0] + + # Normalize + domain = _normalize_domain(domain) + + # Final validation - must look like a domain + if "." not in domain: + raise AdagentsValidationError( + f"Publisher domain must contain at least one dot: {domain!r}" + ) + + return domain + + +def normalize_url(url: str) -> str: + """Normalize URL by removing protocol and trailing slash. + + Args: + url: URL to normalize + + Returns: + Normalized URL (domain/path without protocol or trailing slash) + """ + parsed = urlparse(url) + normalized = parsed.netloc + parsed.path + return normalized.rstrip("/") + + +def domain_matches(property_domain: str, agent_domain_pattern: str) -> bool: + """Check if domains match per AdCP rules. + + Rules: + - Exact match always succeeds + - 'example.com' matches www.example.com, m.example.com (common subdomains) + - 'subdomain.example.com' matches that specific subdomain only + - '*.example.com' matches all subdomains + + Args: + property_domain: Domain from property + agent_domain_pattern: Domain pattern from adagents.json + + Returns: + True if domains match per AdCP rules + """ + # Normalize both domains for comparison + try: + property_domain = _normalize_domain(property_domain) + agent_domain_pattern = _normalize_domain(agent_domain_pattern) + except AdagentsValidationError: + # Invalid domain format - no match + return False + + # Exact match + if property_domain == agent_domain_pattern: + return True + + # Wildcard pattern (*.example.com) + if agent_domain_pattern.startswith("*."): + base_domain = agent_domain_pattern[2:] + return property_domain.endswith(f".{base_domain}") + + # Bare domain matches common subdomains (www, m) + # If agent pattern is a bare domain (no subdomain), match www/m subdomains + if "." in agent_domain_pattern and not agent_domain_pattern.startswith("www."): + # Check if this looks like a bare domain (e.g., example.com) + parts = agent_domain_pattern.split(".") + if len(parts) == 2: # Looks like bare domain + common_subdomains = ["www", "m"] + for subdomain in common_subdomains: + if property_domain == f"{subdomain}.{agent_domain_pattern}": + return True + + return False + + +def identifiers_match( + property_identifiers: list[dict[str, str]], + agent_identifiers: list[dict[str, str]], +) -> bool: + """Check if any property identifier matches agent's authorized identifiers. + + Args: + property_identifiers: Identifiers from property + (e.g., [{"type": "domain", "value": "cnn.com"}]) + agent_identifiers: Identifiers from adagents.json + + Returns: + True if any identifier matches + + Notes: + - Domain identifiers use AdCP domain matching rules + - Other identifiers (bundle_id, roku_store_id, etc.) require exact match + """ + for prop_id in property_identifiers: + prop_type = prop_id.get("type", "") + prop_value = prop_id.get("value", "") + + for agent_id in agent_identifiers: + agent_type = agent_id.get("type", "") + agent_value = agent_id.get("value", "") + + # Type must match + if prop_type != agent_type: + continue + + # Domain identifiers use special matching rules + if prop_type == "domain": + if domain_matches(prop_value, agent_value): + return True + else: + # Other identifier types require exact match + if prop_value == agent_value: + return True + + return False + + +def verify_agent_authorization( + adagents_data: dict[str, Any], + agent_url: str, + property_type: str | None = None, + property_identifiers: list[dict[str, str]] | None = None, +) -> bool: + """Check if agent is authorized for a property. + + Args: + adagents_data: Parsed adagents.json data + agent_url: URL of the sales agent to verify + property_type: Type of property (website, app, etc.) - optional + property_identifiers: List of identifiers to match - optional + + Returns: + True if agent is authorized, False otherwise + + Raises: + AdagentsValidationError: If adagents_data is malformed + + Notes: + - If property_type/identifiers are None, checks if agent is authorized + for ANY property on this domain + - Implements AdCP domain matching rules + - Agent URLs are matched ignoring protocol and trailing slash + """ + # Validate structure + if not isinstance(adagents_data, dict): + raise AdagentsValidationError("adagents_data must be a dictionary") + + authorized_agents = adagents_data.get("authorized_agents") + if not isinstance(authorized_agents, list): + raise AdagentsValidationError("adagents.json must have 'authorized_agents' array") + + # Normalize the agent URL for comparison + normalized_agent_url = normalize_url(agent_url) + + # Check each authorized agent + for agent in authorized_agents: + if not isinstance(agent, dict): + continue + + agent_url_from_json = agent.get("url", "") + if not agent_url_from_json: + continue + + # Match agent URL (protocol-agnostic) + if normalize_url(agent_url_from_json) != normalized_agent_url: + continue + + # Found matching agent - now check properties + properties = agent.get("properties") + + # If properties field is missing or empty, agent is authorized for all properties + if properties is None or (isinstance(properties, list) and len(properties) == 0): + return True + + # If no property filters specified, we found the agent - authorized + if property_type is None and property_identifiers is None: + return True + + # Check specific property authorization + if isinstance(properties, list): + for prop in properties: + if not isinstance(prop, dict): + continue + + # Check property type if specified + if property_type is not None: + prop_type = prop.get("property_type", "") + if prop_type != property_type: + continue + + # Check identifiers if specified + if property_identifiers is not None: + prop_identifiers = prop.get("identifiers", []) + if not isinstance(prop_identifiers, list): + continue + + if identifiers_match(property_identifiers, prop_identifiers): + return True + else: + # Property type matched and no identifier check needed + return True + + return False + + +async def fetch_adagents( + publisher_domain: str, + timeout: float = 10.0, + user_agent: str = "AdCP-Client/1.0", + client: httpx.AsyncClient | None = None, +) -> dict[str, Any]: + """Fetch and parse adagents.json from publisher domain. + + Args: + publisher_domain: Domain hosting the adagents.json file + timeout: Request timeout in seconds + user_agent: User-Agent header for HTTP request + client: Optional httpx.AsyncClient for connection pooling. + If provided, caller is responsible for client lifecycle. + If None, a new client is created for this request. + + Returns: + Parsed adagents.json data + + Raises: + AdagentsNotFoundError: If adagents.json not found (404) + AdagentsValidationError: If JSON is invalid or malformed + AdagentsTimeoutError: If request times out + + Notes: + For production use with multiple requests, pass a shared httpx.AsyncClient + to enable connection pooling and improve performance. + """ + # Validate and normalize domain for security + publisher_domain = _validate_publisher_domain(publisher_domain) + + # Construct URL + url = f"https://{publisher_domain}/.well-known/adagents.json" + + try: + # Use provided client or create a new one + if client is not None: + # Reuse provided client (connection pooling) + response = await client.get( + url, + headers={"User-Agent": user_agent}, + timeout=timeout, + follow_redirects=True, + ) + else: + # Create new client for single request + async with httpx.AsyncClient() as new_client: + response = await new_client.get( + url, + headers={"User-Agent": user_agent}, + timeout=timeout, + follow_redirects=True, + ) + + # Process response (same for both paths) + if response.status_code == 404: + raise AdagentsNotFoundError(publisher_domain) + + if response.status_code != 200: + raise AdagentsValidationError( + f"Failed to fetch adagents.json: HTTP {response.status_code}" + ) + + # Parse JSON + try: + data = response.json() + except Exception as e: + raise AdagentsValidationError(f"Invalid JSON in adagents.json: {e}") from e + + # Validate basic structure + if not isinstance(data, dict): + raise AdagentsValidationError("adagents.json must be a JSON object") + + if "authorized_agents" not in data: + raise AdagentsValidationError( + "adagents.json must have 'authorized_agents' field" + ) + + if not isinstance(data["authorized_agents"], list): + raise AdagentsValidationError("'authorized_agents' must be an array") + + return data + + except httpx.TimeoutException as e: + raise AdagentsTimeoutError(publisher_domain, timeout) from e + except httpx.RequestError as e: + raise AdagentsValidationError(f"Failed to fetch adagents.json: {e}") from e + + +async def verify_agent_for_property( + publisher_domain: str, + agent_url: str, + property_identifiers: list[dict[str, str]], + property_type: str | None = None, + timeout: float = 10.0, + client: httpx.AsyncClient | None = None, +) -> bool: + """Convenience wrapper to fetch adagents.json and verify authorization in one call. + + Args: + publisher_domain: Domain hosting the adagents.json file + agent_url: URL of the sales agent to verify + property_identifiers: List of identifiers to match + property_type: Type of property (website, app, etc.) - optional + timeout: Request timeout in seconds + client: Optional httpx.AsyncClient for connection pooling + + Returns: + True if agent is authorized, False otherwise + + Raises: + AdagentsNotFoundError: If adagents.json not found (404) + AdagentsValidationError: If JSON is invalid or malformed + AdagentsTimeoutError: If request times out + """ + adagents_data = await fetch_adagents(publisher_domain, timeout=timeout, client=client) + return verify_agent_authorization( + adagents_data=adagents_data, + agent_url=agent_url, + property_type=property_type, + property_identifiers=property_identifiers, + ) + + +def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]: + """Extract all properties from adagents.json data. + + Args: + adagents_data: Parsed adagents.json data + + Returns: + List of all properties across all authorized agents, with agent_url added + + Raises: + AdagentsValidationError: If adagents_data is malformed + """ + if not isinstance(adagents_data, dict): + raise AdagentsValidationError("adagents_data must be a dictionary") + + authorized_agents = adagents_data.get("authorized_agents") + if not isinstance(authorized_agents, list): + raise AdagentsValidationError("adagents.json must have 'authorized_agents' array") + + properties = [] + for agent in authorized_agents: + if not isinstance(agent, dict): + continue + + agent_url = agent.get("url", "") + if not agent_url: + continue + + agent_properties = agent.get("properties", []) + if not isinstance(agent_properties, list): + continue + + # Add each property with the agent URL for reference + for prop in agent_properties: + if isinstance(prop, dict): + # Create a copy and add agent_url + prop_with_agent = {**prop, "agent_url": agent_url} + properties.append(prop_with_agent) + + return properties + + +def get_all_tags(adagents_data: dict[str, Any]) -> set[str]: + """Extract all unique tags from properties in adagents.json data. + + Args: + adagents_data: Parsed adagents.json data + + Returns: + Set of all unique tags across all properties + + Raises: + AdagentsValidationError: If adagents_data is malformed + """ + properties = get_all_properties(adagents_data) + tags = set() + + for prop in properties: + prop_tags = prop.get("tags", []) + if isinstance(prop_tags, list): + for tag in prop_tags: + if isinstance(tag, str): + tags.add(tag) + + return tags + + +def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> list[dict[str, Any]]: + """Get all properties authorized for a specific agent. + + Args: + adagents_data: Parsed adagents.json data + agent_url: URL of the agent to filter by + + Returns: + List of properties for the specified agent (empty if agent not found or no properties) + + Raises: + AdagentsValidationError: If adagents_data is malformed + """ + if not isinstance(adagents_data, dict): + raise AdagentsValidationError("adagents_data must be a dictionary") + + authorized_agents = adagents_data.get("authorized_agents") + if not isinstance(authorized_agents, list): + raise AdagentsValidationError("adagents.json must have 'authorized_agents' array") + + # Normalize the agent URL for comparison + normalized_agent_url = normalize_url(agent_url) + + for agent in authorized_agents: + if not isinstance(agent, dict): + continue + + agent_url_from_json = agent.get("url", "") + if not agent_url_from_json: + continue + + # Match agent URL (protocol-agnostic) + if normalize_url(agent_url_from_json) != normalized_agent_url: + continue + + # Found the agent - return their properties + properties = agent.get("properties", []) + if not isinstance(properties, list): + return [] + + return [p for p in properties if isinstance(p, dict)] + + return [] diff --git a/src/adcp/exceptions.py b/src/adcp/exceptions.py index 36e6ffdf..753a46dd 100644 --- a/src/adcp/exceptions.py +++ b/src/adcp/exceptions.py @@ -153,3 +153,33 @@ def __init__( f" # Handle error with full TaskResult context" ) super().__init__(message, agent_id, None, suggestion) + + +class AdagentsValidationError(ADCPError): + """Base error for adagents.json validation issues.""" + + +class AdagentsNotFoundError(AdagentsValidationError): + """adagents.json file not found (404).""" + + def __init__(self, publisher_domain: str): + """Initialize not found error.""" + message = f"adagents.json not found for domain: {publisher_domain}" + suggestion = ( + "Verify that the publisher has deployed adagents.json to:\n" + f" https://{publisher_domain}/.well-known/adagents.json" + ) + super().__init__(message, None, None, suggestion) + + +class AdagentsTimeoutError(AdagentsValidationError): + """Request for adagents.json timed out.""" + + def __init__(self, publisher_domain: str, timeout: float): + """Initialize timeout error.""" + message = f"Request to fetch adagents.json timed out after {timeout}s" + suggestion = ( + "The publisher's server may be slow or unresponsive.\n" + " Try increasing the timeout value or check the domain is correct." + ) + super().__init__(message, None, None, suggestion) diff --git a/tests/test_adagents.py b/tests/test_adagents.py new file mode 100644 index 00000000..624c0504 --- /dev/null +++ b/tests/test_adagents.py @@ -0,0 +1,613 @@ +from __future__ import annotations + +"""Tests for adagents.json validation functionality.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from adcp.adagents import ( + _normalize_domain, + _validate_publisher_domain, + domain_matches, + get_all_properties, + get_all_tags, + get_properties_by_agent, + identifiers_match, + verify_agent_authorization, +) +from adcp.exceptions import ( + AdagentsValidationError, +) + + +def create_mock_httpx_client(mock_response): + """Helper to create a properly mocked httpx.AsyncClient.""" + mock_get = AsyncMock(return_value=mock_response) + mock_client_instance = MagicMock() + mock_client_instance.get = mock_get + mock_client_instance.__aenter__.return_value = mock_client_instance + mock_client_instance.__aexit__.return_value = AsyncMock() + return mock_client_instance + + +class TestDomainNormalization: + """Test domain normalization function.""" + + def test_normalize_basic(self): + """Basic normalization should work.""" + assert _normalize_domain("Example.COM") == "example.com" + assert _normalize_domain(" example.com ") == "example.com" + + def test_normalize_trailing_slash(self): + """Should remove trailing slashes.""" + assert _normalize_domain("example.com/") == "example.com" + assert _normalize_domain("example.com///") == "example.com" + + def test_normalize_trailing_dot(self): + """Should remove trailing dots.""" + assert _normalize_domain("example.com.") == "example.com" + assert _normalize_domain("example.com...") == "example.com" + + def test_normalize_both(self): + """Should remove both trailing slashes and dots.""" + assert _normalize_domain("example.com/.") == "example.com" + + def test_normalize_invalid_double_dots(self): + """Double dots should raise error.""" + with pytest.raises(AdagentsValidationError, match="Invalid domain format"): + _normalize_domain("example..com") + + def test_normalize_empty(self): + """Empty string should raise error.""" + with pytest.raises(AdagentsValidationError, match="Invalid domain format"): + _normalize_domain("") + with pytest.raises(AdagentsValidationError, match="Invalid domain format"): + _normalize_domain(" ") + + +class TestPublisherDomainValidation: + """Test publisher domain validation for security.""" + + def test_validate_basic(self): + """Basic valid domains should pass.""" + assert _validate_publisher_domain("example.com") == "example.com" + assert _validate_publisher_domain("sub.example.com") == "sub.example.com" + + def test_validate_removes_protocol(self): + """Should strip protocol if present.""" + assert _validate_publisher_domain("https://example.com") == "example.com" + assert _validate_publisher_domain("http://example.com") == "example.com" + + def test_validate_removes_path(self): + """Should strip path if present.""" + assert _validate_publisher_domain("example.com/path") == "example.com" + assert _validate_publisher_domain("https://example.com/path") == "example.com" + + def test_validate_case_insensitive(self): + """Should normalize to lowercase.""" + assert _validate_publisher_domain("EXAMPLE.COM") == "example.com" + + def test_validate_empty(self): + """Empty domain should raise error.""" + with pytest.raises(AdagentsValidationError, match="cannot be empty"): + _validate_publisher_domain("") + with pytest.raises(AdagentsValidationError, match="cannot be empty"): + _validate_publisher_domain(" ") + + def test_validate_too_long(self): + """Domain exceeding DNS max length should raise error.""" + long_domain = "a" * 254 + with pytest.raises(AdagentsValidationError, match="too long"): + _validate_publisher_domain(long_domain) + + def test_validate_suspicious_chars(self): + """Suspicious characters should raise error.""" + with pytest.raises(AdagentsValidationError, match="Invalid character"): + _validate_publisher_domain("example.com\\malicious") + with pytest.raises(AdagentsValidationError, match="Invalid character"): + _validate_publisher_domain("user@example.com") + with pytest.raises(AdagentsValidationError, match="Invalid character"): + _validate_publisher_domain("example.com with spaces") + with pytest.raises(AdagentsValidationError, match="Invalid character"): + _validate_publisher_domain("example.com\n") + + def test_validate_no_dots(self): + """Domain without dots should raise error.""" + with pytest.raises(AdagentsValidationError, match="must contain at least one dot"): + _validate_publisher_domain("localhost") + + +class TestDomainMatching: + """Test domain matching logic per AdCP spec.""" + + def test_exact_match(self): + """Exact domain match should succeed.""" + assert domain_matches("example.com", "example.com") + assert domain_matches("sub.example.com", "sub.example.com") + + def test_case_insensitive(self): + """Domain matching should be case-insensitive.""" + assert domain_matches("Example.com", "example.com") + assert domain_matches("example.com", "EXAMPLE.COM") + + def test_bare_domain_matches_www(self): + """Bare domain should match www subdomain.""" + assert domain_matches("www.example.com", "example.com") + assert domain_matches("m.example.com", "example.com") + + def test_bare_domain_does_not_match_other_subdomains(self): + """Bare domain should NOT match arbitrary subdomains.""" + assert not domain_matches("api.example.com", "example.com") + assert not domain_matches("cdn.example.com", "example.com") + + def test_specific_subdomain_does_not_match_others(self): + """Specific subdomain should only match itself.""" + assert not domain_matches("www.example.com", "api.example.com") + assert domain_matches("api.example.com", "api.example.com") + + def test_wildcard_matches_all_subdomains(self): + """Wildcard pattern should match all subdomains.""" + assert domain_matches("www.example.com", "*.example.com") + assert domain_matches("api.example.com", "*.example.com") + assert domain_matches("cdn.example.com", "*.example.com") + assert domain_matches("sub.api.example.com", "*.example.com") + + def test_wildcard_does_not_match_base_domain(self): + """Wildcard should not match the base domain without subdomain.""" + assert not domain_matches("example.com", "*.example.com") + + def test_no_match_different_domains(self): + """Different domains should not match.""" + assert not domain_matches("example.com", "other.com") + assert not domain_matches("www.example.com", "other.com") + + +class TestIdentifierMatching: + """Test identifier matching logic.""" + + def test_domain_identifier_uses_domain_matching(self): + """Domain identifiers should use domain matching rules.""" + property_ids = [{"type": "domain", "value": "www.example.com"}] + agent_ids = [{"type": "domain", "value": "example.com"}] + assert identifiers_match(property_ids, agent_ids) + + def test_bundle_id_exact_match(self): + """Bundle IDs require exact match.""" + property_ids = [{"type": "bundle_id", "value": "com.example.app"}] + agent_ids = [{"type": "bundle_id", "value": "com.example.app"}] + assert identifiers_match(property_ids, agent_ids) + + def test_bundle_id_no_partial_match(self): + """Bundle IDs should not partially match.""" + property_ids = [{"type": "bundle_id", "value": "com.example.app"}] + agent_ids = [{"type": "bundle_id", "value": "com.example"}] + assert not identifiers_match(property_ids, agent_ids) + + def test_type_mismatch(self): + """Different identifier types should not match.""" + property_ids = [{"type": "domain", "value": "example.com"}] + agent_ids = [{"type": "bundle_id", "value": "example.com"}] + assert not identifiers_match(property_ids, agent_ids) + + def test_multiple_identifiers_any_match(self): + """Should match if ANY identifier matches.""" + property_ids = [ + {"type": "domain", "value": "example.com"}, + {"type": "bundle_id", "value": "com.example.app"}, + ] + agent_ids = [{"type": "bundle_id", "value": "com.example.app"}] + assert identifiers_match(property_ids, agent_ids) + + def test_no_match_empty_lists(self): + """Empty lists should not match.""" + assert not identifiers_match([], []) + assert not identifiers_match([{"type": "domain", "value": "example.com"}], []) + + +class TestVerifyAgentAuthorization: + """Test agent authorization verification.""" + + def test_agent_authorized_no_properties_restriction(self): + """Agent with empty properties array is authorized for all properties.""" + adagents_data = { + "authorized_agents": [{"url": "https://sales-agent.example.com", "properties": []}] + } + assert verify_agent_authorization( + adagents_data, "https://sales-agent.example.com", None, None + ) + + def test_agent_authorized_no_properties_field(self): + """Agent without properties field is authorized for all properties.""" + adagents_data = {"authorized_agents": [{"url": "https://sales-agent.example.com"}]} + assert verify_agent_authorization( + adagents_data, "https://sales-agent.example.com", None, None + ) + + def test_agent_url_protocol_agnostic(self): + """Agent URL matching should ignore protocol.""" + adagents_data = {"authorized_agents": [{"url": "https://sales-agent.example.com"}]} + assert verify_agent_authorization( + adagents_data, "http://sales-agent.example.com", None, None + ) + + def test_agent_url_trailing_slash_ignored(self): + """Agent URL matching should ignore trailing slash.""" + adagents_data = {"authorized_agents": [{"url": "https://sales-agent.example.com/"}]} + assert verify_agent_authorization( + adagents_data, "https://sales-agent.example.com", None, None + ) + + def test_agent_authorized_specific_property(self): + """Agent authorized for specific property type and identifiers.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://sales-agent.example.com", + "properties": [ + { + "property_type": "website", + "name": "Example Site", + "identifiers": [{"type": "domain", "value": "example.com"}], + } + ], + } + ] + } + assert verify_agent_authorization( + adagents_data, + "https://sales-agent.example.com", + "website", + [{"type": "domain", "value": "www.example.com"}], + ) + + def test_agent_not_authorized_wrong_property_type(self): + """Agent should not be authorized for wrong property type.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://sales-agent.example.com", + "properties": [ + { + "property_type": "website", + "identifiers": [{"type": "domain", "value": "example.com"}], + } + ], + } + ] + } + assert not verify_agent_authorization( + adagents_data, + "https://sales-agent.example.com", + "mobile_app", + [{"type": "domain", "value": "example.com"}], + ) + + def test_agent_not_authorized_wrong_identifier(self): + """Agent should not be authorized for wrong identifier.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://sales-agent.example.com", + "properties": [ + { + "property_type": "website", + "identifiers": [{"type": "domain", "value": "example.com"}], + } + ], + } + ] + } + assert not verify_agent_authorization( + adagents_data, + "https://sales-agent.example.com", + "website", + [{"type": "domain", "value": "other.com"}], + ) + + def test_agent_not_in_list(self): + """Agent not in authorized_agents list should not be authorized.""" + adagents_data = { + "authorized_agents": [{"url": "https://other-agent.example.com", "properties": []}] + } + assert not verify_agent_authorization( + adagents_data, "https://sales-agent.example.com", None, None + ) + + def test_multiple_agents(self): + """Should find correct agent in list.""" + adagents_data = { + "authorized_agents": [ + {"url": "https://agent1.example.com", "properties": []}, + {"url": "https://agent2.example.com", "properties": []}, + {"url": "https://sales-agent.example.com", "properties": []}, + ] + } + assert verify_agent_authorization( + adagents_data, "https://sales-agent.example.com", None, None + ) + + def test_invalid_adagents_data_not_dict(self): + """Should raise error if adagents_data is not a dict.""" + with pytest.raises(AdagentsValidationError, match="must be a dictionary"): + verify_agent_authorization([], "https://agent.example.com", None, None) + + def test_invalid_adagents_data_no_authorized_agents(self): + """Should raise error if authorized_agents field is missing.""" + with pytest.raises(AdagentsValidationError, match="authorized_agents"): + verify_agent_authorization({}, "https://agent.example.com", None, None) + + def test_invalid_authorized_agents_not_list(self): + """Should raise error if authorized_agents is not a list.""" + with pytest.raises(AdagentsValidationError, match="authorized_agents"): + verify_agent_authorization( + {"authorized_agents": "not a list"}, "https://agent.example.com", None, None + ) + + def test_property_type_match_without_identifiers(self): + """Should match property type even without identifier check.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://sales-agent.example.com", + "properties": [ + { + "property_type": "website", + "identifiers": [{"type": "domain", "value": "example.com"}], + } + ], + } + ] + } + # When property_identifiers is None, just check property_type + assert verify_agent_authorization( + adagents_data, "https://sales-agent.example.com", "website", None + ) + + +class TestFetchAdagents: + """Test fetching adagents.json from publisher domains. + + Note: These tests would require proper httpx mocking or integration testing. + For now, we focus on unit testing the core logic (domain matching, + identifier matching, and authorization verification) which are tested above. + The fetch_adagents function is straightforward HTTP + JSON parsing that + calls verify_agent_authorization with the parsed data. + """ + + @pytest.mark.skip(reason="Integration test - requires httpx mocking or real HTTP calls") + @pytest.mark.asyncio + async def test_fetch_success(self): + """Should successfully fetch and parse adagents.json.""" + pass + + +class TestVerifyAgentForProperty: + """Test convenience wrapper for fetching and verifying in one call. + + Note: These tests would require proper httpx mocking or integration testing. + The function is a thin wrapper around fetch_adagents + verify_agent_authorization, + both of which are tested separately above. + """ + + @pytest.mark.skip(reason="Integration test - requires httpx mocking or real HTTP calls") + @pytest.mark.asyncio + async def test_verify_success(self): + """Should fetch and verify authorization successfully.""" + pass + + +class TestGetAllProperties: + """Test extracting all properties from adagents.json data.""" + + def test_get_all_properties(self): + """Should extract all properties from all agents.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://agent1.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 1", + "identifiers": [{"type": "domain", "value": "site1.com"}], + }, + { + "property_type": "mobile_app", + "name": "App 1", + "identifiers": [{"type": "bundle_id", "value": "com.site1.app"}], + }, + ], + }, + { + "url": "https://agent2.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 2", + "identifiers": [{"type": "domain", "value": "site2.com"}], + } + ], + }, + ] + } + + properties = get_all_properties(adagents_data) + assert len(properties) == 3 + assert properties[0]["name"] == "Site 1" + assert properties[0]["agent_url"] == "https://agent1.example.com" + assert properties[1]["name"] == "App 1" + assert properties[1]["agent_url"] == "https://agent1.example.com" + assert properties[2]["name"] == "Site 2" + assert properties[2]["agent_url"] == "https://agent2.example.com" + + def test_get_all_properties_with_empty_properties(self): + """Should handle agents with empty properties array.""" + adagents_data = { + "authorized_agents": [ + {"url": "https://agent1.example.com", "properties": []}, + { + "url": "https://agent2.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site", + "identifiers": [{"type": "domain", "value": "site.com"}], + } + ], + }, + ] + } + + properties = get_all_properties(adagents_data) + assert len(properties) == 1 + assert properties[0]["name"] == "Site" + + def test_get_all_properties_invalid_data(self): + """Should raise error for invalid data.""" + with pytest.raises(AdagentsValidationError): + get_all_properties([]) + + +class TestGetAllTags: + """Test extracting all unique tags from adagents.json data.""" + + def test_get_all_tags(self): + """Should extract all unique tags from properties.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://agent1.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 1", + "identifiers": [{"type": "domain", "value": "site1.com"}], + "tags": ["premium", "news"], + }, + { + "property_type": "mobile_app", + "name": "App 1", + "identifiers": [{"type": "bundle_id", "value": "com.site1.app"}], + "tags": ["mobile", "premium"], + }, + ], + }, + { + "url": "https://agent2.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 2", + "identifiers": [{"type": "domain", "value": "site2.com"}], + "tags": ["sports"], + } + ], + }, + ] + } + + tags = get_all_tags(adagents_data) + assert tags == {"premium", "news", "mobile", "sports"} + + def test_get_all_tags_no_tags(self): + """Should return empty set when no tags present.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://agent1.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 1", + "identifiers": [{"type": "domain", "value": "site1.com"}], + } + ], + } + ] + } + + tags = get_all_tags(adagents_data) + assert tags == set() + + +class TestGetPropertiesByAgent: + """Test getting properties for a specific agent.""" + + def test_get_properties_by_agent(self): + """Should return properties for specified agent.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://agent1.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 1", + "identifiers": [{"type": "domain", "value": "site1.com"}], + }, + { + "property_type": "mobile_app", + "name": "App 1", + "identifiers": [{"type": "bundle_id", "value": "com.site1.app"}], + }, + ], + }, + { + "url": "https://agent2.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 2", + "identifiers": [{"type": "domain", "value": "site2.com"}], + } + ], + }, + ] + } + + properties = get_properties_by_agent(adagents_data, "https://agent1.example.com") + assert len(properties) == 2 + assert properties[0]["name"] == "Site 1" + assert properties[1]["name"] == "App 1" + + def test_get_properties_by_agent_protocol_agnostic(self): + """Should match agent URL regardless of protocol.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://agent1.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 1", + "identifiers": [{"type": "domain", "value": "site1.com"}], + } + ], + } + ] + } + + properties = get_properties_by_agent(adagents_data, "http://agent1.example.com") + assert len(properties) == 1 + assert properties[0]["name"] == "Site 1" + + def test_get_properties_by_agent_not_found(self): + """Should return empty list for unknown agent.""" + adagents_data = { + "authorized_agents": [ + { + "url": "https://agent1.example.com", + "properties": [ + { + "property_type": "website", + "name": "Site 1", + "identifiers": [{"type": "domain", "value": "site1.com"}], + } + ], + } + ] + } + + properties = get_properties_by_agent(adagents_data, "https://unknown-agent.com") + assert len(properties) == 0