Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit the number of active push queries everywhere using "ksql.max.push.queries" config #7109

Merged
merged 8 commits into from Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -33,9 +33,11 @@
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigPlannerOptions;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions;
import io.confluent.ksql.rest.util.QueryCapacityUtil;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.services.ServiceContext;
Expand All @@ -60,6 +62,7 @@ public class QueryEndpoint {

private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final KsqlRestConfig ksqlRestConfig;
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
Expand All @@ -77,6 +80,7 @@ public QueryEndpoint(
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.ksqlRestConfig = new KsqlRestConfig(ksqlConfig.originals());
cprasad1 marked this conversation as resolved.
Show resolved Hide resolved
this.routingFilterFactory = routingFilterFactory;
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
Expand Down Expand Up @@ -113,6 +117,14 @@ private QueryPublisher createPushQueryPublisher(
) {
final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);

if (QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig)) {
QueryCapacityUtil.throwTooManyActivePushQueriesException(
ksqlEngine,
ksqlRestConfig,
statement.getStatementText()
);
}

final TransientQueryMetadata queryMetadata = ksqlEngine
.executeQuery(serviceContext, statement, true);

Expand Down
Expand Up @@ -38,12 +38,14 @@
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.QueryCapacityUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
Expand Down Expand Up @@ -90,6 +92,7 @@ public class StreamedQueryResource implements KsqlConfigurable {
private final Optional<LocalCommands> localCommands;

private KsqlConfig ksqlConfig;
private KsqlRestConfig ksqlRestConfig;

@SuppressWarnings("checkstyle:ParameterNumber")
public StreamedQueryResource(
Expand Down Expand Up @@ -172,6 +175,7 @@ public void configure(final KsqlConfig config) {
}

ksqlConfig = config;
ksqlRestConfig = new KsqlRestConfig(ksqlConfig.originals());
cprasad1 marked this conversation as resolved.
Show resolved Hide resolved
}

public EndpointResponse streamQuery(
Expand Down Expand Up @@ -353,6 +357,14 @@ private EndpointResponse handlePushQuery(
final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, SessionConfig.of(ksqlConfig, streamsProperties));

if (QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig)) {
QueryCapacityUtil.throwTooManyActivePushQueriesException(
ksqlEngine,
ksqlRestConfig,
statement.getStatementText()
);
}

final TransientQueryMetadata query = ksqlEngine
.executeQuery(serviceContext, configured, false);

Expand Down
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.util;

import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;

Expand Down Expand Up @@ -53,4 +54,40 @@ public static void throwTooManyActivePersistentQueriesException(
private static int getQueryLimit(final KsqlConfig ksqlConfig) {
return ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG);
}

public static boolean exceedsPushQueryCapacity(
final KsqlExecutionContext executionContext,
final KsqlRestConfig ksqlRestConfig
) {
return getNumLivePushQueries(executionContext) >= getPushQueryLimit(ksqlRestConfig);
}

public static void throwTooManyActivePushQueriesException(
final KsqlExecutionContext executionContext,
final KsqlRestConfig ksqlRestConfig,
final String statementStr
) {
throw new KsqlException(
String.format(
"Not executing statement(s) '%s' as it would cause the number "
+ "of active, push queries to exceed the configured limit. "
+ "Terminate existing PUSH queries, "
+ "or increase the '%s' setting via the 'ksql-server.properties' file. "
+ "Current push query count: %d. Configured limit: %d.",
statementStr,
KsqlRestConfig.MAX_PUSH_QUERIES,
getNumLivePushQueries(executionContext),
getPushQueryLimit(ksqlRestConfig)
)
);
}

private static int getNumLivePushQueries(final KsqlExecutionContext ctx) {
return ctx.getAllLiveQueries().size() - ctx.getPersistentQueries().size();
}

private static int getPushQueryLimit(final KsqlRestConfig ksqlRestConfig) {
return ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES);
}

}
Expand Up @@ -15,21 +15,22 @@

package io.confluent.ksql.rest.util;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -39,6 +40,8 @@ public class QueryCapacityUtilTest {
private KsqlEngine ksqlEngine;
@Mock
private KsqlConfig ksqlConfig;
@Mock
private KsqlRestConfig ksqlRestConfig;

@Test
public void shouldReportCapacityExceededIfOverLimit() {
Expand Down Expand Up @@ -86,6 +89,54 @@ public void shouldThrowWhenAsked() {
+ "Current persistent query count: 3. Configured limit: 2."));
}

@Test
public void shouldReportPushQueryCapacityExceededIfOverLimit() {
// Given:
givenAllLiveQueries(10);
givenActivePersistentQueries(4);
givenPushQueryLimit(3);

// Then:
assertThat(QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig),
equalTo(true));
}

@Test
public void shouldReportPushQueryAtCapacityLimit() {
// Given:
givenAllLiveQueries(10);
givenActivePersistentQueries(4);
givenPushQueryLimit(6);

// Then:
assertThat(QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig),
equalTo(true));
}

@Test
public void shouldThrowWhenPushQueryLimitExceeded() {
// Given:
final String statementStr = "my statement";
givenAllLiveQueries(10);
givenActivePersistentQueries(4);
givenPushQueryLimit(3);

// When:
final KsqlException e = assertThrows(
KsqlException.class,
() -> QueryCapacityUtil.throwTooManyActivePushQueriesException(ksqlEngine, ksqlRestConfig, statementStr)
);

// Then:
assertThat(e.getMessage(), containsString(
"Not executing statement(s) 'my statement' as it would cause the number "
+ "of active, push queries to exceed the configured limit. "
+ "Terminate existing PUSH queries, "
+ "or increase the 'ksql.max.push.queries' setting "
+ "via the 'ksql-server.properties' file. "
+ "Current push query count: 6. Configured limit: 3."));
}

@SuppressWarnings("unchecked")
private void givenActivePersistentQueries(final int numQueries) {
final List<PersistentQueryMetadata> queries = mock(List.class);
Expand All @@ -98,4 +149,15 @@ private void givenQueryLimit(final int queryLimit) {
when(ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG))
.thenReturn(queryLimit);
}

private void givenAllLiveQueries(final int numLiveQueries) {
final List<QueryMetadata> queries = mock(List.class);
when(queries.size()).thenReturn(numLiveQueries);
when(ksqlEngine.getAllLiveQueries()).thenReturn(queries);
}

private void givenPushQueryLimit(final int pushQueryLimit) {
when(ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES))
.thenReturn(pushQueryLimit);
}
}