Skip to content

Commit

Permalink
KAFKA-9734: Fix IllegalState in Streams transit to standby (#8319)
Browse files Browse the repository at this point in the history
Consolidate ChangelogReader state management inside of StreamThread to avoid having to reason about all execution paths in both StreamThread and TaskManager.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
vvcephei committed Mar 20, 2020
1 parent 24d05aa commit 960b216
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface ChangelogReader extends ChangelogRegister {
/**
* Transit to restore active changelogs mode
*/
void transitToRestoreActive();
void enforceRestoreActive();

/**
* Transit to update standby changelogs mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;

import java.time.Duration;
Expand Down Expand Up @@ -279,15 +279,15 @@ private boolean hasRestoredToEnd(final ChangelogMetadata metadata) {
// NOTE: even if the newly created tasks do not need any restoring, we still first transit to this state and then
// immediately transit back -- there's no overhead in transiting back and forth, and it simplifies the logic a lot.
@Override
public void transitToRestoreActive() {
public void enforceRestoreActive() {
if (state != ChangelogReaderState.ACTIVE_RESTORING) {
log.debug("Transiting to restore active tasks: {}", changelogs);
}

// pause all partitions that are for standby tasks from the restore consumer
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
// pause all partitions that are for standby tasks from the restore consumer
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());

state = ChangelogReaderState.ACTIVE_RESTORING;
state = ChangelogReaderState.ACTIVE_RESTORING;
}
}

// Only after we've completed restoring all active tasks we'll then move back to resume updating standby tasks.
Expand All @@ -300,8 +300,10 @@ public void transitToRestoreActive() {
@Override
public void transitToUpdateStandby() {
if (state != ChangelogReaderState.ACTIVE_RESTORING) {
throw new IllegalStateException("The changelog reader is not restoring active tasks while trying to " +
"transit to update standby tasks: " + changelogs);
throw new IllegalStateException(
"The changelog reader is not restoring active tasks (is " + state + ") while trying to " +
"transit to update standby tasks: " + changelogs
);
}

log.debug("Transiting to update standby tasks: {}", changelogs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,14 +504,14 @@ private void runLoop() {
mainConsumer.enforceRebalance();
}
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks {} are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e.corruptedTaskWithChangelogs());
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);

taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
} catch (final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.");
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.", e);

taskManager.handleLostAll();
mainConsumer.unsubscribe();
Expand Down Expand Up @@ -581,13 +581,13 @@ void runOnce() {
// only try to initialize the assigned tasks
// if the state is still in PARTITION_ASSIGNED after the poll call
if (state == State.PARTITIONS_ASSIGNED) {
// transit to restore active is idempotent so we can call it multiple times
changelogReader.enforceRestoreActive();

if (taskManager.tryToCompleteRestoration()) {
changelogReader.transitToUpdateStandby();

setState(State.RUNNING);
} else {
// transit to restore active is idempotent so we can call it multiple times
changelogReader.transitToRestoreActive();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,6 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
logPrefix
);

changelogReader.transitToRestoreActive();
}

private void cleanUpTaskProducer(final Task task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void restore() {
}

@Override
public void transitToRestoreActive() {
public void enforceRestoreActive() {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition

// once we are in restore active mode, we should not try to update the limit offset
time.setCurrentTimeMs(now + 202L);
changelogReader.transitToRestoreActive();
changelogReader.enforceRestoreActive();
changelogReader.restore();
assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset());
assertEquals(4L, changelogReader.changelogMetadata(tp).totalRestored());
Expand Down Expand Up @@ -879,7 +879,7 @@ public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> par
assertEquals(ACTIVE_RESTORING, changelogReader.state());

// transition to restore active is idempotent
changelogReader.transitToRestoreActive();
changelogReader.enforceRestoreActive();
assertEquals(ACTIVE_RESTORING, changelogReader.state());

changelogReader.transitToUpdateStandby();
Expand Down Expand Up @@ -907,7 +907,7 @@ public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> par
assertEquals(Collections.emptySet(), consumer.paused());
assertEquals(STANDBY_UPDATING, changelogReader.state());

changelogReader.transitToRestoreActive();
changelogReader.enforceRestoreActive();
assertEquals(ACTIVE_RESTORING, changelogReader.state());
assertEquals(mkSet(tp, tp1, tp2), consumer.assignment());
assertEquals(mkSet(tp1, tp2), consumer.paused());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ public void shouldAddNewActiveTasks() {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
changeLogReader.transitToRestoreActive();
changeLogReader.enforceRestoreActive();
expectLastCall();
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(singletonList(task00)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andReturn(emptyList()).anyTimes();
Expand Down Expand Up @@ -644,7 +644,7 @@ public void initializeIfNeeded() {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
changeLogReader.transitToRestoreActive();
changeLogReader.enforceRestoreActive();
expectLastCall();
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(asList(task00, task01)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andReturn(emptyList()).anyTimes();
Expand Down Expand Up @@ -685,7 +685,7 @@ public void completeRestoration() {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
changeLogReader.transitToRestoreActive();
changeLogReader.enforceRestoreActive();
expectLastCall();
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(singletonList(task00)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andReturn(emptyList()).anyTimes();
Expand Down Expand Up @@ -841,8 +841,6 @@ public void closeDirty() {
};

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down Expand Up @@ -927,8 +925,6 @@ public Collection<TopicPartition> changelogPartitions() {
task00.setCommittableOffsetsAndMetadata(offsets);

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down Expand Up @@ -983,8 +979,6 @@ public Collection<TopicPartition> changelogPartitions() {
};

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down Expand Up @@ -1053,8 +1047,6 @@ public Map<TopicPartition, Long> prepareCloseClean() {
};

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down

0 comments on commit 960b216

Please sign in to comment.