
fix(clients/python): consumer warns + acks unhandled event types #121

Merged
NikolayS merged 9 commits into main from fix/python-consumer-111 on Apr 30, 2026

@NikolayS (Owner) commented Apr 30, 2026

Summary

Closes #111

Changes behavior for messages with no registered handler (and no * catch-all):

  • Before: nack() → message moved to retry_queue, then DLQ at max retries
  • After: logging.warning(...) via the consumer's own logger, then the batch ack() covers the message normally

The Consumer now creates a per-instance logger (pgque.consumer.<name>) so the WARNING is attributable to the specific consumer.

Finding 1: Behavior change — unhandled event type now warns + acks

Root cause of original issue (#111): Consumer._poll_once() called continue with no handler registered, then ack() for the whole batch — silently dropping the message.

PR #121 original fix: nack before continue. This is now revised: nack routes unknown types to DLQ, which conflates routing gaps with processing failures. The correct behavior is to surface the gap in logs and let the operator register a handler.

New fix: self._log.warning("no handler for event type=%s ev_id=%s; acking", ...) then continue — the batch-level ack() at the end of the loop covers it.
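The flow can be sketched as follows. This is a minimal stand-in under stated assumptions, not the pgque implementation: `Message`, `FakeClient`, and `poll_once` are hypothetical names, and the `nack` signature shown is invented for illustration. Only the logger name pattern, the handler lookup, and the log format string are taken from this PR.

```python
import logging
from dataclasses import dataclass, field


@dataclass
class Message:
    # Hypothetical stand-in for a pgque message.
    msg_id: int
    type: str


@dataclass
class FakeClient:
    # Hypothetical in-memory client that only records ack/nack calls.
    acked: list = field(default_factory=list)
    nacked: list = field(default_factory=list)

    def ack(self, batch_id):
        self.acked.append(batch_id)

    def nack(self, batch_id, msg, reason=None):
        self.nacked.append((batch_id, msg.msg_id, reason))


def poll_once(client, batch_id, messages, handlers, default_handler=None, name="demo"):
    # Per-instance logger name, as described in the PR summary.
    log = logging.getLogger(f"pgque.consumer.{name}")
    for msg in messages:
        handler = handlers.get(msg.type, default_handler)
        if handler is None:
            # Routing gap: surface it in logs, do not nack.
            log.warning("no handler for event type=%s ev_id=%s; acking", msg.type, msg.msg_id)
            continue
        try:
            handler(msg)
        except Exception as exc:
            # Genuine processing failure: this is the only path to nack.
            client.nack(batch_id, msg, reason=str(exc))
    # Batch-level ack covers handled and unhandled messages alike.
    client.ack(batch_id)
```

With one registered and one unregistered type in the same batch, the sketch acks the batch once and never calls `nack`.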

Commits (TDD):

  • Red: 7a0dcbb test: red -- warn+ack unhandled event type
  • Green: ca0e9ef fix(clients/python): warn+ack unhandled event types

Test renamed: test_consumer_nacks_unhandled_event_type → test_consumer_warns_and_acks_unhandled_event_type

Finding 2: Docs bug — str payload contract misdocumented (client.py)

Unchanged from original PR #121. send() and send_batch() docstrings now document that str must be valid JSON text.
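To illustrate the contract: a str payload is cast to jsonb, so it has to parse as JSON. The sketch below uses Python's `json.loads` as a rough client-side approximation of that check. `is_json_text` is a hypothetical helper, not part of the pgque client, and PostgreSQL's own jsonb parser remains the authority (the two parsers are not guaranteed to agree on every edge case).

```python
import json


def is_json_text(payload: str) -> bool:
    """Return True if the string parses as JSON (hypothetical pre-flight check)."""
    try:
        json.loads(payload)
    except json.JSONDecodeError:
        return False
    return True


# A bare Python str such as "hello" is not JSON text; '"hello"' is.
print(is_json_text("hello"))
print(is_json_text('"hello"'))
```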

Test plan

  • PGQUE_TEST_DSN= pytest clients/python/tests/test_consumer.py::test_consumer_warns_and_acks_unhandled_event_type — was failing on 7a0dcbb (red), passes on ca0e9ef (green)
  • Full suite: PGQUE_TEST_DSN= pytest clients/python/tests/ — no regressions

@NikolayS (Owner, Author)

REV review — PR #121

Independent reviewer here. Verdict only — no approve/merge.

Verdict: REQUEST CHANGES (minor)

The two fixes are correct in shape and the headline bug is genuinely closed.
Test test_consumer_nacks_unhandled_event_type is a real red→green for the
silent-drop bug. Two issues below are worth addressing before merge; everything
else is a nit.

Counts

  • Blockers: 0
  • Concerns: 2
  • Nits: 3

Concerns

C1 — Test is mildly racy / could degrade silently (test_consumer.py)
The new test uses retry_after=0 and runs the consumer for 3 s with
poll_interval=1. If the message bounces back into the active batch fast
enough (after each ack of the empty-handler batch + the next tick) and gets
nacked 5 more times, it lands in DLQ instead of retry_queue. The assertion
select count(*) from pgque.retry_queue ... >= 1 would then fail despite the
fix being correct.

In practice the rotation requires pgque.maint() to move retry rows back to
the main queue, and the test does not call maint(), so this is unlikely to
fire today. But it is fragile to future test-fixture changes (e.g. adding a
maint loop). Two cheap hardenings:

  • assert against retry_queue OR dead_queue (union all), so the
    test passes whichever terminal state the message reached;
  • or set retry_after to a visibly non-zero value (say 30) so the message
    cannot loop within the 3 s window.

C2 — send_batch still inconsistent with send for None (client.py)
The doc clarification only touches dict/list/str. Compare:

  • send(payload=None) → coerced to "null" then cast to jsonb → stored as
    JSON null. Documented in the new docstring.
  • send_batch(payloads=[None, ...]) → None skips the isinstance branch
    and is passed to psycopg as Python None → SQL NULL jsonb (not JSON
    null).

Two different semantics for the same input. PR #121 explicitly closes #111
finding 2, which is about "str payloads misdocumented" — but the docstring
update on send_batch mentions only dict/list/str and now leaves a
silent gotcha for None. Suggest one of:

  • match send: coerce None → "null" in the comprehension; or
  • explicitly document in send_batch that None becomes SQL NULL, not
    JSON null, so callers don't expect symmetry with send().

This is small but it's literally the class of bug the second half of the PR
exists to fix, so worth getting right in the same PR.
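A sketch of the first option (match send), assuming the coercion rules described in C2. `encode_payload` and `encode_batch` are hypothetical helpers written for this comment, not the client's actual code:

```python
import json


def encode_payload(p):
    """Hypothetical coercion: dict/list -> JSON text, None -> "null", str passes through."""
    if isinstance(p, (dict, list)):
        return json.dumps(p)
    if p is None:
        return "null"
    return p


def encode_batch(payloads):
    # Apply the same rule per element so send and send_batch stay symmetric.
    return [encode_payload(p) for p in payloads]
```

Routing both paths through one helper means a Python None can never reach the driver as SQL NULL from one entry point while becoming JSON null from the other.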


Nits

N1 — README in clients/python/ not updated. The package README's
quickstart shows @consumer.on("order.created") but no longer mentions the
new "unhandled type → nack" default. The behavior change is user-visible
(a previously silent case now writes to retry_queue and may eventually
reach DLQ), so a one-liner under "Consumer behavior" or in the quickstart
would help users who rely on the old silent-drop semantics. Not a release
blocker but cheap.

N2 — Log message tense. "no handler for type=%r, nacking msg_id=%d" is
fine, but the fix path also covers the * default-handler case via
self._handlers.get(msg.type, self._default_handler) and only nacks when
both miss. The message reads a bit ambiguously — consider
"no handler (and no '*' default) for type=%r, nacking msg_id=%d" to make
the precondition explicit.

N3 — nack(reason="unhandled event type") propagation. Looked at
PgqueClient.nack: reason is passed positionally to pgque.nack(...)
SQL, which stores it on the DLQ row when max retries is exceeded. No
logging/error-message leakage path, so security surface is zero. Just noting
for the record so the reviewer who flagged this in step 1 doesn't have to
re-derive it.


Confidence

High on C1/C2 reasoning. Manually traced _poll_once against
PgqueClient.nack/ack signatures and the pgque.nack SQL signature in
sql/pgque.sql (default queue_max_retries = 5). PR #116 (canonical/
idempotent nack) is still open as of review, but the nack path used here
goes through ROW(...)::pgque.message with the message's own batch_id and
msg_id, so it does not rely on the canonical-lookup fix.

Anti-leak

Diff + body + commits scanned. No leak markers found.

Out of scope (do not block this PR)

@NikolayS (Owner, Author)

REV Review — PR #121 (round 2)

CI: green (all test (14..18), verify, client-smoke pass; claude-review pending — non-blocking auto-review).

R1 concerns + nits status:

  • C1 (test fragility) — addressed. retry_after=30 now set in test_consumer_nacks_unhandled_event_type; with the 3 s consumer window the nacked message physically cannot loop back, so the retry_queue assertion can no longer flap into DLQ. Comment in the test makes the reasoning explicit.
  • C2 red — addressed. test_send_batch_none_payload_produces_json_null in test_send.py exercises send_batch([None]) and asserts the round-tripped payload is JSON null (Python None after psycopg JSONB decode). Test correctly asserts the symmetry contract.
  • C2 green — addressed. send_batch comprehension now reads json.dumps(p) if isinstance(p, (dict, list)) else ("null" if p is None else p) — matches send() exactly. send and send_batch now have identical semantics for {dict, list, str, None}.
  • N1 — addressed. clients/python/README.md quickstart adds the @consumer.on("*") catch-all block with the "nacked rather than silently dropped" note.
  • N2 — addressed. Log message reads "no handler (and no '*' default) for type=%r, nacking msg_id=%d".
  • N3 — was already informational, no change needed.

Verdict: READY FOR USER REVIEW

Blocking

None.

Non-blocking

None new.

Potential

  • Minor: the C2 red test asserts JSON null via client.receive() payload decode rather than a raw select payload from pgque.queue_data_X introspection. This still detects the bug because the pre-fix path passed Python None straight to psycopg → SQL NULL JSONB column, and a ::jsonb[] array containing a true SQL NULL element would either fail the cast or surface as a missing row. In practice the test reliably reds before the fix, so this is a stylistic note only.

Summary table

| Item | Status |
| --- | --- |
| C1 retry_after=30 | resolved |
| C2 send_batch None coercion | resolved |
| C2 red test exists & is meaningful | resolved |
| N1 README catch-all | resolved |
| N2 log message clarity | resolved |
| CI green | yes (claude-review pending only) |
| Counts | 0 blockers / 0 concerns / 0 nits |

Anti-leak

Diff and commit messages clean. One marker found in the PR body (Round-4 client contract review) — recommend the author quietly edit the PR description to drop the "Round-4" wording before merge to keep the public PR free of internal review-round labels. Not blocking review.


REV-style review (security, bugs, tests, guidelines, docs). SOC2 items skipped per project policy.

@NikolayS (Owner, Author)

Comment-scrub sweep applied per CLAUDE.md style: no scrub needed — changes are docstrings on public API and log message improvements, no bug-narrative source comments.

@NikolayS changed the title from "fix(clients/python): consumer nacks unhandled message types" to "fix(clients/python): consumer warns + acks unhandled event types" on Apr 30, 2026
Owner Author

Behavior change landed — branch updated with two new commits on top of c67d0d4:

  • 7a0dcbb test: red -- warn+ack unhandled event type (failing before fix)
  • ca0e9ef fix(clients/python): warn+ack unhandled event types (green)

What changed:

  • Unhandled event type (no handler registered, no * catch-all): was nack() → now logging.warning(...) + ack. The message does not enter retry_queue or DLQ.
  • Log line: no handler for event type=<type> ev_id=<id>; acking emitted on the consumer's own logger (pgque.consumer.<name>).
  • Old test test_consumer_nacks_unhandled_event_type removed; replaced by test_consumer_warns_and_acks_unhandled_event_type (asserts zero retry_queue entries + WARNING with type and id in message).
  • README example comment updated to reflect warn+ack behavior.
  • send()/send_batch() docstring fix from the original PR #121 retained unchanged.

Generated by Claude Code

Owner Author

Round-3 review (delta on top of r2 PASS)

Verdict: PASS -- no blockers.

Scope: just the two new commits 7a0dcbb (red) and ca0e9ef (green) that flip unhandled-event-type handling from nack-then-retry to warn+ack. CI is 8/8 green at ca0e9ef.

Behavior change checks

  1. Test really exercises warn+ack (not passing for the wrong reason). test_consumer_warns_and_acks_unhandled_event_type does three independent things:

    • Sends one event with type totally.unregistered.type and no specific handler and no "*" default. Important: the absence of a "*" handler is what distinguishes this from test_consumer_default_handler_catches_unknown -- it actually hits the new handler is None branch.
    • Asserts pgque.retry_queue count == 0 for the queue -- proves it was acked, not nacked.
    • Asserts a WARNING record exists whose message contains both the event type string and the event id.
      This is a tight, behavioral test; it would fail loudly if anyone restored the old nack path.
  2. WARNING emitted on the consumer's own logger. Consumer.__init__ sets self._log = logging.getLogger(f"pgque.consumer.{name}") and the unhandled branch uses self._log.warning(...). Callers can route per-consumer logs by name. The test captures via caplog.at_level(logging.WARNING, logger="pgque") and relies on the standard parent-logger propagation from pgque.consumer.<name> up to pgque -- which is the right way to do it (and also the right way for users to filter).

  3. No nack on the unhandled-type path. In _poll_once:

    handler = self._handlers.get(msg.type, self._default_handler)
    if handler is None:
        self._log.warning("no handler for event type=%s ev_id=%s; acking", msg.type, msg.msg_id)
        continue

    The continue skips the try/except/nack block; the final client.ack(batch_id) after the loop acks the whole batch. nack is still reachable only from genuine handler exceptions, which test_consumer_nacks_on_handler_error continues to cover.

  4. Docs match the new behavior. Class docstring's "Handler return semantics" section now describes warn+ack for the no-handler case and points users at registering a specific handler or "*". README's Quickstart adds a comment block above the "*" example calling out exactly the same thing. Wording is consistent across docstring/README/test docstring.

  5. Anti-leak compliance. Test imports only pgque (and stdlib logging/threading/time); no pgque.consumer / pgque.client reach-throughs were introduced. Old test_consumer_nacks_unhandled_event_type is gone -- no stale assertions left.
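The parent-logger propagation that check 2 relies on can be reproduced with the standard library alone. In this sketch, `ListHandler` and `capture_consumer_warning` are hypothetical names; only the logger names and the format string come from the PR. It attaches a handler to the `pgque` parent and emits on a `pgque.consumer.<name>` child:

```python
import logging


class ListHandler(logging.Handler):
    """Collects records in memory, similar in spirit to pytest's caplog."""

    def __init__(self):
        super().__init__(level=logging.WARNING)
        self.records = []

    def emit(self, record):
        self.records.append(record)


def capture_consumer_warning(name, event_type, msg_id):
    parent = logging.getLogger("pgque")
    handler = ListHandler()
    parent.addHandler(handler)
    try:
        # The child logger has no handler of its own; the record propagates upward.
        child = logging.getLogger(f"pgque.consumer.{name}")
        child.warning("no handler for event type=%s ev_id=%s; acking", event_type, msg_id)
    finally:
        parent.removeHandler(handler)
    return [(r.name, r.getMessage()) for r in handler.records]
```

The captured record keeps the child's name, which is what lets operators filter or route per consumer while still attaching a single handler at `pgque`.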

Non-blocking nit (optional, do not gate on this)

The warning format string says ev_id=%s (using msg.msg_id). It works -- and the test passes because %s stringifies the int and str(msg_id) in m matches -- but elsewhere the codebase tends to use msg_id as the canonical name. If you ever touch this line again, msg_id=%s would be marginally more grep-friendly. Not worth a separate commit.

Summary

Behavior change is correctly red-then-green, aligned with PgQ/skytools heritage (default ack, surface the gap in logs), retry semantics for genuine handler failures are preserved, anti-leak posture intact. Ship it.


Generated by Claude Code

NikolayS and others added 9 commits April 30, 2026 14:52
Red test for #111: Consumer silently acks messages with no
registered handler; verify they land in retry_queue instead.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Consumer previously skipped messages with no registered handler
and then acked the whole batch, silently dropping them. Now calls
nack() with reason "unhandled event type" so they go to retry queue.

Fixes #111 finding 1.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The send()/send_batch() docstrings claimed str payloads are sent
"as-is", obscuring that they are always cast to jsonb. A bare
Python str like "hello" fails; it must be JSON text like '"hello"'.
Clarify with examples and a note about the error.

Fixes #111 finding 2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
C1: bump retry_after=30 in test_consumer_nacks_unhandled_event_type
so the message cannot loop back within the 3-second window.

C2 (red): add test_send_batch_none_payload_produces_json_null, which
currently fails because send_batch passes Python None as SQL NULL
instead of JSON null.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
C2: coerce Python None to "null" in send_batch comprehension so
send(None) and send_batch([None]) both store JSON null, not SQL NULL.

N1: README Consumer section notes that messages with no registered
handler are nacked (not silently dropped); shows '*' catch-all example.

N2: log message now reads "no handler (and no '*' default)" to make
the precondition explicit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@NikolayS NikolayS force-pushed the fix/python-consumer-111 branch from 231789d to 03ab8ab Compare April 30, 2026 14:52
Owner Author

Rebased onto origin/main (714007e) via --force-with-lease.

Conflicts: None. The rebase applied all 9 commits cleanly. The #129 changes (thread-safe signal handling in Consumer.start(), conn.commit() before force_tick in tests) and the #121 changes (warn+ack for unhandled event types, docstring clarifications) touch orthogonal code paths and did not conflict.

New head: 03ab8abf197c6ae7590def432fd560ab5e0e23e8

CI: pending — checks just triggered on the new head.


Generated by Claude Code

Owner Author

REV final review — post-rebase (head 03ab8abf)

Verdict: PASS

Scope: re-review after rebase of fix/python-consumer-111 onto current main (714007e, post-#129/#130/#119/#120). Adds doc-nit commit 231789d/03ab8ab (docs(clients/python): clarify unhandled-type ack semantics) on top of the previously-PASSed ca0e9ef.

CI: 8/8 green at 03ab8abf (test 14..18, verify, client-smoke, claude-review). No conflicts during rebase — the #129 thread-safe signal-handling diff and the #121 warn+ack diff sit in orthogonal regions of consumer.py.

1. Security

Warning format "no handler for event type=%s ev_id=%s; acking" only interpolates msg.type (event type label, producer-supplied but not user-payload) and msg.msg_id (int). Payload bytes are not logged → no log-injection-via-payload surface. Per-instance logger name is f"pgque.consumer.{name}" where name is the consumer name passed by the caller (own code, not network input). Acceptable.

2. Bugs / rebase regression

  • Thread-safe start() from #129 (fix(clients/python): make Consumer thread-safe and correct test xact pattern), with signal registration only on the main thread and save/restore of original_sigterm/original_sigint in try/finally, is intact and untouched.
  • Warn+ack path: handler is None → self._log.warning(...) → continue (skips the try/except/nack block). The post-loop client.ack(batch_id) covers the unhandled message. Verified against the file at 03ab8abf.
  • nack only reachable from genuine handler exceptions — test_consumer_nacks_on_handler_error continues to gate this.
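For readers unfamiliar with the #129 pattern mentioned in the first bullet, here is a rough sketch of main-thread-only signal registration with save/restore. `run_with_signal_handlers` is a hypothetical helper; the actual Consumer.start() is not reproduced here and may differ in detail.

```python
import signal
import threading


def run_with_signal_handlers(run, on_stop):
    """Install SIGTERM/SIGINT handlers only on the main thread, then restore them."""
    is_main = threading.current_thread() is threading.main_thread()
    original_sigterm = original_sigint = None
    if is_main:
        # signal.signal() raises ValueError off the main thread, hence the guard.
        original_sigterm = signal.signal(signal.SIGTERM, lambda signum, frame: on_stop())
        original_sigint = signal.signal(signal.SIGINT, lambda signum, frame: on_stop())
    try:
        return run()
    finally:
        if is_main:
            # A previous handler of None means it was not installed from Python;
            # it cannot be passed back to signal.signal(), so it is skipped.
            if original_sigterm is not None:
                signal.signal(signal.SIGTERM, original_sigterm)
            if original_sigint is not None:
                signal.signal(signal.SIGINT, original_sigint)
```

Called from a worker thread, the helper skips registration entirely and just runs the loop, which is the property that makes a consumer usable outside the main thread.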

3. Test analysis

  • test_consumer_warns_and_acks_unhandled_event_type still asserts the two real properties: retry_queue count == 0 for the queue (proves ack, not nack) AND a WARNING record whose message contains both "totally.unregistered.type" and str(msg_id). The test correctly distinguishes the no-handler path from test_consumer_default_handler_catches_unknown (no * registered).
  • test_send_batch_none_payload_produces_json_null (the C2 regression test from r2) is still present in test_send.py, asserts JSON null round-trip via client.receive(). Both send and send_batch continue to coerce None → "null".
  • caplog.at_level(..., logger="pgque") works because pgque.consumer.<name> propagates up to pgque — that's the canonical pytest pattern.

4. Guidelines

  • Conventional Commits respected on all 9 commits (feat:, fix(clients/python):, test:, docs(clients/python):).
  • Anti-leak: scanned diff + body + commit messages + new doc-nit commit. No internal review-round markers, no Claude-session URLs that shouldn't be there (the https://claude.ai/code/... link is the project's standard footer pattern, not a leak).
  • No BEGIN ... EXCEPTION in hot paths added; no SECURITY DEFINER SQL touched.

5. Docs

Both the original quickstart catch-all comment block AND the new doc-nit Note: line are present in clients/python/README.md. They say substantively the same thing; the Note: line is slightly redundant alongside the three-line comment immediately above it. Readable as-is — non-blocking. Class docstring in consumer.py matches: "no handler … WARNING is logged and the message is acked. Register a handler … or use a \"*\" catch-all".

6. Architecture

Consumer remains a justified composition over PgqueClient (receive → dispatch → ack/nack), not a 1:1 SQL-API mirror — it sits above the wrapper layer. No new public methods were added in this PR; the only new state is the per-instance self._log. Driver-mirrors-SQL rule is for PgqueClient (untouched in this PR's runtime path); Consumer is a sanctioned higher-level convenience.

Cross-driver divergence (note, not a blocker): Go client's consumer auto-nacks unhandled types (the previous Python behavior). Python is now the canonical "warn+ack" — Go should be aligned in a separate PR. Do not gate this PR on it.

Counts

  • Blockers: 0
  • Concerns: 0
  • Nits: 1 (README has slightly redundant doc lines for unhandled-type behavior — feel free to fold the Note: sentence into the existing comment block in a follow-up; not worth a force-push here).

Anti-leak

Clean.


REV-style review (security, bugs, tests, guidelines, docs, architecture). SOC2 skipped per project policy. No formal approve issued.


Generated by Claude Code



Development

Successfully merging this pull request may close these issues.

Client/docs: Python consumer/client contract can drop unhandled messages and misdocuments string payloads
