feat(ruby): Ruby client (pgque gem)#234
Conversation
Scaffold clients/ruby/: gemspec, Gemfile, Rakefile, lib skeleton (Pgque module, Pgque::Client with connect/close), and test_helper.rb with PGQUE_TEST_DSN gating + queue/consumer fixture helpers. First TDD pair: test_connect_returns_client (Pgque.connect returns a Pgque::Client wrapping an open PG::Connection; close finishes it). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pgque.connect(dsn) { |c| ... } yields the client and closes it on
block exit, mirroring the Python context-manager pattern.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Introduce Pgque::Error base class and Pgque::ConnectionError; rescue PG::ConnectionBad in Pgque::Client.connect and re-raise wrapped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pgque::Client.new(raw_conn) defaults owns_conn:false, so close leaves the caller's connection open. Test guards the contract. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pgque.connect(autocommit:) is propagated to the Client and exposed via
client.autocommit?. The Ruby pg gem has no autocommit attribute on the
connection itself, so the flag is informational here -- callers manage
explicit transactions with conn.transaction { } as Ruby idiom dictates.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Calling close twice does not raise; the finished? guard already in place makes the second call a no-op. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pgque::Client#send(queue, payload) inserts via pgque.send(queue, payload::jsonb), encoding Hash/Array as JSON and nil as JSON null. Returns the bigint event id as a Ruby Integer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add type: keyword arg to Pgque::Client#send. When type is empty/nil/ "default" the 2-arg pgque.send is used; otherwise the 3-arg form. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add Pgque::Event value class. When passed to send, its type and payload override the keyword args, mirroring the Python Event handling. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Strings are forwarded as-is to ::jsonb cast (caller is responsible for valid JSON text). encode_payload's else branch already handles this. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
encode_payload returns the literal "null" so the ::jsonb cast yields JSON null rather than SQL NULL. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Build a PG text-array literal from JSON-encoded payloads, call pgque.send_batch(...)::jsonb[], and unnest the bigint[] result so ordering is preserved row-by-row. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ports the rest of the per-message client surface from the Python client: - Pgque::Message value class with msg_id/batch_id/type/payload/retry_count/ created_at/extra1..4. JSONB payloads are decoded to native Ruby via JSON.parse; timestamps via Time.parse. - Pgque::Client#receive(queue, consumer, max_messages=100): returns [Pgque::Message]. Empty array when no batch is current. - Pgque::Client#ack(batch_id): finishes the batch, returns 1 (ok) or 0 (stale/double ack). - Pgque::Client#force_next_tick(queue): returns last tick id or nil. - Pgque::Client#nack(batch_id, msg, retry_after:, reason:): routes the message through retry_queue (or DLQ once queue_max_retries is hit). - Error wrapping: PG::Error from any client method is mapped to Pgque::QueueNotFound / Pgque::BatchNotFound / Pgque::Error based on the message text, mirroring the Python wrapper. Tests: 30 runs / 79 assertions, all green against Postgres 18 with pgque.sql installed. Closes the test_send round-trip cases (unicode, large, jsonb_round_trip, batch mixed/nil, missing-queue raise, SQL form selection via FakeConn) plus full test_receive, test_smoke, and test_nack ports. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pgque::Consumer (lib/pgque/consumer.rb) — synchronous polling consumer
that mirrors the Python version's behavior end-to-end:
- consumer.on(type) { |msg| ... } registers a handler; "*" is the
catch-all.
- start blocks in a poll loop, processing one batch at a time inside a
conn.transaction { } block. Each msg dispatches to its handler;
exceptions trigger nack with retry_after; messages with no handler
are nacked (or acked when unknown_handler_policy: "ack"), with a
warn either way. The batch is acked at the end -- unless any nack
itself failed, in which case the transaction commits without acking
so PgQ redelivers. ack returning 0 (stale/double ack) logs WARN.
- LISTEN/NOTIFY wakeup uses bounded ~0.5s slices around
PG::Connection#wait_for_notify so stop() unblocks within ~1s even
when poll_interval is large. Buffered NOTIFYs from the prior poll
are drained before waiting, mirroring the Python regression for
issue NikolayS#158.
- Signal handlers (TERM/INT) are installed only when start() runs on
the main thread; tests and embedded use can call stop() from any
thread.
- Cooperative-mode hooks (subconsumer:, dead_interval:) are wired up
but the underlying receive_coop SQL surface lands in a follow-up.
Constructor validates that dead_interval requires subconsumer.
Tests: 47 runs / 119 assertions, all green. Covers max_messages
defaults, on()-dispatch, default-handler catch-all, error-driven
nack, unknown-type nack vs. opt-in ack, stop()-promptness from a
worker thread, NOTIFY wakeup before poll_interval, and the partial-
batch case where good messages are finished by the batch ack while
the failing one survives in retry_queue. Unit tests stub
Pgque::Client.new with a SpyClient to assert _poll_once passes
max_messages through to receive() without needing a real DB round-trip.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the experimental cooperative-consumers SQL surface to the client: - subscribe_subconsumer / unsubscribe_subconsumer (with batch_handling: kwarg defaulting to 0, the strict mode that raises on an active batch). - receive_coop with max_messages: + dead_interval: kwargs. - touch_subconsumer. The high-level Pgque::Consumer already routes to receive_coop when subconsumer: is set; this commit makes the underlying SQL calls real. Tests: 9 ports of test_coop.py covering subscribe idempotency, two- member batch splitting (no overlap), unsubscribe-with-active-batch strict raise + batch_handling:1 cleanup, touch heartbeat, the high- level Consumer in coop mode, and the dead_interval-without-subconsumer ArgumentError. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- test_concurrency.rb: 4 producer threads x 25 sends each, asserting zero event-id collisions across independent connections. - test_transaction_visibility.rb: locks PgQ's snapshot rule (send + force_next_tick + receive in one xact returns 0 rows) and a regression guard that catches a Consumer whose poll_once is a no-op -- a fresh receive must still return the message because the batch cursor never advanced. The visibility test stubs Consumer#poll_once via define_singleton_method on the instance, mirroring the Python mock.patch.object pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors python-client-tests: postgres:18 in Docker, build pgque.sql via transform.sh, install into pgque_test DB, then run the Minitest suite (clients/ruby with Ruby 3.3 via ruby/setup-ruby) and a gem build smoke step. Cleanup tears down the container with if: always(). Also: update root .gitignore to exclude clients/ruby/Gemfile.lock, pkg/, .bundle/, and built *.gem files (libraries do not commit Gemfile.lock). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- clients/README.md: Ruby column with the v1 surface (full parity with Python: classified errors, retry delay + reason on nack, LISTEN/NOTIFY wakeup on the high-level Consumer, configurable unknown-type policy). - README.md: Ruby section under "Client libraries" with the install command, a connect example, and a Consumer example matching the Python/TS snippets above. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pgque::Client#send shadows Object#send. Add a class doc-comment in lib/pgque/client.rb, a "A note on Pgque::Client#send" section in the gem README, and a regression test that __send__ and public_send still dispatch methods reflectively on a Client instance. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pgque::Consumer's default logger now targets $stderr (not $stdout, so
it cannot collide with application output) and ships at level FATAL,
which the consumer never emits -- making it effectively silent unless
the host app opts in. Set PGQUE_LOG_LEVEL=warn|info|debug|error to
re-enable, or pass logger: Logger.new(...) to Consumer.new.
Mirrors Python's logging.getLogger("pgque") behavior more closely:
no incidental output until the host application configures handlers.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ruby's pg gem has no per-connection autocommit attribute -- every
exec_params runs in its own implicit transaction by default, the
equivalent of psycopg's autocommit=True. Storing an autocommit flag
on Pgque::Client implied behavior the gem could not actually deliver.
This commit:
- Removes autocommit: from Pgque.connect and Pgque::Client.connect/.new.
- Removes Pgque::Client#autocommit? and the test_autocommit_flag test.
- Adds a doc comment on Pgque.connect explaining that transaction
control is per-call via conn.transaction { ... }, not per-connection.
- Updates test_two_subconsumers_split_batches_no_duplicates with a
comment explaining why the Python autocommit=True call has no
Ruby equivalent (FOR UPDATE drops at end of statement).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
encode_payload's else branch returned the payload verbatim, which crashes the pg gem when the payload is a Symbol or any other object the gem can't serialize as a parameter. Coerce via #to_s so numerics and booleans round-trip naturally (42 -> "42" -> JSON 42; true -> "true" -> JSON true). Objects whose to_s isn't valid JSON (Symbols, Time, etc.) still surface a SQL error from the ::jsonb cast -- callers who care should pre-encode with JSON.dump. Adds a parameterized test asserting round-trip for Integer, Float, true, and false. The String-passthrough test already covers the case where the caller provides JSON text directly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
If a test body left the conn in a failed transaction (e.g. an
assertion failure after a SQL error without an explicit rollback),
subsequent queries are rejected by Postgres until ROLLBACK -- which
silently broke drop_queue under the rescued PG::Error and could leak
the per-test queue across runs.
The ensure block now starts with conn.exec("ROLLBACK") rescue nil so
cleanup runs against a fresh transaction state.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The PG::Result accessor refactor switched Pgque::Client from result.values[0][0] to result.getvalue(0, 0); TestSendSqlForm's in-class FakeConn::FakeResult still implemented values, so the two SQL-form-selection tests errored with NoMethodError. Re-implement FakeResult#getvalue to return the same canned id. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the dispatch-only release workflow for the pgque gem and the companion RELEASE.md documenting the bootstrap + ongoing process. Workflow shape (mirrors release-python.yml): - on: workflow_dispatch with version (string) + dry_run (bool, default true). No push/tag triggers; code changes never run this file. - Top-level permissions: contents:read. publish job widens to id-token:write only where it needs OIDC. - build job (always runs on dispatch from main): Gem::Version sanity check, asserts inputs.version matches lib/pgque/version.rb's Pgque::VERSION, gem build, install the resulting .gem into a throwaway GEM_HOME, require "pgque" and assert VERSION + Client + Consumer are defined. Catches packaging mistakes before they reach the registry. - publish-rubygems job (gated on !dry_run, needs: build, environment: rubygems): rubygems/release-gem@v1 with setup-trusted-publisher:true exchanges the GitHub OIDC JWT for a short-lived rubygems.org API key and pushes; await-release:true blocks until the gem is fetchable. No long-lived RUBYGEMS_API_KEY secret. Three human gates protect against accidental publish: workflow_dispatch trigger, dry_run flag, and the rubygems GitHub environment (recommend configuring required reviewers there). RELEASE.md covers: gem identity, versioning conventions (dot-separated pre-releases vs Git-style hyphens), the manual bootstrap publish that RubyGems requires before trusted publishing can be configured, GitHub environment setup, the rubygems.org Trusted Publisher policy fields, the dry-run-then-publish process, and a note on why there is no TestPyPI-equivalent staging step (RubyGems has no public test registry). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Use the Ruby setter idiom: self.running = value reads more naturally at call sites than set_running(value), and matches the running? predicate reader. Stays private under the existing private: modifier, so external callers still go through #stop to flip the flag. The bare assignment running = value would create a local variable inside an instance method, so the three call sites in start, stop, and the signal proc all use self.running = ... explicitly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
REV Code Review Report
BLOCKING ISSUES (4)HIGH
Evidence from review: Fix: add Bundler gem tasks: require "bundler/gem_tasks"or replace HIGH The SIGTERM/SIGINT handler calls Fix: keep the trap async-signal-safe for Ruby: flip a simple instance variable directly ( HIGH The workflow grants: permissions:
contents: read
id-token: writeBundler's Fix: either grant: permissions:
contents: write
id-token: writeor avoid MEDIUM
Fix: set running false in the outer NON-BLOCKING / HARDENING (3)MEDIUM The publish job grants
The protected Suggestion: pin release-job actions to full commit SHAs, especially MEDIUM Repo guidance expects install/quickstart docs to include install command, ticker setup or explicit skip path, and role grants. Ruby README has install + grants, but no ticker setup/skip guidance; the root Ruby snippet creates a consumer without showing queue/consumer registration or ticker requirements. Suggestion: add one short runnable setup path covering schema install, queue/consumer registration or LOW Root README says all clients are published at Suggestion: call out Ruby separately ( NOTES
REV-assisted review (fallback multi-agent review; Claude Code REV was blocked locally by API key auth). |
The SIGTERM/SIGINT proc installed by Consumer#start was calling @logger.info(...) and a Mutex#synchronize-backed running= setter -- both raise ThreadError when invoked from Ruby's trap context. A real signal delivered while start ran on the main thread would have crashed the trap (and likely the process) instead of cleanly shutting the consumer down. Fix: - Drop @running_mutex. The flag is a single boolean with no ordering dependencies, and Ruby integer/boolean assignment is atomic. The mutex was both unnecessary and the proximate cause of the trap-context crash. - Reduce the trap proc to two plain instance-variable writes: @stop_signum = signum; @running = false. No Logger, no synchronize, no other blocking work. - Move the "received signal N, shutting down" log line out of the trap into the post-loop block, gated on @stop_signum. It runs on the main thread after the wait wakes, so Logger is safe again. - Remove the now-pointless private running= setter and drop self.running = ... at the two non-trap call sites in favor of direct @running assignment. The existing consumer tests never exercised the trap path because they all call cons.start on a worker thread, where Thread.current == Thread.main is false and signal handlers are not installed. The bug only surfaces in production (running consumer on the main thread, e.g. a `bundle exec ruby my_worker.rb`) when the process actually receives TERM/INT. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
start set @running = true up front but only restored signal handlers in the outer ensure. If PG.connect, LISTEN, poll_once, or any SQL call raised, the consumer exited with @running still true -- so callers polling consumer.running? saw "running" with no live worker behind it. Fix: zero @running in the outer ensure (plain instance-var write, trap-safe). Add a regression test that points start at an unreachable host so PG.connect raises immediately, and asserts running? is false both before start and after the failure. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
rubygems/release-gem@v1 runs `bundle exec rake release`, but
clients/ruby/Rakefile only defined :test, so the publish job would
have failed with "Don't know how to build task 'release'" the first
time anyone hit it.
Fix:
- clients/ruby/Rakefile: add `require "bundler/gem_tasks"` so the
conventional release task chain is available (rake build,
rake install, rake release[remote] and the three release:*
subtasks).
- .github/workflows/release-ruby.yml publish-rubygems job: grant
contents:write (needed by release:source_control_push to push
the v${VERSION} tag back to origin via the GITHUB_TOKEN that
actions/checkout configures automatically), switch checkout to
ref: ${{ github.ref }} with fetch-depth:0 so we land on an
attached HEAD with tags visible (rake release's plain `git push`
refuses to operate from detached HEAD), and configure the
github-actions[bot] git identity for the annotated tag.
- clients/ruby/RELEASE.md: document the rake release chain and call
out the side effect that the workflow pushes a v${VERSION} git
tag to NikolayS/pgque, plus the partial-failure recovery (yank
gem + git push --delete origin v${VERSION}).
Race note: the build job pins to inputs.version against
${{ github.sha }}; if main races forward during the workflow the
publish job picks up the new head of refs/heads/main, but the
race window would have to land both a fresh commit and a version
bump for the publish to ship anything different.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CLAUDE.md requires install/quickstart docs to cover: install command, ticker setup (or skip path), and role grants. The Ruby README had install + roles but no ticker guidance, and the root-README Ruby snippet jumped from connect to Consumer without showing queue or consumer registration. - clients/ruby/README.md: add a Quickstart section between Database permissions and the Object#send note, with one short snippet covering create_queue + subscribe + send + Consumer.on/start, followed by a paragraph on the pg_cron ticker path and the external-scheduler alternative. - README.md (Ruby section under Client libraries): add the one-time setup lines (create_queue + subscribe) and a trailing comment that consumer.start needs pgque.ticker() running, matching the rest of the doc's self-complete style. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each ecosystem normalises pre-release version strings differently: PyPI prints 0.2.0rc1 (no separator), RubyGems uses 0.2.0.rc.1 (dot-separated -- hyphens are warned against in the gem RELEASE.md), and npm + Go use 0.2.0-rc.1 (semver hyphen). The single shared "v0.2.0-rc.1" literal in the Client libraries intro was wrong for Python and Ruby. Replace the universal literal with a short note that each per-language section below shows the correct spelling. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
clients/ruby/— a first-party Ruby client (pgquegem) with full parity with the Python client, including the experimental cooperative-consumers API.ruby-client-testsCI job mirroringpython-client-tests(Postgres 18 in Docker, build pgque viatransform.sh, install, thenbundle exec rake test+gem buildsmoke).clients/README.mdand adds a Ruby section to the top-level README under "Client libraries".The gem name is
pgque(matches the project; same convention as the TypeScript package on npm). Module isPgque. Required Ruby is>= 3.1. Driver: thepggem.Surface
Pgque.connect(dsn, autocommit:)— block form auto-closes;Pgque::Client.new(raw_pg_conn)for shared connections.closeis idempotent and a no-op for external connections.client.send(queue, payload, type:)acceptsHash/Array/String/nil/Pgque::Event; picks 2-arg vs. 3-argpgque.sendSQL based ontype.client.send_batch(queue, type, payloads)returns IDs in input order.client.receive(queue, consumer, max_messages=100)→[Pgque::Message];client.ack(batch_id)→ rowcount (1 ok, 0 stale);client.nack(batch_id, msg, retry_after:, reason:);client.force_next_tick(queue).Pgque::Errorbase;Pgque::ConnectionError/QueueNotFound/BatchNotFound/ConsumerNotFound. SQL errors are wrapped by message-text matching, mirroring the Python wrapper.Pgque::Consumersubscribe_subconsumer,unsubscribe_subconsumer(batch_handling:),receive_coop(max_messages:, dead_interval:),touch_subconsumer. Constructor validatesdead_intervalrequiressubconsumer. Marked experimental in source comments and READMEs.Tests
59 runs / 146 assertions, all green locally against Postgres 18 with
pgque.sqlinstalled.```
clients/ruby/test/
├── test_connect.rb # 6 tests (ports test_connect.py)
├── test_send.rb # 13 tests (test_send.py)
├── test_receive.rb # 6 tests (test_receive.py)
├── test_nack.rb # 3 tests (test_nack.py)
├── test_smoke.rb # 1 test (test_smoke.py)
├── test_consumer.rb # 19 tests (test_consumer.py — 4 unit + 15 integration)
├── test_consumer_listen_stop.rb # 2 tests (test_consumer_listen_stop.py)
├── test_coop.rb # 9 tests (test_coop.py)
├── test_concurrency.rb # 1 test (test_concurrency.py)
└── test_transaction_visibility.rb # 2 tests (test_transaction_visibility.py)
```
Tests run red/green TDD against a real Postgres (no SQL mocking), gated on
PGQUE_TEST_DSN. Unit-style cases that don't need a real DB use a smallFakeConnin-class to capture the SQL form, andPgque::Client.stub :new, ...to assert the consumer's poll loop forwardsmax_messagescorrectly.Test plan
ruby-client-testsjob needs to go green on this PR.clients/README.mdand the Ruby snippet in the top-level README.Out of scope (explicit follow-ups)
Commit history
18 commits on the branch, organised TDD-style:
🤖 Generated with Claude Code