feat: vault v2 methods — opaque handles#1
Conversation
9 vault methods added to KovaMind client: - vault_setup, vault_unlock, vault_lock - vault_store, vault_list, vault_delete - vault_handles, vault_execute, vault_recover No method returns plaintext credential values. Bumped version to 0.5.0. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR adds Vault v2 support to the KovaMind Python SDK, introducing client methods and corresponding models intended to ensure credential access is done via opaque handles (not plaintext values), along with a version bump to 0.5.0.
Changes:
- Adds Vault v2 client methods (setup/unlock/lock/store/list/delete/handles/execute/recover) and a new internal
_delete()HTTP helper. - Introduces new Vault-related response models (
VaultSetupResult,VaultHandle,VaultCredentialMeta,VaultExecuteResult) and re-exports them fromkovamind. - Adds a new unit test module covering the vault methods and a codebase scan test to detect plaintext/decrypt references.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
kovamind/client.py |
Adds Vault v2 public APIs and a DELETE helper method. |
kovamind/models.py |
Introduces Vault v2 dataclass models for setup/handles/metadata/execute results. |
kovamind/__init__.py |
Re-exports Vault models and bumps __version__ to 0.5.0. |
pyproject.toml |
Bumps package version to 0.5.0. |
tests/test_vault.py |
Adds unit tests for vault methods plus a plaintext/decrypt scan test. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| assert result["status"] == "recovered" | ||
|
|
||
|
|
||
| class TestNoPlantext: |
There was a problem hiding this comment.
Test class name has a typo: TestNoPlantext should be TestNoPlaintext to match the term used elsewhere and improve discoverability in test output.
| class TestNoPlantext: | |
| class TestNoPlaintext: |
| content = fh.read() | ||
| # Check vault methods don't return raw credential fields | ||
| assert "decrypt" not in content, f"Found 'decrypt' in {path}" | ||
| assert "plaintext" not in content.lower() or "no plaintext" in content.lower() or "never" in content.lower(), f"Suspicious 'plaintext' in {path}" |
There was a problem hiding this comment.
The plaintext scan assertion is logically incorrect: the or "never" in content.lower() clause makes the test pass even when plaintext appears in the file (as long as the word "never" appears anywhere). Consider rewriting this as a conditional that only allows specific safe phrases (e.g., allowlist comment strings) when "plaintext" is present, otherwise fail.
| assert "plaintext" not in content.lower() or "no plaintext" in content.lower() or "never" in content.lower(), f"Suspicious 'plaintext' in {path}" | |
| lc = content.lower() | |
| if "plaintext" in lc: | |
| allowed_phrases = ( | |
| "no plaintext", | |
| "never store plaintext", | |
| "never log plaintext", | |
| ) | |
| assert any(phrase in lc for phrase in allowed_phrases), f"Suspicious 'plaintext' in {path}" |
|
|
||
| class TestNoPlantext: | ||
| def test_grep_no_plaintext_in_codebase(self): | ||
| import os, re |
There was a problem hiding this comment.
re is imported but never used. Please remove the unused import (or use it for the scan logic).
| import os, re | |
| import os |
| def test_vault_setup_returns_words(self, client): | ||
| with patch.object(client._session, "request", return_value=_mock_response({"status": "created", "recovery_words": ["w"] * 12})): | ||
| result = client.vault_setup("strongpass") | ||
| assert result["status"] == "created" |
There was a problem hiding this comment.
These tests patch client._session.request directly, but (unlike the existing tests/test_client.py approach) they don't assert the request URL/path or payload. That means regressions like hitting the wrong endpoint or sending the passphrase under the wrong key won't be caught. Consider switching to the responses library with matchers/callbacks (as used elsewhere in this repo) to validate both request and response behavior.
| def vault_setup(self, passphrase: str) -> dict: | ||
| """First-time vault setup. Returns recovery words.""" | ||
| return self._post("/vault/v2/setup", {"passphrase": passphrase}) | ||
|
|
||
| def vault_unlock(self, passphrase: str) -> dict: | ||
| """Unlock the vault.""" | ||
| return self._post("/vault/v2/unlock", {"passphrase": passphrase}) | ||
|
|
||
| def vault_lock(self) -> dict: | ||
| """Lock the vault.""" | ||
| return self._post("/vault/v2/lock", {}) | ||
|
|
||
| def vault_store(self, label: str, schema_type: str, fields: dict, tags: str | None = None) -> dict: | ||
| """Store a credential. Returns opaque handle.""" | ||
| payload = {"label": label, "schema_type": schema_type, "fields": fields} | ||
| if tags is not None: | ||
| payload["tags"] = tags | ||
| return self._post("/vault/v2/credentials", payload) | ||
|
|
||
| def vault_list(self) -> list[dict]: | ||
| """List credentials (metadata only, no values).""" | ||
| data = self._get("/vault/v2/credentials", {}) | ||
| return data.get("credentials", []) | ||
|
|
||
| def vault_delete(self, credential_id: str) -> dict: | ||
| """Delete a credential.""" | ||
| return self._delete(f"/vault/v2/credentials/{credential_id}") | ||
|
|
||
| def vault_handles(self) -> list[dict]: | ||
| """List opaque handles (handle, label, schema_type only).""" | ||
| data = self._get("/vault/v2/handles", {}) | ||
| return data.get("handles", []) | ||
|
|
||
| def vault_execute(self, handle: str, action: str, target: str, mapping: dict | None = None) -> dict: | ||
| """Execute an action using a credential. Never returns credential values.""" | ||
| payload = {"handle": handle, "action": action, "target": target} | ||
| if mapping is not None: | ||
| payload["mapping"] = mapping | ||
| return self._post("/vault/v2/execute", payload) | ||
|
|
||
| def vault_recover(self, words: list[str], new_passphrase: str) -> dict: | ||
| """Recover vault with 12 recovery words.""" | ||
| return self._post("/vault/v2/recover", {"words": words, "new_passphrase": new_passphrase}) |
There was a problem hiding this comment.
Public client methods currently return typed model objects (e.g., extract() -> ExtractResult, health() -> HealthStatus), but all new vault methods return untyped dict/list[dict]. Since new vault models were added and re-exported, consider returning VaultSetupResult/VaultExecuteResult and lists of VaultCredentialMeta/VaultHandle for consistency and better type safety.
| def vault_store(self, label: str, schema_type: str, fields: dict, tags: str | None = None) -> dict: | ||
| """Store a credential. Returns opaque handle.""" | ||
| payload = {"label": label, "schema_type": schema_type, "fields": fields} | ||
| if tags is not None: | ||
| payload["tags"] = tags | ||
| return self._post("/vault/v2/credentials", payload) | ||
|
|
||
| def vault_list(self) -> list[dict]: | ||
| """List credentials (metadata only, no values).""" | ||
| data = self._get("/vault/v2/credentials", {}) | ||
| return data.get("credentials", []) | ||
|
|
||
| def vault_delete(self, credential_id: str) -> dict: | ||
| """Delete a credential.""" | ||
| return self._delete(f"/vault/v2/credentials/{credential_id}") | ||
|
|
||
| def vault_handles(self) -> list[dict]: | ||
| """List opaque handles (handle, label, schema_type only).""" | ||
| data = self._get("/vault/v2/handles", {}) | ||
| return data.get("handles", []) | ||
|
|
||
| def vault_execute(self, handle: str, action: str, target: str, mapping: dict | None = None) -> dict: | ||
| """Execute an action using a credential. Never returns credential values.""" | ||
| payload = {"handle": handle, "action": action, "target": target} | ||
| if mapping is not None: | ||
| payload["mapping"] = mapping | ||
| return self._post("/vault/v2/execute", payload) |
There was a problem hiding this comment.
The vault methods’ return values are passed through from the HTTP response without any filtering/redaction, but the docstrings/PR description claim credential values are never returned. To make that guarantee true at the SDK level, consider explicitly allowlisting response keys (or stripping any fields/secret-like keys) before returning to callers.
| def vault_setup(self, passphrase: str) -> dict: | ||
| """First-time vault setup. Returns recovery words.""" | ||
| return self._post("/vault/v2/setup", {"passphrase": passphrase}) | ||
|
|
||
| def vault_unlock(self, passphrase: str) -> dict: | ||
| """Unlock the vault.""" | ||
| return self._post("/vault/v2/unlock", {"passphrase": passphrase}) | ||
|
|
||
| def vault_lock(self) -> dict: | ||
| """Lock the vault.""" | ||
| return self._post("/vault/v2/lock", {}) | ||
|
|
||
| def vault_store(self, label: str, schema_type: str, fields: dict, tags: str | None = None) -> dict: | ||
| """Store a credential. Returns opaque handle.""" | ||
| payload = {"label": label, "schema_type": schema_type, "fields": fields} | ||
| if tags is not None: | ||
| payload["tags"] = tags | ||
| return self._post("/vault/v2/credentials", payload) | ||
|
|
||
| def vault_list(self) -> list[dict]: | ||
| """List credentials (metadata only, no values).""" | ||
| data = self._get("/vault/v2/credentials", {}) | ||
| return data.get("credentials", []) | ||
|
|
||
| def vault_delete(self, credential_id: str) -> dict: | ||
| """Delete a credential.""" | ||
| return self._delete(f"/vault/v2/credentials/{credential_id}") | ||
|
|
||
| def vault_handles(self) -> list[dict]: | ||
| """List opaque handles (handle, label, schema_type only).""" | ||
| data = self._get("/vault/v2/handles", {}) | ||
| return data.get("handles", []) | ||
|
|
||
| def vault_execute(self, handle: str, action: str, target: str, mapping: dict | None = None) -> dict: | ||
| """Execute an action using a credential. Never returns credential values.""" | ||
| payload = {"handle": handle, "action": action, "target": target} | ||
| if mapping is not None: | ||
| payload["mapping"] = mapping | ||
| return self._post("/vault/v2/execute", payload) | ||
|
|
||
| def vault_recover(self, words: list[str], new_passphrase: str) -> dict: |
There was a problem hiding this comment.
Type annotations are overly broad (fields: dict, mapping: dict | None). Elsewhere in this client, request payloads use dict[str, Any] / list[dict[str, Any]]. Tightening these types improves static checking and keeps the public API consistent.
| def vault_setup(self, passphrase: str) -> dict: | |
| """First-time vault setup. Returns recovery words.""" | |
| return self._post("/vault/v2/setup", {"passphrase": passphrase}) | |
| def vault_unlock(self, passphrase: str) -> dict: | |
| """Unlock the vault.""" | |
| return self._post("/vault/v2/unlock", {"passphrase": passphrase}) | |
| def vault_lock(self) -> dict: | |
| """Lock the vault.""" | |
| return self._post("/vault/v2/lock", {}) | |
| def vault_store(self, label: str, schema_type: str, fields: dict, tags: str | None = None) -> dict: | |
| """Store a credential. Returns opaque handle.""" | |
| payload = {"label": label, "schema_type": schema_type, "fields": fields} | |
| if tags is not None: | |
| payload["tags"] = tags | |
| return self._post("/vault/v2/credentials", payload) | |
| def vault_list(self) -> list[dict]: | |
| """List credentials (metadata only, no values).""" | |
| data = self._get("/vault/v2/credentials", {}) | |
| return data.get("credentials", []) | |
| def vault_delete(self, credential_id: str) -> dict: | |
| """Delete a credential.""" | |
| return self._delete(f"/vault/v2/credentials/{credential_id}") | |
| def vault_handles(self) -> list[dict]: | |
| """List opaque handles (handle, label, schema_type only).""" | |
| data = self._get("/vault/v2/handles", {}) | |
| return data.get("handles", []) | |
| def vault_execute(self, handle: str, action: str, target: str, mapping: dict | None = None) -> dict: | |
| """Execute an action using a credential. Never returns credential values.""" | |
| payload = {"handle": handle, "action": action, "target": target} | |
| if mapping is not None: | |
| payload["mapping"] = mapping | |
| return self._post("/vault/v2/execute", payload) | |
| def vault_recover(self, words: list[str], new_passphrase: str) -> dict: | |
| def vault_setup(self, passphrase: str) -> dict[str, Any]: | |
| """First-time vault setup. Returns recovery words.""" | |
| return self._post("/vault/v2/setup", {"passphrase": passphrase}) | |
| def vault_unlock(self, passphrase: str) -> dict[str, Any]: | |
| """Unlock the vault.""" | |
| return self._post("/vault/v2/unlock", {"passphrase": passphrase}) | |
| def vault_lock(self) -> dict[str, Any]: | |
| """Lock the vault.""" | |
| return self._post("/vault/v2/lock", {}) | |
| def vault_store( | |
| self, | |
| label: str, | |
| schema_type: str, | |
| fields: dict[str, Any], | |
| tags: str | None = None, | |
| ) -> dict[str, Any]: | |
| """Store a credential. Returns opaque handle.""" | |
| payload = {"label": label, "schema_type": schema_type, "fields": fields} | |
| if tags is not None: | |
| payload["tags"] = tags | |
| return self._post("/vault/v2/credentials", payload) | |
| def vault_list(self) -> list[dict[str, Any]]: | |
| """List credentials (metadata only, no values).""" | |
| data = self._get("/vault/v2/credentials", {}) | |
| return data.get("credentials", []) | |
| def vault_delete(self, credential_id: str) -> dict[str, Any]: | |
| """Delete a credential.""" | |
| return self._delete(f"/vault/v2/credentials/{credential_id}") | |
| def vault_handles(self) -> list[dict[str, Any]]: | |
| """List opaque handles (handle, label, schema_type only).""" | |
| data = self._get("/vault/v2/handles", {}) | |
| return data.get("handles", []) | |
| def vault_execute( | |
| self, | |
| handle: str, | |
| action: str, | |
| target: str, | |
| mapping: dict[str, Any] | None = None, | |
| ) -> dict[str, Any]: | |
| """Execute an action using a credential. Never returns credential values.""" | |
| payload = {"handle": handle, "action": action, "target": target} | |
| if mapping is not None: | |
| payload["mapping"] = mapping | |
| return self._post("/vault/v2/execute", payload) | |
| def vault_recover(self, words: list[str], new_passphrase: str) -> dict[str, Any]: |
| return cls(success=data.get("success", False), output=data.get("output", ""), | ||
| error=data.get("error"), status_code=data.get("status_code"), raw=data) |
There was a problem hiding this comment.
VaultExecuteResult is documented as "never contains credential values", but it stores the full response in raw unmodified. If the service ever includes secret fields by mistake, the SDK would still expose them via .raw. Consider either removing .raw from vault-related models or sanitizing it to an allowlist of safe fields to align with the documentation/security guarantees.
| return cls(success=data.get("success", False), output=data.get("output", ""), | |
| error=data.get("error"), status_code=data.get("status_code"), raw=data) | |
| # Only expose a sanitized subset of fields in `raw` to avoid leaking | |
| # any unexpected credential values that might be added server-side. | |
| safe_raw: dict[str, Any] = {} | |
| for key in ("success", "output", "error", "status_code"): | |
| if key in data: | |
| safe_raw[key] = data[key] | |
| return cls( | |
| success=data.get("success", False), | |
| output=data.get("output", ""), | |
| error=data.get("error"), | |
| status_code=data.get("status_code"), | |
| raw=safe_raw, | |
| ) |
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Test plan
Generated with Claude Code