From b836b1a018eca9d3611e30b271df41b1f3186ca8 Mon Sep 17 00:00:00 2001 From: DevStrikerTech Date: Wed, 8 Apr 2026 18:11:34 +0100 Subject: [PATCH] feat(integrations): external SafeRunner, optional trace, version 0.6.0 Made-with: Cursor --- PUBLIC_API.md | 20 +++- pyproject.toml | 2 +- src/oris/integrations/__init__.py | 4 +- src/oris/integrations/safe_runner.py | 133 ++++++++++++++++++++++++--- tests/test_safe_runner.py | 131 ++++++++++++++++++++++++++ 5 files changed, 269 insertions(+), 21 deletions(-) diff --git a/PUBLIC_API.md b/PUBLIC_API.md index b5a8fb6..c2277b3 100644 --- a/PUBLIC_API.md +++ b/PUBLIC_API.md @@ -10,7 +10,7 @@ The following interfaces are considered the initial public API for Oris: - `oris.runtime.ExecutionContext` - `oris.runtime.Hook` and hook ABCs (`PreStepHook`, `PostStepHook`, `PipelinePreHook`, `PipelinePostHook`) plus `Callable*Hook` / `as_*_hook` helpers - `oris.runtime.TraceManager` -- `oris.integrations.SafeRunner` +- `oris.integrations.SafeRunner` and `oris.integrations.ExternalRunnable` (typing protocol) - CLI command `oris` Legacy aliases `PreExecutionHook`, `PostExecutionHook`, `ExecutionHook`, and `PipelineHook` remain available for compatibility; prefer the explicit hook names above. @@ -34,9 +34,23 @@ Anything not listed here is internal and may change between minor versions. ### `SafeRunner` - `SafeRunner(external_pipeline, *, policy: PolicyEnforcer)` -- `run(input_data: dict[str, Any]) -> dict[str, Any]` +- `ExternalRunnable` (optional typing protocol): objects with `run(input_data: dict[str, Any]) -> Any` +- `run(input_data: dict[str, Any], *, include_trace: bool = False) -> dict[str, Any] | PipelineResult` + - With `include_trace=False` (default): returns `dict[str, Any]`. + - With `include_trace=True`: returns `PipelineResult` with the same fields as `Pipeline.run`, including a minimal run trace (one external step with `latency_ms`, `flags` including `"kind": "external_pipeline"`, and run status). 
-`external_pipeline` must implement a `run(...)` method returning a mapping. Validation uses the same `PolicyEnforcer.validate_input` / `validate_output` entry points as the default executor pipeline hooks. Tracing is not part of this wrapper by design. +`external_pipeline` may be: + +- a callable `def f(data: dict) -> ...`, or +- an object with a callable `run(dict)` method (preferred when both `run` and `__call__` exist). + +`input_data` may be any `collections.abc.Mapping`; it is copied to a plain `dict` before validation. + +Return values are normalized to `dict[str, Any]` when the target returns a `dict`, any `Mapping`, or an object with a duck-typed `model_dump()` that returns a mapping. + +Non-`PipelineExecutionError` exceptions raised inside the external target are surfaced as `PipelineExecutionError("External pipeline execution failed.")` without chaining the original exception as `__cause__`. `GuardViolationError` and other `PipelineExecutionError` subclasses raised by policy or adapters propagate unchanged. + +Validation uses the same `PolicyEnforcer.validate_input` / `validate_output` entry points as the default executor pipeline hooks. ### Runtime hooks diff --git a/pyproject.toml b/pyproject.toml index a3e79ab..8738726 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "oris-ai" -version = "0.5.1" +version = "0.6.0" description = "Oris: Responsible AI runtime framework for production pipelines." 
readme = "README.md" requires-python = ">=3.10" diff --git a/src/oris/integrations/__init__.py b/src/oris/integrations/__init__.py index aec15eb..b4fbb9e 100644 --- a/src/oris/integrations/__init__.py +++ b/src/oris/integrations/__init__.py @@ -1,5 +1,5 @@ """Framework-agnostic integration wrappers.""" -from .safe_runner import SafeRunner +from .safe_runner import ExternalRunnable, SafeRunner -__all__ = ["SafeRunner"] +__all__ = ["ExternalRunnable", "SafeRunner"] diff --git a/src/oris/integrations/safe_runner.py b/src/oris/integrations/safe_runner.py index 852635c..190f8a0 100644 --- a/src/oris/integrations/safe_runner.py +++ b/src/oris/integrations/safe_runner.py @@ -4,38 +4,141 @@ ``RuntimeExecutor`` pipeline hooks (``validate_input`` / ``validate_output``), so policy behavior stays aligned when mixing in-framework and external runs. -Tracing is intentionally out of scope here to keep this adapter minimal; a -future enhancement could accept an optional trace sink without coupling to -``TraceManager``. +Accepts a callable ``(dict) -> Any`` or an object with ``run(dict) -> Any``. +Return values are coerced to ``dict[str, Any]`` when they are ``dict``-like +or expose a duck-typed ``model_dump()`` returning a mapping. + +Optional tracing reuses ``TraceManager.traced_component`` for a single external +step (status, flags, latency) without changing core executor behavior. 
""" from __future__ import annotations -from typing import Any, Protocol +from collections.abc import Mapping +from typing import Any, Literal, Protocol, overload from oris.core.exceptions import PipelineExecutionError from oris.rai.policy import PolicyEnforcer +from oris.runtime.models import PipelineResult +from oris.runtime.trace_manager import TraceManager class ExternalRunnable(Protocol): """Protocol for framework-agnostic external runners.""" - def run(self, input_data: dict[str, Any]) -> dict[str, Any]: - """Execute a pipeline and return a mapping.""" + def run(self, input_data: dict[str, Any]) -> Any: + """Execute a pipeline and return a mapping or mapping-like result.""" + + +def _coerce_input(data: Any) -> dict[str, Any]: + if isinstance(data, dict): + return dict(data) + if isinstance(data, Mapping): + return dict(data) + msg = "Input must be a dictionary-compatible mapping." + raise PipelineExecutionError(msg) + + +def _coerce_output(result: Any) -> dict[str, Any]: + if isinstance(result, dict): + return dict(result) + if isinstance(result, Mapping): + return dict(result) + model_dump = getattr(result, "model_dump", None) + if callable(model_dump): + dumped = model_dump() + if isinstance(dumped, Mapping): + return dict(dumped) + msg = "External pipeline must return a dictionary-compatible payload." 
+ raise PipelineExecutionError(msg) + + +_EXTERNAL_FLAGS: dict[str, Any] = {"kind": "external_pipeline"} +_EXTERNAL_STEP_ID = "external_pipeline" class SafeRunner: """Wraps an external runner with the same policy checks as pipeline execution.""" - def __init__(self, external_pipeline: ExternalRunnable, *, policy: PolicyEnforcer) -> None: + def __init__(self, external_pipeline: object, *, policy: PolicyEnforcer) -> None: self._external_pipeline = external_pipeline self._policy = policy - def run(self, input_data: dict[str, Any]) -> dict[str, Any]: - self._policy.validate_input(input_data) - result = self._external_pipeline.run(input_data) - if not isinstance(result, dict): - msg = "External pipeline must return a dictionary payload." - raise PipelineExecutionError(msg) - self._policy.validate_output(result) - return result + def _dispatch(self, inp: dict[str, Any]) -> Any: + target = self._external_pipeline + run_method = getattr(target, "run", None) + if run_method is not None and callable(run_method): + return run_method(inp) + if callable(target): + return target(inp) + msg = "External target must be callable or provide a run(dict) method." + raise PipelineExecutionError(msg) + + def _execute_body(self, inp: dict[str, Any]) -> dict[str, Any]: + self._policy.validate_input(inp) + try: + raw = self._dispatch(inp) + except Exception as exc: + if isinstance(exc, PipelineExecutionError): + raise + raise PipelineExecutionError("External pipeline execution failed.") from None + normalized = _coerce_output(raw) + self._policy.validate_output(normalized) + return normalized + + @overload + def run( + self, + input_data: dict[str, Any], + *, + include_trace: Literal[False] = False, + ) -> dict[str, Any]: ... + + @overload + def run( + self, + input_data: dict[str, Any], + *, + include_trace: Literal[True], + ) -> PipelineResult: ... 
+ + def run( + self, + input_data: dict[str, Any], + *, + include_trace: bool = False, + ) -> dict[str, Any] | PipelineResult: + inp = _coerce_input(input_data) + if not include_trace: + return self._execute_body(inp) + + trace_manager = TraceManager() + trace = trace_manager.begin_run() + try: + + def execute() -> dict[str, Any]: + return self._execute_body(inp) + + output = trace_manager.traced_component( + trace, + step_id=_EXTERNAL_STEP_ID, + component_name=_EXTERNAL_STEP_ID, + flags=_EXTERNAL_FLAGS, + execute=execute, + ) + trace_manager.finalize_success( + trace, + pipeline_metadata={"source": "external"}, + ) + except Exception: + trace_manager.finalize_failure(trace) + raise + + return PipelineResult( + output=output, + trace=trace, + metadata={ + "run_id": trace.run_id, + "source": "external", + }, + ) diff --git a/tests/test_safe_runner.py b/tests/test_safe_runner.py index e528ba8..09e5874 100644 --- a/tests/test_safe_runner.py +++ b/tests/test_safe_runner.py @@ -2,10 +2,12 @@ from __future__ import annotations +from collections import UserDict from typing import Any, cast import pytest +from oris.core.enums import ExecutionStatus from oris.core.exceptions import GuardViolationError, PipelineExecutionError from oris.integrations.safe_runner import SafeRunner from oris.rai.policy import PolicyEnforcer @@ -22,12 +24,60 @@ def run(self, input_data: dict[str, str]) -> str: return "not-a-dict" +class MappingOutputPipeline: + def run(self, input_data: dict[str, str]) -> UserDict[str, str]: + _ = input_data + return UserDict([("out", "mapped")]) + + +class PreferRunOverCall: + """`.run` must win when both exist.""" + + def run(self, input_data: dict[str, str]) -> dict[str, str]: + return {"via": "run", "q": input_data.get("query", "")} + + def __call__(self, input_data: dict[str, str]) -> dict[str, str]: + return {"via": "call"} + + +class FakeModel: + def model_dump(self) -> dict[str, str]: + return {"from_model": "yes"} + + +class BadModelDump: + def 
model_dump(self) -> str: + return "not-a-mapping" + + def test_safe_runner_wraps_external_pipeline() -> None: runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer()) result = runner.run({"query": "hello"}) assert result["output"] == "ok: hello" +def test_safe_runner_wraps_plain_callable() -> None: + runner = SafeRunner(lambda d: {"echo": d["q"]}, policy=PolicyEnforcer()) + assert runner.run({"q": "hi"}) == {"echo": "hi"} + + +def test_safe_runner_prefers_run_over_call() -> None: + runner = SafeRunner(PreferRunOverCall(), policy=PolicyEnforcer()) + assert runner.run({"query": "x"}) == {"via": "run", "q": "x"} + + +def test_safe_runner_accepts_mapping_input() -> None: + runner = SafeRunner(lambda d: {"k": d["a"]}, policy=PolicyEnforcer()) + ud: UserDict[str, str] = UserDict([("a", "1")]) + assert runner.run(cast(Any, ud)) == {"k": "1"} + + +def test_safe_runner_rejects_non_mapping_input() -> None: + runner = SafeRunner(lambda d: d, policy=PolicyEnforcer()) + with pytest.raises(PipelineExecutionError, match="dictionary-compatible mapping"): + runner.run(cast(Any, "not-a-mapping")) + + def test_safe_runner_blocks_unsafe_input() -> None: runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer()) with pytest.raises(GuardViolationError): @@ -38,3 +88,84 @@ def test_safe_runner_requires_mapping_output() -> None: runner = SafeRunner(cast(Any, InvalidExternalPipeline()), policy=PolicyEnforcer()) with pytest.raises(PipelineExecutionError): runner.run({"query": "hello"}) + + +def test_safe_runner_coerces_mapping_output() -> None: + runner = SafeRunner(MappingOutputPipeline(), policy=PolicyEnforcer()) + assert runner.run({"query": "hello"}) == {"out": "mapped"} + + +def test_safe_runner_coerces_model_dump_output() -> None: + runner = SafeRunner(lambda _d: FakeModel(), policy=PolicyEnforcer()) + assert runner.run({"query": "hello"}) == {"from_model": "yes"} + + +def test_safe_runner_rejects_bad_model_dump() -> None: + runner = SafeRunner(lambda _d: BadModelDump(), 
policy=PolicyEnforcer()) + with pytest.raises(PipelineExecutionError, match="dictionary-compatible"): + runner.run({"query": "hello"}) + + +def test_safe_runner_rejects_invalid_target() -> None: + runner = SafeRunner(object(), policy=PolicyEnforcer()) + with pytest.raises(PipelineExecutionError, match="callable or provide a run"): + runner.run({"query": "hello"}) + + +def test_safe_runner_wraps_external_exception() -> None: + def boom(_d: dict[str, str]) -> dict[str, str]: + raise ValueError("sensitive internals") + + runner = SafeRunner(boom, policy=PolicyEnforcer()) + with pytest.raises( + PipelineExecutionError, + match="External pipeline execution failed", + ) as exc_info: + runner.run({"query": "hello"}) + assert exc_info.value.__cause__ is None + + +def test_safe_runner_propagates_pipeline_execution_error() -> None: + def user_defined(_d: dict[str, str]) -> dict[str, str]: + raise PipelineExecutionError("user boundary") + + runner = SafeRunner(user_defined, policy=PolicyEnforcer()) + with pytest.raises(PipelineExecutionError, match="user boundary"): + runner.run({"query": "hello"}) + + +def test_safe_runner_output_policy_blocked_term() -> None: + runner = SafeRunner(lambda _d: {"text": "exploit payload"}, policy=PolicyEnforcer()) + with pytest.raises(GuardViolationError, match="blocked policy"): + runner.run({"query": "ok"}) + + +def test_safe_runner_output_toxicity_stub() -> None: + policy = PolicyEnforcer() + runner = SafeRunner( + lambda _d: {"x": policy.toxicity_stub_marker}, + policy=policy, + ) + with pytest.raises(GuardViolationError, match="toxicity"): + runner.run({"query": "ok"}) + + +def test_safe_runner_include_trace_success() -> None: + runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer()) + result = runner.run({"query": "hello"}, include_trace=True) + assert result.output["output"] == "ok: hello" + assert result.trace.status == ExecutionStatus.SUCCEEDED.value + assert len(result.trace.steps) == 1 + step = result.trace.steps[0] + 
assert step.flags["kind"] == "external_pipeline" + assert step.latency_ms >= 0.0 + summary = result.to_run_summary() + assert summary["status"] == "success" + assert summary["trace"][0]["flags"]["kind"] == "external_pipeline" + assert result.metadata.get("source") == "external" + + +def test_safe_runner_include_trace_on_failure() -> None: + runner = SafeRunner(ExternalPipeline(), policy=PolicyEnforcer()) + with pytest.raises(GuardViolationError): + runner.run({"secret": "no"}, include_trace=True)