diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 90d8dc72c..d0c7051ee 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -52,24 +52,88 @@ from adcp.decisioning.types import Account from adcp.types import ( AccountReference, + AcquireRightsRequest, + AcquireRightsResponse, + ActivateSignalRequest, + ActivateSignalSuccessResponse, + BuildCreativeRequest, + BuildCreativeResponse, + CalibrateContentRequest, + CalibrateContentResponse, + CheckGovernanceRequest, + CheckGovernanceResponse, + CreateCollectionListRequest, + CreateCollectionListResponse, + CreateContentStandardsRequest, + CreateContentStandardsResponse, CreateMediaBuyRequest, CreateMediaBuySuccessResponse, + CreatePropertyListRequest, + CreatePropertyListResponse, + DeleteCollectionListRequest, + DeleteCollectionListResponse, + DeletePropertyListRequest, + DeletePropertyListResponse, + GetBrandIdentityRequest, + GetBrandIdentitySuccessResponse, + GetCollectionListRequest, + GetCollectionListResponse, + GetContentStandardsRequest, + GetContentStandardsResponse, + GetCreativeDeliveryRequest, + GetCreativeDeliveryResponse, + GetCreativeFeaturesRequest, + GetCreativeFeaturesResponse, + GetMediaBuyArtifactsRequest, + GetMediaBuyArtifactsResponse, GetMediaBuyDeliveryRequest, GetMediaBuyDeliveryResponse, GetMediaBuysRequest, GetMediaBuysResponse, + GetPlanAuditLogsRequest, + GetPlanAuditLogsResponse, GetProductsRequest, GetProductsResponse, + GetPropertyListRequest, + GetPropertyListResponse, + GetRightsRequest, + GetRightsSuccessResponse, + GetSignalsRequest, + GetSignalsResponse, + ListCollectionListsRequest, + ListCollectionListsResponse, + ListContentStandardsRequest, + ListContentStandardsResponse, ListCreativeFormatsRequest, ListCreativeFormatsResponse, ListCreativesRequest, ListCreativesResponse, + ListPropertyListsRequest, + ListPropertyListsResponse, + PreviewCreativeRequest, + PreviewCreativeResponse, ProvidePerformanceFeedbackRequest, ProvidePerformanceFeedbackResponse, + ReportPlanOutcomeRequest, + ReportPlanOutcomeResponse, + SyncAudiencesRequest, + SyncAudiencesSuccessResponse, SyncCreativesRequest, SyncCreativesSuccessResponse, + SyncPlansRequest, + SyncPlansResponse, + UpdateCollectionListRequest, + UpdateCollectionListResponse, + UpdateContentStandardsRequest, + UpdateContentStandardsResponse, UpdateMediaBuyRequest, UpdateMediaBuySuccessResponse, + UpdatePropertyListRequest, + UpdatePropertyListResponse, + UpdateRightsRequest, + UpdateRightsResponse, + ValidateContentDeliveryRequest, + ValidateContentDeliveryResponse, ) from adcp.webhook_sender import WebhookSender @@ -78,15 +142,13 @@ # Class-level advertised tool surface # --------------------------------------------------------------------------- -#: All sales-* tools the v6.0 PlatformHandler shim covers. Auto-registered -#: with the framework's tool-discovery seam via ``__init_subclass__`` — -#: ``tools/list`` filters to this set unless the operator passes -#: ``advertise_all=True``. Adopters who only implement a subset of these -#: methods on their ``DecisioningPlatform`` subclass: the framework's -#: existing override-detection (``_is_method_overridden``) handles the -#: filter — methods inherited from the base ``DecisioningPlatform`` (which -#: doesn't define them) are NOT in the override set, so the framework -#: drops the tool from ``tools/list`` automatically. +#: All wire tools the PlatformHandler shim covers. Each Protocol family +#: contributes its required + optional methods. The framework's +#: ``tools/list`` filters to this set; adopters get only the tools their +#: claimed specialism Protocols cover, plus the framework's +#: ``_is_method_overridden`` filter drops shims whose platform method +#: isn't implemented (sales-only adopters don't accidentally advertise +#: ``build_creative``). _SALES_ADVERTISED_TOOLS: frozenset[str] = frozenset( { "get_products", @@ -100,6 +162,156 @@ "list_creatives", } ) +_CREATIVE_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "build_creative", + "preview_creative", + "get_creative_delivery", + } +) +_SIGNALS_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "get_signals", + "activate_signal", + } +) +_AUDIENCE_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "sync_audiences", + } +) +_GOVERNANCE_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "check_governance", + "sync_plans", + "report_plan_outcome", + "get_plan_audit_logs", + } +) +_BRAND_RIGHTS_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "get_brand_identity", + "get_rights", + "acquire_rights", + "update_rights", + } +) +_CONTENT_STANDARDS_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "list_content_standards", + "get_content_standards", + "create_content_standards", + "update_content_standards", + "calibrate_content", + "validate_content_delivery", + "get_media_buy_artifacts", + "get_creative_features", + } +) +_PROPERTY_LISTS_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "create_property_list", + "update_property_list", + "get_property_list", + "list_property_lists", + "delete_property_list", + } +) +_COLLECTION_LISTS_ADVERTISED_TOOLS: frozenset[str] = frozenset( + { + "create_collection_list", + "update_collection_list", + "get_collection_list", + "list_collection_lists", + "delete_collection_list", + } +) + +#: Methods adopters MAY leave unimplemented per their Protocol. The shim +#: surfaces ``AdcpError(code='UNSUPPORTED_FEATURE')`` to buyers calling +#: an unimplemented optional method instead of leaking AttributeError. +#: Required methods (per ``REQUIRED_METHODS_PER_SPECIALISM``) are +#: enforced at server boot by ``validate_platform`` — the optional set +#: complements that gate at runtime. +_OPTIONAL_PLATFORM_METHODS: frozenset[str] = frozenset( + { + # Sales-* optional (gated by claim, not method presence) + "get_media_buys", + "provide_performance_feedback", + "list_creative_formats", + "list_creatives", + # CreativeBuilderPlatform optional + "preview_creative", + # ContentStandardsPlatform optional analyzer reads + "get_media_buy_artifacts", + "get_creative_features", + # AudiencePlatform adopter-internal helper (not wire-served, but + # listed here for symmetry should a future shim wire it) + "poll_audience_statuses", + } +) + + +def _project_build_creative(result: Any) -> Any: + """Project the adopter's ``build_creative`` return into the wire + envelope shape. + + The :class:`CreativeBuilderPlatform.build_creative` Protocol + declares the return as ``CreativeManifest | Sequence[CreativeManifest] + | BuildCreativeSuccessResponse`` — three ergonomic arms. The wire + envelope per ``schemas/cache/media-buy/build-creative-response.json`` + has only two success arms: ``{creative_manifest: ...}`` (single) + or ``{creative_manifests: [...]}`` (multi). This helper wraps the + bare-manifest and list cases. + + Mirrors the JS-side ``projectBuildCreativeReturn`` at + ``src/lib/server/decisioning/runtime/from-platform.ts``. Without + this, an adopter returning a bare :class:`CreativeManifest` (which + the Protocol explicitly allows) would ship an unwrapped object that + fails wire ``oneOf`` validation downstream. + """ + # Already an envelope (has the wire field present). + if hasattr(result, "creative_manifest") or hasattr(result, "creative_manifests"): + return result + if isinstance(result, dict) and ( + "creative_manifest" in result or "creative_manifests" in result + ): + return result + # Sequence of manifests → multi-success envelope. + if isinstance(result, list): + return { + "creative_manifests": [ + m.model_dump(mode="json") if hasattr(m, "model_dump") else m for m in result + ] + } + # Bare CreativeManifest → single-success envelope. + if hasattr(result, "model_dump"): + return {"creative_manifest": result.model_dump(mode="json")} + # Unknown shape — pass through and let wire validation surface. + return result + + +def _project_sync_audiences(result: Any) -> Any: + """Project the adopter's ``sync_audiences`` return into the wire + envelope shape. + + The :class:`AudiencePlatform.sync_audiences` Protocol allows + adopters to return either a list of audience-result rows (the + JS-side ergonomic) or a fully-shaped + :class:`SyncAudiencesSuccessResponse`. The wire envelope per + ``schemas/cache/media-buy/sync-audiences-response.json`` is + ``{audiences: [rows]}``. This helper wraps the list case. + + Mirrors the JS-side response wrapping at + ``src/lib/server/decisioning/runtime/from-platform.ts:2242-2249``. + """ + if isinstance(result, list): + return { + "audiences": [ + r.model_dump(mode="json") if hasattr(r, "model_dump") else r for r in result + ] + } + return result class PlatformHandler(ADCPHandler[ToolContext]): @@ -131,7 +343,17 @@ class PlatformHandler(ADCPHandler[ToolContext]): they pass ``advertise_all=True``. """ - advertised_tools: ClassVar[set[str]] = set(_SALES_ADVERTISED_TOOLS) + advertised_tools: ClassVar[set[str]] = ( + set(_SALES_ADVERTISED_TOOLS) + | set(_CREATIVE_ADVERTISED_TOOLS) + | set(_SIGNALS_ADVERTISED_TOOLS) + | set(_AUDIENCE_ADVERTISED_TOOLS) + | set(_GOVERNANCE_ADVERTISED_TOOLS) + | set(_BRAND_RIGHTS_ADVERTISED_TOOLS) + | set(_CONTENT_STANDARDS_ADVERTISED_TOOLS) + | set(_PROPERTY_LISTS_ADVERTISED_TOOLS) + | set(_COLLECTION_LISTS_ADVERTISED_TOOLS) + ) _agent_type = "decisioning platform" @@ -468,5 +690,813 @@ async def list_creatives( # type: ignore[override] ), ) + # ----- Optional-method gate ----- + + def _require_platform_method(self, method_name: str) -> None: + """Raise ``UNSUPPORTED_FEATURE`` if the adopter's platform + doesn't implement ``method_name``. + + Used by shims for OPTIONAL Protocol methods (per + ``_OPTIONAL_PLATFORM_METHODS``). Required methods are caught + at server boot by ``validate_platform``; optional methods can + legitimately be absent and need a runtime gate. Without this, + a buyer calling an optional method on a platform that doesn't + implement it would see ``INTERNAL_ERROR`` from the + AttributeError wrapper in ``_invoke_platform_method`` — + adopter contract violation, not buyer-fixable. + """ + if not hasattr(self._platform, method_name): + from adcp.decisioning.types import AdcpError + + raise AdcpError( + "UNSUPPORTED_FEATURE", + message=( + f"This platform doesn't implement {method_name!r}. " + "The method is optional on the per-specialism Protocol; " + "the adopter chose not to wire it." + ), + recovery="terminal", + ) + + # ----- CreativeBuilderPlatform / CreativeAdServerPlatform ----- + + async def build_creative( # type: ignore[override] + self, + params: BuildCreativeRequest, + context: ToolContext | None = None, + ) -> BuildCreativeResponse: + """Build / retrieve a creative. + + Three discriminated return arms per the per-specialism + Protocol: a single :class:`CreativeManifest`, a list of + manifests (multi-format), or a fully-shaped + :class:`BuildCreativeSuccessResponse`. The shim projects bare + manifests / lists to the wire envelope shape so adopters can + return the ergonomic form (per the JS-side + ``projectBuildCreativeReturn`` parity). + + Wire envelope per + ``schemas/cache/media-buy/build-creative-response.json``: + ``{creative_manifest: ...}`` (single) or + ``{creative_manifests: [...]}`` (multi). + """ + self._require_platform_method("build_creative") + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "build_creative", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + return cast("BuildCreativeResponse", _project_build_creative(result)) + + async def preview_creative( # type: ignore[override] + self, + params: PreviewCreativeRequest, + context: ToolContext | None = None, + ) -> PreviewCreativeResponse: + """Optional on :class:`CreativeBuilderPlatform`; required on + :class:`CreativeAdServerPlatform`. Surface + ``UNSUPPORTED_FEATURE`` when the adopter's platform doesn't + implement it (Builder adopters who don't render preview).""" + self._require_platform_method("preview_creative") + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "PreviewCreativeResponse", + await _invoke_platform_method( + self._platform, + "preview_creative", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def get_creative_delivery( # type: ignore[override] + self, + params: GetCreativeDeliveryRequest, + context: ToolContext | None = None, + ) -> GetCreativeDeliveryResponse: + """Required on :class:`CreativeAdServerPlatform` — per-creative + delivery actuals (impressions, spend, pacing).""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "get_creative_delivery", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("get_creative_delivery", params, result) + return cast("GetCreativeDeliveryResponse", result) + + # ----- SignalsPlatform ----- + + async def get_signals( # type: ignore[override] + self, + params: GetSignalsRequest, + context: ToolContext | None = None, + ) -> GetSignalsResponse: + """Catalog discovery for signal-marketplace / signal-owned.""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "get_signals", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("get_signals", params, result) + return cast("GetSignalsResponse", result) + + async def activate_signal( # type: ignore[override] + self, + params: ActivateSignalRequest, + context: ToolContext | None = None, + ) -> ActivateSignalSuccessResponse: + """Provision a signal onto destination platforms.""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "activate_signal", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("activate_signal", params, result) + return cast("ActivateSignalSuccessResponse", result) + + # ----- AudiencePlatform ----- + + async def sync_audiences( # type: ignore[override] + self, + params: SyncAudiencesRequest, + context: ToolContext | None = None, + ) -> SyncAudiencesSuccessResponse: + """Push audiences to the platform. + + Wire shape carries ``audiences[]`` on the request; the platform + method signature is ``sync_audiences(audiences, ctx)`` — adopter + ergonomic per the JS reference. Arg-projection extracts the + list. + + Two return arms per the per-specialism Protocol: a list of + audience-result rows (the JS-side ergonomic) or a fully-shaped + :class:`SyncAudiencesSuccessResponse`. The shim projects the + list arm to the wire envelope ``{audiences: [...]}`` so adopters + can return the ergonomic form. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "sync_audiences", + params, + ctx, + executor=self._executor, + registry=self._registry, + arg_projector={"audiences": getattr(params, "audiences", []) or []}, + ) + projected = _project_sync_audiences(result) + self._maybe_auto_emit_sync_completion("sync_audiences", params, projected) + return cast("SyncAudiencesSuccessResponse", projected) + + # ----- CampaignGovernancePlatform ----- + + async def check_governance( # type: ignore[override] + self, + params: CheckGovernanceRequest, + context: ToolContext | None = None, + ) -> CheckGovernanceResponse: + """Runtime governance decision (approved / denied / conditions). + + Wire request has no ``account`` field per + ``schemas/cache/governance/check-governance-request.json`` + (``additionalProperties: false`` — account is forbidden); + resolve via auth only. See :meth:`provide_performance_feedback` + for the no-ref account resolution caveat. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "CheckGovernanceResponse", + await _invoke_platform_method( + self._platform, + "check_governance", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def sync_plans( # type: ignore[override] + self, + params: SyncPlansRequest, + context: ToolContext | None = None, + ) -> SyncPlansResponse: + """Plan CRUD with delta upsert into governance agent. + + Wire request has no ``account`` field per + ``schemas/cache/governance/sync-plans-request.json`` + (``additionalProperties: false``); resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "SyncPlansResponse", + await _invoke_platform_method( + self._platform, + "sync_plans", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def report_plan_outcome( # type: ignore[override] + self, + params: ReportPlanOutcomeRequest, + context: ToolContext | None = None, + ) -> ReportPlanOutcomeResponse: + """Outcome reporting from sellers (delivery actuals). + + Wire request has no ``account`` field per + ``schemas/cache/governance/report-plan-outcome-request.json`` + (``additionalProperties: false``); resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "ReportPlanOutcomeResponse", + await _invoke_platform_method( + self._platform, + "report_plan_outcome", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def get_plan_audit_logs( # type: ignore[override] + self, + params: GetPlanAuditLogsRequest, + context: ToolContext | None = None, + ) -> GetPlanAuditLogsResponse: + """Audit log read for governance decisions + outcomes. + + Wire request has no ``account`` field per + ``schemas/cache/governance/get-plan-audit-logs-request.json`` + (``additionalProperties: false``); resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "GetPlanAuditLogsResponse", + await _invoke_platform_method( + self._platform, + "get_plan_audit_logs", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + # ----- BrandRightsPlatform ----- + + async def get_brand_identity( # type: ignore[override] + self, + params: GetBrandIdentityRequest, + context: ToolContext | None = None, + ) -> GetBrandIdentitySuccessResponse: + """Read brand identity record (catalog + identity record). + + Wire request has no ``account`` field per + ``schemas/cache/brand-rights/get-brand-identity-request.json`` + (``additionalProperties: false``); resolve via auth only. See + :meth:`provide_performance_feedback` for the no-ref account + resolution caveat. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "get_brand_identity", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("get_brand_identity", params, result) + return cast("GetBrandIdentitySuccessResponse", result) + + async def get_rights( # type: ignore[override] + self, + params: GetRightsRequest, + context: ToolContext | None = None, + ) -> GetRightsSuccessResponse: + """List rights matching a brand + use query. + + Wire request has no ``account`` field per + ``schemas/cache/brand-rights/get-rights-request.json`` + (``additionalProperties: false``); resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "get_rights", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("get_rights", params, result) + return cast("GetRightsSuccessResponse", result) + + async def acquire_rights( # type: ignore[override] + self, + params: AcquireRightsRequest, + context: ToolContext | None = None, + ) -> AcquireRightsResponse: + """Acquire rights — 4-arm discriminated success union + (acquired / pending / rejected / error). Rejection-as-data per + the Protocol; the ``error`` arm covers rights-system failures + the buyer can retry against (vs. ``AdcpError`` for adopter + infrastructure failures). + + Wire request has no ``account`` field per + ``schemas/cache/brand-rights/acquire-rights-request.json`` + (``additionalProperties: false``); resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "acquire_rights", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("acquire_rights", params, result) + return cast("AcquireRightsResponse", result) + + async def update_rights( # type: ignore[override] + self, + params: UpdateRightsRequest, + context: ToolContext | None = None, + ) -> UpdateRightsResponse: + """Mutate an existing rights acquisition (extend term, change + usage scope, revoke). + + Wire request has no ``account`` field per + ``schemas/cache/brand-rights/update-rights-request.json`` + (``additionalProperties: false``); resolve via auth only. + + Not currently in :data:`SPEC_WEBHOOK_TASK_TYPES` — buyers + registering ``push_notification_config.url`` won't get an + auto-emit; rely on ``publishStatusChange`` for long-running + update state. Bump the spec enum and the + ``SPEC_WEBHOOK_TASK_TYPES`` constant in lockstep when this + joins the closed enum. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "UpdateRightsResponse", + await _invoke_platform_method( + self._platform, + "update_rights", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + # ----- ContentStandardsPlatform ----- + + async def list_content_standards( # type: ignore[override] + self, + params: ListContentStandardsRequest, + context: ToolContext | None = None, + ) -> ListContentStandardsResponse: + """Discover content standards published by this agent. + + Wire request has no ``account`` field per + ``schemas/cache/content-standards/list-content-standards-request.json``; + resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "ListContentStandardsResponse", + await _invoke_platform_method( + self._platform, + "list_content_standards", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def get_content_standards( # type: ignore[override] + self, + params: GetContentStandardsRequest, + context: ToolContext | None = None, + ) -> GetContentStandardsResponse: + """Wire request has no ``account`` field per + ``schemas/cache/content-standards/get-content-standards-request.json``; + resolve via auth only.""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "GetContentStandardsResponse", + await _invoke_platform_method( + self._platform, + "get_content_standards", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def create_content_standards( # type: ignore[override] + self, + params: CreateContentStandardsRequest, + context: ToolContext | None = None, + ) -> CreateContentStandardsResponse: + """Wire request has no ``account`` field; resolve via auth only.""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "CreateContentStandardsResponse", + await _invoke_platform_method( + self._platform, + "create_content_standards", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def update_content_standards( # type: ignore[override] + self, + params: UpdateContentStandardsRequest, + context: ToolContext | None = None, + ) -> UpdateContentStandardsResponse: + """Wire request has no ``account`` field; resolve via auth only.""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "UpdateContentStandardsResponse", + await _invoke_platform_method( + self._platform, + "update_content_standards", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def calibrate_content( # type: ignore[override] + self, + params: CalibrateContentRequest, + context: ToolContext | None = None, + ) -> CalibrateContentResponse: + """Calibrate content against published standards. + + Wire request has no ``account`` field per + ``schemas/cache/content-standards/calibrate-content-request.json``; + resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "CalibrateContentResponse", + await _invoke_platform_method( + self._platform, + "calibrate_content", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def validate_content_delivery( # type: ignore[override] + self, + params: ValidateContentDeliveryRequest, + context: ToolContext | None = None, + ) -> ValidateContentDeliveryResponse: + """Post-flight conformance check. + + Wire request has no ``account`` field per + ``schemas/cache/content-standards/validate-content-delivery-request.json``; + resolve via auth only. + """ + tool_ctx = context or ToolContext() + account = await self._resolve_account(None, tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "ValidateContentDeliveryResponse", + await _invoke_platform_method( + self._platform, + "validate_content_delivery", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def get_media_buy_artifacts( # type: ignore[override] + self, + params: GetMediaBuyArtifactsRequest, + context: ToolContext | None = None, + ) -> GetMediaBuyArtifactsResponse: + """Optional analyzer read — adopters without artifact archival + surface ``UNSUPPORTED_FEATURE``.""" + self._require_platform_method("get_media_buy_artifacts") + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "GetMediaBuyArtifactsResponse", + await _invoke_platform_method( + self._platform, + "get_media_buy_artifacts", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def get_creative_features( # type: ignore[override] + self, + params: GetCreativeFeaturesRequest, + context: ToolContext | None = None, + ) -> GetCreativeFeaturesResponse: + """Optional analyzer read — adopters without analyzer pipelines + surface ``UNSUPPORTED_FEATURE``.""" + self._require_platform_method("get_creative_features") + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "GetCreativeFeaturesResponse", + await _invoke_platform_method( + self._platform, + "get_creative_features", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + # ----- PropertyListsPlatform ----- + + async def create_property_list( # type: ignore[override] + self, + params: CreatePropertyListRequest, + context: ToolContext | None = None, + ) -> CreatePropertyListResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "create_property_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("create_property_list", params, result) + return cast("CreatePropertyListResponse", result) + + async def update_property_list( # type: ignore[override] + self, + params: UpdatePropertyListRequest, + context: ToolContext | None = None, + ) -> UpdatePropertyListResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "update_property_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("update_property_list", params, result) + return cast("UpdatePropertyListResponse", result) + + async def get_property_list( # type: ignore[override] + self, + params: GetPropertyListRequest, + context: ToolContext | None = None, + ) -> GetPropertyListResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "get_property_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("get_property_list", params, result) + return cast("GetPropertyListResponse", result) + + async def list_property_lists( # type: ignore[override] + self, + params: ListPropertyListsRequest, + context: ToolContext | None = None, + ) -> ListPropertyListsResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "list_property_lists", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("list_property_lists", params, result) + return cast("ListPropertyListsResponse", result) + + async def delete_property_list( # type: ignore[override] + self, + params: DeletePropertyListRequest, + context: ToolContext | None = None, + ) -> DeletePropertyListResponse: + """Security-critical: revokes the per-seller fetch_token and + signals cache invalidation. Compromise-driven revocation MUST + also trigger this path.""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + result = await _invoke_platform_method( + self._platform, + "delete_property_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ) + self._maybe_auto_emit_sync_completion("delete_property_list", params, result) + return cast("DeletePropertyListResponse", result) + + # ----- CollectionListsPlatform ----- + + async def create_collection_list( # type: ignore[override] + self, + params: CreateCollectionListRequest, + context: ToolContext | None = None, + ) -> CreateCollectionListResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "CreateCollectionListResponse", + await _invoke_platform_method( + self._platform, + "create_collection_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def update_collection_list( # type: ignore[override] + self, + params: UpdateCollectionListRequest, + context: ToolContext | None = None, + ) -> UpdateCollectionListResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "UpdateCollectionListResponse", + await _invoke_platform_method( + self._platform, + "update_collection_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def get_collection_list( # type: ignore[override] + self, + params: GetCollectionListRequest, + context: ToolContext | None = None, + ) -> GetCollectionListResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "GetCollectionListResponse", + await _invoke_platform_method( + self._platform, + "get_collection_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def list_collection_lists( # type: ignore[override] + self, + params: ListCollectionListsRequest, + context: ToolContext | None = None, + ) -> ListCollectionListsResponse: + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "ListCollectionListsResponse", + await _invoke_platform_method( + self._platform, + "list_collection_lists", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + + async def delete_collection_list( # type: ignore[override] + self, + params: DeleteCollectionListRequest, + context: ToolContext | None = None, + ) -> DeleteCollectionListResponse: + """Security-critical: revokes the fetch_token. See + :meth:`delete_property_list` for the same security contract.""" + tool_ctx = context or ToolContext() + account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + ctx = self._build_ctx(tool_ctx, account) + return cast( + "DeleteCollectionListResponse", + await _invoke_platform_method( + self._platform, + "delete_collection_list", + params, + ctx, + executor=self._executor, + registry=self._registry, + ), + ) + __all__ = ["PlatformHandler"] diff --git a/tests/test_decisioning_handler_shims.py b/tests/test_decisioning_handler_shims.py new file mode 100644 index 000000000..7e7d89c9a --- /dev/null +++ b/tests/test_decisioning_handler_shims.py @@ -0,0 +1,932 @@ +"""Per-specialism handler shim coverage. + +The breadth-sprint (PRs #332-#335) shipped 10 Protocol classes + +REQUIRED_METHODS coverage at the static layer, but the Emma DX smoke +test surfaced that the runtime ``PlatformHandler`` only had 9 sales-* +shims — every non-sales tool 404'd at the wire even though +capabilities + validate_platform reported green. This file pins the +fix: every wire tool the framework dispatches has a handler shim that +routes from ``params: `` through ``_invoke_platform_method`` +to ``platform.(params, ctx)``. + +Test surfaces: + +* ``advertised_tools`` covers every spec wire tool we ship. +* Each shim routes through to the platform method (one per Protocol + family, smoke-tested via stub platform). +* Optional methods surface ``UNSUPPORTED_FEATURE`` when the platform + doesn't implement them — distinct from ``INTERNAL_ERROR`` (which is + what would happen without the runtime gate). +* The ``account`` field is gracefully optional — tools without + ``account`` on the wire still resolve via auth-only path. +""" + +from __future__ import annotations + +import asyncio +from concurrent.futures import ThreadPoolExecutor +from unittest.mock import AsyncMock + +import pytest + +from adcp.decisioning import ( + AdcpError, + DecisioningCapabilities, + DecisioningPlatform, + InMemoryTaskRegistry, + SingletonAccounts, +) +from adcp.decisioning.handler import ( + PlatformHandler, + _project_build_creative, + _project_sync_audiences, +) +from adcp.decisioning.webhook_emit import _BACKGROUND_WEBHOOK_TASKS +from adcp.server.base import ToolContext + + +@pytest.fixture +def executor(): + pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test-shim-") + yield pool + pool.shutdown(wait=True) + + +# ---- advertised_tools coverage ---- + + +def test_advertised_tools_covers_every_specialism_wire_tool() -> None: + """``PlatformHandler.advertised_tools`` includes every wire tool + across all 10 Protocol families. Without this, the breadth-sprint + Protocols were dead code at runtime — buyers would 404 on + ``build_creative``, ``get_signals``, etc.""" + expected = { + # Sales + "get_products", + "create_media_buy", + "update_media_buy", + "sync_creatives", + "get_media_buy_delivery", + "get_media_buys", + "provide_performance_feedback", + "list_creative_formats", + "list_creatives", + # Creative (Builder + AdServer) + "build_creative", + "preview_creative", + "get_creative_delivery", + # Signals + "get_signals", + "activate_signal", + # Audience + "sync_audiences", + # Governance + "check_governance", + "sync_plans", + "report_plan_outcome", + "get_plan_audit_logs", + # Brand Rights + "get_brand_identity", + "get_rights", + "acquire_rights", + "update_rights", + # Content Standards + "list_content_standards", + "get_content_standards", + "create_content_standards", + "update_content_standards", + "calibrate_content", + "validate_content_delivery", + "get_media_buy_artifacts", + "get_creative_features", + # Property Lists + "create_property_list", + "update_property_list", + "get_property_list", + "list_property_lists", + "delete_property_list", + # Collection Lists + "create_collection_list", + "update_collection_list", + "get_collection_list", + "list_collection_lists", + "delete_collection_list", + } + assert PlatformHandler.advertised_tools == expected + + +@pytest.mark.parametrize( + "tool_name", + [ + "build_creative", + "preview_creative", + "get_creative_delivery", + "get_signals", + "activate_signal", + "sync_audiences", + "check_governance", + "sync_plans", + "report_plan_outcome", + "get_plan_audit_logs", + "get_brand_identity", + "get_rights", + "acquire_rights", + "update_rights", + "list_content_standards", + "get_content_standards", + "create_content_standards", + "update_content_standards", + "calibrate_content", + "validate_content_delivery", + "get_media_buy_artifacts", + "get_creative_features", + "create_property_list", + "update_property_list", + "get_property_list", + "list_property_lists", + "delete_property_list", + "create_collection_list", + "update_collection_list", + "get_collection_list", + "list_collection_lists", + "delete_collection_list", + ], +) +def test_handler_shim_method_exists(tool_name: str) -> None: + """Every advertised non-sales tool has a corresponding shim method + on PlatformHandler. Without this, ``tools/list`` advertises tools + the handler can't actually dispatch — buyer-facing 404.""" + assert hasattr(PlatformHandler, tool_name), ( + f"PlatformHandler is missing the {tool_name!r} shim — " "advertised but undispatchable." + ) + + +# ---- Shim dispatch via stub platforms ---- + + +@pytest.mark.asyncio +async def test_build_creative_shim_routes_to_platform(executor) -> None: + """End-to-end: shim → ``_invoke_platform_method`` → platform method.""" + captured: list[str] = [] + + class _CreativeBuilder(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["creative-generative"]) + accounts = SingletonAccounts(account_id="hello") + + def build_creative(self, req, ctx): + captured.append("build_creative_called") + return {"creative_manifest": {"creative_id": "cr_1"}} + + handler = PlatformHandler( + _CreativeBuilder(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + # Use model_construct to bypass the wire-spec validation; the test + # is about shim routing, not request shape. + from adcp.types import BuildCreativeRequest + + req = BuildCreativeRequest.model_construct() + result = await handler.build_creative(req, ToolContext()) + assert captured == ["build_creative_called"] + assert result == {"creative_manifest": {"creative_id": "cr_1"}} + + +@pytest.mark.asyncio +async def test_get_signals_shim_routes_to_platform(executor) -> None: + captured: list[str] = [] + + class _SignalsAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["signal-marketplace"]) + accounts = SingletonAccounts(account_id="hello") + + def get_signals(self, req, ctx): + captured.append("get_signals_called") + return {"signals": []} + + def activate_signal(self, req, ctx): + return {} + + handler = PlatformHandler( + _SignalsAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import GetSignalsRequest + + result = await handler.get_signals(GetSignalsRequest.model_construct(), ToolContext()) + assert captured == ["get_signals_called"] + assert result == {"signals": []} + + +@pytest.mark.asyncio +async def test_sync_audiences_shim_arg_projects_audiences_list(executor) -> None: + """The ``sync_audiences`` wire request carries ``audiences[]`` but + the AudiencePlatform method signature is ``sync_audiences(audiences, + ctx)`` — adopter ergonomic. The shim arg-projects.""" + received_audiences: list = [] + + class _AudienceAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["audience-sync"]) + accounts = SingletonAccounts(account_id="hello") + + def sync_audiences(self, audiences, ctx): + received_audiences.extend(audiences) + return {"audiences": []} + + handler = PlatformHandler( + _AudienceAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import SyncAudiencesRequest + + fake_audiences = [{"audience_id": "a1"}, {"audience_id": "a2"}] + req = SyncAudiencesRequest.model_construct(audiences=fake_audiences) + await handler.sync_audiences(req, ToolContext()) + assert received_audiences == fake_audiences + + +@pytest.mark.asyncio +async def test_check_governance_shim_routes_to_platform(executor) -> None: + class _GovernanceAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities( + specialisms=["governance-spend-authority"], + governance_aware=True, + ) + accounts = SingletonAccounts(account_id="hello") + + def check_governance(self, req, ctx): + return {"status": "approved"} + + def sync_plans(self, req, ctx): + return {} + + def report_plan_outcome(self, req, ctx): + return {} + + def get_plan_audit_logs(self, req, ctx): + return {} + + handler = PlatformHandler( + _GovernanceAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import CheckGovernanceRequest + + result = await handler.check_governance(CheckGovernanceRequest.model_construct(), ToolContext()) + assert result == {"status": "approved"} + + +@pytest.mark.asyncio +async def test_acquire_rights_shim_routes_to_platform(executor) -> None: + class _BrandRightsAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["brand-rights"]) + accounts = SingletonAccounts(account_id="hello") + + def get_brand_identity(self, req, ctx): + return {} + + def get_rights(self, req, ctx): + return {"rights": []} + + def acquire_rights(self, req, ctx): + return {"rights_id": "r_1", "status": "acquired"} + + handler = PlatformHandler( + _BrandRightsAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import AcquireRightsRequest + + result = await handler.acquire_rights(AcquireRightsRequest.model_construct(), ToolContext()) + assert result == {"rights_id": "r_1", "status": "acquired"} + + +@pytest.mark.asyncio +async def test_list_content_standards_shim_routes_to_platform(executor) -> None: + class _ContentStandardsAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["content-standards"]) + accounts = SingletonAccounts(account_id="hello") + + def list_content_standards(self, req, ctx): + return {"standards": []} + + def get_content_standards(self, req, ctx): + return {} + + def create_content_standards(self, req, ctx): + return {} + + def update_content_standards(self, req, ctx): + return {} + + def calibrate_content(self, req, ctx): + return {} + + def validate_content_delivery(self, req, ctx): + return {} + + handler = PlatformHandler( + _ContentStandardsAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import ListContentStandardsRequest + + result = await handler.list_content_standards( + ListContentStandardsRequest.model_construct(), ToolContext() + ) + assert result == {"standards": []} + + +@pytest.mark.asyncio +async def test_create_property_list_shim_routes_to_platform(executor) -> None: + class _PropertyListsAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["property-lists"]) + accounts = SingletonAccounts(account_id="hello") + + def create_property_list(self, req, ctx): + return {"list_id": "pl_1", "fetch_token": "tok_x"} + + def update_property_list(self, req, ctx): + return {} + + def get_property_list(self, req, ctx): + return {} + + def list_property_lists(self, req, ctx): + return {} + + def delete_property_list(self, req, ctx): + return {} + + handler = PlatformHandler( + _PropertyListsAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import CreatePropertyListRequest + + result = await handler.create_property_list( + CreatePropertyListRequest.model_construct(), ToolContext() + ) + assert result == {"list_id": "pl_1", "fetch_token": "tok_x"} + + +# ---- Optional-method UNSUPPORTED_FEATURE gate ---- + + +@pytest.mark.asyncio +async def test_preview_creative_unsupported_when_platform_lacks_method( + executor, +) -> None: + """``preview_creative`` is OPTIONAL on CreativeBuilderPlatform. + A platform claiming ``creative-generative`` without + ``preview_creative`` should surface ``UNSUPPORTED_FEATURE`` to the + buyer — NOT ``INTERNAL_ERROR`` (which is what AttributeError → + dispatch wrapper would produce without the runtime gate).""" + + class _BuilderWithoutPreview(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["creative-generative"]) + accounts = SingletonAccounts(account_id="hello") + + def build_creative(self, req, ctx): + return {} + + # Deliberately no preview_creative — the Protocol marks it optional. + + handler = PlatformHandler( + _BuilderWithoutPreview(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import PreviewCreativeRequest + + with pytest.raises(AdcpError) as exc_info: + await handler.preview_creative(PreviewCreativeRequest.model_construct(), ToolContext()) + assert exc_info.value.code == "UNSUPPORTED_FEATURE" + # Buyer-facing message points at the missing method. + assert "preview_creative" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_get_creative_features_unsupported_when_platform_lacks_method( + executor, +) -> None: + """``get_creative_features`` is OPTIONAL on + ContentStandardsPlatform — analyzer-pipeline-only adopters omit + it. Same UNSUPPORTED_FEATURE surface.""" + + class _ContentStandardsNoAnalyzer(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["content-standards"]) + accounts = SingletonAccounts(account_id="hello") + + def list_content_standards(self, req, ctx): + return {} + + def get_content_standards(self, req, ctx): + return {} + + def create_content_standards(self, req, ctx): + return {} + + def update_content_standards(self, req, ctx): + return {} + + def calibrate_content(self, req, ctx): + return {} + + def validate_content_delivery(self, req, ctx): + return {} + + # Optional analyzer reads omitted. + + handler = PlatformHandler( + _ContentStandardsNoAnalyzer(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import GetCreativeFeaturesRequest + + with pytest.raises(AdcpError) as exc_info: + await handler.get_creative_features( + GetCreativeFeaturesRequest.model_construct(), ToolContext() + ) + assert exc_info.value.code == "UNSUPPORTED_FEATURE" + + +# ---- AudioStack DX regression ---- + + +@pytest.mark.asyncio +async def test_audiostack_style_creative_generative_agent_dispatches(executor) -> None: + """Direct regression test for the Emma AudioStack DX failure: a + ``creative-generative`` agent claiming the slug + implementing + ``build_creative`` MUST be reachable via the shim. Pre-fix: + advertised but unrouted (404). Post-fix: end-to-end dispatch.""" + audiostack_calls: list[dict] = [] + + class _AudioStackSeller(DecisioningPlatform): + capabilities = DecisioningCapabilities( + specialisms=["creative-generative"], + channels=["audio"], + ) + accounts = SingletonAccounts(account_id="audiostack") + + def build_creative(self, req, ctx): + # Stub AudioStack call — the test is about the SDK + # dispatch, not the third-party API. + audiostack_calls.append({"req": req, "ctx_account_id": ctx.account.id}) + return { + "creative_manifest": { + "creative_id": "as_synthesized_001", + "format_id": "audio_30s", + "asset_url": "https://cdn.audiostack.ai/synth/001.mp3", + } + } + + handler = PlatformHandler( + _AudioStackSeller(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + # build_creative is in the advertised set + assert "build_creative" in handler.advertised_tools + + from adcp.types import BuildCreativeRequest + + req = BuildCreativeRequest.model_construct() + result = await handler.build_creative(req, ToolContext()) + + # The shim called through; AudioStack stub recorded the invocation. + assert len(audiostack_calls) == 1 + assert audiostack_calls[0]["ctx_account_id"].startswith("audiostack:") + # The wire envelope made it back. + assert result["creative_manifest"]["creative_id"] == "as_synthesized_001" + + +# ---- _project_build_creative arms ---- + + +def test_project_build_creative_passthrough_dict_envelope() -> None: + """Already-shaped envelope dict is unchanged.""" + envelope = {"creative_manifest": {"creative_id": "cr_1"}} + assert _project_build_creative(envelope) is envelope + + multi_envelope = {"creative_manifests": [{"creative_id": "cr_1"}]} + assert _project_build_creative(multi_envelope) is multi_envelope + + +def test_project_build_creative_passthrough_pydantic_envelope() -> None: + """A fully-shaped :class:`BuildCreativeSuccessResponse` (carries + ``creative_manifest``/``creative_manifests`` as attrs) is unchanged + — the shim preserves the typed return for response_validator + middleware.""" + + class _SuccessEnvelope: + creative_manifest = {"creative_id": "cr_1"} + + envelope = _SuccessEnvelope() + assert _project_build_creative(envelope) is envelope + + +def test_project_build_creative_wraps_bare_manifest() -> None: + """A bare :class:`CreativeManifest` (Pydantic model with + ``model_dump``) is wrapped into ``{creative_manifest: ...}``.""" + + class _Manifest: + def model_dump(self, mode: str = "json") -> dict: + return {"creative_id": "cr_1", "format_id": "audio_30s"} + + projected = _project_build_creative(_Manifest()) + assert projected == {"creative_manifest": {"creative_id": "cr_1", "format_id": "audio_30s"}} + + +def test_project_build_creative_wraps_list_into_multi_envelope() -> None: + """A ``Sequence[CreativeManifest]`` is wrapped into + ``{creative_manifests: [...]}``.""" + + class _Manifest: + def __init__(self, cid: str) -> None: + self.cid = cid + + def model_dump(self, mode: str = "json") -> dict: + return {"creative_id": self.cid} + + projected = _project_build_creative([_Manifest("a"), _Manifest("b")]) + assert projected == {"creative_manifests": [{"creative_id": "a"}, {"creative_id": "b"}]} + + +def test_project_build_creative_passes_through_unknown_shape() -> None: + """Adopters returning an unrecognized non-list, non-Pydantic shape + (rare — e.g., a string error sentinel) get a passthrough so the wire + validator can surface a precise mis-shape error.""" + sentinel = "weird_string_return" + assert _project_build_creative(sentinel) == sentinel + + +# ---- _project_sync_audiences arms ---- + + +def test_project_sync_audiences_wraps_list() -> None: + """A list of audience-result rows wraps into ``{audiences: [...]}``.""" + + class _Row: + def __init__(self, aid: str) -> None: + self.aid = aid + + def model_dump(self, mode: str = "json") -> dict: + return {"audience_id": self.aid} + + projected = _project_sync_audiences([_Row("a1"), _Row("a2")]) + assert projected == {"audiences": [{"audience_id": "a1"}, {"audience_id": "a2"}]} + + +def test_project_sync_audiences_passthrough_envelope_dict() -> None: + """Already-shaped envelope is unchanged.""" + envelope = {"audiences": [{"audience_id": "a1"}]} + assert _project_sync_audiences(envelope) is envelope + + +def test_project_sync_audiences_passthrough_dict_rows() -> None: + """List of plain dicts (no model_dump) — the row passthrough + inside the comprehension is exercised.""" + projected = _project_sync_audiences([{"audience_id": "a1"}]) + assert projected == {"audiences": [{"audience_id": "a1"}]} + + +# ---- build_creative gate when platform doesn't implement ---- + + +@pytest.mark.asyncio +async def test_build_creative_unsupported_when_platform_lacks_method(executor) -> None: + """A platform that doesn't implement ``build_creative`` (sales-only + adopter who ended up routing through here, e.g. via + ``advertise_all=True`` mis-configuration) surfaces + ``UNSUPPORTED_FEATURE`` rather than ``INTERNAL_ERROR`` from the + AttributeError wrapper.""" + + class _NoCreative(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-direct"]) + accounts = SingletonAccounts(account_id="hello") + + # Deliberately no build_creative. + + handler = PlatformHandler( + _NoCreative(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import BuildCreativeRequest + + with pytest.raises(AdcpError) as exc_info: + await handler.build_creative(BuildCreativeRequest.model_construct(), ToolContext()) + assert exc_info.value.code == "UNSUPPORTED_FEATURE" + assert "build_creative" in str(exc_info.value) + + +# ---- update_rights shim routes through ---- + + +@pytest.mark.asyncio +async def test_update_rights_shim_routes_to_platform(executor) -> None: + """Brand rights includes ``update_rights`` (extend term, change + scope, revoke). Routes through with no account on the wire.""" + + class _BrandRightsAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["brand-rights"]) + accounts = SingletonAccounts(account_id="hello") + + def get_brand_identity(self, req, ctx): + return {} + + def get_rights(self, req, ctx): + return {} + + def acquire_rights(self, req, ctx): + return {} + + def update_rights(self, req, ctx): + return {"rights_id": "r_1", "status": "updated"} + + handler = PlatformHandler( + _BrandRightsAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + from adcp.types import UpdateRightsRequest + + result = await handler.update_rights(UpdateRightsRequest.model_construct(), ToolContext()) + assert result == {"rights_id": "r_1", "status": "updated"} + + +# ---- F12 auto-emit on new webhook-eligible shims ---- + + +def _push_config_params(req_cls, *, url: str = "https://buyer.example.com/wh", **extra): + """Build a request via ``model_construct`` carrying + ``push_notification_config`` so the auto-emit gate fires.""" + + class _Config: + pass + + cfg = _Config() + cfg.url = url + cfg.token = None + return req_cls.model_construct(push_notification_config=cfg, **extra) + + +@pytest.mark.asyncio +async def test_get_signals_auto_emits_completion_webhook(executor) -> None: + """``get_signals`` is in :data:`SPEC_WEBHOOK_TASK_TYPES`. With a + buyer-supplied ``push_notification_config.url``, the shim must + auto-emit a sync-completion webhook after the platform returns.""" + sender = AsyncMock() + + class _SignalsAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["signal-marketplace"]) + accounts = SingletonAccounts(account_id="hello") + + def get_signals(self, req, ctx): + return {"signals": [{"signal_id": "s1"}]} + + def activate_signal(self, req, ctx): + return {} + + handler = PlatformHandler( + _SignalsAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + from adcp.types import GetSignalsRequest + + req = _push_config_params(GetSignalsRequest) + await handler.get_signals(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + + sender.send_mcp.assert_awaited_once() + call_kwargs = sender.send_mcp.await_args.kwargs + assert call_kwargs["task_type"] == "get_signals" + assert call_kwargs["status"] == "completed" + assert call_kwargs["result"] == {"signals": [{"signal_id": "s1"}]} + + +@pytest.mark.asyncio +async def test_acquire_rights_auto_emits_completion_webhook(executor) -> None: + """``acquire_rights`` is in the spec enum; auto-emit fires.""" + sender = AsyncMock() + + class _BrandRights(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["brand-rights"]) + accounts = SingletonAccounts(account_id="hello") + + def get_brand_identity(self, req, ctx): + return {} + + def get_rights(self, req, ctx): + return {} + + def acquire_rights(self, req, ctx): + return {"rights_id": "r1", "status": "acquired"} + + handler = PlatformHandler( + _BrandRights(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + ) + from adcp.types import AcquireRightsRequest + + req = _push_config_params(AcquireRightsRequest) + await handler.acquire_rights(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + + sender.send_mcp.assert_awaited_once() + assert sender.send_mcp.await_args.kwargs["task_type"] == "acquire_rights" + + +@pytest.mark.asyncio +async def test_sync_audiences_auto_emits_with_projected_envelope(executor) -> None: + """``sync_audiences`` returns a list arm from the platform; the + shim projects to ``{audiences: [...]}`` AND auto-emits the + projected (envelope) shape on the webhook ``result`` field — + receivers see the wire envelope, not the bare list.""" + sender = AsyncMock() + + class _AudienceAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["audience-sync"]) + accounts = SingletonAccounts(account_id="hello") + + def sync_audiences(self, audiences, ctx): + # Return the bare-list ergonomic arm (not the envelope). + return [{"audience_id": "a1", "status": "deployed"}] + + handler = PlatformHandler( + _AudienceAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + ) + from adcp.types import SyncAudiencesRequest + + req = _push_config_params(SyncAudiencesRequest, audiences=[{"audience_id": "a1"}]) + result = await handler.sync_audiences(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + + # Shim's return is the envelope. + assert result == {"audiences": [{"audience_id": "a1", "status": "deployed"}]} + sender.send_mcp.assert_awaited_once() + # Webhook receives the envelope, not the bare list. + assert sender.send_mcp.await_args.kwargs["task_type"] == "sync_audiences" + assert sender.send_mcp.await_args.kwargs["result"] == result + + +@pytest.mark.asyncio +async def test_property_list_ops_dont_auto_emit_because_schema_forbids_push_notif( + executor, +) -> None: + """Property-list request schemas declare ``additionalProperties: + false`` and don't include ``push_notification_config`` — the wire + forbids buyers from registering a webhook URL on these ops, so + the F12 auto-emit gate naturally skips. The shim still calls + :meth:`_maybe_auto_emit_sync_completion` defensively (mirrors the + sales-* pattern), so a future schema change that adds push-notif + would activate auto-emit without further shim wiring. + + This test pins the current state: zero webhook deliveries on the + property-list dispatch path. If + ``schemas/cache/property/create-property-list-request.json`` ever + grows ``push_notification_config``, this test will surface that as + expected behavior change and the assertion needs to flip. + """ + sender = AsyncMock() + + class _PropAgent(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["property-lists"]) + accounts = SingletonAccounts(account_id="hello") + + def create_property_list(self, req, ctx): + return {"list_id": "pl1", "fetch_token": "tok"} + + def update_property_list(self, req, ctx): + return {} + + def get_property_list(self, req, ctx): + return {} + + def list_property_lists(self, req, ctx): + return {} + + def delete_property_list(self, req, ctx): + return {} + + handler = PlatformHandler( + _PropAgent(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + ) + from adcp.types import CreatePropertyListRequest + + # ``model_construct`` strips the kwarg because the schema is + # ``extra: forbid`` — we end up with a request that has no + # ``push_notification_config`` attr at all, exactly matching + # production wire behavior. + req = _push_config_params(CreatePropertyListRequest) + assert not hasattr(req, "push_notification_config") + await handler.create_property_list(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_get_creative_delivery_auto_emits_completion_webhook(executor) -> None: + """``get_creative_delivery`` is in :data:`SPEC_WEBHOOK_TASK_TYPES` + and its wire schema allows ``push_notification_config`` (additional + properties: true). With a buyer-supplied URL the shim fires a + sync-completion webhook.""" + sender = AsyncMock() + + class _AdServer(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["creative-ad-server"]) + accounts = SingletonAccounts(account_id="hello") + + def build_creative(self, req, ctx): + return {} + + def preview_creative(self, req, ctx): + return {} + + def get_creative_delivery(self, req, ctx): + return {"creatives": [{"creative_id": "c1", "impressions": 100}]} + + handler = PlatformHandler( + _AdServer(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + ) + from adcp.types import GetCreativeDeliveryRequest + + req = _push_config_params(GetCreativeDeliveryRequest) + await handler.get_creative_delivery(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + + sender.send_mcp.assert_awaited_once() + assert sender.send_mcp.await_args.kwargs["task_type"] == "get_creative_delivery" + + +@pytest.mark.asyncio +async def test_update_rights_does_not_auto_emit(executor) -> None: + """``update_rights`` is NOT in :data:`SPEC_WEBHOOK_TASK_TYPES` — the + spec enum freezes at the closed 20-value set per + ``schemas/cache/enums/task-type.json``. Adding it requires a + cross-language pin bump; until then, the shim's no-auto-emit + behavior is the correct posture (skip + warn). Without this guard + a buyer registering a webhook URL on ``update_rights`` would see + a webhook the spec enum doesn't allow, and conformant verifiers + would reject it. + """ + sender = AsyncMock() + + class _BrandRights(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["brand-rights"]) + accounts = SingletonAccounts(account_id="hello") + + def get_brand_identity(self, req, ctx): + return {} + + def get_rights(self, req, ctx): + return {} + + def acquire_rights(self, req, ctx): + return {} + + def update_rights(self, req, ctx): + return {} + + handler = PlatformHandler( + _BrandRights(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + ) + from adcp.types import UpdateRightsRequest + + req = _push_config_params(UpdateRightsRequest) + await handler.update_rights(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + + sender.send_mcp.assert_not_called()