Skip to content

Commit

Permalink
feat: pull query bandwidth based throttling (#7738)
Browse files Browse the repository at this point in the history
* first poc

* everything except ws

* refactor

* remove useless prints

* remove useless prints 2

* synchronize

* basic unit test

* clean up unit test

* arg checking

* minor cleanup

* minor cleanup checkstyle

* final clean up

* make config megabytes

* nit config name change

* spotbugs fix

Co-authored-by: Chittaranjan Prasad <>
  • Loading branch information
cprasad1 committed Jul 1, 2021
1 parent 75806a0 commit 8f01ad9
Show file tree
Hide file tree
Showing 14 changed files with 515 additions and 6 deletions.
15 changes: 15 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ public class KsqlConfig extends AbstractConfig {
"The maximum number of concurrent requests allowed for pull "
+ "queries on this host. Once the limit is hit, queries will fail immediately";

// Pull query bandwidth throttling: config key, default and doc string for the maximum
// amount of pull query response data, in megabytes, allowed over one hour.
// The default is Integer.MAX_VALUE megabytes.
public static final String KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG
= "ksql.query.pull.max.hourly.bandwidth.megabytes";
public static final Integer KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT
= Integer.MAX_VALUE;
public static final String KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC
= "The maximum amount of pull query bandwidth in megabytes allowed over"
+ " a period of one hour. Once the limit is hit, queries will fail immediately";

public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG
= "ksql.query.pull.thread.pool.size";
public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 100;
Expand Down Expand Up @@ -868,6 +876,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DOC
)
.define(
KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG,
Type.INT,
KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT,
Importance.HIGH,
KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC
)
.define(
KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -81,18 +82,22 @@ public class QueryEndpoint {
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final SlidingWindowRateLimiter pullBandRateLimiter;
private final HARouting routing;
private final PushRouting pushRouting;
private final Optional<LocalCommands> localCommands;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public QueryEndpoint(
// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final KsqlRestConfig ksqlRestConfig,
final RoutingFilterFactory routingFilterFactory,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final SlidingWindowRateLimiter pullBandLimiter,
final HARouting routing,
final PushRouting pushRouting,
final Optional<LocalCommands> localCommands
Expand All @@ -104,6 +109,7 @@ public QueryEndpoint(
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
this.pullConcurrencyLimiter = pullConcurrencyLimiter;
this.pullBandRateLimiter = pullBandLimiter;
this.routing = routing;
this.pushRouting = pushRouting;
this.localCommands = localCommands;
Expand Down Expand Up @@ -218,6 +224,7 @@ private QueryPublisher createPullQueryPublisher(
metrics.recordRowsProcessed(
Optional.ofNullable(r).map(PullQueryResult::getTotalRowsProcessed).orElse(0L),
sourceType, planType, routingNodeType);
pullBandRateLimiter.add(responseBytes);
});
});

Expand All @@ -234,6 +241,7 @@ private QueryPublisher createPullQueryPublisher(

PullQueryExecutionUtil.checkRateLimit(rateLimiter);
final Decrementer decrementer = pullConcurrencyLimiter.increment();
pullBandRateLimiter.allow();

try {
final PullQueryResult result = ksqlEngine.executePullQuery(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

import static io.confluent.ksql.util.KsqlPreconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.kafka.common.utils.Time;

/**
 * Sliding-window bandwidth limiter for pull query responses.
 *
 * <p>The limiter keeps a log of (timestamp, response size) entries, one per recorded response,
 * together with a running total of the bytes currently held in the log. Many rate limiters
 * require callers to ask for permission before access is granted; this one cannot, because the
 * size of a response is not known until the query has run. Instead, usage is always recorded
 * after the fact via {@link #add(long)}, and callers are expected to call {@link #allow()}
 * before starting new work to find out whether earlier responses have already put them over
 * the limit.
 *
 * <p>Every method that reads or modifies the log is {@code synchronized} on this instance.
 */
public class SlidingWindowRateLimiter {

  /** Number of bytes in one megabyte (1024 * 1024). */
  private static final long NUM_BYTES_IN_ONE_MEGABYTE = 1024L * 1024L;

  /**
   * The log of all responses recorded inside the current window.
   * It is a Queue of Pairs of (timestamp in milliseconds, response size in Bytes),
   * in insertion order.
   */
  private final Queue<Pair<Long, Long>> responseSizesLog;

  /**
   * Throttle limit measured in Bytes.
   */
  private final long throttleLimit;

  /**
   * Window size over which the throttle is enforced, measured in milliseconds.
   */
  private final long slidingWindowSizeMs;

  /**
   * Sum of the response sizes (in Bytes) of all entries currently held in the log.
   */
  private long numBytesInWindow;

  /**
   * Creates a limiter with an empty log.
   *
   * @param requestLimitInMB bandwidth limit in megabytes; must not be negative
   * @param slidingWindowSizeMs window length in milliseconds; must not be negative
   */
  public SlidingWindowRateLimiter(final int requestLimitInMB, final long slidingWindowSizeMs) {
    checkArgument(requestLimitInMB >= 0,
        "Pull Query bandwidth limit can't be negative.");
    checkArgument(slidingWindowSizeMs >= 0,
        "Pull Query throttle window size can't be negative");

    // Widen to long before multiplying so that large megabyte limits do not overflow int.
    this.throttleLimit = (long) requestLimitInMB * NUM_BYTES_IN_ONE_MEGABYTE;
    this.slidingWindowSizeMs = slidingWindowSizeMs;
    this.responseSizesLog = new LinkedList<>();
    this.numBytesInWindow = 0;
  }

  /**
   * Checks, using the current system time, whether the responses recorded inside the sliding
   * window have exceeded the throttle limit.
   *
   * @throws KsqlException if the throttle limit has been exceeded for pull queries
   */
  public synchronized void allow() throws KsqlException {
    this.allow(Time.SYSTEM.milliseconds());
  }

  /**
   * Evicts every log entry that is at least one window length older than {@code timestamp},
   * then throws if the bytes remaining in the window are strictly greater than the limit.
   *
   * @param timestamp the time to evaluate the window at, in milliseconds; must not be negative
   * @throws KsqlException if the throttle limit has been exceeded for pull queries
   */
  @VisibleForTesting
  protected synchronized void allow(final long timestamp) throws KsqlException {
    checkArgument(timestamp >= 0,
        "Timestamp can't be negative.");

    // Entries are appended in call order, so the oldest entry is always at the head.
    while (!responseSizesLog.isEmpty()
        && timestamp - responseSizesLog.peek().left >= slidingWindowSizeMs) {
      this.numBytesInWindow -= responseSizesLog.poll().right;
    }
    // Strictly greater: usage exactly equal to the limit is still allowed.
    if (this.numBytesInWindow > throttleLimit) {
      throw new KsqlException("Host is at bandwidth rate limit for pull queries.");
    }
  }

  /**
   * Records a response of the given size, stamped with the current system time.
   *
   * @param responseSizeInBytes pull query response size measured in Bytes; must not be negative
   */
  public synchronized void add(final long responseSizeInBytes) {
    add(Time.SYSTEM.milliseconds(), responseSizeInBytes);
  }

  /**
   * Appends a (timestamp, size) entry to the log and adds the size to the running total.
   * This method never evicts entries; stale entries are only removed by {@code allow}.
   *
   * @param timestamp the time of the response in milliseconds; must not be negative
   * @param responseSizeInBytes pull query response size measured in Bytes; must not be negative
   */
  @VisibleForTesting
  protected synchronized void add(final long timestamp, final long responseSizeInBytes) {
    checkArgument(timestamp >= 0,
        "Timestamp can't be negative.");
    checkArgument(responseSizeInBytes >= 0,
        "Response size can't be negative.");

    responseSizesLog.add(new Pair<>(timestamp, responseSizeInBytes));
    this.numBytesInWindow += responseSizeInBytes;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.api.impl.KsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.MonitoredEndpoints;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -152,6 +153,8 @@ public final class KsqlRestApplication implements Executable {

private static final Logger log = LoggerFactory.getLogger(KsqlRestApplication.class);

// One hour expressed in milliseconds (3600 seconds * 1000 ms); passed as the sliding window
// size when the pull query SlidingWindowRateLimiter is constructed in buildApplication.
private static final int NUM_MILLISECONDS_IN_HOUR = 3600 * 1000;

private static final SourceName COMMANDS_STREAM_NAME = SourceName.of("KSQL_COMMANDS");

private final KsqlConfig ksqlConfigNoPort;
Expand Down Expand Up @@ -192,6 +195,7 @@ public final class KsqlRestApplication implements Executable {
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter pullQueryRateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final SlidingWindowRateLimiter pullBandRateLimiter;
private final HARouting pullQueryRouting;
private final Optional<LocalCommands> localCommands;

Expand Down Expand Up @@ -233,6 +237,7 @@ public static SourceName getCommandsStreamName() {
final RoutingFilterFactory routingFilterFactory,
final RateLimiter pullQueryRateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final SlidingWindowRateLimiter pullBandRateLimiter,
final HARouting pullQueryRouting,
final PushRouting pushQueryRouting,
final Optional<LocalCommands> localCommands
Expand Down Expand Up @@ -291,6 +296,7 @@ public static SourceName getCommandsStreamName() {
this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory");
this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter");
this.pullConcurrencyLimiter = requireNonNull(pullConcurrencyLimiter, "pullConcurrencyLimiter");
this.pullBandRateLimiter = requireNonNull(pullBandRateLimiter, "pullBandRateLimiter");
this.pullQueryRouting = requireNonNull(pullQueryRouting, "pullQueryRouting");
this.pushQueryRouting = pushQueryRouting;
this.localCommands = requireNonNull(localCommands, "localCommands");
Expand Down Expand Up @@ -336,6 +342,7 @@ public void startAsync() {
routingFilterFactory,
pullQueryRateLimiter,
pullConcurrencyLimiter,
pullBandRateLimiter,
pullQueryRouting,
localCommands,
pushQueryRouting
Expand All @@ -362,6 +369,7 @@ public void startAsync() {
pullQueryMetrics,
pullQueryRateLimiter,
pullConcurrencyLimiter,
pullBandRateLimiter,
pullQueryRouting,
pushQueryRouting,
localCommands
Expand Down Expand Up @@ -745,7 +753,9 @@ static KsqlRestApplication buildApplication(
final ConcurrencyLimiter pullQueryConcurrencyLimiter = new ConcurrencyLimiter(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_CONFIG),
"pull queries");

final SlidingWindowRateLimiter pullBandRateLimiter = new SlidingWindowRateLimiter(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG),
NUM_MILLISECONDS_IN_HOUR);
final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(
ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));

Expand Down Expand Up @@ -779,6 +789,7 @@ static KsqlRestApplication buildApplication(
routingFilterFactory,
pullQueryRateLimiter,
pullQueryConcurrencyLimiter,
pullBandRateLimiter,
pullQueryRouting,
pushQueryRouting,
localCommands
Expand Down Expand Up @@ -857,6 +868,7 @@ static KsqlRestApplication buildApplication(
routingFilterFactory,
pullQueryRateLimiter,
pullQueryConcurrencyLimiter,
pullBandRateLimiter,
pullQueryRouting,
pushQueryRouting,
localCommands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class KsqlServerEndpoints implements Endpoints {
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final SlidingWindowRateLimiter pullBandRateLimiter;
private final HARouting routing;
private final PushRouting pushRouting;
private final Optional<LocalCommands> localCommands;
Expand All @@ -113,6 +115,7 @@ public KsqlServerEndpoints(
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final SlidingWindowRateLimiter pullBandRateLimiter,
final HARouting routing,
final PushRouting pushRouting,
final Optional<LocalCommands> localCommands
Expand All @@ -138,6 +141,7 @@ public KsqlServerEndpoints(
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics);
this.rateLimiter = Objects.requireNonNull(rateLimiter);
this.pullConcurrencyLimiter = pullConcurrencyLimiter;
this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter);
this.routing = Objects.requireNonNull(routing);
this.pushRouting = pushRouting;
this.localCommands = Objects.requireNonNull(localCommands);
Expand All @@ -158,7 +162,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
try {
return new QueryEndpoint(
ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics,
rateLimiter, pullConcurrencyLimiter, routing, pushRouting, localCommands)
rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, routing, pushRouting,
localCommands)
.createQueryPublisher(
sql,
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.PullQueryExecutionUtil;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
Expand Down Expand Up @@ -57,10 +58,13 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
private final RoutingFilterFactory routingFilterFactory;
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter concurrencyLimiter;
private final SlidingWindowRateLimiter pullBandRateLimiter;
private final HARouting routing;

@VisibleForTesting
// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
PullQueryPublisher(
// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
final KsqlEngine ksqlEngine,
final ServiceContext serviceContext,
final ListeningScheduledExecutorService exec,
Expand All @@ -70,6 +74,7 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
final RoutingFilterFactory routingFilterFactory,
final RateLimiter rateLimiter,
final ConcurrencyLimiter concurrencyLimiter,
final SlidingWindowRateLimiter pullBandRateLimiter,
final HARouting routing
) {
this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine");
Expand All @@ -81,6 +86,7 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory");
this.rateLimiter = requireNonNull(rateLimiter, "rateLimiter");
this.concurrencyLimiter = concurrencyLimiter;
this.pullBandRateLimiter = requireNonNull(pullBandRateLimiter, "pullBandRateLimiter");
this.routing = requireNonNull(routing, "routing");
}

Expand All @@ -99,6 +105,7 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub

PullQueryExecutionUtil.checkRateLimit(rateLimiter);
final Decrementer decrementer = concurrencyLimiter.increment();
pullBandRateLimiter.allow();

PullQueryResult result = null;
try {
Expand Down

0 comments on commit 8f01ad9

Please sign in to comment.