Closed
Labels
type/bug: The PR fixed a bug or issue reported a bug
Description
Describe the bug
When batching is enabled on the producer, each message is first added to the batch container. If the producer state is not Ready at that point (for example, because the connection to the broker was closed), there is no subsequent logic to send the messages held in the batch container.
Before PR #14185, a timer task called the method batchMessageAndSend to send the messages in the batch container. PR #14185 optimized the batch flush logic: it removed the timer task and instead schedules a flush task after a message is added.
Relevant code
Code block in the method serializeAndSendMessage:
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
if (isBatchFull) {
    batchMessageAndSend(false);
} else {
    maybeScheduleBatchFlushTask();
}
If the producer state is not ready, the batch flush task will not be scheduled.
private void maybeScheduleBatchFlushTask() {
    if (this.batchFlushTask != null || getState() != State.Ready) {
        log.info("xxxx [{}] MaybeScheduleBatchFlushTask state: {}", topic, getState());
        return;
    }
    scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros());
}
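The effect of this guard can be illustrated with a small standalone model. This is only a sketch, not Pulsar's ProducerImpl: the class BatchFlushGateSketch, its fields, and the methods runScheduledTasks and onConnectionReady are hypothetical names introduced for illustration. The rescheduleIfPending flag shows one possible remedy (re-checking the batch when the connection becomes ready), which is an assumption and not necessarily how the issue was fixed.

```java
// Simplified model of the scheduling guard; hypothetical, not Pulsar's ProducerImpl.
class BatchFlushGateSketch {
    enum State { Connecting, Ready }

    State state = State.Connecting;
    int pendingInBatch = 0;              // messages sitting in the batch container
    boolean flushTaskScheduled = false;  // stands in for batchFlushTask != null

    // Mirrors the flow in the issue: add to the batch, then try to schedule a flush.
    void send() {
        pendingInBatch++;
        maybeScheduleBatchFlushTask();
    }

    // Same guard as in the issue: nothing is scheduled unless the state is Ready.
    void maybeScheduleBatchFlushTask() {
        if (flushTaskScheduled || state != State.Ready) {
            return;
        }
        flushTaskScheduled = true;
    }

    // Stands in for the scheduled flush task firing after the publish delay.
    void runScheduledTasks() {
        if (flushTaskScheduled) {
            flushTaskScheduled = false;
            pendingInBatch = 0;
        }
    }

    // Hypothetical remedy (an assumption): re-check the batch once the connection is ready.
    void onConnectionReady(boolean rescheduleIfPending) {
        state = State.Ready;
        if (rescheduleIfPending && pendingInBatch > 0) {
            maybeScheduleBatchFlushTask();
        }
    }

    public static void main(String[] args) {
        BatchFlushGateSketch stuck = new BatchFlushGateSketch();
        stuck.send();                    // state is Connecting, so no flush is scheduled
        stuck.onConnectionReady(false);  // reconnect without re-checking the batch
        stuck.runScheduledTasks();
        System.out.println("pending without reschedule: " + stuck.pendingInBatch);

        BatchFlushGateSketch drained = new BatchFlushGateSketch();
        drained.send();
        drained.onConnectionReady(true); // reconnect and re-check the batch
        drained.runScheduledTasks();
        System.out.println("pending with reschedule: " + drained.pendingInBatch);
    }
}
```

In this model the message added while the state is Connecting stays in the batch after the state becomes Ready, unless another send arrives or something re-checks the batch explicitly.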
To Reproduce
Steps to reproduce the behavior:
- Create a producer with batching enabled.
- Mock a broker exception that closes the connection.
- Send normal messages asynchronously and call get() on the returned future, or send messages within a transaction.
- The process hangs.
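From the caller's side, the hang in the steps above amounts to waiting on a future that never completes. The sketch below models only that aspect with plain java.util.concurrent types and does not use the Pulsar client. The class StuckSendSketch and its methods sendAsyncWithoutFlush and completesWithin are hypothetical names, and the 200 ms timeout is an arbitrary value chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StuckSendSketch {
    // Hypothetical stand-in for an async send whose batch is never flushed:
    // the returned future is never completed.
    static CompletableFuture<String> sendAsyncWithoutFlush() {
        return new CompletableFuture<>();
    }

    // Returns true if the future completes within the timeout, false if it times out.
    static boolean completesWithin(CompletableFuture<String> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = sendAsyncWithoutFlush();
        // An unbounded pending.get() would block forever here; the bounded wait times out.
        System.out.println("completed: " + completesWithin(pending, 200));
    }
}
```

A bounded wait like this can be useful in a reproduction test, since it lets the test fail with a timeout instead of blocking the whole run.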
Expected behavior
Messages could be published normally.