From fcd3336c083f359e9f5a5d624edffb21df62d9d1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 9 Aug 2021 19:32:09 +0300 Subject: [PATCH] Remove RateLimiter constructors and replace with builder usage (#11599) --- .../ResourceGroupPublishLimiter.java | 15 +++++-- .../persistent/DispatchRateLimiter.java | 22 ++++++++-- .../persistent/SubscribeRateLimiter.java | 8 +++- .../pulsar/common/util/RateLimiter.java | 36 ---------------- .../pulsar/common/util/RateLimiterTest.java | 41 ++++++++++++------- .../instance/stats/FunctionStatsManager.java | 14 ++++++- .../instance/stats/SinkStatsManager.java | 14 ++++++- .../instance/stats/SourceStatsManager.java | 14 ++++++- 8 files changed, 99 insertions(+), 65 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index 5e75799b36fed..9a80ed046a4e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -96,13 +96,22 @@ public void update(ResourceGroup resourceGroup) { this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0); if (this.publishMaxMessageRate > 0) { // TODO: pass the executor - publishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, this::apply); + publishRateLimiterOnMessage = RateLimiter.builder() + .permits(publishMaxMessageRate) + .rateTime(1L) + .timeUnit(TimeUnit.SECONDS) + .rateLimitFunction(this::apply) + .build(); } if (this.publishMaxByteRate > 0) { // TODO: pass the executor publishRateLimiterOnByte = - new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, this::apply); + RateLimiter.builder() + .permits(publishMaxByteRate) + .rateTime(1L) + .timeUnit(TimeUnit.SECONDS) + .rateLimitFunction(this::apply) + .build(); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 994d27418e76e..45e322dae4ff3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -358,8 +358,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { // update msg-rateLimiter if (msgRate > 0) { if (this.dispatchRateLimiterOnMessage == null) { - this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate, - ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true); + this.dispatchRateLimiterOnMessage = + RateLimiter.builder() + .scheduledExecutorService(brokerService.pulsar().getExecutor()) + .permits(msgRate) + .rateTime(ratePeriod) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(permitUpdaterMsg) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); } else { this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.getRatePeriodInSecond(), TimeUnit.SECONDS, permitUpdaterMsg); @@ -378,8 +385,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { // update byte-rateLimiter if (byteRate > 0) { if (this.dispatchRateLimiterOnByte == null) { - this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate, - ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true); + this.dispatchRateLimiterOnByte = + RateLimiter.builder() + .scheduledExecutorService(brokerService.pulsar().getExecutor()) + .permits(byteRate) + .rateTime(ratePeriod) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(permitUpdaterByte) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); } else { this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.getRatePeriodInSecond(), TimeUnit.SECONDS, permitUpdaterByte); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index a13328cc85933..bac2a17ed2bd5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -139,8 +139,12 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif if (ratePerConsumer > 0) { if (this.subscribeRateLimiter.get(consumerIdentifier) == null) { this.subscribeRateLimiter.put(consumerIdentifier, - new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer, - ratePeriod, TimeUnit.SECONDS)); + RateLimiter.builder() + .scheduledExecutorService(brokerService.pulsar().getExecutor()) + .permits(ratePerConsumer) + .rateTime(ratePeriod) + .timeUnit(TimeUnit.SECONDS) + .build()); } else { this.subscribeRateLimiter.get(consumerIdentifier) .setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 1bb2fcd9e08d6..61a9177343764 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -62,42 +62,6 @@ public class RateLimiter implements AutoCloseable{ private RateLimitFunction rateLimitFunction; private boolean isDispatchOrPrecisePublishRateLimiter; - public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { - this(null, permits, rateTime, timeUnit); - } - - public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { - this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); - } - - public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, - RateLimitFunction autoReadResetFunction) { - this(null, permits, rateTime, timeUnit, null, false); - this.rateLimitFunction = autoReadResetFunction; - } - - public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, - RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) { - this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); - this.rateLimitFunction = autoReadResetFunction; - } - - public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit) { - this(service, permits, rateTime, timeUnit, (Supplier) null); - } - - public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater) { - this(service, permits, rateTime, timeUnit, permitUpdater, false); - } - - public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { - this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter, - null); - } - @Builder RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter, diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java index 61336f4210b8b..788ab749390db 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java @@ -33,14 +33,14 @@ public class RateLimiterTest { @Test public void testInvalidRenewTime() { try { - new RateLimiter(0, 100, TimeUnit.SECONDS); + RateLimiter.builder().permits(0).rateTime(100).timeUnit(TimeUnit.SECONDS).build(); fail("should have thrown exception: invalid rate, must be > 0"); } catch (IllegalArgumentException ie) { // Ok } try { - new RateLimiter(10, 0, TimeUnit.SECONDS); + RateLimiter.builder().permits(10).rateTime(0).timeUnit(TimeUnit.SECONDS).build(); fail("should have thrown exception: invalid rateTime, must be > 0"); } catch (IllegalArgumentException ie) { // Ok @@ -49,7 +49,7 @@ public void testInvalidRenewTime() { @Test public void testClose() throws Exception { - RateLimiter rate = new RateLimiter(1, 1000, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(1).rateTime(1000).timeUnit(TimeUnit.MILLISECONDS).build(); assertFalse(rate.isClosed()); rate.close(); assertTrue(rate.isClosed()); @@ -64,7 +64,8 @@ public void testClose() throws Exception { @Test public void testAcquireBlock() throws Exception { final long rateTimeMSec = 1000; - RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); rate.acquire(); assertEquals(rate.getAvailablePermits(), 0); long start = System.currentTimeMillis(); @@ -79,7 +80,8 @@ public void testAcquireBlock() throws Exception { public void testAcquire() throws Exception { final long rateTimeMSec = 1000; final int permits = 100; - RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); long start = System.currentTimeMillis(); for (int i = 0; i < permits; i++) { rate.acquire(); @@ -95,7 +97,8 @@ public void testMultipleAcquire() throws Exception { final long rateTimeMSec = 1000; final int permits = 100; final int acquirePermits = 50; - RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); long start = System.currentTimeMillis(); for (int i = 0; i < permits / acquirePermits; i++) { rate.acquire(acquirePermits); @@ -109,7 +112,8 @@ public void testMultipleAcquire() throws Exception { @Test public void testTryAcquireNoPermits() { final long rateTimeMSec = 1000; - RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); assertTrue(rate.tryAcquire()); assertFalse(rate.tryAcquire()); assertEquals(rate.getAvailablePermits(), 0); @@ -120,7 +124,8 @@ public void testTryAcquireNoPermits() { public void testTryAcquire() { final long rateTimeMSec = 1000; final int permits = 100; - RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); for (int i = 0; i < permits; i++) { rate.tryAcquire(); } @@ -133,7 +138,8 @@ public void testMultipleTryAcquire() { final long rateTimeMSec = 1000; final int permits = 100; final int acquirePermits = 50; - RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); for (int i = 0; i < permits / acquirePermits; i++) { rate.tryAcquire(acquirePermits); } @@ -145,7 +151,8 @@ public void testMultipleTryAcquire() { public void testResetRate() throws Exception { final long rateTimeMSec = 1000; final int permits = 100; - RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS); + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); rate.tryAcquire(permits); assertEquals(rate.getAvailablePermits(), 0); // check after a rate-time: permits must be renewed @@ -168,7 +175,9 @@ public void testResetRate() throws Exception { public void testDispatchRate() throws Exception { final long rateTimeMSec = 1000; final int permits = 100; - RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true); + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); rate.tryAcquire(100); rate.tryAcquire(100); rate.tryAcquire(100); @@ -191,7 +200,9 @@ public void testRateLimiterWithPermitUpdater() throws Exception { long rateTime = 1; long newUpdatedRateLimit = 100L; Supplier permitUpdater = () -> newUpdatedRateLimit; - RateLimiter limiter = new RateLimiter(null, permits, 1, TimeUnit.SECONDS, permitUpdater); + RateLimiter limiter = RateLimiter.builder().permits(permits).rateTime(1).timeUnit(TimeUnit.SECONDS) + .permitUpdater(permitUpdater) + .build(); limiter.acquire(); Thread.sleep(rateTime * 3 * 1000); assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit); @@ -204,8 +215,10 @@ public void testRateLimiterWithFunction() { long rateTime = 1; int reNewTime = 3; RateLimitFunction rateLimitFunction = atomicInteger::incrementAndGet; - RateLimiter rateLimiter = new RateLimiter(permits, rateTime, TimeUnit.SECONDS, rateLimitFunction); - for (int i = 0 ; i < reNewTime; i++) { + RateLimiter rateLimiter = RateLimiter.builder().permits(permits).rateTime(rateTime).timeUnit(TimeUnit.SECONDS) + .rateLimitFunction(rateLimitFunction) + .build(); + for (int i = 0; i < reNewTime; i++) { rateLimiter.renew(); } assertEquals(reNewTime, atomicInteger.get()); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index b0083714ba52f..e42ef3e32657d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -262,8 +262,18 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry, .help("Exception from sink.") .create()); - userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + userExceptionRateLimiter = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(5) + .rateTime(1) + .timeUnit(TimeUnit.MINUTES) + .build(); + sysExceptionRateLimiter = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(5) + .rateTime(1) + .timeUnit(TimeUnit.MINUTES) + .build(); } public void addUserException(Throwable ex) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 536f55a9a3919..8270aec0e58be 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -196,8 +196,18 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me .help("Exception from sink.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(5) + .rateTime(1) + .timeUnit(TimeUnit.MINUTES) + .build(); + sinkExceptionRateLimiter = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(5) + .rateTime(1) + .timeUnit(TimeUnit.MINUTES) + .build(); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index 451a8adc5e6f2..5236128e934d0 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -195,8 +195,18 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[] .help("Exception from source.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(5) + .rateTime(1) + .timeUnit(TimeUnit.MINUTES) + .build(); + sourceExceptionRateLimiter = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(5) + .rateTime(1) + .timeUnit(TimeUnit.MINUTES) + .build(); } @Override