-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10199: Remove lost tasks in state updater with new remove #15870
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @cadonna ! Left some comments/questions
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Show resolved
Hide resolved
} catch (final ExecutionException executionException) { | ||
log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ", | ||
taskId, executionException); | ||
} catch (final InterruptedException ignored) { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just ignore this? I see other classes in the package rethrowing it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me check...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are quite inconsistent on how we treat InterruptedException
. In some places we ignore them because they should not happen and in others we treat them as fatal and throw an IllegalStateException
because they should not happen [1].
[1]
kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
Line 597 in b36cf4e
} catch (final InterruptedException fatalException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am actually in favor of treating them as fatal and throw an IllegalStateException
to make it more explicit that interruption of a stream thread should not happen. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
@@ -1421,15 +1422,20 @@ public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() { | |||
.withInputPartitions(taskId02Partitions).build(); | |||
final TasksRegistry tasks = mock(TasksRegistry.class); | |||
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2, task3), tasks); | |||
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a test that covers the part where we get an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me write one.
…he#15870) Uses the new remove operation of the state updater that returns a future to remove lost tasks from the state udpater. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Uses the new remove operation of the state updater that returns a future to remove lost tasks from the state udpater.
Committer Checklist (excluded from commit message)