diff --git a/examples/01-linear-pipeline/main.py b/examples/01-linear-pipeline/main.py deleted file mode 100644 index 8eb2660..0000000 --- a/examples/01-linear-pipeline/main.py +++ /dev/null @@ -1,201 +0,0 @@ -"""Minimal openarmature demo: 2-node graph (plan → write) driven by a local vLLM. - -**Use case:** Take a topic (e.g. "the psychology of long walks") and produce -a short written piece — first plan a few angles, then write the article. - -**Demonstrates:** The minimal graph shape — typed `State`, the `append` -reducer, static edges, `END`, a two-node linear `plan → write` pipeline. - -Run with: - uv run python main.py "the psychology of long walks" -""" - -from __future__ import annotations - -import asyncio -import sys -from collections.abc import Mapping -from typing import Annotated, Any - -from openai import AsyncOpenAI -from openai.types.chat import ( - ChatCompletionMessageParam, - ChatCompletionSystemMessageParam, - ChatCompletionUserMessageParam, -) -from pydantic import Field - -from openarmature.graph import END, CompiledGraph, GraphBuilder, State, append - -VLLM_BASE_URL = "http://localhost:8000/v1" -MODEL = "dark-side-of-the-code/Mistral-Small-24B-Instruct-2501-AWQ" - -client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="not-needed") - - -# ---------------------------------------------------------------------------- -# State schema -# ---------------------------------------------------------------------------- -# `State` is the immutable, strictly-typed object that flows through the graph. -# Every node receives an instance and returns a partial-update dict; the engine -# merges each update via per-field reducers and re-validates the result. -# -# Why this shape? The graph is a pipeline of pure state transitions. Nodes -# read a snapshot, emit a diff, and never mutate shared state — which makes -# "what was state when node X ran?" a question with a single clean answer. 
-# Under the hood, `State` is a Pydantic BaseModel with -# `model_config = ConfigDict(frozen=True, extra="forbid")` pre-baked into the -# base class — you don't touch `model_config` yourself. -# -# Non-obvious bits (see _docs/concepts.md for more): -# - Instances are FROZEN — nodes can't mutate `s.plan = ...`. They return -# {"plan": ...} and the engine applies the reducer. -# - Extra fields are FORBIDDEN — a node that returns {"typo": 1} raises -# StateValidationError instead of silently dropping the key. -# - Reducers attach per-field via Annotated[T, reducer]. Unannotated fields -# default to `last_write_wins` (new value replaces old). -# - Mutable defaults (list/dict) need Field(default_factory=...) — pydantic -# gotcha, not openarmature-specific. - - -class GraphState(State): - topic: str - plan: str = "" - output: str = "" - trace: Annotated[list[str], append] = Field(default_factory=list) - - -# ---------------------------------------------------------------------------- -# LLM client helper (not openarmature — just consumer plumbing) -# ---------------------------------------------------------------------------- -# Nothing below is graph-engine-specific; it's an OpenAI-compatible call to -# the local vLLM. Skim past this to the node functions if you're reading for -# openarmature concepts. - - -async def _chat(system: str, user: str) -> str: - messages: list[ChatCompletionMessageParam] = [ - ChatCompletionSystemMessageParam(role="system", content=system), - ChatCompletionUserMessageParam(role="user", content=user), - ] - resp = await client.chat.completions.create( - model=MODEL, - messages=messages, - temperature=0.4, - stream=False, - ) - return (resp.choices[0].message.content or "").strip() - - -# ---------------------------------------------------------------------------- -# Node functions -# ---------------------------------------------------------------------------- -# A node is `async def name(state) -> dict-of-partial-updates`. 
It reads the -# immutable state snapshot it was handed and returns *only* the fields it -# wants to change — the engine merges each field via its reducer. -# -# The idea: nodes are dumb and local. "I produced this plan." "I added this -# line to the trace." A node doesn't construct the next state, doesn't know -# what ran before it, and doesn't know what runs after. The graph is what -# composes them; each node stays a plain async function you could unit-test -# in isolation. -# -# For reducer-tracked fields (e.g. `trace: Annotated[list[str], append]`), -# return only the increment — `{"trace": ["plan"]}`, NOT -# `{"trace": s.trace + ["plan"]}`. The `append` reducer does the -# concatenation; returning the full list causes duplication. - - -async def plan_node(s: GraphState) -> Mapping[str, Any]: - content = await _chat( - system="You are a concise outliner. Respond with exactly 3 bullet points, no preamble.", - user=f"Outline the following topic in 3 bullets:\n\n{s.topic}", - ) - return {"plan": content, "trace": ["plan"]} - - -async def write_node(s: GraphState) -> Mapping[str, Any]: - content = await _chat( - system="You are an essayist. Write in prose, one tight paragraph, no bullet points, no headers.", - user=( - f"Topic: {s.topic}\n\nOutline:\n{s.plan}\n\n" - "Write a short paragraph (~120 words) that expands on the outline." - ), - ) - return {"output": content, "trace": ["write"]} - - -# ---------------------------------------------------------------------------- -# Graph construction -# ---------------------------------------------------------------------------- -# `GraphBuilder` is a mutable builder; `.compile()` turns it into an -# immutable `CompiledGraph` that's ready to run. The chain below does four -# things: -# 1. Register nodes under names — `.add_node("plan", plan_node)`. The name -# is what you reference in edges; the fn is the async callable. -# 2. 
Connect them with static edges — `.add_edge("plan", "write")` means -# "after `plan` runs and its update is merged, go to `write`". -# 3. Terminate with END — `.add_edge("write", END)` marks `write` as the -# last step, so the engine halts and `invoke()` returns. -# 4. Declare the entry point — `.set_entry("plan")` — where execution -# begins. -# -# Then `.compile()` runs the structural checks (no unreachable nodes, no -# dangling edges, no duplicate reducers on a field, etc.) and returns a -# `CompiledGraph`. Any problem with the graph's shape surfaces HERE, not at -# runtime — failing fast at the construction boundary. -# -# Each node has exactly ONE outgoing edge. Branching isn't done with multiple -# static edges; it's done with a single `.add_conditional_edge(source, fn)` where -# `fn(state) -> next_node_name`. We're linear here, so no conditional. -# -# `END` is a sentinel object, not a reserved string. Import it from -# `openarmature.graph`; don't use the literal `"END"` — it would be treated -# as a node name and fail `DanglingEdge` at compile time. - - -def build_graph() -> CompiledGraph[GraphState]: - return ( - GraphBuilder(GraphState) - .add_node("plan", plan_node) - .add_node("write", write_node) - .add_edge("plan", "write") - .add_edge("write", END) - .set_entry("plan") - .compile() - ) - - -# ---------------------------------------------------------------------------- -# Invoking the graph -# ---------------------------------------------------------------------------- -# `graph.invoke(initial_state)` runs the compiled graph from entry to END -# and returns the final state. It's `async` because nodes can (and usually -# do) perform IO. -# -# Initial state: construct an instance of your state class with the required -# fields filled in. Here that's just `GraphState(topic=topic)` — the other -# fields have schema-level defaults, so when `plan` (the entry node) is -# called it sees empty `plan`, empty `output`, and empty `trace`. 
-# -# `GraphBuilder` and `CompiledGraph` are generic on the state type — -# `build_graph()` is annotated as returning `CompiledGraph[GraphState]`, so -# `await graph.invoke(...)` returns a `GraphState`, not the base `State`. -# Typed field access (`final.topic`) works without a `cast()` on the return. -# (Earlier versions of the library required `cast(GraphState, ...)`; see -# `_docs/rough-edges.md` for the history.) - - -async def main() -> None: - topic = " ".join(sys.argv[1:]) or "the psychology of long walks" - graph = build_graph() - final = await graph.invoke(GraphState(topic=topic)) - - print(f"topic: {final.topic}\n") - print(f"plan:\n{final.plan}\n") - print(f"output:\n{final.output}\n") - print(f"trace: {final.trace}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/02-routing-and-subgraphs/main.py b/examples/01-routing-and-subgraphs/main.py similarity index 85% rename from examples/02-routing-and-subgraphs/main.py rename to examples/01-routing-and-subgraphs/main.py index 557fdc0..bfe0821 100644 --- a/examples/02-routing-and-subgraphs/main.py +++ b/examples/01-routing-and-subgraphs/main.py @@ -10,8 +10,7 @@ custom `ProjectionStrategy` for the parent ↔ subgraph boundary, and the `merge` reducer for dict accumulation. -This is the second demo in the series. It exercises three graph features -that `01-linear-pipeline/` didn't: +Three graph features that `00-hello-world` only touched lightly: 1. **Conditional edges.** The entry node classifies the question and the graph routes to one of two branches based on that classification. @@ -22,27 +21,33 @@ from its own schema's defaults. To pass the user's question in (and shape what comes back out), we write a `ProjectionStrategy` by hand. -And for good measure it also demonstrates the **merge reducer** for dict -accumulation, alongside the **append reducer** already seen in 01-linear-pipeline. 
+LLM calls go through ``openarmature.llm.OpenAIProvider`` (same pattern as +``00-hello-world``) so the example reads as the recommended path rather +than as "openai with some openarmature on top." + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root + only** — the provider adds the path itself. +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). Run with: - uv run python main.py "what year did the moon landing happen" # → quick branch - uv run python main.py "is espresso actually more caffeinated than drip?" # → research branch + + uv sync --group examples + cd examples/01-routing-and-subgraphs + LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen" + LLM_API_KEY=sk-... uv run python main.py "is espresso actually more caffeinated than drip?" """ from __future__ import annotations import asyncio +import os import sys from collections.abc import Mapping from typing import Annotated, Any -from openai import AsyncOpenAI -from openai.types.chat import ( - ChatCompletionMessageParam, - ChatCompletionSystemMessageParam, - ChatCompletionUserMessageParam, -) from pydantic import Field from openarmature.graph import ( @@ -54,11 +59,22 @@ append, merge, ) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage + +# Lazy-initialized so importing this module (test harnesses, doc builders, +# IDE inspection) doesn't open an httpx.AsyncClient connection pool. 
+_provider_instance: OpenAIProvider | None = None -VLLM_BASE_URL = "http://localhost:8000/v1" -MODEL = "dark-side-of-the-code/Mistral-Small-24B-Instruct-2501-AWQ" -client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="not-needed") +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance # ---------------------------------------------------------------------------- @@ -77,9 +93,9 @@ # - Boundaries are auditable. To find "what does the subgraph see?" you read # one projection class, not a scattered naming convention. # -# Both schemas below use the same reducer patterns we introduced in -# 01-linear-pipeline: `append` on a `trace` list, `merge` on a dict. Fields without -# an `Annotated[..., reducer]` get `last_write_wins` by default. +# Both schemas below use the standard reducer set: `append` on the +# `trace` list, `merge` on a dict. Fields without an +# `Annotated[..., reducer]` get `last_write_wins` by default. class AssistantState(State): @@ -103,30 +119,27 @@ class ResearchState(State): # ---------------------------------------------------------------------------- -# LLM helper (not openarmature — plumbing) +# LLM helper # ---------------------------------------------------------------------------- +# Thin wrapper over Provider.complete that takes a system + user pair and +# returns the assistant's reply as a string. Keeps the node bodies focused +# on graph logic (state in → state update out) rather than provider +# plumbing. Production code would typically inline the call. 
async def _chat(system: str, user: str) -> str: - messages: list[ChatCompletionMessageParam] = [ - ChatCompletionSystemMessageParam(role="system", content=system), - ChatCompletionUserMessageParam(role="user", content=user), - ] - resp = await client.chat.completions.create( - model=MODEL, - messages=messages, - temperature=0.3, - stream=False, + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], ) - return (resp.choices[0].message.content or "").strip() + return (response.message.content or "").strip() # ---------------------------------------------------------------------------- # Outer-graph nodes # ---------------------------------------------------------------------------- -# Every node is the same shape as in 01-linear-pipeline: `async def(state) -> dict`, -# returning ONLY the fields it wants to change. The engine applies per-field -# reducers and re-validates. +# Standard node shape: `async def(state) -> dict`, returning ONLY the +# fields it wants to change. The engine applies per-field reducers and +# re-validates. # # Three things worth noticing as you read these: # @@ -334,9 +347,8 @@ def build_research_subgraph() -> CompiledGraph[ResearchState]: # # - `project_in`: DELIBERATELY LIMITED. It builds a fresh subgraph state # from its schema's defaults — `subgraph_state_cls()`. The parent's -# state is ignored. This is the spec's default and it's an explicit -# design choice (v0.1.1 §2 Subgraph): subgraphs don't see the outer -# world unless the author opts in. +# state is ignored. Subgraphs don't see the outer world unless the +# author opts in — encapsulation is the point. # # For this demo we absolutely need the question in the subgraph. 
So we write # a projection class that implements the `ProjectionStrategy` Protocol (see @@ -444,15 +456,19 @@ def build_graph() -> CompiledGraph[AssistantState]: async def main() -> None: question = " ".join(sys.argv[1:]) or "is espresso actually more caffeinated than drip coffee?" graph = build_graph() - final = await graph.invoke(AssistantState(question=question)) - - print(f"question: {final.question}") - print(f"route: {final.route}") - print() - print(f"answer:\n{final.answer}") - print() - print(f"trace: {final.trace}") - print(f"tallies: {final.tallies}") + try: + final = await graph.invoke(AssistantState(question=question)) + print(f"question: {final.question}") + print(f"route: {final.route}") + print() + print(f"answer:\n{final.answer}") + print() + print(f"trace: {final.trace}") + print(f"tallies: {final.tallies}") + finally: + await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() if __name__ == "__main__": diff --git a/examples/03-explicit-subgraph-mapping/main.py b/examples/02-explicit-subgraph-mapping/main.py similarity index 76% rename from examples/03-explicit-subgraph-mapping/main.py rename to examples/02-explicit-subgraph-mapping/main.py index f31fa40..717bf06 100644 --- a/examples/03-explicit-subgraph-mapping/main.py +++ b/examples/02-explicit-subgraph-mapping/main.py @@ -5,37 +5,41 @@ by running the same analysis subgraph on each, then synthesizing a verdict. **Demonstrates:** One compiled subgraph reused at two parent sites with -per-site `ExplicitMapping` — the case spec v0.2 / proposal 0002 was written -for, and the only way to express "run the same subgraph twice on disjoint -parent fields" without per-site projection classes that mirror each other. +per-site `ExplicitMapping` — the canonical way to express "run the same +subgraph twice on disjoint parent fields" without writing per-site +projection classes that mirror each other. -This is the case proposal 0002 was written for. 
Without explicit input/output -mapping, both sites would have to read from and write to the same parent -fields under name matching — making "run the same subgraph twice on different -inputs" structurally impossible. The two analyze_a/analyze_b sites here share -the SAME compiled subgraph value but project different parent fields in and -different parent fields out. +Without explicit input/output mapping, both sites would have to read from +and write to the same parent fields under name matching — making "run the +same subgraph twice on different inputs" structurally impossible. The two +analyze_a/analyze_b sites here share the SAME compiled subgraph value but +project different parent fields in and different parent fields out. + +LLM calls go through ``openarmature.llm.OpenAIProvider``. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). Run with: - uv run python main.py "rust" "go" - uv run python main.py "espresso vs drip coffee" - uv run python main.py # → defaults to rust vs go + + uv sync --group examples + cd examples/02-explicit-subgraph-mapping + LLM_API_KEY=sk-... uv run python main.py "rust" "go" + LLM_API_KEY=sk-... 
uv run python main.py "espresso vs drip coffee" """ from __future__ import annotations import asyncio +import os import re import sys from collections.abc import Mapping from typing import Annotated, Any -from openai import AsyncOpenAI -from openai.types.chat import ( - ChatCompletionMessageParam, - ChatCompletionSystemMessageParam, - ChatCompletionUserMessageParam, -) from pydantic import Field from openarmature.graph import ( @@ -46,11 +50,20 @@ State, append, ) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage + +_provider_instance: OpenAIProvider | None = None -VLLM_BASE_URL = "http://localhost:8000/v1" -MODEL = "dark-side-of-the-code/Mistral-Small-24B-Instruct-2501-AWQ" -client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="not-needed") +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance # ---------------------------------------------------------------------------- @@ -66,7 +79,7 @@ # parent's per-side fields. # # This separation is the whole point. If the parent and subgraph shared field -# names (`summary`, `score`) and we relied on the spec's default field-name +# names (`summary`, `score`) and we relied on default field-name # matching, the two subgraph calls would BOTH write to a single # `parent.summary` field — the second call would clobber the first, and the # comparator at the end would have no way to see both. 
With explicit mapping @@ -96,22 +109,15 @@ class AnalysisState(State): # ---------------------------------------------------------------------------- -# LLM helper (plumbing — not openarmature) +# LLM helper # ---------------------------------------------------------------------------- async def _chat(system: str, user: str) -> str: - messages: list[ChatCompletionMessageParam] = [ - ChatCompletionSystemMessageParam(role="system", content=system), - ChatCompletionUserMessageParam(role="user", content=user), - ] - resp = await client.chat.completions.create( - model=MODEL, - messages=messages, - temperature=0.3, - stream=False, + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], ) - return (resp.choices[0].message.content or "").strip() + return (response.message.content or "").strip() # ---------------------------------------------------------------------------- @@ -199,10 +205,10 @@ async def synthesize(s: ComparisonState) -> Mapping[str, Any]: # with no way to express "this site reads A, that site reads B." Both # sites would write `parent.summary` and clobber each other. # -# - A custom `ProjectionStrategy` (the 02-routing-and-subgraphs approach) +# - A custom `ProjectionStrategy` (the 01-routing-and-subgraphs approach) # would have to differ per call site — you'd write two distinct projection # classes that do the same thing in mirror image. That's exactly the -# boilerplate proposal 0002 removes. +# boilerplate `ExplicitMapping` removes. # # - The subgraph can't rename its own fields to avoid the clash: its schema # is fixed at compile time and shared across both call sites. Wrong layer. 
@@ -259,19 +265,24 @@ async def main() -> None: topic_a, topic_b = "rust", "go" graph = build_graph() - final = await graph.invoke(ComparisonState(topic_a=topic_a, topic_b=topic_b)) - - print(f"topic A: {final.topic_a}") - print(f" summary: {final.a_summary}") - print(f" score: {final.a_score}/10") - print() - print(f"topic B: {final.topic_b}") - print(f" summary: {final.b_summary}") - print(f" score: {final.b_score}/10") - print() - print(f"verdict:\n{final.verdict}") - print() - print(f"trace: {final.trace}") + try: + final = await graph.invoke(ComparisonState(topic_a=topic_a, topic_b=topic_b)) + + print(f"topic A: {final.topic_a}") + print(f" summary: {final.a_summary}") + print(f" score: {final.a_score}/10") + print() + print(f"topic B: {final.topic_b}") + print(f" summary: {final.b_summary}") + print(f" score: {final.b_score}/10") + print() + print(f"verdict:\n{final.verdict}") + print() + print(f"trace: {final.trace}") + finally: + await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() if __name__ == "__main__": diff --git a/examples/04-observer-hooks/main.py b/examples/03-observer-hooks/main.py similarity index 69% rename from examples/04-observer-hooks/main.py rename to examples/03-observer-hooks/main.py index 5c079ab..abf0a47 100644 --- a/examples/04-observer-hooks/main.py +++ b/examples/03-observer-hooks/main.py @@ -1,36 +1,51 @@ -"""openarmature demo: observer hooks for structured logging + per-call metrics. +"""openarmature demo: observer hooks for structured logging, per-call metrics, and OTel spans. **Use case:** Add observability to a small three-stage answer pipeline (an outer `draft → review → finalize` flow where `review` is its own subgraph) -without changing any node code. A graph-attached console tracer prints -every node-boundary event to stderr; an invocation-scoped metrics -collector tallies counts for THIS specific call. +without changing any node code. 
Three observer flavors run side-by-side: -**Demonstrates:** Observer hooks (spec v0.3 / proposal 0003) — registering -graph-attached and invocation-scoped observers, the `NodeEvent` shape, -namespace chaining across a subgraph boundary, the `drain()` call required -for short-lived processes, and how observers see structured pre/post state -without nodes having to log anything themselves. + 1. A **graph-attached console tracer** that prints every node-boundary event + to stderr as a structured one-liner. + 2. An **invocation-scoped metrics collector** that tallies counts for THIS + specific call. + 3. The **OTel observer** wired to a console span exporter, so the same + boundaries surface as OpenTelemetry spans. + +**Demonstrates:** Observer hooks — registering graph-attached and +invocation-scoped observers, the `NodeEvent` shape, namespace chaining +across a subgraph boundary, the `drain()` call required for short-lived +processes, and how observers see structured pre/post state without nodes +having to log anything themselves. Also covers the OTel mapping: the +`OTelObserver` is just another observer registration; the same events +turn into spans on its private TracerProvider. + +LLM calls go through ``openarmature.llm.OpenAIProvider``. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). Run with: - uv run python main.py "what year did the moon landing happen" - uv run python main.py "explain the rise of espresso culture" - uv run python main.py # → uses default question + + uv sync --group examples --all-extras + cd examples/03-observer-hooks + LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen" + LLM_API_KEY=sk-... 
uv run python main.py "explain the rise of espresso culture" + +(``--all-extras`` pulls in ``opentelemetry-sdk`` for the OTel observer.) """ from __future__ import annotations import asyncio +import os import sys from collections.abc import Mapping from typing import Annotated, Any -from openai import AsyncOpenAI -from openai.types.chat import ( - ChatCompletionMessageParam, - ChatCompletionSystemMessageParam, - ChatCompletionUserMessageParam, -) +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor from pydantic import Field from openarmature.graph import ( @@ -43,11 +58,21 @@ State, append, ) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage +from openarmature.observability.otel import OTelObserver + +_provider_instance: OpenAIProvider | None = None -VLLM_BASE_URL = "http://localhost:8000/v1" -MODEL = "dark-side-of-the-code/Mistral-Small-24B-Instruct-2501-AWQ" -client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="not-needed") +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance # ---------------------------------------------------------------------------- @@ -57,7 +82,7 @@ # fields we want to flow across the boundary (`draft`, `revised`, `trace`) # and each adds its own (`question` outside, `critique` inside). Keeping # the fields aligned by name lets the subgraph's outputs flow back through -# the spec's default field-name matching — except for `draft`, which we +# default field-name matching — except for `draft`, which we # DO need to project IN. Hence the `inputs={"draft": "draft"}` mapping # below; absent `outputs` falls back to field-name matching for the way # back, projecting `revised` and `trace`. 
@@ -82,22 +107,15 @@ class ReviewState(State): # ---------------------------------------------------------------------------- -# LLM helper (plumbing — not openarmature) +# LLM helper # ---------------------------------------------------------------------------- async def _chat(system: str, user: str) -> str: - messages: list[ChatCompletionMessageParam] = [ - ChatCompletionSystemMessageParam(role="system", content=system), - ChatCompletionUserMessageParam(role="user", content=user), - ] - resp = await client.chat.completions.create( - model=MODEL, - messages=messages, - temperature=0.3, - stream=False, + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], ) - return (resp.choices[0].message.content or "").strip() + return (response.message.content or "").strip() # ---------------------------------------------------------------------------- @@ -259,21 +277,40 @@ def build_graph() -> CompiledGraph[AnswerState]: # The shape of an observer-aware run: # # 1. Build the graph. -# 2. Attach graph-level observers (console_tracer here) — these fire on -# every invoke of this compiled graph. +# 2. Attach graph-level observers (console_tracer + OTelObserver here) +# — these fire on every invoke of this compiled graph. # 3. For each invoke, optionally pass invocation-scoped observers # (metrics here) — they fire only for THAT invocation. # 4. await drain() before exiting. The graph dispatches events to a # background queue; without drain, a short-lived process can exit # before the queue's worker has delivered them. In a long-running # service this isn't necessary because the event loop keeps running. +# +# The OTel observer is just another observer registration. It speaks the +# same `Observer` Protocol; the difference is what it does with each event: +# it opens / closes spans on a private TracerProvider, threading parent / +# child / fan-out relationships. 
Wiring it next to the bare async function +# above shows the point: observability backends are pluggable behind one +# uniform hook. async def main() -> None: question = " ".join(sys.argv[1:]) or "what year did the moon landing happen" + # OTel observer with a console span exporter — every span prints to + # stdout as a JSON blob when it closes. SimpleSpanProcessor exports + # synchronously which is right for a short-lived demo; production + # would use BatchSpanProcessor against a real OTLP exporter. The + # provider here is PRIVATE to the observer; the global + # TracerProvider is untouched, so this won't pollute any OTel + # setup the surrounding application already has. + otel_observer = OTelObserver( + span_processor=SimpleSpanProcessor(ConsoleSpanExporter()), + ) + graph = build_graph() graph.attach_observer(console_tracer) + graph.attach_observer(otel_observer) metrics = InvocationMetrics() try: @@ -285,11 +322,13 @@ async def main() -> None: # Required for short-lived processes: invoke() returns when the # graph reaches END regardless of whether the observer queue has # finished. The try/finally also matters on the failure path — - # per spec v0.3 §6, the engine dispatches a failure event with - # `error` populated BEFORE propagating, and that event is exactly - # what a debugging user would want to see. Without `finally`, an - # invoke that raises would lose those late events. + # the engine dispatches a failure event with `error` populated + # BEFORE propagating, and that event is exactly what a debugging + # user would want to see. Without `finally`, an invoke that + # raises would lose those late events. 
await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() print() print(f"question: {final.question}") diff --git a/examples/04-nested-subgraphs/main.py b/examples/04-nested-subgraphs/main.py new file mode 100644 index 0000000..6441199 --- /dev/null +++ b/examples/04-nested-subgraphs/main.py @@ -0,0 +1,394 @@ +"""openarmature demo: question answering against a tiny document corpus, with +two levels of subgraph nesting. + +**Use case:** Given a question and a small corpus of documents, find the +answer. Three layers of responsibility: + +1. **Outer (coordinator).** Takes the user's question, delegates to the + doc-QA subgraph, and polishes the final answer for the user. +2. **Doc-QA subgraph (middle).** Picks the single most relevant document + from the corpus, delegates the section-level work to its own subgraph, + and synthesizes a clean answer from what came back. +3. **Section-extract subgraph (inner).** Given a single document and the + question, finds the relevant paragraph and extracts the answer text. + +Each layer has its own state schema reflecting its scope: the outer cares +about a question + final answer, the middle picks one document from CORPUS +and synthesizes, the inner cares about a single doc + an extracted span. +That separation is the whole reason the middle and inner pieces are +subgraphs and not flat nodes — each is a self-contained, reusable +sub-pipeline with its own inputs and outputs. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). + +Run with: + + uv sync --group examples + cd examples/04-nested-subgraphs + LLM_API_KEY=sk-... uv run python main.py "what year did humans first land on the moon?" + LLM_API_KEY=sk-... uv run python main.py "how is espresso different from drip coffee?" 
+""" + +from __future__ import annotations + +import asyncio +import os +import sys +from collections.abc import Mapping +from typing import Annotated, Any + +from pydantic import Field + +from openarmature.graph import ( + END, + CompiledGraph, + ExplicitMapping, + GraphBuilder, + NodeEvent, + State, + append, +) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage + +_provider_instance: OpenAIProvider | None = None + + +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance + + +async def _chat(system: str, user: str) -> str: + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], + ) + return (response.message.content or "").strip() + + +# --------------------------------------------------------------------------- +# A tiny baked-in corpus. In a real app this would come from a retriever +# or a vector store; here it's three short documents so the example runs +# without any external setup. +# --------------------------------------------------------------------------- + +CORPUS: list[dict[str, str]] = [ + { + "title": "Apollo 11", + "body": ( + "Apollo 11 was the United States spaceflight that first landed humans on the Moon. " + "Commander Neil Armstrong and lunar module pilot Buzz Aldrin landed the Apollo Lunar " + "Module Eagle on July 20, 1969. Armstrong became the first person to step onto the " + "lunar surface six hours and 39 minutes later, on July 21 at 02:56 UTC. The mission " + "fulfilled a national goal proposed by President Kennedy in 1961." + ), + }, + { + "title": "Espresso", + "body": ( + "Espresso is a coffee brewing method of Italian origin. 
It is made by forcing pressurized " + "hot water through finely ground coffee. The resulting shot is more concentrated than coffee " + "brewed by other methods, with a layer of crema on top. Espresso has more caffeine per " + "unit volume than most coffee beverages but a typical serving is one-tenth the volume of a " + "drip coffee, so a single espresso usually contains less total caffeine than a drip cup." + ), + }, + { + "title": "Walking", + "body": ( + "Walking is the most common form of human locomotion and is associated with a range of " + "health benefits including reduced risk of cardiovascular disease, improved mood, and " + "lower mortality. A moderate pace of around 100 steps per minute is often cited as a " + "useful threshold. Walking as a deliberate practice has long been associated with " + "thinking and writing — many writers credit long walks as part of their creative process." + ), + }, +] + + +# --------------------------------------------------------------------------- +# State schemas — one per layer, each scoped to its layer's job. +# --------------------------------------------------------------------------- + + +class OuterState(State): + """User-facing state: a question goes in, an answer comes out.""" + + question: str + answer: str = "" + trace: Annotated[list[str], append] = Field(default_factory=list) + + +class DocQAState(State): + """Middle: the doc-QA subgraph picks a doc and synthesizes an answer. + + The corpus itself is module-level configuration, not per-invocation + state. Nodes reach into ``CORPUS`` directly rather than carrying it + through state — typical for application config that doesn't change + between calls. 
+ """ + + question: str = "" + selected_title: str = "" + selected_body: str = "" + raw_answer: str = "" + answer: str = "" + trace: Annotated[list[str], append] = Field(default_factory=list) + + +class SectionState(State): + """Inner: the section-extract subgraph narrows to one paragraph then + pulls the answer text out of it.""" + + question: str = "" + doc_body: str = "" + relevant_section: str = "" + extracted: str = "" + trace: Annotated[list[str], append] = Field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Inner subgraph: section-extract (one doc → answer text) +# --------------------------------------------------------------------------- + + +async def find_section(s: SectionState) -> Mapping[str, Any]: + content = await _chat( + system=( + "You are given a document and a question. Find the single paragraph " + "in the document most likely to contain the answer. Return ONLY that " + "paragraph verbatim, no preamble." + ), + user=f"Question: {s.question}\n\nDocument:\n{s.doc_body}", + ) + return {"relevant_section": content, "trace": ["find_section"]} + + +async def extract_answer(s: SectionState) -> Mapping[str, Any]: + content = await _chat( + system=( + "You are given a question and a paragraph that contains the answer. " + "Extract just the answer in one short phrase or sentence. No preamble, " + "no quoting the source." 
+ ), + user=f"Question: {s.question}\n\nParagraph:\n{s.relevant_section}", + ) + return {"extracted": content, "trace": ["extract_answer"]} + + +def build_section_extract() -> CompiledGraph[SectionState]: + return ( + GraphBuilder(SectionState) + .add_node("find_section", find_section) + .add_node("extract_answer", extract_answer) + .add_edge("find_section", "extract_answer") + .add_edge("extract_answer", END) + .set_entry("find_section") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Middle subgraph: doc-QA (corpus → answer) +# --------------------------------------------------------------------------- + + +async def pick_doc(s: DocQAState) -> Mapping[str, Any]: + """Ask the LLM which corpus document is most relevant to the question.""" + titles_and_bodies = "\n\n".join(f"{d['title']}:\n{d['body']}" for d in CORPUS) + content = await _chat( + system=( + "You are given a question and several documents. Reply with EXACTLY " + "the title of the single document most relevant to answering the " + "question. No quotes, no punctuation, just the title." + ), + user=f"Question: {s.question}\n\nDocuments:\n\n{titles_and_bodies}", + ) + reply = content.strip().strip('"').strip("'").lower() + # Permissive match: the model may paraphrase ("Apollo 11 article") or + # return only part of the title, so accept containment in either + # direction over the lowercased strings. An empty reply is rejected + # explicitly because "" is a substring of every title. A production + # app would constrain the model with response_schema (see + # 00-hello-world) so the reply is guaranteed to be a valid title. + match = next( + (d for d in CORPUS if d["title"].lower() in reply or reply in d["title"].lower()), + None, + ) + if not reply or match is None: + raise RuntimeError( + f"pick_doc: model returned {content!r} which doesn't match any " + f"corpus title ({[d['title'] for d in CORPUS]!r})."
+ ) + return {"selected_title": match["title"], "selected_body": match["body"], "trace": ["pick_doc"]} + + +async def synthesize(s: DocQAState) -> Mapping[str, Any]: + """Polish the extracted answer into one user-facing sentence.""" + content = await _chat( + system=( + "You are given a question and a short raw answer extracted from a " + "document. Rewrite the answer as one clean sentence that stands on " + "its own. No preamble." + ), + user=f"Question: {s.question}\n\nRaw answer:\n{s.raw_answer}", + ) + return {"answer": content, "trace": ["synthesize"]} + + +def build_doc_qa(section_extract: CompiledGraph[SectionState]) -> CompiledGraph[DocQAState]: + return ( + GraphBuilder(DocQAState) + .add_node("pick_doc", pick_doc) + .add_subgraph_node( + "section_extract", + section_extract, + # The middle hands its selected doc to the inner subgraph, then + # receives back the extracted text into ``raw_answer`` for the + # synthesize step. + projection=ExplicitMapping[DocQAState, SectionState]( + inputs={"question": "question", "doc_body": "selected_body"}, + outputs={"raw_answer": "extracted", "trace": "trace"}, + ), + ) + .add_node("synthesize", synthesize) + .add_edge("pick_doc", "section_extract") + .add_edge("section_extract", "synthesize") + .add_edge("synthesize", END) + .set_entry("pick_doc") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Outer graph: coordinator +# --------------------------------------------------------------------------- + + +async def receive(s: OuterState) -> Mapping[str, Any]: + """Marker node so the trace shows the outer received the question.""" + del s + return {"trace": ["receive"]} + + +async def format_final(s: OuterState) -> Mapping[str, Any]: + """Light polish on the synthesized answer before returning to the user.""" + content = await _chat( + system=( + "Lightly copy-edit the following answer for clarity. Keep it short " + "and preserve meaning. 
Return only the edited answer." + ), + user=s.answer, + ) + return {"answer": content, "trace": ["format_final"]} + + +def build_graph() -> CompiledGraph[OuterState]: + section_extract = build_section_extract() + doc_qa = build_doc_qa(section_extract) + return ( + GraphBuilder(OuterState) + .add_node("receive", receive) + .add_subgraph_node( + "doc_qa", + doc_qa, + # The outer passes only its question down to the doc-QA subgraph + # (CORPUS is module-level) and receives the answer and trace back. + projection=ExplicitMapping[OuterState, DocQAState]( + inputs={"question": "question"}, + outputs={"answer": "answer", "trace": "trace"}, + ), + ) + .add_node("format_final", format_final) + .add_edge("receive", "doc_qa") + .add_edge("doc_qa", "format_final") + .add_edge("format_final", END) + .set_entry("receive") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Observer — formats events so the descent through layers is visible. +# +# The same observer fires for every node at every depth — including the +# inner section-extract subgraph at depth 3. Indentation in the printed +# output makes the descent and return obvious. +# --------------------------------------------------------------------------- + + +def _fmt_state(state: Any) -> str: + """Compact one-line dump of whichever state class the event carries.""" + if state is None: + return "—" + dumped = state.model_dump() + # Hide the trace (already visible in the printed order). Print the + # remaining fields as a compact summary. + skip = {"trace"} + parts: list[str] = [] + for key, value in dumped.items(): + if key in skip: + continue + # Truncate long string values so the line stays scannable. + if isinstance(value, str) and len(value) > 60: + value = value[:57] + "..."
+ parts.append(f"{key}={value!r}") + return " ".join(parts) if parts else "(empty)" + + +async def depth_observer(event: NodeEvent) -> None: + depth = len(event.namespace) + indent = " " * (depth - 1) + ns = " > ".join(event.namespace) + + if event.phase == "started": + print(f"{indent}[step {event.step}] depth={depth} {ns}") + print(f"{indent} started {_fmt_state(event.pre_state)}") + else: + if event.error is not None: + print(f"{indent} completed ERROR={type(event.error).__name__}: {event.error}") + else: + print(f"{indent} completed {_fmt_state(event.post_state)}") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main() -> None: + question = " ".join(sys.argv[1:]) or "what year did humans first land on the moon?" + + outer = build_graph() + outer.attach_observer(depth_observer) + + print("=" * 72) + print(f"Question: {question}") + print("=" * 72) + print() + + try: + final = await outer.invoke(OuterState(question=question)) + print() + print("=" * 72) + print(f"Answer: {final.answer}") + print("=" * 72) + print() + print(f"Trace: {final.trace}") + finally: + await outer.drain() + if _provider_instance is not None: + await _provider_instance.aclose() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/05-nested-subgraphs/main.py b/examples/05-nested-subgraphs/main.py deleted file mode 100644 index 3fd8160..0000000 --- a/examples/05-nested-subgraphs/main.py +++ /dev/null @@ -1,236 +0,0 @@ -"""openarmature demo: two-level subgraph nesting and the depth invariants. - -**Use case:** Show what observer events look like when subgraphs are nested -TWO levels deep. The first four examples cover composition basics; this one -zooms in on what happens at depth > 1, where the engine's namespace chain, -parent-state stack, and step counter all extend across multiple subgraph -boundaries. 
- -**Demonstrates:** Spec graph-engine §6 depth invariants — - -- `len(parent_states) == len(namespace) - 1` at every depth, including 3. -- `step` is monotonic across both subgraph boundaries (no resets). -- `parent_states[k]` is the k-th containing graph's state at the moment - that graph entered the subgraph leading to this event. Snapshots are - stable for the duration of the inner run. -- Subgraph state shape is the SUBGRAPH's schema (not the parent's), - so `pre_state`/`post_state` carry the inner graph's fields. - -The bodies are trivial integer updates so the observer output is short -and predictable. Read the printed output top-to-bottom — depth is shown -both numerically and via indentation, and the parent-states list grows -as the engine descends and shrinks as it returns. - -Run with: - uv run python main.py -""" - -from __future__ import annotations - -import asyncio -from collections.abc import Mapping -from typing import Annotated, Any - -from pydantic import Field - -from openarmature.graph import ( - END, - CompiledGraph, - GraphBuilder, - NodeEvent, - State, - append, -) - -# --------------------------------------------------------------------------- -# State schemas — same shape at every level so default field-name matching -# projects values across boundaries. -# --------------------------------------------------------------------------- - - -class InnerState(State): - v: int = 0 - trace: Annotated[list[str], append] = Field(default_factory=list) - - -class MiddleState(State): - v: int = 0 - trace: Annotated[list[str], append] = Field(default_factory=list) - - -class OuterState(State): - v: int = 0 - trace: Annotated[list[str], append] = Field(default_factory=list) - - -# --------------------------------------------------------------------------- -# Node bodies — trivial updates. 
The numbers are chosen so it's easy to see -# at a glance which level a value came from in the final state: -# 1, 999 → outer -# 10, 20 → middle -# 100, 200 → inner -# --------------------------------------------------------------------------- - - -async def outer_a(_: OuterState) -> Mapping[str, Any]: - return {"v": 1, "trace": ["outer_a"]} - - -async def outer_b(_: OuterState) -> Mapping[str, Any]: - return {"v": 999, "trace": ["outer_b"]} - - -async def mid_x(_: MiddleState) -> Mapping[str, Any]: - return {"v": 10, "trace": ["mid_x"]} - - -async def mid_y(_: MiddleState) -> Mapping[str, Any]: - return {"v": 20, "trace": ["mid_y"]} - - -async def inner_p(_: InnerState) -> Mapping[str, Any]: - return {"v": 100, "trace": ["inner_p"]} - - -async def inner_q(_: InnerState) -> Mapping[str, Any]: - return {"v": 200, "trace": ["inner_q"]} - - -# --------------------------------------------------------------------------- -# Graph builders — innermost first, since each outer graph references the -# inner one as a SubgraphNode and needs it compiled at build time. 
-# --------------------------------------------------------------------------- - - -def build_inner() -> CompiledGraph[InnerState]: - builder: GraphBuilder[InnerState] = GraphBuilder(InnerState) - builder.set_entry("inner_p") - builder.add_node("inner_p", inner_p) - builder.add_node("inner_q", inner_q) - builder.add_edge("inner_p", "inner_q") - builder.add_edge("inner_q", END) - return builder.compile() - - -def build_middle(inner: CompiledGraph[InnerState]) -> CompiledGraph[MiddleState]: - builder: GraphBuilder[MiddleState] = GraphBuilder(MiddleState) - builder.set_entry("mid_x") - builder.add_node("mid_x", mid_x) - builder.add_subgraph_node("mid_inner", inner) - builder.add_node("mid_y", mid_y) - builder.add_edge("mid_x", "mid_inner") - builder.add_edge("mid_inner", "mid_y") - builder.add_edge("mid_y", END) - return builder.compile() - - -def build_outer(middle: CompiledGraph[MiddleState]) -> CompiledGraph[OuterState]: - builder: GraphBuilder[OuterState] = GraphBuilder(OuterState) - builder.set_entry("outer_a") - builder.add_node("outer_a", outer_a) - builder.add_subgraph_node("outer_mid", middle) - builder.add_node("outer_b", outer_b) - builder.add_edge("outer_a", "outer_mid") - builder.add_edge("outer_mid", "outer_b") - builder.add_edge("outer_b", END) - return builder.compile() - - -def build_graph() -> CompiledGraph[OuterState]: - """Top-level graph factory — the convention every example exposes for - CI smoke validation. Builds inner first, then middle (which references - inner), then outer (which references middle). - """ - return build_outer(build_middle(build_inner())) - - -# --------------------------------------------------------------------------- -# Observer — formats events so depth invariants are visually obvious. 
-# --------------------------------------------------------------------------- - - -def _fmt(state: Any) -> str: - """Compact one-line state dump.""" - if state is None: - return "—" - return f"v={state.v} trace={state.trace}" - - -async def depth_observer(event: NodeEvent) -> None: - """Print every event with depth-aware indentation. - - The leading spaces visualize how deep into the nested subgraphs this - event came from. Number of `parent_states` entries always equals - `len(namespace) - 1` per the §6 invariant. - """ - depth = len(event.namespace) - indent = " " * (depth - 1) - ns = " > ".join(event.namespace) - parents_summary = " | ".join(_fmt(p) for p in event.parent_states) if event.parent_states else "(none)" - - if event.phase == "started": - line = ( - f"{indent}[step {event.step}] depth={depth} ns=[{ns}]\n" - f"{indent} started pre={_fmt(event.pre_state)}\n" - f"{indent} parents: {parents_summary}" - ) - else: # completed - if event.error is not None: - line = ( - f"{indent} completed pre={_fmt(event.pre_state)} " - f"ERROR={type(event.error).__name__}: {event.error}" - ) - else: - line = f"{indent} completed pre={_fmt(event.pre_state)} post={_fmt(event.post_state)}" - print(line) - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - - -async def main() -> None: - outer = build_graph() - - # Observer attached to the OUTER graph fires for every node executed - # during this invocation, including subgraph-internal nodes at depth - # 2 and 3. (See example 04 for graph-attached vs invocation-scoped - # subtleties; this example uses one observer to keep the output clean.) - outer.attach_observer(depth_observer) - - print("=" * 72) - print("Two-level subgraph nesting — observer events at depths 1, 2, 3") - print("=" * 72) - print() - print("Read top-to-bottom. Indentation = depth. 
Watch the parents list") - print("grow as the engine descends and shrink as it returns.") - print() - - try: - final = await outer.invoke(OuterState()) - finally: - # Drain MUST be awaited in short-lived processes per spec §6 — without - # it, this script could exit before the delivery worker finishes - # processing the queue, losing late events. In a `finally` so failure - # events flush even if invoke() raises. - await outer.drain() - - print() - print("=" * 72) - print("Final outer state (after both subgraphs project back via") - print("default field-name matching):") - print(f" v = {final.v}") - print(f" trace = {final.trace}") - print("=" * 72) - print() - print("Notice in the events above:") - print(" - step counter never resets across subgraph boundaries (0..5)") - print(" - namespace length matches depth at every event") - print(" - parent_states length = depth - 1, always") - print(" - inner_p and inner_q share the same parents list because") - print(" neither outer nor middle is stepping while inner runs") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/README.md b/examples/README.md index 2c52d7c..e2d0db4 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,8 +1,8 @@ # Examples End-to-end demo projects for `openarmature`. Each is a standalone -`main.py` you can run against a local OpenAI-compatible LLM endpoint -(vLLM, LM Studio, llama.cpp server, etc.). +`main.py` you can run against any OpenAI-compatible LLM endpoint +(OpenAI's public API, vLLM, LM Studio, llama.cpp server, etc.). ## Demos @@ -15,38 +15,43 @@ Pydantic class (`response_schema=Classification` → `Response.parsed` as a `Classification` instance), conditional routing on a parsed field, and a compile-time observer. -Configured via env vars (`LLM_BASE_URL`, `LLM_MODEL`, `LLM_API_KEY`); -defaults to OpenAI public API with `gpt-4o-mini`. - -### [`01-linear-pipeline/`](./01-linear-pipeline/main.py) - -Minimal two-node graph (`plan → write`). 
Demonstrates: typed `State`, -the `append` reducer, static edges, `END`. - -### [`02-routing-and-subgraphs/`](./02-routing-and-subgraphs/main.py) +### [`01-routing-and-subgraphs/`](./01-routing-and-subgraphs/main.py) Question-answering assistant — classify, then short-answer or research-subgraph, then copy-edit. Demonstrates: conditional edges, `SubgraphNode`, custom `ProjectionStrategy`, the `merge` reducer. -### [`03-explicit-subgraph-mapping/`](./03-explicit-subgraph-mapping/main.py) +### [`02-explicit-subgraph-mapping/`](./02-explicit-subgraph-mapping/main.py) Compare two topics by running the same analysis subgraph on each. Demonstrates: `ExplicitMapping` for reusing one compiled subgraph at multiple parent sites with disjoint parent fields. -### [`04-observer-hooks/`](./04-observer-hooks/main.py) +### [`03-observer-hooks/`](./03-observer-hooks/main.py) Add observability to a `draft → review → finalize` pipeline without changing any node code. Demonstrates: `attach_observer`, `NodeEvent`, -namespace chaining across subgraph boundaries, both function-shaped -and class-shaped observers. +namespace chaining across subgraph boundaries, function-shaped and +class-shaped observers, plus the `OTelObserver` running alongside +the plain observer (same hook, different backend). + +### [`04-nested-subgraphs/`](./04-nested-subgraphs/main.py) -### [`05-nested-subgraphs/`](./05-nested-subgraphs/main.py) +Question answering against a tiny baked-in document corpus, with two +levels of subgraph nesting: outer coordinator → doc-QA subgraph → +section-extract subgraph. A depth-aware observer prints the descent +and return. -Two levels of nested subgraphs (`outer → middle → inner`) with a -depth-aware observer printing the descent and return. Demonstrates -spec graph-engine §6 depth invariants. 
+## Configuration + +All demos configure their LLM client via env vars; OpenAI public-API +defaults shown: + +| Env var | Default | Notes | +| --- | --- | --- | +| `LLM_BASE_URL` | `https://api.openai.com` | **Host root only** — the provider adds the path. | +| `LLM_MODEL` | `gpt-4o-mini` | Any model the bound endpoint exposes. | +| `LLM_API_KEY` | (none) | Required; pass empty for local servers that don't authenticate. | ## Running @@ -54,10 +59,15 @@ spec graph-engine §6 depth invariants. # From the repo root, install the examples dep group: uv sync --group examples +# Demo 03 also wants the OTel SDK for its OTelObserver: +uv sync --group examples --all-extras + # Run any demo: -uv run python examples/01-linear-pipeline/main.py "the psychology of long walks" +LLM_API_KEY=sk-... uv run python examples/00-hello-world/main.py +LLM_API_KEY=sk-... uv run python examples/01-routing-and-subgraphs/main.py "what year did the moon landing happen" ``` -All five demos expect an OpenAI-compatible LLM endpoint at -`http://localhost:8000/v1`. Edit `VLLM_BASE_URL` and `MODEL` at the -top of each `main.py` to point elsewhere. +For a local OpenAI-compatible server (vLLM, LM Studio, llama.cpp, +etc.), point `LLM_BASE_URL` at the host root (e.g. `http://localhost:8000`) +and set `LLM_API_KEY` to whatever value the server expects (often +empty or a placeholder). diff --git a/tests/test_examples_smoke.py b/tests/test_examples_smoke.py index 9a0bac3..fc95d5d 100644 --- a/tests/test_examples_smoke.py +++ b/tests/test_examples_smoke.py @@ -31,11 +31,10 @@ DEMOS = [ "00-hello-world", - "01-linear-pipeline", - "02-routing-and-subgraphs", - "03-explicit-subgraph-mapping", - "04-observer-hooks", - "05-nested-subgraphs", + "01-routing-and-subgraphs", + "02-explicit-subgraph-mapping", + "03-observer-hooks", + "04-nested-subgraphs", ]