Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions capiscio_mcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ async def read_database(query: str) -> list[dict]:
)
from capiscio_mcp.keeper import ServerBadgeKeeper
from capiscio_mcp.connect import MCPServerIdentity
from capiscio_mcp.events import (
GuardEventEmitter,
get_event_emitter,
set_event_emitter,
)
from capiscio_mcp._core.version import (
MCP_VERSION,
CORE_MIN_VERSION,
Expand Down Expand Up @@ -182,4 +187,8 @@ async def read_database(query: str) -> list[dict]:
# One-liner identity setup (MCPServerIdentity.connect())
"MCPServerIdentity",
"ServerBadgeKeeper",
# Event emission (RFC-008)
"GuardEventEmitter",
"get_event_emitter",
"set_event_emitter",
]
11 changes: 11 additions & 0 deletions capiscio_mcp/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async def main():
)

from capiscio_mcp.keeper import ServerBadgeKeeper
from capiscio_mcp.events import GuardEventEmitter, set_event_emitter
from capiscio_mcp.registration import (
RegistrationError,
generate_server_keypair,
Expand Down Expand Up @@ -473,6 +474,16 @@ async def connect(
)

logger.info("MCPServerIdentity ready for server %s: %s", server_id, did)

# Step 6: Auto-configure guard event emitter
set_event_emitter(
GuardEventEmitter(
server_url=server_url,
api_key=api_key,
agent_id=server_id,
)
)
Comment on lines 476 to +485

return cls(
server_id=server_id,
did=did, # type: ignore[arg-type]
Expand Down
173 changes: 173 additions & 0 deletions capiscio_mcp/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Guard event emission for policy enforcement telemetry.

Provides a module-level singleton ``GuardEventEmitter`` that the ``@guard``
decorator uses to POST ``capiscio.policy_enforced`` events to the registry's
``/v1/events`` endpoint.

The emitter is auto-configured when ``MCPServerIdentity.connect()`` is called,
or can be set manually via ``set_event_emitter()``. If no emitter is configured
the guard silently skips event emission (graceful degradation).
"""

from __future__ import annotations

import logging
import threading
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import requests

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------

_emitter: Optional["GuardEventEmitter"] = None
_emitter_lock = threading.Lock()


def get_event_emitter() -> Optional["GuardEventEmitter"]:
"""Return the configured event emitter, or ``None`` if not set."""
return _emitter


def set_event_emitter(emitter: Optional["GuardEventEmitter"]) -> None:
"""Set (or clear) the module-level event emitter singleton."""
global _emitter
with _emitter_lock:
_emitter = emitter


# ---------------------------------------------------------------------------
# Event emitter
# ---------------------------------------------------------------------------


class GuardEventEmitter:
"""Lightweight event emitter that POSTs events to ``/v1/events``.

Uses ``requests`` (already a capiscio-mcp dependency) and fires events
synchronously in a background thread so the guard decorator does not
block on network I/O.

Args:
server_url: Registry base URL (e.g. ``https://registry.capisc.io``).
api_key: Registry API key for ``X-Capiscio-Registry-Key`` header.
agent_id: Agent/server ID for event attribution.
enabled: Set ``False`` to suppress emission (useful in tests).
timeout: HTTP request timeout in seconds.
"""

EVENT_POLICY_ENFORCED = "capiscio.policy_enforced"

def __init__(
self,
server_url: str,
api_key: str,
agent_id: Optional[str] = None,
*,
enabled: bool = True,
timeout: float = 5.0,
) -> None:
self.server_url = server_url.rstrip("/")
self.api_key = api_key
self.agent_id = agent_id
self.enabled = enabled
self.timeout = timeout

# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------

def emit_policy_enforced(
self,
*,
decision: str,
tool_name: str,
deny_reason: Optional[str] = None,
deny_detail: Optional[str] = None,
agent_did: Optional[str] = None,
trust_level: Optional[int] = None,
evidence_id: Optional[str] = None,
error_code: Optional[str] = None,
requested_capability: Optional[str] = None,
presented_capability: Optional[str] = None,
capability_class: Optional[str] = None,
severity: Optional[str] = None,
) -> None:
"""Emit a ``capiscio.policy_enforced`` event.

Called by the ``@guard`` decorator on DENY decisions. Runs the
HTTP POST in a daemon thread so the caller is not blocked.
"""
if not self.enabled:
return

data: Dict[str, Any] = {
"decision": decision,
"tool_name": tool_name,
}
if deny_reason:
data["deny_reason"] = deny_reason
if deny_detail:
data["deny_detail"] = deny_detail
if agent_did:
data["agent_did"] = agent_did
if trust_level is not None:
data["trust_level"] = trust_level
if evidence_id:
data["evidence_id"] = evidence_id
if error_code:
data["error_code"] = error_code
if requested_capability:
data["requested_capability"] = requested_capability
if presented_capability:
data["presented_capability"] = presented_capability
if capability_class:
data["capability_class"] = capability_class
if severity:
data["severity"] = severity

event = {
"id": str(uuid.uuid4()),
"type": self.EVENT_POLICY_ENFORCED,
"agentId": self.agent_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data": data,
}

# Fire-and-forget in a daemon thread
t = threading.Thread(target=self._send, args=(event,), daemon=True)
t.start()
Comment on lines +143 to +145

# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------

def _send(self, event: Dict[str, Any]) -> None:
"""POST a single event to ``/v1/events``. Best-effort."""
url = f"{self.server_url}/v1/events"
headers = {
"Content-Type": "application/json",
"X-Capiscio-Registry-Key": self.api_key,
}
try:
resp = requests.post(
url,
json={"events": [event]},
headers=headers,
timeout=self.timeout,
)
if resp.status_code >= 400:
logger.debug(
"Event emission returned %s: %s",
resp.status_code,
resp.text[:200],
)
except Exception:
# Best-effort — never let event emission break the guard
logger.debug("Event emission failed", exc_info=True)
35 changes: 35 additions & 0 deletions capiscio_mcp/guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async def execute_query(sql: str) -> list[dict]:
TrustLevel,
)
from capiscio_mcp.errors import GuardError, GuardConfigError
from capiscio_mcp.events import get_event_emitter

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -355,6 +356,38 @@ async def evaluate_tool_access(
)


def _emit_deny_event(
result: "GuardResult",
tool_name: str,
capability_class: Optional[str] = None,
) -> None:
"""Best-effort emit a ``capiscio.policy_enforced`` event on DENY.

Uses the module-level :func:`get_event_emitter` singleton. If no emitter
is configured the call is a no-op.
"""
emitter = get_event_emitter()
if emitter is None:
return
try:
emitter.emit_policy_enforced(
decision="DENY",
tool_name=tool_name,
deny_reason=result.deny_reason.value if result.deny_reason else None,
deny_detail=result.deny_detail,
agent_did=result.agent_did,
trust_level=result.trust_level,
evidence_id=result.evidence_id,
error_code=result.error_code,
requested_capability=result.requested_capability,
presented_capability=result.presented_capability,
capability_class=capability_class,
severity="HIGH",
)
except Exception:
logger.debug("Failed to emit deny event", exc_info=True)


# Decorator overloads for type hints
@overload
def guard(
Expand Down Expand Up @@ -473,6 +506,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
result.presented_capability,
result.evidence_id,
)
_emit_deny_event(result, effective_tool_name, capability_class)
raise GuardError(
reason=result.deny_reason or DenyReason.INTERNAL_ERROR,
detail=result.deny_detail or "Access denied",
Expand Down Expand Up @@ -584,6 +618,7 @@ async def run_eval():
result.presented_capability,
result.evidence_id,
)
_emit_deny_event(result, effective_tool_name, capability_class)
raise GuardError(
reason=result.deny_reason or DenyReason.INTERNAL_ERROR,
detail=result.deny_detail or "Access denied",
Expand Down
Loading