Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9734: Fix IllegalState in Streams transit to standby #8319

Merged
merged 4 commits into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -33,7 +33,7 @@ public interface ChangelogReader extends ChangelogRegister {
/**
* Transit to restore active changelogs mode
*/
void transitToRestoreActive();
void enforceRestoreActive();

/**
* Transit to update standby changelogs mode
Expand Down
Expand Up @@ -21,18 +21,18 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;

import java.time.Duration;
Expand Down Expand Up @@ -279,15 +279,15 @@ private boolean hasRestoredToEnd(final ChangelogMetadata metadata) {
// NOTE: even if the newly created tasks do not need any restoring, we still first transit to this state and then
// immediately transit back -- there's no overhead of transiting back and forth but simplifies the logic a lot.
@Override
public void transitToRestoreActive() {
public void enforceRestoreActive() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed this method to make it clear we aren't necessarily "transitioning", we actually call it all the time now any time we want to "be in restoreActive".

if (state != ChangelogReaderState.ACTIVE_RESTORING) {
log.debug("Transiting to restore active tasks: {}", changelogs);
}

// pause all partitions that are for standby tasks from the restore consumer
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
// pause all partitions that are for standby tasks from the restore consumer
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this inside the block because we should only need to do it if we are transitioning from restoring standbys. I wanted to make the idempotent call slightly more efficient, since we now call this method every time in StreamThread#runOnce.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move the line 290 inside as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah, of course :)


state = ChangelogReaderState.ACTIVE_RESTORING;
state = ChangelogReaderState.ACTIVE_RESTORING;
}
}

// Only after we've completed restoring all active tasks we'll then move back to resume updating standby tasks.
Expand All @@ -300,8 +300,10 @@ public void transitToRestoreActive() {
@Override
public void transitToUpdateStandby() {
if (state != ChangelogReaderState.ACTIVE_RESTORING) {
throw new IllegalStateException("The changelog reader is not restoring active tasks while trying to " +
"transit to update standby tasks: " + changelogs);
throw new IllegalStateException(
"The changelog reader is not restoring active tasks (is " + state + ") while trying to " +
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a bit redundant, but it wasted a bit of my time to find out that if it wasn't in active_restoring, it must have been in restoring standbys because that's the only other state. I figure it doesn't hurt to just be explicit.

"transit to update standby tasks: " + changelogs
);
}

log.debug("Transiting to update standby tasks: {}", changelogs);
Expand Down
Expand Up @@ -501,14 +501,14 @@ private void runLoop() {
mainConsumer.enforceRebalance();
}
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks {} are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e.corruptedTaskWithChangelogs());
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);
Comment on lines +504 to +505
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rearranged this so we can include the exception itself. The stacktrace is useful for tracking down the reason for the exception.


taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
} catch (final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.");
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.", e);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, here.


taskManager.handleLostAll();
mainConsumer.unsubscribe();
Expand Down Expand Up @@ -578,13 +578,13 @@ void runOnce() {
// only try to initialize the assigned tasks
// if the state is still in PARTITION_ASSIGNED after the poll call
if (state == State.PARTITIONS_ASSIGNED) {
// transit to restore active is idempotent so we can call it multiple times
changelogReader.enforceRestoreActive();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the primary fix. Instead of relying (hoping) on TaskManager to put the changelog reader into restoring_active, we just idempotently make sure it's in that state any time we're in partitions_assigned.


if (taskManager.tryToCompleteRestoration()) {
changelogReader.transitToUpdateStandby();

setState(State.RUNNING);
} else {
// transit to restore active is idempotent so we can call it multiple times
changelogReader.transitToRestoreActive();
}
}

Expand Down
Expand Up @@ -253,8 +253,6 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
logPrefix
);

changelogReader.transitToRestoreActive();
}

private void addNewTask(final Task task) {
Expand Down
Expand Up @@ -43,7 +43,7 @@ public void restore() {
}

@Override
public void transitToRestoreActive() {
public void enforceRestoreActive() {
// do nothing
}

Expand Down
Expand Up @@ -766,7 +766,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition

// once we are in update active mode, we should not try to update limit offset
time.setCurrentTimeMs(now + 202L);
changelogReader.transitToRestoreActive();
changelogReader.enforceRestoreActive();
changelogReader.restore();
assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset());
assertEquals(4L, changelogReader.changelogMetadata(tp).totalRestored());
Expand Down Expand Up @@ -879,7 +879,7 @@ public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> par
assertEquals(ACTIVE_RESTORING, changelogReader.state());

// transition to restore active is idempotent
changelogReader.transitToRestoreActive();
changelogReader.enforceRestoreActive();
assertEquals(ACTIVE_RESTORING, changelogReader.state());

changelogReader.transitToUpdateStandby();
Expand Down Expand Up @@ -907,7 +907,7 @@ public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> par
assertEquals(Collections.emptySet(), consumer.paused());
assertEquals(STANDBY_UPDATING, changelogReader.state());

changelogReader.transitToRestoreActive();
changelogReader.enforceRestoreActive();
assertEquals(ACTIVE_RESTORING, changelogReader.state());
assertEquals(mkSet(tp, tp1, tp2), consumer.assignment());
assertEquals(mkSet(tp1, tp2), consumer.paused());
Expand Down
Expand Up @@ -511,7 +511,7 @@ public void shouldAddNewActiveTasks() {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
changeLogReader.transitToRestoreActive();
changeLogReader.enforceRestoreActive();
expectLastCall();
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(singletonList(task00)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andReturn(emptyList()).anyTimes();
Expand Down Expand Up @@ -552,7 +552,7 @@ public void initializeIfNeeded() {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
changeLogReader.transitToRestoreActive();
changeLogReader.enforceRestoreActive();
expectLastCall();
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(asList(task00, task01)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andReturn(emptyList()).anyTimes();
Expand Down Expand Up @@ -591,7 +591,7 @@ public void completeRestoration() {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
changeLogReader.transitToRestoreActive();
changeLogReader.enforceRestoreActive();
expectLastCall();
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(singletonList(task00)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andReturn(emptyList()).anyTimes();
Expand Down Expand Up @@ -677,8 +677,6 @@ public void closeClean() {
};

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
Comment on lines -680 to -681
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a strict mock, so we have to remove this expectation, since the TaskManager no longer calls it.

expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down Expand Up @@ -745,8 +743,6 @@ public Collection<TopicPartition> changelogPartitions() {
};

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down Expand Up @@ -801,8 +797,6 @@ public Collection<TopicPartition> changelogPartitions() {
};

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down Expand Up @@ -871,8 +865,6 @@ public void closeClean() {
};

resetToStrict(changeLogReader);
changeLogReader.transitToRestoreActive();
expectLastCall();
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
Expand Down