Skip to content

Commit

Permalink
fix: configure KsqlBoundedMemoryConfigSetter in StandaloneExecutor mo…
Browse files Browse the repository at this point in the history
…de (#10175)

We missed to call method configure on the KsqlBoundedMemoryConfigSetter
when ksql is started with --queries-file. That resulted in

java.lang.IllegalStateException: Cannot use KsqlBoundedMemoryRocksDBConfigSetter before it's been configured.

GitHub issue: #10121

Cherry-pick of #10174
  • Loading branch information
cadonna committed Jan 15, 2024
1 parent 15f32da commit 7342d44
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 21 deletions.
7 changes: 7 additions & 0 deletions ksqldb-rest-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@
<version>${io.confluent.ksql.version}</version>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-rocksdb-config-setter</artifactId>
<version>${io.confluent.ksql.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-serializer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -84,7 +85,9 @@ public class StandaloneExecutor implements Executable {
private final boolean failOnNoQueries;
private final VersionCheckerAgent versionChecker;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final Consumer<KsqlConfig> rocksDBConfigSetterHandler;

@SuppressWarnings({"checkstyle:ParameterNumber"})
StandaloneExecutor(
final ServiceContext serviceContext,
final ProcessingLogConfig processingLogConfig,
Expand All @@ -95,7 +98,8 @@ public class StandaloneExecutor implements Executable {
final boolean failOnNoQueries,
final VersionCheckerAgent versionChecker,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final MetricCollectors metricCollectors
final MetricCollectors metricCollectors,
final Consumer<KsqlConfig> rocksDBConfigSetterHandler
) {
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.processingLogConfig = requireNonNull(processingLogConfig, "processingLogConfig");
Expand All @@ -107,6 +111,8 @@ public class StandaloneExecutor implements Executable {
this.versionChecker = requireNonNull(versionChecker, "versionChecker");
this.injectorFactory = requireNonNull(injectorFactory, "injectorFactory");
metricCollectors.addConfigurableReporter(ksqlConfig);
this.rocksDBConfigSetterHandler =
requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetter");
}

public void startAsync() {
Expand All @@ -120,6 +126,7 @@ public void startAsync() {
log.warn("processing log auto-create is enabled, but this is not supported "
+ "for headless mode.");
}
rocksDBConfigSetterHandler.accept(ksqlConfig);
processesQueryFile(readQueriesFile(queriesFile));
showWelcomeMessage();
final Properties properties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.server.computation.ConfigStore;
import io.confluent.ksql.rest.server.computation.KafkaConfigStore;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.services.DisabledKsqlClient;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
Expand All @@ -42,6 +43,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -69,6 +71,9 @@ public static StandaloneExecutor create(
updatedProperties.putAll(
metricCollectors.addConfluentMetricsContextConfigs(ksqlServerId, kafkaClusterId));

final Consumer<KsqlConfig> rocksDBConfigSetterHandler =
RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter;

return create(
updatedProperties,
queriesFile,
Expand All @@ -77,12 +82,14 @@ public static StandaloneExecutor create(
KafkaConfigStore::new,
KsqlVersionCheckerAgent::new,
StandaloneExecutor::new,
metricCollectors
metricCollectors,
rocksDBConfigSetterHandler
);
}

interface StandaloneExecutorConstructor {

@SuppressWarnings({"checkstyle:ParameterNumber"})
StandaloneExecutor create(
ServiceContext serviceContext,
ProcessingLogConfig processingLogConfig,
Expand All @@ -93,7 +100,8 @@ StandaloneExecutor create(
boolean failOnNoQueries,
VersionCheckerAgent versionChecker,
BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
MetricCollectors metricCollectors
MetricCollectors metricCollectors,
Consumer<KsqlConfig> rocksDBConfigSetterHandler
);
}

Expand All @@ -106,7 +114,8 @@ static StandaloneExecutor create(
final BiFunction<String, KsqlConfig, ConfigStore> configStoreFactory,
final Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
final StandaloneExecutorConstructor constructor,
final MetricCollectors metricCollectors
final MetricCollectors metricCollectors,
final Consumer<KsqlConfig> rocksDBConfigSetterHandler
) {
final KsqlConfig baseConfig = new KsqlConfig(properties);

Expand Down Expand Up @@ -156,7 +165,8 @@ static StandaloneExecutor create(
true,
versionChecker,
Injectors.NO_TOPIC_DELETE,
metricCollectors
metricCollectors,
rocksDBConfigSetterHandler
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.server.StandaloneExecutorFactory.StandaloneExecutorConstructor;
import io.confluent.ksql.rest.server.computation.ConfigStore;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -78,14 +79,15 @@ public void setup() {
when(configStoreFactory.apply(any(), any())).thenReturn(configStore);
when(topicClient.isTopicExists(configTopicName)).thenReturn(false);
when(configStore.getKsqlConfig()).thenReturn(mergedConfig);
when(constructor.create(any(), any(), any(), argumentCaptor.capture(), anyString(), any(), anyBoolean(), any(), any(), any()))
when(constructor.create(
any(), any(), any(), argumentCaptor.capture(), anyString(), any(), anyBoolean(), any(), any(), any(), any()))
.thenReturn(standaloneExecutor);
}

@After
public void tearDown() throws Exception {
verify(constructor)
.create(any(), any(), any(), engineCaptor.capture(), any(), any(), anyBoolean(), any(), any(), any());
.create(any(), any(), any(), engineCaptor.capture(), any(), any(), anyBoolean(), any(), any(), any(), any());

engineCaptor.getAllValues().forEach(KsqlEngine::close);
}
Expand All @@ -99,7 +101,8 @@ private void create() {
configStoreFactory,
activeQuerySupplier -> versionChecker,
constructor,
new MetricCollectors()
new MetricCollectors(),
RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter
);
}

Expand Down Expand Up @@ -137,7 +140,7 @@ public void shouldCreateConfigTopicThenGetConfig() {
inOrder.verify(topicClient).createTopic(eq(configTopicName), anyInt(), anyShort(), anyMap());
inOrder.verify(configStoreFactory).apply(eq(configTopicName), argThat(sameConfig(baseConfig)));
inOrder.verify(constructor).create(
any(), any(), same(mergedConfig), any(), anyString(), any(), anyBoolean(), any(), any(), any());
any(), any(), same(mergedConfig), any(), anyString(), any(), anyBoolean(), any(), any(), any(), any());

argumentCaptor.getValue().close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.server.computation.KafkaConfigStore;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
Expand All @@ -46,9 +47,11 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -103,10 +106,21 @@ public static void classSetUp() {
public void setUp() throws Exception {
queryFile = TMP.newFile().toPath();

s1 = KsqlIdentifierTestUtil.uniqueIdentifierName("S1");
s2 = KsqlIdentifierTestUtil.uniqueIdentifierName("S2");
t1 = KsqlIdentifierTestUtil.uniqueIdentifierName("T1");
}

private void setupStandaloneExecutor() {
setupStandaloneExecutor(Collections.emptyMap());
}

private void setupStandaloneExecutor(final Map<String, Object> additionalProperties) {
final Map<String, Object> properties = ImmutableMap.<String, Object>builder()
.putAll(KsqlConfigTestUtil.baseTestConfig())
.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, TEST_HARNESS.kafkaBootstrapServers())
.put(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, "http://foo:8080")
.putAll(additionalProperties)
.build();

final Function<KsqlConfig, ServiceContext> serviceContextFactory = config ->
Expand All @@ -123,22 +137,54 @@ public void setUp() throws Exception {
KafkaConfigStore::new,
activeQuerySupplier -> versionChecker,
StandaloneExecutor::new,
new MetricCollectors()
new MetricCollectors(),
RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter
);

s1 = KsqlIdentifierTestUtil.uniqueIdentifierName("S1");
s2 = KsqlIdentifierTestUtil.uniqueIdentifierName("S2");
t1 = KsqlIdentifierTestUtil.uniqueIdentifierName("T1");
}

@After
public void tearDown() throws Exception {
standalone.shutdown();
}

@Test
public void shouldConfigureKsqlBoundedMemoryRocksDBConfigSetter() {
// Given:
final Map<String, Object> additionalProperties = ImmutableMap.<String, Object>builder()
.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, "io.confluent.ksql.rocksdb.KsqlBoundedMemoryRocksDBConfigSetter")
.put("ksql.plugins.rocksdb.cache.size", 10)
.build();
setupStandaloneExecutor(additionalProperties);
givenScript(""
+ "CREATE STREAM S (ROWKEY STRING KEY, ORDERTIME BIGINT)"
+ " WITH (kafka_topic='" + JSON_TOPIC + "', value_format='json');\n"
+ "\n"
+ "SET 'auto.offset.reset' = 'earliest';"
+ "\n"
+ "CREATE TABLE " + s1 + " AS SELECT rowkey, LATEST_BY_OFFSET(ordertime) AS ordertime FROM S GROUP BY rowkey;\n");

// When:
standalone.startAsync();

final PhysicalSchema dataSchema = PhysicalSchema.from(
LogicalSchema.builder()
.keyColumn(SystemColumns.ROWKEY_NAME, SqlTypes.STRING)
.valueColumn(ColumnName.of("ORDERTIME"), SqlTypes.BIGINT)
.build(),
SerdeFeatures.of(),
SerdeFeatures.of()
);

// Then:
TEST_HARNESS.verifyAvailableRows(s1, DATA_SIZE, KAFKA, JSON, dataSchema);

standalone.shutdown();
}

@Test
public void shouldHandleJsonWithSchemas() {
// Given:
setupStandaloneExecutor();
givenScript(""
+ "CREATE STREAM S (ROWKEY STRING KEY, ORDERTIME BIGINT)"
+ " WITH (kafka_topic='" + JSON_TOPIC + "', value_format='json');\n"
Expand Down Expand Up @@ -182,6 +228,7 @@ public void shouldHandleJsonWithSchemas() {
@Test
public void shouldHandleAvroWithSchemas() {
// Given:
setupStandaloneExecutor();
givenScript(""
+ "CREATE STREAM S (ROWKEY STRING KEY, ORDERTIME BIGINT)"
+ " WITH (kafka_topic='" + AVRO_TOPIC + "', value_format='avro');\n"
Expand Down Expand Up @@ -225,6 +272,7 @@ public void shouldHandleAvroWithSchemas() {
@Test
public void shouldInferAvroSchema() {
// Given:
setupStandaloneExecutor();
givenScript(""
+ "SET 'auto.offset.reset' = 'earliest';"
+ ""
Expand All @@ -244,6 +292,7 @@ public void shouldFailOnAvroWithoutSchemasIfSchemaNotAvailable() {
// Given:
TEST_HARNESS.ensureTopics("topic-without-schema");

setupStandaloneExecutor();
givenScript(""
+ "SET 'auto.offset.reset' = 'earliest';"
+ ""
Expand All @@ -263,6 +312,7 @@ public void shouldFailOnAvroWithoutSchemasIfSchemaNotAvailable() {
@Test
public void shouldHandleComments() {
// Given:
setupStandaloneExecutor();
givenScript(""
+ "-- Single line comment\n"
+ ""
Expand Down

0 comments on commit 7342d44

Please sign in to comment.