Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ private void maybeThrowTaskExceptions(final Map<TaskId, RuntimeException> taskEx
if (exception instanceof TaskMigratedException) {
lastTaskMigrated = (TaskMigratedException) exception;
} else if (exception instanceof TaskCorruptedException) {
log.warn("Encounter corrupted task " + taskId + ", will group it with other corrupted tasks " +
"and handle together", exception);
log.warn("Encounter corrupted task {}, will group it with other corrupted tasks " +
"and handle together", taskId, exception);
aggregatedCorruptedTaskIds.add(taskId);
} else {
((StreamsException) exception).setTaskId(taskId);
Expand Down Expand Up @@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
"have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
}

if (revokedTasksNeedCommit) {
prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);
// if we need to commit any revoking task then we just commit all of those needed committing together
prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
}

// even if commit failed, we should still continue and complete suspending those tasks, so we would capture
// any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
// as such we just need to skip those dirty tasks in the checkpoint
// even if prepare, commit, or postCommit failed, we must still suspend revoked tasks and unlock,
// so we use try-finally to guarantee that. Exceptions are captured and rethrown at the end.
final Set<Task> dirtyTasks = new TreeSet<>(Comparator.comparing(Task::id));
boolean prepareCommitSucceeded = false;
try {
if (revokedTasksNeedCommit) {
// in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
// offset commit because we are in a rebalance
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
}
} catch (final TaskCorruptedException e) {
log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
e.corruptedTasks());

// If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
closeDirtyAndRevive(dirtyTasks, true);
} catch (final TimeoutException e) {
log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");

// If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
closeDirtyAndRevive(dirtyTasks, false);
} catch (final RuntimeException e) {
log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
firstException.compareAndSet(null, e);
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
}

// we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
// going to be closed we would checkpoint by then
for (final Task task : revokedActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
task.postCommit(true);
prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);
// if we need to commit any revoking task then we just commit all of those needed committing together
prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
prepareCommitSucceeded = true;
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
log.error("Exception caught while preparing to commit revoked tasks {}", revokedActiveTasks, e);
maybeSetFirstException(false, e, firstException);
dirtyTasks.addAll(revokedActiveTasks);
dirtyTasks.addAll(commitNeededActiveTasks);
}
}
}

if (revokedTasksNeedCommit) {
for (final Task task : commitNeededActiveTasks) {
try {
if (revokedTasksNeedCommit && prepareCommitSucceeded) {
// in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
// offset commit because we are in a rebalance
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
}
} catch (final TaskCorruptedException e) {
log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
e.corruptedTasks());

// If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
closeDirtyAndRevive(dirtyTasks, true);
} catch (final TimeoutException e) {
log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");

// If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
closeDirtyAndRevive(dirtyTasks, false);
} catch (final RuntimeException e) {
log.error("Exception caught while committing those revoked tasks {}", revokedActiveTasks, e);
maybeSetFirstException(false, e, firstException);
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
}

// we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
// going to be closed we would checkpoint by then
for (final Task task : revokedActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
// for non-revoking active tasks, we should not enforce checkpoint
// since if it is EOS enabled, no checkpoint should be written while
// the task is in RUNNING tate
task.postCommit(false);
task.postCommit(true);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
log.error("Exception caught while post-committing task {}", task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
}
}

for (final Task task : revokedActiveTasks) {
try {
task.suspend();
} catch (final RuntimeException e) {
log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
if (revokedTasksNeedCommit) {
for (final Task task : commitNeededActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
// for non-revoking active tasks, we should not enforce checkpoint
// since if it is EOS enabled, no checkpoint should be written while
// the task is in RUNNING tate
task.postCommit(false);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task {}", task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
}
}
} finally {
for (final Task task : revokedActiveTasks) {
try {
task.suspend();
} catch (final RuntimeException e) {
log.error("Caught the following exception while trying to suspend revoked task {}", task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
}

maybeUnlockTasks(lockedTaskIds);
maybeUnlockTasks(lockedTaskIds);
}

if (firstException.get() != null) {
throw firstException.get();
Expand Down Expand Up @@ -1384,14 +1394,14 @@ void shutdown(final boolean clean) {
executeAndMaybeSwallow(
clean,
() -> closeAndCleanUpTasks(activeTasks, standbyTasks, clean),
e -> firstException.compareAndSet(null, e),
e -> maybeSetFirstException(false, e, firstException),
e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
);

executeAndMaybeSwallow(
clean,
activeTaskCreator::close,
e -> firstException.compareAndSet(null, e),
e -> maybeSetFirstException(false, e, firstException),
e -> log.warn("Ignoring an exception while closing thread producer.", e)
);

Expand All @@ -1402,7 +1412,7 @@ void shutdown(final boolean clean) {
executeAndMaybeSwallow(
clean,
this::releaseLockedUnassignedTaskDirectories,
e -> firstException.compareAndSet(null, e),
e -> maybeSetFirstException(false, e, firstException),
e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
);

Expand Down Expand Up @@ -1511,10 +1521,10 @@ private Collection<StreamTask> tryCloseCleanActiveTasks(final Collection<StreamT
tasksToCloseDirty.add(task);
} catch (final StreamsException e) {
e.setTaskId(task.id());
firstException.compareAndSet(null, e);
maybeSetFirstException(false, e, firstException);
tasksToCloseDirty.add(task);
} catch (final RuntimeException e) {
firstException.compareAndSet(null, new StreamsException(e, task.id()));
maybeSetFirstException(false, new StreamsException(e, task.id()), firstException);
tasksToCloseDirty.add(task);
}
}
Expand All @@ -1527,7 +1537,7 @@ private Collection<StreamTask> tryCloseCleanActiveTasks(final Collection<StreamT
try {
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
} catch (final RuntimeException e) {
log.error("Exception caught while committing tasks " + consumedOffsetsAndMetadataPerTask.keySet(), e);
log.error("Exception caught while committing tasks {}", consumedOffsetsAndMetadataPerTask.keySet(), e);
// TODO: should record the task ids when handling this exception
maybeSetFirstException(false, e, firstException);

Expand All @@ -1551,7 +1561,7 @@ private Collection<StreamTask> tryCloseCleanActiveTasks(final Collection<StreamT
try {
task.postCommit(true);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
log.error("Exception caught while post-committing task {}", task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
tasksToCloseDirty.add(task);
tasksToCloseClean.remove(task);
Expand Down Expand Up @@ -2048,7 +2058,9 @@ private void maybeSetFirstException(final boolean ignoreTaskMigrated,
final RuntimeException exception,
final AtomicReference<RuntimeException> firstException) {
if (!ignoreTaskMigrated || !(exception instanceof TaskMigratedException)) {
firstException.compareAndSet(null, exception);
if (!firstException.compareAndSet(null, exception)) {
firstException.get().addSuppressed(exception);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2989,6 +2989,54 @@ public void shouldSuspendActiveTasksDuringRevocation() {
verify(task00).suspend();
}

@Test
public void shouldSuspendRevokedTasksWhenPrepareCommitThrows() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RUNNING)
.build();

final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));

when(task00.commitNeeded()).thenReturn(true);
when(task00.prepareCommit(true)).thenThrow(new TaskMigratedException("task migrated"));

final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);

final StreamsException thrown = assertThrows(StreamsException.class,
() -> taskManager.handleRevocation(taskId00Partitions));

assertInstanceOf(TaskMigratedException.class, thrown);

verify(task00).suspend();
verify(task00, never()).postCommit(anyBoolean());
}

@Test
public void shouldAttachSuppressedExceptionsWhenMultiplePhasesFailDuringRevocation() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RUNNING)
.build();

final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));

when(task00.commitNeeded()).thenReturn(true);
when(task00.prepareCommit(true)).thenThrow(new TaskMigratedException("task migrated"));
doThrow(new RuntimeException("suspend failed")).when(task00).suspend();

final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);

final StreamsException thrown = assertThrows(StreamsException.class,
() -> taskManager.handleRevocation(taskId00Partitions));

assertInstanceOf(TaskMigratedException.class, thrown);
assertEquals(1, thrown.getSuppressed().length);
assertInstanceOf(StreamsException.class, thrown.getSuppressed()[0]);
}

@Test
public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
// task being revoked, needs commit
Expand Down
Loading