diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 11cdbf6fb10..8161e569afe 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -110,9 +111,13 @@ private enum SubscriptionType { */ private long pullTimeDelayMillsWhenException = 1000; /** - * Flow control interval + * Flow control interval when message cache is full */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; + private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50; + /** + * Flow control interval when broker return flow control + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20; /** * Delay some time when suspend pull service */ @@ -784,7 +789,7 @@ public void run() { } if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((consumeRequestFlowControlTimes++ % 1000) == 0) { log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); } @@ -795,7 +800,7 @@ public void run() { long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", @@ -805,7 +810,7 @@ public void run() { } if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", @@ -815,7 +820,7 @@ public void run() { } if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", @@ -870,7 +875,11 @@ public void run() { } catch (InterruptedException interruptedException) { log.warn("Polling thread was interrupted.", interruptedException); } catch (Throwable e) { - pullDelayTimeMills = pullTimeDelayMillsWhenException; + if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) { + pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL; + } else { + pullDelayTimeMills = pullTimeDelayMillsWhenException; + } log.error("An error occurred in pull message process.", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index df68284d49d..bb40f855f46 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -60,6 +60,7 @@ import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; @@ -86,9 +87,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { */ private long pullTimeDelayMillsWhenException = 3000; /** - * Flow control interval + * Flow control interval when message cache is full */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; + private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50; + /** + * Flow control interval when broker return flow control + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20; /** * Delay some time when suspend pull service */ @@ -238,7 +243,7 @@ public void pullMessage(final PullRequest pullRequest) { long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", @@ -248,7 +253,7 @@ public void pullMessage(final PullRequest pullRequest) { } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", @@ -259,7 +264,7 @@ public void pullMessage(final PullRequest pullRequest) { if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", @@ -400,7 +405,11 @@ public void onException(Throwable e) { log.warn("execute the pull request exception", e); } - DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); + if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) { + DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL); + } else { + DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); + } } }; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java index dc744448f6c..f4adffdd534 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java @@ -80,4 +80,6 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211; + public static final int FLOW_CONTROL = 215; + }