
MINOR: Adjust logic of conditions to set number of partitions in step zero of assignment. #7419

Conversation

bbejeck (Contributor) commented Sep 30, 2019

A minor change in logic to account for repartition topics whose number of partitions might not yet be available in the metadata.

Ran all existing tests plus all streams system tests

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Before this change:

    // It is possible the sourceTopic is another internal topic, i.e.,
    // map().join().join(map())
    if (repartitionTopicMetadata.containsKey(sourceTopicName)
        && repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
        numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
bbejeck (Contributor Author) commented:

Here, by checking whether the source topic is a repartition topic and whether its number of partitions is present, we acknowledge that the partition count might not be available yet.

However, when that is the case we drop down to the else block, which results in throwing an exception. IMHO that is not the intent of the logic, since we would then throw an exception whenever any source topic (internal or otherwise) did not have a partition count available.

After this change:

    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
            numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
        }
bbejeck (Contributor Author) commented:

Here's the change: if the source topic is a repartition topic, we drop into a new block to check whether the repartition topic has a partition count available. If it doesn't, we don't throw an exception; we only throw when a non-internal topic reports no partition count available.
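
To make the before/after behavior concrete, here is a minimal, self-contained sketch of the branch in question (not the actual assignor code; the maps, the helper method, and the exception type are illustrative assumptions):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// A minimal, self-contained model of the branch discussed above. The Optional-valued
// map is a stand-in for repartitionTopicMetadata, and the exception type and helper
// method are assumptions for illustration only.
public class PartitionCountResolutionSketch {

    static Integer resolveCandidate(final Map<String, Optional<Integer>> repartitionTopicMetadata,
                                    final Map<String, Integer> externalTopicPartitions,
                                    final String sourceTopicName) {
        if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
            // The source is itself a repartition topic: its count may not be known yet
            // on this pass, so return "unknown" instead of throwing, and let a later
            // pass of the enclosing while-loop fill it in.
            return repartitionTopicMetadata.get(sourceTopicName).orElse(null);
        } else {
            // External source topics must already have a known partition count.
            final Integer count = externalTopicPartitions.get(sourceTopicName);
            if (count == null) {
                throw new IllegalStateException("No partition count for topic " + sourceTopicName);
            }
            return count;
        }
    }

    public static void main(final String[] args) {
        final Map<String, Optional<Integer>> repartition = new HashMap<>();
        repartition.put("app-foo-repartition", Optional.empty()); // count not resolved yet
        final Map<String, Integer> external = new HashMap<>();
        external.put("input-topic", 6);

        // The old combined containsKey(...) && isPresent() check sent the first case
        // to the else branch and threw; the nested check simply yields "unknown".
        System.out.println(resolveCandidate(repartition, external, "app-foo-repartition")); // null
        System.out.println(resolveCandidate(repartition, external, "input-topic"));         // 6
    }
}
```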

guozhangwang (Contributor) commented:

@bbejeck @vvcephei I checked the code: right now we use a LinkedHashMap for the node-groups / topic-groups construction, so insertion order is preserved. I think that means, assuming the topology is a DAG with no cycles, one pass starting from sub-topology 1 is arguably sufficient. However, one case that we do not handle today, which is also why we are still doing a while-loop here, is e.g. (numbers are sub-topology indices):

1 -> 2,
1 -> 3,
3 -> 2

If we loop in the order 1, 2, 3, then when we process 2, since 3 has not been set yet, we do not have the num.partitions for the repartition topic between 3 -> 2.

Looking at InternalTopologyBuilder#makeNodeGroups, I think it is possible to ensure it is ordered as

1 -> 3,
1 -> 2,
3 -> 2

so that we can make one pass without the while loop, and can also assume that the parent sub-topologies' sink/repartition topic num.partitions are already set when processing a given sub-topology. WDYT?
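
To make the one-pass idea concrete, here is a self-contained sketch that propagates partition counts in topological order over the 1 -> 2, 1 -> 3, 3 -> 2 example (the seed count and the max-of-parents rule are illustrative assumptions, not the actual Streams implementation):

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the one-pass idea: visit sub-topologies in topological order so that
// every parent's partition count is already known when a child is processed.
// The graph (1 -> 2, 1 -> 3, 3 -> 2), the seed count, and the "max of parents"
// rule are illustrative assumptions, not the actual Streams code.
public class OnePassPartitionPropagation {

    public static void main(final String[] args) {
        // edges: parent sub-topology -> child sub-topology (via a repartition topic)
        final Map<Integer, List<Integer>> children = new HashMap<>();
        children.put(1, Arrays.asList(2, 3));
        children.put(3, Collections.singletonList(2));
        children.put(2, Collections.emptyList());

        // Sub-topology 1 reads an external topic with a known partition count.
        final Map<Integer, Integer> numPartitions = new HashMap<>();
        numPartitions.put(1, 4);

        // Kahn's algorithm: compute in-degrees, then repeatedly emit zero-in-degree nodes.
        final Map<Integer, Integer> inDegree = new HashMap<>();
        children.keySet().forEach(n -> inDegree.putIfAbsent(n, 0));
        children.values().forEach(cs -> cs.forEach(c -> inDegree.merge(c, 1, Integer::sum)));

        final Deque<Integer> ready = new ArrayDeque<>();
        inDegree.forEach((node, degree) -> { if (degree == 0) ready.add(node); });

        while (!ready.isEmpty()) {
            final int parent = ready.poll();
            for (final int child : children.getOrDefault(parent, Collections.emptyList())) {
                // The parent's count is guaranteed to be set by now; take the max across parents.
                numPartitions.merge(child, numPartitions.get(parent), Math::max);
                if (inDegree.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }

        System.out.println(numPartitions); // {1=4, 2=4, 3=4} in a single pass, no retry loop
    }
}
```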

guozhangwang (Contributor) commented:

Basically, when ordering the non-source node groups we would not rely on Utils.sorted(nodeFactories.keySet()) but on some specific logic such that non-source sub-topologies whose parents are all source sub-topologies get indexed first.

vvcephei (Contributor) commented:

Thanks for the idea, @guozhangwang. If I understand right, it sounds like you're suggesting to propagate the partition count from (external) sources all the way through the topology, in topological order. If the partition count is purely determined by the external source topics, then it should indeed work to do this in topological order in one pass.

What I'm wondering now is whether there's any situation where some of the repartition topics might already exist with a specific number of partitions. An easy strawman is, "what if the operator has pre-created some of the internal topics?", which may or may not be allowed. Another is "what if the topology has changed slightly to add a new repartition topic early in the topology?" Maybe there are some other similar scenarios. I'm not sure if any of these are real possibilities, or if they'd affect the outcome, or if we want to disallow them anyway to make our lives easier.

WDYT?

guozhangwang (Contributor) commented:

Those are good points. Making a one-pass num.partitions decision is not critical in our framework; this is more or less brainstorming with you guys to see if it is possible :) To me, as long as we cannot get stuck infinitely in the while loop, it should be fine.

If users pre-create a topic with the exact xx-repartition name, then yes, I think that could make things trickier. Also, with the KIP-221 repartition hint, I'm not sure how that would affect this either.

bbejeck (Contributor Author) commented Sep 30, 2019

ping @guozhangwang and @vvcephei for review

vvcephei (Contributor) left a comment

Ah, good call, @bbejeck. Thanks for the fix!

guozhangwang (Contributor) left a comment

The change LGTM. I left some minor comments and also a meta one for improvement in a future PR.

    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
            numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
        }
guozhangwang (Contributor) commented:

This dates from before this PR, but while reviewing it I realized that line 898 in prepareTopic:

    topic.setNumberOfPartitions(numPartitions.get());

is not necessary, since numPartitions is read from the topic itself.
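
In other words, the write-back assigns a value the object already holds. A simplified stand-in that only mirrors the method names in the snippet above (not the real InternalTopicConfig):

```java
import java.util.Optional;

// Simplified stand-in showing why writing back a value that was just read from the
// same object is a no-op. Field and method names mirror the snippet above but the
// class itself is an assumption for illustration.
public class TopicConfigSketch {
    private Optional<Integer> numberOfPartitions = Optional.of(3);

    Optional<Integer> numberOfPartitions() {
        return numberOfPartitions;
    }

    void setNumberOfPartitions(final int numPartitions) {
        this.numberOfPartitions = Optional.of(numPartitions);
    }

    public static void main(final String[] args) {
        final TopicConfigSketch topic = new TopicConfigSketch();
        final Optional<Integer> numPartitions = topic.numberOfPartitions();
        // Redundant: numPartitions was read from `topic`, so this assigns back
        // the value the object already holds.
        topic.setNumberOfPartitions(numPartitions.get());
        System.out.println(topic.numberOfPartitions()); // Optional[3], unchanged
    }
}
```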



guozhangwang merged commit d53eab1 into apache:trunk Sep 30, 2019
guozhangwang pushed a commit that referenced this pull request Jan 7, 2020
…ics counts (#7904)

This PR fixes the regression introduced in 2.4 from 2 refactoring PRs:
#7249
#7419

The bug was introduced by a logical path that allows numPartitionsCandidate to be 0, which is then assigned to numPartitions and later checked by setNumPartitions; that subsequent check throws an illegal-argument exception if numPartitions is 0.

This bug impacts both new 2.4 applications and upgrades to 2.4 for certain types of topology. The example in the original JIRA was imported as a new integration test to guard against such regressions. We also verified, by running this integration test, that without the bug fix the application still fails.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
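
For context on the failure path described in this commit message, a minimal sketch (class and method names are simplified stand-ins, not the actual Streams classes):

```java
import java.util.Optional;

// Illustrates the regression path: a candidate partition count of 0 slips through,
// gets assigned, and is then rejected by a validity check. Class and method names
// are simplified stand-ins, not the actual Streams code.
public class ZeroPartitionRegressionSketch {

    private Optional<Integer> numberOfPartitions = Optional.empty();

    void setNumberOfPartitions(final int numPartitions) {
        if (numPartitions < 1) {
            // This is the check that turns the bad candidate into a hard failure.
            throw new IllegalArgumentException("Number of partitions must be at least 1");
        }
        this.numberOfPartitions = Optional.of(numPartitions);
    }

    public static void main(final String[] args) {
        final int numPartitionsCandidate = 0; // produced by the buggy logical path
        final ZeroPartitionRegressionSketch topic = new ZeroPartitionRegressionSketch();
        topic.setNumberOfPartitions(numPartitionsCandidate); // throws IllegalArgumentException
    }
}
```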
guozhangwang pushed a commit to confluentinc/kafka that referenced this pull request Jan 7, 2020 (same fix, apache#7904).
qq619618919 pushed a commit to qq619618919/kafka that referenced this pull request May 12, 2020 (same fix, apache#7904).