Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8dc0f50
remove commit from handleCorruption and handle TaskCorrupted in handl…
ableegoldman Mar 23, 2021
549a5f7
fixin up the tests
ableegoldman Mar 26, 2021
a843f11
skip only corrupted tasks when checkpointing in handleRevocation
ableegoldman Mar 26, 2021
dbec22d
checkstyle
ableegoldman Mar 26, 2021
4ce51e9
comments
ableegoldman Mar 26, 2021
7282884
clean up logic
ableegoldman Mar 26, 2021
5950b39
simplify even further
ableegoldman Mar 26, 2021
d6f8c0b
handle corrupted tasks
ableegoldman Mar 26, 2021
1a579a4
use own catch block
ableegoldman Mar 26, 2021
4d4f41c
add flag to closeAndRevive to conditionally mark changelogs as corru…
ableegoldman Mar 27, 2021
dc91e75
cleanup
ableegoldman Mar 27, 2021
1446ec2
fix test I messed with before
ableegoldman Mar 27, 2021
9c8730a
add unit test
ableegoldman Mar 27, 2021
723520b
backtrack on handleCorrupted, tick the task timer and add another test
ableegoldman Mar 27, 2021
e5cca19
Revert "backtrack on handleCorrupted, tick the task timer and add ano…
ableegoldman Mar 28, 2021
4a9ffff
add second unit test
ableegoldman Mar 28, 2021
669367f
Merge branch 'trunk' of https://github.com/apache/kafka into 12523-im…
ableegoldman Mar 28, 2021
4d23c99
cleanup in handleRevocation
ableegoldman Mar 28, 2021
0d21ff4
javadocs for commit
ableegoldman Mar 28, 2021
ecb7e7c
clear timeout in revive and refactor commit
ableegoldman Mar 28, 2021
9ea3256
update/remove comment
ableegoldman Mar 28, 2021
f6361c0
need to clear commit statuses during close
ableegoldman Mar 28, 2021
4aed51d
clear status in all kinds of close
ableegoldman Mar 28, 2021
9db590b
add timed out tasks to dirty tasks in handleRevocation
ableegoldman Mar 28, 2021
4c78a79
add StreamTaskTest
ableegoldman Mar 28, 2021
55c076b
add KAFKA-12569 jira number to TODOs
ableegoldman Mar 28, 2021
6fcdd56
add unit tests for eos
ableegoldman Mar 29, 2021
fa4cde5
fix flaky test due to strict mock
ableegoldman Mar 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public final Task.State state() {
@Override
public void revive() {
if (state == CLOSED) {
clearTaskTimeout();
transitionTo(CREATED);
} else {
throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,12 @@ public void postCommit(final boolean enforceCheckpoint) {
throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
}

commitRequested = false;
clearCommitStatuses();
}

/**
 * Resets every commit-related status flag of this task to {@code false}:
 * the pending-transaction-commit marker, the commit-requested marker and
 * the commit-needed marker.
 */
private void clearCommitStatuses() {
    hasPendingTxCommit = false;
    commitRequested = false;
    commitNeeded = false;
}

Expand All @@ -511,13 +515,15 @@ private Map<TopicPartition, Long> extractPartitionTimes() {
public void closeClean() {
// Clean close: validateClean() runs before anything is torn down.
validateClean();
removeAllSensors();
// Reset commitNeeded / commitRequested / hasPendingTxCommit so that no stale commit status
// is left behind once the task has been closed.
clearCommitStatuses();
// NOTE(review): the boolean presumably selects the "clean" variant of close() -- confirm against close(boolean).
close(true);
log.info("Closed clean");
}

/**
 * Closes this task dirty: removes all sensors, resets the commit status flags and then
 * delegates to {@code close(false)}. Unlike {@code closeClean()}, no {@code validateClean()}
 * check is performed first.
 */
@Override
public void closeDirty() {
removeAllSensors();
// Reset commitNeeded / commitRequested / hasPendingTxCommit so that no stale commit status
// is left behind once the task has been closed.
clearCommitStatuses();
// NOTE(review): the boolean presumably selects the "dirty" variant of close() -- confirm against close(boolean).
close(false);
log.info("Closed dirty");
}
Expand All @@ -532,6 +538,7 @@ public void updateInputPartitions(final Set<TopicPartition> topicPartitions, fin
public void closeCleanAndRecycleState() {
validateClean();
removeAllSensors();
clearCommitStatuses();
switch (state()) {
case SUSPENDED:
stateMgr.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,40 +155,55 @@ void handleRebalanceComplete() {
* @throws TaskMigratedException
*/
void handleCorruption(final Set<TaskId> corruptedTasks) {
final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
final Set<Task> corruptedActiveTasks = new HashSet<>();
final Set<Task> corruptedStandbyTasks = new HashSet<>();

for (final TaskId taskId : corruptedTasks) {
final Task task = tasks.task(taskId);
if (task.isActive()) {
corruptedActiveTasks.put(task, task.changelogPartitions());
corruptedActiveTasks.add(task);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the cleanup! I think in the past we might have marked only a subset of the changelog partitions as corrupted, but later on we always just marked all of them as corrupted. Following that thought, maybe task.markChangelogAsCorrupted does not need to take any parameters either, and could simply mark all changelog partitions as corrupted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to go all the way with consolidating this logic, since eventually we may want only the subset of partitions/stores that are actually corrupted to be wiped out. So I'd prefer to leave this as-is for now and keep the places in which we infer the changelogs from the task restricted to just the TaskManager.

} else {
corruptedStandbyTasks.put(task, task.changelogPartitions());
corruptedStandbyTasks.add(task);
}
}

// Make sure to clean up any corrupted standby tasks in their entirety before committing
// since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
closeAndRevive(corruptedStandbyTasks);

commit(tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !corruptedTasks.contains(t.id()))
.collect(Collectors.toSet())
);
closeDirtyAndRevive(corruptedStandbyTasks, true);

// We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
try {
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !corruptedTasks.contains(t.id()))
.collect(Collectors.toSet()),
new HashMap<>()
);
} catch (final TaskCorruptedException e) {
log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
"tasks to clean and revive: {}", e.corruptedTasks());
corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
} catch (final TimeoutException e) {
log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
final Collection<Task> uncorruptedTasks = new HashSet<>(tasks.activeTasks());
uncorruptedTasks.removeAll(corruptedActiveTasks);
// Those tasks which just timed out can just be closed dirty without marking changelogs as corrupted
closeDirtyAndRevive(uncorruptedTasks, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If closeDirtyAndRevive throws here, then the next closeDirtyAndRevive would not be triggered. Is that okay, or do we guarantee that closeDirtyAndRevive would not throw at all now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we guarantee that closeDirtyAndRevive does not throw -- this isn't a new assumption, since prior to this it was possible for closeDirtyAndRevive to throw for standby tasks which means we would not invoke it for active tasks. We're just doing the same thing here. (Even if we did throw I think it would be ok under both ALOS or EOS, as for EOS this would cause an unclean shutdown which would mean wiping the store anyway, and for ALOS we would just be closing dirty which again is what we were about to do anyway)

}

closeAndRevive(corruptedActiveTasks);
closeDirtyAndRevive(corruptedActiveTasks, true);
}

private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
final Task task = entry.getKey();
private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
for (final Task task : taskWithChangelogs) {
final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();

// mark corrupted partitions to not be checkpointed, and then close the task as dirty
final Collection<TopicPartition> corruptedPartitions = entry.getValue();
task.markChangelogAsCorrupted(corruptedPartitions);
if (markAsCorrupted) {
task.markChangelogAsCorrupted(corruptedPartitions);
}

try {
// we do not need to take the returned offsets since we are not going to commit anyways;
Expand All @@ -201,8 +216,11 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith

try {
task.suspend();

// we need to enforce a checkpoint that removes the corrupted partitions
task.postCommit(true);
if (markAsCorrupted) {
task.postCommit(true);
}
} catch (final RuntimeException swallow) {
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
Expand Down Expand Up @@ -332,8 +350,8 @@ private void handleCloseAndRecycle(final Set<Task> tasksToRecycle,
// write the checkpoint file.
final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
if (!offsets.isEmpty()) {
log.error("Task {} should has been committed when it was suspended, but it reports non-empty " +
"offsets {} to commit; it means it fails during last commit and hence should be closed dirty",
log.error("Task {} should have been committed when it was suspended, but it reports non-empty " +
"offsets {} to commit; this means it failed during last commit and hence should be closed dirty",
task.id(), offsets);

tasksToCloseDirty.add(task);
Expand Down Expand Up @@ -509,30 +527,47 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
}

// even if commit failed, we should still continue and complete suspending those tasks,
// so we would capture any exception and throw
// even if commit failed, we should still continue and complete suspending those tasks, so we would capture
// any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
// as such we just need to skip those dirty tasks in the checkpoint
final Set<Task> dirtyTasks = new HashSet<>();
try {
commitOffsetsOrTransaction(consumedOffsetsPerTask);
} catch (final TaskCorruptedException e) {
log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
e.corruptedTasks());

// If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
closeDirtyAndRevive(dirtyTasks, true);
} catch (final TimeoutException e) {
log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");

// If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
closeDirtyAndRevive(dirtyTasks, false);
} catch (final RuntimeException e) {
log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
firstException.compareAndSet(null, e);
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
}

// only try to complete post-commit if committing succeeded;
// we enforce checkpointing upon suspending a task: if it is resumed later we just
// proceed normally, if it is going to be closed we would checkpoint by then
if (firstException.get() == null) {
for (final Task task : revokedActiveTasks) {
// we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
// going to be closed we would checkpoint by then
for (final Task task : revokedActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
task.postCommit(true);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
firstException.compareAndSet(null, e);
}
}
}

if (shouldCommitAdditionalTasks) {
for (final Task task : commitNeededActiveTasks) {
if (shouldCommitAdditionalTasks) {
for (final Task task : commitNeededActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
// for non-revoking active tasks, we should not enforce checkpoint
// since if it is EOS enabled, no checkpoint should be written while
Expand Down Expand Up @@ -972,42 +1007,53 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
* @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS)
* @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
* @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
*/
int commit(final Collection<Task> tasksToCommit) {
int committed = 0;
if (rebalanceInProgress) {
return -1;
committed = -1;
} else {
int committed = 0;
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
for (final Task task : tasksToCommit) {
if (task.commitNeeded()) {
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
if (task.isActive()) {
consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
}
}
}

try {
commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

for (final Task task : tasksToCommit) {
if (task.commitNeeded()) {
task.clearTaskTimeout();
++committed;
task.postCommit(false);
}
}
committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
} catch (final TimeoutException timeoutException) {
consumedOffsetsAndMetadataPerTask
.keySet()
.forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
}
}
return committed;
}

/**
* @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
*/
private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pulled all the actual contents, excluding the TimeoutException + maybeInitTaskTimeoutOrThrow handling, so we could use it in handleCorruption without that stuff

final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
int committed = 0;

return committed;
for (final Task task : tasksToCommit) {
if (task.commitNeeded()) {
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
if (task.isActive()) {
consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
}
}
}

commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

for (final Task task : tasksToCommit) {
if (task.commitNeeded()) {
task.clearTaskTimeout();
++committed;
task.postCommit(false);
}
}
return committed;
}

/**
Expand All @@ -1027,6 +1073,11 @@ int maybeCommitActiveTasksPerUserRequested() {
}
}

/**
* @throws TaskMigratedException if committing offsets failed due to CommitFailedException (non-EOS)
* @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
* @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
*/
private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

import java.util.HashSet;
import org.slf4j.Logger;

import java.util.Collection;
Expand Down Expand Up @@ -234,6 +236,14 @@ Task task(final TaskId taskId) {
return allTasksPerId.get(taskId);
}

/**
 * Looks up the task registered for each of the given ids via {@code task(TaskId)}.
 *
 * @param taskIds ids of the tasks to look up
 * @return a newly created set holding the result of each individual lookup
 */
Collection<Task> tasks(final Collection<TaskId> taskIds) {
    final Set<Task> matched = new HashSet<>();
    taskIds.forEach(id -> matched.add(task(id)));
    return matched;
}

// TODO: change return type to `StreamTask`
Collection<Task> activeTasks() {
return readOnlyActiveTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,24 @@ public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
assertThat(getTaskMetrics(), empty());
}

// Verifies that closeDirty() resets both the commit-needed and the commit-requested flag of a task.
@Test
public void shouldClearCommitStatusesInCloseDirty() {
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });

// process one record and explicitly request a commit; both flags are asserted to be set below
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
assertTrue(task.process(0L));
task.requestCommit();

// suspending alone must leave both flags set
task.suspend();
assertThat(task.commitNeeded(), is(true));
assertThat(task.commitRequested(), is(true));
// closing dirty must clear them
task.closeDirty();
assertThat(task.commitNeeded(), is(false));
assertThat(task.commitRequested(), is(false));
}

@Test
public void closeShouldBeIdempotent() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
Expand Down
Loading