Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10847: Set StreamsConfig on InternalTopologyDriver before writing topology #10640

Merged
merged 3 commits into from May 7, 2021

Conversation

spena
Copy link
Contributor

@spena spena commented May 6, 2021

While running some tests, I noticed the KSTREAMS-OUTERSHARED store, used in left/outer joins, was still added in the list of state stores when the StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX flag was false. When this flag is false, the shared store should not be added to the join nodes.

Testing

  • Added unit tests

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label May 6, 2021
@@ -290,6 +290,10 @@ public void buildAndOptimizeTopology(final Properties props) {
}

if (streamGraphNode.allParentsWrittenToTopology() && !streamGraphNode.hasWrittenToTopology()) {
if (props != null && !props.isEmpty()) {
internalTopologyBuilder.setStreamsConfig(new StreamsConfig(props));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should unify this can call this method only once.

Why not just call it with KafkaStreams constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ksql calls the StreamsBuilder first, which calls the StreamStreamJoinNode.writeToTopology() at some point. When KafkaStreams is called later, the shared state store was already added to the join nodes.

@@ -119,6 +120,11 @@ public Integer buildPriority() {

public abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if removing this method and replace it with the below method with the properties parameter is better. There are several classes that implement GraphNode and that will need to accept the new parameter. It is an internal interface and impl, though. Do you think I should go with that approach instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Might be worth to just remove this one even if we need to update all implementations.

@@ -98,12 +108,19 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, otherProcessorName);
topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);

final StreamsConfig streamsConfig = topologyBuilder.getStreamsConfig();
if (streamsConfig == null || StreamsConfig.InternalConfig.getBoolean(streamsConfig.originals(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
if (props == null || StreamsConfig.InternalConfig.getBoolean(toMap(props), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using toMap, can't we just to new HashMap<String,String>(props)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashMap does not have a constructor that accepts Properties.

Copy link
Member

@mjsax mjsax May 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works (note that Properties implements Map<Object, Object>):

Properties p = new Properties();
Map<String, Object> foo = new HashMap(p);

So you should be able to do getBoolean(new HashMap(props), ...) (Need to omit the generics though...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It wasn't working initially, I probably had a syntax error.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass on the new commit, I do not have further comments. @mjsax please feel free to merge when it LGTY.

@spena spena force-pushed the set_config_internal_topology branch from ca620bc to 2d3b936 Compare May 6, 2021 21:23
@mjsax mjsax merged commit 45d7440 into apache:trunk May 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants