Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions src/adcp/decisioning/platform_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,4 +1034,161 @@ def _select_proposal_method(
return "refine_products"


class _RegistryPlatformAdapter(DecisioningPlatform):
"""DecisioningPlatform adapter backed by a :class:`TenantRegistry`.

Returned by :meth:`TenantRegistry.as_platform`. Not part of the
public API — the return type annotation on that method is the
:class:`DecisioningPlatform` base class.

Per-request dispatch:

1. Extract ``ctx.tenant_id`` (set by the transport layer from the
``Host`` header or URL path in multi-tenant deployments).
2. Await :meth:`~TenantRegistry.resolve_by_id` — triggers lazy
platform construction on first hit for
:meth:`~TenantRegistry.register_lazy` tenants.
3. Gate on health: raise ``SERVICE_UNAVAILABLE`` for states not
in ``serve_states`` (default: ``pending`` and ``disabled``
are blocked; ``healthy`` and ``unverified`` serve).
4. Forward the method call to the resolved platform, honouring
async/sync detection the same way as :class:`PlatformRouter`.
"""

def __init__(
self,
*,
registry: Any, # TenantRegistry at runtime; Any avoids circular import
accounts: Any,
capabilities: DecisioningCapabilities,
serve_states: frozenset[Any],
) -> None:
self.accounts = accounts
self.capabilities = capabilities
self._registry = registry
self._serve_states = frozenset(serve_states)

for method_name in sorted(_all_specialism_methods()):
if method_name in _ACCOUNT_STORE_METHODS:
continue
if method_name == "get_products":
continue
self.__dict__[method_name] = self._make_delegate(method_name)

async def _resolve_tenant_platform(
self, ctx: RequestContext[Any], method_name: str
) -> DecisioningPlatform:
"""Resolve tenant, gate on health, return the child platform."""
tenant_id = ctx.tenant_id
if not tenant_id:
raise AdcpError(
"ACCOUNT_NOT_FOUND",
message=(
"RegistryPlatformAdapter: ctx.tenant_id is unset. The transport "
"layer must populate tenant_id (typically from the Host header) "
"before dispatch. Check your SubdomainTenantMiddleware or "
"context_factory wiring."
),
recovery="terminal",
field="ctx.tenant_id",
)
resolution = await self._registry.resolve_by_id(tenant_id)
# Intentional topology hiding: unregistered tenants produce
# SERVICE_UNAVAILABLE rather than ACCOUNT_NOT_FOUND, matching the
# behaviour when a tenant is known but health-gated. From the buyer's
# perspective, "doesn't exist" and "temporarily unavailable" are
# indistinguishable — this avoids leaking which tenant_ids are
# enrolled with this seller. (Contrast: PlatformRouter raises
# ACCOUNT_NOT_FOUND for unknown tenants because its platform list is
# a closed static set, not a dynamic registry.)
if resolution is None or resolution.health not in self._serve_states:
health = resolution.health if resolution is not None else None
raise AdcpError(
"SERVICE_UNAVAILABLE",
message=(
f"RegistryPlatformAdapter: tenant {tenant_id!r} is not available "
f"(health={health!r}). Allowed states: {sorted(self._serve_states)}."
),
recovery="transient",
details={"tenant_id": tenant_id, "health": health},
)
# Guard catches tenants whose resolved platform doesn't implement the
# requested specialism method (e.g. a tenant registered as a
# SignalsPlatform-only seller being asked for get_products). It fires
# for any method_name dispatched via _make_delegate, not just
# get_products.
method = getattr(resolution.platform, method_name, None)
if method is None or not callable(method):
raise AdcpError(
"UNSUPPORTED_FEATURE",
message=(
f"RegistryPlatformAdapter: tenant {tenant_id!r}'s platform "
f"({type(resolution.platform).__name__!r}) does not implement "
f"{method_name!r}."
),
recovery="terminal",
)
return resolution.platform # type: ignore[no-any-return]

def _make_delegate(self, method_name: str) -> Any:
adapter = self

async def _delegate(*args: Any, **kwargs: Any) -> Any:
ctx = _resolve_ctx_from_args(args, kwargs)
platform = await adapter._resolve_tenant_platform(ctx, method_name)
method = getattr(platform, method_name)
if inspect.iscoroutinefunction(method):
return await method(*args, **kwargs)
return await asyncio.to_thread(method, *args, **kwargs)

_delegate.__name__ = method_name
_delegate.__qualname__ = f"_RegistryPlatformAdapter.{method_name}"
return _delegate

async def refine_get_products(self, *args: Any, **kwargs: Any) -> Any:
"""Refine entry point — delegates to get_products.

Mirrors :meth:`PlatformRouter.refine_get_products` so the
handler's refine dispatch path works correctly.
"""
return await self.get_products(*args, **kwargs)

async def get_products(self, *args: Any, **kwargs: Any) -> Any:
"""Per-tenant get_products dispatch.

Explicit override (not synthesized) to mirror PlatformRouter's
pattern — keeps the refine_get_products / get_products chain
intact without proposal_manager support (which the registry
adapter doesn't need).
"""
ctx = _resolve_ctx_from_args(args, kwargs)
platform = await self._resolve_tenant_platform(ctx, "get_products")
method = getattr(platform, "get_products")
if inspect.iscoroutinefunction(method):
return await method(*args, **kwargs)
return await asyncio.to_thread(method, *args, **kwargs)


def _make_registry_platform_adapter(
registry: Any,
*,
accounts: Any,
capabilities: DecisioningCapabilities,
serve_states: frozenset[Any],
) -> DecisioningPlatform:
"""Factory for :meth:`TenantRegistry.as_platform`.

Accepts the registry as ``Any`` to avoid a circular import at module
load time — :mod:`adcp.server.tenant_registry` calls this function
lazily from inside :meth:`TenantRegistry.as_platform`, not at import
time, so no circular dependency arises.
"""
return _RegistryPlatformAdapter(
registry=registry,
accounts=accounts,
capabilities=capabilities,
serve_states=serve_states,
)


__all__ = ["LazyPlatformRouter", "PlatformRouter"]
Loading
Loading