diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 1a2c872d01cd..c8703eb40600 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -251,6 +251,14 @@ public class KsqlConfig extends AbstractConfig { "The maximum number of concurrent requests allowed for pull " + "queries on this host. Once the limit is hit, queries will fail immediately"; + public static final String KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG + = "ksql.query.pull.max.hourly.bandwidth.megabytes"; + public static final Integer KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT + = Integer.MAX_VALUE; + public static final String KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC + = "The maximum amount of pull query bandwidth in megabytes allowed over" + + " a period of one hour. Once the limit is hit, queries will fail immediately"; + public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG = "ksql.query.pull.thread.pool.size"; public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 100; @@ -868,6 +876,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DOC ) + .define( + KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG, + Type.INT, + KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT, + Importance.HIGH, + KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC + ) .define( KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG, Type.INT, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index e82e3bcacc19..a100fb09196b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -81,11 +82,14 @@ public class QueryEndpoint { private final Optional pullQueryMetrics; private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; + private final SlidingWindowRateLimiter pullBandRateLimiter; private final HARouting routing; private final PushRouting pushRouting; private final Optional localCommands; + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck public QueryEndpoint( + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig, final KsqlRestConfig ksqlRestConfig, @@ -93,6 +97,7 @@ public QueryEndpoint( final Optional pullQueryMetrics, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, + final SlidingWindowRateLimiter pullBandLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -104,6 +109,7 @@ public QueryEndpoint( this.pullQueryMetrics = pullQueryMetrics; this.rateLimiter = rateLimiter; this.pullConcurrencyLimiter = pullConcurrencyLimiter; + this.pullBandRateLimiter = pullBandLimiter; this.routing = routing; this.pushRouting = pushRouting; this.localCommands = localCommands; @@ -218,6 +224,7 @@ private QueryPublisher createPullQueryPublisher( metrics.recordRowsProcessed( Optional.ofNullable(r).map(PullQueryResult::getTotalRowsProcessed).orElse(0L), sourceType, planType, routingNodeType); + pullBandRateLimiter.add(responseBytes); }); }); @@ -234,6 +241,7 @@ private QueryPublisher createPullQueryPublisher( PullQueryExecutionUtil.checkRateLimit(rateLimiter); final Decrementer decrementer = pullConcurrencyLimiter.increment(); + pullBandRateLimiter.allow(); try { final PullQueryResult result = ksqlEngine.executePullQuery( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java new file mode 100644 index 000000000000..7d049abb63bf --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java @@ -0,0 +1,116 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + +import static io.confluent.ksql.util.KsqlPreconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.Pair; +import java.util.LinkedList; +import java.util.Queue; +import org.apache.kafka.common.utils.Time; + +/** + * SlidingWindowRateLimiter keeps a log of timestamps and the size for each response returned by + * pull queries. When a response comes, we first pop all outdated timestamps outside of past hour + * before appending the new response time and size to the log. Then we decide whether this response + * should be processed depending on whether the log size has exceeded the throttleLimit. + * Many rate limiters require you to ask for access before it's granted whereas this method always + * records access (post-facto) but asks that you check first via allow if previous calls put you in + * debt. This is due to not knowing the size of the response upfront. + */ + +public class SlidingWindowRateLimiter { + + private static long NUM_BYTES_IN_ONE_MEGABYTE = 1 * 1024 * 1024; + + /** + * The log of all the responses returned in the past hour. + * It is a Queue ofPairs of (timestamp in milliseconds, response size in Bytes). + */ + private final Queue> responseSizesLog; + + /** + * Throttle limit measured in Bytes. + */ + private final long throttleLimit; + + /** + * Window size over which the throttle is supposed to be enforced measured in milliseconds. + */ + private final long slidingWindowSizeMs; + + /** + * Aggregate of pull query response sizes in the past hour + */ + private long numBytesInWindow; + + public SlidingWindowRateLimiter(final int requestLimitInMB, final long slidingWindowSizeMs) { + checkArgument(requestLimitInMB >= 0, + "Pull Query bandwidth limit can't be negative."); + checkArgument(slidingWindowSizeMs >= 0, + "Pull Query throttle window size can't be negative"); + + this.throttleLimit = (long) requestLimitInMB * NUM_BYTES_IN_ONE_MEGABYTE; + this.slidingWindowSizeMs = slidingWindowSizeMs; + this.responseSizesLog = new LinkedList<>(); + this.numBytesInWindow = 0; + } + + /** + * Checks if pull queries have returned more than the throttleLimit in the past hour. + * Throws a KsqlException is the limit has been breached + * @throws KsqlException Exception that the throttle limit has been reached for pull queries + */ + public synchronized void allow() throws KsqlException { + this.allow(Time.SYSTEM.milliseconds()); + } + + @VisibleForTesting + protected synchronized void allow(final long timestamp) throws KsqlException { + checkArgument(timestamp >= 0, + "Timestamp can't be negative."); + + while (!responseSizesLog.isEmpty() + && timestamp - responseSizesLog.peek().left >= slidingWindowSizeMs) { + this.numBytesInWindow -= responseSizesLog.poll().right; + } + if (this.numBytesInWindow > throttleLimit) { + throw new KsqlException("Host is at bandwidth rate limit for pull queries."); + } + } + + /** + * Adds the responseSizeInBytes and its timestamp to the queue of all response sizes + * in the past hour. + * @param responseSizeInBytes pull query response size measured in Bytes + */ + public synchronized void add(final long responseSizeInBytes) { + add(Time.SYSTEM.milliseconds(), responseSizeInBytes); + } + + @VisibleForTesting + protected synchronized void add(final long timestamp, final long responseSizeInBytes) { + checkArgument(timestamp >= 0, + "Timestamp can't be negative."); + checkArgument(responseSizeInBytes >= 0, + "Response size can't be negative."); + + responseSizesLog.add(new Pair<>(timestamp, responseSizeInBytes)); + this.numBytesInWindow += responseSizeInBytes; + } +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index d75aa18cdf39..fbfaf44740f1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -32,6 +32,7 @@ import io.confluent.ksql.api.impl.KsqlSecurityContextProvider; import io.confluent.ksql.api.impl.MonitoredEndpoints; import io.confluent.ksql.api.server.Server; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -152,6 +153,8 @@ public final class KsqlRestApplication implements Executable { private static final Logger log = LoggerFactory.getLogger(KsqlRestApplication.class); + private static final int NUM_MILLISECONDS_IN_HOUR = 3600 * 1000; + private static final SourceName COMMANDS_STREAM_NAME = SourceName.of("KSQL_COMMANDS"); private final KsqlConfig ksqlConfigNoPort; @@ -192,6 +195,7 @@ public final class KsqlRestApplication implements Executable { private final Optional pullQueryMetrics; private final RateLimiter pullQueryRateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; + private final SlidingWindowRateLimiter pullBandRateLimiter; private final HARouting pullQueryRouting; private final Optional localCommands; @@ -233,6 +237,7 @@ public static SourceName getCommandsStreamName() { final RoutingFilterFactory routingFilterFactory, final RateLimiter pullQueryRateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting pullQueryRouting, final PushRouting pushQueryRouting, final Optional localCommands @@ -291,6 +296,7 @@ public static SourceName getCommandsStreamName() { this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory"); this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter"); this.pullConcurrencyLimiter = requireNonNull(pullConcurrencyLimiter, "pullConcurrencyLimiter"); + this.pullBandRateLimiter = requireNonNull(pullBandRateLimiter, "pullBandRateLimiter"); this.pullQueryRouting = requireNonNull(pullQueryRouting, "pullQueryRouting"); this.pushQueryRouting = pushQueryRouting; this.localCommands = requireNonNull(localCommands, "localCommands"); @@ -336,6 +342,7 @@ public void startAsync() { routingFilterFactory, pullQueryRateLimiter, pullConcurrencyLimiter, + pullBandRateLimiter, pullQueryRouting, localCommands, pushQueryRouting @@ -362,6 +369,7 @@ public void startAsync() { pullQueryMetrics, pullQueryRateLimiter, pullConcurrencyLimiter, + pullBandRateLimiter, pullQueryRouting, pushQueryRouting, localCommands @@ -745,7 +753,9 @@ static KsqlRestApplication buildApplication( final ConcurrencyLimiter pullQueryConcurrencyLimiter = new ConcurrencyLimiter( ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_CONFIG), "pull queries"); - + final SlidingWindowRateLimiter pullBandRateLimiter = new SlidingWindowRateLimiter( + ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG), + NUM_MILLISECONDS_IN_HOUR); final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator( ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); @@ -779,6 +789,7 @@ static KsqlRestApplication buildApplication( routingFilterFactory, pullQueryRateLimiter, pullQueryConcurrencyLimiter, + pullBandRateLimiter, pullQueryRouting, pushQueryRouting, localCommands @@ -857,6 +868,7 @@ static KsqlRestApplication buildApplication( routingFilterFactory, pullQueryRateLimiter, pullQueryConcurrencyLimiter, + pullBandRateLimiter, pullQueryRouting, pushQueryRouting, localCommands diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index 1e6fa9ba7116..e0e7305b8b7e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -25,6 +25,7 @@ import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; import io.confluent.ksql.api.server.MetricsCallbackHolder; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.engine.KsqlEngine; @@ -89,6 +90,7 @@ public class KsqlServerEndpoints implements Endpoints { private final Optional pullQueryMetrics; private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; + private final SlidingWindowRateLimiter pullBandRateLimiter; private final HARouting routing; private final PushRouting pushRouting; private final Optional localCommands; @@ -113,6 +115,7 @@ public KsqlServerEndpoints( final Optional pullQueryMetrics, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -138,6 +141,7 @@ public KsqlServerEndpoints( this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics); this.rateLimiter = Objects.requireNonNull(rateLimiter); this.pullConcurrencyLimiter = pullConcurrencyLimiter; + this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter); this.routing = Objects.requireNonNull(routing); this.pushRouting = pushRouting; this.localCommands = Objects.requireNonNull(localCommands); @@ -158,7 +162,8 @@ public CompletableFuture createQueryPublisher(final String sql, try { return new QueryEndpoint( ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics, - rateLimiter, pullConcurrencyLimiter, routing, pushRouting, localCommands) + rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, routing, pushRouting, + localCommands) .createQueryPublisher( sql, properties, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index 8f2b34b84ebe..b8753c984067 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; @@ -57,10 +58,13 @@ class PullQueryPublisher implements Flow.Publisher> { private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; private final ConcurrencyLimiter concurrencyLimiter; + private final SlidingWindowRateLimiter pullBandRateLimiter; private final HARouting routing; @VisibleForTesting + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck PullQueryPublisher( + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck final KsqlEngine ksqlEngine, final ServiceContext serviceContext, final ListeningScheduledExecutorService exec, @@ -70,6 +74,7 @@ class PullQueryPublisher implements Flow.Publisher> { final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter concurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting routing ) { this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine"); @@ -81,6 +86,7 @@ class PullQueryPublisher implements Flow.Publisher> { this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory"); this.rateLimiter = requireNonNull(rateLimiter, "rateLimiter"); this.concurrencyLimiter = concurrencyLimiter; + this.pullBandRateLimiter = requireNonNull(pullBandRateLimiter, "pullBandRateLimiter"); this.routing = requireNonNull(routing, "routing"); } @@ -99,6 +105,7 @@ public synchronized void subscribe(final Subscriber> sub PullQueryExecutionUtil.checkRateLimit(rateLimiter); final Decrementer decrementer = concurrencyLimiter.increment(); + pullBandRateLimiter.allow(); PullQueryResult result = null; try { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 7eb99ff4c270..2f2e54e86793 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -22,6 +22,7 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.analyzer.PullQueryValidator; import io.confluent.ksql.api.server.MetricsCallbackHolder; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; @@ -102,6 +103,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; private final ConcurrencyLimiter concurrencyLimiter; + private final SlidingWindowRateLimiter pullBandRateLimiter; private final HARouting routing; private final PushRouting pushRouting; private final Optional localCommands; @@ -124,6 +126,7 @@ public StreamedQueryResource( final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter concurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -143,6 +146,7 @@ public StreamedQueryResource( routingFilterFactory, rateLimiter, concurrencyLimiter, + pullBandRateLimiter, routing, pushRouting, localCommands @@ -167,6 +171,7 @@ public StreamedQueryResource( final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter concurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -190,6 +195,7 @@ public StreamedQueryResource( Objects.requireNonNull(routingFilterFactory, "routingFilterFactory"); this.rateLimiter = Objects.requireNonNull(rateLimiter, "rateLimiter"); this.concurrencyLimiter = Objects.requireNonNull(concurrencyLimiter, "concurrencyLimiter"); + this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter, "pullBandRateLimiter"); this.routing = Objects.requireNonNull(routing, "routing"); this.pushRouting = pushRouting; this.localCommands = Objects.requireNonNull(localCommands, "localCommands"); @@ -222,7 +228,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, mediaType, metricsCallbackHolder, context); + isInternalRequest, mediaType, metricsCallbackHolder, context, pullBandRateLimiter); } private void throwIfNotConfigured() { @@ -253,7 +259,8 @@ private EndpointResponse handleStatement( final Optional isInternalRequest, final KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder, - final Context context + final Context context, + final SlidingWindowRateLimiter pullBandRateLimiter ) { try { authorizationValidator.ifPresent(validator -> @@ -276,7 +283,8 @@ private EndpointResponse handleStatement( request.getRequestProperties(), isInternalRequest, connectionClosedFuture, - metricsCallbackHolder + metricsCallbackHolder, + pullBandRateLimiter ); } @@ -331,7 +339,8 @@ private EndpointResponse handlePullQuery( final Map requestProperties, final Optional isInternalRequest, final CompletableFuture connectionClosedFuture, - final MetricsCallbackHolder metricsCallbackHolder + final MetricsCallbackHolder metricsCallbackHolder, + final SlidingWindowRateLimiter pullBandRateLimiter ) { // First thing, set the metrics callback so that it gets called, even if we hit an error final AtomicReference resultForMetrics = new AtomicReference<>(null); @@ -354,6 +363,7 @@ private EndpointResponse handlePullQuery( metrics.recordRowsProcessed( Optional.ofNullable(r).map(PullQueryResult::getTotalRowsProcessed).orElse(0L), sourceType, planType, routingNodeType); + pullBandRateLimiter.add(responseBytes); }); }); @@ -396,6 +406,8 @@ private EndpointResponse handlePullQuery( PullQueryExecutionUtil.checkRateLimit(rateLimiter); decrementer = concurrencyLimiter.increment(); } + pullBandRateLimiter.allow(); + final Optional optionalDecrementer = Optional.ofNullable(decrementer); try { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 8d3ef7332b8d..8f337dc9c078 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.RateLimiter; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; @@ -84,6 +85,7 @@ public class WSQueryEndpoint { private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; + private final SlidingWindowRateLimiter pullBandRateLimiter; private final HARouting routing; private final Optional localCommands; private final PushRouting pushRouting; @@ -105,6 +107,7 @@ public WSQueryEndpoint( final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting routing, final Optional localCommands, final PushRouting pushRouting @@ -128,6 +131,7 @@ public WSQueryEndpoint( routingFilterFactory, rateLimiter, pullConcurrencyLimiter, + pullBandRateLimiter, routing, localCommands, pushRouting @@ -155,6 +159,7 @@ public WSQueryEndpoint( final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting routing, final Optional localCommands, final PushRouting pushRouting @@ -185,6 +190,7 @@ public WSQueryEndpoint( this.rateLimiter = Objects.requireNonNull(rateLimiter, "rateLimiter"); this.pullConcurrencyLimiter = Objects.requireNonNull(pullConcurrencyLimiter, "pullConcurrencyLimiter"); + this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter, "pullBandRateLimiter"); this.routing = Objects.requireNonNull(routing, "routing"); this.localCommands = Objects.requireNonNull(localCommands, "localCommands"); this.pushRouting = Objects.requireNonNull(pushRouting, "pushRouting"); @@ -331,6 +337,7 @@ private void handleQuery(final RequestContext info, final Query query, routingFilterFactory, rateLimiter, pullConcurrencyLimiter, + pullBandRateLimiter, routing ); } else if (ScalablePushUtil.isScalablePushQuery( @@ -417,6 +424,7 @@ private static void startPullQueryPublisher( final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, + final SlidingWindowRateLimiter pullBandRateLimiter, final HARouting routing ) { new PullQueryPublisher( @@ -429,6 +437,7 @@ private static void startPullQueryPublisher( routingFilterFactory, rateLimiter, pullConcurrencyLimiter, + pullBandRateLimiter, routing ).subscribe(streamSubscriber); } @@ -484,6 +493,7 @@ void start( RoutingFilterFactory routingFilterFactory, RateLimiter rateLimiter, ConcurrencyLimiter pullConcurrencyLimiter, + SlidingWindowRateLimiter pullBandRateLimiter, HARouting routing ); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java new file mode 100644 index 000000000000..52377b3e444c --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java @@ -0,0 +1,223 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.integration; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.api.utils.QueryResponse; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; +import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.serde.FormatFactory; +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; +import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; +import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER2; +import io.confluent.ksql.test.util.secure.ClientTrustStore; +import io.confluent.ksql.test.util.secure.Credentials; +import io.confluent.ksql.test.util.secure.SecureKafkaHelper; +import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.PageViewDataProvider; +import io.confluent.ksql.util.VertxCompletableFuture; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.streams.StreamsConfig; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; + +@Category({IntegrationTest.class}) +public class PullBandwidthThrottleIntegrationTest { + + private static final PageViewDataProvider TEST_DATA_PROVIDER = new PageViewDataProvider(); + private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName(); + private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName(); + + private static final String AGG_TABLE = "AGG_TABLE"; + private static final Credentials NORMAL_USER = VALID_USER2; + + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.builder() + .withKafkaCluster( + EmbeddedSingleNodeKafkaCluster.newBuilder() + .withoutPlainListeners() + .withSaslSslListeners() + ).build(); + + private static final TestKsqlRestApp REST_APP = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty("security.protocol", "SASL_SSL") + .withProperty("sasl.mechanism", "PLAIN") + .withProperty("sasl.jaas.config", SecureKafkaHelper.buildJaasConfig(NORMAL_USER)) + .withProperty("ksql.query.pull.table.scan.enabled", true) + .withProperties(ClientTrustStore.trustStoreProps()) + .withProperty(KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG, 1) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON") + .build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain + .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) + .around(TEST_HARNESS) + .around(REST_APP); + private static final String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for pull queries."; + + @BeforeClass + public static void setUpClass() { + TEST_HARNESS.ensureTopics(TEST_TOPIC); + + TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, FormatFactory.JSON, FormatFactory.JSON); + + RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); + + makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " + + "SELECT PAGEID, LATEST_BY_OFFSET(USERID) AS USERID FROM " + TEST_STREAM + " GROUP BY PAGEID;" + ); + } + + @AfterClass + public static void classTearDown() { + REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); + } + + private Vertx vertx; + private WebClient client; + + @Before + public void setUp() { + vertx = Vertx.vertx(); + client = createClient(); + } + + @After + public void tearDown() { + if (client != null) { + client.close(); + } + if (vertx != null) { + vertx.close(); + } + REST_APP.getServiceContext().close(); + } + + @SuppressFBWarnings({"DLS_DEAD_LOCAL_STORE"}) + @Test + public void pullBandwidthThrottleTest() { + String veryLong = createDataSize(100000); + + String sql = "SELECT CONCAT(\'"+ veryLong + "\') as placeholder from " + AGG_TABLE + ";"; + + //the pull query should go through 2 times + for (int i = 0; i < 2; i += 1) { + AtomicReference atomicReference1 = new AtomicReference<>(); + assertThatEventually(() -> { + QueryResponse queryResponse1 = executeQuery(sql); + atomicReference1.set(queryResponse1); + return queryResponse1.rows; + }, hasSize(5)); + } + + //the third try should fail + try { + QueryResponse queryResponse3 = executeQuery(sql); + } catch (KsqlException e) { + assertEquals(RATE_LIMIT_MESSAGE, e.getMessage()); + } + } + + private static String createDataSize(int msgSize) { + StringBuilder sb = new StringBuilder(msgSize); + for (int i=0; i response = sendRequest("/query-stream", requestBody.toBuffer()); + return new QueryResponse(response.bodyAsString()); + } + + private WebClient createClient() { + WebClientOptions options = new WebClientOptions(). + setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false) + .setDefaultHost("localhost").setDefaultPort(REST_APP.getListeners().get(0).getPort()); + return WebClient.create(vertx, options); + } + + private HttpResponse sendRequest(final String uri, final Buffer requestBody) { + return sendRequest(client, uri, requestBody); + } + + private HttpResponse sendRequest(final WebClient client, final String uri, + final Buffer requestBody) { + VertxCompletableFuture> requestFuture = new VertxCompletableFuture<>(); + client + .post(uri) + .sendBuffer(requestBody, requestFuture); + try { + return requestFuture.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void makeKsqlRequest(final String sql) { + RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java new file mode 100644 index 000000000000..d89a26c2ccdf --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java @@ -0,0 +1,84 @@ +package io.confluent.ksql.api.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; + +import io.confluent.ksql.util.KsqlException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SlidingWindowRateLimiterTest { + private SlidingWindowRateLimiter limiter; + private static String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for pull queries."; + private static String TEST_SHOULD_NOT_FAIL = "This test should not throw an exception"; + + @Before + public void setUp() { + limiter = new SlidingWindowRateLimiter(1, 5L * 1000L); + } + + @Test + public void bigInitialResponse() { + Throwable exception = assertThrows(KsqlException.class, () -> { + limiter.add(0L, 1148576L); + limiter.allow(1000L); + }); + assertEquals(RATE_LIMIT_MESSAGE, exception.getMessage()); + } + + @Test + public void uniformResponsesUnderLimit() { + try { + for (long i = 0L; i < 30L; i += 1L) { + limiter.add(i * 500L, 100000L); + limiter.allow(i * 500L + 1L); + } + } catch (Exception e) { + fail(TEST_SHOULD_NOT_FAIL); + } + } + + @Test + public void uniformResponsesOverLimit() { + Throwable exception = assertThrows(KsqlException.class, () -> { + for (long i = 0L; i < 11L; i += 1L) { + limiter.add(i * 400L, 100000L); + limiter.allow(i * 400L + 1L); + } + }); + + assertEquals(RATE_LIMIT_MESSAGE, exception.getMessage()); + } + + @Test + public void justUnderForAWhileThenOverLimit() { + try { + for (long i = 0L; i < 5L; i += 1L) { + limiter.add(i * 500L, i * 100000L); + limiter.allow(i * 500L + 1L); + } + } catch (Exception e) { + fail(TEST_SHOULD_NOT_FAIL); + } + + try { + limiter.allow(3499L); + limiter.add(3500L, 80000L); + } catch (Exception e) { + fail(TEST_SHOULD_NOT_FAIL); + } + + Throwable exception = assertThrows(KsqlException.class, () -> { + for (long i = 10L; i < 18L; i += 1L) { + limiter.add(i * 600L, 140000L); + limiter.allow(i * 600L + 1L); + } + }); + + assertEquals(RATE_LIMIT_MESSAGE, exception.getMessage()); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 6d3770523b16..6add52e995fd 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.server; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -143,6 +144,8 @@ public class KsqlRestApplicationTest { @Mock private ConcurrencyLimiter concurrencyLimiter; @Mock + private SlidingWindowRateLimiter pullBandRateLimiter; + @Mock private HARouting haRouting; @Mock private PushRouting pushRouting; @@ -489,6 +492,7 @@ private void givenAppWithRestConfig(final Map restConfigMap) { routingFilterFactory, rateLimiter, concurrencyLimiter, + pullBandRateLimiter, haRouting, pushRouting, Optional.empty() diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java index db3a0dd880fd..fdf289347f3d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.server.resources; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -86,6 +87,7 @@ public void setUp() { mock(RoutingFilterFactory.class), mock(RateLimiter.class), mock(ConcurrencyLimiter.class), + mock(SlidingWindowRateLimiter.class), mock(HARouting.class), Optional.empty(), mock(PushRouting.class) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index e6e26dd50d37..08e70cb3cfe9 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server.resources.streaming; import static com.google.common.util.concurrent.RateLimiter.create; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; @@ -107,6 +108,8 @@ public class PullQueryPublisherTest { @Mock private ConcurrencyLimiter concurrencyLimiter; @Mock + private SlidingWindowRateLimiter pullBandRateLimiter; + @Mock private Decrementer decrementer; @Captor @@ -131,6 +134,7 @@ public void setUp() { routingFilterFactory, create(1), concurrencyLimiter, + pullBandRateLimiter, haRouting); when(statement.getSessionConfig()).thenReturn(sessionConfig); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index c1f45c8ee157..65fda4b19048 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -51,6 +51,7 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.StreamingOutput; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; @@ -199,6 +200,8 @@ public class StreamedQueryResourceTest { @Mock private ConcurrencyLimiter concurrencyLimiter; @Mock + private SlidingWindowRateLimiter pullBandRateLimiter; + @Mock private KsqlConfig ksqlConfig; @Mock private KsqlRestConfig ksqlRestConfig; @@ -257,6 +260,7 @@ public void setup() { routingFilterFactory, rateLimiter, concurrencyLimiter, + pullBandRateLimiter, haRouting, pushRouting, Optional.empty() @@ -345,6 +349,7 @@ public void shouldRateLimit() { routingFilterFactory, pullQueryRateLimiter, concurrencyLimiter, + pullBandRateLimiter, haRouting, pushRouting, Optional.empty() @@ -453,6 +458,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { routingFilterFactory, rateLimiter, concurrencyLimiter, + pullBandRateLimiter, haRouting, pushRouting, Optional.empty() @@ -635,6 +641,7 @@ public void shouldThrowOnDenyListedStreamProperty() { routingFilterFactory, rateLimiter, concurrencyLimiter, + pullBandRateLimiter, haRouting, pushRouting, Optional.empty()