Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve rate limiter to throttle only full and delta registry fetches. #241

Merged
merged 2 commits into from
Nov 7, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ public class DefaultEurekaServerConfig implements EurekaServerConfig {

// These counters are checked for each HTTP request. Instantiating them per request like for the other
// properties would be too costly.
private final DynamicStringSetProperty rateLimiterPrivilidgedClients =
new DynamicStringSetProperty(namespace + "rateLimiter.privilidgedClients", Collections.<String>emptySet());
private final DynamicStringSetProperty rateLimiterPrivilegedClients =
new DynamicStringSetProperty(namespace + "rateLimiter.privilegedClients", Collections.<String>emptySet());
private final DynamicBooleanProperty rateLimiterEnabled = configInstance.getBooleanProperty(namespace + "rateLimiter.enabled", false);
private final DynamicBooleanProperty rateLimiterThrottleStandardClients = configInstance.getBooleanProperty(namespace + "rateLimiter.throttleStandardClients", false);
private final DynamicIntProperty rateLimiterBurstSize = configInstance.getIntProperty(namespace + "rateLimiter.burstSize", 10);
private final DynamicIntProperty rateLimiterAverageRate = configInstance.getIntProperty(namespace + "rateLimiter.averageRate", 1000);
private final DynamicIntProperty rateLimiterRegistryFetchAverageRate = configInstance.getIntProperty(namespace + "rateLimiter.registryFetchAverageRate", 500);
private final DynamicIntProperty rateLimiterFullFetchAverageRate = configInstance.getIntProperty(namespace + "rateLimiter.fullFetchAverageRate", 100);

public DefaultEurekaServerConfig() {
init();
Expand Down Expand Up @@ -551,8 +553,13 @@ public boolean isRateLimiterEnabled() {
}

@Override
public Set<String> getRateLimiterPrivilidgedClients() {
return rateLimiterPrivilidgedClients.get();
public boolean isRateLimiterThrottleStandardClients() {
return rateLimiterThrottleStandardClients.get();
}

@Override
public Set<String> getRateLimiterPrivilegedClients() {
return rateLimiterPrivilegedClients.get();
}

@Override
Expand All @@ -561,7 +568,12 @@ public int getRateLimiterBurstSize() {
}

@Override
public int getRateLimiterAverageRate() {
return rateLimiterAverageRate.get();
public int getRateLimiterRegistryFetchAverageRate() {
return rateLimiterRegistryFetchAverageRate.get();
}

@Override
public int getRateLimiterFullFetchAverageRate() {
return rateLimiterFullFetchAverageRate.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,19 +517,32 @@ public interface EurekaServerConfig {
*/
boolean isRateLimiterEnabled();

/**
* Indicate if rate limit standard clients. If set to false, only non standard clients
* will be rate limited.
*/
boolean isRateLimiterThrottleStandardClients();

/**
* A list of certified clients. This is in addition to standard eureka Java clients.
*/
Set<String> getRateLimiterPrivilidgedClients();
Set<String> getRateLimiterPrivilegedClients();

/**
* Rate limiter, token bucket algorithm property. See also {@link #getRateLimiterAverageRate()}.
* Rate limiter, token bucket algorithm property. See also {@link #getRateLimiterRegistryFetchAverageRate()}
* and {@link #getRateLimiterFullFetchAverageRate()}.
*/
int getRateLimiterBurstSize();

/**
* Rate limiter, token bucket algorithm property. Specifies the average enforced request rate.
* See also {@link #getRateLimiterBurstSize()}.
*/
int getRateLimiterAverageRate();
int getRateLimiterRegistryFetchAverageRate();

/**
* Rate limiter, token bucket algorithm property. Specifies the average enforced request rate.
* See also {@link #getRateLimiterBurstSize()}.
*/
int getRateLimiterFullFetchAverageRate();
}
120 changes: 93 additions & 27 deletions eureka-core/src/main/java/com/netflix/eureka/RateLimitingFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,31 @@
import com.netflix.eureka.util.RateLimiter;

/**
* Rate limiting filter, with configurable threshold above which non-privilidged clients
* Rate limiting filter, with configurable threshold above which non-privileged clients
* will be dropped. This feature enables cutting off non-standard and potentially harmful clients
* in case of system overload.
* in case of system overload. Since it is critical to always allow client registrations and heartbeats into
* the system, which at the same time are relatively cheap operations, the rate limiting is applied only to
* full and delta registry fetches. Furthermore, since delta fetches are much smaller than full fetches,
* and if not served my result in following full registry fetch from the client, they have relatively
* higher priority. This is implemented by two parallel rate limiters, one for overall number of
* full/delta fetches (higher threshold) and one for full fetches only (low threshold).
* <p>
* The client is identified by {@link AbstractEurekaIdentity#AUTH_NAME_HEADER_KEY} HTTP header
* value. The privilidged group by default contains:
* value. The privileged group by default contains:
* <ul>
* <li>
* {@link EurekaClientIdentity#DEFAULT_CLIENT_NAME} - standard Java eureka-client. Applications using
* this client automatically belong to the privilidged group.
* this client automatically belong to the privileged group.
* </li>
* <li>
* {@link com.netflix.eureka.EurekaServerIdentity#DEFAULT_SERVER_NAME} - connections from peer Eureka servers
* (internal only, traffic replication)
* </li>
* </ul>
*
* This feature is not enabled by default, but can be turned on via configuration. Even when disabled,
* It is possible to turn off privileged client filtering via
* {@link EurekaServerConfig#isRateLimiterThrottleStandardClients()} property.
* <p>
* Rate limiting is not enabled by default, but can be turned on via configuration. Even when disabled,
* the throttling statistics are still counted, although on a separate counter, so it is possible to
* measure the impact of this feature before activation.
*
Expand All @@ -71,47 +79,104 @@
*/
public class RateLimitingFilter implements Filter {

private static final Set<String> DEFAULT_PRIVILEDGED_CLIENTS = new HashSet<String>(
private static final Set<String> DEFAULT_PRIVILEGED_CLIENTS = new HashSet<String>(
Arrays.asList(EurekaClientIdentity.DEFAULT_CLIENT_NAME, EurekaServerIdentity.DEFAULT_SERVER_NAME)
);

private static final RateLimiter rateLimiter = new RateLimiter();
enum Target {FullFetch, DeltaFetch, Other}

/**
* Includes both full and delta fetches.
*/
private static final RateLimiter registryFetchRateLimiter = new RateLimiter();

/**
* Only full registry fetches.
*/
private static final RateLimiter registryFullFetchRateLimiter = new RateLimiter();

@Override
public void init(FilterConfig filterConfig) throws ServletException {
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
if (request instanceof HttpServletRequest) {
if (isRateLimited((HttpServletRequest) request)) {
if (EurekaServerConfigurationManager.getInstance().getConfiguration().isRateLimiterEnabled()) {
EurekaMonitors.RATE_LIMITED.increment();
((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} else {
EurekaMonitors.RATE_LIMITED_CANDIDATES.increment();
chain.doFilter(request, response);
}
Target target = getTarget(request);
if (target == Target.Other) {
chain.doFilter(request, response);
return;
}

HttpServletRequest httpRequest = (HttpServletRequest) request;

if (isRateLimited(httpRequest, target)) {
incrementStats(target);
if (config().isRateLimiterEnabled()) {
((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
}
}
chain.doFilter(request, response);
}

private static boolean isRateLimited(HttpServletRequest request) {
return !isPrivilidged(request) && isOverloaded();
private static Target getTarget(ServletRequest request) {
Target target = Target.Other;
if (request instanceof HttpServletRequest) {
HttpServletRequest httpRequest = (HttpServletRequest) request;

if ("GET".equals(httpRequest.getMethod())) {
String path = httpRequest.getPathInfo();
if (path.endsWith("/apps") || path.endsWith("/apps/")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ApplicationsResource also allows calls of the format /apps/{appId} for specific applications, we would want to cover this Target type here too. If we don't want to introduce more Target types it could roll up into FullFetch.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not include this on purpose, as majority of clients will do full/delta registry fetch. If we would like to include this as well, we can add another target type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense too. We did add a feature to the client recently that allows clients to be configured to change it's periodic fetch from a full_all/delta_all pattern to be always full_single_app, so I just don't want this feature to become a backdoor to get around the rateLimiter.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add apps/<app_id> to the full fetch bucket.

target = Target.FullFetch;
} else if (path.endsWith("/apps/delta") || path.endsWith("/apps/delta/")) {
target = Target.DeltaFetch;
}
}
}
return target;
}

private static boolean isRateLimited(HttpServletRequest request, Target target) {
return !isPrivileged(request) && isOverloaded(target);
}

private static boolean isPrivilidged(HttpServletRequest request) {
Set<String> privilidgedClients = EurekaServerConfigurationManager.getInstance().getConfiguration().getRateLimiterPrivilidgedClients();
private static boolean isPrivileged(HttpServletRequest request) {
if (config().isRateLimiterThrottleStandardClients()) {
return false;
}
Set<String> privilegedClients = config().getRateLimiterPrivilegedClients();
String clientName = request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY);
return privilidgedClients.contains(clientName) || DEFAULT_PRIVILEDGED_CLIENTS.contains(clientName);
return privilegedClients.contains(clientName) || DEFAULT_PRIVILEGED_CLIENTS.contains(clientName);
}

private static boolean isOverloaded(Target target) {
int maxInWindow = config().getRateLimiterBurstSize();
int fetchWindowSize = config().getRateLimiterRegistryFetchAverageRate();
boolean overloaded = !registryFetchRateLimiter.acquire(maxInWindow, fetchWindowSize);

if (target == Target.FullFetch) {
int fullFetchWindowSize = config().getRateLimiterFullFetchAverageRate();
overloaded |= !registryFullFetchRateLimiter.acquire(maxInWindow, fullFetchWindowSize);
}
return overloaded;
}

private static void incrementStats(Target target) {
if (config().isRateLimiterEnabled()) {
EurekaMonitors.RATE_LIMITED.increment();
if (target == Target.FullFetch) {
EurekaMonitors.RATE_LIMITED_FULL_FETCH.increment();
}
} else {
EurekaMonitors.RATE_LIMITED_CANDIDATES.increment();
if (target == Target.FullFetch) {
EurekaMonitors.RATE_LIMITED_FULL_FETCH_CANDIDATES.increment();
}
}
}

private static boolean isOverloaded() {
int maxInWindow = EurekaServerConfigurationManager.getInstance().getConfiguration().getRateLimiterBurstSize();
int windowSize = EurekaServerConfigurationManager.getInstance().getConfiguration().getRateLimiterAverageRate();
return !rateLimiter.acquire(maxInWindow, windowSize);
private static EurekaServerConfig config() {
return EurekaServerConfigurationManager.getInstance().getConfiguration();
}

@Override
Expand All @@ -120,6 +185,7 @@ public void destroy() {

// For testing purposes
static void reset() {
rateLimiter.reset();
registryFetchRateLimiter.reset();
registryFullFetchRateLimiter.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public enum EurekaMonitors {
REJECTED_REPLICATIONS("numOfRejectedReplications", "Number of replications rejected because of full queue"),
FAILED_REPLICATIONS("numOfFailedReplications", "Number of failed replications - likely from timeouts"),
RATE_LIMITED("numOfRateLimitedRequests", "Number of requests discarded by the rate limiter"),
RATE_LIMITED_CANDIDATES("numOfRateLimitedRequestCandidates", "Number of requests that would be discarded if the rate limiter's throttling is activated");
RATE_LIMITED_CANDIDATES("numOfRateLimitedRequestCandidates", "Number of requests that would be discarded if the rate limiter's throttling is activated"),
RATE_LIMITED_FULL_FETCH("numOfRateLimitedFullFetchRequests", "Number of full registry fetch requests discarded by the rate limiter"),
RATE_LIMITED_FULL_FETCH_CANDIDATES("numOfRateLimitedFullFetchRequestCandidates", "Number of full registry fetch requests that would be discarded if the rate limiter's throttling is activated");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
Expand All @@ -41,6 +41,9 @@
@RunWith(MockitoJUnitRunner.class)
public class RateLimitingFilterTest {

private static final String FULL_FETCH = "base/apps";
private static final String DELTA_FETCH = "base/apps/delta";

private static final String CUSTOM_CLIENT = "CustomClient";
private static final String PYTHON_CLIENT = "PythonClient";

Expand All @@ -59,10 +62,12 @@ public class RateLimitingFilterTest {
public void setUp() throws Exception {
RateLimitingFilter.reset();

ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.privilidgedClients", PYTHON_CLIENT);
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.privilegedClients", PYTHON_CLIENT);
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.enabled", true);
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.burstSize", 2);
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.averageRate", 1);
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.registryFetchAverageRate", 1);
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.fullFetchAverageRate", 1);
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.throttleStandardClients", false);

ApplicationInfoManager.getInstance().initComponent(new MyDataCenterInstanceConfig());

Expand All @@ -71,38 +76,53 @@ public void setUp() throws Exception {
}

@Test
public void testPrivilidgedClientAlwaysServed() throws Exception {
for (int i = 0; i < 2; i++) {
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(PYTHON_CLIENT);
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(EurekaClientIdentity.DEFAULT_CLIENT_NAME);
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(EurekaServerIdentity.DEFAULT_SERVER_NAME);
}

for (int i = 0; i < 6; i++) {
filter.doFilter(request, response, filterChain);
}
verify(filterChain, times(6)).doFilter(request, response);
public void testPrivilegedClientAlwaysServed() throws Exception {
whenRequest(FULL_FETCH, PYTHON_CLIENT);
filter.doFilter(request, response, filterChain);

whenRequest(DELTA_FETCH, EurekaClientIdentity.DEFAULT_CLIENT_NAME);
filter.doFilter(request, response, filterChain);

whenRequest(FULL_FETCH, EurekaServerIdentity.DEFAULT_SERVER_NAME);
filter.doFilter(request, response, filterChain);

verify(filterChain, times(3)).doFilter(request, response);
verify(response, never()).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}

@Test
public void testCustomClientShedding() throws Exception {
public void testStandardClientsThrottled() throws Exception {
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.throttleStandardClients", true);

// Custom clients will go up to the window limit
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(CUSTOM_CLIENT);
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(CUSTOM_CLIENT);
whenRequest(FULL_FETCH, EurekaClientIdentity.DEFAULT_CLIENT_NAME);
filter.doFilter(request, response, filterChain);
filter.doFilter(request, response, filterChain);

verify(filterChain, times(2)).doFilter(request, response);

// Now we hit the limit
long rateLimiterCounter = EurekaMonitors.RATE_LIMITED.getCount();
filter.doFilter(request, response, filterChain);

assertEquals("Expected rate limiter counter increase", rateLimiterCounter + 1, EurekaMonitors.RATE_LIMITED.getCount());
verify(response, times(1)).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}

@Test
public void testCustomClientShedding() throws Exception {
// Custom clients will go up to the window limit
whenRequest(FULL_FETCH, CUSTOM_CLIENT);
filter.doFilter(request, response, filterChain);
filter.doFilter(request, response, filterChain);

verify(filterChain, times(2)).doFilter(request, response);

// Now we hit the limit
long rateLimiterCounter = EurekaMonitors.RATE_LIMITED.getCount();
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(CUSTOM_CLIENT);
filter.doFilter(request, response, filterChain);

assertEquals("Expected rate limiter counter increase", rateLimiterCounter + 1, EurekaMonitors.RATE_LIMITED.getCount());
// We just test the counter
verify(response, times(1)).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}

Expand All @@ -111,8 +131,7 @@ public void testCustomClientThrottlingCandidatesCounter() throws Exception {
ConfigurationManager.getConfigInstance().setProperty("eureka.rateLimiter.enabled", false);

// Custom clients will go up to the window limit
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(CUSTOM_CLIENT);
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(CUSTOM_CLIENT);
whenRequest(FULL_FETCH, CUSTOM_CLIENT);

filter.doFilter(request, response, filterChain);
filter.doFilter(request, response, filterChain);
Expand All @@ -121,11 +140,16 @@ public void testCustomClientThrottlingCandidatesCounter() throws Exception {

// Now we hit the limit
long rateLimiterCounter = EurekaMonitors.RATE_LIMITED_CANDIDATES.getCount();
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(CUSTOM_CLIENT);
filter.doFilter(request, response, filterChain);

assertEquals("Expected rate limiter counter increase", rateLimiterCounter + 1, EurekaMonitors.RATE_LIMITED_CANDIDATES.getCount());
// We just test the counter
verify(response, times(0)).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}

private void whenRequest(String path, String client) {
when(request.getMethod()).thenReturn("GET");
when(request.getPathInfo()).thenReturn(path);
when(request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY)).thenReturn(client);
}
}