Skip to content

Commit

Permalink
Merge pull request #241 from tbak/master
Browse files Browse the repository at this point in the history
Improve rate limiter to throttle only full and delta registry fetches.
  • Loading branch information
tbak committed Nov 7, 2014
2 parents 65e0ef9 + c22b69a commit a221472
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ public class DefaultEurekaServerConfig implements EurekaServerConfig {

// These counters are checked for each HTTP request. Instantiating them per request like for the other
// properties would be too costly.
private final DynamicStringSetProperty rateLimiterPrivilidgedClients =
new DynamicStringSetProperty(namespace + "rateLimiter.privilidgedClients", Collections.<String>emptySet());
private final DynamicStringSetProperty rateLimiterPrivilegedClients =
new DynamicStringSetProperty(namespace + "rateLimiter.privilegedClients", Collections.<String>emptySet());
private final DynamicBooleanProperty rateLimiterEnabled = configInstance.getBooleanProperty(namespace + "rateLimiter.enabled", false);
private final DynamicBooleanProperty rateLimiterThrottleStandardClients = configInstance.getBooleanProperty(namespace + "rateLimiter.throttleStandardClients", false);
private final DynamicIntProperty rateLimiterBurstSize = configInstance.getIntProperty(namespace + "rateLimiter.burstSize", 10);
private final DynamicIntProperty rateLimiterAverageRate = configInstance.getIntProperty(namespace + "rateLimiter.averageRate", 1000);
private final DynamicIntProperty rateLimiterRegistryFetchAverageRate = configInstance.getIntProperty(namespace + "rateLimiter.registryFetchAverageRate", 500);
private final DynamicIntProperty rateLimiterFullFetchAverageRate = configInstance.getIntProperty(namespace + "rateLimiter.fullFetchAverageRate", 100);

public DefaultEurekaServerConfig() {
init();
Expand Down Expand Up @@ -551,8 +553,13 @@ public boolean isRateLimiterEnabled() {
}

@Override
public Set<String> getRateLimiterPrivilidgedClients() {
return rateLimiterPrivilidgedClients.get();
public boolean isRateLimiterThrottleStandardClients() {
return rateLimiterThrottleStandardClients.get();
}

@Override
public Set<String> getRateLimiterPrivilegedClients() {
return rateLimiterPrivilegedClients.get();
}

@Override
Expand All @@ -561,7 +568,12 @@ public int getRateLimiterBurstSize() {
}

@Override
public int getRateLimiterAverageRate() {
return rateLimiterAverageRate.get();
public int getRateLimiterRegistryFetchAverageRate() {
return rateLimiterRegistryFetchAverageRate.get();
}

@Override
public int getRateLimiterFullFetchAverageRate() {
return rateLimiterFullFetchAverageRate.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,19 +517,32 @@ public interface EurekaServerConfig {
*/
boolean isRateLimiterEnabled();

/**
* Indicate if rate limit standard clients. If set to false, only non standard clients
* will be rate limited.
*/
boolean isRateLimiterThrottleStandardClients();

/**
* A list of certified clients. This is in addition to standard eureka Java clients.
*/
Set<String> getRateLimiterPrivilidgedClients();
Set<String> getRateLimiterPrivilegedClients();

/**
* Rate limiter, token bucket algorithm property. See also {@link #getRateLimiterAverageRate()}.
* Rate limiter, token bucket algorithm property. See also {@link #getRateLimiterRegistryFetchAverageRate()}
* and {@link #getRateLimiterFullFetchAverageRate()}.
*/
int getRateLimiterBurstSize();

/**
* Rate limiter, token bucket algorithm property. Specifies the average enforced request rate.
* See also {@link #getRateLimiterBurstSize()}.
*/
int getRateLimiterAverageRate();
int getRateLimiterRegistryFetchAverageRate();

/**
* Rate limiter, token bucket algorithm property. Specifies the average enforced request rate.
* See also {@link #getRateLimiterBurstSize()}.
*/
int getRateLimiterFullFetchAverageRate();
}
128 changes: 101 additions & 27 deletions eureka-core/src/main/java/com/netflix/eureka/RateLimitingFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,40 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.netflix.appinfo.AbstractEurekaIdentity;
import com.netflix.appinfo.EurekaClientIdentity;
import com.netflix.eureka.util.EurekaMonitors;
import com.netflix.eureka.util.RateLimiter;

/**
* Rate limiting filter, with configurable threshold above which non-privilidged clients
* Rate limiting filter, with configurable threshold above which non-privileged clients
* will be dropped. This feature enables cutting off non-standard and potentially harmful clients
* in case of system overload.
* in case of system overload. Since it is critical to always allow client registrations and heartbeats into
* the system, which at the same time are relatively cheap operations, the rate limiting is applied only to
* full and delta registry fetches. Furthermore, since delta fetches are much smaller than full fetches,
* and if not served my result in following full registry fetch from the client, they have relatively
* higher priority. This is implemented by two parallel rate limiters, one for overall number of
* full/delta fetches (higher threshold) and one for full fetches only (low threshold).
* <p>
* The client is identified by {@link AbstractEurekaIdentity#AUTH_NAME_HEADER_KEY} HTTP header
* value. The privilidged group by default contains:
* value. The privileged group by default contains:
* <ul>
* <li>
* {@link EurekaClientIdentity#DEFAULT_CLIENT_NAME} - standard Java eureka-client. Applications using
* this client automatically belong to the privilidged group.
* this client automatically belong to the privileged group.
* </li>
* <li>
* {@link com.netflix.eureka.EurekaServerIdentity#DEFAULT_SERVER_NAME} - connections from peer Eureka servers
* (internal only, traffic replication)
* </li>
* </ul>
*
* This feature is not enabled by default, but can be turned on via configuration. Even when disabled,
* It is possible to turn off privileged client filtering via
* {@link EurekaServerConfig#isRateLimiterThrottleStandardClients()} property.
* <p>
* Rate limiting is not enabled by default, but can be turned on via configuration. Even when disabled,
* the throttling statistics are still counted, although on a separate counter, so it is possible to
* measure the impact of this feature before activation.
*
Expand All @@ -71,47 +81,110 @@
*/
public class RateLimitingFilter implements Filter {

private static final Set<String> DEFAULT_PRIVILEDGED_CLIENTS = new HashSet<String>(
private static final Set<String> DEFAULT_PRIVILEGED_CLIENTS = new HashSet<String>(
Arrays.asList(EurekaClientIdentity.DEFAULT_CLIENT_NAME, EurekaServerIdentity.DEFAULT_SERVER_NAME)
);

private static final RateLimiter rateLimiter = new RateLimiter();
private static final Pattern TARGET_RE = Pattern.compile("^.*/apps(/[^/]*)?$");

enum Target {FullFetch, DeltaFetch, Application, Other}

/**
* Includes both full and delta fetches.
*/
private static final RateLimiter registryFetchRateLimiter = new RateLimiter();

/**
* Only full registry fetches.
*/
private static final RateLimiter registryFullFetchRateLimiter = new RateLimiter();

@Override
public void init(FilterConfig filterConfig) throws ServletException {
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
if (request instanceof HttpServletRequest) {
if (isRateLimited((HttpServletRequest) request)) {
if (EurekaServerConfigurationManager.getInstance().getConfiguration().isRateLimiterEnabled()) {
EurekaMonitors.RATE_LIMITED.increment();
((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} else {
EurekaMonitors.RATE_LIMITED_CANDIDATES.increment();
chain.doFilter(request, response);
}
Target target = getTarget(request);
if (target == Target.Other) {
chain.doFilter(request, response);
return;
}

HttpServletRequest httpRequest = (HttpServletRequest) request;

if (isRateLimited(httpRequest, target)) {
incrementStats(target);
if (config().isRateLimiterEnabled()) {
((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
}
}
chain.doFilter(request, response);
}

private static boolean isRateLimited(HttpServletRequest request) {
return !isPrivilidged(request) && isOverloaded();
private static Target getTarget(ServletRequest request) {
Target target = Target.Other;
if (request instanceof HttpServletRequest) {
HttpServletRequest httpRequest = (HttpServletRequest) request;

if ("GET".equals(httpRequest.getMethod())) {
Matcher matcher = TARGET_RE.matcher(httpRequest.getPathInfo());
if (matcher.matches()) {
if (matcher.groupCount() == 0 || "/".equals(matcher.group(1))) {
target = Target.FullFetch;
} else if ("/delta".equals(matcher.group(1))) {
target = Target.DeltaFetch;
} else {
target = Target.Application;
}
}
}
}
return target;
}

private static boolean isRateLimited(HttpServletRequest request, Target target) {
return !isPrivileged(request) && isOverloaded(target);
}

private static boolean isPrivilidged(HttpServletRequest request) {
Set<String> privilidgedClients = EurekaServerConfigurationManager.getInstance().getConfiguration().getRateLimiterPrivilidgedClients();
private static boolean isPrivileged(HttpServletRequest request) {
if (config().isRateLimiterThrottleStandardClients()) {
return false;
}
Set<String> privilegedClients = config().getRateLimiterPrivilegedClients();
String clientName = request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY);
return privilidgedClients.contains(clientName) || DEFAULT_PRIVILEDGED_CLIENTS.contains(clientName);
return privilegedClients.contains(clientName) || DEFAULT_PRIVILEGED_CLIENTS.contains(clientName);
}

private static boolean isOverloaded(Target target) {
int maxInWindow = config().getRateLimiterBurstSize();
int fetchWindowSize = config().getRateLimiterRegistryFetchAverageRate();
boolean overloaded = !registryFetchRateLimiter.acquire(maxInWindow, fetchWindowSize);

if (target == Target.FullFetch || target == Target.Application) {
int fullFetchWindowSize = config().getRateLimiterFullFetchAverageRate();
overloaded |= !registryFullFetchRateLimiter.acquire(maxInWindow, fullFetchWindowSize);
}
return overloaded;
}

private static void incrementStats(Target target) {
if (config().isRateLimiterEnabled()) {
EurekaMonitors.RATE_LIMITED.increment();
if (target == Target.FullFetch) {
EurekaMonitors.RATE_LIMITED_FULL_FETCH.increment();
}
} else {
EurekaMonitors.RATE_LIMITED_CANDIDATES.increment();
if (target == Target.FullFetch) {
EurekaMonitors.RATE_LIMITED_FULL_FETCH_CANDIDATES.increment();
}
}
}

private static boolean isOverloaded() {
int maxInWindow = EurekaServerConfigurationManager.getInstance().getConfiguration().getRateLimiterBurstSize();
int windowSize = EurekaServerConfigurationManager.getInstance().getConfiguration().getRateLimiterAverageRate();
return !rateLimiter.acquire(maxInWindow, windowSize);
private static EurekaServerConfig config() {
return EurekaServerConfigurationManager.getInstance().getConfiguration();
}

@Override
Expand All @@ -120,6 +193,7 @@ public void destroy() {

// For testing purposes
static void reset() {
rateLimiter.reset();
registryFetchRateLimiter.reset();
registryFullFetchRateLimiter.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public enum EurekaMonitors {
REJECTED_REPLICATIONS("numOfRejectedReplications", "Number of replications rejected because of full queue"),
FAILED_REPLICATIONS("numOfFailedReplications", "Number of failed replications - likely from timeouts"),
RATE_LIMITED("numOfRateLimitedRequests", "Number of requests discarded by the rate limiter"),
RATE_LIMITED_CANDIDATES("numOfRateLimitedRequestCandidates", "Number of requests that would be discarded if the rate limiter's throttling is activated");
RATE_LIMITED_CANDIDATES("numOfRateLimitedRequestCandidates", "Number of requests that would be discarded if the rate limiter's throttling is activated"),
RATE_LIMITED_FULL_FETCH("numOfRateLimitedFullFetchRequests", "Number of full registry fetch requests discarded by the rate limiter"),
RATE_LIMITED_FULL_FETCH_CANDIDATES("numOfRateLimitedFullFetchRequestCandidates", "Number of full registry fetch requests that would be discarded if the rate limiter's throttling is activated");

private final String name;

Expand Down
Loading

0 comments on commit a221472

Please sign in to comment.