feat(pipeline): PipelineEngine checkpoint, audit, and resume (#245 layer 1)#246

Merged
miguelgfierro merged 1 commit into `issue-147-pipeline-evolution` from `unify-pipeline-1-engine-lifecycle` on May 28, 2026

@miguelgfierro
Contributor

First layer of the unification proposal in #245 — stacked on top of #232.

What this lands

Port-based `PipelineEngine` gains the same checkpointing + audit machinery that `StatePipeline` already has, plus resume via `run(run_id=...)`. Checkpointing and auditing are pipeline-wide concerns that were living in the wrong place; this lifts them to the engine, where both modes will (eventually) share them.

New surface

```python
engine = PipelineEngine(
    dag,
    checkpointer=FileCheckpointer("./ckpt"),  # NEW (optional)
    audit_log=FileAuditLog("./audit"),        # NEW (optional)
    event_handler=handler,
)
result = await engine.run(inputs=...)
```

Resume after failure:

```python
result = await engine.run(run_id=result.run_id)
```

Semantics

  • After every successful node: one checkpoint and one audit entry are written.
  • Failed (non-skipped) nodes only emit an audit entry — they are NOT in the checkpoint's `completed_nodes`, so resume re-runs them.
  • Skipped nodes (condition gate, or SKIP_DOWNSTREAM upstream failure) neither audit nor checkpoint. They didn't do work.
  • `PipelineResult.run_id` is always set (auto-generated if not passed).
  • `run(run_id=X)` with no `inputs`/`context` enters resume mode: loads latest checkpoint, restores `context._results` for completed nodes, schedules only the rest.
  • Resume without a checkpointer → `PipelineError`.
  • Unknown `run_id` → `PipelineError`.
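
To make the rules above concrete, here is a minimal toy model of the lifecycle. It is a sketch under stated assumptions, not the engine's code: `ToyEngine`, `ToyCheckpointer`, and `ToyPipelineError` are hypothetical stand-ins, the steps run in a fixed linear order rather than as a DAG, the run stops at the first failure, and everything is synchronous and in memory.

```python
import uuid


class ToyPipelineError(Exception):
    """Hypothetical stand-in for the PipelineError named in the list above."""


class ToyCheckpointer:
    """In-memory stand-in for a checkpointer: keeps the latest state per run."""

    def __init__(self):
        self.saved = {}

    def save(self, run_id, state):
        self.saved[run_id] = state

    def load_latest(self, run_id):
        return self.saved.get(run_id)


class ToyEngine:
    """Runs named steps in insertion order (a linear toy, not a real DAG)."""

    def __init__(self, steps, checkpointer=None, skipped=()):
        self.steps = steps                # {node_id: callable(inputs)}
        self.checkpointer = checkpointer
        self.skipped = set(skipped)       # nodes treated as gated off
        self.audit = []                   # (run_id, node_id, status) tuples

    def run(self, inputs=None, run_id=None):
        results = {}
        if run_id is not None and inputs is None:   # resume mode
            if self.checkpointer is None:
                raise ToyPipelineError("resume requires a checkpointer")
            state = self.checkpointer.load_latest(run_id)
            if state is None:
                raise ToyPipelineError(f"unknown run_id: {run_id}")
            inputs, results = state["inputs"], dict(state["results"])
        run_id = run_id or uuid.uuid4().hex          # run_id is always set
        executed = []
        for node_id, fn in self.steps.items():
            if node_id in results or node_id in self.skipped:
                continue          # already completed, or skipped: no audit, no checkpoint
            executed.append(node_id)
            try:
                results[node_id] = fn(inputs)
            except Exception:
                self.audit.append((run_id, node_id, "failed"))   # audit only
                break             # toy failure policy: stop the run
            self.audit.append((run_id, node_id, "success"))
            if self.checkpointer is not None:
                self.checkpointer.save(
                    run_id, {"inputs": inputs, "results": dict(results)}
                )
        return run_id, results, executed


calls = {"a": 0, "b": 0}


def step_a(inputs):
    calls["a"] += 1
    return inputs["x"] + 1


def step_b(inputs):
    calls["b"] += 1
    if calls["b"] == 1:
        raise RuntimeError("transient failure")   # fails on the first attempt only
    return inputs["x"] * 2


engine = ToyEngine(
    {"a": step_a, "b": step_b, "c": step_a},
    checkpointer=ToyCheckpointer(),
    skipped={"c"},
)
run_id, results, executed = engine.run(inputs={"x": 3})   # a succeeds, b fails
run_id2, results2, executed2 = engine.run(run_id=run_id)  # resume: only b re-runs
```

In this toy, the first run checkpoints `a` and only audits the failure of `b`; the resume call restores `a` from the checkpoint and executes `b` alone. The skipped node `c` never appears in the audit trail or the checkpoint.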

What this does NOT do

  • Does not touch `StatePipeline` — it keeps its own `Checkpointer`/`AuditLog` integration. Convergence happens in later layers.
  • Does not unify the event-handler protocols — `PipelineEventHandler` is unchanged. Protocol unification is its own layer.
  • Does not add state, branching changes, cycles, Pause, or Send.

Implementation notes

  • `_execute_node` now accepts `inputs=` so the same gathered dict can be passed to the step AND captured for the audit snapshot — no double-gather.
  • Checkpoint state shape (port-based): `{"inputs": serialized_pipeline_inputs, "results": {node_id: NodeResult.model_dump(mode="json")}}`. JSON-safe via `_serialize_value` (Pydantic→`model_dump`, primitives passthrough, fallback `str()`).
  • Both checkpoint and audit writes are wrapped in try/except — observability/persistence failures never break business logic, matching the existing StatePipeline behavior.
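
The serialization and best-effort write rules in the notes above could look roughly like the sketch below. This is not the PR's code: `serialize_value` and `best_effort` are hypothetical names, Pydantic models are detected by duck typing so the example runs without Pydantic installed, and the recursion into dicts and lists is an assumption that the notes do not state.

```python
import json
import logging
from pathlib import Path
from typing import Any, Callable

logger = logging.getLogger(__name__)

_PRIMITIVES = (str, int, float, bool, type(None))


def serialize_value(value: Any) -> Any:
    """Hypothetical stand-in for `_serialize_value`; the real helper may differ."""
    if isinstance(value, _PRIMITIVES):
        return value                                  # primitives pass through
    dump = getattr(value, "model_dump", None)
    if callable(dump) and not isinstance(value, type):
        return dump(mode="json")                      # Pydantic-style model instance
    if isinstance(value, dict):                       # assumption: recurse into containers
        return {str(k): serialize_value(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [serialize_value(v) for v in value]
    return str(value)                                 # fallback for everything else


def best_effort(write: Callable[[], None], what: str) -> bool:
    """Run a persistence write; log and swallow any failure instead of raising."""
    try:
        write()
        return True
    except Exception:
        logger.warning("%s write failed; continuing", what, exc_info=True)
        return False


class FakeModel:
    """Duck-typed stand-in for a Pydantic model (assumption: v2-style model_dump)."""

    def model_dump(self, mode: str = "python") -> dict:
        return {"value": 1}


def broken_write() -> None:
    raise OSError("disk full")


state = serialize_value(
    {"inputs": {"path": Path("data")}, "results": {"n1": FakeModel(), "n2": (1, 2)}}
)
encoded = json.dumps(state)                 # succeeds: every leaf is JSON-safe
ok = best_effort(broken_write, "checkpoint")  # failure is logged, not raised
```

The point of `best_effort` is the design choice stated in the notes: a failed checkpoint or audit write is reported but never propagates into the node's result.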

Tests

13 new tests in `tests/unit/pipeline/test_pipeline_engine_lifecycle.py` covering:

  • run_id generation + explicit pass-through
  • Checkpoint writes per successful node + omitted when no checkpointer
  • Resume completed run = no-op
  • Resume after failure re-runs failed node, skips completed
  • Resume without checkpointer / unknown run_id raises
  • Audit per-node, success + failure, skipped-not-recorded
  • Full-stack checkpoint + audit + resume

Full unit suite: 1544 passed, 21 warnings, 0 failures.

Refs

…yer 1)

First layer of the unification proposal in #245. Port-based PipelineEngine
gains the same checkpoint + audit machinery StatePipeline already has, plus
resume via run(run_id=...).

- PipelineEngine accepts checkpointer + audit_log kwargs
- run_id flows into PipelineResult; auto-generated when not supplied
- After every successful node: checkpoint written and audit entry recorded
- Failed (non-skipped) nodes only audit; resume re-runs them
- Skipped nodes neither audit nor checkpoint (no work happened)
- run(run_id=...) loads the latest checkpoint, restores context.results
  for completed nodes, and continues from successors
- Resume without checkpointer raises PipelineError; unknown run_id raises
- Module-level _serialize_value helper makes checkpoint state and audit
  snapshots JSON-safe (Pydantic -> model_dump, primitives passthrough,
  fallback str())

No changes to StatePipeline. No new public types. PipelineEventHandler
unchanged (protocol unification is deferred to a later layer).

Tests: 13 new in tests/unit/pipeline/test_pipeline_engine_lifecycle.py.
Full suite: 1544 passed.

Refs: #245