Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions PUBLIC_API.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The following interfaces are considered the initial public API for Oris:
- `oris.runtime.ExecutionContext`
- `oris.runtime.Hook` and hook ABCs (`PreStepHook`, `PostStepHook`, `PipelinePreHook`, `PipelinePostHook`) plus `Callable*Hook` / `as_*_hook` helpers
- `oris.runtime.TraceManager`
- `oris.integrations.SafeRunner`
- `oris.integrations.SafeRunner` and `oris.integrations.ExternalRunnable` (typing protocol)
- CLI command `oris`

Legacy aliases `PreExecutionHook`, `PostExecutionHook`, `ExecutionHook`, and `PipelineHook` remain available for compatibility; prefer the explicit hook names above.
Expand All @@ -34,9 +34,23 @@ Anything not listed here is internal and may change between minor versions.
### `SafeRunner`

- `SafeRunner(external_pipeline, *, policy: PolicyEnforcer)`
- `run(input_data: dict[str, Any]) -> dict[str, Any]`
- `ExternalRunnable` (optional typing protocol): objects with `run(input_data: dict[str, Any]) -> Any`
- `run(input_data: dict[str, Any], *, include_trace: bool = False) -> dict[str, Any] | PipelineResult`
- With `include_trace=False` (default): returns `dict[str, Any]`.
- With `include_trace=True`: returns `PipelineResult` with the same fields as `Pipeline.run`, including a minimal run trace (one external step with `latency_ms`, `flags` including `"kind": "external_pipeline"`, and run status).

`external_pipeline` must implement a `run(...)` method returning a mapping. Validation uses the same `PolicyEnforcer.validate_input` / `validate_output` entry points as the default executor pipeline hooks. Tracing is not part of this wrapper by design.
`external_pipeline` may be:

- a callable `def f(data: dict) -> ...`, or
- an object with a callable `run(dict)` method (preferred when both `run` and `__call__` exist).

`input_data` may be any `collections.abc.Mapping`; it is copied to a plain `dict` before validation.

Return values are normalized to `dict[str, Any]` when the target returns a `dict`, any `Mapping`, or an object with a duck-typed `model_dump()` that returns a mapping.

Non-`PipelineExecutionError` exceptions raised inside the external target are surfaced as `PipelineExecutionError("External pipeline execution failed.")` without chaining the original exception as `__cause__`. `GuardViolationError` and other `PipelineExecutionError` subclasses raised by policy or adapters propagate unchanged.

Validation uses the same `PolicyEnforcer.validate_input` / `validate_output` entry points as the default executor pipeline hooks.

### Runtime hooks

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "oris-ai"
version = "0.5.1"
version = "0.2.0"
description = "Oris: Responsible AI runtime framework for production pipelines."
readme = "README.md"
requires-python = ">=3.10"
Expand Down
4 changes: 2 additions & 2 deletions src/oris/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Framework-agnostic integration wrappers."""

from .safe_runner import SafeRunner
from .safe_runner import ExternalRunnable, SafeRunner

__all__ = ["SafeRunner"]
__all__ = ["ExternalRunnable", "SafeRunner"]
133 changes: 118 additions & 15 deletions src/oris/integrations/safe_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,141 @@
``RuntimeExecutor`` pipeline hooks (``validate_input`` / ``validate_output``),
so policy behavior stays aligned when mixing in-framework and external runs.

Tracing is intentionally out of scope here to keep this adapter minimal; a
future enhancement could accept an optional trace sink without coupling to
``TraceManager``.
Accepts a callable ``(dict) -> Any`` or an object with ``run(dict) -> Any``.
Return values are coerced to ``dict[str, Any]`` when they are ``dict``-like
or expose a duck-typed ``model_dump()`` returning a mapping.

Optional tracing reuses ``TraceManager.traced_component`` for a single external
step (status, flags, latency) without changing core executor behavior.
"""

from __future__ import annotations

from typing import Any, Protocol
from collections.abc import Mapping
from typing import Any, Literal, Protocol, overload

from oris.core.exceptions import PipelineExecutionError
from oris.rai.policy import PolicyEnforcer
from oris.runtime.models import PipelineResult
from oris.runtime.trace_manager import TraceManager


class ExternalRunnable(Protocol):
"""Protocol for framework-agnostic external runners."""

def run(self, input_data: dict[str, Any]) -> dict[str, Any]:
"""Execute a pipeline and return a mapping."""
def run(self, input_data: dict[str, Any]) -> Any:
"""Execute a pipeline and return a mapping or mapping-like result."""


def _coerce_input(data: Any) -> dict[str, Any]:
if isinstance(data, dict):
return dict(data)
if isinstance(data, Mapping):
return dict(data)
msg = "Input must be a dictionary-compatible mapping."
raise PipelineExecutionError(msg)


def _coerce_output(result: Any) -> dict[str, Any]:
if isinstance(result, dict):
return dict(result)
if isinstance(result, Mapping):
return dict(result)
model_dump = getattr(result, "model_dump", None)
if callable(model_dump):
dumped = model_dump()
if isinstance(dumped, Mapping):
return dict(dumped)
msg = "External pipeline must return a dictionary-compatible payload."
raise PipelineExecutionError(msg)


# Flags attached to the single synthetic trace step that represents the
# wrapped external call; surfaced in traces as {"kind": "external_pipeline"}.
_EXTERNAL_FLAGS: dict[str, Any] = {"kind": "external_pipeline"}
# Step/component identifier used for that synthetic step.
_EXTERNAL_STEP_ID = "external_pipeline"


class SafeRunner:
"""Wraps an external runner with the same policy checks as pipeline execution."""

def __init__(self, external_pipeline: ExternalRunnable, *, policy: PolicyEnforcer) -> None:
def __init__(self, external_pipeline: object, *, policy: PolicyEnforcer) -> None:
self._external_pipeline = external_pipeline
self._policy = policy

def run(self, input_data: dict[str, Any]) -> dict[str, Any]:
self._policy.validate_input(input_data)
result = self._external_pipeline.run(input_data)
if not isinstance(result, dict):
msg = "External pipeline must return a dictionary payload."
raise PipelineExecutionError(msg)
self._policy.validate_output(result)
return result
def _dispatch(self, inp: dict[str, Any]) -> Any:
    """Invoke the external target, preferring ``run`` over ``__call__``.

    Raises:
        PipelineExecutionError: if the target is neither callable nor
            exposes a callable ``run`` attribute.
    """
    target = self._external_pipeline
    run_method = getattr(target, "run", None)
    # A callable ``run`` attribute wins even when the object is also callable.
    if run_method is not None and callable(run_method):
        return run_method(inp)
    if callable(target):
        return target(inp)
    msg = "External target must be callable or provide a run(dict) method."
    raise PipelineExecutionError(msg)

def _execute_body(self, inp: dict[str, Any]) -> dict[str, Any]:
    """Validate input, invoke the target, then normalize and validate output.

    ``PipelineExecutionError`` (and its subclasses) propagate unchanged; any
    other exception is replaced wholesale so external internals do not leak.
    """
    self._policy.validate_input(inp)
    try:
        raw = self._dispatch(inp)
    except Exception as exc:
        if isinstance(exc, PipelineExecutionError):
            raise
        # ``from None`` deliberately suppresses the original cause/traceback.
        raise PipelineExecutionError("External pipeline execution failed.") from None
    normalized = _coerce_output(raw)
    self._policy.validate_output(normalized)
    return normalized

@overload
def run(
    self,
    input_data: dict[str, Any],
    *,
    include_trace: Literal[False] = False,
) -> dict[str, Any]: ...

@overload
def run(
    self,
    input_data: dict[str, Any],
    *,
    include_trace: Literal[True],
) -> PipelineResult: ...

def run(
    self,
    input_data: dict[str, Any],
    *,
    include_trace: bool = False,
) -> dict[str, Any] | PipelineResult:
    """Execute the external target under the same policy checks as pipelines.

    Args:
        input_data: Any mapping; copied to a plain ``dict`` before validation.
        include_trace: When ``True``, wrap execution in a one-step trace and
            return a ``PipelineResult``; otherwise return the plain output.

    Raises:
        PipelineExecutionError: on invalid input/target/output or when the
            external target fails.
    """
    inp = _coerce_input(input_data)
    if not include_trace:
        return self._execute_body(inp)

    # A fresh TraceManager per call keeps this adapter stateless.
    trace_manager = TraceManager()
    trace = trace_manager.begin_run()
    try:

        def execute() -> dict[str, Any]:
            return self._execute_body(inp)

        # One synthetic step records latency, status, and the external flag.
        output = trace_manager.traced_component(
            trace,
            step_id=_EXTERNAL_STEP_ID,
            component_name=_EXTERNAL_STEP_ID,
            flags=_EXTERNAL_FLAGS,
            execute=execute,
        )
        trace_manager.finalize_success(
            trace,
            pipeline_metadata={"source": "external"},
        )
    except Exception:
        # Mark the run failed before re-raising so the trace is consistent.
        trace_manager.finalize_failure(trace)
        raise

    return PipelineResult(
        output=output,
        trace=trace,
        metadata={
            "run_id": trace.run_id,
            "source": "external",
        },
    )
131 changes: 131 additions & 0 deletions tests/test_safe_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

from __future__ import annotations

from collections import UserDict
from typing import Any, cast

import pytest

from oris.core.enums import ExecutionStatus
from oris.core.exceptions import GuardViolationError, PipelineExecutionError
from oris.integrations.safe_runner import SafeRunner
from oris.rai.policy import PolicyEnforcer
Expand All @@ -22,12 +24,60 @@ def run(self, input_data: dict[str, str]) -> str:
return "not-a-dict"


class MappingOutputPipeline:
    """Fixture returning a non-dict ``Mapping`` to exercise output coercion."""

    def run(self, input_data: dict[str, str]) -> UserDict[str, str]:
        del input_data  # ignored on purpose
        return UserDict({"out": "mapped"})


class PreferRunOverCall:
    """Dispatch fixture: ``run`` must be chosen over ``__call__``."""

    def run(self, input_data: dict[str, str]) -> dict[str, str]:
        query = input_data.get("query", "")
        return {"via": "run", "q": query}

    def __call__(self, input_data: dict[str, str]) -> dict[str, str]:
        del input_data  # result is fixed regardless of input
        return {"via": "call"}


class FakeModel:
    """Fixture mimicking a pydantic-style model via duck-typed ``model_dump``."""

    def model_dump(self) -> dict[str, str]:
        payload = {"from_model": "yes"}
        return payload


class BadModelDump:
    """Fixture whose ``model_dump`` yields a non-mapping value."""

    def model_dump(self) -> str:
        return "not-a-mapping"


def test_safe_runner_wraps_external_pipeline() -> None:
    # Happy path: policy passes and the external dict output comes back.
    runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer())
    output = runner.run({"query": "hello"})
    assert output["output"] == "ok: hello"


def test_safe_runner_wraps_plain_callable() -> None:
    # A bare function target is dispatched via __call__.
    def echo(data: dict[str, str]) -> dict[str, str]:
        return {"echo": data["q"]}

    runner = SafeRunner(echo, policy=PolicyEnforcer())
    assert runner.run({"q": "hi"}) == {"echo": "hi"}


def test_safe_runner_prefers_run_over_call() -> None:
    # When the target has both run() and __call__, run() must win.
    runner = SafeRunner(PreferRunOverCall(), policy=PolicyEnforcer())
    result = runner.run({"query": "x"})
    assert result == {"via": "run", "q": "x"}


def test_safe_runner_accepts_mapping_input() -> None:
    # Any Mapping (not only dict) is accepted and copied before validation.
    runner = SafeRunner(lambda d: {"k": d["a"]}, policy=PolicyEnforcer())
    mapping_input: UserDict[str, str] = UserDict({"a": "1"})
    assert runner.run(cast(Any, mapping_input)) == {"k": "1"}


def test_safe_runner_rejects_non_mapping_input() -> None:
    # Strings are iterable but not mappings; they must be rejected up front.
    identity_runner = SafeRunner(lambda d: d, policy=PolicyEnforcer())
    bad_input = cast(Any, "not-a-mapping")
    with pytest.raises(PipelineExecutionError, match="dictionary-compatible mapping"):
        identity_runner.run(bad_input)


def test_safe_runner_blocks_unsafe_input() -> None:
runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer())
with pytest.raises(GuardViolationError):
Expand All @@ -38,3 +88,84 @@ def test_safe_runner_requires_mapping_output() -> None:
runner = SafeRunner(cast(Any, InvalidExternalPipeline()), policy=PolicyEnforcer())
with pytest.raises(PipelineExecutionError):
runner.run({"query": "hello"})


def test_safe_runner_coerces_mapping_output() -> None:
    # A UserDict produced by the target is converted to a plain dict.
    runner = SafeRunner(MappingOutputPipeline(), policy=PolicyEnforcer())
    result = runner.run({"query": "hello"})
    assert result == {"out": "mapped"}


def test_safe_runner_coerces_model_dump_output() -> None:
    # Objects exposing model_dump() returning a mapping are normalized.
    runner = SafeRunner(lambda _d: FakeModel(), policy=PolicyEnforcer())
    result = runner.run({"query": "hello"})
    assert result == {"from_model": "yes"}


def test_safe_runner_rejects_bad_model_dump() -> None:
    # model_dump() yielding a non-mapping is an error, not silently passed.
    runner = SafeRunner(lambda _d: BadModelDump(), policy=PolicyEnforcer())
    with pytest.raises(PipelineExecutionError, match="dictionary-compatible"):
        runner.run({"query": "hello"})


def test_safe_runner_rejects_invalid_target() -> None:
    # Targets that are neither callable nor run()-bearing are rejected.
    runner = SafeRunner(object(), policy=PolicyEnforcer())
    with pytest.raises(PipelineExecutionError, match="callable or provide a run"):
        runner.run({"query": "hello"})


def test_safe_runner_wraps_external_exception() -> None:
    # Foreign exceptions are replaced wholesale; __cause__ stays unset so
    # external internals do not leak through the traceback.
    def explode(_d: dict[str, str]) -> dict[str, str]:
        raise ValueError("sensitive internals")

    runner = SafeRunner(explode, policy=PolicyEnforcer())
    with pytest.raises(
        PipelineExecutionError,
        match="External pipeline execution failed",
    ) as exc_info:
        runner.run({"query": "hello"})
    assert exc_info.value.__cause__ is None


def test_safe_runner_propagates_pipeline_execution_error() -> None:
    # A PipelineExecutionError raised by the target crosses the boundary intact.
    def raise_boundary(_d: dict[str, str]) -> dict[str, str]:
        raise PipelineExecutionError("user boundary")

    runner = SafeRunner(raise_boundary, policy=PolicyEnforcer())
    with pytest.raises(PipelineExecutionError, match="user boundary"):
        runner.run({"query": "hello"})


def test_safe_runner_output_policy_blocked_term() -> None:
    # Output-side policy screening applies to external results too.
    runner = SafeRunner(lambda _d: {"text": "exploit payload"}, policy=PolicyEnforcer())
    with pytest.raises(GuardViolationError, match="blocked policy"):
        runner.run({"query": "ok"})


def test_safe_runner_output_toxicity_stub() -> None:
    # The enforcer's toxicity stub marker in output triggers a guard violation.
    policy = PolicyEnforcer()
    runner = SafeRunner(lambda _d: {"x": policy.toxicity_stub_marker}, policy=policy)
    with pytest.raises(GuardViolationError, match="toxicity"):
        runner.run({"query": "ok"})


def test_safe_runner_include_trace_success() -> None:
    runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer())
    result = runner.run({"query": "hello"}, include_trace=True)

    # Output and run status.
    assert result.output["output"] == "ok: hello"
    assert result.trace.status == ExecutionStatus.SUCCEEDED.value

    # Exactly one synthetic external step, flagged and timed.
    assert len(result.trace.steps) == 1
    external_step = result.trace.steps[0]
    assert external_step.flags["kind"] == "external_pipeline"
    assert external_step.latency_ms >= 0.0

    # Summary view and metadata mirror the trace.
    summary = result.to_run_summary()
    assert summary["status"] == "success"
    assert summary["trace"][0]["flags"]["kind"] == "external_pipeline"
    assert result.metadata.get("source") == "external"


def test_safe_runner_include_trace_on_failure() -> None:
    # Guard violations still raise even when tracing is requested.
    runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer())
    with pytest.raises(GuardViolationError):
        runner.run({"secret": "no"}, include_trace=True)
Loading