diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 5c402aecbe9..ab3a8067e40 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -972,7 +972,7 @@ public void shouldListQueries() throws ExecutionException, InterruptedException hasProperty("queryType", is(QueryType.PERSISTENT)), hasProperty("id", startsWith("CTAS_" + AGG_TABLE)), hasProperty("sql", is( - "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" + "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n" + " " + TEST_STREAM + ".K K,\n" + " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n" + "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n" @@ -1005,7 +1005,7 @@ public void shouldDescribeSource() throws Exception { assertThat(description.readQueries().get(0).getQueryType(), is(QueryType.PERSISTENT)); assertThat(description.readQueries().get(0).getId(), startsWith("CTAS_" + AGG_TABLE)); assertThat(description.readQueries().get(0).getSql(), is( - "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" + "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n" + " " + TEST_STREAM + ".K K,\n" + " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n" + "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n" diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java b/ksqldb-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java index a855823c73d..d3cd3a1c6d4 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java @@ -32,6 +32,7 @@ public final class CommonCreateConfigs { public static final String KAFKA_TOPIC_NAME_PROPERTY = "KAFKA_TOPIC"; public static final String SOURCE_NUMBER_OF_PARTITIONS = "PARTITIONS"; public static final String SOURCE_NUMBER_OF_REPLICAS = "REPLICAS"; + public static final String SOURCE_TOPIC_RETENTION_IN_MS = "RETENTION_MS"; // Timestamp Props: public static final String TIMESTAMP_NAME_PROPERTY = "TIMESTAMP"; @@ -86,6 +87,14 @@ public static void addToConfigDef( + "Kafka cluster configuration for replicas will be used for creating a new " + "topic." ) + .define( + SOURCE_TOPIC_RETENTION_IN_MS, + ConfigDef.Type.LONG, + null, + Importance.MEDIUM, + "The retention in milliseconds of the backing topic. If this property is " + + "not set, the default value of 7 days will be used for creating a new topic."
+ ) .define( VALUE_FORMAT_PROPERTY, ConfigDef.Type.STRING, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 2dc2b5b0073..81e6d2569c4 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -97,8 +98,10 @@ public void createTopic( final Map configs, final CreateTopicsOptions createOptions ) { + final Optional retentionMs = KafkaTopicClient.getRetentionMs(configs); + if (isTopicExists(topic)) { - validateTopicProperties(topic, numPartitions, replicationFactor); + validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs); return; } @@ -128,9 +131,9 @@ public void createTopic( } catch (final TopicExistsException e) { // if the topic already exists, it is most likely because another node just created it. - // ensure that it matches the partition count and replication factor before returning - // success - validateTopicProperties(topic, numPartitions, replicationFactor); + // ensure that it matches the partition count, replication factor, and retention + // before returning success + validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs); } catch (final TopicAuthorizationException e) { throw new KsqlTopicAuthorizationException( @@ -383,14 +386,22 @@ private static boolean isInternalTopic(final String topicName, final String appl private void validateTopicProperties( final String topic, final int requiredNumPartition, - final int requiredNumReplicas + final int requiredNumReplicas, + final Optional requiredRetentionMs ) { final TopicDescription existingTopic = describeTopic(topic); + final Map existingConfig = getTopicConfig(topic); TopicValidationUtil - .validateTopicProperties(requiredNumPartition, requiredNumReplicas, existingTopic); + .validateTopicProperties( + requiredNumPartition, + requiredNumReplicas, + requiredRetentionMs, + existingTopic, + existingConfig); LOG.debug( - "Did not create topic {} with {} partitions and replication-factor {} since it exists", - topic, requiredNumPartition, requiredNumReplicas); + "Did not create topic {} with {} partitions, replication-factor {}, " + + "and retention {} since it exists", + topic, requiredNumPartition, requiredNumReplicas, requiredRetentionMs); } private Map topicConfig( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java index 66fb57628d0..c72bfa77e70 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -61,6 +62,7 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate, methodParams(String.class, int.class, short.class, Map.class), sandbox) .forward("isTopicExists", methodParams(String.class), sandbox) 
.forward("describeTopic", methodParams(String.class), sandbox) + .forward("getTopicConfig", methodParams(String.class), sandbox) .forward("describeTopics", methodParams(Collection.class), sandbox) .forward("deleteTopics", methodParams(Collection.class), sandbox) .forward("listTopicsStartOffsets", methodParams(Collection.class), sandbox) @@ -74,6 +76,7 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate, private final Supplier adminClient; private final Map createdTopics = new HashMap<>(); + private final Map> createdTopicsConfig = new HashMap<>(); private SandboxedKafkaTopicClient(final KafkaTopicClient delegate, final Supplier sharedAdminClient) { @@ -96,7 +99,8 @@ private void createTopic( final Map configs ) { if (isTopicExists(topic)) { - validateTopicProperties(topic, numPartitions, replicationFactor); + final Optional retentionMs = KafkaTopicClient.getRetentionMs(configs); + validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs); return; } @@ -125,6 +129,8 @@ private void createTopic( partitions, Sets.newHashSet(AclOperation.READ, AclOperation.WRITE) )); + + createdTopicsConfig.put(topic, toStringConfigs(configs)); } private short getDefaultClusterReplication() { @@ -170,6 +176,13 @@ private Map describeTopics(final Collection to return descriptions; } + public Map getTopicConfig(final String topicName) { + if (createdTopicsConfig.containsKey(topicName)) { + return createdTopicsConfig.get(topicName); + } + return delegate.getTopicConfig(topicName); + } + private void deleteTopics(final Collection topicsToDelete) { topicsToDelete.forEach(createdTopics::remove); } @@ -177,11 +190,18 @@ private void deleteTopics(final Collection topicsToDelete) { private void validateTopicProperties( final String topic, final int requiredNumPartition, - final int requiredNumReplicas + final int requiredNumReplicas, + final Optional requiredRetentionMs ) { final TopicDescription existingTopic = describeTopic(topic); + final Map existingConfig = getTopicConfig(topic); TopicValidationUtil - .validateTopicProperties(requiredNumPartition, requiredNumReplicas, existingTopic); + .validateTopicProperties( + requiredNumPartition, + requiredNumReplicas, + requiredRetentionMs, + existingTopic, + existingConfig); } private Map listTopicsStartOffsets(final Collection topics) { @@ -191,4 +211,9 @@ private Map listTopicsStartOffsets(final Collection listTopicsEndOffsets(final Collection topics) { return delegate.listTopicsEndOffsets(topics); } + + private static Map toStringConfigs(final Map configs) { + return configs.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/TopicValidationUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/TopicValidationUtil.java index 97f5898b55c..807a306bcf3 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/TopicValidationUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/TopicValidationUtil.java @@ -17,6 +17,8 @@ import io.confluent.ksql.exception.KafkaTopicExistsException; import io.confluent.ksql.topic.TopicProperties; +import java.util.Map; +import java.util.Optional; import org.apache.kafka.clients.admin.TopicDescription; final class TopicValidationUtil { @@ -28,40 +30,73 @@ private TopicValidationUtil() { public static void validateTopicProperties( final int requiredNumPartition, final int requiredNumReplicas, - final TopicDescription existingTopic + final Optional 
requiredRetentionMs, + final TopicDescription existingTopic, + final Map existingConfig ) { final int actualNumPartitions = existingTopic.partitions().size(); final int actualNumReplicas = existingTopic.partitions().get(0).replicas().size(); + final Optional actualRetentionMs = KafkaTopicClient.getRetentionMs(existingConfig); final String topicName = existingTopic.name(); validateTopicProperties( topicName, requiredNumPartition, requiredNumReplicas, + requiredRetentionMs, actualNumPartitions, - actualNumReplicas); + actualNumReplicas, + actualRetentionMs); } public static void validateTopicProperties( final String topicName, final int requiredNumPartition, final int requiredNumReplicas, + final Optional requiredRetentionMs, final int actualNumPartitions, - final int actualNumReplicas + final int actualNumReplicas, + final Optional actualRetentionMs ) { - if (actualNumPartitions != requiredNumPartition - || (requiredNumReplicas != TopicProperties.DEFAULT_REPLICAS - && actualNumReplicas < requiredNumReplicas)) { - throw new KafkaTopicExistsException(String.format( - "A Kafka topic with the name '%s' already exists, with different partition/replica " - + "configuration than required. KSQL expects %d partitions (topic has %d), and %d " - + "replication factor (topic has %d).", + if (isInvalidPartitions(actualNumPartitions, requiredNumPartition) + || isInvalidReplicas(actualNumReplicas, requiredNumReplicas) + || isInvalidRetention(actualRetentionMs, requiredRetentionMs)) { + String errMsg = String.format( + "A Kafka topic with the name '%s' already exists, with different partition/replica" + + " configuration than required. KSQL expects %d partitions (topic has %d)," + + " %d replication factor (topic has %d)", topicName, requiredNumPartition, actualNumPartitions, requiredNumReplicas, - actualNumReplicas - ), true); + actualNumReplicas); + if (requiredRetentionMs.isPresent() && actualRetentionMs.isPresent()) { + errMsg = errMsg.replace("partition/replica", "partition/replica/retention"); + errMsg = String.format(errMsg + ", and %d retention (topic has %d).", + requiredRetentionMs.get(), actualRetentionMs.get()); + } else { + errMsg += "."; + } + throw new KafkaTopicExistsException(errMsg, true); } } + + private static boolean isInvalidPartitions(final int actualNumPartitions, + final int requiredNumPartition) { + return actualNumPartitions != requiredNumPartition; + } + + private static boolean isInvalidReplicas(final int actualNumReplicas, + final int requiredNumReplicas) { + return requiredNumReplicas != TopicProperties.DEFAULT_REPLICAS + && actualNumReplicas < requiredNumReplicas; + } + + private static boolean isInvalidRetention(final Optional actualRetentionMs, + final Optional requiredRetentionMs) { + return requiredRetentionMs.isPresent() + && actualRetentionMs.isPresent() + && actualRetentionMs.get().longValue() != requiredRetentionMs.get().longValue(); + } + } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index f9825f7121b..f13be80fae2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -103,10 +103,16 @@ private ConfiguredStatement injectForCreateSource( final CreateSource createSource = statement.getStatement(); final CreateSourceProperties properties = createSource.getProperties(); + final String topicCleanUpPolicy = createSource instanceof 
CreateTable + ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; + final String topicName = properties.getKafkaTopic(); if (topicClient.isTopicExists(topicName)) { - topicPropertiesBuilder.withSource(() -> topicClient.describeTopic(topicName)); + topicPropertiesBuilder + .withSource( + () -> topicClient.describeTopic(topicName), + () -> topicClient.getTopicConfig(topicName)); } else if (!properties.getPartitions().isPresent()) { final CreateSource example = createSource.copyWith( createSource.getElements(), @@ -119,15 +125,15 @@ private ConfiguredStatement injectForCreateSource( + "For example: " + SqlFormatter.formatSql(example)); } + throwIfRetentionPresentForTable(topicCleanUpPolicy, properties.getRetentionInMillis()); + topicPropertiesBuilder .withName(topicName) .withWithClause( Optional.of(properties.getKafkaTopic()), properties.getPartitions(), - properties.getReplicas()); - - final String topicCleanUpPolicy = createSource instanceof CreateTable - ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; + properties.getReplicas(), + properties.getRetentionInMillis()); createTopic(topicPropertiesBuilder, topicCleanUpPolicy); @@ -151,11 +157,14 @@ private ConfiguredStatement injectForCreateAsSelec topicPropertiesBuilder .withName(prefix + createAsSelect.getName().text()) - .withSource(() -> topicClient.describeTopic(sourceTopicName)) + .withSource( + () -> topicClient.describeTopic(sourceTopicName), + () -> topicClient.getTopicConfig(sourceTopicName)) .withWithClause( properties.getKafkaTopic(), properties.getPartitions(), - properties.getReplicas()); + properties.getReplicas(), + properties.getRetentionInMillis()); final String topicCleanUpPolicy; final Map additionalTopicConfigs = new HashMap<>(); @@ -176,13 +185,17 @@ private ConfiguredStatement injectForCreateAsSelec } } + throwIfRetentionPresentForTable(topicCleanUpPolicy, properties.getRetentionInMillis()); + final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy, additionalTopicConfigs); final T withTopic = (T) createAsSelect.copyWith(properties.withTopic( info.getTopicName(), info.getPartitions(), - info.getReplicas() + info.getReplicas(), + (Long) additionalTopicConfigs + .getOrDefault(TopicConfig.RETENTION_MS_CONFIG, info.getRetentionInMillis()) )); final String withTopicText = SqlFormatter.formatSql(withTopic) + ";"; @@ -190,6 +203,18 @@ private ConfiguredStatement injectForCreateAsSelec return statement.withStatement(withTopicText, withTopic); } + private void throwIfRetentionPresentForTable( + final String topicCleanUpPolicy, + final Optional retentionInMillis + ) { + if (topicCleanUpPolicy.equals(TopicConfig.CLEANUP_POLICY_COMPACT) + && retentionInMillis.isPresent()) { + throw new KsqlException( + "Invalid config variable in the WITH clause: RETENTION_MS." 
+ + " Non-windowed tables do not support retention."); + } + } + private TopicProperties createTopic( final Builder topicPropertiesBuilder, final String topicCleanUpPolicy @@ -206,8 +231,28 @@ private TopicProperties createTopic( final Map config = new HashMap<>(); config.put(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); + + // Set the retention.ms as max(RETENTION_MS, RETENTION) + if (additionalTopicConfigs.containsKey(TopicConfig.RETENTION_MS_CONFIG)) { + config.put( + TopicConfig.RETENTION_MS_CONFIG, + Math.max( + info.getRetentionInMillis(), + (Long) additionalTopicConfigs.get(TopicConfig.RETENTION_MS_CONFIG)) + ); + additionalTopicConfigs.remove(TopicConfig.RETENTION_MS_CONFIG); + } else { + config.put(TopicConfig.RETENTION_MS_CONFIG, info.getRetentionInMillis()); + } + config.putAll(additionalTopicConfigs); + // Note: The retention.ms config has no effect if cleanup.policy=compact + // config is set for topics that are backed by tables + if (topicCleanUpPolicy.equals(TopicConfig.CLEANUP_POLICY_COMPACT)) { + config.remove(TopicConfig.RETENTION_MS_CONFIG); + } + topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); return info; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java index 354a07eae36..c2c134cd3b8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Suppliers; import io.confluent.ksql.util.KsqlException; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; @@ -25,6 +26,7 @@ import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.config.TopicConfig; /** * A container for all properties required for creating/validating @@ -33,23 +35,25 @@ public final class TopicProperties { public static final short DEFAULT_REPLICAS = -1; - private static final String INVALID_TOPIC_NAME = ":INVALID:"; private static final int INVALID_PARTITIONS = -1; + private static final long DEFAULT_RETENTION_IN_MS = 604800000L; // 7 days private final String topicName; private final Integer partitions; private final Short replicas; + private final Long retentionMs; @VisibleForTesting TopicProperties( final String topicName, final Integer partitions, - final Short replicas - ) { + final Short replicas, + final Long retentionMs) { this.topicName = topicName; this.partitions = partitions; this.replicas = replicas; + this.retentionMs = retentionMs; } @Override @@ -57,6 +61,7 @@ public String toString() { return "TopicProperties{" + "topicName='" + getTopicName() + '\'' + ", partitions=" + getPartitions() + ", replicas=" + getReplicas() + + ", retentionMs=" + getRetentionInMillis() + '}'; } @@ -72,6 +77,10 @@ public short getReplicas() { return replicas == null ? DEFAULT_REPLICAS : replicas; } + public long getRetentionInMillis() { + return retentionMs == null ? 
DEFAULT_RETENTION_IN_MS : retentionMs; + } + /** * Constructs a {@link TopicProperties} with the following precedence order: * @@ -92,10 +101,10 @@ public short getReplicas() { public static final class Builder { private String name; - private TopicProperties fromWithClause = new TopicProperties(null, null, null); - private final TopicProperties fromOverrides = new TopicProperties(null, null, null); - private final TopicProperties fromKsqlConfig = new TopicProperties(null, null, null); - private Supplier fromSource = () -> new TopicProperties(null, null, null); + private TopicProperties fromWithClause = new TopicProperties(null, null, null, null); + private final TopicProperties fromOverrides = new TopicProperties(null, null, null, null); + private final TopicProperties fromKsqlConfig = new TopicProperties(null, null, null, null); + private Supplier fromSource = () -> new TopicProperties(null, null, null,null); Builder withName(final String name) { this.name = name; @@ -105,23 +114,31 @@ Builder withName(final String name) { Builder withWithClause( final Optional name, final Optional partitionCount, - final Optional replicationFactor + final Optional replicationFactor, + final Optional retentionMs ) { fromWithClause = new TopicProperties( name.orElse(null), partitionCount.orElse(null), - replicationFactor.orElse(null) - ); + replicationFactor.orElse(null), + retentionMs.orElse(null)); return this; } - Builder withSource(final Supplier descriptionSupplier) { + Builder withSource(final Supplier descriptionSupplier, + final Supplier> configsSupplier) { fromSource = Suppliers.memoize(() -> { final TopicDescription description = descriptionSupplier.get(); final Integer partitions = description.partitions().size(); final Short replicas = (short) description.partitions().get(0).replicas().size(); - return new TopicProperties(null, partitions, replicas); + final Map configs = configsSupplier.get(); + final Long retentionMs = Long.parseLong( + String.valueOf(configs.getOrDefault( + TopicConfig.RETENTION_MS_CONFIG, String.valueOf(DEFAULT_RETENTION_IN_MS))) + ); + + return new TopicProperties(null, partitions, replicas, retentionMs); }); return this; } @@ -154,7 +171,15 @@ public TopicProperties build() { .findFirst() .orElseGet(() -> fromSource.get().replicas); - return new TopicProperties(name, partitions, replicas); + final Long retentionMs = Stream.of( + fromWithClause.retentionMs, + fromOverrides.retentionMs, + fromKsqlConfig.retentionMs) + .filter(Objects::nonNull) + .findFirst() + .orElseGet(() -> fromSource.get().retentionMs); + + return new TopicProperties(name, partitions, replicas, retentionMs); } } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 58ed1f4f633..47d46ad561c 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -103,6 +103,7 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.common.config.ConfigException; @@ -2059,7 +2060,8 @@ public void shouldNotUpdateMetaStoreDuringTryExecute() { @Test public void shouldNotCreateAnyTopicsDuringTryExecute() { // Given: - topicClient.preconditionTopicExists("s1_topic", 1, (short) 1, Collections.emptyMap()); + Map configs = ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, 
604800000L); + topicClient.preconditionTopicExists("s1_topic", 1, (short) 1, configs); final List statements = parse( "CREATE STREAM S1 (COL1 BIGINT) WITH (KAFKA_TOPIC = 's1_topic', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON');" diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/execution/ExecutionPlanBuilderTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/execution/ExecutionPlanBuilderTest.java index 81fe42d530a..f9ed9f49efe 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/execution/ExecutionPlanBuilderTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/execution/ExecutionPlanBuilderTest.java @@ -53,6 +53,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.TopicConfig; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -373,8 +375,9 @@ private TransientQueryMetadata executeQuery(final String sql) { } private void givenKafkaTopicsExist(final String... names) { + final Map config = ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, 5000L); Arrays.stream(names).forEach(name -> - kafkaTopicClient.createTopic(name, 1, (short) 1, Collections.emptyMap()) + kafkaTopicClient.createTopic(name, 1, (short) 1, config) ); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java index 63fa250af6f..b561e006cb4 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java @@ -68,6 +68,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; @@ -156,11 +157,13 @@ public void ensureTopics(final String... topicNames) { */ public void ensureTopics(final int partitionCount, final String... 
topicNames) { final KafkaTopicClient topicClient = serviceContext.get().getTopicClient(); + final Map config = ImmutableMap.of( + TopicConfig.RETENTION_MS_CONFIG, "-1"); Arrays.stream(topicNames) .filter(name -> !topicClient.isTopicExists(name)) .forEach(name -> - topicClient.createTopic(name, partitionCount, DEFAULT_REPLICATION_FACTOR)); + topicClient.createTopic(name, partitionCount, DEFAULT_REPLICATION_FACTOR, config)); } /** diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java index c7505b3a7de..f1afceab3db 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java @@ -18,6 +18,7 @@ import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.confluent.ksql.topic.TopicProperties; import java.util.Collection; @@ -26,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -37,6 +39,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; /** @@ -103,12 +106,15 @@ public int hashCode() { } private final Map topicMap = new HashMap<>(); + private final Map> topicMapConfig = new HashMap<>(); private final Map createdTopics = new HashMap<>(); + private final Map> createdTopicsConfig = new HashMap<>(); public void preconditionTopicExists( final String topic ) { - preconditionTopicExists(topic, 1, 1, Collections.emptyMap()); + Map configs = ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, 604800000L); + preconditionTopicExists(topic, 1, 1, configs); } public void preconditionTopicExists( @@ -116,7 +122,9 @@ public void preconditionTopicExists( final int numPartitions, final int replicationFactor, final Map configs) { - topicMap.put(topic, createFakeTopic(topic, numPartitions, replicationFactor, configs)); + final FakeTopic info = createFakeTopic(topic, numPartitions, replicationFactor, configs); + topicMap.put(topic, info); + topicMapConfig.put(topic, configs); } @Override @@ -133,13 +141,14 @@ public void createTopic( final FakeTopic existing = topicMap.get(topic); if (existing != null) { - validateTopicProperties(numPartitions, replicas, existing); + validateTopicProperties(numPartitions, replicas, existing, configs, getTopicConfig(topic)); return; } final FakeTopic info = createFakeTopic(topic, numPartitions, replicas, configs); topicMap.put(topic, info); createdTopics.put(topic, info); + createdTopicsConfig.put(topic, configs); } public Map createdTopics() { @@ -174,7 +183,7 @@ public TopicDescription describeTopic(final String topicName) { @Override public Map getTopicConfig(final String topicName) { - return Collections.emptyMap(); + return (Map) createdTopicsConfig.getOrDefault(topicName, topicMapConfig.get(topicName)); } @Override @@ -221,13 +230,20 @@ private static FakeTopic createFakeTopic( private static void validateTopicProperties( final int requiredNumPartition, final int 
requiredNumReplicas, - final FakeTopic existing + final FakeTopic existing, + final Map config, + final Map existingConfig ) { + final Optional requiredRetentionMs = KafkaTopicClient.getRetentionMs(config); + final Optional actualRetentionMs = KafkaTopicClient.getRetentionMs(existingConfig); TopicValidationUtil.validateTopicProperties( existing.topicName, requiredNumPartition, requiredNumReplicas, + requiredRetentionMs, existing.numPartitions, - existing.replicationFactor); + existing.replicationFactor, + actualRetentionMs); } + } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplIntegrationTest.java index 32ef04d810e..87693f0d9e4 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplIntegrationTest.java @@ -165,27 +165,13 @@ public void shouldDeleteTopics() { assertThat(client.isTopicExists(testTopic), is(false)); } - @Test - public void shouldCreateTopic() { - // Given: - final String topicName = UUID.randomUUID().toString(); - - // When: - client.createTopic(topicName, 3, (short) 1); - - // Then: - assertThatEventually(() -> topicExists(topicName), is(true)); - final TopicDescription topicDescription = getTopicDescription(topicName); - assertThat(topicDescription.partitions(), hasSize(3)); - assertThat(topicDescription.partitions().get(0).replicas(), hasSize(1)); - } - @Test public void shouldCreateTopicWithConfig() { // Given: final String topicName = UUID.randomUUID().toString(); final Map config = ImmutableMap.of( - TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"); + TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy", + TopicConfig.RETENTION_MS_CONFIG, "5000"); // When: client.createTopic(topicName, 2, (short) 1, config); @@ -197,21 +183,26 @@ public void shouldCreateTopicWithConfig() { assertThat(topicDescription.partitions().get(0).replicas(), hasSize(1)); final Map configs = client.getTopicConfig(topicName); assertThat(configs.get(TopicConfig.COMPRESSION_TYPE_CONFIG), is("snappy")); + assertThat(configs.get(TopicConfig.RETENTION_MS_CONFIG), is("5000")); } @Test public void shouldCreateTopicWithDefaultReplicationFactor() { // Given: final String topicName = UUID.randomUUID().toString(); + final Map config = ImmutableMap.of( + TopicConfig.RETENTION_MS_CONFIG, "5000"); // When: - client.createTopic(topicName, 2, TopicProperties.DEFAULT_REPLICAS); + client.createTopic(topicName, 2, TopicProperties.DEFAULT_REPLICAS, config); // Then: assertThatEventually(() -> topicExists(topicName), is(true)); final TopicDescription topicDescription = getTopicDescription(topicName); assertThat(topicDescription.partitions(), hasSize(2)); assertThat(topicDescription.partitions().get(0).replicas(), hasSize(1)); + final Map configs = client.getTopicConfig(topicName); + assertThat(configs.get(TopicConfig.RETENTION_MS_CONFIG), is("5000")); } @Test diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index d99675ee309..255f33d725f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -101,6 +101,8 @@ public class KafkaTopicClientImplTest { private final Map> topicPartitionInfo = new 
HashMap<>(); private final Map topicConfigs = new HashMap<>(); + private final Map configs = ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, 8640000000L); + private KafkaTopicClient kafkaTopicClient; @SuppressWarnings({"deprecation", "unchecked"}) @@ -122,12 +124,25 @@ public void setUp() { @Test public void shouldCreateTopic() { + // When: + kafkaTopicClient.createTopic("someTopic", 1, (short) 2, configs); + + // Then: + verify(adminClient).createTopics( + eq(ImmutableSet.of(newTopic("someTopic", 1, 2, configs))), + argThat(createOptions -> !createOptions.shouldValidateOnly()) + ); + } + + @Test + public void shouldCreateTopicWithEmptyConfigs() { + Map configs = ImmutableMap.of(); // When: kafkaTopicClient.createTopic("someTopic", 1, (short) 2); // Then: verify(adminClient).createTopics( - eq(ImmutableSet.of(newTopic("someTopic", 1, 2))), + eq(ImmutableSet.of(newTopic("someTopic", 1, 2, configs))), argThat(createOptions -> !createOptions.shouldValidateOnly()) ); } @@ -136,9 +151,13 @@ public void shouldCreateTopic() { public void shouldNotCreateTopicIfItAlreadyExistsWithMatchingDetails() { // Given: givenTopicExists("someTopic", 3, 2); + givenTopicConfigs( + "someTopic", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); // When: - kafkaTopicClient.createTopic("someTopic", 3, (short) 2); + kafkaTopicClient.createTopic("someTopic", 3, (short) 2, configs); // Then: verify(adminClient, never()).createTopics(any(), any()); @@ -148,9 +167,13 @@ public void shouldNotCreateTopicIfItAlreadyExistsWithMatchingDetails() { public void shouldNotCreateTopicIfItAlreadyExistsWithDefaultRf() { // Given: givenTopicExists("someTopic", 1, 2); + givenTopicConfigs( + "someTopic", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); // When: - kafkaTopicClient.createTopic("someTopic", 1, (short) -1); + kafkaTopicClient.createTopic("someTopic", 1, (short) -1, configs); // Then: verify(adminClient, never()).createTopics(any(), any()); @@ -160,16 +183,41 @@ public void shouldNotCreateTopicIfItAlreadyExistsWithDefaultRf() { public void shouldThrowFromCreateTopicIfExistingHasDifferentReplicationFactor() { // Given: givenTopicExists("someTopic", 1, 1); + givenTopicConfigs( + "someTopic", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); // When: final Exception e = assertThrows( KafkaTopicExistsException.class, - () -> kafkaTopicClient.createTopic("someTopic", 1, (short) 2) + () -> kafkaTopicClient.createTopic("someTopic", 1, (short) 2, configs) ); // Then: assertThat(e.getMessage(), containsString( - "and 2 replication factor (topic has 1)")); + ", 2 replication factor (topic has 1)")); + } + + @Test + public void shouldThrowFromCreateTopicIfExistingHasDifferentRetentionMs() { + // Given: + givenTopicExists("someTopic", 1, 1); + givenTopicConfigs( + "someTopic", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); + + // When: + Map newConfigs = ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, 1000); + final Exception e = assertThrows( + KafkaTopicExistsException.class, + () -> kafkaTopicClient.createTopic("someTopic", 1, (short) 1, newConfigs) + ); + + // Then: + assertThat(e.getMessage(), containsString( + "and 1000 retention (topic has 8640000000).")); } @Test @@ -181,7 +229,7 @@ public void shouldThrowFromCreateTopicIfNoAclsSet() { // When: final Exception e = assertThrows( KsqlTopicAuthorizationException.class, - () -> kafkaTopicClient.createTopic("someTopic", 1, (short) 2) + () -> 
kafkaTopicClient.createTopic("someTopic", 1, (short) 2, configs) ); // Then: @@ -193,6 +241,10 @@ public void shouldThrowFromCreateTopicIfNoAclsSet() { public void shouldRetryDescribeTopicDuringCreateTopicOnRetryableException() { // Given: givenTopicExists("topicName", 1, 2); + givenTopicConfigs( + "topicName", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); when(adminClient.describeTopics(anyCollection(), any())) .thenAnswer(describeTopicsResult()) // checks that topic exists @@ -200,7 +252,7 @@ public void shouldRetryDescribeTopicDuringCreateTopicOnRetryableException() { .thenAnswer(describeTopicsResult()); // succeeds the third time // When: - kafkaTopicClient.createTopic("topicName", 1, (short) 2); + kafkaTopicClient.createTopic("topicName", 1, (short) 2, configs); // Then: verify(adminClient, times(3)).describeTopics(anyCollection(), any()); @@ -208,12 +260,18 @@ public void shouldRetryDescribeTopicDuringCreateTopicOnRetryableException() { @Test public void shouldValidateCreateTopic() { + // Given + givenTopicConfigs( + "topicA", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); + // When: - kafkaTopicClient.validateCreateTopic("topicA", 2, (short) 1); + kafkaTopicClient.validateCreateTopic("topicA", 2, (short) 1, configs); // Then: verify(adminClient).createTopics( - eq(ImmutableSet.of(newTopic("topicA", 2, 1))), + eq(ImmutableSet.of(newTopic("topicA", 2, 1, configs))), argThat(CreateTopicsOptions::shouldValidateOnly) ); } @@ -222,9 +280,13 @@ public void shouldValidateCreateTopic() { public void shouldNotValidateCreateTopicIfItAlreadyExistsWithMatchingDetails() { // Given: givenTopicExists("someTopic", 1, 2); + givenTopicConfigs( + "someTopic", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); // When: - kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) 2); + kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) 2, configs); // Then: verify(adminClient, never()).createTopics(any(), any()); @@ -234,16 +296,20 @@ public void shouldNotValidateCreateTopicIfItAlreadyExistsWithMatchingDetails() { public void shouldThrowFromValidateCreateTopicIfExistingHasDifferentReplicationFactor() { // Given: givenTopicExists("someTopic", 1, 1); + givenTopicConfigs( + "someTopic", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); // When: final Exception e = assertThrows( KafkaTopicExistsException.class, - () -> kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) 2) + () -> kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) 2, configs) ); // Then: assertThat(e.getMessage(), containsString( - "and 2 replication factor (topic has 1)")); + ", 2 replication factor (topic has 1)")); } @Test @@ -255,7 +321,7 @@ public void shouldThrowFromValidateCreateTopicIfNoAclsSet() { // When: final Exception e = assertThrows( KsqlTopicAuthorizationException.class, - () -> kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) 2) + () -> kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) 2, configs) ); // Then: @@ -267,9 +333,13 @@ public void shouldThrowFromValidateCreateTopicIfNoAclsSet() { public void shouldNotValidateCreateTopicIfItAlreadyExistsWithDefaultRf() { // Given: givenTopicExists("someTopic", 1, 2); + givenTopicConfigs( + "someTopic", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); // When: - kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) -1); + kafkaTopicClient.validateCreateTopic("someTopic", 1, (short) -1, 
configs); // Then: verify(adminClient, never()).createTopics(any(), any()); @@ -279,6 +349,10 @@ public void shouldNotValidateCreateTopicIfItAlreadyExistsWithDefaultRf() { public void shouldRetryDescribeTopicDuringValidateCreateTopicOnRetryableException() { // Given: givenTopicExists("topicName", 1, 2); + givenTopicConfigs( + "topicName", + overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "8640000000") + ); when(adminClient.describeTopics(anyCollection(), any())) .thenAnswer(describeTopicsResult()) // checks that topic exists @@ -286,7 +360,7 @@ public void shouldRetryDescribeTopicDuringValidateCreateTopicOnRetryableExceptio .thenAnswer(describeTopicsResult()); // succeeds the third time // When: - kafkaTopicClient.validateCreateTopic("topicName", 1, (short) 2); + kafkaTopicClient.validateCreateTopic("topicName", 1, (short) 2, configs); // Then: verify(adminClient, times(3)).describeTopics(anyCollection(), any()); @@ -648,7 +722,9 @@ public void shouldHandleRetryableGetTopicConfigError() { @Test public void shouldSetTopicCleanupPolicyToCompact() { // Given: - final Map configs = ImmutableMap.of("cleanup.policy", "compact"); + final Map configs = ImmutableMap.of( + "cleanup.policy", "compact", + TopicConfig.RETENTION_MS_CONFIG, "5000"); // When: kafkaTopicClient.createTopic("topic-name", 1, (short) 2, configs); @@ -1101,10 +1177,6 @@ private static KafkaFuture failedFuture(final Exception cause) { } } - private static NewTopic newTopic(final String name, final int partitions, final int rf) { - return newTopic(name, partitions, rf, ImmutableMap.of()); - } - private static NewTopic newTopic( final String name, final int partitions, diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java index 4380afddfaf..2c7297ebecd 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -53,13 +54,13 @@ import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource.Type; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.junit.Before; @@ -87,6 +88,7 @@ public static Collection> getMethodsToTest() { .ignore("createTopic", String.class, int.class, short.class, Map.class) .ignore("isTopicExists", String.class) .ignore("describeTopic", String.class) + .ignore("getTopicConfig", String.class) .ignore("describeTopics", Collection.class) .ignore("deleteTopics", Collection.class) .ignore("listTopicsStartOffsets", 
Collection.class) @@ -124,7 +126,9 @@ public static class SupportedMethods { private Admin mockedAdmin; private KafkaTopicClient sandboxedClient; - private final Map configs = ImmutableMap.of("some config", 1); + private final Map configs = ImmutableMap.of( + "some config", 1, + TopicConfig.RETENTION_MS_CONFIG, 8640000000L); @Before public void setUp() { @@ -150,6 +154,8 @@ public void shouldTrackCreatedTopicsWithConfig() { // Then: assertThat(sandboxedClient.isTopicExists("some topic"), is(true)); + assertThat(sandboxedClient.getTopicConfig("some topic").entrySet(), + equalTo(toStringConfigs(configs).entrySet())); } @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") @@ -300,7 +306,7 @@ public void shouldThrowOnCreateIfTopicPreviouslyCreatedInScopeWithDifferentParti // Then: assertThat(e.getMessage(), containsString("A Kafka topic with the name 'some topic' already " - + "exists, with different partition/replica configuration than required")); + + "exists, with different partition/replica/retention configuration than required")); } @Test @@ -316,7 +322,25 @@ public void shouldThrowOnCreateIfTopicPreviouslyCreatedInScopeWithDifferentRepli // Then: assertThat(e.getMessage(), containsString("A Kafka topic with the name 'some topic' already " - + "exists, with different partition/replica configuration than required")); + + "exists, with different partition/replica/retention configuration than required")); + } + + @Test + public void shouldThrowOnCreateIfTopicPreviouslyCreatedInScopeWithDifferentRetentionMs() { + // Given: + sandboxedClient.createTopic("some topic", 2, (short) 3, configs); + + // When: + final Map newConfigs = ImmutableMap.of( + TopicConfig.RETENTION_MS_CONFIG, 5000L); + final KafkaTopicExistsException e = assertThrows( + KafkaTopicExistsException.class, + () -> sandboxedClient.createTopic("some topic", 2, (short) 3, newConfigs) + ); + + // Then: + assertThat(e.getMessage(), containsString("A Kafka topic with the name 'some topic' already " + + "exists, with different partition/replica/retention configuration than required")); } @Test @@ -332,7 +356,7 @@ public void shouldThrowOnCreateIfTopicAlreadyExistsWithDifferentPartitionCount() // Then: assertThat(e.getMessage(), containsString("A Kafka topic with the name 'some topic' already " - + "exists, with different partition/replica configuration than required")); + + "exists, with different partition/replica/retention configuration than required")); } @Test @@ -348,7 +372,25 @@ public void shouldThrowOnCreateIfTopicAlreadyExistsWithDifferentReplicaCount() { // Then: assertThat(e.getMessage(), containsString("A Kafka topic with the name 'some topic' already " - + "exists, with different partition/replica configuration than required")); + + "exists, with different partition/replica/retention configuration than required")); + } + + @Test + public void shouldThrowOnCreateIfTopicAlreadyExistsWithDifferentRetentionMs() { + // Given: + givenTopicExists("some topic", 2, 1); + + // When: + final Map newConfigs = ImmutableMap.of( + TopicConfig.RETENTION_MS_CONFIG, 5000L); + final KafkaTopicExistsException e = assertThrows( + KafkaTopicExistsException.class, + () -> sandboxedClient.createTopic("some topic", 2, (short) 1, newConfigs) + ); + + // Then: + assertThat(e.getMessage(), containsString("A Kafka topic with the name 'some topic' already " + + "exists, with different partition/replica/retention configuration than required")); } @Test @@ -422,6 +464,7 @@ private void givenTopicExists( .thenReturn(Collections.singletonMap( 
topic, new TopicDescription(topic, false, topicPartitions(numPartitions, numReplicas)))); + when(delegate.getTopicConfig(topic)).thenReturn((Map) configs); } private static List topicPartitions( @@ -439,5 +482,10 @@ private static List topicPartitions( return builder.build(); } + + private static Map toStringConfigs(final Map configs) { + return configs.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); + } } } \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index f149c94897a..c7322d4c096 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -138,9 +138,9 @@ public void setUp() { when(topicClient.describeTopic("source")).thenReturn(sourceDescription); when(topicClient.isTopicExists("source")).thenReturn(true); when(builder.withName(any())).thenReturn(builder); - when(builder.withWithClause(any(), any(), any())).thenReturn(builder); - when(builder.withSource(any())).thenReturn(builder); - when(builder.build()).thenReturn(new TopicProperties("name", 1, (short) 1)); + when(builder.withWithClause(any(), any(), any(), any())).thenReturn(builder); + when(builder.withSource(any(), any())).thenReturn(builder); + when(builder.build()).thenReturn(new TopicProperties("name", 1, (short) 1, (long) 100)); } @Test @@ -225,14 +225,15 @@ public void shouldPassThroughWithClauseToBuilderForCreateAs() { verify(builder).withWithClause( props.getKafkaTopic(), props.getPartitions(), - props.getReplicas() + props.getReplicas(), + props.getRetentionInMillis() ); } @Test public void shouldPassThroughWithClauseToBuilderForCreate() { // Given: - givenStatement("CREATE STREAM x (FOO VARCHAR) WITH(value_format='avro', kafka_topic='topic', partitions=2);"); + givenStatement("CREATE STREAM x (FOO VARCHAR) WITH(value_format='avro', kafka_topic='topic', partitions=2, retention_ms=5000);"); final CreateSourceProperties props = ((CreateSource) statement.getStatement()) .getProperties(); @@ -245,8 +246,10 @@ public void shouldPassThroughWithClauseToBuilderForCreate() { verify(builder).withWithClause( Optional.of(props.getKafkaTopic()), props.getPartitions(), - props.getReplicas() + props.getReplicas(), + props.getRetentionInMillis() ); + } @Test @@ -258,7 +261,7 @@ public void shouldNotUseSourceTopicForCreateMissingTopic() { injector.inject(statement, builder); // Then: - verify(builder, never()).withSource(any()); + verify(builder, never()).withSource(any(), any()); } @Test @@ -270,7 +273,7 @@ public void shouldUseSourceTopicForCreateExistingTopic() { injector.inject(statement, builder); // Then: - verify(builder).withSource(argThat(supplierThatGets(sourceDescription))); + verify(builder).withSource(argThat(supplierThatGets(sourceDescription)), any(Supplier.class)); } @Test @@ -282,7 +285,7 @@ public void shouldIdentifyAndUseCorrectSource() { injector.inject(statement, builder); // Then: - verify(builder).withSource(argThat(supplierThatGets(sourceDescription))); + verify(builder).withSource(argThat(supplierThatGets(sourceDescription)), any(Supplier.class)); } @Test @@ -295,7 +298,7 @@ public void shouldIdentifyAndUseCorrectSourceInJoin() { injector.inject(statement, builder); // Then: - verify(builder).withSource(argThat(supplierThatGets(sourceDescription))); + 
verify(builder).withSource(argThat(supplierThatGets(sourceDescription)), any(Supplier.class)); } @SuppressWarnings("unchecked") @@ -304,7 +307,7 @@ public void shouldBuildWithClauseWithTopicProperties() { // Given: givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); - when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10, (long) 5000)); // When: final ConfiguredStatement result = @@ -315,6 +318,7 @@ public void shouldBuildWithClauseWithTopicProperties() { assertThat(props.getKafkaTopic(), is(Optional.of("expectedName"))); assertThat(props.getPartitions(), is(Optional.of(10))); assertThat(props.getReplicas(), is(Optional.of((short) 10))); + assertThat(props.getRetentionInMillis(), is(Optional.of((long) 5000))); } @Test @@ -328,7 +332,7 @@ public void shouldUpdateStatementText() { // Then: assertThat(result.getMaskedStatementText(), equalTo( - "CREATE STREAM X WITH (KAFKA_TOPIC='name', PARTITIONS=1, REPLICAS=1) AS SELECT *" + "CREATE STREAM X WITH (KAFKA_TOPIC='name', PARTITIONS=1, REPLICAS=1, RETENTION_MS=100) AS SELECT *" + "\nFROM SOURCE SOURCE\n" + "EMIT CHANGES;")); } @@ -337,7 +341,7 @@ public void shouldUpdateStatementText() { public void shouldCreateMissingTopic() { // Given: givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); - when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10, (long) 100)); // When: injector.inject(statement, builder); @@ -347,14 +351,16 @@ public void shouldCreateMissingTopic() { "expectedName", 10, (short) 10, - ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); + ImmutableMap.of( + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.RETENTION_MS_CONFIG, 100L)); } @Test public void shouldCreateMissingTopicForCreate() { // Given: givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); - when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10, (long) 100)); // When: injector.inject(statement, builder); @@ -364,7 +370,9 @@ public void shouldCreateMissingTopicForCreate() { "expectedName", 10, (short) 10, - ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); + ImmutableMap.of( + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.RETENTION_MS_CONFIG, 100L)); } @Test @@ -372,7 +380,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForNonWindowedTables // Given: givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + "AS SELECT * FROM SOURCE;"); - when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10, (long) 100)); // When: injector.inject(statement, builder); @@ -389,7 +397,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForNonWindowedTables public void shouldCreateMissingTopicWithCompactCleanupPolicyForCreateTable() { // Given: givenStatement("CREATE TABLE foo (FOO VARCHAR) WITH (value_format='avro', kafka_topic='topic', partitions=1);"); - when(builder.build()).thenReturn(new TopicProperties("topic", 10, (short) 10)); + 
when(builder.build()).thenReturn(new TopicProperties("topic", 10, (short) 10, (long) 5000)); // When: injector.inject(statement, builder); @@ -407,7 +415,7 @@ public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowed // Given: givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS);"); - when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10, (long) 7000)); // When: injector.inject(statement, builder); @@ -418,7 +426,8 @@ public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowed 10, (short) 10, ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, - TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)); + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.RETENTION_MS_CONFIG, 7000L)); } @Test @@ -426,7 +435,7 @@ public void shouldCreateMissingTopicWithSpecifiedRetentionForWindowedTables() { // Given: givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS, RETENTION 4 DAYS);"); - when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10, (long) 6000)); // When: injector.inject(statement, builder); @@ -444,6 +453,29 @@ public void shouldCreateMissingTopicWithSpecifiedRetentionForWindowedTables() { )); } + @Test + public void shouldCreateMissingTopicWithLargerRetentionForWindowedTables() { + // Given: + givenStatement("CREATE TABLE x WITH (kafka_topic='topic', partitions=2, replicas=1, format='avro', retention_ms=432000000) " + + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS, RETENTION 4 DAYS);"); + when(builder.build()).thenReturn(new TopicProperties("topic", 2, (short) 1, (long) 432000000)); + + // When: + injector.inject(statement, builder); + + // Then: + verify(topicClient).createTopic( + "topic", + 2, + (short) 1, + ImmutableMap.of( + TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.RETENTION_MS_CONFIG, + Duration.ofDays(5).toMillis() + )); + } + @Test public void shouldHaveSuperUsefulErrorMessageIfCreateWithNoPartitions() { // Given: @@ -464,6 +496,42 @@ public void shouldHaveSuperUsefulErrorMessageIfCreateWithNoPartitions() { + "WITH (KAFKA_TOPIC='doesntexist', PARTITIONS=2, VALUE_FORMAT='avro');")); } + @Test + public void shouldThrowIfRetentionConfigPresentInCreateTable() { + // Given: + givenStatement("CREATE TABLE foo_bar (FOO STRING PRIMARY KEY, BAR STRING) WITH (kafka_topic='doesntexist', partitions=2, format='avro', retention_ms=30000);"); + + // When: + final Exception e = assertThrows( + KsqlException.class, + () -> injector.inject(statement, builder) + ); + + // Then: + assertThat( + e.getMessage(), + containsString("Invalid config variable in the WITH clause: RETENTION_MS." 
+ + " Non-windowed tables do not support retention.")); + } + + @Test + public void shouldThrowIfRetentionConfigPresentInCreateTableAs() { + // Given: + givenStatement("CREATE TABLE foo_bar WITH (kafka_topic='doesntexist', partitions=2, format='avro', retention_ms=30000) AS SELECT * FROM SOURCE;"); + + // When: + final Exception e = assertThrows( + KsqlException.class, + () -> injector.inject(statement, builder) + ); + + // Then: + assertThat( + e.getMessage(), + containsString("Invalid config variable in the WITH clause: RETENTION_MS." + + " Non-windowed tables do not support retention.")); + } + private ConfiguredStatement givenStatement(final String sql) { final PreparedStatement preparedStatement = parser.prepare(parser.parse(sql).get(0), metaStore); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java index 39f985c1a95..83c7455f6dc 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java @@ -28,11 +28,15 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.topic.TopicProperties.Builder; import io.confluent.ksql.util.KsqlException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.function.Supplier; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.TopicConfig; import org.junit.Test; public class TopicPropertiesTest { @@ -41,36 +45,88 @@ public class TopicPropertiesTest { public void shouldPreferWithClauseToSourceReplicas() { // When: final TopicProperties properties = new TopicProperties.Builder() - .withWithClause(Optional.of("name"), Optional.empty(), Optional.of((short) 3)) + .withWithClause(Optional.of("name"), Optional.empty(), Optional.of((short) 3), Optional.of((long) 100)) .withSource(() -> new TopicDescription( "", false, ImmutableList.of( new TopicPartitionInfo( - 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of())))) + 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of()))), + () -> Collections.emptyMap()) .build(); // Then: assertThat(properties.getReplicas(), is((short) 3)); assertThat(properties.getPartitions(), is(1)); + assertThat(properties.getRetentionInMillis(), is((long) 100)); } @Test public void shouldPreferWithClauseToSourcePartitions() { // When: final TopicProperties properties = new TopicProperties.Builder() - .withWithClause(Optional.of("name"), Optional.of(3), Optional.empty()) + .withWithClause(Optional.of("name"), Optional.of(3), Optional.empty(), Optional.of((long) 100)) .withSource(() -> new TopicDescription( "", false, ImmutableList.of( new TopicPartitionInfo( - 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of())))) + 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of()))), + () -> Collections.emptyMap()) .build(); // Then: assertThat(properties.getReplicas(), is((short) 1)); assertThat(properties.getPartitions(), is(3)); + assertThat(properties.getRetentionInMillis(), is((long) 100)); + } + + @Test + public void shouldPreferWithClauseToSourceRetention() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause(Optional.of("name"), Optional.of(3), 
Optional.empty(), Optional.of((long) 100)) + .withSource(() -> new TopicDescription( + "", + false, + ImmutableList.of( + new TopicPartitionInfo( + 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of()))), + () -> { + Map configsMap = new HashMap<>(); + configsMap.put(TopicConfig.RETENTION_MS_CONFIG, "5000"); + return configsMap; + }) + .build(); + + // Then: + assertThat(properties.getReplicas(), is((short) 1)); + assertThat(properties.getPartitions(), is(3)); + assertThat(properties.getRetentionInMillis(), is((long) 100)); + } + + @Test + public void shouldPreferSourceRetentionToWithClause() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause(Optional.of("name"), Optional.of(3), Optional.empty(), Optional.empty()) + .withSource(() -> new TopicDescription( + "", + false, + ImmutableList.of( + new TopicPartitionInfo( + 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of()))), + () -> { + Map configsMap = new HashMap<>(); + configsMap.put(TopicConfig.RETENTION_MS_CONFIG, "5000"); + return configsMap; + }) + .build(); + + // Then: + assertThat(properties.getReplicas(), is((short) 1)); + assertThat(properties.getPartitions(), is(3)); + assertThat(properties.getRetentionInMillis(), is((long) 5000)); } @Test @@ -80,7 +136,8 @@ public void shouldUseNameFromWithClause() { .withWithClause( Optional.of("name"), Optional.of(1), - Optional.empty() + Optional.empty(), + Optional.of((long) 100) ) .build(); @@ -96,7 +153,8 @@ public void shouldUseNameFromWithClauseWhenNameIsAlsoPresent() { .withWithClause( Optional.of("name"), Optional.of(1), - Optional.empty() + Optional.empty(), + Optional.of((long) 100) ) .build(); @@ -109,7 +167,7 @@ public void shouldUseNameIfNoWIthClause() { // When: final TopicProperties properties = new TopicProperties.Builder() .withName("name") - .withWithClause(Optional.empty(), Optional.of(1), Optional.empty()) + .withWithClause(Optional.empty(), Optional.of(1), Optional.empty(), Optional.of((long) 100)) .build(); // Then: @@ -151,7 +209,7 @@ public void shouldFailIfNoPartitionsSupplied() { KsqlException.class, () -> new TopicProperties.Builder() .withName("name") - .withWithClause(empty(), empty(), of((short) 1)) + .withWithClause(empty(), empty(), of((short) 1), of((long) 100)) .build() ); @@ -165,7 +223,7 @@ public void shouldDefaultIfNoReplicasSupplied() { // When: final TopicProperties properties = new Builder() .withName("name") - .withWithClause(Optional.empty(), Optional.of(1), Optional.empty()) + .withWithClause(Optional.empty(), Optional.of(1), Optional.empty(), Optional.of((long) 100)) .build(); // Then: @@ -179,16 +237,18 @@ public void shouldNotMakeRemoteCallIfUnnecessary() { .withWithClause( Optional.of("name"), Optional.of(1), - Optional.of((short) 1) + Optional.of((short) 1), + Optional.of((long) 100) ) - .withSource(() -> { - throw new RuntimeException(); - }) + .withSource( + () -> {throw new RuntimeException();}, + () -> Collections.emptyMap()) .build(); // Then: assertThat(properties.getPartitions(), equalTo(1)); assertThat(properties.getReplicas(), equalTo((short) 1)); + assertThat(properties.getRetentionInMillis(), equalTo((long) 100)); } @SuppressWarnings("unchecked") @@ -212,11 +272,12 @@ public void shouldNotMakeMultipleRemoteCalls() { // When: final TopicProperties properties = new TopicProperties.Builder() .withName("name") - .withSource(source) + .withSource(source, () -> Collections.emptyMap()) .build(); // Then: 
assertThat(properties.getPartitions(), equalTo(1)); assertThat(properties.getReplicas(), equalTo((short) 1)); + assertThat(properties.getRetentionInMillis(), equalTo((long) 604800000)); } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java b/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java index 5e11d31100c..47456d03db2 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; /** * Note: all methods are synchronous, i.e. they wait for responses from Kafka before returning. @@ -214,4 +215,14 @@ default Map listTopicsStartOffsets(Collection topi default Map listTopicsEndOffsets(Collection topicName) { return listTopicsOffsets(topicName, OffsetSpec.latest()); } + + static Optional getRetentionMs(Map config) { + if (config.containsKey(TopicConfig.RETENTION_MS_CONFIG)) { + return Optional.ofNullable( + Long.parseLong(String.valueOf(config.get(TopicConfig.RETENTION_MS_CONFIG)))); + } else { + return Optional.empty(); + } + } + } diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/AssertExecutorMetaTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/AssertExecutorMetaTest.java index de6fdd13e4e..4d9b8626bf1 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/AssertExecutorMetaTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/AssertExecutorMetaTest.java @@ -24,7 +24,8 @@ public class AssertExecutorMetaTest { */ private final Set excluded = ImmutableSet.of( CommonCreateConfigs.SOURCE_NUMBER_OF_PARTITIONS, // testing tool does not support partitions - CommonCreateConfigs.SOURCE_NUMBER_OF_REPLICAS // testing tool does not support replicas + CommonCreateConfigs.SOURCE_NUMBER_OF_REPLICAS, // testing tool does not support replicas + CommonCreateConfigs.SOURCE_TOPIC_RETENTION_IN_MS // testing tool does not support retention_ms ); @Test diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java index b393e8e7dad..1a807d213d3 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java @@ -21,6 +21,7 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.execution.expression.tree.IntegerLiteral; import io.confluent.ksql.execution.expression.tree.Literal; +import io.confluent.ksql.execution.expression.tree.LongLiteral; import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.parser.ColumnReferenceParser; @@ -88,6 +89,10 @@ public Optional getReplicas() { return Optional.ofNullable(props.getShort(CommonCreateConfigs.SOURCE_NUMBER_OF_REPLICAS)); } + public Optional getRetentionInMillis() { + return Optional.ofNullable(props.getLong(CommonCreateConfigs.SOURCE_TOPIC_RETENTION_IN_MS)); + } + public Optional getTimestampColumnName() { return 
Optional.ofNullable(props.getString(CommonCreateConfigs.TIMESTAMP_NAME_PROPERTY)) .map(ColumnReferenceParser::parse); @@ -202,12 +207,14 @@ public Optional getValueSchemaId() { public CreateSourceAsProperties withTopic( final String name, final int partitions, - final short replicas + final short replicas, + final long retentionMs ) { final Map originals = props.copyOfOriginalLiterals(); originals.put(CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral(name)); originals.put(CommonCreateConfigs.SOURCE_NUMBER_OF_PARTITIONS, new IntegerLiteral(partitions)); originals.put(CommonCreateConfigs.SOURCE_NUMBER_OF_REPLICAS, new IntegerLiteral(replicas)); + originals.put(CommonCreateConfigs.SOURCE_TOPIC_RETENTION_IN_MS, new LongLiteral(retentionMs)); return new CreateSourceAsProperties(originals, unwrapProtobufPrimitives); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java index d913f5b138f..aff6fdb4561 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java @@ -99,6 +99,10 @@ public Optional getReplicas() { return Optional.ofNullable(props.getShort(CommonCreateConfigs.SOURCE_NUMBER_OF_REPLICAS)); } + public Optional getRetentionInMillis() { + return Optional.ofNullable(props.getLong(CommonCreateConfigs.SOURCE_TOPIC_RETENTION_IN_MS)); + } + public Optional getWindowType() { try { return Optional.ofNullable(props.getString(CreateConfigs.WINDOW_TYPE_PROPERTY))