KAFKA-5226: Fixes issue where adding topics matching a regex #3157

bbejeck
Contributor

@bbejeck bbejeck commented May 26, 2017

subscribed stream may not be detected by all followers until
onJoinComplete returns.

@bbejeck
Contributor Author

bbejeck commented May 26, 2017

This PR addresses the situation where topics added to a regex-subscribed stream may not be detected by all followers in StreamPartitionAssignor.subscription, but are picked up when ConsumerCoordinator.onJoinComplete returns.

ping @mjsax @dguy @guozhangwang for review

@asfbot

asfbot commented May 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4456/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4442/
Test FAILed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented May 26, 2017

The test failure is kafka.admin.DescribeConsumerGroupTest.testDescribeExistingGroupWithNoMembersWithNewConsumer, so I think it is unrelated to Streams and to this PR.

@bbejeck
Contributor Author

bbejeck commented May 26, 2017

retest this please

@asfbot

asfbot commented May 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4463/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4449/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor

Good find!

About the fix itself, in ConsumerCoordinator we already have the logic for handling unknown topics coming from the leader's assignment:

        // check if the assignment contains some topics that were not in the original
        // subscription, if yes we will obey what leader has decided and add these topics
        // into the subscriptions as long as they still match the subscribed pattern
        //
        // TODO this part of the logic should be removed once we allow regex on leader assign
        Set<String> addedTopics = new HashSet<>();
        for (TopicPartition tp : subscriptions.assignedPartitions()) {
            if (!joinedSubscription.contains(tp.topic()))
                addedTopics.add(tp.topic());
        }

        if (!addedTopics.isEmpty()) {
            Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
            Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
            newSubscription.addAll(addedTopics);
            newJoinedSubscription.addAll(addedTopics);

            this.subscriptions.subscribeFromPattern(newSubscription);
            this.joinedSubscription = newJoinedSubscription;
        }

        // update the metadata and enforce a refresh to make sure the fetcher can start
        // fetching data in the next iteration
        this.metadata.setTopics(subscriptions.groupSubscription());
        client.ensureFreshMetadata();

I think a better way to fix it is to modify StreamPartitionAssignor#onAssignment, which is called after the above code, so the metadata should already have been refreshed by the time it runs. In that call we can trigger streamThread.builder.updateSubscriptions().

Also if we do that in onAssignment, maybe we do not need to call updateSubscription in StreamPartitionAssignor#subscription() any more?


try {
streamsLeader.start();
Thread.sleep(1000);
Contributor

I'd suggest avoiding Thread.sleep() as much as possible in unit tests, as it introduces timing-dependent flakiness. If there is a condition we need to wait on, consider using TestUtils.waitForCondition.

Also, this test does not seem to always exercise the race condition that causes the issue. I'd suggest making it really "modular" by adding a test for StreamThread in which we do not need to start the thread, but just mock its embedded StreamPartitionAssignor#onAssignment() so that it receives some assigned partition whose topic is unknown, and then call its onPartitionAssigned callback to check whether the topology builder has updated its topology.
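
To illustrate the polling approach suggested above, here is a minimal, self-contained sketch. The `PollingWait` class and its `waitForCondition` method are hypothetical stand-ins written for this example; they are not Kafka's `TestUtils`, whose actual signature may differ.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; this is not Kafka's TestUtils.
final class PollingWait {

    private PollingWait() { }

    // Polls the condition until it holds or maxWaitMs elapses.
    // Returns true if the condition was observed to hold, false on timeout
    // or if the waiting thread was interrupted.
    static boolean waitForCondition(final BooleanSupplier condition,
                                    final long maxWaitMs,
                                    final long pollIntervalMs) {
        final long deadlineNanos = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (final InterruptedException e) {
                // Preserve the interrupt status for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Compared with a fixed sleep, the wait ends as soon as the condition holds, and a generous timeout only costs time when the test is going to fail anyway.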

Contributor Author

I'll remove the first two, but is it OK to leave in the one on line 352? I want to space out the creation of topics.

Contributor

I'm actually thinking more about making the unit test really "unit" by mocking the stream partition assignor to always return unknown topic partitions; the current test case seems to be more of an "integration test"?

Contributor Author

Yes it is, I see your point now about the test, I'll make the changes.

@bbejeck
Contributor Author

bbejeck commented May 30, 2017

@guozhangwang thanks for the review. I'll move the fix to StreamPartitionAssignor#onAssignment.

From what I can see, we'll need to leave the call to TopologyBuilder.updateSubscription in StreamPartitionAssignor#subscription(), as it updates the leader's topology used in the StreamPartitionAssignor.assign method.

@guozhangwang
Contributor

From what I can see, we'll need to leave the call to TopologyBuilder.updateSubscription in StreamPartitionAssignor#subscription(), as it updates the leader's topology used in the StreamPartitionAssignor.assign method.

makes sense.

@bbejeck
Contributor Author

bbejeck commented May 30, 2017

@guozhangwang updates per comments.

  • Put SubscriptionUpdates back as inner class of StreamPartitionAssignor
  • Removed integration test and added unit test
  • Put the check for updated topics in the StreamPartitionAssignor#onAssignment

Member

@mjsax mjsax left a comment

Just some nits. Overall LGTM.



private void updateSubscribedTopics(Assignment assignment) {
if (streamThread != null && streamThread.builder.sourceTopicPattern() != null) {
Member

How could streamThread be null -- null would indicate a bug and we might want to fail hard on that instead of masking it with this check?

Contributor Author

You're right, it can't. I put that in for unit tests; I'll remove it and update the tests.
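
One way to "fail hard" rather than mask a missing reference is to validate it once, at the point where it is set. The sketch below is only an illustration under that assumption: `AssignorSketch` is a hypothetical class, and the real StreamPartitionAssignor may wire up its stream thread differently.

```java
import java.util.Objects;

// Hypothetical stand-in for an assignor that depends on a stream thread.
// The names here are illustrative and do not mirror the Kafka classes.
final class AssignorSketch {

    private final Object streamThread;

    AssignorSketch(final Object streamThread) {
        // Fail fast: a null reference here indicates a wiring bug, so it is
        // reported immediately instead of being skipped by a later null check.
        this.streamThread = Objects.requireNonNull(streamThread, "streamThread must not be null");
    }

    Object streamThread() {
        return streamThread;
    }
}
```

With the reference validated up front, later code can use it without defensive null checks, and tests have to supply a real or mock instance.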

}


private void updateSubscribedTopics(Assignment assignment) {
Member

nit: add final
also in L616

Contributor Author

Ack

for (TopicPartition topicPartition : assignment.partitions()) {
assignedTopics.add(topicPartition.topic());
}
if (!assignedTopics.isEmpty() && !streamThread.builder.subscriptionUpdates().getUpdates().containsAll(assignedTopics)) {
Member

Would containsAll not return true if assignedTopics is empty? (ie, can we remove the first check?)

Contributor Author

Good catch, containsAll returns true if assignedTopics is empty; removing the first check.
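
The point about `containsAll` can be checked in isolation. The following is a small sketch using only `java.util` collections; `ContainsAllDemo` and `hasUnknownTopics` are hypothetical names introduced for this example.

```java
import java.util.Set;

// Illustrative helper; the class and method names are hypothetical.
final class ContainsAllDemo {

    private ContainsAllDemo() { }

    // Returns true when at least one assigned topic is missing from the known topics.
    // For an empty 'assigned' set, containsAll is vacuously true, so this returns
    // false without needing a separate isEmpty() check.
    static boolean hasUnknownTopics(final Set<String> known, final Set<String> assigned) {
        return !known.containsAll(assigned);
    }
}
```

Because an empty assignment never reports unknown topics, the extra `!assignedTopics.isEmpty()` guard is redundant.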

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4586/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4570/
Test PASSed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented May 31, 2017

@mjsax removed the null check for streamThread and the first check for empty assignedTopics

cc @guozhangwang

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4580/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4595/
Test PASSed (JDK 7 and Scala 2.11).

Contributor

@guozhangwang guozhangwang left a comment

Thanks for the update! LGTM overall, just a few minor comments.

}


private void updateSubscribedTopics(final Assignment assignment) {
Contributor

nit: could we change this to maybeUpdateSubscribedTopics(Set<String> topics) with the check on streamThread.builder.sourceTopicPattern() != null within the function so that both subscription() and onAssignment could consolidate on this function?

Contributor Author

Yes. This will cause a rebuild of the topology in those cases where all topics are already known. But thinking about it some more, IMHO it's worth the tradeoff for simplifying the logic.

@@ -104,6 +106,12 @@
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
private final TopologyBuilder builder = new TopologyBuilder();
private final StreamsConfig config = new StreamsConfig(configProps());
private final StreamThread mockStreamThread = new StreamThread(builder, config,
Contributor

Why modify this test class to let partitionAssignor set this mock stream thread?

Contributor Author

There are some tests using onAssignment which were getting NPEs. Since in practice the embedded streamThread should never be null, I opted to update the test.

Contributor

Thanks.

assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
partitionAssignor.onAssignment(assignment);

assertTrue(nodeToSourceTopics.get("source").size() == 3);
Contributor

Is the third step (i.e. from 2 topics to 3 topics) intended for additional test coverage?

Contributor Author

Yes I wanted to exercise the functionality more than once.

@bbejeck
Contributor Author

bbejeck commented May 31, 2017

@guozhangwang thanks for the additional pass, updates per comments.

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4646/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4631/
Test FAILed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented May 31, 2017

retest this please

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4644/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4659/
Test PASSed (JDK 7 and Scala 2.11).

@bbejeck
Contributor Author

bbejeck commented May 31, 2017

previous failure was unrelated to this PR

Member

@mjsax mjsax left a comment

LGTM (besides the reflection comment, but this should not block the PR).

final Field nodeToSourceTopicsField =
    topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
nodeToSourceTopicsField.setAccessible(true);
Member

Argh. Reflection. It seems our code is not well structured if we need this. Can you file a Jira for some follow-up cleanup (or, if this does not make it into 0.11, we might do the cleanup first)?

Contributor Author

Ack
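
For readers unfamiliar with the pattern under discussion, here is a self-contained sketch of reading a private field through reflection in a test. `FakeBuilder` and `ReflectionPeek` are hypothetical classes made up for this example; only the field name `nodeToSourceTopics` is borrowed from the snippet above, and its type here is an assumption.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class standing in for a builder with private internal state.
final class FakeBuilder {
    private final Map<String, Integer> nodeToSourceTopics = new HashMap<>();

    FakeBuilder() {
        nodeToSourceTopics.put("source", 3);
    }
}

// Hypothetical test utility that reads a private Map field by name.
final class ReflectionPeek {

    private ReflectionPeek() { }

    @SuppressWarnings("unchecked")
    static Map<String, Integer> readPrivateMap(final Object target, final String fieldName) {
        try {
            final Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // bypass the 'private' modifier for the test
            return (Map<String, Integer>) field.get(target);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException("Could not read field " + fieldName, e);
        }
    }
}
```

The drawback raised in the review is visible here: the test depends on a field name and type that the compiler cannot check, so a rename breaks it only at runtime. A package-private accessor on the class under test is a common alternative.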

@bbejeck
Contributor Author

bbejeck commented Jun 1, 2017

@guozhangwang @mjsax updated per comments and discussion on slack.

Specifically, we'll now only rebuild the topology under two conditions:

  1. The topics passed into StreamPartitionAssignor.subscription don't exactly match what the current TopologyBuilder.subscriptionUpdates contains
  2. The topics passed into StreamPartitionAssignor.onAssignment include newly discovered topics from ConsumerCoordinator.onJoinComplete
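
The two conditions above can be modelled with plain sets. This is a rough sketch under stated assumptions, not the merged code: `RebuildDecision` and its methods are hypothetical, the "rebuild" is reduced to a counter, and merging newly assigned topics into the known set is an assumption made for the example.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the two rebuild triggers; not the actual Kafka classes.
final class RebuildDecision {

    private Set<String> knownTopics = new HashSet<>();
    private int rebuildCount = 0;

    // Condition 1 (subscription path): rebuild unless the topics match exactly.
    boolean onSubscription(final Set<String> topics) {
        if (knownTopics.equals(topics)) {
            return false;
        }
        knownTopics = new HashSet<>(topics);
        rebuildCount++;
        return true;
    }

    // Condition 2 (assignment path): rebuild only if the assignment contains
    // topics that are not yet known. Merging them in is an assumption here.
    boolean onAssignment(final Collection<String> assignedTopics) {
        if (knownTopics.containsAll(assignedTopics)) {
            return false;
        }
        knownTopics.addAll(assignedTopics);
        rebuildCount++;
        return true;
    }

    int rebuildCount() {
        return rebuildCount;
    }
}
```

The asymmetry is deliberate in this model: the subscription path uses an exact comparison because it sees the full topic list, while the assignment path uses a subset check because a member is normally assigned only part of the subscribed topics.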

@asfbot

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4691/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4675/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor

LGTM! Merged to trunk and 0.11.0.

@asfgit asfgit closed this in 6360e04 Jun 1, 2017
asfgit pushed a commit that referenced this pull request Jun 1, 2017
subscribed stream may not be detected by all followers until
onJoinComplete returns.

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3157 from bbejeck/KAFKA-5226_null_pointer_source_node_deserialize

(cherry picked from commit 6360e04)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
@bbejeck bbejeck deleted the KAFKA-5226_null_pointer_source_node_deserialize branch July 10, 2024 13:55