From b9af6b3f7eb4015d43f4f7ff86ffda98fe82a315 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 28 Jun 2023 16:08:20 +0200 Subject: [PATCH 1/4] KAFKA-10199: Consider tasks in state updater when computing offset sums With the state updater, the task manager needs also to look into the tasks owned by the state updater when computing the sum of offsets of the state. This sum of offsets is used by the high availability assignor to assign warm-up replicas. If the task manager does not take into account tasks in the state updater, a warm-up replica will never report back that the state for the corresponding task has caught up. Consequently, the warm-up replica will never be dismissed and probing rebalances will never end.. --- .../processor/internals/TaskManager.java | 39 ++++++++----- .../processor/internals/TaskManagerTest.java | 58 +++++++++++++++++++ 2 files changed, 81 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 35b0aab3613e..edf5e0fd5be2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1138,28 +1138,35 @@ public void signalResume() { public Map getTaskOffsetSums() { final Map taskOffsetSums = new HashMap<>(); - // Not all tasks will create directories, and there may be directories for tasks we don't currently own, - // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should - // just have an empty changelogOffsets map. - for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { - final Task task = tasks.contains(id) ? tasks.task(id) : null; - // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint - if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { + final Map tasks = allTasks(); + final Set createdAndClosedTasks = new HashSet<>(); + for (final Task task : tasks.values()) { + if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map changelogOffsets = task.changelogOffsets(); if (changelogOffsets.isEmpty()) { - log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id); + log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", + task.id()); } else { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets)); + taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets)); } } else { - final File checkpointFile = stateDirectory.checkpointFileFor(id); - try { - if (checkpointFile.exists()) { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); - } - } catch (final IOException e) { - log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); + createdAndClosedTasks.add(task.id()); + } + } + + // Not all tasks will create directories, and there may be directories for tasks we don't currently own, + // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should + // just have an empty changelogOffsets map. + final Set lockedTaskDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories); + lockedTaskDirectoriesOfNonOwnedTasks.removeAll(tasks.keySet()); + for (final TaskId id : union(HashSet::new, lockedTaskDirectoriesOfNonOwnedTasks, createdAndClosedTasks)) { + final File checkpointFile = stateDirectory.checkpointFileFor(id); + try { + if (checkpointFile.exists()) { + taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); } + } catch (final IOException e) { + log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 036e2ef7924e..95ebd7db467b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1592,6 +1592,64 @@ public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception { computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums); } + @Test + public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() { + final StreamTask restoringStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING).build(); + final long changelogOffset = 42L; + when(restoringStatefulTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset))); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask)); + + assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset)))); + } + + @Test + public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() { + final StandbyTask restoringStandbyTask = standbyTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING).build(); + final long changelogOffset = 42L; + when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset))); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask)); + + assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset)))); + } + + @Test + public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() { + final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING).build(); + final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RESTORING).build(); + final StandbyTask restoringStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING).build(); + final long changelogOffsetOfRunningTask = 42L; + final long changelogOffsetOfRestoringStatefulTask = 24L; + final long changelogOffsetOfRestoringStandbyTask = 84L; + when(runningStatefulTask.changelogOffsets()) + .thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask))); + when(restoringStatefulTask.changelogOffsets()) + .thenReturn(mkMap(mkEntry(t1p1changelog, changelogOffsetOfRestoringStatefulTask))); + when(restoringStandbyTask.changelogOffsets()) + .thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask))); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); + when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask, restoringStatefulTask)); + + assertThat( + taskManager.getTaskOffsetSums(), + is(mkMap( + mkEntry(taskId00, changelogOffsetOfRunningTask), + mkEntry(taskId01, changelogOffsetOfRestoringStatefulTask), + mkEntry(taskId02, changelogOffsetOfRestoringStandbyTask) + )) + ); + } + @Test public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception { final Map changelogOffsets = mkMap( From c4bbd2cf4b2a838d68763b6db21f8ca62a1fe9bb Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 28 Jun 2023 17:25:57 +0200 Subject: [PATCH 2/4] Save on collections --- .../kafka/streams/processor/internals/TaskManager.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index edf5e0fd5be2..79d6acfd3868 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1139,7 +1139,8 @@ public Map getTaskOffsetSums() { final Map taskOffsetSums = new HashMap<>(); final Map tasks = allTasks(); - final Set createdAndClosedTasks = new HashSet<>(); + final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = + union(HashSet::new, lockedTaskDirectories, tasks.keySet()); for (final Task task : tasks.values()) { if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map changelogOffsets = task.changelogOffsets(); @@ -1149,17 +1150,14 @@ public Map getTaskOffsetSums() { } else { taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets)); } - } else { - createdAndClosedTasks.add(task.id()); + lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id()); } } // Not all tasks will create directories, and there may be directories for tasks we don't currently own, // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should // just have an empty changelogOffsets map. - final Set lockedTaskDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories); - lockedTaskDirectoriesOfNonOwnedTasks.removeAll(tasks.keySet()); - for (final TaskId id : union(HashSet::new, lockedTaskDirectoriesOfNonOwnedTasks, createdAndClosedTasks)) { + for (final TaskId id : lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) { final File checkpointFile = stateDirectory.checkpointFileFor(id); try { if (checkpointFile.exists()) { From cd76885154f5d5974f2a5e4e62cfe75cbcc83f3f Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 29 Jun 2023 10:54:48 +0200 Subject: [PATCH 3/4] Improve code and tests --- .../streams/processor/internals/TaskManager.java | 12 ++++++------ .../processor/internals/TaskManagerTest.java | 16 ++++++++++++++-- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 79d6acfd3868..b4d82fd910fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1138,9 +1138,11 @@ public void signalResume() { public Map getTaskOffsetSums() { final Map taskOffsetSums = new HashMap<>(); + // Not all tasks will create directories, and there may be directories for tasks we don't currently own, + // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should + // just have an empty changelogOffsets map. final Map tasks = allTasks(); - final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = - union(HashSet::new, lockedTaskDirectories, tasks.keySet()); + final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = new HashSet<>(lockedTaskDirectories); for (final Task task : tasks.values()) { if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map changelogOffsets = task.changelogOffsets(); @@ -1154,9 +1156,6 @@ public Map getTaskOffsetSums() { } } - // Not all tasks will create directories, and there may be directories for tasks we don't currently own, - // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should - // just have an empty changelogOffsets map. for (final TaskId id : lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) { final File checkpointFile = stateDirectory.checkpointFileFor(id); try { @@ -1182,6 +1181,7 @@ private void tryToLockAllNonEmptyTaskDirectories() { // current set of actually-locked tasks. lockedTaskDirectories.clear(); + final Map allTasks = allTasks(); for (final TaskDirectory taskDir : stateDirectory.listNonEmptyTaskDirectories()) { final File dir = taskDir.file(); final String namedTopology = taskDir.namedTopology(); @@ -1189,7 +1189,7 @@ private void tryToLockAllNonEmptyTaskDirectories() { final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology); if (stateDirectory.lock(id)) { lockedTaskDirectories.add(id); - if (!tasks.contains(id)) { + if (!allTasks.containsKey(id)) { log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 95ebd7db467b..22da72feecdc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1593,27 +1593,39 @@ public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception { } @Test - public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() { + public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throws Exception { final StreamTask restoringStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) .inState(State.RESTORING).build(); final long changelogOffset = 42L; when(restoringStatefulTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset))); + expectLockObtainedFor(taskId00); + makeTaskFolders(taskId00.toString()); + final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); + writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask)); + replay(stateDirectory); + taskManager.handleRebalanceStart(singleton("topic")); assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset)))); } @Test - public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() { + public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws Exception { final StandbyTask restoringStandbyTask = standbyTask(taskId00, taskId00ChangelogPartitions) .inState(State.RUNNING).build(); final long changelogOffset = 42L; when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset))); + expectLockObtainedFor(taskId00); + makeTaskFolders(taskId00.toString()); + final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); + writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask)); + replay(stateDirectory); + taskManager.handleRebalanceStart(singleton("topic")); assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset)))); } From bb70eab15884c6de609862ac3baf5a7e987acbfb Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 29 Jun 2023 11:18:45 +0200 Subject: [PATCH 4/4] Be defensive --- .../apache/kafka/streams/processor/internals/TaskManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index b4d82fd910fd..9152d20721fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1142,7 +1142,8 @@ public Map getTaskOffsetSums() { // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should // just have an empty changelogOffsets map. final Map tasks = allTasks(); - final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = new HashSet<>(lockedTaskDirectories); + final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = + union(HashSet::new, lockedTaskDirectories, tasks.keySet()); for (final Task task : tasks.values()) { if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map changelogOffsets = task.changelogOffsets();