Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5983] Make consumer support flow control code better (#5984) #6395

Merged
merged 1 commit into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
Expand Down Expand Up @@ -110,9 +111,13 @@ private enum SubscriptionType {
*/
private long pullTimeDelayMillsWhenException = 1000;
/**
* Flow control interval
* Flow control interval when message cache is full
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
/**
* Flow control interval when broker return flow control
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
/**
* Delay some time when suspend pull service
*/
Expand Down Expand Up @@ -784,7 +789,7 @@ public void run() {
}

if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
Expand All @@ -795,7 +800,7 @@ public void run() {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
Expand All @@ -805,7 +810,7 @@ public void run() {
}

if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
Expand All @@ -815,7 +820,7 @@ public void run() {
}

if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
Expand Down Expand Up @@ -870,7 +875,11 @@ public void run() {
} catch (InterruptedException interruptedException) {
log.warn("Polling thread was interrupted.", interruptedException);
} catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
} else {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
}
log.error("An error occurred in pull message process.", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand All @@ -86,9 +87,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
*/
private long pullTimeDelayMillsWhenException = 3000;
/**
* Flow control interval
* Flow control interval when message cache is full
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
/**
* Flow control interval when broker return flow control
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
/**
* Delay some time when suspend pull service
*/
Expand Down Expand Up @@ -238,7 +243,7 @@ public void pullMessage(final PullRequest pullRequest) {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
Expand All @@ -248,7 +253,7 @@ public void pullMessage(final PullRequest pullRequest) {
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
Expand All @@ -259,7 +264,7 @@ public void pullMessage(final PullRequest pullRequest) {

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
Expand Down Expand Up @@ -400,7 +405,11 @@ public void onException(Throwable e) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
} else {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ public class ResponseCode extends RemotingSysResponseCode {

public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;

public static final int FLOW_CONTROL = 215;

}