feat(uptime): Add ability to use queues to manage parallelism #9
base: kafka-consumer-parallel-before
Conversation
One potential problem we have with batch processing is that any one slow item will clog up the whole batch. This PR implements a queueing method instead, where we keep N queues that each have their own workers. There's still a chance of individual items backlogging a queue, but we can try increased concurrency here to reduce the chances of that happening. A rough sketch of the idea follows.
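A minimal sketch of the approach, assuming items are routed by hashing a group key onto a fixed set of queues (illustrative names only; this is not the PR's actual `FixedQueuePool` implementation):

```python
# Illustrative sketch of the queue-per-worker idea described above.
import queue
import threading


class SimpleQueuePool:
    def __init__(self, num_queues: int, process) -> None:
        self.queues = [queue.Queue() for _ in range(num_queues)]
        self.process = process
        for q in self.queues:
            threading.Thread(target=self._worker, args=(q,), daemon=True).start()

    def submit(self, group_key: str, item) -> None:
        # Items sharing a group key always hash to the same queue, so ordering
        # within a group is preserved; a slow item only backs up its own
        # queue, not the whole batch.
        self.queues[hash(group_key) % len(self.queues)].put(item)

    def _worker(self, q: queue.Queue) -> None:
        while True:
            item = q.get()
            try:
                self.process(item)
            finally:
                q.task_done()
```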
Walkthrough

A new "thread-queue-parallel" processing mode has been introduced for Kafka message consumers, featuring a fixed pool of ordered, thread-backed queues for concurrent processing while preserving per-group order. This includes new queue and offset tracking classes, updates to consumer factories, and comprehensive unit and integration tests, including tests against real Kafka.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Kafka as Kafka Broker
    participant Consumer as ThreadQueueParallelConsumer
    participant QueuePool as FixedQueuePool
    participant Worker as OrderedQueueWorker
    participant OffsetTracker as OffsetTracker

    Kafka->>Consumer: Delivers message
    Consumer->>QueuePool: Submit WorkItem(group_key, message)
    QueuePool->>Worker: Assign WorkItem to queue (by group_key hash)
    Worker->>OffsetTracker: Mark offset as in-progress
    Worker->>Worker: Process message (callback)
    Worker->>OffsetTracker: Mark offset as complete
    Note over OffsetTracker: Tracks committable offsets per partition
    OffsetTracker->>Consumer: Report committable offsets (periodically)
    Consumer->>Kafka: Commit offsets
```
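To make the offset-tracking step in the diagram concrete: an offset only becomes committable once every earlier tracked offset in the partition has completed. Here is a minimal sketch of that bookkeeping (illustrative; `SimpleOffsetTracker` is not the PR's actual `OffsetTracker` class, and partitions are plain strings here):

```python
# Illustrative per-partition offset tracking with gap-aware commits.
import threading
from collections import defaultdict


class SimpleOffsetTracker:
    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.in_progress = defaultdict(set)  # partition -> offsets being processed
        self.completed = defaultdict(set)    # partition -> offsets finished

    def add_offset(self, partition: str, offset: int) -> None:
        with self.lock:
            self.in_progress[partition].add(offset)

    def complete_offset(self, partition: str, offset: int) -> None:
        with self.lock:
            self.in_progress[partition].discard(offset)
            self.completed[partition].add(offset)

    def get_committable_offsets(self) -> dict[str, int]:
        # The committable offset is the highest completed offset below the
        # lowest still-in-progress offset, so gaps are never skipped over.
        committable = {}
        with self.lock:
            for partition, done in self.completed.items():
                floor = min(self.in_progress[partition], default=float("inf"))
                candidates = [o for o in done if o < floor]
                if candidates:
                    committable[partition] = max(candidates)
        return committable
```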
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 1
🧹 Nitpick comments (5)
tests/sentry/uptime/consumers/test_results_consumer.py (1)

2143-2146: Use `contextlib.suppress` for cleaner exception handling.

```diff
-    finally:
-        try:
-            admin_client.delete_topics([test_topic])
-        except Exception:
-            pass
+    finally:
+        with contextlib.suppress(Exception):
+            admin_client.delete_topics([test_topic])
```

Don't forget to add the import at the top of the file:

```python
import contextlib
```

src/sentry/remote_subscriptions/consumers/result_consumer.py (1)

131-137: Consider documenting the default queue count rationale.

The default of 20 queues when `max_workers` is not specified seems arbitrary. Consider adding a comment explaining the rationale or making this configurable through settings.

```diff
 if mode == "thread-queue-parallel":
     self.thread_queue_parallel = True
+    # Default to 20 queues to balance between parallelism and resource usage
+    # This allows processing up to 20 different subscription groups concurrently
     self.queue_pool = FixedQueuePool(
         result_processor=self.result_processor,
         identifier=self.identifier,
         num_queues=max_workers or 20,  # Number of parallel queues
     )
```

tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py (2)
22-63: Consider adding test cases for edge cases and concurrency.

The current tests cover basic functionality well, but consider adding tests for:

- Thread safety with concurrent offset operations
- Handling duplicate offsets
- Out-of-order offset completion scenarios
- Behavior with empty partitions

```python
def test_concurrent_offset_operations(self):
    """Test thread safety of offset operations."""
    import concurrent.futures

    def add_and_complete_offsets(start_offset):
        for i in range(100):
            offset = start_offset + i
            self.tracker.add_offset(self.partition1, offset)
            self.tracker.complete_offset(self.partition1, offset)

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(add_and_complete_offsets, i * 100) for i in range(4)]
        concurrent.futures.wait(futures)

    # Verify all offsets were tracked correctly
    committable = self.tracker.get_committable_offsets()
    assert self.partition1 in committable

def test_out_of_order_completion(self):
    """Test handling of out-of-order offset completion."""
    self.tracker.add_offset(self.partition1, 100)
    self.tracker.add_offset(self.partition1, 101)
    self.tracker.add_offset(self.partition1, 102)

    # Complete out of order
    self.tracker.complete_offset(self.partition1, 102)
    self.tracker.complete_offset(self.partition1, 100)

    committable = self.tracker.get_committable_offsets()
    assert committable == {self.partition1: 100}

    self.tracker.complete_offset(self.partition1, 101)
    committable = self.tracker.get_committable_offsets()
    assert committable == {self.partition1: 102}
```
164-173: Simplify group extraction logic.

The group extraction logic can be simplified for better readability.

```diff
-        groups_seen = set()
-        for _, item in self.processed_items:
-            if item.startswith("item_group_"):
-                # Extract the group number (0, 1, or 2)
-                parts = item.split("_")
-                if len(parts) >= 3:
-                    group_num = parts[2]
-                    groups_seen.add(group_num)
+        groups_seen = {
+            item.split("_")[2]
+            for _, item in self.processed_items
+            if item.startswith("item_group_") and len(item.split("_")) >= 3
+        }
```
src/sentry/remote_subscriptions/consumers/queue_consumer.py (1)

222-229: Consider using a more efficient wait mechanism.

The current implementation uses a busy-wait loop with 10ms sleeps, which could be inefficient. Consider using a condition variable or event that gets signaled when queues become empty (a sketch follows the diff), or at least increase the sleep interval for longer waits:

```diff
 def wait_until_empty(self, timeout: float = 5.0) -> bool:
     """Wait until all queues are empty. Returns True if successful, False if timeout."""
     start_time = time.time()
+    sleep_interval = 0.01
     while time.time() - start_time < timeout:
         if self.get_stats()["total_items"] == 0:
             return True
-        time.sleep(0.01)
+        time.sleep(min(sleep_interval, timeout - (time.time() - start_time)))
+        # Exponential backoff up to 100ms
+        sleep_interval = min(sleep_interval * 1.5, 0.1)
     return False
```
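A sketch of what the event-driven alternative could look like, assuming the pool bumps a counter as items are enqueued and finished (illustrative, not the PR's code):

```python
# Event-driven replacement for the polling wait_until_empty loop.
import threading


class EmptySignal:
    def __init__(self) -> None:
        self._count = 0
        self._cond = threading.Condition()

    def item_added(self) -> None:
        with self._cond:
            self._count += 1

    def item_done(self) -> None:
        with self._cond:
            self._count -= 1
            if self._count == 0:
                self._cond.notify_all()

    def wait_until_empty(self, timeout: float = 5.0) -> bool:
        with self._cond:
            # wait_for re-checks the predicate after each notification and
            # handles spurious wakeups; returns False if the timeout elapses.
            return self._cond.wait_for(lambda: self._count == 0, timeout=timeout)
```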
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)

- src/sentry/consumers/__init__.py (2 hunks)
- src/sentry/remote_subscriptions/consumers/queue_consumer.py (1 hunks)
- src/sentry/remote_subscriptions/consumers/result_consumer.py (7 hunks)
- tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py (1 hunks)
- tests/sentry/uptime/consumers/test_results_consumer.py (5 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
tests/sentry/uptime/consumers/test_results_consumer.py

2143-2146: Use `contextlib.suppress(Exception)` instead of `try`-`except`-`pass`. Replace with `contextlib.suppress(Exception)` (SIM105)
🔇 Additional comments (11)
src/sentry/consumers/__init__.py (1)

121-121: LGTM!

The addition of the `"thread-queue-parallel"` mode and the generalized help text for `--max-workers` are appropriate changes that align with the new processing strategy implementation.

Also applies to: 141-141
tests/sentry/uptime/consumers/test_results_consumer.py (2)

1689-2010: Excellent test coverage for the new thread-queue-parallel mode!

The test suite comprehensively covers:

- Message processing and ordering guarantees
- Concurrent subscription handling
- Offset commit semantics
- Error resilience
- Graceful shutdown behavior

The use of polling with timeouts and proper assertions ensures test reliability.

2012-2142: Well-structured Kafka integration test!

The integration test properly verifies end-to-end functionality with real Kafka, including:

- Topic creation and cleanup
- Message production and consumption
- Offset commit verification
- Proper resource cleanup

This provides valuable confidence in the implementation's correctness.
src/sentry/remote_subscriptions/consumers/result_consumer.py (3)

244-259: Well-implemented thread-queue-parallel worker creation!

The implementation correctly:

- Uses the queue pool for concurrent processing
- Properly decodes payloads using the existing decoder
- Groups messages by subscription ID for order preservation
- Adjusts offsets by +1 before committing, following Kafka conventions (see the sketch below)
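A quick illustration of that convention, with hypothetical topic and offset values: the committed Kafka offset is the next offset to read, not the last one processed.

```python
# Kafka commit convention: after finishing offset N, commit N + 1.
last_processed = {("uptime-results", 0): 41}  # (topic, partition) -> last done

to_commit = {tp: offset + 1 for tp, offset in last_processed.items()}
assert to_commit == {("uptime-results", 0): 42}  # consumer resumes at 42
```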
183-185: Proper cleanup of queue_pool resources.

The shutdown method correctly handles the queue_pool lifecycle, ensuring threads are properly terminated.

118-118: Eager initialization of `result_processor` is safe across all modes.

I verified that none of the `ResultProcessor` subclasses (including `UptimeResultProcessor` and the test mocks) override `__init__` or maintain mutable state, so instantiating `self.result_processor = self.result_processor_cls()` once in `__init__` has no side effects. All processing modes (`serial`, `parallel`, `batched-parallel`, and `thread-queue-parallel`) continue to use the same, stateless processor instance without altering behavior.

No changes needed.
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py (2)

209-373: Well-structured and comprehensive test coverage!

The tests effectively cover critical scenarios including:

- Message processing and offset committing
- Order preservation within groups
- Concurrent processing across groups
- Invalid message handling
- Offset gap handling (crucial for correct commit behavior)

The use of threading synchronization primitives and helper methods makes the tests clear and reliable.

375-422: Good integration test with the factory!

The test effectively verifies:

- Factory creates the correct strategy type
- Configuration parameters are properly passed
- Queue pool is initialized with the correct number of workers
- Proper shutdown handling

The use of mocks appropriately isolates the test while still verifying the integration points.
src/sentry/remote_subscriptions/consumers/queue_consumer.py (3)

108-157: Well-implemented worker thread with proper error handling!

The implementation correctly:

- Handles shutdown gracefully with queue.ShutDown
- Ensures offset completion in the finally block
- Provides good observability with Sentry transactions and metrics
- Logs exceptions without crashing the worker

273-292: Well-designed commit loop with proper error handling!

The commit loop implementation (sketched below):

- Runs independently in a daemon thread
- Provides regular commit intervals
- Handles errors gracefully without crashing
- Properly updates offset tracker after successful commits
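For illustration, the general shape of such a loop might look like this. This is a sketch with placeholder names; `tracker.get_committable_offsets`, `tracker.mark_committed`, and `commit_fn` are assumptions for the example, not the PR's actual API.

```python
# Illustrative periodic commit loop running in a daemon thread.
import logging
import threading

logger = logging.getLogger(__name__)


def start_commit_loop(tracker, commit_fn, interval: float = 1.0) -> threading.Event:
    stop = threading.Event()

    def loop() -> None:
        while not stop.is_set():
            try:
                offsets = tracker.get_committable_offsets()
                if offsets:
                    commit_fn(offsets)
                    # Only mark offsets committed after the commit succeeds.
                    for partition, offset in offsets.items():
                        tracker.mark_committed(partition, offset)
            except Exception:
                # Log and keep going; a failed commit shouldn't kill the loop.
                logger.exception("Commit loop iteration failed")
            stop.wait(interval)

    threading.Thread(target=loop, daemon=True, name="commit-loop").start()
    return stop  # set this event to shut the loop down
```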
49-54: Fix potential race condition in lock creation.

There's a time-of-check-time-of-use (TOCTOU) race condition between checking if a lock exists and creating it. Multiple threads could create different locks for the same partition.

```diff
 def _get_partition_lock(self, partition: Partition) -> threading.Lock:
     """Get or create a lock for a partition."""
-    lock = self.partition_locks.get(partition)
-    if lock:
-        return lock
-    return self.partition_locks.setdefault(partition, threading.Lock())
+    # Use setdefault atomically to avoid race condition
+    return self.partition_locks.setdefault(partition, threading.Lock())
```

Likely an incorrect or invalid review comment: the original code already falls back to `setdefault`, which is atomic for CPython dicts, so the `get` is just a fast path and both racing threads still receive the same lock object.
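A quick demonstration of why the existing fallback is already safe in CPython:

```python
# dict.setdefault is a single atomic operation in CPython: if two threads
# race on the same key, the second Lock() is discarded and both callers
# get back the same lock object.
import threading

locks: dict[str, threading.Lock] = {}
a = locks.setdefault("partition-0", threading.Lock())
b = locks.setdefault("partition-0", threading.Lock())
assert a is b
```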
```python
try:
    result = self.decoder(message.payload)
    ...
    assert isinstance(message.value, BrokerValue)
```
Replace `assert` with proper error handling.

Using `assert` for runtime validation can be problematic as assertions can be disabled with Python's `-O` flag.

```diff
-        assert isinstance(message.value, BrokerValue)
+        if not isinstance(message.value, BrokerValue):
+            logger.error(
+                "Message value is not a BrokerValue",
+                extra={"message_type": type(message.value)},
+            )
+            return
```
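A quick demonstration of the `-O` behavior the comment refers to:

```python
# Under "python -O", assert statements are compiled out entirely, so the
# isinstance check above would silently disappear.
#
#   $ python -c "assert False, 'boom'"
#   AssertionError: boom
#
#   $ python -O -c "assert False, 'boom'"
#   (no output; the assert was stripped)
```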
🤖 Prompt for AI Agents

In src/sentry/remote_subscriptions/consumers/queue_consumer.py at line 297, replace the assert statement checking if message.value is an instance of BrokerValue with proper error handling. Instead of using assert, add a conditional check and raise an appropriate exception (e.g., TypeError or ValueError) with a clear error message if the check fails, ensuring the validation is always enforced regardless of Python optimization flags.
Summary by CodeRabbit

New Features

- Added a "thread-queue-parallel" processing mode for Kafka consumers, using a fixed pool of ordered, thread-backed queues to process messages concurrently while preserving per-group order.

Tests

- Added unit and integration tests for the new mode, including tests against real Kafka.