
fix(clients/python): make Consumer thread-safe and correct test xact pattern#129

Merged
NikolayS merged 1 commit into main from
fix/clients-python-test-failures
Apr 30, 2026
Conversation

@NikolayS

@NikolayS NikolayS commented Apr 30, 2026

Why

PR #84 turns on real per-client driver tests against live Postgres. The Python suite exposes a real driver bug plus a handful of test patterns that always failed against a live ticker:

  • Consumer.start() registered SIGTERM/SIGINT signal handlers unconditionally. signal.signal() raises ValueError when called from a non-main thread, so any caller that runs the consumer from a worker thread (the standard pattern for tests, embedded use, and most multi-consumer apps) crashed before doing any work.
  • The send/receive tests interleaved pgque.send() and pgque.ticker() inside one transaction, then committed. The ticker's snapshot excludes the producing xact's own writes, so the resulting tick references no events and receive() returns []. This pattern worked in isolation only because force_tick() advances the sequence; it does not actually rotate.
  • test_smoke called pgque.subscribe() without first creating the queue.
  • test_nack_routes_to_dlq_at_max_retries synthesised a retried message by setting Message.retry_count = 1 in Python. Per #98 (Bug: nack() DLQ path trusts caller-supplied pgque.message and allows forged DLQ rows), pgque.nack() reads ev_retry from the canonical batch row and ignores caller-supplied values, so the assertion never fired.
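
The first bullet can be reproduced without pgque at all. The snippet below is a minimal standard-library sketch; try_register_handler and results are illustrative names, not part of the client. It shows that signal.signal() raises ValueError when called from a worker thread.

```python
import signal
import threading


def try_register_handler():
    """Try to install a SIGTERM handler and report what happened."""
    try:
        previous = signal.signal(signal.SIGTERM, signal.SIG_DFL)
    except ValueError as exc:
        # CPython only allows signal.signal() in the main thread.
        return f"ValueError: {exc}"
    # Put the previous handler back so the demo has no lasting effect.
    if previous is not None:
        signal.signal(signal.SIGTERM, previous)
    return "ok"


results = {}


def worker():
    results["worker"] = try_register_handler()


thread = threading.Thread(target=worker)
thread.start()
thread.join()

# The worker-thread attempt fails before any real work could start.
print(results["worker"])
```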

What changes

  • clients/python/pgque/consumer.py: detect non-main-thread invocation and skip signal registration; callers stop via Consumer.stop() in that case.
  • clients/python/tests/test_send.py, test_receive.py, test_nack.py, test_consumer.py: commit between client.send() and pgque.force_tick() so the ticker sees the new events.
  • clients/python/tests/test_smoke.py: add the missing pgque.create_queue() before subscribe().
  • clients/python/tests/test_nack.py: set queue_max_retries = 0 so the first nack routes to DLQ, instead of relying on a client-side retry_count mutation that the SQL ignores.
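
As a rough illustration of the consumer.py change, the sketch below registers signal handlers only in the main thread and otherwise relies on stop(). SketchConsumer is a hypothetical stand-in with no database access; its attribute names and the sleep-based loop are assumptions, not the actual implementation.

```python
import signal
import threading
import time


class SketchConsumer:
    """Hypothetical stand-in for Consumer; polls nothing, only loops."""

    def __init__(self, poll_interval=0.05):
        self.poll_interval = poll_interval
        # Set here (an assumption of this sketch) so stop() works even if
        # it is called before start() begins looping.
        self._running = True
        self.installed_signals = False

    def _stop(self, signum=None, frame=None):
        self._running = False

    def stop(self):
        self._stop()

    def start(self):
        in_main_thread = threading.current_thread() is threading.main_thread()
        originals = {}
        try:
            if in_main_thread:
                # Only the main thread may call signal.signal().
                for sig in (signal.SIGTERM, signal.SIGINT):
                    originals[sig] = signal.signal(sig, self._stop)
                self.installed_signals = True
            while self._running:
                time.sleep(self.poll_interval)  # placeholder for polling
        finally:
            # Restore only the handlers that were actually replaced.
            for sig, original in originals.items():
                if original is not None:
                    signal.signal(sig, original)


consumer = SketchConsumer()
worker = threading.Thread(target=consumer.start)
worker.start()
consumer.stop()  # worker threads stop through stop(), not signals
worker.join(timeout=2)
```

Run this way, start() returns without touching the process-wide signal handlers.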

Coordination

Depends on #84 for live-PG CI to actually exercise these tests. Once merged, the Python job on PR #84 should go green on rebase.

Test plan

  • PGQUE_TEST_DSN=... pytest -v clients/python/tests/ -> 39 passed locally on PG 16
  • CI run on this PR

Generated by Claude Code

…pattern

Three independent fixes uncovered when running clients/python/tests/ against
a live Postgres:

1. Consumer.start() unconditionally registered SIGTERM/SIGINT handlers,
   which raises ValueError when the consumer runs in a worker thread (the
   common pattern for tests and embedded use). Detect non-main-thread and
   skip signal registration; callers stop via Consumer.stop() in that case.

2. The send/tick/receive tests interleaved pgque.send() and
   pgque.ticker() inside one transaction, then committed. The ticker's
   snapshot excludes the producing xact's own writes, so the resulting
   tick references no events and receive() returns []. Commit between
   send and force_tick so the ticker sees the new events. test_smoke
   additionally needed the missing create_queue() before subscribe().

3. test_nack_routes_to_dlq_at_max_retries: nack() reads ev_retry from
   the canonical batch row, ignoring caller-supplied Message.retry_count.
   Set queue_max_retries=0 to route the first nack straight to the DLQ
   instead.

REV review (Security / Bugs / Tests / Guidelines / Docs) — verdict: PASS (non-blocking nits)

1. Security

  • Signal handler restoration is correct: original_sigterm/sigint are now captured and restored only when registration actually happened (if in_main_thread). No risk of restoring None over a real handler, which the previous unconditional signal.signal(..., None) would have done in worker-thread runs (it would have raised before reaching finally though, masking the issue).
  • No env-var injection surface introduced. DSN handling unchanged; psycopg.connect(self.dsn, autocommit=True) is still used.
  • One pre-existing observation (out of scope): from the main thread, the consumer mutates process-global SIGTERM/SIGINT for the duration of start(). That's unchanged here, but worth a docstring note (see Docs).
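
One way to express the capture-and-restore behavior described above is a small context manager. This is a sketch under assumptions: temporary_handlers is a hypothetical helper, not something the PR adds. It makes the "restore only what was registered" rule explicit.

```python
import contextlib
import signal
import threading


@contextlib.contextmanager
def temporary_handlers(handler, signals=(signal.SIGTERM, signal.SIGINT)):
    """Install handler in the main thread only; restore originals on exit."""
    in_main_thread = threading.current_thread() is threading.main_thread()
    originals = {}
    try:
        if in_main_thread:
            for sig in signals:
                originals[sig] = signal.signal(sig, handler)
        # Tell the caller whether signals were actually hooked.
        yield in_main_thread
    finally:
        # Nothing is restored unless it was captured above.
        for sig, original in originals.items():
            if original is not None:
                signal.signal(sig, original)


seen = {}


def run_in_worker():
    before = signal.getsignal(signal.SIGTERM)
    with temporary_handlers(lambda signum, frame: None) as installed:
        seen["installed"] = installed
        seen["unchanged"] = signal.getsignal(signal.SIGTERM) is before


thread = threading.Thread(target=run_in_worker)
thread.start()
thread.join()
```

From a worker thread the manager yields False and leaves the process-global handlers alone.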

2. Bugs

  • The check threading.current_thread() is threading.main_thread() is the canonical idiom (CPython docs); identity comparison is correct since main_thread() returns the singleton _MainThread.
  • Main-thread shutdown path still works: _stop is the registered handler and flips _running = False, the loop exits, finally restores originals.
  • The "commit between send and force_tick" fix is sound, not papering over a driver bug. PgQ's ticker is intentionally snapshot-isolated: pg_current_snapshot() taken inside the producer's xact excludes that xact's own xid, so the resulting tick references no rows from the in-flight xact. Auto-committing inside force_tick() (or client.send()) would violate the documented "send is part of your transaction" contract — that's a feature of the model, not a bug. The test was wrong.
  • DLQ routing via queue_max_retries = 0: correct. pgque.nack() increments ev_retry from the canonical row to 1, which exceeds the limit of 0, so the very first nack routes to the dead-letter table. The deleted msg.retry_count = 1 mutation was indeed a no-op per #98 (Bug: nack() DLQ path trusts caller-supplied pgque.message and allows forged DLQ rows). The reference to #116 (fix(pgque.nack): canonical + idempotent DLQ terminal handling) in the PR body appears to be a typo: the canonical-ev_retry change is tracked under issue #98 in this body, so double-check the reference.
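
The snapshot and DLQ arguments above can be checked with two toy models. Both are pure-Python simplifications written for this review. They are not pgque's SQL, and every name in them (take_tick_snapshot, batch_events, nack_route) is hypothetical.

```python
def take_tick_snapshot(committed_xids):
    """Toy snapshot: only transactions committed at tick time are visible."""
    return frozenset(committed_xids)


def batch_events(events, snapshot):
    """Return payloads whose producing xid is visible in the tick snapshot."""
    return [payload for xid, payload in events if xid in snapshot]


def nack_route(canonical_ev_retry, queue_max_retries, caller_retry_count=None):
    """Toy routing: uses the canonical retry count; the caller value is ignored."""
    next_retry = canonical_ev_retry + 1
    return "dlq" if next_retry > queue_max_retries else "retry"


events = [(100, "hello")]  # xid 100 produced one event

# Tick taken inside the still-open producer xact: xid 100 is not committed.
tick_inside_xact = batch_events(events, take_tick_snapshot(set()))

# Tick taken after the producer committed: xid 100 is visible.
tick_after_commit = batch_events(events, take_tick_snapshot({100}))

# With a limit of 0, the first nack already exceeds it.
first_nack = nack_route(canonical_ev_retry=0, queue_max_retries=0)

# A forged client-side retry_count does not change the outcome.
forged = nack_route(canonical_ev_retry=0, queue_max_retries=3,
                    caller_retry_count=99)
```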

3. Test analysis

  • The new tests prove what they claim: each commit() between send() and force_tick() is necessary and sufficient for the ticker to see the producer's writes.
  • No silent skips introduced; conftest.py still env-gates on PGQUE_TEST_DSN.
  • test_concurrency.py does not use Consumer, so the signal-handler fix doesn't intersect; it uses pgque.connect(...) per-thread. No race surfaced.
  • Non-blocking nit: test_consumer_stop_returns_promptly sets poll_interval=10 and joins with timeout=15. Consumer.stop() only flips _running; the loop is parked inside conn.notifies(timeout=10) and won't wake early. The test will wait up to ~10s on every run. Pre-existing — not introduced here — but worth filing a follow-up to make stop() interrupt the notifies wait (e.g. pg_notify self-wakeup, or shorter inner timeout).
  • Non-blocking nit: _run_consumer_for(cons, 3.0) in test_consumer.py joins with timeout=5.0. With poll_interval=1, worst-case wakeup latency is ~1s after stop(), so 5s is fine — but if CI is slow this gets tight. Consider raising the join timeout to 10s for safety.
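
The "shorter inner timeout" option for the stop() follow-up could look roughly like the sketch below. SlicedWaiter is a hypothetical class, and time.sleep() stands in for the blocking notifies wait, which is an assumption about how the real loop would be restructured.

```python
import threading
import time


class SlicedWaiter:
    """Hypothetical: split one long wait into short slices so stop() is prompt."""

    def __init__(self, poll_interval=10.0, slice_seconds=0.1):
        self.poll_interval = poll_interval
        self.slice_seconds = slice_seconds
        self._running = True

    def stop(self):
        self._running = False

    def wait_one_interval(self):
        """Wait up to poll_interval; return True if stopped early."""
        deadline = time.monotonic() + self.poll_interval
        while self._running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Stand-in for an uninterruptible wait with a short timeout.
            time.sleep(min(self.slice_seconds, remaining))
        return True


waiter = SlicedWaiter(poll_interval=10.0, slice_seconds=0.05)
thread = threading.Thread(target=waiter.wait_one_interval)
started = time.monotonic()
thread.start()
waiter.stop()
thread.join(timeout=2)
elapsed = time.monotonic() - started
```

With this shape, worst-case stop latency is about one slice rather than the full poll_interval, at the cost of more frequent wakeups.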

4. Guidelines

  • Conventional commit prefix fix(clients/python): — compliant with CLAUDE.md.
  • Apache-2.0 + PgQ ISC headers preserved on touched files.
  • No leaked internal references (gitlab/sahmed/AR/round-N/WI) in code or PR body.
  • Diff size is minimal and surgical; matches "red/green TDD for clients" rule (test fixes alongside the driver fix).

5. Docs

  • Non-blocking: Consumer.start() docstring still reads "blocks until SIGTERM/SIGINT". After this PR it also returns when Consumer.stop() is called, and from worker threads SIGTERM/SIGINT registration is skipped entirely. Suggested addition:

    Blocks until Consumer.stop() is called or, when invoked from the main thread, until SIGTERM/SIGINT is received. Worker-thread invocations skip signal registration; stop the consumer via Consumer.stop().

  • README/docs/ not touched in this PR; reference docs for the Python client should mention thread-safety of start() once written.

Blockers

0 blocking issues. Three non-blocking suggestions:

  1. Update Consumer.start() docstring to reflect new thread-aware behavior.
  2. Follow-up issue: make Consumer.stop() actually interrupt the notifies() wait.
  3. Verify #116 vs #98 reference in the PR body matches the canonical-ev_retry change.

Verdict: PASS — safe to merge once #84 lands.


Generated by Claude Code
