Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
dca8fb8
feat(pipeline): state-based PipelineBuilder with checkpoint/resume (#…
miguelgfierro May 27, 2026
37450ea
feat(pipeline): cycles, Send fan-out, Mermaid/JSON export (#147 phase 2)
miguelgfierro May 27, 2026
32e373f
docs(pipeline): state-mode example + docs section, simplify state_pip…
miguelgfierro May 27, 2026
9bc4d39
Merge pull request #233 from fireflyframework/issue-147-phase-2
miguelgfierro May 27, 2026
159a2d4
docs(spec): Phase 3a design — Redis + Postgres checkpointers
miguelgfierro May 27, 2026
2952e43
feat(pipeline): Redis + Postgres checkpointer backends (#147 phase 3a)
miguelgfierro May 27, 2026
cc44d70
Merge pull request #234 from fireflyframework/issue-147-phase-3a
miguelgfierro May 27, 2026
61f6971
chore: remove docs/superpowers/specs/* accidentally force-added past …
miguelgfierro May 27, 2026
6471c60
feat(pipeline): StatePipelineEventHandler + OTel spans (#147 phase 3b)
miguelgfierro May 27, 2026
b0c0f93
Merge pull request #235 from fireflyframework/issue-147-phase-3b
miguelgfierro May 27, 2026
890465a
style(examples): ruff format pipeline_state.py (fixes #232 CI)
miguelgfierro May 27, 2026
0baab84
feat(pipeline): HITL Pause + AuditLog with 4 backends (#147 phase 3c)
miguelgfierro May 27, 2026
9827d72
Merge pull request #236 from fireflyframework/issue-147-phase-3c
miguelgfierro May 27, 2026
9b6a687
refactor(pipeline): collapse invoke() to single return path
miguelgfierro May 28, 2026
f032ba1
Merge pull request #240 from fireflyframework/simplify/state-pipeline…
miguelgfierro May 28, 2026
1e65e82
refactor(pipeline): share psycopg scaffolding between checkpoint and …
miguelgfierro May 28, 2026
236b814
Merge pull request #241 from fireflyframework/simplify/shared-psycopg…
miguelgfierro May 28, 2026
716affe
refactor(pipeline): drop sequence variable and resumed_completed dupl…
miguelgfierro May 28, 2026
b8b9886
Merge pull request #242 from fireflyframework/simplify/invoke-bookkee…
miguelgfierro May 28, 2026
d1f6699
fix(pipeline): assert result narrowed in invoke return path
miguelgfierro May 28, 2026
e7c342f
Merge pull request #243 from fireflyframework/fix/pyright-result-narr…
miguelgfierro May 28, 2026
affeff7
feat(examples): software_factory example + drop Postgres/Redis from f…
miguelgfierro May 28, 2026
ff5bd7c
test(examples): move software_factory smoke test to tests/examples/
miguelgfierro May 28, 2026
f0640e2
Merge pull request #244 from fireflyframework/feat/software-factory-e…
miguelgfierro May 28, 2026
d8c54bb
feat(pipeline): PipelineEngine checkpoint, audit, and resume (#245 la…
miguelgfierro May 28, 2026
8c7f5a5
Merge pull request #246 from fireflyframework/unify-pipeline-1-engine…
miguelgfierro May 28, 2026
0f8b173
feat(pipeline): unified EventHandler protocol (#245 layer 1B)
miguelgfierro May 28, 2026
1880542
Merge pull request #247 from fireflyframework/unify-pipeline-1b-event…
miguelgfierro May 28, 2026
65f7fbe
feat(pipeline): branching as DAGEdge.condition (#245 layer 2)
miguelgfierro May 28, 2026
5f55e8d
Merge pull request #248 from fireflyframework/unify-pipeline-2-edge-c…
miguelgfierro May 28, 2026
bd109a3
feat(pipeline): state as optional overlay on PipelineEngine (#245 lay…
miguelgfierro May 28, 2026
2fd0c0b
Merge pull request #249 from fireflyframework/unify-pipeline-3-state-…
miguelgfierro May 28, 2026
7a6cf0b
feat(pipeline): cycle-aware scheduler and topo-sort safety (#245 laye…
miguelgfierro May 28, 2026
aab6b3c
Merge pull request #250 from fireflyframework/unify-pipeline-4-cycle-…
miguelgfierro May 28, 2026
edaf9f7
feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5)
miguelgfierro May 28, 2026
c17c7cf
Merge pull request #251 from fireflyframework/unify-pipeline-5-pause-…
miguelgfierro May 28, 2026
0f1686e
feat(pipeline): start_at kwarg for mid-pipeline entry (#245 layer 6)
miguelgfierro May 28, 2026
292af30
Merge pull request #252 from fireflyframework/unify-pipeline-6-start-at
miguelgfierro May 28, 2026
56d52ec
feat(pipeline): deprecate StatePipeline (#245 layer 7)
miguelgfierro May 28, 2026
f77dbde
Merge pull request #253 from fireflyframework/unify-pipeline-7-deprec…
miguelgfierro May 28, 2026
f09d179
fix(ci): satisfy stricter ruff rules on PR gate
miguelgfierro May 28, 2026
86663ce
Merge pull request #254 from fireflyframework/fix-ci-lint-errors
miguelgfierro May 28, 2026
19d3b63
feat(pipeline): delete StatePipeline, unify on PipelineEngine (#245 l…
miguelgfierro May 28, 2026
15ac615
Merge pull request #255 from fireflyframework/unify-pipeline-8-delete…
miguelgfierro May 28, 2026
fbbadb3
fix(example): software-factory pipeline imports PipelineEngine
miguelgfierro May 28, 2026
11e58f9
Merge pull request #256 from fireflyframework/fix-ci-software-factory…
miguelgfierro May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 323 additions & 3 deletions docs/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ workflows. It supports parallel execution, conditional branching, retries, timeo
and fan-out/fan-in patterns -- everything needed to model real-world enterprise
processing pipelines.

`PipelineBuilder` has two modes:

* **Port-based** (legacy, parallel) — nodes communicate via `output_key` /
`input_key` edge ports and run concurrently within each topological level.
Best for ETL-shaped DAGs. Documented in the bulk of this guide.
* **State-based** — opt-in via `PipelineBuilder("name", state=SomeModel)`.
Nodes become `async (state) -> dict` over a typed shared state. One
`.branch(source, router)` call covers conditional routing; `Send(target, payload)`
covers runtime fan-out; a `Checkpointer` enables resume after failure. Best for
agentic workflows and ReAct-style loops. See [State-Based Pipelines](#state-based-pipelines).

---

## Concepts
Expand Down Expand Up @@ -88,15 +99,318 @@ The framework provides these built-in executors:
- **CallableStep** -- Wraps any `async` function `(context, inputs) -> output`.
- **BatchLLMStep** -- Processes multiple prompts concurrently through an agent for
cost optimization. See [Batch Processing](#batch-processing-batchllmstep) below.
- **BranchStep** -- Routes execution to one of several downstream paths based on
a predicate (see [Conditional Branching](#conditional-branching-branchstep) below).
- **FanOutStep** -- Splits input into a list for parallel downstream processing.
- **BranchStep** _(deprecated)_ -- Routes execution to one of several downstream paths based on
a predicate. Use `.branch(...)` in [State-Based Pipelines](#state-based-pipelines) instead.
- **FanOutStep** _(deprecated)_ -- Splits input into a list for parallel downstream processing.
Use `Send` in [State-Based Pipelines](#runtime-fan-out-via-send) instead.
- **FanInStep** -- Merges outputs from multiple upstream nodes.

---

## State-Based Pipelines

Set `state=` on `PipelineBuilder` to switch to a declarative API designed for
agentic workflows. Nodes become `async (state) -> dict | None` functions over
a typed shared-state object; the engine reduces each node's partial-update
dict back into the state.

```python
from typing import Annotated
from pydantic import BaseModel
from fireflyframework_agentic.pipeline import PipelineBuilder, append


class AgentState(BaseModel):
messages: Annotated[list[str], append] = [] # reducer: append
intent: str | None = None # default reducer: replace
answer: str | None = None


async def classify(state: AgentState) -> dict:
return {"intent": "complaint" if "refund" in state.messages[-1] else "general"}


async def answer(state: AgentState) -> dict:
return {"answer": "Here is your answer."}


async def escalate(state: AgentState) -> dict:
return {"answer": "Escalated to human."}


def route(state: AgentState) -> str:
return "escalate" if state.intent == "complaint" else "answer"


pipeline = (
PipelineBuilder("support-agent", state=AgentState)
.add_node(classify) # node id derived from fn.__name__
.add_node(answer)
.add_node(escalate)
.branch(classify, route) # router returns target node id
.build()
)
result = await pipeline.invoke(AgentState(messages=["I want a refund"]))
print(result.state.answer)
```

### Reducers

Reducers are declared as `Annotated[T, reducer_fn]` on the state schema. The
built-ins live in `fireflyframework_agentic.pipeline.reducers`:

| Reducer | Semantics |
|---------------|-------------------------------------------------|
| `replace` | Last-write-wins (the default for any field). |
| `append` | Append a single item to a list. |
| `extend` | Concatenate two iterables. |
| `merge_dict` | Shallow-merge two dicts; update wins on conflict. |

Custom reducers are any callable `(current, update) -> merged`.

### Branching

`.branch(source, router, mapping=None)` registers a synchronous
`(state) -> str | Send | list[Send]` router on `source`:

* Returning a node id (string) routes to that node directly.
* Passing `mapping={"label": target_node, ...}` lets the router return an
abstract label instead of a node id.
* Returning a `Send` or `list[Send]` triggers runtime fan-out (see below).

### Checkpoint + Resume

Pass a `Checkpointer` to persist state after each successful node. Three
backends ship out of the box, all conforming to the same `Checkpointer`
Protocol so they're swappable without code changes.

| Backend | Use when | Trade-off | Install |
|---|---|---|---|
| `FileCheckpointer` | Dev, single-host, ephemeral | No cross-process / cross-host sharing | (default — no extra) |
| `RedisCheckpointer` | Multi-worker, sub-day-scale runs | TTL eviction; not durable forever | `pip install fireflyframework-agentic[redis]` |
| `PostgresCheckpointer` | Long-lived runs, compliance, audit-friendly | Operational overhead of a DB | `pip install fireflyframework-agentic[postgres]` |

```python
from fireflyframework_agentic.pipeline import FileCheckpointer # or Redis / Postgres

pipeline = (
PipelineBuilder("software-factory", state=BuildState,
checkpointer=FileCheckpointer("./checkpoints"))
.add_node(architect)
.add_node(python_dev)
.add_node(deployer)
.add_node(evaluator)
.chain(architect, python_dev, deployer, evaluator)
.build()
)

# Fresh run
result = await pipeline.invoke(BuildState(requirements="user-mgmt service"))

# Resume after crash — picks up at the failed node, skips completed ones
result = await pipeline.invoke(run_id=result.run_id)

# Or jump into a specific node with explicit state
result = await pipeline.invoke(state=loaded_state, start_at=deployer)
```

Swapping backends is a one-line change. Redis uses a TTL on each checkpoint
key (default 30 days) plus a sorted-set index of run IDs; Postgres uses a
single `firefly_checkpoints` table created idempotently on first save:

```python
from fireflyframework_agentic.pipeline import RedisCheckpointer, PostgresCheckpointer

# Either a URL/DSN (backend constructs its own client) or a pre-built client
# (lets you share a connection pool across many pipelines).
checkpointer = RedisCheckpointer(url="redis://localhost:6379/0", ttl_seconds=86400 * 30)
checkpointer = RedisCheckpointer(client=my_existing_redis)
checkpointer = PostgresCheckpointer(dsn="postgresql://user:pw@host/db")
checkpointer = PostgresCheckpointer(connection=my_existing_psycopg_connection)
```

### Cycles and `recursion_limit`

State pipelines permit cycles for ReAct loops and retry-with-critique patterns.
The builder accepts `recursion_limit` (default 25) as a safety net — a runaway
loop surfaces as `result.success=False` with a clean error, not an infinite hang.

```python
def route(state):
return "done" if state.counter >= 3 else "step"

PipelineBuilder("loop", state=LoopState, recursion_limit=25)
.add_node(step).add_node(done).branch(step, route).build()
```

### Runtime Fan-Out via `Send`

A router may return `list[Send(target, payload)]` to dispatch multiple
invocations of the same (or different) workers concurrently. Each Send's
payload is applied to a copy of the current state before its target runs;
results reduce back into shared state. Replaces the legacy `FanOutStep`.

```python
from fireflyframework_agentic.pipeline import Send

def dispatch(state):
return [Send("worker", {"item": x}) for x in state.items]

PipelineBuilder("mapreduce", state=MapReduceState)
.add_node(planner).add_node(worker).add_node(collect)
.add_edge(worker, collect)
.branch(planner, dispatch)
.build()
```

When all worker targets share a common successor, the engine continues there
once the fan-out completes; the aggregator runs once with all results in
shared state.

### Observability

State pipelines emit lifecycle callbacks and OTel spans so ops can see what
an agent workflow is doing in real time.

`StatePipelineEventHandler` mirrors the legacy `PipelineEventHandler` but
every callback carries the `run_id` (so events can be correlated across
resumes) and `on_node_start` carries a per-node visit counter (so cyclic
graphs and `Send` fan-outs are distinguishable). Implement any subset of
methods; missing ones are no-ops.

```python
from fireflyframework_agentic.pipeline import PipelineBuilder, StatePipelineEventHandler


class ProgressHandler:
async def on_pipeline_start(self, name, run_id):
print(f"▶ [{name}] run {run_id} starting")

async def on_node_start(self, name, run_id, node_id, visit):
print(f" ▶ {node_id} (visit #{visit})")

async def on_node_complete(self, name, run_id, node_id, latency_ms):
print(f" ✔ {node_id} ({latency_ms:.0f}ms)")

async def on_node_error(self, name, run_id, node_id, error):
print(f" ✗ {node_id}: {error}")

async def on_pipeline_complete(self, name, run_id, success, duration_ms):
status = "OK" if success else "FAILED"
print(f"═ [{name}] {status} in {duration_ms:.0f}ms")


pipeline = (
PipelineBuilder("agent", state=AgentState, event_handler=ProgressHandler())
.add_node(classify).add_node(answer).add_node(escalate)
.branch(classify, route)
.build()
)
```

In parallel, the pipeline emits OTel spans automatically when
`observability_enabled` is True and `opentelemetry` is installed:

- One pipeline-level span `pipeline.state.<name>` around each `invoke`,
attributes `firefly.pipeline`, `firefly.run_id`.
- One per-node span `pipeline.state.node.<node_id>` for each `fn(state)`
call, parented under the pipeline span, attributes `firefly.node`,
`firefly.visit`.
- For `Send` fan-out: one per-Send span as a sibling under the pipeline span.

Handler exceptions are swallowed — observability never breaks business logic.

### Human-in-the-loop (Pause)

Any node may return ``Pause(reason="...")`` instead of a state update to halt
the pipeline cleanly. The current state is checkpointed with a paused marker;
``invoke`` returns with ``result.paused=True`` and ``result.success=False``.

```python
from fireflyframework_agentic.pipeline import Pause

async def await_deploy_approval(state: DeployState) -> Pause:
return Pause(reason="awaiting human approval to deploy to production")
```

To resume after the external approval comes in, call ``invoke`` with the same
``run_id`` and ``approve_pause=True``. Without ``approve_pause=True``, the
resume raises a ``PipelineError`` — the pause is sticky until explicitly
released. The successor of the paused node runs next; the pause node itself
is not re-executed.

```python
first = await pipeline.invoke(DeployState(...))
assert first.paused
# ...later, after approval...
done = await pipeline.invoke(run_id=first.run_id, approve_pause=True)
assert done.success
```

The configured ``StatePipelineEventHandler`` receives an ``on_node_pause``
callback when this happens (the callback is optional — partial handlers
without it continue to work).

### Audit Log

Distinct from the ``Checkpointer`` (which stores the *latest* state for
crash recovery), an ``AuditLog`` is an append-only record of *every* node
visit for compliance, debugging, and replay. Wire one in via the
``audit_log`` kwarg:

```python
from fireflyframework_agentic.pipeline import (
PipelineBuilder, FileAuditLog, PostgresAuditLog, LoggingAuditLog, OtelAuditLog,
)

PipelineBuilder("agent", state=AgentState, audit_log=FileAuditLog("./audit"))
```

Four backends ship, each conforming to the ``AuditLog`` Protocol:

| Backend | Use when | Read API | Trace-correlated | Install |
|---|---|---|---|---|
| ``FileAuditLog`` | Dev / single-host | yes | no | (default) |
| ``PostgresAuditLog`` | Compliance, retention, cross-run queries | yes | no | ``[postgres]`` |
| ``LoggingAuditLog`` | Generic log stacks (Splunk-HEC, Loki, JSON-logging) | no (write-only) | no | (default — stdlib) |
| ``OtelAuditLog`` | OTel-native stacks (Application Insights, Datadog APM, OTel Collector) | no (write-only) | **yes** | ``opentelemetry-sdk`` |

``FileAuditLog`` and ``PostgresAuditLog`` also implement
``QueryableAuditLog`` with ``list_entries(pipeline_name, run_id)``. The
write-only backends delegate query/search to the user's existing
observability stack.

Audit-log write failures are non-fatal — logged but never abort the
pipeline.

### Mermaid Export

`StatePipeline.to_mermaid()` and `DAG.to_mermaid()` render the topology as a
Mermaid flowchart. Branch edges declared with an explicit mapping show their
label; dynamic routers are noted as such.

### When to use which mode

| Use port-based when… | Use state-based when… |
|----------------------|------------------------|
| Pure ETL: parallel, fan-out/fan-in, no shared state | Agentic workflow: classify → branch → respond / loop / retry |
| Each step's input is a single value from the previous step | Multiple agents reading/writing different fields of a shared object |
| You want the engine to run independent nodes concurrently | You want resume-after-failure and start-from-middle semantics |
| You're happy with `BranchStep` + per-node `condition` lambdas | You want one `.branch(...)` call and inspectable routing |

See [`examples/pipeline_state.py`](../examples/pipeline_state.py) for a
runnable demo covering branching, software-factory checkpoint/resume, and
map-reduce fan-out.

---

## Parallel Execution (Fan-Out / Fan-In)

> **`FanOutStep` is deprecated.** For runtime fan-out (one dispatch per item,
> arbitrary count), prefer `Send` from [State-Based Pipelines](#runtime-fan-out-via-send).
> `FanOutStep` still works for now (it emits a `DeprecationWarning` on
> construction); `FanInStep` is not deprecated.

```mermaid
graph TD
SPLIT[Fan-Out] --> W1[Worker 1]
Expand Down Expand Up @@ -248,6 +562,12 @@ dag.add_node(DAGNode(

### Conditional Branching (BranchStep)

> **Deprecated.** Prefer [State-Based Pipelines](#state-based-pipelines) with
> `.branch(source, router)` — one call instead of `BranchStep` + per-node
> `condition` lambdas, and the topology becomes inspectable as data.
> `BranchStep` still works (it emits a `DeprecationWarning` on construction);
> removal will be tracked in a follow-up issue once internal callers migrate.

`BranchStep` provides router-based conditional branching. The router callable
receives the node's input and returns a string key. Downstream nodes use
condition gates to check the branch key and execute only the matching path.
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ If `OPENAI_API_KEY` is not set, each script will prompt you interactively.
## Pipeline Examples

- **`pipeline_branching.py`** — `BranchStep` for conditional routing in a DAG, `PipelineEventHandler` for live progress, and `DAGNode.backoff_factor` for exponential retry backoff. **No API key required.**
- **`pipeline_state.py`** — Three short scenarios with the state-based `PipelineBuilder` (`state=` mode): sentiment branching with `.branch()`, map-reduce with `Send` fan-out, and a HITL deploy gate using `Pause` plus `FileAuditLog`. **No API key required.**
- **`software_factory/`** — Self-contained example package showing a state-mode agentic SDLC pipeline (`architect → codegen → builder → qa → stable_release`) with the QA feedback loop (`recursion_limit=3`), checkpoint + resume on a transient `builder` failure, and a `StatePipelineEventHandler` printing progress. Includes plug-and-play `Checkpointer` Protocol implementations for Postgres and Redis under `checkpointers/`, and a `QueryableAuditLog` Postgres template under `audit/`. **No API key required.**

## Complex Examples

Expand Down
Loading
Loading