Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic retention config to WITH clause #9223

Merged
merged 15 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ public void shouldListQueries() throws ExecutionException, InterruptedException
hasProperty("queryType", is(QueryType.PERSISTENT)),
hasProperty("id", startsWith("CTAS_" + AGG_TABLE)),
hasProperty("sql", is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n"
+ " " + TEST_STREAM + ".K K,\n"
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n"
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n"
Expand Down Expand Up @@ -1005,7 +1005,7 @@ public void shouldDescribeSource() throws Exception {
assertThat(description.readQueries().get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(description.readQueries().get(0).getId(), startsWith("CTAS_" + AGG_TABLE));
assertThat(description.readQueries().get(0).getSql(), is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n"
+ " " + TEST_STREAM + ".K K,\n"
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n"
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public final class CommonCreateConfigs {
public static final String KAFKA_TOPIC_NAME_PROPERTY = "KAFKA_TOPIC";
public static final String SOURCE_NUMBER_OF_PARTITIONS = "PARTITIONS";
public static final String SOURCE_NUMBER_OF_REPLICAS = "REPLICAS";
public static final String SOURCE_TOPIC_RETENTION_IN_MS = "RETENTION_MS";

// Timestamp Props:
public static final String TIMESTAMP_NAME_PROPERTY = "TIMESTAMP";
Expand Down Expand Up @@ -86,6 +87,14 @@ public static void addToConfigDef(
+ "Kafka cluster configuration for replicas will be used for creating a new "
+ "topic."
)
.define(
SOURCE_TOPIC_RETENTION_IN_MS,
ConfigDef.Type.LONG,
null,
Importance.MEDIUM,
"The retention in milliseconds in the backing topic. If this property is "
+ "not set then the default value of 7 days will be used for creating a new topic."
bvarghese1 marked this conversation as resolved.
Show resolved Hide resolved
)
.define(
VALUE_FORMAT_PROPERTY,
ConfigDef.Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -97,8 +98,10 @@ public void createTopic(
final Map<String, ?> configs,
final CreateTopicsOptions createOptions
) {
final Optional<Long> retentionMs = KafkaTopicClient.getRetentionMs(configs);

if (isTopicExists(topic)) {
validateTopicProperties(topic, numPartitions, replicationFactor);
validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs);
return;
}

Expand Down Expand Up @@ -128,9 +131,9 @@ public void createTopic(

} catch (final TopicExistsException e) {
// if the topic already exists, it is most likely because another node just created it.
// ensure that it matches the partition count and replication factor before returning
// success
validateTopicProperties(topic, numPartitions, replicationFactor);
// ensure that it matches the partition count, replication factor, and retention
// before returning success
validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs);

} catch (final TopicAuthorizationException e) {
throw new KsqlTopicAuthorizationException(
Expand Down Expand Up @@ -383,14 +386,22 @@ private static boolean isInternalTopic(final String topicName, final String appl
private void validateTopicProperties(
final String topic,
final int requiredNumPartition,
final int requiredNumReplicas
final int requiredNumReplicas,
final Optional<Long> requiredRetentionMs
) {
final TopicDescription existingTopic = describeTopic(topic);
final Map<String, String> existingConfig = getTopicConfig(topic);
TopicValidationUtil
.validateTopicProperties(requiredNumPartition, requiredNumReplicas, existingTopic);
.validateTopicProperties(
requiredNumPartition,
requiredNumReplicas,
requiredRetentionMs,
existingTopic,
existingConfig);
LOG.debug(
"Did not create topic {} with {} partitions and replication-factor {} since it exists",
topic, requiredNumPartition, requiredNumReplicas);
"Did not create topic {} with {} partitions, replication-factor {}, "
+ "and retention {} since it exists",
topic, requiredNumPartition, requiredNumReplicas, requiredRetentionMs);
}

private Map<String, String> topicConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -61,6 +62,7 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate,
methodParams(String.class, int.class, short.class, Map.class), sandbox)
.forward("isTopicExists", methodParams(String.class), sandbox)
.forward("describeTopic", methodParams(String.class), sandbox)
.forward("getTopicConfig", methodParams(String.class), sandbox)
.forward("describeTopics", methodParams(Collection.class), sandbox)
.forward("deleteTopics", methodParams(Collection.class), sandbox)
.forward("listTopicsStartOffsets", methodParams(Collection.class), sandbox)
Expand All @@ -74,6 +76,7 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate,
private final Supplier<Admin> adminClient;

private final Map<String, TopicDescription> createdTopics = new HashMap<>();
private final Map<String, Map<String, String>> createdTopicsConfig = new HashMap<>();

private SandboxedKafkaTopicClient(final KafkaTopicClient delegate,
final Supplier<Admin> sharedAdminClient) {
Expand All @@ -96,7 +99,8 @@ private void createTopic(
final Map<String, Object> configs
) {
if (isTopicExists(topic)) {
validateTopicProperties(topic, numPartitions, replicationFactor);
final Optional<Long> retentionMs = KafkaTopicClient.getRetentionMs(configs);
validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs);
return;
}

Expand Down Expand Up @@ -125,6 +129,8 @@ private void createTopic(
partitions,
Sets.newHashSet(AclOperation.READ, AclOperation.WRITE)
));

createdTopicsConfig.put(topic, toStringConfigs(configs));
}

private short getDefaultClusterReplication() {
Expand Down Expand Up @@ -170,18 +176,32 @@ private Map<String, TopicDescription> describeTopics(final Collection<String> to
return descriptions;
}

/**
 * Returns the config of the given topic, preferring the config recorded for topics
 * created inside this sandbox and falling back to the shared delegate otherwise.
 */
public Map<String, String> getTopicConfig(final String topicName) {
  final Map<String, String> sandboxConfig = createdTopicsConfig.get(topicName);
  return sandboxConfig != null ? sandboxConfig : delegate.getTopicConfig(topicName);
}

/** Forgets the given topics from the set of topics created within this sandbox. */
private void deleteTopics(final Collection<String> topicsToDelete) {
  for (final String topic : topicsToDelete) {
    createdTopics.remove(topic);
  }
}

private void validateTopicProperties(
final String topic,
final int requiredNumPartition,
final int requiredNumReplicas
final int requiredNumReplicas,
final Optional<Long> requiredRetentionMs
) {
final TopicDescription existingTopic = describeTopic(topic);
final Map<String, String> existingConfig = getTopicConfig(topic);
TopicValidationUtil
.validateTopicProperties(requiredNumPartition, requiredNumReplicas, existingTopic);
.validateTopicProperties(
requiredNumPartition,
requiredNumReplicas,
requiredRetentionMs,
existingTopic,
existingConfig);
}

private Map<TopicPartition, Long> listTopicsStartOffsets(final Collection<String> topics) {
Expand All @@ -191,4 +211,9 @@ private Map<TopicPartition, Long> listTopicsStartOffsets(final Collection<String
// Pure pass-through: the sandbox records no offset state locally, so end offsets
// always come from the shared delegate client.
private Map<TopicPartition, Long> listTopicsEndOffsets(final Collection<String> topics) {
return delegate.listTopicsEndOffsets(topics);
}

/**
 * Renders arbitrarily-typed config values as strings, matching how configs are
 * exposed once a topic exists.
 *
 * @param configs the raw config map supplied at topic-creation time
 * @return a new map with every value converted to its string form
 */
private static Map<String, String> toStringConfigs(final Map<String, ?> configs) {
  // String.valueOf tolerates null config values; e.getValue().toString() would NPE
  // inside Collectors.toMap before the collector could even reject the entry.
  return configs.entrySet().stream()
      .collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.confluent.ksql.exception.KafkaTopicExistsException;
import io.confluent.ksql.topic.TopicProperties;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.TopicDescription;

final class TopicValidationUtil {
Expand All @@ -28,40 +30,73 @@ private TopicValidationUtil() {
public static void validateTopicProperties(
final int requiredNumPartition,
final int requiredNumReplicas,
final TopicDescription existingTopic
final Optional<Long> requiredRetentionMs,
final TopicDescription existingTopic,
final Map<String, String> existingConfig
) {
final int actualNumPartitions = existingTopic.partitions().size();
final int actualNumReplicas = existingTopic.partitions().get(0).replicas().size();
final Optional<Long> actualRetentionMs = KafkaTopicClient.getRetentionMs(existingConfig);

final String topicName = existingTopic.name();
validateTopicProperties(
topicName,
requiredNumPartition,
requiredNumReplicas,
requiredRetentionMs,
actualNumPartitions,
actualNumReplicas);
actualNumReplicas,
actualRetentionMs);
}

public static void validateTopicProperties(
final String topicName,
final int requiredNumPartition,
final int requiredNumReplicas,
final Optional<Long> requiredRetentionMs,
final int actualNumPartitions,
final int actualNumReplicas
final int actualNumReplicas,
final Optional<Long> actualRetentionMs
) {
if (actualNumPartitions != requiredNumPartition
|| (requiredNumReplicas != TopicProperties.DEFAULT_REPLICAS
&& actualNumReplicas < requiredNumReplicas)) {
throw new KafkaTopicExistsException(String.format(
"A Kafka topic with the name '%s' already exists, with different partition/replica "
+ "configuration than required. KSQL expects %d partitions (topic has %d), and %d "
+ "replication factor (topic has %d).",
if (isInvalidPartitions(actualNumPartitions, requiredNumPartition)
|| isInvalidReplicas(actualNumReplicas, requiredNumReplicas)
|| isInvalidRetention(actualRetentionMs, requiredRetentionMs)) {
String errMsg = String.format(
"A Kafka topic with the name '%s' already exists, with different partition/replica"
+ " configuration than required. KSQL expects %d partitions (topic has %d),"
+ " %d replication factor (topic has %d)",
topicName,
requiredNumPartition,
actualNumPartitions,
requiredNumReplicas,
actualNumReplicas
), true);
actualNumReplicas);
if (requiredRetentionMs.isPresent() && actualRetentionMs.isPresent()) {
errMsg = errMsg.replace("partition/replica", "partition/replica/retention");
errMsg = String.format(errMsg + ", and %d retention (topic has %d).",
requiredRetentionMs.get(), actualRetentionMs.get());
} else {
errMsg += ".";
}
throw new KafkaTopicExistsException(errMsg, true);
}
}

/** A topic is invalid when its partition count differs from the required count. */
private static boolean isInvalidPartitions(final int actualNumPartitions,
    final int requiredNumPartition) {
  return requiredNumPartition != actualNumPartitions;
}

/**
 * A topic is invalid when a specific replication factor was requested and the topic has
 * fewer replicas than requested; the default value disables the check entirely.
 */
private static boolean isInvalidReplicas(final int actualNumReplicas,
    final int requiredNumReplicas) {
  if (requiredNumReplicas == TopicProperties.DEFAULT_REPLICAS) {
    return false;
  }
  return actualNumReplicas < requiredNumReplicas;
}

/**
 * Retention is invalid only when both a required and an actual value are known and they
 * differ; if either side is absent, retention is not validated at all.
 */
private static boolean isInvalidRetention(final Optional<Long> actualRetentionMs,
    final Optional<Long> requiredRetentionMs) {
  if (!requiredRetentionMs.isPresent() || !actualRetentionMs.isPresent()) {
    return false;
  }
  return !requiredRetentionMs.get().equals(actualRetentionMs.get());
}

}
Loading