-
Notifications
You must be signed in to change notification settings - Fork 210
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
681 added dynamic throughput limiting
- Loading branch information
1 parent
c6e2e0f
commit d6a277e
Showing
15 changed files
with
400 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
99 changes: 99 additions & 0 deletions
99
...in/java/pl/allegro/tech/hermes/frontend/publishing/handlers/DynamicThroughputLimiter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package pl.allegro.tech.hermes.frontend.publishing.handlers; | ||
|
||
import com.codahale.metrics.Metered; | ||
import pl.allegro.tech.hermes.api.TopicName; | ||
|
||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.globalQuotaViolation; | ||
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaConfirmed; | ||
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaViolation; | ||
|
||
public class DynamicThroughputLimiter implements ThroughputLimiter, Runnable { | ||
private final long max; | ||
private final long threshold; | ||
private final long desired; | ||
private final double idleThreshold; | ||
private final Metered globalThroughputMeter; | ||
|
||
private final ScheduledExecutorService executor; | ||
private final int checkInterval; | ||
|
||
private ConcurrentHashMap<TopicName, Throughput> users = new ConcurrentHashMap<>(); | ||
|
||
public DynamicThroughputLimiter(long max, | ||
long threshold, | ||
long desired, | ||
double idleThreshold, | ||
int checkInterval, | ||
Metered globalThroughput, | ||
ScheduledExecutorService executor) { | ||
this.max = max; | ||
this.threshold = threshold; | ||
this.desired = desired; | ||
this.idleThreshold = idleThreshold; | ||
this.globalThroughputMeter = globalThroughput; | ||
this.checkInterval = checkInterval; | ||
this.executor = executor; | ||
} | ||
|
||
@Override | ||
public QuotaInsight checkQuota(TopicName topic, Metered rate) { | ||
users.computeIfAbsent(topic, name -> new Throughput(rate, max)); | ||
Throughput throughput = users.get(topic); | ||
long value = throughput.getRoundedOneMinuteRate(); | ||
if (value > throughput.max) { | ||
return quotaViolation(value, throughput.max); | ||
} else if (globalThroughputMeter.getOneMinuteRate() > max) { | ||
return globalQuotaViolation(); | ||
} else { | ||
return quotaConfirmed(); | ||
} | ||
} | ||
|
||
@Override | ||
public void start() { | ||
executor.scheduleAtFixedRate(this, checkInterval, checkInterval, TimeUnit.SECONDS); | ||
} | ||
|
||
@Override | ||
public void run() { | ||
if (globalThroughputMeter.getOneMinuteRate() > threshold) { | ||
calibrateLimits(); | ||
} | ||
} | ||
|
||
private void calibrateLimits() { | ||
users.entrySet().removeIf(entry -> entry.getValue().getOneMinuteRate() <= idleThreshold); | ||
int userCount = users.size(); | ||
if (userCount > 0) { | ||
long total = users.reduceValuesToLong(10, Throughput::getRoundedOneMinuteRate, 0, ((left, right) -> left + right)); | ||
long mean = total / userCount; | ||
long desiredMean = desired / userCount; | ||
users.entrySet() | ||
.stream() | ||
.filter(entry -> entry.getValue().getRoundedOneMinuteRate() >= mean) | ||
.forEach(entry -> entry.getValue().max = desiredMean); | ||
} | ||
} | ||
|
||
private static class Throughput { | ||
Metered current; | ||
volatile long max; | ||
|
||
Throughput(Metered current, long max) { | ||
this.current = current; | ||
this.max = max; | ||
} | ||
|
||
long getRoundedOneMinuteRate() { | ||
return (long) Math.floor(current.getOneMinuteRate()); | ||
} | ||
|
||
double getOneMinuteRate() { | ||
return current.getOneMinuteRate(); | ||
} | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
...main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/FixedThroughputLimiter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package pl.allegro.tech.hermes.frontend.publishing.handlers; | ||
|
||
import com.codahale.metrics.Metered; | ||
import pl.allegro.tech.hermes.api.TopicName; | ||
|
||
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaConfirmed; | ||
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaViolation; | ||
|
||
public class FixedThroughputLimiter implements ThroughputLimiter { | ||
private long limit; | ||
|
||
public FixedThroughputLimiter(long limit) { | ||
this.limit = limit; | ||
} | ||
|
||
@Override | ||
public QuotaInsight checkQuota(TopicName topic, Metered throughput) { | ||
long rate = (long) Math.floor(throughput.getOneMinuteRate()); | ||
return rate > limit? quotaViolation(rate, limit) : quotaConfirmed(); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 40 additions & 15 deletions
55
.../src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/ThroughputLimiter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,52 @@ | ||
package pl.allegro.tech.hermes.frontend.publishing.handlers; | ||
|
||
import pl.allegro.tech.hermes.frontend.metric.CachedTopic; | ||
import com.codahale.metrics.Metered; | ||
import pl.allegro.tech.hermes.api.TopicName; | ||
|
||
import static java.lang.String.format; | ||
|
||
public class ThroughputLimiter { | ||
private long limit; | ||
public interface ThroughputLimiter { | ||
QuotaInsight checkQuota(TopicName topic, Metered throughput); | ||
default void start() {} | ||
default void stop() {} | ||
|
||
public ThroughputLimiter(long limit) { | ||
this.limit = limit; | ||
} | ||
class QuotaInsight { | ||
private static final QuotaInsight QUOTA_CONFIRMED = new QuotaInsight(); | ||
private static final QuotaInsight GLOBAL_VIOLATION = new QuotaInsight(false, | ||
"Global throughput exceeded. Sorry for the inconvenience."); | ||
private static final String DEFAULT_REASON = "unknown"; | ||
|
||
private boolean hasQuota = true; | ||
private String reason; | ||
|
||
public void check(CachedTopic cachedTopic) { | ||
long current = (long) Math.floor(cachedTopic.getThroughput()); | ||
if (current > limit) { | ||
throw new QuotaViolationException(current, limit); | ||
private QuotaInsight() { | ||
} | ||
|
||
private QuotaInsight(boolean pass, String reason) { | ||
this.hasQuota = pass; | ||
this.reason = reason; | ||
} | ||
|
||
public boolean hasQuota() { | ||
return hasQuota; | ||
} | ||
} | ||
|
||
public static final class QuotaViolationException extends RuntimeException { | ||
QuotaViolationException(long current, long limit) { | ||
super(format("Current throughput exceeded limit [current:%s, limit:%s].", | ||
current, limit)); | ||
public String getReason() { | ||
return reason != null? reason : DEFAULT_REASON; | ||
} | ||
|
||
public static QuotaInsight quotaConfirmed() { | ||
return QUOTA_CONFIRMED; | ||
} | ||
|
||
public static QuotaInsight quotaViolation(long current, long limit) { | ||
return new QuotaInsight(false, | ||
format("Current throughput exceeded limit [current:%s, limit:%s].", current, limit)); | ||
} | ||
|
||
public static QuotaInsight globalQuotaViolation() { | ||
return GLOBAL_VIOLATION; | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.