diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ecc620..0173b81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Capability namespaces and hierarchical discovery in `CapabilityRegistry`: + dot-notation `capability_id`s now expose `list_namespaces()` / + `list_namespace(prefix)` operations; `register_namespace(prefix, loader=...)` + enables deferred registration for large tool ecosystems (the loader runs + at most once on first access). `search()` gained an `offset` kwarg for + pagination, strips a small stop-word set, and now scores with a + BM25-flavoured ranker that weights `capability_id`/`tags` matches above + `description`. Flat (un-namespaced) capability IDs continue to work + unchanged. (#45) +- Capability marketplace, part 1 — manifest format & local registry: new + `CapabilityDescriptor` and `CapabilityManifest` dataclasses (both + JSON-round-trippable via `to_dict`/`from_dict`), new + `agent_kernel.federation` module with `build_manifest()`, + `import_manifest()`, and `merge_sensitivity()`, and new `Kernel.advertise()` + / `Kernel.import_remote()` methods. `Kernel` gained a `kernel_id` + argument used as the manifest publisher identity. Three trust policies + are honoured at import time (`most_restrictive` (default), `local_only`, + `remote_deferred`); imported capabilities are routed through a + caller-supplied driver and flow through the full local policy → token → + firewall pipeline. HMAC tokens remain kernel-scoped — a token issued by + one kernel cannot be verified by another with a different secret. New + errors `NamespaceNotFound`, `FederationError`, `ManifestError`, + `TrustPolicyError`. (#52) +- New docs: [`docs/federation.md`](docs/federation.md) for the marketplace + protocol and a namespace section in + [`docs/capabilities.md`](docs/capabilities.md). 
+ ## [0.7.0] - 2026-05-20 ### Added diff --git a/docs/capabilities.md b/docs/capabilities.md index db448db..0c85d54 100644 --- a/docs/capabilities.md +++ b/docs/capabilities.md @@ -3,9 +3,60 @@ ## Naming conventions - Use `domain.verb_noun` format: `billing.list_invoices`, `users.get_profile`. +- Prefer fully namespaced IDs (`billing.invoices.list`) over flat ones: + the registry infers namespaces from the dot-separated segments, and + large ecosystems benefit from being able to list and search per namespace. - Be specific: prefer `billing.cancel_invoice` over `billing.update`. - Avoid generic names like `billing.execute` or `api.call`. +## Namespaces and discovery + +`CapabilityRegistry` recognises dot-notation namespaces automatically. No +extra registration step is required: `register(Capability(capability_id= +"billing.invoices.list", ...))` is enough to populate the `billing` and +`billing.invoices` namespaces. + +```python +registry.list_namespaces() +# ['billing', 'crm'] + +registry.list_namespace("billing") +# [Capability('billing.invoices.list'), Capability('billing.payments.refund'), …] +``` + +For large tool ecosystems where eagerly registering hundreds of +capabilities is wasteful, declare a deferred loader. 
The loader runs at +most once, the first time the namespace is searched, listed, or any +capability under it is fetched via `get()`: + +```python +def load_billing() -> list[Capability]: + return [ + Capability(capability_id="billing.invoices.list", …), + Capability(capability_id="billing.invoices.create", …), + Capability(capability_id="billing.payments.refund", …), + ] + +registry.register_namespace( + "billing", + description="Billing and invoicing tools", + loader=load_billing, +) +``` + +Search ranks matches with a BM25-flavoured scorer that weights +`capability_id` and `tags` higher than `description`, strips a small +stop-word set (`a`, `the`, `please`, …), and offers `offset` for +pagination: + +```python +results = registry.search("list invoices", max_results=10, offset=0) +``` + +Search is deterministic — equal-scoring capabilities are returned in +`capability_id` order — and trips any deferred namespace loader whose +prefix shares a token with the query. + ## Granularity Each capability should map to a single, auditable action with clear side-effects. diff --git a/docs/federation.md b/docs/federation.md new file mode 100644 index 0000000..c50edc4 --- /dev/null +++ b/docs/federation.md @@ -0,0 +1,153 @@ +# Capability Federation — Marketplace Part 1 + +> Issue [#52](https://github.com/dgenio/agent-kernel/issues/52) (manifest +> format & local registry) is implemented here. Discovery over a network +> (issue [#51](https://github.com/dgenio/agent-kernel/issues/51)) is **not** +> part of this milestone — `agent-kernel` does not fetch manifests over +> HTTP or sign them on your behalf yet. Bring your own transport for now. + +## What this gives you + +A single kernel can: + +1. **Advertise** its capabilities as a JSON-serialisable + [`CapabilityManifest`](../src/agent_kernel/models.py). +2. 
**Import** another kernel's manifest, registering each capability locally + and routing invocations through a caller-supplied driver + (typically [`HTTPDriver`](integrations.md) or + [`MCPDriver`](integrations.md)). + +Every imported invocation still runs through the *local* policy → token → +firewall pipeline. The remote endpoint is never trusted to authorise on the +importing kernel's behalf. This keeps weaver-spec invariants intact for +imported capabilities: + +| Invariant | How it's enforced for imports | +|-----------|------------------------------| +| **I-01** — Firewall on every result | The local `Firewall` runs on the driver's `RawResult` exactly as for native capabilities. | +| **I-02** — Authorize + audit each call | The local `PolicyEngine` evaluates every request; the local `TraceStore` records every action. | +| **I-06** — Tokens bind principal + capability + constraints | Tokens are signed with the importing kernel's HMAC secret. A token issued by Kernel A cannot be verified by Kernel B, because their secrets differ. 
| + +## Publishing a manifest + +```python +from agent_kernel import ( + Capability, CapabilityRegistry, HMACTokenProvider, Kernel, + SafetyClass, SensitivityTag, +) + +registry = CapabilityRegistry() +registry.register( + Capability( + capability_id="billing.invoices.list", + name="List Invoices", + description="List recent invoices", + safety_class=SafetyClass.READ, + sensitivity=SensitivityTag.PII, + tags=["billing", "invoices"], + ) +) + +kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret="…"), + kernel_id="agent-b", +) + +manifest = kernel.advertise(endpoint="https://agent-b.example/kernel") +print(manifest.to_dict()) +# { +# "kernel_id": "agent-b", +# "version": "1", +# "endpoint": "https://agent-b.example/kernel", +# "trust_level": "unverified", +# "capabilities": [ +# { +# "capability_id": "billing.invoices.list", +# "name": "List Invoices", +# … +# } +# ] +# } +``` + +The manifest deliberately omits internal driver IDs, operation names, +`parameters_model` Python references, and `tool_hints`. Only the +[`CapabilityDescriptor`](../src/agent_kernel/models.py) projection of each +capability is published. + +## Importing a manifest + +```python +import json + +import httpx +from agent_kernel import ( + CapabilityManifest, CapabilityRegistry, HMACTokenProvider, Kernel, +) +from agent_kernel.drivers.http import HTTPDriver, HTTPEndpoint + +# 1. Fetch the manifest by whatever transport suits you. +raw = httpx.get("https://agent-b.example/kernel/manifest").json() +manifest = CapabilityManifest.from_dict(raw) + +# 2. Build a local driver pointing at the remote endpoint. +remote = HTTPDriver(driver_id="agent-b") +for cap in manifest.capabilities: + remote.register_endpoint( + cap.capability_id, + HTTPEndpoint(url=f"{manifest.endpoint}/invoke/{cap.capability_id}", + method="POST"), + ) + +# 3. Import. `import_remote` registers the driver and adds routes. 
+kernel = Kernel( + registry=CapabilityRegistry(), + token_provider=HMACTokenProvider(secret="local-secret"), + kernel_id="agent-a", +) +kernel.import_remote(manifest, driver=remote, trust_policy="most_restrictive") + +# 4. Use imported capabilities exactly like local ones. +for cap in kernel.list_capabilities(): + print(cap.capability_id, "→", cap.impl.driver_id) +``` + +## Trust policies + +`import_remote(manifest, driver=..., trust_policy=...)` accepts three +values for `trust_policy`: + +| Value | Sensitivity handling | When to use | +|-------|---------------------|-------------| +| `"most_restrictive"` *(default)* | Imported capability keeps the remote `SensitivityTag` verbatim — the local firewall will then redact accordingly. | Crossing trust boundaries — when you can't fully verify the remote's policy. | +| `"local_only"` | Imported capability is registered with `SensitivityTag.NONE`; the importing kernel's policy is the only thing that gates the call. | You own both kernels and have a single canonical policy. | +| `"remote_deferred"` | Same sensitivity handling as `most_restrictive` today. Reserved for part 2, when the importing kernel will be able to defer to a remote policy decision before applying its own. | Delegation patterns where the remote owns the authoritative policy. | + +`merge_sensitivity(local, remote)` is exported for callers that maintain +their own capability records and want the canonical strictest-wins union. + +## What is *not* covered yet + +- **No network transport.** `agent-kernel` does not fetch, sign, or + authenticate manifests over HTTP — bring your own transport. Part 2 + (issue #51) adds an opt-in manifest endpoint and a discovery protocol. +- **No remote policy delegation.** `"remote_deferred"` currently behaves + identically to `"most_restrictive"`. The full "remote policy decides + first" semantics need part 2. +- **No automatic re-import.** Manifests are imported once. 
If the publisher + adds capabilities, the importer must re-fetch and re-import. +- **No identity verification.** `trust_level` is a publisher-declared hint; + it does not authenticate the publisher. Signature verification arrives + with part 2. + +## Reference + +- Models: [`CapabilityDescriptor`](../src/agent_kernel/models.py), + [`CapabilityManifest`](../src/agent_kernel/models.py). +- Functions: [`build_manifest`](../src/agent_kernel/federation.py), + [`import_manifest`](../src/agent_kernel/federation.py), + [`merge_sensitivity`](../src/agent_kernel/federation.py). +- Kernel methods: `Kernel.advertise()`, `Kernel.import_remote()`, + `Kernel.kernel_id`. +- Errors: `FederationError`, `ManifestError`, `TrustPolicyError`. diff --git a/src/agent_kernel/__init__.py b/src/agent_kernel/__init__.py index e7449b5..2d5a24f 100644 --- a/src/agent_kernel/__init__.py +++ b/src/agent_kernel/__init__.py @@ -31,6 +31,11 @@ from agent_kernel import OpenAIMiddleware, AnthropicMiddleware +Federation (capability marketplace):: + + from agent_kernel import CapabilityManifest, CapabilityDescriptor + from agent_kernel import build_manifest, import_manifest, TrustPolicy + Errors:: from agent_kernel import ( @@ -39,6 +44,7 @@ PolicyDenied, PolicyConfigError, DriverError, FirewallError, BudgetExhausted, BudgetConfigError, CapabilityNotFound, HandleNotFound, HandleExpired, + NamespaceNotFound, FederationError, ManifestError, TrustPolicyError, ) """ @@ -56,15 +62,26 @@ CapabilityAlreadyRegistered, CapabilityNotFound, DriverError, + FederationError, FirewallError, HandleExpired, HandleNotFound, + ManifestError, + NamespaceNotFound, PolicyConfigError, PolicyDenied, TokenExpired, TokenInvalid, TokenRevoked, TokenScopeError, + TrustPolicyError, +) +from .federation import ( + MANIFEST_VERSION, + TrustPolicy, + build_manifest, + import_manifest, + merge_sensitivity, ) from .firewall.budget_manager import BudgetManager from .firewall.budgets import Budgets @@ -75,7 +92,9 @@ from .models import ( 
ActionTrace, Capability, + CapabilityDescriptor, CapabilityGrant, + CapabilityManifest, CapabilityRequest, DenialExplanation, DryRunResult, @@ -83,6 +102,7 @@ Frame, Handle, ImplementationRef, + NamespaceMetadata, PolicyDecision, PolicyDecisionTrace, PolicyTraceStep, @@ -92,6 +112,7 @@ ResponseMode, RoutePlan, ToolHints, + TrustLevel, ) from .policy import DefaultPolicyEngine, ExplainingPolicyEngine, PolicyEngine from .policy_dsl import DeclarativePolicyEngine, PolicyMatch, PolicyRule @@ -112,7 +133,9 @@ "CapabilityRegistry", # models "Capability", + "CapabilityDescriptor", "CapabilityGrant", + "CapabilityManifest", "CapabilityRequest", "CapabilityToken", "DenialExplanation", @@ -121,6 +144,7 @@ "Frame", "Handle", "ImplementationRef", + "NamespaceMetadata", "PolicyDecision", "PolicyDecisionTrace", "PolicyTraceStep", @@ -131,6 +155,7 @@ "RoutePlan", "ActionTrace", "ToolHints", + "TrustLevel", # enums "SafetyClass", "SensitivityTag", @@ -142,15 +167,25 @@ "CapabilityAlreadyRegistered", "CapabilityNotFound", "DriverError", + "FederationError", "FirewallError", "HandleExpired", "HandleNotFound", + "ManifestError", + "NamespaceNotFound", "PolicyConfigError", "PolicyDenied", "TokenExpired", "TokenInvalid", "TokenRevoked", "TokenScopeError", + "TrustPolicyError", + # federation + "MANIFEST_VERSION", + "TrustPolicy", + "build_manifest", + "import_manifest", + "merge_sensitivity", # policy "AllowReason", "DefaultPolicyEngine", diff --git a/src/agent_kernel/errors.py b/src/agent_kernel/errors.py index 7944f7b..6918d51 100644 --- a/src/agent_kernel/errors.py +++ b/src/agent_kernel/errors.py @@ -126,3 +126,33 @@ class HandleNotFound(AgentKernelError): class HandleExpired(AgentKernelError): """Raised when a handle's TTL has elapsed.""" + + +# ── Registry / namespace errors ─────────────────────────────────────────────── + + +class NamespaceNotFound(AgentKernelError): + """Raised when a namespace prefix is not known to the registry.""" + + +# ── Federation errors 
───────────────────────────────────────────────────────── + + +class FederationError(AgentKernelError): + """Base class for federation / capability marketplace failures.""" + + +class TrustPolicyError(FederationError): + """Raised when a federation request violates the configured trust policy. + + Examples include an unknown trust policy name, a remote manifest from an + untrusted endpoint, or a token that originated outside the importing + kernel's HMAC scope. + """ + + +class ManifestError(FederationError): + """Raised when a :class:`~agent_kernel.models.CapabilityManifest` cannot be + serialised, parsed, or imported (e.g. missing fields, invalid version, + duplicate capability IDs). + """ diff --git a/src/agent_kernel/federation.py b/src/agent_kernel/federation.py new file mode 100644 index 0000000..08f327c --- /dev/null +++ b/src/agent_kernel/federation.py @@ -0,0 +1,244 @@ +"""Capability marketplace — manifest format and local-registry federation. + +This module implements *part 1* of the capability marketplace protocol +(issue #52): one kernel can advertise its capabilities as a +:class:`~agent_kernel.models.CapabilityManifest`, and a second kernel can +import that manifest to extend its own registry. Remote invocation is then +performed by routing imported capabilities to a caller-supplied +:class:`~agent_kernel.drivers.base.Driver` (typically an +:class:`~agent_kernel.drivers.http.HTTPDriver` or +:class:`~agent_kernel.drivers.mcp.MCPDriver`) — every imported call still +flows through the *local* policy → token → firewall pipeline, satisfying +weaver-spec I-01 / I-02 / I-06. + +Discovery (part 2 of the marketplace, issue #51) is out of scope here — this +module is purely local. Manifests are constructed by ``Kernel.advertise()`` +and consumed by ``Kernel.import_remote()``; the importing side is free to +fetch them by any transport. 
+""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +from .enums import SensitivityTag +from .errors import ManifestError, TrustPolicyError +from .models import ( + Capability, + CapabilityDescriptor, + CapabilityManifest, + ImplementationRef, +) + +if TYPE_CHECKING: + from .registry import CapabilityRegistry + +MANIFEST_VERSION = "1" +"""Schema version published by :func:`build_manifest`.""" + +TrustPolicy = Literal["most_restrictive", "local_only", "remote_deferred"] +"""How an importing kernel weighs descriptor metadata against its own policy. + +- ``"most_restrictive"`` (default): the descriptor's sensitivity tag is + honoured as a floor — even if the importing kernel's policy would treat + the capability as ``NONE``, the imported capability keeps the remote tag. + Required by use cases that span trust boundaries. +- ``"local_only"``: the importing kernel ignores the descriptor's + sensitivity tag and registers the imported capability with + :attr:`~agent_kernel.enums.SensitivityTag.NONE`. Use when the importer + owns both kernels and has a single canonical policy. +- ``"remote_deferred"``: the descriptor's sensitivity tag is preserved + verbatim and treated as the remote policy's input; the importing kernel + layers its own policy on top. +""" + +_VALID_TRUST_POLICIES: frozenset[str] = frozenset( + {"most_restrictive", "local_only", "remote_deferred"} +) + +# Ordering used by ``most_restrictive`` when picking between two sensitivity +# tags. The strictest tag wins. 
+_SENSITIVITY_RANK: dict[SensitivityTag, int] = { + SensitivityTag.NONE: 0, + SensitivityTag.PII: 2, + SensitivityTag.PCI: 3, + SensitivityTag.SECRETS: 4, +} + + +def _stricter(a: SensitivityTag, b: SensitivityTag) -> SensitivityTag: + """Return the stricter of two sensitivity tags (higher rank wins).""" + if _SENSITIVITY_RANK.get(b, 0) > _SENSITIVITY_RANK.get(a, 0): + return b + return a + + +def build_manifest( + *, + kernel_id: str, + registry: CapabilityRegistry, + endpoint: str, + trust_level: Literal["verified", "unverified"] = "unverified", +) -> CapabilityManifest: + """Build a public-facing :class:`CapabilityManifest` for *registry*. + + Internal implementation details (``ImplementationRef``, ``parameters_model`` + Python references, ``tool_hints``) are stripped — only fields safe to share + over the wire are emitted. + + Args: + kernel_id: Stable identifier of the advertising kernel. + registry: The :class:`CapabilityRegistry` whose contents to publish. + endpoint: Transport endpoint at which the advertising kernel can be + reached. Format is transport-specific (e.g. + ``"https://agent-a.example/kernel"``). + trust_level: Publisher-declared trust hint. The importing kernel still + applies its configured trust policy regardless. + + Returns: + A :class:`CapabilityManifest` ready to be serialised with + :meth:`CapabilityManifest.to_dict`. 
+ """ + descriptors = [_descriptor_for(cap) for cap in registry.list_all()] + return CapabilityManifest( + kernel_id=kernel_id, + version=MANIFEST_VERSION, + capabilities=descriptors, + endpoint=endpoint, + trust_level=trust_level, + ) + + +def _descriptor_for(cap: Capability) -> CapabilityDescriptor: + """Project a :class:`Capability` onto its safe-to-share descriptor view.""" + return CapabilityDescriptor( + capability_id=cap.capability_id, + name=cap.name, + description=cap.description, + safety_class=cap.safety_class, + sensitivity=cap.sensitivity, + tags=list(cap.tags), + parameters_schema=cap.parameters_schema, + ) + + +def import_manifest( + *, + manifest: CapabilityManifest, + registry: CapabilityRegistry, + driver_id: str, + trust_policy: TrustPolicy = "most_restrictive", +) -> list[Capability]: + """Register a remote manifest's capabilities into *registry*. + + Each descriptor becomes a regular :class:`Capability` whose + :class:`ImplementationRef` points at the caller-supplied *driver_id*. + The importing kernel must register a matching driver with + :meth:`~agent_kernel.Kernel.register_driver`. Invocations on the + resulting capability flow through the full local pipeline — the remote + endpoint is never trusted to perform policy, token verification, or + firewall transformation on behalf of the importer. + + Args: + manifest: The remote :class:`CapabilityManifest` to import. + registry: The local :class:`CapabilityRegistry` to extend. + driver_id: The local driver ID that will execute imported capabilities. + The caller is responsible for registering a driver with that ID + (typically an :class:`~agent_kernel.drivers.http.HTTPDriver` or + :class:`~agent_kernel.drivers.mcp.MCPDriver` configured for + ``manifest.endpoint``). + trust_policy: How the importer weighs the manifest's sensitivity + metadata. See :data:`TrustPolicy`. + + Returns: + The list of imported :class:`Capability` objects, in manifest order. 
+ + Raises: + TrustPolicyError: If *trust_policy* is not one of the documented values. + ManifestError: If the manifest version is unsupported or its endpoint + is empty. Duplicate IDs raise ``CapabilityAlreadyRegistered``. + """ + if trust_policy not in _VALID_TRUST_POLICIES: + raise TrustPolicyError( + f"Unknown trust_policy '{trust_policy}'. " + f"Expected one of: {sorted(_VALID_TRUST_POLICIES)}." + ) + if manifest.version != MANIFEST_VERSION: + raise ManifestError( + f"Manifest version '{manifest.version}' is not supported by this " + f"kernel (expected '{MANIFEST_VERSION}'). Upgrade agent-kernel or " + "re-publish the manifest with the supported version." + ) + if not manifest.endpoint: + raise ManifestError( + f"Manifest from kernel '{manifest.kernel_id}' has no endpoint. " + "Endpoints are required so the local kernel can route imported " + "capabilities to a driver." + ) + + imported: list[Capability] = [] + for descriptor in manifest.capabilities: + cap = _capability_for_import( + descriptor=descriptor, + driver_id=driver_id, + trust_policy=trust_policy, + ) + registry.register(cap) + imported.append(cap) + return imported + + +def _capability_for_import( + *, + descriptor: CapabilityDescriptor, + driver_id: str, + trust_policy: TrustPolicy, +) -> Capability: + """Materialise a local :class:`Capability` from a remote descriptor.""" + sensitivity = _resolve_sensitivity(descriptor.sensitivity, trust_policy) + # The descriptor never exposes a driver-side operation name; we mirror + # the convention used everywhere else in the kernel: drivers resolve + # ``args.get("operation", capability_id)``. Imported capabilities therefore + # default operation to the capability_id. 
+ impl = ImplementationRef(driver_id=driver_id, operation=descriptor.capability_id) + return Capability( + capability_id=descriptor.capability_id, + name=descriptor.name, + description=descriptor.description, + safety_class=descriptor.safety_class, + sensitivity=sensitivity, + tags=list(descriptor.tags), + impl=impl, + parameters_schema=descriptor.parameters_schema, + ) + + +def _resolve_sensitivity(remote: SensitivityTag, trust_policy: TrustPolicy) -> SensitivityTag: + """Apply *trust_policy* to a remote sensitivity tag. + + ``most_restrictive`` and ``remote_deferred`` both keep the remote tag — + they differ only in which side's *policy engine* is consulted at + invocation time, which is part 2 of the marketplace work. ``local_only`` + strips the remote sensitivity entirely. + """ + if trust_policy == "local_only": + return SensitivityTag.NONE + return remote + + +def merge_sensitivity(local: SensitivityTag, remote: SensitivityTag) -> SensitivityTag: + """Return the stricter of *local* and *remote* sensitivity tags. + + Exposed for callers that maintain their own capability records outside + the registry and want the canonical ``most_restrictive`` union rule. 
+ """ + return _stricter(local, remote) + + +__all__ = [ + "MANIFEST_VERSION", + "TrustPolicy", + "build_manifest", + "import_manifest", + "merge_sensitivity", +] diff --git a/src/agent_kernel/kernel.py b/src/agent_kernel/kernel.py index d8cc3bf..bb1680a 100644 --- a/src/agent_kernel/kernel.py +++ b/src/agent_kernel/kernel.py @@ -10,6 +10,7 @@ from .drivers.base import Driver, ExecutionContext from .enums import SafetyClass from .errors import AgentKernelError, DriverError +from .federation import TrustPolicy, build_manifest, import_manifest from .firewall.budget_manager import BudgetManager from .firewall.transform import Firewall from .handles import HandleStore @@ -17,6 +18,7 @@ ActionTrace, Capability, CapabilityGrant, + CapabilityManifest, CapabilityRequest, DenialExplanation, DryRunResult, @@ -83,6 +85,7 @@ def __init__( handle_store: HandleStore | None = None, trace_store: TraceStore | None = None, budget_manager: BudgetManager | None = None, + kernel_id: str = "agent-kernel", ) -> None: self._registry = registry self._policy: PolicyEngine = policy or DefaultPolicyEngine() @@ -93,6 +96,17 @@ def __init__( self._trace_store = trace_store or TraceStore() self._budget_manager = budget_manager self._drivers: dict[str, Driver] = {} + self._kernel_id = kernel_id + + @property + def kernel_id(self) -> str: + """Stable identifier used when this kernel advertises its capabilities. + + Defaults to ``"agent-kernel"`` — override at construction time when + running multiple kernels in the same process or across hosts so that + manifests carry a meaningful publisher identity. 
+ """ + return self._kernel_id # ── Budget accessor ──────────────────────────────────────────────────────── @@ -579,3 +593,112 @@ def explain_denial( result = explain_fn(request, capability, principal, justification=justification) assert isinstance(result, DenialExplanation) return result + + # ── Federation (capability marketplace, part 1) ─────────────────────────── + + def advertise( + self, + *, + endpoint: str, + trust_level: Literal["verified", "unverified"] = "unverified", + ) -> CapabilityManifest: + """Build a public-facing :class:`CapabilityManifest` for this kernel. + + Internal implementation details (driver IDs, operation names, + ``parameters_model`` Python references) are stripped — only fields + safe to share over the wire are emitted. + + Args: + endpoint: Transport endpoint at which this kernel can be reached + (e.g. ``"https://agent-a.example/kernel"``). Format is + transport-specific; importing kernels use it to construct a + local driver — the endpoint is never invoked by federation + itself. + trust_level: Publisher-declared hint. The importing kernel still + applies its configured trust policy regardless. + + Returns: + A :class:`CapabilityManifest` ready to be serialised with + :meth:`CapabilityManifest.to_dict`. + """ + manifest = build_manifest( + kernel_id=self._kernel_id, + registry=self._registry, + endpoint=endpoint, + trust_level=trust_level, + ) + logger.info( + "advertise", + extra={ + "kernel_id": self._kernel_id, + "endpoint": endpoint, + "capability_count": len(manifest.capabilities), + }, + ) + return manifest + + def import_remote( + self, + manifest: CapabilityManifest, + *, + driver: Driver, + trust_policy: TrustPolicy = "most_restrictive", + ) -> list[Capability]: + """Register a remote manifest's capabilities into this kernel. 
+ + Imported capabilities are routed to *driver*, typically an + :class:`~agent_kernel.drivers.http.HTTPDriver` or + :class:`~agent_kernel.drivers.mcp.MCPDriver` configured against the + manifest's endpoint. Invocation still flows through this kernel's + local policy → token → firewall pipeline; the remote endpoint is + never trusted to authorise on our behalf. + + Tokens are kernel-scoped: tokens for imported capabilities are signed by + *this* kernel's :class:`HMACTokenProvider`, so a token issued by another + kernel cannot be replayed against this one. + + Args: + manifest: The remote :class:`CapabilityManifest` to import. + driver: A driver that will execute imported capabilities. The + driver is registered on this kernel automatically; its + ``driver_id`` is recorded on each imported + :class:`Capability` so the router can find it. + trust_policy: How the importer weighs the manifest's sensitivity + metadata. See + :data:`~agent_kernel.federation.TrustPolicy`. + + Returns: + The list of imported :class:`Capability` objects, in manifest order. + + Raises: + TrustPolicyError: If *trust_policy* is unknown. + ManifestError: If the manifest version is unsupported or its + endpoint is empty. + CapabilityAlreadyRegistered: If any imported capability ID is + already registered locally. + """ + self.register_driver(driver) + imported = import_manifest( + manifest=manifest, + registry=self._registry, + driver_id=driver.driver_id, + trust_policy=trust_policy, + ) + # Route each imported capability to its driver so existing + # ``Kernel.invoke`` works unchanged. 
+ router_add = getattr(self._router, "add_route", None) + if router_add is not None: + for cap in imported: + router_add(cap.capability_id, [driver.driver_id]) + logger.info( + "import_remote", + extra={ + "kernel_id": self._kernel_id, + "remote_kernel_id": manifest.kernel_id, + "endpoint": manifest.endpoint, + "capability_count": len(imported), + "trust_policy": trust_policy, + "driver_id": driver.driver_id, + }, + ) + return imported diff --git a/src/agent_kernel/models.py b/src/agent_kernel/models.py index 9c6adea..ead2e06 100644 --- a/src/agent_kernel/models.py +++ b/src/agent_kernel/models.py @@ -7,6 +7,7 @@ from __future__ import annotations import datetime +from collections.abc import Callable from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Literal @@ -445,6 +446,158 @@ class DenialExplanation: # ── Dry-run ─────────────────────────────────────────────────────────────────── +# ── Namespaces & federation ─────────────────────────────────────────────────── + + +@dataclass(slots=True) +class NamespaceMetadata: + """Describes a capability namespace. + + Namespaces are dot-notation prefixes (``"billing"``, ``"billing.invoices"``) + inferred from registered :attr:`Capability.capability_id` values. A + :class:`NamespaceMetadata` entry can optionally carry a description and a + deferred *loader* — a zero-argument callable that registers additional + capabilities the first time the namespace is searched or listed. + """ + + prefix: str + """Dot-notation namespace prefix (e.g. ``"billing"`` or ``"billing.invoices"``).""" + + description: str = "" + """Optional human-readable description shown by ``list_namespaces``.""" + + loader: Callable[[], list[Capability]] | None = None + """Optional zero-arg loader invoked at most once on first access. + + The loader must return capabilities whose ``capability_id`` starts with + :attr:`prefix` (followed by ``.`` or matching the prefix exactly). 
The + registry stores the returned capabilities and marks the namespace as + loaded — subsequent searches or list calls will not re-invoke it. + """ + + loaded: bool = False + """``True`` once the deferred loader has been invoked (or no loader exists).""" + + +@dataclass(slots=True) +class CapabilityDescriptor: + """Public-facing capability description for cross-kernel advertising. + + A descriptor is the slice of a :class:`Capability` that is safe to share + over the wire: no driver IDs, no operation names, no Python-level + references. JSON-serialisable via :meth:`to_dict`. + """ + + capability_id: str + """Stable, namespaced identifier (e.g. ``"billing.invoices.list"``).""" + + name: str + """Short human-readable name.""" + + description: str + """What the capability does.""" + + safety_class: SafetyClass + """READ / WRITE / DESTRUCTIVE — preserved verbatim from the source capability.""" + + sensitivity: SensitivityTag = SensitivityTag.NONE + """Optional sensitivity tag — preserved verbatim.""" + + tags: list[str] = field(default_factory=list) + """Search/keyword tags from the source capability.""" + + parameters_schema: dict[str, Any] | None = None + """JSON Schema describing the capability's input parameters, if available.""" + + def to_dict(self) -> dict[str, Any]: + """Serialise the descriptor to a JSON-compatible dict.""" + return { + "capability_id": self.capability_id, + "name": self.name, + "description": self.description, + "safety_class": self.safety_class.value, + "sensitivity": self.sensitivity.value, + "tags": list(self.tags), + "parameters_schema": self.parameters_schema, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> CapabilityDescriptor: + """Reconstruct a descriptor from a dict produced by :meth:`to_dict`.""" + return cls( + capability_id=data["capability_id"], + name=data["name"], + description=data["description"], + safety_class=SafetyClass(data["safety_class"]), + sensitivity=SensitivityTag(data.get("sensitivity", 
SensitivityTag.NONE.value)), + tags=list(data.get("tags", [])), + parameters_schema=data.get("parameters_schema"), + ) + + +TrustLevel = Literal["verified", "unverified"] + + +@dataclass(slots=True) +class CapabilityManifest: + """Serialisable advertisement of a kernel's capabilities. + + A manifest is what one kernel publishes for another to consume. It + intentionally omits internal driver IDs, operation names, and any + Python references — only the public-facing :class:`CapabilityDescriptor` + list, the advertising kernel's identity, and a transport endpoint. + + Manifests are weaver-spec contract artifacts (I-02): the importing kernel + must still run the full local pipeline (policy → token → firewall) on every + imported capability invocation. + """ + + kernel_id: str + """Stable identifier of the advertising kernel (e.g. ``"agent-a"``).""" + + version: str + """Schema version of this manifest payload (e.g. ``"1"``).""" + + capabilities: list[CapabilityDescriptor] + """Public-facing descriptors. Ordered by registration on the advertising side.""" + + endpoint: str + """Transport endpoint at which the advertising kernel can be reached. + + Format is transport-specific (e.g. ``"https://agent-a.example/kernel"`` + or ``"mcp://stdio:python -m mcp_server"``). The importing kernel uses it + purely to construct a local driver — the endpoint is never invoked by + federation itself. + """ + + trust_level: TrustLevel = "unverified" + """Trust hint declared by the publisher. ``"verified"`` indicates the + publisher claims independent verification (e.g. a signed manifest); the + importing kernel still applies its configured trust policy regardless. 
+ """ + + def to_dict(self) -> dict[str, Any]: + """Serialise the manifest to a JSON-compatible dict.""" + return { + "kernel_id": self.kernel_id, + "version": self.version, + "endpoint": self.endpoint, + "trust_level": self.trust_level, + "capabilities": [cap.to_dict() for cap in self.capabilities], + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> CapabilityManifest: + """Reconstruct a manifest from a dict produced by :meth:`to_dict`.""" + return cls( + kernel_id=data["kernel_id"], + version=data["version"], + endpoint=data["endpoint"], + trust_level=data.get("trust_level", "unverified"), + capabilities=[CapabilityDescriptor.from_dict(c) for c in data["capabilities"]], + ) + + @dataclass(slots=True) class DryRunResult: """Result of a dry-run invocation — driver is never called. diff --git a/src/agent_kernel/registry.py b/src/agent_kernel/registry.py index 8687ae8..d603634 100644 --- a/src/agent_kernel/registry.py +++ b/src/agent_kernel/registry.py @@ -1,22 +1,99 @@ -"""Capability registry: register, lookup, and keyword-based matching.""" +"""Capability registry: register, lookup, namespaced discovery, and ranked search. + +Supports dot-notation namespaces (``"billing.invoices.list"``), deferred +namespace loaders for large tool ecosystems, and a BM25-flavoured score +that weights matches on ``capability_id`` and ``tags`` higher than +``description``. Flat (un-namespaced) capability IDs continue to work — they +are treated as living in a single-segment namespace. +""" from __future__ import annotations +import math import re +from collections.abc import Callable + +from .errors import ( + CapabilityAlreadyRegistered, + CapabilityNotFound, + NamespaceNotFound, +) +from .models import Capability, CapabilityRequest, NamespaceMetadata + +# Common English stop words that add noise to keyword search. Kept small +# (only words an LLM would routinely type into a goal) to avoid suppressing +# domain terms. 
+_STOP_WORDS: frozenset[str] = frozenset( + { + "a", + "an", + "and", + "any", + "are", + "as", + "at", + "be", + "by", + "for", + "from", + "get", + "give", + "i", + "in", + "is", + "it", + "me", + "my", + "of", + "on", + "or", + "please", + "show", + "that", + "the", + "this", + "to", + "want", + "with", + } +) -from .errors import CapabilityAlreadyRegistered, CapabilityNotFound -from .models import Capability, CapabilityRequest +# Field weights for BM25-flavoured scoring. Matches on capability_id and tags +# carry the most signal; description text is the noisiest. +_WEIGHT_ID = 4.0 +_WEIGHT_NAME = 2.0 +_WEIGHT_TAGS = 3.0 +_WEIGHT_DESCRIPTION = 1.0 + +# BM25 tunables (b is the Lucene default; k1 is higher than Lucene's 1.2). Held +# constant: randomness in matching is forbidden by AGENTS.md. +_BM25_K1 = 1.5 +_BM25_B = 0.75 class CapabilityRegistry: """Stores and retrieves :class:`Capability` objects. - Capabilities are registered by their ``capability_id`` and can be looked - up directly or discovered via keyword search against the goal description. + Capabilities are registered by their dot-notation ``capability_id`` + (e.g. ``"billing.invoices.list"``) and can be: + + - looked up directly via :meth:`get`, + - enumerated globally via :meth:`list_all`, + - enumerated per namespace via :meth:`list_namespaces` / + :meth:`list_namespace`, + - discovered via ranked text search (:meth:`search`). + + Flat IDs without a ``"."`` continue to work — they live in a single- + segment namespace named after themselves. """ def __init__(self) -> None: self._store: dict[str, Capability] = {} + self._namespaces: dict[str, NamespaceMetadata] = {} + # Reset cached search statistics when registrations change. + self._search_cache_dirty: bool = True + self._avg_doc_len: float = 0.0 + self._doc_freq: dict[str, int] = {} # ── Registration ────────────────────────────────────────────────────────── @@ -35,6 +112,7 @@ def register(self, capability: Capability) -> None: "Use a unique capability_id."
) self._store[capability.capability_id] = capability + self._search_cache_dirty = True def register_many(self, capabilities: list[Capability]) -> None: """Register multiple capabilities at once. @@ -45,11 +123,50 @@ def register_many(self, capabilities: list[Capability]) -> None: for cap in capabilities: self.register(cap) + def register_namespace( + self, + prefix: str, + *, + description: str = "", + loader: Callable[[], list[Capability]] | None = None, + ) -> None: + """Declare a namespace, optionally with a deferred loader. + + The loader (if given) is invoked exactly once, the first time the + namespace is searched, listed, or otherwise traversed. This lets a + host process advertise hundreds of namespaces without paying the + registration cost up front. + + Args: + prefix: Dot-notation namespace prefix (e.g. ``"billing"``). + description: Optional human-readable description. + loader: Optional zero-arg callable returning capabilities to + register on first access. Every returned capability's + ``capability_id`` must start with ``prefix`` (followed by ``.``) + or equal ``prefix`` exactly. + + Raises: + CapabilityAlreadyRegistered: If the namespace is already declared. + """ + if prefix in self._namespaces: + raise CapabilityAlreadyRegistered( + f"Namespace '{prefix}' is already declared. Choose a unique prefix." + ) + self._namespaces[prefix] = NamespaceMetadata( + prefix=prefix, + description=description, + loader=loader, + loaded=loader is None, + ) + # ── Lookup ──────────────────────────────────────────────────────────────── def get(self, capability_id: str) -> Capability: """Retrieve a capability by its ID. + If the capability ID falls under a declared namespace whose deferred + loader has not yet run, the loader is invoked first. + Args: capability_id: The capability's stable identifier. @@ -59,6 +176,8 @@ def get(self, capability_id: str) -> Capability: Raises: CapabilityNotFound: If no capability with that ID exists. 
""" + if capability_id not in self._store: + self._maybe_load_for(capability_id) try: return self._store[capability_id] except KeyError: @@ -68,21 +187,86 @@ def get(self, capability_id: str) -> Capability: ) from None def list_all(self) -> list[Capability]: - """Return all registered capabilities in registration order.""" + """Return every registered capability in registration order. + + Deferred-loader namespaces are *not* expanded by this call, which keeps + ``list_all`` cheap. Use :meth:`list_namespace` to force a load. + """ return list(self._store.values()) + # ── Namespaces ──────────────────────────────────────────────────────────── + + def list_namespaces(self) -> list[str]: + """Return every top-level namespace prefix present in the registry. + + Combines namespaces inferred from registered capability IDs with + explicitly declared (:meth:`register_namespace`) prefixes. Returned + sorted for deterministic output. + """ + prefixes: set[str] = set() + for cap_id in self._store: + head, _, _ = cap_id.partition(".") + prefixes.add(head) + for ns in self._namespaces: + prefixes.add(ns.split(".", 1)[0]) + return sorted(prefixes) + + def list_namespace(self, prefix: str) -> list[Capability]: + """Return every capability whose ID lives under *prefix*. + + Triggers the deferred loader declared for exactly *prefix*, if any, + before returning; loaders declared on ancestor prefixes are not run. A + capability_id ``cap`` is considered to live under ``prefix`` when + ``cap == prefix`` or ``cap.startswith(prefix + ".")``. + + Args: + prefix: Dot-notation namespace prefix. + + Returns: + Capabilities under the prefix, in registration order. + + Raises: + NamespaceNotFound: If no declared namespace or registered capability + lives under *prefix*.
+ """ + self._maybe_load_namespace(prefix) + results = [ + cap + for cap_id, cap in self._store.items() + if cap_id == prefix or cap_id.startswith(prefix + ".") + ] + if not results and prefix not in self._namespaces: + raise NamespaceNotFound( + f"Namespace '{prefix}' has no registered capabilities and is not declared. " + "Use register_namespace(prefix=...) or register a capability under it." + ) + return results + # ── Keyword matching ────────────────────────────────────────────────────── - def search(self, goal: str, *, max_results: int = 10) -> list[CapabilityRequest]: - """Search for capabilities matching a goal string. + def search( + self, + goal: str, + *, + max_results: int = 10, + offset: int = 0, + ) -> list[CapabilityRequest]: + """Search for capabilities matching *goal*. + + Tokenises *goal* (lower-cased word tokens, stop-words stripped) and + scores every capability using a BM25-flavoured ranker that weights + matches on ``capability_id`` and ``tags`` more heavily than + ``description``. Capabilities tied on score are returned in + ``capability_id`` order for determinism. - Splits *goal* into tokens and scores capabilities by how many tokens - appear in their ``capability_id``, ``name``, ``description``, or - ``tags``. Returns the top results as :class:`CapabilityRequest` objects. + Triggers any deferred namespace loader whose prefix overlaps the goal + tokens before scoring. Args: goal: Free-text description of the user's intent. max_results: Maximum number of results to return. + offset: Number of leading results to skip (paginates large + registries). Returns: Ordered list (highest score first) of :class:`CapabilityRequest`. 
@@ -91,13 +275,21 @@ def search(self, goal: str, *, max_results: int = 10) -> list[CapabilityRequest] if not tokens: return [] - scored: list[tuple[int, Capability]] = [] + self._load_namespaces_overlapping(tokens) + + if self._search_cache_dirty: + self._rebuild_search_index() + + scored: list[tuple[float, Capability]] = [] for cap in self._store.values(): score = self._score(cap, tokens) if score > 0: scored.append((score, cap)) scored.sort(key=lambda x: (-x[0], x[1].capability_id)) + + if offset: + scored = scored[offset:] return [ CapabilityRequest(capability_id=cap.capability_id, goal=goal) for _, cap in scored[:max_results] @@ -107,18 +299,91 @@ def search(self, goal: str, *, max_results: int = 10) -> list[CapabilityRequest] @staticmethod def _tokenize(text: str) -> list[str]: - """Split text into lower-case word tokens.""" - return re.findall(r"[a-z0-9]+", text.lower()) + """Split *text* into lower-case word tokens with stop-words removed.""" + return [t for t in re.findall(r"[a-z0-9]+", text.lower()) if t not in _STOP_WORDS] @staticmethod - def _score(cap: Capability, tokens: list[str]) -> int: - """Return a match score for a capability against query tokens.""" - corpus = " ".join( - [ - cap.capability_id, - cap.name, - cap.description, - ] - + cap.tags - ).lower() - return sum(1 for t in tokens if t in corpus) + def _corpus_fields(cap: Capability) -> tuple[list[str], list[str], list[str], list[str]]: + """Return per-field token lists used for scoring (id, name, tags, description).""" + tokenize = CapabilityRegistry._tokenize + return ( + tokenize(cap.capability_id.replace(".", " ").replace("_", " ")), + tokenize(cap.name), + tokenize(" ".join(cap.tags)), + tokenize(cap.description), + ) + + def _rebuild_search_index(self) -> None: + """Refresh BM25 document statistics after the registry mutates.""" + total_len = 0 + doc_freq: dict[str, int] = {} + for cap in self._store.values(): + id_tokens, name_tokens, tag_tokens, desc_tokens = self._corpus_fields(cap) 
+ total_len += len(id_tokens) + len(name_tokens) + len(tag_tokens) + len(desc_tokens) + unique_tokens = set(id_tokens) | set(name_tokens) | set(tag_tokens) | set(desc_tokens) + for tok in unique_tokens: + doc_freq[tok] = doc_freq.get(tok, 0) + 1 + n = len(self._store) or 1 + self._avg_doc_len = total_len / n + self._doc_freq = doc_freq + self._search_cache_dirty = False + + def _score(self, cap: Capability, tokens: list[str]) -> float: + """Return a BM25-flavoured match score for *cap* against query *tokens*.""" + id_tokens, name_tokens, tag_tokens, desc_tokens = self._corpus_fields(cap) + doc_tokens = id_tokens + name_tokens + tag_tokens + desc_tokens + if not doc_tokens: + return 0.0 + doc_len = len(doc_tokens) + n = len(self._store) or 1 + score = 0.0 + for tok in tokens: + df = self._doc_freq.get(tok, 0) + if df == 0: + continue + # Per-field term frequency with field-specific weights. + tf = ( + _WEIGHT_ID * id_tokens.count(tok) + + _WEIGHT_NAME * name_tokens.count(tok) + + _WEIGHT_TAGS * tag_tokens.count(tok) + + _WEIGHT_DESCRIPTION * desc_tokens.count(tok) + ) + if tf == 0: + continue + idf = math.log(1 + (n - df + 0.5) / (df + 0.5)) + norm = 1 - _BM25_B + _BM25_B * (doc_len / (self._avg_doc_len or 1.0)) + score += idf * ((tf * (_BM25_K1 + 1)) / (tf + _BM25_K1 * norm)) + # Exact-prefix bonus: capability_id starts with the joined query. 
+ joined = ".".join(tokens) + if joined and cap.capability_id.startswith(joined): + score += 1.0 + return score + + def _maybe_load_for(self, capability_id: str) -> None: + """Trigger any deferred loader whose prefix covers *capability_id*.""" + head, _, _ = capability_id.partition(".") + candidates = [head, capability_id] + for prefix in candidates: + if prefix in self._namespaces: + self._maybe_load_namespace(prefix) + + def _maybe_load_namespace(self, prefix: str) -> None: + """Invoke the deferred loader for *prefix* if it has not run yet.""" + meta = self._namespaces.get(prefix) + if meta is None or meta.loaded or meta.loader is None: + return + loader = meta.loader + # Mark as loaded *before* calling so a recursive load doesn't re-enter. + meta.loaded = True + for cap in loader(): + self.register(cap) + + def _load_namespaces_overlapping(self, tokens: list[str]) -> None: + """Load any deferred namespace whose prefix shares a token with *tokens*.""" + token_set = set(tokens) + for prefix, meta in list(self._namespaces.items()): + if meta.loaded: + continue + head_tokens = set(self._tokenize(prefix.replace(".", " ").replace("_", " "))) + if head_tokens & token_set: + self._maybe_load_namespace(prefix) diff --git a/tests/test_federation.py b/tests/test_federation.py new file mode 100644 index 0000000..71ce26d --- /dev/null +++ b/tests/test_federation.py @@ -0,0 +1,392 @@ +"""Tests for the capability marketplace — manifest format & local registry (#52).""" + +from __future__ import annotations + +import asyncio + +import pytest + +from agent_kernel import ( + Capability, + CapabilityAlreadyRegistered, + CapabilityDescriptor, + CapabilityManifest, + CapabilityRegistry, + HMACTokenProvider, + ImplementationRef, + InMemoryDriver, + Kernel, + ManifestError, + Principal, + SafetyClass, + SensitivityTag, + StaticRouter, + TokenInvalid, + TrustPolicyError, + build_manifest, + import_manifest, + merge_sensitivity, +) +from agent_kernel.drivers.base import ExecutionContext 
+from agent_kernel.federation import MANIFEST_VERSION +from agent_kernel.models import CapabilityRequest + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def _make_cap(cap_id: str, **kwargs: object) -> Capability: + defaults: dict[str, object] = { + "name": cap_id.replace(".", " ").title(), + "description": f"Description for {cap_id}", + "safety_class": SafetyClass.READ, + } + defaults.update(kwargs) + return Capability(capability_id=cap_id, **defaults) # type: ignore[arg-type] + + +def _remote_kernel_with(*caps: Capability) -> Kernel: + reg = CapabilityRegistry() + for cap in caps: + reg.register(cap) + return Kernel( + registry=reg, + token_provider=HMACTokenProvider(secret="remote-kernel-secret"), + router=StaticRouter(), + kernel_id="agent-b", + ) + + +# ── Manifest serialisation ──────────────────────────────────────────────────── + + +def test_capability_descriptor_roundtrip() -> None: + descriptor = CapabilityDescriptor( + capability_id="billing.invoices.list", + name="List Invoices", + description="List recent invoices", + safety_class=SafetyClass.READ, + sensitivity=SensitivityTag.PII, + tags=["billing", "invoices"], + parameters_schema={"type": "object", "properties": {"limit": {"type": "integer"}}}, + ) + restored = CapabilityDescriptor.from_dict(descriptor.to_dict()) + assert restored == descriptor + + +def test_capability_manifest_to_dict_is_json_compatible() -> None: + import json + + manifest = CapabilityManifest( + kernel_id="agent-a", + version=MANIFEST_VERSION, + endpoint="https://agent-a.example/kernel", + trust_level="verified", + capabilities=[ + CapabilityDescriptor( + capability_id="billing.list_invoices", + name="List Invoices", + description="List recent invoices", + safety_class=SafetyClass.READ, + sensitivity=SensitivityTag.PII, + tags=["billing"], + ), + ], + ) + payload = json.dumps(manifest.to_dict()) + restored = CapabilityManifest.from_dict(json.loads(payload)) + assert restored == manifest + + 
+def test_build_manifest_strips_internal_implementation_details() -> None: + reg = CapabilityRegistry() + reg.register( + Capability( + capability_id="billing.list_invoices", + name="List Invoices", + description="List recent invoices", + safety_class=SafetyClass.READ, + sensitivity=SensitivityTag.PII, + tags=["billing"], + impl=ImplementationRef(driver_id="secret_internal_driver", operation="op_x"), + ) + ) + manifest = build_manifest( + kernel_id="agent-a", + registry=reg, + endpoint="https://agent-a.example/kernel", + ) + payload = manifest.to_dict() + serialised = repr(payload) + assert "secret_internal_driver" not in serialised + assert "op_x" not in serialised + # Public-facing fields are present. + cap_dict = payload["capabilities"][0] + assert cap_dict["capability_id"] == "billing.list_invoices" + assert cap_dict["sensitivity"] == SensitivityTag.PII.value + + +def test_build_manifest_preserves_registration_order() -> None: + reg = CapabilityRegistry() + for cid in ["c.three", "a.one", "b.two"]: + reg.register(_make_cap(cid)) + manifest = build_manifest(kernel_id="agent-a", registry=reg, endpoint="https://agent-a/k") + assert [c.capability_id for c in manifest.capabilities] == ["c.three", "a.one", "b.two"] + + +# ── Importing manifests ─────────────────────────────────────────────────────── + + +def test_import_manifest_registers_capabilities_with_driver_routing() -> None: + remote_kernel = _remote_kernel_with(_make_cap("billing.list_invoices")) + manifest = remote_kernel.advertise(endpoint="https://agent-b.example/kernel") + + local_reg = CapabilityRegistry() + imported = import_manifest( + manifest=manifest, + registry=local_reg, + driver_id="remote_b", + trust_policy="most_restrictive", + ) + assert len(imported) == 1 + cap = local_reg.get("billing.list_invoices") + assert cap.impl is not None + assert cap.impl.driver_id == "remote_b" + assert cap.impl.operation == "billing.list_invoices" + + +def test_import_manifest_rejects_unknown_trust_policy() -> 
None: + manifest = CapabilityManifest( + kernel_id="agent-b", + version=MANIFEST_VERSION, + endpoint="https://agent-b/k", + capabilities=[], + ) + with pytest.raises(TrustPolicyError, match="Unknown trust_policy"): + import_manifest( + manifest=manifest, + registry=CapabilityRegistry(), + driver_id="x", + trust_policy="totally_made_up", # type: ignore[arg-type] + ) + + +def test_import_manifest_rejects_unsupported_version() -> None: + manifest = CapabilityManifest( + kernel_id="agent-b", + version="999", + endpoint="https://agent-b/k", + capabilities=[], + ) + with pytest.raises(ManifestError, match="version '999' is not supported"): + import_manifest(manifest=manifest, registry=CapabilityRegistry(), driver_id="x") + + +def test_import_manifest_rejects_empty_endpoint() -> None: + manifest = CapabilityManifest( + kernel_id="agent-b", + version=MANIFEST_VERSION, + endpoint="", + capabilities=[], + ) + with pytest.raises(ManifestError, match="has no endpoint"): + import_manifest(manifest=manifest, registry=CapabilityRegistry(), driver_id="x") + + +def test_import_manifest_duplicate_capability_raises() -> None: + local = CapabilityRegistry() + local.register(_make_cap("billing.list_invoices")) + remote = _remote_kernel_with(_make_cap("billing.list_invoices")) + manifest = remote.advertise(endpoint="https://agent-b/k") + with pytest.raises(CapabilityAlreadyRegistered): + import_manifest(manifest=manifest, registry=local, driver_id="remote_b") + + +# ── Trust policies ──────────────────────────────────────────────────────────── + + +def test_trust_policy_most_restrictive_preserves_sensitivity() -> None: + remote = _remote_kernel_with(_make_cap("crm.contacts.list", sensitivity=SensitivityTag.PII)) + manifest = remote.advertise(endpoint="https://agent-b/k") + local_reg = CapabilityRegistry() + import_manifest( + manifest=manifest, + registry=local_reg, + driver_id="remote_b", + trust_policy="most_restrictive", + ) + assert local_reg.get("crm.contacts.list").sensitivity == 
SensitivityTag.PII + + +def test_trust_policy_local_only_strips_sensitivity() -> None: + remote = _remote_kernel_with(_make_cap("crm.contacts.list", sensitivity=SensitivityTag.PII)) + manifest = remote.advertise(endpoint="https://agent-b/k") + local_reg = CapabilityRegistry() + import_manifest( + manifest=manifest, + registry=local_reg, + driver_id="remote_b", + trust_policy="local_only", + ) + assert local_reg.get("crm.contacts.list").sensitivity == SensitivityTag.NONE + + +def test_trust_policy_remote_deferred_preserves_sensitivity() -> None: + remote = _remote_kernel_with(_make_cap("crm.contacts.list", sensitivity=SensitivityTag.PII)) + manifest = remote.advertise(endpoint="https://agent-b/k") + local_reg = CapabilityRegistry() + import_manifest( + manifest=manifest, + registry=local_reg, + driver_id="remote_b", + trust_policy="remote_deferred", + ) + assert local_reg.get("crm.contacts.list").sensitivity == SensitivityTag.PII + + +def test_merge_sensitivity_picks_strictest() -> None: + assert merge_sensitivity(SensitivityTag.NONE, SensitivityTag.PII) == SensitivityTag.PII + assert merge_sensitivity(SensitivityTag.PII, SensitivityTag.NONE) == SensitivityTag.PII + assert merge_sensitivity(SensitivityTag.PII, SensitivityTag.PCI) == SensitivityTag.PCI + assert merge_sensitivity(SensitivityTag.PCI, SensitivityTag.SECRETS) == SensitivityTag.SECRETS + assert merge_sensitivity(SensitivityTag.NONE, SensitivityTag.NONE) == SensitivityTag.NONE + + +# ── Kernel.advertise() / Kernel.import_remote() ─────────────────────────────── + + +def test_kernel_advertise_uses_kernel_id() -> None: + reg = CapabilityRegistry() + reg.register(_make_cap("billing.list_invoices")) + kernel = Kernel( + registry=reg, + token_provider=HMACTokenProvider(secret="k1"), + kernel_id="my-fancy-kernel", + ) + manifest = kernel.advertise(endpoint="https://my-kernel/k") + assert manifest.kernel_id == "my-fancy-kernel" + assert manifest.endpoint == "https://my-kernel/k" + assert manifest.version == 
MANIFEST_VERSION + + +def test_kernel_import_remote_registers_driver_and_route() -> None: + remote = _remote_kernel_with(_make_cap("billing.list_invoices")) + manifest = remote.advertise(endpoint="https://agent-b/k") + + local_reg = CapabilityRegistry() + local_router = StaticRouter(routes={}) + local = Kernel( + registry=local_reg, + token_provider=HMACTokenProvider(secret="local-secret"), + router=local_router, + kernel_id="agent-a", + ) + + remote_driver = InMemoryDriver(driver_id="remote_b") + remote_driver.register_handler( + "billing.list_invoices", + lambda ctx: [{"id": "INV-1", "amount": 10.0}], + ) + + imported = local.import_remote(manifest, driver=remote_driver, trust_policy="local_only") + assert [c.capability_id for c in imported] == ["billing.list_invoices"] + + # The driver-routing wiring is correct. + plan = local_router.route("billing.list_invoices") + assert plan.driver_ids == ["remote_b"] + + +def test_imported_capability_invokes_through_local_pipeline() -> None: + remote = _remote_kernel_with(_make_cap("billing.list_invoices")) + manifest = remote.advertise(endpoint="https://agent-b/k") + + local_reg = CapabilityRegistry() + local = Kernel( + registry=local_reg, + token_provider=HMACTokenProvider(secret="local-secret"), + router=StaticRouter(), + kernel_id="agent-a", + ) + driver = InMemoryDriver(driver_id="remote_b") + invoked = {"called_with": None} + + def list_invoices(ctx: ExecutionContext) -> list[dict[str, object]]: + invoked["called_with"] = ctx.capability_id # type: ignore[assignment] + return [{"id": "INV-1", "amount": 100.0, "email": "x@y.z"}] + + driver.register_handler("billing.list_invoices", list_invoices) + local.import_remote(manifest, driver=driver, trust_policy="local_only") + + principal = Principal(principal_id="alice", roles=["reader"], attributes={"tenant": "acme"}) + request = CapabilityRequest(capability_id="billing.list_invoices", goal="check invoices") + token = local.get_token(request, principal, justification="") + + 
async def run() -> object: + return await local.invoke( + token, + principal=principal, + args={"operation": "billing.list_invoices"}, + response_mode="table", + ) + + frame = asyncio.run(run()) + # Capability was routed to the imported driver. + assert invoked["called_with"] == "billing.list_invoices" + # Trace was recorded by the local kernel. + trace = local.explain(frame.action_id) # type: ignore[attr-defined] + assert trace.capability_id == "billing.list_invoices" + assert trace.driver_id == "remote_b" + + +def test_imported_capability_keeps_remote_sensitivity_under_most_restrictive() -> None: + """A `most_restrictive` import floors the imported cap's sensitivity at the remote tag. + + This is what makes the firewall apply the same redaction to imported PII + capabilities as the remote would. + """ + remote = _remote_kernel_with(_make_cap("crm.contacts.list", sensitivity=SensitivityTag.PII)) + manifest = remote.advertise(endpoint="https://agent-b/k") + local = Kernel( + registry=CapabilityRegistry(), + token_provider=HMACTokenProvider(secret="local-secret"), + kernel_id="agent-a", + ) + local.import_remote(manifest, driver=InMemoryDriver(driver_id="remote_b")) + imported_cap = local.list_capabilities()[0] + assert imported_cap.sensitivity == SensitivityTag.PII + + +# ── Token isolation across kernels (kernel-scoped HMAC) ─────────────────────── + + +def test_tokens_are_kernel_scoped_by_hmac_secret() -> None: + """A token signed by kernel A's HMAC provider must not verify on kernel B. + + `Kernel` instances with different secrets produce tokens that fail + signature verification on the other side, which is what makes + "kernel-scoped" tokens safe across an imported capability boundary. 
+ """ + reg_a = CapabilityRegistry() + reg_a.register(_make_cap("billing.list_invoices")) + kernel_a = Kernel( + registry=reg_a, + token_provider=HMACTokenProvider(secret="secret-a"), + router=StaticRouter(), + kernel_id="agent-a", + ) + + reg_b = CapabilityRegistry() + reg_b.register(_make_cap("billing.list_invoices")) + kernel_b_provider = HMACTokenProvider(secret="secret-b") + + principal = Principal(principal_id="alice", roles=["reader"], attributes={"tenant": "acme"}) + token = kernel_a.get_token( + CapabilityRequest(capability_id="billing.list_invoices", goal="x"), + principal, + justification="", + ) + with pytest.raises(TokenInvalid, match="invalid signature"): + kernel_b_provider.verify( + token, + expected_principal_id="alice", + expected_capability_id="billing.list_invoices", + ) diff --git a/tests/test_registry.py b/tests/test_registry.py index 6e63597..f2892a8 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -110,3 +110,196 @@ def test_search_goal_preserved(registry: CapabilityRegistry) -> None: goal = "list all billing invoices please" results = registry.search(goal) assert all(r.goal == goal for r in results) + + +# ── Namespace operations (#45) ──────────────────────────────────────────────── + + +def test_list_namespaces_from_registered_capabilities() -> None: + reg = CapabilityRegistry() + reg.register(_make_cap("billing.invoices.list")) + reg.register(_make_cap("billing.invoices.create")) + reg.register(_make_cap("crm.contacts.search")) + reg.register(_make_cap("flat_id")) + assert reg.list_namespaces() == ["billing", "crm", "flat_id"] + + +def test_list_namespace_returns_capabilities_under_prefix() -> None: + reg = CapabilityRegistry() + reg.register(_make_cap("billing.invoices.list")) + reg.register(_make_cap("billing.invoices.create")) + reg.register(_make_cap("billing.payments.refund")) + reg.register(_make_cap("crm.contacts.search")) + + billing = [c.capability_id for c in reg.list_namespace("billing")] + assert 
sorted(billing) == [ + "billing.invoices.create", + "billing.invoices.list", + "billing.payments.refund", + ] + + invoices = [c.capability_id for c in reg.list_namespace("billing.invoices")] + assert sorted(invoices) == [ + "billing.invoices.create", + "billing.invoices.list", + ] + + +def test_list_namespace_exact_match_is_included() -> None: + reg = CapabilityRegistry() + reg.register(_make_cap("billing")) + reg.register(_make_cap("billing.invoices.list")) + ids = [c.capability_id for c in reg.list_namespace("billing")] + assert "billing" in ids + assert "billing.invoices.list" in ids + + +def test_list_namespace_unknown_prefix_raises() -> None: + from agent_kernel import NamespaceNotFound + + reg = CapabilityRegistry() + reg.register(_make_cap("billing.invoices.list")) + with pytest.raises(NamespaceNotFound, match="no registered capabilities"): + reg.list_namespace("never.declared") + + +def test_register_namespace_duplicate_raises() -> None: + reg = CapabilityRegistry() + reg.register_namespace("billing", description="Billing tools") + with pytest.raises(CapabilityAlreadyRegistered, match="already declared"): + reg.register_namespace("billing") + + +def test_deferred_loader_called_exactly_once_on_first_access() -> None: + call_count = {"n": 0} + + def loader() -> list[Capability]: + call_count["n"] += 1 + return [_make_cap("ondemand.list"), _make_cap("ondemand.create")] + + reg = CapabilityRegistry() + reg.register_namespace("ondemand", description="Lazy loaded", loader=loader) + + # First access triggers the loader. + caps = reg.list_namespace("ondemand") + assert {c.capability_id for c in caps} == {"ondemand.list", "ondemand.create"} + assert call_count["n"] == 1 + + # Second access does not re-invoke. 
+ reg.list_namespace("ondemand") + assert call_count["n"] == 1 + + +def test_deferred_loader_triggers_on_get() -> None: + def loader() -> list[Capability]: + return [_make_cap("lazy.thing")] + + reg = CapabilityRegistry() + reg.register_namespace("lazy", loader=loader) + cap = reg.get("lazy.thing") + assert cap.capability_id == "lazy.thing" + + +def test_deferred_loader_triggers_on_search_overlap() -> None: + call_count = {"n": 0} + + def loader() -> list[Capability]: + call_count["n"] += 1 + return [_make_cap("billing.weekly_report", description="weekly revenue report")] + + reg = CapabilityRegistry() + reg.register_namespace("billing", loader=loader) + results = reg.search("weekly billing") + ids = [r.capability_id for r in results] + assert "billing.weekly_report" in ids + assert call_count["n"] == 1 + + +# ── Search scoring & pagination (#45) ───────────────────────────────────────── + + +def test_search_id_match_ranks_above_description_only() -> None: + reg = CapabilityRegistry() + reg.register(_make_cap("invoices.list", description="unrelated text")) + reg.register( + _make_cap( + "ledger.report", + description="invoices summary", + ) + ) + results = reg.search("invoices") + assert [r.capability_id for r in results][:2] == ["invoices.list", "ledger.report"] + + +def test_search_pagination_offset() -> None: + reg = CapabilityRegistry() + for i in range(15): + reg.register(_make_cap(f"billing.invoice{i:02d}", tags=["invoice"])) + page1 = reg.search("invoice", max_results=5, offset=0) + page2 = reg.search("invoice", max_results=5, offset=5) + page3 = reg.search("invoice", max_results=5, offset=10) + assert len(page1) == 5 + assert len(page2) == 5 + assert len(page3) == 5 + ids = {r.capability_id for r in page1 + page2 + page3} + assert len(ids) == 15 + + +def test_search_pagination_offset_does_not_overlap() -> None: + reg = CapabilityRegistry() + for i in range(10): + reg.register(_make_cap(f"billing.invoice{i:02d}", tags=["invoice"])) + page1 = {r.capability_id 
for r in reg.search("invoice", max_results=4, offset=0)} + page2 = {r.capability_id for r in reg.search("invoice", max_results=4, offset=4)} + assert page1.isdisjoint(page2) + + +def test_search_stop_words_are_stripped() -> None: + reg = CapabilityRegistry() + reg.register(_make_cap("billing.list_invoices")) + # "to" / "the" / "please" must not contribute matches. + results = reg.search("the to please") + assert results == [] + + +def test_search_tags_outrank_description() -> None: + reg = CapabilityRegistry() + reg.register( + _make_cap( + "alpha.report", + description="quarterly revenue summary", + tags=["analytics"], + ) + ) + reg.register( + _make_cap( + "beta.report", + description="alpha analytics description", + tags=["unrelated"], + ) + ) + results = reg.search("analytics") + assert [r.capability_id for r in results][0] == "alpha.report" + + +def test_search_scales_to_500_capabilities() -> None: + """Sanity check: search over 500 capabilities completes quickly.""" + import time + + reg = CapabilityRegistry() + for i in range(500): + ns = "billing" if i % 2 == 0 else "crm" + reg.register( + _make_cap( + f"{ns}.thing{i:04d}", + description=f"deterministic stuff for record {i}", + tags=[ns, "thing"], + ) + ) + start = time.perf_counter() + results = reg.search("billing thing", max_results=10) + elapsed = time.perf_counter() - start + assert len(results) == 10 + # Generous bound: BM25 over 500 docs with ~5 tokens each should be + # well under a second on any developer machine. + assert elapsed < 1.0, f"search took {elapsed:.3f}s for 500 capabilities"
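Review note: the ranking exercised by the search tests above can be sketched in isolation. The block below is a minimal, standalone approximation and not the code in `registry.py`: it assumes the same field weights and BM25 constants that appear in the diff, represents each capability as a plain dict of field token lists (a hypothetical stand-in for `Capability`), and leaves out stop-word stripping and the exact-prefix bonus.

```python
import math
import re

# Assumed constants, copied from the diff for illustration only.
WEIGHTS = {"id": 4.0, "name": 2.0, "tags": 3.0, "description": 1.0}
K1, B = 1.5, 0.75


def tokenize(text: str) -> list[str]:
    """Lower-case alphanumeric tokens; '.' and '_' act as separators."""
    return re.findall(r"[a-z0-9]+", text.lower())


def build_stats(docs: dict[str, dict[str, list[str]]]) -> tuple[dict[str, int], float]:
    """Return (document frequency per token, average document length)."""
    doc_freq: dict[str, int] = {}
    total_len = 0
    for fields in docs.values():
        total_len += sum(len(toks) for toks in fields.values())
        for tok in set().union(*fields.values()):
            doc_freq[tok] = doc_freq.get(tok, 0) + 1
    return doc_freq, total_len / (len(docs) or 1)


def score(fields, query, doc_freq, n_docs, avg_len) -> float:
    """Field-weighted BM25-style score of one document for the query tokens."""
    doc_len = sum(len(toks) for toks in fields.values())
    total = 0.0
    for tok in query:
        df = doc_freq.get(tok, 0)
        # Weighted term frequency: a hit in the id counts four times a description hit.
        tf = sum(WEIGHTS[name] * toks.count(tok) for name, toks in fields.items())
        if df == 0 or tf == 0:
            continue
        idf = math.log(1 + (n_docs - df + 0.5) / (df + 0.5))
        norm = 1 - B + B * (doc_len / (avg_len or 1.0))
        total += idf * (tf * (K1 + 1)) / (tf + K1 * norm)
    return total


# Toy corpus: one capability matches on its id, the other only in its description.
docs = {
    "invoices.list": {
        "id": tokenize("invoices.list"),
        "name": tokenize("List"),
        "tags": [],
        "description": tokenize("unrelated text"),
    },
    "ledger.report": {
        "id": tokenize("ledger.report"),
        "name": tokenize("Report"),
        "tags": [],
        "description": tokenize("invoices summary"),
    },
}
doc_freq, avg_len = build_stats(docs)
query = tokenize("invoices")
scores = {cid: score(f, query, doc_freq, len(docs), avg_len) for cid, f in docs.items()}
# Sort by descending score, then by id, mirroring the deterministic tie-break.
ranked = sorted(scores, key=lambda cid: (-scores[cid], cid))
print(ranked)
```

Because the term-frequency component saturates (it is bounded by `K1 + 1` per token), the field weights change the ordering of matches without letting a single heavily repeated token dominate the score.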