diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 0e22967583cce..771a40063dd82 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -132,6 +132,7 @@ public final Task.State state() {
     @Override
     public void revive() {
         if (state == CLOSED) {
+            clearTaskTimeout();
             transitionTo(CREATED);
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ae0b94473cc58..e2c5e03cb15ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -494,8 +494,12 @@ public void postCommit(final boolean enforceCheckpoint) {
                 throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
         }
 
-        commitRequested = false;
+        clearCommitStatuses();
+    }
+
+    private void clearCommitStatuses() {
         commitNeeded = false;
+        commitRequested = false;
         hasPendingTxCommit = false;
     }
 
@@ -511,6 +515,7 @@ private Map<TopicPartition, Long> extractPartitionTimes() {
     public void closeClean() {
         validateClean();
         removeAllSensors();
+        clearCommitStatuses();
         close(true);
         log.info("Closed clean");
     }
@@ -518,6 +523,7 @@ public void closeClean() {
     @Override
     public void closeDirty() {
         removeAllSensors();
+        clearCommitStatuses();
         close(false);
         log.info("Closed dirty");
     }
@@ -532,6 +538,7 @@ public void updateInputPartitions(final Set<TopicPartition> topicPartitions, fin
     public void closeCleanAndRecycleState() {
         validateClean();
         removeAllSensors();
+        clearCommitStatuses();
         switch (state()) {
             case SUSPENDED:
                 stateMgr.recycle();
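Note: the intent of these two files is that every close path, not just postCommit(), resets the commit bookkeeping, and that revive() additionally drops any task.timeout.ms deadline carried over from before the close. A minimal sketch of the expected behavior, mirroring the StreamTaskTest case added further down (the task setup itself is illustrative, not part of the patch):

    // assuming `task` is a StreamTask that has processed records and had a commit requested
    task.suspend();
    assertTrue(task.commitNeeded() && task.commitRequested());

    task.closeDirty();   // closeClean/closeDirty/closeCleanAndRecycleState now call clearCommitStatuses()
    assertFalse(task.commitNeeded() || task.commitRequested());

    task.revive();       // CLOSED -> CREATED, with any previously started task timeout cleared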
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 73f020774dbe3..9b925a8832362 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -155,40 +155,55 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()
+                .values()
+                .stream()
+                .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                .filter(t -> !corruptedTasks.contains(t.id()))
+                .collect(Collectors.toSet()),
+                new HashMap<>()
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
+                         "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        } catch (final TimeoutException e) {
+            log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
+            final Collection<Task> uncorruptedTasks = new HashSet<>(tasks.activeTasks());
+            uncorruptedTasks.removeAll(corruptedActiveTasks);
+            // Those tasks which just timed out can just be closed dirty without marking changelogs as corrupted
+            closeDirtyAndRevive(uncorruptedTasks, false);
+        }
 
-        closeAndRevive(corruptedActiveTasks);
+        closeDirtyAndRevive(corruptedActiveTasks, true);
     }
 
-    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
-        for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
-            final Task task = entry.getKey();
+    private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
+        for (final Task task : taskWithChangelogs) {
+            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
 
             // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            final Collection<TopicPartition> corruptedPartitions = entry.getValue();
-            task.markChangelogAsCorrupted(corruptedPartitions);
+            if (markAsCorrupted) {
+                task.markChangelogAsCorrupted(corruptedPartitions);
+            }
 
             try {
                 // we do not need to take the returned offsets since we are not going to commit anyways;
@@ -201,8 +216,11 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
 
             try {
                 task.suspend();
+
                 // we need to enforce a checkpoint that removes the corrupted partitions
-                task.postCommit(true);
+                if (markAsCorrupted) {
+                    task.postCommit(true);
+                }
             } catch (final RuntimeException swallow) {
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
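Note the two cleanup modes the new boolean selects in handleCorruption() above; as a quick reference (calls as they appear in this patch):

    // corrupted tasks: wipe state by marking their changelogs corrupted and enforcing a checkpoint
    closeDirtyAndRevive(corruptedActiveTasks, true);

    // tasks that merely hit a TimeoutException while committing: close dirty and revive, keeping their state
    closeDirtyAndRevive(uncorruptedTasks, false);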
@@ -332,8 +350,8 @@ private void handleCloseAndRecycle(final Set<Task> tasksToRecycle,
                 // write the checkpoint file.
                 final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
                 if (!offsets.isEmpty()) {
-                    log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
-                                  "offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
+                    log.error("Task {} should have been committed when it was suspended, but it reports non-empty " +
+                                  "offsets {} to commit; this means it failed during last commit and hence should be closed dirty",
                               task.id(), offsets);
 
                     tasksToCloseDirty.add(task);
@@ -509,20 +527,35 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<Task> dirtyTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+            closeDirtyAndRevive(dirtyTasks, false);
         } catch (final RuntimeException e) {
            log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
         }
 
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
-            for (final Task task : revokedActiveTasks) {
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then
+        for (final Task task : revokedActiveTasks) {
+            if (!dirtyTasks.contains(task)) {
                 try {
                     task.postCommit(true);
                 } catch (final RuntimeException e) {
@@ -530,9 +563,11 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
                     firstException.compareAndSet(null, e);
                 }
             }
+        }
 
-            if (shouldCommitAdditionalTasks) {
-                for (final Task task : commitNeededActiveTasks) {
+        if (shouldCommitAdditionalTasks) {
+            for (final Task task : commitNeededActiveTasks) {
+                if (!dirtyTasks.contains(task)) {
                     try {
                         // for non-revoking active tasks, we should not enforce checkpoint
                         // since if it is EOS enabled, no checkpoint should be written while
@@ -972,42 +1007,53 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
      * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
      */
     int commit(final Collection<Task> tasksToCommit) {
+        int committed = 0;
         if (rebalanceInProgress) {
-            return -1;
+            committed = -1;
         } else {
-            int committed = 0;
             final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
-                if (task.commitNeeded()) {
-                    final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
-                    if (task.isActive()) {
-                        consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
-                    }
-                }
-            }
-
             try {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : tasksToCommit) {
-                    if (task.commitNeeded()) {
-                        task.clearTaskTimeout();
-                        ++committed;
-                        task.postCommit(false);
-                    }
-                }
+                committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
             } catch (final TimeoutException timeoutException) {
                 consumedOffsetsAndMetadataPerTask
                     .keySet()
                     .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
            }
+        }
+        return committed;
+    }
 
+    /**
+     * @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
+     */
+    private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
+                                                                    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
+        int committed = 0;
-            return committed;
+        for (final Task task : tasksToCommit) {
+            if (task.commitNeeded()) {
+                final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
+                if (task.isActive()) {
+                    consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
+                }
+            }
         }
+
+        commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+
+        for (final Task task : tasksToCommit) {
+            if (task.commitNeeded()) {
+                task.clearTaskTimeout();
+                ++committed;
+                task.postCommit(false);
+            }
+        }
+        return committed;
     }
 
     /**
@@ -1027,6 +1073,11 @@ int maybeCommitActiveTasksPerUserRequested() {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if committing offsets failed due to CommitFailedException (non-EOS)
+     * @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
+     */
     private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
         log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects
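Taken together, the TaskManager changes give the commit path a three-way failure contract, summarized from the new javadoc above. A caller-side sketch (the surrounding try/catch is illustrative only, reusing the same fields and helpers that appear in TaskManager):

    try {
        commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
    } catch (final TimeoutException e) {
        // non-EOS: retriable, so start or re-check the task.timeout.ms deadline per task
        consumedOffsetsAndMetadataPerTask.keySet()
            .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));
    } catch (final TaskCorruptedException e) {
        // EOS: a commit timeout is surfaced as corruption, so wipe and revive the affected tasks
        closeDirtyAndRevive(tasks.tasks(e.corruptedTasks()), true);
    }
    // TaskMigratedException (e.g. CommitFailedException under non-EOS) is still rethrown to the rebalance path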
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index e12290f395521..4193deb6f7d70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -23,6 +23,8 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+import java.util.HashSet;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -234,6 +236,14 @@ Task task(final TaskId taskId) {
         return allTasksPerId.get(taskId);
     }
 
+    Collection<Task> tasks(final Collection<TaskId> taskIds) {
+        final Set<Task> tasks = new HashSet<>();
+        for (final TaskId taskId : taskIds) {
+            tasks.add(task(taskId));
+        }
+        return tasks;
+    }
+
     // TODO: change return type to `StreamTask`
     Collection<Task> activeTasks() {
         return readOnlyActiveTasks;
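The new bulk lookup in Tasks is what lets the TaskManager map the TaskIds carried by a TaskCorruptedException back to Task objects; usage as in the TaskManager hunks above. Since it delegates to the plain map lookup in task(TaskId), an id that is no longer assigned would simply contribute a null entry:

    corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
    // or, on the revocation path:
    dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));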
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4f78cbe1c94d7..1596db210a595 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2031,6 +2031,24 @@ public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
         assertThat(getTaskMetrics(), empty());
     }
 
+    @Test
+    public void shouldClearCommitStatusesInCloseDirty() {
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+        assertTrue(task.process(0L));
+        task.requestCommit();
+
+        task.suspend();
+        assertThat(task.commitNeeded(), is(true));
+        assertThat(task.commitRequested(), is(true));
+        task.closeDirty();
+        assertThat(task.commitNeeded(), is(false));
+        assertThat(task.commitRequested(), is(false));
+    }
+
     @Test
     public void closeShouldBeIdempotent() {
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 49ee261715837..75662297b0879 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -109,6 +109,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(EasyMockRunner.class)
 public class TaskManagerTest {
@@ -118,12 +119,16 @@ public class TaskManagerTest {
 
     private final TaskId taskId00 = new TaskId(0, 0);
     private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
+    private final TopicPartition t1p0changelog = new TopicPartition("changelog", 0);
     private final Set<TopicPartition> taskId00Partitions = mkSet(t1p0);
+    private final Set<TopicPartition> taskId00ChangelogPartitions = mkSet(t1p0changelog);
     private final Map<TaskId, Set<TopicPartition>> taskId00Assignment = singletonMap(taskId00, taskId00Partitions);
 
     private final TaskId taskId01 = new TaskId(0, 1);
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
+    private final TopicPartition t1p1changelog = new TopicPartition("changelog", 1);
     private final Set<TopicPartition> taskId01Partitions = mkSet(t1p1);
+    private final Set<TopicPartition> taskId01ChangelogPartitions = mkSet(t1p1changelog);
     private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = singletonMap(taskId01, taskId01Partitions);
 
     private final TaskId taskId02 = new TaskId(0, 2);
@@ -816,6 +821,281 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
         verify(consumer);
     }
 
+    @Test
+    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
+        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
+        stateManager.markChangelogAsCorrupted(taskId00Partitions);
+        replay(stateManager);
+
+        final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+        final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+                fail("Should not try to mark changelogs as corrupted for uncorrupted task");
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+        uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
+
+        // handleAssignment
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.putAll(taskId00Assignment);
+        assignment.putAll(taskId01Assignment);
+        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
+        expectLastCall().anyTimes();
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        consumer.commitSync(offsets);
+        expectLastCall().andThrow(new TimeoutException());
+
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
+
+        taskManager.handleAssignment(assignment, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
+
+        assertThat(uncorruptedActive.state(), is(Task.State.RUNNING));
+        assertThat(corruptedActive.state(), is(Task.State.RUNNING));
+
+        // make sure this will be committed and throw
+        uncorruptedActive.setCommitNeeded();
+        corruptedActive.setChangelogOffsets(singletonMap(t1p0, 0L));
+
+        assertThat(uncorruptedActive.commitPrepared, is(false));
+        assertThat(uncorruptedActive.commitNeeded, is(true));
+        assertThat(uncorruptedActive.commitCompleted, is(false));
+        assertThat(corruptedActive.commitPrepared, is(false));
+        assertThat(corruptedActive.commitNeeded, is(false));
+        assertThat(corruptedActive.commitCompleted, is(false));
+
+        taskManager.handleCorruption(singleton(taskId00));
+
+        assertThat(uncorruptedActive.commitPrepared, is(true));
+        assertThat(uncorruptedActive.commitNeeded, is(false));
+        assertThat(uncorruptedActive.commitCompleted, is(false)); //if not corrupted, we should close dirty without committing
+        assertThat(corruptedActive.commitPrepared, is(true));
+        assertThat(corruptedActive.commitNeeded, is(false));
+        assertThat(corruptedActive.commitCompleted, is(true)); //if corrupted, should enforce checkpoint with corrupted tasks removed
+
+        assertThat(corruptedActive.state(), is(Task.State.CREATED));
+        assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
+        setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+        final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
+
+        final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
+        final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+                super.markChangelogAsCorrupted(partitions);
+                corruptedTaskChangelogMarkedAsCorrupted.set(true);
+            }
+        };
+        stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
+
+        final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
+        final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+                super.markChangelogAsCorrupted(partitions);
+                uncorruptedTaskChangelogMarkedAsCorrupted.set(true);
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+        uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
+        stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
+
+        // handleAssignment
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.putAll(taskId00Assignment);
+        assignment.putAll(taskId01Assignment);
+        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
+        expectLastCall().anyTimes();
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
+        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        producer.commitTransaction(offsets, groupMetadata);
+        expectLastCall().andThrow(new TimeoutException());
+
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader, stateManager, producer);
+
+        taskManager.handleAssignment(assignment, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
+
+        assertThat(uncorruptedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(corruptedActiveTask.state(), is(Task.State.RUNNING));
+
+        // make sure this will be committed and throw
+        uncorruptedActiveTask.setCommitNeeded();
+
+        final Map<TopicPartition, Long> corruptedActiveTaskChangelogOffsets = singletonMap(t1p0changelog, 0L);
+        corruptedActiveTask.setChangelogOffsets(corruptedActiveTaskChangelogOffsets);
+        final Map<TopicPartition, Long> uncorruptedActiveTaskChangelogOffsets = singletonMap(t1p1changelog, 0L);
+        uncorruptedActiveTask.setChangelogOffsets(uncorruptedActiveTaskChangelogOffsets);
+
+        assertThat(uncorruptedActiveTask.commitPrepared, is(false));
+        assertThat(uncorruptedActiveTask.commitNeeded, is(true));
+        assertThat(uncorruptedActiveTask.commitCompleted, is(false));
+        assertThat(corruptedActiveTask.commitPrepared, is(false));
+        assertThat(corruptedActiveTask.commitNeeded, is(false));
+        assertThat(corruptedActiveTask.commitCompleted, is(false));
+
+        taskManager.handleCorruption(singleton(taskId00));
+
+        assertThat(uncorruptedActiveTask.commitPrepared, is(true));
+        assertThat(uncorruptedActiveTask.commitNeeded, is(false));
+        assertThat(uncorruptedActiveTask.commitCompleted, is(true)); //if corrupted due to timeout on commit, should enforce checkpoint with corrupted tasks removed
+        assertThat(corruptedActiveTask.commitPrepared, is(true));
+        assertThat(corruptedActiveTask.commitNeeded, is(false));
+        assertThat(corruptedActiveTask.commitCompleted, is(true)); //if corrupted, should enforce checkpoint with corrupted tasks removed
+
+        assertThat(corruptedActiveTask.state(), is(Task.State.CREATED));
+        assertThat(uncorruptedActiveTask.state(), is(Task.State.CREATED));
+        assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
+        assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS() {
+        final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true);
+        final Map<TopicPartition, OffsetAndMetadata> offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
+        revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
+        revokedActiveTask.setCommitNeeded();
+
+        final StateMachineTask unrevokedActiveTaskWithCommitNeeded = new StateMachineTask(taskId01, taskId01Partitions, true) {
+            @Override
+            public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+                fail("Should not try to mark changelogs as corrupted for uncorrupted task");
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> offsets01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
+        unrevokedActiveTaskWithCommitNeeded.setCommittableOffsetsAndMetadata(offsets01);
+        unrevokedActiveTaskWithCommitNeeded.setCommitNeeded();
+
+        final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true);
+
+        final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>();
+        expectedCommittedOffsets.putAll(offsets00);
+        expectedCommittedOffsets.putAll(offsets01);
+
+        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
+            mkEntry(taskId00, taskId00Partitions),
+            mkEntry(taskId01, taskId01Partitions),
+            mkEntry(taskId02, taskId02Partitions)
+        );
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+        expectLastCall();
+        consumer.commitSync(expectedCommittedOffsets);
+        expectLastCall().andThrow(new TimeoutException());
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+
+        taskManager.handleAssignment(assignmentActive, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
+        assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.RUNNING));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(Task.State.RUNNING));
+
+        taskManager.handleRevocation(taskId00Partitions);
+
+        assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
+        assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.CREATED));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+    }
+
+    @Test
+    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
+        setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+        final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
+
+        final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+        final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
+        revokedActiveTask.setCommittableOffsetsAndMetadata(revokedActiveTaskOffsets);
+        revokedActiveTask.setCommitNeeded();
+
+        final AtomicBoolean unrevokedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
+        final StateMachineTask unrevokedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
+                super.markChangelogAsCorrupted(partitions);
+                unrevokedTaskChangelogMarkedAsCorrupted.set(true);
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> unrevokedTaskOffsets = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
+        unrevokedActiveTask.setCommittableOffsetsAndMetadata(unrevokedTaskOffsets);
+        unrevokedActiveTask.setCommitNeeded();
+
+        final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
+
+        final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>();
+        expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
+        expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
+
+        stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
+        stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
+
+        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
+            mkEntry(taskId00, taskId00Partitions),
+            mkEntry(taskId01, taskId01Partitions),
+            mkEntry(taskId02, taskId02Partitions)
+        );
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+        expectLastCall();
+
+        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
+        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
+        expectLastCall().andThrow(new TimeoutException());
+
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, producer, stateManager);
+
+        taskManager.handleAssignment(assignmentActive, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
+        assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(unrevokedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+
+        final Map<TopicPartition, Long> revokedActiveTaskChangelogOffsets = singletonMap(t1p0changelog, 0L);
+        revokedActiveTask.setChangelogOffsets(revokedActiveTaskChangelogOffsets);
+        final Map<TopicPartition, Long> unrevokedActiveTaskChangelogOffsets = singletonMap(t1p1changelog, 0L);
+        unrevokedActiveTask.setChangelogOffsets(unrevokedActiveTaskChangelogOffsets);
+
+        taskManager.handleRevocation(taskId00Partitions);
+
+        assertThat(unrevokedTaskChangelogMarkedAsCorrupted.get(), is(true));
+        assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
+        assertThat(unrevokedActiveTask.state(), is(State.CREATED));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+    }
+
     @Test
     public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false);
@@ -1058,6 +1338,7 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
             .andReturn(asList(task00, task01, task02));
+        expect(activeTaskCreator.threadProducer()).andReturn(producer);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
 
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
@@ -2937,9 +3218,12 @@ private static ConsumerRecord<byte[], byte[]> getConsumerRecord(final TopicParti
 
     private static class StateMachineTask extends AbstractTask implements Task {
         private final boolean active;
+
+        // TODO: KAFKA-12569 clean up usage of these flags and use the new commitCompleted flag where appropriate
         private boolean commitNeeded = false;
         private boolean commitRequested = false;
         private boolean commitPrepared = false;
+        private boolean commitCompleted = false;
         private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
         private Map<TopicPartition, Long> purgeableOffsets;
         private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
@@ -3017,6 +3301,7 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
         @Override
         public void postCommit(final boolean enforceCheckpoint) {
             commitNeeded = false;
+            commitCompleted = true;
         }
 
         @Override
@@ -3037,6 +3322,14 @@ public void resume() {
             }
         }
 
+        @Override
+        public void revive() {
+            //TODO: KAFKA-12569 move clearing of commit-required statuses to closeDirty/Clean/AndRecycle methods
+            commitNeeded = false;
+            commitRequested = false;
+            super.revive();
+        }
+
         @Override
         public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, final Exception cause) {