Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic retention config to WITH clause #9223

Merged
merged 15 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ public void shouldListQueries() throws ExecutionException, InterruptedException
hasProperty("queryType", is(QueryType.PERSISTENT)),
hasProperty("id", startsWith("CTAS_" + AGG_TABLE)),
hasProperty("sql", is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n"
+ " " + TEST_STREAM + ".K K,\n"
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n"
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n"
Expand Down Expand Up @@ -972,7 +972,7 @@ public void shouldDescribeSource() throws Exception {
assertThat(description.readQueries().get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(description.readQueries().get(0).getId(), startsWith("CTAS_" + AGG_TABLE));
assertThat(description.readQueries().get(0).getSql(), is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1, RETENTION_MS=-1) AS SELECT\n"
+ " " + TEST_STREAM + ".K K,\n"
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n"
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public final class CommonCreateConfigs {
public static final String KAFKA_TOPIC_NAME_PROPERTY = "KAFKA_TOPIC";
public static final String SOURCE_NUMBER_OF_PARTITIONS = "PARTITIONS";
public static final String SOURCE_NUMBER_OF_REPLICAS = "REPLICAS";
public static final String SOURCE_TOPIC_RETENTION_IN_MS = "RETENTION_MS";

// Timestamp Props:
public static final String TIMESTAMP_NAME_PROPERTY = "TIMESTAMP";
Expand Down Expand Up @@ -86,6 +87,14 @@ public static void addToConfigDef(
+ "Kafka cluster configuration for replicas will be used for creating a new "
+ "topic."
)
.define(
SOURCE_TOPIC_RETENTION_IN_MS,
ConfigDef.Type.LONG,
null,
Importance.MEDIUM,
"The retention in milliseconds in the backing topic. If this property is "
+ "not set then the default value of 7 days will be used for creating a new topic."
bvarghese1 marked this conversation as resolved.
Show resolved Hide resolved
)
.define(
VALUE_FORMAT_PROPERTY,
ConfigDef.Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate,
methodParams(String.class, int.class, short.class, Map.class), sandbox)
.forward("isTopicExists", methodParams(String.class), sandbox)
.forward("describeTopic", methodParams(String.class), sandbox)
.forward("getTopicConfig", methodParams(String.class), sandbox)
.forward("describeTopics", methodParams(Collection.class), sandbox)
.forward("deleteTopics", methodParams(Collection.class), sandbox)
.forward("listTopicsStartOffsets", methodParams(Collection.class), sandbox)
Expand All @@ -74,6 +75,7 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate,
private final Supplier<Admin> adminClient;

private final Map<String, TopicDescription> createdTopics = new HashMap<>();
private final Map<String, Map<String, String>> createdTopicsConfig = new HashMap<>();

private SandboxedKafkaTopicClient(final KafkaTopicClient delegate,
final Supplier<Admin> sharedAdminClient) {
Expand Down Expand Up @@ -125,6 +127,8 @@ private void createTopic(
partitions,
Sets.newHashSet(AclOperation.READ, AclOperation.WRITE)
));

createdTopicsConfig.put(topic, toStringConfigs(configs));
}

private short getDefaultClusterReplication() {
Expand Down Expand Up @@ -170,6 +174,10 @@ private Map<String, TopicDescription> describeTopics(final Collection<String> to
return descriptions;
}

/**
 * Returns the configs of the given topic.
 *
 * <p>Topics created within this sandbox are answered from the locally captured
 * configs; for any other topic we fall back to the real client, mirroring how
 * the other read-only methods forward to the delegate. Previously this returned
 * an empty map for topics not created in the sandbox, silently dropping configs
 * (e.g. retention.ms) of pre-existing topics.
 *
 * @param topicName the name of the topic to look up
 * @return the topic's configs as strings; never {@code null}
 */
public Map<String, String> getTopicConfig(final String topicName) {
  final Map<String, String> config = createdTopicsConfig.get(topicName);
  return config != null ? config : delegate.getTopicConfig(topicName);
}

/**
 * Removes the given topics from the sandbox's view. Nothing is deleted from the
 * real cluster.
 *
 * <p>Also removes the captured topic configs: previously only
 * {@code createdTopics} was cleaned up, leaving stale entries in
 * {@code createdTopicsConfig} for deleted topics.
 *
 * @param topicsToDelete the names of the topics to forget
 */
private void deleteTopics(final Collection<String> topicsToDelete) {
  for (final String topic : topicsToDelete) {
    createdTopics.remove(topic);
    createdTopicsConfig.remove(topic);
  }
}
Expand All @@ -191,4 +199,9 @@ private Map<TopicPartition, Long> listTopicsStartOffsets(final Collection<String
/**
 * Looks up the end offsets for the given topics. This is a read-only operation,
 * so it is forwarded straight to the real client rather than sandboxed.
 *
 * @param topics the topics to query
 * @return end offset per topic-partition, as reported by the delegate
 */
private Map<TopicPartition, Long> listTopicsEndOffsets(final Collection<String> topics) {
  final Map<TopicPartition, Long> endOffsets = delegate.listTopicsEndOffsets(topics);
  return endOffsets;
}

/**
 * Converts arbitrarily-typed config values to their string form, so the sandbox
 * stores configs the same way the broker reports them.
 *
 * @param configs config name to (possibly non-string) value
 * @return a new map with every value rendered via {@code toString()}
 */
private static Map<String, String> toStringConfigs(final Map<String, ?> configs) {
  final Map<String, String> stringConfigs = new HashMap<>();
  for (final Map.Entry<String, ?> entry : configs.entrySet()) {
    stringConfigs.put(entry.getKey(), entry.getValue().toString());
  }
  return stringConfigs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ private ConfiguredStatement<? extends CreateSource> injectForCreateSource(
final String topicName = properties.getKafkaTopic();

if (topicClient.isTopicExists(topicName)) {
topicPropertiesBuilder.withSource(() -> topicClient.describeTopic(topicName));
topicPropertiesBuilder
.withSource(
() -> topicClient.describeTopic(topicName),
() -> topicClient.getTopicConfig(topicName));
} else if (!properties.getPartitions().isPresent()) {
final CreateSource example = createSource.copyWith(
createSource.getElements(),
Expand All @@ -124,7 +127,8 @@ private ConfiguredStatement<? extends CreateSource> injectForCreateSource(
.withWithClause(
Optional.of(properties.getKafkaTopic()),
properties.getPartitions(),
properties.getReplicas());
properties.getReplicas(),
properties.getRetentionInMillis());

final String topicCleanUpPolicy = createSource instanceof CreateTable
? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE;
Expand All @@ -151,11 +155,14 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec

topicPropertiesBuilder
.withName(prefix + createAsSelect.getName().text())
.withSource(() -> topicClient.describeTopic(sourceTopicName))
.withSource(
() -> topicClient.describeTopic(sourceTopicName),
() -> topicClient.getTopicConfig(sourceTopicName))
.withWithClause(
properties.getKafkaTopic(),
properties.getPartitions(),
properties.getReplicas());
properties.getReplicas(),
properties.getRetentionInMillis());

final String topicCleanUpPolicy;
final Map<String, Object> additionalTopicConfigs = new HashMap<>();
Expand All @@ -182,7 +189,9 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec
final T withTopic = (T) createAsSelect.copyWith(properties.withTopic(
info.getTopicName(),
info.getPartitions(),
info.getReplicas()
info.getReplicas(),
(Long) additionalTopicConfigs
.getOrDefault(TopicConfig.RETENTION_MS_CONFIG, info.getRetentionInMillis())
));

final String withTopicText = SqlFormatter.formatSql(withTopic) + ";";
Expand All @@ -206,6 +215,10 @@ private TopicProperties createTopic(

final Map<String, Object> config = new HashMap<>();
config.put(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy);
// This is to make sure WINDOW RETENTION config is honored over WITH RETENTION_MS config
bvarghese1 marked this conversation as resolved.
Show resolved Hide resolved
if (!additionalTopicConfigs.containsKey(TopicConfig.RETENTION_MS_CONFIG)) {
config.put(TopicConfig.RETENTION_MS_CONFIG, info.getRetentionInMillis());
}
config.putAll(additionalTopicConfigs);

topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import io.confluent.ksql.util.KsqlException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.TopicConfig;

/**
* A container for all properties required for creating/validating
Expand All @@ -33,30 +35,33 @@
public final class TopicProperties {

public static final short DEFAULT_REPLICAS = -1;

private static final String INVALID_TOPIC_NAME = ":INVALID:";
private static final int INVALID_PARTITIONS = -1;
private static final long DEFAULT_RETENTION_IN_MS = 604800000L; // 7 days

private final String topicName;
private final Integer partitions;
private final Short replicas;
private final Long retentionMs;

/**
 * Creates a property container. A {@code null} for any field means "not set";
 * the corresponding getter falls back to a default (see
 * {@code getReplicas()} / {@code getRetentionInMillis()}).
 *
 * <p>Note: the source view here interleaved the pre-change 3-arg and
 * post-change 4-arg signatures (diff residue); this is the reconstructed
 * post-change constructor.
 *
 * @param topicName   the topic name, or {@code null} if unset
 * @param partitions  the partition count, or {@code null} if unset
 * @param replicas    the replication factor, or {@code null} if unset
 * @param retentionMs the retention in milliseconds, or {@code null} if unset
 */
@VisibleForTesting
TopicProperties(
    final String topicName,
    final Integer partitions,
    final Short replicas,
    final Long retentionMs
) {
  this.topicName = topicName;
  this.partitions = partitions;
  this.replicas = replicas;
  this.retentionMs = retentionMs;
}

/**
 * Renders the resolved properties (defaults applied via the getters), including
 * the retention added for the WITH-clause RETENTION_MS support.
 */
@Override
public String toString() {
  return String.format(
      "TopicProperties{topicName='%s', partitions=%s, replicas=%s, retentionMs=%s}",
      getTopicName(),
      getPartitions(),
      getReplicas(),
      getRetentionInMillis());
}

Expand All @@ -72,6 +77,10 @@ public short getReplicas() {
return replicas == null ? DEFAULT_REPLICAS : replicas;
}

/**
 * Returns the configured retention in milliseconds, or the 7-day default when
 * no retention was supplied.
 */
public long getRetentionInMillis() {
  if (retentionMs == null) {
    return DEFAULT_RETENTION_IN_MS;
  }
  return retentionMs;
}

/**
* Constructs a {@link TopicProperties} with the following precedence order:
*
Expand All @@ -92,10 +101,10 @@ public short getReplicas() {
public static final class Builder {

private String name;
private TopicProperties fromWithClause = new TopicProperties(null, null, null);
private final TopicProperties fromOverrides = new TopicProperties(null, null, null);
private final TopicProperties fromKsqlConfig = new TopicProperties(null, null, null);
private Supplier<TopicProperties> fromSource = () -> new TopicProperties(null, null, null);
private TopicProperties fromWithClause = new TopicProperties(null, null, null, null);
private final TopicProperties fromOverrides = new TopicProperties(null, null, null, null);
private final TopicProperties fromKsqlConfig = new TopicProperties(null, null, null, null);
private Supplier<TopicProperties> fromSource = () -> new TopicProperties(null, null, null, null);

Builder withName(final String name) {
this.name = name;
Expand All @@ -105,23 +114,31 @@ Builder withName(final String name) {
Builder withWithClause(
final Optional<String> name,
final Optional<Integer> partitionCount,
final Optional<Short> replicationFactor
final Optional<Short> replicationFactor,
final Optional<Long> retentionMs
) {
fromWithClause = new TopicProperties(
name.orElse(null),
partitionCount.orElse(null),
replicationFactor.orElse(null)
);
replicationFactor.orElse(null),
retentionMs.orElse(null));
return this;
}

Builder withSource(final Supplier<TopicDescription> descriptionSupplier) {
Builder withSource(final Supplier<TopicDescription> descriptionSupplier,
final Supplier<Map<String, String>> configsSupplier) {
fromSource = Suppliers.memoize(() -> {
final TopicDescription description = descriptionSupplier.get();
final Integer partitions = description.partitions().size();
final Short replicas = (short) description.partitions().get(0).replicas().size();

return new TopicProperties(null, partitions, replicas);
final Map<String, String> configs = configsSupplier.get();
final Long retentionMs = Long.valueOf(
configs.getOrDefault(
TopicConfig.RETENTION_MS_CONFIG, String.valueOf(DEFAULT_RETENTION_IN_MS))
);

return new TopicProperties(null, partitions, replicas, retentionMs);
});
return this;
}
Expand Down Expand Up @@ -154,7 +171,15 @@ public TopicProperties build() {
.findFirst()
.orElseGet(() -> fromSource.get().replicas);

return new TopicProperties(name, partitions, replicas);
final Long retentionMs = Stream.of(
fromWithClause.retentionMs,
fromOverrides.retentionMs,
fromKsqlConfig.retentionMs)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(() -> fromSource.get().retentionMs);

return new TopicProperties(name, partitions, replicas, retentionMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -53,7 +54,6 @@
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
Expand Down Expand Up @@ -87,6 +87,7 @@ public static Collection<TestCase<KafkaTopicClient>> getMethodsToTest() {
.ignore("createTopic", String.class, int.class, short.class, Map.class)
.ignore("isTopicExists", String.class)
.ignore("describeTopic", String.class)
.ignore("getTopicConfig", String.class)
.ignore("describeTopics", Collection.class)
.ignore("deleteTopics", Collection.class)
.ignore("listTopicsStartOffsets", Collection.class)
Expand Down Expand Up @@ -150,6 +151,8 @@ public void shouldTrackCreatedTopicsWithConfig() {

// Then:
assertThat(sandboxedClient.isTopicExists("some topic"), is(true));
assertThat(sandboxedClient.getTopicConfig("some topic").entrySet(),
equalTo(toStringConfigs(configs).entrySet()));
}

@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
Expand Down Expand Up @@ -439,5 +442,10 @@ private static List<TopicPartitionInfo> topicPartitions(

return builder.build();
}

/**
 * Mirrors the production helper: renders every config value via
 * {@code toString()} so expected and actual configs compare as strings.
 *
 * @param configs config name to (possibly non-string) value
 * @return a new map with stringified values
 */
private static Map<String, String> toStringConfigs(final Map<String, ?> configs) {
  final Map<String, String> asStrings = new HashMap<>();
  configs.forEach((key, value) -> asStrings.put(key, value.toString()));
  return asStrings;
}
}
}
Loading