diff --git a/core/pom.xml b/core/pom.xml index 2bf900e8f1..e46afc1e1c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -125,6 +125,10 @@ guava ${guava.version} + + io.github.resilience4j + resilience4j-ratelimiter + org.bouncycastle diff --git a/core/src/main/java/io/confluent/rest/ApplicationServer.java b/core/src/main/java/io/confluent/rest/ApplicationServer.java index caf39cdfeb..d911937e22 100644 --- a/core/src/main/java/io/confluent/rest/ApplicationServer.java +++ b/core/src/main/java/io/confluent/rest/ApplicationServer.java @@ -36,6 +36,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.ConnectionLimit; @@ -134,6 +135,15 @@ private void attachMetricsListener(Metrics metrics, Map tags) { } } + private void attachNetworkTrafficRateLimitListener() { + if (config.getNetworkTrafficRateLimitEnable()) { + NetworkTrafficListener rateLimitListener = new RateLimitNetworkTrafficListener(config); + for (NetworkTrafficServerConnector connector : connectors) { + connector.addNetworkTrafficListener(rateLimitListener); + } + } + } + private void addJettyThreadPoolMetrics(Metrics metrics, Map tags) { //add metric for jetty thread pool queue size String requestQueueSizeName = "request-queue-size"; @@ -197,6 +207,7 @@ protected final void doStart() throws Exception { HandlerCollection wsHandlers = new HandlerCollection(); for (Application app : applications) { attachMetricsListener(app.getMetrics(), app.getMetricsTags()); + attachNetworkTrafficRateLimitListener(); addJettyThreadPoolMetrics(app.getMetrics(), app.getMetricsTags()); handlers.addHandler(app.configureHandler()); wsHandlers.addHandler(app.configureWebSocketHandler()); diff --git a/core/src/main/java/io/confluent/rest/RateLimitNetworkTrafficListener.java b/core/src/main/java/io/confluent/rest/RateLimitNetworkTrafficListener.java new file mode 100644 index 0000000000..959a95d46e --- /dev/null +++ b/core/src/main/java/io/confluent/rest/RateLimitNetworkTrafficListener.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.rest; + +import io.confluent.rest.ratelimit.NetworkTrafficRateLimiter; +import io.confluent.rest.ratelimit.NetworkTrafficRateLimiterFactory; +import java.net.Socket; +import java.nio.ByteBuffer; +import org.eclipse.jetty.io.NetworkTrafficListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used as a way to apply back-pressure on all the network traffic sockets when + * incoming rate exceeds a threshold + */ +public class RateLimitNetworkTrafficListener implements NetworkTrafficListener { + + private static final Logger log = LoggerFactory.getLogger(RateLimitNetworkTrafficListener.class); + + private final NetworkTrafficRateLimiter rateLimiter; + + public RateLimitNetworkTrafficListener(RestConfig restConfig) { + rateLimiter = NetworkTrafficRateLimiterFactory.create(restConfig); + } + + @Override + public void incoming(final Socket socket, final ByteBuffer bytes) { + int cost = bytes.limit() - bytes.position(); + if (cost > 0) { + log.debug("Applying network traffic rate limit on socket: {} with cost: {}", socket, cost); + rateLimiter.rateLimit(cost); + } + } +} diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index 8e7aa4df76..470c97cb71 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -24,6 +24,7 @@ import io.confluent.rest.extension.ResourceExtension; import io.confluent.rest.metrics.RestMetricsContext; +import io.confluent.rest.ratelimit.NetworkTrafficRateLimitBackend; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; @@ -526,6 +527,28 @@ public class RestConfig extends AbstractConfig { "Maximum buffer size for jetty request headers in bytes"; protected static final int MAX_REQUEST_HEADER_SIZE_DEFAULT = 8192; + protected static final String NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG = + "network.traffic.rate.limit.enable"; + protected static final String NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DOC = + "Whether to enable network traffic rate-limiting. Default is false"; + protected static final boolean NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DEFAULT = false; + + protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG = + "network.traffic.rate.limit.backend"; + protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DOC = + "The rate-limiting backend to use. The options are 'guava', and 'resilience4j'." + + "Default is 'guava'."; + protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DEFAULT = "guava"; + + protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG = + "network.traffic.rate.limit.bytes.per.sec"; + protected static final String NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC = + "The maximum number of bytes to emit per second for the network traffic. Default is 20MiB."; + protected static final Integer NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DEFAULT = + 20 * 1024 * 1024; + protected static final ConfigDef.Range NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR = + ConfigDef.Range.atLeast(1); + protected static final boolean SUPPRESS_STACK_TRACE_IN_RESPONSE_DEFAULT = true; static final List SUPPORTED_URI_SCHEMES = @@ -1066,6 +1089,25 @@ private static ConfigDef incompleteBaseConfigDef() { MAX_REQUEST_HEADER_SIZE_DEFAULT, Importance.LOW, MAX_REQUEST_HEADER_SIZE_DOC + ).define( + NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG, + Type.BOOLEAN, + NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DEFAULT, + Importance.LOW, + NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_DOC + ).define( + NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG, + Type.STRING, + NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DEFAULT, + Importance.LOW, + NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_DOC + ).define( + NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG, + Type.INT, + NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DEFAULT, + NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR, + Importance.LOW, + NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC ); } @@ -1336,6 +1378,19 @@ public final Map getListenerProtocolMap() { return result; } + public final boolean getNetworkTrafficRateLimitEnable() { + return getBoolean(NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG); + } + + public final NetworkTrafficRateLimitBackend getNetworkTrafficRateLimitBackend() { + return NetworkTrafficRateLimitBackend.valueOf( + getString(NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG).toUpperCase()); + } + + public final int getNetworkTrafficRateLimitBytesPerSec() { + return getInt(NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG); + } + /** *

A helper method for extracting multi-instance application configuration, * specified via property names of the form PREFIX[.LISTENER_NAME].PROPERTY.

diff --git a/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimitBackend.java b/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimitBackend.java new file mode 100644 index 0000000000..12ca84ba9e --- /dev/null +++ b/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimitBackend.java @@ -0,0 +1,22 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.rest.ratelimit; + +public enum NetworkTrafficRateLimitBackend { + GUAVA, + RESILIENCE4J +} diff --git a/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimiter.java b/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimiter.java new file mode 100644 index 0000000000..9ad4b79db0 --- /dev/null +++ b/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimiter.java @@ -0,0 +1,22 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.rest.ratelimit; + +public interface NetworkTrafficRateLimiter { + + void rateLimit(int cost); +} diff --git a/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimiterFactory.java b/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimiterFactory.java new file mode 100644 index 0000000000..1f42be81f0 --- /dev/null +++ b/core/src/main/java/io/confluent/rest/ratelimit/NetworkTrafficRateLimiterFactory.java @@ -0,0 +1,86 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.rest.ratelimit; + +import com.google.common.util.concurrent.RateLimiter; +import io.confluent.rest.RestConfig; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import java.time.Duration; + +public final class NetworkTrafficRateLimiterFactory { + + private NetworkTrafficRateLimiterFactory() { + // prevent instantiation + } + + public static NetworkTrafficRateLimiter create(RestConfig restConfig) { + int bytesPerSecond = restConfig.getNetworkTrafficRateLimitBytesPerSec(); + switch (restConfig.getNetworkTrafficRateLimitBackend()) { + case GUAVA: + return GuavaNetworkTrafficRateLimiter.create(bytesPerSecond); + case RESILIENCE4J: + return Resilience4JNetworkTrafficRateLimiter.create(bytesPerSecond); + default: + throw new AssertionError("Unknown enum constant: " + + restConfig.getNetworkTrafficRateLimitBackend()); + } + } + + static final class GuavaNetworkTrafficRateLimiter implements NetworkTrafficRateLimiter { + + private final RateLimiter delegate; + + GuavaNetworkTrafficRateLimiter(RateLimiter delegate) { + this.delegate = delegate; + } + + static GuavaNetworkTrafficRateLimiter create(int bytesPerSecond) { + return new GuavaNetworkTrafficRateLimiter(RateLimiter.create(bytesPerSecond)); + } + + @Override + public void rateLimit(final int cost) { + delegate.acquire(cost); + } + } + + static final class Resilience4JNetworkTrafficRateLimiter implements NetworkTrafficRateLimiter { + + private final io.github.resilience4j.ratelimiter.RateLimiter delegate; + + Resilience4JNetworkTrafficRateLimiter(io.github.resilience4j.ratelimiter.RateLimiter delegate) { + this.delegate = delegate; + } + + static Resilience4JNetworkTrafficRateLimiter create(int bytesPerSecond) { + RateLimiterConfig config = + RateLimiterConfig.custom() + .limitRefreshPeriod(Duration.ofSeconds(1)) + .limitForPeriod(bytesPerSecond) + .build(); + return new Resilience4JNetworkTrafficRateLimiter( + io.github.resilience4j.ratelimiter.RateLimiter.of( + "Resilience4JNetworkTrafficRateLimiter", config) + ); + } + + @Override + public void rateLimit(final int cost) { + delegate.acquirePermission(cost); + } + } +} diff --git a/core/src/test/java/io/confluent/rest/RateLimitNetworkTrafficListenerTest.java b/core/src/test/java/io/confluent/rest/RateLimitNetworkTrafficListenerTest.java new file mode 100644 index 0000000000..30a80d3aea --- /dev/null +++ b/core/src/test/java/io/confluent/rest/RateLimitNetworkTrafficListenerTest.java @@ -0,0 +1,219 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.rest; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Configurable; +import javax.ws.rs.core.Form; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +public class RateLimitNetworkTrafficListenerTest { + + private static TestRestConfig testConfig; + private static ApplicationServer server; + private static final String TEST_MESSAGE = "Test message"; + + private ScheduledExecutorService executor; + private TestApp app; + private Client client; + + @BeforeEach + public void setup(TestInfo info) throws Exception { + Properties props = new Properties(); + props.setProperty(RestConfig.LISTENERS_CONFIG, "http://0.0.0.0:0"); + + if (info.getDisplayName().contains("NetworkTrafficRateLimitEnabled")) { + props.put(RestConfig.NETWORK_TRAFFIC_RATE_LIMIT_ENABLE_CONFIG, "true"); + props.put(RestConfig.NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_CONFIG, 10000); + if (info.getDisplayName().contains("Resilience4j")) { + props.put(RestConfig.NETWORK_TRAFFIC_RATE_LIMIT_BACKEND_CONFIG, "Resilience4j"); + } + } + + testConfig = new TestRestConfig(props); + server = new ApplicationServer<>(testConfig); + app = new TestApp("/app"); + server.registerApplication(app); + server.start(); + + executor = Executors.newScheduledThreadPool(4); + client = ClientBuilder.newClient(); + } + + @AfterEach + public void tearDown() throws Exception { + server.stop(); + server.join(); + + client.close(); + awaitTerminationAfterShutdown(executor); + } + + @Test + @DisplayName("NetworkTrafficRateLimitDisabled") + public void testNetworkTrafficRateLimitDisabled_unlimited() throws Exception { + long startTime = System.nanoTime(); + // send 1000 POST requests in 1 second + hammerAtConstantRate(app.getServer().getURI(), "/resource", Duration.ofMillis(1), 10, 1000); + double durationMillis = (System.nanoTime() - startTime) / 1_000_000.0; + // with no rate limit, 1000 requests should finish less than 2 seconds + assertThat("Duration must be greater than 1 second", durationMillis >= 1000); + assertThat("Duration must be smaller than 2 seconds", durationMillis < 2000); + } + + @Test + @DisplayName("NetworkTrafficRateLimitEnabled") + public void testNetworkTrafficRateLimitEnabled_Guava_slowdown() throws Exception { + long startTime = System.nanoTime(); + // send 1000 POST requests in 1 second + hammerAtConstantRate(app.getServer().getURI(), "/resource", Duration.ofMillis(1), 10, 1000); + double durationMillis = (System.nanoTime() - startTime) / 1_000_000.0; + // with rate limiting, 1000 requests should finish in more than 10 seconds + assertThat("Duration must be greater than 10 seconds", + durationMillis >= Duration.ofSeconds(10).toMillis()); + } + + @Test + @DisplayName("NetworkTrafficRateLimitEnabled_Resilience4j") + public void testNetworkTrafficRateLimitEnabled_Resilience4j_slowdown() throws Exception { + long startTime = System.nanoTime(); + // send 1000 POST requests in 1 second + hammerAtConstantRate(app.getServer().getURI(), "/resource", Duration.ofMillis(1), 10, 1000); + double durationMillis = (System.nanoTime() - startTime) / 1_000_000.0; + // with rate limiting, 1000 requests should finish in more than 10 seconds + assertThat("Duration must be greater than 10 seconds", + durationMillis >= Duration.ofSeconds(10).toMillis()); + } + + // Send many concurrent requests and return the number of requests with "200" status + private int hammerAtConstantRate(URI server, + String path, Duration rate, int warmupRequests, int totalRequests) { + checkArgument(!rate.isNegative(), "rate must be non-negative"); + checkArgument(warmupRequests <= totalRequests, "warmupRequests must be at most totalRequests"); + + List responses = + IntStream.range(0, totalRequests) + .mapToObj( + i -> + executor.schedule( + () -> client.target(server) + .path(path) + .request(MediaType.APPLICATION_FORM_URLENCODED_TYPE) + .post(Entity.form(new Form("message", TEST_MESSAGE))), + /* delay= */ i * rate.toMillis(), + TimeUnit.MILLISECONDS)) + .collect(Collectors.toList()).stream() + .map( + future -> { + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + for (Response response : responses) { + int status = response.getStatus(); + if (status != 200 && status != 429) { + fail( + String.format( + "Expected HTTP 200 or HTTP 429, but got HTTP %d instead: %s", + status, response.readEntity(String.class))); + } + } + + return (int) + responses.subList(warmupRequests, responses.size()).stream() + .filter(response -> response.getStatus() == Status.OK.getStatusCode()) + .count(); + } + + private void awaitTerminationAfterShutdown(ExecutorService threadPool) { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + } + } catch (InterruptedException ex) { + threadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private static class TestApp extends Application implements AutoCloseable { + + TestApp(String path) { + this(testConfig, path); + } + + TestApp(TestRestConfig config, String path) { + super(config, path); + } + + @Override + public void setupResources(final Configurable config, final TestRestConfig appConfig) { + config.register(RestResource.class); + } + + @Override + public void close() throws Exception { + stop(); + } + } + + @Path("/") + @Produces(MediaType.TEXT_PLAIN) + public static class RestResource { + + @POST + @Path("/resource") + @Consumes(MediaType.APPLICATION_FORM_URLENCODED) + @Produces(MediaType.WILDCARD) + public String post() { + return "Hello"; + } + } +} diff --git a/pom.xml b/pom.xml index 43a2b4121e..b275af47c3 100644 --- a/pom.xml +++ b/pom.xml @@ -233,6 +233,11 @@ http2-http-client-transport ${jetty.version}
+ + io.github.resilience4j + resilience4j-ratelimiter + 1.7.1 + org.glassfish.jersey.test-framework