Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10017: fix flaky EOS-beta upgrade test #9690

Closed
wants to merge 3 commits into from

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Dec 4, 2020

PR for 2.6 branch. "Main" PR for trunk and 2.7 is #9688

The difference is, that in 2.6 and eos-alpha, we commit tasks individually, while in 2.7 and eos-alpha, if one tasks needs a commit we commit all tasks.

Call for review @abbccdda @ableegoldman @guozhangwang

@mjsax mjsax added streams tests Test fixes (including flaky tests) labels Dec 4, 2020
@guozhangwang
Copy link
Contributor

LGTM. Left some comment on the PR for trunk.

@mjsax
Copy link
Member Author

mjsax commented Dec 9, 2020

The test failed... pushed a commit for better debugging. Will try to reproduce locally. Seems there is still something going on.

@showuon
Copy link
Contributor

showuon commented Dec 11, 2020

@mjsax , I investigated your failed tests for some days, and finally found out why sometimes the test failed here:

Did not receive all 148 records from topic multiPartitionOutputTopic within 60000 ms
Expected: is a value equal to or greater than <148>
     but: <138> was less than <148>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:597)
	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
	at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:593)
	at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:566)
	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationThreeTest.readResult(EosBetaUpgradeIntegrationThreeTest.java:1056)
	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationThreeTest.verifyUncommitted(EosBetaUpgradeIntegrationThreeTest.java:1030)
	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationThreeTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationThreeTest.java:619)

It's because sometimes, the keys in stream store is empty, and that's why the following computation based on the variable is wrong.
Here's the logs I got: (They mapped to the code here, which is in phase 6)

keysFirstClientBeta is []
keysSecondClientAlpha is [1, 3]

And the variable newlyCommittedKeys in phase 5 should also be empty [] since we all get it via keysFromInstance(streams1Beta);. And that's why at the end of phase 5, verifyCommitted(expectedCommittedResultAfterRestartFirstClient); is actually doing verifyCommitted([]) <-- I also printed out the log here and can confirmed this

So, in summary, there's no logic error in the code, just has a bad assumption: keysFromInstance(streams1Beta); will always get something, but it might be empty. I just can't figure out why it's empty, store not ready? Or some other reasons here? Do you have any thought for this?

Thanks.

@guozhangwang
Copy link
Contributor

THanks for the investigation and the PR #9733 @showuon . I will leave it to @mjsax to review your PR, hopefully these two combined together capture all the root causes

@showuon
Copy link
Contributor

showuon commented Dec 17, 2020

@mjsax , I further investigated the issue I found last week:

It's because sometimes, the keys in stream store is empty, and that's why the following computation based on the variable is wrong. Here's the logs I got: (They mapped to the code here, which is in phase 6)

keysFirstClientBeta is []
keysSecondClientAlpha is [1, 3]

I finally found out the root cause, it's because the stream is not completed the stable assignment rebalancing during keysFromInstance(streams). Echo the discussion in my another PR: #9733 (comment), we did have "cut off" the unstable rebalances via assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);, but with this, we can only make sure the assignment is completed and stable now, but the streams haven't completed the REBALANCING yet. The workflow is like this:

Coordinator finished stable assignment of tasks -> notify tasks -> task handles the new assignment -> stream thread change state from RUNNING to PARTITIONS_ASSIGNED -> stream client change state from RUNNING to REBALANCING -> stream thread change state from PARTITIONS_ASSIGNED to RUNNING -> stream client change state from REBALANCING to RUNNING

And what we can make sure via assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); is only the step 1: Coordinator finished stable assignment of tasks completes. Also remember, the current stream state is RUNNING, which will pass the following checking (waitForRunning()). And the empty list is because the 1st unstable assignment to the stream is empty:

2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]     [2020-12-17 14:07:32,331] INFO [Consumer clientId=appDir1-StreamThread-1-consumer, groupId=appId-1] Updating assignment with
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]          Assigned partitions:                       []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]          Current owned partitions:                  []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]          Added partitions (assigned - owned):       []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]          Revoked partitions (owned - assigned):     []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger]      (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)

So, that's why we got the empty key list form the stream store.

As I mentioned in #9733 , after assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);, I think we should explicitly wait for specific transition pair, ex: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)], instead of waiting for RUNNING state only. Also, as you said, there might be more than 2 rebalancing happened after 1 stream started, I think we can have a count for onAssignmentComplete after prepareForRebalance (unstable + stable count), so that after it's stable assigned, we can know exactly how many rebalancing happened so that we can check the state transition content, ex: with 2 assignment happened, we can check if state transition list has 2 rebalancing value, and the last one is RUNNING state...etc.

Anyway, that's my finding, share with you. I'll update in my PR #9733 (maybe next week since a little busy these days). Thanks.

@mjsax
Copy link
Member Author

mjsax commented Feb 23, 2021

As we have a 2.8 release branch now, it seems not to be worth any longer to backport to fix to 2.6 branch. Closing this PR.

@mjsax mjsax closed this Feb 23, 2021
@mjsax mjsax deleted the eos-test-26 branch February 23, 2021 03:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
streams tests Test fixes (including flaky tests)
Projects
None yet
3 participants