diff --git a/src/intentproof/__init__.py b/src/intentproof/__init__.py index ea4b8c1..a1b057a 100644 --- a/src/intentproof/__init__.py +++ b/src/intentproof/__init__.py @@ -1 +1,5 @@ """IntentProof Python SDK.""" + +from intentproof.exporter import ingest_request_headers, post_execution_event + +__all__ = ["ingest_request_headers", "post_execution_event"] diff --git a/src/intentproof/exporter.py b/src/intentproof/exporter.py new file mode 100644 index 0000000..bbd6f8e --- /dev/null +++ b/src/intentproof/exporter.py @@ -0,0 +1,34 @@ +"""HTTP export helpers for hosted ingest.""" + +from __future__ import annotations + +import json +import os +import urllib.error +import urllib.request +from typing import Any, Mapping + + +def ingest_request_headers() -> dict[str, str]: + headers = {"Content-Type": "application/json"} + token = os.environ.get("INTENTPROOF_INGEST_TOKEN", "").strip() + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + +def post_execution_event(ingest_url: str, event: Mapping[str, Any]) -> None: + body = json.dumps(event).encode("utf-8") + request = urllib.request.Request( + ingest_url, + data=body, + headers=ingest_request_headers(), + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=5) as response: + if response.status not in (200, 202): + raise RuntimeError(f"ingest POST returned {response.status}") + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace")[:200] + raise RuntimeError(f"ingest POST {exc.code}: {detail}") from exc diff --git a/tests/test_exporter.py b/tests/test_exporter.py new file mode 100644 index 0000000..918f47f --- /dev/null +++ b/tests/test_exporter.py @@ -0,0 +1,27 @@ +import os + +from intentproof.exporter import ingest_request_headers + + +def test_ingest_request_headers_includes_bearer_token() -> None: + previous = os.environ.get("INTENTPROOF_INGEST_TOKEN") + os.environ["INTENTPROOF_INGEST_TOKEN"] = "ingest-secret" + try: + headers = ingest_request_headers() + assert headers["Authorization"] == "Bearer ingest-secret" + finally: + if previous is None: + os.environ.pop("INTENTPROOF_INGEST_TOKEN", None) + else: + os.environ["INTENTPROOF_INGEST_TOKEN"] = previous + + +def test_ingest_request_headers_omits_authorization_without_token() -> None: + previous = os.environ.get("INTENTPROOF_INGEST_TOKEN") + os.environ.pop("INTENTPROOF_INGEST_TOKEN", None) + try: + headers = ingest_request_headers() + assert "Authorization" not in headers + finally: + if previous is not None: + os.environ["INTENTPROOF_INGEST_TOKEN"] = previous