Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions capiscio_sdk/_rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,125 @@ def init(
"registered": response.registered,
}, None

def create_envelope(
self,
key_id: str,
subject_did: str,
capability_class: str,
delegation_depth_remaining: int,
issuer_badge_jti: str,
txn_id: str = "",
expires_in_seconds: int = 3600,
constraints_json: str = "",
subject_badge_jti: str = "",
enforcement_mode_min: str = "",
) -> tuple[str, str, str, Optional[str]]:
"""Create a root Authority Envelope (RFC-008 §6.1).

Args:
key_id: Key to sign with (from key store)
subject_did: DID of the agent receiving authority
capability_class: e.g., "tools.database.read"
delegation_depth_remaining: How many further delegations allowed
issuer_badge_jti: JTI of the issuer's badge
txn_id: Transaction ID (auto-generated if empty)
expires_in_seconds: TTL from now (default: 3600)
constraints_json: Optional JSON constraints object
subject_badge_jti: JTI of the subject's badge (optional)
enforcement_mode_min: Optional minimum enforcement mode

Returns:
Tuple of (envelope_jws, envelope_id, issuer_did, error_message)
"""
request = simpleguard_pb2.CreateEnvelopeRequest(
key_id=key_id,
subject_did=subject_did,
capability_class=capability_class,
delegation_depth_remaining=delegation_depth_remaining,
txn_id=txn_id,
expires_in_seconds=expires_in_seconds,
constraints_json=constraints_json,
issuer_badge_jti=issuer_badge_jti,
subject_badge_jti=subject_badge_jti,
enforcement_mode_min=enforcement_mode_min,
)
response = self._stub.CreateEnvelope(request)
error = response.error_message if response.error_message else None
return response.envelope_jws, response.envelope_id, response.issuer_did, error

def derive_envelope(
self,
parent_envelope_jws: str,
key_id: str,
subject_did: str,
capability_class: str,
delegation_depth_remaining: int,
issuer_badge_jti: str,
expires_in_seconds: int = 1800,
constraints_json: str = "",
subject_badge_jti: str = "",
enforcement_mode_min: str = "",
) -> tuple[str, str, str, Optional[str]]:
"""Derive a child Authority Envelope from a parent (RFC-008 §6.3).

Args:
parent_envelope_jws: Parent envelope JWS
key_id: Key to sign child with
subject_did: DID of the next delegate
capability_class: Must be equal or narrower than parent
delegation_depth_remaining: Must be < parent's depth
issuer_badge_jti: JTI of the child issuer's own badge
expires_in_seconds: TTL from now (default: 1800)
constraints_json: Must be equal or more restrictive
subject_badge_jti: JTI of the subject's badge
enforcement_mode_min: Optional minimum enforcement mode

Returns:
Tuple of (envelope_jws, envelope_id, parent_authority_hash, error_message)
"""
request = simpleguard_pb2.DeriveEnvelopeRequest(
parent_envelope_jws=parent_envelope_jws,
key_id=key_id,
subject_did=subject_did,
capability_class=capability_class,
delegation_depth_remaining=delegation_depth_remaining,
expires_in_seconds=expires_in_seconds,
constraints_json=constraints_json,
subject_badge_jti=subject_badge_jti,
issuer_badge_jti=issuer_badge_jti,
enforcement_mode_min=enforcement_mode_min,
)
response = self._stub.DeriveEnvelope(request)
error = response.error_message if response.error_message else None
return response.envelope_jws, response.envelope_id, response.parent_authority_hash, error

def build_transport_headers(
self,
chain: list[str],
badge_map: Optional[dict[str, str]] = None,
) -> tuple[str, str, str, Optional[str]]:
"""Build RFC-008 §15 transport headers for a delegation chain.

Args:
chain: Ordered JWS array [root, ..., leaf]
badge_map: DID → badge JWS for intermediate agents

Returns:
Tuple of (authority_header, authority_chain_header, badge_map_header, error_message)
"""
request = simpleguard_pb2.BuildTransportHeadersRequest(
chain=chain,
badge_map=badge_map or {},
)
response = self._stub.BuildTransportHeaders(request)
error = response.error_message if response.error_message else None
return (
response.authority_header,
response.authority_chain_header,
response.badge_map_header,
error,
)


class MCPClient:
"""Client wrapper for MCPService (RFC-006 Tool Authority + RFC-007 Server Identity).
Expand Down
Loading
Loading