Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,20 @@ private void processWork(ComputationState computationState, Work work) {
recordProcessingStats(commitRequest, workItem, executeWorkResult);
LOG.debug("Processing done for work token: {}", workItem.getWorkToken());
} catch (Throwable t) {
workFailureProcessor.logAndProcessFailure(
computationId,
ExecutableWork.create(work, retry -> processWork(computationState, retry)),
t,
invalidWork ->
computationState.completeWorkAndScheduleNextWorkForKey(
invalidWork.getShardedKey(), invalidWork.id()));
// OutOfMemoryError that are caught will be rethrown and trigger jvm termination.
try {
workFailureProcessor.logAndProcessFailure(
computationId,
ExecutableWork.create(work, retry -> processWork(computationState, retry)),
t,
invalidWork ->
computationState.completeWorkAndScheduleNextWorkForKey(
invalidWork.getShardedKey(), invalidWork.id()));
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Throwable t2) {
throw new RuntimeException(t2);
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
// work items causing exceptions are also accounted in time spent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,20 @@ public void logAndProcessFailure(
String computationId,
ExecutableWork executableWork,
Throwable t,
Consumer<Work> onInvalidWork) {
if (shouldRetryLocally(computationId, executableWork.work(), t)) {
// Try again after some delay and at the end of the queue to avoid a tight loop.
executeWithDelay(retryLocallyDelayMs, executableWork);
} else {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
onInvalidWork.accept(executableWork.work());
Consumer<Work> onInvalidWork)
throws Throwable {
switch (evaluateRetry(computationId, executableWork.work(), t)) {
case DO_NOT_RETRY:
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
onInvalidWork.accept(executableWork.work());
break;
case RETRY_LOCALLY:
// Try again after some delay and at the end of the queue to avoid a tight loop.
executeWithDelay(retryLocallyDelayMs, executableWork);
break;
case RETHROW_THROWABLE:
throw t;
}
}

Expand All @@ -131,7 +137,13 @@ private void executeWithDelay(long delayMs, ExecutableWork executableWork) {
executableWork, executableWork.work().getSerializedWorkItemSize());
}

private boolean shouldRetryLocally(String computationId, Work work, Throwable t) {
// Outcome of evaluating a work-item failure, consumed by the switch in
// logAndProcessFailure: the work is either abandoned, re-queued locally,
// or the original Throwable is propagated to the caller.
private enum RetryEvaluation {
  // Treat the work item as invalid and hand it to the onInvalidWork callback;
  // Windmill will eventually re-offer it if it still needs processing.
  DO_NOT_RETRY,
  // Re-execute the work locally after a delay, at the end of the queue,
  // to avoid a tight retry loop.
  RETRY_LOCALLY,
  // Rethrow the caught Throwable to the caller (used for OutOfMemoryError,
  // which must trigger JVM termination upstream).
  RETHROW_THROWABLE,
}

private RetryEvaluation evaluateRetry(String computationId, Work work, Throwable t) {
@Nullable final Throwable cause = t.getCause();
Throwable parsedException = (t instanceof UserCodeException && cause != null) ? cause : t;
if (KeyTokenInvalidException.isKeyTokenInvalidException(parsedException)) {
Expand All @@ -140,53 +152,59 @@ private boolean shouldRetryLocally(String computationId, Work work, Throwable t)
+ "Work will not be retried locally.",
computationId,
work.getWorkItem().getShardingKey());
} else if (WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
return RetryEvaluation.DO_NOT_RETRY;
}
if (WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
LOG.debug(
"Execution of work for computation '{}' on sharding key '{}' failed. "
+ "Work will not be retried locally.",
computationId,
work.getWorkItem().getShardingKey());
} else {
LastExceptionDataProvider.reportException(parsedException);
LOG.debug("Failed work: {}", work);
Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), clock.get());
if (!failureTracker.trackFailure(computationId, work.getWorkItem(), parsedException)) {
LOG.error(
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception, "
+ "and Windmill indicated not to retry locally.",
computationId,
work.getWorkItem().getShardingKey(),
parsedException);
} else if (isOutOfMemoryError(parsedException)) {
String heapDump = tryToDumpHeap();
LOG.error(
"Execution of work for computation '{}' for sharding key '{}' failed with out-of-memory. "
+ "Work will not be retried locally. Heap dump {}.",
computationId,
work.getWorkItem().getShardingKey(),
heapDump,
parsedException);
} else if (elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
LOG.error(
"Execution of work for computation '{}' for sharding key '{}' failed with uncaught exception, "
+ "and it will not be retried locally because the elapsed time since start {} "
+ "exceeds {}.",
computationId,
work.getWorkItem().getShardingKey(),
elapsedTimeSinceStart,
MAX_LOCAL_PROCESSING_RETRY_DURATION,
parsedException);
} else {
LOG.error(
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception. "
+ "Work will be retried locally.",
computationId,
work.getWorkItem().getShardingKey(),
parsedException);
return true;
}
return RetryEvaluation.DO_NOT_RETRY;
}

return false;
LastExceptionDataProvider.reportException(parsedException);
LOG.debug("Failed work: {}", work);
Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), clock.get());
if (isOutOfMemoryError(parsedException)) {
String heapDump = tryToDumpHeap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we guard against/log if tryToDumpHeap throws?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MemoryMonitor itself already catches exceptions; even if tryToDumpHeap does throw, the outcome is still acceptable, so I'm leaving this as is to keep things simpler.

LOG.error(
"Execution of work for computation '{}' for sharding key '{}' failed with out-of-memory. "
+ "Work will not be retried locally. Heap dump {}.",
computationId,
work.getWorkItem().getShardingKey(),
heapDump,
parsedException);
return RetryEvaluation.RETHROW_THROWABLE;
}

if (!failureTracker.trackFailure(computationId, work.getWorkItem(), parsedException)) {
LOG.error(
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception, "
+ "and Windmill indicated not to retry locally.",
computationId,
work.getWorkItem().getShardingKey(),
parsedException);
return RetryEvaluation.DO_NOT_RETRY;
}
if (elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
LOG.error(
"Execution of work for computation '{}' for sharding key '{}' failed with uncaught exception, "
+ "and it will not be retried locally because the elapsed time since start {} "
+ "exceeds {}.",
computationId,
work.getWorkItem().getShardingKey(),
elapsedTimeSinceStart,
MAX_LOCAL_PROCESSING_RETRY_DURATION,
parsedException);
return RetryEvaluation.DO_NOT_RETRY;
}
LOG.error(
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception. "
+ "Work will be retried locally.",
computationId,
work.getWorkItem().getShardingKey(),
parsedException);
return RetryEvaluation.RETRY_LOCALLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;

import java.util.HashSet;
Expand Down Expand Up @@ -105,7 +106,7 @@ private static ExecutableWork createWork(Consumer<Work> processWorkFn) {
}

@Test
public void logAndProcessFailure_doesNotRetryKeyTokenInvalidException() {
public void logAndProcessFailure_doesNotRetryKeyTokenInvalidException() throws Throwable {
Set<Work> executedWork = new HashSet<>();
ExecutableWork work = createWork(executedWork::add);
WorkFailureProcessor workFailureProcessor =
Expand All @@ -119,7 +120,7 @@ public void logAndProcessFailure_doesNotRetryKeyTokenInvalidException() {
}

@Test
public void logAndProcessFailure_doesNotRetryWhenWorkItemCancelled() {
public void logAndProcessFailure_doesNotRetryWhenWorkItemCancelled() throws Throwable {
Set<Work> executedWork = new HashSet<>();
ExecutableWork work = createWork(executedWork::add);
WorkFailureProcessor workFailureProcessor =
Expand All @@ -142,15 +143,18 @@ public void logAndProcessFailure_doesNotRetryOOM() {
WorkFailureProcessor workFailureProcessor =
createWorkFailureProcessor(streamingEngineFailureReporter());
Set<Work> invalidWork = new HashSet<>();
workFailureProcessor.logAndProcessFailure(
DEFAULT_COMPUTATION_ID, work, new OutOfMemoryError(), invalidWork::add);
assertThrows(
OutOfMemoryError.class,
() ->
workFailureProcessor.logAndProcessFailure(
DEFAULT_COMPUTATION_ID, work, new OutOfMemoryError(), invalidWork::add));

assertThat(executedWork).isEmpty();
assertThat(invalidWork).containsExactly(work.work());
}

@Test
public void logAndProcessFailure_doesNotRetryWhenFailureReporterMarksAsNonRetryable() {
public void logAndProcessFailure_doesNotRetryWhenFailureReporterMarksAsNonRetryable()
throws Throwable {
Set<Work> executedWork = new HashSet<>();
ExecutableWork work = createWork(executedWork::add);
WorkFailureProcessor workFailureProcessor =
Expand All @@ -164,7 +168,7 @@ public void logAndProcessFailure_doesNotRetryWhenFailureReporterMarksAsNonRetrya
}

@Test
public void logAndProcessFailure_doesNotRetryAfterLocalRetryTimeout() {
public void logAndProcessFailure_doesNotRetryAfterLocalRetryTimeout() throws Throwable {
Set<Work> executedWork = new HashSet<>();
ExecutableWork veryOldWork =
createWork(() -> Instant.now().minus(Duration.standardDays(30)), executedWork::add);
Expand All @@ -180,7 +184,7 @@ public void logAndProcessFailure_doesNotRetryAfterLocalRetryTimeout() {

@Test
public void logAndProcessFailure_retriesOnUncaughtUnhandledException_streamingEngine()
throws InterruptedException {
throws Throwable {
CountDownLatch runWork = new CountDownLatch(1);
ExecutableWork work = createWork(ignored -> runWork.countDown());
WorkFailureProcessor workFailureProcessor =
Expand All @@ -195,7 +199,7 @@ public void logAndProcessFailure_retriesOnUncaughtUnhandledException_streamingEn

@Test
public void logAndProcessFailure_retriesOnUncaughtUnhandledException_streamingAppliance()
throws InterruptedException {
throws Throwable {
CountDownLatch runWork = new CountDownLatch(1);
ExecutableWork work = createWork(ignored -> runWork.countDown());
WorkFailureProcessor workFailureProcessor =
Expand Down
Loading