Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/intentproof/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
"""IntentProof Python SDK."""

from intentproof.exporter import ingest_request_headers, post_execution_event

__all__ = ["ingest_request_headers", "post_execution_event"]
34 changes: 34 additions & 0 deletions src/intentproof/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""HTTP export helpers for hosted ingest."""

from __future__ import annotations

import json
import os
import urllib.error
import urllib.request
from typing import Any, Mapping


def ingest_request_headers() -> dict[str, str]:
headers = {"Content-Type": "application/json"}
token = os.environ.get("INTENTPROOF_INGEST_TOKEN", "").strip()
if token:
headers["Authorization"] = f"Bearer {token}"
return headers


def post_execution_event(ingest_url: str, event: Mapping[str, Any]) -> None:
body = json.dumps(event).encode("utf-8")
request = urllib.request.Request(
ingest_url,
data=body,
headers=ingest_request_headers(),
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=5) as response:
if response.status not in (200, 202):
raise RuntimeError(f"ingest POST returned {response.status}")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")[:200]
raise RuntimeError(f"ingest POST {exc.code}: {detail}") from exc
27 changes: 27 additions & 0 deletions tests/test_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os

from intentproof.exporter import ingest_request_headers


def test_ingest_request_headers_includes_bearer_token() -> None:
previous = os.environ.get("INTENTPROOF_INGEST_TOKEN")
os.environ["INTENTPROOF_INGEST_TOKEN"] = "ingest-secret"
try:
headers = ingest_request_headers()
assert headers["Authorization"] == "Bearer ingest-secret"
finally:
if previous is None:
os.environ.pop("INTENTPROOF_INGEST_TOKEN", None)
else:
os.environ["INTENTPROOF_INGEST_TOKEN"] = previous


def test_ingest_request_headers_omits_authorization_without_token() -> None:
previous = os.environ.get("INTENTPROOF_INGEST_TOKEN")
os.environ.pop("INTENTPROOF_INGEST_TOKEN", None)
try:
headers = ingest_request_headers()
assert "Authorization" not in headers
finally:
if previous is not None:
os.environ["INTENTPROOF_INGEST_TOKEN"] = previous
Loading