Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 0 additions & 201 deletions examples/01-linear-pipeline/main.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
custom `ProjectionStrategy` for the parent ↔ subgraph boundary, and the
`merge` reducer for dict accumulation.

This is the second demo in the series. It exercises three graph features
that `01-linear-pipeline/` didn't:
Three graph features that `00-hello-world` only touched lightly:

1. **Conditional edges.** The entry node classifies the question and the
graph routes to one of two branches based on that classification.
Expand All @@ -22,27 +21,33 @@
from its own schema's defaults. To pass the user's question in (and
shape what comes back out), we write a `ProjectionStrategy` by hand.

And for good measure it also demonstrates the **merge reducer** for dict
accumulation, alongside the **append reducer** already seen in 01-linear-pipeline.
LLM calls go through ``openarmature.llm.OpenAIProvider`` (same pattern as
``00-hello-world``) so the example reads as the recommended path rather
than as "openai with some openarmature on top."

**Configuration** (env vars; OpenAI defaults shown):

- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root
only** — the provider adds the path itself.
- ``LLM_MODEL`` defaults to ``gpt-4o-mini``.
- ``LLM_API_KEY`` required (empty for local servers that don't authenticate).

Run with:
uv run python main.py "what year did the moon landing happen" # → quick branch
uv run python main.py "is espresso actually more caffeinated than drip?" # → research branch

uv sync --group examples
cd examples/01-routing-and-subgraphs
LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen"
LLM_API_KEY=sk-... uv run python main.py "is espresso actually more caffeinated than drip?"
"""

from __future__ import annotations

import asyncio
import os
import sys
from collections.abc import Mapping
from typing import Annotated, Any

from openai import AsyncOpenAI
from openai.types.chat import (
ChatCompletionMessageParam,
ChatCompletionSystemMessageParam,
ChatCompletionUserMessageParam,
)
from pydantic import Field

from openarmature.graph import (
Expand All @@ -54,11 +59,22 @@
append,
merge,
)
from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage

# Lazy-initialized so importing this module (test harnesses, doc builders,
# IDE inspection) doesn't open an httpx.AsyncClient connection pool.
_provider_instance: OpenAIProvider | None = None

VLLM_BASE_URL = "http://localhost:8000/v1"
MODEL = "dark-side-of-the-code/Mistral-Small-24B-Instruct-2501-AWQ"

client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="not-needed")
def _get_provider() -> OpenAIProvider:
global _provider_instance
if _provider_instance is None:
_provider_instance = OpenAIProvider(
base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"),
model=os.environ.get("LLM_MODEL", "gpt-4o-mini"),
api_key=os.environ.get("LLM_API_KEY") or None,
)
return _provider_instance


# ----------------------------------------------------------------------------
Expand All @@ -77,9 +93,9 @@
# - Boundaries are auditable. To find "what does the subgraph see?" you read
# one projection class, not a scattered naming convention.
#
# Both schemas below use the same reducer patterns we introduced in
# 01-linear-pipeline: `append` on a `trace` list, `merge` on a dict. Fields without
# an `Annotated[..., reducer]` get `last_write_wins` by default.
# Both schemas below use the standard reducer set: `append` on the
# `trace` list, `merge` on a dict. Fields without an
# `Annotated[..., reducer]` get `last_write_wins` by default.


class AssistantState(State):
Expand All @@ -103,30 +119,27 @@ class ResearchState(State):


# ----------------------------------------------------------------------------
# LLM helper (not openarmature — plumbing)
# LLM helper
# ----------------------------------------------------------------------------
# Thin wrapper over Provider.complete that takes a system + user pair and
# returns the assistant's reply as a string. Keeps the node bodies focused
# on graph logic (state in → state update out) rather than provider
# plumbing. Production code would typically inline the call.


async def _chat(system: str, user: str) -> str:
messages: list[ChatCompletionMessageParam] = [
ChatCompletionSystemMessageParam(role="system", content=system),
ChatCompletionUserMessageParam(role="user", content=user),
]
resp = await client.chat.completions.create(
model=MODEL,
messages=messages,
temperature=0.3,
stream=False,
response = await _get_provider().complete(
[SystemMessage(content=system), UserMessage(content=user)],
)
return (resp.choices[0].message.content or "").strip()
return (response.message.content or "").strip()


# ----------------------------------------------------------------------------
# Outer-graph nodes
# ----------------------------------------------------------------------------
# Every node is the same shape as in 01-linear-pipeline: `async def(state) -> dict`,
# returning ONLY the fields it wants to change. The engine applies per-field
# reducers and re-validates.
# Standard node shape: `async def(state) -> dict`, returning ONLY the
# fields it wants to change. The engine applies per-field reducers and
# re-validates.
#
# Three things worth noticing as you read these:
#
Expand Down Expand Up @@ -334,9 +347,8 @@ def build_research_subgraph() -> CompiledGraph[ResearchState]:
#
# - `project_in`: DELIBERATELY LIMITED. It builds a fresh subgraph state
# from its schema's defaults — `subgraph_state_cls()`. The parent's
# state is ignored. This is the spec's default and it's an explicit
# design choice (v0.1.1 §2 Subgraph): subgraphs don't see the outer
# world unless the author opts in.
# state is ignored. Subgraphs don't see the outer world unless the
# author opts in — encapsulation is the point.
#
# For this demo we absolutely need the question in the subgraph. So we write
# a projection class that implements the `ProjectionStrategy` Protocol (see
Expand Down Expand Up @@ -444,15 +456,19 @@ def build_graph() -> CompiledGraph[AssistantState]:
async def main() -> None:
question = " ".join(sys.argv[1:]) or "is espresso actually more caffeinated than drip coffee?"
graph = build_graph()
final = await graph.invoke(AssistantState(question=question))

print(f"question: {final.question}")
print(f"route: {final.route}")
print()
print(f"answer:\n{final.answer}")
print()
print(f"trace: {final.trace}")
print(f"tallies: {final.tallies}")
try:
final = await graph.invoke(AssistantState(question=question))
print(f"question: {final.question}")
print(f"route: {final.route}")
print()
print(f"answer:\n{final.answer}")
print()
print(f"trace: {final.trace}")
print(f"tallies: {final.tallies}")
finally:
await graph.drain()
if _provider_instance is not None:
await _provider_instance.aclose()


if __name__ == "__main__":
Expand Down
Loading