Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of RateLimitNetworkTrafficListener #406

Merged
merged 8 commits into from
Aug 9, 2023
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
</dependency>
<!--test-->
<dependency>
<groupId>org.bouncycastle</groupId>
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/confluent/rest/ApplicationServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
Expand Down Expand Up @@ -134,6 +135,15 @@ private void attachMetricsListener(Metrics metrics, Map<String, String> tags) {
}
}

private void attachNetworkTrafficRateLimitListener() {
if (config.getNetworkTrafficRateLimitEnable()) {
NetworkTrafficListener rateLimitListener = new RateLimitNetworkTrafficListener(config);
for (NetworkTrafficServerConnector connector : connectors) {
connector.addNetworkTrafficListener(rateLimitListener);
}
}
}

private void addJettyThreadPoolMetrics(Metrics metrics, Map<String, String> tags) {
//add metric for jetty thread pool queue size
String requestQueueSizeName = "request-queue-size";
Expand Down Expand Up @@ -197,6 +207,7 @@ protected final void doStart() throws Exception {
HandlerCollection wsHandlers = new HandlerCollection();
for (Application<?> app : applications) {
attachMetricsListener(app.getMetrics(), app.getMetricsTags());
attachNetworkTrafficRateLimitListener();
trnguyencflt marked this conversation as resolved.
Show resolved Hide resolved
addJettyThreadPoolMetrics(app.getMetrics(), app.getMetricsTags());
handlers.addHandler(app.configureHandler());
wsHandlers.addHandler(app.configureWebSocketHandler());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest;

import io.confluent.rest.ratelimit.NetworkTrafficRateLimiter;
import io.confluent.rest.ratelimit.NetworkTrafficRateLimiterFactory;
import java.net.Socket;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used as a way to apply back-pressure on all the network traffic sockets when
* incoming rate exceeds a threshold
*/
public class RateLimitNetworkTrafficListener implements NetworkTrafficListener {
trnguyencflt marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger log = LoggerFactory.getLogger(RateLimitNetworkTrafficListener.class);

private final NetworkTrafficRateLimiter rateLimiter;

public RateLimitNetworkTrafficListener(RestConfig restConfig) {
rateLimiter = NetworkTrafficRateLimiterFactory.create(restConfig);
}

@Override
public void incoming(final Socket socket, final ByteBuffer bytes) {
int cost = bytes.limit() - bytes.position();
trnguyencflt marked this conversation as resolved.
Show resolved Hide resolved
if (cost > 0) {
log.debug("Applying network traffic rate limit on socket: {} with cost: {}", socket, cost);
rateLimiter.rateLimit(cost);
}
}
}
55 changes: 55 additions & 0 deletions core/src/main/java/io/confluent/rest/RestConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.confluent.rest.extension.ResourceExtension;
import io.confluent.rest.metrics.RestMetricsContext;
import io.confluent.rest.ratelimit.NetworkTrafficRateLimitBackend;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
Expand Down Expand Up @@ -526,6 +527,28 @@ public class RestConfig extends AbstractConfig {
"Maximum buffer size for jetty request headers in bytes";
protected static final int MAX_REQUEST_HEADER_SIZE_DEFAULT = 8192;

protected static final String NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG =
"network.traffic.rate.limit.enable";
protected static final String NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DOC =
"Whether to enable network traffic rate-limiting. Default is false";
protected static final boolean NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DEFAULT = false;

protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG =
"network.traffic.rate.limit.backend";
protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DOC =
"The rate-limiting backend to use. The options are 'guava', and 'resilience4j'."
+ "Default is 'guava'.";
Copy link
Member

@ehumber ehumber Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference, the kafka-rest default may be guava (and good we match it), but we actually run with the config set to resliance4j, so the launch darkly setting should match that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'll make the config in LD set to resilience4j

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason to keeping both guava/resilience4j as 2 options? I never understood why kafka-rest gives an option.
IMO, ideally whatever the reason, would have converged to 1 option by now, no reason to have LD/config.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is some historical reason I think when it first developed

protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DEFAULT = "guava";

protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG =
"network.traffic.rate.limit.bytes.per.sec";
protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC =
"The maximum number of bytes to emit per second for the network traffic. Default is 20MiB.";
protected static final Integer NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DEFAULT =
20 * 1024 * 1024;
protected static final ConfigDef.Range NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR =
ConfigDef.Range.atLeast(1);

protected static final boolean SUPPRESS_STACK_TRACE_IN_RESPONSE_DEFAULT = true;

static final List<String> SUPPORTED_URI_SCHEMES =
Expand Down Expand Up @@ -1066,6 +1089,25 @@ private static ConfigDef incompleteBaseConfigDef() {
MAX_REQUEST_HEADER_SIZE_DEFAULT,
Importance.LOW,
MAX_REQUEST_HEADER_SIZE_DOC
).define(
NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG,
Type.BOOLEAN,
NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DEFAULT,
Importance.LOW,
NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DOC
).define(
NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG,
Type.STRING,
NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DEFAULT,
Importance.LOW,
NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DOC
).define(
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG,
trnguyencflt marked this conversation as resolved.
Show resolved Hide resolved
Type.INT,
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DEFAULT,
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR,
Importance.LOW,
NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC
);
}

Expand Down Expand Up @@ -1336,6 +1378,19 @@ public final Map<String, String> getListenerProtocolMap() {
return result;
}

public final boolean getNetworkTrafficRateLimitEnable() {
return getBoolean(NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG);
}

public final NetworkTrafficRateLimitBackend getNetworkTrafficRateLimitBackend() {
return NetworkTrafficRateLimitBackend.valueOf(
getString(NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG).toUpperCase());
}

public final int getNetworkTrafficRateLimitBytesPerSec() {
return getInt(NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG);
}

/**
* <p>A helper method for extracting multi-instance application configuration,
* specified via property names of the form PREFIX[.LISTENER_NAME].PROPERTY.</p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest.ratelimit;

public enum NetworkTrafficRateLimitBackend {
GUAVA,
RESILIENCE4J
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest.ratelimit;

public interface NetworkTrafficRateLimiter {

void rateLimit(int cost);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.rest.ratelimit;

import com.google.common.util.concurrent.RateLimiter;
import io.confluent.rest.RestConfig;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import java.time.Duration;

public final class NetworkTrafficRateLimiterFactory {

private NetworkTrafficRateLimiterFactory() {
// prevent instantiation
}

public static NetworkTrafficRateLimiter create(RestConfig restConfig) {
int bytesPerSecond = restConfig.getNetworkTrafficRateLimitBytesPerSec();
switch (restConfig.getNetworkTrafficRateLimitBackend()) {
case GUAVA:
return GuavaNetworkTrafficRateLimiter.create(bytesPerSecond);
case RESILIENCE4J:
return Resilience4JNetworkTrafficRateLimiter.create(bytesPerSecond);
default:
throw new AssertionError("Unknown enum constant: "
+ restConfig.getNetworkTrafficRateLimitBackend());
}
}

static final class GuavaNetworkTrafficRateLimiter implements NetworkTrafficRateLimiter {

private final RateLimiter delegate;

GuavaNetworkTrafficRateLimiter(RateLimiter delegate) {
this.delegate = delegate;
}

static GuavaNetworkTrafficRateLimiter create(int bytesPerSecond) {
return new GuavaNetworkTrafficRateLimiter(RateLimiter.create(bytesPerSecond));
}

@Override
public void rateLimit(final int cost) {
delegate.acquire(cost);
}
}

static final class Resilience4JNetworkTrafficRateLimiter implements NetworkTrafficRateLimiter {

private final io.github.resilience4j.ratelimiter.RateLimiter delegate;

Resilience4JNetworkTrafficRateLimiter(io.github.resilience4j.ratelimiter.RateLimiter delegate) {
this.delegate = delegate;
}

static Resilience4JNetworkTrafficRateLimiter create(int bytesPerSecond) {
RateLimiterConfig config =
RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(bytesPerSecond)
.build();
return new Resilience4JNetworkTrafficRateLimiter(
io.github.resilience4j.ratelimiter.RateLimiter.of(
"Resilience4JNetworkTrafficRateLimiter", config)
);
}

@Override
public void rateLimit(final int cost) {
delegate.acquirePermission(cost);
}
}
}