-
Notifications
You must be signed in to change notification settings - Fork 0
feat: SimpleGuard Enforcement Strategy (v0.3.0) #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
✅ Documentation validation passed!
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces the SimpleGuard enforcement strategy (v0.3.0), shifting the CapiscIO SDK from a "validation-first" to an "enforcement-first" security model. The core feature is zero-configuration security enforcement for Agent-to-Agent (A2A) protocol interactions using Ed25519 signatures, SHA-256 body hash verification, and replay protection.
Key changes:
- SimpleGuard Core: New security enforcer with Identity (Ed25519 JWS), Integrity (SHA-256 body hash), and Freshness (60-second expiration window) enforcement
- FastAPI Middleware: Drop-in
CapiscioMiddlewarefor automatic request protection with telemetry support - Zero-Config Development: Auto-generates keys and agent cards in dev mode
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| capiscio_sdk/simple_guard.py | Core security enforcer implementing JWS signing/verification, body hash integrity checks, and replay protection |
| capiscio_sdk/integrations/fastapi.py | FastAPI middleware for automatic request validation with Server-Timing telemetry |
| capiscio_sdk/errors.py | Added ConfigurationError and VerificationError exceptions |
| capiscio_sdk/init.py | Exported SimpleGuard in public API |
| tests/unit/test_simple_guard.py | Comprehensive unit tests for SimpleGuard functionality |
| tests/unit/test_fastapi_integration.py | Integration tests for FastAPI middleware |
| examples/secure_ping_pong/server.py | Demo server showcasing zero-config security setup |
| examples/secure_ping_pong/client.py | Demo client testing valid requests, tampering, and replay attacks |
| examples/secure_ping_pong/README.md | Documentation for the demo application |
| pyproject.toml | Version bump to 0.3.0 |
| examples/simple_agent/requirements.txt | Updated capiscio-sdk dependency version |
| SECURITY.md | Updated supported versions table |
| README.md | Complete rewrite reflecting enforcement-first approach |
| CHANGELOG.md | Added v0.3.0 release notes |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
✅ Documentation validation passed!
|
|
✅ All checks passed! Ready for review. |
|
I have addressed the code review comments:
CI should be running now. |
|
✅ Documentation validation passed!
|
|
✅ All checks passed! Ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 19 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| except jwt.InvalidSignatureError: | ||
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "invalid_signature"}}') | ||
| raise VerificationError("Invalid signature.") | ||
| except jwt.ExpiredSignatureError: | ||
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "token_expired"}}') | ||
| raise VerificationError("Token expired.") |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable kid may not be defined when exception handlers at lines 199-204 try to reference it. If an exception occurs before line 135 (e.g., during jwt.get_unverified_header()), or if kid is None at line 135, the logging statements will raise a NameError or UnboundLocalError. Consider initializing kid = "unknown" at the start of the try block or handling the undefined case in exception handlers.
| print(f"Agent ID: {guard.agent_id}") | ||
| print(f"Trust Store: {guard.trusted_dir}") | ||
| print("Starting server on port 8000...") | ||
| uvicorn.run(app, host="0.0.0.0", port=8000) |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Binding to 0.0.0.0 (all network interfaces) in a development example could be a security concern. Since this is using dev_mode=True and auto-generated keys meant for local development, the example should bind to 127.0.0.1 (localhost only) to prevent unintended network exposure. If 0.0.0.0 is intentional for demo purposes, add a comment explaining the security implications.
| uvicorn.run(app, host="0.0.0.0", port=8000) | |
| uvicorn.run(app, host="127.0.0.1", port=8000) |
| raise VerificationError("Missing timestamp claims (exp, iat).") | ||
|
|
||
| if now > (exp + CLOCK_SKEW_LEEWAY): | ||
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "token_expired"}}') | ||
| raise VerificationError("Token expired.") | ||
|
|
||
| if now < (iat - CLOCK_SKEW_LEEWAY): | ||
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "clock_skew"}}') | ||
| raise VerificationError("Token not yet valid (Clock skew).") |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent indentation detected. Lines 183, 186, 187, 190, and 191 use extra indentation (one extra space) compared to the standard indentation used elsewhere in the file. These lines should align with the surrounding code at the same indentation level.
| raise VerificationError("Missing timestamp claims (exp, iat).") | |
| if now > (exp + CLOCK_SKEW_LEEWAY): | |
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "token_expired"}}') | |
| raise VerificationError("Token expired.") | |
| if now < (iat - CLOCK_SKEW_LEEWAY): | |
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "clock_skew"}}') | |
| raise VerificationError("Token not yet valid (Clock skew).") | |
| raise VerificationError("Missing timestamp claims (exp, iat).") | |
| if now > (exp + CLOCK_SKEW_LEEWAY): | |
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "token_expired"}}') | |
| raise VerificationError("Token expired.") | |
| if now < (iat - CLOCK_SKEW_LEEWAY): | |
| logger.warning(f'{{"event": "agent_call_denied", "kid": "{kid}", "reason": "clock_skew"}}') | |
| raise VerificationError("Token not yet valid (Clock skew).") |
| # We need to find the kid from the keys array. | ||
| keys = data.get("public_keys", []) | ||
| if not keys: | ||
| raise ConfigurationError("agent-card.json missing 'public_keys'.") |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent indentation detected on line 244. This line has extra indentation (one extra space) compared to the surrounding code at the same level. It should align with line 243 and other statements in the same block.
| raise ConfigurationError("agent-card.json missing 'public_keys'.") | |
| raise ConfigurationError("agent-card.json missing 'public_keys'.") |
| return JSONResponse( | ||
| {"error": "Missing X-Capiscio-JWS header. This endpoint is protected by CapiscIO."}, | ||
| status_code=401 | ||
| ) |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent indentation detected on lines 40-42. These lines have extra indentation (one extra space) compared to the surrounding code at the same level. They should align with the if statement on line 39.
| return JSONResponse( | |
| {"error": "Missing X-Capiscio-JWS header. This endpoint is protected by CapiscIO."}, | |
| status_code=401 | |
| ) | |
| return JSONResponse( | |
| {"error": "Missing X-Capiscio-JWS header. This endpoint is protected by CapiscIO."}, | |
| status_code=401 | |
| ) |
| # Reset the receive channel so downstream can read the body | ||
| async def receive() -> Dict[str, Any]: | ||
| return {"type": "http.request", "body": body_bytes, "more_body": False} | ||
| request._receive = receive |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing the private attribute request._receive is fragile and not part of the public API. This implementation detail may break with future Starlette/FastAPI versions. Consider using a more robust approach, such as storing the body in request.state for downstream handlers to access if needed, or using Starlette's built-in body caching mechanisms.
| # Reset the receive channel so downstream can read the body | |
| async def receive() -> Dict[str, Any]: | |
| return {"type": "http.request", "body": body_bytes, "more_body": False} | |
| request._receive = receive | |
| # Store the body in request.state for downstream access | |
| request.state.body_bytes = body_bytes | |
| # Downstream handlers can access the body via request.state.body_bytes |
| # We must now pass the body bytes to sign_outbound to generate the 'bh' claim | ||
| # CRITICAL: We must ensure the bytes we sign are EXACTLY the bytes we send. | ||
| body_bytes = json.dumps(payload).encode('utf-8') | ||
|
|
||
| t0 = time.perf_counter() | ||
| token = guard.sign_outbound(payload, body=body_bytes) |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The payload dictionary on line 13 contains a timestamp with value time.time() (a float). When this payload is later passed to guard.sign_outbound(payload, body=body_bytes) on line 23, it will be mutated to include additional fields. However, the body bytes were computed from the original payload on line 20. This creates a mismatch: the JWT claims will include the timestamp from the original payload dict, but if json.dumps() produces different output for the same values (though unlikely with floats), it could cause issues. More importantly, this pattern is confusing. Consider creating the body_bytes after the payload is fully prepared, or make it clear that the payload dictionary passed to sign_outbound is separate from the HTTP body content.
| # We must now pass the body bytes to sign_outbound to generate the 'bh' claim | |
| # CRITICAL: We must ensure the bytes we sign are EXACTLY the bytes we send. | |
| body_bytes = json.dumps(payload).encode('utf-8') | |
| t0 = time.perf_counter() | |
| token = guard.sign_outbound(payload, body=body_bytes) | |
| # Prepare payload fully before serialization and signing | |
| prepared_payload = dict(payload) # If sign_outbound mutates, use a copy | |
| body_bytes = json.dumps(prepared_payload).encode('utf-8') | |
| t0 = time.perf_counter() | |
| token = guard.sign_outbound(prepared_payload, body=body_bytes) |
| self, | ||
| request: Request, | ||
| call_next: Callable[[Request], Awaitable[Response]] | ||
| ) -> Response: |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The dispatch method lacks a docstring. This is the main method of the middleware and should document its behavior, including:
- What it does (verifies incoming requests)
- Parameters (
request,call_next) - Return value
- Exceptions or error responses (401 for missing auth, 403 for invalid auth)
- Side effects (modifies
request.state)
| ) -> Response: | |
| ) -> Response: | |
| """ | |
| Verifies incoming requests for A2A identity using Capiscio SimpleGuard. | |
| Parameters: | |
| request (Request): The incoming HTTP request. | |
| call_next (Callable[[Request], Awaitable[Response]]): The next middleware or endpoint handler. | |
| Returns: | |
| Response: The HTTP response, either an error (401/403) or the result of the downstream handler. | |
| Error Responses: | |
| - 401: If the X-Capiscio-JWS header is missing. | |
| - 403: If the authentication/verification fails. | |
| Side Effects: | |
| - Modifies request.state by injecting agent claims and agent_id if verification succeeds. | |
| - Resets the request body receive channel for downstream handlers. | |
| """ |
| # Inject issuer if missing | ||
| if "iss" not in payload: | ||
| payload["iss"] = self.agent_id | ||
|
|
||
| # Replay Protection: Inject timestamps | ||
| now = int(time.time()) | ||
| payload["iat"] = now | ||
| payload["exp"] = now + MAX_TOKEN_AGE | ||
|
|
||
| # Integrity: Calculate Body Hash if body is provided | ||
| if body is not None: | ||
| # SHA-256 hash | ||
| sha256_hash = hashlib.sha256(body).digest() | ||
| # Base64Url encode (no padding) | ||
| bh = base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=') | ||
| payload["bh"] = bh |
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sign_outbound method mutates the input payload dictionary by adding iss, iat, exp, and potentially bh fields. This side effect can cause unexpected behavior for callers who might reuse the payload dictionary. Consider creating a copy of the payload before modification to avoid mutating the input parameter.
Example fix:
# Create a copy to avoid mutating input
payload_copy = payload.copy()
if "iss" not in payload_copy:
payload_copy["iss"] = self.agent_id
# ... continue with payload_copy| print("✅ SUCCESS: Tampered payload was blocked!") | ||
| else: | ||
| print("❌ FAILURE: Tampered payload was accepted!") | ||
|
|
Copilot
AI
Nov 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Trailing whitespace detected on line 80. This should be removed to maintain code cleanliness.
SimpleGuard v0.3.0: Enforcement First Strategy
This PR implements the new "Enforcement First" security strategy for the CapiscIO SDK.
🛡️ Major Features
bh) verification to prevent payload tampering.exp,iat).Server-Timingheaders to monitor verification overhead.📦 Changes
0.3.0.README.mdandSECURITY.mdto reflect the new model.examples/secure_ping_pongdemo.✅ Verification
pytest tests/unit/test_simple_guard.py)pytest tests/unit/test_fastapi_integration.py)secure_ping_pongdemo.