Skip to content

Commit

Permalink
[ISSUE apache#5983] Make consumer support flow control code better (a…
Browse files Browse the repository at this point in the history
…pache#5984)

* When encountering the flow control code, pull it after 20ms instead of 3s

* When encountering the flow control code, pull it after 20ms instead of 3s

(cherry picked from commit a1d1cf8)

# Conflicts:
#	client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
#	client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
  • Loading branch information
RongtongJin committed Mar 19, 2023
1 parent fa38abf commit 84b4684
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
Expand Down Expand Up @@ -110,9 +111,13 @@ private enum SubscriptionType {
*/
private long pullTimeDelayMillsWhenException = 1000;
/**
* Flow control interval
* Flow control interval when message cache is full
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
/**
* Flow control interval when broker return flow control
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
/**
* Delay some time when suspend pull service
*/
Expand Down Expand Up @@ -784,7 +789,7 @@ public void run() {
}

if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
Expand All @@ -795,7 +800,7 @@ public void run() {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
Expand All @@ -805,7 +810,7 @@ public void run() {
}

if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
Expand All @@ -815,7 +820,7 @@ public void run() {
}

if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
Expand Down Expand Up @@ -870,7 +875,11 @@ public void run() {
} catch (InterruptedException interruptedException) {
log.warn("Polling thread was interrupted.", interruptedException);
} catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
} else {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
}
log.error("An error occurred in pull message process.", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand All @@ -86,9 +87,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
*/
private long pullTimeDelayMillsWhenException = 3000;
/**
* Flow control interval
* Flow control interval when message cache is full
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
/**
* Flow control interval when broker return flow control
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
/**
* Delay some time when suspend pull service
*/
Expand Down Expand Up @@ -238,7 +243,7 @@ public void pullMessage(final PullRequest pullRequest) {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
Expand All @@ -248,7 +253,7 @@ public void pullMessage(final PullRequest pullRequest) {
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
Expand All @@ -259,7 +264,7 @@ public void pullMessage(final PullRequest pullRequest) {

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
Expand Down Expand Up @@ -400,7 +405,11 @@ public void onException(Throwable e) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
} else {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ public class ResponseCode extends RemotingSysResponseCode {

public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;

public static final int FLOW_CONTROL = 215;

}

0 comments on commit 84b4684

Please sign in to comment.