From 9df7aaae1b46b5dc74aab6f2c6ce1a3f3b30a938 Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 21:05:18 -0500 Subject: [PATCH 01/10] Add Python SDK core with wrap and signed outbox Implement configure, wrap, correlation context, flush, SQLite outbox, Ed25519 signing, and ingest export. Golden signing fixtures match the Node SDK byte-for-byte in CI. Signed-off-by: Nathan Gillett --- .github/workflows/ci.yml | 10 +- pyproject.toml | 9 + src/intentproof/__init__.py | 20 ++- src/intentproof/client.py | 92 ++++++++++ src/intentproof/http_exporter.py | 64 +++++++ src/intentproof/instrumentation.py | 195 +++++++++++++++++++++ src/intentproof/keys.py | 55 ++++++ src/intentproof/outbox.py | 65 +++++++ src/intentproof/signing.py | 73 ++++++++ tests/fixtures/signing_canonical_utf8.txt | 1 + tests/fixtures/signing_event_hash.txt | 1 + tests/fixtures/signing_private_key_b64.txt | 1 + tests/fixtures/signing_signature_b64.txt | 1 + tests/fixtures/signing_unsigned_event.json | 25 +++ tests/test_sdk.py | 191 ++++++++++++++++++++ 15 files changed, 794 insertions(+), 9 deletions(-) create mode 100644 src/intentproof/client.py create mode 100644 src/intentproof/http_exporter.py create mode 100644 src/intentproof/instrumentation.py create mode 100644 src/intentproof/keys.py create mode 100644 src/intentproof/outbox.py create mode 100644 src/intentproof/signing.py create mode 100644 tests/fixtures/signing_canonical_utf8.txt create mode 100644 tests/fixtures/signing_event_hash.txt create mode 100644 tests/fixtures/signing_private_key_b64.txt create mode 100644 tests/fixtures/signing_signature_b64.txt create mode 100644 tests/fixtures/signing_unsigned_event.json create mode 100644 tests/test_sdk.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 790bc7d..0845bb5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,13 +19,7 @@ jobs: - name: Install dependencies run: | - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - if [ -f pyproject.toml ]; then pip install -e .; fi + pip install -e ".[dev]" - name: Run tests - run: | - if [ -z "$(find tests -type f -name '*.py' 2>/dev/null)" ]; then - echo "No test files found (project scaffold only)" - exit 0 - fi - python -m unittest discover -s tests -v + run: pytest -q diff --git a/pyproject.toml b/pyproject.toml index d97ab56..8106bfb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,15 @@ description = "Python SDK for IntentProof" readme = "README.md" license = {text = "Apache-2.0"} requires-python = ">=3.9" +dependencies = [ + "cryptography>=42.0.0", + "ulid-py>=1.1.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", +] [tool.setuptools.packages.find] where = ["src"] diff --git a/src/intentproof/__init__.py b/src/intentproof/__init__.py index a1b057a..4e53f0b 100644 --- a/src/intentproof/__init__.py +++ b/src/intentproof/__init__.py @@ -1,5 +1,23 @@ """IntentProof Python SDK.""" +from intentproof import client from intentproof.exporter import ingest_request_headers, post_execution_event +from intentproof.instrumentation import ( + push_subject_mapping, + run_with_correlation_id, + wrap, +) +from intentproof.client import flush -__all__ = ["ingest_request_headers", "post_execution_event"] +configure = client.configure + +__all__ = [ + "client", + "configure", + "flush", + "ingest_request_headers", + "post_execution_event", + "push_subject_mapping", + "run_with_correlation_id", + "wrap", +] diff --git a/src/intentproof/client.py b/src/intentproof/client.py new file mode 100644 index 0000000..2df46f2 --- /dev/null +++ b/src/intentproof/client.py @@ -0,0 +1,92 @@ +"""SDK configuration and shared runtime state.""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import TYPE_CHECKING + +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from intentproof.http_exporter import HttpExporter, resolve_ingest_url +from intentproof.keys import ensure_dir, load_or_create_keypair +from intentproof.outbox import Outbox +from intentproof.signing import load_private_key + +if TYPE_CHECKING: + from intentproof.signing import Ed25519PublicKey + +SDK_VERSION = "python@0.1.0" +DEFAULT_DATA_DIR = Path.home() / ".intentproof" / "sdk-python" + +_instance_private_key: Ed25519PrivateKey | None = None +_instance_id: str | None = None +_tenant_id: str = "tnt_default" +_outbox: Outbox | None = None +_exporter: HttpExporter | None = None +_data_dir: Path | None = None + + +def configure( + *, + db_path: str | None = None, + tenant_id: str | None = None, + data_dir: str | Path | None = None, + ingest_url: str | None = None, +) -> None: + global _instance_private_key, _instance_id, _tenant_id, _outbox, _exporter, _data_dir + + _data_dir = Path(data_dir) if data_dir else DEFAULT_DATA_DIR + ensure_dir(_data_dir) + + kp = load_or_create_keypair(_data_dir) + _instance_private_key = load_private_key(kp.private_key) + _instance_id = kp.instance_id + _tenant_id = ( + tenant_id + or os.environ.get("INTENTPROOF_TENANT_ID", "").strip() + or "tnt_default" + ) + + resolved_db = db_path or os.environ.get("INTENTPROOF_OUTBOX_PATH", "").strip() + if not resolved_db: + resolved_db = str(_data_dir / "outbox.db") + _outbox = Outbox(resolved_db) + + ingest = resolve_ingest_url(ingest_url) + _exporter = HttpExporter(ingest) if ingest else None + + +def flush() -> None: + if _exporter is not None: + _exporter.flush() + + +def get_outbox() -> Outbox: + if _outbox is None: + raise RuntimeError("SDK not configured: call configure() before use") + return _outbox + + +def get_instance_id() -> str: + if _instance_id is None: + raise RuntimeError("SDK not configured: call configure() before get_instance_id()") + return _instance_id + + +def get_private_key() -> Ed25519PrivateKey: + if _instance_private_key is None: + raise RuntimeError("SDK not configured: call configure() before signing") + return _instance_private_key + + +def get_tenant_id() -> str: + return _tenant_id + + +def get_exporter() -> HttpExporter | None: + return _exporter + + +def get_public_key() -> "Ed25519PublicKey": + return get_private_key().public_key() diff --git a/src/intentproof/http_exporter.py b/src/intentproof/http_exporter.py new file mode 100644 index 0000000..7652fdc --- /dev/null +++ b/src/intentproof/http_exporter.py @@ -0,0 +1,64 @@ +"""Background HTTP export of signed events to ingest.""" + +from __future__ import annotations + +import logging +import os +import threading +from typing import Any, Mapping + +from intentproof.exporter import ingest_request_headers, post_execution_event + +logger = logging.getLogger(__name__) + +DEFAULT_LOCAL_INGEST_URL = "http://127.0.0.1:9787/v1/events" + + +def resolve_ingest_url(explicit: str | None = None) -> str | None: + raw = (explicit or os.environ.get("INTENTPROOF_INGEST_URL", "")).strip() + if raw: + return _normalize_ingest_url(raw) + if os.environ.get("INTENTPROOF_USE_LOCAL_INGEST", "").strip() == "1": + return DEFAULT_LOCAL_INGEST_URL + return None + + +def _normalize_ingest_url(raw: str) -> str: + trimmed = raw.strip().rstrip("/") + if trimmed.endswith("/v1/events"): + return trimmed + return f"{trimmed}/v1/events" + + +class HttpExporter: + def __init__(self, ingest_url: str) -> None: + self._ingest_url = ingest_url + self._lock = threading.Lock() + self._pending: list[threading.Thread] = [] + + @property + def ingest_url(self) -> str: + return self._ingest_url + + def enqueue(self, event: Mapping[str, Any]) -> None: + thread = threading.Thread( + target=self._export_one, + args=(dict(event),), + daemon=True, + ) + with self._lock: + self._pending.append(thread) + thread.start() + + def _export_one(self, event: dict[str, Any]) -> None: + try: + post_execution_event(self._ingest_url, event) + except Exception as exc: + logger.warning("[intentproof] ingest export failed: %s", exc) + + def flush(self) -> None: + with self._lock: + threads = list(self._pending) + self._pending.clear() + for thread in threads: + thread.join() diff --git a/src/intentproof/instrumentation.py b/src/intentproof/instrumentation.py new file mode 100644 index 0000000..d55a3f1 --- /dev/null +++ b/src/intentproof/instrumentation.py @@ -0,0 +1,195 @@ +"""wrap(), correlation context, and subject mapping.""" + +from __future__ import annotations + +import inspect +import time +from contextvars import ContextVar +from datetime import datetime, timezone +from functools import wraps +from typing import Any, Callable, TypeVar + +import ulid as _ulid + +from intentproof import client +from intentproof.signing import SENTINEL_PREV_HASH, event_content_hash, sign_event + +_correlation_id: ContextVar[str | None] = ContextVar( + "intentproof_correlation_id", default=None +) + +F = TypeVar("F", bound=Callable[..., Any]) + + +def run_with_correlation_id(correlation_id: str, fn: Callable[[], Any]) -> Any: + token = _correlation_id.set(correlation_id) + try: + return fn() + finally: + _correlation_id.reset(token) + + +def push_subject_mapping( + source_id: str, subject_type: str, subject_id: str +) -> None: + """Record a subject mapping (no-op until reconciliation storage lands).""" + del source_id, subject_type, subject_id + + +def _new_correlation_id() -> str: + current = _correlation_id.get() + if current: + return current + return f"req_{_ulid.new()}" + + +def _iso_timestamp(ms: int) -> str: + return ( + datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc) + .isoformat(timespec="milliseconds") + .replace("+00:00", "Z") + ) + + +def _untrusted_payload(inputs: list[Any], output: Any, status: str) -> bool: + if inputs: + return True + return status == "ok" and output is not None + + +def _record_execution( + *, + intent: str, + action: str, + correlation_id: str, + event_id: str, + t0_ms: int, + t1_ms: int, + inputs: list[Any], + output: Any, + status: str, + error_obj: dict[str, Any] | None, +) -> None: + outbox = client.get_outbox() + chain_pos = 1 + prev_hash = SENTINEL_PREV_HASH + state = outbox.get_chain_state(correlation_id) + if state: + chain_pos = state["position"] + 1 + prev_hash = state["hash"] + + event: dict[str, Any] = { + "schema": "intentproof.event.v1", + "event_id": event_id, + "tenant_id": client.get_tenant_id(), + "instance_id": client.get_instance_id(), + "correlation_id": correlation_id, + "provenance_class": "sdk_attested_evidence", + "prev_event_hash": prev_hash, + "chain_position": chain_pos, + "intent": intent, + "action": action, + "status": status, + "started_at": _iso_timestamp(t0_ms), + "completed_at": _iso_timestamp(t1_ms), + "duration_ms": t1_ms - t0_ms, + "inputs": inputs, + "output": output if status == "ok" else None, + "error": error_obj, + "attributes": {}, + "untrusted_payload": _untrusted_payload(inputs, output, status), + "spec_version": "1.0.0", + "sdk_version": client.SDK_VERSION, + } + + signed = sign_event(event, client.get_private_key(), client.get_instance_id()) + event_hash = event_content_hash(signed) + + outbox.append(event_id, signed) + outbox.set_chain_state(correlation_id, chain_pos, event_hash) + + exporter = client.get_exporter() + if exporter is not None: + exporter.enqueue(signed) + + +def wrap( + intent: str, + action: str, + fn: F, +) -> F: + """Wrap a callable to emit signed ExecutionEvent.v1 records.""" + + if inspect.iscoroutinefunction(fn): + + @wraps(fn) + async def async_wrapped(*args: Any, **kwargs: Any) -> Any: + t0_ms = int(time.time() * 1000) + correlation_id = _new_correlation_id() + event_id = str(_ulid.new()) + status = "ok" + error_obj: dict[str, Any] | None = None + result: Any = None + reraise: BaseException | None = None + + try: + result = await fn(*args, **kwargs) + except BaseException as exc: + status = "error" + error_obj = {"message": str(exc)} + reraise = exc + + t1_ms = int(time.time() * 1000) + _record_execution( + intent=intent, + action=action, + correlation_id=correlation_id, + event_id=event_id, + t0_ms=t0_ms, + t1_ms=t1_ms, + inputs=list(args), + output=result, + status=status, + error_obj=error_obj, + ) + if reraise is not None: + raise reraise + return result + + return async_wrapped # type: ignore[return-value] + + @wraps(fn) + def sync_wrapped(*args: Any, **kwargs: Any) -> Any: + t0_ms = int(time.time() * 1000) + correlation_id = _new_correlation_id() + event_id = str(_ulid.new()) + status = "ok" + error_obj: dict[str, Any] | None = None + result: Any = None + reraise: BaseException | None = None + + try: + result = fn(*args, **kwargs) + except BaseException as exc: + status = "error" + error_obj = {"message": str(exc)} + reraise = exc + + t1_ms = int(time.time() * 1000) + _record_execution( + intent=intent, + action=action, + correlation_id=correlation_id, + event_id=event_id, + t0_ms=t0_ms, + t1_ms=t1_ms, + inputs=list(args), + output=result, + status=status, + error_obj=error_obj, + ) + if reraise is not None: + raise reraise + return result + + return sync_wrapped # type: ignore[return-value] diff --git a/src/intentproof/keys.py b/src/intentproof/keys.py new file mode 100644 index 0000000..a265c38 --- /dev/null +++ b/src/intentproof/keys.py @@ -0,0 +1,55 @@ +"""Instance keypair persistence.""" + +from __future__ import annotations + +import base64 +import json +import os +from dataclasses import dataclass +from pathlib import Path + +import ulid as _ulid + + +@dataclass(frozen=True) +class Keypair: + private_key: str + instance_id: str + + +def ensure_dir(path: Path) -> None: + path.mkdir(parents=True, exist_ok=True) + + +def load_or_create_keypair(data_dir: Path) -> Keypair: + key_path = data_dir / "keypair.json" + if key_path.exists(): + raw = key_path.read_text(encoding="utf-8") + try: + os.chmod(key_path, 0o600) + except OSError: + pass + data = json.loads(raw) + return Keypair( + private_key=data["privateKey"], + instance_id=data["instanceId"], + ) + + private_key = base64.b64encode(os.urandom(32)).decode("ascii") + kp = Keypair( + private_key=private_key, + instance_id=f"inst_{_ulid.new()}", + ) + key_path.write_text( + json.dumps( + {"privateKey": kp.private_key, "instanceId": kp.instance_id}, + indent=2, + ) + + "\n", + encoding="utf-8", + ) + try: + os.chmod(key_path, 0o600) + except OSError: + pass + return kp diff --git a/src/intentproof/outbox.py b/src/intentproof/outbox.py new file mode 100644 index 0000000..4a9493e --- /dev/null +++ b/src/intentproof/outbox.py @@ -0,0 +1,65 @@ +"""SQLite WAL outbox for signed execution events.""" + +from __future__ import annotations + +import json +import sqlite3 +from typing import Any + + +class Outbox: + def __init__(self, db_path: str) -> None: + self._db = sqlite3.connect(db_path) + self._db.execute("PRAGMA journal_mode=WAL") + self._db.executescript( + """ + CREATE TABLE IF NOT EXISTS events ( + event_id TEXT PRIMARY KEY, + body JSON NOT NULL + ); + CREATE TABLE IF NOT EXISTS chains ( + correlation_id TEXT PRIMARY KEY, + last_position INTEGER NOT NULL, + last_hash TEXT NOT NULL + ); + """ + ) + self._db.commit() + + def append(self, event_id: str, body: dict[str, Any]) -> None: + self._db.execute( + "INSERT INTO events (event_id, body) VALUES (?, ?)", + (event_id, json.dumps(body)), + ) + self._db.commit() + + def get_events(self) -> list[dict[str, Any]]: + rows = self._db.execute("SELECT body FROM events").fetchall() + return [json.loads(row[0]) for row in rows] + + def get_chain_state(self, correlation_id: str) -> dict[str, Any] | None: + row = self._db.execute( + "SELECT last_position, last_hash FROM chains WHERE correlation_id = ?", + (correlation_id,), + ).fetchone() + if row is None: + return None + return {"position": row[0], "hash": row[1]} + + def set_chain_state( + self, correlation_id: str, position: int, event_hash: str + ) -> None: + self._db.execute( + """ + INSERT INTO chains (correlation_id, last_position, last_hash) + VALUES (?, ?, ?) + ON CONFLICT(correlation_id) DO UPDATE SET + last_position = excluded.last_position, + last_hash = excluded.last_hash + """, + (correlation_id, position, event_hash), + ) + self._db.commit() + + def close(self) -> None: + self._db.close() diff --git a/src/intentproof/signing.py b/src/intentproof/signing.py new file mode 100644 index 0000000..8c59058 --- /dev/null +++ b/src/intentproof/signing.py @@ -0,0 +1,73 @@ +"""Ed25519 signing over JCS-canonical execution events.""" + +from __future__ import annotations + +import base64 +import hashlib +from typing import Any + +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + +from intentproof.canon import canonicalize + +SENTINEL_PREV_HASH = ( + "sha256:0000000000000000000000000000000000000000000000000000000000000000" +) + + +def canonicalize_event(event: dict[str, Any]) -> str: + unsigned = {k: v for k, v in event.items() if k != "signature"} + return canonicalize(unsigned) + + +def event_content_hash(event: dict[str, Any]) -> str: + canonical = canonicalize_event(event) + digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest() + return f"sha256:{digest}" + + +def sign_event( + event: dict[str, Any], + private_key: Ed25519PrivateKey, + instance_id: str, +) -> dict[str, Any]: + canonical = canonicalize_event(event) + digest = hashlib.sha256(canonical.encode("utf-8")).digest() + signature = private_key.sign(digest) + signed = dict(event) + signed["signature"] = { + "alg": "ed25519", + "key_id": f"{instance_id}:k1", + "value": base64.b64encode(signature).decode("ascii"), + } + return signed + + +def verify_event_signature( + event: dict[str, Any], public_key: Ed25519PublicKey +) -> bool: + sig_block = event.get("signature") + if not sig_block: + return False + canonical = canonicalize_event(event) + digest = hashlib.sha256(canonical.encode("utf-8")).digest() + try: + public_key.verify( + base64.b64decode(sig_block["value"]), digest + ) + except Exception: + return False + return True + + +def load_private_key(raw_b64: str) -> Ed25519PrivateKey: + return Ed25519PrivateKey.from_private_bytes( + base64.b64decode(raw_b64) + ) + + +def public_key_bytes(private_key: Ed25519PrivateKey) -> bytes: + return private_key.public_key().public_bytes_raw() diff --git a/tests/fixtures/signing_canonical_utf8.txt b/tests/fixtures/signing_canonical_utf8.txt new file mode 100644 index 0000000..9598e62 --- /dev/null +++ b/tests/fixtures/signing_canonical_utf8.txt @@ -0,0 +1 @@ +{"action":"test.golden.wrap","attributes":{},"chain_position":1,"completed_at":"2026-05-11T14:32:17.422Z","correlation_id":"corr_golden_wrap","duration_ms":1,"error":null,"event_id":"01JMR9F0X4M9V0K2YH8M0X3D9N","inputs":[5],"instance_id":"inst_golden_test","intent":"Golden wrap fixture","output":10,"prev_event_hash":"sha256:0000000000000000000000000000000000000000000000000000000000000000","provenance_class":"sdk_attested_evidence","schema":"intentproof.event.v1","sdk_version":"python@0.1.0","spec_version":"1.0.0","started_at":"2026-05-11T14:32:17.421Z","status":"ok","tenant_id":"tnt_golden","untrusted_payload":true} \ No newline at end of file diff --git a/tests/fixtures/signing_event_hash.txt b/tests/fixtures/signing_event_hash.txt new file mode 100644 index 0000000..6498bad --- /dev/null +++ b/tests/fixtures/signing_event_hash.txt @@ -0,0 +1 @@ +sha256:71aaa399f9e43d4086d69eed7e080f62849bc60969cc36b8b6018296756578fb diff --git a/tests/fixtures/signing_private_key_b64.txt b/tests/fixtures/signing_private_key_b64.txt new file mode 100644 index 0000000..e475d59 --- /dev/null +++ b/tests/fixtures/signing_private_key_b64.txt @@ -0,0 +1 @@ +AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8= diff --git a/tests/fixtures/signing_signature_b64.txt b/tests/fixtures/signing_signature_b64.txt new file mode 100644 index 0000000..8b55d74 --- /dev/null +++ b/tests/fixtures/signing_signature_b64.txt @@ -0,0 +1 @@ +eJEL6Wa2GmV8aSYXBG59jZz0QHT6Xx40p61s+Zk65PWv0OqAD6qLDdz/RZRTVz6eMxao0poCktwKWNFOJDbvAw== diff --git a/tests/fixtures/signing_unsigned_event.json b/tests/fixtures/signing_unsigned_event.json new file mode 100644 index 0000000..eead9be --- /dev/null +++ b/tests/fixtures/signing_unsigned_event.json @@ -0,0 +1,25 @@ +{ + "schema": "intentproof.event.v1", + "event_id": "01JMR9F0X4M9V0K2YH8M0X3D9N", + "tenant_id": "tnt_golden", + "instance_id": "inst_golden_test", + "correlation_id": "corr_golden_wrap", + "provenance_class": "sdk_attested_evidence", + "prev_event_hash": "sha256:0000000000000000000000000000000000000000000000000000000000000000", + "chain_position": 1, + "intent": "Golden wrap fixture", + "action": "test.golden.wrap", + "status": "ok", + "started_at": "2026-05-11T14:32:17.421Z", + "completed_at": "2026-05-11T14:32:17.422Z", + "duration_ms": 1, + "inputs": [ + 5 + ], + "output": 10, + "error": null, + "attributes": {}, + "untrusted_payload": true, + "spec_version": "1.0.0", + "sdk_version": "python@0.1.0" +} diff --git a/tests/test_sdk.py b/tests/test_sdk.py new file mode 100644 index 0000000..747fb8e --- /dev/null +++ b/tests/test_sdk.py @@ -0,0 +1,191 @@ +"""SDK wrap, outbox, and signing tests.""" + +from __future__ import annotations + +import base64 +import json +import tempfile +from pathlib import Path + +import pytest + +from intentproof import client, configure, flush, run_with_correlation_id, wrap +from intentproof.signing import ( + canonicalize_event, + event_content_hash, + load_private_key, + sign_event, + verify_event_signature, +) + + +@pytest.fixture +def sdk_dirs(tmp_path: Path) -> tuple[str, str]: + data_dir = tmp_path / "data" + db_path = str(tmp_path / "outbox.db") + return str(db_path), str(data_dir) + + +def test_persists_keypair_across_configure(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + id1 = client.get_instance_id() + pub1 = client.get_public_key().public_bytes_raw() + + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + id2 = client.get_instance_id() + pub2 = client.get_public_key().public_bytes_raw() + + assert id1 == id2 + assert pub1 == pub2 + + +def test_generates_new_keypair_for_fresh_data_dir( + sdk_dirs: tuple[str, str], +) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + id1 = client.get_instance_id() + + fresh_dir = Path(data_dir).parent / "fresh" + configure(db_path=db_path, data_dir=str(fresh_dir), tenant_id="tnt_a") + id2 = client.get_instance_id() + + assert id1 != id2 + + +def test_produces_signed_event_with_sentinel_prev_hash( + sdk_dirs: tuple[str, str], +) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + + fn = wrap(intent="Test", action="test.action", fn=lambda x: x * 2) + + run_with_correlation_id("corr-1", lambda: fn(5)) + + events = client.get_outbox().get_events() + assert len(events) == 1 + ev = events[0] + assert ev["chain_position"] == 1 + assert ( + ev["prev_event_hash"] + == "sha256:0000000000000000000000000000000000000000000000000000000000000000" + ) + assert ev["signature"]["alg"] == "ed25519" + assert ev["signature"]["value"] + assert ev["provenance_class"] == "sdk_attested_evidence" + assert ev["untrusted_payload"] is True + + +def test_chain_continuity_across_reconfigure(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + fn = wrap(intent="Test", action="test.action", fn=lambda x: x * 2) + run_with_correlation_id("corr-2", lambda: fn(1)) + + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + fn2 = wrap(intent="Test", action="test.action", fn=lambda x: x * 2) + run_with_correlation_id("corr-2", lambda: fn2(2)) + + events = client.get_outbox().get_events() + assert len(events) == 2 + ev2 = next(e for e in events if e["chain_position"] == 2) + assert ev2["prev_event_hash"].startswith("sha256:") + assert ( + ev2["prev_event_hash"] + != "sha256:0000000000000000000000000000000000000000000000000000000000000000" + ) + + +def test_correlation_isolation(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + fn = wrap(intent="Test", action="test.action", fn=lambda x: x * 2) + + run_with_correlation_id("corr-a", lambda: fn(1)) + run_with_correlation_id("corr-b", lambda: fn(2)) + + events = client.get_outbox().get_events() + ev_a = next(e for e in events if e["correlation_id"] == "corr-a") + ev_b = next(e for e in events if e["correlation_id"] == "corr-b") + assert ev_a["chain_position"] == 1 + assert ev_b["chain_position"] == 1 + + +def test_verifiable_ed25519_signature(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + fn = wrap(intent="Test", action="test.action", fn=lambda x: x * 2) + run_with_correlation_id("corr-verify", lambda: fn(7)) + + ev = next( + e + for e in client.get_outbox().get_events() + if e["correlation_id"] == "corr-verify" + ) + assert verify_event_signature(ev, client.get_public_key()) + + +def test_wrap_reraises_exception(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + + def boom() -> None: + raise ValueError("boom") + + fn = wrap(intent="Test", action="test.action", fn=boom) + with pytest.raises(ValueError, match="boom"): + fn() + + events = client.get_outbox().get_events() + assert events[-1]["status"] == "error" + assert events[-1]["error"] == {"message": "boom"} + + +def test_flush_waits_for_exporter(monkeypatch: pytest.MonkeyPatch) -> None: + with tempfile.TemporaryDirectory() as tmp: + configure( + db_path=f"{tmp}/outbox.db", + data_dir=f"{tmp}/data", + tenant_id="tnt_a", + ingest_url="http://127.0.0.1:9/v1/events", + ) + posted: list[dict] = [] + + def fake_post(url: str, event: dict) -> None: + posted.append(event) + + monkeypatch.setattr( + "intentproof.http_exporter.post_execution_event", fake_post + ) + fn = wrap(intent="Export", action="export.test", fn=lambda n: n + 1) + run_with_correlation_id("corr-export", lambda: fn(1)) + flush() + assert len(posted) == 1 + + +def test_signing_golden_bytes() -> None: + fixture_dir = Path(__file__).parent / "fixtures" + unsigned = json.loads( + (fixture_dir / "signing_unsigned_event.json").read_text(encoding="utf-8") + ) + expected_canonical = ( + fixture_dir / "signing_canonical_utf8.txt" + ).read_text(encoding="utf-8") + expected_hash = ( + fixture_dir / "signing_event_hash.txt" + ).read_text(encoding="utf-8").strip() + expected_sig = ( + fixture_dir / "signing_signature_b64.txt" + ).read_text(encoding="utf-8").strip() + private_key_b64 = ( + fixture_dir / "signing_private_key_b64.txt" + ).read_text(encoding="utf-8").strip() + + assert canonicalize_event(unsigned) == expected_canonical + private_key = load_private_key(private_key_b64) + signed = sign_event(unsigned, private_key, "inst_golden_test") + assert event_content_hash(signed) == expected_hash + assert signed["signature"]["value"] == expected_sig + assert verify_event_signature(signed, private_key.public_key()) From 23eefa890bb476f95f229d7976d1991219467b9e Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 21:15:40 -0500 Subject: [PATCH 02/10] fix(sdk): atomic outbox write and preserve wrapped errors Record event and chain state in one transaction; chain app exceptions when outbox recording fails after a wrapped failure. Signed-off-by: Nathan Gillett --- src/intentproof/instrumentation.py | 63 ++++++++++++++++++------------ src/intentproof/outbox.py | 29 ++++++++++++++ tests/test_sdk.py | 25 ++++++++++++ 3 files changed, 91 insertions(+), 26 deletions(-) diff --git a/src/intentproof/instrumentation.py b/src/intentproof/instrumentation.py index d55a3f1..77170e2 100644 --- a/src/intentproof/instrumentation.py +++ b/src/intentproof/instrumentation.py @@ -105,8 +105,9 @@ def _record_execution( signed = sign_event(event, client.get_private_key(), client.get_instance_id()) event_hash = event_content_hash(signed) - outbox.append(event_id, signed) - outbox.set_chain_state(correlation_id, chain_pos, event_hash) + outbox.append_with_chain_state( + event_id, signed, correlation_id, chain_pos, event_hash + ) exporter = client.get_exporter() if exporter is not None: @@ -140,18 +141,23 @@ async def async_wrapped(*args: Any, **kwargs: Any) -> Any: reraise = exc t1_ms = int(time.time() * 1000) - _record_execution( - intent=intent, - action=action, - correlation_id=correlation_id, - event_id=event_id, - t0_ms=t0_ms, - t1_ms=t1_ms, - inputs=list(args), - output=result, - status=status, - error_obj=error_obj, - ) + try: + _record_execution( + intent=intent, + action=action, + correlation_id=correlation_id, + event_id=event_id, + t0_ms=t0_ms, + t1_ms=t1_ms, + inputs=list(args), + output=result, + status=status, + error_obj=error_obj, + ) + except BaseException as record_exc: + if reraise is not None: + raise reraise from record_exc + raise if reraise is not None: raise reraise return result @@ -176,18 +182,23 @@ def sync_wrapped(*args: Any, **kwargs: Any) -> Any: reraise = exc t1_ms = int(time.time() * 1000) - _record_execution( - intent=intent, - action=action, - correlation_id=correlation_id, - event_id=event_id, - t0_ms=t0_ms, - t1_ms=t1_ms, - inputs=list(args), - output=result, - status=status, - error_obj=error_obj, - ) + try: + _record_execution( + intent=intent, + action=action, + correlation_id=correlation_id, + event_id=event_id, + t0_ms=t0_ms, + t1_ms=t1_ms, + inputs=list(args), + output=result, + status=status, + error_obj=error_obj, + ) + except BaseException as record_exc: + if reraise is not None: + raise reraise from record_exc + raise if reraise is not None: raise reraise return result diff --git a/src/intentproof/outbox.py b/src/intentproof/outbox.py index 4a9493e..610c3cc 100644 --- a/src/intentproof/outbox.py +++ b/src/intentproof/outbox.py @@ -33,6 +33,35 @@ def append(self, event_id: str, body: dict[str, Any]) -> None: ) self._db.commit() + def append_with_chain_state( + self, + event_id: str, + body: dict[str, Any], + correlation_id: str, + position: int, + event_hash: str, + ) -> None: + """Persist event and chain head in one transaction.""" + try: + self._db.execute( + "INSERT INTO events (event_id, body) VALUES (?, ?)", + (event_id, json.dumps(body)), + ) + self._db.execute( + """ + INSERT INTO chains (correlation_id, last_position, last_hash) + VALUES (?, ?, ?) + ON CONFLICT(correlation_id) DO UPDATE SET + last_position = excluded.last_position, + last_hash = excluded.last_hash + """, + (correlation_id, position, event_hash), + ) + self._db.commit() + except Exception: + self._db.rollback() + raise + def get_events(self) -> list[dict[str, Any]]: rows = self._db.execute("SELECT body FROM events").fetchall() return [json.loads(row[0]) for row in rows] diff --git a/tests/test_sdk.py b/tests/test_sdk.py index 747fb8e..7590baa 100644 --- a/tests/test_sdk.py +++ b/tests/test_sdk.py @@ -127,6 +127,31 @@ def test_verifiable_ed25519_signature(sdk_dirs: tuple[str, str]) -> None: assert verify_event_signature(ev, client.get_public_key()) +def test_wrap_preserves_app_exception_when_record_fails( + sdk_dirs: tuple[str, str], monkeypatch: pytest.MonkeyPatch +) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + + def boom() -> None: + raise ValueError("boom") + + fn = wrap(intent="Test", action="test.action", fn=boom) + + def fail_record(**_kwargs: object) -> None: + raise RuntimeError("outbox unavailable") + + monkeypatch.setattr( + "intentproof.instrumentation._record_execution", fail_record + ) + + with pytest.raises(ValueError, match="boom") as exc_info: + fn() + + assert exc_info.value.__cause__ is not None + assert isinstance(exc_info.value.__cause__, RuntimeError) + + def test_wrap_reraises_exception(sdk_dirs: tuple[str, str]) -> None: db_path, data_dir = sdk_dirs configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") From bf5724a34455a6ccb02ff7fee7c55f57d53a9dc0 Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 21:27:52 -0500 Subject: [PATCH 03/10] fix(sdk): start export thread before releasing enqueue lock Prevents flush() from joining a thread that is still in _pending but not yet started. Signed-off-by: Nathan Gillett --- src/intentproof/http_exporter.py | 2 +- tests/test_exporter.py | 33 ++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/intentproof/http_exporter.py b/src/intentproof/http_exporter.py index 7652fdc..5e6e923 100644 --- a/src/intentproof/http_exporter.py +++ b/src/intentproof/http_exporter.py @@ -48,7 +48,7 @@ def enqueue(self, event: Mapping[str, Any]) -> None: ) with self._lock: self._pending.append(thread) - thread.start() + thread.start() def _export_one(self, event: dict[str, Any]) -> None: try: diff --git a/tests/test_exporter.py b/tests/test_exporter.py index 918f47f..fead89d 100644 --- a/tests/test_exporter.py +++ b/tests/test_exporter.py @@ -1,6 +1,8 @@ import os +import threading from intentproof.exporter import ingest_request_headers +from intentproof.http_exporter import HttpExporter def test_ingest_request_headers_includes_bearer_token() -> None: @@ -16,6 +18,37 @@ def test_ingest_request_headers_includes_bearer_token() -> None: os.environ["INTENTPROOF_INGEST_TOKEN"] = previous +def test_enqueue_starts_thread_before_releasing_lock() -> None: + order: list[str] = [] + inner = threading.Lock() + + class TrackingLock: + def __enter__(self) -> "TrackingLock": + inner.acquire() + return self + + def __exit__(self, *args: object) -> None: + order.append("lock_release") + inner.release() + + exporter = HttpExporter("http://127.0.0.1:9787/v1/events") + exporter._lock = TrackingLock() # type: ignore[assignment] + + original_start = threading.Thread.start + + def tracked_start(self: threading.Thread) -> None: + order.append("start") + original_start(self) + + threading.Thread.start = tracked_start # type: ignore[method-assign] + try: + exporter.enqueue({"schema": "intentproof.event.v1"}) + finally: + threading.Thread.start = original_start # type: ignore[method-assign] + + assert order.index("start") < order.index("lock_release") + + def test_ingest_request_headers_omits_authorization_without_token() -> None: previous = os.environ.get("INTENTPROOF_INGEST_TOKEN") os.environ.pop("INTENTPROOF_INGEST_TOKEN", None) From 7c5187b8040f11b6f10ed7cadba77e542d0a182c Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 21:38:25 -0500 Subject: [PATCH 04/10] fix(sdk): prune export threads and close outbox on reconfigure Drop finished threads from the exporter pending list on enqueue; flush and close prior outbox and exporter when configure() runs. Signed-off-by: Nathan Gillett --- src/intentproof/client.py | 5 +++++ src/intentproof/http_exporter.py | 7 ++++++- tests/test_exporter.py | 23 +++++++++++++++++++++++ tests/test_sdk.py | 14 ++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/intentproof/client.py b/src/intentproof/client.py index 2df46f2..0b72ffb 100644 --- a/src/intentproof/client.py +++ b/src/intentproof/client.py @@ -36,6 +36,11 @@ def configure( ) -> None: global _instance_private_key, _instance_id, _tenant_id, _outbox, _exporter, _data_dir + if _exporter is not None: + _exporter.flush() + if _outbox is not None: + _outbox.close() + _data_dir = Path(data_dir) if data_dir else DEFAULT_DATA_DIR ensure_dir(_data_dir) diff --git a/src/intentproof/http_exporter.py b/src/intentproof/http_exporter.py index 5e6e923..bd007a4 100644 --- a/src/intentproof/http_exporter.py +++ b/src/intentproof/http_exporter.py @@ -40,6 +40,9 @@ def __init__(self, ingest_url: str) -> None: def ingest_url(self) -> str: return self._ingest_url + def _prune_finished_threads(self) -> None: + self._pending = [t for t in self._pending if t.is_alive()] + def enqueue(self, event: Mapping[str, Any]) -> None: thread = threading.Thread( target=self._export_one, @@ -47,6 +50,7 @@ def enqueue(self, event: Mapping[str, Any]) -> None: daemon=True, ) with self._lock: + self._prune_finished_threads() self._pending.append(thread) thread.start() @@ -61,4 +65,5 @@ def flush(self) -> None: threads = list(self._pending) self._pending.clear() for thread in threads: - thread.join() + if thread.is_alive(): + thread.join() diff --git a/tests/test_exporter.py b/tests/test_exporter.py index fead89d..2fa5266 100644 --- a/tests/test_exporter.py +++ b/tests/test_exporter.py @@ -1,6 +1,8 @@ import os import threading +import pytest + from intentproof.exporter import ingest_request_headers from intentproof.http_exporter import HttpExporter @@ -18,6 +20,27 @@ def test_ingest_request_headers_includes_bearer_token() -> None: os.environ["INTENTPROOF_INGEST_TOKEN"] = previous +def test_enqueue_prunes_finished_threads(monkeypatch: pytest.MonkeyPatch) -> None: + exporter = HttpExporter("http://127.0.0.1:9787/v1/events") + monkeypatch.setattr( + "intentproof.http_exporter.post_execution_event", + lambda _url, _event: None, + ) + + for _ in range(3): + exporter.enqueue({"schema": "intentproof.event.v1"}) + + with exporter._lock: + threads = list(exporter._pending) + for thread in threads: + thread.join(timeout=2.0) + + exporter.enqueue({"schema": "intentproof.event.v1"}) + + with exporter._lock: + assert len(exporter._pending) == 1 + + def test_enqueue_starts_thread_before_releasing_lock() -> None: order: list[str] = [] inner = threading.Lock() diff --git a/tests/test_sdk.py b/tests/test_sdk.py index 7590baa..651e067 100644 --- a/tests/test_sdk.py +++ b/tests/test_sdk.py @@ -78,6 +78,20 @@ def test_produces_signed_event_with_sentinel_prev_hash( assert ev["untrusted_payload"] is True +def test_configure_closes_previous_outbox(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + first = client.get_outbox() + + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + second = client.get_outbox() + + assert first is not second + fn = wrap(intent="Test", action="test.action", fn=lambda x: x + 1) + run_with_correlation_id("corr-reconfig", lambda: fn(1)) + assert len(client.get_outbox().get_events()) == 1 + + def test_chain_continuity_across_reconfigure(sdk_dirs: tuple[str, str]) -> None: db_path, data_dir = sdk_dirs configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") From ed8d91b4cb2ae09178198e9d6de1daabaa5b5b2c Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 21:53:26 -0500 Subject: [PATCH 05/10] fix(sdk): lazy default data dir and secure keypair create Create keypair.json with mode 0600 via O_CREAT|O_EXCL; resolve Path.home() only in default_data_dir() so import works in containers. Signed-off-by: Nathan Gillett --- src/intentproof/client.py | 8 +++++-- src/intentproof/keys.py | 45 ++++++++++++++++++++++++++------------- tests/test_keys.py | 43 +++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 17 deletions(-) create mode 100644 tests/test_keys.py diff --git a/src/intentproof/client.py b/src/intentproof/client.py index 0b72ffb..a5ef22d 100644 --- a/src/intentproof/client.py +++ b/src/intentproof/client.py @@ -17,7 +17,11 @@ from intentproof.signing import Ed25519PublicKey SDK_VERSION = "python@0.1.0" -DEFAULT_DATA_DIR = Path.home() / ".intentproof" / "sdk-python" + + +def default_data_dir() -> Path: + """Default SDK data directory (resolved lazily for container imports).""" + return Path.home() / ".intentproof" / "sdk-python" _instance_private_key: Ed25519PrivateKey | None = None _instance_id: str | None = None @@ -41,7 +45,7 @@ def configure( if _outbox is not None: _outbox.close() - _data_dir = Path(data_dir) if data_dir else DEFAULT_DATA_DIR + _data_dir = Path(data_dir) if data_dir else default_data_dir() ensure_dir(_data_dir) kp = load_or_create_keypair(_data_dir) diff --git a/src/intentproof/keys.py b/src/intentproof/keys.py index a265c38..8e8ab93 100644 --- a/src/intentproof/keys.py +++ b/src/intentproof/keys.py @@ -4,12 +4,15 @@ import base64 import json +import logging import os from dataclasses import dataclass from pathlib import Path import ulid as _ulid +logger = logging.getLogger(__name__) + @dataclass(frozen=True) class Keypair: @@ -21,14 +24,34 @@ def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) +def _ensure_key_permissions(key_path: Path) -> None: + try: + os.chmod(key_path, 0o600) + except OSError as exc: + logger.warning( + "[intentproof] could not set %s to mode 0600: %s", + key_path, + exc, + ) + + +def _write_keypair_file(key_path: Path, payload: dict[str, str]) -> None: + content = json.dumps(payload, indent=2) + "\n" + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + fd = os.open(key_path, flags, 0o600) + try: + with os.fdopen(fd, "w", encoding="utf-8") as handle: + handle.write(content) + except Exception: + key_path.unlink(missing_ok=True) + raise + + def load_or_create_keypair(data_dir: Path) -> Keypair: key_path = data_dir / "keypair.json" if key_path.exists(): + _ensure_key_permissions(key_path) raw = key_path.read_text(encoding="utf-8") - try: - os.chmod(key_path, 0o600) - except OSError: - pass data = json.loads(raw) return Keypair( private_key=data["privateKey"], @@ -40,16 +63,8 @@ def load_or_create_keypair(data_dir: Path) -> Keypair: private_key=private_key, instance_id=f"inst_{_ulid.new()}", ) - key_path.write_text( - json.dumps( - {"privateKey": kp.private_key, "instanceId": kp.instance_id}, - indent=2, - ) - + "\n", - encoding="utf-8", + _write_keypair_file( + key_path, + {"privateKey": kp.private_key, "instanceId": kp.instance_id}, ) - try: - os.chmod(key_path, 0o600) - except OSError: - pass return kp diff --git a/tests/test_keys.py b/tests/test_keys.py new file mode 100644 index 0000000..3bbffb1 --- /dev/null +++ b/tests/test_keys.py @@ -0,0 +1,43 @@ +"""Keypair file permission and import safety tests.""" + +from __future__ import annotations + +import stat +import subprocess +import sys +from pathlib import Path + +from intentproof.keys import load_or_create_keypair + + +def test_new_keypair_created_with_mode_0600(tmp_path: Path) -> None: + load_or_create_keypair(tmp_path) + key_path = tmp_path / "keypair.json" + mode = stat.S_IMODE(key_path.stat().st_mode) + assert mode == 0o600 + + +def test_client_module_import_without_home(tmp_path: Path) -> None: + """import intentproof.client must not resolve Path.home() at import time.""" + script = f""" +from pathlib import Path +from unittest.mock import patch + +with patch.object(Path, "home", side_effect=RuntimeError("no home directory")): + import intentproof.client as client + +client.configure( + data_dir={str(tmp_path)!r}, + db_path={str(tmp_path / "outbox.db")!r}, + tenant_id="tnt_container", +) +print(client.get_instance_id()) +""" + result = subprocess.run( + [sys.executable, "-c", script], + capture_output=True, + text=True, + check=True, + cwd=Path(__file__).resolve().parents[1], + ) + assert result.stdout.strip().startswith("inst_") From 36b96be6fa6083acc3701994f7f04d17d1f7cba5 Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 22:03:43 -0500 Subject: [PATCH 06/10] fix(sdk): allow outbox use from worker threads Open SQLite with check_same_thread=False and serialize access with a lock so wrap() works under Flask/gunicorn workers. Signed-off-by: Nathan Gillett --- src/intentproof/outbox.py | 122 +++++++++++++++++++++----------------- tests/test_sdk.py | 22 +++++++ 2 files changed, 88 insertions(+), 56 deletions(-) diff --git a/src/intentproof/outbox.py b/src/intentproof/outbox.py index 610c3cc..abff2d8 100644 --- a/src/intentproof/outbox.py +++ b/src/intentproof/outbox.py @@ -4,34 +4,39 @@ import json import sqlite3 +import threading from typing import Any class Outbox: def __init__(self, db_path: str) -> None: - self._db = sqlite3.connect(db_path) + # Allow use from worker threads (e.g. Flask/gunicorn) after configure(). + self._db = sqlite3.connect(db_path, check_same_thread=False) + self._lock = threading.Lock() self._db.execute("PRAGMA journal_mode=WAL") - self._db.executescript( - """ - CREATE TABLE IF NOT EXISTS events ( - event_id TEXT PRIMARY KEY, - body JSON NOT NULL - ); - CREATE TABLE IF NOT EXISTS chains ( - correlation_id TEXT PRIMARY KEY, - last_position INTEGER NOT NULL, - last_hash TEXT NOT NULL - ); - """ - ) - self._db.commit() + with self._lock: + self._db.executescript( + """ + CREATE TABLE IF NOT EXISTS events ( + event_id TEXT PRIMARY KEY, + body JSON NOT NULL + ); + CREATE TABLE IF NOT EXISTS chains ( + correlation_id TEXT PRIMARY KEY, + last_position INTEGER NOT NULL, + last_hash TEXT NOT NULL + ); + """ + ) + self._db.commit() def append(self, event_id: str, body: dict[str, Any]) -> None: - self._db.execute( - "INSERT INTO events (event_id, body) VALUES (?, ?)", - (event_id, json.dumps(body)), - ) - self._db.commit() + with self._lock: + self._db.execute( + "INSERT INTO events (event_id, body) VALUES (?, ?)", + (event_id, json.dumps(body)), + ) + self._db.commit() def append_with_chain_state( self, @@ -42,35 +47,38 @@ def append_with_chain_state( event_hash: str, ) -> None: """Persist event and chain head in one transaction.""" - try: - self._db.execute( - "INSERT INTO events (event_id, body) VALUES (?, ?)", - (event_id, json.dumps(body)), - ) - self._db.execute( - """ - INSERT INTO chains (correlation_id, last_position, last_hash) - VALUES (?, ?, ?) - ON CONFLICT(correlation_id) DO UPDATE SET - last_position = excluded.last_position, - last_hash = excluded.last_hash - """, - (correlation_id, position, event_hash), - ) - self._db.commit() - except Exception: - self._db.rollback() - raise + with self._lock: + try: + self._db.execute( + "INSERT INTO events (event_id, body) VALUES (?, ?)", + (event_id, json.dumps(body)), + ) + self._db.execute( + """ + INSERT INTO chains (correlation_id, last_position, last_hash) + VALUES (?, ?, ?) + ON CONFLICT(correlation_id) DO UPDATE SET + last_position = excluded.last_position, + last_hash = excluded.last_hash + """, + (correlation_id, position, event_hash), + ) + self._db.commit() + except Exception: + self._db.rollback() + raise def get_events(self) -> list[dict[str, Any]]: - rows = self._db.execute("SELECT body FROM events").fetchall() + with self._lock: + rows = self._db.execute("SELECT body FROM events").fetchall() return [json.loads(row[0]) for row in rows] def get_chain_state(self, correlation_id: str) -> dict[str, Any] | None: - row = self._db.execute( - "SELECT last_position, last_hash FROM chains WHERE correlation_id = ?", - (correlation_id,), - ).fetchone() + with self._lock: + row = self._db.execute( + "SELECT last_position, last_hash FROM chains WHERE correlation_id = ?", + (correlation_id,), + ).fetchone() if row is None: return None return {"position": row[0], "hash": row[1]} @@ -78,17 +86,19 @@ def get_chain_state(self, correlation_id: str) -> dict[str, Any] | None: def set_chain_state( self, correlation_id: str, position: int, event_hash: str ) -> None: - self._db.execute( - """ - INSERT INTO chains (correlation_id, last_position, last_hash) - VALUES (?, ?, ?) - ON CONFLICT(correlation_id) DO UPDATE SET - last_position = excluded.last_position, - last_hash = excluded.last_hash - """, - (correlation_id, position, event_hash), - ) - self._db.commit() + with self._lock: + self._db.execute( + """ + INSERT INTO chains (correlation_id, last_position, last_hash) + VALUES (?, ?, ?) + ON CONFLICT(correlation_id) DO UPDATE SET + last_position = excluded.last_position, + last_hash = excluded.last_hash + """, + (correlation_id, position, event_hash), + ) + self._db.commit() def close(self) -> None: - self._db.close() + with self._lock: + self._db.close() diff --git a/tests/test_sdk.py b/tests/test_sdk.py index 651e067..81ce788 100644 --- a/tests/test_sdk.py +++ b/tests/test_sdk.py @@ -5,6 +5,7 @@ import base64 import json import tempfile +import threading from pathlib import Path import pytest @@ -78,6 +79,27 @@ def test_produces_signed_event_with_sentinel_prev_hash( assert ev["untrusted_payload"] is True +def test_wrap_records_from_worker_thread(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + fn = wrap(intent="Test", action="test.action", fn=lambda x: x + 1) + errors: list[BaseException] = [] + + def worker() -> None: + try: + run_with_correlation_id("corr-worker", lambda: fn(2)) + except BaseException as exc: + errors.append(exc) + + thread = threading.Thread(target=worker) + thread.start() + thread.join(timeout=2.0) + + assert not errors + events = client.get_outbox().get_events() + assert any(e["correlation_id"] == "corr-worker" for e in events) + + def test_configure_closes_previous_outbox(sdk_dirs: tuple[str, str]) -> None: db_path, data_dir = sdk_dirs configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") From 2daeabcbe2b01b22ed4156583e89309bbbca97e9 Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 22:14:15 -0500 Subject: [PATCH 07/10] fix(sdk): serialize chain reserve, sign, and persist record_chained_event holds the outbox lock from chain read through signing to commit so concurrent workers cannot reuse the same chain position. Signed-off-by: Nathan Gillett --- src/intentproof/instrumentation.py | 68 +++++++++++++---------------- src/intentproof/outbox.py | 70 ++++++++++++++++++++++++------ tests/test_sdk.py | 42 ++++++++++++++++++ 3 files changed, 129 insertions(+), 51 deletions(-) diff --git a/src/intentproof/instrumentation.py b/src/intentproof/instrumentation.py index 77170e2..2da9258 100644 --- a/src/intentproof/instrumentation.py +++ b/src/intentproof/instrumentation.py @@ -12,7 +12,7 @@ import ulid as _ulid from intentproof import client -from intentproof.signing import SENTINEL_PREV_HASH, event_content_hash, sign_event +from intentproof.signing import event_content_hash, sign_event _correlation_id: ContextVar[str | None] = ContextVar( "intentproof_correlation_id", default=None @@ -71,43 +71,35 @@ def _record_execution( error_obj: dict[str, Any] | None, ) -> None: outbox = client.get_outbox() - chain_pos = 1 - prev_hash = SENTINEL_PREV_HASH - state = outbox.get_chain_state(correlation_id) - if state: - chain_pos = state["position"] + 1 - prev_hash = state["hash"] - - event: dict[str, Any] = { - "schema": "intentproof.event.v1", - "event_id": event_id, - "tenant_id": client.get_tenant_id(), - "instance_id": client.get_instance_id(), - "correlation_id": correlation_id, - "provenance_class": "sdk_attested_evidence", - "prev_event_hash": prev_hash, - "chain_position": chain_pos, - "intent": intent, - "action": action, - "status": status, - "started_at": _iso_timestamp(t0_ms), - "completed_at": _iso_timestamp(t1_ms), - "duration_ms": t1_ms - t0_ms, - "inputs": inputs, - "output": output if status == "ok" else None, - "error": error_obj, - "attributes": {}, - "untrusted_payload": _untrusted_payload(inputs, output, status), - "spec_version": "1.0.0", - "sdk_version": client.SDK_VERSION, - } - - signed = sign_event(event, client.get_private_key(), client.get_instance_id()) - event_hash = event_content_hash(signed) - - outbox.append_with_chain_state( - event_id, signed, correlation_id, chain_pos, event_hash - ) + + def build_signed(chain_pos: int, prev_hash: str) -> tuple[dict[str, Any], str]: + event: dict[str, Any] = { + "schema": "intentproof.event.v1", + "event_id": event_id, + "tenant_id": client.get_tenant_id(), + "instance_id": client.get_instance_id(), + "correlation_id": correlation_id, + "provenance_class": "sdk_attested_evidence", + "prev_event_hash": prev_hash, + "chain_position": chain_pos, + "intent": intent, + "action": action, + "status": status, + "started_at": _iso_timestamp(t0_ms), + "completed_at": _iso_timestamp(t1_ms), + "duration_ms": t1_ms - t0_ms, + "inputs": inputs, + "output": output if status == "ok" else None, + "error": error_obj, + "attributes": {}, + "untrusted_payload": _untrusted_payload(inputs, output, status), + "spec_version": "1.0.0", + "sdk_version": client.SDK_VERSION, + } + signed = sign_event(event, client.get_private_key(), client.get_instance_id()) + return signed, event_content_hash(signed) + + signed = outbox.record_chained_event(correlation_id, event_id, build_signed) exporter = client.get_exporter() if exporter is not None: diff --git a/src/intentproof/outbox.py b/src/intentproof/outbox.py index abff2d8..95607cf 100644 --- a/src/intentproof/outbox.py +++ b/src/intentproof/outbox.py @@ -5,8 +5,11 @@ import json import sqlite3 import threading +from collections.abc import Callable from typing import Any +from intentproof.signing import SENTINEL_PREV_HASH + class Outbox: def __init__(self, db_path: str) -> None: @@ -38,6 +41,58 @@ def append(self, event_id: str, body: dict[str, Any]) -> None: ) self._db.commit() + def _next_chain_link(self, correlation_id: str) -> tuple[int, str]: + row = self._db.execute( + "SELECT last_position, last_hash FROM chains WHERE correlation_id = ?", + (correlation_id,), + ).fetchone() + if row is None: + return 1, SENTINEL_PREV_HASH + return row[0] + 1, row[1] + + def _persist_chain_event( + self, + event_id: str, + body: dict[str, Any], + correlation_id: str, + position: int, + event_hash: str, + ) -> None: + self._db.execute( + "INSERT INTO events (event_id, body) VALUES (?, ?)", + (event_id, json.dumps(body)), + ) + self._db.execute( + """ + INSERT INTO chains (correlation_id, last_position, last_hash) + VALUES (?, ?, ?) + ON CONFLICT(correlation_id) DO UPDATE SET + last_position = excluded.last_position, + last_hash = excluded.last_hash + """, + (correlation_id, position, event_hash), + ) + + def record_chained_event( + self, + correlation_id: str, + event_id: str, + build_signed: Callable[[int, str], tuple[dict[str, Any], str]], + ) -> dict[str, Any]: + """Reserve chain slot, sign, and persist under one lock.""" + with self._lock: + chain_pos, prev_hash = self._next_chain_link(correlation_id) + signed, event_hash = build_signed(chain_pos, prev_hash) + try: + self._persist_chain_event( + event_id, signed, correlation_id, chain_pos, event_hash + ) + self._db.commit() + except Exception: + self._db.rollback() + raise + return signed + def append_with_chain_state( self, event_id: str, @@ -49,19 +104,8 @@ def append_with_chain_state( """Persist event and chain head in one transaction.""" with self._lock: try: - self._db.execute( - "INSERT INTO events (event_id, body) VALUES (?, ?)", - (event_id, json.dumps(body)), - ) - self._db.execute( - """ - INSERT INTO chains (correlation_id, last_position, last_hash) - VALUES (?, ?, ?) - ON CONFLICT(correlation_id) DO UPDATE SET - last_position = excluded.last_position, - last_hash = excluded.last_hash - """, - (correlation_id, position, event_hash), + self._persist_chain_event( + event_id, body, correlation_id, position, event_hash ) self._db.commit() except Exception: diff --git a/tests/test_sdk.py b/tests/test_sdk.py index 81ce788..8715252 100644 --- a/tests/test_sdk.py +++ b/tests/test_sdk.py @@ -79,6 +79,48 @@ def test_produces_signed_event_with_sentinel_prev_hash( assert ev["untrusted_payload"] is True +def test_concurrent_wrap_preserves_chain_positions( + sdk_dirs: tuple[str, str], +) -> None: + from intentproof.signing import event_content_hash + + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + fn = wrap(intent="Test", action="test.action", fn=lambda x: x) + errors: list[BaseException] = [] + barrier = threading.Barrier(2) + + def worker() -> None: + try: + barrier.wait(timeout=2.0) + for _ in range(25): + run_with_correlation_id("corr-race", lambda: fn(1)) + except BaseException as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(2)] + for thread in threads: + thread.start() + for thread in threads: + thread.join(timeout=5.0) + + assert not errors + events = [ + e + for e in client.get_outbox().get_events() + if e["correlation_id"] == "corr-race" + ] + assert len(events) == 50 + positions = sorted(e["chain_position"] for e in events) + assert positions == list(range(1, 51)) + + by_pos = {e["chain_position"]: e for e in events} + assert by_pos[1]["prev_event_hash"].startswith("sha256:") + for pos in range(2, 51): + prev_hash = event_content_hash(by_pos[pos - 1]) + assert by_pos[pos]["prev_event_hash"] == prev_hash + + def test_wrap_records_from_worker_thread(sdk_dirs: tuple[str, str]) -> None: db_path, data_dir = sdk_dirs configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") From 932b68a2ece6c3052eade6b9d458d71cd4269484 Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 22:25:26 -0500 Subject: [PATCH 08/10] fix(sdk): safe reconfigure, async cancel, drop dead code Swap outbox only after new config succeeds; use Exception in wrap so CancelledError propagates; remove unused public_key_bytes. Signed-off-by: Nathan Gillett --- src/intentproof/client.py | 36 +++++++++++++++++++----------- src/intentproof/instrumentation.py | 8 +++---- src/intentproof/signing.py | 4 ---- tests/test_async_wrap.py | 27 ++++++++++++++++++++++ tests/test_sdk.py | 19 ++++++++++++++++ 5 files changed, 73 insertions(+), 21 deletions(-) create mode 100644 tests/test_async_wrap.py diff --git a/src/intentproof/client.py b/src/intentproof/client.py index a5ef22d..b36e50e 100644 --- a/src/intentproof/client.py +++ b/src/intentproof/client.py @@ -40,18 +40,16 @@ def configure( ) -> None: global _instance_private_key, _instance_id, _tenant_id, _outbox, _exporter, _data_dir - if _exporter is not None: - _exporter.flush() - if _outbox is not None: - _outbox.close() + prev_exporter = _exporter + prev_outbox = _outbox - _data_dir = Path(data_dir) if data_dir else default_data_dir() - ensure_dir(_data_dir) + new_data_dir = Path(data_dir) if data_dir else default_data_dir() + ensure_dir(new_data_dir) - kp = load_or_create_keypair(_data_dir) - _instance_private_key = load_private_key(kp.private_key) - _instance_id = kp.instance_id - _tenant_id = ( + kp = load_or_create_keypair(new_data_dir) + new_private_key = load_private_key(kp.private_key) + new_instance_id = kp.instance_id + new_tenant_id = ( tenant_id or os.environ.get("INTENTPROOF_TENANT_ID", "").strip() or "tnt_default" @@ -59,11 +57,23 @@ def configure( resolved_db = db_path or os.environ.get("INTENTPROOF_OUTBOX_PATH", "").strip() if not resolved_db: - resolved_db = str(_data_dir / "outbox.db") - _outbox = Outbox(resolved_db) + resolved_db = str(new_data_dir / "outbox.db") + new_outbox = Outbox(resolved_db) ingest = resolve_ingest_url(ingest_url) - _exporter = HttpExporter(ingest) if ingest else None + new_exporter = HttpExporter(ingest) if ingest else None + + if prev_exporter is not None: + prev_exporter.flush() + if prev_outbox is not None: + prev_outbox.close() + + _data_dir = new_data_dir + _instance_private_key = new_private_key + _instance_id = new_instance_id + _tenant_id = new_tenant_id + _outbox = new_outbox + _exporter = new_exporter def flush() -> None: diff --git a/src/intentproof/instrumentation.py b/src/intentproof/instrumentation.py index 2da9258..d0c60a6 100644 --- a/src/intentproof/instrumentation.py +++ b/src/intentproof/instrumentation.py @@ -127,7 +127,7 @@ async def async_wrapped(*args: Any, **kwargs: Any) -> Any: try: result = await fn(*args, **kwargs) - except BaseException as exc: + except Exception as exc: status = "error" error_obj = {"message": str(exc)} reraise = exc @@ -146,7 +146,7 @@ async def async_wrapped(*args: Any, **kwargs: Any) -> Any: status=status, error_obj=error_obj, ) - except BaseException as record_exc: + except Exception as record_exc: if reraise is not None: raise reraise from record_exc raise @@ -168,7 +168,7 @@ def sync_wrapped(*args: Any, **kwargs: Any) -> Any: try: result = fn(*args, **kwargs) - except BaseException as exc: + except Exception as exc: status = "error" error_obj = {"message": str(exc)} reraise = exc @@ -187,7 +187,7 @@ def sync_wrapped(*args: Any, **kwargs: Any) -> Any: status=status, error_obj=error_obj, ) - except BaseException as record_exc: + except Exception as record_exc: if reraise is not None: raise reraise from record_exc raise diff --git a/src/intentproof/signing.py b/src/intentproof/signing.py index 8c59058..eb05268 100644 --- a/src/intentproof/signing.py +++ b/src/intentproof/signing.py @@ -67,7 +67,3 @@ def load_private_key(raw_b64: str) -> Ed25519PrivateKey: return Ed25519PrivateKey.from_private_bytes( base64.b64decode(raw_b64) ) - - -def public_key_bytes(private_key: Ed25519PrivateKey) -> bytes: - return private_key.public_key().public_bytes_raw() diff --git a/tests/test_async_wrap.py b/tests/test_async_wrap.py new file mode 100644 index 0000000..71f193c --- /dev/null +++ b/tests/test_async_wrap.py @@ -0,0 +1,27 @@ +"""Async wrap behavior tests.""" + +from __future__ import annotations + +import asyncio + +import pytest + +from intentproof import client, configure, wrap + + +def test_cancelled_error_not_recorded(tmp_path) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_async", + ) + + async def cancelled() -> None: + raise asyncio.CancelledError() + + fn = wrap(intent="Test", action="test.action", fn=cancelled) + + with pytest.raises(asyncio.CancelledError): + asyncio.run(fn()) + + assert client.get_outbox().get_events() == [] diff --git a/tests/test_sdk.py b/tests/test_sdk.py index 8715252..e396c2a 100644 --- a/tests/test_sdk.py +++ b/tests/test_sdk.py @@ -142,6 +142,25 @@ def worker() -> None: assert any(e["correlation_id"] == "corr-worker" for e in events) +def test_configure_keeps_working_outbox_on_failure(sdk_dirs: tuple[str, str]) -> None: + db_path, data_dir = sdk_dirs + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + first = client.get_outbox() + fn = wrap(intent="Test", action="test.action", fn=lambda x: x + 1) + run_with_correlation_id("corr-before-fail", lambda: fn(1)) + + key_path = Path(data_dir) / "keypair.json" + key_path.write_text("not-json", encoding="utf-8") + + with pytest.raises(Exception): + configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") + + assert client.get_outbox() is first + run_with_correlation_id("corr-after-fail", lambda: fn(2)) + events = client.get_outbox().get_events() + assert len(events) == 2 + + def test_configure_closes_previous_outbox(sdk_dirs: tuple[str, str]) -> None: db_path, data_dir = sdk_dirs configure(db_path=db_path, data_dir=data_dir, tenant_id="tnt_a") From 764c90d5c2aa586e34c4c5d28178ade7218d0f7d Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 22:37:13 -0500 Subject: [PATCH 09/10] fix(keys): load existing keypair on O_EXCL race If another process creates keypair.json first, catch FileExistsError and read the file instead of failing. Signed-off-by: Nathan Gillett --- src/intentproof/keys.py | 29 ++++++++++++++++++----------- tests/test_keys.py | 27 ++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/intentproof/keys.py b/src/intentproof/keys.py index 8e8ab93..c6a5c44 100644 --- a/src/intentproof/keys.py +++ b/src/intentproof/keys.py @@ -47,24 +47,31 @@ def _write_keypair_file(key_path: Path, payload: dict[str, str]) -> None: raise +def _load_keypair(key_path: Path) -> Keypair: + _ensure_key_permissions(key_path) + raw = key_path.read_text(encoding="utf-8") + data = json.loads(raw) + return Keypair( + private_key=data["privateKey"], + instance_id=data["instanceId"], + ) + + def load_or_create_keypair(data_dir: Path) -> Keypair: key_path = data_dir / "keypair.json" if key_path.exists(): - _ensure_key_permissions(key_path) - raw = key_path.read_text(encoding="utf-8") - data = json.loads(raw) - return Keypair( - private_key=data["privateKey"], - instance_id=data["instanceId"], - ) + return _load_keypair(key_path) private_key = base64.b64encode(os.urandom(32)).decode("ascii") kp = Keypair( private_key=private_key, instance_id=f"inst_{_ulid.new()}", ) - _write_keypair_file( - key_path, - {"privateKey": kp.private_key, "instanceId": kp.instance_id}, - ) + try: + _write_keypair_file( + key_path, + {"privateKey": kp.private_key, "instanceId": kp.instance_id}, + ) + except FileExistsError: + return _load_keypair(key_path) return kp diff --git a/tests/test_keys.py b/tests/test_keys.py index 3bbffb1..bb4ef85 100644 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -5,9 +5,10 @@ import stat import subprocess import sys +import threading from pathlib import Path -from intentproof.keys import load_or_create_keypair +from intentproof.keys import Keypair, load_or_create_keypair def test_new_keypair_created_with_mode_0600(tmp_path: Path) -> None: @@ -17,6 +18,30 @@ def test_new_keypair_created_with_mode_0600(tmp_path: Path) -> None: assert mode == 0o600 +def test_concurrent_load_or_create_keypair(tmp_path: Path) -> None: + """Parallel configure() must not raise when racing on keypair creation.""" + barrier = threading.Barrier(8) + results: list[Keypair] = [] + errors: list[BaseException] = [] + + def worker() -> None: + try: + barrier.wait() + results.append(load_or_create_keypair(tmp_path)) + except BaseException as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(8)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert not errors + assert len(results) == 8 + assert len({kp.instance_id for kp in results}) == 1 + + def test_client_module_import_without_home(tmp_path: Path) -> None: """import intentproof.client must not resolve Path.home() at import time.""" script = f""" From c0cf4029c520594c6562709f3ae2afa06f6dd1ae Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Mon, 18 May 2026 22:40:17 -0500 Subject: [PATCH 10/10] fix(keys): retry load while peer finishes keypair write Losers on O_EXCL can read an empty or partial file; retry JSON load and fsync on create so CI concurrent test is stable. Signed-off-by: Nathan Gillett --- src/intentproof/keys.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/intentproof/keys.py b/src/intentproof/keys.py index c6a5c44..3cc740d 100644 --- a/src/intentproof/keys.py +++ b/src/intentproof/keys.py @@ -6,6 +6,7 @@ import json import logging import os +import time from dataclasses import dataclass from pathlib import Path @@ -13,6 +14,9 @@ logger = logging.getLogger(__name__) +_LOAD_ATTEMPTS = 50 +_LOAD_RETRY_SEC = 0.02 + @dataclass(frozen=True) class Keypair: @@ -42,19 +46,32 @@ def _write_keypair_file(key_path: Path, payload: dict[str, str]) -> None: try: with os.fdopen(fd, "w", encoding="utf-8") as handle: handle.write(content) + handle.flush() + os.fsync(handle.fileno()) except Exception: key_path.unlink(missing_ok=True) raise def _load_keypair(key_path: Path) -> Keypair: - _ensure_key_permissions(key_path) - raw = key_path.read_text(encoding="utf-8") - data = json.loads(raw) - return Keypair( - private_key=data["privateKey"], - instance_id=data["instanceId"], - ) + last_error: Exception | None = None + for _ in range(_LOAD_ATTEMPTS): + try: + _ensure_key_permissions(key_path) + raw = key_path.read_text(encoding="utf-8") + if not raw.strip(): + raise ValueError("empty keypair file") + data = json.loads(raw) + private_key = data["privateKey"] + instance_id = data["instanceId"] + if not isinstance(private_key, str) or not isinstance(instance_id, str): + raise ValueError("invalid keypair file") + return Keypair(private_key=private_key, instance_id=instance_id) + except (OSError, json.JSONDecodeError, ValueError, KeyError, TypeError) as exc: + last_error = exc + time.sleep(_LOAD_RETRY_SEC) + assert last_error is not None + raise last_error def load_or_create_keypair(data_dir: Path) -> Keypair: