-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4476: Kafka Streams gets stuck if metadata is missing #2209
Conversation
mjsax
commented
Dec 3, 2016
- break loop in StreamPartitionAssigner.assign() in case partition metadata is missing
- fit state transition issue (follow up to KAFKA-3637: Add method that checks if streams are initialised)
- some test improvements
@@ -147,16 +146,22 @@ public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() thr | |||
60000); | |||
// receive only first values to make sure intermediate user topic is not consumed completely |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax is the testNo
logic correct in that we only produce when testNo
is 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. That is intended. Both tests consume the same input topic and different applications IDs, so both can read the same data and thus it's sufficient to write the input only once.
7759854
to
9a0d03c
Compare
@ijuma Done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax. Looks like two tests failed in Jenkins (one of them is testReprocessingFromScratchAfterResetWithIntermediateUserTopic
). Is it possible to add tests for the bug fixes that fail reliably instead of transiently?
// => want to test "skip over" unprocessed records | ||
// increasing the sleep time only has disadvantage that test run time is increased | ||
mockTime.sleep(sleep); | ||
sleep *= 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that you removed this, seems like the comment here is no longer valid: https://github.com/mjsax/kafka/blob/9a0d03cf88997e4ddef5c082ae989cf4697472b7/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java#L74
@mjsax - it is not immediately clear to me what this is fixing? Do you think you can add some unit tests to cover the change? They'll probably help with understanding the issue, too. |
@ijuma There is another bug... This is actually tackling two different problems and I did discover a third one. From a bug tracking point of view it would make sense to have different PRs -- however, as all three issue make Reset tool test fails, it's not possible to get a clean build (and be sure there is not fourth one) if we split into multiple PRs... WDYT? |
@mjsax I think one PR is fine if there are unit tests for each of the issues uncovered. |
9a0d03c
to
2483cdd
Compare
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
2483cdd
to
69bb3a7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few nit comment, otherwise LGTM.
if (numPartitions == NOT_AVAILABLE) { | ||
continue; | ||
} | ||
if (numPartitions < 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: just use (numPartitions == UNKNOWN)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be UNKOWN
or NOT_AVAILABLE
, thus < 0
does check for both at once. I could change it to (numPartitions == UNKNOWN || numPartitions == NOT_AVAILABLE)
of course... WDYT?
} | ||
}) | ||
.through("topic2"); | ||
stream1.to("topic3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this output topic here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No really required. Will remove it. Guess it's a left over.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Merged to trunk. |
- break loop in StreamPartitionAssigner.assign() in case partition metadata is missing - fit state transition issue (follow up to KAFKA-3637: Add method that checks if streams are initialised) - some test improvements Author: Matthias J. Sax <matthias@confluent.io> Reviewers: Eno Thereska, Ismael Juma, Guozhang Wang Closes apache#2209 from mjsax/kafka-4476-stuck-on-missing-metadata