Skip to content

Commit

Permalink
feat: support to control consumption rate for group
Browse files Browse the repository at this point in the history
  • Loading branch information
dengzhiwen1 committed Jun 18, 2024
1 parent 1511809 commit 8f34bd7
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume

private RPCHook rpcHook = null;

/**
* Enable RateLimiter for consumer
*/
private boolean enableConsumeRateLimit = false;

/**
* The rate of consumption for single consumer
*/
private int consumptionRate = Integer.MAX_VALUE;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -1007,4 +1017,20 @@ public MessageQueueListener getMessageQueueListener() {
public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
this.messageQueueListener = messageQueueListener;
}

public boolean isEnableConsumeRateLimit() {
return enableConsumeRateLimit;
}

public void setEnableConsumeRateLimit(boolean enableConsumeRateLimit) {
this.enableConsumeRateLimit = enableConsumeRateLimit;
}

public int getConsumptionRate() {
return consumptionRate;
}

public void setConsumptionRate(int consumptionRate) {
this.consumptionRate = consumptionRate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,17 @@ public ProcessQueue getProcessQueue() {

@Override
public void run() {
if (defaultMQPushConsumer.isEnableConsumeRateLimit()) {
boolean canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS);
while (!canConsume) {
log.warn("[Flow control], wait for group={}, mq={}", consumerGroup, messageQueue.getTopic());
canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS);
}
}
doConsumeInner();
}

public void doConsumeInner() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,17 @@ public MessageQueue getMessageQueue() {

@Override
public void run() {
if (defaultMQPushConsumer.isEnableConsumeRateLimit()) {
boolean canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS);
while (!canConsume) {
log.warn("[Flow control], wait for group={}, mq={}", consumerGroup, messageQueue.getTopic());
canConsume = defaultMQPushConsumerImpl.getConsumeRateLimiter().tryAcquire(10 * 1000, TimeUnit.MILLISECONDS);
}
}
doConsumeInner();
}

public void doConsumeInner() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
Expand Down Expand Up @@ -131,6 +132,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessagePopService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
private RateLimiter consumeRateLimiter;

//10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
Expand Down Expand Up @@ -243,6 +245,10 @@ public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}

public RateLimiter getConsumeRateLimiter() {
return consumeRateLimiter;
}

public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
Expand Down Expand Up @@ -982,6 +988,7 @@ public synchronized void start() throws MQClientException {
}

mQClientFactory.start();
this.consumeRateLimiter = RateLimiter.create(this.defaultMQPushConsumer.getConsumptionRate());
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
Expand Down

0 comments on commit 8f34bd7

Please sign in to comment.