Skip to content

Commit

Permalink
Implementation of RateLimitNetworkTrafficListener (#406)
Browse files Browse the repository at this point in the history
* Initial implementation of RateLimitNetworkTrafficListener

* Fix accidental removal of metrics listener

* Remove bucket4j option as guava and resilience4j would be sufficient

* Make NetworkTrafficRateLimiter and interface

* Add test for RateLimitNetworkTrafficListener

* Add range validator for NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG

* Add comment and logging for RateLimitNetworkTrafficListener class, and change the test to show drastic effect of rate limiting

* Fix comments in test and change validator to be atLeast instead of range
  • Loading branch information
trnguyencflt committed Aug 9, 2023
1 parent 4562e0f commit 4e3152a
Show file tree
Hide file tree
Showing 9 changed files with 473 additions and 0 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
</dependency>
<!--test-->
<dependency>
<groupId>org.bouncycastle</groupId>
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/confluent/rest/ApplicationServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
Expand Down Expand Up @@ -134,6 +135,15 @@ private void attachMetricsListener(Metrics metrics, Map<String, String> tags) {
}
}

private void attachNetworkTrafficRateLimitListener() {
if (config.getNetworkTrafficRateLimitEnable()) {
NetworkTrafficListener rateLimitListener = new RateLimitNetworkTrafficListener(config);
for (NetworkTrafficServerConnector connector : connectors) {
connector.addNetworkTrafficListener(rateLimitListener);
}
}
}

private void addJettyThreadPoolMetrics(Metrics metrics, Map<String, String> tags) {
//add metric for jetty thread pool queue size
String requestQueueSizeName = "request-queue-size";
Expand Down Expand Up @@ -197,6 +207,7 @@ protected final void doStart() throws Exception {
HandlerCollection wsHandlers = new HandlerCollection();
for (Application<?> app : applications) {
attachMetricsListener(app.getMetrics(), app.getMetricsTags());
attachNetworkTrafficRateLimitListener();
addJettyThreadPoolMetrics(app.getMetrics(), app.getMetricsTags());
handlers.addHandler(app.configureHandler());
wsHandlers.addHandler(app.configureWebSocketHandler());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest;

import io.confluent.rest.ratelimit.NetworkTrafficRateLimiter;
import io.confluent.rest.ratelimit.NetworkTrafficRateLimiterFactory;
import java.net.Socket;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used as a way to apply back-pressure on all the network traffic sockets when
* incoming rate exceeds a threshold
*/
public class RateLimitNetworkTrafficListener implements NetworkTrafficListener {

private static final Logger log = LoggerFactory.getLogger(RateLimitNetworkTrafficListener.class);

private final NetworkTrafficRateLimiter rateLimiter;

public RateLimitNetworkTrafficListener(RestConfig restConfig) {
rateLimiter = NetworkTrafficRateLimiterFactory.create(restConfig);
}

@Override
public void incoming(final Socket socket, final ByteBuffer bytes) {
int cost = bytes.limit() - bytes.position();
if (cost > 0) {
log.debug("Applying network traffic rate limit on socket: {} with cost: {}", socket, cost);
rateLimiter.rateLimit(cost);
}
}
}
55 changes: 55 additions & 0 deletions core/src/main/java/io/confluent/rest/RestConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.confluent.rest.extension.ResourceExtension;
import io.confluent.rest.metrics.RestMetricsContext;
import io.confluent.rest.ratelimit.NetworkTrafficRateLimitBackend;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
Expand Down Expand Up @@ -526,6 +527,28 @@ public class RestConfig extends AbstractConfig {
"Maximum buffer size for jetty request headers in bytes";
protected static final int MAX_REQUEST_HEADER_SIZE_DEFAULT = 8192;

protected static final String NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG =
"network.traffic.rate.limit.enable";
protected static final String NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DOC =
"Whether to enable network traffic rate-limiting. Default is false";
protected static final boolean NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DEFAULT = false;

protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG =
"network.traffic.rate.limit.backend";
protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DOC =
"The rate-limiting backend to use. The options are 'guava', and 'resilience4j'."
+ "Default is 'guava'.";
protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DEFAULT = "guava";

protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG =
"network.traffic.rate.limit.bytes.per.sec";
protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC =
"The maximum number of bytes to emit per second for the network traffic. Default is 20MiB.";
protected static final Integer NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DEFAULT =
20 * 1024 * 1024;
protected static final ConfigDef.Range NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR =
ConfigDef.Range.atLeast(1);

protected static final boolean SUPPRESS_STACK_TRACE_IN_RESPONSE_DEFAULT = true;

static final List<String> SUPPORTED_URI_SCHEMES =
Expand Down Expand Up @@ -1066,6 +1089,25 @@ private static ConfigDef incompleteBaseConfigDef() {
MAX_REQUEST_HEADER_SIZE_DEFAULT,
Importance.LOW,
MAX_REQUEST_HEADER_SIZE_DOC
).define(
NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG,
Type.BOOLEAN,
NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DEFAULT,
Importance.LOW,
NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DOC
).define(
NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG,
Type.STRING,
NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DEFAULT,
Importance.LOW,
NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DOC
).define(
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG,
Type.INT,
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DEFAULT,
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR,
Importance.LOW,
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC
);
}

Expand Down Expand Up @@ -1336,6 +1378,19 @@ public final Map<String, String> getListenerProtocolMap() {
return result;
}

public final boolean getNetworkTrafficRateLimitEnable() {
return getBoolean(NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG);
}

public final NetworkTrafficRateLimitBackend getNetworkTrafficRateLimitBackend() {
return NetworkTrafficRateLimitBackend.valueOf(
getString(NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG).toUpperCase());
}

public final int getNetworkTrafficRateLimitBytesPerSec() {
return getInt(NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG);
}

/**
* <p>A helper method for extracting multi-instance application configuration,
* specified via property names of the form PREFIX[.LISTENER_NAME].PROPERTY.</p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest.ratelimit;

public enum NetworkTrafficRateLimitBackend {
GUAVA,
RESILIENCE4J
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest.ratelimit;

public interface NetworkTrafficRateLimiter {

void rateLimit(int cost);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest.ratelimit;

import com.google.common.util.concurrent.RateLimiter;
import io.confluent.rest.RestConfig;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import java.time.Duration;

public final class NetworkTrafficRateLimiterFactory {

private NetworkTrafficRateLimiterFactory() {
// prevent instantiation
}

public static NetworkTrafficRateLimiter create(RestConfig restConfig) {
int bytesPerSecond = restConfig.getNetworkTrafficRateLimitBytesPerSec();
switch (restConfig.getNetworkTrafficRateLimitBackend()) {
case GUAVA:
return GuavaNetworkTrafficRateLimiter.create(bytesPerSecond);
case RESILIENCE4J:
return Resilience4JNetworkTrafficRateLimiter.create(bytesPerSecond);
default:
throw new AssertionError("Unknown enum constant: "
+ restConfig.getNetworkTrafficRateLimitBackend());
}
}

static final class GuavaNetworkTrafficRateLimiter implements NetworkTrafficRateLimiter {

private final RateLimiter delegate;

GuavaNetworkTrafficRateLimiter(RateLimiter delegate) {
this.delegate = delegate;
}

static GuavaNetworkTrafficRateLimiter create(int bytesPerSecond) {
return new GuavaNetworkTrafficRateLimiter(RateLimiter.create(bytesPerSecond));
}

@Override
public void rateLimit(final int cost) {
delegate.acquire(cost);
}
}

static final class Resilience4JNetworkTrafficRateLimiter implements NetworkTrafficRateLimiter {

private final io.github.resilience4j.ratelimiter.RateLimiter delegate;

Resilience4JNetworkTrafficRateLimiter(io.github.resilience4j.ratelimiter.RateLimiter delegate) {
this.delegate = delegate;
}

static Resilience4JNetworkTrafficRateLimiter create(int bytesPerSecond) {
RateLimiterConfig config =
RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(bytesPerSecond)
.build();
return new Resilience4JNetworkTrafficRateLimiter(
io.github.resilience4j.ratelimiter.RateLimiter.of(
"Resilience4JNetworkTrafficRateLimiter", config)
);
}

@Override
public void rateLimit(final int cost) {
delegate.acquirePermission(cost);
}
}
}
Loading

0 comments on commit 4e3152a

Please sign in to comment.