MINOR: Adjust logic of conditions to set number of partitions in step zero of assignment. #7419

Merged
@@ -384,12 +384,13 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
// if this topic is one of the sink topics of this topology,
// use the maximum of all its source topic partitions as the number of partitions
for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
- final int numPartitionsCandidate;
+ int numPartitionsCandidate = 0;
// It is possible the sourceTopic is another internal topic, i.e,
// map().join().join(map())
- if (repartitionTopicMetadata.containsKey(sourceTopicName)
Contributor Author:

Here, by checking both that the source topic is a repartition topic and that its number of partitions is present, we implicitly acknowledge that the partition count might not be available yet.

However, if that is the case we drop down to the else block, which results in throwing an Exception. IMHO that is not the intent of the logic, as we would then always throw an Exception if any source topic (internal or otherwise) did not have a partition count available.
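
To make this concrete, here is a minimal standalone sketch of the pre-PR control flow. The maps, topic names, and the resolve helper are hypothetical stand-ins for repartitionTopicMetadata and the cluster metadata, not the actual assignor code:

import java.util.Map;
import java.util.Optional;

public class OldConditionSketch {
    // Stand-in for repartitionTopicMetadata: internal topics whose
    // partition count may not be resolved yet on this pass.
    static final Map<String, Optional<Integer>> repartition =
            Map.of("app-repartition", Optional.empty());
    // Stand-in for metadata.partitionCountForTopic(...): external topics only.
    static final Map<String, Integer> external = Map.of("input", 4);

    static int resolve(final String sourceTopic) {
        // The original compound condition: internal AND count present.
        if (repartition.containsKey(sourceTopic)
                && repartition.get(sourceTopic).isPresent()) {
            return repartition.get(sourceTopic).get();
        } else {
            // An internal topic with an unresolved count falls through to
            // here and is treated like a missing external topic.
            final Integer count = external.get(sourceTopic);
            if (count == null) {
                throw new IllegalStateException("No partition count for " + sourceTopic);
            }
            return count;
        }
    }

    public static void main(final String[] args) {
        System.out.println(resolve("input")); // 4
        try {
            resolve("app-repartition");
        } catch (final IllegalStateException e) {
            // Throws even though the count could resolve on a later pass.
            System.out.println("threw: " + e.getMessage());
        }
    }
}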

-     && repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-     numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+ if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
+     if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
Contributor Author:

Here's the change: if the source topic is a repartition topic, we drop into a nested block to check whether the repartition topic has a partition count available. If not, we don't throw an Exception; we only throw when a non-internal topic reports no partition count available.
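
Continuing the same toy sketch, the restructured condition could be expressed as a second helper dropped into the class above. An internal topic with an unresolved count now simply leaves the candidate at 0 for this pass instead of throwing:

static int resolveFixed(final String sourceTopic) {
    int numPartitionsCandidate = 0;
    if (repartition.containsKey(sourceTopic)) {
        // Internal topic: if the count is not known yet, keep the
        // candidate at 0 and let a later pass of the loop fill it in.
        if (repartition.get(sourceTopic).isPresent()) {
            numPartitionsCandidate = repartition.get(sourceTopic).get();
        }
    } else {
        // Only a non-internal topic without metadata is a hard error.
        final Integer count = external.get(sourceTopic);
        if (count == null) {
            throw new IllegalStateException("No partition count for " + sourceTopic);
        }
        numPartitionsCandidate = count;
    }
    return numPartitionsCandidate;
}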

Contributor:

@bbejeck @vvcephei I checked the code: right now we use a LinkedHashMap for the node-groups / topic-groups construction, so insertion order is preserved. I think that means that, assuming the topology is a DAG with no cycles, one pass starting from sub-topology 1 is arguably sufficient. However, one case that we do not handle today, which is also why we are still doing a while-loop here, is e.g. (numbers are sub-topology indices):

1 -> 2,
1 -> 3,
3 -> 2

And if we loop in the order 1, 2, 3, then when we are processing 2, 3 is not set yet, so we do not have the num.partitions for the repartition topic between 3 -> 2.
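
A small self-contained sketch of how the while-loop converges on this shape in two passes (the upstream map and partition counts are made up for illustration; this is not the assignor's code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FixedPointSketch {
    public static void main(final String[] args) {
        // Hypothetical edges 1 -> 2, 1 -> 3, 3 -> 2, expressed as
        // sub-topology -> its upstream sub-topologies.
        final Map<Integer, List<Integer>> upstream = Map.of(
                1, List.of(),      // pure source sub-topology
                2, List.of(1, 3),
                3, List.of(1));

        final Map<Integer, Integer> numPartitions = new HashMap<>();
        numPartitions.put(1, 4); // known from the external source topics

        // The while-loop: keep passing over sub-topologies in index order
        // until every count is set. Pass 1 cannot resolve 2 because 3 is
        // not set yet; pass 2 then resolves it.
        int passes = 0;
        boolean allResolved = false;
        while (!allResolved) {
            allResolved = true;
            passes++;
            for (final int group : List.of(1, 2, 3)) {
                if (numPartitions.containsKey(group)) {
                    continue;
                }
                // Use the maximum of all upstream counts, if all are known.
                int max = 0;
                boolean ready = true;
                for (final int parent : upstream.get(group)) {
                    final Integer count = numPartitions.get(parent);
                    if (count == null) {
                        ready = false;
                        break;
                    }
                    max = Math.max(max, count);
                }
                if (ready) {
                    numPartitions.put(group, max);
                } else {
                    allResolved = false;
                }
            }
        }
        System.out.println(numPartitions + " after " + passes + " passes");
    }
}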

Looking at InternalTopologyBuilder#makeNodeGroups, I think it is possible to ensure it is ordered as

1 -> 3,
1 -> 2,
3 -> 2

so that we can make one pass without the while loop, and can also assume that the parent sub-topologies' sink/repartition topic num.partitions are already set when processing each one. WDYT?

Contributor:

Basically, when ordering the non-source node groups we would not rely on Utils.sorted(nodeFactories.keySet()) but on some specific logic, so that non-source sub-topologies whose parents are all source sub-topologies get indexed first.
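
A sketch of that ordering idea using a plain topological sort (Kahn's algorithm) over the hypothetical sub-topology DAG above; this is not what makeNodeGroups does today, just an illustration of the proposed indexing:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopoOrderSketch {
    public static void main(final String[] args) {
        // Hypothetical edges 1 -> 2, 1 -> 3, 3 -> 2 (parent -> children).
        final Map<Integer, List<Integer>> children = Map.of(
                1, List.of(2, 3),
                2, List.of(),
                3, List.of(2));

        // Kahn's algorithm: index every sub-topology after all its parents.
        final Map<Integer, Integer> inDegree = new HashMap<>();
        children.keySet().forEach(n -> inDegree.put(n, 0));
        children.values().forEach(cs -> cs.forEach(c -> inDegree.merge(c, 1, Integer::sum)));

        final Deque<Integer> ready = new ArrayDeque<>();
        inDegree.forEach((n, d) -> { if (d == 0) ready.add(n); });

        final List<Integer> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            final int n = ready.poll();
            order.add(n);
            for (final int child : children.get(n)) {
                if (inDegree.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        // Prints [1, 3, 2]: with this order a single pass suffices, since
        // every parent's num.partitions is set before its children run.
        System.out.println(order);
    }
}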

Contributor:

Thanks for the idea, @guozhangwang. If I understand right, you're suggesting we propagate the partition count from the (external) sources all the way through the topology in topological order. If the partition count is purely determined by the external source topics, then doing this in topological order in one pass should indeed work.

What I'm wondering now is whether there's any situation where some of the repartition topics might already exist with a specific number of partitions. An easy strawman is, "what if the operator has pre-created some of the internal topics?", which may or may not be allowed. Another is "what if the topology has changed slightly to add a new repartition topic early in the topology?" Maybe there are some other similar scenarios. I'm not sure if any of these are real possibilities, or if they'd affect the outcome, or if we want to disallow them anyway to make our lives easier.

WDYT?

Contributor:

Those are good points. Making a one-pass num.partitions decision is not critical in our framework; this is more or less brainstorming with you guys to see if it is possible :) To me, as long as we cannot get stuck infinitely in the while loop, it should be fine.

If a user pre-creates a topic with the exact xx-repartition name, then yes, I think that could make things trickier. Also, with KIP-221's repartition hint, I'm not sure how that would affect this either.

+         numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+     }
Contributor:

This predates this PR, but while reviewing it I realized that line 898 in prepareTopic:

topic.setNumberOfPartitions(numPartitions.get());

is not necessary, since numPartitions was just read from the same topic.
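
A minimal illustration of why that call is a no-op, assuming numberOfPartitions()/setNumberOfPartitions(...) are the straightforward getter/setter pair they appear to be (the TopicConfig class here is a hypothetical stand-in for InternalTopicConfig):

import java.util.Optional;

// Hypothetical stand-in for InternalTopicConfig's partition-count accessors.
class TopicConfig {
    private Optional<Integer> numPartitions = Optional.of(8);

    Optional<Integer> numberOfPartitions() {
        return numPartitions;
    }

    void setNumberOfPartitions(final int n) {
        numPartitions = Optional.of(n);
    }
}

public class NoOpSketch {
    public static void main(final String[] args) {
        final TopicConfig topic = new TopicConfig();
        final Optional<Integer> numPartitions = topic.numberOfPartitions();
        // Writes back exactly the value that was just read from the topic.
        topic.setNumberOfPartitions(numPartitions.get());
        System.out.println(topic.numberOfPartitions()); // unchanged: Optional[8]
    }
}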

} else {
final Integer count = metadata.partitionCountForTopic(sourceTopicName);
if (count == null) {