Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,36 @@ else:
| `UNAUTHORIZED` | Missing/invalid auth | Check auth token |
| `RATE_LIMITED` | Too many requests | Retry with backoff |

## Optional Protocol Extensions

These protocols are opt-in upgrades for `DecisioningPlatform` subclasses. Implement them to unlock advanced framework behaviour.

| Protocol | Method | Domain | Description |
|---|---|---|---|
| `IncrementalGetProducts` | `get_products_incremental` | media_buy | **Protocol declaration only — dispatch path ships in a follow-up to #495.** Implement to stream partial results on `time_budget` timeout; the framework will collect yielded batches until the deadline and project `incomplete[]` for unfinished scopes. Until the dispatch path lands, implementing this Protocol has no effect — timeouts still return `products: []` + `incomplete: [{scope: 'products'}]`. |

Import from `adcp.decisioning`:

```python
from adcp.decisioning import IncrementalGetProducts, ProductsCheckpoint
from adcp.types import GetProductsRequest
from adcp.decisioning import RequestContext
from typing import AsyncIterator

class MySeller(DecisioningPlatform, IncrementalGetProducts):
async def get_products_incremental(
self,
req: GetProductsRequest,
ctx: RequestContext,
checkpoint: ProductsCheckpoint,
) -> AsyncIterator[dict]:
for batch in self._stream_products(req):
checkpoint.add_batch(batch)
yield batch
```

**Note:** `get_products_incremental` MUST be an async generator (`async def` with `yield`). Detection uses `asyncio.isasyncgenfunction`. When `req.time_budget.unit == 'campaign'`, no SDK-managed deadline is installed; the adopter decides timing.

## DX Helpers (adcp.server.helpers)

Eliminate boilerplate in handler code. Import from `adcp.server` or `adcp.server.helpers`.
Expand Down
10 changes: 10 additions & 0 deletions src/adcp/decisioning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ def create_media_buy(
TaskState,
)
from adcp.decisioning.tenant_store import create_tenant_store
from adcp.decisioning.time_budget import (
IncrementalGetProducts,
ProductsCheckpoint,
project_incomplete_response,
resolve_time_budget,
)
from adcp.decisioning.translation import (
TranslationMap,
create_translation_map,
Expand Down Expand Up @@ -290,6 +296,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"GOVERNANCE_SPECIALISMS",
"GovernanceContextJWS",
"HttpSigCredential",
"IncrementalGetProducts",
"InMemoryMockAdServer",
"InMemoryTaskRegistry",
"MEDIA_BUY_TRANSITIONS",
Expand All @@ -308,6 +315,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"PropertyListFetcher",
"PropertyListReference",
"ProductConfigStore",
"ProductsCheckpoint",
"property_list_capability_enabled",
"PropertyListsPlatform",
"filter_products_by_property_list",
Expand Down Expand Up @@ -362,7 +370,9 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"mixed_registry",
"project_account_for_response",
"project_business_entity_for_response",
"project_incomplete_response",
"ref_account_id",
"resolve_time_budget",
"serve",
"signing_only_registry",
"to_wire_account",
Expand Down
53 changes: 43 additions & 10 deletions src/adcp/decisioning/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
maybe_apply_property_list_filter,
property_list_capability_enabled,
)
from adcp.decisioning.time_budget import project_incomplete_response, resolve_time_budget
from adcp.decisioning.webhook_emit import maybe_emit_sync_completion
from adcp.server.base import ADCPHandler, ToolContext

Expand Down Expand Up @@ -1072,17 +1073,49 @@ async def get_products( # type: ignore[override]
tool_ctx = context or ToolContext()
account = await self._resolve_account(params.account, tool_ctx)
ctx = self._build_ctx(tool_ctx, account)
response = cast(
"GetProductsResponse",
await _invoke_platform_method(
self._platform,
"get_products",
params,
ctx,
executor=self._executor,
registry=self._registry,
),
# Resolve time_budget to a seconds deadline. _resolve_account and
# _build_ctx are intentionally outside this try/except so their
# AdcpErrors propagate unmodified; only the platform call is deadline-
# wrapped.
deadline = resolve_time_budget(params.time_budget)
coro = _invoke_platform_method(
self._platform,
"get_products",
params,
ctx,
executor=self._executor,
registry=self._registry,
)
try:
result = await (
asyncio.wait_for(coro, timeout=deadline) if deadline is not None else coro
)
except asyncio.TimeoutError:
# Deadline expired. The platform coroutine is cancelled; for
# sync adopters the underlying thread runs to completion but the
# asyncio side has moved on (thread-pool slot leak documented in
# adcp.decisioning.time_budget module header).
tb = params.time_budget
interval = tb.interval if tb is not None else 0
unit_raw = tb.unit if tb is not None else None
unit = (
(unit_raw.value if hasattr(unit_raw, "value") else str(unit_raw))
if unit_raw is not None
else "unknown"
)
logger.warning(
"[adcp.decisioning] get_products timed out after %ds "
"(time_budget=%d %s); returning incomplete response. "
"To avoid timeout cancellations, optimise get_products "
"latency or reduce the platform's search scope.",
deadline,
interval,
unit,
)
return GetProductsResponse.model_validate(
project_incomplete_response(interval=interval, unit=unit)
)
response = cast("GetProductsResponse", result)
# Post-adapter: capability-gated property-list filter.
response = cast(
"GetProductsResponse",
Expand Down
Loading
Loading