Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

681 throughput limit #707

Merged
merged 2 commits into from
Feb 21, 2017
Merged

681 throughput limit #707

merged 2 commits into from
Feb 21, 2017

Conversation

wojtkiewicz
Copy link
Contributor

@wojtkiewicz wojtkiewicz commented Feb 5, 2017

Added two implementations of throughput limiter:

  • unlimited
  • fixed (uses single threshold for all topics)
  • dynamic (calculates dynamically thresholds per topic)

Dynamic throughput limiting is configured by following factors:

  • max_throughout which is a hard limit on throughout that we accept across all topics
  • emergency_threshold after which we trigger actual limiting procedures (below this value we won't interfere)
  • desired_throughput which is a desired global value that we are trying to achieve after throughout limiting
  • idle_threshold which is used for detecting if topic is idle (if throughout on this topic is below this value then we clear all calculated limits and forget about it)

Related to #681

@wojtkiewicz wojtkiewicz changed the base branch from master to develop February 5, 2017 20:32
@@ -102,6 +102,12 @@
FRONTEND_GRACEFUL_SHUTDOWN_INITIAL_WAIT_MS("frontend.graceful.shutdown.initial.wait.ms", 10000),
FRONTEND_HTTP2_ENABLED("frontend.http2.enabled", false),
FRONTEND_FORCE_TOPIC_MAX_MESSAGE_SIZE("frontend.force.topic.max.message.size", false),
FRONTEND_THROUGHPUT_TYPE("frontend.throughput.type", "unlimited"),
FRONTEND_THROUGHPUT_MAX("frontend.throughput.max", Long.MAX_VALUE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these 5 parameters apply to dynamic implementation only, then let's put them in a proper namespace, e.g. frontend.throughput.dynamic.max.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

users.entrySet().removeIf(entry -> entry.getValue().getOneMinuteRate() <= idleThreshold);
int userCount = users.size();
if (userCount > 0) {
long total = users.reduceValuesToLong(10, Throughput::getRoundedOneMinuteRate, 0, ((left, right) -> left + right));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduce a private method for the right hand side please, I don't know what we're doing here ;)

def limiter = new DynamicThroughputLimiter(max, threshold, desired, idleThreshold, 1, globalMeter, executor)


def "global allow"() {
Copy link
Contributor

@pbetkier pbetkier Feb 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All test names: how about we stick to our convention of test names specifying "how the system should behave in a particular situation" instead of a 2-word scenario identifier?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, please use should .... when ... convention

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

abuserRate = 2001

expect:
!checkThroughput(abuser, abuserMeter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me what to expect from this method without looking into it's implementation. It checks whether the throughput check passes for this given topic. How about throughputCheckPasses or isThroughputCheckPassing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarified

@adamdubiel
Copy link
Member

I noticed sth strange in version that is already on master. At /topics/{topic}/subscriptions/{sub}/metrics there is throughput field which holds path to metrics in Graphite:

"throughput": "sumSeries(stats.tech.hermes.consumer.*.throughput.pl_allegro_tech_hermes.monitor.pl_allegro_tech_hermes_monitor_consumer1.m1_rate)",

This should not be like this, paths to graphite are calculated in console. Please fix this.

}

@Override
public void check(TopicName topic, Metered rate) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method uses exceptions as flow control - i think it should return proper values instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


private void calibrateLimits() {
users.entrySet().removeIf(entry -> entry.getValue().getOneMinuteRate() <= idleThreshold);
int userCount = users.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see a lot of potential for race conditions here - you remove entries here while check (which as i see is called for each request) makes putIfAbsent, then you check for size.. will it not create strange results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really because removal is done only for idle users, it takes minutes to become idle in the current implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

furthermore it is rather difficult to publish messages and be idle at the same time


@Override
public void check(TopicName topic, Metered rate) {
if (!users.containsKey(topic)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you make if contains followed by put if absent, which is the same but actually atomic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -130,6 +139,9 @@ private void messageRead(HttpServerExchange exchange, byte[] messageContent, Att
try {
contentLengthChecker.check(exchange, messageContent.length, attachment);
attachment.getCachedTopic().reportMessageContentSize(messageContent.length);
throughputLimiter.check(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we check throughput in 2 places? here and handleRequest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main reason is not to read entire requests of abusive client

import static java.lang.String.format;

public interface ThroughputLimiter {
void check(TopicName topic, Metered throughput);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this interfaces contract is hidden behind assumptions about throwing exceptions of proper kinds which are used as flow control - please return values

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def "global block"() {
given:
globalRate = 12_000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't like the idea of buliding tests based on a class state. please be explicit, i.e.

given:
globalMeter = globalMeter(12_000)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mocking using spock makes this operation very verbose because I can't dynamically change this value until test is over which violates idea of throughput that changes in time

def limiter = new DynamicThroughputLimiter(max, threshold, desired, idleThreshold, 1, globalMeter, executor)


def "global allow"() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, please use should .... when ... convention

public void shouldReturn429ForQuotaViolation() {
// given
Topic topic = operations.buildTopic("publishAndConsumeGroup", "topic");
TestMessage message = TestMessage.of("content", StringUtils.repeat("X", 60_000));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this 60_000 - where does it come from? if this is based on config, please add this as test config instead of relying on defaults and add comment that describes origin of this magic number

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment

private static class Throughput {
Metered current;
volatile long max;
long creationTime;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creationTime - not needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -39,6 +39,7 @@
OAUTH_PROVIDER_ALREADY_EXISTS(BAD_REQUEST),
SUPPORT_TEAMS_COULD_NOT_BE_LOADED(INTERNAL_SERVER_ERROR),
TOPIC_BLACKLISTED(FORBIDDEN),
QUOTA_VIOLATION(429),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe THROUGHPUT_QUOTA_VIOLATION to be clear and let's create a constant for 429 (TOO_MANY_REQUESTS) if there is no available already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -141,12 +153,25 @@ private void messageRead(HttpServerExchange exchange, byte[] messageContent, Att
attachment.removeTimeout();
messageErrorProcessor.sendAndLog(exchange, attachment.getTopic(),
attachment.getMessageId(), error(e.getMessage(), VALIDATION_ERROR));
} catch (FixedThroughputLimiter.QuotaViolationException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why FixedThroughputLimiter.* and notThroughputLimiter.*?
I would suggest refactoring QuotaViolation exceptions making them proper top level classes.

@wojtkiewicz wojtkiewicz force-pushed the 681_throughput_limit branch 3 times, most recently from 75601c9 to d6a277e Compare February 6, 2017 10:43
}

public String getReason() {
return reason != null? reason : DEFAULT_REASON;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: '?' is not preceded with whitespace.

private ConfigFactory configs;
private HermesMetrics hermesMetrics;

private enum ThroughputLimiterType {UNLIMITED, FIXED, DYNAMIC}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: '{' is not followed by whitespace.

private ConfigFactory configs;
private HermesMetrics hermesMetrics;

private enum ThroughputLimiterType {UNLIMITED, FIXED, DYNAMIC}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: '}' is not preceded with whitespace.

@@ -47,6 +48,10 @@ private ErrorCode(Response.Status httpCode) {
this.httpCode = httpCode.getStatusCode();
}

private ErrorCode(int httpCode) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: Redundant 'private' modifier.

users.entrySet().removeIf(entry -> entry.getValue().getOneMinuteRate() <= idleThreshold);
int userCount = users.size();
if (userCount > 0) {
long total = users.reduceValuesToLong(10, Throughput::getRoundedOneMinuteRate, 0, ((left, right) -> left + right));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] INFO: '10' is a magic number.

@Override
public QuotaInsight checkQuota(TopicName topic, Metered throughput) {
long rate = (long) Math.floor(throughput.getOneMinuteRate());
return rate > limit? quotaViolation(rate, limit) : quotaConfirmed();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: '?' is not preceded with whitespace.

}

public String getReason() {
return reason != null? reason : DEFAULT_REASON;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: '?' is not preceded with whitespace.

private ConfigFactory configs;
private HermesMetrics hermesMetrics;

private enum ThroughputLimiterType {UNLIMITED, FIXED, DYNAMIC}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: '{' is not followed by whitespace.

private ConfigFactory configs;
private HermesMetrics hermesMetrics;

private enum ThroughputLimiterType {UNLIMITED, FIXED, DYNAMIC}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Checkstyle] ERROR: '}' is not preceded with whitespace.

@wojtkiewicz wojtkiewicz force-pushed the 681_throughput_limit branch 2 times, most recently from 5d8d6e4 to a2755ee Compare February 6, 2017 14:43
@adamdubiel adamdubiel added this to the 0.11.1 milestone Feb 28, 2017
@adamdubiel adamdubiel deleted the 681_throughput_limit branch October 25, 2017 13:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants