Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8412: Still a nullpointer exception thrown on shutdown whi… #7207

Merged
merged 3 commits into from Aug 26, 2019

Conversation

@cpettitt-confluent
Copy link
Contributor

commented Aug 13, 2019

…le flushing before closing producers

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

  1. EOS is enabled
  2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

It is my opinion that in the long term, this (and probably other
surprising state interactions) could be cleaned up by consolidating
state into one place instead of spreading it across AssignedTasks,
StreamTask, and AbstractTask. However, that is a much larger, more risky
change, and this issue is currently considered minor.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Currently the only externally visible way to detect this problem in test
seems to be via logging. This is because the NPE is logged but then
suppressed under the following sequence:

RecordCollectorImpl.flush:266
- throws NPE (producer is null)

StreamTask.suspend:578
- goes through the finally block and then reraises the NPE

StreamTask.close:706
- catches the NPE, calls closeSuspended with the NPE

StreamTask.closeSuspended:676
- rethrows the NPE after some cleanup

AssignedTasks.close:341
- catches and logs the exception
- tries a "dirty" close (clean = true) which succeeds
- firstException is NOT set because the test !closeUnclean(task)
does not hold.

It seems this is not the intended behavior? If so, I will happily
correct that and stop using logging as a way to detect failure.

Otherwise this commit does not currently pass checkstyle because I'm
using blacklisted imports: LogCaptureAppender and its various
dependencies from log4j. I would appreciate guidance as to whether we
should whitelist these or use another technique for detection.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
KAFKA-8412[WIP]: Still a nullpointer exception thrown on shutdown whi…
…le flushing before closing producers

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

It is my opinion that in the long term, this (and probably other
surprising state interactions) could be cleaned up by consolidating
state into one place instead of spreading it across AssignedTasks,
StreamTask, and AbstractTask. However, that is a much larger, more risky
change, and this issue is currently considered minor.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Currently the only externally visible way to detect this problem in test
seems to be via logging. This is because the NPE is logged but then
suppressed under the following sequence:

RecordCollectorImpl.flush:266
    - throws NPE (producer is null)

StreamTask.suspend:578
    - goes through the finally block and then reraises the NPE

StreamTask.close:706
    - catches the NPE, calls closeSuspended with the NPE

StreamTask.closeSuspended:676
    - rethrows the NPE after some cleanup

AssignedTasks.close:341
    - catches and logs the exception
    - tries a "dirty" close (clean = true) which succeeds
    - firstException is NOT set because the test `!closeUnclean(task)`
      does not hold.

It seems this is not the intended behavior? If so, I will happily
correct that and stop using logging as a way to detect failure.

Otherwise this commit does not currently pass checkstyle because I'm
using blacklisted imports: `LogCaptureAppender` and its various
dependencies from `log4j`. I would appreciate guidance as to whether we
should whitelist these or use another technique for detection.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.
@cpettitt-confluent

This comment has been minimized.

Copy link
Contributor Author

commented Aug 13, 2019

@mjsax @guozhangwang Please see patch notes, particularly about how the NPE is suppressed and thus difficult to test for. I offered a few ideas, but would appreciate your guidance.

@mjsax mjsax added the streams label Aug 13, 2019

@cpettitt-confluent

This comment has been minimized.

Copy link
Contributor Author

commented Aug 15, 2019

bump

@@ -331,26 +331,16 @@ void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAs

void close(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final T task : allTasks()) {
try {
task.close(clean, false);

This comment has been minimized.

Copy link
@mjsax

mjsax Aug 20, 2019

Member

Why not leaf the code "as-is" and change this line to:

if (suspended.values().contains(task)) {
    task.closeSuspended(clean, false, firstException);
} else {
    task.close(clean, false);
}

This comment has been minimized.

Copy link
@cpettitt-confluent

cpettitt-confluent Aug 20, 2019

Author Contributor

Absolutely! Will do.

@mjsax

This comment has been minimized.

Copy link
Member

commented Aug 20, 2019

I think the overall approach is correct. Side notice: should we revert #5993 that seems not to be necessary after this change any longer?

@cpettitt-confluent

This comment has been minimized.

Copy link
Contributor Author

commented Aug 20, 2019

@mjsax

Will give the revert for #5993 a shot and verify.

BTW, I cannot commit this as is because I'm using blacklisted classes (log4j, LogCaptureAppender) in the test code. Would you be ok with me whitelisting them?

@mjsax

This comment has been minimized.

Copy link
Member

commented Aug 21, 2019

I guess you can use org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender instead? Or should we remove this class and switch to kafka.utils.LogCaptureAppender? \cc @vvcephei who introduced the streams LogCaptureAppender (why did you not use the Kafka utils one?)

@cpettitt-confluent

This comment has been minimized.

Copy link
Contributor Author

commented Aug 23, 2019

I inlined closeTask per your suggestion - it's definitely an improvement.

For now I whitelisted kafka.utils.LogCaptureAppender. I actually like @vvcephei 's version better - it's more idiomatic (pure Java) and hides the log4j dependency. I think the only gap is the ability to restore the log level back to its original state. I would be happy to switch over and add this functionality if preferred by whoever reviews this commit.

@guozhangwang guozhangwang changed the title KAFKA-8412[WIP]: Still a nullpointer exception thrown on shutdown whi… KAFKA-8412: Still a nullpointer exception thrown on shutdown whi… Aug 23, 2019

@guozhangwang
Copy link
Contributor

left a comment

LGTM. Triggering one system test before merging.

@guozhangwang

This comment has been minimized.

Copy link
Contributor

commented Aug 26, 2019

System test passed, merging to trunk.

@guozhangwang guozhangwang merged commit 7334222 into apache:trunk Aug 26, 2019

1 of 3 checks passed

JDK 11 and Scala 2.13 FAILURE 11827 tests run, 77 skipped, 1 failed.
Details
JDK 8 and Scala 2.11 FAILURE 11622 tests run, 77 skipped, 1 failed.
Details
JDK 11 and Scala 2.12 SUCCESS 11827 tests run, 77 skipped, 0 failed.
Details
guozhangwang added a commit that referenced this pull request Aug 26, 2019
KAFKA-8412: Fix nullpointer exception thrown on flushing before closi…
…ng producers (#7207)

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
guozhangwang added a commit that referenced this pull request Aug 26, 2019
KAFKA-8412: Fix nullpointer exception thrown on flushing before closi…
…ng producers (#7207)

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
guozhangwang added a commit that referenced this pull request Aug 26, 2019
KAFKA-8412: Fix nullpointer exception thrown on flushing before closi…
…ng producers (#7207)

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang

This comment has been minimized.

Copy link
Contributor

commented Aug 26, 2019

Cherry-picked to 2.3, 2.2, and 2.1 as well.

Thanks @cpettitt-confluent for your contribution.

@cpettitt-confluent cpettitt-confluent deleted the cpettitt-confluent:kafka-8412 branch Aug 26, 2019

bbejeck added a commit that referenced this pull request Sep 4, 2019
bbejeck added a commit that referenced this pull request Sep 4, 2019
cpettitt-confluent added a commit to cpettitt-confluent/kafka that referenced this pull request Sep 5, 2019
KAFKA-8878: Fix flaky test AssignedStreamsTasksTest#shouldCloseCleanl…
…yWithSuspendedTaskAndEOS

The previous approach to testing KAFKA-8412 was to look at the logs and
determine if an error occurred during close. There was no direct way to
detect than an exception occurred because the exception was eaten in
`AssignedTasks.close`. In the PR for that ticket (apache#7207) it was
acknowledged that this was a brittle way to test for the exception. We
now see occasional failures because an unrelated ERROR level log entry
is made while closing the task.

This change eliminates the brittle log checking by rethrowing any time
an exception occurs in close, even when a subsequent unclean close
succeeds. This has the potential benefit of uncovering other supressed
exceptions down the road.

I've verified that even with us rethrowing on `closeUnclean` that all
tests pass.
bbejeck added a commit that referenced this pull request Sep 10, 2019
KAFKA-8878: Fix flaky test AssignedStreamsTasksTest#shouldCloseCleanl…
…yWithSuspendedTaskAndEOS (#7302)

The previous approach to testing KAFKA-8412 was to look at the logs and
determine if an error occurred during close. There was no direct way to
detect than an exception occurred because the exception was eaten in
AssignedTasks.close. In the PR for that ticket (#7207) it was
acknowledged that this was a brittle way to test for the exception. We
now see occasional failures because an unrelated ERROR level log entry
is made while closing the task.

This change eliminates the brittle log checking by rethrowing any time
an exception occurs in close, even when a subsequent unclean close
succeeds. This has the potential benefit of uncovering other supressed
exceptions down the road.

I've verified that even with us rethrowing on closeUnclean that all
tests pass.

Reviewers: Matthias J. Sax <mjsax@apache.org>,  Bill Bejeck <bbejeck@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants
You can’t perform that action at this time.