Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Remove RateLimiter constructors and replace with builder usage #11599

Merged
merged 1 commit into from Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -96,13 +96,22 @@ public void update(ResourceGroup resourceGroup) {
this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0);
if (this.publishMaxMessageRate > 0) {
// TODO: pass the executor
publishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, this::apply);
publishRateLimiterOnMessage = RateLimiter.builder()
.permits(publishMaxMessageRate)
.rateTime(1L)
.timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(this::apply)
.build();
}
if (this.publishMaxByteRate > 0) {
// TODO: pass the executor
publishRateLimiterOnByte =
new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, this::apply);
RateLimiter.builder()
.permits(publishMaxByteRate)
.rateTime(1L)
.timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(this::apply)
.build();
}
} else {
this.publishMaxMessageRate = 0;
Expand Down
Expand Up @@ -358,8 +358,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
// update msg-rateLimiter
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true);
this.dispatchRateLimiterOnMessage =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(msgRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterMsg)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.getRatePeriodInSecond(),
TimeUnit.SECONDS, permitUpdaterMsg);
Expand All @@ -378,8 +385,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
// update byte-rateLimiter
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true);
this.dispatchRateLimiterOnByte =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(byteRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterByte)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.getRatePeriodInSecond(),
TimeUnit.SECONDS, permitUpdaterByte);
Expand Down
Expand Up @@ -139,8 +139,12 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
if (ratePerConsumer > 0) {
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS));
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(ratePerConsumer)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.build());
} else {
this.subscribeRateLimiter.get(consumerIdentifier)
.setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
Expand Down
Expand Up @@ -62,42 +62,6 @@ public class RateLimiter implements AutoCloseable{
private RateLimitFunction rateLimitFunction;
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction) {
this(null, permits, rateTime, timeUnit, null, false);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter,
null);
}

@Builder
RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter,
Expand Down
Expand Up @@ -33,14 +33,14 @@ public class RateLimiterTest {
@Test
public void testInvalidRenewTime() {
try {
new RateLimiter(0, 100, TimeUnit.SECONDS);
RateLimiter.builder().permits(0).rateTime(100).timeUnit(TimeUnit.SECONDS).build();
fail("should have thrown exception: invalid rate, must be > 0");
} catch (IllegalArgumentException ie) {
// Ok
}

try {
new RateLimiter(10, 0, TimeUnit.SECONDS);
RateLimiter.builder().permits(10).rateTime(0).timeUnit(TimeUnit.SECONDS).build();
fail("should have thrown exception: invalid rateTime, must be > 0");
} catch (IllegalArgumentException ie) {
// Ok
Expand All @@ -49,7 +49,7 @@ public void testInvalidRenewTime() {

@Test
public void testClose() throws Exception {
RateLimiter rate = new RateLimiter(1, 1000, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(1000).timeUnit(TimeUnit.MILLISECONDS).build();
assertFalse(rate.isClosed());
rate.close();
assertTrue(rate.isClosed());
Expand All @@ -64,7 +64,8 @@ public void testClose() throws Exception {
@Test
public void testAcquireBlock() throws Exception {
final long rateTimeMSec = 1000;
RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
rate.acquire();
assertEquals(rate.getAvailablePermits(), 0);
long start = System.currentTimeMillis();
Expand All @@ -79,7 +80,8 @@ public void testAcquireBlock() throws Exception {
public void testAcquire() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
long start = System.currentTimeMillis();
for (int i = 0; i < permits; i++) {
rate.acquire();
Expand All @@ -95,7 +97,8 @@ public void testMultipleAcquire() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
final int acquirePermits = 50;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
long start = System.currentTimeMillis();
for (int i = 0; i < permits / acquirePermits; i++) {
rate.acquire(acquirePermits);
Expand All @@ -109,7 +112,8 @@ public void testMultipleAcquire() throws Exception {
@Test
public void testTryAcquireNoPermits() {
final long rateTimeMSec = 1000;
RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
assertTrue(rate.tryAcquire());
assertFalse(rate.tryAcquire());
assertEquals(rate.getAvailablePermits(), 0);
Expand All @@ -120,7 +124,8 @@ public void testTryAcquireNoPermits() {
public void testTryAcquire() {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
for (int i = 0; i < permits; i++) {
rate.tryAcquire();
}
Expand All @@ -133,7 +138,8 @@ public void testMultipleTryAcquire() {
final long rateTimeMSec = 1000;
final int permits = 100;
final int acquirePermits = 50;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
for (int i = 0; i < permits / acquirePermits; i++) {
rate.tryAcquire(acquirePermits);
}
Expand All @@ -145,7 +151,8 @@ public void testMultipleTryAcquire() {
public void testResetRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
rate.tryAcquire(permits);
assertEquals(rate.getAvailablePermits(), 0);
// check after a rate-time: permits must be renewed
Expand All @@ -168,7 +175,9 @@ public void testResetRate() throws Exception {
public void testDispatchRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
rate.tryAcquire(100);
rate.tryAcquire(100);
rate.tryAcquire(100);
Expand All @@ -191,7 +200,9 @@ public void testRateLimiterWithPermitUpdater() throws Exception {
long rateTime = 1;
long newUpdatedRateLimit = 100L;
Supplier<Long> permitUpdater = () -> newUpdatedRateLimit;
RateLimiter limiter = new RateLimiter(null, permits, 1, TimeUnit.SECONDS, permitUpdater);
RateLimiter limiter = RateLimiter.builder().permits(permits).rateTime(1).timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdater)
.build();
limiter.acquire();
Thread.sleep(rateTime * 3 * 1000);
assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit);
Expand All @@ -204,8 +215,10 @@ public void testRateLimiterWithFunction() {
long rateTime = 1;
int reNewTime = 3;
RateLimitFunction rateLimitFunction = atomicInteger::incrementAndGet;
RateLimiter rateLimiter = new RateLimiter(permits, rateTime, TimeUnit.SECONDS, rateLimitFunction);
for (int i = 0 ; i < reNewTime; i++) {
RateLimiter rateLimiter = RateLimiter.builder().permits(permits).rateTime(rateTime).timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(rateLimitFunction)
.build();
for (int i = 0; i < reNewTime; i++) {
rateLimiter.renew();
}
assertEquals(reNewTime, atomicInteger.get());
Expand Down
Expand Up @@ -262,8 +262,18 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
.help("Exception from sink.")
.create());

userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
userExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

public void addUserException(Throwable ex) {
Expand Down
Expand Up @@ -196,8 +196,18 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me
.help("Exception from sink.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sinkExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

@Override
Expand Down
Expand Up @@ -195,8 +195,18 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[]
.help("Exception from source.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sourceExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

@Override
Expand Down