Skip to content

Commit

Permalink
[ISSUE apache#5983] Make consumer support flow control code better (a…
Browse files Browse the repository at this point in the history
…pache#5984)

* When encountering the flow control code, pull it after 20ms instead of 3s

* When encountering the flow control code, pull it after 20ms instead of 3s
  • Loading branch information
RongtongJin committed Feb 6, 2023
1 parent 671977e commit a1d1cf8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
Expand Down Expand Up @@ -111,9 +112,13 @@ private enum SubscriptionType {
*/
private long pullTimeDelayMillsWhenException = 1000;
/**
* Flow control interval
* Flow control interval when message cache is full
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
/**
* Flow control interval when broker return flow control
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
/**
* Delay some time when suspend pull service
*/
Expand Down Expand Up @@ -891,7 +896,7 @@ public void run() {
}

if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
Expand All @@ -902,7 +907,7 @@ public void run() {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
Expand All @@ -912,7 +917,7 @@ public void run() {
}

if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
Expand All @@ -922,7 +927,7 @@ public void run() {
}

if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
Expand Down Expand Up @@ -979,7 +984,11 @@ public void run() {
} catch (InterruptedException interruptedException) {
log.warn("Polling thread was interrupted.", interruptedException);
} catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
} else {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
}
log.error("An error occurred in pull message process.", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
*/
private long pullTimeDelayMillsWhenException = 3000;
/**
* Flow control interval
* Flow control interval when message cache is full
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
/**
* Flow control interval when broker return flow control
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
/**
* Delay some time when suspend pull service
*/
Expand Down Expand Up @@ -264,7 +268,7 @@ public void pullMessage(final PullRequest pullRequest) {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
Expand All @@ -274,7 +278,7 @@ public void pullMessage(final PullRequest pullRequest) {
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
Expand All @@ -285,7 +289,7 @@ public void pullMessage(final PullRequest pullRequest) {

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
Expand Down Expand Up @@ -429,7 +433,11 @@ public void onException(Throwable e) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
} else {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
};

Expand Down Expand Up @@ -505,7 +513,7 @@ void popMessage(final PopRequest popRequest) {
}

if (processQueue.getWaiAckMsgCount() > this.defaultMQPushConsumer.getPopThresholdForQueue()) {
this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn("the messages waiting to ack exceeds the threshold {}, so do flow control, popRequest={}, flowControlTimes={}, wait count={}",
this.defaultMQPushConsumer.getPopThresholdForQueue(), popRequest, queueFlowControlTimes, processQueue.getWaiAckMsgCount());
Expand Down Expand Up @@ -579,7 +587,11 @@ public void onException(Throwable e) {
log.warn("execute the pull request exception: {}", e);
}

DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
} else {
DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
}
}
};

Expand Down

0 comments on commit a1d1cf8

Please sign in to comment.