Skip to content

Commit

Permalink
681 added dynamic throughput limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtkiewicz committed Feb 17, 2017
1 parent b30906a commit 9755cec
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public enum ErrorCode {
OAUTH_PROVIDER_ALREADY_EXISTS(BAD_REQUEST),
CROWD_GROUPS_COULD_NOT_BE_LOADED(INTERNAL_SERVER_ERROR),
TOPIC_BLACKLISTED(FORBIDDEN),
QUOTA_VIOLATION(429),
THROUGHPUT_QUOTA_VIOLATION(429),
TOPIC_NOT_UNBLACKLISTED(BAD_REQUEST),
OWNER_SOURCE_NOT_FOUND(NOT_FOUND),
OWNER_SOURCE_DOESNT_SUPPORT_AUTOCOMPLETE(BAD_REQUEST),
Expand All @@ -48,11 +48,11 @@ public enum ErrorCode {

private final int httpCode;

private ErrorCode(Response.Status httpCode) {
ErrorCode(Response.Status httpCode) {
this.httpCode = httpCode.getStatusCode();
}

private ErrorCode(int httpCode) {
ErrorCode(int httpCode) {
this.httpCode = httpCode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ public enum Configs {
FRONTEND_GRACEFUL_SHUTDOWN_INITIAL_WAIT_MS("frontend.graceful.shutdown.initial.wait.ms", 10000),
FRONTEND_HTTP2_ENABLED("frontend.http2.enabled", false),
FRONTEND_FORCE_TOPIC_MAX_MESSAGE_SIZE("frontend.force.topic.max.message.size", false),
FRONTEND_MAX_THROUGHPUT("frontend.throughput.max", Long.MAX_VALUE),
FRONTEND_THROUGHPUT_TYPE("frontend.throughput.type", "unlimited"),
FRONTEND_THROUGHPUT_FIXED_MAX("frontend.throughput.fixed.max", Long.MAX_VALUE),
FRONTEND_THROUGHPUT_DYNAMIC_MAX("frontend.throughput.dynamic.max", Long.MAX_VALUE),
FRONTEND_THROUGHPUT_DYNAMIC_THRESHOLD("frontend.throughput.dynamic.threshold", Long.MAX_VALUE),
FRONTEND_THROUGHPUT_DYNAMIC_DESIRED("frontend.throughput.dynamic.desired", Long.MAX_VALUE),
FRONTEND_THROUGHPUT_DYNAMIC_IDLE("frontend.throughput.dynamic.idle", 0.5),
FRONTEND_THROUGHPUT_DYNAMIC_CHECK_INTERVAL("frontend.throughput.dynamic.interval.seconds", 30),

FRONTEND_SSL_ENABLED("frontend.ssl.enabled", false),
FRONTEND_SSL_PORT("frontend.ssl.port", 8443),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageProducerFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.Producers;
import pl.allegro.tech.hermes.frontend.publishing.handlers.HandlersChainFactory;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiterFactory;
import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageEndProcessor;
import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageErrorProcessor;
import pl.allegro.tech.hermes.frontend.publishing.message.MessageContentTypeEnforcer;
Expand Down Expand Up @@ -61,6 +63,7 @@ protected void configure() {
bindFactory(HandlersChainFactory.class).to(HttpHandler.class).in(Singleton.class);
bindFactory(KafkaMessageProducerFactory.class).to(Producers.class).in(Singleton.class);
bindFactory(KafkaBrokerMessageProducerFactory.class).to(BrokerMessageProducer.class).in(Singleton.class);
bindFactory(ThroughputLimiterFactory.class).to(ThroughputLimiter.class).in(Singleton.class);
bindSingleton(PublishingMessageTracker.class);
bindSingleton(NoOperationPublishingTracker.class);
bindFactory(TopicsCacheFactory.class).to(TopicsCache.class).in(Singleton.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metered;
import com.codahale.metrics.Timer;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
Expand Down Expand Up @@ -161,7 +162,7 @@ public void markDelayedProcessing() {
globalDelayedProcessingMeter.mark();
}

public double getThroughput() {
return topicThroughputMeter.getOneMinuteRate();
public Metered getThroughput() {
return topicThroughputMeter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package pl.allegro.tech.hermes.frontend.publishing.handlers;

import com.codahale.metrics.Metered;
import pl.allegro.tech.hermes.api.TopicName;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.globalQuotaViolation;
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaConfirmed;
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaViolation;

public class DynamicThroughputLimiter implements ThroughputLimiter, Runnable {
private final long max;
private final long threshold;
private final long desired;
private final double idleThreshold;
private final Metered globalThroughputMeter;

private final ScheduledExecutorService executor;
private final int checkInterval;

private ConcurrentHashMap<TopicName, Throughput> users = new ConcurrentHashMap<>();

public DynamicThroughputLimiter(long max,
long threshold,
long desired,
double idleThreshold,
int checkInterval,
Metered globalThroughput,
ScheduledExecutorService executor) {
this.max = max;
this.threshold = threshold;
this.desired = desired;
this.idleThreshold = idleThreshold;
this.globalThroughputMeter = globalThroughput;
this.checkInterval = checkInterval;
this.executor = executor;
}

@Override
public QuotaInsight checkQuota(TopicName topic, Metered rate) {
Throughput throughput = users.computeIfAbsent(topic, name -> new Throughput(rate, max));
long value = throughput.getRoundedOneMinuteRate();
if (value > throughput.max) {
return quotaViolation(value, throughput.max);
} else if (globalThroughputMeter.getOneMinuteRate() > max) {
return globalQuotaViolation();
} else {
return quotaConfirmed();
}
}

@Override
public void start() {
executor.scheduleAtFixedRate(this, checkInterval, checkInterval, TimeUnit.SECONDS);
}

@Override
public void run() {
if (globalThroughputMeter.getOneMinuteRate() > threshold) {
calibrateLimits();
}
}

private void calibrateLimits() {
users.entrySet().removeIf(entry -> entry.getValue().getOneMinuteRate() <= idleThreshold);
int userCount = users.size();
if (userCount > 0) {
long total = users.reduceValuesToLong(Long.MAX_VALUE, Throughput::getRoundedOneMinuteRate, 0, ((left, right) -> left + right));
long mean = total / userCount;
long desiredMean = desired / userCount;
users.entrySet()
.stream()
.filter(entry -> entry.getValue().getRoundedOneMinuteRate() >= mean)
.forEach(entry -> entry.getValue().max = desiredMean);
}
}

private static class Throughput {
Metered current;
volatile long max;

Throughput(Metered current, long max) {
this.current = current;
this.max = max;
}

long getRoundedOneMinuteRate() {
return (long) Math.floor(current.getOneMinuteRate());
}

double getOneMinuteRate() {
return current.getOneMinuteRate();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pl.allegro.tech.hermes.frontend.publishing.handlers;

import com.codahale.metrics.Metered;
import pl.allegro.tech.hermes.api.TopicName;

import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaConfirmed;
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaViolation;

public class FixedThroughputLimiter implements ThroughputLimiter {
private long limit;

public FixedThroughputLimiter(long limit) {
this.limit = limit;
}

@Override
public QuotaInsight checkQuota(TopicName topic, Metered throughput) {
long rate = (long) Math.floor(throughput.getOneMinuteRate());
return rate > limit ? quotaViolation(rate, limit) : quotaConfirmed();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ public class HandlersChainFactory implements Factory<HttpHandler> {
private final BrokerMessageProducer brokerMessageProducer;
private final MessagePreviewLog previewLog;
private final boolean previewEnabled;
private final ThroughputLimiter throughputLimiter;

@Inject
public HandlersChainFactory(TopicsCache topicsCache, MessageErrorProcessor messageErrorProcessor,
MessageEndProcessor messageEndProcessor, ConfigFactory configFactory, MessageFactory messageFactory,
BrokerMessageProducer brokerMessageProducer, MessagePreviewLog messagePreviewLog) {
BrokerMessageProducer brokerMessageProducer, MessagePreviewLog messagePreviewLog,
ThroughputLimiter throughputLimiter) {
this.topicsCache = topicsCache;
this.messageErrorProcessor = messageErrorProcessor;
this.messageEndProcessor = messageEndProcessor;
Expand All @@ -36,6 +38,7 @@ public HandlersChainFactory(TopicsCache topicsCache, MessageErrorProcessor messa
this.brokerMessageProducer = brokerMessageProducer;
this.previewLog = messagePreviewLog;
this.previewEnabled = configFactory.getBooleanProperty(Configs.FRONTEND_MESSAGE_PREVIEW_ENABLED);
this.throughputLimiter = throughputLimiter;
}

@Override
Expand All @@ -44,7 +47,8 @@ public HttpHandler provide() {
HttpHandler messageCreateHandler = new MessageCreateHandler(publishing, messageFactory, messageErrorProcessor);
HttpHandler timeoutHandler = new TimeoutHandler(messageEndProcessor, messageErrorProcessor);
HttpHandler handlerAfterRead = previewEnabled ? new PreviewHandler(messageCreateHandler, previewLog) : messageCreateHandler;
HttpHandler readHandler = new MessageReadHandler(handlerAfterRead, timeoutHandler, configFactory, messageErrorProcessor);
HttpHandler readHandler = new MessageReadHandler(handlerAfterRead, timeoutHandler, configFactory,
messageErrorProcessor, throughputLimiter);
TopicHandler topicHandler = new TopicHandler(readHandler, topicsCache, messageErrorProcessor);

return topicHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage;
import static pl.allegro.tech.hermes.api.ErrorCode.INTERNAL_ERROR;
import static pl.allegro.tech.hermes.api.ErrorCode.QUOTA_VIOLATION;
import static pl.allegro.tech.hermes.api.ErrorCode.THROUGHPUT_QUOTA_VIOLATION;
import static pl.allegro.tech.hermes.api.ErrorCode.VALIDATION_ERROR;
import static pl.allegro.tech.hermes.api.ErrorDescription.error;

Expand All @@ -31,14 +31,14 @@ class MessageReadHandler implements HttpHandler {
private final ThroughputLimiter throughputLimiter;

MessageReadHandler(HttpHandler next, HttpHandler timeoutHandler, ConfigFactory configFactory,
MessageErrorProcessor messageErrorProcessor) {
MessageErrorProcessor messageErrorProcessor, ThroughputLimiter throughputLimiter) {
this.next = next;
this.timeoutHandler = timeoutHandler;
this.messageErrorProcessor = messageErrorProcessor;
this.contentLengthChecker = new ContentLengthChecker(configFactory);
this.defaultAsyncTimeout = configFactory.getIntProperty(Configs.FRONTEND_IDLE_TIMEOUT);
this.longAsyncTimeout = configFactory.getIntProperty(Configs.FRONTEND_LONG_IDLE_TIMEOUT);
this.throughputLimiter = new ThroughputLimiter(configFactory.getLongProperty(Configs.FRONTEND_MAX_THROUGHPUT));
this.throughputLimiter = throughputLimiter;
}

@Override
Expand All @@ -53,11 +53,14 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
() -> runTimeoutHandler(exchange, attachment),
timeout,
MILLISECONDS)));
try {
throughputLimiter.check(attachment.getCachedTopic());

ThroughputLimiter.QuotaInsight quotaInsight = throughputLimiter.checkQuota(
attachment.getCachedTopic().getTopicName(),
attachment.getCachedTopic().getThroughput());
if (quotaInsight.hasQuota()) {
readMessage(exchange, attachment);
} catch (ThroughputLimiter.QuotaViolationException ex) {
respondWithQuotaViolation(exchange, attachment, ex);
} else {
respondWithQuotaViolation(exchange, attachment, quotaInsight.getReason());
}
}

Expand Down Expand Up @@ -137,35 +140,45 @@ private void messageRead(HttpServerExchange exchange, byte[] messageContent, Att
try {
contentLengthChecker.check(exchange, messageContent.length, attachment);
attachment.getCachedTopic().reportMessageContentSize(messageContent.length);
throughputLimiter.check(attachment.getCachedTopic());
attachment.setMessageContent(messageContent);
endWithoutDefaultResponse(exchange);
if (exchange.isInIoThread()) {
dispatchToWorker(exchange, attachment);
ThroughputLimiter.QuotaInsight quotaCheck = throughputLimiter.checkQuota(
attachment.getCachedTopic().getTopicName(),
attachment.getCachedTopic().getThroughput());
if (quotaCheck.hasQuota()) {
finalizeMessageRead(exchange, messageContent, attachment);
} else {
next.handleRequest(exchange);
respondWithQuotaViolation(exchange, attachment, quotaCheck.getReason());
}
} catch (ContentLengthChecker.InvalidContentLengthException | ContentLengthChecker.ContentTooLargeException e) {
attachment.removeTimeout();
messageErrorProcessor.sendAndLog(exchange, attachment.getTopic(),
attachment.getMessageId(), error(e.getMessage(), VALIDATION_ERROR));
} catch (ThroughputLimiter.QuotaViolationException e) {
respondWithQuotaViolation(exchange, attachment, e);
} catch (Exception e) {
attachment.removeTimeout();
messageErrorProcessor.sendAndLog(exchange, attachment.getTopic(), attachment.getMessageId(), e);
}
}

private void finalizeMessageRead(HttpServerExchange exchange,
byte[] messageContent,
AttachmentContent attachment) throws Exception {
attachment.setMessageContent(messageContent);
endWithoutDefaultResponse(exchange);
if (exchange.isInIoThread()) {
dispatchToWorker(exchange, attachment);
} else {
next.handleRequest(exchange);
}
}

private void respondWithQuotaViolation(HttpServerExchange exchange,
AttachmentContent attachment,
ThroughputLimiter.QuotaViolationException ex) {
String reason) {
attachment.removeTimeout();
messageErrorProcessor.sendAndLog(
exchange,
attachment.getTopic(),
attachment.getMessageId(),
error(ex.getMessage(), QUOTA_VIOLATION));
error(reason, THROUGHPUT_QUOTA_VIOLATION));
}

private void dispatchToWorker(HttpServerExchange exchange, AttachmentContent attachment) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,52 @@
package pl.allegro.tech.hermes.frontend.publishing.handlers;

import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import com.codahale.metrics.Metered;
import pl.allegro.tech.hermes.api.TopicName;

import static java.lang.String.format;

public class ThroughputLimiter {
private long limit;
public interface ThroughputLimiter {
QuotaInsight checkQuota(TopicName topic, Metered throughput);
default void start() {}
default void stop() {}

public ThroughputLimiter(long limit) {
this.limit = limit;
}
class QuotaInsight {
private static final QuotaInsight QUOTA_CONFIRMED = new QuotaInsight();
private static final QuotaInsight GLOBAL_VIOLATION = new QuotaInsight(false,
"Global throughput exceeded. Sorry for the inconvenience.");
private static final String DEFAULT_REASON = "unknown";

private boolean hasQuota = true;
private String reason;

public void check(CachedTopic cachedTopic) {
long current = (long) Math.floor(cachedTopic.getThroughput());
if (current > limit) {
throw new QuotaViolationException(current, limit);
private QuotaInsight() {
}

private QuotaInsight(boolean pass, String reason) {
this.hasQuota = pass;
this.reason = reason;
}

public boolean hasQuota() {
return hasQuota;
}
}

public static final class QuotaViolationException extends RuntimeException {
QuotaViolationException(long current, long limit) {
super(format("Current throughput exceeded limit [current:%s, limit:%s].",
current, limit));
public String getReason() {
return reason != null ? reason : DEFAULT_REASON;
}

public static QuotaInsight quotaConfirmed() {
return QUOTA_CONFIRMED;
}

public static QuotaInsight quotaViolation(long current, long limit) {
return new QuotaInsight(false,
format("Current throughput exceeded limit [current:%s, limit:%s].", current, limit));
}

public static QuotaInsight globalQuotaViolation() {
return GLOBAL_VIOLATION;
}

}
}
Loading

0 comments on commit 9755cec

Please sign in to comment.