feat(pipeline): state-based PipelineBuilder - full #147 stack (phase 1+2+3a+3b+3c) #232
Merged
phase 1) Evolve PipelineBuilder so a single API covers both legacy port-based parallel DAGs and declarative state-based agentic pipelines. State mode is opt-in via the new `state=` kwarg and produces a StatePipeline; existing port-based pipelines are unchanged.
New surface:
- PipelineBuilder(name, state=SomeModel, checkpointer=...): nodes become async (state) -> dict | None over a typed shared state.
- add_node(fn): function references derive the node id from __name__.
- .branch(source, router, mapping=None): unified branching. With no mapping, the router returns a target node id directly; with a mapping, it returns an abstract label that maps to a node.
- Annotated[T, reducer_fn] field annotations declare merge semantics (append/extend/merge_dict; default replace).
- Auto-entry detection picks the first node added; override via start_at.
- FileCheckpointer persists state after each successful node; invoke(run_id=...) resumes from the latest checkpoint, skipping completed nodes.
- invoke(state, start_at=node) jumps into a pipeline mid-flow with explicit state.
- Agent-like objects (anything with async run(state)) drop in as nodes.
The state-based executor is sequential. Port-based parallel DAGs continue to use PipelineEngine unchanged.
Phase 2 (frontier executor + cycles + Send, Mermaid/JSON export, typed-edge Literal validation, soft-deprecation of BranchStep/FanOutStep, Redis/Postgres checkpointers) lands separately.
Tests: 14 new in tests/unit/pipeline/test_state_pipeline.py covering linear/branching/reducers/checkpoint-resume/start_at/agent-adapter. Full pipeline suite (88) and unit suite (1405) green.
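The checkpoint-and-resume behaviour described above (persist after each successful node, skip completed nodes on resume) can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's code: `JsonlCheckpointer`, `run_linear`, and the demo nodes are hypothetical names, and the real FileCheckpointer's file layout and method names are not shown on this page.

```python
import asyncio
import json
import tempfile
from pathlib import Path


class JsonlCheckpointer:
    """Hypothetical stand-in for a file checkpointer (not the PR's FileCheckpointer)."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def _path(self, run_id: str) -> Path:
        return self.root / f"{run_id}.jsonl"

    def save(self, run_id, completed, state) -> None:
        # Append one JSON record per successful node.
        with self._path(run_id).open("a", encoding="utf-8") as fh:
            fh.write(json.dumps({"completed": completed, "state": state}) + "\n")

    def load_latest(self, run_id):
        path = self._path(run_id)
        if not path.exists():
            return None
        lines = path.read_text(encoding="utf-8").splitlines()
        return json.loads(lines[-1]) if lines else None


async def run_linear(nodes, state, checkpointer, run_id):
    """Run nodes in order, resuming from the latest checkpoint if one exists."""
    record = checkpointer.load_latest(run_id)
    completed = list(record["completed"]) if record else []
    state = dict(record["state"]) if record else dict(state)
    for node in nodes:
        if node.__name__ in completed:
            continue  # already done in an earlier attempt
        update = await node(state)
        state.update(update or {})
        completed.append(node.__name__)
        checkpointer.save(run_id, completed, state)
    return state, completed


CALLS: list[str] = []
FLAKY = {"fail_once": True}


async def plan(state):
    CALLS.append("plan")
    return {"plan": "v1"}


async def build(state):
    CALLS.append("build")
    if FLAKY["fail_once"]:
        FLAKY["fail_once"] = False
        raise RuntimeError("transient failure")
    return {"artifact": state["plan"] + "-built"}


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        checkpointer = JsonlCheckpointer(Path(tmp))
        try:
            asyncio.run(run_linear([plan, build], {}, checkpointer, "run-1"))
        except RuntimeError:
            pass  # first attempt crashes after 'plan' was checkpointed
        return asyncio.run(run_linear([plan, build], {}, checkpointer, "run-1"))
```

In this sketch the second call with the same run id skips `plan` and re-runs only `build`, which is the behaviour the commit message attributes to `invoke(run_id=...)`.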
Adds the agentic-loop and fan-out features deferred from phase 1, plus
visualization on the DAG, plus soft-deprecation of the legacy branching/
fan-out steps. Phase 1 surface is preserved; everything here is additive.
What's new:
- Send(target, payload) dataclass. Routers can return list[Send] for
runtime fan-out: workers run concurrently with their own payload-merged
state copy; results reduce back into shared state.
- Cycles supported in state mode. PipelineBuilder(state=...) constructs
the underlying DAG with allow_cycles=True so a node can route back to
itself for ReAct loops / retry-with-critique.
- recursion_limit kwarg on PipelineBuilder (default 25). Per-node visit
counter aborts runaway cycles with a clean failure result.
- DAG.to_mermaid() / DAG.to_json() for any DAG.
- StatePipeline.to_mermaid() that adds branch-edge labels from the
registered mapping.
- BranchStep and FanOutStep emit DeprecationWarning pointing to
.branch(...) and Send(...). Existing pipelines continue to work.
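The Send-based fan-out in the list above can be pictured with a small stdlib-only sketch. It assumes a plain dict as state and a hand-written `extend` reducer; the `Send` dataclass, `run_sends`, `NODES`, and `REDUCERS` here are hypothetical stand-ins, not the library's implementation.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Send:
    """Hypothetical stand-in for the PR's Send(target, payload)."""

    target: str
    payload: dict[str, Any] = field(default_factory=dict)


async def summarize(state):
    # Each worker sees the shared state plus its own payload.
    return {"summaries": [f"summary of {state['doc']}"]}


NODES = {"summarize": summarize}


def extend(current, update):
    return [*current, *update]


REDUCERS = {"summaries": extend}


async def run_sends(sends, state):
    # Validate all targets before starting any worker.
    unknown = [s.target for s in sends if s.target not in NODES]
    if unknown:
        raise KeyError(f"unknown Send target(s): {unknown}")

    async def worker(send):
        local = {**state, **send.payload}  # per-worker state copy
        return await NODES[send.target](local) or {}

    updates = await asyncio.gather(*(worker(s) for s in sends))

    # Fold worker updates back into shared state through the reducers.
    merged = dict(state)
    for update in updates:
        for key, value in update.items():
            reducer = REDUCERS.get(key)
            merged[key] = reducer(merged.get(key, []), value) if reducer else value
    return merged
```

Because `asyncio.gather` returns results in argument order, the merged list in this sketch is deterministic even though the workers run concurrently.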
API additions exported from pipeline.__init__:
Send, RecursionLimitError
Tests: 9 new in test_state_pipeline_phase2.py covering loop-with-exit,
recursion_limit, map-reduce-style fan-out, unknown-target error,
Mermaid/JSON output, and deprecation warning emission. Pipeline suite
now 97 passed (88 phase-1 + 9 phase-2); full unit suite 1405 passed.
Ruff check + format clean. Pyright clean on touched modules.
…eline
- examples/pipeline_state.py: runnable demo covering branching, software-factory with checkpoint/resume, and Send-based map-reduce. No API key needed.
- docs/pipeline.md: new "State-Based Pipelines" section documenting state schema, reducers, .branch, checkpoint/resume, recursion_limit, Send fan-out, Mermaid export, and a "when to use which mode" comparison. Mark BranchStep / FanOutStep as deprecated with pointers to the new API.
- state_pipeline.py: small simplifications from the simplifier pass: dead 'rendered' set removed from to_mermaid, _common_successor uses direct equality instead of set intersection, dropped unused last_node_id tracker, _NodeFailureError as a plain Exception subclass instead of @dataclass.
97 pipeline tests still green; ruff check + format clean.
feat(pipeline): cycles, Send fan-out, Mermaid/JSON export (#147 phase 2)
Adds two durable Checkpointer implementations alongside FileCheckpointer. Single file (pipeline/checkpoint.py), guarded optional imports, no API changes to StatePipeline. Existing FileCheckpointer users are unaffected.
- RedisCheckpointer (sync redis-py): SET+EX per checkpoint, ZADD/ZRANGE index of run_ids. TTL configurable, default 30 days. Accepts url= or a pre-built client= (for shared pools).
- PostgresCheckpointer (sync psycopg3): single firefly_checkpoints table created idempotently on first save; INSERT … ON CONFLICT DO UPDATE for saves; SELECT … ORDER BY sequence DESC LIMIT 1 for load_latest. Accepts dsn= or a pre-built connection=. table_name is validated to prevent SQL injection from a misconfigured caller.
- pyproject.toml: psycopg[binary]>=3 added to the existing `postgres` extra alongside asyncpg; `redis` extra already present.
- Tests use unittest.mock only: no fakeredis, no testcontainers. A parametrized software-factory scenario runs across all three backends; per-backend tests verify the right calls are issued and that missing deps surface a clear ImportError naming the extra to install.
- examples/pipeline_state.py: optional fourth scenario gated on PG_DSN env var demonstrating PostgresCheckpointer.
- docs/pipeline.md: backend-comparison table + code snippet for swapping backends.
Tests: 17 new in test_checkpoint_backends.py covering per-backend behaviour + cross-backend conformance. Full pipeline suite 114 passed (88 phase-1 baseline + 9 phase-2 features + 17 new phase-3a). Lints clean, pyright clean on touched modules.
feat(pipeline): Redis + Postgres checkpointer backends (#147 phase 3a)
Adds observability to state pipelines, mirroring the legacy PipelineEngine story but with state-mode semantics: run_id is plumbed through every callback, on_node_start carries a per-node visit counter, and there is no on_node_skip (state pipelines abort on failure rather than skipping).
- New StatePipelineEventHandler Protocol in pipeline/engine.py with on_pipeline_start / on_node_start / on_node_complete / on_node_error / on_pipeline_complete. Partial handlers are valid (hasattr-checked).
- Per-node OTel spans nested under one pipeline-level span. The existing _start_otel_span helper on PipelineEngine is lifted to a module-level start_otel_span function shared by both pipeline types.
- PipelineBuilder gains an event_handler kwarg that flows into StatePipeline.
- Fan-out via Send emits per-Send node-start/complete pairs with each Send's own visit number (snapshot at increment time, not post-loop).
- Handler exceptions are swallowed: observability never breaks business logic.
Tests: 8 new in test_state_pipeline_observability.py covering event ordering, failure path, cyclic visit counts, fan-out per-Send events, resume-from-checkpoint, partial handler, swallowed handler exceptions, and OTel span emission with attribute snapshots.
Example: examples/pipeline_state.py gains a ProgressHandler that prints live progress for the software-factory scenario.
Verification:
- pytest tests/unit/pipeline/ → 122 passed (114 baseline + 8 new)
- ruff check + format clean
- pyright clean on touched modules
feat(pipeline): observability for state pipelines (#147 phase 3b)
Two enterprise features for state pipelines: human-in-the-loop pause/approve
gates, and a structured audit trail of every node visit.
HITL via Pause:
- New Pause(reason=...) sentinel returned by a node halts the pipeline cleanly.
- StatePipeline writes a paused checkpoint (new optional CheckpointRecord
fields: paused=False, pause_reason=None — backward compatible).
- StatePipelineResult gains paused/paused_node/pause_reason.
- StatePipelineEventHandler gains an optional on_node_pause callback.
- invoke(run_id=..., approve_pause=True) resumes from the SUCCESSOR of the
paused node. Without approve_pause=True, resuming a paused run raises
PipelineError — pauses are sticky until explicitly released.
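The pause, sticky-gate, and resume-from-successor rules above can be sketched with a toy linear pipeline. Everything here is an illustrative assumption: `Pause`, `PipelineError`, `invoke`, and the in-memory `CHECKPOINTS` dict are hypothetical stand-ins, and the real engine persists a CheckpointRecord through a Checkpointer rather than a module-level dict.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Pause:
    """Hypothetical stand-in for the PR's Pause(reason=...) sentinel."""

    reason: str


class PipelineError(RuntimeError):
    """Hypothetical stand-in for the library's PipelineError."""


async def draft(state):
    return {"draft": "v1"}


async def review(state):
    return Pause(reason="needs human sign-off")


async def publish(state):
    return {"published": True}


NODES = [draft, review, publish]
CHECKPOINTS: dict[str, dict] = {}  # run_id -> latest checkpoint (in-memory for the sketch)


async def invoke(run_id, state=None, approve_pause=False):
    record = CHECKPOINTS.get(run_id)
    start = 0
    if record is not None:
        # Pauses are sticky: resuming requires explicit approval.
        if record["paused"] and not approve_pause:
            raise PipelineError(f"run {run_id!r} is paused: {record['pause_reason']}")
        state, start = record["state"], record["next_index"]
    state = dict(state or {})
    for index in range(start, len(NODES)):
        result = await NODES[index](state)
        paused = isinstance(result, Pause)
        if not paused:
            state.update(result or {})
        # next_index points at the successor, so a paused node is never re-run.
        CHECKPOINTS[run_id] = {
            "state": dict(state),
            "next_index": index + 1,
            "paused": paused,
            "pause_reason": result.reason if paused else None,
        }
        if paused:
            return {"paused": True, "paused_node": NODES[index].__name__,
                    "pause_reason": result.reason, "state": state}
    return {"paused": False, "paused_node": None, "pause_reason": None, "state": state}
```

Storing the successor index in the checkpoint is one simple way to get "resume from the successor"; a graph-based engine would resolve the successor from its edges instead.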
Audit log via four backends in new pipeline/audit.py:
- AuditEntry pydantic model + split Protocol: AuditLog (write-only) +
QueryableAuditLog (adds list_entries).
- FileAuditLog — JSONL per (pipeline, run_id). Implements QueryableAuditLog.
- PostgresAuditLog — single firefly_audit table, idempotent DDL, reuses the
psycopg connection from Phase 3a's postgres extra. Implements QueryableAuditLog.
- LoggingAuditLog — stdlib logging.Logger; pairs with any log-aggregation
stack (Splunk-HEC, Loki, Datadog, OTel-LoggingHandler-bridge). Write-only.
No new dep.
- OtelAuditLog — OTel logs API directly; emits LogRecord with trace_id /
span_id correlation. Requires opentelemetry-sdk. Write-only.
- StatePipeline records one AuditEntry per node visit (success/error/pause)
with inputs_snapshot, outputs_snapshot, latency_ms, started_at/completed_at,
status, plus error_message or pause_reason as appropriate.
- Audit-write failures are non-fatal — logged and swallowed.
API additions exported from fireflyframework_agentic.pipeline:
Pause, AuditEntry, AuditLog, QueryableAuditLog,
FileAuditLog, PostgresAuditLog, LoggingAuditLog, OtelAuditLog
No new required deps. Postgres extra (already added in 3a) covers
PostgresAuditLog. opentelemetry-sdk is the same optional dep already used
by Phase 3b for OTel spans.
Tests: 20 new across two files.
- test_state_pipeline_hitl.py (6): pause halts pipeline; resume without
approval raises; resume with approve_pause continues from successor;
on_node_pause fires; backward-compat for old checkpoints; pause→fail→retry.
- test_audit_log.py (14): per-backend (File JSONL, Postgres mocked,
Logging via caplog, OTel via mocked logger); pipeline writes one entry
per visit; status reflects success/error/paused; audit write failures
don't abort the pipeline.
Full pipeline suite 142 passed (122 baseline + 20 new). Lints clean,
pyright clean on touched modules. Example pipeline_state.py gains a 5th
scenario showing HITL + audit end-to-end.
feat(pipeline): HITL Pause + AuditLog (#147 phase 3c)
Replace 7 _finalize_run call sites in StatePipeline.invoke with a single try/finally that ends the pipeline span and emits on_pipeline_complete once. Remove the _finalize_run helper. Behavior unchanged; net -32 LOC.
…-invoke-single-return refactor(pipeline): collapse invoke() to single return path
…audit Extract the duplicated Postgres setup (optional-dep guard, dsn-xor-connection check, table-name validation, lazy idempotent DDL) into a single PsycopgBackend base class in pipeline/_psycopg_backend.py. PostgresCheckpointer and PostgresAuditLog now inherit from it; each only declares its DDL and default table name. Tests updated to monkeypatch _psycopg on the shared module.
…r 4) Fourth layer of the unification. PipelineEngine gains the ability to run cyclic DAGs (ReAct loops, retry-with-critique) and the long-standing silent corruption in topological_sort/execution_levels on cyclic graphs is fixed.
Engine changes:
- PipelineEngine(__init__) accepts recursion_limit: int = 25 (matches StatePipeline). Bounds visit count per node in cyclic mode.
- run() detects cyclic DAGs via dag.is_cyclic() and routes to a new _run_cyclic helper. Acyclic graphs use the existing topological scheduler unchanged.
- _run_cyclic: sequential frontier-following. Picks the unique alive outgoing edge from each completed node, increments per-node visit count, enforces recursion_limit. Fan-out to multiple alive edges raises a clear PipelineError (multi-target cyclic fan-out arrives with Send in layer 5).
- _record_audit accepts visit=, defaulting to 1 for the acyclic scheduler. The cyclic scheduler passes the actual visit number so audit entries distinguish iterations.
DAG changes (silent-corruption fix from #245's review):
- topological_sort() now raises PipelineError on cyclic graphs instead of returning a wrong-length list with a misleading "should not reach here" message.
- execution_levels() now raises PipelineError on cyclic graphs instead of silently producing incomplete levels.
Both methods document is_cyclic() as the right pre-check.
Tests: 7 new in tests/unit/pipeline/test_pipeline_engine_cycles.py covering:
- topological_sort raises on cyclic DAGs
- execution_levels raises on cyclic DAGs
- ReAct-style finite loop terminates correctly
- Recursion limit halts runaway cycle
- Default recursion_limit is 25
- Audit visit number increments per iteration
- Acyclic DAG with allow_cycles=True still uses the parallel scheduler
Full suite: 1580 passed.
Refs: #245
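The topo-sort safety fix above can be illustrated with Kahn's algorithm, where a cycle shows up as nodes whose in-degree never reaches zero. This is a generic sketch, not the repository's `DAG.topological_sort`; the function signature and the local `PipelineError` class are assumptions made for the example.

```python
from collections import deque


class PipelineError(RuntimeError):
    """Hypothetical stand-in for the library's PipelineError."""


def topological_sort(nodes, edges):
    """Kahn's algorithm that fails loudly on cycles instead of returning a short list."""
    indegree = {node: 0 for node in nodes}
    successors = {node: [] for node in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    ready = deque(node for node in nodes if indegree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        # Nodes left over are on, or downstream of, a cycle.
        stuck = sorted(set(nodes) - set(order))
        raise PipelineError(f"graph is cyclic; unsortable nodes: {stuck}")
    return order


def is_cyclic(nodes, edges):
    try:
        topological_sort(nodes, edges)
    except PipelineError:
        return True
    return False
```

Without the length check, a cyclic input would silently produce a list shorter than the node set, which matches the "wrong-length list" corruption the commit message describes.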
…scheduler feat(pipeline): cycle-aware scheduler and topo-sort safety (#245 layer 4)
Fifth layer of the unification. PipelineEngine now recognizes the same control sentinels that StatePipeline uses today:
- A node returning Pause(reason=...) halts the pipeline cleanly. The run resumes with engine.run(run_id=..., approve_pause=True).
- A node returning Send or list[Send] triggers parallel fan-out where each Send's target runs concurrently with the payload merged into a per-worker state copy. Reducers merge worker outputs back into shared state.
Refactor (no behavior change):
- Pause and Send dataclasses moved from state_pipeline.py to engine.py so PipelineEngine can recognize them without a circular import. state_pipeline.py now imports them from engine. Public re-exports from fireflyframework_agentic.pipeline are unchanged.
Engine changes:
- run() accepts approve_pause: bool = False kwarg. Resuming a paused checkpoint without approve_pause=True raises PipelineError with the pause reason.
- _save_checkpoint accepts paused=, pause_reason= kwargs, persisted on CheckpointRecord (the fields landed in Layer 1A).
- _load_for_resume enforces the approve_pause gate.
- Main loop branches on Pause: emits on_node_pause event, checkpoints with paused=True, sets pending_pause and aborts.
- Main loop branches on Send: dispatches workers via the new _run_sends helper, marks workers as completed so the scheduler does not re-run them.
- _run_sends: validates target IDs up front; per-worker PipelineContext with its own state copy (payload applied via reducers); asyncio.gather across workers; results merge back into shared state via reducers; any worker failure aborts the pipeline.
- _is_send_payload helper at module level.
Result changes:
- PipelineResult gains paused: bool, paused_node: str | None, pause_reason: str | None. Mirrors StatePipelineResult.
Tests: 7 new in tests/unit/pipeline/test_pipeline_engine_pause_send.py covering:
- Pause halts the pipeline and records paused state in result + checkpoint
- Resume without approve_pause=True raises
- Resume with approve_pause=True continues from the paused node's successor
- list[Send] dispatches workers concurrently with per-worker state copies
- Single Send is treated as list[Send] of one
- Unknown Send target marks the pipeline as failed
- Pause and Send remain re-exported from the pipeline package
Full suite: 1587 passed.
Refs: #245
…send feat(pipeline): Pause and Send in unified PipelineEngine (#245 layer 5)
Sixth layer of the unification. PipelineEngine.run() accepts start_at= (string node id or callable reference) to begin execution mid-DAG. Nodes not reachable from start_at are treated as pre-completed and skipped. Resume (run_id) and approve_pause already landed in layers 1A and 5; this completes the entry-control kwargs.
Engine changes:
- run() accepts start_at: str | Callable | None = None.
- New _resolve_node_id helper at module level.
- pre_completed initialized from all_nodes - forward(start_at).
- Works with state overlay, edge conditions, and cyclic mode.
Tests: 7 new in tests/unit/pipeline/test_pipeline_engine_start_at.py.
Full suite: 1594 passed.
Refs: #245
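The `pre_completed = all_nodes - forward(start_at)` rule above amounts to a forward reachability walk. The sketch below shows that idea on a plain edge list; `resolve_node_id`, `forward`, and `pre_completed` are hypothetical helpers written for this illustration and are not the engine's private functions.

```python
from collections import deque


def resolve_node_id(node):
    """Accept a node id string or a callable reference (assumed to map via __name__)."""
    return node if isinstance(node, str) else node.__name__


def forward(start, edges):
    """Every node reachable from start, including start itself (breadth-first walk)."""
    successors = {}
    for src, dst in edges:
        successors.setdefault(src, []).append(dst)
    seen = {start}
    queue = deque([start])
    while queue:
        for nxt in successors.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def pre_completed(all_nodes, edges, start_at):
    """Nodes the scheduler should treat as already done when entering mid-DAG."""
    return set(all_nodes) - forward(resolve_node_id(start_at), edges)


def review(state):  # placeholder node used only to show the callable form of start_at
    return state
```

Note that sibling branches which are not downstream of the entry point (such as an indexing step next to the review step) also land in the pre-completed set, consistent with "not reachable from start_at".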
feat(pipeline): start_at kwarg for mid-pipeline entry (#245 layer 6)
Seventh layer of the unification. StatePipeline now emits a DeprecationWarning on construction pointing users at PipelineEngine configured with state_schema= as the unified replacement.
PipelineEngine has feature parity with StatePipeline after layers 1-6: state overlay, reducers, Pause, Send, cycles, recursion_limit, checkpointing, audit log, resume, start_at, plus parallel state-aware execution that StatePipeline could not provide.
Changes:
- StatePipeline.__init__ emits a DeprecationWarning with migration text.
- import warnings added at module level.
- Class docstring includes a deprecation notice and migration example.
StatePipeline still works: all 37 existing state-pipeline tests pass unchanged (they emit the new DeprecationWarning but continue to operate).
Tests: 1 new in tests/unit/pipeline/test_state_pipeline_deprecation.py verifying the warning is emitted and references PipelineEngine + #245. Full suite: 1595 passed.
Layer 8 (full deletion of state_pipeline.py and dual-mode logic in builder.py) is tracked as follow-up. It requires migrating the 37 state-pipeline tests to the unified PipelineEngine surface and translating PipelineBuilder.branch(source, router, mapping) into conditional DAGEdges, which is more invasive than this deprecation layer warrants.
Refs: #245
…ate-state-pipeline feat(pipeline): deprecate StatePipeline (#245 layer 7)
Three Ruff rules that the project ruff CI enforces but my local ruff missed (newer CI version): - SIM103 in engine.py:_is_send_payload — collapse the second if/return pair into a direct return. - F841 in test_pipeline_engine_pause_send.py — remove an unused 'planner' variable that was leftover from an earlier draft of test_single_send_is_treated_as_list_of_one. - N817 in same test file — rename 'Pause as P' / 'Send as S' to 'PausePkg'/'SendPkg' (single-letter CamelCase aliases trip the acronym rule). No behavior change. Tests still 7/7 in that file.
fix(ci): satisfy stricter ruff rules on PR gate (#232)
…ayer 8) Final layer of the unification. StatePipeline + state_pipeline.py + the dual-mode logic in builder.py are gone; PipelineEngine is the only executor. PipelineBuilder(state=...) now constructs a PipelineEngine configured with state_schema, recursion_limit, audit_log, checkpointer, event_handler, and any routers registered via .branch(...).
No deprecation cycle needed: StatePipeline never landed on main (introduced in #232 which is the umbrella PR). All consumers move at the same time.
Engine changes:
- PipelineEngine accepts routers: dict[str, RouterFn] and router_mappings: dict[str, dict[str, str]]. When non-empty (or the DAG is cyclic), the engine routes through the cyclic frontier scheduler.
- New _cyclic_next consults routers first, then falls back to edge conditions.
- _resolve_router_decision lifts the router-return → next-step translation from state_pipeline.
- _run_cyclic now handles Pause, Send, and list[Send] returns alongside the dict/state-update path.
- _run_sends accepts visit_counts; per-Send visit numbers are tracked per target so observability shows the right counter across fan-out.
- PipelineEngine.to_mermaid renders branch labels from router_mappings.
- PipelineEngine.invoke(state, ...) shorthand mirrors the StatePipeline.invoke signature so test migration stays mechanical.
- _record_audit accepts status_override + pause_reason so a Pause-returning node is audited as "paused" rather than "success".
- Span names use "pipeline.state.*" when state_schema is set (matches the legacy taxonomy observability dashboards already key on).
- on_node_start is now emitted by the schedulers (acyclic main loop, cyclic loop, _run_sends) and removed from _execute_node, so the visit number is correct in every scheduler.
- _load_for_resume returns list[str] (ordered) instead of set[str] so PipelineResult.completed_nodes after resume reflects the original execution order.
- Resume seeds trace_entries with the pre_completed nodes so PipelineResult.completed_nodes (now derived from execution_trace) includes the full history.
Builder changes:
- PipelineBuilder.build() always returns PipelineEngine. The state-mode branch wires state_schema/routers/router_mappings into the engine.
- New _StateStepAdapter wraps a state-mode fn into a StepExecutor so the unified engine's _execute_node can run it.
- _coerce_state_node_fn (moved from state_pipeline.py) keeps the "async def, def, or .run(state) object" forms working.
- _StateNodePlaceholder is gone: state-mode nodes now run real code.
Result changes:
- PipelineResult grows state-mode convenience properties: state (alias of final_state), completed_nodes (derived from execution_trace so cyclic visits appear individually), failed_node, error.
Deletions:
- fireflyframework_agentic/pipeline/state_pipeline.py (~750 LOC)
- StatePipeline, StatePipelineResult, StatePipelineEventHandler, RecursionLimitError, BranchSpec, StateNodeFn (in state_pipeline)
- _StateNodePlaceholder (in builder)
- tests/unit/pipeline/test_state_pipeline_deprecation.py (Layer 7's warning is gone too)
- " StatePipeline," imports and isinstance assertions across the state-pipeline test files.
Migration notes (internal to this PR):
- Pause and Send already live in engine.py since Layer 5; this PR just removes the state_pipeline re-export.
- StatePipelineEventHandler removed; existing usages either implement EventHandler (the unified protocol) or the legacy PipelineEventHandler.
Full unit suite: 1594 passed. Net diff: -507 LOC (-955 deleted, +448 added).
Refs: #245
…-state-pipeline feat(pipeline): delete StatePipeline, unify on PipelineEngine (#245 layer 8)
    * ``reason`` - human-readable string; for skips and pauses.
    """

    async def on_pipeline_start(self, pipeline_name: str, run_id: str) -> None: ...

    async def on_node_start(self, pipeline_name: str, run_id: str, node_id: str, visit: int) -> None: ...

    async def on_node_complete(self, pipeline_name: str, run_id: str, node_id: str, latency_ms: float) -> None: ...

    async def on_node_error(self, pipeline_name: str, run_id: str, node_id: str, error: str) -> None: ...

    async def on_node_skip(self, pipeline_name: str, run_id: str, node_id: str, reason: str) -> None: ...
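A handler that implements only some of these callbacks can be dispatched with an attribute check, and handler failures can be swallowed so observability never breaks the run. The sketch below shows one way to do that; `emit`, `ProgressHandler`, and the logger name are hypothetical, and the engine's actual dispatch helper is not shown on this page.

```python
import asyncio
import logging

logger = logging.getLogger("pipeline.events.sketch")  # hypothetical logger name


async def emit(handler, event, *args):
    """Call handler.<event>(*args) if it exists; never let a handler error escape."""
    if handler is None or not hasattr(handler, event):
        return False  # partial handlers simply skip events they do not define
    try:
        await getattr(handler, event)(*args)
    except Exception:
        logger.exception("event handler %s failed; continuing", event)
        return False
    return True


class ProgressHandler:
    """Partial handler: defines two callbacks, one of which is deliberately buggy."""

    def __init__(self):
        self.seen = []

    async def on_node_start(self, pipeline_name, run_id, node_id, visit):
        self.seen.append((node_id, visit))

    async def on_node_error(self, pipeline_name, run_id, node_id, error):
        raise RuntimeError("handler bug")
```

Returning a boolean from `emit` is a choice made for this sketch so the outcome is easy to inspect; a real engine could just as well return nothing.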
Layer 8 (#255) deleted StatePipeline but I missed the import + return type in examples/software_factory/pipeline.py. CI on #232 fails to collect tests/examples/software_factory/test_pipeline.py with: ImportError: cannot import name 'StatePipeline' from 'fireflyframework_agentic.pipeline' Fix: import PipelineEngine instead and drop the now-unnecessary cast. PipelineBuilder.build() returns PipelineEngine directly after Layer 8. Test passes locally.
…-example fix(example): software-factory pipeline imports PipelineEngine (post-#255)
ancongui pushed a commit that referenced this pull request on May 31, 2026
Closes #147 and #245. Follow-ups: #237 / #238 / #239 / #257.
This PR extends the existing `PipelineEngine` (the port-based DAG that was already in the repo) into a single engine that also handles agentic workflows. Nothing was thrown away: every previous capability is preserved, and a set of new capabilities is built on top of the same DAG, the same scheduler, and the same `PipelineEngine` class.
What we already had
The repo shipped a port-based DAG runtime:
- `PipelineEngine` executed a `DAG` made of `DAGNode` + `DAGEdge` objects.
- `StepExecutor` objects (`CallableStep`, `BranchStep`, `FanInStep`, `FanOutStep`) connected by typed ports: each node's outputs flowed into the named inputs of downstream nodes.
- Parallel execution via `asyncio.gather`.
What it could not express was anything that needed shared mutable state, cycles, runtime fan-out over a dynamic work list, human-in-the-loop pauses, or durable resume across crashes. Those are exactly the shapes an agentic / IDP / "software factory" workflow needs.
What we built on top of the DAG
Every new feature is an additive extension of the same engine. The port-based mode keeps working unchanged; activating the new features is opt-in.
1. A shared state object that travels with the DAG
`PipelineBuilder("name", state=SomeModel)` attaches a Pydantic state schema to the pipeline. Each node receives the current state, returns a `dict` update, and the engine merges the update into the live state object via per-field reducers.
Reducers ship in `pipeline/reducers.py`: `replace`, `append`, `extend`, `merge_dict`. Custom ones are just functions.
State sits alongside ports: when state mode is on, the engine still schedules topologically and still passes ports through. State is an overlay, not a replacement.
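The per-field reducer idea can be sketched with the standard library alone. The example assumes reducers are attached with `Annotated[T, reducer_fn]` as the phase-1 commit message states, but it uses a dataclass instead of a Pydantic model to stay dependency-free; `BuildState`, `reducers_for`, and `apply_update` are hypothetical names, and the reducer functions are re-implemented here rather than imported from `pipeline/reducers.py`.

```python
from dataclasses import dataclass, field, replace as dc_replace
from typing import Annotated, Any, get_args, get_origin, get_type_hints


# Stand-in reducers: (current_value, update) -> merged_value.
def append(current, update):
    return [*current, update]


def extend(current, update):
    return [*current, *update]


def merge_dict(current, update):
    return {**current, **update}


@dataclass
class BuildState:
    goal: str = ""  # no reducer: default behaviour is replace
    notes: Annotated[list[str], append] = field(default_factory=list)
    files: Annotated[list[str], extend] = field(default_factory=list)
    meta: Annotated[dict[str, Any], merge_dict] = field(default_factory=dict)


def reducers_for(model):
    """Collect the first callable found in each field's Annotated metadata."""
    found = {}
    for name, hint in get_type_hints(model, include_extras=True).items():
        if get_origin(hint) is Annotated:
            fn = next((extra for extra in get_args(hint)[1:] if callable(extra)), None)
            if fn is not None:
                found[name] = fn
    return found


def apply_update(state, update):
    """Merge a node's dict update into state, field by field."""
    reducers = reducers_for(type(state))
    changes = {}
    for key, value in update.items():
        reducer = reducers.get(key)
        changes[key] = reducer(getattr(state, key), value) if reducer else value
    return dc_replace(state, **changes)
```

A custom reducer in this sketch is any two-argument function placed in the `Annotated` metadata, which mirrors the "custom ones are just functions" remark.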
2. Branching as a first-class builder call
The original way to branch was `BranchStep(...)` plus per-edge `condition` lambdas. That still works, but it was verbose. The new shape is one call on the builder.
Internally `.branch()` compiles the router function into `DAGEdge.condition` predicates, so the scheduler stays unchanged: it just skips edges whose predicate is false.
3. Cycles in the DAG
The original topological scheduler refused cycles. We added a cycle-aware scheduler that runs alongside the topological one and engages only when the DAG actually has back-edges. A `recursion_limit` guards against runaway loops (default 25, configurable per build).
This is what lets us express ReAct loops, retry-with-critique, QA feedback loops, planner ↔ executor ping-pong, etc.
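Router-based branching and the recursion limit can be pictured together with a toy retry-with-critique loop. This is a sketch under stated assumptions: the `NODES`, `EDGES`, and `ROUTERS` tables, the `run` function, and the local `RecursionLimitError` are hypothetical, and the sketch raises on overflow for brevity whereas the PR text says the engine aborts with a clean failure result.

```python
import asyncio


class RecursionLimitError(RuntimeError):
    """Hypothetical stand-in; raised here instead of returning a failure result."""


async def draft(state):
    return {"attempts": state["attempts"] + 1}


async def critique(state):
    return {"ok": state["attempts"] >= 3}


async def done(state):
    return {"finished": True}


NODES = {"draft": draft, "critique": critique, "done": done}
EDGES = {"draft": "critique"}  # unconditional edges: source -> target


def route_after_critique(state):
    return "pass" if state["ok"] else "retry"  # abstract labels, not node ids


# source -> (router, mapping from label to node id); "retry" is the back-edge.
ROUTERS = {"critique": (route_after_critique, {"pass": "done", "retry": "draft"})}


async def run(state, start_at="draft", recursion_limit=25):
    visits = {}
    state = dict(state)
    current = start_at
    while current is not None:
        visits[current] = visits.get(current, 0) + 1
        if visits[current] > recursion_limit:
            raise RecursionLimitError(f"node {current!r} exceeded {recursion_limit} visits")
        state.update(await NODES[current](state) or {})
        if current in ROUTERS:
            router, mapping = ROUTERS[current]
            label = router(state)
            # With a mapping the router returns a label; without one, a node id.
            current = mapping[label] if mapping else label
        else:
            current = EDGES.get(current)
    return state, visits
```

The per-node visit counter is what bounds the loop: with the default limit the example exits after three drafts, while a limit of 2 stops it on the third visit to `draft`.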
4.
Send— runtime fan-out with per-worker state copiesA node may return
Send(target, payload)orlist[Send]. The engine then dispatches N workers concurrently — each gets its own state copy with the Send's payload merged in. When the workers finish, their state updates fold back into the shared state through the same reducers.This is dynamic fan-out (the count is decided at runtime by the planner). The original
FanOutStepwas static — it just returned a list and let downstream topology do whatever it did.Sendmakes "spawn N parallel sub-agents for a dynamic work list" a first-class operation.5.
Pause— human-in-the-loop gatesA node may return
Pause(reason="..."). The engine halts cleanly, writes a checkpoint withpaused=True, and returns. Callinginvoke(run_id=..., approve_pause=True)later resumes from the successor of the paused node (the pause node itself is not re-executed). Withoutapprove_pause=Truea paused run raisesPipelineError— silent skips are not allowed.6. Checkpointing + resume
A `Checkpointer` Protocol with three production backends:

- `FileCheckpointer`: JSONL on disk.
- `RedisCheckpointer`: SET+EX per checkpoint, ZADD/ZRANGE run-id index, configurable TTL.
- `PostgresCheckpointer`: single `firefly_checkpoints` table, idempotent DDL, `INSERT … ON CONFLICT DO UPDATE`.

Checkpoints are written automatically after each successful node. `invoke(run_id="...")` resumes from the last good checkpoint; `invoke(state, start_at="node")` jumps mid-pipeline for debugging or replay.

7. Audit log: separate from checkpoint
A `Checkpointer` answers "where do I resume from?". An `AuditLog` answers "what happened?". They are distinct concerns, so they live in distinct protocols and distinct files (`pipeline/audit.py`).

Each `AuditEntry` records `inputs_snapshot`, `outputs_snapshot`, `latency_ms`, `started_at`, `completed_at`, `status` (success/error/paused), `error_message`, `pause_reason`, and a `visit` counter. Four backends:

- `FileAuditLog` (JSONL, queryable)
- `PostgresAuditLog` (queryable)
- `LoggingAuditLog` (stdlib `logging`, plays nicely with Splunk / Loki / Datadog / OTel-LoggingHandler)
- `OtelAuditLog` (OTel logs API with `trace_id` / `span_id` correlation)

Audit-write failures are non-fatal: logged and swallowed. Compliance / debugging / replay don't get to break business logic.
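The "logged and swallowed" behaviour can be sketched as a thin wrapper around the backend call. This is an illustrative stand-in, not the code in `pipeline/audit.py`: the `record` method name, the `safe_record` helper, the `run_id` / `node_id` fields, and the synchronous call style are all assumptions; only the other field names come from the description above.

```python
import logging
from dataclasses import dataclass
from typing import Any, Optional

logger = logging.getLogger("audit_sketch")


@dataclass
class AuditEntry:
    run_id: str    # assumed field, not listed in the description
    node_id: str   # assumed field, not listed in the description
    inputs_snapshot: dict[str, Any]
    outputs_snapshot: dict[str, Any]
    latency_ms: float
    started_at: str
    completed_at: str
    status: str    # "success", "error", or "paused"
    visit: int = 1
    error_message: Optional[str] = None
    pause_reason: Optional[str] = None


class ListAuditLog:
    """In-memory backend used only for this sketch."""

    def __init__(self) -> None:
        self.entries: list[AuditEntry] = []

    def record(self, entry: AuditEntry) -> None:
        self.entries.append(entry)


class BrokenAuditLog:
    """Backend that always fails, to show the non-fatal path."""

    def record(self, entry: AuditEntry) -> None:
        raise OSError("disk full")


def safe_record(audit_log: Any, entry: AuditEntry) -> bool:
    """Write an audit entry; log and swallow any backend failure."""
    try:
        audit_log.record(entry)
        return True
    except Exception:
        logger.exception("audit write failed for node %s", entry.node_id)
        return False


entry = AuditEntry(
    run_id="run-1",
    node_id="qa",
    inputs_snapshot={"draft": 2},
    outputs_snapshot={"qa_status": "fail"},
    latency_ms=12.5,
    started_at="2024-01-01T00:00:00Z",
    completed_at="2024-01-01T00:00:01Z",
    status="success",
    visit=2,
)
good_log = ListAuditLog()
print(safe_record(good_log, entry), safe_record(BrokenAuditLog(), entry))
```

The design point is that the node's result is already computed by the time the audit write happens, so a failing backend costs an audit record but never the run.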
8. A unified `EventHandler` protocol + OTel spans

One `EventHandler` Protocol now covers both port-based and state-based pipelines: `on_pipeline_start` / `on_node_start` / `on_node_complete` / `on_node_error` / `on_node_pause` / `on_pipeline_complete`. Every callback carries `run_id`; `on_node_start` carries a `visit` counter so cycles and `Send` fan-out are distinguishable in observability tools.

Per-pipeline and per-node OTel spans are emitted automatically. Partial handlers (only some methods defined) work via `hasattr`-checked dispatch. Handler exceptions are swallowed.

9. Mermaid + JSON export
`DAG.to_mermaid()` and `DAG.to_json()` render the graph including branch conditions ("if?" labels), which is helpful when looking at a non-trivial agentic pipeline you didn't write.

What new capabilities are added
Each shape below names the concrete extension on the engine that enables it, so it's clear which piece of the PR unlocks which capability.
Software factory / IDP: architect → codegen → builder → qa, with a QA feedback cycle back to codegen capped at 3 iterations. Lives in `examples/software_factory/`.

Enabled by: state overlay (so qa can read/write `qa_status`), `.branch()` (qa_router picks the next node), the cycle-aware scheduler (qa → codegen is a back-edge), and `recursion_limit` (caps the loop). The original DAG had none of these; it would have rejected the back-edge at build time.

ReAct loops: planner ↔ executor with bounded recursion.

Enabled by: the cycle-aware scheduler + `recursion_limit`. The original topological scheduler rejected cycles outright.

Retry-with-critique: generator → critic → (if "redo") generator.

Enabled by: `.branch()` driving a back-edge into the cycle-aware scheduler. Same primitive as ReAct; different shape.

Map-reduce over a dynamic work list: planner returns `list[Send(...)]`, the engine dispatches N workers concurrently, each with its own state copy + payload, and reducers fold the results back into shared state.

Enabled by: the `Send` sentinel + per-worker state copies + reducers. The original `FanOutStep` returned a static list and let downstream topology handle parallelism; it could not vary the worker count at runtime or hand each worker an isolated state slice.

Human-in-the-loop approval gates: build → `Pause("await approval")` → deploy. The pause is checkpointed; the run resumes days later with `invoke(run_id=..., approve_pause=True)`; every node visit (including the pause itself, with `status="paused"`) lands in the audit log.

Enabled by: the `Pause` sentinel, the `Checkpointer` Protocol carrying `paused=True` / `pause_reason`, and the `AuditLog` Protocol recording the gate visit. None of the three existed in the original DAG.

Crash-survivable long-running pipelines: the engine auto-writes a checkpoint after every successful node; `invoke(run_id=...)` picks up from the last good checkpoint without re-running completed nodes.

Enabled by: the `Checkpointer` Protocol + `FileCheckpointer` / `RedisCheckpointer` / `PostgresCheckpointer` backends + resume logic in the engine. Survival across crashes is bounded by the chosen backend's durability: Postgres is fully transactional; Redis depends on its persistence config (AOF/RDB); the file backend survives the process but not the disk.

Compliance and post-mortem investigation: every node visit is recorded with input snapshot, output snapshot, latency, status, error or pause reason, visit counter, and (with `OtelAuditLog`) trace/span IDs. Queryable backends (`FileAuditLog`, `PostgresAuditLog`) let you walk the trail node-by-node after the fact.

Enabled by: the `AuditLog` Protocol + its four backends. Note: the audit log captures what happened; it is not a one-call replay engine. To re-run a single node, combine the captured input snapshot with `start_at=` below.

Mid-pipeline debug entry: `invoke(state, start_at="failing_node")` reproduces one node in isolation against a hand-supplied state (typically reconstructed from the audit log).

Enabled by: the `start_at=` kwarg added in unification layer 6 (#252, "feat(pipeline): start_at kwarg for mid-pipeline entry (#245 layer 6)"). The original DAG ran from the source nodes only.

How this PR landed
This integration PR is the merge of five phases plus an eight-layer unification, all originally cut against `issue-147-pipeline-evolution`:

Phases (built the new capabilities, initially as a parallel `StatePipeline` class):

- `Checkpointer` Protocol, `FileCheckpointer`, branching as `.branch()`, reducers
- `Send` fan-out, Mermaid / JSON export
- `RedisCheckpointer` + `PostgresCheckpointer`
- `EventHandler` protocol + OTel spans
- `Pause` (HITL) + four-backend `AuditLog`

Unification (#245, eight layers; folded `StatePipeline` into the existing `PipelineEngine`):

After phase 3c shipped, we had two engines side by side: the original `PipelineEngine` (parallel, port-based) and the new `StatePipeline` (sequential, state-based). They duplicated checkpointing, audit, events, and OTel. Issue #245 collapsed them in eight stacked PRs:

- […] `PipelineEngine`.
- `EventHandler` protocol shared by both modes.
- […] `DAGEdge.condition`.
- […] `PipelineEngine`.
- […] `PipelineEngine`; topological sort raises on cycles instead of silently truncating.
- `Pause` and `Send` recognized by the unified engine.
- `start_at=` kwarg for mid-pipeline entry.
- `StatePipeline` deprecated (engine became feature-equivalent).
- […] `StatePipeline` entirely (~750 LOC). `PipelineBuilder.build()` always returns `PipelineEngine`.

Companion fixes inside the branch: #254 (stricter ruff rules surfaced by the PR gate) and #256 (the software_factory example was still importing the deleted `StatePipeline`).

Earlier cleanup on the same branch: #240 (collapse `invoke()` to a single return path), #241 (share psycopg scaffolding between `checkpoint.py` and `audit.py`), #242 (drop dead variables), #243 (assert result narrowing), #244 (extract `software_factory` example, drop Postgres/Redis from default deps).

End result: one engine, one scheduler, one set of events, one checkpoint format. State-mode pipelines get topological parallelism for free when the graph permits and only fall back to the cyclic scheduler when there are actual cycles.
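The "topological when possible, cyclic only when needed" decision can be illustrated with the standard library's `graphlib`. This is a sketch under assumptions, not the engine's scheduler: `execution_plan` is a hypothetical helper, and the graph is given as a plain mapping from each node to its predecessors.

```python
from graphlib import CycleError, TopologicalSorter


def execution_plan(predecessors: dict[str, set[str]]) -> tuple[str, list[list[str]]]:
    """Return ("topological", levels) for an acyclic graph, else ("cyclic", [])."""
    sorter = TopologicalSorter(predecessors)
    try:
        sorter.prepare()  # raises CycleError if the graph has a back-edge
    except CycleError:
        return "cyclic", []
    levels: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose predecessors are all done
        levels.append(ready)                # each level could run concurrently
        sorter.done(*ready)
    return "topological", levels


acyclic = {"b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
with_back_edge = {"codegen": {"architect", "qa"}, "qa": {"codegen"}}

print(execution_plan(acyclic))
print(execution_plan(with_back_edge))
```

In the acyclic case `b` and `c` land in the same level, which is where state-mode pipelines would pick up parallelism; the graph with the qa → codegen back-edge is the case that would need the cycle-aware path.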
How to run the example
The canonical example is `examples/software_factory/`, an IDP-style pipeline (architect → codegen → builder → qa → release) with a QA feedback loop, `recursion_limit=3`, a progress event handler, and `FileCheckpointer`. Plain-Python "agents", no API key required.

It exercises: state-based nodes, `.branch()`, cycles (qa → codegen on fail), event-handler progress output, and checkpoint/resume.

Extending the IDP example (`examples/idp_pipeline.py`)

The IDP example (`ingest → split → classify → extract → validate → assemble → explain`) is wired today as a plain linear `.chain()` of `CallableStep` nodes with per-node `timeout_seconds` and one `retry_max=1`. Per-document work is done with `asyncio.gather()` inside the steps. With this PR landed, almost all of that can be expressed as DAG structure instead. Natural extensions, highest-value first:

- […] `DAGEdge(condition=...)` so a low-confidence `classify` result routes to a review path, and unsupported sub-doc types skip `extract` / `validate` straight to `assemble`. Skipped nodes surface as `on_node_skip` events instead of vanishing inside Python branching.
- `Send`-based fan-out + reducers instead of in-step `asyncio.gather`. Rewrite `extract` to emit `[Send("extract_one", {"doc": d}) for d in subdocs]` against a typed state schema (`Annotated[list[Result], extend]`). Each sub-document becomes a first-class node with its own trace span, retry, and timeout: real map/reduce instead of an opaque single node.
- `Pause` for human-in-the-loop. When grounding ratio / validation drops below threshold, return `Pause(reason="needs human review")`, checkpoint, and resume with `approve_pause=True`, replacing the implicit "validation failed → Reflexion" path with an explicit gate.
- […] `FileCheckpointer` (or Postgres) so a crashed run resumes at the last completed node, or `start_at="extract"` to skip the expensive 300s stage on re-run.
- […] `raw_text` / `sub_documents` / `classifications` facts stuffed into `MemoryManager` / metadata with a `PipelineBuilder(state=IDPState)` Pydantic model + reducers: type-checked data flow, and a prerequisite for the `Send` fan-out above.
- `FailureStrategy`. Push the current per-subdoc try/except into the DAG: `failure_strategy=SKIP_DOWNSTREAM` (or `PROPAGATE`) so a failed optional stage (e.g. `explain`) doesn't abort the run.

Net effect: the example moves from "proves a pipeline runs" to "demonstrates branching, map/reduce, HITL, and resumability", the features that justify the DAG over a `for` loop.

Extending flydocs
flydocs's `PipelineOrchestrator` builds a fresh `PipelineEngine` per request, but still uses `builder.chain(*chain)` with optional stages conditionally appended in Python and per-task concurrency hand-rolled via `asyncio.gather()` inside each stage. The engine in this PR unlocks several things that previously required custom code or weren't possible:

- […] `chain` list per request, make the DAG static and let `DAGEdge(condition=...)` decide which of `discover` / `classify` / `judge` / authenticity fire from request config; skipped nodes then emit `on_node_skip` events instead of being silently absent.
- […] `Send` + reducers. Today per-segment concurrency lives inside a stage, so traces only have per-stage granularity. Converting `extract` / `judge` / authenticity fan-out to `Send` worker nodes yields per-task spans, per-task retry, and per-task timeout: observability that wasn't achievable without restructuring.
- […] `Pause` / resume. A "needs human confirmation" gate (low-confidence judge, failed authenticity) becomes a native pipeline pause + checkpoint instead of a bespoke worker + EDA event.
- […] `PostgresCheckpointer` + `start_at=` lets a revived job resume at the last completed stage, e.g. skip the 600s extract that already finished. Directly complementary to the existing reliability hardening.
- `judge_escalation` as a real cycle. Replace the hardcoded "if judge fail-rate > threshold, re-run extract+judge with a stronger model" stage with `allow_cycles=True` + a router and `recursion_limit`: extract → judge → router → (escalate back to extract | exit when pass-rate met).
- […] `RuleEngine` runs its own `graphlib.TopologicalSorter` to evaluate business rules level-by-level, a second orchestration layer that could move onto the engine's `execution_levels()` + concurrent scheduler, removing the duplication. Worth weighing against the simplicity of the current `graphlib` approach.

Verification
Local:

- `pytest tests/unit/` → 1594 passed. Zero regressions in port-based DAG tests; the 37 state-pipeline tests were migrated onto `PipelineEngine` during layer 8.
- `pytest tests/unit/pipeline/` → all green (port-based + state-based + checkpoint backends + observability + HITL + audit + unification layers).
- `ruff check` + `ruff format --check` clean.
- `pyright` clean on touched modules.

CI on this PR: all 8 checks green (Lint, Pre-commit, Test, Type Check, CodeQL).
What's NOT in this PR (tracked follow-ups)
- `pipeline.stream()`. Generator-based progress API for chat UIs / SSE / websockets. ~500 LOC executor refactor.
- […] `Literal[...]` return annotations. Catches branch-mapping typos at `.build()` instead of first invocation. ~100 LOC.
- […] `BranchStep` / `FanOutStep`. Cleanup sweep across Signature repos. Those classes stay (explicit user direction): they keep working, the migration is just to shake out usability issues in the new APIs.
- `FanInStep` / `Send` naming asymmetry. Discussion-only issue exploring whether to rename `FanInStep` → `MergeStep` when #239 ("[#147 phase 3d] Migrate internal callers off BranchStep / FanOutStep to state-mode API") lands.

Also out of scope:

- […] `BranchStep` / `FanOutStep`. They stay (deprecation warning); explicit user direction.
- […] `unittest.mock` only.