[IND-545]: Batch Process Vulcan messages #1018
Walkthrough
The recent updates across the Kafka-related parts of the indexer project focus on enhancing message handling and batch processing capabilities. Key changes include the renaming of a function to better reflect its updated purpose, an increase in the default heartbeat interval for Kafka consumers, and the introduction of batch processing features. These modifications aim to improve efficiency and flexibility in processing Kafka messages, aligning with evolving requirements for scalability and performance in message-driven architectures.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (10)
- indexer/packages/kafka/__tests__/consumer.test.ts (2 hunks)
- indexer/packages/kafka/src/config.ts (1 hunks)
- indexer/packages/kafka/src/consumer.ts (3 hunks)
- indexer/services/ender/src/helpers/kafka/kafka-controller.ts (2 hunks)
- indexer/services/scripts/src/print-block.ts (2 hunks)
- indexer/services/socks/src/lib/message-forwarder.ts (2 hunks)
- indexer/services/vulcan/src/config.ts (2 hunks)
- indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (2 hunks)
- indexer/services/vulcan/src/index.ts (1 hunks)
- indexer/services/vulcan/src/lib/on-batch.ts (1 hunks)
Additional comments: 10
indexer/services/ender/src/helpers/kafka/kafka-controller.ts (1)
- 23-23: Renaming `addOnMessageFunction` to `updateOnMessageFunction` aligns with the intention to standardize function naming across the service. Ensure all references and documentation are updated to reflect this change.
indexer/services/vulcan/src/config.ts (1)
- 24-30: The introduction of `BATCH_PROCESSING_ENABLED`, `KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY`, and `KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS` with default values enhances the configurability of batch processing. Ensure these new configurations are documented in the service's configuration guide.
indexer/packages/kafka/__tests__/consumer.test.ts (1)
- 29-29: Updating the function name in the test case from `addOnMessageFunction` to `updateOnMessageFunction` ensures consistency with the source code changes. Verify that all related test cases have been updated accordingly.
indexer/packages/kafka/src/config.ts (1)
- 24-24: Modifying the `KAFKA_HEARTBEAT_INTERVAL_MS` default value from 2000ms to 5000ms could affect consumer behavior, especially in environments with network latency issues. Ensure this change is documented and communicated to users who might need to adjust their configurations accordingly.
indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (1)
- 25-39: The conditional logic to enable batch processing based on the `BATCH_PROCESSING_ENABLED` configuration is a good approach. Ensure that the `onBatch` and `onMessage` functions are thoroughly tested in both batch processing enabled and disabled scenarios to confirm correct behavior.
indexer/services/vulcan/src/index.ts (1)
- 40-40: Modifying `startService` to accept the `BATCH_PROCESSING_ENABLED` configuration as a parameter and passing it to `startConsumer` aligns with the introduction of batch processing functionality. Ensure that all calls to `startService` throughout the codebase are updated to pass this configuration.
indexer/services/vulcan/src/lib/on-batch.ts (1)
- 11-92: The `onBatch` function implements logic for processing batches of Kafka messages, including error handling for empty batches and committing offsets at a configurable frequency. Ensure that performance testing is conducted, especially for large batches, to optimize the commit frequency and minimize processing delays.
indexer/packages/kafka/src/consumer.ts (1)
- 87-102: Extending the Kafka consumer to support batch processing with a configurable flag is a significant enhancement. Ensure that the `onBatchFunction` and `onMessageFunction` are thoroughly tested in isolation and in combination to confirm that batch and individual message processing work as expected under various scenarios.
indexer/services/scripts/src/print-block.ts (1)
- 69-69: Updating the function name to `updateOnMessageFunction` in the script ensures consistency with the Kafka package changes. Verify that the script's functionality related to Kafka message handling remains unaffected by this change.
indexer/services/socks/src/lib/message-forwarder.ts (1)
- 66-66: The update to use `updateOnMessageFunction` in the `MessageForwarder` class aligns with the renaming in the Kafka package. Ensure that the message forwarding functionality is thoroughly tested to confirm that messages are correctly handled and forwarded.
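For readers who want a concrete picture of the batch handling described in the comments above, here is a minimal sketch of a batch handler that commits offsets at a configurable frequency. It is not the code from `on-batch.ts`. The interfaces are local stand-ins loosely modelled on the kafkajs `eachBatch` payload, the names `onBatchSketch`, `shouldCommit`, and `BatchCommitConfig` are hypothetical, and the meaning given to the two thresholds (a message count and an elapsed time in milliseconds) is an assumption rather than something the PR states.

```typescript
// Minimal local types, loosely modelled on the kafkajs `eachBatch` payload.
// They are declared here so the sketch compiles without the kafkajs package.
interface SketchMessage {
  offset: string;
  value: string | null;
}

interface SketchBatchPayload {
  batch: { topic: string; partition: number; messages: SketchMessage[] };
  resolveOffset(offset: string): void;
  heartbeat(): Promise<void>;
  commitOffsetsIfNecessary(): Promise<void>;
}

// Hypothetical config shape; the field meanings are assumptions.
interface BatchCommitConfig {
  commitFrequency: number; // commit after this many processed messages
  commitFrequencyMs: number; // or after this many milliseconds
}

// Pure decision helper: true once either threshold has been reached.
function shouldCommit(
  messagesSinceCommit: number,
  msSinceCommit: number,
  config: BatchCommitConfig,
): boolean {
  return (
    messagesSinceCommit >= config.commitFrequency ||
    msSinceCommit >= config.commitFrequencyMs
  );
}

// Processes one batch in order and returns how many commits were issued.
async function onBatchSketch(
  payload: SketchBatchPayload,
  handleMessage: (message: SketchMessage) => Promise<void>,
  config: BatchCommitConfig,
  now: () => number = () => Date.now(),
): Promise<number> {
  const messages = payload.batch.messages;
  if (messages.length === 0) {
    // Empty batch: nothing to process or commit in this sketch.
    return 0;
  }

  let commits = 0;
  let messagesSinceCommit = 0;
  let lastCommitTime = now();

  for (const message of messages) {
    await handleMessage(message);
    messagesSinceCommit += 1;

    if (shouldCommit(messagesSinceCommit, now() - lastCommitTime, config)) {
      // Mark the message as processed, then heartbeat and commit together.
      payload.resolveOffset(message.offset);
      await Promise.all([payload.heartbeat(), payload.commitOffsetsIfNecessary()]);
      commits += 1;
      messagesSinceCommit = 0;
      lastCommitTime = now();
    }
  }

  if (messagesSinceCommit > 0) {
    // Flush the tail of the batch so it is not reprocessed after a restart.
    payload.resolveOffset(messages[messages.length - 1].offset);
    await payload.commitOffsetsIfNecessary();
    commits += 1;
  }
  return commits;
}
```

Keeping the threshold check in a pure helper makes the commit policy easy to unit test without a broker, and the injectable `now` parameter lets a test control the clock.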
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (4)
- indexer/packages/kafka/src/config.ts (1 hunks)
- indexer/services/ender/src/helpers/kafka/kafka-controller.ts (2 hunks)
- indexer/services/vulcan/src/config.ts (1 hunks)
- indexer/services/vulcan/src/lib/on-batch.ts (1 hunks)
Files skipped from review as they are similar to previous changes (4)
- indexer/packages/kafka/src/config.ts
- indexer/services/ender/src/helpers/kafka/kafka-controller.ts
- indexer/services/vulcan/src/config.ts
- indexer/services/vulcan/src/lib/on-batch.ts
Changelist
Batch Process Vulcan messages. This change does not seem to make much of an impact on processing. I believe this is because Vulcan reads from multiple partitions, and the benefit would be greater if there were the same number of partitions as instances of Vulcan.
See research here
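To make the partition-to-instance ratio mentioned above concrete, the following sketch spreads partitions across consumer instances round-robin and reports how many partitions the busiest instance serves. It is only an illustration: `assignPartitions` and `maxPartitionsPerInstance` are hypothetical helpers, and round-robin is an assumption standing in for whatever assigner the consumer group actually uses.

```typescript
// Round-robin assignment of partition ids to consumer instances.
// Assumption: this stands in for the consumer group's real assigner.
function assignPartitions(partitionCount: number, instanceCount: number): number[][] {
  if (instanceCount <= 0) {
    throw new Error("instanceCount must be positive");
  }
  const assignment: number[][] = Array.from(
    { length: instanceCount },
    (): number[] => [],
  );
  for (let partition = 0; partition < partitionCount; partition += 1) {
    assignment[partition % instanceCount].push(partition);
  }
  return assignment;
}

// Largest number of partitions any single instance has to serve.
function maxPartitionsPerInstance(partitionCount: number, instanceCount: number): number {
  const sizes = assignPartitions(partitionCount, instanceCount).map((p) => p.length);
  return Math.max(...sizes);
}
```

With 6 partitions and 2 instances, each instance serves 3 partitions. With 3 partitions and 3 instances, each serves exactly one, which is the setup the author expects to benefit more from batching.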
Test Plan
Tested in Staging and internal mainnet
Author/Reviewer Checklist
- `state-breaking` label.
- `indexer-postgres-breaking` label.
- `PrepareProposal` or `ProcessProposal`, manually add the label `proposal-breaking`.
- `feature:[feature-name]`.
- `backport/[branch-name]`.
- `refactor`, `chore`, `bug`.