Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing #7021

Merged
merged 10 commits into from Jul 15, 2019

Conversation

@abbccdda
Copy link
Contributor

commented Jul 1, 2019

We have detected a race condition under system test failure. The problem was that the task manager internal active tasks should be guarded against state changes on the stream thread. Could definitely consider other fixes but this is currently the make-sense one.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label Jul 2, 2019

@abbccdda abbccdda force-pushed the abbccdda:system_test_fix branch to c6a5997 Jul 3, 2019

@abbccdda abbccdda force-pushed the abbccdda:system_test_fix branch Jul 7, 2019

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 8, 2019

@abbccdda abbccdda force-pushed the abbccdda:system_test_fix branch Jul 8, 2019

@abbccdda abbccdda force-pushed the abbccdda:system_test_fix branch to b2326b4 Jul 8, 2019

@mjsax mjsax changed the title KAFKA-8620: fix stream task concurrent access issue KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing Jul 9, 2019

// when the state is already in NOT_RUNNING, all its transitions
// will be refused but we do not throw exception here
return null;
} else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
log.debug("Invalid transition from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " +
"self transition is not allowed");

This comment has been minimized.

Copy link
@mjsax

mjsax Jul 9, 2019

Member

Thinking about this, I am wondering if we should just change the FSM to allow this transition and simplify the code here? \cc @guozhangwang

This comment has been minimized.

Copy link
@guozhangwang

guozhangwang Jul 15, 2019

Contributor

I thought about this when tightening the FSM before but the unit tests reminds me of one thing: our current contract is that we only transit to PARTITIONS_REVOKED when calling onPartitionsRevoked, which is called only once at the beginning of the rebalance today, so keeping it strict is better just in case we have incorrect partial rebalance procedure.

With KIP-429 this may be violated so we need to revisit our FSM once Streams adopt cooperative protocols. cc @ableegoldman who's working on this.

return;
}
if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
log.error("State transition from {} to PARTITIONS_ASSIGNED failed. " +

This comment has been minimized.

Copy link
@mjsax

mjsax Jul 9, 2019

Member

Why is this an ERROR? If we receive a SHUTDOWN signal, it's just an "eager exit" to not create tasks IMHO. We might want to log a DEBUG though.

Would also update the message:

log.debug("Skipping task creation in rebalance because we are already in shutdown phase.");

This comment has been minimized.

Copy link
@abbccdda

abbccdda Jul 9, 2019

Author Contributor

Sounds good to me.

@@ -1179,7 +1197,7 @@ private void completeShutdown(final boolean cleanRun) {
// intentionally do not check the returned flag
setState(State.PENDING_SHUTDOWN);

log.info("Shutting down");
log.info("Shutting down with cleanRun {}", cleanRun);

This comment has been minimized.

Copy link
@mjsax

mjsax Jul 9, 2019

Member

Not sure if we need this? If the shutdown is not clean, we logged an ERROR before in StreamThread#run()

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 9, 2019

Contributor

I would not use variable names in log messages.

This comment has been minimized.

Copy link
@abbccdda

abbccdda Jul 9, 2019

Author Contributor

I could avoid using variable name in the log, but still feel needed to log the flag here, as we can't control future changes to set cleanRun flag.


// assign single partition
assignedPartitions.add(t1p1);
assignedPartitions.add(t1p2);

This comment has been minimized.

Copy link
@mjsax

mjsax Jul 9, 2019

Member

Why do we need two partitions?

beginOffsets.put(t1p2, 0L);
mockConsumer.updateBeginningOffsets(beginOffsets);

final Thread callbackThread = new Thread(() -> {

This comment has been minimized.

Copy link
@mjsax

mjsax Jul 9, 2019

Member

Why do we need a Thread here?

This comment has been minimized.

Copy link
@abbccdda

abbccdda Jul 9, 2019

Author Contributor

We need a separate thread to trigger callbacks, otherwise the state won't proceed. Maybe there is a better way to do that?

This comment has been minimized.

Copy link
@mjsax

mjsax Jul 9, 2019

Member

Can you elaborate? What do you mean by

otherwise the state won't proceed

@cadonna
Copy link
Contributor

left a comment

@abbccdda, Thank you for the PR.

if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
log.error("State transition from {} to PARTITIONS_ASSIGNED failed. " +
"Will skip the task initialization", streamThread.state());
} else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 9, 2019

Contributor

Isn't this a rather brittle fix? How does this code guarantee that after streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code() is false, the result of streamThread.assignmentErrorCode.get() does not change again before the tasks are created?

This comment has been minimized.

Copy link
@abbccdda

abbccdda Jul 9, 2019

Author Contributor

in PartitionsAssigned we should have already passed the assignment error placement, so I won't expect another flip for its value during this round rebalance.

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 10, 2019

Contributor

Makes sense

@@ -1179,7 +1197,7 @@ private void completeShutdown(final boolean cleanRun) {
// intentionally do not check the returned flag
setState(State.PENDING_SHUTDOWN);

log.info("Shutting down");
log.info("Shutting down with cleanRun {}", cleanRun);

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 9, 2019

Contributor

I would not use variable names in log messages.

@abbccdda abbccdda force-pushed the abbccdda:system_test_fix branch Jul 9, 2019

@abbccdda abbccdda force-pushed the abbccdda:system_test_fix branch to 9c13d42 Jul 9, 2019

@vvcephei
Copy link
Contributor

left a comment

LGTM. Just one minor suggestion.

@abbccdda abbccdda force-pushed the abbccdda:system_test_fix branch to a99606b Jul 10, 2019

@cadonna
Copy link
Contributor

left a comment

LGTM
@mjsax, What is so bad about using a mock for the task manager?

if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
log.error("State transition from {} to PARTITIONS_ASSIGNED failed. " +
"Will skip the task initialization", streamThread.state());
} else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 10, 2019

Contributor

Makes sense

@@ -695,6 +698,85 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() {
EasyMock.verify(taskManager);
}

@Test
public void shouldNotAccessTaskManagerWhenPendingShutdownInRunOnce() {

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 10, 2019

Contributor

I would give this test a more meaningful name, e.g. shouldNotThrowWhenPendingShutdownInRunOnce.

}

@Test
public void shouldHaveNoErrorAccessTaskManagerInRunOnceNoShutdown() {

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 10, 2019

Contributor

Same as above, e.g. shouldNotThrowWithoutPendingShutdownInRunOnce

return;
}
if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
log.debug("Skipping task creation in rebalance because we are already in {} state.",

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 10, 2019

Contributor

Shouldn't the indentation be as follows?

log.debug(
    "Skipping task creation in rebalance because we are already in {} state.",
    streamThread.state()
);
streamThread.state());
} else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {
log.debug("Encountered assignment error during partition assignment: {}. " +
"Will skip the task initialization", streamThread.assignmentErrorCode);

This comment has been minimized.

Copy link
@cadonna

cadonna Jul 10, 2019

Contributor

Shouldn't the indentation be as follows?

log.debug(
    "Encountered assignment error during partition assignment: {}. Will skip the task initialization", 
    streamThread.assignmentErrorCode
);

The first parameter is one or two characters too long, though.

@cadonna

This comment has been minimized.

Copy link
Contributor

commented Jul 10, 2019

Retest this, please

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 10, 2019

@cadonna Thank you for the review, addressed!

@mjsax

This comment has been minimized.

Copy link
Member

commented Jul 10, 2019

@cadonna The bug is, that if we don't create tasks on the TaskManager and when we later try to receive tasks from it, we hit an NPE. Hence, the TaskManager is on the test code path -- mocking it seems to make the test weak IMHO.

@mjsax
mjsax approved these changes Jul 10, 2019
@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 10, 2019

Retest this please

@cadonna

This comment has been minimized.

Copy link
Contributor

commented Jul 10, 2019

@mjsax I do not agree. The TaskManager is on the test code path, but it does not need to be tested. A unit test should test the class under test which is StreamThread in this case and avoid as much as possible testing other classes. With a TaskManager mock, you could easily verify if the calls to the TaskManager that would result in a NullPointerException are done or not. In addition, with a TaskManager mock the tests would also better document what the productive code is supposed to do because you have to specify it explicitly on the mock.

@mjsax

This comment has been minimized.

Copy link
Member

commented Jul 10, 2019

@cadonna Maybe. Do you want that we switch back to a mock? If yes, please let @abbccdda know. If not, please let me know so the PR can be merged. Your call.

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 10, 2019

Retest this please

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 10, 2019

@cadonna @mjsax My take is that the purpose of mocking taskManager is to create a conditional behavior path: if you set an active task, you could retrieve it later; if you don't, you will get NPE, while the setup of real taskManger is too much compared with a mock. Actually in our case, the mock examples in other tests doesn't quite fit to implement this mentioned conditional logic, and the task manager setup is not bad either. So we should be fine just creating a new client in this case.

@ijuma

This comment has been minimized.

Copy link
Contributor

commented Jul 11, 2019

Reopening to trigger Jenkins build.

@ijuma ijuma closed this Jul 11, 2019

@ijuma ijuma reopened this Jul 11, 2019

@cadonna

This comment has been minimized.

Copy link
Contributor

commented Jul 11, 2019

@abbccdda: What you describe is also a valid use case of a mock, but it only achieves the isolation of the class under test. It does not document well the expected behaviour of productive code. Furthermore, IMO conditional behaviour in a mock should be avoided if possible, because it introduces code that needs maintenance and could be wrong. A mock should be as simple as possible. Admittedly, in this case the code would probably not be that complex, but if it could be avoided, I would avoid it.

@mjsax @abbccdda: I do not want to block this fix further. The test seems to be correct. My concern is about clean and maintainable code. Up to you to follow my reasoning or not.

@bbejeck

This comment has been minimized.

Copy link
Contributor

commented Jul 11, 2019

retest this please

2 similar comments
@bbejeck

This comment has been minimized.

Copy link
Contributor

commented Jul 11, 2019

retest this please

@bbejeck

This comment has been minimized.

Copy link
Contributor

commented Jul 11, 2019

retest this please

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 11, 2019

Checked log and the single test failure was in RebalanceSourceConnectorsIntegrationTest in Connect, not related

@bbejeck

This comment has been minimized.

Copy link
Contributor

commented Jul 11, 2019

retest this please

2 similar comments
@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 12, 2019

retest this please

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 12, 2019

retest this please

@bbejeck

This comment has been minimized.

Copy link
Contributor

commented Jul 12, 2019

failures unrelated
retest this please

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 12, 2019

All flaky tests:
ExampleConnectIntegrationTest > testSourceConnector

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 12, 2019

retest this please

@abbccdda

This comment has been minimized.

Copy link
Contributor Author

commented Jul 13, 2019

@mjsax @bbejeck I have witnessed 4 times 2/3 green builds, and all failures are flaky tests including this time: ExampleConnectIntegrationTest.testSourceConnector
I personally don't think there is any further meaning to trigger another jenkins build, but you will make the call.

@mjsax mjsax merged commit 3e48bdb into apache:trunk Jul 15, 2019

2 of 4 checks passed

Jenkins Looks like there's a problem with this pull request
Details
JDK 11 and Scala 2.12 FAILURE 11662 tests run, 67 skipped, 1 failed.
Details
JDK 11 and Scala 2.13 SUCCESS 11662 tests run, 67 skipped, 0 failed.
Details
JDK 8 and Scala 2.11 SUCCESS 11662 tests run, 67 skipped, 0 failed.
Details
mjsax added a commit that referenced this pull request Jul 15, 2019
KAFKA-8620: fix NPE due to race condition during shutdown while rebal…
…ancing (#7021)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
mjsax added a commit that referenced this pull request Jul 15, 2019
KAFKA-8620: fix NPE due to race condition during shutdown while rebal…
…ancing (#7021)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
// when the state is already in NOT_RUNNING, all its transitions
// will be refused but we do not throw exception here
return null;
} else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
log.debug("Invalid transition from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " +
"self transition is not allowed");

This comment has been minimized.

Copy link
@guozhangwang

guozhangwang Jul 15, 2019

Contributor

I thought about this when tightening the FSM before but the unit tests reminds me of one thing: our current contract is that we only transit to PARTITIONS_REVOKED when calling onPartitionsRevoked, which is called only once at the beginning of the rebalance today, so keeping it strict is better just in case we have incorrect partial rebalance procedure.

With KIP-429 this may be violated so we need to revisit our FSM once Streams adopt cooperative protocols. cc @ableegoldman who's working on this.

"will abort the current process and re-throw at the end of rebalance: {}",
t
);
"will abort the current process and re-throw at the end of rebalance", t);

This comment has been minimized.

Copy link
@guozhangwang

guozhangwang Jul 15, 2019

Contributor

Nice catch.

mjsax added a commit that referenced this pull request Jul 15, 2019
KAFKA-8620: fix NPE due to race condition during shutdown while rebal…
…ancing (#7021)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
@mjsax

This comment has been minimized.

Copy link
Member

commented Jul 15, 2019

Merged to trunk and cherry-picked to 2.3, 2.2, and 2.1 branches.

ijuma added a commit to confluentinc/kafka that referenced this pull request Jul 20, 2019
Merge remote-tracking branch 'apache-github/2.3' into ccs-2.3
* apache-github/2.3:
  MINOR: Update documentation for enabling optimizations (apache#7099)
  MINOR: Remove stale streams producer retry default docs. (apache#6844)
  KAFKA-8635; Skip client poll in Sender loop when no request is sent (apache#7085)
  KAFKA-8615: Change to track partition time breaks TimestampExtractor (apache#7054)
  KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (apache#7094)
  KAFKA-8602: Separate PR for 2.3 branch (apache#7092)
  KAFKA-8530; Check for topic authorization errors in OffsetFetch response (apache#6928)
  KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (apache#7086)
  KAFKA-8637: WriteBatch objects leak off-heap memory (apache#7050)
  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (apache#7021)
  HOT FIX: close RocksDB objects in correct order (apache#7076)
  KAFKA-7157: Fix handling of nulls in TimestampConverter (apache#7070)
  KAFKA-6605: Fix NPE in Flatten when optional Struct is null (apache#5705)
  Fixes #8198 KStreams testing docs use non-existent method pipe (apache#6678)
  KAFKA-5998: fix checkpointableOffsets handling (apache#7030)
  KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (apache#7072)
  KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)
  MINOR: add upgrade text (apache#7013)
  Bump version to 2.3.1-SNAPSHOT
xiowu0 added a commit to linkedin/kafka that referenced this pull request Aug 22, 2019
[LI-CHERRY-PICK] [0f0093c] KAFKA-8620: fix NPE due to race condition …
…during shutdown while rebalancing (apache#7021)

TICKET = KAFKA-8620
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [0f0093c]
ORIGINAL_DESCRIPTION =

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
(cherry picked from commit 0f0093c)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants
You can’t perform that action at this time.