KAFKA-5086; Update topic expiry time in Metadata every time the topic metadata is requested #2869

Closed
wants to merge 5 commits into from


lindong28
Member

@lindong28 lindong28 commented Apr 19, 2017

In the current implementation, KafkaProducer.waitOnMetadata() first resets the topic's expiry time and then repeatedly sends a TopicMetadataRequest and waits for the metadata response. However, if the topic's metadata does not become available within Metadata.TOPIC_EXPIRY_MS, which is set to 5 minutes, the topic expires and is removed from Metadata.topics. The TopicMetadataRequest then no longer includes the topic, so the KafkaProducer never receives the metadata for this topic. It enters an infinite loop of sending TopicMetadataRequest and waiting for a metadata response.

This problem can be fixed by updating the topic's expiry time every time the topic metadata is requested.
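
To make the failure mode concrete, here is a minimal sketch of the expiry behaviour described above. It is a simplified model and not Kafka's actual Metadata class: TopicExpirySketch, add, update and topicSurvives are hypothetical names, and the one-request-per-minute cadence is an assumption chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of topic expiry; not Kafka's actual Metadata class.
class TopicExpirySketch {
    static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000L;

    private final Map<String, Long> expiryByTopic = new HashMap<>();

    // Register interest in a topic and (re)start its expiry window.
    void add(String topic, long nowMs) {
        expiryByTopic.put(topic, nowMs + TOPIC_EXPIRY_MS);
    }

    // Called when a metadata response arrives: drop topics whose window has elapsed.
    void update(long nowMs) {
        expiryByTopic.values().removeIf(expiry -> expiry <= nowMs);
    }

    boolean containsTopic(String topic) {
        return expiryByTopic.containsKey(topic);
    }

    // Simulate waiting for metadata for `minutes` minutes, one request per minute
    // (assumed cadence). Returns true if the topic is still tracked at the end.
    static boolean topicSurvives(boolean refreshOnEveryRequest, int minutes) {
        TopicExpirySketch metadata = new TopicExpirySketch();
        long nowMs = 0L;
        metadata.add("topic", nowMs);            // reset once, before the wait starts
        for (int i = 0; i < minutes; i++) {
            if (refreshOnEveryRequest) {
                metadata.add("topic", nowMs);    // the proposed fix: refresh per request
            }
            nowMs += 60 * 1000L;                 // one simulated minute passes
            metadata.update(nowMs);              // response arrives without the topic's metadata
        }
        return metadata.containsTopic("topic");
    }
}
```

In this model, a wait longer than five minutes drops the topic unless its expiry is refreshed on each request, which is the behaviour the patch aims to change.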

Ping @becketqin for review.

@asfbot

asfbot commented Apr 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3006/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3011/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3007/
Test PASSed (JDK 8 and Scala 2.12).

@omkreddy
Contributor

LGTM

@ijuma
Contributor

ijuma commented Apr 19, 2017

cc @rajinisivaram

@rajinisivaram
Contributor

KafkaProducer.waitOnMetadata() doesn't wait indefinitely as mentioned in the description; it waits for max.block.ms, which is one minute by default. Since the expiry time is reset at the start of KafkaProducer.waitOnMetadata(), with the default config, topics are never expired while waiting for metadata. The issue arises when max.block.ms is greater than the hard-coded 5-minute topic expiry time. At the moment, the wait continues beyond topic expiry until max.block.ms, but the topic is no longer in the metadata. This PR fixes that. It would be good to add a unit test for this scenario.

@lindong28
Member Author

@rajinisivaram Are you suggesting that I change the code and add a test, or that I only add a test?

I think it can be tested like this:

  • Initialize a KafkaProducer with maxBlockMs = 10 minutes
  • Call producer.waitOnMetadata(...) for a non-existent topic
  • Verify that the topic is still in the metadata after 6 minutes.

The problem with this test is that it takes 6 minutes to run. I am not sure it is worth the 6 minutes, because it is pretty clear that waitOnMetadata() will keep refreshing the topic in metadata if maxBlockMs > topic expiry time.

Alternatively I can add a test like this:

  • Initialize a KafkaProducer with maxBlockMs = 10 minutes and retryBackOffMS = 1 second
  • Call producer.waitOnMetadata(...) for a non-existent topic
  • After 3 seconds, verify that the age of the topic in metadata is less than 2 seconds

But this test doesn't directly verify that the topic won't be deleted from metadata in this scenario. I can add this test, though I personally don't think it is very useful.

What do you think?

@lindong28
Member Author

I realized that there is a way to test the scenario without waiting for 6 minutes of wall-clock time. I will try it.

@rajinisivaram
Contributor

@lindong28 I was suggesting that you could add a test that passes with the code from this PR and fails without. You can do so with a unit test that uses MockTime.
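
As a rough illustration of the mock-clock idea, the sketch below advances virtual time by six minutes without any real waiting. It does not use Kafka's MockTime: ManualClock, ExpiringEntry and MockClockDemo are hypothetical stand-ins, and the five-minute TTL mirrors the expiry value mentioned in this thread.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for a mock clock; Kafka's own MockTime is not used here.
class ManualClock implements LongSupplier {
    private long nowMs = 0L;

    @Override
    public long getAsLong() {
        return nowMs;
    }

    // Advances virtual time; never blocks the calling thread.
    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical component that reads time only through the injected clock.
class ExpiringEntry {
    private final LongSupplier clock;
    private final long ttlMs;
    private long expiresAtMs;

    ExpiringEntry(LongSupplier clock, long ttlMs) {
        this.clock = clock;
        this.ttlMs = ttlMs;
        this.expiresAtMs = clock.getAsLong() + ttlMs;
    }

    // Restart the expiry window from the current (virtual) time.
    void touch() {
        expiresAtMs = clock.getAsLong() + ttlMs;
    }

    boolean isExpired() {
        return clock.getAsLong() >= expiresAtMs;
    }
}

class MockClockDemo {
    // Returns whether the entry has expired after six virtual minutes.
    static boolean expiredAfterSixMinutes(boolean touchEveryMinute) {
        ManualClock clock = new ManualClock();
        ExpiringEntry entry = new ExpiringEntry(clock, 5 * 60 * 1000L);
        for (int i = 0; i < 6; i++) {
            if (touchEveryMinute) {
                entry.touch();
            }
            clock.sleep(60 * 1000L);
        }
        return entry.isExpired();
    }
}
```

A test built this way can assert one outcome with the refresh and the opposite outcome without it, so it passes with the fix and fails without it while running in milliseconds.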

@lindong28
Member Author

@rajinisivaram @ijuma @becketqin I have added a test. Thanks.

@asfbot

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3072/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3076/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3071/
Test PASSed (JDK 7 and Scala 2.10).

Contributor

@rajinisivaram rajinisivaram left a comment

@lindong28 Thank you. Looks good overall, left a few minor comments in the test.

MemberModifier.field(KafkaProducer.class, "time").set(producer, time);
String topic = "topic";

Thread t = new Thread(new Runnable() {
Contributor

Nit: new Runnable() not required, you can just override run() from Thread.

Member Author

Sure. Fixed now.

metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
time.sleep(60 * 1000L);
try {
    Thread.sleep(100);
Contributor

Rather than sleeping 12 times, you could wait for metadata.updateRequested before doing the update.

Member Author

@lindong28 lindong28 Apr 21, 2017

@rajinisivaram Are you suggesting that we should call metadata.update only once, after metadata.updateRequested returns true? I am not sure this is better. I think the test needs to call metadata.update() multiple times so that the elapsed time is long enough for the topic to be removed from metadata without this patch, but not with this bug fix. This also mimics the scenario that causes the problem.

If I didn't understand your suggestion correctly, can you be more specific? Thanks.

Contributor

It is not particularly critical, but my suggestion was to keep the same for loop, but add something like

while (!metadata.updateRequested()) Thread.yield();
metadata.update(...);

or Thread.sleep(1) instead of yield(). Now each update corresponds to a request to update metadata from partitionsFor, as it does in the real scenario. You need to make sure that the thread terminates in this case.
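
The handshake suggested here can be sketched in isolation as follows. This is only an illustration under assumptions: RequestFlagSketch is a hypothetical stand-in that models nothing but the request flag, and requestUpdate, update and runHandshake are names invented for the example, not the signatures of Kafka's Metadata class.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the metadata object; only the request flag is modelled.
class RequestFlagSketch {
    private final AtomicBoolean updateRequested = new AtomicBoolean(false);
    private final AtomicInteger updates = new AtomicInteger();

    void requestUpdate() {
        updateRequested.set(true);
    }

    boolean updateRequested() {
        return updateRequested.get();
    }

    // Count the update first, then clear the flag so the requester can proceed.
    void update() {
        updates.incrementAndGet();
        updateRequested.set(false);
    }

    int updateCount() {
        return updates.get();
    }

    // Runs `rounds` request/update pairs and returns how many updates were applied.
    static int runHandshake(int rounds) {
        RequestFlagSketch metadata = new RequestFlagSketch();
        Thread updater = new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < rounds; i++) {
                    // Wait until the other thread has actually asked for an update.
                    while (!metadata.updateRequested()) Thread.yield();
                    metadata.update();
                }
            }
        };
        updater.start();
        for (int i = 0; i < rounds; i++) {
            metadata.requestUpdate();
            // Wait for the "response" before issuing the next request.
            while (metadata.updateRequested()) Thread.yield();
        }
        try {
            updater.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return metadata.updateCount();
    }
}
```

Because each update is paired with exactly one request, the number of updates matches the number of requests regardless of thread scheduling, which a fixed Thread.sleep(100) cannot guarantee.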

Member Author

@rajinisivaram Good point. I have changed the code to use yield(). Thanks.

Contributor

@lindong28 I think waiting for 12 updates in the loop may be too many, since max.block.ms=10min? It would also be good to ensure that the test doesn't leave the thread running even if the test fails.

Member Author

@lindong28 lindong28 Apr 24, 2017

@rajinisivaram My bad. I should have double-checked the code when replacing Thread.sleep() with that while loop. I have updated the code to fix this problem and to make sure that the thread always exits within 1 second. I have also verified that the test exits quickly (e.g. within 8 ms) if it succeeds.
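
One way to guarantee that a helper thread exits even when the awaited condition never becomes true is to bound its spin loop with a deadline and join it with a timeout. The sketch below shows that pattern in general form; BoundedHelperThreadSketch and runWithDeadline are hypothetical names, and this is not the code that was committed in this PR.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class BoundedHelperThreadSketch {
    // Spins until `condition` is set or `timeoutMs` elapses.
    // Returns true if the helper thread has exited by the time this method returns.
    static boolean runWithDeadline(AtomicBoolean condition, long timeoutMs) {
        Thread helper = new Thread() {
            @Override
            public void run() {
                long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
                // Give up once the deadline passes, even if the condition never becomes true.
                while (!condition.get() && System.nanoTime() - deadlineNs < 0) {
                    Thread.yield();
                }
            }
        };
        helper.setDaemon(true);   // a leaked helper must not keep the JVM alive
        helper.start();
        try {
            helper.join(timeoutMs + 1000);   // bounded wait in the calling (test) thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !helper.isAlive();
    }
}
```

With a bounded loop, a failing assertion elsewhere in the test cannot leave the helper spinning indefinitely.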

@Override
public void run() {
    for (int i = 0; i < 12; i++) {
        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
Contributor

To be realistic, the second parameter should be Collections.singleton(topic) rather than emptySet.

Member Author

It is fixed now.

@asfbot

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3101/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3106/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3102/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3105/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3104/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3109/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3152/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3157/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3164/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3155/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3159/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3162/
Test PASSed (JDK 7 and Scala 2.10).

@lindong28
Member Author

retest this please

@becketqin
Contributor

@rajinisivaram Do you want to merge this patch?

@rajinisivaram
Contributor

@becketqin I haven't yet read through the committer documentation, but I can get to this tomorrow. If there is no hurry to merge this PR today, then this could be my first commit :-) Thanks.

@becketqin
Contributor

@rajinisivaram Awesome. It is not urgent. :)

@asfbot

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3210/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3205/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3206/
Test PASSed (JDK 7 and Scala 2.10).

@rajinisivaram
Contributor

@lindong28 Can you include the JIRA in the PR title so that the PR gets linked to the JIRA?

@lindong28 lindong28 changed the title Update topic expiry time in Metadata every time the topic metadata is requested KAFKA-5086; Update topic expiry time in Metadata every time the topic metadata is requested Apr 27, 2017
@lindong28
Member Author

@rajinisivaram Certainly. It is updated now. I forgot to include this in the title previously.

@rajinisivaram
Contributor

@lindong28 Can you also update the PR description, since it appears as the commit message? The title should use a colon rather than a semicolon. Thank you.

@ijuma
Contributor

ijuma commented Apr 27, 2017

@rajinisivaram FYI, the merge script automatically fixes the title to fit the convention. It also lets you edit the title easily. It doesn't let you edit the PR description though (it's possible to do it via git commit --amend, but maybe better to avoid more advanced flows until later :)).

@rajinisivaram
Contributor

@ijuma Thank you. I started with the tool and got as far as the commit, but it didn't give me an option to update the message. I mentioned the semicolon because the JIRA doesn't currently have a link to this PR; perhaps that needs the PR to be closed and reopened? Or will the merge tool add a comment and close the JIRA anyway? Thanks.

@lindong28
Member Author

lindong28 commented Apr 27, 2017

@rajinisivaram Looking at the git log, it seems that we use both colon and semicolon after the ticket number? It also seems that we don't generally include the full description of the problem in the commit message (which we do in the PR description) because that may be too verbose. For example, we probably don't want to include "Ping @becketqin for review." in the commit message.

@becketqin mentioned that you can edit the PR title and leave the PR description out of the commit message. In other words, you can choose not to include the PR description and edit the commit message in any way you like.

Thank you.

@asfgit asfgit closed this in 5b5efd4 Apr 27, 2017
@lindong28 lindong28 deleted the KAFKA-5086 branch April 27, 2017 22:03
@rajinisivaram
Contributor

@lindong28 This being my first commit to Kafka, I didn't want to get it all wrong! I checked after the commit and it looks fine. Thanks.

@lindong28
Member Author

@rajinisivaram Sure. I understand. Thanks for the review!

@ijuma
Contributor

ijuma commented Apr 27, 2017

@lindong28,

"@rajinisivaram Looking at the git log it seems that we use both colon and semi-colon after the ticket number? It also seems that we don't generally include full description of the problem in the commit message (which we do in the PR description) because that may be too verbose."

This is not accurate. The merge tool will use the PR description as the commit message of the squashed commit. This is explained in the wiki page for contributors. As such, you should not ping reviewers in the PR description. You should instead do that as a separate comment.

"@becketqin mentioned that you can include edit PR title and not include PR description in the commit message. In other words, you can choose not to include PR description and edit commit message in any way you like."

Editing the title is correct as I said. The PR description bit is again not accurate. The only way to change that is via direct git commands, the PR tool doesn't provide that functionality (I wrote it, so I would know :)).

@rajinisivaram, not sure if it was clear, but I said the tool allows you to update the commit title, but not the commit message. However, you can update the commit message by using git commit --amend in a separate shell just before saying y to push the merged change. This is advanced usage and it's possible to make mistakes like including local files from your checkout (which has happened when people are not careful).

@lindong28
Member Author

lindong28 commented Apr 27, 2017

@ijuma

"Editing the title is correct as I said. The PR description bit is again not accurate. The only way to change that is via direct git commands, the PR tool doesn't provide that functionality (I wrote it, so I would know :))."

Is it true that a committer can choose not to include the PR description and can edit the PR title? If so, that effectively allows the committer to edit the PR description freely in the commit message. That statement should be accurate.

@ijuma
Contributor

ijuma commented Apr 27, 2017

No, that's not true. Not via the tool.

6 participants