KAFKA-10199: Cleanup TaskManager and Task interfaces #12338

Closed

@guozhangwang guozhangwang commented Jun 24, 2022

This PR is to be reviewed after #12337.

To integrate with the state updater, we need to refactor the TaskManager and Task interfaces. This PR does the following:

  1. Separate active and standby tasks in the Tasks placeholder, and add pendingActiveTasks and pendingStandbyTasks to Tasks. The active/standby tasks exposed by Tasks would only be mutated by a single thread, while the pending tasks hold tasks that are assigned but cannot be actively managed yet. For now that covers two scenarios: a) tasks from unknown sub-topologies, which hence cannot be initialized; b) tasks that are pending recycling from active to standby or vice versa. Note that case b) will be added in a follow-up PR.

  2. Extract any logic that mutates a task out of Tasks / TaskCreators. Tasks should only be a place for maintaining the set of tasks, not for manipulating a task; and TaskCreators should only be used for creating tasks, not for anything else. All of this logic is migrated into TaskManager.

  3. While doing 2) I noticed a couple of minor issues in the code where we duplicate the closing logic, so I also cleaned them up in the following way:
    a) When closing a task, we first remove it from Tasks, and then trigger the corresponding closeClean/Dirty function; for an active task, we also remove its task producer if EOS-V1 is used.
    b) When closing dirty, we swallow the exception from the close call and from the remove task producer call; when closing clean, we store the exception thrown by either the close call or the remove task producer call, and rethrow it at the end of the caller. The difference is that after an exception from the close call we need to retry closing the task dirty, whereas after an exception from the remove task producer call we do not need to re-close it dirty.
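
For illustration only, the close ordering and exception handling described in 3a) and 3b) could be sketched roughly as follows. This is not the code in this PR: `SketchTask`, `RecordingTask`, `TaskCloser`, and the `producerRemover` callback are hypothetical stand-ins, and task ids are plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a task; NOT the real Kafka Streams Task interface.
interface SketchTask {
    String id();
    boolean isActive();
    void closeClean();
    void closeDirty();
}

// Test double that records which close calls were made on it.
final class RecordingTask implements SketchTask {
    final List<String> calls = new ArrayList<>();
    private final String id;
    private final boolean active;
    private final boolean failCleanClose;

    RecordingTask(final String id, final boolean active, final boolean failCleanClose) {
        this.id = id;
        this.active = active;
        this.failCleanClose = failCleanClose;
    }

    public String id() { return id; }
    public boolean isActive() { return active; }
    public void closeClean() {
        calls.add("closeClean");
        if (failCleanClose) {
            throw new IllegalStateException("clean close failed for " + id);
        }
    }
    public void closeDirty() { calls.add("closeDirty"); }
}

// Hypothetical holder of the task registry plus the two close paths.
final class TaskCloser {
    private final Map<String, SketchTask> registry = new HashMap<>();
    private final boolean eosV1;
    private final Consumer<String> producerRemover; // assumed callback, may throw

    TaskCloser(final boolean eosV1, final Consumer<String> producerRemover) {
        this.eosV1 = eosV1;
        this.producerRemover = producerRemover;
    }

    void add(final SketchTask task) { registry.put(task.id(), task); }
    boolean contains(final String id) { return registry.containsKey(id); }

    // Dirty close: remove from the registry first, then swallow every failure.
    void closeDirty(final SketchTask task) {
        registry.remove(task.id());
        try {
            task.closeDirty();
        } catch (final RuntimeException swallowed) { /* intentionally ignored */ }
        if (task.isActive() && eosV1) {
            try {
                producerRemover.accept(task.id());
            } catch (final RuntimeException swallowed) { /* intentionally ignored */ }
        }
    }

    // Clean close: returns the first exception (or null) so the caller can
    // rethrow it after all tasks have been handled.
    RuntimeException closeClean(final SketchTask task) {
        registry.remove(task.id());
        RuntimeException first = null;
        try {
            task.closeClean();
        } catch (final RuntimeException e) {
            first = e;
            // A failed clean close is retried as a dirty close.
            try {
                task.closeDirty();
            } catch (final RuntimeException swallowed) { /* intentionally ignored */ }
        }
        if (task.isActive() && eosV1) {
            try {
                producerRemover.accept(task.id());
            } catch (final RuntimeException e) {
                // A producer-removal failure is stored, but no dirty re-close is needed.
                if (first == null) {
                    first = e;
                }
            }
        }
        return first;
    }
}
```

In this sketch a caller would collect the non-null results of `closeClean` across all tasks and rethrow the first one once every task has been processed.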

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@apache apache deleted a comment from VforJuvenile Jun 24, 2022
@guozhangwang guozhangwang changed the title KAFKA-10199 [DO NOT REVIEW]: Cleanup TaskManager and Task interfaces KAFKA-10199: Cleanup TaskManager and Task interfaces Jul 7, 2022
@@ -83,7 +83,6 @@ class DefaultStateUpdaterTest {
private final Time time = new MockTime(1L);
private final StreamsConfig config = new StreamsConfig(configProps());
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
- private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };

This is a leftover from a previous commit; we do not need it anymore.

//
// When that occurs we stash these pending tasks until either they are finally clear to be created,
// or they are revoked from a new assignment.
private final Map<TaskId, Set<TopicPartition>> pendingActiveTasks = new HashMap<>();

This is mainly for case 1) in the description.
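
As a rough sketch of the stash behavior that the code comment above describes (pending tasks are held until they are clear to be created, or dropped when a new assignment revokes them), something like the following would work. `PendingTaskStash` and its methods are hypothetical names, and plain strings stand in for TaskId and TopicPartition; this is not the Tasks class from this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: task ids and partitions are plain strings here.
final class PendingTaskStash {
    private final Map<String, Set<String>> pendingActiveTasks = new HashMap<>();

    // Park a task that was assigned but cannot be created yet.
    void stash(final String taskId, final Set<String> partitions) {
        pendingActiveTasks.put(taskId, partitions);
    }

    // Remove and return every pending task that is now clear to be created.
    Map<String, Set<String>> drainCreatable(final Predicate<String> canCreate) {
        final Map<String, Set<String>> ready = new HashMap<>();
        final Iterator<Map.Entry<String, Set<String>>> it = pendingActiveTasks.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, Set<String>> entry = it.next();
            if (canCreate.test(entry.getKey())) {
                ready.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
        return ready;
    }

    // Drop pending tasks that the new assignment no longer contains.
    void retainAssigned(final Set<String> assignedTaskIds) {
        pendingActiveTasks.keySet().retainAll(assignedTaskIds);
    }

    // Read-only view of the ids that are still pending.
    Set<String> pendingIds() {
        return Collections.unmodifiableSet(pendingActiveTasks.keySet());
    }
}
```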

// Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
// these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
// we receive a new assignment and they are revoked from the thread.
private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();

This is for case 2) in the description.

@@ -182,7 +159,7 @@ public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
partitions
);

- final InternalProcessorContext context = new ProcessorContextImpl(
+ final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(

Minor cleanup.

class Tasks {
private final Logger log;
private final TopologyMetadata topologyMetadata;

private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());

This is a minor cleanup: only return read-only tasks to callers.
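
One possible reading of this cleanup, assuming "read only" means unmodifiable collection views rather than a dedicated wrapper type, is sketched below. `ReadOnlyTasksSketch` and its accessors are hypothetical, and strings stand in for TaskId and Task; only the `java.util.Collections` calls are real APIs.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: strings stand in for TaskId and Task.
final class ReadOnlyTasksSketch {
    private final Map<String, String> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());

    // Mutation stays inside the owning class.
    void addTask(final String taskId, final String task) {
        allTasksPerId.put(taskId, task);
    }

    // Callers get a view that reflects later changes but rejects mutation.
    Collection<String> allTasks() {
        return Collections.unmodifiableCollection(allTasksPerId.values());
    }

    Map<String, String> tasksPerId() {
        return Collections.unmodifiableMap(allTasksPerId);
    }
}
```

With such views, an accidental `clear()` or `put(...)` by a caller fails with `UnsupportedOperationException` instead of silently changing the task set.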

tasksToRecycle,
tasksToCloseClean,
tasksToCloseDirty,
activeTasksToCreate,

This is a cleanup: we construct the pending tasks to recycle outside handleCloseAndRecycle, so that we no longer need to pass in activeTasksToCreate / standbyTasksToCreate.

@@ -859,16 +892,6 @@ void closeAndCleanUpTasks(final Collection<Task> activeTasks, final Collection<T
closeTaskDirty(task);
}

// TODO: change type to `StreamTask`

This is for case 3) in the description: this is already handled in closeTaskClean.

@@ -337,7 +369,7 @@ void addToSuccessfullyProcessed(final Task task) {
successfullyProcessed.add(task);
}

- void removeTaskFromCuccessfullyProcessedBeforeClosing(final Task task) {
+ void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {

Fixed a typo in function name.

@guozhangwang guozhangwang requested a review from cadonna July 7, 2022 03:45
@guozhangwang

Closing this PR in favor of #12397
