-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4540: Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created #2266
Conversation
@mjsax @guozhangwang @enothereska - hopefully this is the last in the series of PRs to do with |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
All three builds failed with a compile error... |
@@ -233,6 +233,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> assignment) { | |||
StreamThread.this.getName(), assignment); | |||
|
|||
setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS); | |||
// do this first as we may have suspended standby tasks that | |||
// will become active | |||
closeNonAssignedSuspendedStandbyTasks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we move closing non-assigned tasks into this method too? Would be a cleaner workflow -- close all not reassigned tasks (regular and stand-by), than reuse suspended tasks and create new tasks.
@@ -965,34 +985,14 @@ private void removeSuspendedTasks() { | |||
// Close task and state manager | |||
for (final AbstractTask task : suspendedTasks.values()) { | |||
task.close(); | |||
task.flushState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because it is already done in suspendTasksAndState
so there is no need to flush again.
task.closeStateManager(); | ||
// flush out any extra data sent during close | ||
producer.flush(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it is already done in suspendTasksAndState
. So this is not required.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments.
// do this first as we may have suspended standby tasks that | ||
// will become active | ||
closeNonAssignedSuspendedTasks(); | ||
// closeNonAssignedSuspendedStandbyTasks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops!
@@ -848,7 +881,7 @@ private void addStreamTasks(Collection<TopicPartition> assignment) { | |||
} | |||
|
|||
// destroy any remaining suspended tasks | |||
removeSuspendedTasks(); | |||
// removeSuspendedTasks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also we need to remove the comments.
final Set<TaskId> currentActiveTaskIds = partitionAssignor.activeTasks().keySet(); | ||
final Iterator<Map.Entry<TaskId, StreamTask>> activeTaskIterator = suspendedActiveTasks.entrySet().iterator(); | ||
while (activeTaskIterator.hasNext()) { | ||
closeAndRemoveIfNotAssigned(currentActiveTaskIds, activeTaskIterator.next(), activeTaskIterator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we return a boolean indicating if this task is removed or not? Then in line 964 below we can instead check that this map should then be empty since we have removed both recycled tasks or non-recycled ones (in line 863).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how returning a boolean helps with anything. It would just indicate that a single task has been removed. If we really want to make sure we've removed all the suspended tasks we can just check if the map is empty in addStreamTasks
after the for loop.
@@ -960,34 +990,14 @@ private void removeStandbyTasks() { | |||
} | |||
|
|||
private void removeSuspendedTasks() { | |||
log.info("{} Removing all suspended tasks [{}]", logPrefix, suspendedTasks.keySet()); | |||
log.info("{} Removing all suspended tasks [{}]", logPrefix, suspendedActiveTasks.keySet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comments above. If that case should we be guaranteed that suspendedActiveTasks
is always an empty map here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is no longer used and should've been removed
} else { | ||
throw e; | ||
} | ||
} catch (final LockException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we change the thrown exceptions in ProcessorStateManager? I thought we are still throwing ProcessorStateException
which embed the LockException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it just throws a LockException
- i saw this when i was trying to test it.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Updated this to take into account the partition assignment when finding StreamTasks to close. For a given |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Merged to trunk. |
… need to be closed before new active and standby tasks are created During `onPartitionsAssigned` first close, and remove, any suspended `StandbyTasks` that are no longer assigned to this consumer. Author: Damian Guy <damian.guy@gmail.com> Reviewers: Guozhang Wang <wangguoz@gmail.com> Closes apache#2266 from dguy/kafka-4540
During
onPartitionsAssigned
first close, and remove, any suspendedStandbyTasks
that are no longer assigned to this consumer.