Skip to content

Commit

Permalink
Preserve a previous exception in GrpcRetryer in case of DEADLINE_EXCE…
Browse files Browse the repository at this point in the history
…EDED

Issue temporalio#666
  • Loading branch information
Spikhalskiy committed Aug 26, 2021
1 parent 6d55180 commit ea1ff88
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Convenience methods to be used by unit tests and during development.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public <R> CompletableFuture<R> retry(
options.getMaximumInterval(),
options.getBackoffCoefficient());
CompletableFuture<R> resultCF = new CompletableFuture<>();
retry(options, function, 1, startTime, throttler, resultCF);
retry(options, function, 1, startTime, null, throttler, resultCF);
return resultCF;
}

Expand All @@ -56,6 +56,7 @@ private <R> void retry(
Supplier<CompletableFuture<R>> function,
int attempt,
long startTime,
StatusRuntimeException previousException,
AsyncBackoffThrottler throttler,
CompletableFuture<R> resultCF) {
options.validate();
Expand All @@ -74,7 +75,16 @@ private <R> void retry(
// CompletableFuture even if it's a failed one.
// But if this happens - process the same way as it would be an exception from
// completable future
failOrRetry(options, function, attempt, startTime, throttler, e, resultCF);
// Do not retry if it's not StatusRuntimeException
failOrRetry(
options,
function,
attempt,
startTime,
throttler,
previousException,
e,
resultCF);
return;
}
if (result == null) {
Expand All @@ -89,7 +99,15 @@ private <R> void retry(
resultCF.complete(r);
} else {
throttler.failure();
failOrRetry(options, function, attempt, startTime, throttler, e, resultCF);
failOrRetry(
options,
function,
attempt,
startTime,
throttler,
previousException,
e,
resultCF);
}
});
});
Expand All @@ -101,37 +119,39 @@ private <R> void failOrRetry(
int attempt,
long startTime,
AsyncBackoffThrottler throttler,
Throwable lastException,
StatusRuntimeException previousException,
Throwable currentException,
CompletableFuture<R> resultCF) {

// If exception is thrown from CompletionStage/CompletableFuture methods like compose or handle
// - it gets wrapped into CompletionException, so here we need to unwrap it. We can get not
// wrapped raw exception here too if CompletableFuture was explicitly filled with this exception
// using CompletableFuture.completeExceptionally
lastException = unwrapCompletionException(lastException);
currentException = unwrapCompletionException(currentException);

// Do not retry if it's not StatusRuntimeException
if (!(lastException instanceof StatusRuntimeException)) {
resultCF.completeExceptionally(lastException);
if (!(currentException instanceof StatusRuntimeException)) {
resultCF.completeExceptionally(currentException);
return;
}

StatusRuntimeException exception = (StatusRuntimeException) lastException;
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) currentException;

RuntimeException finalException =
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(exception, options);
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(
statusRuntimeException, previousException, options);
if (finalException != null) {
resultCF.completeExceptionally(finalException);
return;
}

if (GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) {
resultCF.completeExceptionally(exception);
resultCF.completeExceptionally(statusRuntimeException);
return;
}

log.debug("Retrying after failure", lastException);
retry(options, function, attempt + 1, startTime, throttler, resultCF);
log.debug("Retrying after failure", currentException);
retry(options, function, attempt + 1, startTime, statusRuntimeException, throttler, resultCF);
}

private static Throwable unwrapCompletionException(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ class GrpcRetryerUtils {
* This method encapsulates the logic if {@code StatusRuntimeException exception} is retryable or
* not.
*
* @param exception exception to analyze
* @param currentException exception to analyze
* @param previousException previus exception happened before this one, {@code null} if {@code
* currentException} is the first exception in the chain
* @param options retry options
* @return null if the {@code exception} can be retried, a final exception to throw in the
* external code otherwise
*/
static @Nullable RuntimeException createFinalExceptionIfNotRetryable(
StatusRuntimeException exception, RpcRetryOptions options) {
Status.Code code = exception.getStatus().getCode();
StatusRuntimeException currentException,
StatusRuntimeException previousException,
RpcRetryOptions options) {
Status.Code code = currentException.getStatus().getCode();

switch (code) {
// CANCELLED and DEADLINE_EXCEEDED usually considered non-retryable in GRPC world, for
Expand All @@ -48,13 +52,13 @@ class GrpcRetryerUtils {
case CANCELLED:
return new CancellationException();
case DEADLINE_EXCEEDED:
return exception;
return previousException != null ? previousException : currentException;
default:
for (RpcRetryOptions.DoNotRetryItem pair : options.getDoNotRetry()) {
if (pair.getCode() == code
&& (pair.getDetailsClass() == null
|| StatusUtils.hasFailure(exception, pair.getDetailsClass()))) {
return exception;
|| StatusUtils.hasFailure(currentException, pair.getDetailsClass()))) {
return currentException;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public <R, T extends Throwable> R retry(
options.getMaximumInterval(),
options.getBackoffCoefficient());

Exception lastException = null;
StatusRuntimeException lastException = null;
while (!GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) {
attempt++;

Expand All @@ -63,12 +63,12 @@ public <R, T extends Throwable> R retry(
Thread.currentThread().interrupt();
throw new CancellationException();
} catch (StatusRuntimeException e) {
lastException = e;
RuntimeException finalException =
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(e, options);
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(e, lastException, options);
if (finalException != null) {
throw finalException;
}
lastException = e;
}
// No catch block for any other exceptions because we don't retry them, we pass them through.
// It's designed this way because it's GrpcRetryer, not general purpose retryer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,37 @@ public void testDeadlineExceededException() throws InterruptedException {

assertEquals("If the exception is DEADLINE_EXCEEDED, we shouldn't retry", 1, attempts.get());
}

@Test
public void testDeadlineExceededAfterAnotherException() throws InterruptedException {
RpcRetryOptions options =
RpcRetryOptions.newBuilder()
.setInitialInterval(Duration.ofMillis(10))
.setMaximumInterval(Duration.ofMillis(100))
.validateBuildWithDefaults();
long start = System.currentTimeMillis();
final AtomicInteger attempts = new AtomicInteger();
try {
DEFAULT_ASYNC_RETRYER
.retry(
options,
() -> {
CompletableFuture<?> future = new CompletableFuture<>();
future.completeExceptionally(
new StatusRuntimeException(
attempts.incrementAndGet() > 1
? Status.fromCode(Status.Code.DEADLINE_EXCEEDED)
: Status.fromCode(Status.Code.DATA_LOSS)));
return future;
})
.get();
fail("unreachable");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof StatusRuntimeException);
assertEquals(
"We should get a previous exception in case of DEADLINE_EXCEEDED",
Status.Code.DATA_LOSS,
((StatusRuntimeException) e.getCause()).getStatus().getCode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,32 @@ public void testDeadlineExceededException() {

assertEquals("If the exception is DEADLINE_EXCEEDED - we shouldn't retry", 1, attempts.get());
}

@Test
public void testDeadlineExceededAfterAnotherException() {
RpcRetryOptions options =
RpcRetryOptions.newBuilder()
.setInitialInterval(Duration.ofMillis(10))
.setMaximumInterval(Duration.ofMillis(100))
.validateBuildWithDefaults();
final AtomicInteger attempts = new AtomicInteger();
try {
DEFAULT_SYNC_RETRYER.retry(
options,
() -> {
if (attempts.incrementAndGet() > 1) {
throw new StatusRuntimeException(Status.fromCode(Status.Code.DEADLINE_EXCEEDED));
} else {
throw new StatusRuntimeException(Status.fromCode(Status.Code.DATA_LOSS));
}
});
fail("unreachable");
} catch (Exception e) {
assertTrue(e instanceof StatusRuntimeException);
assertEquals(
"We should get a previous exception in case of DEADLINE_EXCEEDED",
Status.Code.DATA_LOSS,
((StatusRuntimeException) e).getStatus().getCode());
}
}
}

0 comments on commit ea1ff88

Please sign in to comment.