
KAFKA-6783: consumer poll(timeout) blocked infinitely #4861

Closed
wants to merge 1 commit

Conversation

koqizhao

@koqizhao koqizhao commented Apr 12, 2018

KAFKA-6783: consumer poll(timeout) blocked infinitely when no available bootstrap server

@koqizhao
Author

The "consumer poll blocked" bug was reported against the 0.10.x versions in:
https://issues.apache.org/jira/browse/KAFKA-5065
but the fix for KAFKA-5065 is out of date and cannot be applied to 1.1.0.

Yesterday I reproduced it on 1.1.0 as well and made a fix.

@hachikuji

@koqizhao Thanks for the patch. Can you separate this into two separate PRs please?

@koqizhao koqizhao changed the title KAFKA-6783/6784: consumer poll(timeout) blocked infinitely, FindCoordinatorResponse cannot be cast to FetchResponse KAFKA-6783: consumer poll(timeout) blocked infinitely Apr 13, 2018
@koqizhao
Author

koqizhao commented Apr 13, 2018

@hachikuji , OK.

Created another pull request, #4865, for KAFKA-6784, and updated the current one to cover only KAFKA-6783.

@koqizhao koqizhao force-pushed the KAFKA-6783/6784 branch 3 times, most recently from d824d9a to aa75e8f Compare April 13, 2018 05:45
KAFKA-6783: consumer poll(timeout) blocked infinitely when no available bootstrap server
fix bug: FindCoordinatorResponse cannot be cast to FetchResponse
@hachikuji

Thanks for the PR. I think @vvcephei is working on this problem as well. See #4855. At a glance, it looks like his patch handles a few more cases and I think he may have a KIP in the works. Perhaps you can coordinate your efforts?

@vvcephei
Contributor

Hey @koqizhao , thanks for the patch.

I only just recently started working on this issue. It's causing https://issues.apache.org/jira/browse/KAFKA-5697 as well.

I'm happy to yield to you. In particular, I like your new test and your approach to fixing the tests. I was just going to add 30s to every invocation of poll() in our tests, which isn't too principled.

Feel free to grab any code from my PR to fill in the missing cases @hachikuji was talking about.

Let me know if you want to continue with this PR, and I'll stop work on mine and switch to reviewing yours when it's ready!

@vvcephei
Contributor

Oh, also, I still haven't decided about whether or not to write a KIP. The basic issue is whether we need to preserve the existing semantics in some way.

Namely, whether we need to continue providing some way to just sync the metadata and to just "drain" previously fetched results without fetching another batch.

The biggest place this would matter is when timeout = 0, so I did this search: https://www.google.com/search?q="consumer.poll%280%29"+site%3A%3Ahttps%3A%2F%2Fgithub.com

Many of the hits seem to acknowledge that poll(0) is more or less a hack, but at the least they demonstrate that there is a desire for some way to initialize the consumer and fetch offsets without actually polling.

At the very least, the existence of calls to poll(0) "in the wild" is problematic for changing the semantics of our timeout naively, since that call will definitely not behave as expected if we start taking out time to sync the metadata.

One option is to treat 0 specially. Say, if you call poll(0), we'll actually use Long.MAX_VALUE for the metadata sync and then use 0 timeout for the data poll, exactly as we do today. Does that improve the situation, or make it more complicated and worse? I'm not sure.
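Concretely, the special case could be sketched like this (the class and method names are hypothetical, not actual consumer internals):

```java
// Hypothetical sketch of the poll(0) special case described above; nothing
// here is existing Kafka code.
final class PollTimeouts {
    final long metadataSyncTimeoutMs; // how long we may block syncing metadata
    final long dataPollTimeoutMs;     // how long we may block waiting for records

    PollTimeouts(long metadataSyncTimeoutMs, long dataPollTimeoutMs) {
        this.metadataSyncTimeoutMs = metadataSyncTimeoutMs;
        this.dataPollTimeoutMs = dataPollTimeoutMs;
    }

    static PollTimeouts forPoll(long timeoutMs) {
        if (timeoutMs == 0) {
            // poll(0): block as long as needed for the metadata sync,
            // but not at all for the data poll (today's behavior).
            return new PollTimeouts(Long.MAX_VALUE, 0L);
        }
        // poll(t > 0): the caller's timeout bounds both phases.
        return new PollTimeouts(timeoutMs, timeoutMs);
    }
}
```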

What do you think?

@hachikuji

Many of the hits seem to acknowledge that poll(0) is more or less a hack, but at the least they demonstrate that there is a desire for some way to initialize the consumer and fetch offsets without actually polling.

Yeah, I think this is exactly right. At a minimum, having an API to do this would be useful in testing. Maybe something like this:

Set<TopicPartition> awaitAssignment(long timeout, TimeUnit unit);

It's kind of weird, though, and I'm not sure what kind of use cases it addresses outside of testing. That might make it a tough sell.

Our case for not doing a KIP would be stronger if the change didn't break our own tests 😉. My feeling is we probably just have to do it, but I would like to be convinced otherwise. I almost hate to suggest it, but we could introduce a new config which controls whether or not poll() should block to join the group and fetch offsets. Maybe something like block.for.assignment, which defaults to true?

@koqizhao
Author

koqizhao commented Apr 17, 2018

A new config block.for.assignment may not help resolve the issue, no matter whether it defaults to true or false:
If it defaults to true, how do users learn to configure it to false?
If it defaults to false, how do users learn to configure it to true?

Also, if poll(0) is made equivalent to poll(Long.MAX_VALUE), the infinite block continues to happen, which is not what users calling poll(0) expect.

I suggest a new config block.for.assignment.ms instead, so as to keep backward compatibility for poll(0):
If not all servers are down, the config has no effect.
If all servers are down, the config bounds the block time to the configured maximum.

For poll(timeout > 0), always honor the timeout.
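As a sketch of those semantics (block.for.assignment.ms is only the proposed name, and the helper below is illustrative, not existing Kafka code):

```java
// Sketch of the proposed block.for.assignment.ms semantics; the config name
// follows the suggestion above and nothing here exists in Kafka today.
final class AssignmentBlockTime {
    static long effectiveBlockMs(long pollTimeoutMs, long blockForAssignmentMs) {
        if (pollTimeoutMs > 0) {
            // poll(timeout > 0): always honor the caller's timeout.
            return pollTimeoutMs;
        }
        // poll(0): bound the worst-case block (e.g. all brokers down)
        // by the configured maximum instead of blocking forever.
        return blockForAssignmentMs;
    }
}
```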

@vvcephei
Contributor

That sounds similar to one thought I had, which would be to add a new variant poll(assignmentTimeout, pollTimeout). This has the advantage of preserving existing semantics and behavior, while allowing you to specify a finite timeout for the assignment phase. The downside is that it's just kind of weird. The parameters leak information about what is going on inside the method.

Plus, from my informal survey, it really seems like you would have two distinct use cases, poll(some_value, 0) with the result ignored for just getting an assignment, and poll(some_value, some_other_value) with the result saved for really doing a poll. In the former usage, I'd argue it's actually a bit unsafe to ignore the result, since you can't be sure that poll won't return some results, and if it does, you'll miss them (the consumer would have no way of knowing that you're ignoring the results). The latter usage is also not great, since it's not clear how I as a caller should divide my available time between the assignment "phase" and actually polling.

I think whether it's a new method or a new config, it's probably KIP-worthy. I think we could forgive ourselves for sneaking something by if it's just purely a semantic change, but offering a new config is just as much a public interface change as a new method.

From where I'm sitting, it seems like the new method is the better option. It will let people who just want to wait for an assignment and not get results to call void waitForAssignment() instead of List poll(0), which is way less mysterious. And it also lets folks who want to poll(timeout) know that their timeout is going to be respected, as opposed to transformed into timeout + block.for.assignment.ms.

I'm willing to write the KIP (which is good, because I'm arguing for it ;) ), but I'm also willing to take a back seat if @koqizhao wants to lead it.

@koqizhao
Author

Great! I agree with adding a new method. Thanks @vvcephei. I would like you to write the KIP.

Maybe boolean waitForAssignment(long timeout) is better.

@koqizhao
Author

Maybe add 2 methods:
void waitForAssignment()
boolean waitForAssignment(long timeout)

@vvcephei
Contributor

Ok! I'll do it today.

Good call on the timeout. I think void waitForAssignment(long timeoutMs), with a TimeoutException on timeout, is a good public interface.

At an implementation level, we are still going to need to figure out what to do with all the tests; I think it boils down to either switching to waitForAssignment or adding more time to the current poll timeout. In #4855 I've experimentally determined that just using a 30s timeout across the board is sufficient to get all the tests to pass, but we should probably make a case-by-case call on which operation each test is semantically going for.
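A minimal sketch of what that interface might look like (an illustrative stub only; the real implementation would block on the consumer's group-join logic, and Kafka's own TimeoutException is unchecked, which the local stand-in below mimics):

```java
// Illustrative stub of the proposed method; nothing here is existing Kafka
// code. A local unchecked exception stands in for Kafka's TimeoutException
// to keep the sketch self-contained.
class AssignmentTimeoutException extends RuntimeException {
    AssignmentTimeoutException(String message) {
        super(message);
    }
}

interface AssignmentWaiter {
    // Blocks until the consumer has an assignment, or throws
    // AssignmentTimeoutException if timeoutMs elapses first.
    void waitForAssignment(long timeoutMs);
}

// Stub implementation: a real consumer would poll the group coordinator
// until the deadline; here we just simulate success or a timeout.
class StubWaiter implements AssignmentWaiter {
    private final boolean assigned;

    StubWaiter(boolean assigned) {
        this.assigned = assigned;
    }

    @Override
    public void waitForAssignment(long timeoutMs) {
        if (!assigned)
            throw new AssignmentTimeoutException("no assignment within " + timeoutMs + " ms");
    }
}
```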

@vvcephei
Contributor

@koqizhao @hachikuji ,

I've created KIP-288 as we discussed. I also started a discussion thread on the mailing list (dev@kafka.apache.org).

Please reply with your thoughts! Also, please reach out to anyone you think might have an opinion.

Thanks,
-John

@vvcephei
Contributor

Actually, I've just learned of KIP-266, which also covers this issue. @koqizhao and @hachikuji , can you review KIP-266 and contribute thoughts to that discussion?

@tedyu
Contributor

tedyu commented Apr 17, 2018

retest this please

@koqizhao
Author

These test cases failed for the same cause. I'm not familiar with Kafka Streams; @vvcephei, would you please help? Thanks.

org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic
org.apache.kafka.streams.integration.ResetIntegrationWithSslTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
org.apache.kafka.streams.integration.ResetIntegrationWithSslTest.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic

gradle streams:test --tests org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic

org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingByDurationAfterResetWithoutIntermediateUserTopic FAILED
java.lang.AssertionError: Condition not met within timeout 30000. Did not receive all 10 records from topic outputTopic
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:194)
at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:165)
at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(AbstractResetIntegrationTest.java:492)
at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(ResetIntegrationTest.java:92)

@vvcephei
Contributor

vvcephei commented Apr 24, 2018

Hey @koqizhao ,

I just saw this. I'm not too familiar with that test either. Does it fail in isolation?

I would say just to try it with an increased timeout, but I think 30s is the max timeout for the Streams tests. For the record, though, I got all the tests to pass on c5b19b5, which sets the metadata timeout to 30s for everything, so a 30s timeout should be long enough...

It might be that some condition is missed in the metadata update. For example, in 12e0c9c I just fixed a problem where, if a consumer group rejoin timed out, it wouldn't try again. If you can get the test running in your IDE, tracing through the KafkaConsumer part of the test execution might give you a clue about what's going wrong.

For clarity: both of those commits are in #4855 , if you're looking for them.

@kalimatas

Hi,

Any updates on this issue? We also hit it in production when we had unreachable Kafka brokers.

@koqizhao
Author

koqizhao commented May 10, 2018 via email

@vvcephei
Contributor

Hi @koqizhao ,

I mentioned this on your ticket, but I think that this PR is also addressed by c470ff7 . Is there any remaining change you'd like to propose, or do you want to close this PR?

Thanks,
-John

@koqizhao
Author

This bug is a duplicate of KAFKA-5697 and is fixed in 2.0.0. My code is partly included in the fix for KAFKA-5697.
Closing the pull request.

John Roesler resolved KAFKA-6783.

Resolution: Duplicate

We've merged [https://github.com/apache/kafka/commit/c470ff70d3e829c8b12f6eb6cc812c4162071a1f] under KAFKA-5697, which should fix this issue. In retrospect, your ticket would have been a more appropriate scope for the work, but it's too late to change the commit title now.

@koqizhao koqizhao closed this May 30, 2018
@koqizhao koqizhao deleted the KAFKA-6783/6784 branch May 30, 2018 02:24
@koqizhao koqizhao restored the KAFKA-6783/6784 branch May 30, 2018 02:24