Skip to content

feat(clients/python): experimental cooperative consumers#216

Merged
NikolayS merged 12 commits into
mainfrom
coop-clients-python
May 6, 2026
Merged

feat(clients/python): experimental cooperative consumers#216
NikolayS merged 12 commits into
mainfrom
coop-clients-python

Conversation

@NikolayS
Copy link
Copy Markdown
Owner

@NikolayS NikolayS commented May 6, 2026

Summary

  • Add subscribe_subconsumer, unsubscribe_subconsumer, receive_coop, and touch_subconsumer methods on PgqueClient. Each maps 1:1 to its pgque.* SQL counterpart and reuses the existing _wrap_sql_error path.
  • Extend the high-level Consumer with subconsumer and dead_interval parameters. When subconsumer is set, the poll loop calls client.receive_coop(...) instead of client.receive(...). dead_interval only applies in coop mode (raises ValueError otherwise). No automatic heartbeat — client.touch_subconsumer is the manual primitive.
  • Document the API as Experimental in PgQue 0.2 in the Python client README (caveat block + 2-worker example) and add a parity-matrix row in clients/README.md flagging it experimental.

The cooperative-consumers SQL surface itself is already on main; this PR is purely the Python-client layer over it.

Test plan

  • PGQUE_TEST_DSN=postgres://nik@localhost/pgque_coop_py pytest clients/python/tests — 67 passed (9 new in test_coop.py):

    • subscribe_subconsumer returns 1 then 0
    • receive_coop returns messages from a published batch and ack finishes them
    • Two subconsumers under one consumer split batches, no duplicate delivery
    • unsubscribe_subconsumer(..., batch_handling=0) raises on active batch
    • unsubscribe_subconsumer(..., batch_handling=1) routes through retry/DLQ
    • touch_subconsumer returns 1 on a registered row
    • Consumer(subconsumer="worker-1") dispatches a handler and acks
    • Consumer without subconsumer is unchanged (regression)
    • Consumer(dead_interval=..., subconsumer=None) raises ValueError
  • Manual two-worker e2e (/tmp/coop_e2e.py): N=40 events across 4 ticks, both Consumer instances configured with subconsumer="worker-1" / worker-2. Output:

    sent N         = 40
    worker-1 saw   = 30 msgs
    worker-2 saw   = 10 msgs
    sum of unique  = 40
    overlap        = 0 (must be 0)
    OK -- disjoint subsets, no duplicate delivery.
    

    Disjoint subsets, sums to N, no duplicate delivery — confirms cooperative allocation works end-to-end through the high-level Consumer.

🤖 Generated with Claude Code

@NikolayS
Copy link
Copy Markdown
Owner Author

NikolayS commented May 6, 2026

REV Code Review Report

  • PR: NikolayS/PgQue#216 - feat(clients/python): experimental cooperative consumers
  • Author: @NikolayS
  • AI-Assisted: Yes (Claude Opus 4.7 co-authored bench/demo commits)
  • CI: ✅ success (view run) — Python client tests, TypeScript, Go, pg_cron, pg_tle, verify all passed

POTENTIAL ISSUES (4)

Issues with moderate confidence (4-7/10). Review manually — may be false positives.

MEDIUM clients/python/tests/test_coop.py:183 — Test name promises cooperative batch splitting but assertion is vacuously true (confidence: 7/10)

test_two_subconsumers_split_batches_no_duplicates publishes 6 messages in one tick (one PgQ batch). Cooperative allocation assigns a single batch to exactly one subconsumer, so m2 will be empty. ids1.isdisjoint(ids2) is vacuously true for an empty set, and (len(m1) + len(m2)) >= 1 is satisfied even if only one worker ever sees messages. The test validates no-duplication but not the splitting property its name promises.
Suggestion: Use multiple ticks (e.g. via the _publish helper pattern from coop_demo.py) so each subconsumer gets at least one batch, then assert len(m1) > 0 and len(m2) > 0 in addition to the disjointness check.

MEDIUM clients/python/pgque/client.py:320receive_coop auto-registration path untested (confidence: 6/10)

The docstring prominently states: "The function auto-registers the coop_main and coop_member rows on first call, so callers do not need to subscribe_subconsumer ahead of time." Every test in test_coop.py calls subscribe_subconsumer explicitly first. If auto-registration ever regresses, no test catches it.
Suggestion: Add a test calling receive_coop directly without a prior subscribe_subconsumer to verify the auto-registration path works end-to-end.

MEDIUM clients/python/tests/test_coop.pydead_interval (stale-sibling takeover) path has no test (confidence: 6/10)

receive_coop accepts dead_interval and the docstring explains it allows takeover of a stale sibling's batch under a fresh batch_id. This distinct code path is never triggered in the test suite, though the README example shows dead_interval="5 minutes".
Suggestion: Add a test that registers two subconsumers, has one receive and hold a batch past dead_interval, then verifies the other can take over.

LOW clients/python/pgque/client.py:338max_messages constraint not surfaced in README (confidence: 5/10)

The receive_coop docstring warns: "set this >= the queue's worst-case batch size or consume the full batch before acking" — because ack(batch_id) advances the cursor past the entire PgQ batch including rows beyond max_messages. The README coop section does not mention this constraint.
Suggestion: Add a short note in the README experimental section pointing readers to the max_messages caveat.


Summary

Area Findings Potential Filtered
CI/Actions 0 0 0
Security 0 0 0
Bugs 0 0 0
Tests 0 3 1
Guidelines 0 0 1
Docs 0 1 0
Metadata 0 0 0
  • Filtered (tests): touch_subconsumer returning 0 for non-existent row untested — confidence 3/10, too minor.
  • Filtered (guidelines): batch_handling: int vs Literal[0, 1] — style preference, not a rule violation.

REV-assisted review (AI analysis by postgres-ai/rev)

NikolayS and others added 12 commits May 6, 2026 14:51
Expose the experimental cooperative-consumers SQL surface on
``PgqueClient``: ``subscribe_subconsumer``, ``unsubscribe_subconsumer``,
``receive_coop``, and ``touch_subconsumer``. Each maps 1:1 to its
``pgque.*`` SQL counterpart, wraps psycopg errors via the existing
``_wrap_sql_error`` path, and decodes rows with ``Message.from_row``-style
field mapping (matching ``receive``).

Tests cover the per-method contract -- registration idempotency,
batch ack flow, two-subconsumer split delivery, ``batch_handling``
modes, and ``touch_subconsumer`` heartbeat.
Add ``subconsumer`` and ``dead_interval`` parameters to ``Consumer``.
When ``subconsumer`` is set, the poll loop calls
``client.receive_coop(...)`` instead of ``receive(...)``; otherwise the
loop is unchanged. ``dead_interval`` only applies in coop mode and
raises ``ValueError`` if passed without a subconsumer name. No
heartbeat is sent automatically; ``client.touch_subconsumer`` is the
manual primitive.
Add an "Experimental: cooperative consumers" section to the Python
client README with the recommended caveat block and a two-worker
example, plus a parity-matrix row in clients/README.md flagging the
feature as experimental and pointing readers to the per-client docs.
Two-thread cooperative consumers walkthrough that publishes events
across multiple ticks so each subconsumer claims a different batch.
Prints per-message handler lines and a per-worker count summary.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Drives N cooperative subconsumers under one logical consumer through a
tight receive_coop -> ack loop and reports CSV plus a PNG chart of
total events/sec versus N. Workers use autocommit psycopg connections
so the FOR UPDATE on the cooperative main row is released between
batches.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Drop in a pointer to bench/coop_demo.py and embed bench/coop_scaling.png
with a short interpretation paragraph that explains why the throughput
curve plateaus at higher N.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Re-render with the latest measurement run so the PNG matches the CSV
captured in the PR comment evidence.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The default coop scaling curve with a no-op handler is monotonically
decreasing in N -- pure FOR UPDATE contention on the cooperative main
row. That's an honest result but it's the wrong story for users
arriving at the README, which exists to motivate why cooperative
subconsumers are useful.

Add a --handler-work-ms flag (default 1.0) that has each worker
time.sleep between receive_coop and ack to simulate per-message
handler latency. Python releases the GIL during time.sleep so threads
genuinely parallelize, and throughput then scales roughly linearly
with N until it plateaus at the FOR UPDATE saturation point. Use
--handler-work-ms 0 to reproduce the contention-only curve.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Re-runs coop_scaling.py with the new --handler-work-ms 1.0 default and
commits the resulting CSV alongside the PNG. Curve rises from
~949 ev/s at N=1 to ~7,005 ev/s at N=16 (about 7.4x), with the slope
softening between N=8 and N=16 as the FOR UPDATE on the cooperative
main row starts to dominate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rewrite the interpretation paragraph under #### Scaling so it matches
the new rise-then-soften shape produced with a 1 ms per-message
handler. Adds a pointer to --handler-work-ms 0 for the
contention-only curve and keeps the existing warning that adding
normal consumers (separate register_consumer / subscribe rows) does
not share work -- each one is its own fan-out cursor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The earlier scaling benchmark used an artificial sleep handler that
mostly measured GIL-released thread-sleep parallelism rather than
PgQue cooperative-allocator scaling. A unified bench across all three
drivers will land separately.
Remove the chart image, scaling interpretation, and reproduction note
tied to the preliminary bench. The experimental caveat, API docs, and
demo pointer remain.
@NikolayS NikolayS force-pushed the coop-clients-python branch from 778cfd7 to dc41078 Compare May 6, 2026 21:52
@NikolayS NikolayS merged commit be04780 into main May 6, 2026
11 checks passed
@NikolayS NikolayS deleted the coop-clients-python branch May 6, 2026 21:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant