Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit the number of active push queries everywhere using "ksql.max.push.queries" config #7109

Merged
merged 8 commits into from Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -33,9 +33,11 @@
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigPlannerOptions;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions;
import io.confluent.ksql.rest.util.QueryCapacityUtil;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.services.ServiceContext;
Expand All @@ -60,6 +62,7 @@ public class QueryEndpoint {

private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final KsqlRestConfig ksqlRestConfig;
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
Expand All @@ -69,6 +72,7 @@ public class QueryEndpoint {
public QueryEndpoint(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final KsqlRestConfig ksqlRestConfig,
final RoutingFilterFactory routingFilterFactory,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter,
Expand All @@ -77,6 +81,7 @@ public QueryEndpoint(
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.ksqlRestConfig = ksqlRestConfig;
this.routingFilterFactory = routingFilterFactory;
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
Expand Down Expand Up @@ -113,6 +118,14 @@ private QueryPublisher createPushQueryPublisher(
) {
final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);

if (QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig)) {
QueryCapacityUtil.throwTooManyActivePushQueriesException(
ksqlEngine,
ksqlRestConfig,
statement.getStatementText()
);
}

final TransientQueryMetadata queryMetadata = ksqlEngine
.executeQuery(serviceContext, statement, true);

Expand Down
Expand Up @@ -336,6 +336,7 @@ public void startAsync() {
final Endpoints endpoints = new KsqlServerEndpoints(
ksqlEngine,
ksqlConfigNoPort,
restConfig,
routingFilterFactory,
ksqlSecurityContextProvider,
ksqlResource,
Expand Down Expand Up @@ -747,6 +748,7 @@ static KsqlRestApplication buildApplication(

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlEngine,
restConfig,
commandStore,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
Expand Down
Expand Up @@ -68,6 +68,7 @@ public class KsqlServerEndpoints implements Endpoints {

private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final KsqlRestConfig ksqlRestConfig;
private final RoutingFilterFactory routingFilterFactory;
private final ReservedInternalTopics reservedInternalTopics;
private final KsqlSecurityContextProvider ksqlSecurityContextProvider;
Expand All @@ -90,6 +91,7 @@ public class KsqlServerEndpoints implements Endpoints {
public KsqlServerEndpoints(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final KsqlRestConfig ksqlRestConfig,
final RoutingFilterFactory routingFilterFactory,
final KsqlSecurityContextProvider ksqlSecurityContextProvider,
final KsqlResource ksqlResource,
Expand All @@ -111,6 +113,7 @@ public KsqlServerEndpoints(
// CHECKSTYLE_RULES.ON: ParameterNumber
this.ksqlEngine = Objects.requireNonNull(ksqlEngine);
this.ksqlConfig = Objects.requireNonNull(ksqlConfig);
this.ksqlRestConfig = Objects.requireNonNull(ksqlRestConfig);
this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory);
this.reservedInternalTopics = new ReservedInternalTopics(ksqlConfig);
this.ksqlSecurityContextProvider = Objects.requireNonNull(ksqlSecurityContextProvider);
Expand Down Expand Up @@ -141,8 +144,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
return executeOnWorker(() -> {
try {
return new QueryEndpoint(
ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, rateLimiter, routing,
localCommands)
ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics,
rateLimiter, routing, localCommands)
.createQueryPublisher(
sql,
properties,
Expand Down
Expand Up @@ -38,12 +38,14 @@
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.QueryCapacityUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
Expand Down Expand Up @@ -90,10 +92,12 @@ public class StreamedQueryResource implements KsqlConfigurable {
private final Optional<LocalCommands> localCommands;

private KsqlConfig ksqlConfig;
private KsqlRestConfig ksqlRestConfig;

@SuppressWarnings("checkstyle:ParameterNumber")
public StreamedQueryResource(
final KsqlEngine ksqlEngine,
final KsqlRestConfig ksqlRestConfig,
final CommandQueue commandQueue,
final Duration disconnectCheckInterval,
final Duration commandQueueCatchupTimeout,
Expand All @@ -109,6 +113,7 @@ public StreamedQueryResource(
) {
this(
ksqlEngine,
ksqlRestConfig,
new StatementParser(ksqlEngine),
commandQueue,
disconnectCheckInterval,
Expand All @@ -130,6 +135,7 @@ public StreamedQueryResource(
StreamedQueryResource(
// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
final KsqlEngine ksqlEngine,
final KsqlRestConfig ksqlRestConfig,
final StatementParser statementParser,
final CommandQueue commandQueue,
final Duration disconnectCheckInterval,
Expand All @@ -145,6 +151,7 @@ public StreamedQueryResource(
final Optional<LocalCommands> localCommands
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.ksqlRestConfig = Objects.requireNonNull(ksqlRestConfig, "ksqlRestConfig");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.disconnectCheckInterval =
Expand Down Expand Up @@ -353,6 +360,14 @@ private EndpointResponse handlePushQuery(
final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, SessionConfig.of(ksqlConfig, streamsProperties));

if (QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig)) {
QueryCapacityUtil.throwTooManyActivePushQueriesException(
ksqlEngine,
ksqlRestConfig,
statement.getStatementText()
);
}

final TransientQueryMetadata query = ksqlEngine
.executeQuery(serviceContext, configured, false);

Expand Down
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.util;

import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;

Expand Down Expand Up @@ -53,4 +54,40 @@ public static void throwTooManyActivePersistentQueriesException(
private static int getQueryLimit(final KsqlConfig ksqlConfig) {
return ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG);
}

public static boolean exceedsPushQueryCapacity(
final KsqlExecutionContext executionContext,
final KsqlRestConfig ksqlRestConfig
) {
return getNumLivePushQueries(executionContext) >= getPushQueryLimit(ksqlRestConfig);
}

public static void throwTooManyActivePushQueriesException(
final KsqlExecutionContext executionContext,
final KsqlRestConfig ksqlRestConfig,
final String statementStr
) {
throw new KsqlException(
String.format(
"Not executing statement(s) '%s' as it would cause the number "
+ "of active, push queries to exceed the configured limit. "
+ "Terminate existing PUSH queries, "
+ "or increase the '%s' setting via the 'ksql-server.properties' file. "
+ "Current push query count: %d. Configured limit: %d.",
statementStr,
KsqlRestConfig.MAX_PUSH_QUERIES,
getNumLivePushQueries(executionContext),
getPushQueryLimit(ksqlRestConfig)
)
);
}

private static int getNumLivePushQueries(final KsqlExecutionContext ctx) {
return ctx.getAllLiveQueries().size() - ctx.getPersistentQueries().size();
}

private static int getPushQueryLimit(final KsqlRestConfig ksqlRestConfig) {
return ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES);
}

}
Expand Up @@ -15,40 +15,12 @@

package io.confluent.ksql.rest.server.resources.streaming;

import static io.confluent.ksql.GenericRow.genericRow;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS;
import static io.confluent.ksql.rest.Errors.badRequest;
import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode;
import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage;
import static io.confluent.ksql.rest.entity.KsqlStatementErrorMessageMatchers.statement;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatementErrorMessage;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.GenericRow;
import static io.confluent.ksql.GenericRow.genericRow;
import io.confluent.ksql.api.server.StreamingOutput;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
Expand All @@ -71,17 +43,27 @@
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS;
import static io.confluent.ksql.rest.Errors.badRequest;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode;
import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import static io.confluent.ksql.rest.entity.KsqlStatementErrorMessageMatchers.statement;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.entity.StreamedRow.DataRow;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatementErrorMessage;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode;
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
Expand All @@ -97,6 +79,9 @@
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata.ResultType;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
import java.io.EOFException;
import java.io.IOException;
import java.io.PipedInputStream;
Expand Down Expand Up @@ -124,11 +109,26 @@
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.codehaus.plexus.util.StringUtils;
import org.hamcrest.CoreMatchers;
import static org.hamcrest.MatcherAssert.assertThat;
import org.hamcrest.Matchers;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import org.mockito.Mock;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -187,6 +187,8 @@ public class StreamedQueryResourceTest {
@Mock
private KsqlConfig ksqlConfig;
@Mock
private KsqlRestConfig ksqlRestConfig;
@Mock
private PullQueryResult pullQueryResult;
@Mock
private LogicalSchema schema;
Expand Down Expand Up @@ -218,6 +220,7 @@ public void setup() {

testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,
Expand Down Expand Up @@ -301,6 +304,7 @@ public void shouldRateLimit() {

testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,
Expand Down Expand Up @@ -347,6 +351,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() {
// Given:
testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,
Expand Down Expand Up @@ -512,6 +517,7 @@ public void shouldThrowOnDenyListedStreamProperty() {
when(mockStatementParser.<Query>parseSingleStatement(PULL_QUERY_STRING)).thenReturn(query);
testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,
Expand Down Expand Up @@ -625,6 +631,8 @@ public void shouldStreamRowsCorrectly() throws Throwable {
.of(query, SessionConfig.of(VALID_CONFIG, requestStreamsProperties)), false))
.thenReturn(transientQueryMetadata);

when(ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES)).thenReturn(Integer.MAX_VALUE);

final EndpointResponse response =
testResource.streamQuery(
securityContext,
Expand Down