
feat(uptime): Add ability to use queues to manage parallelism #81

Open
camcalaquian wants to merge 1 commit into tenki/base-9 from tenki/head-9

Conversation


camcalaquian commented Apr 28, 2026

…try#95633)

One potential problem we have with batch processing is that any one slow
item can clog up the whole batch. This PR implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can increase concurrency here to reduce the chances of that happening.
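
To make the idea concrete, here is a minimal, hypothetical toy model of the approach. The PR's actual implementation is FixedQueuePool in queue_consumer.py; every name below is illustrative, not the real API:

```python
import queue
import threading

class ToyQueuePool:
    """Toy model: N queues, each drained by a dedicated worker thread."""

    def __init__(self, process, num_queues: int = 4) -> None:
        self.process = process
        self.queues = [queue.Queue() for _ in range(num_queues)]
        for q in self.queues:
            threading.Thread(target=self._worker, args=(q,), daemon=True).start()

    def submit(self, key: str, item) -> None:
        # A stable hash pins all items for one key to one queue, so a slow
        # item only delays its own queue rather than the whole batch.
        self.queues[hash(key) % len(self.queues)].put(item)

    def _worker(self, q: queue.Queue) -> None:
        while True:
            item = q.get()
            try:
                self.process(item)
            finally:
                q.task_done()

# Usage: items for "sub-a" always share a queue, so they stay ordered
pool = ToyQueuePool(process=print, num_queues=4)
pool.submit("sub-a", {"check": 1})
pool.submit("sub-a", {"check": 2})
```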


tenki-reviewer Bot commented Apr 28, 2026

Tenki Code Review - Complete

Files Reviewed: 7
Findings: 4

By Severity:

  • 🔴 High: 1
  • 🟠 Medium: 3

The PR introduces a new thread-queue-parallel processing mode for remote subscriptions with per-queue offset tracking. Code review identified 4 findings: 1 high-severity data loss bug in graceful shutdown, 2 medium-severity bugs in offset tracking and unreachable dead code, and 1 medium-severity performance issue with double Sentry transactions; a further control-flow clarity concern is noted in the overview below.

Files Reviewed (7 files)
src/sentry/consumers/__init__.py
src/sentry/remote_subscriptions/consumers/queue_consumer.py
src/sentry/remote_subscriptions/consumers/result_consumer.py
tests/sentry/remote_subscriptions/__init__.py
tests/sentry/remote_subscriptions/consumers/__init__.py
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py
tests/sentry/uptime/consumers/test_results_consumer.py


tenki-reviewer Bot left a comment


Overview

This PR adds a thread-queue-parallel processing mode to Sentry's remote subscriptions consumer system. The implementation uses a fixed pool of queues with dedicated worker threads and includes offset tracking to ensure safe Kafka offset commits.

Key Changes

  • New module: queue_consumer.py with FixedQueuePool, OffsetTracker, OrderedQueueWorker, and SimpleQueueProcessingStrategy
  • Integration: result_consumer.py wires the new mode alongside existing serial/parallel/batched-parallel modes
  • CLI: New mode choice added to uptime options
  • Tests: Comprehensive test suite for offset tracking, ordering guarantees, and integration scenarios

Critical Issues

⚠️ High Severity: Data Loss on Graceful Shutdown

The close() method in SimpleQueueProcessingStrategy commits offsets before draining the worker queues. Any messages still queued at shutdown time won't have their offsets committed, so Kafka will redeliver them on restart (potential duplicate processing). The pool includes a wait_until_empty() method designed exactly for this purpose, but it is never called.

Code Quality Issues (Medium)

  1. Offset Tracker Safety: mark_committed() cleans all_offsets but not outstanding, which could leave stale outstanding markers if worker threads are killed.
  2. Double Sentry Transactions: Every message creates two nested Sentry transactions (worker-level + ResultProcessor-level), adding unnecessary overhead.
  3. Dead Code: Inner except queue.ShutDown in OrderedQueueWorker.run() is unreachable and misleading.
  4. Fragile Control Flow: create_with_partitions() uses three independent if statements instead of an if/elif/else chain, relying on early returns for correctness (see the sketch after this list).
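
To make point 4 concrete, a hypothetical sketch of the clearer dispatch; the mode names come from this PR, but the helper method names are illustrative, not the actual source:

```python
def create_with_partitions(self, commit, partitions):
    # Exhaustive if/elif/else chain: each branch is mutually exclusive and
    # the serial fallback is explicit rather than an implicit fall-through.
    if self.mode == "parallel":
        return self._create_parallel_strategy(commit, partitions)
    elif self.mode == "batched-parallel":
        return self._create_batched_parallel_strategy(commit, partitions)
    elif self.mode == "thread-queue-parallel":
        return self._create_thread_queue_strategy(commit, partitions)
    else:
        return self._create_serial_strategy(commit, partitions)
```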

Testing

The test suite is thorough, covering offset tracking, ordering semantics, concurrent processing, and error handling. However, tests do not catch the graceful shutdown bug because they do not verify that pending queue items are flushed before commit.

Recommendations

  1. Before merge: Fix the graceful shutdown bug by reordering close() to drain queues first
  2. Before merge: Add safety net to mark_committed() for the outstanding set
  3. Before merge: Remove dead code and double Sentry transaction wrapping
  4. Before merge: Strengthen control flow clarity with proper if/elif chain

Comment on lines +335 to +338
```python
def close(self) -> None:
    self.shutdown_event.set()
    self.commit_thread.join(timeout=5.0)
    self.queue_pool.shutdown()
```

🔴 close() commits offsets before worker queues are drained, causing missed commits on graceful shutdown (bug)

In SimpleQueueProcessingStrategy.close() (lines 335–338), the shutdown sequence is: (1) set shutdown_event, (2) join commit_thread (max 5 s), then (3) call queue_pool.shutdown(). The commit thread performs one final pass over completed offsets before worker threads are asked to stop or drain. Any work items still queued in the FixedQueuePool at that moment have not been processed, so complete_offset has not been called for them. The final commit therefore misses these offsets. On restart the broker will redeliver all uncommitted messages. Notably, FixedQueuePool.wait_until_empty() exists precisely for this purpose but is never called in close().

💡 Suggestion: Drain the worker queues before signalling the commit thread to shut down. Call queue_pool.wait_until_empty() (with an appropriate timeout) before setting shutdown_event, so the final commit pass sees all in-flight work as completed. Then proceed to join the commit thread and shutdown the pool.

📋 Prompt for AI Agents

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, SimpleQueueProcessingStrategy.close() at lines 335-338: the current order is shutdown_event.set() → commit_thread.join() → queue_pool.shutdown(). This means the final commit happens before worker queues are drained, so in-flight queued items won't have their offsets committed. Fix by reordering: first call self.queue_pool.wait_until_empty(timeout=30.0) to drain pending work, then self.shutdown_event.set(), then self.commit_thread.join(timeout=5.0), then self.queue_pool.shutdown(). This ensures all queued items are processed and their offsets marked complete before the final commit pass runs.
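
Applying that suggestion, close() would look something like this (the 30-second drain timeout mirrors the prompt above and is illustrative):

```python
def close(self) -> None:
    # Drain queued work first so complete_offset() runs for every item...
    self.queue_pool.wait_until_empty(timeout=30.0)
    # ...then let the commit thread make its final pass over completed offsets
    self.shutdown_event.set()
    self.commit_thread.join(timeout=5.0)
    self.queue_pool.shutdown()
```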

Comment on lines +142 to +143
```python
except queue.ShutDown:
    break
```

🟠 Dead code: inner except queue.ShutDown in OrderedQueueWorker.run() is unreachable (bug)

In OrderedQueueWorker.run() (lines 127–156), queue.ShutDown is caught in two places. The outer try/except at lines 130–133 correctly handles it from work_queue.get() — when ShutDown is raised, break exits the while loop and the inner try is never entered. The inner try/except at lines 135–143 only wraps sentry_sdk.start_transaction and self.result_processor(...), neither of which can raise queue.ShutDown. The inner except queue.ShutDown: break at lines 142–143 is therefore permanently unreachable dead code. This misleads readers into thinking there is a ShutDown-related exit path from inside the processing block, when no such path exists.

💡 Suggestion: Remove the unreachable except queue.ShutDown: break block at lines 142-143. The outer try/except already handles ShutDown from work_queue.get() correctly.

📋 Prompt for AI Agents

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, OrderedQueueWorker.run() at lines 142-143: remove the except queue.ShutDown: break block entirely. It is dead code — queue.ShutDown can only be raised by queue.get() (handled by the outer try/except at lines 130-133), not by sentry_sdk.start_transaction or self.result_processor. The outer break already exits the while loop correctly. Removing this block makes the control flow clearer and eliminates the misleading implication that ShutDown can propagate from the inner try.
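
For illustration, a sketch of run() with the dead handler removed. Everything outside the quoted lines (the logger, the work_item attributes, the error handling) is inferred from the review notes rather than copied from the actual source:

```python
def run(self) -> None:
    while True:
        try:
            work_item = self.work_queue.get()
        except queue.ShutDown:
            # The only call that can raise ShutDown; exit the loop here
            break
        try:
            with sentry_sdk.start_transaction(
                op="queue_worker.process",
                name=f"monitors.{self.identifier}.worker_{self.worker_id}",
            ):
                self.result_processor(self.identifier, work_item.result)
        except Exception:
            logger.exception("Unexpected error in queue worker")
        finally:
            # Always release the offset so the commit thread can advance
            self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
```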

Comment on lines +136 to +140
```python
with sentry_sdk.start_transaction(
    op="queue_worker.process",
    name=f"monitors.{self.identifier}.worker_{self.worker_id}",
):
    self.result_processor(self.identifier, work_item.result)
```

🟠 Double Sentry transaction wrapping: every processed item creates two nested transactions (performance)

In OrderedQueueWorker.run() (lines 136–139 of queue_consumer.py), self.result_processor(...) is called inside a sentry_sdk.start_transaction(op='queue_worker.process', ...) block. However, ResultProcessor.__call__ (in result_consumer.py lines 48–52) also wraps its work in sentry_sdk.start_transaction(op='result_processor.handle_result', ...). This creates two nested Sentry transactions for every single message processed in thread-queue-parallel mode. The overhead is two transaction objects and all associated SDK bookkeeping per message, plus confusing trace hierarchies. The other processing modes (serial, batched-parallel) do not have this duplication because the worker-level wrapper doesn't exist there.

💡 Suggestion: Remove the sentry_sdk.start_transaction wrapper in OrderedQueueWorker.run() and rely solely on the transaction created inside ResultProcessor.__call__. Alternatively, pass a raw callable that bypasses ResultProcessor.__call__'s transaction wrapping, but the simpler fix is to remove the worker-level wrapper since ResultProcessor already provides full instrumentation.

📋 Prompt for AI Agents

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, OrderedQueueWorker.run() at lines 136-140: remove the sentry_sdk.start_transaction context manager wrapper. Change the inner try block so it calls self.result_processor(self.identifier, work_item.result) directly without the with sentry_sdk.start_transaction(...) wrapper. This avoids creating two nested Sentry transactions per message, since ResultProcessor.__call__ in result_consumer.py already wraps the call in its own start_transaction.
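
To illustrate why the worker-level wrapper is redundant, a paraphrased sketch; the op value comes from the review above, but the transaction name inside ResultProcessor.__call__ is an assumption:

```python
# result_consumer.py (paraphrased): ResultProcessor already instruments the call
class ResultProcessor:
    def __call__(self, identifier: str, result) -> None:
        with sentry_sdk.start_transaction(
            op="result_processor.handle_result",
            name=f"{identifier}.handle_result",  # assumed name, not verified
        ):
            self.handle_result(result)

# queue_consumer.py worker loop after the fix: call it directly, with no
# second, worker-level transaction wrapped around it
self.result_processor(self.identifier, work_item.result)
```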

Comment on lines +100 to +105
```python
def mark_committed(self, partition: Partition, offset: int) -> None:
    """Update the last committed offset for a partition."""
    with self._get_partition_lock(partition):
        self.last_committed[partition] = offset
        # Remove all offsets <= committed offset
        self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
```

🟠 OffsetTracker.mark_committed() does not clean outstanding set, risking permanent stalls (bug)

In OffsetTracker.mark_committed() (lines 100–105), the method removes old offsets from all_offsets but never removes them from outstanding. Under normal operation this is harmless because complete_offset removes entries from outstanding before mark_committed is called. However, if a worker thread is killed between add_offset and the finally block that calls complete_offset, that offset stays in outstanding forever, permanently blocking get_committable_offsets for that partition because the scan loop breaks on the first outstanding offset in the range. This is a safety/resilience issue: if any worker thread crashes, the partition becomes stalled.

💡 Suggestion: In mark_committed, also clean the outstanding set for all entries <= offset, mirroring the all_offsets cleanup. This provides a safety net against offsets that were added but whose complete_offset was never called (e.g., worker crash before finally executes).

📋 Prompt for AI Agents

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, OffsetTracker.mark_committed() at lines 100-105: after the line self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}, add a similar cleanup for the outstanding set: self.outstanding[partition] = {o for o in self.outstanding[partition] if o > offset}. This ensures that if complete_offset was never called for an offset (e.g., due to an abrupt thread termination before the finally block), mark_committed still cleans up the stale entry and prevents that partition from being permanently blocked in get_committable_offsets.
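
With the suggested safety net applied, the method would read:

```python
def mark_committed(self, partition: Partition, offset: int) -> None:
    """Update the last committed offset for a partition."""
    with self._get_partition_lock(partition):
        self.last_committed[partition] = offset
        # Remove all offsets <= committed offset
        self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
        # Safety net: drop stale outstanding markers too, so a worker that
        # died before complete_offset() ran cannot stall the partition forever
        self.outstanding[partition] = {o for o in self.outstanding[partition] if o > offset}
```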

camcalaquian marked this pull request as draft April 30, 2026 16:18
camcalaquian marked this pull request as ready for review April 30, 2026 16:19

devin-ai-integration Bot left a comment


Devin Review found 2 potential issues.

View 6 additional findings in Devin Review.


Comment on lines +335 to +338
```python
def close(self) -> None:
    self.shutdown_event.set()
    self.commit_thread.join(timeout=5.0)
    self.queue_pool.shutdown()
```

🔴 close() stops commit thread before draining queue pool, causing uncommitted offsets

In SimpleQueueProcessingStrategy.close(), the commit thread is signaled to stop and joined before the queue pool is shut down. FixedQueuePool.shutdown() at queue_consumer.py:231-243 calls q.shutdown(immediate=False) which allows workers to finish processing remaining queued items, then joins workers. During this drain phase, workers complete offsets via self.offset_tracker.complete_offset() in the finally block at queue_consumer.py:149, but the commit thread has already exited, so these offsets are never committed to Kafka.

This means on every consumer restart or rebalance, any items that were in the work queues when close() was called will be processed (side effects executed) but their offsets won't be committed—causing systematic duplicate processing of those messages on restart.

Prompt for agents
The close() method in SimpleQueueProcessingStrategy stops the commit thread before draining the queue pool. The correct order should be:

1. First, drain the queue pool (shut down workers so they finish processing remaining items)
2. Then do a final commit of all completed offsets
3. Then stop the commit thread

In close() at queue_consumer.py:335-338, the queue_pool.shutdown() should be called first to drain workers, then the commit loop should do one final commit pass before exiting. One approach: call queue_pool.shutdown() first, then signal the commit thread to do a final commit and exit, then join the commit thread. Alternatively, after draining the pool but before stopping the commit thread, explicitly call get_committable_offsets and commit them synchronously.
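
A sketch of this alternative ordering; the commit callback name is hypothetical, while get_committable_offsets and complete_offset are referenced in the findings above:

```python
def close(self) -> None:
    # 1. Drain: workers finish the remaining queued items, calling
    #    offset_tracker.complete_offset() for each in their finally blocks
    self.queue_pool.shutdown()
    # 2. One final synchronous commit pass over everything now completed
    committable = self.offset_tracker.get_committable_offsets()
    if committable:
        self.commit(committable)  # hypothetical commit callback
    # 3. Only now stop the background commit thread
    self.shutdown_event.set()
    self.commit_thread.join(timeout=5.0)
```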

Comment on lines +344 to +345
```python
def join(self, timeout: float | None = None) -> None:
    self.close()
```

🟡 join() ignores timeout parameter, violating Arroyo ProcessingStrategy contract

The join() method accepts a timeout parameter per the Arroyo ProcessingStrategy interface but completely ignores it, unconditionally calling self.close() which has hardcoded 5-second timeouts for both commit_thread.join(timeout=5.0) and each worker.join(timeout=5.0) inside FixedQueuePool.shutdown() at queue_consumer.py:243. This means join() could block for 5s + (5s × num_queues) regardless of the requested timeout, potentially causing issues during Arroyo consumer rebalances where a shorter timeout is expected.
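
One hypothetical way to honor the timeout, assuming wait_until_empty() tolerates a None timeout and the shutdown steps can share a deadline; none of this is the PR's actual code:

```python
import time

def join(self, timeout: float | None = None) -> None:
    # Budget the caller's timeout across the shutdown steps instead of
    # relying on close()'s hardcoded 5-second waits.
    deadline = None if timeout is None else time.monotonic() + timeout

    def remaining() -> float | None:
        return None if deadline is None else max(0.0, deadline - time.monotonic())

    self.queue_pool.wait_until_empty(timeout=remaining())
    self.shutdown_event.set()
    self.commit_thread.join(timeout=remaining())
    self.queue_pool.shutdown()
```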
