diff --git a/src/main/java/net/jodah/failsafe/AsyncExecution.java b/src/main/java/net/jodah/failsafe/AsyncExecution.java index dcb8ab41..1f558689 100644 --- a/src/main/java/net/jodah/failsafe/AsyncExecution.java +++ b/src/main/java/net/jodah/failsafe/AsyncExecution.java @@ -18,7 +18,9 @@ import net.jodah.failsafe.internal.util.Assert; import net.jodah.failsafe.util.concurrent.Scheduler; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -29,9 +31,6 @@ */ @SuppressWarnings("WeakerAccess") public final class AsyncExecution extends AbstractExecution { - /** Used to complete a promise for a scheduled execution */ - static final Exception SCHEDULED = new Exception("The execution has been scheduled"); - private Supplier> executionSupplier; final FailsafeFuture future; final Scheduler scheduler; @@ -159,9 +158,13 @@ void executeAsync(Supplier> supplier) { supplier.get().whenComplete(this::complete); } - void executeAsyncExecution(Supplier supplier) { - executionSupplier = Functions.makeAsyncExecution(supplier, scheduler, future); - executionSupplier.get(); + /** + * Performs an asynchronous execution where the execution must be manually completed via the provided AsyncExecution. + */ + @SuppressWarnings("unchecked") + void executeAsyncExecution(Supplier> supplier) { + executionSupplier = supplier; + future.inject((Future) scheduler.schedule(supplier::get, 0, TimeUnit.NANOSECONDS)); } /** @@ -189,7 +192,7 @@ boolean completeOrHandle(Object result, Throwable failure) { } private void complete(ExecutionResult result, Throwable error) { - if (AsyncExecution.SCHEDULED.equals(error)) + if (result == null && error == null) return; completed = true; diff --git a/src/main/java/net/jodah/failsafe/FailsafeExecutor.java b/src/main/java/net/jodah/failsafe/FailsafeExecutor.java index dc7eee1b..9707ded9 100644 --- a/src/main/java/net/jodah/failsafe/FailsafeExecutor.java +++ b/src/main/java/net/jodah/failsafe/FailsafeExecutor.java @@ -87,7 +87,7 @@ public T get(ContextualSupplier supplier) { * @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution */ public CompletableFuture getAsync(CheckedSupplier supplier) { - return callAsync(execution -> Functions.promiseOf(supplier, execution)); + return callAsync(execution -> Functions.promiseOf(supplier, execution), false); } /** @@ -101,7 +101,7 @@ public CompletableFuture getAsync(CheckedSupplier supplier) * @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution */ public CompletableFuture getAsync(ContextualSupplier supplier) { - return callAsync(execution -> Functions.promiseOf(supplier, execution)); + return callAsync(execution -> Functions.promiseOf(supplier, execution), false); } /** @@ -116,7 +116,7 @@ public CompletableFuture getAsync(ContextualSupplier supplie * @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution */ public CompletableFuture getAsyncExecution(AsyncSupplier supplier) { - return callAsyncExecution(execution -> Functions.asyncOfExecution(supplier, execution)); + return callAsync(execution -> Functions.asyncOfExecution(supplier, execution), true); } void handleComplete(ExecutionResult result, ExecutionContext context) { @@ -167,7 +167,7 @@ public FailsafeExecutor onSuccess(CheckedConsumer CompletableFuture getStageAsync(CheckedSupplier> supplier) { - return callAsync(execution -> Functions.promiseOfStage(supplier, execution)); + return callAsync(execution -> Functions.promiseOfStage(supplier, execution), false); } /** @@ -181,7 +181,7 @@ public CompletableFuture getStageAsync(CheckedSupplier CompletableFuture getStageAsync(ContextualSupplier> supplier) { - return callAsync(execution -> Functions.promiseOfStage(supplier, execution)); + return callAsync(execution -> Functions.promiseOfStage(supplier, execution), false); } /** @@ -197,7 +197,7 @@ public CompletableFuture getStageAsync(ContextualSupplier CompletableFuture getStageAsyncExecution( AsyncSupplier> supplier) { - return callAsyncExecution(execution -> Functions.asyncOfFutureExecution(supplier, execution)); + return callAsync(execution -> Functions.asyncOfFutureExecution(supplier, execution), true); } /** @@ -234,7 +234,7 @@ public void run(ContextualRunnable runnable) { * @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution */ public CompletableFuture runAsync(CheckedRunnable runnable) { - return callAsync(execution -> Functions.promiseOf(runnable, execution)); + return callAsync(execution -> Functions.promiseOf(runnable, execution), false); } /** @@ -247,7 +247,7 @@ public CompletableFuture runAsync(CheckedRunnable runnable) { * @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution */ public CompletableFuture runAsync(ContextualRunnable runnable) { - return callAsync(execution -> Functions.promiseOf(runnable, execution)); + return callAsync(execution -> Functions.promiseOf(runnable, execution), false); } /** @@ -262,7 +262,7 @@ public CompletableFuture runAsync(ContextualRunnable runnable) { * @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution */ public CompletableFuture runAsyncExecution(AsyncRunnable runnable) { - return callAsyncExecution(execution -> Functions.asyncOfExecution(runnable, execution)); + return callAsync(execution -> Functions.asyncOfExecution(runnable, execution), true); } /** @@ -317,30 +317,14 @@ private T call(Function> supplierFn) { */ @SuppressWarnings("unchecked") private CompletableFuture callAsync( - Function>> supplierFn) { + Function>> supplierFn, boolean asyncExecution) { FailsafeFuture future = new FailsafeFuture(this); AsyncExecution execution = new AsyncExecution(scheduler, future, this); future.inject(execution); - execution.executeAsync(supplierFn.apply(execution)); - return future; - } - - /** - * Calls the asynchronous {@code supplier} via the configured Scheduler, handling results according to the configured - * policies, until any configured policy is exceeded or the AsyncExecution is completed. - *

- * If a configured circuit breaker is open, the resulting future is completed with {@link - * CircuitBreakerOpenException}. - * - * @throws NullPointerException if the {@code supplierFn} is null - * @throws RejectedExecutionException if the {@code supplierFn} cannot be scheduled for execution - */ - @SuppressWarnings("unchecked") - private CompletableFuture callAsyncExecution(Function> supplierFn) { - FailsafeFuture future = new FailsafeFuture(this); - AsyncExecution execution = new AsyncExecution(scheduler, future, this); - future.inject(execution); - execution.executeAsyncExecution(supplierFn.apply(execution)); + if (!asyncExecution) + execution.executeAsync(supplierFn.apply(execution)); + else + execution.executeAsyncExecution(supplierFn.apply(execution)); return future; } } \ No newline at end of file diff --git a/src/main/java/net/jodah/failsafe/Functions.java b/src/main/java/net/jodah/failsafe/Functions.java index 94c1b821..5ceec809 100644 --- a/src/main/java/net/jodah/failsafe/Functions.java +++ b/src/main/java/net/jodah/failsafe/Functions.java @@ -28,6 +28,8 @@ * @author Jonathan Halterman */ final class Functions { + private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); + /** Returns a supplier that supplies the {@code result} once then uses the {@code supplier} for subsequent calls. */ static Supplier> supplyOnce(CompletableFuture result, Supplier> supplier) { @@ -72,25 +74,6 @@ static Supplier> makeAsync(Supplier Supplier> makeAsyncExecution(Supplier supplier, Scheduler scheduler, - FailsafeFuture future) { - return () -> { - CompletableFuture promise = new CompletableFuture<>(); - try { - future.inject((Future) scheduler.schedule(supplier::get, 0, TimeUnit.NANOSECONDS)); - promise.completeExceptionally(AsyncExecution.SCHEDULED); - } catch (Exception e) { - promise.completeExceptionally(e); - } - return promise; - }; - } - static Supplier> promiseOf(CheckedSupplier supplier, AbstractExecution execution) { Assert.notNull(supplier, "supplier"); @@ -156,34 +139,36 @@ static Supplier> promiseOf(ContextualRunnable }; } - static Supplier asyncOfExecution(AsyncSupplier supplier, AsyncExecution execution) { + static Supplier> asyncOfExecution(AsyncSupplier supplier, + AsyncExecution execution) { Assert.notNull(supplier, "supplier"); - return new Supplier() { + return new Supplier>() { @Override - public synchronized T get() { + public synchronized CompletableFuture get() { try { execution.preExecute(); - return supplier.get(execution); + supplier.get(execution); } catch (Throwable e) { execution.completeOrHandle(null, e); - return null; } + return NULL_FUTURE; } }; } - static Supplier asyncOfExecution(AsyncRunnable runnable, AsyncExecution execution) { + static Supplier> asyncOfExecution(AsyncRunnable runnable, + AsyncExecution execution) { Assert.notNull(runnable, "runnable"); - return new Supplier() { + return new Supplier>() { @Override - public synchronized T get() { + public synchronized CompletableFuture get() { try { execution.preExecute(); runnable.run(execution); } catch (Throwable e) { execution.completeOrHandle(null, e); } - return null; + return NULL_FUTURE; } }; } @@ -236,14 +221,14 @@ static Supplier> promiseOfStage( }; } - static Supplier> asyncOfFutureExecution( + static Supplier> asyncOfFutureExecution( AsyncSupplier> supplier, AsyncExecution execution) { Assert.notNull(supplier, "supplier"); - return new Supplier>() { + return new Supplier>() { Semaphore asyncFutureLock = new Semaphore(1); @Override - public CompletableFuture get() { + public CompletableFuture get() { try { execution.preExecute(); asyncFutureLock.acquire(); @@ -264,7 +249,7 @@ public CompletableFuture get() { } } - return null; + return NULL_FUTURE; } }; } diff --git a/src/main/java/net/jodah/failsafe/internal/executor/RetryPolicyExecutor.java b/src/main/java/net/jodah/failsafe/internal/executor/RetryPolicyExecutor.java index 9d15bce1..35b6ed42 100644 --- a/src/main/java/net/jodah/failsafe/internal/executor/RetryPolicyExecutor.java +++ b/src/main/java/net/jodah/failsafe/internal/executor/RetryPolicyExecutor.java @@ -122,8 +122,10 @@ else if (!future.isDone() && !future.isCancelled()) { postExecuteAsync(result, scheduler, future).thenAccept(postExecutionHandler); else postExecutionHandler.accept(result); - } else + } else if (error != null) promise.completeExceptionally(error); + else + promise.complete(null); return result; }); diff --git a/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java b/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java index 7d328e1b..db103d5b 100644 --- a/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java +++ b/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java @@ -101,7 +101,7 @@ public void shouldRunAsyncContextual() throws Throwable { }); } - public void shouldRunAsyncExecutor() throws Throwable { + public void shouldRunAsyncExecution() throws Throwable { assertRunAsync((AsyncRunnable) exec -> { try { service.connect(); @@ -163,7 +163,7 @@ public void shouldGetAsyncContextual() throws Throwable { }); } - public void shouldGetAsyncExecutor() throws Throwable { + public void shouldGetAsyncExecution() throws Throwable { assertGetAsync((AsyncSupplier) exec -> { try { boolean result = service.connect(); @@ -271,7 +271,7 @@ public void shouldCancelOnGetAsync() throws Throwable { })); } - public void shouldCancelOnGetAsyncExecutor() throws Throwable { + public void shouldCancelOnGetAsyncExecution() throws Throwable { assertCancel(executor -> getAsync(executor, (AsyncSupplier) (e) -> { Thread.sleep(1000); e.complete(); @@ -285,7 +285,7 @@ public void shouldCancelOnRunAsync() throws Throwable { })); } - public void shouldCancelOnRunAsyncExecutor() throws Throwable { + public void shouldCancelOnRunAsyncExecution() throws Throwable { assertCancel(executor -> runAsync(executor, (AsyncRunnable) (e) -> { Thread.sleep(1000); e.complete(); @@ -299,7 +299,7 @@ public void shouldCancelOnGetStageAsync() throws Throwable { })); } - public void shouldCancelOnGetStageAsyncExecutor() throws Throwable { + public void shouldCancelOnGetStageAsyncExecution() throws Throwable { assertCancel(executor -> getStageAsync(executor, (AsyncSupplier) (e) -> { Thread.sleep(1000); CompletableFuture result = CompletableFuture.completedFuture("test");