From a0be1d582b5cc27d01de98ab745d0559e9ad5ff9 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg <57688982+AlanConfluent@users.noreply.github.com> Date: Wed, 1 Sep 2021 13:18:17 -0700 Subject: [PATCH] fix: Removes config check to insert SPQ Processor (#8062) * fix: Adds a new config check to insert SPQ Processor --- .../java/io/confluent/ksql/util/KsqlConfig.java | 14 ++++++++++++++ .../physical/scalablepush/ProcessingQueue.java | 2 +- .../io/confluent/ksql/query/QueryExecutor.java | 2 +- .../ksql/test/rest/RestQueryTranslationTest.java | 1 + .../ksql/rest/integration/RestApiTest.java | 1 + .../ScalablePushQueryFunctionalTest.java | 2 ++ 6 files changed, 20 insertions(+), 2 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 458628db31da..8ac667ff3e1c 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -285,6 +285,13 @@ public class KsqlConfig extends AbstractConfig { + "functions, aggregations, or joins, but may include projections and filters."; public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false; + public static final String KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED + = "ksql.query.push.scalable.registry.installed"; + public static final String KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DOC = + "Enables whether scalable push registry should be installed. This is a requirement of " + + "enabling scalable push queries using ksql.query.push.scalable.enabled."; + public static final boolean KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DEFAULT = false; + public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY = "ksql.query.push.scalable.new.node.continuity"; public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC = @@ -923,6 +930,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC ) + .define( + KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, + Type.BOOLEAN, + KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DEFAULT, + Importance.LOW, + KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DOC + ) .define( KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY, Type.BOOLEAN, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java index c345235e3edf..3e982db07992 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java @@ -30,7 +30,7 @@ */ public class ProcessingQueue { - static final int BLOCKING_QUEUE_CAPACITY = 100; + static final int BLOCKING_QUEUE_CAPACITY = 1000; private final Deque rowQueue; private final QueryId queryId; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 1815fcbceda7..ff916677af3e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -225,7 +225,7 @@ private static Optional applyScalablePushProcessor( final Map streamsProperties, final KsqlConfig ksqlConfig ) { - if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED)) { + if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED)) { return Optional.empty(); } final KStream stream; diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java index 018204d938cd..653d12bf6650 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java @@ -102,6 +102,7 @@ public class RestQueryTranslationTest { .withProperty(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, "set") .withProperty(KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED, true) .withProperty(KsqlConfig.KSQL_QUERY_PULL_INTERPRETER_ENABLED, true) + .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true) .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true) .withStaticServiceContext(TEST_HARNESS::getServiceContext) .build(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index 4d9cf0045e60..fa5d29427aa5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -215,6 +215,7 @@ public class RestApiTest { .withProperty("sasl.mechanism", "PLAIN") .withProperty("sasl.jaas.config", SecureKafkaHelper.buildJaasConfig(NORMAL_USER)) .withProperties(ClientTrustStore.trustStoreProps()) + .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true) .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true) .build(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ScalablePushQueryFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ScalablePushQueryFunctionalTest.java index 856838a6b17b..fbcc99dd0fae 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ScalablePushQueryFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/ScalablePushQueryFunctionalTest.java @@ -98,6 +98,7 @@ public class ScalablePushQueryFunctionalTest { .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8088") .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8088") .withProperty(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS, true) + .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true) .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true) // Make rebalances happen quicker for the sake of the test .withProperty(KSQL_STREAMS_PREFIX + "max.poll.interval.ms", 5000) @@ -112,6 +113,7 @@ public class ScalablePushQueryFunctionalTest { .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8089") .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8089") .withProperty(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS, true) + .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true) .withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true) // Make rebalances happen quicker for the sake of the test .withProperty(KSQL_STREAMS_PREFIX + "max.poll.interval.ms", 5000)