Skip to content

feat(java): add batch consume support for PushConsumer#1249

Closed
qianye1001 wants to merge 8 commits into
apache:masterfrom
qianye1001:feat/java-batch-consume-push-consumer
Closed

feat(java): add batch consume support for PushConsumer#1249
qianye1001 wants to merge 8 commits into
apache:masterfrom
qianye1001:feat/java-batch-consume-push-consumer

Conversation

@qianye1001

Copy link
Copy Markdown
Contributor

Summary

Add batch message consumption capability to the Java PushConsumer, allowing messages to be accumulated in a buffer and delivered in batches based on configurable policies.

  • BatchMessageListener — new public API interface that receives List<MessageView> and returns a single ConsumeResult for the entire batch
  • BatchPolicy — configurable batching strategy with Builder pattern and sensible defaults (32 msgs, 4MB, 5s). Flush triggers on any of: maxBatchSize, maxBatchBytes, maxWaitTime
  • BatchConsumeService — Standard mode implementation with precise batch splitting (extractBatch respects both count and bytes limits simultaneously) and forward-progress guarantee (oversized single message still flushes)
  • FifoBatchConsumeService — FIFO mode via inheritance with whole-batch retry semantics and single-batch-in-flight constraint. On failure, entire batch is retried until max attempts exhausted, then forwarded to DLQ
  • ConsumeService enhanced — added close() lifecycle method, protected constructor for batch services (no NOOP listener hack), and accessor methods
  • Config validation in builder: maxCacheMessageCount >= maxBatchSize, maxCacheMessageSizeInBytes >= maxBatchBytes
  • Comprehensive tests using awaitility — 34 new test cases covering Standard/FIFO modes, retry, DLQ, concurrency, forward-progress, corrupted message handling, Builder validation
  • BatchPushConsumerExample demonstrating usage

Key Design Decisions

Decision Choice Rationale
Batch result type Single ConsumeResult per batch Simple API; entire batch succeeds or fails together
Buffer scope Global (mixed across ProcessQueues) No PQ-based grouping; messages from all sources batch together
FIFO retry Whole-batch retry Preserves ordering guarantee; only one batch in-flight at a time
Class hierarchy FifoBatchConsumeService extends BatchConsumeService Clean separation via inheritance instead of if-else flag
Oversized messages Forward-progress guarantee Single message > maxBatchBytes still flushes as batch-of-one

Test plan

  • BatchPolicyTest — 15 cases: constructor validation, Builder defaults, Builder custom values, invalid params
  • BatchConsumeTaskTest — 6 cases: success, failure, exception, null return, unmodifiable list, batch size
  • BatchConsumeServiceTest — 13 cases: maxBatchSize flush, maxBatchBytes flush, maxWaitTime flush, Standard failure, FIFO success, FIFO retry+DLQ, FIFO single-batch-in-flight, graceful shutdown, concurrent PQ submission, corrupted message discard, precise batch splitting, forward-progress, FIFO corrupted discard
  • Full mvn -B package passes locally (JDK 11, 223 tests, 0 failures)

🤖 Generated with Claude Code

sa-buc and others added 3 commits May 21, 2026 17:35
Add batch message consumption capability to the Java PushConsumer with:

- BatchMessageListener interface returning a single ConsumeResult for the entire batch
- BatchPolicy with Builder pattern and sensible defaults (32 msgs, 4MB, 5s)
- BatchConsumeService for Standard mode with precise batch splitting (extractBatch)
  respecting both maxBatchSize and maxBatchBytes, with forward-progress guarantee
- FifoBatchConsumeService for FIFO mode via inheritance with whole-batch retry
  semantics and single-batch-in-flight constraint
- ConsumeService parent class enhanced with close(), protected constructor, and getters
- Config validation in builder (cache limits >= batch limits)
- Comprehensive tests using awaitility (BatchConsumeServiceTest, BatchConsumeTaskTest,
  BatchPolicyTest) covering Standard/FIFO modes, retry, DLQ, concurrency, and edge cases
- BatchPushConsumerExample demonstrating usage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolved conflicts in PushConsumerBuilder, PushConsumerBuilderImpl, and
PushConsumerImpl to integrate batch consume support with upstream's new
features (fifo consume accelerator, message interceptor filtering, lite
consumer support).

Co-Authored-By: Claude <noreply@anthropic.com>
…merImpl signature

The batch consume feature added BatchMessageListener and BatchPolicy
parameters to PushConsumerImpl constructors, but LitePushConsumerImpl
was still calling the old signature. Pass null for batch parameters
and false for enableMessageInterceptorFiltering to restore compilation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@codecov-commenter

codecov-commenter commented May 22, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 76.42857% with 66 lines in your changes missing coverage. Please review.
✅ Project coverage is 53.87%. Comparing base (4076e2b) to head (07fc971).
⚠️ Report is 122 commits behind head on master.

Files with missing lines Patch % Lines
...nt/java/impl/consumer/FifoBatchConsumeService.java 66.19% 21 Missing and 3 partials ⚠️
...client/java/impl/consumer/BatchConsumeService.java 84.72% 19 Missing and 3 partials ⚠️
...nt/java/impl/consumer/PushConsumerBuilderImpl.java 16.66% 8 Missing and 2 partials ⚠️
...mq/client/java/impl/consumer/PushConsumerImpl.java 30.76% 9 Missing ⚠️
...etmq/client/java/impl/consumer/ConsumeService.java 90.90% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1249      +/-   ##
============================================
+ Coverage     53.26%   53.87%   +0.60%     
- Complexity      651      834     +183     
============================================
  Files           208      224      +16     
  Lines         14303    14734     +431     
  Branches       5845     5600     -245     
============================================
+ Hits           7619     7938     +319     
- Misses         6308     6390      +82     
- Partials        376      406      +30     
Flag Coverage Δ
java 62.01% <76.42%> (+0.33%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

sa-buc and others added 3 commits May 22, 2026 11:08
1. Performance: Replace ArrayList with ArrayDeque to fix O(n) removal issue
   - buffer.remove(0) is O(n) for ArrayList, causing O(n*m) complexity
   - ArrayDeque.removeFirst() is O(1), improving overall performance

2. Semantics: Fix firstArrivalNanos reset logic to track actual arrival time
   - Previously reset to System.nanoTime() after partial extraction
   - Now uses actual message arrival time (BufferedMessage.arrivalNanos)
   - Prevents buffer drift under high load

3. Consistency: Cache bodySize in BufferedMessage to avoid repeated calculation
   - ByteBuffer.remaining() called twice could return different values
   - Cache once at buffer entry, reuse throughout lifecycle
   - Ensures bufferBytes counter stays accurate

4. Reliability: Make close() synchronously wait for pending batches
   - submitBatch now returns ListenableFuture for completion tracking
   - close() collects all futures and waits with 30s timeout
   - Prevents message loss during shutdown

5. Documentation: Add warning about FIFO global buffer breaking messageGroup isolation
   - BatchMessageListener JavaDoc warns about cross-group batching
   - FifoBatchConsumeService JavaDoc clarifies single batch in-flight semantics
   - Users understand retry affects all messageGroups in batch

All tests pass (13/13 BatchConsumeServiceTest).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ifoBatchConsumeService.close()

Replace fully qualified names (java.util.List, java.util.ArrayList, etc.)
with short names since the imports are already declared at the top of the file.

This improves code readability and follows Java coding conventions.
@qianye1001

Copy link
Copy Markdown
Contributor Author

Review Suggestions

  1. Per-group buffering for FIFO mode — The current batch buffer is global across all message groups. In FIFO mode, a batch may contain messages from different groups, and a failure sends all to DLQ together. Consider whether per-group buffering would provide stricter FIFO guarantees.

  2. directExecutor() in submitBatch() callbackhandleResult() runs on the same thread that completes the future. If ack/nack does blocking I/O, it could delay other callbacks. Consider using the consumption executor instead of MoreExecutors.directExecutor().

  3. Add FifoBatchConsumeServiceTestBatchConsumeService, BatchConsumeTask, and BatchPolicy all have tests, but FifoBatchConsumeService lacks dedicated coverage for FIFO ordering, retry, and DLQ scenarios.

  4. Constructor backward compat — The old 7-param convenience constructor in PushConsumerImpl was removed. Consider keeping it as a deprecated overload, or call this out in release notes.

sa-buc and others added 2 commits May 22, 2026 17:13
- Add dedicated FifoBatchConsumeServiceTest covering FIFO ordering,
  retry, DLQ, corrupted message handling, metrics, and graceful shutdown
- Change submitBatch() callback executor from directExecutor() to
  listeningDecorator(getConsumptionExecutor()) for consistency with
  the task submission executor

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
# Conflicts:
#	java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@qianye1001 qianye1001 closed this May 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants