Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KAFKA-6328] Exclude node groups belonging to global stores in InternalTopologyBuilder#makeNodeGroups #4339

Closed
wants to merge 14 commits into from

Conversation

ConcurrencyPractitioner
Copy link
Contributor

No description provided.

@mjsax
Copy link
Member

mjsax commented Dec 18, 2017

Test failures seems related. Please update PR before we can review.

@guozhangwang
Copy link
Contributor

The system tests failures are relevant.

The NPE is because InternalTopologyBuilder#buildGlobalStateTopology calls globalNodeGroups, which then calls nodeGroups and which in turn calls makeNodeGroups. So skipping those groups for global stores will cause InternalTopologyBuilder#buildGlobalStateTopology to not return anything.

So on a second though, we cannot simply just skip building the group for global stores; instead we can consider using negative group ids for them (this is allowed as we return Map<Integer, Set<String>> for nodeGroups()). Then we can also simplify the related functions: globalNodeGroups, describeSubtopologies, describeGlobalStores and topicGroups() to rely on the key of the map directly to tell if the group is for global stores or not.

@ConcurrencyPractitioner

@ConcurrencyPractitioner
Copy link
Contributor Author

Oh, I used a boolean parameter instead to determine whether or not to keep the global node groups. I think that way we could also accomplish the same thing.

@guozhangwang
Copy link
Contributor

Oh, I used a boolean parameter instead to determine whether or not to keep the global node groups. I think that way we could also accomplish the same thing.

I was looking at an older version of the PR with only the first commit. A boolean flag could also work but I think creating the node groups with negative index values on the key could also help simplifying the other logic a bit. WDYT?

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Dec 20, 2017

@guozhangwang Ok, I have simplified the logic a bit.


if (!isNodeGroupOfGlobalStores) {
describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
}
}
}

private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
for (final String node : allNodesOfGroups) {
if (isGlobalSource(node)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could change other callers of isGlobalSource and use the index instead? E.g. describeGlobalStores(), and then this isGlobalSource itself could be removed.

@guozhangwang
Copy link
Contributor

@ConcurrencyPractitioner could you try to run unit test locally before updating the PR? The command line is ./gradlew streams:test.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Dec 20, 2017

@guozhangwang I think I have spotted some inconsistency with using negative integers as indicating a node group to be a global node group. For starters, in the original code, the key used in makeNodeGroups#nodeGroups always start at zero (denoted by the variable nodeGroupId). And many of the tests written calling makeNodeGroups took this into account and required that the key mapped to the first node group would also start from zero. Hence, when I changed the initial integer key to equal to one, a considerable number of tests failed (at least five). Meanwhile, when starting with the integer key zero, it is impossible to tell whether the very first node group (which is assigned a integer key of 0) is global due to zero's non-positive or non-negative nature. Do you think there is a way around this?

@ConcurrencyPractitioner
Copy link
Contributor Author

What I could do is add an extra check specifically for the zero case to check if its global.

On another note, I would like to point out that by this point, negative integers for keys does not seem to be any more simple than using only a boolean flag. From my understanding, your approach might as well be increasing the logic complexity involved, since it requires drastic changes with nodeGroupId as well modifying the structure of related tests. So I think we should switch back to using boolean flags to make the code more readable. WDYT?

@guozhangwang
Copy link
Contributor

I see your point. Though the makeNodeGroups(boolean removeGlobalNodeGroups) approach has another side effect:

  1. StreamPartitionAssignor is responsible for creating the tasks, with their task ids x_x and assign them to the clients. This task id is used in many places and also printed in many log4j entries.

  2. If we removeGlobalNodeGroups when makeNodeGroups, then the first digits of the task ids would be shifted. Hence when you topology.describe() you may, for example, see a sub-topology 3 while its tasks printed out in log4j are actually 4_1, 4_2 because the former used removeGlobalNodeGroups while the latter does not.

On the other hand, my proposed solution would have a bad effect on nodeGroupId as well, since the task ids would change as well when you upgrade and hence break lots of things...

So I'm wondering, if it would be a simpler change to just improve the TopologyDescription#subtopologiesAsString and #globalStoresAsString to list the sub-topologies and global stores following the groupId ordering and also add the id in the toString of the global store description?

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Dec 21, 2017

@guozhangwang I think we could have a way to partition nodes into global and non global. Namely after all the nodes have been added, near the very end of makeNodeGroups, right before nodeGroups is returned, we could sort the node groups into distinct categories such that when nodeGroups#keySet() is called, the first half of the set will be keys with a global node group as a value and the second half will have keys with non-global node group as a value. Is this permissible?

@ConcurrencyPractitioner
Copy link
Contributor Author

@guozhangwang Will be on a vacation for a few days, so wouldn't be able to respond to your comments. Thanks!

@guozhangwang
Copy link
Contributor

@ConcurrencyPractitioner The problem that I was pointing out, is that we need to keep compatibility in our mind: assume you are going to upgrade your streams app from 1.0.0 to 1.1.0 with this fix, and then task 3_3 becomes task 2_3 because we reorder the sub-topology ids, then sticky assignor will not work and we would unnecessarily move tasks around which would add significant rebalance delay. So as long as your proposed solution can still maintain the indexing that would be fine, though I personally cannot come up with a better solution that would keep compatibility.

Happy holidays! We can chat later when you are back.

@ConcurrencyPractitioner
Copy link
Contributor Author

Got it. I would just like to note that the keys which the node groups are mapped to are never changed. I am only rearranging the order in which the Map.Entrys are presented by sorting them into distinctive groups. I don't think sorting Map.Entrys would not change the sub-topology ids in anyway.

@ConcurrencyPractitioner
Copy link
Contributor Author

Typo above: I don't think sorting Map.Entrys would change the sub-topology ids in anyway.

@guozhangwang
Copy link
Contributor

Is the current PR your final proposal? It seems only sorted the makeNodeGroups function to make sure non-global sub-topology are returned before global sub-topologies, how that would help in displaying the describe() returned string output?

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Jan 3, 2018

On one note, the partitioning above I think was intended to insure that global node groups come before non-global.

On another note, this PR is not my final proposal. My intention was to present a partitioning not to improve on describe() but rather another attempt at what KAFKA-6328 was initially described to be (exclude node groups). But rather than excluding global node groups, I sort node groups into global and non-global to allow other methods which call makeNodeGroups to be simpler. For example, globalNodeGroups() would only have to iterate over the global node groups at the beginning, then once non-global has been reached, we could terminate.

…alTopologyBuilder

Testing

[KAFKA-6328] Exclude node groups belonging to global stores in InternalTopologyBuilder#makeNodeGroups

Removing foolish recursion error

Adding checks for global node groups

Converted to negative partitions

Adding sorting mechanism
@guozhangwang
Copy link
Contributor

Note that as of today, in topology.describe().toString() the sub-topologies and global stores are already separated, it is just that since the global stores does not include the index in it the display may be confusing: say node group 0, 1, 3, 4 are normal sub-topologies and 2 is for a global store; when we check the task metrics etc, we would only see 0_x, 1_x, 3_x, 4_x but not 2_x, users who are not aware of global stores also taking an index number may think there is an issue; and then when check the topology they will see

sub-topology 1:
   ...
sub-topology 2:
   ...
sub-topology 4:
   ...
sub-topology 5:
   ...
global store:
   ...

Note the global store display does not include the index, and hence they may not realize it is actually for group 3.

What I was thinking, that if we cannot exclude the global store from the ids in a compatible way, we can change the display in a way like:

sub-topology 1:
   ...
sub-topology 2:
   ...
global store 3 (no tasks will be created for this sub-topology):
   ...
sub-topology 4:
   ...
sub-topology 5:
   ...

this may help users understand the topology re-presentation better.

@ConcurrencyPractitioner
Copy link
Contributor Author

Oh, I understand your point. I will see what I can do.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Jan 4, 2018

Currently, in my last commit I have simplified InternalTopologyBuilder#describe() a bit. I am still working towards changing how topologies are displayed.

@guozhangwang
Copy link
Contributor

Thanks! Your current commit looks promising. Please ping me again when you are finally done.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Jan 10, 2018

@guozhangwang It appears that TopologyDescription.GlobalStore in fact does not store the index in its fields. In GlobalStore#toString(), we have:

        public String toString() {
            return "GlobalStore: " + source.name + " (topic: " + source.topics + ")\n      --> "
                + processor.name + " (store: " + processor.stores.iterator().next() + ")\n";
        }

The index number is not given, nor is it one of the fileds in GlobalStore. Meanwhile, in Subtopology#toString(), we have:

@Override
        public String toString() {
            return "Sub-topology: " + id + "\n" + nodesAsString();
        }

In this case, the id number is present. This might mean that your above suggestion to change TopologyDesciption#toString() might be infeasible.

@guozhangwang
Copy link
Contributor

Could we add the index id to TopologyDescription.GlobalStore via constructor then? More specifically, in describeGlobalStores add the nodeGroup.getKey() to GlobalStore just as we did in describeSubTopologies, note that it is all internal functions so nothing gets changed publicly.

@ConcurrencyPractitioner
Copy link
Contributor Author

@guozhangwang Ok, I have added a id field to TopologyDescription.GlobalStore.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line 1243 of InternalTopologyBuilder, in GlobalStore#toString(), could we add the id as well?

@guozhangwang
Copy link
Contributor

LGTM. Edited on the commit message and minor updates on the description message before merging. Thanks.

guozhangwang pushed a commit that referenced this pull request Jan 17, 2018
…ologyBuilder#makeNodeGroups

Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #4339 from ConcurrencyPractitioner/kafka-6238

Minor edits on description
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants