From ea1ff88abaa44b658e742554ad725049c476a8e0 Mon Sep 17 00:00:00 2001 From: Dmitry Spikhalskiy Date: Thu, 26 Aug 2021 16:41:02 -0400 Subject: [PATCH] Preserve a previous exception in GrpcRetryer in case of DEADLINE_EXCEEDED Issue #666 --- .../common/WorkflowExecutionUtils.java | 2 - .../internal/retryer/GrpcAsyncRetryer.java | 44 ++++++++++++++----- .../internal/retryer/GrpcRetryerUtils.java | 16 ++++--- .../internal/retryer/GrpcSyncRetryer.java | 6 +-- .../retryer/GrpcAsyncRetryerTest.java | 33 ++++++++++++++ .../internal/retryer/GrpcSyncRetryerTest.java | 28 ++++++++++++ 6 files changed, 106 insertions(+), 23 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java index 95eeccfae1..d64640382f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java @@ -82,8 +82,6 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Convenience methods to be used by unit tests and during development. diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcAsyncRetryer.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcAsyncRetryer.java index f11e52c516..801377d4a7 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcAsyncRetryer.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcAsyncRetryer.java @@ -47,7 +47,7 @@ public CompletableFuture retry( options.getMaximumInterval(), options.getBackoffCoefficient()); CompletableFuture resultCF = new CompletableFuture<>(); - retry(options, function, 1, startTime, throttler, resultCF); + retry(options, function, 1, startTime, null, throttler, resultCF); return resultCF; } @@ -56,6 +56,7 @@ private void retry( Supplier> function, int attempt, long startTime, + StatusRuntimeException previousException, AsyncBackoffThrottler throttler, CompletableFuture resultCF) { options.validate(); @@ -74,7 +75,16 @@ private void retry( // CompletableFuture even if it's a failed one. // But if this happens - process the same way as it would be an exception from // completable future - failOrRetry(options, function, attempt, startTime, throttler, e, resultCF); + // Do not retry if it's not StatusRuntimeException + failOrRetry( + options, + function, + attempt, + startTime, + throttler, + previousException, + e, + resultCF); return; } if (result == null) { @@ -89,7 +99,15 @@ private void retry( resultCF.complete(r); } else { throttler.failure(); - failOrRetry(options, function, attempt, startTime, throttler, e, resultCF); + failOrRetry( + options, + function, + attempt, + startTime, + throttler, + previousException, + e, + resultCF); } }); }); @@ -101,37 +119,39 @@ private void failOrRetry( int attempt, long startTime, AsyncBackoffThrottler throttler, - Throwable lastException, + StatusRuntimeException previousException, + Throwable currentException, CompletableFuture resultCF) { // If exception is thrown from CompletionStage/CompletableFuture methods like compose or handle // - it gets wrapped into CompletionException, so here we need to unwrap it. We can get not // wrapped raw exception here too if CompletableFuture was explicitly filled with this exception // using CompletableFuture.completeExceptionally - lastException = unwrapCompletionException(lastException); + currentException = unwrapCompletionException(currentException); // Do not retry if it's not StatusRuntimeException - if (!(lastException instanceof StatusRuntimeException)) { - resultCF.completeExceptionally(lastException); + if (!(currentException instanceof StatusRuntimeException)) { + resultCF.completeExceptionally(currentException); return; } - StatusRuntimeException exception = (StatusRuntimeException) lastException; + StatusRuntimeException statusRuntimeException = (StatusRuntimeException) currentException; RuntimeException finalException = - GrpcRetryerUtils.createFinalExceptionIfNotRetryable(exception, options); + GrpcRetryerUtils.createFinalExceptionIfNotRetryable( + statusRuntimeException, previousException, options); if (finalException != null) { resultCF.completeExceptionally(finalException); return; } if (GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) { - resultCF.completeExceptionally(exception); + resultCF.completeExceptionally(statusRuntimeException); return; } - log.debug("Retrying after failure", lastException); - retry(options, function, attempt + 1, startTime, throttler, resultCF); + log.debug("Retrying after failure", currentException); + retry(options, function, attempt + 1, startTime, statusRuntimeException, throttler, resultCF); } private static Throwable unwrapCompletionException(Throwable e) { diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java index 46ac2b1b3d..b78458155f 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java @@ -32,14 +32,18 @@ class GrpcRetryerUtils { * This method encapsulates the logic if {@code StatusRuntimeException exception} is retryable or * not. * - * @param exception exception to analyze + * @param currentException exception to analyze + * @param previousException previus exception happened before this one, {@code null} if {@code + * currentException} is the first exception in the chain * @param options retry options * @return null if the {@code exception} can be retried, a final exception to throw in the * external code otherwise */ static @Nullable RuntimeException createFinalExceptionIfNotRetryable( - StatusRuntimeException exception, RpcRetryOptions options) { - Status.Code code = exception.getStatus().getCode(); + StatusRuntimeException currentException, + StatusRuntimeException previousException, + RpcRetryOptions options) { + Status.Code code = currentException.getStatus().getCode(); switch (code) { // CANCELLED and DEADLINE_EXCEEDED usually considered non-retryable in GRPC world, for @@ -48,13 +52,13 @@ class GrpcRetryerUtils { case CANCELLED: return new CancellationException(); case DEADLINE_EXCEEDED: - return exception; + return previousException != null ? previousException : currentException; default: for (RpcRetryOptions.DoNotRetryItem pair : options.getDoNotRetry()) { if (pair.getCode() == code && (pair.getDetailsClass() == null - || StatusUtils.hasFailure(exception, pair.getDetailsClass()))) { - return exception; + || StatusUtils.hasFailure(currentException, pair.getDetailsClass()))) { + return currentException; } } } diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcSyncRetryer.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcSyncRetryer.java index c8dce66513..7677bfea97 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcSyncRetryer.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcSyncRetryer.java @@ -46,7 +46,7 @@ public R retry( options.getMaximumInterval(), options.getBackoffCoefficient()); - Exception lastException = null; + StatusRuntimeException lastException = null; while (!GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) { attempt++; @@ -63,12 +63,12 @@ public R retry( Thread.currentThread().interrupt(); throw new CancellationException(); } catch (StatusRuntimeException e) { - lastException = e; RuntimeException finalException = - GrpcRetryerUtils.createFinalExceptionIfNotRetryable(e, options); + GrpcRetryerUtils.createFinalExceptionIfNotRetryable(e, lastException, options); if (finalException != null) { throw finalException; } + lastException = e; } // No catch block for any other exceptions because we don't retry them, we pass them through. // It's designed this way because it's GrpcRetryer, not general purpose retryer. diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java index 3ffdc32f61..5fab2113b8 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java @@ -215,4 +215,37 @@ public void testDeadlineExceededException() throws InterruptedException { assertEquals("If the exception is DEADLINE_EXCEEDED, we shouldn't retry", 1, attempts.get()); } + + @Test + public void testDeadlineExceededAfterAnotherException() throws InterruptedException { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + final AtomicInteger attempts = new AtomicInteger(); + try { + DEFAULT_ASYNC_RETRYER + .retry( + options, + () -> { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + new StatusRuntimeException( + attempts.incrementAndGet() > 1 + ? Status.fromCode(Status.Code.DEADLINE_EXCEEDED) + : Status.fromCode(Status.Code.DATA_LOSS))); + return future; + }) + .get(); + fail("unreachable"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof StatusRuntimeException); + assertEquals( + "We should get a previous exception in case of DEADLINE_EXCEEDED", + Status.Code.DATA_LOSS, + ((StatusRuntimeException) e.getCause()).getStatus().getCode()); + } + } } diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java index 0a27488793..e460ecc7fa 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java @@ -162,4 +162,32 @@ public void testDeadlineExceededException() { assertEquals("If the exception is DEADLINE_EXCEEDED - we shouldn't retry", 1, attempts.get()); } + + @Test + public void testDeadlineExceededAfterAnotherException() { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .validateBuildWithDefaults(); + final AtomicInteger attempts = new AtomicInteger(); + try { + DEFAULT_SYNC_RETRYER.retry( + options, + () -> { + if (attempts.incrementAndGet() > 1) { + throw new StatusRuntimeException(Status.fromCode(Status.Code.DEADLINE_EXCEEDED)); + } else { + throw new StatusRuntimeException(Status.fromCode(Status.Code.DATA_LOSS)); + } + }); + fail("unreachable"); + } catch (Exception e) { + assertTrue(e instanceof StatusRuntimeException); + assertEquals( + "We should get a previous exception in case of DEADLINE_EXCEEDED", + Status.Code.DATA_LOSS, + ((StatusRuntimeException) e).getStatus().getCode()); + } + } }