KAFKA-3896: Fix KStreamRepartitionJoinTest #2405

guozhangwang
Contributor

@guozhangwang guozhangwang commented Jan 19, 2017

The root cause of this issue is that InternalTopicManager creates topics one at a time, and this test needs 31 topics. As a result, the consumer can time out during assignment in a rebalance, and the next leader then has to repeat the same work because the "makeReady" calls are also made one at a time.

This patch batches the topics into a single create request and also uses the StreamsKafkaClient directly to fetch metadata for validating the created topics. It also optimizes some inefficient code in InternalTopicManager and StreamsKafkaClient.

Minor cleanup: make the exception message more informative in integration tests.
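
A minimal sketch of the difference between per-topic and batched creation, for readers who want to see the request counts side by side. This is not the InternalTopicManager or StreamsKafkaClient code: `TopicAdmin`, `CountingTopicAdmin`, and `BatchingSketch` are hypothetical names, and the in-memory client only counts requests.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a client that can create topics; not a Kafka API.
interface TopicAdmin {
    // Creates every topic in the map (name -> partition count) in one request.
    void createTopics(Map<String, Integer> topics);
}

// In-memory fake that records how many requests were issued.
final class CountingTopicAdmin implements TopicAdmin {
    final Map<String, Integer> created = new HashMap<>();
    int requests = 0;

    @Override
    public void createTopics(final Map<String, Integer> topics) {
        requests++;
        created.putAll(topics);
    }
}

final class BatchingSketch {
    // One request per topic: the pattern the description identifies as too slow.
    static void createOneAtATime(final TopicAdmin admin, final Map<String, Integer> topics) {
        for (final Map.Entry<String, Integer> entry : topics.entrySet()) {
            final Map<String, Integer> single = new HashMap<>();
            single.put(entry.getKey(), entry.getValue());
            admin.createTopics(single);
        }
    }

    // A single request that carries all topics.
    static void createBatched(final TopicAdmin admin, final Map<String, Integer> topics) {
        if (!topics.isEmpty()) {
            admin.createTopics(topics);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Integer> topics = new LinkedHashMap<>();
        for (int i = 0; i < 31; i++) {
            topics.put("internal-topic-" + i, 1);
        }
        final CountingTopicAdmin slow = new CountingTopicAdmin();
        createOneAtATime(slow, topics);
        final CountingTopicAdmin fast = new CountingTopicAdmin();
        createBatched(fast, topics);
        // prints "31 requests vs 1"
        System.out.println(slow.requests + " requests vs " + fast.requests);
    }
}
```

With 31 topics, the per-topic path pays 31 round trips inside the rebalance, while the batched path pays one, which is the effect the patch relies on.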

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1013/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1015/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1013/
Test PASSed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1017/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1015/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1027/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1025/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1025/
Test PASSed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1027/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1029/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1027/
Test PASSed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1038/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1036/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1036/
Test PASSed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

retest this please

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1045/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1043/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1043/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang guozhangwang changed the title KAFKA-3896: Fix KStreamRepartitionJoinTest [WIP] KAFKA-3896: Fix KStreamRepartitionJoinTest Jan 20, 2017
@guozhangwang
Contributor Author

I had a comment in the original KAFKA-4060 PR to batch the requests, but it was not addressed somehow. Ping @hjafarpour @mjsax @dguy @enothereska for review.

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1066/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1064/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1064/
Test PASSed (JDK 8 and Scala 2.12).

Contributor

@dguy dguy left a comment

Left a few minor comments, but I think we need a test for InternalTopicManager.getNumPartitions.

Collection<MetadataResponse.TopicMetadata> topicsMetadata = streamsKafkaClient.fetchTopicsMetadata();
validateTopicPartitons(topics, topicsMetadata);
Map<InternalTopicConfig, Integer> topicsToBeCreated = filterExistingTopics(topics, topicsMetadata);
Map<String, Integer> existingTopicPartitions = getExistingTopicNamesPartitions();
Contributor

These could (should) be final?

* Get the number of partitions for the given topics
*/
public Map<String, Integer> getNumPartitions(final Set<String> topics) {
Map<String, Integer> existingTopicPartitions = getExistingTopicNamesPartitions();
Contributor

same here

private Map<InternalTopicConfig, Integer> filterExistingTopics(final Map<InternalTopicConfig, Integer> topicsPartitionsMap, Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
Map<String, Integer> existingTopicNamesPartitions = getExistingTopicNamesPartitions(topicsMetadata);
private Map<InternalTopicConfig, Integer> validateTopicPartitons(final Map<InternalTopicConfig, Integer> topicsPartitionsMap,
final Map<String, Integer> existingTopicNamesPartitions) {
Map<InternalTopicConfig, Integer> nonExistingTopics = new HashMap<>();
Contributor

I'd probably change the name of this to topicsToBeCreated or something similar. Also final

}

private Map<String, Integer> getExistingTopicNamesPartitions(Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
private Map<String, Integer> getExistingTopicNamesPartitions() {
Contributor

getExistingPartitionCountByTopic ?

}

private Map<String, Integer> getExistingTopicNamesPartitions(Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
private Map<String, Integer> getExistingTopicNamesPartitions() {
// The names of existing topics
Map<String, Integer> existingTopicNamesPartitions = new HashMap<>();
Contributor

existingPartitionCountByTopic?
final?

do {
partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
} while (partitions == null || partitions.size() != numPartitions);
Map<String, Integer> partitions = internalTopicManager.getNumPartitions(topicNamesToMakeReady);
Contributor

My preference here would be to extract this logic into a method like:

private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {
    final Map<String, Integer> partitions = internalTopicManager.getNumPartitions(topicNamesToMakeReady);
    for (Map.Entry<InternalTopicConfig, Integer> entry : topicsToMakeReady.entrySet()) {
        final Integer numPartitions = partitions.get(entry.getKey().name());
        if (numPartitions == null || !numPartitions.equals(entry.getValue())) {
            return false;
        }
    }
    return true;
}

and then have:

while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
    // should we add a small sleep here?
}

I think it makes the code cleaner. It removes the temporary variable and the break (neither of which I like!).
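
On the open question of adding a small sleep, one possible shape is sketched below. It is only an illustration under assumptions, not what the patch does: `TopicReadinessSketch`, `awaitTopics`, the `Supplier` used in place of `internalTopicManager.getNumPartitions`, and the timeout and backoff parameters are all hypothetical.

```java
import java.util.Map;
import java.util.function.Supplier;

final class TopicReadinessSketch {
    // True only if every expected topic is present with the expected partition count.
    static boolean allTopicsCreated(final Map<String, Integer> expected,
                                    final Map<String, Integer> actual) {
        for (final Map.Entry<String, Integer> entry : expected.entrySet()) {
            final Integer numPartitions = actual.get(entry.getKey());
            if (numPartitions == null || !numPartitions.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    // Polls the (hypothetical) metadata supplier until all topics are ready.
    // Returns false if the timeout expires or the thread is interrupted.
    static boolean awaitTopics(final Map<String, Integer> expected,
                               final Supplier<Map<String, Integer>> metadata,
                               final long timeoutMs,
                               final long backoffMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!allTopicsCreated(expected, metadata.get())) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                // Small pause so the loop does not hammer the metadata source.
                Thread.sleep(backoffMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The deadline is an addition beyond what the review comment asks for; without it, a topic that never reaches the expected partition count would keep the loop spinning indefinitely.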

Member

@mjsax mjsax left a comment

I second @dguy's comments. There are a few more vars that can be final, too. Otherwise, LGTM.

@guozhangwang
Contributor Author

@dguy @mjsax addressed your comments, please take a look again.

@asfbot

asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1089/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1087/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1087/
Test PASSed (JDK 8 and Scala 2.12).

Member

@mjsax mjsax left a comment

One nit comment. Otherwise LGTM.

if (!existingTopicNamesPartitions.get(topic.name()).equals(topicsPartitionsMap.get(topic))) {
throw new StreamsException("Existing internal topic " + topic.name() + " has invalid partitions." +
" Expected: " + topicsPartitionsMap.get(topic) + " Actual: " + existingTopicNamesPartitions.get(topic.name()) +
". Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.");
Member

"Use 'kafka.tools.StreamsResetter' tool"
-> "Use '" + kafka.tools.StreamsResetter.class.getName() + "' tool"

Contributor Author

The reason we do not use the class directly is that streams does not depend on kafka.tools for now, and I'd rather not add that dependency until we have enough motivation to do so.

Member

Ack.
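
For reference, a rough sketch of the validation discussed in this thread, with the tool name kept as a string literal so that no compile-time dependency on kafka.tools is needed. `PartitionValidationSketch` and `validatePartitions` are hypothetical names, plain maps stand in for InternalTopicConfig, and `IllegalStateException` stands in for the StreamsException used in the patch.

```java
import java.util.Map;

final class PartitionValidationSketch {
    // Tool name as a plain string: referencing the class itself would require
    // the streams module to depend on the module that contains it.
    static final String RESET_TOOL = "kafka.tools.StreamsResetter";

    // Throws if a topic already exists with a partition count other than the expected one.
    // Topics missing from `existing` are skipped here (assumed to be created later).
    static void validatePartitions(final Map<String, Integer> expected,
                                   final Map<String, Integer> existing) {
        for (final Map.Entry<String, Integer> entry : expected.entrySet()) {
            final Integer actual = existing.get(entry.getKey());
            if (actual != null && !actual.equals(entry.getValue())) {
                // Stand-in exception type; the patch throws StreamsException.
                throw new IllegalStateException("Existing internal topic " + entry.getKey()
                    + " has invalid partitions. Expected: " + entry.getValue()
                    + " Actual: " + actual
                    + ". Use '" + RESET_TOOL + "' tool to clean up invalid topics before processing.");
            }
        }
    }
}
```

The trade-off is the one raised above: a literal can drift if the class is ever renamed, while a class reference would pull in a module dependency.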

Contributor

@dguy dguy left a comment

LGTM

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1126/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1128/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1126/
Test FAILed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1133/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1133/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1135/
Test FAILed (JDK 8 and Scala 2.11).

@guozhangwang
Contributor Author

I'm investigating the Jenkins failure in another JIRA / PR. Could we merge this PR as is, @hachikuji?

final Integer numPartitions = entry.getValue().numPartitions;
// first construct the topics to make ready
Map<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<>();
Set<String> topicNamesToMakeReady = new HashSet<>();

Feels like this collection is redundant. You can get the name from InternalTopicConfig perhaps?

Contributor Author

Yes, I can, but then I would need another foreach anyway to extract the names when calling the function; doing it here avoids that.

Ack
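
The two options in this exchange can be sketched as follows. This is an illustration only: `TopicConfigStub` is a hypothetical, simplified stand-in for InternalTopicConfig, and `MakeReadySketch` is not code from the patch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for InternalTopicConfig: it only carries a name.
final class TopicConfigStub {
    private final String name;

    TopicConfigStub(final String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

final class MakeReadySketch {
    final Map<TopicConfigStub, Integer> topicsToMakeReady = new HashMap<>();
    final Set<String> topicNamesToMakeReady = new HashSet<>();

    // Option taken in the patch: record the config and its name in the same pass.
    void add(final TopicConfigStub topic, final int numPartitions) {
        topicsToMakeReady.put(topic, numPartitions);
        topicNamesToMakeReady.add(topic.name());
    }

    // Option raised in review: keep only the map and derive the names with a second loop.
    static Set<String> namesOf(final Map<TopicConfigStub, Integer> topics) {
        final Set<String> names = new HashSet<>();
        for (final TopicConfigStub topic : topics.keySet()) {
            names.add(topic.name());
        }
        return names;
    }
}
```

Both produce the same set of names; the first keeps a second collection in sync, the second pays an extra iteration at the call site.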


@hachikuji hachikuji left a comment

LGTM

asfgit pushed a commit that referenced this pull request Jan 24, 2017
The root cause of this issue is that InternalTopicManager creates topics one at a time, and this test needs 31 topics. As a result, the consumer can time out during assignment in a rebalance, and the next leader then has to repeat the same work because the "makeReady" calls are also made one at a time.

This patch batches the topics into a single create request and also uses the StreamsKafkaClient directly to fetch metadata for validating the created topics. It also optimizes some inefficient code in InternalTopicManager and StreamsKafkaClient.

Minor cleanup: make the exception message more informative in integration tests.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Jason Gustafson

Closes #2405 from guozhangwang/K3896-fix-kstream-repartition-join-test

(cherry picked from commit 7837d3e)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Contributor Author

Thanks for your reviews @dguy @mjsax @hachikuji . Merged to trunk and 0.10.2.

@asfgit asfgit closed this in 7837d3e Jan 24, 2017
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
@guozhangwang guozhangwang deleted the K3896-fix-kstream-repartition-join-test branch July 15, 2017 22:07