diff --git a/src/adcp/decisioning/__init__.py b/src/adcp/decisioning/__init__.py index ba5d796db..ccc3ffe32 100644 --- a/src/adcp/decisioning/__init__.py +++ b/src/adcp/decisioning/__init__.py @@ -167,6 +167,7 @@ def create_media_buy( TaskRegistry, TaskState, ) +from adcp.decisioning.tenant_store import create_tenant_store from adcp.decisioning.translation import ( TranslationMap, create_translation_map, @@ -318,6 +319,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "create_adcp_server_from_platform", "create_oauth_passthrough_resolver", "create_roster_account_store", + "create_tenant_store", "create_translation_map", "create_upstream_http_client", "require_account_match", diff --git a/src/adcp/decisioning/tenant_store.py b/src/adcp/decisioning/tenant_store.py new file mode 100644 index 000000000..6b0726552 --- /dev/null +++ b/src/adcp/decisioning/tenant_store.py @@ -0,0 +1,575 @@ +"""``create_tenant_store`` — opinionated multi-tenant :class:`AccountStore` +builder with a baked-in per-entry tenant-isolation gate. + +Solves the recurring class of bug where adopters routing by wire-supplied +operator without cross-checking the auth principal could write across +tenants. The gate is enforced inside the framework: cross-tenant entries +on ``upsert`` / ``sync_governance`` are rejected with +``PERMISSION_DENIED`` before reaching adopter callbacks. + +Mirrors the JS-side ``createTenantStore`` at +``packages/sdk/src/server/decisioning/tenant-store.ts`` (6.7). The +Python adaptation flattens the ``Tenant`` value to a string ``tenant_id`` +since adopters typically denormalize the owning tenant onto the Account +itself; the security semantics are unchanged. + +**Fail-closed.** When ``resolve_from_auth(ctx)`` returns ``None`` +(unknown / unauthenticated principal): + +* ``resolve`` returns ``None`` +* ``upsert`` rejects every entry with ``PERMISSION_DENIED`` +* ``sync_governance`` rejects every entry with ``PERMISSION_DENIED`` +* ``list`` returns ``[]`` + +Don't fork this around to fail-open. Adopters who copied the prior +fail-open shape (``if home_tenant_id and tenant_id != home_tenant_id``) +silently disabled isolation when a credential lacked a tenant binding. + +**Immutability.** The returned store's methods are defined on the class +(not assigned in ``__init__`` to a callable hook), and the class uses +``__slots__`` to forbid instance attribute assignment. An adopter who +writes ``store.upsert = custom_handler`` after construction gets an +:class:`AttributeError` instead of silently bypassing the gate. Adopters +with genuine custom needs compose at the method level (wrap the returned +store) or write a plain :class:`AccountStore` and own the gate. + +``_TenantStore`` is intentionally not exported from +``adcp.decisioning.__init__``; only the :func:`create_tenant_store` +factory is public. Class-level monkey-patching is possible in pure +Python (no language-level final), but the leading-underscore + +non-export keep it out of adopter code paths. +""" + +from __future__ import annotations + +import inspect +import logging +from collections.abc import Awaitable, Callable +from typing import TYPE_CHECKING, Any, Generic, cast + +from typing_extensions import TypeVar + +from adcp.decisioning.accounts import ResolveContext +from adcp.decisioning.context import AuthInfo +from adcp.decisioning.types import ( + Account, + SyncAccountsResultRow, + SyncGovernanceEntry, + SyncGovernanceResultRow, +) + +if TYPE_CHECKING: + from adcp.types import AccountReference + +logger = logging.getLogger(__name__) + +# Alias for the builtin ``list`` so annotations on the +# :meth:`_TenantStore.list` method (which shadows ``list`` inside the +# class body for forward-ref name resolution) keep referring to the +# generic-list type. +_BuiltinList = list + +__all__ = ["create_tenant_store"] + +#: Per-platform metadata generic. Defaults to ``dict[str, Any]`` — +#: matches :class:`Account[TMeta]`'s default. +TMeta = TypeVar("TMeta", default=dict[str, Any]) + +# Type aliases for the adopter callbacks. All callables may be sync OR +# async; the helper awaits at call time. AccountReference enters as +# either a Pydantic model or a raw dict (legacy callers) — typed as +# ``Any`` on the callable boundary to avoid forcing adopters into a +# specific arm of the discriminated union. +_ResolveByRef = Callable[ + [Any, ResolveContext], + "Awaitable[Account[TMeta] | None] | Account[TMeta] | None", +] +_ResolveFromAuth = Callable[[ResolveContext], "Awaitable[str | None] | str | None"] +_TenantIdFn = Callable[["Account[TMeta]"], str] +_TenantToAccount = Callable[[str], "Awaitable[Account[TMeta] | None] | Account[TMeta] | None"] +_UpsertRow = Callable[ + [Any, ResolveContext], + "Awaitable[SyncAccountsResultRow] | SyncAccountsResultRow", +] +_SyncGovernanceRow = Callable[ + [SyncGovernanceEntry, ResolveContext], + "Awaitable[SyncGovernanceResultRow] | SyncGovernanceResultRow", +] + + +async def _await_maybe(value: Any) -> Any: + """Resolve a value that may be a coroutine OR a plain return. + + Adopter callbacks are sync OR async; this shim keeps the helper's + own dispatch uniform without forcing every adopter to write + ``async def``. + """ + if inspect.isawaitable(value): + return await value + return value + + +def _ref_field(ref: Any, name: str) -> Any: + """Read a field off an :class:`AccountReference` whether it arrived + as a Pydantic model (RootModel proxy) or a raw dict. + + Mirrors the JS ``narrowAccountRef`` helper. The wire schema is a + discriminated union of ``{account_id}`` and ``{brand, operator}``; + upstream validation guarantees one arm is populated, so widening to + an all-optional read is safe. + """ + if ref is None: + return None + if isinstance(ref, dict): + return ref.get(name) + return getattr(ref, name, None) + + +def _account_not_found_message(ref: Any) -> str: + account_id = _ref_field(ref, "account_id") + if account_id: + return f"Unknown account_id: {account_id}" + operator = _ref_field(ref, "operator") + if operator: + return f"Unknown operator: {operator}" + return "Unknown account reference" + + +def _permission_denied_message(ref: Any) -> str: + operator = _ref_field(ref, "operator") + account_id = _ref_field(ref, "account_id") + subject = operator or account_id or "this account" + return ( + f"Buyer agent has no authority over '{subject}' " + "(tenant mismatch or auth principal not registered)." + ) + + +def _build_failed_sync_accounts_row(ref: Any, code: str, message: str) -> SyncAccountsResultRow: + """Construct a wire-shaped failure row for ``sync_accounts``. + + The wire schema requires ``brand`` + ``operator`` on every row, so + when the input ref is ``account_id``-only we synthesize ``'unknown'`` + placeholders — the buyer's actionable signal is ``errors[0].code``; + ``brand`` / ``operator`` here are wire-required scaffolding, not + authoritative echoes. + """ + brand_field = _ref_field(ref, "brand") or {} + if not isinstance(brand_field, dict): + # Pydantic BrandReference — coerce via attribute access + brand_dict = {"domain": getattr(brand_field, "domain", "unknown.example")} + else: + brand_dict = {"domain": brand_field.get("domain", "unknown.example")} + operator = _ref_field(ref, "operator") or "unknown" + account_id = _ref_field(ref, "account_id") + return SyncAccountsResultRow( + brand=brand_dict, + operator=operator, + action="failed", + status="rejected", + errors=[{"code": code, "message": message}], + account_id=account_id if isinstance(account_id, str) else None, + ) + + +def _build_failed_sync_governance_row( + entry: SyncGovernanceEntry, code: str, message: str +) -> SyncGovernanceResultRow: + return SyncGovernanceResultRow( + account=entry.account, + status="failed", + errors=[{"code": code, "message": message}], + ) + + +def _default_unchanged_row(ref: Any) -> SyncAccountsResultRow: + """Build a no-op success row when adopter omits ``upsert_row``. + + Matches the wire shape with ``action='unchanged'`` so authorized + entries don't surface as 501 / UNSUPPORTED_FEATURE just because + the adopter has no persistence to perform. + """ + brand_field = _ref_field(ref, "brand") or {} + if not isinstance(brand_field, dict): + brand_dict = {"domain": getattr(brand_field, "domain", "unknown.example")} + else: + brand_dict = {"domain": brand_field.get("domain", "unknown.example")} + operator = _ref_field(ref, "operator") or "unknown" + account_id = _ref_field(ref, "account_id") + return SyncAccountsResultRow( + brand=brand_dict, + operator=operator, + action="unchanged", + status="active", + account_id=account_id if isinstance(account_id, str) else None, + ) + + +class _TenantStore(Generic[TMeta]): + """Concrete :class:`AccountStore` with per-entry tenant gate. + + Methods are defined on the class (not assigned in ``__init__``) so + they can't be monkey-patched to bypass isolation. ``__slots__`` + forbids instance attribute assignment — adopters who try to + override ``upsert`` get :class:`AttributeError`. + """ + + __slots__ = ( + "_resolve_by_ref", + "_resolve_from_auth", + "_tenant_id", + "_tenant_to_account", + "_upsert_row", + "_sync_governance_row", + ) + + # Required for AccountStore Protocol structural matching. The + # tenant-store helper covers all three resolution shapes, so the + # most useful single literal is ``'explicit'`` (wire ref drives + # lookup when present); Path-2 (auth-derived) is handled + # transparently inside ``resolve``. + resolution = "explicit" + + def __init__( + self, + *, + resolve_by_ref: _ResolveByRef[TMeta], + resolve_from_auth: _ResolveFromAuth, + tenant_id: _TenantIdFn[TMeta], + tenant_to_account: _TenantToAccount[TMeta], + upsert_row: _UpsertRow | None = None, + sync_governance_row: _SyncGovernanceRow | None = None, + ) -> None: + self._resolve_by_ref = resolve_by_ref + self._resolve_from_auth = resolve_from_auth + self._tenant_id = tenant_id + self._tenant_to_account = tenant_to_account + self._upsert_row = upsert_row + self._sync_governance_row = sync_governance_row + + async def _auth_tenant(self, ctx: ResolveContext) -> str | None: + """Compute the auth principal's tenant once per request.""" + return cast("str | None", await _await_maybe(self._resolve_from_auth(ctx))) + + async def resolve( + self, + ref: AccountReference | dict[str, Any] | None, + auth_info: AuthInfo | None = None, + ) -> Account[TMeta] | None: + """Resolve a wire reference to the tenant-scoped Account. + + Signature matches the :class:`AccountStore` Protocol + (``resolve(ref, auth_info=None)``); the dispatcher calls this + as ``accounts.resolve(ref_dict, auth_info=auth_info)``. We + synthesize a :class:`ResolveContext` internally so the adopter's + ``resolve_by_ref`` callback continues to take ``(ref, ctx)`` — + that keeps the adopter API uniform with ``upsert_row`` / + ``sync_governance_row``. + + Two paths: + + * **Path 1** (ref provided): call ``resolve_by_ref(ref, ctx)``, + then verify the resolved account's tenant matches the auth + principal's tenant. Mismatch → return ``None`` (the gate + hides the existence of cross-tenant accounts from the + caller's perspective). + + * **Path 2** (ref is ``None``): derive tenant from auth, then + project to Account via ``tenant_to_account``. + + Auth tenant ``None`` → ``None`` on either path. The framework + treats ``None`` as ``ACCOUNT_NOT_FOUND`` for tools that require + an account. + """ + resolve_ctx = ResolveContext(auth_info=auth_info, tool_name="resolve") + auth_tid = await self._auth_tenant(resolve_ctx) + if auth_tid is None: + return None + + if ref is None: + return cast( + "Account[TMeta] | None", + await _await_maybe(self._tenant_to_account(auth_tid)), + ) + + try: + account = cast( + "Account[TMeta] | None", + await _await_maybe(self._resolve_by_ref(ref, resolve_ctx)), + ) + except Exception: + # Per-request consistency with the per-entry isolation in + # ``upsert`` / ``sync_governance``: log-and-deny rather + # than 500-ing the calling tool. Adopter exception details + # stay server-side (could carry stack/DB info). + logger.warning( + "tenant_store.resolve: resolve_by_ref raised; treating as ACCOUNT_NOT_FOUND", + exc_info=True, + ) + return None + if account is None: + return None + if self._tenant_id(account) != auth_tid: + return None + return account + + async def upsert( + self, + refs: _BuiltinList[AccountReference | dict[str, Any]], + ctx: ResolveContext | None = None, + ) -> _BuiltinList[SyncAccountsResultRow]: + """Per-entry tenant-isolation gate for ``sync_accounts``. + + For each ref: + + 1. Compute the entry's tenant via ``resolve_by_ref``. + 2. Compare against the auth principal's tenant + (``resolve_from_auth(ctx)``, computed once per call). + 3. Unknown ref → ``ACCOUNT_NOT_FOUND``. + 4. Auth tenant ``None`` OR auth tenant != entry tenant → + ``PERMISSION_DENIED`` (fail-closed). + 5. Otherwise, dispatch to ``upsert_row`` (or no-op + ``action='unchanged'`` if no hook). + + Sequential, not concurrent: adopter ``upsert_row`` callbacks + commonly mutate shared tenant state; concurrent invocations + against the same tenant are an entropy source the helper + shouldn't introduce. Adopters who want parallel writes fan out + inside their own callback. + """ + resolve_ctx = ctx if ctx is not None else ResolveContext() + auth_tid = await self._auth_tenant(resolve_ctx) + + rows: _BuiltinList[SyncAccountsResultRow] = [] + for ref in refs: + try: + entry_account = await _await_maybe(self._resolve_by_ref(ref, resolve_ctx)) + except Exception: + # Per-entry isolation: one bad row must not poison the + # batch. Log server-side; emit PERMISSION_DENIED on the + # wire (don't leak adopter exception detail — could + # carry stack/DB info). + logger.warning( + "tenant_store.upsert: resolve_by_ref raised for entry; " + "rejecting with PERMISSION_DENIED", + exc_info=True, + ) + rows.append( + _build_failed_sync_accounts_row( + ref, "PERMISSION_DENIED", _permission_denied_message(ref) + ) + ) + continue + if entry_account is None: + rows.append( + _build_failed_sync_accounts_row( + ref, "ACCOUNT_NOT_FOUND", _account_not_found_message(ref) + ) + ) + continue + try: + entry_tid = self._tenant_id(entry_account) + except Exception: + logger.warning( + "tenant_store.upsert: tenant_id raised for entry; " + "rejecting with PERMISSION_DENIED", + exc_info=True, + ) + rows.append( + _build_failed_sync_accounts_row( + ref, "PERMISSION_DENIED", _permission_denied_message(ref) + ) + ) + continue + if auth_tid is None or auth_tid != entry_tid: + rows.append( + _build_failed_sync_accounts_row( + ref, "PERMISSION_DENIED", _permission_denied_message(ref) + ) + ) + continue + if self._upsert_row is None: + rows.append(_default_unchanged_row(ref)) + else: + rows.append(await _await_maybe(self._upsert_row(ref, resolve_ctx))) + return rows + + async def list( + self, + filter: dict[str, Any] | None = None, # noqa: A002 — wire field name + ctx: ResolveContext | None = None, + ) -> _BuiltinList[Account[TMeta]]: + """Return the accounts visible to the calling principal. + + Single-tenant projection: derive the auth tenant, project to + an Account, return as a one-element list. Unauthenticated / + unregistered → ``[]`` (fail-closed but quiet — ``list`` MUST + NOT raise on a per-spec valid request). + + ``filter`` is accepted for AccountStore Protocol parity but + not interpreted here — adopters needing filter / pagination + compose by wrapping the returned store. + """ + del filter # accepted for Protocol parity; not interpreted + resolve_ctx = ctx if ctx is not None else ResolveContext() + auth_tid = await self._auth_tenant(resolve_ctx) + if auth_tid is None: + return [] + try: + account = await _await_maybe(self._tenant_to_account(auth_tid)) + except Exception: + # ``list`` MUST NOT raise on a per-spec valid request + # (docstring contract). Fail-closed quiet — same outcome + # as auth-None: the caller sees an empty list. + logger.warning( + "tenant_store.list: tenant_to_account raised; returning []", + exc_info=True, + ) + return [] + if account is None: + return [] + return [account] + + async def sync_governance( + self, + entries: _BuiltinList[SyncGovernanceEntry], + ctx: ResolveContext | None = None, + ) -> _BuiltinList[SyncGovernanceResultRow]: + """Per-entry tenant gate for ``sync_governance``. Same rules as + :meth:`upsert`, shaped for the ``SyncGovernanceResultRow`` arm + (``status='failed'`` with per-entry ``errors``).""" + resolve_ctx = ctx if ctx is not None else ResolveContext() + auth_tid = await self._auth_tenant(resolve_ctx) + + rows: _BuiltinList[SyncGovernanceResultRow] = [] + for entry in entries: + try: + entry_account = await _await_maybe(self._resolve_by_ref(entry.account, resolve_ctx)) + except Exception: + logger.warning( + "tenant_store.sync_governance: resolve_by_ref raised for entry; " + "rejecting with PERMISSION_DENIED", + exc_info=True, + ) + rows.append( + _build_failed_sync_governance_row( + entry, + "PERMISSION_DENIED", + _permission_denied_message(entry.account), + ) + ) + continue + if entry_account is None: + rows.append( + _build_failed_sync_governance_row( + entry, + "ACCOUNT_NOT_FOUND", + _account_not_found_message(entry.account), + ) + ) + continue + try: + entry_tid = self._tenant_id(entry_account) + except Exception: + logger.warning( + "tenant_store.sync_governance: tenant_id raised for entry; " + "rejecting with PERMISSION_DENIED", + exc_info=True, + ) + rows.append( + _build_failed_sync_governance_row( + entry, + "PERMISSION_DENIED", + _permission_denied_message(entry.account), + ) + ) + continue + if auth_tid is None or auth_tid != entry_tid: + rows.append( + _build_failed_sync_governance_row( + entry, + "PERMISSION_DENIED", + _permission_denied_message(entry.account), + ) + ) + continue + if self._sync_governance_row is None: + rows.append( + SyncGovernanceResultRow( + account=entry.account, + status="synced", + governance_agents=list(entry.governance_agents), + ) + ) + else: + rows.append(await _await_maybe(self._sync_governance_row(entry, resolve_ctx))) + return rows + + +def create_tenant_store( + *, + resolve_by_ref: _ResolveByRef[TMeta], + resolve_from_auth: _ResolveFromAuth, + tenant_id: _TenantIdFn[TMeta], + tenant_to_account: _TenantToAccount[TMeta], + upsert_row: _UpsertRow | None = None, + sync_governance_row: _SyncGovernanceRow | None = None, +) -> _TenantStore[TMeta]: + """Build an :class:`AccountStore` whose ``resolve`` / ``upsert`` / + ``list`` / ``sync_governance`` methods enforce tenant isolation. + + :param resolve_by_ref: ``(ref, ctx) -> Account | None``. Resolves a + wire :class:`AccountReference` to the framework Account it + points at — independent of who the caller is. Return ``None`` + if the ref is unknown (helper emits ``ACCOUNT_NOT_FOUND`` for + that row). May be sync or async. + :param resolve_from_auth: ``(ctx) -> tenant_id | None``. Derives the + tenant from the auth principal. Return ``None`` if no principal + is resolvable (no auth, principal not registered) — every entry + on per-entry tools then fails ``PERMISSION_DENIED`` + (fail-closed). + :param tenant_id: ``(account) -> str``. Stable identity for tenant- + equality checks. The helper compares + ``tenant_id(entry_account) == resolve_from_auth(ctx)`` to + enforce isolation. A stable string id beats reference equality + (Postgres-backed stores hand back fresh objects each fetch). + :param tenant_to_account: ``(tenant_id) -> Account | None``. Project + a tenant id to its Account. Used by Path-2 ``resolve`` + (no-ref tools) and by ``list``. + :param upsert_row: Optional ``(ref, ctx) -> SyncAccountsResultRow`` + per-entry storage callback. Cross-tenant entries and unknown-ref + entries NEVER reach this callback — the helper builds + ``PERMISSION_DENIED`` / ``ACCOUNT_NOT_FOUND`` rows for those + before invoking adopter code. Omit for adopters whose platform + doesn't claim ``sync_accounts``; the helper returns + ``action='unchanged'`` for authorized rows in that case. + :param sync_governance_row: Optional ``(entry, ctx) -> + SyncGovernanceResultRow``. Same gating rules as ``upsert_row``. + Adopters persist the buyer's governance-agent binding here. + + :returns: An :class:`AccountStore`-shaped object whose gate methods + are class-level (immutable per instance — ``__slots__`` forbids + attribute assignment). + + Example:: + + from adcp.decisioning import create_tenant_store + + store = create_tenant_store( + resolve_by_ref=lambda ref, ctx: lookup_account_by_ref(ref), + resolve_from_auth=lambda ctx: principal_to_tenant.get( + ctx.auth_info.principal if ctx.auth_info else None + ), + tenant_id=lambda account: account.metadata["tenant_id"], + tenant_to_account=lambda tid: tenants[tid].account, + upsert_row=lambda ref, ctx: persist_account(ref), + ) + """ + return _TenantStore( + resolve_by_ref=resolve_by_ref, + resolve_from_auth=resolve_from_auth, + tenant_id=tenant_id, + tenant_to_account=tenant_to_account, + upsert_row=upsert_row, + sync_governance_row=sync_governance_row, + ) diff --git a/tests/test_tenant_store.py b/tests/test_tenant_store.py new file mode 100644 index 000000000..0f323723f --- /dev/null +++ b/tests/test_tenant_store.py @@ -0,0 +1,658 @@ +"""Tests for ``create_tenant_store`` — the opinionated multi-tenant +:class:`AccountStore` builder with a baked-in per-entry tenant gate. + +Mirrors the security semantics of the JS-side ``createTenantStore`` at +``packages/sdk/src/server/decisioning/tenant-store.ts``: cross-tenant +entries on ``upsert`` / ``sync_governance`` are rejected with +``PERMISSION_DENIED`` BEFORE reaching adopter callbacks. Fail-closed +when ``resolve_from_auth`` returns ``None``. + +The gate methods (``upsert``, ``sync_governance``) are defined on the +class — adopters cannot monkey-patch instances to bypass isolation. +""" + +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from adcp.decisioning import ( + Account, + AccountStore, + AuthInfo, + ResolveContext, + SyncAccountsResultRow, + SyncGovernanceEntry, + SyncGovernanceResultRow, + create_tenant_store, +) +from adcp.types import AccountReference + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +# Account directory keyed by tenant_id. Each tenant owns exactly one +# account; in a real adopter the per-account-per-tenant projection is +# more elaborate, but the gate semantics are identical. +ACCOUNTS: dict[str, Account] = { + "t_pinnacle": Account(id="acc_pinnacle", name="Pinnacle"), + "t_meridian": Account(id="acc_meridian", name="Meridian"), +} + +# Operator → tenant routing. Adopter's resolve_by_ref typically +# encapsulates this lookup; the test surfaces it explicitly so +# fixtures stay readable. +OPERATOR_TO_TENANT: dict[str, str] = { + "pinnacle.example": "t_pinnacle", + "meridian.example": "t_meridian", +} + +# Auth principal → tenant. Driven by ctx.auth_info.principal in the +# helper's resolve_from_auth callback. +PRINCIPAL_TO_TENANT: dict[str, str] = { + "buyer@pinnacle": "t_pinnacle", + "buyer@meridian": "t_meridian", +} + + +def _account_tenant_id(account: Account) -> str: + """Project an Account back to its owning tenant id. + + Mirrors the JS ``tenantId`` callback. In the test fixture the + projection is the inverse of ACCOUNTS — adopters typically read + a denormalized field on their own Account model. + """ + for tid, acc in ACCOUNTS.items(): + if acc.id == account.id: + return tid + raise KeyError(f"Account has no known tenant: {account.id!r}") + + +def _resolve_by_ref(ref: AccountReference | dict[str, Any], ctx: ResolveContext) -> Account | None: + """Adopter's ref → Account lookup. Reads the (brand, operator) arm. + + Returns ``None`` for unknown operators (helper emits + ``ACCOUNT_NOT_FOUND`` for that row). + """ + del ctx # ref-based lookup ignores ctx in this fixture + operator = ref.get("operator") if isinstance(ref, dict) else getattr(ref, "operator", None) + if operator is None: + return None + tid = OPERATOR_TO_TENANT.get(operator) + return ACCOUNTS.get(tid) if tid else None + + +def _resolve_from_auth(ctx: ResolveContext) -> str | None: + """Adopter's auth → tenant_id lookup. Returns ``None`` for + unregistered principals (helper rejects every entry with + ``PERMISSION_DENIED``).""" + if ctx.auth_info is None or not ctx.auth_info.principal: + return None + return PRINCIPAL_TO_TENANT.get(ctx.auth_info.principal) + + +def _tenant_to_account(tenant_id: str) -> Account | None: + """Adopter's tenant_id → Account projection. Used for Path-2 + (no-ref) resolution and for ``list``.""" + return ACCOUNTS.get(tenant_id) + + +def _ctx(principal: str | None) -> ResolveContext: + """Construct a ResolveContext with the given principal. ``None`` + principal models an unauthenticated request — the fail-closed + case (every entry rejected with PERMISSION_DENIED).""" + if principal is None: + return ResolveContext(auth_info=None) + return ResolveContext(auth_info=AuthInfo(kind="api_key", principal=principal)) + + +def _auth(principal: str | None) -> AuthInfo | None: + """Construct an AuthInfo with the given principal — mirrors the + dispatcher's ``accounts.resolve(ref_dict, auth_info=auth_info)`` + call shape (the Protocol takes ``auth_info``, not ``ctx``).""" + if principal is None: + return None + return AuthInfo(kind="api_key", principal=principal) + + +def _ref(operator: str, brand: str = "acme.example") -> dict[str, Any]: + """Build an operator-arm AccountReference as a dict (the wire shape).""" + return {"brand": {"domain": brand}, "operator": operator} + + +def _run(coro: Any) -> Any: + """Run an awaitable in a fresh event loop (pytest-asyncio not + required for the tenant store's narrow sync surface).""" + if asyncio.iscoroutine(coro): + return asyncio.new_event_loop().run_until_complete(coro) + return coro + + +# --------------------------------------------------------------------------- +# resolve +# --------------------------------------------------------------------------- + + +class TestResolve: + def test_protocol_conformance(self) -> None: + """``_TenantStore`` must satisfy the runtime-checkable + :class:`AccountStore` Protocol — the dispatcher relies on + ``isinstance(store, AccountStore)`` and calls + ``accounts.resolve(ref_dict, auth_info=auth_info)`` as a + keyword argument.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + assert isinstance(store, AccountStore) + + def test_resolve_called_with_auth_info_kwarg(self) -> None: + """Mirrors the dispatcher call shape exactly — ``auth_info`` + as a keyword. This is the call that breaks if ``resolve`` + keeps the old ``ctx=`` signature.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(_ref("pinnacle.example"), auth_info=_auth("buyer@pinnacle"))) + assert acc is not None + assert acc.id == "acc_pinnacle" + + def test_same_tenant_ref_returns_account(self) -> None: + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(_ref("pinnacle.example"), _auth("buyer@pinnacle"))) + assert acc is not None + assert acc.id == "acc_pinnacle" + + def test_cross_tenant_ref_returns_none(self) -> None: + """Pinnacle credential, Meridian operator on the wire — the + gate hides the existence of the cross-tenant account.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(_ref("meridian.example"), _auth("buyer@pinnacle"))) + assert acc is None + + def test_unknown_ref_returns_none(self) -> None: + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(_ref("unknown.example"), _auth("buyer@pinnacle"))) + assert acc is None + + def test_auth_has_no_tenant_with_ref_returns_none(self) -> None: + """Unregistered principal cannot resolve any ref — the auth + tenant is None, the equality check fails.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(_ref("pinnacle.example"), _auth("not-registered"))) + assert acc is None + + def test_no_ref_path2_returns_auth_tenant_account(self) -> None: + """Path 2: no ref on the wire, derive account from auth.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(None, _auth("buyer@pinnacle"))) + assert acc is not None + assert acc.id == "acc_pinnacle" + + def test_no_ref_unauthenticated_returns_none(self) -> None: + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(None, _auth(None))) + assert acc is None + + def test_resolve_by_ref_raises_returns_none(self) -> None: + """Per-request log-and-deny: an exception in adopter + ``resolve_by_ref`` must surface as ``None``, not propagate + out and 500 the calling tool.""" + + def raising_resolve_by_ref( + ref: AccountReference | dict[str, Any], ctx: ResolveContext + ) -> Account | None: + del ref, ctx + raise RuntimeError("simulated DB outage") + + store = create_tenant_store( + resolve_by_ref=raising_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + acc = _run(store.resolve(_ref("pinnacle.example"), _auth("buyer@pinnacle"))) + assert acc is None + + +# --------------------------------------------------------------------------- +# upsert (sync_accounts) — per-entry tenant-isolation gate +# --------------------------------------------------------------------------- + + +class TestUpsert: + @staticmethod + def _build_with_recorder() -> tuple[Any, list[dict[str, Any]]]: + """Construct a store whose upsert_row records each invocation. + + Asserting on ``writes`` lets tests verify cross-tenant entries + NEVER reach adopter code — the gate filters them upstream. + """ + writes: list[dict[str, Any]] = [] + + def upsert_row(row: dict[str, Any], ctx: ResolveContext) -> SyncAccountsResultRow: + del ctx + writes.append(row) + return SyncAccountsResultRow( + brand=row["brand"], + operator=row["operator"], + action="created", + status="active", + ) + + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + upsert_row=upsert_row, + ) + return store, writes + + def test_in_tenant_entry_passes_through(self) -> None: + store, writes = self._build_with_recorder() + rows = _run(store.upsert([_ref("pinnacle.example")], _ctx("buyer@pinnacle"))) + assert len(writes) == 1 + assert rows[0].action == "created" + assert rows[0].status == "active" + + def test_cross_tenant_entry_rejected_before_adopter_code(self) -> None: + store, writes = self._build_with_recorder() + rows = _run(store.upsert([_ref("pinnacle.example")], _ctx("buyer@meridian"))) + assert writes == [], "upsert_row MUST NOT run for cross-tenant entries" + assert rows[0].action == "failed" + assert rows[0].status == "rejected" + assert rows[0].errors is not None + assert rows[0].errors[0]["code"] == "PERMISSION_DENIED" + + def test_unknown_ref_rejected_with_account_not_found(self) -> None: + """ACCOUNT_NOT_FOUND is the right code for "ref points + nowhere" — distinct from PERMISSION_DENIED ("ref valid but + you're not authorized").""" + store, writes = self._build_with_recorder() + rows = _run(store.upsert([_ref("unknown.example")], _ctx("buyer@pinnacle"))) + assert writes == [] + assert rows[0].errors is not None + assert rows[0].errors[0]["code"] == "ACCOUNT_NOT_FOUND" + + def test_fail_closed_no_auth_rejects_every_entry(self) -> None: + """Unregistered principal — every entry fails with + PERMISSION_DENIED regardless of operator.""" + store, writes = self._build_with_recorder() + rows = _run( + store.upsert( + [_ref("pinnacle.example"), _ref("meridian.example")], + _ctx("not-registered"), + ) + ) + assert writes == [] + assert len(rows) == 2 + for row in rows: + assert row.errors is not None + assert row.errors[0]["code"] == "PERMISSION_DENIED" + + def test_fail_closed_unauthenticated_rejects_every_entry(self) -> None: + """ctx.auth_info is None — same fail-closed behavior.""" + store, writes = self._build_with_recorder() + rows = _run(store.upsert([_ref("pinnacle.example")], _ctx(None))) + assert writes == [] + assert rows[0].errors is not None + assert rows[0].errors[0]["code"] == "PERMISSION_DENIED" + + def test_mixed_batch_partitions_correctly(self) -> None: + """Pass / cross-tenant / unknown — three distinct outcomes, + only the in-tenant entry reaches adopter code.""" + store, writes = self._build_with_recorder() + rows = _run( + store.upsert( + [ + _ref("pinnacle.example", "a.example"), # pass + _ref("meridian.example", "b.example"), # cross-tenant + _ref("unknown.example", "c.example"), # unknown + ], + _ctx("buyer@pinnacle"), + ) + ) + assert len(writes) == 1, "only the in-tenant entry should reach upsert_row" + assert rows[0].action == "created" + assert rows[1].errors is not None + assert rows[1].errors[0]["code"] == "PERMISSION_DENIED" + assert rows[2].errors is not None + assert rows[2].errors[0]["code"] == "ACCOUNT_NOT_FOUND" + + def test_resolve_by_ref_raises_isolates_to_single_entry(self) -> None: + """One bad row must not poison the batch. When + ``resolve_by_ref`` raises for one entry, that entry surfaces + as PERMISSION_DENIED while sibling entries pass through.""" + writes: list[dict[str, Any]] = [] + + def upsert_row(row: dict[str, Any], ctx: ResolveContext) -> SyncAccountsResultRow: + del ctx + writes.append(row) + return SyncAccountsResultRow( + brand=row["brand"], + operator=row["operator"], + action="created", + status="active", + ) + + def flaky_resolve_by_ref( + ref: AccountReference | dict[str, Any], ctx: ResolveContext + ) -> Account | None: + operator = ( + ref.get("operator") if isinstance(ref, dict) else getattr(ref, "operator", None) + ) + if operator == "boom.example": + raise RuntimeError("simulated adopter failure") + return _resolve_by_ref(ref, ctx) + + store = create_tenant_store( + resolve_by_ref=flaky_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + upsert_row=upsert_row, + ) + rows = _run( + store.upsert( + [ + _ref("pinnacle.example"), + _ref("boom.example"), + _ref("pinnacle.example", "other.example"), + ], + _ctx("buyer@pinnacle"), + ) + ) + assert len(rows) == 3 + assert rows[0].action == "created" + assert rows[1].action == "failed" + assert rows[1].errors is not None + assert rows[1].errors[0]["code"] == "PERMISSION_DENIED" + assert rows[2].action == "created" + # Two passing entries reached upsert_row; the raising one was + # filtered upstream — adopter exception detail did not leak. + assert len(writes) == 2 + + def test_tenant_id_raises_isolates_to_single_entry(self) -> None: + """``tenant_id(account)`` raising for one entry must not abort + the batch — same per-entry isolation as ``resolve_by_ref``.""" + writes: list[dict[str, Any]] = [] + + def upsert_row(row: dict[str, Any], ctx: ResolveContext) -> SyncAccountsResultRow: + del ctx + writes.append(row) + return SyncAccountsResultRow( + brand=row["brand"], + operator=row["operator"], + action="created", + status="active", + ) + + def flaky_tenant_id(account: Account) -> str: + if account.id == "acc_meridian": + raise RuntimeError("simulated tenant lookup failure") + return _account_tenant_id(account) + + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=flaky_tenant_id, + tenant_to_account=_tenant_to_account, + upsert_row=upsert_row, + ) + rows = _run( + store.upsert( + [_ref("pinnacle.example"), _ref("meridian.example")], + _ctx("buyer@pinnacle"), + ) + ) + assert len(rows) == 2 + # In-tenant entry passes (and reaches adopter code). + assert rows[0].action == "created" + # Cross-tenant ref where tenant_id raised — surfaces as + # PERMISSION_DENIED (the raise is treated as "we cannot + # confirm the entry's tenant" → fail-closed deny). + assert rows[1].errors is not None + assert rows[1].errors[0]["code"] == "PERMISSION_DENIED" + assert len(writes) == 1 + + def test_no_upsert_row_hook_is_noop(self) -> None: + """Without an adopter upsert_row, authorized rows still + receive a wire-shaped success result; the helper provides a + sensible default rather than 501-ing the whole batch.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + rows = _run(store.upsert([_ref("pinnacle.example")], _ctx("buyer@pinnacle"))) + # With no adopter hook, authorized rows pass with action='unchanged' + # (no-op acknowledgment). Cross-tenant still rejects. + assert rows[0].action == "unchanged" + assert rows[0].errors is None + + +# --------------------------------------------------------------------------- +# list +# --------------------------------------------------------------------------- + + +class TestList: + def test_returns_only_same_tenant_account(self) -> None: + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + accounts = _run(store.list(ctx=_ctx("buyer@pinnacle"))) + assert len(accounts) == 1 + assert accounts[0].id == "acc_pinnacle" + + def test_unregistered_principal_returns_empty(self) -> None: + """Fail-closed but quiet: empty list, not raise.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + accounts = _run(store.list(ctx=_ctx("not-registered"))) + assert accounts == [] + + def test_unauthenticated_returns_empty(self) -> None: + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + accounts = _run(store.list(ctx=_ctx(None))) + assert accounts == [] + + def test_tenant_to_account_raises_returns_empty(self) -> None: + """``list`` MUST NOT raise on a per-spec valid request — an + adopter ``tenant_to_account`` exception must surface as ``[]``, + not propagate. Same outcome as auth-None (fail-closed quiet).""" + + def raising_tenant_to_account(tenant_id: str) -> Account | None: + del tenant_id + raise RuntimeError("simulated DB outage") + + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=raising_tenant_to_account, + ) + accounts = _run(store.list(ctx=_ctx("buyer@pinnacle"))) + assert accounts == [] + + +# --------------------------------------------------------------------------- +# sync_governance — same per-entry gate as upsert +# --------------------------------------------------------------------------- + + +class TestSyncGovernance: + @staticmethod + def _build_with_recorder() -> tuple[Any, list[SyncGovernanceEntry]]: + writes: list[SyncGovernanceEntry] = [] + + def sync_governance_row( + entry: SyncGovernanceEntry, ctx: ResolveContext + ) -> SyncGovernanceResultRow: + del ctx + writes.append(entry) + return SyncGovernanceResultRow( + account=entry.account, + status="synced", + governance_agents=[{"url": a["url"]} for a in entry.governance_agents], + ) + + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + sync_governance_row=sync_governance_row, + ) + return store, writes + + def test_in_tenant_entry_passes_through(self) -> None: + store, writes = self._build_with_recorder() + entries = [ + SyncGovernanceEntry( + account=_ref("pinnacle.example"), + governance_agents=[{"url": "https://gov.example"}], + ) + ] + rows = _run(store.sync_governance(entries, _ctx("buyer@pinnacle"))) + assert len(writes) == 1 + assert rows[0].status == "synced" + + def test_cross_tenant_entry_rejected(self) -> None: + store, writes = self._build_with_recorder() + entries = [ + SyncGovernanceEntry( + account=_ref("pinnacle.example"), + governance_agents=[], + ) + ] + rows = _run(store.sync_governance(entries, _ctx("buyer@meridian"))) + assert writes == [] + assert rows[0].status == "failed" + assert rows[0].errors is not None + assert rows[0].errors[0]["code"] == "PERMISSION_DENIED" + + def test_fail_closed_no_auth_rejects_every_entry(self) -> None: + store, writes = self._build_with_recorder() + entries = [ + SyncGovernanceEntry( + account=_ref("pinnacle.example"), + governance_agents=[], + ), + SyncGovernanceEntry( + account=_ref("meridian.example"), + governance_agents=[], + ), + ] + rows = _run(store.sync_governance(entries, _ctx("not-registered"))) + assert writes == [] + assert len(rows) == 2 + for row in rows: + assert row.status == "failed" + assert row.errors is not None + assert row.errors[0]["code"] == "PERMISSION_DENIED" + + +# --------------------------------------------------------------------------- +# Immutability — adopters cannot monkey-patch the gate methods +# --------------------------------------------------------------------------- + + +class TestImmutability: + def test_cannot_reassign_upsert(self) -> None: + """The Python equivalent of JS's ``Object.defineProperty(... + writable: false)``: gate methods live on the class. Instance + attribute assignment fails because the class uses ``__slots__`` + — no per-instance ``__dict__``.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + with pytest.raises(AttributeError): + store.upsert = lambda *_a, **_kw: [] # type: ignore[method-assign] + + def test_cannot_reassign_sync_governance(self) -> None: + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + with pytest.raises(AttributeError): + store.sync_governance = lambda *_a, **_kw: [] # type: ignore[method-assign] + + def test_cannot_reassign_list(self) -> None: + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + with pytest.raises(AttributeError): + store.list = lambda *_a, **_kw: [] # type: ignore[method-assign] + + def test_cannot_reassign_resolve(self) -> None: + """Even ``resolve`` is locked — its tenant gate runs on + Path-1 too, and a swapped resolve would bypass it.""" + store = create_tenant_store( + resolve_by_ref=_resolve_by_ref, + resolve_from_auth=_resolve_from_auth, + tenant_id=_account_tenant_id, + tenant_to_account=_tenant_to_account, + ) + with pytest.raises(AttributeError): + store.resolve = lambda *_a, **_kw: None # type: ignore[method-assign]