KAFKA-10199: Handle assignment with new remove operation in state updater #15882
Thanks for the PR, @cadonna. The overall structure is what we discussed before. Just two comments on the code.
if (stateUpdater == null) {
    handleTasksWithoutStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
} else {
    handleTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
    final Map<Task, Set<TopicPartition>> tasksToRecycleFromStateUpdater = new HashMap<>();
I'd put this block inside handleTasksWithStateUpdater to separate it from the non-state-updater code.
Done!
private void updateInputPartitions(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
                                   final Map<TaskId, Set<TopicPartition>> newInputPartitions,
                                   final Map<TaskId, RuntimeException> failedTasks) {
    iterateAndActOnRemovedTask(
Is this whole iterateAndAct an artifact of the history of the code? It's quite complicated, and I'm not sure if it needs to be this way. The "act" part is just regular code that doesn't have to be passed around as lambdas at all.
If you'd move the handling of InterruptException and ExecutionException into waitForFuture (which would make sense IMO), couldn't you just write
futures.entrySet().stream().map(entry -> waitForFuture(entry.getKey(), entry.getValue())).forEach(removedTaskResult -> ...)
and avoid this custom-built iterate machinery?
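To make the suggestion above concrete, here is a minimal, self-contained sketch of the idea. It is not the code from this PR: the class name, the generic signature, and the choice to rethrow as IllegalStateException are assumptions made for illustration, and the sketch catches java.lang.InterruptedException because that is what Future.get() declares.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical sketch, not the TaskManager code from the PR.
final class WaitForFutureSketch {

    // Blocks on the future and deals with both checked exceptions here,
    // so the method can be called from inside a lambda without try/catch.
    public static <K, V> V waitForFuture(final K key, final Future<V> future) {
        try {
            return future.get();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag before converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for " + key, e);
        } catch (final ExecutionException e) {
            // Assumption: callers only care about the original cause.
            throw new IllegalStateException("Future for " + key + " failed", e.getCause());
        }
    }

    // The "act" part is a plain stream pipeline instead of custom iterate machinery.
    public static <K, V> void forEachResult(final Map<K, CompletableFuture<V>> futures,
                                            final Consumer<? super V> action) {
        futures.entrySet().stream()
            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
            .forEach(action);
    }
}
```

Because the checked exceptions never leave waitForFuture, the call site stays a one-line stream expression like the one proposed in the comment.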
I tried to apply your proposal. Let me know what you think.
Ah, I see it's annoying because Java 8 cannot flatMap with an Optional (to map & filter at the same time) so you are using nulls... Not pretty, but still saves 400 lines of code.
We could reuse
futures.entrySet().stream()
.map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
.filter(Objects::nonNull)
.map(removedTaskResult -> checkIfTaskFailed(removedTaskResult, failedTasks))
.filter(Objects::nonNull);
though, as it's repeated 4 times now.
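One way the repeated pipeline could be factored out is sketched below. Everything except the shape of the stream chain is an assumption: RemovedResult is a hypothetical stand-in for StateUpdater.RemovedTaskResult, task IDs are plain strings, and the behaviour of waitForFuture and checkIfTaskFailed (returning null to mean "skip this entry") is guessed from the snippet above rather than taken from the PR.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

// Hypothetical sketch, not the TaskManager code from the PR.
final class RemovedTaskPipelineSketch {

    // Hypothetical stand-in for StateUpdater.RemovedTaskResult.
    public static final class RemovedResult {
        public final String taskId;
        public final RuntimeException exception; // null when the removal succeeded

        public RemovedResult(final String taskId, final RuntimeException exception) {
            this.taskId = taskId;
            this.exception = exception;
        }
    }

    // Assumption: a future may complete with null (nothing to remove); errors are rethrown unchecked.
    public static RemovedResult waitForFuture(final String taskId, final CompletableFuture<RemovedResult> future) {
        try {
            return future.get();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for " + taskId, e);
        } catch (final ExecutionException e) {
            throw new IllegalStateException("Removal of " + taskId + " failed", e.getCause());
        }
    }

    // Records a failed task and returns null so the next filter drops it.
    public static RemovedResult checkIfTaskFailed(final RemovedResult result,
                                                  final Map<String, RuntimeException> failedTasks) {
        if (result.exception != null) {
            failedTasks.put(result.taskId, result.exception);
            return null;
        }
        return result;
    }

    // The shared pipeline: callers attach their own terminal operation.
    public static Stream<RemovedResult> removedWithoutFailure(final Map<String, CompletableFuture<RemovedResult>> futures,
                                                              final Map<String, RuntimeException> failedTasks) {
        return futures.entrySet().stream()
            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
            .filter(Objects::nonNull)
            .map(result -> checkIfTaskFailed(result, failedTasks))
            .filter(Objects::nonNull);
    }
}
```

Since streams are lazy, failedTasks is only filled in once the caller runs a terminal operation such as forEach or collect. On Java 9 and later, Optional.stream() combined with flatMap would allow the map-and-filter step without nulls.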
I made that code re-usable.
LGTM, thanks!
…ater Uses the new remove operation of the state updater that returns a future to handle task assignment.
@@ -1750,6 +1761,7 @@ Map<TaskId, Task> allTasks() {
    if (stateUpdater != null) {
        final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
        ret.putAll(tasks.allTasksPerId());
        ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id, x -> x)));
@cadonna Could you please take a look at https://issues.apache.org/jira/browse/KAFKA-16774?
It seems this change could make StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled fail.
@chia7712 Thanks for notifying me. I will have a look.
Here is the PR: #15978
…ater (apache#15882) Uses the new remove operation of the state updater that returns a future to handle task assignment. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Uses the new remove operation of the state updater that returns
a future to handle task assignment.