diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index 9a80ed046a4e6..bed31d284674e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -95,8 +95,8 @@ public void update(ResourceGroup resourceGroup) { this.publishMaxMessageRate = Math.max(resourceGroup.getPublishRateInMsgs(), 0); this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0); if (this.publishMaxMessageRate > 0) { - // TODO: pass the executor publishRateLimiterOnMessage = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) .permits(publishMaxMessageRate) .rateTime(1L) .timeUnit(TimeUnit.SECONDS) @@ -104,14 +104,14 @@ public void update(ResourceGroup resourceGroup) { .build(); } if (this.publishMaxByteRate > 0) { - // TODO: pass the executor publishRateLimiterOnByte = - RateLimiter.builder() - .permits(publishMaxByteRate) - .rateTime(1L) - .timeUnit(TimeUnit.SECONDS) - .rateLimitFunction(this::apply) - .build(); + RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxByteRate) + .rateTime(1L) + .timeUnit(TimeUnit.SECONDS) + .rateLimitFunction(this::apply) + .build(); } } else { this.publishMaxMessageRate = 0;