681 throughput limit #707

Merged (2 commits) on Feb 21, 2017
ErrorCode.java
@@ -39,6 +39,7 @@ public enum ErrorCode {
     OAUTH_PROVIDER_ALREADY_EXISTS(BAD_REQUEST),
     CROWD_GROUPS_COULD_NOT_BE_LOADED(INTERNAL_SERVER_ERROR),
     TOPIC_BLACKLISTED(FORBIDDEN),
+    THROUGHPUT_QUOTA_VIOLATION(429),
     TOPIC_NOT_UNBLACKLISTED(BAD_REQUEST),
     OWNER_SOURCE_NOT_FOUND(NOT_FOUND),
     OWNER_SOURCE_DOESNT_SUPPORT_AUTOCOMPLETE(BAD_REQUEST),
@@ -47,10 +48,14 @@ public enum ErrorCode {

     private final int httpCode;

-    private ErrorCode(Response.Status httpCode) {
+    ErrorCode(Response.Status httpCode) {
         this.httpCode = httpCode.getStatusCode();
     }

+    ErrorCode(int httpCode) {
+        this.httpCode = httpCode;
+    }
+
     public int getHttpCode() {
         return httpCode;
     }
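A note on the new int-based constructor: javax.ws.rs Response.Status defines no constant for HTTP 429 (Too Many Requests), so THROUGHPUT_QUOTA_VIOLATION has to pass the raw status code instead of a Response.Status value.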
Configs.java
@@ -102,6 +102,13 @@ public enum Configs {
     FRONTEND_GRACEFUL_SHUTDOWN_INITIAL_WAIT_MS("frontend.graceful.shutdown.initial.wait.ms", 10000),
     FRONTEND_HTTP2_ENABLED("frontend.http2.enabled", false),
     FRONTEND_FORCE_TOPIC_MAX_MESSAGE_SIZE("frontend.force.topic.max.message.size", false),
+    FRONTEND_THROUGHPUT_TYPE("frontend.throughput.type", "unlimited"),
+    FRONTEND_THROUGHPUT_FIXED_MAX("frontend.throughput.fixed.max", Long.MAX_VALUE),
+    FRONTEND_THROUGHPUT_DYNAMIC_MAX("frontend.throughput.dynamic.max", Long.MAX_VALUE),
+    FRONTEND_THROUGHPUT_DYNAMIC_THRESHOLD("frontend.throughput.dynamic.threshold", Long.MAX_VALUE),
+    FRONTEND_THROUGHPUT_DYNAMIC_DESIRED("frontend.throughput.dynamic.desired", Long.MAX_VALUE),
+    FRONTEND_THROUGHPUT_DYNAMIC_IDLE("frontend.throughput.dynamic.idle", 0.5),
+    FRONTEND_THROUGHPUT_DYNAMIC_CHECK_INTERVAL("frontend.throughput.dynamic.interval.seconds", 30),

     FRONTEND_SSL_ENABLED("frontend.ssl.enabled", false),
     FRONTEND_SSL_PORT("frontend.ssl.port", 8443),
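These properties select the limiter implementation (the default is "unlimited") and tune the dynamic variant. ThroughputLimiterFactory itself is not shown in this diff, so the following is only a sketch of the selection logic the type property implies; the "fixed"/"dynamic" type values and the getLongProperty/getDoubleProperty accessors on ConfigFactory are assumptions modeled on the getIntProperty/getStringProperty calls seen elsewhere in this PR:

    // Hypothetical sketch, not the PR's actual factory code.
    ThroughputLimiter create(ConfigFactory configs, Metered globalThroughputMeter, ScheduledExecutorService scheduler) {
        switch (configs.getStringProperty(Configs.FRONTEND_THROUGHPUT_TYPE)) {
            case "fixed":
                return new FixedThroughputLimiter(configs.getLongProperty(Configs.FRONTEND_THROUGHPUT_FIXED_MAX));
            case "dynamic":
                return new DynamicThroughputLimiter(
                        configs.getLongProperty(Configs.FRONTEND_THROUGHPUT_DYNAMIC_MAX),
                        configs.getLongProperty(Configs.FRONTEND_THROUGHPUT_DYNAMIC_THRESHOLD),
                        configs.getLongProperty(Configs.FRONTEND_THROUGHPUT_DYNAMIC_DESIRED),
                        configs.getDoubleProperty(Configs.FRONTEND_THROUGHPUT_DYNAMIC_IDLE),
                        configs.getIntProperty(Configs.FRONTEND_THROUGHPUT_DYNAMIC_CHECK_INTERVAL),
                        globalThroughputMeter,
                        scheduler);
            default: // "unlimited": a no-op limiter that always confirms the quota
                return (topic, throughput) -> ThroughputLimiter.QuotaInsight.quotaConfirmed();
        }
    }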
@@ -12,6 +12,8 @@
 import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageProducerFactory;
 import pl.allegro.tech.hermes.frontend.producer.kafka.Producers;
 import pl.allegro.tech.hermes.frontend.publishing.handlers.HandlersChainFactory;
+import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
+import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiterFactory;
 import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageEndProcessor;
 import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageErrorProcessor;
 import pl.allegro.tech.hermes.frontend.publishing.message.MessageContentTypeEnforcer;
@@ -61,6 +63,7 @@ protected void configure() {
         bindFactory(HandlersChainFactory.class).to(HttpHandler.class).in(Singleton.class);
         bindFactory(KafkaMessageProducerFactory.class).to(Producers.class).in(Singleton.class);
         bindFactory(KafkaBrokerMessageProducerFactory.class).to(BrokerMessageProducer.class).in(Singleton.class);
+        bindFactory(ThroughputLimiterFactory.class).to(ThroughputLimiter.class).in(Singleton.class);
         bindSingleton(PublishingMessageTracker.class);
         bindSingleton(NoOperationPublishingTracker.class);
         bindFactory(TopicsCacheFactory.class).to(TopicsCache.class).in(Singleton.class);
@@ -3,6 +3,7 @@
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metered;
 import com.codahale.metrics.Timer;
 import pl.allegro.tech.hermes.api.Topic;
 import pl.allegro.tech.hermes.api.TopicName;
@@ -160,4 +161,8 @@ public void markDelayedProcessing() {
         topicDelayedProcessingMeter.mark();
         globalDelayedProcessingMeter.mark();
     }
+
+    public Metered getThroughput() {
+        return topicThroughputMeter;
+    }
 }
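The new getter exposes the existing per-topic throughput meter so the frontend handlers can feed it to ThroughputLimiter.checkQuota(). As a Dropwizard Metered, it reports a one-minute exponentially weighted moving average, which is the rate both limiter implementations below inspect.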
new file: DynamicThroughputLimiter.java
@@ -0,0 +1,98 @@
package pl.allegro.tech.hermes.frontend.publishing.handlers;

import com.codahale.metrics.Metered;
import pl.allegro.tech.hermes.api.TopicName;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.globalQuotaViolation;
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaConfirmed;
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaViolation;

public class DynamicThroughputLimiter implements ThroughputLimiter, Runnable {
    private final long max;
    private final long threshold;
    private final long desired;
    private final double idleThreshold;
    private final Metered globalThroughputMeter;

    private final ScheduledExecutorService executor;
    private final int checkInterval;

    private ConcurrentHashMap<TopicName, Throughput> users = new ConcurrentHashMap<>();

    public DynamicThroughputLimiter(long max,
                                    long threshold,
                                    long desired,
                                    double idleThreshold,
                                    int checkInterval,
                                    Metered globalThroughput,
                                    ScheduledExecutorService executor) {
        this.max = max;
        this.threshold = threshold;
        this.desired = desired;
        this.idleThreshold = idleThreshold;
        this.globalThroughputMeter = globalThroughput;
        this.checkInterval = checkInterval;
        this.executor = executor;
    }

    @Override
    public QuotaInsight checkQuota(TopicName topic, Metered rate) {
        Throughput throughput = users.computeIfAbsent(topic, name -> new Throughput(rate, max));
        long value = throughput.getRoundedOneMinuteRate();
        if (value > throughput.max) {
            return quotaViolation(value, throughput.max);
        } else if (globalThroughputMeter.getOneMinuteRate() > max) {
            return globalQuotaViolation();
        } else {
            return quotaConfirmed();
        }
    }

    @Override
    public void start() {
        executor.scheduleAtFixedRate(this, checkInterval, checkInterval, TimeUnit.SECONDS);
    }

    @Override
    public void run() {
        if (globalThroughputMeter.getOneMinuteRate() > threshold) {
            calibrateLimits();
        }
    }

    private void calibrateLimits() {
        users.entrySet().removeIf(entry -> entry.getValue().getOneMinuteRate() <= idleThreshold);
        int userCount = users.size();

    [Inline review on the line above]
    Collaborator: I see a lot of potential for race conditions here. You remove entries here while the check (which, as I see it, is called for each request) does a putIfAbsent, and then you check the size. Won't that create strange results?
    Contributor (author): Not really, because removal is done only for idle users, and it takes minutes to become idle in the current implementation.
    Contributor (author): Furthermore, it is rather difficult to publish messages and be idle at the same time.

        if (userCount > 0) {
            long total = users.reduceValuesToLong(Long.MAX_VALUE, Throughput::getRoundedOneMinuteRate, 0, ((left, right) -> left + right));
            long mean = total / userCount;
            long desiredMean = desired / userCount;
            users.entrySet()
                    .stream()
                    .filter(entry -> entry.getValue().getRoundedOneMinuteRate() >= mean)
                    .forEach(entry -> entry.getValue().max = desiredMean);
        }
    }

    private static class Throughput {
        Metered current;
        volatile long max;

        Throughput(Metered current, long max) {
            this.current = current;
            this.max = max;
        }

        long getRoundedOneMinuteRate() {
            return (long) Math.floor(current.getOneMinuteRate());
        }

        double getOneMinuteRate() {
            return current.getOneMinuteRate();
        }
    }
}
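Reading calibrateLimits above: every checkInterval seconds, once the global one-minute rate exceeds threshold, topics running at or below idleThreshold are forgotten, and every topic at or above the current mean rate gets its per-topic max lowered to desired / userCount. For example, with desired = 60 MB/s and four busy topics, each busy topic is capped at 15 MB/s. A minimal wiring sketch follows; the values, meter name, and executor are illustrative assumptions, not taken from this PR (assumes a Dropwizard MetricRegistry in scope):

    // Sketch only: wiring and numbers are assumptions for illustration.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    Metered globalThroughput = metricRegistry.meter("hypothetical.global.throughput");
    ThroughputLimiter limiter = new DynamicThroughputLimiter(
            100_000_000L, // max: hard cap on any single topic and on the global rate
            80_000_000L,  // threshold: global rate that triggers calibration
            60_000_000L,  // desired: global budget shared among busy topics
            0.5,          // idleThreshold: topics at or below this rate are dropped from tracking
            30,           // checkInterval: calibration period in seconds
            globalThroughput,
            scheduler);
    limiter.start(); // schedules run() at the fixed 30-second interval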
new file: FixedThroughputLimiter.java
@@ -0,0 +1,22 @@
package pl.allegro.tech.hermes.frontend.publishing.handlers;

import com.codahale.metrics.Metered;
import pl.allegro.tech.hermes.api.TopicName;

import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaConfirmed;
import static pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter.QuotaInsight.quotaViolation;

public class FixedThroughputLimiter implements ThroughputLimiter {
    private long limit;

    public FixedThroughputLimiter(long limit) {
        this.limit = limit;
    }

    @Override
    public QuotaInsight checkQuota(TopicName topic, Metered throughput) {
        long rate = (long) Math.floor(throughput.getOneMinuteRate());
        return rate > limit ? quotaViolation(rate, limit) : quotaConfirmed();
    }

}
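FixedThroughputLimiter applies one static limit to every topic individually and never consults a global meter. A quick illustration, assuming a TopicName(group, name) constructor, some Metered rate in scope, and a logger (all hypothetical here):

    // Sketch: with a 1024-unit/s limit, a topic averaging above that over the last minute is rejected.
    FixedThroughputLimiter limiter = new FixedThroughputLimiter(1024);
    ThroughputLimiter.QuotaInsight insight = limiter.checkQuota(new TopicName("some.group", "some-topic"), rate);
    if (!insight.hasQuota()) {
        // e.g. "Current throughput exceeded limit [current:2048, limit:1024]."
        logger.warn(insight.getReason());
    }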
HandlersChainFactory.java
@@ -23,11 +23,13 @@ public class HandlersChainFactory implements Factory<HttpHandler> {
     private final BrokerMessageProducer brokerMessageProducer;
     private final MessagePreviewLog previewLog;
     private final boolean previewEnabled;
+    private final ThroughputLimiter throughputLimiter;

     @Inject
     public HandlersChainFactory(TopicsCache topicsCache, MessageErrorProcessor messageErrorProcessor,
                                 MessageEndProcessor messageEndProcessor, ConfigFactory configFactory, MessageFactory messageFactory,
-                                BrokerMessageProducer brokerMessageProducer, MessagePreviewLog messagePreviewLog) {
+                                BrokerMessageProducer brokerMessageProducer, MessagePreviewLog messagePreviewLog,
+                                ThroughputLimiter throughputLimiter) {
         this.topicsCache = topicsCache;
         this.messageErrorProcessor = messageErrorProcessor;
         this.messageEndProcessor = messageEndProcessor;
@@ -36,6 +38,7 @@ public HandlersChainFactory(...)
         this.brokerMessageProducer = brokerMessageProducer;
         this.previewLog = messagePreviewLog;
         this.previewEnabled = configFactory.getBooleanProperty(Configs.FRONTEND_MESSAGE_PREVIEW_ENABLED);
+        this.throughputLimiter = throughputLimiter;
     }

     @Override
@@ -44,7 +47,8 @@ public HttpHandler provide() {
         HttpHandler messageCreateHandler = new MessageCreateHandler(publishing, messageFactory, messageErrorProcessor);
         HttpHandler timeoutHandler = new TimeoutHandler(messageEndProcessor, messageErrorProcessor);
         HttpHandler handlerAfterRead = previewEnabled ? new PreviewHandler(messageCreateHandler, previewLog) : messageCreateHandler;
-        HttpHandler readHandler = new MessageReadHandler(handlerAfterRead, timeoutHandler, configFactory, messageErrorProcessor);
+        HttpHandler readHandler = new MessageReadHandler(handlerAfterRead, timeoutHandler, configFactory,
+                messageErrorProcessor, throughputLimiter);
         TopicHandler topicHandler = new TopicHandler(readHandler, topicsCache, messageErrorProcessor);

         return topicHandler;
MessageReadHandler.java
@@ -16,6 +16,7 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage;
 import static pl.allegro.tech.hermes.api.ErrorCode.INTERNAL_ERROR;
+import static pl.allegro.tech.hermes.api.ErrorCode.THROUGHPUT_QUOTA_VIOLATION;
 import static pl.allegro.tech.hermes.api.ErrorCode.VALIDATION_ERROR;
 import static pl.allegro.tech.hermes.api.ErrorDescription.error;
@@ -27,15 +28,17 @@ class MessageReadHandler implements HttpHandler {
     private final ContentLengthChecker contentLengthChecker;
     private final int defaultAsyncTimeout;
     private final int longAsyncTimeout;
+    private final ThroughputLimiter throughputLimiter;

     MessageReadHandler(HttpHandler next, HttpHandler timeoutHandler, ConfigFactory configFactory,
-                       MessageErrorProcessor messageErrorProcessor) {
+                       MessageErrorProcessor messageErrorProcessor, ThroughputLimiter throughputLimiter) {
         this.next = next;
         this.timeoutHandler = timeoutHandler;
         this.messageErrorProcessor = messageErrorProcessor;
         this.contentLengthChecker = new ContentLengthChecker(configFactory);
         this.defaultAsyncTimeout = configFactory.getIntProperty(Configs.FRONTEND_IDLE_TIMEOUT);
         this.longAsyncTimeout = configFactory.getIntProperty(Configs.FRONTEND_LONG_IDLE_TIMEOUT);
+        this.throughputLimiter = throughputLimiter;
     }

     @Override
@@ -51,7 +54,14 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
                         timeout,
                         MILLISECONDS)));

-        readMessage(exchange, attachment);
+        ThroughputLimiter.QuotaInsight quotaInsight = throughputLimiter.checkQuota(
+                attachment.getCachedTopic().getTopicName(),
+                attachment.getCachedTopic().getThroughput());
+        if (quotaInsight.hasQuota()) {
+            readMessage(exchange, attachment);
+        } else {
+            respondWithQuotaViolation(exchange, attachment, quotaInsight.getReason());
+        }
     }

     private void runTimeoutHandler(HttpServerExchange exchange, AttachmentContent attachment) {
@@ -130,12 +140,13 @@ private void messageRead(HttpServerExchange exchange, byte[] messageContent, AttachmentContent attachment) {
         try {
             contentLengthChecker.check(exchange, messageContent.length, attachment);
             attachment.getCachedTopic().reportMessageContentSize(messageContent.length);
-            attachment.setMessageContent(messageContent);
-            endWithoutDefaultResponse(exchange);
-            if (exchange.isInIoThread()) {
-                dispatchToWorker(exchange, attachment);
+            ThroughputLimiter.QuotaInsight quotaCheck = throughputLimiter.checkQuota(
+                    attachment.getCachedTopic().getTopicName(),
+                    attachment.getCachedTopic().getThroughput());
+            if (quotaCheck.hasQuota()) {
+                finalizeMessageRead(exchange, messageContent, attachment);
             } else {
-                next.handleRequest(exchange);
+                respondWithQuotaViolation(exchange, attachment, quotaCheck.getReason());
             }
         } catch (ContentLengthChecker.InvalidContentLengthException | ContentLengthChecker.ContentTooLargeException e) {
             attachment.removeTimeout();
@@ -147,6 +158,29 @@
         }
     }

+    private void finalizeMessageRead(HttpServerExchange exchange,
+                                     byte[] messageContent,
+                                     AttachmentContent attachment) throws Exception {
+        attachment.setMessageContent(messageContent);
+        endWithoutDefaultResponse(exchange);
+        if (exchange.isInIoThread()) {
+            dispatchToWorker(exchange, attachment);
+        } else {
+            next.handleRequest(exchange);
+        }
+    }
+
+    private void respondWithQuotaViolation(HttpServerExchange exchange,
+                                           AttachmentContent attachment,
+                                           String reason) {
+        attachment.removeTimeout();
+        messageErrorProcessor.sendAndLog(
+                exchange,
+                attachment.getTopic(),
+                attachment.getMessageId(),
+                error(reason, THROUGHPUT_QUOTA_VIOLATION));
+    }
+
     private void dispatchToWorker(HttpServerExchange exchange, AttachmentContent attachment) {
         // exchange.dispatch(next) is not called here because async io read flag can be still set to true which combined with
         // dispatch() leads to an exception
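Worth noting: the quota is now checked twice per request, once in handleRequest before the body is read, and again in messageRead after the payload has been read and its size reported to the topic metrics. A publisher that crosses the limit while its request body is being consumed therefore still receives the 429 response instead of being dispatched to a worker.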
new file: ThroughputLimiter.java
@@ -0,0 +1,52 @@
package pl.allegro.tech.hermes.frontend.publishing.handlers;

import com.codahale.metrics.Metered;
import pl.allegro.tech.hermes.api.TopicName;

import static java.lang.String.format;

public interface ThroughputLimiter {
    QuotaInsight checkQuota(TopicName topic, Metered throughput);
    default void start() {}
    default void stop() {}

    class QuotaInsight {
        private static final QuotaInsight QUOTA_CONFIRMED = new QuotaInsight();
        private static final QuotaInsight GLOBAL_VIOLATION = new QuotaInsight(false,
                "Global throughput exceeded. Sorry for the inconvenience.");
        private static final String DEFAULT_REASON = "unknown";

        private boolean hasQuota = true;
        private String reason;

        private QuotaInsight() {
        }

        private QuotaInsight(boolean pass, String reason) {
            this.hasQuota = pass;
            this.reason = reason;
        }

        public boolean hasQuota() {
            return hasQuota;
        }

        public String getReason() {
            return reason != null ? reason : DEFAULT_REASON;
        }

        public static QuotaInsight quotaConfirmed() {
            return QUOTA_CONFIRMED;
        }

        public static QuotaInsight quotaViolation(long current, long limit) {
            return new QuotaInsight(false,
                    format("Current throughput exceeded limit [current:%s, limit:%s].", current, limit));
        }

        public static QuotaInsight globalQuotaViolation() {
            return GLOBAL_VIOLATION;
        }

    }
}
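Since checkQuota sits on the publishing hot path (it runs for every request, as the review comment above points out), the interface keeps the common cases allocation-free: QuotaInsight pre-allocates the shared QUOTA_CONFIRMED and GLOBAL_VIOLATION instances, and only a concrete per-topic violation creates a new object with a formatted reason string.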