diff --git a/README.md b/README.md index 4b70c36c..8ed1b4af 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,8 @@ Asynchronous invocations are performed and retried on a scheduled executor. When ```java Recurrent.withRetries(() -> connect(), retryPolicy, executor) - .whenComplete((connection, failure) -> { - if (connection != null) - log.info("Connection established"); - else - log.error("Connection attempts failed", failure); - }); + .whenSuccess((connection) -> log.info("Connection established to {}", connection) + .whenFailure((failure) -> log.error("Connection attempts failed", failure)); ``` #### Integrating with Asynchronous Code @@ -52,13 +48,12 @@ Asynchronous code often reports failures via callbacks rather than throwing an e ```java Recurrent.withRetries((invocation) -> { - someService.connect(host, port).addListener((connection, failure) -> { - if (connection != null) - log.info("Connection established"); - else if (!invocation.retry()) - log.error("Connection attempts failed", failure) + someService.connect(host, port).addFailureListener((failure) -> { + // Manually retry invocation + if (!invocation.retry()) + log.error("Connection attempts failed", failure) } - }, retryPolicy, eventLoopGroup)); + }, retryPolicy, executor)); ``` ## Notes diff --git a/pom.xml b/pom.xml index bf998687..e6495b9c 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,12 @@ 0.3.4-SNAPSHOT test + + io.reactivex + rxjava + 1.0.12 + test + diff --git a/src/main/java/net/jodah/recurrent/AsyncCallable.java b/src/main/java/net/jodah/recurrent/AsyncCallable.java new file mode 100644 index 00000000..a340cecb --- /dev/null +++ b/src/main/java/net/jodah/recurrent/AsyncCallable.java @@ -0,0 +1,85 @@ +package net.jodah.recurrent; + +import java.util.concurrent.Callable; + +import net.jodah.recurrent.event.CompletionListener; + +/** + * A callable that performs async callbacks. + * + * @author Jonathan Halterman + * @param result type + */ +abstract class AsyncCallable implements Callable { + protected Invocation invocation; + protected CompletionListener listener; + + static AsyncCallable of(final Callable callable) { + return new AsyncCallable() { + @Override + public T call() throws Exception { + try { + T result = callable.call(); + listener.onCompletion(result, null); + return result; + } catch (Exception e) { + listener.onCompletion(null, e); + return null; + } + } + }; + } + + static AsyncCallable of(final RetryableCallable callable) { + return new AsyncCallable() { + @Override + public T call() throws Exception { + try { + T result = callable.call(invocation); + listener.onCompletion(result, null); + return result; + } catch (Exception e) { + listener.onCompletion(null, e); + return null; + } + } + }; + } + + static AsyncCallable of(final RetryableRunnable runnable) { + return new AsyncCallable() { + @Override + public Void call() throws Exception { + try { + runnable.run(invocation); + listener.onCompletion(null, null); + } catch (Exception e) { + listener.onCompletion(null, e); + } + + return null; + } + }; + } + + static AsyncCallable of(final Runnable runnable) { + return new AsyncCallable() { + @Override + public Void call() throws Exception { + try { + runnable.run(); + listener.onCompletion(null, null); + } catch (Exception e) { + listener.onCompletion(null, e); + } + + return null; + } + }; + } + + void initialize(Invocation invocation, CompletionListener listener) { + this.invocation = invocation; + this.listener = listener; + } +} \ No newline at end of file diff --git a/src/main/java/net/jodah/recurrent/Callables.java b/src/main/java/net/jodah/recurrent/Callables.java index 56e29228..84c3b960 100644 --- a/src/main/java/net/jodah/recurrent/Callables.java +++ b/src/main/java/net/jodah/recurrent/Callables.java @@ -2,13 +2,17 @@ import java.util.concurrent.Callable; +import net.jodah.recurrent.event.CompletionListener; +import net.jodah.recurrent.event.FailureListener; +import net.jodah.recurrent.event.SuccessListener; + /** * Utilities for creating callables. * * @author Jonathan Halterman */ final class Callables { - static Callable callable(CompletionListener listener, final T result, final Throwable failure) { + static Callable of(CompletionListener listener, final T result, final Throwable failure) { return new Callable() { @Override public T call() { @@ -18,7 +22,17 @@ public T call() { }; } - static Callable callable(final Runnable runnable) { + static Callable of(FailureListener listener, final Throwable failure) { + return new Callable() { + @Override + public T call() { + listener.onFailure(failure); + return null; + } + }; + } + + static Callable of(final Runnable runnable) { return new Callable() { @Override public Void call() { @@ -27,4 +41,14 @@ public Void call() { } }; } + + static Callable of(SuccessListener listener, final T result) { + return new Callable() { + @Override + public T call() { + listener.onSuccess(result); + return null; + } + }; + } } diff --git a/src/main/java/net/jodah/recurrent/CompletableFuture.java b/src/main/java/net/jodah/recurrent/CompletableFuture.java index 25a64232..5d6aa750 100644 --- a/src/main/java/net/jodah/recurrent/CompletableFuture.java +++ b/src/main/java/net/jodah/recurrent/CompletableFuture.java @@ -6,6 +6,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.jodah.recurrent.event.CompletionListener; +import net.jodah.recurrent.event.FailureListener; +import net.jodah.recurrent.event.SuccessListener; import net.jodah.recurrent.internal.util.concurrent.InterruptableWaiter; /** @@ -17,6 +20,10 @@ class CompletableFuture implements ListenableFuture { private volatile boolean done; private volatile CompletionListener completionListener; private volatile CompletionListener asyncCompletionListener; + private volatile SuccessListener successListener; + private volatile SuccessListener asyncSuccessListener; + private volatile FailureListener failureListener; + private volatile FailureListener asyncFailureListener; private volatile InterruptableWaiter waiter; private volatile T result; private volatile Throwable failure; @@ -36,10 +43,23 @@ void complete(T result, Throwable failure) { if (waiter != null) waiter.interruptWaiters(); + // Async callbacks if (asyncCompletionListener != null) - executor.schedule(Callables.callable(asyncCompletionListener, result, failure), 0, TimeUnit.MILLISECONDS); + executor.schedule(Callables.of(asyncCompletionListener, result, failure), 0, TimeUnit.MILLISECONDS); + if (failure == null) { + if (asyncSuccessListener != null) + executor.schedule(Callables.of(asyncSuccessListener, result), 0, TimeUnit.MILLISECONDS); + } else if (asyncFailureListener != null) + executor.schedule(Callables.of(asyncFailureListener, failure), 0, TimeUnit.MILLISECONDS); + + // Sync callbacks if (completionListener != null) completionListener.onCompletion(result, failure); + if (failure == null) { + if (successListener != null) + successListener.onSuccess(result); + } else if (failureListener != null) + failureListener.onFailure(failure); } @Override @@ -99,7 +119,7 @@ public ListenableFuture whenComplete(CompletionListener completionListener @Override public ListenableFuture whenCompleteAsync(CompletionListener completionListener) { if (done) - executor.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS); + executor.schedule(Callables.of(completionListener, result, failure), 0, TimeUnit.MILLISECONDS); else this.completionListener = completionListener; return this; @@ -109,11 +129,69 @@ public ListenableFuture whenCompleteAsync(CompletionListener completionLis public ListenableFuture whenCompleteAsync(CompletionListener completionListener, ScheduledExecutorService executor) { if (done) - executor.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS); + executor.schedule(Callables.of(completionListener, result, failure), 0, TimeUnit.MILLISECONDS); else { this.asyncCompletionListener = completionListener; this.executor = executor; } return this; } + + @Override + public ListenableFuture whenFailure(FailureListener failureListener) { + if (!done) + this.failureListener = failureListener; + else + failureListener.onFailure(failure); + return this; + } + + @Override + public ListenableFuture whenFailureAsync(FailureListener failureListener) { + if (done) + executor.schedule(Callables.of(failureListener, failure), 0, TimeUnit.MILLISECONDS); + else + this.failureListener = failureListener; + return this; + } + + @Override + public ListenableFuture whenFailureAsync(FailureListener failureListener, ScheduledExecutorService executor) { + if (done) + executor.schedule(Callables.of(failureListener, failure), 0, TimeUnit.MILLISECONDS); + else { + this.asyncFailureListener = failureListener; + this.executor = executor; + } + return this; + } + + @Override + public ListenableFuture whenSuccess(SuccessListener successListener) { + if (!done) + this.successListener = successListener; + else + successListener.onSuccess(result); + return this; + } + + @Override + public ListenableFuture whenSuccessAsync(SuccessListener successListener) { + if (done) + executor.schedule(Callables.of(successListener, result), 0, TimeUnit.MILLISECONDS); + else + this.successListener = successListener; + return this; + } + + @Override + public ListenableFuture whenSuccessAsync(SuccessListener successListener, ScheduledExecutorService executor) { + if (done) + executor.schedule(Callables.of(successListener, result), 0, TimeUnit.MILLISECONDS); + else { + this.asyncSuccessListener = successListener; + this.executor = executor; + } + return this; + } } \ No newline at end of file diff --git a/src/main/java/net/jodah/recurrent/ContextualCallable.java b/src/main/java/net/jodah/recurrent/ContextualCallable.java deleted file mode 100644 index f0bac94c..00000000 --- a/src/main/java/net/jodah/recurrent/ContextualCallable.java +++ /dev/null @@ -1,56 +0,0 @@ -package net.jodah.recurrent; - -import java.util.concurrent.Callable; - -/** - * A callable that performs a call within the context of an invocation, calling a listener after the call is complete. - * - * @author Jonathan Halterman - * @param result type - */ -abstract class ContextualCallable implements Callable { - protected Invocation invocation; - protected CompletionListener listener; - - static ContextualCallable of(final RetryableCallable callable) { - return new ContextualCallable() { - @Override - public T call() throws Exception { - T result = null; - Throwable failure = null; - - try { - result = callable.call(invocation); - } catch (Exception e) { - failure = e; - } - - listener.onCompletion(result, failure); - return result; - } - }; - } - - static ContextualCallable of(final RetryableRunnable runnable) { - return new ContextualCallable() { - @Override - public Void call() throws Exception { - Throwable failure = null; - - try { - runnable.run(invocation); - } catch (Exception e) { - failure = e; - } - - listener.onCompletion(null, failure); - return null; - } - }; - } - - void initialize(Invocation invocation, CompletionListener listener) { - this.invocation = invocation; - this.listener = listener; - } -} \ No newline at end of file diff --git a/src/main/java/net/jodah/recurrent/Invocation.java b/src/main/java/net/jodah/recurrent/Invocation.java index a0c2dba7..010c681b 100644 --- a/src/main/java/net/jodah/recurrent/Invocation.java +++ b/src/main/java/net/jodah/recurrent/Invocation.java @@ -10,7 +10,7 @@ * A retryable invocation. * * @author Jonathan Halterman - * @param invocation result type + * @param result type */ class Invocation { private RetryPolicy retryPolicy; @@ -24,10 +24,6 @@ class Invocation { long waitTime; Invocation(Callable callable, RetryPolicy retryPolicy, ScheduledExecutorService executor) { - initialize(callable, retryPolicy, executor); - } - - void initialize(Callable callable, RetryPolicy retryPolicy, ScheduledExecutorService executor) { this.callable = callable; this.retryPolicy = retryPolicy; this.executor = executor; @@ -49,7 +45,24 @@ public boolean retry() { } /** - * Records a failed attempt, incrementing the attempt count and time. + * Returns the wait time. + */ + Duration getWaitTime() { + return new Duration(waitTime, TimeUnit.NANOSECONDS); + } + + /** + * Returns true if the max retries or max duration for the retry policy have been exceeded, else false. + */ + boolean isPolicyExceeded() { + boolean withinMaxRetries = retryPolicy.getMaxRetries() == -1 || attemptCount <= retryPolicy.getMaxRetries(); + boolean withinMaxDuration = retryPolicy.getMaxDuration() == null + || System.nanoTime() - startTime < retryPolicy.getMaxDuration().toNanos(); + return !withinMaxRetries || !withinMaxDuration; + } + + /** + * Records a failed attempt and adjusts the wait time. */ void recordFailedAttempt() { attemptCount++; @@ -65,21 +78,4 @@ void recordFailedAttempt() { waitTime = Math.min(waitTime, maxRemainingWaitTime < 0 ? 0 : maxRemainingWaitTime); } } - - /** - * Returns the wait time. - */ - Duration getWaitTime() { - return new Duration(waitTime, TimeUnit.NANOSECONDS); - } - - /** - * Returns true if the max retries or max duration for the retry policy have been exceeded, else false. - */ - public boolean isPolicyExceeded() { - boolean withinMaxRetries = retryPolicy.getMaxRetries() == -1 || attemptCount <= retryPolicy.getMaxRetries(); - boolean withinMaxDuration = retryPolicy.getMaxDuration() == null - || System.nanoTime() - startTime < retryPolicy.getMaxDuration().toNanos(); - return !withinMaxRetries || !withinMaxDuration; - } } \ No newline at end of file diff --git a/src/main/java/net/jodah/recurrent/ListenableFuture.java b/src/main/java/net/jodah/recurrent/ListenableFuture.java index 2673f175..ca216721 100644 --- a/src/main/java/net/jodah/recurrent/ListenableFuture.java +++ b/src/main/java/net/jodah/recurrent/ListenableFuture.java @@ -3,10 +3,26 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import net.jodah.recurrent.event.CompletionListener; +import net.jodah.recurrent.event.FailureListener; +import net.jodah.recurrent.event.SuccessListener; + public interface ListenableFuture extends Future { ListenableFuture whenComplete(CompletionListener completionListener); ListenableFuture whenCompleteAsync(CompletionListener completionListener); ListenableFuture whenCompleteAsync(CompletionListener completionListener, ScheduledExecutorService executor); + + ListenableFuture whenFailure(FailureListener failureListener); + + ListenableFuture whenFailureAsync(FailureListener failureListener); + + ListenableFuture whenFailureAsync(FailureListener failureListener, ScheduledExecutorService executor); + + ListenableFuture whenSuccess(SuccessListener successListener); + + ListenableFuture whenSuccessAsync(SuccessListener successListener); + + ListenableFuture whenSuccessAsync(SuccessListener successListener, ScheduledExecutorService executor); } diff --git a/src/main/java/net/jodah/recurrent/Recurrent.java b/src/main/java/net/jodah/recurrent/Recurrent.java index 9e26696d..df424e80 100644 --- a/src/main/java/net/jodah/recurrent/Recurrent.java +++ b/src/main/java/net/jodah/recurrent/Recurrent.java @@ -4,6 +4,13 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import net.jodah.recurrent.event.CompletionListener; + +/** + * Performs invocations with synchronous or asynchronous retries according to a {@link RetryPolicy}. + * + * @author Jonathan Halterman + */ public final class Recurrent { private Recurrent() { } @@ -32,45 +39,69 @@ public void run(Invocation invocation) { }; } + /** + * Invokes the {@code callable}, sleeping between invocation attempts according to the {@code retryPolicy}. + */ public static T withRetries(Callable callable, RetryPolicy retryPolicy) { return call(callable, retryPolicy); } + /** + * Invokes the {@code callable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}. + */ public static ListenableFuture withRetries(Callable callable, RetryPolicy retryPolicy, ScheduledExecutorService executor) { - return call(ContextualCallable.of(retryable(callable)), retryPolicy, executor, false); + return call(AsyncCallable.of(callable), retryPolicy, executor, false); } + /** + * Invokes the {@code callable}, performing the initial call synchronously and scheduling retries with the + * {@code executor} according to the {@code retryPolicy}. + */ public static ListenableFuture withRetries(Callable callable, RetryPolicy retryPolicy, - ScheduledExecutorService executor, boolean initialCallSynchronous) { - return call(ContextualCallable.of(retryable(callable)), retryPolicy, executor, true); + ScheduledExecutorService executor, boolean initialSynchronousCall) { + return call(AsyncCallable.of(callable), retryPolicy, executor, initialSynchronousCall); } + /** + * Invokes the {@code callable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}. + */ public static ListenableFuture withRetries(RetryableCallable callable, RetryPolicy retryPolicy, ScheduledExecutorService executor) { - return call(ContextualCallable.of(callable), retryPolicy, executor, false); + return call(AsyncCallable.of(callable), retryPolicy, executor, false); } + /** + * Invokes the {@code callable}, performing the initial call synchronously and scheduling retries with the + * {@code executor} according to the {@code retryPolicy}. + */ public static ListenableFuture withRetries(RetryableCallable callable, RetryPolicy retryPolicy, - ScheduledExecutorService executor, boolean initialCallSynchronous) { - return call(ContextualCallable.of(callable), retryPolicy, executor, true); + ScheduledExecutorService executor, boolean initialSynchronousCall) { + return call(AsyncCallable.of(callable), retryPolicy, executor, initialSynchronousCall); } + /** + * Invokes the {@code runnable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}. + */ public static ListenableFuture withRetries(RetryableRunnable runnable, RetryPolicy retryPolicy, ScheduledExecutorService executor) { - return call(ContextualCallable.of(runnable), retryPolicy, executor, false); + return call(AsyncCallable.of(runnable), retryPolicy, executor, false); } + /** + * Invokes the {@code runnable}, performing the initial call synchronously and scheduling retries with the + * {@code executor} according to the {@code retryPolicy}. + */ public static ListenableFuture withRetries(RetryableRunnable runnable, RetryPolicy retryPolicy, - ScheduledExecutorService executor, boolean initialCallSynchronous) { - return call(ContextualCallable.of(runnable), retryPolicy, executor, true); + ScheduledExecutorService executor, boolean initialSynchronousCall) { + return call(AsyncCallable.of(runnable), retryPolicy, executor, initialSynchronousCall); } /** * Invokes the {@code runnable}, sleeping between invocation attempts according to the {@code retryPolicy}. */ public static void withRetries(Runnable runnable, RetryPolicy retryPolicy) { - call(Callables.callable(runnable), retryPolicy); + call(Callables.of(runnable), retryPolicy); } /** @@ -78,12 +109,16 @@ public static void withRetries(Runnable runnable, RetryPolicy retryPolicy) { */ public static ListenableFuture withRetries(Runnable runnable, RetryPolicy retryPolicy, ScheduledExecutorService executor) { - return call(ContextualCallable.of(retryable(runnable)), retryPolicy, executor, false); + return call(AsyncCallable.of(runnable), retryPolicy, executor, false); } + /** + * Invokes the {@code runnable}, performing the initial call synchronously and scheduling retries with the + * {@code executor} according to the {@code retryPolicy}. + */ public static ListenableFuture withRetries(Runnable runnable, RetryPolicy retryPolicy, - ScheduledExecutorService executor, boolean initialCallSynchronous) { - return call(ContextualCallable.of(retryable(runnable)), retryPolicy, executor, true); + ScheduledExecutorService executor, boolean initialSynchronousCall) { + return call(AsyncCallable.of(runnable), retryPolicy, executor, initialSynchronousCall); } /** @@ -115,41 +150,33 @@ private static T call(Callable callable, RetryPolicy retryPolicy) { /** * Calls the {@code callable} via the {@code executor}, performing retries according to the {@code retryPolicy}. */ - private static ListenableFuture call(final ContextualCallable callable, final RetryPolicy retryPolicy, - final ScheduledExecutorService executor, boolean scheduleInitialCall) { + private static ListenableFuture call(final AsyncCallable callable, final RetryPolicy retryPolicy, + final ScheduledExecutorService executor, boolean initialSynchronousCall) { final CompletableFuture future = new CompletableFuture(executor); final Invocation invocation = new Invocation(callable, retryPolicy, executor); callable.initialize(invocation, new CompletionListener() { public void onCompletion(T result, Throwable failure) { - if (failure == null) { - future.complete(result, failure); - return; + if (failure == null) + future.complete(result, null); + else { + // TODO fail on specific exceptions + invocation.recordFailedAttempt(); + if (invocation.isPolicyExceeded()) + future.complete(null, failure); + else + future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS)); } - - invocation.recordFailedAttempt(); - - // TODO fail on specific exceptions - if (invocation.isPolicyExceeded()) - future.complete(result, failure); - else - future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS)); } }); - if (scheduleInitialCall) { - future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS)); - } else { + if (initialSynchronousCall) { try { callable.call(); - } catch (Throwable t) { - invocation.recordFailedAttempt(); - if (invocation.isPolicyExceeded()) - future.complete(null, t); - - // Asynchronous retry - future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS)); + } catch (Throwable unreachable) { } + } else { + future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS)); } return future; diff --git a/src/main/java/net/jodah/recurrent/CompletionListener.java b/src/main/java/net/jodah/recurrent/event/CompletionListener.java similarity index 91% rename from src/main/java/net/jodah/recurrent/CompletionListener.java rename to src/main/java/net/jodah/recurrent/event/CompletionListener.java index 8821efed..5ba39ada 100644 --- a/src/main/java/net/jodah/recurrent/CompletionListener.java +++ b/src/main/java/net/jodah/recurrent/event/CompletionListener.java @@ -1,4 +1,4 @@ -package net.jodah.recurrent; +package net.jodah.recurrent.event; /** * Listens for an asynchronous invocation to complete. diff --git a/src/main/java/net/jodah/recurrent/event/FailureListener.java b/src/main/java/net/jodah/recurrent/event/FailureListener.java new file mode 100644 index 00000000..b2b4b12e --- /dev/null +++ b/src/main/java/net/jodah/recurrent/event/FailureListener.java @@ -0,0 +1,14 @@ +package net.jodah.recurrent.event; + +/** + * Listens for an asynchronous invocation to fail. + * + * @author Jonathan Halterman + * @param result type + */ +public interface FailureListener { + /** + * Handles the failure of a call. + */ + void onFailure(Throwable failure); +} diff --git a/src/main/java/net/jodah/recurrent/event/SuccessListener.java b/src/main/java/net/jodah/recurrent/event/SuccessListener.java new file mode 100644 index 00000000..5d0948cb --- /dev/null +++ b/src/main/java/net/jodah/recurrent/event/SuccessListener.java @@ -0,0 +1,14 @@ +package net.jodah.recurrent.event; + +/** + * Listens for an asynchronous invocation to succeed. + * + * @author Jonathan Halterman + * @param result type + */ +public interface SuccessListener { + /** + * Handles the successful completion of a call. + */ + void onSuccess(T result); +} diff --git a/src/test/java/net/jodah/recurrent/Asserts.java b/src/test/java/net/jodah/recurrent/Asserts.java index 84d39360..66d56627 100644 --- a/src/test/java/net/jodah/recurrent/Asserts.java +++ b/src/test/java/net/jodah/recurrent/Asserts.java @@ -1,25 +1,57 @@ package net.jodah.recurrent; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; + +import java.util.function.Consumer; + +import org.testng.Assert; public class Asserts { - public static void shouldFail(Runnable runnable, Class expected) { + @FunctionalInterface + public interface ThrowableRunnable { + void run() throws Throwable; + } + + /** + * Asserts thrown exceptions, including checked exceptions, via a lambda. + *

+ * Sweet idea courtesy of Lukas Eder: http://blog.jooq.org/2014/05/23/java-8-friday-better-exceptions/ + */ + public static void assertThrows(Throwable throwable, ThrowableRunnable runnable) { + boolean fail = false; try { runnable.run(); - fail("A failure was expected"); - } catch (Exception e) { - assertTrue(e.getClass().isAssignableFrom(expected), "The expected exception was not of the expected type " + e); + } catch (Throwable t) { + assertEquals(throwable, t, "The expected exception was not thrown"); } + + if (fail) + Assert.fail("No exception was thrown"); + } + + /** + * Asserts thrown exceptions, including checked exceptions, via a lambda. + *

+ * Sweet idea courtesy of Lukas Eder: http://blog.jooq.org/2014/05/23/java-8-friday-better-exceptions/ + */ + public static void assertThrows(Class throwable, ThrowableRunnable runnable) { + assertThrows(throwable, runnable, t -> { + }); } - public static void shouldFail(Runnable runnable, Exception expected) { + public static void assertThrows(Class throwable, ThrowableRunnable runnable, + Consumer exceptionConsumer) { + boolean fail = false; try { runnable.run(); - fail("A failure was expected"); - } catch (Exception e) { - assertEquals(expected, e, "The expected exception was not of the expected type " + e); + fail = true; + } catch (Throwable t) { + if (!throwable.isInstance(t)) + Assert.fail("Bad exception type"); + exceptionConsumer.accept(t); } + + if (fail) + Assert.fail("No exception was thrown"); } } diff --git a/src/test/java/net/jodah/recurrent/RecurrentTest.java b/src/test/java/net/jodah/recurrent/RecurrentTest.java index b4640e82..0714cc11 100644 --- a/src/test/java/net/jodah/recurrent/RecurrentTest.java +++ b/src/test/java/net/jodah/recurrent/RecurrentTest.java @@ -1,10 +1,11 @@ package net.jodah.recurrent; -import static net.jodah.recurrent.Asserts.shouldFail; +import static net.jodah.recurrent.Asserts.assertThrows; import static net.jodah.recurrent.Testing.failAlways; import static net.jodah.recurrent.Testing.failNTimes; import static org.testng.Assert.assertEquals; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -12,7 +13,7 @@ import org.testng.annotations.Test; import net.jodah.concurrentunit.Waiter; -import net.jodah.recurrent.Testing.RecordingRunnable; +import net.jodah.recurrent.Testing.RecordingCallable; @Test public class RecurrentTest { @@ -25,26 +26,28 @@ protected void beforeMethod() { waiter = new Waiter(); } - public void testDoWithRetries() throws Throwable { + public void testWithRetries() throws Throwable { // Fail twice then succeed RuntimeException expectedFailure = new IllegalArgumentException(); - RecordingRunnable runnable = failNTimes(2, expectedFailure); - Recurrent.withRetries(runnable, new RetryPolicy()); + RecordingCallable runnable = failNTimes(2, expectedFailure, "foo"); + assertEquals(Recurrent.withRetries(runnable, new RetryPolicy()), "foo"); assertEquals(runnable.failures, 2); // Fail three times - final RecordingRunnable runnable2 = failAlways(expectedFailure); - shouldFail(() -> Recurrent.withRetries(runnable2, retryTwice), expectedFailure); + final RecordingCallable runnable2 = failAlways(expectedFailure); + assertThrows(expectedFailure, () -> Recurrent.withRetries(runnable2, retryTwice)); assertEquals(runnable2.failures, 3); } - public void testDoWithRetriesWithScheduler() throws Throwable { + public void testWithRetriesWithScheduler() throws Throwable { RuntimeException expectedFailure = new IllegalArgumentException(); // Fail twice then succeed waiter.expectResume(); - RecordingRunnable runnable = failNTimes(2, expectedFailure); + RecordingCallable runnable = failNTimes(2, expectedFailure, "foo"); Recurrent.withRetries(runnable, new RetryPolicy(), executor).whenComplete((result, failure) -> { + waiter.assertEquals("foo", result); + waiter.assertNull(failure); waiter.resume(); }); @@ -53,13 +56,30 @@ public void testDoWithRetriesWithScheduler() throws Throwable { // Fail three times waiter.expectResume(); - runnable = failAlways(expectedFailure); - Recurrent.withRetries(runnable, retryTwice, executor).whenComplete((result, failure) -> { - waiter.assertEquals(expectedFailure, failure); + RecordingCallable runnable2 = failAlways(expectedFailure); + assertThrows(ExecutionException.class, + () -> Recurrent.withRetries(runnable2, retryTwice, executor).whenComplete((result, failure) -> { + waiter.assertNull(result); + waiter.assertEquals(expectedFailure, failure); + waiter.resume(); + }).get()); + + waiter.await(); + assertEquals(runnable2.failures, 3); + } + + public void testWithRetriesWithInitialSynchronousCall() throws Throwable { + RuntimeException expectedFailure = new IllegalArgumentException(); + RecordingCallable runnable = failNTimes(2, expectedFailure, "foo"); + waiter.expectResume(); + + // Fail twice then succeed + Recurrent.withRetries(runnable, new RetryPolicy(), executor, true).whenComplete((result, failure) -> { + waiter.assertEquals("foo", result); + waiter.assertNull(failure); waiter.resume(); }); waiter.await(); - assertEquals(runnable.failures, 3); } } diff --git a/src/test/java/net/jodah/recurrent/Testing.java b/src/test/java/net/jodah/recurrent/Testing.java index 1df1cd36..59d897d5 100644 --- a/src/test/java/net/jodah/recurrent/Testing.java +++ b/src/test/java/net/jodah/recurrent/Testing.java @@ -1,28 +1,48 @@ package net.jodah.recurrent; +import java.util.concurrent.Callable; +import java.util.function.Consumer; + public class Testing { - public static class RecordingRunnable implements Runnable { + public static class RecordingCallable implements Callable { public int failures; int n; RuntimeException exception; + T eventualResult; - RecordingRunnable(int n, RuntimeException exception) { + RecordingCallable(int n, RuntimeException exception, T eventualResult) { this.n = n; this.exception = exception; + this.eventualResult = eventualResult; } - public void run() { - failures++; - if (n == -1 || failures < n) + public T call() { + if (n == -1 || failures < n) { + failures++; throw exception; + } + return eventualResult; } }; - public static RecordingRunnable failNTimes(int n, RuntimeException exception) { - return new RecordingRunnable(n, exception); + public static void withExceptions(Runnable runnable) { + withExceptions(runnable, t -> { + }); + } + + public static void withExceptions(Runnable runnable, Consumer exceptionConsumer) { + try { + runnable.run(); + } catch (Throwable t) { + exceptionConsumer.accept(t); + } + } + + public static RecordingCallable failNTimes(int n, RuntimeException exception, T eventualResult) { + return new RecordingCallable(n, exception, eventualResult); } - public static RecordingRunnable failAlways(RuntimeException exception) { - return new RecordingRunnable(-1, exception); + public static RecordingCallable failAlways(RuntimeException exception) { + return new RecordingCallable(-1, exception, null); } }