
KAFKA-9298: reuse mapped stream error in joins #8504

Merged
merged 6 commits into apache:trunk on May 27, 2020

Conversation


@bbejeck bbejeck commented Apr 17, 2020

When performing a join with a stream that needs repartitioning, Kafka Streams automatically creates a repartition topic. If the user does not use StreamJoined to name the repartition topic, Kafka Streams uses the generated name of the KStream instance for the repartition topic name.

If the KStream instance requiring repartitioning participates in another join, the second repartition topic is created using the same operator name. This name reuse is what causes the InvalidTopologyException: the InternalTopologyBuilder has already registered the repartition source under that name.

For example, the following topology causes an error: Kafka Streams attempts to create two repartition topics (which is correct behavior) but uses the same generated name for both.

KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
// using newStream in another join here causes the error
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-two");

However, this topology, which is the same except that the user has provided repartition topic names, works fine. Note the use of StreamJoined.withName here:

KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
// using newStream in another join here is fine because the name of the repartition topic is unique
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");

This bug has been present for some time; I reproduced it on 2.0, before we added the optimization layer.

Ideally, the fix would be to generate a new repartition topic name each time to avoid such issues. But IMHO that ship has already sailed, because introducing new name generation would cause compatibility issues for existing topologies. So generating new names is out, for now at least.

The proposed fix is:

  1. For KStream objects needing repartitioning and using generated names, reuse the repartition topic node in any additional joins.
  2. For KStream instances needing repartitioning and using user-provided names, always create a new repartition topic node for each join, as each one will have a unique name.

I've added tests confirming the expected behavior.
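
For reference, here is a minimal, self-contained version of the failing example above (the input topic names and the build step are illustrative assumptions; the join logic matches the snippet in the description):

// assumes the usual imports: org.apache.kafka.streams.{StreamsBuilder, Topology, KeyValue},
// org.apache.kafka.streams.kstream.{KStream, JoinWindows}, and static java.time.Duration.ofMillis
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream1 = builder.stream("input-one");
final KStream<String, String> stream2 = builder.stream("input-two");
final KStream<String, String> stream3 = builder.stream("input-three");

final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
// without the fix, this second join on newStream leads to two repartition topics with the
// same generated name, so building the topology fails with InvalidTopologyException
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-two");

final Topology topology = builder.build();
// with the fix, the describe() output shows a single, shared repartition topic for both joins
System.out.println(topology.describe());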

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor<K, V> topicExtractor,
null,
optimizableRepartitionNodeBuilder);

final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
if (repartitionNode == null || !name.equals(repartitionName)) {
@bbejeck bbejeck Apr 17, 2020

This is the fix. We'll generate the repartitionNode if it's the first time through or if the user has provided a name for the repartition topic. Otherwise, we cache the repartitionNode for use in subsequent joins on the same KStream object.

Member

Hmmm... I am wondering if just bumping the index would be sufficient and the optimizer would merge the node automatically?

I am also not sure about the code structure: so far, the DSL layer does not know much about optimizations (even if we "leak" a little bit into it, as we build up the StreamsGraphNode graph)... We would push some optimization decisions into the DSL layer, thus spreading out "optimization code"? On the other hand, just inserting one OptimizableRepartitionNode is much more efficient than inserting multiple and letting the optimizer remove them later?

I am also wondering, if we could do the same for other repartition topics?

Last question: this method is also used for stream-table joins and thus, if one joins a stream with two tables, would this change be backward incompatible? Or would two stream-table joins fail with the same InvalidTopologyException?

Contributor Author

Hmmm... I am wondering if just bumping the index would be sufficient and the optimizer would merge the node automatically?

I hadn't thought of that, but it should work. I initially had concerns about topology compatibility, but I don't think that's an issue, since users currently can't re-use a KStream node that needs repartitioning in joins.

I am also not sure about the code structure: so far, the DSL layer does not know much about optimizations (even if we "leak" a little bit into it, as we build up the StreamsGraphNode graph)... We would push some optimization decisions into the DSL layer, thus spreading out "optimization code"? On the other hand, just inserting one OptimizableRepartitionNode is much more efficient than inserting multiple and letting the optimizer remove them later?

Yeah, I agree the current approach is leaking too much optimization logic into the DSL code. I think it will be better to just go ahead and create the topology "as is" and let the optimizer do its job.

I am also wondering, if we could do the same for other repartition topics?

We probably should have a consistent approach. How about I make the changes in this PR for the Join repartition topics (incrementing the index of the repartition node name) and do a follow-on PR to address the other repartition topics?

Last question: this method is also used for stream-table joins and thus, if one joins a stream with two tables, would this change be backward incompatible? Or would two stream-table joins fail with the same InvalidTopologyException?

I believe two stream-table joins without this fix will fail with the same exception, but I'll add some tests to confirm.
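
As a sketch of the kind of test being discussed (the table and output topic names here are illustrative assumptions):

final KStream<String, String> mapped = stream1.map((k, v) -> new KeyValue<>(v, k));
final KTable<String, String> table1 = builder.table("table-one");
final KTable<String, String> table2 = builder.table("table-two");

mapped.join(table1, (streamValue, tableValue) -> streamValue + tableValue).to("stream-table-out-one");
// the mapped stream also needs repartitioning for a stream-table join, so without the fix
// this second join should hit the same generated-name clash and fail with InvalidTopologyException
mapped.join(table2, (streamValue, tableValue) -> streamValue + tableValue).to("stream-table-out-two");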

@vvcephei vvcephei Apr 24, 2020

Since you made the mistake of asking my opinion, here it is :) :

bumping the index

It's true that users can't currently reuse the KStream, so there's no compatibility issue there, but we can't bump the index for the first repartition topic, or we would break every topology that uses generated repartition topic names already. So, either way, we have to cache something to tell us to do something different on the "first reuse" (i.e., the second use of the KStream).

Since we have to do that anyway, maybe it's fine to just cache the repartition node itself instead of a flag that says "bump the index next time".

leaking optimizations into the DSL

I'm on the fence about whether this is an "optimization" or "reasonable behavior". It sort of feels like the latter, and the only reason we needed to introduce the "repartition-collapsing" optimization is that we failed to introduce reasonable behavior from the beginning. Also, my read is that the DSL builder and the optimizer are not cleanly separated right now anyway, and if we ever want to build more optimizations, we'll most likely need to make another pass on both anyway. We're also starting to think about topology evolution (cc @cadonna ), which makes this a less scary prospect, as we can then implement a mechanism to compatibly introduce new optimizations. In other words, I'm not taking a hard stance, but leaning in the direction of doing the more efficient thing than the more pure thing, since we're not currently super pure anyway.

Other repartition topics

I think we'd better leave it alone for now, implement topology evolution, then migrate to a completely pure and consistent approach.

Member

We probably should have a consistent approach. How about I make the changes in this PR for the Join repartition topics (incrementing the index of the repartition node name) and do a follow-on PR to address the other repartition topics?

@bbejeck Works for me.

I'm on the fence about whether this is an "optimization" or "reasonable behavior".

@bbejeck @vvcephei That was my reasoning from my other comments about "don't make the same mistake again", too (cf. #8504 (comment)). Atm, we just need to keep the old behavior for backward compatibility reasons and only give "reasonable" behavior via opt-in to the optimization. IMHO, optimization should be the default behavior anyway (not the other way round; we just have it that way due to compatibility constraints) and you should even be able to turn it off (if possible)


bbejeck commented Apr 17, 2020

Java 8 passed

Java 11 failed with kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup; updated the existing Jira ticket.

retest this please.


bbejeck commented Apr 17, 2020

ping @mjsax @vvcephei for review


bbejeck commented Apr 20, 2020

ping @mjsax and @vvcephei, I know you guys are busy, but do you mind having a look?


mjsax commented Apr 22, 2020

Ideally, the fix should be to generate a repartition topic name each time to avoid such issues. But IMHO that ship has already sailed because by introducing a new name generation will cause compatibility issues for existing topologies.

Why that? Because such a topology would hit the bug, it could never be deployed, and thus nobody can actually run such a topology? In fact, shouldn't we "burn" an index even if a name is provided (IIRC, we do this for some cases)?

I agree, though, that merging repartition topics (as proposed in (1)) should be done if possible (it's a historic artifact that we did not merge them in the past and IMHO we should not make the same mistake again?).

For (2), it's a tricky question because the different names are used for different stores and changelog topics (ie, main purpose?) -- it seems to be a "nasty side effect" if we would end up with two repartition topics for this case? Of course, given the new repartition() operator, a user can work around it by using it after map() and before calling join(). Just brainstorming here what the impact could be and what tradeoff we want to pick.
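
As a sketch, the workaround with the new repartition() operator would look something like this (the repartition topic name and serdes are illustrative assumptions):

// repartition explicitly once, with a user-chosen name; the resulting stream no longer
// requires repartitioning, so both joins reuse this single repartition topic
final KStream<String, String> repartitioned = stream1
    .map((k, v) -> new KeyValue<>(v, k))
    .repartition(Repartitioned.with(Serdes.String(), Serdes.String()).withName("mapped-stream"));

repartitioned.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
repartitioned.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");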


bbejeck commented Apr 23, 2020

Why that? Because such a topology would hit the bug, it could never be deployed, and thus nobody can actually run such a topology? In fact, shouldn't we "burn" an index even if a name is provided (IIRC, we do this for some cases)?

Yes, in some cases we increment the index when users provide names. But right now we don't increment the counter at all when creating repartition topics, as we reuse the name as is. My main point is that if we started to generate new names for repartition topics, we'd create topology compatibility issues, as the newly generated name would bump the count for all downstream nodes. Right now I'm leaning towards going with the solution you presented in point one in #8504 (comment).

I agree, though, that merging repartition topics (as proposed in (1)) should be done if possible (it's a historic artifact that we did not merge them in the past and IMHO we should not make the same mistake again?).

But by doing so we are "leaking" optimization logic as you pointed out above. I'm leaning towards building the topology "as is", meaning create two repartition topics if that's what is required. But I don't have a strong opinion and I would be fine with keeping the current solution in this PR.

For (2), it's a tricky question because the different names are used for different stores and changelog topics (ie, main purpose?) -- it seems to be a "nasty side effect" if we would end up with two repartition topics for this case? Of course, given the new repartition() operator, a user can work around it by using it after map() and before calling join(). Just brainstorming here what the impact could be and what tradeoff we want to pick.

I'm not sure I follow the "nasty side effect" comment.
If a user does streamA.join(streamB, ..., streamJoined.withName("foo")) and streamA.join(streamC, ..., streamJoined.withName("bar")), then we should create two repartition topics, as that's what the user is expecting. If they elect to use optimization, then removing redundant repartition topics is expected behavior. I think this also goes back to your original comment about the leaking of optimization details.


bbejeck commented Apr 23, 2020

@mjsax thanks for the review!

@vvcephei vvcephei left a comment

Hey @bbejeck, thanks for the fix! I left some thoughts on the earlier conversation. I'm ok with @mjsax's suggestion, but I'm also ok with (and slightly prefer) the current PR.


bbejeck commented Apr 25, 2020

I'm on the fence about whether this is an "optimization" or "reasonable behavior". It sort of feels like the latter, and the only reason we needed to introduce the "repartition-collapsing" optimization is that we failed to introduce reasonable behavior from the beginning.

Thanks for the comments @vvcephei. That is a good point you raise here.

@mjsax WDYT about merging this PR as is and re-visiting repartition topics and optimizations in general later on, possibly when looking at topology evolution?


mjsax commented May 2, 2020

we should create two repartition topics as that's what the user is expecting.

Is she? I guess it's clearly documented that the name is used for repartition topics, too, but it might still be an unexpected "side effect"? But I see the point that it's not too easy to resolve: maybe we could log a warning about creating an unnecessarily duplicated repartition topic (and update the docs on how to resolve this, ie, via calling repartition() upfront before the join to specify one name)? Just picking one of both names seems wrong. Throwing an exception and refusing to build the topology seems too harsh?

Overall, it seems it might be best to merge the PR using the current approach, and merge the repartition topics directly in StreamsBuilder instead of waiting for the optimization (this also avoids the issue with index bumping and is better default behavior anyway).

Can you add the test for streamA-tableB, streamA-tableC join?


bbejeck commented May 4, 2020

Is she? I guess it's clearly documented that the name is used for repartition topics, too, but it might still be an unexpected "side effect"? But I see the point that it's not too easy to resolve: maybe we could log a warning about creating an unnecessarily duplicated repartition topic (and update the docs on how to resolve this, ie, via calling repartition() upfront before the join to specify one name)? Just picking one of both names seems wrong. Throwing an exception and refusing to build the topology seems too harsh?

Thinking about this again, I'm now leaning towards reusing a repartition node regardless of whether the user provides a name or not. We can update the docs to say repartition topics are only built when required, and once one is created, the same repartition topic is used again.

WDYT?

Can you add the test for streamA-tableB, streamA-tableC join?

Will do.

@bbejeck bbejeck force-pushed the KAFKA-9298_reuse_mapped_stream_error branch from fc604e3 to 2132bae on May 8, 2020 15:32

bbejeck commented May 8, 2020

@mjsax updated this


mjsax commented May 8, 2020

Thinking about this again, I'm now leaning towards reusing a repartition node regardless of whether the user provides a name or not. We can update the docs to say repartition topics are only built when required, and once one is created, the same repartition topic is used again.

WDYT?

Works for me. Users can also use TopologyDescription to get more details. We just need to ensure that we pick the same name each time... \cc @vvcephei


bbejeck commented May 8, 2020

Works for me

Ack, I'll make the updates soon


bbejeck commented May 9, 2020

@mjsax - Updated this to always reuse repartition graph node.

Also, aggregates follow a similar pattern to what joins used to do. IMHO we need to make the behavior consistent, so I've created https://issues.apache.org/jira/browse/KAFKA-9976 (PR #8637).


bbejeck commented May 12, 2020

One failure each for Java 8, 11, and 14; test results cleaned out already.

retest this please.


bbejeck commented May 12, 2020

Java 8 failed with 11:46:32 Caused by: java.lang.OutOfMemoryError: unable to create new native thread


mjsax commented May 12, 2020

@bbejeck Are there any backward compatibility concerns for KAFKA-9976?

public void shouldReuseRepartitionTopicWithGeneratedName() {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
Member

This is the default. Why set it explicitly?

newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
final Topology topology = builder.build(props);
System.out.println(topology.describe().toString());
assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());
@mjsax mjsax May 12, 2020

Sorry for being undecided... Reading the code now, I am wondering if this behavior may become problematic with regard to topology upgrade. Assume the first join is removed. Technically, the new topology is compatible, but we would now generate a new repartition topic name, and thus it's not compatible. This could be fixed by inserting a repartition() in the new code enforcing the old name -- however, this makes me wonder if we might want to throw a "naming conflict" (ie, cannot pick a name) exception based on the original topology for this case when both operators are named, and tell people to insert repartition() right away? For this case, if they later remove a join it's clear what is happening to them.

Ie, we should still not create two repartition topics, which would be "bad" (the user could still enforce it by calling repartition() twice), but just throw with an informative error message? -- Curious what @vvcephei thinks?

Contributor Author

This could be fixed by inserting a repartition() in the new code enforcing the old name -- however, this makes me wonder if we might want to throw a "naming conflict" (ie, cannot pick a name) exception based on the original topology for this case when both operators are named, and tell people to insert repartition() right away? For this case, if they later remove a join it's clear what is happening to them.

I see your point, but I think that is a bad user experience and IMHO leaks too much detail about an operation we want to handle automatically.

I'm leaning towards the simpler case of what we had before: with generated names, re-use the repartition node, but if the user creates a new join with explicit names, just go ahead and create two repartition topics.

WDYT?

Member

I guess both are acceptable solutions (ie, creating two repartition topics or throwing an exception). Your proposal is more user friendly but results in a more expensive deployment. The question might be, what do we try to optimize for?

\cc @vvcephei @guozhangwang

Contributor

Thanks for the discussion, all.

Coming back to this proposal, and considering the points you've raised, it seems like we should re-use the generated repartition node when the name is generated, and create two repartition nodes when they are named.

The purpose of re-using the repartition node in this PR isn't exactly to optimize anything, just to avoid throwing the exception that happens when we currently try to create the exact same repartition node twice. We could instead always create two nodes, but this is needlessly wasteful. Reusing the same-named node makes perfect sense.

When the operations are named, on the other hand, there is no problem right now, since we are creating differently named nodes. Since there's no problem, we shouldn't "solve" it ;)

It's true that this isn't the most optimal physical plan, but for anyone who cares enough to look into it, they can just add the repartition node first, as you suggested @mjsax; we don't need to throw an exception to force them to fine-tune their program.

The other option is that they can enable topology optimization, which will also collapse the named repartition nodes in a well-defined way.
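
(For reference, a minimal sketch of that opt-in, using the config constants that also appear in this PR's tests; the remaining properties are assumed:)

final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); // opt in to topology optimization
final Topology topology = builder.build(props); // optimizations, including collapsing redundant repartition topics, are applied at build time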

Compatibility is a concern, and it seems like it's satisfied if we follow this path:

  1. You currently cannot reuse the same stream in two anonymous joins, so we can share the node without breaking any program
  2. You currently can reuse the same stream in two named joins, and we will create two (named) repartition topics. We have no choice but to maintain this, or we will break compatibility.
  3. Inserting a repartition node is well defined to break compatibility, so people will know they have to reset.
  4. Adding Optimization is well defined to break compatibility, so people will know they have to reset.

Have I missed some consideration?
Thanks,
-John

Member

Thanks @vvcephei -- that is convincing.

Contributor Author

Thanks for the discussion @vvcephei and @mjsax. I'll revert this PR to its original state which conforms to @vvcephei's comments above.


bbejeck commented May 20, 2020

@bbejeck Are there any backward compatibility concerns for KAFKA-9976?

Good point, maybe we should just leave this one alone for now. If you agree I'll close that PR.

@bbejeck bbejeck force-pushed the KAFKA-9298_reuse_mapped_stream_error branch from eb70301 to 0861510 on May 22, 2020 02:43

bbejeck commented May 22, 2020

@mjsax @vvcephei updated (and rebased) this to the original PR state: only reuse the repartition node when the name is generated.


bbejeck commented May 26, 2020

Java 8, 11, and 14 failed. Test results cleaned out already.

Retest this please.


bbejeck commented May 26, 2020

Java 8 passed.

Java 11 failed with org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]

Java 14 failed with org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false] and org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

I've updated the existing Jira tickets.

Retest this please.


bbejeck commented May 26, 2020

Java 8 passed,
Java 11 failed with

  1. org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
  2. org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
  3. org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

Java 14 failed with

  1. org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
  2. org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

Retest this please.


bbejeck commented May 26, 2020

Retest this please


bbejeck commented May 26, 2020

retest this please


bbejeck commented May 27, 2020

Java 14 passed,
Java 11 failed with

21:42:58 > Process 'Gradle Test Executor 48' finished with non-zero exit value 1

Java 8 failed with org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] FAILED

retest this please

@mjsax mjsax left a comment

LGTM. Feel free to merge. (Sorry for the back-and-forth discussion on the PR -- it's just a delicate subject to make sure we don't break compatibility.)


mjsax commented May 27, 2020

Java 14:

org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

Java 11: org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
Java 8: org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration

@mjsax mjsax merged commit cec6202 into apache:trunk May 27, 2020

bbejeck commented May 27, 2020

(Sorry for the back-and-forth discussion on the PR -- it's just a delicate subject to make sure we don't break compatibility.)

No worries @mjsax! It was a great discussion, and it's always good to go over these types of issues thoroughly.

bbejeck added a commit that referenced this pull request May 27, 2020
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>

bbejeck commented May 27, 2020

cherry-picked to 2.5

@bbejeck bbejeck deleted the KAFKA-9298_reuse_mapped_stream_error branch July 10, 2024 13:58