-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10199: Add methods to add and remove tasks to task manager #12384
Conversation
To integrate the state updater into the current code, we need the ability to add and remove tasks from the task manager. This functionality is needed to ensure that a task is managed either by the task manager or by the state updater but not by both.
@@ -82,7 +82,7 @@ int process(final int maxNumRecords, final Time time) { | |||
} | |||
} catch (final Throwable t) { | |||
taskExecutionMetadata.registerTaskError(task, t, now); | |||
tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed); | |||
tasks.removeTaskFromSuccessfullyProcessedBeforeClosing(lastProcessed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a typo
@@ -798,12 +798,12 @@ private void closeTaskDirty(final Task task) { | |||
} catch (final RuntimeException swallow) { | |||
log.error("Error suspending dirty task {} ", task.id(), swallow); | |||
} | |||
tasks.removeTaskBeforeClosing(task.id()); | |||
tasks.removeTask(task.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming to a more appropriate name.
Set<TaskId> activeTaskIds() { | ||
return readOnlyActiveTaskIds; | ||
} | ||
|
||
Set<TaskId> standbyTaskIds() { | ||
return readOnlyStandbyTaskIds; | ||
} | ||
|
||
// TODO: change return type to `StreamTask` | ||
Map<TaskId, Task> activeTaskMap() { | ||
return readOnlyActiveTasksPerId; | ||
} | ||
|
||
// TODO: change return type to `StandbyTask` | ||
Map<TaskId, Task> standbyTaskMap() { | ||
return readOnlyStandbyTasksPerId; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those methods are not used anywhere.
// TODO: change return type to `StandbyTask` | ||
Task standbyTask(final TaskId taskId) { | ||
if (!standbyTasksPerId.containsKey(taskId)) { | ||
throw new IllegalStateException("Standby task unknown: " + taskId); | ||
} | ||
return standbyTasksPerId.get(taskId); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used anywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @cadonna thanks for the PR, I found it conflicts with my PR #12338 a bit which also tries to refactor the TaskManager. There seems to be a difference between these two approaches: in my PR the Tasks
set is basically a bookkeeper of all tasks assigned to it, including both that should only be owned and manipulated by the task-manager, and the state-updater; in your PR the Tasks
is solely owned by the TaskManager
and hence should not include any tasks that are manipulated by the state updater.
I think in order to support recycling, we need to keep those "pending" tasks still in some places, so that when they are returned by the state updater we still know which tasks should be recycled. Could you please take a look at my PR and we can discuss which approach we can take?
To integrate the state updater into the current code, we need the
ability to add and remove tasks from the task manager. This
functionality is needed to ensure that a task is managed either
by the task manager or by the state updater but not by both.
Committer Checklist (excluded from commit message)