Skip to content

feat(java): add batch consume support for PushConsumer#1226

Closed
qianye1001 wants to merge 3 commits into
apache:masterfrom
qianye1001:batch-simple-consumer-0
Closed

feat(java): add batch consume support for PushConsumer#1226
qianye1001 wants to merge 3 commits into
apache:masterfrom
qianye1001:batch-simple-consumer-0

Conversation

@qianye1001

@qianye1001 qianye1001 commented Apr 22, 2026

Copy link
Copy Markdown
Contributor

Which Issue(s) This PR Fixes

Fixes #1225

Brief Description

This PR adds first-class batch consumption support for PushConsumer in the Java client, controlled by a BatchPolicy.

PushConsumer — Batch Consume Service

  • BatchConsumeService: accumulates messages from all ProcessQueues into a single BatchBuffer, flushing to BatchMessageListener when any threshold is met (count, bytes, or timeout).
  • BatchBuffer: encapsulated accumulation buffer with lock-based concurrency, scheduled timeout flush, forward-progress guarantee (single oversized message is still flushed).
  • BatchConsumeTask: Callable that invokes BatchMessageListener with interceptor hooks.
  • New BatchMessageListener interface and PushConsumerBuilder.setBatchMessageListener().
  • FIFO mode support: in FIFO mode, only one batch is in-flight at a time, preserving strict ordering.
  • Graceful shutdown: close() flushes all remaining buffered messages before the consumer shuts down.

BatchPolicy

Policy with maxBatchCount, maxBatchBytes, maxWaitTime. Builder pattern with sensible defaults.

A batch is flushed when any condition is met:

  1. Message count reaches maxBatchCount.
  2. Total body bytes reach maxBatchBytes.
  3. Time since first buffered message reaches maxWaitTime.

How Did You Test This Change?

  • BatchConsumeServiceTest: 14 unit tests covering count/bytes/timeout triggers, corrupted message discard, multiple batches from single consume, single oversized message flush, batch failure, close flush, multiple ProcessQueues, FIFO ordering, FIFO close, FIFO timeout-respects-inflight.
  • BatchConsumeTaskTest: 5 unit tests covering success/failure/exception/null-return scenarios with interceptor hooks, and batch size passthrough.
  • BatchPolicyTest: 11 unit tests covering constructors, builder, validation, toString.
  • PushConsumerBuilderImplTest: added 4 tests for batch builder validation (no listener, both listeners, null listener, null policy).
  • All tests pass: mvn clean test -pl client-apis,client — 307 tests, 0 failures, 0 checkstyle violations.

@qianye1001 qianye1001 force-pushed the batch-simple-consumer-0 branch from 8e76ce6 to 17ab213 Compare April 22, 2026 06:14
@codecov-commenter

codecov-commenter commented Apr 22, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 86.69528% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 54.36%. Comparing base (4076e2b) to head (83f867c).
⚠️ Report is 117 commits behind head on master.

Files with missing lines Patch % Lines
...mq/client/java/impl/consumer/PushConsumerImpl.java 20.00% 12 Missing ⚠️
...nt/java/impl/consumer/PushConsumerBuilderImpl.java 43.75% 7 Missing and 2 partials ⚠️
...ocketmq/client/java/impl/consumer/BatchBuffer.java 93.87% 1 Missing and 5 partials ⚠️
...client/java/impl/consumer/BatchConsumeService.java 95.65% 3 Missing ⚠️
...etmq/client/java/impl/consumer/ConsumeService.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1226      +/-   ##
============================================
+ Coverage     53.26%   54.36%   +1.09%     
- Complexity      651      854     +203     
============================================
  Files           208      223      +15     
  Lines         14303    14669     +366     
  Branches       5845     5617     -228     
============================================
+ Hits           7619     7975     +356     
+ Misses         6308     6286      -22     
- Partials        376      408      +32     
Flag Coverage Δ
java 63.79% <86.69%> (+2.10%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@qianye1001 qianye1001 force-pushed the batch-simple-consumer-0 branch from 17ab213 to 7c99677 Compare April 22, 2026 06:44
@qianye1001 qianye1001 changed the title feat(java): add batch receive and batch consume support for SimpleConsumer and PushConsumer feat(java): add batch consume support for PushConsumer May 11, 2026
@qianye1001 qianye1001 force-pushed the batch-simple-consumer-0 branch 2 times, most recently from 83f867c to 6e54d38 Compare May 11, 2026 09:09
@qianye1001 qianye1001 force-pushed the batch-simple-consumer-0 branch from 6e54d38 to 7786d7b Compare May 11, 2026 12:10
Implement batch consumption support for PushConsumer:
- BatchConsumeTask: invokes BatchMessageListener with a list of messages
- BatchConsumeService: concurrent batch consumption with shared buffer,
  flush triggered by count/bytes/timeout
- FifoBatchConsumeService: serial batch consumption with per-message
  retry tracking, partial exhaustion sends individual messages to DLQ
  while remaining messages continue retrying
- Wire up PushConsumerImpl.createConsumeService() for batch mode
- Add ConsumeService protected constructor and getters for subclass use
- Comprehensive unit tests (16 cases) and integration tests (6 cases)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@qianye1001

Copy link
Copy Markdown
Contributor Author

Closing in favor of a new PR with improved implementation: cleaner class hierarchy (BatchConsumeService + FifoBatchConsumeService via inheritance), precise batch splitting with forward-progress guarantee, Builder pattern for BatchPolicy, and whole-batch FIFO retry semantics.

@qianye1001 qianye1001 closed this May 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Add batch receive and batch consume support for Java SimpleConsumer and PushConsumer

2 participants