diff --git a/AGENTS.md b/AGENTS.md index b5d664068..7797ef372 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -97,6 +97,36 @@ else: | `UNAUTHORIZED` | Missing/invalid auth | Check auth token | | `RATE_LIMITED` | Too many requests | Retry with backoff | +## Optional Protocol Extensions + +These protocols are opt-in upgrades for `DecisioningPlatform` subclasses. Implement them to unlock advanced framework behaviour. + +| Protocol | Method | Domain | Description | +|---|---|---|---| +| `IncrementalGetProducts` | `get_products_incremental` | media_buy | **Protocol declaration only — dispatch path ships in a follow-up to #495.** Implement to stream partial results on `time_budget` timeout; the framework will collect yielded batches until the deadline and project `incomplete[]` for unfinished scopes. Until the dispatch path lands, implementing this Protocol has no effect — timeouts still return `products: []` + `incomplete: [{scope: 'products'}]`. | + +Import from `adcp.decisioning`: + +```python +from adcp.decisioning import IncrementalGetProducts, ProductsCheckpoint +from adcp.types import GetProductsRequest +from adcp.decisioning import RequestContext +from typing import AsyncIterator + +class MySeller(DecisioningPlatform, IncrementalGetProducts): + async def get_products_incremental( + self, + req: GetProductsRequest, + ctx: RequestContext, + checkpoint: ProductsCheckpoint, + ) -> AsyncIterator[dict]: + for batch in self._stream_products(req): + checkpoint.add_batch(batch) + yield batch +``` + +**Note:** `get_products_incremental` MUST be an async generator (`async def` with `yield`). Detection uses `asyncio.isasyncgenfunction`. When `req.time_budget.unit == 'campaign'`, no SDK-managed deadline is installed; the adopter decides timing. + ## DX Helpers (adcp.server.helpers) Eliminate boilerplate in handler code. Import from `adcp.server` or `adcp.server.helpers`. diff --git a/src/adcp/decisioning/__init__.py b/src/adcp/decisioning/__init__.py index 1e0975037..cc2c2db9b 100644 --- a/src/adcp/decisioning/__init__.py +++ b/src/adcp/decisioning/__init__.py @@ -188,6 +188,12 @@ def create_media_buy( TaskState, ) from adcp.decisioning.tenant_store import create_tenant_store +from adcp.decisioning.time_budget import ( + IncrementalGetProducts, + ProductsCheckpoint, + project_incomplete_response, + resolve_time_budget, +) from adcp.decisioning.translation import ( TranslationMap, create_translation_map, @@ -290,6 +296,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "GOVERNANCE_SPECIALISMS", "GovernanceContextJWS", "HttpSigCredential", + "IncrementalGetProducts", "InMemoryMockAdServer", "InMemoryTaskRegistry", "MEDIA_BUY_TRANSITIONS", @@ -308,6 +315,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "PropertyListFetcher", "PropertyListReference", "ProductConfigStore", + "ProductsCheckpoint", "property_list_capability_enabled", "PropertyListsPlatform", "filter_products_by_property_list", @@ -362,7 +370,9 @@ def __init__(self, *args: object, **kwargs: object) -> None: "mixed_registry", "project_account_for_response", "project_business_entity_for_response", + "project_incomplete_response", "ref_account_id", + "resolve_time_budget", "serve", "signing_only_registry", "to_wire_account", diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index c8a5336ac..0e3d5599f 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -49,6 +49,7 @@ maybe_apply_property_list_filter, property_list_capability_enabled, ) +from adcp.decisioning.time_budget import project_incomplete_response, resolve_time_budget from adcp.decisioning.webhook_emit import maybe_emit_sync_completion from adcp.server.base import ADCPHandler, ToolContext @@ -1072,17 +1073,49 @@ async def get_products( # type: ignore[override] tool_ctx = context or ToolContext() account = await self._resolve_account(params.account, tool_ctx) ctx = self._build_ctx(tool_ctx, account) - response = cast( - "GetProductsResponse", - await _invoke_platform_method( - self._platform, - "get_products", - params, - ctx, - executor=self._executor, - registry=self._registry, - ), + # Resolve time_budget to a seconds deadline. _resolve_account and + # _build_ctx are intentionally outside this try/except so their + # AdcpErrors propagate unmodified; only the platform call is deadline- + # wrapped. + deadline = resolve_time_budget(params.time_budget) + coro = _invoke_platform_method( + self._platform, + "get_products", + params, + ctx, + executor=self._executor, + registry=self._registry, ) + try: + result = await ( + asyncio.wait_for(coro, timeout=deadline) if deadline is not None else coro + ) + except asyncio.TimeoutError: + # Deadline expired. The platform coroutine is cancelled; for + # sync adopters the underlying thread runs to completion but the + # asyncio side has moved on (thread-pool slot leak documented in + # adcp.decisioning.time_budget module header). + tb = params.time_budget + interval = tb.interval if tb is not None else 0 + unit_raw = tb.unit if tb is not None else None + unit = ( + (unit_raw.value if hasattr(unit_raw, "value") else str(unit_raw)) + if unit_raw is not None + else "unknown" + ) + logger.warning( + "[adcp.decisioning] get_products timed out after %ds " + "(time_budget=%d %s); returning incomplete response. " + "To avoid timeout cancellations, optimise get_products " + "latency or reduce the platform's search scope.", + deadline, + interval, + unit, + ) + return GetProductsResponse.model_validate( + project_incomplete_response(interval=interval, unit=unit) + ) + response = cast("GetProductsResponse", result) # Post-adapter: capability-gated property-list filter. response = cast( "GetProductsResponse", diff --git a/src/adcp/decisioning/time_budget.py b/src/adcp/decisioning/time_budget.py new file mode 100644 index 000000000..7822e6352 --- /dev/null +++ b/src/adcp/decisioning/time_budget.py @@ -0,0 +1,265 @@ +"""time_budget deadline wrapper for get_products. + +When ``GetProductsRequest.time_budget`` is set, the framework installs an +``asyncio.wait_for`` deadline around the adopter's ``get_products`` call and +projects the wire-compliant ``incomplete[]`` shape on exhaustion. + +Design principles +----------------- +* **Non-intrusive.** The wrapper lives in ``handler.py``'s ``get_products`` + shim, not inside ``_invoke_platform_method``. That keeps the shared + dispatcher clean and the concern local — the same posture as + ``webhook_emit.py`` adding behaviour at the call seam. + +* **Cancel-safe.** ``get_products`` is a pure read — no ``TaskRegistry`` + allocations occur inside ``_invoke_platform_method`` for this tool. + ``asyncio.CancelledError`` (a ``BaseException`` on Python 3.8+) propagates + past the ``except Exception`` in ``_invoke_platform_method`` cleanly. This + invariant MUST be preserved if ``get_products`` ever gains registry work. + +* **Thread-pool warning for sync adopters.** When a sync adopter runs via + ``loop.run_in_executor`` and ``asyncio.wait_for`` fires, the asyncio side + moves on but the underlying thread continues until its blocking call + returns. No Python mechanism can interrupt a running thread. The pool + slot is occupied for the full duration; on a short-budget burst against a + slow sync adopter this can exhaust the pool. Async adopters are + unaffected. Adopters who need to co-operate with deadline cancellation + should implement the ``IncrementalGetProducts`` protocol or migrate to an + async ``get_products``. + +* **``campaign`` unit → no SDK-managed deadline.** ``unit='campaign'`` means + "the seller has the full campaign flight to respond" — this is a + semantically distinct signal (not the same as omitting ``time_budget`` + entirely). The SDK does NOT install a deadline for campaign-scoped + requests; the raw ``time_budget`` value still reaches the adopter via + ``params`` so they can honour it at the application level if desired. + +* **``_invoke_platform_method`` MUST NOT catch TimeoutError.** The + ``except Exception`` in ``dispatch._invoke_platform_method`` catches + ``asyncio.TimeoutError`` (MRO: TimeoutError → OSError → Exception) on + Python < 3.11. The ``wait_for`` is placed in ``handler.get_products`` + *outside* ``_invoke_platform_method``'s try/except to ensure + ``TimeoutError`` reaches the shim-level handler. If a future refactor + moves the deadline inside ``_invoke_platform_method``, the ``except + Exception`` at ``dispatch.py:1134`` must explicitly re-raise + ``asyncio.TimeoutError`` before the generic handler fires. + +Optional incremental protocol +------------------------------ +``IncrementalGetProducts`` is an opt-in upgrade. When the adopter implements +it alongside ``get_products``, the framework (in a follow-up) routes +``get_products`` calls through the streaming path, collecting partial batches +until the deadline and projecting any unfinished scopes to ``incomplete[]``. +Without it, a timeout produces ``products: []`` + ``incomplete[{scope: +'products'}]``. + +Expose the Protocol now so adopters and code generators can reference it via +``from adcp.decisioning import IncrementalGetProducts``; the dispatch routing +for the streaming path ships when the second adopter adopts the wire shape. +""" + +from __future__ import annotations + +import logging +from collections.abc import AsyncIterator +from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable + +if TYPE_CHECKING: + from adcp.decisioning.context import RequestContext + from adcp.types import GetProductsRequest + +logger = logging.getLogger(__name__) + +# ---- Unit conversion ---- + +_UNIT_TO_SECONDS: dict[str, float] = { + "seconds": 1.0, + "minutes": 60.0, + "hours": 3600.0, + "days": 86400.0, +} + + +def resolve_time_budget(time_budget: Any) -> float | None: + """Convert ``GetProductsRequest.time_budget`` to a seconds deadline. + + Returns ``None`` when: + - ``time_budget`` is ``None`` (field absent) — no deadline. + - ``unit == 'campaign'`` — seller decides timing; SDK does not install a + deadline. The raw ``time_budget`` value still reaches the adopter via + ``params``. + + For all other units (seconds / minutes / hours / days), returns + ``interval * unit_seconds`` as a positive ``float``. + """ + if time_budget is None: + return None + + unit = getattr(time_budget, "unit", None) + if unit is None: + # Tolerate plain dicts (test fixtures, future schema variants). + unit = time_budget.get("unit") if isinstance(time_budget, dict) else None + if unit is None: + return None + + # Normalise enum to string. + unit_str = unit.value if hasattr(unit, "value") else str(unit) + + if unit_str == "campaign": + # Semantically distinct from "omitted" — the seller has the full + # campaign flight. Log at DEBUG so adopters see the explicit skip. + logger.debug( + "[adcp.decisioning] time_budget unit='campaign' — " + "no SDK-managed deadline; adopter decides timing." + ) + return None + + factor = _UNIT_TO_SECONDS.get(unit_str) + if factor is None: + logger.warning( + "[adcp.decisioning] Unrecognised time_budget unit %r — " + "treating as no deadline.", + unit_str, + ) + return None + + interval = getattr(time_budget, "interval", None) + if interval is None and isinstance(time_budget, dict): + interval = time_budget.get("interval") + if not isinstance(interval, int) or interval < 1: + logger.warning( + "[adcp.decisioning] Invalid time_budget interval %r — " + "treating as no deadline.", + interval, + ) + return None + + return float(interval) * factor + + +def project_incomplete_response(*, interval: int, unit: str) -> dict[str, Any]: + """Build the wire-compliant timeout response dict. + + Returns ``{"products": [], "incomplete": [{scope, description, estimated_wait}]}`` + with ``incomplete`` containing at least one entry (``min_length=1`` on + the wire schema). Uses a raw dict to stay above the import-layering + boundary (only ``stable.py`` / ``aliases.py`` / ``_ergonomic.py`` may + import from ``adcp.types._generated``). + + ``scope`` is ``"products"`` — the spec's "not all inventory sources were + searched" scope, which is accurate for a full-search timeout. When the + deadline fires before ``_invoke_platform_method`` returns, the seller + genuinely does not know whether pricing / forecast / proposals would have + been attempted; ``scope='products'`` is the minimal correct signal. A + follow-up that ships the incremental protocol can enumerate additional + scopes when the adopter provides partial data. + + ``estimated_wait`` is omitted (``None``) when the plain adopter path is + used, since the framework has no visibility into how much longer the call + would have taken. + """ + description = ( + f"time_budget exhausted ({interval} {unit}); " + "return the best results achievable within the budget. " + "Retry with a larger time_budget to receive complete results." + ) + return { + "products": [], + "incomplete": [ + { + "scope": "products", + "description": description, + "estimated_wait": None, + } + ], + } + + +# ---- Optional incremental protocol ---- + + +class ProductsCheckpoint: + """Accumulates partial ``GetProductsResponse`` batches during streaming. + + Passed to ``IncrementalGetProducts.get_products_incremental`` so the + framework can collect whatever batches the adopter yields before the + deadline. The framework reads ``checkpoint.batches`` after timeout to + project ``products`` and ``incomplete[]``. + + Adopters do not instantiate this directly — the framework creates it and + passes it in. + """ + + def __init__(self) -> None: + self.batches: list[dict[str, Any]] = [] + + def add_batch(self, batch: dict[str, Any]) -> None: + """Record a partial response batch.""" + self.batches.append(batch) + + +@runtime_checkable +class IncrementalGetProducts(Protocol): + """Optional upgrade protocol for streaming partial get_products results. + + **Status: Protocol declaration only.** The dispatch routing for this + path ships in a follow-up to issue #495. Implementing this Protocol + today has no runtime effect — the framework still calls the plain + ``get_products`` method. Declare it now so adopters and code generators + can reference the type; re-enable via the follow-up PR once the dispatch + branch is wired. + + When the dispatch path lands: the framework routes ``get_products`` + calls through this streaming path instead of the plain method. Batches + are collected until the ``time_budget`` deadline; remaining scopes are + projected to ``incomplete[]``. + + Until then, a ``time_budget`` timeout returns + ``products: []`` + ``incomplete: [{scope: 'products'}]``. + + Usage:: + + from adcp.decisioning import IncrementalGetProducts, ProductsCheckpoint + from adcp.types import GetProductsRequest + from adcp.decisioning import RequestContext + from typing import AsyncIterator + + class MySeller(DecisioningPlatform, IncrementalGetProducts): + async def get_products_incremental( + self, + req: GetProductsRequest, + ctx: RequestContext, + checkpoint: ProductsCheckpoint, + ) -> AsyncIterator[dict]: + for batch in self._stream_products(req): + checkpoint.add_batch(batch) + yield batch + + Note: ``get_products_incremental`` MUST be an ``async def`` that yields — + i.e., an async generator function. The framework detects it via + ``asyncio.isasyncgenfunction``, not ``asyncio.iscoroutinefunction``. If + your data source is synchronous, wrap the yield in an ``async def`` body + rather than returning a sync generator. + + ``campaign`` unit: if ``req.time_budget.unit == 'campaign'``, the + framework does not install a deadline; this method is called the same as + the plain path, and the adopter may yield indefinitely (within the + campaign flight window). + """ + + async def get_products_incremental( + self, + req: GetProductsRequest, + ctx: RequestContext, + checkpoint: ProductsCheckpoint, + ) -> AsyncIterator[dict[str, Any]]: + """Yield partial product batches until complete or deadline fires.""" + ... + + +__all__ = [ + "IncrementalGetProducts", + "ProductsCheckpoint", + "project_incomplete_response", + "resolve_time_budget", +] diff --git a/tests/test_time_budget.py b/tests/test_time_budget.py new file mode 100644 index 000000000..d8b08ebd4 --- /dev/null +++ b/tests/test_time_budget.py @@ -0,0 +1,316 @@ +"""Tests for adcp.decisioning.time_budget deadline wrapper. + +Covers: +- resolve_time_budget unit conversion (seconds / minutes / hours / days) +- resolve_time_budget campaign→None and absent→None +- project_incomplete_response wire shape (min_length=1, scope='products') +- get_products shim: short budget against slow adapter → incomplete[] +- get_products shim: within-budget adapter → unchanged response +- get_products shim: absent time_budget → no deadline +- get_products shim: campaign unit → no deadline +- IncrementalGetProducts / ProductsCheckpoint importable from adcp.decisioning +""" + +from __future__ import annotations + +import asyncio +from concurrent.futures import ThreadPoolExecutor +from unittest.mock import AsyncMock + +import pytest + +from adcp.decisioning import ( + DecisioningCapabilities, + DecisioningPlatform, + IncrementalGetProducts, + InMemoryTaskRegistry, + ProductsCheckpoint, + SingletonAccounts, +) +from adcp.decisioning.handler import PlatformHandler +from adcp.decisioning.time_budget import ( + project_incomplete_response, + resolve_time_budget, +) +from adcp.server.base import ToolContext +from adcp.types import GetProductsRequest + + +# --------------------------------------------------------------------------- +# resolve_time_budget +# --------------------------------------------------------------------------- + + +def test_resolve_time_budget_none_input(): + assert resolve_time_budget(None) is None + + +@pytest.mark.parametrize( + "unit, interval, expected", + [ + ("seconds", 5, 5.0), + ("minutes", 2, 120.0), + ("hours", 1, 3600.0), + ("days", 1, 86400.0), + ], +) +def test_resolve_time_budget_unit_conversion(unit, interval, expected): + tb = _make_time_budget(interval=interval, unit=unit) + assert resolve_time_budget(tb) == expected + + +def test_resolve_time_budget_campaign_returns_none(): + """unit='campaign' must produce None (no SDK-managed deadline).""" + tb = _make_time_budget(interval=1, unit="campaign") + assert resolve_time_budget(tb) is None + + +def test_resolve_time_budget_enum_unit(): + """Enum-valued unit is normalised to string via .value.""" + + class FakeUnit: + value = "minutes" + + class FakeTB: + unit = FakeUnit() + interval = 3 + + assert resolve_time_budget(FakeTB()) == 180.0 + + +def test_resolve_time_budget_plain_dict(): + tb = {"interval": 10, "unit": "seconds"} + assert resolve_time_budget(tb) == 10.0 + + +def test_resolve_time_budget_unknown_unit_returns_none(caplog): + tb = _make_time_budget(interval=1, unit="lightyears") + result = resolve_time_budget(tb) + assert result is None + assert "Unrecognised time_budget unit" in caplog.text + + +# --------------------------------------------------------------------------- +# project_incomplete_response +# --------------------------------------------------------------------------- + + +def test_project_incomplete_response_shape(): + resp = project_incomplete_response(interval=5, unit="seconds") + assert resp["products"] == [] + # min_length=1 on the wire schema — must have at least one entry + assert len(resp["incomplete"]) >= 1 + item = resp["incomplete"][0] + assert item["scope"] == "products" + assert isinstance(item["description"], str) and item["description"] + assert "estimated_wait" in item + + +def test_project_incomplete_response_contains_budget_info(): + resp = project_incomplete_response(interval=30, unit="minutes") + description = resp["incomplete"][0]["description"] + assert "30" in description + assert "minutes" in description + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def executor(): + pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test-tb-") + yield pool + pool.shutdown(wait=True) + + +def _make_platform_with_delay(delay: float): + """Return a DecisioningPlatform whose get_products sleeps for delay seconds.""" + + class _SlowSeller(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="test") + + async def get_products(self, req, ctx): + await asyncio.sleep(delay) + return {"products": [{"product_id": "p1", "name": "Product 1"}]} + + return _SlowSeller() + + +def _make_time_budget(*, interval: int, unit: str): + """Create a minimal time_budget-like object.""" + + class TB: + pass + + tb = TB() + tb.interval = interval # type: ignore[attr-defined] + tb.unit = unit # type: ignore[attr-defined] + return tb + + +# --------------------------------------------------------------------------- +# get_products shim integration +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_timeout_returns_incomplete(executor): + """1s budget against a 10s adapter returns the incomplete[] shape.""" + handler = PlatformHandler( + _make_platform_with_delay(10.0), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + req = GetProductsRequest.model_construct( + account=None, + time_budget=_make_time_budget(interval=1, unit="seconds"), + ) + result = await handler.get_products(req, context=ToolContext()) + + # Must be dict or model with products=[] and incomplete list + if hasattr(result, "incomplete"): + # Pydantic model path + assert result.incomplete is not None and len(result.incomplete) >= 1 + assert result.products == [] or result.products is None or list(result.products) == [] # type: ignore[union-attr] + else: + assert isinstance(result, dict) + assert result["products"] == [] + assert len(result["incomplete"]) >= 1 + assert result["incomplete"][0]["scope"] == "products" + + +@pytest.mark.asyncio +async def test_get_products_within_budget_passes_through(executor): + """Adopter that returns within budget passes response through unchanged.""" + + class _FastSeller(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="test") + + async def get_products(self, req, ctx): + return {"products": [{"product_id": "p1", "name": "Fast Product"}]} + + handler = PlatformHandler( + _FastSeller(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + req = GetProductsRequest.model_construct( + account=None, + time_budget=_make_time_budget(interval=10, unit="seconds"), + ) + result = await handler.get_products(req, context=ToolContext()) + + products = result.get("products") if isinstance(result, dict) else list(getattr(result, "products", [])) # type: ignore[union-attr] + assert len(products) == 1 + pid = products[0].get("product_id") if isinstance(products[0], dict) else products[0].product_id # type: ignore[union-attr] + assert pid == "p1" + # No incomplete key / field when fully resolved + incomplete = result.get("incomplete") if isinstance(result, dict) else getattr(result, "incomplete", None) + assert not incomplete + + +@pytest.mark.asyncio +async def test_get_products_absent_time_budget_no_deadline(executor): + """When time_budget is absent, the platform runs to completion with no deadline.""" + + class _SlowButUnlimited(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="test") + + async def get_products(self, req, ctx): + await asyncio.sleep(0.05) # brief delay — no deadline should fire + return {"products": [{"product_id": "p2", "name": "Unlimited"}]} + + handler = PlatformHandler( + _SlowButUnlimited(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + req = GetProductsRequest.model_construct(account=None, time_budget=None) + result = await handler.get_products(req, context=ToolContext()) + products = result.get("products") if isinstance(result, dict) else list(getattr(result, "products", [])) # type: ignore[union-attr] + assert len(products) == 1 + + +@pytest.mark.asyncio +async def test_get_products_campaign_unit_no_deadline(executor): + """unit='campaign' must not install a deadline; slow platform runs to completion.""" + + class _CampaignSeller(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="test") + + async def get_products(self, req, ctx): + await asyncio.sleep(0.05) + return {"products": [{"product_id": "p3", "name": "Campaign Product"}]} + + handler = PlatformHandler( + _CampaignSeller(), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + req = GetProductsRequest.model_construct( + account=None, + time_budget=_make_time_budget(interval=1, unit="campaign"), + ) + result = await handler.get_products(req, context=ToolContext()) + products = result.get("products") if isinstance(result, dict) else list(getattr(result, "products", [])) # type: ignore[union-attr] + assert len(products) == 1 + + +@pytest.mark.asyncio +async def test_get_products_timeout_logs_warning(executor, caplog): + """A timeout emits a WARNING with budget info.""" + import logging + + handler = PlatformHandler( + _make_platform_with_delay(10.0), + executor=executor, + registry=InMemoryTaskRegistry(), + ) + req = GetProductsRequest.model_construct( + account=None, + time_budget=_make_time_budget(interval=1, unit="seconds"), + ) + with caplog.at_level(logging.WARNING, logger="adcp.decisioning.handler"): + await handler.get_products(req, context=ToolContext()) + assert any("timed out" in r.message.lower() for r in caplog.records) + + +# --------------------------------------------------------------------------- +# Public API importability +# --------------------------------------------------------------------------- + + +def test_incremental_get_products_importable_from_decisioning(): + from adcp.decisioning import IncrementalGetProducts as IGP # noqa: F401 + + assert IGP is IncrementalGetProducts + + +def test_products_checkpoint_importable_from_decisioning(): + from adcp.decisioning import ProductsCheckpoint as PC # noqa: F401 + + assert PC is ProductsCheckpoint + + +def test_products_checkpoint_accumulates_batches(): + cp = ProductsCheckpoint() + cp.add_batch({"products": [{"product_id": "a"}]}) + cp.add_batch({"products": [{"product_id": "b"}]}) + assert len(cp.batches) == 2 + assert cp.batches[0]["products"][0]["product_id"] == "a" + + +def test_incremental_get_products_is_runtime_checkable(): + """Protocol is @runtime_checkable so isinstance works at runtime.""" + + class FakeIncremental: + async def get_products_incremental(self, req, ctx, checkpoint): + yield {} # pragma: no cover + + assert isinstance(FakeIncremental(), IncrementalGetProducts)