From 8f34bd722a79676e0e89ca6501efea4364879cff Mon Sep 17 00:00:00 2001 From: dengzhiwen1 Date: Thu, 13 Jun 2024 14:15:07 +0800 Subject: [PATCH] feat: support to control consumption rate for group --- .../consumer/DefaultMQPushConsumer.java | 26 +++++++++++++++++++ .../ConsumeMessageConcurrentlyService.java | 11 ++++++++ .../ConsumeMessageOrderlyService.java | 11 ++++++++ .../consumer/DefaultMQPushConsumerImpl.java | 7 +++++ 4 files changed, 55 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 38a412c237b..8d10f2fd27e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -296,6 +296,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume private RPCHook rpcHook = null; + /** + * Enable RateLimiter for consumer + */ + private boolean enableConsumeRateLimit = false; + + /** + * The rate of consumption for single consumer + */ + private int consumptionRate = Integer.MAX_VALUE; + /** * Default constructor. */ @@ -1007,4 +1017,20 @@ public MessageQueueListener getMessageQueueListener() { public void setMessageQueueListener(MessageQueueListener messageQueueListener) { this.messageQueueListener = messageQueueListener; } + + public boolean isEnableConsumeRateLimit() { + return enableConsumeRateLimit; + } + + public void setEnableConsumeRateLimit(boolean enableConsumeRateLimit) { + this.enableConsumeRateLimit = enableConsumeRateLimit; + } + + public int getConsumptionRate() { + return consumptionRate; + } + + public void setConsumptionRate(int consumptionRate) { + this.consumptionRate = consumptionRate; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index b151fefbbb3..4415c3ff04a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -376,6 +376,17 @@ public ProcessQueue getProcessQueue() { @Override public void run() { + if (defaultMQPushConsumer.isEnableConsumeRateLimit()) { + boolean canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS); + while (!canConsume) { + log.warn("[Flow control], wait for group={}, mq={}", consumerGroup, messageQueue.getTopic()); + canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS); + } + } + doConsumeInner(); + } + + public void doConsumeInner() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 3ca465da70d..f31676ab36b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -426,6 +426,17 @@ public MessageQueue getMessageQueue() { @Override public void run() { + if (defaultMQPushConsumer.isEnableConsumeRateLimit()) { + boolean canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS); + while (!canConsume) { + log.warn("[Flow control], wait for group={}, mq={}", consumerGroup, messageQueue.getTopic()); + canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS); + } + } + doConsumeInner(); + } + + public void doConsumeInner() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 3e832e5a9a3..437ef23c301 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -27,6 +27,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import com.google.common.util.concurrent.RateLimiter; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.QueryResult; @@ -131,6 +132,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private ConsumeMessageService consumeMessagePopService; private long queueFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0; + private RateLimiter consumeRateLimiter; //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; @@ -243,6 +245,10 @@ public void setOffsetStore(OffsetStore offsetStore) { this.offsetStore = offsetStore; } + public RateLimiter getConsumeRateLimiter() { + return consumeRateLimiter; + } + public void pullMessage(final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { @@ -982,6 +988,7 @@ public synchronized void start() throws MQClientException { } mQClientFactory.start(); + this.consumeRateLimiter = RateLimiter.create(this.defaultMQPushConsumer.getConsumptionRate()); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break;