Skip to content

Commit

Permalink
KAFKA-10199: Consider tasks in state updater when computing offset su…
Browse files Browse the repository at this point in the history
…ms (#13925)

With the state updater, the task manager needs also to look into the
tasks owned by the state updater when computing the sum of offsets
of the state. This sum of offsets is used by the high availability
assignor to assign warm-up replicas.
If the task manager does not take into account tasks in the
state updater, a warm-up replica will never report back that
the state for the corresponding task has caught up. Consequently,
the warm-up replica will never be dismissed and probing rebalances
will never end.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
  • Loading branch information
cadonna committed Jul 3, 2023
1 parent 48eb8c9 commit 5c2492b
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 15 deletions.
Expand Up @@ -1141,25 +1141,30 @@ public Map<TaskId, Long> getTaskOffsetSums() {
// Not all tasks will create directories, and there may be directories for tasks we don't currently own,
// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
// just have an empty changelogOffsets map.
for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) {
final Task task = tasks.contains(id) ? tasks.task(id) : null;
// Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
final Map<TaskId, Task> tasks = allTasks();
final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =
union(HashSet::new, lockedTaskDirectories, tasks.keySet());
for (final Task task : tasks.values()) {
if (task.state() != State.CREATED && task.state() != State.CLOSED) {
final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
if (changelogOffsets.isEmpty()) {
log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}",
task.id());
} else {
taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets));
taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets));
}
} else {
final File checkpointFile = stateDirectory.checkpointFileFor(id);
try {
if (checkpointFile.exists()) {
taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
}
} catch (final IOException e) {
log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id());
}
}

for (final TaskId id : lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) {
final File checkpointFile = stateDirectory.checkpointFileFor(id);
try {
if (checkpointFile.exists()) {
taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
}
} catch (final IOException e) {
log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
}
}

Expand All @@ -1177,14 +1182,15 @@ private void tryToLockAllNonEmptyTaskDirectories() {
// current set of actually-locked tasks.
lockedTaskDirectories.clear();

final Map<TaskId, Task> allTasks = allTasks();
for (final TaskDirectory taskDir : stateDirectory.listNonEmptyTaskDirectories()) {
final File dir = taskDir.file();
final String namedTopology = taskDir.namedTopology();
try {
final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology);
if (stateDirectory.lock(id)) {
lockedTaskDirectories.add(id);
if (!tasks.contains(id)) {
if (!allTasks.containsKey(id)) {
log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
}
}
Expand Down
Expand Up @@ -1592,6 +1592,76 @@ public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception {
computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
}

@Test
public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throws Exception {
final StreamTask restoringStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING).build();
final long changelogOffset = 42L;
when(restoringStatefulTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset)));
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask));
replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));

assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset))));
}

@Test
public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws Exception {
final StandbyTask restoringStandbyTask = standbyTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING).build();
final long changelogOffset = 42L;
when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffset)));
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask));
replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));

assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset))));
}

@Test
public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING).build();
final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING).build();
final StandbyTask restoringStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING).build();
final long changelogOffsetOfRunningTask = 42L;
final long changelogOffsetOfRestoringStatefulTask = 24L;
final long changelogOffsetOfRestoringStandbyTask = 84L;
when(runningStatefulTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask)));
when(restoringStatefulTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p1changelog, changelogOffsetOfRestoringStatefulTask)));
when(restoringStandbyTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask)));
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask, restoringStatefulTask));

assertThat(
taskManager.getTaskOffsetSums(),
is(mkMap(
mkEntry(taskId00, changelogOffsetOfRunningTask),
mkEntry(taskId01, changelogOffsetOfRestoringStatefulTask),
mkEntry(taskId02, changelogOffsetOfRestoringStandbyTask)
))
);
}

@Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
final Map<TopicPartition, Long> changelogOffsets = mkMap(
Expand Down

0 comments on commit 5c2492b

Please sign in to comment.