Skip to content

feat(pipeline): Redis + Postgres checkpointer backends (#147 phase 3a)#234

Merged
miguelgfierro merged 2 commits into
issue-147-pipeline-evolutionfrom
issue-147-phase-3a
May 27, 2026
Merged

feat(pipeline): Redis + Postgres checkpointer backends (#147 phase 3a)#234
miguelgfierro merged 2 commits into
issue-147-pipeline-evolutionfrom
issue-147-phase-3a

Conversation

@miguelgfierro
Copy link
Copy Markdown
Contributor

Stacked on issue-147-pipeline-evolution (the merged Phase 1+2 branch). Part of #147, phase 3a.

Design spec: docs/superpowers/specs/2026-05-27-pipeline-phase-3a-design.md.

What

Two durable Checkpointer backends alongside the existing FileCheckpointer, all conforming to the same Protocol so they're swappable without touching StatePipeline. Single file (pipeline/checkpoint.py), guarded optional imports — existing FileCheckpointer users are unaffected.

from fireflyframework_agentic.pipeline import RedisCheckpointer, PostgresCheckpointer

# URL/DSN or a pre-built client (lets you share a pool across many pipelines)
RedisCheckpointer(url=\"redis://localhost:6379/0\", ttl_seconds=86400 * 30)
RedisCheckpointer(client=my_existing_redis)
PostgresCheckpointer(dsn=\"postgresql://user:pw@host/db\")
PostgresCheckpointer(connection=my_existing_psycopg_connection)

Schemas

Redis — one key per checkpoint with TTL, plus a sorted-set index of run IDs:

  • firefly:ckpt:{pipeline}:{run_id}:{seq:06d}_{node_id} → JSON record (TTL configurable, default 30 days)
  • firefly:ckpt:{pipeline}:runs → ZSET of run_ids scored by last-update timestamp

Postgres — single table created idempotently on first save:

CREATE TABLE IF NOT EXISTS firefly_checkpoints (
  pipeline_name TEXT, run_id TEXT, sequence INT, node_id TEXT,
  state JSONB, completed_nodes JSONB, created_at TIMESTAMPTZ DEFAULT now(),
  PRIMARY KEY (pipeline_name, run_id, sequence)
);

Uses INSERT … ON CONFLICT DO UPDATE for saves. The table_name constructor arg is validated against ^[A-Za-z0-9_]+\$ to prevent SQL injection from a misconfigured caller.

Optional dependencies

[project.optional-dependencies]
redis    = [\"redis>=5.2.0\"]                            # already present
postgres = [\"asyncpg>=0.30.0\", \"sqlalchemy>=2.0.0\",
            \"psycopg[binary]>=3.2.0,<4\"]               # psycopg added for the sync Checkpointer Protocol

The postgres extra already existed for the async memory store (asyncpg); this PR adds psycopg[binary] alongside it because the Checkpointer Protocol's methods are sync. One extra, two consumers.

Without the relevant extra installed, importing RedisCheckpointer / PostgresCheckpointer succeeds (the classes always exist as names) but constructing them raises a clear ImportError:

ImportError: RedisCheckpointer requires the 'redis' extra.
             Install with: pip install fireflyframework-agentic[redis]

Tests — unittest.mock only

No fakeredis, no testcontainers, no new dev deps. Real-service verification is out-of-band by the user against actual Redis and Postgres servers.

tests/unit/pipeline/test_checkpoint_backends.py (17 tests):

  • Per-backend behaviour: save issues the right calls (SET key value EX ttl, ZADD index ts run_id for Redis; CREATE TABLE IF NOT EXISTS once + INSERT … ON CONFLICT DO UPDATE for Postgres). load_latest round-trips. list_runs returns the index contents.
  • Missing-dep tests: monkeypatch the guarded _redis / _psycopg to None, confirm ImportError names the install command.
  • Validation tests: rejecting both url= and client=, rejecting neither, rejecting unsafe table_name.
  • Cross-backend Protocol conformance: the canonical software-factory scenario (architect → python_dev → deployer-that-fails-first → resume → evaluator) is parametrized across FileCheckpointer (real, tmp_path), RedisCheckpointer (MagicMock with in-memory dict), PostgresCheckpointer (MagicMock connection with in-memory dict). Catches Protocol drift between backends.

Docs + example

  • docs/pipeline.md Checkpoint+Resume section gains a backend-comparison table and a code snippet showing the swap.
  • examples/pipeline_state.py gains an optional fourth scenario gated on the PG_DSN env var. No-op when unset, so the example still runs anywhere.

Verification

  • pytest tests/unit/pipeline/114 passed (97 baseline + 17 new)
  • pytest tests/unit/1405 passed (no regressions)
  • ruff check + ruff format --check clean
  • pyright clean on touched files
  • Smoke: in a venv without either extra installed, from fireflyframework_agentic.pipeline import RedisCheckpointer, PostgresCheckpointer succeeds; constructing either raises the documented ImportError naming the install command. (Verified live in the firefly venv where neither extra is installed.)

Stacking

Base = issue-147-pipeline-evolution (the merged Phase 1+2 branch). Once #232 lands on main, this PR's base will retarget to main automatically.

What lands next

  • Phase 3bStatePipelineEventHandler Protocol + OTel spans per state-pipeline node.
  • Phase 3cPause(reason) HITL primitive + AuditLog Protocol (Postgres impl reuses the connection introduced here).

Adds two durable Checkpointer implementations alongside FileCheckpointer.
Single file (pipeline/checkpoint.py), guarded optional imports, no API
changes to StatePipeline. Existing FileCheckpointer users are unaffected.

- RedisCheckpointer (sync redis-py): SET+EX per checkpoint, ZADD/ZRANGE
  index of run_ids. TTL configurable, default 30 days. Accepts url= or a
  pre-built client= (for shared pools).
- PostgresCheckpointer (sync psycopg3): single firefly_checkpoints table
  created idempotently on first save; INSERT … ON CONFLICT DO UPDATE for
  saves; SELECT … ORDER BY sequence DESC LIMIT 1 for load_latest. Accepts
  dsn= or a pre-built connection=. table_name is validated to prevent SQL
  injection from a misconfigured caller.
- pyproject.toml: psycopg[binary]>=3 added to the existing `postgres`
  extra alongside asyncpg; `redis` extra already present.
- Tests use unittest.mock only — no fakeredis, no testcontainers. A
  parametrized software-factory scenario runs across all three backends;
  per-backend tests verify the right calls are issued and that missing
  deps surface a clear ImportError naming the extra to install.
- examples/pipeline_state.py: optional fourth scenario gated on PG_DSN
  env var demonstrating PostgresCheckpointer.
- docs/pipeline.md: backend-comparison table + code snippet for swapping
  backends.

Tests: 17 new in test_checkpoint_backends.py covering per-backend
behaviour + cross-backend conformance. Full pipeline suite 114 passed
(88 phase-1+2 baseline + 9 phase-2 features + 17 new phase-3a). Lints
clean, pyright clean on touched modules.
@miguelgfierro miguelgfierro marked this pull request as ready for review May 27, 2026 15:07
@miguelgfierro miguelgfierro merged commit cc44d70 into issue-147-pipeline-evolution May 27, 2026
@miguelgfierro miguelgfierro deleted the issue-147-phase-3a branch May 27, 2026 15:07
ancongui pushed a commit that referenced this pull request May 31, 2026
feat(pipeline): Redis + Postgres checkpointer backends (#147 phase 3a)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant