Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10847: Set StreamsConfig on InternalTopologyDriver before writing topology #10640

Merged
merged 3 commits into from May 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -290,6 +290,10 @@ public void buildAndOptimizeTopology(final Properties props) {
}

if (streamGraphNode.allParentsWrittenToTopology() && !streamGraphNode.hasWrittenToTopology()) {
if (props != null && !props.isEmpty()) {
internalTopologyBuilder.setStreamsConfig(new StreamsConfig(props));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should unify this can call this method only once.

Why not just call it with KafkaStreams constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ksql calls the StreamsBuilder first, which calls the StreamStreamJoinNode.writeToTopology() at some point. When KafkaStreams is called later, the shared state store was already added to the join nodes.

}

streamGraphNode.writeToTopology(internalTopologyBuilder);
streamGraphNode.setHasWrittenToTopology(true);
}
Expand Down
Expand Up @@ -116,6 +116,9 @@ public void testLeftJoinWithSpuriousResultFixDisabled() {
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

// Only 2 window stores should be available
assertEquals(2, driver.getAllStateStores().size());

// push two items to the primary stream; the other window is empty
// w1 {}
// w2 {}
Expand Down Expand Up @@ -167,6 +170,9 @@ public void testLeftJoinDuplicatesWithFixDisabled() {
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

// Only 2 window stores should be available
assertEquals(2, driver.getAllStateStores().size());

inputTopic1.pipeInput(0, "A0", 0);
inputTopic1.pipeInput(0, "A0-0", 0);
inputTopic2.pipeInput(0, "a0", 0);
Expand Down Expand Up @@ -488,6 +494,9 @@ public void runLeftJoin(final StreamJoined<Integer, String, String> streamJoined
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

// 2 window stores + 1 shared window store should be available
assertEquals(3, driver.getAllStateStores().size());

// push two items to the primary stream; the other window is empty
// w1 {}
// w2 {}
Expand Down
Expand Up @@ -112,6 +112,9 @@ public void testOuterJoinDuplicatesWithFixDisabled() {
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

// Only 2 window stores should be available
assertEquals(2, driver.getAllStateStores().size());

inputTopic1.pipeInput(0, "A0", 0);
inputTopic1.pipeInput(0, "A0-0", 0);
inputTopic2.pipeInput(0, "a0", 0);
Expand Down Expand Up @@ -557,6 +560,9 @@ public void runOuterJoin(final StreamJoined<Integer, String, String> streamJoine
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

// 2 window stores + 1 shared window store should be available
assertEquals(3, driver.getAllStateStores().size());

// push two items to the primary stream; the other window is empty; this should not
// produce any items because window has not expired
// w1 {}
Expand Down