diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0845bb5..eb353fe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,5 +21,8 @@ jobs: run: | pip install -e ".[dev]" - - name: Run tests + - name: Run tests with coverage run: pytest -q + + - name: Enforce coverage threshold + run: bash ./scripts/check-coverage.sh 95 diff --git a/.gitignore b/.gitignore index 9164e40..c3d43d6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ __pycache__/ *.py[cod] *$py.class .pytest_cache/ +.coverage +htmlcov/ # Virtual environments .venv/ diff --git a/README.md b/README.md index 7ef42ee..2245f81 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,15 @@ Early scaffolding repo for IntentProof's Python SDK. Tracks the Node SDK's wrap()/exporter/outbox contract so a Python application can emit and verify the same signed execution events. -## Planned scope +## Development -- Event wrapping helpers -- Correlation and chain management -- Canonical serialization and signing -- Hosted ingest transport +```bash +pip install -e ".[dev]" +pytest +``` + +CI enforces at least 95% line coverage on `src/intentproof/` (see +`pyproject.toml` and `scripts/check-coverage.sh`). ## License diff --git a/pyproject.toml b/pyproject.toml index 8106bfb..3665c79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,11 +17,26 @@ dependencies = [ [project.optional-dependencies] dev = [ "pytest>=8.0.0", + "pytest-cov>=5.0.0", ] [tool.setuptools.packages.find] where = ["src"] +[tool.coverage.run] +source = ["intentproof"] +omit = [] + +[tool.coverage.report] +fail_under = 95 +show_missing = true +skip_empty = true + [tool.pytest.ini_options] testpaths = ["tests"] pythonpath = ["src"] +addopts = [ + "--cov=intentproof", + "--cov-report=term-missing", + "--cov-fail-under=95", +] diff --git a/scripts/check-coverage.sh b/scripts/check-coverage.sh new file mode 100755 index 0000000..c0d3ac2 --- /dev/null +++ b/scripts/check-coverage.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash + +set -euo pipefail + +MIN_COVERAGE="${1:-95}" + +COVERAGE=() +if command -v coverage >/dev/null 2>&1; then + COVERAGE=(coverage) +else + for py in python python3; do + if command -v "$py" >/dev/null 2>&1 && "$py" -m coverage --version >/dev/null 2>&1; then + COVERAGE=("$py" -m coverage) + break + fi + done +fi + +if [[ ${#COVERAGE[@]} -eq 0 ]]; then + echo "coverage CLI not found; install dev deps with: pip install -e \".[dev]\"" >&2 + exit 2 +fi + +TOTAL_LINE="$("${COVERAGE[@]}" report --include='src/intentproof/*' | awk '/^TOTAL/{print; exit}')" +if [[ -z "$TOTAL_LINE" ]]; then + echo "unable to read total coverage; run pytest with --cov first" >&2 + exit 2 +fi + +TOTAL_PERCENT="$(printf '%s' "$TOTAL_LINE" | awk '{print $NF}' | tr -d '%')" + +echo "Total coverage: ${TOTAL_PERCENT}%" +echo "Minimum required: ${MIN_COVERAGE}%" + +if awk -v got="$TOTAL_PERCENT" -v min="$MIN_COVERAGE" 'BEGIN { exit !(got + 0 >= min + 0) }'; then + echo "PASS: coverage threshold met" + exit 0 +fi + +echo "FAIL: coverage threshold not met" >&2 +exit 1 diff --git a/tests/test_async_wrap.py b/tests/test_async_wrap.py index 71f193c..a823f86 100644 --- a/tests/test_async_wrap.py +++ b/tests/test_async_wrap.py @@ -6,7 +6,7 @@ import pytest -from intentproof import client, configure, wrap +from intentproof import client, configure, run_with_correlation_id, wrap def test_cancelled_error_not_recorded(tmp_path) -> None: @@ -25,3 +25,94 @@ async def cancelled() -> None: asyncio.run(fn()) assert client.get_outbox().get_events() == [] + + +def test_async_wrap_records_success(tmp_path) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_async", + ) + + async def add(a: int, b: int) -> int: + return a + b + + fn = wrap(intent="Async", action="async.add", fn=add) + result = run_with_correlation_id("corr-async-ok", lambda: asyncio.run(fn(2, 3))) + + assert result == 5 + events = client.get_outbox().get_events() + assert len(events) == 1 + assert events[0]["status"] == "ok" + assert events[0]["output"] == 5 + + +def test_async_wrap_records_error(tmp_path) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_async", + ) + + async def boom() -> None: + raise ValueError("async boom") + + fn = wrap(intent="Async", action="async.boom", fn=boom) + with pytest.raises(ValueError, match="async boom"): + asyncio.run(fn()) + + events = client.get_outbox().get_events() + assert events[-1]["status"] == "error" + assert events[-1]["error"] == {"message": "async boom"} + + +def test_async_wrap_preserves_app_error_when_record_fails( + tmp_path, monkeypatch: pytest.MonkeyPatch +) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_async", + ) + + async def boom() -> None: + raise ValueError("async boom") + + fn = wrap(intent="Async", action="async.boom", fn=boom) + + def fail_record(**_kwargs: object) -> None: + raise RuntimeError("outbox unavailable") + + monkeypatch.setattr( + "intentproof.instrumentation._record_execution", fail_record + ) + + with pytest.raises(ValueError, match="async boom") as exc_info: + asyncio.run(fn()) + + assert isinstance(exc_info.value.__cause__, RuntimeError) + + +def test_async_wrap_record_failure_without_app_error_raises( + tmp_path, monkeypatch: pytest.MonkeyPatch +) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_async", + ) + + async def ok() -> int: + return 1 + + fn = wrap(intent="Async", action="async.ok", fn=ok) + + def fail_record(**_kwargs: object) -> None: + raise RuntimeError("outbox unavailable") + + monkeypatch.setattr( + "intentproof.instrumentation._record_execution", fail_record + ) + + with pytest.raises(RuntimeError, match="outbox unavailable"): + asyncio.run(fn()) diff --git a/tests/test_canon.py b/tests/test_canon.py index c181b2e..714c0cf 100644 --- a/tests/test_canon.py +++ b/tests/test_canon.py @@ -2,8 +2,17 @@ import os import unittest +from unittest.mock import patch -from intentproof.canon import canonicalize +from intentproof.canon import ( + _CanonObject, + _encode_int, + _encode_object, + _format_es6, + _integer_to_scientific, + _to_shortest_scientific, + canonicalize, +) # Ported from intentproof-spec/conformance/jcs_vectors.ts @@ -191,6 +200,74 @@ def test_rejects_unclosed_object(self): canonicalize('{') +class TestLargeIntegers(unittest.TestCase): + def test_safe_integer_uses_decimal(self): + self.assertEqual(canonicalize(9007199254740992), "9007199254740992") + + def test_out_of_range_integer_raises(self): + with self.assertRaises(ValueError): + canonicalize(10**400) + + def test_integer_overflow_on_float_conversion_raises(self): + with patch("intentproof.canon.float", side_effect=OverflowError): + with self.assertRaises(ValueError): + canonicalize(10**400) + + +class TestUnsupportedTypes(unittest.TestCase): + def test_rejects_non_string_object_keys(self): + with self.assertRaises(TypeError): + canonicalize({1: "x"}) + + def test_rejects_unsupported_values(self): + with self.assertRaises(TypeError): + canonicalize(object()) + + def test_rejects_unsupported_nested_values(self): + with self.assertRaises(TypeError): + canonicalize([object()]) + + +class TestFloatEdgeBranches(unittest.TestCase): + def test_zero_float(self): + self.assertEqual(canonicalize(0.0), "0") + + def test_negative_zero_string(self): + self.assertEqual(canonicalize("-0"), "0") + + def test_trailing_zero_decimal_string(self): + self.assertEqual(canonicalize("5.0"), "5") + + def test_integer_to_scientific_zero_strings(self): + self.assertEqual(_integer_to_scientific("0"), ("0", 0)) + self.assertEqual(_integer_to_scientific("000"), ("0", 0)) + + def test_to_shortest_scientific_without_dot_in_repr(self): + with patch("intentproof.canon.repr", return_value="42"): + self.assertEqual(_to_shortest_scientific(42.0), ("42", 1)) + + def test_format_es6_when_digits_round_to_zero(self): + with patch( + "intentproof.canon._to_shortest_scientific", return_value=("0", 0) + ): + self.assertEqual(_format_es6(1.0), "0") + + def test_large_integer_beyond_safe_range_uses_float_formatting(self): + value = 2**53 + 1 + self.assertEqual(canonicalize(value), _encode_int(value)) + + def test_encode_int_rejects_infinite_float_conversion(self): + with patch("intentproof.canon.float", return_value=float("inf")): + with self.assertRaises(ValueError): + _encode_int(2**60) + + def test_encode_object_rejects_unsupported_nested_type(self): + obj = _CanonObject([("key", 1)]) + obj.values["key"] = frozenset({1}) + with self.assertRaises(TypeError): + _encode_object(obj) + + class TestPolicyBodyCrossCheck(unittest.TestCase): def test_byte_equality_with_go_fixture(self): body = { diff --git a/tests/test_e2e_local_loop.py b/tests/test_e2e_local_loop.py new file mode 100644 index 0000000..6172d62 --- /dev/null +++ b/tests/test_e2e_local_loop.py @@ -0,0 +1,253 @@ +"""End-to-end local-loop flows: configure, wrap, outbox, and HTTP export.""" + +from __future__ import annotations + +import json +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from typing import Any +from unittest.mock import patch + +import pytest + +from intentproof import client, configure, flush, push_subject_mapping, run_with_correlation_id, wrap +from intentproof.exporter import post_execution_event +from intentproof.http_exporter import HttpExporter, resolve_ingest_url +from intentproof.signing import verify_event_signature + + +class _IngestHandler(BaseHTTPRequestHandler): + received: list[dict[str, Any]] = [] + + def log_message(self, format: str, *args: object) -> None: + del format, args + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length", "0")) + body = json.loads(self.rfile.read(length).decode("utf-8")) + type(self).received.append(body) + self.send_response(202) + self.end_headers() + + +@pytest.fixture +def ingest_server() -> tuple[str, type[_IngestHandler]]: + _IngestHandler.received = [] + server = HTTPServer(("127.0.0.1", 0), _IngestHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + host, port = server.server_address + base_url = f"http://{host}:{port}" + try: + yield base_url, _IngestHandler + finally: + server.shutdown() + thread.join(timeout=2.0) + + +def test_local_loop_wrap_flush_posts_to_ingest( + tmp_path: Path, ingest_server: tuple[str, type[_IngestHandler]] +) -> None: + base_url, handler_cls = ingest_server + data_dir = tmp_path / "data" + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(data_dir), + tenant_id="tnt_e2e", + ingest_url=base_url, + ) + push_subject_mapping("src", "user", "usr_1") + + fn = wrap(intent="Pay", action="payments.charge", fn=lambda amount: amount * 100) + result = run_with_correlation_id("corr-e2e", lambda: fn(42)) + assert result == 4200 + + flush() + + assert len(handler_cls.received) == 1 + posted = handler_cls.received[0] + assert posted["tenant_id"] == "tnt_e2e" + assert posted["correlation_id"] == "corr-e2e" + assert posted["status"] == "ok" + assert verify_event_signature(posted, client.get_public_key()) + + stored = client.get_outbox().get_events() + assert len(stored) == 1 + assert stored[0]["event_id"] == posted["event_id"] + + +def test_configure_default_outbox_path_under_data_dir(tmp_path: Path) -> None: + data_dir = tmp_path / "data" + configure( + data_dir=str(data_dir), + tenant_id="tnt_default_db", + ) + assert (data_dir / "outbox.db").exists() + + +def test_configure_from_environment_defaults( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ingest_server: tuple[str, type[_IngestHandler]], +) -> None: + base_url, _handler_cls = ingest_server + home = tmp_path / "home" + home.mkdir() + custom_db = tmp_path / "env-outbox.db" + + monkeypatch.setenv("INTENTPROOF_TENANT_ID", "tnt_from_env") + monkeypatch.setenv("INTENTPROOF_OUTBOX_PATH", str(custom_db)) + monkeypatch.setenv("INTENTPROOF_INGEST_URL", base_url) + + with patch.object(Path, "home", return_value=home): + configure() + + assert client.get_tenant_id() == "tnt_from_env" + assert custom_db.exists() + assert (home / ".intentproof" / "sdk-python" / "keypair.json").exists() + + fn = wrap(intent="Env", action="env.test", fn=lambda: "ok") + run_with_correlation_id("corr-env", fn) + flush() + + +def test_resolve_ingest_url_local_ingest_flag(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("INTENTPROOF_INGEST_URL", raising=False) + monkeypatch.setenv("INTENTPROOF_USE_LOCAL_INGEST", "1") + assert resolve_ingest_url() == "http://127.0.0.1:9787/v1/events" + + +def test_resolve_ingest_url_appends_events_suffix() -> None: + assert ( + resolve_ingest_url("https://ingest.example.com") + == "https://ingest.example.com/v1/events" + ) + + +def test_reconfigure_flushes_previous_exporter( + tmp_path: Path, ingest_server: tuple[str, type[_IngestHandler]] +) -> None: + base_url, handler_cls = ingest_server + data_dir = tmp_path / "data" + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(data_dir), + tenant_id="tnt_flush", + ingest_url=base_url, + ) + fn = wrap(intent="Flush", action="flush.test", fn=lambda: None) + run_with_correlation_id("corr-flush", fn) + + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(data_dir), + tenant_id="tnt_flush", + ) + + assert len(handler_cls.received) == 1 + + +def test_http_exporter_surfaces_ingest_url() -> None: + exporter = HttpExporter("http://127.0.0.1:1/v1/events") + assert exporter.ingest_url.endswith("/v1/events") + + +def test_post_execution_event_accepts_200( + ingest_server: tuple[str, type[_IngestHandler]], +) -> None: + base_url, handler_cls = ingest_server + post_execution_event( + f"{base_url}/v1/events", + {"schema": "intentproof.event.v1", "event_id": "evt_test"}, + ) + assert len(handler_cls.received) == 1 + + +def test_export_logs_ingest_failure( + tmp_path: Path, caplog: pytest.LogCaptureFixture +) -> None: + exporter = HttpExporter("http://127.0.0.1:1/v1/events") + with caplog.at_level("WARNING"): + exporter._export_one({"schema": "intentproof.event.v1"}) + assert any("ingest export failed" in record.message for record in caplog.records) + + +def test_wrap_without_inputs_marks_untrusted_when_output_present( + tmp_path: Path, +) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_trusted", + ) + + fn = wrap(intent="No args", action="trusted.test", fn=lambda: {"ok": True}) + run_with_correlation_id("corr-trusted", fn) + + ev = client.get_outbox().get_events()[0] + assert ev["untrusted_payload"] is True + assert ev["inputs"] == [] + + +def test_wrap_without_inputs_and_none_output_is_trusted( + tmp_path: Path, +) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_none", + ) + + fn = wrap(intent="Void", action="void.test", fn=lambda: None) + run_with_correlation_id("corr-none", fn) + + ev = client.get_outbox().get_events()[0] + assert ev["untrusted_payload"] is False + + +def test_wrap_record_failure_without_app_error_raises( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + configure( + db_path=str(tmp_path / "outbox.db"), + data_dir=str(tmp_path / "data"), + tenant_id="tnt_record", + ) + + def fail_record(**_kwargs: object) -> None: + raise RuntimeError("outbox unavailable") + + monkeypatch.setattr( + "intentproof.instrumentation._record_execution", fail_record + ) + + fn = wrap(intent="Record fail", action="record.fail", fn=lambda: 1) + with pytest.raises(RuntimeError, match="outbox unavailable"): + fn() + + +def test_sdk_not_configured_errors() -> None: + import intentproof.client as client_module + + saved = ( + client_module._outbox, + client_module._instance_id, + client_module._instance_private_key, + ) + client_module._outbox = None + client_module._instance_id = None + client_module._instance_private_key = None + try: + with pytest.raises(RuntimeError, match="SDK not configured"): + client.get_outbox() + with pytest.raises(RuntimeError, match="SDK not configured"): + client.get_instance_id() + with pytest.raises(RuntimeError, match="SDK not configured"): + client.get_private_key() + finally: + ( + client_module._outbox, + client_module._instance_id, + client_module._instance_private_key, + ) = saved diff --git a/tests/test_exporter_edges.py b/tests/test_exporter_edges.py new file mode 100644 index 0000000..7de714e --- /dev/null +++ b/tests/test_exporter_edges.py @@ -0,0 +1,43 @@ +"""HTTP exporter response edge cases.""" + +from __future__ import annotations + +import io +from unittest.mock import MagicMock, patch + +import pytest +import urllib.error + +from intentproof.exporter import post_execution_event + + +def test_post_execution_event_wraps_http_error_detail() -> None: + body = io.BytesIO(b"server error") + error = urllib.error.HTTPError( + "http://127.0.0.1:1/v1/events", + 500, + "Internal Server Error", + {}, + body, + ) + + with patch("intentproof.exporter.urllib.request.urlopen", side_effect=error): + with pytest.raises(RuntimeError, match="ingest POST 500: server error"): + post_execution_event( + "http://127.0.0.1:1/v1/events", + {"schema": "intentproof.event.v1"}, + ) + + +def test_post_execution_event_rejects_unexpected_success_status() -> None: + response = MagicMock() + response.status = 204 + response.__enter__ = MagicMock(return_value=response) + response.__exit__ = MagicMock(return_value=False) + + with patch("intentproof.exporter.urllib.request.urlopen", return_value=response): + with pytest.raises(RuntimeError, match="ingest POST returned 204"): + post_execution_event( + "http://127.0.0.1:1/v1/events", + {"schema": "intentproof.event.v1"}, + ) diff --git a/tests/test_keys_edges.py b/tests/test_keys_edges.py new file mode 100644 index 0000000..7b96a6e --- /dev/null +++ b/tests/test_keys_edges.py @@ -0,0 +1,111 @@ +"""Keypair load edge cases and permission warnings.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from unittest.mock import patch + +import pytest + +from intentproof.keys import load_or_create_keypair + + +def test_load_retries_until_keypair_is_written(tmp_path: Path) -> None: + key_path = tmp_path / "keypair.json" + key_path.write_text("", encoding="utf-8") + payload = { + "privateKey": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", + "instanceId": "inst_retry", + } + attempts = {"count": 0} + + def read_text(self: Path, *args: object, **kwargs: object) -> str: + attempts["count"] += 1 + if attempts["count"] < 3: + return "" + return json.dumps(payload) + + with patch.object(Path, "read_text", read_text): + kp = load_or_create_keypair(tmp_path) + + assert kp.instance_id == "inst_retry" + + +def test_load_or_create_keypair_races_on_file_exists(tmp_path: Path) -> None: + payload = { + "privateKey": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", + "instanceId": "inst_race", + } + key_path = tmp_path / "keypair.json" + + def write_then_lose_race(path: Path, _pl: dict[str, str]) -> None: + path.write_text(json.dumps(payload), encoding="utf-8") + raise FileExistsError + + with patch( + "intentproof.keys._write_keypair_file", + side_effect=write_then_lose_race, + ): + kp = load_or_create_keypair(tmp_path) + + assert key_path.exists() + assert kp.instance_id == "inst_race" + + +def test_write_keypair_cleans_up_on_failure(tmp_path: Path) -> None: + from contextlib import contextmanager + + key_path = tmp_path / "keypair.json" + + class FailingWriter: + def write(self, _data: str) -> int: + raise OSError("disk full") + + def flush(self) -> None: + return None + + def fileno(self) -> int: + return 1 + + def __enter__(self) -> "FailingWriter": + return self + + def __exit__(self, *args: object) -> None: + return None + + @contextmanager + def failing_fdopen(fd: int, *args: object, **kwargs: object): + del fd, args, kwargs + yield FailingWriter() + + with patch("intentproof.keys.os.fdopen", side_effect=failing_fdopen): + with pytest.raises(OSError, match="disk full"): + load_or_create_keypair(tmp_path) + + assert not key_path.exists() + + +def test_load_rejects_invalid_keypair_shape(tmp_path: Path) -> None: + key_path = tmp_path / "keypair.json" + key_path.write_text(json.dumps({"privateKey": 1, "instanceId": 2}), encoding="utf-8") + with pytest.raises(ValueError, match="invalid keypair file"): + load_or_create_keypair(tmp_path) + + +def test_chmod_warning_when_permissions_cannot_be_set( + tmp_path: Path, caplog: pytest.LogCaptureFixture +) -> None: + payload = { + "privateKey": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", + "instanceId": "inst_perm", + } + (tmp_path / "keypair.json").write_text(json.dumps(payload), encoding="utf-8") + + with patch("intentproof.keys.os.chmod", side_effect=OSError("read-only fs")): + with caplog.at_level("WARNING"): + kp = load_or_create_keypair(tmp_path) + + assert kp.instance_id == "inst_perm" + assert any("could not set" in record.message for record in caplog.records) diff --git a/tests/test_outbox.py b/tests/test_outbox.py new file mode 100644 index 0000000..e990111 --- /dev/null +++ b/tests/test_outbox.py @@ -0,0 +1,110 @@ +"""Outbox persistence, chain state, and transaction behavior.""" + +from __future__ import annotations + +import pytest + +from intentproof.outbox import Outbox +from intentproof.signing import SENTINEL_PREV_HASH, event_content_hash + + +def _sample_event(event_id: str, correlation_id: str, position: int, prev_hash: str) -> dict: + body = { + "schema": "intentproof.event.v1", + "event_id": event_id, + "correlation_id": correlation_id, + "chain_position": position, + "prev_event_hash": prev_hash, + "status": "ok", + } + return body, event_content_hash(body) + + +def test_append_and_query_events(tmp_path) -> None: + outbox = Outbox(str(tmp_path / "outbox.db")) + body = {"event_id": "evt_1", "status": "ok"} + outbox.append("evt_1", body) + assert outbox.get_events() == [body] + outbox.close() + + +def test_record_chained_event_builds_chain(tmp_path) -> None: + outbox = Outbox(str(tmp_path / "outbox.db")) + + def build_first(pos: int, prev: str) -> tuple[dict, str]: + event = { + "event_id": "evt_a", + "correlation_id": "corr-1", + "chain_position": pos, + "prev_event_hash": prev, + } + return event, event_content_hash(event) + + outbox.record_chained_event("corr-1", "evt_a", build_first) + + def build_second(pos: int, prev: str) -> tuple[dict, str]: + event = { + "event_id": "evt_b", + "correlation_id": "corr-1", + "chain_position": pos, + "prev_event_hash": prev, + } + return event, event_content_hash(event) + + outbox.record_chained_event("corr-1", "evt_b", build_second) + + state = outbox.get_chain_state("corr-1") + assert state is not None + assert state["position"] == 2 + assert len(outbox.get_events()) == 2 + outbox.close() + + +def test_append_with_chain_state_and_set_chain_state(tmp_path) -> None: + outbox = Outbox(str(tmp_path / "outbox.db")) + body, event_hash = _sample_event("evt_1", "corr-x", 1, SENTINEL_PREV_HASH) + outbox.append_with_chain_state("evt_1", body, "corr-x", 1, event_hash) + + state = outbox.get_chain_state("corr-x") + assert state == {"position": 1, "hash": event_hash} + + body2, hash2 = _sample_event("evt_2", "corr-x", 2, event_hash) + outbox.set_chain_state("corr-x", 2, hash2) + outbox.append_with_chain_state("evt_2", body2, "corr-x", 2, hash2) + + assert outbox.get_chain_state("corr-x") == {"position": 2, "hash": hash2} + outbox.close() + + +def test_record_chained_event_rolls_back_on_duplicate_id(tmp_path) -> None: + outbox = Outbox(str(tmp_path / "outbox.db")) + body, event_hash = _sample_event("evt_dup", "corr-dup", 1, SENTINEL_PREV_HASH) + outbox.append_with_chain_state("evt_dup", body, "corr-dup", 1, event_hash) + + def build_duplicate(_pos: int, _prev: str) -> tuple[dict, str]: + dup_body, dup_hash = _sample_event("evt_dup", "corr-dup", 2, event_hash) + return dup_body, dup_hash + + with pytest.raises(Exception): + outbox.record_chained_event("corr-dup", "evt_dup", build_duplicate) + + assert outbox.get_chain_state("corr-dup") == {"position": 1, "hash": event_hash} + outbox.close() + + +def test_get_chain_state_missing_returns_none(tmp_path) -> None: + outbox = Outbox(str(tmp_path / "outbox.db")) + assert outbox.get_chain_state("missing") is None + outbox.close() + + +def test_append_with_chain_state_rolls_back_on_failure(tmp_path) -> None: + outbox = Outbox(str(tmp_path / "outbox.db")) + body, event_hash = _sample_event("evt_1", "corr-rb", 1, SENTINEL_PREV_HASH) + outbox.append_with_chain_state("evt_1", body, "corr-rb", 1, event_hash) + + with pytest.raises(Exception): + outbox.append_with_chain_state("evt_1", body, "corr-rb", 2, event_hash) + + assert outbox.get_chain_state("corr-rb") == {"position": 1, "hash": event_hash} + outbox.close() diff --git a/tests/test_signing_edges.py b/tests/test_signing_edges.py new file mode 100644 index 0000000..a4dce3c --- /dev/null +++ b/tests/test_signing_edges.py @@ -0,0 +1,27 @@ +"""Signing verification edge cases.""" + +from __future__ import annotations + +import base64 + +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from intentproof.signing import verify_event_signature + + +def test_verify_rejects_missing_signature() -> None: + private_key = Ed25519PrivateKey.generate() + event = {"schema": "intentproof.event.v1", "event_id": "evt_unsigned"} + assert verify_event_signature(event, private_key.public_key()) is False + + +def test_verify_rejects_tampered_signature() -> None: + private_key = Ed25519PrivateKey.generate() + digest = b"\x00" * 32 + signature = base64.b64encode(private_key.sign(digest)).decode("ascii") + event = { + "schema": "intentproof.event.v1", + "signature": {"value": signature}, + } + other_key = Ed25519PrivateKey.generate() + assert verify_event_signature(event, other_key.public_key()) is False