Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic retention config to WITH clause #9223

Merged
merged 15 commits into from
Aug 24, 2022
Merged

Add topic retention config to WITH clause #9223

merged 15 commits into from
Aug 24, 2022

Conversation

bvarghese1
Copy link
Member

@bvarghese1 bvarghese1 commented Jun 24, 2022

Description

Pass retention_ms as a config in the WITH clause.
This param will be used to set the topic retention while creating the topic if the topic is not already present.
It does not update the retention if the topic is already present.
Additionally, validation is performed to ensure retention_ms config cannot be changed once a topic exists similar to partition count.

Testing done

Unit tests
Integration Tests
Manual Tests using CLI

ksql> CREATE STREAM s1 (k1 int key, v1 int) WITH (kafka_topic='s1', format='avro', partitions=2);

 Message
----------------
 Stream created  // Creates topic with default retention of 1 day
----------------

ksql> CREATE STREAM s2 (k1 int key, v1 int) WITH (kafka_topic='s2', format='avro', partitions=2, retention_ms=30000);

 Message
----------------
 Stream created  // Creates topic with retention.ms=30000
----------------

ksql> CREATE STREAM s3 (k1 int key, v1 int) WITH (kafka_topic='s2', format='avro', partitions=2, retention_ms=50000);
A Kafka topic with the name 's2' already exists, with different partition/replica/retention configuration than required. KSQL expects 2 partitions (topic has 2), 1 replication factor (topic has 1), and 50000 retention (topic has 30000).

ksql> CREATE STREAM ss2 AS SELECT * FROM s2;

 Message
----------------------------------
 Created query with ID CSAS_SS2_9    // Creates topic SS2 with retention.ms=30000 & partitions=2 copied from s2
----------------------------------

ksql> CREATE STREAM ss3 WITH (kafka_topic='ss3', partitions=3, format='avro', retention_ms=50000) AS SELECT * FROM s2;

 Message
-----------------------------------
 Created query with ID CSAS_SS3_11  // Creates topic SS3 with retention.ms=50000 & partitions=3 as specified in WITH
-----------------------------------

ksql> CREATE TABLE t1 (k1 int PRIMARY KEY, v1 int) WITH (kafka_topic='t1', partitions=2, format='avro');

 Message
---------------
 Table created
---------------

ksql> CREATE TABLE t2 (k1 int PRIMARY KEY, v1 int) WITH (kafka_topic='t2', partitions=2, format='avro', retention_ms=30000);
Invalid config variable in the WITH clause: RETENTION_MS. Non-windowed tables do not support retention.

ksql> CREATE TABLE t2 (k1 int PRIMARY KEY, v1 int) WITH (kafka_topic='t1', partitions=2, format='avro', retention_ms=30000);
Invalid config variable in the WITH clause: RETENTION_MS. Non-windowed tables do not support retention.

ksql> CREATE TABLE tt2 WITH (kafka_topic='tt2', format='avro', partitions=3, retention_ms=50000) AS SELECT * FROM t1;
Invalid config variable in the WITH clause: RETENTION_MS. Non-windowed tables do not support retention.

ksql> CREATE TABLE tt2 WITH (kafka_topic='tt2', format='avro', partitions=3) AS SELECT * FROM t1;

 Message
-----------------------------------
 Created query with ID CTAS_TT2_15
-----------------------------------

ksql> CREATE TABLE tt3 WITH (kafka_topic='tt3', format='avro', partitions=3, retention_ms=30000) AS SELECT k1, count(*) FROM s1 WINDOW TUMBLING (SIZE 10 SE
CONDS, RETENTION 2 DAYS) GROUP BY k1;

 Message
-----------------------------------
 Created query with ID CTAS_TT3_17  // Creates topic with retention.ms=172800000 (2 DAYS)
-----------------------------------

ksql> CREATE TABLE tt4 WITH (kafka_topic='tt4', format='avro', partitions=3, retention_ms=432000000) AS SELECT k1, count(*) FROM s1 WINDOW TUMBLING (SIZE 1
0 SECONDS, RETENTION 2 DAYS) GROUP BY k1;

 Message
-----------------------------------
 Created query with ID CTAS_TT4_19  // Creates topic with retention.ms=432000000 (5 DAYS)
-----------------------------------

ksql> CREATE TABLE tt5 WITH (kafka_topic='tt4', format='avro', partitions=3, retention_ms=50000) AS SELECT k1, count(*) FROM s1 WINDOW TUMBLING (SIZE 10 SE
CONDS, RETENTION 2 DAYS) GROUP BY k1;
A Kafka topic with the name 'tt4' already exists, with different partition/replica/retention configuration than required. KSQL expects 3 partitions (topic has 3), 1 replication factor (topic has 1), and 172800000 retention (topic has 432000000).

Reviewer checklist

@bvarghese1 bvarghese1 requested a review from a team as a code owner June 24, 2022 21:34
- Introduces a new config RETENTION_MS which can be set in the WITH clause
- The RETENTION_MS config is passed to the Admin client along with existing
  params such as PARTIONS, REPLICAS, etc while creating the underlying topic
- Currently, there is a way to set the RETENTION of the underlying topic using
  the WINDOW config. The RETENTION_MS does not override that param as users may
  be used to the WINDOW config.
Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation looks good to me, some feature level comments inline. Two other open questions:

  1. Can you test to make sure that this doesn't affect the retention of internal topics? (It shouldn't, but would be good to double check)
  2. In the case that the topic exists and retention is different (e.g. in the CS case), do we want to consider either (a) failing the statement or (b) allowing it to override? In the case of partitions, if you specify something different than the existing partitions the statement will fail (IIRC)

@agavra agavra requested a review from a team July 28, 2022 18:55
@bvarghese1
Copy link
Member Author

Implementation looks good to me, some feature level comments inline. Two other open questions:

  1. Can you test to make sure that this doesn't affect the retention of internal topics? (It shouldn't, but would be good to double check)
  2. In the case that the topic exists and retention is different (e.g. in the CS case), do we want to consider either (a) failing the statement or (b) allowing it to override? In the case of partitions, if you specify something different than the existing partitions the statement will fail (IIRC)

For 2, I think we should fail if the retention specified is different than the existing topic's retention. The same is true for partitions.

@colinhicks
Copy link
Member

  1. In the case that the topic exists and retention is different (e.g. in the CS case), do we want to consider either (a) failing the statement or (b) allowing it to override? In the case of partitions, if you specify something different than the existing partitions the statement will fail (IIRC)

Let's make the behavior for retention consistent with the behavior for partition count.

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Did a quick skim and have to inline comments.

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bvarghese1

@bvarghese1 bvarghese1 merged commit 1cdde9f into confluentinc:master Aug 24, 2022
@bvarghese1 bvarghese1 deleted the add_topic_retention branch August 24, 2022 23:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Topic configuration while creating a stream/table
3 participants