Skip to content

Commit

Permalink
fix: dont throw persistent query limit errors for non-queries (#9588)
Browse files Browse the repository at this point in the history
* fix: dont throw persistent query limit errors for non-queries

* remove unused mocks

* increase number of persistent queries

* add a comment

* update test
  • Loading branch information
Zara Lim committed Oct 3, 2022
1 parent 17b2a04 commit f4375f1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 11 deletions.
Expand Up @@ -126,16 +126,17 @@ public int validate(
SessionConfig.of(ksqlConfig, sessionProperties.getMutableScopedProperties())
);

numPersistentQueries +=
validate(
serviceContext,
configured,
sessionProperties,
ctx,
injector
);

if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig)) {
final int currNumPersistentQueries = validate(
serviceContext,
configured,
sessionProperties,
ctx,
injector
);
numPersistentQueries += currNumPersistentQueries;

if (currNumPersistentQueries > 0
&& QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig)) {
QueryCapacityUtil.throwTooManyActivePersistentQueriesException(ctx, ksqlConfig, sql);
}
}
Expand Down
Expand Up @@ -1932,6 +1932,29 @@ public void shouldFailAllCommandsIfWouldReachActivePersistentQueriesLimit() {
verify(commandStore, never()).enqueueCommand(any(), any(), any());
}

@Test
public void shouldRejectQueryButAcceptNonQueryWhenKsqlRestartsWithLowerQueryLimit() {
// Given 6 queries already running:
givenPersistentQueryCount(6);

// When we restart ksql with a lower persistent query count
givenKsqlConfigWith(
ImmutableMap.of(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, 3));
givenMockEngine();

// When/Then:
makeSingleRequest("SHOW STREAMS;", StreamsList.class);

// No further queries can be made
final KsqlErrorMessage result = makeFailingRequest(
"CREATE STREAM " + streamName + " AS SELECT * FROM test_stream;", BAD_REQUEST.code());
assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_BAD_REQUEST));
assertThat(result.getMessage(),
containsString("would cause the number of active, persistent queries "
+ "to exceed the configured limit"));
verify(commandStore, never()).enqueueCommand(any(), any(), any());
}

@Test
public void shouldListPropertiesWithOverrides() {
// Given:
Expand Down
Expand Up @@ -47,6 +47,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.Explain;
import io.confluent.ksql.parser.tree.ListStreams;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.server.computation.ValidatedCommandFactory;
Expand Down Expand Up @@ -257,6 +258,21 @@ public void shouldThrowIfTooManyPersistentQueries() {
"persistent queries to exceed the configured limit"));
}

@Test
public void shouldNotThrowIfNotQueryDespiteTooManyPersistentQueries() {
// Given:
givenPersistentQueryCount(2);
givenRequestValidator(ImmutableMap.of(ListStreams.class, StatementValidator.NO_VALIDATION));

final List<ParsedStatement> statements =
givenParsed(
"SHOW STREAMS;"
);

// When/Then:
validator.validate(serviceContext, statements, sessionProperties, "sql");
}

@Test
public void shouldNotThrowIfManyNonPersistentQueries() {
// Given:
Expand All @@ -265,7 +281,6 @@ public void shouldNotThrowIfManyNonPersistentQueries() {
CreateStream.class, StatementValidator.NO_VALIDATION,
Explain.class, StatementValidator.NO_VALIDATION)
);
when(ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)).thenReturn(1);

final List<ParsedStatement> statements =
givenParsed(
Expand Down

0 comments on commit f4375f1

Please sign in to comment.