New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Streams Threading: Exception handling #13957
Conversation
Catch any exceptions that escape the processing logic inside TaskExecutors and record them in the TaskManager. Make sure the TaskExecutor survives, but the task is unassigned. Add a method to TaskManager to drain the exceptions. The aim here is that the polling thread will drain the exceptions to be able to execute the uncaught exception handler, abort transactions, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @lucasbru !
Here my feedback!
...rc/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
Outdated
Show resolved
Hide resolved
public void shouldNotSetUncaughtExceptionsForUnassignedTasks() { | ||
taskManager.add(Collections.singleton(task)); | ||
|
||
assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please also verify the error message? Otherwise, we actually do not know which IllegalArgumentException
was thrown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
taskManager.assignNextTask(taskExecutor); | ||
taskManager.setUncaughtException(exception, task.id()); | ||
|
||
assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please also verify the error message? Otherwise, we actually do not know which IllegalArgumentException
was thrown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@Test | ||
public void shouldReturnExceptionsOnDrainExceptions() { | ||
taskManager.add(Collections.singleton(task)); | ||
when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); | ||
taskManager.assignNextTask(taskExecutor); | ||
taskManager.setUncaughtException(exception, task.id()); | ||
|
||
assertEquals(taskManager.drainUncaughtExceptions(), Collections.singletonMap(task.id(), exception)); | ||
} | ||
|
||
@Test | ||
public void shouldClearExceptionsOnDrainExceptions() { | ||
taskManager.add(Collections.singleton(task)); | ||
when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); | ||
taskManager.assignNextTask(taskExecutor); | ||
taskManager.setUncaughtException(exception, task.id()); | ||
taskManager.drainUncaughtExceptions(); | ||
|
||
assertEquals(taskManager.drainUncaughtExceptions(), Collections.emptyMap()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, it would be OK to merge these two tests. Just add the assertEquals()
of the second test to the end of the first test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
uncaughtExceptions.put(taskId, exception); | ||
}); | ||
|
||
log.info("Set an uncaught exception for task {}", taskId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we log some more information about the exception, like the type and the error message? Just to be able to better reason about the error case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* Setting an uncaught exception for a task prevents it from being reassigned until the | ||
* corresponding exception has been handled in the polling thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That part is not yet implemented, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct
…nals/tasks/DefaultTaskExecutorTest.java Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Test failures are unrelated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @lucasbru !
LGTM!
Build failures are unrelated:
|
Catch any exceptions that escape the processing logic inside TaskExecutors and record them in the TaskManager. Make sure the TaskExecutor survives, but the task is unassigned. Add a method to TaskManager to drain the exceptions. The aim here is that the polling thread will drain the exceptions to be able to execute the uncaught exception handler, abort transactions, etc. Reviewer: Bruno Cadonna <cadonna@apache.org>
Catch any exceptions that escape the processing logic inside TaskExecutors and record them in the TaskManager. Make sure the TaskExecutor survives, but the task is unassigned. Add a method to TaskManager to drain the exceptions. The aim here is that the polling thread will drain the exceptions to be able to execute the
uncaught exception handler, abort transactions, etc.
The logic is tested by basic unit tests.