Skip to content

feat(pipeline): state as optional overlay on PipelineEngine (#245 layer 3)#249

Merged
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-3-state-overlay
May 28, 2026
Merged

feat(pipeline): state as optional overlay on PipelineEngine (#245 layer 3)#249
miguelgfierro merged 1 commit into
issue-147-pipeline-evolutionfrom
unify-pipeline-3-state-overlay

Conversation

@miguelgfierro
Copy link
Copy Markdown
Contributor

Third layer of the unification (#245). `PipelineEngine` now accepts optional `state_schema=` and `state=` arguments. Stacked on layer 2 (#248) which is already merged into `issue-147-pipeline-evolution`.

This layer reclaims parallelism for state-aware pipelines — the answer to the "state-mode is sequential" tradeoff discussed in #245. The existing topological scheduler now runs disjoint state-writing nodes concurrently, and the reducer machinery handles concurrent writes.

New surface

```python
class FactoryState(BaseModel):
spec: str
test_results: Annotated[list[TestRun], extend] = []
critique_history: Annotated[list[str], append] = []

engine = PipelineEngine(dag, state_schema=FactoryState)
result = await engine.run(state=FactoryState(spec="build a CRM"))
print(result.final_state.test_results)
```

Semantics

Node return Without `state_schema` With `state_schema`
`None` No port output No-op
`dict` Port output (legacy) State update via reducers
Other value Port output (legacy) Port output (legacy)

Both modes coexist on the same node — a single node can choose its return shape to either update state or emit a port value.

Parallelism

Concurrent writes are reconciled by the reducer declared on that field:

Reducer Behavior under parallel writes
`replace` (default) Last-write-wins; race-prone for replace.
`append` Commutative; both writes land.
`extend` Commutative; both writes land.
`merge_dict` Commutative; both keys present.

In-flight tasks see the state snapshot they were scheduled with; updates apply sequentially in done-task order, so commutative reducers are race-free regardless of completion order.

Engine changes

  • `PipelineEngine(init)` accepts `state_schema: type[BaseModel] | None`. Caches `discover_reducers(schema)` at construction.
  • `PipelineEngine.run()` accepts `state: BaseModel | None`. When `state_schema` is set: validates `state`, auto-instantiates from defaults if `None`, exposes on `ctx.state`.
  • After each successful node: if return is dict and `state_schema` is set, `apply_update` reduces it into `context.state`.
  • `_save_checkpoint` persists `context.state` under `"shared_state"`.
  • `_load_for_resume` restores `context.state` from `"shared_state"`.
  • `PipelineResult` gets `final_state` field.
  • `PipelineContext` gets `state: Any = None` attribute.

Refactor (no behavior change)

  • `discover_reducers` and `apply_update` lifted from `state_pipeline.py` to `reducers.py` so both `StatePipeline` and `PipelineEngine` can share them without circular imports.
  • `StatePipeline` now imports them from `reducers.py`. Behavior unchanged — verified by 37 existing state-pipeline tests.

Software-factory parallel-build example

```python
class FactoryState(BaseModel):
frontend_files: Annotated[dict[str, str], merge_dict] = {}
backend_files: Annotated[dict[str, str], merge_dict] = {}

dag = DAG("factory")
dag.add_node(DAGNode(node_id="architect", step=Architect()))
dag.add_node(DAGNode(node_id="frontend", step=FrontendDev()))
dag.add_node(DAGNode(node_id="backend", step=BackendDev()))
dag.add_edge(DAGEdge(source="architect", target="frontend")) # parallel
dag.add_edge(DAGEdge(source="architect", target="backend")) # parallel

engine = PipelineEngine(dag, state_schema=FactoryState)
```

frontend and backend run concurrently after architect; their state writes target disjoint fields (`frontend_files`, `backend_files`) and are merged with `merge_dict`. This is parallelism that the legacy `StatePipeline` could not give you.

Tests

12 new tests in `tests/unit/pipeline/test_pipeline_engine_state_overlay.py`:

  • Baseline (no `state_schema` = unchanged)
  • Auto-instantiation from defaults
  • Explicit `state` arg
  • Dict return as state update (replace, append, extend, merge_dict)
  • Non-dict return is still a port output (state untouched)
  • Edge condition reading `ctx.state`
  • Parallel nodes writing same field via `append` (commutative-reducer parallelism)
  • Resume restores shared state from checkpoint
  • Audit log works under state overlay

Full suite: 1573 passed.

What's NOT in this PR

  • Build-time validation of unsafe parallel writes (two nodes that could run at the same level writing the same `replace`-reduced field). Defer to a follow-up — needs a `.writes=` annotation on `DAGNode` or static analysis of return types.
  • Migrating `StatePipeline` to use `PipelineEngine(state_schema=...)` internally. Layer 7.
  • Removing `StatePipeline`. Layer 8.

Refs

…er 3)

PipelineEngine now accepts an optional state_schema= and state= argument.
When configured, nodes that return a dict have it merged into a shared
Pydantic state object via reducers; non-dict returns continue to flow
through edges as port outputs. Both modes coexist on the same node.

This is the layer that reclaims parallelism for state-aware pipelines:
the existing topological scheduler runs disjoint state-writing nodes
concurrently, and concurrent writes to the same field are reconciled by
the reducer declared on that field. Commutative reducers (append,
extend, merge_dict) are race-free; replace is last-write-wins.

Engine changes:

- PipelineEngine(__init__) accepts state_schema: type[BaseModel] | None.
  Caches discover_reducers(schema) at construction.
- PipelineEngine.run() accepts state: BaseModel | None. When state_schema
  is set: validates state, instantiates from defaults if None, exposes on
  ctx.state.
- After each successful node, if return value is dict and state_schema
  is set, apply_update reduces it into context.state.
- _save_checkpoint persists context.state under "shared_state".
- _load_for_resume restores context.state from "shared_state".
- PipelineResult gets final_state field — populated from context.state.

Refactor:

- discover_reducers and apply_update lifted from state_pipeline.py to
  reducers.py so both StatePipeline and PipelineEngine can share them
  without a circular import. state_pipeline.py now imports them.
- PipelineContext gets `state: Any = None` attribute.
- StatePipeline behavior unchanged — verified by 37 existing state-pipeline tests.

Tests: 12 new in tests/unit/pipeline/test_pipeline_engine_state_overlay.py
covering:
- baseline (no state_schema = unchanged behavior)
- auto-instantiation from defaults
- explicit state arg
- dict return as state update (replace, append, extend, merge_dict)
- non-dict return as port output (state untouched)
- edge condition reading ctx.state
- parallel nodes writing same field via append (commutative-reducer parallelism)
- resume restores shared state
- audit log works under state overlay

Full suite: 1573 passed.

Refs: #245
@miguelgfierro miguelgfierro merged commit 2fd0c0b into issue-147-pipeline-evolution May 28, 2026
@miguelgfierro miguelgfierro deleted the unify-pipeline-3-state-overlay branch May 28, 2026 12:49
ancongui pushed a commit that referenced this pull request May 31, 2026
…overlay

feat(pipeline): state as optional overlay on PipelineEngine (#245 layer 3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant