Production-grade runtime primitives for multi-step LLM agent loops — sync and async, with retry classification, circuit-breaker protection, per-model cost tracking, opt-in budget enforcement, role-based tool gating, content guardrails, agent-to-agent handoffs, declarative policy, durable checkpointing, token-aware rate limiting, and OpenTelemetry GenAI semantic conventions out of the box. Beta — 0.2.x; 0.x APIs remain explicitly unstable.
```bash
pip install techrevati-runtime
# Or with OpenTelemetry:
pip install 'techrevati-runtime[otel]'
```

```python
from techrevati.runtime import (
    Orchestrator, UsageSnapshot, ModelPricing, register_pricing,
)

# Pricing is empty by default, so register rates (USD per million tokens) first.
register_pricing("model-a", ModelPricing(input_per_million=3.0, output_per_million=15.0))

orch = Orchestrator(
    role="writer", phase="draft", project_id=1,
    budget_usd=10.0, enforce_budget=True, max_iterations=25,
)

with orch.session() as session:
    # call_model and prompt are your own model client and input.
    result, usage = session.run_turn(
        lambda: call_model(prompt),
        model="model-a",
        usage=UsageSnapshot(input_tokens=5000, output_tokens=1200),
        timeout=30.0,
    )
    print(session.summary())
```

The session walks the worker through INITIALIZING → RUNNING → COMPLETED, classifies any exception that bubbles up into a typed failure scenario, attempts recovery once, enforces the budget, gates tool calls behind permissions and guardrails, and emits structured events to any sink you configure, without you wiring any of it by hand.
Async sibling: replace `with` with `async with`, `session()` with `asession()`, and `run_turn` with `arun_turn`. Same parameters. `asyncio.CancelledError` cleanly transitions the worker to CANCELLED.
For an end-to-end example exercising every primitive (permissions + breaker + budget + guardrail + handoff + policy + OTel), see examples/tiny_agent.py and the end-to-end tutorial.
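The pricing registered in the quick start is expressed in USD per million tokens, so the cost of one turn can be checked by hand. The sketch below assumes a plain linear formula (tokens divided by one million, times the rate); `SketchPricing` and `turn_cost_usd` are hypothetical names for illustration, not part of the runtime's API, and the runtime's own calculation may differ in rounding or detail.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchPricing:
    """Hypothetical stand-in for per-model pricing, in USD per million tokens."""
    input_per_million: float
    output_per_million: float


def turn_cost_usd(pricing: SketchPricing, input_tokens: int, output_tokens: int) -> float:
    """Assumed formula: scale each token count to millions, multiply by its rate."""
    return (
        input_tokens / 1_000_000 * pricing.input_per_million
        + output_tokens / 1_000_000 * pricing.output_per_million
    )


pricing = SketchPricing(input_per_million=3.0, output_per_million=15.0)
cost = turn_cost_usd(pricing, input_tokens=5000, output_tokens=1200)
print(f"${cost:.4f}")  # $0.0330  (0.015 for input + 0.018 for output)

# How many identical turns fit inside a 10 USD budget under this formula.
turns_within_budget = int(10.0 // cost)
print(turns_within_budget)  # 303
```

Under these assumptions, the quick start's `budget_usd=10.0` would cover roughly 303 turns of that size before budget enforcement comes into play.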
- Zero runtime dependencies. Imports are stdlib only. OpenTelemetry is an optional `[otel]` extra.
- Type-safe. `py.typed` marker shipped; clean under `mypy --strict`.
- Composable. Every primitive (`CircuitBreaker`, `AsyncCircuitBreaker`, `RetryContext`, `QualityGate`, `PolicyEngine`, `UsageTracker`, `PermissionEnforcer`, `Guardrail`, `Handoff`) is usable standalone. The `Orchestrator` is just the wiring.
- Thread-safe and async-safe. `threading.Lock` in sync paths, `asyncio.Lock` in async paths. State is per-instance.
- Configuration-free at the edges. Pricing data is empty by default; phase thresholds are not hardcoded; permission roles are caller-defined. The runtime stays opinion-free about what your numbers mean.
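To make "lock-guarded, per-instance state" concrete, here is a minimal circuit-breaker sketch built only on the standard library. It is an illustration of the general CLOSED/OPEN/HALF_OPEN pattern, not the runtime's `CircuitBreaker`: the class `SketchBreaker`, its injectable `clock` parameter, and the use of `RuntimeError` for a tripped circuit are all assumptions, and unlike the real primitive it does not limit how many probe calls are admitted while half-open.

```python
import threading
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Illustrative breaker; all state lives on the instance behind one lock."""

    def __init__(self, failure_threshold=3, recovery_timeout_seconds=30.0,
                 clock=time.monotonic):
        self._lock = threading.Lock()
        self._threshold = failure_threshold
        self._timeout = recovery_timeout_seconds
        self._clock = clock  # injectable so the demo does not need to sleep
        self._failures = 0
        self._opened_at = 0.0
        self._state = State.CLOSED

    def _maybe_half_open(self):
        # Caller must hold the lock. OPEN becomes HALF_OPEN once the timeout elapses.
        if self._state is State.OPEN and self._clock() - self._opened_at >= self._timeout:
            self._state = State.HALF_OPEN

    @property
    def state(self):
        with self._lock:
            self._maybe_half_open()
            return self._state

    def call(self, fn, *args, **kwargs):
        with self._lock:
            self._maybe_half_open()
            if self._state is State.OPEN:
                raise RuntimeError("circuit open")
        try:
            result = fn(*args, **kwargs)  # run the protected call outside the lock
        except Exception:
            with self._lock:
                self._failures += 1
                # A failed probe, or too many failures, (re)opens the circuit.
                if self._state is State.HALF_OPEN or self._failures >= self._threshold:
                    self._state = State.OPEN
                    self._opened_at = self._clock()
            raise
        with self._lock:
            self._failures = 0
            self._state = State.CLOSED
        return result


def always_fails():
    raise ValueError("boom")


now = [0.0]  # fake clock, advanced by hand
breaker = SketchBreaker(failure_threshold=2, recovery_timeout_seconds=30.0,
                        clock=lambda: now[0])
for _ in range(2):
    try:
        breaker.call(always_fails)
    except ValueError:
        pass
print(breaker.state.name)  # OPEN
now[0] = 31.0
print(breaker.state.name)  # HALF_OPEN
breaker.call(lambda: "ok")
print(breaker.state.name)  # CLOSED
```

Because the counters sit on the instance, two `SketchBreaker` objects never share failures; the same reasoning explains why breaker state in any such design is local to the process that owns the object.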
| Module | Provides |
|---|---|
| `orchestrator` | `Orchestrator`, `OrchestrationSession`, `AsyncOrchestrationSession`, `AgentSession` |
| `circuit_breaker` | `CircuitBreaker`, `AsyncCircuitBreaker` (CLOSED/OPEN/HALF_OPEN with configurable probe permits) |
| `retry_policy` | `classify_exception`, `attempt_recovery` (sync + async), `backoff_delay` with full/equal/decorrelated jitter |
| `usage_tracking` | `UsageTracker`, `register_pricing`, `load_pricing_from_file`, `BudgetExceededError`, `has_pricing` |
| `agent_lifecycle` | `AgentRegistry`, `AgentWorker` with validated state machine including CANCELLED |
| `agent_events` | Typed lifecycle events + OpenTelemetry attribute bridge |
| `permissions` | Role × tool authorization, deny-first |
| `guardrails` | Pre-call + post-call content gating around `run_tool` / `arun_tool` |
| `handoffs` | `Handoff` value + `session.handoff_to()` agent-to-agent delegation |
| `policy_engine` | Composable conditions and rule evaluator with auto-elapsed time |
| `sinks` | `EventSink` / `UsageSink` Protocols + ring-buffered defaults |
| `otel` (optional) | `OpenTelemetrySink` + `OpenTelemetryUsageSink` emitting GenAI semconv spans/metrics |
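The table mentions full, equal, and decorrelated jitter for backoff. The sketch below shows the commonly described formulas for those three strategies. It does not use or reproduce `backoff_delay`, whose signature is not shown here; the function names and the `base`, `cap`, and `rng` parameters are assumptions for illustration only.

```python
import random


def exp_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Capped exponential backoff without jitter: base * 2^attempt, at most cap."""
    return min(cap, base * 2 ** attempt)


def full_jitter(attempt: int, base: float = 0.5, cap: float = 30.0, rng=random) -> float:
    """Uniform over [0, exponential delay]; spreads retries the most."""
    return rng.uniform(0.0, exp_delay(attempt, base, cap))


def equal_jitter(attempt: int, base: float = 0.5, cap: float = 30.0, rng=random) -> float:
    """Half the exponential delay is fixed, the other half is random."""
    half = exp_delay(attempt, base, cap) / 2
    return half + rng.uniform(0.0, half)


def decorrelated_jitter(previous: float, base: float = 0.5, cap: float = 30.0,
                        rng=random) -> float:
    """Next delay depends on the previous delay instead of the attempt number."""
    return min(cap, rng.uniform(base, previous * 3))


rng = random.Random(7)  # seeded so repeated runs draw the same delays
delay = 0.5             # decorrelated jitter starts from the base delay
for attempt in range(5):
    delay = decorrelated_jitter(delay, rng=rng)
    print(
        f"attempt={attempt} "
        f"full={full_jitter(attempt, rng=rng):.2f}s "
        f"equal={equal_jitter(attempt, rng=rng):.2f}s "
        f"decorrelated={delay:.2f}s"
    )
```

Full jitter can return delays close to zero, equal jitter guarantees at least half the exponential delay, and decorrelated jitter grows from the previous sleep rather than the attempt count; which one fits depends on how tightly clustered the retrying clients are.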
```python
import asyncio
from techrevati.runtime import (
    AllowAllGuardrail, AsyncCircuitBreaker, Orchestrator, UsageSnapshot,
)

cb = AsyncCircuitBreaker("model-api", failure_threshold=3, recovery_timeout_seconds=30.0)

async def main():
    orch = Orchestrator(
        role="writer", phase="draft",
        async_circuit_breaker=cb,
        guardrails=[AllowAllGuardrail()],
        max_iterations=10,
    )
    async with orch.asession() as session:
        text, _ = await session.arun_turn(
            lambda: acall_model(prompt),
            model="model-a",
            usage=UsageSnapshot(input_tokens=5000, output_tokens=1200),
            timeout=30.0,
        )
        handoff = session.handoff_to("editor", reason="review", context={"draft": text})
        print(f"handed off to {handoff.target_role}")

asyncio.run(main())
```

```python
from techrevati.runtime import Orchestrator
from techrevati.runtime.otel import OpenTelemetrySink, OpenTelemetryUsageSink

orch = Orchestrator(
    role="writer", phase="draft",
    event_sink=OpenTelemetrySink(agent_id="writer-001"),
    usage_sink=OpenTelemetryUsageSink(),
)
# Every AgentEvent now appears as an OTel span with gen_ai.operation.name,
# gen_ai.agent.id, gen_ai.usage.{input,output}_tokens. Drop-in compatible
# with any APM ingest that already understands GenAI semconv.
```

See docs/api/otel.md for the full attribute list and span name mapping.
Pick just what you need. Each primitive is usable on its own without Orchestrator.
```python
from techrevati.runtime import (
    CircuitBreaker, CircuitOpenError,
    UsageTracker, UsageSnapshot,
    classify_exception, attempt_recovery, RecoveryContext,
)

cb = CircuitBreaker("downstream", failure_threshold=5, recovery_timeout_seconds=60.0)
result = cb.call(fetch, url, timeout=10)  # raises CircuitOpenError if tripped

ctx = RecoveryContext()
scenario = classify_exception(my_error)
recovery = attempt_recovery(scenario, ctx)  # returns RecoveryResult with steps to retry

tracker = UsageTracker()
tracker.record_turn("model-a", UsageSnapshot(input_tokens=5000, output_tokens=1200))
print(tracker.format_cost())
```

techrevati-runtime is intentionally smaller and narrower than either:
- LangGraph is a workflow engine with durable execution, checkpointer protocols, and a graph model. Use it when your agent flow is a graph that needs to survive restarts and you're OK with the LangChain ecosystem footprint.
- OpenAI Agents SDK is a cohesive runtime tied to OpenAI's models, with default tracing through their dashboards. Use it when you're committed to OpenAI and want the smoothest path.
- `techrevati-runtime` is a zero-dep primitive set. Sync + async. Vendor-neutral. Emits OpenTelemetry GenAI semantic conventions, so the same APM dashboards that consume OpenAI Agents SDK telemetry will pick us up too. Bring your own model client and your own persistence; the runtime stays opinion-free.
The runtime ships a pluggable CheckpointSaver protocol with InMemorySaver and SqliteSaver implementations (0.2.0) — enough for resume-from-checkpoint replay across restarts. It is still not a full durable workflow engine in the Temporal sense; pair with Temporal, dbos, or LangGraph's checkpointer if you need cross-host scheduling, retries-as-history, or a durable timer service.
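As a rough picture of what resume-from-checkpoint replay involves, the sketch below stores one JSON state per step in SQLite and restarts a loop after the last saved step. It is not the runtime's `CheckpointSaver` protocol or `SqliteSaver`: the names `SketchSaver`, `SketchSqliteSaver`, `put`, `latest`, and `run_steps`, and the table layout, are all hypothetical and chosen only for illustration.

```python
import json
import sqlite3
from typing import Any, Optional, Protocol


class SketchSaver(Protocol):
    """Hypothetical saver interface: write a step, read back the newest one."""

    def put(self, thread_id: str, step: int, state: dict[str, Any]) -> None: ...

    def latest(self, thread_id: str) -> Optional[tuple[int, dict[str, Any]]]: ...


class SketchSqliteSaver:
    def __init__(self, path: str = ":memory:") -> None:
        # Use a file path instead of ":memory:" for state that survives a restart.
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "thread_id TEXT NOT NULL, step INTEGER NOT NULL, state TEXT NOT NULL, "
            "PRIMARY KEY (thread_id, step))"
        )

    def put(self, thread_id: str, step: int, state: dict[str, Any]) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
                (thread_id, step, json.dumps(state)),
            )

    def latest(self, thread_id: str) -> Optional[tuple[int, dict[str, Any]]]:
        row = self._conn.execute(
            "SELECT step, state FROM checkpoints "
            "WHERE thread_id = ? ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
        return None if row is None else (row[0], json.loads(row[1]))


def run_steps(saver: SketchSaver, thread_id: str, total_steps: int) -> dict[str, Any]:
    """Resume after the last saved step instead of starting from zero."""
    checkpoint = saver.latest(thread_id)
    start = 0 if checkpoint is None else checkpoint[0] + 1
    state = {"done": []} if checkpoint is None else checkpoint[1]
    for step in range(start, total_steps):
        state["done"].append(step)         # stand-in for real agent work
        saver.put(thread_id, step, state)  # persist after every step
    return state


saver = SketchSqliteSaver()
print(run_steps(saver, "thread-1", 2))  # {'done': [0, 1]}
print(run_steps(saver, "thread-1", 4))  # {'done': [0, 1, 2, 3]}
```

The second call picks up at step 2 because the stable thread identifier lets it find the earlier checkpoint. Nothing here schedules work across hosts or keeps durable timers, which is the gap the paragraph above points at.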
- Pricing must be registered. The bundled `pricing.json` is intentionally empty. Without `register_pricing()` or `load_pricing_from_file()`, every cost calculation returns $0.00 (you will see a one-time warning per model).
- Budget enforcement is opt-in. Set `Orchestrator(enforce_budget=True)` to raise `BudgetExceededError`; the default merely records an event and continues.
- Permissions are advisory. `OrchestrationSession.run_tool()` enforces; `run_turn()` does not gate model calls. There is no sandbox, so pair with OS-level isolation if needed.
- Durable execution is opt-in. Default sessions are in-memory; pass a `CheckpointSaver` (e.g. `SqliteSaver`) plus a stable `thread_id` to get resume-from-checkpoint replay. Pair with Temporal/dbos if you need cross-host scheduling or durable timers.
- Default sinks are in-memory ring buffers. Long-running sessions need a durable `EventSink` and `UsageSink` (e.g. `OpenTelemetrySink`, or your own).
- `CircuitBreaker` state is per-process. Each replica counts its own failures. Add a shared coordinator if you need fleet-wide breaker state.
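The ring-buffer caveat is easy to see in a few lines. The sketch below is a generic bounded buffer built on `collections.deque`, not the runtime's default sink: `SketchRingSink`, `emit`, `snapshot`, and the `dropped` counter are hypothetical names used only to show how the oldest events disappear once capacity is reached.

```python
from collections import deque


class SketchRingSink:
    """Illustrative bounded sink: keeps only the newest `capacity` events."""

    def __init__(self, capacity: int) -> None:
        self._events: deque = deque(maxlen=capacity)
        self.dropped = 0  # how many old events were evicted

    def emit(self, event: dict) -> None:
        if len(self._events) == self._events.maxlen:
            self.dropped += 1        # the append below evicts the oldest entry
        self._events.append(event)

    def snapshot(self) -> list:
        return list(self._events)


sink = SketchRingSink(capacity=3)
for seq in range(5):
    sink.emit({"seq": seq})

print([event["seq"] for event in sink.snapshot()])  # [2, 3, 4]
print(sink.dropped)                                  # 2
```

A session that emits more events than the buffer holds silently loses its earliest history, which is why long-running sessions are better served by a durable sink.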
techrevati-runtime is at version 0.2.0 (beta). This release ships durable execution (`CheckpointSaver` + `SqliteSaver`), token-aware rate limiting (`RateLimiter` / `AsyncRateLimiter`), provider routing, per-session `UsageLimits`, nested OTel agent spans, persistent SQLite sinks, and supply-chain hardening (CycloneDX SBOM + CodeQL + zero-deps smoke). The `AgentSession` rename and the OTel wire-format change are the two soft-breaking items; see docs/migrating-from-0.1.x.md. 0.x APIs remain unstable; breaking changes will continue to be gated by deprecation warnings. The package pins Python 3.11+ for `from __future__ import annotations` ergonomics and modern asyncio.
See CHANGELOG.md for the per-sprint release notes and docs/tutorials/end-to-end.md for a guided tour of every primitive.
Issues and PRs welcome — see CONTRIBUTING.md and SECURITY.md.
MIT — copyright © 2026 TechRevati doo. See LICENSE.