New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6729: Reuse source topics for source KTable's materialized store's changelog #5017
KAFKA-6729: Reuse source topics for source KTable's materialized store's changelog #5017
Conversation
@@ -192,7 +182,7 @@ public void shouldProcessViaThroughTopic() { | |||
} | |||
|
|||
@Test | |||
public void testMerge() { | |||
public void ShouldMergeStreams() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: camel case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix this, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is just a renaming to make its name consistent with others, I will rename it with non-capitalized name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw, this looks good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @guozhangwang LGTM
retest this please |
if (!stateChangelogTopics.containsKey(topicName)) { | ||
final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, topicName); | ||
stateChangelogTopics.put(topicName, internalTopicConfig); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be simplified with a single if-then-else
checking storeToChangelogTopic.containsKey(stateFactory.name())
once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that storeToChangelogTopic
and stateChangelogTopics
are different :) The former is pre-built when the DSL is parsed, while the latter is built within the topicGroups
to get the changelog topic configs for topics related to the topic group, a.k.a the sub-topology alone.
@@ -174,7 +173,7 @@ public Integer apply(Integer value1, Integer value2) { | |||
1 + // to | |||
2 + // through | |||
1, // process | |||
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size()); | |||
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: fix indention
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack.
retest this please |
@mjsax addressed comments. |
@@ -58,13 +55,6 @@ | |||
private final StreamsBuilder builder = new StreamsBuilder(); | |||
private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); | |||
|
|||
@Test(expected = TopologyException.class) | |||
public void testFrom() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we remove this test? Rename -> shouldNotAllowToResueAutoGeneratedProcessorName
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is just a renaming to make its name consistent with others, I will rename it with non-capitalized name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EDIT: actually for this test, I removed it because it was covered in TopologyTest#shouldNotAllowToAddSourcesWithSameName
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
…e's changelog (apache#5017) 1. In InternalTopologyBuilder#topicGroups, which is used in StreamsPartitionAssignor, look for book-kept storeToChangelogTopic map before creating a new internal changelog topics. In this way if the source KTable is created, its source topic stored in storeToChangelogTopic will be used. 2. Added unit test (confirmed that without 1) it will fail). 3. MINOR: removed TODOs that are related to removed KStreamBuilder. 4. MINOR: removed TODOs in StreamsBuilderTest util functions and replaced with TopologyWrapper. 5. MINOR: removed StreamsBuilderTest#testFrom as it is already covered by TopologyTest#shouldNotAllowToAddSourcesWithSameName, plus it requires KStreamImpl.SOURCE_NAME which should be a package private field of the KStreamImpl. Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
In InternalTopologyBuilder#topicGroups, which is used in StreamsPartitionAssignor, look for book-kept
storeToChangelogTopic
map before creating a new internal changelog topics. In this way if the source KTable is created, its source topic stored instoreToChangelogTopic
will be used.Added unit test (confirmed that without 1) it will fail).
MINOR: removed TODOs that are related to removed KStreamBuilder.
MINOR: removed TODOs in StreamsBuilderTest util functions and replaced with TopologyWrapper.
MINOR: removed
StreamsBuilderTest#testFrom
as it is already covered byTopologyTest#shouldNotAllowToAddSourcesWithSameName
, plus it requiresKStreamImpl.SOURCE_NAME
which should be a package private field of theKStreamImpl
.Committer Checklist (excluded from commit message)