From 80bab4b2d6a0a190f778d65c1e5d4c609eebcafc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A2=A7=E8=BF=9C?= Date: Fri, 6 Aug 2021 16:26:47 +0800 Subject: [PATCH 1/3] feat:pass the executor to RateLimiter in ResourceGroupPublishLimiter --- .../broker/resourcegroup/ResourceGroupPublishLimiter.java | 8 ++++---- .../java/org/apache/pulsar/common/util/RateLimiter.java | 5 +++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index 5e75799b36fed..edef531e0217f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -95,14 +95,14 @@ public void update(ResourceGroup resourceGroup) { this.publishMaxMessageRate = Math.max(resourceGroup.getPublishRateInMsgs(), 0); this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0); if (this.publishMaxMessageRate > 0) { - // TODO: pass the executor publishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, this::apply); + new RateLimiter(scheduledExecutorService, publishMaxMessageRate, 1, TimeUnit.SECONDS, + this::apply); } if (this.publishMaxByteRate > 0) { - // TODO: pass the executor publishRateLimiterOnByte = - new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, this::apply); + new RateLimiter(scheduledExecutorService, publishMaxByteRate, 1, TimeUnit.SECONDS, + this::apply); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 1bb2fcd9e08d6..f5ce8e84bcdc9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -92,6 +92,11 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this(service, permits, rateTime, timeUnit, permitUpdater, false); } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { + this(service, permits, rateTime, timeUnit, null,false,autoReadResetFunction); + } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter, From 4670b6a7d4814780918fda495435ee624fe1a21c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A2=A7=E8=BF=9C?= Date: Mon, 9 Aug 2021 07:12:55 +0800 Subject: [PATCH 2/3] chore:use builder to construct RateLimiter --- .../ResourceGroupPublishLimiter.java | 21 ++++++++++++++----- .../pulsar/common/util/RateLimiter.java | 5 ----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index edef531e0217f..ecaddcf782207 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -95,14 +95,25 @@ public void update(ResourceGroup resourceGroup) { this.publishMaxMessageRate = Math.max(resourceGroup.getPublishRateInMsgs(), 0); this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0); if (this.publishMaxMessageRate > 0) { - publishRateLimiterOnMessage = - new RateLimiter(scheduledExecutorService, publishMaxMessageRate, 1, TimeUnit.SECONDS, - this::apply); + publishRateLimiterOnMessage = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxMessageRate) + .rateTime(1L) + .timeUnit(TimeUnit.SECONDS) + .isDispatchOrPrecisePublishRateLimiter(false) + .rateLimitFunction(this::apply) + .build(); } if (this.publishMaxByteRate > 0) { publishRateLimiterOnByte = - new RateLimiter(scheduledExecutorService, publishMaxByteRate, 1, TimeUnit.SECONDS, - this::apply); + RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxByteRate) + .rateTime(1L) + .timeUnit(TimeUnit.SECONDS) + .isDispatchOrPrecisePublishRateLimiter(false) + .rateLimitFunction(this::apply) + .build(); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index f5ce8e84bcdc9..1bb2fcd9e08d6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -92,11 +92,6 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this(service, permits, rateTime, timeUnit, permitUpdater, false); } - public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { - this(service, permits, rateTime, timeUnit, null,false,autoReadResetFunction); - } - public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter, From aad6d90f666d2bcbde80b0317b3a56fcf4043618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A2=A7=E8=BF=9C?= Date: Mon, 9 Aug 2021 15:29:49 +0800 Subject: [PATCH 3/3] chore:remove unnecessary settings --- .../broker/resourcegroup/ResourceGroupPublishLimiter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index ecaddcf782207..bed31d284674e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -100,7 +100,6 @@ public void update(ResourceGroup resourceGroup) { .permits(publishMaxMessageRate) .rateTime(1L) .timeUnit(TimeUnit.SECONDS) - .isDispatchOrPrecisePublishRateLimiter(false) .rateLimitFunction(this::apply) .build(); } @@ -111,7 +110,6 @@ public void update(ResourceGroup resourceGroup) { .permits(publishMaxByteRate) .rateTime(1L) .timeUnit(TimeUnit.SECONDS) - .isDispatchOrPrecisePublishRateLimiter(false) .rateLimitFunction(this::apply) .build(); }