Skip to content

feat(clients/typescript): experimental cooperative consumers#214

Merged
NikolayS merged 7 commits into
mainfrom
coop-clients-typescript
May 6, 2026
Merged

feat(clients/typescript): experimental cooperative consumers#214
NikolayS merged 7 commits into
mainfrom
coop-clients-typescript

Conversation

@NikolayS
Copy link
Copy Markdown
Owner

@NikolayS NikolayS commented May 6, 2026

Summary

  • Adds the experimental cooperative-consumers surface to the TypeScript client: subscribeSubconsumer, unsubscribeSubconsumer, receiveCoop, touchSubconsumer on Client, and a subconsumer / deadInterval option on newConsumer that routes the poll loop through receiveCoop.
  • Marked experimental in 0.2 in the README, mirroring the SQL feature's status. deadInterval without subconsumer throws at construction.
  • Updates the cross-client parity matrix in clients/README.md to flag TypeScript as the first client with cooperative coverage.

Test plan

  • bun run check — typecheck clean
  • PGQUE_TEST_DSN=postgres://nik@localhost/pgque_coop_ts bun run test — 69 tests pass (10 new in test/coop.test.ts):
    1. subscribeSubconsumer returns 1 then 0
    2. receiveCoop round-trips messages and ack finishes the batch
    3. Two subconsumers split batches without duplicate msg_id delivery
    4. unsubscribeSubconsumer default raises on active batch
    5. unsubscribeSubconsumer({ batchHandling: 1 }) routes the active batch through retry
    6. touchSubconsumer returns 1 on a registered row
    7. High-level Consumer with { subconsumer } dispatches handler and acks via the cooperative path
    8. High-level Consumer without subconsumer is unchanged (still calls receive)
    9. newConsumer({ deadInterval }) without subconsumer throws with a clear message
    10. Mock-based assertions on the receiveCoop call args (queue, name, subconsumer, options)
  • Manual two-worker e2e (bun src/coop_e2e.ts):
    {
      "total_sent": 50,
      "worker_1_processed": 30,
      "worker_2_processed": 20,
      "sum": 50,
      "overlap_count": 0,
      "disjoint": true
    }
    Both workers received messages, totals sum to N, msg_id sets are disjoint.

Notes

  • Heartbeats are intentionally manual: the high-level Consumer does not auto-call touchSubconsumer. README points to it as the long-handler escape valve.
  • Test teardown for cooperative tests uses a dedicated teardownCoopTestQueue helper that nukes subscription / retry / dead-letter rows before drop_queue, because drop_queue(..., true) calls unregister_consumer which now refuses cooperative mains with active members.

🤖 Generated with Claude Code

NikolayS and others added 5 commits May 6, 2026 11:35
Wraps the cooperative SQL surface (subscribe_subconsumer,
unsubscribe_subconsumer, receive_coop, touch_subconsumer) on Client and
extends the high-level Consumer with a `subconsumer` / `deadInterval`
option that routes the poll loop through receiveCoop. Marked
experimental in 0.2.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Manual end-to-end script that registers two subconsumers under one
logical consumer, sends N events across multiple tick windows, and
asserts the worker totals sum to N with zero msg_id overlap.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Adds a TS README section covering subscribeSubconsumer,
unsubscribeSubconsumer, receiveCoop, touchSubconsumer, and the
subconsumer / deadInterval consumer options. Updates the parity matrix
to mark TypeScript as the first client with cooperative consumer
coverage.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- bench/coop_demo.ts: two-worker walkthrough referenced from the README's
  experimental coop section. Connects via PGQUE_TEST_DSN, partitions a
  fixed event set across `worker-1` and `worker-2`, and prints per-worker
  message lines plus a summary.
- bench/coop_manual_evidence.ts: scripted runner for the four manual-
  evidence scenarios — disjoint partitioning, idempotent
  subscribeSubconsumer, active-batch rejection + batchHandling=1 retry
  routing, and stale takeover via deadInterval.
- bench/coop_scaling.ts: cooperative scaling benchmark sweeping
  --subconsumers (default 1), with --events / --payload / --runs /
  --handler-work-ms flags. Pre-publishes events with one tick per chunk
  so the allocator has multiple windows to hand out, then runs N async
  workers under a shared pg.Pool sized to subN * 2 + 4. Reports CSV
  (subconsumers, events_per_sec, seconds) on stdout.
- bench/coop_scaling.sh: driver that runs the full {1,2,4,8,16} sweep,
  pipes the CSV to plot.py, and writes coop_scaling.png. Defaults
  handler_work_ms to 0.25 — enough work per message to let parallel
  workers overlap (so the curve rises 1->4) while keeping the
  cooperative FOR UPDATE allocator on the hot path so the plateau is
  visible inside {1..16}.
- bench/plot.py: matplotlib renderer (~800x500 PNG) with a footer that
  reports the PG version, machine, event count, payload size, and
  handler work per message via env vars.
- bench/coop_scaling.png: committed headline chart from a real run on
  PG 18.3 / Darwin arm64; 5000 events / 64 byte payload / 0.25 ms
  handler work / 3 runs / median.
- .gitignore: drop the per-run CSV; the PNG is the canonical result.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add a short scaling section to the experimental coop block that
references bench/coop_demo.ts (runnable two-worker walkthrough),
embeds bench/coop_scaling.png as the headline chart, and explains the
rise / plateau / regression shape: parallel handlers overlap on the
ascending side, the FOR UPDATE on the cooperative main row dominates
on the flat side. Includes the load-bearing footnote that adding more
*normal* consumers does not share work — each subscribe is a separate
fan-out cursor that re-delivers every event, so coop subconsumers are
the way to split work across N workers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
NikolayS added 2 commits May 6, 2026 13:56
The scaling benchmark was misleading: Node's single event loop made the
measurement reflect async-task interleaving with sub-ms busy spin rather
than real cooperative-allocator scaling. A unified bench across all
three drivers will land separately. Removing the script, runner, chart,
plot helper, and the manual-evidence helper that paired with it. The
two-worker demo (coop_demo.ts) stays.
Remove the throughput-scaling subsection, chart image, and reproduction
pointers from the experimental cooperative consumers docs. The
underlying bench was misleading (see preceding commit). Keep the
experimental caveat, API table, and the two-worker demo pointer.
@NikolayS
Copy link
Copy Markdown
Owner Author

NikolayS commented May 6, 2026

REV Code Review Report

  • PR: NikolayS/PgQue#214 - feat(clients/typescript): experimental cooperative consumers
  • Author: @NikolayS
  • AI-Assisted: Yes
  • CI: ✅ success

POTENTIAL ISSUES (4)

Issues with moderate confidence (4-7/10). Review manually — may be false positives.

MEDIUM clients/typescript/src/coop_e2e.ts:518 - Incomplete cleanup in teardown (confidence: 5/10)

Teardown only deletes from pgque.subscription before calling drop_queue. It does not clean pgque.retry_queue or pgque.dead_letter. If a batch is retried or dead-lettered during the test run, drop_queue may silently fail or leave orphan rows. Compare with teardownCoopTestQueue in test/helpers.ts which cleans all three tables.
Suggestion: Mirror the helpers.ts teardown: delete from retry_queue and dead_letter before calling drop_queue.

MEDIUM clients/typescript/src/consumer.ts:384 - No construction-time guard for empty subconsumer string (confidence: 5/10)

ConsumerOptions.subconsumer is typed as string | undefined. If a caller passes subconsumer: '' (empty string), the Consumer constructor sets this.subconsumer = '' which is not undefined, so the poll loop routes to receiveCoop — which then throws a PgqueSqlError at runtime rather than at construction. The existing guard only catches deadInterval-without-subconsumer.
Suggestion: Add if (this.subconsumer !== undefined && !this.subconsumer) throw new Error(...) in the constructor, and add a test for it.

INFO clients/typescript/src/coop_e2e.ts - Manual harness placed in src/ instead of bench/ (confidence: 6/10)

src/ conventionally contains production-shipped code. coop_e2e.ts is a manual dev harness (run with bun src/coop_e2e.ts) that mirrors bench/coop_demo.ts in purpose. Placing it in src/ may confuse future contributors about what is shipped vs. what is tooling.
Suggestion: Move to bench/coop_e2e.ts and update the file's run instructions.

INFO clients/typescript/README.md:70 - touchSubconsumer table entry missing manual-call guidance (confidence: 5/10)

The README method table describes touchSubconsumer as "Refresh the subconsumer heartbeat…" but does not note that the high-level Consumer does not call it automatically, so users with long-running handlers must call it themselves. This detail appears only in the JSDoc. Users reading the table alone will miss a critical operational requirement.
Suggestion: Add a note such as "Not called automatically by the high-level Consumer — call manually from long-running handlers or set a conservative deadInterval."


Summary

Area Findings Potential Filtered
CI/Actions 0 0 0
Security 0 0 1
Bugs 0 1 0
Tests 0 1 1
Guidelines 0 1 0
Docs 0 1 0
Metadata 0 0 0

REV-assisted review (AI analysis by postgres-ai/rev)

@NikolayS NikolayS merged commit b6f7509 into main May 6, 2026
11 checks passed
@NikolayS NikolayS deleted the coop-clients-typescript branch May 6, 2026 21:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant