// Patch summary (commentary only; everything from the first "diff --git" line onward is unchanged).
//
// TaskManager.java, hunk 1 (inside getTaskOffsetSums()):
//   - Before: a single loop over union(lockedTaskDirectories, tasks.allTaskIds()); for each id the task was
//     looked up via tasks.contains(id)/tasks.task(id), and ids without a task, or with a task in state CREATED
//     or CLOSED, fell through to an else-branch that read the checkpoint file.
//   - After: the result of allTasks() is stored in a local map named `tasks`, and a working set is built as the
//     union of lockedTaskDirectories and that map's key set. A first loop visits every task in the map; for a
//     task that is neither CREATED nor CLOSED it either logs at debug level (empty changelogOffsets) or stores
//     sumOfChangelogOffsets(...), and in both cases removes the task id from the working set. A second loop
//     visits the ids left in the working set and, when the checkpoint file exists, stores the sum read from
//     it; an IOException is logged as a warning and not rethrown.
//   - NOTE(review): allTasks() is not shown in this patch; the added tests stub stateUpdater.getTasks(), so it
//     presumably also returns tasks held by the state updater -- confirm against its definition.
//   - NOTE(review): the new local `tasks` appears to shadow the `tasks` reference used by the removed lines,
//     while hunk 2 names the same value `allTasks` -- consider aligning the names.
//
// TaskManager.java, hunk 2 (inside tryToLockAllNonEmptyTaskDirectories()):
//   - allTasks() is called once before the directory loop, and the "Temporarily locked unassigned task" debug
//     message is now guarded by !allTasks.containsKey(id) instead of !tasks.contains(id).
//
// TaskManagerTest.java (three added tests, all created via setUpTaskManager(AT_LEAST_ONCE, tasks, true)):
//   - ...RestoringActiveTaskWithStateUpdater and ...RestoringStandbyTaskWithStateUpdater: the task is returned
//     by stateUpdater.getTasks(), a checkpoint file holding offset 24 is written, and after
//     handleRebalanceStart(...) the expected offset sum is the task's changelogOffsets() value 42.
//   - ...RunningStatefulTaskAndRestoringTaskWithStateUpdater: one task comes from tasks.allTasksPerId() and two
//     from stateUpdater.getTasks(); the expected result contains all three offsets (42, 24, 84).
//
// NOTE(review): this copy of the patch has its line breaks collapsed and seems to have lost generic type
// arguments (e.g. "public Map getTaskOffsetSums()", "final Map tasks"); it probably will not apply as-is --
// regenerate it from the repository before use.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 35b0aab3613e..9152d20721fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1141,25 +1141,30 @@ public Map getTaskOffsetSums() { // Not all tasks will create directories, and there may be directories for tasks we don't currently own, // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should // just have an empty changelogOffsets map. - for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { - final Task task = tasks.contains(id) ? tasks.task(id) : null; - // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint - if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { + final Map tasks = allTasks(); + final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = + union(HashSet::new, lockedTaskDirectories, tasks.keySet()); + for (final Task task : tasks.values()) { + if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map changelogOffsets = task.changelogOffsets(); if (changelogOffsets.isEmpty()) { - log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id); + log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", + task.id()); } else { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets)); + taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets)); } - } else { - final File checkpointFile = stateDirectory.checkpointFileFor(id); - try { - if (checkpointFile.exists()) { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, new 
OffsetCheckpoint(checkpointFile).read())); - } - } catch (final IOException e) { - log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); + lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id()); + } + } + + for (final TaskId id : lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) { + final File checkpointFile = stateDirectory.checkpointFileFor(id); + try { + if (checkpointFile.exists()) { + taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); } + } catch (final IOException e) { + log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); } } @@ -1177,6 +1182,7 @@ private void tryToLockAllNonEmptyTaskDirectories() { // current set of actually-locked tasks. lockedTaskDirectories.clear(); + final Map allTasks = allTasks(); for (final TaskDirectory taskDir : stateDirectory.listNonEmptyTaskDirectories()) { final File dir = taskDir.file(); final String namedTopology = taskDir.namedTopology(); @@ -1184,7 +1190,7 @@ private void tryToLockAllNonEmptyTaskDirectories() { final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology); if (stateDirectory.lock(id)) { lockedTaskDirectories.add(id); - if (!tasks.contains(id)) { + if (!allTasks.containsKey(id)) { log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 036e2ef7924e..22da72feecdc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1592,6 +1592,76 @@ public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception { computeOffsetSumAndVerify(changelogOffsets, 
expectedOffsetSums); } + @Test + public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throws Exception { + final StreamTask restoringStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING).build(); + final long changelogOffset = 42L; + when(restoringStatefulTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset))); + expectLockObtainedFor(taskId00); + makeTaskFolders(taskId00.toString()); + final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); + writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask)); + replay(stateDirectory); + taskManager.handleRebalanceStart(singleton("topic")); + + assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset)))); + } + + @Test + public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws Exception { + final StandbyTask restoringStandbyTask = standbyTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING).build(); + final long changelogOffset = 42L; + when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset))); + expectLockObtainedFor(taskId00); + makeTaskFolders(taskId00.toString()); + final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); + writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask)); + replay(stateDirectory); + taskManager.handleRebalanceStart(singleton("topic")); + + assertThat(taskManager.getTaskOffsetSums(), 
is(mkMap(mkEntry(taskId00, changelogOffset)))); + } + + @Test + public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() { + final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING).build(); + final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RESTORING).build(); + final StandbyTask restoringStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING).build(); + final long changelogOffsetOfRunningTask = 42L; + final long changelogOffsetOfRestoringStatefulTask = 24L; + final long changelogOffsetOfRestoringStandbyTask = 84L; + when(runningStatefulTask.changelogOffsets()) + .thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask))); + when(restoringStatefulTask.changelogOffsets()) + .thenReturn(mkMap(mkEntry(t1p1changelog, changelogOffsetOfRestoringStatefulTask))); + when(restoringStandbyTask.changelogOffsets()) + .thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask))); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); + when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask, restoringStatefulTask)); + + assertThat( + taskManager.getTaskOffsetSums(), + is(mkMap( + mkEntry(taskId00, changelogOffsetOfRunningTask), + mkEntry(taskId01, changelogOffsetOfRestoringStatefulTask), + mkEntry(taskId02, changelogOffsetOfRestoringStandbyTask) + )) + ); + } + @Test public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception { final Map changelogOffsets = mkMap(